土法炼钢兴趣小组的算法知识备份

【流式数据处理】Flink 运行时模型

文章导航

分类入口
databasedistributed
标签入口
#flink#jobmanager#taskmanager#slot#streamgraph#jobgraph#executiongraph#operator-chain#parallelism#slot-sharing-group

目录

第 6 篇把 Kafka 侧的幂等与事务 producer 钉住了:分区日志持久、ISR 副本、read_committed 隔离未提交消息。Flink 作业从 Kafka 读数据、做窗口聚合、再写下游——这些计算发生在 Flink 集群里。运维第一次打开 Flink Web UI,通常会看到 JobManager、TaskManager、Slot、并行度、背压百分比;提交一个 DataStream 程序后,日志里又会出现 StreamGraph、JobGraph、ExecutionGraph。这些名字各自对应什么物理对象?一条记录从 Source 算子到 Sink 算子,中间经过几次线程切换、几次网络 shuffle?

本文不教 flink run 参数大全,而是把 Flink 1.20+ / 2.x 主线的运行时骨架讲清楚,回答四个会直接决定资源规划和故障排查的问题:

后文默认读者已读过本系列 第 1 篇 的「日志 + 有状态计算」全景,以及 第 4–6 篇 的 Kafka 分区与 offset 语义。事件时间、watermark、窗口算子本身的语义见 第 2、3 篇;Checkpoint 如何把状态与 Kafka offset 绑在一起,留到 第 10 篇

环境说明:本机为 WSL2(Linux 6.6.87.2)、i9-12900K / 32 GiB,未安装 JVM 与 Flink 集群。本文架构结论来自 Apache Flink 官方文档(ArchitectureJobs and Scheduling)与 1.20 / 2.x API 文档;不粘贴未在本机执行的 Web UI 截图或 REST 输出,也不给出任何伪造的吞吐/延迟 benchmark。文末给出可复现的本地集群步骤。

版本锚定:Flink 1.20+ / 2.x 主线。1.13 之前常见的 MemoryStateBackend / RocksDBStateBackend 类名在 1.13 后已统一为 HashMapStateBackend / EmbeddedRocksDBStateBackend(状态后端细节见 第 9 篇)。Flink 2.0 起 DataStream V2 API 与 State V2 并行存在,本篇 runtime 层编译链对 V1/V2 共用同一套 JM/TM 调度模型;API 差异在 第 8 篇 标注。


一、三层进程模型:Client、JobManager、TaskManager

Flink 集群在逻辑上分成三类角色(来源:Flink Documentation,Architecture):

角色 职责 典型部署
Client 解析程序、生成 StreamGraph、提交 JobGraph(或 StreamGraph,见 FLIP-468) flink run 进程、IDE 本地 main
JobManager(JM) 接收作业、编译 ExecutionGraph、调度 Task、协调 checkpoint、故障恢复 1 个 active JM(HA 下 standby)
TaskManager(TM) 提供 Slot 资源、执行 Task、维护网络缓冲与本地状态 多个 TM 水平扩展
flowchart TB
  CL["Client<br/>StreamGraph / 提交"]
  JM["JobManager<br/>调度 · checkpoint · 故障恢复"]
  TM1["TaskManager 1<br/>Slot × N"]
  TM2["TaskManager 2<br/>Slot × N"]
  CL -->|"JobGraph / StreamGraph"| JM
  JM -->|"部署 Task"| TM1
  JM -->|"部署 Task"| TM2
  TM1 <-->|"shuffle 网络"| TM2

和 Spark 对照(点到为止,引擎全面对照见 第 18 篇):Spark 的 Driver 近似 Flink 的 JobManager + Client 提交逻辑;Spark Executor 近似 Flink TaskManager。差异在于 Flink 把 Slot 作为 TM 内可共享的资源槽位显式建模,且 checkpoint 协调 由 JM 集中驱动——这与 Kafka consumer group 里「协调器分配分区、各 consumer 拉取」的分工类似(见 第 5 篇),但 Flink 协调的是 有状态的计算 Task 而非单纯的消息拉取。

1.1 JobManager 内部组件

JM 不是单线程调度器。Flink 1.20+ 文档把 JM 职责拆成(名称随版本略有演进,语义稳定):

一次作业失败重启时,JM 根据最近一次 成功的 checkpoint第 10 篇)恢复 ExecutionGraph,并在可用 Slot 上 重新部署 Task。这与 Kafka 故障后 consumer 从 已提交 offset 继续消费(第 5 篇)对称:Flink 把 算子状态 + source offset 绑在同一 checkpoint 里,而不是只记 Kafka offset。

1.2 TaskManager 与 Slot

每个 TaskManager 是一个 JVM 进程,向集群声明 taskmanager.numberOfTaskSlots 个 Slot(默认 1)。一个 Slot 在同一时刻只运行一个 Task 线程(一个 subtask 实例),但 多个算子链(operator chain)可以串在同一个 Task 里顺序执行——这是后文 operator chain 的物理落点。

Slot 不是 CPU 核的 1:1 映射。生产上常见配置是 TM 进程数 × 每 TM Slot 数 ≥ 作业最大并行度,并留余量给 rescale 与多作业 SlotSharing。Slot 过小会导致并行度上不去;Slot 过大而并行度低则浪费 TM 内存(每个 Slot 有独立的网络缓冲与状态后端实例开销)。

概念 含义
TaskManager 执行节点 JVM,向 RM 注册 Slot 资源
Slot TM 内的资源槽,部署一个 subtask(可能含整条算子链)
Task Slot 上运行的线程,对应 ExecutionGraph 里一个 Execution 的一次尝试

二、四层编译链:StreamGraph → JobGraph → ExecutionGraph → Task

用户写的 DataStream 程序在提交后,会经历 四次表示(来源:Flink Documentation,Jobs and Scheduling;源码包 org.apache.flink.streaming.api.graphorg.apache.flink.runtime.jobgraphorg.apache.flink.runtime.executiongraph):

flowchart LR
  SG["StreamGraph<br/>逻辑 DAG,每 API 算子一个 StreamNode"]
  JG["JobGraph<br/>chain 后的 JobVertex + JobEdge"]
  EG["ExecutionGraph<br/>按并行度展开 ExecutionVertex"]
  TK["Task<br/>Slot 上的物理线程"]
  SG --> JG --> EG --> TK

2.1 StreamGraph:API 层的逻辑 DAG

Client 侧(或在 JM 侧,若走 StreamGraph 提交模式)把 StreamExecutionEnvironment 上的 transformation 转成 StreamGraph

StreamGraph 是「用户视角的逻辑计划」,节点数往往多于最终运行的 Task 数——因为后续会把可 chain 的节点合并。

2.2 JobGraph:chain 优化后的调度单元

JobGraph 是 JM 接受的低层数据流表示(来源:Jobs and Scheduling)。转换步骤核心:

  1. 从 Source 节点 DFS 遍历 StreamGraph。
  2. 应用 operator chaining 规则,把可合并的 StreamNode 合成一个 JobVertex
  3. 不可合并的算子各自成为独立 JobVertex。
  4. JobVertex 之间用 JobEdge 连接,边关联 IntermediateDataSet(中间结果集)。

JobVertex 是调度与 checkpoint barrier 对齐的基本单位:一个 JobVertex 内所有算子共享同一个并行度,并在同一 Task 线程内顺序执行(若 chain 成功)。

2.3 ExecutionGraph:并行化后的执行图

JM 收到 JobGraph 后构建 ExecutionGraph(接口文档:ExecutionGraph,Flink 1.20 API):

因此:逻辑上一个 map 算子 在 ExecutionGraph 里可能是 128 个并行 subtask,每个 subtask 是一个可独立失败、独立重启的 Execution。

2.4 Task:Slot 上的物理执行

Scheduler 把 READY 状态的 Execution 部署到可用 Slot,在 TM 上启动 Task 线程。Task 加载 JobVertex 的 chain 数组(多个 StreamOperator 串行),从网络输入缓冲读 record、依次调用各算子、写出到下游 partition。

一条 record 在 chain 内部 通常是 Java 对象引用传递,无序列化;跨 JobVertex 才走网络 shuffle(序列化 + 网络缓冲 + 反序列化)。理解这一边界,是优化延迟与排查「为什么 CPU 不高但吞吐上不去」的第一步。

2.5 FLIP-468:StreamGraph 提交模式(1.20+ 演进)

FLIP-468 引入可选的 StreamGraph 直传 JM 路径:Client 提交 StreamGraph,由 JM 内编译 JobGraph,而不是 Client 预编译 JobGraph。动机包括:

无论 Client 还是 JM 编译 JobGraph,ExecutionGraph 与 Task 部署路径不变。运维上你仍会在 Web UI 看到 JobVertex 级 DAG 与 subtask 指标。


三、Operator Chain:何时合并、何时必须断开

Operator chaining 的目标:把并行度相同、一对一连接、可安全串行的算子放进同一 Task,减少线程切换与序列化(来源:Flink Documentation,Operators / Physical 相关章节;源码 StreamingJobGraphGenerator)。

3.1 默认会 chain 的典型模式

DataStream<String> lines = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka");
DataStream<String> words = lines
    .flatMap((String line, Collector<String> out) -> { /* ... */ })
    .returns(String.class)
    .filter(s -> s.length() > 0)
    .map(String::toUpperCase);

flatMapfiltermap 并行度一致,且之间是 forward 连接(无 shuffle),Flink 默认倾向把它们 chain 成 一个 JobVertex / 一个 Task。Web UI 里会显示算子名称用 -> 连接,如 Flat Map -> Filter -> Map

3.2 必须断开 chain 的常见原因

条件 后果 典型算子
并行度变化 新 JobVertex rescalerebalance 前后
非 forward 分区 shuffle 边 keyBybroadcast
用户禁止 chain 独立 Task disableChaining()
Sink/SOURCE 特殊 独立 vertex 部分 Source/Sink 实现
不同 SlotSharingGroup 资源隔离 见第五节

keyBy 会引入 HASH 分区,必然产生新的 JobVertex——这也是 Keyed State 只能出现在 keyBy 之后的原因(第 8、9 篇)。

3.3 手动控制 chain

// 强制与上游断开,单独占一个 Task
someStream.map(new MyMap()).disableChaining();

// 强制与下游合并(在允许的规则内)
someStream.map(new MyMap()).startNewChain(); // 语义:从此处开始新 chain

// 全局开关
env.disableOperatorChaining();

生产上 不建议全局 disable:会把每个算子变成独立 Task,线程与序列化开销陡增。更常见的是对 重 CPU 算子单独 disableChaining,以便 Web UI 里单独看其背压与 metrics。

3.4 与 Kafka Source 的衔接

Flink Kafka Source(KafkaSource,1.14+ 统一 Source API)在 无 rebalance 且并行度与 topic 分区对齐时,常与解析/map 算子 chain。每个 subtask 消费固定分区子集(见 第 4 篇 的分区内有序语义)。一旦在 Source 后立刻 keyBy,chain 断开,Source subtask 与下游 keyed 算子 不再同线程——offset 提交仍在 Source 算子状态里,由 checkpoint 持久化(第 10 篇),与下游 state 分片无关。


四、并行度、maxParallelism 与 rescale 边界

4.1 三个容易混淆的「度」

名称 作用域 能否改
parallelism 某算子/作业当前并行 subtask 数 可 rescale(受 max 限制)
maxParallelism KeyGroup 数量上界,决定 keyed state 如何切分 首次启动后不可降低;提高后需从 savepoint 恢复
Slot 数 集群资源上界 扩 TM 可增

maxParallelism 默认由 Flink 根据 parallelism 推导(约 \(1.5 \times \text{parallelism}\),且 \(\geq 128\) 等规则,见 DefaultMaxParallelism 源码),也可显式设置:

env.setMaxParallelism(256);
env.setParallelism(48);

Keyed 作业 rescale 时,state 按 KeyGroup 重分布(第 8 篇)。KeyGroup 个数 等于 maxParallelism,因此 maxParallelism 一旦定得太小,后续无法把并行度扩过该值而不丢 state 兼容性;定得过大则空 KeyGroup 多,checkpoint 元数据略膨胀——这是 工程上的固定成本,不是运行时 CPU 线性开销。

4.2 与 Kafka 分区数的关系

Kafka topic 分区数 \(P\) 与 Source 并行度 \(F\) 的常见关系:

下游 keyBy 后并行度可以与 \(P\) 脱钩,由 数据倾斜与 state 大小 决定(第 9、18 篇)。

4.3 设置并行度的入口

// 全局默认
env.setParallelism(16);

// 单算子
dataStream.map(...).setParallelism(32);

// Source / Sink 常单独设
env.fromSource(..., WatermarkStrategy..., "kafka").setParallelism(12);

Flink 2.x 仍保留上述 V1 API;SQL 作业则还有 算子级 parallelism hintresource 声明,本篇不展开 SQL 优化器。


五、SlotSharingGroup 与资源隔离

默认情况下,同一作业的所有 JobVertex 共享 SlotSharingGroup "default":不同 JobVertex 的 subtask 可以复用同一个 Slot(时间多路复用),提高集群利用率。例如并行度 8 的 map 与并行度 8 的 map 不会各占 8 个 Slot,而可能 8 个 Slot 上各跑一条 chain

5.1 何时需要拆分 SlotSharingGroup

场景 做法
重 CPU 与重 IO 算子争抢同一 Slot 把 IO 密集 Sink 放到独立 group
希望某算子独占 Slot 资源 slotSharingGroup("heavy") + TM Slot 数规划
与外部系统连接数限制 限制某 Source/Sink 并行度与 Slot 布局
DataStream<Event> parsed = source
    .slotSharingGroup("source-group")
    .map(...);

parsed.keyBy(...).window(...).slotSharingGroup("compute-group");

同一 SlotSharingGroup 内的算子才能 chain(并行度相同且 forward 时)。跨 group 必然不同 Slot,chain 断开。

5.2 与 YARN/K8s 资源

taskmanager.memory.process.size 与 Slot 数决定 每个 Slot 分到的堆/off-heap。EmbeddedRocksDBStateBackend 的 block cache 多在 TM 级共享,但 keyed state 按 subtask 隔离目录(第 12 篇)。SlotSharing 不会混用 keyed state,但 会共享 TM 内存池——大 state 作业应控制 每 TM Slot 数,避免多个重 state subtask 同 TM OOM。


六、作业生命周期与 Web UI 对照

stateDiagram-v2
  [*] --> CREATED: Client 提交
  CREATED --> RUNNING: Slot 分配 + Task 启动
  RUNNING --> FAILING: Task 异常 / checkpoint 失败
  FAILING --> RESTARTING: 固定延迟重启策略
  RESTARTING --> RUNNING: 从 checkpoint 恢复
  RUNNING --> FINISHED: 有界流结束
  RUNNING --> CANCELED: 用户 cancel
  FAILING --> FAILED: 重启次数耗尽

对照 Web UI(字段名随版本略有差异):

UI 概念 运行时对象
Job Graph JobVertex 级 DAG(chain 后)
Task Managers TM 进程列表与 Slot 占用
Subtasks ExecutionVertex 实例
Bytes/Records Received/Sent IntermediateResultPartition shuffle
Backpressure 下游 credit 不足反压上游(第 18 篇

Restarts 计数高并不一定是 bug:Kafka rebalance(第 5 篇)或 TM 滚动升级也会触发 全图重启。区分「预期重启」与「checkpoint 连锁失败」要看 checkpoint 成功率Root Exception第 10 篇)。


七、与 lakehouse 入湖侧的分工

lakehouse 第 19 章入湖侧 讲 Writer/Committer 与 checkpoint 对齐;本篇从 引擎侧 讲 Task 如何跑在 Slot 上。衔接点:


八、本地复现:从提交日志到 ExecutionGraph

下列步骤按 Flink 官方 Local Cluster 文档可复现;以下命令未在本写作环境执行,仅作读者实验入口。

# 下载 Flink 1.20.x 或 2.x 二进制包并解压
export FLINK_HOME=/path/to/flink
$FLINK_HOME/bin/start-cluster.sh

# 提交示例作业(自带 WordCount jar 或打包自己的 DataStream 程序)
$FLINK_HOME/bin/flink run -p 4 $FLINK_HOME/examples/streaming/WordCount.jar

# Web UI
# http://localhost:8081  → Job Graph / Task Managers / Subtasks

在 Client 日志中搜索 JobGraph / StreamGraph 打印(需 -Dlog4j.logger.org.apache.flink=DEBUG 或对应 log4j2 配置),可看到 chain 后的 vertex 列表。修改程序加入 keyBy 后再提交,对比 JobVertex 数量增加chain 名称变化

验证 SlotSharing:提交并行度 4 的两段 map 作业,在 UI 查看 Task Managers → Slots used,对比把某段 map 设为独立 slotSharingGroup("io") 前后 Slot 占用差异。

8.1 Pipeline 与多作业提交

PipelineStreamExecutionEnvironment.execute() 单次提交)内的多个 DataStream sink 会合成 一个 JobGraph。若用 StatementSet(Table API)或多次 execute(),则产生 多个独立 JobGraph,各自占用 Slot。混部时要注意 Slot 总数 ≥ 各作业并行度峰值之和(除非刻意 SlotSharing 跨作业——默认不共享)。

Flink 2.x 的 Application Modemain() 运行在 JM 侧:Client 只上传 jar,StreamGraph 在 JM 生成——与 FLIP-468 的 StreamGraph 提交方向一致,利于大 job 的 classpath 与依赖管理(来源:Flink Documentation,Application Mode)。


九、术语表

术语 含义
StreamGraph Client 从 DataStream API 生成的逻辑 DAG
JobGraph chain 优化后 JM 接受的调度图(JobVertex + JobEdge)
ExecutionGraph 按并行度展开后的执行图(ExecutionVertex / Execution)
JobVertex 调度与 barrier 对齐单位,可含多个 chain 在一起的算子
Operator chain 多个算子串在同一 Task 线程内顺序执行
Slot TaskManager 内的资源槽,部署一个 subtask
SlotSharingGroup 决定哪些 JobVertex 的 subtask 可共享 Slot
maxParallelism KeyGroup 数上界,影响 keyed state rescale
ExecutionAttemptID 单次 Task 尝试的唯一标识,用于 JM–TM 消息

十、部署模式与资源参数(Standalone / YARN / K8s)

运行时模型不随部署模式改变,但 谁启动 TM、Slot 如何装箱 随集群类型变化。三种常见模式(来源:Flink Documentation,Deployment):

模式 JM/TM 谁管 适用
Standalone 手工或脚本启停 开发、PoC
YARN / Hadoop RM 申请 container 传统企业 Hadoop 栈
Native Kubernetes K8s Operator / Flink K8s 集成 云原生、容器化

与运行时直接相关的配置项(flink-conf.yaml):

# 每个 TaskManager 的 Slot 数
taskmanager.numberOfTaskSlots: 4

# TM 进程总内存(Flink 1.10+ 统一内存模型)
taskmanager.memory.process.size: 4096m

# 默认并行度(可被 env.setParallelism 覆盖)
parallelism.default: 4

Flink 1.20+ / 2.x 使用 Flink 内存模型 划分 framework / task / network / managed 内存;RocksDB state 的 managed memory 比例影响 block cache 与 write buffer(第 12 篇)。JM 堆过小会在 大并行度 + 宽 ExecutionGraph 时 OOM;TM network 内存过小会在 高 shuffle 宽依赖 时 backpressure 加剧(第 18 篇)。

K8s 上 TaskManager Pod 数 × 每 Pod Slot 数 应覆盖峰值并行度,并预留 滚动升级 时的双跑窗口。与 Kafka consumer 静态成员第 5 篇)配合,可减少 Flink 重启触发的 rebalance 风暴。


十一、常见问题与排查入口

现象 运行时层可能原因 下一步
可用 Slot 为 0 TM 未注册或内存不足 RM / K8s 事件、TM 日志
Subtask 数少于预期并行度 Slot 不够,部分 Execution 等待 扩 TM 或降并行度
同一 JobVertex 内算子不 chain 中间有 shuffle 或 disableChaining 看 Job Graph 链名称
rescale 后 state 不兼容 maxParallelism 或 key 类型变更 从 savepoint 恢复规则(第 11 篇
Source 空闲 subtask 并行度 > Kafka 分区数 对齐 \(F\)\(P\)(第四节)

Credit-based flow control 在 TM 网络层实现:下游 input gate 给上游 output buffer credit,下游慢则 credit 耗尽,上游 block——这在 UI 上显示为 backpressure。背压根因可能在下游算子 CPU、RocksDB 读放大、或外部 Sink 慢(第 18 篇),不要只看 JM 线程栈。


十二、小结

Flink 运行时把 DataStream 程序 编译成四层表示:StreamGraph 反映 API 逻辑;JobGraph 通过 operator chain 合并可串行算子;ExecutionGraph 按并行度展开 subtask;Task 在 TM Slot 上物理执行。理解 chain 在 JobVertex 内、shuffle 在 JobVertex 间,就能解释延迟、序列化开销与背压传播路径。并行度与 Kafka 分区对齐决定 Source 吞吐;maxParallelism 与 KeyGroup 绑定 keyed state 的 rescale 上限;SlotSharingGroup 在利用率与隔离之间做取舍。

下一篇进入 DataStream 算子语义:Source/Transform/Sink、shuffle 策略、keyBy 如何映射 KeyGroup,以及 ProcessFunction 与 TimerService 如何驱动 第 2 篇 的事件时间逻辑。


参考资料

  1. Apache Flink Documentation, Architecture(JM/TM/Slot 角色划分)。
  2. Apache Flink Documentation, Jobs and Scheduling(JobGraph、ExecutionGraph、Execution 状态机)。
  3. Apache Flink Documentation, Parallel Execution(并行度、SlotSharing、operator chain 配置)。
  4. Apache Flink 1.20 API, org.apache.flink.runtime.executiongraph.ExecutionGraph(ExecutionVertex / Execution 语义)。
  5. Apache Flink FLIP-468: Introducing StreamGraph-Based Job Submission(StreamGraph 提交 JM 模式)。
  6. Apache Flink 源码 StreamingJobGraphGenerator(StreamGraph → JobGraph chain 规则,版本随 release 标注)。
  7. 本系列 第 4–6 篇(Kafka 分区、offset、事务与 Flink 衔接)。
  8. 本系列 第 1 篇(流处理全景与系列地图)。

返回 系列目录 | 上一篇:Kafka 事务与幂等 Producer | 下一篇:DataStream 与算子语义

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】事件时间、处理时间与 Watermark

拆解 event time、processing time、ingestion time 三种时间语义,给出 watermark 的形式化含义与 bounded-out-of-orderness 等生成策略,并说明侧输出、allowed lateness 如何处理迟到数据;附 event-time 与 processing-time 窗口对比的可复现实验步骤。

2026-07-01 · database / distributed

【流式数据处理】窗口:滚动、滑动与会话

从 WindowAssigner 三类(Tumbling、Sliding、Session)出发,讲清窗口 state 如何随 key 与窗口实例增长,Trigger 与 Evictor 如何改变 firing 与清理节奏,GlobalWindow 自定义 Trigger 的边界,并与批式 GROUP BY 时间分桶对照;附三种窗口 state 观测的可复现步骤。

2026-07-01 · database / distributed

【流式数据处理】DataStream 与算子语义

拆解 Source/Transform/Sink 数据流图、rebalance/keyBy/broadcast 等 shuffle 策略、keyBy 到 KeyGroup 的映射,以及 ProcessFunction 与 TimerService 如何承载事件时间逻辑,并引入算子状态与键控状态的分工边界。


By .