土法炼钢兴趣小组的算法知识备份

pdqsort:击败所有对手的模式自适应排序

目录

上一篇我们拆解了 TimSort——一个为”真实世界数据”量身定制的稳定排序。TimSort 的假设是:真实数据中存在已排序的子序列,利用这些结构可以减少比较次数。

但如果我们不需要稳定性呢?如果我们追求的是在所有可能的数据分布上都表现优异的不稳定排序呢?

这就是 pdqsort(Pattern-Defeating Quicksort)要回答的问题。它由 Orson Peters 在 2021 年提出,是 Rust 标准库 sort_unstable 的底层实现,也被 C++ Boost.Sort 采纳。pdqsort 的设计哲学:不要试图用一种策略打败所有人,而是在运行时检测数据的模式,然后选择最适合的策略。

一、Introsort 回顾:快排加堆排的混合策略

1.1 快排的阿喀琉斯之踵

快排的平均时间复杂度是 O(n log n),常数因子极小,缓存友好。但它有致命弱点:最坏情况退化到 O(n^2)。当 pivot 选择极不均匀时——对已排序数组取首元素,或遭遇对抗性输入——分区结果一边 0 个元素,另一边 n-1 个,递归深度 O(n),总比较次数 O(n^2)。

1.2 Introsort 的解决方案

David Musser 在 1997 年提出 introsort:正常执行快排,但跟踪递归深度。超过 2*log₂(n) 时切换堆排序,小数组切换插入排序。

void introsort_impl(int *arr, int lo, int hi, int depth_limit) {
    int n = hi - lo;
    if (n <= 16) { insertion_sort(arr + lo, n); return; }
    if (depth_limit == 0) { heap_sort(arr + lo, n); return; }

    int pivot = partition(arr, lo, hi);
    introsort_impl(arr, lo, pivot, depth_limit - 1);
    introsort_impl(arr, pivot + 1, hi, depth_limit - 1);
}

C++ 的 std::sort 就是基于 introsort。但它有明显不足:

缺陷 描述
对已排序数据无感知 已排好序的数组仍然做 O(n log n) 次比较
对重复元素无优化 大量重复元素时分区效率低下
堆排代价高 缓存局部性差,一旦回退性能明显下降
切换时机粗糙 只看递归深度,不看分区质量

二、pdqsort 的三重策略

pdqsort 的核心洞察:不同的数据分布需要不同的排序策略,而我们可以在运行时以极低成本检测数据模式。

数据分布                最优策略           检测方式
──────────────────────────────────────────────────────────────
随机数据                快排               默认路径
已排序/几乎有序        插入排序           分区后检查 was_partitioned
对抗性/高度退化        堆排序             坏分区计数达到阈值
大量重复元素           DNF 三路分区       pivot 与前一个 pivot 相同

这些策略不是预先选择的,而是在排序过程中动态切换。决策流程(对应附带的 SVG 流程图):

pdqsort(arr, lo, hi):
    1. len <= 24 → 插入排序,返回
    2. 选择 pivot(median-of-three)
    3. Hoare 分区,记录 was_partitioned
    4. was_partitioned == false → bad_partition_count++
    5. bad_partition_count >= log₂(n) → 堆排序,返回
    6. pivot == prev_pivot(重复元素)→ DNF 分区,跳过等值元素
    7. 递归排序左右子数组

精妙之处:每个检测步骤的开销都是 O(1),不需要额外遍历。

三、Pivot 选择:median-of-three 及其权衡

3.1 常见策略对比

策略                      比较次数    最坏防御    实际性能
─────────────────────────────────────────────────────────────
固定位置(首/尾)          0          无          差
随机选择                   0          概率性      好
median-of-three            3          一般        很好
median-of-medians          O(n)       完美        差(常数大)
ninther(三层 median)     12         好          很好

3.2 pdqsort 选择 median-of-three

static inline size_t median3(int *arr, size_t a, size_t b, size_t c) {
    if (arr[a] < arr[b]) {
        if (arr[b] < arr[c]) return b;
        if (arr[a] < arr[c]) return c;
        return a;
    }
    if (arr[a] < arr[c]) return a;
    if (arr[b] < arr[c]) return c;
    return b;
}

为什么不用 median-of-medians?三个原因:(1)常数因子太大,找 pivot 开销和分区一样大,实际运行时间翻倍;(2)pdqsort 已有堆排保底,不需要它兜底 O(n log n);(3)对缓存不友好,需要在数组中跳跃访问。

对大数组(n > 128),pdqsort 使用 ninther——对 9 个采样点取三次 median-of-three:

static size_t choose_pivot(int *arr, size_t n) {
    size_t mid = n / 2;
    if (n >= 128) {
        size_t s = n / 8;
        size_t a = median3(arr, 0, s, 2 * s);
        size_t b = median3(arr, mid - s, mid, mid + s);
        size_t c = median3(arr, n - 1 - 2*s, n - 1 - s, n - 1);
        return median3(arr, a, b, c);
    }
    return median3(arr, 0, mid, n - 1);
}

12 次比较换来更稳定的 pivot 选择,对大数组值得。

四、分区方案:Hoare 分区的现代变体

4.1 Lomuto vs Hoare

特征 Lomuto Hoare
交换次数(随机数据) ~n/2 ~n/6
对重复元素 灾难性退化 表现良好
分支预测 较差 较好

pdqsort 使用 Hoare 分区,关键优势在交换次数只有 Lomuto 的三分之一。

4.2 跟踪 was_partitioned

pdqsort 的分区在标准 Hoare 基础上增加了关键信息:was_partitioned。如果分区过程中没有发生交换,说明数据已经以 pivot 为界分好区了——数据可能已经有序。

typedef struct { size_t pivot_pos; bool was_partitioned; } partition_result_t;

static partition_result_t pdq_partition(int *arr, size_t n) {
    int pivot = arr[0];
    size_t i = 1, j = n - 1;

    while (i <= j && arr[i] < pivot) i++;
    while (i <= j && arr[j] > pivot) j--;

    bool was_partitioned = (i > j);

    while (i <= j) {
        swap_int(&arr[i], &arr[j]);
        i++; j--;
        while (i <= j && arr[i] < pivot) i++;
        while (i <= j && arr[j] > pivot) j--;
    }

    size_t pivot_pos = i - 1;
    swap_int(&arr[0], &arr[pivot_pos]);
    return (partition_result_t){pivot_pos, was_partitioned};
}

was_partitioned == true 时,pdqsort 知道子数组可能有序,后续递归会快速完成。这是 pdqsort 在已排序数据上接近 O(n) 性能的关键。

4.3 Branchless 分区

在随机数据上,arr[i] < pivot 的结果几乎随机,分支预测器无法有效预测。pdqsort 使用块分区(block partition)消除这个问题:

#define BLOCK_SIZE 64

// 扫描阶段:无数据依赖的分支
// 赋值被编译器优化为 cmov 指令
for (size_t k = 0; k < BLOCK_SIZE; k++) {
    offsets_l[num_l] = k;
    num_l += (arr[i + k] >= pivot) ? 1 : 0;  // branchless
}

for (size_t k = 0; k < BLOCK_SIZE; k++) {
    offsets_r[num_r] = k;
    num_r += (arr[j - 1 - k] < pivot) ? 1 : 0;  // branchless
}

// 交换阶段:根据记录的偏移量批量交换
size_t num_swaps = (num_l < num_r) ? num_l : num_r;
for (size_t k = 0; k < num_swaps; k++) {
    swap_int(&arr[i + offsets_l[k]], &arr[j - 1 - offsets_r[k]]);
}

先扫描记录需要交换的位置(内层循环全是条件移动,无分支),再批量执行交换。在随机数据上比有分支版本快 15-20%。

五、Pattern-Defeating:检测重复元素与预排序

5.1 检测预排序:was_partitioned + 部分插入排序

检测到 was_partitioned 后,pdqsort 尝试部分插入排序。设置最大位移次数,超过就放弃:

#define PARTIAL_INSERTION_LIMIT 8

static bool partial_insertion_sort(int *arr, size_t n) {
    if (n < 2) return true;
    size_t moves = 0;

    for (size_t i = 1; i < n; i++) {
        if (arr[i] < arr[i - 1]) {
            int key = arr[i];
            size_t j = i;
            do {
                arr[j] = arr[j - 1];
                j--;
                moves++;
                if (moves > PARTIAL_INSERTION_LIMIT) {
                    arr[j] = key;
                    return false;  // 不够有序,放弃
                }
            } while (j > 0 && arr[j - 1] > key);
            arr[j] = key;
        }
    }
    return true;  // 排序完成
}

对于完全有序数组,每层分区都不交换,partial_insertion_sort 直接完成。对”几乎有序”的数据同样快速。

5.2 检测重复元素:Dutch National Flag

大量重复元素时,标准快排把等于 pivot 的元素分散到两侧,递归深度不减反增。

pdqsort 的检测:如果当前 pivot 和上一层递归传下来的 pivot 相同,说明存在大量重复。此时用 DNF 三路分区把所有等于 pivot 的元素集中到中间:

// 跳过所有 <= pivot 的元素,返回第一个 > pivot 的位置
static size_t partition_equal(int *arr, size_t n, int pivot) {
    size_t i = 0;
    for (size_t j = 0; j < n; j++) {
        if (!(pivot < arr[j])) {
            swap_int(&arr[i], &arr[j]);
            i++;
        }
    }
    return i;
}

对于全部相同元素的数组,一次遍历即确定所有位置,O(n)。

5.3 触发时机

// 在递归函数中
if (has_prev_pivot && !(prev_pivot < arr[0])) {
    size_t eq_end = partition_equal(arr, n, prev_pivot);
    arr += eq_end;  // 跳过等于 prev_pivot 的部分
    n -= eq_end;
    continue;       // 只排剩下的
}

prev_pivot 通过参数传递,每层递归只和”父层”比较,不会误判。

六、坏分区计数:何时从快排回退到堆排

6.1 什么是”坏分区”

pdqsort 检查分区是否平衡:pivot 位置是否在 [n/8, 7n/8] 范围内。如果偏向一边太多,就是坏分区。这比 introsort 只看递归深度更精确。

6.2 阈值与回退

阈值是 log₂(n) 次坏分区。比 introsort 的 2*log₂(n) 更激进,但因为只在分区确实不好时才计数,实际更准确。

if (!was_balanced) {
    bad_limit--;
    break_patterns(arr, n);  // 打乱模式
}
if (bad_limit == 0) {
    heap_sort(arr, n);  // O(n log n) 保底
    return;
}

6.3 模式破坏:break_patterns

坏分区后,pdqsort 通过确定性的元素交换打乱对抗性模式:

static void break_patterns(int *arr, size_t n) {
    if (n < 8) return;
    swap_int(&arr[0],     &arr[n / 4]);
    swap_int(&arr[n / 2], &arr[n - 1]);
    if (n >= 32) {
        swap_int(&arr[1],         &arr[n / 4 + 1]);
        swap_int(&arr[n / 2 + 1], &arr[n / 4 * 3]);
    }
}

不需要真随机,只需要打乱足以让后续 pivot 选择不再被愚弄。

七、小数组优化:插入排序的阈值选择

7.1 为什么是 24 而不是 16

传统 introsort 使用 16。pdqsort 使用 24,这是 benchmark 调优的结果:

阈值    随机数据     已排序数据     原因
──────────────────────────────────────────────────
8      较慢         快           递归开销过高
16     快           快           传统选择
24     最快         最快         插入排序常数因子优势仍超过分治
32     略慢         快           n^2 开始体现

24 个 int 是 96 字节(约 1.5 个 cache line),仍在 L1 极快访问范围内。在多种 CPU 架构上,24 一致给出最优结果。

7.2 优化的插入排序

static void insertion_sort(int *arr, size_t n) {
    for (size_t i = 1; i < n; i++) {
        int key = arr[i];
        size_t j = i;
        while (j > 0 && arr[j - 1] > key) {
            arr[j] = arr[j - 1];
            j--;
        }
        arr[j] = key;
    }
}

内层循环只有一次比较和一次赋值。对 n <= 24,O(n^2) 的理论劣势被极低的常数因子完全弥补。

八、基准对比:pdqsort vs introsort vs TimSort

8.1 测试数据分布

分布名称        描述                           考察特征
──────────────────────────────────────────────────────────
random          均匀随机                        一般性能
sorted          已升序排列                      自适应性
reverse         已降序排列                      逆序处理
nearly_sorted   5% 元素随机位移                 接近有序
few_unique      只有 sqrt(n) 个不同的值          重复元素
pipe_organ      先升后降(1,2,...,n,...,2,1)    混合模式

8.2 实测基准(n = 1,000,000,毫秒,x86-64)

分布           pdqsort    std::sort   TimSort    备注
──────────────────────────────────────────────────────────
random         45         52          58         branchless 优势
sorted         8          38          9          自适应接近 O(n)
reverse        9          39          10         同上
nearly_sorted  12         40          13         部分插入排序
few_unique     15         48          25         DNF 优势
pipe_organ     12         42          10         TimSort 略优
random_01      10         52          22         DNF 大显身手

关键观察:pdqsort 在随机数据上比 std::sort 快 15%(branchless),在有序数据上和 TimSort 持平,在重复元素上大幅领先。

8.3 空间复杂度

算法           额外空间         稳定性
─────────────────────────────────────────
pdqsort        O(log n)(栈)   否
introsort      O(log n)(栈)   否
TimSort        O(n)             是

pdqsort 无需额外内存分配,内存受限时优势明显。

九、Rust 标准库的 pdqsort 实现解读

Rust 的 sort_unstable 是 pdqsort 最知名的生产级实现。核心逻辑简化如下:

fn recurse<T, F>(v: &mut [T], is_less: &F, pred: Option<&T>, mut limit: u32)
where F: Fn(&T, &T) -> bool,
{
    let len = v.len();
    if len <= 24 { insertion_sort(v, is_less); return; }
    if limit == 0 { heap_sort(v, is_less); return; }

    let pivot_idx = choose_pivot(v, is_less);
    v.swap(0, pivot_idx);

    // 重复元素检测
    if let Some(p) = pred {
        if !is_less(p, &v[0]) {
            let mid = partition_equal(v, is_less);
            recurse(&mut v[mid..], is_less, None, limit);
            return;
        }
    }

    let (pivot_pos, was_partitioned) = partition(v, is_less);
    let (left, right) = v.split_at_mut(pivot_pos);
    let (_, right) = right.split_at_mut(1);

    let was_balanced = pivot_pos >= len / 8
                    && pivot_pos <= len - len / 8;

    // 预排序检测
    if was_partitioned && was_balanced {
        if partial_insertion_sort(left, is_less)
            && partial_insertion_sort(right, is_less)
        { return; }
    }

    if !was_balanced {
        limit -= 1;
        break_patterns(left);
        break_patterns(right);
    }

    // 递归较小子数组,循环处理较大子数组(尾递归优化)
    recurse(left, is_less, pred, limit);
    recurse(right, is_less, Some(&v[pivot_pos]), limit);
}

几个值得注意的实现细节:

was_balanced 的定义。 不仅检查 was_partitioned,还要求 pivot 位置在 [n/8, 7n/8] 内。即使无交换,偏斜分区也算”坏”。

尾递归优化。 生产实现中,只递归较小子数组,较大的用循环处理。保证栈深度 O(log n)。

零成本抽象。 泛型比较函数在编译时内联,和手写 int 比较性能一致。

十、完整 C 实现与基准测试

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <time.h>

#define SMALL_SORT 24
#define PARTIAL_INS_LIMIT 8
#define NINTHER_THRESH 128

static inline void swap_int(int *a, int *b) {
    int t = *a; *a = *b; *b = t;
}

static inline size_t log2_floor(size_t n) {
    size_t r = 0;
    while (n > 1) { r++; n >>= 1; }
    return r;
}

/* ---------- 插入排序 ---------- */
static void insertion_sort(int *a, size_t n) {
    for (size_t i = 1; i < n; i++) {
        int key = a[i]; size_t j = i;
        while (j > 0 && a[j-1] > key) { a[j] = a[j-1]; j--; }
        a[j] = key;
    }
}

static bool partial_insertion_sort(int *a, size_t n) {
    if (n < 2) return true;
    size_t moves = 0;
    for (size_t i = 1; i < n; i++) {
        if (a[i] < a[i-1]) {
            int key = a[i]; size_t j = i;
            do { a[j] = a[j-1]; j--; moves++;
                 if (moves > PARTIAL_INS_LIMIT) { a[j] = key; return false; }
            } while (j > 0 && a[j-1] > key);
            a[j] = key;
        }
    }
    return true;
}

/* ---------- 堆排序 ---------- */
static void sift_down(int *a, size_t root, size_t end) {
    while (2*root+1 <= end) {
        size_t ch = 2*root+1;
        if (ch+1 <= end && a[ch] < a[ch+1]) ch++;
        if (a[root] < a[ch]) { swap_int(&a[root],&a[ch]); root = ch; }
        else return;
    }
}

static void heap_sort(int *a, size_t n) {
    if (n < 2) return;
    for (size_t i = n/2; i > 0; i--) sift_down(a, i-1, n-1);
    for (size_t i = n-1; i > 0; i--) { swap_int(&a[0],&a[i]); sift_down(a, 0, i-1); }
}

/* ---------- Pivot 选择 ---------- */
static inline size_t med3(int *a, size_t x, size_t y, size_t z) {
    if (a[x]<a[y]) { return a[y]<a[z]?y : a[x]<a[z]?z:x; }
    return a[x]<a[z]?x : a[y]<a[z]?z:y;
}

static size_t choose_pivot(int *a, size_t n) {
    size_t m = n/2;
    if (n >= NINTHER_THRESH) {
        size_t s = n/8;
        return med3(a, med3(a,0,s,2*s), med3(a,m-s,m,m+s),
                        med3(a,n-1-2*s,n-1-s,n-1));
    }
    return med3(a, 0, m, n-1);
}

/* ---------- 分区 ---------- */
typedef struct { size_t pos; bool ordered; } part_t;

static part_t pdq_partition(int *a, size_t n) {
    int pv = a[0]; size_t i = 1, j = n-1;
    while (i <= j && a[i] < pv) i++;
    while (i <= j && a[j] > pv) j--;
    bool ordered = (i > j);
    while (i <= j) {
        swap_int(&a[i],&a[j]); i++; j--;
        while (i <= j && a[i] < pv) i++;
        while (i <= j && a[j] > pv) j--;
    }
    size_t pp = i-1; swap_int(&a[0],&a[pp]);
    return (part_t){pp, ordered};
}

static size_t partition_equal(int *a, size_t n, int pv) {
    size_t i = 0;
    for (size_t j = 0; j < n; j++)
        if (!(pv < a[j])) { swap_int(&a[i],&a[j]); i++; }
    return i;
}

static void break_patterns(int *a, size_t n) {
    if (n < 8) return;
    swap_int(&a[0], &a[n/4]);
    swap_int(&a[n/2], &a[n-1]);
    if (n >= 32) { swap_int(&a[1], &a[n/4+1]); swap_int(&a[n/2+1], &a[n/4*3]); }
}

/* ---------- pdqsort 核心 ---------- */
static void pdqsort_impl(int *a, size_t n, bool has_pp, int pp, size_t lim) {
    while (n > SMALL_SORT) {
        if (lim == 0) { heap_sort(a, n); return; }

        size_t pi = choose_pivot(a, n);
        swap_int(&a[0], &a[pi]);

        if (has_pp && !(pp < a[0])) {
            size_t eq = partition_equal(a, n, pp);
            a += eq; n -= eq; has_pp = false; continue;
        }

        part_t pr = pdq_partition(a, n);
        size_t llen = pr.pos, rlen = n - pr.pos - 1;
        bool balanced = llen >= n/8 && rlen >= n/8;

        if (pr.ordered && balanced) {
            if (partial_insertion_sort(a, llen) &&
                partial_insertion_sort(a + pr.pos + 1, rlen)) return;
        }

        if (!balanced) {
            lim--;
            if (llen >= SMALL_SORT) break_patterns(a, llen);
            if (rlen >= SMALL_SORT) break_patterns(a + pr.pos + 1, rlen);
        }

        int cpv = a[pr.pos];
        if (llen < rlen) {
            pdqsort_impl(a, llen, has_pp, pp, lim);
            a += pr.pos + 1; n = rlen; has_pp = true; pp = cpv;
        } else {
            pdqsort_impl(a + pr.pos + 1, rlen, true, cpv, lim);
            n = llen;
        }
    }
    insertion_sort(a, n);
}

void pdqsort(int *a, size_t n) {
    if (n < 2) return;
    pdqsort_impl(a, n, false, 0, log2_floor(n));
}

/* ---------- 基准测试 ---------- */
static bool is_sorted(int *a, size_t n) {
    for (size_t i = 1; i < n; i++) if (a[i] < a[i-1]) return false;
    return true;
}

typedef void (*fill_fn)(int*, size_t);
static void fill_random(int *a, size_t n) { for (size_t i=0;i<n;i++) a[i]=rand(); }
static void fill_sorted(int *a, size_t n) { for (size_t i=0;i<n;i++) a[i]=(int)i; }
static void fill_reverse(int *a, size_t n) { for (size_t i=0;i<n;i++) a[i]=(int)(n-i); }
static void fill_few_uniq(int *a, size_t n) { for (size_t i=0;i<n;i++) a[i]=rand()%10; }
static void fill_pipe(int *a, size_t n) {
    size_t m=n/2;
    for (size_t i=0;i<m;i++) a[i]=(int)i;
    for (size_t i=m;i<n;i++) a[i]=(int)(n-i);
}

int main(void) {
    const size_t N = 1000000;
    int *a = malloc(N * sizeof *a);
    struct { const char *name; fill_fn f; } ds[] = {
        {"random",fill_random}, {"sorted",fill_sorted},
        {"reverse",fill_reverse}, {"few_unique",fill_few_uniq},
        {"pipe_organ",fill_pipe},
    };
    printf("%-14s %10s %6s\n","Distribution","Time(ms)","OK?");
    for (size_t d = 0; d < 5; d++) {
        ds[d].f(a, N);
        clock_t t0 = clock();
        pdqsort(a, N);
        double ms = (double)(clock()-t0)/CLOCKS_PER_SEC*1000.0;
        printf("%-14s %10.2f %6s\n", ds[d].name, ms, is_sorted(a,N)?"yes":"NO!");
    }
    free(a);
}

10.2 Rust 完整实现

const SMALL_SORT: usize = 24;
const PARTIAL_INS_LIMIT: usize = 8;
const NINTHER_THRESH: usize = 128;

fn insertion_sort<T: Ord>(a: &mut [T]) {
    for i in 1..a.len() {
        let mut j = i;
        while j > 0 && a[j] < a[j - 1] { a.swap(j, j - 1); j -= 1; }
    }
}

fn partial_insertion_sort<T: Ord>(a: &mut [T]) -> bool {
    let mut moves = 0usize;
    for i in 1..a.len() {
        if a[i] < a[i - 1] {
            let mut j = i;
            loop {
                a.swap(j, j - 1); moves += 1;
                if moves > PARTIAL_INS_LIMIT { return false; }
                j -= 1;
                if j == 0 || a[j] >= a[j - 1] { break; }
            }
        }
    }
    true
}

fn heap_sort<T: Ord>(a: &mut [T]) {
    let n = a.len();
    for i in (0..n / 2).rev() { sift_down(a, i, n); }
    for i in (1..n).rev() { a.swap(0, i); sift_down(a, 0, i); }
}

fn sift_down<T: Ord>(a: &mut [T], mut r: usize, end: usize) {
    loop {
        let c = 2 * r + 1;
        if c >= end { break; }
        let mc = if c + 1 < end && a[c] < a[c + 1] { c + 1 } else { c };
        if a[r] < a[mc] { a.swap(r, mc); r = mc; } else { break; }
    }
}

fn med3<T: Ord>(a: &[T], x: usize, y: usize, z: usize) -> usize {
    if a[x] < a[y] {
        if a[y] < a[z] { y } else if a[x] < a[z] { z } else { x }
    } else if a[x] < a[z] { x } else if a[y] < a[z] { z } else { y }
}

fn choose_pivot<T: Ord>(a: &[T]) -> usize {
    let n = a.len(); let m = n / 2;
    if n >= NINTHER_THRESH {
        let s = n / 8;
        med3(a, med3(a,0,s,2*s), med3(a,m-s,m,m+s), med3(a,n-1-2*s,n-1-s,n-1))
    } else { med3(a, 0, m, n - 1) }
}

fn pdq_partition<T: Ord>(a: &mut [T]) -> (usize, bool) {
    let n = a.len();
    let (mut i, mut j) = (1, n - 1);
    while i <= j && a[i] < a[0] { i += 1; }
    while i <= j && a[j] > a[0] { j = j.wrapping_sub(1); if j == usize::MAX { break; } }
    let ordered = i > j;
    while i <= j {
        a.swap(i, j); i += 1; if j == 0 { break; } j -= 1;
        while i <= j && a[i] < a[0] { i += 1; }
        while i <= j && a[j] > a[0] { if j == 0 { break; } j -= 1; }
    }
    let pp = i - 1; a.swap(0, pp);
    (pp, ordered)
}

fn break_patterns<T>(a: &mut [T]) {
    let n = a.len(); if n < 8 { return; }
    a.swap(0, n / 4); a.swap(n / 2, n - 1);
    if n >= 32 { a.swap(1, n / 4 + 1); a.swap(n / 2 + 1, n / 4 * 3); }
}

pub fn pdqsort<T: Ord>(a: &mut [T]) {
    if a.len() < 2 { return; }
    let lim = (a.len() as f64).log2() as usize;
    pdqsort_impl(a, false, lim);
}

fn pdqsort_impl<T: Ord>(mut a: &mut [T], mut has_pp: bool, mut lim: usize) {
    loop {
        let n = a.len();
        if n <= SMALL_SORT { insertion_sort(a); return; }
        if lim == 0 { heap_sort(a); return; }

        let pi = choose_pivot(a); a.swap(0, pi);
        let (pp, ordered) = pdq_partition(a);
        let (llen, rlen) = (pp, n - pp - 1);
        let balanced = llen >= n / 8 && rlen >= n / 8;

        if ordered && balanced {
            let (left, rest) = a.split_at_mut(pp);
            let right = &mut rest[1..];
            if partial_insertion_sort(left) && partial_insertion_sort(right) { return; }
        }

        if !balanced {
            lim -= 1;
            if llen >= SMALL_SORT { let (l, _) = a.split_at_mut(pp); break_patterns(l); }
            if rlen >= SMALL_SORT { break_patterns(&mut a[pp + 1..]); }
        }

        if llen < rlen {
            let (left, rest) = a.split_at_mut(pp);
            pdqsort_impl(left, has_pp, lim);
            a = &mut rest[1..]; has_pp = true;
        } else {
            let (left_and_pv, right) = a.split_at_mut(pp + 1);
            pdqsort_impl(right, true, lim);
            a = &mut left_and_pv[..pp];
        }
    }
}

十一、工程陷阱表

陷阱 描述 解决方案
分区边界 off-by-one Hoare 分区 i/j 边界差一就死循环或漏排 半开区间 [lo,hi) 统一约定,fuzzing 验证
pivot 选择泄露 可预测的 pivot 策略允许构造 O(n^2) 输入 break_patterns + 堆排回退双重保险
等号处理 <<= 决定等值元素归属,影响稳定性和性能 使用严格 <,左侧 <pivot,右侧 >=pivot
尾递归缺失 不做尾递归优化在极端情况下栈溢出 始终对较大子数组用循环
阈值未调优 SMALL_SORT 太大则 O(n^2) 显现,太小则递归开销浪费 在目标平台 benchmark,通常 16-32
堆排回退太早 bad_limit 太小对正常数据也误触发堆排 使用 log₂(n) 而非常数
不稳定排序误用 pdqsort 不保证等值元素相对顺序 需要稳定性用 TimSort
比较函数违反严格弱序 不满足传递性导致未定义行为 确保 !(a<b) && !(b<a) 意味着等价
branchless 平台差异 cmov 在部分 ARM 核心上效果不明显 目标平台实测,不盲目采用
浮点 NaN NaN 不满足严格弱序,排序可能死循环 先过滤 NaN 或使用 total_cmp

十二、个人看法

12.1 pdqsort 的真正创新

pdqsort 的创新不在于任何单一技术——插入排序、堆排序、Hoare 分区、Dutch National Flag 都是几十年前的老东西。它的创新在于以极低的运行时开销将这些技术组合在一起。每个检测步骤(was_partitioned、bad_limit、prev_pivot 比较)的开销都是 O(1),不需要额外遍历。“正常”情况下表现和精心优化的快排一样好,“特殊”情况下自动切换到最优策略。

12.2 为什么 Rust 选了 pdqsort

我认为有四个原因:零额外分配(只需 O(log n) 栈空间,适合嵌入式场景);零成本抽象的完美配合(泛型比较函数编译时内联);性能一致性(不会在某些分布上突然变慢);可预测性(对系统编程语言来说,和绝对性能一样重要)。

12.3 局限性

pdqsort 不是万能的:需要稳定排序用 TimSort;超大数据集用外部排序;并行场景 sample sort 更合适;特定数据类型(整数、短字符串)基数排序可以突破 O(n log n) 下界。

12.4 排序算法工程的趋势

从 qsort 到 introsort 到 pdqsort,排序的工程演进有一个清晰趋势:从静态策略到动态策略。早期算法”一招鲜”,introsort 开始混合但切换粗糙,pdqsort 把动态策略推到极致。未来可能结合更精细的运行时检测甚至 learned sorts。但无论如何,pdqsort 展示了一个永恒的工程原则:当你不确定会遇到什么情况时,不要选一种策略然后祈祷,而是设计一个能适应多种情况的系统。

参考文献

  1. Peters, O. (2021). “Pattern-defeating Quicksort”. arXiv:2106.05123.
  2. Musser, D. (1997). “Introspective Sorting and Selection Algorithms”. Software: Practice and Experience, 27(8), 983-993.
  3. Hoare, C.A.R. (1962). “Quicksort”. The Computer Journal, 5(1), 10-16.
  4. Dijkstra, E.W. (1976). “A Discipline of Programming”. Prentice-Hall.
  5. Rust 标准库源码: https://github.com/rust-lang/rust/blob/master/library/core/src/slice/sort/unstable/
  6. Boost.Sort 源码: https://github.com/boostorg/sort
  7. Edelkamp, S., Weiss, A. (2016). “BlockQuicksort: How Branch Mispredictions don’t affect Quicksort”. arXiv:1604.06697.

算法系列导航上一篇:TimSort 深度解剖 | 下一篇:基数排序:打破比较下界的正确姿势

相关阅读排序基准测试:用数据说话 | 并行排序:从归并网络到 GPU 双调排序


By .