土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】应用层数据一致性模式:在正确性与性能之间走钢丝

文章导航

分类入口
architecture
标签入口
#consistency#Saga#TCC#outbox-pattern#distributed-transaction#compensation

目录

一个电商系统要处理一笔订单,涉及三个服务:订单服务扣减库存、支付服务冻结余额、物流服务预约发货。三个服务各有各的数据库。如果扣减库存成功了,冻结余额也成功了,但预约发货失败了——系统应该怎么办?

在单体应用里,一个数据库事务就能解决:三步操作要么全部提交,要么全部回滚。但在微服务架构下,三个服务的数据分散在三个数据库里,没有一个全局的事务管理器能同时控制三个数据源。传统的两阶段提交(2PC)协议理论上可以做到,但它的性能代价和可用性风险在高并发场景下几乎不可接受。

这就是分布式事务的本质困境:跨服务的数据一致性不能再依赖数据库层面的原子性保证,必须在应用层自己设计一致性方案

Saga、TCC、本地消息表、事务发件箱——这些都是应用层的一致性模式。它们的共同点是放弃了强一致性(Strong Consistency),转而追求最终一致性(Eventual Consistency)。它们的区别在于:补偿的粒度、隔离性的保证程度、对业务逻辑的侵入程度、以及实现和运维的复杂度。

这篇文章要回答一个核心问题:这些模式各自适用什么场景,选型的依据是什么?

上一篇 中我们讨论了数据迁移与版本化的问题。数据迁移关注的是数据模式的演进,而本文关注的是跨服务数据操作的一致性保障——两个问题经常在同一个系统中同时出现。


一、分布式事务的本质困境

单体事务为什么简单

在单体应用中,一个数据库事务提供四个保证:原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)——即 ACID。这四个保证由数据库引擎内部实现,应用代码只需要 BEGIN、执行 SQL、COMMITROLLBACK

BEGIN;
UPDATE inventory SET stock = stock - 1 WHERE product_id = 1001;
UPDATE account SET balance = balance - 99.00 WHERE user_id = 42;
INSERT INTO orders (user_id, product_id, amount) VALUES (42, 1001, 99.00);
COMMIT;

如果中间任何一步失败,ROLLBACK 会把所有修改撤销。应用开发者不需要自己写”撤销扣减库存”或”退回余额”的逻辑。

2PC 的工作机制

当数据分散在多个数据库时,需要一个协议来协调多个数据源的提交。两阶段提交(Two-Phase Commit,2PC)是经典方案。

2PC 分两个阶段:

  1. 准备阶段(Prepare):协调者(Coordinator)向所有参与者(Participant)发送 PREPARE 请求。每个参与者执行本地事务但不提交,将结果写入预写日志(WAL),然后回复 YESNO
  2. 提交阶段(Commit):如果所有参与者都回复 YES,协调者发送 COMMIT;否则发送 ABORT。参与者收到指令后执行提交或回滚。
协调者                   参与者A          参与者B
  |--- PREPARE ----------->|               |
  |--- PREPARE --------------------------->|
  |<-- YES -----------------|               |
  |<-- YES -------------------------------|
  |--- COMMIT ------------->|               |
  |--- COMMIT ----------------------------->|
  |<-- ACK ------------------|               |
  |<-- ACK --------------------------------|

2PC 的三个致命问题

同步阻塞:在准备阶段,参与者持有本地事务锁。如果协调者在发送 COMMIT 之前崩溃,所有参与者都会处于”已准备但未提交”的状态,持有的锁无法释放。在这个窗口期内,其他事务无法访问被锁住的数据。在高并发系统中,这种阻塞会迅速级联放大。

单点故障:协调者是单点。虽然可以通过主备复制来提高协调者的可用性,但复制本身又引入了一致性问题——协调者的状态如果丢失或不一致,参与者就无法知道应该提交还是回滚。

网络分区下的困境:如果协调者和某个参与者之间的网络中断,协调者无法确认该参与者的状态。三阶段提交(3PC)试图通过增加一个 PRE-COMMIT 阶段来缓解这个问题,但在异步网络模型中,3PC 仍然无法完全避免不一致(FLP 不可能定理的推论)。

性能数据

从工程实践看,2PC 的性能代价是显著的。一个 2PC 事务的延迟通常是本地事务的 5-10 倍:准备阶段需要一次网络往返加磁盘 fsync,提交阶段又需要一次。假设单次网络往返 1ms、磁盘 fsync 5ms,一个两参与者的 2PC 事务至少需要 (1 + 5) x 2 = 12ms 的额外开销。在每秒处理上万笔订单的系统中,这种开销是不可接受的。

更关键的是可用性。如果参与者 A 正常但参与者 B 宕机了,在 2PC 中整个事务会被阻塞。但在业务层面,我们可能完全可以接受”先下单,稍后重试支付”的方案——这就是应用层一致性模式要解决的问题。

从强一致到最终一致

放弃 2PC 意味着放弃跨服务的强一致性。取而代之的是最终一致性——系统在某个时间窗口内可能处于不一致状态,但最终会收敛到一致。

这个取舍的前提是业务允许。电商场景中,用户下单后余额还没扣减,但 30 秒后扣减完成——这在业务上是可以接受的。但如果是银行转账,一方已经扣款但另一方还没到账,这个窗口的长度和处理方式就需要非常谨慎地设计。

下面逐一分析四种主要的应用层一致性模式。


二、Saga 模式详解

Saga 的起源

Saga 由 Hector Garcia-Molina 和 Kenneth Salem 在 1987 年的论文 “Sagas”(ACM SIGMOD)中提出。论文的原始动机是解决长事务(Long-Lived Transaction)的问题——当一个事务持续几分钟甚至几小时,一直持有数据库锁是不现实的。

Saga 的核心思路:把一个长事务拆成一系列短事务(称为”步骤”或”子事务”),每个步骤独立提交。如果中间某一步失败,不做全局回滚,而是按逆序执行前面每一步的”补偿操作”(Compensating Transaction),把系统恢复到初始状态。

形式化地说,一个 Saga 由 n 个步骤组成:T1, T2, …, Tn。每个步骤 Ti 有一个对应的补偿操作 Ci。如果 Tk 失败(1 < k <= n),系统按逆序执行 C(k-1), C(k-2), …, C1。

编排(Orchestration)vs 协调(Choreography)

Saga 有两种实现方式,区别在于”谁来决定下一步做什么”。

协调式 Saga(Choreography)

没有中央协调者。每个服务在完成自己的本地事务后,发布一个领域事件(Domain Event)。其他服务订阅感兴趣的事件,触发自己的本地事务。

以订单处理为例:

  1. 订单服务创建订单,发布 OrderCreated 事件。
  2. 库存服务订阅 OrderCreated,扣减库存,发布 InventoryReserved 事件。
  3. 支付服务订阅 InventoryReserved,冻结余额,发布 PaymentProcessed 事件。
  4. 订单服务订阅 PaymentProcessed,确认订单。

如果支付失败,支付服务发布 PaymentFailed 事件。库存服务订阅该事件,执行补偿操作释放库存。订单服务订阅该事件,将订单标记为失败。

// 库存服务的事件处理器
@EventHandler
public void on(OrderCreated event) {
    Inventory inv = inventoryRepository.findByProductId(event.getProductId());
    if (inv.getStock() < event.getQuantity()) {
        eventBus.publish(new InventoryInsufficient(event.getOrderId()));
        return;
    }
    inv.reserve(event.getQuantity());
    inventoryRepository.save(inv);
    eventBus.publish(new InventoryReserved(event.getOrderId(), event.getProductId()));
}

@EventHandler
public void on(PaymentFailed event) {
    // 补偿操作:释放已预留的库存
    Inventory inv = inventoryRepository.findByProductId(event.getProductId());
    inv.release(event.getQuantity());
    inventoryRepository.save(inv);
    eventBus.publish(new InventoryReleased(event.getOrderId()));
}

优点:服务之间完全解耦,每个服务只关心自己订阅的事件。没有单点故障。

缺点:随着步骤增多,事件链变得难以追踪。一个 Saga 涉及 5 个服务时,理解”订单失败后会发生什么”需要在 5 个服务之间跳来跳去看代码。补偿逻辑散落在各个服务中,没有一个地方能看到 Saga 的全局流程。测试困难——你无法在单个服务中测试完整的 Saga 流程。

编排式 Saga(Orchestration)

有一个中央协调者(Orchestrator),负责告诉每个参与者”现在该做什么”。协调者维护 Saga 的状态机,根据参与者的响应决定下一步是继续执行还是执行补偿。

stateDiagram-v2
    [*] --> 创建订单
    创建订单 --> 预留库存: 订单已创建
    预留库存 --> 处理支付: 库存已预留
    处理支付 --> 确认订单: 支付成功
    确认订单 --> [*]: 订单完成

    处理支付 --> 释放库存: 支付失败
    释放库存 --> 取消订单: 库存已释放
    取消订单 --> [*]: 订单已取消

    预留库存 --> 取消订单: 库存不足

下面是一个编排式 Saga 协调者的 Java 实现:

public class OrderSagaOrchestrator {

    private final OrderService orderService;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;

    public enum SagaState {
        STARTED, INVENTORY_RESERVED, PAYMENT_PROCESSED,
        ORDER_CONFIRMED, COMPENSATING, FAILED
    }

    public void execute(CreateOrderCommand cmd) {
        SagaState state = SagaState.STARTED;
        String sagaId = UUID.randomUUID().toString();

        try {
            // 步骤 1:创建订单
            Order order = orderService.createOrder(cmd);
            sagaLog.record(sagaId, "ORDER_CREATED", order.getId());

            // 步骤 2:预留库存
            inventoryService.reserve(order.getProductId(), order.getQuantity());
            sagaLog.record(sagaId, "INVENTORY_RESERVED", order.getId());
            state = SagaState.INVENTORY_RESERVED;

            // 步骤 3:处理支付
            paymentService.processPayment(order.getUserId(), order.getAmount());
            sagaLog.record(sagaId, "PAYMENT_PROCESSED", order.getId());
            state = SagaState.PAYMENT_PROCESSED;

            // 步骤 4:确认订单
            orderService.confirmOrder(order.getId());
            sagaLog.record(sagaId, "ORDER_CONFIRMED", order.getId());
            state = SagaState.ORDER_CONFIRMED;

        } catch (InventoryInsufficientException e) {
            compensate(sagaId, state, cmd);
        } catch (PaymentFailedException e) {
            compensate(sagaId, state, cmd);
        }
    }

    private void compensate(String sagaId, SagaState failedAt, CreateOrderCommand cmd) {
        sagaLog.record(sagaId, "COMPENSATING", failedAt.name());

        switch (failedAt) {
            case PAYMENT_PROCESSED:
                // 不应该到这里,支付成功后不会触发补偿
                break;
            case INVENTORY_RESERVED:
                // 支付失败,回滚库存
                inventoryService.release(cmd.getProductId(), cmd.getQuantity());
                sagaLog.record(sagaId, "INVENTORY_RELEASED", "");
                // fall through
            case STARTED:
                // 取消订单
                orderService.cancelOrder(cmd.getOrderId());
                sagaLog.record(sagaId, "ORDER_CANCELLED", "");
                break;
        }
    }
}

优点:Saga 的完整流程在协调者中一目了然。补偿逻辑集中管理。添加新步骤时只需修改协调者,不需要在多个服务之间添加事件订阅。便于测试——可以对协调者做完整的单元测试。

缺点:协调者本身成为一个需要高可用保障的组件。服务之间的耦合从”事件耦合”变成了”协调者耦合”——协调者直接调用每个服务的接口,知道所有服务的存在。

编排与协调的选型判断

维度 协调式(Choreography) 编排式(Orchestration)
参与者数量 适合 2-4 个服务 适合 4 个以上服务
流程可见性 低,需要在多个服务间跳转 高,集中在协调者
耦合方式 事件耦合,松散 接口耦合,紧密
单点风险 无中央单点 协调者是潜在单点
测试难度 高,需要集成测试 中等,可单元测试协调者
适用场景 简单流程、服务团队独立 复杂流程、需要统一管理

经验判断:如果 Saga 涉及的步骤不超过 3 个、服务之间的依赖关系是线性的,协调式就够了。一旦步骤超过 3 个、或者步骤之间有分支和条件逻辑,编排式几乎是唯一可维护的选择。


三、Saga 的补偿机制设计

补偿是 Saga 的核心难点。和数据库的 ROLLBACK 不同,Saga 的补偿是语义层面的(Semantic Compensation),不是物理层面的撤销。

语义补偿 vs 物理回滚

数据库回滚是精确的物理操作:undo log 记录了每个数据页的修改前镜像,回滚时精确恢复。但 Saga 的每一步都已经提交了,无法从数据库层面撤销。

补偿操作需要在业务语义上”抵消”前一步的效果。“扣减库存”的补偿不是”回滚扣减”,而是”释放已预留的库存”。“冻结余额”的补偿不是”撤销冻结”,而是”解冻余额”。这两件事在代码实现上可能完全不同。

更麻烦的是,在步骤 Ti 执行和补偿 Ci 执行之间,可能已经有其他事务修改了同一条数据。比如:库存服务扣减了商品 A 的库存,在补偿执行之前,另一个订单也扣减了商品 A 的库存。这时候”释放库存”不能简单地加回原来的数量,而是需要精确地只释放这一笔订单预留的数量。

补偿操作的设计原则

原则一:补偿操作必须幂等。

网络抖动、消息重复投递、协调者重试——都可能导致补偿操作被执行多次。如果”释放库存”不是幂等的,重复执行就会多释放库存。

实现幂等的常见做法是使用唯一标识符:

public class InventoryService {

    public void release(String reservationId) {
        // 通过 reservationId 保证幂等
        Reservation reservation = reservationRepository.findById(reservationId);
        if (reservation == null || reservation.getStatus() == ReservationStatus.RELEASED) {
            // 已释放或不存在,直接返回
            return;
        }
        Inventory inv = inventoryRepository.findByProductId(reservation.getProductId());
        inv.setStock(inv.getStock() + reservation.getQuantity());
        reservation.setStatus(ReservationStatus.RELEASED);

        // 在同一个本地事务中更新库存和预留记录
        inventoryRepository.save(inv);
        reservationRepository.save(reservation);
    }
}

原则二:补偿操作必须能处理”前进操作未执行”的情况。

由于网络问题,协调者可能认为某一步执行失败,但实际上参与者还在执行中。协调者触发补偿时,前进操作可能尚未完成。补偿操作需要能够正确处理这种”空补偿”(Empty Compensation)场景。

public void cancelPayment(String paymentId) {
    Payment payment = paymentRepository.findById(paymentId);
    if (payment == null) {
        // 空补偿:支付请求从未到达,直接成功
        return;
    }
    if (payment.getStatus() == PaymentStatus.CANCELLED) {
        // 幂等:已取消,直接成功
        return;
    }
    payment.setStatus(PaymentStatus.CANCELLED);
    accountService.unfreeze(payment.getUserId(), payment.getAmount());
    paymentRepository.save(payment);
}

原则三:补偿操作不应该失败。

如果补偿操作本身也会失败,Saga 就陷入了”补偿失败”的困境。设计补偿操作时应尽量确保它们只涉及本地数据库操作,不依赖外部服务。

如果补偿确实可能失败(比如需要调用外部退款接口),需要一个重试机制。通常的做法是:

  1. 将补偿请求写入本地的”补偿任务表”。
  2. 一个后台调度器定期扫描未完成的补偿任务,重试执行。
  3. 设置最大重试次数和退避策略。
  4. 超过最大重试次数后,触发告警,转人工处理。
@Scheduled(fixedDelay = 5000)
public void retryFailedCompensations() {
    List<CompensationTask> pending = compensationTaskRepository
        .findByStatusAndRetryCountLessThan(
            CompensationStatus.PENDING, MAX_RETRIES);

    for (CompensationTask task : pending) {
        try {
            executeCompensation(task);
            task.setStatus(CompensationStatus.COMPLETED);
        } catch (Exception e) {
            task.setRetryCount(task.getRetryCount() + 1);
            task.setNextRetryAt(calculateBackoff(task.getRetryCount()));
            if (task.getRetryCount() >= MAX_RETRIES) {
                task.setStatus(CompensationStatus.FAILED);
                alertService.notify("Compensation failed permanently: " + task.getId());
            }
        }
        compensationTaskRepository.save(task);
    }
}

隔离性问题

Saga 没有隔离性保证。在 T1 执行到 T3 之间,其他 Saga 或普通事务可以看到 T1 和 T2 已提交但 T3 尚未完成的中间状态。这可能导致”脏读”——另一个事务读到了一个最终会被补偿掉的状态。

缓解方案有几种:

语义锁(Semantic Lock):在每个步骤中,不是直接修改数据,而是先标记为”处理中”状态。比如库存不是直接减少,而是先”预留”——库存总量不变,但可用库存减少。只有在 Saga 全部成功后,才将”预留”状态变为”已扣减”。

-- 预留阶段:不直接修改 stock,而是增加 reserved
UPDATE inventory
SET reserved = reserved + 1
WHERE product_id = 1001 AND (stock - reserved) >= 1;

-- 确认阶段:Saga 全部成功后
UPDATE inventory
SET stock = stock - 1, reserved = reserved - 1
WHERE product_id = 1001;

-- 补偿阶段:Saga 失败后
UPDATE inventory
SET reserved = reserved - 1
WHERE product_id = 1001;

交换律约束(Commutative Updates):如果步骤之间的操作满足交换律(即执行顺序不影响最终结果),隔离性问题就不存在。“加减库存”本质上是可交换的——两个 Saga 分别扣 1 和扣 2,无论谁先执行结果都是扣 3。但”设置库存为某个值”就不满足交换律。

版本号检查(Optimistic Concurrency):在补偿操作执行时检查版本号,确保补偿的是自己造成的修改。


四、TCC 模式

TCC 的工作机制

TCC(Try-Confirm-Cancel)由 Pat Helland 在 2007 年的论文 “Life beyond Distributed Transactions: an Apostate’s Opinion” 中阐述相关思想。TCC 将每个参与者的操作拆分为三个阶段:

  1. Try:预留资源。不真正执行业务操作,只是检查条件并锁定必要的资源。
  2. Confirm:确认执行。在 Try 成功的基础上,真正执行业务操作。Confirm 操作必须幂等。
  3. Cancel:取消预留。释放 Try 阶段锁定的资源。Cancel 操作必须幂等。

与 Saga 的关键区别在于:Saga 的每一步直接执行业务操作并提交,失败后通过补偿来”反转”效果;TCC 的 Try 阶段只预留资源不真正执行,Confirm 才真正执行。这意味着 TCC 在 Try 阶段就能发现问题,在任何真正的业务修改发生之前就可以取消,不需要像 Saga 那样处理”已提交的操作需要语义补偿”的复杂性。

订单处理的 TCC 实现

// 库存服务的 TCC 接口
public interface InventoryTccService {

    /**
     * Try:预留库存。
     * 冻结指定数量的库存,但不真正扣减。
     * 幂等:同一个 xid 重复调用返回成功。
     */
    boolean tryReserve(String xid, String productId, int quantity);

    /**
     * Confirm:确认扣减。
     *  Try 阶段冻结的库存真正扣减。
     * 幂等:同一个 xid 重复调用返回成功。
     */
    boolean confirm(String xid);

    /**
     * Cancel:释放预留。
     * 解冻 Try 阶段冻结的库存。
     * 幂等:同一个 xid 重复调用返回成功。
     */
    boolean cancel(String xid);
}
// 库存服务的 TCC 实现
public class InventoryTccServiceImpl implements InventoryTccService {

    @Override
    @Transactional
    public boolean tryReserve(String xid, String productId, int quantity) {
        // 幂等检查
        TccRecord record = tccRecordRepository.findByXid(xid);
        if (record != null) {
            return record.getStatus() != TccStatus.CANCELLED;
        }

        Inventory inv = inventoryRepository.findByProductId(productId);
        if (inv.getAvailable() < quantity) {
            return false;
        }

        // 冻结库存:available 减少,frozen 增加
        inv.setAvailable(inv.getAvailable() - quantity);
        inv.setFrozen(inv.getFrozen() + quantity);
        inventoryRepository.save(inv);

        // 记录 TCC 事务
        tccRecordRepository.save(new TccRecord(xid, productId, quantity, TccStatus.TRIED));
        return true;
    }

    @Override
    @Transactional
    public boolean confirm(String xid) {
        TccRecord record = tccRecordRepository.findByXid(xid);
        if (record == null) {
            return false;
        }
        if (record.getStatus() == TccStatus.CONFIRMED) {
            return true; // 幂等
        }

        Inventory inv = inventoryRepository.findByProductId(record.getProductId());
        // 真正扣减:frozen 减少
        inv.setFrozen(inv.getFrozen() - record.getQuantity());
        inventoryRepository.save(inv);

        record.setStatus(TccStatus.CONFIRMED);
        tccRecordRepository.save(record);
        return true;
    }

    @Override
    @Transactional
    public boolean cancel(String xid) {
        TccRecord record = tccRecordRepository.findByXid(xid);
        if (record == null) {
            return true; // 空取消,直接成功
        }
        if (record.getStatus() == TccStatus.CANCELLED) {
            return true; // 幂等
        }

        Inventory inv = inventoryRepository.findByProductId(record.getProductId());
        // 解冻:frozen 减少,available 恢复
        inv.setFrozen(inv.getFrozen() - record.getQuantity());
        inv.setAvailable(inv.getAvailable() + record.getQuantity());
        inventoryRepository.save(inv);

        record.setStatus(TccStatus.CANCELLED);
        tccRecordRepository.save(record);
        return true;
    }
}

Go 语言的等效实现,展示了 TCC 协调者的逻辑:

type TccCoordinator struct {
    inventorySvc InventoryTccService
    paymentSvc   PaymentTccService
    orderSvc     OrderTccService
}

func (c *TccCoordinator) ProcessOrder(ctx context.Context, cmd CreateOrderCmd) error {
    xid := uuid.New().String()

    // Phase 1: Try
    if err := c.orderSvc.TryCreate(ctx, xid, cmd); err != nil {
        return fmt.Errorf("order try failed: %w", err)
    }
    if err := c.inventorySvc.TryReserve(ctx, xid, cmd.ProductID, cmd.Quantity); err != nil {
        c.orderSvc.Cancel(ctx, xid)
        return fmt.Errorf("inventory try failed: %w", err)
    }
    if err := c.paymentSvc.TryFreeze(ctx, xid, cmd.UserID, cmd.Amount); err != nil {
        c.inventorySvc.Cancel(ctx, xid)
        c.orderSvc.Cancel(ctx, xid)
        return fmt.Errorf("payment try failed: %w", err)
    }

    // Phase 2: Confirm
    // 所有 Try 都成功后,执行 Confirm。
    // Confirm 失败需要重试(通常由框架保证)。
    if err := c.orderSvc.Confirm(ctx, xid); err != nil {
        log.Printf("order confirm failed, will retry: %v", err)
    }
    if err := c.inventorySvc.Confirm(ctx, xid); err != nil {
        log.Printf("inventory confirm failed, will retry: %v", err)
    }
    if err := c.paymentSvc.Confirm(ctx, xid); err != nil {
        log.Printf("payment confirm failed, will retry: %v", err)
    }

    return nil
}

TCC 的适用场景与局限

TCC 的最大优势是资源隔离。在 Try 阶段,资源被”冻结”而不是”消耗”。其他事务能看到的状态是”可用库存 = 总库存 - 已冻结”,不会出现 Saga 那种”库存已扣但可能被补偿回来”的不确定状态。

但 TCC 的代价也很高:

业务侵入性强:每个参与者必须把业务逻辑拆成 Try/Confirm/Cancel 三个方法。对于已有系统来说,改造成本很高。不是所有业务操作都能自然地拆成”预留-确认-取消”三步——比如”发送短信通知”,预留一条短信的概念就很牵强。

Try 阶段的资源锁定:Try 成功后 Confirm 之前,资源处于冻结状态。如果 Confirm 迟迟没来(协调者崩溃、网络超时),冻结的资源就被白白占用。需要一个超时释放机制:

// 定时任务:释放超时的 TCC 预留
@Scheduled(fixedDelay = 10000)
public void releaseExpiredReservations() {
    LocalDateTime cutoff = LocalDateTime.now().minusMinutes(5);
    List<TccRecord> expired = tccRecordRepository
        .findByStatusAndCreatedAtBefore(TccStatus.TRIED, cutoff);
    for (TccRecord record : expired) {
        cancel(record.getXid());
        log.warn("Released expired TCC reservation: {}", record.getXid());
    }
}

并发性能:在高并发场景下,大量 Try 操作会冻结大量资源。如果商品库存只有 100 件,同时有 200 个 Try 请求,其中 100 个会成功冻结、100 个会立即失败。而这 100 个冻结中可能有一部分最终会 Cancel(用户取消、支付失败等),造成资源浪费。


五、本地消息表模式

核心思路

本地消息表(Local Message Table)的核心想法很简单:在本地数据库事务中,同时完成业务操作和消息写入。消息不是直接发送到消息队列,而是先写入本地数据库的一张”消息表”。然后由一个后台进程定期扫描消息表,将未发送的消息投递到消息队列。

这种做法解决的关键问题是:业务操作和消息发送的原子性

如果先执行业务操作、再发送消息,可能业务操作成功但消息发送失败——下游服务永远不知道发生了什么。如果先发送消息、再执行业务操作,可能消息发送成功但业务操作失败——下游服务收到了一条不该存在的消息。

本地消息表把这两步合并到一个数据库事务中:

BEGIN;
-- 业务操作
UPDATE account SET balance = balance - 100 WHERE user_id = 42;

-- 写入消息表(同一个数据库)
INSERT INTO outbox_messages (id, topic, payload, status, created_at)
VALUES ('msg-001', 'payment.completed',
        '{"orderId": "ord-123", "userId": 42, "amount": 100}',
        'PENDING', NOW());

COMMIT;

两个操作在同一个本地事务中,要么都成功,要么都失败。

消息投递与重试

后台进程负责将消息表中的待发送消息投递到消息队列:

@Component
public class OutboxMessageRelay {

    private final OutboxMessageRepository messageRepository;
    private final MessageBroker messageBroker;

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void publishPendingMessages() {
        List<OutboxMessage> pending = messageRepository
            .findByStatusOrderByCreatedAtAsc(MessageStatus.PENDING, PageRequest.of(0, 100));

        for (OutboxMessage msg : pending) {
            try {
                messageBroker.publish(msg.getTopic(), msg.getPayload());
                msg.setStatus(MessageStatus.SENT);
                msg.setSentAt(LocalDateTime.now());
            } catch (Exception e) {
                msg.setRetryCount(msg.getRetryCount() + 1);
                if (msg.getRetryCount() > MAX_RETRIES) {
                    msg.setStatus(MessageStatus.FAILED);
                    log.error("Outbox message permanently failed: {}", msg.getId(), e);
                }
            }
            messageRepository.save(msg);
        }
    }
}

消费端的幂等保证

由于消息可能被重复投递(投递成功但更新状态失败,下次轮询会再次投递),消费端必须实现幂等处理:

@MessageHandler
public void handlePaymentCompleted(PaymentCompletedEvent event) {
    // 幂等检查:使用消息 ID 去重
    if (processedMessageRepository.existsById(event.getMessageId())) {
        log.info("Message already processed: {}", event.getMessageId());
        return;
    }

    // 在同一个事务中处理业务逻辑和记录消息 ID
    orderService.confirmPayment(event.getOrderId());
    processedMessageRepository.save(
        new ProcessedMessage(event.getMessageId(), LocalDateTime.now()));
}

本地消息表的优缺点

优点

缺点


六、事务发件箱 + CDC

从轮询到变更数据捕获

本地消息表的轮询模式有两个固有问题:延迟和数据库负载。事务发件箱模式(Transactional Outbox Pattern)与变更数据捕获(Change Data Capture,CDC)的结合解决了这两个问题。

核心思路不变——业务操作和消息写入在同一个本地事务中完成。但消息的投递方式从轮询改为 CDC:通过读取数据库的事务日志(如 MySQL 的 binlog、PostgreSQL 的 WAL),实时捕获 outbox 表的变更,然后将变更推送到消息队列。

业务服务                    数据库                      CDC 连接器              消息队列
  |                          |                           |                     |
  |--- BEGIN --------------->|                           |                     |
  |--- UPDATE business ----->|                           |                     |
  |--- INSERT outbox ------->|                           |                     |
  |--- COMMIT -------------->|                           |                     |
  |                          |-- binlog/WAL 变更 ------->|                     |
  |                          |                           |--- 发布消息 -------->|

Debezium 实现

Debezium 是目前最成熟的开源 CDC 工具,支持 MySQL、PostgreSQL、MongoDB、SQL Server 等数据库。它作为 Kafka Connect 的一个 Source Connector 运行,读取数据库的事务日志,将变更事件发送到 Kafka 主题。

Outbox 表的设计:

CREATE TABLE outbox (
    id            BIGINT PRIMARY KEY AUTO_INCREMENT,
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id   VARCHAR(255) NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    payload        JSON NOT NULL,
    created_at     TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

Debezium 提供了专门的 Outbox Event Router(SMT,Single Message Transform),可以将 outbox 表的行变更自动转换为结构化的事件消息:

{
    "name": "outbox-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "order-db",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "******",
        "database.server.id": "1",
        "database.server.name": "order-service",
        "table.include.list": "orders.outbox",
        "transforms": "outbox",
        "transforms.outbox.type":
            "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.table.field.event.id": "id",
        "transforms.outbox.table.field.event.key": "aggregate_id",
        "transforms.outbox.table.field.event.type": "event_type",
        "transforms.outbox.table.field.event.payload": "payload",
        "transforms.outbox.route.by.field": "aggregate_type",
        "transforms.outbox.route.topic.replacement":
            "outbox.event.${routedByValue}"
    }
}

配置完成后,每当业务事务向 outbox 表插入一行,Debezium 会在毫秒级别内捕获该变更,提取 payload,根据 aggregate_type 路由到对应的 Kafka 主题。比如 aggregate_type = "Order" 的事件会被发送到 outbox.event.Order 主题。

Outbox 表的清理

CDC 模式下,outbox 表的行在被 Debezium 捕获后就不再需要了。但不能简单地在业务事务中 DELETE——因为 DELETE 操作也会产生 binlog 事件,可能干扰 CDC 连接器。

推荐的做法是定期批量清理:

-- 清理 24 小时前的已处理消息
DELETE FROM outbox WHERE created_at < DATE_SUB(NOW(), INTERVAL 24 HOUR) LIMIT 10000;

分批删除(每次 10000 行)避免长事务锁表。间隔 24 小时是为了保留足够的窗口,以防 Debezium 连接器临时不可用后需要从断点恢复。

CDC 方案的优缺点

优点

缺点


七、Eventuate Tram 实现剖析

框架定位

Eventuate Tram 是 Chris Richardson(《Microservices Patterns》作者)开发的开源框架,专门解决微服务架构下的事务性消息传递和 Saga 编排问题。它的核心机制就是事务发件箱模式。

Eventuate Tram 的架构分三层:

  1. Tram 核心:提供事务性消息发送和接收的 API。
  2. Tram Sagas:在核心之上提供 Saga 编排的 DSL 和执行引擎。
  3. CDC 服务:独立部署的进程,读取数据库的事务日志,将 outbox 表中的消息投递到消息代理(Kafka 或 ActiveMQ)。

消息发送的内部流程

当业务代码调用 messageSender.send() 时,Eventuate Tram 不会直接发送到 Kafka,而是将消息插入本地数据库的 message 表:

// Eventuate Tram 的 MessageProducer 内部实现(简化版)
public class MessageProducerImpl implements MessageProducer {

    private final MessageRepository messageRepository;

    @Override
    @Transactional
    public void send(String destination, Message message) {
        message.setId(idGenerator.genId());
        message.setDestination(destination);
        message.setHeaders(enrichHeaders(message.getHeaders()));
        // 关键:消息写入本地数据库,与业务操作在同一个事务中
        messageRepository.save(toMessageEntity(message));
    }
}

message 表的 schema(以 MySQL 为例):

CREATE TABLE message (
    id            VARCHAR(767) PRIMARY KEY,
    destination   VARCHAR(1000) NOT NULL,
    headers       VARCHAR(1000) NOT NULL,
    payload       TEXT NOT NULL,
    published     SMALLINT DEFAULT 0,
    creation_time BIGINT
);

CDC 服务(eventuate-cdc-service)以独立进程运行,通过 MySQL binlog 或 PostgreSQL WAL 监听 message 表的变更。捕获到新插入的行后,将 payload 发送到 destination 指定的 Kafka 主题,然后将 published 标记为 1。

Saga 编排引擎

Eventuate Tram Sagas 提供了一个声明式的 DSL 来定义 Saga 的步骤:

public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {

    private final SagaDefinition<CreateOrderSagaData> sagaDefinition;

    public CreateOrderSaga(OrderServiceProxy orderService,
                           InventoryServiceProxy inventoryService,
                           PaymentServiceProxy paymentService) {
        this.sagaDefinition =
            step()
                .invokeLocal(this::createOrder)
                .withCompensation(this::rejectOrder)
            .step()
                .invokeParticipant(inventoryService.reserve)
                .withCompensation(inventoryService.release)
            .step()
                .invokeParticipant(paymentService.process)
                .withCompensation(paymentService.refund)
            .step()
                .invokeLocal(this::approveOrder)
            .build();
    }

    @Override
    public SagaDefinition<CreateOrderSagaData> getSagaDefinition() {
        return sagaDefinition;
    }

    private void createOrder(CreateOrderSagaData data) {
        // 创建订单,状态为 PENDING
        Order order = orderRepository.save(
            new Order(data.getUserId(), data.getProductId(),
                      data.getQuantity(), OrderStatus.PENDING));
        data.setOrderId(order.getId());
    }

    private void rejectOrder(CreateOrderSagaData data) {
        orderRepository.updateStatus(data.getOrderId(), OrderStatus.REJECTED);
    }

    private void approveOrder(CreateOrderSagaData data) {
        orderRepository.updateStatus(data.getOrderId(), OrderStatus.APPROVED);
    }
}

Saga 引擎维护了一个 saga_instance 表来跟踪每个 Saga 实例的状态:

CREATE TABLE saga_instance (
    saga_type        VARCHAR(255) NOT NULL,
    saga_id          VARCHAR(100) NOT NULL,
    state_name       VARCHAR(255) NOT NULL,
    last_request_id  VARCHAR(100),
    end_state        TINYINT(1),
    compensating     TINYINT(1),
    saga_data_type   VARCHAR(1000) NOT NULL,
    saga_data_json   TEXT NOT NULL,
    PRIMARY KEY (saga_type, saga_id)
);

每次状态转换,引擎都会更新 saga_instance 表并在同一个事务中写入命令消息到 message 表。这保证了 Saga 状态推进和命令发送的原子性。

参与者端的消息处理

参与者端收到命令消息后执行业务逻辑,然后发送回复消息:

public class InventoryCommandHandler {

    @CommandHandler
    public Message reserve(CommandMessage<ReserveInventoryCommand> cm) {
        ReserveInventoryCommand cmd = cm.getCommand();
        try {
            inventoryService.reserve(cmd.getProductId(), cmd.getQuantity());
            return withSuccess(new InventoryReserved(cmd.getProductId()));
        } catch (InsufficientInventoryException e) {
            return withFailure(new InventoryInsufficient(cmd.getProductId()));
        }
    }

    @CommandHandler
    public Message release(CommandMessage<ReleaseInventoryCommand> cm) {
        ReleaseInventoryCommand cmd = cm.getCommand();
        inventoryService.release(cmd.getProductId(), cmd.getQuantity());
        return withSuccess(new InventoryReleased(cmd.getProductId()));
    }
}

回复消息同样通过本地消息表发送,经 CDC 投递到 Kafka,最终被 Saga 引擎消费并驱动状态机的下一步。

Eventuate Tram 的架构特点

整个消息流的可靠性链条:

  1. 业务操作 + 消息写入 = 一个本地事务(原子性)。
  2. CDC 读取事务日志 = 至少一次投递(at-least-once)。
  3. 消费端幂等处理 = 重复消息不会导致错误。

最终效果是:只要数据库和 Kafka 正常运行,消息不会丢失也不会重复生效。延迟主要取决于 CDC 的轮询间隔(默认 500ms)和 Kafka 的消费延迟。


八、工程案例:电商订单处理系统

场景描述

一个中型电商平台,日均订单量 50 万笔,峰值 QPS 约 2000。系统拆分为以下微服务:

订单创建流程需要跨越前三个服务,通知是异步的、允许延迟。

方案选型过程

排除 2PC:三个服务使用不同的数据库(订单用 MySQL,库存用 Redis + MySQL,支付对接第三方),无法使用 XA 事务。即便能用,2PC 的性能在峰值 2000 QPS 下也不可接受。

评估 TCC:TCC 的资源隔离能力对库存和支付场景很合适——“冻结库存”和”冻结余额”是天然的 Try 操作。但支付服务对接第三方支付网关,第三方接口不支持 Try/Confirm/Cancel 语义。强行在中间加一层”预冻结”会引入额外的复杂性和延迟。

选择 Saga + Outbox:使用编排式 Saga 管理订单流程,使用事务发件箱模式保证消息可靠投递。

实现方案

Saga 协调者使用 Eventuate Tram Sagas 框架:

public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {

    private final SagaDefinition<CreateOrderSagaData> sagaDefinition;

    public CreateOrderSaga() {
        this.sagaDefinition =
            step()
                .invokeLocal(this::createPendingOrder)
                .withCompensation(this::rejectOrder)
            .step()
                .invokeParticipant(this::reserveInventory)
                .onReply(InventoryReserved.class, this::onInventoryReserved)
                .withCompensation(this::releaseInventory)
            .step()
                .invokeParticipant(this::requestPayment)
                .onReply(PaymentCompleted.class, this::onPaymentCompleted)
                .withCompensation(this::refundPayment)
            .step()
                .invokeLocal(this::approveOrder)
            .step()
                .invokeParticipant(this::sendNotification)
                // 通知不需要补偿——发了就发了
            .build();
    }

    private void createPendingOrder(CreateOrderSagaData data) {
        Order order = new Order();
        order.setUserId(data.getUserId());
        order.setProductId(data.getProductId());
        order.setQuantity(data.getQuantity());
        order.setAmount(data.getAmount());
        order.setStatus(OrderStatus.PENDING);
        order = orderRepository.save(order);
        data.setOrderId(order.getId());
    }

    private CommandWithDestination reserveInventory(CreateOrderSagaData data) {
        return send(new ReserveInventoryCommand(
                data.getOrderId(), data.getProductId(), data.getQuantity()))
            .to("inventoryService")
            .build();
    }

    private CommandWithDestination releaseInventory(CreateOrderSagaData data) {
        return send(new ReleaseInventoryCommand(
                data.getOrderId(), data.getProductId(), data.getQuantity()))
            .to("inventoryService")
            .build();
    }

    private CommandWithDestination requestPayment(CreateOrderSagaData data) {
        return send(new ProcessPaymentCommand(
                data.getOrderId(), data.getUserId(), data.getAmount()))
            .to("paymentService")
            .build();
    }

    private CommandWithDestination refundPayment(CreateOrderSagaData data) {
        return send(new RefundPaymentCommand(
                data.getOrderId(), data.getPaymentId()))
            .to("paymentService")
            .build();
    }

    private void approveOrder(CreateOrderSagaData data) {
        orderRepository.updateStatus(data.getOrderId(), OrderStatus.APPROVED);
    }

    private void rejectOrder(CreateOrderSagaData data) {
        orderRepository.updateStatus(data.getOrderId(), OrderStatus.REJECTED);
    }

    private void onInventoryReserved(CreateOrderSagaData data,
                                      InventoryReserved reply) {
        data.setReservationId(reply.getReservationId());
    }

    private void onPaymentCompleted(CreateOrderSagaData data,
                                     PaymentCompleted reply) {
        data.setPaymentId(reply.getPaymentId());
    }

    private CommandWithDestination sendNotification(CreateOrderSagaData data) {
        return send(new SendOrderConfirmationCommand(
                data.getOrderId(), data.getUserId()))
            .to("notificationService")
            .build();
    }
}

支付补偿的特殊处理

支付服务对接第三方支付网关,退款接口可能需要几秒到几分钟才能返回结果。补偿操作不能同步等待退款完成。

public class PaymentCommandHandler {

    @CommandHandler
    public Message refund(CommandMessage<RefundPaymentCommand> cm) {
        RefundPaymentCommand cmd = cm.getCommand();
        Payment payment = paymentRepository.findById(cmd.getPaymentId());

        if (payment == null || payment.getStatus() == PaymentStatus.REFUNDED) {
            return withSuccess(new PaymentRefunded(cmd.getPaymentId()));
        }

        // 发起异步退款请求
        String refundId = paymentGateway.initiateRefund(payment.getTransactionId(),
                                                         payment.getAmount());
        payment.setStatus(PaymentStatus.REFUND_PENDING);
        payment.setRefundId(refundId);
        paymentRepository.save(payment);

        // 立即返回成功——退款结果由回调处理
        return withSuccess(new PaymentRefundInitiated(cmd.getPaymentId(), refundId));
    }

    // 第三方支付网关的退款回调
    @PostMapping("/webhook/refund")
    public void handleRefundCallback(@RequestBody RefundCallbackRequest req) {
        Payment payment = paymentRepository.findByRefundId(req.getRefundId());
        if ("SUCCESS".equals(req.getStatus())) {
            payment.setStatus(PaymentStatus.REFUNDED);
        } else {
            payment.setStatus(PaymentStatus.REFUND_FAILED);
            alertService.notify("Refund failed: " + req.getRefundId());
        }
        paymentRepository.save(payment);
    }
}

这里有一个设计决策:Saga 的补偿步骤在发起退款请求后就返回成功,而不是等待退款真正完成。这是因为 Saga 协调者需要继续执行后续的补偿步骤(释放库存、取消订单),不能卡在退款上。退款的最终结果由异步回调处理,如果退款失败则触发告警转人工。


九、各模式的选型决策框架

对比表

维度 2PC Saga TCC 本地消息表 Outbox + CDC
一致性级别 强一致 最终一致 最终一致 最终一致 最终一致
性能影响 高(同步阻塞) 中(Try 锁定资源)
可用性 低(协调者单点)
业务侵入性 中(需写补偿逻辑) 高(需拆分三阶段)
隔离性 完整 ACID 隔离 无隔离(需额外设计) 有资源冻结隔离 无隔离 无隔离
实现复杂度 低(数据库支持) 中(需 CDC 基础设施)
运维复杂度 高(CDC + 消息队列)
适用事务时长 短事务 长短均可 短事务 异步消息传递 异步消息传递
跨语言支持 取决于数据库
回滚精确度 精确(物理回滚) 语义补偿 精确(Cancel) 不涉及回滚 不涉及回滚

决策树

选型可以按以下思路逐步排除:

第一步:是否需要强一致性? 如果业务绝对不能容忍任何时间窗口的不一致(如金融核心账务系统、证券交易清算),考虑 2PC 或数据库级别的方案。但要清楚代价:性能下降和可用性风险。

第二步:参与者是否支持 Try/Confirm/Cancel 语义? 如果所有参与者的操作都能自然地拆分为资源预留和确认两步,且对隔离性有明确要求,TCC 是好选择。如果有参与者无法支持(如第三方接口、发送通知),排除 TCC。

第三步:跨服务操作的步骤数量? 如果只涉及两个服务之间的异步消息传递(A 做完通知 B),本地消息表或 Outbox + CDC 就够了。如果涉及多个服务的有序操作和补偿逻辑,需要 Saga。

第四步:是否已有 CDC 基础设施? 如果系统已经在使用 Debezium 或类似工具做数据同步,Outbox + CDC 的增量成本很低。如果没有,引入 CDC 技术栈的运维成本需要认真评估。

第五步:Saga 的编排方式? 步骤少于 4 个、依赖关系简单用协调式;步骤多于 3 个、有条件分支用编排式。

混合使用

实际系统中,多种模式经常混合使用。上面的电商案例中:

一种常见的反模式是”全局统一用一种方案”。比如把所有跨服务操作都用 Saga 包装,包括那些只需要简单异步通知的场景。过度设计的一致性方案会引入不必要的复杂性和运维负担。


十、补偿设计的工程实践

补偿日志

无论使用哪种模式,补偿操作的执行记录必须持久化:

CREATE TABLE compensation_log (
    id              BIGINT PRIMARY KEY AUTO_INCREMENT,
    saga_id         VARCHAR(100) NOT NULL,
    step_name       VARCHAR(100) NOT NULL,
    action          VARCHAR(20) NOT NULL,  -- 'FORWARD' or 'COMPENSATE'
    status          VARCHAR(20) NOT NULL,  -- 'SUCCESS', 'FAILED', 'PENDING'
    request_payload TEXT,
    response_payload TEXT,
    retry_count     INT DEFAULT 0,
    created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_saga_id (saga_id),
    INDEX idx_status (status)
);

这张表有两个作用:

  1. 故障恢复:协调者重启后,通过补偿日志恢复 Saga 的状态,继续执行未完成的补偿。
  2. 审计追踪:出现数据不一致时,通过补偿日志追溯每一步操作的执行情况和结果。

监控与告警

需要监控的关键指标:

// Go 语言的 Saga 监控指标
var (
    sagaStarted = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "saga_started_total",
        Help: "Total number of sagas started",
    })
    sagaCompleted = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "saga_completed_total",
        Help: "Total number of sagas completed successfully",
    })
    sagaCompensated = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "saga_compensated_total",
        Help: "Total number of sagas that triggered compensation",
    })
    sagaFailed = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "saga_failed_total",
        Help: "Total number of sagas that failed permanently",
    })
    sagaDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "saga_duration_seconds",
        Help:    "Duration of saga execution",
        Buckets: prometheus.ExponentialBuckets(0.01, 2, 15),
    })
    compensationRetries = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "compensation_retries",
        Help:    "Number of retries before compensation succeeds",
        Buckets: prometheus.LinearBuckets(0, 1, 10),
    })
)

关键告警规则:

人工介入机制

当自动补偿失败后,系统需要提供人工介入的手段:

// 管理后台接口:手动触发 Saga 补偿或强制完成
@RestController
@RequestMapping("/admin/sagas")
public class SagaAdminController {

    @PostMapping("/{sagaId}/retry-compensation")
    public ResponseEntity<?> retryCompensation(@PathVariable String sagaId) {
        SagaInstance saga = sagaRepository.findById(sagaId);
        if (saga == null) {
            return ResponseEntity.notFound().build();
        }
        if (!saga.isCompensating()) {
            return ResponseEntity.badRequest()
                .body("Saga is not in compensating state");
        }
        sagaEngine.retryCompensation(sagaId);
        return ResponseEntity.ok("Compensation retry initiated");
    }

    @PostMapping("/{sagaId}/force-complete")
    public ResponseEntity<?> forceComplete(@PathVariable String sagaId) {
        // 强制标记 Saga 为完成状态
        // 仅在人工确认数据已手动修复后使用
        SagaInstance saga = sagaRepository.findById(sagaId);
        saga.setEndState(true);
        saga.setCompensating(false);
        sagaRepository.save(saga);
        auditLog.record("SAGA_FORCE_COMPLETE", sagaId,
            SecurityContextHolder.getContext().getAuthentication().getName());
        return ResponseEntity.ok("Saga force completed");
    }
}

测试策略

Saga 和 TCC 的测试需要覆盖三类场景:

  1. 正常路径:所有步骤成功完成。
  2. 每一步失败的补偿路径:步骤 1 失败、步骤 2 失败、步骤 3 失败——每种情况的补偿序列是否正确。
  3. 异常场景:网络超时、重复消息、空补偿、并发冲突。
@Test
void sagaShouldCompensateWhenPaymentFails() {
    // Given
    when(inventoryService.reserve(any(), anyInt()))
        .thenReturn(new InventoryReserved("res-1"));
    when(paymentService.process(any(), any(), any()))
        .thenThrow(new PaymentFailedException("Insufficient balance"));

    // When
    sagaOrchestrator.execute(new CreateOrderCommand("user-1", "prod-1", 1, 99.00));

    // Then
    // 验证补偿操作按逆序执行
    InOrder inOrder = inOrder(inventoryService, orderService);
    inOrder.verify(inventoryService).release("prod-1", 1);
    inOrder.verify(orderService).cancelOrder(anyString());

    // 验证订单状态
    Order order = orderRepository.findByUserId("user-1");
    assertEquals(OrderStatus.REJECTED, order.getStatus());
}

@Test
void compensationShouldBeIdempotent() {
    // Given:模拟补偿被执行两次
    inventoryService.reserve("res-1", "prod-1", 1);

    // When:第一次补偿
    inventoryService.release("res-1");
    int stockAfterFirst = inventoryRepository.findByProductId("prod-1").getStock();

    // When:第二次补偿(重复)
    inventoryService.release("res-1");
    int stockAfterSecond = inventoryRepository.findByProductId("prod-1").getStock();

    // Then:库存只释放了一次
    assertEquals(stockAfterFirst, stockAfterSecond);
}

十一、常见陷阱与应对

陷阱一:忽略消息顺序

在协调式 Saga 中,事件通过消息队列传递。如果消息队列不保证顺序(或消费者有多个实例并行消费),可能出现补偿事件先于正向事件到达的情况。

比如:库存服务还没收到 OrderCreated 事件,就先收到了 PaymentFailed 需要释放库存——但它从来没有预留过库存。

应对方案:消费端对”未知预留”的释放请求做空操作处理,或者使用消息队列的分区机制保证同一个 orderId 的消息被同一个消费者顺序处理。

陷阱二:补偿和正向操作并发执行

在编排式 Saga 中,协调者向参与者发送正向命令后等待回复。如果网络超时,协调者可能判定步骤失败并发起补偿。但实际上参与者正在执行正向操作,补偿命令和正向操作可能并发执行。

应对方案:参与者端使用状态机管理操作状态。只有在”已完成”状态下才接受补偿,在”进行中”状态下拒绝补偿(让协调者稍后重试),或者使用悲观锁/乐观锁防止并发。

@Transactional
public void handleCompensation(String operationId) {
    OperationRecord record = operationRepository
        .findByIdForUpdate(operationId); // SELECT ... FOR UPDATE
    if (record == null) {
        // 正向操作尚未开始,记录一个"预补偿"标记
        operationRepository.save(new OperationRecord(operationId,
            OperationStatus.PRE_COMPENSATED));
        return;
    }
    if (record.getStatus() == OperationStatus.IN_PROGRESS) {
        throw new ConcurrentOperationException(
            "Operation in progress, retry later");
    }
    // 正向操作已完成,执行补偿
    doCompensate(record);
}

陷阱三:不必要的 Saga

并非所有跨服务操作都需要 Saga。如果操作之间没有依赖关系,可以用更简单的方案:

陷阱四:Outbox 表膨胀

在高吞吐系统中,outbox 表的增长速度可能很快。日均 50 万笔订单,每笔订单产生 3-5 条 outbox 消息,一天就是 150-250 万行。如果不及时清理,表会快速膨胀,影响写入性能和 CDC 连接器的效率。

应对方案:

-- 使用分区表,按天分区
CREATE TABLE outbox (
    id            BIGINT NOT NULL AUTO_INCREMENT,
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id   VARCHAR(255) NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    payload        JSON NOT NULL,
    created_at     TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)) (
    PARTITION p20260413 VALUES LESS THAN (UNIX_TIMESTAMP('2026-04-14')),
    PARTITION p20260414 VALUES LESS THAN (UNIX_TIMESTAMP('2026-04-15')),
    PARTITION pmax VALUES LESS THAN MAXVALUE
);

-- 清理:直接 DROP 旧分区,比 DELETE 快几个数量级
ALTER TABLE outbox DROP PARTITION p20260411;

十二、总结

应用层一致性模式的本质是一种工程权衡:在分布式环境下放弃全局事务的便利性,换取性能和可用性。

几个核心判断:

选型时不要追求”一种方案解决所有问题”。真实系统中,不同的业务流程可能需要不同的一致性方案,混合使用才是常态。关键是理解每种模式的代价和边界,在正确性和性能之间找到自己系统的平衡点。


导航

上一篇: 数据迁移与版本化

下一篇: API 设计


参考资料

论文

书籍

开源项目

规范与设计文档

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。

2026-04-13 · architecture

【系统架构设计百科】复杂性管理:架构的核心战场

系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略


By .