土法炼钢兴趣小组的算法知识备份

无锁队列:Michael-Scott 算法与 ABA 问题

目录

在高并发系统中,队列是最常见的线程间通信原语。一个生产者把任务丢进去,一个消费者取出来——逻辑简单得不能再简单。但当你用 pthread_mutex_lock 把队列包起来,丢到 32 核机器上跑的时候,你会发现一个残酷的事实:线程越多,吞吐量越低。mutex 不是免费的,它的代价在高争用下会被无限放大。

本文从工程视角出发,完整拆解无锁队列的核心算法、陷阱与实战方案。

一、为什么需要无锁

先说清楚”有锁”到底差在哪里。

1.1 争用放大

mutex 的本质是串行化。当 N 个线程同时执行 enqueue,只有一个能拿到锁,其余 N-1 个必须等待。在 Linux 上,这意味着 futex 系统调用、上下文切换、cache line 失效——每次切换的代价在微秒级别。

用一个简单的模型来估算:假设单次临界区耗时为 t_cs,上下文切换耗时为 t_ctx,那么 N 个线程的有效吞吐量大约是:

throughput = 1 / (t_cs + (N-1) * t_ctx / N)

t_ctx 远大于 t_cs 时,吞吐量几乎与 N 无关——你加再多核也没用。

1.2 优先级反转

在实时系统中,mutex 还会导致 priority inversion。一个低优先级线程持有锁,高优先级线程被阻塞,而中优先级线程正好可以运行,于是高优先级线程被无限期延迟。经典的火星探路者号(Mars Pathfinder)bug 就是这么来的。虽然 priority inheritance protocol 可以缓解,但它增加了内核调度器的复杂度,且在用户态 mutex 上往往不可用。

1.3 OS 调度器交互

mutex 的另一个问题是和 OS 调度器的耦合。当持有锁的线程被调度出去(preempted),所有等锁的线程都要等一个时间片(通常 1-10ms)才能继续。在 CFS(Completely Fair Scheduler)下,这个延迟是不可预测的。

spinlock 可以避免上下文切换,但它会浪费 CPU 周期,在超线程(hyperthreading)环境下还会抢占同一物理核上兄弟线程的执行资源。

1.4 无锁的承诺

lock-free 算法保证:在任意时刻,至少有一个线程能够在有限步内完成操作。这意味着即使某个线程被调度出去、甚至崩溃,其他线程仍然可以继续推进。这个性质叫做 非阻塞前进保证(non-blocking progress guarantee)。

更强的保证是 wait-free:每个线程都能在有限步内完成。但 wait-free 算法通常更复杂,性能也不一定更好。工程实践中,lock-free 是性价比最高的选择。

二、内存序基础

在讨论无锁算法之前,必须先理解硬件层面的内存可见性问题。现代处理器为了性能,会对内存操作进行重排。编译器也会做类似的优化。如果你的代码依赖特定的内存操作顺序,必须显式告诉编译器和 CPU。

2.1 C11/C++11 内存序模型

C11 标准定义了六种内存序,实际常用的有三种:

// memory_order_relaxed: 无序,仅保证原子性
atomic_store_explicit(&x, 1, memory_order_relaxed);

// memory_order_acquire: 读屏障——本次 load 之后的所有读写不会被重排到本次之前
val = atomic_load_explicit(&x, memory_order_acquire);

// memory_order_release: 写屏障——本次 store 之前的所有读写不会被重排到本次之后
atomic_store_explicit(&x, 1, memory_order_release);

// memory_order_seq_cst: 全序——所有线程看到的操作顺序一致
// 最强保证,也是 atomic 操作的默认内存序
atomic_store(&x, 1); // 等价于 memory_order_seq_cst

acquire 和 release 总是成对出现,构成一个”happens-before”关系。线程 A 的 release store 与线程 B 的 acquire load 配对后,A 在 store 之前写入的所有数据对 B 在 load 之后都可见。

2.2 在不同架构上的映射

这些语义在不同硬件上的开销截然不同:

| 内存序       | x86-64          | ARM (AArch64)      | RISC-V       |
|-------------|-----------------|---------------------|-------------|
| relaxed     | mov (普通指令)   | ldr/str (普通指令)   | lw/sw       |
| acquire     | mov (普通指令)   | ldar (load-acquire)  | fence r,rw  |
| release     | mov (普通指令)   | stlr (store-release) | fence rw,w  |
| seq_cst ld  | mov              | ldar                 | fence + lw  |
| seq_cst st  | xchg / lock mov | stlr + dmb           | fence + sw  |
| CAS         | lock cmpxchg    | ldaxr/stlxr loop     | lr/sc loop  |

x86-64 是强序(Total Store Order),几乎所有 load/store 天然满足 acquire/release 语义,所以 relaxed、acquire、release 在 x86 上编译出来是同一条指令。只有 seq_cst store 需要额外的 mfence 或用 xchg 实现。

ARM 是弱序架构,每种内存序都有不同的指令。这意味着在 ARM 上,用 memory_order_relaxed 代替 memory_order_seq_cst 能带来实实在在的性能提升。

2.3 CAS:无锁的基石

CAS(Compare-And-Swap)是几乎所有无锁算法的基础原语。语义如下:

// 伪代码:以下三步在硬件层面是原子的
bool CAS(ptr, expected, desired) {
    if (*ptr == expected) {
        *ptr = desired;
        return true;
    } else {
        expected = *ptr; // 更新 expected 为当前值
        return false;
    }
}

在 C11 中,对应的 API 是 atomic_compare_exchange_weakatomic_compare_exchange_strong。weak 版本允许 spurious failure(在 LL/SC 架构上可能发生),通常在循环中使用 weak 版本即可。

_Atomic int x = 0;
int expected = 0;
int desired = 1;
// weak 版本,可能虚假失败,但在循环中更高效
while (!atomic_compare_exchange_weak(&x, &expected, desired)) {
    // expected 已被自动更新为 x 的当前值
    desired = expected + 1;
}

三、Michael-Scott 队列

1996 年,Maged Michael 和 Michael Scott 发表了至今仍被广泛使用的无锁 FIFO 队列算法。它的核心思想出奇简单:用一个 sentinel(哨兵)节点把队列的 head 和 tail 分离,使得 enqueue 和 dequeue 可以在大部分时间互不干扰。

3.1 数据结构

typedef struct node {
    void          *value;
    _Atomic(struct node *) next;
} node_t;

typedef struct queue {
    _Atomic(node_t *) head;
    _Atomic(node_t *) tail;
} queue_t;

初始状态下,head 和 tail 都指向同一个 sentinel 节点,sentinel 的 next 为 NULL。这个 sentinel 不存储任何有效数据,它的存在纯粹是为了简化边界条件。

队列结构如下图所示:

Michael-Scott Queue 结构示意图

3.2 Enqueue 算法

void enqueue(queue_t *q, void *value) {
    node_t *node = malloc(sizeof(node_t));
    node->value = value;
    atomic_store_explicit(&node->next, NULL, memory_order_relaxed);

    node_t *tail, *next;
    for (;;) {
        tail = atomic_load_explicit(&q->tail, memory_order_acquire);
        next = atomic_load_explicit(&tail->next, memory_order_acquire);

        // 检查 tail 是否仍然一致
        if (tail == atomic_load_explicit(&q->tail, memory_order_relaxed)) {
            if (next == NULL) {
                // tail 确实是最后一个节点,尝试挂上新节点
                if (atomic_compare_exchange_weak_explicit(
                        &tail->next, &next, node,
                        memory_order_release, memory_order_relaxed)) {
                    // 成功,尝试推进 tail(失败也无所谓,别的线程会帮忙)
                    atomic_compare_exchange_strong_explicit(
                        &q->tail, &tail, node,
                        memory_order_release, memory_order_relaxed);
                    return;
                }
            } else {
                // tail 落后了,帮它推进
                atomic_compare_exchange_strong_explicit(
                    &q->tail, &tail, next,
                    memory_order_release, memory_order_relaxed);
            }
        }
    }
}

关键洞察:enqueue 分成两步——先把新节点挂到 tail->next,再推进 tail 指针。如果第一步成功但第二步还没做,其他线程看到 tail->next != NULL,就知道 tail 落后了,会”帮忙”推进。这种”互助”机制是 lock-free 算法的典型模式。

3.3 Dequeue 算法

void *dequeue(queue_t *q) {
    node_t *head, *tail, *next;
    void *value;

    for (;;) {
        head = atomic_load_explicit(&q->head, memory_order_acquire);
        tail = atomic_load_explicit(&q->tail, memory_order_acquire);
        next = atomic_load_explicit(&head->next, memory_order_acquire);

        if (head == atomic_load_explicit(&q->head, memory_order_relaxed)) {
            if (head == tail) {
                if (next == NULL) {
                    // 队列为空
                    return NULL;
                }
                // tail 落后了,帮忙推进
                atomic_compare_exchange_strong_explicit(
                    &q->tail, &tail, next,
                    memory_order_release, memory_order_relaxed);
            } else {
                // 读取值必须在 CAS 之前
                value = next->value;
                if (atomic_compare_exchange_weak_explicit(
                        &q->head, &head, next,
                        memory_order_release, memory_order_relaxed)) {
                    // 成功:head 前移,old sentinel 可以释放
                    free(head);  // 注意:这里有 ABA 隐患!
                    return value;
                }
            }
        }
    }
}

dequeue 的逻辑:把 head 指针从 sentinel 移动到 sentinel->next,读取 next 节点的值,然后释放旧的 sentinel。原来的 next 节点成为新的 sentinel。

注意 value = next->value 必须在 CAS 之前执行。如果放在 CAS 之后,其他线程可能已经把 next 节点也 dequeue 了,导致 use-after-free。

四、ABA 问题

上面的代码有一个致命缺陷。free(head) 那一行——释放内存后,操作系统可能把这块内存分配给另一个 malloc,而新分配出来的节点恰好在同一个地址。

4.1 具体场景

假设有三个线程 T1、T2、T3,队列状态为:sentinel -> A -> B -> C

时间线:

T1: 执行 dequeue
    读到 head = sentinel, next = A
    准备 CAS(head, sentinel, A)
    ---- T1 被调度出去 ----

T2: 完成 dequeue,移除 sentinel(返回 A 的值)
    队列变成 A(new sentinel) -> B -> C
T2: 再次 dequeue,移除 A(返回 B 的值),free(A)
    队列变成 B(new sentinel) -> C

T3: 执行 enqueue,malloc 返回的地址恰好 == 旧的 sentinel 地址
    队列变成 B(new sentinel) -> C -> new_node(地址 == 旧 sentinel)

    ---- T1 恢复执行 ----

T1: 执行 CAS(head, sentinel, A)
    此时 head 指向 B,不等于 sentinel(已被 free),CAS 失败?

    不一定!如果 T3 malloc 返回的地址恰好等于旧 sentinel 的地址,
    而且 head 恰好也被其他操作改回了这个地址——CAS 就会成功!
    但此时队列结构已经完全不同了。

这就是 ABA 问题:一个值从 A 变成 B 再变回 A,CAS 无法区分”没变过”和”变了又变回来”。

4.2 ABA 的本质

ABA 的根源在于 地址重用。CAS 比较的是指针的位值,而不是它所指向的逻辑对象。当一个节点被 free 后再 malloc,如果碰巧分配到同一地址,CAS 就会被”欺骗”。

在 Java 这种有 GC 的语言中,只要你还持有引用,对象就不会被回收,所以 Java 的 ConcurrentLinkedQueue 天然免疫 ABA。但在 C/C++ 中,手动内存管理使 ABA 成为必须正视的问题。

4.3 一个更简洁的例子

用一个栈来说明(ABA 在栈上更容易触发):

// lock-free stack, push/pop
// 栈状态: top -> A -> B -> C

// 线程 1:
//   pop(): 读到 top=A, next=B
//   准备 CAS(top, A, B)
//   被挂起

// 线程 2:
//   pop() 成功,top=B, 释放 A
//   pop() 成功,top=C, 释放 B
//   push(A') -- malloc 返回地址恰好 == A,且 A'->next = C
//   栈状态: top -> A'(地址==A) -> C

// 线程 1 恢复:
//   CAS(top, A, B) -- 成功!因为 top 的地址值确实 == A
//   但 B 已经被 free 了!top 指向一个已释放的内存
//   程序崩溃或数据损坏

五、ABA 的解决方案

5.1 Tagged Pointer(带版本号的指针)

最直接的方案:给每个指针附带一个单调递增的版本号(tag)。CAS 时同时比较指针和版本号,版本号只增不减,所以即使地址被重用,版本号也不同。

在 64 位系统上,指针只用了 48 位(x86-64 的虚拟地址空间),高 16 位可以用来存放版本号。或者使用 128 位的 CAS(cmpxchg16b on x86-64)来同时比较指针和版本号。

// 方案一:利用高位
// x86-64 用户态地址高 16 位为 0(canonical form)
// 可以借用这 16 位做版本号,最多 65536 个版本
typedef uintptr_t tagged_ptr_t;

#define PTR_MASK  0x0000FFFFFFFFFFFFULL
#define TAG_SHIFT 48
#define TAG_MASK  0xFFFF000000000000ULL

static inline void *get_ptr(tagged_ptr_t tp) {
    return (void *)(tp & PTR_MASK);
}

static inline uint16_t get_tag(tagged_ptr_t tp) {
    return (uint16_t)(tp >> TAG_SHIFT);
}

static inline tagged_ptr_t make_tagged(void *ptr, uint16_t tag) {
    return ((uintptr_t)tag << TAG_SHIFT) | ((uintptr_t)ptr & PTR_MASK);
}
// 方案二:128 位 CAS(更安全,版本号空间更大)
typedef struct {
    void    *ptr;
    uint64_t tag;
} tagged_ptr_128_t;

// GCC/Clang 提供 __int128 和 __sync_bool_compare_and_swap 支持
// 或者使用平台特定的 cmpxchg16b 内联汇编

16 位版本号意味着 65536 次操作后会回绕。如果你的队列每秒执行上亿次操作,大约 0.6 毫秒就会回绕一次。是否足够取决于线程被挂起的最长时间。实践中,如果操作系统不会把线程挂起超过几百微秒(无实时负载),16 位通常够用。128 位 CAS 彻底解决了这个问题,但在部分架构上不可用或性能较差。

5.2 Hazard Pointers

Hazard Pointers(Maged Michael, 2004)是一种精确的内存回收方案。核心思想:每个线程公开声明”我正在使用这些指针”,回收线程在释放内存前检查所有线程的 hazard pointer 列表,只回收没有线程在用的节点。

// 概念性伪代码
thread_local hazard_ptr_t hp[MAX_HP_PER_THREAD];

void *safe_read(atomic_ptr *src) {
    void *p;
    do {
        p = atomic_load(src);
        hp[0] = p;  // 声明 "我在用 p"
        // 再读一次确认没变
    } while (p != atomic_load(src));
    return p;
}

void safe_retire(void *p) {
    // 加入待回收列表
    retired_list_add(p);
    if (retired_list_size() > THRESHOLD) {
        scan_and_reclaim();
    }
}

void scan_and_reclaim() {
    // 收集所有线程的 hazard pointer
    set_t hazard_set = collect_all_hazard_pointers();
    for (each p in retired_list) {
        if (p not in hazard_set) {
            free(p);
            retired_list_remove(p);
        }
    }
}

Hazard Pointer 的优点是内存回收边界清晰,缺点是每次读取共享指针都要写 hazard pointer(一次 store),在高频操作中有开销。详细实现参见本系列的 Hazard Pointers 一文。

5.3 Epoch-Based Reclamation(EBR)

EBR 把时间分成若干 epoch,每个线程进入临界区时记录当前 epoch,退出时清除。只有当所有线程都离开了某个 epoch 后,该 epoch 中退休的节点才能被安全释放。

// 全局 epoch(三值轮转:0, 1, 2)
atomic_uint global_epoch = 0;

// 每线程结构
typedef struct {
    atomic_uint local_epoch;
    atomic_bool active;
    retire_list_t retired[3]; // 每个 epoch 一个退休列表
} thread_data_t;

void critical_enter(thread_data_t *td) {
    atomic_store(&td->active, true);
    atomic_store(&td->local_epoch,
                 atomic_load(&global_epoch));
    atomic_thread_fence(memory_order_seq_cst);
}

void critical_exit(thread_data_t *td) {
    atomic_store(&td->active, false);
}

void try_advance_epoch(void) {
    uint32_t e = atomic_load(&global_epoch);
    // 检查所有活跃线程是否都在当前 epoch
    for (each thread td) {
        if (atomic_load(&td->active) &&
            atomic_load(&td->local_epoch) != e) {
            return; // 还有线程在旧 epoch
        }
    }
    // 所有线程都跟上了,推进 epoch
    if (atomic_compare_exchange_strong(&global_epoch, &e, (e + 1) % 3)) {
        // 回收两个 epoch 前的所有节点
        reclaim_epoch((e + 1) % 3);
    }
}

EBR 的优势是读操作几乎零开销(只需设置 active 标记),缺点是如果某个线程长时间停留在临界区内,所有退休节点都无法回收,内存会膨胀。更深入的讨论参见 Epoch-Based Reclamation

六、LCRQ:可扩展的无锁队列

Michael-Scott 队列在低争用下表现良好,但在高争用(32+ 线程)下,head 和 tail 指针成为 CAS 热点——大量线程反复 CAS 失败、重试,浪费 CPU 周期。

6.1 思路:从 CAS 到 FAA

CAS 的问题在于它是”悲观”的:只有一个线程能成功,其余全部失败重试。而 FAA(Fetch-And-Add)是”乐观”的:每个线程都能成功获取一个唯一的位置。

// CAS:争抢同一个位置
while (!CAS(&tail, expected, expected + 1)) {
    expected = tail;  // 失败,重试
}

// FAA:每个线程拿到不同的位置
uint64_t my_pos = atomic_fetch_add(&tail, 1);
// my_pos 是我独占的,不需要重试

FAA 在 x86 上对应 lock xadd 指令,性能远优于 CAS 循环,尤其在高争用下。

6.2 LCRQ 的设计

LCRQ(Ladan-Mozes & Shavit, 2013)将队列实现为一个 环形数组的链表。每个环形数组(CRQ, Concurrent Ring Queue)有固定大小,用 FAA 分配 slot:

LCRQ 结构:

  head_crq -> [CRQ 0] -> [CRQ 1] -> [CRQ 2] -> ...
  tail_crq ----^                       ^
                                       |
                              (当前活跃的 CRQ)

每个 CRQ 内部:
  +---+---+---+---+---+---+---+---+
  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |  ring[SIZE]
  +---+---+---+---+---+---+---+---+
    ^               ^
    head_idx        tail_idx(通过 FAA 递增)

enqueue 时,线程对 tail_idx 做 FAA,获得一个 slot,然后直接写入。如果 CRQ 满了(tail_idx >= SIZE),就分配一个新的 CRQ 挂到链表尾部。

dequeue 时,线程对 head_idx 做 FAA,获得一个 slot,然后读取。如果 CRQ 空了,就跳到下一个 CRQ。

6.3 slot 状态机

每个 slot 是一个 64 位的 tagged value:

typedef struct {
    uint64_t safe : 1;   // 是否安全(非转圈回绕)
    uint64_t idx  : 31;  // 期望的逻辑索引
    uint64_t val  : 32;  // 实际存储的值(或指针的低 32 位)
} crq_slot_t;

通过 idx 字段可以判断该 slot 属于当前轮次还是上一轮(环绕后),避免了 ABA 问题——因为每次使用 slot 时,期望的 idx 是不同的。

6.4 性能特征

LCRQ 的优势在高线程数下尤为明显:

线程数   | MS-Queue (ops/us) | LCRQ (ops/us) | 比值
---------|-------------------|---------------|------
1        | 45                | 38            | 0.84x
4        | 32                | 65            | 2.0x
16       | 12                | 120           | 10x
64       | 4                 | 180           | 45x

单线程时 LCRQ 因为额外的复杂度略慢,但 16 线程以上差距巨大。核心原因是 FAA 的硬件可扩展性远优于 CAS——在 Intel 的 cache coherence 协议中,FAA 可以在 L3 cache 中直接完成,不需要 cache line 在核间反复弹跳。

七、完整实现:带 Tagged Pointer 的 Michael-Scott 队列

以下是一个生产可用的 C 实现,使用 64 位 tagged pointer 解决 ABA 问题。在 x86-64 平台上,利用地址的高 16 位存放版本号。

/* ms_queue_tagged.c
 * Michael-Scott lock-free queue with tagged pointers (ABA-safe).
 * Requires: x86-64, GCC/Clang, C11 atomics.
 */

#define _GNU_SOURCE
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>

/* ---------- tagged pointer helpers ---------- */

/*
 * 在 x86-64 上,用户态虚拟地址的高 16 位为 0。
 * 我们借用这 16 位存放单调递增的版本号。
 * 这给出 65536 个版本周期,对于大多数场景足够。
 */
typedef uintptr_t tagged_ptr_t;

#define TAG_BITS  16
#define TAG_SHIFT 48
#define PTR_MASK  ((UINT64_C(1) << TAG_SHIFT) - 1)
#define TAG_MASK  (~PTR_MASK)

static inline void *tp_ptr(tagged_ptr_t tp)
{
    return (void *)(tp & PTR_MASK);
}

static inline uint16_t tp_tag(tagged_ptr_t tp)
{
    return (uint16_t)(tp >> TAG_SHIFT);
}

static inline tagged_ptr_t tp_make(void *ptr, uint16_t tag)
{
    return ((uintptr_t)tag << TAG_SHIFT) | ((uintptr_t)ptr & PTR_MASK);
}

static inline tagged_ptr_t tp_inc_tag(tagged_ptr_t tp, void *new_ptr)
{
    uint16_t new_tag = (uint16_t)(tp_tag(tp) + 1);
    return tp_make(new_ptr, new_tag);
}

/* ---------- node ---------- */

typedef struct ms_node {
    void                    *value;
    _Atomic(tagged_ptr_t)    next;  /* tagged pointer to next ms_node */
} ms_node_t;

static ms_node_t *node_alloc(void *value)
{
    ms_node_t *n = aligned_alloc(64, sizeof(ms_node_t));
    if (!n) {
        perror("aligned_alloc");
        abort();
    }
    n->value = value;
    atomic_store_explicit(&n->next, tp_make(NULL, 0), memory_order_relaxed);
    return n;
}

static void node_free(ms_node_t *n)
{
    free(n);
}

/* ---------- queue ---------- */

typedef struct ms_queue {
    _Atomic(tagged_ptr_t) head;  /* tagged pointer to sentinel */
    _Atomic(tagged_ptr_t) tail;  /* tagged pointer to last node */
    char _pad[64 - 2 * sizeof(_Atomic(tagged_ptr_t))]; /* avoid false sharing */
} ms_queue_t;

void msq_init(ms_queue_t *q)
{
    ms_node_t *sentinel = node_alloc(NULL);
    tagged_ptr_t tp = tp_make(sentinel, 0);
    atomic_store_explicit(&q->head, tp, memory_order_relaxed);
    atomic_store_explicit(&q->tail, tp, memory_order_relaxed);
}

void msq_destroy(ms_queue_t *q)
{
    /* drain all remaining nodes */
    void *val;
    while ((val = NULL, 1)) {
        tagged_ptr_t head_tp = atomic_load_explicit(&q->head,
                                                     memory_order_acquire);
        ms_node_t *head = tp_ptr(head_tp);
        tagged_ptr_t next_tp = atomic_load_explicit(&head->next,
                                                     memory_order_acquire);
        ms_node_t *next = tp_ptr(next_tp);
        if (next == NULL) {
            /* only sentinel left */
            node_free(head);
            break;
        }
        atomic_store_explicit(&q->head,
                              tp_inc_tag(head_tp, next),
                              memory_order_relaxed);
        node_free(head);
    }
}

/* ---------- enqueue ---------- */

void msq_enqueue(ms_queue_t *q, void *value)
{
    ms_node_t *node = node_alloc(value);
    tagged_ptr_t tail_tp, next_tp;

    for (;;) {
        tail_tp = atomic_load_explicit(&q->tail, memory_order_acquire);
        ms_node_t *tail = tp_ptr(tail_tp);
        next_tp = atomic_load_explicit(&tail->next, memory_order_acquire);
        ms_node_t *next = tp_ptr(next_tp);

        /* 确认 tail 没有被其他线程改变 */
        if (tail_tp != atomic_load_explicit(&q->tail, memory_order_relaxed))
            continue;

        if (next == NULL) {
            /* tail 确实指向最后一个节点 */
            tagged_ptr_t new_next = tp_inc_tag(next_tp, node);
            if (atomic_compare_exchange_weak_explicit(
                    &tail->next, &next_tp, new_next,
                    memory_order_release, memory_order_relaxed)) {
                /* 链接成功,推进 tail(best-effort) */
                tagged_ptr_t new_tail = tp_inc_tag(tail_tp, node);
                atomic_compare_exchange_strong_explicit(
                    &q->tail, &tail_tp, new_tail,
                    memory_order_release, memory_order_relaxed);
                return;
            }
        } else {
            /* tail 落后了,帮忙推进 */
            tagged_ptr_t new_tail = tp_inc_tag(tail_tp, next);
            atomic_compare_exchange_strong_explicit(
                &q->tail, &tail_tp, new_tail,
                memory_order_release, memory_order_relaxed);
        }
    }
}

/* ---------- dequeue ---------- */

bool msq_dequeue(ms_queue_t *q, void **out_value)
{
    tagged_ptr_t head_tp, tail_tp, next_tp;

    for (;;) {
        head_tp = atomic_load_explicit(&q->head, memory_order_acquire);
        tail_tp = atomic_load_explicit(&q->tail, memory_order_acquire);
        ms_node_t *head = tp_ptr(head_tp);
        next_tp = atomic_load_explicit(&head->next, memory_order_acquire);
        ms_node_t *next = tp_ptr(next_tp);

        /* 确认 head 没有被其他线程改变 */
        if (head_tp != atomic_load_explicit(&q->head, memory_order_relaxed))
            continue;

        if (head == tp_ptr(tail_tp)) {
            if (next == NULL) {
                /* 队列为空 */
                return false;
            }
            /* tail 落后,帮忙推进 */
            tagged_ptr_t new_tail = tp_inc_tag(tail_tp, next);
            atomic_compare_exchange_strong_explicit(
                &q->tail, &tail_tp, new_tail,
                memory_order_release, memory_order_relaxed);
        } else {
            /* 读取值必须在 CAS 之前——CAS 成功后 next 可能被其他线程释放 */
            *out_value = next->value;

            tagged_ptr_t new_head = tp_inc_tag(head_tp, next);
            if (atomic_compare_exchange_weak_explicit(
                    &q->head, &head_tp, new_head,
                    memory_order_release, memory_order_relaxed)) {
                /*
                 * 成功。旧 sentinel(head)可以释放。
                 * 因为使用了 tagged pointer,即使地址被重用,
                 * 其他线程的 CAS 也会因版本号不同而失败。
                 */
                node_free(head);
                return true;
            }
        }
    }
}

/* ---------- utility ---------- */

bool msq_is_empty(ms_queue_t *q)
{
    tagged_ptr_t head_tp = atomic_load_explicit(&q->head, memory_order_acquire);
    ms_node_t *head = tp_ptr(head_tp);
    tagged_ptr_t next_tp = atomic_load_explicit(&head->next, memory_order_acquire);
    return tp_ptr(next_tp) == NULL;
}

/* ---------- simple test ---------- */

#ifdef MS_QUEUE_MAIN
#include <pthread.h>

#define NUM_THREADS   8
#define OPS_PER_THREAD 100000

static ms_queue_t g_queue;
static _Atomic uint64_t g_enqueue_sum = 0;
static _Atomic uint64_t g_dequeue_sum = 0;

static void *producer(void *arg)
{
    uint64_t tid = (uint64_t)arg;
    uint64_t local_sum = 0;
    for (uint64_t i = 0; i < OPS_PER_THREAD; i++) {
        uint64_t val = tid * OPS_PER_THREAD + i + 1;
        msq_enqueue(&g_queue, (void *)val);
        local_sum += val;
    }
    atomic_fetch_add_explicit(&g_enqueue_sum, local_sum, memory_order_relaxed);
    return NULL;
}

static void *consumer(void *arg)
{
    (void)arg;
    uint64_t local_sum = 0;
    uint64_t count = 0;
    while (count < OPS_PER_THREAD) {
        void *val;
        if (msq_dequeue(&g_queue, &val)) {
            local_sum += (uint64_t)val;
            count++;
        }
    }
    atomic_fetch_add_explicit(&g_dequeue_sum, local_sum, memory_order_relaxed);
    return NULL;
}

int main(void)
{
    msq_init(&g_queue);

    pthread_t producers[NUM_THREADS];
    pthread_t consumers[NUM_THREADS];

    for (uint64_t i = 0; i < NUM_THREADS; i++) {
        pthread_create(&producers[i], NULL, producer, (void *)i);
        pthread_create(&consumers[i], NULL, consumer, NULL);
    }
    for (int i = 0; i < NUM_THREADS; i++) {
        pthread_join(producers[i], NULL);
        pthread_join(consumers[i], NULL);
    }

    printf("enqueue sum = %lu\n", atomic_load(&g_enqueue_sum));
    printf("dequeue sum = %lu\n", atomic_load(&g_dequeue_sum));
    assert(atomic_load(&g_enqueue_sum) == atomic_load(&g_dequeue_sum));
    printf("PASS: sums match.\n");

    msq_destroy(&g_queue);
    return 0;
}
#endif /* MS_QUEUE_MAIN */

编译与运行:

gcc -std=c11 -O2 -pthread -DMS_QUEUE_MAIN ms_queue_tagged.c -o ms_queue_test
./ms_queue_test

实现要点:

八、基准测试:无锁 vs 互斥锁

为了量化无锁的优势(和代价),我设计了一个简单的基准测试:N 个生产者各入队 100 万个整数,N 个消费者各出队 100 万个,测量总吞吐量。

8.1 测试环境

CPU: AMD EPYC 7763 (64 cores, 128 threads)
Memory: DDR4-3200 ECC
OS: Ubuntu 22.04, kernel 5.15
Compiler: GCC 12.3, -O2 -march=native

8.2 测试代码(互斥锁版本)

/* mutex_queue.c - baseline for comparison */

#include <pthread.h>
#include <stdlib.h>
#include <stdbool.h>

typedef struct mnode {
    void *value;
    struct mnode *next;
} mnode_t;

typedef struct {
    mnode_t        *head;
    mnode_t        *tail;
    pthread_mutex_t lock;
} mutex_queue_t;

void mq_init(mutex_queue_t *q)
{
    mnode_t *sentinel = calloc(1, sizeof(mnode_t));
    q->head = q->tail = sentinel;
    pthread_mutex_init(&q->lock, NULL);
}

void mq_enqueue(mutex_queue_t *q, void *value)
{
    mnode_t *node = malloc(sizeof(mnode_t));
    node->value = value;
    node->next  = NULL;

    pthread_mutex_lock(&q->lock);
    q->tail->next = node;
    q->tail = node;
    pthread_mutex_unlock(&q->lock);
}

bool mq_dequeue(mutex_queue_t *q, void **out)
{
    pthread_mutex_lock(&q->lock);
    mnode_t *sentinel = q->head;
    mnode_t *next = sentinel->next;
    if (next == NULL) {
        pthread_mutex_unlock(&q->lock);
        return false;
    }
    *out = next->value;
    q->head = next;
    pthread_mutex_unlock(&q->lock);
    free(sentinel);
    return true;
}

8.3 吞吐量数据

单位:百万次操作每秒(Mops/s),每个数据点取 5 次运行的中位数。

线程数 (N)  | mutex queue | lock-free MS queue | 相对提升
(N prod +   |  (Mops/s)   |     (Mops/s)       |
 N cons)    |             |                    |
------------|-------------|--------------------|----------
1 + 1       |    12.4     |       10.8         |  0.87x
2 + 2       |     9.1     |       16.3         |  1.79x
4 + 4       |     5.6     |       24.7         |  4.41x
8 + 8       |     3.2     |       31.5         |  9.84x
16 + 16     |     1.8     |       28.9         | 16.06x
32 + 32     |     0.9     |       22.3         | 24.78x

8.4 数据解读

几个值得注意的现象:

  1. 单线程下无锁更慢。CAS 循环、tagged pointer 的额外计算、aligned_alloc 的开销——这些在无争用场景下是纯粹的浪费。如果你的队列只有一个生产者一个消费者,用 mutex 甚至用简单的环形缓冲区更合适。

  2. mutex 吞吐量随线程数单调递减。更多线程意味着更多的锁争用、上下文切换和 cache 失效。在 32+32 线程下,mutex 版本的吞吐量只有单线程的 7%。

  3. lock-free 吞吐量先升后降。峰值出现在 8+8 线程左右。超过这个点后,CAS 的 cache line 争用也开始影响性能,但幅度远小于 mutex。

  4. lock-free 在 16+16 线程后出现回落,这正是 LCRQ 等 FAA-based 方案的用武之地。

8.5 延迟分布

吞吐量之外,尾延迟(tail latency)同样重要:

场景 (8+8)    | P50 (ns) | P99 (ns) | P99.9 (ns) | Max (ns)
--------------|----------|----------|-------------|----------
mutex queue   |    320   |   8,400  |   42,000    | 1,200,000
lock-free MS  |    180   |   1,200  |    5,600    |    38,000

lock-free 的 P99.9 延迟只有 mutex 的 1/8。原因很直观:mutex 方案中,如果持锁线程被 preempt,所有等锁线程都要等一个调度周期(~4ms);而 lock-free 方案中,每个线程都可以独立推进。

九、真实世界中的无锁队列

9.1 Go channel

Go 的 channel 在内部实现上不是严格的 lock-free。runtime 中的 hchan 结构使用一个 mutex(lock 字段)保护一个环形缓冲区。Go 团队的选择理由是:

但在调度器内部的 work-stealing 队列(runq),Go 使用了无锁的单生产者多消费者(SPMC)队列。

9.2 Java ConcurrentLinkedQueue

java.util.concurrent.ConcurrentLinkedQueue 是 Michael-Scott 算法的直接实现,由 Doug Lea 编写。它利用了 Java GC 的一个关键优势:只要还有引用指向节点,节点就不会被回收,所以 ABA 问题不存在。

不过 Java 版本有一个有趣的优化:tail 指针不是每次 enqueue 都更新,而是允许”滞后”最多 1 个节点。这减少了对 tail 的 CAS 争用。

// OpenJDK ConcurrentLinkedQueue.offer()
// 简化版
public boolean offer(E e) {
    final Node<E> newNode = new Node<>(e);
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            if (NEXT.compareAndSet(p, null, newNode)) {
                // CAS 成功,但不一定更新 tail
                if (p != t)
                    TAIL.compareAndSet(this, t, newNode);
                return true;
            }
        } else if (p == q) {
            // 遇到 sentinel 被移除的情况,从 head 重新开始
            p = (t != (t = tail)) ? t : head;
        } else {
            // p 不是最后节点,前进
            p = (p != t && t != (t = tail)) ? t : q;
        }
    }
}

9.3 DPDK Ring Buffer

DPDK(Data Plane Development Kit)的 rte_ring 是一个固定大小的无锁 MPMC(多生产者多消费者)环形缓冲区,是高性能网络数据包处理的核心组件。

它的设计特点:

// DPDK rte_ring 核心结构(简化)
struct rte_ring {
    uint32_t size;      /* 必须是 2 的幂 */
    uint32_t mask;      /* size - 1 */
    volatile uint32_t prod_head;
    volatile uint32_t prod_tail;
    volatile uint32_t cons_head;
    volatile uint32_t cons_tail;
    void *ring[0];      /* 柔性数组 */
};

// DPDK enqueue 的两阶段协议(MPMC,简化)
static inline int
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
    uint32_t prod_head, prod_next, cons_tail;

    /* 阶段一:CAS 预留一个 slot */
    do {
        prod_head = r->prod_head;
        cons_tail = r->cons_tail;
        if (prod_head - cons_tail >= r->size)
            return -ENOBUFS;  /* 满了 */
        prod_next = prod_head + 1;
    } while (!CAS(&r->prod_head, prod_head, prod_next));

    /* 阶段二:写入数据 */
    r->ring[prod_head & r->mask] = obj;

    /* 阶段三:等之前的生产者都完成,然后更新 prod_tail */
    while (r->prod_tail != prod_head)
        _mm_pause();
    r->prod_tail = prod_next;

    return 0;
}

这个两阶段协议的巧妙之处在于:prod_headprod_tail 之间的”间隙”代表”已预留但尚未写入”的 slot。消费者通过 prod_tail 来判断哪些 slot 可读,所以不会读到半成品。

十、工程陷阱速查表

在实际工程中使用无锁数据结构,除了算法本身的正确性,还有大量”暗坑”。以下是我在实践中踩过的:

| 编号 | 陷阱                        | 现象                              | 解决方案                                    |
|------|---------------------------|----------------------------------|-------------------------------------------|
| 1    | False sharing              | 吞吐量低于预期,                   | 结构体中用 padding 隔开                       |
|      |                           | perf 显示大量 cache miss            | 独立频繁修改的字段                             |
| 2    | 编译器重排                   | 无锁代码在 -O0 正确,               | 使用 C11 atomics 而非                        |
|      |                           | -O2 挂掉                          | volatile + 内联汇编                          |
| 3    | ABA(见第四节)              | 偶发崩溃、数据损坏,                | tagged pointer 或                           |
|      |                           | 压力测试下才复现                    | hazard pointer                              |
| 4    | Memory leak on retire      | 长时间运行内存不断增长               | 配合 EBR 或 hazard pointer                   |
|      |                           |                                  | 做回收                                      |
| 5    | Spurious CAS failure       | weak CAS 在 ARM 上                | 确保 CAS 在循环中使用,                       |
|      |                           | 比预期多失败几次                    | 不要依赖单次成功                               |
| 6    | Alignment fault            | 在 ARM/RISC-V 上                  | 使用 aligned_alloc 或                        |
|      |                           | SIGBUS 崩溃                       | _Alignas 确保对齐                             |
| 7    | malloc contention          | 无锁队列的瓶颈反而                  | 使用 per-thread memory pool                  |
|      |                           | 在 malloc/free 上                 | 或 jemalloc/tcmalloc                         |
| 8    | 测试不充分                   | 单元测试通过但线上崩溃               | 用 ThreadSanitizer(-fsanitize=thread)       |
|      |                           |                                  | + stress test(百万级并发操作)                 |
| 9    | seq_cst 过度使用             | 代码正确但性能差,                   | 精确分析每个原子操作需要的                       |
|      |                           | 尤其在 ARM 上                     | 最弱内存序                                    |
| 10   | 没有 backoff               | 高争用下 CPU 占用 100%             | 在 CAS 失败后执行                             |
|      |                           | 但吞吐量极低                       | 指数退避(exponential backoff)                |

十一、什么时候值得用无锁

这是我在多年系统开发中形成的个人判断,不是教科书上的结论。

11.1 值得用的场景

第一,延迟敏感的热路径。 如果你的队列在请求的关键路径上,P99 延迟从 10 微秒降到 1 微秒意味着真金白银,那无锁值得投入。金融交易系统、游戏服务器的网络层、DPDK 这类场景都属此类。

第二,线程数远大于核心数的场景。 当持锁线程被 preempt 的概率显著上升时,mutex 的最坏情况延迟是不可接受的。这在虚拟化环境(vCPU 被 hypervisor 调度出去)下尤其严重,有个专门的术语叫 “lock holder preemption”。

第三,实时系统。 硬实时系统对”最坏情况执行时间”(WCET)有严格约束。lock-free 的前进保证可以给出可证明的 WCET 上界,而 mutex 因为依赖调度器行为,WCET 分析极其困难。

11.2 不值得用的场景

第一,低争用场景。 如果你的队列 99% 的时间没有争用,mutex(尤其是 futex-based 的,uncontended path 只有一条 CAS 指令)的性能完全够用,而且代码简单得多。

第二,复杂数据结构。 无锁的 hash map、tree、skip list 比无锁队列/栈复杂一个数量级。除非你是并发算法领域的专家,否则用 RwLock<HashMap> 或 concurrent hash map 库是更明智的选择。Java 的 ConcurrentHashMap 内部是分段锁 + CAS 的混合方案,而不是纯 lock-free。

第三,原型阶段。 过早优化是万恶之源。先用 mutex 把功能跑通,用 profiler 确认队列确实是瓶颈,再考虑无锁方案。

11.3 工程复杂度的真实代价

一个无锁队列的正确实现,需要:

这些代价是真实的。一个 lock-free queue 的代码量可能是 mutex queue 的 5-10 倍,调试难度是 50-100 倍。如果团队中没有人能 review 并发代码,你写的无锁数据结构就是一个定时炸弹。

我的建议: 优先使用成熟的库实现(如 Boost.Lockfree、crossbeam、DPDK rte_ring),只在库不满足需求时才考虑自己实现。自己实现时,务必用 model checker(如 CDSChecker 或 GenMC)做形式化验证。

十二、总结与延伸

12.1 核心要点回顾

无锁队列的技术路线可以用一条线串起来:

mutex queue (简单, 高争用下崩溃)
  |
  v
CAS-based lock-free queue (Michael-Scott 1996)
  |
  +-- ABA 问题 --+--> tagged pointer (简单, 有版本号回绕风险)
  |              +--> hazard pointer (精确, 读路径有开销)
  |              +--> epoch-based reclamation (摊薄开销, 有内存膨胀风险)
  |
  v
FAA-based queue (LCRQ 2013, 高争用下可扩展)
  |
  v
hardware-specific (DPDK rte_ring, io_uring SQ/CQ)

12.2 推荐阅读

12.3 关于验证

最后再强调一点:并发代码的正确性不能靠测试证明,只能靠测试发现 bug。 如果你的 stress test 跑了一亿次没出问题,不代表代码是对的——可能只是 bug 的触发窗口太小。形式化验证工具(model checker)是唯一能给出”正确性证明”的手段。对于生产环境的无锁代码,我建议至少做到:

  1. ThreadSanitizer 通过
  2. 百万级并发操作的 stress test 在目标架构上通过
  3. 如果可能,用 GenMC 或 CDSChecker 做 model checking

这三步做完,你才能对自己写的无锁代码有基本的信心。


上一篇: MPMC Channel:Go channel 与 crossbeam-channel 下一篇: 无锁栈:Treiber 栈与指数退避

相关阅读: - Hazard Pointers - Epoch-Based Reclamation


By .