【架构实战】CQRS命令查询职责分离:读写分离的进阶实践
【架构实战】CQRS命令查询职责分离:读写分离的进阶实践
一、背景:一个报表查询拖垮了整个交易系统
2020年双十一,我们交易系统出现了一次P0事故。
事情的起因很简单:运营想在活动看板上实时查看"各渠道的GMV和转化率"。后端查询需要JOIN五张表(订单表、订单明细表、渠道表、用户表、支付流水表),做了三个聚合计算(SUM、COUNT DISTINCT、GROUP BY)。
上午10点,运营打开了看板页面。30秒后,交易系统响应时间从50ms飙升到8秒。数据库CPU从30%直接打到100%。所有下单请求开始超时。
根本原因:查询和命令共享了同一套数据模型。交易系统用的主库既要处理高并发写入(下单),又要承受复杂的分析查询(报表)。这在传统架构里是无解的——你不可能对着一张既要快速插入又要复杂查询的表,同时优化写性能和读性能。
这就是CQRS要解决的问题。
二、CQRS核心原理
2.1 什么是CQRS
CQRS(Command Query Responsibility Segregation,命令查询职责分离)是Greg Young在2010年提出的一种架构模式。核心思想只有一句话:将系统的读操作和写操作分离到不同的模型。
【传统架构】
┌──────────┐
│ Controller │
└──────┬───┘
│
┌──────▼───┐
│ Service │ ← 读写混合,一个模型同时处理
└──────┬───┘
│
┌────────────┼────────────┐
│ │ │
┌─────▼─────┐ ┌────▼────┐ ┌─────▼─────┐
│ 查询1 │ │ 写入 │ │ 查询2 │
│ (报表) │ │ (下单) │ │ (详情) │
└───────────┘ └─────────┘ └───────────┘
【CQRS架构】
┌──────────┐
│ 应用层 │
└──────┬───┘
│
┌────────────┼────────────┐
│ │
┌─────▼─────┐ ┌────▼────┐
│ 命令模型 │ │ 查询模型 │
│(Command) │ │(Query) │
│ 写优化 │────事件─────▶│ 读优化 │
└─────┬─────┘ └────┬────┘
│ │
┌─────▼─────┐ ┌────▼────┐
│ 写库 │ │ 读库 │
│ MySQL(主) │ │ ES/Redis │
└───────────┘ └──────────┘
2.2 CQRS vs 传统读写分离
很多人以为CQRS就是数据库主从读写分离。这是一个常见的误解。
| 维度 | 数据库读写分离 | CQRS |
|---|---|---|
| 分离层级 | 数据库层 | 应用层(模型层) |
| 数据模型 | 同一个表结构 | 不同的数据模型(可以不同表、甚至不同数据库) |
| 一致性 | 主从复制延迟(秒级) | 事件异步同步(毫秒到秒级) |
| 优化方向 | 主库优化写入,从库分担读压力 | 写模型优化业务完整性,读模型优化查询性能 |
| 适用场景 | 读多写少 | 读写模型差异大、查询复杂 |
一句话总结:CQRS是模型级别的读写分离,传统方案是数据级别的读写分离。
2.3 何时该用CQRS
不是所有系统都需要CQRS。判断标准:
需要CQRS的标志:
✅ 查询需要的数据结构与写入的数据结构差异很大
✅ 查询需要JOIN多张表,写入只需要单表
✅ 查询量远大于写入量(100:1以上)
✅ 查询需要跨多个限界上下文聚合数据
不需要CQRS的标志:
❌ 读写模型几乎一致
❌ 业务逻辑简单,CRUD即可
❌ 团队规模小,引入CQRS增加维护成本
❌ 查询和写入量都比较低
三、实战:交易系统CQRS改造
3.1 命令模型(写模型)
// ===== 命令端:领域模型,保持业务完整性 =====
// 命令对象
@Data
public class CreateOrderCommand {
@NotBlank
private String userId;
@NotEmpty
private List<OrderItemCommand> items;
private String couponId;
private String addressId;
}
// 命令处理器
@Service
public class CreateOrderCommandHandler {
@Autowired
private OrderRepository orderRepository;
@Autowired
private InventoryService inventoryService;
@Autowired
private EventBus eventBus;
@Transactional
public OrderId handle(CreateOrderCommand command) {
// 1. 创建订单聚合(充血模型,包含业务规则)
Order order = Order.create(command);
// 2. 扣减库存
command.getItems().forEach(item ->
inventoryService.deduct(item.getProductId(), item.getQuantity()));
// 3. 持久化
orderRepository.save(order);
// 4. 发布领域事件 → 驱动读模型更新
eventBus.publish(new OrderCreatedEvent(order));
return order.getId();
}
}
// 订单聚合(写模型:关注业务完整性)
@Entity
@Table(name = "t_order")
public class Order {
@Id
private String orderId;
private String userId;
private BigDecimal totalAmount;
private String status; // CREATED → PAID → SHIPPED → COMPLETED
private String addressId;
private String couponId;
private LocalDateTime createdAt;
// 订单明细(一对多)
@OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@JoinColumn(name = "order_id")
private List<OrderItem> items;
// ===== 业务方法 =====
public static Order create(CreateOrderCommand command) {
Order order = new Order();
order.orderId = OrderIdGenerator.generate();
order.userId = command.getUserId();
order.status = "CREATED";
order.createdAt = LocalDateTime.now();
// 计算订单金额
order.items = command.getItems().stream()
.map(item -> OrderItem.create(order.orderId, item))
.collect(Collectors.toList());
order.totalAmount = order.items.stream()
.map(OrderItem::getSubTotal)
.reduce(BigDecimal.ZERO, BigDecimal::add);
return order;
}
public void pay(String payMethod) {
if (!"CREATED".equals(this.status)) {
throw new OrderException("只能支付创建状态的订单");
}
this.status = "PAID";
}
}
3.2 查询模型(读模型)
// ===== 查询端:扁平化、宽表、NoSQL,优化查询性能 =====
// 读模型DTO:一张宽表,包含所有展示需要的数据
@Document(indexName = "order_view")
@Data
public class OrderView {
@Id
private String orderId;
private String userId;
private String userName; // 冗余用户名
private String userPhone; // 冗余用户手机
private BigDecimal totalAmount;
private String status;
private String statusDisplay; // 状态中文名:已创建/已支付/已发货
private String addressDetail; // 冗余地址详情
private String couponName; // 冗余优惠券名称
private List<OrderItemView> items; // 嵌套对象
private LocalDateTime createdAt;
private LocalDateTime paidAt;
}
// 查询处理器
@Service
public class OrderQueryHandler {
@Autowired
private ElasticsearchTemplate esTemplate;
@Autowired
private RedisTemplate<String, OrderView> redisTemplate;
public PageResult<OrderListDTO> listOrders(ListOrdersQuery query) {
// 构建ES查询
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("userId", query.getUserId()))
.must(QueryBuilders.rangeQuery("createdAt")
.gte(query.getStartTime())
.lte(query.getEndTime())))
.withSort(SortBuilders.fieldSort("createdAt").order(SortOrder.DESC))
.withPageable(PageRequest.of(query.getPage(), query.getSize()))
.build();
// 从ES查询(毫秒级响应)
return esTemplate.search(searchQuery, OrderView.class);
}
public OrderDetailDTO getOrderDetail(String orderId) {
// 先查缓存
String cacheKey = "order:detail:" + orderId;
OrderView cached = redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return OrderDetailDTO.from(cached);
}
// 缓存未命中,查ES
OrderView view = esTemplate.get(orderId, OrderView.class);
if (view != null) {
redisTemplate.opsForValue().set(cacheKey, view, 5, TimeUnit.MINUTES);
}
return OrderDetailDTO.from(view);
}
// 运营看板:实时聚合查询(ClickHouse)
public DashboardDTO getDashboard() {
// 从ClickHouse查询聚合数据
String sql = """
SELECT
channel_id,
COUNT(DISTINCT user_id) as uv,
COUNT(*) as order_cnt,
SUM(amount) as gmv,
SUM(amount) / COUNT(*) as avg_order_amount
FROM order_wide_table
WHERE created_at >= today()
GROUP BY channel_id
""";
return clickhouseTemplate.query(sql, DashboardDTO.class);
}
}
3.3 事件处理器:同步读写模型
// ===== 写模型变更 → 发布事件 → 读模型更新 =====
@Component
public class OrderEventProjector {
@Autowired
private ElasticsearchTemplate esTemplate;
@Autowired
private RedisTemplate<String, OrderView> redisTemplate;
// 监听订单创建事件
@EventListener
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onOrderCreated(OrderCreatedEvent event) {
Order order = event.getOrder();
// 构建读模型(聚合多源数据)
OrderView view = new OrderView();
view.setOrderId(order.getOrderId());
view.setUserId(order.getUserId());
view.setUserName(userService.getName(order.getUserId())); // 冗余用户名
view.setTotalAmount(order.getTotalAmount());
view.setStatus("CREATED");
view.setStatusDisplay("已创建");
view.setAddressDetail(addressService.getDetail(order.getAddressId()));
if (order.getCouponId() != null) {
view.setCouponName(couponService.getName(order.getCouponId()));
}
view.setItems(order.getItems().stream()
.map(item -> {
OrderItemView itemView = new OrderItemView();
itemView.setProductId(item.getProductId());
itemView.setProductName(productService.getName(item.getProductId()));
itemView.setQuantity(item.getQuantity());
itemView.setPrice(item.getPrice());
return itemView;
})
.collect(Collectors.toList()));
// 写入ES(查询模型)
esTemplate.save(view);
// 清除列表缓存
String listCacheKey = "order:list:" + order.getUserId();
redisTemplate.delete(listCacheKey);
log.info("读模型已更新: orderId={}", order.getOrderId());
}
// 监听订单支付完成事件
@EventListener
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onOrderPaid(OrderPaidEvent event) {
// 只更新变化的状态字段(增量更新)
UpdateRequest updateRequest = new UpdateRequest()
.set("status", "PAID")
.set("statusDisplay", "已支付")
.set("paidAt", event.getPaidAt());
esTemplate.update(updateRequest, OrderView.class, event.getOrderId());
// 清除详情缓存
String detailCacheKey = "order:detail:" + event.getOrderId();
redisTemplate.delete(detailCacheKey);
}
}
四、CQRS的核心挑战与应对
4.1 最终一致性
问题:写模型更新了,但读模型还没更新,用户刷新后看到旧数据。
应对策略:
| 策略 | 说明 | 适用场景 |
|---|---|---|
| 命令端返回最新状态 | 写入成功后,直接在响应中返回最新数据 | 简单场景,下单后展示订单详情 |
| 前端轮询+loading | 前端显示"处理中",定时刷新直到数据一致 | 秒杀、支付结果查询 |
| 事件驱动+WebSocket推送 | 读模型更新后,主动推送给前端 | 实时要求高的场景 |
| 乐观UI更新 | 提交后立即在前端显示新状态,后台异步纠正 | 社交互动(点赞、评论) |
我们的方案:命令执行成功后,对于关键操作(支付、退款),使用WebSocket主动推送状态变更;对于非关键操作(浏览记录),接受秒级延迟。
4.2 读写模型差异大如何保证数据正确
问题:读模型从多个数据源聚合数据,如何保证聚合逻辑的正确性?
应对策略:投影(Projection)模式——每个事件驱动的读模型更新,都应该是一个可重放的、幂等的函数。
// 读模型重建:当发现数据不一致时,可以全量重建
@Component
public class OrderViewRebuilder {
@Autowired
private OrderRepository orderRepository;
@Autowired
private ElasticsearchTemplate esTemplate;
/** 全量重建指定时间范围的读模型 */
@Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点
public void rebuildDailyViews() {
LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
LocalDateTime today = LocalDateTime.now();
List<Order> orders = orderRepository.findByCreatedAtBetween(yesterday, today);
for (Order order : orders) {
OrderView view = buildOrderView(order);
esTemplate.save(view);
}
log.info("读模型重建完成: 共{}条订单", orders.size());
}
}
4.3 事件顺序问题
如果订单先发生"创建"事件,再发生"支付"事件,但如果"支付"事件先到达投影器,就会导致状态错误。
解决方案:
// 利用数据库的乐观锁保证顺序
@Document(indexName = "order_view")
public class OrderView {
@Version
private Long version; // ES版本号,乐观锁
// 只有高版本才能覆盖低版本
}
// 投影器:只接受更高版本的事件
public void onOrderPaid(OrderPaidEvent event) {
// 检查当前读模型的版本
OrderView current = esTemplate.get(event.getOrderId(), OrderView.class);
if (current != null && current.getVersion() >= event.getVersion()) {
log.warn("忽略过期事件: orderId={}, currentVersion={}, eventVersion={}",
event.getOrderId(), current.getVersion(), event.getVersion());
return;
}
// 更新读模型...
}
五、CQRS配合Event Sourcing的进阶用法
当CQRS和Event Sourcing结合,可以构建出可审计、可回溯的终极架构:
// 事件存储(Event Store)
CREATE TABLE event_store (
event_id VARCHAR(64) PRIMARY KEY,
aggregate_id VARCHAR(64) NOT NULL,
aggregate_type VARCHAR(50) NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_data JSON NOT NULL,
version INT NOT NULL,
occurred_at TIMESTAMP NOT NULL,
INDEX idx_aggregate (aggregate_id, version)
);
// 命令处理器:不修改状态,只追加事件
@Service
public class OrderCommandHandler {
@Transactional
public void payOrder(PayOrderCommand command) {
// 1. 加载事件流
List<Event> events = eventStore.load(command.getOrderId());
// 2. 重放事件,重建聚合状态
Order order = Order.replay(events);
// 3. 执行业务操作,产生新事件
OrderPaidEvent paidEvent = order.pay(command.getPayMethod());
// 4. 追加事件
eventStore.append(command.getOrderId(), paidEvent, events.size() + 1);
// 5. 发布事件(异步更新读模型)
eventBus.publish(paidEvent);
}
}
六、技术选型建议
| 组件 | 推荐方案 | 备选方案 |
|---|---|---|
| 写数据库 | MySQL/PostgreSQL | MongoDB |
| 读数据库(搜索) | Elasticsearch | Solr |
| 读数据库(聚合) | ClickHouse | Druid、Doris |
| 缓存 | Redis Cluster | Caffeine(本地) |
| 事件总线 | RocketMQ/Kafka | RabbitMQ |
| 事件存储 | PostgreSQL/EventStoreDB | Axon Framework |
七、总结
CQRS不是银弹,它有明显的代价:系统复杂度翻倍,运维成本增加,最终一致性需要额外处理。
但如果你的系统面临以下情况,CQRS是值得投入的:
- 读写模型差异大:查询需要JOIN多张表,写入需要保持事务完整性
- 读写比例悬殊:大量复杂查询拖慢写入性能
- 团队足够成熟:能驾驭事件驱动、最终一致性、投影重建等概念
- 业务价值足够高:性能提升带来的业务收益 > 架构复杂度带来的成本
核心经验:
- 从最简单的CQRS开始:先分离读写Service,再分离数据模型,最后分离数据库
- 读模型可以有多个:搜索用ES、聚合用ClickHouse、列表用Redis,各司其职
- 不要追求即时一致性:接受秒级延迟,用"好体验"覆盖"小延迟"
- 建立读模型重建机制:比"保证永远正确"更重要的是"出错后能快速修复"
CQRS的本质是用空间换时间、用最终一致性换性能。当你接受这个权衡时,你离高可用的系统架构就不远了。
个人观点,仅供参考
更多推荐
所有评论(0)