在上一篇文章中,我们讨论了分布式系统中的二级索引问题。本文将深入探讨数据再平衡(Rebalancing)的核心策略和实现细节。当分布式系统运行一段时间后,数据分布可能会变得不均匀,节点可能会加入或离开集群,这时就需要再平衡机制来重新分配数据,保证系统的负载均衡和高可用性。
一、为什么需要数据再平衡
在理想情况下,我们希望数据能够均匀分布在所有节点上,每个节点处理相同数量的请求。但在实际运行中,这种完美的均衡很难维持:
1.1 节点变更
集群规模不是静态的。在生产环境中,我们经常需要:
- 增加节点:随着业务增长,数据量和访问量增加,需要添加新的机器来扩展容量
- 移除节点:某些节点可能因为硬件故障而下线,或者在业务低峰期缩减容量以节省成本
- 替换节点:硬件升级或者机器老化时,需要用新机器替换旧机器
当集群拓扑发生变化时,原有的数据分布方案可能不再适用。例如,如果使用简单的哈希取模(hash(key) % N)来分配数据,当节点数
N
变化时,几乎所有的数据都需要重新分配,这会导致大规模的数据迁移。
1.2 数据倾斜
即使节点数量不变,数据分布也可能随时间变得不均匀:
- 热点数据:某些 key 的访问频率远高于其他 key。例如,社交网络中的明星账号、电商平台的爆款商品
- 增长不均:不同分区的数据增长速度不同。例如,按时间分区的日志系统,最新的分区增长最快
- 删除不均:某些分区的数据被大量删除,导致分区大小差异
数据倾斜会导致某些节点负载过高,成为系统的瓶颈,而其他节点资源利用率低。
1.3 硬件异构性
在大规模集群中,不同机器的硬件配置可能不同:
- 旧节点与新节点共存:新采购的机器往往配置更高(CPU 更快、内存更大、SSD 更快)
- 不同用途的机器:某些节点可能配置了 SSD,而另一些使用 HDD
- 云环境的资源差异:在云环境中,可能混合使用不同规格的实例
理想的再平衡策略应该能够识别硬件差异,给高配置的节点分配更多的数据和负载。
1.4 再平衡的目标
一个好的再平衡机制需要满足以下目标:
- 负载均衡:尽量让每个节点的负载(数据量、请求量、CPU、内存、磁盘 I/O)都处于合理范围
- 最小化迁移量:数据迁移是有成本的(网络带宽、磁盘 I/O),应该尽量减少迁移的数据量
- 保持可用性:再平衡过程中,系统应该继续对外提供服务,不能因为迁移而中断
- 可配置性:不同的业务场景对负载均衡的要求不同,应该提供灵活的配置选项
- 故障隔离:再平衡过程中如果某个节点失败,不应该影响整个集群
二、固定分区数策略
固定分区数(Fixed Partition Count)是最简单直观的再平衡策略,被 Riak、Elasticsearch、Couchbase 等系统采用。
2.1 核心思想
在集群创建时,预先分配一个固定数量的分区(Partition),这个数量通常远大于节点数。例如:
- 10 个节点的集群可能创建 1000 个分区
- 每个节点负责约 100 个分区
- 当节点加入或离开时,只需要移动分区的所有权,不需要拆分或合并分区
假设我们有一个 3 节点的集群,预先创建了 12 个分区:
节点 A: [P0, P1, P2, P3]
节点 B: [P4, P5, P6, P7]
节点 C: [P8, P9, P10, P11]
当新增一个节点 D 时,可以从每个现有节点转移 1 个分区:
节点 A: [P0, P1, P2]
节点 B: [P4, P5, P6]
节点 C: [P8, P9, P10]
节点 D: [P3, P7, P11]
2.2 分区数量的选择
分区数量的选择是一个重要的决策,需要平衡多个因素:
分区太少的问题: - 再平衡的粒度太粗,难以实现精细的负载均衡 - 单个分区过大,迁移时间长,影响系统可用性 - 无法充分利用多核 CPU 的并行能力
分区太多的问题: - 每个分区都有元数据开销(内存、文件描述符、管理结构) - 分区间的协调成本增加 - 某些操作(如集群范围的扫描)需要遍历所有分区
经验法则: - Elasticsearch 默认每个索引创建 5 个主分片(Shard),每个主分片可以有多个副本 - Riak 推荐 64 到 1024 个分区(Ring Size),具体取决于集群规模 - 一般建议分区数是节点数的 10-100 倍
2.3 分区分配算法
当需要重新分配分区时,如何决定哪些分区应该移动到哪里?一个简单的策略是:
class FixedPartitionRebalancer:
def __init__(self, num_partitions, num_replicas):
self.num_partitions = num_partitions
self.num_replicas = num_replicas
def assign_partitions(self, nodes):
"""
将分区均匀分配给节点
返回: {partition_id: [node_list]}
"""
if len(nodes) == 0:
return {}
assignments = {}
nodes_list = sorted(nodes)
for partition_id in range(self.num_partitions):
# 为每个分区选择 num_replicas 个节点
replicas = []
for replica_idx in range(self.num_replicas):
# 使用一致性的方式选择节点
node_idx = (partition_id * self.num_replicas + replica_idx) % len(nodes_list)
replicas.append(nodes_list[node_idx])
assignments[partition_id] = replicas
return assignments
def compute_moves(self, old_assignment, new_assignment):
"""
计算需要移动的分区
返回: [(partition_id, from_node, to_node)]
"""
moves = []
for partition_id in new_assignment:
old_nodes = set(old_assignment.get(partition_id, []))
new_nodes = set(new_assignment[partition_id])
# 找出需要添加的副本
to_add = new_nodes - old_nodes
# 找出需要删除的副本
to_remove = old_nodes - new_nodes
# 配对删除和添加,形成移动操作
to_add_list = list(to_add)
to_remove_list = list(to_remove)
for i in range(min(len(to_add_list), len(to_remove_list))):
moves.append((partition_id, to_remove_list[i], to_add_list[i]))
return moves
# 使用示例
rebalancer = FixedPartitionRebalancer(num_partitions=256, num_replicas=3)
# 初始 4 个节点
old_nodes = ['node1', 'node2', 'node3', 'node4']
old_assignment = rebalancer.assign_partitions(old_nodes)
# 扩容到 6 个节点
new_nodes = ['node1', 'node2', 'node3', 'node4', 'node5', 'node6']
new_assignment = rebalancer.assign_partitions(new_nodes)
# 计算需要迁移的分区
moves = rebalancer.compute_moves(old_assignment, new_assignment)
print(f"需要迁移 {len(moves)} 个分区副本")2.4 优势与劣势
优势:
- 简单易懂:分区的概念清晰,实现相对简单
- 可预测性:分区数量固定,系统行为容易预测和监控
- 迁移效率高:只需要移动整个分区,不需要拆分或合并操作
- 并发度高:多个分区可以同时迁移,充分利用网络带宽
劣势:
- 分区数难以改变:一旦确定分区数量,后续很难修改。如果初始选择不当,可能导致:
- 分区太少:无法充分扩展
- 分区太多:元数据开销过大
- 分区大小随数据增长:随着数据量增加,每个分区也会变大,最终可能导致单个分区过大
- 空集群冷启动问题:在集群初始化时,所有分区都是空的,无法体现出负载均衡的优势
2.5 Elasticsearch 的实现
Elasticsearch 是固定分区策略的典型代表。每个索引(Index)在创建时指定主分片(Primary Shard)数量,这个数量在索引创建后无法修改:
PUT /my_index
{
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1
}
}这意味着索引 my_index 有 5
个主分片,每个主分片有 1 个副本,总共 10 个分片。
当集群中添加新节点时,Elasticsearch 会自动将部分分片迁移到新节点:
迁移前:
节点1: [P0, P1, R2]
节点2: [P2, P3, R0]
节点3: [P4, R1, R3, R4]
迁移后 (新增节点4):
节点1: [P0, R2]
节点2: [P2, P3]
节点3: [P4, R1]
节点4: [R0, R3, R4]
Elasticsearch 的分片分配器(Shard Allocator)会综合考虑多个因素:
- 每个节点的分片数量应该尽量相等
- 同一分片的主副本不能在同一节点上
- 考虑节点的磁盘使用率
- 遵守分配感知(Allocation Awareness)规则
三、动态分区策略
动态分区(Dynamic Partitioning)策略会根据数据量的变化自动调整分区的数量和大小,代表系统包括 HBase、TiKV、RethinkDB。
3.1 核心思想
与固定分区不同,动态分区在运行时可以:
- 分裂(Split):当分区的数据量超过阈值时,将其分裂成两个更小的分区
- 合并(Merge):当分区的数据量低于阈值时,将其与相邻分区合并
这类似于 B+ 树的节点分裂和合并操作。
3.2 分裂操作
当一个分区的大小超过阈值(例如 64MB 或 96MB)时,系统会将其分裂成两个分区。分裂点的选择有几种策略:
中点分裂: 找到分区中间的 key,以此为界分裂:
原分区: [a, ..., m, ..., z]
分裂后:
分区1: [a, ..., m)
分区2: [m, ..., z]
负载分裂: 根据访问热度选择分裂点,将热点 key 隔离到单独的分区:
原分区: [key1: 100 QPS, key2: 10000 QPS, key3: 50 QPS, ...]
分裂后:
分区1: [key1, key2) # 包含普通 key
分区2: [key2, key3) # 隔离热点 key
分区3: [key3, ...]
3.3 合并操作
当一个分区的大小低于阈值(例如 32MB)时,可以将其与相邻分区合并。合并操作需要满足一些条件:
- 两个分区在 key 空间上是相邻的
- 合并后的分区大小不会超过上限
- 合并不会导致热点集中
class DynamicPartition:
def __init__(self, start_key, end_key, size_mb, qps):
self.start_key = start_key
self.end_key = end_key
self.size_mb = size_mb
self.qps = qps
class DynamicPartitionManager:
def __init__(self, split_threshold_mb=96, merge_threshold_mb=32):
self.split_threshold_mb = split_threshold_mb
self.merge_threshold_mb = merge_threshold_mb
self.partitions = []
def check_split(self, partition):
"""
检查是否需要分裂
"""
if partition.size_mb > self.split_threshold_mb:
# 找到中点 key
mid_key = self.find_median_key(partition)
# 创建两个新分区
left = DynamicPartition(
partition.start_key,
mid_key,
partition.size_mb / 2,
partition.qps / 2
)
right = DynamicPartition(
mid_key,
partition.end_key,
partition.size_mb / 2,
partition.qps / 2
)
return (left, right)
return None
def check_merge(self, partition1, partition2):
"""
检查两个相邻分区是否可以合并
"""
if partition1.end_key != partition2.start_key:
return None # 不相邻
total_size = partition1.size_mb + partition2.size_mb
# 只有两个都很小,或者合并后不超过阈值时才合并
if (partition1.size_mb < self.merge_threshold_mb and
partition2.size_mb < self.merge_threshold_mb and
total_size < self.split_threshold_mb):
merged = DynamicPartition(
partition1.start_key,
partition2.end_key,
total_size,
partition1.qps + partition2.qps
)
return merged
return None
def find_median_key(self, partition):
"""
找到分区的中位数 key (简化实现)
"""
# 实际实现需要扫描分区数据
# 这里简化为中点
if isinstance(partition.start_key, str) and isinstance(partition.end_key, str):
# 字符串的中点
return partition.start_key + (partition.end_key - partition.start_key) // 2
return (partition.start_key + partition.end_key) // 2
def rebalance(self):
"""
执行再平衡:检查所有分区,执行必要的分裂和合并
"""
i = 0
while i < len(self.partitions):
partition = self.partitions[i]
# 检查分裂
split_result = self.check_split(partition)
if split_result:
left, right = split_result
print(f"分裂分区 [{partition.start_key}, {partition.end_key}) "
f"-> [{left.start_key}, {left.end_key}) 和 "
f"[{right.start_key}, {right.end_key})")
self.partitions[i] = left
self.partitions.insert(i + 1, right)
i += 2
continue
# 检查合并
if i + 1 < len(self.partitions):
next_partition = self.partitions[i + 1]
merge_result = self.check_merge(partition, next_partition)
if merge_result:
print(f"合并分区 [{partition.start_key}, {partition.end_key}) 和 "
f"[{next_partition.start_key}, {next_partition.end_key}) "
f"-> [{merge_result.start_key}, {merge_result.end_key})")
self.partitions[i] = merge_result
self.partitions.pop(i + 1)
continue
i += 1
# 使用示例
manager = DynamicPartitionManager()
manager.partitions = [
DynamicPartition(0, 100, 120, 1000), # 太大,需要分裂
DynamicPartition(100, 200, 20, 100), # 正常
DynamicPartition(200, 250, 15, 50), # 太小
DynamicPartition(250, 280, 18, 60), # 太小,可以与前一个合并
]
manager.rebalance()3.4 初始分区问题与预分裂
动态分区策略有一个显著的问题:在集群刚创建时,通常只有一个分区覆盖整个 key 空间。这导致:
- 所有写入都打到单个节点,无法利用多节点并发
- 必须等待第一个分区增长到阈值才能分裂
- 在数据量较小时,无法体现分布式系统的优势
为了解决这个问题,许多系统支持预分裂(Pre-splitting):
def pre_split(key_range, num_partitions):
"""
预先将 key 空间分裂成多个分区
"""
start, end = key_range
step = (end - start) // num_partitions
partitions = []
for i in range(num_partitions):
partition_start = start + i * step
partition_end = start + (i + 1) * step if i < num_partitions - 1 else end
partitions.append(DynamicPartition(partition_start, partition_end, 0, 0))
return partitions
# 将 key 空间 [0, 1000000) 预分裂成 10 个分区
initial_partitions = pre_split((0, 1000000), 10)在 HBase 中,可以在创建表时指定预分裂点:
byte[][] splits = new byte[][] {
Bytes.toBytes("row_100"),
Bytes.toBytes("row_200"),
Bytes.toBytes("row_300"),
// ...
};
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("mytable"));
admin.createTable(desc, splits);3.5 优势与劣势
优势:
- 自适应:分区数量和大小会根据实际数据量自动调整
- 适合数据增长:随着数据增长,分区数量增加,始终保持每个分区的大小在合理范围
- 空集群友好:可以通过预分裂解决初始化问题
劣势:
- 协调开销:分裂和合并操作需要协调多个节点,增加了系统复杂度
- 元数据管理:分区数量动态变化,元数据管理更复杂
- 分裂时的性能抖动:分裂操作可能需要扫描整个分区,影响正常请求的延迟
四、按比例分区策略
按比例分区(Proportional Partitioning)是 Cassandra 采用的策略,它试图结合固定分区和动态分区的优点。
4.1 核心思想
每个节点负责固定数量的虚拟节点(Virtual Node, vnode)或分区。当集群规模变化时:
- 添加节点:新节点会从现有节点随机选择一些分区,将其分裂,并接管一半
- 移除节点:该节点的分区会被分配给其他节点,并可能与现有分区合并
假设每个节点负责 256 个 vnode:
节点数 = 4, 总 vnode = 1024
节点数 = 5, 总 vnode = 1280 (新节点触发 256 次分裂)
节点数 = 6, 总 vnode = 1536
4.2 Cassandra 的实现
Cassandra 使用一致性哈希(Consistent Hashing)环,每个 vnode 对应环上的一个 token:
import hashlib
import bisect
class VirtualNode:
def __init__(self, node_id, token, vnode_index):
self.node_id = node_id
self.token = token
self.vnode_index = vnode_index
def __repr__(self):
return f"VNode({self.node_id}, token={self.token}, idx={self.vnode_index})"
class ConsistentHashRing:
def __init__(self, vnodes_per_node=256):
self.vnodes_per_node = vnodes_per_node
self.ring = [] # 有序的 vnode 列表
self.nodes = set()
def add_node(self, node_id):
"""
添加新节点:为其生成 vnodes_per_node 个 token
"""
if node_id in self.nodes:
return
self.nodes.add(node_id)
for i in range(self.vnodes_per_node):
# 生成确定性的 token
token_str = f"{node_id}:{i}"
token = int(hashlib.md5(token_str.encode()).hexdigest(), 16)
vnode = VirtualNode(node_id, token, i)
bisect.insort(self.ring, (token, vnode))
print(f"节点 {node_id} 加入,创建了 {self.vnodes_per_node} 个 vnode")
print(f"总 vnode 数: {len(self.ring)}")
def remove_node(self, node_id):
"""
移除节点:删除其所有 vnode
"""
if node_id not in self.nodes:
return
self.nodes.remove(node_id)
self.ring = [(token, vnode) for token, vnode in self.ring
if vnode.node_id != node_id]
print(f"节点 {node_id} 移除,删除了 {self.vnodes_per_node} 个 vnode")
def get_node(self, key):
"""
根据 key 找到负责的节点
"""
if not self.ring:
return None
key_hash = int(hashlib.md5(key.encode()).hexdigest(), 16)
# 找到第一个 token >= key_hash 的 vnode
idx = bisect.bisect_left([token for token, _ in self.ring], key_hash)
if idx == len(self.ring):
idx = 0 # 环形结构
return self.ring[idx][1].node_id
def get_distribution(self):
"""
统计每个节点负责的 vnode 数量
"""
distribution = {}
for _, vnode in self.ring:
distribution[vnode.node_id] = distribution.get(vnode.node_id, 0) + 1
return distribution
# 使用示例
ring = ConsistentHashRing(vnodes_per_node=256)
# 初始 3 个节点
ring.add_node("node1")
ring.add_node("node2")
ring.add_node("node3")
print("\n初始分布:", ring.get_distribution())
# 添加第 4 个节点
ring.add_node("node4")
print("\n添加 node4 后分布:", ring.get_distribution())
# 测试 key 分布
keys = [f"key_{i}" for i in range(10000)]
node_counts = {}
for key in keys:
node = ring.get_node(key)
node_counts[node] = node_counts.get(node, 0) + 1
print("\n10000 个 key 的分布:")
for node, count in sorted(node_counts.items()):
print(f" {node}: {count} keys ({count/100:.1f}%)")4.3 优势与劣势
优势:
- 分区大小有界:每个节点的 vnode 数量固定,因此每个 vnode 的平均大小相对稳定
- 灵活性高:添加/移除节点时,只影响相邻的 vnode
- 随机性好:通过哈希函数,数据分布更加随机,避免了人为选择分裂点的偏差
劣势:
- 分裂随机性:当添加节点时,哪些分区被分裂是随机的,可能不是最优选择
- 元数据复杂:vnode 数量多,元数据管理开销大
- 范围查询不友好:由于数据被随机分散,范围查询需要访问多个节点
五、再平衡与一致性
再平衡过程中,数据正在从一个节点迁移到另一个节点。这期间系统需要保证:
- 一致性:读操作应该能看到最新写入的数据
- 可用性:读写操作不应该被阻塞
- 不重不丢:每条数据都应该被迁移一次,不能丢失也不能重复
5.1 两阶段迁移
大多数系统采用两阶段方法:
阶段 1:复制数据(Copy Phase)
- 源节点继续服务读写请求
- 在后台将数据逐步复制到目标节点
- 记录这期间的增量变更(可以通过日志或版本号)
阶段 2:切换(Switchover Phase)
- 短暂阻塞写操作(或使用双写)
- 复制增量变更到目标节点
- 更新元数据,将分区的所有权转移到目标节点
- 恢复正常服务
class PartitionMigration:
def __init__(self, partition_id, source_node, target_node):
self.partition_id = partition_id
self.source_node = source_node
self.target_node = target_node
self.state = "init"
self.copied_keys = set()
self.pending_writes = []
def phase1_copy(self):
"""
阶段 1: 复制数据
"""
print(f"开始复制分区 {self.partition_id} 从 {self.source_node} 到 {self.target_node}")
self.state = "copying"
# 获取源节点的所有 key
keys = self.source_node.get_keys(self.partition_id)
# 批量复制数据
batch_size = 1000
for i in range(0, len(keys), batch_size):
batch = keys[i:i+batch_size]
# 读取源节点数据
data = self.source_node.read_batch(batch)
# 写入目标节点
self.target_node.write_batch(data)
# 记录已复制的 key
self.copied_keys.update(batch)
print(f" 已复制 {len(self.copied_keys)}/{len(keys)} 个 key")
print(f"分区 {self.partition_id} 数据复制完成")
def phase2_switchover(self):
"""
阶段 2: 切换
"""
print(f"开始切换分区 {self.partition_id}")
self.state = "switching"
# 1. 短暂阻塞写操作(设置只读模式)
self.source_node.set_readonly(self.partition_id)
# 2. 复制增量数据(在只读期间产生的写请求)
incremental_data = self.source_node.get_incremental_writes(self.partition_id)
if incremental_data:
self.target_node.write_batch(incremental_data)
print(f" 复制了 {len(incremental_data)} 个增量写入")
# 3. 更新元数据服务器:分区所有权转移
metadata_service.update_partition_owner(self.partition_id, self.target_node)
# 4. 目标节点开始服务
self.target_node.set_active(self.partition_id)
# 5. 源节点停止服务
self.source_node.remove_partition(self.partition_id)
self.state = "completed"
print(f"分区 {self.partition_id} 切换完成")
def execute(self):
"""
执行完整的迁移流程
"""
try:
self.phase1_copy()
self.phase2_switchover()
return True
except Exception as e:
print(f"迁移失败: {e}")
self.state = "failed"
return False5.2 读请求的处理
在迁移期间,读请求应该如何处理?有几种策略:
策略 1:总是读源节点 - 简单,但迁移期间源节点压力大
策略 2:总是读目标节点 - 需要等待阶段 1 完成 - 目标节点可能数据不完整
策略 3:双读(Read-Both) - 同时读源节点和目标节点,取最新的结果 - 兼顾一致性,但增加了延迟
策略 4:路由表版本号 - 给每个分区分配一个版本号(Epoch) - 读请求带上版本号,节点检查自己是否是该版本的 owner
class PartitionRouter:
def __init__(self):
self.routing_table = {} # {partition_id: (node, epoch)}
self.current_epoch = 0
def get_node(self, partition_id):
"""
获取分区的当前负责节点
"""
return self.routing_table.get(partition_id)
def start_migration(self, partition_id, source_node, target_node):
"""
开始迁移:增加 epoch,设置双节点模式
"""
self.current_epoch += 1
self.routing_table[partition_id] = {
'source': source_node,
'target': target_node,
'epoch': self.current_epoch,
'state': 'migrating'
}
def finish_migration(self, partition_id, target_node):
"""
完成迁移:切换到目标节点
"""
self.current_epoch += 1
self.routing_table[partition_id] = {
'source': None,
'target': target_node,
'epoch': self.current_epoch,
'state': 'active'
}
def read(self, partition_id, key):
"""
读取数据,处理迁移期间的情况
"""
info = self.routing_table.get(partition_id)
if not info:
raise Exception(f"分区 {partition_id} 不存在")
if info['state'] == 'active':
# 正常状态:读目标节点
return info['target'].read(key)
elif info['state'] == 'migrating':
# 迁移中:优先读源节点,如果源节点没有则读目标节点
value = info['source'].read(key)
if value is None:
value = info['target'].read(key)
return value
def write(self, partition_id, key, value):
"""
写入数据,处理迁移期间的情况
"""
info = self.routing_table.get(partition_id)
if not info:
raise Exception(f"分区 {partition_id} 不存在")
if info['state'] == 'active':
# 正常状态:写目标节点
info['target'].write(key, value)
elif info['state'] == 'migrating':
# 迁移中:双写
info['source'].write(key, value)
info['target'].write(key, value)5.3 幂等性保证
网络不可靠,迁移操作可能失败重试。系统需要保证操作的幂等性:
- 复制操作幂等:重复复制同一个 key 不应该导致错误
- 切换操作幂等:重复切换不应该导致数据不一致
- 使用版本号或校验和:确保写入的数据是最新的
class IdempotentMigration:
def copy_key_idempotent(self, key, value, version):
"""
幂等的数据复制
"""
existing = self.target_node.read_with_version(key)
if existing is None:
# 目标节点没有这个 key,直接写入
self.target_node.write_with_version(key, value, version)
elif existing['version'] < version:
# 目标节点的版本较旧,覆盖
self.target_node.write_with_version(key, value, version)
else:
# 目标节点的版本更新或相同,跳过
pass六、TiKV 的 PD 调度策略
TiKV 是 PingCAP 开发的分布式 KV 存储,使用动态分区策略。其调度系统 PD(Placement Driver)是一个非常优秀的实现,值得深入学习。
6.1 TiKV 架构概述
TiKV 集群由三个主要组件组成:
- TiKV 节点:存储实际数据,每个节点运行多个 Region(分区)
- PD 集群:元数据管理和调度决策,通常 3-5 个节点组成的 Raft 集群
- TiDB 节点:SQL 层,本文不涉及
每个 Region 是一个 Raft 组,默认大小为 96MB,超过阈值会自动分裂。Region 有 3 个副本(可配置),分布在不同的 TiKV 节点上。
6.2 心跳机制
PD 如何感知集群状态?通过心跳(Heartbeat)机制:
Store Heartbeat(节点心跳): 每个 TiKV 节点定期(默认 10 秒)向 PD 发送心跳:
type StoreHeartbeat struct {
StoreID uint64
Capacity uint64 // 总容量
Available uint64 // 可用容量
RegionCount uint32 // Region 数量
LeaderCount uint32 // Leader Region 数量
BytesWritten uint64 // 写入字节数
BytesRead uint64 // 读取字节数
KeysWritten uint64 // 写入 key 数
KeysRead uint64 // 读取 key 数
IsBusy bool // 是否繁忙
StartTime uint64 // 启动时间
}Region Heartbeat(分区心跳): 每个 Region 的 Leader 定期(默认 60 秒)向 PD 发送心跳:
type RegionHeartbeat struct {
RegionID uint64
Leader *Peer // 当前 Leader
Peers []*Peer // 所有副本
StartKey []byte // 起始 key
EndKey []byte // 结束 key
ApproximateSize uint64 // 大小估算
ApproximateKeys uint64 // key 数量估算
BytesWritten uint64 // 写入速率
BytesRead uint64 // 读取速率
KeysWritten uint64
KeysRead uint64
}PD 收集这些心跳信息,建立集群的全局视图。
6.3 调度器(Schedulers)
PD 包含多个独立的调度器,每个调度器负责一个特定的目标:
6.3.1 Balance Leader Scheduler
目标:平衡每个节点的 Leader Region 数量。
Leader 处理所有读写请求,因此 Leader 的分布直接影响负载均衡。
class BalanceLeaderScheduler:
def __init__(self):
self.name = "balance-leader"
def schedule(self, cluster_info):
"""
生成 Leader 平衡的调度操作
"""
stores = cluster_info.get_stores()
# 统计每个节点的 Leader 数量
leader_counts = {}
for store in stores:
leader_counts[store.id] = store.leader_count
# 找到 Leader 最多和最少的节点
max_store = max(stores, key=lambda s: s.leader_count)
min_store = min(stores, key=lambda s: s.leader_count)
# 如果差距太小,不需要调度
if max_store.leader_count - min_store.leader_count <= 1:
return None
# 从 max_store 选择一个 Region,将其 Leader 转移到 min_store
candidate_region = self.select_region(cluster_info, max_store, min_store)
if candidate_region:
return {
'type': 'transfer-leader',
'region_id': candidate_region.id,
'from_store': max_store.id,
'to_store': min_store.id
}
return None
def select_region(self, cluster_info, from_store, to_store):
"""
选择合适的 Region 进行 Leader 转移
"""
# 找到在 from_store 上是 Leader,且在 to_store 上有副本的 Region
for region in cluster_info.get_regions():
if region.leader_store_id == from_store.id:
for peer in region.peers:
if peer.store_id == to_store.id:
return region
return NoneLeader 转移操作非常快(通常几毫秒),因为只需要 Raft 组内部协商,不需要数据迁移。
6.3.2 Balance Region Scheduler
目标:平衡每个节点的 Region 数量和数据量。
class BalanceRegionScheduler:
def __init__(self):
self.name = "balance-region"
def schedule(self, cluster_info):
"""
生成 Region 平衡的调度操作
"""
stores = cluster_info.get_stores()
# 按数据量排序
stores_by_size = sorted(stores, key=lambda s: s.used_size, reverse=True)
source_store = stores_by_size[0]
target_store = stores_by_size[-1]
# 检查是否需要平衡
avg_size = sum(s.used_size for s in stores) / len(stores)
if source_store.used_size < avg_size * 1.1:
return None # 已经足够均衡
# 选择一个 Region 从 source_store 移动到 target_store
candidate_region = self.select_region(cluster_info, source_store, target_store)
if candidate_region:
return {
'type': 'move-peer',
'region_id': candidate_region.id,
'from_store': source_store.id,
'to_store': target_store.id
}
return None
def select_region(self, cluster_info, from_store, to_store):
"""
选择合适的 Region 进行迁移
"""
# 找到在 from_store 上有副本的 Region
candidates = []
for region in cluster_info.get_regions():
if any(peer.store_id == from_store.id for peer in region.peers):
# 检查 to_store 上是否还没有这个 Region 的副本
if not any(peer.store_id == to_store.id for peer in region.peers):
candidates.append(region)
# 优先选择较大的 Region,加快平衡速度
if candidates:
return max(candidates, key=lambda r: r.approximate_size)
return NoneRegion 迁移步骤: 1. 在 target_store 上添加一个新的 Learner 副本 2. 等待 Learner 追上 Leader 的日志 3. 将 Learner 提升为 Follower 4. 删除 from_store 上的旧副本
6.3.3 Hot Region Scheduler
目标:识别热点 Region,将其分散到不同节点。
热点检测基于 Region 的读写速率:
class HotRegionScheduler:
def __init__(self, hot_threshold_bytes=10 * 1024 * 1024): # 10MB/s
self.name = "hot-region"
self.hot_threshold = hot_threshold_bytes
self.hot_cache = {} # {region_id: hot_info}
def update_hot_cache(self, region_heartbeat):
"""
更新热点缓存
"""
region_id = region_heartbeat.region_id
# 计算读写速率 (字节/秒)
write_rate = region_heartbeat.bytes_written
read_rate = region_heartbeat.bytes_read
total_rate = write_rate + read_rate
if total_rate > self.hot_threshold:
self.hot_cache[region_id] = {
'region_id': region_id,
'store_id': region_heartbeat.leader.store_id,
'write_rate': write_rate,
'read_rate': read_rate,
'total_rate': total_rate,
'last_update': time.time()
}
else:
# 不再热,从缓存中移除
self.hot_cache.pop(region_id, None)
def schedule(self, cluster_info):
"""
生成热点平衡的调度操作
"""
# 统计每个节点的热点负载
store_hot_load = {}
for store in cluster_info.get_stores():
store_hot_load[store.id] = 0
for hot_info in self.hot_cache.values():
store_id = hot_info['store_id']
store_hot_load[store_id] += hot_info['total_rate']
# 找到最热和最冷的节点
if not store_hot_load:
return None
hottest_store = max(store_hot_load.items(), key=lambda x: x[1])
coldest_store = min(store_hot_load.items(), key=lambda x: x[1])
# 检查是否需要调度
if hottest_store[1] < coldest_store[1] * 2:
return None # 热点分布相对均衡
# 从 hottest_store 选择一个热点 Region,转移 Leader 到 coldest_store
hot_region = self.select_hot_region(cluster_info, hottest_store[0], coldest_store[0])
if hot_region:
return {
'type': 'transfer-leader',
'region_id': hot_region['region_id'],
'from_store': hottest_store[0],
'to_store': coldest_store[0],
'reason': 'hot-region'
}
return None
def select_hot_region(self, cluster_info, from_store, to_store):
"""
选择热点 Region
"""
for hot_info in self.hot_cache.values():
if hot_info['store_id'] == from_store:
# 检查这个 Region 在 to_store 上是否有副本
region = cluster_info.get_region(hot_info['region_id'])
if any(peer.store_id == to_store for peer in region.peers):
return hot_info
return None6.4 调度约束(Placement Rules)
TiKV 支持灵活的放置规则,可以控制数据的分布策略:
标签(Label): 每个 TiKV 节点可以配置多个标签,例如:
[server]
labels = "zone=z1,rack=r1,host=h1"放置规则示例:
{
"group_id": "pd",
"id": "default",
"role": "voter",
"count": 3,
"location_labels": ["zone", "rack", "host"],
"constraints": [
{"key": "zone", "op": "in", "values": ["z1", "z2", "z3"]}
]
}这个规则表示: - 每个 Region 有 3 个 voter 副本 - 副本应该分布在不同的 zone、rack、host - 只能放在 z1、z2、z3 这三个 zone
PD 在调度时会遵守这些约束:
class PlacementChecker:
def check_placement(self, region, placement_rule):
"""
检查 Region 的副本是否满足放置规则
"""
peers = region.peers
# 检查副本数量
if len(peers) != placement_rule['count']:
return False, "副本数量不匹配"
# 检查位置隔离
location_labels = placement_rule['location_labels']
for label in location_labels:
label_values = [self.get_store_label(peer.store_id, label) for peer in peers]
# 同一层级的标签值应该各不相同(隔离)
if len(label_values) != len(set(label_values)):
return False, f"标签 {label} 没有隔离"
# 检查约束条件
for constraint in placement_rule.get('constraints', []):
if not self.check_constraint(peers, constraint):
return False, f"违反约束 {constraint}"
return True, "满足放置规则"
def check_constraint(self, peers, constraint):
"""
检查约束条件
"""
key = constraint['key']
op = constraint['op']
values = constraint['values']
for peer in peers:
label_value = self.get_store_label(peer.store_id, key)
if op == 'in':
if label_value not in values:
return False
elif op == 'not_in':
if label_value in values:
return False
return True6.5 调度优先级与并发限制
PD 不会同时执行所有调度操作,而是有优先级和并发限制:
优先级: 1. 高优先级:修复副本缺失、修复副本位置违规 2. 中优先级:热点平衡 3. 低优先级:常规的 Region 和 Leader 平衡
并发限制:
class SchedulerManager:
def __init__(self):
self.schedulers = []
self.max_pending_operators = 10 # 最多同时执行 10 个操作
self.max_snapshot_count = 3 # 最多同时传输 3 个快照
self.pending_operators = []
def run_schedulers(self, cluster_info):
"""
运行所有调度器,生成调度操作
"""
if len(self.pending_operators) >= self.max_pending_operators:
return # 已达到并发上限
for scheduler in self.schedulers:
operator = scheduler.schedule(cluster_info)
if operator:
# 检查是否可以添加这个操作
if self.can_add_operator(operator):
self.pending_operators.append(operator)
print(f"添加调度操作: {operator}")
if len(self.pending_operators) >= self.max_pending_operators:
break
def can_add_operator(self, operator):
"""
检查是否可以添加新的调度操作
"""
# 检查是否涉及正在迁移的 Region
for pending in self.pending_operators:
if pending['region_id'] == operator['region_id']:
return False # 同一个 Region 不能同时有多个操作
# 检查快照传输数量
if operator['type'] == 'move-peer':
snapshot_count = sum(1 for op in self.pending_operators if op['type'] == 'move-peer')
if snapshot_count >= self.max_snapshot_count:
return False
return True6.6 调度算子(Operator)
PD 生成的调度决策称为算子(Operator),一个算子可能包含多个步骤:
type Operator struct {
RegionID uint64
Kind OperatorKind // transfer-leader, add-peer, remove-peer, etc.
Steps []OperatorStep
Status OperatorStatus
}
type OperatorStep struct {
Type StepType // add_learner, promote_learner, transfer_leader, remove_peer
PeerID uint64
StoreID uint64
}
// 示例:将 Region 1 从 Store 1 迁移到 Store 4
operator := Operator{
RegionID: 1,
Kind: OpRegion,
Steps: []OperatorStep{
{Type: AddLearner, StoreID: 4}, // 在 Store 4 上添加 Learner
{Type: PromoteLearner, StoreID: 4}, // 提升为 Follower
{Type: RemovePeer, StoreID: 1}, // 移除 Store 1 的副本
},
}TiKV 节点收到 Operator 后,会逐步执行这些步骤,并向 PD 报告进度。
七、性能冲突与限流
再平衡操作会消耗系统资源,与正常的业务请求竞争:
7.1 资源竞争
网络带宽: - 数据迁移需要在节点间传输大量数据 - 可能占满网络带宽,影响正常请求
磁盘 I/O: - 源节点需要读取数据 - 目标节点需要写入数据 - 与正常的读写操作竞争磁盘
CPU 和内存: - 数据的序列化、反序列化、压缩、校验需要 CPU - 缓存可能被迁移数据污染
7.2 限流机制
为了避免再平衡影响正常业务,需要对迁移速度进行限流:
import time
class RateLimiter:
def __init__(self, rate_bytes_per_sec):
self.rate = rate_bytes_per_sec
self.allowance = rate_bytes_per_sec
self.last_check = time.time()
def acquire(self, bytes_count):
"""
获取指定字节数的配额,如果超限则阻塞
"""
current_time = time.time()
elapsed = current_time - self.last_check
# 补充配额
self.allowance += elapsed * self.rate
self.allowance = min(self.allowance, self.rate) # 不超过上限
self.last_check = current_time
if self.allowance >= bytes_count:
# 有足够配额,直接消耗
self.allowance -= bytes_count
return
else:
# 配额不足,需要等待
wait_time = (bytes_count - self.allowance) / self.rate
time.sleep(wait_time)
self.allowance = 0
class ThrottledMigration:
def __init__(self, rate_limit_mbps=100):
self.rate_limiter = RateLimiter(rate_limit_mbps * 1024 * 1024)
def migrate_data(self, source, target, partition_id):
"""
带限流的数据迁移
"""
keys = source.get_keys(partition_id)
batch_size = 1000
for i in range(0, len(keys), batch_size):
batch = keys[i:i+batch_size]
# 读取数据
data = source.read_batch(batch)
data_size = sum(len(v) for v in data.values())
# 申请配额
self.rate_limiter.acquire(data_size)
# 写入目标节点
target.write_batch(data)7.3 自适应限流
更高级的策略是根据系统负载动态调整迁移速度:
class AdaptiveThrottle:
def __init__(self, max_rate_mbps=200):
self.max_rate = max_rate_mbps * 1024 * 1024
self.current_rate = self.max_rate * 0.5 # 从 50% 开始
def adjust_rate(self, system_metrics):
"""
根据系统指标调整迁移速率
"""
cpu_usage = system_metrics['cpu_usage']
disk_util = system_metrics['disk_util']
p99_latency = system_metrics['p99_latency_ms']
# 如果系统压力大,降低迁移速率
if cpu_usage > 0.8 or disk_util > 0.8 or p99_latency > 100:
self.current_rate *= 0.9 # 降低 10%
# 如果系统压力小,提高迁移速率
elif cpu_usage < 0.5 and disk_util < 0.5 and p99_latency < 50:
self.current_rate *= 1.1 # 提高 10%
# 限制在合理范围内
self.current_rate = max(10 * 1024 * 1024, self.current_rate) # 至少 10MB/s
self.current_rate = min(self.max_rate, self.current_rate) # 不超过上限
return self.current_rate7.4 时间窗口控制
某些场景下,只允许在业务低峰期进行再平衡:
class TimeWindowScheduler:
def __init__(self, allowed_hours):
self.allowed_hours = allowed_hours # 例如 [0, 1, 2, 3, 4, 5] 表示凌晨 0-6 点
def is_allowed(self):
"""
检查当前是否在允许的时间窗口内
"""
current_hour = time.localtime().tm_hour
return current_hour in self.allowed_hours
def schedule_if_allowed(self, scheduler, cluster_info):
"""
只在允许的时间窗口内执行调度
"""
if self.is_allowed():
return scheduler.schedule(cluster_info)
else:
return None八、再平衡触发的检测启发式
在自动化再平衡系统中,准确判断”何时触发再平衡”与”如何执行再平衡”同样重要。过于敏感的触发条件会导致频繁的数据迁移(thrashing),过于迟钝则无法及时响应负载变化。
8.1 负载不均衡检测
最基本的触发条件是节点间负载差异超过阈值:
class ImbalanceDetector:
def __init__(self, load_threshold=0.3, min_gap_abs=100):
self.load_threshold = load_threshold
self.min_gap_abs = min_gap_abs
def should_rebalance(self, node_loads: dict) -> bool:
"""
检测是否需要再平衡。
node_loads: {node_id: load_score}
load_threshold: 最大/最小负载比超过此值时触发。
min_gap_abs: 最大与最小负载的绝对差值下限,避免低负载时误触发。
"""
if len(node_loads) < 2:
return False
max_load = max(node_loads.values())
min_load = min(node_loads.values())
if min_load == 0:
return max_load > self.min_gap_abs
ratio = (max_load - min_load) / min_load
return ratio > self.load_threshold and (max_load - min_load) > self.min_gap_abs8.2 磁盘使用率阈值
当某个节点的磁盘使用率接近上限时,必须尽快将部分分区迁出,否则该节点可能因磁盘满而停止服务:
| 磁盘使用率 | 建议动作 |
|---|---|
| < 70% | 正常,无需干预 |
| 70% - 80% | 警告,开始规划迁移 |
| 80% - 90% | 紧急,立即触发自动再平衡 |
| > 90% | 危急,拒绝新写入 + 紧急迁移 |
8.3 手动 vs 自动触发
| 触发方式 | 适用场景 | 优势 | 风险 |
|---|---|---|---|
| 全自动 | 云原生环境、弹性伸缩 | 无需人工干预,响应快 | 可能在业务高峰误触发 |
| 半自动(推荐后人工确认) | 生产环境主力方案 | 系统建议 + 人工审核 | 响应速度取决于值班人员 |
| 纯手动 | 关键金融系统、合规要求严格 | 完全可控 | 响应慢,依赖运维经验 |
多数生产系统采用半自动模式:系统根据检测指标生成再平衡计划,运维人员审核后执行。TiKV
的 PD 调度器支持通过 pd-ctl
暂停自动调度、手动添加调度算子,提供了灵活的控制能力。
flowchart TD
A["集群状态采集"] --> B{"数据量大?<br/>单分区 > 阈值?"}
B -->|是| C{"分区数多?<br/>> 预设上限?"}
B -->|否| D{"负载不均衡?<br/>偏差 > 30%?"}
C -->|是| E{"要求在线迁移?"}
C -->|否| F["动态分区策略<br/>(自动分裂/合并)"]
D -->|是| G["触发分区迁移"]
D -->|否| H["维持现状"]
E -->|是| I["两阶段在线迁移<br/>(双写 + 切换)"]
E -->|否| J["离线批量迁移<br/>(停机窗口)"]
上述决策流程图展示了从集群状态采集到策略选择的完整路径。系统首先判断数据规模和分区数量,然后根据负载均衡状况决定是否需要触发再平衡。对于大规模在线系统,两阶段在线迁移是首选方案。
九、迁移安全性与回滚
数据迁移是一项高风险操作。网络抖动、节点故障、磁盘损坏等异常情况随时可能发生。生产级的再平衡系统必须在设计中内置安全保障机制。
9.1 分阶段迁移与速率控制
将数据迁移拆分为多个明确的阶段,每个阶段都有独立的验证和回滚点:
sequenceDiagram
participant Scheduler as 调度器
participant Source as 源节点
participant Target as 目标节点
participant Verifier as 校验器
Scheduler->>Scheduler: 生成迁移计划
Scheduler->>Source: 阶段1:准备(冻结分区快照)
Source-->>Scheduler: 快照就绪
Scheduler->>Source: 阶段2:数据传输(限速)
Source->>Target: 批量传输数据(rate limit: 100MB/s)
Target-->>Scheduler: 传输完成
Scheduler->>Verifier: 阶段3:数据校验(checksum比对)
Verifier-->>Scheduler: 校验通过
Scheduler->>Scheduler: 阶段4:流量切换(更新路由)
Scheduler->>Source: 阶段5:清理旧数据(延迟删除)
Source-->>Scheduler: 清理完成
上述时序图展示了完整的分阶段迁移流程。关键设计要点包括:数据传输阶段使用速率限制避免打满网络带宽;传输完成后通过 checksum 校验保证数据完整性;流量切换和旧数据清理分开执行,为回滚保留时间窗口。
9.2 反压机制
当迁移对在线业务产生可感知的影响时,必须自动降速或暂停迁移:
class MigrationBackpressure:
def __init__(self, latency_threshold_ms=100, cpu_threshold=0.85):
self.latency_threshold_ms = latency_threshold_ms
self.cpu_threshold = cpu_threshold
self.paused = False
def check_and_adjust(self, metrics: dict, rate_limiter) -> str:
"""
根据业务指标动态调整迁移速率。
metrics: {"p99_latency_ms": float, "cpu_usage": float, "error_rate": float}
"""
if metrics["error_rate"] > 0.01:
self.paused = True
return "暂停迁移:错误率过高"
if metrics["p99_latency_ms"] > self.latency_threshold_ms:
rate_limiter.reduce_rate(0.5)
return "降速50%:P99延迟超标"
if metrics["cpu_usage"] > self.cpu_threshold:
rate_limiter.reduce_rate(0.7)
return "降速30%:CPU使用率过高"
if not self.paused and metrics["p99_latency_ms"] < self.latency_threshold_ms * 0.5:
rate_limiter.increase_rate(1.2)
return "提速20%:系统负载健康"
return "维持当前速率"9.3 回滚机制
迁移失败时的回滚策略因阶段不同而异:
| 失败阶段 | 回滚动作 | 数据风险 |
|---|---|---|
| 数据传输中 | 丢弃目标节点上的部分数据,源节点继续服务 | 无 |
| 校验失败 | 删除目标节点数据,重新传输 | 无 |
| 流量切换后 | 将路由切回源节点,源数据仍保留 | 可能丢失切换期间的增量写入 |
| 旧数据清理后 | 无法回滚,需从副本恢复 | 高 |
关键原则:旧数据的清理必须延迟执行。在流量切换后,保留源节点上的旧数据至少一个观察期(通常 24-72 小时),确认新分区工作正常后再清理。
十、运维操作手册(Operator Playbook)
10.1 扩容操作流程
场景:集群负载接近容量上限,需要添加新节点。
- 准备阶段:部署新节点,确认网络连通性、磁盘容量、配置一致性。
- 加入集群:将新节点注册到集群元数据服务(如 TiKV 的 PD)。
- 等待调度:自动调度器会将部分分区迁移到新节点。监控迁移进度和业务指标。
- 验证:确认新节点的分区分布、负载水平、延迟指标均正常。
- 调整权重(可选):如果新节点硬件配置与旧节点不同,通过权重配置调整分区分配比例。
10.2 缩容操作流程
场景:业务低峰期缩减集群规模以节省成本。
- 标记下线:将目标节点标记为”待下线”(draining)状态,停止接收新分区。
- 迁出分区:自动调度器将该节点上的所有分区迁移到其他节点。
- 等待完成:监控迁移进度,确认所有分区已完全迁出。
- 校验零分区:确认目标节点上的分区数为零。
- 移除节点:从集群元数据中移除节点,关闭机器。
10.3 紧急故障处理
场景:某个节点突然宕机,部分分区不可用。
1. 确认故障范围:哪些分区受影响?是否有副本可用?
2. 如果有副本(如 Raft 多副本):
a. 等待自动选主(通常 10-30 秒)
b. 确认新 Leader 已选出,业务恢复
c. 补充副本到目标副本数
3. 如果无副本(单副本部署):
a. 尝试恢复故障节点
b. 如果无法恢复,从备份恢复数据
c. 将恢复的数据分配到健康节点
4. 事后复盘:分析故障原因,调整副本策略
10.4 常用监控告警规则
| 告警项 | 条件 | 严重级别 | 处理建议 |
|---|---|---|---|
| 节点间负载偏差 | max/min > 2.0 | Warning | 检查是否需要手动触发再平衡 |
| 迁移任务超时 | 单任务 > 30min | Warning | 检查网络和磁盘状态 |
| 迁移失败 | 连续失败 > 3 次 | Critical | 暂停自动调度,人工排查 |
| 磁盘使用率 | > 85% | Critical | 立即迁出分区或扩容 |
| 分区数偏差 | 最多/最少 > 3 倍 | Warning | 检查分区分配策略 |
十一、三种策略的对比
下面是固定分区、动态分区、按比例分区三种策略的详细对比:
| 维度 | 固定分区 | 动态分区 | 按比例分区 |
|---|---|---|---|
| 代表系统 | Riak, Elasticsearch, Couchbase | HBase, TiKV, RethinkDB | Cassandra |
| 分区数量 | 创建时固定,难以修改 | 根据数据量动态调整 | 正比于节点数量 |
| 分区大小 | 随数据增长而增长 | 保持在配置的范围内 | 相对稳定 |
| 初始化 | 需要预估合适的分区数 | 单分区启动,可预分裂 | 每个节点固定 vnode 数 |
| 扩容操作 | 移动整个分区 | 无需操作,自动分裂 | 触发分裂,新节点接管一半 |
| 缩容操作 | 移动整个分区 | 无需操作,自动合并 | 删除 vnode,现有节点接管 |
| 迁移粒度 | 整个分区(可能较大) | 新分裂的子分区(较小) | 单个 vnode(较小) |
| 元数据复杂度 | 低(分区数固定) | 中(分区数变化) | 高(大量 vnode) |
| 实现复杂度 | 简单 | 中等(需要分裂/合并逻辑) | 中等(需要哈希环管理) |
| 范围查询 | 友好(如果按范围分区) | 友好 | 不友好(数据分散) |
| 热点处理 | 依赖分区分配策略 | 可以分裂热点分区 | 通过 vnode 分散热点 |
| 适用场景 | 数据量可预测,分区数合理 | 数据量不可预测,需要自适应 | 节点频繁变化,需要快速再平衡 |
选择建议:
固定分区:适合数据量增长可预测、集群规模相对稳定的场景。如果能合理估算分区数量,这是最简单的方案。
动态分区:适合数据量增长不可预测、需要自动适应的场景。特别适合大规模存储系统,能够随着数据增长自动扩展。
按比例分区:适合节点频繁加入/离开、需要快速再平衡的场景。特别适合云环境下的弹性伸缩。
十二、代码示例:完整的再平衡模拟器
下面是一个完整的再平衡模拟器,实现了三种策略:
import random
import hashlib
from typing import List, Dict, Set
from dataclasses import dataclass
@dataclass
class Node:
id: str
capacity_mb: int
used_mb: int = 0
partitions: Set[int] = None
def __post_init__(self):
if self.partitions is None:
self.partitions = set()
def available_mb(self):
return self.capacity_mb - self.used_mb
def usage_ratio(self):
return self.used_mb / self.capacity_mb if self.capacity_mb > 0 else 0
@dataclass
class Partition:
id: int
size_mb: int
qps: int
key_range: tuple # (start, end)
class RebalanceSimulator:
def __init__(self, strategy='fixed'):
self.strategy = strategy
self.nodes: Dict[str, Node] = {}
self.partitions: Dict[int, Partition] = {}
self.partition_to_node: Dict[int, str] = {}
def add_node(self, node: Node):
self.nodes[node.id] = node
print(f"添加节点 {node.id}, 容量 {node.capacity_mb}MB")
def remove_node(self, node_id: str):
if node_id not in self.nodes:
return
# 将该节点的分区迁移到其他节点
partitions_to_move = list(self.nodes[node_id].partitions)
for partition_id in partitions_to_move:
self.move_partition(partition_id, node_id, self.select_target_node(partition_id))
del self.nodes[node_id]
print(f"移除节点 {node_id}")
def add_partition(self, partition: Partition):
self.partitions[partition.id] = partition
# 选择负载最轻的节点
target_node = self.select_target_node(partition.id)
self.assign_partition(partition.id, target_node)
def assign_partition(self, partition_id: int, node_id: str):
partition = self.partitions[partition_id]
node = self.nodes[node_id]
self.partition_to_node[partition_id] = node_id
node.partitions.add(partition_id)
node.used_mb += partition.size_mb
def move_partition(self, partition_id: int, from_node_id: str, to_node_id: str):
if from_node_id == to_node_id:
return
partition = self.partitions[partition_id]
from_node = self.nodes[from_node_id]
to_node = self.nodes[to_node_id]
# 从源节点移除
from_node.partitions.remove(partition_id)
from_node.used_mb -= partition.size_mb
# 添加到目标节点
to_node.partitions.add(partition_id)
to_node.used_mb += partition.size_mb
self.partition_to_node[partition_id] = to_node_id
print(f"迁移分区 {partition_id} ({partition.size_mb}MB) "
f"从 {from_node_id} 到 {to_node_id}")
def select_target_node(self, partition_id: int) -> str:
"""
选择分区的目标节点
"""
# 排除已经有这个分区的节点
current_node = self.partition_to_node.get(partition_id)
# 选择使用率最低的节点
min_usage = float('inf')
target = None
for node_id, node in self.nodes.items():
if node_id == current_node:
continue
if node.usage_ratio() < min_usage:
min_usage = node.usage_ratio()
target = node_id
return target
def rebalance(self):
"""
执行再平衡
"""
print(f"\n=== 开始再平衡 (策略: {self.strategy}) ===")
if self.strategy == 'fixed':
self.rebalance_fixed()
elif self.strategy == 'dynamic':
self.rebalance_dynamic()
elif self.strategy == 'proportional':
self.rebalance_proportional()
self.print_status()
def rebalance_fixed(self):
"""
固定分区策略:移动整个分区以平衡负载
"""
# 计算平均使用率
total_used = sum(node.used_mb for node in self.nodes.values())
total_capacity = sum(node.capacity_mb for node in self.nodes.values())
avg_usage = total_used / total_capacity
# 找到过载和空闲的节点
overloaded = [node for node in self.nodes.values() if node.usage_ratio() > avg_usage * 1.2]
underloaded = [node for node in self.nodes.values() if node.usage_ratio() < avg_usage * 0.8]
# 从过载节点迁移分区到空闲节点
for over_node in overloaded:
for under_node in underloaded:
# 选择合适大小的分区
for partition_id in list(over_node.partitions):
partition = self.partitions[partition_id]
if under_node.available_mb() >= partition.size_mb:
self.move_partition(partition_id, over_node.id, under_node.id)
# 检查是否已经平衡
if over_node.usage_ratio() <= avg_usage * 1.1:
break
if over_node.usage_ratio() <= avg_usage * 1.1:
break
def rebalance_dynamic(self):
"""
动态分区策略:分裂大分区,合并小分区
"""
split_threshold = 100 # MB
merge_threshold = 20 # MB
# 检查分裂
to_split = []
for partition_id, partition in self.partitions.items():
if partition.size_mb > split_threshold:
to_split.append(partition_id)
for partition_id in to_split:
self.split_partition(partition_id)
# 检查合并
sorted_partitions = sorted(self.partitions.values(), key=lambda p: p.key_range[0])
i = 0
while i < len(sorted_partitions) - 1:
p1 = sorted_partitions[i]
p2 = sorted_partitions[i + 1]
if (p1.size_mb < merge_threshold and p2.size_mb < merge_threshold and
p1.key_range[1] == p2.key_range[0]):
self.merge_partitions(p1.id, p2.id)
sorted_partitions = sorted(self.partitions.values(), key=lambda p: p.key_range[0])
else:
i += 1
def split_partition(self, partition_id: int):
"""
分裂分区
"""
partition = self.partitions[partition_id]
node_id = self.partition_to_node[partition_id]
# 找到分裂点
start, end = partition.key_range
mid = (start + end) // 2
# 创建两个新分区
new_id_1 = max(self.partitions.keys()) + 1
new_id_2 = new_id_1 + 1
p1 = Partition(new_id_1, partition.size_mb // 2, partition.qps // 2, (start, mid))
p2 = Partition(new_id_2, partition.size_mb // 2, partition.qps // 2, (mid, end))
# 移除旧分区
node = self.nodes[node_id]
node.partitions.remove(partition_id)
node.used_mb -= partition.size_mb
del self.partitions[partition_id]
del self.partition_to_node[partition_id]
# 添加新分区
self.partitions[new_id_1] = p1
self.partitions[new_id_2] = p2
self.assign_partition(new_id_1, node_id)
# 第二个分区可能分配到其他节点以平衡负载
target = self.select_target_node(new_id_2)
self.assign_partition(new_id_2, target)
print(f"分裂分区 {partition_id} ({partition.size_mb}MB) "
f"-> {new_id_1} 和 {new_id_2} (各 {partition.size_mb//2}MB)")
def merge_partitions(self, partition_id_1: int, partition_id_2: int):
"""
合并分区
"""
p1 = self.partitions[partition_id_1]
p2 = self.partitions[partition_id_2]
# 创建合并后的分区
new_id = max(self.partitions.keys()) + 1
merged = Partition(
new_id,
p1.size_mb + p2.size_mb,
p1.qps + p2.qps,
(p1.key_range[0], p2.key_range[1])
)
# 移除旧分区
for pid in [partition_id_1, partition_id_2]:
node_id = self.partition_to_node[pid]
node = self.nodes[node_id]
node.partitions.remove(pid)
node.used_mb -= self.partitions[pid].size_mb
del self.partition_to_node[pid]
del self.partitions[pid]
# 添加新分区
self.partitions[new_id] = merged
target = self.select_target_node(new_id)
self.assign_partition(new_id, target)
print(f"合并分区 {partition_id_1} 和 {partition_id_2} -> {new_id} ({merged.size_mb}MB)")
def rebalance_proportional(self):
"""
按比例分区策略:保持每个节点的分区数量一致
"""
target_partitions_per_node = len(self.partitions) // len(self.nodes)
overloaded = [node for node in self.nodes.values()
if len(node.partitions) > target_partitions_per_node * 1.2]
underloaded = [node for node in self.nodes.values()
if len(node.partitions) < target_partitions_per_node * 0.8]
for over_node in overloaded:
for under_node in underloaded:
while (len(over_node.partitions) > target_partitions_per_node and
len(under_node.partitions) < target_partitions_per_node):
# 随机选择一个分区迁移
partition_id = random.choice(list(over_node.partitions))
self.move_partition(partition_id, over_node.id, under_node.id)
def print_status(self):
"""
打印集群状态
"""
print("\n=== 集群状态 ===")
for node_id, node in sorted(self.nodes.items()):
print(f"{node_id}: {node.used_mb}/{node.capacity_mb}MB "
f"({node.usage_ratio()*100:.1f}%), "
f"{len(node.partitions)} 个分区")
# 测试不同策略
def test_rebalance():
for strategy in ['fixed', 'dynamic', 'proportional']:
print(f"\n{'='*60}")
print(f"测试策略: {strategy}")
print(f"{'='*60}")
sim = RebalanceSimulator(strategy=strategy)
# 初始 3 个节点
sim.add_node(Node('node1', 1000))
sim.add_node(Node('node2', 1000))
sim.add_node(Node('node3', 1000))
# 添加一些分区
for i in range(20):
size = random.randint(30, 150)
qps = random.randint(100, 1000)
partition = Partition(i, size, qps, (i*1000, (i+1)*1000))
sim.add_partition(partition)
print("\n初始状态:")
sim.print_status()
# 添加新节点触发再平衡
sim.add_node(Node('node4', 1000))
sim.rebalance()
if __name__ == '__main__':
test_rebalance()参考文献
Lakshman, Avinash, and Prashant Malik. “Cassandra: a decentralized structured storage system.” ACM SIGOPS Operating Systems Review 44.2 (2010): 35-40.
DeCandia, Giuseppe, et al. “Dynamo: amazon’s highly available key-value store.” ACM SIGOPS Operating Systems Review 41.6 (2007): 205-220.
Chang, Fay, et al. “Bigtable: A distributed storage system for structured data.” ACM Transactions on Computer Systems (TOCS) 26.2 (2008): 1-26.
Huang, Dongxu, et al. “TiDB: a Raft-based HTAP database.” Proceedings of the VLDB Endowment 13.12 (2020): 3072-3084.
Kleppmann, Martin. Designing Data-Intensive Applications. O’Reilly Media, 2017. Chapter 6: Partitioning.
Corbett, James C., et al. “Spanner: Google’s globally distributed database.” ACM Transactions on Computer Systems (TOCS) 31.3 (2013): 1-22.
TiKV Documentation: “PD Scheduling”. https://tikv.org/docs/latest/concepts/scheduling/
Elasticsearch Documentation: “Shard allocation and cluster-level routing”. https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cluster.html
本文是第五部分(分区:数据太大,一台放不下)的最后一篇文章。从下一篇开始,我们将进入第六部分:分布式事务,探讨如何在分布式环境中保证多个操作的原子性。
上一篇:二级索引 | 下一篇:2PC 的真实失败模式
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【分布式系统百科】分区环境下的二级索引:本地索引 vs 全局索引
深入探讨分布式数据库中二级索引的实现策略,对比本地索引和全局索引的设计权衡,分析 DynamoDB GSI 和 Elasticsearch 的实现细节。
【分布式系统百科】Dynamo 论文精读:最终一致性的工业级范本
2007 年,Amazon 在 SOSP 会议上发表了《Dynamo: Amazon's Highly Available Key-value Store》论文,这篇论文彻底改变了分布式存储系统的设计思路。与追求强一致性的传统数据库不同,Dynamo 选择了一条完全不同的道路:牺牲一致性,换取可用性和分区容错性。这个设…
【分布式系统百科】大规模故障复盘:从真实事故中学习分布式系统设计
精选 8 个真实大规模分布式系统故障案例,逐一分析根因、传播路径、恢复过程与事后改进,提炼分布式系统可靠性设计的共性教训。
【分布式系统百科】分布式日志:Kafka 的日志抽象与 Pulsar 的分层架构
Jay Kreps 在 2013 年的博客文章"The Log: What every software engineer should know about real-time data's unifying abstraction"中提出了日志(Log)作为分布式系统基础抽象的思想。日志不是应用程序的调试日志,而是…