土法炼钢兴趣小组的算法知识备份

【流式数据处理】状态放大、Compaction 与调优

文章导航

分类入口
databasedistributed
标签入口
#flink#rocksdb#state-tuning#compaction#write-amplification#hot-key#skew#window-state#checkpoint#lsm-tree

目录

第 12 篇EmbeddedRocksDBStateBackend 的 ColumnFamily、KeyGroup 前缀、增量 checkpoint 与 lsm-tree 系列 的 WAL / MemTable / SSTable 路径对齐了。生产里更常见的故障却是:磁盘占用持续爬升、checkpoint 同步阶段越来越长、某个 subtask 的 RocksDB 指标独高、全作业背压但 CPU 并不满——这些往往不是「再调一个 JVM 参数」能解决的,而是 state 设计、LSM 写放大、KeyGroup 倾斜 叠在一起的结果。

本文回答四个问题:

  1. 窗口 state、长 TTL、MapState 无限增长各自如何把 逻辑状态 放大成 磁盘 bytes
  2. RocksDB Compaction 与 Flink 增量 checkpoint 如何在 TaskManager 磁盘上抢 I/O?写放大公式 \(WA \approx T \times L\) 在流作业里意味着什么?
  3. Flink 场景下 write_buffer_sizemax_background_jobs、managed memory 比例等旋钮 有效边界 在哪?
  4. Hot key 倾斜 时,为什么加并行度无效,何时该改 key 设计或预聚合而不是继续调 RocksDB?

本文是「流式数据处理」系列第 13 篇(共 18 篇)。→ 系列目录

篇目 核心内容
第 12 篇 · RocksDB 内核路径 ColumnFamily、读写路径、增量 checkpoint
第 13 篇 · 状态放大与调优 窗口膨胀、写放大、hot key、设计 vs 调参
第 14 篇 · 交付语义 Source / 引擎 / Sink 三层语义

版本锚定:Flink 1.20+ / 2.x;RocksDB 随 flink-statebackend-rocksdb 捆绑。调参 API 以 EmbeddedRocksDBStateBackendRocksDBOptionsFactory 为准。

环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2),未安装 JVM / Flink 集群。机制来自 Flink 官方文档、RocksDB Wiki Tuning Guide 与 flink-statebackend-rocksdb 源码;不粘贴未执行的 Web UI 截图,也不伪造 checkpoint 耗时或 live-sst-files 曲线数字。文末给出可复现实验步骤。


一、状态放大的三个来源

1.1 逻辑条目数 ≠ 磁盘占用

第 9 篇 给出窗口 state 条目估算:

\[ N_{\text{state}} \approx N_{\text{key}} \times N_{\text{window/key}} \]

对 RocksDB 后端,每条 entry 还要乘以 序列化开销LSM 多版本 / 未 compaction 的 tombstone

\[ \text{磁盘占用} \gtrsim N_{\text{state}} \times (\|key\| + \|value\| + \text{overhead}) \times (1 + WA_{\text{factor}}) \]

其中 \(WA_{\text{factor}}\) 来自 lsm-tree 第 4 篇 Compaction 的 Leveled 写放大:逻辑写入 1 GB,Compaction 可能额外写数十 GB。Web UI 里的「状态大小」与 TM 上 du -sh 的 RocksDB 目录 often 差一个数量级——这不是 bug,是 LSM 账单。

1.2 窗口 state 膨胀

典型模式:keyBy(userId) + 滑动窗口(例如 1 小时窗口、5 分钟 slide)+ ProcessWindowFunction 保留明细 ListState

因素 对 state 的影响
滑动窗口 overlap 同一 event 落入多个窗口 namespace,条目乘上 overlap 因子
allowedLateness 过大 窗口触发后 state 仍保留至迟到数据截止,清理滞后
自定义 Trigger 未 fire-and-purge 窗口「已输出」但 state 未 clear
事件时间 watermark 推进慢 窗口 end time 迟迟不到,state 常驻

第 3 篇 的 Session 窗口在 gap 内合并,活跃 session 数随 key 波动;高基数 key(百万级 userId)× 长 session gap → RocksDB iterator 扫描成本上升(第 12 篇 读路径)。

设计层修复(优先于调参):

1.3 State TTL 与 Compaction 清理不同步

启用 StateTtlConfigcleanupInRocksdbCompactFilter=true 时,过期 entry 在 Compaction 中过滤(Flink 文档 State TTL)。这意味着:

TTL 是 最终清理,不是 实时 cap。若业务要求「状态不能超过 X GB」,必须 限制 key 基数或窗口覆盖范围,不能单靠 TTL。

1.4 MapState / ListState 无界增长

反模式:MapState 存「每个 user 见过的所有 productId」且无 TTL、无窗口边界。Keyed state 按 KeyGroup 分片,单 key 的 Map 再大也落在某一个 subtask 的一个 RocksDB 实例里——这是 hot key 问题的温床(第四节)。


二、写放大、Compaction 与 Checkpoint 的 I/O 争抢

2.1 写路径回顾与放大公式

第 12 篇lsm-tree 全景 一致:前台 Put → WAL + MemTable;后台 Flush → L0 SST;Leveled Compaction → L1…Ln。

Leveled 策略的写放大近似:

\[ WA \approx T \times L \]

\(T\) 为层间容量倍数(RocksDB 默认 10),\(L\) 为层数。Flink 作业 持续更新 同一批 hot key 时,Compaction 反复归并相同 key 范围,写放大感知更明显——磁盘带宽被 Compaction 吃满后,前台 Put stall(RocksDB stall-micros 上升),反压经算子链上传(第 18 篇)。

2.2 增量 Checkpoint 叠加 Flush

增量 checkpoint 在 snapshot 前 同步 Flush MemTable(Flink 官方博客 Incremental Checkpointing第 12 篇)。时间线上:

sequenceDiagram
  participant OP as 算子写入
  participant R as RocksDB
  participant CP as Checkpoint
  OP->>R: 持续 Put
  R->>R: 后台 Compaction
  CP->>R: 同步 Flush(barrier 对齐后)
  R->>R: 新 L0 SST
  CP->>CP: diff SST 上传
  Note over R,CP: Flush + Compaction + 上传<br/>共享 TM 磁盘与网络

症状对照

现象 可能原因
syncDuration 升高、asyncDuration 正常 MemTable 大、Flush 慢
checkpoint 成功但 TM 磁盘 %util 长期 100% Compaction 与 Flush 叠加
checkpointedSize 小但本地 du Compaction 未合并旧 SST;引用链仍保留远程 SST
反压 + checkpoint 超时连锁 I/O 瓶颈 → barrier 对齐慢 → 超时 → 回滚重算

调短 checkpoint interval 不会 降低 Compaction 压力,反而 增加 Flush 与上传频率第 10 篇min-pause-between-checkpoints 在此有实际意义:给 Compaction 留呼吸窗口。

2.3 与 HashMap 后端的对比边界

状态小于 JVM 堆安全阈值、更新模式偏 读多写少 时,HashMap 后端避免 Compaction 税(第 9 篇)。一旦:

就应接受 RocksDB 的 LSM 账单,并在 state 设计 + Compaction 预算 上投入,而不是指望「调大堆」一劳永逸。


3.1 Managed Memory 划分

Flink 默认 state.backend.rocksdb.memory.managed=true(文档 Memory Management):

配置 默认 作用
state.backend.rocksdb.memory.write-buffer-ratio 0.5 Write Buffer Manager 预算
state.backend.rocksdb.memory.high-prio-pool-ratio 0.1 Block Cache 中 index/filter 预留
state.backend.rocksdb.memory.partitioned-index-filters true 大 key 空间下降低 filter 内存

同一 slot 内 多个算子、多个 RocksDB 实例共享 该预算(第 12 篇)。一个算子 Compaction 风暴会挤占同 slot 其他算子的 block cache——大状态混部时用 SlotSharingGroup 隔离(第 7 篇)。

3.2 RocksDBOptionsFactory 常用项

通过 state.backend.rocksdb.options-factory 或代码注入 RocksDBOptionsFactory(Flink 文档 Advanced RocksDB State Backend Options):

public class FlinkRocksDBOptionsFactory implements ConfigurableRocksDBOptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, ColumnFamilyOptions columnOptions) {
        return currentOptions
            .setMaxBackgroundJobs(4)
            .setMaxSubcompactions(2);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions
            .setWriteBufferSize(64 * 1024 * 1024L)
            .setMaxWriteBufferNumber(3)
            .setMinWriteBufferNumberToMerge(2);
    }
}
参数 调大效果 风险
write_buffer_size 减少 Flush 次数 单次 Flush 更慢;checkpoint 同步 Flush 更久
max_write_buffer_number 缓解写入 stall 内存占用上升
max_background_jobs Compaction 更快 CPU 与磁盘 I/O 争抢加剧
target_file_size_base 更大 SST,减少文件数 单次 Compaction 更重
level0_slowdown_writes_trigger / stop_writes_trigger 默认背压阈值 盲目放宽会 L0 堆积

原则:在 managed memory 总预算固定时,write buffer 与 block cache 是零和。读热点明显(维表 join、频繁 value())→ 略增 cache 比例;写密集窗口聚合 → 保证足够 write buffer 并控制 max_background_jobs 不超过磁盘 IOPS 上限。

3.3 Native Metrics 诊断

启用 state.backend.rocksdb.metrics.enable=true 后,选择性打开(文档列出的 metric 名):

Flink Web UI 单个 subtask 的 metrics 若显著高于同算子其他 subtask,优先怀疑 KeyGroup 倾斜(第四节),而非全局 RocksDB 参数。

3.4 Changelog State Backend(边界)

Flink Changelog state backend(FLIP-158)在 RocksDB 之上追加 changelog 以缩短 checkpoint 同步时间——改变的是 快照频率与上传路径,不消除 Compaction。生产默认仍以 EmbeddedRocksDB 为主;若 syncDuration 长期占 checkpoint 总时长大部分,可评估 changelog,但 state 条目数膨胀 仍需从设计层解决。

3.5 Timer Service 与 RocksDB 的叠加

Keyed 算子的 processing-time / event-time timer 默认也写入 RocksDB(state.backend.rocksdb.timer-service.factory=rocksdb)。Timer 条目与 keyed state 共用 同一 RocksDB 实例的 CF 与 I/O 预算,但 key 布局不同(timer 按 timestamp 排序)。

场景 风险
大窗口 + 每 key 大量 event-time timer timer 条目数 \(\approx\) 窗口内事件数,与 window state 双重膨胀
watermark 长期不推进 event-time timer 无法触发删除,RocksDB 持续 Put
timer-service.factory=heap timer 走堆内存,不支持 与 RocksDB keyed state 组合的异步快照路径(文档 State Backends 限制)

排查 checkpoint 体积异常时,除 window state 外应查看 registered timer 数量(Flink metrics numRegisteredEventTimeTimers / numRegisteredProcessingTimeTimers per subtask)。

下列配置与代码中的 EmbeddedRocksDBStateBackend(true) 配合使用;键名以 Flink 1.20 文档为准,2.x 若有迁移以发行说明为准。

state.backend.type: rocksdb
execution.checkpointing.incremental: true
state.backend.rocksdb.localdir: /data/flink/rocksdb

# Managed memory(与 TM process size 联动)
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.5
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1

# 诊断
state.backend.rocksdb.metrics.enable: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.num-immutable-mem-table: true
state.backend.rocksdb.metrics.stall-micros: true
state.backend.rocksdb.metrics.compaction-pending: true

# Checkpoint 与 Compaction 错峰
execution.checkpointing.interval: 120000
execution.checkpointing.min-pause-between-checkpoints: 60000
execution.checkpointing.timeout: 600000

注意localdir 必须落在 本地 SSD;与 checkpoint 存储(S3/HDFS)分离。把 RocksDB 目录放在网络盘会放大 第 12 篇 读路径延迟,且 Compaction 与远程 checkpoint 上传 争抢网络

3.7 远程 checkpoint 存储上的「状态体积」误读

增量 checkpoint 下,Flink Web UI 的 Checkpointed Data Size当次 delta第 12 篇)。运维容量规划还需:

\[ \text{远程占用} \approx \text{最新 completed checkpoint 引用的 SST 闭包} \]

Compaction 合并 SST 后,旧文件引用被 subsumed,远程体积可能 下降;若 state 逻辑只增不减,引用闭包仍单调涨。不能 用单次 delta 推断总占用,也不能用 TM 本地 du 直接等同远程账单——本地还有 WAL、未上传 SST、Compaction 临时文件。


四、Hot Key 倾斜与 Subtask 负载不均

4.1 KeyGroup 不等于均匀

keyBy 把 key 哈希到 maxParallelism 个 KeyGroup(第 8 篇)。均匀哈希保证 key 空间期望均匀,不保证业务热点均匀

每个 subtask 负责 连续 KeyGroup 区间。倾斜时,一个 subtask 的 RocksDB 实例承担过多更新与 Compaction,表现为:

flowchart LR
  subgraph Keys["业务 key 流量"]
    H1["hot key 40%"]
    H2["hot key 35%"]
    T["长尾 25%"]
  end
  subgraph KG["KeyGroup 映射"]
    ST2["subtask 2<br/>RocksDB 过热"]
    ST0["subtask 0"]
    ST1["subtask 1"]
  end
  H1 --> ST2
  H2 --> ST2
  T --> ST0
  T --> ST1

4.2 加并行度为什么常常无效

parallelism 从 8 提到 32 不会 拆分单个 hot key 的 state——同一 key 永远落在 唯一 KeyGroup。并行度只切 KeyGroup 区间,不切单 key。有效手段:

手段 做法 代价
本地预聚合 两阶段:keyBy(salt(userId)) 局部聚合 → keyBy(userId) 全局合并 延迟与 state 结构变复杂
改 key 引入 hash(userId) % N 作中间 key 下游需二次 shuffle
拆分作业 热点 key 单独作业 + 侧输出 运维复杂度
custom Partitioner 仅当明确热点 key 集合可枚举 维护成本高

第 18 篇 的倾斜诊断清单与此一致:先看 subtask 级 recordsIn 与 RocksDB metrics,再决定是否 rescale。

4.3 rescale 与 savepoint

从 savepoint 恢复并 提高 maxParallelism 可重划 KeyGroup(第 11 篇),但若热点 key 不变,新 subtask 仍会接住该 key。Rescale 解决的是 算力不够,不是 单 key 状态过大

4.4 诊断步骤(REST / Web UI)

  1. Flink Web UI → Metrics → 算子 → subtask:比较 numRecordsInPerSecondbusyTimeMsPerSecond
  2. RocksDB native metrics(第三节):对比同算子各 subtask 的 live-sst-files-sizestall-micros
  3. Checkpoint → History:最慢 subtask 是否长期为同一 index(sync_duration 瓶颈);
  4. 源端:Kafka partition lag 是否仅对应 busy subtask 消费的 partition 子集——若 source 并行度与 keyBy 不一致,需先排除 source 倾斜 再判 keyed 倾斜。

若仅 一个 subtask 三项均异常,几乎可断定 hot key 或 KeyGroup 分配不均;若 全部 subtask RocksDB 指标同步恶化,回到 全局 state 膨胀或 Compaction 预算(第一、二节)。

4.5 两阶段预聚合示例(思路)

keyBy(userId) 全局 UV 统计,热点 userId 可把单 subtask 压垮。可拆为:

flowchart LR
  IN[输入] --> K1["keyBy(hash(userId)%S)<br/>局部 AggregatingState"]
  K1 --> SH[shuffle]
  SH --> K2["keyBy(userId)<br/>全局合并"]
  K2 --> OUT[输出]

第一阶段 \(S\) 个 salt bucket 把同一 userId 的更新分散到 \(S\) 个 subtask;第二阶段 state 仅存 每 user 一条聚合值,不再存明细 ListState。代价是 多一次 shuffle略高 end-to-end 延迟——这是用网络换磁盘与 Compaction 的典型交易。


五、何时改 State 设计,何时拧 RocksDB 参数

5.1 决策表

症状 优先动作
窗口 state 条数 \(\propto\) 事件率 × 窗口长度 改窗口类型 / 聚合 state / TTL
全 subtask 磁盘同步涨 checkpoint interval、Changelog、增量 checkpoint 已开?
单 subtask 独高 hot key、自定义 partition、预聚合
stall-micros 全局高、Compaction pending max_background_jobs、磁盘 IOPS、降写入量
读延迟高、写正常 block cache 比例、Bloom、减少 full scan
checkpoint syncDuration 减小 write_buffer_size 或评估 unaligned checkpoint

经验边界:若 state 条目数估算错误一个数量级,任何 RocksDB 调参都只是延缓 OOM。先画 \(N_{\text{key}} \times N_{\text{window/key}}\)第 9 篇),再动 write_buffer_size

5.2 与 lsm-tree 系列的对照

教学实现(lsm-tree 系列) Flink RocksDB state
单进程 KV benchmark 多 subtask × 多 CF × KeyGroup
写放大公式 \(WA \approx T \times L\) 同样适用;叠加 checkpoint Flush
无外部快照 增量 checkpoint 引用 SST 链
均匀随机 key 业务 skew 为主因

深入 Compaction 算法与 Version 见 lsm-tree Compaction 篇;SSTable 布局见 SSTable + Bloom Filter 篇


六、可复现实验(不提供伪造数字)

环境建议(与 系列 PLAN 实验台账一致):

观测项(≥3 次 checkpoint 取中位数趋势,本文不填具体数值):

  1. TM 上 state.backend.rocksdb.localdirdu -sh
  2. REST API:/jobs/:id/checkpointssync_durationcheckpointed_size
  3. 单 subtask rocksdb.live-sst-files-sizestall-micros
  4. 组 B 中 busy subtask 的 numRecordsInPerSecond vs 其他 subtask。

预期定性结论(机制推导):

6.1 生产告警阈值(建议口径,非硬编码)

下列为 运维启发式,须按集群基线校准;本文不给未实测的绝对毫秒数。

信号 建议动作
连续 3 次 checkpoint sync_duration > 总 duration 50% 查 MemTable / Flush;评估减小 write_buffer_size 或 changelog
rocksdb.compaction-pending=1 持续 > 1 个 checkpoint 周期 max_background_jobs 或降写入;查 TTL tombstone
单 subtask busy > 90% 且 peers < 30% hot key 排查(第四节)
TM 磁盘使用率 > 85% 且 state 逻辑未扩容 Compaction 跟不上或 remote ckpt 引用链;紧急 savepoint + 缩 state 或扩盘
Full GC 频繁且 HashMap backend 迁移 RocksDB 前先算 \(N_{\text{state}}\),避免「换后端仍膨胀」

七、边界与后文

本文 重复 第 12 篇 的 ColumnFamily 布局与增量 snapshot 算法,也 展开 第 14 篇 的端到端 exactly-once。

未覆盖

下一篇 第 14 篇 把视角从「状态 bytes」转到「记录是否重复、是否丢失」:Source、引擎、Sink 三层交付语义如何组合,以及 at-least-once 与 exactly-once 的边界。


参考资料

  1. Apache Flink Documentation, State BackendsRocksDB State BackendMemory ManagementState TTLMonitoring(RocksDB native metrics)。A 级。
  2. Apache Flink 源码:flink-statebackend-rocksdbEmbeddedRocksDBStateBackendRocksDBOptionsFactory(release-1.20 / release-2.x)。A 级。
  3. RocksDB Wiki, Tuning GuideCompactionWrite Stalls。A 级。
  4. Apache Flink 官方博客,Managing Large State in Apache Flink: An Intro to Incremental Checkpointing(2018)。B 级。
  5. 本仓库 lsm-tree 系列全景Compaction。机制对照。
  6. 本系列:第 9 篇 键控状态第 10 篇 Checkpoint第 12 篇 RocksDB 路径

返回 系列目录 · 上一篇 RocksDB State Backend 内核路径 · 下一篇 交付语义:从 at-most-once 到 exactly-once

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】RocksDB State Backend 内核路径

拆解 Flink EmbeddedRocksDBStateBackend 的物理布局:每个 subtask 独立 RocksDB 实例、ColumnFamily 与 KeyGroup 前缀映射、写路径 memtable→WAL→flush→compaction 与 lsm-tree 系列对照、读路径 block cache 与读放大、增量 checkpoint 与全量 snapshot 的 IO 差异。

2026-07-01 · database / distributed

【流式数据处理】背压、故障模式与引擎对照

收束流式数据处理系列:Flink credit-based 背压如何沿算子链传播、Web UI 指标怎么读;数据倾斜、checkpoint 超时连锁、Kafka rebalance 风暴、RocksDB OOM、savepoint 不兼容五类生产故障的诊断与止血;Flink / Kafka Streams / Spark Structured Streaming / RisingWave 在状态模型、交付语义、运维与入湖成熟度上的对照表与选型决策树,不做排名。

2026-07-01 · database / distributed

【流式数据处理】Kafka · Flink · 状态 · Exactly-Once

承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。


By .