土法炼钢兴趣小组的算法知识备份

【分布式系统百科】Delta-state CRDT 与反熵优化:从全量同步到增量传播

目录

一个 OR-Set(Observed-Remove Set)部署在 100 个节点上,每个节点维护的集合包含 10000 个元素。在 state-based CRDT 的经典设计里,每一轮反熵(anti-entropy)同步都要把整个状态发给邻居节点。单个节点的状态序列化后大约 200KB(假设每个元素连同唯一标签占 20 字节)。100 个节点两两同步一轮,网络上就要跑 100 x 99 x 200KB = 约 1.88GB 的数据。而这一轮同步的实际”新信息”可能只有几百字节——一个节点加了一个元素,另一个节点删了一个元素。

99.99% 的带宽被浪费在传输对方已经知道的信息上。

这就是 state-based CRDT 在工程落地时遇到的核心瓶颈。Operation-based CRDT(CmRDT)通过只传播操作来避免这个问题,但它要求底层通信层提供可靠的因果广播(reliable causal broadcast),这在大规模系统中本身就是一个难题。有没有一种方案,既保留 state-based CRDT 对网络层的宽松要求(只需要最终送达,不要求顺序和去重),又能把传输量降下来?

2015 年,Paulo Sergio Almeida、Ali Shoker 和 Carlos Baquero 在论文《Delta State Replicated Data Types》中给出了答案:Delta-state CRDT。核心思想很直接——不发全量状态,只发状态的增量(delta)。但要让这个思路在数学上严格成立,需要一套精确的形式化框架。要让它在工程上可用,还需要解决因果一致性、元数据膨胀和垃圾回收等一系列实际问题。

本文从 state-based CRDT 的带宽问题出发,逐步推导 delta-state CRDT 的理论基础,然后深入反熵协议设计、Merkle 树同步、元数据垃圾回收,最后分析 Riak 和 AntidoteDB 等实际系统的实现。

核心定理(Delta-state 等价性定理)

\((S, \sqsubseteq, \sqcup)\) 为一个 join-semilattice,\(m\) 为一个 mutator,\(m^\delta\) 为其对应的 delta-mutator,满足 \(m(s) = s \sqcup m^\delta(s)\)。若所有副本最终接收并合并了所有 delta,则 delta-state CRDT 与 full-state CRDT 收敛到相同的状态。关键区别在于:full-state 同步每次传输的消息大小为 \(O(|S|)\)(正比于完整状态),而 delta-state 同步的消息大小为 \(O(|m^\delta(s)|)\)(仅正比于本次变更),在保持相同收敛性保证的前提下将带宽开销降低了数量级。

Delta-state 传播示意图

一、State-based CRDT 的带宽问题

1.1 全量状态传输的代价

State-based CRDT(也称 CvRDT,Convergent Replicated Data Type)的工作模式是:每个节点在本地执行操作,修改本地状态;然后通过反熵协议定期将本地的完整状态发送给其他节点;接收方用一个合并函数(merge / join)将收到的状态与本地状态合并。合并函数要求是一个半格(join-semilattice)上的最小上界(least upper bound)操作:幂等、交换、结合。

这个模型的优雅之处在于它对网络的要求极低。消息可以丢失、重复、乱序,只要最终能送达,状态就会收敛。丢了就重发——反正发的是全量状态,重发不会产生错误。重复了也没关系——merge 是幂等的。乱序也无所谓——merge 是交换的。

但代价是带宽。

考虑一个 G-Counter(Grow-only Counter),部署在 n 个节点上。G-Counter 的状态是一个长度为 n 的向量,第 i 个位置存储节点 i 的本地计数。每次反熵同步需要传输整个向量。如果 n = 100,每个计数器占 8 字节,一次传输就是 800 字节。这看起来还行。但如果是一个 Map CRDT,里面嵌套了 1000 个 G-Counter 呢?那就是 800KB 一次。如果系统里有 100 个这样的 Map 呢?那就是 80MB 一次。

更严重的是 OR-Set。OR-Set 的经典实现中,每个元素关联一组唯一标签(unique tags)。添加元素时生成新标签,删除元素时移除该元素的所有可见标签。随着时间推移,标签的累积量远超集合中实际存活的元素数量。一个只有 100 个当前元素的 OR-Set,可能因为反复添加/删除操作而积累了数万个历史标签。全量传输这些元数据是巨大的浪费。

1.2 带宽消耗的量化分析

用更精确的方式量化这个问题。设系统有 n 个节点,每个节点的 CRDT 状态大小为 |S| 字节。反熵协议每 T 秒执行一轮,每轮中每个节点选择 k 个邻居发送状态(k 取决于拓扑结构,完全图中 k = n-1)。

每轮的总带宽消耗:

B_full = n * k * |S| bytes/round

对于完全图拓扑(每个节点向所有其他节点同步):

B_full = n * (n-1) * |S|

在前面的 OR-Set 例子中:n = 100,|S| = 200KB:

B_full = 100 * 99 * 200KB = 1,980,000KB ≈ 1.88GB/round

如果反熵间隔 T = 10 秒,持续带宽为 188MB/s。这个数字在一个 100 节点的集群中是不可接受的。

而同一时间段内,实际产生的新操作可能只有几十个,对应的状态变化量可能只有几 KB。全量同步的带宽利用率:

效率 = |实际变更| / |传输总量| = 几KB / 1.88GB ≈ 0.0001%

1.3 已有的优化尝试

在 delta-state CRDT 被提出之前,工程上有几种常见的优化手段:

压缩传输:用 gzip/snappy 等压缩算法压缩状态后传输。对结构化数据能压缩 3-10 倍,但不改变 O(|S|) 的量级。

降低同步频率:延长反熵间隔 T。但这直接导致数据收敛变慢,在某些场景下不可接受。

稀疏同步拓扑:不用完全图,改用稀疏拓扑(如环、树、随机图)。这可以把每个节点的同步邻居数 k 从 n-1 降到 O(log n) 或 O(1),但每个连接上仍然传输全量状态。而且稀疏拓扑增加了传播延迟——从一个节点到另一个节点的更新可能需要经过多跳才能到达。

传输差异(state diff):维护上一次发送给某个邻居的状态快照,只传输与快照的差异。这在工程上可行,但有两个问题:第一,需要为每个邻居维护一份快照,内存开销 O(n * |S|);第二,差异计算本身有开销,且没有通用的 CRDT 差异计算方法。

这些方案都没有从根本上解决问题。根本解决需要在 CRDT 的数学框架中引入增量的概念——这就是 delta-state CRDT 要做的事。

二、Delta-state CRDT 核心理论

2.1 基础概念回顾

在展开 delta-state 理论之前,先明确几个前置概念。

一个 state-based CRDT 定义为一个五元组 (S, s0, q, u, m):

状态空间 S 上的偏序关系 ⊑ 定义为:s1 ⊑ s2 当且仅当 s1 ⊔ s2 = s2。直觉上,s1 ⊑ s2 意味着 s2 “包含”了 s1 的所有信息。

更新函数 u 必须是膨胀的(inflationary):对任意状态 s,u(s) ⊒ s。也就是说,每次操作只能让状态”增长”,不能后退。

2.2 Delta-mutator 的定义

Delta-state CRDT 的核心创新是引入了 delta-mutator(增量变更器)的概念。

在传统 state-based CRDT 中,mutator u 接受当前状态 s,返回一个新的完整状态 u(s)。在 delta-state CRDT 中,我们定义一个对应的 delta-mutator u^δ,它也接受当前状态 s,但返回的不是完整的新状态,而是一个增量(delta):

u^δ: S -> S

注意 delta 的类型仍然是 S——delta 本身也是状态空间中的一个元素。这是 delta-state 理论的关键设计决策。delta 不是某种独立的”操作日志”或”差异格式”,它就是一个(通常很小的)状态。因此,delta 可以用同样的 merge 函数与完整状态合并:

u(s) = s ⊔ u^δ(s)

也就是说,对状态 s 执行操作 u,等价于计算 delta u^δ(s),然后将 delta 合并回 s。

举一个具体的例子。G-Counter 的状态是一个向量 [c1, c2, …, cn],节点 i 执行 increment 操作的传统 mutator 是:

u_i([c1, ..., ci, ..., cn]) = [c1, ..., ci+1, ..., cn]

对应的 delta-mutator 是:

u_i^δ([c1, ..., ci, ..., cn]) = [0, ..., ci+1, ..., 0]

Delta 是一个”几乎全零”的向量,只有第 i 个位置有值。将这个 delta 与原始状态做逐位取 max(G-Counter 的 merge 就是逐位取 max),得到的结果与直接执行传统 mutator 完全相同。

2.3 Delta-group:聚合多个 delta

单个 delta 已经比全量状态小很多了。但如果节点在两次反熵同步之间执行了多次操作,我们不需要一个一个地发送 delta——可以先将它们聚合成一个 delta-group(增量组),然后一次性发送。

Delta-group 就是多个 delta 的 join(合并):

D = d1 ⊔ d2 ⊔ ... ⊔ dk

由于 join 操作是结合的和交换的,delta-group 的计算顺序无关紧要。而且 delta-group 本身也是 S 中的一个元素,所以它同样可以用 merge 函数与完整状态合并:

s' = s ⊔ D

这意味着节点可以在本地维护一个”delta 缓冲区”:每次执行操作时,将产生的 delta 加入(join)到缓冲区。在下一次反熵同步时,将整个缓冲区作为 delta-group 发送给邻居,然后清空缓冲区。

2.4 与 full-state CRDT 的等价性

Delta-state CRDT 的一个关键理论性质是:在可靠通信的条件下,delta-state 传播与 full-state 传播最终达到的状态完全相同。

这个性质的证明依赖于以下观察:

设节点 i 的状态从 s_0 开始,依次执行了操作 u1, u2, …, uk。在 full-state 模式下,最终状态为:

s_k = u_k(u_{k-1}(...u_1(s_0)...))

在 delta-state 模式下,每次操作产生一个 delta,最终状态为:

s_k = s_0 ⊔ u_1^δ(s_0) ⊔ u_2^δ(s_1) ⊔ ... ⊔ u_k^δ(s_{k-1})

由于每个操作 u_i 满足 u_i(s) = s ⊔ u_i^δ(s),并且 join 操作满足 s ⊑ u_i(s)(膨胀性),可以通过归纳法证明两种模式下的最终状态相同。

从多节点的角度看,只要所有 delta 最终都被所有节点收到并合并,所有节点的最终状态就会收敛到相同的值——与 full-state 同步的收敛结果一致。证明的关键在于 join 的幂等性和交换性保证了无论 delta 的接收顺序和重复次数如何,最终结果都相同。

两副本同步的完整推演

下面通过一个具体的两副本场景,展示 delta-state 同步的完整生命周期。

假设 G-Counter 在副本 A 和副本 B 上的初始状态均为 {A:0, B:0}

  1. 副本 A 执行本地变更:A 执行 increment(),本地状态变为 {A:1, B:0},同时生成 delta {A:1}
  2. A 向 B 发送 delta:A 将 delta {A:1} 发送给 B,而非完整状态 {A:1, B:0}。消息大小从 \(O(n)\) 降至 \(O(1)\)
  3. B 合并收到的 delta:B 执行 merge(local, delta) = {A:0, B:0} ⊔ {A:1} = {A:1, B:0}。此时 B 的状态与 A 一致。
  4. B 执行本地变更:B 执行 increment(),本地状态变为 {A:1, B:1},生成 delta {B:1}
  5. B 向 A 发送 delta:B 将 delta {B:1} 发送给 A。
  6. A 合并收到的 delta:A 执行 merge(local, delta) = {A:1, B:0} ⊔ {B:1} = {A:1, B:1}

最终两个副本均收敛到 {A:1, B:1},与 full-state 同步的结果完全相同。

在工程实现中,delta 不会逐个发送,而是累积为 delta-interval 后批量传输。发送方维护一个 ACK 游标,记录对端已确认接收的 delta 序号,仅发送游标之后的增量。以下时序图展示了这一完整的 delta-interval 传播协议:

sequenceDiagram
    participant A as 副本 A
    participant B as 副本 B

    A->>A: mutate(): 执行本地变更
    A->>A: 生成 delta, 追加到 DeltaBuffer
    A->>A: 计算 delta-interval [ack_B+1, seq_A]

    A->>B: 发送 delta-interval
    B->>B: merge(local_state, delta-interval)
    B-->>A: ACK(seq=latest_merged)

    A->>A: 推进 ack_B 游标至 ACK.seq

    B->>B: mutate(): 执行本地变更
    B->>B: 生成 delta, 追加到 DeltaBuffer
    B->>B: 计算 delta-interval [ack_A+1, seq_B]

    B->>A: 发送 delta-interval
    A->>A: merge(local_state, delta-interval)
    A-->>B: ACK(seq=latest_merged)

    B->>B: 推进 ack_A 游标至 ACK.seq

该时序图清晰地展示了 delta-interval 传播的核心机制:发送方根据 ACK 游标计算需要发送的增量区间,接收方合并后返回确认。ACK 机制确保了已确认的 delta 不会被重复发送,同时保留了幂等性——即使 ACK 丢失导致 delta 被重发,join 操作的幂等性保证合并结果不变。这种设计在保持 state-based CRDT 对网络层宽松要求的同时,将传输效率提升到了接近 operation-based CRDT 的水平。

2.5 Delta G-Counter 的完整实现

以下是 Delta G-Counter 的 Go 语言实现:

package crdt

// DeltaGCounter 实现了基于 delta-state 的 G-Counter。
// 状态是一个 map[nodeID]uint64,delta 也是同样的类型。
type DeltaGCounter struct {
    id      string
    entries map[string]uint64
    pending map[string]uint64 // delta 缓冲区
}

func NewDeltaGCounter(id string) *DeltaGCounter {
    return &DeltaGCounter{
        id:      id,
        entries: make(map[string]uint64),
        pending: make(map[string]uint64),
    }
}

// Increment 执行自增操作,同时生成 delta 并加入缓冲区。
func (c *DeltaGCounter) Increment() {
    c.entries[c.id]++
    // delta-mutator: 只包含本节点更新后的值
    newVal := c.entries[c.id]
    if existing, ok := c.pending[c.id]; !ok || newVal > existing {
        c.pending[c.id] = newVal
    }
}

// Value 返回计数器的当前值。
func (c *DeltaGCounter) Value() uint64 {
    var sum uint64
    for _, v := range c.entries {
        sum += v
    }
    return sum
}

// Merge 将一个远程状态(可以是全量或 delta)合并到本地。
func (c *DeltaGCounter) Merge(remote map[string]uint64) {
    for id, val := range remote {
        if val > c.entries[id] {
            c.entries[id] = val
        }
    }
}

// FlushDelta 取出缓冲区中的 delta-group 并清空缓冲区。
// 返回值就是要发送给邻居的增量。
func (c *DeltaGCounter) FlushDelta() map[string]uint64 {
    delta := c.pending
    c.pending = make(map[string]uint64)
    return delta
}

对比一下 full-state 模式。在 full-state 模式下,反熵同步发送的是完整的 entries map,大小为 O(n),其中 n 是集群节点数。在 delta-state 模式下,发送的是 pending map,大小通常是 O(1)(只包含本节点自上次 flush 以来修改过的条目)。当集群有 100 个节点时,每次同步的传输量减少了约 100 倍。

2.6 Delta OR-Set

OR-Set 的 delta-state 版本更复杂,但原理相同。OR-Set 的状态可以表示为 (E, T),其中 E 是 {(element, tag)} 的集合(表示当前存在的元素),T 是所有已知 tag 的集合(包括已删除的)。但更常见的表示方法是使用 dot store(点存储)——每个”dot”是一个 (nodeID, sequenceNumber) 对,作为唯一标签。

以下是一个简化的 Delta OR-Set 伪代码:

type Dot struct {
    NodeID string
    Seq    uint64
}

type DeltaORSet struct {
    id       string
    nextSeq  uint64
    dots     map[Dot]string          // dot -> element
    observed map[string]map[Dot]bool // element -> set of dots
    pending  DeltaPayload            // delta 缓冲区
}

type DeltaPayload struct {
    AddedDots   map[Dot]string // 新增的 dot -> element 映射
    RemovedDots map[Dot]bool   // 被移除的 dots
}

// Add 向集合中添加一个元素。
// Delta 只包含新生成的 dot。
func (s *DeltaORSet) Add(elem string) {
    dot := Dot{NodeID: s.id, Seq: s.nextSeq}
    s.nextSeq++
    s.dots[dot] = elem
    if s.observed[elem] == nil {
        s.observed[elem] = make(map[Dot]bool)
    }
    s.observed[elem][dot] = true
    // delta: 只包含这个新 dot
    s.pending.AddedDots[dot] = elem
}

// Remove 从集合中移除一个元素。
// Delta 包含被移除的所有 dots。
func (s *DeltaORSet) Remove(elem string) {
    if dots, ok := s.observed[elem]; ok {
        for dot := range dots {
            delete(s.dots, dot)
            s.pending.RemovedDots[dot] = true
        }
        delete(s.observed, elem)
    }
}

Add 操作的 delta 是一个单独的 dot(通常十几字节),而不是整个集合。Remove 操作的 delta 是被删除元素关联的所有 dot,通常也远小于全量状态。

三、Delta-interval 与因果一致性

3.1 为什么需要因果一致性

在 full-state CRDT 中,因果一致性是免费的——因为每次传输的是完整状态,包含了所有历史操作的效果。接收方只要做一次 merge 就能得到一个因果完整的状态。

但在 delta-state CRDT 中,情况不同。如果节点 A 先执行操作 op1,再执行 op2(op2 依赖 op1 的结果),A 产生了两个 delta:d1 和 d2。如果节点 B 只收到了 d2 但没收到 d1,直接将 d2 merge 到自己的状态,可能得到一个不一致的结果。

具体例子:考虑一个 2P-Set(两阶段集合),支持 add 和 remove。节点 A 先 add(“x”),产生 d1 = {add: x};然后 remove(“x”),产生 d2 = {remove: x}。如果节点 B 先收到 d2,尝试移除一个本地不存在的元素 x,这个操作不会产生效果。之后收到 d1,add(“x”),x 被加入。最终 B 的状态是 {x},但正确的状态应该是 {}。

这个问题的根源是:delta 的应用顺序可能违反因果关系。

3.2 Delta-interval 的定义

为了解决因果一致性问题,Almeida 等人引入了 delta-interval(增量区间)的概念。

节点 i 维护一个单调递增的序列号 c_i。每次执行操作时,序列号递增,并将 (c_i, delta) 关联起来。第 c 次操作产生的 delta 记为 d_i^c。

Delta-interval D_i^{a,b} 定义为节点 i 在序列号 a 到 b 之间的所有 delta 的 join:

D_i^{a,b} = d_i^a ⊔ d_i^{a+1} ⊔ ... ⊔ d_i^b

当节点 i 要向节点 j 发送增量时,它需要知道 j 上次确认收到的最大序列号 ack_j。然后发送 D_i^{ack_j+1, c_i}——即 j 还没看到的所有 delta 的聚合。

3.3 使用版本向量跟踪 delta 传播

在实践中,跟踪 delta 传播进度使用版本向量(Version Vector)。每个节点维护一个版本向量 VV,记录它已经合并过的来自每个其他节点的最大序列号。

当节点 i 向节点 j 发送 delta 时:

  1. 节点 i 查询 ack_map[j],得到 j 上次确认收到的序列号(初始为 0)
  2. 节点 i 计算 D_i^{ack_map[j]+1, c_i},发送给 j
  3. 节点 j 收到后,将 D 与本地状态 merge,并更新 VV[i] = c_i
  4. 节点 j 回复 ack,节点 i 更新 ack_map[j] = c_i

这个机制类似于 TCP 的序列号和确认机制——发送方记录接收方的进度,只发送对方缺失的部分。

以下是跟踪机制的 Go 语言实现:

type DeltaManager struct {
    nodeID    string
    seq       uint64
    deltaLog  []DeltaEntry         // 按序列号排列的 delta 日志
    ackMap    map[string]uint64    // 邻居 -> 已确认的序列号
    vv        map[string]uint64    // 本地版本向量
}

type DeltaEntry struct {
    Seq   uint64
    Delta interface{} // 实际的 delta 值
}

// 生成发送给 peer 的 delta-interval
func (dm *DeltaManager) DeltaIntervalFor(peer string) interface{} {
    acked := dm.ackMap[peer] // 默认 0
    var group interface{}
    for _, entry := range dm.deltaLog {
        if entry.Seq > acked {
            group = joinDelta(group, entry.Delta)
        }
    }
    return group
}

// 收到 peer 的 ack 后更新进度
func (dm *DeltaManager) OnAck(peer string, ackedSeq uint64) {
    if ackedSeq > dm.ackMap[peer] {
        dm.ackMap[peer] = ackedSeq
    }
}

3.4 因果稳定性(Causal Stability)

因果稳定性是 delta-state CRDT 中一个重要的概念,它与后文讨论的元数据垃圾回收密切相关。

一个 delta d_i^c 在节点 j 上是因果稳定的(causally stable),当且仅当集群中所有节点都已经合并了这个 delta(直接合并或通过传递性合并)。

判断因果稳定性的方法是检查所有节点的版本向量。设 min_ack = min(VV_k[i]) 对所有节点 k(包括 i 自己)。如果 c <= min_ack,那么 d_i^c 在所有节点上都是因果稳定的。

因果稳定的 delta 有两个重要用途:

  1. delta 日志的裁剪:因果稳定的 delta 不会再被任何节点需要,可以安全地从 delta 日志中删除。这防止了 delta 日志无限增长。

  2. 元数据垃圾回收:某些 CRDT 的元数据(如 OR-Set 中的墓碑标记)在因果稳定后可以安全回收,详见第六节。

计算因果稳定性需要知道所有节点的版本向量。在实际系统中,这个信息通过 gossip 协议传播。每个节点定期将自己的版本向量广播给邻居,邻居收集所有版本向量后取逐位最小值(component-wise minimum),得到全局因果稳定点。

// 计算因果稳定点:所有节点都已经看到的最大序列号
func causalStabilityPoint(allVVs map[string]map[string]uint64, nodeID string) uint64 {
    minAck := uint64(math.MaxUint64)
    for _, vv := range allVVs {
        if ack, ok := vv[nodeID]; ok {
            if ack < minAck {
                minAck = ack
            }
        } else {
            return 0 // 有节点还没收到任何 delta
        }
    }
    return minAck
}

四、反熵协议(Anti-Entropy Protocols)

4.1 反熵的基本模式

反熵(anti-entropy)是分布式系统中用于同步副本状态的机制。这个术语来源于物理学中的”熵”——系统的无序程度。副本之间的不一致可以看作一种”熵”,反熵协议的目标就是消除这种不一致。

反熵协议有两个基本维度的设计选择:

触发方式

传播方向

在 delta-state CRDT 的语境下,push 模式是最自然的选择:节点在执行操作后产生 delta,然后将 delta(或 delta-group)推送给邻居。Pull 模式也可行,但需要接收方告知自己的版本向量,让发送方计算出需要的 delta-interval。

4.2 反熵协议的正确性要求

一个正确的反熵协议必须满足以下性质:

最终传播(Eventual Propagation):任何一个节点上的任何一次操作,其效果最终会传播到所有节点。形式化地说,对于任何操作 op 在节点 i 上产生的 delta d,存在一个时间点 t,使得在 t 之后所有节点的状态都”包含”了 d 的效果。

对于 delta-state 的额外要求——因果传播(Causal Propagation):delta 的传播必须保证因果顺序。具体来说,如果节点 j 合并了 delta d2,而 d2 因果依赖于 d1,那么 j 必须在合并 d2 之前(或同时)合并 d1。使用 delta-interval 机制可以自然地保证这一点,因为 delta-interval 包含了连续序列号范围内的所有 delta。

容错性(Fault Tolerance):在节点崩溃和网络分区的情况下,协议仍然能最终完成同步(在故障恢复后)。

4.3 传播拓扑选择

反熵协议需要选择传播拓扑——即每个节点与哪些节点进行同步。

完全图(Full Mesh):每个节点与所有其他节点同步。收敛速度最快(一轮即可),但消息数量为 O(n^2),不适用于大规模集群。

随机选择(Random Peer Selection):每轮随机选择 k 个邻居同步。这是 gossip 协议的经典做法。Epidemic theory(流行病理论(Epidemic Theory))证明,当 k >= ln(n) + c 时(c 是一个常数),信息在 O(log n) 轮内以高概率传播到所有节点。

固定拓扑:使用固定的拓扑结构,如环(ring)、树(tree)或超立方体(hypercube)。环的消息数量最少(每个节点只与相邻两个节点同步),但传播延迟为 O(n)。树的传播延迟为 O(log n),但有单点故障问题。超立方体(n 个节点构成 log(n) 维超立方体)在消息数量 O(n log n) 和传播延迟 O(log n) 之间取得了较好的平衡。

在实际系统中,随机选择是最常用的方案,因为它在收敛速度、容错性和实现复杂度之间取得了良好的平衡。Riak 和 Cassandra 都使用随机选择的 gossip 协议进行反熵同步。

4.4 实现示例:带 delta-interval 的反熵协议

以下是一个完整的反熵协议实现框架:

type AntiEntropyNode struct {
    id        string
    state     JoinSemilattice       // CRDT 状态
    deltaLog  []DeltaEntry          // delta 日志
    ackMap    map[string]uint64     // peer -> 已确认序列号
    seq       uint64                // 本地序列号
    peers     []string              // 邻居节点列表
    interval  time.Duration         // 反熵间隔
}

// 定期反熵循环
func (n *AntiEntropyNode) AntiEntropyLoop() {
    ticker := time.NewTicker(n.interval)
    for range ticker.C {
        // 随机选择一个邻居
        peer := n.peers[rand.Intn(len(n.peers))]
        n.pushDelta(peer)
    }
}

// 向 peer 推送 delta-interval
func (n *AntiEntropyNode) pushDelta(peer string) {
    acked := n.ackMap[peer]
    if acked >= n.seq {
        return // 该 peer 已经是最新的
    }
    // 构建 delta-interval: (acked, seq]
    var group JoinSemilattice
    for _, entry := range n.deltaLog {
        if entry.Seq > acked && entry.Seq <= n.seq {
            group = join(group, entry.Delta)
        }
    }
    // 发送 delta-group 和当前序列号
    send(peer, DeltaMessage{
        From:   n.id,
        Group:  group,
        MaxSeq: n.seq,
    })
}

// 接收远程 delta
func (n *AntiEntropyNode) OnReceiveDelta(msg DeltaMessage) {
    n.state = join(n.state, msg.Group)
    // 回复 ack
    send(msg.From, AckMessage{
        From:     n.id,
        AckedSeq: msg.MaxSeq,
    })
}

// 收到 ack
func (n *AntiEntropyNode) OnReceiveAck(msg AckMessage) {
    if msg.AckedSeq > n.ackMap[msg.From] {
        n.ackMap[msg.From] = msg.AckedSeq
    }
    // 裁剪 delta 日志:删除所有 peer 都已确认的 delta
    n.gcDeltaLog()
}

func (n *AntiEntropyNode) gcDeltaLog() {
    minAcked := n.seq
    for _, peer := range n.peers {
        if ack := n.ackMap[peer]; ack < minAcked {
            minAcked = ack
        }
    }
    // 删除 seq <= minAcked 的 delta 条目
    newLog := make([]DeltaEntry, 0)
    for _, entry := range n.deltaLog {
        if entry.Seq > minAcked {
            newLog = append(newLog, entry)
        }
    }
    n.deltaLog = newLog
}

值得注意的是 gcDeltaLog 方法——它只在所有邻居都确认收到某个 delta 后才将其从日志中删除。如果某个邻居长时间不可达(比如节点崩溃或网络分区),delta 日志会持续增长。实际系统中需要设置日志大小上限,当日志被截断时,对应的邻居在下次恢复连接时需要做一次全量同步。这是 delta-state 相比 full-state 的一个权衡:delta-state 需要维护额外的状态(delta 日志和 ack map),而当这些状态丢失或不足时,需要回退到全量同步。

五、Merkle 树反熵

5.1 Merkle 树在数据同步中的应用

Merkle 树(Merkle Tree)是一种哈希树(Hash Tree),由 Ralph Merkle 在 1979 年提出。它的特点是:叶节点存储数据块的哈希值,非叶节点存储其子节点哈希值的拼接的哈希。根节点的哈希值(root hash)是整棵树所有数据的”指纹”。

在分布式数据同步中,Merkle 树提供了一种高效的不一致检测机制:

  1. 两个节点各自为本地数据构建 Merkle 树
  2. 先比较根哈希。如果相同,说明数据完全一致,无需同步
  3. 如果根哈希不同,递归比较子树的哈希,快速定位到不一致的数据分区
  4. 只对不一致的分区进行数据同步

这个过程的关键优势是:比较的通信量与不一致数据的大小成正比,而不是与总数据量成正比。如果 n 个数据项中只有 k 个不一致,比较过程只需要交换 O(k * log(n/k)) 个哈希值。

5.2 Merkle 树在 Cassandra 和 Dynamo 中的应用

Amazon Dynamo 论文(2007)首先将 Merkle 树引入分布式存储的反熵同步。Cassandra 作为 Dynamo 的开源后继者,在其反熵修复(anti-entropy repair)机制中广泛使用了 Merkle 树。

Cassandra 的 Merkle 树反熵工作流程:

  1. 构建阶段:节点对自己负责的数据范围(token range)构建 Merkle 树。每个叶节点对应一个 token 范围的分区,存储该分区内所有数据的哈希。
  2. 交换阶段:两个负责相同 token 范围的副本节点交换 Merkle 树。
  3. 比较阶段:从根到叶逐层比较,找出哈希不同的叶节点。
  4. 修复阶段:只对哈希不同的分区进行数据流式传输(streaming)。

Cassandra 的 nodetool repair 命令触发这个过程。在 Cassandra 4.0 之前,这是一个资源密集的操作,因为需要读取整个数据集来构建 Merkle 树。Cassandra 4.0 引入了增量修复(Incremental Repair),只对自上次修复以来有变更的数据构建 Merkle 树,大幅减少了修复的开销。

Merkle 树反熵的局限性:

5.3 Merkle 树与 delta-state 的结合

Merkle 树和 delta-state CRDT 可以互补:

正常情况下用 delta:在网络连通、节点正常的情况下,使用 delta-interval 传播增量。这是最高效的同步方式。

异常恢复用 Merkle 树:当节点崩溃恢复后,或者因为 delta 日志被截断导致无法增量同步时,使用 Merkle 树快速定位不一致的数据,然后只对这些数据进行全量同步。

混合策略:某些系统(如 Riak)同时运行两套反熵机制。delta 传播处理正常操作流,Merkle 树反熵作为”安全网”定期运行,捕获任何 delta 传播可能遗漏的不一致。

这种混合策略在实际系统中非常实用。Delta 传播是”最优路径”——在一切正常的情况下提供低延迟、低带宽的同步。Merkle 树反熵是”兜底路径”——在各种异常情况(节点崩溃、delta 日志丢失、网络长时间分区)下保证数据最终一致。

%% 伪代码:混合反熵策略
-module(hybrid_anti_entropy).

%% 正常路径:delta 传播(高频,低开销)
handle_local_op(Op, State) ->
    {NewState, Delta} = apply_delta_mutator(Op, State),
    buffer_delta(Delta),
    schedule_delta_push(short_interval()),
    NewState.

%% 定期 delta 推送
handle_delta_push(Peer, DeltaBuffer, AckMap) ->
    Acked = maps:get(Peer, AckMap, 0),
    Interval = compute_delta_interval(DeltaBuffer, Acked),
    case Interval of
        empty -> ok;
        _     -> send_delta(Peer, Interval)
    end.

%% 兜底路径:Merkle 树反熵(低频,高开销)
handle_merkle_repair(Peer, LocalData) ->
    LocalTree = build_merkle_tree(LocalData),
    RemoteTree = request_merkle_tree(Peer),
    DiffRanges = compare_trees(LocalTree, RemoteTree),
    lists:foreach(fun(Range) ->
        repair_range(Peer, Range)
    end, DiffRanges).

下图展示了 Merkle 树反熵对账的完整流程。两个副本从根哈希开始逐层比较,快速定位不一致的子树,最终仅交换差异部分的 delta,避免了全量扫描:

flowchart TD
    A["比较根节点哈希"] -->|哈希相同| B["状态一致,无需同步"]
    A -->|哈希不同| C["递归比较子节点哈希"]
    C --> D["左子树哈希相同"]
    C --> E["右子树哈希不同"]
    D --> F["跳过左子树"]
    E --> G["继续向下钻取右子树"]
    G --> H["定位到叶子节点层的差异 key 范围"]
    H --> I["仅交换差异范围内的 delta"]
    I --> J["接收方合并 delta"]
    J --> K["更新本地 Merkle 树哈希"]

Merkle 树对账的核心优势在于将差异定位的时间复杂度从 \(O(n)\) 降低到 \(O(\log n)\),其中 \(n\) 为数据条目总数。通过逐层哈希比较,两个副本可以在 \(O(\log n)\) 轮交互中精确定位所有不一致的 key 范围,然后仅针对这些范围交换 delta。这种机制特别适合作为 delta-interval 协议的兜底方案——当 delta buffer 因容量限制被截断或节点重启导致 ACK 游标丢失时,Merkle 树反熵可以高效地恢复一致性,而无需回退到全量状态传输。

六、元数据垃圾回收

6.1 CRDT 元数据膨胀问题

CRDT 的正确性依赖于某些元数据,而这些元数据有一个共同特点:它们只增不减。

向量时钟 / 版本向量:大小随参与过的节点数线性增长。如果系统中的节点 ID 是动态分配的(比如容器化环境中,每次重启分配新 ID),版本向量会无限膨胀。

OR-Set 的唯一标签:每次 add 操作都生成新标签。即使元素被删除,标签仍然保留在”墓碑”集合中(用于防止已删除的元素被因果上更早的 add 操作”复活”)。大量 add/remove 操作后,墓碑数量可能远超存活元素数量。

LWW-Register(Last-Writer-Wins Register)的时间戳历史:虽然只需要保留最新的时间戳,但在某些实现中需要保留历史时间戳来处理因果上的并发写入。

Map CRDT 的嵌套元数据:Map 中每个 key 对应的 value 本身是一个 CRDT,每个 value 都有自己的元数据。Map 越大,元数据总量越大。

元数据膨胀的实际影响:

6.2 因果稳定性条件下的垃圾回收

元数据垃圾回收(Garbage Collection,GC)的核心难点在于:过早回收可能导致语义错误。

考虑 OR-Set 的墓碑回收。元素 x 被节点 A 删除后,A 保留了 x 的墓碑 {(x, tag1), (x, tag2)}。如果 A 在其他所有节点收到这个删除信息之前就回收了墓碑,那么:

  1. 节点 B 之前 add(“x”) 产生的 (x, tag1) 还没传到节点 C
  2. A 回收了墓碑
  3. B 的 add 传播到 C,C 看到 (x, tag1),因为没有对应的墓碑信息,C 认为 x 是存活的
  4. x 被错误地”复活”了

因果稳定性提供了安全回收的条件:只有当一个操作(以及它产生的元数据)在所有节点上都已经被观察到,它产生的元数据才可以安全回收。

def can_gc_tombstone(tombstone, all_version_vectors, origin_node, origin_seq):
    """判断一个墓碑是否可以安全回收。
    
    只有当所有节点都已经观察到这个墓碑对应的操作时,
    才可以安全回收。
    """
    for node_id, vv in all_version_vectors.items():
        if vv.get(origin_node, 0) < origin_seq:
            return False  # 还有节点没看到这个操作
    return True

def gc_tombstones(local_tombstones, all_version_vectors):
    """回收所有因果稳定的墓碑。"""
    to_remove = []
    for ts in local_tombstones:
        if can_gc_tombstone(ts, all_version_vectors,
                           ts.origin_node, ts.origin_seq):
            to_remove.append(ts)
    for ts in to_remove:
        local_tombstones.remove(ts)
    return len(to_remove)

6.3 基于 Epoch 的垃圾回收

因果稳定性条件下的 GC 需要知道所有节点的版本向量,这在大规模系统中本身是一个挑战。一种替代方案是基于 epoch(纪元)的 GC。

基本思路:

  1. 系统维护一个全局 epoch 编号。每个 epoch 持续一段固定时间(如 1 小时)。
  2. 每个操作都标记它所属的 epoch。
  3. 当一个 epoch 结束时,系统通过一个协调协议(如 two-phase commit)确认所有节点都已经进入了新 epoch,并且旧 epoch 的所有操作都已经同步完成。
  4. 确认后,旧 epoch 中的元数据可以安全回收。
class EpochBasedGC:
    def __init__(self, node_id, epoch_duration_sec=3600):
        self.node_id = node_id
        self.current_epoch = 0
        self.epoch_duration = epoch_duration_sec
        self.metadata_by_epoch = {}  # epoch -> [metadata]
        self.confirmed_epochs = set()  # 已确认同步完成的 epoch
    
    def tag_metadata(self, metadata):
        """为元数据打上当前 epoch 标签。"""
        epoch = self.current_epoch
        if epoch not in self.metadata_by_epoch:
            self.metadata_by_epoch[epoch] = []
        self.metadata_by_epoch[epoch].append(metadata)
        return epoch
    
    def confirm_epoch(self, epoch):
        """协调者确认某个 epoch 的所有操作已同步。"""
        self.confirmed_epochs.add(epoch)
    
    def gc(self):
        """回收已确认 epoch 的元数据。"""
        reclaimed = 0
        for epoch in list(self.metadata_by_epoch.keys()):
            if epoch in self.confirmed_epochs:
                reclaimed += len(self.metadata_by_epoch[epoch])
                del self.metadata_by_epoch[epoch]
        return reclaimed

Epoch-based GC 的优势是不需要逐操作跟踪因果稳定性,只需要在 epoch 粒度上做确认。缺点是 GC 的粒度较粗(一个 epoch 的所有元数据要么全部回收,要么全部保留),且需要一个协调者来确认 epoch 完成。

6.4 GC 与正确性的权衡

在实际系统中,元数据 GC 往往需要在正确性和资源消耗之间做权衡。

保守策略:严格按照因果稳定性条件回收。优点是绝对安全,不会破坏 CRDT 语义。缺点是如果有节点长时间不可达,因果稳定性条件一直不满足,元数据就无法回收。极端情况下(永久离线的节点),元数据永远不会被回收。

激进策略:设置一个时间阈值(如 7 天),超过阈值的元数据无条件回收。优点是保证元数据大小有上界。缺点是如果有节点超过 7 天才恢复,可能观察到语义错误。实际系统中通常结合节点成员变更协议——如果一个节点超过阈值没有响应,将其从集群中移除,回收与其相关的元数据。

Riak 的做法:Riak 的 CRDT 实现采用了一种务实的方案。OR-Set 使用”dot”((nodeID, seq))作为唯一标签,而不是传统的 UUID。这使得元数据天然可以用版本向量来追踪和回收。当一个 dot 被所有节点的版本向量覆盖时,与该 dot 相关的元数据就可以安全回收。Riak 在每次合并时顺便做 GC,避免了单独的 GC 阶段。

下图展示了因果稳定性驱动的 GC 状态机,描述了一个 delta 从产生到最终被安全回收的完整生命周期:

stateDiagram-v2
    [*] --> DeltaAccumulated: 本地变更产生 delta

    DeltaAccumulated --> CausallyStable: 所有副本已确认接收\n(ACK 覆盖该 delta 的序号)
    DeltaAccumulated --> DeltaAccumulated: 仍有副本未确认

    CausallyStable --> SafeToGC: 确认无迟到消息依赖该 delta\n(版本向量全局推进)
    CausallyStable --> CausallyStable: 存在潜在的迟到消息

    SafeToGC --> Compacted: 执行压缩,回收元数据
    Compacted --> [*]

该状态机的关键在于”因果稳定”到”可安全回收”之间的过渡条件。一个 delta 被所有副本 ACK 并不立即意味着可以回收——还需要确保不存在尚未送达的、因果上依赖该 delta 的迟到消息。只有当全局版本向量推进到覆盖该 delta 的因果上下文时,才能安全地将其标记为可回收。在实际系统中,“Compacted”阶段通常与 merge 操作合并执行(如 Riak 的做法),而非作为独立的后台任务,以避免额外的锁竞争和 I/O 开销。

七、Riak delta-mutation 实现分析

7.1 Riak 2.x 的 CRDT 实现

Riak 是 Basho Technologies 开发的分布式键值存储,基于 Dynamo 论文设计。Riak 从 2.0 版本开始内置了 CRDT 支持(称为 Riak Data Types),包括 Counter、Set、Map、Register 和 Flag 五种数据类型。

Riak 的 CRDT 实现基于 riak_dt 这个 Erlang 库,后来演化为 riak_crdt。这个库实现了多种 CRDT,其中最核心的是基于 dot 的 OR-Set(在 Riak 中称为 riak_dt_orswot——Observed-Remove Set Without Tombstones)。

“Without Tombstones”是这个实现的亮点。传统 OR-Set 需要维护一个墓碑集合来记录已删除的元素标签。Riak 的 ORSWOT 通过一个巧妙的设计避免了墓碑:它使用一个全局的版本向量(称为 “clock”)来记录已经观察到的所有 dot。删除操作不需要显式记录墓碑,而是通过版本向量隐式表示——如果一个 dot 被版本向量覆盖(即 dot 的序列号小于等于版本向量中对应节点的序列号),但不在 entries 集合中,那么这个 dot 对应的元素就是已被删除的。

7.2 从全量同步到 delta-mutation

Riak 2.0 最初使用全量状态同步。每次 CRDT 值被读取或写入时,Riak 都会在副本之间传输完整的 CRDT 状态。对于大型 Map(嵌套多层 CRDT),这意味着每次操作都传输大量数据。

Riak 2.2 引入了 delta-mutation(在 Riak 的术语中也称为 “CRDT delta-mutations” 或 “dot-deltas”)。核心改动是在 riak_dt 库的每个数据类型中添加了 delta 计算逻辑。

以 ORSWOT(OR-Set)为例,Riak 的 delta 实现遵循以下原则:

%% riak_dt_orswot.erl 中的核心数据结构
%% ORSWOT 状态由三部分组成:
%% - Clock: 版本向量,记录已观察到的所有 dot
%% - Entries: [{Element, [Dot]}],当前存活的元素及其 dot
%% - Deferred: 延迟的操作(处理因果不一致)

-record(orswot, {
    clock   = riak_dt_vclock:new() :: riak_dt_vclock:vclock(),
    entries = orddict:new()        :: orddict:orddict(),
    deferred = orddict:new()       :: orddict:orddict()
}).

%% Add 操作的 delta-mutator
%% 输入: 当前状态、要添加的元素、操作者 ID
%% 输出: delta(一个最小化的 ORSWOT 状态)
delta_add(Element, Actor, #orswot{clock = Clock}) ->
    NewSeq = riak_dt_vclock:get_counter(Actor, Clock) + 1,
    NewDot = {Actor, NewSeq},
    %% Delta 只包含新 dot 和最小必要的 clock 信息
    #orswot{
        clock   = riak_dt_vclock:merge(
                    [{Actor, NewSeq}], []),
        entries = orddict:store(Element, [NewDot], orddict:new()),
        deferred = orddict:new()
    }.

%% Remove 操作的 delta-mutator
%% 输入: 当前状态、要删除的元素
%% 输出: delta
delta_remove(Element, #orswot{entries = Entries, clock = Clock}) ->
    case orddict:find(Element, Entries) of
        {ok, Dots} ->
            %% Delta 包含被移除的 dots 信息
            %% 通过只包含 clock(覆盖这些 dots)但不包含 entries 来表示删除
            DotClock = lists:foldl(
                fun({Actor, Seq}, Acc) ->
                    riak_dt_vclock:merge([{Actor, Seq}], Acc)
                end, riak_dt_vclock:new(), Dots),
            #orswot{
                clock    = DotClock,
                entries  = orddict:new(), % 空 entries 表示这些 dots 被删除
                deferred = orddict:new()
            };
        error ->
            %% 元素不存在,空 delta
            #orswot{}
    end.

这段代码展示了 delta-mutator 的核心设计:add 操作的 delta 只包含新生成的 dot 和对应的元素;remove 操作的 delta 只包含被移除 dot 的版本信息。两种情况下,delta 的大小都与操作涉及的数据量成正比,而不是与整个集合大小成正比。

7.3 合并逻辑

Riak 的 ORSWOT merge 函数同时处理全量状态和 delta 的合并(因为 delta 本身就是一个合法的 ORSWOT 状态):

%% 合并两个 ORSWOT 状态
%% 核心逻辑: 
%% 1. 合并 clock(版本向量取逐位 max)
%% 2. 对于 entries,只保留那些 dot 不被对方 clock 覆盖的元素
%%    (被对方 clock 覆盖但不在对方 entries 中的 dot = 已被对方删除)
merge(#orswot{clock = CA, entries = EA},
      #orswot{clock = CB, entries = EB}) ->
    MergedClock = riak_dt_vclock:merge(CA, CB),
    %% 保留 A 的 entries 中不被 B 的 clock 覆盖的 dots
    %% 加上 B 的 entries 中不被 A 的 clock 覆盖的 dots
    %% 加上两边都有的 dots(取并集)
    MergedEntries = merge_entries(EA, CA, EB, CB),
    #orswot{
        clock   = MergedClock,
        entries = MergedEntries,
        deferred = orddict:new()
    }.

merge_entries(EA, CA, EB, CB) ->
    %% 对每个元素:
    %% - 如果只在 A 中:保留 A 中不被 CB 覆盖的 dots
    %% - 如果只在 B 中:保留 B 中不被 CA 覆盖的 dots
    %% - 如果两边都有:合并两边的 dots,去除被对方 clock 覆盖的
    AllKeys = orddict:fetch_keys(EA) ++ orddict:fetch_keys(EB),
    UniqueKeys = lists:usort(AllKeys),
    lists:foldl(fun(Key, Acc) ->
        DotsA = case orddict:find(Key, EA) of
            {ok, DA} -> [D || D = {Actor, Seq} <- DA,
                         not riak_dt_vclock:descends(CB, [{Actor, Seq}])
                         orelse orddict:is_key(Key, EB)];
            error    -> []
        end,
        DotsB = case orddict:find(Key, EB) of
            {ok, DB} -> [D || D = {Actor, Seq} <- DB,
                         not riak_dt_vclock:descends(CA, [{Actor, Seq}])
                         orelse orddict:is_key(Key, EA)];
            error    -> []
        end,
        MergedDots = lists:usort(DotsA ++ DotsB),
        case MergedDots of
            [] -> Acc;
            _  -> orddict:store(Key, MergedDots, Acc)
        end
    end, orddict:new(), UniqueKeys).

合并逻辑的精妙之处在于它同时实现了 add-wins 语义和隐式墓碑回收。一个 dot 如果被对方的 clock 覆盖但不在对方的 entries 中,说明对方已经删除了这个元素——不需要显式墓碑。

7.4 性能改进

Riak 官方博客和社区基准测试显示,delta-mutation 带来了显著的性能改善:

需要指出的一个权衡:delta 模式增加了实现复杂度。每种 CRDT 数据类型都需要实现对应的 delta-mutator,并且 delta-mutator 的正确性需要单独验证(必须满足 u(s) = s ⊔ u^δ(s))。Riak 团队为此编写了大量的 property-based test(基于性质的测试),使用 Erlang 的 PropEr 框架验证 delta-mutator 和 merge 函数的正确性。

八、实际系统中的应用

8.1 AntidoteDB

AntidoteDB 是一个由欧盟 SyncFree 和 LightKone 研究项目资助开发的分布式数据库,专门为地理分布式场景设计。它的核心特性是提供因果一致性(Causal Consistency)保证,同时支持高可用的 CRDT 操作。

AntidoteDB 的 CRDT 层大量使用了 delta-state 技术:

数据中心内同步:AntidoteDB 在同一数据中心内的副本之间使用基于 delta 的同步。每个数据中心维护一个分区日志(partition log),记录所有操作。副本之间通过交换日志条目(本质上是 delta)来同步。

跨数据中心同步:跨数据中心的同步使用因果一致的广播协议。AntidoteDB 维护了一个全局的向量时钟(每个数据中心一个条目),保证跨数据中心的操作按因果顺序传播。这与 delta-interval 的思想一致——只传输对方还没看到的操作。

AntidoteDB 的一个关键创新是”Cure”协议(发表于 ICDCS 2016)。Cure 提供了跨数据中心的因果一致性保证,同时保持了高可用性。它使用一种优化的依赖跟踪机制,不需要为每个操作维护完整的向量时钟,而是使用”稳定向量”(stable vector)来批量处理因果依赖。这种批量处理的思想与 delta-group 的聚合思想异曲同工。

AntidoteDB 在元数据管理方面也做了很多工作。它支持”因果稳定性”驱动的自动 GC——当一个操作的效果在所有数据中心都可见后,相关的元数据(如墓碑、版本向量条目)可以自动回收。

8.2 Redis CRDB(Active-Active)

Redis Enterprise 的 Active-Active 功能(前身为 Redis CRDB,Conflict-free Replicated Database)提供了跨地域的多主复制。底层使用 CRDT 来解决冲突。

Redis CRDB 支持的 CRDT 数据类型包括:

Redis CRDB 在同步优化上采用了与 delta-state 相似的策略:

操作日志传播:Redis CRDB 不传输完整的数据快照,而是传输操作日志(effect log)。每个写操作产生一个 effect(效果),effect 被复制到其他数据中心。这类似于 operation-based CRDT,但 effect 是幂等的——可以重复应用而不改变结果。这种”基于效果的传播”(effect-based replication)可以看作 delta-state 和 operation-based CRDT 的混合体。

压缩与批量传输:多个 effect 在传输前会被压缩和批量化。连续的 counter increment 操作会被合并为一个净增量,连续的 set add 操作会被合并为一次批量添加。这与 delta-group 的聚合思想一致。

基于 CRDT 的冲突解决:当两个数据中心对同一个 key 做了并发修改时,Redis CRDB 使用 CRDT 的合并语义来解决冲突。对于 Counter,合并是两个 PN-Counter 的逐节点 max。对于 Set,合并是 add-wins 的 OR-Set 合并。对于 String,合并是 LWW——时间戳更大的值胜出。

8.3 对比分析

特性 Riak (delta-mutation) AntidoteDB Redis CRDB
同步粒度 Delta-state Delta + 操作日志 Effect(幂等操作)
因果一致性 最终一致 因果一致(Cure 协议) 最终一致 + LWW
GC 策略 隐式(通过 dot+VV) 因果稳定性驱动 操作日志截断
元数据开销 低(无墓碑设计) 中等(维护因果信息) 低(LWW 为主)
跨 DC 同步 Merkle 树 + delta Cure 协议 操作日志复制
CRDT 类型丰富度 Counter, Set, Map, Register, Flag Counter, Set, Map, Register, MV-Register String, Counter, Set, Sorted Set, Hash, List

三个系统代表了 delta-state CRDT 在工程实践中的三种路线:

Riak 走的是纯 state-based 路线,通过 delta-mutation 优化带宽。它的 ORSWOT(无墓碑 OR-Set)设计在元数据管理上非常精巧。代价是实现复杂度高——每种数据类型都需要精心设计 delta-mutator 和 merge 函数。

AntidoteDB 走的是学术研究路线,将因果一致性作为核心保证。它的 Cure 协议在理论上更强(提供因果一致性而不仅仅是最终一致性),但实现和运维复杂度也更高。

Redis CRDB 走的是工程实用路线。它没有严格按照 delta-state CRDT 的学术框架来实现,而是结合了 operation-based 和 state-based 的思想,用”幂等效果”作为同步单元。这种设计在 Redis 的单线程模型下实现起来更自然,也更容易与 Redis 原有的 AOF 持久化机制集成。

参考文献

  1. Almeida, P. S., Shoker, A., & Baquero, C. (2018). Delta State Replicated Data Types. Journal of Parallel and Distributed Computing, 111, 162-173. https://arxiv.org/abs/1603.01529

  2. Almeida, P. S., Shoker, A., & Baquero, C. (2015). Efficient State-based CRDTs by Delta-Mutation. In Proceedings of the International Conference on Networked Systems (NETYS 2015). https://arxiv.org/abs/1410.2803

  3. van der Linde, A., Leitao, J., & Preguica, N. (2016). Delta-CRDTs: Making delta-CRDTs Delta-Based. In Proceedings of the 2nd Workshop on the Principles and Practice of Consistency for Distributed Data (PaPoC 2016). https://doi.org/10.1145/2911151.2911163

  4. Shapiro, M., Preguica, N., Baquero, C., & Zawirski, M. (2011). A comprehensive study of Convergent and Commutative Replicated Data Types. INRIA Technical Report RR-7506. https://hal.inria.fr/inria-00555588

  5. Akkoorath, D. D., et al. (2016). Cure: Strong Semantics Meets High Availability and Low Latency. In Proceedings of the 36th IEEE International Conference on Distributed Computing Systems (ICDCS 2016). https://doi.org/10.1109/ICDCS.2016.98

  6. DeCandia, G., et al. (2007). Dynamo: Amazon’s Highly Available Key-value Store. In Proceedings of the 21st ACM Symposium on Operating Systems Principles (SOSP 2007). https://doi.org/10.1145/1294261.1294281

  7. Basho Technologies. Riak KV Data Types documentation. https://docs.riak.com/riak/kv/latest/developing/data-types/

  8. Redis Ltd. Redis Active-Active Geo-Distribution (CRDB). https://redis.io/docs/latest/operate/rs/databases/active-active/

  9. Enes, V., Baquero, C., Rezende, T. F., Gotsman, A., Perrin, M., & Sutra, P. (2019). State-based CRDTs with δ-mutation: A Systematic Approach. arXiv preprint arXiv:1904.00450.

  10. Merkle, R. C. (1979). A Certified Digital Signature. In Proceedings of CRYPTO 1989. https://doi.org/10.1007/0-387-34805-0_21


上一篇:CRDT 在协同编辑中的应用 下一篇:MapReduce


By .