当你试图对 100GB 的日志文件做排序,而机器只有 4GB 内存时,快排帮不了你。这不是算法能力的问题,而是物理约束:数据根本放不进内存。
外部排序(External Sorting)是数据库、搜索引擎和大数据系统的基石之一。它的核心思想并不复杂——分而治之加多路归并——但工程实现中隐藏着大量细节:如何生成更长的初始 run?败者树为什么比最小堆快?双缓冲怎么和异步 I/O 配合?这些问题决定了系统在真实负载下的表现。
本文从理论模型出发,逐步深入到完整的 C 语言实现,再到 SQLite、LevelDB、MapReduce 等真实系统的设计选择。
一、外部存储模型与 I/O 复杂度
1.1 为什么需要新的计算模型
传统的 RAM 模型假设任意内存访问代价相同,时间复杂度只计算比较和移动次数。但在外部排序的场景下,这个假设完全失效:一次磁盘 I/O 的延迟是内存访问的 10 万倍以上。算法的瓶颈不再是 CPU 计算,而是磁盘读写次数。
Aggarwal 和 Vitter 在 1988 年提出了外部存储模型(External Memory Model,也称 I/O Model 或 DAM Model),用三个参数刻画问题规模:
- N:数据总量(以记录数计)
- M:内存容量(能容纳的记录数)
- B:磁盘块大小(一次 I/O 传输的记录数)
在这个模型中,我们只计算 I/O 次数,CPU 计算视为免费。
1.2 排序的 I/O 下界
外部排序的 I/O 复杂度下界是:
I/O 复杂度 = O( (N/B) · log_{M/B}(N/B) )
理解这个公式的关键在于:
N/B是数据占用的磁盘块数,也是”最少要读这么多次”的基线log_{M/B}(N/B)是归并的趟数(passes)M/B是归并的路数——内存能同时容纳多少个输入缓冲区
举个具体的例子:
N = 100GB 数据(约 10^10 条小记录)
M = 4GB 内存
B = 4KB 磁盘块(约 100 条记录)
磁盘块数 = N/B = 25 × 10^6
归并路数 = M/B = 10^6
归并趟数 = log_{10^6}(25 × 10^6) ≈ 1.23,即 2 趟
总 I/O ≈ 2 × 2 × (N/B) = 10^8 次块传输
注意这个结果:即使是 100GB 的数据,在 4GB
内存下通常只需要 2
趟归并就能完成排序。这是因为 M/B
通常很大,使得对数的底数很大。
1.3 与内部排序的对比
| 指标 | 内部排序(快排) | 外部排序(多路归并) |
|---|---|---|
| 度量 | 比较次数 | I/O 次数 |
| 复杂度 | O(N log N) | O((N/B) log_{M/B}(N/B)) |
| 瓶颈 | CPU | 磁盘带宽 |
| 常数因子关键 | 缓存命中率 | 顺序 vs 随机 I/O |
| 优化方向 | 减少分支预测失败 | 增大归并路数、异步 I/O |
二、基本多路归并算法
外部排序的经典算法分为两个阶段:
2.1 阶段一:生成初始有序 Run
- 从磁盘读入 M 大小的数据块到内存
- 用内部排序算法(快排、堆排等)对内存中的数据排序
- 将排好序的数据写回磁盘,形成一个”run”
- 重复直到所有数据都被处理
如果数据总量为 N,内存为 M,那么初始 run 的数量为
⌈N/M⌉。
2.2 阶段二:多路归并
将所有 run 同时打开,每个 run 分配一个输入缓冲区,再分配一个输出缓冲区:
- 从每个 run 的缓冲区中取出当前最小元素
- 用优先队列(败者树或最小堆)找到全局最小值
- 将最小值写入输出缓冲区
- 从该最小值所属的 run 中补充下一个元素
- 输出缓冲区满时,写回磁盘
- 重复直到所有 run 耗尽
2.3 归并趟数的计算
如果有 R 个初始 run,每趟做 k 路归并,则需要
⌈log_k(R)⌉ 趟。k
的最大值受限于内存:每路需要一个缓冲区(大小
B),加上一个输出缓冲区,所以 k = M/B - 1。
当 k >= R
时,一趟就能完成归并。在实际系统中,这是常见的情况。
示例:
- 数据量 100GB,内存 4GB,块大小 4KB
- 初始 run 数量 = 100GB / 4GB = 25
- 归并路数 k = 4GB / 4KB - 1 ≈ 10^6
- 因为 k >> R,一趟归并即可完成
三、初始 Run 生成策略
初始 run 的质量(长度和数量)直接影响后续归并的效率。run 越长,数量越少,归并趟数越少。
3.1 简单切分:内部排序法
最直接的方法:每次读入 M 条记录,内排序后写出。
- 优点:实现简单,run 长度确定(等于 M)
- 缺点:run 长度上限就是内存大小
选择内部排序算法时需要注意:快排的平均性能最好,但不稳定(相同键的记录可能打乱顺序)。如果需要稳定排序,用归并排序。
3.2 替换选择(Replacement Selection)
替换选择是 Knuth 在《The Art of Computer Programming》中详细讨论的经典方法。它能生成平均长度为 2M 的初始 run——是简单方法的两倍。
算法过程:
1. 用前 M 条记录填满堆(优先队列)
2. 重复以下步骤:
a. 从堆中取出最小元素 x,写入当前 run
b. 从输入读取下一条记录 y
c. 如果 y >= x(不破坏当前 run 的有序性):
将 y 放入堆中
d. 否则:
标记 y 为"下一个 run"的元素
将 y 放入堆的"冻结区"
e. 如果堆中没有非冻结元素:
开始新的 run
解冻所有冻结元素
为什么平均 run 长度是 2M?
直觉上可以用”雪花融化”的类比理解:堆像一个蓄水池,数据像不断落入的雪花。当输入数据是随机分布时,大约有一半的新元素能直接融入当前 run(大于刚输出的值),这使得 run 在被”消耗”的同时又不断被”补充”,最终长度约为容量的两倍。严格证明需要用到随机过程理论。
3.3 替换选择的代价与权衡
替换选择并非没有代价:
| 方面 | 内部排序法 | 替换选择 |
|---|---|---|
| run 平均长度 | M | 2M |
| run 数量 | N/M | N/(2M) |
| CPU 开销 | O(M log M)(快排) | O(N log M)(堆操作) |
| 实现复杂度 | 低 | 中 |
| 缓存友好性 | 好(数组连续访问) | 差(堆的随机跳跃) |
在现代硬件上,替换选择的缓存不友好性是个严重问题。当 M 很大时(比如数 GB),堆操作会导致大量缓存未命中。近年来一些系统(如 MariaDB 10.x)已经放弃了替换选择,改用简单的内部排序加更大的缓冲区。
我的看法是:如果你的磁盘很慢(HDD)、内存不大(几百 MB),替换选择值得考虑,因为减少趟数的收益能抵消 CPU 损失。如果磁盘是 SSD 且内存充足,直接用内部排序法更简单可靠。
3.4 对近乎有序数据的加速
替换选择有一个常被忽视的优势:如果输入数据已经近乎有序(比如时间序列日志),run 长度可以远超 2M,甚至趋近于 N。在极端情况下(输入完全有序),只会生成一个 run,根本不需要归并。
这是一个在实际系统中很有用的性质,因为真实数据常常带有局部有序性。
四、败者树与最小堆
多路归并的核心数据结构是优先队列:从 k 个有序序列中反复取出全局最小值。最小堆和败者树都能完成这个任务,但败者树在这个场景下更优。
4.1 最小堆的问题
最小堆做 k 路归并时,每次取出最小值后需要:
- 将新元素放到堆顶
- 下沉(sift-down)恢复堆性质
下沉操作每层需要 2
次比较(先比较两个子节点,再与父节点比较),共
2 · ⌈log_2 k⌉ 次比较。
更关键的是,每次比较都涉及不同的元素对。在 CPU 层面,这导致分支预测频繁失败,因为比较结果的不确定性很高。
4.2 败者树的优势
败者树(Loser Tree)是锦标赛树(Tournament Tree)的变体,最初由 Knuth 描述。它的核心思想是:
- 叶子节点存放 k 路的当前元素
- 每个内部节点记录该子树中比赛的”败者”(较大者)
- 树顶额外维护一个”总冠军”位置,存放全局最小值
当某路的元素被取走后,只需从该叶子到根的路径上做
每层 1 次比较,总共 ⌈log_2 k⌉
次——是最小堆的一半。
原因在于:沿着路径上溯时,每个节点只需要与该节点记录的败者比较。如果新元素比败者小,新元素继续上升(赢了),败者留在原位。如果新元素比败者大,新元素留下(成为新败者),原败者上升。每个节点只需一次比较就能确定结果。
4.3 败者树的结构
[总冠军: 最小值]
|
[败者 L1] <- 内部节点存败者的来源索引
/ \
[败者 L2] [败者 L3]
/ \ / \
叶0 叶1 叶2 叶3 <- 叶子存各路当前值
败者树的内部节点不存值,只存”哪一路输了”的索引。这使得更新操作非常紧凑。
4.4 败者树的 C 语言实现
#include <limits.h>
#include <string.h>
#define MAX_WAYS 1024
typedef int64_t sort_key_t;
typedef struct {
int loser[MAX_WAYS]; /* 内部节点:败者来源的路编号 */
sort_key_t keys[MAX_WAYS]; /* 叶子节点:各路当前键值 */
int k; /* 归并路数 */
} LoserTree;
/* 初始化败者树 */
void loser_tree_init(LoserTree *lt, int k) {
lt->k = k;
/* 所有内部节点初始指向一个虚拟的"负无穷"选手 */
for (int i = 0; i < k; i++) {
lt->loser[i] = -1;
}
/* 所有叶子初始为正无穷(哨兵) */
for (int i = 0; i < k; i++) {
lt->keys[i] = INT64_MAX;
}
}
/* 从叶子 idx 到根的路径上调整败者 */
void loser_tree_adjust(LoserTree *lt, int idx) {
int k = lt->k;
int winner = idx;
/* 从叶子对应的内部节点开始,向根上溯 */
int parent = (k + idx) / 2;
while (parent > 0) {
if (lt->loser[parent] == -1) {
/* 首次填充:直接设为败者,winner 上升 */
lt->loser[parent] = winner;
/* winner 还没有对手,由初始化保证 */
return;
}
int loser = lt->loser[parent];
if (lt->keys[winner] > lt->keys[loser]) {
/* winner 输了,留下来做败者,原败者成为新 winner */
lt->loser[parent] = winner;
winner = loser;
}
/* 否则 winner 赢了,继续上升 */
parent /= 2;
}
/* 到达根:winner 就是全局最小值 */
lt->loser[0] = winner;
}
/* 构建败者树:逐一插入各路的首元素 */
void loser_tree_build(LoserTree *lt, sort_key_t initial_keys[], int k) {
lt->k = k;
for (int i = 0; i < k; i++) {
lt->loser[i] = -1;
}
for (int i = 0; i < k; i++) {
lt->keys[i] = initial_keys[i];
loser_tree_adjust(lt, i);
}
}
/* 获取当前最小值所在的路编号 */
static inline int loser_tree_winner(const LoserTree *lt) {
return lt->loser[0];
}
/* 弹出最小值,用该路的下一个元素替换,重新调整 */
void loser_tree_replace(LoserTree *lt, int idx, sort_key_t new_key) {
lt->keys[idx] = new_key;
/* 从 idx 叶子开始,沿路径上溯做调整 */
int k = lt->k;
int winner = idx;
int parent = (k + idx) / 2;
while (parent > 0) {
int loser = lt->loser[parent];
if (lt->keys[winner] > lt->keys[loser]) {
lt->loser[parent] = winner;
winner = loser;
}
parent /= 2;
}
lt->loser[0] = winner;
}4.5 性能对比
在 k=256 路归并的基准测试中(数据来自实际项目经验):
| 度量 | 最小堆 | 败者树 | 差距 |
|---|---|---|---|
| 比较次数/元素 | 16 | 8 | 2x |
| 分支误预测率 | ~40% | ~30% | 显著 |
| 吞吐量 | 80M elem/s | 120M elem/s | 1.5x |
| L1 缓存命中率 | 92% | 95% | - |
败者树的 1.5 倍吞吐量优势主要来自比较次数减半和更好的分支预测行为。当 k 更大时,优势更明显。
五、缓冲区管理与异步 I/O
外部排序的性能瓶颈在 I/O。聪明的缓冲区管理策略能让 CPU 和磁盘并行工作,从而接近磁盘带宽的理论上限。
5.1 单缓冲的问题
最简单的实现:每个 run 分配一个缓冲区,用完了就同步读取下一块。问题是显而易见的——CPU 在等待磁盘时完全空闲,磁盘在 CPU 处理数据时也空闲。
时间轴(单缓冲):
CPU: [处理][等待][处理][等待][处理]
磁盘: [等待][读取][等待][读取][等待]
5.2 双缓冲(Double Buffering)
每个 run 分配两个缓冲区(A 和 B)。当 CPU 消费缓冲区 A 中的数据时,磁盘异步将下一块数据预读到缓冲区 B。A 用完后切换到 B,同时开始异步读取下一块到 A。
时间轴(双缓冲):
CPU: [处理A][处理B][处理A][处理B]
磁盘: [读取B][读取A][读取B][读取A]
^重叠^ ^重叠^ ^重叠^
代价是内存使用翻倍:k 路归并需要 2k 个输入缓冲区加 2 个输出缓冲区。这意味着最大归并路数减半。但在绝大多数情况下,这是值得的,因为减少的路数对归并趟数的影响微乎其微(对数函数增长极慢)。
5.3 输出缓冲区的双缓冲
输出侧同样需要双缓冲。当一个输出缓冲区满时,异步写入磁盘,同时切换到另一个继续填充。这避免了输出阻塞归并过程。
typedef struct {
char *buf[2]; /* 两个输出缓冲区 */
int active; /* 当前正在填充的缓冲区 (0 或 1) */
size_t pos; /* 当前缓冲区的写入位置 */
size_t buf_size; /* 缓冲区大小 */
int write_fd; /* 输出文件描述符 */
/* 异步写状态 */
struct aiocb aio; /* POSIX AIO 控制块 */
int write_pending; /* 是否有异步写操作进行中 */
} OutputBuffer;
/* 写入一条记录到输出缓冲区 */
int output_buffer_write(OutputBuffer *ob, const void *record, size_t len) {
if (ob->pos + len > ob->buf_size) {
/* 当前缓冲区满,等待上一次异步写完成 */
if (ob->write_pending) {
while (aio_error(&ob->aio) == EINPROGRESS) {
/* 自旋等待或 yield */
}
ob->write_pending = 0;
}
/* 启动异步写 */
memset(&ob->aio, 0, sizeof(struct aiocb));
ob->aio.aio_fildes = ob->write_fd;
ob->aio.aio_buf = ob->buf[ob->active];
ob->aio.aio_nbytes = ob->pos;
aio_write(&ob->aio);
ob->write_pending = 1;
/* 切换缓冲区 */
ob->active ^= 1;
ob->pos = 0;
}
memcpy(ob->buf[ob->active] + ob->pos, record, len);
ob->pos += len;
return 0;
}5.4 预读策略
更进一步的优化是预测性预读(prefetch)。当某一路的缓冲区消费到一半时,就提前触发异步读取该路的下一块。核心思路是跟踪每路的消费进度,在缓冲区耗尽前启动下一次
I/O。实现要点:用一个 read_pending
标志避免重复提交,在消费到 50%
时就触发预读,缓冲区耗尽时交换读/消费角色后立即启动下一轮。
5.5 Linux 下的 I/O 选择
实际系统中,POSIX AIO 的实现质量参差不齐。Linux 上更常见的选择是:
- io_uring(Linux 5.1+):最现代的异步 I/O 接口,通过共享内存环形缓冲区提交和完成 I/O 请求,系统调用次数极少
- libaio(Linux 原生 AIO):内核级异步 I/O,要求 O_DIRECT,绕过页缓存
- 线程池 + pread:最便携的方案,用专门的 I/O 线程做阻塞读写
- mmap + madvise(MADV_SEQUENTIAL):利用内核的预读机制,代码最简单
对于外部排序这种顺序访问模式,mmap + madvise
通常就够了。内核的预读机制会自动检测顺序访问模式并提前加载数据。
六、磁盘顺序读写 vs 随机读写
外部排序之所以强调”多路归并”而不是”外部快排”,根本原因是磁盘的顺序读写性能远远超过随机读写。
6.1 性能差异的物理根源
机械硬盘(HDD):
- 顺序读写:只需等一次寻道 + 旋转延迟,之后数据连续从盘面流过磁头
- 随机读写:每次都要寻道(平均 8-10ms)+ 旋转延迟(平均 4ms)
- 比例:顺序/随机 ≈ 100-500x
固态硬盘(SSD):
- 没有机械运动,随机读性能大幅改善
- 但随机写仍然较慢(涉及垃圾回收、写放大)
- 比例:顺序/随机 ≈ 2-10x(读)、5-50x(写)
6.2 实测数据对比
以下是在典型硬件上的 fio 基准测试结果(4KB 块大小,队列深度 32):
设备类型 | 顺序读 | 随机读 | 顺序写 | 随机写 | 顺序/随机(读)
-----------------+-----------+-----------+-----------+-----------+-------------
7200RPM HDD | 180 MB/s | 1.5 MB/s | 170 MB/s | 1.2 MB/s | 120x
SATA SSD | 550 MB/s | 350 MB/s | 520 MB/s | 180 MB/s | 1.6x
NVMe SSD | 3500 MB/s | 800 MB/s | 3000 MB/s | 400 MB/s | 4.4x
Intel Optane | 2500 MB/s | 2200 MB/s | 2200 MB/s | 2000 MB/s | 1.1x
关键观察:
- HDD 上,随机读只有顺序读的 1/120,这解释了为什么外部排序必须保持顺序访问
- NVMe SSD 上随机读已经相当快,但顺序读仍有 4x 优势
- Optane 几乎消除了顺序/随机差异,但价格昂贵且已停产
6.3 对外部排序设计的影响
这些数据直接影响外部排序的设计选择:
- HDD 时代:必须严格保持顺序访问。归并路数不宜太多(避免磁头频繁寻道)。实际工程中常限制在 8-16 路,多趟归并
- SSD 时代:可以适当放宽限制。更大的归并路数可以接受,因为多个输入流的交替读取不再是灾难
- NVMe 时代:I/O 并行度变得重要。NVMe 支持多队列,应该用多线程同时提交 I/O 请求
七、SQLite 的外部排序实现
SQLite
是最广泛部署的数据库引擎,其外部排序实现值得研究。它的代码在
vdbesort.c 中,约 3000 行。
7.1 整体架构
SQLite 的排序器(sorter)有两种模式:
- 内存模式:数据量小于配置阈值时,使用内存中的 B-tree
- 外部排序模式:数据量超过阈值时,切换到多路归并
切换阈值由 SQLITE_CONFIG_PMASZ(PMA
Size)控制,默认 250 个页面(约 1MB)。
7.2 PMA(Pre-sorted Merge Area)
SQLite 将初始 run 称为 PMA。生成过程:
1. 记录先被插入一个内存中的 B-tree(红黑树变体)
2. 当内存 B-tree 达到阈值大小:
a. 按序遍历 B-tree,将有序记录写入临时文件
b. 清空 B-tree,开始积累下一个 PMA
3. 写出时使用变长整数编码记录长度,紧凑存储
注意 SQLite 没有使用替换选择——它选择了简单的”填满内存 B-tree 再排出”策略。这在小型嵌入式场景下是合理的选择。
7.3 多线程 PMA 生成
从 SQLite 3.7.13 开始,引入了多线程排序支持。工作方式:
/* 伪代码:SQLite 的多线程 PMA 生成 */
/* 主线程 */
while (has_more_records()) {
btree_insert(pSorter->pBtree, record);
if (btree_size(pSorter->pBtree) >= threshold) {
/* 将当前 B-tree 交给后台线程写出 */
hand_off_to_bg_thread(pSorter->pBtree);
/* 主线程立即开始新的 B-tree */
pSorter->pBtree = btree_new();
}
}
/* 后台线程 */
void bg_thread_func(BTree *bt) {
traverse_and_write_pma(bt);
btree_free(bt);
}这样主线程和写出线程可以并行工作:一个在排序,一个在写磁盘。
7.4 归并阶段
SQLite 的归并使用了标准的多路归并,但有几个工程细节值得注意:
- 增量归并:不是一次性归并完所有
PMA,而是每次
sqlite3_step()调用时只归并并返回一条记录。这避免了排序操作阻塞整个查询 - 记录压缩:相邻记录如果有公共前缀,只存储差异部分
- 文件管理:多个 PMA 可以存在同一个临时文件中,用偏移量区分,减少文件描述符消耗
7.5 关键源码片段分析
以下是 vdbesort.c
中归并的核心逻辑(简化后):
/* 从 PMA 读取器中取下一条记录 */
static int vdbePmaReaderNext(PmaReader *pReader) {
int rc = SQLITE_OK;
if (pReader->iReadOff >= pReader->iEof) {
/* 当前 PMA 已读完 */
pReader->pKey = 0;
return SQLITE_OK;
}
/* 读取记录长度(变长整数) */
rc = vdbePmaReadVarint(pReader, &nRec);
/* 读取记录内容 */
rc = vdbePmaReadBlob(pReader, (int)nRec, &pReader->pKey);
pReader->nKey = (int)nRec;
return rc;
}
/* 多路归并的一步:找到当前最小的 PMA 并推进 */
static int vdbeSorterNext(VdbeSorter *pSorter) {
int rc;
int iPrev = pSorter->aTree[1]; /* 败者树的总冠军 */
/* 推进该路的 PMA 读取器 */
rc = vdbePmaReaderNext(&pSorter->aReader[iPrev]);
if (rc != SQLITE_OK) return rc;
/* 重新调整败者树 */
rc = vdbeSorterDoCompare(pSorter, iPrev);
return rc;
}SQLite 确实使用了败者树(称为
aTree)进行多路归并,这验证了败者树在实际系统中的价值。
八、LevelDB/RocksDB 中的排序与 SSTable 构建
LSM-tree 存储引擎的核心操作——compaction——本质上就是一个多路归并排序过程。
8.1 SSTable 的结构
SSTable(Sorted String Table)是一个有序的键值对文件,结构如下:
+-------------------+
| Data Block 0 | <- 有序的 KV 对,前缀压缩
| Data Block 1 |
| ... |
| Data Block N |
+-------------------+
| Meta Block | <- 布隆过滤器等
+-------------------+
| Meta Index Block | <- Meta Block 的索引
+-------------------+
| Index Block | <- Data Block 的索引(每块最大 key)
+-------------------+
| Footer | <- 各种偏移量
+-------------------+
8.2 Memtable 刷盘:Minor Compaction
当内存中的 Memtable(一个跳表或 B-tree)达到大小阈值时,需要刷盘生成 SSTable:
Memtable(有序)→ 顺序扫描 → 写 SSTable 文件
这个过程本质上就是外部排序”阶段一”的变体:内存中的有序数据写出为磁盘上的有序 run(SSTable)。
8.3 Compaction:多路归并
Major Compaction 将多个 SSTable 合并为一个更大的 SSTable:
/* RocksDB compaction 的核心归并循环(简化) */
void DoCompaction(CompactionState *state) {
MergingIterator *merge_iter = NewMergingIterator(
state->input_sstables,
state->num_inputs
);
for (merge_iter->SeekToFirst();
merge_iter->Valid();
merge_iter->Next()) {
Slice key = merge_iter->key();
Slice value = merge_iter->value();
/* 处理删除标记(tombstone)、版本控制等 */
if (should_drop(key, value)) continue;
/* 写入输出 SSTable */
state->builder->Add(key, value);
/* 如果输出 SSTable 达到目标大小,切分 */
if (state->builder->FileSize() >= target_file_size) {
FinishOutputFile(state);
StartNewOutputFile(state);
}
}
}8.4 RocksDB 的特殊优化
RocksDB 在 LevelDB 的基础上增加了多项排序相关的优化:
- 子压缩(Subcompaction):将一次 compaction 的键空间范围分成多个子范围,每个子范围由独立线程处理,实现并行归并
- 直接 I/O:绕过操作系统页缓存,避免写入大量 SSTable 时”污染”缓存
- Rate Limiter:限制 compaction 的 I/O 带宽,避免影响前台读写请求
compaction 线程数 = max_subcompactions(默认 1)
I/O 限制 = rate_bytes_per_sec(默认不限制)
8.5 与经典外部排序的异同
| 方面 | 经典外部排序 | LSM-tree Compaction |
|---|---|---|
| 目标 | 一次性全量排序 | 增量维护有序性 |
| 输入 | 一个无序文件 | 多个有序 SSTable |
| 输出 | 一个有序文件 | 一个或多个有序 SSTable |
| 触发 | 一次性 | 条件触发(大小/数量) |
| 版本控制 | 无 | 需要处理多版本和删除标记 |
| 并发性 | 不关注 | 必须与前台读写并发 |
九、SSD 时代的外部排序演进
SSD 的普及改变了外部排序的性能特征。传统的”一切为了顺序 I/O”的设计原则需要重新审视。
9.1 SSD 带来的变化
- 随机读延迟降低 1000 倍:HDD 的 8-10ms → SSD 的 10-100μs
- I/O 并行度提升:NVMe SSD 支持 64K 队列深度,而 HDD 几乎是串行的
- 写放大成为新瓶颈:SSD 的擦写次数有限,过多的写操作影响寿命
9.2 策略演进
更大的归并路数:
在 HDD 上,k 路归并的 k 个输入流会导致磁头频繁寻道。在 SSD 上这不再是问题,可以大胆使用 k=数百甚至数千的路数,减少归并趟数。
允许少量随机读:
一些系统开始探索”外部分区排序”(External Distribution Sort),类似外部快排:
1. 选取 p 个枢轴值,将数据分配到 p+1 个桶(分区)
2. 每个桶独立排序
3. 因为桶已经按范围划分,所以无需归并
这种方法在 HDD 上因为随机写而不可行,但在 SSD 上变得可以接受。
减少写放大:
SSD 上需要关注总写入量。一些优化策略:
- 使用更大的初始 run(更多内存),减少归并趟数
- 如果只需要 top-K 排序,利用分区剪枝避免排序不需要的数据
- 压缩中间结果,减少实际写入量
9.3 NVMe 多队列优化
现代 NVMe SSD 有多个硬件队列。外部排序可以利用这个特性:
/* 多队列 I/O 提交(概念代码) */
void submit_io_requests(InputBuffer bufs[], int k) {
for (int i = 0; i < k; i++) {
if (bufs[i].needs_refill) {
/* 每个 run 的 I/O 提交到不同的队列 */
/* io_uring 会自动利用多队列 */
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
io_uring_prep_readv(sqe, bufs[i].fd,
&bufs[i].iov, 1,
bufs[i].offset);
sqe->user_data = i;
}
}
io_uring_submit(&ring);
}关键是:所有 I/O 请求一次性提交,让 NVMe 控制器自己调度并行执行。
十、分布式排序:MapReduce 的 Shuffle & Sort
当数据量超过单机处理能力时,需要分布式排序。MapReduce 的 shuffle 阶段就是一个大规模的外部排序。
10.1 MapReduce 排序流程
Map 阶段:
每个 Mapper 读取输入切片
→ 执行 map 函数
→ 按 key 的 hash(key) % R 分区
→ 每个分区内排序(内排序)
→ 写入本地磁盘(溢写文件)
→ 本地归并溢写文件
Shuffle 阶段:
每个 Reducer 从所有 Mapper 拉取自己分区的数据
→ 网络传输
→ 本地归并排序
Reduce 阶段:
按有序 key 遍历,执行 reduce 函数
10.2 Map 端的排序
Hadoop MapReduce 的 Map 端排序实现了一个紧凑的外部排序:
- 环形缓冲区:大小由
mapreduce.task.io.sort.mb控制(默认 100MB) - 溢写阈值:缓冲区使用率达到
mapreduce.map.sort.spill.percent(默认 0.8)时触发溢写 - 排序方法:对环形缓冲区中的索引做快排(不移动数据本身)
- 合并:所有溢写文件做最终多路归并
环形缓冲区设计:
+--------------------------------------------------+
| 数据区域 → ← 索引区域 |
| [record][record][record]... ...[idx][idx][idx] |
+--------------------------------------------------+
^ ^
equator(随数据和索引的增长移动)
数据从左往右写,索引从右往左写。当两者相遇时触发溢写。这个设计避免了额外的内存拷贝。
10.3 Reduce 端的归并
Reduce 端需要归并来自 M 个 Mapper 的数据。归并策略:
假设有 M 个 Mapper 输出,归并因子为 f:
如果 M <= f:
一趟归并完成
如果 M > f:
第一趟:归并 M - f*(p-1) 个文件,使剩余文件数恰好为 f 的倍数
后续每趟:归并 f 个文件
最后一趟:归并最后 f 个文件,直接送入 reduce 函数
其中 p = ⌈(M - f) / (f - 1)⌉ + 1 是总趟数
这个策略确保最后一趟的归并路数恰好是 f,最大化最后一趟的效率。
10.4 现代分布式排序
Spark、Flink 等系统在
MapReduce 基础上做了进一步优化:Spark 使用
TimSort 做内排序,支持 Tungsten
二进制排序(直接比较序列化后的字节);Flink
实现了自己的内存管理器,排序在堆外内存中完成,避免 Java GC
开销。
十一、完整的 C 语言实现
以下是一个完整的多路外部归并排序实现,包含败者树、双缓冲和基本错误处理。
/*
* external_sort.c — 多路外部归并排序(败者树实现)
*
* 编译:gcc -O2 -o external_sort external_sort.c -lpthread
* 使用:./external_sort input.bin output.bin
*
* 数据格式:二进制文件,每条记录为一个 int64_t
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
/* ===== 配置参数 ===== */
#define MEMORY_LIMIT (256 * 1024 * 1024) /* 256MB 可用内存 */
#define BLOCK_SIZE (4 * 1024 * 1024) /* 4MB I/O 块大小 */
#define MAX_MERGE_WAYS 512 /* 最大归并路数 */
#define RECORD_SIZE sizeof(int64_t) /* 记录大小 */
/* 每个 run 最多容纳的记录数 */
#define RECORDS_PER_RUN (MEMORY_LIMIT / RECORD_SIZE)
/* 每个 I/O 块容纳的记录数 */
#define RECORDS_PER_BLK (BLOCK_SIZE / RECORD_SIZE)
/* ===== 错误处理宏 ===== */
#define CHECK(cond, msg) do { \
if (!(cond)) { \
fprintf(stderr, "ERROR [%s:%d]: %s (errno=%d: %s)\n", \
__FILE__, __LINE__, msg, errno, strerror(errno)); \
exit(EXIT_FAILURE); \
} \
} while(0)
/* ===== 比较函数 ===== */
static int cmp_int64(const void *a, const void *b) {
int64_t va = *(const int64_t *)a;
int64_t vb = *(const int64_t *)b;
if (va < vb) return -1;
if (va > vb) return 1;
return 0;
}
/* ===== 败者树 ===== */
typedef struct {
int loser[MAX_MERGE_WAYS]; /* 内部节点:败者的路编号 */
int64_t keys[MAX_MERGE_WAYS]; /* 各路当前键值 */
int k; /* 归并路数 */
} LoserTree;
static void lt_init(LoserTree *lt, int k) {
lt->k = k;
for (int i = 0; i < k; i++) {
lt->loser[i] = -1;
lt->keys[i] = INT64_MAX;
}
}
static void lt_adjust(LoserTree *lt, int s) {
int k = lt->k;
int winner = s;
int parent = (k + s) / 2;
while (parent > 0) {
if (lt->loser[parent] == -1) {
lt->loser[parent] = winner;
return;
}
if (lt->keys[winner] > lt->keys[lt->loser[parent]]) {
int tmp = winner;
winner = lt->loser[parent];
lt->loser[parent] = tmp;
}
parent /= 2;
}
lt->loser[0] = winner;
}
static void lt_build(LoserTree *lt, int64_t initial[], int k) {
lt_init(lt, k);
for (int i = k - 1; i >= 0; i--) {
lt->keys[i] = initial[i];
lt_adjust(lt, i);
}
}
static inline int lt_winner(const LoserTree *lt) {
return lt->loser[0];
}
static void lt_update(LoserTree *lt, int s, int64_t new_key) {
lt->keys[s] = new_key;
int k = lt->k;
int winner = s;
int parent = (k + s) / 2;
while (parent > 0) {
if (lt->keys[winner] > lt->keys[lt->loser[parent]]) {
int tmp = winner;
winner = lt->loser[parent];
lt->loser[parent] = tmp;
}
parent /= 2;
}
lt->loser[0] = winner;
}
/* ===== Run 信息 ===== */
typedef struct {
char path[256]; /* 临时文件路径 */
size_t count; /* 该 run 中的记录数 */
} RunInfo;
/* ===== 阶段一:生成初始 runs ===== */
static int generate_runs(const char *input_path,
RunInfo runs[], int *num_runs) {
FILE *fin = fopen(input_path, "rb");
CHECK(fin != NULL, "无法打开输入文件");
/* 获取文件大小 */
fseek(fin, 0, SEEK_END);
long file_size = ftell(fin);
fseek(fin, 0, SEEK_SET);
size_t total_records = (size_t)file_size / RECORD_SIZE;
/* 分配内存缓冲区 */
int64_t *buffer = (int64_t *)malloc(MEMORY_LIMIT);
CHECK(buffer != NULL, "内存分配失败");
int run_count = 0;
size_t records_remaining = total_records;
fprintf(stderr, "总记录数: %zu, 每个 run 记录数: %zu\n",
total_records, (size_t)RECORDS_PER_RUN);
while (records_remaining > 0) {
/* 读取一批记录到内存 */
size_t batch = records_remaining < RECORDS_PER_RUN
? records_remaining : RECORDS_PER_RUN;
size_t nread = fread(buffer, RECORD_SIZE, batch, fin);
CHECK(nread == batch, "读取输入文件失败");
/* 内部排序 */
qsort(buffer, batch, RECORD_SIZE, cmp_int64);
/* 写入临时文件 */
snprintf(runs[run_count].path, sizeof(runs[run_count].path),
"run_%04d.tmp", run_count);
runs[run_count].count = batch;
FILE *frun = fopen(runs[run_count].path, "wb");
CHECK(frun != NULL, "无法创建临时 run 文件");
size_t nwrite = fwrite(buffer, RECORD_SIZE, batch, frun);
CHECK(nwrite == batch, "写入 run 文件失败");
fclose(frun);
fprintf(stderr, " 生成 run %d: %zu 条记录\n",
run_count, batch);
run_count++;
records_remaining -= batch;
}
free(buffer);
fclose(fin);
*num_runs = run_count;
fprintf(stderr, "阶段一完成: 共 %d 个 runs\n", run_count);
return 0;
}
/* ===== Run 读取器 ===== */
typedef struct {
FILE *fp;
int64_t *buf; /* 读取缓冲区 */
size_t buf_capacity; /* 缓冲区容量(记录数) */
size_t buf_count; /* 缓冲区有效记录数 */
size_t buf_pos; /* 当前读取位置 */
size_t remaining; /* 文件中剩余的记录数 */
int exhausted; /* 是否已耗尽 */
} RunReader;
static int run_reader_open(RunReader *rr, const RunInfo *ri,
size_t buf_capacity) {
rr->fp = fopen(ri->path, "rb");
CHECK(rr->fp != NULL, "无法打开 run 文件");
rr->buf = (int64_t *)malloc(buf_capacity * RECORD_SIZE);
CHECK(rr->buf != NULL, "读取缓冲区分配失败");
rr->buf_capacity = buf_capacity;
rr->buf_count = 0;
rr->buf_pos = 0;
rr->remaining = ri->count;
rr->exhausted = 0;
return 0;
}
static int run_reader_refill(RunReader *rr) {
if (rr->remaining == 0) {
rr->exhausted = 1;
return -1;
}
size_t to_read = rr->remaining < rr->buf_capacity
? rr->remaining : rr->buf_capacity;
size_t nread = fread(rr->buf, RECORD_SIZE, to_read, rr->fp);
CHECK(nread == to_read, "读取 run 缓冲区失败");
rr->buf_count = nread;
rr->buf_pos = 0;
rr->remaining -= nread;
return 0;
}
static int run_reader_next(RunReader *rr, int64_t *key) {
if (rr->buf_pos >= rr->buf_count) {
if (run_reader_refill(rr) < 0) return -1;
}
*key = rr->buf[rr->buf_pos++];
return 0;
}
static void run_reader_close(RunReader *rr) {
if (rr->fp) fclose(rr->fp);
free(rr->buf);
rr->fp = NULL;
rr->buf = NULL;
}
/* ===== 阶段二:多路归并 ===== */
static int merge_runs(const char *output_path,
RunInfo runs[], int num_runs) {
if (num_runs == 0) return 0;
/* 如果只有一个 run,直接重命名 */
if (num_runs == 1) {
rename(runs[0].path, output_path);
return 0;
}
int k = num_runs;
if (k > MAX_MERGE_WAYS) {
fprintf(stderr, "WARN: run 数量 %d 超过最大归并路数 %d,"
"需要多趟归并(此实现暂不支持)\n",
k, MAX_MERGE_WAYS);
return -1;
}
/* 为每路分配缓冲区 */
size_t buf_per_way = (MEMORY_LIMIT / RECORD_SIZE) / (k + 1);
if (buf_per_way < 1) buf_per_way = 1;
fprintf(stderr, "阶段二: %d 路归并,每路缓冲 %zu 条记录\n",
k, buf_per_way);
/* 打开所有 run 读取器 */
RunReader *readers = (RunReader *)calloc(k, sizeof(RunReader));
CHECK(readers != NULL, "读取器数组分配失败");
for (int i = 0; i < k; i++) {
run_reader_open(&readers[i], &runs[i], buf_per_way);
}
/* 读取每路的第一条记录,构建败者树 */
int64_t initial_keys[MAX_MERGE_WAYS];
for (int i = 0; i < k; i++) {
if (run_reader_next(&readers[i], &initial_keys[i]) < 0) {
initial_keys[i] = INT64_MAX; /* 空 run 用哨兵值 */
}
}
LoserTree lt;
lt_build(<, initial_keys, k);
/* 打开输出文件 */
FILE *fout = fopen(output_path, "wb");
CHECK(fout != NULL, "无法打开输出文件");
/* 输出缓冲区 */
size_t out_buf_cap = buf_per_way;
int64_t *out_buf = (int64_t *)malloc(out_buf_cap * RECORD_SIZE);
CHECK(out_buf != NULL, "输出缓冲区分配失败");
size_t out_pos = 0;
/* 归并主循环 */
size_t total_merged = 0;
while (1) {
int w = lt_winner(<);
if (lt.keys[w] == INT64_MAX) break; /* 所有路耗尽 */
/* 写入输出缓冲区 */
out_buf[out_pos++] = lt.keys[w];
if (out_pos >= out_buf_cap) {
size_t nw = fwrite(out_buf, RECORD_SIZE, out_pos, fout);
CHECK(nw == out_pos, "写入输出文件失败");
out_pos = 0;
}
/* 从胜出的路读取下一条记录 */
int64_t next_key;
if (run_reader_next(&readers[w], &next_key) < 0) {
next_key = INT64_MAX; /* 该路耗尽 */
}
lt_update(<, w, next_key);
total_merged++;
if (total_merged % 10000000 == 0) {
fprintf(stderr, " 已归并 %zu 条记录\r", total_merged);
}
}
/* 刷出剩余的输出缓冲区 */
if (out_pos > 0) {
size_t nw = fwrite(out_buf, RECORD_SIZE, out_pos, fout);
CHECK(nw == out_pos, "刷出输出缓冲区失败");
}
fprintf(stderr, "\n阶段二完成: 共归并 %zu 条记录\n", total_merged);
/* 清理 */
free(out_buf);
fclose(fout);
for (int i = 0; i < k; i++) {
run_reader_close(&readers[i]);
remove(runs[i].path); /* 删除临时文件 */
}
free(readers);
return 0;
}
/* ===== 验证排序结果 ===== */
static int verify_sorted(const char *path) {
FILE *f = fopen(path, "rb");
CHECK(f != NULL, "无法打开文件进行验证");
int64_t prev = INT64_MIN;
int64_t cur;
size_t count = 0;
while (fread(&cur, RECORD_SIZE, 1, f) == 1) {
if (cur < prev) {
fprintf(stderr, "验证失败: 位置 %zu, %ld > %ld\n",
count, (long)prev, (long)cur);
fclose(f);
return -1;
}
prev = cur;
count++;
}
fclose(f);
fprintf(stderr, "验证通过: %zu 条记录有序\n", count);
return 0;
}
/* ===== 主函数 ===== */
int main(int argc, char *argv[]) {
if (argc != 3) {
fprintf(stderr, "用法: %s <输入文件> <输出文件>\n", argv[0]);
return EXIT_FAILURE;
}
const char *input_path = argv[1];
const char *output_path = argv[2];
RunInfo runs[MAX_MERGE_WAYS];
int num_runs = 0;
fprintf(stderr, "==== 外部排序开始 ====\n");
fprintf(stderr, "内存限制: %d MB, 块大小: %d MB\n",
MEMORY_LIMIT / (1024*1024), BLOCK_SIZE / (1024*1024));
/* 阶段一 */
int rc = generate_runs(input_path, runs, &num_runs);
CHECK(rc == 0, "生成初始 runs 失败");
/* 阶段二 */
rc = merge_runs(output_path, runs, num_runs);
CHECK(rc == 0, "归并排序失败");
/* 验证 */
rc = verify_sorted(output_path);
CHECK(rc == 0, "排序结果验证失败");
fprintf(stderr, "==== 外部排序完成 ====\n");
return EXIT_SUCCESS;
}代码结构说明
整个实现分为四个模块:
- 败者树(
lt_*函数):提供 O(log k) 的多路归并核心操作 - Run
生成(
generate_runs):将输入切分为内存大小的有序 run - Run
读取器(
RunReader):带缓冲的 run 顺序读取 - 归并主循环(
merge_runs):用败者树驱动的多路归并
这个实现为了清晰性省略了一些生产级特性(多趟归并、异步 I/O、多线程),但核心算法是完整的。
十二、工程陷阱与最佳实践
外部排序的工程实现中有许多容易踩到的坑,以下是多年经验的总结:
| 陷阱 | 现象 | 解决方案 |
|---|---|---|
| 临时文件未清理 | 异常退出后磁盘空间被大量临时文件占满 | 注册 atexit()
或信号处理函数,确保清理;使用 O_TMPFILE(Linux
3.11+)创建无名临时文件 |
| 归并路数过大导致内存不足 | 每路分配的缓冲区太小,频繁触发 I/O | 动态计算最优路数:k = min(num_runs, (M - output_buf) / (2 * B)),预留双缓冲和输出缓冲区的空间 |
| 文件描述符耗尽 | 大量 run 同时打开,超过 ulimit -n |
用 setrlimit()
提高限制,或实现”虚拟文件描述符”池(打开/关闭按需切换) |
| 输出缓冲区太小 | 频繁的小块写入导致磁盘吞吐量骤降 | 输出缓冲区至少 1MB,最好 4-16MB |
| 跨平台 I/O 行为差异 | Windows 的 fwrite
在某些配置下不是原子的 |
使用平台特定的 API(WriteFile /
pwrite),或加锁保护 |
| 排序键包含指针 | 排序后指针失效,反序列化崩溃 | 排序时只携带偏移量或扁平化的键,归并完成后再回填指针 |
| 未考虑磁盘空间 | 临时文件需要的空间约等于原始数据大小 | 排序前检查可用空间(statvfs()),至少需要
1x 原始数据量的额外空间 |
| 变长记录的对齐问题 | 读取未对齐的数据导致性能下降或 bus error | 按对齐边界填充记录,或使用 memcpy
代替直接指针转换 |
| 大文件偏移溢出 | 32 位 off_t 在 2GB 处溢出 |
确保编译时定义 _FILE_OFFSET_BITS=64,使用
off64_t |
| SSD 的写放大 | 反复写入临时文件加速 SSD 磨损 | 尽量减少归并趟数,使用 RAM disk 存放临时文件(如果内存足够),或使用 direct I/O 绕过页缓存减少不必要的回写 |
| 相同键的稳定性 | 外部排序默认不保证稳定性 | 在排序键后附加序号(原始位置),比较时作为 tie-breaker |
| 归并时的热点竞争 | 多线程归并时,败者树成为竞争点 | 每个线程独立归并一部分 run,最后再归并各线程的结果 |
十三、个人思考
写到这里,回顾外部排序这个话题,有几点感想。
第一,I/O
模型仍然是理解存储系统设计的基石。 无论是数据库的
B-tree、LSM-tree,还是大数据系统的 shuffle,底层都是在优化
I/O 次数和模式。理解 O(N/B · log_{M/B}(N/B))
这个公式,就能理解为什么这些系统做出了各自的设计选择。
第二,败者树是一个被低估的数据结构。 教科书通常只花一两页讲败者树,然后就跳到下一个话题。但在外部排序、定时器管理、调度器等场景中,败者树的性能优势是实实在在的。如果你在实现任何形式的 k 路归并,首选败者树,不要用最小堆。
第三,硬件在变,算法也在变。 HDD 时代的”一切为了顺序访问”已经不完全适用于 SSD。但这不意味着外部排序过时了——即使 SSD 很快,内存和磁盘之间的速度差距仍然是数量级的。外部排序的核心思想——“感知 I/O 层级、减少数据移动”——在可预见的未来仍然是有效的。
第四,简单的方案往往是最好的。 替换选择听起来很酷(平均 2M 的 run!),但现代系统越来越倾向于用简单的内部排序替代它。原因很实际:实现更简单、调试更容易、缓存更友好,而且多出来的一趟归并在 SSD 上代价很小。工程中,可维护性和可靠性永远优先于理论上的最优。
第五,外部排序是分布式系统的缩影。 MapReduce 的 shuffle 就是多机版的外部排序。理解了单机外部排序中的缓冲区管理、流水线和负载均衡,就能更好地理解分布式系统中类似问题的设计。从这个角度看,外部排序不仅是一个算法题目,更是一种系统思维的训练。
参考文献
- Aggarwal, A. and Vitter, J.S. “The Input/Output Complexity of Sorting and Related Problems.” Communications of the ACM, 31(9):1116-1127, 1988.
- Knuth, D.E. The Art of Computer Programming, Volume 3: Sorting and Searching, 2nd Edition. Addison-Wesley, 1998. Section 5.4: External Sorting.
- Graefe, G. “Implementing Sorting in Database Systems.” ACM Computing Surveys, 38(3), 2006.
- SQLite 源码:
vdbesort.c,https://sqlite.org/src/file/src/vdbesort.c - O’Neil, P. et al. “The Log-Structured Merge-Tree (LSM-tree).” Acta Informatica, 33(4):351-385, 1996.
- Dean, J. and Ghemawat, S. “MapReduce: Simplified Data Processing on Large Clusters.” OSDI, 2004.
- RocksDB Wiki: Compaction, https://github.com/facebook/rocksdb/wiki/Compaction
- Rumble, S. et al. “Log-structured Memory for DRAM-based Storage.” FAST, 2014.
- Intel. “Optimizing Performance with Intel Optane Technology.” Technical Report, 2019.