土法炼钢兴趣小组的算法知识备份

【流式数据处理】窗口:滚动、滑动与会话

文章导航

分类入口
databasedistributed
标签入口
#flink#window#tumbling-window#sliding-window#session-window#trigger#evictor#global-window#window-state#group-by

目录

第 2 篇 说明:event-time 窗口在 watermark 越过窗口 end 时关闭(默认 Trigger),迟到数据靠 allowed lateness侧输出 处置。本篇回答窗口 几何形状状态成本

实现锚定 Flink Documentation Windows 与源码 WindowOperator(A 级);版本 Flink 1.20+ / 2.x。先修:第 2 篇 watermark;第 8 篇 keyBy第 9 篇 Keyed State 存储。

环境说明:本机 WSL2,未安装 Flink不给出未实测的 state 字节数或 QPS。第十节提供 同一 keyed 流上三种窗口 的 metrics 观测步骤(Flink REST / Web UI),读者自行记录 \(\geq 3\) 轮中位数。


一、窗口在流水线中的位置

典型聚合链:

stream
    .keyBy(event -> event.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new MyAgg());
flowchart LR
  KB["keyBy(userId)"]
  WA["WindowAssigner<br/>划分窗口集合"]
  TR["Trigger<br/>何时输出"]
  EV["Evictor 可选<br/>输出前删 state"]
  AGG["Aggregate / Process"]
  KB --> WA --> TR --> EV --> AGG
组件 职责
keyBy 决定 state 按 key 分片(KeyGroup,第 8 篇
WindowAssigner 每条 record 落入哪些窗口区间
Trigger watermark / processing time / 计数等条件下 firepurge
Evictor fire 前从 window state 剔除 部分元素(少见)
Aggregation sum / AggregateFunction / ProcessWindowFunction

无 key 的窗口 在 Flink 里需 windowAll(单并行度瓶颈),生产 keyed 窗口为主。


二、Tumbling Window(滚动窗口)

定义:固定长度 \(L\),时间轴划分为 不重叠 区间:

\[ [\,kL,\,(k+1)L \,),\quad k \in \mathbb{Z} \]

Event time 下常用 TumblingEventTimeWindows.of(Time.minutes(5));Processing time 用 TumblingProcessingTimeWindows

gantt
  title 滚动窗口 L=5min(event time)
  dateFormat HH:mm
  axisFormat %H:%M
  section key=A
  win0 : 10:00, 5m
  win1 : 10:05, 5m
  section key=B
  win0 : 10:00, 5m
  win1 : 10:05, 5m

2.1 State 规模(估算式)

对每个 key,任意时刻 活跃滚动窗口数 \(\leq 1\)(无 allowed lateness 时)。全作业窗口 state 条目 \(\approx\) 活跃 key 数 × 每窗口 accumulator 大小

allowedLateness = L),已关窗但未 purge 的窗口仍占 state,活跃条目 \(\approx 2 \times |\text{keys}|\)(量级直觉,具体取决于 Trigger purge 时机)。

2.2 与批式分桶对照

批 SQL:

SELECT user_id, window_start, COUNT(*)
FROM clicks
WHERE dt = '2026-06-30'
GROUP BY user_id, FLOOR_TO_MINUTE(ts, 5);

批 job 每行只扫描一次;流式滚动窗 每条 record 更新一次 accumulator,state 从首条 record 驻留到 watermark 关窗第 1 篇 流表对偶)。


三、Sliding Window(滑动窗口)

定义:长度 \(L\),滑动步长 \(S\)\(S \leq L\))。record 时间戳 \(t\) 落入:

\[ \{\, [\,kS,\, kS + L) \mid kS \leq t < kS + L,\; k \in \mathbb{Z} \,\} \]

一条 record 可属于多个窗口。例如 \(L=10\,\text{min}\)\(S=1\,\text{min}\),某事件最多同时属于 10 个 重叠窗口。

SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1))
flowchart TB
  R["record @ 10:07"]
  W1["[10:00,10:10)"]
  W2["[10:01,10:11)"]
  W3["..."]
  W10["[10:06,10:16)"]
  R --> W1
  R --> W2
  R --> W3
  R --> W10

3.1 State 与输出

相对 Tumbling
每 key 活跃窗口数 最高 \(\lceil L/S \rceil\)
每 record 更新次数 与所属窗口数成正比
输出次数 每个窗口实例 watermark 到 end 各 fire 一次

滑动窗是 state 放大器\(L/S=60\) 时(10 分钟窗 10 秒 slide),活跃 state 可比同长滚动窗 高一个数量级——第 13 篇 讨论调优与 redesign。

3.2 何时用滑动窗

若只需 固定不重叠桶,优先 Tumbling


四、Session Window(会话窗口)

定义:同一 key 上,相邻 event 间隔 \(<\ G\)(gap)则属同一会话;$ G$ 则开新会话。会话 长度不固定,由数据驱动。

EventTimeSessionWindows.withGap(Time.minutes(30))
sequenceDiagram
  participant U as user X events
  Note over U: 10:00, 10:05, 10:40
  Note over U: gap>30min → 两个 session

4.1 合并( merging )

Session 窗口 会合并:新 event 桥接两个 previously 分离的 session 时,Flink 合并 window state(源码 MergingWindowSet)。这比滚动窗 更复杂,checkpoint 时需 序列化合并元数据

4.2 State 特征

特征 说明
每 key 会话数 取决于到达模式;突发流量 key 多会话并存
关窗时机 event time 下 无 event 超过 G 且 watermark 推进
热点 key 长会话导致 单 key state 长期膨胀

业务上 用户在线时长、点击序列 常用 session;gap 需 业务定义(30 min 是常见默认,非 universal)。

4.3 Processing time session

ProcessingTimeSessionWindows处理间隔 切会话——用户停顿在背压下可能被切成 更长会话,与 event-time session 口径不同。


五、三类窗口对照表

类型 窗口边界 每 record 窗口数 典型 state(每 key) 输出节奏
Tumbling 固定、不重叠 1 1(+ lateness) 每窗 end 一次
Sliding 固定、重叠 \(\leq \lceil L/S \rceil\) 同上界 每个窗口实例一次
Session 数据驱动 1(当前会话) 活跃会话数 会话结束(gap+watermark)

时间语义 Event / Processing 对三类均适用;ingestion time 等价于在 Source 赋 ts 后的 event time。


六、Trigger:何时 fire、何时 purge

默认 EventTimeTriggerwatermark >= windowEndfirepurge window state(无 allowed lateness 时)。可自定义:

stream.keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
    .aggregate(...);
Trigger 类 行为
EventTimeTrigger watermark 过 end → fire + purge
ProcessingTimeTrigger processing time 过 end
CountTrigger 元素个数达到阈值(常与 GlobalWindow 联用)
ContinuousEventTimeTrigger 周期 fire 但不 purge(中间结果)
PurgingTrigger 包装器:fire 后 purge

Fire 与 purge 分离ContinuousEventTimeTrigger早输出 partial result,最终仍由 EventTimeTrigger 关窗——适用于 大屏渐进刷新;下游需区分 partial vs final(或通过 retract changelog,Table API)。

6.1 Allowed lateness 与 Trigger 交互

配置 .allowedLateness(T) 后,EventTimeTriggerendfire 但不立即 purge;迟到 record 可更新;当 watermark >= end + T 才 purge(来源:Flink Windows)。


七、Evictor:fire 前的 state 裁剪

Evictor 在 Trigger fire 之后、将 window contents 交给 ProcessWindowFunction 之前 运行,可从 state 删除部分元素(如只保留最近 N 条)。

.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)))
.evictor(CountEvictor.of(100))
.process(new MyProcessWindowFunction());

Evictor 不常用;多数聚合用 AggregateFunction 增量 accumulator,不保留全量元素。Evictor 适合 需要有限缓冲的 ProcessWindowFunction(如 top-100 近似)。


八、GlobalWindow 与自定义 Trigger

GlobalWindow单窗口 \((-\infty, +\infty)\),所有 record 进同一窗口实例 per key。必须 自定义 Trigger,否则永不输出。

stream.keyBy(e -> e.userId)
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(1000)))
    .sum("value");
适用 不适用
计数批量 flush(每 1000 条输出) 需要清晰 time bucket 的对账
自定义会话逻辑(ProcessFunction + timer 有时更简单) 长窗 + 全量 buffer(OOM 风险)

复杂 per-key 状态机 往往 KeyedProcessFunction + event-time timer第 8 篇)比 GlobalWindow 更清晰。


九、ProcessWindowFunction vs AggregateFunction

API 输入 State
reduce / aggregate(AggregateFunction) 增量 accumulator
process(ProcessWindowFunction) 窗口内 Iterable 全量元素 需缓冲全元素或配合 Evictor
// 增量:只存 count + sum
.aggregate(new AverageAggregate())

// 全量:窗口内所有 event 进 Iterable(大窗危险)
.process(new ProcessWindowFunction<In, Out, Key, TimeWindow>() {
    @Override
    public void process(Key key, Context ctx, Iterable<In> elements, Collector<Out> out) {
        // elements 可能很大
    }
});

生产 大窗口 + 高基数 key 应用 AggregateFunction;需要 排序 top-N 等再考虑 ProcessWindowFunction + 有界 Evictor专用数据结构 state第 9 篇)。


十、可复现实验:三种窗口的 state 与输出观测

目标:同一 keyBy(user_id) 流,分别跑 Tumbling / Sliding / Session,对比 checkpoint 状态大小趋势sink 记录条数(定性 + 读者自录 metrics)。

10.1 环境声明(模板)

建议值(读者填写实测)
Flink 1.20.x 或 2.x,local cluster
并行度 4
StateBackend HashMapStateBackend(小 state)或 EmbeddedRocksDB
输入 合成流:1000 个 user_id,每 user 100 event,event time 随机 ±30s 乱序
窗口配置 A Tumbling 5 min event time
窗口配置 B Sliding 5 min / slide 1 min
窗口配置 C Session gap 5 min

10.2 合成输入(Python 骨架)

#!/usr/bin/env python3
import json, random, time
random.seed(42)
base = int(time.time()) * 1000
for uid in range(1000):
    for _ in range(100):
        ts = base + random.randint(0, 3600_000)
        print(json.dumps({"user_id": str(uid), "event_time_ms": ts, "v": 1}))

通过 文件 SourceSocket 灌入;WatermarkStrategy 使用 forBoundedOutOfOrderness(Duration.ofSeconds(30))第 2 篇)。

10.3 三条作业(Java 片段)

DataStream<Event> keyed = env.addSource(...).assignTimestampsAndWatermarks(strategy)
    .keyBy(e -> e.userId);

keyed.window(TumblingEventTimeWindows.of(Time.minutes(5))).sum("v");
// 另提交:SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1))
// 另提交:EventTimeSessionWindows.withGap(Time.minutes(5))
# 作业 running 后(URL 随集群变化)
curl -s "http://localhost:8081/jobs/<jobId>/vertices/<vertexId>/metrics?get=lastCheckpointSize,lastCheckpointDuration"

Web UI → CheckpointsState SizeSink 输出条数用 print()文件 sink 计数(勿与 Kafka EOS 混测以免干扰)。

10.5 预期定性结论(非 benchmark)

  1. Sliding 的 checkpoint state 大于 Tumbling(同 \(L\) 下活跃窗口倍数 \(\approx L/S\))。
  2. Session 的 state 随 会话数 波动;gap 小则 session 多、state 条目多。
  3. 输出条数:Sliding 多于 Tumbling(每 slide 一步一个新窗口实例 fire);Session 取决于 gap 与事件密度。

本文不写入具体 MB 数字;读者按 PLAN 要求 \(\geq 3\) 轮取中位数 填入实验笔记。


十一、与 lakehouse / 批 OLAP 的衔接


十二、设计取舍与反模式

反模式 后果 替代
\(L\)\(S\) 滑动窗 state 爆炸 Tumbling + 下游 roll-up
无 key 的 windowAll 高并行 单 subtask 热点 keyBy 重新分区
大窗 ProcessWindowFunction 存全量 OOM AggregateFunction
Session gap 过小 会话碎片化、输出抖动 调 gap 或业务 dedupe
忽略 allowed lateness silently 丢迟到 侧输出 + 批回补

Watermark 未推进 时三种窗 均不会关窗——窗口问题常与 第 2 篇 lag / idle 同源。


十三、WindowOperator 与 checkpoint

WindowOperatorWindowState 是 Keyed State 的一种;checkpoint 时随 KeyGroup 快照(第 10 篇)。RocksDB backend 下每个窗口 accumulator 是 LSM 中的键值第 12 篇)。增量 checkpoint 只上传变更 SST,窗口 burst 写入会导致 checkpoint 时长波动第 13 篇)。


十四、Table API / SQL 窗口(点到为止)

Flink SQL:

SELECT user_id, TUMBLE_START(ts, INTERVAL '5' MINUTE), COUNT(*)
FROM clicks
GROUP BY user_id, TUMBLE(ts, INTERVAL '5' MINUTE);

Planner 将 TUMBLE / HOP / CUMULATE / SESSION 译为相同 WindowOperator 逻辑(Windows in SQL)。Hop 对应 Sliding;Session 需 SESSION(ts, INTERVAL '30' MINUTE)。本系列不展开 SQL 优化;语义与 DataStream 一致。


十五、术语表

术语 含义
WindowAssigner 分配 record 到窗口集合
Tumbling 固定长度、不重叠
Sliding 固定长度、步长 \(S\) 滑动
Session gap \(G\) 内合并为会话
Trigger 控制 fire / purge 时机
Evictor fire 后输出前裁剪 window 内容
GlobalWindow 全时间轴单窗,需自定义 Trigger
Allowed lateness 关窗后仍接受迟到的宽限

十六、小结

Tumbling state 最省、口径最像批式 time bucket;Sliding\(L/S\) state 换平滑输出;Session 适应 不规则间隔 但 merging 与 checkpoint 更复杂。Trigger 决定 早输出还是只 end 输出Evictor 仅在小集合 Process 场景有用。窗口 state 是 Keyed State 最大头之一,设计时先问能否 Tumbling + 下游批聚合 替代大滑动窗。

下一篇进入 Kafka 日志模型与分区:offset、segment 文件、分区内有序如何支撑 keyBy 前的 log 分区策略第 4 篇)。


参考资料

  1. Apache Flink Documentation, Windows(Tumbling / Sliding / Session、Trigger、Evictor、allowed lateness)。
  2. Apache Flink Documentation, Timely Stream Processing(event-time 窗口与 watermark)。
  3. Apache Flink 源码 WindowOperatorMergingWindowSetGlobalWindows(版本随 release 标注)。
  4. Akidau, T. et al., The Dataflow Model(窗口与 trigger 抽象)。
  5. 本系列 第 2 篇第 8–9 篇第 13 篇
  6. lakehouse 第 19 章(入湖与流聚合分工)。

返回 系列目录 | 上一篇:事件时间、处理时间与 Watermark | 下一篇:Kafka 日志模型与分区

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】状态放大、Compaction 与调优

在 RocksDB state backend 读写路径之上,拆解窗口 state 膨胀、LSM 写放大与 checkpoint 争抢磁盘、Flink managed memory 与 RocksDBOptionsFactory 调参边界,以及 hot key 导致单 subtask 过热时的诊断与「改 state 设计 vs 拧参数」取舍。

2026-07-01 · database / distributed

【流式数据处理】事件时间、处理时间与 Watermark

拆解 event time、processing time、ingestion time 三种时间语义,给出 watermark 的形式化含义与 bounded-out-of-orderness 等生成策略,并说明侧输出、allowed lateness 如何处理迟到数据;附 event-time 与 processing-time 窗口对比的可复现实验步骤。

2026-07-01 · database / distributed

【流式数据处理】Flink 运行时模型

从 JobManager、TaskManager、Slot 到 StreamGraph→JobGraph→ExecutionGraph 的四层编译链,讲清 operator chain、并行度、SlotSharingGroup 如何决定任务在集群上的物理形态,并与 Kafka 消费位点提交分工衔接。


By .