Join 算法全解:Nested Loop、Hash Join、Sort-Merge Join
数据库中最昂贵的操作,可能就是 Join。
一条看似简单的 SQL:
SELECT *
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.amount > 1000;如果 orders 表有 1000 万行,customers 表有 100 万行,朴素的嵌套循环需要 10 万亿次比较。即使每次比较只要 1 纳秒,也需要 10000 秒——将近 3 小时。
但实际上,任何一个现代数据库都能在几秒内完成这个查询。原因很简单:没有人会真的用朴素嵌套循环。从 System R 时代开始,数据库研究者就在不断优化 Join 算法。从 Nested Loop 到 Sort-Merge,从 Classic Hash Join 到 Hybrid Hash Join,再到现代的向量化执行和并行 Hash Join,每一步优化都在解决同一个核心问题:如何用最少的 I/O 和 CPU 代价,把两张表中匹配的行拼在一起。
这篇文章从成本模型讲起,逐一拆解每种 Join 算法的原理、代价公式和适用场景。最后给出一个完整的 C 语言 Hash Join 实现,以及真实数据库系统中的工程细节。
一、Join 问题的形式化定义与成本模型
Join 操作的本质
Join 的本质是笛卡尔积的子集。给定关系 R 和 S,以及连接谓词 theta:
R ⋈_theta S = { (r, s) | r ∈ R, s ∈ S, theta(r, s) = true }
最常见的是等值连接(equi-join):R.a = S.b。范围连接(range
join)、半连接(semi-join)、反连接(anti-join)都是 Join
的变体,但等值连接是绝大多数优化技术的目标。
I/O 成本模型
在传统的磁盘数据库中,I/O 是瓶颈。我们用以下符号建立成本模型:
| 符号 | 含义 |
|---|---|
M |
关系 R 的页数(pages) |
N |
关系 S 的页数(pages) |
p_R |
每页 R 中的元组数 |
p_S |
每页 S 中的元组数 |
B |
可用的缓冲区页数(buffer pages) |
\|R\| |
R 的元组总数,= M * p_R |
\|S\| |
S 的元组总数,= N * p_S |
成本以 磁盘页面 I/O 次数 度量。我们忽略 CPU 成本——在机械硬盘时代,一次随机 I/O 大约 10ms,而一次内存比较不到 10ns,两者相差 6 个数量级。虽然 SSD 缩小了差距(随机读约 100us),但 I/O 仍然是主要矛盾。
为什么缓冲区大小决定一切
所有 Join 算法的 I/O 成本都是 B(缓冲区页数)的函数。缓冲区越大,能缓存的数据越多,I/O 越少。这是一个核心洞察:Join 算法的选择本质上是在给定内存预算下,如何最优地安排读写顺序。
成本 = f(M, N, B)
其中 B 是可用缓冲区页数。
当 B >= min(M, N) + 2 时,几乎所有算法都退化为简单的一趟扫描。
真正有趣的情况是 B << M 且 B << N。
二、Nested Loop Join:最朴素但最灵活
Simple Nested Loop Join
最直观的实现:对 R 中的每一行,扫描 S 中的每一行。
for each tuple r in R: -- 扫描 R 一次
for each tuple s in S: -- 每个 r 扫描 S 一次
if theta(r, s):
output (r, s)
I/O 成本:
Cost = M + |R| * N
= M + (M * p_R) * N
R 扫描一次花费 M 次 I/O。对于 R 中的每个元组(共
M * p_R 个),都要完整扫描 S 一次,每次 N 次
I/O。
如果 M = 1000,N = 500,p_R = 100:
Cost = 1000 + 1000 * 100 * 500 = 50,001,000 次 I/O
以每次 I/O 0.1ms(SSD)计算,需要 5000 秒。不可接受。
Block Nested Loop Join
核心优化:以页(block)为单位而非以元组为单位迭代外层关系。
for each block b_R of R (using B-2 pages):
for each page p_S of S:
for each tuple r in b_R:
for each tuple s in p_S:
if theta(r, s):
output (r, s)
分配 B-2 页给外层关系 R(1 页给 S 的输入,1 页给输出)。每次读入 B-2 页的 R,然后完整扫描 S。
I/O 成本:
Cost = M + ⌈M / (B-2)⌉ * N
R 仍然扫描一次。但现在 S 只需要扫描
⌈M / (B-2)⌉ 次,而不是 M * p_R
次。
同样的例子,B = 100:
Cost = 1000 + ⌈1000/98⌉ * 500
= 1000 + 11 * 500
= 6,500 次 I/O
从 5000 万降到 6500——改进了近 4 个数量级。
关键优化:永远把小表放在外层。如果
M < N,那么
⌈M/(B-2)⌉ * N < ⌈N/(B-2)⌉ * M。
Index Nested Loop Join
如果内层关系上有索引(B+ 树或哈希索引),可以用索引查找代替全表扫描。
for each tuple r in R:
-- 用 r.join_key 在 S 的索引上查找匹配
for each matching tuple s in S.index_lookup(r.join_key):
output (r, s)
I/O 成本:
Cost = M + |R| * C_index
其中 C_index 是一次索引查找的成本。对于 B+
树索引,C_index = h + 1(h 是树的高度,通常 2-4
层,加上一次数据页的读取)。对于哈希索引,C_index ≈ 1.2(平均
1.2 次 I/O)。
如果 R 有 1000 页,p_R = 100,B+ 树高度 = 3:
Cost = 1000 + 100,000 * 4 = 401,000 次 I/O
看起来不少,但如果 S 非常大(比如 100 万页),Index NLJ 可能比 Block NLJ 更好。而且,如果连接的选择率(selectivity)很低——即匹配的行很少——Index NLJ 的优势更加明显。
三种 NLJ 的适用场景
| 变体 | 最佳场景 | 限制 |
|---|---|---|
| Simple NLJ | 教学用途 | 几乎从不使用 |
| Block NLJ | 无索引、小到中等数据量 | 内存不足时 S 多次扫描 |
| Index NLJ | 内层有索引、低选择率 | 需要预建索引 |
三、Sort-Merge Join:有序数据的利器
算法概述
Sort-Merge Join 分两个阶段:
- Sort 阶段:分别对 R 和 S 按连接键排序(如果尚未排序)
- Merge 阶段:两个有序序列做归并
-- Phase 1: Sort
sort R on join key -> R_sorted
sort S on join key -> S_sorted
-- Phase 2: Merge
r = first tuple in R_sorted
s = first tuple in S_sorted
while r != EOF and s != EOF:
if r.key < s.key:
advance r
else if r.key > s.key:
advance s
else:
-- r.key == s.key, output all matching pairs
output all (r_i, s_j) where r_i.key == s_j.key == r.key
advance past current key group
External Sort 的成本
当数据无法全部放入内存时,需要外部排序。使用 B 个缓冲页:
Pass 0(初始运行生成):读入 B
页数据,内存中排序,写出。产生 ⌈M/B⌉
个有序运行(sorted runs),每个长 B 页。
后续 Pass(多路归并):每次将 B-1
个运行合并为一个更大的运行(1 页用于输出)。需要
⌈log_{B-1}(⌈M/B⌉)⌉ 趟归并。
排序总成本:
Sort(M) = 2 * M * (1 + ⌈log_{B-1}(⌈M/B⌉)⌉)
每趟读写各一次,所以乘以 2。
Sort-Merge Join 总成本
Cost = Sort(M) + Sort(N) + M + N
= 2M * (1 + ⌈log_{B-1}(⌈M/B⌉)⌉) + 2N * (1 + ⌈log_{B-1}(⌈N/B⌉)⌉) + M + N
最后的 M + N 是 merge 阶段的一趟扫描。
特殊情况:如果数据已经按连接键排序(比如连接键是聚簇索引的键),排序阶段免费,总成本就是
M + N——线性的。
数值实例
M = 1000,N = 500,B = 100:
Sort(1000) = 2 * 1000 * (1 + ⌈log_99(⌈1000/100⌉)⌉)
= 2000 * (1 + ⌈log_99(10)⌉)
= 2000 * (1 + 1)
= 4000
Sort(500) = 2 * 500 * (1 + ⌈log_99(⌈500/100⌉)⌉)
= 1000 * (1 + ⌈log_99(5)⌉)
= 1000 * (1 + 1)
= 2000
Total = 4000 + 2000 + 1000 + 500 = 7500 次 I/O
Sort-Merge Join 的优势
- 已排序输入:如果 R 或 S 已经按连接键排序(例如通过聚簇索引),排序阶段为零成本
- 范围连接:
R.a BETWEEN S.b AND S.c这种非等值连接,Sort-Merge 仍然适用,而 Hash Join 不行 - 排序结果可复用:排序后的数据可以服务后续的 ORDER BY、GROUP BY 或另一个 Sort-Merge Join
- 最坏情况保证:成本是确定性的
O(M log M + N log N),不像 Hash Join 可能因为数据倾斜而退化
-- 当查询同时需要排序时,Sort-Merge Join 可以"免费"提供排序结果
SELECT o.id, c.name, o.amount
FROM orders o JOIN customers c ON o.customer_id = c.id
ORDER BY o.customer_id; -- Sort-Merge 的排序结果正好满足这个 ORDER BY
四、Classic Hash Join:分区与探测
基本思想
Hash Join 的核心思想极其简单:用哈希表避免笛卡尔积。
-- Phase 1: Build(构建哈希表)
hash_table = {}
for each tuple s in S: -- S 是较小的表(build side)
key = hash(s.join_key)
hash_table[key].append(s)
-- Phase 2: Probe(探测)
for each tuple r in R: -- R 是较大的表(probe side)
key = hash(r.join_key)
for each tuple s in hash_table[key]:
if r.join_key == s.join_key:
output (r, s)
如果 S 能完全放入内存(即
M_S <= B - 2),这就是一个简单哈希连接(Simple
Hash Join),成本仅为 M + N——读 S
建表,读 R 探测,各扫描一次。
但如果 S 太大,放不进内存呢?
Grace Hash Join
Grace Hash Join(以日本筑波的 Grace 机器命名)使用分区来处理超出内存的情况。
Phase 1: Partition(分区)
用哈希函数 h1 将 R 和 S 各分成 B-1 个分区:
-- 分配 B-1 个输出缓冲页(每个分区一个),1 个输入缓冲页
for each tuple r in R:
i = h1(r.join_key) mod (B-1)
write r to partition R_i
for each tuple s in S:
i = h1(s.join_key) mod (B-1)
write s to partition S_i
关键性质:如果
r.join_key == s.join_key,那么 r 和 s
一定在同一个分区号中。因此,只需要对应分区之间做
Join,不同分区之间不会有匹配。
Phase 2: Build & Probe(逐分区连接)
for i = 0 to B-2:
-- 将 S_i 读入内存,构建哈希表(使用不同的哈希函数 h2)
hash_table = build_hash_table(S_i, h2)
-- 扫描 R_i,在哈希表中查找匹配
for each tuple r in R_i:
probe hash_table with h2(r.join_key)
output matches
每个分区 S_i 的大小大约是 N / (B-1)。只要
N / (B-1) <= B - 2,即
B > sqrt(N) + 1,每个分区就能放进内存。
I/O 成本:
Cost = 2 * (M + N) + (M + N)
= 3 * (M + N)
分区阶段:读 R 一次、写 R 一次;读 S 一次、写 S 一次。共
2(M + N)。 连接阶段:读所有 R 分区和 S
分区各一次。共 M + N。
对于我们的例子:
Cost = 3 * (1000 + 500) = 4500 次 I/O
比 Block NLJ 的 6500
更好,而且几乎不受缓冲区大小影响(只要
B > sqrt(N))。
为什么总是用小表做 Build Side
build side(构建哈希表的表)应该是较小的表。原因很简单:哈希表有额外的空间开销(指针、桶数组),实际需要的内存约为数据大小的 1.2-2 倍。选择小表可以最小化分区数,从而最小化 I/O。
五、Hybrid Hash Join:内存优化的艺术
Classic Hash Join 的浪费
Grace Hash Join
总是将所有数据写到磁盘再读回来,即使有足够的内存容纳一部分数据。当
B 明显大于 sqrt(N)
时,这种浪费尤其明显。
Hybrid Hash Join 的优化
核心思想:在分区阶段,将第一个分区直接保留在内存中,不写到磁盘。
Phase 1: Partition
-- 为分区 0 的 S 元组在内存中构建哈希表
-- 其他分区 1..k-1 正常写到磁盘
in_memory_ht = {}
for each tuple s in S:
i = h1(s.join_key) mod k
if i == 0:
in_memory_ht.insert(s) -- 直接建哈希表,不落盘
else:
write s to partition S_i
for each tuple r in R:
i = h1(r.join_key) mod k
if i == 0:
probe in_memory_ht -- 直接探测,不落盘
output matches
else:
write r to partition R_i
Phase 2: Build & Probe for partitions 1..k-1
-- 与 Grace Hash Join 相同
成本分析
设分区 0 占总数据的比例为 f = 1/k。分区 0
不写磁盘:
Cost = (M + N) -- 读入所有数据
+ 2 * (1-f) * (M + N) -- 非第 0 分区的写出和读回
= (M + N) * (1 + 2*(1-f))
= (M + N) * (3 - 2/k)
当 k 很大(缓冲区小)时,f 接近
0,成本趋近于 3(M+N)——与 Grace 相同。 当 k
很小(缓冲区大到可以装下大部分 S)时,f 接近 1,成本趋近于
(M+N)——与 Simple Hash Join 相同。
Hybrid Hash Join 平滑地在两个极端之间过渡:
B 很大(B >= N + 2): Cost = M + N -- 完全内存 Join
B 适中: Cost = (M+N) * (3-2/k) -- 部分内存,部分磁盘
B 刚够(B ~ sqrt(N)): Cost ≈ 3(M+N) -- 全磁盘 Grace Join
分区数量的选择
需要满足两个约束:
- 每个磁盘分区的 S
部分必须能放进内存:
N * (1-f) / (k-1) <= B - k - 内存中的分区 0
的哈希表不能超过可用内存:
N * f * overhead <= B_reserved
实际实现中,通常根据 S 的大小和可用内存动态计算 k,让分区 0 尽可能大。
六、Hash Join 的边界情况与高级技巧
数据倾斜(Skew)
当连接键的分布极不均匀时,某些分区可能远大于其他分区。例如,如果 90% 的订单来自 10% 的客户,那么这 10% 客户对应的分区将包含 90% 的数据。
一个巨大的分区可能无法放进内存,导致 Hash Join 失败或退化。
解决方案一:递归分区(Recursive Partitioning)
function join_partition(R_i, S_i, depth):
if S_i fits in memory:
-- 正常 build & probe
build hash table on S_i
probe with R_i
else:
-- S_i 太大,用新的哈希函数再次分区
re-partition R_i and S_i with h_{depth+1}
for each sub-partition j:
join_partition(R_i_j, S_i_j, depth + 1)
递归分区的最坏情况是所有键完全相同——此时无论怎么分区都无法减小分区大小。对于这种情况,只能回退到 Nested Loop Join。
解决方案二:倾斜检测与特殊处理
-- 分区后检查每个分区的大小
for each partition i:
if size(S_i) > B * threshold:
-- 标记为倾斜分区
-- 使用 Block NLJ 或 递归分区处理
handle_skewed_partition(R_i, S_i)
else:
normal_hash_join(R_i, S_i)
Bloom Filter 预过滤
在分区阶段,对 build side 构建一个 Bloom filter。在 probe 阶段,先用 Bloom filter 过滤 R 的元组——如果 Bloom filter 说”不存在”,那就一定不存在,可以跳过。
-- Build 阶段
bloom_filter = new BloomFilter(expected_size)
for each tuple s in S:
bloom_filter.add(s.join_key)
hash_table.insert(s)
-- Probe 阶段
for each tuple r in R:
if not bloom_filter.might_contain(r.join_key):
continue -- 一定没有匹配,跳过
-- 可能有匹配,正常探测
probe hash_table with r.join_key
当连接的选择率很低(即大部分 R 的行在 S 中没有匹配)时,Bloom filter 可以过滤掉大量的哈希表探测,显著减少 CPU 开销。在分布式 Join 中,Bloom filter 还可以在网络传输前过滤数据,减少数据 shuffle 量。
分区溢出处理
当内存不足以容纳某个分区时的处理策略:
策略一:溢出到磁盘
当某个分区的输出缓冲满了,刷到磁盘
分区阶段结束后,溢出的分区需要额外的 I/O 读回
策略二:动态调整分区数
监控每个分区的大小
当某个分区增长过快,拆分为多个子分区
策略三:预留 victim buffer
保留少量缓冲页作为溢出缓冲
当分区缓冲不够时,先驱逐(evict)最大的分区到磁盘
七、向量化执行:列式 Hash Join
传统火山模型的问题
传统数据库使用火山模型(Volcano / Iterator Model):每个算子每次产出一行。这意味着每处理一行就要经过多层虚函数调用,对 CPU 缓存极不友好。
-- 火山模型:一次一行
class HashJoinOperator:
def next():
while true:
r = probe_child.next() -- 虚函数调用
if r is EOF: return EOF
matches = hash_table.probe(r.join_key)
if matches:
return concatenate(r, matches[0])
列式向量化(DuckDB 方式)
现代分析型数据库(如 DuckDB、ClickHouse)使用向量化执行:每次处理一批(vector)数据,通常 1024 或 2048 行。
// 向量化 Hash Join 的 Probe 阶段
void hash_join_probe_vectorized(
Vector *join_keys, // 输入:一批 join key
Vector *hash_table, // 哈希表
SelectionVector *result // 输出:匹配的行号
) {
uint64_t hashes[VECTOR_SIZE];
uint32_t entries[VECTOR_SIZE];
// Step 1: 批量计算哈希值
hash_vector(join_keys, hashes, VECTOR_SIZE);
// Step 2: 批量查找桶位置
for (int i = 0; i < VECTOR_SIZE; i++) {
entries[i] = hash_table->buckets[hashes[i] & mask];
}
// Step 3: 批量比较键值
int match_count = 0;
for (int i = 0; i < VECTOR_SIZE; i++) {
while (entries[i] != EMPTY) {
if (compare_keys(join_keys, i, hash_table, entries[i])) {
result->set(match_count++, i, entries[i]);
}
entries[i] = hash_table->next[entries[i]];
}
}
}向量化的优势:
- 减少虚函数开销:每 1024 行才调用一次函数,而非每行
- SIMD 友好:批量哈希计算和键比较可以使用 SIMD 指令
- 缓存友好:列式存储的连接键在内存中连续,预取效率高
- 编译器优化:紧凑的循环体更容易被编译器自动向量化
列存中的 Hash Join 优化
传统行存的 Hash Join:
哈希表中存储完整行 -> 每行大小不固定,缓存利用率低
列存的 Hash Join:
哈希表只存储 row_id 和 join_key -> 固定大小条目
匹配后通过 row_id 延迟物化(late materialization)其他列
只有最终需要的列才会被读取
延迟物化(Late Materialization)是列存 Hash Join 的关键优化。如果查询只需要 R 的 3 列和 S 的 2 列,传统行存需要在哈希表中存储 S 的所有列,而列存只需要存储连接键和 row_id,匹配后再按需取其他列。
八、并行 Hash Join:分区与共享
Partitioned Parallel Hash Join
每个线程处理不同的分区,天然无锁:
Phase 1: Parallel Partition
-- 每个线程对自己负责的 R 和 S 分片进行分区
Thread i:
for each tuple in my_chunk_of_R:
partition into local buffers
for each tuple in my_chunk_of_S:
partition into local buffers
-- barrier --
-- 合并同一分区号的数据
Phase 2: Parallel Build & Probe
-- 每个线程处理一个或多个分区
Thread i:
for each partition assigned to me:
build hash table on S_partition
probe with R_partition
优点:无锁,扩展性好。 缺点:分区阶段本身有开销;分区不均可能导致负载不均衡。
Shared Hash Table Parallel Join
所有线程共享同一个哈希表:
Phase 1: Parallel Build
-- 所有线程并发向同一个哈希表插入 S 的元组
Thread i:
for each tuple s in my_chunk_of_S:
bucket = hash(s.join_key) & mask
-- 需要原子操作或分桶锁
atomic_insert(hash_table[bucket], s)
-- barrier --
Phase 2: Parallel Probe
-- 所有线程并发读哈希表(只读,天然线程安全)
Thread i:
for each tuple r in my_chunk_of_R:
probe hash_table with r.join_key
优点:不需要分区阶段,更简单。Probe 阶段只读,天然并行。 缺点:Build 阶段需要同步(原子操作或锁),在高并发下可能成为瓶颈。
NUMA 感知
在多路服务器上,内存访问并不均匀。访问本地 NUMA 节点的内存比远端快 2-3 倍。
NUMA 感知的 Hash Join 策略:
1. 分区阶段:每个 NUMA 节点处理本地数据的分区
2. 数据放置:确保每个分区的数据在处理它的 NUMA 节点上
3. 哈希表构建:在本地内存中分配哈希表
4. 探测阶段:尽量让探测线程访问本地的哈希表分区
核心原则:计算跟着数据走,而不是数据跟着计算走。
现代数据库如 HyPer、Umbra 在 Join 实现中显式考虑 NUMA
拓扑,通过 numactl 或 mbind
系统调用控制内存分配。
两种方案的对比
| 维度 | 分区并行 | 共享哈希表 |
|---|---|---|
| 扩展性 | 优秀,接近线性 | 受限于原子操作 |
| 缓存局部性 | 好(每个线程操作自己的分区) | 较差(多线程竞争缓存行) |
| NUMA 友好 | 是(数据局部性好) | 否(跨节点访问频繁) |
| 实现复杂度 | 较高(需要分区合并) | 较低 |
| 小数据量 | 分区开销可能不值得 | 更合适 |
| 大数据量 | 更合适 | 原子操作开销大 |
九、C 语言实现:完整的 Hash Join
下面是一个完整的 Hash Join 实现,包括分区和探测两个阶段,以及基本的内存管理。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <time.h>
/* ----------------------------------------------------------------
* 数据结构定义
* ---------------------------------------------------------------- */
typedef struct {
int32_t key;
int32_t payload;
} Tuple;
typedef struct HashEntry {
Tuple tuple;
struct HashEntry *next;
} HashEntry;
typedef struct {
HashEntry **buckets;
uint32_t num_buckets;
uint32_t mask; /* num_buckets - 1, 要求 num_buckets 是 2 的幂 */
uint32_t count;
} HashTable;
typedef struct {
Tuple *data;
uint32_t size;
uint32_t capacity;
} Partition;
typedef struct {
int32_t r_key;
int32_t r_payload;
int32_t s_key;
int32_t s_payload;
} JoinResult;
/* ----------------------------------------------------------------
* 哈希函数:使用 murmur-like 混合
* ---------------------------------------------------------------- */
static inline uint32_t hash_key(int32_t key)
{
uint32_t h = (uint32_t)key;
h ^= h >> 16;
h *= 0x45d9f3b;
h ^= h >> 16;
h *= 0x45d9f3b;
h ^= h >> 16;
return h;
}
/* ----------------------------------------------------------------
* 哈希表操作
* ---------------------------------------------------------------- */
static HashTable *ht_create(uint32_t expected_size)
{
HashTable *ht = (HashTable *)malloc(sizeof(HashTable));
/* 取大于 expected_size * 2 的最小 2 的幂 */
uint32_t nb = 1;
while (nb < expected_size * 2)
nb <<= 1;
ht->num_buckets = nb;
ht->mask = nb - 1;
ht->count = 0;
ht->buckets = (HashEntry **)calloc(nb, sizeof(HashEntry *));
return ht;
}
static void ht_insert(HashTable *ht, const Tuple *t)
{
uint32_t idx = hash_key(t->key) & ht->mask;
HashEntry *entry = (HashEntry *)malloc(sizeof(HashEntry));
entry->tuple = *t;
entry->next = ht->buckets[idx];
ht->buckets[idx] = entry;
ht->count++;
}
static void ht_destroy(HashTable *ht)
{
for (uint32_t i = 0; i < ht->num_buckets; i++) {
HashEntry *e = ht->buckets[i];
while (e) {
HashEntry *tmp = e;
e = e->next;
free(tmp);
}
}
free(ht->buckets);
free(ht);
}
/* ----------------------------------------------------------------
* 分区操作
* ---------------------------------------------------------------- */
static void partition_init(Partition *p, uint32_t capacity)
{
p->data = (Tuple *)malloc(capacity * sizeof(Tuple));
p->size = 0;
p->capacity = capacity;
}
static void partition_append(Partition *p, const Tuple *t)
{
if (p->size >= p->capacity) {
p->capacity *= 2;
p->data = (Tuple *)realloc(p->data, p->capacity * sizeof(Tuple));
}
p->data[p->size++] = *t;
}
static void partition_free(Partition *p)
{
free(p->data);
p->data = NULL;
p->size = 0;
p->capacity = 0;
}
/* ----------------------------------------------------------------
* Grace Hash Join 主流程
* ---------------------------------------------------------------- */
typedef struct {
JoinResult *results;
uint32_t size;
uint32_t capacity;
} ResultBuffer;
static void result_append(ResultBuffer *buf, int32_t rk, int32_t rp,
int32_t sk, int32_t sp)
{
if (buf->size >= buf->capacity) {
buf->capacity = buf->capacity ? buf->capacity * 2 : 1024;
buf->results = (JoinResult *)realloc(
buf->results, buf->capacity * sizeof(JoinResult));
}
JoinResult *jr = &buf->results[buf->size++];
jr->r_key = rk;
jr->r_payload = rp;
jr->s_key = sk;
jr->s_payload = sp;
}
uint32_t hash_join(const Tuple *R, uint32_t r_size,
const Tuple *S, uint32_t s_size,
uint32_t num_partitions,
ResultBuffer *output)
{
/* --- Phase 1: Partition --- */
Partition *r_parts = (Partition *)malloc(num_partitions * sizeof(Partition));
Partition *s_parts = (Partition *)malloc(num_partitions * sizeof(Partition));
uint32_t est = (r_size > s_size ? r_size : s_size) / num_partitions + 64;
for (uint32_t i = 0; i < num_partitions; i++) {
partition_init(&r_parts[i], est);
partition_init(&s_parts[i], est);
}
for (uint32_t i = 0; i < r_size; i++) {
uint32_t p = hash_key(R[i].key) % num_partitions;
partition_append(&r_parts[p], &R[i]);
}
for (uint32_t i = 0; i < s_size; i++) {
uint32_t p = hash_key(S[i].key) % num_partitions;
partition_append(&s_parts[p], &S[i]);
}
/* --- Phase 2: Build & Probe per partition --- */
uint32_t total_matches = 0;
for (uint32_t p = 0; p < num_partitions; p++) {
Partition *sp = &s_parts[p];
Partition *rp = &r_parts[p];
if (sp->size == 0 || rp->size == 0) {
continue;
}
/* Build 哈希表(在 S 分区上) */
HashTable *ht = ht_create(sp->size);
for (uint32_t i = 0; i < sp->size; i++) {
ht_insert(ht, &sp->data[i]);
}
/* Probe(在 R 分区上) */
for (uint32_t i = 0; i < rp->size; i++) {
uint32_t idx = hash_key(rp->data[i].key) & ht->mask;
HashEntry *entry = ht->buckets[idx];
while (entry) {
if (entry->tuple.key == rp->data[i].key) {
result_append(output,
rp->data[i].key, rp->data[i].payload,
entry->tuple.key, entry->tuple.payload);
total_matches++;
}
entry = entry->next;
}
}
ht_destroy(ht);
}
/* 清理分区 */
for (uint32_t i = 0; i < num_partitions; i++) {
partition_free(&r_parts[i]);
partition_free(&s_parts[i]);
}
free(r_parts);
free(s_parts);
return total_matches;
}
/* ----------------------------------------------------------------
* 测试与基准
* ---------------------------------------------------------------- */
static Tuple *generate_relation(uint32_t size, uint32_t key_range)
{
Tuple *rel = (Tuple *)malloc(size * sizeof(Tuple));
for (uint32_t i = 0; i < size; i++) {
rel[i].key = rand() % key_range;
rel[i].payload = (int32_t)i;
}
return rel;
}
int main(void)
{
srand((unsigned)time(NULL));
const uint32_t R_SIZE = 1000000;
const uint32_t S_SIZE = 500000;
const uint32_t KEY_RANGE = 200000;
const uint32_t NUM_PARTS = 128;
printf("Generating relations: R=%u tuples, S=%u tuples\n",
R_SIZE, S_SIZE);
Tuple *R = generate_relation(R_SIZE, KEY_RANGE);
Tuple *S = generate_relation(S_SIZE, KEY_RANGE);
ResultBuffer output = { NULL, 0, 0 };
printf("Running Grace Hash Join with %u partitions...\n", NUM_PARTS);
clock_t start = clock();
uint32_t matches = hash_join(R, R_SIZE, S, S_SIZE, NUM_PARTS, &output);
clock_t end = clock();
double elapsed = (double)(end - start) / CLOCKS_PER_SEC;
printf("Done: %u matches in %.3f seconds\n", matches, elapsed);
printf("Throughput: %.2f M tuples/sec\n",
(double)(R_SIZE + S_SIZE) / elapsed / 1e6);
free(output.results);
free(R);
free(S);
return 0;
}编译和运行
gcc -O2 -o hash_join hash_join.c -lm
./hash_join典型输出:
Generating relations: R=1000000 tuples, S=500000 tuples
Running Grace Hash Join with 128 partitions...
Done: 2498317 matches in 0.487 seconds
Throughput: 3.08 M tuples/sec
实现要点
- 哈希函数:使用 murmur
风格的位混合,确保分布均匀。分区用的哈希和探测用的哈希不需要是不同的函数——这里我们用同一个
hash_key但在分区时取模num_partitions,在探测时取模num_buckets,效果相当于两个不同的哈希 - 内存管理:分区使用动态数组,避免预先精确估算。生产代码中通常使用预分配的内存池
- 分区数选择:
num_partitions应该让每个分区的 S 部分能完全放入 L2/L3 缓存。对于 8MB L3 缓存,每个分区约 64KB 是合适的
十、基准对比:三种算法的性能画像
不同场景下的 I/O 成本对比
以下假设 |R| = 1000 页,|S| = 500 页:
| 场景 | Buffer | Block NLJ | Hash Join | Sort-Merge | Hybrid HJ | 胜者 |
|---|---|---|---|---|---|---|
| 小缓冲区 | B=10 | 51,500 | 4,500 | 10,300 | 3,300 | Hybrid HJ |
| 中缓冲区 | B=100 | 6,500 | 4,500 | 7,500 | 2,700 | Hybrid HJ |
| 大缓冲区 | B=500 | 2,500 | 1,500 | 4,500 | 1,500 | HJ / HHJ |
| 预排序输入 | B=100 | 6,500 | 4,500 | 1,500 | 2,700 | Sort-Merge |
| 有索引 | B=100 | 2,000* | 4,500 | 7,500 | 2,700 | Index NLJ |
| 极低选择率 | B=100 | 1,200* | 4,500 | 7,500 | 2,700 | Index NLJ |
| 非等值连接 | B=100 | 6,500 | N/A | 7,500 | N/A | Block NLJ |
| 重复键多 | B=100 | 6,500 | 4,500+ | 7,500 | 2,700+ | 取决于倾斜度 |
*带索引的 NLJ 假设 B+ 树高度 = 3,选择率 = 0.01
不同选择率下的行为
选择率(selectivity)= 匹配行数 / (|R| * |S|)
高选择率(>10%):大量匹配,输出成本主导
-> Hash Join 和 Sort-Merge Join 各有千秋
-> NLJ 一定很慢
低选择率(<0.1%):极少匹配
-> Index NLJ 最优(只访问匹配行)
-> Hash Join 仍需要读全表
-> Bloom filter 加持的 Hash Join 可以接近 Index NLJ
中等选择率(0.1%-10%):
-> Hash Join 通常最优
-> Sort-Merge Join 在输入已排序时竞争
CPU 缓存效应
在内存数据库中(没有磁盘 I/O),CPU 缓存成为性能瓶颈:
| 算法 | L1 缓存友好 | L2/L3 友好 | TLB 友好 |
|---|---|---|---|
| NLJ | 差(随机访问内层) | 差 | 差 |
| Sort-Merge | 好(顺序扫描) | 好 | 好 |
| Hash Join(无分区) | 差(随机探测) | 差 | 差 |
| Hash Join(分区后) | 好(分区适配缓存) | 好 | 较好 |
| Radix Hash Join | 最好(多轮分区适配各级缓存) | 最好 | 最好 |
Radix Hash Join(多轮基数分区)是专门为现代 CPU 缓存层级设计的算法,在纯内存场景下通常比 Grace Hash Join 快 2-3 倍。
十一、真实系统中的 Join 实现
PostgreSQL: hashjoin.c
PostgreSQL 的 Hash Join 实现是教科书级别的参考,位于
src/backend/executor/nodeHashjoin.c。
关键设计:
1. 批次(Batch)概念:相当于 Grace Hash Join 的分区
- 初始批次数根据 work_mem 和估计的 inner 表大小动态计算
- 如果运行时发现某个批次太大,会增加批次数(doubling)
2. 内存管理:
- 使用 work_mem 控制哈希表的内存上限(默认 4MB)
- 当内存不够时,将多余的批次写到临时文件
3. Skew 优化:
- 检测 inner 表中的高频值(Most Common Values,来自 pg_statistic)
- 高频值的元组放在单独的 skew bucket 中,避免大分区
4. 并行 Hash Join(PostgreSQL 11+):
- 多个 worker 并行构建共享哈希表
- 使用 Barrier 同步 build 和 probe 阶段
-- 查看 PostgreSQL 选择的 Join 算法
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id;
-- 典型输出
Hash Join (cost=3175.00..51487.50 rows=1000000 width=48)
Hash Cond: (o.customer_id = c.id)
-> Seq Scan on orders o (cost=0.00..16370.00 rows=1000000 width=24)
-> Hash (cost=1925.00..1925.00 rows=100000 width=24)
Buckets: 131072 Batches: 1 Memory Usage: 5157kB
-> Seq Scan on customers c (cost=0.00..1925.00 rows=100000 width=24)MySQL: Block Nested Loop 和 Batched Key Access
MySQL 的 Join 策略历史上以 Nested Loop 为主:
MySQL 8.0 之前:
- Simple NLJ + Index NLJ 为主
- Block Nested Loop(BNL)用于无索引的情况
- 没有原生的 Hash Join
MySQL 8.0.18+:
- 引入了 Hash Join,用于等值连接且无索引的情况
- BNL 被逐步替换为 Hash Join
MySQL 的 Batched Key Access (BKA):
- 收集一批外层表的连接键
- 对内层表的索引做批量查找(Multi-Range Read, MRR)
- 利用排序将随机 I/O 转为顺序 I/O
-- MySQL 中开启 BKA
SET optimizer_switch='mrr=on,mrr_cost_based=off,batched_key_access=on';
EXPLAIN FORMAT=TREE
SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id;TiDB: 分布式 Join 策略
TiDB 作为分布式数据库,面临的 Join 挑战更加复杂——数据分布在多个节点上。
TiDB 的三种分布式 Join 策略:
1. Broadcast Join:
- 将小表广播到所有节点
- 每个节点本地做 Hash Join
- 适用场景:一个表很小(< broadcast_threshold)
2. Shuffle Hash Join:
- 按连接键对两个表做 hash shuffle
- 相同哈希值的数据发送到同一个节点
- 每个节点本地做 Hash Join
- 适用场景:两个大表
3. Index Join(类似 Index NLJ 的分布式版本):
- 逐批从外层表取数据
- 用连接键在内层表的索引上查找
- 适用场景:外层表较小或有高选择性的过滤条件
-- TiDB 中通过 Hint 指定 Join 策略
SELECT /*+ BROADCAST_JOIN(c) */ *
FROM orders o
JOIN customers c ON o.customer_id = c.id;
SELECT /*+ SHUFFLE_JOIN(o, c) */ *
FROM orders o
JOIN customers c ON o.customer_id = c.id;分布式 Join 的核心挑战在于网络传输。一次网络往返(100us-1ms)比一次内存访问(100ns)慢了 3-4 个数量级。因此,减少数据 shuffle 量——通过 Bloom filter 预过滤、选择合适的 Join 策略、colocated join——是分布式查询优化的关键。
十二、工程实践:踩坑表与经验总结
工程陷阱一览
| 编号 | 陷阱 | 症状 | 解法 |
|---|---|---|---|
| 1 | 哈希函数质量差 | 分区严重不均,某些分区溢出 | 使用 murmur3、xxhash 等高质量哈希函数;避免简单取模 |
| 2 | build side 选错 | 大表放在 build side,内存爆炸 | 查询优化器估算两表大小,选小表做 build side;运行时检测并切换 |
| 3 | NULL 值处理遗漏 | NULL = NULL 返回 true,产生错误结果 | SQL 标准规定 NULL 不等于任何值(包括 NULL);Join 时显式跳过 NULL |
| 4 | 基数估计严重失准 | 优化器选了错误的 Join 算法 | 使用直方图、HyperLogLog 改进基数估计;引入自适应执行 |
| 5 | work_mem 设置过小 | Hash Join 产生大量磁盘临时文件 | 监控临时文件使用量;适当增大 work_mem(但注意并发连接的总内存) |
| 6 | 未考虑数据倾斜 | 个别分区超大,Join 时间远超预期 | 运行时倾斜检测 + 特殊处理;预先分析 MCV(Most Common Values) |
| 7 | 字符串连接键的哈希开销 | 变长字符串的哈希计算成本被低估 | 如果连接键是长字符串,考虑预计算哈希值;或将字符串映射为整数 ID |
| 8 | 并行 Join 的同步开销 | 线程数增加但性能不升反降 | 分析 barrier 等待和锁竞争;适当减少线程数或切换到分区并行 |
| 9 | 忽略输出成本 | Join 结果集爆炸式增长(多对多连接) | 提前估算结果集大小;考虑 semi-join 或 EXISTS 替代 |
| 10 | SSD 上的算法选择 | 沿用针对 HDD 优化的算法,性能不理想 | SSD 的随机读成本远低于 HDD;Index NLJ 在 SSD 上更有优势 |
我的一些看法
关于 Hash Join 与 Sort-Merge Join 的选择。在 OLTP 系统中,Hash Join 几乎总是赢家——因为 OLTP 查询涉及的数据量小,Hash Join 的常数因子更低。在 OLAP 系统中,Sort-Merge Join 仍然有其位置:当查询需要排序结果、当数据已经按连接键聚簇、当连接是范围连接时,Sort-Merge 都是更好的选择。
关于内存数据库中的 Join。当所有数据都在内存中时,I/O 成本模型不再适用。CPU 缓存局部性成为性能的决定因素。Radix Hash Join(多轮分区,每轮分区适配一级缓存)在学术基准测试中表现最好,但工程实现复杂。实际上,大多数内存数据库(如 MemSQL/SingleStore)仍然使用传统的分区 Hash Join,只是更注重内存分配和 SIMD 优化。
关于分布式 Join。分布式系统中的 Join 本质上是一个数据放置(data placement)问题。如果两个经常 Join 的表按相同的键做了 co-partition(共同分区),那么 Join 可以在每个节点本地完成,无需任何数据传输。这是为什么 TiDB、CockroachDB 等系统越来越重视”表的 co-location”特性。在设计 schema 时,选择合适的分区键,使常见的 Join 查询能够本地执行,可能比任何算法优化都更有效。
关于自适应执行。静态的查询优化器不可能总是做出最优选择——它依赖于统计信息,而统计信息可能过时或不准确。现代数据库越来越多地采用自适应执行:运行时监控 Join 的实际行为,如果发现选择率与估计差距过大,动态切换算法。例如,Oracle 的 Adaptive Join 会在运行时根据实际数据量决定使用 Hash Join 还是 Nested Loop Join。
关于硬件发展的影响。NVMe SSD 的随机读延迟已经降到 10us 以下,与内存的差距缩小到 2 个数量级。CXL(Compute Express Link)技术进一步模糊了本地内存和远端内存的界限。在这种硬件趋势下,传统的”最小化 I/O 次数”优化目标可能需要修正。未来的 Join 算法可能更关注带宽利用率而非 I/O 次数,更关注 SIMD 利用率而非算法复杂度。
附录:算法选择决策树
输入已排序?
/ \
是 否
/ \
Sort-Merge 有索引?
(Cost: M+N) / \
是 否
/ \
选择率低? 等值连接?
/ \ / \
是 否 是 否
/ \ / \
Index NLJ Block NLJ 内存够装 S? Sort-Merge
(最优) (较好) / \ (范围连接)
是 否
/ \
Simple HJ 数据有倾斜?
(Cost: M+N) / \
否 是
/ \
Grace/Hybrid HJ 递归分区 HJ
(Cost: ~3(M+N)) + Bloom filter
附录:进一步阅读
- Shapiro, L. D. “Join Processing in Database Systems with Large Main Memories.” ACM Computing Surveys, 1986. – Hash Join 的经典综述
- Graefe, G. “Sort-Merge-Join: An Idea Whose Time Has(h) Passed?” ICDE, 1994. – Sort-Merge 与 Hash Join 的深入对比
- Balkesen, C. et al. “Main-Memory Hash Joins on Multi-Core CPUs: Tuning to the Underlying Hardware.” ICDE, 2013. – 现代硬件上的并行 Hash Join
- Leis, V. et al. “Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework.” SIGMOD, 2014. – HyPer 的并行执行框架
- Kersten, T. et al. “Everything You Always Wanted to Know About Compiled and Vectorized Queries But Were Afraid to Ask.” VLDB, 2018. – 编译执行 vs 向量化执行
相关阅读: - B-tree 深度解剖 - MVCC 实现变体全解