土法炼钢兴趣小组的算法知识备份

【流式数据处理】键控状态与 State TTL

文章导航

分类入口
databasedistributed
标签入口
#flink#keyed-state#value-state#list-state#map-state#rocksdb#hashmap-state-backend#state-ttl#max-parallelism#state-size

目录

第 8 篇 说明 Keyed State 绑定在 keyBy 之后的 (operator, key) 上,KeyGroup 个数等于 maxParallelism。生产故障清单里 「状态膨胀」「TM OOM」「checkpoint 越来越大」 几乎都落在这一层:窗口未清理、MapState 无限增长、TTL 未配或清理策略不对、HashMap 后端撑爆堆内存。

本文回答五个运维与开发会直接碰到的问题:

Checkpoint 如何把 Keyed State 快照进持久存储、与 Kafka offset 对齐,见 第 10 篇。RocksDB 增量 checkpoint 与 compaction 调优见 第 12、13 篇。窗口 Trigger 与 Evictor 对 state 生命周期的影响见 第 3 篇

环境说明:本机 WSL2,未安装 Flink / RocksDB。状态大小估算给出 公式与心算方法,不伪造 TM 磁盘曲线;与 lakehouse 第 6 章 一样,性能数字只在有实验处出现

版本锚定:Flink 1.20+ / 2.x。1.13 起 MemoryStateBackend / FsStateBackend / RocksDBStateBackend 分别演进为 HashMapStateBackendEmbeddedRocksDBStateBackend(checkpoint 仍写分布式文件系统或对象存储)。Flink 2.x 提供 State V2 API(Working with State V2),TTL 配置对象仍为 StateTtlConfig,语义与 V1 一致。


一、Keyed State 在 subtask 内的布局

每个 keyed 算子 subtask 维护一个逻辑上的 键值存储:当前 event 的 key 决定读写哪个命名空间(来源:Flink Documentation,Stateful Stream Processing)。

flowchart TB
  subgraph ST["subtask 3 (parallelism=8)"]
    NS1["namespace: key=A<br/>ValueState / MapState / ..."]
    NS2["namespace: key=B"]
    NS3["namespace: key=C"]
  end
  IN["输入 record (key=B)"] --> NS2

KeyGroup 与 subtask:subtask 3 可能负责 KeyGroup \(\{7, 15, 23, \ldots\}\) 内的所有 key(具体集合由 maxParallelism 与 parallelism 决定,第 8 篇)。rescaling 时 KeyGroup 在 subtask 间迁移,state 随 KeyGroup 走(第 11 篇)。

窗口 state 在命名空间上再嵌套 windowId(窗口起止时间戳等),因此 state 条目数 \(\approx\) 活跃 key 数 × 每 key 窗口数——这是状态膨胀的第一来源。


二、五种 Keyed State 语义

以下类型均在 RichFunction / KeyedProcessFunction / 窗口函数 中通过 StateDescriptor 注册(来源:Flink Documentation,Working with State)。

2.1 ValueState<T>

语义:每个 key 一个可变单值,类型 \(T\)

ValueStateDescriptor<Long> desc =
    new ValueStateDescriptor<>("count", Long.class);
ValueState<Long> count = getRuntimeContext().getState(desc);

public void processElement(Event e, Context ctx, Collector<Out> out) throws Exception {
    Long c = count.value();
    if (c == null) c = 0L;
    count.update(c + 1);
}
适用 不适用
计数、最后见到的时间戳、单值缓存 需保留全量历史事件列表

ReducingState 相比,ValueState 无内置聚合函数,每次完全替换 update(value)

2.2 ListState<T>

语义:每个 key 一个 有序列表,可 add / get / update 全列表。

ListStateDescriptor<Event> bufDesc =
    new ListStateDescriptor<>("buffer", Event.class);
ListState<Event> buffer = getRuntimeContext().getListState(bufDesc);

buffer.add(event);
Iterable<Event> all = buffer.get();
适用 风险
会话缓冲、待排序小批次 列表长度无界 → state 爆炸

ListState 元素在 TTL 下独立过期(见第六节):每个元素可有独立过期时间(集合型 state 的 per-entry TTL)。

2.3 MapState<UK, UV>

语义:每个 key 对应一个 嵌套 Map(用户键 UK → 用户值 UV)。

MapStateDescriptor<String, Long> mapDesc =
    new MapStateDescriptor<>("user-counts", String.class, Long.class);
MapState<String, Long> map = getRuntimeContext().getMapState(mapDesc);

map.put(subKey, 1L);
Long v = map.get(subKey);
适用 风险
多字段动态维、CEP 偏复杂模式 UK Cardinality 无限 时磁盘线性涨

MapState 的 null 值 依赖序列化器是否支持 null;官方文档注明必要时用 NullableSerializer(多占一字节)。

2.4 ReducingState<T>

语义:每个 key 一个 可结合(associative)+ 可交换(commutative) 的聚合值;新元素通过 ReduceFunction 合并。

ReducingStateDescriptor<Long> sumDesc = new ReducingStateDescriptor<>(
    "sum", (a, b) -> a + b, Long.class);
ReducingState<Long> sum = getRuntimeContext().getReducingState(sumDesc);

sum.add(1L);
Long total = sum.get();

要求ReduceFunction 必须满足结合律,否则 checkpoint 恢复后重放顺序不同会导致 结果错误

对比 ValueState ReducingState 省什么
手动 read-modify-write 内置增量合并,窗口增量聚合常用

2.5 AggregatingState<IN, OUT>

语义:通过 AggregateFunction(create / add / merge / getResult)维护聚合结果;输入类型 IN 与输出类型 OUT 可不同。

AggregatingStateDescriptor<Event, Acc, Result> aggDesc =
    new AggregatingStateDescriptor<>("agg", new MyAggregateFunction(), Result.class);
AggregatingState<Event, Result> agg = getRuntimeContext().getAggregatingState(aggDesc);
适用 说明
窗口 AggregateFunction 后端 merge 用于 session 合并

窗口 APIAggregateFunction + ProcessWindowFunction 组合:AggregatingState 存 Acc Accumulator,fire 时 getResult 输出。

2.6 选型对照表

类型 每 key 结构 典型场景
ValueState 单值 标志位、最后事件时间
ListState 列表 小缓冲、会话事件集(需 TTL/限长)
MapState 嵌套 map 动态二级键
ReducingState 单值聚合 sum/min/max(可交换结合)
AggregatingState 自定义累加器 复杂聚合、窗口增量

三、StateBackend:HashMap 与 EmbeddedRocksDB

3.1 抽象层次

StateBackend 决定 Keyed State 与 Operator State 存在哪、checkpoint 时如何快照(来源:Flink Documentation,State Backends)。

flowchart LR
  OP["Keyed 算子"]
  SB["StateBackend"]
  HM["HashMapStateBackend<br/>JVM 堆内"]
  RDB["EmbeddedRocksDBStateBackend<br/>本地磁盘 LSM"]
  CP["Checkpoint 存储<br/>DFS / S3"]
  OP --> SB
  SB --> HM
  SB --> RDB
  HM --> CP
  RDB --> CP

CheckpointStorage 与 StateBackend 正交:即使 HashMap 堆内,checkpoint 也可写到 S3;RocksDB 则 平时在本地盘,checkpoint 上传 SST 或快照第 12 篇)。

3.2 HashMapStateBackend

配置(1.20+):

env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://bucket/flink-checkpoints");

3.3 EmbeddedRocksDBStateBackend

env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // 参数:是否启用增量 checkpoint

3.4 选型决策表

条件 倾向
总 keyed state < 数百 MB 且稳定 HashMap
state 达 GB~TB RocksDB
长窗口 + 高 key 基数 RocksDB + TTL + 窗口长度治理
低延迟、短 state、可接受重启从 Kafka 重放 HashMap
lakehouse 入湖 同集群,checkpoint 已写 S3 两者皆可,看 state 规模

不存在「RocksDB 总是更快」:小 state 下 HashMap 往往更省 CPU;RocksDB 的价值在 容量边界增量 checkpoint,不是微基准上的 μs 优势(不做未实测的延迟对比)。

3.5 版本与配置名对照

旧名(≤1.12 常见) 1.13+ 名
MemoryStateBackend HashMapStateBackend
FsStateBackend HashMapStateBackend + 远程 checkpoint
RocksDBStateBackend EmbeddedRocksDBStateBackend

flink-conf.yamlstate.backend: hashmap / rocksdb 仍可用,映射到上述实现类。


四、Operator State 与 Broadcast State(复习)

Keyed State 占生产故障大多数,但 Operator State 与 EOS 相关(第 8 篇):

类型 结构 场景
ListState 每 subtask 一个 list Kafka split / offset
UnionListState rescale 时并集 旧 API 兼容
BroadcastState 每 subtask 完整 map 规则流

Broadcast State 不适合大维表:内存 \(\propto\) parallelism × 维表大小


五、State TTL 配置语义

State TTLKeyed State 任意类型 提供 最佳努力(best-effort) 过期清理(来源:Flink Documentation,State TTL;API StateTtlConfig)。

5.1 基本用法

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofDays(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupIncrementally(10, true)
    .cleanupInRocksdbCompactFilter(1000L)
    .build();

ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("with-ttl", String.class);
desc.enableTimeToLive(ttlConfig);

5.2 时间基准

选项 含义
Processing time(默认) TTL 相对系统时钟
Event time 部分版本/扩展支持,需查当前 release 文档

官方文档明确:TTL 目前以 processing time 为基准Working with State V2 同述)。事件时间业务过期应结合 窗口 allowed lateness + timer 清理第 2、3 篇),不能假设「事件时间 TTL」在所有版本默认可用。

5.3 UpdateType:何时刷新过期时钟

行为
OnCreateAndWrite(默认) 创建与 时刷新 TTL
OnReadAndWrite 也刷新(滑动会话)

OnReadAndWrite 适合「活跃用户状态常驻」;冷 key 仍会在 TTL 后过期

5.4 StateVisibility:过期但未清理的值能否被读到

行为
NeverReturnExpired(默认) 逻辑上视为不存在
ReturnExpiredIfNotCleanedUp 物理未删前仍可读

隐私 / 合规场景用 NeverReturnExpired;调试物理清理延迟可用后者。

5.5 集合类型的 per-entry TTL

ListState、MapState每个元素 / 每个 map entry 独立 TTL。窗口 state 中 每个窗口 namespace 也可独立过期——与 第 3 篇窗口生命周期 叠加时要避免 双计数


六、TTL 清理策略与 checkpoint 交互

TTL 不保证立即物理删除;清理是 best-effort(来源:官方 State TTL 专文)。

flowchart TB
  EXP["状态逻辑过期"]
  R1["读路径: NeverReturnExpired 隐藏"]
  R2["增量清理 incremental cleanup"]
  R3["全量 snapshot 时 cleanupFullSnapshot"]
  R4["RocksDB compaction filter"]
  EXP --> R1
  EXP --> R2
  EXP --> R3
  EXP --> R4
策略 触发点 适用 backend
incremental cleanup 访问 state 时顺带删过期 HashMap / RocksDB
cleanupFullSnapshot 做全量 checkpoint snapshot 时 HashMap 为主
RocksdbCompactFilter RocksDB compaction 线程 RocksDB

配置示例:

StateTtlConfig.newBuilder(Duration.ofHours(6))
    .cleanupIncrementally(1000, true)   // 每访问 1000 条尝试清理
    .cleanupInRocksdbCompactFilter(10000L) // compaction filter 检查间隔
    .build();

6.1 与 checkpoint 的关系

6.2 常见误配

误配 后果
只设 TTL 不开 RocksDB compact filter 磁盘长期膨胀(第 13 篇
窗口很长 + OnReadAndWrite + 高 QPS 热点 key 永不过期
以为 TTL 替代窗口 allowed lateness 事件时间语义仍乱序

七、状态大小估算

不做 benchmark 也可以 数量级估算,用于并行度与 backend 初选。

7.1 单条 state 条目成本

记:

Keyed 条目数(量级):

\[ N_{\text{entries}} \approx K \times W \times N_{\text{state-types}} \]

\(N_{\text{state-types}}\) 为同一算子上注册的 state descriptor 个数(如 ValueState + MapState 算两种)。

7.2 窗口类型与 \(W\)

窗口 \(W\) 量级(直觉)
滚动 1h,事件时间 每 key 通常 1( watermark 推进后关闭)
滑动 1h / 步 5m 每 key 约 12 个重叠窗口
会话 gap 30m 取决于事件密度,无固定上界 → 必须 TTL 或 gap 上限

滑动窗口是 状态膨胀经典来源\(W \approx \lceil \text{windowSize} / \text{slide} \rceil\)

7.3 磁盘占用(RocksDB)

RocksDB live data 小于 含 tombstone 与未 compaction 的磁盘

\[ \text{disk} \approx N_{\text{entries}} \times S \times f_{\text{write-amp}} \]

写放大 \(f_{\text{write-amp}}\) 经验上 1.5~3+(与 compaction 策略、更新频率有关,lsm-tree 系列)。只估算 live data 会低估磁盘

HashMap 堆占用 近似 \(N_{\text{entries}} \times S\) 加 Java 对象头;无 write-amp,但 全量 checkpoint 体积接近 live data 序列化大小。

7.4 心算例子(无实测数字)

假设:

\[ N_{\text{entries}} \approx 10^6 \times 12 = 1.2 \times 10^7,\quad \text{live} \approx 1.2 \times 10^7 \times 64\,\text{B} \approx 768\,\text{MB} \]

RocksDB 磁盘可能 >1 GB;HashMap 需 堆 >768 MB 仅 state,加 JVM 开销与峰值 checkpoint 易 OOM → 应选 RocksDB 并评估 TTL / 窗口改滚动 / 预聚合

作业跑起来后(需真实集群,非本环境):

Metric 用途
rocksdb.estimate-live-data-size subtask 级 live data
rocksdb.total-sst-files-size SST 总大小
lastCheckpointSize checkpoint 体积趋势
numRecordsInPerSecond vs state 增长 验证 \(K\) 是否在涨

第 13 篇live-sst-files 随时间曲线 的实验方法;本篇只给 估算式与 metrics 名


八、窗口 state 与 Kafka 消费位点的内存分工

lakehouse 第 19 章Writer 缓冲 在 Sink 算子,与 Keyed State 分开;入湖作业 OOM 要分 算子 state 还是 批量缓冲


九、代码模式:带 TTL 的 MapState 去重

public class DedupFunction extends KeyedProcessFunction<String, Event, Event> {

    private MapState<String, Boolean> seen;

    @Override
    public void open(OpenContext ctx) {
        StateTtlConfig ttl = StateTtlConfig
            .newBuilder(Duration.ofDays(7))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(5000L)
            .build();

        MapStateDescriptor<String, Boolean> desc =
            new MapStateDescriptor<>("seen-ids", String.class, Boolean.class);
        desc.enableTimeToLive(ttl);
        seen = getRuntimeContext().getMapState(desc);
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
        if (seen.contains(e.getId())) return;
        seen.put(e.getId(), true);
        out.collect(e);
    }
}

Cardinality 等于 7 天内独立 id 数;估算磁盘用 \(S \approx\) id 字符串 + 布尔 + 开销。TTL 7 天是 processing time;若 id 空间无限,仍需 BloomFilter 近似外部 store(超出 Flink state 边界)。


项目 V1(本篇示例) V2
包路径 org.apache.flink.api.common.state.* org.apache.flink.api.common.state.v2.*
TTL StateTtlConfig + enableTimeToLive 同配置对象
访问 getRuntimeContext().getState State V2 接口

Backend、KeyGroup、TTL 清理策略对 V1/V2 共用。新项目若选 V2,以官方迁移指南为准;机制层仍读本篇。


十一、与 checkpoint / savepoint 的衔接预告


十二、术语表

术语 含义
Keyed State 绑定 (operator, key) 的分区状态
StateDescriptor 注册 state 名称、类型与 TTL
StateTtlConfig TTL 时长、更新类型、可见性、清理策略
HashMapStateBackend 堆内 state,全量 checkpoint
EmbeddedRocksDBStateBackend 本地 LSM state,支持增量 checkpoint
KeyGroup state rescale 的原子单位
write amplification LSM 层重复写入导致的磁盘放大
best-effort cleanup TTL 物理删除延迟,逻辑可见性可配置

十三、小结

五种 Keyed State 覆盖从单值到嵌套 map 的聚合模式;选型看 更新模式是否可结合归约HashMap 适合小 state 低延迟;EmbeddedRocksDB 适合 GB 级以上与增量 checkpoint。State TTL 用 processing time 计时,NeverReturnExpired 保证读语义,物理清理依赖 incremental / snapshot / compaction 三条路径 best-effort 组合状态大小\(K \times W \times S\) 估算,滑动窗口与无限 MapState 是两大膨胀源;应用 metrics 校准后决定 window 改型、TTL 或 backend 切换。

下一篇进入 Checkpoint 机制:barrier 对齐、Kafka offset 写入 snapshot、aligned vs unaligned 与超时调优。


参考资料

  1. Apache Flink Documentation, Working with State / Working with State V2(五类 Keyed State、Operator State)。
  2. Apache Flink Documentation, State Backends(HashMapStateBackend、EmbeddedRocksDBStateBackend)。
  3. Apache Flink Documentation, State TTL(UpdateType、StateVisibility、cleanup 策略)。
  4. Apache Flink Documentation, Stateful Stream Processing(KeyGroup 与 keyed state 分片)。
  5. Apache Flink API, StateTtlConfig(Flink 2.2-SNAPSHOT Javadoc)。
  6. Apache Flink 博客, State TTL in Flink 1.8+(RocksDB compact filter 背景,B 级辅助)。
  7. 本系列 第 3 篇(窗口与 \(W\))。
  8. 本系列 第 8 篇(keyBy、KeyGroup)。
  9. 本系列 第 12、13 篇(RocksDB 路径与调优)。
  10. lsm-tree 系列(LSM 写放大直觉)。

返回 系列目录 | 上一篇:DataStream 与算子语义 | 下一篇:Checkpoint 机制

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】DataStream 与算子语义

拆解 Source/Transform/Sink 数据流图、rebalance/keyBy/broadcast 等 shuffle 策略、keyBy 到 KeyGroup 的映射,以及 ProcessFunction 与 TimerService 如何承载事件时间逻辑,并引入算子状态与键控状态的分工边界。

2026-07-01 · database / distributed

【流式数据处理】RocksDB State Backend 内核路径

拆解 Flink EmbeddedRocksDBStateBackend 的物理布局:每个 subtask 独立 RocksDB 实例、ColumnFamily 与 KeyGroup 前缀映射、写路径 memtable→WAL→flush→compaction 与 lsm-tree 系列对照、读路径 block cache 与读放大、增量 checkpoint 与全量 snapshot 的 IO 差异。

2026-07-01 · database / distributed

【流式数据处理】状态放大、Compaction 与调优

在 RocksDB state backend 读写路径之上,拆解窗口 state 膨胀、LSM 写放大与 checkpoint 争抢磁盘、Flink managed memory 与 RocksDBOptionsFactory 调参边界,以及 hot key 导致单 subtask 过热时的诊断与「改 state 设计 vs 拧参数」取舍。

2026-07-01 · database / distributed

【流式数据处理】背压、故障模式与引擎对照

收束流式数据处理系列:Flink credit-based 背压如何沿算子链传播、Web UI 指标怎么读;数据倾斜、checkpoint 超时连锁、Kafka rebalance 风暴、RocksDB OOM、savepoint 不兼容五类生产故障的诊断与止血;Flink / Kafka Streams / Spark Structured Streaming / RisingWave 在状态模型、交付语义、运维与入湖成熟度上的对照表与选型决策树,不做排名。


By .