土法炼钢兴趣小组的算法知识备份

【分布式系统百科】数据再平衡:固定分区、动态分区与 TiKV 的调度策略

文章导航

标签入口
#分布式系统#数据分区#负载均衡#TiKV

目录

在上一篇文章中,我们讨论了分布式系统中的二级索引问题。本文将深入探讨数据再平衡(Rebalancing)的核心策略和实现细节。当分布式系统运行一段时间后,数据分布可能会变得不均匀,节点可能会加入或离开集群,这时就需要再平衡机制来重新分配数据,保证系统的负载均衡和高可用性。

一、为什么需要数据再平衡

在理想情况下,我们希望数据能够均匀分布在所有节点上,每个节点处理相同数量的请求。但在实际运行中,这种完美的均衡很难维持:

1.1 节点变更

集群规模不是静态的。在生产环境中,我们经常需要:

当集群拓扑发生变化时,原有的数据分布方案可能不再适用。例如,如果使用简单的哈希取模(hash(key) % N)来分配数据,当节点数 N 变化时,几乎所有的数据都需要重新分配,这会导致大规模的数据迁移。

1.2 数据倾斜

即使节点数量不变,数据分布也可能随时间变得不均匀:

数据倾斜会导致某些节点负载过高,成为系统的瓶颈,而其他节点资源利用率低。

1.3 硬件异构性

在大规模集群中,不同机器的硬件配置可能不同:

理想的再平衡策略应该能够识别硬件差异,给高配置的节点分配更多的数据和负载。

1.4 再平衡的目标

一个好的再平衡机制需要满足以下目标:

  1. 负载均衡:尽量让每个节点的负载(数据量、请求量、CPU、内存、磁盘 I/O)都处于合理范围
  2. 最小化迁移量:数据迁移是有成本的(网络带宽、磁盘 I/O),应该尽量减少迁移的数据量
  3. 保持可用性:再平衡过程中,系统应该继续对外提供服务,不能因为迁移而中断
  4. 可配置性:不同的业务场景对负载均衡的要求不同,应该提供灵活的配置选项
  5. 故障隔离:再平衡过程中如果某个节点失败,不应该影响整个集群

二、固定分区数策略

固定分区数(Fixed Partition Count)是最简单直观的再平衡策略,被 Riak、Elasticsearch、Couchbase 等系统采用。

2.1 核心思想

在集群创建时,预先分配一个固定数量的分区(Partition),这个数量通常远大于节点数。例如:

假设我们有一个 3 节点的集群,预先创建了 12 个分区:

节点 A: [P0, P1, P2, P3]
节点 B: [P4, P5, P6, P7]
节点 C: [P8, P9, P10, P11]

当新增一个节点 D 时,可以从每个现有节点转移 1 个分区:

节点 A: [P0, P1, P2]
节点 B: [P4, P5, P6]
节点 C: [P8, P9, P10]
节点 D: [P3, P7, P11]

2.2 分区数量的选择

分区数量的选择是一个重要的决策,需要平衡多个因素:

分区太少的问题: - 再平衡的粒度太粗,难以实现精细的负载均衡 - 单个分区过大,迁移时间长,影响系统可用性 - 无法充分利用多核 CPU 的并行能力

分区太多的问题: - 每个分区都有元数据开销(内存、文件描述符、管理结构) - 分区间的协调成本增加 - 某些操作(如集群范围的扫描)需要遍历所有分区

经验法则: - Elasticsearch 默认每个索引创建 5 个主分片(Shard),每个主分片可以有多个副本 - Riak 推荐 64 到 1024 个分区(Ring Size),具体取决于集群规模 - 一般建议分区数是节点数的 10-100 倍

2.3 分区分配算法

当需要重新分配分区时,如何决定哪些分区应该移动到哪里?一个简单的策略是:

class FixedPartitionRebalancer:
    def __init__(self, num_partitions, num_replicas):
        self.num_partitions = num_partitions
        self.num_replicas = num_replicas
        
    def assign_partitions(self, nodes):
        """
        将分区均匀分配给节点
        返回: {partition_id: [node_list]}
        """
        if len(nodes) == 0:
            return {}
            
        assignments = {}
        nodes_list = sorted(nodes)
        
        for partition_id in range(self.num_partitions):
            # 为每个分区选择 num_replicas 个节点
            replicas = []
            for replica_idx in range(self.num_replicas):
                # 使用一致性的方式选择节点
                node_idx = (partition_id * self.num_replicas + replica_idx) % len(nodes_list)
                replicas.append(nodes_list[node_idx])
            assignments[partition_id] = replicas
            
        return assignments
    
    def compute_moves(self, old_assignment, new_assignment):
        """
        计算需要移动的分区
        返回: [(partition_id, from_node, to_node)]
        """
        moves = []
        
        for partition_id in new_assignment:
            old_nodes = set(old_assignment.get(partition_id, []))
            new_nodes = set(new_assignment[partition_id])
            
            # 找出需要添加的副本
            to_add = new_nodes - old_nodes
            # 找出需要删除的副本
            to_remove = old_nodes - new_nodes
            
            # 配对删除和添加,形成移动操作
            to_add_list = list(to_add)
            to_remove_list = list(to_remove)
            
            for i in range(min(len(to_add_list), len(to_remove_list))):
                moves.append((partition_id, to_remove_list[i], to_add_list[i]))
                
        return moves

# 使用示例
rebalancer = FixedPartitionRebalancer(num_partitions=256, num_replicas=3)

# 初始 4 个节点
old_nodes = ['node1', 'node2', 'node3', 'node4']
old_assignment = rebalancer.assign_partitions(old_nodes)

# 扩容到 6 个节点
new_nodes = ['node1', 'node2', 'node3', 'node4', 'node5', 'node6']
new_assignment = rebalancer.assign_partitions(new_nodes)

# 计算需要迁移的分区
moves = rebalancer.compute_moves(old_assignment, new_assignment)
print(f"需要迁移 {len(moves)} 个分区副本")

2.4 优势与劣势

优势

  1. 简单易懂:分区的概念清晰,实现相对简单
  2. 可预测性:分区数量固定,系统行为容易预测和监控
  3. 迁移效率高:只需要移动整个分区,不需要拆分或合并操作
  4. 并发度高:多个分区可以同时迁移,充分利用网络带宽

劣势

  1. 分区数难以改变:一旦确定分区数量,后续很难修改。如果初始选择不当,可能导致:
    • 分区太少:无法充分扩展
    • 分区太多:元数据开销过大
  2. 分区大小随数据增长:随着数据量增加,每个分区也会变大,最终可能导致单个分区过大
  3. 空集群冷启动问题:在集群初始化时,所有分区都是空的,无法体现出负载均衡的优势

2.5 Elasticsearch 的实现

Elasticsearch 是固定分区策略的典型代表。每个索引(Index)在创建时指定主分片(Primary Shard)数量,这个数量在索引创建后无法修改:

PUT /my_index
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  }
}

这意味着索引 my_index 有 5 个主分片,每个主分片有 1 个副本,总共 10 个分片。

当集群中添加新节点时,Elasticsearch 会自动将部分分片迁移到新节点:

迁移前:
节点1: [P0, P1, R2]
节点2: [P2, P3, R0]
节点3: [P4, R1, R3, R4]

迁移后 (新增节点4):
节点1: [P0, R2]
节点2: [P2, P3]
节点3: [P4, R1]
节点4: [R0, R3, R4]

Elasticsearch 的分片分配器(Shard Allocator)会综合考虑多个因素:

三、动态分区策略

动态分区(Dynamic Partitioning)策略会根据数据量的变化自动调整分区的数量和大小,代表系统包括 HBase、TiKV、RethinkDB。

3.1 核心思想

与固定分区不同,动态分区在运行时可以:

这类似于 B+ 树的节点分裂和合并操作。

3.2 分裂操作

当一个分区的大小超过阈值(例如 64MB 或 96MB)时,系统会将其分裂成两个分区。分裂点的选择有几种策略:

中点分裂: 找到分区中间的 key,以此为界分裂:

原分区: [a, ..., m, ..., z]
分裂后:
  分区1: [a, ..., m)
  分区2: [m, ..., z]

负载分裂: 根据访问热度选择分裂点,将热点 key 隔离到单独的分区:

原分区: [key1: 100 QPS, key2: 10000 QPS, key3: 50 QPS, ...]
分裂后:
  分区1: [key1, key2)  # 包含普通 key
  分区2: [key2, key3)  # 隔离热点 key
  分区3: [key3, ...]

3.3 合并操作

当一个分区的大小低于阈值(例如 32MB)时,可以将其与相邻分区合并。合并操作需要满足一些条件:

class DynamicPartition:
    def __init__(self, start_key, end_key, size_mb, qps):
        self.start_key = start_key
        self.end_key = end_key
        self.size_mb = size_mb
        self.qps = qps
        
class DynamicPartitionManager:
    def __init__(self, split_threshold_mb=96, merge_threshold_mb=32):
        self.split_threshold_mb = split_threshold_mb
        self.merge_threshold_mb = merge_threshold_mb
        self.partitions = []
        
    def check_split(self, partition):
        """
        检查是否需要分裂
        """
        if partition.size_mb > self.split_threshold_mb:
            # 找到中点 key
            mid_key = self.find_median_key(partition)
            
            # 创建两个新分区
            left = DynamicPartition(
                partition.start_key, 
                mid_key, 
                partition.size_mb / 2,
                partition.qps / 2
            )
            right = DynamicPartition(
                mid_key, 
                partition.end_key, 
                partition.size_mb / 2,
                partition.qps / 2
            )
            
            return (left, right)
        return None
    
    def check_merge(self, partition1, partition2):
        """
        检查两个相邻分区是否可以合并
        """
        if partition1.end_key != partition2.start_key:
            return None  # 不相邻
            
        total_size = partition1.size_mb + partition2.size_mb
        
        # 只有两个都很小,或者合并后不超过阈值时才合并
        if (partition1.size_mb < self.merge_threshold_mb and 
            partition2.size_mb < self.merge_threshold_mb and
            total_size < self.split_threshold_mb):
            
            merged = DynamicPartition(
                partition1.start_key,
                partition2.end_key,
                total_size,
                partition1.qps + partition2.qps
            )
            return merged
            
        return None
    
    def find_median_key(self, partition):
        """
        找到分区的中位数 key (简化实现)
        """
        # 实际实现需要扫描分区数据
        # 这里简化为中点
        if isinstance(partition.start_key, str) and isinstance(partition.end_key, str):
            # 字符串的中点
            return partition.start_key + (partition.end_key - partition.start_key) // 2
        return (partition.start_key + partition.end_key) // 2
    
    def rebalance(self):
        """
        执行再平衡:检查所有分区,执行必要的分裂和合并
        """
        i = 0
        while i < len(self.partitions):
            partition = self.partitions[i]
            
            # 检查分裂
            split_result = self.check_split(partition)
            if split_result:
                left, right = split_result
                print(f"分裂分区 [{partition.start_key}, {partition.end_key}) "
                      f"-> [{left.start_key}, {left.end_key}) 和 "
                      f"[{right.start_key}, {right.end_key})")
                self.partitions[i] = left
                self.partitions.insert(i + 1, right)
                i += 2
                continue
            
            # 检查合并
            if i + 1 < len(self.partitions):
                next_partition = self.partitions[i + 1]
                merge_result = self.check_merge(partition, next_partition)
                if merge_result:
                    print(f"合并分区 [{partition.start_key}, {partition.end_key}) 和 "
                          f"[{next_partition.start_key}, {next_partition.end_key}) "
                          f"-> [{merge_result.start_key}, {merge_result.end_key})")
                    self.partitions[i] = merge_result
                    self.partitions.pop(i + 1)
                    continue
            
            i += 1

# 使用示例
manager = DynamicPartitionManager()
manager.partitions = [
    DynamicPartition(0, 100, 120, 1000),    # 太大,需要分裂
    DynamicPartition(100, 200, 20, 100),    # 正常
    DynamicPartition(200, 250, 15, 50),     # 太小
    DynamicPartition(250, 280, 18, 60),     # 太小,可以与前一个合并
]

manager.rebalance()

3.4 初始分区问题与预分裂

动态分区策略有一个显著的问题:在集群刚创建时,通常只有一个分区覆盖整个 key 空间。这导致:

为了解决这个问题,许多系统支持预分裂(Pre-splitting)

def pre_split(key_range, num_partitions):
    """
    预先将 key 空间分裂成多个分区
    """
    start, end = key_range
    step = (end - start) // num_partitions
    
    partitions = []
    for i in range(num_partitions):
        partition_start = start + i * step
        partition_end = start + (i + 1) * step if i < num_partitions - 1 else end
        partitions.append(DynamicPartition(partition_start, partition_end, 0, 0))
    
    return partitions

# 将 key 空间 [0, 1000000) 预分裂成 10 个分区
initial_partitions = pre_split((0, 1000000), 10)

在 HBase 中,可以在创建表时指定预分裂点:

byte[][] splits = new byte[][] {
    Bytes.toBytes("row_100"),
    Bytes.toBytes("row_200"),
    Bytes.toBytes("row_300"),
    // ...
};
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("mytable"));
admin.createTable(desc, splits);

3.5 优势与劣势

优势

  1. 自适应:分区数量和大小会根据实际数据量自动调整
  2. 适合数据增长:随着数据增长,分区数量增加,始终保持每个分区的大小在合理范围
  3. 空集群友好:可以通过预分裂解决初始化问题

劣势

  1. 协调开销:分裂和合并操作需要协调多个节点,增加了系统复杂度
  2. 元数据管理:分区数量动态变化,元数据管理更复杂
  3. 分裂时的性能抖动:分裂操作可能需要扫描整个分区,影响正常请求的延迟

四、按比例分区策略

按比例分区(Proportional Partitioning)是 Cassandra 采用的策略,它试图结合固定分区和动态分区的优点。

4.1 核心思想

每个节点负责固定数量的虚拟节点(Virtual Node, vnode)或分区。当集群规模变化时:

假设每个节点负责 256 个 vnode:

节点数 = 4, 总 vnode = 1024
节点数 = 5, 总 vnode = 1280 (新节点触发 256 次分裂)
节点数 = 6, 总 vnode = 1536

4.2 Cassandra 的实现

Cassandra 使用一致性哈希(Consistent Hashing)环,每个 vnode 对应环上的一个 token:

import hashlib
import bisect

class VirtualNode:
    def __init__(self, node_id, token, vnode_index):
        self.node_id = node_id
        self.token = token
        self.vnode_index = vnode_index
        
    def __repr__(self):
        return f"VNode({self.node_id}, token={self.token}, idx={self.vnode_index})"

class ConsistentHashRing:
    def __init__(self, vnodes_per_node=256):
        self.vnodes_per_node = vnodes_per_node
        self.ring = []  # 有序的 vnode 列表
        self.nodes = set()
        
    def add_node(self, node_id):
        """
        添加新节点:为其生成 vnodes_per_node 个 token
        """
        if node_id in self.nodes:
            return
            
        self.nodes.add(node_id)
        
        for i in range(self.vnodes_per_node):
            # 生成确定性的 token
            token_str = f"{node_id}:{i}"
            token = int(hashlib.md5(token_str.encode()).hexdigest(), 16)
            
            vnode = VirtualNode(node_id, token, i)
            bisect.insort(self.ring, (token, vnode))
        
        print(f"节点 {node_id} 加入,创建了 {self.vnodes_per_node} 个 vnode")
        print(f"总 vnode 数: {len(self.ring)}")
    
    def remove_node(self, node_id):
        """
        移除节点:删除其所有 vnode
        """
        if node_id not in self.nodes:
            return
            
        self.nodes.remove(node_id)
        self.ring = [(token, vnode) for token, vnode in self.ring 
                     if vnode.node_id != node_id]
        
        print(f"节点 {node_id} 移除,删除了 {self.vnodes_per_node} 个 vnode")
    
    def get_node(self, key):
        """
        根据 key 找到负责的节点
        """
        if not self.ring:
            return None
            
        key_hash = int(hashlib.md5(key.encode()).hexdigest(), 16)
        
        # 找到第一个 token >= key_hash 的 vnode
        idx = bisect.bisect_left([token for token, _ in self.ring], key_hash)
        
        if idx == len(self.ring):
            idx = 0  # 环形结构
            
        return self.ring[idx][1].node_id
    
    def get_distribution(self):
        """
        统计每个节点负责的 vnode 数量
        """
        distribution = {}
        for _, vnode in self.ring:
            distribution[vnode.node_id] = distribution.get(vnode.node_id, 0) + 1
        return distribution

# 使用示例
ring = ConsistentHashRing(vnodes_per_node=256)

# 初始 3 个节点
ring.add_node("node1")
ring.add_node("node2")
ring.add_node("node3")

print("\n初始分布:", ring.get_distribution())

# 添加第 4 个节点
ring.add_node("node4")
print("\n添加 node4 后分布:", ring.get_distribution())

# 测试 key 分布
keys = [f"key_{i}" for i in range(10000)]
node_counts = {}
for key in keys:
    node = ring.get_node(key)
    node_counts[node] = node_counts.get(node, 0) + 1

print("\n10000 个 key 的分布:")
for node, count in sorted(node_counts.items()):
    print(f"  {node}: {count} keys ({count/100:.1f}%)")

4.3 优势与劣势

优势

  1. 分区大小有界:每个节点的 vnode 数量固定,因此每个 vnode 的平均大小相对稳定
  2. 灵活性高:添加/移除节点时,只影响相邻的 vnode
  3. 随机性好:通过哈希函数,数据分布更加随机,避免了人为选择分裂点的偏差

劣势

  1. 分裂随机性:当添加节点时,哪些分区被分裂是随机的,可能不是最优选择
  2. 元数据复杂:vnode 数量多,元数据管理开销大
  3. 范围查询不友好:由于数据被随机分散,范围查询需要访问多个节点

五、再平衡与一致性

再平衡过程中,数据正在从一个节点迁移到另一个节点。这期间系统需要保证:

  1. 一致性:读操作应该能看到最新写入的数据
  2. 可用性:读写操作不应该被阻塞
  3. 不重不丢:每条数据都应该被迁移一次,不能丢失也不能重复

5.1 两阶段迁移

大多数系统采用两阶段方法:

阶段 1:复制数据(Copy Phase)

阶段 2:切换(Switchover Phase)

class PartitionMigration:
    def __init__(self, partition_id, source_node, target_node):
        self.partition_id = partition_id
        self.source_node = source_node
        self.target_node = target_node
        self.state = "init"
        self.copied_keys = set()
        self.pending_writes = []
        
    def phase1_copy(self):
        """
        阶段 1: 复制数据
        """
        print(f"开始复制分区 {self.partition_id}{self.source_node}{self.target_node}")
        self.state = "copying"
        
        # 获取源节点的所有 key
        keys = self.source_node.get_keys(self.partition_id)
        
        # 批量复制数据
        batch_size = 1000
        for i in range(0, len(keys), batch_size):
            batch = keys[i:i+batch_size]
            
            # 读取源节点数据
            data = self.source_node.read_batch(batch)
            
            # 写入目标节点
            self.target_node.write_batch(data)
            
            # 记录已复制的 key
            self.copied_keys.update(batch)
            
            print(f"  已复制 {len(self.copied_keys)}/{len(keys)} 个 key")
        
        print(f"分区 {self.partition_id} 数据复制完成")
        
    def phase2_switchover(self):
        """
        阶段 2: 切换
        """
        print(f"开始切换分区 {self.partition_id}")
        self.state = "switching"
        
        # 1. 短暂阻塞写操作(设置只读模式)
        self.source_node.set_readonly(self.partition_id)
        
        # 2. 复制增量数据(在只读期间产生的写请求)
        incremental_data = self.source_node.get_incremental_writes(self.partition_id)
        if incremental_data:
            self.target_node.write_batch(incremental_data)
            print(f"  复制了 {len(incremental_data)} 个增量写入")
        
        # 3. 更新元数据服务器:分区所有权转移
        metadata_service.update_partition_owner(self.partition_id, self.target_node)
        
        # 4. 目标节点开始服务
        self.target_node.set_active(self.partition_id)
        
        # 5. 源节点停止服务
        self.source_node.remove_partition(self.partition_id)
        
        self.state = "completed"
        print(f"分区 {self.partition_id} 切换完成")
        
    def execute(self):
        """
        执行完整的迁移流程
        """
        try:
            self.phase1_copy()
            self.phase2_switchover()
            return True
        except Exception as e:
            print(f"迁移失败: {e}")
            self.state = "failed"
            return False

5.2 读请求的处理

在迁移期间,读请求应该如何处理?有几种策略:

策略 1:总是读源节点 - 简单,但迁移期间源节点压力大

策略 2:总是读目标节点 - 需要等待阶段 1 完成 - 目标节点可能数据不完整

策略 3:双读(Read-Both) - 同时读源节点和目标节点,取最新的结果 - 兼顾一致性,但增加了延迟

策略 4:路由表版本号 - 给每个分区分配一个版本号(Epoch) - 读请求带上版本号,节点检查自己是否是该版本的 owner

class PartitionRouter:
    def __init__(self):
        self.routing_table = {}  # {partition_id: (node, epoch)}
        self.current_epoch = 0
        
    def get_node(self, partition_id):
        """
        获取分区的当前负责节点
        """
        return self.routing_table.get(partition_id)
    
    def start_migration(self, partition_id, source_node, target_node):
        """
        开始迁移:增加 epoch,设置双节点模式
        """
        self.current_epoch += 1
        self.routing_table[partition_id] = {
            'source': source_node,
            'target': target_node,
            'epoch': self.current_epoch,
            'state': 'migrating'
        }
        
    def finish_migration(self, partition_id, target_node):
        """
        完成迁移:切换到目标节点
        """
        self.current_epoch += 1
        self.routing_table[partition_id] = {
            'source': None,
            'target': target_node,
            'epoch': self.current_epoch,
            'state': 'active'
        }
    
    def read(self, partition_id, key):
        """
        读取数据,处理迁移期间的情况
        """
        info = self.routing_table.get(partition_id)
        if not info:
            raise Exception(f"分区 {partition_id} 不存在")
        
        if info['state'] == 'active':
            # 正常状态:读目标节点
            return info['target'].read(key)
        
        elif info['state'] == 'migrating':
            # 迁移中:优先读源节点,如果源节点没有则读目标节点
            value = info['source'].read(key)
            if value is None:
                value = info['target'].read(key)
            return value
        
    def write(self, partition_id, key, value):
        """
        写入数据,处理迁移期间的情况
        """
        info = self.routing_table.get(partition_id)
        if not info:
            raise Exception(f"分区 {partition_id} 不存在")
        
        if info['state'] == 'active':
            # 正常状态:写目标节点
            info['target'].write(key, value)
        
        elif info['state'] == 'migrating':
            # 迁移中:双写
            info['source'].write(key, value)
            info['target'].write(key, value)

5.3 幂等性保证

网络不可靠,迁移操作可能失败重试。系统需要保证操作的幂等性:

class IdempotentMigration:
    def copy_key_idempotent(self, key, value, version):
        """
        幂等的数据复制
        """
        existing = self.target_node.read_with_version(key)
        
        if existing is None:
            # 目标节点没有这个 key,直接写入
            self.target_node.write_with_version(key, value, version)
        elif existing['version'] < version:
            # 目标节点的版本较旧,覆盖
            self.target_node.write_with_version(key, value, version)
        else:
            # 目标节点的版本更新或相同,跳过
            pass

六、TiKV 的 PD 调度策略

TiKV 是 PingCAP 开发的分布式 KV 存储,使用动态分区策略。其调度系统 PD(Placement Driver)是一个非常优秀的实现,值得深入学习。

6.1 TiKV 架构概述

TiKV 集群由三个主要组件组成:

每个 Region 是一个 Raft 组,默认大小为 96MB,超过阈值会自动分裂。Region 有 3 个副本(可配置),分布在不同的 TiKV 节点上。

6.2 心跳机制

PD 如何感知集群状态?通过心跳(Heartbeat)机制:

Store Heartbeat(节点心跳): 每个 TiKV 节点定期(默认 10 秒)向 PD 发送心跳:

type StoreHeartbeat struct {
    StoreID       uint64
    Capacity      uint64  // 总容量
    Available     uint64  // 可用容量
    RegionCount   uint32  // Region 数量
    LeaderCount   uint32  // Leader Region 数量
    BytesWritten  uint64  // 写入字节数
    BytesRead     uint64  // 读取字节数
    KeysWritten   uint64  // 写入 key 数
    KeysRead      uint64  // 读取 key 数
    IsBusy        bool    // 是否繁忙
    StartTime     uint64  // 启动时间
}

Region Heartbeat(分区心跳): 每个 Region 的 Leader 定期(默认 60 秒)向 PD 发送心跳:

type RegionHeartbeat struct {
    RegionID      uint64
    Leader        *Peer      // 当前 Leader
    Peers         []*Peer    // 所有副本
    StartKey      []byte     // 起始 key
    EndKey        []byte     // 结束 key
    ApproximateSize uint64   // 大小估算
    ApproximateKeys uint64   // key 数量估算
    BytesWritten  uint64     // 写入速率
    BytesRead     uint64     // 读取速率
    KeysWritten   uint64
    KeysRead      uint64
}

PD 收集这些心跳信息,建立集群的全局视图。

6.3 调度器(Schedulers)

PD 包含多个独立的调度器,每个调度器负责一个特定的目标:

6.3.1 Balance Leader Scheduler

目标:平衡每个节点的 Leader Region 数量。

Leader 处理所有读写请求,因此 Leader 的分布直接影响负载均衡。

class BalanceLeaderScheduler:
    def __init__(self):
        self.name = "balance-leader"
        
    def schedule(self, cluster_info):
        """
        生成 Leader 平衡的调度操作
        """
        stores = cluster_info.get_stores()
        
        # 统计每个节点的 Leader 数量
        leader_counts = {}
        for store in stores:
            leader_counts[store.id] = store.leader_count
        
        # 找到 Leader 最多和最少的节点
        max_store = max(stores, key=lambda s: s.leader_count)
        min_store = min(stores, key=lambda s: s.leader_count)
        
        # 如果差距太小,不需要调度
        if max_store.leader_count - min_store.leader_count <= 1:
            return None
        
        # 从 max_store 选择一个 Region,将其 Leader 转移到 min_store
        candidate_region = self.select_region(cluster_info, max_store, min_store)
        
        if candidate_region:
            return {
                'type': 'transfer-leader',
                'region_id': candidate_region.id,
                'from_store': max_store.id,
                'to_store': min_store.id
            }
        
        return None
    
    def select_region(self, cluster_info, from_store, to_store):
        """
        选择合适的 Region 进行 Leader 转移
        """
        # 找到在 from_store 上是 Leader,且在 to_store 上有副本的 Region
        for region in cluster_info.get_regions():
            if region.leader_store_id == from_store.id:
                for peer in region.peers:
                    if peer.store_id == to_store.id:
                        return region
        return None

Leader 转移操作非常快(通常几毫秒),因为只需要 Raft 组内部协商,不需要数据迁移。

6.3.2 Balance Region Scheduler

目标:平衡每个节点的 Region 数量和数据量。

class BalanceRegionScheduler:
    def __init__(self):
        self.name = "balance-region"
        
    def schedule(self, cluster_info):
        """
        生成 Region 平衡的调度操作
        """
        stores = cluster_info.get_stores()
        
        # 按数据量排序
        stores_by_size = sorted(stores, key=lambda s: s.used_size, reverse=True)
        
        source_store = stores_by_size[0]
        target_store = stores_by_size[-1]
        
        # 检查是否需要平衡
        avg_size = sum(s.used_size for s in stores) / len(stores)
        
        if source_store.used_size < avg_size * 1.1:
            return None  # 已经足够均衡
        
        # 选择一个 Region 从 source_store 移动到 target_store
        candidate_region = self.select_region(cluster_info, source_store, target_store)
        
        if candidate_region:
            return {
                'type': 'move-peer',
                'region_id': candidate_region.id,
                'from_store': source_store.id,
                'to_store': target_store.id
            }
        
        return None
    
    def select_region(self, cluster_info, from_store, to_store):
        """
        选择合适的 Region 进行迁移
        """
        # 找到在 from_store 上有副本的 Region
        candidates = []
        for region in cluster_info.get_regions():
            if any(peer.store_id == from_store.id for peer in region.peers):
                # 检查 to_store 上是否还没有这个 Region 的副本
                if not any(peer.store_id == to_store.id for peer in region.peers):
                    candidates.append(region)
        
        # 优先选择较大的 Region,加快平衡速度
        if candidates:
            return max(candidates, key=lambda r: r.approximate_size)
        
        return None

Region 迁移步骤: 1. 在 target_store 上添加一个新的 Learner 副本 2. 等待 Learner 追上 Leader 的日志 3. 将 Learner 提升为 Follower 4. 删除 from_store 上的旧副本

6.3.3 Hot Region Scheduler

目标:识别热点 Region,将其分散到不同节点。

热点检测基于 Region 的读写速率:

class HotRegionScheduler:
    def __init__(self, hot_threshold_bytes=10 * 1024 * 1024):  # 10MB/s
        self.name = "hot-region"
        self.hot_threshold = hot_threshold_bytes
        self.hot_cache = {}  # {region_id: hot_info}
        
    def update_hot_cache(self, region_heartbeat):
        """
        更新热点缓存
        """
        region_id = region_heartbeat.region_id
        
        # 计算读写速率 (字节/秒)
        write_rate = region_heartbeat.bytes_written
        read_rate = region_heartbeat.bytes_read
        
        total_rate = write_rate + read_rate
        
        if total_rate > self.hot_threshold:
            self.hot_cache[region_id] = {
                'region_id': region_id,
                'store_id': region_heartbeat.leader.store_id,
                'write_rate': write_rate,
                'read_rate': read_rate,
                'total_rate': total_rate,
                'last_update': time.time()
            }
        else:
            # 不再热,从缓存中移除
            self.hot_cache.pop(region_id, None)
    
    def schedule(self, cluster_info):
        """
        生成热点平衡的调度操作
        """
        # 统计每个节点的热点负载
        store_hot_load = {}
        for store in cluster_info.get_stores():
            store_hot_load[store.id] = 0
        
        for hot_info in self.hot_cache.values():
            store_id = hot_info['store_id']
            store_hot_load[store_id] += hot_info['total_rate']
        
        # 找到最热和最冷的节点
        if not store_hot_load:
            return None
        
        hottest_store = max(store_hot_load.items(), key=lambda x: x[1])
        coldest_store = min(store_hot_load.items(), key=lambda x: x[1])
        
        # 检查是否需要调度
        if hottest_store[1] < coldest_store[1] * 2:
            return None  # 热点分布相对均衡
        
        # 从 hottest_store 选择一个热点 Region,转移 Leader 到 coldest_store
        hot_region = self.select_hot_region(cluster_info, hottest_store[0], coldest_store[0])
        
        if hot_region:
            return {
                'type': 'transfer-leader',
                'region_id': hot_region['region_id'],
                'from_store': hottest_store[0],
                'to_store': coldest_store[0],
                'reason': 'hot-region'
            }
        
        return None
    
    def select_hot_region(self, cluster_info, from_store, to_store):
        """
        选择热点 Region
        """
        for hot_info in self.hot_cache.values():
            if hot_info['store_id'] == from_store:
                # 检查这个 Region 在 to_store 上是否有副本
                region = cluster_info.get_region(hot_info['region_id'])
                if any(peer.store_id == to_store for peer in region.peers):
                    return hot_info
        return None

6.4 调度约束(Placement Rules)

TiKV 支持灵活的放置规则,可以控制数据的分布策略:

标签(Label): 每个 TiKV 节点可以配置多个标签,例如:

[server]
labels = "zone=z1,rack=r1,host=h1"

放置规则示例

{
  "group_id": "pd",
  "id": "default",
  "role": "voter",
  "count": 3,
  "location_labels": ["zone", "rack", "host"],
  "constraints": [
    {"key": "zone", "op": "in", "values": ["z1", "z2", "z3"]}
  ]
}

这个规则表示: - 每个 Region 有 3 个 voter 副本 - 副本应该分布在不同的 zone、rack、host - 只能放在 z1、z2、z3 这三个 zone

PD 在调度时会遵守这些约束:

class PlacementChecker:
    def check_placement(self, region, placement_rule):
        """
        检查 Region 的副本是否满足放置规则
        """
        peers = region.peers
        
        # 检查副本数量
        if len(peers) != placement_rule['count']:
            return False, "副本数量不匹配"
        
        # 检查位置隔离
        location_labels = placement_rule['location_labels']
        
        for label in location_labels:
            label_values = [self.get_store_label(peer.store_id, label) for peer in peers]
            
            # 同一层级的标签值应该各不相同(隔离)
            if len(label_values) != len(set(label_values)):
                return False, f"标签 {label} 没有隔离"
        
        # 检查约束条件
        for constraint in placement_rule.get('constraints', []):
            if not self.check_constraint(peers, constraint):
                return False, f"违反约束 {constraint}"
        
        return True, "满足放置规则"
    
    def check_constraint(self, peers, constraint):
        """
        检查约束条件
        """
        key = constraint['key']
        op = constraint['op']
        values = constraint['values']
        
        for peer in peers:
            label_value = self.get_store_label(peer.store_id, key)
            
            if op == 'in':
                if label_value not in values:
                    return False
            elif op == 'not_in':
                if label_value in values:
                    return False
        
        return True

6.5 调度优先级与并发限制

PD 不会同时执行所有调度操作,而是有优先级和并发限制:

优先级: 1. 高优先级:修复副本缺失、修复副本位置违规 2. 中优先级:热点平衡 3. 低优先级:常规的 Region 和 Leader 平衡

并发限制

class SchedulerManager:
    def __init__(self):
        self.schedulers = []
        self.max_pending_operators = 10  # 最多同时执行 10 个操作
        self.max_snapshot_count = 3      # 最多同时传输 3 个快照
        self.pending_operators = []
        
    def run_schedulers(self, cluster_info):
        """
        运行所有调度器,生成调度操作
        """
        if len(self.pending_operators) >= self.max_pending_operators:
            return  # 已达到并发上限
        
        for scheduler in self.schedulers:
            operator = scheduler.schedule(cluster_info)
            
            if operator:
                # 检查是否可以添加这个操作
                if self.can_add_operator(operator):
                    self.pending_operators.append(operator)
                    print(f"添加调度操作: {operator}")
                    
                    if len(self.pending_operators) >= self.max_pending_operators:
                        break
    
    def can_add_operator(self, operator):
        """
        检查是否可以添加新的调度操作
        """
        # 检查是否涉及正在迁移的 Region
        for pending in self.pending_operators:
            if pending['region_id'] == operator['region_id']:
                return False  # 同一个 Region 不能同时有多个操作
        
        # 检查快照传输数量
        if operator['type'] == 'move-peer':
            snapshot_count = sum(1 for op in self.pending_operators if op['type'] == 'move-peer')
            if snapshot_count >= self.max_snapshot_count:
                return False
        
        return True

6.6 调度算子(Operator)

PD 生成的调度决策称为算子(Operator),一个算子可能包含多个步骤:

type Operator struct {
    RegionID uint64
    Kind     OperatorKind  // transfer-leader, add-peer, remove-peer, etc.
    Steps    []OperatorStep
    Status   OperatorStatus
}

type OperatorStep struct {
    Type     StepType  // add_learner, promote_learner, transfer_leader, remove_peer
    PeerID   uint64
    StoreID  uint64
}

// 示例:将 Region 1 从 Store 1 迁移到 Store 4
operator := Operator{
    RegionID: 1,
    Kind: OpRegion,
    Steps: []OperatorStep{
        {Type: AddLearner, StoreID: 4},     // 在 Store 4 上添加 Learner
        {Type: PromoteLearner, StoreID: 4}, // 提升为 Follower
        {Type: RemovePeer, StoreID: 1},     // 移除 Store 1 的副本
    },
}

TiKV 节点收到 Operator 后,会逐步执行这些步骤,并向 PD 报告进度。

七、性能冲突与限流

再平衡操作会消耗系统资源,与正常的业务请求竞争:

7.1 资源竞争

网络带宽: - 数据迁移需要在节点间传输大量数据 - 可能占满网络带宽,影响正常请求

磁盘 I/O: - 源节点需要读取数据 - 目标节点需要写入数据 - 与正常的读写操作竞争磁盘

CPU 和内存: - 数据的序列化、反序列化、压缩、校验需要 CPU - 缓存可能被迁移数据污染

7.2 限流机制

为了避免再平衡影响正常业务,需要对迁移速度进行限流:

import time

class RateLimiter:
    def __init__(self, rate_bytes_per_sec):
        self.rate = rate_bytes_per_sec
        self.allowance = rate_bytes_per_sec
        self.last_check = time.time()
        
    def acquire(self, bytes_count):
        """
        获取指定字节数的配额,如果超限则阻塞
        """
        current_time = time.time()
        elapsed = current_time - self.last_check
        
        # 补充配额
        self.allowance += elapsed * self.rate
        self.allowance = min(self.allowance, self.rate)  # 不超过上限
        
        self.last_check = current_time
        
        if self.allowance >= bytes_count:
            # 有足够配额,直接消耗
            self.allowance -= bytes_count
            return
        else:
            # 配额不足,需要等待
            wait_time = (bytes_count - self.allowance) / self.rate
            time.sleep(wait_time)
            self.allowance = 0

class ThrottledMigration:
    def __init__(self, rate_limit_mbps=100):
        self.rate_limiter = RateLimiter(rate_limit_mbps * 1024 * 1024)
        
    def migrate_data(self, source, target, partition_id):
        """
        带限流的数据迁移
        """
        keys = source.get_keys(partition_id)
        batch_size = 1000
        
        for i in range(0, len(keys), batch_size):
            batch = keys[i:i+batch_size]
            
            # 读取数据
            data = source.read_batch(batch)
            data_size = sum(len(v) for v in data.values())
            
            # 申请配额
            self.rate_limiter.acquire(data_size)
            
            # 写入目标节点
            target.write_batch(data)

7.3 自适应限流

更高级的策略是根据系统负载动态调整迁移速度:

class AdaptiveThrottle:
    def __init__(self, max_rate_mbps=200):
        self.max_rate = max_rate_mbps * 1024 * 1024
        self.current_rate = self.max_rate * 0.5  # 从 50% 开始
        
    def adjust_rate(self, system_metrics):
        """
        根据系统指标调整迁移速率
        """
        cpu_usage = system_metrics['cpu_usage']
        disk_util = system_metrics['disk_util']
        p99_latency = system_metrics['p99_latency_ms']
        
        # 如果系统压力大,降低迁移速率
        if cpu_usage > 0.8 or disk_util > 0.8 or p99_latency > 100:
            self.current_rate *= 0.9  # 降低 10%
        # 如果系统压力小,提高迁移速率
        elif cpu_usage < 0.5 and disk_util < 0.5 and p99_latency < 50:
            self.current_rate *= 1.1  # 提高 10%
        
        # 限制在合理范围内
        self.current_rate = max(10 * 1024 * 1024, self.current_rate)  # 至少 10MB/s
        self.current_rate = min(self.max_rate, self.current_rate)      # 不超过上限
        
        return self.current_rate

7.4 时间窗口控制

某些场景下,只允许在业务低峰期进行再平衡:

class TimeWindowScheduler:
    def __init__(self, allowed_hours):
        self.allowed_hours = allowed_hours  # 例如 [0, 1, 2, 3, 4, 5] 表示凌晨 0-6 点
        
    def is_allowed(self):
        """
        检查当前是否在允许的时间窗口内
        """
        current_hour = time.localtime().tm_hour
        return current_hour in self.allowed_hours
    
    def schedule_if_allowed(self, scheduler, cluster_info):
        """
        只在允许的时间窗口内执行调度
        """
        if self.is_allowed():
            return scheduler.schedule(cluster_info)
        else:
            return None

八、再平衡触发的检测启发式

在自动化再平衡系统中,准确判断”何时触发再平衡”与”如何执行再平衡”同样重要。过于敏感的触发条件会导致频繁的数据迁移(thrashing),过于迟钝则无法及时响应负载变化。

8.1 负载不均衡检测

最基本的触发条件是节点间负载差异超过阈值:

class ImbalanceDetector:
    def __init__(self, load_threshold=0.3, min_gap_abs=100):
        self.load_threshold = load_threshold
        self.min_gap_abs = min_gap_abs

    def should_rebalance(self, node_loads: dict) -> bool:
        """
        检测是否需要再平衡。
        node_loads: {node_id: load_score}
        load_threshold: 最大/最小负载比超过此值时触发。
        min_gap_abs: 最大与最小负载的绝对差值下限,避免低负载时误触发。
        """
        if len(node_loads) < 2:
            return False
        max_load = max(node_loads.values())
        min_load = min(node_loads.values())
        if min_load == 0:
            return max_load > self.min_gap_abs
        ratio = (max_load - min_load) / min_load
        return ratio > self.load_threshold and (max_load - min_load) > self.min_gap_abs

8.2 磁盘使用率阈值

当某个节点的磁盘使用率接近上限时,必须尽快将部分分区迁出,否则该节点可能因磁盘满而停止服务:

磁盘使用率 建议动作
< 70% 正常,无需干预
70% - 80% 警告,开始规划迁移
80% - 90% 紧急,立即触发自动再平衡
> 90% 危急,拒绝新写入 + 紧急迁移

8.3 手动 vs 自动触发

触发方式 适用场景 优势 风险
全自动 云原生环境、弹性伸缩 无需人工干预,响应快 可能在业务高峰误触发
半自动(推荐后人工确认) 生产环境主力方案 系统建议 + 人工审核 响应速度取决于值班人员
纯手动 关键金融系统、合规要求严格 完全可控 响应慢,依赖运维经验

多数生产系统采用半自动模式:系统根据检测指标生成再平衡计划,运维人员审核后执行。TiKV 的 PD 调度器支持通过 pd-ctl 暂停自动调度、手动添加调度算子,提供了灵活的控制能力。

flowchart TD
    A["集群状态采集"] --> B{"数据量大?<br/>单分区 > 阈值?"}
    B -->|是| C{"分区数多?<br/>> 预设上限?"}
    B -->|否| D{"负载不均衡?<br/>偏差 > 30%?"}
    C -->|是| E{"要求在线迁移?"}
    C -->|否| F["动态分区策略<br/>(自动分裂/合并)"]
    D -->|是| G["触发分区迁移"]
    D -->|否| H["维持现状"]
    E -->|是| I["两阶段在线迁移<br/>(双写 + 切换)"]
    E -->|否| J["离线批量迁移<br/>(停机窗口)"]

上述决策流程图展示了从集群状态采集到策略选择的完整路径。系统首先判断数据规模和分区数量,然后根据负载均衡状况决定是否需要触发再平衡。对于大规模在线系统,两阶段在线迁移是首选方案。

九、迁移安全性与回滚

数据迁移是一项高风险操作。网络抖动、节点故障、磁盘损坏等异常情况随时可能发生。生产级的再平衡系统必须在设计中内置安全保障机制。

9.1 分阶段迁移与速率控制

将数据迁移拆分为多个明确的阶段,每个阶段都有独立的验证和回滚点:

sequenceDiagram
    participant Scheduler as 调度器
    participant Source as 源节点
    participant Target as 目标节点
    participant Verifier as 校验器

    Scheduler->>Scheduler: 生成迁移计划
    Scheduler->>Source: 阶段1:准备(冻结分区快照)
    Source-->>Scheduler: 快照就绪
    Scheduler->>Source: 阶段2:数据传输(限速)
    Source->>Target: 批量传输数据(rate limit: 100MB/s)
    Target-->>Scheduler: 传输完成
    Scheduler->>Verifier: 阶段3:数据校验(checksum比对)
    Verifier-->>Scheduler: 校验通过
    Scheduler->>Scheduler: 阶段4:流量切换(更新路由)
    Scheduler->>Source: 阶段5:清理旧数据(延迟删除)
    Source-->>Scheduler: 清理完成

上述时序图展示了完整的分阶段迁移流程。关键设计要点包括:数据传输阶段使用速率限制避免打满网络带宽;传输完成后通过 checksum 校验保证数据完整性;流量切换和旧数据清理分开执行,为回滚保留时间窗口。

9.2 反压机制

当迁移对在线业务产生可感知的影响时,必须自动降速或暂停迁移:

class MigrationBackpressure:
    def __init__(self, latency_threshold_ms=100, cpu_threshold=0.85):
        self.latency_threshold_ms = latency_threshold_ms
        self.cpu_threshold = cpu_threshold
        self.paused = False

    def check_and_adjust(self, metrics: dict, rate_limiter) -> str:
        """
        根据业务指标动态调整迁移速率。
        metrics: {"p99_latency_ms": float, "cpu_usage": float, "error_rate": float}
        """
        if metrics["error_rate"] > 0.01:
            self.paused = True
            return "暂停迁移:错误率过高"

        if metrics["p99_latency_ms"] > self.latency_threshold_ms:
            rate_limiter.reduce_rate(0.5)
            return "降速50%:P99延迟超标"

        if metrics["cpu_usage"] > self.cpu_threshold:
            rate_limiter.reduce_rate(0.7)
            return "降速30%:CPU使用率过高"

        if not self.paused and metrics["p99_latency_ms"] < self.latency_threshold_ms * 0.5:
            rate_limiter.increase_rate(1.2)
            return "提速20%:系统负载健康"

        return "维持当前速率"

9.3 回滚机制

迁移失败时的回滚策略因阶段不同而异:

失败阶段 回滚动作 数据风险
数据传输中 丢弃目标节点上的部分数据,源节点继续服务
校验失败 删除目标节点数据,重新传输
流量切换后 将路由切回源节点,源数据仍保留 可能丢失切换期间的增量写入
旧数据清理后 无法回滚,需从副本恢复

关键原则:旧数据的清理必须延迟执行。在流量切换后,保留源节点上的旧数据至少一个观察期(通常 24-72 小时),确认新分区工作正常后再清理。

十、运维操作手册(Operator Playbook)

10.1 扩容操作流程

场景:集群负载接近容量上限,需要添加新节点。

  1. 准备阶段:部署新节点,确认网络连通性、磁盘容量、配置一致性。
  2. 加入集群:将新节点注册到集群元数据服务(如 TiKV 的 PD)。
  3. 等待调度:自动调度器会将部分分区迁移到新节点。监控迁移进度和业务指标。
  4. 验证:确认新节点的分区分布、负载水平、延迟指标均正常。
  5. 调整权重(可选):如果新节点硬件配置与旧节点不同,通过权重配置调整分区分配比例。

10.2 缩容操作流程

场景:业务低峰期缩减集群规模以节省成本。

  1. 标记下线:将目标节点标记为”待下线”(draining)状态,停止接收新分区。
  2. 迁出分区:自动调度器将该节点上的所有分区迁移到其他节点。
  3. 等待完成:监控迁移进度,确认所有分区已完全迁出。
  4. 校验零分区:确认目标节点上的分区数为零。
  5. 移除节点:从集群元数据中移除节点,关闭机器。

10.3 紧急故障处理

场景:某个节点突然宕机,部分分区不可用。

1. 确认故障范围:哪些分区受影响?是否有副本可用?
2. 如果有副本(如 Raft 多副本):
   a. 等待自动选主(通常 10-30 秒)
   b. 确认新 Leader 已选出,业务恢复
   c. 补充副本到目标副本数
3. 如果无副本(单副本部署):
   a. 尝试恢复故障节点
   b. 如果无法恢复,从备份恢复数据
   c. 将恢复的数据分配到健康节点
4. 事后复盘:分析故障原因,调整副本策略

10.4 常用监控告警规则

告警项 条件 严重级别 处理建议
节点间负载偏差 max/min > 2.0 Warning 检查是否需要手动触发再平衡
迁移任务超时 单任务 > 30min Warning 检查网络和磁盘状态
迁移失败 连续失败 > 3 次 Critical 暂停自动调度,人工排查
磁盘使用率 > 85% Critical 立即迁出分区或扩容
分区数偏差 最多/最少 > 3 倍 Warning 检查分区分配策略

十一、三种策略的对比

下面是固定分区、动态分区、按比例分区三种策略的详细对比:

维度 固定分区 动态分区 按比例分区
代表系统 Riak, Elasticsearch, Couchbase HBase, TiKV, RethinkDB Cassandra
分区数量 创建时固定,难以修改 根据数据量动态调整 正比于节点数量
分区大小 随数据增长而增长 保持在配置的范围内 相对稳定
初始化 需要预估合适的分区数 单分区启动,可预分裂 每个节点固定 vnode 数
扩容操作 移动整个分区 无需操作,自动分裂 触发分裂,新节点接管一半
缩容操作 移动整个分区 无需操作,自动合并 删除 vnode,现有节点接管
迁移粒度 整个分区(可能较大) 新分裂的子分区(较小) 单个 vnode(较小)
元数据复杂度 低(分区数固定) 中(分区数变化) 高(大量 vnode)
实现复杂度 简单 中等(需要分裂/合并逻辑) 中等(需要哈希环管理)
范围查询 友好(如果按范围分区) 友好 不友好(数据分散)
热点处理 依赖分区分配策略 可以分裂热点分区 通过 vnode 分散热点
适用场景 数据量可预测,分区数合理 数据量不可预测,需要自适应 节点频繁变化,需要快速再平衡

选择建议

  1. 固定分区:适合数据量增长可预测、集群规模相对稳定的场景。如果能合理估算分区数量,这是最简单的方案。

  2. 动态分区:适合数据量增长不可预测、需要自动适应的场景。特别适合大规模存储系统,能够随着数据增长自动扩展。

  3. 按比例分区:适合节点频繁加入/离开、需要快速再平衡的场景。特别适合云环境下的弹性伸缩。

十二、代码示例:完整的再平衡模拟器

下面是一个完整的再平衡模拟器,实现了三种策略:

import random
import hashlib
from typing import List, Dict, Set
from dataclasses import dataclass

@dataclass
class Node:
    id: str
    capacity_mb: int
    used_mb: int = 0
    partitions: Set[int] = None
    
    def __post_init__(self):
        if self.partitions is None:
            self.partitions = set()
    
    def available_mb(self):
        return self.capacity_mb - self.used_mb
    
    def usage_ratio(self):
        return self.used_mb / self.capacity_mb if self.capacity_mb > 0 else 0

@dataclass
class Partition:
    id: int
    size_mb: int
    qps: int
    key_range: tuple  # (start, end)

class RebalanceSimulator:
    def __init__(self, strategy='fixed'):
        self.strategy = strategy
        self.nodes: Dict[str, Node] = {}
        self.partitions: Dict[int, Partition] = {}
        self.partition_to_node: Dict[int, str] = {}
        
    def add_node(self, node: Node):
        self.nodes[node.id] = node
        print(f"添加节点 {node.id}, 容量 {node.capacity_mb}MB")
        
    def remove_node(self, node_id: str):
        if node_id not in self.nodes:
            return
        
        # 将该节点的分区迁移到其他节点
        partitions_to_move = list(self.nodes[node_id].partitions)
        for partition_id in partitions_to_move:
            self.move_partition(partition_id, node_id, self.select_target_node(partition_id))
        
        del self.nodes[node_id]
        print(f"移除节点 {node_id}")
        
    def add_partition(self, partition: Partition):
        self.partitions[partition.id] = partition
        # 选择负载最轻的节点
        target_node = self.select_target_node(partition.id)
        self.assign_partition(partition.id, target_node)
        
    def assign_partition(self, partition_id: int, node_id: str):
        partition = self.partitions[partition_id]
        node = self.nodes[node_id]
        
        self.partition_to_node[partition_id] = node_id
        node.partitions.add(partition_id)
        node.used_mb += partition.size_mb
        
    def move_partition(self, partition_id: int, from_node_id: str, to_node_id: str):
        if from_node_id == to_node_id:
            return
            
        partition = self.partitions[partition_id]
        from_node = self.nodes[from_node_id]
        to_node = self.nodes[to_node_id]
        
        # 从源节点移除
        from_node.partitions.remove(partition_id)
        from_node.used_mb -= partition.size_mb
        
        # 添加到目标节点
        to_node.partitions.add(partition_id)
        to_node.used_mb += partition.size_mb
        
        self.partition_to_node[partition_id] = to_node_id
        
        print(f"迁移分区 {partition_id} ({partition.size_mb}MB) "
              f"从 {from_node_id}{to_node_id}")
        
    def select_target_node(self, partition_id: int) -> str:
        """
        选择分区的目标节点
        """
        # 排除已经有这个分区的节点
        current_node = self.partition_to_node.get(partition_id)
        
        # 选择使用率最低的节点
        min_usage = float('inf')
        target = None
        
        for node_id, node in self.nodes.items():
            if node_id == current_node:
                continue
            if node.usage_ratio() < min_usage:
                min_usage = node.usage_ratio()
                target = node_id
        
        return target
        
    def rebalance(self):
        """
        执行再平衡
        """
        print(f"\n=== 开始再平衡 (策略: {self.strategy}) ===")
        
        if self.strategy == 'fixed':
            self.rebalance_fixed()
        elif self.strategy == 'dynamic':
            self.rebalance_dynamic()
        elif self.strategy == 'proportional':
            self.rebalance_proportional()
            
        self.print_status()
        
    def rebalance_fixed(self):
        """
        固定分区策略:移动整个分区以平衡负载
        """
        # 计算平均使用率
        total_used = sum(node.used_mb for node in self.nodes.values())
        total_capacity = sum(node.capacity_mb for node in self.nodes.values())
        avg_usage = total_used / total_capacity
        
        # 找到过载和空闲的节点
        overloaded = [node for node in self.nodes.values() if node.usage_ratio() > avg_usage * 1.2]
        underloaded = [node for node in self.nodes.values() if node.usage_ratio() < avg_usage * 0.8]
        
        # 从过载节点迁移分区到空闲节点
        for over_node in overloaded:
            for under_node in underloaded:
                # 选择合适大小的分区
                for partition_id in list(over_node.partitions):
                    partition = self.partitions[partition_id]
                    
                    if under_node.available_mb() >= partition.size_mb:
                        self.move_partition(partition_id, over_node.id, under_node.id)
                        
                        # 检查是否已经平衡
                        if over_node.usage_ratio() <= avg_usage * 1.1:
                            break
                            
                if over_node.usage_ratio() <= avg_usage * 1.1:
                    break
                    
    def rebalance_dynamic(self):
        """
        动态分区策略:分裂大分区,合并小分区
        """
        split_threshold = 100  # MB
        merge_threshold = 20   # MB
        
        # 检查分裂
        to_split = []
        for partition_id, partition in self.partitions.items():
            if partition.size_mb > split_threshold:
                to_split.append(partition_id)
        
        for partition_id in to_split:
            self.split_partition(partition_id)
        
        # 检查合并
        sorted_partitions = sorted(self.partitions.values(), key=lambda p: p.key_range[0])
        i = 0
        while i < len(sorted_partitions) - 1:
            p1 = sorted_partitions[i]
            p2 = sorted_partitions[i + 1]
            
            if (p1.size_mb < merge_threshold and p2.size_mb < merge_threshold and
                p1.key_range[1] == p2.key_range[0]):
                self.merge_partitions(p1.id, p2.id)
                sorted_partitions = sorted(self.partitions.values(), key=lambda p: p.key_range[0])
            else:
                i += 1
                
    def split_partition(self, partition_id: int):
        """
        分裂分区
        """
        partition = self.partitions[partition_id]
        node_id = self.partition_to_node[partition_id]
        
        # 找到分裂点
        start, end = partition.key_range
        mid = (start + end) // 2
        
        # 创建两个新分区
        new_id_1 = max(self.partitions.keys()) + 1
        new_id_2 = new_id_1 + 1
        
        p1 = Partition(new_id_1, partition.size_mb // 2, partition.qps // 2, (start, mid))
        p2 = Partition(new_id_2, partition.size_mb // 2, partition.qps // 2, (mid, end))
        
        # 移除旧分区
        node = self.nodes[node_id]
        node.partitions.remove(partition_id)
        node.used_mb -= partition.size_mb
        del self.partitions[partition_id]
        del self.partition_to_node[partition_id]
        
        # 添加新分区
        self.partitions[new_id_1] = p1
        self.partitions[new_id_2] = p2
        self.assign_partition(new_id_1, node_id)
        
        # 第二个分区可能分配到其他节点以平衡负载
        target = self.select_target_node(new_id_2)
        self.assign_partition(new_id_2, target)
        
        print(f"分裂分区 {partition_id} ({partition.size_mb}MB) "
              f"-> {new_id_1}{new_id_2} (各 {partition.size_mb//2}MB)")
        
    def merge_partitions(self, partition_id_1: int, partition_id_2: int):
        """
        合并分区
        """
        p1 = self.partitions[partition_id_1]
        p2 = self.partitions[partition_id_2]
        
        # 创建合并后的分区
        new_id = max(self.partitions.keys()) + 1
        merged = Partition(
            new_id,
            p1.size_mb + p2.size_mb,
            p1.qps + p2.qps,
            (p1.key_range[0], p2.key_range[1])
        )
        
        # 移除旧分区
        for pid in [partition_id_1, partition_id_2]:
            node_id = self.partition_to_node[pid]
            node = self.nodes[node_id]
            node.partitions.remove(pid)
            node.used_mb -= self.partitions[pid].size_mb
            del self.partition_to_node[pid]
            del self.partitions[pid]
        
        # 添加新分区
        self.partitions[new_id] = merged
        target = self.select_target_node(new_id)
        self.assign_partition(new_id, target)
        
        print(f"合并分区 {partition_id_1}{partition_id_2} -> {new_id} ({merged.size_mb}MB)")
        
    def rebalance_proportional(self):
        """
        按比例分区策略:保持每个节点的分区数量一致
        """
        target_partitions_per_node = len(self.partitions) // len(self.nodes)
        
        overloaded = [node for node in self.nodes.values() 
                      if len(node.partitions) > target_partitions_per_node * 1.2]
        underloaded = [node for node in self.nodes.values() 
                       if len(node.partitions) < target_partitions_per_node * 0.8]
        
        for over_node in overloaded:
            for under_node in underloaded:
                while (len(over_node.partitions) > target_partitions_per_node and
                       len(under_node.partitions) < target_partitions_per_node):
                    # 随机选择一个分区迁移
                    partition_id = random.choice(list(over_node.partitions))
                    self.move_partition(partition_id, over_node.id, under_node.id)
                    
    def print_status(self):
        """
        打印集群状态
        """
        print("\n=== 集群状态 ===")
        for node_id, node in sorted(self.nodes.items()):
            print(f"{node_id}: {node.used_mb}/{node.capacity_mb}MB "
                  f"({node.usage_ratio()*100:.1f}%), "
                  f"{len(node.partitions)} 个分区")

# 测试不同策略
def test_rebalance():
    for strategy in ['fixed', 'dynamic', 'proportional']:
        print(f"\n{'='*60}")
        print(f"测试策略: {strategy}")
        print(f"{'='*60}")
        
        sim = RebalanceSimulator(strategy=strategy)
        
        # 初始 3 个节点
        sim.add_node(Node('node1', 1000))
        sim.add_node(Node('node2', 1000))
        sim.add_node(Node('node3', 1000))
        
        # 添加一些分区
        for i in range(20):
            size = random.randint(30, 150)
            qps = random.randint(100, 1000)
            partition = Partition(i, size, qps, (i*1000, (i+1)*1000))
            sim.add_partition(partition)
        
        print("\n初始状态:")
        sim.print_status()
        
        # 添加新节点触发再平衡
        sim.add_node(Node('node4', 1000))
        sim.rebalance()

if __name__ == '__main__':
    test_rebalance()

参考文献

  1. Lakshman, Avinash, and Prashant Malik. “Cassandra: a decentralized structured storage system.” ACM SIGOPS Operating Systems Review 44.2 (2010): 35-40.

  2. DeCandia, Giuseppe, et al. “Dynamo: amazon’s highly available key-value store.” ACM SIGOPS Operating Systems Review 41.6 (2007): 205-220.

  3. Chang, Fay, et al. “Bigtable: A distributed storage system for structured data.” ACM Transactions on Computer Systems (TOCS) 26.2 (2008): 1-26.

  4. Huang, Dongxu, et al. “TiDB: a Raft-based HTAP database.” Proceedings of the VLDB Endowment 13.12 (2020): 3072-3084.

  5. Kleppmann, Martin. Designing Data-Intensive Applications. O’Reilly Media, 2017. Chapter 6: Partitioning.

  6. Corbett, James C., et al. “Spanner: Google’s globally distributed database.” ACM Transactions on Computer Systems (TOCS) 31.3 (2013): 1-22.

  7. TiKV Documentation: “PD Scheduling”. https://tikv.org/docs/latest/concepts/scheduling/

  8. Elasticsearch Documentation: “Shard allocation and cluster-level routing”. https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cluster.html


本文是第五部分(分区:数据太大,一台放不下)的最后一篇文章。从下一篇开始,我们将进入第六部分:分布式事务,探讨如何在分布式环境中保证多个操作的原子性。


上一篇:二级索引 | 下一篇:2PC 的真实失败模式

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13

【分布式系统百科】Dynamo 论文精读:最终一致性的工业级范本

2007 年,Amazon 在 SOSP 会议上发表了《Dynamo: Amazon's Highly Available Key-value Store》论文,这篇论文彻底改变了分布式存储系统的设计思路。与追求强一致性的传统数据库不同,Dynamo 选择了一条完全不同的道路:牺牲一致性,换取可用性和分区容错性。这个设…


By .