一个电商系统要处理一笔订单,涉及三个服务:订单服务扣减库存、支付服务冻结余额、物流服务预约发货。三个服务各有各的数据库。如果扣减库存成功了,冻结余额也成功了,但预约发货失败了——系统应该怎么办?
在单体应用里,一个数据库事务就能解决:三步操作要么全部提交,要么全部回滚。但在微服务架构下,三个服务的数据分散在三个数据库里,没有一个全局的事务管理器能同时控制三个数据源。传统的两阶段提交(2PC)协议理论上可以做到,但它的性能代价和可用性风险在高并发场景下几乎不可接受。
这就是分布式事务的本质困境:跨服务的数据一致性不能再依赖数据库层面的原子性保证,必须在应用层自己设计一致性方案。
Saga、TCC、本地消息表、事务发件箱——这些都是应用层的一致性模式。它们的共同点是放弃了强一致性(Strong Consistency),转而追求最终一致性(Eventual Consistency)。它们的区别在于:补偿的粒度、隔离性的保证程度、对业务逻辑的侵入程度、以及实现和运维的复杂度。
这篇文章要回答一个核心问题:这些模式各自适用什么场景,选型的依据是什么?
在 上一篇 中我们讨论了数据迁移与版本化的问题。数据迁移关注的是数据模式的演进,而本文关注的是跨服务数据操作的一致性保障——两个问题经常在同一个系统中同时出现。
一、分布式事务的本质困境
单体事务为什么简单
在单体应用中,一个数据库事务提供四个保证:原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)——即
ACID。这四个保证由数据库引擎内部实现,应用代码只需要
BEGIN、执行 SQL、COMMIT 或
ROLLBACK。
BEGIN;
UPDATE inventory SET stock = stock - 1 WHERE product_id = 1001;
UPDATE account SET balance = balance - 99.00 WHERE user_id = 42;
INSERT INTO orders (user_id, product_id, amount) VALUES (42, 1001, 99.00);
COMMIT;如果中间任何一步失败,ROLLBACK
会把所有修改撤销。应用开发者不需要自己写”撤销扣减库存”或”退回余额”的逻辑。
2PC 的工作机制
当数据分散在多个数据库时,需要一个协议来协调多个数据源的提交。两阶段提交(Two-Phase Commit,2PC)是经典方案。
2PC 分两个阶段:
- 准备阶段(Prepare):协调者(Coordinator)向所有参与者(Participant)发送
PREPARE请求。每个参与者执行本地事务但不提交,将结果写入预写日志(WAL),然后回复YES或NO。 - 提交阶段(Commit):如果所有参与者都回复
YES,协调者发送COMMIT;否则发送ABORT。参与者收到指令后执行提交或回滚。
协调者 参与者A 参与者B
|--- PREPARE ----------->| |
|--- PREPARE --------------------------->|
|<-- YES -----------------| |
|<-- YES -------------------------------|
|--- COMMIT ------------->| |
|--- COMMIT ----------------------------->|
|<-- ACK ------------------| |
|<-- ACK --------------------------------|
2PC 的三个致命问题
同步阻塞:在准备阶段,参与者持有本地事务锁。如果协调者在发送
COMMIT
之前崩溃,所有参与者都会处于”已准备但未提交”的状态,持有的锁无法释放。在这个窗口期内,其他事务无法访问被锁住的数据。在高并发系统中,这种阻塞会迅速级联放大。
单点故障:协调者是单点。虽然可以通过主备复制来提高协调者的可用性,但复制本身又引入了一致性问题——协调者的状态如果丢失或不一致,参与者就无法知道应该提交还是回滚。
网络分区下的困境:如果协调者和某个参与者之间的网络中断,协调者无法确认该参与者的状态。三阶段提交(3PC)试图通过增加一个
PRE-COMMIT
阶段来缓解这个问题,但在异步网络模型中,3PC
仍然无法完全避免不一致(FLP 不可能定理的推论)。
性能数据
从工程实践看,2PC 的性能代价是显著的。一个 2PC 事务的延迟通常是本地事务的 5-10 倍:准备阶段需要一次网络往返加磁盘 fsync,提交阶段又需要一次。假设单次网络往返 1ms、磁盘 fsync 5ms,一个两参与者的 2PC 事务至少需要 (1 + 5) x 2 = 12ms 的额外开销。在每秒处理上万笔订单的系统中,这种开销是不可接受的。
更关键的是可用性。如果参与者 A 正常但参与者 B 宕机了,在 2PC 中整个事务会被阻塞。但在业务层面,我们可能完全可以接受”先下单,稍后重试支付”的方案——这就是应用层一致性模式要解决的问题。
从强一致到最终一致
放弃 2PC 意味着放弃跨服务的强一致性。取而代之的是最终一致性——系统在某个时间窗口内可能处于不一致状态,但最终会收敛到一致。
这个取舍的前提是业务允许。电商场景中,用户下单后余额还没扣减,但 30 秒后扣减完成——这在业务上是可以接受的。但如果是银行转账,一方已经扣款但另一方还没到账,这个窗口的长度和处理方式就需要非常谨慎地设计。
下面逐一分析四种主要的应用层一致性模式。
二、Saga 模式详解
Saga 的起源
Saga 由 Hector Garcia-Molina 和 Kenneth Salem 在 1987 年的论文 “Sagas”(ACM SIGMOD)中提出。论文的原始动机是解决长事务(Long-Lived Transaction)的问题——当一个事务持续几分钟甚至几小时,一直持有数据库锁是不现实的。
Saga 的核心思路:把一个长事务拆成一系列短事务(称为”步骤”或”子事务”),每个步骤独立提交。如果中间某一步失败,不做全局回滚,而是按逆序执行前面每一步的”补偿操作”(Compensating Transaction),把系统恢复到初始状态。
形式化地说,一个 Saga 由 n 个步骤组成:T1, T2, …, Tn。每个步骤 Ti 有一个对应的补偿操作 Ci。如果 Tk 失败(1 < k <= n),系统按逆序执行 C(k-1), C(k-2), …, C1。
编排(Orchestration)vs 协调(Choreography)
Saga 有两种实现方式,区别在于”谁来决定下一步做什么”。
协调式 Saga(Choreography)
没有中央协调者。每个服务在完成自己的本地事务后,发布一个领域事件(Domain Event)。其他服务订阅感兴趣的事件,触发自己的本地事务。
以订单处理为例:
- 订单服务创建订单,发布
OrderCreated事件。 - 库存服务订阅
OrderCreated,扣减库存,发布InventoryReserved事件。 - 支付服务订阅
InventoryReserved,冻结余额,发布PaymentProcessed事件。 - 订单服务订阅
PaymentProcessed,确认订单。
如果支付失败,支付服务发布 PaymentFailed
事件。库存服务订阅该事件,执行补偿操作释放库存。订单服务订阅该事件,将订单标记为失败。
// 库存服务的事件处理器
@EventHandler
public void on(OrderCreated event) {
Inventory inv = inventoryRepository.findByProductId(event.getProductId());
if (inv.getStock() < event.getQuantity()) {
eventBus.publish(new InventoryInsufficient(event.getOrderId()));
return;
}
inv.reserve(event.getQuantity());
inventoryRepository.save(inv);
eventBus.publish(new InventoryReserved(event.getOrderId(), event.getProductId()));
}
@EventHandler
public void on(PaymentFailed event) {
// 补偿操作:释放已预留的库存
Inventory inv = inventoryRepository.findByProductId(event.getProductId());
inv.release(event.getQuantity());
inventoryRepository.save(inv);
eventBus.publish(new InventoryReleased(event.getOrderId()));
}优点:服务之间完全解耦,每个服务只关心自己订阅的事件。没有单点故障。
缺点:随着步骤增多,事件链变得难以追踪。一个 Saga 涉及 5 个服务时,理解”订单失败后会发生什么”需要在 5 个服务之间跳来跳去看代码。补偿逻辑散落在各个服务中,没有一个地方能看到 Saga 的全局流程。测试困难——你无法在单个服务中测试完整的 Saga 流程。
编排式 Saga(Orchestration)
有一个中央协调者(Orchestrator),负责告诉每个参与者”现在该做什么”。协调者维护 Saga 的状态机,根据参与者的响应决定下一步是继续执行还是执行补偿。
stateDiagram-v2
[*] --> 创建订单
创建订单 --> 预留库存: 订单已创建
预留库存 --> 处理支付: 库存已预留
处理支付 --> 确认订单: 支付成功
确认订单 --> [*]: 订单完成
处理支付 --> 释放库存: 支付失败
释放库存 --> 取消订单: 库存已释放
取消订单 --> [*]: 订单已取消
预留库存 --> 取消订单: 库存不足
下面是一个编排式 Saga 协调者的 Java 实现:
public class OrderSagaOrchestrator {
private final OrderService orderService;
private final InventoryService inventoryService;
private final PaymentService paymentService;
public enum SagaState {
STARTED, INVENTORY_RESERVED, PAYMENT_PROCESSED,
ORDER_CONFIRMED, COMPENSATING, FAILED
}
public void execute(CreateOrderCommand cmd) {
SagaState state = SagaState.STARTED;
String sagaId = UUID.randomUUID().toString();
try {
// 步骤 1:创建订单
Order order = orderService.createOrder(cmd);
sagaLog.record(sagaId, "ORDER_CREATED", order.getId());
// 步骤 2:预留库存
inventoryService.reserve(order.getProductId(), order.getQuantity());
sagaLog.record(sagaId, "INVENTORY_RESERVED", order.getId());
state = SagaState.INVENTORY_RESERVED;
// 步骤 3:处理支付
paymentService.processPayment(order.getUserId(), order.getAmount());
sagaLog.record(sagaId, "PAYMENT_PROCESSED", order.getId());
state = SagaState.PAYMENT_PROCESSED;
// 步骤 4:确认订单
orderService.confirmOrder(order.getId());
sagaLog.record(sagaId, "ORDER_CONFIRMED", order.getId());
state = SagaState.ORDER_CONFIRMED;
} catch (InventoryInsufficientException e) {
compensate(sagaId, state, cmd);
} catch (PaymentFailedException e) {
compensate(sagaId, state, cmd);
}
}
private void compensate(String sagaId, SagaState failedAt, CreateOrderCommand cmd) {
sagaLog.record(sagaId, "COMPENSATING", failedAt.name());
switch (failedAt) {
case PAYMENT_PROCESSED:
// 不应该到这里,支付成功后不会触发补偿
break;
case INVENTORY_RESERVED:
// 支付失败,回滚库存
inventoryService.release(cmd.getProductId(), cmd.getQuantity());
sagaLog.record(sagaId, "INVENTORY_RELEASED", "");
// fall through
case STARTED:
// 取消订单
orderService.cancelOrder(cmd.getOrderId());
sagaLog.record(sagaId, "ORDER_CANCELLED", "");
break;
}
}
}优点:Saga 的完整流程在协调者中一目了然。补偿逻辑集中管理。添加新步骤时只需修改协调者,不需要在多个服务之间添加事件订阅。便于测试——可以对协调者做完整的单元测试。
缺点:协调者本身成为一个需要高可用保障的组件。服务之间的耦合从”事件耦合”变成了”协调者耦合”——协调者直接调用每个服务的接口,知道所有服务的存在。
编排与协调的选型判断
| 维度 | 协调式(Choreography) | 编排式(Orchestration) |
|---|---|---|
| 参与者数量 | 适合 2-4 个服务 | 适合 4 个以上服务 |
| 流程可见性 | 低,需要在多个服务间跳转 | 高,集中在协调者 |
| 耦合方式 | 事件耦合,松散 | 接口耦合,紧密 |
| 单点风险 | 无中央单点 | 协调者是潜在单点 |
| 测试难度 | 高,需要集成测试 | 中等,可单元测试协调者 |
| 适用场景 | 简单流程、服务团队独立 | 复杂流程、需要统一管理 |
经验判断:如果 Saga 涉及的步骤不超过 3 个、服务之间的依赖关系是线性的,协调式就够了。一旦步骤超过 3 个、或者步骤之间有分支和条件逻辑,编排式几乎是唯一可维护的选择。
三、Saga 的补偿机制设计
补偿是 Saga 的核心难点。和数据库的 ROLLBACK
不同,Saga 的补偿是语义层面的(Semantic
Compensation),不是物理层面的撤销。
语义补偿 vs 物理回滚
数据库回滚是精确的物理操作:undo log 记录了每个数据页的修改前镜像,回滚时精确恢复。但 Saga 的每一步都已经提交了,无法从数据库层面撤销。
补偿操作需要在业务语义上”抵消”前一步的效果。“扣减库存”的补偿不是”回滚扣减”,而是”释放已预留的库存”。“冻结余额”的补偿不是”撤销冻结”,而是”解冻余额”。这两件事在代码实现上可能完全不同。
更麻烦的是,在步骤 Ti 执行和补偿 Ci 执行之间,可能已经有其他事务修改了同一条数据。比如:库存服务扣减了商品 A 的库存,在补偿执行之前,另一个订单也扣减了商品 A 的库存。这时候”释放库存”不能简单地加回原来的数量,而是需要精确地只释放这一笔订单预留的数量。
补偿操作的设计原则
原则一:补偿操作必须幂等。
网络抖动、消息重复投递、协调者重试——都可能导致补偿操作被执行多次。如果”释放库存”不是幂等的,重复执行就会多释放库存。
实现幂等的常见做法是使用唯一标识符:
public class InventoryService {
public void release(String reservationId) {
// 通过 reservationId 保证幂等
Reservation reservation = reservationRepository.findById(reservationId);
if (reservation == null || reservation.getStatus() == ReservationStatus.RELEASED) {
// 已释放或不存在,直接返回
return;
}
Inventory inv = inventoryRepository.findByProductId(reservation.getProductId());
inv.setStock(inv.getStock() + reservation.getQuantity());
reservation.setStatus(ReservationStatus.RELEASED);
// 在同一个本地事务中更新库存和预留记录
inventoryRepository.save(inv);
reservationRepository.save(reservation);
}
}原则二:补偿操作必须能处理”前进操作未执行”的情况。
由于网络问题,协调者可能认为某一步执行失败,但实际上参与者还在执行中。协调者触发补偿时,前进操作可能尚未完成。补偿操作需要能够正确处理这种”空补偿”(Empty Compensation)场景。
public void cancelPayment(String paymentId) {
Payment payment = paymentRepository.findById(paymentId);
if (payment == null) {
// 空补偿:支付请求从未到达,直接成功
return;
}
if (payment.getStatus() == PaymentStatus.CANCELLED) {
// 幂等:已取消,直接成功
return;
}
payment.setStatus(PaymentStatus.CANCELLED);
accountService.unfreeze(payment.getUserId(), payment.getAmount());
paymentRepository.save(payment);
}原则三:补偿操作不应该失败。
如果补偿操作本身也会失败,Saga 就陷入了”补偿失败”的困境。设计补偿操作时应尽量确保它们只涉及本地数据库操作,不依赖外部服务。
如果补偿确实可能失败(比如需要调用外部退款接口),需要一个重试机制。通常的做法是:
- 将补偿请求写入本地的”补偿任务表”。
- 一个后台调度器定期扫描未完成的补偿任务,重试执行。
- 设置最大重试次数和退避策略。
- 超过最大重试次数后,触发告警,转人工处理。
@Scheduled(fixedDelay = 5000)
public void retryFailedCompensations() {
List<CompensationTask> pending = compensationTaskRepository
.findByStatusAndRetryCountLessThan(
CompensationStatus.PENDING, MAX_RETRIES);
for (CompensationTask task : pending) {
try {
executeCompensation(task);
task.setStatus(CompensationStatus.COMPLETED);
} catch (Exception e) {
task.setRetryCount(task.getRetryCount() + 1);
task.setNextRetryAt(calculateBackoff(task.getRetryCount()));
if (task.getRetryCount() >= MAX_RETRIES) {
task.setStatus(CompensationStatus.FAILED);
alertService.notify("Compensation failed permanently: " + task.getId());
}
}
compensationTaskRepository.save(task);
}
}隔离性问题
Saga 没有隔离性保证。在 T1 执行到 T3 之间,其他 Saga 或普通事务可以看到 T1 和 T2 已提交但 T3 尚未完成的中间状态。这可能导致”脏读”——另一个事务读到了一个最终会被补偿掉的状态。
缓解方案有几种:
语义锁(Semantic Lock):在每个步骤中,不是直接修改数据,而是先标记为”处理中”状态。比如库存不是直接减少,而是先”预留”——库存总量不变,但可用库存减少。只有在 Saga 全部成功后,才将”预留”状态变为”已扣减”。
-- 预留阶段:不直接修改 stock,而是增加 reserved
UPDATE inventory
SET reserved = reserved + 1
WHERE product_id = 1001 AND (stock - reserved) >= 1;
-- 确认阶段:Saga 全部成功后
UPDATE inventory
SET stock = stock - 1, reserved = reserved - 1
WHERE product_id = 1001;
-- 补偿阶段:Saga 失败后
UPDATE inventory
SET reserved = reserved - 1
WHERE product_id = 1001;交换律约束(Commutative Updates):如果步骤之间的操作满足交换律(即执行顺序不影响最终结果),隔离性问题就不存在。“加减库存”本质上是可交换的——两个 Saga 分别扣 1 和扣 2,无论谁先执行结果都是扣 3。但”设置库存为某个值”就不满足交换律。
版本号检查(Optimistic Concurrency):在补偿操作执行时检查版本号,确保补偿的是自己造成的修改。
四、TCC 模式
TCC 的工作机制
TCC(Try-Confirm-Cancel)由 Pat Helland 在 2007 年的论文 “Life beyond Distributed Transactions: an Apostate’s Opinion” 中阐述相关思想。TCC 将每个参与者的操作拆分为三个阶段:
- Try:预留资源。不真正执行业务操作,只是检查条件并锁定必要的资源。
- Confirm:确认执行。在 Try 成功的基础上,真正执行业务操作。Confirm 操作必须幂等。
- Cancel:取消预留。释放 Try 阶段锁定的资源。Cancel 操作必须幂等。
与 Saga 的关键区别在于:Saga 的每一步直接执行业务操作并提交,失败后通过补偿来”反转”效果;TCC 的 Try 阶段只预留资源不真正执行,Confirm 才真正执行。这意味着 TCC 在 Try 阶段就能发现问题,在任何真正的业务修改发生之前就可以取消,不需要像 Saga 那样处理”已提交的操作需要语义补偿”的复杂性。
订单处理的 TCC 实现
// 库存服务的 TCC 接口
public interface InventoryTccService {
/**
* Try:预留库存。
* 冻结指定数量的库存,但不真正扣减。
* 幂等:同一个 xid 重复调用返回成功。
*/
boolean tryReserve(String xid, String productId, int quantity);
/**
* Confirm:确认扣减。
* 将 Try 阶段冻结的库存真正扣减。
* 幂等:同一个 xid 重复调用返回成功。
*/
boolean confirm(String xid);
/**
* Cancel:释放预留。
* 解冻 Try 阶段冻结的库存。
* 幂等:同一个 xid 重复调用返回成功。
*/
boolean cancel(String xid);
}// 库存服务的 TCC 实现
public class InventoryTccServiceImpl implements InventoryTccService {
@Override
@Transactional
public boolean tryReserve(String xid, String productId, int quantity) {
// 幂等检查
TccRecord record = tccRecordRepository.findByXid(xid);
if (record != null) {
return record.getStatus() != TccStatus.CANCELLED;
}
Inventory inv = inventoryRepository.findByProductId(productId);
if (inv.getAvailable() < quantity) {
return false;
}
// 冻结库存:available 减少,frozen 增加
inv.setAvailable(inv.getAvailable() - quantity);
inv.setFrozen(inv.getFrozen() + quantity);
inventoryRepository.save(inv);
// 记录 TCC 事务
tccRecordRepository.save(new TccRecord(xid, productId, quantity, TccStatus.TRIED));
return true;
}
@Override
@Transactional
public boolean confirm(String xid) {
TccRecord record = tccRecordRepository.findByXid(xid);
if (record == null) {
return false;
}
if (record.getStatus() == TccStatus.CONFIRMED) {
return true; // 幂等
}
Inventory inv = inventoryRepository.findByProductId(record.getProductId());
// 真正扣减:frozen 减少
inv.setFrozen(inv.getFrozen() - record.getQuantity());
inventoryRepository.save(inv);
record.setStatus(TccStatus.CONFIRMED);
tccRecordRepository.save(record);
return true;
}
@Override
@Transactional
public boolean cancel(String xid) {
TccRecord record = tccRecordRepository.findByXid(xid);
if (record == null) {
return true; // 空取消,直接成功
}
if (record.getStatus() == TccStatus.CANCELLED) {
return true; // 幂等
}
Inventory inv = inventoryRepository.findByProductId(record.getProductId());
// 解冻:frozen 减少,available 恢复
inv.setFrozen(inv.getFrozen() - record.getQuantity());
inv.setAvailable(inv.getAvailable() + record.getQuantity());
inventoryRepository.save(inv);
record.setStatus(TccStatus.CANCELLED);
tccRecordRepository.save(record);
return true;
}
}Go 语言的等效实现,展示了 TCC 协调者的逻辑:
type TccCoordinator struct {
inventorySvc InventoryTccService
paymentSvc PaymentTccService
orderSvc OrderTccService
}
func (c *TccCoordinator) ProcessOrder(ctx context.Context, cmd CreateOrderCmd) error {
xid := uuid.New().String()
// Phase 1: Try
if err := c.orderSvc.TryCreate(ctx, xid, cmd); err != nil {
return fmt.Errorf("order try failed: %w", err)
}
if err := c.inventorySvc.TryReserve(ctx, xid, cmd.ProductID, cmd.Quantity); err != nil {
c.orderSvc.Cancel(ctx, xid)
return fmt.Errorf("inventory try failed: %w", err)
}
if err := c.paymentSvc.TryFreeze(ctx, xid, cmd.UserID, cmd.Amount); err != nil {
c.inventorySvc.Cancel(ctx, xid)
c.orderSvc.Cancel(ctx, xid)
return fmt.Errorf("payment try failed: %w", err)
}
// Phase 2: Confirm
// 所有 Try 都成功后,执行 Confirm。
// Confirm 失败需要重试(通常由框架保证)。
if err := c.orderSvc.Confirm(ctx, xid); err != nil {
log.Printf("order confirm failed, will retry: %v", err)
}
if err := c.inventorySvc.Confirm(ctx, xid); err != nil {
log.Printf("inventory confirm failed, will retry: %v", err)
}
if err := c.paymentSvc.Confirm(ctx, xid); err != nil {
log.Printf("payment confirm failed, will retry: %v", err)
}
return nil
}TCC 的适用场景与局限
TCC 的最大优势是资源隔离。在 Try 阶段,资源被”冻结”而不是”消耗”。其他事务能看到的状态是”可用库存 = 总库存 - 已冻结”,不会出现 Saga 那种”库存已扣但可能被补偿回来”的不确定状态。
但 TCC 的代价也很高:
业务侵入性强:每个参与者必须把业务逻辑拆成 Try/Confirm/Cancel 三个方法。对于已有系统来说,改造成本很高。不是所有业务操作都能自然地拆成”预留-确认-取消”三步——比如”发送短信通知”,预留一条短信的概念就很牵强。
Try 阶段的资源锁定:Try 成功后 Confirm 之前,资源处于冻结状态。如果 Confirm 迟迟没来(协调者崩溃、网络超时),冻结的资源就被白白占用。需要一个超时释放机制:
// 定时任务:释放超时的 TCC 预留
@Scheduled(fixedDelay = 10000)
public void releaseExpiredReservations() {
LocalDateTime cutoff = LocalDateTime.now().minusMinutes(5);
List<TccRecord> expired = tccRecordRepository
.findByStatusAndCreatedAtBefore(TccStatus.TRIED, cutoff);
for (TccRecord record : expired) {
cancel(record.getXid());
log.warn("Released expired TCC reservation: {}", record.getXid());
}
}并发性能:在高并发场景下,大量 Try 操作会冻结大量资源。如果商品库存只有 100 件,同时有 200 个 Try 请求,其中 100 个会成功冻结、100 个会立即失败。而这 100 个冻结中可能有一部分最终会 Cancel(用户取消、支付失败等),造成资源浪费。
五、本地消息表模式
核心思路
本地消息表(Local Message Table)的核心想法很简单:在本地数据库事务中,同时完成业务操作和消息写入。消息不是直接发送到消息队列,而是先写入本地数据库的一张”消息表”。然后由一个后台进程定期扫描消息表,将未发送的消息投递到消息队列。
这种做法解决的关键问题是:业务操作和消息发送的原子性。
如果先执行业务操作、再发送消息,可能业务操作成功但消息发送失败——下游服务永远不知道发生了什么。如果先发送消息、再执行业务操作,可能消息发送成功但业务操作失败——下游服务收到了一条不该存在的消息。
本地消息表把这两步合并到一个数据库事务中:
BEGIN;
-- 业务操作
UPDATE account SET balance = balance - 100 WHERE user_id = 42;
-- 写入消息表(同一个数据库)
INSERT INTO outbox_messages (id, topic, payload, status, created_at)
VALUES ('msg-001', 'payment.completed',
'{"orderId": "ord-123", "userId": 42, "amount": 100}',
'PENDING', NOW());
COMMIT;两个操作在同一个本地事务中,要么都成功,要么都失败。
消息投递与重试
后台进程负责将消息表中的待发送消息投递到消息队列:
@Component
public class OutboxMessageRelay {
private final OutboxMessageRepository messageRepository;
private final MessageBroker messageBroker;
@Scheduled(fixedDelay = 1000)
@Transactional
public void publishPendingMessages() {
List<OutboxMessage> pending = messageRepository
.findByStatusOrderByCreatedAtAsc(MessageStatus.PENDING, PageRequest.of(0, 100));
for (OutboxMessage msg : pending) {
try {
messageBroker.publish(msg.getTopic(), msg.getPayload());
msg.setStatus(MessageStatus.SENT);
msg.setSentAt(LocalDateTime.now());
} catch (Exception e) {
msg.setRetryCount(msg.getRetryCount() + 1);
if (msg.getRetryCount() > MAX_RETRIES) {
msg.setStatus(MessageStatus.FAILED);
log.error("Outbox message permanently failed: {}", msg.getId(), e);
}
}
messageRepository.save(msg);
}
}
}消费端的幂等保证
由于消息可能被重复投递(投递成功但更新状态失败,下次轮询会再次投递),消费端必须实现幂等处理:
@MessageHandler
public void handlePaymentCompleted(PaymentCompletedEvent event) {
// 幂等检查:使用消息 ID 去重
if (processedMessageRepository.existsById(event.getMessageId())) {
log.info("Message already processed: {}", event.getMessageId());
return;
}
// 在同一个事务中处理业务逻辑和记录消息 ID
orderService.confirmPayment(event.getOrderId());
processedMessageRepository.save(
new ProcessedMessage(event.getMessageId(), LocalDateTime.now()));
}本地消息表的优缺点
优点:
- 实现简单,不需要额外的中间件。
- 业务操作和消息写入的原子性由本地数据库事务保证,可靠性高。
- 对业务代码的侵入性低——只需要在业务事务中额外插入一条消息记录。
缺点:
- 消息表与业务表在同一个数据库中,会增加数据库的写负载。在高吞吐场景下,消息表可能成为性能瓶颈。
- 轮询模式(Polling)有固有延迟。轮询间隔设置为 1 秒意味着消息最多有 1 秒的延迟。间隔设置太短又会增加数据库压力。
- 消息表会持续增长,需要定期清理已发送的消息记录。
六、事务发件箱 + CDC
从轮询到变更数据捕获
本地消息表的轮询模式有两个固有问题:延迟和数据库负载。事务发件箱模式(Transactional Outbox Pattern)与变更数据捕获(Change Data Capture,CDC)的结合解决了这两个问题。
核心思路不变——业务操作和消息写入在同一个本地事务中完成。但消息的投递方式从轮询改为 CDC:通过读取数据库的事务日志(如 MySQL 的 binlog、PostgreSQL 的 WAL),实时捕获 outbox 表的变更,然后将变更推送到消息队列。
业务服务 数据库 CDC 连接器 消息队列
| | | |
|--- BEGIN --------------->| | |
|--- UPDATE business ----->| | |
|--- INSERT outbox ------->| | |
|--- COMMIT -------------->| | |
| |-- binlog/WAL 变更 ------->| |
| | |--- 发布消息 -------->|
Debezium 实现
Debezium 是目前最成熟的开源 CDC 工具,支持 MySQL、PostgreSQL、MongoDB、SQL Server 等数据库。它作为 Kafka Connect 的一个 Source Connector 运行,读取数据库的事务日志,将变更事件发送到 Kafka 主题。
Outbox 表的设计:
CREATE TABLE outbox (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSON NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);Debezium 提供了专门的 Outbox Event Router(SMT,Single Message Transform),可以将 outbox 表的行变更自动转换为结构化的事件消息:
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "order-db",
"database.port": "3306",
"database.user": "debezium",
"database.password": "******",
"database.server.id": "1",
"database.server.name": "order-service",
"table.include.list": "orders.outbox",
"transforms": "outbox",
"transforms.outbox.type":
"io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement":
"outbox.event.${routedByValue}"
}
}配置完成后,每当业务事务向 outbox 表插入一行,Debezium
会在毫秒级别内捕获该变更,提取 payload,根据
aggregate_type 路由到对应的 Kafka 主题。比如
aggregate_type = "Order" 的事件会被发送到
outbox.event.Order 主题。
Outbox 表的清理
CDC 模式下,outbox 表的行在被 Debezium
捕获后就不再需要了。但不能简单地在业务事务中
DELETE——因为 DELETE 操作也会产生 binlog
事件,可能干扰 CDC 连接器。
推荐的做法是定期批量清理:
-- 清理 24 小时前的已处理消息
DELETE FROM outbox WHERE created_at < DATE_SUB(NOW(), INTERVAL 24 HOUR) LIMIT 10000;分批删除(每次 10000 行)避免长事务锁表。间隔 24 小时是为了保留足够的窗口,以防 Debezium 连接器临时不可用后需要从断点恢复。
CDC 方案的优缺点
优点:
- 接近实时的消息投递,延迟通常在毫秒级别。
- 不需要轮询数据库,消除了轮询带来的负载。
- 利用数据库已有的事务日志机制,不需要修改业务代码。
缺点:
- 引入了 CDC 工具(Debezium)和消息队列(Kafka)的运维复杂度。
- CDC 连接器本身需要高可用保障——如果连接器宕机,消息会延迟。
- 数据库的事务日志格式和 CDC 连接器的版本需要匹配,升级数据库时可能遇到兼容性问题。
- 对数据库有配置要求:MySQL 需要开启 binlog 且格式为
ROW,PostgreSQL 需要设置
wal_level = logical。
七、Eventuate Tram 实现剖析
框架定位
Eventuate Tram 是 Chris Richardson(《Microservices Patterns》作者)开发的开源框架,专门解决微服务架构下的事务性消息传递和 Saga 编排问题。它的核心机制就是事务发件箱模式。
Eventuate Tram 的架构分三层:
- Tram 核心:提供事务性消息发送和接收的 API。
- Tram Sagas:在核心之上提供 Saga 编排的 DSL 和执行引擎。
- CDC 服务:独立部署的进程,读取数据库的事务日志,将 outbox 表中的消息投递到消息代理(Kafka 或 ActiveMQ)。
消息发送的内部流程
当业务代码调用 messageSender.send()
时,Eventuate Tram 不会直接发送到
Kafka,而是将消息插入本地数据库的 message
表:
// Eventuate Tram 的 MessageProducer 内部实现(简化版)
public class MessageProducerImpl implements MessageProducer {
private final MessageRepository messageRepository;
@Override
@Transactional
public void send(String destination, Message message) {
message.setId(idGenerator.genId());
message.setDestination(destination);
message.setHeaders(enrichHeaders(message.getHeaders()));
// 关键:消息写入本地数据库,与业务操作在同一个事务中
messageRepository.save(toMessageEntity(message));
}
}message 表的 schema(以 MySQL 为例):
CREATE TABLE message (
id VARCHAR(767) PRIMARY KEY,
destination VARCHAR(1000) NOT NULL,
headers VARCHAR(1000) NOT NULL,
payload TEXT NOT NULL,
published SMALLINT DEFAULT 0,
creation_time BIGINT
);CDC
服务(eventuate-cdc-service)以独立进程运行,通过
MySQL binlog 或 PostgreSQL WAL 监听 message
表的变更。捕获到新插入的行后,将 payload 发送到
destination 指定的 Kafka 主题,然后将
published 标记为 1。
Saga 编排引擎
Eventuate Tram Sagas 提供了一个声明式的 DSL 来定义 Saga 的步骤:
public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {
private final SagaDefinition<CreateOrderSagaData> sagaDefinition;
public CreateOrderSaga(OrderServiceProxy orderService,
InventoryServiceProxy inventoryService,
PaymentServiceProxy paymentService) {
this.sagaDefinition =
step()
.invokeLocal(this::createOrder)
.withCompensation(this::rejectOrder)
.step()
.invokeParticipant(inventoryService.reserve)
.withCompensation(inventoryService.release)
.step()
.invokeParticipant(paymentService.process)
.withCompensation(paymentService.refund)
.step()
.invokeLocal(this::approveOrder)
.build();
}
@Override
public SagaDefinition<CreateOrderSagaData> getSagaDefinition() {
return sagaDefinition;
}
private void createOrder(CreateOrderSagaData data) {
// 创建订单,状态为 PENDING
Order order = orderRepository.save(
new Order(data.getUserId(), data.getProductId(),
data.getQuantity(), OrderStatus.PENDING));
data.setOrderId(order.getId());
}
private void rejectOrder(CreateOrderSagaData data) {
orderRepository.updateStatus(data.getOrderId(), OrderStatus.REJECTED);
}
private void approveOrder(CreateOrderSagaData data) {
orderRepository.updateStatus(data.getOrderId(), OrderStatus.APPROVED);
}
}Saga 引擎维护了一个 saga_instance
表来跟踪每个 Saga 实例的状态:
CREATE TABLE saga_instance (
saga_type VARCHAR(255) NOT NULL,
saga_id VARCHAR(100) NOT NULL,
state_name VARCHAR(255) NOT NULL,
last_request_id VARCHAR(100),
end_state TINYINT(1),
compensating TINYINT(1),
saga_data_type VARCHAR(1000) NOT NULL,
saga_data_json TEXT NOT NULL,
PRIMARY KEY (saga_type, saga_id)
);每次状态转换,引擎都会更新 saga_instance
表并在同一个事务中写入命令消息到 message
表。这保证了 Saga 状态推进和命令发送的原子性。
参与者端的消息处理
参与者端收到命令消息后执行业务逻辑,然后发送回复消息:
public class InventoryCommandHandler {
@CommandHandler
public Message reserve(CommandMessage<ReserveInventoryCommand> cm) {
ReserveInventoryCommand cmd = cm.getCommand();
try {
inventoryService.reserve(cmd.getProductId(), cmd.getQuantity());
return withSuccess(new InventoryReserved(cmd.getProductId()));
} catch (InsufficientInventoryException e) {
return withFailure(new InventoryInsufficient(cmd.getProductId()));
}
}
@CommandHandler
public Message release(CommandMessage<ReleaseInventoryCommand> cm) {
ReleaseInventoryCommand cmd = cm.getCommand();
inventoryService.release(cmd.getProductId(), cmd.getQuantity());
return withSuccess(new InventoryReleased(cmd.getProductId()));
}
}回复消息同样通过本地消息表发送,经 CDC 投递到 Kafka,最终被 Saga 引擎消费并驱动状态机的下一步。
Eventuate Tram 的架构特点
整个消息流的可靠性链条:
- 业务操作 + 消息写入 = 一个本地事务(原子性)。
- CDC 读取事务日志 = 至少一次投递(at-least-once)。
- 消费端幂等处理 = 重复消息不会导致错误。
最终效果是:只要数据库和 Kafka 正常运行,消息不会丢失也不会重复生效。延迟主要取决于 CDC 的轮询间隔(默认 500ms)和 Kafka 的消费延迟。
八、工程案例:电商订单处理系统
场景描述
一个中型电商平台,日均订单量 50 万笔,峰值 QPS 约 2000。系统拆分为以下微服务:
- 订单服务(Order Service):管理订单生命周期。
- 库存服务(Inventory Service):管理商品库存。
- 支付服务(Payment Service):处理支付和退款。
- 通知服务(Notification Service):发送短信和邮件通知。
订单创建流程需要跨越前三个服务,通知是异步的、允许延迟。
方案选型过程
排除 2PC:三个服务使用不同的数据库(订单用 MySQL,库存用 Redis + MySQL,支付对接第三方),无法使用 XA 事务。即便能用,2PC 的性能在峰值 2000 QPS 下也不可接受。
评估 TCC:TCC 的资源隔离能力对库存和支付场景很合适——“冻结库存”和”冻结余额”是天然的 Try 操作。但支付服务对接第三方支付网关,第三方接口不支持 Try/Confirm/Cancel 语义。强行在中间加一层”预冻结”会引入额外的复杂性和延迟。
选择 Saga + Outbox:使用编排式 Saga 管理订单流程,使用事务发件箱模式保证消息可靠投递。
实现方案
Saga 协调者使用 Eventuate Tram Sagas 框架:
public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {
private final SagaDefinition<CreateOrderSagaData> sagaDefinition;
public CreateOrderSaga() {
this.sagaDefinition =
step()
.invokeLocal(this::createPendingOrder)
.withCompensation(this::rejectOrder)
.step()
.invokeParticipant(this::reserveInventory)
.onReply(InventoryReserved.class, this::onInventoryReserved)
.withCompensation(this::releaseInventory)
.step()
.invokeParticipant(this::requestPayment)
.onReply(PaymentCompleted.class, this::onPaymentCompleted)
.withCompensation(this::refundPayment)
.step()
.invokeLocal(this::approveOrder)
.step()
.invokeParticipant(this::sendNotification)
// 通知不需要补偿——发了就发了
.build();
}
private void createPendingOrder(CreateOrderSagaData data) {
Order order = new Order();
order.setUserId(data.getUserId());
order.setProductId(data.getProductId());
order.setQuantity(data.getQuantity());
order.setAmount(data.getAmount());
order.setStatus(OrderStatus.PENDING);
order = orderRepository.save(order);
data.setOrderId(order.getId());
}
private CommandWithDestination reserveInventory(CreateOrderSagaData data) {
return send(new ReserveInventoryCommand(
data.getOrderId(), data.getProductId(), data.getQuantity()))
.to("inventoryService")
.build();
}
private CommandWithDestination releaseInventory(CreateOrderSagaData data) {
return send(new ReleaseInventoryCommand(
data.getOrderId(), data.getProductId(), data.getQuantity()))
.to("inventoryService")
.build();
}
private CommandWithDestination requestPayment(CreateOrderSagaData data) {
return send(new ProcessPaymentCommand(
data.getOrderId(), data.getUserId(), data.getAmount()))
.to("paymentService")
.build();
}
private CommandWithDestination refundPayment(CreateOrderSagaData data) {
return send(new RefundPaymentCommand(
data.getOrderId(), data.getPaymentId()))
.to("paymentService")
.build();
}
private void approveOrder(CreateOrderSagaData data) {
orderRepository.updateStatus(data.getOrderId(), OrderStatus.APPROVED);
}
private void rejectOrder(CreateOrderSagaData data) {
orderRepository.updateStatus(data.getOrderId(), OrderStatus.REJECTED);
}
private void onInventoryReserved(CreateOrderSagaData data,
InventoryReserved reply) {
data.setReservationId(reply.getReservationId());
}
private void onPaymentCompleted(CreateOrderSagaData data,
PaymentCompleted reply) {
data.setPaymentId(reply.getPaymentId());
}
private CommandWithDestination sendNotification(CreateOrderSagaData data) {
return send(new SendOrderConfirmationCommand(
data.getOrderId(), data.getUserId()))
.to("notificationService")
.build();
}
}支付补偿的特殊处理
支付服务对接第三方支付网关,退款接口可能需要几秒到几分钟才能返回结果。补偿操作不能同步等待退款完成。
public class PaymentCommandHandler {
@CommandHandler
public Message refund(CommandMessage<RefundPaymentCommand> cm) {
RefundPaymentCommand cmd = cm.getCommand();
Payment payment = paymentRepository.findById(cmd.getPaymentId());
if (payment == null || payment.getStatus() == PaymentStatus.REFUNDED) {
return withSuccess(new PaymentRefunded(cmd.getPaymentId()));
}
// 发起异步退款请求
String refundId = paymentGateway.initiateRefund(payment.getTransactionId(),
payment.getAmount());
payment.setStatus(PaymentStatus.REFUND_PENDING);
payment.setRefundId(refundId);
paymentRepository.save(payment);
// 立即返回成功——退款结果由回调处理
return withSuccess(new PaymentRefundInitiated(cmd.getPaymentId(), refundId));
}
// 第三方支付网关的退款回调
@PostMapping("/webhook/refund")
public void handleRefundCallback(@RequestBody RefundCallbackRequest req) {
Payment payment = paymentRepository.findByRefundId(req.getRefundId());
if ("SUCCESS".equals(req.getStatus())) {
payment.setStatus(PaymentStatus.REFUNDED);
} else {
payment.setStatus(PaymentStatus.REFUND_FAILED);
alertService.notify("Refund failed: " + req.getRefundId());
}
paymentRepository.save(payment);
}
}这里有一个设计决策:Saga 的补偿步骤在发起退款请求后就返回成功,而不是等待退款真正完成。这是因为 Saga 协调者需要继续执行后续的补偿步骤(释放库存、取消订单),不能卡在退款上。退款的最终结果由异步回调处理,如果退款失败则触发告警转人工。
九、各模式的选型决策框架
对比表
| 维度 | 2PC | Saga | TCC | 本地消息表 | Outbox + CDC |
|---|---|---|---|---|---|
| 一致性级别 | 强一致 | 最终一致 | 最终一致 | 最终一致 | 最终一致 |
| 性能影响 | 高(同步阻塞) | 低 | 中(Try 锁定资源) | 低 | 低 |
| 可用性 | 低(协调者单点) | 高 | 中 | 高 | 高 |
| 业务侵入性 | 低 | 中(需写补偿逻辑) | 高(需拆分三阶段) | 低 | 低 |
| 隔离性 | 完整 ACID 隔离 | 无隔离(需额外设计) | 有资源冻结隔离 | 无隔离 | 无隔离 |
| 实现复杂度 | 低(数据库支持) | 中 | 高 | 低 | 中(需 CDC 基础设施) |
| 运维复杂度 | 中 | 中 | 高 | 低 | 高(CDC + 消息队列) |
| 适用事务时长 | 短事务 | 长短均可 | 短事务 | 异步消息传递 | 异步消息传递 |
| 跨语言支持 | 取决于数据库 | 好 | 好 | 好 | 好 |
| 回滚精确度 | 精确(物理回滚) | 语义补偿 | 精确(Cancel) | 不涉及回滚 | 不涉及回滚 |
决策树
选型可以按以下思路逐步排除:
第一步:是否需要强一致性? 如果业务绝对不能容忍任何时间窗口的不一致(如金融核心账务系统、证券交易清算),考虑 2PC 或数据库级别的方案。但要清楚代价:性能下降和可用性风险。
第二步:参与者是否支持 Try/Confirm/Cancel 语义? 如果所有参与者的操作都能自然地拆分为资源预留和确认两步,且对隔离性有明确要求,TCC 是好选择。如果有参与者无法支持(如第三方接口、发送通知),排除 TCC。
第三步:跨服务操作的步骤数量? 如果只涉及两个服务之间的异步消息传递(A 做完通知 B),本地消息表或 Outbox + CDC 就够了。如果涉及多个服务的有序操作和补偿逻辑,需要 Saga。
第四步:是否已有 CDC 基础设施? 如果系统已经在使用 Debezium 或类似工具做数据同步,Outbox + CDC 的增量成本很低。如果没有,引入 CDC 技术栈的运维成本需要认真评估。
第五步:Saga 的编排方式? 步骤少于 4 个、依赖关系简单用协调式;步骤多于 3 个、有条件分支用编排式。
混合使用
实际系统中,多种模式经常混合使用。上面的电商案例中:
- 订单、库存、支付之间使用编排式 Saga。
- Saga 的消息传递通过 Outbox + CDC 保证可靠性。
- 通知服务作为 Saga 的最后一步,不需要补偿——失败了就进入重试队列。
- 库存服务内部使用语义锁(预留/确认/释放)来提供隔离性,这实质上是 TCC 的 Try/Confirm/Cancel 思想。
一种常见的反模式是”全局统一用一种方案”。比如把所有跨服务操作都用 Saga 包装,包括那些只需要简单异步通知的场景。过度设计的一致性方案会引入不必要的复杂性和运维负担。
十、补偿设计的工程实践
补偿日志
无论使用哪种模式,补偿操作的执行记录必须持久化:
CREATE TABLE compensation_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
saga_id VARCHAR(100) NOT NULL,
step_name VARCHAR(100) NOT NULL,
action VARCHAR(20) NOT NULL, -- 'FORWARD' or 'COMPENSATE'
status VARCHAR(20) NOT NULL, -- 'SUCCESS', 'FAILED', 'PENDING'
request_payload TEXT,
response_payload TEXT,
retry_count INT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_saga_id (saga_id),
INDEX idx_status (status)
);这张表有两个作用:
- 故障恢复:协调者重启后,通过补偿日志恢复 Saga 的状态,继续执行未完成的补偿。
- 审计追踪:出现数据不一致时,通过补偿日志追溯每一步操作的执行情况和结果。
监控与告警
需要监控的关键指标:
// Go 语言的 Saga 监控指标
var (
sagaStarted = prometheus.NewCounter(prometheus.CounterOpts{
Name: "saga_started_total",
Help: "Total number of sagas started",
})
sagaCompleted = prometheus.NewCounter(prometheus.CounterOpts{
Name: "saga_completed_total",
Help: "Total number of sagas completed successfully",
})
sagaCompensated = prometheus.NewCounter(prometheus.CounterOpts{
Name: "saga_compensated_total",
Help: "Total number of sagas that triggered compensation",
})
sagaFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "saga_failed_total",
Help: "Total number of sagas that failed permanently",
})
sagaDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "saga_duration_seconds",
Help: "Duration of saga execution",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 15),
})
compensationRetries = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "compensation_retries",
Help: "Number of retries before compensation succeeds",
Buckets: prometheus.LinearBuckets(0, 1, 10),
})
)关键告警规则:
saga_failed_total持续增长:有 Saga 无法完成补偿,需要人工介入。saga_duration_secondsP99 超过阈值:Saga 执行变慢,可能某个参与者响应变慢。compensation_retries中位数超过 3:补偿操作不稳定,需要检查参与者的健康状况。- Outbox 表中
status = PENDING且created_at超过 5 分钟的记录数 > 0:CDC 可能停止工作。
人工介入机制
当自动补偿失败后,系统需要提供人工介入的手段:
// 管理后台接口:手动触发 Saga 补偿或强制完成
@RestController
@RequestMapping("/admin/sagas")
public class SagaAdminController {
@PostMapping("/{sagaId}/retry-compensation")
public ResponseEntity<?> retryCompensation(@PathVariable String sagaId) {
SagaInstance saga = sagaRepository.findById(sagaId);
if (saga == null) {
return ResponseEntity.notFound().build();
}
if (!saga.isCompensating()) {
return ResponseEntity.badRequest()
.body("Saga is not in compensating state");
}
sagaEngine.retryCompensation(sagaId);
return ResponseEntity.ok("Compensation retry initiated");
}
@PostMapping("/{sagaId}/force-complete")
public ResponseEntity<?> forceComplete(@PathVariable String sagaId) {
// 强制标记 Saga 为完成状态
// 仅在人工确认数据已手动修复后使用
SagaInstance saga = sagaRepository.findById(sagaId);
saga.setEndState(true);
saga.setCompensating(false);
sagaRepository.save(saga);
auditLog.record("SAGA_FORCE_COMPLETE", sagaId,
SecurityContextHolder.getContext().getAuthentication().getName());
return ResponseEntity.ok("Saga force completed");
}
}测试策略
Saga 和 TCC 的测试需要覆盖三类场景:
- 正常路径:所有步骤成功完成。
- 每一步失败的补偿路径:步骤 1 失败、步骤 2 失败、步骤 3 失败——每种情况的补偿序列是否正确。
- 异常场景:网络超时、重复消息、空补偿、并发冲突。
@Test
void sagaShouldCompensateWhenPaymentFails() {
// Given
when(inventoryService.reserve(any(), anyInt()))
.thenReturn(new InventoryReserved("res-1"));
when(paymentService.process(any(), any(), any()))
.thenThrow(new PaymentFailedException("Insufficient balance"));
// When
sagaOrchestrator.execute(new CreateOrderCommand("user-1", "prod-1", 1, 99.00));
// Then
// 验证补偿操作按逆序执行
InOrder inOrder = inOrder(inventoryService, orderService);
inOrder.verify(inventoryService).release("prod-1", 1);
inOrder.verify(orderService).cancelOrder(anyString());
// 验证订单状态
Order order = orderRepository.findByUserId("user-1");
assertEquals(OrderStatus.REJECTED, order.getStatus());
}
@Test
void compensationShouldBeIdempotent() {
// Given:模拟补偿被执行两次
inventoryService.reserve("res-1", "prod-1", 1);
// When:第一次补偿
inventoryService.release("res-1");
int stockAfterFirst = inventoryRepository.findByProductId("prod-1").getStock();
// When:第二次补偿(重复)
inventoryService.release("res-1");
int stockAfterSecond = inventoryRepository.findByProductId("prod-1").getStock();
// Then:库存只释放了一次
assertEquals(stockAfterFirst, stockAfterSecond);
}十一、常见陷阱与应对
陷阱一:忽略消息顺序
在协调式 Saga 中,事件通过消息队列传递。如果消息队列不保证顺序(或消费者有多个实例并行消费),可能出现补偿事件先于正向事件到达的情况。
比如:库存服务还没收到 OrderCreated
事件,就先收到了 PaymentFailed
需要释放库存——但它从来没有预留过库存。
应对方案:消费端对”未知预留”的释放请求做空操作处理,或者使用消息队列的分区机制保证同一个
orderId 的消息被同一个消费者顺序处理。
陷阱二:补偿和正向操作并发执行
在编排式 Saga 中,协调者向参与者发送正向命令后等待回复。如果网络超时,协调者可能判定步骤失败并发起补偿。但实际上参与者正在执行正向操作,补偿命令和正向操作可能并发执行。
应对方案:参与者端使用状态机管理操作状态。只有在”已完成”状态下才接受补偿,在”进行中”状态下拒绝补偿(让协调者稍后重试),或者使用悲观锁/乐观锁防止并发。
@Transactional
public void handleCompensation(String operationId) {
OperationRecord record = operationRepository
.findByIdForUpdate(operationId); // SELECT ... FOR UPDATE
if (record == null) {
// 正向操作尚未开始,记录一个"预补偿"标记
operationRepository.save(new OperationRecord(operationId,
OperationStatus.PRE_COMPENSATED));
return;
}
if (record.getStatus() == OperationStatus.IN_PROGRESS) {
throw new ConcurrentOperationException(
"Operation in progress, retry later");
}
// 正向操作已完成,执行补偿
doCompensate(record);
}陷阱三:不必要的 Saga
并非所有跨服务操作都需要 Saga。如果操作之间没有依赖关系,可以用更简单的方案:
- 独立重试:每个操作独立执行,失败后单独重试,不需要全局补偿。适合”最多一次”或”至少一次”语义即可的场景。
- 查询对账:定期对比两个服务的数据,发现不一致后修复。适合允许较长不一致窗口的场景(如积分系统、统计系统)。
- 幂等接口 + 重试:上游直接调用下游的幂等接口,失败后重试。适合只有两个服务、无需第三方协调的场景。
陷阱四:Outbox 表膨胀
在高吞吐系统中,outbox 表的增长速度可能很快。日均 50 万笔订单,每笔订单产生 3-5 条 outbox 消息,一天就是 150-250 万行。如果不及时清理,表会快速膨胀,影响写入性能和 CDC 连接器的效率。
应对方案:
-- 使用分区表,按天分区
CREATE TABLE outbox (
id BIGINT NOT NULL AUTO_INCREMENT,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSON NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)) (
PARTITION p20260413 VALUES LESS THAN (UNIX_TIMESTAMP('2026-04-14')),
PARTITION p20260414 VALUES LESS THAN (UNIX_TIMESTAMP('2026-04-15')),
PARTITION pmax VALUES LESS THAN MAXVALUE
);
-- 清理:直接 DROP 旧分区,比 DELETE 快几个数量级
ALTER TABLE outbox DROP PARTITION p20260411;十二、总结
应用层一致性模式的本质是一种工程权衡:在分布式环境下放弃全局事务的便利性,换取性能和可用性。
几个核心判断:
- 2PC 适合”不能出错”且”可以慢”的场景,在互联网业务中很少用。
- TCC 适合对隔离性有要求、参与者操作可自然拆分为三阶段的场景,实现成本高。
- Saga 是最通用的模式,编排式适合复杂流程、协调式适合简单流程。补偿设计是核心难点——幂等、空补偿、补偿失败处理必须覆盖。
- 本地消息表和 Outbox + CDC 不是独立的一致性模式,而是 Saga 和 TCC 的基础设施——保证”业务操作和消息发送的原子性”。
选型时不要追求”一种方案解决所有问题”。真实系统中,不同的业务流程可能需要不同的一致性方案,混合使用才是常态。关键是理解每种模式的代价和边界,在正确性和性能之间找到自己系统的平衡点。
导航
上一篇: 数据迁移与版本化
下一篇: API 设计
参考资料
论文
- Hector Garcia-Molina, Kenneth Salem. “Sagas.” ACM SIGMOD, 1987.
- Pat Helland. “Life beyond Distributed Transactions: an Apostate’s Opinion.” CIDR, 2007.
- Jim Gray, Leslie Lamport. “Consensus on Transaction Commit.” ACM TODS, 2006.
书籍
- Chris Richardson.《Microservices Patterns: With examples in Java》. Manning, 2018. 第 4 章:Managing transactions with sagas.
- Martin Kleppmann.《Designing Data-Intensive Applications》. O’Reilly, 2017. 第 7 章和第 9 章关于分布式事务的讨论.
开源项目
- Eventuate Tram: https://github.com/eventuate-tram/eventuate-tram-core
- Eventuate Tram Sagas: https://github.com/eventuate-tram/eventuate-tram-sagas
- Debezium: https://debezium.io/documentation/reference/stable/
- Debezium Outbox Event Router: https://debezium.io/documentation/reference/stable/transformations/outbox-event-router.html
- Seata (阿里巴巴开源的分布式事务框架): https://seata.io/
规范与设计文档
- XA Specification (X/Open CAE Specification): Distributed Transaction Processing.
- Kafka Connect: https://kafka.apache.org/documentation/#connect
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【系统架构设计百科】架构质量属性:不只是"高可用高性能"
需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。
【系统架构设计百科】告警策略:如何避免"狼来了"
大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。
【系统架构设计百科】复杂性管理:架构的核心战场
系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略
【系统架构设计百科】微服务架构深度审视:优势、代价与适用边界
微服务不是免费的午餐。本文从分布式系统八大谬误出发,拆解微服务真正解决的问题与引入的代价,梳理服务边界划分的工程方法论,还原 Amazon 和 Netflix 从单体到微服务的真实演进时间线,给出微服务适用与不适用的判断框架。