第 2 篇 说明:event-time 窗口在 watermark 越过窗口 end 时关闭(默认 Trigger),迟到数据靠 allowed lateness 与 侧输出 处置。本篇回答窗口 几何形状 与 状态成本:
- Tumbling(滚动)、Sliding(滑动)、Session(会话)
三类
WindowAssigner如何把 record 划入窗口集合? - 同一 keyed 流上,三类窗口的 state 条目数 与 输出频率 差多少量级?
- Trigger 与 Evictor 在默认行为之外能改什么?
- GlobalWindow + 自定义 Trigger 适用什么边界?
- 批式
GROUP BY date_trunc('minute', ts)与流式窗口 在状态驻留上 差在哪?
实现锚定 Flink Documentation Windows 与源码
WindowOperator(A 级);版本 Flink
1.20+ / 2.x。先修:第
2 篇 watermark;第
8 篇 keyBy;第 9
篇 Keyed State 存储。
环境说明:本机 WSL2,未安装 Flink。不给出未实测的 state 字节数或 QPS。第十节提供 同一 keyed 流上三种窗口 的 metrics 观测步骤(Flink REST / Web UI),读者自行记录 \(\geq 3\) 轮中位数。
一、窗口在流水线中的位置
典型聚合链:
stream
.keyBy(event -> event.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new MyAgg());flowchart LR
KB["keyBy(userId)"]
WA["WindowAssigner<br/>划分窗口集合"]
TR["Trigger<br/>何时输出"]
EV["Evictor 可选<br/>输出前删 state"]
AGG["Aggregate / Process"]
KB --> WA --> TR --> EV --> AGG
| 组件 | 职责 |
|---|---|
| keyBy | 决定 state 按 key 分片(KeyGroup,第 8 篇) |
| WindowAssigner | 每条 record 落入哪些窗口区间 |
| Trigger | watermark / processing time / 计数等条件下 fire 或 purge |
| Evictor | fire 前从 window state 剔除 部分元素(少见) |
| Aggregation | sum / AggregateFunction /
ProcessWindowFunction |
无 key 的窗口 在 Flink 里需
windowAll(单并行度瓶颈),生产 keyed
窗口为主。
二、Tumbling Window(滚动窗口)
定义:固定长度 \(L\),时间轴划分为 不重叠 区间:
\[ [\,kL,\,(k+1)L \,),\quad k \in \mathbb{Z} \]
Event time 下常用
TumblingEventTimeWindows.of(Time.minutes(5));Processing
time 用 TumblingProcessingTimeWindows。
gantt
title 滚动窗口 L=5min(event time)
dateFormat HH:mm
axisFormat %H:%M
section key=A
win0 : 10:00, 5m
win1 : 10:05, 5m
section key=B
win0 : 10:00, 5m
win1 : 10:05, 5m
2.1 State 规模(估算式)
对每个 key,任意时刻 活跃滚动窗口数 \(\leq 1\)(无 allowed lateness 时)。全作业窗口 state 条目 \(\approx\) 活跃 key 数 × 每窗口 accumulator 大小。
若 allowedLateness = L),已关窗但未 purge
的窗口仍占 state,活跃条目 \(\approx 2 \times
|\text{keys}|\)(量级直觉,具体取决于
Trigger purge 时机)。
2.2 与批式分桶对照
批 SQL:
SELECT user_id, window_start, COUNT(*)
FROM clicks
WHERE dt = '2026-06-30'
GROUP BY user_id, FLOOR_TO_MINUTE(ts, 5);批 job 每行只扫描一次;流式滚动窗 每条 record 更新一次 accumulator,state 从首条 record 驻留到 watermark 关窗(第 1 篇 流表对偶)。
三、Sliding Window(滑动窗口)
定义:长度 \(L\),滑动步长 \(S\)(\(S \leq L\))。record 时间戳 \(t\) 落入:
\[ \{\, [\,kS,\, kS + L) \mid kS \leq t < kS + L,\; k \in \mathbb{Z} \,\} \]
一条 record 可属于多个窗口。例如 \(L=10\,\text{min}\)、\(S=1\,\text{min}\),某事件最多同时属于 10 个 重叠窗口。
SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1))flowchart TB
R["record @ 10:07"]
W1["[10:00,10:10)"]
W2["[10:01,10:11)"]
W3["..."]
W10["[10:06,10:16)"]
R --> W1
R --> W2
R --> W3
R --> W10
3.1 State 与输出
| 量 | 相对 Tumbling |
|---|---|
| 每 key 活跃窗口数 | 最高 \(\lceil L/S \rceil\) |
| 每 record 更新次数 | 与所属窗口数成正比 |
| 输出次数 | 每个窗口实例 watermark 到 end 各 fire 一次 |
滑动窗是 state 放大器:\(L/S=60\) 时(10 分钟窗 10 秒 slide),活跃 state 可比同长滚动窗 高一个数量级——第 13 篇 讨论调优与 redesign。
3.2 何时用滑动窗
- 需要 平滑曲线(每分钟输出过去 10 分钟 UV)。
- 可接受 state 成本,或 预聚合 + 更大 \(S\)。
若只需 固定不重叠桶,优先 Tumbling。
四、Session Window(会话窗口)
定义:同一 key 上,相邻 event 间隔 \(<\ G\)(gap)则属同一会话;$ G$ 则开新会话。会话 长度不固定,由数据驱动。
EventTimeSessionWindows.withGap(Time.minutes(30))sequenceDiagram
participant U as user X events
Note over U: 10:00, 10:05, 10:40
Note over U: gap>30min → 两个 session
4.1 合并( merging )
Session 窗口 会合并:新 event 桥接两个
previously 分离的 session 时,Flink 合并 window state(源码
MergingWindowSet)。这比滚动窗
更复杂,checkpoint 时需
序列化合并元数据。
4.2 State 特征
| 特征 | 说明 |
|---|---|
| 每 key 会话数 | 取决于到达模式;突发流量 key 多会话并存 |
| 关窗时机 | event time 下 无 event 超过 G 且 watermark 推进 |
| 热点 key | 长会话导致 单 key state 长期膨胀 |
业务上 用户在线时长、点击序列 常用 session;gap 需 业务定义(30 min 是常见默认,非 universal)。
4.3 Processing time session
ProcessingTimeSessionWindows 用
处理间隔 切会话——用户停顿在背压下可能被切成
更长会话,与 event-time session
口径不同。
五、三类窗口对照表
| 类型 | 窗口边界 | 每 record 窗口数 | 典型 state(每 key) | 输出节奏 |
|---|---|---|---|---|
| Tumbling | 固定、不重叠 | 1 | 1(+ lateness) | 每窗 end 一次 |
| Sliding | 固定、重叠 | \(\leq \lceil L/S \rceil\) | 同上界 | 每个窗口实例一次 |
| Session | 数据驱动 | 1(当前会话) | 活跃会话数 | 会话结束(gap+watermark) |
时间语义 Event / Processing 对三类均适用;ingestion time 等价于在 Source 赋 ts 后的 event time。
六、Trigger:何时 fire、何时 purge
默认
EventTimeTrigger:watermark >= windowEnd
时 fire 并 purge window
state(无 allowed lateness 时)。可自定义:
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.aggregate(...);| Trigger 类 | 行为 |
|---|---|
| EventTimeTrigger | watermark 过 end → fire + purge |
| ProcessingTimeTrigger | processing time 过 end |
| CountTrigger | 元素个数达到阈值(常与 GlobalWindow 联用) |
| ContinuousEventTimeTrigger | 周期 fire 但不 purge(中间结果) |
| PurgingTrigger | 包装器:fire 后 purge |
Fire 与 purge
分离:ContinuousEventTimeTrigger 可
早输出 partial result,最终仍由
EventTimeTrigger 关窗——适用于
大屏渐进刷新;下游需区分 partial vs
final(或通过 retract changelog,Table API)。
6.1 Allowed lateness 与 Trigger 交互
配置 .allowedLateness(T)
后,EventTimeTrigger 在 end 时
fire 但不立即 purge;迟到 record 可更新;当
watermark >= end + T 才 purge(来源:Flink
Windows)。
七、Evictor:fire 前的 state 裁剪
Evictor 在 Trigger fire
之后、将 window contents 交给
ProcessWindowFunction 之前
运行,可从 state 删除部分元素(如只保留最近
N 条)。
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)))
.evictor(CountEvictor.of(100))
.process(new MyProcessWindowFunction());Evictor 不常用;多数聚合用 AggregateFunction 增量 accumulator,不保留全量元素。Evictor 适合 需要有限缓冲的 ProcessWindowFunction(如 top-100 近似)。
八、GlobalWindow 与自定义 Trigger
GlobalWindow:单窗口 \((-\infty,
+\infty)\),所有 record 进同一窗口实例 per
key。必须 自定义
Trigger,否则永不输出。
stream.keyBy(e -> e.userId)
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of(1000)))
.sum("value");| 适用 | 不适用 |
|---|---|
| 计数批量 flush(每 1000 条输出) | 需要清晰 time bucket 的对账 |
| 自定义会话逻辑(ProcessFunction + timer 有时更简单) | 长窗 + 全量 buffer(OOM 风险) |
复杂 per-key 状态机 往往
KeyedProcessFunction + event-time
timer(第
8 篇)比 GlobalWindow 更清晰。
九、ProcessWindowFunction vs AggregateFunction
| API | 输入 | State |
|---|---|---|
reduce /
aggregate(AggregateFunction) |
增量 accumulator | 小 |
process(ProcessWindowFunction) |
窗口内 Iterable 全量元素 | 需缓冲全元素或配合 Evictor |
// 增量:只存 count + sum
.aggregate(new AverageAggregate())
// 全量:窗口内所有 event 进 Iterable(大窗危险)
.process(new ProcessWindowFunction<In, Out, Key, TimeWindow>() {
@Override
public void process(Key key, Context ctx, Iterable<In> elements, Collector<Out> out) {
// elements 可能很大
}
});生产 大窗口 + 高基数 key 应用 AggregateFunction;需要 排序 top-N 等再考虑 ProcessWindowFunction + 有界 Evictor 或 专用数据结构 state(第 9 篇)。
十、可复现实验:三种窗口的 state 与输出观测
目标:同一 keyBy(user_id)
流,分别跑 Tumbling / Sliding / Session,对比
checkpoint 状态大小趋势 与 sink
记录条数(定性 + 读者自录 metrics)。
10.1 环境声明(模板)
| 项 | 建议值(读者填写实测) |
|---|---|
| Flink | 1.20.x 或 2.x,local cluster |
| 并行度 | 4 |
| StateBackend | HashMapStateBackend(小 state)或 EmbeddedRocksDB |
| 输入 | 合成流:1000 个 user_id,每 user 100 event,event time 随机 ±30s 乱序 |
| 窗口配置 A | Tumbling 5 min event time |
| 窗口配置 B | Sliding 5 min / slide 1 min |
| 窗口配置 C | Session gap 5 min |
10.2 合成输入(Python 骨架)
#!/usr/bin/env python3
import json, random, time
random.seed(42)
base = int(time.time()) * 1000
for uid in range(1000):
for _ in range(100):
ts = base + random.randint(0, 3600_000)
print(json.dumps({"user_id": str(uid), "event_time_ms": ts, "v": 1}))通过 文件 Source 或
Socket
灌入;WatermarkStrategy 使用
forBoundedOutOfOrderness(Duration.ofSeconds(30))(第
2 篇)。
10.3 三条作业(Java 片段)
DataStream<Event> keyed = env.addSource(...).assignTimestampsAndWatermarks(strategy)
.keyBy(e -> e.userId);
keyed.window(TumblingEventTimeWindows.of(Time.minutes(5))).sum("v");
// 另提交:SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1))
// 另提交:EventTimeSessionWindows.withGap(Time.minutes(5))10.4 Metrics 观测(Flink REST)
# 作业 running 后(URL 随集群变化)
curl -s "http://localhost:8081/jobs/<jobId>/vertices/<vertexId>/metrics?get=lastCheckpointSize,lastCheckpointDuration"Web UI → Checkpoints → State
Size;Sink 输出条数用
print() 或 文件 sink
计数(勿与 Kafka EOS 混测以免干扰)。
10.5 预期定性结论(非 benchmark)
- Sliding 的 checkpoint state 大于 Tumbling(同 \(L\) 下活跃窗口倍数 \(\approx L/S\))。
- Session 的 state 随 会话数 波动;gap 小则 session 多、state 条目多。
- 输出条数:Sliding 多于 Tumbling(每 slide 一步一个新窗口实例 fire);Session 取决于 gap 与事件密度。
本文不写入具体 MB 数字;读者按 PLAN 要求 \(\geq 3\) 轮取中位数 填入实验笔记。
十一、与 lakehouse / 批 OLAP 的衔接
- 流式预聚合 后写湖(第 17
篇):窗口长度 对齐 Iceberg
分区粒度(如 1 hour tumbling →
hours(ts)分区),减少 下游批扫描重复聚合。 - CDC upsert(lakehouse/19)常与 无窗口 keyBy 去重 或 短滚动窗 组合;长 session state 与 RocksDB compaction 压力见 第 12–13 篇。
- columnar-engine 系列 批扫描适合 历史回补;流窗口适合 近实时 SLA——第 1 篇 Lambda/Kappa 边界。
十二、设计取舍与反模式
| 反模式 | 后果 | 替代 |
|---|---|---|
| 大 \(L\) 小 \(S\) 滑动窗 | state 爆炸 | Tumbling + 下游 roll-up |
| 无 key 的 windowAll 高并行 | 单 subtask 热点 | keyBy 重新分区 |
| 大窗 ProcessWindowFunction 存全量 | OOM | AggregateFunction |
| Session gap 过小 | 会话碎片化、输出抖动 | 调 gap 或业务 dedupe |
| 忽略 allowed lateness | silently 丢迟到 | 侧输出 + 批回补 |
Watermark 未推进 时三种窗 均不会关窗——窗口问题常与 第 2 篇 lag / idle 同源。
十三、WindowOperator 与 checkpoint
WindowOperator 的
WindowState 是 Keyed State
的一种;checkpoint 时随 KeyGroup 快照(第 10
篇)。RocksDB backend 下每个窗口 accumulator 是
LSM 中的键值(第
12 篇)。增量 checkpoint 只上传变更
SST,窗口 burst 写入会导致 checkpoint
时长波动(第 13
篇)。
十四、Table API / SQL 窗口(点到为止)
Flink SQL:
SELECT user_id, TUMBLE_START(ts, INTERVAL '5' MINUTE), COUNT(*)
FROM clicks
GROUP BY user_id, TUMBLE(ts, INTERVAL '5' MINUTE);Planner 将 TUMBLE / HOP / CUMULATE /
SESSION 译为相同 WindowOperator
逻辑(Windows in SQL)。Hop 对应
Sliding;Session 需
SESSION(ts, INTERVAL '30' MINUTE)。本系列不展开
SQL 优化;语义与 DataStream 一致。
十五、术语表
| 术语 | 含义 |
|---|---|
| WindowAssigner | 分配 record 到窗口集合 |
| Tumbling | 固定长度、不重叠 |
| Sliding | 固定长度、步长 \(S\) 滑动 |
| Session | gap \(G\) 内合并为会话 |
| Trigger | 控制 fire / purge 时机 |
| Evictor | fire 后输出前裁剪 window 内容 |
| GlobalWindow | 全时间轴单窗,需自定义 Trigger |
| Allowed lateness | 关窗后仍接受迟到的宽限 |
十六、小结
Tumbling state 最省、口径最像批式 time bucket;Sliding 用 \(L/S\) 倍 state 换平滑输出;Session 适应 不规则间隔 但 merging 与 checkpoint 更复杂。Trigger 决定 早输出还是只 end 输出;Evictor 仅在小集合 Process 场景有用。窗口 state 是 Keyed State 最大头之一,设计时先问能否 Tumbling + 下游批聚合 替代大滑动窗。
下一篇进入 Kafka 日志模型与分区:offset、segment 文件、分区内有序如何支撑 keyBy 前的 log 分区策略(第 4 篇)。
参考资料
- Apache Flink Documentation, Windows(Tumbling / Sliding / Session、Trigger、Evictor、allowed lateness)。
- Apache Flink Documentation, Timely Stream Processing(event-time 窗口与 watermark)。
- Apache Flink 源码
WindowOperator、MergingWindowSet、GlobalWindows(版本随 release 标注)。 - Akidau, T. et al., The Dataflow Model(窗口与 trigger 抽象)。
- 本系列 第 2 篇、第 8–9 篇、第 13 篇。
- lakehouse 第 19 章(入湖与流聚合分工)。
返回 系列目录 | 上一篇:事件时间、处理时间与 Watermark | 下一篇:Kafka 日志模型与分区
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】状态放大、Compaction 与调优
在 RocksDB state backend 读写路径之上,拆解窗口 state 膨胀、LSM 写放大与 checkpoint 争抢磁盘、Flink managed memory 与 RocksDBOptionsFactory 调参边界,以及 hot key 导致单 subtask 过热时的诊断与「改 state 设计 vs 拧参数」取舍。
【流式数据处理】流处理全景:从日志到有状态计算
从批、流、微批四维度对比出发,建立「可重放日志 + 有状态计算」心智模型,厘清 Lambda/Kappa 边界与流表对偶,并给出与 lakehouse 入湖侧对称的全系列地图。
【流式数据处理】事件时间、处理时间与 Watermark
拆解 event time、processing time、ingestion time 三种时间语义,给出 watermark 的形式化含义与 bounded-out-of-orderness 等生成策略,并说明侧输出、allowed lateness 如何处理迟到数据;附 event-time 与 processing-time 窗口对比的可复现实验步骤。
【流式数据处理】Flink 运行时模型
从 JobManager、TaskManager、Slot 到 StreamGraph→JobGraph→ExecutionGraph 的四层编译链,讲清 operator chain、并行度、SlotSharingGroup 如何决定任务在集群上的物理形态,并与 Kafka 消费位点提交分工衔接。