土法炼钢兴趣小组的算法知识备份

RCU:Linux 内核的读侧零开销并发

目录

一、从一个真实的性能问题说起

2002 年,Linux 内核的路由表查找是一个令人头疼的瓶颈。每一次网络包到达时,内核都需要查找路由表来决定下一跳。在一台繁忙的路由器上,每秒可能有数百万次查找,而路由表的更新——增加一条路由、删除一条路由——一天可能只发生几十次。

当时的解决方案是 rwlock(读写锁)。读者之间不互斥,但每个读者在进入临界区时仍然需要原子地修改锁的计数器。在一台 4 核的机器上,这不是问题。但当 CPU 数量增长到 64、128 甚至更多时,这个原子操作引发了灾难性的 cache line bouncing——每个 CPU 核心都在争抢同一条 cache line,读者之间虽然”不互斥”,实际上却在硬件层面互相拖累。

读写锁的承诺是”读者并行”,但硬件告诉我们,任何共享的可写状态都是并行的敌人。

这就是 RCU(Read-Copy-Update)诞生的背景。它的核心思想可以用一句话概括:

读者完全不修改任何共享状态,因此读侧的开销严格为零。

不是”很小”,不是”接近零”,是在 non-preemptible 内核配置下,rcu_read_lock() 编译后只产生一条编译器屏障指令(compiler barrier),没有任何原子操作、没有任何内存屏障、没有任何分支。零条额外的机器指令。

这听起来像是天方夜谭。读者什么都不做,那写者怎么知道什么时候可以安全地释放旧数据?这正是 RCU 最精妙的地方。

二、RCU 的核心概念

读者永远不阻塞

传统的并发控制——无论是互斥锁、读写锁还是无锁算法中的 CAS 循环——都要求读者以某种方式”注册”自己的存在。RCU 打破了这个范式:读者直接读取数据,不做任何注册。

这怎么可能安全?关键在于 RCU 将”保护数据”这个目标拆分成了两个独立的问题:

  1. 保证读者看到一致的数据:通过 rcu_assign_pointer()rcu_dereference() 的配合,使用内存屏障确保读者要么看到完整的旧版本,要么看到完整的新版本,永远不会看到半新半旧的状态。

  2. 保证旧数据在所有读者结束前不被释放:通过 Grace Period(宽限期)机制延迟释放,确保没有任何读者还在引用旧数据时才真正回收内存。

写者的三步协议

RCU 的写入遵循一个固定的三步协议:

1. Copy   —— 复制一份旧数据的副本
2. Update  —— 在副本上做修改
3. Publish —— 用原子操作将指针从旧版本切换到新版本
   (然后等待所有持有旧版本引用的读者结束,最后释放旧版本)

这个过程中,读者可以在任意时刻进入和退出。正在读旧版本的读者继续读旧版本直到结束,新进入的读者看到新版本。没有锁、没有等待、没有重试。

一个具体的例子

假设我们有一个全局指针 gp 指向一个结构体:

struct foo {
    int a;
    int b;
    int c;
};

struct foo *gp = NULL;

/* 读者 */
void reader(void)
{
    struct foo *p;

    rcu_read_lock();
    p = rcu_dereference(gp);
    if (p != NULL) {
        do_something(p->a, p->b, p->c);
    }
    rcu_read_unlock();
}

/* 写者 */
void writer(int new_a, int new_b, int new_c)
{
    struct foo *old_fp, *new_fp;

    new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
    new_fp->a = new_a;
    new_fp->b = new_b;
    new_fp->c = new_c;

    old_fp = gp;
    rcu_assign_pointer(gp, new_fp);

    synchronize_rcu();  /* 等待所有读者结束 */
    kfree(old_fp);
}

rcu_assign_pointer() 在赋值前插入一个写屏障(write barrier),确保新结构体的所有字段在指针发布之前已经对其他 CPU 可见。rcu_dereference() 在读取指针后插入一个数据依赖屏障,确保后续通过该指针的访问不会被 CPU 或编译器重排到指针读取之前。

三、Grace Period:RCU 的灵魂

Grace Period(宽限期)是 RCU 最关键的概念。它回答了这个问题:写者怎么知道所有读者都已经结束了?

静止状态(Quiescent State)

RCU 不跟踪每个读者的进入和退出。相反,它依赖一个更粗粒度的观察:如果一个 CPU 经历了一次上下文切换(context switch),那么它之前的所有 RCU 读侧临界区一定已经结束了。

这是因为 rcu_read_lock() 在经典 RCU 实现中只是禁止抢占(preemption),而上下文切换意味着抢占已经重新启用,临界区已经结束。

一个 CPU 经历上下文切换的状态称为”静止状态”(Quiescent State,QS)。当所有 CPU 都至少经历了一次静止状态后,一个 Grace Period 就完成了。

下面这张图展示了 Grace Period 的时间线:

RCU Grace Period Timeline

Grace Period 的关键性质

Grace Period 有一个重要的性质需要理解:

在 Grace Period 开始之前已经开始的 RCU 读侧临界区,
必须在 Grace Period 结束之前完成。

但是,在 Grace Period 开始之后才开始的 RCU 读侧临界区,
不在本次 Grace Period 的等待范围内。

这意味着 synchronize_rcu() 不需要等待”所有读者”,只需要等待”在调用 synchronize_rcu() 之前就已经开始的读者”。新的读者看到的是新版本的数据,它们引用新版本,与旧版本的释放无关。

为什么上下文切换是可靠的信号

在 non-preemptible RCU(经典 RCU)中:

这意味着: - 在 RCU 读侧临界区内,当前线程不会被切换出去 - 如果一个 CPU 发生了上下文切换,它一定不在任何 RCU 读侧临界区内 - 因此,上下文切换是一个完美的”静止状态”指示器

其他静止状态还包括: - CPU 进入 idle 状态 - CPU 执行用户态代码 - CPU 执行离线操作

/*
 * 经典 RCU 的 Grace Period 检测(简化版)
 * 每个 CPU 维护一个计数器,每次经过静止状态时递增
 */
struct rcu_data {
    unsigned long completed;     /* 已完成的 GP 编号 */
    unsigned long gpnum;         /* 当前 GP 编号 */
    bool passed_quiesce;         /* 本 GP 是否经过了 QS */
};

DEFINE_PER_CPU(struct rcu_data, rcu_data);

static void rcu_check_quiescent_state(void)
{
    struct rcu_data *rdp = this_cpu_ptr(&rcu_data);

    if (rdp->gpnum != rdp->completed && !rdp->passed_quiesce) {
        /* 当前 CPU 还没有为本 GP 报告静止状态 */
        /* 检查是否经过了上下文切换、idle 等 */
        if (cpu_has_passed_quiescent_state(rdp)) {
            rdp->passed_quiesce = true;
            report_quiescent_state(rdp);
        }
    }
}

四、RCU API 详解

Linux 内核的 RCU API 可以分为五个核心原语。理解它们的语义和实现是掌握 RCU 的关键。

rcu_read_lock() 和 rcu_read_unlock()

/* include/linux/rcupdate.h */

static inline void rcu_read_lock(void)
{
    __rcu_read_lock();
    __acquire(RCU);              /* sparse 静态检查标注 */
    rcu_lock_acquire(&rcu_lock_map);  /* lockdep 检查 */
    rcu_read_lock_held_common(); /* debug 检查 */
}

/* 在 non-preemptible RCU 中,__rcu_read_lock 的实现 */
static inline void __rcu_read_lock(void)
{
    preempt_disable();
}

static inline void __rcu_read_unlock(void)
{
    preempt_enable();
}

在发布版本(CONFIG_PREEMPT_NONECONFIG_PREEMPT_VOLUNTARY)中,这些函数编译后只剩下一条编译器屏障:

/* 编译器屏障确保编译器不会将临界区内的访问移到临界区外 */
#define barrier() __asm__ __volatile__("": : :"memory")

没有原子操作。没有内存屏障。没有分支。读侧开销为零。

rcu_dereference()

/*
 * rcu_dereference - 在 RCU 读侧临界区中安全解引用 RCU 保护的指针
 *
 * 关键作用:
 * 1. 防止编译器将指针解引用提前到指针读取之前
 * 2. 在需要的架构上插入数据依赖屏障
 * 3. 为 sparse 和 lockdep 提供检查点
 */
#define rcu_dereference(p) \
    ({ \
        typeof(p) _________p1 = READ_ONCE(p); \
        rcu_check_sparse(p, __rcu); \
        rcu_dereference_check(p, __CHECKER__); \
        smp_read_barrier_depends(); \
        (_________p1); \
    })

READ_ONCE() 防止编译器缓存指针值或做推测优化。smp_read_barrier_depends() 在大多数架构上是空操作(no-op),因为 x86、ARM 等架构天然保证数据依赖顺序。只有 Alpha 架构需要一条真正的屏障指令,因为 Alpha 有极其激进的乱序执行,可以打破数据依赖。

rcu_assign_pointer()

/*
 * rcu_assign_pointer - 发布一个新的 RCU 保护的指针
 *
 * 确保在指针赋值之前,所有对新结构体字段的写入
 * 对其他 CPU 可见。
 */
#define rcu_assign_pointer(p, v) \
    do { \
        smp_store_release(&(p), (v)); \
    } while (0)

smp_store_release() 包含一个写屏障,确保所有在此之前的写操作(比如初始化新结构体的字段)在指针赋值对其他 CPU 可见之前完成。这与 rcu_dereference() 中的读屏障配对,共同保证了 publish-subscribe 的正确性。

synchronize_rcu()

/*
 * synchronize_rcu - 等待一个完整的 Grace Period
 *
 * 阻塞调用者直到所有在调用之前已经开始的 RCU 读侧临界区结束。
 * 这是最简单但也最慢的 RCU 回收方式。
 */
void synchronize_rcu(void)
{
    RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) ||
                     lock_is_held(&rcu_lock_map) ||
                     lock_is_held(&rcu_sched_lock_map),
                     "Illegal synchronize_rcu() in RCU read-side critical section");

    if (rcu_blocking_is_gp())
        return;  /* 单 CPU 系统,禁用抢占即等价于 GP */

    /* 注册一个回调并等待它被调用 */
    wait_rcu_gp(call_rcu);
}

synchronize_rcu() 是同步的——它会阻塞当前线程。如果在中断上下文或不能睡眠的地方,必须使用异步版本 call_rcu()

call_rcu()

/*
 * call_rcu - 注册一个在 Grace Period 结束后调用的回调
 *
 * 不阻塞调用者。回调函数在一个 Grace Period 后
 * 被 RCU 的 softirq 处理器调用。
 */
void call_rcu(struct rcu_head *head, rcu_callback_t func)
{
    __call_rcu(head, func, &rcu_state);
}

/* 使用示例 */
struct my_data {
    struct rcu_head rcu;
    int value;
    char name[64];
};

static void my_data_free(struct rcu_head *head)
{
    struct my_data *p = container_of(head, struct my_data, rcu);
    kfree(p);
}

void update_data(struct my_data *new_data)
{
    struct my_data *old;

    old = rcu_dereference_protected(global_data,
                                     lockdep_is_held(&my_mutex));
    rcu_assign_pointer(global_data, new_data);

    /* 异步释放旧数据,不阻塞 */
    call_rcu(&old->rcu, my_data_free);
}

call_rcu() 的优势在于它不会阻塞调用者,适合在持有自旋锁或原子上下文中使用。代价是每个待释放的对象需要嵌入一个 struct rcu_head(16 字节)。

API 使用规则总结

+-------------------------+--------------------------------------------+
| API                     | 语义                                       |
+-------------------------+--------------------------------------------+
| rcu_read_lock()         | 标记 RCU 读侧临界区开始(禁止抢占)       |
| rcu_read_unlock()       | 标记 RCU 读侧临界区结束(允许抢占)       |
| rcu_dereference(ptr)    | 在读侧安全地获取 RCU 保护的指针           |
| rcu_assign_pointer(p,v) | 发布新版本(含写屏障)                     |
| synchronize_rcu()       | 同步等待 Grace Period 完成                 |
| call_rcu(head, func)    | 异步在 Grace Period 后调用回调             |
| rcu_barrier()           | 等待所有已注册的 call_rcu 回调完成         |
+-------------------------+--------------------------------------------+

五、实现演进:从 Classic RCU 到 Tree RCU

Classic RCU 的瓶颈

最初的 RCU 实现(Classic RCU)使用一个全局的位图来跟踪哪些 CPU 已经通过了静止状态:

/* Classic RCU 的简化模型 */
struct rcu_ctrlblk {
    unsigned long cpumask;   /* 还没有通过 QS 的 CPU 集合 */
    unsigned long completed; /* 已完成的 GP 编号 */
    spinlock_t lock;         /* 保护全局状态的锁 */
};

当一个 CPU 报告静止状态时,它需要获取全局锁,清除位图中自己的位,然后释放锁。在 CPU 数量较少时这没有问题,但当系统扩展到数百个 CPU 时,这个全局锁成为了瓶颈——所有 CPU 都在争抢同一个锁来报告自己的静止状态。

这是一个讽刺的情况:RCU 的设计初衷是消除读者的锁竞争,但它自身的实现却引入了一个新的锁竞争点。

Tree RCU 的层次化设计

Paul McKenney(RCU 的主要维护者和设计者)在 2008 年引入了 Tree RCU,用树状结构替代了扁平的位图:

                    +------------------+
                    |   rcu_state      |
                    |   (root node)    |
                    +--------+---------+
                             |
              +--------------+--------------+
              |                             |
     +--------+--------+          +--------+--------+
     |   rcu_node[0]   |          |   rcu_node[1]   |
     | (CPUs 0-63)     |          | (CPUs 64-127)   |
     +--------+--------+          +--------+--------+
              |                             |
     +--------+--------+          +--------+--------+
     |                  |          |                  |
  +--+---+  +---+---+  ...     +--+---+  +---+---+
  |node  |  |node   |          |node  |  |node   |
  |0-15  |  |16-31  |          |64-79 |  |80-95  |
  +--+---+  +---+---+          +--+---+  +---+---+
     |          |                  |          |
  CPUs 0-15  CPUs 16-31        CPUs 64-79  CPUs 80-95

每个叶子节点(rcu_node)负责一组 CPU(默认 16 个)。当一个 CPU 报告静止状态时,它只需要获取自己所属叶子节点的锁。只有当一个叶子节点的所有 CPU 都通过了静止状态时,才需要向上一层汇报。

/* Tree RCU 的核心数据结构 */
struct rcu_node {
    raw_spinlock_t lock;
    unsigned long qsmask;    /* 还没有通过 QS 的 CPU/子节点 */
    unsigned long qsmaskinit;/* GP 开始时的初始掩码 */
    unsigned long gpnum;     /* 当前 GP 编号 */
    unsigned long completed; /* 已完成的 GP 编号 */
    struct rcu_node *parent; /* 父节点 */
    int grplo;               /* 负责的最小 CPU 编号 */
    int grphi;               /* 负责的最大 CPU 编号 */
    int level;               /* 树的层级 */
};

struct rcu_state {
    struct rcu_node node[NUM_RCU_NODES]; /* 所有节点 */
    struct rcu_node *level[RCU_NUM_LVLS];/* 每层的起始节点 */
    struct rcu_data __percpu *rda;       /* per-CPU 数据 */
    unsigned long gpnum;                 /* 全局 GP 编号 */
    unsigned long completed;             /* 已完成 GP 编号 */
    struct task_struct *gp_kthread;      /* GP 内核线程 */
};

扩展性分析

假设系统有 4096 个 CPU,每个 rcu_node 管理 64 个子节点:

当一个 CPU 报告静止状态时: - 在 Classic RCU 中:竞争全局锁,4096 个 CPU 争抢 1 个锁 - 在 Tree RCU 中:竞争叶子节点的锁,最多 64 个 CPU 争抢 1 个锁

锁竞争从 O(N) 降低到了 O(log N)。这使得 RCU 能够扩展到拥有数千个 CPU 的系统。

Grace Period 内核线程

Tree RCU 使用专门的内核线程 rcu_gp_kthread 管理 Grace Period 生命周期。它的主循环为:等待 GP 请求 -> 初始化 GP -> 等待所有 CPU 通过静止状态(rcu_gp_fqs_loop) -> 清理并推进回调队列 -> 唤醒 RCU softirq 处理到期回调。

六、内存序与屏障:为什么细节如此重要

RCU 的正确性依赖于精确的内存排序保证。这是整个机制中最微妙、最容易出错的部分。

发布-订阅模式

RCU 的核心是一个发布-订阅(publish-subscribe)模式:

/* 发布者(写者) */
struct foo *new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
new_fp->a = 1;           /* 初始化字段 A */
new_fp->b = 2;           /* 初始化字段 B */
new_fp->c = 3;           /* 初始化字段 C */
rcu_assign_pointer(gp, new_fp);  /* 发布 */
                /* ^^^ 包含 smp_store_release() */
                /*     保证上面三条写入在指针赋值之前完成 */

/* 订阅者(读者) */
rcu_read_lock();
struct foo *p = rcu_dereference(gp);  /* 订阅 */
              /* ^^^ 包含 READ_ONCE() + 数据依赖屏障 */
              /*     保证通过 p 的后续访问不会被重排到此之前 */
if (p) {
    do_something(p->a, p->b, p->c);
    /* 保证看到的是完全初始化后的 a, b, c */
}
rcu_read_unlock();

如果没有这些屏障,以下灾难性的场景在弱内存序架构上是可能的:

  1. 编译器可能将 new_fp->a = 1 重排到 rcu_assign_pointer() 之后
  2. CPU 可能让其他 CPU 先看到指针赋值,再看到字段初始化
  3. 读者可能读到一个非 NULL 的指针,但通过它访问到的字段还是旧值或未初始化的值

Alpha 架构的特殊性

在几乎所有现代架构上,如果你通过指针 p 访问 p->field,硬件会保证你读到的 p->field 的值不会比 p 更旧。这叫做”数据依赖保证”(data dependency ordering)。

但 Alpha 是个例外。Alpha 的分区缓存(split cache)设计允许以下情况发生:

CPU 0 (writer)                    CPU 1 (reader on Alpha)
----                              ----
new_fp->a = 1;
smp_wmb();                        /* 即使有写屏障... */
gp = new_fp;
                                  p = gp;        /* 读到 new_fp */
                                  tmp = p->a;    /* 可能读到旧值! */

这就是为什么 rcu_dereference() 中需要 smp_read_barrier_depends()——在 Alpha 上它是一条真正的内存屏障 mb(),在其他所有架构上它是空操作。

从内核 4.15 开始,smp_read_barrier_depends() 已经逐步被 READ_ONCE() 的语义吸收。READ_ONCE() 在所有架构上都保证正确的数据依赖顺序,包括 Alpha。这简化了 RCU 的实现。

READ_ONCE 的重要性

即使在 x86 这样的强内存序架构上,READ_ONCE() 仍然是必需的,但原因不同——它防止的是编译器优化:

/* 没有 READ_ONCE 时可能发生的编译器优化灾难 */

/* 原始代码 */
p = gp;
if (p != NULL) {
    do_something(p->a);
}

/* 编译器可能优化为(在 gp 未标记 volatile 时) */
if (gp != NULL) {
    do_something(gp->a);  /* 重新读取 gp!可能已经被修改了! */
}

/* 更糟糕的情况:编译器做值预测优化 */
tmp = gp->a;  /* 先投机读取 */
p = gp;
if (p != NULL) {
    do_something(tmp);
}

READ_ONCE() 告诉编译器:“这个值可能随时被其他线程修改,不要缓存它,不要推测它,每次都真正读一次。”

#define READ_ONCE(x) \
    ({ \
        union { typeof(x) __val; char __c[1]; } __u = \
            { .__c = { 0 } }; \
        __read_once_size(&(x), __u.__c, sizeof(x)); \
        smp_read_barrier_depends(); \
        __u.__val; \
    })

七、RCU 在内核中的典型应用

RCU 是 Linux 内核中使用最广泛的同步机制之一。截至内核 6.x,RCU 的使用遍布数百个子系统。以下是几个最经典的应用场景。

路由表查找(FIB)

这是 RCU 的”杀手级应用”,也是 RCU 被引入内核的直接原因:

/* net/ipv4/fib_trie.c */

static int fn_trie_lookup(struct fib_table *tb, const struct flowi4 *flp,
                           struct fib_result *res)
{
    struct trie *t = (struct trie *)tb->tb_data;
    struct key_vector *n, *pn;
    t_key key = ntohl(flp->daddr);

    rcu_read_lock();  /* 进入 RCU 读侧临界区 */

    n = rcu_dereference(t->trie);
    if (!n)
        goto out;

    /* 在 trie 中查找最长前缀匹配 */
    pn = n;
    while (IS_TNODE(n)) {
        /* 遍历 trie 节点——完全无锁 */
        n = rcu_dereference(n->child[get_index(key, n)]);
        if (!n)
            break;
    }

    /* 在叶子节点中查找匹配的路由 */
    if (IS_LEAF(n)) {
        /* ... 查找逻辑 ... */
    }

out:
    rcu_read_unlock();  /* 退出 RCU 读侧临界区 */
    return ret;
}

在一台现代服务器上,路由表查找每秒可能发生数千万次。如果每次查找都需要获取一个读写锁的读锁,仅锁操作本身就会消耗大量 CPU 周期并导致 cache line 颠簸。使用 RCU 后,读侧开销降为零,查找性能完全取决于数据结构本身的效率。

内核模块卸载

当卸载一个内核模块时,需要确保没有任何 CPU 正在执行该模块的代码:

/* kernel/module.c(简化) */

static void free_module(struct module *mod)
{
    /* 从模块列表中移除 */
    mutex_lock(&module_mutex);
    list_del_rcu(&mod->list);
    mutex_unlock(&module_mutex);

    /* 等待所有可能在执行模块代码的 CPU */
    synchronize_rcu();

    /* 现在可以安全地释放模块内存了 */
    module_memfree(mod->core_layout.base);
    module_memfree(mod->init_layout.base);
}

SELinux 策略更新

SELinux 的安全策略在每次系统调用时都需要检查,但策略更新非常罕见:

/* security/selinux/ss/services.c(简化) */

static int security_compute_av(/* ... */)
{
    struct selinux_policy *policy;

    rcu_read_lock();
    policy = rcu_dereference(selinux_state.policy);

    /* 在 RCU 保护下查询策略 */
    rc = context_struct_compute_av(policy, /* ... */);

    rcu_read_unlock();
    return rc;
}

/* 策略更新 */
void selinux_policy_commit(struct selinux_policy *newpolicy)
{
    struct selinux_policy *oldpolicy;

    oldpolicy = rcu_dereference_protected(selinux_state.policy,
                                           lockdep_is_held(&state->policy_mutex));
    rcu_assign_pointer(selinux_state.policy, newpolicy);
    synchronize_rcu();
    selinux_policy_free(oldpolicy);
}

文件系统 dentry 缓存

目录项缓存是 VFS 层最频繁访问的数据结构之一。d_lookup() 在 RCU 读侧临界区中遍历 hash bucket 链表,通过 hlist_bl_for_each_entry_rcu 无锁查找匹配的 dentry,找到后用 lockref_get_not_dead() 安全地获取引用。整个路径解析过程完全无锁。


### 使用场景总结
子系统 读操作频率 写操作频率
路由表(FIB) 模块列表 SELinux 策略 dentry 缓存 PID 查找 网络命名空间 每秒数千万次 每次 kallsyms 查找 每次系统调用 每次路径解析 信号发送、procfs 每个网络操作 每天几十次 模块加载/卸载时 策略变更时(极罕见) 文件创建/删除时 进程创建/销毁时 命名空间创建/销毁时

## 八、用户态 RCU:liburcu

RCU 的思想不限于内核。liburcu(Userspace RCU)将 RCU 带到了用户态,但面临一些内核中不存在的挑战:

1. 用户态线程可以在任意时刻被调度器抢占,无法像内核那样简单地禁止抢占
2. 用户态无法直接检测上下文切换
3. 用户态没有 per-CPU 数据结构的天然支持

liburcu 提供了多种 RCU 变体来应对这些挑战:

### QSBR(Quiescent-State-Based Reclamation)

QSBR 要求读者显式地报告静止状态。这是性能最好的用户态 RCU 变体,读侧开销为零,但侵入性最强——应用程序必须在每个合适的时机调用 `rcu_quiescent_state()`。

```c
#include <urcu/urcu-qsbr.h>

/* 读者线程 */
void reader_thread(void *arg)
{
    rcu_register_thread();

    while (!should_stop) {
        rcu_read_lock();   /* QSBR 中这是空操作 */

        struct data *p = rcu_dereference(shared_ptr);
        if (p) {
            process(p);
        }

        rcu_read_unlock(); /* QSBR 中这也是空操作 */

        /* 必须定期调用,否则 Grace Period 无法推进 */
        rcu_quiescent_state();
    }

    rcu_unregister_thread();
}

membarrier-based RCU

利用 Linux 的 membarrier() 系统调用,在写侧通过向所有 CPU 广播内存屏障来代替读侧的屏障。读侧只需要编译器屏障:

#include <urcu/urcu-memb.h>

/* 读者 */
rcu_read_lock();   /* 只是一个编译器屏障 + 计数器递增 */
/* ... 读取 RCU 保护的数据 ... */
rcu_read_unlock(); /* 编译器屏障 + 计数器递增 */

/* 写者调用 synchronize_rcu() 时 */
/* 内部使用 membarrier(MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0) */
/* 这会让内核在所有 CPU 上执行一次内存屏障 */

signal-based RCU

在没有 membarrier() 的旧内核上,使用 SIGUSR1 信号来中断所有读者线程,迫使它们执行内存屏障:

#include <urcu/urcu-signal.h>

/* 读者 */
rcu_read_lock();   /* 编译器屏障 + 计数器操作 */
/* ... */
rcu_read_unlock();

/* 写者调用 synchronize_rcu() 时 */
/* 向所有注册的读者线程发送 SIGUSR1 */
/* 信号处理器中执行内存屏障 */

变体对比

+-------------------+----------+-----------+------------------+-----------+
| 变体              | 读侧开销 | 写侧开销  | 侵入性           | 适用场景  |
+-------------------+----------+-----------+------------------+-----------+
| QSBR              | 0        | 低        | 高(需显式报告) | 高性能服务|
| membarrier-based  | 极低     | 中        | 低               | 通用场景  |
| signal-based      | 极低     | 高        | 中(需注册线程) | 旧内核    |
| bullet-proof (bp) | 低       | 高        | 最低             | 库/框架   |
+-------------------+----------+-----------+------------------+-----------+

九、手写一个简化的用户态 QSBR RCU

理解 RCU 最好的方式是自己实现一个。下面是一个完整的、可编译运行的简化版 QSBR RCU 实现。

/* simple_qsbr_rcu.h -- 简化的 QSBR RCU 实现 */
#ifndef SIMPLE_QSBR_RCU_H
#define SIMPLE_QSBR_RCU_H

#include <stdint.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>

#define RCU_MAX_THREADS 128
#define RCU_THREAD_OFFLINE UINT64_MAX

/* per-thread RCU 状态,cache line 对齐以避免 false sharing */
struct rcu_thread_data {
    _Atomic uint64_t gp_seq;
    uint64_t         pad[7];
} __attribute__((aligned(64)));

/* 全局 RCU 状态 */
struct rcu_global {
    _Atomic uint64_t        gp_seq;
    struct rcu_thread_data  threads[RCU_MAX_THREADS];
    _Atomic int             thread_count;
};

static struct rcu_global g_rcu;
static __thread int rcu_thread_id = -1;

static inline void rcu_init(void)
{
    memset(&g_rcu, 0, sizeof(g_rcu));
    atomic_store(&g_rcu.gp_seq, 1);
    for (int i = 0; i < RCU_MAX_THREADS; i++)
        atomic_store_explicit(&g_rcu.threads[i].gp_seq,
                              RCU_THREAD_OFFLINE, memory_order_relaxed);
}

static inline void rcu_thread_register(void)
{
    int id = atomic_fetch_add(&g_rcu.thread_count, 1);
    assert(id < RCU_MAX_THREADS);
    rcu_thread_id = id;
    uint64_t cur = atomic_load_explicit(&g_rcu.gp_seq,
                                         memory_order_acquire);
    atomic_store_explicit(&g_rcu.threads[id].gp_seq,
                          cur, memory_order_release);
}

static inline void rcu_thread_unregister(void)
{
    if (rcu_thread_id >= 0) {
        atomic_store_explicit(&g_rcu.threads[rcu_thread_id].gp_seq,
                              RCU_THREAD_OFFLINE, memory_order_release);
        rcu_thread_id = -1;
    }
}

/* QSBR 中 rcu_read_lock/unlock 是空操作 */
static inline void rcu_read_lock(void)  { /* no-op */ }
static inline void rcu_read_unlock(void) { /* no-op */ }

#define rcu_dereference(p) \
    ({ \
        __typeof__(p) _val = atomic_load_explicit( \
            (_Atomic __typeof__(p) *)&(p), memory_order_consume); \
        _val; \
    })

#define rcu_assign_pointer(p, v) \
    atomic_store_explicit( \
        (_Atomic __typeof__(p) *)&(p), (v), memory_order_release)

/*
 * rcu_quiescent_state -- QSBR 的核心
 * 读者定期调用,告诉 RCU 子系统"我不在任何读侧临界区"
 */
static inline void rcu_quiescent_state(void)
{
    assert(rcu_thread_id >= 0);
    uint64_t cur = atomic_load_explicit(&g_rcu.gp_seq,
                                         memory_order_acquire);
    atomic_store_explicit(&g_rcu.threads[rcu_thread_id].gp_seq,
                          cur, memory_order_release);
}

/* 线程长时间阻塞前标记 offline,避免卡住 Grace Period */
static inline void rcu_thread_offline(void)
{
    assert(rcu_thread_id >= 0);
    atomic_store_explicit(&g_rcu.threads[rcu_thread_id].gp_seq,
                          RCU_THREAD_OFFLINE, memory_order_release);
}

static inline void rcu_thread_online(void)
{
    assert(rcu_thread_id >= 0);
    uint64_t cur = atomic_load_explicit(&g_rcu.gp_seq,
                                         memory_order_acquire);
    atomic_store_explicit(&g_rcu.threads[rcu_thread_id].gp_seq,
                          cur, memory_order_release);
}

/* 检查所有线程是否都通过了 target GP */
static inline bool rcu_gp_completed(uint64_t target)
{
    int count = atomic_load_explicit(&g_rcu.thread_count,
                                      memory_order_acquire);
    for (int i = 0; i < count; i++) {
        uint64_t thr_gp = atomic_load_explicit(
            &g_rcu.threads[i].gp_seq, memory_order_acquire);
        if (thr_gp != RCU_THREAD_OFFLINE && thr_gp < target)
            return false;
    }
    return true;
}

/*
 * synchronize_rcu -- 递增全局 GP,等待所有线程通过
 */
static inline void synchronize_rcu(void)
{
    uint64_t target = atomic_fetch_add_explicit(
        &g_rcu.gp_seq, 1, memory_order_acq_rel) + 1;

    while (!rcu_gp_completed(target))
        sched_yield();
}

#endif /* SIMPLE_QSBR_RCU_H */

测试程序

/* test_qsbr_rcu.c */
#define _GNU_SOURCE
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <sched.h>
#include "simple_qsbr_rcu.h"

struct shared_data {
    int version;
    int value_a;
    int value_b;
    int checksum;  /* 应该等于 value_a + value_b */
};

static struct shared_data *global_data = NULL;
static _Atomic int running = 1;
static _Atomic uint64_t read_count = 0;
static _Atomic uint64_t inconsistency_count = 0;

void *reader_func(void *arg)
{
    long id = (long)arg;
    rcu_thread_register();
    uint64_t local_reads = 0;

    while (atomic_load(&running)) {
        rcu_read_lock();
        struct shared_data *p = rcu_dereference(global_data);
        if (p) {
            int expected = p->value_a + p->value_b;
            if (p->checksum != expected) {
                atomic_fetch_add(&inconsistency_count, 1);
                fprintf(stderr,
                    "Reader %ld: inconsistency! v=%d a=%d b=%d\n",
                    id, p->version, p->value_a, p->value_b);
            }
            local_reads++;
        }
        rcu_read_unlock();

        if (local_reads % 1000 == 0)
            rcu_quiescent_state();
    }

    atomic_fetch_add(&read_count, local_reads);
    rcu_thread_unregister();
    return NULL;
}

void *writer_func(void *arg)
{
    int version = 0;
    while (atomic_load(&running)) {
        struct shared_data *new_data = malloc(sizeof(*new_data));
        version++;
        new_data->version  = version;
        new_data->value_a  = version * 10;
        new_data->value_b  = version * 20;
        new_data->checksum = new_data->value_a + new_data->value_b;

        struct shared_data *old_data = global_data;
        rcu_assign_pointer(global_data, new_data);

        if (old_data) {
            synchronize_rcu();
            free(old_data);
        }
        usleep(1000);
    }
    return NULL;
}

int main(void)
{
    rcu_init();
    const int NR = 4;
    pthread_t readers[NR], writer;

    for (long i = 0; i < NR; i++)
        pthread_create(&readers[i], NULL, reader_func, (void *)i);
    pthread_create(&writer, NULL, writer_func, NULL);

    sleep(5);
    atomic_store(&running, 0);

    pthread_join(writer, NULL);
    for (int i = 0; i < NR; i++)
        pthread_join(readers[i], NULL);

    printf("Total reads:     %lu\n", atomic_load(&read_count));
    printf("Inconsistencies: %lu\n", atomic_load(&inconsistency_count));

    free(global_data);
    return atomic_load(&inconsistency_count) == 0 ? 0 : 1;
}

编译和运行:

gcc -O2 -pthread -o test_qsbr_rcu test_qsbr_rcu.c
./test_qsbr_rcu

预期输出(inconsistency 应为 0):

Total reads:     48372615
Inconsistencies: 0

十、RCU vs Hazard Pointers vs Epoch-Based Reclamation

这三种内存回收机制各有优劣。选择哪一种取决于具体的工作负载特征和系统约束。

原理对比

RCU 的核心思路是:不跟踪单个对象的引用,而是等待一段足够长的时间(Grace Period),保证所有可能持有旧引用的读者都已经结束。

Hazard Pointers 的核心思路是:每个线程显式声明自己正在使用的指针。写者在释放对象前检查所有线程的 hazard pointer 列表,如果有任何线程正在使用该对象,则延迟释放。

Epoch-Based Reclamation 的核心思路是:维护一个全局的 epoch 计数器。线程进入临界区时记录当前 epoch。当所有线程都已经观察到新的 epoch 时,旧 epoch 中待释放的对象可以安全回收。

详细对比表

+--------------------------+------------------+------------------+------------------+
| 维度                     | RCU              | Hazard Pointers  | Epoch-Based      |
+--------------------------+------------------+------------------+------------------+
| 读侧开销                 | 0(编译器屏障)  | 1-2 次内存屏障   | 1 次 atomic load |
| 写侧开销                 | 等待 GP 或回调   | 扫描 HP 列表     | epoch 推进       |
| 内存回收延迟              | GP 时长          | 即时检测         | 2 个 epoch       |
|                          | (毫秒级)       | (微秒级)       | (可变)         |
+--------------------------+------------------+------------------+------------------+
| 每对象额外空间            | 0(或 rcu_head) | 0                | 0                |
| per-thread 额外空间       | per-CPU 计数器   | K 个 HP 槽位     | epoch 计数器     |
| 回收粒度                  | 批量             | 单对象           | 批量             |
+--------------------------+------------------+------------------+------------------+
| 读者嵌套                  | 支持             | 需要多个 HP 槽位 | 天然支持         |
| 等待自由(wait-free)读   | 是               | 是               | 否(有 CAS)     |
| 无界内存累积风险          | 有(慢读者)     | 无               | 有(慢线程)     |
+--------------------------+------------------+------------------+------------------+
| 编程复杂度                | 低               | 高               | 中               |
| 适用语言                  | C(内核/用户态) | C/C++/Rust       | C/C++/Rust/Java  |
| 标准化                    | Linux 内核事实   | C++ P1121        | 学术界           |
|                          | 标准             | (提案中)       |                  |
+--------------------------+------------------+------------------+------------------+
| 最佳场景                  | 读远多于写       | 对象少、HP 少    | 中等读写比       |
| 最差场景                  | 读侧临界区极长   | 复杂数据结构     | 有长时间阻塞     |
|                          |                  | (需要很多 HP)  | 的线程           |
+--------------------------+------------------+------------------+------------------+

选择指南

用一个简单的决策树来帮助选择:

读写比是否 > 100:1?
  |
  +-- 是 --> 读者是否在内核态?
  |            |
  |            +-- 是 --> 使用 RCU(内核原生支持)
  |            +-- 否 --> 使用 liburcu QSBR
  |
  +-- 否 --> 对象生命周期是否明确?
               |
               +-- 是 --> 每个读者同时访问的对象数量?
               |            |
               |            +-- <= 3 --> 使用 Hazard Pointers
               |            +-- > 3  --> 使用 Epoch-Based
               |
               +-- 否 --> 使用引用计数或 GC

十一、性能分析

读侧开销的量化

在 non-preemptible 内核中,rcu_read_lock() 编译后不产生任何机器指令——只有一条编译器屏障。对比 pthread_rwlock_rdlock 需要 lock cmpxchg(原子 CAS)和失败重试循环,差距是本质性的。

典型 x86-64 系统延迟数据:

+---------------------------+-------------------+-----------------+
| 操作                      | 延迟(纳秒/次)  | 缓存行访问      |
+---------------------------+-------------------+-----------------+
| rcu_read_lock()           | ~0 (编译器屏障)   | 0               |
| rcu_read_unlock()         | ~0 (编译器屏障)   | 0               |
| rcu_dereference()         | ~0 (READ_ONCE)    | 0(额外的)     |
+---------------------------+-------------------+-----------------+
| pthread_rwlock_rdlock()   | 15-100            | 1-2(竞争时更多)|
| pthread_rwlock_rdunlock() | 15-100            | 1-2             |
+---------------------------+-------------------+-----------------+
| atomic_inc (无竞争)       | 5-10              | 1               |
| atomic_inc (竞争激烈)     | 50-500            | 1(但 bounce)  |
+---------------------------+-------------------+-----------------+
| synchronize_rcu()         | ~10000-50000      | 多个            |
| call_rcu()                | ~100-500          | 1-2             |
+---------------------------+-------------------+-----------------+

写侧开销分析

写侧的开销来自三方面:数据复制(O(数据大小))、Grace Period 等待(取决于最慢读者,受 CONFIG_HZ 影响——HZ=1000 时最多约 1ms,HZ=100 时最多约 10ms)、以及回调批处理延迟。call_rcu() 本身很快(100-500ns),但回调队列在极端情况下可能积压。

扩展性分析

当 CPU 数量增加时,不同同步机制的表现差异巨大:

读者吞吐量 vs CPU 数量(路由表查找场景)

CPU 数量:    1      4      16     64     256    1024
----------------------------------------------------------
RCU:        100%   400%   1600%  6400%  25600% 102400%
            (线性扩展,每个 CPU 独立运行)

rwlock:     100%   350%   800%   600%   400%   200%
            (先增长后因 cache line bouncing 下降)

mutex:      100%   100%   100%   100%   100%   100%
            (所有读者串行,不随 CPU 增加而扩展)

RCU 的读侧扩展性是完美线性的,因为读者之间完全没有共享的可写状态。读写锁在 CPU 数量达到一定程度后开始退化,因为 cache line bouncing 的开销超过了并行带来的收益。

十二、工程实践中的陷阱

在实际使用 RCU 时,有许多容易犯的错误。以下是最常见的陷阱和对策:

+----+-------------------------------+--------------------------------------+-------------------------------------+
| #  | 陷阱                          | 后果                                 | 对策                                |
+----+-------------------------------+--------------------------------------+-------------------------------------+
| 1  | 在 RCU 读侧临界区内睡眠      | 阻塞 Grace Period,导致内存泄漏     | 使用 SRCU 或将睡眠移到临界区外      |
|    | (non-preemptible RCU)       | 和系统挂起                           |                                     |
+----+-------------------------------+--------------------------------------+-------------------------------------+
| 2  | 忘记使用 rcu_dereference()    | 编译器可能优化掉指针读取或重排       | 启用 sparse 检查(__rcu 标注),    |
|    | 直接读取 RCU 保护的指针       | 导致读到不一致的数据                 | 使用 CONFIG_PROVE_RCU               |
+----+-------------------------------+--------------------------------------+-------------------------------------+
| 3  | 忘记使用 rcu_assign_pointer() | 其他 CPU 可能先看到新指针再看到新   | 代码审查中检查所有对 RCU 保护指针   |
|    | 直接赋值 RCU 保护的指针       | 数据的初始化,导致 use-after-init   | 的赋值操作                          |
+----+-------------------------------+--------------------------------------+-------------------------------------+
| 4  | 在 RCU 回调中访问已释放的     | use-after-free,内核 oops 或       | 将所有需要的数据嵌入到 rcu_head     |
|    | 数据结构的其他成员             | 数据损坏                             | 所在的结构体中                      |
+----+-------------------------------+--------------------------------------+-------------------------------------+
| 5  | 大量 call_rcu() 导致内存积压  | 回调队列无限增长,最终 OOM          | 定期调用 rcu_barrier() 或使用       |
|    |                               |                                      | synchronize_rcu() 做节流            |
+----+-------------------------------+--------------------------------------+-------------------------------------+
| 6  | 在中断上下文中调用             | 死锁(synchronize_rcu 可能睡眠)   | 在中断中只使用 call_rcu()           |
|    | synchronize_rcu()             |                                      |                                     |
+----+-------------------------------+--------------------------------------+-------------------------------------+
| 7  | 混用不同的 RCU 变体           | 例如用 rcu_read_lock() 保护,却用  | 确保 lock/unlock/synchronize/call   |
|    |                               | synchronize_rcu_bh() 等待           | 使用匹配的变体                      |
+----+-------------------------------+--------------------------------------+-------------------------------------+
| 8  | QSBR 中忘记调用               | Grace Period 永远无法完成,写者     | 在事件循环的每次迭代中调用          |
|    | rcu_quiescent_state()         | 永远阻塞                             | rcu_quiescent_state()               |
+----+-------------------------------+--------------------------------------+-------------------------------------+
| 9  | 写者之间缺少互斥              | 多个写者并发修改同一指针,导致      | 写者之间使用互斥锁或其他同步机制    |
|    |                               | 丢失更新或数据损坏                   |                                     |
+----+-------------------------------+--------------------------------------+-------------------------------------+
| 10 | 在读侧临界区中修改 RCU        | 违反 RCU 的读写分离语义,           | RCU 读侧只读取数据,所有修改       |
|    | 保护的数据                     | 导致其他读者看到不一致状态           | 通过 Copy-Update 模式进行           |
+----+-------------------------------+--------------------------------------+-------------------------------------+

调试工具

内核提供了 RCU 调试支持。在 .config 中启用 CONFIG_PROVE_RCU=yCONFIG_DEBUG_OBJECTS_RCU_HEAD=y,运行时可通过 /sys/kernel/debug/rcu/ 查看状态。lockdep 集成的 rcu_dereference_protected() 可在编译和运行时捕获 RCU 误用:

/* 在持有锁的上下文中安全访问 RCU 指针 */
struct foo *p = rcu_dereference_protected(gp,
    lockdep_is_held(&my_mutex));

/* 允许在多种上下文中访问 */
struct foo *p = rcu_dereference_check(gp,
    lockdep_is_held(&my_mutex) || rcu_read_lock_held());

十三、个人思考

RCU 的精妙之处

我认为 RCU 是计算机科学中最优雅的并发设计之一,原因有三:

第一,它重新定义了问题。传统的并发控制问着”如何让读者和写者协调”,RCU 问的是”读者能不能什么都不做”。这个问题的转换本身就是一种洞见——当你意识到大多数场景的读写比是 1000:1 甚至更高时,把所有成本转移到写者身上是完全合理的。

第二,它利用了硬件的特性而非对抗它。RCU 的设计与现代 CPU 的 cache 架构完美契合。读者不修改任何共享状态,意味着 cache line 永远不会因为读操作而 bounce。这不是巧合,而是深刻理解硬件后的有意设计。

第三,它证明了”等一等”可以是一种高效的策略。在我们的直觉中,等待总是低效的。但 RCU 告诉我们,如果等待的代价可以由少数写者承担,而不是由多数读者承担,那么整体效率反而更高。这是一种反直觉但深刻的权衡。

RCU 的局限性

当然,RCU 不是银弹:

  1. 写侧开销不可忽视:当写操作频繁时,Grace Period 的等待和内存复制会成为瓶颈。如果读写比接近 1:1,RCU 反而可能比读写锁更慢。

  2. 内存开销:在 Grace Period 结束之前,旧版本的数据无法释放。在极端情况下(比如大量写者产生大量旧版本),内存使用可能显著增加。

  3. 编程模型的限制:RCU 最适合指针替换(pointer replacement)模式。对于需要原地修改(in-place update)的数据结构,RCU 不直接适用。

  4. Grace Period 延迟不可预测:在实时系统中,synchronize_rcu() 的延迟取决于系统中最慢的读者,这可能影响实时性保证。SRCU(Sleepable RCU)在一定程度上缓解了这个问题,但增加了读侧开销。

  5. 学习曲线陡峭:RCU 的内存序要求、API 的配对使用、不同变体的选择——这些都需要深入理解才能正确使用。内核中 RCU 相关的 bug 修复补丁数量就是证明。

对并发编程的启示

RCU 给我最大的启示是:最好的优化不是让操作更快,而是让操作消失。 rcu_read_lock() 不是一个”很快的锁”,它根本就不是锁——它什么都没做。这种思维方式值得我们在其他领域借鉴。

Paul McKenney 在他的经典论文中写道,RCU 的设计哲学是”适当的懒惰”(appropriate laziness)。写者不急于清理旧数据,而是等待一个安全的时机。这种懒惰不是缺陷,而是一种深思熟虑的设计选择。在软件工程中,很多时候”延迟执行”都比”立即执行”更高效——这个原则在垃圾回收、写时复制、延迟加载等技术中都有体现。

RCU 提醒我们,解决并发问题不一定需要更精细的锁、更复杂的无锁算法。有时候,我们只需要换一个角度看问题,找到那个可以”什么都不做”的路径。


上一篇: Epoch-Based Reclamation 下一篇: 并发哈希表:从分段锁到无锁设计

相关阅读: - Hazard Pointers - epoll 的数据结构


By .