土法炼钢兴趣小组的算法知识备份

【数据湖与开放表格式】Apache Hudi

文章导航

分类入口
databasestorage
标签入口
#hudi#copy-on-write#merge-on-read#record-index

目录

第 12 章 看了 Delta 的有序事务日志,第 8 章 看了 Iceberg 的不可变快照树。Apache Hudi 是第三种开放表格式,它和前两者的根本区别在出发点:Iceberg/Delta 最初是为「大批量写入 + 快照读」设计,Hudi 从 Uber 的场景出发,第一天就把「高频 upsert 和增量消费」当成一等公民。这个出发点决定了它的一切设计:timeline、file group、强索引体系,都是为了「用主键快速定位并更新少量记录」服务。

这一章把 Hudi 的内核拆开:表怎么存(CoW/MoR、file group/file slice、base/log)、操作怎么记(timeline 的 instant)、为什么 upsert 强(索引)、怎么读(三种查询类型)。不写 Hudi 的 SQL/Flink 全套用法,不写云厂商集成。

版本锚定:以 Apache Hudi 官方文档 1.x(页面标注 Version 1.2.0)与 apache/hudi 源码为准。Hudi 1.0 对 timeline 存储做了较大改动(LSM timeline),涉及版本边界处单独标注。本机未安装 Spark/Hudi,实验一节声明环境并给可复现步骤,不贴伪造输出。


一、问题:为什么需要一个「upsert 优先」的表格式

先看 Hudi 要解决的具体痛点。假设有一张上游 MySQL 的 orders 表,通过 CDC(Debezium → Kafka)持续往湖里同步。变化的特征是:

用 Iceberg/Delta 的 copy-on-write 处理这种负载,问题是写放大:改一行就要重写它所在的整个 Parquet 文件。一批变更如果散落在 100 个文件里,就要重写 100 个文件,哪怕每个文件只改了一行。

Hudi 的回答由三块拼成:

  1. 用主键(record key)+ 索引,把「这条记录在哪个文件」做成 O(变更条数) 的查找,而不是 O(全表) 的 join。
  2. MoR 表把更新先写成轻量 log file(追加),异步 compaction 再合并进 base file,摊薄写代价。
  3. timeline 记录每个操作的精确时刻,让「自某时刻以来的增量」可以高效消费。
flowchart LR
  CDC[CDC 变更流] --> IDX[索引: key→file group]
  IDX -->|定位| FG[file group]
  FG --> COW["CoW: 重写 base file"]
  FG --> MOR["MoR: 追加 log file"]
  MOR -.异步.-> CMP[compaction 合并]

这一章的核心论点:Hudi 的复杂度(timeline、索引、file slice)几乎全部是为「高效 upsert + 增量读」付出的代价。如果负载是「一次性大批量写、只读快照」,这套机制的收益就小,Iceberg/Delta 反而更简单(第 14 章会把这个取舍讲透)。


二、.hoodie 目录与 timeline

Hudi 表的元数据放在表根目录下的 .hoodie/。和 Delta 的 _delta_log、Iceberg 的 metadata/ 对应,但模型是 timeline:表上发生的每个动作,都作为一个 instant(时间点)记录在时间线上。

2.1 instant 的三个属性

官方文档(Timeline)定义,每个 action 关联:

状态转换(文档 State Transitions)由独立的状态文件标记,且依赖底层存储的原子操作(如 S3 的 PUT)保证原子与一致:

[ ] -> REQUESTED   计划已生成,尚未开始
REQUESTED -> INFLIGHT   正在执行
INFLIGHT -> INFLIGHT/REQUESTED   执行中可安全失败重试
INFLIGHT -> COMPLETED   成功完成

一个 instant 的「当前状态」是它记录过的最高状态(REQUESTED < INFLIGHT < COMPLETED)。这套三态设计让 Hudi 能区分「计划中/执行中/已完成」,从而支持回滚未完成写入、非阻塞的表服务等。

2.2 动作类型

官方文档(Action Types)列出的核心动作:

动作 含义
COMMIT 一批记录原子写入 base 文件(CoW,或 MoR 的 compaction 产物)
DELTA_COMMIT 一批记录原子写入 MoR 表,部分/全部写到 log(delta)文件
REPLACE_COMMIT 原子替换一组 file group,用于 insert_overwritedelete_partition,以及 clustering
COMPACTION 表服务:把 log 文件合并进 base 文件
LOGCOMPACTION 表服务:把同一 file slice 的多个小 log 文件合并成大 log 文件
CLUSTERING 表服务:按优化排序/布局重写 file group(完成态记为 REPLACE_COMMIT
CLEANS 表服务:删除不再需要的旧 file slice
ROLLBACK 回滚一次失败写入,清除其产生的部分/未提交文件
SAVEPOINT / RESTORE 标记/恢复到某个时间点,用于灾备与时间旅行
INDEXING 表服务:在线构建某类索引,与并发写保持一致

注意一个反直觉点(文档明确):请求/执行态的动作类型,到完成态可能变名字。例如 CLUSTERING 在 requested/inflight 态叫 CLUSTERING,完成态记为 REPLACE_COMMITCOMPACTION 完成时作为 COMMIT 动作出现(产出新 base 文件)。所以「时间线上看到的完成动作」和「调度时的动作」不是一一对应。

2.3 active timeline 与 LSM timeline history(Hudi 1.x)

时间线会无限增长,Hudi 把它分成两层(文档 Timeline Components):

.hoodie/timeline/history/
  _version_                       当前 manifest 版本
  manifest_1
  manifest_2 ...
  <min>_<max>_<level>.parquet     某区间动作明细

归档由 hoodie.keep.min.commits(默认 20)/hoodie.keep.max.commits(默认 30)控制 active timeline 保留多少 instant;LSM 层 compaction 频率由 hoodie.timeline.compaction.batch.size(默认 10)控制。这是 Hudi 1.0 相对早期版本(早期 instant 文件直接平铺在 .hoodie/ 下、文件名形如 <时间戳>.commit)的重要变化。

实例化的 instant 时间用 TrueTime 语义(文档 TrueTime Generation):全局单调递增,由共享时间生成器或「各自生成 + 在分布式锁内等待最大时钟漂移」保证。Hudi 面向 >1 秒的事务,可用 >100ms 的不确定窗口换取极高保真度的时间生成。这点对「按完成时间排序得到可串行化的写入顺序」很关键。


三、存储模型:file group 与 file slice

这是 Hudi 区别于 Iceberg/Delta 最核心的结构。Iceberg/Delta 里「文件」基本是扁平的(一堆 data file + delete file/DV)。Hudi 把一个分区里的文件组织成file group,每个 file group 内部又分file slice

3.1 定义

flowchart TD
  P[分区 date=2026-06-30] --> FG1["file group A (fileId=a)"]
  P --> FG2["file group B (fileId=b)"]
  FG1 --> S1["file slice @t1: base_t1.parquet"]
  FG1 --> S2["file slice @t3: base_t1.parquet + .log_t2 + .log_t3"]
  S2 -.compaction.-> S3["file slice @t4: base_t4.parquet"]

3.2 为什么这么设计

把「同一批记录」固定在同一个 file group,配合索引(第六节)把 key 映射到 fileId,就能做到:

这就是 Hudi「强在 upsert」的结构性根源:file group + 索引把更新限制在局部


四、CoW 与 MoR

Hudi 的两种表类型(文档 Table & Query Types),决定写操作怎么落到 file group 上。

4.1 Copy On Write

更新/删除直接产生 file group 的新 base 文件,没有 log 文件。每个查询只读 base 文件,读性能高、无需动态合并。代价是写慢:哪怕只改少量记录,也要重写整个 base 文件。

适用:批 ETL、读多写少的数仓表、缓慢变化的维表/参考表。

4.2 Merge On Read

更新/删除先写到log 文件(行式如 Avro,或列式),查询时把 log 动态合并进 base 文件;周期 compaction 再把 log 合并产生新 base 文件版本。写延迟低、近实时可见,代价是查询可能要合并。

适用:CDC 管道、流式入湖、高频 update/delete、合规删除(GDPR)攒一天的删除再一次 compaction 摊销重写成本。

4.3 取舍表

直接引用官方文档的 trade-off 表(来源:Hudi Table & Query Types,read/write amplification 定义见文档):

取舍 Copy-On-Write Merge-On-Read
写延迟
查询延迟
更新代价 高(重写整个 base 文件) 低(追加到 delta log)
base 文件大小 需较小以避免高更新 I/O 可较大(更新代价低且摊销)
读放大 0 被查询的 file group:O(records_changed)
写放大 给定更新模式下最高,O(file_groups_written) 被写的 file group:O(records_changed)

文档对放大的定义:read amplification = 为读 1 字节真实数据读了多少字节;write amplification = 每 1 字节真实变更写入存储多少字节。

flowchart LR
  subgraph CoW
    U1[更新] --> NB[新 base 文件]
  end
  subgraph MoR
    U2[更新] --> LG[追加 log 文件]
    LG -.周期 compaction.-> NB2[新 base 文件]
  end

注意:core 能力(原子写、索引、增量查询、自动文件大小、可扩展元数据)在两种表类型上都有,与表类型无关。


五、表服务:compaction、clustering、cleaning

MoR 表如果只写 log 不合并,log 会越堆越多、查询越来越慢。Hudi 用一组表服务维持健康,它们都在 timeline 上有对应 instant,可同步或异步执行。

5.1 compaction

把一个 file slice 的 log 文件合并进 base 文件,产出新 file slice 的新 base 文件(完成态作为 COMMIT 动作)。compaction 的调度策略决定 read-optimized 查询能看到多新的数据(第七节)。还有 LOGCOMPACTION:只把多个小 log 合并成大 log,不碰 base,降低小文件数。

5.2 clustering

按优化的排序/布局重写 file group(CLUSTERING 调度、REPLACE_COMMIT 完成),类似 Delta 的 Z-order/liquid clustering、Iceberg 的 sort rewrite。目的是改善数据局部性、提升文件裁剪。clustering 会重映射 record key 到新 file group,这是少数会改变「key→file group」映射的操作之一。

5.3 cleaning

删除不再需要的旧 file slice(CLEANS),回收被旧版本占用的空间。保留多少历史版本由 cleaner 策略控制(按 commit 数或时间),这直接决定时间旅行/增量查询能回溯多远——和 Delta 的 VACUUM、Iceberg 的 expire snapshots 是同一类运维操作(第 17 章)。

5.4 rollback 与 savepoint

ROLLBACK 清除失败写入留下的部分文件——因为 timeline 的三态明确区分了「未完成」,回滚能精确知道要删哪些。SAVEPOINT/RESTORE 把某些 file slice 标记为「保存」,cleaner 不删,用于灾备和恢复到时间点。

flowchart TD
  W[DELTA_COMMIT 写 log] --> ACC[log 累积]
  ACC --> CMP[COMPACTION: log 合并进 base]
  ACC --> LCMP[LOGCOMPACTION: 小 log 合并]
  CMP --> CLN[CLEANS: 删旧 file slice]
  W -.失败.-> RB[ROLLBACK: 清未完成文件]

六、索引体系:Hudi 为何强在 upsert

索引是 Hudi 的「核武器」(文档 Indexes)。写入时(upsert/delete),Hudi 必须先知道「每条 incoming 记录的 key 当前在哪个 file group」,才能决定是更新已有还是插入新建。索引就是干这个的。

6.1 索引的作用

没有索引的设计(文档点名 Hive/Iceberg)做更新要全 base 文件 × 全 incoming 更新,读放大高 10–100 倍;纯 LSM 类写优化结构虽不需要索引,但对云存储上的扫描型分析查询表现差。Hudi 用「额外的索引存储成本」换「写和读都好」。

6.2 索引类型

writer 用 hoodie.index.type 选择(Spark 默认 SIMPLE,Flink 默认 FLINK_STATE)。主要类型:

索引 机制 适合
BLOOM / GLOBAL_BLOOM 读 base 文件 footer 里的 bloom filter + 按 key 范围裁剪 key 有序(如时间前缀)的 fact 表、事件去重
SIMPLE / GLOBAL_SIMPLE 直接 join incoming key 与存储上的 key,不做前置裁剪 随机更新散布全表的维表
RECORD_LEVEL_INDEX / GLOBAL_RECORD_LEVEL_INDEX metadata 表里的 record_index 分区,hash 分片存 key→location 大表、index lookup 主导写延迟的场景
BUCKET bucket hash 直接定位 file group(SIMPLE / CONSISTENT_HASHING 两种引擎) 大规模、想避免 lookup 开销
HBASE 外部 HBase 维护映射 可接受外部系统运维成本
FLINK_STATE / INMEMORY Flink 状态后端 / 内存 hashmap Flink 流式写

「global vs 非 global」是另一条关键轴(文档 Global and Non-Global Indexes):

6.3 record index:现代默认

record index(RLI,0.14.0 引入,文档 Record Index)把 record key → 位置 的映射存在 metadata 表的 record_index/ 分区,用 hash 分片支持超大规模,提供 point lookup。1.x 进一步区分:

RLI 相比 GLOBAL_SIMPLE/GLOBAL_BLOOM/HBASE,对大表能给出好得多的 lookup 性能,又不必运维外部系统——这是 Hudi 1.x 在 upsert 路径上的主推方向。

6.4 选索引:按 workload

文档给出三类典型负载与推荐(Picking Indexing Strategies),这里精炼:

flowchart TD
  IN[incoming 记录] --> Q{索引 lookup}
  Q -->|命中 fileId| UP[upsert 到该 file group]
  Q -->|未命中| NEW[insert: 分配新 file group]
  UP --> COW2[CoW 重写 base / MoR 写 log]

记住这条主线:file group 把记录定位到局部 + 索引把 key 快速映射到 file group = Hudi 的 upsert 优势


七、查询类型

同一张 Hudi 表,按「要新鲜度还是要性能」可以用不同查询类型读(文档 Query types),由 hoodie.datasource.query.type 控制。

7.1 snapshot 查询

看表的最新快照(截至最新完成动作)。对 MoR 表,会把 base 文件 + log 文件动态合并出最新值;对 CoW 表就是读最新 base 文件。最新鲜,但 MoR 下查询延迟较高(要合并)。这是默认查询类型。

7.2 read-optimized 查询(仅 MoR)

只读列式 base 文件,不合并 log。性能等同纯列式扫描,但只能看到「最近一次 compaction 之前」的数据,新鲜度低。常配合「compaction 对齐事务边界」的策略,给数仓外表或对延迟不敏感的训练任务用。

文档给的 snapshot vs read-optimized 取舍:

取舍 Snapshot Read Optimized
数据延迟
查询延迟 高(合并 base + log) 低(纯 base 文件性能)

也就是说,MoR 表的「写得快」是有代价的:你要在「snapshot 读得慢但新鲜」和「read-optimized 读得快但旧」之间选,而 compaction 调度决定这两者的差距有多大。

7.3 incremental 查询

只返回「自某个 instant 以来」新写入的数据,是 Hudi 区别于传统批 ETL 的杀手锏(文档 Incremental Queries)。配 hoodie.datasource.read.begin.instanttime(和可选 end.instanttime)使用:

hoodie.datasource.query.type = incremental
hoodie.datasource.read.begin.instanttime = 20260630080000

意义:下游不必每次全表扫,只消费变更。timeline 上的 commit instant 表示数据的到达时间(arrival time),而数据本身的分区反映事件时间(event time)。迟到数据(9 点的订单 10:20 才到)会 upsert 进老分区,但 incremental 查询能凭 timeline 高效只取变更记录,不必扫所有时间桶。

还有 incremental CDC 变体(hoodie.datasource.query.incremental.format = cdc):输出带前后像和操作类型的变更流,像数据库 CDC。

7.4 time travel

as.of.instant 指定过去某个 instant 读历史快照,用于复现训练数据、审计等——能回溯多远受 cleaner 保留策略约束(第五节)。

flowchart LR
  T[Hudi 表] --> SN["snapshot: base+log 合并, 最新"]
  T --> RO["read-optimized: 仅 base, 较旧但快 (MoR)"]
  T --> INC["incremental: 自 instant 以来的变更"]
  T --> TT["time travel: as.of.instant"]

八、metadata 表与多模索引

第六节讲的各种索引,1.x 里大多不是各写各的散落结构,而是统一建在一张内部的 metadata 表上(文档 Multi-modal Indexing,0.11.0 引入)。这是理解 Hudi 1.x 索引体系的关键。

8.1 metadata 表是什么

metadata 表本身就是一张 Hudi MoR 表,藏在 .hoodie/metadata/ 下。它把表的辅助信息组织成多个分区,每个分区是一类索引/统计:

metadata 分区 内容
files 每个数据分区下有哪些文件(替代 LIST 目录)
column_stats 各文件各列的 min/max/null count(文件级裁剪)
bloom_filter 各数据文件的 bloom filter,集中存放避免读 footer
record_index record key → 位置 的映射(RLI)
expr_index_<name> 表达式索引(每个一个分区)
secondary_index_* 二级索引(非 record key 列)

「多模索引」的意思就是:metadata 表可以通过加新分区来扩展新索引类型,配合异步索引构建(INDEXING 动作),在不阻塞写入的前提下在线建索引。

8.2 为什么集中到 metadata 表

启用相关配置(文档列出,默认值标注):

hoodie.metadata.enable                         # 启用 metadata 表(读写共用)
hoodie.metadata.index.column.stats.enable      # 列统计索引
hoodie.metadata.index.bloom.filter.enable      # 默认 false
hoodie.metadata.record.level.index.enable      # 分区级 RLI(1.1.0)
hoodie.metadata.global.record.level.index.enable  # 全局 RLI(0.14.0)

8.3 表达式索引与二级索引(1.x)

Hudi 1.0 起支持两类更「数据库式」的索引(文档 Expression Index / Secondary Index),用 SQL CREATE INDEX / DROP INDEX 管理:

这两个把 Hudi 的索引能力从「主键 upsert 定位」扩展到「任意列查询加速」,让它更像传统数据库的索引体系。注意它们都建在 metadata 表上,享受同样的异步构建与一致性保证。

flowchart TD
  MT["metadata 表 (.hoodie/metadata)"] --> F[files: 文件清单]
  MT --> CS[column_stats: 列界裁剪]
  MT --> BF[bloom_filter: 集中 bloom]
  MT --> RI[record_index: key→位置]
  MT --> EI["expr_index_*: 表达式索引"]
  MT --> SI["secondary_index_*: 非主键列"]

九、写操作语义:upsert、insert 与 precombine

Hudi 的写入不是只有「append」。它把写操作分成几种语义(对应 hoodie.datasource.write.operation),这也是它和 Iceberg/Delta「写就是加文件」的区别:

操作 语义
upsert 默认。按 record key 查索引:已存在则更新,不存在则插入。需要索引 lookup
insert 插入,不做 key 去重的 lookup(更快,但可能产生重复 key)
bulk_insert 大批量初始化导入,优化文件大小与排序,跳过常规写路径
insert_overwrite 覆盖分区内 file group(REPLACE_COMMIT
delete 按 key 删除

upsert 是 Hudi 的招牌,但它依赖索引 lookup,比 insert 重。初次大批量灌数据用 bulk_insert 更合适。

9.1 precombine:同一批里的去重与排序

upsert 时如果同一批数据里出现同一个 record key 的多条记录(CDC 常见:一条订单在一个微批里被改了两次),Hudi 用 precombine 字段hoodie.datasource.write.precombine.field)决定保留哪条——通常是事件时间戳或版本号,取最大的那条。这就是第八节实验里 precombine.field = ts 的作用。

9.2 event-time ordering 与 merge

更进一步,Hudi 支持 event-time ordering(文档 Ordering of Actions):timeline 上的动作按 completed instant 排序得到可串行化的写入顺序(arrival time),而记录本身可按指定的 event-time 字段合并。这让「迟到数据不会覆盖更新的数据」成为可能——一条 10:20 到达但 event time 是 09:00 的记录,不会盖掉 event time 10:00 的当前值。这是 CDC/流式场景里正确性的关键,Iceberg/Delta 默认没有这层内建的记录级合并语义。


十、并发控制

Hudi 的并发模型比「单写者顺序提交」复杂,因为它要同时支持「写入」和「后台表服务(compaction/clustering/cleaning)」并发跑。

10.1 OCC:写者之间

多个写者并发时,Hudi 默认用乐观并发控制(OCC):各自基于某个 timeline 状态写,提交时检查是否和并发提交冲突(改了重叠的 file group),冲突则失败重试。需要一个外部锁提供者(如 ZooKeeper、Hive Metastore、DynamoDB)来协调提交的临界区。这点和 Delta(第 12 章 第九节)、Iceberg(第 11 章)的 OCC 是同类思路。

10.2 表服务的非阻塞性

Hudi 的设计目标之一是表服务尽量不阻塞写入。靠的是 timeline 的 instant 三态 + 「请求 instant 对完成 instant 排序」:compaction 先在 requested 态生成一个不可变计划(锁定要合并哪些 log),之后写入仍可继续往新的 file slice 写 log,compaction 异步执行计划。cleaning、clustering 同理基于计划执行,失败可回滚(ROLLBACK),不破坏并发写入。

10.3 非阻塞并发控制(1.x)

Hudi 1.x 引入 非阻塞并发控制(Non-Blocking Concurrency Control, NBCC),目标是让多个写者甚至能并发写同一个 file group 而不互相阻塞,依赖 TrueTime 的完成时间排序在读时正确合并。这适合多流并发入湖同一张表的场景。

版本提示:NBCC 仍在演进,具体支持的写入路径(Spark/Flink)和限制要查对应 1.x 版本文档,不要假定所有写入组合都支持非阻塞并发。

flowchart TD
  W1[Writer 1] -->|OCC + 锁提供者| T[timeline]
  W2[Writer 2] -->|OCC + 锁提供者| T
  T --> CMP["compaction: requested 计划"]
  CMP -.异步执行, 不阻塞写.-> DONE[completed: 新 base]

十一、log file 内部结构

第三、四节说 MoR 的更新写到 log 文件,这里看 log 文件内部长什么样(apache/hudiHoodieLogFormat 设计)。log 文件不是简单的「追加一行行记录」,而是由若干 log block 组成的自描述格式,便于追加、容错和按类型解析。

block 的主要类型:

block 类型 作用
data block 一批 upsert 记录;编码可为 Avro(行式)、Parquet 或 HFile
delete block 一批删除(按 record key)
command block 控制指令,如「回滚此前某个 block」(配合 ROLLBACK)
corrupt block 写入中断留下的损坏区段,读时识别并跳过

每个 block 带头部(版本、类型、长度、所属 instant 等元信息),所以读 log 时能逐 block 解析、定位到属于某个 instant 的变更,也能在尾部损坏时安全截断。data block 默认 Avro(行式,追加快),这正是 MoR「写得轻」的原因;compaction 时把 log block 的记录合并进列式 base 文件,恢复列式扫描性能。

工程含义:

flowchart LR
  LOG[".log 文件"] --> B1[data block @t1: Avro]
  LOG --> B2[delete block @t2]
  LOG --> B3[data block @t3: Avro]
  B1 & B2 & B3 -.compaction.-> BASE[新 base.parquet]

十二、自动文件大小与小文件治理

小文件是所有湖表格式的通病(频繁提交、流式写、分区过细,第 17 章 专门讲)。Iceberg/Delta 主要靠事后跑 compaction/OPTIMIZE 来合并小文件。Hudi 的差异是它在写入时就尝试控制文件大小(auto file sizing)。

机制:upsert/insert 时,Hudi 不只是新建文件,而是优先把新记录塞进「还没满」的小 base 文件所在的 file group,把它补到目标大小,从而在写路径上就减少小文件产生。关键配置:

hoodie.parquet.max.file.size        # base 文件目标上限(如 120MB)
hoodie.parquet.small.file.limit     # 小于此值视为"小文件",写入时优先补它

工程含义:


十三、Hudi Streamer:内建入湖工具

Iceberg/Delta 通常依赖外部框架(Spark job、Flink、Kafka Connect)把数据写进湖。Hudi 自带一个开箱即用的入湖工具 Hudi Streamer(早期名 DeltaStreamer,在 hudi-utilities 里),这也反映了它「为持续入湖而生」的定位。

它的能力:

spark-submit \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  hudi-utilities-bundle.jar \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --target-base-path file:///tmp/hudi_orders \
  --target-table orders \
  --op UPSERT \
  --continuous

工程含义:如果你的场景就是「Kafka/CDC → 湖,持续 upsert」,Hudi Streamer 省掉了自己写消费+写入+compaction 调度的活;反过来,如果你已有成熟的 Spark/Flink 管道,这层内建工具的价值就小。这条「自带写入栈」的特点,是第 14 章评估「引擎中立性」时 Hudi 相对 Iceberg 的一个分野(第 19 章 展开流式入湖)。


十四、实验:本机环境与可复现步骤

WRITING_GUIDE 要求:跑不了就声明环境、给可复现步骤,不造数。

本机环境检测(2026-06-30)

OS: Linux 6.6.x (WSL2)
python3: 可用(系统自带,无 pip)
java / spark-submit / pyspark: 未安装

结论:本机不具备运行 Hudi 的条件。Hudi 的写入栈核心运行在 JVM 上(Spark/Flink),不像 Delta 有成熟的纯 Python/Rust 写实现,因此本机无法产生真实的 .hoodie timeline 与 log/base 文件。下面给出在具备 Spark 的环境中可直接复现的步骤,产出物应与第二至七节描述一致。

8.1 启动带 Hudi 的 PySpark

前置:JDK、Spark 3.x、对应的 hudi-spark bundle(版本对应见 Hudi 文档的 quick start)。

pyspark \
  --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

8.2 建 MoR 表,做多次 upsert

base = "file:///tmp/hudi_mor"
opts = {
  "hoodie.table.name": "orders",
  "hoodie.datasource.write.table.type": "MERGE_ON_READ",
  "hoodie.datasource.write.recordkey.field": "id",
  "hoodie.datasource.write.precombine.field": "ts",
  "hoodie.datasource.write.partitionpath.field": "city",
  "hoodie.datasource.write.hive_style_partitioning": "true",
}
df1 = spark.createDataFrame(
    [(1,"NYC",100,1),(2,"SFO",200,1),(3,"NYC",300,1)],
    ["id","city","amt","ts"])
df1.write.format("hudi").options(**opts).mode("overwrite").save(base)   # 首次:写 base 文件

# 对 id=1,2 做 update(ts 增大),id=4 insert
df2 = spark.createDataFrame(
    [(1,"NYC",150,2),(2,"SFO",250,2),(4,"SEA",400,2)],
    ["id","city","amt","ts"])
df2.write.format("hudi").options(**opts).mode("append").save(base)      # MoR:写 log 文件

8.3 观察 timeline、log file 与 compaction

# 看 timeline 的 instant(Hudi 1.x 在 .hoodie/timeline 下)
ls -R /tmp/hudi_mor/.hoodie/ | head -50
# 期望: 出现 deltacommit 相关 instant(MoR 的写是 DELTA_COMMIT)

# 看某分区下的文件: 应有 base parquet + .log 文件
ls /tmp/hudi_mor/city=NYC/
# 期望: <fileId>_..._<instant>.parquet 和 .<fileId>_<instant>.log.* 

触发 compaction(把 log 合并进 base),然后再看:

# 同步触发 compaction(也可配置 inline / async 策略)
spark.sql("CALL run_compaction(op => 'run', path => 'file:///tmp/hudi_mor')")
ls /tmp/hudi_mor/city=NYC/
# 期望: compaction 后出现新 instant 的 base parquet(COMMIT 动作产物),log 被合并

观察点对照本章:

spark.read.format("hudi").option("hoodie.datasource.query.type","snapshot").load(base).orderBy("id").show()
spark.read.format("hudi").option("hoodie.datasource.query.type","read_optimized").load(base).orderBy("id").show()

8.4 增量查询

(spark.read.format("hudi")
  .option("hoodie.datasource.query.type","incremental")
  .option("hoodie.datasource.read.begin.instanttime","0")  # 0 = 从头
  .load(base).show())

十五、与 Iceberg / Delta 的差异预告

把三家放在一个坐标里(细节留到 第 14 章):

维度 Iceberg Delta Hudi
元数据模型 不可变 snapshot 树 有序事务日志 timeline(instant 三态)+ file group
存储单元 扁平 data/delete 文件 扁平 data 文件 + DV file group / file slice(base + log)
行级更新 position/equality delete、V3 DV deletion vector CoW 重写 / MoR log + compaction
upsert 定位 无内建主键索引 无内建主键索引 record key + 多种索引(核心优势)
增量消费 incremental scan(快照差) CDF incremental query(一等公民)
设计偏向 引擎中立、快照读 Spark 生态、日志简洁 高频 upsert、流式入湖

一句话:Hudi 把「主键 + 索引 + MoR」做进了表格式本身,这让它在 CDC/upsert 负载上结构性领先,代价是概念更多、运维面更大(索引、compaction、cleaning 都要调)。选不选 Hudi,本质是问「你的负载是不是真的高频 upsert + 增量消费」。


十六、小结

下一章把 Iceberg、Delta、Hudi 并排对照,并讲 UniForm / XTable 这两条互通路线和选型决策树。


返回 系列目录 | 上一篇:Delta Lake 事务日志 | 下一篇:Iceberg、Delta、Hudi 对照与互通

参考资料

  1. Apache Hudi 文档,Timeline(hudi.apache.org,Version 1.2.0)— instant 三态、动作类型、active/LSM timeline、TrueTime。
  2. Apache Hudi 文档,Table & Query Types(同上)— CoW/MoR 定义、trade-off 表、snapshot/read-optimized/incremental 查询。
  3. Apache Hudi 文档,Indexes(同上)— 多模索引、bloom/simple/record/bucket、global vs 非 global、按 workload 选型。
  4. apache/hudi 源码(1.x release)— file group/file slice、表服务实现。
  5. 本系列 第 8 章 Iceberg 元数据树第 10 章 行级删除与 Merge-on-Read第 12 章 Delta Lake 事务日志

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-06-30 · database / storage

【数据湖与开放表格式】行级删除与 Merge-on-Read

Iceberg 在不可变文件上做行级删除的两条路线:copy-on-write(重写整文件)与 merge-on-read(写 delete 文件,读时合并)。讲清 position delete 与 equality delete 的语义、字段与作用域规则,写放大/读放大的取舍,V2 delete file 到 V3 deletion vector(Puffin 承载)的差异与迁移,以及读路径如何把 data file 与 delete 合并出可见行。基于 pyiceberg 0.11.1 实测 CoW 写放大并观察 MoR 回退。

2026-06-30 · database / storage

【数据湖与开放表格式】表格式为什么存在

目录式分区表(Hive 表)在对象存储上有三处硬伤:并发写部分提交、list planning 太贵、缺快照隔离与原子提交。本文拆开放表格式补上的四件事——原子提交、快照隔离、文件级统计裁剪、schema 与分区演进,并抽象出三家共有的『元数据指针 + 不可变数据文件』骨架。

2026-06-30 · database / storage

【数据湖与开放表格式】Iceberg、Delta、Hudi 对照与互通

把前面 08–13 章拆过的 Iceberg、Delta、Hudi 放在一个坐标系里对照:元数据模型、行级更新、并发控制、引擎生态四维,每维标清口径。再讲两条互通路线——Delta UniForm(写时同步生成 Iceberg/Hudi 元数据)与 Apache XTable(事后转换元数据),以及它们的边界。最后给一棵按写入模式/引擎栈/更新频率展开的选型决策树,不做排名。

2026-06-29 · database / storage

【数据湖与开放表格式】Parquet · Iceberg · Delta · Hudi 内核拆解

拆解 lakehouse 的两层基础:列式文件格式(Parquet/ORC/Arrow)与开放表格式(Iceberg/Delta/Hudi)。讲清没有数据库进程时,如何在对象存储上做 ACID、行级更新、快照与并发,以及 catalog、查询引擎、流式入湖如何拼成可运维的湖仓。面向数据平台工程师与从 OLAP/数仓转型的开发者。


By .