第 1 篇 把流平台拆成 持久 log + 有状态计算。有状态计算里最常见的一类需求是:按业务发生时间聚合——例如「统计 10:00–10:05 每个用户的点击次数」。网络延迟、移动端离线缓存、跨机房时钟漂移会导致:业务时间 10:00:03 的事件,可能在 10:00:47 才被 Flink 算子读到。若用 算子机器时钟(processing time) 关窗,窗口边界会随负载漂移;若用 事件携带的时间戳(event time),又必须回答:在乱序下,何时认为「10:05 之前的事件已到齐」?
本文回答四个问题(来源:Flink Documentation Timely Stream Processing;Dataflow Model 论文 Akidau et al.):
- Event time、processing time、ingestion time 各自度量什么,在生产里如何选?
- Watermark \(W(t)\) 的语义是什么,如何与窗口触发联动?
- Periodic / punctuated watermark、bounded out-of-orderness 策略各适用什么场景?
- 迟到数据 如何通过 allowed lateness、侧输出(side output) 处理,而不 silently 丢数?
窗口类型与 Trigger 细节见 第 3
篇;ProcessFunction 与
TimerService 如何注册 event-time 定时器见
第
8 篇。Checkpoint 不改变时间语义,但 重启后
watermark 与 window state 从 checkpoint 恢复(第 10
篇)。
环境说明:本机 WSL2,未安装 Flink。API 与语义以 Flink 1.20+ / 2.x 官方文档为准;不粘贴未执行的作业输出或伪造 metrics。第八节给出 乱序 JSON 事件流 + event-time vs processing-time 窗口 的可复现实验骨架。
一、三种时间语义
Flink 文档区分三类与 record 关联的时间(Timely Stream Processing):
| 语义 | 定义 | 典型来源 | 优点 | 风险 |
|---|---|---|---|---|
| Event time | 业务事实发生时刻 | 传感器、客户端 event_ts、DB binlog
时间 |
结果可复现、与批式 WHERE ts BETWEEN ...
对齐 |
乱序、时钟不准;需 watermark |
| Processing time | 算子所在 JVM 处理该 record 的 wall-clock | System.currentTimeMillis() |
实现简单、延迟低 | 重跑结果变;背压会拖慢「时间进度」 |
| Ingestion time | record 进入 Flink source 的时刻 | Source 注入 | 比 event time 少乱序 | 仍非业务时间;语义介于二者之间 |
flowchart LR
E["事件发生<br/>event time"]
K["写入 Kafka<br/>(可能延迟)"]
F["Flink Source 读取<br/>ingestion / processing"]
E --> K --> F
1.1 Event time:批流一致的语义锚点
Event time 从 record 字段读取,例如:
public class ClickEvent {
public String userId;
public long eventTimeMs; // 业务时间
}同一输入 log replay 两次,只要 watermark
策略与窗口参数不变,event-time
聚合结果应一致——这是 Kappa「重放
log」与批校正可对齐的基础(第 1
篇)。lakehouse 流式入湖里 按
days(ts) 分区 的
ts,通常就是 event time(lakehouse/19)。
1.2 Processing time:算子本地时钟
Processing time 窗口由 系统时钟
驱动:ProcessingTimeTimer 在 wall-clock
触发(第
8 篇)。下游 Kafka 消费慢导致背压时,同一
event-time 事件的处理时刻推迟,processing-time
窗口边界 整体后移,统计口径漂移。适用于
近似监控、无业务时间字段的调试管道,不建议作为财务口径默认。
1.3 Ingestion time:Source 打戳
Ingestion time 在 Source 算子入口 统一赋
now(),之后整条管道当作 event time
用该戳。乱序被 限制在 Kafka
分区内到达顺序(分区内相对有序,第
4 篇),比纯 event time 简单,但
无法纠正客户端错误时钟。
Flink 1.20+ 推荐通过 WatermarkStrategy 在
Source 上显式声明;旧 API TimeCharacteristic
已废弃。
二、Watermark 的形式化含义
Dataflow Model 与 Flink 文档把 watermark 定义为 事件时间进度标记:
在时间 \(t\) 发出的 watermark \(W\),表示 「event time \(\leq W\) 的事件应已全部到达」 的假设(允许后续推翻,见 allowed lateness)。
记某算子当前观察到的 最大 event time 为 \(E_{\max}\)。Bounded out-of-orderness 策略常用:
\[ W = E_{\max} - \delta \]
其中 \(\delta \geq 0\)
是
允许的最大乱序宽度(maxOutOfOrderness)。\(\delta\)
越大,窗口关得越晚、结果越稳,延迟越大;\(\delta\) 过小则迟到事件被当作
窗口外数据 丢弃或进侧输出。
sequenceDiagram
participant S as Source
participant W as Watermark 生成
participant O as Window 算子
S->>W: record ts=10:00:08
W->>O: watermark=10:00:03 (δ=5s)
Note over O: 关闭 event time ≤ 10:00:03 的窗口
S->>O: record ts=10:00:02 (迟到)
Note over O: 若 allowed lateness>0 仍可更新
2.1 Watermark 与 processing time 无关
Watermark 只推进 event-time 逻辑;processing-time 窗口 不消费 watermark。混合作业里可以 map 用 processing time 做告警,keyBy 后用 event time 开窗——两套时钟 并行存在,互不替换。
2.2 多并行 Source 的 watermark 合并
每个 Source subtask 独立生成 watermark;下游算子取 所有输入 channel 上的最小 watermark(Flink Timely Stream Processing:对齐 barrier 与 event time 对齐的直觉相同)。因此 最慢 partition 拖住全局 event-time 进度——与 consumer lag 拖慢整条管道类似(第 5 篇)。
2.3 Idle source
若某 partition 长期无数据,其 watermark 不推进,会 阻塞 全局最小 watermark。Flink 提供 withIdleness 策略:空闲超过阈值则标记 channel idle,不再阻塞(来源:Flink Documentation,WatermarkStrategy#withIdleness)。
三、Watermark 生成策略
Flink 1.20+ 在 WatermarkStrategy 上配置(包
org.apache.flink.api.common.eventtime)。
3.1 Bounded out-of-orderness(最常用)
WatermarkStrategy<ClickEvent> strategy =
WatermarkStrategy
.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.eventTimeMs);语义:当前 watermark = \(\max(\text{seen event time}) - 5\,\text{s}\)。适合 乱序有上界 的点击流、CDC(binlog 时间通常单调于事务,但跨 partition 仍乱序)。
3.2 Monotonous timestamps(严格单调)
WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
.withTimestampAssigner((event, ts) -> event.eventTimeMs);等价于 \(\delta = 0\),分区内 若 ts 严格递增则无迟到;跨 partition 合并后仍可能乱序。适用于 已按 event time 排序的单分区 log。
3.3 Periodic vs punctuated 发射
| 模式 | 行为 | 适用 |
|---|---|---|
| Periodic | 按固定 wall-clock 间隔(默认 200ms)发射当前计算的 watermark | 高吞吐、ts 密集 |
| Punctuated | 在特定 record 上手动
context.emitWatermark |
业务边界(如「批次结束」标记) |
Periodic 实现类
BoundedOutOfOrdernessWatermarks 在
事件时间轴 上推进,与 processing
间隔 解耦。Punctuated 需自定义
AssignerWithPunctuatedWatermarks(旧 API)或在
ProcessFunction 里 emit(高级用法,第
8 篇)。
3.4 与 Kafka Source 集成
KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
.setBootstrapServers("localhost:9092")
.setTopics("clicks")
.setGroupId("flink-et")
.setValueOnlyDeserializer(new ClickEventDeserializer())
.build();
DataStream<ClickEvent> clicks = env.fromSource(
source,
WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.eventTimeMs),
"kafka-clicks");Kafka record 不包含 Flink watermark;watermark 完全在 Flink 算子链内生成。Consumer offset 提交 由 checkpoint 管理,与 watermark 数值无直接相等关系(第 10 篇)。
四、窗口触发与 watermark 的联动
窗口算子(第 3 篇)在 event time 模式下:
- WindowAssigner 把 record
划入时间区间(如
[10:00, 10:05))。 - 当 watermark \(\geq\) 窗口 end time 时,默认 Trigger 认为窗口可关闭并输出聚合结果。
- 若配置了 allowed lateness \(L > 0\),窗口在 end time 后
仍保留 state 直到
watermark >= end + L,允许迟到 record 更新已输出结果(或发 retract,取决于 sink)。
clicks
.keyBy(e -> e.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateTag)
.aggregate(new CountAgg());| 参数 | 含义 |
|---|---|
| 窗口长度 5 min | 按 event time 切桶 |
| allowedLateness 1 min | watermark 超过 end+1min 后才彻底丢弃 |
| sideOutputLateData | 超过 allowed lateness 的 record 进侧输出流 |
4.1 迟到数据的三种处置
| 策略 | 行为 | 代价 |
|---|---|---|
| 丢弃(默认无 lateness) | 简单 | silently 少计 |
| allowed lateness + 更新 | 修正已发结果 | state 保留更久;下游需支持 upsert/changelog |
| 侧输出 | 主输出关窗,迟到进 OutputTag |
需第二条链路补算或审计 |
lakehouse upsert sink(lakehouse/19)在 允许更新 语义下可吸收部分迟到修正;纯 append sink 则倾向 侧输出 + 批回补。
4.2 Truncate vs 更新
Truncate:窗口关闭后输出 final
result,迟到直接丢弃或侧输出。更新:迟到
event 触发 retract + emit(Table API
changelog)或 增量修正。Flink DataStream 的
allowedLateness 配合
AggregateFunction 会在 lateness 内
更新窗口 state 并再次 fire——下游 Kafka 若只
append,可能产生 重复统计行,需
幂等键 或 事务 sink(第
14–15 篇)。
五、Event time 与 Processing time 窗口对照
同一 ClickEvent 流,两种时间属性下
窗口 membership 不同:
| 事件 | event time | 到达 processing time | 5min 滚动窗(event) | 5min 滚动窗(processing) |
|---|---|---|---|---|
| A | 10:00:50 | 10:01:02 | [10:00,10:05) | [10:01,10:06) |
| B | 10:00:02 | 10:01:05 | [10:00,10:05) | [10:01,10:06) |
A、B 在 event-time 窗 同一桶;在 processing-time 窗 也在同一桶(因到达时刻接近)。若 B 因背压于 10:06:00 才到达,processing 窗可能划入 [10:06,10:11),与 A 分离——口径随负载变化。
flowchart TB
subgraph et ["Event-time 窗口"]
E1["按 event_ts 分桶"]
E2["watermark 关窗"]
end
subgraph pt ["Processing-time 窗口"]
P1["按算子 clock 分桶"]
P2["系统定时器关窗"]
end
工程建议:对外 KPI、与离线数仓对账 → event time + 明确 \(\delta\) 与 lateness;内部算子延迟告警 → processing time 可接受。
六、实现路径:API 层到 WindowOperator
Flink 内部(源码 WindowOperator,版本随
release 变化)大致路径:
- TimestampAssigner 从 record 提取 event time。
- WatermarkGenerator 根据 stream 进度 emit watermark。
- WindowOperator 收到 record 放入对应 WindowState;收到 watermark 触发 TriggerContext 判定 fire / purge。
- StateBackend 持久化未关窗 state(第 9、12 篇)。
ProcessFunction 可绕过内置窗口,用
TimerService.registerEventTimeTimer(windowEnd)
自管边界——适合 自定义会话 gap 或
复杂 Trigger(第 3 篇
GlobalWindow 模式)。
七、与 lakehouse / CDC 的时间字段
Debezium CDC 事件常带 source timestamp(第 16 篇)。选哪个字段作 event time 影响 乱序程度:
| 字段 | 乱序特征 |
|---|---|
| binlog event time | 单表 partition 上相对有序 |
| Kafka record timestamp | 接近 ingestion time |
业务 updated_at |
可能客户端时钟漂移 |
入湖 按 event time 分区 时,Flink 窗口 watermark 与 Iceberg 分区裁剪 共用同一时间轴——watermark 过慢会导致 窗口结果延迟 与 湖分区 late arrival 同时发生(第 17 篇)。
八、可复现实验:乱序流上的双窗口对比
下列步骤 未在本写作环境执行;读者可用 Flink 1.20+ local cluster + Socket 或文件 Source 复现。
8.1 生成乱序事件(Python 3)
#!/usr/bin/env python3
"""生成带 event_time_ms 的 JSON 行;故意乱序。"""
import json
import random
import sys
import time
base_ms = int(time.time()) * 1000
events = [
("u1", base_ms + 1000),
("u2", base_ms + 8000),
("u1", base_ms + 2000), # 乱序:比上一条 u1 早
("u2", base_ms + 3000),
]
random.shuffle(events)
for uid, ts in events:
print(json.dumps({"user_id": uid, "event_time_ms": ts}), flush=True)8.2 Flink 作业骨架(Java,节选)
// 分支 1:event time + watermark
DataStream<ClickEvent> withWm = raw
.assignTimestampsAndWatermarks(
WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((e, ts) -> e.eventTimeMs));
withWm.keyBy(e -> e.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.sum("countField");
// 分支 2:processing time 窗口(对比用)
raw.keyBy(e -> e.userId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum("countField");8.3 观察要点(不伪造具体数字)
- event-time 分支:乱序 record
base+2000仍计入与base+1000同一窗口(在 \(\delta=2s\) 内)。 - processing-time 分支:窗口边界随 注入速率 变化;暂停 stdin 输入则 processing 窗 也暂停推进。
- Flink Web UI → Watermarks 面板(若版本提供)或算子 Records/Lag;调大 \(\delta\) 观察 窗口输出时刻推迟。
- 开启 allowedLateness(5s),在关窗后注入更迟 record,验证 二次输出或侧输出。
实验记录应包含:Flink 版本、并行度 1(排除 shuffle 干扰)、\(\delta\) 与窗口长度。性能数字不是本篇目标;第 3 篇 讨论窗口 state 规模。
九、TimestampAssigner 与乱序源
withTimestampAssigner 负责 从 record
提取 long 毫秒时间戳。常见坑:
| 问题 | 后果 | 处理 |
|---|---|---|
| 字段为 0 或 null 默认 | 全部挤在 epoch,窗口异常 | 过滤 + 侧输出脏数据 |
| 秒 vs 毫秒混用 | watermark 暴涨或不动 | 统一单位;Connect 转换 |
| 时区字符串解析 | 夏令时边界错位 | 源端 UTC 毫秒 |
| 未来时间戳 | watermark 超前,关窗延迟 | WatermarkStrategy 过滤或 clamp |
.withTimestampAssigner((event, recordTimestamp) -> {
if (event.eventTimeMs <= 0) {
throw new RuntimeException("invalid ts");
}
return event.eventTimeMs;
})recordTimestamp 参数在 Kafka Source 上常为 Kafka record timestamp(CREATE_TIME / LOG_APPEND_TIME),与业务 event time 可能不一致——assigner 应 显式读 payload 字段,不要默认用 recordTimestamp(除非刻意 ingestion time)。
十、Watermark 在算子链中的传播
Watermark 由 Source 生成,沿 forward / shuffle 向下游广播:
flowchart LR
SRC["Source subtask<br/>W=100"]
MAP["map chain"]
SH["keyBy shuffle"]
WIN["WindowOperator"]
SRC --> MAP --> SH --> WIN
- One-to-one chain:watermark 直通,无合并。
- keyBy / rebalance 后:某 subtask 收到 多路上游 时取 min(watermark)。
- 多 Source union:各源 watermark 取 min。
因此 最慢源 决定全局进度——与 第 5 篇 consumer lag 监控同类:单 partition stuck 会拖死 event-time 窗口。
ProcessFunction 内 onTimer
的 event-time timer 触发时刻 \(\leq\) 当前
watermark(TimerService 保证)。自定义算子
不可 随意 emitWatermark 超过
\(E_{\max}\),否则破坏单调性假设。
十一、与 Table/SQL 的时间声明
Table API 在 DDL 中声明 watermark(编译期):
CREATE TABLE clicks (
user_id STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka', ...);运行时与 DataStream 的
forBoundedOutOfOrderness(5s)
等价。SQL CUMULATE / HOP
窗口(第 3
篇)同样依赖该 watermark。Dynamic Table
的 retract 流在迟到修正时 emit -U /
+U changelog——入湖 upsert(lakehouse/19)需
sink 支持 equality delete 或 upsert
模式。
十二、调参与故障模式
| 现象 | 可能原因 | 方向 |
|---|---|---|
| 窗口永不输出 | watermark 不推进;某 partition idle 未配置 idleness | withIdleness;查 lag |
| 计数偏少 | \(\delta\) 过小;无 allowed lateness | 增大 \(\delta\) 或侧输出审计 |
| 结果延迟大 | \(\delta\) 过大;窗口过长 | 权衡正确性与 SLA |
| 重启后窗口重复输出 | at-least-once + 无 sink 幂等 | EOS(第 14–15 篇) |
| 与离线对不上 | 用了 processing time;时区 | 统一 UTC event time |
Watermark 与 checkpoint:checkpoint barrier 与 watermark 独立 传播;对齐机制在 第 10 篇。高 checkpoint 频率 不替代 合理的 \(\delta\)。
十三、Flink 2.x 与 Table API 备注
Flink 2.x DataStream V1 仍使用上述
WatermarkStrategy。Table/SQL 通过 DDL
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
声明,Planner 下沉为相同 runtime 逻辑。本系列以 DataStream
讲清机制;SQL 语法见官方 SQL 文档,不重复教程。
十四、术语表
| 术语 | 含义 |
|---|---|
| Event time | record 业务时间戳 |
| Processing time | 算子处理时刻 |
| Ingestion time | 进入 Flink 的时刻 |
| Watermark | 事件时间进度;\(W \leq E_{\max} - \delta\) 常见 |
| maxOutOfOrderness | \(\delta\),允许乱序上界 |
| Allowed lateness | 关窗后仍接受迟到的宽限 |
| Side output | 主输出外的 OutputTag 流 |
| Idle source | 无 record 的 partition,需 idleness 策略 |
十五、小结
Event time 使流式聚合与批式 SQL 在 时间口径上可对齐;Watermark 是在乱序假设下 关窗的唯一规则(除非显式 processing time)。\(\delta\) 与 allowed lateness 是 正确性 vs 延迟 的旋钮;迟到数据应 可观测(侧输出或 upsert sink),而不是 silent drop。
下一篇:滚动、滑动、会话窗口 如何分配 state,以及 Trigger / Evictor 如何改变输出节奏。
参考资料
- Apache Flink Documentation, Timely Stream Processing(三种时间、watermark、idle)。
- Apache Flink Documentation, Windows(allowed lateness、side output late data)。
- Akidau, T. et al., The Dataflow Model(watermark 与 window 形式化)。
- Apache Flink Documentation, Kafka Connector(Source 与 WatermarkStrategy 组合)。
- Apache Flink 源码
WindowOperator、BoundedOutOfOrdernessWatermarks(实现对照,版本随 release 标注)。 - 本系列 第 1 篇、第 3 篇、第 8 篇。
- lakehouse 第 19 章(入湖时间字段与分区)。
返回 系列目录 | 上一篇:流处理全景 | 下一篇:窗口:滚动、滑动与会话
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】Kafka · Flink · 状态 · Exactly-Once
承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。
【流式数据处理】流处理全景:从日志到有状态计算
从批、流、微批四维度对比出发,建立「可重放日志 + 有状态计算」心智模型,厘清 Lambda/Kappa 边界与流表对偶,并给出与 lakehouse 入湖侧对称的全系列地图。
【流式数据处理】窗口:滚动、滑动与会话
从 WindowAssigner 三类(Tumbling、Sliding、Session)出发,讲清窗口 state 如何随 key 与窗口实例增长,Trigger 与 Evictor 如何改变 firing 与清理节奏,GlobalWindow 自定义 Trigger 的边界,并与批式 GROUP BY 时间分桶对照;附三种窗口 state 观测的可复现步骤。
【流式数据处理】Flink 运行时模型
从 JobManager、TaskManager、Slot 到 StreamGraph→JobGraph→ExecutionGraph 的四层编译链,讲清 operator chain、并行度、SlotSharingGroup 如何决定任务在集群上的物理形态,并与 Kafka 消费位点提交分工衔接。