土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】CQRS + Event Sourcing 完整实战:从领域建模到部署

文章导航

分类入口
architecture
标签入口
#CQRS#event-sourcing#event-store#projection#upcasting#EventStoreDB

目录

某金融交易平台在引入事件溯源(Event Sourcing)后,获得了完整的审计日志和时间旅行能力。但三个月后,团队发现一些事件流已经积累了超过 10 万条事件,聚合加载时间从毫秒级退化到秒级。更麻烦的是,业务迭代中修改了事件结构,旧版本事件无法反序列化。这些问题不是事件溯源本身的缺陷,而是工程实践上的坑——教科书通常不会告诉你这些。

命令查询职责分离(Command Query Responsibility Segregation,CQRS)和事件溯源(Event Sourcing,ES)是两个独立但高度互补的模式。CQRS 将读写模型分离,ES 用事件序列作为唯一事实来源。本文给出一条完整的实现路径:从事件存储的 schema 设计到投影重建、从快照策略到事件版本化,覆盖真实项目中的关键工程决策。

上一篇:防腐层与开放主机服务 | 下一篇:DDD 与微服务


一、CQRS 基础回顾

1.1 核心思想

传统的 CRUD 架构中,同一个模型同时负责读和写。CQRS 将这两个职责拆分为独立的模型:

graph LR
    Client["客户端"] -->|"命令"| CMD["命令处理器<br/>Command Handler"]
    Client -->|"查询"| QRY["查询处理器<br/>Query Handler"]

    CMD --> WM["写模型<br/>Write Model"]
    WM --> ES["事件存储<br/>Event Store"]
    ES -->|"事件发布"| PROJ["投影器<br/>Projector"]
    PROJ --> RM["读模型<br/>Read Model"]
    QRY --> RM

    style WM fill:#e1f5fe
    style RM fill:#f3e5f5

1.2 CQRS 的三种层次

层次 描述 复杂度
单数据库 CQRS 读写模型共享数据库,通过不同的查询视图分离
双数据库 CQRS 读写使用不同的数据库,通过事件同步
CQRS + ES 写侧使用事件存储,读侧使用投影视图

本文聚焦第三种——CQRS + Event Sourcing 的完整实现。


二、Event Sourcing 核心概念

2.1 传统存储 vs 事件存储

传统存储保存”当前状态”,事件存储保存”状态变化的历史”。

传统存储:
┌──────────────────────────────────┐
│ accounts 表                      │
│ id=A001, balance=1000, status=OK │
└──────────────────────────────────┘
  → 只知道当前余额是 1000,不知道怎么到的 1000

事件存储:
┌──────────────────────────────────┐
│ events 表                        │
│ 1. AccountOpened(A001, 0)        │
│ 2. MoneyDeposited(A001, 500)     │
│ 3. MoneyDeposited(A001, 800)     │
│ 4. MoneyWithdrawn(A001, 300)     │
└──────────────────────────────────┘
  → 完整的历史:0 → 500 → 1300 → 1000

2.2 事件溯源的核心公式

当前状态 = 初始状态 + fold(所有事件)
// 从事件流重建聚合状态
public class BankAccount {
    private AccountId id;
    private Money balance;
    private AccountStatus status;

    // 从事件流重建
    public static BankAccount reconstitute(List<DomainEvent> events) {
        BankAccount account = new BankAccount();
        for (DomainEvent event : events) {
            account.apply(event);
        }
        return account;
    }

    private void apply(DomainEvent event) {
        if (event instanceof AccountOpenedEvent e) {
            this.id = e.accountId();
            this.balance = Money.ZERO;
            this.status = AccountStatus.ACTIVE;
        } else if (event instanceof MoneyDepositedEvent e) {
            this.balance = this.balance.add(e.amount());
        } else if (event instanceof MoneyWithdrawnEvent e) {
            this.balance = this.balance.subtract(e.amount());
        } else if (event instanceof AccountClosedEvent e) {
            this.status = AccountStatus.CLOSED;
        }
    }
}

2.3 事件溯源的优势与代价

优势 说明
完整审计日志 每一次状态变化都有记录
时间旅行 可以重建任意历史时刻的状态
事件驱动集成 事件天然可作为集成消息
无数据丢失 不会因为覆盖而丢失信息
灵活的读模型 可以从事件流构建任意形态的读模型
代价 说明
学习曲线 团队需要适应事件思维
查询复杂 不能直接 SQL 查询当前状态
事件版本化 事件结构变更需要迁移策略
最终一致性 读模型存在延迟
事件流增长 长事件流导致加载变慢

三、事件存储的 Schema 设计

3.1 核心表结构

-- 事件存储的核心表
CREATE TABLE event_store (
    -- 全局唯一的事件 ID
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- 事件流标识(通常是聚合类型+聚合ID)
    stream_name VARCHAR(500) NOT NULL,

    -- 事件在流中的版本号(从 1 开始,单调递增)
    stream_version BIGINT NOT NULL,

    -- 事件类型(用于反序列化路由)
    event_type VARCHAR(200) NOT NULL,

    -- 事件负载(JSON 格式)
    payload JSONB NOT NULL,

    -- 事件元数据(因果关系、用户信息等)
    metadata JSONB NOT NULL DEFAULT '{}',

    -- 事件发生时间
    occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- 事件写入时间(服务器时间,用于排序)
    recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- 保证同一流内版本号唯一(乐观锁)
    UNIQUE (stream_name, stream_version)
);

-- 按流名称和版本号建索引,加速聚合加载
CREATE INDEX idx_event_store_stream
    ON event_store (stream_name, stream_version);

-- 按事件类型建索引,支持按类型订阅
CREATE INDEX idx_event_store_type
    ON event_store (event_type, recorded_at);

-- 按时间建索引,支持全局事件流订阅
CREATE INDEX idx_event_store_recorded
    ON event_store (recorded_at);

3.2 Stream 命名规范

格式:{聚合类型}-{聚合ID}

示例:
  BankAccount-acc_001       → 银行账户 acc_001 的事件流
  Order-ord_20250413_001    → 订单的事件流
  Inventory-prod_SKU123     → 库存的事件流

3.3 事件负载示例

{
  "event_id": "e7a1b2c3-d4e5-6789-abcd-ef0123456789",
  "stream_name": "BankAccount-acc_001",
  "stream_version": 3,
  "event_type": "MoneyDeposited",
  "payload": {
    "account_id": "acc_001",
    "amount": "500.00",
    "currency": "CNY",
    "description": "工资入账"
  },
  "metadata": {
    "user_id": "user_123",
    "correlation_id": "txn_456",
    "causation_id": "cmd_789",
    "ip_address": "192.168.1.100"
  },
  "occurred_at": "2025-04-13T10:30:00Z",
  "recorded_at": "2025-04-13T10:30:00.123Z"
}

3.4 Go 实现:事件存储

type EventStore struct {
    db *sql.DB
}

type StoredEvent struct {
    EventID       string
    StreamName    string
    StreamVersion int64
    EventType     string
    Payload       json.RawMessage
    Metadata      json.RawMessage
    OccurredAt    time.Time
    RecordedAt    time.Time
}

// 追加事件到事件流(带乐观锁)
func (s *EventStore) Append(ctx context.Context, streamName string,
    expectedVersion int64, events []StoredEvent) error {

    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("开始事务失败: %w", err)
    }
    defer tx.Rollback()

    // 验证当前版本号(乐观锁)
    var currentVersion int64
    err = tx.QueryRowContext(ctx,
        `SELECT COALESCE(MAX(stream_version), 0)
         FROM event_store WHERE stream_name = $1`,
        streamName).Scan(&currentVersion)
    if err != nil {
        return fmt.Errorf("查询当前版本失败: %w", err)
    }

    if currentVersion != expectedVersion {
        return fmt.Errorf(
            "并发冲突: 期望版本 %d, 当前版本 %d",
            expectedVersion, currentVersion)
    }

    // 插入事件
    stmt, err := tx.PrepareContext(ctx,
        `INSERT INTO event_store
         (event_id, stream_name, stream_version, event_type,
          payload, metadata, occurred_at)
         VALUES ($1, $2, $3, $4, $5, $6, $7)`)
    if err != nil {
        return fmt.Errorf("准备语句失败: %w", err)
    }
    defer stmt.Close()

    for i, event := range events {
        version := expectedVersion + int64(i) + 1
        _, err = stmt.ExecContext(ctx,
            event.EventID, streamName, version,
            event.EventType, event.Payload,
            event.Metadata, event.OccurredAt)
        if err != nil {
            return fmt.Errorf("插入事件失败: %w", err)
        }
    }

    return tx.Commit()
}

// 加载事件流
func (s *EventStore) LoadStream(ctx context.Context,
    streamName string) ([]StoredEvent, error) {

    rows, err := s.db.QueryContext(ctx,
        `SELECT event_id, stream_name, stream_version, event_type,
                payload, metadata, occurred_at, recorded_at
         FROM event_store
         WHERE stream_name = $1
         ORDER BY stream_version ASC`,
        streamName)
    if err != nil {
        return nil, fmt.Errorf("查询事件流失败: %w", err)
    }
    defer rows.Close()

    var events []StoredEvent
    for rows.Next() {
        var e StoredEvent
        if err := rows.Scan(&e.EventID, &e.StreamName,
            &e.StreamVersion, &e.EventType,
            &e.Payload, &e.Metadata,
            &e.OccurredAt, &e.RecordedAt); err != nil {
            return nil, fmt.Errorf("扫描事件失败: %w", err)
        }
        events = append(events, e)
    }
    return events, nil
}

四、投影与读模型

4.1 投影的概念

投影(Projection)是从事件流构建读模型的过程。投影器(Projector)订阅事件流,将每个事件转换为读模型的更新操作。

graph LR
    ES["事件存储"] -->|"事件流"| P1["投影器 A<br/>订单列表"]
    ES -->|"事件流"| P2["投影器 B<br/>统计报表"]
    ES -->|"事件流"| P3["投影器 C<br/>搜索索引"]

    P1 --> RM1["PostgreSQL<br/>订单列表表"]
    P2 --> RM2["ClickHouse<br/>统计表"]
    P3 --> RM3["Elasticsearch<br/>搜索索引"]

4.2 同步投影 vs 异步投影

维度 同步投影 异步投影
实现方式 在写入事件的同一事务中更新读模型 通过事件订阅异步更新读模型
一致性 强一致 最终一致
延迟 无延迟 有延迟(通常毫秒到秒级)
写入性能 较差(事务更大) 较好(写入和投影解耦)
故障影响 投影失败会导致写入失败 投影失败不影响写入
适用场景 对一致性要求极高的场景 大多数场景

4.3 投影器实现

// 异步投影器
type OrderListProjector struct {
    db        *sql.DB
    store     *EventStore
    lastPos   int64 // 上次处理的位置
}

func (p *OrderListProjector) Run(ctx context.Context) error {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := p.processNewEvents(ctx); err != nil {
                log.Printf("投影器处理失败: %v", err)
            }
        }
    }
}

func (p *OrderListProjector) processNewEvents(ctx context.Context) error {
    events, err := p.store.LoadSince(ctx, p.lastPos, 100)
    if err != nil {
        return err
    }

    for _, event := range events {
        if err := p.handleEvent(ctx, event); err != nil {
            return fmt.Errorf("处理事件 %s 失败: %w",
                event.EventID, err)
        }
        p.lastPos = event.RecordedAt.UnixNano()
    }
    return nil
}

func (p *OrderListProjector) handleEvent(ctx context.Context,
    event StoredEvent) error {

    switch event.EventType {
    case "OrderPlaced":
        var payload OrderPlacedPayload
        if err := json.Unmarshal(event.Payload, &payload); err != nil {
            return err
        }
        _, err := p.db.ExecContext(ctx,
            `INSERT INTO order_list_view
             (order_id, customer_id, total_amount, status, created_at)
             VALUES ($1, $2, $3, $4, $5)`,
            payload.OrderID, payload.CustomerID,
            payload.TotalAmount, "待支付", event.OccurredAt)
        return err

    case "OrderConfirmed":
        var payload OrderConfirmedPayload
        if err := json.Unmarshal(event.Payload, &payload); err != nil {
            return err
        }
        _, err := p.db.ExecContext(ctx,
            `UPDATE order_list_view SET status = $1
             WHERE order_id = $2`,
            "已确认", payload.OrderID)
        return err

    default:
        return nil // 忽略不关心的事件
    }
}

4.4 投影重建

投影重建(Replay)是事件溯源的核心优势之一——当读模型出错或需要新的读模型时,可以从事件流重新构建。

func (p *OrderListProjector) Rebuild(ctx context.Context) error {
    // 1. 清空现有读模型
    _, err := p.db.ExecContext(ctx, "TRUNCATE TABLE order_list_view")
    if err != nil {
        return fmt.Errorf("清空读模型失败: %w", err)
    }

    // 2. 重置位置
    p.lastPos = 0

    // 3. 重新处理所有事件
    for {
        events, err := p.store.LoadSince(ctx, p.lastPos, 1000)
        if err != nil {
            return err
        }
        if len(events) == 0 {
            break
        }
        for _, event := range events {
            if err := p.handleEvent(ctx, event); err != nil {
                return err
            }
            p.lastPos = event.RecordedAt.UnixNano()
        }
    }

    log.Println("投影重建完成")
    return nil
}

五、快照策略

5.1 问题

当事件流很长时(例如一个活跃账户可能有数万条事件),每次加载聚合都要回放所有事件,性能不可接受。

5.2 快照的原理

定期保存聚合的当前状态快照,加载时从最近的快照开始,只回放快照之后的事件。

无快照:加载 10000 个事件
  Event 1 → Event 2 → ... → Event 10000
  加载时间:200ms

有快照(每 100 个事件一个快照):
  Snapshot@9900 → Event 9901 → ... → Event 10000
  加载时间:5ms

5.3 快照表设计

CREATE TABLE snapshots (
    stream_name VARCHAR(500) NOT NULL,
    stream_version BIGINT NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (stream_name, stream_version)
);

-- 每个流只保留最新的快照(可选)
CREATE INDEX idx_snapshots_latest
    ON snapshots (stream_name, stream_version DESC);

5.4 带快照的聚合加载

func (r *EventSourcedRepository) Load(ctx context.Context,
    streamName string) (*BankAccount, error) {

    // 1. 尝试加载最新快照
    snapshot, snapshotVersion, err := r.loadLatestSnapshot(ctx, streamName)
    if err != nil && !errors.Is(err, ErrSnapshotNotFound) {
        return nil, err
    }

    // 2. 从快照版本之后加载事件
    fromVersion := int64(0)
    if snapshot != nil {
        fromVersion = snapshotVersion
    }

    events, err := r.store.LoadStreamFrom(ctx, streamName, fromVersion)
    if err != nil {
        return nil, err
    }

    // 3. 重建聚合
    var account *BankAccount
    if snapshot != nil {
        account = snapshot // 从快照开始
    } else {
        account = &BankAccount{} // 从空白开始
    }

    for _, event := range events {
        account.apply(event)
    }

    return account, nil
}

func (r *EventSourcedRepository) Save(ctx context.Context,
    account *BankAccount) error {

    // 保存事件
    err := r.store.Append(ctx, account.streamName(),
        account.version(), account.uncommittedEvents())
    if err != nil {
        return err
    }

    // 检查是否需要创建快照
    if account.version() > 0 && account.version()%100 == 0 {
        if err := r.saveSnapshot(ctx, account); err != nil {
            // 快照保存失败不应阻塞主流程
            log.Printf("快照保存失败: %v", err)
        }
    }

    account.markCommitted()
    return nil
}

5.5 快照频率的权衡

快照频率 加载速度 存储成本 写入开销
每 50 个事件
每 100 个事件
每 500 个事件
从不快照 最慢

推荐策略:从每 100 个事件一个快照开始,根据实际性能数据调整。


六、事件版本化(Upcasting)

6.1 问题

业务迭代中,事件的结构会变化。例如:

V1: MoneyDeposited { accountId, amount }
V2: MoneyDeposited { accountId, amount, currency }     ← 新增字段
V3: MoneyDeposited { accountId, amount, currency, source } ← 新增字段

已存储的 V1 事件无法直接反序列化为 V3 结构。

6.2 Upcasting 方案

Upcasting 是在加载事件时,将旧版本事件”升级”为最新版本的过程。

public interface EventUpcaster {
    boolean canUpcast(String eventType, int fromVersion);
    JsonNode upcast(JsonNode oldPayload, int fromVersion);
}

public class MoneyDepositedUpcaster implements EventUpcaster {

    @Override
    public boolean canUpcast(String eventType, int fromVersion) {
        return "MoneyDeposited".equals(eventType) && fromVersion < 3;
    }

    @Override
    public JsonNode upcast(JsonNode payload, int fromVersion) {
        ObjectNode node = (ObjectNode) payload.deepCopy();

        if (fromVersion < 2) {
            // V1 → V2: 添加默认币种
            node.put("currency", "CNY");
        }
        if (fromVersion < 3) {
            // V2 → V3: 添加默认来源
            node.put("source", "UNKNOWN");
        }

        return node;
    }
}

6.3 Go 实现

type Upcaster func(payload json.RawMessage, fromVersion int) (json.RawMessage, error)

var upcasters = map[string]Upcaster{
    "MoneyDeposited": upcastMoneyDeposited,
}

func upcastMoneyDeposited(payload json.RawMessage, fromVersion int) (json.RawMessage, error) {
    var data map[string]interface{}
    if err := json.Unmarshal(payload, &data); err != nil {
        return nil, err
    }

    if fromVersion < 2 {
        if _, ok := data["currency"]; !ok {
            data["currency"] = "CNY"
        }
    }
    if fromVersion < 3 {
        if _, ok := data["source"]; !ok {
            data["source"] = "UNKNOWN"
        }
    }

    return json.Marshal(data)
}

// 在加载事件时应用 upcasting
func loadAndUpcast(stored StoredEvent) (StoredEvent, error) {
    upcaster, exists := upcasters[stored.EventType]
    if !exists {
        return stored, nil
    }

    // 从元数据获取事件版本
    version := extractSchemaVersion(stored.Metadata)
    if version < currentSchemaVersion(stored.EventType) {
        newPayload, err := upcaster(stored.Payload, version)
        if err != nil {
            return stored, fmt.Errorf("upcasting 失败: %w", err)
        }
        stored.Payload = newPayload
    }
    return stored, nil
}

6.4 事件版本化的策略对比

策略 实现方式 优点 缺点
Upcasting 加载时升级旧事件 不修改历史数据 upcaster 链会变长
Copy-and-Transform 批量迁移旧事件为新格式 简单直接 修改了历史数据
Weak Schema 使用宽松的反序列化 无需迁移 类型安全性差
多版本支持 每个版本独立的反序列化器 精确控制 代码膨胀

推荐使用 Upcasting 作为主要策略,它在不修改历史数据的前提下保持了系统的演进能力。


七、最终一致性的处理

7.1 问题

CQRS + ES 架构中,写入事件和读模型更新之间存在延迟。用户提交一个操作后,立刻查询可能看不到最新状态。

7.2 应对策略

策略 1:写后读一致性(Read-your-writes)
  用户写入后,从写模型直接返回结果,不经过读模型

策略 2:轮询等待
  客户端提交后轮询读模型,直到看到最新状态

策略 3:版本号校验
  客户端携带期望版本号,读模型低于该版本时返回 HTTP 409

策略 4:Server-Sent Events / WebSocket
  服务端主动推送读模型更新通知
// 策略 1:写后读一致性
func (h *PlaceOrderHandler) Handle(ctx context.Context,
    cmd PlaceOrderCommand) (*OrderResponse, error) {

    // 写入
    order, err := h.orderService.PlaceOrder(ctx, cmd)
    if err != nil {
        return nil, err
    }

    // 直接从写模型构建响应,不查读模型
    return &OrderResponse{
        OrderID:   order.ID().String(),
        Status:    order.Status().String(),
        Total:     order.Total().String(),
        CreatedAt: order.CreatedAt(),
    }, nil
}

// 策略 3:版本号校验
func (h *GetOrderHandler) Handle(ctx context.Context,
    query GetOrderQuery) (*OrderListView, error) {

    view, err := h.readStore.FindByID(ctx, query.OrderID)
    if err != nil {
        return nil, err
    }

    // 如果客户端指定了最小版本号
    if query.MinVersion > 0 && view.Version < query.MinVersion {
        return nil, ErrStaleData // 客户端可以重试
    }

    return view, nil
}

八、测试事件溯源系统

8.1 Given-When-Then 模式

事件溯源系统天然适合 Given-When-Then 风格的测试:

@Test
void shouldWithdrawMoney() {
    // Given: 账户有 1000 元余额
    given(
        new AccountOpenedEvent("acc_001"),
        new MoneyDepositedEvent("acc_001", money(1000, "CNY"))
    )
    // When: 取款 300 元
    .when(new WithdrawMoneyCommand("acc_001", money(300, "CNY")))
    // Then: 产生取款事件
    .then(
        new MoneyWithdrawnEvent("acc_001", money(300, "CNY"))
    );
}

@Test
void shouldRejectWithdrawalWhenInsufficientBalance() {
    given(
        new AccountOpenedEvent("acc_001"),
        new MoneyDepositedEvent("acc_001", money(100, "CNY"))
    )
    .when(new WithdrawMoneyCommand("acc_001", money(200, "CNY")))
    .thenThrows(InsufficientBalanceException.class);
}

8.2 投影器测试

func TestOrderListProjector_OrderPlaced(t *testing.T) {
    db := setupTestDB(t)
    projector := NewOrderListProjector(db)

    event := StoredEvent{
        EventType: "OrderPlaced",
        Payload: mustJSON(map[string]interface{}{
            "order_id":     "ord_001",
            "customer_id":  "cust_001",
            "total_amount": "99.99",
        }),
        OccurredAt: time.Now(),
    }

    err := projector.handleEvent(context.Background(), event)
    require.NoError(t, err)

    // 验证读模型
    view, err := queryOrderView(db, "ord_001")
    require.NoError(t, err)
    assert.Equal(t, "待支付", view.Status)
    assert.Equal(t, "99.99", view.TotalAmount)
}

九、工具与框架

工具 语言/平台 特点
EventStoreDB 跨平台 专为事件溯源设计的数据库,支持投影和订阅
Axon Framework Java 完整的 CQRS+ES 框架,集成 Spring
Eventuate Java 微服务事件溯源框架
Marten .NET PostgreSQL 上的事件存储和文档数据库
EventSauce PHP 轻量级事件溯源库
自建 任意语言 基于 PostgreSQL/MySQL + 消息队列

9.1 EventStoreDB 的简单使用

// 使用 EventStoreDB 的 Go 客户端
import "github.com/EventStore/EventStore-Client-Go/v4/esdb"

func appendEvents(client *esdb.Client, streamName string,
    events []esdb.EventData) error {

    _, err := client.AppendToStream(
        context.Background(),
        streamName,
        esdb.AppendToStreamOptions{},
        events...,
    )
    return err
}

func readStream(client *esdb.Client,
    streamName string) ([]*esdb.ResolvedEvent, error) {

    stream, err := client.ReadStream(
        context.Background(),
        streamName,
        esdb.ReadStreamOptions{
            Direction: esdb.Forwards,
            From:      esdb.Start{},
        },
        math.MaxUint64,
    )
    if err != nil {
        return nil, err
    }
    defer stream.Close()

    var events []*esdb.ResolvedEvent
    for {
        event, err := stream.Recv()
        if errors.Is(err, io.EOF) {
            break
        }
        if err != nil {
            return nil, err
        }
        events = append(events, event)
    }
    return events, nil
}

十、真实项目中的坑

10.1 坑一:事件粒度太细

每个字段变更都产生一个事件,导致事件流爆炸式增长。

解决方案:事件应该对应有业务意义的状态变迁,而不是字段级别的变更。

10.2 坑二:投影器成为瓶颈

所有读模型由一个投影器构建,事件量增大后成为性能瓶颈。

解决方案:每个读模型使用独立的投影器,并行处理。

10.3 坑三:忽略幂等性

消息可能重复投递,投影器不做幂等处理导致数据异常。

解决方案:记录已处理的事件 ID,或使用 UPSERT 操作。

10.4 坑四:快照与 Upcasting 冲突

快照中保存的是旧版本的聚合状态,schema 变更后快照无法反序列化。

解决方案:快照也需要版本化,或在 schema 变更后主动清除旧快照。


十一、综合权衡

决策维度 CQRS + ES 传统 CRUD 权衡点
审计能力 完整的事件历史 需要额外审计日志 合规要求
查询灵活性 可构建任意读模型 受限于数据库 schema 查询复杂度
开发复杂度 高(事件设计、投影、版本化) 团队能力
性能 读写独立优化 读写共享资源 性能要求
一致性 最终一致 强一致 业务容忍度
可维护性 事件版本化成本 schema 迁移成本 长期维护
适用场景 金融、审计密集型、复杂领域 简单 CRUD 业务 业务复杂度
团队规模 需要经验丰富的团队 适合各种团队 人力成本

十二、下一步

CQRS + Event Sourcing 是 DDD 在技术实现层面的高级模式。然而,如何将这些模式与微服务架构结合,如何用限界上下文指导服务边界的划分,是另一个重要的实践主题。

下一篇:DDD 与微服务:用领域模型划分服务边界


参考资料

  1. Young, Greg. “CQRS Documents.” cqrs.files.wordpress.com, 2010.
  2. Young, Greg. “Event Sourcing.” Event Store Documentation.
  3. Vernon, Vaughn. Implementing Domain-Driven Design. Addison-Wesley, 2013.
  4. Overeem, Michiel; Spoor, Marten; Jansen, Slinger. “The Dark Side of Event Sourcing: Managing Data Conversion.” IEEE SANER, 2017.
  5. EventStoreDB 官方文档:https://www.eventstore.com/docs/
  6. Axon Framework 官方文档:https://docs.axoniq.io/
  7. Fowler, Martin. “CQRS.” martinfowler.com.
  8. Betts, Dominic 等. Exploring CQRS and Event Sourcing. Microsoft Patterns & Practices, 2013.
  9. Richardson, Chris. Microservices Patterns. Manning, 2018.
  10. Kleppmann, Martin. Designing Data-Intensive Applications. O’Reilly, 2017.

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】CQRS:读写分离的架构哲学

CQRS 不是 Event Sourcing 的附属品——本文从 Greg Young 的原始定义出发,拆解简单 CQRS 与完整 CQRS 的区别、读模型物化视图策略、最终一致性的用户体验设计,以及不和 Event Sourcing 绑定时 CQRS 仍然有价值的场景。

2026-04-13 · architecture

【系统架构设计百科】事件驱动架构:从消息通知到事件溯源

事件通知、事件携带状态转移、事件溯源三种模式经常被混为一谈,但它们在耦合度、数据一致性、存储成本和调试难度上有本质差异。本文基于 Martin Fowler 的 EDA 分类,拆解三种模式的机制与取舍,分析 Kafka 在事件驱动架构中的角色与局限,讨论事件排序的工程挑战和 schema 演进策略。

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。


By .