批量写湖一天提交一次,一次几百个大文件,表格式只需把这批文件原子地挂进一个新
snapshot。流式写湖完全是另一回事:一个 Flink 作业每 30
秒做一次 checkpoint,就意味着每 30 秒要往表里提交一次;CDC
把上游 MySQL 的 UPDATE、DELETE
实时同步过来,要求湖表支持按主键更新和删除,而不只是
append。
这带来三个具体问题:
- 提交频率:流式作业按 checkpoint 间隔提交,几十秒一次,一天上千次提交,每次提交都生成小文件和新元数据。小文件不治理,查询端的 planning 和扫描都会被拖垮(见第 17 篇)。
- exactly-once:作业失败重启、checkpoint 回滚时,怎么保证「同一批数据不会在湖里出现两次,也不会丢」。这要求引擎的 checkpoint 语义和表格式的原子提交协议严丝合缝地对齐。
- upsert / delete:CDC 流里有
INSERT、UPDATE、DELETE。湖表默认只会 append,要实现按主键更新,必须落到第 10 篇讲的行级删除机制上。
本文只讲入湖侧:数据如何稳定、不重不漏地落进表格式,以及落进去之后的小文件治理。流处理引擎本身的事件时间、窗口、状态后端、容错语义不展开,那是另一条主线的内容。
环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2)、i9-12900K / 32 GB,未安装 JVM、Flink、Kafka、Spark。本文涉及 Flink/Kafka Connect/Spark 的配置与命令均为按官方文档可复现的步骤,不粘贴未执行的输出,也不伪造任何吞吐/延迟数字。表格式侧的提交语义来自各项目规范与文档。
一、流式入湖的总体形状
把流式入湖拆成三段:source → 引擎算子 → sink 提交。表格式只参与最后一段。
flowchart LR
SRC[上游: Kafka / CDC] --> ENG["流处理引擎<br/>(Flink / Spark / Connect)"]
ENG --> W[Writer 算子<br/>写数据文件到对象存储]
W --> C[Committer<br/>按 checkpoint 提交]
C --> T[("表格式<br/>新 snapshot / commit")]
T --> Q[查询引擎读到]
关键在于 writer 和 committer 是分离的:writer 持续把记录写成对象存储上的数据文件(Parquet),但这些文件在 committer 提交之前对读端不可见。提交这一步才是表格式的原子操作——把已写好的文件挂进一个新 snapshot(Iceberg)或写一条 commit(Delta)。提交频率因此等于「writer 攒多久的数据算一批」,而这通常由引擎的 checkpoint 间隔决定。
这个分离正是 exactly-once 的着力点:数据文件可以提前写、可以写重复(失败重试),只要提交是幂等且原子的,最终可见的就只有被成功提交的那一份。
二、Sink 怎么提交
Flink + Iceberg:基于 checkpoint 的两阶段提交
Iceberg 的 Flink sink(FlinkSink /
IcebergSink)把写入拆成两个算子:IcebergStreamWriter
负责写数据文件,IcebergFilesCommitter
负责提交。提交挂在 Flink 的 checkpoint
机制上,是一个标准的两阶段提交(Flink 文档称为 “Two-Phase
Commit”):
- pre-commit(snapshotState):checkpoint
触发时,writer 把当前攒的数据文件
flush、关闭,把这批文件的清单(
DataFile列表)作为算子状态存进 Flink 的 checkpoint。此时文件已在对象存储上,但还没进 Iceberg 表。 - commit(notifyCheckpointComplete):当 Flink 确认这次 checkpoint 全局完成,回调 committer,由它把这批文件作为一次 Iceberg append/overwrite 提交进表,生成新 snapshot。
为什么必须等 checkpoint 完成再提交?因为 Flink 保证 checkpoint 完成意味着上游 offset 也一并持久化了。万一在 commit 之前作业崩了,重启会从上一个 checkpoint 恢复,重放这批数据、重写文件、重新提交——旧的、未提交的孤儿数据文件不会进表,不影响正确性(但需要后续清理,见第六节)。
一个典型的 Flink SQL 建表与写入(DDL/INSERT 形式,按 Iceberg Flink 文档):
-- 注册 Iceberg catalog(REST catalog 为例)
CREATE CATALOG ice WITH (
'type'='iceberg',
'catalog-type'='rest',
'uri'='http://catalog:8181',
'warehouse'='s3://bucket/wh'
);
CREATE TABLE ice.db.events (
id BIGINT,
ts TIMESTAMP(6),
payload STRING,
PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (days(ts));
-- 流式写入;checkpoint 间隔即提交间隔
INSERT INTO ice.db.events SELECT id, ts, payload FROM kafka_src;提交间隔由 Flink 的
execution.checkpointing.interval 决定。把它从
30 秒拉长到 5
分钟,提交次数和小文件数立刻降一个数量级——这是流式入湖第一个要调的旋钮,代价是数据可见延迟变大。
upsert:用 equality delete 表达主键更新
append 只能加行。要让 CDC 的
UPDATE/DELETE 生效,Iceberg Flink
sink 提供 upsert 模式:声明相等字段(equality
field,通常是主键),sink 在写新版本行的同时,写一条
equality
delete,逻辑含义是「删掉这些键的旧值」。读端把 data
file 和 equality delete
合并,得到主键的最新版本(行级删除机制见第 10
篇)。
-- 开启 upsert:按主键去重更新
CREATE TABLE ice.db.users (
uid BIGINT,
name STRING,
updated_at TIMESTAMP(6),
PRIMARY KEY (uid) NOT ENFORCED
) WITH ('write.upsert.enabled'='true');upsert 模式的代价是读放大:equality delete 要对每个相关 data file 生效,读时要做 anti-join。所以 upsert 表更依赖定期 compaction 把 delete 合进 data file(merge-on-read → 重写)。
Kafka Connect:用控制主题协调提交
不想拉起 Flink 集群时,可以用 Iceberg 自带的 Kafka Connect Sink Connector,直接把 Kafka topic 落进 Iceberg。它的难点在于:Kafka Connect 有多个并行 task,每个 task 各写各的文件,但 Iceberg 一次提交必须是全局一致的一批。
Iceberg 的 Connect sink 用一个控制主题(control
topic) 做协调:一个 coordinator 周期性发起
commit,各 worker task
把自己这一轮写好的数据文件清单发到控制主题,coordinator
收齐后做一次 Iceberg 提交。提交周期由
iceberg.control.commit.interval-ms
控制,本质上和 Flink 的 checkpoint 间隔扮演同样角色。
# Iceberg Kafka Connect Sink(按官方 connector 配置)
connector.class=org.apache.iceberg.connect.IcebergSinkConnector
topics=cdc.users
iceberg.tables=db.users
iceberg.catalog.type=rest
iceberg.catalog.uri=http://catalog:8181
iceberg.control.commit.interval-ms=300000
Spark Structured Streaming
Spark 这边,Iceberg/Delta 都把流式写做成标准的
DataStreamWriter。每个 micro-batch
对应一次提交,trigger
控制批间隔;checkpointLocation 存 offset 与提交进度,承担和
Flink checkpoint 同样的「失败可重放」职责。
df.writeStream
.format("iceberg") // 或 "delta"
.outputMode("append")
.option("checkpointLocation", "s3://bucket/ckpt/users")
.trigger(Trigger.ProcessingTime("5 minutes"))
.toTable("db.users")三种 sink 的共性很清楚:都把一段时间的写入攒成一批,按引擎的检查点边界,做一次表格式的原子提交。区别只在批边界由谁定义、多个并行写者如何对齐。
三、exactly-once 如何对齐
exactly-once 不是单靠表格式或单靠引擎能给的,它是两边协议拼出来的。拆成三条不变量:
- 数据文件可重写、不可半可见:writer 写的文件在提交前对读端不可见。失败重试重写文件,只是多了孤儿文件,不会让读端看到重复数据。
- 提交幂等:同一个 checkpoint/epoch 的提交,即使因故障重复触发,也只能在表里生效一次。Flink 的 committer 会记录已提交的 checkpoint id;Iceberg 提交本身是基于当前 snapshot 的乐观 CAS(见第 11 篇),重复提交会被识别。
- offset 与提交同生死:上游消费位点(Kafka offset)和表的提交绑定在同一个 checkpoint 里。恢复时 offset 回到上一次成功提交的位置,重放的数据正好覆盖未提交的那一段。
sequenceDiagram
participant F as Flink Checkpoint
participant W as Writer
participant C as Committer
participant T as Iceberg 表
F->>W: ckpt-N 触发, flush 文件
W-->>F: 文件清单存入 ckpt 状态
F->>F: ckpt-N 全局完成(含 offset)
F->>C: notifyCheckpointComplete(N)
C->>T: 提交这批文件 -> snapshot
Note over C,T: 崩溃发生在此之前 -> 重启重放<br/>未提交文件成孤儿, 不影响正确性
需要强调边界:表格式只保证「提交是原子且幂等的」这一半,另一半(offset
与 checkpoint
一致、状态可恢复)由引擎负责。任何一边没做对,exactly-once
都不成立。比如用 foreachBatch
自己写提交逻辑而没有把 batchId 做幂等,就会在 Spark 重试
micro-batch 时产生重复。
四、CDC 入湖
CDC(Change Data Capture)把上游数据库的行级变更流式同步进湖。典型链路:
flowchart LR
DB[(MySQL/PG)] -->|binlog/WAL| DBZ[Debezium]
DBZ -->|change events| K[Kafka]
K --> SINK[Flink / Connect / Hudi Streamer]
SINK --> LAKE[("Iceberg / Hudi / Delta")]
Debezium 把每条变更包成带 op
字段的事件(c 创建、u
更新、d 删除,r 快照读),含
before/after
两个镜像。入湖侧要把这些变更按主键 reduce
成当前状态,三种表格式给了三条路:
| 表格式 | upsert 机制 | 关键依赖 |
|---|---|---|
| Iceberg | equality delete(删旧键)+ 新行 | merge-on-read,定期 compaction |
| Hudi | record-level index 定位旧记录所在 file group,写进 log file | record index + MoR base/log |
| Delta | MERGE INTO 或 Change Data Feed |
deletion vector / 重写 |
Hudi 在这条链路上有专门工具 Hudi Streamer(旧称
DeltaStreamer),内建 Debezium source 与去重逻辑;它强在
upsert,是因为 record-level index
能直接把一个主键映射到它所在的 file
group,避免全表扫描找旧值(见第 13 篇)。Iceberg 走
equality delete 的代价前面说过:读放大,靠 compaction
偿还。Delta 侧常用 MERGE INTO 把一个
micro-batch 的变更合并进目标表,或开启 Change Data Feed
让下游再订阅变更。
一个 Flink CDC 直接入 Iceberg 的形态(Flink CDC connector + Iceberg upsert 表):
-- 上游用 Flink CDC 直连 MySQL binlog
CREATE TABLE mysql_users (
uid BIGINT, name STRING, updated_at TIMESTAMP(3),
PRIMARY KEY (uid) NOT ENFORCED
) WITH ('connector'='mysql-cdc', 'hostname'='...', 'database-name'='app', 'table-name'='users');
-- 落进 upsert 模式的 Iceberg 表
INSERT INTO ice.db.users SELECT * FROM mysql_users;主键的语义必须从上游一路保持到湖表的 equality field / record key,否则去重就错位。这是 CDC 入湖最容易踩的坑:上游复合主键、分库分表后主键不全局唯一,到了湖表 upsert 就会把不同库的同 id 行互相覆盖。
五、小文件与 compaction 的拉扯
流式入湖和小文件治理是一对天然矛盾:
- 提交越频繁,数据可见延迟越低,但小文件越多、equality delete 越多。
- compaction 在重写文件,而流式写入在持续产生新文件和新提交,两者会抢同一张表的提交权,触发乐观并发冲突(第 11 篇)。
实践上的几个着力点:
- 拉长提交间隔:把 checkpoint/commit interval 调到业务能接受的可见延迟上限,是性价比最高的减小文件手段。
- 写端预聚合:Flink 端按分区/桶做 keyby,让每个并行 writer 只写少数分区,减少每次提交里的文件碎片。
- 异步 compaction 错峰:把
rewrite_data_files放到独立作业,避开写入高峰,降低提交冲突概率;冲突时靠乐观重试兜底。 - 及时清理 delete 与孤儿文件:upsert 表的
equality delete 累积会让读越来越慢,compaction 时把 delete
合进 data file;失败重写留下的孤儿文件用
remove_orphan_files清(见第 20 篇运维清单)。
换句话说,流式入湖不是「写进去就完了」,它必然附带一套后台维护作业。把维护作业算进容量规划,是流式湖仓能长期稳定的前提。
六、边界
本文只覆盖入湖侧。明确不展开、留给后续可能的独立「流处理」系列的内容:
- 流处理引擎的事件时间、watermark、窗口、乱序处理。
- 状态后端(如 RocksDB state backend)的实现与调优。
- 引擎自身的容错与 checkpoint 算法(Chandy-Lamport 变体)内部机制。
本文用到这些概念时,只把它们当作「提交边界的来源」,不深入其内部。从表格式的视角,流式写入和批量写入没有本质区别——都是「写文件 + 原子提交」,区别只是提交的频率、批的大小,以及由此衍生的小文件与并发治理压力。
参考资料
- Apache Iceberg Documentation, Flink
Writes(
FlinkSink/IcebergSink、IcebergFilesCommitter、基于 checkpoint 的两阶段提交、write.upsert.enabled、equality field)。A 级(官方文档)。 - Apache Iceberg Documentation, Kafka
Connect(Iceberg Sink Connector、control topic
协调提交、
iceberg.control.commit.interval-ms)。A 级。 - Apache Flink Documentation, Checkpointing 与 Fault Tolerance Guarantees(exactly-once、two-phase commit sink、notifyCheckpointComplete 语义)。A 级。
- Debezium Documentation, Connector for MySQL /
Event
structure(
op、before/after、snapshot 与 streaming 阶段)。A 级。 - Apache Hudi Documentation, Hudi Streamer(原 DeltaStreamer)、Record-level Index、Writing Data(upsert)。A 级。
- Delta Lake Documentation, Table streaming reads and writes、Change Data Feed、MERGE INTO。A 级。
- 本系列:第 10 行级删除与 Merge-on-Read、11 提交协议与并发控制、13 Apache Hudi、17 小文件与 Compaction、20 选型、迁移与运维 篇。
返回 系列目录 · 上一篇 查询引擎如何读湖 · 下一篇 选型、迁移与运维
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【流式数据处理】Kafka · Flink · 状态 · Exactly-Once
承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。
【流式数据处理】Debezium 与 Change Data Capture
从 Debezium 变更事件信封(op、before/after、source)入手,拆解 snapshot 与 streaming 两阶段、Kafka Connect 的 connector task 与 offset/schema history 主题,并说明引擎侧如何保证主键顺序与幂等,衔接 lakehouse 第 19 章 upsert 入湖。
【数据湖与开放表格式】Lakehouse 全景:从 Hive 表到开放表格式
Hive 目录式分区表把『表』等同于『一组目录加 metastore 里的分区行』,于是没有原子提交、planning 要 LIST 目录、schema 与分区演进常要重写。本文用这三个硬伤切入,讲清 lakehouse 把表拆成『不可变数据文件 + 可变元数据指针 + catalog』三层后各自解决了什么,并给出全系列的分层地图。
【数据湖与开放表格式】Parquet 文件格式深拆
拆 Parquet 的物理结构:file → row group → column chunk → page,footer 里的 FileMetaData(Thrift)与 PAR1 magic。讲清 PLAIN/RLE-bitpacking/字典/DELTA_BINARY_PACKED/BYTE_STREAM_SPLIT 各自压谁,Dremel 的 repetition/definition level 如何表达嵌套,column index/offset index 与 split-block bloom filter 怎样让谓词在读盘前裁掉 page。基于本机 pyarrow 24.0.0 真实 dump footer 与编码。