第
8 篇 说明 Keyed State 绑定在 keyBy
之后的 (operator, key) 上,KeyGroup 个数等于
maxParallelism。生产故障清单里
「状态膨胀」「TM OOM」「checkpoint
越来越大」 几乎都落在这一层:窗口未清理、MapState
无限增长、TTL 未配或清理策略不对、HashMap
后端撑爆堆内存。
本文回答五个运维与开发会直接碰到的问题:
- ValueState / ListState / MapState / ReducingState / AggregatingState 各自存什么、适合什么聚合模式?
- HashMapStateBackend 与 EmbeddedRocksDBStateBackend 的读写路径与选型边界在哪(与 lsm-tree 系列 的 LSM 对照留到 第 12 篇)?
- State TTL 的
UpdateType、StateVisibility、清理策略在 checkpoint 与 RocksDB compaction 里各起什么作用? - 如何 估算 窗口 state 条目数与 RocksDB 磁盘占用,避免凭感觉设并行度?
- Flink 1.20+ / 2.x 在 state 接口与 TTL 上有哪些版本边界?
Checkpoint 如何把 Keyed State 快照进持久存储、与 Kafka offset 对齐,见 第 10 篇。RocksDB 增量 checkpoint 与 compaction 调优见 第 12、13 篇。窗口 Trigger 与 Evictor 对 state 生命周期的影响见 第 3 篇。
环境说明:本机 WSL2,未安装 Flink / RocksDB。状态大小估算给出 公式与心算方法,不伪造 TM 磁盘曲线;与 lakehouse 第 6 章 一样,性能数字只在有实验处出现。
版本锚定:Flink 1.20+ / 2.x。1.13 起
MemoryStateBackend /
FsStateBackend /
RocksDBStateBackend 分别演进为
HashMapStateBackend、EmbeddedRocksDBStateBackend(checkpoint
仍写分布式文件系统或对象存储)。Flink 2.x 提供 State
V2 API(Working with State V2),TTL
配置对象仍为 StateTtlConfig,语义与 V1
一致。
一、Keyed State 在 subtask 内的布局
每个 keyed 算子 subtask 维护一个逻辑上的 键值存储:当前 event 的 key 决定读写哪个命名空间(来源:Flink Documentation,Stateful Stream Processing)。
flowchart TB
subgraph ST["subtask 3 (parallelism=8)"]
NS1["namespace: key=A<br/>ValueState / MapState / ..."]
NS2["namespace: key=B"]
NS3["namespace: key=C"]
end
IN["输入 record (key=B)"] --> NS2
KeyGroup 与 subtask:subtask 3 可能负责 KeyGroup \(\{7, 15, 23, \ldots\}\) 内的所有 key(具体集合由 maxParallelism 与 parallelism 决定,第 8 篇)。rescaling 时 KeyGroup 在 subtask 间迁移,state 随 KeyGroup 走(第 11 篇)。
窗口 state 在命名空间上再嵌套 windowId(窗口起止时间戳等),因此 state 条目数 \(\approx\) 活跃 key 数 × 每 key 窗口数——这是状态膨胀的第一来源。
二、五种 Keyed State 语义
以下类型均在 RichFunction /
KeyedProcessFunction /
窗口函数 中通过
StateDescriptor 注册(来源:Flink
Documentation,Working with State)。
2.1 ValueState<T>
语义:每个 key 一个可变单值,类型 \(T\)。
ValueStateDescriptor<Long> desc =
new ValueStateDescriptor<>("count", Long.class);
ValueState<Long> count = getRuntimeContext().getState(desc);
public void processElement(Event e, Context ctx, Collector<Out> out) throws Exception {
Long c = count.value();
if (c == null) c = 0L;
count.update(c + 1);
}| 适用 | 不适用 |
|---|---|
| 计数、最后见到的时间戳、单值缓存 | 需保留全量历史事件列表 |
与 ReducingState 相比,ValueState
无内置聚合函数,每次完全替换
update(value)。
2.2 ListState<T>
语义:每个 key 一个
有序列表,可 add /
get / update 全列表。
ListStateDescriptor<Event> bufDesc =
new ListStateDescriptor<>("buffer", Event.class);
ListState<Event> buffer = getRuntimeContext().getListState(bufDesc);
buffer.add(event);
Iterable<Event> all = buffer.get();| 适用 | 风险 |
|---|---|
| 会话缓冲、待排序小批次 | 列表长度无界 → state 爆炸 |
ListState 元素在 TTL 下独立过期(见第六节):每个元素可有独立过期时间(集合型 state 的 per-entry TTL)。
2.3 MapState<UK, UV>
语义:每个 key 对应一个 嵌套 Map(用户键 UK → 用户值 UV)。
MapStateDescriptor<String, Long> mapDesc =
new MapStateDescriptor<>("user-counts", String.class, Long.class);
MapState<String, Long> map = getRuntimeContext().getMapState(mapDesc);
map.put(subKey, 1L);
Long v = map.get(subKey);| 适用 | 风险 |
|---|---|
| 多字段动态维、CEP 偏复杂模式 | UK Cardinality 无限 时磁盘线性涨 |
MapState 的 null 值 依赖序列化器是否支持
null;官方文档注明必要时用
NullableSerializer(多占一字节)。
2.4 ReducingState<T>
语义:每个 key 一个
可结合(associative)+
可交换(commutative) 的聚合值;新元素通过
ReduceFunction 合并。
ReducingStateDescriptor<Long> sumDesc = new ReducingStateDescriptor<>(
"sum", (a, b) -> a + b, Long.class);
ReducingState<Long> sum = getRuntimeContext().getReducingState(sumDesc);
sum.add(1L);
Long total = sum.get();要求:ReduceFunction
必须满足结合律,否则 checkpoint 恢复后重放顺序不同会导致
结果错误。
| 对比 ValueState | ReducingState 省什么 |
|---|---|
| 手动 read-modify-write | 内置增量合并,窗口增量聚合常用 |
2.5 AggregatingState<IN, OUT>
语义:通过 AggregateFunction(create / add / merge / getResult)维护聚合结果;输入类型 IN 与输出类型 OUT 可不同。
AggregatingStateDescriptor<Event, Acc, Result> aggDesc =
new AggregatingStateDescriptor<>("agg", new MyAggregateFunction(), Result.class);
AggregatingState<Event, Result> agg = getRuntimeContext().getAggregatingState(aggDesc);| 适用 | 说明 |
|---|---|
| 窗口 AggregateFunction 后端 | merge 用于 session 合并 |
窗口 API 中
AggregateFunction +
ProcessWindowFunction 组合:AggregatingState 存
Acc Accumulator,fire 时
getResult 输出。
2.6 选型对照表
| 类型 | 每 key 结构 | 典型场景 |
|---|---|---|
| ValueState | 单值 | 标志位、最后事件时间 |
| ListState | 列表 | 小缓冲、会话事件集(需 TTL/限长) |
| MapState | 嵌套 map | 动态二级键 |
| ReducingState | 单值聚合 | sum/min/max(可交换结合) |
| AggregatingState | 自定义累加器 | 复杂聚合、窗口增量 |
三、StateBackend:HashMap 与 EmbeddedRocksDB
3.1 抽象层次
StateBackend 决定 Keyed State 与 Operator State 存在哪、checkpoint 时如何快照(来源:Flink Documentation,State Backends)。
flowchart LR
OP["Keyed 算子"]
SB["StateBackend"]
HM["HashMapStateBackend<br/>JVM 堆内"]
RDB["EmbeddedRocksDBStateBackend<br/>本地磁盘 LSM"]
CP["Checkpoint 存储<br/>DFS / S3"]
OP --> SB
SB --> HM
SB --> RDB
HM --> CP
RDB --> CP
CheckpointStorage 与 StateBackend 正交:即使 HashMap 堆内,checkpoint 也可写到 S3;RocksDB 则 平时在本地盘,checkpoint 上传 SST 或快照(第 12 篇)。
3.2 HashMapStateBackend
- 运行时:state 全在 TM JVM 堆(或少量 off-heap,视版本)。
- checkpoint:全量序列化 state 到远程存储。
- 优点:读写延迟低,无 LSM compaction;小 state 作业简单。
- 缺点:state 大于可用堆 → OOM;大 checkpoint 序列化 CPU 与 IO 峰值高。
配置(1.20+):
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://bucket/flink-checkpoints");3.3 EmbeddedRocksDBStateBackend
- 运行时:每个 keyed subtask 独立 RocksDB 实例(本地目录),LSM 结构(与 lsm-tree 系列 MemTable/SSTable 模型同族)。
- checkpoint:支持 全量 与 增量(上传新增 SST,第 12 篇)。
- 优点:state 可 远大于堆;增量 checkpoint 降 IO。
- 缺点:读写走 LSM,读放大 / compaction 影响延迟;调参复杂(第 13 篇)。
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // 参数:是否启用增量 checkpoint3.4 选型决策表
| 条件 | 倾向 |
|---|---|
| 总 keyed state < 数百 MB 且稳定 | HashMap |
| state 达 GB~TB | RocksDB |
| 长窗口 + 高 key 基数 | RocksDB + TTL + 窗口长度治理 |
| 低延迟、短 state、可接受重启从 Kafka 重放 | HashMap |
| 与 lakehouse 入湖 同集群,checkpoint 已写 S3 | 两者皆可,看 state 规模 |
不存在「RocksDB 总是更快」:小 state 下 HashMap 往往更省 CPU;RocksDB 的价值在 容量边界 与 增量 checkpoint,不是微基准上的 μs 优势(不做未实测的延迟对比)。
3.5 版本与配置名对照
| 旧名(≤1.12 常见) | 1.13+ 名 |
|---|---|
| MemoryStateBackend | HashMapStateBackend |
| FsStateBackend | HashMapStateBackend + 远程 checkpoint |
| RocksDBStateBackend | EmbeddedRocksDBStateBackend |
flink-conf.yaml 中
state.backend: hashmap / rocksdb
仍可用,映射到上述实现类。
四、Operator State 与 Broadcast State(复习)
Keyed State 占生产故障大多数,但 Operator State 与 EOS 相关(第 8 篇):
| 类型 | 结构 | 场景 |
|---|---|---|
| ListState | 每 subtask 一个 list | Kafka split / offset |
| UnionListState | rescale 时并集 | 旧 API 兼容 |
| BroadcastState | 每 subtask 完整 map | 规则流 |
Broadcast State 不适合大维表:内存 \(\propto\) parallelism × 维表大小。
五、State TTL 配置语义
State TTL 为 Keyed State
任意类型 提供
最佳努力(best-effort)
过期清理(来源:Flink Documentation,State TTL;API
StateTtlConfig)。
5.1 基本用法
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofDays(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupIncrementally(10, true)
.cleanupInRocksdbCompactFilter(1000L)
.build();
ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("with-ttl", String.class);
desc.enableTimeToLive(ttlConfig);5.2 时间基准
| 选项 | 含义 |
|---|---|
| Processing time(默认) | TTL 相对系统时钟 |
| Event time | 部分版本/扩展支持,需查当前 release 文档 |
官方文档明确:TTL 目前以 processing time 为基准(Working with State V2 同述)。事件时间业务过期应结合 窗口 allowed lateness + timer 清理(第 2、3 篇),不能假设「事件时间 TTL」在所有版本默认可用。
5.3 UpdateType:何时刷新过期时钟
| 值 | 行为 |
|---|---|
OnCreateAndWrite(默认) |
创建与 写 时刷新 TTL |
OnReadAndWrite |
读 也刷新(滑动会话) |
OnReadAndWrite 适合「活跃用户状态常驻」;冷 key 仍会在 TTL 后过期。
5.4 StateVisibility:过期但未清理的值能否被读到
| 值 | 行为 |
|---|---|
NeverReturnExpired(默认) |
逻辑上视为不存在 |
ReturnExpiredIfNotCleanedUp |
物理未删前仍可读 |
隐私 / 合规场景用 NeverReturnExpired;调试物理清理延迟可用后者。
5.5 集合类型的 per-entry TTL
ListState、MapState 的 每个元素 / 每个 map entry 独立 TTL。窗口 state 中 每个窗口 namespace 也可独立过期——与 第 3 篇 的 窗口生命周期 叠加时要避免 双计数。
六、TTL 清理策略与 checkpoint 交互
TTL 不保证立即物理删除;清理是 best-effort(来源:官方 State TTL 专文)。
flowchart TB
EXP["状态逻辑过期"]
R1["读路径: NeverReturnExpired 隐藏"]
R2["增量清理 incremental cleanup"]
R3["全量 snapshot 时 cleanupFullSnapshot"]
R4["RocksDB compaction filter"]
EXP --> R1
EXP --> R2
EXP --> R3
EXP --> R4
| 策略 | 触发点 | 适用 backend |
|---|---|---|
| incremental cleanup | 访问 state 时顺带删过期 | HashMap / RocksDB |
| cleanupFullSnapshot | 做全量 checkpoint snapshot 时 | HashMap 为主 |
| RocksdbCompactFilter | RocksDB compaction 线程 | RocksDB |
配置示例:
StateTtlConfig.newBuilder(Duration.ofHours(6))
.cleanupIncrementally(1000, true) // 每访问 1000 条尝试清理
.cleanupInRocksdbCompactFilter(10000L) // compaction filter 检查间隔
.build();6.1 与 checkpoint 的关系
- checkpoint 仍包含未物理删除的过期 key(直到清理策略运行)——snapshot 体积可能 短暂偏大。
- 增量 RocksDB checkpoint 上传 SST;TTL 清理后新 SST 合并,旧 SST 仍可能在 checkpoint 目录保留直到 GC(第 12 篇)。
- NeverReturnExpired 保证 恢复后逻辑正确;物理空间回收滞后只影响 磁盘,不影响 读语义(在 visibility 配置下)。
6.2 常见误配
| 误配 | 后果 |
|---|---|
| 只设 TTL 不开 RocksDB compact filter | 磁盘长期膨胀(第 13 篇) |
| 窗口很长 + OnReadAndWrite + 高 QPS | 热点 key 永不过期 |
| 以为 TTL 替代窗口 allowed lateness | 事件时间语义仍乱序 |
七、状态大小估算
不做 benchmark 也可以 数量级估算,用于并行度与 backend 初选。
7.1 单条 state 条目成本
记:
- \(K\):活跃 业务 key 数(峰值或稳态)
- \(W\):每 key 并发活跃窗口数(与窗口类型相关,第 3 篇)
- \(S\):单条 state 序列化后字节数(含 key、namespace、值、RocksDB 开销)
Keyed 条目数(量级):
\[ N_{\text{entries}} \approx K \times W \times N_{\text{state-types}} \]
\(N_{\text{state-types}}\) 为同一算子上注册的 state descriptor 个数(如 ValueState + MapState 算两种)。
7.2 窗口类型与 \(W\)
| 窗口 | \(W\) 量级(直觉) |
|---|---|
| 滚动 1h,事件时间 | 每 key 通常 1( watermark 推进后关闭) |
| 滑动 1h / 步 5m | 每 key 约 12 个重叠窗口 |
| 会话 gap 30m | 取决于事件密度,无固定上界 → 必须 TTL 或 gap 上限 |
滑动窗口是 状态膨胀经典来源:\(W \approx \lceil \text{windowSize} / \text{slide} \rceil\)。
7.3 磁盘占用(RocksDB)
RocksDB live data 小于 含 tombstone 与未 compaction 的磁盘:
\[ \text{disk} \approx N_{\text{entries}} \times S \times f_{\text{write-amp}} \]
写放大 \(f_{\text{write-amp}}\) 经验上 1.5~3+(与 compaction 策略、更新频率有关,lsm-tree 系列)。只估算 live data 会低估磁盘。
HashMap 堆占用 近似 \(N_{\text{entries}} \times S\) 加 Java 对象头;无 write-amp,但 全量 checkpoint 体积接近 live data 序列化大小。
7.4 心算例子(无实测数字)
假设:
- \(K = 10^6\) 用户
- 滑动窗口:1 小时窗口、5 分钟 slide → \(W \approx 12\)
- 每窗口 AggregatingState 累加器 \(S \approx 64\) B
\[ N_{\text{entries}} \approx 10^6 \times 12 = 1.2 \times 10^7,\quad \text{live} \approx 1.2 \times 10^7 \times 64\,\text{B} \approx 768\,\text{MB} \]
RocksDB 磁盘可能 >1 GB;HashMap 需 堆 >768 MB 仅 state,加 JVM 开销与峰值 checkpoint 易 OOM → 应选 RocksDB 并评估 TTL / 窗口改滚动 / 预聚合。
7.5 从 Flink metrics 校准
作业跑起来后(需真实集群,非本环境):
| Metric | 用途 |
|---|---|
rocksdb.estimate-live-data-size |
subtask 级 live data |
rocksdb.total-sst-files-size |
SST 总大小 |
lastCheckpointSize |
checkpoint 体积趋势 |
numRecordsInPerSecond vs state 增长 |
验证 \(K\) 是否在涨 |
第 13 篇 讲 live-sst-files 随时间曲线 的实验方法;本篇只给 估算式与 metrics 名。
八、窗口 state 与 Kafka 消费位点的内存分工
- Kafka offset:Operator State / SourceReader state,体积 \(\approx O(\text{分区数})\),通常可忽略(第 5、10 篇)。
- 窗口 / 聚合 Keyed State:\(O(K \times W)\),主导 TM 资源。
- EOS Kafka Sink:事务状态另计(第 6 篇)。
lakehouse 第 19 章 的 Writer 缓冲 在 Sink 算子,与 Keyed State 分开;入湖作业 OOM 要分 算子 state 还是 批量缓冲。
九、代码模式:带 TTL 的 MapState 去重
public class DedupFunction extends KeyedProcessFunction<String, Event, Event> {
private MapState<String, Boolean> seen;
@Override
public void open(OpenContext ctx) {
StateTtlConfig ttl = StateTtlConfig
.newBuilder(Duration.ofDays(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(5000L)
.build();
MapStateDescriptor<String, Boolean> desc =
new MapStateDescriptor<>("seen-ids", String.class, Boolean.class);
desc.enableTimeToLive(ttl);
seen = getRuntimeContext().getMapState(desc);
}
@Override
public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
if (seen.contains(e.getId())) return;
seen.put(e.getId(), true);
out.collect(e);
}
}Cardinality 等于 7 天内独立 id 数;估算磁盘用 \(S \approx\) id 字符串 + 布尔 + 开销。TTL 7 天是 processing time;若 id 空间无限,仍需 BloomFilter 近似 或 外部 store(超出 Flink state 边界)。
十、Flink 2.x State V2 边界
| 项目 | V1(本篇示例) | V2 |
|---|---|---|
| 包路径 | org.apache.flink.api.common.state.* |
org.apache.flink.api.common.state.v2.*
等 |
| TTL | StateTtlConfig +
enableTimeToLive |
同配置对象 |
| 访问 | getRuntimeContext().getState |
State V2 接口 |
Backend、KeyGroup、TTL 清理策略对 V1/V2 共用。新项目若选 V2,以官方迁移指南为准;机制层仍读本篇。
十一、与 checkpoint / savepoint 的衔接预告
- 全量 checkpoint:HashMap 序列化所有未清理 state;RocksDB 可增量(第 10、12 篇)。
- savepoint:格式更稳定,用于 改并行度、升级;TTL 过期 state 在 savepoint 里 可能仍存在物理副本(第 11 篇)。
- maxParallelism 不可降:KeyGroup 数 写进 savepoint metadata,与 state 条目正交但影响 rescale。
十二、术语表
| 术语 | 含义 |
|---|---|
| Keyed State | 绑定 (operator, key) 的分区状态 |
| StateDescriptor | 注册 state 名称、类型与 TTL |
| StateTtlConfig | TTL 时长、更新类型、可见性、清理策略 |
| HashMapStateBackend | 堆内 state,全量 checkpoint |
| EmbeddedRocksDBStateBackend | 本地 LSM state,支持增量 checkpoint |
| KeyGroup | state rescale 的原子单位 |
| write amplification | LSM 层重复写入导致的磁盘放大 |
| best-effort cleanup | TTL 物理删除延迟,逻辑可见性可配置 |
十三、小结
五种 Keyed State 覆盖从单值到嵌套 map 的聚合模式;选型看 更新模式 与 是否可结合归约。HashMap 适合小 state 低延迟;EmbeddedRocksDB 适合 GB 级以上与增量 checkpoint。State TTL 用 processing time 计时,NeverReturnExpired 保证读语义,物理清理依赖 incremental / snapshot / compaction 三条路径 best-effort 组合。状态大小 用 \(K \times W \times S\) 估算,滑动窗口与无限 MapState 是两大膨胀源;应用 metrics 校准后决定 window 改型、TTL 或 backend 切换。
下一篇进入 Checkpoint 机制:barrier 对齐、Kafka offset 写入 snapshot、aligned vs unaligned 与超时调优。
参考资料
- Apache Flink Documentation, Working with State / Working with State V2(五类 Keyed State、Operator State)。
- Apache Flink Documentation, State Backends(HashMapStateBackend、EmbeddedRocksDBStateBackend)。
- Apache Flink Documentation, State TTL(UpdateType、StateVisibility、cleanup 策略)。
- Apache Flink Documentation, Stateful Stream Processing(KeyGroup 与 keyed state 分片)。
- Apache Flink API,
StateTtlConfig(Flink 2.2-SNAPSHOT Javadoc)。 - Apache Flink 博客, State TTL in Flink 1.8+(RocksDB compact filter 背景,B 级辅助)。
- 本系列 第 3 篇(窗口与 \(W\))。
- 本系列 第 8 篇(keyBy、KeyGroup)。
- 本系列 第 12、13 篇(RocksDB 路径与调优)。
- lsm-tree 系列(LSM 写放大直觉)。
返回 系列目录 | 上一篇:DataStream 与算子语义 | 下一篇:Checkpoint 机制
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】DataStream 与算子语义
拆解 Source/Transform/Sink 数据流图、rebalance/keyBy/broadcast 等 shuffle 策略、keyBy 到 KeyGroup 的映射,以及 ProcessFunction 与 TimerService 如何承载事件时间逻辑,并引入算子状态与键控状态的分工边界。
【流式数据处理】RocksDB State Backend 内核路径
拆解 Flink EmbeddedRocksDBStateBackend 的物理布局:每个 subtask 独立 RocksDB 实例、ColumnFamily 与 KeyGroup 前缀映射、写路径 memtable→WAL→flush→compaction 与 lsm-tree 系列对照、读路径 block cache 与读放大、增量 checkpoint 与全量 snapshot 的 IO 差异。
【流式数据处理】状态放大、Compaction 与调优
在 RocksDB state backend 读写路径之上,拆解窗口 state 膨胀、LSM 写放大与 checkpoint 争抢磁盘、Flink managed memory 与 RocksDBOptionsFactory 调参边界,以及 hot key 导致单 subtask 过热时的诊断与「改 state 设计 vs 拧参数」取舍。
【流式数据处理】背压、故障模式与引擎对照
收束流式数据处理系列:Flink credit-based 背压如何沿算子链传播、Web UI 指标怎么读;数据倾斜、checkpoint 超时连锁、Kafka rebalance 风暴、RocksDB OOM、savepoint 不兼容五类生产故障的诊断与止血;Flink / Kafka Streams / Spark Structured Streaming / RisingWave 在状态模型、交付语义、运维与入湖成熟度上的对照表与选型决策树,不做排名。