土法炼钢兴趣小组的算法知识备份

【流式数据处理】RocksDB State Backend 内核路径

文章导航

分类入口
databasedistributed
标签入口
#flink#rocksdb#state-backend#lsm-tree#checkpoint#keygroup#column-family#incremental-checkpoint

目录

第 9 篇 已经说明:KeyedState 可以落在 HashMapStateBackendEmbeddedRocksDBStateBackend。前者把 Java 对象放在 JVM 堆上,读写快,但状态规模受堆内存和 GC 约束;后者把状态序列化成字节数组写入 RocksDB——一个嵌入式 LSM-Tree 引擎——状态上限由 TaskManager 本地磁盘决定,代价是每次访问都要经过序列化,且读写路径会穿过 MemTable、SSTable、Block Cache 和后台 Compaction。

第 10 篇 讲 checkpoint 如何把 barrier 对齐后的算子状态持久化;第 11 篇 讲 savepoint 的全量快照格式与跨版本恢复。两篇都假设「状态后端会配合快照」,但没有展开 RocksDB 内部到底怎么写、怎么读、增量 checkpoint 为什么比全量 snapshot 省 IO。

本文填这个缺口:把 Flink 的 RocksDB state backend 当作「跑在流计算作业里的 LSM-Tree」,与 lsm-tree 系列 建立的 WAL / MemTable / SSTable / Compaction 心智模型逐层对照。读完后你应该能回答三个生产问题:

  1. 一条 ValueState.update() 最终写进了 RocksDB 的哪一层、哪个 Column Family、什么 key 前缀?
  2. checkpoint 触发时,Flink 上传的是完整状态还是自上次 checkpoint 以来新增的 SST 文件?
  3. 状态读慢,应该怀疑 block cache 未命中、L0 文件堆积,还是 KeyGroup 倾斜导致单个 subtask 的 RocksDB 过热?

本文是「流式数据处理」系列第 12 篇(共 18 篇)。→ 系列目录

篇目 核心内容
第 9 篇 · 键控状态与 TTL HashMap vs RocksDB 选型
第 10 篇 · Checkpoint Barrier 对齐与协调器生命周期
第 11 篇 · Savepoint 全量快照与 rescale
第 12 篇 · RocksDB 内核路径 ColumnFamily / KeyGroup、读写路径、增量 checkpoint
第 13 篇 · 状态放大与调优 窗口膨胀、写放大、hot key
第 14 篇 · 交付语义 at-least-once 与 exactly-once 边界

版本锚定:Flink 1.20+ / 2.x 主线;RocksDB 随 Flink 发行版捆绑(flink-statebackend-rocksdb)。API 名称以 EmbeddedRocksDBStateBackend 为准(原 RocksDBStateBackend 已统一更名)。

环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2),未安装 JVM / Flink 集群。本文机制来自 Flink 官方文档、flink-statebackend-rocksdb 源码与 RocksDB Wiki;不粘贴未执行的 Web UI 截图,也不伪造 checkpoint 耗时或状态体积数字。文末给出可复现实验步骤,供读者在自有环境验证。


一、EmbeddedRocksDBStateBackend 在作业里的位置

流式作业的一个 keyed 算子(例如 keyBy(userId) 后的窗口聚合)在 \(P\) 个并行 subtask 上运行。每个 subtask 持有一个 RocksDB 实例——不是「整个作业一个 RocksDB」,也不是「每个 state 变量一个 RocksDB」,而是 每个 subtask × 每个需要 KeyedState 的算子 = 一个 RocksDBKeyedStateBackend 实例

flowchart TB
  subgraph TM["TaskManager Slot"]
    subgraph ST0["Subtask 0"]
      R0["RocksDB 实例<br/>KeyGroup 0..k"]
    end
    subgraph ST1["Subtask 1"]
      R1["RocksDB 实例<br/>KeyGroup k+1..2k"]
    end
  end
  CKPT["CheckpointStorage<br/>HDFS / S3 / 本地"]
  ST0 -->|"增量上传 SST"| CKPT
  ST1 -->|"增量上传 SST"| CKPT

这个实例默认把数据目录放在 TaskManager 的本地路径(state.backend.rocksdb.localdir,或 TM 的 tmp 目录下的 rocksdb 子目录)。目录结构大致为:

{localDir}/{jobId}/{operatorIdentifier}/{subtaskIndex}/
  ├── CURRENT          # RocksDB 当前 MANIFEST 指针
  ├── MANIFEST-*       # SST 文件集合的版本日志(见 lsm-tree compaction 篇)
  ├── *.sst            # 不可变有序表文件
  ├── *.log            # WAL
  └── OPTIONS-*        # RocksDB 选项快照

与 HashMapStateBackend 的本质差异(Flink 文档 State Backends,A 级):

维度 HashMapStateBackend EmbeddedRocksDBStateBackend
存储介质 JVM 堆(Java 对象) 本地磁盘(序列化 byte[]
单次读写 直接内存访问 序列化 / 反序列化 + 可能磁盘 I/O
状态上限 -Xmx 与 managed memory 约束 受 TM 本地磁盘容量约束
checkpoint 形态 全量序列化上传 默认异步快照;可选增量 checkpoint
对象复用 不安全(堆上对象可变) 安全(每次读出新字节数组)

RocksDB 通过 JNI 暴露 C++ API,单 key / 单 value 上限为 \(2^{31}\) 字节(Flink 文档明确标注的限制)。使用 ListState 等 merge 语义的 state 时,合并后的 value 可能静默超过此限并在下次读取时失败——这是 RocksDB JNI 层的硬边界,不是 Flink 单独能绕过的。

启用 RocksDB 后端并打开增量 checkpoint 的典型代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
// 构造参数 true:启用增量 checkpoint(覆盖 flink-conf 默认值时需显式传入)
env.setStateBackend(backend);
env.getCheckpointConfig().setCheckpointStorage("s3://bucket/flink/checkpoints");
env.enableCheckpointing(60_000);

或在 flink-conf.yaml 中:

state.backend.type: rocksdb
execution.checkpointing.incremental: true
execution.checkpointing.dir: s3://bucket/flink/checkpoints
state.backend.rocksdb.localdir: /data/flink/rocksdb

二、Column Family 与 KeyGroup 映射

2.1 一个算子、一个 RocksDB、多个 Column Family

Flink 文档写得很明确:当前实现中,算子里的每一种 registered state 对应一个 RocksDB Column FamilyState BackendsPredefined Per-ColumnFamily Options)。

例如一个窗口算子注册了:

则该 subtask 的 RocksDB 里至少有 两个 Column Family(加上 RocksDB 默认的 default CF,通常不参与 Flink 状态存储)。所有 Column Family 共享同一个 WAL、共享 Block Cache 和 Write Buffer Manager(在 Flink managed memory 开启时),但 逻辑上 state 彼此隔离——读 "count" 不会扫描 "user-last-seen" 的数据。

这与 lsm-tree 第 1 篇 讲的「单 CF 单 LSM-Tree」不同:RocksDB 允许多棵逻辑 LSM-Tree 共库,Flink 利用这一点把不同 state 变量分开,便于按 state 做 TTL compact filter、也便于 savepoint 元数据对齐。

flowchart LR
  subgraph RocksDB["单个 subtask 的 RocksDB"]
    CF1["Column Family: count"]
    CF2["Column Family: user-last-seen"]
    WAL["共享 WAL"]
    BC["共享 Block Cache"]
  end
  OP["WindowOperator"] --> CF1
  OP --> CF2
  WAL --> CF1
  WAL --> CF2
  BC --> CF1
  BC --> CF2

Flink 的 keyed stream 在逻辑上把全量 key 空间切成 maxParallelismKeyGroup(最大并行度决定 KeyGroup 总数,实际并行度 \(P \leq\) maxParallelism)。keyBy 的 key 经 KeyGroupStreamPartitioner 哈希到某个 KeyGroup;每个 subtask 在运行时只负责 连续的 KeyGroup 区间KeyGroupRange)。

Rescale(改并行度)或 savepoint 恢复时会 重分配 KeyGroup 区间,因此 RocksDB 里存的 key 必须带 KeyGroup 前缀,否则无法把某 subtask 本地数据库里的条目正确迁移到新 subtask。

Flink 在 CompositeKeySerializationUtils 中实现前缀规则(apache/flink 源码,A 级):

读路径上,算子已知当前 key 属于哪个 KeyGroup 时,可走 setCurrentKeyAndKeyGroup 快速路径,跳过重新计算 KeyGroup 的开销(RocksDBKeyedStateBackend API)。

2.3 与 lsm-tree 系列的对照

概念 lsm-tree 教学实现 Flink RocksDB State Backend
逻辑 key InternalKey = user_key \| seq \| type KeyGroup \| Namespace \| UserKey
多表 单库单 CF 单库多 CF(每 state 一 CF)
分区 无(单机 KV) KeyGroup 前缀 + subtask 区间
版本 / 删除 sequence + tombstone State TTL tombstone;窗口触发后显式 clear
文件布局 .sst + MANIFEST 相同(RocksDB 即 LSM-Tree 工业实现)

lsm-tree 第 2 篇 WAL + MemTable 里的 InternalKey 按 (user_key, seq desc) 排序,保证同一 user key 最新版本在前;Flink 状态 key 不含 sequence——更新即覆盖同一 (KeyGroup, Namespace, UserKey) 的最新 value 字节。窗口 state 的「多版本」靠 不同 Namespace(不同窗口) 区分,而不是靠 sequence number。


三、写路径:从算子 update 到 Compaction

3.1 前台写路径

一次 valueState.update(v) 在 RocksDB 后端上的路径:

  1. 序列化TypeSerializer 把 user key、namespace、value 转成 byte[]
  2. KeyGroup 前缀:根据当前 key 计算 KeyGroup index,写入 key 前缀(1 或 2 字节)。
  3. RocksDB Put:进入对应 Column Family 的 MemTable;同时顺序追加 WAL(除非显式关闭 WAL,Flink 默认开启)。
  4. 返回:前台线程结束;与 lsm-tree 全景 描述一致——前台主要是 顺序 WAL + 内存 MemTable 插入,不做随机磁盘写。
sequenceDiagram
  participant OP as WindowOperator
  participant RS as RocksDBStateBackend
  participant MT as MemTable
  participant WAL as WAL 文件
  OP->>RS: update(value)
  RS->>RS: serialize KeyGroup+NS+Key+Value
  RS->>WAL: append record
  RS->>MT: insert sorted
  RS-->>OP: return
  Note over MT: 后台:MemTable 满 → Flush → L0 SST
  Note over MT: 后台:Compaction → L1..Ln

3.2 Flush:Minor Compaction

MemTable 达到 write_buffer_size(RocksDB 默认 64MB;Flink 开启 managed memory 时由 Write Buffer Manager 统一预算,见第 13 篇)后:

  1. 当前 MemTable 冻结为 Immutable MemTable;
  2. 后台线程顺序扫描,写出 L0 SSTable(与 lsm-tree compaction 篇 Minor Compaction 相同);
  3. 旧 WAL 在对应 SST 持久化后可回收。

RocksDB 相对 LevelDB 教学实现的增强:允许多个 Immutable MemTable 排队max_write_buffer_number),缓解「Flush 跟不上写入」时的 stall。Flink 高吞吐窗口作业若频繁 stall,Web UI 里表现为反压,底层常见信号是 stall micros 计数上升(RocksDB 统计项,需在 Flink 中启用 native metrics)。

3.3 Compaction:Major Compaction 与写放大

L0 文件数超阈值或各层字节数超限时,RocksDB 触发 Leveled Compaction(默认策略,与 lsm-tree 系列实现一致):选入 SST → 与下一层 key 范围重叠的文件 多路归并 → 输出新 SST → 旧文件标记删除。

归并时:

lsm-tree 第 1 篇 推导的 Leveled 写放大近似:

\[ WA \approx T \times L \]

其中 \(T\) 为层间容量倍数(RocksDB 默认 10),\(L\) 为层数。用户逻辑写入 1 GB 状态变更,磁盘可能承受数十 GB 的 Compaction 写——这是 LSM-Tree 换前台顺序写代价的后台账单。Flink 作业 checkpoint 间隔内的 SST 增量 + Compaction 重写,会叠加在 TM 磁盘 I/O 上,与 checkpoint 上传争抢带宽(第 13 篇展开)。

3.4 写路径与 checkpoint 的交叉

增量 checkpoint 触发前,Flink 会调用 RocksDB 的 Flush,把 MemTable 强制落盘为 SST,保证快照可见的文件集合稳定(Flink 官方博客 Managing Large State in Apache Flink: An Intro to Incremental Checkpointing,B 级官方工程博客,机制描述与源码一致)。这一步是 同步 的——在 barrier 对齐后的 snapshot 阶段执行——因此 MemTable 过大或 Flush 过慢会直接拉长 checkpoint 同步阶段 时长(第 10 篇sync 时间)。


四、读路径:Block Cache、Bloom Filter 与读放大

4.1 点查:ValueState.value()

读一条状态的路径(与 lsm-tree 全景的 Get 走查同构):

  1. Active MemTable
  2. Immutable MemTable(若有);
  3. L0 各 SST(key 范围可能重叠,需逐个或并行查);
  4. L1 … Ln(每层至多一个 SST 命中,二分 + Bloom Filter 跳过);
  5. 命中则反序列化 value 返回;否则返回 null 或空 Optional。

每一步「读 SST」先查 Bloom Filterlsm-tree 第 3 篇 SSTable + Bloom Filter):判定「一定不存在」则跳过该文件;「可能存在」则读 Index Block → Data Block。

RocksDB 的 Block Cache 缓存解压后的 Data Block。Flink 默认开启 state.backend.rocksdb.memory.managed=true,在 slot 级别 用 shared cache + write buffer manager 划分 managed memory(Flink 文档 Memory Management):

读热点集中在少数 key 时,Block Cache 命中率高,读延迟接近内存;大范围扫描(例如 MapState.entries()getKeys())则容易 cache 抖动,读放大接近 lsm-tree 推导的最坏情况——层数 × 文件数。

4.3 Iterator 与窗口清理

窗口触发后清理 state 往往遍历 namespace 或依赖 TTL。RocksDB Iterator 按 key 序扫描,成本与扫描范围内 SST 层数、文件数成正比。Flink 的 RocksDBMapState / RocksDBValueState 在 iterator 路径上同样走 JNI,大 state 下的 full scan 是 CPU + I/O 双高操作——这不是调大 parallelism 能线性分摊的,因为 scan 发生在持有该 KeyGroup 的 subtask 本地。

读放大公式(lsm-tree 第 1 篇)在存在 Bloom Filter 时,对「不存在 key」的期望额外磁盘读:

\[ RA_{\text{extra}} \approx (L - 1) \times p \]

\(p\) 为 Bloom 误判率。Flink 状态读大多是「已知存在」的 update/read,热点 key 常落在较浅层,但实际生产里 窗口 state 键空间巨大且均匀分散 时,读尾延迟仍可能明显高于 HashMap 后端。


五、Checkpoint:增量 SST 上传 vs 全量 Snapshot

5.1 全量 checkpoint / Savepoint

未开启增量 checkpoint,或 Savepoint(canonical 格式)时,Flink 走 全量快照 路径:把 RocksDB 中的 kv 按 Flink 统一格式迭代写出,或通过 RocksDB 原生 checkpoint 导出完整文件集再 整体上传。Savepoint 的设计目标是 自包含、可移植、跨版本——不依赖「上一次 checkpoint 的文件引用链」。

特点:

5.2 增量 checkpoint 机制

启用 EmbeddedRocksDBStateBackend(true)execution.checkpointing.incremental: true 后,Flink 使用 RocksIncrementalSnapshotStrategy(源码 flink-statebackend-rocksdb,A 级):

  1. Flush MemTable → 得到稳定 SST 集合;
  2. 调用 RocksDB Checkpoint API,在本地临时目录 硬链接 当前 SST 与 MANIFEST(不复制数据块);
  3. 对比 上次 completed checkpoint 的文件清单:
    • 新 SST / 新 MANIFEST 片段 → 上传到 CheckpointStorage;
    • 已上传且仍被 RocksDB 引用的 SST → 只在 state handle 里 增加引用,不上传内容;
  4. 生成 IncrementalRemoteKeyedStateHandle(shared state + private state 分离);
  5. JobManager 确认 checkpoint 完成后,notifyCheckpointComplete 触发 引用计数 更新;被 Compaction 淘汰且不再被任何 checkpoint 引用的 SST 可从远程存储删除。
flowchart TB
  subgraph CP_n["Checkpoint N"]
    SST_new["新 SST 文件<br/>上传"]
    SST_ref["旧 SST 文件<br/>仅加引用"]
  end
  subgraph CP_n1["Checkpoint N+1"]
    SST_new2["新增 SST"]
    SST_ref2["复用 CP N 的 SST"]
    SST_drop["被 compaction 淘汰的 SST<br/>解除引用"]
  end
  CP_n --> CP_n1

Flink 文档指出:增量 checkpoint 历史 不会无限增长——RocksDB Compaction 合并旧 SST 后,Flink 的引用链会 subsumed 旧 checkpoint,自动 prune(Incremental Checkpoints 节)。这与 lsm-tree compaction 篇 Version/MANIFEST 的「旧文件在新 Version 安装后删除」同构,只是 引用者从 RocksDB Version 变成了 Flink CheckpointCoordinator

5.3 IO 差异与 Web UI 误读

模式 每次 checkpoint 上传量 恢复路径 典型用途
全量 \(\approx\) 全状态 单 checkpoint 自包含 Savepoint、小状态
增量 \(\approx\) 自上次以来的 SST 增量 需沿引用链收集 SST 大状态生产 checkpoint

重要:开启增量后,Flink Web UI 的 Checkpointed Data Size 显示的是 当次 delta 大小,不是全状态体积——用该数字估算磁盘或 S3 容量会严重低估。

官方博客(2018,机制仍适用)描述的权衡:

ForSt state backend(Flink 2.x 实验特性)把 SST 放远程文件系统,增量语义延伸为 disaggregated state——本文不展开,生产默认仍以 EmbeddedRocksDB 本地 SST 为准。

5.4 与 HashMap 后端的 checkpoint 对比

HashMapStateBackend 不支持 增量 checkpoint:状态是 JVM 对象,只能全量序列化。对比维度:

HashMap RocksDB 增量
上传内容 Java 序列化 blob SST + MANIFEST 引用
状态大小上限 堆 + 磁盘临时文件 本地磁盘 LSM
单次 checkpoint CPU 序列化全量 Flush + 文件 diff + 上传
与 Compaction 关系 Compaction 改变 SST 集合 → 影响增量链

六、异步快照与资源隔离

Flink 文档:EmbeddedRocksDBStateBackend 始终异步快照——barrier 对齐后的 snapshotState 把「创建 RocksDB 一致性视图 + 提交异步上传任务」与正常处理分离。RocksDB 的 Snapshot(DB 级快照,非 Flink savepoint)提供轻量级一致性读视图,Compaction 与前台写继续,旧 SST 在快照释放前不被删除(与 lsm-tree Version 引用计数相同)。

每个 slot 内 多个 RocksDB 实例(多个算子各一份)共享 managed memory 的 cache 与 write buffer pool。一个算子 Compaction 风暴会挤占同 slot 其他算子的 block cache——SlotSharingGroup 隔离 CPU/网络但不隔离 RocksDB native memory,大状态混部时需要 手动拆分 SlotSharingGroup 或调大 TM managed memory

Timer 默认也存 RocksDB(state.backend.rocksdb.timer-service.factory=rocksdb)。Timer 数量极大时,可改为 heap 存 timer(文档注明:heap timer 与 RocksDB 后端组合时 timer state 不支持异步快照,且与 raw keyed state 写入不兼容——仅高级自定义算子需警惕)。


七、可复现实验(不提供伪造数字)

下列步骤用于在自有集群对比 HashMap 与 RocksDB 的 checkpoint 行为;本文未在本地执行,故不写入具体毫秒数或 GB 数。

环境建议(与 系列 PLAN 实验台账一致):

步骤概要

  1. 同一作业分别用 HashMapStateBackendEmbeddedRocksDBStateBackend(true) 运行;
  2. 在 Flink REST API 或 Web UI 记录 lastCheckpointExternalPathcheckpointedSizedurationsyncDuration
  3. 在 TM 主机用 du -sh 观察 state.backend.rocksdb.localdir 占用;
  4. 用 RocksDB 日志或 Flink native metrics(state.backend.rocksdb.metrics.enable=true 后选择性打开)观察 num-immutable-mem-tablestall-micros

预期定性结论(推导自机制,非 fabricated benchmark):


八、边界与后文

本文 重复 第 10 篇 的 barrier 对齐算法,也 展开 第 14 篇 的端到端 exactly-once。RocksDB 路径只回答「状态 bytes 落盘与快照如何工作」。

未覆盖

下一篇 第 13 篇 在此路径之上讨论:窗口 state 膨胀、Compaction 与 checkpoint 争抢磁盘、hot key 导致单 subtask RocksDB 过热,以及 何时改 state 设计比调 write_buffer_size 更有效


参考资料

  1. Apache Flink Documentation, State Backends(EmbeddedRocksDBStateBackend、Column Family、managed memory、incremental checkpoint、timer service)。A 级。
  2. Apache Flink Documentation, Checkpointing(checkpoint 存储布局、增量语义)。A 级。
  3. Apache Flink 源码:flink-statebackend-rocksdbRocksDBKeyedStateBackendCompositeKeySerializationUtilsRocksIncrementalSnapshotStrategy(release-1.20 / release-2.x 分支)。A 级。
  4. Apache Flink 官方博客,Managing Large State in Apache Flink: An Intro to Incremental Checkpointing(2018-01-30)。B 级。
  5. RocksDB Wiki, CompactionBlock Cache。A 级。
  6. 本仓库 lsm-tree 系列第 1 篇 全景第 4 篇 Compaction。机制对照。
  7. 本系列:第 9 篇 键控状态第 10 篇 Checkpoint第 11 篇 Savepoint

返回 系列目录 · 上一篇 Savepoint 与升级恢复 · 下一篇 状态放大、Compaction 与调优

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】状态放大、Compaction 与调优

在 RocksDB state backend 读写路径之上,拆解窗口 state 膨胀、LSM 写放大与 checkpoint 争抢磁盘、Flink managed memory 与 RocksDBOptionsFactory 调参边界,以及 hot key 导致单 subtask 过热时的诊断与「改 state 设计 vs 拧参数」取舍。

2026-07-01 · database / distributed

【流式数据处理】背压、故障模式与引擎对照

收束流式数据处理系列:Flink credit-based 背压如何沿算子链传播、Web UI 指标怎么读;数据倾斜、checkpoint 超时连锁、Kafka rebalance 风暴、RocksDB OOM、savepoint 不兼容五类生产故障的诊断与止血;Flink / Kafka Streams / Spark Structured Streaming / RisingWave 在状态模型、交付语义、运维与入湖成熟度上的对照表与选型决策树,不做排名。

2026-07-01 · database / distributed

【流式数据处理】Kafka · Flink · 状态 · Exactly-Once

承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。

2026-07-01 · database / distributed

【流式数据处理】DataStream 与算子语义

拆解 Source/Transform/Sink 数据流图、rebalance/keyBy/broadcast 等 shuffle 策略、keyBy 到 KeyGroup 的映射,以及 ProcessFunction 与 TimerService 如何承载事件时间逻辑,并引入算子状态与键控状态的分工边界。


By .