第
5 篇 把 ISR、HW 与 consumer group offset
讲清楚了:acks=all 保证副本 committed,但
producer 重试 仍可能在日志里留下
重复 batch——网络超时后客户端不知道 broker
是否已写入,再次发送即两条相同 offset 不同的记录。Flink
exactly-once 要求 Kafka → 计算 → Kafka
链路无重复无丢失(第
14、15 篇),Kafka 侧必须提供 幂等写入
与 跨 partition 原子提交。Kafka 0.11+ 的
幂等 producer 与 事务
producer(KIP-98 等)就是为此设计的。
本文讲 Apache Kafka 3.x(KRaft) 下事务语义,回答:
- 幂等 producer 如何用
ProducerId(PID)与 sequence number 去重? - 事务 producer 的
transactional.id、__transaction_state、commit/abort 各改变什么可见性? - Consumer
isolation.level=read_committed与read_uncommitted差在哪? - Kafka EOS 止于此集群边界;与 Flink checkpoint 如何拼接成端到端 EOS?
先修:第 4 篇 RecordBatch;第 5 篇 HW 与 offset。Flink 侧 Kafka sink 2PC 见 第 15 篇。
环境说明:本机 WSL2(Linux 6.6.87.2)、i9-12900K / 32 GiB,未在本写作环境运行 Kafka 事务实验。机制来自 Apache Kafka Documentation(Transactions、Producer Configs)、KIP-98、KIP-447(增量式事务);不粘贴未执行的 consume 输出或 transaction marker 十六进制 dump。文末给出可复现步骤与应观测行为。
版本锚定:Kafka 3.x。事务协调者(TransactionCoordinator)与 KRaft controller 共存于 broker 进程模型;事务协议与 ZK 时代一致,元数据由 KRaft 管理(来源:Transactions、KRaft)。
一、为什么需要幂等与事务
1.1 至少一次发送的重复问题
Producer 默认
至少一次(retries > 0):send()
后未收到 ack,客户端重试。Broker 可能
已写入第一次,第二次又 append →
重复消息(不同 offset,相同 payload)。
下游 无状态 consumer 可能无所谓;Flink keyed state 会把重复事件算两次(第 9 篇)。修复路径:
| 层级 | 手段 |
|---|---|
| Kafka broker | 幂等 producer 去重 |
| Kafka 多 partition | 事务原子 commit |
| Flink 引擎 | checkpoint + 2PC sink(第 15 篇) |
| 业务 sink | 幂等键、UPSERT |
1.2 三层语义组合(预览)
端到端 exactly-once = Source 语义 × 引擎语义 × Sink 语义 的最弱环(第 14 篇)。本篇覆盖 Kafka Producer/Sink 作为 Source/Sink 时 的单集群保证;跨 JDBC、Iceberg 需 Flink 2PC 配合。
二、幂等 Producer
启用
enable.idempotence=true(Kafka
2.0+;与 acks=all、适当 retries
联动,来源:Producer Configs)时,broker 为 producer 分配
Producer ID(PID),每个
(PID, partition) 维护 sequence
number。
2.1 去重规则
Leader broker 收到 batch 时检查(来源:Design,Idempotent Producer):
- 若 sequence 等于 期望的下一个序号 → 正常 append,序号 +1;
- 若 小于 期望(重复)→ 丢弃,仍返回 success(客户端认为已写入);
- 若 大于 期望(out-of-order gap)→ 抛出 OutOfOrderSequenceException,通常需重置 producer。
因此 单 partition 内 重试不会产生重复 offset;跨 partition 仍无原子性——一条成功一条失败时,consumer 可能只看到部分。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 隐式设置 acks=all, retries=MAX, max.in.flight.requests.per.connection<=5(版本相关)
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", key, value));
producer.flush();
producer.close();2.2 PID 生命周期
PID 由 TransactionCoordinator 分配(即使不开事务,幂等路径也走 TC 的 PID 池)。Producer 重启 后 PID 变化,去重窗口重置——幂等只保证 同一 producer 实例生命周期内 的重试安全。
transactional.id
未设置时,实例重启 不能 关联旧 PID。要
跨 session fencing,必须启用事务
ID(第三节)。
2.3 与
max.in.flight.requests.per.connection
幂等模式下 in-flight 请求数受限(默认 5),保证 broker 端 sequence 检测有意义。过大 in-flight + 非幂等曾导致 reorder;幂等路径下 broker 强制顺序语义。
2.4 幂等与事务的配置联动
开启
enable.idempotence=true
时,客户端自动设置(来源:Producer
Configs,Idempotence):
| 配置 | 幂等下的典型值 | 原因 |
|---|---|---|
acks |
all |
与 ISR committed 一致 |
retries |
Integer.MAX_VALUE |
安全重试 |
max.in.flight.requests.per.connection |
\(\leq 5\) | 保证 sequence 可判定 |
显式设置冲突值(如 acks=1)会导致
ConfigException
或自动覆盖——以客户端版本文档为准。仅幂等、不
设 transactional.id 时,不能
调用 beginTransaction()。
三、事务 Producer
事务 在幂等之上增加:一组
partition 上的消息要么全部 commit 可见,要么全部 abort
不可见(consumer read_committed
视角)。核心 API(Java,KafkaProducer):
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-txn-app-1");
producer.initTransactions(); // 注册 transactional.id,可能 fencing 旧实例
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("out", "k1", "v1"));
producer.send(new ProducerRecord<>("out", "k2", "v2"));
producer.commitTransaction(); // 原子标记 commit
} catch (Exception e) {
producer.abortTransaction();
}3.1 生命周期状态机
stateDiagram-v2
[*] --> Uninitialized: new producer
Uninitialized --> Ready: initTransactions OK
Ready --> InTransaction: beginTransaction
InTransaction --> Ready: commitTransaction
InTransaction --> Ready: abortTransaction
Ready --> FatalError: unrecoverable error
| 阶段 | 作用 |
|---|---|
initTransactions() |
向 TransactionCoordinator 注册
transactional.id;完成 PID
分配;fencing 同一 transactional.id 的旧
producer epoch |
beginTransaction() |
开始新事务,后续 send 带 transaction marker |
commitTransaction() |
写 commit marker;消息对
read_committed 可见 |
abortTransaction() |
写 abort marker;未 commit 数据对
read_committed 不可见 |
来源:Kafka Documentation,Transactions;协议层 Transaction Markers。
3.2
transactional.id 与 Fencing
transactional.id 是
逻辑 producer
身份(字符串,用户指定)。新实例
initTransactions() 时,TC 把该 ID 的
producer epoch 递增;旧实例再 send 会得到
ProducerFencedException——防止 僵尸
producer 在 failover 后双写。
Flink Kafka Sink 在 exactly-once 模式下为每个 subtask 或作业分配 唯一 transactional.id 前缀(实现见 Flink Kafka connector 文档),与 checkpoint epoch 对齐(第 15 篇)。
3.3 内部
Topic:__transaction_state
TransactionCoordinator 把事务元数据持久化在内部 topic
__transaction_state(compact,第
4 篇)。内容包括
transactional.id、PID、epoch、事务状态(Ongoing /
PrepareCommit / CompleteCommit / PrepareAbort /
CompleteAbort 等)。
TC 本身由
transaction.state.log.replication.factor
等配置保护;生产应与业务 topic 同级副本策略。
3.4 Transaction Marker 与 HW
事务消息先作为 普通 batch 写入 partition
log(对 read_uncommitted
可见);commitTransaction 时写入
ControlBatch(commit/abort
marker)。Consumer
read_committed 在 HW 内
过滤 未 commit 事务的数据,仅交付已 commit
事务内的 record + 非事务 record。
Abort 的消息仍占 disk offset,但对 committed 隔离不可见——存储与可见性分离。
3.5 事务状态在 TC 侧的存储
TransactionCoordinator 对每个 ongoing 事务 维护状态(来源:Transactions 实现说明)。简化状态转移:
stateDiagram-v2
Empty --> Ongoing: beginTransaction
Ongoing --> PrepareCommit: commitTransaction 开始
PrepareCommit --> CompleteCommit: 所有 partition marker 写入
Ongoing --> PrepareAbort: abortTransaction
PrepareAbort --> CompleteAbort: abort marker 完成
CompleteCommit --> Empty: 清理
CompleteAbort --> Empty: 清理
PrepareCommit 阶段需向事务涉及的 所有 partition leader 写入 commit marker;任一 partition 不可用会导致 commit 阻塞或超时。这与 多 partition 原子性 的成本一致:事务跨度越大,失败面越大。
__transaction_state topic
的 RF 不足时,TC 可能无法持久化
epoch——整个集群事务不可用,而普通 produce
仍可能工作。运维监控应包含
TransactionCoordinatorLoad 与
__transaction_state
under-replicated。
四、Consumer 隔离级别
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");isolation.level |
行为 |
|---|---|
read_uncommitted(默认) |
返回 HW 内所有消息,含 open / aborted 事务数据 |
read_committed |
跳过 aborted 事务;open 事务数据 阻塞 交付直到 commit 或 abort(可能增加 lag) |
Flink exactly-once Kafka Source 使用 read_committed,避免读到 未 commit 的下游 sink 回写 或 中间事务态(与 Flink checkpoint 边界对齐,第 10 篇)。
4.1
consume-transform-produce 模式
Kafka 支持 同一 transactional.id 下
consumer + producer
原子:sendOffsetsToTransaction
把 consumer offset 与产出消息 同一事务
commit(来源:Transactions,Kafka
Streams EOS)。Flink 不 使用该 API
作为主路径,而用 checkpoint 存
offset;Kafka Streams 的 EOS 依赖此 API(第 18
篇 对照)。
// Kafka Streams / 手写 EOS 消费循环示意(非 Flink 主路径)
consumer.subscribe(List.of("input"));
producer.initTransactions();
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
producer.beginTransaction();
for (ConsumerRecord<String, String> rec : records) {
producer.send(new ProducerRecord<>("output", transform(rec)));
}
Map<TopicPartition, OffsetAndMetadata> offsets = computeOffsets(records);
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
}五、Kafka EOS 的边界
5.1 单集群、单事务内保证什么
| 保证 | 条件 |
|---|---|
| 单 partition 无重复 | enable.idempotence=true |
| 多 partition 原子可见 | 事务 commit + read_committed consumer |
| 跨 session fencing | transactional.id +
initTransactions |
| 持久化 | acks=all + 足够 ISR(第
5 篇) |
5.2 不保证什么
- 跨 Kafka 集群 原子性;
- Kafka + 外部 DB 原子性(需 Kafka Connect 或 Flink 2PC);
- Consumer 处理 + 外部副作用 原子性(需事务 consume-transform-produce 或 Flink);
- 无限长 事务:事务超时
transaction.timeout.ms(默认 60s,brokertransactions.max.timeout.ms上限)过大事务会被 abort。
5.3 事务超时与
max.block.ms
长时间 不 commit 的 open 事务会阻塞
read_committed consumer 在该
partition 上的进度(等待 marker)。Flink checkpoint 完成前
Kafka sink 可能处于 pre-commit
态——connector 设计必须 在超时前 commit 或
abort(第 15
篇)。
六、与 Flink Exactly-Once 的衔接
sequenceDiagram
participant F as Flink JobManager
participant TM as TaskManager
participant KS as Kafka Source
participant KK as Kafka Sink TX
participant KB as Kafka Broker
F->>TM: 触发 checkpoint N
TM->>KS: barrier 对齐 snapshot offset
TM->>KK: pre-commit 事务 N
TM->>F: ack checkpoint N
F->>F: checkpoint N 完成
TM->>KK: commitTransaction N
KK->>KB: commit markers
Note over KS,KB: read_committed 仅见 committed 事务
要点(来源:Flink Documentation,Kafka Connector、Two-Phase Commit):
- Source:offset 存在 Flink
checkpoint;consumer read_committed;不依赖
sendOffsetsToTransaction为主路径。 - Sink:
KafkaSinkexactly-once 使用 事务 producer;checkpoint notifyCheckpointComplete 后 commitTransaction;失败则 abort,下一 checkpoint 用新 epoch 重试。 - Transactional.id 必须
唯一且稳定(常含
job.id、subtask index);作业升级需注意 fencing(第 11 篇)。 - 端到端 EOS 还需 checkpoint 模式 EXACTLY_ONCE + sink 2PC 实现完整(第 14、15 篇)。
与 lakehouse 第 19 章 衔接:Iceberg Flink sink 的 2PC commit 与 Kafka 事务 独立;全局 EOS 由 Flink 在 同一 checkpoint 协调多个 sink 的 pre-commit/commit(第 17 篇)。
6.1 Checkpoint 与 Kafka 事务的两阶段对应
| Flink checkpoint 阶段 | Kafka transactional sink 典型动作 |
|---|---|
| Snapshot 开始 / barrier 对齐 | 继续缓冲或
beginTransaction(实现版本相关) |
snapshotState / pre-commit |
写入 batch 至 transaction,未 commit |
| JM 确认 checkpoint 成功 | commitTransaction() |
| Checkpoint 失败 / 作业 fail | abortTransaction(),consumer 不可见 |
Flink Kafka Sink 在
EXACTLY_ONCE 下要求
delivery.guarantee=EXACTLY_ONCE(Flink
1.15+ 统一 Sink API 配置键,以 connector
文档为准)。Semantic 与 checkpoint
模式 必须同时为 EXACTLY_ONCE,否则退化为
at-least-once。
6.2
transactional.id 命名与并行度变更
推荐模式:{prefix}-{jobId}-{subtaskIndex} 或
connector 生成的 UUID 稳定前缀。Rescale 或
改并行度 后 subtask index 变化 → 新
transactional.id → 旧 open 事务可能被
abort 或 fencing;从
savepoint 恢复时需确认 connector 的 id
生成规则与 状态兼容(第 11
篇)。
七、配置参考
7.1 Producer
| 配置 | 作用 |
|---|---|
enable.idempotence |
开启 PID + sequence |
transactional.id |
事务身份;启用事务必配 |
transaction.timeout.ms |
事务最大存活 |
acks |
幂等/事务要求 all |
max.in.flight.requests.per.connection |
幂等下受限 |
7.2 Broker
| 配置 | 作用 |
|---|---|
transaction.state.log.replication.factor |
__transaction_state 副本数 |
transaction.state.log.min.isr |
TC 日志 min ISR |
transactions.max.timeout.ms |
允许的最大 producer 事务超时 |
7.3 Consumer
| 配置 | 作用 |
|---|---|
isolation.level |
read_committed /
read_uncommitted |
enable.auto.commit |
EOS 消费循环通常 false(手动与事务 offset 绑定) |
7.4 与 Connect / MirrorMaker 的边界
Kafka Connect sink connector 若声明 exactly-once,底层同样依赖 事务 producer 或 幂等 + 下游 dedup(视 connector 文档)。MirrorMaker 2 跨集群复制 不 提供单全局事务——目标集群是 独立 EOS 域。设计 多集群灾备 时勿假设 Kafka 事务跨集群原子。
八、本地复现(行为级,无伪造输出)
实验 A:幂等 vs 非幂等
- KRaft 单节点启动(第 4 篇 第七节步骤)。
- 编写 producer:
acks=all,模拟 send 后不确认、强制重试同一 batch(或断网重试)。 - 对比
enable.idempotence=true/false,用 console consumerread_uncommitted统计 相同 key/value 条数。
预期:幂等路径 仅一条;非幂等 多条。
实验 B:事务 abort 与 read_committed
- 创建 topic
txn-demo。 - 事务 producer:
beginTransaction→ send 若干条 →abortTransaction()。 - 分别用
read_uncommitted与read_committedconsumer 从 earliest 消费。
预期:uncommitted 可见 abort 前写入(或 marker 行为以版本文档为准);committed 不可见 abort 事务内业务消息。
实验 C:commit 后可见
- 同上,但
commitTransaction()。 read_committedconsumer 应交付 事务内消息。
实验 D:Fencing
- 两个进程使用 相同
transactional.id,先后initTransactions()。 - 后启动者应成功;先启动者继续
send应得 ProducerFencedException。
实验 E:open 事务与 read_committed lag
- 事务 producer
beginTransaction后 send,故意不 commit,保持进程存活。 read_committedconsumer 消费同一 partition。
预期:consumer stall 在 open 事务之前的位置(或按 broker 版本阻塞策略),lag 上升;commit 后 lag 恢复。说明 长事务 会伤害下游可见性——Flink sink 必须在 checkpoint 超时前结束事务。
九、故障模式
| 现象 | 可能原因 | 方向 |
|---|---|---|
ProducerFencedException |
同 transactional.id 新 epoch | 预期 fencing;检查 Flink 并行 id |
InvalidTxnStateException |
未 init / 重复 begin | 调整 init/begin 顺序 |
TransactionCoordinatorNotFound |
TC 未就绪 / broker 滚动 | 检查 __transaction_state ISR |
| read_committed lag 高 | 上游长事务未 commit | 查 open transaction;调小 Flink checkpoint 与 sink flush |
| 重复消息 | 幂等未开或新 PID | 开 idempotence;EOS 用 transactional sink |
十、与 Kafka Streams EOS 的一行对照
Kafka Streams 应用层 EOS =
consumes + produces + offset 单事务
commit(依赖
sendOffsetsToTransaction)。Flink
= checkpoint 统一快照 + Kafka sink
事务。Streams 适合纯 Kafka
拓扑;Flink 适合异构 source/sink 与复杂
state(第 18
篇)。
十一、术语表
| 术语 | 含义 |
|---|---|
| PID | Producer ID,broker 分配的幂等标识 |
| Sequence number | 每 (PID, partition) 单调序号 |
| TransactionCoordinator | 管理 transactional.id 与事务状态的 broker 组件 |
| Commit / Abort marker | ControlBatch,定义事务可见性 |
| Fencing | 新 epoch 使旧 producer 发送失败 |
| read_committed | 只读已 commit 事务数据 |
十二、小结
幂等 producer 用 PID + sequence 消除
单 partition 重试重复;事务
producer 用 transactional.id、TC 与
commit/abort marker 实现 多 partition
原子可见 与 跨 session
fencing。Consumer 需
read_committed
才能与事务边界一致。Kafka 事务
覆盖集群内日志语义;Flink 端到端 EOS 还要把
checkpoint 与 Kafka sink
commit、以及其它 sink 的 2PC 绑在同一一致性点(第 15
篇)。
下一篇进入 Flink 运行时模型:JobManager、TaskManager、Slot 与 StreamGraph→ExecutionGraph,看 Kafka Source 的 partition split 如何变成 Slot 上的 Task(第 7 篇)。
参考资料
- Apache Kafka Documentation, Transactions(生命周期、隔离级别、CTM)。
- Apache Kafka Documentation, Producer
Configs(
enable.idempotence、transactional.id)。 - KIP-98: Exactly Once Delivery and Transactional Messaging。
- KIP-447: Producer scalability for sequential transactional writes(高吞吐事务演进,3.x 相关)。
- Apache Kafka Documentation, Design(Idempotent Producer、RecordBatch)。
- Apache Flink Documentation, Kafka Connector、Two-Phase Commit Sink。
- 本系列 第 5 篇(acks/ISR);第 10、14、15 篇(Flink EOS)。
返回 系列目录 | 上一篇:副本、ISR 与 Consumer Group | 下一篇:Flink 运行时模型
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照
从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。
【流式数据处理】交付语义:从 at-most-once 到 exactly-once
用 Source、引擎、Sink 三层模型拆解 at-most-once、at-least-once、exactly-once 的组合规则与最弱环决定律;对照 Flink checkpoint 模式、Kafka 事务与幂等 producer、重复消费/重复写入的三类修复手段,为两阶段提交 sink 铺垫。
【流式数据处理】两阶段提交与端到端 Exactly-Once
拆解 Flink GenericTwoPhaseCommitSink 协议:preCommit 进 checkpoint、commit 挂 notifyCheckpointComplete;对照 Kafka 事务 sink、JDBC 与 Iceberg 2PC 落点,以及 commit 前/后崩溃与重复 commit 的幂等边界——与 lakehouse/11 CAS、lakehouse/19 入湖侧对读,不重复表格式全文。
【流式数据处理】Kafka · Flink · 状态 · Exactly-Once
承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。