土法炼钢兴趣小组的算法知识备份

【流式数据处理】Kafka 事务与幂等 Producer

文章导航

分类入口
databasedistributed
标签入口
#kafka#transactions#idempotent-producer#pid#sequence#transactional-id#read-committed#exactly-once#two-phase-commit

目录

第 5 篇 把 ISR、HW 与 consumer group offset 讲清楚了:acks=all 保证副本 committed,但 producer 重试 仍可能在日志里留下 重复 batch——网络超时后客户端不知道 broker 是否已写入,再次发送即两条相同 offset 不同的记录。Flink exactly-once 要求 Kafka → 计算 → Kafka 链路无重复无丢失(第 14、15 篇),Kafka 侧必须提供 幂等写入跨 partition 原子提交。Kafka 0.11+ 的 幂等 producer事务 producer(KIP-98 等)就是为此设计的。

本文讲 Apache Kafka 3.x(KRaft) 下事务语义,回答:

先修:第 4 篇 RecordBatch;第 5 篇 HW 与 offset。Flink 侧 Kafka sink 2PC 见 第 15 篇

环境说明:本机 WSL2(Linux 6.6.87.2)、i9-12900K / 32 GiB,未在本写作环境运行 Kafka 事务实验。机制来自 Apache Kafka Documentation(TransactionsProducer Configs)、KIP-98、KIP-447(增量式事务);不粘贴未执行的 consume 输出或 transaction marker 十六进制 dump。文末给出可复现步骤与应观测行为。

版本锚定:Kafka 3.x。事务协调者(TransactionCoordinator)与 KRaft controller 共存于 broker 进程模型;事务协议与 ZK 时代一致,元数据由 KRaft 管理(来源:TransactionsKRaft)。


一、为什么需要幂等与事务

1.1 至少一次发送的重复问题

Producer 默认 至少一次retries > 0):send() 后未收到 ack,客户端重试。Broker 可能 已写入第一次,第二次又 append → 重复消息(不同 offset,相同 payload)。

下游 无状态 consumer 可能无所谓;Flink keyed state 会把重复事件算两次(第 9 篇)。修复路径:

层级 手段
Kafka broker 幂等 producer 去重
Kafka 多 partition 事务原子 commit
Flink 引擎 checkpoint + 2PC sink(第 15 篇
业务 sink 幂等键、UPSERT

1.2 三层语义组合(预览)

端到端 exactly-once = Source 语义 × 引擎语义 × Sink 语义 的最弱环(第 14 篇)。本篇覆盖 Kafka Producer/Sink 作为 Source/Sink 时 的单集群保证;跨 JDBC、Iceberg 需 Flink 2PC 配合。


二、幂等 Producer

启用 enable.idempotence=true(Kafka 2.0+;与 acks=all、适当 retries 联动,来源:Producer Configs)时,broker 为 producer 分配 Producer ID(PID),每个 (PID, partition) 维护 sequence number

2.1 去重规则

Leader broker 收到 batch 时检查(来源:DesignIdempotent Producer):

  1. 若 sequence 等于 期望的下一个序号 → 正常 append,序号 +1;
  2. 小于 期望(重复)→ 丢弃,仍返回 success(客户端认为已写入);
  3. 大于 期望(out-of-order gap)→ 抛出 OutOfOrderSequenceException,通常需重置 producer。

因此 单 partition 内 重试不会产生重复 offset;跨 partition 仍无原子性——一条成功一条失败时,consumer 可能只看到部分。

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 隐式设置 acks=all, retries=MAX, max.in.flight.requests.per.connection<=5(版本相关)

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", key, value));
producer.flush();
producer.close();

2.2 PID 生命周期

PID 由 TransactionCoordinator 分配(即使不开事务,幂等路径也走 TC 的 PID 池)。Producer 重启 后 PID 变化,去重窗口重置——幂等只保证 同一 producer 实例生命周期内 的重试安全。

transactional.id 未设置时,实例重启 不能 关联旧 PID。要 跨 session fencing,必须启用事务 ID(第三节)。

2.3 与 max.in.flight.requests.per.connection

幂等模式下 in-flight 请求数受限(默认 5),保证 broker 端 sequence 检测有意义。过大 in-flight + 非幂等曾导致 reorder;幂等路径下 broker 强制顺序语义。

2.4 幂等与事务的配置联动

开启 enable.idempotence=true 时,客户端自动设置(来源:Producer Configs,Idempotence):

配置 幂等下的典型值 原因
acks all 与 ISR committed 一致
retries Integer.MAX_VALUE 安全重试
max.in.flight.requests.per.connection \(\leq 5\) 保证 sequence 可判定

显式设置冲突值(如 acks=1)会导致 ConfigException 或自动覆盖——以客户端版本文档为准。仅幂等、transactional.id 时,不能 调用 beginTransaction()


三、事务 Producer

事务 在幂等之上增加:一组 partition 上的消息要么全部 commit 可见,要么全部 abort 不可见(consumer read_committed 视角)。核心 API(Java,KafkaProducer):

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-txn-app-1");

producer.initTransactions();   // 注册 transactional.id,可能 fencing 旧实例

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("out", "k1", "v1"));
    producer.send(new ProducerRecord<>("out", "k2", "v2"));
    producer.commitTransaction();  // 原子标记 commit
} catch (Exception e) {
    producer.abortTransaction();
}

3.1 生命周期状态机

stateDiagram-v2
  [*] --> Uninitialized: new producer
  Uninitialized --> Ready: initTransactions OK
  Ready --> InTransaction: beginTransaction
  InTransaction --> Ready: commitTransaction
  InTransaction --> Ready: abortTransaction
  Ready --> FatalError: unrecoverable error
阶段 作用
initTransactions() 向 TransactionCoordinator 注册 transactional.id;完成 PID 分配;fencing 同一 transactional.id 的旧 producer epoch
beginTransaction() 开始新事务,后续 send 带 transaction marker
commitTransaction() commit marker;消息对 read_committed 可见
abortTransaction() abort marker;未 commit 数据对 read_committed 不可见

来源:Kafka Documentation,Transactions;协议层 Transaction Markers

3.2 transactional.id 与 Fencing

transactional.id逻辑 producer 身份(字符串,用户指定)。新实例 initTransactions() 时,TC 把该 ID 的 producer epoch 递增;旧实例再 send 会得到 ProducerFencedException——防止 僵尸 producer 在 failover 后双写。

Flink Kafka Sink 在 exactly-once 模式下为每个 subtask 或作业分配 唯一 transactional.id 前缀(实现见 Flink Kafka connector 文档),与 checkpoint epoch 对齐(第 15 篇)。

3.3 内部 Topic:__transaction_state

TransactionCoordinator 把事务元数据持久化在内部 topic __transaction_state(compact,第 4 篇)。内容包括 transactional.id、PID、epoch、事务状态(Ongoing / PrepareCommit / CompleteCommit / PrepareAbort / CompleteAbort 等)。

TC 本身由 transaction.state.log.replication.factor 等配置保护;生产应与业务 topic 同级副本策略。

3.4 Transaction Marker 与 HW

事务消息先作为 普通 batch 写入 partition log(对 read_uncommitted 可见);commitTransaction 时写入 ControlBatch(commit/abort marker)。Consumer read_committed 在 HW 内 过滤 未 commit 事务的数据,仅交付已 commit 事务内的 record + 非事务 record。

Abort 的消息仍占 disk offset,但对 committed 隔离不可见——存储与可见性分离。

3.5 事务状态在 TC 侧的存储

TransactionCoordinator 对每个 ongoing 事务 维护状态(来源:Transactions 实现说明)。简化状态转移:

stateDiagram-v2
  Empty --> Ongoing: beginTransaction
  Ongoing --> PrepareCommit: commitTransaction 开始
  PrepareCommit --> CompleteCommit: 所有 partition marker 写入
  Ongoing --> PrepareAbort: abortTransaction
  PrepareAbort --> CompleteAbort: abort marker 完成
  CompleteCommit --> Empty: 清理
  CompleteAbort --> Empty: 清理

PrepareCommit 阶段需向事务涉及的 所有 partition leader 写入 commit marker;任一 partition 不可用会导致 commit 阻塞或超时。这与 多 partition 原子性 的成本一致:事务跨度越大,失败面越大。

__transaction_state topic 的 RF 不足时,TC 可能无法持久化 epoch——整个集群事务不可用,而普通 produce 仍可能工作。运维监控应包含 TransactionCoordinatorLoad__transaction_state under-replicated


四、Consumer 隔离级别

props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
isolation.level 行为
read_uncommitted(默认) 返回 HW 内所有消息,含 open / aborted 事务数据
read_committed 跳过 aborted 事务;open 事务数据 阻塞 交付直到 commit 或 abort(可能增加 lag)

Flink exactly-once Kafka Source 使用 read_committed,避免读到 未 commit 的下游 sink 回写中间事务态(与 Flink checkpoint 边界对齐,第 10 篇)。

4.1 consume-transform-produce 模式

Kafka 支持 同一 transactional.id 下 consumer + producer 原子sendOffsetsToTransaction 把 consumer offset 与产出消息 同一事务 commit(来源:TransactionsKafka Streams EOS)。Flink 使用该 API 作为主路径,而用 checkpoint 存 offset;Kafka Streams 的 EOS 依赖此 API(第 18 篇 对照)。

// Kafka Streams / 手写 EOS 消费循环示意(非 Flink 主路径)
consumer.subscribe(List.of("input"));
producer.initTransactions();

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;
    producer.beginTransaction();
    for (ConsumerRecord<String, String> rec : records) {
        producer.send(new ProducerRecord<>("output", transform(rec)));
    }
    Map<TopicPartition, OffsetAndMetadata> offsets = computeOffsets(records);
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}

五、Kafka EOS 的边界

5.1 单集群、单事务内保证什么

保证 条件
单 partition 无重复 enable.idempotence=true
多 partition 原子可见 事务 commit + read_committed consumer
跨 session fencing transactional.id + initTransactions
持久化 acks=all + 足够 ISR(第 5 篇

5.2 不保证什么

5.3 事务超时与 max.block.ms

长时间 不 commit 的 open 事务会阻塞 read_committed consumer 在该 partition 上的进度(等待 marker)。Flink checkpoint 完成前 Kafka sink 可能处于 pre-commit 态——connector 设计必须 在超时前 commit 或 abort(第 15 篇)。


sequenceDiagram
  participant F as Flink JobManager
  participant TM as TaskManager
  participant KS as Kafka Source
  participant KK as Kafka Sink TX
  participant KB as Kafka Broker
  F->>TM: 触发 checkpoint N
  TM->>KS: barrier 对齐 snapshot offset
  TM->>KK: pre-commit 事务 N
  TM->>F: ack checkpoint N
  F->>F: checkpoint N 完成
  TM->>KK: commitTransaction N
  KK->>KB: commit markers
  Note over KS,KB: read_committed 仅见 committed 事务

要点(来源:Flink Documentation,Kafka ConnectorTwo-Phase Commit):

  1. Source:offset 存在 Flink checkpoint;consumer read_committed;不依赖 sendOffsetsToTransaction 为主路径。
  2. SinkKafkaSink exactly-once 使用 事务 producer;checkpoint notifyCheckpointCompletecommitTransaction;失败则 abort,下一 checkpoint 用新 epoch 重试。
  3. Transactional.id 必须 唯一且稳定(常含 job.id、subtask index);作业升级需注意 fencing(第 11 篇)。
  4. 端到端 EOS 还需 checkpoint 模式 EXACTLY_ONCE + sink 2PC 实现完整(第 14、15 篇)。

lakehouse 第 19 章 衔接:Iceberg Flink sink 的 2PC commit 与 Kafka 事务 独立;全局 EOS 由 Flink 在 同一 checkpoint 协调多个 sink 的 pre-commit/commit(第 17 篇)。

6.1 Checkpoint 与 Kafka 事务的两阶段对应

Flink checkpoint 阶段 Kafka transactional sink 典型动作
Snapshot 开始 / barrier 对齐 继续缓冲或 beginTransaction(实现版本相关)
snapshotState / pre-commit 写入 batch 至 transaction,未 commit
JM 确认 checkpoint 成功 commitTransaction()
Checkpoint 失败 / 作业 fail abortTransaction(),consumer 不可见

Flink Kafka SinkEXACTLY_ONCE 下要求 delivery.guarantee=EXACTLY_ONCE(Flink 1.15+ 统一 Sink API 配置键,以 connector 文档为准)。Semanticcheckpoint 模式 必须同时为 EXACTLY_ONCE,否则退化为 at-least-once。

6.2 transactional.id 命名与并行度变更

推荐模式:{prefix}-{jobId}-{subtaskIndex} 或 connector 生成的 UUID 稳定前缀。Rescale改并行度 后 subtask index 变化 → 新 transactional.id → 旧 open 事务可能被 abortfencing;从 savepoint 恢复时需确认 connector 的 id 生成规则与 状态兼容第 11 篇)。


七、配置参考

7.1 Producer

配置 作用
enable.idempotence 开启 PID + sequence
transactional.id 事务身份;启用事务必配
transaction.timeout.ms 事务最大存活
acks 幂等/事务要求 all
max.in.flight.requests.per.connection 幂等下受限

7.2 Broker

配置 作用
transaction.state.log.replication.factor __transaction_state 副本数
transaction.state.log.min.isr TC 日志 min ISR
transactions.max.timeout.ms 允许的最大 producer 事务超时

7.3 Consumer

配置 作用
isolation.level read_committed / read_uncommitted
enable.auto.commit EOS 消费循环通常 false(手动与事务 offset 绑定)

7.4 与 Connect / MirrorMaker 的边界

Kafka Connect sink connector 若声明 exactly-once,底层同样依赖 事务 producer幂等 + 下游 dedup(视 connector 文档)。MirrorMaker 2 跨集群复制 提供单全局事务——目标集群是 独立 EOS 域。设计 多集群灾备 时勿假设 Kafka 事务跨集群原子。


八、本地复现(行为级,无伪造输出)

实验 A:幂等 vs 非幂等

  1. KRaft 单节点启动(第 4 篇 第七节步骤)。
  2. 编写 producer:acks=all,模拟 send 后不确认、强制重试同一 batch(或断网重试)。
  3. 对比 enable.idempotence=true/false,用 console consumer read_uncommitted 统计 相同 key/value 条数

预期:幂等路径 仅一条;非幂等 多条

实验 B:事务 abort 与 read_committed

  1. 创建 topic txn-demo
  2. 事务 producer:beginTransaction → send 若干条 → abortTransaction()
  3. 分别用 read_uncommittedread_committed consumer 从 earliest 消费。

预期:uncommitted 可见 abort 前写入(或 marker 行为以版本文档为准);committed 不可见 abort 事务内业务消息。

实验 C:commit 后可见

  1. 同上,但 commitTransaction()
  2. read_committed consumer 应交付 事务内消息。

实验 D:Fencing

  1. 两个进程使用 相同 transactional.id,先后 initTransactions()
  2. 后启动者应成功;先启动者继续 send 应得 ProducerFencedException

实验 E:open 事务与 read_committed lag

  1. 事务 producer beginTransaction 后 send,故意不 commit,保持进程存活。
  2. read_committed consumer 消费同一 partition。

预期:consumer stall 在 open 事务之前的位置(或按 broker 版本阻塞策略),lag 上升;commit 后 lag 恢复。说明 长事务 会伤害下游可见性——Flink sink 必须在 checkpoint 超时前结束事务。


九、故障模式

现象 可能原因 方向
ProducerFencedException 同 transactional.id 新 epoch 预期 fencing;检查 Flink 并行 id
InvalidTxnStateException 未 init / 重复 begin 调整 init/begin 顺序
TransactionCoordinatorNotFound TC 未就绪 / broker 滚动 检查 __transaction_state ISR
read_committed lag 高 上游长事务未 commit 查 open transaction;调小 Flink checkpoint 与 sink flush
重复消息 幂等未开或新 PID 开 idempotence;EOS 用 transactional sink

十、与 Kafka Streams EOS 的一行对照

Kafka Streams 应用层 EOS = consumes + produces + offset 单事务 commit(依赖 sendOffsetsToTransaction)。Flink = checkpoint 统一快照 + Kafka sink 事务。Streams 适合纯 Kafka 拓扑;Flink 适合异构 source/sink 与复杂 state(第 18 篇)。


十一、术语表

术语 含义
PID Producer ID,broker 分配的幂等标识
Sequence number 每 (PID, partition) 单调序号
TransactionCoordinator 管理 transactional.id 与事务状态的 broker 组件
Commit / Abort marker ControlBatch,定义事务可见性
Fencing 新 epoch 使旧 producer 发送失败
read_committed 只读已 commit 事务数据

十二、小结

幂等 producer 用 PID + sequence 消除 单 partition 重试重复事务 producertransactional.id、TC 与 commit/abort marker 实现 多 partition 原子可见跨 session fencing。Consumer 需 read_committed 才能与事务边界一致。Kafka 事务 覆盖集群内日志语义;Flink 端到端 EOS 还要把 checkpointKafka sink commit、以及其它 sink 的 2PC 绑在同一一致性点(第 15 篇)。

下一篇进入 Flink 运行时模型:JobManager、TaskManager、Slot 与 StreamGraph→ExecutionGraph,看 Kafka Source 的 partition split 如何变成 Slot 上的 Task(第 7 篇)。


参考资料

  1. Apache Kafka Documentation, Transactions(生命周期、隔离级别、CTM)。
  2. Apache Kafka Documentation, Producer Configsenable.idempotencetransactional.id)。
  3. KIP-98: Exactly Once Delivery and Transactional Messaging
  4. KIP-447: Producer scalability for sequential transactional writes(高吞吐事务演进,3.x 相关)。
  5. Apache Kafka Documentation, Design(Idempotent Producer、RecordBatch)。
  6. Apache Flink Documentation, Kafka ConnectorTwo-Phase Commit Sink
  7. 本系列 第 5 篇(acks/ISR);第 10、14、15 篇(Flink EOS)。

返回 系列目录 | 上一篇:副本、ISR 与 Consumer Group | 下一篇:Flink 运行时模型

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照

从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。

2026-07-01 · database / distributed

【流式数据处理】交付语义:从 at-most-once 到 exactly-once

用 Source、引擎、Sink 三层模型拆解 at-most-once、at-least-once、exactly-once 的组合规则与最弱环决定律;对照 Flink checkpoint 模式、Kafka 事务与幂等 producer、重复消费/重复写入的三类修复手段,为两阶段提交 sink 铺垫。

2026-07-01 · database / distributed

【流式数据处理】两阶段提交与端到端 Exactly-Once

拆解 Flink GenericTwoPhaseCommitSink 协议:preCommit 进 checkpoint、commit 挂 notifyCheckpointComplete;对照 Kafka 事务 sink、JDBC 与 Iceberg 2PC 落点,以及 commit 前/后崩溃与重复 commit 的幂等边界——与 lakehouse/11 CAS、lakehouse/19 入湖侧对读,不重复表格式全文。

2026-07-01 · database / distributed

【流式数据处理】Kafka · Flink · 状态 · Exactly-Once

承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。


By .