一个支付系统每秒接收数万笔交易事件,需要按 5 分钟窗口统计每个商户的交易总额。问题来了:网络延迟导致一笔 14:59:58 产生的交易在 15:00:03 才到达系统。如果按数据到达时间(处理时间)划分窗口,这笔交易会被错误地归入 15:00-15:05 的窗口,导致 14:55-15:00 窗口少算了一笔,15:00-15:05 窗口多算了一笔。更麻烦的是,系统重启后从上一个检查点恢复,部分交易会被重新处理——如果没有去重机制,交易金额就会被重复计算。
这两个问题——乱序数据的正确归属和故障恢复后的精确一次语义——构成了流计算引擎设计的核心挑战。Apache Flink 对这两个问题给出了系统性的工程解答。本文从事件时间与 Watermark、状态管理、Checkpoint 机制、Exactly-Once 语义、窗口语义、反压机制和流批一体架构七个维度,拆解 Flink 的内部实现。
一、事件时间与 Watermark
1.1 三种时间语义
Flink 定义了三种时间语义:
事件时间(Event Time):事件实际发生的时间,由数据生产者写入事件本身。上面支付系统的例子中,14:59:58 就是事件时间。事件时间不受传输延迟、系统时钟偏移和处理速度的影响,保证了结果的确定性——同一批数据无论何时处理、以什么速度处理,结果都一样。
摄入时间(Ingestion Time):事件进入 Flink Source 算子的时间。摄入时间在 Source 端一次性赋值后不再改变,比处理时间更稳定(不受下游算子处理速度影响),但仍然无法反映事件的真实发生时间。Flink 1.12 之后,摄入时间不再作为独立概念,而是通过在 Source 端赋值事件时间来等效实现。
处理时间(Processing Time):算子处理该事件时的本地系统时钟。处理时间的优势是零延迟(不需要等待乱序数据)和低开销(不需要维护 Watermark),但结果不可重现——同一批数据在不同时刻处理,窗口归属可能完全不同。
三者的关系可以这样理解:事件时间由数据决定,摄入时间由 Source 决定,处理时间由算子决定。大多数需要结果正确性的场景应该使用事件时间。
1.2 Watermark 机制
事件时间带来了一个根本性的问题:系统如何知道一个窗口的数据已经到齐?在处理时间语义下,窗口结束时刻到了就可以触发计算。但在事件时间语义下,即使当前系统时间已经过了 15:00,仍然可能有事件时间在 14:55-15:00 之间的数据还在路上。
Watermark 是 Flink 解决这个问题的机制。Watermark 是一个时间戳 W,表达的语义是:事件时间小于 W 的数据已经全部到达。当一个窗口的结束时间小于等于当前 Watermark 时,该窗口被触发计算。
Watermark 的生成有两种典型策略:
有序流的 Watermark:如果数据源保证有序(例如单分区的 Kafka Topic),Watermark 可以直接取当前最大事件时间。
WatermarkStrategy
.<PaymentEvent>forMonotonousTimestamps()
.withTimestampAssigner((event, ts) -> event.getTimestamp());乱序流的 Watermark:允许一定程度的乱序,通过设置最大乱序容忍度来生成 Watermark。如果最大乱序容忍度设为 5 秒,当前观察到的最大事件时间为 T,则 Watermark = T - 5s。
WatermarkStrategy
.<PaymentEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getTimestamp());Watermark
生成后,随数据流向下游传播。当一个算子有多个输入通道时,取所有输入通道
Watermark 的最小值作为该算子的
Watermark——这确保了不会遗漏任何一个输入通道上的迟到数据,但也意味着最慢的输入通道会拖慢整个算子的
Watermark 推进速度。这种”短板效应”在实践中需要关注:如果某个
Kafka 分区长时间没有数据,其 Watermark
会停滞不前,导致下游窗口无法触发。Flink 通过
withIdleness()
配置来应对这个问题,将长时间没有数据的分区标记为空闲,不再参与
Watermark 计算。
WatermarkStrategy
.<PaymentEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getTimestamp())
.withIdleness(Duration.ofMinutes(1));1.3 迟到数据处理
即使设置了乱序容忍度,仍然可能有数据在 Watermark 之后才到达——这些就是迟到数据(Late Data)。Flink 提供三种处理策略:
丢弃(默认):迟到数据被直接丢弃。
允许迟到(Allowed Lateness):窗口在 Watermark 触发后不立即清除,而是保留一段额外时间。在此期间到达的迟到数据会触发窗口的重新计算,输出更新结果。
stream
.keyBy(PaymentEvent::getMerchantId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.aggregate(new SumAggregate());侧输出(Side Output):将迟到数据导入一个旁路流,由用户自行决定如何处理——可以写入日志、存入专门的迟到数据表、或者做人工修正。
OutputTag<PaymentEvent> lateTag = new OutputTag<>("late-data"){};
SingleOutputStreamOperator<Result> result = stream
.keyBy(PaymentEvent::getMerchantId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sideOutputLateData(lateTag)
.aggregate(new SumAggregate());
DataStream<PaymentEvent> lateStream = result.getSideOutput(lateTag);Watermark 本质上是在”结果正确性”和”结果及时性”之间做权衡。乱序容忍度越大,等待时间越长,结果越完整;容忍度越小,延迟越低,但可能丢弃更多迟到数据。没有普适的最优值,需要根据业务的容忍度来调整。
1.4 Watermark 推进时间线:一个点击流的完整示例
以一个电商平台的页面点击流场景为例。系统使用基于事件时间的
5 分钟滚动窗口统计页面点击量,Watermark 的乱序容忍度设置为 3
秒。当前关注的窗口区间为
[10:00:00, 10:05:00)。下面的时序图展示了
Watermark
在整个窗口生命周期中的推进过程,以及迟到数据到达时的处理流程:
sequenceDiagram
participant Source as 数据源
participant WM as Watermark生成器
participant Window as 窗口算子
participant Output as 输出
Source->>WM: 点击事件 event_time=10:03:20
WM->>Window: Watermark=10:03:17
Note over Window: 窗口[10:00,10:05)继续收集数据
Source->>WM: 点击事件 event_time=10:04:55
WM->>Window: Watermark=10:04:52
Note over Window: 窗口[10:00,10:05)继续收集数据
Source->>WM: 点击事件 event_time=10:05:04
WM->>Window: Watermark=10:05:01
Window->>Output: 触发窗口[10:00,10:05)输出结果
Note over Window: Watermark越过窗口结束时间
Source->>WM: 迟到事件 event_time=10:04:58
Note over WM: event_time < Watermark
WM->>Window: 迟到数据到达
Note over Window: 根据策略处理:丢弃/更新/侧输出
上图清晰地展示了 Watermark
的推进机制:每次事件到达时,Watermark 被更新为
event_time - 乱序容忍度(即 3 秒)。当
Watermark 越过窗口结束时间 10:05:00
时,窗口被触发输出结果。此后到达的事件时间仍落在窗口范围内的数据,即被判定为迟到数据。
在上述示例中,事件时间为 10:04:58 的点击在 Watermark 已推进至 10:05:01 之后才到达,属于迟到数据。Flink 提供三种策略处理这类情况:
- 丢弃(默认策略):迟到的点击被静默丢弃,窗口的输出结果保持不变。这种方式延迟最低,适合对精度要求不高的实时大盘统计场景。
- allowedLateness(允许迟到):通过设置
.allowedLateness(Time.minutes(1)),窗口在 Watermark 到达 10:06:00 之前保持可更新状态。迟到的点击会触发窗口的重新计算并输出更新后的结果。代价是窗口状态的保留时间更长,增加了内存和存储开销。 - 侧输出(Side
Output):迟到的点击被路由到一个独立的数据流,例如写入名为
late_clicks的 Kafka Topic,供下游系统做后续对账和修正。这种方式兼顾了主流程的低延迟和迟到数据的完整保留。
二、状态管理
流计算与批计算的根本区别在于状态。批计算可以多次扫描完整数据集,中间结果只在作业运行期间存在。流计算面对的是无界数据,算子必须在内存中维护状态来跟踪已处理的数据:窗口聚合需要保存中间结果,去重需要保存已见过的 Key,CEP 需要保存状态机的当前位置。
2.1 Keyed State
Keyed State 是最常用的状态类型,与 KeyedStream 上的每个 Key 关联。Flink 提供五种 Keyed State 原语:
| 类型 | 语义 | 典型场景 |
|---|---|---|
ValueState<T> |
每个 Key 存一个值 | 最近一次交易金额 |
ListState<T> |
每个 Key 存一个列表 | 最近 N 条事件 |
MapState<K, V> |
每个 Key 存一个 Map | 维度属性聚合 |
ReducingState<T> |
自动增量聚合 | 累加和、最大值 |
AggregatingState<IN, OUT> |
自定义增量聚合 | 自定义统计 |
以 ValueState
为例,一个统计每个用户连续登录失败次数的实现:
public class LoginFailDetector extends KeyedProcessFunction<String, LoginEvent, Alert> {
private ValueState<Integer> failCountState;
private ValueState<Long> firstFailTimeState;
@Override
public void open(Configuration params) {
failCountState = getRuntimeContext().getState(
new ValueStateDescriptor<>("fail-count", Integer.class));
firstFailTimeState = getRuntimeContext().getState(
new ValueStateDescriptor<>("first-fail-time", Long.class));
}
@Override
public void processElement(LoginEvent event, Context ctx, Collector<Alert> out)
throws Exception {
if (event.getType() == LoginEvent.Type.FAIL) {
Integer count = failCountState.value();
if (count == null) {
failCountState.update(1);
firstFailTimeState.update(event.getTimestamp());
ctx.timerService().registerEventTimeTimer(
event.getTimestamp() + 10_000);
} else {
failCountState.update(count + 1);
}
if (failCountState.value() >= 3) {
out.collect(new Alert(event.getUserId(), "3 consecutive login failures"));
failCountState.clear();
firstFailTimeState.clear();
}
} else {
failCountState.clear();
firstFailTimeState.clear();
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
failCountState.clear();
firstFailTimeState.clear();
}
}2.2 Operator State
Operator State 与算子实例(而非 Key)绑定,所有流过该算子实例的数据共享同一份状态。典型用例是 Kafka Source 保存每个分区的消费位移(Offset)。Operator State 在算子并行度变化时需要重新分配——Flink 支持 Even-split(均匀切分)和 Union(每个新实例获得完整状态)两种重分配策略。
public class CountingMapper implements MapFunction<String, Tuple2<String, Long>>,
CheckpointedFunction {
private transient ListState<Long> countState;
private long localCount = 0;
@Override
public Tuple2<String, Long> map(String value) {
localCount++;
return Tuple2.of(value, localCount);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
countState.clear();
countState.add(localCount);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
countState = context.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("count", Long.class));
if (context.isRestored()) {
for (Long count : countState.get()) {
localCount += count;
}
}
}
}2.3 State Backend
State Backend 决定了状态的存储方式和访问性能:
HashMapStateBackend(原 MemoryStateBackend / FsStateBackend):状态以 Java 对象形式存储在 TaskManager 的 JVM 堆内存中。读写速度快(直接内存访问),但受 JVM 堆大小限制。GC 压力随状态量增大而显著增加——当状态超过几 GB 时,Full GC 暂停可能导致心跳超时,被 JobManager 判定为 TaskManager 失联。适用于状态量较小(几 GB 以内)或对延迟极敏感的场景。
EmbeddedRocksDBStateBackend(原 RocksDBStateBackend):状态以序列化字节存储在 RocksDB 的 LSM-Tree 中,数据最终落盘到本地磁盘。优势是可以支撑远超内存容量的状态(TB 级别),且不受 JVM GC 影响。代价是每次状态访问都需要序列化/反序列化,读写延迟比内存后端高一到两个数量级。RocksDB 后端还支持增量 Checkpoint(后文详述)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");上面代码中 EmbeddedRocksDBStateBackend
构造参数 true 表示启用增量 Checkpoint。
2.4 State TTL
长时间运行的流作业会持续累积状态。如果不主动清理过期状态,状态量会无限增长。State TTL(Time-To-Live)为每个状态条目设置过期时间,过期后在访问时被清除或在后台定时清理。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(
StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000)
.build();
ValueStateDescriptor<UserProfile> descriptor =
new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);setUpdateType 控制何时刷新
TTL:OnCreateAndWrite
表示写入时重置,OnReadAndWrite
表示读写都重置。setStateVisibility
控制是否允许读到已过期但未清理的状态。cleanupInRocksdbCompactFilter
利用 RocksDB Compaction
过程顺带清除过期数据,避免单独扫描的开销。
下图展示了 State TTL 的完整生命周期,从状态条目的创建、TTL 刷新、过期到最终清除的全过程:
stateDiagram-v2
[*] --> Active: 创建状态条目
Active --> Active: 读写操作(OnReadAndWrite时刷新TTL)
Active --> Approaching: TTL剩余时间不足
Approaching --> Expired: TTL超时
Expired --> Cleaned: 访问时清除 / RocksDB Compaction清除
Cleaned --> [*]
Active --> Cleaned: 用户主动clear()
Note right of Expired: NeverReturnExpired模式下\n读取返回null
该状态图揭示了 TTL
管理的两个关键设计决策。首先是刷新语义的选择:OnCreateAndWrite
模式下 TTL
仅在状态写入时重置,适合”最后一次更新后超时淘汰”的场景(如会话超时);而
OnReadAndWrite 模式下每次读取也会刷新
TTL,适合”最近一次访问后超时淘汰”的场景(如热点缓存)。其次是过期清除的时机:过期状态并不会在
TTL
到达的瞬间被立即删除,而是通过两种途径异步清除——下一次访问时的惰性清除,以及
RocksDB Compaction 过程中的后台批量清除。后者通过
cleanupInRocksdbCompactFilter
配置,在不阻塞数据处理的前提下回收存储空间。
三、Checkpoint 机制
3.1 从 Chandy-Lamport 到 Flink Checkpoint
分布式快照(Distributed Snapshot)的理论基础是 Chandy 和 Lamport 在 1985 年提出的算法。其核心思想是:在不暂停整个系统的前提下,通过在通信通道中插入标记消息(Marker),将系统状态切割为”标记前”和”标记后”两部分,从而获得一个全局一致的快照。
Flink 的 Checkpoint 机制是 Chandy-Lamport 算法在流计算场景下的工程实现,核心改进点包括:
- 将 Marker 改为 Checkpoint Barrier,直接嵌入数据流中随数据一起传输;
- 利用流计算 DAG 的有向无环特性,简化了标记传播逻辑;
- 引入异步快照,算子在发送 Barrier 后继续处理数据,状态写入在后台进行;
- 支持增量 Checkpoint,只持久化自上次 Checkpoint 以来变化的状态。
3.2 Barrier 传播与对齐
Checkpoint 的触发流程如下:
第一步:JobManager 注入 Barrier。 JobManager 的 CheckpointCoordinator 周期性地向所有 Source 算子发送触发 Checkpoint 的 RPC 消息。每个 Source 算子收到后,在当前数据流位置插入一个 Checkpoint Barrier(标记为 Bn,n 是 Checkpoint 编号),然后将 Barrier 向下游广播。
第二步:Barrier 对齐(Barrier Alignment)。 当一个算子有多个输入通道时(例如 keyBy 后的算子),各通道的 Barrier 到达时间通常不同。算子在收到第一个通道的 Barrier 后,暂停消费该通道的后续数据(将其缓冲),继续正常消费其他通道的数据,直到所有通道的 Barrier 都到齐。这就是 Barrier 对齐。对齐确保了快照只包含 Barrier 之前的数据处理结果,保证了 Exactly-Once 语义。
下图展示了这个过程:
第三步:状态快照。 所有 Barrier 对齐后,算子对当前状态做快照。快照过程可以是同步的(阻塞数据处理)或异步的(先做一份状态的 Copy-on-Write 副本,然后在后台线程写入持久化存储)。RocksDB 后端天然支持异步快照——利用 RocksDB 的 Snapshot 机制创建一个轻量级的一致性视图,然后在后台上传。
第四步:确认完成。 算子将快照写入分布式文件系统(HDFS、S3)后,向 JobManager 发送确认消息。当所有算子都确认后,JobManager 将该 Checkpoint 标记为完成。
3.3 Exactly-Once 与 At-Least-Once 的选择
Barrier 对齐是实现 Exactly-Once 的关键,但它会引入额外延迟:被阻塞的通道上的数据必须等待其他通道的 Barrier 到达后才能被处理。在数据倾斜严重的场景下(某些通道数据量远大于其他通道),对齐等待时间可能很长。
如果应用可以容忍重复处理,Flink 允许关闭 Barrier 对齐,改为 At-Least-Once 模式。此时算子收到第一个 Barrier 后不阻塞任何通道,继续正常处理所有数据。当所有 Barrier 到齐时再做快照。这意味着快照中可能包含部分 Barrier 之后的数据处理结果——故障恢复后这些数据会被重新处理,造成重复。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);3.4 增量 Checkpoint
全量 Checkpoint 每次将完整状态写入持久化存储。当状态达到 TB 级别时,每次 Checkpoint 的 I/O 开销和持续时间都不可接受。
增量 Checkpoint(Incremental Checkpoint)基于 RocksDB 的 SST 文件特性实现。RocksDB 使用 LSM-Tree 存储结构,数据最终以不可变的 SST(Sorted String Table)文件形式存在磁盘上。增量 Checkpoint 只上传自上一次 Checkpoint 以来新生成的 SST 文件,并记录当前所有 SST 文件的引用列表。恢复时,通过引用列表收集所有 SST 文件,重建完整的 RocksDB 实例。
增量 Checkpoint 的前提条件是使用 EmbeddedRocksDBStateBackend。HashMapStateBackend 不支持增量 Checkpoint,因为它在 JVM 堆中以 Java 对象形式存储状态,无法区分”增量”。
3.5 非对齐 Checkpoint
Flink 1.11 引入了非对齐 Checkpoint(Unaligned Checkpoint),进一步解决 Barrier 对齐在反压场景下的问题。
常规对齐 Checkpoint 在反压时面临恶性循环:下游处理慢导致反压,反压导致 Barrier 在网络缓冲区中排队,Barrier 传播变慢导致对齐时间增加,对齐期间阻塞的数据进一步加剧反压。严重时 Checkpoint 可能超时失败。
非对齐 Checkpoint 的做法是:Barrier 可以”超越”数据记录,直接跳到缓冲区头部传播到下游。算子不再等待所有通道的 Barrier 对齐,而是将各通道缓冲区中尚未处理的数据(Barrier 之前的数据)作为快照的一部分保存。恢复时,先恢复状态,再重放缓冲区中的数据。
非对齐 Checkpoint 的代价:快照体积增大(包含了网络缓冲区数据),恢复时间可能变长。适用于反压频繁、Checkpoint 超时问题严重的场景。
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));上面的配置表示:先尝试对齐 Checkpoint,如果 30 秒内未完成对齐,自动切换为非对齐模式。
四、Exactly-Once 语义
Checkpoint 本身只保证 Flink 内部状态的一致性——故障恢复后,所有算子回到一致的历史状态,从 Source 重新消费数据。但如果 Sink 在 Checkpoint 之后、故障之前已经向外部系统写入了数据,恢复后这些数据会被重复写入。端到端(End-to-End)的 Exactly-Once 需要 Source 和 Sink 的配合。
4.1 Source 端:可重放
Source 需要支持从指定位置重新消费数据。Kafka Source 天然满足——消费位移(Offset)保存在 Checkpoint 中,恢复时从保存的 Offset 开始重新消费。如果 Source 不支持重放(例如从 TCP Socket 读取),则端到端 Exactly-Once 无法实现。
4.2 Sink 端:两阶段提交
Flink 通过两阶段提交(Two-Phase Commit,2PC)协议实现支持事务的外部系统的 Exactly-Once 写入。以 Kafka Sink 为例:
预提交阶段(Pre-commit):Sink 在每个
Checkpoint 周期内,将所有待写入数据放入 Kafka 事务中(调用
KafkaProducer.send()),但不提交事务。当
Checkpoint Barrier 到达 Sink 时,Sink 调用
KafkaProducer.flush() 确保数据发送完毕,然后将
Kafka 事务 ID 保存到 Checkpoint
状态中。此时事务处于”预提交”状态——数据已写入 Kafka
但对消费者不可见。
提交阶段(Commit):JobManager
确认所有算子的 Checkpoint 完成后,通知 Sink 提交事务。Sink
调用
KafkaProducer.commitTransaction(),数据对消费者可见。
中止(Abort):如果 Checkpoint 失败,事务被中止,预提交的数据被丢弃。
这个机制要求 Kafka 消费者将隔离级别设为
read_committed,否则会读到未提交的数据。Kafka
事务有超时时间(默认 15 分钟),Checkpoint
间隔必须小于事务超时,否则事务会被 Kafka Broker
自动中止。
Flink 1.14 引入了 TwoPhaseCommitSinkFunction
的泛化抽象,用户可以针对任意支持事务的外部系统实现 2PC
Sink。
4.3 幂等 Sink
对于不支持事务但支持幂等写入的系统(例如按主键 Upsert 的数据库),可以通过幂等性保证 Exactly-Once。重复写入同一个主键的相同值不会改变最终结果。但幂等 Sink 有一个限制:在故障恢复到 Checkpoint 完成之间,外部系统的数据可能短暂不一致(旧数据被覆盖为更早的版本)。
JdbcSink.sink(
"INSERT INTO payment_summary (merchant_id, window_start, total_amount) " +
"VALUES (?, ?, ?) " +
"ON CONFLICT (merchant_id, window_start) " +
"DO UPDATE SET total_amount = EXCLUDED.total_amount",
(ps, result) -> {
ps.setString(1, result.merchantId);
ps.setTimestamp(2, result.windowStart);
ps.setBigDecimal(3, result.totalAmount);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://localhost:5432/payments")
.withDriverName("org.postgresql.Driver")
.build()
);4.4 端到端 Exactly-Once 的条件
综合以上分析,端到端 Exactly-Once 需要三个条件同时满足:
- Source 支持从指定位置重新消费(如 Kafka Offset、文件位置);
- Flink 内部开启 Exactly-Once Checkpoint(Barrier 对齐);
- Sink 支持事务写入(2PC)或幂等写入。
缺少任何一个,都只能退化为 At-Least-Once 或 At-Most-Once。
五、窗口语义
5.1 窗口类型
Flink 提供四种内置窗口:
滚动窗口(Tumbling Window):固定大小,窗口之间不重叠。例如每 5 分钟一个窗口:[00:00, 00:05)、[00:05, 00:10)、[00:10, 00:15)。
stream
.keyBy(PaymentEvent::getMerchantId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sum("amount");滑动窗口(Sliding Window):固定大小,窗口之间按指定步长滑动,允许重叠。例如窗口大小 10 分钟、滑动步长 5 分钟,则每条数据最多属于两个窗口。
stream
.keyBy(PaymentEvent::getMerchantId)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
.sum("amount");会话窗口(Session Window):没有固定大小,根据数据之间的间隔(Session Gap)动态划分。如果两条连续数据的时间间隔超过 Gap,则属于不同的会话窗口。典型应用:用户行为分析中,用户停止操作一段时间后认为一次会话结束。
stream
.keyBy(UserAction::getUserId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.process(new SessionAnalyzer());会话窗口的实现比固定窗口复杂得多。新数据到达时可能导致两个相邻会话窗口合并(Merge)——例如一条迟到数据填补了两个会话之间的间隔。Flink
的 MergingWindowAssigner
负责处理窗口合并逻辑。
全局窗口(Global Window):所有数据归入一个全局窗口,永不触发。需要配合自定义 Trigger 使用,否则数据只进不出。
5.2 窗口函数
窗口的计算逻辑由窗口函数定义。Flink 提供三类窗口函数:
增量聚合函数:ReduceFunction、AggregateFunction。数据到达窗口时立即参与聚合,窗口内只保存聚合结果(而非全量数据)。空间复杂度
O(1)(相对于窗口大小),适用于简单聚合(求和、计数、最大值)。
全量窗口函数:ProcessWindowFunction。窗口触发时才开始计算,此时可以访问窗口内的全部数据和窗口元信息(窗口起止时间、Key)。空间复杂度
O(n),适用于需要全量数据的计算(中位数、排序、复杂业务逻辑)。
组合使用:先用增量聚合减少数据量,再用
ProcessWindowFunction
访问窗口元信息。这是推荐的做法——兼顾性能和灵活性。
stream
.keyBy(PaymentEvent::getMerchantId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new SumAggregate(), new WindowResultFunction());
public class SumAggregate
implements AggregateFunction<PaymentEvent, BigDecimal, BigDecimal> {
@Override
public BigDecimal createAccumulator() {
return BigDecimal.ZERO;
}
@Override
public BigDecimal add(PaymentEvent event, BigDecimal acc) {
return acc.add(event.getAmount());
}
@Override
public BigDecimal getResult(BigDecimal acc) {
return acc;
}
@Override
public BigDecimal merge(BigDecimal a, BigDecimal b) {
return a.add(b);
}
}
public class WindowResultFunction
extends ProcessWindowFunction<BigDecimal, PaymentSummary, String, TimeWindow> {
@Override
public void process(String merchantId, Context context,
Iterable<BigDecimal> elements,
Collector<PaymentSummary> out) {
BigDecimal total = elements.iterator().next();
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
out.collect(new PaymentSummary(merchantId, windowStart, windowEnd, total));
}
}5.3 Trigger 与 Evictor
Trigger 决定窗口何时触发计算。默认 Trigger 在 Watermark 超过窗口结束时间时触发(事件时间窗口)或在系统时间超过窗口结束时间时触发(处理时间窗口)。用户可以自定义 Trigger,实现更复杂的触发策略,例如每收到 1000 条数据或每隔 10 秒提前输出一次中间结果。
public class CountOrTimeTrigger extends Trigger<Object, TimeWindow> {
private final long maxCount;
private final ReducingStateDescriptor<Long> countDesc =
new ReducingStateDescriptor<>("count", Long::sum, Long.class);
public CountOrTimeTrigger(long maxCount) {
this.maxCount = maxCount;
}
@Override
public TriggerResult onElement(Object element, long timestamp,
TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(countDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE;
}
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window,
TriggerContext ctx) {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window,
TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(countDesc).clear();
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
}Evictor
在窗口触发前或触发后移除窗口中的部分数据。例如
CountEvictor 只保留最近 N
条数据,TimeEvictor
只保留最近一段时间的数据。Evictor
会阻止增量聚合优化——因为需要在触发时访问和操作全量数据。除非确实需要在触发前过滤数据,否则不建议使用
Evictor。
六、反压机制
6.1 反压的产生
流计算系统中,上下游算子的处理速度很难始终匹配。当下游算子处理变慢(磁盘 I/O 抖动、GC 暂停、外部系统响应变慢),上游产生的数据会在网络缓冲区中堆积。如果不加控制,缓冲区溢出会导致数据丢失或系统崩溃。反压(Backpressure)是一种自适应的流量控制机制:当下游无法及时消费数据时,信号向上游传播,使上游主动降速。
6.2 Credit-Based 流控
Flink 1.5 之后采用 Credit-Based 流控机制,在 Flink 自身的网络层(而非依赖 TCP 的流控)实现精细的反压传播。
核心概念:
Network Buffer:Flink 的数据传输以 Buffer 为单位。每个算子的输入通道(InputChannel)和输出通道(ResultSubpartition)各自持有一组 Network Buffer。
Credit:下游算子的 InputChannel 持有的可用 Buffer 数量就是 Credit。下游通过 Backlog 消息告知上游自己当前的 Credit 值——实质上是在说”我还有多少空间可以接收数据”。
流控过程:上游的 ResultSubpartition 向下游发送数据时,每发送一个 Buffer,消耗一个 Credit。当 Credit 降为 0,上游停止向该通道发送数据,但不影响其他通道。下游消费了数据、释放了 Buffer 后,更新 Credit 并通知上游。
这种机制的优势:
通道级粒度:反压精确到单个逻辑通道,不像 TCP 流控那样将多个逻辑通道的流量耦合在一个 TCP 连接中。一个慢通道不会影响同一 TCP 连接上的其他通道。
快速响应:Credit 信息直接嵌入 Flink 的网络协议中,不需要等待 TCP 层的滑动窗口收缩。反压信号传播速度显著快于纯 TCP 流控。
避免死锁:Flink 的网络栈采用了独立的 Buffer Pool 管理,每个 Task 有独立的 Buffer 预算,避免了多个 Task 竞争同一组 Buffer 导致的死锁。
6.3 反压的传播链路
反压的完整传播路径如下:
- Sink 写入外部系统变慢,Sink 算子的输入缓冲区逐渐填满;
- Sink 的 InputChannel Credit 降为 0,上游 Operator B 的 ResultSubpartition 停止发送;
- Operator B 的输出缓冲区填满,Operator B 无法输出新数据,处理速度下降;
- Operator B 的 InputChannel Credit 降为 0,信号继续向上传播;
- 最终传递到 Source,Source 降低从外部系统(如 Kafka)拉取数据的速度。
整个过程是自动的、分布式的、无需人工干预的。在 Flink Web UI 的 Backpressure 面板中,可以观察到每个 Task 的反压状态(OK / LOW / HIGH),帮助定位瓶颈所在。
6.4 反压对 Checkpoint 的影响
反压与 Checkpoint 之间存在相互干扰。Checkpoint Barrier 作为特殊消息嵌入数据流中,当数据流因反压而阻塞时,Barrier 的传播也会变慢。这导致 Checkpoint 完成时间增加,甚至超时失败。更严重的情况下,Checkpoint 持续失败会导致作业无法完成任何 Checkpoint,如果此时发生故障,只能从更早的 Checkpoint 恢复,丢失更多进度。
前面提到的非对齐 Checkpoint 正是针对这个问题的解决方案——让 Barrier 跳过缓冲区中排队的数据,直接传播到下游。
七、流批一体架构
7.1 有界流与无界流
Flink 的核心设计理念是将批处理视为流处理的特例。所有数据都是流——区别在于流是否有界:
无界流(Unbounded Stream):没有终点的数据流,如实时交易事件、传感器数据。这是 Flink 最典型的处理场景。
有界流(Bounded Stream):有明确起止点的数据集,如一天的日志文件、一张数据库表。批处理可以建模为对有界流的处理。
统一的编程模型意味着同一段代码既可以处理实时数据流,也可以处理历史数据集。用户不需要为流和批分别编写两套逻辑(例如 Lambda 架构中用 Storm 处理实时数据、用 MapReduce 处理批量数据)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStream<String> lines = env.readTextFile("/data/logs/2026-04-13.log");
lines.flatMap(new LogParser())
.keyBy(LogEntry::getLevel)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new CountAggregate())
.addSink(new ResultSink());
env.execute("batch-log-analysis");通过
setRuntimeMode(RuntimeExecutionMode.BATCH),同一套代码以批模式运行。也可以设为
AUTOMATIC,由 Flink 根据数据源自动判断。
7.2 批模式的优化
虽然 Flink 的流引擎可以直接处理有界数据,但批模式下 Flink 会启用一系列专门的优化:
排序替代哈希:流模式下 KeyBy 操作使用哈希分区,数据到达后直接按哈希值路由。批模式下可以先对数据按 Key 排序,然后顺序扫描——排序的缓存友好性(Cache Locality)显著优于随机哈希访问,尤其在数据量大时。
调度策略:流模式下所有算子同时启动(Eager Scheduling),因为数据是持续流入的。批模式下可以按阶段调度(Lazy Scheduling)——上游阶段完成后再启动下游阶段,减少同时运行的 Task 数量,降低资源占用。
Shuffle 实现:流模式的 Shuffle 通过网络缓冲区实时传输。批模式下可以使用基于磁盘的 Blocking Shuffle——上游将数据写入本地磁盘,下游从磁盘读取。这允许上游和下游不同时运行,支持更灵活的调度策略。
容错策略:流模式的容错依赖 Checkpoint——定期对全量状态做快照。批模式不需要 Checkpoint,而是依赖 Task 级重试:某个 Task 失败后只需重新执行该 Task(及其上游产出的中间数据),不需要回滚整个作业。这在大规模批处理中效率更高。
状态后端:批模式下某些场景不需要维护增量状态,可以使用更简单的堆内存结构,避免 RocksDB 的序列化开销。
7.3 流批一体的工程边界
流批一体的理念虽然优雅,但在工程实践中仍有边界需要认识:
性能差距:纯批处理场景下,Spark 经过十余年的优化(Catalyst 优化器、Tungsten 内存管理、Adaptive Query Execution),在 TPC-DS 等标准测试中仍然领先 Flink 的批模式。Flink 的批模式优化在持续追赶,但短期内不太可能在所有批处理场景下取代 Spark。
生态成熟度:Spark 的批处理生态(SparkSQL、MLlib、GraphX、Delta Lake)更为成熟。Flink 的优势在流处理和近实时分析场景。
运维复杂性:统一引擎意味着运维团队需要同时理解流和批两种模式的资源管理、调优方法和故障排查策略。在很多组织中,流和批的运维团队是分开的,强行统一可能增加运维负担。
从架构演进的角度看,流批一体是分布式计算的长期趋势。Google 的 Dataflow 模型(下一篇讨论)从理论层面统一了流和批的编程模型,Flink 是这个理论最完整的开源实现。但”统一”不等于”取代”——在纯批处理、纯流处理和流批混合三种场景下,最优的工程选择可能不同。
八、运行时架构补充
8.1 作业提交与执行
Flink 的运行时由三个核心组件构成:
JobManager:整个作业的协调者。负责接收用户提交的 JobGraph(逻辑执行计划),将其转换为 ExecutionGraph(物理执行计划),然后向 TaskManager 分发 Task、协调 Checkpoint、处理故障恢复。一个 Flink 集群中,每个作业对应一个 JobManager(在 Application 模式下)。
TaskManager:实际执行数据处理的工作节点。每个 TaskManager 提供固定数量的 Task Slot,每个 Slot 可以运行一个并行 Task(或多个共享 Slot 的 Task)。TaskManager 管理本地的内存、网络缓冲区和状态后端。
ResourceManager:管理 TaskManager 的资源分配。在 YARN、Kubernetes 或 Standalone 模式下,ResourceManager 负责向底层资源调度系统申请或释放 TaskManager。
8.2 算子链与 Task Slot
Flink 会将可以合并的连续算子串联成一个算子链(Operator Chain),在同一个线程中执行。合并的条件包括:上下游算子并行度相同、连接方式为 Forward(一对一)、未被用户显式禁用链接。算子链的好处是避免了线程切换、序列化/反序列化和网络传输的开销。
stream
.map(new MyMapper()) // 这三个算子会被合并为一个链
.filter(new MyFilter()) // 在同一个线程中依次执行
.map(new AnotherMapper())
.keyBy(...) // keyBy 打断了链,因为需要 Shuffle
.window(...)
.aggregate(...);Task Slot 是 TaskManager 资源隔离的基本单位。每个 Slot 有独立的内存预算,但共享 JVM 的 CPU 和网络资源。默认情况下,一个 Slot 可以运行来自同一作业不同算子的 Task(Slot Sharing),这提高了资源利用率——计算密集型算子和 I/O 密集型算子共享同一 Slot 时,可以互补利用 CPU 和 I/O 资源。
8.3 内存模型
Flink 的内存管理将 TaskManager 的总内存划分为多个区域:
- Framework Heap/Off-Heap:Flink 框架自身使用的内存;
- Task Heap/Off-Heap:用户代码(UDF)使用的堆内/堆外内存;
- Network Memory:网络缓冲区,用于算子间的数据传输;
- Managed Memory:由 Flink 管理的堆外内存,RocksDB State Backend、批模式排序和哈希表都使用这部分内存。
taskmanager.memory.process.size: 4096m
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64m
taskmanager.memory.network.max: 1gManaged Memory 的分配比例直接影响 RocksDB 的性能——RocksDB 的 Block Cache 和 Write Buffer 使用 Managed Memory。如果分配不足,RocksDB 的缓存命中率下降,频繁的磁盘 I/O 会显著降低状态访问性能。
参考文献
- Carbone, P., Katsifodimos, A., Ewen, S., Markl, V., Haridi, S., & Tzoumas, K. (2015). “Apache Flink: Stream and Batch Processing in a Single Engine.” IEEE Data Engineering Bulletin, 38(4), 28-38.
- Chandy, K. M., & Lamport, L. (1985). “Distributed Snapshots: Determining Global States of Distributed Systems.” ACM Transactions on Computer Systems, 3(1), 63-75.
- Carbone, P., Fóra, G., Ewen, S., Haridi, S., & Tzoumas, K. (2015). “Lightweight Asynchronous Snapshots for Distributed Dataflows.” arXiv preprint arXiv:1506.08603.
- Akidau, T., Bradshaw, R., Chambers, C., et al. (2015). “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing.” Proceedings of the VLDB Endowment, 8(12), 1792-1803.
- Apache Flink Documentation. https://flink.apache.org/
- Apache Flink Source Code. https://github.com/apache/flink
- Friedman, E., & Tzoumas, K. (2016). Introduction to Apache Flink. O’Reilly Media.
- Hueske, F., & Kalavri, V. (2019). Stream Processing with Apache Flink. O’Reilly Media.
上一篇:Spark 内核 下一篇:Dataflow 模型与流批一体