土法炼钢兴趣小组的算法知识备份

【流式数据处理】事件时间、处理时间与 Watermark

文章导航

分类入口
databasedistributed
标签入口
#flink#event-time#processing-time#ingestion-time#watermark#out-of-order#allowed-lateness#side-output#timely-stream-processing

目录

第 1 篇 把流平台拆成 持久 log + 有状态计算。有状态计算里最常见的一类需求是:按业务发生时间聚合——例如「统计 10:00–10:05 每个用户的点击次数」。网络延迟、移动端离线缓存、跨机房时钟漂移会导致:业务时间 10:00:03 的事件,可能在 10:00:47 才被 Flink 算子读到。若用 算子机器时钟(processing time) 关窗,窗口边界会随负载漂移;若用 事件携带的时间戳(event time),又必须回答:在乱序下,何时认为「10:05 之前的事件已到齐」?

本文回答四个问题(来源:Flink Documentation Timely Stream Processing;Dataflow Model 论文 Akidau et al.):

窗口类型与 Trigger 细节见 第 3 篇ProcessFunctionTimerService 如何注册 event-time 定时器见 第 8 篇。Checkpoint 不改变时间语义,但 重启后 watermark 与 window state 从 checkpoint 恢复第 10 篇)。

环境说明:本机 WSL2,未安装 Flink。API 与语义以 Flink 1.20+ / 2.x 官方文档为准;不粘贴未执行的作业输出或伪造 metrics。第八节给出 乱序 JSON 事件流 + event-time vs processing-time 窗口 的可复现实验骨架。


一、三种时间语义

Flink 文档区分三类与 record 关联的时间(Timely Stream Processing):

语义 定义 典型来源 优点 风险
Event time 业务事实发生时刻 传感器、客户端 event_ts、DB binlog 时间 结果可复现、与批式 WHERE ts BETWEEN ... 对齐 乱序、时钟不准;需 watermark
Processing time 算子所在 JVM 处理该 record 的 wall-clock System.currentTimeMillis() 实现简单、延迟低 重跑结果变;背压会拖慢「时间进度」
Ingestion time record 进入 Flink source 的时刻 Source 注入 比 event time 少乱序 仍非业务时间;语义介于二者之间
flowchart LR
  E["事件发生<br/>event time"]
  K["写入 Kafka<br/>(可能延迟)"]
  F["Flink Source 读取<br/>ingestion / processing"]
  E --> K --> F

1.1 Event time:批流一致的语义锚点

Event time 从 record 字段读取,例如:

public class ClickEvent {
    public String userId;
    public long eventTimeMs; // 业务时间
}

同一输入 log replay 两次,只要 watermark 策略与窗口参数不变,event-time 聚合结果应一致——这是 Kappa「重放 log」与批校正可对齐的基础(第 1 篇)。lakehouse 流式入湖里 days(ts) 分区ts,通常就是 event time(lakehouse/19)。

1.2 Processing time:算子本地时钟

Processing time 窗口由 系统时钟 驱动:ProcessingTimeTimer 在 wall-clock 触发(第 8 篇)。下游 Kafka 消费慢导致背压时,同一 event-time 事件的处理时刻推迟,processing-time 窗口边界 整体后移,统计口径漂移。适用于 近似监控、无业务时间字段的调试管道,不建议作为财务口径默认。

1.3 Ingestion time:Source 打戳

Ingestion time 在 Source 算子入口 统一赋 now(),之后整条管道当作 event time 用该戳。乱序被 限制在 Kafka 分区内到达顺序(分区内相对有序,第 4 篇),比纯 event time 简单,但 无法纠正客户端错误时钟

Flink 1.20+ 推荐通过 WatermarkStrategy 在 Source 上显式声明;旧 API TimeCharacteristic 已废弃。


二、Watermark 的形式化含义

Dataflow Model 与 Flink 文档把 watermark 定义为 事件时间进度标记

在时间 \(t\) 发出的 watermark \(W\),表示 「event time \(\leq W\) 的事件应已全部到达」 的假设(允许后续推翻,见 allowed lateness)。

记某算子当前观察到的 最大 event time\(E_{\max}\)Bounded out-of-orderness 策略常用:

\[ W = E_{\max} - \delta \]

其中 \(\delta \geq 0\)允许的最大乱序宽度maxOutOfOrderness)。\(\delta\) 越大,窗口关得越晚、结果越稳,延迟越大\(\delta\) 过小则迟到事件被当作 窗口外数据 丢弃或进侧输出。

sequenceDiagram
  participant S as Source
  participant W as Watermark 生成
  participant O as Window 算子

  S->>W: record ts=10:00:08
  W->>O: watermark=10:00:03 (δ=5s)
  Note over O: 关闭 event time ≤ 10:00:03 的窗口
  S->>O: record ts=10:00:02 (迟到)
  Note over O: 若 allowed lateness>0 仍可更新

2.1 Watermark 与 processing time 无关

Watermark 只推进 event-time 逻辑;processing-time 窗口 不消费 watermark。混合作业里可以 map 用 processing time 做告警,keyBy 后用 event time 开窗——两套时钟 并行存在,互不替换。

2.2 多并行 Source 的 watermark 合并

每个 Source subtask 独立生成 watermark;下游算子取 所有输入 channel 上的最小 watermark(Flink Timely Stream Processing:对齐 barrier 与 event time 对齐的直觉相同)。因此 最慢 partition 拖住全局 event-time 进度——与 consumer lag 拖慢整条管道类似(第 5 篇)。

2.3 Idle source

若某 partition 长期无数据,其 watermark 不推进,会 阻塞 全局最小 watermark。Flink 提供 withIdleness 策略:空闲超过阈值则标记 channel idle,不再阻塞(来源:Flink Documentation,WatermarkStrategy#withIdleness)。


三、Watermark 生成策略

Flink 1.20+ 在 WatermarkStrategy 上配置(包 org.apache.flink.api.common.eventtime)。

3.1 Bounded out-of-orderness(最常用)

WatermarkStrategy<ClickEvent> strategy =
    WatermarkStrategy
        .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.eventTimeMs);

语义:当前 watermark = \(\max(\text{seen event time}) - 5\,\text{s}\)。适合 乱序有上界 的点击流、CDC(binlog 时间通常单调于事务,但跨 partition 仍乱序)。

3.2 Monotonous timestamps(严格单调)

WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
    .withTimestampAssigner((event, ts) -> event.eventTimeMs);

等价于 \(\delta = 0\)分区内 若 ts 严格递增则无迟到;跨 partition 合并后仍可能乱序。适用于 已按 event time 排序的单分区 log

3.3 Periodic vs punctuated 发射

模式 行为 适用
Periodic 按固定 wall-clock 间隔(默认 200ms)发射当前计算的 watermark 高吞吐、ts 密集
Punctuated 在特定 record 上手动 context.emitWatermark 业务边界(如「批次结束」标记)

Periodic 实现类 BoundedOutOfOrdernessWatermarks事件时间轴 上推进,与 processing 间隔 解耦。Punctuated 需自定义 AssignerWithPunctuatedWatermarks(旧 API)或在 ProcessFunction 里 emit(高级用法,第 8 篇)。

3.4 与 Kafka Source 集成

KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("clicks")
    .setGroupId("flink-et")
    .setValueOnlyDeserializer(new ClickEventDeserializer())
    .build();

DataStream<ClickEvent> clicks = env.fromSource(
    source,
    WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((e, ts) -> e.eventTimeMs),
    "kafka-clicks");

Kafka record 不包含 Flink watermark;watermark 完全在 Flink 算子链内生成。Consumer offset 提交 由 checkpoint 管理,与 watermark 数值无直接相等关系(第 10 篇)。


四、窗口触发与 watermark 的联动

窗口算子(第 3 篇)在 event time 模式下:

  1. WindowAssigner 把 record 划入时间区间(如 [10:00, 10:05))。
  2. watermark \(\geq\) 窗口 end time 时,默认 Trigger 认为窗口可关闭并输出聚合结果。
  3. 若配置了 allowed lateness \(L > 0\),窗口在 end time 后 仍保留 state 直到 watermark >= end + L,允许迟到 record 更新已输出结果(或发 retract,取决于 sink)。
clicks
    .keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateTag)
    .aggregate(new CountAgg());
参数 含义
窗口长度 5 min 按 event time 切桶
allowedLateness 1 min watermark 超过 end+1min 后才彻底丢弃
sideOutputLateData 超过 allowed lateness 的 record 进侧输出流

4.1 迟到数据的三种处置

策略 行为 代价
丢弃(默认无 lateness) 简单 silently 少计
allowed lateness + 更新 修正已发结果 state 保留更久;下游需支持 upsert/changelog
侧输出 主输出关窗,迟到进 OutputTag 需第二条链路补算或审计

lakehouse upsert sink(lakehouse/19)在 允许更新 语义下可吸收部分迟到修正;纯 append sink 则倾向 侧输出 + 批回补

4.2 Truncate vs 更新

Truncate:窗口关闭后输出 final result,迟到直接丢弃或侧输出。更新:迟到 event 触发 retract + emit(Table API changelog)或 增量修正。Flink DataStream 的 allowedLateness 配合 AggregateFunction 会在 lateness 内 更新窗口 state 并再次 fire——下游 Kafka 若只 append,可能产生 重复统计行,需 幂等键事务 sink第 14–15 篇)。


五、Event time 与 Processing time 窗口对照

同一 ClickEvent 流,两种时间属性下 窗口 membership 不同:

事件 event time 到达 processing time 5min 滚动窗(event) 5min 滚动窗(processing)
A 10:00:50 10:01:02 [10:00,10:05) [10:01,10:06)
B 10:00:02 10:01:05 [10:00,10:05) [10:01,10:06)

A、B 在 event-time 窗 同一桶;在 processing-time 窗 也在同一桶(因到达时刻接近)。若 B 因背压于 10:06:00 才到达,processing 窗可能划入 [10:06,10:11),与 A 分离——口径随负载变化

flowchart TB
  subgraph et ["Event-time 窗口"]
    E1["按 event_ts 分桶"]
    E2["watermark 关窗"]
  end
  subgraph pt ["Processing-time 窗口"]
    P1["按算子 clock 分桶"]
    P2["系统定时器关窗"]
  end

工程建议:对外 KPI、与离线数仓对账 → event time + 明确 \(\delta\) 与 lateness;内部算子延迟告警 → processing time 可接受。


六、实现路径:API 层到 WindowOperator

Flink 内部(源码 WindowOperator,版本随 release 变化)大致路径:

  1. TimestampAssigner 从 record 提取 event time。
  2. WatermarkGenerator 根据 stream 进度 emit watermark。
  3. WindowOperator 收到 record 放入对应 WindowState;收到 watermark 触发 TriggerContext 判定 fire / purge。
  4. StateBackend 持久化未关窗 state(第 9、12 篇)。

ProcessFunction 可绕过内置窗口,用 TimerService.registerEventTimeTimer(windowEnd) 自管边界——适合 自定义会话 gap复杂 Trigger第 3 篇 GlobalWindow 模式)。


七、与 lakehouse / CDC 的时间字段

Debezium CDC 事件常带 source timestamp第 16 篇)。选哪个字段作 event time 影响 乱序程度

字段 乱序特征
binlog event time 单表 partition 上相对有序
Kafka record timestamp 接近 ingestion time
业务 updated_at 可能客户端时钟漂移

入湖 按 event time 分区 时,Flink 窗口 watermark 与 Iceberg 分区裁剪 共用同一时间轴——watermark 过慢会导致 窗口结果延迟湖分区 late arrival 同时发生(第 17 篇)。


八、可复现实验:乱序流上的双窗口对比

下列步骤 未在本写作环境执行;读者可用 Flink 1.20+ local cluster + Socket 或文件 Source 复现。

8.1 生成乱序事件(Python 3)

#!/usr/bin/env python3
"""生成带 event_time_ms 的 JSON 行;故意乱序。"""
import json
import random
import sys
import time

base_ms = int(time.time()) * 1000
events = [
    ("u1", base_ms + 1000),
    ("u2", base_ms + 8000),
    ("u1", base_ms + 2000),   # 乱序:比上一条 u1 早
    ("u2", base_ms + 3000),
]
random.shuffle(events)
for uid, ts in events:
    print(json.dumps({"user_id": uid, "event_time_ms": ts}), flush=True)
// 分支 1:event time + watermark
DataStream<ClickEvent> withWm = raw
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2))
            .withTimestampAssigner((e, ts) -> e.eventTimeMs));

withWm.keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .sum("countField");

// 分支 2:processing time 窗口(对比用)
raw.keyBy(e -> e.userId)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .sum("countField");

8.3 观察要点(不伪造具体数字)

  1. event-time 分支:乱序 record base+2000 仍计入与 base+1000 同一窗口(在 \(\delta=2s\) 内)。
  2. processing-time 分支:窗口边界随 注入速率 变化;暂停 stdin 输入则 processing 窗 也暂停推进
  3. Flink Web UI → Watermarks 面板(若版本提供)或算子 Records/Lag;调大 \(\delta\) 观察 窗口输出时刻推迟
  4. 开启 allowedLateness(5s),在关窗后注入更迟 record,验证 二次输出或侧输出

实验记录应包含:Flink 版本、并行度 1(排除 shuffle 干扰)、\(\delta\) 与窗口长度。性能数字不是本篇目标第 3 篇 讨论窗口 state 规模。


九、TimestampAssigner 与乱序源

withTimestampAssigner 负责 从 record 提取 long 毫秒时间戳。常见坑:

问题 后果 处理
字段为 0 或 null 默认 全部挤在 epoch,窗口异常 过滤 + 侧输出脏数据
秒 vs 毫秒混用 watermark 暴涨或不动 统一单位;Connect 转换
时区字符串解析 夏令时边界错位 源端 UTC 毫秒
未来时间戳 watermark 超前,关窗延迟 WatermarkStrategy 过滤或 clamp
.withTimestampAssigner((event, recordTimestamp) -> {
    if (event.eventTimeMs <= 0) {
        throw new RuntimeException("invalid ts");
    }
    return event.eventTimeMs;
})

recordTimestamp 参数在 Kafka Source 上常为 Kafka record timestamp(CREATE_TIME / LOG_APPEND_TIME),与业务 event time 可能不一致——assigner 应 显式读 payload 字段,不要默认用 recordTimestamp(除非刻意 ingestion time)。


十、Watermark 在算子链中的传播

Watermark 由 Source 生成,沿 forward / shuffle 向下游广播:

flowchart LR
  SRC["Source subtask<br/>W=100"]
  MAP["map chain"]
  SH["keyBy shuffle"]
  WIN["WindowOperator"]
  SRC --> MAP --> SH --> WIN

因此 最慢源 决定全局进度——与 第 5 篇 consumer lag 监控同类:单 partition stuck 会拖死 event-time 窗口。

ProcessFunctiononTimer 的 event-time timer 触发时刻 \(\leq\) 当前 watermark(TimerService 保证)。自定义算子 不可 随意 emitWatermark 超过 \(E_{\max}\),否则破坏单调性假设。


十一、与 Table/SQL 的时间声明

Table API 在 DDL 中声明 watermark(编译期):

CREATE TABLE clicks (
  user_id STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka', ...);

运行时与 DataStream 的 forBoundedOutOfOrderness(5s) 等价。SQL CUMULATE / HOP 窗口(第 3 篇)同样依赖该 watermark。Dynamic Table 的 retract 流在迟到修正时 emit -U / +U changelog——入湖 upsert(lakehouse/19)需 sink 支持 equality delete 或 upsert 模式。


十二、调参与故障模式

现象 可能原因 方向
窗口永不输出 watermark 不推进;某 partition idle 未配置 idleness withIdleness;查 lag
计数偏少 \(\delta\) 过小;无 allowed lateness 增大 \(\delta\) 或侧输出审计
结果延迟大 \(\delta\) 过大;窗口过长 权衡正确性与 SLA
重启后窗口重复输出 at-least-once + 无 sink 幂等 EOS(第 14–15 篇)
与离线对不上 用了 processing time;时区 统一 UTC event time

Watermark 与 checkpoint:checkpoint barrier 与 watermark 独立 传播;对齐机制在 第 10 篇。高 checkpoint 频率 不替代 合理的 \(\delta\)


Flink 2.x DataStream V1 仍使用上述 WatermarkStrategy。Table/SQL 通过 DDL WATERMARK FOR ts AS ts - INTERVAL '5' SECOND 声明,Planner 下沉为相同 runtime 逻辑。本系列以 DataStream 讲清机制;SQL 语法见官方 SQL 文档,不重复教程。


十四、术语表

术语 含义
Event time record 业务时间戳
Processing time 算子处理时刻
Ingestion time 进入 Flink 的时刻
Watermark 事件时间进度;\(W \leq E_{\max} - \delta\) 常见
maxOutOfOrderness \(\delta\),允许乱序上界
Allowed lateness 关窗后仍接受迟到的宽限
Side output 主输出外的 OutputTag
Idle source 无 record 的 partition,需 idleness 策略

十五、小结

Event time 使流式聚合与批式 SQL 在 时间口径上可对齐Watermark 是在乱序假设下 关窗的唯一规则(除非显式 processing time)。\(\delta\)allowed lateness正确性 vs 延迟 的旋钮;迟到数据应 可观测(侧输出或 upsert sink),而不是 silent drop。

下一篇:滚动、滑动、会话窗口 如何分配 state,以及 Trigger / Evictor 如何改变输出节奏。


参考资料

  1. Apache Flink Documentation, Timely Stream Processing(三种时间、watermark、idle)。
  2. Apache Flink Documentation, Windows(allowed lateness、side output late data)。
  3. Akidau, T. et al., The Dataflow Model(watermark 与 window 形式化)。
  4. Apache Flink Documentation, Kafka Connector(Source 与 WatermarkStrategy 组合)。
  5. Apache Flink 源码 WindowOperatorBoundedOutOfOrdernessWatermarks(实现对照,版本随 release 标注)。
  6. 本系列 第 1 篇第 3 篇第 8 篇
  7. lakehouse 第 19 章(入湖时间字段与分区)。

返回 系列目录 | 上一篇:流处理全景 | 下一篇:窗口:滚动、滑动与会话

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】Kafka · Flink · 状态 · Exactly-Once

承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。

2026-07-01 · database / distributed

【流式数据处理】窗口:滚动、滑动与会话

从 WindowAssigner 三类(Tumbling、Sliding、Session)出发,讲清窗口 state 如何随 key 与窗口实例增长,Trigger 与 Evictor 如何改变 firing 与清理节奏,GlobalWindow 自定义 Trigger 的边界,并与批式 GROUP BY 时间分桶对照;附三种窗口 state 观测的可复现步骤。

2026-07-01 · database / distributed

【流式数据处理】Flink 运行时模型

从 JobManager、TaskManager、Slot 到 StreamGraph→JobGraph→ExecutionGraph 的四层编译链,讲清 operator chain、并行度、SlotSharingGroup 如何决定任务在集群上的物理形态,并与 Kafka 消费位点提交分工衔接。


By .