一个 OR-Set(Observed-Remove Set)部署在 100 个节点上,每个节点维护的集合包含 10000 个元素。在 state-based CRDT 的经典设计里,每一轮反熵(anti-entropy)同步都要把整个状态发给邻居节点。单个节点的状态序列化后大约 200KB(假设每个元素连同唯一标签占 20 字节)。100 个节点两两同步一轮,网络上就要跑 100 x 99 x 200KB = 约 1.88GB 的数据。而这一轮同步的实际”新信息”可能只有几百字节——一个节点加了一个元素,另一个节点删了一个元素。
99.99% 的带宽被浪费在传输对方已经知道的信息上。
这就是 state-based CRDT 在工程落地时遇到的核心瓶颈。Operation-based CRDT(CmRDT)通过只传播操作来避免这个问题,但它要求底层通信层提供可靠的因果广播(reliable causal broadcast),这在大规模系统中本身就是一个难题。有没有一种方案,既保留 state-based CRDT 对网络层的宽松要求(只需要最终送达,不要求顺序和去重),又能把传输量降下来?
2015 年,Paulo Sergio Almeida、Ali Shoker 和 Carlos Baquero 在论文《Delta State Replicated Data Types》中给出了答案:Delta-state CRDT。核心思想很直接——不发全量状态,只发状态的增量(delta)。但要让这个思路在数学上严格成立,需要一套精确的形式化框架。要让它在工程上可用,还需要解决因果一致性、元数据膨胀和垃圾回收等一系列实际问题。
本文从 state-based CRDT 的带宽问题出发,逐步推导 delta-state CRDT 的理论基础,然后深入反熵协议设计、Merkle 树同步、元数据垃圾回收,最后分析 Riak 和 AntidoteDB 等实际系统的实现。
核心定理(Delta-state 等价性定理)
设 \((S, \sqsubseteq, \sqcup)\) 为一个 join-semilattice,\(m\) 为一个 mutator,\(m^\delta\) 为其对应的 delta-mutator,满足 \(m(s) = s \sqcup m^\delta(s)\)。若所有副本最终接收并合并了所有 delta,则 delta-state CRDT 与 full-state CRDT 收敛到相同的状态。关键区别在于:full-state 同步每次传输的消息大小为 \(O(|S|)\)(正比于完整状态),而 delta-state 同步的消息大小为 \(O(|m^\delta(s)|)\)(仅正比于本次变更),在保持相同收敛性保证的前提下将带宽开销降低了数量级。
一、State-based CRDT 的带宽问题
1.1 全量状态传输的代价
State-based CRDT(也称 CvRDT,Convergent Replicated Data Type)的工作模式是:每个节点在本地执行操作,修改本地状态;然后通过反熵协议定期将本地的完整状态发送给其他节点;接收方用一个合并函数(merge / join)将收到的状态与本地状态合并。合并函数要求是一个半格(join-semilattice)上的最小上界(least upper bound)操作:幂等、交换、结合。
这个模型的优雅之处在于它对网络的要求极低。消息可以丢失、重复、乱序,只要最终能送达,状态就会收敛。丢了就重发——反正发的是全量状态,重发不会产生错误。重复了也没关系——merge 是幂等的。乱序也无所谓——merge 是交换的。
但代价是带宽。
考虑一个 G-Counter(Grow-only Counter),部署在 n 个节点上。G-Counter 的状态是一个长度为 n 的向量,第 i 个位置存储节点 i 的本地计数。每次反熵同步需要传输整个向量。如果 n = 100,每个计数器占 8 字节,一次传输就是 800 字节。这看起来还行。但如果是一个 Map CRDT,里面嵌套了 1000 个 G-Counter 呢?那就是 800KB 一次。如果系统里有 100 个这样的 Map 呢?那就是 80MB 一次。
更严重的是 OR-Set。OR-Set 的经典实现中,每个元素关联一组唯一标签(unique tags)。添加元素时生成新标签,删除元素时移除该元素的所有可见标签。随着时间推移,标签的累积量远超集合中实际存活的元素数量。一个只有 100 个当前元素的 OR-Set,可能因为反复添加/删除操作而积累了数万个历史标签。全量传输这些元数据是巨大的浪费。
1.2 带宽消耗的量化分析
用更精确的方式量化这个问题。设系统有 n 个节点,每个节点的 CRDT 状态大小为 |S| 字节。反熵协议每 T 秒执行一轮,每轮中每个节点选择 k 个邻居发送状态(k 取决于拓扑结构,完全图中 k = n-1)。
每轮的总带宽消耗:
B_full = n * k * |S| bytes/round
对于完全图拓扑(每个节点向所有其他节点同步):
B_full = n * (n-1) * |S|
在前面的 OR-Set 例子中:n = 100,|S| = 200KB:
B_full = 100 * 99 * 200KB = 1,980,000KB ≈ 1.88GB/round
如果反熵间隔 T = 10 秒,持续带宽为 188MB/s。这个数字在一个 100 节点的集群中是不可接受的。
而同一时间段内,实际产生的新操作可能只有几十个,对应的状态变化量可能只有几 KB。全量同步的带宽利用率:
效率 = |实际变更| / |传输总量| = 几KB / 1.88GB ≈ 0.0001%
1.3 已有的优化尝试
在 delta-state CRDT 被提出之前,工程上有几种常见的优化手段:
压缩传输:用 gzip/snappy 等压缩算法压缩状态后传输。对结构化数据能压缩 3-10 倍,但不改变 O(|S|) 的量级。
降低同步频率:延长反熵间隔 T。但这直接导致数据收敛变慢,在某些场景下不可接受。
稀疏同步拓扑:不用完全图,改用稀疏拓扑(如环、树、随机图)。这可以把每个节点的同步邻居数 k 从 n-1 降到 O(log n) 或 O(1),但每个连接上仍然传输全量状态。而且稀疏拓扑增加了传播延迟——从一个节点到另一个节点的更新可能需要经过多跳才能到达。
传输差异(state diff):维护上一次发送给某个邻居的状态快照,只传输与快照的差异。这在工程上可行,但有两个问题:第一,需要为每个邻居维护一份快照,内存开销 O(n * |S|);第二,差异计算本身有开销,且没有通用的 CRDT 差异计算方法。
这些方案都没有从根本上解决问题。根本解决需要在 CRDT 的数学框架中引入增量的概念——这就是 delta-state CRDT 要做的事。
二、Delta-state CRDT 核心理论
2.1 基础概念回顾
在展开 delta-state 理论之前,先明确几个前置概念。
一个 state-based CRDT 定义为一个五元组 (S, s0, q, u, m):
- S:状态空间,构成一个 join-semilattice(加入半格(Join-semilattice))
- s0:初始状态(S 的底元素 ⊥)
- q:查询函数
- u:更新函数(mutator),u: S -> S
- m:合并函数,m(s1, s2) = s1 ⊔ s2(最小上界)
状态空间 S 上的偏序关系 ⊑ 定义为:s1 ⊑ s2 当且仅当 s1 ⊔ s2 = s2。直觉上,s1 ⊑ s2 意味着 s2 “包含”了 s1 的所有信息。
更新函数 u 必须是膨胀的(inflationary):对任意状态 s,u(s) ⊒ s。也就是说,每次操作只能让状态”增长”,不能后退。
2.2 Delta-mutator 的定义
Delta-state CRDT 的核心创新是引入了 delta-mutator(增量变更器)的概念。
在传统 state-based CRDT 中,mutator u 接受当前状态 s,返回一个新的完整状态 u(s)。在 delta-state CRDT 中,我们定义一个对应的 delta-mutator u^δ,它也接受当前状态 s,但返回的不是完整的新状态,而是一个增量(delta):
u^δ: S -> S
注意 delta 的类型仍然是 S——delta 本身也是状态空间中的一个元素。这是 delta-state 理论的关键设计决策。delta 不是某种独立的”操作日志”或”差异格式”,它就是一个(通常很小的)状态。因此,delta 可以用同样的 merge 函数与完整状态合并:
u(s) = s ⊔ u^δ(s)
也就是说,对状态 s 执行操作 u,等价于计算 delta u^δ(s),然后将 delta 合并回 s。
举一个具体的例子。G-Counter 的状态是一个向量 [c1, c2, …, cn],节点 i 执行 increment 操作的传统 mutator 是:
u_i([c1, ..., ci, ..., cn]) = [c1, ..., ci+1, ..., cn]
对应的 delta-mutator 是:
u_i^δ([c1, ..., ci, ..., cn]) = [0, ..., ci+1, ..., 0]
Delta 是一个”几乎全零”的向量,只有第 i 个位置有值。将这个 delta 与原始状态做逐位取 max(G-Counter 的 merge 就是逐位取 max),得到的结果与直接执行传统 mutator 完全相同。
2.3 Delta-group:聚合多个 delta
单个 delta 已经比全量状态小很多了。但如果节点在两次反熵同步之间执行了多次操作,我们不需要一个一个地发送 delta——可以先将它们聚合成一个 delta-group(增量组),然后一次性发送。
Delta-group 就是多个 delta 的 join(合并):
D = d1 ⊔ d2 ⊔ ... ⊔ dk
由于 join 操作是结合的和交换的,delta-group 的计算顺序无关紧要。而且 delta-group 本身也是 S 中的一个元素,所以它同样可以用 merge 函数与完整状态合并:
s' = s ⊔ D
这意味着节点可以在本地维护一个”delta 缓冲区”:每次执行操作时,将产生的 delta 加入(join)到缓冲区。在下一次反熵同步时,将整个缓冲区作为 delta-group 发送给邻居,然后清空缓冲区。
2.4 与 full-state CRDT 的等价性
Delta-state CRDT 的一个关键理论性质是:在可靠通信的条件下,delta-state 传播与 full-state 传播最终达到的状态完全相同。
这个性质的证明依赖于以下观察:
设节点 i 的状态从 s_0 开始,依次执行了操作 u1, u2, …, uk。在 full-state 模式下,最终状态为:
s_k = u_k(u_{k-1}(...u_1(s_0)...))
在 delta-state 模式下,每次操作产生一个 delta,最终状态为:
s_k = s_0 ⊔ u_1^δ(s_0) ⊔ u_2^δ(s_1) ⊔ ... ⊔ u_k^δ(s_{k-1})
由于每个操作 u_i 满足 u_i(s) = s ⊔ u_i^δ(s),并且 join 操作满足 s ⊑ u_i(s)(膨胀性),可以通过归纳法证明两种模式下的最终状态相同。
从多节点的角度看,只要所有 delta 最终都被所有节点收到并合并,所有节点的最终状态就会收敛到相同的值——与 full-state 同步的收敛结果一致。证明的关键在于 join 的幂等性和交换性保证了无论 delta 的接收顺序和重复次数如何,最终结果都相同。
两副本同步的完整推演
下面通过一个具体的两副本场景,展示 delta-state 同步的完整生命周期。
假设 G-Counter 在副本 A 和副本 B 上的初始状态均为
{A:0, B:0}。
- 副本 A 执行本地变更:A 执行
increment(),本地状态变为{A:1, B:0},同时生成 delta{A:1}。 - A 向 B 发送 delta:A 将 delta
{A:1}发送给 B,而非完整状态{A:1, B:0}。消息大小从 \(O(n)\) 降至 \(O(1)\)。 - B 合并收到的 delta:B 执行
merge(local, delta)={A:0, B:0} ⊔ {A:1}={A:1, B:0}。此时 B 的状态与 A 一致。 - B 执行本地变更:B 执行
increment(),本地状态变为{A:1, B:1},生成 delta{B:1}。 - B 向 A 发送 delta:B 将 delta
{B:1}发送给 A。 - A 合并收到的 delta:A 执行
merge(local, delta)={A:1, B:0} ⊔ {B:1}={A:1, B:1}。
最终两个副本均收敛到 {A:1, B:1},与
full-state 同步的结果完全相同。
在工程实现中,delta 不会逐个发送,而是累积为 delta-interval 后批量传输。发送方维护一个 ACK 游标,记录对端已确认接收的 delta 序号,仅发送游标之后的增量。以下时序图展示了这一完整的 delta-interval 传播协议:
sequenceDiagram
participant A as 副本 A
participant B as 副本 B
A->>A: mutate(): 执行本地变更
A->>A: 生成 delta, 追加到 DeltaBuffer
A->>A: 计算 delta-interval [ack_B+1, seq_A]
A->>B: 发送 delta-interval
B->>B: merge(local_state, delta-interval)
B-->>A: ACK(seq=latest_merged)
A->>A: 推进 ack_B 游标至 ACK.seq
B->>B: mutate(): 执行本地变更
B->>B: 生成 delta, 追加到 DeltaBuffer
B->>B: 计算 delta-interval [ack_A+1, seq_B]
B->>A: 发送 delta-interval
A->>A: merge(local_state, delta-interval)
A-->>B: ACK(seq=latest_merged)
B->>B: 推进 ack_A 游标至 ACK.seq
该时序图清晰地展示了 delta-interval 传播的核心机制:发送方根据 ACK 游标计算需要发送的增量区间,接收方合并后返回确认。ACK 机制确保了已确认的 delta 不会被重复发送,同时保留了幂等性——即使 ACK 丢失导致 delta 被重发,join 操作的幂等性保证合并结果不变。这种设计在保持 state-based CRDT 对网络层宽松要求的同时,将传输效率提升到了接近 operation-based CRDT 的水平。
2.5 Delta G-Counter 的完整实现
以下是 Delta G-Counter 的 Go 语言实现:
package crdt
// DeltaGCounter 实现了基于 delta-state 的 G-Counter。
// 状态是一个 map[nodeID]uint64,delta 也是同样的类型。
type DeltaGCounter struct {
id string
entries map[string]uint64
pending map[string]uint64 // delta 缓冲区
}
func NewDeltaGCounter(id string) *DeltaGCounter {
return &DeltaGCounter{
id: id,
entries: make(map[string]uint64),
pending: make(map[string]uint64),
}
}
// Increment 执行自增操作,同时生成 delta 并加入缓冲区。
func (c *DeltaGCounter) Increment() {
c.entries[c.id]++
// delta-mutator: 只包含本节点更新后的值
newVal := c.entries[c.id]
if existing, ok := c.pending[c.id]; !ok || newVal > existing {
c.pending[c.id] = newVal
}
}
// Value 返回计数器的当前值。
func (c *DeltaGCounter) Value() uint64 {
var sum uint64
for _, v := range c.entries {
sum += v
}
return sum
}
// Merge 将一个远程状态(可以是全量或 delta)合并到本地。
func (c *DeltaGCounter) Merge(remote map[string]uint64) {
for id, val := range remote {
if val > c.entries[id] {
c.entries[id] = val
}
}
}
// FlushDelta 取出缓冲区中的 delta-group 并清空缓冲区。
// 返回值就是要发送给邻居的增量。
func (c *DeltaGCounter) FlushDelta() map[string]uint64 {
delta := c.pending
c.pending = make(map[string]uint64)
return delta
}对比一下 full-state 模式。在 full-state
模式下,反熵同步发送的是完整的 entries
map,大小为 O(n),其中 n 是集群节点数。在 delta-state
模式下,发送的是 pending map,大小通常是
O(1)(只包含本节点自上次 flush 以来修改过的条目)。当集群有
100 个节点时,每次同步的传输量减少了约 100 倍。
2.6 Delta OR-Set
OR-Set 的 delta-state 版本更复杂,但原理相同。OR-Set 的状态可以表示为 (E, T),其中 E 是 {(element, tag)} 的集合(表示当前存在的元素),T 是所有已知 tag 的集合(包括已删除的)。但更常见的表示方法是使用 dot store(点存储)——每个”dot”是一个 (nodeID, sequenceNumber) 对,作为唯一标签。
以下是一个简化的 Delta OR-Set 伪代码:
type Dot struct {
NodeID string
Seq uint64
}
type DeltaORSet struct {
id string
nextSeq uint64
dots map[Dot]string // dot -> element
observed map[string]map[Dot]bool // element -> set of dots
pending DeltaPayload // delta 缓冲区
}
type DeltaPayload struct {
AddedDots map[Dot]string // 新增的 dot -> element 映射
RemovedDots map[Dot]bool // 被移除的 dots
}
// Add 向集合中添加一个元素。
// Delta 只包含新生成的 dot。
func (s *DeltaORSet) Add(elem string) {
dot := Dot{NodeID: s.id, Seq: s.nextSeq}
s.nextSeq++
s.dots[dot] = elem
if s.observed[elem] == nil {
s.observed[elem] = make(map[Dot]bool)
}
s.observed[elem][dot] = true
// delta: 只包含这个新 dot
s.pending.AddedDots[dot] = elem
}
// Remove 从集合中移除一个元素。
// Delta 包含被移除的所有 dots。
func (s *DeltaORSet) Remove(elem string) {
if dots, ok := s.observed[elem]; ok {
for dot := range dots {
delete(s.dots, dot)
s.pending.RemovedDots[dot] = true
}
delete(s.observed, elem)
}
}Add 操作的 delta 是一个单独的 dot(通常十几字节),而不是整个集合。Remove 操作的 delta 是被删除元素关联的所有 dot,通常也远小于全量状态。
三、Delta-interval 与因果一致性
3.1 为什么需要因果一致性
在 full-state CRDT 中,因果一致性是免费的——因为每次传输的是完整状态,包含了所有历史操作的效果。接收方只要做一次 merge 就能得到一个因果完整的状态。
但在 delta-state CRDT 中,情况不同。如果节点 A 先执行操作 op1,再执行 op2(op2 依赖 op1 的结果),A 产生了两个 delta:d1 和 d2。如果节点 B 只收到了 d2 但没收到 d1,直接将 d2 merge 到自己的状态,可能得到一个不一致的结果。
具体例子:考虑一个 2P-Set(两阶段集合),支持 add 和 remove。节点 A 先 add(“x”),产生 d1 = {add: x};然后 remove(“x”),产生 d2 = {remove: x}。如果节点 B 先收到 d2,尝试移除一个本地不存在的元素 x,这个操作不会产生效果。之后收到 d1,add(“x”),x 被加入。最终 B 的状态是 {x},但正确的状态应该是 {}。
这个问题的根源是:delta 的应用顺序可能违反因果关系。
3.2 Delta-interval 的定义
为了解决因果一致性问题,Almeida 等人引入了 delta-interval(增量区间)的概念。
节点 i 维护一个单调递增的序列号 c_i。每次执行操作时,序列号递增,并将 (c_i, delta) 关联起来。第 c 次操作产生的 delta 记为 d_i^c。
Delta-interval D_i^{a,b} 定义为节点 i 在序列号 a 到 b 之间的所有 delta 的 join:
D_i^{a,b} = d_i^a ⊔ d_i^{a+1} ⊔ ... ⊔ d_i^b
当节点 i 要向节点 j 发送增量时,它需要知道 j 上次确认收到的最大序列号 ack_j。然后发送 D_i^{ack_j+1, c_i}——即 j 还没看到的所有 delta 的聚合。
3.3 使用版本向量跟踪 delta 传播
在实践中,跟踪 delta 传播进度使用版本向量(Version Vector)。每个节点维护一个版本向量 VV,记录它已经合并过的来自每个其他节点的最大序列号。
当节点 i 向节点 j 发送 delta 时:
- 节点 i 查询 ack_map[j],得到 j 上次确认收到的序列号(初始为 0)
- 节点 i 计算 D_i^{ack_map[j]+1, c_i},发送给 j
- 节点 j 收到后,将 D 与本地状态 merge,并更新 VV[i] = c_i
- 节点 j 回复 ack,节点 i 更新 ack_map[j] = c_i
这个机制类似于 TCP 的序列号和确认机制——发送方记录接收方的进度,只发送对方缺失的部分。
以下是跟踪机制的 Go 语言实现:
type DeltaManager struct {
nodeID string
seq uint64
deltaLog []DeltaEntry // 按序列号排列的 delta 日志
ackMap map[string]uint64 // 邻居 -> 已确认的序列号
vv map[string]uint64 // 本地版本向量
}
type DeltaEntry struct {
Seq uint64
Delta interface{} // 实际的 delta 值
}
// 生成发送给 peer 的 delta-interval
func (dm *DeltaManager) DeltaIntervalFor(peer string) interface{} {
acked := dm.ackMap[peer] // 默认 0
var group interface{}
for _, entry := range dm.deltaLog {
if entry.Seq > acked {
group = joinDelta(group, entry.Delta)
}
}
return group
}
// 收到 peer 的 ack 后更新进度
func (dm *DeltaManager) OnAck(peer string, ackedSeq uint64) {
if ackedSeq > dm.ackMap[peer] {
dm.ackMap[peer] = ackedSeq
}
}3.4 因果稳定性(Causal Stability)
因果稳定性是 delta-state CRDT 中一个重要的概念,它与后文讨论的元数据垃圾回收密切相关。
一个 delta d_i^c 在节点 j 上是因果稳定的(causally stable),当且仅当集群中所有节点都已经合并了这个 delta(直接合并或通过传递性合并)。
判断因果稳定性的方法是检查所有节点的版本向量。设 min_ack = min(VV_k[i]) 对所有节点 k(包括 i 自己)。如果 c <= min_ack,那么 d_i^c 在所有节点上都是因果稳定的。
因果稳定的 delta 有两个重要用途:
delta 日志的裁剪:因果稳定的 delta 不会再被任何节点需要,可以安全地从 delta 日志中删除。这防止了 delta 日志无限增长。
元数据垃圾回收:某些 CRDT 的元数据(如 OR-Set 中的墓碑标记)在因果稳定后可以安全回收,详见第六节。
计算因果稳定性需要知道所有节点的版本向量。在实际系统中,这个信息通过 gossip 协议传播。每个节点定期将自己的版本向量广播给邻居,邻居收集所有版本向量后取逐位最小值(component-wise minimum),得到全局因果稳定点。
// 计算因果稳定点:所有节点都已经看到的最大序列号
func causalStabilityPoint(allVVs map[string]map[string]uint64, nodeID string) uint64 {
minAck := uint64(math.MaxUint64)
for _, vv := range allVVs {
if ack, ok := vv[nodeID]; ok {
if ack < minAck {
minAck = ack
}
} else {
return 0 // 有节点还没收到任何 delta
}
}
return minAck
}四、反熵协议(Anti-Entropy Protocols)
4.1 反熵的基本模式
反熵(anti-entropy)是分布式系统中用于同步副本状态的机制。这个术语来源于物理学中的”熵”——系统的无序程度。副本之间的不一致可以看作一种”熵”,反熵协议的目标就是消除这种不一致。
反熵协议有两个基本维度的设计选择:
触发方式:
- 定期反熵(Periodic Anti-Entropy):每隔固定时间间隔 T 触发一次同步。实现简单,带宽可预测,但有固定的同步延迟(最大 T 秒)。
- 事件驱动反熵(Event-driven Anti-Entropy):每次本地状态发生变化时立即触发同步。延迟低,但在写入密集的场景下可能产生大量同步消息。实际系统中通常会对事件驱动做节流(throttling),比如累积 delta 到一定大小或等待一小段时间再发送。
传播方向:
- Push:节点主动将自己的状态(或 delta)推送给邻居。适用于写入频繁的场景,因为写入方知道自己有新数据。
- Pull:节点主动从邻居拉取状态。适用于读取频繁的场景,读取方可以在需要时拉取最新数据。
- Push-Pull:双向交换。节点同时推送自己的数据并拉取对方的数据。带宽消耗更高,但收敛速度最快。
在 delta-state CRDT 的语境下,push 模式是最自然的选择:节点在执行操作后产生 delta,然后将 delta(或 delta-group)推送给邻居。Pull 模式也可行,但需要接收方告知自己的版本向量,让发送方计算出需要的 delta-interval。
4.2 反熵协议的正确性要求
一个正确的反熵协议必须满足以下性质:
最终传播(Eventual Propagation):任何一个节点上的任何一次操作,其效果最终会传播到所有节点。形式化地说,对于任何操作 op 在节点 i 上产生的 delta d,存在一个时间点 t,使得在 t 之后所有节点的状态都”包含”了 d 的效果。
对于 delta-state 的额外要求——因果传播(Causal Propagation):delta 的传播必须保证因果顺序。具体来说,如果节点 j 合并了 delta d2,而 d2 因果依赖于 d1,那么 j 必须在合并 d2 之前(或同时)合并 d1。使用 delta-interval 机制可以自然地保证这一点,因为 delta-interval 包含了连续序列号范围内的所有 delta。
容错性(Fault Tolerance):在节点崩溃和网络分区的情况下,协议仍然能最终完成同步(在故障恢复后)。
4.3 传播拓扑选择
反熵协议需要选择传播拓扑——即每个节点与哪些节点进行同步。
完全图(Full Mesh):每个节点与所有其他节点同步。收敛速度最快(一轮即可),但消息数量为 O(n^2),不适用于大规模集群。
随机选择(Random Peer Selection):每轮随机选择 k 个邻居同步。这是 gossip 协议的经典做法。Epidemic theory(流行病理论(Epidemic Theory))证明,当 k >= ln(n) + c 时(c 是一个常数),信息在 O(log n) 轮内以高概率传播到所有节点。
固定拓扑:使用固定的拓扑结构,如环(ring)、树(tree)或超立方体(hypercube)。环的消息数量最少(每个节点只与相邻两个节点同步),但传播延迟为 O(n)。树的传播延迟为 O(log n),但有单点故障问题。超立方体(n 个节点构成 log(n) 维超立方体)在消息数量 O(n log n) 和传播延迟 O(log n) 之间取得了较好的平衡。
在实际系统中,随机选择是最常用的方案,因为它在收敛速度、容错性和实现复杂度之间取得了良好的平衡。Riak 和 Cassandra 都使用随机选择的 gossip 协议进行反熵同步。
4.4 实现示例:带 delta-interval 的反熵协议
以下是一个完整的反熵协议实现框架:
type AntiEntropyNode struct {
id string
state JoinSemilattice // CRDT 状态
deltaLog []DeltaEntry // delta 日志
ackMap map[string]uint64 // peer -> 已确认序列号
seq uint64 // 本地序列号
peers []string // 邻居节点列表
interval time.Duration // 反熵间隔
}
// 定期反熵循环
func (n *AntiEntropyNode) AntiEntropyLoop() {
ticker := time.NewTicker(n.interval)
for range ticker.C {
// 随机选择一个邻居
peer := n.peers[rand.Intn(len(n.peers))]
n.pushDelta(peer)
}
}
// 向 peer 推送 delta-interval
func (n *AntiEntropyNode) pushDelta(peer string) {
acked := n.ackMap[peer]
if acked >= n.seq {
return // 该 peer 已经是最新的
}
// 构建 delta-interval: (acked, seq]
var group JoinSemilattice
for _, entry := range n.deltaLog {
if entry.Seq > acked && entry.Seq <= n.seq {
group = join(group, entry.Delta)
}
}
// 发送 delta-group 和当前序列号
send(peer, DeltaMessage{
From: n.id,
Group: group,
MaxSeq: n.seq,
})
}
// 接收远程 delta
func (n *AntiEntropyNode) OnReceiveDelta(msg DeltaMessage) {
n.state = join(n.state, msg.Group)
// 回复 ack
send(msg.From, AckMessage{
From: n.id,
AckedSeq: msg.MaxSeq,
})
}
// 收到 ack
func (n *AntiEntropyNode) OnReceiveAck(msg AckMessage) {
if msg.AckedSeq > n.ackMap[msg.From] {
n.ackMap[msg.From] = msg.AckedSeq
}
// 裁剪 delta 日志:删除所有 peer 都已确认的 delta
n.gcDeltaLog()
}
func (n *AntiEntropyNode) gcDeltaLog() {
minAcked := n.seq
for _, peer := range n.peers {
if ack := n.ackMap[peer]; ack < minAcked {
minAcked = ack
}
}
// 删除 seq <= minAcked 的 delta 条目
newLog := make([]DeltaEntry, 0)
for _, entry := range n.deltaLog {
if entry.Seq > minAcked {
newLog = append(newLog, entry)
}
}
n.deltaLog = newLog
}值得注意的是 gcDeltaLog
方法——它只在所有邻居都确认收到某个 delta
后才将其从日志中删除。如果某个邻居长时间不可达(比如节点崩溃或网络分区),delta
日志会持续增长。实际系统中需要设置日志大小上限,当日志被截断时,对应的邻居在下次恢复连接时需要做一次全量同步。这是
delta-state 相比 full-state 的一个权衡:delta-state
需要维护额外的状态(delta 日志和 ack
map),而当这些状态丢失或不足时,需要回退到全量同步。
五、Merkle 树反熵
5.1 Merkle 树在数据同步中的应用
Merkle 树(Merkle Tree)是一种哈希树(Hash Tree),由 Ralph Merkle 在 1979 年提出。它的特点是:叶节点存储数据块的哈希值,非叶节点存储其子节点哈希值的拼接的哈希。根节点的哈希值(root hash)是整棵树所有数据的”指纹”。
在分布式数据同步中,Merkle 树提供了一种高效的不一致检测机制:
- 两个节点各自为本地数据构建 Merkle 树
- 先比较根哈希。如果相同,说明数据完全一致,无需同步
- 如果根哈希不同,递归比较子树的哈希,快速定位到不一致的数据分区
- 只对不一致的分区进行数据同步
这个过程的关键优势是:比较的通信量与不一致数据的大小成正比,而不是与总数据量成正比。如果 n 个数据项中只有 k 个不一致,比较过程只需要交换 O(k * log(n/k)) 个哈希值。
5.2 Merkle 树在 Cassandra 和 Dynamo 中的应用
Amazon Dynamo 论文(2007)首先将 Merkle 树引入分布式存储的反熵同步。Cassandra 作为 Dynamo 的开源后继者,在其反熵修复(anti-entropy repair)机制中广泛使用了 Merkle 树。
Cassandra 的 Merkle 树反熵工作流程:
- 构建阶段:节点对自己负责的数据范围(token range)构建 Merkle 树。每个叶节点对应一个 token 范围的分区,存储该分区内所有数据的哈希。
- 交换阶段:两个负责相同 token 范围的副本节点交换 Merkle 树。
- 比较阶段:从根到叶逐层比较,找出哈希不同的叶节点。
- 修复阶段:只对哈希不同的分区进行数据流式传输(streaming)。
Cassandra 的 nodetool repair
命令触发这个过程。在 Cassandra 4.0
之前,这是一个资源密集的操作,因为需要读取整个数据集来构建
Merkle 树。Cassandra 4.0 引入了增量修复(Incremental
Repair),只对自上次修复以来有变更的数据构建 Merkle
树,大幅减少了修复的开销。
Merkle 树反熵的局限性:
- 构建成本:需要扫描整个数据集计算哈希。对于大数据集,这本身就是一个 I/O 密集操作。
- 静态快照:Merkle 树是在某个时间点构建的快照。在构建过程中如果有新的写入,可能导致误判。Cassandra 通过在构建前暂停 compaction 来缓解这个问题。
- 粒度问题:叶节点的粒度决定了修复的精度。粒度太粗,可能导致不必要的数据传输;粒度太细,Merkle 树本身的传输开销变大。
5.3 Merkle 树与 delta-state 的结合
Merkle 树和 delta-state CRDT 可以互补:
正常情况下用 delta:在网络连通、节点正常的情况下,使用 delta-interval 传播增量。这是最高效的同步方式。
异常恢复用 Merkle 树:当节点崩溃恢复后,或者因为 delta 日志被截断导致无法增量同步时,使用 Merkle 树快速定位不一致的数据,然后只对这些数据进行全量同步。
混合策略:某些系统(如 Riak)同时运行两套反熵机制。delta 传播处理正常操作流,Merkle 树反熵作为”安全网”定期运行,捕获任何 delta 传播可能遗漏的不一致。
这种混合策略在实际系统中非常实用。Delta 传播是”最优路径”——在一切正常的情况下提供低延迟、低带宽的同步。Merkle 树反熵是”兜底路径”——在各种异常情况(节点崩溃、delta 日志丢失、网络长时间分区)下保证数据最终一致。
%% 伪代码:混合反熵策略
-module(hybrid_anti_entropy).
%% 正常路径:delta 传播(高频,低开销)
handle_local_op(Op, State) ->
{NewState, Delta} = apply_delta_mutator(Op, State),
buffer_delta(Delta),
schedule_delta_push(short_interval()),
NewState.
%% 定期 delta 推送
handle_delta_push(Peer, DeltaBuffer, AckMap) ->
Acked = maps:get(Peer, AckMap, 0),
Interval = compute_delta_interval(DeltaBuffer, Acked),
case Interval of
empty -> ok;
_ -> send_delta(Peer, Interval)
end.
%% 兜底路径:Merkle 树反熵(低频,高开销)
handle_merkle_repair(Peer, LocalData) ->
LocalTree = build_merkle_tree(LocalData),
RemoteTree = request_merkle_tree(Peer),
DiffRanges = compare_trees(LocalTree, RemoteTree),
lists:foreach(fun(Range) ->
repair_range(Peer, Range)
end, DiffRanges).下图展示了 Merkle 树反熵对账的完整流程。两个副本从根哈希开始逐层比较,快速定位不一致的子树,最终仅交换差异部分的 delta,避免了全量扫描:
flowchart TD
A["比较根节点哈希"] -->|哈希相同| B["状态一致,无需同步"]
A -->|哈希不同| C["递归比较子节点哈希"]
C --> D["左子树哈希相同"]
C --> E["右子树哈希不同"]
D --> F["跳过左子树"]
E --> G["继续向下钻取右子树"]
G --> H["定位到叶子节点层的差异 key 范围"]
H --> I["仅交换差异范围内的 delta"]
I --> J["接收方合并 delta"]
J --> K["更新本地 Merkle 树哈希"]
Merkle 树对账的核心优势在于将差异定位的时间复杂度从 \(O(n)\) 降低到 \(O(\log n)\),其中 \(n\) 为数据条目总数。通过逐层哈希比较,两个副本可以在 \(O(\log n)\) 轮交互中精确定位所有不一致的 key 范围,然后仅针对这些范围交换 delta。这种机制特别适合作为 delta-interval 协议的兜底方案——当 delta buffer 因容量限制被截断或节点重启导致 ACK 游标丢失时,Merkle 树反熵可以高效地恢复一致性,而无需回退到全量状态传输。
六、元数据垃圾回收
6.1 CRDT 元数据膨胀问题
CRDT 的正确性依赖于某些元数据,而这些元数据有一个共同特点:它们只增不减。
向量时钟 / 版本向量:大小随参与过的节点数线性增长。如果系统中的节点 ID 是动态分配的(比如容器化环境中,每次重启分配新 ID),版本向量会无限膨胀。
OR-Set 的唯一标签:每次 add 操作都生成新标签。即使元素被删除,标签仍然保留在”墓碑”集合中(用于防止已删除的元素被因果上更早的 add 操作”复活”)。大量 add/remove 操作后,墓碑数量可能远超存活元素数量。
LWW-Register(Last-Writer-Wins Register)的时间戳历史:虽然只需要保留最新的时间戳,但在某些实现中需要保留历史时间戳来处理因果上的并发写入。
Map CRDT 的嵌套元数据:Map 中每个 key 对应的 value 本身是一个 CRDT,每个 value 都有自己的元数据。Map 越大,元数据总量越大。
元数据膨胀的实际影响:
- 状态大小持续增长,增加存储和内存开销
- 序列化/反序列化时间增加
- 即使使用 delta-state,被动接收的 delta 仍然需要与本地状态 merge,而 merge 的时间复杂度与状态大小相关
- 在存储和网络资源有限的设备(如移动端、IoT)上尤其严重
6.2 因果稳定性条件下的垃圾回收
元数据垃圾回收(Garbage Collection,GC)的核心难点在于:过早回收可能导致语义错误。
考虑 OR-Set 的墓碑回收。元素 x 被节点 A 删除后,A 保留了 x 的墓碑 {(x, tag1), (x, tag2)}。如果 A 在其他所有节点收到这个删除信息之前就回收了墓碑,那么:
- 节点 B 之前 add(“x”) 产生的 (x, tag1) 还没传到节点 C
- A 回收了墓碑
- B 的 add 传播到 C,C 看到 (x, tag1),因为没有对应的墓碑信息,C 认为 x 是存活的
- x 被错误地”复活”了
因果稳定性提供了安全回收的条件:只有当一个操作(以及它产生的元数据)在所有节点上都已经被观察到,它产生的元数据才可以安全回收。
def can_gc_tombstone(tombstone, all_version_vectors, origin_node, origin_seq):
"""判断一个墓碑是否可以安全回收。
只有当所有节点都已经观察到这个墓碑对应的操作时,
才可以安全回收。
"""
for node_id, vv in all_version_vectors.items():
if vv.get(origin_node, 0) < origin_seq:
return False # 还有节点没看到这个操作
return True
def gc_tombstones(local_tombstones, all_version_vectors):
"""回收所有因果稳定的墓碑。"""
to_remove = []
for ts in local_tombstones:
if can_gc_tombstone(ts, all_version_vectors,
ts.origin_node, ts.origin_seq):
to_remove.append(ts)
for ts in to_remove:
local_tombstones.remove(ts)
return len(to_remove)6.3 基于 Epoch 的垃圾回收
因果稳定性条件下的 GC 需要知道所有节点的版本向量,这在大规模系统中本身是一个挑战。一种替代方案是基于 epoch(纪元)的 GC。
基本思路:
- 系统维护一个全局 epoch 编号。每个 epoch 持续一段固定时间(如 1 小时)。
- 每个操作都标记它所属的 epoch。
- 当一个 epoch 结束时,系统通过一个协调协议(如 two-phase commit)确认所有节点都已经进入了新 epoch,并且旧 epoch 的所有操作都已经同步完成。
- 确认后,旧 epoch 中的元数据可以安全回收。
class EpochBasedGC:
def __init__(self, node_id, epoch_duration_sec=3600):
self.node_id = node_id
self.current_epoch = 0
self.epoch_duration = epoch_duration_sec
self.metadata_by_epoch = {} # epoch -> [metadata]
self.confirmed_epochs = set() # 已确认同步完成的 epoch
def tag_metadata(self, metadata):
"""为元数据打上当前 epoch 标签。"""
epoch = self.current_epoch
if epoch not in self.metadata_by_epoch:
self.metadata_by_epoch[epoch] = []
self.metadata_by_epoch[epoch].append(metadata)
return epoch
def confirm_epoch(self, epoch):
"""协调者确认某个 epoch 的所有操作已同步。"""
self.confirmed_epochs.add(epoch)
def gc(self):
"""回收已确认 epoch 的元数据。"""
reclaimed = 0
for epoch in list(self.metadata_by_epoch.keys()):
if epoch in self.confirmed_epochs:
reclaimed += len(self.metadata_by_epoch[epoch])
del self.metadata_by_epoch[epoch]
return reclaimedEpoch-based GC 的优势是不需要逐操作跟踪因果稳定性,只需要在 epoch 粒度上做确认。缺点是 GC 的粒度较粗(一个 epoch 的所有元数据要么全部回收,要么全部保留),且需要一个协调者来确认 epoch 完成。
6.4 GC 与正确性的权衡
在实际系统中,元数据 GC 往往需要在正确性和资源消耗之间做权衡。
保守策略:严格按照因果稳定性条件回收。优点是绝对安全,不会破坏 CRDT 语义。缺点是如果有节点长时间不可达,因果稳定性条件一直不满足,元数据就无法回收。极端情况下(永久离线的节点),元数据永远不会被回收。
激进策略:设置一个时间阈值(如 7 天),超过阈值的元数据无条件回收。优点是保证元数据大小有上界。缺点是如果有节点超过 7 天才恢复,可能观察到语义错误。实际系统中通常结合节点成员变更协议——如果一个节点超过阈值没有响应,将其从集群中移除,回收与其相关的元数据。
Riak 的做法:Riak 的 CRDT 实现采用了一种务实的方案。OR-Set 使用”dot”((nodeID, seq))作为唯一标签,而不是传统的 UUID。这使得元数据天然可以用版本向量来追踪和回收。当一个 dot 被所有节点的版本向量覆盖时,与该 dot 相关的元数据就可以安全回收。Riak 在每次合并时顺便做 GC,避免了单独的 GC 阶段。
下图展示了因果稳定性驱动的 GC 状态机,描述了一个 delta 从产生到最终被安全回收的完整生命周期:
stateDiagram-v2
[*] --> DeltaAccumulated: 本地变更产生 delta
DeltaAccumulated --> CausallyStable: 所有副本已确认接收\n(ACK 覆盖该 delta 的序号)
DeltaAccumulated --> DeltaAccumulated: 仍有副本未确认
CausallyStable --> SafeToGC: 确认无迟到消息依赖该 delta\n(版本向量全局推进)
CausallyStable --> CausallyStable: 存在潜在的迟到消息
SafeToGC --> Compacted: 执行压缩,回收元数据
Compacted --> [*]
该状态机的关键在于”因果稳定”到”可安全回收”之间的过渡条件。一个 delta 被所有副本 ACK 并不立即意味着可以回收——还需要确保不存在尚未送达的、因果上依赖该 delta 的迟到消息。只有当全局版本向量推进到覆盖该 delta 的因果上下文时,才能安全地将其标记为可回收。在实际系统中,“Compacted”阶段通常与 merge 操作合并执行(如 Riak 的做法),而非作为独立的后台任务,以避免额外的锁竞争和 I/O 开销。
七、Riak delta-mutation 实现分析
7.1 Riak 2.x 的 CRDT 实现
Riak 是 Basho Technologies 开发的分布式键值存储,基于 Dynamo 论文设计。Riak 从 2.0 版本开始内置了 CRDT 支持(称为 Riak Data Types),包括 Counter、Set、Map、Register 和 Flag 五种数据类型。
Riak 的 CRDT 实现基于 riak_dt 这个 Erlang
库,后来演化为 riak_crdt。这个库实现了多种
CRDT,其中最核心的是基于 dot 的 OR-Set(在 Riak 中称为
riak_dt_orswot——Observed-Remove Set Without
Tombstones)。
“Without Tombstones”是这个实现的亮点。传统 OR-Set 需要维护一个墓碑集合来记录已删除的元素标签。Riak 的 ORSWOT 通过一个巧妙的设计避免了墓碑:它使用一个全局的版本向量(称为 “clock”)来记录已经观察到的所有 dot。删除操作不需要显式记录墓碑,而是通过版本向量隐式表示——如果一个 dot 被版本向量覆盖(即 dot 的序列号小于等于版本向量中对应节点的序列号),但不在 entries 集合中,那么这个 dot 对应的元素就是已被删除的。
7.2 从全量同步到 delta-mutation
Riak 2.0 最初使用全量状态同步。每次 CRDT 值被读取或写入时,Riak 都会在副本之间传输完整的 CRDT 状态。对于大型 Map(嵌套多层 CRDT),这意味着每次操作都传输大量数据。
Riak 2.2 引入了 delta-mutation(在 Riak 的术语中也称为
“CRDT delta-mutations” 或 “dot-deltas”)。核心改动是在
riak_dt 库的每个数据类型中添加了 delta
计算逻辑。
以 ORSWOT(OR-Set)为例,Riak 的 delta 实现遵循以下原则:
%% riak_dt_orswot.erl 中的核心数据结构
%% ORSWOT 状态由三部分组成:
%% - Clock: 版本向量,记录已观察到的所有 dot
%% - Entries: [{Element, [Dot]}],当前存活的元素及其 dot
%% - Deferred: 延迟的操作(处理因果不一致)
-record(orswot, {
clock = riak_dt_vclock:new() :: riak_dt_vclock:vclock(),
entries = orddict:new() :: orddict:orddict(),
deferred = orddict:new() :: orddict:orddict()
}).
%% Add 操作的 delta-mutator
%% 输入: 当前状态、要添加的元素、操作者 ID
%% 输出: delta(一个最小化的 ORSWOT 状态)
delta_add(Element, Actor, #orswot{clock = Clock}) ->
NewSeq = riak_dt_vclock:get_counter(Actor, Clock) + 1,
NewDot = {Actor, NewSeq},
%% Delta 只包含新 dot 和最小必要的 clock 信息
#orswot{
clock = riak_dt_vclock:merge(
[{Actor, NewSeq}], []),
entries = orddict:store(Element, [NewDot], orddict:new()),
deferred = orddict:new()
}.
%% Remove 操作的 delta-mutator
%% 输入: 当前状态、要删除的元素
%% 输出: delta
delta_remove(Element, #orswot{entries = Entries, clock = Clock}) ->
case orddict:find(Element, Entries) of
{ok, Dots} ->
%% Delta 包含被移除的 dots 信息
%% 通过只包含 clock(覆盖这些 dots)但不包含 entries 来表示删除
DotClock = lists:foldl(
fun({Actor, Seq}, Acc) ->
riak_dt_vclock:merge([{Actor, Seq}], Acc)
end, riak_dt_vclock:new(), Dots),
#orswot{
clock = DotClock,
entries = orddict:new(), % 空 entries 表示这些 dots 被删除
deferred = orddict:new()
};
error ->
%% 元素不存在,空 delta
#orswot{}
end.这段代码展示了 delta-mutator 的核心设计:add 操作的 delta 只包含新生成的 dot 和对应的元素;remove 操作的 delta 只包含被移除 dot 的版本信息。两种情况下,delta 的大小都与操作涉及的数据量成正比,而不是与整个集合大小成正比。
7.3 合并逻辑
Riak 的 ORSWOT merge 函数同时处理全量状态和 delta 的合并(因为 delta 本身就是一个合法的 ORSWOT 状态):
%% 合并两个 ORSWOT 状态
%% 核心逻辑:
%% 1. 合并 clock(版本向量取逐位 max)
%% 2. 对于 entries,只保留那些 dot 不被对方 clock 覆盖的元素
%% (被对方 clock 覆盖但不在对方 entries 中的 dot = 已被对方删除)
merge(#orswot{clock = CA, entries = EA},
#orswot{clock = CB, entries = EB}) ->
MergedClock = riak_dt_vclock:merge(CA, CB),
%% 保留 A 的 entries 中不被 B 的 clock 覆盖的 dots
%% 加上 B 的 entries 中不被 A 的 clock 覆盖的 dots
%% 加上两边都有的 dots(取并集)
MergedEntries = merge_entries(EA, CA, EB, CB),
#orswot{
clock = MergedClock,
entries = MergedEntries,
deferred = orddict:new()
}.
merge_entries(EA, CA, EB, CB) ->
%% 对每个元素:
%% - 如果只在 A 中:保留 A 中不被 CB 覆盖的 dots
%% - 如果只在 B 中:保留 B 中不被 CA 覆盖的 dots
%% - 如果两边都有:合并两边的 dots,去除被对方 clock 覆盖的
AllKeys = orddict:fetch_keys(EA) ++ orddict:fetch_keys(EB),
UniqueKeys = lists:usort(AllKeys),
lists:foldl(fun(Key, Acc) ->
DotsA = case orddict:find(Key, EA) of
{ok, DA} -> [D || D = {Actor, Seq} <- DA,
not riak_dt_vclock:descends(CB, [{Actor, Seq}])
orelse orddict:is_key(Key, EB)];
error -> []
end,
DotsB = case orddict:find(Key, EB) of
{ok, DB} -> [D || D = {Actor, Seq} <- DB,
not riak_dt_vclock:descends(CA, [{Actor, Seq}])
orelse orddict:is_key(Key, EA)];
error -> []
end,
MergedDots = lists:usort(DotsA ++ DotsB),
case MergedDots of
[] -> Acc;
_ -> orddict:store(Key, MergedDots, Acc)
end
end, orddict:new(), UniqueKeys).合并逻辑的精妙之处在于它同时实现了 add-wins 语义和隐式墓碑回收。一个 dot 如果被对方的 clock 覆盖但不在对方的 entries 中,说明对方已经删除了这个元素——不需要显式墓碑。
7.4 性能改进
Riak 官方博客和社区基准测试显示,delta-mutation 带来了显著的性能改善:
- 网络带宽:对于包含 1000 个元素的 Set,单次同步的传输量从约 50KB(全量状态)降低到几百字节(delta),减少了约 99%。
- CPU 开销:merge 操作的 CPU 消耗与 delta 大小成正比。delta 模式下单次 merge 的 CPU 时间显著缩短。
- 同步延迟:由于传输量减少,网络延迟的影响被放大。delta 模式下同步延迟更低。
- 内存占用:需要额外维护 delta 日志,但日志通常远小于全量状态。在正常运行条件下,delta 日志的大小是可控的。
需要指出的一个权衡:delta 模式增加了实现复杂度。每种 CRDT 数据类型都需要实现对应的 delta-mutator,并且 delta-mutator 的正确性需要单独验证(必须满足 u(s) = s ⊔ u^δ(s))。Riak 团队为此编写了大量的 property-based test(基于性质的测试),使用 Erlang 的 PropEr 框架验证 delta-mutator 和 merge 函数的正确性。
八、实际系统中的应用
8.1 AntidoteDB
AntidoteDB 是一个由欧盟 SyncFree 和 LightKone 研究项目资助开发的分布式数据库,专门为地理分布式场景设计。它的核心特性是提供因果一致性(Causal Consistency)保证,同时支持高可用的 CRDT 操作。
AntidoteDB 的 CRDT 层大量使用了 delta-state 技术:
数据中心内同步:AntidoteDB 在同一数据中心内的副本之间使用基于 delta 的同步。每个数据中心维护一个分区日志(partition log),记录所有操作。副本之间通过交换日志条目(本质上是 delta)来同步。
跨数据中心同步:跨数据中心的同步使用因果一致的广播协议。AntidoteDB 维护了一个全局的向量时钟(每个数据中心一个条目),保证跨数据中心的操作按因果顺序传播。这与 delta-interval 的思想一致——只传输对方还没看到的操作。
AntidoteDB 的一个关键创新是”Cure”协议(发表于 ICDCS 2016)。Cure 提供了跨数据中心的因果一致性保证,同时保持了高可用性。它使用一种优化的依赖跟踪机制,不需要为每个操作维护完整的向量时钟,而是使用”稳定向量”(stable vector)来批量处理因果依赖。这种批量处理的思想与 delta-group 的聚合思想异曲同工。
AntidoteDB 在元数据管理方面也做了很多工作。它支持”因果稳定性”驱动的自动 GC——当一个操作的效果在所有数据中心都可见后,相关的元数据(如墓碑、版本向量条目)可以自动回收。
8.2 Redis CRDB(Active-Active)
Redis Enterprise 的 Active-Active 功能(前身为 Redis CRDB,Conflict-free Replicated Database)提供了跨地域的多主复制。底层使用 CRDT 来解决冲突。
Redis CRDB 支持的 CRDT 数据类型包括:
- String:LWW-Register 语义
- Counter:PN-Counter,支持增减
- Set:OR-Set 语义,add-wins
- Sorted Set:扩展的 OR-Set,带有 LWW score
- Hash:per-field LWW-Register
- List:基于 LWW 的合并
Redis CRDB 在同步优化上采用了与 delta-state 相似的策略:
操作日志传播:Redis CRDB 不传输完整的数据快照,而是传输操作日志(effect log)。每个写操作产生一个 effect(效果),effect 被复制到其他数据中心。这类似于 operation-based CRDT,但 effect 是幂等的——可以重复应用而不改变结果。这种”基于效果的传播”(effect-based replication)可以看作 delta-state 和 operation-based CRDT 的混合体。
压缩与批量传输:多个 effect 在传输前会被压缩和批量化。连续的 counter increment 操作会被合并为一个净增量,连续的 set add 操作会被合并为一次批量添加。这与 delta-group 的聚合思想一致。
基于 CRDT 的冲突解决:当两个数据中心对同一个 key 做了并发修改时,Redis CRDB 使用 CRDT 的合并语义来解决冲突。对于 Counter,合并是两个 PN-Counter 的逐节点 max。对于 Set,合并是 add-wins 的 OR-Set 合并。对于 String,合并是 LWW——时间戳更大的值胜出。
8.3 对比分析
| 特性 | Riak (delta-mutation) | AntidoteDB | Redis CRDB |
|---|---|---|---|
| 同步粒度 | Delta-state | Delta + 操作日志 | Effect(幂等操作) |
| 因果一致性 | 最终一致 | 因果一致(Cure 协议) | 最终一致 + LWW |
| GC 策略 | 隐式(通过 dot+VV) | 因果稳定性驱动 | 操作日志截断 |
| 元数据开销 | 低(无墓碑设计) | 中等(维护因果信息) | 低(LWW 为主) |
| 跨 DC 同步 | Merkle 树 + delta | Cure 协议 | 操作日志复制 |
| CRDT 类型丰富度 | Counter, Set, Map, Register, Flag | Counter, Set, Map, Register, MV-Register | String, Counter, Set, Sorted Set, Hash, List |
三个系统代表了 delta-state CRDT 在工程实践中的三种路线:
Riak 走的是纯 state-based 路线,通过 delta-mutation 优化带宽。它的 ORSWOT(无墓碑 OR-Set)设计在元数据管理上非常精巧。代价是实现复杂度高——每种数据类型都需要精心设计 delta-mutator 和 merge 函数。
AntidoteDB 走的是学术研究路线,将因果一致性作为核心保证。它的 Cure 协议在理论上更强(提供因果一致性而不仅仅是最终一致性),但实现和运维复杂度也更高。
Redis CRDB 走的是工程实用路线。它没有严格按照 delta-state CRDT 的学术框架来实现,而是结合了 operation-based 和 state-based 的思想,用”幂等效果”作为同步单元。这种设计在 Redis 的单线程模型下实现起来更自然,也更容易与 Redis 原有的 AOF 持久化机制集成。
参考文献
Almeida, P. S., Shoker, A., & Baquero, C. (2018). Delta State Replicated Data Types. Journal of Parallel and Distributed Computing, 111, 162-173. https://arxiv.org/abs/1603.01529
Almeida, P. S., Shoker, A., & Baquero, C. (2015). Efficient State-based CRDTs by Delta-Mutation. In Proceedings of the International Conference on Networked Systems (NETYS 2015). https://arxiv.org/abs/1410.2803
van der Linde, A., Leitao, J., & Preguica, N. (2016). Delta-CRDTs: Making delta-CRDTs Delta-Based. In Proceedings of the 2nd Workshop on the Principles and Practice of Consistency for Distributed Data (PaPoC 2016). https://doi.org/10.1145/2911151.2911163
Shapiro, M., Preguica, N., Baquero, C., & Zawirski, M. (2011). A comprehensive study of Convergent and Commutative Replicated Data Types. INRIA Technical Report RR-7506. https://hal.inria.fr/inria-00555588
Akkoorath, D. D., et al. (2016). Cure: Strong Semantics Meets High Availability and Low Latency. In Proceedings of the 36th IEEE International Conference on Distributed Computing Systems (ICDCS 2016). https://doi.org/10.1109/ICDCS.2016.98
DeCandia, G., et al. (2007). Dynamo: Amazon’s Highly Available Key-value Store. In Proceedings of the 21st ACM Symposium on Operating Systems Principles (SOSP 2007). https://doi.org/10.1145/1294261.1294281
Basho Technologies. Riak KV Data Types documentation. https://docs.riak.com/riak/kv/latest/developing/data-types/
Redis Ltd. Redis Active-Active Geo-Distribution (CRDB). https://redis.io/docs/latest/operate/rs/databases/active-active/
Enes, V., Baquero, C., Rezende, T. F., Gotsman, A., Perrin, M., & Sutra, P. (2019). State-based CRDTs with δ-mutation: A Systematic Approach. arXiv preprint arXiv:1904.00450.
Merkle, R. C. (1979). A Certified Digital Signature. In Proceedings of CRYPTO 1989. https://doi.org/10.1007/0-387-34805-0_21
上一篇:CRDT 在协同编辑中的应用 下一篇:MapReduce