在前一篇文章中,我们讨论了哈希分区如何通过哈希函数将数据均匀分散到各个节点。然而,哈希分区有一个致命缺陷:它破坏了数据的顺序性。如果你想执行范围查询(Range Query),比如查询”所有用户 ID 在 1000 到 2000 之间的用户”,哈希分区会强迫你扫描所有分区。这在很多场景下是不可接受的。
范围分区(Range Partitioning)应运而生。它将键空间划分为连续的范围,每个范围分配给一个节点。相邻的键会存储在同一个分区中,这使得范围查询和有序扫描变得高效。但范围分区也带来了新的挑战:如何动态调整分区边界?如何处理访问热点?如何在分裂和合并时保持系统的可用性?
本文将深入探讨范围分区的核心机制、分裂与合并策略,以及热点处理的各种技巧。我们将详细分析 TiKV 和 HBase 这两个生产级系统的实现,并提供实际的代码示例。
一、为什么需要范围分区
1.1 哈希分区的局限性
哈希分区通过哈希函数打散数据,实现了负载均衡,但代价是失去了数据的局部性(Locality):
def hash_partition(key, num_partitions):
# 哈希分区破坏了顺序
return hash(key) % num_partitions
# 相邻的键被分散到不同分区
partition_of_1000 = hash_partition(1000, 10) # 可能是 3
partition_of_1001 = hash_partition(1001, 10) # 可能是 7
partition_of_1002 = hash_partition(1002, 10) # 可能是 1
# 范围查询需要扫描所有分区
def range_query_with_hash(start, end, num_partitions):
results = []
for partition_id in range(num_partitions):
results.extend(query_partition(partition_id, start, end))
return results这种设计在以下场景中会遇到问题:
- 时序数据:日志、监控指标、事件流等时序数据通常需要按时间范围查询
- 有序扫描:数据库的 ORDER BY、排序合并连接(Sort-Merge Join)等操作需要有序数据
- 前缀查询:如查询所有以”user_“开头的键
- 邻近访问:某些应用的访问模式具有空间局部性,相邻的键经常一起被访问
1.2 范围分区的优势
范围分区将键空间划分为连续的范围段,每个范围段称为一个分区(Partition)或区域(Region):
Key Space: [0, ∞)
├── Partition 0: [0, 1000) → Node A
├── Partition 1: [1000, 2000) → Node B
├── Partition 2: [2000, 3000) → Node C
└── Partition 3: [3000, ∞) → Node D
范围分区带来的核心优势:
1. 高效的范围查询
def range_query_with_range_partition(start, end, partition_map):
# 只需要访问包含 [start, end) 的分区
affected_partitions = find_overlapping_partitions(
partition_map, start, end
)
results = []
for partition in affected_partitions:
results.extend(query_partition(partition, start, end))
return results
# 示例:查询 [1500, 2500)
# 只需访问 Partition 1 和 Partition 22. 有序扫描
客户端可以从第一个分区开始顺序扫描,无需额外排序:
def ordered_scan(partition_map):
sorted_partitions = sort_partitions_by_start_key(partition_map)
for partition in sorted_partitions:
for key, value in scan_partition(partition):
yield key, value3. 数据局部性
相关数据聚集在一起,提高了缓存效率和预取效果。例如,用户的所有会话数据可以存储在同一个分区:
User Sessions:
├── user_1000_session_1 ─┐
├── user_1000_session_2 ├─ 同一分区,局部性好
├── user_1000_session_3 ─┘
├── user_1001_session_1 ─┐
└── user_1001_session_2 ─┘─ 同一分区
1.3 范围分区的挑战
范围分区并非完美,它面临以下挑战:
- 负载不均:如果数据分布不均匀,某些分区可能过大或过小
- 访问热点:顺序写入或热门键会导致单个分区过载
- 动态调整:需要在运行时动态分裂和合并分区
- 分区路由:客户端需要知道键属于哪个分区
这些问题的解决方案正是本文的核心内容。
二、范围分区的基本机制
2.1 分区元数据
范围分区系统需要维护一个分区表(Partition Table),记录每个分区的边界和位置:
type Partition struct {
ID uint64
StartKey []byte // 包含
EndKey []byte // 不包含,开区间
NodeID string
Replicas []string // 副本节点
Version uint64 // 用于处理并发更新
}
type PartitionTable struct {
Partitions []Partition
mu sync.RWMutex
}
// 查找键所属的分区
func (pt *PartitionTable) FindPartition(key []byte) *Partition {
pt.mu.RLock()
defer pt.mu.RUnlock()
// 二分查找
idx := sort.Search(len(pt.Partitions), func(i int) bool {
return bytes.Compare(pt.Partitions[i].EndKey, key) > 0
})
if idx < len(pt.Partitions) &&
bytes.Compare(pt.Partitions[idx].StartKey, key) <= 0 {
return &pt.Partitions[idx]
}
return nil
}2.2 分区路由
客户端需要一种机制来定位键所属的分区。常见方案有三种:
方案一:客户端缓存分区表
class RangePartitionClient:
def __init__(self, metadata_server):
self.metadata_server = metadata_server
self.partition_cache = {}
self.refresh_partition_table()
def refresh_partition_table(self):
self.partition_cache = self.metadata_server.get_partition_table()
def get(self, key):
partition = self.find_partition(key)
if partition is None:
# 缓存失效,刷新后重试
self.refresh_partition_table()
partition = self.find_partition(key)
try:
return self.query_node(partition.node_id, key)
except PartitionMovedError as e:
# 分区已迁移,更新缓存
self.partition_cache[partition.id] = e.new_partition
return self.query_node(e.new_partition.node_id, key)
def find_partition(self, key):
for partition in sorted(self.partition_cache.values(),
key=lambda p: p.start_key):
if partition.start_key <= key < partition.end_key:
return partition
return None方案二:元数据服务器路由
客户端每次请求都查询元数据服务器:
type MetadataServer struct {
partitionTable *PartitionTable
}
func (ms *MetadataServer) Route(key []byte) (*Partition, error) {
partition := ms.partitionTable.FindPartition(key)
if partition == nil {
return nil, errors.New("partition not found")
}
return partition, nil
}
// 客户端
func (c *Client) Get(key []byte) ([]byte, error) {
partition, err := c.metadataServer.Route(key)
if err != nil {
return nil, err
}
return c.queryNode(partition.NodeID, key)
}方案三:Gossip 协议传播
节点之间通过 Gossip 协议交换分区信息,最终达到一致:
class GossipNode:
def __init__(self, node_id, peers):
self.node_id = node_id
self.peers = peers
self.partition_table = {}
self.version = 0
def gossip_loop(self):
while True:
peer = random.choice(self.peers)
# 交换分区表
peer_table, peer_version = peer.get_partition_table()
if peer_version > self.version:
self.merge_partition_table(peer_table)
self.version = peer_version
time.sleep(1)
def merge_partition_table(self, peer_table):
for partition_id, partition in peer_table.items():
if (partition_id not in self.partition_table or
partition.version > self.partition_table[partition_id].version):
self.partition_table[partition_id] = partitionTiKV 采用方案一,客户端缓存分区表(Region 路由表)并通过 PD(Placement Driver)更新。HBase 则混合使用方案一和方案二,通过 ZooKeeper 和 HBase Meta 表维护路由信息。
2.3 边界键的选择
分区的边界键选择至关重要。理想情况下,我们希望:
- 数据均匀分布在各个分区
- 访问负载均匀分布
- 最小化跨分区操作
静态预分裂:在系统初始化时根据数据分布特征预设边界:
def presplit_by_data_distribution(key_space, num_partitions, distribution):
"""
根据数据分布预分裂
distribution: 累积分布函数 (CDF)
"""
boundaries = []
for i in range(num_partitions):
# 找到第 i/n 分位数
percentile = i / num_partitions
boundary = distribution.inverse_cdf(percentile)
boundaries.append(boundary)
boundaries.append(key_space.max) # 最后一个边界
return boundaries
# 示例:时间戳键,假设数据按时间均匀增长
def presplit_timestamp_keys(start_ts, end_ts, num_partitions):
interval = (end_ts - start_ts) / num_partitions
boundaries = [start_ts + i * interval for i in range(num_partitions)]
boundaries.append(end_ts)
return boundaries动态自适应:系统运行时根据实际负载动态调整边界,这正是分裂和合并要解决的问题。
三、分裂策略:应对增长与热点
分区分裂(Split)是范围分区系统的核心机制。当一个分区变得过大或过热时,系统需要将其一分为二。
3.1 基于大小的分裂
最简单的分裂策略是基于分区大小:当分区超过阈值时触发分裂。
HBase 的大小阈值分裂
HBase 的默认分裂策略是
ConstantSizeRegionSplitPolicy,当 Region
大小超过 hbase.hregion.max.filesize(默认
10GB)时触发分裂:
public class ConstantSizeRegionSplitPolicy extends RegionSplitPolicy {
private long desiredMaxFileSize;
@Override
protected void configureForRegion(HRegion region) {
this.desiredMaxFileSize = region.getTableDesc()
.getMaxFileSize();
if (this.desiredMaxFileSize <= 0) {
this.desiredMaxFileSize = 10L * 1024 * 1024 * 1024; // 10GB
}
}
@Override
protected boolean shouldSplit() {
long size = 0;
for (HStore store : region.getStores()) {
size += store.getSize();
}
return size > desiredMaxFileSize;
}
@Override
protected byte[] getSplitPoint() {
// 选择中间键作为分裂点
byte[] splitPoint = null;
long largestStoreSize = 0;
for (HStore store : region.getStores()) {
if (store.getSize() > largestStoreSize) {
largestStoreSize = store.getSize();
splitPoint = store.getSplitPoint();
}
}
return splitPoint;
}
}TiKV 的大小阈值分裂
TiKV 的 Region 默认大小为 96MB,当 Region 大小超过
144MB(region-max-size)时触发分裂:
// TiKV 中的 Region 大小检查(简化版)
impl<EK: KvEngine, ER: RaftEngine> SplitCheckTask<EK, ER> {
fn check_split(&self) -> Option<Vec<Vec<u8>>> {
let region = &self.region;
let approximate_size = self.get_approximate_size(region);
if approximate_size >= self.cfg.region_max_size.0 {
// 超过阈值,计算分裂点
return Some(self.get_split_keys(region));
}
None
}
fn get_split_keys(&self, region: &Region) -> Vec<Vec<u8>> {
let split_size = self.cfg.region_split_size.0; // 96MB
let mut split_keys = Vec::new();
let mut current_size = 0u64;
// 扫描 Region,每 96MB 找一个分裂点
for (key, value_size) in self.scan_region(region) {
current_size += value_size;
if current_size >= split_size {
split_keys.push(key.to_vec());
current_size = 0;
}
}
split_keys
}
}3.2 中点分裂 vs 加权分裂
找到分裂点的策略有两种:
中点分裂(Midpoint Split)
选择中间位置的键作为分裂点:
def midpoint_split(partition):
keys = list(partition.keys())
keys.sort()
midpoint = len(keys) // 2
split_key = keys[midpoint]
partition1 = Partition(
start_key=partition.start_key,
end_key=split_key
)
partition2 = Partition(
start_key=split_key,
end_key=partition.end_key
)
return partition1, partition2优点是简单,缺点是如果键大小差异很大,可能导致数据量不均。
加权分裂(Weighted Split)
根据实际数据量找到能均分数据的键:
def weighted_split(partition):
total_size = partition.total_size()
target_size = total_size / 2
current_size = 0
for key in sorted(partition.keys()):
current_size += partition.get_size(key)
if current_size >= target_size:
split_key = key
break
return create_partitions(partition, split_key)TiKV 和 HBase 都使用加权分裂,确保分裂后的两个 Region 大小大致相等。
3.3 基于访问热度的分裂
仅基于大小的分裂无法解决热点问题。如果某个键被频繁访问,即使分区很小也会成为瓶颈。
热点检测
系统需要监控每个键的访问频率:
type HotspotDetector struct {
keyAccessCount map[string]uint64
windowSize time.Duration
threshold uint64
mu sync.RWMutex
}
func (hd *HotspotDetector) RecordAccess(key string) {
hd.mu.Lock()
hd.keyAccessCount[key]++
hd.mu.Unlock()
}
func (hd *HotspotDetector) DetectHotspots() []string {
hd.mu.RLock()
defer hd.mu.RUnlock()
var hotkeys []string
for key, count := range hd.keyAccessCount {
if count > hd.threshold {
hotkeys = append(hotkeys, key)
}
}
return hotkeys
}
func (hd *HotspotDetector) ResetWindow() {
hd.mu.Lock()
hd.keyAccessCount = make(map[string]uint64)
hd.mu.Unlock()
}热点分裂
当检测到热点时,即使分区未达到大小阈值也触发分裂:
class HotspotSplitPolicy:
def __init__(self, qps_threshold=10000):
self.qps_threshold = qps_threshold
def should_split(self, partition, stats):
# 检查总 QPS
if stats.total_qps > self.qps_threshold:
return True
# 检查是否有单个键成为热点
for key, qps in stats.key_qps.items():
if qps > self.qps_threshold * 0.5:
return True
return False
def get_split_point(self, partition, stats):
# 如果有明确的热点键,在热点附近分裂
hottest_key = max(stats.key_qps.items(), key=lambda x: x[1])[0]
# 找到热点键附近的分裂点
# 避免将热点键本身作为边界(可能导致分裂后仍然热点)
split_key = self.find_key_near(partition, hottest_key)
return split_keyTiKV 实现了负载感知的分裂策略,通过 PD 收集各个 Region 的读写流量,对热点 Region 进行分裂。
3.4 预分裂:提前规划
对于可预测的数据分布和访问模式,预分裂可以避免系统启动初期的大量分裂操作。
时序数据预分裂
def presplit_timeseries(start_time, end_time, num_regions):
"""
为时序数据预分裂 Region
键格式: timestamp_sensor_id
"""
time_range = end_time - start_time
interval = time_range / num_regions
split_keys = []
for i in range(1, num_regions):
split_time = start_time + i * interval
split_key = f"{split_time}_"
split_keys.append(split_key)
return create_regions_with_split_keys(split_keys)
# 示例:为未来 30 天的监控数据预分裂 30 个 Region
now = int(time.time())
future_30_days = now + 30 * 24 * 3600
regions = presplit_timeseries(now, future_30_days, 30)哈希预分裂
为了避免热点,可以在键前加上哈希前缀,然后按哈希前缀预分裂:
def presplit_with_hash_prefix(key_space, num_regions):
"""
使用哈希前缀预分裂
实际键: user_12345
存储键: 03_user_12345 (03 是哈希前缀)
"""
split_keys = []
for i in range(1, num_regions):
# 哈希前缀从 00 到 FF(假设用 16 进制两位)
prefix = f"{i:02x}_"
split_keys.append(prefix)
return create_regions_with_split_keys(split_keys)
def encode_key_with_hash(key, num_buckets=256):
"""将键编码为带哈希前缀的格式"""
hash_value = hash(key) % num_buckets
prefix = f"{hash_value:02x}_"
return prefix + key这种方法牺牲了范围查询能力,但避免了单点热点,适合纯 KV 访问场景。
3.5 分裂的执行过程
分裂不是原子操作,需要精心设计以保持系统可用性。以 TiKV 为例:
sequenceDiagram
participant PD as PD 调度器
participant Leader as Region Leader
participant Follower as Region Follower
participant Client as 客户端
PD->>PD: 检测到热点 Region R1
PD->>Leader: 发送 Split 命令(split_key=m)
Leader->>Leader: 将 Split 作为 Raft Log 写入
Leader->>Follower: 复制 Split Log
Follower-->>Leader: 确认(多数派达成)
Leader->>Leader: 应用 Split:R1[a,m) + R2[m,z)
Leader->>Leader: 为 R2 创建新 Raft Group
Leader-->>PD: 报告分裂完成(R1, R2 的元信息)
PD->>PD: 更新路由表
PD-->>Client: 推送路由变更通知
Client->>Client: 刷新本地路由缓存
这张时序图展示了从热点检测到分裂完成的完整编排流程。PD 作为全局调度器发起分裂决策,Leader 通过 Raft 共识保证分裂操作在所有副本上一致执行。分裂完成后,路由表更新是异步推送的,客户端在收到过期 Epoch 错误时也会主动刷新缓存。
以 TiKV 的具体步骤为例:
1. PD 决定分裂 Region R1 [a, z) → [a, m), [m, z)
2. PD 向 Region R1 的 Leader 发送 Split 命令
3. Leader 将 Split 命令作为 Raft Log 提交
4. 当 Split Log 被多数派确认后:
a. Leader 创建新 Region R2 [m, z)
b. R1 的 EndKey 更新为 m,变成 [a, m)
c. R2 继承 R1 的数据([m, z) 部分)
d. R2 创建新的 Raft Group
5. Leader 向 PD 报告分裂完成
6. PD 更新路由表,通知客户端
关键点:
- 分裂期间 Region 仍然可用:读写请求可以正常处理
- 使用 Epoch 检测过期请求:每次分裂后 Region Epoch 递增,携带旧 Epoch 的请求被拒绝
- Raft 保证分裂的一致性:分裂作为 Raft Admin Command,由共识算法保证
// TiKV Region Epoch
type RegionEpoch struct {
ConfVer uint64 // 配置变更版本(副本增删)
Version uint64 // 分裂/合并版本
}
// 检查请求是否携带最新 Epoch
func (s *Store) checkEpoch(req *Request) error {
region := s.getRegion(req.RegionID)
if region == nil {
return ErrRegionNotFound
}
reqEpoch := req.RegionEpoch
regionEpoch := region.Epoch
if reqEpoch.Version < regionEpoch.Version {
return &EpochNotMatch{
Message: "region has been split or merged",
CurrentRegions: []*Region{region},
}
}
return nil
}四、合并策略:回收空间与减少碎片
分裂解决了分区过大的问题,但也带来了副作用:随着时间推移,删除操作会导致某些分区变得很小,产生大量碎片。分区合并(Merge)通过将相邻的小分区合并为一个来解决这个问题。
4.1 合并的触发条件
条件一:分区大小低于阈值
func (pc *PartitionController) shouldMerge(partition *Partition) bool {
// TiKV 默认当 Region 小于 1MB 时触发合并
minSize := pc.config.RegionMinSize // 1MB
if partition.ApproximateSize < minSize {
return true
}
return false
}条件二:分区空闲
即使分区很小,如果访问频繁也不应合并:
def should_merge(partition, stats, min_size, min_qps):
if partition.size < min_size and stats.qps < min_qps:
return True
return False条件三:相邻分区
只有相邻的分区才能合并:
func (pc *PartitionController) findMergeTarget(partition *Partition) *Partition {
// 寻找左邻居或右邻居
leftNeighbor := pc.findPartitionByEndKey(partition.StartKey)
rightNeighbor := pc.findPartitionByStartKey(partition.EndKey)
// 选择更小的邻居
if leftNeighbor != nil && shouldMerge(leftNeighbor) {
return leftNeighbor
}
if rightNeighbor != nil && shouldMerge(rightNeighbor) {
return rightNeighbor
}
return nil
}4.2 合并的执行过程
合并比分裂更复杂,因为需要协调两个独立的 Raft Group。
TiKV 的 Region 合并流程:
1. PD 决定合并 Region R1 [a, m) 和 R2 [m, z) → R [a, z)
2. PD 向 R2 的 Leader 发送 PrepareMerge 命令
- R2 停止接收新的 Raft Log(不影响读操作)
- R2 等待所有 Apply Index 追上 Commit Index
3. PD 向 R1 的 Leader 发送 CommitMerge 命令,携带 R2 的状态
4. R1 Leader 将 CommitMerge 作为 Raft Log 提交
5. 当 CommitMerge 被 Apply 时:
- R1 吸收 R2 的数据(逻辑上,数据可能已经在同一台机器)
- R1 的 EndKey 更新为 z
- R2 被销毁
6. R1 向 PD 报告合并完成
7. PD 更新路由表
关键挑战:
挑战一:保证数据一致性
R2 在 PrepareMerge 后不能接受新写入,否则这些写入会丢失:
// TiKV PrepareMerge 实现(简化)
fn exec_prepare_merge(&mut self, req: &AdminRequest) -> Result<()> {
let merge_state = MergeState {
min_index: self.apply_state.get_applied_index(),
target: req.get_prepare_merge().get_target().clone(),
commit: 0,
};
// 标记 Region 为 Merging 状态
self.region_state = RegionState::Merging(merge_state);
// 拒绝新的写入提案
self.pending_merge = Some(req.get_prepare_merge().clone());
Ok(())
}
fn propose(&mut self, cmd: &RaftCmdRequest) -> Result<()> {
// 检查是否处于 Merging 状态
if self.pending_merge.is_some() {
return Err(Error::RegionMerging);
}
// 正常提案流程
self.raft_group.propose(cmd)?;
Ok(())
}挑战二:处理并发分裂和合并
如果 R2 在合并过程中被分裂了怎么办?答案是使用 Region Epoch 检测并重试:
func (r *Region) CommitMerge(source *Region) error {
// 检查源 Region 的 Epoch 是否匹配
if source.Epoch.Version != r.expectedSourceEpoch.Version {
return ErrEpochNotMatch{
Message: "source region has been split or merged",
}
}
// 执行合并
r.EndKey = source.EndKey
r.Epoch.Version++
// 销毁源 Region
source.Destroy()
return nil
}4.3 HBase 的 Region 合并
HBase 的合并相对简单,因为它通过 Master 协调而不是两个 Region 之间直接通信:
public class HMaster {
public void mergeRegions(RegionInfo region1, RegionInfo region2)
throws IOException {
// 检查是否相邻
if (!Bytes.equals(region1.getEndKey(), region2.getStartKey())) {
throw new IOException("Regions are not adjacent");
}
// 离线两个 Region
unassignRegion(region1);
unassignRegion(region2);
// 合并数据文件(底层是 HDFS 文件)
Path region1Path = getRegionPath(region1);
Path region2Path = getRegionPath(region2);
Path mergedPath = getMergedRegionPath(region1, region2);
fs.rename(region1Path, mergedPath);
mergeStoreFiles(region2Path, mergedPath);
fs.delete(region2Path, true);
// 创建新 Region 元数据
RegionInfo mergedRegion = RegionInfo.newBuilder()
.setTable(region1.getTable())
.setStartKey(region1.getStartKey())
.setEndKey(region2.getEndKey())
.build();
// 更新 Meta 表
updateMetaTable(region1, region2, mergedRegion);
// 重新上线合并后的 Region
assignRegion(mergedRegion);
}
}HBase 的方法更直接但也更重:合并期间 Region 完全不可用。TiKV 的方法更复杂但实现了在线合并。
4.4 合并的时机选择
频繁合并会增加系统开销,延迟合并会浪费资源。实践中通常采用以下策略:
策略一:定期检查
class MergeScheduler:
def __init__(self, check_interval=600): # 10分钟
self.check_interval = check_interval
def run(self):
while True:
time.sleep(self.check_interval)
self.check_and_merge()
def check_and_merge(self):
for partition in self.get_all_partitions():
if self.should_merge(partition):
target = self.find_merge_target(partition)
if target:
self.merge(partition, target)策略二:低峰期合并
在系统负载低时执行合并:
func (ms *MergeScheduler) shouldMergeNow() bool {
currentHour := time.Now().Hour()
// 凌晨 2-5 点执行合并
if currentHour >= 2 && currentHour < 5 {
return true
}
// 或者检查系统负载
if ms.getSystemLoad() < 0.3 {
return true
}
return false
}策略三:批量合并
累积多个待合并的分区,批量执行:
def batch_merge(partitions_to_merge, batch_size=10):
batches = [partitions_to_merge[i:i+batch_size]
for i in range(0, len(partitions_to_merge), batch_size)]
for batch in batches:
for partition in batch:
target = find_merge_target(partition)
if target:
merge(partition, target)
time.sleep(60) # 批次间休息五、热点处理:让负载更均衡
热点问题是范围分区系统最头疼的问题。即使数据分布均匀,访问模式的不均也会导致某些分区过载。
5.1 热点的成因
成因一:时序写入
日志、监控等时序数据通常使用时间戳作为键,导致写入集中在最新的分区:
Time →
├── Region 1: [2024-01-01, 2024-01-15) ✓ 完成,空闲
├── Region 2: [2024-01-15, 2024-02-01) ✓ 完成,空闲
├── Region 3: [2024-02-01, 2024-02-15) ✓ 完成,空闲
└── Region 4: [2024-02-15, ∞) 🔥 所有写入集中在这里!
成因二:热门键
某些键天然比其他键更热门,比如微博的热搜话题、电商的爆款商品:
Product访问分布:
├── product_1 ───────── 1M QPS 🔥
├── product_2 ── 100K QPS
├── product_3 ── 80K QPS
└── product_1000 ─ 10 QPS
成因三:顺序扫描
某些应用会顺序扫描大范围数据,导致瞬时热点:
# 分析任务:扫描所有用户
for user_id in range(1, 1000000):
process(get_user(user_id))
# 扫描会依次让每个 Region 成为热点5.2 热点检测指标与诊断
在实施热点治理之前,首先需要具备可靠的热点检测能力。以下是生产环境中常用的检测指标体系:
指标一:每个 Range 的请求速率(QPS/TPS)
最直接的热点信号是单个 Range 的请求速率显著高于集群平均值。通常以 P99 或标准差来衡量偏离程度:
class HotRangeDetector:
def __init__(self, threshold_ratio=3.0):
self.threshold_ratio = threshold_ratio
def detect(self, range_qps: dict) -> list:
"""
检测热点 Range。
range_qps: {range_id: qps}
返回热点 Range 列表。
"""
if not range_qps:
return []
avg_qps = sum(range_qps.values()) / len(range_qps)
hot_ranges = []
for range_id, qps in range_qps.items():
if avg_qps > 0 and qps / avg_qps > self.threshold_ratio:
hot_ranges.append({
"range_id": range_id,
"qps": qps,
"ratio": qps / avg_qps
})
return sorted(hot_ranges, key=lambda x: x["ratio"], reverse=True)指标二:键分布直方图(Key Distribution Histogram)
通过采样统计每个 Range 内的键分布,识别数据倾斜。如果某个 Range 的键数量远超其他 Range,即使 QPS 相似,未来也很可能成为热点:
class KeyDistributionAnalyzer:
def __init__(self, num_buckets=64):
self.num_buckets = num_buckets
def build_histogram(self, range_key_counts: dict) -> dict:
"""
构建键分布直方图。
range_key_counts: {range_id: key_count}
"""
total_keys = sum(range_key_counts.values())
avg_keys = total_keys / len(range_key_counts) if range_key_counts else 0
skew_score = 0.0
for count in range_key_counts.values():
skew_score += (count - avg_keys) ** 2
skew_score = (skew_score / len(range_key_counts)) ** 0.5 if range_key_counts else 0
return {
"total_keys": total_keys,
"avg_keys_per_range": avg_keys,
"max_keys": max(range_key_counts.values(), default=0),
"min_keys": min(range_key_counts.values(), default=0),
"skew_stddev": skew_score,
"skew_ratio": max(range_key_counts.values(), default=0) / avg_keys if avg_keys > 0 else 0
}指标三:资源维度综合评估
单纯的 QPS 不够全面,还需要结合 CPU 使用率、磁盘 I/O、网络带宽等多维指标判断热点类型:
| 检测指标 | 阈值参考 | 热点类型 | 推荐动作 |
|---|---|---|---|
| 单 Range QPS > 集群平均 3 倍 | 可配置 | 读/写热点 | 分裂 Range |
| 键数量偏斜比 > 5 | 可配置 | 数据倾斜 | 预分裂或加盐 |
| 单 Range CPU 使用率 > 80% | 80% | 计算热点 | 分裂或迁移 |
| 单 Range 写入字节 > 集群平均 5 倍 | 可配置 | 写入热点 | 加盐或分裂 |
| P99 延迟 > SLA 阈值 | 业务定义 | 综合热点 | 分裂 + 读副本 |
flowchart TD
A["检测到热点 Range"] --> B{"CPU 使用率高?"}
B -->|是| C{"写入集中?"}
B -->|否| D{"读请求集中?"}
C -->|是| E["分裂 Range"]
C -->|否| F["迁移到更强节点"]
D -->|是| G["增加 Follower Read 副本"]
D -->|否| H{"键分布倾斜?"}
H -->|是| I["键加盐 + 预分裂"]
H -->|否| J["检查扫描类查询"]
上述决策流程图展示了热点治理的核心判断路径。首先区分热点的资源瓶颈类型(CPU、读、写),然后根据具体成因选择对应的治理手段。键分布倾斜导致的热点需要从数据模型层面解决,而非简单的分裂操作。
5.3 解决方案一:键加盐(Key Salting)
在键前加上随机前缀,将写入分散到多个分区:
import hashlib
def salt_key(key, num_salts=10):
"""
为键添加盐
原始键: user_12345
加盐键: 03_user_12345
"""
salt = hash(key) % num_salts
return f"{salt:02d}_{key}"
def desalt_key(salted_key):
"""移除盐前缀"""
return salted_key.split('_', 1)[1]
# 写入
def write_with_salt(key, value, num_salts=10):
salted_key = salt_key(key, num_salts)
db.put(salted_key, value)
# 读取(需要尝试所有可能的盐)
def read_with_salt(key, num_salts=10):
for salt in range(num_salts):
salted_key = f"{salt:02d}_{key}"
value = db.get(salted_key)
if value is not None:
return value
return None加盐的代价是点查询需要尝试所有盐值,范围查询变得不可能。适用于纯 KV 场景。
5.4 解决方案二:时间戳反转
对于时序数据,可以反转时间戳避免写入集中:
def reverse_timestamp(timestamp):
"""
反转时间戳
正向: 1609459200 (2021-01-01)
反向: 8390540799 (从某个未来时间倒数)
"""
max_timestamp = 9999999999 # 遥远的未来
return max_timestamp - timestamp
# 存储
def store_event(event_id, timestamp, data):
reversed_ts = reverse_timestamp(timestamp)
key = f"{reversed_ts}_{event_id}"
db.put(key, data)
# 查询最近的事件(反转后最近的事件在键空间前端)
def query_recent_events(limit=100):
return db.scan(start_key="", end_key="", limit=limit)这种方法保留了范围查询能力,但改变了查询语义(最近的在前而不是在后)。
5.5 解决方案三:自适应分裂
前面提到的热点分裂策略,通过监控访问频率动态分裂热点分区:
type AdaptiveSplitter struct {
regions map[uint64]*Region
accessStats *AccessStats
splitThreshold uint64 // QPS 阈值
}
func (as *AdaptiveSplitter) MonitorLoop() {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
as.checkAndSplitHotspots()
}
}
func (as *AdaptiveSplitter) checkAndSplitHotspots() {
for regionID, region := range as.regions {
qps := as.accessStats.GetRegionQPS(regionID)
if qps > as.splitThreshold {
// 分析热点分布
hotKeys := as.accessStats.GetHotKeys(regionID, 10)
if len(hotKeys) > 0 {
// 在热点键附近分裂
splitKey := as.findSplitKeyNear(region, hotKeys[0])
as.splitRegion(region, splitKey)
} else {
// 均匀热点,正常分裂
splitKey := as.findMidpoint(region)
as.splitRegion(region, splitKey)
}
}
}
}5.6 解决方案四:负载感知路由
对于读热点,可以通过负载均衡将请求分散到多个副本:
class LoadAwareRouter:
def __init__(self):
self.region_replicas = {} # region_id -> [replica1, replica2, ...]
self.replica_load = {} # replica_id -> current_qps
def route_read_request(self, region_id):
replicas = self.region_replicas[region_id]
# 选择负载最低的副本
best_replica = min(replicas,
key=lambda r: self.replica_load.get(r, 0))
self.replica_load[best_replica] = \
self.replica_load.get(best_replica, 0) + 1
return best_replica
def update_load_stats(self):
# 定期更新负载统计
for replica_id in self.replica_load:
# 衰减旧的统计
self.replica_load[replica_id] *= 0.9TiKV 的 Follower Read 特性允许从 Follower 副本读取数据,缓解 Leader 的读压力。
5.7 解决方案五:预分裂与哈希前缀
对于已知的热点模式,可以预先分裂并使用哈希前缀:
class HashPrefixedTimeSeries:
def __init__(self, num_buckets=16):
self.num_buckets = num_buckets
self.presplit()
def presplit(self):
"""预分裂 Region,每个哈希前缀一个 Region"""
split_keys = []
for i in range(1, self.num_buckets):
prefix = f"{i:02x}_"
split_keys.append(prefix)
create_regions_with_splits(split_keys)
def encode_key(self, sensor_id, timestamp):
"""
组合哈希前缀和时间戳
键格式: hash_timestamp_sensor_id
"""
bucket = hash(sensor_id) % self.num_buckets
prefix = f"{bucket:02x}_"
return f"{prefix}{timestamp}_{sensor_id}"
def write(self, sensor_id, timestamp, data):
key = self.encode_key(sensor_id, timestamp)
db.put(key, data)
def query_sensor(self, sensor_id, start_time, end_time):
"""查询特定传感器的时序数据"""
bucket = hash(sensor_id) % self.num_buckets
prefix = f"{bucket:02x}_"
start_key = f"{prefix}{start_time}_{sensor_id}"
end_key = f"{prefix}{end_time}_{sensor_id}~" # ~ 在字典序中较大
return db.scan(start_key, end_key)这种方法在保留时间范围查询能力的同时,将写入分散到多个分区。代价是跨传感器的全局时间范围查询需要扫描所有哈希桶。
六、TiKV Region 的分裂与合并
TiKV 是 PingCAP 开发的分布式 KV 存储,采用范围分区(称为 Region)。让我们深入分析其实现。
6.1 Region 的基本结构
TiKV 的数据空间被划分为 Region,每个 Region 负责一个连续的键范围:
pub struct Region {
pub id: u64,
pub start_key: Vec<u8>,
pub end_key: Vec<u8>,
pub region_epoch: RegionEpoch,
pub peers: Vec<Peer>, // 副本列表
}
pub struct RegionEpoch {
pub conf_ver: u64, // 配置变更版本
pub version: u64, // Region 版本(分裂/合并时递增)
}
pub struct Peer {
pub id: u64,
pub store_id: u64, // 所在的存储节点
}每个 Region 是一个独立的 Raft Group,有自己的 Leader 和 Follower。
6.2 PD 的作用
PD(Placement Driver)是 TiKV 集群的大脑,负责:
- 元数据管理:维护 Region 路由表
- 调度决策:决定何时分裂、合并、迁移 Region
- 时间戳分配:为事务分配全局唯一时间戳
- 负载均衡:监控各节点负载,调度 Region 分布
type PDServer struct {
regionTree *RegionTree // 维护所有 Region 的索引
stores map[uint64]*StoreInfo
scheduler *Scheduler
}
type RegionTree struct {
tree *btree.BTree // 按 StartKey 排序的 B 树
}
// PD 查找键所属的 Region
func (pd *PDServer) GetRegion(key []byte) *Region {
item := pd.regionTree.tree.Search(key)
if item == nil {
return nil
}
return item.(*Region)
}
// PD 监控 Region 统计信息
func (pd *PDServer) HandleRegionHeartbeat(req *RegionHeartbeat) {
region := req.GetRegion()
// 更新 Region 信息
pd.regionTree.Update(region)
// 检查是否需要分裂
if req.ApproximateSize > pd.cfg.RegionMaxSize {
pd.scheduler.ScheduleSplit(region)
}
// 检查是否需要合并
if req.ApproximateSize < pd.cfg.RegionMinSize {
pd.scheduler.ScheduleMerge(region)
}
// 检查是否需要迁移(负载均衡)
if pd.shouldTransferRegion(region, req.Load) {
pd.scheduler.ScheduleTransfer(region)
}
}6.3 Region Split 的详细流程
第一步:检查分裂条件
TiKV 节点定期检查 Region 大小:
impl<EK: KvEngine, ER: RaftEngine> SplitCheckRunner<EK, ER> {
fn run(&mut self, task: SplitCheckTask<EK, ER>) {
let region = task.region;
let approximate_size = self.get_approximate_size(®ion);
if approximate_size >= self.cfg.region_max_size {
// 需要分裂,计算分裂键
let split_keys = self.get_split_keys(®ion);
if !split_keys.is_empty() {
// 向 Region Leader 发送分裂请求
self.router.send(
region.id,
PeerMsg::SplitRegion { split_keys }
);
}
}
}
fn get_split_keys(&self, region: &Region) -> Vec<Vec<u8>> {
let mut split_keys = Vec::new();
let split_size = self.cfg.region_split_size;
let mut accumulated_size = 0;
// 扫描 Region,寻找分裂点
let start_key = keys::data_key(region.get_start_key());
let end_key = keys::data_key(region.get_end_key());
let mut iter = self.engine.iterator(&start_key, &end_key);
while iter.valid() {
accumulated_size += iter.value().len() as u64;
if accumulated_size >= split_size {
split_keys.push(keys::origin_key(iter.key()).to_vec());
accumulated_size = 0;
}
iter.next();
}
split_keys
}
}第二步:提交 Raft Admin Command
Leader 收到分裂请求后,将其作为 Admin Command 提交到 Raft:
impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
fn propose_split(&mut self, split_keys: Vec<Vec<u8>>) {
// 构造 Split 命令
let mut req = AdminRequest::default();
let mut split = SplitRequest::default();
split.set_split_key(split_keys[0].clone());
split.set_new_region_id(self.next_region_id());
split.set_new_peer_ids(self.next_peer_ids());
req.set_cmd_type(AdminCmdType::Split);
req.set_split(split);
// 提交到 Raft
self.propose_raft_command(req);
}
fn on_ready_split_region(&mut self, split: &SplitRequest) {
let region = self.region.clone();
let split_key = split.get_split_key();
// 创建新 Region
let mut new_region = Region::default();
new_region.set_id(split.get_new_region_id());
new_region.set_start_key(split_key.to_vec());
new_region.set_end_key(region.get_end_key().to_vec());
new_region.mut_region_epoch().set_version(1);
new_region.set_peers(split.get_new_peer_ids().into());
// 更新原 Region
let mut updated_region = region.clone();
updated_region.set_end_key(split_key.to_vec());
updated_region.mut_region_epoch().set_version(
region.get_region_epoch().get_version() + 1
);
// 应用分裂
self.region = updated_region;
self.create_peer_for_region(new_region);
// 通知 PD
self.report_split_to_pd(new_region);
}
}第三步:路由更新
客户端通过 Region Epoch 检测分裂:
type TiKVClient struct {
regionCache *RegionCache
}
func (c *TiKVClient) Get(key []byte) ([]byte, error) {
region := c.regionCache.LocateKey(key)
req := &GetRequest{
Key: key,
RegionId: region.ID,
RegionEpoch: region.Epoch,
}
resp, err := c.sendRequest(region.Leader, req)
if err != nil {
return nil, err
}
// 检查 Epoch
if resp.RegionError != nil && resp.RegionError.EpochNotMatch != nil {
// Region 已分裂,更新缓存
newRegions := resp.RegionError.EpochNotMatch.CurrentRegions
c.regionCache.UpdateRegions(newRegions)
// 重试
return c.Get(key)
}
return resp.Value, nil
}6.4 Region Merge 的实现
Region 合并更复杂,涉及两个 Raft Group 的协调:
// 第一步:PrepareMerge
fn propose_prepare_merge(&mut self, target_region: Region) {
let mut req = AdminRequest::default();
let mut prepare_merge = PrepareMergeRequest::default();
prepare_merge.set_min_index(self.get_apply_index());
prepare_merge.set_target(target_region);
req.set_cmd_type(AdminCmdType::PrepareMerge);
req.set_prepare_merge(prepare_merge);
self.propose_raft_command(req);
}
fn on_ready_prepare_merge(&mut self, req: &PrepareMergeRequest) {
// 标记为 Merging 状态
self.pending_merge_state = Some(MergeState {
min_index: req.get_min_index(),
target: req.get_target().clone(),
});
// 拒绝新的写入
self.is_merging = true;
}
// 第二步:CommitMerge
fn propose_commit_merge(&mut self, source_region: Region, source_state: MergeState) {
let mut req = AdminRequest::default();
let mut commit_merge = CommitMergeRequest::default();
commit_merge.set_source(source_region);
commit_merge.set_commit(source_state.min_index);
req.set_cmd_type(AdminCmdType::CommitMerge);
req.set_commit_merge(commit_merge);
self.propose_raft_command(req);
}
fn on_ready_commit_merge(&mut self, req: &CommitMergeRequest) {
let source = req.get_source();
// 检查 Epoch
if source.get_region_epoch().get_version() !=
self.pending_merge_target_epoch.get_version() {
// Epoch 不匹配,源 Region 已变化,取消合并
return;
}
// 扩展键范围
self.region.set_end_key(source.get_end_key().to_vec());
self.region.mut_region_epoch().set_version(
self.region.get_region_epoch().get_version() + 1
);
// 销毁源 Region
self.destroy_source_region(source.get_id());
// 通知 PD
self.report_merge_to_pd();
}6.5 TiKV 的负载均衡
PD 通过调度器实现负载均衡:
type Scheduler struct {
name string
cfg *SchedulerConfig
}
// 热点调度器
type HotRegionScheduler struct {
Scheduler
hotRegions map[uint64]*HotRegion // region_id -> hot_info
}
func (hs *HotRegionScheduler) Schedule(cluster *Cluster) []*Operator {
var ops []*Operator
// 找出热点 Region
for _, region := range hs.hotRegions {
if region.FlowBytes > hs.cfg.HotRegionThreshold {
// 尝试分裂热点 Region
if region.Size < hs.cfg.MaxRegionSize {
op := CreateSplitOperator(region)
ops = append(ops, op)
}
// 或者迁移到负载更低的节点
targetStore := cluster.SelectTargetStore(region)
if targetStore != nil {
op := CreateTransferLeaderOperator(region, targetStore)
ops = append(ops, op)
}
}
}
return ops
}
// 负载均衡调度器
type BalanceRegionScheduler struct {
Scheduler
}
func (bs *BalanceRegionScheduler) Schedule(cluster *Cluster) []*Operator {
// 找出负载最高和最低的节点
stores := cluster.GetStores()
sort.Slice(stores, func(i, j int) bool {
return stores[i].RegionCount > stores[j].RegionCount
})
sourceStore := stores[0] // 负载最高
targetStore := stores[len(stores)-1] // 负载最低
// 选择一个 Region 从 source 迁移到 target
region := sourceStore.SelectRegionToMove()
if region != nil {
return []*Operator{
CreateMoveRegionOperator(region, targetStore),
}
}
return nil
}七、HBase Region 的分裂与合并
HBase 是 Hadoop 生态的分布式数据库,也采用范围分区(称为 Region)。其实现与 TiKV 有显著不同。
7.1 HBase 的架构
HBase 架构:
├── HMaster: 协调者,负责 Region 分配和负载均衡
├── RegionServer: 数据节点,每个节点管理多个 Region
│ ├── Region 1: [a, m)
│ ├── Region 2: [m, z)
│ └── WAL: 写前日志
├── ZooKeeper: 协调服务,存储元数据
└── HDFS: 底层存储,存储 HFile(数据文件)
public class HRegion {
private final RegionInfo regionInfo;
private final Map<byte[], HStore> stores; // Column Family -> Store
private final WAL wal;
private final FileSystem fs;
// Region 元数据
public static class RegionInfo {
private final byte[] tableName;
private final byte[] startKey;
private final byte[] endKey;
private final long regionId;
}
// Store 对应一个 Column Family
public static class HStore {
private final byte[] family;
private final List<HFile> storeFiles; // 数据文件
private final MemStore memStore; // 内存缓冲
}
}7.2 HBase 的分裂策略
HBase 提供多种分裂策略,通过
RegionSplitPolicy 配置:
ConstantSizeRegionSplitPolicy:固定大小阈值
public class ConstantSizeRegionSplitPolicy extends RegionSplitPolicy {
private long desiredMaxFileSize;
@Override
protected void configureForRegion(HRegion region) {
this.desiredMaxFileSize = region.getTableDescriptor()
.getMaxFileSize();
if (this.desiredMaxFileSize <= 0) {
this.desiredMaxFileSize = 10L * 1024 * 1024 * 1024; // 10GB
}
}
@Override
protected boolean shouldSplit() {
long totalSize = 0;
for (HStore store : region.getStores()) {
totalSize += store.getSize();
}
return totalSize > desiredMaxFileSize;
}
}IncreasingToUpperBoundRegionSplitPolicy:随表增长调整阈值
public class IncreasingToUpperBoundRegionSplitPolicy
extends RegionSplitPolicy {
@Override
protected boolean shouldSplit() {
// 表的 Region 数量越少,分裂阈值越低
int regionCount = getRegionCountOfTable(region.getTableName());
long sizeToCheck = desiredMaxFileSize;
if (regionCount == 1) {
sizeToCheck = initialSize; // 例如 256MB
} else if (regionCount < 100) {
// min(maxFileSize, initialSize * regionCount^3)
sizeToCheck = Math.min(
desiredMaxFileSize,
initialSize * regionCount * regionCount * regionCount
);
}
return region.getSize() > sizeToCheck;
}
}这种策略在表初期更激进地分裂,避免初始单个 Region 成为瓶颈。
SteppingSplitPolicy:阶梯式分裂
public class SteppingSplitPolicy extends RegionSplitPolicy {
@Override
protected boolean shouldSplit() {
int regionCount = getRegionCountOfTable(region.getTableName());
if (regionCount <= 1) {
return region.getSize() > 128 * 1024 * 1024; // 128MB
} else if (regionCount <= 10) {
return region.getSize() > 512 * 1024 * 1024; // 512MB
} else {
return region.getSize() > desiredMaxFileSize; // 10GB
}
}
}7.3 HBase Region Split 的执行流程
HBase 的分裂过程涉及 WAL 分裂和数据文件处理:
public class HRegion {
public void split(byte[] splitKey) throws IOException {
// 第一步:关闭 Region,停止写入
this.close();
// 第二步:分裂 WAL
Path regionDir = getRegionDir();
Path splitLogDir = new Path(regionDir, "splitlog");
fs.mkdirs(splitLogDir);
// 将 WAL 按分裂键分成两部分
splitWAL(wal, splitKey, splitLogDir);
// 第三步:创建两个子 Region
RegionInfo daughterA = RegionInfo.newBuilder()
.setTable(regionInfo.getTable())
.setStartKey(regionInfo.getStartKey())
.setEndKey(splitKey)
.build();
RegionInfo daughterB = RegionInfo.newBuilder()
.setTable(regionInfo.getTable())
.setStartKey(splitKey)
.setEndKey(regionInfo.getEndKey())
.build();
// 第四步:创建 Reference 文件而不是立即复制数据
createReferenceFiles(daughterA, daughterB);
// 第五步:更新 Meta 表
updateMetaTable(regionInfo, daughterA, daughterB);
// 第六步:打开子 Region
HRegion regionA = openRegion(daughterA);
HRegion regionB = openRegion(daughterB);
// 第七步:通知 HMaster
reportSplit(daughterA, daughterB);
}
private void createReferenceFiles(RegionInfo daughterA,
RegionInfo daughterB)
throws IOException {
// Reference 文件指向父 Region 的数据文件
// 避免立即复制数据
for (HStore store : stores.values()) {
for (HFile file : store.getStoreFiles()) {
// 为 daughterA 创建 top reference(指向前半部分)
createReference(file, daughterA, Reference.Type.TOP);
// 为 daughterB 创建 bottom reference(指向后半部分)
createReference(file, daughterB, Reference.Type.BOTTOM);
}
}
}
}
// Reference 文件
public class Reference {
enum Type { TOP, BOTTOM }
private final Path parentFile;
private final byte[] splitKey;
private final Type type;
// 读取 Reference 文件时过滤数据
public Iterator<KeyValue> iterator() {
Iterator<KeyValue> parentIter = openParentFile();
return new FilteringIterator(parentIter, kv -> {
if (type == Type.TOP) {
return kv.getKey() < splitKey;
} else {
return kv.getKey() >= splitKey;
}
});
}
}HBase 使用 Reference 文件延迟数据复制,分裂可以快速完成。真正的数据分离在后台 Compaction 时进行:
public class Compaction {
public void compact(HStore store) throws IOException {
List<HFile> filesToCompact = store.selectFilesToCompact();
// 检查是否有 Reference 文件
boolean hasReference = filesToCompact.stream()
.anyMatch(HFile::isReference);
if (hasReference) {
// Compaction 时解引用,生成真实的数据文件
HFile newFile = compactWithDereference(filesToCompact);
// 删除旧的 Reference 文件
for (HFile file : filesToCompact) {
file.delete();
}
store.addFile(newFile);
} else {
// 正常 Compaction
HFile newFile = compactNormally(filesToCompact);
store.addFile(newFile);
}
}
}7.4 HBase Region 合并
HBase 的合并由 HMaster 协调:
public class HMaster {
public void mergeRegions(RegionInfo region1, RegionInfo region2)
throws IOException {
// 验证 Region 相邻
if (!Bytes.equals(region1.getEndKey(), region2.getStartKey())) {
throw new IOException("Regions are not adjacent");
}
// 关闭两个 Region
RegionServer rs1 = getRegionServer(region1);
RegionServer rs2 = getRegionServer(region2);
rs1.closeRegion(region1);
if (rs1 != rs2) {
rs2.closeRegion(region2);
}
// 合并数据
Path region1Dir = getRegionDir(region1);
Path region2Dir = getRegionDir(region2);
RegionInfo mergedRegion = RegionInfo.newBuilder()
.setTable(region1.getTable())
.setStartKey(region1.getStartKey())
.setEndKey(region2.getEndKey())
.build();
Path mergedDir = getRegionDir(mergedRegion);
// 移动数据文件
fs.rename(region1Dir, mergedDir);
// 合并 region2 的数据
for (String family : region2.getFamilies()) {
Path srcFamily = new Path(region2Dir, family);
Path dstFamily = new Path(mergedDir, family);
if (fs.exists(srcFamily)) {
fs.mkdirs(dstFamily);
for (FileStatus file : fs.listStatus(srcFamily)) {
fs.rename(file.getPath(),
new Path(dstFamily, file.getPath().getName()));
}
}
}
// 删除 region2 目录
fs.delete(region2Dir, true);
// 更新 Meta 表
deleteFromMeta(region1);
deleteFromMeta(region2);
addToMeta(mergedRegion);
// 打开合并后的 Region
assignRegion(mergedRegion);
}
}HBase 的合并是离线操作,期间 Region 不可用。相比之下,TiKV 的在线合并更加优雅,但实现也更复杂。
八、混合分区方案
纯粹的范围分区和哈希分区都有局限性。实践中,很多系统采用混合方案。
8.1 复合分区键(Cassandra)
Cassandra 使用复合分区键:第一部分哈希,第二部分范围:
CREATE TABLE sensor_data (
sensor_id INT,
timestamp BIGINT,
value DOUBLE,
PRIMARY KEY ((sensor_id), timestamp)
);
-- (sensor_id) 是 Partition Key,使用哈希分区
-- timestamp 是 Clustering Key,在分区内范围排序实现原理:
class CassandraPartitioner:
def __init__(self, num_partitions):
self.num_partitions = num_partitions
def get_partition(self, partition_key, clustering_key):
# 哈希分区键决定分区
partition_id = hash(partition_key) % self.num_partitions
return partition_id
def store(self, partition_key, clustering_key, value):
partition_id = self.get_partition(partition_key, clustering_key)
# 在分区内按 clustering key 范围存储
partition = self.get_partition_storage(partition_id)
partition.put(clustering_key, value)
def query_range(self, partition_key, start_cluster, end_cluster):
# 可以高效范围查询同一分区内的数据
partition_id = self.get_partition(partition_key, None)
partition = self.get_partition_storage(partition_id)
return partition.scan(start_cluster, end_cluster)这种设计的优势:
- 负载均衡:哈希分区键打散数据
- 高效范围查询:在分区内按 Clustering Key 范围查询
- 局部性:相同 Partition Key 的数据在同一分区
限制:只能查询特定 Partition Key 的范围,不能跨 Partition Key 范围查询。
8.2 多级分区
先按一个维度哈希分区,再按另一个维度范围分区:
class HierarchicalPartitioner:
def __init__(self, num_hash_partitions, region_size):
self.num_hash_partitions = num_hash_partitions
self.region_size = region_size
self.regions = {} # (hash_partition, region_id) -> Region
def get_partition(self, tenant_id, user_id):
# 第一级:按 tenant_id 哈希
hash_partition = hash(tenant_id) % self.num_hash_partitions
# 第二级:按 user_id 范围查找 Region
region = self.find_region(hash_partition, user_id)
return (hash_partition, region.id)
def find_region(self, hash_partition, key):
# 在哈希分区内查找键所属的 Region
for region in self.get_regions_of_hash_partition(hash_partition):
if region.start_key <= key < region.end_key:
return region
return None
def split_region_if_needed(self, hash_partition, region):
if region.size > self.region_size:
# 只在第二级(范围分区)进行分裂
mid_key = region.get_midpoint_key()
region1, region2 = region.split(mid_key)
self.regions[(hash_partition, region1.id)] = region1
self.regions[(hash_partition, region2.id)] = region2这种方案在多租户系统中很有用:
- 第一级哈希按租户隔离,避免租户间热点
- 第二级范围分区在租户内提供高效范围查询
8.3 地理分区
根据地理位置进行范围分区,结合就近路由:
class GeoPartitioner:
def __init__(self):
self.regions = [
Region("us-west", lat_range=(32, 42), lon_range=(-125, -110)),
Region("us-east", lat_range=(36, 45), lon_range=(-80, -70)),
Region("eu-west", lat_range=(48, 56), lon_range=(-10, 5)),
Region("asia-east", lat_range=(30, 40), lon_range=(110, 125)),
]
def get_region(self, lat, lon):
for region in self.regions:
if (region.lat_range[0] <= lat < region.lat_range[1] and
region.lon_range[0] <= lon < region.lon_range[1]):
return region
return None
def query_nearby(self, lat, lon, radius_km):
# 计算范围
lat_range = (lat - radius_km / 111, lat + radius_km / 111)
lon_range = (
lon - radius_km / (111 * cos(radians(lat))),
lon + radius_km / (111 * cos(radians(lat)))
)
# 找到所有相关的 Region
affected_regions = []
for region in self.regions:
if self.ranges_overlap(region.lat_range, lat_range) and \
self.ranges_overlap(region.lon_range, lon_range):
affected_regions.append(region)
# 查询所有相关 Region
results = []
for region in affected_regions:
results.extend(region.query(lat_range, lon_range))
return results地理分区的优势是数据局部性和就近访问,适合 LBS(Location-Based Service)应用。
九、实战:实现一个简单的范围分区系统
让我们实现一个简化的范围分区系统,包含分裂和合并功能:
import bisect
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
@dataclass
class Partition:
id: int
start_key: int
end_key: int
data: Dict[int, str]
size: int = 0
def contains(self, key: int) -> bool:
return self.start_key <= key < self.end_key
def put(self, key: int, value: str):
if not self.contains(key):
raise ValueError(f"Key {key} not in partition range")
old_size = len(self.data.get(key, ""))
self.data[key] = value
self.size += len(value) - old_size
def get(self, key: int) -> Optional[str]:
return self.data.get(key)
def scan(self, start: int, end: int) -> List[Tuple[int, str]]:
return [(k, v) for k, v in sorted(self.data.items())
if start <= k < end]
class RangePartitionSystem:
def __init__(self, split_threshold: int = 1000,
merge_threshold: int = 100):
self.partitions: List[Partition] = []
self.split_threshold = split_threshold
self.merge_threshold = merge_threshold
self.next_partition_id = 0
self.lock = threading.RLock()
# 初始化一个覆盖整个键空间的分区
self.add_partition(Partition(
id=self.next_partition_id,
start_key=0,
end_key=2**31,
data={}
))
self.next_partition_id += 1
def add_partition(self, partition: Partition):
with self.lock:
# 按 start_key 有序插入
idx = bisect.bisect_left(
[p.start_key for p in self.partitions],
partition.start_key
)
self.partitions.insert(idx, partition)
def find_partition(self, key: int) -> Optional[Partition]:
with self.lock:
# 二分查找
idx = bisect.bisect_right(
[p.start_key for p in self.partitions],
key
) - 1
if idx >= 0 and idx < len(self.partitions):
partition = self.partitions[idx]
if partition.contains(key):
return partition
return None
def put(self, key: int, value: str):
partition = self.find_partition(key)
if partition is None:
raise ValueError(f"No partition found for key {key}")
partition.put(key, value)
# 检查是否需要分裂
if partition.size > self.split_threshold:
self.split_partition(partition)
def get(self, key: int) -> Optional[str]:
partition = self.find_partition(key)
if partition is None:
return None
return partition.get(key)
def scan(self, start_key: int, end_key: int) -> List[Tuple[int, str]]:
results = []
with self.lock:
for partition in self.partitions:
# 检查分区是否与查询范围重叠
if partition.start_key < end_key and partition.end_key > start_key:
# 计算重叠范围
scan_start = max(start_key, partition.start_key)
scan_end = min(end_key, partition.end_key)
results.extend(partition.scan(scan_start, scan_end))
return sorted(results)
def split_partition(self, partition: Partition):
with self.lock:
if len(partition.data) == 0:
return
# 找到中点键(按数据量加权)
sorted_keys = sorted(partition.data.keys())
mid_idx = len(sorted_keys) // 2
split_key = sorted_keys[mid_idx]
# 创建两个新分区
partition1 = Partition(
id=self.next_partition_id,
start_key=partition.start_key,
end_key=split_key,
data={}
)
self.next_partition_id += 1
partition2 = Partition(
id=self.next_partition_id,
start_key=split_key,
end_key=partition.end_key,
data={}
)
self.next_partition_id += 1
# 分配数据
for key, value in partition.data.items():
if key < split_key:
partition1.put(key, value)
else:
partition2.put(key, value)
# 替换旧分区
self.partitions.remove(partition)
self.add_partition(partition1)
self.add_partition(partition2)
print(f"Split partition {partition.id} "
f"[{partition.start_key}, {partition.end_key}) into "
f"[{partition1.start_key}, {partition1.end_key}) and "
f"[{partition2.start_key}, {partition2.end_key})")
def merge_partitions(self):
with self.lock:
i = 0
while i < len(self.partitions) - 1:
partition1 = self.partitions[i]
partition2 = self.partitions[i + 1]
# 检查是否相邻且都小于合并阈值
if (partition1.end_key == partition2.start_key and
partition1.size < self.merge_threshold and
partition2.size < self.merge_threshold):
# 合并
merged = Partition(
id=self.next_partition_id,
start_key=partition1.start_key,
end_key=partition2.end_key,
data={}
)
self.next_partition_id += 1
# 合并数据
for key, value in partition1.data.items():
merged.put(key, value)
for key, value in partition2.data.items():
merged.put(key, value)
# 替换
self.partitions.pop(i)
self.partitions.pop(i)
self.add_partition(merged)
print(f"Merged partitions {partition1.id} and {partition2.id} "
f"into partition {merged.id} "
f"[{merged.start_key}, {merged.end_key})")
else:
i += 1
def print_partitions(self):
print("\n=== Partition Status ===")
for p in self.partitions:
print(f"Partition {p.id}: [{p.start_key}, {p.end_key}), "
f"size={p.size}, keys={len(p.data)}")
# 使用示例
def demo():
system = RangePartitionSystem(split_threshold=500, merge_threshold=100)
# 插入数据触发分裂
print("Inserting data...")
for i in range(100):
system.put(i, "x" * 10) # 每个值 10 字节
system.print_partitions()
# 继续插入,触发更多分裂
for i in range(100, 300):
system.put(i, "y" * 10)
system.print_partitions()
# 范围查询
print("\nRange scan [50, 150):")
results = system.scan(50, 150)
print(f"Found {len(results)} keys")
# 删除一些数据
print("\nDeleting data...")
for i in range(50, 150):
partition = system.find_partition(i)
if partition and i in partition.data:
del partition.data[i]
partition.size -= 10
system.print_partitions()
# 触发合并
print("\nMerging partitions...")
system.merge_partitions()
system.print_partitions()
if __name__ == "__main__":
demo()运行结果:
Inserting data...
=== Partition Status ===
Partition 0: [0, 2147483648), size=1000, keys=100
Split partition 0 [0, 2147483648) into [0, 50) and [50, 2147483648)
=== Partition Status ===
Partition 1: [0, 50), size=500, keys=50
Partition 2: [50, 2147483648), size=2500, keys=250
Split partition 2 [50, 2147483648) into [50, 175) and [175, 2147483648)
Range scan [50, 150):
Found 100 keys
Deleting data...
=== Partition Status ===
Partition 1: [0, 50), size=500, keys=50
Partition 3: [50, 175), size=250, keys=25
Partition 4: [175, 2147483648), size=1250, keys=125
Merging partitions...
Merged partitions 1 and 3 into partition 5 [0, 175)
=== Partition Status ===
Partition 5: [0, 175), size=750, keys=75
Partition 4: [175, 2147483648), size=1250, keys=125
这个示例展示了范围分区的基本机制,包括动态分裂和合并。生产系统需要考虑更多细节:副本、故障恢复、并发控制等。
十、范围分区 vs 哈希分区:决策指南
在实际系统设计中,选择范围分区还是哈希分区,不能一概而论。下表从多个维度对比两种策略,并给出具体的工作负载推荐:
| 维度 | 范围分区 | 哈希分区 |
|---|---|---|
| 范围查询 | 高效,只需访问少量分区 | 必须扫描所有分区 |
| 点查询 | 高效,定位到单一分区 | 高效,哈希直接定位 |
| 写入均匀性 | 差,时序/自增键容易热点 | 好,哈希天然打散 |
| 有序扫描 | 天然支持 | 不支持 |
| 分裂/合并 | 需要动态调整边界 | 通常固定分区数,无需分裂 |
| 实现复杂度 | 高(需要分裂/合并/热点治理) | 低(一致性哈希即可) |
| 元数据开销 | 中等(分区边界随数据变化) | 低(哈希函数固定) |
10.1 按工作负载选择
选择范围分区的场景:
- 时序数据分析:监控系统需要按时间范围聚合指标(如”最近 1 小时的 P99 延迟”),范围分区可将查询限制在少数分区内。典型系统:TiKV、HBase。
- 有序扫描密集型:数据仓库的 ETL 作业、排序合并连接(Sort-Merge Join)等操作依赖数据的有序性。
- 前缀匹配查询:如按用户 ID 前缀查询某个租户的所有数据,范围分区天然支持这种局部性。
- 地理位置数据:地理围栏查询、附近搜索等场景中,将地理编码作为范围键可以提高空间查询效率。
选择哈希分区的场景:
- 纯 KV 点查:缓存系统(如 Memcached、Redis Cluster)的核心操作是 Get/Set,不需要范围查询,哈希分区的均匀性和简单性是最优选择。
- 高写入吞吐量:日志采集系统中,如果不需要按时间范围查询,哈希分区可以避免写入热点。
- 自增主键场景:使用自增 ID 作为键时,范围分区会导致所有写入集中在最后一个分区,哈希分区可以打散。
- 负载均衡优先:对延迟一致性要求极高的场景(如在线广告竞价),哈希分区的均匀负载分布更有优势。
混合策略的场景:
- Cassandra 式复合键:使用哈希分区键 + 范围排序键,兼顾写入均匀和分区内有序扫描。适合 IoT 传感器数据(按设备 ID 哈希,按时间戳排序)。
- 两级分区:先按租户 ID 哈希分配到节点,再在节点内按时间范围分区。适合多租户 SaaS 系统。
十一、总结
范围分区是分布式系统中的重要技术,它在保持数据顺序性的同时实现了水平扩展。本文深入探讨了范围分区的各个方面:
核心机制:
- 键空间划分为连续范围,每个范围一个分区
- 分区元数据维护边界和位置信息
- 客户端通过缓存或查询定位键所属分区
分裂策略:
- 基于大小:超过阈值触发分裂
- 基于热度:检测访问热点并分裂
- 预分裂:根据已知模式提前规划
- 分裂执行:通过 Raft 或其他共识协议保证一致性
合并策略:
- 回收小分区,减少碎片
- 协调相邻分区的合并操作
- 选择合适的时机避免影响性能
热点处理:
- 键加盐分散写入
- 时间戳反转改变访问模式
- 自适应分裂应对热点
- 负载感知路由分散读压力
- 预分裂和哈希前缀的混合方案
生产实现:
- TiKV:Region 作为 Raft Group,PD 协调调度,在线分裂合并
- HBase:RegionServer 管理 Region,Reference 文件延迟数据复制,离线分裂合并
混合方案:
- 复合分区键:哈希 + 范围
- 多级分区:隔离后再范围分区
- 地理分区:基于位置的范围划分
范围分区没有银弹。它在范围查询方面的优势需要付出热点处理和动态调整的代价。选择范围分区还是哈希分区,取决于应用的访问模式。如果范围查询是核心需求,范围分区是正确的选择。如果只需要点查询且负载均衡更重要,哈希分区可能更合适。
在下一篇文章中,我们将探讨二级索引(Secondary Index)的分区策略,这是另一个充满挑战的话题。
参考文献
Corbett, J. C., et al. (2013). “Spanner: Google’s Globally Distributed Database.” ACM Transactions on Computer Systems (TOCS), 31(3).
Chang, F., et al. (2008). “Bigtable: A Distributed Storage System for Structured Data.” ACM Transactions on Computer Systems (TOCS), 26(2).
Huang, D., et al. (2020). “TiDB: A Raft-based HTAP Database.” Proceedings of the VLDB Endowment, 13(12).
Lakshman, A., & Malik, P. (2010). “Cassandra: A Decentralized Structured Storage System.” ACM SIGOPS Operating Systems Review, 44(2).
DeCandia, G., et al. (2007). “Dynamo: Amazon’s Highly Available Key-value Store.” ACM SIGOPS Operating Systems Review, 41(6).
Apache HBase Documentation. “Region Splits.” https://hbase.apache.org/book.html#regions.arch
TiKV Documentation. “Region Merge.” https://tikv.org/docs/
Kleppmann, M. (2017). “Designing Data-Intensive Applications.” O’Reilly Media.
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【分布式系统百科】数据再平衡:固定分区、动态分区与 TiKV 的调度策略
在上一篇文章中,我们讨论了分布式系统中的二级索引问题。本文将深入探讨数据再平衡(Rebalancing)的核心策略和实现细节。当分布式系统运行一段时间后,数据分布可能会变得不均匀,节点可能会加入或离开集群,这时就需要再平衡机制来重新分配数据,保证系统的负载均衡和高可用性。
【分布式系统百科】Raft 深度重写:从论文的 18 页到 etcd 的 15000 行
Raft 论文 18 页就能读完,但 etcd/raft 用了 15000 行 Go 才把它变成能在生产环境跑的代码。这篇文章从论文的每一个核心机制出发,逐一拆解工程实现中论文没说的东西:PreVote、ReadIndex、LeaderTransfer、ConfChange V2、流水线复制、Async Apply,以及 TiKV 的 Multi-Raft 实践。最后做一次精确的 Paxos 对比,并坦诚讨论 Raft 的已知缺陷。