土法炼钢兴趣小组的算法知识备份

【流式数据处理】交付语义:从 at-most-once 到 exactly-once

文章导航

分类入口
databasedistributed
标签入口
#flink#delivery-semantics#exactly-once#at-least-once#at-most-once#checkpoint#kafka#idempotent#fault-tolerance

目录

第 10 篇 讲 checkpoint 如何把 Kafka offset 与算子 state 绑在同一一致性点;第 13 篇 讲 RocksDB 状态如何膨胀。运维问得最多的仍是:「开了 exactly-once,为什么下游还有重复?」「at-least-once 够不够?」「重复消费和重复写入是一回事吗?」

这些问题不能用一个开关回答。流管道的交付语义是 Source、引擎、Sink 三层各自承诺的叠加;端到端语义由 最弱环 决定。Flink 的 EXACTLY_ONCE checkpoint 模式只保证 引擎内部 对每条 record 的处理效果等价于「恰好一次」——不自动保证 Kafka 里没重复、Iceberg 里没双份。

本文建立三层模型,逐层说明 at-most-once / at-least-once / exactly-once 的定义、Flink 配置落点、以及三类修复手段(幂等、去重、两阶段提交)。第 15 篇 在此基础上展开 GenericTwoPhaseCommitSink 与 lakehouse 表提交的对齐。

本文是「流式数据处理」系列第 14 篇(共 18 篇)。→ 系列目录

篇目 核心内容
第 10 篇 · Checkpoint Barrier 对齐与一致性快照
第 14 篇 · 交付语义 三层组合、最弱环、修复手段
第 15 篇 · 两阶段提交 2PC sink 与端到端 EOS

版本锚定:Flink 1.20+ / 2.x;Kafka 3.x 新 Source/Sink API 与事务 producer。语义描述以 Flink Documentation Fault Tolerance Guarantees 为准。

环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2),未安装 JVM / Flink / Kafka。机制来自 Flink 与 Kafka 官方文档;不伪造故障注入实验的计数结果


一、三层模型:端到端语义由最弱环决定

1.1 为什么需要分层

一条记录从进入管道到落进外部系统,至少经过三段独立进度(第 10 篇):

负责什么 典型持久化点
Source 从日志/CDC 读入,能否 replay、读到哪 Kafka offset、Debezium Connect offset
引擎 算子变换、state 更新、barrier 对齐 Flink checkpoint(state + source state)
Sink 对外部系统的副作用(写 topic、写表、RPC) Kafka 事务 commit、Iceberg snapshot、JDBC commit
flowchart LR
  SRC["Source 层<br/>offset / 可重放"]
  ENG["引擎层<br/>checkpoint + state"]
  SNK["Sink 层<br/>外部副作用"]
  SRC --> ENG --> SNK

组合规则(Flink 文档 Fault Tolerance Guarantees,A 级):

\[ \text{端到端语义} = f(\text{Source},\ \text{引擎},\ \text{Sink}) \]

若任一层仅为 at-least-once,整体 不可能 达到 exactly-once,除非下游用幂等/去重 在应用层伪造 exactly-once 效果。

1.2 三种语义的精确定义

单条 record 的效果(非「物理上网络只传一次」):

语义 定义 失败重试时的典型表现
at-most-once 每条 record 最多 被处理一次;可能 丢失 未 checkpoint 的进度与 state 一起丢
at-least-once 每条 record 至少 被处理一次;可能 重复 从上次 checkpoint 重放,已写入 sink 的可能再写
exactly-once 每条 record 的效果等价于 恰好处理一次 重放与外部提交对齐,重复物理写入被抵消或不可见

「物理 exactly-once」在分布式系统里通常不可达(网络重传、进程崩溃)。工程上的 exactly-once 指 可观测结果 的等价性——与数据库 REDO/UNDO 的直觉一致。

1.3 常见误区

误区 纠正
开了 EXACTLY_ONCE 就端到端 EOS 只保证引擎内部;Sink 须支持 2PC 或幂等
at-least-once「不够生产」 下游幂等 + 可接受少量重复窗口时,at-least-once 更简单
Kafka acks=all = exactly-once acks 管 broker 持久化,不管 consumer 重复读
checkpoint 成功 = 数据已入湖 Iceberg commit 在 notifyCheckpointComplete,见 lakehouse 第 19 章

二、Source 层:可重放与起始位点

2.1 Kafka Source 与 offset

Flink KafkaSource 在 checkpoint 中持久化 各 partition 的 offset(Flink Kafka Connector 文档,A 级)。恢复时:

第 5 篇(若已读)区分 broker committed offsetFlink checkpoint offset:容错以后者为准。

2.2 起始模式

StartupMode 语义影响
earliest 首次启动全量 replay;与 EOS 兼容但耗时长
latest 可能丢历史
group-offsets 依赖外部 committed offset,与 Flink checkpoint 混用时需明确主从
specific-offsets / timestamp 补数、对账场景

CDC source(Debezium、Flink CDC)还有 snapshot 阶段binlog 位点(本系列 第 16 篇)。Source 层 exactly-once 要求:位点与引擎 checkpoint 绑定,Connect 的 offset topic 单独使用时 不构成 Flink 端到端 EOS。

2.3 Source 层能单独保证什么

Source 层 不能 单独保证「写下游 exactly-once」。

2.4 重复消费从哪来

即使 Source 正确 checkpoint offset,下列情况仍会出现 重复读

原因 说明
Checkpoint 失败回滚 从上一个 completed checkpoint 重放自上次成功点以来的所有 record
作业 restart 未从 savepoint 若手动指定 setStartFromEarliest 等,offset 重置
Kafka consumer rebalance 非 Flink 管理的 standalone consumer;Flink source 由 checkpoint 管 offset,rebalance 期间仍可能重复处理 未完成 checkpoint 的批次
上游 topic compaction 不适用 compact topic 只保证 key 最新值,不保证 Flink 读路径不 replay

重复读 ≠ 重复生效:引擎 EXACTLY_ONCE 模式下,重复读不应改变 keyed state 的计数结果;无 2PC 的 sink 仍会把重复读写出


3.1 CheckpointingMode

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 或 CheckpointingMode.AT_LEAST_ONCE
模式 Barrier 行为 State 更新
EXACTLY_ONCE 多输入算子 对齐 barrier,对齐前缓存的数据不参与当前 checkpoint 快照与 barrier 切分一致
AT_LEAST_ONCE 不对齐,barrier 与 record 可交错 快照可能含「将被重放」的数据 → 恢复后 state 可能 多计

Flink 文档明确:EXACTLY_ONCE 模式不启用两阶段提交 sink 时,端到端仍是 at-least-once——因为 sink 可能在 checkpoint 完成前已写外部系统,失败后重放会 重复写

3.2 与 Unaligned Checkpoint 的关系

Unaligned checkpoint(第 10 篇)缩短背压下对齐等待,不改变 EXACTLY_ONCE 对 state 的语义,仍要求 exactly-once 模式的 barrier 协议。调 unaligned 是为了 可用性/延迟,不是为了把 at-least-once 升级成 EOS。

3.3 引擎层 exactly-once 的边界

引擎保证的是:从 source 重放同一批 input 时,算子 state 与内部 side effect(如不含 2PC 的 print)与「每条 record 处理一次」一致

不包含

3.4 侧输出(Side Output)与语义

OutputTag 侧输出流若 再 sink 到外部系统 且未走 2PC,其语义 独立于 主输出的 checkpoint 模式。常见陷阱:主链路 EXACTLY_ONCE + 侧输出 print 或写 Kafka 非事务 topic → 侧输出仍是 at-least-once。

3.5 算子链(Operator Chain)不改变语义

第 7 篇 的 chain 合并只减少序列化;barrier 仍按 JobGraph 边传播。chain 内的最后一个算子独立 Sink 算子 在 2PC 挂载点上无区别——EOS 取决于 Sink 是否实现 2PC,不取决于是否 chain。


四、Sink 层:外部副作用与可见性

4.1 为什么 Sink 是最常见短板

Sink 写入 外部系统,其 commit 点与 Flink checkpoint 默认不同步。典型失败时序:

sequenceDiagram
  participant E as 引擎
  participant S as Sink(无 2PC)
  participant K as Kafka 目标 topic
  E->>S: 处理 record r,调用 send
  S->>K: 写入 r(已可见)
  E->>E: checkpoint 进行中…
  Note over E: TM 崩溃
  E->>E: 从上个 checkpoint 恢复
  E->>S: 重放 r
  S->>K: 再次写入 r

无 2PC 时,至少一次写入 不可避免。修复路径:Sink 层 2PC第 15 篇)、幂等 write、或下游 去重

4.2 Sink 类型与默认语义

Sink 类型 默认语义 EOS 条件
print / 无状态副作用 随引擎 引擎 EXACTLY_ONCE 即可
Kafka Sink(事务) 需配置 EOS 事务 producer + 2PC committer
JDBC / ES(无 2PC) at-least-once 幂等 UPSERT 或外部去重
Iceberg / Hudi Flink sink 2PC checkpoint complete 后 commit(lakehouse/19)
自定义 SinkFunction 通常 at-least-once 自行实现 TwoPhaseCommitSinkFunction

lakehouse 第 19 章表格式侧 说明 writer / committer 分离与提交幂等;本文强调 引擎须在 notifyCheckpointComplete 才触发 commit,否则表格式 CAS 再完美也无法 EOS。

4.3 重复写入的三种形态

形态 典型原因 下游表现
完全 duplicate record 无 2PC sink,checkpoint 失败后重放 两行内容完全相同
同 key 多次 UPDATE CDC upsert 重复 依赖版本列;无版本则「最后写 wins」
跨 checkpoint 的 partial batch JDBC batch 中途 commit 部分行重复、部分丢失(最严重)

区分形态决定修复手段:完全 duplicate 可用 主键 dedup;partial batch 必须 batch 级 2PC 或整批幂等


五、三层组合矩阵

5.1 典型组合

Source 引擎 checkpoint Sink 端到端
Kafka offset 可重放 AT_LEAST_ONCE 无 2PC JDBC at-least-once
Kafka offset 可重放 EXACTLY_ONCE 无 2PC at-least-once(state 不重复计,sink 仍重复写)
Kafka + 事务读 EXACTLY_ONCE Kafka 事务 2PC exactly-once(Kafka→Kafka 管道)
Kafka EXACTLY_ONCE Iceberg 2PC sink exactly-once(入湖,见第 15 篇)
自定义 source 丢 offset 任意 任意 可能 at-most-once 丢数据

5.2 最弱环决策流

flowchart TD
  Q1{Source 可重放<br/>且 offset 在 checkpoint?}
  Q1 -->|否| AMO[可能 at-most-once 丢数]
  Q1 -->|是| Q2{引擎 EXACTLY_ONCE?}
  Q2 -->|否| ALO[至少 at-least-once]
  Q2 -->|是| Q3{Sink 2PC 或幂等?}
  Q3 -->|否| ALO2[at-least-once<br/>state 精确 sink 重复]
  Q3 -->|是| EOS[端到端 exactly-once<br/>(在协议假设内)]

5.3 与 distributed 系列的一致性直觉

distributed 系列 讨论复制与共识:Kafka ISR 保证 日志不丢(在 acks/min.isr 配置下),不保证 consumer 只读一次。Flink checkpoint 是 算子级一致性点,类似 Chandy-Lamport 快照(第 10 篇)。端到端 EOS 需要 快照点与外部事务 commit 同序,这是 第 15 篇 的两阶段提交要解决的问题,不是 CAP 定理的「魔法开关」。


六、三类修复手段

6.1 幂等 Sink

思路:同一条逻辑记录写多次,外部效果相同。

手段 示例
主键 UPSERT JDBC INSERT ... ON CONFLICT UPDATE
确定性消息 key Kafka producer 同 key 覆盖(依赖 compact topic 语义时需谨慎)
业务 idempotency key 下游服务 dedup 表

限制:UPDATE 非幂等、DELETE 重复执行可能出错;需要 单调版本号只 append 不可变事实

6.2 去重(Deduplication)

在引擎内或下游维护 已见 event id 集合:

去重将 exactly-once 效果 建立在 有界或可回收的 dedup state 上——与 第 13 篇 的状态膨胀同一账本。

6.3 两阶段提交(2PC)

思路:外部写入分 预提交(不可见)与 提交(可见);Flink 仅在 checkpoint 全局完成 后调用 commit。

Kafka 事务 sink、Iceberg IcebergFilesCommitter、JDBC XA(支持有限)均走此路。细节见 第 15 篇,此处不重复协议步骤。


Kafka 幂等 producerenable.idempotence=true)用 pid + sequence 在 单分区 上去重,解决 producer 重试导致的重复,不解决 consumer 重复读

Kafka 事务 producer 扩展为跨 partition 的原子 write + commit marker;consumer isolation.level=read_committed 跳过未提交事务。

Flink Kafka Sink V2 的 exactly-once 实现(Flink 文档 Kafka Connector):

  1. 每个 checkpoint 边界 beginTransaction
  2. 写入随 checkpoint pre-commit;
  3. notifyCheckpointCompletecommitTransaction
  4. 失败 abortTransaction,恢复后新事务重写。

这与 lakehouse 第 19 章 的 Iceberg 2PC 同构预提交进 checkpoint 状态,提交挂 checkpoint 完成回调。表格式侧 CAS 见 lakehouse 第 11 章,引擎侧职责是 不提前 commit

7.1 如何验证管道语义(不伪造数字)

方法 适用
唯一 id 计数 源端每条 record 带 uuid,sink 端 COUNT DISTINCT
对账表 按日 partition 汇总源/汇 SUM(amount)
Kafka 事务 marker read_committed consumer 看不到 aborted txn
Iceberg snapshot 历史 每次 commit 对应一次 completed checkpoint(时间相关,见第 17 篇)

杀 TM 实验见 第 15 篇 第七节;本文只列 验证口径,不填未执行环境的计数。


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(600_000);

// Kafka source — 读写分离配置见 connector 文档
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("input")
    .setGroupId("flink-eos-demo")
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    .setProperty("isolation.level", "read_committed") // 上游有事务写时
    .build();

// Kafka sink EOS — 须使用官方 EOS sink 实现并设置 transactional.id 前缀
// Iceberg sink — execution.checkpointing.mode EXACTLY_ONCE + catalog 配置

execution.checkpointing.mode 在 Flink 2.x 配置键可能有迁移,以发行版 flink-conf.yaml 为准。

不保证 EOS 的配置


九、怎么选语义

场景 建议 理由
实时大屏、近似计数 at-least-once + 幂等聚合 简单,重复计数可容忍或 merge
Kafka → Kafka 转发 Flink EOS + Kafka 事务 官方路径成熟
CDC → Iceberg 主键表 EXACTLY_ONCE + Iceberg 2PC sink 重复行破坏 upsert 语义
写 MySQL 报表库 at-least-once + UPSERT JDBC 2PC 支持差
金融 ledger 端到端 EOS + 审计对账 2PC + 外部对账双保险

「选 at-least-once 还是 exactly-once」不是道德题,是 sink 能力、延迟、运维复杂度与错误成本 的乘积。EXACTLY_ONCE 的代价:checkpoint 对齐、事务协调、commit 延迟(第 17 篇 的小文件与可见延迟)。

9.1 Spark Structured Streaming 对照(一句话)

Spark SS 的 foreachBatch + 手动提交若未对 batchId 做幂等,micro-batch 重试会导致 at-least-once 对外部系统;官方 Iceberg/Delta sink 用 checkpoint WAL 绑定 offset 与 commit。Flink 用 barrier + 2PC 实现同类效果——引擎不同,三层最弱环 规律相同(第 18 篇 对照表展开)。

9.2 监控字段

组件 字段 含义
Flink numberOfCompletedCheckpoints / failed checkpoint 健康度
Flink Kafka source currentOffsets vs lag 是否与预期 replay 一致
Kafka consumer(下游) records-lag 突刺 + duplicate key 告警 可能 at-least-once 重复写
Iceberg snapshot 时间戳 vs Flink last checkpoint commit 是否跟上(粗粒度)

9.3 端到端语义 checklist(发布前)

  1. Source:offset / CDC 位点是否写入 checkpoint?起始模式是否符合补数预期?
  2. 引擎CheckpointingMode 是否为 EXACTLY_ONCE?是否存在侧输出 / 异步 IO 绕开 barrier?
  3. Sink:是否官方 EOS / 2PC 实现?transactional.id 或 catalog 权限是否隔离环境?
  4. 外部:下游 consumer 是否 read_committed?Iceberg 表是否开启 upsert 且 equality field 正确?
  5. 运维:checkpoint 超时、Kafka txn timeout、commit 冲突告警是否配置?

五项齐备才声称「端到端 exactly-once」;缺任一项应明确降级为 at-least-once 并设计幂等。

9.4 与第 15 篇的分工

本文建立 三层语义与组合规则第 15 篇 专讲 2PC 如何实现 Sink 层 exactly-oncepreCommit / commit 时序、Kafka/Iceberg 落点、失败恢复)。读顺序:先弄清「最弱环在哪」,再深入「commit 钩子怎么挂 checkpoint」。


十、边界与后文

本文 展开 GenericTwoPhaseCommitSink 的 beginTransaction / preCommit / commit 调用序(第 15 篇),也 重复 lakehouse 第 19 章 的 equality delete 与 Connect control topic。

下一篇 第 15 篇:Kafka / JDBC / Iceberg sink 的 2PC 落点、失败时间线、与 lakehouse/11 CAS 提交的对称关系。


参考资料

  1. Apache Flink Documentation, Fault Tolerance GuaranteesCheckpointingDelivery Semantics。A 级。
  2. Apache Flink Documentation, Kafka Connector(EOS sink、offset 与 checkpoint)。A 级。
  3. Apache Kafka Documentation, TransactionsIdempotent Producer。A 级。
  4. Chandy & Lamport, Distributed Snapshots: Determining Global States of Distributed Systems(1985)。A 级(快照直觉)。
  5. 本系列:第 10 篇 Checkpoint第 13 篇 状态调优
  6. 本仓库:lakehouse 第 19 章 流式 CDC 入湖lakehouse 第 11 章 提交并发(Sink 侧 CAS,第 15 篇交叉引用)。

返回 系列目录 · 上一篇 状态放大、Compaction 与调优 · 下一篇 两阶段提交与端到端 Exactly-Once

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照

从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。

2026-07-01 · database / distributed

【流式数据处理】Kafka · Flink · 状态 · Exactly-Once

承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。

2026-07-01 · database / distributed

【流式数据处理】两阶段提交与端到端 Exactly-Once

拆解 Flink GenericTwoPhaseCommitSink 协议:preCommit 进 checkpoint、commit 挂 notifyCheckpointComplete;对照 Kafka 事务 sink、JDBC 与 Iceberg 2PC 落点,以及 commit 前/后崩溃与重复 commit 的幂等边界——与 lakehouse/11 CAS、lakehouse/19 入湖侧对读,不重复表格式全文。

2026-07-01 · database / distributed

【流式数据处理】背压、故障模式与引擎对照

收束流式数据处理系列:Flink credit-based 背压如何沿算子链传播、Web UI 指标怎么读;数据倾斜、checkpoint 超时连锁、Kafka rebalance 风暴、RocksDB OOM、savepoint 不兼容五类生产故障的诊断与止血;Flink / Kafka Streams / Spark Structured Streaming / RisingWave 在状态模型、交付语义、运维与入湖成熟度上的对照表与选型决策树,不做排名。


By .