土法炼钢兴趣小组的算法知识备份

外部排序:当数据装不进内存

目录

当你试图对 100GB 的日志文件做排序,而机器只有 4GB 内存时,快排帮不了你。这不是算法能力的问题,而是物理约束:数据根本放不进内存。

外部排序(External Sorting)是数据库、搜索引擎和大数据系统的基石之一。它的核心思想并不复杂——分而治之加多路归并——但工程实现中隐藏着大量细节:如何生成更长的初始 run?败者树为什么比最小堆快?双缓冲怎么和异步 I/O 配合?这些问题决定了系统在真实负载下的表现。

本文从理论模型出发,逐步深入到完整的 C 语言实现,再到 SQLite、LevelDB、MapReduce 等真实系统的设计选择。

多路归并数据流动图

一、外部存储模型与 I/O 复杂度

1.1 为什么需要新的计算模型

传统的 RAM 模型假设任意内存访问代价相同,时间复杂度只计算比较和移动次数。但在外部排序的场景下,这个假设完全失效:一次磁盘 I/O 的延迟是内存访问的 10 万倍以上。算法的瓶颈不再是 CPU 计算,而是磁盘读写次数。

Aggarwal 和 Vitter 在 1988 年提出了外部存储模型(External Memory Model,也称 I/O Model 或 DAM Model),用三个参数刻画问题规模:

在这个模型中,我们只计算 I/O 次数,CPU 计算视为免费。

1.2 排序的 I/O 下界

外部排序的 I/O 复杂度下界是:

I/O 复杂度 = O( (N/B) · log_{M/B}(N/B) )

理解这个公式的关键在于:

举个具体的例子:

N = 100GB 数据(约 10^10 条小记录)
M = 4GB 内存
B = 4KB 磁盘块(约 100 条记录)

磁盘块数 = N/B = 25 × 10^6
归并路数 = M/B = 10^6
归并趟数 = log_{10^6}(25 × 10^6) ≈ 1.23,即 2 趟

总 I/O ≈ 2 × 2 × (N/B) = 10^8 次块传输

注意这个结果:即使是 100GB 的数据,在 4GB 内存下通常只需要 2 趟归并就能完成排序。这是因为 M/B 通常很大,使得对数的底数很大。

1.3 与内部排序的对比

指标 内部排序(快排) 外部排序(多路归并)
度量 比较次数 I/O 次数
复杂度 O(N log N) O((N/B) log_{M/B}(N/B))
瓶颈 CPU 磁盘带宽
常数因子关键 缓存命中率 顺序 vs 随机 I/O
优化方向 减少分支预测失败 增大归并路数、异步 I/O

二、基本多路归并算法

外部排序的经典算法分为两个阶段:

2.1 阶段一:生成初始有序 Run

  1. 从磁盘读入 M 大小的数据块到内存
  2. 用内部排序算法(快排、堆排等)对内存中的数据排序
  3. 将排好序的数据写回磁盘,形成一个”run”
  4. 重复直到所有数据都被处理

如果数据总量为 N,内存为 M,那么初始 run 的数量为 ⌈N/M⌉

2.2 阶段二:多路归并

将所有 run 同时打开,每个 run 分配一个输入缓冲区,再分配一个输出缓冲区:

  1. 从每个 run 的缓冲区中取出当前最小元素
  2. 用优先队列(败者树或最小堆)找到全局最小值
  3. 将最小值写入输出缓冲区
  4. 从该最小值所属的 run 中补充下一个元素
  5. 输出缓冲区满时,写回磁盘
  6. 重复直到所有 run 耗尽

2.3 归并趟数的计算

如果有 R 个初始 run,每趟做 k 路归并,则需要 ⌈log_k(R)⌉ 趟。k 的最大值受限于内存:每路需要一个缓冲区(大小 B),加上一个输出缓冲区,所以 k = M/B - 1

k >= R 时,一趟就能完成归并。在实际系统中,这是常见的情况。

示例:
- 数据量 100GB,内存 4GB,块大小 4KB
- 初始 run 数量 = 100GB / 4GB = 25
- 归并路数 k = 4GB / 4KB - 1 ≈ 10^6
- 因为 k >> R,一趟归并即可完成

三、初始 Run 生成策略

初始 run 的质量(长度和数量)直接影响后续归并的效率。run 越长,数量越少,归并趟数越少。

3.1 简单切分:内部排序法

最直接的方法:每次读入 M 条记录,内排序后写出。

选择内部排序算法时需要注意:快排的平均性能最好,但不稳定(相同键的记录可能打乱顺序)。如果需要稳定排序,用归并排序。

3.2 替换选择(Replacement Selection)

替换选择是 Knuth 在《The Art of Computer Programming》中详细讨论的经典方法。它能生成平均长度为 2M 的初始 run——是简单方法的两倍。

算法过程:

1. 用前 M 条记录填满堆(优先队列)
2. 重复以下步骤:
   a. 从堆中取出最小元素 x,写入当前 run
   b. 从输入读取下一条记录 y
   c. 如果 y >= x(不破坏当前 run 的有序性):
      将 y 放入堆中
   d. 否则:
      标记 y 为"下一个 run"的元素
      将 y 放入堆的"冻结区"
   e. 如果堆中没有非冻结元素:
      开始新的 run
      解冻所有冻结元素

为什么平均 run 长度是 2M?

直觉上可以用”雪花融化”的类比理解:堆像一个蓄水池,数据像不断落入的雪花。当输入数据是随机分布时,大约有一半的新元素能直接融入当前 run(大于刚输出的值),这使得 run 在被”消耗”的同时又不断被”补充”,最终长度约为容量的两倍。严格证明需要用到随机过程理论。

3.3 替换选择的代价与权衡

替换选择并非没有代价:

方面 内部排序法 替换选择
run 平均长度 M 2M
run 数量 N/M N/(2M)
CPU 开销 O(M log M)(快排) O(N log M)(堆操作)
实现复杂度
缓存友好性 好(数组连续访问) 差(堆的随机跳跃)

在现代硬件上,替换选择的缓存不友好性是个严重问题。当 M 很大时(比如数 GB),堆操作会导致大量缓存未命中。近年来一些系统(如 MariaDB 10.x)已经放弃了替换选择,改用简单的内部排序加更大的缓冲区。

我的看法是:如果你的磁盘很慢(HDD)、内存不大(几百 MB),替换选择值得考虑,因为减少趟数的收益能抵消 CPU 损失。如果磁盘是 SSD 且内存充足,直接用内部排序法更简单可靠。

3.4 对近乎有序数据的加速

替换选择有一个常被忽视的优势:如果输入数据已经近乎有序(比如时间序列日志),run 长度可以远超 2M,甚至趋近于 N。在极端情况下(输入完全有序),只会生成一个 run,根本不需要归并。

这是一个在实际系统中很有用的性质,因为真实数据常常带有局部有序性。

四、败者树与最小堆

多路归并的核心数据结构是优先队列:从 k 个有序序列中反复取出全局最小值。最小堆和败者树都能完成这个任务,但败者树在这个场景下更优。

4.1 最小堆的问题

最小堆做 k 路归并时,每次取出最小值后需要:

  1. 将新元素放到堆顶
  2. 下沉(sift-down)恢复堆性质

下沉操作每层需要 2 次比较(先比较两个子节点,再与父节点比较),共 2 · ⌈log_2 k⌉ 次比较。

更关键的是,每次比较都涉及不同的元素对。在 CPU 层面,这导致分支预测频繁失败,因为比较结果的不确定性很高。

4.2 败者树的优势

败者树(Loser Tree)是锦标赛树(Tournament Tree)的变体,最初由 Knuth 描述。它的核心思想是:

当某路的元素被取走后,只需从该叶子到根的路径上做 每层 1 次比较,总共 ⌈log_2 k⌉ 次——是最小堆的一半。

原因在于:沿着路径上溯时,每个节点只需要与该节点记录的败者比较。如果新元素比败者小,新元素继续上升(赢了),败者留在原位。如果新元素比败者大,新元素留下(成为新败者),原败者上升。每个节点只需一次比较就能确定结果。

4.3 败者树的结构

        [总冠军: 最小值]
              |
          [败者 L1]          <- 内部节点存败者的来源索引
         /        \
     [败者 L2]  [败者 L3]
      /    \      /    \
    叶0   叶1   叶2   叶3    <- 叶子存各路当前值

败者树的内部节点不存值,只存”哪一路输了”的索引。这使得更新操作非常紧凑。

4.4 败者树的 C 语言实现

#include <limits.h>
#include <string.h>

#define MAX_WAYS 1024

typedef int64_t sort_key_t;

typedef struct {
    int loser[MAX_WAYS];       /* 内部节点:败者来源的路编号 */
    sort_key_t keys[MAX_WAYS]; /* 叶子节点:各路当前键值 */
    int k;                     /* 归并路数 */
} LoserTree;

/* 初始化败者树 */
void loser_tree_init(LoserTree *lt, int k) {
    lt->k = k;
    /* 所有内部节点初始指向一个虚拟的"负无穷"选手 */
    for (int i = 0; i < k; i++) {
        lt->loser[i] = -1;
    }
    /* 所有叶子初始为正无穷(哨兵) */
    for (int i = 0; i < k; i++) {
        lt->keys[i] = INT64_MAX;
    }
}

/* 从叶子 idx 到根的路径上调整败者 */
void loser_tree_adjust(LoserTree *lt, int idx) {
    int k = lt->k;
    int winner = idx;
    /* 从叶子对应的内部节点开始,向根上溯 */
    int parent = (k + idx) / 2;

    while (parent > 0) {
        if (lt->loser[parent] == -1) {
            /* 首次填充:直接设为败者,winner 上升 */
            lt->loser[parent] = winner;
            /* winner 还没有对手,由初始化保证 */
            return;
        }
        int loser = lt->loser[parent];
        if (lt->keys[winner] > lt->keys[loser]) {
            /* winner 输了,留下来做败者,原败者成为新 winner */
            lt->loser[parent] = winner;
            winner = loser;
        }
        /* 否则 winner 赢了,继续上升 */
        parent /= 2;
    }
    /* 到达根:winner 就是全局最小值 */
    lt->loser[0] = winner;
}

/* 构建败者树:逐一插入各路的首元素 */
void loser_tree_build(LoserTree *lt, sort_key_t initial_keys[], int k) {
    lt->k = k;
    for (int i = 0; i < k; i++) {
        lt->loser[i] = -1;
    }
    for (int i = 0; i < k; i++) {
        lt->keys[i] = initial_keys[i];
        loser_tree_adjust(lt, i);
    }
}

/* 获取当前最小值所在的路编号 */
static inline int loser_tree_winner(const LoserTree *lt) {
    return lt->loser[0];
}

/* 弹出最小值,用该路的下一个元素替换,重新调整 */
void loser_tree_replace(LoserTree *lt, int idx, sort_key_t new_key) {
    lt->keys[idx] = new_key;
    /* 从 idx 叶子开始,沿路径上溯做调整 */
    int k = lt->k;
    int winner = idx;
    int parent = (k + idx) / 2;

    while (parent > 0) {
        int loser = lt->loser[parent];
        if (lt->keys[winner] > lt->keys[loser]) {
            lt->loser[parent] = winner;
            winner = loser;
        }
        parent /= 2;
    }
    lt->loser[0] = winner;
}

4.5 性能对比

在 k=256 路归并的基准测试中(数据来自实际项目经验):

度量 最小堆 败者树 差距
比较次数/元素 16 8 2x
分支误预测率 ~40% ~30% 显著
吞吐量 80M elem/s 120M elem/s 1.5x
L1 缓存命中率 92% 95% -

败者树的 1.5 倍吞吐量优势主要来自比较次数减半和更好的分支预测行为。当 k 更大时,优势更明显。

五、缓冲区管理与异步 I/O

外部排序的性能瓶颈在 I/O。聪明的缓冲区管理策略能让 CPU 和磁盘并行工作,从而接近磁盘带宽的理论上限。

5.1 单缓冲的问题

最简单的实现:每个 run 分配一个缓冲区,用完了就同步读取下一块。问题是显而易见的——CPU 在等待磁盘时完全空闲,磁盘在 CPU 处理数据时也空闲。

时间轴(单缓冲):
CPU:  [处理][等待][处理][等待][处理]
磁盘: [等待][读取][等待][读取][等待]

5.2 双缓冲(Double Buffering)

每个 run 分配两个缓冲区(A 和 B)。当 CPU 消费缓冲区 A 中的数据时,磁盘异步将下一块数据预读到缓冲区 B。A 用完后切换到 B,同时开始异步读取下一块到 A。

时间轴(双缓冲):
CPU:  [处理A][处理B][处理A][处理B]
磁盘: [读取B][读取A][读取B][读取A]
      ^重叠^  ^重叠^  ^重叠^

代价是内存使用翻倍:k 路归并需要 2k 个输入缓冲区加 2 个输出缓冲区。这意味着最大归并路数减半。但在绝大多数情况下,这是值得的,因为减少的路数对归并趟数的影响微乎其微(对数函数增长极慢)。

5.3 输出缓冲区的双缓冲

输出侧同样需要双缓冲。当一个输出缓冲区满时,异步写入磁盘,同时切换到另一个继续填充。这避免了输出阻塞归并过程。

typedef struct {
    char *buf[2];       /* 两个输出缓冲区 */
    int active;         /* 当前正在填充的缓冲区 (0 或 1) */
    size_t pos;         /* 当前缓冲区的写入位置 */
    size_t buf_size;    /* 缓冲区大小 */
    int write_fd;       /* 输出文件描述符 */
    /* 异步写状态 */
    struct aiocb aio;   /* POSIX AIO 控制块 */
    int write_pending;  /* 是否有异步写操作进行中 */
} OutputBuffer;

/* 写入一条记录到输出缓冲区 */
int output_buffer_write(OutputBuffer *ob, const void *record, size_t len) {
    if (ob->pos + len > ob->buf_size) {
        /* 当前缓冲区满,等待上一次异步写完成 */
        if (ob->write_pending) {
            while (aio_error(&ob->aio) == EINPROGRESS) {
                /* 自旋等待或 yield */
            }
            ob->write_pending = 0;
        }
        /* 启动异步写 */
        memset(&ob->aio, 0, sizeof(struct aiocb));
        ob->aio.aio_fildes = ob->write_fd;
        ob->aio.aio_buf = ob->buf[ob->active];
        ob->aio.aio_nbytes = ob->pos;
        aio_write(&ob->aio);
        ob->write_pending = 1;

        /* 切换缓冲区 */
        ob->active ^= 1;
        ob->pos = 0;
    }
    memcpy(ob->buf[ob->active] + ob->pos, record, len);
    ob->pos += len;
    return 0;
}

5.4 预读策略

更进一步的优化是预测性预读(prefetch)。当某一路的缓冲区消费到一半时,就提前触发异步读取该路的下一块。核心思路是跟踪每路的消费进度,在缓冲区耗尽前启动下一次 I/O。实现要点:用一个 read_pending 标志避免重复提交,在消费到 50% 时就触发预读,缓冲区耗尽时交换读/消费角色后立即启动下一轮。

5.5 Linux 下的 I/O 选择

实际系统中,POSIX AIO 的实现质量参差不齐。Linux 上更常见的选择是:

  1. io_uring(Linux 5.1+):最现代的异步 I/O 接口,通过共享内存环形缓冲区提交和完成 I/O 请求,系统调用次数极少
  2. libaio(Linux 原生 AIO):内核级异步 I/O,要求 O_DIRECT,绕过页缓存
  3. 线程池 + pread:最便携的方案,用专门的 I/O 线程做阻塞读写
  4. mmap + madvise(MADV_SEQUENTIAL):利用内核的预读机制,代码最简单

对于外部排序这种顺序访问模式,mmap + madvise 通常就够了。内核的预读机制会自动检测顺序访问模式并提前加载数据。

六、磁盘顺序读写 vs 随机读写

外部排序之所以强调”多路归并”而不是”外部快排”,根本原因是磁盘的顺序读写性能远远超过随机读写。

6.1 性能差异的物理根源

机械硬盘(HDD)

固态硬盘(SSD)

6.2 实测数据对比

以下是在典型硬件上的 fio 基准测试结果(4KB 块大小,队列深度 32):

设备类型          | 顺序读    | 随机读    | 顺序写    | 随机写    | 顺序/随机(读)
-----------------+-----------+-----------+-----------+-----------+-------------
7200RPM HDD      | 180 MB/s  | 1.5 MB/s  | 170 MB/s  | 1.2 MB/s  | 120x
SATA SSD         | 550 MB/s  | 350 MB/s  | 520 MB/s  | 180 MB/s  | 1.6x
NVMe SSD         | 3500 MB/s | 800 MB/s  | 3000 MB/s | 400 MB/s  | 4.4x
Intel Optane     | 2500 MB/s | 2200 MB/s | 2200 MB/s | 2000 MB/s | 1.1x

关键观察:

6.3 对外部排序设计的影响

这些数据直接影响外部排序的设计选择:

七、SQLite 的外部排序实现

SQLite 是最广泛部署的数据库引擎,其外部排序实现值得研究。它的代码在 vdbesort.c 中,约 3000 行。

7.1 整体架构

SQLite 的排序器(sorter)有两种模式:

  1. 内存模式:数据量小于配置阈值时,使用内存中的 B-tree
  2. 外部排序模式:数据量超过阈值时,切换到多路归并

切换阈值由 SQLITE_CONFIG_PMASZ(PMA Size)控制,默认 250 个页面(约 1MB)。

7.2 PMA(Pre-sorted Merge Area)

SQLite 将初始 run 称为 PMA。生成过程:

1. 记录先被插入一个内存中的 B-tree(红黑树变体)
2. 当内存 B-tree 达到阈值大小:
   a. 按序遍历 B-tree,将有序记录写入临时文件
   b. 清空 B-tree,开始积累下一个 PMA
3. 写出时使用变长整数编码记录长度,紧凑存储

注意 SQLite 没有使用替换选择——它选择了简单的”填满内存 B-tree 再排出”策略。这在小型嵌入式场景下是合理的选择。

7.3 多线程 PMA 生成

从 SQLite 3.7.13 开始,引入了多线程排序支持。工作方式:

/* 伪代码:SQLite 的多线程 PMA 生成 */
/* 主线程 */
while (has_more_records()) {
    btree_insert(pSorter->pBtree, record);
    if (btree_size(pSorter->pBtree) >= threshold) {
        /* 将当前 B-tree 交给后台线程写出 */
        hand_off_to_bg_thread(pSorter->pBtree);
        /* 主线程立即开始新的 B-tree */
        pSorter->pBtree = btree_new();
    }
}
/* 后台线程 */
void bg_thread_func(BTree *bt) {
    traverse_and_write_pma(bt);
    btree_free(bt);
}

这样主线程和写出线程可以并行工作:一个在排序,一个在写磁盘。

7.4 归并阶段

SQLite 的归并使用了标准的多路归并,但有几个工程细节值得注意:

  1. 增量归并:不是一次性归并完所有 PMA,而是每次 sqlite3_step() 调用时只归并并返回一条记录。这避免了排序操作阻塞整个查询
  2. 记录压缩:相邻记录如果有公共前缀,只存储差异部分
  3. 文件管理:多个 PMA 可以存在同一个临时文件中,用偏移量区分,减少文件描述符消耗

7.5 关键源码片段分析

以下是 vdbesort.c 中归并的核心逻辑(简化后):

/* 从 PMA 读取器中取下一条记录 */
static int vdbePmaReaderNext(PmaReader *pReader) {
    int rc = SQLITE_OK;
    if (pReader->iReadOff >= pReader->iEof) {
        /* 当前 PMA 已读完 */
        pReader->pKey = 0;
        return SQLITE_OK;
    }
    /* 读取记录长度(变长整数) */
    rc = vdbePmaReadVarint(pReader, &nRec);
    /* 读取记录内容 */
    rc = vdbePmaReadBlob(pReader, (int)nRec, &pReader->pKey);
    pReader->nKey = (int)nRec;
    return rc;
}

/* 多路归并的一步:找到当前最小的 PMA 并推进 */
static int vdbeSorterNext(VdbeSorter *pSorter) {
    int rc;
    int iPrev = pSorter->aTree[1]; /* 败者树的总冠军 */
    /* 推进该路的 PMA 读取器 */
    rc = vdbePmaReaderNext(&pSorter->aReader[iPrev]);
    if (rc != SQLITE_OK) return rc;
    /* 重新调整败者树 */
    rc = vdbeSorterDoCompare(pSorter, iPrev);
    return rc;
}

SQLite 确实使用了败者树(称为 aTree)进行多路归并,这验证了败者树在实际系统中的价值。

八、LevelDB/RocksDB 中的排序与 SSTable 构建

LSM-tree 存储引擎的核心操作——compaction——本质上就是一个多路归并排序过程。

8.1 SSTable 的结构

SSTable(Sorted String Table)是一个有序的键值对文件,结构如下:

+-------------------+
| Data Block 0      |  <- 有序的 KV 对,前缀压缩
| Data Block 1      |
| ...               |
| Data Block N      |
+-------------------+
| Meta Block        |  <- 布隆过滤器等
+-------------------+
| Meta Index Block  |  <- Meta Block 的索引
+-------------------+
| Index Block       |  <- Data Block 的索引(每块最大 key)
+-------------------+
| Footer            |  <- 各种偏移量
+-------------------+

8.2 Memtable 刷盘:Minor Compaction

当内存中的 Memtable(一个跳表或 B-tree)达到大小阈值时,需要刷盘生成 SSTable:

Memtable(有序)→ 顺序扫描 → 写 SSTable 文件

这个过程本质上就是外部排序”阶段一”的变体:内存中的有序数据写出为磁盘上的有序 run(SSTable)。

8.3 Compaction:多路归并

Major Compaction 将多个 SSTable 合并为一个更大的 SSTable:

/* RocksDB compaction 的核心归并循环(简化) */
void DoCompaction(CompactionState *state) {
    MergingIterator *merge_iter = NewMergingIterator(
        state->input_sstables,
        state->num_inputs
    );

    for (merge_iter->SeekToFirst();
         merge_iter->Valid();
         merge_iter->Next()) {
        Slice key = merge_iter->key();
        Slice value = merge_iter->value();

        /* 处理删除标记(tombstone)、版本控制等 */
        if (should_drop(key, value)) continue;

        /* 写入输出 SSTable */
        state->builder->Add(key, value);

        /* 如果输出 SSTable 达到目标大小,切分 */
        if (state->builder->FileSize() >= target_file_size) {
            FinishOutputFile(state);
            StartNewOutputFile(state);
        }
    }
}

8.4 RocksDB 的特殊优化

RocksDB 在 LevelDB 的基础上增加了多项排序相关的优化:

  1. 子压缩(Subcompaction):将一次 compaction 的键空间范围分成多个子范围,每个子范围由独立线程处理,实现并行归并
  2. 直接 I/O:绕过操作系统页缓存,避免写入大量 SSTable 时”污染”缓存
  3. Rate Limiter:限制 compaction 的 I/O 带宽,避免影响前台读写请求
compaction 线程数 = max_subcompactions(默认 1)
I/O 限制 = rate_bytes_per_sec(默认不限制)

8.5 与经典外部排序的异同

方面 经典外部排序 LSM-tree Compaction
目标 一次性全量排序 增量维护有序性
输入 一个无序文件 多个有序 SSTable
输出 一个有序文件 一个或多个有序 SSTable
触发 一次性 条件触发(大小/数量)
版本控制 需要处理多版本和删除标记
并发性 不关注 必须与前台读写并发

九、SSD 时代的外部排序演进

SSD 的普及改变了外部排序的性能特征。传统的”一切为了顺序 I/O”的设计原则需要重新审视。

9.1 SSD 带来的变化

  1. 随机读延迟降低 1000 倍:HDD 的 8-10ms → SSD 的 10-100μs
  2. I/O 并行度提升:NVMe SSD 支持 64K 队列深度,而 HDD 几乎是串行的
  3. 写放大成为新瓶颈:SSD 的擦写次数有限,过多的写操作影响寿命

9.2 策略演进

更大的归并路数

在 HDD 上,k 路归并的 k 个输入流会导致磁头频繁寻道。在 SSD 上这不再是问题,可以大胆使用 k=数百甚至数千的路数,减少归并趟数。

允许少量随机读

一些系统开始探索”外部分区排序”(External Distribution Sort),类似外部快排:

1. 选取 p 个枢轴值,将数据分配到 p+1 个桶(分区)
2. 每个桶独立排序
3. 因为桶已经按范围划分,所以无需归并

这种方法在 HDD 上因为随机写而不可行,但在 SSD 上变得可以接受。

减少写放大

SSD 上需要关注总写入量。一些优化策略:

9.3 NVMe 多队列优化

现代 NVMe SSD 有多个硬件队列。外部排序可以利用这个特性:

/* 多队列 I/O 提交(概念代码) */
void submit_io_requests(InputBuffer bufs[], int k) {
    for (int i = 0; i < k; i++) {
        if (bufs[i].needs_refill) {
            /* 每个 run 的 I/O 提交到不同的队列 */
            /* io_uring 会自动利用多队列 */
            struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
            io_uring_prep_readv(sqe, bufs[i].fd,
                                &bufs[i].iov, 1,
                                bufs[i].offset);
            sqe->user_data = i;
        }
    }
    io_uring_submit(&ring);
}

关键是:所有 I/O 请求一次性提交,让 NVMe 控制器自己调度并行执行。

十、分布式排序:MapReduce 的 Shuffle & Sort

当数据量超过单机处理能力时,需要分布式排序。MapReduce 的 shuffle 阶段就是一个大规模的外部排序。

10.1 MapReduce 排序流程

Map 阶段:
  每个 Mapper 读取输入切片
  → 执行 map 函数
  → 按 key 的 hash(key) % R 分区
  → 每个分区内排序(内排序)
  → 写入本地磁盘(溢写文件)
  → 本地归并溢写文件

Shuffle 阶段:
  每个 Reducer 从所有 Mapper 拉取自己分区的数据
  → 网络传输
  → 本地归并排序

Reduce 阶段:
  按有序 key 遍历,执行 reduce 函数

10.2 Map 端的排序

Hadoop MapReduce 的 Map 端排序实现了一个紧凑的外部排序:

  1. 环形缓冲区:大小由 mapreduce.task.io.sort.mb 控制(默认 100MB)
  2. 溢写阈值:缓冲区使用率达到 mapreduce.map.sort.spill.percent(默认 0.8)时触发溢写
  3. 排序方法:对环形缓冲区中的索引做快排(不移动数据本身)
  4. 合并:所有溢写文件做最终多路归并
环形缓冲区设计:
+--------------------------------------------------+
|  数据区域 →                      ← 索引区域      |
|  [record][record][record]...  ...[idx][idx][idx]  |
+--------------------------------------------------+
^                                                   ^
equator(随数据和索引的增长移动)                    

数据从左往右写,索引从右往左写。当两者相遇时触发溢写。这个设计避免了额外的内存拷贝。

10.3 Reduce 端的归并

Reduce 端需要归并来自 M 个 Mapper 的数据。归并策略:

假设有 M 个 Mapper 输出,归并因子为 f:

如果 M <= f:
    一趟归并完成

如果 M > f:
    第一趟:归并 M - f*(p-1) 个文件,使剩余文件数恰好为 f 的倍数
    后续每趟:归并 f 个文件
    最后一趟:归并最后 f 个文件,直接送入 reduce 函数

其中 p = ⌈(M - f) / (f - 1)⌉ + 1 是总趟数

这个策略确保最后一趟的归并路数恰好是 f,最大化最后一趟的效率。

10.4 现代分布式排序

SparkFlink 等系统在 MapReduce 基础上做了进一步优化:Spark 使用 TimSort 做内排序,支持 Tungsten 二进制排序(直接比较序列化后的字节);Flink 实现了自己的内存管理器,排序在堆外内存中完成,避免 Java GC 开销。

十一、完整的 C 语言实现

以下是一个完整的多路外部归并排序实现,包含败者树、双缓冲和基本错误处理。

/*
 * external_sort.c — 多路外部归并排序(败者树实现)
 *
 * 编译:gcc -O2 -o external_sort external_sort.c -lpthread
 * 使用:./external_sort input.bin output.bin
 *
 * 数据格式:二进制文件,每条记录为一个 int64_t
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>

/* ===== 配置参数 ===== */
#define MEMORY_LIMIT    (256 * 1024 * 1024)   /* 256MB 可用内存 */
#define BLOCK_SIZE      (4 * 1024 * 1024)     /* 4MB I/O 块大小 */
#define MAX_MERGE_WAYS  512                    /* 最大归并路数 */
#define RECORD_SIZE     sizeof(int64_t)        /* 记录大小 */

/* 每个 run 最多容纳的记录数 */
#define RECORDS_PER_RUN (MEMORY_LIMIT / RECORD_SIZE)

/* 每个 I/O 块容纳的记录数 */
#define RECORDS_PER_BLK (BLOCK_SIZE / RECORD_SIZE)

/* ===== 错误处理宏 ===== */
#define CHECK(cond, msg) do { \
    if (!(cond)) { \
        fprintf(stderr, "ERROR [%s:%d]: %s (errno=%d: %s)\n", \
                __FILE__, __LINE__, msg, errno, strerror(errno)); \
        exit(EXIT_FAILURE); \
    } \
} while(0)

/* ===== 比较函数 ===== */
static int cmp_int64(const void *a, const void *b) {
    int64_t va = *(const int64_t *)a;
    int64_t vb = *(const int64_t *)b;
    if (va < vb) return -1;
    if (va > vb) return  1;
    return 0;
}

/* ===== 败者树 ===== */
typedef struct {
    int loser[MAX_MERGE_WAYS]; /* 内部节点:败者的路编号 */
    int64_t keys[MAX_MERGE_WAYS]; /* 各路当前键值 */
    int k;                     /* 归并路数 */
} LoserTree;

static void lt_init(LoserTree *lt, int k) {
    lt->k = k;
    for (int i = 0; i < k; i++) {
        lt->loser[i] = -1;
        lt->keys[i] = INT64_MAX;
    }
}

static void lt_adjust(LoserTree *lt, int s) {
    int k = lt->k;
    int winner = s;
    int parent = (k + s) / 2;

    while (parent > 0) {
        if (lt->loser[parent] == -1) {
            lt->loser[parent] = winner;
            return;
        }
        if (lt->keys[winner] > lt->keys[lt->loser[parent]]) {
            int tmp = winner;
            winner = lt->loser[parent];
            lt->loser[parent] = tmp;
        }
        parent /= 2;
    }
    lt->loser[0] = winner;
}

static void lt_build(LoserTree *lt, int64_t initial[], int k) {
    lt_init(lt, k);
    for (int i = k - 1; i >= 0; i--) {
        lt->keys[i] = initial[i];
        lt_adjust(lt, i);
    }
}

static inline int lt_winner(const LoserTree *lt) {
    return lt->loser[0];
}

static void lt_update(LoserTree *lt, int s, int64_t new_key) {
    lt->keys[s] = new_key;
    int k = lt->k;
    int winner = s;
    int parent = (k + s) / 2;

    while (parent > 0) {
        if (lt->keys[winner] > lt->keys[lt->loser[parent]]) {
            int tmp = winner;
            winner = lt->loser[parent];
            lt->loser[parent] = tmp;
        }
        parent /= 2;
    }
    lt->loser[0] = winner;
}

/* ===== Run 信息 ===== */
typedef struct {
    char path[256]; /* 临时文件路径 */
    size_t count;   /* 该 run 中的记录数 */
} RunInfo;

/* ===== 阶段一:生成初始 runs ===== */
static int generate_runs(const char *input_path,
                         RunInfo runs[], int *num_runs) {
    FILE *fin = fopen(input_path, "rb");
    CHECK(fin != NULL, "无法打开输入文件");

    /* 获取文件大小 */
    fseek(fin, 0, SEEK_END);
    long file_size = ftell(fin);
    fseek(fin, 0, SEEK_SET);
    size_t total_records = (size_t)file_size / RECORD_SIZE;

    /* 分配内存缓冲区 */
    int64_t *buffer = (int64_t *)malloc(MEMORY_LIMIT);
    CHECK(buffer != NULL, "内存分配失败");

    int run_count = 0;
    size_t records_remaining = total_records;

    fprintf(stderr, "总记录数: %zu, 每个 run 记录数: %zu\n",
            total_records, (size_t)RECORDS_PER_RUN);

    while (records_remaining > 0) {
        /* 读取一批记录到内存 */
        size_t batch = records_remaining < RECORDS_PER_RUN
                       ? records_remaining : RECORDS_PER_RUN;
        size_t nread = fread(buffer, RECORD_SIZE, batch, fin);
        CHECK(nread == batch, "读取输入文件失败");

        /* 内部排序 */
        qsort(buffer, batch, RECORD_SIZE, cmp_int64);

        /* 写入临时文件 */
        snprintf(runs[run_count].path, sizeof(runs[run_count].path),
                 "run_%04d.tmp", run_count);
        runs[run_count].count = batch;

        FILE *frun = fopen(runs[run_count].path, "wb");
        CHECK(frun != NULL, "无法创建临时 run 文件");

        size_t nwrite = fwrite(buffer, RECORD_SIZE, batch, frun);
        CHECK(nwrite == batch, "写入 run 文件失败");
        fclose(frun);

        fprintf(stderr, "  生成 run %d: %zu 条记录\n",
                run_count, batch);

        run_count++;
        records_remaining -= batch;
    }

    free(buffer);
    fclose(fin);

    *num_runs = run_count;
    fprintf(stderr, "阶段一完成: 共 %d 个 runs\n", run_count);
    return 0;
}

/* ===== Run 读取器 ===== */
typedef struct {
    FILE *fp;
    int64_t *buf;            /* 读取缓冲区 */
    size_t buf_capacity;     /* 缓冲区容量(记录数) */
    size_t buf_count;        /* 缓冲区有效记录数 */
    size_t buf_pos;          /* 当前读取位置 */
    size_t remaining;        /* 文件中剩余的记录数 */
    int exhausted;           /* 是否已耗尽 */
} RunReader;

static int run_reader_open(RunReader *rr, const RunInfo *ri,
                           size_t buf_capacity) {
    rr->fp = fopen(ri->path, "rb");
    CHECK(rr->fp != NULL, "无法打开 run 文件");

    rr->buf = (int64_t *)malloc(buf_capacity * RECORD_SIZE);
    CHECK(rr->buf != NULL, "读取缓冲区分配失败");

    rr->buf_capacity = buf_capacity;
    rr->buf_count = 0;
    rr->buf_pos = 0;
    rr->remaining = ri->count;
    rr->exhausted = 0;
    return 0;
}

static int run_reader_refill(RunReader *rr) {
    if (rr->remaining == 0) {
        rr->exhausted = 1;
        return -1;
    }
    size_t to_read = rr->remaining < rr->buf_capacity
                     ? rr->remaining : rr->buf_capacity;
    size_t nread = fread(rr->buf, RECORD_SIZE, to_read, rr->fp);
    CHECK(nread == to_read, "读取 run 缓冲区失败");

    rr->buf_count = nread;
    rr->buf_pos = 0;
    rr->remaining -= nread;
    return 0;
}

static int run_reader_next(RunReader *rr, int64_t *key) {
    if (rr->buf_pos >= rr->buf_count) {
        if (run_reader_refill(rr) < 0) return -1;
    }
    *key = rr->buf[rr->buf_pos++];
    return 0;
}

static void run_reader_close(RunReader *rr) {
    if (rr->fp) fclose(rr->fp);
    free(rr->buf);
    rr->fp = NULL;
    rr->buf = NULL;
}

/* ===== 阶段二:多路归并 ===== */
static int merge_runs(const char *output_path,
                      RunInfo runs[], int num_runs) {
    if (num_runs == 0) return 0;

    /* 如果只有一个 run,直接重命名 */
    if (num_runs == 1) {
        rename(runs[0].path, output_path);
        return 0;
    }

    int k = num_runs;
    if (k > MAX_MERGE_WAYS) {
        fprintf(stderr, "WARN: run 数量 %d 超过最大归并路数 %d,"
                "需要多趟归并(此实现暂不支持)\n",
                k, MAX_MERGE_WAYS);
        return -1;
    }

    /* 为每路分配缓冲区 */
    size_t buf_per_way = (MEMORY_LIMIT / RECORD_SIZE) / (k + 1);
    if (buf_per_way < 1) buf_per_way = 1;

    fprintf(stderr, "阶段二: %d 路归并,每路缓冲 %zu 条记录\n",
            k, buf_per_way);

    /* 打开所有 run 读取器 */
    RunReader *readers = (RunReader *)calloc(k, sizeof(RunReader));
    CHECK(readers != NULL, "读取器数组分配失败");

    for (int i = 0; i < k; i++) {
        run_reader_open(&readers[i], &runs[i], buf_per_way);
    }

    /* 读取每路的第一条记录,构建败者树 */
    int64_t initial_keys[MAX_MERGE_WAYS];
    for (int i = 0; i < k; i++) {
        if (run_reader_next(&readers[i], &initial_keys[i]) < 0) {
            initial_keys[i] = INT64_MAX; /* 空 run 用哨兵值 */
        }
    }

    LoserTree lt;
    lt_build(&lt, initial_keys, k);

    /* 打开输出文件 */
    FILE *fout = fopen(output_path, "wb");
    CHECK(fout != NULL, "无法打开输出文件");

    /* 输出缓冲区 */
    size_t out_buf_cap = buf_per_way;
    int64_t *out_buf = (int64_t *)malloc(out_buf_cap * RECORD_SIZE);
    CHECK(out_buf != NULL, "输出缓冲区分配失败");
    size_t out_pos = 0;

    /* 归并主循环 */
    size_t total_merged = 0;
    while (1) {
        int w = lt_winner(&lt);
        if (lt.keys[w] == INT64_MAX) break; /* 所有路耗尽 */

        /* 写入输出缓冲区 */
        out_buf[out_pos++] = lt.keys[w];
        if (out_pos >= out_buf_cap) {
            size_t nw = fwrite(out_buf, RECORD_SIZE, out_pos, fout);
            CHECK(nw == out_pos, "写入输出文件失败");
            out_pos = 0;
        }

        /* 从胜出的路读取下一条记录 */
        int64_t next_key;
        if (run_reader_next(&readers[w], &next_key) < 0) {
            next_key = INT64_MAX; /* 该路耗尽 */
        }
        lt_update(&lt, w, next_key);

        total_merged++;
        if (total_merged % 10000000 == 0) {
            fprintf(stderr, "  已归并 %zu 条记录\r", total_merged);
        }
    }

    /* 刷出剩余的输出缓冲区 */
    if (out_pos > 0) {
        size_t nw = fwrite(out_buf, RECORD_SIZE, out_pos, fout);
        CHECK(nw == out_pos, "刷出输出缓冲区失败");
    }

    fprintf(stderr, "\n阶段二完成: 共归并 %zu 条记录\n", total_merged);

    /* 清理 */
    free(out_buf);
    fclose(fout);
    for (int i = 0; i < k; i++) {
        run_reader_close(&readers[i]);
        remove(runs[i].path); /* 删除临时文件 */
    }
    free(readers);

    return 0;
}

/* ===== 验证排序结果 ===== */
static int verify_sorted(const char *path) {
    FILE *f = fopen(path, "rb");
    CHECK(f != NULL, "无法打开文件进行验证");

    int64_t prev = INT64_MIN;
    int64_t cur;
    size_t count = 0;

    while (fread(&cur, RECORD_SIZE, 1, f) == 1) {
        if (cur < prev) {
            fprintf(stderr, "验证失败: 位置 %zu, %ld > %ld\n",
                    count, (long)prev, (long)cur);
            fclose(f);
            return -1;
        }
        prev = cur;
        count++;
    }

    fclose(f);
    fprintf(stderr, "验证通过: %zu 条记录有序\n", count);
    return 0;
}

/* ===== 主函数 ===== */
int main(int argc, char *argv[]) {
    if (argc != 3) {
        fprintf(stderr, "用法: %s <输入文件> <输出文件>\n", argv[0]);
        return EXIT_FAILURE;
    }

    const char *input_path = argv[1];
    const char *output_path = argv[2];

    RunInfo runs[MAX_MERGE_WAYS];
    int num_runs = 0;

    fprintf(stderr, "==== 外部排序开始 ====\n");
    fprintf(stderr, "内存限制: %d MB, 块大小: %d MB\n",
            MEMORY_LIMIT / (1024*1024), BLOCK_SIZE / (1024*1024));

    /* 阶段一 */
    int rc = generate_runs(input_path, runs, &num_runs);
    CHECK(rc == 0, "生成初始 runs 失败");

    /* 阶段二 */
    rc = merge_runs(output_path, runs, num_runs);
    CHECK(rc == 0, "归并排序失败");

    /* 验证 */
    rc = verify_sorted(output_path);
    CHECK(rc == 0, "排序结果验证失败");

    fprintf(stderr, "==== 外部排序完成 ====\n");
    return EXIT_SUCCESS;
}

代码结构说明

整个实现分为四个模块:

  1. 败者树lt_* 函数):提供 O(log k) 的多路归并核心操作
  2. Run 生成generate_runs):将输入切分为内存大小的有序 run
  3. Run 读取器RunReader):带缓冲的 run 顺序读取
  4. 归并主循环merge_runs):用败者树驱动的多路归并

这个实现为了清晰性省略了一些生产级特性(多趟归并、异步 I/O、多线程),但核心算法是完整的。

十二、工程陷阱与最佳实践

外部排序的工程实现中有许多容易踩到的坑,以下是多年经验的总结:

陷阱 现象 解决方案
临时文件未清理 异常退出后磁盘空间被大量临时文件占满 注册 atexit() 或信号处理函数,确保清理;使用 O_TMPFILE(Linux 3.11+)创建无名临时文件
归并路数过大导致内存不足 每路分配的缓冲区太小,频繁触发 I/O 动态计算最优路数:k = min(num_runs, (M - output_buf) / (2 * B)),预留双缓冲和输出缓冲区的空间
文件描述符耗尽 大量 run 同时打开,超过 ulimit -n setrlimit() 提高限制,或实现”虚拟文件描述符”池(打开/关闭按需切换)
输出缓冲区太小 频繁的小块写入导致磁盘吞吐量骤降 输出缓冲区至少 1MB,最好 4-16MB
跨平台 I/O 行为差异 Windows 的 fwrite 在某些配置下不是原子的 使用平台特定的 API(WriteFile / pwrite),或加锁保护
排序键包含指针 排序后指针失效,反序列化崩溃 排序时只携带偏移量或扁平化的键,归并完成后再回填指针
未考虑磁盘空间 临时文件需要的空间约等于原始数据大小 排序前检查可用空间(statvfs()),至少需要 1x 原始数据量的额外空间
变长记录的对齐问题 读取未对齐的数据导致性能下降或 bus error 按对齐边界填充记录,或使用 memcpy 代替直接指针转换
大文件偏移溢出 32 位 off_t 在 2GB 处溢出 确保编译时定义 _FILE_OFFSET_BITS=64,使用 off64_t
SSD 的写放大 反复写入临时文件加速 SSD 磨损 尽量减少归并趟数,使用 RAM disk 存放临时文件(如果内存足够),或使用 direct I/O 绕过页缓存减少不必要的回写
相同键的稳定性 外部排序默认不保证稳定性 在排序键后附加序号(原始位置),比较时作为 tie-breaker
归并时的热点竞争 多线程归并时,败者树成为竞争点 每个线程独立归并一部分 run,最后再归并各线程的结果

十三、个人思考

写到这里,回顾外部排序这个话题,有几点感想。

第一,I/O 模型仍然是理解存储系统设计的基石。 无论是数据库的 B-tree、LSM-tree,还是大数据系统的 shuffle,底层都是在优化 I/O 次数和模式。理解 O(N/B · log_{M/B}(N/B)) 这个公式,就能理解为什么这些系统做出了各自的设计选择。

第二,败者树是一个被低估的数据结构。 教科书通常只花一两页讲败者树,然后就跳到下一个话题。但在外部排序、定时器管理、调度器等场景中,败者树的性能优势是实实在在的。如果你在实现任何形式的 k 路归并,首选败者树,不要用最小堆。

第三,硬件在变,算法也在变。 HDD 时代的”一切为了顺序访问”已经不完全适用于 SSD。但这不意味着外部排序过时了——即使 SSD 很快,内存和磁盘之间的速度差距仍然是数量级的。外部排序的核心思想——“感知 I/O 层级、减少数据移动”——在可预见的未来仍然是有效的。

第四,简单的方案往往是最好的。 替换选择听起来很酷(平均 2M 的 run!),但现代系统越来越倾向于用简单的内部排序替代它。原因很实际:实现更简单、调试更容易、缓存更友好,而且多出来的一趟归并在 SSD 上代价很小。工程中,可维护性和可靠性永远优先于理论上的最优。

第五,外部排序是分布式系统的缩影。 MapReduce 的 shuffle 就是多机版的外部排序。理解了单机外部排序中的缓冲区管理、流水线和负载均衡,就能更好地理解分布式系统中类似问题的设计。从这个角度看,外部排序不仅是一个算法题目,更是一种系统思维的训练。

参考文献

  1. Aggarwal, A. and Vitter, J.S. “The Input/Output Complexity of Sorting and Related Problems.” Communications of the ACM, 31(9):1116-1127, 1988.
  2. Knuth, D.E. The Art of Computer Programming, Volume 3: Sorting and Searching, 2nd Edition. Addison-Wesley, 1998. Section 5.4: External Sorting.
  3. Graefe, G. “Implementing Sorting in Database Systems.” ACM Computing Surveys, 38(3), 2006.
  4. SQLite 源码:vdbesort.chttps://sqlite.org/src/file/src/vdbesort.c
  5. O’Neil, P. et al. “The Log-Structured Merge-Tree (LSM-tree).” Acta Informatica, 33(4):351-385, 1996.
  6. Dean, J. and Ghemawat, S. “MapReduce: Simplified Data Processing on Large Clusters.” OSDI, 2004.
  7. RocksDB Wiki: Compaction, https://github.com/facebook/rocksdb/wiki/Compaction
  8. Rumble, S. et al. “Log-structured Memory for DRAM-based Storage.” FAST, 2014.
  9. Intel. “Optimizing Performance with Intel Optane Technology.” Technical Report, 2019.

算法系列导航上一篇:基数排序 | 下一篇:并行排序

相关阅读LSM-tree Compaction 策略 | B-tree 深度解剖


By .