土法炼钢兴趣小组的算法知识备份

【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照

文章导航

分类入口
databasedistributed
标签入口
#flink#checkpoint#chandy-lamport#barrier#kafka#exactly-once#backpressure#fault-tolerance

目录

第 9 篇 把键控状态、StateBackend 选型和 TTL 讲清楚了:窗口聚合、去重计数、CDC 维表 join 的结果都驻留在算子 state 里。作业一旦失败,光有 Kafka 的 committed offset 不够——下游算子已经按乱序事件更新了 state,简单「从上次 offset 重读」会重复计算或漏算。

Checkpoint 要解决的问题是:在同一逻辑时间点,把「source 读到了哪」和「每个算子的 state 长什么样」绑成一张可恢复的一致性快照。这张快照是 Flink 容错、exactly-once 语义、以及 lakehouse 第 19 章 里「checkpoint 完成才提交入湖」的引擎侧根基。

本文拆解:

版本锚定:Flink 1.20.x 文档与 API;Kafka 3.x connector 使用新 Source API(KafkaSource)。Flink 2.x 在 checkpoint 存储与配置键上可能有迁移,文中标注版本边界处需对照发行说明。

环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2)、i9-12900K / 32 GB,未安装 JVM、Flink 集群或 Docker 中的 Flink 镜像。第六节实验给出可复现步骤与应观测的 REST metrics 名称,不粘贴未执行的吞吐/延迟数字。机制描述来自 Flink 官方文档与公开设计材料(A 级)。


一、为什么需要 Checkpoint

1.1 有状态流作业的三份「进度」

一个典型的 Kafka → Flink 窗口聚合 → Sink 管道,至少有三处独立进度:

进度 存哪 失败时丢什么
Kafka 消费位点 broker 上的 __consumer_offsets(可选)+ Flink checkpoint 不知道从哪条消息重放
算子 keyed state TaskManager 内存或 RocksDB 窗口半开、计数器归零
Sink 外部副作用 目标系统(Kafka topic、Iceberg 表、JDBC) 重复写或漏写

第 5 篇 讲过:Flink 不依赖 Kafka broker 上 committed offset 做容错,committed offset 主要给监控和外部 consumer 看。真正用于恢复的是 Flink checkpoint 里持久化的 source state(含各 partition 的 offset)。

1.2 与 Savepoint 的分工

Checkpoint 默认由 Flink 周期性触发,成功后旧 checkpoint 可被清理;Savepoint 由用户手动触发,格式更偏「可移植、可升级」,详见 第 11 篇。本篇只讲自动 checkpoint 路径;Savepoint 复用同一套 barrier 注入与状态快照机制,但生命周期与存储策略不同。

1.3 读完本篇应能回答的问题


二、Chandy-Lamport 快照与 Barrier

2.1 分布式快照要满足的「一致性切点」

Chandy-Lamport 算法(Chandy & Lamport, 1985)解决的是:在分布式系统里,不停止计算的情况下,各进程各自拍局部快照,拼起来仍对应某个全局一致状态。

Flink 的变体(Flink 官方博客 From Aligned to Unaligned Checkpoints;Streaming Fault Tolerance 文档)用 checkpoint barrier 代替 Chandy-Lamport 的 marker record:

一致性切点的含义:快照时刻之前到达算子的 record 计入 checkpoint \(c\) 对应的状态;快照时刻之后到达的不计入(exactly-once 模式下通过 barrier 对齐保证;at-least-once 模式语义见 第 14 篇)。

2.2 单输入算子:barrier 即切分线

sequenceDiagram
  participant S as Source subtask
  participant O as 单输入算子
  participant D as 下游
  Note over S: JM 触发 checkpoint N
  S->>O: records ... r_k
  S->>O: barrier B_N
  O->>O: 同步阶段:snapshot state_N
  O->>D: 转发 B_N
  S->>O: records r_{k+1} ...(属于 ckpt N+1)

Source 在注入 \(B_N\) 之前发出的 record 属于 checkpoint \(N\)\(B_N\) 之后发出的属于下一个 checkpoint。算子在处理完 \(B_N\) 之前的所有 record 后做快照。

2.3 多输入算子:对齐(Alignment)

Join、union、co-process 等多输入算子必须等 所有输入通道 都收到同一 checkpoint 的 barrier,才能快照并转发。这段等待叫 alignment phase(对齐阶段):

flowchart TD
  IN1[输入通道 1] -->|已收到 B_N| ALIGN[对齐阶段]
  IN2[输入通道 2] -->|尚未收到 B_N| BUF[该通道暂停消费]
  ALIGN --> SNAP[快照 state]
  SNAP --> FWD[转发 B_N 到所有输出]

对齐期间,已收到 barrier 的通道暂停读数据,未收到 barrier 的通道继续读——直到全部到齐。这保证了多输入算子不会在「一边已过切点、一边还未过切点」的中间态做快照。

背压下的问题:若下游慢导致上游 channel 堆满,barrier 被堵在队列末尾,对齐阶段可能极长——checkpoint duration 飙升,甚至超过 checkpoint.timeout 被丢弃。这正是 unaligned checkpoint 要解决的场景(第三节)。

2.4 At-least-once 与 Exactly-once 在 barrier 上的差异

模式 Barrier 对齐 恢复行为
EXACTLY_ONCE 必须对齐(或使用 unaligned 的等价语义) 每条 record 对 state 的更新恰好一次
AT_LEAST_ONCE 不对齐也可 可能重复处理 barrier 之后的 record,state 可能重复更新

生产环境默认 EXACTLY_ONCE;极低延迟且可接受重复的场景才考虑 AT_LEAST_ONCE(Flink Checkpointing 文档)。


三、Aligned 与 Unaligned Checkpoint

3.1 Aligned checkpoint(默认行为)

Aligned 模式下,算子收到第一个 barrier 后进入对齐,阻塞已收到 barrier 的输入通道,直到所有输入通道都收到 barrier,再快照并转发。

优点:

缺点:

Web UI / REST metrics 里可关注:

3.2 Unaligned checkpoint

Flink 1.11 引入(FLIP-76)。Unaligned 模式下,算子收到 第一个 barrier 后 立即 快照 operator state,并把 该时刻输入/输出 buffer 里尚未被 barrier 划分的 in-flight 数据 一并写入 checkpoint state(称为 channel state / in-flight data)。

flowchart LR
  subgraph aligned [Aligned]
    A1[等所有输入 barrier 到齐] --> A2[快照算子 state]
  end
  subgraph unaligned [Unaligned]
    U1[第一个 barrier 到达] --> U2[快照 state + in-flight buffers]
  end

效果:checkpoint 时长与当前吞吐、背压程度 解耦——barrier 不再被堵在 buffer 队列末尾等对齐。

代价与限制(Flink 1.20 文档):

启用方式:

env.getCheckpointConfig().enableUnalignedCheckpoints();

flink-conf.yaml

execution.checkpointing.unaligned: true

Flink 1.20+ 还支持 aligned-checkpoint-timeout:checkpoint 先按 aligned 启动;若全局对齐超过阈值仍未完成,自动切换为 unaligned 继续(execution.checkpointing.aligned-checkpoint-timeout)。

3.3 选型对照

场景 建议
无背压或 checkpoint 耗时正常 默认 aligned
背压导致 alignment 极长、checkpoint 频繁超时 尝试 unaligned 或 aligned-checkpoint-timeout
状态很大、S3/HDFS 写带宽已是瓶颈 先优化 state / 增量 checkpoint(第 12 篇),再考虑 unaligned
需要 max-concurrent-checkpoints > 1 不能用 unaligned

四、CheckpointCoordinator 生命周期

4.1 角色分工

组件 职责
JobManager CheckpointCoordinator 触发 checkpoint、跟踪各 subtask ack、汇总完成或失败
Source 算子 / SourceReader 接收 trigger,注入 barrier,快照 split/offset state
中间算子 对齐(若 aligned)、同步 snapshotState、异步上传 state handle
StateBackend 把 keyed/operator state 写到 checkpoint 存储(内存、文件系统、RocksDB 增量等)
CheckpointStorage 持久化 checkpoint 元数据与 state 文件路径

CheckpointCoordinator 源码位于 apache/flinkorg.apache.flink.runtime.checkpoint.CheckpointCoordinator(A 级:源码 + 文档 Streaming Fault Tolerance)。

4.2 一次 checkpoint 的时序

sequenceDiagram
  participant JM as JobManager<br/>CheckpointCoordinator
  participant TM1 as TaskManager<br/>Source
  participant TM2 as TaskManager<br/>Window
  participant FS as Checkpoint Storage
  JM->>TM1: trigger checkpoint ID=N
  TM1->>TM1: 注入 barrier B_N,snapshot source state
  TM1->>TM2: 数据流 + B_N
  TM2->>TM2: 对齐,snapshot window state
  TM1-->>JM: ack subtask state (async)
  TM2-->>JM: ack subtask state (async)
  JM->>JM: 所有 required tasks ack?
  JM->>FS: 持久化 _metadata
  JM->>JM: checkpoint N COMPLETED
  Note over JM,TM2: 通知 source/sink:<br/>notifyCheckpointComplete(N)

阶段拆解:

  1. Trigger:按 execution.checkpointing.interval 或手动 API 发起;若已有 in-flight checkpoint 且未达 max-concurrent-checkpoints,可能排队或跳过(取决于 min-pause)。
  2. Barrier 注入:Source 在 main thread 的 checkpoint 同步阶段注入 barrier 并 snapshot;下游收到 barrier 后进入对齐与 snapshot。
  3. Ack:每个 subtask 把 StateHandle 上报给 JM;异步上传在 ack 之前或并行进行,取决于 state backend。
  4. Complete:所有 required operator 都 ack 后,JM 写入 checkpoint 元数据,状态变为 COMPLETED
  5. NotifynotifyCheckpointComplete(N) 回调到 source(提交 Kafka offset 可选)、TwoPhaseCommitSink(提交事务)等——这是端到端 exactly-once 的关键钩子(第 15 篇)。

4.3 失败、超时与 subsumed

结果 含义 典型处理
FAILED 某 subtask 同步阶段失败或 JM IO 异常 触发 failover;连续失败次数受 tolerable-failed-checkpoints 限制
EXPIRED 超过 checkpoint.timeout 仍未完成 丢弃该次 checkpoint,可能触发 failover
SUBSUMED 更新的 checkpoint 已完成,旧的 in-flight 作废 忽略,不算用户可见失败

execution.checkpointing.tolerable-failed-checkpoints 默认为 0:第一次 checkpoint 失败即 failover。生产上若偶发 S3 慢可设为 2–3,但不能靠此掩盖结构性超时。

4.4 Externalized checkpoint

execution.checkpointing.externalized-checkpoint-retention

这与 Savepoint 不同:externalized checkpoint 仍是 checkpoint 格式,跨版本/改拓扑能力弱于 savepoint(见第 11 篇对照表)。

4.5 同步阶段与异步阶段

每个 subtask 的 checkpoint 分两步(Streaming Fault Tolerance):

  1. 同步阶段(sync phase):在 task thread 上执行 snapshotState()——拷贝内存结构到本地临时 holding area,或触发 RocksDB Checkpoint#createCheckpoint。此阶段 阻塞正常 record 处理,因此 sync 时间直接影响 latency spike。
  2. 异步阶段(async phase):把 state 文件上传到 CheckpointStorage(如 S3A),上传完成后再向 JM ack。

若 sync 阶段抛异常,该 subtask failover(不受 tolerable-failed-checkpoints 保护)。Async 阶段 IO 失败则计入 tolerable 计数。

大 keyed state 作业应优先:

4.6 Checkpoint 存储选型

存储 配置 适用
JobManager heap 默认 checkpoint.storage=jobmanager 本地调试、state 极小
文件系统 filesystem + execution.checkpointing.dir 生产(HDFS/S3/GCS)
增量 + RocksDB 同上 + incremental 大 state,减上传带宽

execution.checkpointing.dir 必须 JM 与所有 TM 可达。对象存储上注意:

生产建议:dir 与业务数据 bucket 分离 IAM;监控 S3 503 Slow Down 与 checkpoint timeout 的相关性。

4.7 值得盯的 Metrics

Metric / REST 字段 含义 异常信号
lastCheckpointDuration 端到端 checkpoint 时间 接近或超过 interval
lastCheckpointSize 状态快照大小 单步暴涨 → state 泄漏或 window 未清
numberOfCompletedCheckpoints 累计成功次数 长期不增 → 持续失败
numberOfFailedCheckpoints 累计失败 与 tolerable 阈值联调
subtask alignmentDuration 对齐耗时 背压下极高 → 考虑 unaligned
checkpointStartDelay trigger 到 subtask 开始 背压或 CPU 饱和

Flink Web UI → Job → Checkpoints 页可看到 history table;自动化告警应对 lastCheckpointDuration / interval > 0.8 设阈值。


五、Kafka Source 与 Offset

5.1 Offset 存在哪

新 Source API(KafkaSource)下,每个 KafkaPartitionSplit 携带 topic-partition 与 当前消费 offset。Checkpoint 时 KafkaPartitionSplitState 被 snapshot 进 operator state(split enumerator / reader 协调状态),而不是只写 Kafka broker。

Flink Kafka Connector 文档 Consumer Offset Committing

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("events")
    .setGroupId("flink-job-1")
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    .setProperty("commit.offsets.on.checkpoint", "true")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

5.2 恢复时 offset 与 state 如何一致

假设 checkpoint \(N\) 成功完成:

失败重启时作业从 checkpoint \(N\) 恢复:source 从 \(o_p\) 重读,算子 state 回到 \(N\)重放范围与 state 对齐。这正是 lakehouse 第 19 章 里「offset 与入湖提交同生死」的引擎侧前提。

5.3 与第 5 篇 consumer offset 的分工

机制 谁写 用途
Flink checkpoint split state Flink 容错恢复(权威)
Kafka __consumer_offsets Flink source(可选) 监控、外部只读 consumer
手动 consumer.commitSync 用户自管 consumer 与 Flink 作业无关

kafka-consumer-groups.sh --describe 看到 lag 归零,不能代替 checkpoint 健康检查——必须同时看 Flink checkpoint 成功率与 lastCheckpointDuration

5.4 动态分区发现

partition.discovery.interval.ms 大于 0 时,enumerator 会发现新 partition 并分配 split。新 partition 的初始 offset 由 OffsetsInitializer 决定(earliest / latest / committed)。Checkpoint 会包含 当前已分配 split 集合;恢复后 enumerator 继续发现逻辑。扩容 topic 时无需重启,但需理解新 partition 从历史位点还是 latest 开始(运维策略问题)。


六、调优:Interval、Timeout、Concurrency

6.1 参数一览

配置键 默认(1.20) 作用
execution.checkpointing.interval 无(需显式开启) 周期性 trigger 间隔
execution.checkpointing.min-pause 0 两次 checkpoint 开始 之间最少间隔;设了此项则隐含 max-concurrent-checkpoints=1
execution.checkpointing.timeout 10 min 单次 checkpoint 超时则 abort
execution.checkpointing.max-concurrent-checkpoints 1 允许几个 checkpoint 同时进行
execution.checkpointing.tolerable-failed-checkpoints 0 连续失败几次才 fail job
execution.checkpointing.mode EXACTLY_ONCE AT_LEAST_ONCE
execution.checkpointing.num-retained 1 保留最近几个 completed checkpoint

代码侧典型写法(与官方文档一致):

env.enableCheckpointing(60_000); // 每 60s 触发一次

CheckpointConfig ck = env.getCheckpointConfig();
ck.setMinPauseBetweenCheckpoints(30_000);
ck.setCheckpointTimeout(600_000);
ck.setMaxConcurrentCheckpoints(1);
ck.setTolerableCheckpointFailureNumber(2);
ck.setCheckpointStorage("hdfs:///flink/checkpoints");

6.2 Interval 与业务延迟的拉扯

Interval 缩短

Interval 拉长

经验上先保证 lastCheckpointDuration << interval(例如 duration 30s、interval 60s),再按 RPO 需求调 interval。没有 universal 最优值——必须结合 state 大小(第 9 篇 TTL 与 第 12 篇 RocksDB 增量)实测。

6.3 Min-pause 与 max-concurrent 的互斥

文档明确:设置了 min-pause-between-checkpoints 就不能使用 max-concurrent-checkpoints > 1

直觉:min-pause 保证两次 checkpoint 之间至少跑 \(T\) ms 的正常数据处理,防止 checkpoint 「背靠背」把管道拖死。高延迟外部调用 + 需要频繁 checkpoint 的场景,有人试图开 concurrent checkpoints——但与 unaligned 互斥,且会放大 state 存储写放大,慎用。

6.4 Timeout 设太短 vs 太长

应使 timeout > p99(checkpoint duration) × 安全系数(1.5–2),并从 metrics 读 lastCheckpointDurationlastCheckpointSize

6.5 实验:Checkpoint Interval 与延迟(可复现步骤)

本机限制:当前环境无 JVM/Flink,以下步骤未在本机执行,不包含实测数据表。在具备 Flink 1.20+ 与 Kafka 的环境可按此复现(与 PLAN 实验台账一致)。

目标:固定作业逻辑与并行度,只改 execution.checkpointing.interval,对比 REST metrics 中的 lastCheckpointDuration 与 source currentEmitEventTimeLag(Kafka connector 文档 Monitoring)。

建议环境

步骤

  1. 部署作业,interval=60000min-pause=30000,跑 15 分钟稳定。
  2. GET /jobs/:jobid/checkpoints 记录 lastCheckpointDurationend_to_end_duration(若有),从 metrics 读 currentEmitEventTimeLag 中位数,≥3 轮采样。
  3. 同样流程改 interval=10000,再改 interval=180000
  4. 对比:interval 缩短时 duration 是否接近 interval(若是则说明 checkpoint 排队);lag 是否系统性升高。

预期定性结论(来自官方文档与工程经验,非本机实测):

读者跑完应把自己的 CPU、Flink 版本、state 大小、三轮中位数填进实验记录,再写进内部 runbook——本文不替读者编造数字。

6.6 调参决策树(工程向)

flowchart TD
  START[checkpoint 经常失败?] -->|是| TIMEOUT{duration > timeout?}
  TIMEOUT -->|是| T1[加大 timeout 或优化 state/存储]
  TIMEOUT -->|否| SYNC[sync 阶段失败?]
  SYNC -->|是| S1[查 TM 日志 / RocksDB OOM]
  SYNC -->|否| ALIGN alignment 极长?
  ALIGN -->|是| U1[开 unaligned 或消背压]
  ALIGN -->|否| T2[查 JM IO / S3 限流]
  START -->|否| LAG{lag 与 interval 相关?}
  LAG -->|是| I1[加大 interval 或 min-pause]
  LAG -->|否| OK[维持现配置]

七、与后续篇章的衔接

话题 本篇覆盖 延伸
State 物理存储 仅提及 StateBackend 第 12 篇 LSM 路径与增量 checkpoint
Savepoint / 升级 对比一句 第 11 篇
端到端 EOS notifyCheckpointComplete 第 14–15 篇
入湖提交频率 interval 与可见延迟 lakehouse 第 19 章
背压 + checkpoint 连锁 unaligned 入门 第 18 篇

Checkpoint 解决的是 引擎内部 一致快照;RocksDB 增量、文件合并(Flink 1.20 实验性 file-merging)决定 snapshot 有多贵。下一篇 Savepoint 解决 人为边界上的迁移与升级;再下一篇进入 RocksDB state backend 的读写与 compaction 细节。


八、边界

本文不展开:

迭代作业(反馈环)默认不支持 checkpoint;强制开启会丢 loop 中的 in-flight 数据(Checkpointing 文档 State Checkpoints in Iterative Jobs)。


参考资料

  1. Apache Flink Documentation 1.20, CheckpointingCheckpointConfigexecution.checkpointing.* 配置项。A 级。
  2. Apache Flink Documentation 1.20, Checkpointing under backpressure(aligned / unaligned、aligned-checkpoint-timeout)。A 级。
  3. Apache Flink Documentation 1.20, Streaming Fault Tolerance(barrier、对齐、恢复)。A 级。
  4. Apache Flink Blog, From Aligned to Unaligned Checkpoints(Part 1–2)。B 级(官方博客)。
  5. FLIP-76: Unaligned Checkpoints(设计动机与 MVP 限制)。A 级(社区 FLIP)。
  6. Chandy & Lamport, Distributed Snapshots: Determining Global States of Distributed Systems(1985)。A 级(原始论文)。
  7. Apache Flink Documentation 1.20, Kafka ConnectorConsumer Offset CommittingKafkaPartitionSplit / split state。A 级。
  8. apache/flink 源码:org.apache.flink.runtime.checkpoint.CheckpointCoordinator。A 级。
  9. 本系列:第 9 篇 键控状态与 TTL第 11 篇 Savepointlakehouse 第 19 章 流式 CDC 入湖

返回 系列目录 · 上一篇 键控状态与 State TTL · 下一篇 Savepoint 与升级恢复

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】交付语义:从 at-most-once 到 exactly-once

用 Source、引擎、Sink 三层模型拆解 at-most-once、at-least-once、exactly-once 的组合规则与最弱环决定律;对照 Flink checkpoint 模式、Kafka 事务与幂等 producer、重复消费/重复写入的三类修复手段,为两阶段提交 sink 铺垫。

2026-07-01 · database / distributed

【流式数据处理】Kafka · Flink · 状态 · Exactly-Once

承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。

2026-07-01 · database / distributed

【流式数据处理】背压、故障模式与引擎对照

收束流式数据处理系列:Flink credit-based 背压如何沿算子链传播、Web UI 指标怎么读;数据倾斜、checkpoint 超时连锁、Kafka rebalance 风暴、RocksDB OOM、savepoint 不兼容五类生产故障的诊断与止血;Flink / Kafka Streams / Spark Structured Streaming / RisingWave 在状态模型、交付语义、运维与入湖成熟度上的对照表与选型决策树,不做排名。

2026-07-01 · database / distributed

【流式数据处理】两阶段提交与端到端 Exactly-Once

拆解 Flink GenericTwoPhaseCommitSink 协议:preCommit 进 checkpoint、commit 挂 notifyCheckpointComplete;对照 Kafka 事务 sink、JDBC 与 Iceberg 2PC 落点,以及 commit 前/后崩溃与重复 commit 的幂等边界——与 lakehouse/11 CAS、lakehouse/19 入湖侧对读,不重复表格式全文。


By .