第 14 篇 建立三层交付语义:引擎 EXACTLY_ONCE checkpoint alone 不够,Sink 必须把外部写入拆成「预提交(不可见)」与「提交(可见)」,且 提交时刻不得早于 checkpoint 全局完成。Flink 把这一模式抽象为 TwoPhaseCommitSinkFunction / GenericTwoPhaseCommitSink(Flink 文档 Two-Phase Commit Sink,A 级)。
lakehouse
第 19 章 从 表格式侧 描述
IcebergStreamWriter +
IcebergFilesCommitter、checkpoint 完成后的
Iceberg append;lakehouse
第 11 章 说明 提交 = catalog 对 metadata 指针的
CAS。本文从 Flink 引擎侧 说明 2PC
协议如何调用这些 sink、Kafka 事务 sink 如何同构、以及
commit 前崩溃 / commit 后崩溃 / 重复 commit
三条失败线。不重复 lakehouse/19 的 equality delete、Connect
control topic 与小文件治理全文。
本文是「流式数据处理」系列第 15 篇(共 18 篇)。→ 系列目录
篇目 核心内容 第 14 篇 · 交付语义 三层语义与最弱环 第 15 篇 · 两阶段提交 2PC 协议、Kafka/Iceberg sink、失败场景 第 16 篇 · Debezium CDC CDC 事件与 Connect offset
版本锚定:Flink 1.20+ / 2.x;Kafka 3.x;Iceberg Flink sink 以官方 Flink Writes 为准。
环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2),未安装 JVM / Flink / Kafka / Iceberg。协议来自 Flink 官方文档与 connector 设计;不粘贴未执行的 EOS 计数实验输出。
一、为什么需要两阶段提交
1.1 单阶段写入的失败窗口
第
14 篇 的时间线:Sink 在 invoke 里直接写
Kafka / JDBC,写入可能早于 checkpoint
持久化。TM 崩溃后:
- 引擎从 旧 checkpoint 恢复,重放尚未 checkpoint 的 input;
- 外部系统里 已有 第一次写入的结果 → 重复。
要把「对外可见」推迟到 确认全局快照成功 之后,需要:
- Phase 1(pre-commit):副作用进入 pending 状态,下游不可见(或不可提交);
- Checkpoint 持久化:pending 句柄写入 checkpoint 存储;
- Phase 2(commit):Flink 调用
notifyCheckpointComplete(checkpointId)后才 commit; - Abort:checkpoint 失败或作业 fail-over 时 撤销 未 commit 的 pending。
sequenceDiagram
participant JM as JobManager
participant OP as Sink subtask
participant EXT as 外部系统
JM->>OP: checkpoint N 开始
OP->>EXT: preCommit(不可见)
OP->>JM: pending 状态写入 ckpt N
JM->>JM: 所有算子 ack,ckpt N 完成
JM->>OP: notifyCheckpointComplete(N)
OP->>EXT: commit(可见)
1.2 与分布式事务 2PC 的对应
经典 2PC:Coordinator 协调 Participant prepare → commit/abort。Flink 中:
| 经典 2PC | Flink |
|---|---|
| Coordinator | JobManager / CheckpointCoordinator |
| Participant | TwoPhaseCommitSink subtask |
| Prepare | preCommit / snapshotState |
| Commit | commit(在 notifyComplete 后) |
| Abort | abort(checkpoint 过期或恢复时) |
Flink 不是 XA 全局事务管理器:每个 sink subtask 独立 与外部系统协商;端到端一致性靠 checkpoint 边界统一 commit 时刻 + 外部系统 幂等 commit。
二、GenericTwoPhaseCommitSink 协议
2.1 接口生命周期
Flink
TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>(及
Sink V2 中的 commit 语义)核心方法(文档 Two-Phase
Commit Sink):
| 方法 | 调用时机 | 职责 |
|---|---|---|
beginTransaction() |
新 checkpoint 周期开始 | 创建外部事务句柄 TXN |
invoke(TXN, value) |
每条 record | 写入事务缓冲 |
preCommit(TXN) |
snapshotState |
刷盘、封账;返回可序列化 pending |
commit(TXN) |
notifyCheckpointComplete |
使外部系统可见 |
abort(TXN) |
恢复 / checkpoint 废弃 | 撤销 pending |
关键不变量(Flink 文档,A 级):
preCommit之后、commit之前,外部系统 不得 把数据暴露给正常 consumer / reader;commit必须是 幂等 的:同一 checkpointId 重复commit不得产生 duplicate visible effect;- 恢复时,对 已 commit 但未 ack 完成 或 in-doubt 的事务,须能 commit 或 abort 到一致状态。
2.2 与 CheckpointCoordinator 的挂钩
第 10 篇 的生命周期:
- Barrier 对齐 → 各算子
snapshotState(Sink 在此preCommit); - 所有 state handle 持久化 → checkpoint COMPLETED;
notifyCheckpointComplete(checkpointId)→ Sinkcommit;- 若 checkpoint 超时/失败 → 该 checkpoint 的 pending 事务 abort,恢复后从新 barrier 重算。
Sink 不得 在 snapshotState
返回后假设一定会 commit——JM 可能在 collect
阶段失败。
2.3 多 subtask Sink 的协调
Kafka sink:每个 subtask 独立 Kafka
transaction,同一
transactional.id 前缀 +
fencing 防止 zombie producer(Kafka 文档
Transactions)。
Iceberg sink:Writer 多并行 subtask 各写
data file;Committer 常为
低并行度(甚至 1),在 commit 阶段汇总各
writer 的 DataFile 清单做一次 Iceberg 提交(lakehouse
第 19 章 架构图——本文不重复 upsert SQL)。Committer 的
2PC 状态存 checkpoint;writer 只负责 flush 文件。
2.4 Sink V2(Unified Sink API)中的 Committer
Flink 1.12+ 引入 Sink /
SinkWriter / Committer 分离(2.x
文档 Unified Sink):
| 组件 | 2PC 职责 |
|---|---|
SinkWriter |
缓冲写、在 checkpoint 时 emit committable |
CommittingOperator |
收集 committable,snapshotState
持久化;notifyCheckpointComplete 调用
commit |
TwoPhaseCommittingSink 接口把经典
TwoPhaseCommitSinkFunction 拆进统一 API;Kafka
/ Iceberg 官方 connector
新版本走此路径。语义不变:仍是 preCommit →
checkpoint → commit。
2.5 恢复时的「悬挂事务」清理
作业 fail-over 后,新 JM 从 checkpoint 元数据读取 已完成 与 进行中 的 checkpoint 列表。对 TwoPhaseCommitSink:
- 已完成 checkpoint \(N\) 的 commit 已调用:恢复后 不得 再次 commit 同一批(依赖 sink 侧 checkpointId 去重);
- 未完成 checkpoint \(M\) 的 preCommit
已持久化:恢复后 abort \(M\) 对应
pending,或在外部系统查询 in-doubt txn 后
commit/rollback(Kafka
initTransactionsfencing 会 abort 旧 epoch)。
Flink 文档 Fault Tolerance 要求 sink 实现 recoverable commit——这是 EOS 与「仅一次 preCommit」的分界线。
三、Kafka Sink 的 2PC 落点
3.1 事务边界与 checkpoint 对齐
Flink Kafka Sink exactly-once(文档 Kafka Connector → Exactly Once):
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("output")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-app-")
.build();| 配置 | 作用 |
|---|---|
DeliveryGuarantee.EXACTLY_ONCE |
启用 2PC 路径 |
transactional.id 前缀 |
每 subtask 派生唯一 transactional.id |
| checkpoint 间隔 | 事务边界 ≈ checkpoint 间隔 |
流程:
beginTransaction→ Kafkaproducer.initTransactions;invoke→producer.send(未 commit);preCommit→flush;- checkpoint 完成 →
commitTransaction; - abort 路径 →
abortTransaction。
Consumer 端须 isolation.level=read_committed
才能读 EOS 管道输出(第
14 篇)。
3.2 典型失败场景
| 崩溃点 | 行为 | 结果 |
|---|---|---|
| preCommit 前 | 事务未 flush | abort,无可见数据,at-least-once 重放 |
| preCommit 后、commit 前 | pending 在 checkpoint | 恢复后 commit 或 abort 该 txn;Kafka 未 commit 的消息对 read_committed 不可见 |
| commit 后、notify 前 JM 崩 | 可能重复 notify | commit 须幂等(Kafka 按 transactional.id + epoch fencing) |
| commit 成功后 | — | 正常;下游可见 |
Kafka 事务
timeout(transaction.max.timeout.ms)须
大于 checkpoint 间隔 + 最大恢复时间,否则
broker 会 abort 长事务,破坏 EOS。
3.3
transactional.id 命名与多作业隔离
| 规则 | 原因 |
|---|---|
| 前缀全局唯一 | 同 broker 上两作业共用前缀 → fencing 互相 abort |
| 稳定前缀 + subtask 派生后缀 | Flink 为每个并行 subtask 生成独立 txn |
| 作业升级时不随意改前缀 | 否则旧 txn 可能残留 in-doubt |
配置 setTransactionalIdPrefix("my-app-v3-")
时,把 作业名与版本
编进前缀,避免蓝绿部署冲突。
3.4 read_committed 下游
EOS 管道的 输出 topic 须被
isolation.level=read_committed 的 consumer
读取,否则 aborted txn 中的消息可能被看见(取决于 consumer
版本与配置)。输入 topic 若由上游事务
producer 写入,Flink source 同样应设
read_committed(第
14 篇)。
四、Iceberg / 湖表 Sink 的 2PC 落点
4.1 与 lakehouse/19 的分工
| 内容 | lakehouse/19 | 本文 |
|---|---|---|
| Writer 写 Parquet、equality delete | 详述 | 引用 |
| preCommit / notifyComplete 时序 | 表格式 + Flink 钩子 | 引擎回调顺序与失败语义 |
| 小文件、compaction | 详述 | 见 第 17 篇 |
| CAS 提交 | 指向 lakehouse/11 | 作为 Sink commit 的原子性来源 |
引擎侧时序(与 lakehouse/19 第二节一致,此处只强调 Flink 责任):
- Writer
snapshotState:flush 并关闭本轮 data file,文件清单写入 writer 算子 state; - Committer
snapshotState:收集各 writer 的 pending files(pre-commit 语义:文件在对象存储,未 进表 snapshot); - Checkpoint COMPLETED → Committer
notifyCheckpointComplete→ 调用 Iceberg API append/upsert commit; - Iceberg catalog 执行 CAS(lakehouse 第 11 章):指针 \(v_N \to v_{N+1}\) 成功则读端可见新 snapshot。
Flink 保证:不在 step 3 之前调用 Iceberg commit。Iceberg 保证:commit CAS 幂等冲突重试。两者叠加 才形成入湖 EOS(第 14 篇 三不变量)。
4.2 重复 commit 与乐观并发
Committer 重复对 同一 checkpointId 提交:Iceberg sink 实现应记录 已提交的 checkpoint id,跳过或 no-op(Flink Iceberg connector 设计)。
不同作业或 不同 checkpoint 并发 commit:CAS 失败 → 按 lakehouse 第 11 章 重试——这是 表格式层 行为,Flink 侧表现为 commit 异常重试或作业 fail(视 connector 版本与配置)。引擎调优见 第 17 篇(并行 writer 与冲突率)。
4.3 Hudi / Delta(边界)
Hudi Flink sink 同样挂 checkpoint 2PC;Delta Structured Streaming 用 micro-batch id + 表事务语义。细节各读官方文档;与 Flink notifyComplete 对齐的模式同构,不在此展开表格式差异(lakehouse 系列已覆盖)。
4.4 SQL
INSERT INTO iceberg 表
Flink SQL 流式
INSERT INTO catalog.db.table SELECT ... 在
execution.checkpointing.mode=exactly-once 且
catalog 为 Iceberg 时,Planner 插入 与 DataStream
sink 等价的 writer/committer 算子(Iceberg 文档
Flink Writes)。运维上 SQL 与 DataStream
共享同一 2PC 语义;调 checkpoint 间隔 即调 Iceberg
commit 频率(第 17
篇)。
五、JDBC 与其它 Sink 的边界
5.1 JDBC XA 的限制
JDBC sink 的 exactly-once 依赖数据库 XA
事务 与 XASourceSinkFunction
类路径——生产少见,原因:
- 连接池与 XA 兼容性差;
- commit 延迟高;
- 与 checkpoint 间隔耦合的长事务锁表。
更常见:at-least-once +
INSERT ON CONFLICT 幂等(第
14 篇)。不要 假设「JDBC sink +
EXACTLY_ONCE checkpoint = EOS」。
5.2 非 2PC 自定义 Sink 反模式
// 反模式:在 invoke 中直接写外部 DB,无 2PC
public void invoke(String value, Context ctx) {
jdbcTemplate.update("INSERT INTO out VALUES (?)", value);
}即使
CheckpointingMode.EXACTLY_ONCE,这条路径仍是
at-least-once 对外部 DB。正确方向:实现
TwoPhaseCommitSinkFunction 或使用带 EOS 的官方
connector。
5.3 Elasticsearch / HTTP Sink
Bulk API 在 invoke 中 flush 同样无 2PC;EOS
需 外部版本号 或 at-least-once +
下游 dedup。这类 sink 在 第
14 篇 语义矩阵中默认
at-least-once。
六、失败场景全景
6.1 时间线图
stateDiagram-v2
[*] --> Processing: 正常处理
Processing --> PreCommit: checkpoint barrier
PreCommit --> CkptPending: snapshot 上传
CkptPending --> Committed: notifyComplete + commit
CkptPending --> Aborted: ckpt 失败 / 超时
Aborted --> Processing: 恢复重放
PreCommit --> Aborted: TM 崩溃
Committed --> [*]
6.2 三类问题对照表
| 问题 | 根因 | 缓解 |
|---|---|---|
| 重复消费(下游看到 duplicate) | commit 早于 checkpoint 或无 2PC | 2PC + read_committed / 幂等 sink |
| 丢失 | at-most-once offset 推进 | checkpoint 存 offset;禁用 unsafe auto-commit |
| 重复 commit 副作用 | notify 重试、JM failover | sink commit 幂等;Iceberg CAS;Kafka
epoch |
| 孤儿文件 | preCommit 后 crash,未进表 | Iceberg
remove_orphan_files(lakehouse/20);不影响读端正确性 |
| in-doubt 事务 | 部分 subtask commit 成功 | Flink 恢复时按 checkpoint 元数据 完成或 abort 悬挂 txn |
6.3 commit 后崩溃与 consumer 视角
Checkpoint \(N\) commit 成功后,JM 记录 \(N\) 为最新 completed。若此后立即崩溃:
- 恢复从 \(N\) 或 \(N-1\) 取决于 completed checkpoint 列表;
- 已 commit 的外部写入不应回滚(否则违反 EOS);
- Source 从 \(N\) 记录的 offset 重放,引擎 state 与 \(N\) 一致 → processed 过的 record 不会多算;
- Sink 若收到 重复 preCommit 同批数据,依赖 幂等 commit 或 file 级 dedup(Iceberg 新 snapshot 只引用新文件清单)。
6.4 JobManager 切换与 ZooKeeper / K8s HA
Flink HA 模式下 JM 故障会选新 leader;已完成
checkpoint 列表 从 CheckpointStorage
恢复。TwoPhaseCommitSink 须保证:
- 新 JM 不会 跳过 对已 completed checkpoint 的 commit 回调(若旧 JM 在 notify 前死亡,新 JM 在恢复 pipeline 后 重放 notifyCheckpointComplete);
- 重复 notify 时外部系统 幂等。
这与 Kafka broker 对 transactional.id epoch
的 fencing、lakehouse
第 11 章 对 CAS 冲突的重试
同一类问题:at-least-once 控制面 +
幂等数据面。
七、端到端 EOS 管道示例(Kafka → Flink → Kafka)
7.1 最小配置骨架
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("input")
.setGroupId("eos-demo")
.setStartingOffsets(OffsetsInitializer.committedOffsets())
.setProperty("isolation.level", "read_committed")
.build();
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("output")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("eos-demo-")
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-in")
.map(String::toUpperCase)
.sinkTo(sink);检查项:
7.2 可复现实验(不提供伪造计数)
环境(系列 PLAN 台账):Flink 1.20+、Kafka 3.x KRaft、单 topic 输入输出、有界或无界计数源。
步骤:
- 运行 EOS 管道,向 input 写入已知总量 \(M\) 条带唯一 id 的记录;
- 在 连续两次 checkpoint 之间 kill TaskManager 进程;
- 作业恢复后继续运行至 stable;
- 用
read_committedconsumer 读 output,统计 distinct id 数 与 总条数。
预期(协议推导,非 fabricated 数字):distinct id = \(M\),总条数 = \(M\)(无重复无丢)。若 sink 仅 at-least-once,总条数可能 \(> M\)。
7.3 Iceberg 入湖 EOS 检查项(引擎侧)
不重复 lakehouse/19 的 DDL,只列 Flink 侧 与 2PC 相关的核对:
八、与 lakehouse 提交协议的对称
flowchart TB
subgraph Flink["Flink 引擎(本文)"]
CP[Checkpoint N 完成]
NC[notifyCheckpointComplete]
end
subgraph Iceberg["Iceberg(lakehouse/11、19)"]
W[Data files 已写对象存储]
CAS["Catalog CAS<br/>metadata 指针 swap"]
SN[新 snapshot 可见]
end
CP --> NC
NC --> CAS
W --> CAS
CAS --> SN
| Flink 概念 | Iceberg 概念 |
|---|---|
| preCommit(文件 flush,未 commit 表) | 新 data file 在 storage,未进 snapshot |
| notifyCheckpointComplete → commit | Catalog CAS 成功 |
| abort / 未 complete 的 ckpt | 孤儿文件,不进入 snapshot |
| checkpointId 幂等 | 同一批文件不重复挂 snapshot |
读者应 交叉阅读 lakehouse 第 19 章(入湖三不变量)与 lakehouse 第 11 章(CAS 冲突重试),本系列 第 17 篇 讲 作业旋钮 如何影响 commit 频率与冲突。
8.1 三不变量在引擎侧的落地
lakehouse 第 19 章 第三节的 exactly-once 三不变量,用 Flink 术语重述:
- 数据文件可重写、不可半可见 → Writer preCommit 前文件已在对象存储,Iceberg reader 未挂 snapshot;
- 提交幂等 → Committer 按 checkpointId 去重 + Iceberg CAS;
- offset 与提交同生死 → Kafka offset 与 writer/committer state 写入 同一 checkpoint(第 10 篇)。
任一条在引擎侧被打破(例如提前 commit、offset 不在 checkpoint),端到端 EOS 不成立——表格式与引擎各管一条,缺一则全缺。
九、边界与后文
本文 不 重复 Debezium envelope 与 Connect offset(第 16 篇),也不展开背压与 checkpoint 超时连锁(第 18 篇)。
未覆盖:
- Google Cloud Pub/Sub、Pulsar 等非 Kafka 2PC sink;
- Flink 2.x Unified Sink API 全部实现类枚举;
- Changelog state + 2PC 叠加细节。
下一篇 第 16 篇 回到 CDC 上游:Debezium 事件结构、snapshot vs streaming、schema history——为 入湖 upsert 的主键与顺序 提供 source 侧前提。
参考资料
- Apache Flink Documentation, Two-Phase Commit Sink、Fault Tolerance Guarantees、Checkpointing、Kafka Connector(Exactly Once)。A 级。
- Apache Flink 源码:
flink-streaming-java—TwoPhaseCommitSinkFunction;各 connector 的Committer实现(release-1.20 / release-2.x)。A 级。 - Apache Kafka Documentation, Transactions(transactional.id、fencing、timeout)。A 级。
- Apache Iceberg Documentation, Flink
Writes(
IcebergFilesCommitter、checkpoint 提交)。A 级。 - 本仓库:lakehouse 第 11 章 提交并发、lakehouse 第 19 章 流式 CDC 入湖。
- 本系列:第 10 篇 Checkpoint、第 14 篇 交付语义、第 17 篇 流式入湖深化。
返回 系列目录 · 上一篇 交付语义:从 at-most-once 到 exactly-once · 下一篇 Debezium 与 Change Data Capture
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照
从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。
【流式数据处理】交付语义:从 at-most-once 到 exactly-once
用 Source、引擎、Sink 三层模型拆解 at-most-once、at-least-once、exactly-once 的组合规则与最弱环决定律;对照 Flink checkpoint 模式、Kafka 事务与幂等 producer、重复消费/重复写入的三类修复手段,为两阶段提交 sink 铺垫。
【流式数据处理】Kafka · Flink · 状态 · Exactly-Once
承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。
【流式数据处理】Kafka 事务与幂等 Producer
从幂等 producer 的 PID 与 sequence 去重,到事务 producer 的 init/begin/commit/abort 生命周期、__transaction_state 与 read_committed 隔离,讲清 Kafka 3.x 单集群 EOS 边界及其与 Flink checkpoint 的衔接。