第 3 篇 在
Flink 侧把窗口 state 与事件时间对齐讲完了:同一 key
上的乱序事件要在 KeyGroup
内归并。回到管道最上游,Kafka
只保证分区内有序——分区键选错,窗口再正确也会把本应同序的业务事件拆到不同分区,聚合结果无从谈起。运维第一次
ls broker 数据目录,会看到
00000000000000000000.log、00000000000000000000.index
这类文件名;consumer 日志里反复出现
partition、offset、leader
epoch。这些名字各自对应什么物理结构?
本文不教 kafka-topics.sh 参数大全,而是把
Kafka 3.x(KRaft 模式)
下日志与分区的内核语义讲清楚,回答四个会直接决定吞吐规划和故障排查的问题:
- Topic、Partition、Log Segment 三层结构在磁盘上长什么样?
- Offset 为什么单调、能否回退;分区内有序与分区间无序的边界在哪?
- 顺序写、页缓存与 zero-copy(sendfile)如何支撑 broker 读路径?
- KRaft 与已废弃的 ZooKeeper 模式在元数据与日志存储上差在哪?
后文默认读者已读过本系列 第 1
篇 的「日志即真相」心智模型。副本、ISR 与 consumer group
见 第
5 篇;事务 producer 与 read_committed 见 第
6 篇。Flink 如何把 partition 当作 Source split、offset
写入 checkpoint,见 第 7、10
篇。
环境说明:本机为 WSL2(Linux 6.6.87.2)、i9-12900K / 32 GiB,未在本写作环境启动 Kafka 集群。本文机制结论来自 Apache Kafka 官方文档(Design、Implementation、KRaft)与
apache/kafka3.x 源码包结构说明;不粘贴未在本机执行的kafka-dump-log.sh输出,也不给出任何伪造的吞吐 benchmark。文末给出可复现的本地 KRaft 单节点步骤。
版本锚定:Apache Kafka 3.x,新集群默认 KRaft(Kafka Raft) 元数据模式;Kafka 3.3+ 起 KRaft 生产可用,4.0 起移除 ZooKeeper 依赖(来源:Kafka Documentation,KRaft;Release Notes)。文中 ZK 模式仅作历史对照,不作为新部署推荐。
一、Topic、Partition 与 Broker 上的物理布局
Kafka 把消息组织成 append-only 日志(来源:Kafka Documentation,Design)。逻辑层次:
| 层级 | 含义 | 运维可见对象 |
|---|---|---|
| Topic | 一类消息的逻辑命名空间 | kafka-topics.sh --describe |
| Partition | Topic 内的并行与有序单元 | partition=0..N-1 |
| Replica | Partition 在 broker 上的副本 | Leader + Follower(第 5 篇) |
| Log Segment | Partition 日志的滚动文件 | .log / .index /
.timeindex |
一个 Topic 有 \(P\) 个
partition 时,吞吐扩展靠增加 \(P\);单 partition
内顺序写、单 consumer 线程顺序读,单 partition
吞吐有上限。这与 Flink keyBy 后 state
按 KeyGroup 分片(第
8 篇)是同一类「先分区、再局部有序」思路。
flowchart TB
T["Topic: orders"]
P0["Partition 0<br/>Leader @ broker-1"]
P1["Partition 1<br/>Leader @ broker-2"]
P2["Partition 2<br/>Leader @ broker-3"]
T --> P0
T --> P1
T --> P2
P0 --> S0["LogSegment<br/>00000000000000000000.*"]
P0 --> S1["LogSegment<br/>00000000000000012345.*"]
1.1 数据目录结构(KRaft 单节点示意)
KRaft 模式下,broker 配置 log.dirs
指向日志根目录(可多路径)。每个 partition
replica 对应一个子目录,命名模式(来源:Kafka 源码
LogManager、官方 Directory Layout
说明):
${log.dirs}/<topic>-<partition>/ # 例如 orders-0
00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000012345.log
...
leader-epoch-checkpoint
partition.metadata
- 目录名
<topic>-<partition>中的 topic 名经 URL 编码(特殊字符替换)。 - 文件名前缀 base offset:该 segment 中第一条消息的 offset。
- active segment 是当前唯一可 append 的
.log文件;写满或到期后 roll 出新 segment。
元数据(topic 定义、partition 分配、ISR、leader epoch)在
KRaft 下由 Controller Quorum 持久化在
metadata.log(metadata.log.dir),不再写
ZooKeeper znode。Broker 本地仍只存 消息正文
与 segment 索引——理解这一点,排查「元数据正常但某 broker
磁盘缺文件」时不会误查 ZK。
1.2 KRaft 与 ZooKeeper 模式的边界
| 维度 | KRaft(3.x 默认方向) | ZooKeeper(已废弃) |
|---|---|---|
| 元数据存储 | 内置 Raft quorum(__cluster_metadata) |
ZooKeeper znode |
| Controller | KRaft controller 节点 | 单 Controller broker |
| 部署组件 | Kafka broker + KRaft voter | Kafka + ZK 集群 |
| 分区 leader 选举 | Raft 日志 + broker 本地状态 | Controller 写 ZK |
对 本文关心的日志与 offset 语义,两种模式 无差异:producer 仍 append 到 leader replica 的 segment;consumer 仍按 offset 顺序读。差异在 元数据故障域与运维面:KRaft 把「谁是 partition 0 的 leader」也纳入 Raft 日志,避免 ZK session 超时触发的 controller 抖动(具体 ISR 行为见 第 5 篇)。
二、Log
Segment:.log、.index、.timeindex
Partition 日志在磁盘上被切成多个
LogSegment,避免单文件无限增长(来源:Kafka
Documentation,Log;源码
LogSegment、OffsetIndex、TimeIndex)。
2.1 三类文件各存什么
| 文件 | 内容 | 读路径用途 |
|---|---|---|
.log |
连续 RecordBatch 二进制序列 | 顺序 scan、按 offset fetch |
.index |
(offset → 在 .log 中的物理 position)
稀疏索引 |
给定 offset,定位到 .log 附近字节 |
.timeindex |
(timestamp → offset) 稀疏索引 |
按时间戳 seek(time-based retention / consumer
offsetsForTimes) |
索引是 稀疏
的:不是每条消息一条索引项,而是每累积
index.interval.bytes(默认 4 KiB)建一项。查找
offset \(o\) 时,在 index
中找到 \(\leq o\)
的最大项,再从 .log 对应 position
顺序扫描 到精确 batch——空间换时间,且 scan
范围通常很小。
Segment roll
触发条件(log.segment.bytes、log.segment.ms,来源:broker
配置文档):
- 当前 active segment 大小达到
log.segment.bytes(默认 1 GiB); - 或 segment 创建时间距今超过
log.segment.ms(默认 7 天)。
Roll 后旧 segment 只读;配合 log
retention(log.retention.hours /
log.retention.bytes)由
LogCleaner(compact topic)或
RetentionManager(delete topic)回收。
2.2 RecordBatch 与消息格式
Kafka 0.11+ 生产路径使用 RecordBatch(Magic v2,来源:Kafka Protocol Guide,Record Batch)。一个 batch 共享:
- Base offset:batch 中第一条记录的 offset;
- Partition leader epoch(容错与 offset 对齐,与 第 5 篇 HW 相关);
- Timestamp type(CreateTime / LogAppendTime);
- Compression(none、gzip、snappy、lz4、zstd);
- 多条 Record(key、value、headers、timestamp)。
Producer 按 batch 发送,broker 顺序 append 整个
batch 到
.log,减少系统调用与磁盘小写。Consumer fetch
也按 batch 返回,客户端再解压、逐条交付。
理解 batch 后,解释两个常见现象:
- 「一条消息一个 offset」在实现上是 batch 内递增:单 batch 多 record 时,offset 仍单调,但物理上同一次 append。
- 压缩在 batch 级:相同 key 的重复 value 在 batch 内压缩效率更高;跨 batch 无法共享压缩字典。
2.3 Offset 的单调性与不可回退
对单个 partition,offset 是 64 位整数,从 0 起单调递增(来源:Design)。新消息占用新 offset;不会在正常运行中「重用」已 commit 过的 offset 号。
注意区分:
| 概念 | 是否回退 | 说明 |
|---|---|---|
| Log end offset(LEO) | 只增不减 | leader 已写入的最大 offset + 1 |
| Consumer position | 可 seek 到更小值 | 人为 seek() 或重置 group offset |
| 日志截断(truncate) | 旧 offset 对应数据被删 | retention、compact、unclean leader election(第 5 篇) |
Flink Kafka Source 把
(topic, partition) → offset 存进 checkpoint(第 10
篇)。Broker 侧 LEO 增长与 consumer offset
独立——重复消费来自 consumer 回退
offset,不是 broker「倒写日志」。
三、分区内有序、分区间无序
Kafka 的唯一顺序保证:同一 partition 内,按 offset 顺序可见(来源:Design,Guarantees)。不同 partition 之间 无任何顺序;全局有序若业务需要,只能:
- 用 单 partition(吞吐上限低),或
- 在 下游 Flink/keyed 算子 按业务键重分区并重排序(第 2、8 篇)。
3.1 分区键如何决定消息落哪个 partition
Default partitioner(Java
DefaultPartitioner,Kafka 2.4+
可插拔)逻辑(来源:Producer API 文档):
- 若指定了 partition 参数,直接用;
- 否则若 key !=
null,
partition = murmur2(key) % numPartitions(符号处理保证非负); - 否则 sticky partition:batch 内粘在同一 partition,批次间轮询,降低小 batch 开销。
因此:
- 相同 key → 相同 partition(partition 数不变时),保证该 key 的事件 按 produce 顺序 进入日志(单 producer、无失败重试乱序的前提下;幂等与事务重试见 第 6 篇)。
- key = null 的消息轮询各 partition,无 key 级顺序。
// 显式 key:同一 userId 进同一 partition
ProducerRecord<String, String> rec =
new ProducerRecord<>("orders", userId, payload);
// 显式指定 partition(绕过 partitioner)
ProducerRecord<String, String> rec2 =
new ProducerRecord<>("orders", 3, userId, payload);3.2 分区数 \(P\) 与下游并行度
| 关系 | 后果 |
|---|---|
| \(P\) 过小 | 单 partition 成为吞吐瓶颈;单 consumer 线程读满才能吃满 |
| \(P\) 过大 | 文件句柄、元数据、空 partition 增多;端到端延迟可能上升 |
| 改 \(P\) | 已有 key 到 partition 的映射会变(除非自定义 partitioner 固定映射) |
Flink Source 并行度与 \(P\) 的对齐见 第 7 篇:理想情况是 Source 并行度 = \(P\),每个 subtask 固定消费若干 partition。CDC 场景常按 表主键 hash 选 key,使同一行变更进同一 partition(第 16 篇)。
3.3 与事件时间乱序的关系
分区内有序是 offset 顺序,不一定是 事件时间顺序。Producer 先写 \(t=100\) 再写 \(t=50\),日志里仍是 offset 递增。Flink 用 watermark 处理 事件时间乱序(第 2 篇),与 Kafka offset 顺序正交——两者都要在管道设计里同时考虑。
四、写路径:顺序 append 与页缓存
Broker 写路径核心:只做 append,不做原地更新(来源:Design,Performance)。
flowchart LR
PRO["Producer batch"] --> NET["网络线程"]
NET --> APP["Log.append<br/>顺序写 active segment"]
APP --> PC["页缓存 page cache"]
PC --> DISK["异步 flush 到磁盘"]
- Producer 发送 RecordBatch 到 partition leader(第 5 篇 讲副本复制)。
- Leader 在内存中校验 batch,追加到 mapped active segment 文件末尾。
- 数据先进 OS page cache;由
log.flush.interval.messages/log.flush.interval.ms等控制刷盘频率(现代部署常依赖 replication + OS 刷盘,而非每条 fsync)。
顺序写 使 HDD/SSD 都能接近介质顺序带宽;随机写更新 B-tree 的传统 MQ 难以做到这点。这也是 Kafka 适合 高吞吐日志管道 而非小消息低延迟 RPC 的原因之一。
4.1
log.flush 与 durability 分工
| 机制 | 保证什么 |
|---|---|
| Broker 刷盘 | 单节点崩溃后数据是否在本地磁盘 |
Replication + min.insync.replicas +
acks=all |
多副本 committed(第 5 篇) |
Producer acks=1 |
仅 leader 写入即应答,副本未同步可能丢 |
不要把「没开 broker fsync」等同于「一定丢数据」——副本语义才是生产 durability 的主旋钮。
五、读路径:Consumer Fetch 与 zero-copy
Consumer(或 Follower 副本)读消息走 Fetch 请求(来源:Implementation,Fetch):
- Client 指定
(topic, partition, fetchOffset, maxBytes); - Leader 用 offset index 定位
.log字节范围; - 从 page cache 或磁盘读连续字节,封装为 FetchResponse。
5.1 sendfile 与 zero-copy
Kafka broker 在支持的操作系统上使用
sendfile(2)(或等价机制),把
.log 文件描述符与 socket
描述符在内核态直连,减少
用户态拷贝(来源:Kafka 设计文档
Efficient Transfer;Linux
man sendfile)。高吞吐消费场景下,CPU
不会全耗在 read()/write()
拷贝上。
限制与边界:
- TLS 终止在 broker 时,通常必须在用户态加密,zero-copy 路径可能降级;
- 压缩 batch 在 consumer 端解压,解压发生在用户态,与 sendfile 无关;
- Follower 从 leader 复制 也是 fetch 路径,同样受益于顺序读。
5.2
fetch.min.bytes 与
fetch.max.wait.ms
Consumer 可批量拉取:数据量不足
fetch.min.bytes 时 broker 最多等待
fetch.max.wait.ms 再返回。这与
微批 不同——仍是长轮询拉模型,延迟由
consumer 配置与 broker 负载共同决定(第 1
篇 批 vs 流对照)。
六、Retention、Compact 与 Tiered Storage(边界)
6.1 Delete retention
默认 delete 策略:超过
log.retention.hours 或
log.retention.bytes 的 整
segment 删除。删除后 更小 offset
不可再读——consumer 若仍持有旧 offset,会触发
OffsetOutOfRange,需重置或 seek 到 log
start。
6.2 Compact retention
Compact topic(如
__consumer_offsets、Connect offset topic)保留
每个 key 的最新 value,旧 offset
段可能被清理。Compact 不改变「同一 offset 一旦可读则
immutable」的语义,但 相同 key 的多条记录
最终只留最后一条——用于 changelog 而非事件溯源全历史。
6.3 Tiered Storage
Kafka 3.x 企业/云扩展支持 分层存储(本地热 segment + 对象存储冷 segment,来源:Confluent/Apache 分层存储 KIP)。机制仍是 segment 文件,只是旧 segment offload 到 S3 等;offset 与 index 语义不变。自建开源 3.x 默认仅本地盘,本篇不展开云厂商实现。
七、本地复现:KRaft
单节点与 kafka-dump-log.sh
下列步骤按 Kafka 官方 KRaft Quick Start(Documentation,KRaft)可复现;以下命令未在本写作环境执行,仅作读者实验入口。
# 下载 Kafka 3.9.x(或当前 3.x 稳定版)并解压
export KAFKA_HOME=/path/to/kafka_2.13-3.9.0
# 生成 KRaft 集群 UUID 与格式化存储(单节点 combined broker+controller)
$KAFKA_HOME/bin/kafka-storage.sh random-uuid
# 记下 UUID,填入 config/kraft/server.properties 的 node.id / controller.quorum.voters
$KAFKA_HOME/bin/kafka-storage.sh format -t <UUID> -c $KAFKA_HOME/config/kraft/server.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/kraft/server.properties创建 topic 并生产几条带 key 的消息:
$KAFKA_HOME/bin/kafka-topics.sh --create --topic demo \
--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic demo --property "parse.key=true" --property "key.separator=:"
# 输入 user-1:hello 与 user-2:world在 log.dirs 下找到
demo-0、demo-1 等目录,对
已 roll 或足够大的 segment 执行:
$KAFKA_HOME/bin/kafka-dump-log.sh \
--files /path/to/kafka-logs/demo-0/00000000000000000000.log \
--print-data-log应观测到的字段(名称以工具输出为准,不伪造具体数值):
- 每条 record 的 offset、timestamp、key/value 长度;
- RecordBatch 的
baseOffset、lastOffset、compression; - 相同 key 的消息是否落在预期 partition(对比
demo-0vsdemo-1目录)。
再用 --index-types offset /
time 查看索引文件项,理解 稀疏
index 与 .log 的对应关系。
7.1 与 Flink 本地联调的前置条件
若要与 第 7 篇
Flink 联调,保持
$P \geq$ Flink Source 并行度,并记录 topic 的 **partition → leader broker** 映射(kafka-topics.sh
–describe)。FlinkKafkaSource` 按
split(partition)分配,不假设全局有序。
7.2 用
--deep-iteration 观察 batch 边界
kafka-dump-log.sh 支持
--deep-iteration(版本以本地
--help 为准):逐 batch 打印
baseOffset、lastOffset、compressionType、producerId(若有)。对比
同一 key 连续两条 是否在同一 batch,可验证
producer linger.ms /
batch.size 是否按预期合并。事务 batch
会带 producerId 与非 -1 的
producerEpoch(第
6 篇)。
八、Leader Epoch Checkpoint 与 offset 对齐
Leader epoch(KIP-101)是 partition 上
leader 代际 的单调计数。每次 leader
变更,epoch 递增;broker 在本地维护
leader-epoch-checkpoint
文件,记录 (epoch → end offset)
映射(来源:Replication)。
Follower 复制与 consumer _seek 时,epoch 用于:
- 防止 stale leader 上 截断后 的 offset 被误当作有效;
- 协调 HW 推进与 truncate-on-recovery 的边界。
对应用开发者,epoch 多数 透明;排查 「offset 存在但 fetch 空」 或 副本恢复后消息丢失 时,需结合 epoch 与 HW(第 5 篇)看是否发生过 unclean election 或 日志截断。
九、Broker 配置与磁盘规划
与日志模型直接相关的 broker
配置(server.properties,来源:Broker
Configs):
| 配置项 | 默认量级 | 调优直觉 |
|---|---|---|
log.dirs |
必填 | 多磁盘可写多个路径,单 partition 仍属一目录 |
log.segment.bytes |
1 GiB | 过小 → inode 与文件句柄压力 |
log.retention.hours |
168 | 与磁盘容量、合规保留联动 |
log.retention.check.interval.ms |
5 min | 删除 segment 的扫描周期 |
num.io.threads |
8 | 与磁盘数、fetch 并发相关 |
num.network.threads |
3 | 接受连接与 request 解析 |
磁盘满 时 broker 可能 拒绝 produce 或 删旧 segment(视策略),consumer 的 log start offset 上移,旧 offset 不可再读——Flink 作业若 checkpoint 指向已删除 offset,恢复会失败,需 重置起始位点 或 延长 retention(第 10 篇)。
十、生产参数与常见误区
| 参数 | 含义 | 常见误区 |
|---|---|---|
num.partitions |
新建 topic 默认分区数 | 以为可随时改而不影响 key 路由 |
log.segment.bytes |
segment roll 大小 | 过小导致文件过多;过大影响 retention 粒度 |
log.retention.hours |
delete 策略保留时长 | 与 compact topic 混用未查
cleanup.policy |
message.timestamp.type |
CreateTime vs LogAppendTime | 与 Flink event-time 不一致导致 watermark 异常 |
| 误区 | 纠正 |
|---|---|
| 「Kafka 保证全局有序」 | 仅 分区内 有序 |
| 「offset 等于消息 ID」 | offset 是 分区内日志位置;业务 ID 应放 key/value |
| 「多 consumer 同 group 提高单 partition 吞吐」 | 同 group 内 一 partition 只分给一个 consumer(第 5 篇) |
| 「KRaft 改了 offset 规则」 | 未改;改的是 元数据存储 |
十一、与 lakehouse 入湖侧的对称
lakehouse 第 19 章 讲 不可变文件 + 元数据指针 入湖;Kafka 讲 不可变 segment + offset 指针 传输。对称点:
- 都是 append-only,更新用新 record(或 compact 留最新);
- 进度指针:Iceberg snapshot / Kafka offset;
- 失败恢复:从指针重放。
分工:Kafka 不负责 窗口聚合与 exactly-once 计算;它提供 可重放、可分区的日志。引擎侧语义在 Flink(第 10、14 篇)。
十二、术语表
| 术语 | 含义 |
|---|---|
| LogSegment | Partition 日志的滚动文件组(.log + index + timeindex) |
| Active segment | 当前唯一可 append 的 segment |
| LEO | Log End Offset,下一条待分配 offset |
| RecordBatch | 磁盘与网络传输的消息批次单元 |
| KRaft | Kafka 内置 Raft 元数据 quorum,替代 ZooKeeper |
| DefaultPartitioner | 按 key hash 或 sticky 策略选择 partition |
| sendfile | 内核态文件到 socket 的数据传输,减少拷贝 |
十三、小结
Kafka 把每个 partition 实现为 只追加的 LogSegment
链:.log 存
RecordBatch,.index / .timeindex
支持按 offset 与时间 seek。分区内 offset
单调、分区间无顺序;吞吐靠 partition
水平扩展,顺序靠 key 选 partition。写路径顺序 append
利用页缓存;读路径 fetch 配合 sendfile 降低拷贝开销。Kafka
3.x KRaft 把元数据迁入内置
Raft,不改变 日志文件布局与 offset
语义。
下一篇进入 副本、ISR 与 Consumer
Group:Leader/Follower
复制、HW/LEO、acks 与
min.insync.replicas
如何定义「committed」;consumer group rebalance 与 offset
提交和 Flink checkpoint 如何分工。
参考资料
- Apache Kafka Documentation, Design(日志、分区、顺序保证)。
- Apache Kafka Documentation, Implementation(LogSegment、索引、fetch)。
- Apache Kafka Documentation, KRaft(元数据 quorum、与 ZK 迁移边界)。
- Apache Kafka Protocol Guide, Record Batch(Magic v2、压缩与 timestamp type)。
- Apache Kafka Producer
Configuration(
partitioner.class、acks与 第 5 篇 交叉)。 - Apache Kafka 源码
org.apache.kafka.storage.internals.log.LogSegment(3.x 包路径,segment roll 逻辑)。 - Linux
man 2 sendfile(zero-copy 系统调用语义)。 - 本系列 第 1 篇(日志心智模型);第 3 篇(key 与窗口)。
返回 系列目录 | 上一篇:窗口:滚动、滑动与会话 | 下一篇:副本、ISR 与 Consumer Group
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】流处理全景:从日志到有状态计算
从批、流、微批四维度对比出发,建立「可重放日志 + 有状态计算」心智模型,厘清 Lambda/Kappa 边界与流表对偶,并给出与 lakehouse 入湖侧对称的全系列地图。
【流式数据处理】副本、ISR 与 Consumer Group
从 Leader/Follower 复制、HW/LEO/ISR 到 acks 与 min.insync.replicas 的 durability 边界,再到 consumer group 分区分配、rebalance 代价,以及 offset 提交与 Flink checkpoint 的分工。
【流式数据处理】Kafka 事务与幂等 Producer
从幂等 producer 的 PID 与 sequence 去重,到事务 producer 的 init/begin/commit/abort 生命周期、__transaction_state 与 read_committed 隔离,讲清 Kafka 3.x 单集群 EOS 边界及其与 Flink checkpoint 的衔接。
【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照
从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。