土法炼钢兴趣小组的算法知识备份

Join 算法全解:Nested Loop → Hash → Sort-Merge

目录

Join 算法全解:Nested Loop、Hash Join、Sort-Merge Join

数据库中最昂贵的操作,可能就是 Join。

一条看似简单的 SQL:

SELECT *
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.amount > 1000;

如果 orders 表有 1000 万行,customers 表有 100 万行,朴素的嵌套循环需要 10 万亿次比较。即使每次比较只要 1 纳秒,也需要 10000 秒——将近 3 小时。

但实际上,任何一个现代数据库都能在几秒内完成这个查询。原因很简单:没有人会真的用朴素嵌套循环。从 System R 时代开始,数据库研究者就在不断优化 Join 算法。从 Nested Loop 到 Sort-Merge,从 Classic Hash Join 到 Hybrid Hash Join,再到现代的向量化执行和并行 Hash Join,每一步优化都在解决同一个核心问题:如何用最少的 I/O 和 CPU 代价,把两张表中匹配的行拼在一起。

这篇文章从成本模型讲起,逐一拆解每种 Join 算法的原理、代价公式和适用场景。最后给出一个完整的 C 语言 Hash Join 实现,以及真实数据库系统中的工程细节。

一、Join 问题的形式化定义与成本模型

Join 操作的本质

Join 的本质是笛卡尔积的子集。给定关系 R 和 S,以及连接谓词 theta:

R ⋈_theta S = { (r, s) | r ∈ R, s ∈ S, theta(r, s) = true }

最常见的是等值连接(equi-join):R.a = S.b。范围连接(range join)、半连接(semi-join)、反连接(anti-join)都是 Join 的变体,但等值连接是绝大多数优化技术的目标。

I/O 成本模型

在传统的磁盘数据库中,I/O 是瓶颈。我们用以下符号建立成本模型:

符号 含义
M 关系 R 的页数(pages)
N 关系 S 的页数(pages)
p_R 每页 R 中的元组数
p_S 每页 S 中的元组数
B 可用的缓冲区页数(buffer pages)
\|R\| R 的元组总数,= M * p_R
\|S\| S 的元组总数,= N * p_S

成本以 磁盘页面 I/O 次数 度量。我们忽略 CPU 成本——在机械硬盘时代,一次随机 I/O 大约 10ms,而一次内存比较不到 10ns,两者相差 6 个数量级。虽然 SSD 缩小了差距(随机读约 100us),但 I/O 仍然是主要矛盾。

为什么缓冲区大小决定一切

所有 Join 算法的 I/O 成本都是 B(缓冲区页数)的函数。缓冲区越大,能缓存的数据越多,I/O 越少。这是一个核心洞察:Join 算法的选择本质上是在给定内存预算下,如何最优地安排读写顺序

成本 = f(M, N, B)

其中 B 是可用缓冲区页数。
当 B >= min(M, N) + 2 时,几乎所有算法都退化为简单的一趟扫描。
真正有趣的情况是 B << M 且 B << N。

二、Nested Loop Join:最朴素但最灵活

Simple Nested Loop Join

最直观的实现:对 R 中的每一行,扫描 S 中的每一行。

for each tuple r in R:          -- 扫描 R 一次
    for each tuple s in S:      -- 每个 r 扫描 S 一次
        if theta(r, s):
            output (r, s)

I/O 成本

Cost = M + |R| * N
     = M + (M * p_R) * N

R 扫描一次花费 M 次 I/O。对于 R 中的每个元组(共 M * p_R 个),都要完整扫描 S 一次,每次 N 次 I/O。

如果 M = 1000,N = 500,p_R = 100:

Cost = 1000 + 1000 * 100 * 500 = 50,001,000 次 I/O

以每次 I/O 0.1ms(SSD)计算,需要 5000 秒。不可接受。

Block Nested Loop Join

核心优化:以页(block)为单位而非以元组为单位迭代外层关系。

for each block b_R of R (using B-2 pages):
    for each page p_S of S:
        for each tuple r in b_R:
            for each tuple s in p_S:
                if theta(r, s):
                    output (r, s)

分配 B-2 页给外层关系 R(1 页给 S 的输入,1 页给输出)。每次读入 B-2 页的 R,然后完整扫描 S。

I/O 成本

Cost = M + ⌈M / (B-2)⌉ * N

R 仍然扫描一次。但现在 S 只需要扫描 ⌈M / (B-2)⌉ 次,而不是 M * p_R 次。

同样的例子,B = 100:

Cost = 1000 + ⌈1000/98⌉ * 500
     = 1000 + 11 * 500
     = 6,500 次 I/O

从 5000 万降到 6500——改进了近 4 个数量级。

关键优化:永远把小表放在外层。如果 M < N,那么 ⌈M/(B-2)⌉ * N < ⌈N/(B-2)⌉ * M

Index Nested Loop Join

如果内层关系上有索引(B+ 树或哈希索引),可以用索引查找代替全表扫描。

for each tuple r in R:
    -- 用 r.join_key 在 S 的索引上查找匹配
    for each matching tuple s in S.index_lookup(r.join_key):
        output (r, s)

I/O 成本

Cost = M + |R| * C_index

其中 C_index 是一次索引查找的成本。对于 B+ 树索引,C_index = h + 1(h 是树的高度,通常 2-4 层,加上一次数据页的读取)。对于哈希索引,C_index ≈ 1.2(平均 1.2 次 I/O)。

如果 R 有 1000 页,p_R = 100,B+ 树高度 = 3:

Cost = 1000 + 100,000 * 4 = 401,000 次 I/O

看起来不少,但如果 S 非常大(比如 100 万页),Index NLJ 可能比 Block NLJ 更好。而且,如果连接的选择率(selectivity)很低——即匹配的行很少——Index NLJ 的优势更加明显。

三种 NLJ 的适用场景

变体 最佳场景 限制
Simple NLJ 教学用途 几乎从不使用
Block NLJ 无索引、小到中等数据量 内存不足时 S 多次扫描
Index NLJ 内层有索引、低选择率 需要预建索引

三、Sort-Merge Join:有序数据的利器

算法概述

Sort-Merge Join 分两个阶段:

  1. Sort 阶段:分别对 R 和 S 按连接键排序(如果尚未排序)
  2. Merge 阶段:两个有序序列做归并
-- Phase 1: Sort
sort R on join key -> R_sorted
sort S on join key -> S_sorted

-- Phase 2: Merge
r = first tuple in R_sorted
s = first tuple in S_sorted
while r != EOF and s != EOF:
    if r.key < s.key:
        advance r
    else if r.key > s.key:
        advance s
    else:
        -- r.key == s.key, output all matching pairs
        output all (r_i, s_j) where r_i.key == s_j.key == r.key
        advance past current key group

External Sort 的成本

当数据无法全部放入内存时,需要外部排序。使用 B 个缓冲页:

Pass 0(初始运行生成):读入 B 页数据,内存中排序,写出。产生 ⌈M/B⌉ 个有序运行(sorted runs),每个长 B 页。

后续 Pass(多路归并):每次将 B-1 个运行合并为一个更大的运行(1 页用于输出)。需要 ⌈log_{B-1}(⌈M/B⌉)⌉ 趟归并。

排序总成本

Sort(M) = 2 * M * (1 + ⌈log_{B-1}(⌈M/B⌉)⌉)

每趟读写各一次,所以乘以 2。

Sort-Merge Join 总成本

Cost = Sort(M) + Sort(N) + M + N
     = 2M * (1 + ⌈log_{B-1}(⌈M/B⌉)⌉) + 2N * (1 + ⌈log_{B-1}(⌈N/B⌉)⌉) + M + N

最后的 M + N 是 merge 阶段的一趟扫描。

特殊情况:如果数据已经按连接键排序(比如连接键是聚簇索引的键),排序阶段免费,总成本就是 M + N——线性的。

数值实例

M = 1000,N = 500,B = 100:

Sort(1000) = 2 * 1000 * (1 + ⌈log_99(⌈1000/100⌉)⌉)
           = 2000 * (1 + ⌈log_99(10)⌉)
           = 2000 * (1 + 1)
           = 4000

Sort(500)  = 2 * 500 * (1 + ⌈log_99(⌈500/100⌉)⌉)
           = 1000 * (1 + ⌈log_99(5)⌉)
           = 1000 * (1 + 1)
           = 2000

Total = 4000 + 2000 + 1000 + 500 = 7500 次 I/O

Sort-Merge Join 的优势

  1. 已排序输入:如果 R 或 S 已经按连接键排序(例如通过聚簇索引),排序阶段为零成本
  2. 范围连接R.a BETWEEN S.b AND S.c 这种非等值连接,Sort-Merge 仍然适用,而 Hash Join 不行
  3. 排序结果可复用:排序后的数据可以服务后续的 ORDER BY、GROUP BY 或另一个 Sort-Merge Join
  4. 最坏情况保证:成本是确定性的 O(M log M + N log N),不像 Hash Join 可能因为数据倾斜而退化
-- 当查询同时需要排序时,Sort-Merge Join 可以"免费"提供排序结果
SELECT o.id, c.name, o.amount
FROM orders o JOIN customers c ON o.customer_id = c.id
ORDER BY o.customer_id;  -- Sort-Merge 的排序结果正好满足这个 ORDER BY

四、Classic Hash Join:分区与探测

基本思想

Hash Join 的核心思想极其简单:用哈希表避免笛卡尔积。

-- Phase 1: Build(构建哈希表)
hash_table = {}
for each tuple s in S:    -- S 是较小的表(build side)
    key = hash(s.join_key)
    hash_table[key].append(s)

-- Phase 2: Probe(探测)
for each tuple r in R:    -- R 是较大的表(probe side)
    key = hash(r.join_key)
    for each tuple s in hash_table[key]:
        if r.join_key == s.join_key:
            output (r, s)

如果 S 能完全放入内存(即 M_S <= B - 2),这就是一个简单哈希连接(Simple Hash Join),成本仅为 M + N——读 S 建表,读 R 探测,各扫描一次。

但如果 S 太大,放不进内存呢?

Grace Hash Join

Grace Hash Join(以日本筑波的 Grace 机器命名)使用分区来处理超出内存的情况。

Phase 1: Partition(分区)

用哈希函数 h1 将 R 和 S 各分成 B-1 个分区:

-- 分配 B-1 个输出缓冲页(每个分区一个),1 个输入缓冲页
for each tuple r in R:
    i = h1(r.join_key) mod (B-1)
    write r to partition R_i

for each tuple s in S:
    i = h1(s.join_key) mod (B-1)
    write s to partition S_i

关键性质:如果 r.join_key == s.join_key,那么 r 和 s 一定在同一个分区号中。因此,只需要对应分区之间做 Join,不同分区之间不会有匹配。

Phase 2: Build & Probe(逐分区连接)

for i = 0 to B-2:
    -- 将 S_i 读入内存,构建哈希表(使用不同的哈希函数 h2)
    hash_table = build_hash_table(S_i, h2)
    -- 扫描 R_i,在哈希表中查找匹配
    for each tuple r in R_i:
        probe hash_table with h2(r.join_key)
        output matches

每个分区 S_i 的大小大约是 N / (B-1)。只要 N / (B-1) <= B - 2,即 B > sqrt(N) + 1,每个分区就能放进内存。

I/O 成本

Cost = 2 * (M + N) + (M + N)
     = 3 * (M + N)

分区阶段:读 R 一次、写 R 一次;读 S 一次、写 S 一次。共 2(M + N)。 连接阶段:读所有 R 分区和 S 分区各一次。共 M + N

对于我们的例子:

Cost = 3 * (1000 + 500) = 4500 次 I/O

比 Block NLJ 的 6500 更好,而且几乎不受缓冲区大小影响(只要 B > sqrt(N))。

为什么总是用小表做 Build Side

build side(构建哈希表的表)应该是较小的表。原因很简单:哈希表有额外的空间开销(指针、桶数组),实际需要的内存约为数据大小的 1.2-2 倍。选择小表可以最小化分区数,从而最小化 I/O。

五、Hybrid Hash Join:内存优化的艺术

Classic Hash Join 的浪费

Grace Hash Join 总是将所有数据写到磁盘再读回来,即使有足够的内存容纳一部分数据。当 B 明显大于 sqrt(N) 时,这种浪费尤其明显。

Hybrid Hash Join 的优化

核心思想:在分区阶段,将第一个分区直接保留在内存中,不写到磁盘。

Phase 1: Partition
    -- 为分区 0 的 S 元组在内存中构建哈希表
    -- 其他分区 1..k-1 正常写到磁盘
    in_memory_ht = {}
    for each tuple s in S:
        i = h1(s.join_key) mod k
        if i == 0:
            in_memory_ht.insert(s)   -- 直接建哈希表,不落盘
        else:
            write s to partition S_i

    for each tuple r in R:
        i = h1(r.join_key) mod k
        if i == 0:
            probe in_memory_ht       -- 直接探测,不落盘
            output matches
        else:
            write r to partition R_i

Phase 2: Build & Probe for partitions 1..k-1
    -- 与 Grace Hash Join 相同

成本分析

设分区 0 占总数据的比例为 f = 1/k。分区 0 不写磁盘:

Cost = (M + N)                          -- 读入所有数据
     + 2 * (1-f) * (M + N)             -- 非第 0 分区的写出和读回
     = (M + N) * (1 + 2*(1-f))
     = (M + N) * (3 - 2/k)

当 k 很大(缓冲区小)时,f 接近 0,成本趋近于 3(M+N)——与 Grace 相同。 当 k 很小(缓冲区大到可以装下大部分 S)时,f 接近 1,成本趋近于 (M+N)——与 Simple Hash Join 相同。

Hybrid Hash Join 平滑地在两个极端之间过渡:

B 很大(B >= N + 2): Cost = M + N           -- 完全内存 Join
B 适中:                Cost = (M+N) * (3-2/k) -- 部分内存,部分磁盘
B 刚够(B ~ sqrt(N)):  Cost ≈ 3(M+N)         -- 全磁盘 Grace Join

分区数量的选择

需要满足两个约束:

  1. 每个磁盘分区的 S 部分必须能放进内存:N * (1-f) / (k-1) <= B - k
  2. 内存中的分区 0 的哈希表不能超过可用内存:N * f * overhead <= B_reserved

实际实现中,通常根据 S 的大小和可用内存动态计算 k,让分区 0 尽可能大。

六、Hash Join 的边界情况与高级技巧

数据倾斜(Skew)

当连接键的分布极不均匀时,某些分区可能远大于其他分区。例如,如果 90% 的订单来自 10% 的客户,那么这 10% 客户对应的分区将包含 90% 的数据。

一个巨大的分区可能无法放进内存,导致 Hash Join 失败或退化。

解决方案一:递归分区(Recursive Partitioning)

function join_partition(R_i, S_i, depth):
    if S_i fits in memory:
        -- 正常 build & probe
        build hash table on S_i
        probe with R_i
    else:
        -- S_i 太大,用新的哈希函数再次分区
        re-partition R_i and S_i with h_{depth+1}
        for each sub-partition j:
            join_partition(R_i_j, S_i_j, depth + 1)

递归分区的最坏情况是所有键完全相同——此时无论怎么分区都无法减小分区大小。对于这种情况,只能回退到 Nested Loop Join。

解决方案二:倾斜检测与特殊处理

-- 分区后检查每个分区的大小
for each partition i:
    if size(S_i) > B * threshold:
        -- 标记为倾斜分区
        -- 使用 Block NLJ 或 递归分区处理
        handle_skewed_partition(R_i, S_i)
    else:
        normal_hash_join(R_i, S_i)

Bloom Filter 预过滤

在分区阶段,对 build side 构建一个 Bloom filter。在 probe 阶段,先用 Bloom filter 过滤 R 的元组——如果 Bloom filter 说”不存在”,那就一定不存在,可以跳过。

-- Build 阶段
bloom_filter = new BloomFilter(expected_size)
for each tuple s in S:
    bloom_filter.add(s.join_key)
    hash_table.insert(s)

-- Probe 阶段
for each tuple r in R:
    if not bloom_filter.might_contain(r.join_key):
        continue    -- 一定没有匹配,跳过
    -- 可能有匹配,正常探测
    probe hash_table with r.join_key

当连接的选择率很低(即大部分 R 的行在 S 中没有匹配)时,Bloom filter 可以过滤掉大量的哈希表探测,显著减少 CPU 开销。在分布式 Join 中,Bloom filter 还可以在网络传输前过滤数据,减少数据 shuffle 量。

分区溢出处理

当内存不足以容纳某个分区时的处理策略:

策略一:溢出到磁盘
    当某个分区的输出缓冲满了,刷到磁盘
    分区阶段结束后,溢出的分区需要额外的 I/O 读回

策略二:动态调整分区数
    监控每个分区的大小
    当某个分区增长过快,拆分为多个子分区

策略三:预留 victim buffer
    保留少量缓冲页作为溢出缓冲
    当分区缓冲不够时,先驱逐(evict)最大的分区到磁盘

七、向量化执行:列式 Hash Join

传统火山模型的问题

传统数据库使用火山模型(Volcano / Iterator Model):每个算子每次产出一行。这意味着每处理一行就要经过多层虚函数调用,对 CPU 缓存极不友好。

-- 火山模型:一次一行
class HashJoinOperator:
    def next():
        while true:
            r = probe_child.next()    -- 虚函数调用
            if r is EOF: return EOF
            matches = hash_table.probe(r.join_key)
            if matches:
                return concatenate(r, matches[0])

列式向量化(DuckDB 方式)

现代分析型数据库(如 DuckDB、ClickHouse)使用向量化执行:每次处理一批(vector)数据,通常 1024 或 2048 行。

// 向量化 Hash Join 的 Probe 阶段
void hash_join_probe_vectorized(
    Vector *join_keys,      // 输入:一批 join key
    Vector *hash_table,     // 哈希表
    SelectionVector *result // 输出:匹配的行号
) {
    uint64_t hashes[VECTOR_SIZE];
    uint32_t entries[VECTOR_SIZE];

    // Step 1: 批量计算哈希值
    hash_vector(join_keys, hashes, VECTOR_SIZE);

    // Step 2: 批量查找桶位置
    for (int i = 0; i < VECTOR_SIZE; i++) {
        entries[i] = hash_table->buckets[hashes[i] & mask];
    }

    // Step 3: 批量比较键值
    int match_count = 0;
    for (int i = 0; i < VECTOR_SIZE; i++) {
        while (entries[i] != EMPTY) {
            if (compare_keys(join_keys, i, hash_table, entries[i])) {
                result->set(match_count++, i, entries[i]);
            }
            entries[i] = hash_table->next[entries[i]];
        }
    }
}

向量化的优势:

  1. 减少虚函数开销:每 1024 行才调用一次函数,而非每行
  2. SIMD 友好:批量哈希计算和键比较可以使用 SIMD 指令
  3. 缓存友好:列式存储的连接键在内存中连续,预取效率高
  4. 编译器优化:紧凑的循环体更容易被编译器自动向量化

列存中的 Hash Join 优化

传统行存的 Hash Join:
    哈希表中存储完整行 -> 每行大小不固定,缓存利用率低

列存的 Hash Join:
    哈希表只存储 row_id 和 join_key -> 固定大小条目
    匹配后通过 row_id 延迟物化(late materialization)其他列
    只有最终需要的列才会被读取

延迟物化(Late Materialization)是列存 Hash Join 的关键优化。如果查询只需要 R 的 3 列和 S 的 2 列,传统行存需要在哈希表中存储 S 的所有列,而列存只需要存储连接键和 row_id,匹配后再按需取其他列。

八、并行 Hash Join:分区与共享

Partitioned Parallel Hash Join

每个线程处理不同的分区,天然无锁:

Phase 1: Parallel Partition
    -- 每个线程对自己负责的 R 和 S 分片进行分区
    Thread i:
        for each tuple in my_chunk_of_R:
            partition into local buffers
        for each tuple in my_chunk_of_S:
            partition into local buffers
    -- barrier --
    -- 合并同一分区号的数据

Phase 2: Parallel Build & Probe
    -- 每个线程处理一个或多个分区
    Thread i:
        for each partition assigned to me:
            build hash table on S_partition
            probe with R_partition

优点:无锁,扩展性好。 缺点:分区阶段本身有开销;分区不均可能导致负载不均衡。

Shared Hash Table Parallel Join

所有线程共享同一个哈希表:

Phase 1: Parallel Build
    -- 所有线程并发向同一个哈希表插入 S 的元组
    Thread i:
        for each tuple s in my_chunk_of_S:
            bucket = hash(s.join_key) & mask
            -- 需要原子操作或分桶锁
            atomic_insert(hash_table[bucket], s)
    -- barrier --

Phase 2: Parallel Probe
    -- 所有线程并发读哈希表(只读,天然线程安全)
    Thread i:
        for each tuple r in my_chunk_of_R:
            probe hash_table with r.join_key

优点:不需要分区阶段,更简单。Probe 阶段只读,天然并行。 缺点:Build 阶段需要同步(原子操作或锁),在高并发下可能成为瓶颈。

NUMA 感知

在多路服务器上,内存访问并不均匀。访问本地 NUMA 节点的内存比远端快 2-3 倍。

NUMA 感知的 Hash Join 策略:

1. 分区阶段:每个 NUMA 节点处理本地数据的分区
2. 数据放置:确保每个分区的数据在处理它的 NUMA 节点上
3. 哈希表构建:在本地内存中分配哈希表
4. 探测阶段:尽量让探测线程访问本地的哈希表分区

核心原则:计算跟着数据走,而不是数据跟着计算走。

现代数据库如 HyPer、Umbra 在 Join 实现中显式考虑 NUMA 拓扑,通过 numactlmbind 系统调用控制内存分配。

两种方案的对比

维度 分区并行 共享哈希表
扩展性 优秀,接近线性 受限于原子操作
缓存局部性 好(每个线程操作自己的分区) 较差(多线程竞争缓存行)
NUMA 友好 是(数据局部性好) 否(跨节点访问频繁)
实现复杂度 较高(需要分区合并) 较低
小数据量 分区开销可能不值得 更合适
大数据量 更合适 原子操作开销大

九、C 语言实现:完整的 Hash Join

下面是一个完整的 Hash Join 实现,包括分区和探测两个阶段,以及基本的内存管理。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <time.h>

/* ----------------------------------------------------------------
 * 数据结构定义
 * ---------------------------------------------------------------- */

typedef struct {
    int32_t key;
    int32_t payload;
} Tuple;

typedef struct HashEntry {
    Tuple             tuple;
    struct HashEntry *next;
} HashEntry;

typedef struct {
    HashEntry **buckets;
    uint32_t    num_buckets;
    uint32_t    mask;           /* num_buckets - 1, 要求 num_buckets 是 2 的幂 */
    uint32_t    count;
} HashTable;

typedef struct {
    Tuple   *data;
    uint32_t size;
    uint32_t capacity;
} Partition;

typedef struct {
    int32_t r_key;
    int32_t r_payload;
    int32_t s_key;
    int32_t s_payload;
} JoinResult;

/* ----------------------------------------------------------------
 * 哈希函数:使用 murmur-like 混合
 * ---------------------------------------------------------------- */

static inline uint32_t hash_key(int32_t key)
{
    uint32_t h = (uint32_t)key;
    h ^= h >> 16;
    h *= 0x45d9f3b;
    h ^= h >> 16;
    h *= 0x45d9f3b;
    h ^= h >> 16;
    return h;
}

/* ----------------------------------------------------------------
 * 哈希表操作
 * ---------------------------------------------------------------- */

static HashTable *ht_create(uint32_t expected_size)
{
    HashTable *ht = (HashTable *)malloc(sizeof(HashTable));
    /* 取大于 expected_size * 2 的最小 2 的幂 */
    uint32_t nb = 1;
    while (nb < expected_size * 2)
        nb <<= 1;
    ht->num_buckets = nb;
    ht->mask = nb - 1;
    ht->count = 0;
    ht->buckets = (HashEntry **)calloc(nb, sizeof(HashEntry *));
    return ht;
}

static void ht_insert(HashTable *ht, const Tuple *t)
{
    uint32_t idx = hash_key(t->key) & ht->mask;
    HashEntry *entry = (HashEntry *)malloc(sizeof(HashEntry));
    entry->tuple = *t;
    entry->next = ht->buckets[idx];
    ht->buckets[idx] = entry;
    ht->count++;
}

static void ht_destroy(HashTable *ht)
{
    for (uint32_t i = 0; i < ht->num_buckets; i++) {
        HashEntry *e = ht->buckets[i];
        while (e) {
            HashEntry *tmp = e;
            e = e->next;
            free(tmp);
        }
    }
    free(ht->buckets);
    free(ht);
}

/* ----------------------------------------------------------------
 * 分区操作
 * ---------------------------------------------------------------- */

static void partition_init(Partition *p, uint32_t capacity)
{
    p->data = (Tuple *)malloc(capacity * sizeof(Tuple));
    p->size = 0;
    p->capacity = capacity;
}

static void partition_append(Partition *p, const Tuple *t)
{
    if (p->size >= p->capacity) {
        p->capacity *= 2;
        p->data = (Tuple *)realloc(p->data, p->capacity * sizeof(Tuple));
    }
    p->data[p->size++] = *t;
}

static void partition_free(Partition *p)
{
    free(p->data);
    p->data = NULL;
    p->size = 0;
    p->capacity = 0;
}

/* ----------------------------------------------------------------
 * Grace Hash Join 主流程
 * ---------------------------------------------------------------- */

typedef struct {
    JoinResult *results;
    uint32_t    size;
    uint32_t    capacity;
} ResultBuffer;

static void result_append(ResultBuffer *buf, int32_t rk, int32_t rp,
                           int32_t sk, int32_t sp)
{
    if (buf->size >= buf->capacity) {
        buf->capacity = buf->capacity ? buf->capacity * 2 : 1024;
        buf->results = (JoinResult *)realloc(
            buf->results, buf->capacity * sizeof(JoinResult));
    }
    JoinResult *jr = &buf->results[buf->size++];
    jr->r_key     = rk;
    jr->r_payload = rp;
    jr->s_key     = sk;
    jr->s_payload = sp;
}

uint32_t hash_join(const Tuple *R, uint32_t r_size,
                   const Tuple *S, uint32_t s_size,
                   uint32_t num_partitions,
                   ResultBuffer *output)
{
    /* --- Phase 1: Partition --- */
    Partition *r_parts = (Partition *)malloc(num_partitions * sizeof(Partition));
    Partition *s_parts = (Partition *)malloc(num_partitions * sizeof(Partition));

    uint32_t est = (r_size > s_size ? r_size : s_size) / num_partitions + 64;
    for (uint32_t i = 0; i < num_partitions; i++) {
        partition_init(&r_parts[i], est);
        partition_init(&s_parts[i], est);
    }

    for (uint32_t i = 0; i < r_size; i++) {
        uint32_t p = hash_key(R[i].key) % num_partitions;
        partition_append(&r_parts[p], &R[i]);
    }
    for (uint32_t i = 0; i < s_size; i++) {
        uint32_t p = hash_key(S[i].key) % num_partitions;
        partition_append(&s_parts[p], &S[i]);
    }

    /* --- Phase 2: Build & Probe per partition --- */
    uint32_t total_matches = 0;

    for (uint32_t p = 0; p < num_partitions; p++) {
        Partition *sp = &s_parts[p];
        Partition *rp = &r_parts[p];

        if (sp->size == 0 || rp->size == 0) {
            continue;
        }

        /* Build 哈希表(在 S 分区上) */
        HashTable *ht = ht_create(sp->size);
        for (uint32_t i = 0; i < sp->size; i++) {
            ht_insert(ht, &sp->data[i]);
        }

        /* Probe(在 R 分区上) */
        for (uint32_t i = 0; i < rp->size; i++) {
            uint32_t idx = hash_key(rp->data[i].key) & ht->mask;
            HashEntry *entry = ht->buckets[idx];
            while (entry) {
                if (entry->tuple.key == rp->data[i].key) {
                    result_append(output,
                                  rp->data[i].key, rp->data[i].payload,
                                  entry->tuple.key, entry->tuple.payload);
                    total_matches++;
                }
                entry = entry->next;
            }
        }

        ht_destroy(ht);
    }

    /* 清理分区 */
    for (uint32_t i = 0; i < num_partitions; i++) {
        partition_free(&r_parts[i]);
        partition_free(&s_parts[i]);
    }
    free(r_parts);
    free(s_parts);

    return total_matches;
}

/* ----------------------------------------------------------------
 * 测试与基准
 * ---------------------------------------------------------------- */

static Tuple *generate_relation(uint32_t size, uint32_t key_range)
{
    Tuple *rel = (Tuple *)malloc(size * sizeof(Tuple));
    for (uint32_t i = 0; i < size; i++) {
        rel[i].key     = rand() % key_range;
        rel[i].payload = (int32_t)i;
    }
    return rel;
}

int main(void)
{
    srand((unsigned)time(NULL));

    const uint32_t R_SIZE     = 1000000;
    const uint32_t S_SIZE     = 500000;
    const uint32_t KEY_RANGE  = 200000;
    const uint32_t NUM_PARTS  = 128;

    printf("Generating relations: R=%u tuples, S=%u tuples\n",
           R_SIZE, S_SIZE);

    Tuple *R = generate_relation(R_SIZE, KEY_RANGE);
    Tuple *S = generate_relation(S_SIZE, KEY_RANGE);

    ResultBuffer output = { NULL, 0, 0 };

    printf("Running Grace Hash Join with %u partitions...\n", NUM_PARTS);

    clock_t start = clock();
    uint32_t matches = hash_join(R, R_SIZE, S, S_SIZE, NUM_PARTS, &output);
    clock_t end = clock();

    double elapsed = (double)(end - start) / CLOCKS_PER_SEC;
    printf("Done: %u matches in %.3f seconds\n", matches, elapsed);
    printf("Throughput: %.2f M tuples/sec\n",
           (double)(R_SIZE + S_SIZE) / elapsed / 1e6);

    free(output.results);
    free(R);
    free(S);
    return 0;
}

编译和运行

gcc -O2 -o hash_join hash_join.c -lm
./hash_join

典型输出:

Generating relations: R=1000000 tuples, S=500000 tuples
Running Grace Hash Join with 128 partitions...
Done: 2498317 matches in 0.487 seconds
Throughput: 3.08 M tuples/sec

实现要点

  1. 哈希函数:使用 murmur 风格的位混合,确保分布均匀。分区用的哈希和探测用的哈希不需要是不同的函数——这里我们用同一个 hash_key 但在分区时取模 num_partitions,在探测时取模 num_buckets,效果相当于两个不同的哈希
  2. 内存管理:分区使用动态数组,避免预先精确估算。生产代码中通常使用预分配的内存池
  3. 分区数选择num_partitions 应该让每个分区的 S 部分能完全放入 L2/L3 缓存。对于 8MB L3 缓存,每个分区约 64KB 是合适的

十、基准对比:三种算法的性能画像

Join 算法 I/O 成本对比

不同场景下的 I/O 成本对比

以下假设 |R| = 1000 页,|S| = 500 页:

场景 Buffer Block NLJ Hash Join Sort-Merge Hybrid HJ 胜者
小缓冲区 B=10 51,500 4,500 10,300 3,300 Hybrid HJ
中缓冲区 B=100 6,500 4,500 7,500 2,700 Hybrid HJ
大缓冲区 B=500 2,500 1,500 4,500 1,500 HJ / HHJ
预排序输入 B=100 6,500 4,500 1,500 2,700 Sort-Merge
有索引 B=100 2,000* 4,500 7,500 2,700 Index NLJ
极低选择率 B=100 1,200* 4,500 7,500 2,700 Index NLJ
非等值连接 B=100 6,500 N/A 7,500 N/A Block NLJ
重复键多 B=100 6,500 4,500+ 7,500 2,700+ 取决于倾斜度

*带索引的 NLJ 假设 B+ 树高度 = 3,选择率 = 0.01

不同选择率下的行为

选择率(selectivity)= 匹配行数 / (|R| * |S|)

高选择率(>10%):大量匹配,输出成本主导
    -> Hash Join 和 Sort-Merge Join 各有千秋
    -> NLJ 一定很慢

低选择率(<0.1%):极少匹配
    -> Index NLJ 最优(只访问匹配行)
    -> Hash Join 仍需要读全表
    -> Bloom filter 加持的 Hash Join 可以接近 Index NLJ

中等选择率(0.1%-10%):
    -> Hash Join 通常最优
    -> Sort-Merge Join 在输入已排序时竞争

CPU 缓存效应

在内存数据库中(没有磁盘 I/O),CPU 缓存成为性能瓶颈:

算法 L1 缓存友好 L2/L3 友好 TLB 友好
NLJ 差(随机访问内层)
Sort-Merge 好(顺序扫描)
Hash Join(无分区) 差(随机探测)
Hash Join(分区后) 好(分区适配缓存) 较好
Radix Hash Join 最好(多轮分区适配各级缓存) 最好 最好

Radix Hash Join(多轮基数分区)是专门为现代 CPU 缓存层级设计的算法,在纯内存场景下通常比 Grace Hash Join 快 2-3 倍。

十一、真实系统中的 Join 实现

PostgreSQL: hashjoin.c

PostgreSQL 的 Hash Join 实现是教科书级别的参考,位于 src/backend/executor/nodeHashjoin.c

关键设计:

1. 批次(Batch)概念:相当于 Grace Hash Join 的分区
   - 初始批次数根据 work_mem 和估计的 inner 表大小动态计算
   - 如果运行时发现某个批次太大,会增加批次数(doubling)

2. 内存管理:
   - 使用 work_mem 控制哈希表的内存上限(默认 4MB)
   - 当内存不够时,将多余的批次写到临时文件

3. Skew 优化:
   - 检测 inner 表中的高频值(Most Common Values,来自 pg_statistic)
   - 高频值的元组放在单独的 skew bucket 中,避免大分区

4. 并行 Hash Join(PostgreSQL 11+):
   - 多个 worker 并行构建共享哈希表
   - 使用 Barrier 同步 build 和 probe 阶段
-- 查看 PostgreSQL 选择的 Join 算法
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id;

-- 典型输出
Hash Join  (cost=3175.00..51487.50 rows=1000000 width=48)
  Hash Cond: (o.customer_id = c.id)
  -> Seq Scan on orders o  (cost=0.00..16370.00 rows=1000000 width=24)
  -> Hash  (cost=1925.00..1925.00 rows=100000 width=24)
        Buckets: 131072  Batches: 1  Memory Usage: 5157kB
        -> Seq Scan on customers c  (cost=0.00..1925.00 rows=100000 width=24)

MySQL: Block Nested Loop 和 Batched Key Access

MySQL 的 Join 策略历史上以 Nested Loop 为主:

MySQL 8.0 之前:
    - Simple NLJ + Index NLJ 为主
    - Block Nested Loop(BNL)用于无索引的情况
    - 没有原生的 Hash Join

MySQL 8.0.18+:
    - 引入了 Hash Join,用于等值连接且无索引的情况
    - BNL 被逐步替换为 Hash Join

MySQL 的 Batched Key Access (BKA):
    - 收集一批外层表的连接键
    - 对内层表的索引做批量查找(Multi-Range Read, MRR)
    - 利用排序将随机 I/O 转为顺序 I/O
-- MySQL 中开启 BKA
SET optimizer_switch='mrr=on,mrr_cost_based=off,batched_key_access=on';

EXPLAIN FORMAT=TREE
SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id;

TiDB: 分布式 Join 策略

TiDB 作为分布式数据库,面临的 Join 挑战更加复杂——数据分布在多个节点上。

TiDB 的三种分布式 Join 策略:

1. Broadcast Join:
   - 将小表广播到所有节点
   - 每个节点本地做 Hash Join
   - 适用场景:一个表很小(< broadcast_threshold)

2. Shuffle Hash Join:
   - 按连接键对两个表做 hash shuffle
   - 相同哈希值的数据发送到同一个节点
   - 每个节点本地做 Hash Join
   - 适用场景:两个大表

3. Index Join(类似 Index NLJ 的分布式版本):
   - 逐批从外层表取数据
   - 用连接键在内层表的索引上查找
   - 适用场景:外层表较小或有高选择性的过滤条件
-- TiDB 中通过 Hint 指定 Join 策略
SELECT /*+ BROADCAST_JOIN(c) */ *
FROM orders o
JOIN customers c ON o.customer_id = c.id;

SELECT /*+ SHUFFLE_JOIN(o, c) */ *
FROM orders o
JOIN customers c ON o.customer_id = c.id;

分布式 Join 的核心挑战在于网络传输。一次网络往返(100us-1ms)比一次内存访问(100ns)慢了 3-4 个数量级。因此,减少数据 shuffle 量——通过 Bloom filter 预过滤、选择合适的 Join 策略、colocated join——是分布式查询优化的关键。

十二、工程实践:踩坑表与经验总结

工程陷阱一览

编号 陷阱 症状 解法
1 哈希函数质量差 分区严重不均,某些分区溢出 使用 murmur3、xxhash 等高质量哈希函数;避免简单取模
2 build side 选错 大表放在 build side,内存爆炸 查询优化器估算两表大小,选小表做 build side;运行时检测并切换
3 NULL 值处理遗漏 NULL = NULL 返回 true,产生错误结果 SQL 标准规定 NULL 不等于任何值(包括 NULL);Join 时显式跳过 NULL
4 基数估计严重失准 优化器选了错误的 Join 算法 使用直方图、HyperLogLog 改进基数估计;引入自适应执行
5 work_mem 设置过小 Hash Join 产生大量磁盘临时文件 监控临时文件使用量;适当增大 work_mem(但注意并发连接的总内存)
6 未考虑数据倾斜 个别分区超大,Join 时间远超预期 运行时倾斜检测 + 特殊处理;预先分析 MCV(Most Common Values)
7 字符串连接键的哈希开销 变长字符串的哈希计算成本被低估 如果连接键是长字符串,考虑预计算哈希值;或将字符串映射为整数 ID
8 并行 Join 的同步开销 线程数增加但性能不升反降 分析 barrier 等待和锁竞争;适当减少线程数或切换到分区并行
9 忽略输出成本 Join 结果集爆炸式增长(多对多连接) 提前估算结果集大小;考虑 semi-join 或 EXISTS 替代
10 SSD 上的算法选择 沿用针对 HDD 优化的算法,性能不理想 SSD 的随机读成本远低于 HDD;Index NLJ 在 SSD 上更有优势

我的一些看法

关于 Hash Join 与 Sort-Merge Join 的选择。在 OLTP 系统中,Hash Join 几乎总是赢家——因为 OLTP 查询涉及的数据量小,Hash Join 的常数因子更低。在 OLAP 系统中,Sort-Merge Join 仍然有其位置:当查询需要排序结果、当数据已经按连接键聚簇、当连接是范围连接时,Sort-Merge 都是更好的选择。

关于内存数据库中的 Join。当所有数据都在内存中时,I/O 成本模型不再适用。CPU 缓存局部性成为性能的决定因素。Radix Hash Join(多轮分区,每轮分区适配一级缓存)在学术基准测试中表现最好,但工程实现复杂。实际上,大多数内存数据库(如 MemSQL/SingleStore)仍然使用传统的分区 Hash Join,只是更注重内存分配和 SIMD 优化。

关于分布式 Join。分布式系统中的 Join 本质上是一个数据放置(data placement)问题。如果两个经常 Join 的表按相同的键做了 co-partition(共同分区),那么 Join 可以在每个节点本地完成,无需任何数据传输。这是为什么 TiDB、CockroachDB 等系统越来越重视”表的 co-location”特性。在设计 schema 时,选择合适的分区键,使常见的 Join 查询能够本地执行,可能比任何算法优化都更有效。

关于自适应执行。静态的查询优化器不可能总是做出最优选择——它依赖于统计信息,而统计信息可能过时或不准确。现代数据库越来越多地采用自适应执行:运行时监控 Join 的实际行为,如果发现选择率与估计差距过大,动态切换算法。例如,Oracle 的 Adaptive Join 会在运行时根据实际数据量决定使用 Hash Join 还是 Nested Loop Join。

关于硬件发展的影响。NVMe SSD 的随机读延迟已经降到 10us 以下,与内存的差距缩小到 2 个数量级。CXL(Compute Express Link)技术进一步模糊了本地内存和远端内存的界限。在这种硬件趋势下,传统的”最小化 I/O 次数”优化目标可能需要修正。未来的 Join 算法可能更关注带宽利用率而非 I/O 次数,更关注 SIMD 利用率而非算法复杂度。

附录:算法选择决策树

                       输入已排序?
                      /           \
                    是              否
                   /                 \
            Sort-Merge          有索引?
            (Cost: M+N)        /       \
                             是          否
                            /             \
                   选择率低?          等值连接?
                   /      \           /        \
                 是        否       是           否
                /           \     /              \
         Index NLJ     Block NLJ  内存够装 S?    Sort-Merge
         (最优)       (较好)     /        \       (范围连接)
                              是          否
                             /             \
                    Simple HJ          数据有倾斜?
                   (Cost: M+N)         /         \
                                     否           是
                                    /              \
                            Grace/Hybrid HJ    递归分区 HJ
                         (Cost: ~3(M+N))      + Bloom filter

附录:进一步阅读

  1. Shapiro, L. D. “Join Processing in Database Systems with Large Main Memories.” ACM Computing Surveys, 1986. – Hash Join 的经典综述
  2. Graefe, G. “Sort-Merge-Join: An Idea Whose Time Has(h) Passed?” ICDE, 1994. – Sort-Merge 与 Hash Join 的深入对比
  3. Balkesen, C. et al. “Main-Memory Hash Joins on Multi-Core CPUs: Tuning to the Underlying Hardware.” ICDE, 2013. – 现代硬件上的并行 Hash Join
  4. Leis, V. et al. “Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework.” SIGMOD, 2014. – HyPer 的并行执行框架
  5. Kersten, T. et al. “Everything You Always Wanted to Know About Compiled and Vectorized Queries But Were Afraid to Ask.” VLDB, 2018. – 编译执行 vs 向量化执行

上一篇: 定时器算法 下一篇: 查询优化器

相关阅读: - B-tree 深度解剖 - MVCC 实现变体全解


By .