第 10 篇 讲 checkpoint 如何把 Kafka offset 与算子 state 绑在同一一致性点;第 13 篇 讲 RocksDB 状态如何膨胀。运维问得最多的仍是:「开了 exactly-once,为什么下游还有重复?」「at-least-once 够不够?」「重复消费和重复写入是一回事吗?」
这些问题不能用一个开关回答。流管道的交付语义是
Source、引擎、Sink
三层各自承诺的叠加;端到端语义由
最弱环 决定。Flink 的
EXACTLY_ONCE checkpoint 模式只保证
引擎内部 对每条 record
的处理效果等价于「恰好一次」——不自动保证 Kafka
里没重复、Iceberg 里没双份。
本文建立三层模型,逐层说明 at-most-once / at-least-once / exactly-once 的定义、Flink 配置落点、以及三类修复手段(幂等、去重、两阶段提交)。第 15 篇 在此基础上展开 GenericTwoPhaseCommitSink 与 lakehouse 表提交的对齐。
本文是「流式数据处理」系列第 14 篇(共 18 篇)。→ 系列目录
篇目 核心内容 第 10 篇 · Checkpoint Barrier 对齐与一致性快照 第 14 篇 · 交付语义 三层组合、最弱环、修复手段 第 15 篇 · 两阶段提交 2PC sink 与端到端 EOS
版本锚定:Flink 1.20+ / 2.x;Kafka 3.x 新 Source/Sink API 与事务 producer。语义描述以 Flink Documentation Fault Tolerance Guarantees 为准。
环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2),未安装 JVM / Flink / Kafka。机制来自 Flink 与 Kafka 官方文档;不伪造故障注入实验的计数结果。
一、三层模型:端到端语义由最弱环决定
1.1 为什么需要分层
一条记录从进入管道到落进外部系统,至少经过三段独立进度(第 10 篇):
| 层 | 负责什么 | 典型持久化点 |
|---|---|---|
| Source | 从日志/CDC 读入,能否 replay、读到哪 | Kafka offset、Debezium Connect offset |
| 引擎 | 算子变换、state 更新、barrier 对齐 | Flink checkpoint(state + source state) |
| Sink | 对外部系统的副作用(写 topic、写表、RPC) | Kafka 事务 commit、Iceberg snapshot、JDBC commit |
flowchart LR
SRC["Source 层<br/>offset / 可重放"]
ENG["引擎层<br/>checkpoint + state"]
SNK["Sink 层<br/>外部副作用"]
SRC --> ENG --> SNK
组合规则(Flink 文档 Fault Tolerance Guarantees,A 级):
\[ \text{端到端语义} = f(\text{Source},\ \text{引擎},\ \text{Sink}) \]
若任一层仅为 at-least-once,整体 不可能 达到 exactly-once,除非下游用幂等/去重 在应用层伪造 exactly-once 效果。
1.2 三种语义的精确定义
对 单条 record 的效果(非「物理上网络只传一次」):
| 语义 | 定义 | 失败重试时的典型表现 |
|---|---|---|
| at-most-once | 每条 record 最多 被处理一次;可能 丢失 | 未 checkpoint 的进度与 state 一起丢 |
| at-least-once | 每条 record 至少 被处理一次;可能 重复 | 从上次 checkpoint 重放,已写入 sink 的可能再写 |
| exactly-once | 每条 record 的效果等价于 恰好处理一次 | 重放与外部提交对齐,重复物理写入被抵消或不可见 |
「物理 exactly-once」在分布式系统里通常不可达(网络重传、进程崩溃)。工程上的 exactly-once 指 可观测结果 的等价性——与数据库 REDO/UNDO 的直觉一致。
1.3 常见误区
| 误区 | 纠正 |
|---|---|
开了 EXACTLY_ONCE 就端到端 EOS |
只保证引擎内部;Sink 须支持 2PC 或幂等 |
| at-least-once「不够生产」 | 下游幂等 + 可接受少量重复窗口时,at-least-once 更简单 |
Kafka acks=all = exactly-once |
acks 管 broker 持久化,不管 consumer 重复读 |
| checkpoint 成功 = 数据已入湖 | Iceberg commit 在
notifyCheckpointComplete,见 lakehouse
第 19 章 |
二、Source 层:可重放与起始位点
2.1 Kafka Source 与 offset
Flink KafkaSource 在 checkpoint 中持久化
各 partition 的 offset(Flink Kafka
Connector 文档,A 级)。恢复时:
- at-least-once:从 checkpoint offset 重新消费,可能重复读已处理过的消息;
- exactly-once(引擎):配合 barrier 对齐,保证重复读的消息 不会 在 state 里多计——但 已 commit 到外部 sink 的重复 仍取决于 sink 层。
第 5 篇(若已读)区分 broker committed offset 与 Flink checkpoint offset:容错以后者为准。
2.2 起始模式
StartupMode |
语义影响 |
|---|---|
earliest |
首次启动全量 replay;与 EOS 兼容但耗时长 |
latest |
可能丢历史 |
group-offsets |
依赖外部 committed offset,与 Flink checkpoint 混用时需明确主从 |
specific-offsets / timestamp |
补数、对账场景 |
CDC source(Debezium、Flink CDC)还有 snapshot 阶段 与 binlog 位点(本系列 第 16 篇)。Source 层 exactly-once 要求:位点与引擎 checkpoint 绑定,Connect 的 offset topic 单独使用时 不构成 Flink 端到端 EOS。
2.3 Source 层能单独保证什么
- at-most-once:auto-commit offset 且 不做 checkpoint,或 checkpoint 失败仍推进 offset(反模式);
- at-least-once:checkpoint 存 offset,失败从 checkpoint 重读;
- exactly-once(读侧):Kafka 事务 +
read_committedconsumer 只读已提交事务消息——这是 Kafka 生态 的 EOS 读路径(Kafka Transactions 文档),Flink Kafka source 在 EOS 管道中与事务 producer 配合。
Source 层 不能 单独保证「写下游 exactly-once」。
2.4 重复消费从哪来
即使 Source 正确 checkpoint offset,下列情况仍会出现 重复读:
| 原因 | 说明 |
|---|---|
| Checkpoint 失败回滚 | 从上一个 completed checkpoint 重放自上次成功点以来的所有 record |
| 作业 restart 未从 savepoint | 若手动指定 setStartFromEarliest 等,offset
重置 |
| Kafka consumer rebalance | 非 Flink 管理的 standalone consumer;Flink source 由 checkpoint 管 offset,rebalance 期间仍可能重复处理 未完成 checkpoint 的批次 |
| 上游 topic compaction 不适用 | compact topic 只保证 key 最新值,不保证 Flink 读路径不 replay |
重复读 ≠ 重复生效:引擎 EXACTLY_ONCE 模式下,重复读不应改变 keyed state 的计数结果;无 2PC 的 sink 仍会把重复读写出。
三、引擎层:Flink Checkpoint 模式
3.1
CheckpointingMode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 或 CheckpointingMode.AT_LEAST_ONCE| 模式 | Barrier 行为 | State 更新 |
|---|---|---|
| EXACTLY_ONCE | 多输入算子 对齐 barrier,对齐前缓存的数据不参与当前 checkpoint | 快照与 barrier 切分一致 |
| AT_LEAST_ONCE | 不对齐,barrier 与 record 可交错 | 快照可能含「将被重放」的数据 → 恢复后 state 可能 多计 |
Flink 文档明确:EXACTLY_ONCE 模式不启用两阶段提交 sink 时,端到端仍是 at-least-once——因为 sink 可能在 checkpoint 完成前已写外部系统,失败后重放会 重复写。
3.2 与 Unaligned Checkpoint 的关系
Unaligned checkpoint(第 10 篇)缩短背压下对齐等待,不改变 EXACTLY_ONCE 对 state 的语义,仍要求 exactly-once 模式的 barrier 协议。调 unaligned 是为了 可用性/延迟,不是为了把 at-least-once 升级成 EOS。
3.3 引擎层 exactly-once 的边界
引擎保证的是:从 source 重放同一批 input 时,算子 state 与内部 side effect(如不含 2PC 的 print)与「每条 record 处理一次」一致。
不包含:
- 已在 sink 提交的外部写入;
- 算子内 手动 调用的外部 HTTP/JDBC(无 2PC);
- 异步 IO 在 barrier 之后完成且未纳入 snapshot 的回调。
3.4 侧输出(Side Output)与语义
OutputTag 侧输出流若 再 sink
到外部系统 且未走 2PC,其语义
独立于 主输出的 checkpoint
模式。常见陷阱:主链路 EXACTLY_ONCE + 侧输出
print 或写 Kafka 非事务 topic
→ 侧输出仍是 at-least-once。
3.5 算子链(Operator Chain)不改变语义
第 7 篇 的 chain 合并只减少序列化;barrier 仍按 JobGraph 边传播。chain 内的最后一个算子 与 独立 Sink 算子 在 2PC 挂载点上无区别——EOS 取决于 Sink 是否实现 2PC,不取决于是否 chain。
四、Sink 层:外部副作用与可见性
4.1 为什么 Sink 是最常见短板
Sink 写入 外部系统,其 commit 点与 Flink checkpoint 默认不同步。典型失败时序:
sequenceDiagram
participant E as 引擎
participant S as Sink(无 2PC)
participant K as Kafka 目标 topic
E->>S: 处理 record r,调用 send
S->>K: 写入 r(已可见)
E->>E: checkpoint 进行中…
Note over E: TM 崩溃
E->>E: 从上个 checkpoint 恢复
E->>S: 重放 r
S->>K: 再次写入 r
无 2PC 时,至少一次写入 不可避免。修复路径:Sink 层 2PC(第 15 篇)、幂等 write、或下游 去重。
4.2 Sink 类型与默认语义
| Sink 类型 | 默认语义 | EOS 条件 |
|---|---|---|
print / 无状态副作用 |
随引擎 | 引擎 EXACTLY_ONCE 即可 |
| Kafka Sink(事务) | 需配置 EOS | 事务 producer + 2PC committer |
| JDBC / ES(无 2PC) | at-least-once | 幂等 UPSERT 或外部去重 |
| Iceberg / Hudi Flink sink | 2PC | checkpoint complete 后 commit(lakehouse/19) |
自定义 SinkFunction |
通常 at-least-once | 自行实现 TwoPhaseCommitSinkFunction |
lakehouse
第 19 章 从 表格式侧 说明 writer /
committer 分离与提交幂等;本文强调 引擎须在
notifyCheckpointComplete 才触发
commit,否则表格式 CAS 再完美也无法 EOS。
4.3 重复写入的三种形态
| 形态 | 典型原因 | 下游表现 |
|---|---|---|
| 完全 duplicate record | 无 2PC sink,checkpoint 失败后重放 | 两行内容完全相同 |
| 同 key 多次 UPDATE | CDC upsert 重复 | 依赖版本列;无版本则「最后写 wins」 |
| 跨 checkpoint 的 partial batch | JDBC batch 中途 commit | 部分行重复、部分丢失(最严重) |
区分形态决定修复手段:完全 duplicate 可用 主键 dedup;partial batch 必须 batch 级 2PC 或整批幂等。
五、三层组合矩阵
5.1 典型组合
| Source | 引擎 checkpoint | Sink | 端到端 |
|---|---|---|---|
| Kafka offset 可重放 | AT_LEAST_ONCE | 无 2PC JDBC | at-least-once |
| Kafka offset 可重放 | EXACTLY_ONCE | 无 2PC | at-least-once(state 不重复计,sink 仍重复写) |
| Kafka + 事务读 | EXACTLY_ONCE | Kafka 事务 2PC | exactly-once(Kafka→Kafka 管道) |
| Kafka | EXACTLY_ONCE | Iceberg 2PC sink | exactly-once(入湖,见第 15 篇) |
| 自定义 source 丢 offset | 任意 | 任意 | 可能 at-most-once 丢数据 |
5.2 最弱环决策流
flowchart TD
Q1{Source 可重放<br/>且 offset 在 checkpoint?}
Q1 -->|否| AMO[可能 at-most-once 丢数]
Q1 -->|是| Q2{引擎 EXACTLY_ONCE?}
Q2 -->|否| ALO[至少 at-least-once]
Q2 -->|是| Q3{Sink 2PC 或幂等?}
Q3 -->|否| ALO2[at-least-once<br/>state 精确 sink 重复]
Q3 -->|是| EOS[端到端 exactly-once<br/>(在协议假设内)]
5.3 与 distributed 系列的一致性直觉
distributed 系列 讨论复制与共识:Kafka ISR 保证 日志不丢(在 acks/min.isr 配置下),不保证 consumer 只读一次。Flink checkpoint 是 算子级一致性点,类似 Chandy-Lamport 快照(第 10 篇)。端到端 EOS 需要 快照点与外部事务 commit 同序,这是 第 15 篇 的两阶段提交要解决的问题,不是 CAP 定理的「魔法开关」。
六、三类修复手段
6.1 幂等 Sink
思路:同一条逻辑记录写多次,外部效果相同。
| 手段 | 示例 |
|---|---|
| 主键 UPSERT | JDBC INSERT ... ON CONFLICT UPDATE |
| 确定性消息 key | Kafka producer 同 key 覆盖(依赖 compact topic 语义时需谨慎) |
| 业务 idempotency key | 下游服务 dedup 表 |
限制:UPDATE 非幂等、DELETE 重复执行可能出错;需要 单调版本号 或 只 append 不可变事实。
6.2 去重(Deduplication)
在引擎内或下游维护 已见 event id 集合:
- Flink
KeyedProcessFunction+ValueState<Boolean>存eventId(state 成本); - 下游 OLAP / DB 唯一约束 + 拒绝重复 insert;
- Kafka Streams
suppress+ changelog(对照引擎,见 第 18 篇)。
去重将 exactly-once 效果 建立在 有界或可回收的 dedup state 上——与 第 13 篇 的状态膨胀同一账本。
6.3 两阶段提交(2PC)
思路:外部写入分 预提交(不可见)与 提交(可见);Flink 仅在 checkpoint 全局完成 后调用 commit。
- 引擎:
snapshotState→ pre-commit;notifyCheckpointComplete→ commit; - 失败回滚:abort 未 commit 事务,重放后重新 pre-commit + commit。
Kafka 事务 sink、Iceberg
IcebergFilesCommitter、JDBC
XA(支持有限)均走此路。细节见 第 15
篇,此处不重复协议步骤。
七、Kafka 事务与 Flink 的衔接(概要)
Kafka 幂等
producer(enable.idempotence=true)用
pid + sequence 在 单分区
上去重,解决 producer
重试导致的重复,不解决 consumer
重复读。
Kafka 事务 producer 扩展为跨 partition
的原子 write + commit marker;consumer
isolation.level=read_committed
跳过未提交事务。
Flink Kafka Sink V2 的 exactly-once 实现(Flink 文档 Kafka Connector):
- 每个 checkpoint 边界
beginTransaction; - 写入随 checkpoint pre-commit;
notifyCheckpointComplete时commitTransaction;- 失败
abortTransaction,恢复后新事务重写。
这与 lakehouse 第 19 章 的 Iceberg 2PC 同构:预提交进 checkpoint 状态,提交挂 checkpoint 完成回调。表格式侧 CAS 见 lakehouse 第 11 章,引擎侧职责是 不提前 commit。
7.1 如何验证管道语义(不伪造数字)
| 方法 | 适用 |
|---|---|
| 唯一 id 计数 | 源端每条 record 带 uuid,sink 端
COUNT DISTINCT |
| 对账表 | 按日 partition 汇总源/汇 SUM(amount) |
| Kafka 事务 marker | read_committed consumer 看不到 aborted txn |
| Iceberg snapshot 历史 | 每次 commit 对应一次 completed checkpoint(时间相关,见第 17 篇) |
杀 TM 实验见 第 15 篇 第七节;本文只列 验证口径,不填未执行环境的计数。
八、配置清单(Flink 侧)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(600_000);
// Kafka source — 读写分离配置见 connector 文档
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("input")
.setGroupId("flink-eos-demo")
.setStartingOffsets(OffsetsInitializer.committedOffsets())
.setProperty("isolation.level", "read_committed") // 上游有事务写时
.build();
// Kafka sink EOS — 须使用官方 EOS sink 实现并设置 transactional.id 前缀
// Iceberg sink — execution.checkpointing.mode EXACTLY_ONCE + catalog 配置execution.checkpointing.mode 在 Flink 2.x
配置键可能有迁移,以发行版 flink-conf.yaml
为准。
不保证 EOS 的配置:
CheckpointingMode.AT_LEAST_ONCE+ 任意 sink;- EXACTLY_ONCE 引擎 + 自定义 sink 在
invoke里直接写外部 DB; - checkpoint 禁用或 interval 极大 + 依赖 Kafka auto-commit 作唯一进度。
九、怎么选语义
| 场景 | 建议 | 理由 |
|---|---|---|
| 实时大屏、近似计数 | at-least-once + 幂等聚合 | 简单,重复计数可容忍或 merge |
| Kafka → Kafka 转发 | Flink EOS + Kafka 事务 | 官方路径成熟 |
| CDC → Iceberg 主键表 | EXACTLY_ONCE + Iceberg 2PC sink | 重复行破坏 upsert 语义 |
| 写 MySQL 报表库 | at-least-once + UPSERT | JDBC 2PC 支持差 |
| 金融 ledger | 端到端 EOS + 审计对账 | 2PC + 外部对账双保险 |
「选 at-least-once 还是 exactly-once」不是道德题,是 sink 能力、延迟、运维复杂度与错误成本 的乘积。EXACTLY_ONCE 的代价:checkpoint 对齐、事务协调、commit 延迟(第 17 篇 的小文件与可见延迟)。
9.1 Spark Structured Streaming 对照(一句话)
Spark SS 的 foreachBatch + 手动提交若未对
batchId 做幂等,micro-batch 重试会导致
at-least-once 对外部系统;官方
Iceberg/Delta sink 用 checkpoint WAL 绑定 offset 与
commit。Flink 用 barrier + 2PC
实现同类效果——引擎不同,三层最弱环
规律相同(第 18
篇 对照表展开)。
9.2 监控字段
| 组件 | 字段 | 含义 |
|---|---|---|
| Flink | numberOfCompletedCheckpoints / failed |
checkpoint 健康度 |
| Flink Kafka source | currentOffsets vs lag |
是否与预期 replay 一致 |
| Kafka consumer(下游) | records-lag 突刺 + duplicate key 告警 |
可能 at-least-once 重复写 |
| Iceberg | snapshot 时间戳 vs Flink last checkpoint | commit 是否跟上(粗粒度) |
9.3 端到端语义 checklist(发布前)
- Source:offset / CDC 位点是否写入 checkpoint?起始模式是否符合补数预期?
- 引擎:
CheckpointingMode是否为 EXACTLY_ONCE?是否存在侧输出 / 异步 IO 绕开 barrier? - Sink:是否官方 EOS / 2PC
实现?
transactional.id或 catalog 权限是否隔离环境? - 外部:下游 consumer 是否
read_committed?Iceberg 表是否开启 upsert 且 equality field 正确? - 运维:checkpoint 超时、Kafka txn timeout、commit 冲突告警是否配置?
五项齐备才声称「端到端 exactly-once」;缺任一项应明确降级为 at-least-once 并设计幂等。
9.4 与第 15 篇的分工
本文建立 三层语义与组合规则;第 15
篇 专讲 2PC 如何实现 Sink 层
exactly-once(preCommit /
commit 时序、Kafka/Iceberg
落点、失败恢复)。读顺序:先弄清「最弱环在哪」,再深入「commit
钩子怎么挂 checkpoint」。
十、边界与后文
本文 不 展开 GenericTwoPhaseCommitSink
的 beginTransaction / preCommit /
commit 调用序(第 15
篇),也 不 重复 lakehouse
第 19 章 的 equality delete 与 Connect control
topic。
下一篇 第 15 篇:Kafka / JDBC / Iceberg sink 的 2PC 落点、失败时间线、与 lakehouse/11 CAS 提交的对称关系。
参考资料
- Apache Flink Documentation, Fault Tolerance Guarantees、Checkpointing、Delivery Semantics。A 级。
- Apache Flink Documentation, Kafka Connector(EOS sink、offset 与 checkpoint)。A 级。
- Apache Kafka Documentation, Transactions、Idempotent Producer。A 级。
- Chandy & Lamport, Distributed Snapshots: Determining Global States of Distributed Systems(1985)。A 级(快照直觉)。
- 本系列:第 10 篇 Checkpoint、第 13 篇 状态调优。
- 本仓库:lakehouse 第 19 章 流式 CDC 入湖、lakehouse 第 11 章 提交并发(Sink 侧 CAS,第 15 篇交叉引用)。
返回 系列目录 · 上一篇 状态放大、Compaction 与调优 · 下一篇 两阶段提交与端到端 Exactly-Once
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照
从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。
【流式数据处理】Kafka · Flink · 状态 · Exactly-Once
承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。
【流式数据处理】两阶段提交与端到端 Exactly-Once
拆解 Flink GenericTwoPhaseCommitSink 协议:preCommit 进 checkpoint、commit 挂 notifyCheckpointComplete;对照 Kafka 事务 sink、JDBC 与 Iceberg 2PC 落点,以及 commit 前/后崩溃与重复 commit 的幂等边界——与 lakehouse/11 CAS、lakehouse/19 入湖侧对读,不重复表格式全文。
【流式数据处理】背压、故障模式与引擎对照
收束流式数据处理系列:Flink credit-based 背压如何沿算子链传播、Web UI 指标怎么读;数据倾斜、checkpoint 超时连锁、Kafka rebalance 风暴、RocksDB OOM、savepoint 不兼容五类生产故障的诊断与止血;Flink / Kafka Streams / Spark Structured Streaming / RisingWave 在状态模型、交付语义、运维与入湖成熟度上的对照表与选型决策树,不做排名。