土法炼钢兴趣小组的算法知识备份

【流式数据处理】副本、ISR 与 Consumer Group

文章导航

分类入口
databasedistributed
标签入口
#kafka#replication#isr#hw#leo#consumer-group#rebalance#acks#offset-commit#min-insync-replicas

目录

第 4 篇 把 partition 上的 LogSegment 与 offset 单调性钉住了:producer append 到 leader replica,consumer 按 offset 顺序 fetch。单 broker 磁盘故障就会丢分区数据——副本(replication) 把同一份日志复制到多个 broker。运维调 replication.factormin.insync.replicas 时,实际上在 durability、可用性、延迟 三者之间划界;另一边,consumer group 的 rebalance 会让 lag 尖刺、Flink 作业重启,却常被误当成「Kafka 变慢」。

本文讲 Kafka 3.x(KRaft) 下副本与消费组的工程语义,回答:

先修:第 4 篇 日志与分区;distributed 系列 日志复制直觉可辅助理解 ISR。事务与 read_committed第 6 篇;Flink 恢复见 第 10 篇

环境说明:本机 WSL2(Linux 6.6.87.2)、i9-12900K / 32 GiB,未在本写作环境运行多 broker Kafka 集群。机制来自 Apache Kafka Documentation(ReplicationConsumerConfiguration)与 KIP-345(Static Membership)、KIP-429(Cooperative Rebalance);不粘贴未执行的 broker 日志或 lag 数值。文末给出可复现实验步骤。

版本锚定:Apache Kafka 3.x,KRaft 模式。副本协议与 ZK 时代一致;Controller 在 KRaft 下通过 Raft 日志更新 leader/ISR(来源:KRaft)。


一、Leader、Follower 与 ISR

每个 partition 有 \(R\) 个 replicareplication.factor = R),分布在不同 broker。同一时刻其中 一个 replica 为 Leader,其余为 Follower(来源:Replication)。

角色 读(默认)
Leader 接受 producer append 服务 consumer fetch
Follower 从 leader 拉取复制 不服务普通 consumer(KRaft 下 readonly replica 等扩展另论)
flowchart LR
  PRO["Producer"] -->|append| L["Leader<br/>broker-1"]
  L -->|fetch 复制| F1["Follower<br/>broker-2"]
  L -->|fetch 复制| F2["Follower<br/>broker-3"]
  CON["Consumer"] -->|fetch| L

ISR(In-Sync Replicas):与 leader 日志差距在阈值内 的 replica 集合(由 broker 端 replica.lag.time.max.ms 等判定;具体 follower 是否「in sync」由 leader 维护并上报 controller)。ISR 写入元数据(KRaft metadata.log),是 committed 边界 的计算基础。

1.1 LEO 与 HW

对任意 replica,定义(来源:ReplicationLog):

缩写 全称 含义
LEO Log End Offset 下一条待写入 offset(= 已有最大 offset + 1)
HW High Watermark 对消费者可见 的最大 offset + 1;即 committed 上界

Leader 自身 LEO 随 append 增长。Follower 通过 fetch 追 leader;leader 根据 ISR 内各 replica 的 LEO 计算 HW

\[\text{HW} = \min_{\text{replica} \in \text{ISR}} (\text{LEO}_\text{replica})\]

(表述为工程直觉:HW 之前的数据已在 ISR 内全部副本上存在;精确实现见 LeaderEpoch 与 delayed operation 文档。)

Consumer 只能读到 HW 之前的消息(普通 fetch);HW 之后 leader 已收到但尚未「提交」的数据,对 consumer 不可见

1.2 ISR 收缩与扩张

Follower 落后超过 replica.lag.time.max.ms(默认 30s,可配)时,leader 将其 移出 ISR。ISR 缩小后:

Follower 追平后 重新加入 ISR(由 leader 检测并上报 controller)。ISR 变更通过 LeaderAndIsr 请求下发到相关 broker(KRaft controller 路径)。

Unclean leader electionunclean.leader.election.enable,现代默认 false):ISR 全空时是否允许 非 ISR 的 stale follower 当选 leader。允许则可能 丢 committed 数据;禁止则 partition 不可用 直到原 ISR 成员恢复——生产环境保持 false(来源:ReplicationOperating Kafka)。


二、Producer acks 与 Durability

Producer 配置 acks(来源:Producer API,Durability)决定 broker 何时应答:

acks Leader 行为 丢失场景 延迟
0 不等待 broker 确认 broker 未收到即丢 最低
1 leader 写入本地 log 即应答 leader 宕机且未复制到 follower
all(或 -1 leader 等待 ISR 内所有副本 都写入(至 leader HW 规则满足) ISR 内 simultaneous failure 仍可能丢;配合 min.insync.replicas 收紧 最高

2.1 min.insync.replicas 与 topic 级覆盖

Broker 默认 min.insync.replicas(常设为 2;单节点开发集群为 1)。Topic 级 min.insync.replicas 可覆盖(kafka-configs.sh)。

acks=all存活 ISR 数量 < min.insync.replicas 时,producer 收到错误,不会 误以为数据已 committed。

典型生产组合(\(R=3\)):

# broker server.properties 或 topic 配置
min.insync.replicas=2
replication.factor=3
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 配合幂等,见第 6 篇
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

工程含义:至少 2 副本同步才应答,允许 1 个 broker 故障 而不丢 committed 数据(在 unclean election 关闭前提下)。

2.2 acks=1 与日志末尾丢失(Log at End of Stream)

Leader 已应答但 尚未复制到 follower 时 leader 崩溃,新 leader 可能 不包含 该条消息——consumer 永远读不到,producer 却以为成功。金融、CDC、Flink EOS 链路应 避免 单独依赖 acks=1第 6、14 篇)。

2.3 Follower 复制是 pull 模型

Follower 主动 Fetch leader,而非 leader push。复制延迟受 follower 负载、网络、 num.replica.fetchers 影响。监控 UnderReplicatedPartitionsReplicaFetcherLag 可预警 ISR 收缩。


三、Consumer Group 基础语义

Consumer 必须 属于一个 group.id(除非显式 assign 手动指定 partition)。同 group 内:

这与 Flink 不同:Flink 用 算子并行度 + KeyGroup第 7、8 篇),Kafka consumer group 是 传输层 的消费调度。

flowchart TB
  T["Topic P=4"]
  G["Group G1"]
  C1["Consumer A<br/>P0,P1"]
  C2["Consumer B<br/>P2,P3"]
  T --> G
  G --> C1
  G --> C2

3.1 Group Coordinator 与 __consumer_offsets

Broker 中 GroupCoordinator(按 group.id hash 选 broker)管理:

Offset 提交 不是 存在 consumer 本地;重启后通过 group.id + partition 从 __consumer_offsets 恢复。

3.2 消费进度:position vs committed offset

概念 含义
Position 下次 poll() 将读的下一条 offset
Committed offset 已提交、失败重启后从此继续的 offset

enable.auto.commit=true 时,客户端按间隔 自动 把 position 提交为 committed;false 时应用 手动 commitSync / commitAsync(来源:Consumer API)。


四、分区分配与 Rebalance

当 group 成员 加入、离开、崩溃订阅 topic 变化 时,触发 rebalance:撤销旧分配、重新划分 partition、下发新 assignment(来源:ConsumerGroup Management)。

4.1 分配策略

策略 行为 适用
Range(默认旧行为) 按 topic 范围切片,可能不均衡 简单 topic 少
RoundRobin 轮询 partition 需均匀
Sticky 尽量保持原分配,减少迁移 大 state 外部系统
Cooperative Sticky(KIP-429) 增量 rebalance,先 revoke 部分再 assign 降低 stop-the-world

配置示例:

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    List.of(CooperativeStickyAssignor.class.getName()));

Cooperative 模式下 rebalance 分阶段:consumer 先 释放 部分 partition 并 poll(),再收到新 assignment——减少 全部分区同时停顿 的时间窗口。

4.2 Rebalance 代价

Rebalance 期间 partition 暂停消费(classic protocol 下全组 stop-the-world;cooperative 缩短但仍有窗口)。代价包括:

sequenceDiagram
  participant C1 as Consumer-1
  participant CO as GroupCoordinator
  participant C2 as Consumer-2
  Note over C2: 新成员加入
  CO->>C1: Revoke partitions
  CO->>C2: Join group
  CO->>C1: SyncGroup 新 assignment
  CO->>C2: SyncGroup 新 assignment
  C1->>C1: 暂停消费
  C2->>C2: 开始消费新 partition

4.3 Static Membership(KIP-345)

group.instance.id(静态成员 ID):broker 在 短暂重启 内把同一实例 ID 视为原成员,避免 立即 rebalance。适合 滚动升级 consumer、Connect worker、短生命周期容器——与 Flink TM 重启场景相关但 不能替代 Flink checkpoint。


五、Offset 提交模式与语义

5.1 Auto commit

enable.auto.commit=true
auto.commit.interval.ms=5000

客户端在 poll() 循环中周期性提交 当前 position。若 业务已处理但尚未 commit 时崩溃,会 重复消费;若 commit 后处理失败,会 丢消息(at-most-once 倾向)。

5.2 手动 commit 与「先处理后提交」

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> rec : records) {
        process(rec);  // 业务逻辑
    }
    consumer.commitSync();  // 全部处理成功后再提交
}

这是 at-least-once:崩溃后可能重复 process。Exactly-once 需 幂等 sink事务第 6、14 篇)。

5.3 isolation.level 与事务消息

默认 read_uncommitted:consumer 可见 HW 内所有消息。事务 producer 写入的 未 commit 事务 消息也会被读到。

read_committed:隐藏未 commit 事务消息;open 事务前的 abort 消息按事务协议过滤(第 6 篇)。Flink EOS Kafka Source 在 exactly-once 模式下使用 read_committed 与事务边界对齐。


第 10 篇 详述 barrier 快照;此处强调 与 Kafka consumer offset 的关系

机制 存什么 谁用
Kafka __consumer_offsets group 的 committed offset 原生 consumer、监控 lag
Flink checkpoint Source operator state(各 partition offset + metadata) Flink 故障恢复
Flink 可选 setCommitOffsetsOnCheckpoints 是否把 checkpoint offset 同步 commit 到 Kafka 外部工具读 lag

Flink 容错不依赖 Kafka broker 上的 committed offset:恢复时读 最近成功 checkpoint 里的 split state。若开启向 Kafka commit,只为 运维可见性kafka-consumer-groups.sh --describe 与 Flink 进度一致)。

不要 让外部 consumer 与 Flink 作业 共用同一 group.id 且假设 offset 一致——Flink 重启与 rebalance 语义独立。

Kafka Connect / Debezium 的 offset 在 独立 compact topic第 16 篇),与 Flink 无直接共用。

6.1 KafkaSource 与旧 FlinkKafkaConsumer 的 offset 存储

Flink 1.14+ KafkaSource 把每个 partition 的 offset 存为 Operator State(split enumerator + reader state),checkpoint 时由 JM 持久化到 分布式存储state.checkpoints.dir),不是 __consumer_offsets 的主路径。旧 FlinkKafkaConsumer 同样以 Flink state 为准,可选 setCommitOffsetsOnCheckpoints(true) 同步到 Kafka。

组件 恢复真相 Lag 监控
原生 consumer __consumer_offsets kafka-consumer-groups.sh
Flink Kafka Source Checkpoint / Savepoint Flink metrics + 可选 Kafka commit
Kafka Connect Connect offset topic Connect REST / JMX

作业 cancel 不带 savepoint 后重启,若未配置 retained checkpoint,只能从 策略指定的起始位点(earliest/latest/group)重新消费——与 Kafka group committed offset 无关。

每次 checkpoint 都 commitSync__consumer_offsets 会:

因此生产 Flink 作业:监控 lag 用 Flink REST / Prometheus;若必须用 Kafka lag,再显式开启 commit on checkpoint 并 文档化 外部不得共用 group。


七、Lag、心跳与监控入口

Consumer lag = LEO − consumer committed offset(按 partition 计)。Lag 持续上升表示 消费慢于生产rebalance 停顿

常用排查(命令在官方文档;不在此伪造输出):

# 查看 group lag 与 partition 分配
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-group

# 查看 ISR 与 leader
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic my-topic
指标 / 现象 可能原因
单 partition lag 高 热点 key、该 partition 数据倾斜
全 partition lag 尖刺 rebalance、consumer 全停
UnderReplicatedPartitions follower 落后、ISR 收缩
NOT_ENOUGH_REPLICAS ISR < min.insync.replicas 仍写 acks=all

Flink 背压导致 Source 不 poll,lag 也会升——需区分 传输层计算层第 18 篇)。

7.1 __consumer_offsets 本身也是 Kafka topic

__consumer_offsets 使用 compact 策略(第 4 篇):key 为 (group, topic, partition) 编码,value 为 committed metadata。RF 与 min ISR 应与其他关键 topic 同级——该 topic 不可用会导致 全集群 consumer 无法 commit offset,但 不影响 已 commit offset 的读取路径(直到 compact 损坏等极端情况)。


八、Classic 与 Cooperative Consumer 协议对照

维度 Classic(Eager) Cooperative(Incremental)
Revoke 时机 一次 revoke 全部 partition 分多轮 revoke 子集
Stop-the-world 全组暂停消费窗口较长 缩短,未 revoke 的 partition 可继续 poll
Assignor Range、RoundRobin、Sticky CooperativeStickyAssignor
客户端要求 旧版即可 需支持 incremental protocol 的 consumer 版本

升级路径:组内 所有 consumer 必须支持 cooperative,否则回退 classic。Flink Kafka connector 版本需与 broker 匹配(见 Flink 发行说明与 Kafka 兼容性矩阵)。

Rebalance listener 可在 onPartitionsRevoked同步 commit 或 flush 外部 store,减少 重复消费窗口——Flink 内部 Source 算子有自己的 checkpoint barrier,不依赖 listener,但 原生 consumer 应用 应在 revoke 回调里 commit(来源:Consumer API,Rebalance Listener)。


九、本地复现实验(步骤级)

实验 A:ISR 收缩(需 ≥2 broker 或 min ISR=1 的单节点模拟;以下按官方多 broker KRaft 文档搭建):

  1. 创建 replication.factor=3min.insync.replicas=2 的 topic。
  2. acks=all 生产消息,确认成功。
  3. 停止一个 follower broker 或人为阻塞网络,直至该 replica 移出 ISR(观察 metrics 或 kafka-topics.sh --describe 的 Isr 列表变短)。
  4. 继续 produce:若 ISR 仍 ≥ min ISR,应成功;若 ISR 不足,应失败。
  5. 恢复 follower,观察重新入 ISR 与 HW 推进。

实验 B:Rebalance 与 lag

  1. 启动 2 个 console consumer 同一 group.id,topic P=4
  2. 持续生产,记录 lag。
  3. 启动第 3 个 consumer,触发 rebalance;观察短时间 lag 上升与 partition 迁移。
  4. 改用 CooperativeStickyAssignor + static instance id,重复滚动重启,对比 rebalance 次数(从 broker/coordinator 日志统计,不伪造具体毫秒)。

实验 C:Auto commit vs 手动

  1. 两程序相同 group.id 不同 enable.auto.commit,人工在 process 中 sleep 后 kill -9。
  2. 重启后对比 重复消息条数,验证 at-least-once 边界。

十、与分布式日志复制的对照

distributed 系列Quorum、leader 选举、日志匹配;Kafka ISR 是 工程化折中

读 ISR 时带入 「可见性上界 = HW」 即可对接 formal 日志复制直觉,无需重新证明 safety。


十一、生产清单

检查项 建议
replication.factor 生产 ≥ 3
min.insync.replicas 与 RF 配合,常见 RF=3、min ISR=2
unclean.leader.election.enable false
Producer acks all + 幂等(第 6 篇
Consumer rebalance Cooperative Sticky;Connect/Flink 用独立 group
Flink 作业 offset 以 checkpoint 为准;谨慎 commitOffsetsOnCheckpoints

十二、术语表

术语 含义
ISR 与 leader 同步的 replica 集合
HW High Watermark,consumer 可见上界
LEO Log End Offset
GroupCoordinator 管理 consumer group 与 offset 的 broker 角色
Rebalance 组内 partition 分配重算
Cooperative rebalance 增量 revoke/assign,缩短全停窗口
Static membership group.instance.id 减少短暂重启 rebalance

十三、小结

Kafka 副本通过 leader + ISR + HW 定义 committed:producer acks=allmin.insync.replicas 共同约束 durability;consumer 只能读 HW 以内。Consumer group 把 partition 独占分配 给组内成员,rebalance 带来停顿与 lag 尖刺,Cooperative 与 static membership 可缓解。Offset 提交持久化在 __consumer_offsets;Flink 作业则以 checkpoint 中的 source state 为恢复真相,两者分工不同。

下一篇进入 Kafka 事务与幂等 Producerpid 与 sequence 去重、事务生命周期、__transaction_stateread_committed,以及和 Flink EOS 的衔接点。


参考资料

  1. Apache Kafka Documentation, Replication(Leader/Follower、HW、ISR)。
  2. Apache Kafka Documentation, Consumer(Group management、offset commit、assignors)。
  3. Apache Kafka Producer Configuration(acksenable.idempotence)。
  4. KIP-345: Static Membership for Kafka Consumer Groups
  5. KIP-429: Kafka Consumer Incremental Rebalance Protocol
  6. KIP-101: Alter Replication Protocol to use Leader Epoch
  7. Apache Kafka Documentation, KRaft(Controller 与 LeaderAndIsr)。
  8. 本系列 第 4 篇第 10 篇(Flink offset);第 16 篇(Connect offset topic)。

返回 系列目录 | 上一篇:Kafka 日志模型与分区 | 下一篇:Kafka 事务与幂等 Producer

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】Kafka 日志模型与分区

从 Topic、Partition 到 Log Segment 的 .log/.index/.timeindex 文件布局,讲清 offset 单调性、分区内有序、顺序写与 sendfile 读路径,以及 Kafka 3.x KRaft 模式下元数据与日志目录的分工,为副本与 consumer 语义打底。

2026-07-01 · database / distributed

【流式数据处理】Kafka 事务与幂等 Producer

从幂等 producer 的 PID 与 sequence 去重,到事务 producer 的 init/begin/commit/abort 生命周期、__transaction_state 与 read_committed 隔离,讲清 Kafka 3.x 单集群 EOS 边界及其与 Flink checkpoint 的衔接。

2026-07-01 · database / distributed

【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照

从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。


By .