土法炼钢兴趣小组的算法知识备份

【流式数据处理】Savepoint 与升级恢复:状态演化与兼容边界

文章导航

分类入口
databasedistributed
标签入口
#flink#savepoint#checkpoint#state-evolution#rescale#upgrade#operator-uid#schema-evolution

目录

第 10 篇 讲了周期性 checkpoint:barrier 对齐、Coordinator 汇总 ack、Kafka offset 写入快照。Checkpoint 解决 运行中容错;生产上还缺一类操作——在人为边界上停作业、改代码、调并行度、升 Flink 小版本,然后从同一份状态继续跑

Savepoint 就是 Flink 为这种场景提供的 用户拥有的、可移植的一致性快照。它与 checkpoint 共用 barrier 与状态序列化管线,但在触发方式、存储格式、生命周期和兼容承诺上 deliberately 不同。

本文回答:

版本锚定:Flink 1.20.x;Savepoint canonicalnative 格式在 1.15+ 并存。Flink 2.0 将移除 LEGACY claim mode(Savepoints 文档)。下文 rescue 命令以 bin/flink CLI 为准。

环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2)、i9-12900K / 32 GB,未安装 Flink 集群。CLI 示例与配置片段来自 Flink 1.20 官方文档,不伪造 savepoint 恢复日志或耗时数据


一、Savepoint 解决什么问题

1.1 三类典型运维动作

动作 为什么 checkpoint 不够 Savepoint 的作用
发版 自动 checkpoint 与旧代码拓扑绑定,cancel 后默认删除 手动触发,保留格式供新 JAR 恢复
** rescale ** 需明确「从哪份已知 good 状态」扩缩容 指定 savepoint 路径 + 新 parallelism
有状态拓扑变更 删/增算子需映射或丢弃旧 state uid + --allowNonRestoredState 或 State Processor API

第 10 篇 的 checkpoint 默认 num-retained=1,面向 失败自动回滚;Savepoint 面向 计划内变更

1.2 Savepoint 的物理结构

Flink Savepoints 文档描述:Savepoint = 稳定存储上的二进制 state 文件 + 较小的元数据文件_metadata)。元数据含指向各 operator state 文件的相对路径;canonical 格式与具体 StateBackend 解耦,native 格式则与 RocksDB 等后端绑定(1.15+)。

目录示例:

/savepoints/savepoint-00012-abc123/
  _metadata
  ...(各 operator 的 state 文件)

Savepoint 通常可整体搬迁(复制目录到新 bucket 再 -s 恢复),但文档提醒:启用 entropy injection 或含 task-owned state(如部分 sink)时可能含绝对路径,搬迁需验证。

1.3 与第 9 篇 state 类型的关系

第 9 篇 的 ValueState / ListState / 窗口 state 都是 Savepoint 里的一行行序列化字节。State 用什么 Serializer(POJO、Avro、Kryo)直接决定后续 schema evolution 是否可行(第五节)。


二、Savepoint vs Checkpoint

2.1 生命周期与所有权

Flink 文档 Checkpoints vs. Savepoints 的核心区分:

维度 Checkpoint Savepoint
触发 JM 按 interval 自动 用户 CLI / REST 手动
所有权 Flink 作业;完成后旧 ckpt 可删 用户;Flink 不自动删
目的 故障恢复 升级、迁移、归档、实验分支
格式 后端相关;可增量(RocksDB) Canonical(默认可跨后端读)或 Native
作业 cancel 默认删除(除非 externalized retain) 保留,除非用户 -d dispose
flowchart TD
  subgraph ckpt [Checkpoint 生命周期]
    C1[interval trigger] --> C2[complete]
    C2 --> C3[保留 num-retained 份]
    C3 --> C4[failover 自动恢复]
    C3 --> C5[cancel 默认删除]
  end
  subgraph sp [Savepoint 生命周期]
    S1[用户 trigger / stop] --> S2[complete]
    S2 --> S3[用户保管路径]
    S3 --> S4[新作业 -s 恢复]
    S3 --> S5[用户 dispose]
  end

下表摘自官方 Checkpoints vs. Savepoints 能力矩阵,✓ 表示支持,x 不支持,! 有限制:

能力 Canonical Savepoint Native Savepoint Aligned Checkpoint Unaligned Checkpoint
更换 StateBackend x x x
State Processor API 写 x x x
State Processor API 读 ! ! x
自包含、可搬迁 x x
Schema evolution ! ! !
任意拓扑升级 x
Flink 小版本升级 x
Flink patch 升级
Rescale 并行度

读表要点:

2.3 何时仍用 Checkpoint 做恢复

2.4 Native vs Canonical Savepoint

Flink 1.15+ 触发时可指定格式:

bin/flink savepoint --type native :jobId hdfs:///savepoints/
bin/flink savepoint --type canonical :jobId hdfs:///savepoints/
格式 优点 缺点
Canonical 跨 StateBackend、State Processor API、schema evolution 最稳 大 state 时创建/恢复较慢
Native RocksDB 场景快,SST 可自包含 不能换 backend;State Processor 写不支持

大状态生产发版:若长期用 RocksDB 且不发版换 backend,native 可缩短窗口;否则 canonical 更省心。


三、Operator UID 与状态映射

3.1 Savepoint 是 Operator ID → State 的 map

Savepoints 文档:Savepoint 可视为每个有 state 的算子一条映射:

Operator ID  |  State
-------------+------------------
source-id    |  Kafka split / offset state
mapper-id    |  Keyed state

无 state 的算子(如 print)不在 savepoint 里。

3.2 必须手动 uid() 的原因

DataStream<String> stream = env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .uid("kafka-source")
    .keyBy(...)
    .process(new MyProcessFunction())
    .uid("my-process")
    .sinkTo(sink);

文档建议:所有算子都显式 uid,因为 Window 等内置算子也有 state,表面「无 state」的节点可能仍有 state。

SQL 作业若无法设 uid,savepoint 升级难度显著上升——DataStream 生产发版应优先。

3.3 拓扑变更规则(FAQ 摘要)

Flink Savepoints FAQ(A 级):

变更 未手动 uid 已手动 uid
新增有 state 算子 新算子空 state 启动
删除有 state 算子 默认失败;加 -n 跳过
重排有 state 算子 几乎必失败 uid 不变则可恢复
增删无 state 算子 可能影响自动 ID 通常不影响
改并行度 支持 rescale

四、Rescale 与 KeyGroup 重分布

4.1 并行度变更如何工作

Keyed state 按 KeyGroup 分片(第 8 篇 keyBy)。Savepoint / checkpoint 里每个 subtask 存自己负责的 KeyGroupRange。

Rescale 时 Flink 读取 savepoint 中 所有 KeyGroup 数据,按 新 parallelism 重新划分 range 并分配给新 subtask。这是 在线 reshuffle state,不是简单复制文件。

# 原 parallelism=2,从 savepoint 以 4 并行启动
bin/flink run -s hdfs:///savepoints/savepoint-xxx -p 4 my-job.jar

4.2 限制与注意

4.3 rescale 与 rebalance 的区别

操作 何时发生 State
Rescale 从 savepoint/checkpoint 启动时指定新 -p 重分布 KeyGroup
Rebalance 运行中 shuffle 策略 不改变 KeyGroup 归属

Savepoint 路径上的 rescale 是 计划内 扩容;运行中改 -p 必须 stop-with-savepoint 再提交新 job。

4.4 算例:KeyGroup 如何重切

maxParallelism=128(KeyGroup 总数固定为 128),原 parallelism=2

从 savepoint 以 parallelism=4 启动时,Flink 按均匀切分规则映射(具体公式见 KeyGroupRangeAssignment 源码):

原 subtask 0 的状态文件会被 拆分读取,属于 KeyGroup 32–63 的条目移交给新 subtask 1。这个过程在 restore 阶段完成,不是运行时 rebalance record。

若新 parallelism 不能整除 KeyGroup 分布,Flink 仍保证每个 KeyGroup 恰好归属一个 subtask,但可能出现 subtask 间 KeyGroup 数量差 1——与 第 8 篇 数据倾斜话题相关:rescaling 不消除 hot key,只改变物理并行度。

4.5 maxParallelism 规划

创建 StreamExecutionEnvironment 时:

env.setMaxParallelism(256);
env.setParallelism(8);

规则:


五、State Schema Evolution

5.1 什么叫 schema evolution

State Schema Evolution 文档:允许在 savepoint 中 修改 state 数据类型(如 POJO 加字段),恢复时 Flink 用 旧 Serializer 读、新 Serializer 写 做一次性迁移。

标准流程:

  1. 触发 savepoint;
  2. 修改代码中 state 类型(兼容变更);
  3. flink run -s ... 恢复;首次访问该 state 时迁移。

5.2 支持的类型

类型 支持 evolution 条件
POJO Flink ≥ 1.8 从 savepoint 恢复;字段可增删,类型不可变
Avro 变更须符合 Avro schema resolution 规则
Kryo x 无法检测兼容性,改类型即 incompatible
嵌套 Kryo x POJO 内 List<SomeType> 若走 Kryo,内层也不能 evolution

POJO 规则摘要:

5.3 与 CDC / 入湖字段变更的对照

业务表加列不等于 Flink state 自动兼容——窗口里存的 MyAggregate POJO 必须按上表演进。Avro state + Schema Registry 是长生命周期聚合作业的常见选型。


六、Cancel、Stop 与 Savepoint 操作

6.1 命令对照

命令 行为 Savepoint
flink cancel :jobId 停止作业 否(除非事先 trigger)
flink stop --savepointPath :jobId 触发 savepoint 并 停止 是(stop-with-savepoint)
flink savepoint :jobId [:targetDir] 作业 继续运行 是(在线 savepoint)
flink savepoint -d :path 删除 savepoint dispose

Stop-with-savepoint(Flink 1.15+ 推荐发版流程):

bin/flink stop --type canonical --savepointPath hdfs:///savepoints/ :jobId

返回 savepoint 路径后,再提交新 JAR:

bin/flink run -s hdfs:///savepoints/savepoint-xxx \
  -c com.example.MyJob my-job.jar

6.2 Detached 触发

大 state 时同步 CLI 可能超时:

bin/flink savepoint :jobId hdfs:///savepoints/ -detached

客户端立即返回 trigger id,通过 REST API 轮询完成状态(Savepoints 文档)。

6.3 --allowNonRestoredState-n

删除有 state 的算子或改 uid 导致无法映射时:

bin/flink run -s hdfs:///savepoints/xxx -n my-job.jar

风险:跳过 unmatched state,该算子从空 state 开始——若它是计数器/去重表,可能重复或丢业务语义。文档强调:默认拓扑序 reassigned uid 可能 错配 state,必须 显式 uid 后再用 -n 跳过 确定废弃 的算子。

6.4 Claim Mode 与文件归属(1.15+)

从 savepoint 恢复时的 claim mode 决定 snapshot 文件谁删:

Mode 行为 适用
NO_CLAIM(默认) Flink 不拥有 snapshot;第一次 successful checkpoint 必须是 full(RocksDB 增量场景) 想保留 savepoint 作备份、或多 job 读同一 snapshot
CLAIM Flink 拥有 snapshot,等同 checkpoint 生命周期 常规发版,旧 savepoint 可被清理
LEGACY 1.15 前行为;2.0 移除 勿在新作业使用
bin/flink run -s hdfs:///savepoints/xxx -claimMode CLAIM my-job.jar

Native RocksDB savepoint 在 CLAIM 下可能因 SST 复用延迟目录删除(Savepoints 文档 Claim mode)。

6.5 Savepoint 存储配置

默认必须配置 savepoint 目录,否则 trigger 失败:

# flink-conf.yaml
execution.checkpointing.savepoint-dir: hdfs:///flink/savepoints

或在代码中:

env.setDefaultSavepointDir("hdfs:///flink/savepoints");

要求与 checkpoint 相同:JobManager 与 TaskManager 均可读写。权限错误表现为 trigger 成功但 subtask 上传 state 失败。

生产建议:

6.6 同一 checkpoint 时间线上的多作业陷阱

Savepoints 文档 Attention:若原 job 在 take savepoint 后 失败回滚到 savepoint 之前的 checkpoint,而另一 job 已从该 savepoint 启动,可能出现 事务 sink 重复提交(非确定性 + 两阶段 commit)。安全做法:stop 原 job 后再从 savepoint 启新 job;或改 transactional sink 的 uid 丢弃旧事务 state。


七、升级恢复与故障排查

7.1 版本升级检查清单

  1. 阅读 Release NotesState Compatibility / Savepoint 章节;
  2. 发版前对生产 job trigger canonical savepoint(或 stop-with-savepoint);
  3. 预发 用同路径 -s 恢复,跑 smoke + 对账;
  4. 确认 Flink 库版本(connector、state backend)与集群一致;
  5. 若从 checkpoint 而非 savepoint 升 小版本:文档矩阵允许 aligned checkpoint,但生产仍推荐 savepoint;
  6. 恢复后盯 第一次 checkpoint:NO_CLAIM 下 RocksDB 须 full checkpoint 成功后再删旧 savepoint。

7.2 常见异常与含义

异常 / 日志 常见原因 处理方向
NonRestoredStateException 删算子、uid 变更、无 -n 显式 uid;或 -n 跳过;或 State Processor API 删 operator
Key serializer 不兼容 keyBy 字段、SQL 改 group key 不能 -n 跳过 key 不兼容;需新 uid 新 state 或 API 删 state
StateMigrationException POJO/Avro 不兼容改型 回滚代码;或换 state 名(新 operator)丢弃旧 state
Savepoint 不存在 / 权限 路径错、S3 凭证 TM/JM 都能读 hdfs://s3://
恢复极慢 canonical + 大 state 下次用 native;或 RocksDB incremental(第 12 篇)

Jira FLINK-38558(Flink 1.20):state.backend.rocksdb.timer-service.factory=HEAP 时,从 checkpoint rescale 可能抛 KeyGroupRange does not contain key group N从 savepoint 恢复 正常(savepoint 不序列化 heap timer 到 RawKeyedState,FLINK-21344)。

工程建议:rescaling 发版路径用 stop-with-savepoint;或把 timer 放到 RocksDB(ROCKSDB factory);升级前在预发验证 rescale。

7.4 Key Serializer 不兼容:没有银弹

邮件列表与 Jira 讨论(FLINK-20375 等):execution.savepoint.ignore-unclaimed-state 不能单独解决 key serializer 变更——跳过的只是 unclaimed operator state,key 类型变了仍无法 deserialize。

可选硬路径:

  1. State Processor APISavepointWriter.fromExistingSavepoint(...).removeOperator(uid)... 去掉问题算子 state,再写新 savepoint;
  2. 新 uid + -n:旧 state 丢弃,该算子重算(需业务可接受);
  3. 不从 savepoint 恢复:重新消费 Kafka(改 group 或 timestamp)并重建 state。

Flink SQL 改 GROUP BY 键触发的 key serializer 冲突,往往只能 新作业 id + 新 state 或 API 裁剪。

7.5 诊断命令与 REST

# 列出 savepoint 元数据(路径指向 _metadata 或目录)
bin/flink savepoint -d hdfs:///savepoints/savepoint-xxx  # 确认存在再删

# 作业恢复时看 Web UI → Checkpoints → History → Savepoint restore

REST:GET /jobs/:jobid/savepoints 查看 trigger 状态(detached 模式)。

日志关键词:Cannot map old state for operatorSerializerSchemaIncompatibleUnhandled key group

7.6 故障排查流程

flowchart TD
  E[恢复失败] --> Q1{NonRestoredState?}
  Q1 -->|是| Q2{删了算子?}
  Q2 -->|是| A1["-n 或 API 删 operator"]
  Q2 -->|否| Q3{改过 uid?}
  Q3 -->|是| A2[改回 uid 或 -n]
  Q3 -->|否| A3[查日志 unmatched operator]
  Q1 -->|否| Q4{Serializer / Migration?}
  Q4 -->|是| A4[POJO/Avro 兼容? Kryo 则新 state]
  Q4 -->|否| Q5{KeyGroup 错误?}
  Q5 -->|是| A5[FLINK-38558: 换 savepoint 或 timer 配置]
  Q5 -->|否| A6[路径/权限/版本]

7.7 发版 Runbook 模板(可复制到内部 wiki)

  1. 冻结配置变更;确认当前 checkpoint 成功率 100%(最近 1 小时)。
  2. flink stop --type canonical --savepointPath ... :jobId;记录返回路径与触发时间。
  3. 验证 savepoint 目录大小与 _metadata 存在;抽样备份到冷存储(可选)。
  4. 部署新 JAR / 新 Flink 版本;预发 -s 恢复 + -claimMode CLAIM(或 NO_CLAIM 若需保留原 snapshot)。
  5. 对账:Kafka lag、业务指标、sink 侧无重复(第 15 篇 EOS 验证)。
  6. 第一次 checkpoint 成功后,dispose 旧 savepoint(-d)或归档。
  7. 回滚预案:保留上一版 JAR + 上一 savepoint 路径至少 24h。

八、State Processor API(边界介绍)

State Processor API 允许 读/改/写 savepoint,而不启动完整流作业——例如:从 savepoint 删除废弃算子、注入初始 state、审计 KeyGroup 分布。

典型裁剪流程(概念示例,类名以 Flink 1.20 文档为准):

SavepointWriter
    .newSavepoint(env, maxParallelism, new HashMapStateBackend())
    .withOperator(OperatorIdentifier.forUid("deprecated-op"), input -> { /* drop */ })
    .write(savepointPath);

本篇不展开 API 全书;遇到 单算子 state Poison-n 不够用时,再查官方 State Processor API 专章。


九、与 Checkpoint / RocksDB 的衔接

flowchart LR
  RUN[运行中] -->|failover| CKPT[最近 Checkpoint]
  RUN -->|发版| SP[Savepoint]
  SP --> NEW[新 JAR / 新 parallelism]
  CKPT --> RUN
  NEW --> RUN
  SP --> RB[RocksDB 读 SST<br/>第 12 篇]

十、边界

本文不展开:


参考资料

  1. Apache Flink Documentation 1.20, Savepoints(trigger、stop、restore、claim mode、native/canonical)。A 级。
  2. Apache Flink Documentation 1.20, Checkpoints vs. Savepoints(能力矩阵)。A 级。
  3. Apache Flink Documentation 1.20, State Schema Evolution(POJO/Avro/Kryo)。A 级。
  4. Apache Flink Documentation 1.20, State Processor API。A 级。
  5. Apache Flink Documentation 1.20, Production Ready — 为算子设置 UID。A 级。
  6. ASF Jira FLINK-38558(RocksDB heap timer + checkpoint rescale)。A 级(issue)。
  7. ASF Jira FLINK-21344(savepoint 与 heap timer 序列化)。A 级(issue)。
  8. 本系列:第 9 篇 键控状态第 10 篇 Checkpoint第 12 篇 RocksDB State Backend

返回 系列目录 · 上一篇 Checkpoint 机制 · 下一篇 RocksDB State Backend 内核路径

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】背压、故障模式与引擎对照

收束流式数据处理系列:Flink credit-based 背压如何沿算子链传播、Web UI 指标怎么读;数据倾斜、checkpoint 超时连锁、Kafka rebalance 风暴、RocksDB OOM、savepoint 不兼容五类生产故障的诊断与止血;Flink / Kafka Streams / Spark Structured Streaming / RisingWave 在状态模型、交付语义、运维与入湖成熟度上的对照表与选型决策树,不做排名。

2026-07-01 · database / distributed

【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照

从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。

2026-07-01 · database / distributed

【流式数据处理】RocksDB State Backend 内核路径

拆解 Flink EmbeddedRocksDBStateBackend 的物理布局:每个 subtask 独立 RocksDB 实例、ColumnFamily 与 KeyGroup 前缀映射、写路径 memtable→WAL→flush→compaction 与 lsm-tree 系列对照、读路径 block cache 与读放大、增量 checkpoint 与全量 snapshot 的 IO 差异。

2026-07-01 · database / distributed

【流式数据处理】状态放大、Compaction 与调优

在 RocksDB state backend 读写路径之上,拆解窗口 state 膨胀、LSM 写放大与 checkpoint 争抢磁盘、Flink managed memory 与 RocksDBOptionsFactory 调参边界,以及 hot key 导致单 subtask 过热时的诊断与「改 state 设计 vs 拧参数」取舍。


By .