土法炼钢兴趣小组的算法知识备份

【流式数据处理】DataStream 与算子语义

文章导航

分类入口
databasedistributed
标签入口
#flink#datastream#shuffle#keyby#keygroup#processfunction#timerservice#operator-state#keyed-state#broadcast-state

目录

第 7 篇 把 Flink 作业编译成 StreamGraph → JobGraph → ExecutionGraph,并说明 shuffle 只发生在 JobVertex 之间。本篇回答:用户写的 fromSourcemapkeyBywindow 在语义上各是什么?一条 record 何时必须走网络、何时可以 forward?keyBy 之后为什么才能用 ValueState?ProcessFunction 里的定时器和 第 2 篇 的 watermark 如何配合?

本文聚焦 Flink DataStream API(V1)——本系列主线。Flink 2.x 引入 DataStream V2State V2(见官方 Working with State V2),语义与 V1 对齐但接口包名不同;文末单独标注版本边界。不写 Flink SQL 优化器,也不写 Table API 到 DataStream 的转换细节。

先修:第 1 篇 的流表对偶;第 2、3 篇 的事件时间与窗口;第 4–6 篇 的 Kafka 分区有序与事务;第 7 篇 的 chain 与并行度。

环境说明:本机 WSL2,未安装 Flink。shuffle 分布实验给出 可复现的验证思路与代码骨架,不伪造 subtask 计数输出。架构与 API 行为以 Flink 1.20+ / 2.x 官方文档为准。


一、DataStream 程序的三段结构

一个典型作业在 API 层总是 Source → Transformation → Sink(来源:Flink Documentation,DataStream API):

flowchart LR
  SRC["Source<br/>Kafka / 文件 / 自定义"]
  TR["Transformation<br/>map / keyBy / window / connect"]
  SNK["Sink<br/>Kafka / JDBC / 湖 / 打印"]
  SRC --> TR --> SNK
阶段 职责 与 Kafka 对照
Source 持续读外部系统,注入 watermark(若配置) Consumer 拉分区、提交 offset(第 5 篇
Transformation 无界流上的算子逻辑,可能 shuffle 无直接 Kafka 对应;分区键类似 keyBy 前的业务键
Sink 写出结果,参与 EOS 时做 2PC(第 15 篇 Producer / 事务 commit(第 6 篇

DataStream<T> 是逻辑句柄:携带类型 \(T\)、分区策略、并行度、watermark 策略。对 DataStream 的每次 map/filter 返回新 DataStream,底层在 StreamGraph 上追加 StreamNode。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("events")
    .setGroupId("flink-demo")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> raw = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> parseTs(event)),
    "kafka-source");

DataStream<CountResult> counts = raw
    .flatMap(new Tokenizer())
    .keyBy(w -> w.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .sum(1);

counts.sinkTo(kafkaSink); // 或 print() 调试

env.execute("word-count-streaming");

1.1 Unified Source/Sink API(1.14+)

Flink 1.14 起推荐 Source/Sink v2 接口(Source / Sink),Kafka connector 的 KafkaSource / KafkaSink 已统一:

旧 API 新 API
FlinkKafkaConsumer KafkaSource + fromSource
FlinkKafkaProducer KafkaSink + sinkTo

Split 枚举(partition 作为 split)在 SourceReader 内并行拉取;offset 存 split state,语义仍属 Operator State 范畴(第 10 篇)。1.20+ 旧 consumer/producer 标记 deprecated,新作业应直接用 Unified API。

env.execute() 触发 StreamGraph 生成与提交(第 7 篇)。有界流execute() 同样成立,Source 读完后作业 FINISHED;无界流则长期 RUNNING


二、Transformation 分类与 One-to-One 边界

2.1 常见算子与是否引入 shuffle

算子 是否改变分区 典型 shuffle
map / flatMap / filter forward(同并行度)
keyBy HASH 按 key
rebalance / rescale REBALANCE / RESCALE
broadcast BROADCAST 到所有 subtask
window + 聚合 否(在 keyBy 之后) 依赖上游 keyBy
connect / coMap 视情况 常配合 broadcast

Physical 分区类型(源码 StreamPartitioner)决定 record 如何选下游 subtask:

2.2 隐式 shuffle:keyBy 与 rebalance

// 显式 rebalance:解决 Source 分区局部性导致的数据倾斜前兆
DataStream<Event> balanced = parsed.rebalance();

// keyBy:按业务键哈希,后续 keyed state / window 都绑定此键
KeyedStream<Event, String> keyed = balanced.keyBy(Event::getUserId);

没有 keyBy 就不能使用 Keyed State(ValueState 等,第 9 篇)。WindowAll 可以在非 keyed 流上做窗口,但 state 是 单并行度Operator State 语义,无法水平扩展 keyed 聚合。


三、keyBy 与 KeyGroup:状态分片的根

3.1 从 key 到 subtask

对 keyed 算子,Flink 使用两步映射(来源:Flink Documentation,Stateful Stream Processing):

  1. Key → KeyGroup:$ = (key, ) $
  2. KeyGroup → subtask:$ = $

KeyGroup 个数 = maxParallelism第 7 篇)。parallelism 变化时,同一 KeyGroup 仍映射到确定 subtask 集合,state 随 KeyGroup 迁移——这是 rescaling 的基础(第 11 篇)。

flowchart TB
  K["业务 key (userId)"]
  KG["KeyGroup id<br/>0 .. maxParallelism-1"]
  ST["subtask index<br/>0 .. parallelism-1"]
  K -->|"MurmurHash 等"| KG -->|"mod parallelism"| ST

MurmurHash 等哈希函数保证 key 均匀打散;若业务 key 本身倾斜(热点 userId),再均匀的哈希也无法救 subtask 负载不均——这是 第 18 篇 的数据倾斜主题,不是 shuffle 策略能单独解决的。

3.2 KeyGroup 与 Kafka 分区键

Kafka producer 按 record key 选分区(第 4 篇),保证 分区内有序。Flink keyBy 的键 可以与 Kafka key 相同也可以不同

3.3 maxParallelism 与 key 类型

key 类型必须在 整个作业生命周期 内与 state 序列化器一致。改 key 类型或 maxParallelism 降档会导致 savepoint 不兼容第 11 篇)。


四、Shuffle 策略详解

4.1 rebalance:强制 round-robin

用途:打破 Source 分区与 subtask 固定绑定 带来的热点。例如 Kafka 某分区消息量远大于其他分区,Source subtask 负载不均,在 Source 后加 rebalance() 可把 record 均匀打到下游。

代价:每条 record 一次网络 shuffle + 序列化。不要在最热路径上滥用。

4.2 keyBy:HASH 与 KeyedStream

keyBy 返回 KeyedStream,只能调用 keyed 算子(windowprocess with keyed state 等):

keyedStream
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateTag)
    .aggregate(new MyAggregate());

Multiple key selectorsKeySelector lambda 或 KeySelector 类;复合键用 Tuple2 等,注意 equals/hashCode 语义。

4.3 broadcast:规则流与 Broadcast State

MapStateDescriptor<String, Rule> ruleDesc =
    new MapStateDescriptor<>("rules", String.class, Rule.class);

BroadcastStream<Rule> rules = env
    .fromSource(ruleSource, WatermarkStrategy.noWatermarks(), "rules")
    .broadcast(ruleDesc);

DataStream<Alert> alerts = events
    .connect(rules)
    .process(new BroadcastProcessFunction<Event, Rule, Alert>() { /* ... */ });

Broadcast StateOperator State 的一种特殊形式:每个 subtask 持有 完整规则副本,适合 小体积、低频更新 的配置流。大表 join 不应滥用 broadcast(内存 × parallelism 膨胀)。

4.4 分区策略选择表

目标 策略
同并行度管道内省 shuffle 保持 forward(默认)
纠正 Source 倾斜 rebalance()
按业务键聚合 / state keyBy()
小维表 / 规则下发 broadcast()
部分下游 subtask rescale()

五、ProcessFunction 与 TimerService

ProcessFunction单条 record 级别 的底层 API,可注册 processing-time / event-time 定时器,访问 Keyed State侧输出(来源:Flink Documentation,Process Function)。

5.1 与 MapFunction 的差异

能力 MapFunction ProcessFunction
访问 Keyed State
注册定时器
侧输出
每条 record 调用 map(T) processElement(T, ctx, out)

第 2 篇allowed lateness侧输出迟到数据,在 API 层常通过 ProcessFunctionWindowFunction 实现;窗口算子内部也依赖 timer 触发 cleanup。

5.2 事件时间定时器示例骨架

public class SessionGapFunction
        extends KeyedProcessFunction<String, Event, SessionResult> {

    private ValueState<Long> lastEventTs;
    private ValueState<List<Event>> buffer;

    @Override
    public void open(OpenContext openContext) {
        lastEventTs = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-ts", Long.class));
        // buffer 可用 ListState,见第 9 篇
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<SessionResult> out)
            throws Exception {
        long ts = event.getTimestamp();
        // 注册 event-time timer:当 watermark >= ts + gap 时触发
        ctx.timerService().registerEventTimeTimer(ts + SESSION_GAP_MS);
        // 更新 state ...
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<SessionResult> out)
            throws Exception {
        // watermark 已推进到 timestamp,可安全关闭会话
        emitSession(out);
        lastEventTs.clear();
    }
}

TimerService 规则(来源:官方文档 Timely Stream Processing):

5.3 侧输出

OutputTag<Event> lateTag = new OutputTag<>("late-events") {};

// 在 window 或 process 中
ctx.output(lateTag, lateEvent);

// 主流程外消费
DataStream<Event> late = main.getSideOutput(lateTag);

侧输出流 独立并行度,可单独 sink 到 Kafka dead-letter topic(第 4 篇 的独立 topic 治理)。

5.4 KeyedProcessFunction 与 CoProcessFunction

CoProcessFunction 连接两条流(一条 keyed、一条 broadcast 或 keyed),在 processElement1 / processElement2 分别处理;connect + keyBy 组合可实现 双流 join 的 state 维护。与 Interval Join(SQL/DataStream API 高层)相比,ProcessFunction 暴露完整 timer/state,适合 第 2 篇自定义乱序边界

注意:只有 KeyedStream 上的 KeyedProcessFunction 能按 key 注册 timer;非 keyed 的 ProcessFunction 无 key 上下文,timer 是 全局 per subtask 语义。


六、窗口算子在本篇的边界

第 3 篇 专讲 Tumbling / Sliding / Session 与 Trigger/Evictor。本篇只强调 运行时衔接

窗口聚合常用 AggregateFunction + ProcessWindowFunction 组合:前者增量计算省 state,后者在 fire 时拿全量上下文输出。


七、算子状态与键控状态:初探

Flink 两类 state(来源:Flink Documentation,Working with State):

类型 绑定对象 访问 API 典型场景
Operator State 并行 subtask ListState / UnionListState / BroadcastState Kafka offset、缓冲批次、规则表
Keyed State 每个 key ValueState / ListState / MapState / … 聚合、会话、去重

Keyed State 只能在 KeyedStream 上的算子 中访问(RichFunctiongetRuntimeContext().getState(...))。Operator State 在任意 RichFunction 上通过 getRuntimeContext().getOperatorStateStore() 注册。

7.1 Kafka Source 与 Operator State

Flink Kafka Source 把 partition → offset 存在 Operator State(或 split 枚举机制,Unified Source 框架下为 SourceReader 状态)。checkpoint 时写入 全部分区的 offset 快照第 10 篇),与 第 5 篇 的 consumer offset commit 分工不同:Flink 默认 不依赖 Kafka __consumer_offsets 的 auto commit,而以 checkpoint 为准。

7.2 何时用哪类 state

Keyed State 的五种结构与 TTL 在 第 9 篇 展开;RocksDB 路径在 第 12 篇


八、Sink 与交付语义钩子

Sink 算子常是 独立 JobVertex(不与上游 chain),以便单独 metrics 与 2PC(第 7 篇)。

Sink 类型 语义要点
print / collect 调试,无 EOS
Kafka Sink 事务 + checkpoint 对齐(第 6、15 篇
文件 / 湖 Sink pre-commit 文件 + committer(lakehouse 第 19 章

SinkFunctionSink V2(Flink 1.12+ Sink 接口)并存;2.x 推荐 Unified Sink API。EOS 作业必须选用 支持两阶段提交 的 Sink 实现(第 14、15 篇)。


九、验证 shuffle 分布(实验思路)

目标:观察 rebalance vs 无 rebalance 时下游 subtask 记录数分布(PLAN 第 8 篇实验)。本环境未跑 Flink,给出可复现步骤:

  1. 本地起 Kafka KRaft + Flink(版本 1.20+ 或 2.x),向单分区 topic 灌入足够多事件(或故意只写 partition 0)。
  2. 作业 A:source.map(record -> { subtaskIndex; return record; }) 统计各 subtask 计数,无 rebalance
  3. 作业 B:在 map 前加 .rebalance()
  4. 在 Flink Web UI Subtasks 看各并行实例 Records Sent,或通过 metric numRecordsOutPerSecond 聚合。

预期(工程推导,非本机实测):单分区 Source 无 rebalance 时,仅 一个 Source subtask 有输入,forward 到同 index 下游 subtask,分布极度不均;rebalance 后下游计数接近 均匀(± 抽样波动)。keyBy 则按 key 哈希,热点 key 导致 skew 而非均匀——这与 rebalance 目标不同。

// 统计用 map(示意)
public class SubtaskTagMap extends RichMapFunction<Event, Tuple2<Integer, Event>> {
    @Override
    public Tuple2<Integer, Event> map(Event value) {
        return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
    }
}

项目 Flink 1.20+ Flink 2.x
DataStream V1 主线,本系列示例语言 仍支持
DataStream V2 预览/部分 backport 官方推荐新项目评估
State V2 新 state 接口,TTL 语义延续
OpenContext 1.20 引入,替代部分 open(Configuration) 沿用

运行时 shuffle、KeyGroup、ExecutionGraph 对 V1/V2 一致。迁移 V2 时主要改 包名与 ProcessFunction 基类;本篇机制描述仍适用。


十一、与系列其他篇章的交叉引用

flowchart LR
  SP08["本篇: shuffle / keyBy / ProcessFunction"]
  SP02["02 事件时间与 watermark"]
  SP03["03 窗口"]
  SP07["07 运行时 JobVertex"]
  SP09["09 Keyed State / TTL"]
  SP10["10 Checkpoint"]
  K05["05 Consumer offset"]
  K06["06 Kafka 事务"]
  SP08 --> SP02
  SP08 --> SP03
  SP08 --> SP07
  SP08 --> SP09
  SP09 --> SP10
  SP08 --> K05
  SP10 --> K06

十二、术语表

术语 含义
KeyedStream keyBy 后的流,启用 Keyed State
KeyGroup keyed state 重分布的原子单位,个数 = maxParallelism
Shuffle 跨 subtask 的网络数据交换
ProcessFunction 带 state / timer / 侧输出的底层算子
TimerService 注册 processing-time / event-time 定时器
Operator State 绑定 subtask 实例的状态
Keyed State 绑定 (operator, key) 的状态
BroadcastState 广播到所有 subtask 的 Operator State
Side Output 主输出外的辅助流

十三、小结

DataStream 程序由 Source、Transformation、Sink 组成;Transformation 通过 分区策略 决定 record 是否 shuffle。keyBy 将 key 映射到 KeyGroup 再映射到 subtask,是 Keyed State 与窗口的水平扩展基础。ProcessFunction + TimerService 实现事件时间会话、迟到处理等 第 2 篇 机制。Operator State 管 Kafka offset 与广播规则;Keyed State 管 per-key 聚合——结构细节与 TTL 见 第 9 篇

下一篇:五种 Keyed State、HashMap vs RocksDB 选型、State TTL 与状态大小估算


参考资料

  1. Apache Flink Documentation, DataStream API(Source / Transformation / Sink)。
  2. Apache Flink Documentation, Stateful Stream Processing(KeyGroup、keyed state 模型)。
  3. Apache Flink Documentation, Process Function(ProcessFunction、TimerService、侧输出)。
  4. Apache Flink Documentation, Timely Stream Processing(event-time timer 与 watermark)。
  5. Apache Flink Documentation, Working with State / Working with State V2(Operator vs Keyed State)。
  6. Apache Flink 源码 StreamPartitionerKeyGroupStreamPartitioner(分区与 KeyGroup 实现)。
  7. 本系列 第 7 篇(JobVertex、chain、并行度)。
  8. 本系列 第 4–6 篇(Kafka 分区、offset、事务)。

返回 系列目录 | 上一篇:Flink 运行时模型 | 下一篇:键控状态与 State TTL

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】键控状态与 State TTL

系统拆解 ValueState、ListState、MapState、ReducingState、AggregatingState 的语义与适用场景,对比 HashMapStateBackend 与 EmbeddedRocksDBStateBackend 选型,讲清 State TTL 的更新/可见性/清理策略,并给出窗口 state 与 RocksDB 磁盘占用的估算方法。

2026-07-01 · database / distributed

【流式数据处理】RocksDB State Backend 内核路径

拆解 Flink EmbeddedRocksDBStateBackend 的物理布局:每个 subtask 独立 RocksDB 实例、ColumnFamily 与 KeyGroup 前缀映射、写路径 memtable→WAL→flush→compaction 与 lsm-tree 系列对照、读路径 block cache 与读放大、增量 checkpoint 与全量 snapshot 的 IO 差异。

2026-07-01 · database / distributed

【流式数据处理】事件时间、处理时间与 Watermark

拆解 event time、processing time、ingestion time 三种时间语义,给出 watermark 的形式化含义与 bounded-out-of-orderness 等生成策略,并说明侧输出、allowed lateness 如何处理迟到数据;附 event-time 与 processing-time 窗口对比的可复现实验步骤。


By .