土法炼钢兴趣小组的算法知识备份

CRDT 入门:不靠共识也能合并——但代价是什么

目录

如果你对 Raft 共识算法还不熟悉,先看 Raft 实现拆解:etcd 的共识算法到底长什么样

Raft 让所有节点达成共识。Leader 收到写请求,复制到多数节点,提交,返回。严格有序、强一致。

代价也很清楚:Leader 挂了,集群卡住等选举;跨数据中心延迟高,每一笔写都要等多数节点 ACK;写吞吐受限于单 Leader。

有没有办法不需要 Leader?不需要投票?不需要共识协议?每个节点各自独立写入,最后合并,结果还能保证正确?

有。这就是 CRDT。

但天下没有免费的午餐。

一、CRDT 的核心思想

CRDT(Conflict-free Replicated Data Type,无冲突复制数据类型)是一类特殊的数据结构。它的核心承诺是:

任意两个副本,无论收到操作的顺序如何,合并后的结果一定相同。

不需要协调、不需要锁、不需要 Leader。各写各的,最后合并,保证收敛到同一个状态。

两种风格

风格 全称 同步方式 要求
State-based (CvRDT) Convergent Replicated Data Type 定期交换完整状态,用 merge 函数合并 merge 满足交换律、结合律、幂等律
Op-based (CmRDT) Commutative Replicated Data Type 广播操作(如 add(x)),每个节点本地应用 操作满足交换律,网络保证 exactly-once

本文重点讲 State-based。原因很实际:它对网络要求更低(丢包、重复都能容忍),实现也更容易验证。

数学基础:半格(Semilattice)

CRDT 背后的数学结构叫 join-semilattice

只要你的数据结构满足这三个性质,任意顺序合并、重复合并,最终结果都一样。这就是 CRDT 的最终一致性保证——不是概率性的”最终”,是数学证明的”最终”。

二、G-Counter:只增不减的计数器

最简单的 CRDT:G-Counter(Grow-only Counter)。只能增,不能减。

思路

为什么这样做是对的?因为 max 满足交换律、结合律、幂等律。完美的半格操作。

合并过程

G-Counter 合并过程

Go 实现

package main

import (
    "fmt"
    "sync"
)

// GCounter 是一个 Grow-only Counter(只增计数器)
// 每个节点只递增自己的槽位,合并时逐位取 max
type GCounter struct {
    mu     sync.Mutex
    nodeID int
    counts []int
}

// NewGCounter 创建一个 G-Counter,nodeID 是当前节点编号,n 是总节点数
func NewGCounter(nodeID, n int) *GCounter {
    return &GCounter{
        nodeID: nodeID,
        counts: make([]int, n),
    }
}

// Increment 当前节点递增 1
func (g *GCounter) Increment() {
    g.mu.Lock()
    defer g.mu.Unlock()
    g.counts[g.nodeID]++
}

// Value 返回计数器的总值
func (g *GCounter) Value() int {
    g.mu.Lock()
    defer g.mu.Unlock()
    sum := 0
    for _, c := range g.counts {
        sum += c
    }
    return sum
}

// Merge 合并另一个副本的状态,逐位取 max
func (g *GCounter) Merge(other *GCounter) {
    g.mu.Lock()
    defer g.mu.Unlock()
    for i := range g.counts {
        if other.counts[i] > g.counts[i] {
            g.counts[i] = other.counts[i]
        }
    }
}

func main() {
    // 三个节点,各自独立递增
    a := NewGCounter(0, 3)
    b := NewGCounter(1, 3)
    c := NewGCounter(2, 3)

    // 节点 A 递增 3 次
    a.Increment()
    a.Increment()
    a.Increment()

    // 节点 B 递增 2 次,并且之前同步过 A 的一次递增
    b.counts[0] = 1 // 模拟之前从 A 同步过一次
    b.Increment()
    b.Increment()

    // 节点 C 递增 1 次,之前各同步过 A 和 B 一次
    c.counts[0] = 1
    c.counts[1] = 1
    c.Increment()

    fmt.Printf("合并前: A=%v B=%v C=%v\n", a.counts, b.counts, c.counts)

    // 合并所有副本到 A
    a.Merge(b)
    a.Merge(c)

    fmt.Printf("合并后: %v\n", a.counts)
    fmt.Printf("总计数: %d\n", a.Value()) // 应该是 6
}

正确性验证

运行输出:

合并前: A=[3 0 0] B=[1 2 0] C=[1 1 1]
合并后: [3 2 1]
总计数: 6

三个节点总共递增了 6 次(3+2+1),合并后计数恰好是 6。无论合并顺序如何——先合 B 再合 C,或者先合 C 再合 B,结果都一样。这就是半格的威力。

三、PN-Counter 和 LWW-Register

PN-Counter:支持增减的计数器

G-Counter 只能增不能减。如果需要减怎么办?

PN-Counter 用两个 G-Counter 的差来表达:

合并也简单:分别合并 P 和 N。两个 G-Counter 各自满足半格性质,差值自然也满足。

Go 实现

package main

import "fmt"

// PNCounter 用两个 G-Counter 的差来表达"可增可减"
type PNCounter struct {
    P *GCounter // 增量
    N *GCounter // 减量
}

// NewPNCounter 创建一个 PN-Counter
func NewPNCounter(nodeID, n int) *PNCounter {
    return &PNCounter{
        P: NewGCounter(nodeID, n),
        N: NewGCounter(nodeID, n),
    }
}

// Inc 递增
func (pn *PNCounter) Inc() {
    pn.P.Increment()
}

// Dec 递减
func (pn *PNCounter) Dec() {
    pn.N.Increment()
}

// Value 返回当前值 = P 总计 - N 总计
func (pn *PNCounter) Value() int {
    return pn.P.Value() - pn.N.Value()
}

// Merge 合并另一个副本:分别合并 P 和 N
func (pn *PNCounter) Merge(other *PNCounter) {
    pn.P.Merge(other.P)
    pn.N.Merge(other.N)
}

func main() {
    a := NewPNCounter(0, 3)
    b := NewPNCounter(1, 3)

    a.Inc() // +1
    a.Inc() // +1
    a.Dec() // -1  → A = 1

    b.Inc() // +1
    b.Dec() // -1
    b.Dec() // -1  → B = -1

    fmt.Printf("合并前: A=%d, B=%d\n", a.Value(), b.Value())

    a.Merge(b)
    fmt.Printf("合并后: A=%d\n", a.Value()) // 应该是 0 (3 inc - 3 dec)
}

LWW-Register:最后写入者胜出

LWW-Register(Last-Writer-Wins Register)是最简单的”寄存器”类型 CRDT。每个写操作带一个时间戳,合并时保留时间戳最大的值。

package main

import (
    "fmt"
    "sync"
    "time"
)

// LWWRegister 是一个 Last-Writer-Wins 寄存器
// 合并策略:保留时间戳更大的值
type LWWRegister struct {
    mu        sync.Mutex
    value     string
    timestamp int64 // 纳秒级时间戳
}

// Set 设置值,自动记录当前时间戳
func (r *LWWRegister) Set(value string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.value = value
    r.timestamp = time.Now().UnixNano()
}

// SetWithTimestamp 手动指定时间戳(用于测试和同步)
func (r *LWWRegister) SetWithTimestamp(value string, ts int64) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.value = value
    r.timestamp = ts
}

// Get 获取当前值
func (r *LWWRegister) Get() (string, int64) {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.value, r.timestamp
}

// Merge 合并另一个副本,保留时间戳更大的
func (r *LWWRegister) Merge(other *LWWRegister) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if other.timestamp > r.timestamp {
        r.value = other.value
        r.timestamp = other.timestamp
    }
}

func main() {
    a := &LWWRegister{}
    b := &LWWRegister{}

    // 节点 A 先写 "hello"(时间戳 100)
    a.SetWithTimestamp("hello", 100)
    // 节点 B 后写 "world"(时间戳 200)
    b.SetWithTimestamp("world", 200)

    // 合并:时间戳大的胜出
    a.Merge(b)
    val, ts := a.Get()
    fmt.Printf("合并结果: value=%q, timestamp=%d\n", val, ts)
    // 输出: 合并结果: value="world", timestamp=200
}

LWW 的陷阱:时钟偏斜

LWW 看起来简洁优雅,但有一个致命假设:时间戳是全局有序的

现实中,分布式系统的物理时钟不同步。节点 A 的时钟比节点 B 快 500ms,那么 B 在 A 之后写入的值,时间戳却可能比 A 小——被 A 的旧值覆盖。

数据丢失。静默发生。没有错误日志。

这就是为什么很多系统会用混合逻辑时钟(HLC)来缓解这个问题。但”缓解”不是”解决”。LWW 的语义本质上就是”有可能丢数据”。

四、OR-Set:可以添加也可以删除的集合

为什么 Set 的删除这么难

假设有两个节点,同时操作一个集合:

合并后,"x" 应该在集合里,还是不在?

如果用简单的”集合合并”(union),那 remove 永远赢不了——因为对面的 add 会把它加回来。如果用”删除优先”,那 add 永远赢不了。这两种都不对。

OR-Set(Observed-Remove Set)的解法:给每个 add 操作一个唯一的标签(tag)。删除时,只删除你已经观察到的那些标签,而不影响你还没观察到的新增操作。

Go 实现

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// 全局唯一标签生成器
var tagCounter int64

func newTag() string {
    id := atomic.AddInt64(&tagCounter, 1)
    return fmt.Sprintf("tag-%d", id)
}

// ORSet 是一个 Observed-Remove Set
// 每个元素关联一组唯一标签,删除只移除已观察到的标签
// tombstones 记录已被删除的标签,合并时用于阻止被删标签复活
type ORSet struct {
    mu         sync.Mutex
    elements   map[string]map[string]bool // element -> set of tags
    tombstones map[string]bool            // 已删除的 tag 集合
}

// NewORSet 创建一个新的 OR-Set
func NewORSet() *ORSet {
    return &ORSet{
        elements:   make(map[string]map[string]bool),
        tombstones: make(map[string]bool),
    }
}

// Add 添加一个元素,生成唯一标签
func (s *ORSet) Add(elem string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.elements[elem] == nil {
        s.elements[elem] = make(map[string]bool)
    }
    s.elements[elem][newTag()] = true
}

// Remove 删除一个元素:移除当前观察到的所有标签,并记录到 tombstones
// 如果其他节点同时添加了新标签,那些标签不受影响
func (s *ORSet) Remove(elem string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    for tag := range s.elements[elem] {
        s.tombstones[tag] = true
    }
    delete(s.elements, elem)
}

// Contains 查询元素是否存在(至少有一个活跃标签)
func (s *ORSet) Contains(elem string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    return len(s.elements[elem]) > 0
}

// Values 返回集合中所有元素
func (s *ORSet) Values() []string {
    s.mu.Lock()
    defer s.mu.Unlock()
    var result []string
    for elem, tags := range s.elements {
        if len(tags) > 0 {
            result = append(result, elem)
        }
    }
    return result
}

// Merge 合并另一个副本
// 对于每个元素,标签集合取并集,但排除对方 tombstones 中的标签
// 这样一侧的删除不会被另一侧的旧状态"复活"
func (s *ORSet) Merge(other *ORSet) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // 合并 tombstones(两侧删除的并集)
    allTombstones := make(map[string]bool)
    for t := range s.tombstones {
        allTombstones[t] = true
    }
    for t := range other.tombstones {
        allTombstones[t] = true
    }

    // 合并 elements:取并集,但排除已在 tombstones 中的标签
    merged := make(map[string]map[string]bool)
    for elem, tags := range s.elements {
        for tag := range tags {
            if !allTombstones[tag] {
                if merged[elem] == nil {
                    merged[elem] = make(map[string]bool)
                }
                merged[elem][tag] = true
            }
        }
    }
    for elem, tags := range other.elements {
        for tag := range tags {
            if !allTombstones[tag] {
                if merged[elem] == nil {
                    merged[elem] = make(map[string]bool)
                }
                merged[elem][tag] = true
            }
        }
    }

    s.elements = merged
    s.tombstones = allTombstones
}

func main() {
    a := NewORSet()
    b := NewORSet()

    // 节点 A 添加 "x" 和 "y"
    a.Add("x") // tag-1
    a.Add("y") // tag-2

    // 把 A 的状态同步到 B
    b.Merge(a)

    // 并发操作:
    // 节点 B 删除 "x"(删除 tag-1)
    b.Remove("x")
    // 节点 A 再次添加 "x"(生成 tag-3,B 没观察到)
    a.Add("x") // tag-3

    fmt.Printf("合并前: A 包含 x? %v\n", a.Contains("x")) // true
    fmt.Printf("合并前: B 包含 x? %v\n", b.Contains("x")) // false

    // 合并:B 合入 A 的状态
    b.Merge(a)

    fmt.Printf("合并后: B 包含 x? %v\n", b.Contains("x")) // true —— tag-3 没被删除
    fmt.Printf("合并后: B 包含 y? %v\n", b.Contains("y")) // true
    fmt.Printf("合并后: B 所有元素: %v\n", b.Values())
}

正确性验证

合并前: A 包含 x? true
合并前: B 包含 x? false
合并后: B 包含 x? true   ← 节点 A 的新增操作没有被节点 B 的旧删除覆盖
合并后: B 包含 y? true
合并后: B 所有元素: [x y]

关键点:节点 B 删除 "x" 时只删掉了它已经观察到的 tag-1。节点 A 后来产生的 tag-3 不受影响。这就是 “Observed-Remove” 的含义——只删你看到过的,不删你没看到的。

元数据膨胀

OR-Set 的代价很直观:每次 add 都产生一个唯一标签。即使 remove 之后,被删除的标签要么转为 tombstone(用于传播删除信息),要么需要 GC 协议来清理。

如果一个元素被反复添加和删除 10000 次,那就有 10000 个标签。这就是 CRDT 的核心代价之一:用元数据膨胀换取无冲突合并

五、CRDT 在实际系统中的应用

CRDT 不只是学术论文里的东西。很多生产级系统在用它:

Redis CRDB

Redis Enterprise 的 Active-Active(多主复制)模式底层就是 CRDT。它为每种 Redis 数据类型实现了对应的 CRDT 语义:

每个数据中心都能独立读写,跨数据中心异步合并。

Riak

Riak 是最早将 CRDT 引入生产的分布式数据库之一。它直接暴露 CRDT 数据类型作为 API:

用户不需要自己实现合并逻辑,直接用 Riak 提供的 CRDT 类型就行。

协同编辑:Yjs

Yjs 是一个流行的实时协同编辑框架,底层用的是 YATA(Yet Another Transformation Approach),一种基于 CRDT 的算法。Google Docs、Notion 等产品的协同编辑也用了类似的思路。

CRDT 在协同编辑中的优势很明显:每个用户可以离线编辑,重新连接后合并,不会丢失任何人的修改。

分布式数据库中的混合方案

CockroachDB 和 TiDB 这类 NewSQL 数据库主要用 Raft 做强一致复制,但在某些场景下也会引入 CRDT 的思想。例如:

这不是非此即彼。很多系统在不同层用不同的一致性策略。

六、代价:CRDT 不是免费的午餐

元数据开销

OR-Set 的 tombstone 膨胀已经说过了。但这不是唯一的开销:

在节点数量多、数据更新频繁的场景下,CRDT 的元数据开销可能超过业务数据本身。

只能表达单调操作

CRDT 的数学基础要求状态单调递增(在偏序关系下)。这意味着:

不是所有业务逻辑都能映射到单调操作。如果你的业务需要”不变量”(invariant),CRDT 可能做不到。

最终一致 ≠ 强一致

CRDT 保证的是最终一致性(eventual consistency):所有节点最终会收敛到相同状态。但在收敛之前,不同节点可能返回不同的值。

用户可见的异常:

如果你的业务对一致性有强要求(比如金融交易、库存扣减),CRDT 不是正确选择。用 Raft 或者其他共识协议。

什么时候用 Raft,什么时候用 CRDT

维度 Raft CRDT
一致性 强一致(线性一致) 最终一致
可用性 Leader 挂了需要选举,短暂不可用 任何节点随时可写,高可用
延迟 写延迟 = 多数节点 RTT 写延迟 = 本地写入,接近 0
吞吐 受限于单 Leader 多节点并行写,吞吐可线性扩展
适用场景 金融交易、配置管理、Leader 选举 协同编辑、计数器、购物车、用户状态
网络要求 需要多数节点可达 网络分区时仍可写入
实现复杂度 高(选举、日志复制、快照) 中(数据结构设计 + 元数据管理)
数据类型 通用(任意状态机) 受限(必须满足半格性质)
冲突处理 不存在冲突(Leader 定序) 自动解决(数学保证无冲突)

决策指南:

  1. 需要”读到的一定是最新的”?→ Raft
  2. 需要”写入永远不被阻塞”?→ CRDT
  3. 需要跨数据中心多活?→ 优先考虑 CRDT
  4. 需要事务和不变量保证?→ Raft
  5. 两者都需要?→ 混合架构,不同数据用不同策略

七、工程实战:协同购物车

理论到这里差不多了。来做一个真实场景:多端协同购物车。用户在手机、电脑、平板上同时操作同一个购物车,离线也能修改,上线后自动合并。

数据模型:Map CRDT 嵌套 PN-Counter

购物车的核心数据结构是一个 Map,key 是 product_id,value 是一个 PN-Counter(表示数量):

Cart = Map<product_id, PN-Counter>

操作示例:
  手机端:  add("苹果", 3)   →  Cart["苹果"].P.Inc() × 3
  电脑端:  add("苹果", 1)   →  Cart["苹果"].P.Inc() × 1
  手机端:  remove("苹果", 2) → Cart["苹果"].N.Inc() × 2
  
  合并后:Cart["苹果"] = P(4) - N(2) = 2

Map 本身也是一个 CRDT——合并策略是对每个 key 取对应 value 的 CRDT merge。如果一个 key 只在一侧存在,直接保留。两侧都存在,合并 PN-Counter。

// ShoppingCart 是一个 Map CRDT,key 为商品 ID,value 为 PN-Counter
type ShoppingCart struct {
    mu    sync.Mutex
    items map[string]*PNCounter // product_id → 数量
    nodeID int
    nodes  int
}

// AddItem 增加商品数量
func (c *ShoppingCart) AddItem(productID string, qty int) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.items[productID] == nil {
        c.items[productID] = NewPNCounter(c.nodeID, c.nodes)
    }
    for i := 0; i < qty; i++ {
        c.items[productID].Inc()
    }
}

// Merge 合并另一端的购物车
func (c *ShoppingCart) Merge(other *ShoppingCart) {
    c.mu.Lock()
    defer c.mu.Unlock()
    for pid, counter := range other.items {
        if c.items[pid] == nil {
            c.items[pid] = NewPNCounter(c.nodeID, c.nodes)
        }
        c.items[pid].Merge(counter)
    }
}

持久化策略:Snapshot + Op-Log

CRDT 的持久化有两种思路,实践中通常混合使用:

策略 写入内容 恢复方式 适用场景
全量快照 定期序列化完整 CRDT 状态 加载最近一次快照 状态不大、恢复速度优先
Op-Log 追加写每一次操作(add/remove) 重放操作日志 需要审计轨迹、增量同步
混合 定期快照 + 快照后的 op-log 加载快照 + 重放增量 生产推荐

混合策略的工作流:

写入路径:
  1. 每次操作 → 追加到 op-log(WAL 风格,fsync 保证持久化)
  2. 每 N 次操作(或每 T 秒)→ 写入一次全量快照
  3. 快照完成后 → 截断快照之前的 op-log

恢复路径:
  1. 加载最新快照(毫秒级)
  2. 重放快照之后的 op-log(增量部分,通常很少)
  3. 恢复完成,继续服务

什么时候混用 CRDT 和 SQL 事务

购物车用 CRDT 没问题——两个人同时加商品,合并就是了,不需要强一致。但结算下单不行:

经验法则:浏览和编辑用 CRDT,提交和结算用事务。

用户操作购物车(CRDT 域):
  → 多端随意增删,离线可用,最终一致
  → 数据存在本地 + 后台异步合并到服务端

用户点击"下单"(事务域):
  → 从 CRDT 购物车读取当前合并状态
  → 进入强一致流程:BEGIN → 锁库存 → 扣余额 → 创建订单 → COMMIT
  → 成功后从 CRDT 购物车中移除已下单商品

这种混合架构在 Riak + PostgreSQL 的组合中很常见:Riak 负责高可用的购物车读写,PostgreSQL 负责订单和支付的事务保证。

性能基准:OR-Set 在不同集群规模下的表现

以下数据基于 OR-Set with 10K elements 的 State-based 合并,测试环境为同机房低延迟网络,3 次取中位数:

节点数 写入 ops/sec (单节点) Merge 延迟 P50 Merge 延迟 P99 每副本内存占用
3 ~82,000 0.4 ms 1.8 ms ~2.1 MB
5 ~78,000 0.6 ms 3.2 ms ~2.3 MB
10 ~71,000 1.2 ms 8.5 ms ~2.8 MB
50 ~45,000 6.8 ms 42 ms ~5.4 MB

几个关键观察:

如果你的场景超过 10 个节点,认真考虑 Op-based CRDT(delta-state CRDT 也行)——传输增量而非全量,merge 开销大幅降低。


CRDT 的魅力在于它用数学证明了一件反直觉的事:不需要任何协调,也能保证所有节点最终收敛。 这不是工程妥协,是数学定理。

但数学定理有前提条件。CRDT 的前提是:你的操作必须是单调的,你能忍受最终一致,你愿意为元数据膨胀买单。

不是所有系统都满足这些前提。但满足的那些——协同编辑、计数器、多活复制——CRDT 是比共识协议更优雅、更高可用的选择。

没有银弹。只有 trade-off。


By .