第
4 篇 把 partition 上的 LogSegment 与 offset
单调性钉住了:producer append 到 leader
replica,consumer 按 offset 顺序 fetch。单 broker
磁盘故障就会丢分区数据——副本(replication)
把同一份日志复制到多个 broker。运维调
replication.factor 和
min.insync.replicas 时,实际上在
durability、可用性、延迟
三者之间划界;另一边,consumer group 的
rebalance 会让 lag 尖刺、Flink
作业重启,却常被误当成「Kafka 变慢」。
本文讲 Kafka 3.x(KRaft) 下副本与消费组的工程语义,回答:
- Leader、Follower、ISR、HW、LEO 各自表示什么;ISR 收缩何时发生?
acks=0/1/all与min.insync.replicas组合下,producer 何时得到「已 committed」?- Consumer group 的 Range / Sticky / Cooperative 分配与 rebalance 代价?
- Auto commit offset 与 Flink checkpoint 各管什么,为什么不能混为一谈?
先修:第
4 篇 日志与分区;distributed 系列
日志复制直觉可辅助理解 ISR。事务与
read_committed 见 第
6 篇;Flink 恢复见 第 10
篇。
环境说明:本机 WSL2(Linux 6.6.87.2)、i9-12900K / 32 GiB,未在本写作环境运行多 broker Kafka 集群。机制来自 Apache Kafka Documentation(Replication、Consumer、Configuration)与 KIP-345(Static Membership)、KIP-429(Cooperative Rebalance);不粘贴未执行的 broker 日志或 lag 数值。文末给出可复现实验步骤。
版本锚定:Apache Kafka 3.x,KRaft 模式。副本协议与 ZK 时代一致;Controller 在 KRaft 下通过 Raft 日志更新 leader/ISR(来源:KRaft)。
一、Leader、Follower 与 ISR
每个 partition 有 \(R\) 个
replica(replication.factor = R),分布在不同
broker。同一时刻其中 一个 replica 为
Leader,其余为
Follower(来源:Replication)。
| 角色 | 写 | 读(默认) |
|---|---|---|
| Leader | 接受 producer append | 服务 consumer fetch |
| Follower | 从 leader 拉取复制 | 不服务普通 consumer(KRaft 下 readonly replica 等扩展另论) |
flowchart LR
PRO["Producer"] -->|append| L["Leader<br/>broker-1"]
L -->|fetch 复制| F1["Follower<br/>broker-2"]
L -->|fetch 复制| F2["Follower<br/>broker-3"]
CON["Consumer"] -->|fetch| L
ISR(In-Sync Replicas):与 leader
日志差距在阈值内 的 replica 集合(由 broker
端 replica.lag.time.max.ms 等判定;具体
follower 是否「in sync」由 leader 维护并上报
controller)。ISR 写入元数据(KRaft
metadata.log),是 committed
边界 的计算基础。
1.1 LEO 与 HW
对任意 replica,定义(来源:Replication,Log):
| 缩写 | 全称 | 含义 |
|---|---|---|
| LEO | Log End Offset | 下一条待写入 offset(= 已有最大 offset + 1) |
| HW | High Watermark | 对消费者可见 的最大 offset + 1;即 committed 上界 |
Leader 自身 LEO 随 append 增长。Follower 通过 fetch 追 leader;leader 根据 ISR 内各 replica 的 LEO 计算 HW:
\[\text{HW} = \min_{\text{replica} \in \text{ISR}} (\text{LEO}_\text{replica})\]
(表述为工程直觉:HW 之前的数据已在 ISR
内全部副本上存在;精确实现见 LeaderEpoch 与
delayed operation 文档。)
Consumer 只能读到 HW 之前的消息(普通 fetch);HW 之后 leader 已收到但尚未「提交」的数据,对 consumer 不可见。
1.2 ISR 收缩与扩张
Follower 落后超过
replica.lag.time.max.ms(默认
30s,可配)时,leader 将其 移出 ISR。ISR
缩小后:
- 若仍满足
min.insync.replicas,写入可继续; - 若 ISR 大小 <
min.insync.replicas,且 producer 用
acks=all,produce 会失败(NOT_ENOUGH_REPLICAS)。
Follower 追平后 重新加入 ISR(由 leader 检测并上报 controller)。ISR 变更通过 LeaderAndIsr 请求下发到相关 broker(KRaft controller 路径)。
Unclean leader
election(unclean.leader.election.enable,现代默认
false):ISR 全空时是否允许 非
ISR 的 stale follower 当选 leader。允许则可能
丢 committed 数据;禁止则 partition
不可用 直到原 ISR 成员恢复——生产环境保持
false(来源:Replication,Operating
Kafka)。
二、Producer
acks 与 Durability
Producer 配置
acks(来源:Producer
API,Durability)决定 broker 何时应答:
acks |
Leader 行为 | 丢失场景 | 延迟 |
|---|---|---|---|
| 0 | 不等待 broker 确认 | broker 未收到即丢 | 最低 |
| 1 | leader 写入本地 log 即应答 | leader 宕机且未复制到 follower | 中 |
| all(或 -1) | leader 等待 ISR 内所有副本 都写入(至 leader HW 规则满足) | ISR 内 simultaneous failure 仍可能丢;配合
min.insync.replicas 收紧 |
最高 |
2.1
min.insync.replicas 与 topic 级覆盖
Broker 默认
min.insync.replicas(常设为
2;单节点开发集群为 1)。Topic 级
min.insync.replicas
可覆盖(kafka-configs.sh)。
当 acks=all 且 存活 ISR 数量 <
min.insync.replicas 时,producer
收到错误,不会 误以为数据已 committed。
典型生产组合(\(R=3\)):
# broker server.properties 或 topic 配置
min.insync.replicas=2
replication.factor=3
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 配合幂等,见第 6 篇
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);工程含义:至少 2 副本同步才应答,允许 1 个 broker 故障 而不丢 committed 数据(在 unclean election 关闭前提下)。
2.2
acks=1 与日志末尾丢失(Log at End of
Stream)
Leader 已应答但 尚未复制到 follower 时
leader 崩溃,新 leader 可能 不包含
该条消息——consumer 永远读不到,producer
却以为成功。金融、CDC、Flink EOS 链路应
避免 单独依赖 acks=1(第
6、14 篇)。
2.3 Follower 复制是 pull 模型
Follower 主动 Fetch leader,而非 leader
push。复制延迟受 follower 负载、网络、
num.replica.fetchers 影响。监控
UnderReplicatedPartitions、ReplicaFetcherLag
可预警 ISR 收缩。
三、Consumer Group 基础语义
Consumer 必须 属于一个
group.id(除非显式
assign 手动指定 partition)。同 group 内:
- 每条 partition 同一时刻只分配给一个 consumer 实例;
- 多实例 水平扩展消费;实例数 > partition 数 时,多余实例空闲。
这与 Flink 不同:Flink 用 算子并行度 + KeyGroup(第 7、8 篇),Kafka consumer group 是 传输层 的消费调度。
flowchart TB
T["Topic P=4"]
G["Group G1"]
C1["Consumer A<br/>P0,P1"]
C2["Consumer B<br/>P2,P3"]
T --> G
G --> C1
G --> C2
3.1 Group
Coordinator 与 __consumer_offsets
Broker 中 GroupCoordinator(按
group.id hash 选 broker)管理:
- 成员心跳(
session.timeout.ms、heartbeat.interval.ms); - 分区分配方案下发;
- Committed offset 持久化到内部 topic
__consumer_offsets(compact topic,第 4 篇 compact 语义)。
Offset 提交 不是 存在 consumer
本地;重启后通过 group.id + partition 从
__consumer_offsets 恢复。
3.2 消费进度:position vs committed offset
| 概念 | 含义 |
|---|---|
| Position | 下次 poll() 将读的下一条 offset |
| Committed offset | 已提交、失败重启后从此继续的 offset |
enable.auto.commit=true 时,客户端按间隔
自动 把 position 提交为
committed;false 时应用 手动
commitSync /
commitAsync(来源:Consumer API)。
四、分区分配与 Rebalance
当 group 成员 加入、离开、崩溃 或 订阅 topic 变化 时,触发 rebalance:撤销旧分配、重新划分 partition、下发新 assignment(来源:Consumer,Group Management)。
4.1 分配策略
| 策略 | 行为 | 适用 |
|---|---|---|
| Range(默认旧行为) | 按 topic 范围切片,可能不均衡 | 简单 topic 少 |
| RoundRobin | 轮询 partition | 需均匀 |
| Sticky | 尽量保持原分配,减少迁移 | 大 state 外部系统 |
| Cooperative Sticky(KIP-429) | 增量 rebalance,先 revoke 部分再 assign | 降低 stop-the-world |
配置示例:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
List.of(CooperativeStickyAssignor.class.getName()));Cooperative 模式下 rebalance
分阶段:consumer 先 释放 部分 partition 并
poll(),再收到新 assignment——减少
全部分区同时停顿 的时间窗口。
4.2 Rebalance 代价
Rebalance 期间 partition 暂停消费(classic protocol 下全组 stop-the-world;cooperative 缩短但仍有窗口)。代价包括:
- Lag 尖刺:停止 pull 期间 producer 仍写;
- 重复处理:若 commit 在 rebalance 前 未完成,新 owner 可能从旧 committed offset 重读;
- Flink 作业:Kafka Source 算子重启会触发
整个 Flink 作业 与 Kafka group
交互(取决于是否共用
group.id;Flink 推荐 独立 group.id 且 offset 以 checkpoint 为准,第 10 篇)。
sequenceDiagram
participant C1 as Consumer-1
participant CO as GroupCoordinator
participant C2 as Consumer-2
Note over C2: 新成员加入
CO->>C1: Revoke partitions
CO->>C2: Join group
CO->>C1: SyncGroup 新 assignment
CO->>C2: SyncGroup 新 assignment
C1->>C1: 暂停消费
C2->>C2: 开始消费新 partition
4.3 Static Membership(KIP-345)
group.instance.id(静态成员
ID):broker 在 短暂重启 内把同一实例 ID
视为原成员,避免 立即 rebalance。适合
滚动升级 consumer、Connect
worker、短生命周期容器——与 Flink TM 重启场景相关但
不能替代 Flink checkpoint。
五、Offset 提交模式与语义
5.1 Auto commit
enable.auto.commit=true
auto.commit.interval.ms=5000
客户端在 poll() 循环中周期性提交
当前 position。若 业务已处理但尚未
commit 时崩溃,会 重复消费;若
commit 后处理失败,会
丢消息(at-most-once 倾向)。
5.2 手动 commit 与「先处理后提交」
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> rec : records) {
process(rec); // 业务逻辑
}
consumer.commitSync(); // 全部处理成功后再提交
}这是 at-least-once:崩溃后可能重复
process。Exactly-once 需 幂等
sink 或 事务(第
6、14 篇)。
5.3
isolation.level 与事务消息
默认
read_uncommitted:consumer
可见 HW 内所有消息。事务 producer 写入的 未 commit
事务 消息也会被读到。
read_committed:隐藏未
commit 事务消息;open 事务前的 abort 消息按事务协议过滤(第
6 篇)。Flink EOS Kafka Source 在 exactly-once
模式下使用 read_committed
与事务边界对齐。
六、Flink Checkpoint 与 Kafka Offset 的分工
第 10 篇 详述 barrier 快照;此处强调 与 Kafka consumer offset 的关系:
| 机制 | 存什么 | 谁用 |
|---|---|---|
Kafka __consumer_offsets |
group 的 committed offset | 原生 consumer、监控 lag |
| Flink checkpoint | Source operator state(各 partition offset + metadata) | Flink 故障恢复 |
Flink 可选
setCommitOffsetsOnCheckpoints |
是否把 checkpoint offset 同步 commit 到 Kafka | 外部工具读 lag |
Flink 容错不依赖 Kafka broker 上的
committed offset:恢复时读 最近成功
checkpoint 里的 split state。若开启向 Kafka
commit,只为
运维可见性(kafka-consumer-groups.sh --describe
与 Flink 进度一致)。
不要 让外部 consumer 与 Flink 作业
共用同一 group.id 且假设 offset
一致——Flink 重启与 rebalance 语义独立。
Kafka Connect / Debezium 的 offset 在 独立 compact topic(第 16 篇),与 Flink 无直接共用。
6.1
KafkaSource 与旧
FlinkKafkaConsumer 的 offset 存储
Flink 1.14+ KafkaSource
把每个 partition 的 offset 存为 Operator
State(split enumerator + reader
state),checkpoint 时由 JM 持久化到
分布式存储(state.checkpoints.dir),不是
__consumer_offsets 的主路径。旧
FlinkKafkaConsumer 同样以
Flink state 为准,可选
setCommitOffsetsOnCheckpoints(true) 同步到
Kafka。
| 组件 | 恢复真相 | Lag 监控 |
|---|---|---|
| 原生 consumer | __consumer_offsets |
kafka-consumer-groups.sh |
| Flink Kafka Source | Checkpoint / Savepoint | Flink metrics + 可选 Kafka commit |
| Kafka Connect | Connect offset topic | Connect REST / JMX |
作业 cancel 不带 savepoint 后重启,若未配置 retained checkpoint,只能从 策略指定的起始位点(earliest/latest/group)重新消费——与 Kafka group committed offset 无关。
6.2 为何 Flink 默认不向 Kafka 频繁 commit offset
每次 checkpoint 都 commitSync 到
__consumer_offsets 会:
- 增加 broker 写负载(compact topic 也占 ISR);
- 让 外部 consumer 误以为 可安全从该 offset 接力,而 Flink state 可能尚未对齐(checkpoint 未完成时);
- 在 exactly-once 下与 事务 offset 语义混淆(第 6 篇)。
因此生产 Flink 作业:监控 lag 用 Flink REST / Prometheus;若必须用 Kafka lag,再显式开启 commit on checkpoint 并 文档化 外部不得共用 group。
七、Lag、心跳与监控入口
Consumer lag = LEO − consumer committed offset(按 partition 计)。Lag 持续上升表示 消费慢于生产 或 rebalance 停顿。
常用排查(命令在官方文档;不在此伪造输出):
# 查看 group lag 与 partition 分配
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-group
# 查看 ISR 与 leader
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic my-topic| 指标 / 现象 | 可能原因 |
|---|---|
| 单 partition lag 高 | 热点 key、该 partition 数据倾斜 |
| 全 partition lag 尖刺 | rebalance、consumer 全停 |
| UnderReplicatedPartitions | follower 落后、ISR 收缩 |
| NOT_ENOUGH_REPLICAS | ISR < min.insync.replicas 仍写 acks=all |
Flink 背压导致 Source 不 poll,lag 也会升——需区分 传输层 与 计算层(第 18 篇)。
7.1
__consumer_offsets 本身也是 Kafka topic
__consumer_offsets 使用
compact 策略(第
4 篇):key 为 (group, topic, partition)
编码,value 为 committed metadata。RF 与 min
ISR 应与其他关键 topic 同级——该 topic 不可用会导致
全集群 consumer 无法 commit offset,但
不影响 已 commit offset 的读取路径(直到
compact 损坏等极端情况)。
八、Classic 与 Cooperative Consumer 协议对照
| 维度 | Classic(Eager) | Cooperative(Incremental) |
|---|---|---|
| Revoke 时机 | 一次 revoke 全部 partition | 分多轮 revoke 子集 |
| Stop-the-world | 全组暂停消费窗口较长 | 缩短,未 revoke 的 partition 可继续 poll |
| Assignor | Range、RoundRobin、Sticky | CooperativeStickyAssignor |
| 客户端要求 | 旧版即可 | 需支持 incremental protocol 的 consumer 版本 |
升级路径:组内 所有 consumer 必须支持 cooperative,否则回退 classic。Flink Kafka connector 版本需与 broker 匹配(见 Flink 发行说明与 Kafka 兼容性矩阵)。
Rebalance listener 可在
onPartitionsRevoked 中 同步
commit 或 flush 外部 store,减少
重复消费窗口——Flink 内部 Source
算子有自己的 checkpoint barrier,不依赖 listener,但
原生 consumer 应用 应在 revoke 回调里
commit(来源:Consumer API,Rebalance
Listener)。
九、本地复现实验(步骤级)
实验 A:ISR 收缩(需 ≥2 broker 或 min ISR=1 的单节点模拟;以下按官方多 broker KRaft 文档搭建):
- 创建
replication.factor=3、min.insync.replicas=2的 topic。 acks=all生产消息,确认成功。- 停止一个 follower broker
或人为阻塞网络,直至该 replica 移出 ISR(观察 metrics 或
kafka-topics.sh --describe的 Isr 列表变短)。 - 继续 produce:若 ISR 仍 ≥ min ISR,应成功;若 ISR 不足,应失败。
- 恢复 follower,观察重新入 ISR 与 HW 推进。
实验 B:Rebalance 与 lag
- 启动 2 个 console consumer 同一
group.id,topicP=4。 - 持续生产,记录 lag。
- 启动第 3 个 consumer,触发 rebalance;观察短时间 lag 上升与 partition 迁移。
- 改用
CooperativeStickyAssignor+ static instance id,重复滚动重启,对比 rebalance 次数(从 broker/coordinator 日志统计,不伪造具体毫秒)。
实验 C:Auto commit vs 手动
- 两程序相同
group.id不同enable.auto.commit,人工在process中 sleep 后 kill -9。 - 重启后对比 重复消息条数,验证 at-least-once 边界。
十、与分布式日志复制的对照
distributed 系列 讲 Quorum、leader 选举、日志匹配;Kafka ISR 是 工程化折中:
- 不是 每条写入都跑 Paxos;
- HW + ISR 定义 committed,follower pull 复制;
- Leader epoch(KIP-101)防止 split-brain 下 offset 错位。
读 ISR 时带入 「可见性上界 = HW」 即可对接 formal 日志复制直觉,无需重新证明 safety。
十一、生产清单
| 检查项 | 建议 |
|---|---|
replication.factor |
生产 ≥ 3 |
min.insync.replicas |
与 RF 配合,常见 RF=3、min ISR=2 |
unclean.leader.election.enable |
false |
Producer acks |
all + 幂等(第 6 篇) |
| Consumer rebalance | Cooperative Sticky;Connect/Flink 用独立 group |
| Flink 作业 | offset 以 checkpoint 为准;谨慎
commitOffsetsOnCheckpoints |
十二、术语表
| 术语 | 含义 |
|---|---|
| ISR | 与 leader 同步的 replica 集合 |
| HW | High Watermark,consumer 可见上界 |
| LEO | Log End Offset |
| GroupCoordinator | 管理 consumer group 与 offset 的 broker 角色 |
| Rebalance | 组内 partition 分配重算 |
| Cooperative rebalance | 增量 revoke/assign,缩短全停窗口 |
| Static membership | group.instance.id 减少短暂重启
rebalance |
十三、小结
Kafka 副本通过 leader + ISR + HW 定义
committed:producer acks=all 与
min.insync.replicas 共同约束
durability;consumer 只能读 HW 以内。Consumer group 把
partition 独占分配 给组内成员,rebalance
带来停顿与 lag 尖刺,Cooperative 与 static membership
可缓解。Offset 提交持久化在
__consumer_offsets;Flink
作业则以 checkpoint 中的 source state
为恢复真相,两者分工不同。
下一篇进入 Kafka 事务与幂等
Producer:pid 与 sequence
去重、事务生命周期、__transaction_state 与
read_committed,以及和 Flink EOS 的衔接点。
参考资料
- Apache Kafka Documentation, Replication(Leader/Follower、HW、ISR)。
- Apache Kafka Documentation, Consumer(Group management、offset commit、assignors)。
- Apache Kafka Producer
Configuration(
acks、enable.idempotence)。 - KIP-345: Static Membership for Kafka Consumer Groups。
- KIP-429: Kafka Consumer Incremental Rebalance Protocol。
- KIP-101: Alter Replication Protocol to use Leader Epoch。
- Apache Kafka Documentation, KRaft(Controller 与 LeaderAndIsr)。
- 本系列 第 4 篇;第 10 篇(Flink offset);第 16 篇(Connect offset topic)。
返回 系列目录 | 上一篇:Kafka 日志模型与分区 | 下一篇:Kafka 事务与幂等 Producer
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】流处理全景:从日志到有状态计算
从批、流、微批四维度对比出发,建立「可重放日志 + 有状态计算」心智模型,厘清 Lambda/Kappa 边界与流表对偶,并给出与 lakehouse 入湖侧对称的全系列地图。
【流式数据处理】Kafka 日志模型与分区
从 Topic、Partition 到 Log Segment 的 .log/.index/.timeindex 文件布局,讲清 offset 单调性、分区内有序、顺序写与 sendfile 读路径,以及 Kafka 3.x KRaft 模式下元数据与日志目录的分工,为副本与 consumer 语义打底。
【流式数据处理】Kafka 事务与幂等 Producer
从幂等 producer 的 PID 与 sequence 去重,到事务 producer 的 init/begin/commit/abort 生命周期、__transaction_state 与 read_committed 隔离,讲清 Kafka 3.x 单集群 EOS 边界及其与 Flink checkpoint 的衔接。
【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照
从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。