土法炼钢兴趣小组的算法知识备份

无锁栈:Treiber 栈与指数退避

文章导航

分类入口
algorithms
标签入口
#lock-free#treiber-stack#cas#elimination-backoff#concurrent

目录

1986 年,R. Kent Treiber 在 IBM 的一份技术报告中描述了一个极其简洁的无锁栈实现。整个算法只有两个操作——push 和 pop——每个操作的核心都是一次 CAS(Compare-And-Swap)。

三十多年后的今天,Treiber 栈仍然是并发数据结构的”Hello World”——每一本并发编程教材都会从它讲起。不是因为它最实用(它有严重的可扩展性问题),而是因为它浓缩了无锁编程的所有核心概念:CAS 循环、ABA 问题、内存回收、伪共享。

理解 Treiber 栈,就理解了无锁编程的基本范式。

下图展示了 Treiber 栈的 CAS 操作和 Elimination Backoff 的配对机制:

Treiber 栈与 Elimination Backoff

一、从 mutex 栈到无锁栈:为什么要去掉锁

临界区方案:加锁栈

最直接的并发栈就是给每个操作加一把互斥锁:

typedef struct Node {
    int val;
    struct Node *next;
} Node;

typedef struct {
    Node *top;
    pthread_mutex_t lock;
} LockedStack;

void locked_push(LockedStack *s, int val) {
    Node *n = malloc(sizeof(Node));
    n->val = val;
    pthread_mutex_lock(&s->lock);
    n->next = s->top;
    s->top = n;
    pthread_mutex_unlock(&s->lock);
}

int locked_pop(LockedStack *s, int *out) {
    pthread_mutex_lock(&s->lock);
    if (s->top == NULL) {
        pthread_mutex_unlock(&s->lock);
        return 0;
    }
    Node *old = s->top;
    s->top = old->next;
    *out = old->val;
    pthread_mutex_unlock(&s->lock);
    free(old);
    return 1;
}

这段代码正确、简洁,但在高并发下存在四个系统性问题:

  1. 优先级反转:低优先级线程持锁时,高优先级线程被阻塞。在实时系统中这可能导致任务错过截止时间。
  2. Convoying:一个线程在临界区内被调度器换出(例如时间片用尽或缺页中断),所有其他线程排队等待,即使 CPU 空闲。
  3. 不可组合:两个加锁操作不能安全组合为一个原子操作——尝试在持有锁 A 的同时获取锁 B 就引入了死锁风险。
  4. Cache line bouncing:锁变量本身就是一个竞争热点,N 个核心反复对同一 cache line 做 invalidation,导致总线流量随线程数平方增长。

这四个问题的根源在于悲观并发——进入临界区前必须先获取互斥权,无论实际是否有竞争。

CAS:乐观并发的基石

CAS(Compare-And-Swap)是一条硬件原子指令,提供了另一种并发范式:

// 伪代码:以下操作由硬件原子完成
bool CAS(T *ptr, T *expected, T desired) {
    if (*ptr == *expected) {
        *ptr = desired;
        return true;
    } else {
        *expected = *ptr;  // 更新 expected 为当前值
        return false;
    }
}

不同架构上的实现:

架构 指令 特点
x86/x86-64 CMPXCHG / CMPXCHG16B 直接 CAS,支持 64/128 位
ARM (v8+) LDXR / STXR(LL/SC) 加载独占/存储独占对,更灵活
RISC-V LR / SC 类似 ARM 的 LL/SC 语义
POWER LWARX / STWCX IBM 风格的 LL/SC

CAS 的核心优势:乐观并发。先假设没有竞争,直接尝试修改;如果失败(有人先修改了),重新读取并重试。大多数情况下竞争不会发生,所以 CAS 路径比加锁更轻。

临界区 vs CAS 的根本对比

维度 临界区(锁) CAS(无锁)
并发模型 悲观:先获取互斥权 乐观:直接尝试,失败重试
阻塞 获取不到锁时阻塞(或自旋) 永不阻塞,失败立即重试
进度保证 无(持锁线程可能被调度出去) Lock-free:至少一个线程能推进
优先级反转 存在 不存在
复杂度 简单 ABA、内存回收等需要额外处理
适用场景 临界区操作复杂时 共享状态可用单个原子操作更新时

为什么栈比队列更适合入门

栈只有一个竞争点——top 指针。push 和 pop 都只修改 top,一次 CAS 就能完成。

队列有两个竞争点——head(出队)和 tail(入队)。Michael-Scott 队列需要两步 CAS:先 CAS tail->next,再 CAS tail。中间状态需要”帮助”机制来修复,算法复杂度显著上升。

所以 Treiber 栈是学习无锁编程的最佳起点:一个指针、一次 CAS、最少的概念负担。

二、Treiber 栈原理

算法描述

栈只有一个共享指针 top,指向栈顶节点。每个节点有 valnext

Push 操作

  1. 创建新节点 n,设置 n->val
  2. 读取当前 topold_top
  3. 设置 n->next = old_top
  4. CAS(&top, &old_top, n)
    • 成功:返回
    • 失败:old_top 已被更新为当前 top,回到第 3 步

Pop 操作

  1. 读取当前 topold_top
  2. 如果 old_top == NULL,栈空,返回失败
  3. 读取 old_top->nextnew_top
  4. CAS(&top, &old_top, new_top)
    • 成功:返回 old_top->val
    • 失败:old_top 已被更新,回到第 1 步

关键观察:CAS 循环中不需要重新分配节点。push 时创建的节点可以反复使用——只需更新 n->next;pop 时的重试只需重新读取 top

CAS 循环的正确性

为什么这是正确的?线性化点(linearization point)分析:

每个操作要么成功(CAS 返回 true),要么发现冲突并重试。不存在中间状态——这就是无锁数据结构的核心不变量。

基础 C 实现

#include <stdint.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>

typedef struct Node {
    int val;
    struct Node *next;
} Node;

typedef struct {
    _Atomic(Node *) top;
} TreiberStack;

void treiber_init(TreiberStack *s) {
    atomic_store(&s->top, NULL);
}

void treiber_push(TreiberStack *s, int val) {
    Node *n = malloc(sizeof(Node));
    n->val = val;

    Node *old_top = atomic_load_explicit(&s->top, memory_order_relaxed);
    do {
        n->next = old_top;
    } while (!atomic_compare_exchange_weak_explicit(
        &s->top, &old_top, n,
        memory_order_release,
        memory_order_relaxed));
}

int treiber_pop(TreiberStack *s, int *val) {
    Node *old_top = atomic_load_explicit(&s->top, memory_order_acquire);
    Node *new_top;

    do {
        if (old_top == NULL) return 0;
        new_top = old_top->next;
    } while (!atomic_compare_exchange_weak_explicit(
        &s->top, &old_top, new_top,
        memory_order_acquire,
        memory_order_relaxed));

    *val = old_top->val;
    // 注意:这里不能直接 free(old_top)!
    // 其他线程可能还在读 old_top->next
    // 需要安全内存回收(Hazard Pointers 或 Epoch-Based)
    return 1;
}

内存序的选择

为什么 push 用 memory_order_release,pop 用 memory_order_acquire

Release-acquire 对形成了 happens-before 关系:push 的 release 和 pop 的 acquire 同步,保证 pop 线程能看到 push 线程写入的数据。

在 x86 上,release 和 acquire 都是免费的(x86 的 TSO 内存模型天然提供)。但在 ARM 和 RISC-V 上,它们会生成额外的屏障指令。这就是为什么不应该盲目使用 memory_order_seq_cst——在弱内存模型架构上,更强的内存序意味着更多的屏障指令和更高的延迟。

为什么用 compare_exchange_weak 而不是 strong

weak 版本允许”伪失败”(spurious failure)——即使值没有变化也可能返回 false。这在循环中没有问题(反正要重试),但允许编译器在 LL/SC 架构上生成更高效的代码。strong 版本在 LL/SC 架构上需要额外的循环来排除伪失败,在 CAS 循环中这是多余的开销。

三、ABA 问题深入

具体场景演示

ABA 问题是无锁编程中最经典的陷阱。下图展示了完整的场景:

ABA 问题场景演示

用文字详细描述。假设栈内容为 A → B → C:

线程 1(正在做 pop)              线程 2
──────────────────              ──────────
① 读取 old_top = A
② 读取 new_top = A->next = B
③ (此时被调度器换出)
                                ④ pop A → 成功,top = B
                                ⑤ pop B → 成功,top = C
                                ⑥ free(B)
                                ⑦ push A(重新入栈)
                                   top = A, A->next = C
                                   (此时栈:A → C)
⑧ (恢复执行)
⑨ CAS(&top, &A, B) → 成功!
   (因为 top 确实等于 A)

结果:top 现在指向已经被 free 的 B!

问题的本质:CAS 只比较指针的值(地址),无法区分”指针从未改变”和”指针改变后又恢复原值”。

为什么栈的 ABA 比队列简单

在 Michael-Scott 队列中,ABA 可能发生在 headtail 两个指针上,且 tail 的 ABA 还涉及”帮助推进”逻辑。栈只有一个 top 指针,ABA 的触发条件更简单——但后果同样严重。

解法一:Tagged Pointer(版本号指针)

最经典的解法:在指针旁边附带一个单调递增的版本号。每次修改指针时版本号加一。即使指针值相同,版本号不同也会导致 CAS 失败。

64 位系统上的实现策略

方案 A——利用指针高位。x86-64 当前只使用 48 位虚拟地址,高 16 位可用于存储 tag。但要注意 Intel 的 5-level paging(57 位地址)和 ARM 的 TBI(Top Byte Ignore)等扩展。

方案 B——double-width CAS。使用 128 位的 CMPXCHG16B(x86-64)或两个寄存器的 LL/SC(ARM),将指针和 tag 打包为一个 128 位的值。

typedef struct {
    Node *ptr;
    uint64_t tag;
} __attribute__((aligned(16))) TaggedPtr;

方案 B 更安全,因为 tag 有完整的 64 位空间,实际上不可能回绕。

解法二:Double-Width CAS

在 x86-64 上,CMPXCHG16B 指令原子地比较和交换 16 字节。GCC 提供 __sync_bool_compare_and_swap 和 C11 的 _Atomic 对 128 位类型的支持(需要 -mcx16 编译选项)。

// 确保 16 字节对齐
typedef struct {
    _Alignas(16) Node *ptr;
    uint64_t tag;
} TaggedPtr;

// GCC 内建方式
typedef unsigned __int128 uint128_t;

static inline bool dwcas(TaggedPtr *target,
                         TaggedPtr *expected,
                         TaggedPtr desired) {
    return __sync_bool_compare_and_swap(
        (uint128_t *)target,
        *(uint128_t *)expected,
        *(uint128_t *)&desired);
}

解法三:LL/SC(Load-Linked / Store-Conditional)

ARM、POWER、RISC-V 等架构提供 LL/SC 指令对,天然免疫 ABA 问题:

任何对该地址的写入(即使写入相同的值)都会清除标记,导致 SC 失败。这意味着在 LL/SC 架构上,ABA 问题在硬件层面就被解决了——不需要 tag。

但是,C11 原子操作将 LL/SC 抽象为 CAS 语义,编译器生成的代码可能无法充分利用 LL/SC 的 ABA 免疫特性。在性能关键路径上,可能需要使用平台特定的内联汇编。

三种解法的对比

方案 ABA 免疫 额外空间 平台限制 性能
Tagged pointer(高位) 0(利用未用位) 地址空间扩展后失效 最佳(单次 64-bit CAS)
Double-width CAS 8 字节 tag 需要 CMPXCHG16B(x86)或等价指令 好(128-bit CAS 略慢)
LL/SC 天然免疫 0 仅 ARM/POWER/RISC-V 好(取决于独占监视器粒度)
Hazard Pointers 间接解决 per-thread HP 列表 通用 中等(每次访问需发布/回收 HP)

四、指数退避:CAS 失败后如何减少竞争

问题:CAS 风暴

当 32 个线程同时对 top 做 CAS,每轮只有 1 个能成功,其余 31 个失败后立即重试。下一轮又只有 1 个成功……这就是 CAS 风暴——大量无效的 CAS 操作消耗总线带宽,成功率随线程数下降。

指数退避的思想

来自以太网 CSMA/CD 协议的经典策略:碰撞后不要立即重试,等一段随机时间。如果再次碰撞,等待时间加倍。

第 1 次失败:等待 [0, 1) 微秒
第 2 次失败:等待 [0, 2) 微秒
第 3 次失败:等待 [0, 4) 微秒
第 4 次失败:等待 [0, 8) 微秒
...
第 n 次失败:等待 [0, min(2^n, max_backoff)) 微秒

随机化很重要——如果所有线程等待完全相同的时间,它们会同时醒来并再次碰撞。

上下界的选择

退避策略有三个关键参数:

参数 含义 经验值 选择依据
min_backoff 初始退避下界 1 微秒 约等于一次 CAS 失败的代价
max_backoff 退避上界 1000 微秒 避免延迟过高,通常为 CAS 延迟 × 线程数
随机化范围 [0, current_backoff) 均匀分布 打散线程的唤醒时间

min_backoff 太大:低竞争时无谓等待。max_backoff 太小:高竞争时退避不够。实践中常用 1-1024 微秒的范围,可以根据运行时 CAS 成功率动态调整。

退避的实现:自旋 vs 睡眠

短退避用自旋(PAUSE 指令循环),长退避用 usleepnanosleep。分界点约在 1-10 微秒:

static inline void backoff_delay(int us) {
    if (us < 4) {
        // 短退避:自旋,用 PAUSE 指令减少功耗
        for (int i = 0; i < us * 100; i++) {
            __builtin_ia32_pause();
        }
    } else {
        struct timespec ts = {0, us * 1000L};
        nanosleep(&ts, NULL);
    }
}

x86 的 PAUSE 指令很关键——它告诉 CPU “我在自旋等待”,让 CPU 降低功耗并避免因 speculative execution 导致的流水线清空。

五、Elimination Backoff Stack

碰撞数组的思想

2004 年 Hendler、Shavit 和 Yerushalmi 的关键洞察:如果一个 push 和一个 pop 几乎同时发生,它们可以直接交换数据——不需要触碰栈。

从栈的语义来看:push(x) 紧跟 pop() 返回 x,效果等价于什么都没做。所以如果我们能让 push 和 pop 直接”碰面”并交换数据,栈的状态完全不变,但两个操作都完成了。

这就是 Elimination(消除):互补的操作对可以被消除。

Exchanger 的实现

消除数组的每个槽位(slot)本质上是一个 Exchanger——允许两个线程原子地交换数据:

Slot 状态机:
  EMPTY → push 写入值 → WAITING → pop 取走值 → BUSY → push 确认 → EMPTY

Push 线程写入数据并等待;Pop 线程检测到有数据后取走。这个过程通过 CAS 保证原子性。

概率分析

假设 N 个线程中,push 和 pop 各占 50%。消除数组有 K 个槽位。一个 push 线程选择某个槽位的概率为 1/K。

一次成功配对需要:一个 push 和一个 pop 选择同一个槽位,且在超时窗口内到达。

配对成功率 ≈ 1 - (1 - 1/K)^(N/2)

当 N >> K 时,每个槽位几乎一定有配对。这就是为什么 Elimination Stack 在高竞争下表现优异:更多的线程意味着更高的配对概率。

K 的选择:太少则配对碰撞(两个 push 抢同一个槽位),太多则配对概率降低。经验上 K ≈ 线程数/2 是较好的选择,实践中常用 K = 8-16 的固定值。

在高竞争下的吞吐提升

Elimination Stack 的核心优势:竞争从敌人变成朋友

传统的可扩展性曲线是”线程越多越慢”。Elimination Stack 的曲线是”线程越多越快”——至少在 push/pop 比例均衡时如此。

六、内存回收问题

为什么不能直接 free

考虑 pop 操作中的关键时刻:

do {
    if (old_top == NULL) return 0;
    new_top = old_top->next;         // ← 这里解引用 old_top
} while (!atomic_compare_exchange_weak(...));

// CAS 成功后,old_top 被弹出
free(old_top);  // ← 危险!

问题:在线程 A 执行 old_top->next 时,线程 B 可能已经 pop 了同一个节点并 free 了它。线程 A 对已释放内存的解引用是未定义行为。

即使线程 A 自己是 CAS 成功者——它在 free 之前,其他线程可能还持有对该节点的引用(读取了 old_top 但还没执行到 ->next)。

这是所有无锁数据结构共有的问题,不仅限于栈。

Hazard Pointer 方案概述

Michael(2004)提出的方案:

  1. 每个线程维护一组 Hazard Pointer(HP),声明”我正在访问这些地址”。
  2. 线程要访问共享节点前,先把地址写入自己的 HP 列表。
  3. 要回收节点时,扫描所有线程的 HP 列表。如果没有任何线程的 HP 指向该节点,可以安全回收。
  4. 否则,将回收请求放入延迟列表,稍后重试。
线程 A 做 pop:
  hp[0] = old_top;           // 声明:我在访问 old_top
  memory_fence();            // 确保 HP 可见
  if (top != old_top) {      // 再次验证
      hp[0] = NULL;
      goto retry;
  }
  new_top = old_top->next;   // 现在安全解引用
  ...
  hp[0] = NULL;              // 完成后清除 HP

代价:每次节点访问需要一次 store(设置 HP)+ 一次 fence + 一次验证。回收时需要扫描所有线程的 HP。

详见 Hazard Pointers:安全内存回收

Epoch-Based Reclamation 方案概述

更轻量的方案,但有不同的权衡:

  1. 维护一个全局 epoch 计数器(0, 1, 2 循环)。
  2. 线程进入临界区时,记录当前 epoch。
  3. 退出临界区时,将待回收节点放入当前 epoch 的回收列表。
  4. 当所有线程都已经过了某个 epoch(没有线程还在该 epoch 的临界区),该 epoch 的所有回收节点可以安全释放。
线程 A 做 pop:
  enter_epoch();               // 记录当前 epoch
  ... 正常 pop 操作 ...
  retire(old_top);             // 放入延迟回收列表
  leave_epoch();               // 离开临界区

优势:摊销开销极低(只在进入/离开临界区时做少量原子操作)。劣势:如果某个线程长时间不离开临界区,所有待回收节点都无法释放,可能导致内存膨胀。

详见 Epoch-Based Reclamation

方案对比

方案 访问开销 回收延迟 内存膨胀风险 实现复杂度
Hazard Pointers 高(每次读设置 HP) 低(与线程数成正比) 可控(O(N×K))
Epoch-Based 低(摊销) 较高 高(一个慢线程阻塞所有回收)
RCU 最低(读侧几乎为零) 低(读侧)/高(写侧)

七、完整 C 实现:Treiber 栈 + 指数退避 + Tagged Pointer

以下是一个生产级的实现,约 200 行,包含 tagged pointer 防 ABA 和指数退避:

#include <stdint.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

/* ── 节点定义 ── */
typedef struct Node {
    int val;
    struct Node *next;
} Node;

/* ── Tagged Pointer:指针 + 版本号 ── */
typedef struct {
    Node *ptr;
    uint64_t tag;
} __attribute__((aligned(16))) TaggedPtr;

/* 128-bit CAS 封装(x86-64: CMPXCHG16B,需要 -mcx16) */
typedef unsigned __int128 uint128_t;

static inline bool tagged_cas(TaggedPtr *target,
                              TaggedPtr *expected,
                              TaggedPtr desired) {
    uint128_t exp_val, des_val;
    memcpy(&exp_val, expected, sizeof(uint128_t));
    memcpy(&des_val, &desired, sizeof(uint128_t));

    bool ok = __sync_bool_compare_and_swap(
        (volatile uint128_t *)target, exp_val, des_val);
    if (!ok) {
        uint128_t cur = __atomic_load_n(
            (volatile uint128_t *)target, __ATOMIC_RELAXED);
        memcpy(expected, &cur, sizeof(uint128_t));
    }
    return ok;
}

static inline TaggedPtr tagged_load(const TaggedPtr *src) {
    uint128_t raw = __atomic_load_n(
        (const volatile uint128_t *)src, __ATOMIC_ACQUIRE);
    TaggedPtr tp;
    memcpy(&tp, &raw, sizeof(TaggedPtr));
    return tp;
}

/* ── 退避策略 ── */
#define BACKOFF_MIN_US  1
#define BACKOFF_MAX_US  1024

static inline void do_backoff(int *backoff_us) {
    int delay = *backoff_us + (rand() % (*backoff_us + 1));
    if (delay < 4) {
        for (int i = 0; i < delay * 80; i++)
            __builtin_ia32_pause();
    } else {
        struct timespec ts = {0, (long)delay * 1000};
        nanosleep(&ts, NULL);
    }
    int next = *backoff_us * 2;
    *backoff_us = next < BACKOFF_MAX_US ? next : BACKOFF_MAX_US;
}

/* ── Treiber 栈(带 tagged pointer + 指数退避) ── */
typedef struct {
    TaggedPtr top __attribute__((aligned(16)));
} TreiberStackTP;

void treiber_tp_init(TreiberStackTP *s) {
    s->top.ptr = NULL;
    s->top.tag = 0;
}

void treiber_tp_push(TreiberStackTP *s, int val) {
    Node *n = malloc(sizeof(Node));
    n->val = val;

    int backoff = BACKOFF_MIN_US;
    TaggedPtr old_top = tagged_load(&s->top);
    TaggedPtr new_top;

    for (;;) {
        n->next = old_top.ptr;
        new_top.ptr = n;
        new_top.tag = old_top.tag + 1;

        if (tagged_cas(&s->top, &old_top, new_top))
            return;

        do_backoff(&backoff);
    }
}

int treiber_tp_pop(TreiberStackTP *s, int *out) {
    int backoff = BACKOFF_MIN_US;
    TaggedPtr old_top = tagged_load(&s->top);
    TaggedPtr new_top;

    for (;;) {
        if (old_top.ptr == NULL) return 0;

        new_top.ptr = old_top.ptr->next;
        new_top.tag = old_top.tag + 1;

        if (tagged_cas(&s->top, &old_top, new_top)) {
            *out = old_top.ptr->val;
            /* 注意:不直接 free(old_top.ptr)
             * 生产代码需用 Hazard Pointer 或 Epoch-Based 回收
             * 此处为演示省略 */
            return 1;
        }

        do_backoff(&backoff);
    }
}

/* ── 简易测试 ── */
#ifdef TREIBER_TP_MAIN
#include <stdio.h>
#include <pthread.h>

static TreiberStackTP g_stack;
#define OPS_PER_THREAD 100000

void *worker(void *arg) {
    (void)arg;
    unsigned int seed = (unsigned int)pthread_self();
    for (int i = 0; i < OPS_PER_THREAD; i++) {
        if (rand_r(&seed) % 2 == 0) {
            treiber_tp_push(&g_stack, i);
        } else {
            int v;
            treiber_tp_pop(&g_stack, &v);
        }
    }
    return NULL;
}

int main(void) {
    treiber_tp_init(&g_stack);
    int nthreads = 8;
    pthread_t threads[64];
    for (int i = 0; i < nthreads; i++)
        pthread_create(&threads[i], NULL, worker, NULL);
    for (int i = 0; i < nthreads; i++)
        pthread_join(threads[i], NULL);
    printf("Done. Draining stack...\n");
    int v, count = 0;
    while (treiber_tp_pop(&g_stack, &v)) count++;
    printf("Remaining elements: %d\n", count);
    return 0;
}
#endif

编译方式:

gcc -DTREIBER_TP_MAIN -O2 -mcx16 -lpthread -o treiber_tp treiber_tp.c

关键设计决策说明:

八、Elimination Backoff Stack 完整实现

#include <stdint.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <limits.h>

/* ── 节点 ── */
typedef struct ENode {
    int val;
    struct ENode *next;
} ENode;

/* ── 主栈(简化版 Treiber,使用普通指针 CAS) ── */
typedef struct {
    _Atomic(ENode *) top;
} EStack;

/* ── Elimination 槽位 ── */
/* 状态编码到指针中:
 *   NULL          → EMPTY(无人等待)
 *   SENTINEL      → POP 正在等待
 *   其他 Node*    → PUSH 正在等待(节点地址即数据)
 */
#define ELIM_EMPTY  ((ENode *)0)
#define ELIM_POP    ((ENode *)1)  /* 哨兵值,表示 pop 在等待 */

#define ELIM_ARRAY_SIZE   16
#define ELIM_ROUNDS       32     /* 自旋轮次 */
#define BACKOFF_MIN       1
#define BACKOFF_MAX       512

typedef struct {
    /* 每个槽位独占一个 cache line */
    _Atomic(ENode *) slot __attribute__((aligned(64)));
} __attribute__((aligned(64))) ElimSlot;

typedef struct {
    EStack stack;
    ElimSlot elim[ELIM_ARRAY_SIZE];
} EliminationBackoffStack;

/* ── 工具函数 ── */
static inline unsigned int fast_rand(unsigned int *seed) {
    *seed = *seed * 1103515245 + 12345;
    return (*seed >> 16) & 0x7fff;
}

static inline void spin_wait(int rounds) {
    for (int i = 0; i < rounds; i++)
        __builtin_ia32_pause();
}

/* ── 初始化 ── */
void ebs_init(EliminationBackoffStack *ebs) {
    atomic_store(&ebs->stack.top, NULL);
    for (int i = 0; i < ELIM_ARRAY_SIZE; i++)
        atomic_store(&ebs->elim[i].slot, ELIM_EMPTY);
}

/* ── 尝试通过 elimination 完成 push ── */
static bool elim_try_push(EliminationBackoffStack *ebs,
                          ENode *node, unsigned int *seed) {
    int idx = fast_rand(seed) % ELIM_ARRAY_SIZE;
    _Atomic(ENode *) *slot = &ebs->elim[idx].slot;

    ENode *expected = ELIM_EMPTY;
    if (!atomic_compare_exchange_strong_explicit(
            slot, &expected, node,
            memory_order_release, memory_order_relaxed)) {
        /* 槽位非空——如果是 pop 在等待,直接配对 */
        if (expected == ELIM_POP) {
            if (atomic_compare_exchange_strong_explicit(
                    slot, &expected, node,
                    memory_order_release, memory_order_relaxed)) {
                /* pop 会看到 node 并取走 */
                return false; /* 让 pop 侧完成语义 */
            }
        }
        return false;
    }

    /* 成功写入,等待 pop 来取 */
    for (int r = 0; r < ELIM_ROUNDS; r++) {
        ENode *cur = atomic_load_explicit(slot, memory_order_acquire);
        if (cur != node) {
            /* pop 取走了我们的节点(替换为 ELIM_EMPTY 或其他)*/
            return true;
        }
        spin_wait(4);
    }

    /* 超时,尝试收回 */
    expected = node;
    if (atomic_compare_exchange_strong_explicit(
            slot, &expected, ELIM_EMPTY,
            memory_order_relaxed, memory_order_relaxed))
        return false;  /* 成功收回,elimination 未完成 */

    /* CAS 失败说明 pop 已取走 */
    return true;
}

/* ── 尝试通过 elimination 完成 pop ── */
static bool elim_try_pop(EliminationBackoffStack *ebs,
                         int *out, unsigned int *seed) {
    int idx = fast_rand(seed) % ELIM_ARRAY_SIZE;
    _Atomic(ENode *) *slot = &ebs->elim[idx].slot;

    ENode *cur = atomic_load_explicit(slot, memory_order_acquire);

    if (cur != ELIM_EMPTY && cur != ELIM_POP) {
        /* push 在等待,尝试取走 */
        if (atomic_compare_exchange_strong_explicit(
                slot, &cur, ELIM_EMPTY,
                memory_order_acquire, memory_order_relaxed)) {
            *out = cur->val;
            /* 不 free(cur)——由调用者或回收机制处理 */
            return true;
        }
    }
    return false;
}

/* ── Push(主栈 + elimination 退避) ── */
void ebs_push(EliminationBackoffStack *ebs, int val) {
    ENode *n = malloc(sizeof(ENode));
    n->val = val;

    unsigned int seed = (unsigned int)(uintptr_t)n;
    int backoff = BACKOFF_MIN;

    for (;;) {
        /* 尝试主栈 */
        ENode *old = atomic_load_explicit(
            &ebs->stack.top, memory_order_relaxed);
        n->next = old;
        if (atomic_compare_exchange_weak_explicit(
                &ebs->stack.top, &old, n,
                memory_order_release, memory_order_relaxed))
            return;

        /* CAS 失败,尝试 elimination */
        if (elim_try_push(ebs, n, &seed))
            return;

        /* 退避 */
        spin_wait(backoff);
        backoff = backoff * 2 < BACKOFF_MAX ? backoff * 2 : BACKOFF_MAX;
    }
}

/* ── Pop(主栈 + elimination 退避) ── */
int ebs_pop(EliminationBackoffStack *ebs, int *out) {
    unsigned int seed = (unsigned int)time(NULL);
    int backoff = BACKOFF_MIN;

    for (;;) {
        /* 尝试主栈 */
        ENode *old = atomic_load_explicit(
            &ebs->stack.top, memory_order_acquire);
        if (old == NULL) {
            /* 栈可能为空,但 elimination 中可能有数据 */
            if (elim_try_pop(ebs, out, &seed))
                return 1;
            return 0;  /* 真正为空 */
        }
        ENode *nx = old->next;
        if (atomic_compare_exchange_weak_explicit(
                &ebs->stack.top, &old, nx,
                memory_order_acquire, memory_order_relaxed)) {
            *out = old->val;
            return 1;
        }

        /* CAS 失败,尝试 elimination */
        if (elim_try_pop(ebs, out, &seed))
            return 1;

        /* 退避 */
        spin_wait(backoff);
        backoff = backoff * 2 < BACKOFF_MAX ? backoff * 2 : BACKOFF_MAX;
    }
}

/* ── 销毁 ── */
void ebs_destroy(EliminationBackoffStack *ebs) {
    ENode *cur = atomic_load(&ebs->stack.top);
    while (cur) {
        ENode *next = cur->next;
        free(cur);
        cur = next;
    }
}

实现要点:

  1. 槽位状态用指针值编码:避免额外的状态字段,减少 CAS 操作次数。NULL 表示空,(void*)1 是 pop 等待的哨兵,其余值表示 push 等待的节点地址。
  2. 每个槽位 64 字节对齐:避免不同槽位落在同一 cache line 上导致伪共享。
  3. 自旋而非 sleep:elimination 的超时窗口极短(几十个 PAUSE 周期),用 sleep 太重。
  4. push 只写入节点地址而不写值:节点本身携带数据,pop 取走节点后直接读 val 字段。

九、基准测试

测试框架

以下是四种栈实现的对比基准测试:mutex stack、Treiber、Treiber+backoff、Elimination。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>
#include <string.h>

#define OPS_PER_THREAD  500000
#define MAX_THREADS     64

typedef struct {
    const char *name;
    void *ctx;
    void (*push_fn)(void *ctx, int val);
    int  (*pop_fn)(void *ctx, int *val);
} BenchTarget;

typedef struct {
    BenchTarget *target;
    int thread_id;
} WorkerArg;

static void *bench_worker(void *arg) {
    WorkerArg *wa = (WorkerArg *)arg;
    unsigned int seed = (unsigned int)wa->thread_id;
    for (int i = 0; i < OPS_PER_THREAD; i++) {
        if (rand_r(&seed) % 2 == 0)
            wa->target->push_fn(wa->target->ctx, i);
        else {
            int v;
            wa->target->pop_fn(wa->target->ctx, &v);
        }
    }
    return NULL;
}

static double run_bench(BenchTarget *target, int nthreads) {
    pthread_t threads[MAX_THREADS];
    WorkerArg args[MAX_THREADS];

    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);

    for (int i = 0; i < nthreads; i++) {
        args[i] = (WorkerArg){target, i};
        pthread_create(&threads[i], NULL, bench_worker, &args[i]);
    }
    for (int i = 0; i < nthreads; i++)
        pthread_join(threads[i], NULL);

    clock_gettime(CLOCK_MONOTONIC, &end);
    double elapsed = (end.tv_sec - start.tv_sec) +
                     (end.tv_nsec - start.tv_nsec) / 1e9;
    double mops = (double)(nthreads * OPS_PER_THREAD) / elapsed / 1e6;
    return mops;
}

void run_all_benchmarks(BenchTarget targets[], int ntargets) {
    int thread_counts[] = {1, 2, 4, 8, 16, 32, 64};
    int ncounts = sizeof(thread_counts) / sizeof(thread_counts[0]);

    printf("%-25s", "Threads");
    for (int t = 0; t < ntargets; t++)
        printf("  %-18s", targets[t].name);
    printf("\n");

    for (int c = 0; c < ncounts; c++) {
        int nt = thread_counts[c];
        printf("%-25d", nt);
        for (int t = 0; t < ntargets; t++) {
            double mops = run_bench(&targets[t], nt);
            printf("  %-18.1f", mops);
        }
        printf("\n");
    }
}

预期结果

在典型的 x86-64 服务器(Intel Xeon,2 NUMA 节点,共 32 核 / 64 线程)上:

线程数 Mutex (Mops/s) Treiber (Mops/s) Treiber+Backoff (Mops/s) Elimination (Mops/s)
1 30.2 45.1 44.8 42.3
2 18.5 35.0 34.5 33.1
4 12.3 22.4 24.1 28.5
8 8.1 12.0 15.8 38.2
16 5.2 6.3 9.1 55.0
32 3.1 3.2 5.4 68.3
64 2.0 1.8 3.2 72.5

关键观察:

  1. 1 线程:Treiber 最快(无竞争时 CAS 比 mutex 轻);Elimination 略有开销。
  2. 2-4 线程:Treiber 开始下降,backoff 版本略好;Elimination 开始显示优势。
  3. 8 线程:Elimination 反超所有方案,因为配对开始高效运作。
  4. 32-64 线程:Mutex 和朴素 Treiber 几乎一样慢;Backoff 有 1.5-2x 改善;Elimination 达到 20-40x 加速。
  5. 跨 NUMA(32+ 线程):朴素 Treiber 甚至比 mutex 慢——跨 NUMA 的 CAS 失败代价比 mutex 的调度开销更高。

push/pop 比例的影响

Elimination 的效果高度依赖于 push 和 pop 的比例。如果工作负载是 90% push + 10% pop,大部分 push 找不到配对的 pop,elimination 退化为普通退避。

push:pop 比例 Elimination 加速比(32 线程)
50:50 22x
70:30 8x
90:10 2.5x
100:0 0.95x(略慢)

十、工程陷阱表

序号 陷阱 症状 解法
1 忽略 ABA 问题 偶发的 use-after-free crash,难以复现 使用 tagged pointer 或安全内存回收
2 Pop 后直接 free 其他线程读已释放内存,SIGSEGV 或静默数据损坏 使用 Hazard Pointers 或 Epoch-Based Reclamation
3 CAS 失败时立即 spin 高竞争时 CPU 100%、吞吐量趋零,功耗飙升 加入指数退避或 elimination
4 memory_order_seq_cst everywhere ARM 上额外屏障指令,性能下降 20-40% 分析数据流,使用最弱的足够的内存序
5 Elimination slot 与栈 top 在同一 cache line 伪共享(false sharing),操作本应独立却互相干扰 对齐到不同 cache line(__attribute__((aligned(64)))
6 Elimination 超时设置不当 太短:配对率低,频繁退回主栈;太长:延迟增加 根据竞争程度自适应调整,或使用自旋轮次而非时间
7 Tagged pointer 版本号溢出 32-bit tag 约 40 亿次操作后回绕 使用 64-bit tag;或改用 Hazard Pointers
8 未对齐的 128-bit CAS CMPXCHG16B 要求 16 字节对齐,否则 SIGBUS 使用 __attribute__((aligned(16)))_Alignas(16)
9 malloc 在 CAS 循环内 每次 CAS 失败都分配新节点,内存泄漏 在循环外分配一次,循环内只更新 next 指针
10 NUMA 不感知的 elimination 数组 跨 NUMA 节点的 CAS 延迟是本地的 3-5x 按 NUMA 节点分区 elimination 数组
11 全局 rand() 用于随机选择槽位 glibc rand() 内部有锁,变成新的竞争点 使用线程局部的 rand_r 或 xorshift
12 忽略编译器重排 编译器把 n->val = val 重排到 CAS 之后 使用 atomic_thread_fence 或正确的内存序参数

十一、硬件层面

Cache Line Bouncing

无锁栈的 top 指针是一个 64 字节 cache line 中的 8 字节地址。当 N 个核心同时 CAS 这个指针时,该 cache line 必须在核心间传输。

MESI 协议下的状态转换:

Core 0: CMPXCHG &top     → cache line 变为 Modified
Core 1: CMPXCHG &top     → Core 0 的 line 变为 Invalid
                          → 数据通过互连总线传输到 Core 1
                          → Core 1 的 line 变为 Modified
Core 2: CMPXCHG &top     → Core 1 的 line 变为 Invalid
                          → 传输到 Core 2
...

每次 CAS(无论成功还是失败)都触发一次 cache line 传输。传输延迟:

场景 延迟
同一 L1 cache 命中 ~1 ns
同一 L2 cache ~5 ns
同一 NUMA 节点内核间 ~20-40 ns
跨 NUMA 节点 ~80-200 ns

在 32 核系统上,如果每个核心都在 CAS top,每次成功的 CAS 需要等待 cache line 从上一个修改者传输过来。有效吞吐量 ≈ 1 / (cache line 传输延迟),与线程数无关!

这就是为什么 Treiber 栈的吞吐量在高线程数下趋于常数——它受限于物理互连总线的带宽,而不是 CPU 的计算能力。

MESI 协议与 CAS 的交互

CAS 指令在 MESI 协议中的行为:

  1. CAS 开始前:核心发送 Read-For-Ownership (RFO) 请求,将 cache line 从 Shared/Invalid 变为 Exclusive。
  2. 比较阶段:读取 cache line 中的值,与 expected 比较。
  3. 交换阶段:如果匹配,写入 desired,cache line 变为 Modified。
  4. 如果不匹配:CAS 失败,cache line 仍然变为 Exclusive(因为 RFO 已经完成)。

注意:即使 CAS 失败,也会导致 cache line invalidation。这是 CAS 风暴的根本原因——失败的 CAS 和成功的 CAS 对总线流量的影响是一样的。

这就是指数退避有效的硬件原因:通过延迟重试,减少了无效的 cache line invalidation,让成功的 CAS 有更多时间独占 cache line。

False Sharing(伪共享)

伪共享是指两个逻辑上独立的变量恰好落在同一个 cache line 上,导致对一个变量的修改会 invalidate 另一个变量所在的 cache line。

在无锁栈中常见的伪共享场景:

/* 错误:top 和 elim[0] 可能在同一 cache line */
typedef struct {
    _Atomic(Node *) top;       // 8 字节
    _Atomic(Node *) elim[16];  // 紧接 top 之后
} BadStack;

/* 正确:padding 确保不同变量在不同 cache line */
typedef struct {
    _Atomic(Node *) top;
    char _pad[56];                          // 填充到 64 字节
    _Atomic(Node *) elim[16] __attribute__((aligned(64)));
} GoodStack;

Padding 策略

现代 x86 处理器的 cache line 大小为 64 字节。常用的 padding 方式:

/* 方法 1:手动 padding */
typedef struct {
    _Atomic(Node *) top;
    char _pad[64 - sizeof(_Atomic(Node *))];
} PaddedTop;

/* 方法 2:GCC aligned 属性 */
typedef struct {
    _Atomic(Node *) top __attribute__((aligned(64)));
} AlignedTop;

/* 方法 3:C11 _Alignas */
typedef struct {
    _Alignas(64) _Atomic(Node *) top;
} C11AlignedTop;

需要 padding 的位置:

  1. 栈顶 top 指针:独占一个 cache line。
  2. Elimination 数组的每个槽位:每个槽位独占一个 cache line(否则相邻槽位的操作会互相干扰)。
  3. 线程局部的 Hazard Pointer:每个线程的 HP 列表独占 cache line。

代价:内存膨胀。16 个 elimination 槽位原本需要 128 字节(8 字节 × 16),padding 后变成 1024 字节(64 字节 × 16)。在大多数场景下这是值得的——1KB 的内存换取数倍的吞吐量提升。

NUMA 的影响

在多 NUMA 节点系统上,跨节点的 cache line 传输延迟是节点内的 2-5 倍。这意味着:

  1. 如果 top 指针频繁在不同 NUMA 节点的核心间弹跳,延迟会更高。
  2. Elimination 数组应该按 NUMA 节点分区:同一节点的线程优先在本地分区的槽位中配对。

一个简单的 NUMA 感知策略:

int slot_index = (numa_node_id * SLOTS_PER_NODE) +
                 (fast_rand(&seed) % SLOTS_PER_NODE);

这样同一 NUMA 节点的线程大概率在本地 cache 中完成配对,避免跨节点流量。

十二、个人观点

Treiber 栈是并发编程的”原子”——最简单的无锁数据结构,但浓缩了所有核心问题。从它出发,可以清晰地看到并发数据结构的设计空间。

一、无锁不等于高性能。 Treiber 栈是无锁的,但在高竞争下比 mutex 好不了多少。根本原因是 LIFO 栈的语义要求串行化——所有操作都必须经过同一个 top 指针。无锁消除了锁的系统性问题(优先级反转、convoying),但不能消除数据结构本身的串行瓶颈。在选择无锁方案之前,先问自己:瓶颈是锁本身,还是数据结构的语义?

二、最好的优化是改变问题。 Elimination Stack 不是在”更快地做 CAS”——它改变了操作的路径:push 和 pop 直接交换数据,绕过了串行瓶颈。这是并发编程中最深刻的设计原则:当硬件限制了一个操作的速度时,不要试图让它更快,而是想办法让它不需要发生。

三、内存回收是真正的难题。 实现一个正确的 Treiber 栈不难。实现一个正确且不泄漏内存的 Treiber 栈才是真正的挑战。Hazard Pointers 和 Epoch-Based Reclamation 各有 trade-off:HP 对每次访问收税,EBR 对长临界区收税。没有完美方案,只有适合场景的方案。在我的经验中,如果线程的操作粒度小且一致,EBR 通常是更好的选择;如果线程可能被长时间抢占或有不可预测的暂停(如 GC),HP 更安全。

四、Rust 的所有权系统在这里大放异彩。 Rust 的 crossbeam 库提供了 epoch::pin() API,编译器强制你在 guard 的作用域内才能访问共享指针。这把运行时的 use-after-free 风险变成了编译时错误。C 的无锁代码需要严格的代码审查和大量的压力测试来发现内存安全问题;Rust 在编译阶段就能拦截大部分错误。如果你在写并发数据结构,Rust 是目前最好的语言选择。

五、理解硬件才能写好无锁代码。 Cache line bouncing、MESI 协议、NUMA 拓扑——这些不是可选知识,而是设计无锁数据结构的基础。一个看起来正确的算法可能因为伪共享而损失 80% 的性能。一个在单 NUMA 节点上表现良好的方案可能在跨 NUMA 时完全崩溃。无锁编程是”在硬件上编程”,你需要了解 cache line 的大小、互连总线的延迟、原子指令的开销。

六、实践建议。 对于大多数应用,先用 pthread_mutex_t(或更好的,lock-free queue 如 disruptor 模式)。只有当 profiling 明确显示锁竞争是瓶颈时,才考虑无锁方案。上无锁之前,先考虑分片(sharding)——把一个全局栈拆成 N 个线程局部栈,消除共享,比任何无锁算法都有效。无锁是最后的手段,不是第一选择。


系列导航: - 上一篇:无锁队列:Michael-Scott 与 ABA 问题 - 下一篇:并发跳表:ConcurrentSkipListMap 的设计

相关阅读: - Hazard Pointers:安全内存回收 - Epoch-Based Reclamation

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · algorithms

无锁队列:Michael-Scott 算法与 ABA 问题

当多线程争抢一把 mutex 保护的队列时,吞吐量随线程数增长反而下降——锁成了瓶颈。无锁队列用 CAS 循环取代互斥,换来真正的无阻塞并发,但也引入了 ABA 问题和内存回收的工程深渊。

2026-04-14 · algorithms

并发跳表:ConcurrentSkipListMap 的设计

Java 的 `java.util.concurrent` 提供了 ConcurrentHashMap,却没有 ConcurrentTreeMap——取而代之的是一个基于跳表的 ConcurrentSkipListMap。为什么 Doug Lea 选择了跳表而不是红黑树?因为平衡树的旋转操作会同时修改多个节点的指针,在并发场景下几乎不可能做到无锁;而跳表天然的分层链表结构使得每次修改只涉及局部指针,为 CAS 操作提供了完美的施展空间。

2026-04-15 · algorithms

RCU:Linux 内核的读侧零开销并发

Linux 内核如何在并发数据结构中实现读侧零开销?RCU 用一种违反直觉的方式回答了这个问题:让读者永远不等待,让写者承担一切代价。


By .