某金融交易平台在引入事件溯源(Event Sourcing)后,获得了完整的审计日志和时间旅行能力。但三个月后,团队发现一些事件流已经积累了超过 10 万条事件,聚合加载时间从毫秒级退化到秒级。更麻烦的是,业务迭代中修改了事件结构,旧版本事件无法反序列化。这些问题不是事件溯源本身的缺陷,而是工程实践上的坑——教科书通常不会告诉你这些。
命令查询职责分离(Command Query Responsibility Segregation,CQRS)和事件溯源(Event Sourcing,ES)是两个独立但高度互补的模式。CQRS 将读写模型分离,ES 用事件序列作为唯一事实来源。本文给出一条完整的实现路径:从事件存储的 schema 设计到投影重建、从快照策略到事件版本化,覆盖真实项目中的关键工程决策。
上一篇:防腐层与开放主机服务 | 下一篇:DDD 与微服务
一、CQRS 基础回顾
1.1 核心思想
传统的 CRUD 架构中,同一个模型同时负责读和写。CQRS 将这两个职责拆分为独立的模型:
- 命令模型(Command Model / Write Model):处理状态变更,保证业务规则和数据一致性;
- 查询模型(Query Model / Read Model):为查询优化的数据视图,可以有多个不同形态。
graph LR
Client["客户端"] -->|"命令"| CMD["命令处理器<br/>Command Handler"]
Client -->|"查询"| QRY["查询处理器<br/>Query Handler"]
CMD --> WM["写模型<br/>Write Model"]
WM --> ES["事件存储<br/>Event Store"]
ES -->|"事件发布"| PROJ["投影器<br/>Projector"]
PROJ --> RM["读模型<br/>Read Model"]
QRY --> RM
style WM fill:#e1f5fe
style RM fill:#f3e5f5
1.2 CQRS 的三种层次
| 层次 | 描述 | 复杂度 |
|---|---|---|
| 单数据库 CQRS | 读写模型共享数据库,通过不同的查询视图分离 | 低 |
| 双数据库 CQRS | 读写使用不同的数据库,通过事件同步 | 中 |
| CQRS + ES | 写侧使用事件存储,读侧使用投影视图 | 高 |
本文聚焦第三种——CQRS + Event Sourcing 的完整实现。
二、Event Sourcing 核心概念
2.1 传统存储 vs 事件存储
传统存储保存”当前状态”,事件存储保存”状态变化的历史”。
传统存储:
┌──────────────────────────────────┐
│ accounts 表 │
│ id=A001, balance=1000, status=OK │
└──────────────────────────────────┘
→ 只知道当前余额是 1000,不知道怎么到的 1000
事件存储:
┌──────────────────────────────────┐
│ events 表 │
│ 1. AccountOpened(A001, 0) │
│ 2. MoneyDeposited(A001, 500) │
│ 3. MoneyDeposited(A001, 800) │
│ 4. MoneyWithdrawn(A001, 300) │
└──────────────────────────────────┘
→ 完整的历史:0 → 500 → 1300 → 1000
2.2 事件溯源的核心公式
当前状态 = 初始状态 + fold(所有事件)
// 从事件流重建聚合状态
public class BankAccount {
private AccountId id;
private Money balance;
private AccountStatus status;
// 从事件流重建
public static BankAccount reconstitute(List<DomainEvent> events) {
BankAccount account = new BankAccount();
for (DomainEvent event : events) {
account.apply(event);
}
return account;
}
private void apply(DomainEvent event) {
if (event instanceof AccountOpenedEvent e) {
this.id = e.accountId();
this.balance = Money.ZERO;
this.status = AccountStatus.ACTIVE;
} else if (event instanceof MoneyDepositedEvent e) {
this.balance = this.balance.add(e.amount());
} else if (event instanceof MoneyWithdrawnEvent e) {
this.balance = this.balance.subtract(e.amount());
} else if (event instanceof AccountClosedEvent e) {
this.status = AccountStatus.CLOSED;
}
}
}2.3 事件溯源的优势与代价
| 优势 | 说明 |
|---|---|
| 完整审计日志 | 每一次状态变化都有记录 |
| 时间旅行 | 可以重建任意历史时刻的状态 |
| 事件驱动集成 | 事件天然可作为集成消息 |
| 无数据丢失 | 不会因为覆盖而丢失信息 |
| 灵活的读模型 | 可以从事件流构建任意形态的读模型 |
| 代价 | 说明 |
|---|---|
| 学习曲线 | 团队需要适应事件思维 |
| 查询复杂 | 不能直接 SQL 查询当前状态 |
| 事件版本化 | 事件结构变更需要迁移策略 |
| 最终一致性 | 读模型存在延迟 |
| 事件流增长 | 长事件流导致加载变慢 |
三、事件存储的 Schema 设计
3.1 核心表结构
-- 事件存储的核心表
CREATE TABLE event_store (
-- 全局唯一的事件 ID
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- 事件流标识(通常是聚合类型+聚合ID)
stream_name VARCHAR(500) NOT NULL,
-- 事件在流中的版本号(从 1 开始,单调递增)
stream_version BIGINT NOT NULL,
-- 事件类型(用于反序列化路由)
event_type VARCHAR(200) NOT NULL,
-- 事件负载(JSON 格式)
payload JSONB NOT NULL,
-- 事件元数据(因果关系、用户信息等)
metadata JSONB NOT NULL DEFAULT '{}',
-- 事件发生时间
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- 事件写入时间(服务器时间,用于排序)
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- 保证同一流内版本号唯一(乐观锁)
UNIQUE (stream_name, stream_version)
);
-- 按流名称和版本号建索引,加速聚合加载
CREATE INDEX idx_event_store_stream
ON event_store (stream_name, stream_version);
-- 按事件类型建索引,支持按类型订阅
CREATE INDEX idx_event_store_type
ON event_store (event_type, recorded_at);
-- 按时间建索引,支持全局事件流订阅
CREATE INDEX idx_event_store_recorded
ON event_store (recorded_at);3.2 Stream 命名规范
格式:{聚合类型}-{聚合ID}
示例:
BankAccount-acc_001 → 银行账户 acc_001 的事件流
Order-ord_20250413_001 → 订单的事件流
Inventory-prod_SKU123 → 库存的事件流
3.3 事件负载示例
{
"event_id": "e7a1b2c3-d4e5-6789-abcd-ef0123456789",
"stream_name": "BankAccount-acc_001",
"stream_version": 3,
"event_type": "MoneyDeposited",
"payload": {
"account_id": "acc_001",
"amount": "500.00",
"currency": "CNY",
"description": "工资入账"
},
"metadata": {
"user_id": "user_123",
"correlation_id": "txn_456",
"causation_id": "cmd_789",
"ip_address": "192.168.1.100"
},
"occurred_at": "2025-04-13T10:30:00Z",
"recorded_at": "2025-04-13T10:30:00.123Z"
}3.4 Go 实现:事件存储
type EventStore struct {
db *sql.DB
}
type StoredEvent struct {
EventID string
StreamName string
StreamVersion int64
EventType string
Payload json.RawMessage
Metadata json.RawMessage
OccurredAt time.Time
RecordedAt time.Time
}
// 追加事件到事件流(带乐观锁)
func (s *EventStore) Append(ctx context.Context, streamName string,
expectedVersion int64, events []StoredEvent) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("开始事务失败: %w", err)
}
defer tx.Rollback()
// 验证当前版本号(乐观锁)
var currentVersion int64
err = tx.QueryRowContext(ctx,
`SELECT COALESCE(MAX(stream_version), 0)
FROM event_store WHERE stream_name = $1`,
streamName).Scan(¤tVersion)
if err != nil {
return fmt.Errorf("查询当前版本失败: %w", err)
}
if currentVersion != expectedVersion {
return fmt.Errorf(
"并发冲突: 期望版本 %d, 当前版本 %d",
expectedVersion, currentVersion)
}
// 插入事件
stmt, err := tx.PrepareContext(ctx,
`INSERT INTO event_store
(event_id, stream_name, stream_version, event_type,
payload, metadata, occurred_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)`)
if err != nil {
return fmt.Errorf("准备语句失败: %w", err)
}
defer stmt.Close()
for i, event := range events {
version := expectedVersion + int64(i) + 1
_, err = stmt.ExecContext(ctx,
event.EventID, streamName, version,
event.EventType, event.Payload,
event.Metadata, event.OccurredAt)
if err != nil {
return fmt.Errorf("插入事件失败: %w", err)
}
}
return tx.Commit()
}
// 加载事件流
func (s *EventStore) LoadStream(ctx context.Context,
streamName string) ([]StoredEvent, error) {
rows, err := s.db.QueryContext(ctx,
`SELECT event_id, stream_name, stream_version, event_type,
payload, metadata, occurred_at, recorded_at
FROM event_store
WHERE stream_name = $1
ORDER BY stream_version ASC`,
streamName)
if err != nil {
return nil, fmt.Errorf("查询事件流失败: %w", err)
}
defer rows.Close()
var events []StoredEvent
for rows.Next() {
var e StoredEvent
if err := rows.Scan(&e.EventID, &e.StreamName,
&e.StreamVersion, &e.EventType,
&e.Payload, &e.Metadata,
&e.OccurredAt, &e.RecordedAt); err != nil {
return nil, fmt.Errorf("扫描事件失败: %w", err)
}
events = append(events, e)
}
return events, nil
}四、投影与读模型
4.1 投影的概念
投影(Projection)是从事件流构建读模型的过程。投影器(Projector)订阅事件流,将每个事件转换为读模型的更新操作。
graph LR
ES["事件存储"] -->|"事件流"| P1["投影器 A<br/>订单列表"]
ES -->|"事件流"| P2["投影器 B<br/>统计报表"]
ES -->|"事件流"| P3["投影器 C<br/>搜索索引"]
P1 --> RM1["PostgreSQL<br/>订单列表表"]
P2 --> RM2["ClickHouse<br/>统计表"]
P3 --> RM3["Elasticsearch<br/>搜索索引"]
4.2 同步投影 vs 异步投影
| 维度 | 同步投影 | 异步投影 |
|---|---|---|
| 实现方式 | 在写入事件的同一事务中更新读模型 | 通过事件订阅异步更新读模型 |
| 一致性 | 强一致 | 最终一致 |
| 延迟 | 无延迟 | 有延迟(通常毫秒到秒级) |
| 写入性能 | 较差(事务更大) | 较好(写入和投影解耦) |
| 故障影响 | 投影失败会导致写入失败 | 投影失败不影响写入 |
| 适用场景 | 对一致性要求极高的场景 | 大多数场景 |
4.3 投影器实现
// 异步投影器
type OrderListProjector struct {
db *sql.DB
store *EventStore
lastPos int64 // 上次处理的位置
}
func (p *OrderListProjector) Run(ctx context.Context) error {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := p.processNewEvents(ctx); err != nil {
log.Printf("投影器处理失败: %v", err)
}
}
}
}
func (p *OrderListProjector) processNewEvents(ctx context.Context) error {
events, err := p.store.LoadSince(ctx, p.lastPos, 100)
if err != nil {
return err
}
for _, event := range events {
if err := p.handleEvent(ctx, event); err != nil {
return fmt.Errorf("处理事件 %s 失败: %w",
event.EventID, err)
}
p.lastPos = event.RecordedAt.UnixNano()
}
return nil
}
func (p *OrderListProjector) handleEvent(ctx context.Context,
event StoredEvent) error {
switch event.EventType {
case "OrderPlaced":
var payload OrderPlacedPayload
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
_, err := p.db.ExecContext(ctx,
`INSERT INTO order_list_view
(order_id, customer_id, total_amount, status, created_at)
VALUES ($1, $2, $3, $4, $5)`,
payload.OrderID, payload.CustomerID,
payload.TotalAmount, "待支付", event.OccurredAt)
return err
case "OrderConfirmed":
var payload OrderConfirmedPayload
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
_, err := p.db.ExecContext(ctx,
`UPDATE order_list_view SET status = $1
WHERE order_id = $2`,
"已确认", payload.OrderID)
return err
default:
return nil // 忽略不关心的事件
}
}4.4 投影重建
投影重建(Replay)是事件溯源的核心优势之一——当读模型出错或需要新的读模型时,可以从事件流重新构建。
func (p *OrderListProjector) Rebuild(ctx context.Context) error {
// 1. 清空现有读模型
_, err := p.db.ExecContext(ctx, "TRUNCATE TABLE order_list_view")
if err != nil {
return fmt.Errorf("清空读模型失败: %w", err)
}
// 2. 重置位置
p.lastPos = 0
// 3. 重新处理所有事件
for {
events, err := p.store.LoadSince(ctx, p.lastPos, 1000)
if err != nil {
return err
}
if len(events) == 0 {
break
}
for _, event := range events {
if err := p.handleEvent(ctx, event); err != nil {
return err
}
p.lastPos = event.RecordedAt.UnixNano()
}
}
log.Println("投影重建完成")
return nil
}五、快照策略
5.1 问题
当事件流很长时(例如一个活跃账户可能有数万条事件),每次加载聚合都要回放所有事件,性能不可接受。
5.2 快照的原理
定期保存聚合的当前状态快照,加载时从最近的快照开始,只回放快照之后的事件。
无快照:加载 10000 个事件
Event 1 → Event 2 → ... → Event 10000
加载时间:200ms
有快照(每 100 个事件一个快照):
Snapshot@9900 → Event 9901 → ... → Event 10000
加载时间:5ms
5.3 快照表设计
CREATE TABLE snapshots (
stream_name VARCHAR(500) NOT NULL,
stream_version BIGINT NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (stream_name, stream_version)
);
-- 每个流只保留最新的快照(可选)
CREATE INDEX idx_snapshots_latest
ON snapshots (stream_name, stream_version DESC);5.4 带快照的聚合加载
func (r *EventSourcedRepository) Load(ctx context.Context,
streamName string) (*BankAccount, error) {
// 1. 尝试加载最新快照
snapshot, snapshotVersion, err := r.loadLatestSnapshot(ctx, streamName)
if err != nil && !errors.Is(err, ErrSnapshotNotFound) {
return nil, err
}
// 2. 从快照版本之后加载事件
fromVersion := int64(0)
if snapshot != nil {
fromVersion = snapshotVersion
}
events, err := r.store.LoadStreamFrom(ctx, streamName, fromVersion)
if err != nil {
return nil, err
}
// 3. 重建聚合
var account *BankAccount
if snapshot != nil {
account = snapshot // 从快照开始
} else {
account = &BankAccount{} // 从空白开始
}
for _, event := range events {
account.apply(event)
}
return account, nil
}
func (r *EventSourcedRepository) Save(ctx context.Context,
account *BankAccount) error {
// 保存事件
err := r.store.Append(ctx, account.streamName(),
account.version(), account.uncommittedEvents())
if err != nil {
return err
}
// 检查是否需要创建快照
if account.version() > 0 && account.version()%100 == 0 {
if err := r.saveSnapshot(ctx, account); err != nil {
// 快照保存失败不应阻塞主流程
log.Printf("快照保存失败: %v", err)
}
}
account.markCommitted()
return nil
}5.5 快照频率的权衡
| 快照频率 | 加载速度 | 存储成本 | 写入开销 |
|---|---|---|---|
| 每 50 个事件 | 快 | 高 | 大 |
| 每 100 个事件 | 中 | 中 | 中 |
| 每 500 个事件 | 慢 | 低 | 小 |
| 从不快照 | 最慢 | 无 | 无 |
推荐策略:从每 100 个事件一个快照开始,根据实际性能数据调整。
六、事件版本化(Upcasting)
6.1 问题
业务迭代中,事件的结构会变化。例如:
V1: MoneyDeposited { accountId, amount }
V2: MoneyDeposited { accountId, amount, currency } ← 新增字段
V3: MoneyDeposited { accountId, amount, currency, source } ← 新增字段
已存储的 V1 事件无法直接反序列化为 V3 结构。
6.2 Upcasting 方案
Upcasting 是在加载事件时,将旧版本事件”升级”为最新版本的过程。
public interface EventUpcaster {
boolean canUpcast(String eventType, int fromVersion);
JsonNode upcast(JsonNode oldPayload, int fromVersion);
}
public class MoneyDepositedUpcaster implements EventUpcaster {
@Override
public boolean canUpcast(String eventType, int fromVersion) {
return "MoneyDeposited".equals(eventType) && fromVersion < 3;
}
@Override
public JsonNode upcast(JsonNode payload, int fromVersion) {
ObjectNode node = (ObjectNode) payload.deepCopy();
if (fromVersion < 2) {
// V1 → V2: 添加默认币种
node.put("currency", "CNY");
}
if (fromVersion < 3) {
// V2 → V3: 添加默认来源
node.put("source", "UNKNOWN");
}
return node;
}
}6.3 Go 实现
type Upcaster func(payload json.RawMessage, fromVersion int) (json.RawMessage, error)
var upcasters = map[string]Upcaster{
"MoneyDeposited": upcastMoneyDeposited,
}
func upcastMoneyDeposited(payload json.RawMessage, fromVersion int) (json.RawMessage, error) {
var data map[string]interface{}
if err := json.Unmarshal(payload, &data); err != nil {
return nil, err
}
if fromVersion < 2 {
if _, ok := data["currency"]; !ok {
data["currency"] = "CNY"
}
}
if fromVersion < 3 {
if _, ok := data["source"]; !ok {
data["source"] = "UNKNOWN"
}
}
return json.Marshal(data)
}
// 在加载事件时应用 upcasting
func loadAndUpcast(stored StoredEvent) (StoredEvent, error) {
upcaster, exists := upcasters[stored.EventType]
if !exists {
return stored, nil
}
// 从元数据获取事件版本
version := extractSchemaVersion(stored.Metadata)
if version < currentSchemaVersion(stored.EventType) {
newPayload, err := upcaster(stored.Payload, version)
if err != nil {
return stored, fmt.Errorf("upcasting 失败: %w", err)
}
stored.Payload = newPayload
}
return stored, nil
}6.4 事件版本化的策略对比
| 策略 | 实现方式 | 优点 | 缺点 |
|---|---|---|---|
| Upcasting | 加载时升级旧事件 | 不修改历史数据 | upcaster 链会变长 |
| Copy-and-Transform | 批量迁移旧事件为新格式 | 简单直接 | 修改了历史数据 |
| Weak Schema | 使用宽松的反序列化 | 无需迁移 | 类型安全性差 |
| 多版本支持 | 每个版本独立的反序列化器 | 精确控制 | 代码膨胀 |
推荐使用 Upcasting 作为主要策略,它在不修改历史数据的前提下保持了系统的演进能力。
七、最终一致性的处理
7.1 问题
CQRS + ES 架构中,写入事件和读模型更新之间存在延迟。用户提交一个操作后,立刻查询可能看不到最新状态。
7.2 应对策略
策略 1:写后读一致性(Read-your-writes)
用户写入后,从写模型直接返回结果,不经过读模型
策略 2:轮询等待
客户端提交后轮询读模型,直到看到最新状态
策略 3:版本号校验
客户端携带期望版本号,读模型低于该版本时返回 HTTP 409
策略 4:Server-Sent Events / WebSocket
服务端主动推送读模型更新通知
// 策略 1:写后读一致性
func (h *PlaceOrderHandler) Handle(ctx context.Context,
cmd PlaceOrderCommand) (*OrderResponse, error) {
// 写入
order, err := h.orderService.PlaceOrder(ctx, cmd)
if err != nil {
return nil, err
}
// 直接从写模型构建响应,不查读模型
return &OrderResponse{
OrderID: order.ID().String(),
Status: order.Status().String(),
Total: order.Total().String(),
CreatedAt: order.CreatedAt(),
}, nil
}
// 策略 3:版本号校验
func (h *GetOrderHandler) Handle(ctx context.Context,
query GetOrderQuery) (*OrderListView, error) {
view, err := h.readStore.FindByID(ctx, query.OrderID)
if err != nil {
return nil, err
}
// 如果客户端指定了最小版本号
if query.MinVersion > 0 && view.Version < query.MinVersion {
return nil, ErrStaleData // 客户端可以重试
}
return view, nil
}八、测试事件溯源系统
8.1 Given-When-Then 模式
事件溯源系统天然适合 Given-When-Then 风格的测试:
- Given:一系列历史事件(聚合的历史状态);
- When:执行一个命令;
- Then:期望产生的新事件。
@Test
void shouldWithdrawMoney() {
// Given: 账户有 1000 元余额
given(
new AccountOpenedEvent("acc_001"),
new MoneyDepositedEvent("acc_001", money(1000, "CNY"))
)
// When: 取款 300 元
.when(new WithdrawMoneyCommand("acc_001", money(300, "CNY")))
// Then: 产生取款事件
.then(
new MoneyWithdrawnEvent("acc_001", money(300, "CNY"))
);
}
@Test
void shouldRejectWithdrawalWhenInsufficientBalance() {
given(
new AccountOpenedEvent("acc_001"),
new MoneyDepositedEvent("acc_001", money(100, "CNY"))
)
.when(new WithdrawMoneyCommand("acc_001", money(200, "CNY")))
.thenThrows(InsufficientBalanceException.class);
}8.2 投影器测试
func TestOrderListProjector_OrderPlaced(t *testing.T) {
db := setupTestDB(t)
projector := NewOrderListProjector(db)
event := StoredEvent{
EventType: "OrderPlaced",
Payload: mustJSON(map[string]interface{}{
"order_id": "ord_001",
"customer_id": "cust_001",
"total_amount": "99.99",
}),
OccurredAt: time.Now(),
}
err := projector.handleEvent(context.Background(), event)
require.NoError(t, err)
// 验证读模型
view, err := queryOrderView(db, "ord_001")
require.NoError(t, err)
assert.Equal(t, "待支付", view.Status)
assert.Equal(t, "99.99", view.TotalAmount)
}九、工具与框架
| 工具 | 语言/平台 | 特点 |
|---|---|---|
| EventStoreDB | 跨平台 | 专为事件溯源设计的数据库,支持投影和订阅 |
| Axon Framework | Java | 完整的 CQRS+ES 框架,集成 Spring |
| Eventuate | Java | 微服务事件溯源框架 |
| Marten | .NET | PostgreSQL 上的事件存储和文档数据库 |
| EventSauce | PHP | 轻量级事件溯源库 |
| 自建 | 任意语言 | 基于 PostgreSQL/MySQL + 消息队列 |
9.1 EventStoreDB 的简单使用
// 使用 EventStoreDB 的 Go 客户端
import "github.com/EventStore/EventStore-Client-Go/v4/esdb"
func appendEvents(client *esdb.Client, streamName string,
events []esdb.EventData) error {
_, err := client.AppendToStream(
context.Background(),
streamName,
esdb.AppendToStreamOptions{},
events...,
)
return err
}
func readStream(client *esdb.Client,
streamName string) ([]*esdb.ResolvedEvent, error) {
stream, err := client.ReadStream(
context.Background(),
streamName,
esdb.ReadStreamOptions{
Direction: esdb.Forwards,
From: esdb.Start{},
},
math.MaxUint64,
)
if err != nil {
return nil, err
}
defer stream.Close()
var events []*esdb.ResolvedEvent
for {
event, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}
events = append(events, event)
}
return events, nil
}十、真实项目中的坑
10.1 坑一:事件粒度太细
每个字段变更都产生一个事件,导致事件流爆炸式增长。
解决方案:事件应该对应有业务意义的状态变迁,而不是字段级别的变更。
10.2 坑二:投影器成为瓶颈
所有读模型由一个投影器构建,事件量增大后成为性能瓶颈。
解决方案:每个读模型使用独立的投影器,并行处理。
10.3 坑三:忽略幂等性
消息可能重复投递,投影器不做幂等处理导致数据异常。
解决方案:记录已处理的事件 ID,或使用 UPSERT 操作。
10.4 坑四:快照与 Upcasting 冲突
快照中保存的是旧版本的聚合状态,schema 变更后快照无法反序列化。
解决方案:快照也需要版本化,或在 schema 变更后主动清除旧快照。
十一、综合权衡
| 决策维度 | CQRS + ES | 传统 CRUD | 权衡点 |
|---|---|---|---|
| 审计能力 | 完整的事件历史 | 需要额外审计日志 | 合规要求 |
| 查询灵活性 | 可构建任意读模型 | 受限于数据库 schema | 查询复杂度 |
| 开发复杂度 | 高(事件设计、投影、版本化) | 低 | 团队能力 |
| 性能 | 读写独立优化 | 读写共享资源 | 性能要求 |
| 一致性 | 最终一致 | 强一致 | 业务容忍度 |
| 可维护性 | 事件版本化成本 | schema 迁移成本 | 长期维护 |
| 适用场景 | 金融、审计密集型、复杂领域 | 简单 CRUD 业务 | 业务复杂度 |
| 团队规模 | 需要经验丰富的团队 | 适合各种团队 | 人力成本 |
十二、下一步
CQRS + Event Sourcing 是 DDD 在技术实现层面的高级模式。然而,如何将这些模式与微服务架构结合,如何用限界上下文指导服务边界的划分,是另一个重要的实践主题。
参考资料
- Young, Greg. “CQRS Documents.” cqrs.files.wordpress.com, 2010.
- Young, Greg. “Event Sourcing.” Event Store Documentation.
- Vernon, Vaughn. Implementing Domain-Driven Design. Addison-Wesley, 2013.
- Overeem, Michiel; Spoor, Marten; Jansen, Slinger. “The Dark Side of Event Sourcing: Managing Data Conversion.” IEEE SANER, 2017.
- EventStoreDB 官方文档:https://www.eventstore.com/docs/
- Axon Framework 官方文档:https://docs.axoniq.io/
- Fowler, Martin. “CQRS.” martinfowler.com.
- Betts, Dominic 等. Exploring CQRS and Event Sourcing. Microsoft Patterns & Practices, 2013.
- Richardson, Chris. Microservices Patterns. Manning, 2018.
- Kleppmann, Martin. Designing Data-Intensive Applications. O’Reilly, 2017.
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【系统架构设计百科】CQRS:读写分离的架构哲学
CQRS 不是 Event Sourcing 的附属品——本文从 Greg Young 的原始定义出发,拆解简单 CQRS 与完整 CQRS 的区别、读模型物化视图策略、最终一致性的用户体验设计,以及不和 Event Sourcing 绑定时 CQRS 仍然有价值的场景。
【系统架构设计百科】事件驱动架构:从消息通知到事件溯源
事件通知、事件携带状态转移、事件溯源三种模式经常被混为一谈,但它们在耦合度、数据一致性、存储成本和调试难度上有本质差异。本文基于 Martin Fowler 的 EDA 分类,拆解三种模式的机制与取舍,分析 Kafka 在事件驱动架构中的角色与局限,讨论事件排序的工程挑战和 schema 演进策略。
【系统架构设计百科】架构质量属性:不只是"高可用高性能"
需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。
【系统架构设计百科】告警策略:如何避免"狼来了"
大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。