lakehouse
第 19 章
已经把流式入湖的表格式侧讲透:IcebergStreamWriter
与 IcebergFilesCommitter 分离、checkpoint
完成后的 notifyCheckpointComplete、equality
delete upsert、Connect control topic 协调——都是 sink
如何把文件挂进 snapshot 的问题。
运维 Flink CDC 入湖作业时,遇到的却是另一组旋钮:
execution.checkpointing.interval从 30 秒调到 5 分钟,小文件数立刻下来,但 lag 与数据可见延迟怎么估?- Web UI 显示 sink 算子 backpressure HIGH,Iceberg 表的 commit 间隔变长——是 catalog 慢还是 checkpoint 对齐卡住?
- 并行度 32 的 writer 每次 checkpoint
各写一个文件,compaction
作业与写入作业抢提交,频繁
CommitFailedException(乐观 CAS 失败,见 lakehouse/11)。
本文从 Flink 作业侧回答这些问题,与 lakehouse/19 对读,不重复表格式两阶段提交协议全文。读 lakehouse/19 搞清「提交是什么」;读本文搞清「作业上该拧哪些螺丝、和表治理怎么分工」。
环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2)、i9-12900K / 32 GB,未安装 JVM、Flink、Kafka、Iceberg。下文配置与公式按 Apache Flink / Iceberg 官方文档可复现;不粘贴未执行环境的 metrics 截图,也不伪造 benchmark 数字。小文件数量级关系用符号推导与 lakehouse/17 的实验口径对齐(该篇有 PyIceberg 实测 planning 耗时)。
版本锚定:Flink 1.20+ / 2.x;Iceberg Flink sink 以官方 Flink Writes 文档为准;CDC 上游事件模型见本系列第 16 篇。
一、与 lakehouse/19 的分工
先把责任划清,避免两篇文章重复读同一段协议。
| 问题 | lakehouse/19 | 本文(引擎侧) |
|---|---|---|
| Writer 写 Parquet、Committer 调 catalog | 详述 pre-commit / commit | 只引用,不展开 CAS |
| equality delete upsert 机制 | 详述 | 假设已读,讲 Flink DDL 与 keyBy |
| exactly-once 三不变量 | 表格式 + offset 绑定 | checkpoint 间隔、背压对 commit 时刻的影响 |
| Kafka Connect control topic | 详述 | 对比 Flink checkpoint 边界 |
| 小文件根因 | 提交频率表格式视角 | 作业参数如何改变提交频率 |
| compaction | 指向 lakehouse/17 | 调度错峰与写入并行度配合 |
| 乐观并发冲突 | 指向 lakehouse/11 | 并行 writer 数如何放大冲突率 |
flowchart TB
subgraph Engine["Flink 作业侧(本文)"]
CI[checkpoint interval]
BP[背压 / 缓冲]
PAR[并行度 / keyBy / bucket]
PA[预聚合]
end
subgraph Lake["表格式侧(lakehouse/19)"]
W[Writer 写文件]
C[Committer 2PC]
T[Snapshot CAS]
end
subgraph Gov["表治理(lakehouse/17/20)"]
CP[rewrite_data_files]
EO[expire_snapshots]
end
CI --> C
BP --> CI
PAR --> W
PA --> W
C --> T
CP --> T
W --> C
二、checkpoint 间隔 ↔︎ 提交频率 ↔︎ 小文件
2.1 等式关系
对 Flink + Iceberg sink(FlinkSink / SQL
INSERT INTO iceberg 表),在默认 aligned
checkpoint、启用 exactly-once 时:
\[ \text{Iceberg 提交次数/天} \approx \frac{86400}{\Delta_{\text{ckpt}}} \]
其中 \(\Delta_{\text{ckpt}}\) =
execution.checkpointing.interval(秒)。每次成功
checkpoint 触发一次 notifyCheckpointComplete →
一次 append/upsert commit → 至少一批新 data file(及 upsert
时的 equality delete file)。
若作业并行度为 \(P\),writer 算子每个 subtask 独立滚动文件,单次提交产生的新文件数上界约为 \(P\)(实际可能更少,若部分 subtask 无数据):
\[ \text{新 data file 数/天} \lesssim P \times \frac{86400}{\Delta_{\text{ckpt}}} \]
upsert 模式还可能每条 equality delete 批次附加 delete file,读放大见 lakehouse/19 与 第 10 篇。
例子(数量级,非实测):\(P=16\),\(\Delta_{\text{ckpt}}=30\,\text{s}\) → 约 \(16 \times 2880 \approx 46000\) 个新文件/天/表。把 interval 调到 300 s,同式得约 4600——降一个数量级,可见延迟上界从约 30 s 变为约 5 min。业务能否接受,是产品问题;引擎侧这是第一个旋钮。
2.2 不止 interval:影响 commit 边界的参数
| Flink 配置 | 对提交的影响 |
|---|---|
execution.checkpointing.interval |
主旋钮:目标 commit 周期 |
execution.checkpointing.min-pause |
两次 checkpoint 最小间隔,防止 commit 风暴 |
execution.checkpointing.timeout |
超时则 ckpt 失败,本次不 commit,文件堆积在 writer |
execution.checkpointing.max-concurrent-checkpoints |
通常为 1;>1 时 commit 顺序与语义需额外小心 |
execution.checkpointing.unaligned |
缩短 backpressure 下 barrier 对齐时间,间接减少 commit 延迟抖动 |
Iceberg write.target-file-size-bytes |
不减少 commit 次数,但减少「未写满就 commit 的碎文件」 |
lakehouse/19 第二节写「把 interval 从 30 s 拉到 5 min」——本文补全:并行度 \(P\) 与 upsert delete file 乘在分子上;只调 interval 不调并行度,小文件仍可能很多。
2.3 与 Spark micro-batch 的对照(点到为止)
Spark Structured Streaming 用
trigger(ProcessingTime(...))
定义批边界,checkpointLocation 存 offset;Iceberg 每
micro-batch 一次 commit。语义上 \(\Delta_{\text{batch}}\) 等价于
Flink 的 \(\Delta_{\text{ckpt}}\)。引擎不同,小文件公式同形:批越碎,文件越多。第
18
篇对照表会收束多引擎差异。
2.4 Kafka Connect Iceberg sink
不走 Flink checkpoint 时,提交周期由
iceberg.control.commit.interval-ms
决定(lakehouse/19
第二节)。没有背压统一模型——各 task
自写自报,coordinator 收齐 commit。调 interval 的权衡与
Flink 相同,但 lag 表现为 Connect
source/sink task 的
records-lag-max,而非 Flink backpressure
比例。
三、背压如何传导到 commit 延迟
3.1 背压链
Flink 使用 credit-based flow control(详见第 18 篇)。下游算子消费慢 → 上游 channel credits 耗尽 → 反压传播至 source。对 Iceberg sink 管道:
flowchart LR
SRC[Kafka source] --> OP[map / keyBy / 窗口]
OP --> WR[IcebergStreamWriter]
WR --> CM[IcebergFilesCommitter]
CM --> CAT[REST / JDBC Catalog]
CAT --> OBJ[对象存储 PUT]
背压可能发生在:
- Writer:对象存储 PUT 慢、Parquet 编码 CPU 饱和;
- Committer:catalog CAS 重试、REST 延迟;
- 上游:CDC 解析、keyBy 热点、RocksDB state 读放大(第 12、13 篇)。
背压的直接后果不是「跳过 commit」,而是 checkpoint 对齐变慢:
- Barrier 无法在所有 subtask 同时 ack;
checkpoint duration拉长,接近checkpoint.timeout时失败;- 失败则 notifyCheckpointComplete 不调用,Iceberg 少一次 commit,writer 缓冲膨胀,lag 上升。
因此 Web UI 上 sink backpressure 100% 与 commit 间隔变长常同时出现——根因可能是 writer 慢,也可能是 committer 等 catalog,需要看 checkpoint duration 与 Iceberg commit metrics 分拆,不能单看一个算子颜色。
3.2 背压 vs 故意拉长 interval
| 手段 | commit 频率 | 数据可见延迟 | 风险 |
|---|---|---|---|
| 增大 checkpoint interval | 下降 | 上升(设计如此) | 业务 SLA |
| 背压导致 ckpt 超时 | 非受控下降 | 不可预测抖动 | lag 堆积、state 变大 |
| unaligned checkpoint | 不直接改频率 | 缩短 barrier 等待 | 状态体积略增 |
运维目标:用 interval 做有意识的权衡,而不是让背压被动拉长 effective interval。第 18 篇给背压诊断清单;本篇强调与 commit 延迟的因果关系。
3.3 调参顺序(工程判断)
- 确认 lag 来自 消费慢 还是
生产过快(Kafka
records-lag-maxvs FlinknumRecordsOutPerSecond)。 - 若 writer/committer 背压:查对象存储 throttling、catalog p99、并行 writer 是否过多文件 PUT 并发。
- 若上游算子背压:查 keyBy 倾斜、state 大小、GC(第 13 篇)。
- 在吞吐达标后,再调 interval 平衡小文件——避免「背压已经让 commit 稀疏,再加大 interval」双重拉高可见延迟。
四、并行 writer 与 commit 冲突
4.1 为何并行度放大冲突
每次 checkpoint,\(P\) 个 writer subtask 各产出文件清单,一个 committer 把合并后的清单提交给 catalog(lakehouse/19)。提交本身是 基于当前 snapshot 的单点 CAS(lakehouse/11)。
冲突来自其它写者同时改表:
- 另一个 Flink 作业写同表;
- 异步
rewrite_data_filescompaction 作业; - 人工
INSERT或 Spark batch overwrite。
Flink 单作业内 committer 通常 串行(单并行度),不会自己和自己 CAS 撞。冲突率是 表级并发提交频率 的函数,不是 \(P\) 的直接线性函数——但 \(P\) 大时 每次 commit 触发的 metadata 变更更重(manifest 条目多),重试成本更高;且 commit 更频繁(若 interval 固定)时与 compaction 撞车概率上升。
4.2 与 lakehouse/11 乐观重试的配合
Iceberg 冲突时 committer 应 基于新 snapshot
重试(Flink Iceberg sink 内置重试逻辑,受
catalog 与版本影响)。Flink 作业侧可做的:
| 做法 | 作用 |
|---|---|
| 降低写入作业 commit 频率(调 interval) | 降低与 compaction 时间重叠 |
| compaction 独立作业 + 低峰 cron | 冲突概率下降 |
| 避免多作业写同表 | 消除跨作业 CAS |
| 升级 REST catalog 后端吞吐 | 缩短 commit 持锁窗口 |
不能靠「把 writer 并行度设为 1」消除冲突——那只减少每 commit 文件数,不消除 compaction 并发。正确是 治理提交时间表,不是盲目减 \(P\)。
4.3 upsert 加剧 metadata 压力
upsert 每次 commit 可能新增 equality delete file;读路径 merge-on-read 变重(lakehouse/19)。writer 并行度越高,同一 checkpoint 内 delete + data 文件对 越多,manifest 膨胀越快——planning 变慢(lakehouse/17 第二节有 200 小文件 planning 184 ms 量级实测)。引擎侧应配合 更长的 interval 与 定期 compaction,而不是无限堆 \(P\)。
五、写端预聚合:在 Flink 里减文件、减 commit 体积
预聚合指 在到达 Iceberg writer
之前减少记录数或把多条变更合并成一条。CDC
场景尤其有效:同一主键 1 秒内十次
UPDATE,若逐条入湖产生十条
delete+insert,预聚合后可能只剩
一条最终态。
5.1 典型模式
| 模式 | 实现 sketch | 状态 | 适用 |
|---|---|---|---|
| Keyed 最新态 | keyBy(pk) + ProcessFunction
保留 last after |
每键一条 state | CDC upsert 前 |
| 微窗口合并 | TumbleEventTimeWindows +
LastValue |
窗口 state | 可接受秒级延迟 |
| 序列化 changelog | MiniBatch /
buffer-timeout(Flink SQL) |
算子缓冲 | 高吞吐 SQL 管道 |
Flink SQL 示例(概念 DDL,非完整作业):
-- 假设 kafka_cdc 已解析 Debezium 为 upsert changelog
CREATE VIEW users_latest AS
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY uid ORDER BY updated_at DESC) AS rn
FROM kafka_cdc
) WHERE rn = 1;
INSERT INTO ice.db.users SELECT uid, name, updated_at FROM users_latest;注意:ROW_NUMBER 需要 状态
与 watermark(第 2、3
篇);updated_at
必须可信,否则乱序下会覆盖错行。
5.2 与 lakehouse/19 的分工
- lakehouse/19:表格式如何表达 upsert(equality delete)。
- 本文:是否在 Flink 内 先 reduce changelog,再交给 sink。reduce 做得越狠,equality delete 越少,但 状态越大、恢复越慢——这是引擎 trade-off,表格式层看不见。
5.3 边界
预聚合无法替代 delete 语义:若 ten 次
update 后接一条 delete,reduce 必须输出 delete 或
tombstone,不能静默丢 delete。Keyed state 里应存
RowKind 或 op 字段(衔接第 16 篇
envelope)。
六、Bucket 分区与
keyBy:控制文件布局
6.1 问题
默认 hash 分区写入 Iceberg 表时,Flink writer 按
Iceberg 分区 spec 滚动文件。若表
PARTITIONED BY (days(ts)) 而 CDC 事件
ts 分散,单次 checkpoint 可能
每个活跃日分区各出一个文件——一次 commit
产生「分区数个文件」,加剧小文件。
6.2 Bucket 表(Iceberg hidden partition / bucket transform)
Iceberg 支持 bucket(N, col) transform。Flink
侧应:
CREATE TABLE ice.db.users (
uid BIGINT,
name STRING,
updated_at TIMESTAMP(6),
PRIMARY KEY (uid) NOT ENFORCED
) PARTITIONED BY (bucket(16, uid))
WITH ('write.upsert.enabled'='true');引擎侧 keyBy(uid) 使同一
uid 的变更进入同一 subtask,与
bucket(16, uid) 对齐时,每个 subtask 主要写固定
bucket,单次 commit 文件数 ≈ min(P, bucket
数) 而非按业务日期扩散。
6.3 与 Debezium 分区对齐
Debezium 把 主键 作为 Kafka record key →
同一 key 进同一 Kafka 分区 → Flink KafkaSource
若 不改变并行度与 keyBy 链,可维持 per-key
顺序(第 16 篇第十节)。入湖前 keyBy 与 Iceberg
bucket 数 不必相等,但 bucket 数过大(如
256)且 \(P\) 小会导致空
bucket 文件碎片化。
6.4 分工表
| 层级 | 负责 |
|---|---|
| Kafka key | per-key 顺序到 consumer |
Flink keyBy |
subtask 本地序 + state 分片 |
| Iceberg partition spec | 对象存储路径与 pruning |
| lakehouse/17 compaction | 合并同分区碎文件 |
七、异步 compaction 与写入作业的调度分工
lakehouse/17 详述
rewrite_data_files、bin-pack、expire snapshots
等 表治理操作。流式场景下 compaction 与写入
抢同一张表的 CAS——已在 lakehouse/19
第五节点名。
7.1 引擎侧能做的
- 时间错峰:compaction 作业 cron 在写入低峰(如凌晨),与 Flink 高峰 checkpoint 错开。
- 独立 Flink/Spark 作业:compaction 不嵌入 CDC 主作业,避免单作业内 writer+compaction 双 role。
- 监控 commit 失败率:Flink REST / Iceberg metrics 中 commit retry 次数上升 → 优先调 compaction 窗口,而非盲目加 writer 并行度。
- 配合
rewrite-data-files目标大小:与write.target-file-size-bytes同量级(lakehouse/17),减少「compaction 完又立刻被碎 commit 拆散」。
7.2 引擎侧不能替代的
- expire snapshots、remove orphan files:纯 catalog/运维(lakehouse/20 运维清单),Flink checkpoint 不负责。
- equality delete 合并进 data file:必须 compaction(MoR → copy-on-write 段),引擎无法靠调 interval 单独完成。
gantt
title 单日提交与 compaction 错峰(示意)
dateFormat HH:mm
section 写入作业
checkpoint commit :active, w1, 00:00, 24h
section compaction
rewrite_data_files :crit, c1, 02:00, 2h
7.3 CDC + upsert 的特殊点
upsert 表 compaction 还要 合并 delete
file(lakehouse/10)。CDC 删除比例高时,delete file
增长快于 data file——仅调 checkpoint interval 不够,必须
更频繁的 rewrite(表治理),与第 16 篇
op=d 流量相关。
八、Flink 作业配置清单(入湖向)
下列配置构成「引擎侧最小集合」,表格式 DDL 见 lakehouse/19。
# flink-conf.yaml 片段
execution.checkpointing.interval: 120s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 10min
execution.checkpointing.min-pause: 30s
execution.checkpointing.unaligned: true
state.backend.type: rocksdb
state.backend.incremental: trueSQL / Table API 侧:
-- 与 interval 一致的业务可见延迟预期
SET 'execution.checkpointing.interval' = '2 min';
SET 'execution.checkpointing.timeout' = '8 min';
SET 'table.exec.sink.upsert-materialize' = 'NONE'; -- 若已在视图层 reduceIceberg table 属性(catalog 侧,非 Flink 独有):
ALTER TABLE ice.db.users SET (
'write.target-file-size-bytes' = '536870912',
'write.upsert.enabled' = 'true',
'commit.retry.num-retries' = '4'
);commit.retry.num-retries 在冲突时与
lakehouse/11 重试哲学一致;Flink committer 仍依赖
成功 checkpoint 才触发 commit——retry 救不了
checkpoint 本身失败。
九、端到端延迟预算(符号化)
端到端可见延迟上界可粗分为:
\[ L_{\text{e2e}} \lesssim L_{\text{cdc}} + L_{\text{kafka}} + L_{\text{flink}} + \Delta_{\text{ckpt}} + L_{\text{commit}} \]
| 项 | 含义 | 谁调 |
|---|---|---|
| \(L_{\text{cdc}}\) | Debezium snapshot/stream 滞后 | Connect、binlog |
| \(L_{\text{kafka}}\) | 生产消费 lag | 分区数、consumer |
| \(L_{\text{flink}}\) | 算子处理 + 背压 | 并行度、state |
| \(\Delta_{\text{ckpt}}\) | 本文主旋钮 | interval |
| \(L_{\text{commit}}\) | catalog + 对象存储 | 表治理、REST |
lakehouse/19 强调 \(\Delta_{\text{ckpt}}\) 与 commit 绑定;本文强调 背压增大 \(L_{\text{flink}}\) 会间接增大 effective \(\Delta_{\text{ckpt}}\)(checkpoint 对齐等待),二者叠加时不要重复归因。
十、CDC 入湖作业检查表
上线前按顺序核对:
- 主键全局唯一(第 16 篇):分库分表是否合成键。
- Iceberg upsert
已开,
PRIMARY KEY与 equality field 一致(lakehouse/19)。 - checkpoint interval 与 SLA 对齐,估算 \(P \times 86400/\Delta_{\text{ckpt}}\) 文件量。
keyBy(pk)与 bucket 分区 spec 是否一致。- 是否需要 预聚合 降 changelog 体积。
- compaction 作业 是否独立、是否错峰(lakehouse/17)。
- 监控:
lastCheckpointDuration、numRestarts、IcebergCommit失败、Kafka lag。 - savepoint 升级 规则(第 11 篇):改并行度后 state 与 writer 对齐。
十一、Hudi / Delta 引擎侧差异(一句边界)
- Hudi:Flink sink 同样挂 checkpoint
2PC;upsert 依赖 record index(lakehouse/13)。引擎侧
bucket/index 类型 与
keyBy对齐更重要,小文件公式仍适用。 - Delta:Structured Streaming
merge或 Delta Flink connector;checkpoint 边界由 Delta 事务 log 衔接。冲突模型不同,但 interval ↔︎ micro-batch 频率 同形。
三格式协议细节留在 lakehouse 系列;本篇旋钮思想可迁移,具体类名查各引擎文档。
十二、边界
本文不重复:
- Iceberg
SnapshotProducer、manifest 写入步骤(lakehouse/8–11); - Debezium envelope 与 snapshot 模式(本系列第 16 篇);
- GenericTwoPhaseCommitSink 状态机全文(第 15 篇);
- 背压 credit 算法与 Web UI 读法(第 18 篇)。
本文覆盖:
- checkpoint interval 与 commit 频率、小文件数量的关系;
- 背压 → checkpoint 延迟 → commit 延迟的传导;
- 并行 writer 与 lakehouse/11 并发冲突的工程配合;
- 预聚合、bucket、
keyBy与异步 compaction 的分工。
参考资料
- Apache Flink Documentation, Checkpointing、Fault Tolerance Guarantees、Back Pressure Monitoring。A 级。
- Apache Iceberg Documentation, Flink
Writes(
IcebergStreamWriter、IcebergFilesCommitter、write.upsert.enabled、write.target-file-size-bytes)。A 级。 - Apache Iceberg Documentation, Kafka
Connect(
iceberg.control.commit.interval-ms,与 Flink 对照)。A 级。 - 本系列:第 10、15、16、18 篇。
- 跨系列:lakehouse/19 流式 CDC 入湖、lakehouse/11 提交并发、lakehouse/17 小文件与 Compaction。
返回 系列目录 · 上一篇 Debezium 与 Change Data Capture · 下一篇 背压、故障模式与引擎对照
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】Checkpoint 机制:Barrier 对齐与一致性快照
从 Chandy-Lamport 分布式快照到 Flink aligned/unaligned checkpoint:CheckpointCoordinator 触发—ack—完成生命周期,Kafka source 如何把 partition offset 写入 checkpoint,以及 interval、timeout、min-pause、concurrent checkpoints 的调优边界。
【流式数据处理】状态放大、Compaction 与调优
在 RocksDB state backend 读写路径之上,拆解窗口 state 膨胀、LSM 写放大与 checkpoint 争抢磁盘、Flink managed memory 与 RocksDBOptionsFactory 调参边界,以及 hot key 导致单 subtask 过热时的诊断与「改 state 设计 vs 拧参数」取舍。
【流式数据处理】背压、故障模式与引擎对照
收束流式数据处理系列:Flink credit-based 背压如何沿算子链传播、Web UI 指标怎么读;数据倾斜、checkpoint 超时连锁、Kafka rebalance 风暴、RocksDB OOM、savepoint 不兼容五类生产故障的诊断与止血;Flink / Kafka Streams / Spark Structured Streaming / RisingWave 在状态模型、交付语义、运维与入湖成熟度上的对照表与选型决策树,不做排名。
【流式数据处理】Kafka · Flink · 状态 · Exactly-Once
承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。