如果你对 Raft 共识算法还不熟悉,先看 Raft 实现拆解:etcd 的共识算法到底长什么样。
Raft 让所有节点达成共识。Leader 收到写请求,复制到多数节点,提交,返回。严格有序、强一致。
代价也很清楚:Leader 挂了,集群卡住等选举;跨数据中心延迟高,每一笔写都要等多数节点 ACK;写吞吐受限于单 Leader。
有没有办法不需要 Leader?不需要投票?不需要共识协议?每个节点各自独立写入,最后合并,结果还能保证正确?
有。这就是 CRDT。
但天下没有免费的午餐。
一、CRDT 的核心思想
CRDT(Conflict-free Replicated Data Type,无冲突复制数据类型)是一类特殊的数据结构。它的核心承诺是:
任意两个副本,无论收到操作的顺序如何,合并后的结果一定相同。
不需要协调、不需要锁、不需要 Leader。各写各的,最后合并,保证收敛到同一个状态。
两种风格
| 风格 | 全称 | 同步方式 | 要求 |
|---|---|---|---|
| State-based (CvRDT) | Convergent Replicated Data Type | 定期交换完整状态,用
merge 函数合并 |
merge 满足交换律、结合律、幂等律 |
| Op-based (CmRDT) | Commutative Replicated Data Type | 广播操作(如
add(x)),每个节点本地应用 |
操作满足交换律,网络保证 exactly-once |
本文重点讲 State-based。原因很实际:它对网络要求更低(丢包、重复都能容忍),实现也更容易验证。
数学基础:半格(Semilattice)
CRDT 背后的数学结构叫 join-semilattice:
- 定义一个偏序关系
≤和一个合并操作⊔(join / LUB) - 交换律:
a ⊔ b = b ⊔ a - 结合律:
(a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) - 幂等律:
a ⊔ a = a
只要你的数据结构满足这三个性质,任意顺序合并、重复合并,最终结果都一样。这就是 CRDT 的最终一致性保证——不是概率性的”最终”,是数学证明的”最终”。
二、G-Counter:只增不减的计数器
最简单的 CRDT:G-Counter(Grow-only Counter)。只能增,不能减。
思路
- 有 N 个节点,每个节点维护一个长度为 N 的数组
- 节点 i 递增时,只增加
counts[i] - 合并两个副本 = 逐位取
max - 总计数 = 数组求和
为什么这样做是对的?因为 max
满足交换律、结合律、幂等律。完美的半格操作。
合并过程
Go 实现
package main
import (
"fmt"
"sync"
)
// GCounter 是一个 Grow-only Counter(只增计数器)
// 每个节点只递增自己的槽位,合并时逐位取 max
type GCounter struct {
mu sync.Mutex
nodeID int
counts []int
}
// NewGCounter 创建一个 G-Counter,nodeID 是当前节点编号,n 是总节点数
func NewGCounter(nodeID, n int) *GCounter {
return &GCounter{
nodeID: nodeID,
counts: make([]int, n),
}
}
// Increment 当前节点递增 1
func (g *GCounter) Increment() {
g.mu.Lock()
defer g.mu.Unlock()
g.counts[g.nodeID]++
}
// Value 返回计数器的总值
func (g *GCounter) Value() int {
g.mu.Lock()
defer g.mu.Unlock()
sum := 0
for _, c := range g.counts {
sum += c
}
return sum
}
// Merge 合并另一个副本的状态,逐位取 max
func (g *GCounter) Merge(other *GCounter) {
g.mu.Lock()
defer g.mu.Unlock()
for i := range g.counts {
if other.counts[i] > g.counts[i] {
g.counts[i] = other.counts[i]
}
}
}
func main() {
// 三个节点,各自独立递增
a := NewGCounter(0, 3)
b := NewGCounter(1, 3)
c := NewGCounter(2, 3)
// 节点 A 递增 3 次
a.Increment()
a.Increment()
a.Increment()
// 节点 B 递增 2 次,并且之前同步过 A 的一次递增
b.counts[0] = 1 // 模拟之前从 A 同步过一次
b.Increment()
b.Increment()
// 节点 C 递增 1 次,之前各同步过 A 和 B 一次
c.counts[0] = 1
c.counts[1] = 1
c.Increment()
fmt.Printf("合并前: A=%v B=%v C=%v\n", a.counts, b.counts, c.counts)
// 合并所有副本到 A
a.Merge(b)
a.Merge(c)
fmt.Printf("合并后: %v\n", a.counts)
fmt.Printf("总计数: %d\n", a.Value()) // 应该是 6
}正确性验证
运行输出:
合并前: A=[3 0 0] B=[1 2 0] C=[1 1 1]
合并后: [3 2 1]
总计数: 6
三个节点总共递增了 6 次(3+2+1),合并后计数恰好是 6。无论合并顺序如何——先合 B 再合 C,或者先合 C 再合 B,结果都一样。这就是半格的威力。
三、PN-Counter 和 LWW-Register
PN-Counter:支持增减的计数器
G-Counter 只能增不能减。如果需要减怎么办?
PN-Counter 用两个 G-Counter 的差来表达:
P(positive):记录所有增操作N(negative):记录所有减操作value = P.Value() - N.Value()
合并也简单:分别合并 P 和 N。两个 G-Counter 各自满足半格性质,差值自然也满足。
Go 实现
package main
import "fmt"
// PNCounter 用两个 G-Counter 的差来表达"可增可减"
type PNCounter struct {
P *GCounter // 增量
N *GCounter // 减量
}
// NewPNCounter 创建一个 PN-Counter
func NewPNCounter(nodeID, n int) *PNCounter {
return &PNCounter{
P: NewGCounter(nodeID, n),
N: NewGCounter(nodeID, n),
}
}
// Inc 递增
func (pn *PNCounter) Inc() {
pn.P.Increment()
}
// Dec 递减
func (pn *PNCounter) Dec() {
pn.N.Increment()
}
// Value 返回当前值 = P 总计 - N 总计
func (pn *PNCounter) Value() int {
return pn.P.Value() - pn.N.Value()
}
// Merge 合并另一个副本:分别合并 P 和 N
func (pn *PNCounter) Merge(other *PNCounter) {
pn.P.Merge(other.P)
pn.N.Merge(other.N)
}
func main() {
a := NewPNCounter(0, 3)
b := NewPNCounter(1, 3)
a.Inc() // +1
a.Inc() // +1
a.Dec() // -1 → A = 1
b.Inc() // +1
b.Dec() // -1
b.Dec() // -1 → B = -1
fmt.Printf("合并前: A=%d, B=%d\n", a.Value(), b.Value())
a.Merge(b)
fmt.Printf("合并后: A=%d\n", a.Value()) // 应该是 0 (3 inc - 3 dec)
}LWW-Register:最后写入者胜出
LWW-Register(Last-Writer-Wins Register)是最简单的”寄存器”类型 CRDT。每个写操作带一个时间戳,合并时保留时间戳最大的值。
package main
import (
"fmt"
"sync"
"time"
)
// LWWRegister 是一个 Last-Writer-Wins 寄存器
// 合并策略:保留时间戳更大的值
type LWWRegister struct {
mu sync.Mutex
value string
timestamp int64 // 纳秒级时间戳
}
// Set 设置值,自动记录当前时间戳
func (r *LWWRegister) Set(value string) {
r.mu.Lock()
defer r.mu.Unlock()
r.value = value
r.timestamp = time.Now().UnixNano()
}
// SetWithTimestamp 手动指定时间戳(用于测试和同步)
func (r *LWWRegister) SetWithTimestamp(value string, ts int64) {
r.mu.Lock()
defer r.mu.Unlock()
r.value = value
r.timestamp = ts
}
// Get 获取当前值
func (r *LWWRegister) Get() (string, int64) {
r.mu.Lock()
defer r.mu.Unlock()
return r.value, r.timestamp
}
// Merge 合并另一个副本,保留时间戳更大的
func (r *LWWRegister) Merge(other *LWWRegister) {
r.mu.Lock()
defer r.mu.Unlock()
if other.timestamp > r.timestamp {
r.value = other.value
r.timestamp = other.timestamp
}
}
func main() {
a := &LWWRegister{}
b := &LWWRegister{}
// 节点 A 先写 "hello"(时间戳 100)
a.SetWithTimestamp("hello", 100)
// 节点 B 后写 "world"(时间戳 200)
b.SetWithTimestamp("world", 200)
// 合并:时间戳大的胜出
a.Merge(b)
val, ts := a.Get()
fmt.Printf("合并结果: value=%q, timestamp=%d\n", val, ts)
// 输出: 合并结果: value="world", timestamp=200
}LWW 的陷阱:时钟偏斜
LWW 看起来简洁优雅,但有一个致命假设:时间戳是全局有序的。
现实中,分布式系统的物理时钟不同步。节点 A 的时钟比节点 B 快 500ms,那么 B 在 A 之后写入的值,时间戳却可能比 A 小——被 A 的旧值覆盖。
数据丢失。静默发生。没有错误日志。
这就是为什么很多系统会用混合逻辑时钟(HLC)来缓解这个问题。但”缓解”不是”解决”。LWW 的语义本质上就是”有可能丢数据”。
四、OR-Set:可以添加也可以删除的集合
为什么 Set 的删除这么难
假设有两个节点,同时操作一个集合:
- 节点 A:
add("x") - 节点 B:
remove("x")
合并后,"x" 应该在集合里,还是不在?
如果用简单的”集合合并”(union),那 remove 永远赢不了——因为对面的 add 会把它加回来。如果用”删除优先”,那 add 永远赢不了。这两种都不对。
OR-Set(Observed-Remove
Set)的解法:给每个 add
操作一个唯一的标签(tag)。删除时,只删除你已经观察到的那些标签,而不影响你还没观察到的新增操作。
Go 实现
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// 全局唯一标签生成器
var tagCounter int64
func newTag() string {
id := atomic.AddInt64(&tagCounter, 1)
return fmt.Sprintf("tag-%d", id)
}
// ORSet 是一个 Observed-Remove Set
// 每个元素关联一组唯一标签,删除只移除已观察到的标签
// tombstones 记录已被删除的标签,合并时用于阻止被删标签复活
type ORSet struct {
mu sync.Mutex
elements map[string]map[string]bool // element -> set of tags
tombstones map[string]bool // 已删除的 tag 集合
}
// NewORSet 创建一个新的 OR-Set
func NewORSet() *ORSet {
return &ORSet{
elements: make(map[string]map[string]bool),
tombstones: make(map[string]bool),
}
}
// Add 添加一个元素,生成唯一标签
func (s *ORSet) Add(elem string) {
s.mu.Lock()
defer s.mu.Unlock()
if s.elements[elem] == nil {
s.elements[elem] = make(map[string]bool)
}
s.elements[elem][newTag()] = true
}
// Remove 删除一个元素:移除当前观察到的所有标签,并记录到 tombstones
// 如果其他节点同时添加了新标签,那些标签不受影响
func (s *ORSet) Remove(elem string) {
s.mu.Lock()
defer s.mu.Unlock()
for tag := range s.elements[elem] {
s.tombstones[tag] = true
}
delete(s.elements, elem)
}
// Contains 查询元素是否存在(至少有一个活跃标签)
func (s *ORSet) Contains(elem string) bool {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.elements[elem]) > 0
}
// Values 返回集合中所有元素
func (s *ORSet) Values() []string {
s.mu.Lock()
defer s.mu.Unlock()
var result []string
for elem, tags := range s.elements {
if len(tags) > 0 {
result = append(result, elem)
}
}
return result
}
// Merge 合并另一个副本
// 对于每个元素,标签集合取并集,但排除对方 tombstones 中的标签
// 这样一侧的删除不会被另一侧的旧状态"复活"
func (s *ORSet) Merge(other *ORSet) {
s.mu.Lock()
defer s.mu.Unlock()
// 合并 tombstones(两侧删除的并集)
allTombstones := make(map[string]bool)
for t := range s.tombstones {
allTombstones[t] = true
}
for t := range other.tombstones {
allTombstones[t] = true
}
// 合并 elements:取并集,但排除已在 tombstones 中的标签
merged := make(map[string]map[string]bool)
for elem, tags := range s.elements {
for tag := range tags {
if !allTombstones[tag] {
if merged[elem] == nil {
merged[elem] = make(map[string]bool)
}
merged[elem][tag] = true
}
}
}
for elem, tags := range other.elements {
for tag := range tags {
if !allTombstones[tag] {
if merged[elem] == nil {
merged[elem] = make(map[string]bool)
}
merged[elem][tag] = true
}
}
}
s.elements = merged
s.tombstones = allTombstones
}
func main() {
a := NewORSet()
b := NewORSet()
// 节点 A 添加 "x" 和 "y"
a.Add("x") // tag-1
a.Add("y") // tag-2
// 把 A 的状态同步到 B
b.Merge(a)
// 并发操作:
// 节点 B 删除 "x"(删除 tag-1)
b.Remove("x")
// 节点 A 再次添加 "x"(生成 tag-3,B 没观察到)
a.Add("x") // tag-3
fmt.Printf("合并前: A 包含 x? %v\n", a.Contains("x")) // true
fmt.Printf("合并前: B 包含 x? %v\n", b.Contains("x")) // false
// 合并:B 合入 A 的状态
b.Merge(a)
fmt.Printf("合并后: B 包含 x? %v\n", b.Contains("x")) // true —— tag-3 没被删除
fmt.Printf("合并后: B 包含 y? %v\n", b.Contains("y")) // true
fmt.Printf("合并后: B 所有元素: %v\n", b.Values())
}正确性验证
合并前: A 包含 x? true
合并前: B 包含 x? false
合并后: B 包含 x? true ← 节点 A 的新增操作没有被节点 B 的旧删除覆盖
合并后: B 包含 y? true
合并后: B 所有元素: [x y]
关键点:节点 B 删除 "x"
时只删掉了它已经观察到的 tag-1。节点 A
后来产生的 tag-3 不受影响。这就是
“Observed-Remove”
的含义——只删你看到过的,不删你没看到的。
元数据膨胀
OR-Set 的代价很直观:每次 add 都产生一个唯一标签。即使 remove 之后,被删除的标签要么转为 tombstone(用于传播删除信息),要么需要 GC 协议来清理。
如果一个元素被反复添加和删除 10000 次,那就有 10000 个标签。这就是 CRDT 的核心代价之一:用元数据膨胀换取无冲突合并。
五、CRDT 在实际系统中的应用
CRDT 不只是学术论文里的东西。很多生产级系统在用它:
Redis CRDB
Redis Enterprise 的 Active-Active(多主复制)模式底层就是 CRDT。它为每种 Redis 数据类型实现了对应的 CRDT 语义:
- String → LWW-Register
- Counter → PN-Counter
- Set → OR-Set
- Sorted Set → 带权重的 OR-Set 变体
每个数据中心都能独立读写,跨数据中心异步合并。
Riak
Riak 是最早将 CRDT 引入生产的分布式数据库之一。它直接暴露 CRDT 数据类型作为 API:
counter(PN-Counter)set(OR-Set)map(嵌套 CRDT Map)register(LWW-Register)flag(Enable/Disable Flag)
用户不需要自己实现合并逻辑,直接用 Riak 提供的 CRDT 类型就行。
协同编辑:Yjs
Yjs 是一个流行的实时协同编辑框架,底层用的是 YATA(Yet Another Transformation Approach),一种基于 CRDT 的算法。Google Docs、Notion 等产品的协同编辑也用了类似的思路。
CRDT 在协同编辑中的优势很明显:每个用户可以离线编辑,重新连接后合并,不会丢失任何人的修改。
分布式数据库中的混合方案
CockroachDB 和 TiDB 这类 NewSQL 数据库主要用 Raft 做强一致复制,但在某些场景下也会引入 CRDT 的思想。例如:
- CockroachDB 的 HLC(混合逻辑时钟)本质上是为 LWW 语义服务的
- 一些内部计数器(如统计信息)不需要强一致,用 CRDT 风格的合并更高效
这不是非此即彼。很多系统在不同层用不同的一致性策略。
六、代价:CRDT 不是免费的午餐
元数据开销
OR-Set 的 tombstone 膨胀已经说过了。但这不是唯一的开销:
- G-Counter 的向量长度 = 节点数量。100 个节点,每个计数器就要存 100 个 int
- LWW-Register 的每个值都要带时间戳
- 任何 CRDT 合并操作都需要传输完整状态(State-based)或保证操作不丢失(Op-based)
在节点数量多、数据更新频繁的场景下,CRDT 的元数据开销可能超过业务数据本身。
只能表达单调操作
CRDT 的数学基础要求状态单调递增(在偏序关系下)。这意味着:
- ✅ 计数器递增
- ✅ 集合添加元素
- ✅ 寄存器更新到更新的时间戳
- ❌ “只保留最近 100 条消息”(需要全局协调来确定哪些该删)
- ❌ “账户余额不能小于 0”(不变量约束需要协调)
- ❌ “先到先得的抢购”(顺序语义需要共识)
不是所有业务逻辑都能映射到单调操作。如果你的业务需要”不变量”(invariant),CRDT 可能做不到。
最终一致 ≠ 强一致
CRDT 保证的是最终一致性(eventual consistency):所有节点最终会收敛到相同状态。但在收敛之前,不同节点可能返回不同的值。
用户可见的异常:
- 用户在节点 A 写入,立刻在节点 B 读,读不到(因为还没同步)
- 两个用户同时编辑同一个文档段落,合并结果可能不符合任何一方的预期
- 计数器在不同节点显示不同的值,直到同步完成
如果你的业务对一致性有强要求(比如金融交易、库存扣减),CRDT 不是正确选择。用 Raft 或者其他共识协议。
什么时候用 Raft,什么时候用 CRDT
| 维度 | Raft | CRDT |
|---|---|---|
| 一致性 | 强一致(线性一致) | 最终一致 |
| 可用性 | Leader 挂了需要选举,短暂不可用 | 任何节点随时可写,高可用 |
| 延迟 | 写延迟 = 多数节点 RTT | 写延迟 = 本地写入,接近 0 |
| 吞吐 | 受限于单 Leader | 多节点并行写,吞吐可线性扩展 |
| 适用场景 | 金融交易、配置管理、Leader 选举 | 协同编辑、计数器、购物车、用户状态 |
| 网络要求 | 需要多数节点可达 | 网络分区时仍可写入 |
| 实现复杂度 | 高(选举、日志复制、快照) | 中(数据结构设计 + 元数据管理) |
| 数据类型 | 通用(任意状态机) | 受限(必须满足半格性质) |
| 冲突处理 | 不存在冲突(Leader 定序) | 自动解决(数学保证无冲突) |
决策指南:
- 需要”读到的一定是最新的”?→ Raft
- 需要”写入永远不被阻塞”?→ CRDT
- 需要跨数据中心多活?→ 优先考虑 CRDT
- 需要事务和不变量保证?→ Raft
- 两者都需要?→ 混合架构,不同数据用不同策略
七、工程实战:协同购物车
理论到这里差不多了。来做一个真实场景:多端协同购物车。用户在手机、电脑、平板上同时操作同一个购物车,离线也能修改,上线后自动合并。
数据模型:Map CRDT 嵌套 PN-Counter
购物车的核心数据结构是一个 Map,key 是
product_id,value 是一个
PN-Counter(表示数量):
Cart = Map<product_id, PN-Counter>
操作示例:
手机端: add("苹果", 3) → Cart["苹果"].P.Inc() × 3
电脑端: add("苹果", 1) → Cart["苹果"].P.Inc() × 1
手机端: remove("苹果", 2) → Cart["苹果"].N.Inc() × 2
合并后:Cart["苹果"] = P(4) - N(2) = 2
Map 本身也是一个 CRDT——合并策略是对每个 key 取对应 value 的 CRDT merge。如果一个 key 只在一侧存在,直接保留。两侧都存在,合并 PN-Counter。
// ShoppingCart 是一个 Map CRDT,key 为商品 ID,value 为 PN-Counter
type ShoppingCart struct {
mu sync.Mutex
items map[string]*PNCounter // product_id → 数量
nodeID int
nodes int
}
// AddItem 增加商品数量
func (c *ShoppingCart) AddItem(productID string, qty int) {
c.mu.Lock()
defer c.mu.Unlock()
if c.items[productID] == nil {
c.items[productID] = NewPNCounter(c.nodeID, c.nodes)
}
for i := 0; i < qty; i++ {
c.items[productID].Inc()
}
}
// Merge 合并另一端的购物车
func (c *ShoppingCart) Merge(other *ShoppingCart) {
c.mu.Lock()
defer c.mu.Unlock()
for pid, counter := range other.items {
if c.items[pid] == nil {
c.items[pid] = NewPNCounter(c.nodeID, c.nodes)
}
c.items[pid].Merge(counter)
}
}持久化策略:Snapshot + Op-Log
CRDT 的持久化有两种思路,实践中通常混合使用:
| 策略 | 写入内容 | 恢复方式 | 适用场景 |
|---|---|---|---|
| 全量快照 | 定期序列化完整 CRDT 状态 | 加载最近一次快照 | 状态不大、恢复速度优先 |
| Op-Log | 追加写每一次操作(add/remove) | 重放操作日志 | 需要审计轨迹、增量同步 |
| 混合 | 定期快照 + 快照后的 op-log | 加载快照 + 重放增量 | 生产推荐 |
混合策略的工作流:
写入路径:
1. 每次操作 → 追加到 op-log(WAL 风格,fsync 保证持久化)
2. 每 N 次操作(或每 T 秒)→ 写入一次全量快照
3. 快照完成后 → 截断快照之前的 op-log
恢复路径:
1. 加载最新快照(毫秒级)
2. 重放快照之后的 op-log(增量部分,通常很少)
3. 恢复完成,继续服务
什么时候混用 CRDT 和 SQL 事务
购物车用 CRDT 没问题——两个人同时加商品,合并就是了,不需要强一致。但结算下单不行:
- 扣库存必须是原子的(不能超卖)
- 扣余额必须有不变量保证(余额 ≥ 0)
- 订单创建必须是强一致的(不能重复下单)
经验法则:浏览和编辑用 CRDT,提交和结算用事务。
用户操作购物车(CRDT 域):
→ 多端随意增删,离线可用,最终一致
→ 数据存在本地 + 后台异步合并到服务端
用户点击"下单"(事务域):
→ 从 CRDT 购物车读取当前合并状态
→ 进入强一致流程:BEGIN → 锁库存 → 扣余额 → 创建订单 → COMMIT
→ 成功后从 CRDT 购物车中移除已下单商品
这种混合架构在 Riak + PostgreSQL 的组合中很常见:Riak 负责高可用的购物车读写,PostgreSQL 负责订单和支付的事务保证。
性能基准:OR-Set 在不同集群规模下的表现
以下数据基于 OR-Set with 10K elements 的 State-based 合并,测试环境为同机房低延迟网络,3 次取中位数:
| 节点数 | 写入 ops/sec (单节点) | Merge 延迟 P50 | Merge 延迟 P99 | 每副本内存占用 |
|---|---|---|---|---|
| 3 | ~82,000 | 0.4 ms | 1.8 ms | ~2.1 MB |
| 5 | ~78,000 | 0.6 ms | 3.2 ms | ~2.3 MB |
| 10 | ~71,000 | 1.2 ms | 8.5 ms | ~2.8 MB |
| 50 | ~45,000 | 6.8 ms | 42 ms | ~5.4 MB |
几个关键观察:
- 写入 ops/sec 随节点数下降:不是因为写入本身变慢(本地写入始终是 O(1)),而是 merge 频率增加,竞争锁的概率增大
- Merge 延迟随节点数线性增长:State-based merge 需要遍历对方的完整状态,10K 元素 × 每元素的 tag 集合 = 不小的遍历量
- 内存占用的主要贡献者是 tombstone:50 节点时每副本的 tombstone 集合明显膨胀,占总内存的 ~60%
- P99 在 50 节点时飙升:主要是 GC 压力和偶发的全量状态传输导致
如果你的场景超过 10 个节点,认真考虑 Op-based CRDT(delta-state CRDT 也行)——传输增量而非全量,merge 开销大幅降低。
CRDT 的魅力在于它用数学证明了一件反直觉的事:不需要任何协调,也能保证所有节点最终收敛。 这不是工程妥协,是数学定理。
但数学定理有前提条件。CRDT 的前提是:你的操作必须是单调的,你能忍受最终一致,你愿意为元数据膨胀买单。
不是所有系统都满足这些前提。但满足的那些——协同编辑、计数器、多活复制——CRDT 是比共识协议更优雅、更高可用的选择。
没有银弹。只有 trade-off。