第 7
篇 把 Flink 作业编译成 StreamGraph → JobGraph →
ExecutionGraph,并说明 shuffle 只发生在 JobVertex
之间。本篇回答:用户写的
fromSource、map、keyBy、window
在语义上各是什么?一条 record 何时必须走网络、何时可以
forward?keyBy 之后为什么才能用
ValueState?ProcessFunction 里的定时器和 第
2 篇 的 watermark 如何配合?
本文聚焦 Flink DataStream API(V1)——本系列主线。Flink 2.x 引入 DataStream V2 与 State V2(见官方 Working with State V2),语义与 V1 对齐但接口包名不同;文末单独标注版本边界。不写 Flink SQL 优化器,也不写 Table API 到 DataStream 的转换细节。
先修:第 1 篇 的流表对偶;第 2、3 篇 的事件时间与窗口;第 4–6 篇 的 Kafka 分区有序与事务;第 7 篇 的 chain 与并行度。
环境说明:本机 WSL2,未安装 Flink。shuffle 分布实验给出 可复现的验证思路与代码骨架,不伪造 subtask 计数输出。架构与 API 行为以 Flink 1.20+ / 2.x 官方文档为准。
一、DataStream 程序的三段结构
一个典型作业在 API 层总是 Source → Transformation → Sink(来源:Flink Documentation,DataStream API):
flowchart LR
SRC["Source<br/>Kafka / 文件 / 自定义"]
TR["Transformation<br/>map / keyBy / window / connect"]
SNK["Sink<br/>Kafka / JDBC / 湖 / 打印"]
SRC --> TR --> SNK
| 阶段 | 职责 | 与 Kafka 对照 |
|---|---|---|
| Source | 持续读外部系统,注入 watermark(若配置) | Consumer 拉分区、提交 offset(第 5 篇) |
| Transformation | 无界流上的算子逻辑,可能 shuffle | 无直接 Kafka 对应;分区键类似 keyBy
前的业务键 |
| Sink | 写出结果,参与 EOS 时做 2PC(第 15 篇) | Producer / 事务 commit(第 6 篇) |
DataStream<T> 是逻辑句柄:携带类型
\(T\)、分区策略、并行度、watermark
策略。对 DataStream 的每次
map/filter 返回新
DataStream,底层在 StreamGraph 上追加
StreamNode。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("events")
.setGroupId("flink-demo")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> raw = env.fromSource(
kafkaSource,
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> parseTs(event)),
"kafka-source");
DataStream<CountResult> counts = raw
.flatMap(new Tokenizer())
.keyBy(w -> w.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.sum(1);
counts.sinkTo(kafkaSink); // 或 print() 调试
env.execute("word-count-streaming");1.1 Unified Source/Sink API(1.14+)
Flink 1.14 起推荐 Source/Sink v2
接口(Source / Sink),Kafka
connector 的 KafkaSource /
KafkaSink 已统一:
| 旧 API | 新 API |
|---|---|
FlinkKafkaConsumer |
KafkaSource + fromSource |
FlinkKafkaProducer |
KafkaSink + sinkTo |
Split 枚举(partition 作为 split)在 SourceReader 内并行拉取;offset 存 split state,语义仍属 Operator State 范畴(第 10 篇)。1.20+ 旧 consumer/producer 标记 deprecated,新作业应直接用 Unified API。
env.execute() 触发 StreamGraph
生成与提交(第 7
篇)。有界流用 execute()
同样成立,Source 读完后作业
FINISHED;无界流则长期
RUNNING。
二、Transformation 分类与 One-to-One 边界
2.1 常见算子与是否引入 shuffle
| 算子 | 是否改变分区 | 典型 shuffle |
|---|---|---|
map / flatMap /
filter |
否 | forward(同并行度) |
keyBy |
是 | HASH 按 key |
rebalance / rescale |
是 | REBALANCE / RESCALE |
broadcast |
是 | BROADCAST 到所有 subtask |
window + 聚合 |
否(在 keyBy 之后) | 依赖上游 keyBy |
connect / coMap |
视情况 | 常配合 broadcast |
Physical 分区类型(源码
StreamPartitioner)决定 record 如何选下游
subtask:
- FORWARD:上游 subtask \(i\) 只发下游 \(i\)(并行度相同时)。
- REBALANCE: round-robin 到所有下游 subtask。
- RESCALE: round-robin 到下游 部分 subtask 子集(本地性优化)。
- HASH:
key.hashCode()经 KeyGroup 映射(见第三节)。 - BROADCAST:复制到每个下游 subtask。
- CUSTOM:用户自定义
Partitioner。
2.2 隐式 shuffle:keyBy 与 rebalance
// 显式 rebalance:解决 Source 分区局部性导致的数据倾斜前兆
DataStream<Event> balanced = parsed.rebalance();
// keyBy:按业务键哈希,后续 keyed state / window 都绑定此键
KeyedStream<Event, String> keyed = balanced.keyBy(Event::getUserId);没有 keyBy 就不能使用 Keyed
State(ValueState 等,第 9
篇)。WindowAll 可以在非 keyed
流上做窗口,但 state 是 单并行度 或
Operator State 语义,无法水平扩展 keyed
聚合。
三、keyBy 与 KeyGroup:状态分片的根
3.1 从 key 到 subtask
对 keyed 算子,Flink 使用两步映射(来源:Flink Documentation,Stateful Stream Processing):
- Key → KeyGroup:$ = (key, ) $
- KeyGroup → subtask:$ = $
KeyGroup 个数 = maxParallelism(第 7 篇)。parallelism 变化时,同一 KeyGroup 仍映射到确定 subtask 集合,state 随 KeyGroup 迁移——这是 rescaling 的基础(第 11 篇)。
flowchart TB
K["业务 key (userId)"]
KG["KeyGroup id<br/>0 .. maxParallelism-1"]
ST["subtask index<br/>0 .. parallelism-1"]
K -->|"MurmurHash 等"| KG -->|"mod parallelism"| ST
MurmurHash 等哈希函数保证 key 均匀打散;若业务 key 本身倾斜(热点 userId),再均匀的哈希也无法救 subtask 负载不均——这是 第 18 篇 的数据倾斜主题,不是 shuffle 策略能单独解决的。
3.2 KeyGroup 与 Kafka 分区键
Kafka producer 按 record key 选分区(第
4 篇),保证 分区内有序。Flink
keyBy 的键 可以与 Kafka key
相同也可以不同:
- 相同:同一业务实体在 Kafka 与 Flink 中走固定分区/subtask,利于 局部性;但 Kafka 分区数与 Flink parallelism 仍可能不一致。
- 不同:例如 Kafka 按
deviceId分区,Flink 按userId聚合——跨分区重分布,shuffle 成本上升,但语义正确。
3.3 maxParallelism 与 key 类型
key 类型必须在 整个作业生命周期 内与 state 序列化器一致。改 key 类型或 maxParallelism 降档会导致 savepoint 不兼容(第 11 篇)。
四、Shuffle 策略详解
4.1 rebalance:强制 round-robin
用途:打破 Source 分区与 subtask
固定绑定 带来的热点。例如 Kafka
某分区消息量远大于其他分区,Source subtask 负载不均,在
Source 后加 rebalance() 可把 record
均匀打到下游。
代价:每条 record 一次网络 shuffle + 序列化。不要在最热路径上滥用。
4.2 keyBy:HASH 与 KeyedStream
keyBy 返回
KeyedStream,只能调用 keyed
算子(window、process with keyed
state 等):
keyedStream
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateTag)
.aggregate(new MyAggregate());Multiple key selectors 用
KeySelector lambda 或 KeySelector
类;复合键用 Tuple2 等,注意
equals/hashCode 语义。
4.3 broadcast:规则流与 Broadcast State
MapStateDescriptor<String, Rule> ruleDesc =
new MapStateDescriptor<>("rules", String.class, Rule.class);
BroadcastStream<Rule> rules = env
.fromSource(ruleSource, WatermarkStrategy.noWatermarks(), "rules")
.broadcast(ruleDesc);
DataStream<Alert> alerts = events
.connect(rules)
.process(new BroadcastProcessFunction<Event, Rule, Alert>() { /* ... */ });Broadcast State 是 Operator State 的一种特殊形式:每个 subtask 持有 完整规则副本,适合 小体积、低频更新 的配置流。大表 join 不应滥用 broadcast(内存 × parallelism 膨胀)。
4.4 分区策略选择表
| 目标 | 策略 |
|---|---|
| 同并行度管道内省 shuffle | 保持 forward(默认) |
| 纠正 Source 倾斜 | rebalance() |
| 按业务键聚合 / state | keyBy() |
| 小维表 / 规则下发 | broadcast() |
| 部分下游 subtask | rescale() |
五、ProcessFunction 与 TimerService
ProcessFunction 是 单条 record 级别 的底层 API,可注册 processing-time / event-time 定时器,访问 Keyed State 与 侧输出(来源:Flink Documentation,Process Function)。
5.1 与 MapFunction 的差异
| 能力 | MapFunction | ProcessFunction |
|---|---|---|
| 访问 Keyed State | 否 | 是 |
| 注册定时器 | 否 | 是 |
| 侧输出 | 否 | 是 |
| 每条 record 调用 | map(T) |
processElement(T, ctx, out) |
第
2 篇 的 allowed lateness 与
侧输出迟到数据,在 API 层常通过
ProcessFunction 或 WindowFunction
实现;窗口算子内部也依赖 timer 触发 cleanup。
5.2 事件时间定时器示例骨架
public class SessionGapFunction
extends KeyedProcessFunction<String, Event, SessionResult> {
private ValueState<Long> lastEventTs;
private ValueState<List<Event>> buffer;
@Override
public void open(OpenContext openContext) {
lastEventTs = getRuntimeContext().getState(
new ValueStateDescriptor<>("last-ts", Long.class));
// buffer 可用 ListState,见第 9 篇
}
@Override
public void processElement(Event event, Context ctx, Collector<SessionResult> out)
throws Exception {
long ts = event.getTimestamp();
// 注册 event-time timer:当 watermark >= ts + gap 时触发
ctx.timerService().registerEventTimeTimer(ts + SESSION_GAP_MS);
// 更新 state ...
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<SessionResult> out)
throws Exception {
// watermark 已推进到 timestamp,可安全关闭会话
emitSession(out);
lastEventTs.clear();
}
}TimerService 规则(来源:官方文档 Timely Stream Processing):
- Event-time timer 在 watermark ≥ 触发时间 时 fire;乱序事件可能导致 timer 注册顺序与 fire 顺序交错。
- Processing-time timer 在系统时钟推进时 fire,与 watermark 无关。
- 同一 key 的 timer 去重:相同 timestamp 的同类型 timer 只保留一个。
5.3 侧输出
OutputTag<Event> lateTag = new OutputTag<>("late-events") {};
// 在 window 或 process 中
ctx.output(lateTag, lateEvent);
// 主流程外消费
DataStream<Event> late = main.getSideOutput(lateTag);侧输出流 独立并行度,可单独 sink 到 Kafka dead-letter topic(第 4 篇 的独立 topic 治理)。
5.4 KeyedProcessFunction 与 CoProcessFunction
CoProcessFunction 连接两条流(一条
keyed、一条 broadcast 或 keyed),在
processElement1 / processElement2
分别处理;connect + keyBy 组合可实现
双流 join 的 state 维护。与
Interval Join(SQL/DataStream API
高层)相比,ProcessFunction 暴露完整 timer/state,适合 第
2 篇 的 自定义乱序边界。
注意:只有 KeyedStream
上的 KeyedProcessFunction 能按 key 注册
timer;非 keyed 的 ProcessFunction 无 key
上下文,timer 是 全局 per subtask
语义。
六、窗口算子在本篇的边界
第 3 篇 专讲 Tumbling / Sliding / Session 与 Trigger/Evictor。本篇只强调 运行时衔接:
window()必须在 KeyedStream 上调用(windowAll除外)。- 窗口 state 是 Keyed State,按 (key, windowId) 命名空间存储(第 9 篇)。
- Event-time 窗口 触发依赖 watermark;Processing-time 窗口 依赖 processing-time timer。
窗口聚合常用 AggregateFunction + ProcessWindowFunction 组合:前者增量计算省 state,后者在 fire 时拿全量上下文输出。
七、算子状态与键控状态:初探
Flink 两类 state(来源:Flink Documentation,Working with State):
| 类型 | 绑定对象 | 访问 API | 典型场景 |
|---|---|---|---|
| Operator State | 并行 subtask | ListState / UnionListState /
BroadcastState |
Kafka offset、缓冲批次、规则表 |
| Keyed State | 每个 key | ValueState / ListState /
MapState / … |
聚合、会话、去重 |
Keyed State 只能在 KeyedStream
上的算子 中访问(RichFunction 的
getRuntimeContext().getState(...))。Operator
State 在任意 RichFunction 上通过
getRuntimeContext().getOperatorStateStore()
注册。
7.1 Kafka Source 与 Operator State
Flink Kafka Source 把 partition → offset
存在 Operator State(或 split
枚举机制,Unified Source 框架下为
SourceReader 状态)。checkpoint 时写入
全部分区的 offset 快照(第 10
篇),与 第
5 篇 的 consumer offset commit
分工不同:Flink 默认
不依赖 Kafka
__consumer_offsets 的 auto commit,而以
checkpoint 为准。
7.2 何时用哪类 state
- 按 key 聚合、窗口、CEP:Keyed State。
- Source/Sink 与外部系统对齐位点:Operator State(或新版 Source API 内置)。
- 广播维表:Broadcast State(Operator State 子类)。
Keyed State 的五种结构与 TTL 在 第 9 篇 展开;RocksDB 路径在 第 12 篇。
八、Sink 与交付语义钩子
Sink 算子常是 独立 JobVertex(不与上游 chain),以便单独 metrics 与 2PC(第 7 篇)。
| Sink 类型 | 语义要点 |
|---|---|
print / collect |
调试,无 EOS |
| Kafka Sink | 事务 + checkpoint 对齐(第 6、15 篇) |
| 文件 / 湖 Sink | pre-commit 文件 + committer(lakehouse 第 19 章) |
SinkFunction 与 Sink
V2(Flink 1.12+ Sink 接口)并存;2.x
推荐 Unified Sink API。EOS 作业必须选用
支持两阶段提交 的 Sink 实现(第
14、15 篇)。
九、验证 shuffle 分布(实验思路)
目标:观察 rebalance vs 无
rebalance 时下游 subtask 记录数分布(PLAN
第 8 篇实验)。本环境未跑
Flink,给出可复现步骤:
- 本地起 Kafka KRaft + Flink(版本 1.20+ 或 2.x),向单分区 topic 灌入足够多事件(或故意只写 partition 0)。
- 作业
A:
source.map(record -> { subtaskIndex; return record; })统计各 subtask 计数,无 rebalance。 - 作业 B:在 map 前加
.rebalance()。 - 在 Flink Web UI Subtasks 看各并行实例
Records Sent,或通过 metric
numRecordsOutPerSecond聚合。
预期(工程推导,非本机实测):单分区 Source 无 rebalance 时,仅 一个 Source subtask 有输入,forward 到同 index 下游 subtask,分布极度不均;rebalance 后下游计数接近 均匀(± 抽样波动)。keyBy 则按 key 哈希,热点 key 导致 skew 而非均匀——这与 rebalance 目标不同。
// 统计用 map(示意)
public class SubtaskTagMap extends RichMapFunction<Event, Tuple2<Integer, Event>> {
@Override
public Tuple2<Integer, Event> map(Event value) {
return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
}
}十、Flink 2.x 与 DataStream V2 边界
| 项目 | Flink 1.20+ | Flink 2.x |
|---|---|---|
| DataStream V1 | 主线,本系列示例语言 | 仍支持 |
| DataStream V2 | 预览/部分 backport | 官方推荐新项目评估 |
| State V2 | — | 新 state 接口,TTL 语义延续 |
OpenContext |
1.20 引入,替代部分
open(Configuration) |
沿用 |
运行时 shuffle、KeyGroup、ExecutionGraph 对 V1/V2
一致。迁移 V2 时主要改 包名与
ProcessFunction
基类;本篇机制描述仍适用。
十一、与系列其他篇章的交叉引用
flowchart LR
SP08["本篇: shuffle / keyBy / ProcessFunction"]
SP02["02 事件时间与 watermark"]
SP03["03 窗口"]
SP07["07 运行时 JobVertex"]
SP09["09 Keyed State / TTL"]
SP10["10 Checkpoint"]
K05["05 Consumer offset"]
K06["06 Kafka 事务"]
SP08 --> SP02
SP08 --> SP03
SP08 --> SP07
SP08 --> SP09
SP09 --> SP10
SP08 --> K05
SP10 --> K06
- 第
2 篇:
WatermarkStrategy在 Source 上注入;ProcessFunction 的 event-time timer 受 watermark 驱动。 - 第 3
篇:窗口 state 与 Trigger;本篇
keyBy是窗口前置条件。 - 第 6 篇:EOS pipeline 的 Sink 侧事务;与本篇 Source 消费分区衔接。
- distributed 系列:record 路由 与 一致性 的直觉可对照日志复制,但 Flink shuffle 是 计算层 路由,不是 Kafka 副本。
十二、术语表
| 术语 | 含义 |
|---|---|
| KeyedStream | keyBy 后的流,启用 Keyed State |
| KeyGroup | keyed state 重分布的原子单位,个数 = maxParallelism |
| Shuffle | 跨 subtask 的网络数据交换 |
| ProcessFunction | 带 state / timer / 侧输出的底层算子 |
| TimerService | 注册 processing-time / event-time 定时器 |
| Operator State | 绑定 subtask 实例的状态 |
| Keyed State | 绑定 (operator, key) 的状态 |
| BroadcastState | 广播到所有 subtask 的 Operator State |
| Side Output | 主输出外的辅助流 |
十三、小结
DataStream 程序由 Source、Transformation、Sink 组成;Transformation 通过 分区策略 决定 record 是否 shuffle。keyBy 将 key 映射到 KeyGroup 再映射到 subtask,是 Keyed State 与窗口的水平扩展基础。ProcessFunction + TimerService 实现事件时间会话、迟到处理等 第 2 篇 机制。Operator State 管 Kafka offset 与广播规则;Keyed State 管 per-key 聚合——结构细节与 TTL 见 第 9 篇。
下一篇:五种 Keyed State、HashMap vs RocksDB 选型、State TTL 与状态大小估算。
参考资料
- Apache Flink Documentation, DataStream API(Source / Transformation / Sink)。
- Apache Flink Documentation, Stateful Stream Processing(KeyGroup、keyed state 模型)。
- Apache Flink Documentation, Process Function(ProcessFunction、TimerService、侧输出)。
- Apache Flink Documentation, Timely Stream Processing(event-time timer 与 watermark)。
- Apache Flink Documentation, Working with State / Working with State V2(Operator vs Keyed State)。
- Apache Flink 源码
StreamPartitioner、KeyGroupStreamPartitioner(分区与 KeyGroup 实现)。 - 本系列 第 7 篇(JobVertex、chain、并行度)。
- 本系列 第 4–6 篇(Kafka 分区、offset、事务)。
返回 系列目录 | 上一篇:Flink 运行时模型 | 下一篇:键控状态与 State TTL
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】键控状态与 State TTL
系统拆解 ValueState、ListState、MapState、ReducingState、AggregatingState 的语义与适用场景,对比 HashMapStateBackend 与 EmbeddedRocksDBStateBackend 选型,讲清 State TTL 的更新/可见性/清理策略,并给出窗口 state 与 RocksDB 磁盘占用的估算方法。
【流式数据处理】RocksDB State Backend 内核路径
拆解 Flink EmbeddedRocksDBStateBackend 的物理布局:每个 subtask 独立 RocksDB 实例、ColumnFamily 与 KeyGroup 前缀映射、写路径 memtable→WAL→flush→compaction 与 lsm-tree 系列对照、读路径 block cache 与读放大、增量 checkpoint 与全量 snapshot 的 IO 差异。
【流式数据处理】流处理全景:从日志到有状态计算
从批、流、微批四维度对比出发,建立「可重放日志 + 有状态计算」心智模型,厘清 Lambda/Kappa 边界与流表对偶,并给出与 lakehouse 入湖侧对称的全系列地图。
【流式数据处理】事件时间、处理时间与 Watermark
拆解 event time、processing time、ingestion time 三种时间语义,给出 watermark 的形式化含义与 bounded-out-of-orderness 等生成策略,并说明侧输出、allowed lateness 如何处理迟到数据;附 event-time 与 processing-time 窗口对比的可复现实验步骤。