写无锁数据结构的人,迟早会撞上同一堵墙:你什么时候才能安全地释放内存?
假设你在实现一个无锁队列。线程 A 通过 CAS
成功地把一个节点从队列中摘除了,正准备调用
free()。但此刻,线程 B
正拿着一个指向同一个节点的指针,准备读取它的
next 字段。如果 A 先 free() 了,B
读到的就是垃圾数据——这就是经典的
use-after-free,而且在并发场景下,它几乎不可能通过测试复现,只会在凌晨三点的生产环境里暴露。
这个问题被称为 Safe Memory Reclamation(SMR),它是所有无锁编程中最棘手的问题之一。2004 年,Maged Michael 在他的论文 “Hazard Pointers: Safe Memory Reclamation for Lock-Free Objects” 中提出了一个优雅的解决方案。这个方案的核心思想简单得令人惊讶:让每个线程公开声明它正在访问哪些指针。
本文将从问题出发,完整地讲解 Hazard Pointers 的设计、实现和工程实践。
一、问题的本质:为什么 free() 在无锁世界中如此危险
在有锁的世界里,内存回收不是问题。持有锁的线程知道没有其他线程在访问被保护的数据,释放内存是安全的。但在无锁数据结构中,多个线程可以同时读取同一个节点,而没有任何互斥机制来协调它们。
让我们用一个最简单的场景来说明。考虑一个单链表:
HEAD -> [A] -> [B] -> [C] -> NULL
线程 1 想要删除节点 A,线程 2 正在遍历链表。
时间线:
T1 T2
| |
| CAS(HEAD, A, B) 成功 |
| | ptr = HEAD (读到了 A)
| free(A) |
| | val = ptr->data // BOOM!
| | use-after-free
问题在于,CAS 操作和内存释放之间没有因果关系。CAS 只保证 HEAD 的更新是原子的,但它无法阻止其他线程持有旧指针。
ABA 问题的内存视角
这个问题和 ABA 问题密切相关,但更加根本。ABA 问题关注的是指针值的复用——一个被释放的地址可能被重新分配,导致 CAS 成功但语义错误。而内存回收问题更基础:即使没有 ABA,单纯的 use-after-free 就已经是未定义行为了。
在带 GC 的语言(Java、Go)中,这个问题”自动消失”了——GC 保证只要有引用存在,对象就不会被回收。但在 C/C++ 中,我们必须手动解决它。
二、一个具体的例子:Michael-Scott 队列中的 use-after-free
Michael-Scott 队列是最经典的无锁队列实现之一。它的 dequeue 操作大致如下:
typedef struct node {
void *data;
struct node *next;
} node_t;
typedef struct queue {
node_t *head; // 哨兵节点
node_t *tail;
} queue_t;
void *dequeue(queue_t *q) {
node_t *head, *tail, *next;
void *data;
while (1) {
head = q->head; // 1. 读 head
tail = q->tail; // 2. 读 tail
next = head->next; // 3. 读 head->next <-- 危险!
if (head != q->head) // 4. 一致性检查
continue;
if (head == tail) {
if (next == NULL)
return NULL; // 队列为空
CAS(&q->tail, tail, next); // 帮助推进 tail
continue;
}
data = next->data; // 5. 读数据 <-- 危险!
if (CAS(&q->head, head, next)) // 6. 推进 head
break;
}
// head 现在是旧的哨兵节点,可以"释放"
// 但是,其他线程可能还在步骤 1-5 中持有 head 的引用!
free(head); // <-- 这行代码是不安全的
return data;
}注意步骤
3:next = head->next。当线程执行这一行时,head
可能已经被另一个线程删除并释放了。如果 head
指向的内存已经被归还给操作系统或者被其他 malloc
调用复用,那么 head->next
的解引用就是灾难性的。
这就是为什么我们需要一种机制来推迟内存释放,直到确定没有线程在访问它。
三、Hazard Pointers 的核心思想
Maged Michael 的 Hazard Pointers 方案基于一个简洁的协议:
在访问一个共享节点之前,线程必须将该节点的地址发布到一个全局可见的 hazard pointer 槽中。在回收内存之前,回收者必须检查所有线程的 hazard pointer,确保没有线程正在访问待回收的节点。
整个机制由三个部分组成:
3.1 Hazard Pointer 数组
每个线程拥有固定数量(通常是 1-2 个)的 hazard pointer 槽位。这些槽位组成一个全局数组,任何线程都可以读取。
Global HP Array:
Thread 0: [hp0 = 0xA000] [hp1 = NULL ]
Thread 1: [hp0 = 0xC000] [hp1 = 0xA000 ]
Thread 2: [hp0 = NULL ] [hp1 = 0xB000 ]
3.2 Retire 操作
当一个线程从数据结构中移除一个节点后,它不直接
free() 该节点,而是将其放入本线程的
retired list(退役列表)。
void retire(hp_domain_t *domain, void *ptr) {
// 加入本线程的退役列表
add_to_retired_list(ptr);
// 如果退役列表超过阈值,触发扫描
if (retired_count >= SCAN_THRESHOLD)
scan(domain);
}3.3 Scan 操作
这是 Hazard Pointers 最精妙的部分。当退役列表积累到一定程度,线程执行一次 scan:
- 收集:读取所有线程的所有 hazard
pointer,形成一个保护集合
plist。 - 比对:遍历自己的退役列表,检查每个节点是否在
plist中。 - 回收:不在
plist中的节点可以安全释放;在plist中的节点保留在退役列表中等待下次扫描。
下图展示了 Scan 算法的完整流程:
四、Hazard Pointer 的使用协议
使用 Hazard Pointer 保护一个共享指针的标准协议如下:
// 安全地读取一个共享指针
node_t *protect(atomic_ptr *source, hazard_pointer_t *hp) {
node_t *ptr;
do {
ptr = atomic_load(source); // 1. 读取共享指针
atomic_store(hp, ptr); // 2. 发布到 HP 槽
// 需要 StoreLoad barrier
atomic_thread_fence(memory_order_seq_cst);
} while (ptr != atomic_load(source)); // 3. 验证指针未变
return ptr;
}这个 “读取-发布-验证” 的三步协议是 Hazard Pointers 正确性的基础。为什么需要验证?因为在步骤 1 和步骤 2 之间,存在一个时间窗口:
线程 A(读取者) 线程 B(回收者)
ptr = load(source)
remove node from structure
retire(node)
scan() // 此时 A 的 HP 还是旧值
// node 不在任何 HP 中
free(node) // 释放了!
store(hp, ptr)
// 太晚了,发布了一个已被释放的地址
验证步骤(re-read source)确保:如果指针在发布之前已经改变(意味着节点可能已经被 retire),我们会重新开始。如果指针没变,说明节点还在数据结构中,而此时我们的 HP 已经发布,任何后续的 scan 都会看到它。
内存序要求
步骤 2 和步骤 3 之间的 StoreLoad fence 是关键的。它确保: - HP 的写入对其他线程可见(在 scan 读取 HP 之前)。 - 步骤 3 的读取不会被重排到步骤 2 之前。
在 x86 架构上,普通的 store 就包含 StoreStore 语义,但
StoreLoad 需要显式的 mfence 或者使用
memory_order_seq_cst。在 ARM/RISC-V
等弱序架构上,这个 fence 的成本更高。
五、Scan 算法详解
下面是 Scan 算法的伪代码,忠实于 Michael 的原始论文:
procedure Scan(hp_domain):
// 阶段 1:收集所有活跃的 hazard pointers
plist = {}
for each thread T in hp_domain:
for each hp in T.hazard_pointers:
ptr = load(hp)
if ptr != NULL:
plist.insert(ptr)
// 阶段 2:对 plist 排序(用于二分查找)
sort(plist)
// 阶段 3:扫描退役列表
new_retired = {}
for each node in my_retired_list:
if binary_search(plist, node):
// 被保护,不能释放
new_retired.append(node)
else:
// 安全,释放
free(node)
my_retired_list = new_retired
复杂度分析
设 N 为线程数,H 为每线程 hazard pointer 个数,R 为退役列表长度。
- 阶段 1:O(N * H) 次原子读。
- 阶段 2:O(N * H * log(N * H)) 排序。
- 阶段 3:O(R * log(N * H)) 二分查找。
如果我们设置阈值 R >= 2 * N * H,那么每次 scan 至少能回收 R - N*H >= R/2 个节点。这意味着:
- 每个节点的摊还回收成本:O(1) 次原子读 + O(log(N*H)) 的查找开销。
- 空间开销:O(N * H + N * R) 个未释放节点。
为什么阈值是 2NH
这是一个精巧的设计。在任何时刻,最多有 N * H 个节点被 hazard pointer 保护。所以如果退役列表有 2NH 个节点,至少有 NH 个节点是可以安全释放的。这保证了每次 scan 都是”值得的”——我们永远不会白白做一次 scan 却什么都释放不了。
六、完整的 C 实现
下面是一个功能完整的 Hazard Pointer 库实现。代码经过精心设计,适合在实际项目中参考使用。
/* hazard_pointer.h */
#ifndef HAZARD_POINTER_H
#define HAZARD_POINTER_H
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
/* ========== 配置常量 ========== */
#define HP_MAX_THREADS 64
#define HP_MAX_HPS 2 /* 每个线程的 hazard pointer 数量 */
#define HP_SCAN_THRESHOLD 32 /* retired list 阈值 */
/* ========== 数据结构 ========== */
/* 退役节点链表 */
typedef struct hp_retired_node {
void *ptr;
void (*deleter)(void *);
struct hp_retired_node *next;
} hp_retired_node_t;
/* 每线程记录 */
typedef struct hp_thread_record {
_Atomic(void *) hp[HP_MAX_HPS]; /* hazard pointer 槽位 */
_Atomic(bool) active; /* 是否被某线程持有 */
hp_retired_node_t *retired_list; /* 退役列表(线程本地) */
int retired_count; /* 退役列表长度 */
struct hp_thread_record *next; /* 全局链表 */
} hp_thread_record_t;
/* Hazard Pointer 域 */
typedef struct hp_domain {
_Atomic(hp_thread_record_t *) head; /* 线程记录链表头 */
_Atomic(int) thread_count; /* 活跃线程数 */
} hp_domain_t;
/* ========== 全局域 ========== */
static hp_domain_t g_hp_domain = {
.head = NULL,
.thread_count = 0
};
/* 线程局部存储:每个线程的记录 */
static _Thread_local hp_thread_record_t *tl_my_record = NULL;
/* ========== 线程注册与注销 ========== */
/*
* hp_thread_acquire - 获取一个线程记录
* 优先复用已注销线程的记录,否则分配新的。
*/
static hp_thread_record_t *hp_thread_acquire(hp_domain_t *domain) {
hp_thread_record_t *rec;
/* 尝试复用已有的非活跃记录 */
rec = atomic_load_explicit(&domain->head, memory_order_acquire);
while (rec != NULL) {
bool expected = false;
if (atomic_compare_exchange_strong_explicit(
&rec->active, &expected, true,
memory_order_acq_rel, memory_order_relaxed)) {
/* 成功获取一个已有记录 */
return rec;
}
rec = rec->next;
}
/* 没有可复用的记录,分配新的 */
rec = (hp_thread_record_t *)calloc(1, sizeof(hp_thread_record_t));
if (!rec) return NULL;
atomic_store_explicit(&rec->active, true, memory_order_relaxed);
for (int i = 0; i < HP_MAX_HPS; i++) {
atomic_store_explicit(&rec->hp[i], NULL, memory_order_relaxed);
}
rec->retired_list = NULL;
rec->retired_count = 0;
/* 用 CAS 插入全局链表头部 */
hp_thread_record_t *old_head;
do {
old_head = atomic_load_explicit(&domain->head, memory_order_acquire);
rec->next = old_head;
} while (!atomic_compare_exchange_weak_explicit(
&domain->head, &old_head, rec,
memory_order_acq_rel, memory_order_relaxed));
atomic_fetch_add_explicit(&domain->thread_count, 1, memory_order_relaxed);
return rec;
}
/*
* hp_thread_release - 注销当前线程的记录
*/
static void hp_thread_release(hp_thread_record_t *rec) {
for (int i = 0; i < HP_MAX_HPS; i++) {
atomic_store_explicit(&rec->hp[i], NULL, memory_order_release);
}
atomic_store_explicit(&rec->active, false, memory_order_release);
}
/* ========== Hazard Pointer 操作 ========== */
/*
* hp_protect - "读取-发布-验证"三步协议
* 安全地获取一个共享指针的快照并发布 hazard pointer。
*/
static void *hp_protect(int hp_index, _Atomic(void *) *source,
hp_thread_record_t *rec) {
void *ptr;
do {
ptr = atomic_load_explicit(source, memory_order_acquire);
atomic_store_explicit(&rec->hp[hp_index], ptr, memory_order_release);
atomic_thread_fence(memory_order_seq_cst);
} while (ptr != atomic_load_explicit(source, memory_order_acquire));
return ptr;
}
/*
* hp_clear - 清除一个 hazard pointer 槽
*/
static void hp_clear(int hp_index, hp_thread_record_t *rec) {
atomic_store_explicit(&rec->hp[hp_index], NULL, memory_order_release);
}
/* ========== Scan 算法 ========== */
/*
* pointer 比较函数,用于 qsort 和 bsearch
*/
static int ptr_compare(const void *a, const void *b) {
uintptr_t pa = (uintptr_t)(*(void **)a);
uintptr_t pb = (uintptr_t)(*(void **)b);
if (pa < pb) return -1;
if (pa > pb) return 1;
return 0;
}
/*
* hp_scan - 扫描并回收安全的退役节点
*/
static void hp_scan(hp_domain_t *domain, hp_thread_record_t *rec) {
int hp_count = 0;
int max_hps = HP_MAX_THREADS * HP_MAX_HPS;
void **plist = (void **)malloc(sizeof(void *) * max_hps);
if (!plist) return;
/* 阶段 1:收集所有活跃的 hazard pointers */
hp_thread_record_t *cur = atomic_load_explicit(
&domain->head, memory_order_acquire);
while (cur != NULL) {
if (atomic_load_explicit(&cur->active, memory_order_acquire)) {
for (int i = 0; i < HP_MAX_HPS; i++) {
void *hp = atomic_load_explicit(
&cur->hp[i], memory_order_acquire);
if (hp != NULL) {
plist[hp_count++] = hp;
}
}
}
cur = cur->next;
}
/* 阶段 2:排序,用于后续二分查找 */
qsort(plist, hp_count, sizeof(void *), ptr_compare);
/* 阶段 3:扫描退役列表 */
hp_retired_node_t *new_list = NULL;
int new_count = 0;
hp_retired_node_t *node = rec->retired_list;
while (node != NULL) {
hp_retired_node_t *next = node->next;
/* 二分查找:该节点是否被任何 HP 保护? */
bool is_protected = (hp_count > 0) &&
bsearch(&node->ptr, plist, hp_count,
sizeof(void *), ptr_compare) != NULL;
if (is_protected) {
/* 被保护,保留在退役列表 */
node->next = new_list;
new_list = node;
new_count++;
} else {
/* 安全,调用 deleter 释放 */
node->deleter(node->ptr);
free(node);
}
node = next;
}
rec->retired_list = new_list;
rec->retired_count = new_count;
free(plist);
}
/* ========== Retire 操作 ========== */
/*
* hp_retire - 将一个节点加入退役列表
* 当退役列表超过阈值时触发 scan。
*/
static void hp_retire(hp_domain_t *domain, hp_thread_record_t *rec,
void *ptr, void (*deleter)(void *)) {
hp_retired_node_t *rnode =
(hp_retired_node_t *)malloc(sizeof(hp_retired_node_t));
if (!rnode) {
deleter(ptr);
return;
}
rnode->ptr = ptr;
rnode->deleter = deleter;
rnode->next = rec->retired_list;
rec->retired_list = rnode;
rec->retired_count++;
int threshold = 2 * atomic_load_explicit(
&domain->thread_count, memory_order_relaxed) * HP_MAX_HPS;
if (threshold < HP_SCAN_THRESHOLD)
threshold = HP_SCAN_THRESHOLD;
if (rec->retired_count >= threshold) {
hp_scan(domain, rec);
}
}
/* ========== 便捷接口 ========== */
/*
* hp_init - 初始化当前线程
* 必须在线程开始使用 HP 前调用。
*/
static inline void hp_init(void) {
if (tl_my_record == NULL) {
tl_my_record = hp_thread_acquire(&g_hp_domain);
}
}
/*
* hp_deinit - 注销当前线程
* 在线程退出前调用,会尝试回收所有退役节点。
*/
static inline void hp_deinit(void) {
if (tl_my_record != NULL) {
/* 尝试最后一次 scan */
hp_scan(&g_hp_domain, tl_my_record);
/* 剩余无法释放的节点,交给其他线程处理(简化处理:此处不转移) */
hp_thread_release(tl_my_record);
tl_my_record = NULL;
}
}
#endif /* HAZARD_POINTER_H */这段实现大约 220 行,涵盖了 Hazard Pointer 的全部核心逻辑。几个值得注意的设计决策:
- 线程记录复用:通过
active标志和 CAS,避免了为每个线程分配新的记录。这在线程池环境中很重要。 - 动态阈值:scan 阈值基于当前线程数动态计算,而不是写死。
- 自定义 deleter:通过函数指针支持不同类型节点的释放策略。
- 头插法链表:线程记录用无锁链表组织,插入是 O(1) 的。
七、集成示例:带 Hazard Pointers 的无锁栈
让我们把上面的 HP 库集成到一个完整的无锁栈中,展示实际使用方式。
/* lockfree_stack.c */
#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include "hazard_pointer.h"
/* ========== 无锁栈定义 ========== */
typedef struct stack_node {
int value;
struct stack_node *next;
} stack_node_t;
typedef struct lockfree_stack {
_Atomic(void *) top;
} lockfree_stack_t;
static void stack_init(lockfree_stack_t *s) {
atomic_store(&s->top, NULL);
}
static void stack_push(lockfree_stack_t *s, int value) {
stack_node_t *node = (stack_node_t *)malloc(sizeof(stack_node_t));
node->value = value;
stack_node_t *old_top;
do {
old_top = atomic_load_explicit(&s->top, memory_order_acquire);
node->next = old_top;
} while (!atomic_compare_exchange_weak_explicit(
&s->top, (void **)&old_top, node,
memory_order_acq_rel, memory_order_relaxed));
}
static bool stack_pop(lockfree_stack_t *s, int *out_value) {
hp_thread_record_t *rec = tl_my_record;
stack_node_t *top;
while (1) {
/* 用 HP 保护 top 节点 */
top = (stack_node_t *)hp_protect(0, &s->top, rec);
if (top == NULL) {
hp_clear(0, rec);
return false; /* 栈为空 */
}
stack_node_t *next = top->next;
if (atomic_compare_exchange_weak_explicit(
&s->top, (void **)&top, next,
memory_order_acq_rel, memory_order_relaxed)) {
/* CAS 成功,摘除了 top 节点 */
*out_value = top->value;
hp_clear(0, rec);
/* 不是 free(top),而是 retire */
hp_retire(&g_hp_domain, rec, top, free);
return true;
}
/* CAS 失败,重试(HP 会在下一轮循环中更新) */
}
}
/* ========== 测试 ========== */
#define NUM_THREADS 8
#define OPS_PER_THREAD 100000
static lockfree_stack_t g_stack;
static _Atomic(long) g_push_sum = 0;
static _Atomic(long) g_pop_sum = 0;
static void *worker(void *arg) {
int tid = (int)(long)arg;
hp_init();
long local_push_sum = 0;
long local_pop_sum = 0;
for (int i = 0; i < OPS_PER_THREAD; i++) {
if (i % 2 == 0) {
int val = tid * OPS_PER_THREAD + i;
stack_push(&g_stack, val);
local_push_sum += val;
} else {
int val;
if (stack_pop(&g_stack, &val)) {
local_pop_sum += val;
}
}
}
atomic_fetch_add(&g_push_sum, local_push_sum);
atomic_fetch_add(&g_pop_sum, local_pop_sum);
hp_deinit();
return NULL;
}
int main(void) {
stack_init(&g_stack);
pthread_t threads[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
pthread_create(&threads[i], NULL, worker, (void *)(long)i);
}
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
/* 弹出栈中剩余元素 */
hp_init();
long drain_sum = 0;
int val;
while (stack_pop(&g_stack, &val)) {
drain_sum += val;
}
hp_deinit();
long total_push = atomic_load(&g_push_sum);
long total_pop = atomic_load(&g_pop_sum) + drain_sum;
printf("push sum = %ld\n", total_push);
printf("pop sum = %ld\n", total_pop);
printf("match: %s\n", total_push == total_pop ? "YES" : "NO");
return 0;
}编译和运行:
gcc -std=c11 -O2 -pthread lockfree_stack.c -o lockfree_stack
./lockfree_stack这个测试验证了一个关键不变量:所有 push 进去的值之和,等于所有 pop 出来的值之和。 如果 HP 实现有 bug 导致 use-after-free 或者重复释放,这个不变量会被破坏(或者程序直接崩溃)。
八、性能特征分析
Hazard Pointers 的性能特征需要从读取路径和回收路径两个维度分析。
8.1 读取路径开销
每次 hp_protect 调用的成本:
| 操作 | 成本 | 说明 |
|---|---|---|
第一次 atomic_load |
1 次原子读 | 读取共享指针 |
atomic_store 到 HP 槽 |
1 次原子写 | 发布 hazard pointer |
atomic_thread_fence(seq_cst) |
1 次 StoreLoad fence | 最昂贵的部分 |
验证 atomic_load |
1 次原子读 | 通常不需要重试 |
在 x86 上,seq_cst fence 编译为
mfence 指令,大约 30-50ns。在 ARM 上,这个
fence 的成本更高(dmb ish),大约
50-100ns。
关键洞察:这个开销是每次读取操作都要付出的。对于读密集的数据结构(如无锁哈希表的 lookup),这可能是显著的瓶颈。
8.2 回收路径开销(摊还)
假设 N 个线程,每个线程 H 个 HP:
| 参数 | 值 |
|---|---|
| Scan 触发阈值 | R = 2NH |
| 每次 Scan 的原子读 | NH 次 |
| 排序 | O(NH log(NH)) |
| 二分查找 | O(R log(NH)) |
| 每节点摊还成本 | O(log(NH)) |
8.3 空间开销
这是 Hazard Pointers 的一个弱点。在任何时刻,系统中可能有 O(N^2 * H) 个未释放的退役节点(每个线程最多积累 2NH 个)。对于大量线程的场景,这可能导致显著的内存浪费。
九、与其他 SMR 方案的对比
Hazard Pointers 不是唯一的 SMR 方案。下面是三种主流方案的全面对比。
9.1 对比表
| 特性 | Hazard Pointers | Epoch-Based Reclamation | RCU |
|---|---|---|---|
| 发明者 | Maged Michael (2004) | Keir Fraser (2004) | Paul McKenney (1998) |
| 保护粒度 | 每个指针 | 整个临界区 | 整个读侧临界区 |
| 读取开销 | 原子写 + fence + 验证 | 仅一次原子操作(进入 epoch) | 几乎为零(compiler barrier) |
| 回收延迟 | 有界:O(N^2*H) | 无界(一个慢线程可阻塞所有回收) | 有界(grace period) |
| 实现复杂度 | 中等 | 简单 | 复杂(需要 OS 支持) |
| 适合场景 | 通用无锁数据结构 | 短临界区,可控线程 | 读极多写极少 |
| 线程可以阻塞? | 可以(不影响其他线程回收) | 不可以(会阻塞全局回收) | 不可以(用户态 RCU 不可以) |
| ABA 保护 | 是(延迟释放自然防止 ABA) | 是 | 是 |
9.2 Epoch-Based Reclamation(EBR)
EBR 的核心思想更简单:维护一个全局 epoch 计数器。线程在进入临界区时记录当前 epoch,退出时清除。只有当所有线程都经过了某个 epoch 之后,该 epoch 之前退役的节点才能被释放。
Epoch 0 Epoch 1 Epoch 2
+---------+ +---------+ +---------+
全局 epoch: | 0 | -> | 1 | -> | 2 |
+---------+ +---------+ +---------+
| | |
Thread 0: [in epoch 0] [in epoch 1] idle
Thread 1: idle [in epoch 1] [in epoch 2]
|
当所有线程都离开 epoch 0 后,
epoch 0 的退役节点可以释放
EBR 的最大优势是读取路径几乎没有开销(不需要 per-pointer 的 fence)。最大弱点是:如果一个线程长时间停留在一个 epoch 中(比如被抢占、执行了一个很长的操作),所有 epoch 的回收都会被阻塞,导致无界的内存增长。
9.3 RCU(Read-Copy-Update)
RCU
专为”读多写少”场景设计。读侧临界区(rcu_read_lock()
/ rcu_read_unlock())几乎没有开销——在 Linux
内核中,它们编译为空操作或者仅仅是一个编译器屏障。写侧通过
synchronize_rcu() 等待一个 grace
period,确保所有正在进行的读侧临界区都已结束。
RCU
的核心限制是:它依赖于调度器的协作来确定
grace period 的结束。在用户态实现中(如
liburcu),这通常意味着读侧临界区不能包含阻塞操作。
9.4 选择建议
- 通用无锁数据结构:Hazard Pointers 是最安全的选择,因为它提供有界的内存开销且允许线程被抢占。
- 高性能读密集场景(如路由表、配置查找):RCU 的读侧零开销是无可比拟的。
- 短临界区 + 已知线程模型:EBR 实现简单,读取开销低。
- 混合方案:一些现代系统结合多种技术。例如 Crossbeam(Rust)使用基于 epoch 的方案,但加入了本地垃圾收集来缓解 EBR 的内存无界问题。
十、现代发展:Hazard Eras 与 Interval-Based Reclamation
Hazard Pointers 的原始设计已经有 20 多年了。研究者们在此基础上做了许多改进。
10.1 Hazard Eras(2017)
Hazard Eras 由 Ramalhete 和 Correia 提出,是 Hazard Pointers 和 EBR 的混合体。核心思想是:用时间戳(era)替代精确的指针值来标记保护范围。
传统 HP: "我正在访问地址 0xA000" --> 精确保护
Hazard Eras:"我正在访问 era 42 时存活的节点" --> 范围保护
每个共享节点在被加入数据结构时打上当前 era 的时间戳,退役时也打上时间戳。线程在进入临界区时发布当前 era。扫描时,只要一个节点的”出生 era”到”退役 era”范围与某个线程的活跃 era 重叠,它就不能被回收。
Hazard Eras 的优势: - 读取路径更便宜:只需发布一个 era 值(一次原子写),不需要 per-pointer 的 fence。 - 仍然有界:不像 EBR 那样可能无限积累。
代价是牺牲了一些回收精度——可能会多保留一些实际上已经安全的节点。
10.2 Interval-Based Reclamation(IBR,2018)
Wen 等人提出的 IBR 进一步泛化了时间戳方案。每个节点关联一个”保护区间”[birth, retire],每个线程关联一个”活跃区间”[enter, exit]。只要两个区间不重叠,节点就可以安全回收。
IBR 被认为是 HP 和 EBR 之间最好的平衡点之一,在实践中表现出色。
10.3 Hyaline(2020)
Nikolaev 和 Bhatt 提出的 Hyaline 使用引用计数的变体来实现 SMR。它的回收延迟极低(接近 HP),但读取路径开销更小。Hyaline 的设计比 HP 更复杂,但在某些工作负载下性能显著更好。
十一、工业实践:folly::HazardPointer 与 C++ P2530
11.1 Meta 的 folly::HazardPointer
Meta(原 Facebook)的 Folly 库包含了一个生产级的 Hazard
Pointer 实现。它被广泛用于 Meta 的基础设施中,包括
folly::ConcurrentHashMap 和
folly::AtomicUnorderedInsertMap。
folly 实现的几个工程亮点:
// folly 的使用接口非常简洁
auto holder = folly::makeHazardPointer();
auto *ptr = holder.protect(atomicPtr);
// ptr 在 holder 生命周期内保证有效
// holder 析构时自动清除 HPRAII 语义:通过
HazardPointerHolder 类型自动管理 HP
的获取和释放,避免忘记调用 hp_clear 导致的 HP
泄漏。
自适应 Scan 策略:folly 的实现根据退役列表的增长速率动态调整 scan 频率,在内存使用和 CPU 开销之间取得平衡。
批量回收:支持异步的批量回收路径,将 scan 操作卸载到后台线程,减少前台线程的延迟抖动。
缓存对齐:HP 槽位按缓存行对齐,避免 false sharing。
11.2 C++ P2530 提案
C++ 标准委员会正在讨论将 Hazard Pointers 纳入标准库(提案 P2530)。该提案由 Maged Michael 本人参与,定义了以下核心接口:
// P2530 提案接口(简化版)
namespace std {
template<typename T>
class hazard_pointer {
public:
hazard_pointer() noexcept;
hazard_pointer(hazard_pointer&&) noexcept;
~hazard_pointer();
// 保护一个原子指针
T* protect(const atomic<T*>& source) noexcept;
// 清除保护
void reset_protection() noexcept;
};
// 退役一个指针
template<typename T, typename D = default_delete<T>>
void hazard_pointer_retire(T* ptr, D deleter = {});
}P2530 的设计哲学是:提供一个极简的、不需要用户管理 domain 或线程注册的接口。 底层实现由标准库负责,允许不同平台提供针对硬件优化的实现。
截至 2026 年初,P2530 已经通过了 LEWG(Library Evolution Working Group)的审查,预计将在 C++26 或 C++29 中被纳入标准。
11.3 其他工业实现
| 实现 | 语言 | 特点 |
|---|---|---|
| folly::HazardPointer | C++ | 最成熟的生产级实现 |
| libcds | C++ | 学术风格,功能全面 |
| Crossbeam | Rust | 基于 epoch,非 HP,但值得对比 |
| haphazard | Rust | Rust 生态中的 HP 实现 |
| Java ConcurrentSkipListMap | Java | 内部使用类似 HP 的机制 |
| Go runtime | Go | GC 本身解决了 SMR 问题 |
十二、工程陷阱与最佳实践
在实际项目中使用 Hazard Pointers 时,有许多容易踩的坑。下面是我总结的经验表。
陷阱一览表
| 陷阱 | 症状 | 原因 | 解决方案 |
|---|---|---|---|
| 忘记验证步骤 | 偶发 use-after-free | 读取和发布之间的时间窗口 | 严格遵循”读取-发布-验证”三步协议 |
| fence 不够强 | 极低概率的数据损坏 | StoreLoad 重排导致 scan 看不到最新 HP | 使用 memory_order_seq_cst fence |
| HP 槽泄漏 | 内存持续增长,scan 无法回收 | 使用完 HP 后忘记 clear | 用 RAII 包装 HP(C++),或代码审查 |
| 退役列表永远达不到阈值 | 内存缓慢增长 | 线程数少导致阈值太低 | 设置合理的最小阈值(如 32) |
| 对退役节点的二次访问 | 段错误或数据损坏 | 退役后又通过旧指针访问 | 退役即意味着”放弃所有权” |
| 线程退出时残留退役节点 | 内存泄漏 | 退役列表中的节点被遗弃 | 线程退出时执行最终 scan 或转移给其他线程 |
| false sharing | 性能劣化 | HP 槽位没有对齐到缓存行 | 对 HP 数组做 alignas(64) 或 padding |
| 混用 HP domain | 逻辑错误 | 不同数据结构共享 HP 域导致过度保护 | 每个数据结构或子系统使用独立 domain |
| scan 中的 ABA | 误判节点安全 | 指针值复用(经过 free+malloc) | HP 本身防止 ABA,但要确保 retire 前正确断开节点 |
| 在 HP 保护下修改节点 | 数据竞争 | HP 只保护内存不被释放,不提供互斥 | HP 不是锁,并发修改仍需原子操作 |
调试技巧
- AddressSanitizer(ASan):用
-fsanitize=address编译可以检测 use-after-free,但在高并发下可能有误报。 - Valgrind + DRD/Helgrind:可以检测数据竞争,但性能开销很大(10-50 倍减速)。
- 压力测试:sum 校验(如上面栈的测试)是验证正确性的有效方法。如果存在 use-after-free,总和几乎一定会不匹配。
- 统计监控:在生产环境中,监控退役列表的平均长度和 scan 频率。如果退役列表持续增长,说明有 HP 泄漏或者 scan 阈值设置不合理。
十三、个人观点
写了这么多,说说我对 Hazard Pointers 的个人看法。
HP 是我见过的最”正确”的 SMR 方案。 它的正确性证明简洁优美:只要你遵循”读取-发布-验证”协议,就能保证内存安全。不像 EBR 那样依赖”没有线程会长时间停留在临界区”这种脆弱的假设,也不像 RCU 那样需要操作系统调度器的配合。
但 HP
的读取路径开销确实是它的阿喀琉斯之踵。 每次
protect 都需要一次 StoreLoad fence,这在 ARM
等弱序架构上可能要付出 50-100ns
的代价。在读密集的场景中,这个开销可能比数据结构本身的操作还要大。这就是为什么
RCU 在”读极多写极少”的场景中仍然不可替代。
在实际工程中,我更偏向于混合方案。 比如:对于读路径热的数据结构(路由表、配置缓存),用 RCU 或 EBR。对于写操作频繁且需要强安全保证的结构(并发队列、并发 map),用 HP。Meta 的 folly 库就是这种思路——它同时提供了 HP 和 RCU 两种方案。
C++ P2530 提案的推进让我很兴奋。 如果 HP 进入 C++ 标准库,意味着编译器和标准库实现者可以针对不同平台提供优化的 HP 实现。在 x86 上可以利用 TSO 内存模型减少 fence,在未来的硬件上甚至可能有专门的 HP 支持指令。
最后一点务实的建议:如果你不是在写基础库,不要自己实现 HP。直接用 folly、libcds 或者 haphazard 这样的成熟库。HP 的实现看起来不多,但内存序的细节极其容易出错,而且 bug 可能在百万次操作中才出现一次。
SMR 问题本质上反映了一个深层的计算机科学问题:在没有全局协调的情况下,如何确定一个资源不再被任何参与者使用。 这和分布式系统中的垃圾收集、TCP 中的 TIME_WAIT、甚至是现实世界中”什么时候可以拆除一栋大楼”的问题都是同构的。Hazard Pointers 给出的答案是:让每个参与者主动声明自己的依赖。 这个简单的思想,在很多领域都值得借鉴。
附录:参考文献
- Maged M. Michael. “Hazard Pointers: Safe Memory Reclamation for Lock-Free Objects.” IEEE TPDS, 2004.
- Keir Fraser. “Practical lock-freedom.” PhD thesis, University of Cambridge, 2004.
- Paul E. McKenney, John D. Slingwine. “Read-Copy Update: Using Execution History to Solve Concurrency Problems.” PDCS, 1998.
- Pedro Ramalhete, Andreia Correia. “Brief Announcement: Hazard Eras - Non-Blocking Memory Reclamation.” SPAA, 2017.
- Haosen Wen et al. “Interval-Based Memory Reclamation.” PPoPP, 2018.
- Ruslan Nikolaev, Binoy Ravindran. “Hyaline: Fast and Transparent Lock-Free Memory Reclamation.” PODC, 2020.
- C++ P2530 提案:Hazard Pointers for C++26. https://wg21.link/P2530
- Meta folly 库:https://github.com/facebook/folly
上一篇: 并发跳表 下一篇: Epoch-Based Reclamation
相关阅读: - 无锁队列 - RCU:Linux 内核的读侧零开销并发