土法炼钢兴趣小组的算法知识备份

【流式数据处理】两阶段提交与端到端 Exactly-Once

文章导航

分类入口
databasedistributed
标签入口
#flink#two-phase-commit#exactly-once#kafka-transactions#iceberg-sink#GenericTwoPhaseCommitSink#checkpoint#notifyCheckpointComplete

目录

第 14 篇 建立三层交付语义:引擎 EXACTLY_ONCE checkpoint alone 不够,Sink 必须把外部写入拆成「预提交(不可见)」与「提交(可见)」,且 提交时刻不得早于 checkpoint 全局完成。Flink 把这一模式抽象为 TwoPhaseCommitSinkFunction / GenericTwoPhaseCommitSink(Flink 文档 Two-Phase Commit Sink,A 级)。

lakehouse 第 19 章表格式侧 描述 IcebergStreamWriter + IcebergFilesCommitter、checkpoint 完成后的 Iceberg append;lakehouse 第 11 章 说明 提交 = catalog 对 metadata 指针的 CAS。本文从 Flink 引擎侧 说明 2PC 协议如何调用这些 sink、Kafka 事务 sink 如何同构、以及 commit 前崩溃 / commit 后崩溃 / 重复 commit 三条失败线。不重复 lakehouse/19 的 equality delete、Connect control topic 与小文件治理全文。

本文是「流式数据处理」系列第 15 篇(共 18 篇)。→ 系列目录

篇目 核心内容
第 14 篇 · 交付语义 三层语义与最弱环
第 15 篇 · 两阶段提交 2PC 协议、Kafka/Iceberg sink、失败场景
第 16 篇 · Debezium CDC CDC 事件与 Connect offset

版本锚定:Flink 1.20+ / 2.x;Kafka 3.x;Iceberg Flink sink 以官方 Flink Writes 为准。

环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2),未安装 JVM / Flink / Kafka / Iceberg。协议来自 Flink 官方文档与 connector 设计;不粘贴未执行的 EOS 计数实验输出


一、为什么需要两阶段提交

1.1 单阶段写入的失败窗口

第 14 篇 的时间线:Sink 在 invoke 里直接写 Kafka / JDBC,写入可能早于 checkpoint 持久化。TM 崩溃后:

要把「对外可见」推迟到 确认全局快照成功 之后,需要:

  1. Phase 1(pre-commit):副作用进入 pending 状态,下游不可见(或不可提交);
  2. Checkpoint 持久化:pending 句柄写入 checkpoint 存储;
  3. Phase 2(commit):Flink 调用 notifyCheckpointComplete(checkpointId) 后才 commit
  4. Abort:checkpoint 失败或作业 fail-over 时 撤销 未 commit 的 pending。
sequenceDiagram
  participant JM as JobManager
  participant OP as Sink subtask
  participant EXT as 外部系统
  JM->>OP: checkpoint N 开始
  OP->>EXT: preCommit(不可见)
  OP->>JM: pending 状态写入 ckpt N
  JM->>JM: 所有算子 ack,ckpt N 完成
  JM->>OP: notifyCheckpointComplete(N)
  OP->>EXT: commit(可见)

1.2 与分布式事务 2PC 的对应

经典 2PC:Coordinator 协调 Participant preparecommit/abort。Flink 中:

经典 2PC Flink
Coordinator JobManager / CheckpointCoordinator
Participant TwoPhaseCommitSink subtask
Prepare preCommit / snapshotState
Commit commit(在 notifyComplete 后)
Abort abort(checkpoint 过期或恢复时)

Flink 不是 XA 全局事务管理器:每个 sink subtask 独立 与外部系统协商;端到端一致性靠 checkpoint 边界统一 commit 时刻 + 外部系统 幂等 commit


二、GenericTwoPhaseCommitSink 协议

2.1 接口生命周期

Flink TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>(及 Sink V2 中的 commit 语义)核心方法(文档 Two-Phase Commit Sink):

方法 调用时机 职责
beginTransaction() 新 checkpoint 周期开始 创建外部事务句柄 TXN
invoke(TXN, value) 每条 record 写入事务缓冲
preCommit(TXN) snapshotState 刷盘、封账;返回可序列化 pending
commit(TXN) notifyCheckpointComplete 使外部系统可见
abort(TXN) 恢复 / checkpoint 废弃 撤销 pending

关键不变量(Flink 文档,A 级):

  1. preCommit 之后、commit 之前,外部系统 不得 把数据暴露给正常 consumer / reader;
  2. commit 必须是 幂等 的:同一 checkpointId 重复 commit 不得产生 duplicate visible effect;
  3. 恢复时,对 已 commit 但未 ack 完成in-doubt 的事务,须能 commit 或 abort 到一致状态

2.2 与 CheckpointCoordinator 的挂钩

第 10 篇 的生命周期:

  1. Barrier 对齐 → 各算子 snapshotState(Sink 在此 preCommit);
  2. 所有 state handle 持久化 → checkpoint COMPLETED
  3. notifyCheckpointComplete(checkpointId) → Sink commit
  4. 若 checkpoint 超时/失败 → 该 checkpoint 的 pending 事务 abort,恢复后从新 barrier 重算。

Sink 不得snapshotState 返回后假设一定会 commit——JM 可能在 collect 阶段失败。

2.3 多 subtask Sink 的协调

Kafka sink:每个 subtask 独立 Kafka transaction,同一 transactional.id 前缀 + fencing 防止 zombie producer(Kafka 文档 Transactions)。

Iceberg sink:Writer 多并行 subtask 各写 data file;Committer 常为 低并行度(甚至 1),在 commit 阶段汇总各 writer 的 DataFile 清单做一次 Iceberg 提交(lakehouse 第 19 章 架构图——本文不重复 upsert SQL)。Committer 的 2PC 状态存 checkpoint;writer 只负责 flush 文件。

2.4 Sink V2(Unified Sink API)中的 Committer

Flink 1.12+ 引入 Sink / SinkWriter / Committer 分离(2.x 文档 Unified Sink):

组件 2PC 职责
SinkWriter 缓冲写、在 checkpoint 时 emit committable
CommittingOperator 收集 committable,snapshotState 持久化;notifyCheckpointComplete 调用 commit

TwoPhaseCommittingSink 接口把经典 TwoPhaseCommitSinkFunction 拆进统一 API;Kafka / Iceberg 官方 connector 新版本走此路径。语义不变:仍是 preCommit → checkpoint → commit。

2.5 恢复时的「悬挂事务」清理

作业 fail-over 后,新 JM 从 checkpoint 元数据读取 已完成进行中 的 checkpoint 列表。对 TwoPhaseCommitSink:

Flink 文档 Fault Tolerance 要求 sink 实现 recoverable commit——这是 EOS 与「仅一次 preCommit」的分界线。


三、Kafka Sink 的 2PC 落点

3.1 事务边界与 checkpoint 对齐

Flink Kafka Sink exactly-once(文档 Kafka ConnectorExactly Once):

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-app-")
    .build();
配置 作用
DeliveryGuarantee.EXACTLY_ONCE 启用 2PC 路径
transactional.id 前缀 每 subtask 派生唯一 transactional.id
checkpoint 间隔 事务边界 ≈ checkpoint 间隔

流程:

  1. beginTransaction → Kafka producer.initTransactions
  2. invokeproducer.send(未 commit);
  3. preCommitflush
  4. checkpoint 完成 → commitTransaction
  5. abort 路径 → abortTransaction

Consumer 端须 isolation.level=read_committed 才能读 EOS 管道输出(第 14 篇)。

3.2 典型失败场景

崩溃点 行为 结果
preCommit 前 事务未 flush abort,无可见数据,at-least-once 重放
preCommit 后、commit 前 pending 在 checkpoint 恢复后 commit 或 abort 该 txn;Kafka 未 commit 的消息对 read_committed 不可见
commit 后、notify 前 JM 崩 可能重复 notify commit 须幂等(Kafka 按 transactional.id + epoch fencing)
commit 成功后 正常;下游可见

Kafka 事务 timeouttransaction.max.timeout.ms)须 大于 checkpoint 间隔 + 最大恢复时间,否则 broker 会 abort 长事务,破坏 EOS。

3.3 transactional.id 命名与多作业隔离

规则 原因
前缀全局唯一 同 broker 上两作业共用前缀 → fencing 互相 abort
稳定前缀 + subtask 派生后缀 Flink 为每个并行 subtask 生成独立 txn
作业升级时不随意改前缀 否则旧 txn 可能残留 in-doubt

配置 setTransactionalIdPrefix("my-app-v3-") 时,把 作业名与版本 编进前缀,避免蓝绿部署冲突。

3.4 read_committed 下游

EOS 管道的 输出 topic 须被 isolation.level=read_committed 的 consumer 读取,否则 aborted txn 中的消息可能被看见(取决于 consumer 版本与配置)。输入 topic 若由上游事务 producer 写入,Flink source 同样应设 read_committed第 14 篇)。


四、Iceberg / 湖表 Sink 的 2PC 落点

4.1 与 lakehouse/19 的分工

内容 lakehouse/19 本文
Writer 写 Parquet、equality delete 详述 引用
preCommit / notifyComplete 时序 表格式 + Flink 钩子 引擎回调顺序与失败语义
小文件、compaction 详述 第 17 篇
CAS 提交 指向 lakehouse/11 作为 Sink commit 的原子性来源

引擎侧时序(与 lakehouse/19 第二节一致,此处只强调 Flink 责任):

  1. Writer snapshotState:flush 并关闭本轮 data file,文件清单写入 writer 算子 state
  2. Committer snapshotState:收集各 writer 的 pending files(pre-commit 语义:文件在对象存储, 进表 snapshot);
  3. Checkpoint COMPLETED → Committer notifyCheckpointComplete → 调用 Iceberg API append/upsert commit
  4. Iceberg catalog 执行 CASlakehouse 第 11 章):指针 \(v_N \to v_{N+1}\) 成功则读端可见新 snapshot。

Flink 保证:不在 step 3 之前调用 Iceberg commit。Iceberg 保证:commit CAS 幂等冲突重试。两者叠加 才形成入湖 EOS(第 14 篇 三不变量)。

4.2 重复 commit 与乐观并发

Committer 重复对 同一 checkpointId 提交:Iceberg sink 实现应记录 已提交的 checkpoint id,跳过或 no-op(Flink Iceberg connector 设计)。

不同作业或 不同 checkpoint 并发 commit:CAS 失败 → 按 lakehouse 第 11 章 重试——这是 表格式层 行为,Flink 侧表现为 commit 异常重试或作业 fail(视 connector 版本与配置)。引擎调优见 第 17 篇(并行 writer 与冲突率)。

4.3 Hudi / Delta(边界)

Hudi Flink sink 同样挂 checkpoint 2PC;Delta Structured Streaming 用 micro-batch id + 表事务语义。细节各读官方文档;与 Flink notifyComplete 对齐的模式同构,不在此展开表格式差异(lakehouse 系列已覆盖)。

4.4 SQL INSERT INTO iceberg 表

Flink SQL 流式 INSERT INTO catalog.db.table SELECT ...execution.checkpointing.mode=exactly-once 且 catalog 为 Iceberg 时,Planner 插入 与 DataStream sink 等价的 writer/committer 算子(Iceberg 文档 Flink Writes)。运维上 SQL 与 DataStream 共享同一 2PC 语义;调 checkpoint 间隔 即调 Iceberg commit 频率(第 17 篇)。


五、JDBC 与其它 Sink 的边界

5.1 JDBC XA 的限制

JDBC sink 的 exactly-once 依赖数据库 XA 事务XASourceSinkFunction 类路径——生产少见,原因:

更常见:at-least-once + INSERT ON CONFLICT 幂等(第 14 篇)。不要 假设「JDBC sink + EXACTLY_ONCE checkpoint = EOS」。

5.2 非 2PC 自定义 Sink 反模式

// 反模式:在 invoke 中直接写外部 DB,无 2PC
public void invoke(String value, Context ctx) {
    jdbcTemplate.update("INSERT INTO out VALUES (?)", value);
}

即使 CheckpointingMode.EXACTLY_ONCE,这条路径仍是 at-least-once 对外部 DB。正确方向:实现 TwoPhaseCommitSinkFunction 或使用带 EOS 的官方 connector。

5.3 Elasticsearch / HTTP Sink

Bulk API 在 invoke 中 flush 同样无 2PC;EOS 需 外部版本号at-least-once + 下游 dedup。这类 sink 在 第 14 篇 语义矩阵中默认 at-least-once


六、失败场景全景

6.1 时间线图

stateDiagram-v2
  [*] --> Processing: 正常处理
  Processing --> PreCommit: checkpoint barrier
  PreCommit --> CkptPending: snapshot 上传
  CkptPending --> Committed: notifyComplete + commit
  CkptPending --> Aborted: ckpt 失败 / 超时
  Aborted --> Processing: 恢复重放
  PreCommit --> Aborted: TM 崩溃
  Committed --> [*]

6.2 三类问题对照表

问题 根因 缓解
重复消费(下游看到 duplicate) commit 早于 checkpoint 或无 2PC 2PC + read_committed / 幂等 sink
丢失 at-most-once offset 推进 checkpoint 存 offset;禁用 unsafe auto-commit
重复 commit 副作用 notify 重试、JM failover sink commit 幂等;Iceberg CAS;Kafka epoch
孤儿文件 preCommit 后 crash,未进表 Iceberg remove_orphan_files(lakehouse/20);不影响读端正确性
in-doubt 事务 部分 subtask commit 成功 Flink 恢复时按 checkpoint 元数据 完成或 abort 悬挂 txn

6.3 commit 后崩溃与 consumer 视角

Checkpoint \(N\) commit 成功后,JM 记录 \(N\) 为最新 completed。若此后立即崩溃:

6.4 JobManager 切换与 ZooKeeper / K8s HA

Flink HA 模式下 JM 故障会选新 leader;已完成 checkpoint 列表CheckpointStorage 恢复。TwoPhaseCommitSink 须保证:

这与 Kafka broker 对 transactional.id epoch 的 fencing、lakehouse 第 11 章 对 CAS 冲突的重试 同一类问题at-least-once 控制面 + 幂等数据面


7.1 最小配置骨架

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("input")
    .setGroupId("eos-demo")
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    .setProperty("isolation.level", "read_committed")
    .build();

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("eos-demo-")
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-in")
   .map(String::toUpperCase)
   .sinkTo(sink);

检查项

7.2 可复现实验(不提供伪造计数)

环境系列 PLAN 台账):Flink 1.20+、Kafka 3.x KRaft、单 topic 输入输出、有界或无界计数源。

步骤

  1. 运行 EOS 管道,向 input 写入已知总量 \(M\) 条带唯一 id 的记录;
  2. 连续两次 checkpoint 之间 kill TaskManager 进程;
  3. 作业恢复后继续运行至 stable;
  4. read_committed consumer 读 output,统计 distinct id 数总条数

预期(协议推导,非 fabricated 数字):distinct id = \(M\),总条数 = \(M\)(无重复无丢)。若 sink 仅 at-least-once,总条数可能 \(> M\)

7.3 Iceberg 入湖 EOS 检查项(引擎侧)

不重复 lakehouse/19 的 DDL,只列 Flink 侧 与 2PC 相关的核对:


八、与 lakehouse 提交协议的对称

flowchart TB
  subgraph Flink["Flink 引擎(本文)"]
    CP[Checkpoint N 完成]
    NC[notifyCheckpointComplete]
  end
  subgraph Iceberg["Iceberg(lakehouse/11、19)"]
    W[Data files 已写对象存储]
    CAS["Catalog CAS<br/>metadata 指针 swap"]
    SN[新 snapshot 可见]
  end
  CP --> NC
  NC --> CAS
  W --> CAS
  CAS --> SN
Flink 概念 Iceberg 概念
preCommit(文件 flush,未 commit 表) 新 data file 在 storage,未进 snapshot
notifyCheckpointComplete → commit Catalog CAS 成功
abort / 未 complete 的 ckpt 孤儿文件,不进入 snapshot
checkpointId 幂等 同一批文件不重复挂 snapshot

读者应 交叉阅读 lakehouse 第 19 章(入湖三不变量)与 lakehouse 第 11 章(CAS 冲突重试),本系列 第 17 篇作业旋钮 如何影响 commit 频率与冲突。

8.1 三不变量在引擎侧的落地

lakehouse 第 19 章 第三节的 exactly-once 三不变量,用 Flink 术语重述:

  1. 数据文件可重写、不可半可见 → Writer preCommit 前文件已在对象存储,Iceberg reader 未挂 snapshot;
  2. 提交幂等 → Committer 按 checkpointId 去重 + Iceberg CAS;
  3. offset 与提交同生死 → Kafka offset 与 writer/committer state 写入 同一 checkpoint第 10 篇)。

任一条在引擎侧被打破(例如提前 commit、offset 不在 checkpoint),端到端 EOS 不成立——表格式与引擎各管一条,缺一则全缺


九、边界与后文

本文 重复 Debezium envelope 与 Connect offset(第 16 篇),也不展开背压与 checkpoint 超时连锁(第 18 篇)。

未覆盖

下一篇 第 16 篇 回到 CDC 上游:Debezium 事件结构、snapshot vs streaming、schema history——为 入湖 upsert 的主键与顺序 提供 source 侧前提。


参考资料

  1. Apache Flink Documentation, Two-Phase Commit SinkFault Tolerance GuaranteesCheckpointingKafka Connector(Exactly Once)。A 级。
  2. Apache Flink 源码:flink-streaming-javaTwoPhaseCommitSinkFunction;各 connector 的 Committer 实现(release-1.20 / release-2.x)。A 级。
  3. Apache Kafka Documentation, Transactions(transactional.id、fencing、timeout)。A 级。
  4. Apache Iceberg Documentation, Flink WritesIcebergFilesCommitter、checkpoint 提交)。A 级。
  5. 本仓库:lakehouse 第 11 章 提交并发lakehouse 第 19 章 流式 CDC 入湖
  6. 本系列:第 10 篇 Checkpoint第 14 篇 交付语义第 17 篇 流式入湖深化

返回 系列目录 · 上一篇 交付语义:从 at-most-once 到 exactly-once · 下一篇 Debezium 与 Change Data Capture

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照

从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。

2026-07-01 · database / distributed

【流式数据处理】交付语义:从 at-most-once 到 exactly-once

用 Source、引擎、Sink 三层模型拆解 at-most-once、at-least-once、exactly-once 的组合规则与最弱环决定律;对照 Flink checkpoint 模式、Kafka 事务与幂等 producer、重复消费/重复写入的三类修复手段,为两阶段提交 sink 铺垫。

2026-07-01 · database / distributed

【流式数据处理】Kafka · Flink · 状态 · Exactly-Once

承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。

2026-07-01 · database / distributed

【流式数据处理】Kafka 事务与幂等 Producer

从幂等 producer 的 PID 与 sequence 去重,到事务 producer 的 init/begin/commit/abort 生命周期、__transaction_state 与 read_committed 隔离,讲清 Kafka 3.x 单集群 EOS 边界及其与 Flink checkpoint 的衔接。


By .