土法炼钢兴趣小组的算法知识备份

Epoch-Based Reclamation:Crossbeam 的实现之道

目录

一、旧题重提:无锁数据结构的内存回收困境

在有锁的世界里,内存回收不是什么难事。当一个线程持有互斥锁并删除某个节点时,它可以确信没有其他线程正在访问该节点,因此可以安全地释放内存。但在无锁(lock-free)数据结构中,情况发生了根本性的变化。

考虑一个无锁链表的删除操作:

// 线程 A 正在遍历链表,刚刚读取了指向节点 N 的指针
node_t *curr = atomic_load(&prev->next);  // curr 指向节点 N

// 与此同时,线程 B 通过 CAS 操作将节点 N 从链表中摘除
atomic_compare_exchange(&prev->next, &curr, curr->next);

// 问题来了:线程 B 现在能释放节点 N 的内存吗?
// 不能!因为线程 A 仍然持有指向 N 的指针
free(curr);  // 危险!线程 A 随时可能解引用这个指针

这就是著名的内存回收问题(Memory Reclamation Problem)。它的核心矛盾在于:

  1. 节点已经从数据结构中逻辑删除,不应该再被新的遍历者看到;
  2. 但已经获取了该节点指针的线程,可能在任意时刻解引用它;
  3. 我们无法确切知道何时所有这样的线程都”用完了”该指针。

这个问题的本质是一个安全回收点的判定问题:我们需要一种机制来确定,什么时候一个已被逻辑删除的节点可以安全地被物理释放。

学术界和工业界提出了多种解决方案:

方案 核心思想 代表实现
引用计数 跟踪每个对象的引用数 std::shared_ptr
Hazard Pointers 每个线程公布当前正在保护的指针 Maged Michael, 2004
Epoch-Based Reclamation 通过纪元推进来批量确定安全回收点 Keir Fraser, 2004
RCU 等待所有读者通过静默状态 Linux Kernel
标签指针 在指针中嵌入版本号防止 ABA 各类无锁算法

其中,Epoch-Based Reclamation(以下简称 EBR)以其简洁的设计和优秀的读侧性能,成为了实践中最受欢迎的方案之一。Rust 生态中的 crossbeam-epoch 库正是 EBR 的经典实现。

本文将从 EBR 的原始设计出发,逐步深入到 Crossbeam 的工程实现,并提供一个完整的 C 语言参考实现。

二、纪元的直觉:用”时代”划分安全边界

EBR 的核心直觉非常朴素:如果我们能确认所有线程都已经”翻过了某一页”,那么在那一页之前被删除的对象就可以安全回收了。

2.1 全局纪元与线程本地纪元

EBR 维护两层状态:

全局纪元(Global Epoch):一个单调递增的整数,通常只需要三个值(0, 1, 2)循环使用。它代表整个系统当前所处的”时代”。

线程本地纪元(Thread-Local Epoch):每个参与并发操作的线程维护自己观察到的纪元值,以及一个”是否活跃”的标志。

Global Epoch: 2

Thread A: epoch=2, active=true    (正在操作数据结构)
Thread B: epoch=1, active=true    (较慢,还在上个纪元)
Thread C: epoch=2, active=false   (当前不在临界区)

2.2 三条核心规则

EBR 的运行遵循三条简单的规则:

规则一:进入临界区时,线程将自己的本地纪元更新为当前全局纪元,并标记自己为活跃状态。

pin():
    local.active = true
    local.epoch = global_epoch
    memory_fence()

规则二:节点被逻辑删除时,将其放入当前纪元对应的”待回收列表”中。

retire(node):
    garbage[global_epoch].add(node)

规则三:当所有活跃线程的本地纪元都等于全局纪元时,全局纪元可以推进,并且两个纪元之前的待回收节点可以安全释放。

try_advance():
    for each thread t:
        if t.active and t.epoch != global_epoch:
            return  // 有线程还停留在旧纪元,不能推进
    // 所有活跃线程都跟上了当前纪元
    old = (global_epoch + 1) % 3
    free_all(garbage[old])
    global_epoch = (global_epoch + 1) % 3

为什么需要三个纪元而不是两个?这是 EBR 设计中最微妙的地方。考虑以下场景:

时刻 1: 全局纪元 = 0
        线程 A pin, 观察到 epoch = 0, 读取节点 X
        线程 B 删除节点 X, 将 X 放入 garbage[0]

时刻 2: 全局纪元推进到 1
        线程 A 仍然 pinned, epoch = 0 (还没来得及更新)

时刻 3: 此时能回收 garbage[0] 吗?
        不能!线程 A 在 epoch=0 时读取了 X,
        X 在 garbage[0] 中,线程 A 可能仍在使用它

三个纪元确保了在回收纪元 e 的垃圾时,所有线程至少已经经过了 e+1 的纪元——这意味着它们已经完成了在纪元 e 期间开始的所有操作。

三、静默状态检测:EBR 的安全基石

3.1 什么是静默状态

静默状态(Quiescent State) 是指线程不持有任何对共享数据结构中对象的引用的时刻。在 EBR 中,这对应于线程处于 unpin 状态——即 active = false

当一个线程从 pin 转换为 unpin 时,它向系统做出了一个承诺:

“我已经用完了在本次临界区中获取的所有共享指针,不再持有任何对共享对象的引用。”

这个承诺是 EBR 正确性的基石。如果线程在 unpin 之后仍然持有并使用共享指针,系统的安全性保证将被打破。

3.2 纪元推进的条件

全局纪元推进需要满足一个关键条件:自上次纪元推进以来,每个参与的线程至少经历了一次静默状态。

用更精确的语言表述:

can_advance(global_epoch) =
    forall thread t in registered_threads:
        t.active == false                    // 线程当前不活跃
        OR t.epoch == global_epoch           // 线程已观察到当前纪元

当这个条件满足时,我们知道:

因此,距当前纪元两个”代”之前的垃圾可以安全回收。

3.3 为什么这个推理是正确的

让我们严格论证。假设全局纪元从 e 推进到 e+1

  1. 推进条件要求所有活跃线程的本地纪元等于 e
  2. 所有活跃线程在 pin() 时观察到 global_epoch == e,且 pin() 中的内存屏障确保后续读取发生在观察纪元之后;
  3. 如果一个节点在纪元 e-1 或更早被删除,在纪元 e 开始时它已不在数据结构中;
  4. 因此在纪元 epin() 的线程不可能获取到该节点的引用;
  5. 结论:纪元 e-1 及更早纪元的垃圾在纪元 e+1 时可以安全回收。

这就是为什么我们需要三个纪元槽位(0, 1, 2 循环使用)——回收总是发生在”两个纪元之前”的垃圾上。

下图展示了纪元推进与垃圾回收的时间线关系:

Epoch-Based Reclamation 时间线

四、Keir Fraser 的原始设计

Epoch-Based Reclamation 最初由 Keir Fraser 在其 2004 年的博士论文 “Practical Lock-Freedom” 中系统化地提出。Fraser 的设计虽然简洁,但已经包含了 EBR 的所有核心要素。

4.1 Fraser 的数据结构

// Fraser 原始设计的核心数据结构(简化表示)
typedef struct {
    atomic_uint global_epoch;      // 全局纪元,只有 0, 1, 2 三个值

    struct thread_record {
        atomic_uint epoch;         // 线程本地纪元
        atomic_bool active;        // 是否在临界区中
        // 每个纪元一个垃圾袋
        garbage_bag_t limbo_list[3];
    } threads[MAX_THREADS];
} ebr_t;

4.2 Fraser 设计的关键操作

进入临界区(对应后来的 pin):

void critical_enter(ebr_t *ebr, int tid) {
    atomic_store(&ebr->threads[tid].active, true);
    // 发布当前纪元——必须在 active=true 之后
    atomic_store(&ebr->threads[tid].epoch,
                 atomic_load(&ebr->global_epoch));
    atomic_thread_fence(memory_order_seq_cst);
}

离开临界区(对应后来的 unpin):

void critical_exit(ebr_t *ebr, int tid) {
    atomic_thread_fence(memory_order_release);
    atomic_store(&ebr->threads[tid].active, false);
}

尝试推进纪元

void try_gc(ebr_t *ebr) {
    uint32_t ge = atomic_load(&ebr->global_epoch);

    for (int i = 0; i < num_threads; i++) {
        if (atomic_load(&ebr->threads[i].active) &&
            atomic_load(&ebr->threads[i].epoch) != ge) {
            return;  // 有线程滞后,不能推进
        }
    }

    // 推进纪元,回收两个纪元前的垃圾
    uint32_t new_epoch = (ge + 1) % 3;
    if (atomic_compare_exchange_strong(&ebr->global_epoch, &ge, new_epoch)) {
        // 回收 new_epoch 对应的旧垃圾
        // (因为循环使用, garbage[new_epoch] 存的是三个纪元前的数据)
        free_garbage(new_epoch);
    }
}

4.3 Fraser 设计的特点

Fraser 的原始设计有几个值得注意的特点:

线程注册制:每个线程需要预先注册,获取一个 thread record。这在固定线程池的场景下工作良好,但在动态线程创建/销毁的环境中会带来管理复杂性。

垃圾袋按纪元分配:每个线程为每个纪元维护独立的垃圾列表,这避免了在退休节点时的竞争。

推进是尽力而为try_gc 是非阻塞的,垃圾的积累由后续的推进尝试来清理。

内存开销不确定:如果某个线程长时间处于 pin 状态,全局纪元将无法推进,垃圾会持续积累。

五、Pin 与 Unpin:临界区的语义学

5.1 Pin 的本质

pin() 操作在语义上等同于”宣告进入一个受保护的临界区”。调用 pin() 之后:

// Crossbeam 中的 pin 用法
let guard = epoch::pin();  // 进入临界区,获取 Guard

// 在 guard 的保护下读取共享指针
let shared = some_atomic.load(Ordering::Acquire, &guard);

// shared 指向的内存在 guard 存活期间保证有效
if let Some(node) = unsafe { shared.as_ref() } {
    println!("value: {}", node.data);
}

// guard 被 drop 时自动 unpin
drop(guard);

5.2 Guard 的生命周期语义

在 Crossbeam 中,pin() 返回一个 Guard 对象,它的 Drop 实现会自动执行 unpin()。这个设计与 Rust 的所有权系统完美契合:

// Guard 的简化定义
pub struct Guard {
    local: *const Local,  // 指向线程本地状态
}

impl Drop for Guard {
    fn drop(&mut self) {
        // 自动 unpin
        unsafe {
            (*self.local).unpin();
        }
    }
}

Guard 类型有一个重要的约束:它不实现 Send。这意味着 Guard 不能被发送到其他线程。这个约束是有意义的——pin/unpin 的语义绑定在特定线程上,将 Guard 发送到其他线程会破坏纪元跟踪的正确性。

5.3 嵌套 Pin

一个线程可以在已经 pin 的状态下再次调用 pin()。Crossbeam 通过引用计数来处理这种情况:

fn pin(&self) -> Guard {
    let guard_count = self.guard_count.get();
    self.guard_count.set(guard_count + 1);

    if guard_count == 0 {
        // 第一次 pin,需要真正进入临界区
        let global_epoch = self.global().epoch.load(Ordering::Relaxed);
        self.epoch.store(global_epoch, Ordering::Relaxed);
        atomic::fence(Ordering::SeqCst);
    }

    Guard { local: self }
}

fn unpin(&self) {
    let guard_count = self.guard_count.get();
    self.guard_count.set(guard_count - 1);

    if guard_count == 1 {
        // 最后一个 guard 被释放,真正离开临界区
        self.epoch.store(UNSET_EPOCH, Ordering::Release);

        if self.should_advance() {
            self.global().try_advance();
        }
    }
}

5.4 Pin 的性能特征

pin() 操作的开销主要来自:

  1. 一次原子写入:更新线程本地纪元;
  2. 一个内存屏障:确保纪元更新对其他线程可见;
  3. 可选的推进尝试:检查是否可以推进全局纪元。

在 x86 架构上,SeqCst fence 通常编译为 mfence 指令,这是 pin 操作中最昂贵的部分。但与 Hazard Pointers 在每次指针读取时都需要的原子操作相比,EBR 的 pin 操作是一次性的开销——在整个临界区内,线程可以自由地读取指针而无需额外的同步。

pin 操作耗时(x86-64 典型值):
  EBR pin():        ~20-40 ns (一次 mfence)
  HP protect():     ~15-30 ns (每个指针一次)

  读取 10 个节点:
  EBR: 20ns + 10 * ~5ns = ~70ns
  HP:  10 * 25ns        = ~250ns

六、Crossbeam 的实现之道

Crossbeam 的 crossbeam-epoch 是 EBR 在 Rust 中的工业级实现。它在 Fraser 的基础设计上做了大量的工程优化。

6.1 核心类型

// 全局状态
pub struct Global {
    epoch: CachePadded<AtomicEpoch>,       // 全局纪元
    locals: List<Local>,                    // 所有线程本地状态的链表
    garbage: [CachePadded<Bag>; 3],         // 三个纪元的垃圾袋
}

// 线程本地状态
pub struct Local {
    epoch: AtomicEpoch,                     // 本地纪元
    guard_count: Cell<usize>,               // 嵌套 pin 计数
    bag: Cell<Bag>,                         // 本地垃圾袋
    collector: Collector,                   // 指向所属的 Collector
}

// 纪元值
struct Epoch {
    // 使用最低位作为 "pinned" 标志
    // 高位存储纪元值
    data: usize,
}

6.2 纪元编码的精妙设计

Crossbeam 使用了一个聪明的位编码技巧:将 active 标志和 epoch 值合并到一个原子变量中。

// 纪元编码:
// bit 0:      pinned 标志 (1 = pinned, 0 = unpinned)
// bit 1..63:  纪元值

const PINNED: usize = 1;

impl Epoch {
    fn is_pinned(&self) -> bool {
        self.data & PINNED != 0
    }

    fn unpinned() -> Self {
        Epoch { data: 0 }  // 纪元值无关紧要,因为未 pin
    }

    fn pinned(epoch: usize) -> Self {
        Epoch { data: (epoch << 1) | PINNED }
    }

    fn value(&self) -> usize {
        self.data >> 1
    }
}

这个设计将两个字段的更新合并为一次原子操作,减少了竞争窗口,同时也减少了缓存行的争用。

6.3 Collector 与 Handle

Crossbeam 引入了 CollectorHandle 的两级抽象:

// Collector 是全局的,可以 Clone(引用计数)
let collector = Collector::new();

// 每个线程通过 register 获取 Handle
let handle = collector.register();

// 通过 Handle 进行 pin
let guard = handle.pin();

这种设计允许多个独立的”域”(domain)使用各自的 EBR 实例,避免了不同数据结构之间的纪元推进互相干扰。

6.4 垃圾袋(Bag)的分层设计

Crossbeam 的垃圾收集采用了两级结构:

线程本地 Bag --[满了]--> 全局 Garbage Queue (per-epoch)
                              |
                        [纪元推进时]
                              |
                              v
                        批量释放全部

每个线程在本地积累垃圾(避免竞争),当本地 Bag 满了(默认 64 个条目)后,将整个 Bag 推送到全局队列:

// 退休一个节点
unsafe fn defer_destroy<T>(guard: &Guard, ptr: Shared<T>) {
    let local = guard.local();
    let bag = local.bag.get_mut();

    bag.push(Deferred::new(move || {
        drop(Box::from_raw(ptr.as_raw() as *mut T));
    }));

    if bag.is_full() {
        let bag = local.bag.replace(Bag::new());
        let epoch = local.epoch.load(Ordering::Relaxed);
        local.global().push_bag(bag, epoch);
    }
}

6.5 纪元推进的实际流程

fn try_advance(&self) -> bool {
    let global_epoch = self.epoch.load(Ordering::Relaxed);

    // 检查所有注册的线程
    for local in self.locals.iter() {
        let local_epoch = local.epoch.load(Ordering::Relaxed);

        // 如果线程已 pin 且纪元落后,不能推进
        if local_epoch.is_pinned()
            && local_epoch.value() != global_epoch
        {
            return false;
        }
    }

    // 所有线程都跟上了,推进纪元
    atomic::fence(Ordering::Acquire);
    let new_epoch = global_epoch.wrapping_add(1);
    self.epoch.store(new_epoch, Ordering::Release);

    // 回收两个纪元前的垃圾
    let old_epoch = new_epoch.wrapping_add(1);  // (e+2) % 3 == (e-1) % 3
    self.garbage[old_epoch % 3].collect();

    true
}

6.6 CachePadded 与伪共享规避

Crossbeam 对频繁被不同线程访问的数据使用了 CachePadded 包装:

#[repr(align(128))]  // 通常缓存行是 64 字节,128 保证跨越行边界
pub struct CachePadded<T> {
    value: T,
}

这确保全局纪元和各线程的本地纪元不会落在同一个缓存行上,避免了伪共享(false sharing)导致的性能退化。

七、Rust 所有权模型与 EBR 的天然契合

7.1 生命周期即安全性

EBR 的核心安全约束是:在 pin 期间获取的指针,不能在 unpin 之后使用。 这恰好可以用 Rust 的生命周期系统来表达:

impl Guard {
    // 返回的 Shared 的生命周期被绑定到 &self(即 Guard 的生命周期)
    pub fn load<'g, T>(
        &'g self,
        atomic: &Atomic<T>,
        ord: Ordering,
    ) -> Shared<'g, T> {
        // Shared<'g, T> 不能比 Guard 活得更久
        Shared {
            ptr: atomic.load(ord),
            _marker: PhantomData,
        }
    }
}

// 编译器会阻止这样的错误用法:
let shared;
{
    let guard = epoch::pin();
    shared = some_atomic.load(Ordering::Acquire, &guard);
}  // guard 被 drop,unpin
// shared.as_ref();  // 编译错误!shared 的生命周期超过了 guard

7.2 SharedOwned 的类型设计

Crossbeam 设计了三种智能指针类型来区分不同的所有权状态:

// Shared<'g, T> - 受 Guard 保护的共享引用
// 类似于 &'g T,但来自原子指针
pub struct Shared<'g, T: 'g> {
    ptr: *const T,
    _marker: PhantomData<(&'g (), *const T)>,
}

// Owned<T> - 独占所有权,可以安全 drop
// 类似于 Box<T>
pub struct Owned<T> {
    ptr: *mut T,
    _marker: PhantomData<Box<T>>,
}

// Atomic<T> - 原子指针,类似于 AtomicPtr<T>
pub struct Atomic<T> {
    ptr: AtomicUsize,
    _marker: PhantomData<*mut T>,
}

这三种类型之间的转换清晰地表达了节点在无锁数据结构中的生命周期:Owned<T> 通过 store 进入 Atomic<T>,再通过 load 获得 Shared<'g, T>——从独占所有权到共享原子再到受 Guard 保护的引用。

7.3 unsafe 的精确边界

Crossbeam 将 unsafe 操作限制在最小范围:pinloadstore、CAS 都是安全的,只有解引用(shared.deref())和延迟释放(guard.defer_destroy())需要 unsafe。这让 Rust 的类型系统承担了 EBR 的大部分安全保证。

八、完整 C 语言实现

下面是一个简化但完整的 EBR 实现。它包含了所有核心功能,并且可以与实际的无锁数据结构配合使用。

/* epoch_reclaim.h - 简化的 Epoch-Based Reclamation 实现 */
#ifndef EPOCH_RECLAIM_H
#define EPOCH_RECLAIM_H

#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* ===== 配置参数 ===== */
#define EBR_MAX_THREADS     64
#define EBR_EPOCHS          3
#define EBR_BAG_CAPACITY    128
#define EBR_ADVANCE_FREQ    64    /* 每隔多少次 unpin 尝试一次推进 */

/* ===== 待回收节点 ===== */
typedef void (*ebr_free_fn)(void *ptr);

typedef struct ebr_retired_node {
    void               *ptr;
    ebr_free_fn         free_fn;
    struct ebr_retired_node *next;
} ebr_retired_node_t;

/* ===== 线程本地垃圾袋 ===== */
typedef struct {
    ebr_retired_node_t *head;
    int                 count;
} ebr_bag_t;

/* ===== 线程本地记录 ===== */
typedef struct {
    _Atomic(unsigned)   epoch;          /* 线程观察到的纪元 */
    _Atomic(bool)       active;         /* 是否在临界区中 */
    _Atomic(bool)       used;           /* 是否已注册 */
    unsigned            pin_count;      /* 嵌套 pin 计数 */
    unsigned            op_count;       /* 操作计数,用于触发推进 */
    ebr_bag_t           bags[EBR_EPOCHS]; /* 每个纪元一个垃圾袋 */
    char                _pad[64];       /* 缓存行填充 */
} ebr_thread_t;

/* ===== 全局 EBR 状态 ===== */
typedef struct {
    _Atomic(unsigned)   global_epoch;
    ebr_thread_t        threads[EBR_MAX_THREADS];
    int                 max_tid;        /* 记录最大注册线程 ID */
} ebr_t;

/* ===== 初始化 ===== */
static inline void ebr_init(ebr_t *ebr)
{
    memset(ebr, 0, sizeof(ebr_t));
    atomic_store(&ebr->global_epoch, 0);
    ebr->max_tid = 0;
}

/* ===== 线程注册 ===== */
static inline int ebr_register(ebr_t *ebr)
{
    for (int i = 0; i < EBR_MAX_THREADS; i++) {
        bool expected = false;
        if (atomic_compare_exchange_strong(&ebr->threads[i].used,
                                           &expected, true)) {
            atomic_store(&ebr->threads[i].active, false);
            atomic_store(&ebr->threads[i].epoch, 0);
            ebr->threads[i].pin_count = 0;
            ebr->threads[i].op_count = 0;
            for (int e = 0; e < EBR_EPOCHS; e++) {
                ebr->threads[i].bags[e].head = NULL;
                ebr->threads[i].bags[e].count = 0;
            }
            /* 更新 max_tid(非精确,但足够) */
            int cur_max = ebr->max_tid;
            while (i >= cur_max) {
                __atomic_compare_exchange_n(&ebr->max_tid,
                    &cur_max, i + 1, true,
                    __ATOMIC_RELAXED, __ATOMIC_RELAXED);
            }
            return i;
        }
    }
    return -1;  /* 注册失败,超出最大线程数 */
}

/* ===== 线程注销 ===== */
static inline void ebr_unregister(ebr_t *ebr, int tid)
{
    atomic_store(&ebr->threads[tid].active, false);
    /* 注意:注销时垃圾袋中可能还有待回收节点
       完整实现需要将这些节点移交给全局垃圾队列 */
    for (int e = 0; e < EBR_EPOCHS; e++) {
        ebr_retired_node_t *node = ebr->threads[tid].bags[e].head;
        while (node) {
            ebr_retired_node_t *next = node->next;
            node->free_fn(node->ptr);
            free(node);
            node = next;
        }
        ebr->threads[tid].bags[e].head = NULL;
        ebr->threads[tid].bags[e].count = 0;
    }
    atomic_store(&ebr->threads[tid].used, false);
}

/* ===== 回收指定纪元的垃圾 ===== */
static inline void ebr_collect_epoch(ebr_t *ebr, unsigned epoch_idx)
{
    int max = ebr->max_tid;
    for (int i = 0; i < max; i++) {
        if (!atomic_load(&ebr->threads[i].used)) continue;

        ebr_bag_t *bag = &ebr->threads[i].bags[epoch_idx];
        ebr_retired_node_t *node = bag->head;
        while (node) {
            ebr_retired_node_t *next = node->next;
            node->free_fn(node->ptr);
            free(node);
            node = next;
        }
        bag->head = NULL;
        bag->count = 0;
    }
}

/* ===== 尝试推进全局纪元 ===== */
static inline bool ebr_try_advance(ebr_t *ebr)
{
    unsigned ge = atomic_load_explicit(&ebr->global_epoch,
                                       memory_order_relaxed);
    int max = ebr->max_tid;

    /* 检查所有活跃线程是否已跟上当前纪元 */
    for (int i = 0; i < max; i++) {
        if (!atomic_load_explicit(&ebr->threads[i].used,
                                   memory_order_relaxed))
            continue;

        if (atomic_load_explicit(&ebr->threads[i].active,
                                  memory_order_acquire)) {
            unsigned te = atomic_load_explicit(&ebr->threads[i].epoch,
                                                memory_order_relaxed);
            if (te != ge) {
                return false;  /* 有线程滞后 */
            }
        }
    }

    /* 所有线程已跟上,推进纪元 */
    unsigned new_epoch = (ge + 1) % EBR_EPOCHS;

    /* CAS 避免多线程同时推进 */
    if (!atomic_compare_exchange_strong_explicit(
            &ebr->global_epoch, &ge, new_epoch,
            memory_order_release, memory_order_relaxed)) {
        return false;  /* 其他线程已推进 */
    }

    /* 回收新纪元槽位上的旧垃圾
       (new_epoch 槽位存放的是三个纪元前的垃圾) */
    ebr_collect_epoch(ebr, new_epoch);

    return true;
}

/* ===== Pin:进入临界区 ===== */
static inline void ebr_pin(ebr_t *ebr, int tid)
{
    ebr_thread_t *t = &ebr->threads[tid];

    if (t->pin_count++ > 0) {
        return;  /* 嵌套 pin,只增加计数 */
    }

    /* 标记为活跃 */
    atomic_store_explicit(&t->active, true, memory_order_relaxed);

    /* 加载并发布当前全局纪元 */
    unsigned ge = atomic_load_explicit(&ebr->global_epoch,
                                       memory_order_relaxed);
    atomic_store_explicit(&t->epoch, ge, memory_order_relaxed);

    /* 关键屏障:确保纪元更新在后续所有读操作之前可见 */
    atomic_thread_fence(memory_order_seq_cst);
}

/* ===== Unpin:离开临界区 ===== */
static inline void ebr_unpin(ebr_t *ebr, int tid)
{
    ebr_thread_t *t = &ebr->threads[tid];

    if (--t->pin_count > 0) {
        return;  /* 嵌套 unpin,只减少计数 */
    }

    /* 释放屏障:确保临界区内的操作完成 */
    atomic_thread_fence(memory_order_release);
    atomic_store_explicit(&t->active, false, memory_order_relaxed);

    /* 周期性尝试推进纪元 */
    if (++t->op_count % EBR_ADVANCE_FREQ == 0) {
        ebr_try_advance(ebr);
    }
}

/* ===== 退休节点:将节点放入当前纪元的垃圾袋 ===== */
static inline void ebr_retire(ebr_t *ebr, int tid,
                               void *ptr, ebr_free_fn free_fn)
{
    unsigned ge = atomic_load_explicit(&ebr->global_epoch,
                                       memory_order_relaxed);
    unsigned idx = ge % EBR_EPOCHS;

    ebr_retired_node_t *node = (ebr_retired_node_t *)malloc(
        sizeof(ebr_retired_node_t));
    node->ptr = ptr;
    node->free_fn = free_fn;
    node->next = ebr->threads[tid].bags[idx].head;
    ebr->threads[tid].bags[idx].head = node;
    ebr->threads[tid].bags[idx].count++;
}

/* ===== 强制回收(用于 shutdown) ===== */
static inline void ebr_drain(ebr_t *ebr)
{
    for (unsigned e = 0; e < EBR_EPOCHS; e++) {
        ebr_collect_epoch(ebr, e);
    }
}

#endif /* EPOCH_RECLAIM_H */

上面的实现约 200 行,涵盖了 EBR 的所有核心功能。完整的使用示例见后文的 Treiber 栈集成。

九、与 Hazard Pointers 及 RCU 的比较

三种主流内存回收机制各有所长。下面从多个维度进行对比:

维度 EBR Hazard Pointers RCU
读侧开销 极低(每次 pin 一个 fence) 中等(每个指针一次原子写) 零(编译器屏障即可)
写侧开销 低(退休到本地列表) 中等(扫描全局 HP 数组) 高(synchronize_rcu 可能阻塞)
内存回收确定性 差(依赖所有线程推进) 好(精确知道哪些指针被保护) 中等(grace period 有界但可能较长)
最坏情况内存占用 无界(线程可能长期 pin) 有界(O(线程数 * HP数)) 无界(类似 EBR)
实现复杂度 中等 高(内核级)/ 低(用户级 QSBR)
适合的工作负载 读多写少,短临界区 需要确定性回收 读极多写极少
ABA 保护 是(延迟回收) 是(延迟回收) 是(延迟回收)
线程可以阻塞吗 可以但会阻碍回收 可以,不影响其他线程 不建议(会延长 grace period)
标准实现 crossbeam-epoch (Rust) haphazard (Rust), folly (C++) Linux Kernel, URCU
每线程额外空间 O(待回收节点数) O(HP 槽位数) O(待回收节点数)
是否需要线程注册 是(QSBR)/ 否(通用 RCU)
可组合性 好(多个数据结构可共享) 差(HP 数量需预知) 好(内核 RCU 全局共享)

9.1 性能特征的直觉解释

EBR 的读侧性能接近 RCU,因为 pin 操作只需要一个 fence 加一次原子写,之后在整个临界区内读取共享指针没有额外开销。

Hazard Pointers 的优势在于确定性。每个被保护的指针都是显式公布的,即使某个线程挂起,也只会保护它声明的那几个特定指针。

RCU 的读侧开销为零(在内核中仅需禁用抢占),但写侧的 synchronize_rcu 可能需要等待一个完整的 grace period。

9.2 选择指南

                    需要确定性回收?
                    /            \
                  是              否
                  |               |
             Hazard Pointers    读操作频率?
                                /          \
                            极高            一般
                              |               |
                            RCU             EBR

在实践中,EBR 因其实现简单和良好的整体性能,成为了最常用的选择。Crossbeam 的成功很大程度上验证了这一点。

十、边缘情况与工程陷阱

10.1 长时间 Pin 的问题

EBR 最大的隐患是”长寿 pin”(long-lived pin)。如果一个线程长时间保持 pin 状态,全局纪元将无法推进,所有线程退休的垃圾都会积累在内存中。

// 危险!不要这样做
let guard = epoch::pin();

// 长时间阻塞操作
std::thread::sleep(Duration::from_secs(60));

// 在这 60 秒内,全局纪元无法推进
// 所有线程的垃圾都在积累
drop(guard);

缓解策略

  1. 保持 pin 时间尽量短:只在真正需要访问共享数据时 pin;
  2. 周期性 unpin/re-pin:在长操作中主动插入 unpin 点;
  3. 设置垃圾阈值:当本地垃圾超过阈值时强制尝试推进。

10.2 停滞线程(Stalled Threads)

比长寿 pin 更糟糕的情况是线程被操作系统调度器长时间挂起,或者陷入了无限循环:

ebr_pin(&ebr, tid);
/* 线程被 OS 挂起,或者 */
while (some_lock_free_cas_keeps_failing) {
    /* CAS 重试循环——pin 期间不断重试 */
}
ebr_unpin(&ebr, tid);

在这种情况下,全局纪元被完全”冻结”,内存使用量单调增长,最终可能导致 OOM。

Crossbeam 的实际应对:Crossbeam 在 try_advance 中不会阻塞等待落后的线程,而是直接放弃推进。垃圾会在后续的 unpin 中被回收。如果问题持续存在,内存占用将成为应用层需要关注的问题。

10.3 工程陷阱速查表

陷阱 症状 根因 解决方案
在 Guard 之外使用 Shared 指针 use-after-free, 段错误 指针引用超出了 pin 的保护范围 利用 Rust 生命周期约束;C 中需人工审查
忘记 unpin 内存持续增长 全局纪元无法推进 RAII 模式(Rust Guard 自动 drop);C 中使用 cleanup 属性
Pin 期间执行阻塞 IO 垃圾积累,内存暴涨 长时间占据纪元槽位 在 IO 前 unpin,IO 后重新 pin
跨线程传递 Guard 未定义行为 破坏线程本地纪元追踪 Guard 不实现 Send(Rust 编译期阻止)
过于频繁地 pin/unpin 性能下降 每次 pin 都有 fence 开销 批量操作,减少 pin/unpin 频率
回收函数中访问已回收内存 double-free, 数据损坏 回收函数的闭包捕获了共享引用 确保回收函数自包含
混用不同 Collector 的 Guard 未定义行为 纪元追踪域错乱 每个数据结构使用固定的 Collector
线程退出未注销 幽灵线程阻碍纪元推进 线程记录中 active=true 残留 注册时使用 TLS 析构函数自动注销
retire 在 unpin 之后调用 节点被过早回收 retire 记录的纪元可能已经被推进 始终在 pin 保护下 retire

十一、集成实战:基于 EBR 的 Treiber 栈

下面展示如何将 EBR 与经典的 Treiber 栈结合使用。首先是 Rust 版本(使用 Crossbeam),然后是 C 版本(使用前面的 EBR 实现)。

11.1 Rust 版本

use crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Shared};
use std::sync::atomic::Ordering;
use std::ptr;

pub struct TreiberStack<T> {
    head: Atomic<Node<T>>,
}

struct Node<T> {
    data: T,
    next: Atomic<Node<T>>,
}

impl<T> TreiberStack<T> {
    pub fn new() -> Self {
        TreiberStack {
            head: Atomic::null(),
        }
    }

    /// 压栈操作
    pub fn push(&self, data: T) {
        let node = Owned::new(Node {
            data,
            next: Atomic::null(),
        });

        let guard = epoch::pin();

        loop {
            let head = self.head.load(Ordering::Acquire, &guard);
            node.next.store(head, Ordering::Relaxed);

            match self.head.compare_exchange(
                head,
                node,
                Ordering::Release,
                Ordering::Relaxed,
                &guard,
            ) {
                Ok(_) => break,
                Err(e) => {
                    // CAS 失败,node 通过 e.new 返回
                    // 编译器帮助我们避免了内存泄漏
                    node = e.new;
                }
            }
        }
    }

    /// 弹栈操作
    pub fn pop(&self) -> Option<T> {
        let guard = epoch::pin();

        loop {
            let head = self.head.load(Ordering::Acquire, &guard);

            let head_ref = unsafe {
                match head.as_ref() {
                    Some(r) => r,
                    None => return None,  // 栈空
                }
            };

            let next = head_ref.next.load(Ordering::Acquire, &guard);

            if self.head.compare_exchange(
                head,
                next,
                Ordering::Release,
                Ordering::Relaxed,
                &guard,
            ).is_ok() {
                // CAS 成功,读取数据并延迟释放节点
                let data = unsafe { ptr::read(&head_ref.data) };
                unsafe {
                    guard.defer_destroy(head);
                }
                return Some(data);
            }
            // CAS 失败,重试
        }
    }
}

impl<T> Drop for TreiberStack<T> {
    fn drop(&mut self) {
        while self.pop().is_some() {}
    }
}

11.2 C 版本

/* treiber_stack.c - 基于 EBR 的 Treiber 栈 */
#include "epoch_reclaim.h"
#include <stdatomic.h>
#include <stdio.h>
#include <pthread.h>

/* ===== 栈节点 ===== */
typedef struct stack_node {
    int                     value;
    _Atomic(struct stack_node *) next;
} stack_node_t;

/* ===== Treiber 栈 ===== */
typedef struct {
    _Atomic(stack_node_t *) top;
    ebr_t                   ebr;
} treiber_stack_t;

static void free_stack_node(void *ptr) {
    free(ptr);
}

void stack_init(treiber_stack_t *s) {
    atomic_store(&s->top, NULL);
    ebr_init(&s->ebr);
}

void stack_push(treiber_stack_t *s, int tid, int value) {
    stack_node_t *node = (stack_node_t *)malloc(sizeof(stack_node_t));
    node->value = value;

    ebr_pin(&s->ebr, tid);

    stack_node_t *old_top;
    do {
        old_top = atomic_load_explicit(&s->top, memory_order_acquire);
        atomic_store_explicit(&node->next, old_top, memory_order_relaxed);
    } while (!atomic_compare_exchange_weak_explicit(
        &s->top, &old_top, node,
        memory_order_release, memory_order_relaxed));

    ebr_unpin(&s->ebr, tid);
}

bool stack_pop(treiber_stack_t *s, int tid, int *out) {
    ebr_pin(&s->ebr, tid);

    stack_node_t *old_top;
    stack_node_t *new_top;

    do {
        old_top = atomic_load_explicit(&s->top, memory_order_acquire);
        if (old_top == NULL) {
            ebr_unpin(&s->ebr, tid);
            return false;
        }
        new_top = atomic_load_explicit(&old_top->next,
                                        memory_order_relaxed);
    } while (!atomic_compare_exchange_weak_explicit(
        &s->top, &old_top, new_top,
        memory_order_release, memory_order_relaxed));

    *out = old_top->value;

    /* 延迟释放:将旧栈顶节点放入当前纪元的垃圾袋 */
    ebr_retire(&s->ebr, tid, old_top, free_stack_node);

    ebr_unpin(&s->ebr, tid);
    return true;
}

/* ===== 测试 ===== */
#define NUM_THREADS 4
#define OPS_PER_THREAD 10000

treiber_stack_t g_stack;
_Atomic(int) g_push_sum = 0;
_Atomic(int) g_pop_sum = 0;

void *push_worker(void *arg) {
    int tid = ebr_register(&g_stack.ebr);
    int local_sum = 0;

    for (int i = 1; i <= OPS_PER_THREAD; i++) {
        stack_push(&g_stack, tid, i);
        local_sum += i;
    }

    atomic_fetch_add(&g_push_sum, local_sum);
    ebr_unregister(&g_stack.ebr, tid);
    return NULL;
}

void *pop_worker(void *arg) {
    int tid = ebr_register(&g_stack.ebr);
    int local_sum = 0;
    int value;
    int empty_count = 0;

    while (empty_count < 1000) {
        if (stack_pop(&g_stack, tid, &value)) {
            local_sum += value;
            empty_count = 0;
        } else {
            empty_count++;
            sched_yield();
        }
    }

    atomic_fetch_add(&g_pop_sum, local_sum);
    ebr_unregister(&g_stack.ebr, tid);
    return NULL;
}

int main(void) {
    stack_init(&g_stack);

    pthread_t pushers[NUM_THREADS], poppers[NUM_THREADS];

    for (int i = 0; i < NUM_THREADS; i++)
        pthread_create(&pushers[i], NULL, push_worker, NULL);
    for (int i = 0; i < NUM_THREADS; i++)
        pthread_create(&poppers[i], NULL, pop_worker, NULL);
    for (int i = 0; i < NUM_THREADS; i++)
        pthread_join(pushers[i], NULL);
    for (int i = 0; i < NUM_THREADS; i++)
        pthread_join(poppers[i], NULL);

    /* 弹出剩余元素并验证 */
    int tid = ebr_register(&g_stack.ebr);
    int value;
    while (stack_pop(&g_stack, tid, &value))
        atomic_fetch_add(&g_pop_sum, value);
    ebr_unregister(&g_stack.ebr, tid);
    ebr_drain(&g_stack.ebr);

    printf("pushed=%d popped=%d match=%s\n",
           atomic_load(&g_push_sum), atomic_load(&g_pop_sum),
           atomic_load(&g_push_sum) == atomic_load(&g_pop_sum) ? "YES" : "NO");
    return 0;
}

注意:push 在 CAS 成功前保持 pin,确保 old_top 有效;pop 中读取 old_top->next 必须在 pin 保护下;retirepop 成功后将弹出节点放入当前纪元的垃圾袋。

十二、个人思考与总结

12.1 EBR 的哲学

EBR 的设计哲学体现了一种”乐观的集体主义”:它假设所有线程都会在合理的时间内通过静默状态,然后利用这个集体行为来推断安全性。这与 Hazard Pointers 的”个人主义”(每个线程精确声明自己保护的指针)形成了有趣的对比。

这种设计选择反映了一个工程权衡:在常见情况下追求最大性能,代价是在极端情况下失去确定性。 对于大多数应用来说,这是一个合理的取舍——临界区通常很短,线程通常不会被长时间挂起。

12.2 Crossbeam 的工程智慧

Crossbeam 的实现展示了几个值得学习的工程决策:分层垃圾收集(线程本地 Bag 加全局 Garbage Queue 的两级设计)减少了竞争;位编码纪元将 pinned 标志和纪元值合并到一个原子变量,减少原子操作次数;类型驱动的安全性通过 Shared<'g, T> 的生命周期和 Guard 的不可发送性,将安全约束编码到类型系统;周期性推进使用计数器控制推进频率,减少不必要的扫描开销。

12.3 未来的方向

EBR 的研究仍在继续。一些值得关注的改进方向包括:Hyaline(2020),通过引用计数辅助的方式消除了对全局纪元的依赖,解决了线程停滞问题;NBR(Neutralization-Based Reclamation, 2021),允许系统”中和”停滞线程的操作,从根本上解决了长寿 pin 的问题;VBR(Version-Based Reclamation),为每个对象维护版本号,结合了 Hazard Pointers 的精确性和 EBR 的低开销。

这些工作都在试图回答同一个问题:能否在保持 EBR 读侧性能的同时,获得 Hazard Pointers 的确定性? 这依然是并发内存回收领域最核心的开放问题之一。

12.4 写在最后

内存回收看似是一个底层的技术细节,但它实际上触及了并发编程中最深层的问题:在没有全局同步的情况下,如何安全地管理共享资源的生命周期?

EBR 给出的答案是优雅的——用三个递增的纪元和一组简单的规则,在不需要精确跟踪每个引用的情况下,为整个系统建立了安全的回收边界。它不是完美的(没有哪个方案是完美的),但它在简洁性、性能和实用性之间找到了一个令人满意的平衡点。

当你下次在 Rust 中使用 crossbeam::epoch::pin() 时,不妨想一想:这短短一行代码的背后,是二十年来对并发内存安全不懈探索的结晶。


上一篇: Hazard Pointers 下一篇: RCU:Linux 内核的读侧零开销并发

相关阅读: - 无锁队列 - 无锁栈


By .