土法炼钢兴趣小组的算法知识备份

【存储工程】Parquet 文件格式深度解析

文章导航

分类入口
storage
标签入口
#parquet#row-group#column-chunk#encoding#predicate-pushdown#pyarrow

目录

上一篇我们讨论了列式存储(Columnar Storage)的核心思想:把同一列的数据连续存放,让分析查询只读取需要的列,而不是扫描整行。这个思想落地到具体文件格式时,需要回答一系列工程问题:文件内部怎么组织数据才能同时支持并行读取和列裁剪?同一列的数据用什么编码方式才能最大化压缩率?如何在不读取全部数据的前提下跳过不符合条件的行?

Apache Parquet 是目前大数据生态中使用最广泛的列式文件格式。从 Spark、Hive、Impala 到 DuckDB、Polars,几乎所有分析引擎都把 Parquet 作为首选存储格式。它的设计源自 Google Dremel 论文中的列式编码思想,由 Twitter 和 Cloudera 联合开发,2013 年进入 Apache 孵化器,目前最新规范版本为 2.10(对应 Parquet Format 的 GitHub 仓库)。

本文基于 Parquet Format 规范(parquet-format GitHub 仓库,版本 2.10)和 parquet-java 实现(版本 1.14.x),从文件结构、编码方式、统计信息、谓词下推、投影下推、版本演进六个维度拆解 Parquet 的内部机制,再通过 parquet-tools 和 PyArrow 的实战操作落到工程实践。


一、Parquet 设计目标

1.1 设计约束

Parquet 要解决的核心问题是:在分布式文件系统(HDFS、S3、GCS)上存储结构化数据,让分析查询尽可能少读数据、尽可能快解码数据。这个目标拆解成几个具体约束:

  1. 列裁剪(Column Pruning):查询只涉及 3 列时,不应该读取其余 97 列的数据。
  2. 行组过滤(Row Group Filtering):查询条件是 age > 30 时,如果某个数据块的 age 最大值只有 25,整个块都应该跳过。
  3. 高压缩率:同一列的数据类型一致、值域相近,列式存储天然适合压缩。编码方式应该利用这个特性。
  4. 可分片(Splittable):MapReduce、Spark 等框架需要把一个文件拆成多个分片(Split)并行处理。文件格式必须支持在合理边界上切分。
  5. 自描述(Self-Describing):文件内部包含完整的模式信息(Schema),不依赖外部元数据服务。
  6. 嵌套结构支持:现实数据不只有扁平表,还有嵌套的 struct、array、map 结构。格式必须高效支持嵌套数据。

1.2 Dremel 的遗产

Parquet 的嵌套编码方案直接来自 Google 在 2010 年发表的 Dremel 论文。Dremel 提出了两个关键概念:

这两个整数配合列值本身,可以无损地重建任意深度的嵌套结构,而不需要在列式存储中保留行的边界信息。

举一个例子。假设 Schema 定义如下:

message Document {
  required int64 doc_id;
  repeated group Links {
    optional int64 forward;
    optional int64 backward;
  }
}

对于 Links.forward 这一列,Parquet 存储的不只是值的序列,还有每个值对应的 Definition Level 和 Repetition Level。如果某条记录没有 Links 字段,forward 的 Definition Level 为 0(Links 这一层就不存在了);如果有 Linksforward 为 null,Definition Level 为 1(Links 存在,forward 不存在);如果 forward 有值,Definition Level 为 2。

对于扁平表(Flat Schema),所有字段的 Definition Level 最多为 0 或 1(取决于是否可选),Repetition Level 始终为 0。这种情况下嵌套编码几乎没有额外开销。

1.3 与 ORC 的定位差异

Parquet 和 ORC(Optimized Row Columnar)是大数据生态中两个主要的列式文件格式。它们的设计目标相似,但在几个维度上有所不同:

维度 Parquet ORC
嵌套支持 Dremel 编码,原生支持任意嵌套 扁平化处理,嵌套支持较弱
生态绑定 Spark、Arrow 生态首选 Hive 生态首选
索引能力 列统计 + Page Index(2.6 起) Bloom Filter + Row Group Index
ACID 支持 依赖 Delta Lake / Iceberg 等外部表格式 Hive ACID 原生支持
压缩率 取决于编码,整体相当 取决于编码,整体相当

我认为 Parquet 在跨引擎互操作性上有明显优势。ORC 的核心用户群体集中在 Hive 生态,而 Parquet 几乎被所有现代分析引擎支持:Spark、DuckDB、Polars、ClickHouse、DataFusion、BigQuery。如果你的数据需要被多个引擎读取,Parquet 是更安全的选择。


二、Row Group/Column Chunk/Page 三级结构

2.1 文件整体布局

一个 Parquet 文件的物理结构分为三层:行组(Row Group)、列块(Column Chunk)、页(Page)。文件头部有 4 字节魔数(Magic Number)PAR1,尾部同样有 4 字节 PAR1,尾部魔数之前是文件元数据(File Metadata)及其长度。

┌──────────────────────────────────┐
│  Magic Number: PAR1 (4 bytes)    │
├──────────────────────────────────┤
│  Row Group 0                     │
│  ├── Column Chunk 0 (col_a)      │
│  │   ├── Page 0                  │
│  │   ├── Page 1                  │
│  │   └── Page 2                  │
│  ├── Column Chunk 1 (col_b)      │
│  │   ├── Page 0                  │
│  │   └── Page 1                  │
│  └── Column Chunk 2 (col_c)      │
│      └── Page 0                  │
├──────────────────────────────────┤
│  Row Group 1                     │
│  ├── Column Chunk 0 (col_a)      │
│  │   ├── Page 0                  │
│  │   └── Page 1                  │
│  ├── Column Chunk 1 (col_b)      │
│  │   └── Page 0                  │
│  └── Column Chunk 2 (col_c)      │
│      ├── Page 0                  │
│      └── Page 1                  │
├──────────────────────────────────┤
│  File Metadata (Thrift)          │
│  ├── Schema                      │
│  ├── Row Group Metadata[]        │
│  │   ├── Column Chunk Metadata[] │
│  │   │   ├── file_offset         │
│  │   │   ├── num_values          │
│  │   │   ├── encodings           │
│  │   │   ├── statistics          │
│  │   │   └── ...                 │
│  │   └── total_byte_size         │
│  └── num_rows                    │
├──────────────────────────────────┤
│  Footer Length (4 bytes)         │
│  Magic Number: PAR1 (4 bytes)    │
└──────────────────────────────────┘

读取 Parquet 文件的第一步是从文件尾部读取 Footer:先定位到文件末尾的 8 字节(4 字节 Footer Length + 4 字节魔数),拿到 Footer Length 后再往前读取对应长度的 File Metadata。File Metadata 使用 Thrift 编码(Compact Protocol),包含完整的 Schema 定义和所有 Row Group 的元数据。

这个”先读尾部”的设计有一个重要意义:写入 Parquet 文件时可以流式追加数据,一个 Row Group 写完再写下一个,最后才写 Footer。如果写入过程中崩溃,丢失的只是最后一个未完成的 Row Group 和 Footer。

2.2 行组(Row Group)

行组是 Parquet 中最粗粒度的数据分区单元。一个行组包含一批行(通常几十万到几百万行),这些行的所有列数据都在同一个行组内。行组有两个核心作用:

  1. 并行处理单元:MapReduce 和 Spark 的一个 Task 通常处理一个或多个行组。行组之间没有数据依赖,可以完全并行。
  2. I/O 调度单元:读取一个行组的某些列时,只需要发起对应列块的 I/O 请求,不需要读取其他行组的数据。

行组的大小对性能影响很大。行组太小,元数据占比高,列的压缩效果差,且 I/O 请求过于碎片化。行组太大,一个 Task 的内存占用高,且无法细粒度并行。Parquet 官方文档建议行组大小在 128MB 到 1GB 之间,Spark 默认使用 128MB。

行组的行数不是固定的,它由写入时的行组大小限制决定。写入方会累积数据,当累积大小接近目标行组大小时,把当前数据刷写为一个行组。因此,不同行组的行数可能不同。

2.3 列块(Column Chunk)

一个行组中,每一列的数据构成一个列块。列块是列维度上的数据分区单元。一个列块只包含一列的数据,且这些数据在文件中连续存放。

列块的元数据(Column Chunk Metadata)记录在 File Metadata 中,包括:

列块是投影下推(Projection Pushdown)的基本单位。如果查询不涉及某一列,读取器直接跳过该列的所有列块,不发起任何 I/O。

2.4 页(Page)

页是 Parquet 中最细粒度的数据单元,也是编码(Encoding)和压缩(Compression)的基本单位。一个列块由一个或多个页组成。Parquet 定义了三种页类型:

数据页的默认大小为 1MB(未压缩)。页太小会增加页头开销和解码次数;页太大会降低谓词下推的过滤精度(因为统计信息是按页粒度记录的)。

数据页有两个版本:Data Page V1 和 Data Page V2。V2 在 Parquet 2.0 中引入,主要变化是把 Definition Level 和 Repetition Level 的编码从页数据中分离出来,放到页头中独立的部分,使得跳过不需要的页时不必解码 Level 数据。

一个数据页(V1)的二进制布局如下:

┌─────────────────────────────────────┐
│  Page Header (Thrift)               │
│  ├── page_type: DATA_PAGE           │
│  ├── uncompressed_page_size         │
│  ├── compressed_page_size           │
│  ├── num_values                     │
│  ├── encoding                       │
│  ├── definition_level_encoding      │
│  └── repetition_level_encoding      │
├─────────────────────────────────────┤
│  Repetition Levels (encoded)        │
├─────────────────────────────────────┤
│  Definition Levels (encoded)        │
├─────────────────────────────────────┤
│  Values (encoded + compressed)      │
└─────────────────────────────────────┘

Data Page V2 的布局有一处关键不同:Repetition Level 和 Definition Level 不参与压缩,只有 Values 部分被压缩。这使得读取器可以先读取 Level 数据判断哪些值为 null,再决定是否需要解压 Values 部分。

┌──────────────────────────────────────┐
│  Page Header (Thrift)                │
│  ├── page_type: DATA_PAGE_V2        │
│  ├── num_values                      │
│  ├── num_nulls                       │
│  ├── num_rows                        │
│  ├── encoding                        │
│  ├── definition_levels_byte_length   │
│  ├── repetition_levels_byte_length   │
│  └── is_compressed                   │
├──────────────────────────────────────┤
│  Repetition Levels (NOT compressed)  │
├──────────────────────────────────────┤
│  Definition Levels (NOT compressed)  │
├──────────────────────────────────────┤
│  Values (compressed)                 │
└──────────────────────────────────────┘

2.5 三级结构的协作

这三层结构之间的关系可以用一个类比来理解:

查询引擎读取 Parquet 文件的典型流程是:

  1. 读取 Footer,获取 Schema 和所有行组的元数据。
  2. 根据查询涉及的列,确定需要读取哪些列块(投影下推)。
  3. 根据查询的过滤条件和行组/列块的统计信息,跳过不符合条件的行组(谓词下推)。
  4. 对于需要读取的列块,按页逐个解码,应用页级别的过滤(如果有 Page Index)。
  5. 将解码后的列数据重组为行或批次,交给上层算子处理。

三、编码方式

Parquet 支持多种编码方式(Encoding),目标是利用同一列数据的统计特征来减少存储空间和加速解码。编码发生在压缩之前:数据先编码,再压缩。选择合适的编码方式通常比选择压缩算法更能影响文件大小和读取性能。

3.1 Plain 编码

普通编码(Plain Encoding)是最简单的编码方式:值按原始二进制格式连续存放。固定长度类型(INT32、INT64、FLOAT、DOUBLE)直接写入字节序列;变长类型(BYTE_ARRAY)先写 4 字节长度前缀,再写内容。

# INT32 Plain Encoding
[value0: 4 bytes][value1: 4 bytes][value2: 4 bytes]...

# BYTE_ARRAY Plain Encoding
[len0: 4 bytes][data0: len0 bytes][len1: 4 bytes][data1: len1 bytes]...

Plain 编码没有任何压缩效果,但解码速度最快——直接读取字节,不需要任何计算。当列的基数(Cardinality)很高、值之间没有规律时,Plain 编码配合通用压缩算法(如 Zstd)可能是最好的选择。

3.2 字典编码(Dictionary Encoding)

字典编码是 Parquet 中使用最广泛的编码方式。它的原理是:如果一列的去重值(Distinct Values)数量远小于总行数,就建立一个字典(Dictionary),把每个值映射为一个整数索引,然后只存储索引序列。

字典页(Dictionary Page):
  entries = ["Beijing", "Shanghai", "Guangzhou", "Shenzhen"]

数据页(Data Page):
  indices = [0, 1, 0, 2, 3, 1, 0, 2, ...]

字典编码的索引序列本身通常使用 RLE/Bit-Packing 混合编码(见下文)进一步压缩。例如,如果字典只有 4 个条目,每个索引只需要 2 位(Bit),1000 个值只需要 250 字节(加上 RLE 压缩可能更少),而原始字符串可能需要几十 KB。

字典编码有一个硬限制:字典页的大小不能超过页大小限制(默认 1MB)。当列的基数超过这个限制时,写入器会放弃字典编码,回退到 Plain 编码。这个回退称为”字典回退(Dictionary Fallback)“。实际工程中,当列的基数超过几万到几十万时(取决于值的平均长度),字典编码就会失效。

Parquet 的写入器通常默认启用字典编码,并在写入过程中动态监测字典大小。一旦字典超限,当前页及后续页切换到 Plain 编码。同一个列块内可能前几个页使用字典编码,后几个页使用 Plain 编码,这在元数据的 encodings 字段中有体现。

3.3 RLE/Bit-Packing 混合编码

游程编码(Run-Length Encoding,RLE)和位打包(Bit-Packing)是 Parquet 用来编码小整数序列的核心方法。它们通常组合使用,称为 RLE/Bit-Packing 混合编码(Hybrid Encoding)。

这个编码方式用于两个场景:

  1. 字典编码的索引序列。
  2. Definition Level 和 Repetition Level 序列。

混合编码的基本思想是:对于连续重复的值,使用 RLE(记录值和重复次数);对于不重复的值,使用 Bit-Packing(把多个小整数打包到一组字节中)。编码器在写入时动态选择 RLE 还是 Bit-Packing,以较小者为准。

编码格式以变长整数(VarInt)开头作为 header。header 的最低位标识编码类型:

# 示例:bit_width = 3,编码序列 [5, 5, 5, 5, 5, 5, 5, 5, 1, 2, 3, 4, 5, 6, 7, 8]

# 前 8 个值全是 5,使用 RLE:
# header = 8 << 1 | 0 = 16 (varint编码)
# value = 5 (用 ceil(3/8) = 1 字节表示)
# 编码结果: [16, 5]

# 后 8 个值不重复,使用 Bit-Packing:
# header = 1 << 1 | 1 = 3 (varint编码,1 组 x 8 个值)
# 8 个 3-bit 值打包到 3 字节:
# 编码结果: [3, packed_bytes...]

Bit-Packing 的位宽(Bit Width)由该列的最大值决定。例如字典大小为 100,索引的最大值为 99,需要 7 位,所以 Bit Width = 7。位宽越小,压缩效果越好。

3.4 Delta 编码(Delta Encoding)

Delta 编码(Delta Binary Packing)适用于整数列,特别是单调递增或递减的序列(如时间戳、自增 ID)。它不存储原始值,而是存储相邻值之间的差值(Delta),再用 Bit-Packing 编码这些差值。

原始值:     [1000, 1001, 1005, 1009, 1010]
差值:       [1000, 1, 4, 4, 1]  (第一个值作为基准)

差值通常比原始值小得多,需要的位宽更低,压缩效果更好。对于严格递增 1 的序列(如连续 ID),所有差值都是 1,RLE 可以进一步把它压缩到极小的空间。

Parquet 中的 Delta Binary Packing 编码参考了 Daniel Lemire 和 Leonid Boytsov 的 FastPFOR 论文。它把值分成固定大小的块(Block),每个块内部独立计算 Delta 和 Bit Width,以适应值域的局部变化。

具体结构:

Block Header:
  min_delta: zigzag varint
  bit_widths: [bw0, bw1, ...] (每个 mini-block 的位宽)

Mini-Block 0: bit-packed deltas using bw0
Mini-Block 1: bit-packed deltas using bw1
...

Delta 编码对于时间戳列效果显著。以毫秒精度的时间戳为例,原始值需要 64 位,但相邻事件的时间差通常只有几毫秒到几秒,用 10-20 位就能表示。

3.5 Delta Length Byte Array 编码

这是 Delta 编码在变长字节数组上的变体。它把长度和内容分开存储:长度序列使用 Delta Binary Packing 编码,内容直接拼接。

原始数据: ["abc", "defg", "hi", "jklmn"]

长度序列: [3, 4, 2, 5]  → Delta Binary Packing 编码
内容拼接: abcdefghijklmn

解码时先还原长度序列,再按长度切分内容。

这种编码对长度比较均匀的字符串列有效。如果长度变化不大,长度的 Delta 序列会很小。

3.6 Delta Strings 编码

Delta Strings(也称 Incremental Encoding)针对已排序的字符串列做前缀压缩。它存储每个字符串与前一个字符串的公共前缀长度和后缀内容。

原始数据(已排序): ["apple", "application", "apply", "banana"]

编码:
  前缀长度: [0, 4, 4, 0]    → Delta Binary Packing 编码
  后缀:     ["apple", "ication", "y", "banana"] → Delta Length Byte Array 编码

Delta Strings 编码在排序后的字符串列上压缩效果极好,特别是 URL、文件路径这类有大量公共前缀的数据。但它要求列数据已排序,否则公共前缀长度趋近于零,编码退化为 Delta Length Byte Array。

3.7 Byte Stream Split 编码

字节流拆分编码(Byte Stream Split)是 Parquet 2.8 引入的编码方式,专门针对 FLOAT 和 DOUBLE 类型设计。IEEE 754 浮点数的字节表示中,高位字节(指数部分)的变化通常比低位字节(尾数部分)小得多。Byte Stream Split 把每个值的第 k 个字节收集到一起,形成 N 个字节流,然后对这些字节流进行压缩。

原始数据(FLOAT,4 字节): [v0, v1, v2, v3]

v0 = [b0_0, b0_1, b0_2, b0_3]
v1 = [b1_0, b1_1, b1_2, b1_3]
v2 = [b2_0, b2_1, b2_2, b2_3]
v3 = [b3_0, b3_1, b3_2, b3_3]

Byte Stream Split 重排后:
Stream 0: [b0_0, b1_0, b2_0, b3_0]  (所有值的第 0 字节)
Stream 1: [b0_1, b1_1, b2_1, b3_1]  (所有值的第 1 字节)
Stream 2: [b0_2, b1_2, b2_2, b3_2]  (所有值的第 2 字节)
Stream 3: [b0_3, b1_3, b2_3, b3_3]  (所有值的第 3 字节)

最终存储: Stream 0 || Stream 1 || Stream 2 || Stream 3

这个重排本身不减少数据量,但它让通用压缩算法(如 Zstd)更容易发现模式。同一字节位置的值通常有更高的相关性(例如指数字节变化很慢),压缩率因此提高。实测中,Byte Stream Split + Zstd 在浮点数列上的压缩率比 Plain + Zstd 提高 20%-40%。

3.8 编码选择建议

不同编码方式适用于不同的数据特征。以下是选择建议:

数据特征 推荐编码 原因
低基数字符串/整数(基数 < 10000) Dictionary 字典索引远小于原始值
高基数字符串 Plain + Zstd 字典会溢出,直接压缩更好
单调递增整数(时间戳、ID) Delta Binary Packing 差值极小,位宽低
已排序字符串 Delta Strings 前缀压缩效果好
浮点数 Byte Stream Split + Zstd 字节流重排提升压缩率
Boolean RLE 只有两个值,RLE 极度有效

大多数写入器(包括 PyArrow、parquet-java)会根据列类型和统计信息自动选择编码方式。通常不需要手动指定,但了解编码原理有助于诊断压缩率不理想的问题。


四、统计信息与谓词下推

4.1 列统计信息(Column Statistics)

Parquet 在写入时为每个列块记录统计信息(Statistics),存储在 Column Chunk Metadata 中。标准统计字段包括:

这些统计信息是谓词下推(Predicate Pushdown)的基础。查询引擎在读取数据之前先检查统计信息,判断当前行组/列块是否可能包含符合条件的数据。

有一个容易踩的坑:Parquet 早期版本(1.x 规范)中,字符串类型的 min/max 统计信息使用有符号字节比较(Signed Byte Comparison),而实际字符串比较应该是无符号的。这导致统计信息不可靠,一些读取器会忽略字符串列的统计信息。Parquet 2.0 规范引入了 min_value/max_value 字段,使用正确的无符号比较语义,同时废弃了旧的 min/max 字段。

4.2 谓词下推原理

谓词下推(Predicate Pushdown)是指查询引擎把过滤条件”下推”到存储层,让存储层在读取数据时就跳过不符合条件的数据块。在 Parquet 中,谓词下推发生在两个层级:

行组级别:读取器检查行组内每个相关列块的统计信息。如果过滤条件与统计信息矛盾,整个行组被跳过。

查询条件: WHERE age > 30

行组 0: age 列统计 min=18, max=25
  → max(25) < 30,不可能有 age > 30 的行 → 跳过整个行组

行组 1: age 列统计 min=22, max=55
  → max(55) > 30,可能有符合条件的行 → 需要读取

行组 2: age 列统计 min=35, max=60
  → min(35) > 30,所有行都符合条件 → 需要读取

页级别(Page Index,Parquet 2.6 起):对于需要读取的行组,读取器进一步检查每个数据页的统计信息,跳过不符合条件的页。

4.3 Page Index

Parquet 2.6 引入了页索引(Page Index)机制,把统计信息的粒度从行组/列块级别细化到页级别。Page Index 由两个结构组成:

这两个索引结构存储在 File Metadata 中(而不是内联在数据页里),这样读取器可以在读取任何数据页之前就获取所有页的统计信息。

Column Index for column "age":
  page 0: min=18, max=22, null_count=0
  page 1: min=19, max=25, null_count=2
  page 2: min=28, max=55, null_count=0

Offset Index for column "age":
  page 0: offset=1024, compressed_size=4096
  page 1: offset=5120, compressed_size=3840
  page 2: offset=8960, compressed_size=4096

查询条件: WHERE age > 30
  → page 0: max=22 < 30 → 跳过
  → page 1: max=25 < 30 → 跳过
  → page 2: max=55 > 30 → 需要读取

Page Index 在大行组场景下特别有用。如果一个行组有 100 万行、分成 100 个页,没有 Page Index 时要么读取整个行组,要么完全跳过。有了 Page Index,可以只读取 2-3 个页,I/O 量减少一到两个数量级。

4.4 Bloom Filter

Parquet 2.7 引入了布隆过滤器(Bloom Filter)支持。Bloom Filter 是一种概率数据结构,可以快速判断”一个值是否可能在集合中”。它有两种结果:

Bloom Filter 对等值查询(column = value)特别有效,弥补了 min/max 统计信息在高基数列上的不足。例如,一个列块的 min=1、max=1000000,查询 id = 42 时,min/max 无法排除这个列块,但 Bloom Filter 可以判断 42 是否可能存在于该列块中。

Column Chunk Metadata:
  statistics: min=1, max=1000000
  bloom_filter: [bit_array, num_hashes, hash_algorithm]

查询: WHERE id = 42
  min/max 检查: 1 <= 42 <= 1000000 → 无法排除
  Bloom Filter 检查: hash(42) 对应的位全为 1 → 可能存在 → 需要读取
                    或 hash(42) 对应的位有 0 → 一定不存在 → 跳过

Bloom Filter 的空间开销需要权衡。每个列块的 Bloom Filter 通常占几 KB 到几十 KB,对于有大量列块的大文件,总开销可能可观。写入时需要显式启用 Bloom Filter,并指定期望的假阳性率(False Positive Rate,通常 1%-5%)。

4.5 支持的谓词类型

不同的统计信息支持不同类型的谓词下推:

谓词类型 min/max Bloom Filter Page Index
col = value 部分有效 高度有效 部分有效
col > value 有效 不适用 有效
col < value 有效 不适用 有效
col BETWEEN a AND b 有效 不适用 有效
col IN (v1, v2, ...) 部分有效 有效 部分有效
col IS NULL 有效(null_count) 不适用 有效
col LIKE 'prefix%' 有效(字符串有序时) 不适用 有效

我认为在实际工程中,min/max 统计信息能处理大部分场景,Page Index 是进阶优化,Bloom Filter 则是针对等值查询的补充。三者不是互斥的,而是分层配合。


五、投影下推

5.1 投影下推原理

投影下推(Projection Pushdown)是列式存储最基本的优势:查询只涉及表中的部分列时,只读取这些列的数据,完全跳过其他列。这在行式存储(如 CSV、JSON)中无法做到——即使只需要一列,也必须解析整行。

Parquet 的列块在文件中独立存放,每个列块的偏移量和大小记录在 File Metadata 中。读取器根据查询涉及的列列表,只向存储层发起对应列块的读取请求。

Schema: id (INT64), name (STRING), age (INT32), address (STRING), salary (DOUBLE)

查询: SELECT name, salary FROM table WHERE age > 30

投影下推: 只读取 name, age, salary 三列的列块
         跳过 id 和 address 列块
         (age 用于过滤,过滤完成后可以丢弃)

5.2 投影下推的 I/O 节省

投影下推的 I/O 节省量取决于查询涉及的列数占总列数的比例。分析场景中,宽表(几十到几百列)是常态,但单次查询通常只涉及 5-10 列。这意味着投影下推可以减少 90%-99% 的 I/O。

以一个具体例子说明。假设一个表有 100 列,每列的数据量大致相同,总文件大小 10GB:

查询列数 行式存储读取量 列式存储读取量 节省比例
1 10 GB 100 MB 99%
5 10 GB 500 MB 95%
10 10 GB 1 GB 90%
50 10 GB 5 GB 50%
100 10 GB 10 GB 0%

在对象存储(S3、GCS)上,投影下推的收益更大,因为每次 I/O 请求都有网络延迟,减少请求数量和数据传输量直接降低查询延迟和成本。

5.3 嵌套列的投影

Parquet 对嵌套结构的投影支持比较细致。如果 Schema 中有一个 Struct 类型的列:

message Record {
  required group user {
    required string name;
    optional int32 age;
    optional string email;
  }
  required int64 timestamp;
}

查询只需要 user.nametimestamp 时,Parquet 可以只读取 user.nametimestamp 的列块,跳过 user.ageuser.email。嵌套结构在 Parquet 内部被展平为独立的叶子列(Leaf Column),每个叶子列有独立的列块。

对于 Array(Repeated)类型和 Map 类型,情况稍微复杂。Array 元素的数量通过 Repetition Level 编码,Map 的 key 和 value 是两个独立的叶子列。投影下推仍然在叶子列级别进行。


六、Parquet 版本演进

6.1 版本号体系

Parquet 的版本号容易混淆,因为存在两个独立的版本号体系:

  1. Format Version:文件格式规范的版本号,定义在 parquet-format 仓库中。重要版本包括 1.0、2.0、2.4、2.6、2.7、2.8、2.9、2.10。
  2. Writer Version:写入器声明的兼容版本号,存储在 File Metadata 的 created_by 字段中。例如 parquet-mr version 1.14.1 表示使用 parquet-java 1.14.1 版本写入。

Format Version 决定了文件中可以使用哪些特性。为了向后兼容,大多数写入器默认使用 Format Version 1.0 的特性子集,除非显式配置为使用更高版本的特性。

6.2 各版本关键变化

版本 年份 关键特性
1.0 2013 初始版本:三级结构、基本编码、Thrift 元数据
2.0 2014 Data Page V2、Delta 编码、修正字符串统计信息语义
2.4 2018 逻辑类型(Logical Type)重构,替代旧的 ConvertedType
2.6 2019 Page Index(Column Index + Offset Index)
2.7 2020 Bloom Filter 支持
2.8 2021 Byte Stream Split 编码
2.9 2022 16 位浮点(FLOAT16)类型
2.10 2023 Geometry 和 Geography 逻辑类型(GeoParquet)

6.3 逻辑类型(Logical Type)

Parquet 的物理类型(Physical Type)只有七种:BOOLEAN、INT32、INT64、FLOAT、DOUBLE、BYTE_ARRAY、FIXED_LEN_BYTE_ARRAY。逻辑类型(Logical Type)在物理类型之上定义更丰富的语义:

逻辑类型 物理类型 语义
STRING BYTE_ARRAY UTF-8 字符串
DECIMAL INT32/INT64/FIXED_LEN_BYTE_ARRAY/BYTE_ARRAY 定点数,带精度和标度
DATE INT32 日期,Unix 纪元以来的天数
TIME INT32/INT64 时间,毫秒或微秒精度
TIMESTAMP INT64 时间戳,毫秒、微秒或纳秒精度
UUID FIXED_LEN_BYTE_ARRAY(16) 128 位 UUID
LIST group 有序列表
MAP group 键值映射
ENUM BYTE_ARRAY 枚举字符串
JSON BYTE_ARRAY JSON 字符串
BSON BYTE_ARRAY BSON 二进制
FLOAT16 FIXED_LEN_BYTE_ARRAY(2) IEEE 754 半精度浮点数

Parquet 2.4 之前使用 ConvertedType 枚举来表示逻辑类型,2.4 之后改为 LogicalType 结构体,能携带更多参数(如 TIMESTAMP 的精度和时区信息)。新的写入器应该使用 LogicalType,但同时写入 ConvertedType 以兼容旧的读取器。

6.4 兼容性实践

我认为在实际工程中处理 Parquet 版本兼容性时,有三条原则:

  1. 写入时保守,读取时宽容。 写入器默认使用 Format Version 1.0 的特性,除非确认所有消费者都支持更高版本。读取器应该尽量支持所有已知版本。

  2. 显式声明 writer version。 使用 PyArrow 写入时通过 version 参数控制("1.0""2.6")。使用 Spark 时通过 spark.sql.parquet.writeLegacyFormat 控制。

  3. 测试下游兼容性。 写入后用目标引擎实际读取验证。Hive 对 Data Page V2 的支持历史上有过问题,Spark 对 Page Index 的利用程度在不同版本间有差异。


七、parquet-tools 实战

7.1 安装

parquet-tools 有多个实现。推荐使用 Python 版本的 parquet-tools(基于 PyArrow)或 Apache 官方的 parquet-cli(基于 Java)。

# Python 版本(推荐,轻量)
pip install parquet-tools

# 或者直接使用 PyArrow 自带的命令行功能
pip install pyarrow

Apache 官方的 parquet-cli 需要 Java 环境:

# 下载 parquet-cli JAR
# https://repo1.maven.org/maven2/org/apache/parquet/parquet-cli/
java -jar parquet-cli-1.14.1.jar meta input.parquet

7.2 查看 Schema

# 查看文件的 Schema
parquet-tools schema input.parquet

输出示例:

message schema {
  required int64 id;
  optional binary name (STRING);
  optional int32 age;
  optional double salary;
  optional binary city (STRING);
  optional int64 created_at (TIMESTAMP(MILLIS,true));
}

这里 binary 是 Parquet 的物理类型名称,对应 BYTE_ARRAY。括号中的 STRINGTIMESTAMP(MILLIS,true) 是逻辑类型注解。

7.3 查看元数据

# 查看文件级元数据
parquet-tools meta input.parquet

输出示例(简化):

File Metadata:
  Version: 2.6
  Created By: parquet-cpp-arrow version 15.0.0
  Num Row Groups: 3
  Total Rows: 1500000

Row Group 0:
  Num Rows: 500000
  Total Byte Size: 42315678
  Column: id
    Type: INT64
    Encodings: PLAIN, RLE
    Compression: ZSTD
    Num Values: 500000
    Null Count: 0
    Min: 1
    Max: 500000
    Compressed Size: 1234567
    Uncompressed Size: 4000000
  Column: name
    Type: BYTE_ARRAY
    Encodings: PLAIN, RLE, PLAIN_DICTIONARY
    Compression: ZSTD
    Num Values: 500000
    Null Count: 1234
    Min: Aaron
    Max: Zylstra
    Compressed Size: 5678901
    Uncompressed Size: 12345678
  ...

7.4 查看数据内容

# 查看前 10 行数据
parquet-tools head -n 10 input.parquet

# 查看指定列的前 5 行
parquet-tools head -n 5 -c id,name input.parquet

7.5 查看 Row Group 详细信息

# 使用 PyArrow 查看详细的 Row Group 元数据
python3 -c "
import pyarrow.parquet as pq
f = pq.ParquetFile('input.parquet')
print('Num Row Groups:', f.metadata.num_row_groups)
for i in range(f.metadata.num_row_groups):
    rg = f.metadata.row_group(i)
    print(f'Row Group {i}: {rg.num_rows} rows, {rg.total_byte_size} bytes')
    for j in range(rg.num_columns):
        col = rg.column(j)
        print(f'  Column {j} ({col.path_in_schema}):')
        print(f'    Encoding: {col.encodings}')
        print(f'    Compression: {col.compression}')
        if col.statistics:
            stats = col.statistics
            print(f'    Min: {stats.min}, Max: {stats.max}')
            print(f'    Null Count: {stats.null_count}')
"

7.6 检查 Page Index

# 查看 Page Index 信息(需要文件写入时启用)
python3 -c "
import pyarrow.parquet as pq
f = pq.ParquetFile('input.parquet')
metadata = f.metadata
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        print(f'Row Group {i}, Column {j} ({col.path_in_schema}):')
        print(f'  Has Column Index: {col.is_stats_set}')
"

7.7 文件大小分析

在诊断 Parquet 文件的存储效率时,一个常用方法是分析每列的压缩率:

python3 -c "
import pyarrow.parquet as pq
f = pq.ParquetFile('input.parquet')
metadata = f.metadata
print(f'Total file rows: {metadata.num_rows}')
print(f'Total row groups: {metadata.num_row_groups}')
print()
print(f'{\"Column\":<30} {\"Compressed\":>12} {\"Uncompressed\":>14} {\"Ratio\":>8}')
print('-' * 68)

col_stats = {}
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        name = col.path_in_schema
        if name not in col_stats:
            col_stats[name] = [0, 0]
        col_stats[name][0] += col.total_compressed_size
        col_stats[name][1] += col.total_uncompressed_size

for name, (comp, uncomp) in col_stats.items():
    ratio = comp / uncomp if uncomp > 0 else 0
    print(f'{name:<30} {comp:>12,} {uncomp:>14,} {ratio:>7.1%}')
"

八、PyArrow 读写 Parquet

8.1 基本写入

PyArrow 是目前最常用的 Parquet 读写库之一。它基于 C++ 实现的 parquet-cpp,性能接近原生代码。

import pyarrow as pa
import pyarrow.parquet as pq

# 构造数据
table = pa.table({
    "id": pa.array(range(1000000), type=pa.int64()),
    "name": pa.array([f"user_{i}" for i in range(1000000)], type=pa.string()),
    "age": pa.array([20 + (i % 50) for i in range(1000000)], type=pa.int32()),
    "salary": pa.array([3000.0 + i * 0.5 for i in range(1000000)], type=pa.float64()),
    "city": pa.array(["Beijing", "Shanghai", "Guangzhou", "Shenzhen"] * 250000,
                     type=pa.string()),
})

# 写入 Parquet 文件
pq.write_table(
    table,
    "output.parquet",
    compression="zstd",
    compression_level=3,
    row_group_size=500000,
    version="2.6",
    write_statistics=True,
    use_dictionary=True,
)

关键参数说明:

8.2 基本读取

import pyarrow.parquet as pq

# 读取整个文件
table = pq.read_table("output.parquet")
print(f"Rows: {table.num_rows}, Columns: {table.num_columns}")
print(table.schema)

# 投影下推:只读取指定列
table = pq.read_table("output.parquet", columns=["name", "salary"])

# 谓词下推:使用过滤条件
table = pq.read_table(
    "output.parquet",
    filters=[
        ("age", ">", 30),
        ("city", "=", "Beijing"),
    ],
)

filters 参数接受一个条件列表,列表中的条件默认以 AND 连接。PyArrow 会将这些条件转换为行组级别和页级别(如果有 Page Index)的过滤。

8.3 分区写入

分区(Partitioning)是处理大数据集的标准做法。PyArrow 支持按列值将数据写入不同的子目录:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({
    "date": ["2025-01-01", "2025-01-01", "2025-01-02", "2025-01-02"],
    "city": ["Beijing", "Shanghai", "Beijing", "Guangzhou"],
    "value": [100, 200, 300, 400],
})

# 按 date 和 city 分区写入
pq.write_to_dataset(
    table,
    root_path="output_partitioned",
    partition_cols=["date", "city"],
    compression="zstd",
)

写入后的目录结构:

output_partitioned/
├── date=2025-01-01/
│   ├── city=Beijing/
│   │   └── part-0.parquet
│   └── city=Shanghai/
│       └── part-0.parquet
└── date=2025-01-02/
    ├── city=Beijing/
    │   └── part-0.parquet
    └── city=Guangzhou/
        └── part-0.parquet

分区下推(Partition Pruning)比行组级谓词下推更高效:查询条件涉及分区列时,读取器直接跳过不匹配的子目录,连文件都不需要打开。

8.4 流式写入(逐行组写入)

对于无法一次性放入内存的大数据集,可以使用 ParquetWriter 逐行组写入:

import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([
    ("id", pa.int64()),
    ("value", pa.float64()),
])

writer = pq.ParquetWriter(
    "large_output.parquet",
    schema,
    compression="zstd",
    version="2.6",
)

# 模拟分批生成数据
for batch_idx in range(10):
    batch = pa.table({
        "id": pa.array(range(batch_idx * 100000, (batch_idx + 1) * 100000)),
        "value": pa.array([float(i) * 0.1 for i in range(100000)]),
    })
    writer.write_table(batch)

writer.close()  # 必须显式关闭,写入 Footer

每次调用 write_table() 会写入一个新的行组。如果不显式关闭 Writer,Footer 不会写入,文件将不可读。推荐使用上下文管理器(Context Manager):

with pq.ParquetWriter("output.parquet", schema, compression="zstd") as writer:
    for batch in data_batches:
        writer.write_table(batch)
# 离开 with 块时自动关闭

8.5 读取元数据

import pyarrow.parquet as pq

# 只读取元数据,不读取数据
metadata = pq.read_metadata("output.parquet")
print(f"Format Version: {metadata.format_version}")
print(f"Created By: {metadata.created_by}")
print(f"Num Rows: {metadata.num_rows}")
print(f"Num Row Groups: {metadata.num_row_groups}")
print(f"Num Columns: {metadata.num_columns}")

# 检查 Schema
schema = pq.read_schema("output.parquet")
print(schema)

# 检查单个 Row Group
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    print(f"\nRow Group {i}:")
    print(f"  Rows: {rg.num_rows}")
    print(f"  Size: {rg.total_byte_size:,} bytes")

8.6 控制编码方式

PyArrow 允许按列指定编码方式:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({
    "ts": pa.array(range(0, 1000000, 1), type=pa.int64()),
    "temperature": pa.array([20.5 + i * 0.001 for i in range(1000000)],
                            type=pa.float32()),
    "status": pa.array(["OK", "WARN", "ERROR"] * 333333 + ["OK"],
                       type=pa.string()),
})

pq.write_table(
    table,
    "encoded.parquet",
    compression="zstd",
    use_dictionary=["status"],         # 只对 status 列启用字典编码
    column_encoding={
        "ts": "DELTA_BINARY_PACKED",   # 时间戳用 Delta 编码
        "temperature": "BYTE_STREAM_SPLIT",  # 浮点用 Byte Stream Split
    },
    write_statistics=True,
    version="2.6",
)

# 验证编码方式
f = pq.ParquetFile("encoded.parquet")
rg = f.metadata.row_group(0)
for i in range(rg.num_columns):
    col = rg.column(i)
    print(f"{col.path_in_schema}: encodings={col.encodings}, "
          f"compression={col.compression}")

8.7 Pandas 互操作

PyArrow 与 Pandas 的互操作是最常见的使用场景:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Pandas DataFrame → Parquet
df = pd.DataFrame({
    "id": range(100000),
    "name": [f"item_{i}" for i in range(100000)],
    "price": [round(i * 0.99, 2) for i in range(100000)],
})
df.to_parquet("from_pandas.parquet", engine="pyarrow", compression="zstd")

# Parquet → Pandas DataFrame
df2 = pd.read_parquet("from_pandas.parquet", columns=["name", "price"])
print(df2.head())

需要注意的是,Pandas 的 NaN 和 Parquet 的 null 语义不完全一致。Pandas 使用 NaN 表示缺失值(对于浮点列),使用 Nonepd.NA(对于 Nullable 整数类型)。PyArrow 在转换时会自动处理,但边界情况下可能需要显式指定类型映射。


九、Row Group 大小调优

9.1 Row Group 大小的影响因素

行组大小是 Parquet 文件性能调优中最重要的参数之一。它影响四个方面:

  1. I/O 效率:行组越大,每次 I/O 请求读取的有效数据越多,元数据开销占比越低。
  2. 并行度:行组是并行处理的基本单位。行组太少,Task 数量不足,集群利用率低。
  3. 内存占用:写入器需要在内存中缓存一个行组的数据才能刷写。行组越大,写入时的内存峰值越高。
  4. 谓词下推精度:行组越小,统计信息的覆盖范围越小,过滤效果越好。但行组太小,元数据膨胀,整体性能反而下降。

9.2 行组大小 vs HDFS Block 大小

在 HDFS 上,一个经典的建议是让行组大小等于 HDFS Block 大小(默认 128MB)。原因是:

但在对象存储(S3、GCS)上,没有 Block 的概念,数据本地性也不适用。此时行组大小的选择更自由。一般建议在 128MB 到 512MB 之间,根据查询模式调整。

9.3 不同场景下的推荐大小

场景 推荐行组大小 原因
HDFS + Spark/Hive 128MB(与 Block 对齐) 数据本地性最优
S3 + Spark 128MB-256MB 平衡 I/O 粒度和并行度
DuckDB 单机分析 256MB-1GB 行组切换开销大于 I/O 收益
实时查询(Impala、Presto) 64MB-128MB 减少单次查询的延迟
频繁使用谓词下推 64MB-128MB 更细粒度的统计信息
宽表(>100 列) 256MB-512MB 每列的列块足够大,压缩效果好

9.4 Spark 中的配置

# Spark 行组大小配置
spark.conf.set("spark.sql.parquet.rowGroupSize", str(128 * 1024 * 1024))  # 128MB

# 或者在写入时指定
df.write \
    .option("parquet.block.size", str(256 * 1024 * 1024)) \
    .parquet("output_path")

Spark 中 parquet.block.sizespark.sql.parquet.rowGroupSize 是同义参数,都控制行组的目标大小(以字节为单位)。注意这是目标大小,实际行组大小可能略有偏差,因为写入器在接近目标大小时才决定是否刷写。

9.5 行组行数 vs 行组大小

有两种方式控制行组粒度:按行数和按字节大小。PyArrow 的 row_group_size 参数控制的是行数上限,而 Spark 的 parquet.block.size 控制的是字节大小上限。

行数控制更简单直观,但不同列的宽度差异可能导致实际字节大小差异很大。字节大小控制更贴近 I/O 行为,但需要写入器在写入过程中估算已累积数据的字节大小。

我认为在大多数场景下,按字节大小控制更合理。如果使用 PyArrow 且需要精确控制字节大小,可以预估每行的平均大小,再换算为行数:

# 假设每行平均 200 字节,目标行组大小 128MB
avg_row_bytes = 200
target_rg_bytes = 128 * 1024 * 1024
row_group_size = target_rg_bytes // avg_row_bytes  # 约 671,000 行

9.6 实测调优方法

调优行组大小的建议步骤:

  1. 用实际数据写入多个不同行组大小的文件。
  2. 记录文件大小和写入时间。
  3. 用实际查询测试读取性能。
  4. 检查行组统计信息的过滤效果。
import pyarrow as pa
import pyarrow.parquet as pq
import time

# 生成测试数据
table = pa.table({
    "id": pa.array(range(5000000), type=pa.int64()),
    "value": pa.array([float(i % 1000) for i in range(5000000)], type=pa.float64()),
    "category": pa.array([f"cat_{i % 100}" for i in range(5000000)], type=pa.string()),
})

# 测试不同行组大小
for rg_size in [50000, 100000, 500000, 1000000, 5000000]:
    start = time.time()
    pq.write_table(
        table,
        f"test_rg_{rg_size}.parquet",
        compression="zstd",
        row_group_size=rg_size,
    )
    write_time = time.time() - start

    f = pq.ParquetFile(f"test_rg_{rg_size}.parquet")
    import os
    file_size = os.path.getsize(f"test_rg_{rg_size}.parquet")
    print(f"RG Size: {rg_size:>10,}  "
          f"Row Groups: {f.metadata.num_row_groups:>3}  "
          f"File Size: {file_size:>12,}  "
          f"Write Time: {write_time:.2f}s")

十、Parquet 最佳实践

10.1 压缩算法选择

压缩算法的选择需要在压缩率、压缩速度和解压速度之间权衡:

算法 压缩率 压缩速度 解压速度 适用场景
Snappy 中等 极快 交互查询,解压速度优先
Zstd 中等 通用推荐,压缩率和速度平衡
LZ4 较低 极快 极快 极端速度要求
Gzip 中等 兼容性要求,冷数据归档
不压缩 无开销 无开销 数据已经不可压缩

Zstd 是目前大多数场景下的最佳选择。它的压缩率接近 Gzip,但解压速度接近 Snappy。Zstd 的压缩级别可调(1-22),级别 3 是一个常用的平衡点。

# Zstd 不同级别的对比测试
import pyarrow.parquet as pq
import os

for level in [1, 3, 6, 9, 15]:
    pq.write_table(table, f"zstd_l{level}.parquet",
                   compression="zstd", compression_level=level)
    size = os.path.getsize(f"zstd_l{level}.parquet")
    print(f"Zstd level {level:>2}: {size:>12,} bytes")

10.2 排序对压缩率的影响

数据写入前按低基数列排序,可以显著提升压缩率。原因有两个:

  1. 排序后同一列的相邻值更可能相同或相近,字典编码和 RLE 编码的效率更高。
  2. 排序后行组内的统计信息(min/max)范围更窄,谓词下推的过滤效果更好。
import pyarrow as pa
import pyarrow.parquet as pq
import os

# 未排序写入
table_unsorted = pa.table({
    "region": pa.array(["East", "West", "North", "South"] * 250000),
    "product": pa.array([f"P{i % 50}" for i in range(1000000)]),
    "amount": pa.array([float(i % 10000) for i in range(1000000)]),
})
pq.write_table(table_unsorted, "unsorted.parquet", compression="zstd")

# 按 region + product 排序后写入
indices = pa.compute.sort_indices(table_unsorted, sort_keys=[
    ("region", "ascending"), ("product", "ascending")
])
table_sorted = table_unsorted.take(indices)
pq.write_table(table_sorted, "sorted.parquet", compression="zstd")

print(f"Unsorted: {os.path.getsize('unsorted.parquet'):>12,} bytes")
print(f"Sorted:   {os.path.getsize('sorted.parquet'):>12,} bytes")

排序的收益取决于数据特征。低基数列放在排序键前面效果最好。对于已经按时间顺序生成的数据(如日志、事件),时间列天然有序,不需要额外排序。

10.3 分区策略

分区(Partitioning)把数据按一个或多个列的值拆分到不同的文件或目录中。分区的目标是让查询引擎在扫描前就排除大量无关数据。

分区策略的选择原则:

  1. 分区列应该是查询中最常出现在 WHERE 子句中的列。 典型的分区列包括日期、地区、业务类型。
  2. 分区数量不宜过多。 每个分区对应一个或多个文件,分区过多导致小文件问题(Small File Problem),增加元数据开销和文件系统负担。一般建议总分区数不超过 10000。
  3. 单个分区文件不宜太小。 理想情况下每个分区文件至少 64MB-128MB。如果分区后每个文件只有几 MB,应该降低分区粒度或合并小文件。
  4. 分区列不应该是高基数列。user_id 作为分区列会产生数百万个分区目录,这是灾难性的。
# 合理的分区策略
pq.write_to_dataset(
    table,
    root_path="events",
    partition_cols=["date", "region"],  # 低基数列
    compression="zstd",
)

# 不合理的分区策略(避免)
# partition_cols=["user_id"]  # 高基数,会产生大量小文件

10.4 Schema 设计

Parquet Schema 设计影响存储效率和查询性能:

  1. 使用最紧凑的物理类型。 如果值域在 INT32 范围内,不要用 INT64。FLOAT 够用时不要用 DOUBLE。每个值省 4 字节,100 万行就省 4MB。

  2. 字符串 vs 枚举。 如果一个字符串列的取值只有几十种,考虑在应用层转换为整数编码。虽然 Parquet 的字典编码也能达到类似效果,但显式使用整数类型更可控。

  3. 避免过深的嵌套。 Parquet 支持任意深度的嵌套结构,但嵌套越深,Definition Level 和 Repetition Level 的开销越大,编解码越慢。如果可以,把嵌套结构展平。

  4. 合理使用可选字段。 Required 字段不需要存储 Definition Level(始终为最大值),比 Optional 字段少一个编码开销。如果某列不可能为 null,声明为 Required。

10.5 小文件合并

小文件是 Parquet 使用中最常见的性能问题之一。大量小文件导致:

合并小文件的方法:

import pyarrow.parquet as pq
import pyarrow.dataset as ds

# 读取包含大量小文件的目录
dataset = ds.dataset("small_files_dir", format="parquet")
table = dataset.to_table()

# 写入合并后的文件
pq.write_table(
    table,
    "merged.parquet",
    compression="zstd",
    row_group_size=1000000,
)

对于 Spark 环境,可以使用 coalesce()repartition() 控制输出文件数量:

# Spark 中控制输出文件数量
df.coalesce(10).write.parquet("output_path")

# 或按目标文件大小重新分区
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)
df.write.parquet("output_path")

10.6 写入性能优化

写入 Parquet 文件时的几个优化建议:

  1. 批量写入。 避免逐行写入。每次 write_table() 应该传入至少几万行的数据。PyArrow 的写入器在内部按行组缓存数据,频繁调用小批量写入会增加 Python 层面的开销。

  2. 选择合适的压缩级别。 Zstd level 1-3 通常是写入速度和压缩率的最佳平衡点。级别 9 以上的写入速度会显著下降,但压缩率提升有限。

  3. 禁用不需要的特性。 如果不需要统计信息(例如只做全表扫描),可以关闭统计写入以加速写入。如果某些列不需要字典编码(例如高基数字符串),显式禁用以避免字典构建的开销。

pq.write_table(
    table,
    "fast_write.parquet",
    compression="zstd",
    compression_level=1,
    write_statistics=False,       # 不需要统计信息时关闭
    use_dictionary=["category"],  # 只对低基数列启用字典
)

10.7 读取性能优化

读取 Parquet 文件时的几个优化建议:

  1. 始终使用投影下推。 只读取查询需要的列。这是 Parquet 最基本的优化手段。

  2. 利用谓词下推。 把过滤条件传递给读取器,而不是读取全部数据后在应用层过滤。

  3. 使用 read_table 而不是逐行读取。 PyArrow 的批量读取利用了 SIMD 解码和内存预分配,比 Python 层面的逐行处理快几个数量级。

  4. 预读元数据。 如果需要多次读取同一个文件的不同列,先读取元数据并缓存,避免重复的 Footer 读取。

# 预读元数据
pf = pq.ParquetFile("large_file.parquet")
metadata = pf.metadata  # 缓存元数据

# 后续按行组读取
for i in range(metadata.num_row_groups):
    rg_table = pf.read_row_group(i, columns=["col_a", "col_b"])
    # 处理 rg_table
  1. 使用 ParquetDataset 读取分区数据。 它自动处理分区发现和分区裁剪。
dataset = pq.ParquetDataset(
    "partitioned_data",
    filters=[("date", "=", "2025-01-15")],
)
table = dataset.read(columns=["user_id", "amount"])

10.8 Schema 演化

Parquet 支持有限的 Schema 演化(Schema Evolution):

  1. 添加新列:新增的列在旧文件中自动填充为 null。这是最安全的演化操作。
  2. 删除列:读取时指定不包含已删除列的列列表。旧文件中仍然包含该列的数据,但不会被读取。
  3. 重命名列:Parquet 按列名匹配,重命名等同于删除旧列 + 新增新列。
  4. 修改类型:不支持直接修改列的物理类型。需要写入新文件。
# Schema 演化:读取添加了新列的混合数据集
dataset = ds.dataset("evolved_data", format="parquet")
# PyArrow 自动合并不同文件的 Schema
# 旧文件中缺失的新列会被填充为 null
table = dataset.to_table()
print(table.schema)

在 Delta Lake 和 Apache Iceberg 等表格式中,Schema 演化有更完善的支持,包括类型安全的类型扩展(如 INT32 到 INT64)和列重命名追踪。如果需要频繁的 Schema 变更,建议在 Parquet 上层使用这些表格式。

10.9 与其他格式的互操作

Parquet 作为中间格式,经常需要与其他格式互转:

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv as csv
import pyarrow.json as json_arrow

# CSV → Parquet
csv_table = csv.read_csv("input.csv")
pq.write_table(csv_table, "from_csv.parquet", compression="zstd")

# JSON → Parquet
json_table = json_arrow.read_json("input.json")
pq.write_table(json_table, "from_json.parquet", compression="zstd")

# Parquet → CSV
table = pq.read_table("input.parquet")
csv.write_csv(table, "output.csv")

CSV 和 JSON 转 Parquet 时需要注意类型推断的准确性。PyArrow 的 CSV Reader 会自动推断类型,但可能把整数推断为浮点数或把日期推断为字符串。显式指定 Schema 可以避免这个问题:

schema = pa.schema([
    ("id", pa.int64()),
    ("name", pa.string()),
    ("date", pa.date32()),
    ("amount", pa.decimal128(10, 2)),
])
csv_table = csv.read_csv("input.csv", convert_options=csv.ConvertOptions(
    column_types=schema,
))

10.10 监控与诊断

生产环境中使用 Parquet 时,建议监控以下指标:

  1. 文件大小分布:检查是否有大量小文件。
  2. 行组大小分布:检查行组大小是否在合理范围内。
  3. 列压缩率:识别压缩率异常低的列,考虑调整编码方式或排序策略。
  4. 谓词下推命中率:检查有多少行组被统计信息过滤掉,评估数据排序和分区策略的效果。
import pyarrow.parquet as pq
import os

def diagnose_parquet(file_path):
    """诊断 Parquet 文件的健康状况。"""
    file_size = os.path.getsize(file_path)
    metadata = pq.read_metadata(file_path)

    print(f"File: {file_path}")
    print(f"Size: {file_size:,} bytes ({file_size / 1024 / 1024:.1f} MB)")
    print(f"Rows: {metadata.num_rows:,}")
    print(f"Columns: {metadata.num_columns}")
    print(f"Row Groups: {metadata.num_row_groups}")
    print(f"Avg Row Group Size: "
          f"{file_size / metadata.num_row_groups / 1024 / 1024:.1f} MB")
    print(f"Bytes per Row: {file_size / metadata.num_rows:.1f}")
    print()

    # 检查小行组
    small_rg_count = 0
    for i in range(metadata.num_row_groups):
        rg = metadata.row_group(i)
        rg_size_mb = rg.total_byte_size / 1024 / 1024
        if rg_size_mb < 10:
            small_rg_count += 1

    if small_rg_count > 0:
        print(f"[WARN] {small_rg_count} row groups smaller than 10MB")

    # 检查列压缩率
    print(f"\n{'Column':<30} {'Compression':>12} {'Ratio':>8}")
    print("-" * 54)
    for j in range(metadata.num_columns):
        total_comp = 0
        total_uncomp = 0
        col_name = ""
        for i in range(metadata.num_row_groups):
            col = metadata.row_group(i).column(j)
            col_name = col.path_in_schema
            total_comp += col.total_compressed_size
            total_uncomp += col.total_uncompressed_size
        ratio = total_comp / total_uncomp if total_uncomp > 0 else 0
        flag = " [!]" if ratio > 0.9 else ""
        print(f"{col_name:<30} {col.compression:>12} {ratio:>7.1%}{flag}")

diagnose_parquet("output.parquet")

输出中 [!] 标记的列压缩率高于 90%(接近不可压缩),可能需要关注。常见原因包括:数据本身随机性高(如 UUID、哈希值)、已经是压缩过的二进制数据、或者编码方式不匹配。


参考资料

论文

  1. S. Melnik, A. Gubarev, J. J. Long, G. Romer, S. Shivakumar, M. Tolton, T. Vassilakis. “Dremel: Interactive Analysis of Web-Scale Datasets.” VLDB, 2010. Parquet 嵌套编码方案的理论基础。

  2. D. Lemire, L. Boytsov. “Decoding billions of integers in milliseconds through vectorized bit packing.” Software: Practice and Experience, 2015. Delta Binary Packing 编码的参考实现。

规范与源码

  1. Apache Parquet Format 规范。https://github.com/apache/parquet-format. 版本参考:2.10。

  2. Apache Parquet Java 实现。https://github.com/apache/parquet-java. 版本参考:1.14.x。

  3. Apache Arrow(含 parquet-cpp)。https://github.com/apache/arrow. PyArrow 的底层实现。

文档

  1. PyArrow Parquet 文档。https://arrow.apache.org/docs/python/parquet.html. Python API 参考。

  2. Spark Parquet 数据源文档。https://spark.apache.org/docs/latest/sql-data-sources-parquet.html. Spark 中的 Parquet 配置项。


上一篇: 列式存储原理 下一篇: Arrow 内存格式

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-22 · db / storage

数据库内核实验索引

汇总本站数据库内核与存储引擎实验文章,重点覆盖从零实现 LSM-Tree 及其工程权衡。

2026-04-22 · storage

存储工程索引

汇总本站存储工程系列文章,覆盖 HDD、SSD、NVMe、持久内存、索引结构、压缩、分布式存储与对象存储。

2025-10-18 · storage

【存储工程】云块存储架构

深入剖析云块存储——分布式块存储架构原理、AWS EBS与阿里云ESSD架构分析、云盘性能规格解读、性能测试方法与选型成本优化


By .