【架构实战】CQRS命令查询职责分离:读写分离的进阶实践

一、背景:一个报表查询拖垮了整个交易系统

2020年双十一,我们交易系统出现了一次P0事故。

事情的起因很简单:运营想在活动看板上实时查看"各渠道的GMV和转化率"。后端查询需要JOIN五张表(订单表、订单明细表、渠道表、用户表、支付流水表),做了三个聚合计算(SUM、COUNT DISTINCT、GROUP BY)。

上午10点,运营打开了看板页面。30秒后,交易系统响应时间从50ms飙升到8秒。数据库CPU从30%直接打到100%。所有下单请求开始超时。

根本原因:查询和命令共享了同一套数据模型。交易系统用的主库既要处理高并发写入(下单),又要承受复杂的分析查询(报表)。这在传统架构里是无解的——你不可能对着一张既要快速插入又要复杂查询的表,同时优化写性能和读性能。

这就是CQRS要解决的问题。


二、CQRS核心原理

2.1 什么是CQRS

CQRS(Command Query Responsibility Segregation,命令查询职责分离)是Greg Young在2010年提出的一种架构模式。核心思想只有一句话:将系统的读操作和写操作分离到不同的模型

【传统架构】
                    ┌──────────┐
                    │   Controller  │
                    └──────┬───┘
                           │
                    ┌──────▼───┐
                    │  Service    │ ← 读写混合,一个模型同时处理
                    └──────┬───┘
                           │
              ┌────────────┼────────────┐
              │            │            │
        ┌─────▼─────┐ ┌────▼────┐ ┌─────▼─────┐
        │  查询1     │ │  写入   │ │  查询2     │
        │ (报表)     │ │ (下单)  │ │ (详情)     │
        └───────────┘ └─────────┘ └───────────┘

【CQRS架构】
                    ┌──────────┐
                    │  应用层    │
                    └──────┬───┘
                           │
              ┌────────────┼────────────┐
              │                         │
        ┌─────▼─────┐              ┌────▼────┐
        │  命令模型  │              │ 查询模型  │
        │(Command) │              │(Query)  │
        │  写优化    │────事件─────▶│  读优化    │
        └─────┬─────┘              └────┬────┘
              │                         │
        ┌─────▼─────┐              ┌────▼────┐
        │  写库      │              │  读库     │
        │ MySQL(主)  │              │ ES/Redis  │
        └───────────┘              └──────────┘

2.2 CQRS vs 传统读写分离

很多人以为CQRS就是数据库主从读写分离。这是一个常见的误解。

维度 数据库读写分离 CQRS
分离层级 数据库层 应用层(模型层)
数据模型 同一个表结构 不同的数据模型(可以不同表、甚至不同数据库)
一致性 主从复制延迟(秒级) 事件异步同步(毫秒到秒级)
优化方向 主库优化写入,从库分担读压力 写模型优化业务完整性,读模型优化查询性能
适用场景 读多写少 读写模型差异大、查询复杂

一句话总结:CQRS是模型级别的读写分离,传统方案是数据级别的读写分离。

2.3 何时该用CQRS

不是所有系统都需要CQRS。判断标准:

需要CQRS的标志:
✅ 查询需要的数据结构与写入的数据结构差异很大
✅ 查询需要JOIN多张表,写入只需要单表
✅ 查询量远大于写入量(100:1以上)
✅ 查询需要跨多个限界上下文聚合数据

不需要CQRS的标志:
❌ 读写模型几乎一致
❌ 业务逻辑简单,CRUD即可
❌ 团队规模小,引入CQRS增加维护成本
❌ 查询和写入量都比较低

三、实战:交易系统CQRS改造

3.1 命令模型(写模型)

// ===== 命令端:领域模型,保持业务完整性 =====

// 命令对象
@Data
public class CreateOrderCommand {
    @NotBlank
    private String userId;
    
    @NotEmpty
    private List<OrderItemCommand> items;
    
    private String couponId;
    private String addressId;
}

// 命令处理器
@Service
public class CreateOrderCommandHandler {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private InventoryService inventoryService;
    
    @Autowired
    private EventBus eventBus;
    
    @Transactional
    public OrderId handle(CreateOrderCommand command) {
        // 1. 创建订单聚合(充血模型,包含业务规则)
        Order order = Order.create(command);
        
        // 2. 扣减库存
        command.getItems().forEach(item -> 
            inventoryService.deduct(item.getProductId(), item.getQuantity()));
        
        // 3. 持久化
        orderRepository.save(order);
        
        // 4. 发布领域事件 → 驱动读模型更新
        eventBus.publish(new OrderCreatedEvent(order));
        
        return order.getId();
    }
}

// 订单聚合(写模型:关注业务完整性)
@Entity
@Table(name = "t_order")
public class Order {
    @Id
    private String orderId;
    private String userId;
    private BigDecimal totalAmount;
    private String status;       // CREATED → PAID → SHIPPED → COMPLETED
    private String addressId;
    private String couponId;
    private LocalDateTime createdAt;
    
    // 订单明细(一对多)
    @OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY)
    @JoinColumn(name = "order_id")
    private List<OrderItem> items;
    
    // ===== 业务方法 =====
    
    public static Order create(CreateOrderCommand command) {
        Order order = new Order();
        order.orderId = OrderIdGenerator.generate();
        order.userId = command.getUserId();
        order.status = "CREATED";
        order.createdAt = LocalDateTime.now();
        
        // 计算订单金额
        order.items = command.getItems().stream()
            .map(item -> OrderItem.create(order.orderId, item))
            .collect(Collectors.toList());
        
        order.totalAmount = order.items.stream()
            .map(OrderItem::getSubTotal)
            .reduce(BigDecimal.ZERO, BigDecimal::add);
        
        return order;
    }
    
    public void pay(String payMethod) {
        if (!"CREATED".equals(this.status)) {
            throw new OrderException("只能支付创建状态的订单");
        }
        this.status = "PAID";
    }
}

3.2 查询模型(读模型)

// ===== 查询端:扁平化、宽表、NoSQL,优化查询性能 =====

// 读模型DTO:一张宽表,包含所有展示需要的数据
@Document(indexName = "order_view")
@Data
public class OrderView {
    @Id
    private String orderId;
    private String userId;
    private String userName;           // 冗余用户名
    private String userPhone;          // 冗余用户手机
    
    private BigDecimal totalAmount;
    private String status;
    private String statusDisplay;      // 状态中文名:已创建/已支付/已发货
    
    private String addressDetail;      // 冗余地址详情
    private String couponName;         // 冗余优惠券名称
    
    private List<OrderItemView> items; // 嵌套对象
    private LocalDateTime createdAt;
    private LocalDateTime paidAt;
}

// 查询处理器
@Service
public class OrderQueryHandler {
    
    @Autowired
    private ElasticsearchTemplate esTemplate;
    
    @Autowired
    private RedisTemplate<String, OrderView> redisTemplate;
    
    public PageResult<OrderListDTO> listOrders(ListOrdersQuery query) {
        // 构建ES查询
        NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
            .withQuery(QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("userId", query.getUserId()))
                .must(QueryBuilders.rangeQuery("createdAt")
                    .gte(query.getStartTime())
                    .lte(query.getEndTime())))
            .withSort(SortBuilders.fieldSort("createdAt").order(SortOrder.DESC))
            .withPageable(PageRequest.of(query.getPage(), query.getSize()))
            .build();
        
        // 从ES查询(毫秒级响应)
        return esTemplate.search(searchQuery, OrderView.class);
    }
    
    public OrderDetailDTO getOrderDetail(String orderId) {
        // 先查缓存
        String cacheKey = "order:detail:" + orderId;
        OrderView cached = redisTemplate.opsForValue().get(cacheKey);
        if (cached != null) {
            return OrderDetailDTO.from(cached);
        }
        
        // 缓存未命中,查ES
        OrderView view = esTemplate.get(orderId, OrderView.class);
        if (view != null) {
            redisTemplate.opsForValue().set(cacheKey, view, 5, TimeUnit.MINUTES);
        }
        
        return OrderDetailDTO.from(view);
    }
    
    // 运营看板:实时聚合查询(ClickHouse)
    public DashboardDTO getDashboard() {
        // 从ClickHouse查询聚合数据
        String sql = """
            SELECT 
                channel_id,
                COUNT(DISTINCT user_id) as uv,
                COUNT(*) as order_cnt,
                SUM(amount) as gmv,
                SUM(amount) / COUNT(*) as avg_order_amount
            FROM order_wide_table
            WHERE created_at >= today()
            GROUP BY channel_id
        """;
        return clickhouseTemplate.query(sql, DashboardDTO.class);
    }
}

3.3 事件处理器:同步读写模型

// ===== 写模型变更 → 发布事件 → 读模型更新 =====

@Component
public class OrderEventProjector {
    
    @Autowired
    private ElasticsearchTemplate esTemplate;
    
    @Autowired
    private RedisTemplate<String, OrderView> redisTemplate;
    
    // 监听订单创建事件
    @EventListener
    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onOrderCreated(OrderCreatedEvent event) {
        Order order = event.getOrder();
        
        // 构建读模型(聚合多源数据)
        OrderView view = new OrderView();
        view.setOrderId(order.getOrderId());
        view.setUserId(order.getUserId());
        view.setUserName(userService.getName(order.getUserId()));  // 冗余用户名
        view.setTotalAmount(order.getTotalAmount());
        view.setStatus("CREATED");
        view.setStatusDisplay("已创建");
        view.setAddressDetail(addressService.getDetail(order.getAddressId()));
        
        if (order.getCouponId() != null) {
            view.setCouponName(couponService.getName(order.getCouponId()));
        }
        
        view.setItems(order.getItems().stream()
            .map(item -> {
                OrderItemView itemView = new OrderItemView();
                itemView.setProductId(item.getProductId());
                itemView.setProductName(productService.getName(item.getProductId()));
                itemView.setQuantity(item.getQuantity());
                itemView.setPrice(item.getPrice());
                return itemView;
            })
            .collect(Collectors.toList()));
        
        // 写入ES(查询模型)
        esTemplate.save(view);
        
        // 清除列表缓存
        String listCacheKey = "order:list:" + order.getUserId();
        redisTemplate.delete(listCacheKey);
        
        log.info("读模型已更新: orderId={}", order.getOrderId());
    }
    
    // 监听订单支付完成事件
    @EventListener
    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onOrderPaid(OrderPaidEvent event) {
        // 只更新变化的状态字段(增量更新)
        UpdateRequest updateRequest = new UpdateRequest()
            .set("status", "PAID")
            .set("statusDisplay", "已支付")
            .set("paidAt", event.getPaidAt());
        
        esTemplate.update(updateRequest, OrderView.class, event.getOrderId());
        
        // 清除详情缓存
        String detailCacheKey = "order:detail:" + event.getOrderId();
        redisTemplate.delete(detailCacheKey);
    }
}

四、CQRS的核心挑战与应对

4.1 最终一致性

问题:写模型更新了,但读模型还没更新,用户刷新后看到旧数据。

应对策略

策略 说明 适用场景
命令端返回最新状态 写入成功后,直接在响应中返回最新数据 简单场景,下单后展示订单详情
前端轮询+loading 前端显示"处理中",定时刷新直到数据一致 秒杀、支付结果查询
事件驱动+WebSocket推送 读模型更新后,主动推送给前端 实时要求高的场景
乐观UI更新 提交后立即在前端显示新状态,后台异步纠正 社交互动(点赞、评论)

我们的方案:命令执行成功后,对于关键操作(支付、退款),使用WebSocket主动推送状态变更;对于非关键操作(浏览记录),接受秒级延迟。

4.2 读写模型差异大如何保证数据正确

问题:读模型从多个数据源聚合数据,如何保证聚合逻辑的正确性?

应对策略投影(Projection)模式——每个事件驱动的读模型更新,都应该是一个可重放的、幂等的函数。

// 读模型重建:当发现数据不一致时,可以全量重建
@Component
public class OrderViewRebuilder {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private ElasticsearchTemplate esTemplate;
    
    /** 全量重建指定时间范围的读模型 */
    @Scheduled(cron = "0 0 3 * * ?")  // 每天凌晨3点
    public void rebuildDailyViews() {
        LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
        LocalDateTime today = LocalDateTime.now();
        
        List<Order> orders = orderRepository.findByCreatedAtBetween(yesterday, today);
        
        for (Order order : orders) {
            OrderView view = buildOrderView(order);
            esTemplate.save(view);
        }
        
        log.info("读模型重建完成: 共{}条订单", orders.size());
    }
}

4.3 事件顺序问题

如果订单先发生"创建"事件,再发生"支付"事件,但如果"支付"事件先到达投影器,就会导致状态错误。

解决方案

// 利用数据库的乐观锁保证顺序
@Document(indexName = "order_view")
public class OrderView {
    @Version
    private Long version;  // ES版本号,乐观锁
    
    // 只有高版本才能覆盖低版本
}

// 投影器:只接受更高版本的事件
public void onOrderPaid(OrderPaidEvent event) {
    // 检查当前读模型的版本
    OrderView current = esTemplate.get(event.getOrderId(), OrderView.class);
    
    if (current != null && current.getVersion() >= event.getVersion()) {
        log.warn("忽略过期事件: orderId={}, currentVersion={}, eventVersion={}", 
            event.getOrderId(), current.getVersion(), event.getVersion());
        return;
    }
    
    // 更新读模型...
}

五、CQRS配合Event Sourcing的进阶用法

当CQRS和Event Sourcing结合,可以构建出可审计、可回溯的终极架构:

// 事件存储(Event Store)
CREATE TABLE event_store (
    event_id VARCHAR(64) PRIMARY KEY,
    aggregate_id VARCHAR(64) NOT NULL,
    aggregate_type VARCHAR(50) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    event_data JSON NOT NULL,
    version INT NOT NULL,
    occurred_at TIMESTAMP NOT NULL,
    INDEX idx_aggregate (aggregate_id, version)
);

// 命令处理器:不修改状态,只追加事件
@Service
public class OrderCommandHandler {
    
    @Transactional
    public void payOrder(PayOrderCommand command) {
        // 1. 加载事件流
        List<Event> events = eventStore.load(command.getOrderId());
        
        // 2. 重放事件,重建聚合状态
        Order order = Order.replay(events);
        
        // 3. 执行业务操作,产生新事件
        OrderPaidEvent paidEvent = order.pay(command.getPayMethod());
        
        // 4. 追加事件
        eventStore.append(command.getOrderId(), paidEvent, events.size() + 1);
        
        // 5. 发布事件(异步更新读模型)
        eventBus.publish(paidEvent);
    }
}

六、技术选型建议

组件 推荐方案 备选方案
写数据库 MySQL/PostgreSQL MongoDB
读数据库(搜索) Elasticsearch Solr
读数据库(聚合) ClickHouse Druid、Doris
缓存 Redis Cluster Caffeine(本地)
事件总线 RocketMQ/Kafka RabbitMQ
事件存储 PostgreSQL/EventStoreDB Axon Framework

七、总结

CQRS不是银弹,它有明显的代价:系统复杂度翻倍,运维成本增加,最终一致性需要额外处理

但如果你的系统面临以下情况,CQRS是值得投入的:

  1. 读写模型差异大:查询需要JOIN多张表,写入需要保持事务完整性
  2. 读写比例悬殊:大量复杂查询拖慢写入性能
  3. 团队足够成熟:能驾驭事件驱动、最终一致性、投影重建等概念
  4. 业务价值足够高:性能提升带来的业务收益 > 架构复杂度带来的成本

核心经验

  • 从最简单的CQRS开始:先分离读写Service,再分离数据模型,最后分离数据库
  • 读模型可以有多个:搜索用ES、聚合用ClickHouse、列表用Redis,各司其职
  • 不要追求即时一致性:接受秒级延迟,用"好体验"覆盖"小延迟"
  • 建立读模型重建机制:比"保证永远正确"更重要的是"出错后能快速修复"

CQRS的本质是用空间换时间、用最终一致性换性能。当你接受这个权衡时,你离高可用的系统架构就不远了。


个人观点,仅供参考

Logo

智能硬件社区聚焦AI智能硬件技术生态,汇聚嵌入式AI、物联网硬件开发者,打造交流分享平台,同步全国赛事资讯、开展 OPC 核心人才招募,助力技术落地与开发者成长。

更多推荐