土法炼钢兴趣小组的算法知识备份

Hazard Pointers:安全内存回收的优雅方案

目录

写无锁数据结构的人,迟早会撞上同一堵墙:你什么时候才能安全地释放内存?

假设你在实现一个无锁队列。线程 A 通过 CAS 成功地把一个节点从队列中摘除了,正准备调用 free()。但此刻,线程 B 正拿着一个指向同一个节点的指针,准备读取它的 next 字段。如果 A 先 free() 了,B 读到的就是垃圾数据——这就是经典的 use-after-free,而且在并发场景下,它几乎不可能通过测试复现,只会在凌晨三点的生产环境里暴露。

这个问题被称为 Safe Memory Reclamation(SMR),它是所有无锁编程中最棘手的问题之一。2004 年,Maged Michael 在他的论文 “Hazard Pointers: Safe Memory Reclamation for Lock-Free Objects” 中提出了一个优雅的解决方案。这个方案的核心思想简单得令人惊讶:让每个线程公开声明它正在访问哪些指针。

本文将从问题出发,完整地讲解 Hazard Pointers 的设计、实现和工程实践。


一、问题的本质:为什么 free() 在无锁世界中如此危险

在有锁的世界里,内存回收不是问题。持有锁的线程知道没有其他线程在访问被保护的数据,释放内存是安全的。但在无锁数据结构中,多个线程可以同时读取同一个节点,而没有任何互斥机制来协调它们

让我们用一个最简单的场景来说明。考虑一个单链表:

HEAD -> [A] -> [B] -> [C] -> NULL

线程 1 想要删除节点 A,线程 2 正在遍历链表。

时间线:
  T1                          T2
  |                           |
  | CAS(HEAD, A, B) 成功      |
  |                           | ptr = HEAD  (读到了 A)
  | free(A)                   |
  |                           | val = ptr->data  // BOOM!
  |                           | use-after-free

问题在于,CAS 操作和内存释放之间没有因果关系。CAS 只保证 HEAD 的更新是原子的,但它无法阻止其他线程持有旧指针。

ABA 问题的内存视角

这个问题和 ABA 问题密切相关,但更加根本。ABA 问题关注的是指针值的复用——一个被释放的地址可能被重新分配,导致 CAS 成功但语义错误。而内存回收问题更基础:即使没有 ABA,单纯的 use-after-free 就已经是未定义行为了。

在带 GC 的语言(Java、Go)中,这个问题”自动消失”了——GC 保证只要有引用存在,对象就不会被回收。但在 C/C++ 中,我们必须手动解决它。


二、一个具体的例子:Michael-Scott 队列中的 use-after-free

Michael-Scott 队列是最经典的无锁队列实现之一。它的 dequeue 操作大致如下:

typedef struct node {
    void *data;
    struct node *next;
} node_t;

typedef struct queue {
    node_t *head;  // 哨兵节点
    node_t *tail;
} queue_t;

void *dequeue(queue_t *q) {
    node_t *head, *tail, *next;
    void *data;

    while (1) {
        head = q->head;                          // 1. 读 head
        tail = q->tail;                          // 2. 读 tail
        next = head->next;                       // 3. 读 head->next  <-- 危险!

        if (head != q->head)                     // 4. 一致性检查
            continue;

        if (head == tail) {
            if (next == NULL)
                return NULL;                     // 队列为空
            CAS(&q->tail, tail, next);           // 帮助推进 tail
            continue;
        }

        data = next->data;                       // 5. 读数据       <-- 危险!
        if (CAS(&q->head, head, next))           // 6. 推进 head
            break;
    }

    // head 现在是旧的哨兵节点,可以"释放"
    // 但是,其他线程可能还在步骤 1-5 中持有 head 的引用!
    free(head);  // <-- 这行代码是不安全的
    return data;
}

注意步骤 3:next = head->next。当线程执行这一行时,head 可能已经被另一个线程删除并释放了。如果 head 指向的内存已经被归还给操作系统或者被其他 malloc 调用复用,那么 head->next 的解引用就是灾难性的。

这就是为什么我们需要一种机制来推迟内存释放,直到确定没有线程在访问它。


三、Hazard Pointers 的核心思想

Maged Michael 的 Hazard Pointers 方案基于一个简洁的协议:

在访问一个共享节点之前,线程必须将该节点的地址发布到一个全局可见的 hazard pointer 槽中。在回收内存之前,回收者必须检查所有线程的 hazard pointer,确保没有线程正在访问待回收的节点。

整个机制由三个部分组成:

3.1 Hazard Pointer 数组

每个线程拥有固定数量(通常是 1-2 个)的 hazard pointer 槽位。这些槽位组成一个全局数组,任何线程都可以读取。

Global HP Array:
  Thread 0: [hp0 = 0xA000] [hp1 = NULL    ]
  Thread 1: [hp0 = 0xC000] [hp1 = 0xA000  ]
  Thread 2: [hp0 = NULL  ] [hp1 = 0xB000  ]

3.2 Retire 操作

当一个线程从数据结构中移除一个节点后,它不直接 free() 该节点,而是将其放入本线程的 retired list(退役列表)。

void retire(hp_domain_t *domain, void *ptr) {
    // 加入本线程的退役列表
    add_to_retired_list(ptr);

    // 如果退役列表超过阈值,触发扫描
    if (retired_count >= SCAN_THRESHOLD)
        scan(domain);
}

3.3 Scan 操作

这是 Hazard Pointers 最精妙的部分。当退役列表积累到一定程度,线程执行一次 scan:

  1. 收集:读取所有线程的所有 hazard pointer,形成一个保护集合 plist
  2. 比对:遍历自己的退役列表,检查每个节点是否在 plist 中。
  3. 回收:不在 plist 中的节点可以安全释放;在 plist 中的节点保留在退役列表中等待下次扫描。

下图展示了 Scan 算法的完整流程:

Hazard Pointer Scan 算法流程

四、Hazard Pointer 的使用协议

使用 Hazard Pointer 保护一个共享指针的标准协议如下:

// 安全地读取一个共享指针
node_t *protect(atomic_ptr *source, hazard_pointer_t *hp) {
    node_t *ptr;
    do {
        ptr = atomic_load(source);            // 1. 读取共享指针
        atomic_store(hp, ptr);                // 2. 发布到 HP 槽
        // 需要 StoreLoad barrier
        atomic_thread_fence(memory_order_seq_cst);
    } while (ptr != atomic_load(source));     // 3. 验证指针未变
    return ptr;
}

这个 “读取-发布-验证” 的三步协议是 Hazard Pointers 正确性的基础。为什么需要验证?因为在步骤 1 和步骤 2 之间,存在一个时间窗口:

线程 A(读取者)          线程 B(回收者)
  ptr = load(source)       
                           remove node from structure
                           retire(node)
                           scan()  // 此时 A 的 HP 还是旧值
                           // node 不在任何 HP 中
                           free(node)  // 释放了!
  store(hp, ptr)           
  // 太晚了,发布了一个已被释放的地址

验证步骤(re-read source)确保:如果指针在发布之前已经改变(意味着节点可能已经被 retire),我们会重新开始。如果指针没变,说明节点还在数据结构中,而此时我们的 HP 已经发布,任何后续的 scan 都会看到它。

内存序要求

步骤 2 和步骤 3 之间的 StoreLoad fence 是关键的。它确保: - HP 的写入对其他线程可见(在 scan 读取 HP 之前)。 - 步骤 3 的读取不会被重排到步骤 2 之前。

在 x86 架构上,普通的 store 就包含 StoreStore 语义,但 StoreLoad 需要显式的 mfence 或者使用 memory_order_seq_cst。在 ARM/RISC-V 等弱序架构上,这个 fence 的成本更高。


五、Scan 算法详解

下面是 Scan 算法的伪代码,忠实于 Michael 的原始论文:

procedure Scan(hp_domain):
    // 阶段 1:收集所有活跃的 hazard pointers
    plist = {}
    for each thread T in hp_domain:
        for each hp in T.hazard_pointers:
            ptr = load(hp)
            if ptr != NULL:
                plist.insert(ptr)

    // 阶段 2:对 plist 排序(用于二分查找)
    sort(plist)

    // 阶段 3:扫描退役列表
    new_retired = {}
    for each node in my_retired_list:
        if binary_search(plist, node):
            // 被保护,不能释放
            new_retired.append(node)
        else:
            // 安全,释放
            free(node)

    my_retired_list = new_retired

复杂度分析

设 N 为线程数,H 为每线程 hazard pointer 个数,R 为退役列表长度。

如果我们设置阈值 R >= 2 * N * H,那么每次 scan 至少能回收 R - N*H >= R/2 个节点。这意味着:

为什么阈值是 2NH

这是一个精巧的设计。在任何时刻,最多有 N * H 个节点被 hazard pointer 保护。所以如果退役列表有 2NH 个节点,至少有 NH 个节点是可以安全释放的。这保证了每次 scan 都是”值得的”——我们永远不会白白做一次 scan 却什么都释放不了。


六、完整的 C 实现

下面是一个功能完整的 Hazard Pointer 库实现。代码经过精心设计,适合在实际项目中参考使用。

/* hazard_pointer.h */
#ifndef HAZARD_POINTER_H
#define HAZARD_POINTER_H

#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* ========== 配置常量 ========== */

#define HP_MAX_THREADS    64
#define HP_MAX_HPS        2       /* 每个线程的 hazard pointer 数量 */
#define HP_SCAN_THRESHOLD 32      /* retired list 阈值 */

/* ========== 数据结构 ========== */

/* 退役节点链表 */
typedef struct hp_retired_node {
    void *ptr;
    void (*deleter)(void *);
    struct hp_retired_node *next;
} hp_retired_node_t;

/* 每线程记录 */
typedef struct hp_thread_record {
    _Atomic(void *) hp[HP_MAX_HPS];       /* hazard pointer 槽位 */
    _Atomic(bool)   active;               /* 是否被某线程持有 */
    hp_retired_node_t *retired_list;       /* 退役列表(线程本地) */
    int retired_count;                     /* 退役列表长度 */
    struct hp_thread_record *next;         /* 全局链表 */
} hp_thread_record_t;

/* Hazard Pointer 域 */
typedef struct hp_domain {
    _Atomic(hp_thread_record_t *) head;   /* 线程记录链表头 */
    _Atomic(int) thread_count;            /* 活跃线程数 */
} hp_domain_t;

/* ========== 全局域 ========== */

static hp_domain_t g_hp_domain = {
    .head = NULL,
    .thread_count = 0
};

/* 线程局部存储:每个线程的记录 */
static _Thread_local hp_thread_record_t *tl_my_record = NULL;

/* ========== 线程注册与注销 ========== */

/*
 * hp_thread_acquire - 获取一个线程记录
 * 优先复用已注销线程的记录,否则分配新的。
 */
static hp_thread_record_t *hp_thread_acquire(hp_domain_t *domain) {
    hp_thread_record_t *rec;

    /* 尝试复用已有的非活跃记录 */
    rec = atomic_load_explicit(&domain->head, memory_order_acquire);
    while (rec != NULL) {
        bool expected = false;
        if (atomic_compare_exchange_strong_explicit(
                &rec->active, &expected, true,
                memory_order_acq_rel, memory_order_relaxed)) {
            /* 成功获取一个已有记录 */
            return rec;
        }
        rec = rec->next;
    }

    /* 没有可复用的记录,分配新的 */
    rec = (hp_thread_record_t *)calloc(1, sizeof(hp_thread_record_t));
    if (!rec) return NULL;

    atomic_store_explicit(&rec->active, true, memory_order_relaxed);
    for (int i = 0; i < HP_MAX_HPS; i++) {
        atomic_store_explicit(&rec->hp[i], NULL, memory_order_relaxed);
    }
    rec->retired_list = NULL;
    rec->retired_count = 0;

    /* 用 CAS 插入全局链表头部 */
    hp_thread_record_t *old_head;
    do {
        old_head = atomic_load_explicit(&domain->head, memory_order_acquire);
        rec->next = old_head;
    } while (!atomic_compare_exchange_weak_explicit(
                &domain->head, &old_head, rec,
                memory_order_acq_rel, memory_order_relaxed));

    atomic_fetch_add_explicit(&domain->thread_count, 1, memory_order_relaxed);
    return rec;
}

/*
 * hp_thread_release - 注销当前线程的记录
 */
static void hp_thread_release(hp_thread_record_t *rec) {
    for (int i = 0; i < HP_MAX_HPS; i++) {
        atomic_store_explicit(&rec->hp[i], NULL, memory_order_release);
    }
    atomic_store_explicit(&rec->active, false, memory_order_release);
}

/* ========== Hazard Pointer 操作 ========== */

/*
 * hp_protect - "读取-发布-验证"三步协议
 * 安全地获取一个共享指针的快照并发布 hazard pointer。
 */
static void *hp_protect(int hp_index, _Atomic(void *) *source,
                        hp_thread_record_t *rec) {
    void *ptr;
    do {
        ptr = atomic_load_explicit(source, memory_order_acquire);
        atomic_store_explicit(&rec->hp[hp_index], ptr, memory_order_release);
        atomic_thread_fence(memory_order_seq_cst);
    } while (ptr != atomic_load_explicit(source, memory_order_acquire));
    return ptr;
}

/*
 * hp_clear - 清除一个 hazard pointer 槽
 */
static void hp_clear(int hp_index, hp_thread_record_t *rec) {
    atomic_store_explicit(&rec->hp[hp_index], NULL, memory_order_release);
}

/* ========== Scan 算法 ========== */

/*
 * pointer 比较函数,用于 qsort 和 bsearch
 */
static int ptr_compare(const void *a, const void *b) {
    uintptr_t pa = (uintptr_t)(*(void **)a);
    uintptr_t pb = (uintptr_t)(*(void **)b);
    if (pa < pb) return -1;
    if (pa > pb) return 1;
    return 0;
}

/*
 * hp_scan - 扫描并回收安全的退役节点
 */
static void hp_scan(hp_domain_t *domain, hp_thread_record_t *rec) {
    int hp_count = 0;
    int max_hps = HP_MAX_THREADS * HP_MAX_HPS;
    void **plist = (void **)malloc(sizeof(void *) * max_hps);
    if (!plist) return;

    /* 阶段 1:收集所有活跃的 hazard pointers */
    hp_thread_record_t *cur = atomic_load_explicit(
        &domain->head, memory_order_acquire);
    while (cur != NULL) {
        if (atomic_load_explicit(&cur->active, memory_order_acquire)) {
            for (int i = 0; i < HP_MAX_HPS; i++) {
                void *hp = atomic_load_explicit(
                    &cur->hp[i], memory_order_acquire);
                if (hp != NULL) {
                    plist[hp_count++] = hp;
                }
            }
        }
        cur = cur->next;
    }

    /* 阶段 2:排序,用于后续二分查找 */
    qsort(plist, hp_count, sizeof(void *), ptr_compare);

    /* 阶段 3:扫描退役列表 */
    hp_retired_node_t *new_list = NULL;
    int new_count = 0;

    hp_retired_node_t *node = rec->retired_list;
    while (node != NULL) {
        hp_retired_node_t *next = node->next;

        /* 二分查找:该节点是否被任何 HP 保护? */
        bool is_protected = (hp_count > 0) &&
            bsearch(&node->ptr, plist, hp_count,
                    sizeof(void *), ptr_compare) != NULL;

        if (is_protected) {
            /* 被保护,保留在退役列表 */
            node->next = new_list;
            new_list = node;
            new_count++;
        } else {
            /* 安全,调用 deleter 释放 */
            node->deleter(node->ptr);
            free(node);
        }

        node = next;
    }

    rec->retired_list = new_list;
    rec->retired_count = new_count;

    free(plist);
}

/* ========== Retire 操作 ========== */

/*
 * hp_retire - 将一个节点加入退役列表
 * 当退役列表超过阈值时触发 scan。
 */
static void hp_retire(hp_domain_t *domain, hp_thread_record_t *rec,
                      void *ptr, void (*deleter)(void *)) {
    hp_retired_node_t *rnode =
        (hp_retired_node_t *)malloc(sizeof(hp_retired_node_t));
    if (!rnode) {
        deleter(ptr);
        return;
    }

    rnode->ptr = ptr;
    rnode->deleter = deleter;
    rnode->next = rec->retired_list;
    rec->retired_list = rnode;
    rec->retired_count++;

    int threshold = 2 * atomic_load_explicit(
        &domain->thread_count, memory_order_relaxed) * HP_MAX_HPS;
    if (threshold < HP_SCAN_THRESHOLD)
        threshold = HP_SCAN_THRESHOLD;

    if (rec->retired_count >= threshold) {
        hp_scan(domain, rec);
    }
}

/* ========== 便捷接口 ========== */

/*
 * hp_init - 初始化当前线程
 * 必须在线程开始使用 HP 前调用。
 */
static inline void hp_init(void) {
    if (tl_my_record == NULL) {
        tl_my_record = hp_thread_acquire(&g_hp_domain);
    }
}

/*
 * hp_deinit - 注销当前线程
 * 在线程退出前调用,会尝试回收所有退役节点。
 */
static inline void hp_deinit(void) {
    if (tl_my_record != NULL) {
        /* 尝试最后一次 scan */
        hp_scan(&g_hp_domain, tl_my_record);
        /* 剩余无法释放的节点,交给其他线程处理(简化处理:此处不转移) */
        hp_thread_release(tl_my_record);
        tl_my_record = NULL;
    }
}

#endif /* HAZARD_POINTER_H */

这段实现大约 220 行,涵盖了 Hazard Pointer 的全部核心逻辑。几个值得注意的设计决策:

  1. 线程记录复用:通过 active 标志和 CAS,避免了为每个线程分配新的记录。这在线程池环境中很重要。
  2. 动态阈值:scan 阈值基于当前线程数动态计算,而不是写死。
  3. 自定义 deleter:通过函数指针支持不同类型节点的释放策略。
  4. 头插法链表:线程记录用无锁链表组织,插入是 O(1) 的。

七、集成示例:带 Hazard Pointers 的无锁栈

让我们把上面的 HP 库集成到一个完整的无锁栈中,展示实际使用方式。

/* lockfree_stack.c */
#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include "hazard_pointer.h"

/* ========== 无锁栈定义 ========== */

typedef struct stack_node {
    int value;
    struct stack_node *next;
} stack_node_t;

typedef struct lockfree_stack {
    _Atomic(void *) top;
} lockfree_stack_t;

static void stack_init(lockfree_stack_t *s) {
    atomic_store(&s->top, NULL);
}

static void stack_push(lockfree_stack_t *s, int value) {
    stack_node_t *node = (stack_node_t *)malloc(sizeof(stack_node_t));
    node->value = value;

    stack_node_t *old_top;
    do {
        old_top = atomic_load_explicit(&s->top, memory_order_acquire);
        node->next = old_top;
    } while (!atomic_compare_exchange_weak_explicit(
                &s->top, (void **)&old_top, node,
                memory_order_acq_rel, memory_order_relaxed));
}

static bool stack_pop(lockfree_stack_t *s, int *out_value) {
    hp_thread_record_t *rec = tl_my_record;
    stack_node_t *top;

    while (1) {
        /* 用 HP 保护 top 节点 */
        top = (stack_node_t *)hp_protect(0, &s->top, rec);
        if (top == NULL) {
            hp_clear(0, rec);
            return false;  /* 栈为空 */
        }

        stack_node_t *next = top->next;

        if (atomic_compare_exchange_weak_explicit(
                &s->top, (void **)&top, next,
                memory_order_acq_rel, memory_order_relaxed)) {
            /* CAS 成功,摘除了 top 节点 */
            *out_value = top->value;
            hp_clear(0, rec);

            /* 不是 free(top),而是 retire */
            hp_retire(&g_hp_domain, rec, top, free);
            return true;
        }
        /* CAS 失败,重试(HP 会在下一轮循环中更新) */
    }
}

/* ========== 测试 ========== */

#define NUM_THREADS   8
#define OPS_PER_THREAD 100000

static lockfree_stack_t g_stack;
static _Atomic(long) g_push_sum = 0;
static _Atomic(long) g_pop_sum = 0;

static void *worker(void *arg) {
    int tid = (int)(long)arg;
    hp_init();

    long local_push_sum = 0;
    long local_pop_sum = 0;

    for (int i = 0; i < OPS_PER_THREAD; i++) {
        if (i % 2 == 0) {
            int val = tid * OPS_PER_THREAD + i;
            stack_push(&g_stack, val);
            local_push_sum += val;
        } else {
            int val;
            if (stack_pop(&g_stack, &val)) {
                local_pop_sum += val;
            }
        }
    }

    atomic_fetch_add(&g_push_sum, local_push_sum);
    atomic_fetch_add(&g_pop_sum, local_pop_sum);

    hp_deinit();
    return NULL;
}

int main(void) {
    stack_init(&g_stack);

    pthread_t threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++) {
        pthread_create(&threads[i], NULL, worker, (void *)(long)i);
    }
    for (int i = 0; i < NUM_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }

    /* 弹出栈中剩余元素 */
    hp_init();
    long drain_sum = 0;
    int val;
    while (stack_pop(&g_stack, &val)) {
        drain_sum += val;
    }
    hp_deinit();

    long total_push = atomic_load(&g_push_sum);
    long total_pop = atomic_load(&g_pop_sum) + drain_sum;

    printf("push sum = %ld\n", total_push);
    printf("pop  sum = %ld\n", total_pop);
    printf("match: %s\n", total_push == total_pop ? "YES" : "NO");

    return 0;
}

编译和运行:

gcc -std=c11 -O2 -pthread lockfree_stack.c -o lockfree_stack
./lockfree_stack

这个测试验证了一个关键不变量:所有 push 进去的值之和,等于所有 pop 出来的值之和。 如果 HP 实现有 bug 导致 use-after-free 或者重复释放,这个不变量会被破坏(或者程序直接崩溃)。


八、性能特征分析

Hazard Pointers 的性能特征需要从读取路径和回收路径两个维度分析。

8.1 读取路径开销

每次 hp_protect 调用的成本:

操作 成本 说明
第一次 atomic_load 1 次原子读 读取共享指针
atomic_store 到 HP 槽 1 次原子写 发布 hazard pointer
atomic_thread_fence(seq_cst) 1 次 StoreLoad fence 最昂贵的部分
验证 atomic_load 1 次原子读 通常不需要重试

在 x86 上,seq_cst fence 编译为 mfence 指令,大约 30-50ns。在 ARM 上,这个 fence 的成本更高(dmb ish),大约 50-100ns。

关键洞察:这个开销是每次读取操作都要付出的。对于读密集的数据结构(如无锁哈希表的 lookup),这可能是显著的瓶颈。

8.2 回收路径开销(摊还)

假设 N 个线程,每个线程 H 个 HP:

参数
Scan 触发阈值 R = 2NH
每次 Scan 的原子读 NH 次
排序 O(NH log(NH))
二分查找 O(R log(NH))
每节点摊还成本 O(log(NH))

8.3 空间开销

这是 Hazard Pointers 的一个弱点。在任何时刻,系统中可能有 O(N^2 * H) 个未释放的退役节点(每个线程最多积累 2NH 个)。对于大量线程的场景,这可能导致显著的内存浪费。


九、与其他 SMR 方案的对比

Hazard Pointers 不是唯一的 SMR 方案。下面是三种主流方案的全面对比。

9.1 对比表

特性 Hazard Pointers Epoch-Based Reclamation RCU
发明者 Maged Michael (2004) Keir Fraser (2004) Paul McKenney (1998)
保护粒度 每个指针 整个临界区 整个读侧临界区
读取开销 原子写 + fence + 验证 仅一次原子操作(进入 epoch) 几乎为零(compiler barrier)
回收延迟 有界:O(N^2*H) 无界(一个慢线程可阻塞所有回收) 有界(grace period)
实现复杂度 中等 简单 复杂(需要 OS 支持)
适合场景 通用无锁数据结构 短临界区,可控线程 读极多写极少
线程可以阻塞? 可以(不影响其他线程回收) 不可以(会阻塞全局回收) 不可以(用户态 RCU 不可以)
ABA 保护 是(延迟释放自然防止 ABA)

9.2 Epoch-Based Reclamation(EBR)

EBR 的核心思想更简单:维护一个全局 epoch 计数器。线程在进入临界区时记录当前 epoch,退出时清除。只有当所有线程都经过了某个 epoch 之后,该 epoch 之前退役的节点才能被释放。

                Epoch 0        Epoch 1        Epoch 2
               +---------+   +---------+   +---------+
全局 epoch:    |    0    | -> |    1    | -> |    2    |
               +---------+   +---------+   +---------+
                    |              |              |
Thread 0:     [in epoch 0]  [in epoch 1]        idle
Thread 1:         idle      [in epoch 1]   [in epoch 2]
                    |
           当所有线程都离开 epoch 0 后,
           epoch 0 的退役节点可以释放

EBR 的最大优势是读取路径几乎没有开销(不需要 per-pointer 的 fence)。最大弱点是:如果一个线程长时间停留在一个 epoch 中(比如被抢占、执行了一个很长的操作),所有 epoch 的回收都会被阻塞,导致无界的内存增长。

9.3 RCU(Read-Copy-Update)

RCU 专为”读多写少”场景设计。读侧临界区(rcu_read_lock() / rcu_read_unlock())几乎没有开销——在 Linux 内核中,它们编译为空操作或者仅仅是一个编译器屏障。写侧通过 synchronize_rcu() 等待一个 grace period,确保所有正在进行的读侧临界区都已结束。

RCU 的核心限制是:它依赖于调度器的协作来确定 grace period 的结束。在用户态实现中(如 liburcu),这通常意味着读侧临界区不能包含阻塞操作。

9.4 选择建议


十、现代发展:Hazard Eras 与 Interval-Based Reclamation

Hazard Pointers 的原始设计已经有 20 多年了。研究者们在此基础上做了许多改进。

10.1 Hazard Eras(2017)

Hazard Eras 由 Ramalhete 和 Correia 提出,是 Hazard Pointers 和 EBR 的混合体。核心思想是:用时间戳(era)替代精确的指针值来标记保护范围。

传统 HP:  "我正在访问地址 0xA000"      --> 精确保护
Hazard Eras:"我正在访问 era 42 时存活的节点" --> 范围保护

每个共享节点在被加入数据结构时打上当前 era 的时间戳,退役时也打上时间戳。线程在进入临界区时发布当前 era。扫描时,只要一个节点的”出生 era”到”退役 era”范围与某个线程的活跃 era 重叠,它就不能被回收。

Hazard Eras 的优势: - 读取路径更便宜:只需发布一个 era 值(一次原子写),不需要 per-pointer 的 fence。 - 仍然有界:不像 EBR 那样可能无限积累。

代价是牺牲了一些回收精度——可能会多保留一些实际上已经安全的节点。

10.2 Interval-Based Reclamation(IBR,2018)

Wen 等人提出的 IBR 进一步泛化了时间戳方案。每个节点关联一个”保护区间”[birth, retire],每个线程关联一个”活跃区间”[enter, exit]。只要两个区间不重叠,节点就可以安全回收。

IBR 被认为是 HP 和 EBR 之间最好的平衡点之一,在实践中表现出色。

10.3 Hyaline(2020)

Nikolaev 和 Bhatt 提出的 Hyaline 使用引用计数的变体来实现 SMR。它的回收延迟极低(接近 HP),但读取路径开销更小。Hyaline 的设计比 HP 更复杂,但在某些工作负载下性能显著更好。


十一、工业实践:folly::HazardPointer 与 C++ P2530

11.1 Meta 的 folly::HazardPointer

Meta(原 Facebook)的 Folly 库包含了一个生产级的 Hazard Pointer 实现。它被广泛用于 Meta 的基础设施中,包括 folly::ConcurrentHashMapfolly::AtomicUnorderedInsertMap

folly 实现的几个工程亮点:

// folly 的使用接口非常简洁
auto holder = folly::makeHazardPointer();
auto *ptr = holder.protect(atomicPtr);
// ptr 在 holder 生命周期内保证有效
// holder 析构时自动清除 HP

RAII 语义:通过 HazardPointerHolder 类型自动管理 HP 的获取和释放,避免忘记调用 hp_clear 导致的 HP 泄漏。

自适应 Scan 策略:folly 的实现根据退役列表的增长速率动态调整 scan 频率,在内存使用和 CPU 开销之间取得平衡。

批量回收:支持异步的批量回收路径,将 scan 操作卸载到后台线程,减少前台线程的延迟抖动。

缓存对齐:HP 槽位按缓存行对齐,避免 false sharing。

11.2 C++ P2530 提案

C++ 标准委员会正在讨论将 Hazard Pointers 纳入标准库(提案 P2530)。该提案由 Maged Michael 本人参与,定义了以下核心接口:

// P2530 提案接口(简化版)
namespace std {
    template<typename T>
    class hazard_pointer {
    public:
        hazard_pointer() noexcept;
        hazard_pointer(hazard_pointer&&) noexcept;
        ~hazard_pointer();

        // 保护一个原子指针
        T* protect(const atomic<T*>& source) noexcept;

        // 清除保护
        void reset_protection() noexcept;
    };

    // 退役一个指针
    template<typename T, typename D = default_delete<T>>
    void hazard_pointer_retire(T* ptr, D deleter = {});
}

P2530 的设计哲学是:提供一个极简的、不需要用户管理 domain 或线程注册的接口。 底层实现由标准库负责,允许不同平台提供针对硬件优化的实现。

截至 2026 年初,P2530 已经通过了 LEWG(Library Evolution Working Group)的审查,预计将在 C++26 或 C++29 中被纳入标准。

11.3 其他工业实现

实现 语言 特点
folly::HazardPointer C++ 最成熟的生产级实现
libcds C++ 学术风格,功能全面
Crossbeam Rust 基于 epoch,非 HP,但值得对比
haphazard Rust Rust 生态中的 HP 实现
Java ConcurrentSkipListMap Java 内部使用类似 HP 的机制
Go runtime Go GC 本身解决了 SMR 问题

十二、工程陷阱与最佳实践

在实际项目中使用 Hazard Pointers 时,有许多容易踩的坑。下面是我总结的经验表。

陷阱一览表

陷阱 症状 原因 解决方案
忘记验证步骤 偶发 use-after-free 读取和发布之间的时间窗口 严格遵循”读取-发布-验证”三步协议
fence 不够强 极低概率的数据损坏 StoreLoad 重排导致 scan 看不到最新 HP 使用 memory_order_seq_cst fence
HP 槽泄漏 内存持续增长,scan 无法回收 使用完 HP 后忘记 clear 用 RAII 包装 HP(C++),或代码审查
退役列表永远达不到阈值 内存缓慢增长 线程数少导致阈值太低 设置合理的最小阈值(如 32)
对退役节点的二次访问 段错误或数据损坏 退役后又通过旧指针访问 退役即意味着”放弃所有权”
线程退出时残留退役节点 内存泄漏 退役列表中的节点被遗弃 线程退出时执行最终 scan 或转移给其他线程
false sharing 性能劣化 HP 槽位没有对齐到缓存行 对 HP 数组做 alignas(64) 或 padding
混用 HP domain 逻辑错误 不同数据结构共享 HP 域导致过度保护 每个数据结构或子系统使用独立 domain
scan 中的 ABA 误判节点安全 指针值复用(经过 free+malloc) HP 本身防止 ABA,但要确保 retire 前正确断开节点
在 HP 保护下修改节点 数据竞争 HP 只保护内存不被释放,不提供互斥 HP 不是锁,并发修改仍需原子操作

调试技巧

  1. AddressSanitizer(ASan):用 -fsanitize=address 编译可以检测 use-after-free,但在高并发下可能有误报。
  2. Valgrind + DRD/Helgrind:可以检测数据竞争,但性能开销很大(10-50 倍减速)。
  3. 压力测试:sum 校验(如上面栈的测试)是验证正确性的有效方法。如果存在 use-after-free,总和几乎一定会不匹配。
  4. 统计监控:在生产环境中,监控退役列表的平均长度和 scan 频率。如果退役列表持续增长,说明有 HP 泄漏或者 scan 阈值设置不合理。

十三、个人观点

写了这么多,说说我对 Hazard Pointers 的个人看法。

HP 是我见过的最”正确”的 SMR 方案。 它的正确性证明简洁优美:只要你遵循”读取-发布-验证”协议,就能保证内存安全。不像 EBR 那样依赖”没有线程会长时间停留在临界区”这种脆弱的假设,也不像 RCU 那样需要操作系统调度器的配合。

但 HP 的读取路径开销确实是它的阿喀琉斯之踵。 每次 protect 都需要一次 StoreLoad fence,这在 ARM 等弱序架构上可能要付出 50-100ns 的代价。在读密集的场景中,这个开销可能比数据结构本身的操作还要大。这就是为什么 RCU 在”读极多写极少”的场景中仍然不可替代。

在实际工程中,我更偏向于混合方案。 比如:对于读路径热的数据结构(路由表、配置缓存),用 RCU 或 EBR。对于写操作频繁且需要强安全保证的结构(并发队列、并发 map),用 HP。Meta 的 folly 库就是这种思路——它同时提供了 HP 和 RCU 两种方案。

C++ P2530 提案的推进让我很兴奋。 如果 HP 进入 C++ 标准库,意味着编译器和标准库实现者可以针对不同平台提供优化的 HP 实现。在 x86 上可以利用 TSO 内存模型减少 fence,在未来的硬件上甚至可能有专门的 HP 支持指令。

最后一点务实的建议:如果你不是在写基础库,不要自己实现 HP。直接用 folly、libcds 或者 haphazard 这样的成熟库。HP 的实现看起来不多,但内存序的细节极其容易出错,而且 bug 可能在百万次操作中才出现一次。

SMR 问题本质上反映了一个深层的计算机科学问题:在没有全局协调的情况下,如何确定一个资源不再被任何参与者使用。 这和分布式系统中的垃圾收集、TCP 中的 TIME_WAIT、甚至是现实世界中”什么时候可以拆除一栋大楼”的问题都是同构的。Hazard Pointers 给出的答案是:让每个参与者主动声明自己的依赖。 这个简单的思想,在很多领域都值得借鉴。


附录:参考文献

  1. Maged M. Michael. “Hazard Pointers: Safe Memory Reclamation for Lock-Free Objects.” IEEE TPDS, 2004.
  2. Keir Fraser. “Practical lock-freedom.” PhD thesis, University of Cambridge, 2004.
  3. Paul E. McKenney, John D. Slingwine. “Read-Copy Update: Using Execution History to Solve Concurrency Problems.” PDCS, 1998.
  4. Pedro Ramalhete, Andreia Correia. “Brief Announcement: Hazard Eras - Non-Blocking Memory Reclamation.” SPAA, 2017.
  5. Haosen Wen et al. “Interval-Based Memory Reclamation.” PPoPP, 2018.
  6. Ruslan Nikolaev, Binoy Ravindran. “Hyaline: Fast and Transparent Lock-Free Memory Reclamation.” PODC, 2020.
  7. C++ P2530 提案:Hazard Pointers for C++26. https://wg21.link/P2530
  8. Meta folly 库:https://github.com/facebook/folly

上一篇: 并发跳表 下一篇: Epoch-Based Reclamation

相关阅读: - 无锁队列 - RCU:Linux 内核的读侧零开销并发


By .