土法炼钢兴趣小组的算法知识备份

【分布式系统百科】范围分区:分裂、合并与热点处理

文章导航

标签入口
#分布式系统百科#分区#范围分区#TiKV#HBase

目录

在前一篇文章中,我们讨论了哈希分区如何通过哈希函数将数据均匀分散到各个节点。然而,哈希分区有一个致命缺陷:它破坏了数据的顺序性。如果你想执行范围查询(Range Query),比如查询”所有用户 ID 在 1000 到 2000 之间的用户”,哈希分区会强迫你扫描所有分区。这在很多场景下是不可接受的。

范围分区(Range Partitioning)应运而生。它将键空间划分为连续的范围,每个范围分配给一个节点。相邻的键会存储在同一个分区中,这使得范围查询和有序扫描变得高效。但范围分区也带来了新的挑战:如何动态调整分区边界?如何处理访问热点?如何在分裂和合并时保持系统的可用性?

本文将深入探讨范围分区的核心机制、分裂与合并策略,以及热点处理的各种技巧。我们将详细分析 TiKV 和 HBase 这两个生产级系统的实现,并提供实际的代码示例。

一、为什么需要范围分区

1.1 哈希分区的局限性

哈希分区通过哈希函数打散数据,实现了负载均衡,但代价是失去了数据的局部性(Locality):

def hash_partition(key, num_partitions):
    # 哈希分区破坏了顺序
    return hash(key) % num_partitions

# 相邻的键被分散到不同分区
partition_of_1000 = hash_partition(1000, 10)  # 可能是 3
partition_of_1001 = hash_partition(1001, 10)  # 可能是 7
partition_of_1002 = hash_partition(1002, 10)  # 可能是 1

# 范围查询需要扫描所有分区
def range_query_with_hash(start, end, num_partitions):
    results = []
    for partition_id in range(num_partitions):
        results.extend(query_partition(partition_id, start, end))
    return results

这种设计在以下场景中会遇到问题:

  1. 时序数据:日志、监控指标、事件流等时序数据通常需要按时间范围查询
  2. 有序扫描:数据库的 ORDER BY、排序合并连接(Sort-Merge Join)等操作需要有序数据
  3. 前缀查询:如查询所有以”user_“开头的键
  4. 邻近访问:某些应用的访问模式具有空间局部性,相邻的键经常一起被访问

1.2 范围分区的优势

范围分区将键空间划分为连续的范围段,每个范围段称为一个分区(Partition)或区域(Region):

Key Space: [0, ∞)
├── Partition 0: [0, 1000)      → Node A
├── Partition 1: [1000, 2000)   → Node B
├── Partition 2: [2000, 3000)   → Node C
└── Partition 3: [3000, ∞)      → Node D

范围分区带来的核心优势:

1. 高效的范围查询

def range_query_with_range_partition(start, end, partition_map):
    # 只需要访问包含 [start, end) 的分区
    affected_partitions = find_overlapping_partitions(
        partition_map, start, end
    )
    results = []
    for partition in affected_partitions:
        results.extend(query_partition(partition, start, end))
    return results

# 示例:查询 [1500, 2500)
# 只需访问 Partition 1 和 Partition 2

2. 有序扫描

客户端可以从第一个分区开始顺序扫描,无需额外排序:

def ordered_scan(partition_map):
    sorted_partitions = sort_partitions_by_start_key(partition_map)
    for partition in sorted_partitions:
        for key, value in scan_partition(partition):
            yield key, value

3. 数据局部性

相关数据聚集在一起,提高了缓存效率和预取效果。例如,用户的所有会话数据可以存储在同一个分区:

User Sessions:
├── user_1000_session_1 ─┐
├── user_1000_session_2  ├─ 同一分区,局部性好
├── user_1000_session_3 ─┘
├── user_1001_session_1 ─┐
└── user_1001_session_2 ─┘─ 同一分区

1.3 范围分区的挑战

范围分区并非完美,它面临以下挑战:

  1. 负载不均:如果数据分布不均匀,某些分区可能过大或过小
  2. 访问热点:顺序写入或热门键会导致单个分区过载
  3. 动态调整:需要在运行时动态分裂和合并分区
  4. 分区路由:客户端需要知道键属于哪个分区

这些问题的解决方案正是本文的核心内容。

二、范围分区的基本机制

2.1 分区元数据

范围分区系统需要维护一个分区表(Partition Table),记录每个分区的边界和位置:

type Partition struct {
    ID        uint64
    StartKey  []byte  // 包含
    EndKey    []byte  // 不包含,开区间
    NodeID    string
    Replicas  []string  // 副本节点
    Version   uint64    // 用于处理并发更新
}

type PartitionTable struct {
    Partitions []Partition
    mu         sync.RWMutex
}

// 查找键所属的分区
func (pt *PartitionTable) FindPartition(key []byte) *Partition {
    pt.mu.RLock()
    defer pt.mu.RUnlock()
    
    // 二分查找
    idx := sort.Search(len(pt.Partitions), func(i int) bool {
        return bytes.Compare(pt.Partitions[i].EndKey, key) > 0
    })
    
    if idx < len(pt.Partitions) && 
       bytes.Compare(pt.Partitions[idx].StartKey, key) <= 0 {
        return &pt.Partitions[idx]
    }
    return nil
}

2.2 分区路由

客户端需要一种机制来定位键所属的分区。常见方案有三种:

方案一:客户端缓存分区表

class RangePartitionClient:
    def __init__(self, metadata_server):
        self.metadata_server = metadata_server
        self.partition_cache = {}
        self.refresh_partition_table()
    
    def refresh_partition_table(self):
        self.partition_cache = self.metadata_server.get_partition_table()
    
    def get(self, key):
        partition = self.find_partition(key)
        if partition is None:
            # 缓存失效,刷新后重试
            self.refresh_partition_table()
            partition = self.find_partition(key)
        
        try:
            return self.query_node(partition.node_id, key)
        except PartitionMovedError as e:
            # 分区已迁移,更新缓存
            self.partition_cache[partition.id] = e.new_partition
            return self.query_node(e.new_partition.node_id, key)
    
    def find_partition(self, key):
        for partition in sorted(self.partition_cache.values(), 
                               key=lambda p: p.start_key):
            if partition.start_key <= key < partition.end_key:
                return partition
        return None

方案二:元数据服务器路由

客户端每次请求都查询元数据服务器:

type MetadataServer struct {
    partitionTable *PartitionTable
}

func (ms *MetadataServer) Route(key []byte) (*Partition, error) {
    partition := ms.partitionTable.FindPartition(key)
    if partition == nil {
        return nil, errors.New("partition not found")
    }
    return partition, nil
}

// 客户端
func (c *Client) Get(key []byte) ([]byte, error) {
    partition, err := c.metadataServer.Route(key)
    if err != nil {
        return nil, err
    }
    return c.queryNode(partition.NodeID, key)
}

方案三:Gossip 协议传播

节点之间通过 Gossip 协议交换分区信息,最终达到一致:

class GossipNode:
    def __init__(self, node_id, peers):
        self.node_id = node_id
        self.peers = peers
        self.partition_table = {}
        self.version = 0
    
    def gossip_loop(self):
        while True:
            peer = random.choice(self.peers)
            # 交换分区表
            peer_table, peer_version = peer.get_partition_table()
            
            if peer_version > self.version:
                self.merge_partition_table(peer_table)
                self.version = peer_version
            
            time.sleep(1)
    
    def merge_partition_table(self, peer_table):
        for partition_id, partition in peer_table.items():
            if (partition_id not in self.partition_table or
                partition.version > self.partition_table[partition_id].version):
                self.partition_table[partition_id] = partition

TiKV 采用方案一,客户端缓存分区表(Region 路由表)并通过 PD(Placement Driver)更新。HBase 则混合使用方案一和方案二,通过 ZooKeeper 和 HBase Meta 表维护路由信息。

2.3 边界键的选择

分区的边界键选择至关重要。理想情况下,我们希望:

  1. 数据均匀分布在各个分区
  2. 访问负载均匀分布
  3. 最小化跨分区操作

静态预分裂:在系统初始化时根据数据分布特征预设边界:

def presplit_by_data_distribution(key_space, num_partitions, distribution):
    """
    根据数据分布预分裂
    distribution: 累积分布函数 (CDF)
    """
    boundaries = []
    for i in range(num_partitions):
        # 找到第 i/n 分位数
        percentile = i / num_partitions
        boundary = distribution.inverse_cdf(percentile)
        boundaries.append(boundary)
    
    boundaries.append(key_space.max)  # 最后一个边界
    return boundaries

# 示例:时间戳键,假设数据按时间均匀增长
def presplit_timestamp_keys(start_ts, end_ts, num_partitions):
    interval = (end_ts - start_ts) / num_partitions
    boundaries = [start_ts + i * interval for i in range(num_partitions)]
    boundaries.append(end_ts)
    return boundaries

动态自适应:系统运行时根据实际负载动态调整边界,这正是分裂和合并要解决的问题。

三、分裂策略:应对增长与热点

分区分裂(Split)是范围分区系统的核心机制。当一个分区变得过大或过热时,系统需要将其一分为二。

3.1 基于大小的分裂

最简单的分裂策略是基于分区大小:当分区超过阈值时触发分裂。

HBase 的大小阈值分裂

HBase 的默认分裂策略是 ConstantSizeRegionSplitPolicy,当 Region 大小超过 hbase.hregion.max.filesize(默认 10GB)时触发分裂:

public class ConstantSizeRegionSplitPolicy extends RegionSplitPolicy {
    private long desiredMaxFileSize;
    
    @Override
    protected void configureForRegion(HRegion region) {
        this.desiredMaxFileSize = region.getTableDesc()
            .getMaxFileSize();
        if (this.desiredMaxFileSize <= 0) {
            this.desiredMaxFileSize = 10L * 1024 * 1024 * 1024; // 10GB
        }
    }
    
    @Override
    protected boolean shouldSplit() {
        long size = 0;
        for (HStore store : region.getStores()) {
            size += store.getSize();
        }
        return size > desiredMaxFileSize;
    }
    
    @Override
    protected byte[] getSplitPoint() {
        // 选择中间键作为分裂点
        byte[] splitPoint = null;
        long largestStoreSize = 0;
        
        for (HStore store : region.getStores()) {
            if (store.getSize() > largestStoreSize) {
                largestStoreSize = store.getSize();
                splitPoint = store.getSplitPoint();
            }
        }
        return splitPoint;
    }
}

TiKV 的大小阈值分裂

TiKV 的 Region 默认大小为 96MB,当 Region 大小超过 144MB(region-max-size)时触发分裂:

// TiKV 中的 Region 大小检查(简化版)
impl<EK: KvEngine, ER: RaftEngine> SplitCheckTask<EK, ER> {
    fn check_split(&self) -> Option<Vec<Vec<u8>>> {
        let region = &self.region;
        let approximate_size = self.get_approximate_size(region);
        
        if approximate_size >= self.cfg.region_max_size.0 {
            // 超过阈值,计算分裂点
            return Some(self.get_split_keys(region));
        }
        None
    }
    
    fn get_split_keys(&self, region: &Region) -> Vec<Vec<u8>> {
        let split_size = self.cfg.region_split_size.0; // 96MB
        let mut split_keys = Vec::new();
        let mut current_size = 0u64;
        
        // 扫描 Region,每 96MB 找一个分裂点
        for (key, value_size) in self.scan_region(region) {
            current_size += value_size;
            if current_size >= split_size {
                split_keys.push(key.to_vec());
                current_size = 0;
            }
        }
        split_keys
    }
}

3.2 中点分裂 vs 加权分裂

找到分裂点的策略有两种:

中点分裂(Midpoint Split)

选择中间位置的键作为分裂点:

def midpoint_split(partition):
    keys = list(partition.keys())
    keys.sort()
    midpoint = len(keys) // 2
    split_key = keys[midpoint]
    
    partition1 = Partition(
        start_key=partition.start_key,
        end_key=split_key
    )
    partition2 = Partition(
        start_key=split_key,
        end_key=partition.end_key
    )
    return partition1, partition2

优点是简单,缺点是如果键大小差异很大,可能导致数据量不均。

加权分裂(Weighted Split)

根据实际数据量找到能均分数据的键:

def weighted_split(partition):
    total_size = partition.total_size()
    target_size = total_size / 2
    
    current_size = 0
    for key in sorted(partition.keys()):
        current_size += partition.get_size(key)
        if current_size >= target_size:
            split_key = key
            break
    
    return create_partitions(partition, split_key)

TiKV 和 HBase 都使用加权分裂,确保分裂后的两个 Region 大小大致相等。

3.3 基于访问热度的分裂

仅基于大小的分裂无法解决热点问题。如果某个键被频繁访问,即使分区很小也会成为瓶颈。

热点检测

系统需要监控每个键的访问频率:

type HotspotDetector struct {
    keyAccessCount map[string]uint64
    windowSize     time.Duration
    threshold      uint64
    mu             sync.RWMutex
}

func (hd *HotspotDetector) RecordAccess(key string) {
    hd.mu.Lock()
    hd.keyAccessCount[key]++
    hd.mu.Unlock()
}

func (hd *HotspotDetector) DetectHotspots() []string {
    hd.mu.RLock()
    defer hd.mu.RUnlock()
    
    var hotkeys []string
    for key, count := range hd.keyAccessCount {
        if count > hd.threshold {
            hotkeys = append(hotkeys, key)
        }
    }
    return hotkeys
}

func (hd *HotspotDetector) ResetWindow() {
    hd.mu.Lock()
    hd.keyAccessCount = make(map[string]uint64)
    hd.mu.Unlock()
}

热点分裂

当检测到热点时,即使分区未达到大小阈值也触发分裂:

class HotspotSplitPolicy:
    def __init__(self, qps_threshold=10000):
        self.qps_threshold = qps_threshold
    
    def should_split(self, partition, stats):
        # 检查总 QPS
        if stats.total_qps > self.qps_threshold:
            return True
        
        # 检查是否有单个键成为热点
        for key, qps in stats.key_qps.items():
            if qps > self.qps_threshold * 0.5:
                return True
        
        return False
    
    def get_split_point(self, partition, stats):
        # 如果有明确的热点键,在热点附近分裂
        hottest_key = max(stats.key_qps.items(), key=lambda x: x[1])[0]
        
        # 找到热点键附近的分裂点
        # 避免将热点键本身作为边界(可能导致分裂后仍然热点)
        split_key = self.find_key_near(partition, hottest_key)
        return split_key

TiKV 实现了负载感知的分裂策略,通过 PD 收集各个 Region 的读写流量,对热点 Region 进行分裂。

3.4 预分裂:提前规划

对于可预测的数据分布和访问模式,预分裂可以避免系统启动初期的大量分裂操作。

时序数据预分裂

def presplit_timeseries(start_time, end_time, num_regions):
    """
    为时序数据预分裂 Region
    键格式: timestamp_sensor_id
    """
    time_range = end_time - start_time
    interval = time_range / num_regions
    
    split_keys = []
    for i in range(1, num_regions):
        split_time = start_time + i * interval
        split_key = f"{split_time}_"
        split_keys.append(split_key)
    
    return create_regions_with_split_keys(split_keys)

# 示例:为未来 30 天的监控数据预分裂 30 个 Region
now = int(time.time())
future_30_days = now + 30 * 24 * 3600
regions = presplit_timeseries(now, future_30_days, 30)

哈希预分裂

为了避免热点,可以在键前加上哈希前缀,然后按哈希前缀预分裂:

def presplit_with_hash_prefix(key_space, num_regions):
    """
    使用哈希前缀预分裂
    实际键: user_12345
    存储键: 03_user_12345 (03 是哈希前缀)
    """
    split_keys = []
    for i in range(1, num_regions):
        # 哈希前缀从 00 到 FF(假设用 16 进制两位)
        prefix = f"{i:02x}_"
        split_keys.append(prefix)
    
    return create_regions_with_split_keys(split_keys)

def encode_key_with_hash(key, num_buckets=256):
    """将键编码为带哈希前缀的格式"""
    hash_value = hash(key) % num_buckets
    prefix = f"{hash_value:02x}_"
    return prefix + key

这种方法牺牲了范围查询能力,但避免了单点热点,适合纯 KV 访问场景。

3.5 分裂的执行过程

分裂不是原子操作,需要精心设计以保持系统可用性。以 TiKV 为例:

sequenceDiagram
    participant PD as PD 调度器
    participant Leader as Region Leader
    participant Follower as Region Follower
    participant Client as 客户端

    PD->>PD: 检测到热点 Region R1
    PD->>Leader: 发送 Split 命令(split_key=m)
    Leader->>Leader: 将 Split 作为 Raft Log 写入
    Leader->>Follower: 复制 Split Log
    Follower-->>Leader: 确认(多数派达成)
    Leader->>Leader: 应用 Split:R1[a,m) + R2[m,z)
    Leader->>Leader: 为 R2 创建新 Raft Group
    Leader-->>PD: 报告分裂完成(R1, R2 的元信息)
    PD->>PD: 更新路由表
    PD-->>Client: 推送路由变更通知
    Client->>Client: 刷新本地路由缓存

这张时序图展示了从热点检测到分裂完成的完整编排流程。PD 作为全局调度器发起分裂决策,Leader 通过 Raft 共识保证分裂操作在所有副本上一致执行。分裂完成后,路由表更新是异步推送的,客户端在收到过期 Epoch 错误时也会主动刷新缓存。

以 TiKV 的具体步骤为例:

1. PD 决定分裂 Region R1 [a, z) → [a, m), [m, z)
2. PD 向 Region R1 的 Leader 发送 Split 命令
3. Leader 将 Split 命令作为 Raft Log 提交
4. 当 Split Log 被多数派确认后:
   a. Leader 创建新 Region R2 [m, z)
   b. R1 的 EndKey 更新为 m,变成 [a, m)
   c. R2 继承 R1 的数据([m, z) 部分)
   d. R2 创建新的 Raft Group
5. Leader 向 PD 报告分裂完成
6. PD 更新路由表,通知客户端

关键点:

// TiKV Region Epoch
type RegionEpoch struct {
    ConfVer uint64  // 配置变更版本(副本增删)
    Version uint64  // 分裂/合并版本
}

// 检查请求是否携带最新 Epoch
func (s *Store) checkEpoch(req *Request) error {
    region := s.getRegion(req.RegionID)
    if region == nil {
        return ErrRegionNotFound
    }
    
    reqEpoch := req.RegionEpoch
    regionEpoch := region.Epoch
    
    if reqEpoch.Version < regionEpoch.Version {
        return &EpochNotMatch{
            Message: "region has been split or merged",
            CurrentRegions: []*Region{region},
        }
    }
    
    return nil
}

四、合并策略:回收空间与减少碎片

分裂解决了分区过大的问题,但也带来了副作用:随着时间推移,删除操作会导致某些分区变得很小,产生大量碎片。分区合并(Merge)通过将相邻的小分区合并为一个来解决这个问题。

4.1 合并的触发条件

条件一:分区大小低于阈值

func (pc *PartitionController) shouldMerge(partition *Partition) bool {
    // TiKV 默认当 Region 小于 1MB 时触发合并
    minSize := pc.config.RegionMinSize // 1MB
    if partition.ApproximateSize < minSize {
        return true
    }
    return false
}

条件二:分区空闲

即使分区很小,如果访问频繁也不应合并:

def should_merge(partition, stats, min_size, min_qps):
    if partition.size < min_size and stats.qps < min_qps:
        return True
    return False

条件三:相邻分区

只有相邻的分区才能合并:

func (pc *PartitionController) findMergeTarget(partition *Partition) *Partition {
    // 寻找左邻居或右邻居
    leftNeighbor := pc.findPartitionByEndKey(partition.StartKey)
    rightNeighbor := pc.findPartitionByStartKey(partition.EndKey)
    
    // 选择更小的邻居
    if leftNeighbor != nil && shouldMerge(leftNeighbor) {
        return leftNeighbor
    }
    if rightNeighbor != nil && shouldMerge(rightNeighbor) {
        return rightNeighbor
    }
    return nil
}

4.2 合并的执行过程

合并比分裂更复杂,因为需要协调两个独立的 Raft Group。

TiKV 的 Region 合并流程

1. PD 决定合并 Region R1 [a, m) 和 R2 [m, z) → R [a, z)
2. PD 向 R2 的 Leader 发送 PrepareMerge 命令
   - R2 停止接收新的 Raft Log(不影响读操作)
   - R2 等待所有 Apply Index 追上 Commit Index
3. PD 向 R1 的 Leader 发送 CommitMerge 命令,携带 R2 的状态
4. R1 Leader 将 CommitMerge 作为 Raft Log 提交
5. 当 CommitMerge 被 Apply 时:
   - R1 吸收 R2 的数据(逻辑上,数据可能已经在同一台机器)
   - R1 的 EndKey 更新为 z
   - R2 被销毁
6. R1 向 PD 报告合并完成
7. PD 更新路由表

关键挑战:

挑战一:保证数据一致性

R2 在 PrepareMerge 后不能接受新写入,否则这些写入会丢失:

// TiKV PrepareMerge 实现(简化)
fn exec_prepare_merge(&mut self, req: &AdminRequest) -> Result<()> {
    let merge_state = MergeState {
        min_index: self.apply_state.get_applied_index(),
        target: req.get_prepare_merge().get_target().clone(),
        commit: 0,
    };
    
    // 标记 Region 为 Merging 状态
    self.region_state = RegionState::Merging(merge_state);
    
    // 拒绝新的写入提案
    self.pending_merge = Some(req.get_prepare_merge().clone());
    
    Ok(())
}

fn propose(&mut self, cmd: &RaftCmdRequest) -> Result<()> {
    // 检查是否处于 Merging 状态
    if self.pending_merge.is_some() {
        return Err(Error::RegionMerging);
    }
    
    // 正常提案流程
    self.raft_group.propose(cmd)?;
    Ok(())
}

挑战二:处理并发分裂和合并

如果 R2 在合并过程中被分裂了怎么办?答案是使用 Region Epoch 检测并重试:

func (r *Region) CommitMerge(source *Region) error {
    // 检查源 Region 的 Epoch 是否匹配
    if source.Epoch.Version != r.expectedSourceEpoch.Version {
        return ErrEpochNotMatch{
            Message: "source region has been split or merged",
        }
    }
    
    // 执行合并
    r.EndKey = source.EndKey
    r.Epoch.Version++
    
    // 销毁源 Region
    source.Destroy()
    
    return nil
}

4.3 HBase 的 Region 合并

HBase 的合并相对简单,因为它通过 Master 协调而不是两个 Region 之间直接通信:

public class HMaster {
    public void mergeRegions(RegionInfo region1, RegionInfo region2) 
            throws IOException {
        // 检查是否相邻
        if (!Bytes.equals(region1.getEndKey(), region2.getStartKey())) {
            throw new IOException("Regions are not adjacent");
        }
        
        // 离线两个 Region
        unassignRegion(region1);
        unassignRegion(region2);
        
        // 合并数据文件(底层是 HDFS 文件)
        Path region1Path = getRegionPath(region1);
        Path region2Path = getRegionPath(region2);
        Path mergedPath = getMergedRegionPath(region1, region2);
        
        fs.rename(region1Path, mergedPath);
        mergeStoreFiles(region2Path, mergedPath);
        fs.delete(region2Path, true);
        
        // 创建新 Region 元数据
        RegionInfo mergedRegion = RegionInfo.newBuilder()
            .setTable(region1.getTable())
            .setStartKey(region1.getStartKey())
            .setEndKey(region2.getEndKey())
            .build();
        
        // 更新 Meta 表
        updateMetaTable(region1, region2, mergedRegion);
        
        // 重新上线合并后的 Region
        assignRegion(mergedRegion);
    }
}

HBase 的方法更直接但也更重:合并期间 Region 完全不可用。TiKV 的方法更复杂但实现了在线合并。

4.4 合并的时机选择

频繁合并会增加系统开销,延迟合并会浪费资源。实践中通常采用以下策略:

策略一:定期检查

class MergeScheduler:
    def __init__(self, check_interval=600):  # 10分钟
        self.check_interval = check_interval
    
    def run(self):
        while True:
            time.sleep(self.check_interval)
            self.check_and_merge()
    
    def check_and_merge(self):
        for partition in self.get_all_partitions():
            if self.should_merge(partition):
                target = self.find_merge_target(partition)
                if target:
                    self.merge(partition, target)

策略二:低峰期合并

在系统负载低时执行合并:

func (ms *MergeScheduler) shouldMergeNow() bool {
    currentHour := time.Now().Hour()
    // 凌晨 2-5 点执行合并
    if currentHour >= 2 && currentHour < 5 {
        return true
    }
    
    // 或者检查系统负载
    if ms.getSystemLoad() < 0.3 {
        return true
    }
    
    return false
}

策略三:批量合并

累积多个待合并的分区,批量执行:

def batch_merge(partitions_to_merge, batch_size=10):
    batches = [partitions_to_merge[i:i+batch_size] 
               for i in range(0, len(partitions_to_merge), batch_size)]
    
    for batch in batches:
        for partition in batch:
            target = find_merge_target(partition)
            if target:
                merge(partition, target)
        time.sleep(60)  # 批次间休息

五、热点处理:让负载更均衡

热点问题是范围分区系统最头疼的问题。即使数据分布均匀,访问模式的不均也会导致某些分区过载。

5.1 热点的成因

成因一:时序写入

日志、监控等时序数据通常使用时间戳作为键,导致写入集中在最新的分区:

Time →
├── Region 1: [2024-01-01, 2024-01-15)  ✓ 完成,空闲
├── Region 2: [2024-01-15, 2024-02-01)  ✓ 完成,空闲
├── Region 3: [2024-02-01, 2024-02-15)  ✓ 完成,空闲
└── Region 4: [2024-02-15, ∞)          🔥 所有写入集中在这里!

成因二:热门键

某些键天然比其他键更热门,比如微博的热搜话题、电商的爆款商品:

Product访问分布:
├── product_1      ───────── 1M QPS  🔥
├── product_2      ── 100K QPS
├── product_3      ── 80K QPS
└── product_1000   ─ 10 QPS

成因三:顺序扫描

某些应用会顺序扫描大范围数据,导致瞬时热点:

# 分析任务:扫描所有用户
for user_id in range(1, 1000000):
    process(get_user(user_id))
    
# 扫描会依次让每个 Region 成为热点

5.2 热点检测指标与诊断

在实施热点治理之前,首先需要具备可靠的热点检测能力。以下是生产环境中常用的检测指标体系:

指标一:每个 Range 的请求速率(QPS/TPS)

最直接的热点信号是单个 Range 的请求速率显著高于集群平均值。通常以 P99 或标准差来衡量偏离程度:

class HotRangeDetector:
    def __init__(self, threshold_ratio=3.0):
        self.threshold_ratio = threshold_ratio

    def detect(self, range_qps: dict) -> list:
        """
        检测热点 Range。
        range_qps: {range_id: qps}
        返回热点 Range 列表。
        """
        if not range_qps:
            return []
        avg_qps = sum(range_qps.values()) / len(range_qps)
        hot_ranges = []
        for range_id, qps in range_qps.items():
            if avg_qps > 0 and qps / avg_qps > self.threshold_ratio:
                hot_ranges.append({
                    "range_id": range_id,
                    "qps": qps,
                    "ratio": qps / avg_qps
                })
        return sorted(hot_ranges, key=lambda x: x["ratio"], reverse=True)

指标二:键分布直方图(Key Distribution Histogram)

通过采样统计每个 Range 内的键分布,识别数据倾斜。如果某个 Range 的键数量远超其他 Range,即使 QPS 相似,未来也很可能成为热点:

class KeyDistributionAnalyzer:
    def __init__(self, num_buckets=64):
        self.num_buckets = num_buckets

    def build_histogram(self, range_key_counts: dict) -> dict:
        """
        构建键分布直方图。
        range_key_counts: {range_id: key_count}
        """
        total_keys = sum(range_key_counts.values())
        avg_keys = total_keys / len(range_key_counts) if range_key_counts else 0
        skew_score = 0.0
        for count in range_key_counts.values():
            skew_score += (count - avg_keys) ** 2
        skew_score = (skew_score / len(range_key_counts)) ** 0.5 if range_key_counts else 0

        return {
            "total_keys": total_keys,
            "avg_keys_per_range": avg_keys,
            "max_keys": max(range_key_counts.values(), default=0),
            "min_keys": min(range_key_counts.values(), default=0),
            "skew_stddev": skew_score,
            "skew_ratio": max(range_key_counts.values(), default=0) / avg_keys if avg_keys > 0 else 0
        }

指标三:资源维度综合评估

单纯的 QPS 不够全面,还需要结合 CPU 使用率、磁盘 I/O、网络带宽等多维指标判断热点类型:

检测指标 阈值参考 热点类型 推荐动作
单 Range QPS > 集群平均 3 倍 可配置 读/写热点 分裂 Range
键数量偏斜比 > 5 可配置 数据倾斜 预分裂或加盐
单 Range CPU 使用率 > 80% 80% 计算热点 分裂或迁移
单 Range 写入字节 > 集群平均 5 倍 可配置 写入热点 加盐或分裂
P99 延迟 > SLA 阈值 业务定义 综合热点 分裂 + 读副本
flowchart TD
    A["检测到热点 Range"] --> B{"CPU 使用率高?"}
    B -->|是| C{"写入集中?"}
    B -->|否| D{"读请求集中?"}
    C -->|是| E["分裂 Range"]
    C -->|否| F["迁移到更强节点"]
    D -->|是| G["增加 Follower Read 副本"]
    D -->|否| H{"键分布倾斜?"}
    H -->|是| I["键加盐 + 预分裂"]
    H -->|否| J["检查扫描类查询"]

上述决策流程图展示了热点治理的核心判断路径。首先区分热点的资源瓶颈类型(CPU、读、写),然后根据具体成因选择对应的治理手段。键分布倾斜导致的热点需要从数据模型层面解决,而非简单的分裂操作。

5.3 解决方案一:键加盐(Key Salting)

在键前加上随机前缀,将写入分散到多个分区:

import hashlib

def salt_key(key, num_salts=10):
    """
    为键添加盐
    原始键: user_12345
    加盐键: 03_user_12345
    """
    salt = hash(key) % num_salts
    return f"{salt:02d}_{key}"

def desalt_key(salted_key):
    """移除盐前缀"""
    return salted_key.split('_', 1)[1]

# 写入
def write_with_salt(key, value, num_salts=10):
    salted_key = salt_key(key, num_salts)
    db.put(salted_key, value)

# 读取(需要尝试所有可能的盐)
def read_with_salt(key, num_salts=10):
    for salt in range(num_salts):
        salted_key = f"{salt:02d}_{key}"
        value = db.get(salted_key)
        if value is not None:
            return value
    return None

加盐的代价是点查询需要尝试所有盐值,范围查询变得不可能。适用于纯 KV 场景。

5.4 解决方案二:时间戳反转

对于时序数据,可以反转时间戳避免写入集中:

def reverse_timestamp(timestamp):
    """
    反转时间戳
    正向: 1609459200 (2021-01-01)
    反向: 8390540799 (从某个未来时间倒数)
    """
    max_timestamp = 9999999999  # 遥远的未来
    return max_timestamp - timestamp

# 存储
def store_event(event_id, timestamp, data):
    reversed_ts = reverse_timestamp(timestamp)
    key = f"{reversed_ts}_{event_id}"
    db.put(key, data)

# 查询最近的事件(反转后最近的事件在键空间前端)
def query_recent_events(limit=100):
    return db.scan(start_key="", end_key="", limit=limit)

这种方法保留了范围查询能力,但改变了查询语义(最近的在前而不是在后)。

5.5 解决方案三:自适应分裂

前面提到的热点分裂策略,通过监控访问频率动态分裂热点分区:

type AdaptiveSplitter struct {
    regions       map[uint64]*Region
    accessStats   *AccessStats
    splitThreshold uint64  // QPS 阈值
}

func (as *AdaptiveSplitter) MonitorLoop() {
    ticker := time.NewTicker(10 * time.Second)
    for range ticker.C {
        as.checkAndSplitHotspots()
    }
}

func (as *AdaptiveSplitter) checkAndSplitHotspots() {
    for regionID, region := range as.regions {
        qps := as.accessStats.GetRegionQPS(regionID)
        
        if qps > as.splitThreshold {
            // 分析热点分布
            hotKeys := as.accessStats.GetHotKeys(regionID, 10)
            
            if len(hotKeys) > 0 {
                // 在热点键附近分裂
                splitKey := as.findSplitKeyNear(region, hotKeys[0])
                as.splitRegion(region, splitKey)
            } else {
                // 均匀热点,正常分裂
                splitKey := as.findMidpoint(region)
                as.splitRegion(region, splitKey)
            }
        }
    }
}

5.6 解决方案四:负载感知路由

对于读热点,可以通过负载均衡将请求分散到多个副本:

class LoadAwareRouter:
    def __init__(self):
        self.region_replicas = {}  # region_id -> [replica1, replica2, ...]
        self.replica_load = {}      # replica_id -> current_qps
    
    def route_read_request(self, region_id):
        replicas = self.region_replicas[region_id]
        
        # 选择负载最低的副本
        best_replica = min(replicas, 
                          key=lambda r: self.replica_load.get(r, 0))
        
        self.replica_load[best_replica] = \
            self.replica_load.get(best_replica, 0) + 1
        
        return best_replica
    
    def update_load_stats(self):
        # 定期更新负载统计
        for replica_id in self.replica_load:
            # 衰减旧的统计
            self.replica_load[replica_id] *= 0.9

TiKV 的 Follower Read 特性允许从 Follower 副本读取数据,缓解 Leader 的读压力。

5.7 解决方案五:预分裂与哈希前缀

对于已知的热点模式,可以预先分裂并使用哈希前缀:

class HashPrefixedTimeSeries:
    def __init__(self, num_buckets=16):
        self.num_buckets = num_buckets
        self.presplit()
    
    def presplit(self):
        """预分裂 Region,每个哈希前缀一个 Region"""
        split_keys = []
        for i in range(1, self.num_buckets):
            prefix = f"{i:02x}_"
            split_keys.append(prefix)
        create_regions_with_splits(split_keys)
    
    def encode_key(self, sensor_id, timestamp):
        """
        组合哈希前缀和时间戳
        键格式: hash_timestamp_sensor_id
        """
        bucket = hash(sensor_id) % self.num_buckets
        prefix = f"{bucket:02x}_"
        return f"{prefix}{timestamp}_{sensor_id}"
    
    def write(self, sensor_id, timestamp, data):
        key = self.encode_key(sensor_id, timestamp)
        db.put(key, data)
    
    def query_sensor(self, sensor_id, start_time, end_time):
        """查询特定传感器的时序数据"""
        bucket = hash(sensor_id) % self.num_buckets
        prefix = f"{bucket:02x}_"
        
        start_key = f"{prefix}{start_time}_{sensor_id}"
        end_key = f"{prefix}{end_time}_{sensor_id}~"  # ~ 在字典序中较大
        
        return db.scan(start_key, end_key)

这种方法在保留时间范围查询能力的同时,将写入分散到多个分区。代价是跨传感器的全局时间范围查询需要扫描所有哈希桶。

六、TiKV Region 的分裂与合并

TiKV 是 PingCAP 开发的分布式 KV 存储,采用范围分区(称为 Region)。让我们深入分析其实现。

6.1 Region 的基本结构

TiKV 的数据空间被划分为 Region,每个 Region 负责一个连续的键范围:

pub struct Region {
    pub id: u64,
    pub start_key: Vec<u8>,
    pub end_key: Vec<u8>,
    pub region_epoch: RegionEpoch,
    pub peers: Vec<Peer>,  // 副本列表
}

pub struct RegionEpoch {
    pub conf_ver: u64,  // 配置变更版本
    pub version: u64,   // Region 版本(分裂/合并时递增)
}

pub struct Peer {
    pub id: u64,
    pub store_id: u64,  // 所在的存储节点
}

每个 Region 是一个独立的 Raft Group,有自己的 Leader 和 Follower。

6.2 PD 的作用

PD(Placement Driver)是 TiKV 集群的大脑,负责:

  1. 元数据管理:维护 Region 路由表
  2. 调度决策:决定何时分裂、合并、迁移 Region
  3. 时间戳分配:为事务分配全局唯一时间戳
  4. 负载均衡:监控各节点负载,调度 Region 分布
type PDServer struct {
    regionTree *RegionTree  // 维护所有 Region 的索引
    stores     map[uint64]*StoreInfo
    scheduler  *Scheduler
}

type RegionTree struct {
    tree *btree.BTree  // 按 StartKey 排序的 B 树
}

// PD 查找键所属的 Region
func (pd *PDServer) GetRegion(key []byte) *Region {
    item := pd.regionTree.tree.Search(key)
    if item == nil {
        return nil
    }
    return item.(*Region)
}

// PD 监控 Region 统计信息
func (pd *PDServer) HandleRegionHeartbeat(req *RegionHeartbeat) {
    region := req.GetRegion()
    
    // 更新 Region 信息
    pd.regionTree.Update(region)
    
    // 检查是否需要分裂
    if req.ApproximateSize > pd.cfg.RegionMaxSize {
        pd.scheduler.ScheduleSplit(region)
    }
    
    // 检查是否需要合并
    if req.ApproximateSize < pd.cfg.RegionMinSize {
        pd.scheduler.ScheduleMerge(region)
    }
    
    // 检查是否需要迁移(负载均衡)
    if pd.shouldTransferRegion(region, req.Load) {
        pd.scheduler.ScheduleTransfer(region)
    }
}

6.3 Region Split 的详细流程

第一步:检查分裂条件

TiKV 节点定期检查 Region 大小:

impl<EK: KvEngine, ER: RaftEngine> SplitCheckRunner<EK, ER> {
    fn run(&mut self, task: SplitCheckTask<EK, ER>) {
        let region = task.region;
        let approximate_size = self.get_approximate_size(&region);
        
        if approximate_size >= self.cfg.region_max_size {
            // 需要分裂,计算分裂键
            let split_keys = self.get_split_keys(&region);
            
            if !split_keys.is_empty() {
                // 向 Region Leader 发送分裂请求
                self.router.send(
                    region.id,
                    PeerMsg::SplitRegion { split_keys }
                );
            }
        }
    }
    
    fn get_split_keys(&self, region: &Region) -> Vec<Vec<u8>> {
        let mut split_keys = Vec::new();
        let split_size = self.cfg.region_split_size;
        let mut accumulated_size = 0;
        
        // 扫描 Region,寻找分裂点
        let start_key = keys::data_key(region.get_start_key());
        let end_key = keys::data_key(region.get_end_key());
        
        let mut iter = self.engine.iterator(&start_key, &end_key);
        while iter.valid() {
            accumulated_size += iter.value().len() as u64;
            
            if accumulated_size >= split_size {
                split_keys.push(keys::origin_key(iter.key()).to_vec());
                accumulated_size = 0;
            }
            
            iter.next();
        }
        
        split_keys
    }
}

第二步:提交 Raft Admin Command

Leader 收到分裂请求后,将其作为 Admin Command 提交到 Raft:

impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
    fn propose_split(&mut self, split_keys: Vec<Vec<u8>>) {
        // 构造 Split 命令
        let mut req = AdminRequest::default();
        let mut split = SplitRequest::default();
        
        split.set_split_key(split_keys[0].clone());
        split.set_new_region_id(self.next_region_id());
        split.set_new_peer_ids(self.next_peer_ids());
        
        req.set_cmd_type(AdminCmdType::Split);
        req.set_split(split);
        
        // 提交到 Raft
        self.propose_raft_command(req);
    }
    
    fn on_ready_split_region(&mut self, split: &SplitRequest) {
        let region = self.region.clone();
        let split_key = split.get_split_key();
        
        // 创建新 Region
        let mut new_region = Region::default();
        new_region.set_id(split.get_new_region_id());
        new_region.set_start_key(split_key.to_vec());
        new_region.set_end_key(region.get_end_key().to_vec());
        new_region.mut_region_epoch().set_version(1);
        new_region.set_peers(split.get_new_peer_ids().into());
        
        // 更新原 Region
        let mut updated_region = region.clone();
        updated_region.set_end_key(split_key.to_vec());
        updated_region.mut_region_epoch().set_version(
            region.get_region_epoch().get_version() + 1
        );
        
        // 应用分裂
        self.region = updated_region;
        self.create_peer_for_region(new_region);
        
        // 通知 PD
        self.report_split_to_pd(new_region);
    }
}

第三步:路由更新

客户端通过 Region Epoch 检测分裂:

type TiKVClient struct {
    regionCache *RegionCache
}

func (c *TiKVClient) Get(key []byte) ([]byte, error) {
    region := c.regionCache.LocateKey(key)
    
    req := &GetRequest{
        Key:         key,
        RegionId:    region.ID,
        RegionEpoch: region.Epoch,
    }
    
    resp, err := c.sendRequest(region.Leader, req)
    if err != nil {
        return nil, err
    }
    
    // 检查 Epoch
    if resp.RegionError != nil && resp.RegionError.EpochNotMatch != nil {
        // Region 已分裂,更新缓存
        newRegions := resp.RegionError.EpochNotMatch.CurrentRegions
        c.regionCache.UpdateRegions(newRegions)
        
        // 重试
        return c.Get(key)
    }
    
    return resp.Value, nil
}

6.4 Region Merge 的实现

Region 合并更复杂,涉及两个 Raft Group 的协调:

// 第一步:PrepareMerge
fn propose_prepare_merge(&mut self, target_region: Region) {
    let mut req = AdminRequest::default();
    let mut prepare_merge = PrepareMergeRequest::default();
    
    prepare_merge.set_min_index(self.get_apply_index());
    prepare_merge.set_target(target_region);
    
    req.set_cmd_type(AdminCmdType::PrepareMerge);
    req.set_prepare_merge(prepare_merge);
    
    self.propose_raft_command(req);
}

fn on_ready_prepare_merge(&mut self, req: &PrepareMergeRequest) {
    // 标记为 Merging 状态
    self.pending_merge_state = Some(MergeState {
        min_index: req.get_min_index(),
        target: req.get_target().clone(),
    });
    
    // 拒绝新的写入
    self.is_merging = true;
}

// 第二步:CommitMerge
fn propose_commit_merge(&mut self, source_region: Region, source_state: MergeState) {
    let mut req = AdminRequest::default();
    let mut commit_merge = CommitMergeRequest::default();
    
    commit_merge.set_source(source_region);
    commit_merge.set_commit(source_state.min_index);
    
    req.set_cmd_type(AdminCmdType::CommitMerge);
    req.set_commit_merge(commit_merge);
    
    self.propose_raft_command(req);
}

fn on_ready_commit_merge(&mut self, req: &CommitMergeRequest) {
    let source = req.get_source();
    
    // 检查 Epoch
    if source.get_region_epoch().get_version() != 
       self.pending_merge_target_epoch.get_version() {
        // Epoch 不匹配,源 Region 已变化,取消合并
        return;
    }
    
    // 扩展键范围
    self.region.set_end_key(source.get_end_key().to_vec());
    self.region.mut_region_epoch().set_version(
        self.region.get_region_epoch().get_version() + 1
    );
    
    // 销毁源 Region
    self.destroy_source_region(source.get_id());
    
    // 通知 PD
    self.report_merge_to_pd();
}

6.5 TiKV 的负载均衡

PD 通过调度器实现负载均衡:

type Scheduler struct {
    name string
    cfg  *SchedulerConfig
}

// 热点调度器
type HotRegionScheduler struct {
    Scheduler
    hotRegions map[uint64]*HotRegion  // region_id -> hot_info
}

func (hs *HotRegionScheduler) Schedule(cluster *Cluster) []*Operator {
    var ops []*Operator
    
    // 找出热点 Region
    for _, region := range hs.hotRegions {
        if region.FlowBytes > hs.cfg.HotRegionThreshold {
            // 尝试分裂热点 Region
            if region.Size < hs.cfg.MaxRegionSize {
                op := CreateSplitOperator(region)
                ops = append(ops, op)
            }
            
            // 或者迁移到负载更低的节点
            targetStore := cluster.SelectTargetStore(region)
            if targetStore != nil {
                op := CreateTransferLeaderOperator(region, targetStore)
                ops = append(ops, op)
            }
        }
    }
    
    return ops
}

// 负载均衡调度器
type BalanceRegionScheduler struct {
    Scheduler
}

func (bs *BalanceRegionScheduler) Schedule(cluster *Cluster) []*Operator {
    // 找出负载最高和最低的节点
    stores := cluster.GetStores()
    sort.Slice(stores, func(i, j int) bool {
        return stores[i].RegionCount > stores[j].RegionCount
    })
    
    sourceStore := stores[0]  // 负载最高
    targetStore := stores[len(stores)-1]  // 负载最低
    
    // 选择一个 Region 从 source 迁移到 target
    region := sourceStore.SelectRegionToMove()
    if region != nil {
        return []*Operator{
            CreateMoveRegionOperator(region, targetStore),
        }
    }
    
    return nil
}

七、HBase Region 的分裂与合并

HBase 是 Hadoop 生态的分布式数据库,也采用范围分区(称为 Region)。其实现与 TiKV 有显著不同。

7.1 HBase 的架构

HBase 架构:
├── HMaster: 协调者,负责 Region 分配和负载均衡
├── RegionServer: 数据节点,每个节点管理多个 Region
│   ├── Region 1: [a, m)
│   ├── Region 2: [m, z)
│   └── WAL: 写前日志
├── ZooKeeper: 协调服务,存储元数据
└── HDFS: 底层存储,存储 HFile(数据文件)
public class HRegion {
    private final RegionInfo regionInfo;
    private final Map<byte[], HStore> stores;  // Column Family -> Store
    private final WAL wal;
    private final FileSystem fs;
    
    // Region 元数据
    public static class RegionInfo {
        private final byte[] tableName;
        private final byte[] startKey;
        private final byte[] endKey;
        private final long regionId;
    }
    
    // Store 对应一个 Column Family
    public static class HStore {
        private final byte[] family;
        private final List<HFile> storeFiles;  // 数据文件
        private final MemStore memStore;       // 内存缓冲
    }
}

7.2 HBase 的分裂策略

HBase 提供多种分裂策略,通过 RegionSplitPolicy 配置:

ConstantSizeRegionSplitPolicy:固定大小阈值

public class ConstantSizeRegionSplitPolicy extends RegionSplitPolicy {
    private long desiredMaxFileSize;
    
    @Override
    protected void configureForRegion(HRegion region) {
        this.desiredMaxFileSize = region.getTableDescriptor()
            .getMaxFileSize();
        if (this.desiredMaxFileSize <= 0) {
            this.desiredMaxFileSize = 10L * 1024 * 1024 * 1024; // 10GB
        }
    }
    
    @Override
    protected boolean shouldSplit() {
        long totalSize = 0;
        for (HStore store : region.getStores()) {
            totalSize += store.getSize();
        }
        return totalSize > desiredMaxFileSize;
    }
}

IncreasingToUpperBoundRegionSplitPolicy:随表增长调整阈值

public class IncreasingToUpperBoundRegionSplitPolicy 
        extends RegionSplitPolicy {
    
    @Override
    protected boolean shouldSplit() {
        // 表的 Region 数量越少,分裂阈值越低
        int regionCount = getRegionCountOfTable(region.getTableName());
        
        long sizeToCheck = desiredMaxFileSize;
        if (regionCount == 1) {
            sizeToCheck = initialSize;  // 例如 256MB
        } else if (regionCount < 100) {
            // min(maxFileSize, initialSize * regionCount^3)
            sizeToCheck = Math.min(
                desiredMaxFileSize,
                initialSize * regionCount * regionCount * regionCount
            );
        }
        
        return region.getSize() > sizeToCheck;
    }
}

这种策略在表初期更激进地分裂,避免初始单个 Region 成为瓶颈。

SteppingSplitPolicy:阶梯式分裂

public class SteppingSplitPolicy extends RegionSplitPolicy {
    @Override
    protected boolean shouldSplit() {
        int regionCount = getRegionCountOfTable(region.getTableName());
        
        if (regionCount <= 1) {
            return region.getSize() > 128 * 1024 * 1024; // 128MB
        } else if (regionCount <= 10) {
            return region.getSize() > 512 * 1024 * 1024; // 512MB
        } else {
            return region.getSize() > desiredMaxFileSize; // 10GB
        }
    }
}

7.3 HBase Region Split 的执行流程

HBase 的分裂过程涉及 WAL 分裂和数据文件处理:

public class HRegion {
    public void split(byte[] splitKey) throws IOException {
        // 第一步:关闭 Region,停止写入
        this.close();
        
        // 第二步:分裂 WAL
        Path regionDir = getRegionDir();
        Path splitLogDir = new Path(regionDir, "splitlog");
        fs.mkdirs(splitLogDir);
        
        // 将 WAL 按分裂键分成两部分
        splitWAL(wal, splitKey, splitLogDir);
        
        // 第三步:创建两个子 Region
        RegionInfo daughterA = RegionInfo.newBuilder()
            .setTable(regionInfo.getTable())
            .setStartKey(regionInfo.getStartKey())
            .setEndKey(splitKey)
            .build();
        
        RegionInfo daughterB = RegionInfo.newBuilder()
            .setTable(regionInfo.getTable())
            .setStartKey(splitKey)
            .setEndKey(regionInfo.getEndKey())
            .build();
        
        // 第四步:创建 Reference 文件而不是立即复制数据
        createReferenceFiles(daughterA, daughterB);
        
        // 第五步:更新 Meta 表
        updateMetaTable(regionInfo, daughterA, daughterB);
        
        // 第六步:打开子 Region
        HRegion regionA = openRegion(daughterA);
        HRegion regionB = openRegion(daughterB);
        
        // 第七步:通知 HMaster
        reportSplit(daughterA, daughterB);
    }
    
    private void createReferenceFiles(RegionInfo daughterA, 
                                      RegionInfo daughterB) 
            throws IOException {
        // Reference 文件指向父 Region 的数据文件
        // 避免立即复制数据
        for (HStore store : stores.values()) {
            for (HFile file : store.getStoreFiles()) {
                // 为 daughterA 创建 top reference(指向前半部分)
                createReference(file, daughterA, Reference.Type.TOP);
                
                // 为 daughterB 创建 bottom reference(指向后半部分)
                createReference(file, daughterB, Reference.Type.BOTTOM);
            }
        }
    }
}

// Reference 文件
public class Reference {
    enum Type { TOP, BOTTOM }
    
    private final Path parentFile;
    private final byte[] splitKey;
    private final Type type;
    
    // 读取 Reference 文件时过滤数据
    public Iterator<KeyValue> iterator() {
        Iterator<KeyValue> parentIter = openParentFile();
        
        return new FilteringIterator(parentIter, kv -> {
            if (type == Type.TOP) {
                return kv.getKey() < splitKey;
            } else {
                return kv.getKey() >= splitKey;
            }
        });
    }
}

HBase 使用 Reference 文件延迟数据复制,分裂可以快速完成。真正的数据分离在后台 Compaction 时进行:

public class Compaction {
    public void compact(HStore store) throws IOException {
        List<HFile> filesToCompact = store.selectFilesToCompact();
        
        // 检查是否有 Reference 文件
        boolean hasReference = filesToCompact.stream()
            .anyMatch(HFile::isReference);
        
        if (hasReference) {
            // Compaction 时解引用,生成真实的数据文件
            HFile newFile = compactWithDereference(filesToCompact);
            
            // 删除旧的 Reference 文件
            for (HFile file : filesToCompact) {
                file.delete();
            }
            
            store.addFile(newFile);
        } else {
            // 正常 Compaction
            HFile newFile = compactNormally(filesToCompact);
            store.addFile(newFile);
        }
    }
}

7.4 HBase Region 合并

HBase 的合并由 HMaster 协调:

public class HMaster {
    public void mergeRegions(RegionInfo region1, RegionInfo region2) 
            throws IOException {
        // 验证 Region 相邻
        if (!Bytes.equals(region1.getEndKey(), region2.getStartKey())) {
            throw new IOException("Regions are not adjacent");
        }
        
        // 关闭两个 Region
        RegionServer rs1 = getRegionServer(region1);
        RegionServer rs2 = getRegionServer(region2);
        
        rs1.closeRegion(region1);
        if (rs1 != rs2) {
            rs2.closeRegion(region2);
        }
        
        // 合并数据
        Path region1Dir = getRegionDir(region1);
        Path region2Dir = getRegionDir(region2);
        
        RegionInfo mergedRegion = RegionInfo.newBuilder()
            .setTable(region1.getTable())
            .setStartKey(region1.getStartKey())
            .setEndKey(region2.getEndKey())
            .build();
        
        Path mergedDir = getRegionDir(mergedRegion);
        
        // 移动数据文件
        fs.rename(region1Dir, mergedDir);
        
        // 合并 region2 的数据
        for (String family : region2.getFamilies()) {
            Path srcFamily = new Path(region2Dir, family);
            Path dstFamily = new Path(mergedDir, family);
            
            if (fs.exists(srcFamily)) {
                fs.mkdirs(dstFamily);
                for (FileStatus file : fs.listStatus(srcFamily)) {
                    fs.rename(file.getPath(), 
                             new Path(dstFamily, file.getPath().getName()));
                }
            }
        }
        
        // 删除 region2 目录
        fs.delete(region2Dir, true);
        
        // 更新 Meta 表
        deleteFromMeta(region1);
        deleteFromMeta(region2);
        addToMeta(mergedRegion);
        
        // 打开合并后的 Region
        assignRegion(mergedRegion);
    }
}

HBase 的合并是离线操作,期间 Region 不可用。相比之下,TiKV 的在线合并更加优雅,但实现也更复杂。

八、混合分区方案

纯粹的范围分区和哈希分区都有局限性。实践中,很多系统采用混合方案。

8.1 复合分区键(Cassandra)

Cassandra 使用复合分区键:第一部分哈希,第二部分范围:

CREATE TABLE sensor_data (
    sensor_id INT,
    timestamp BIGINT,
    value DOUBLE,
    PRIMARY KEY ((sensor_id), timestamp)
);

-- (sensor_id) 是 Partition Key,使用哈希分区
-- timestamp 是 Clustering Key,在分区内范围排序

实现原理:

class CassandraPartitioner:
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
    
    def get_partition(self, partition_key, clustering_key):
        # 哈希分区键决定分区
        partition_id = hash(partition_key) % self.num_partitions
        return partition_id
    
    def store(self, partition_key, clustering_key, value):
        partition_id = self.get_partition(partition_key, clustering_key)
        
        # 在分区内按 clustering key 范围存储
        partition = self.get_partition_storage(partition_id)
        partition.put(clustering_key, value)
    
    def query_range(self, partition_key, start_cluster, end_cluster):
        # 可以高效范围查询同一分区内的数据
        partition_id = self.get_partition(partition_key, None)
        partition = self.get_partition_storage(partition_id)
        
        return partition.scan(start_cluster, end_cluster)

这种设计的优势:

  1. 负载均衡:哈希分区键打散数据
  2. 高效范围查询:在分区内按 Clustering Key 范围查询
  3. 局部性:相同 Partition Key 的数据在同一分区

限制:只能查询特定 Partition Key 的范围,不能跨 Partition Key 范围查询。

8.2 多级分区

先按一个维度哈希分区,再按另一个维度范围分区:

class HierarchicalPartitioner:
    def __init__(self, num_hash_partitions, region_size):
        self.num_hash_partitions = num_hash_partitions
        self.region_size = region_size
        self.regions = {}  # (hash_partition, region_id) -> Region
    
    def get_partition(self, tenant_id, user_id):
        # 第一级:按 tenant_id 哈希
        hash_partition = hash(tenant_id) % self.num_hash_partitions
        
        # 第二级:按 user_id 范围查找 Region
        region = self.find_region(hash_partition, user_id)
        
        return (hash_partition, region.id)
    
    def find_region(self, hash_partition, key):
        # 在哈希分区内查找键所属的 Region
        for region in self.get_regions_of_hash_partition(hash_partition):
            if region.start_key <= key < region.end_key:
                return region
        return None
    
    def split_region_if_needed(self, hash_partition, region):
        if region.size > self.region_size:
            # 只在第二级(范围分区)进行分裂
            mid_key = region.get_midpoint_key()
            region1, region2 = region.split(mid_key)
            
            self.regions[(hash_partition, region1.id)] = region1
            self.regions[(hash_partition, region2.id)] = region2

这种方案在多租户系统中很有用:

8.3 地理分区

根据地理位置进行范围分区,结合就近路由:

class GeoPartitioner:
    def __init__(self):
        self.regions = [
            Region("us-west", lat_range=(32, 42), lon_range=(-125, -110)),
            Region("us-east", lat_range=(36, 45), lon_range=(-80, -70)),
            Region("eu-west", lat_range=(48, 56), lon_range=(-10, 5)),
            Region("asia-east", lat_range=(30, 40), lon_range=(110, 125)),
        ]
    
    def get_region(self, lat, lon):
        for region in self.regions:
            if (region.lat_range[0] <= lat < region.lat_range[1] and
                region.lon_range[0] <= lon < region.lon_range[1]):
                return region
        return None
    
    def query_nearby(self, lat, lon, radius_km):
        # 计算范围
        lat_range = (lat - radius_km / 111, lat + radius_km / 111)
        lon_range = (
            lon - radius_km / (111 * cos(radians(lat))),
            lon + radius_km / (111 * cos(radians(lat)))
        )
        
        # 找到所有相关的 Region
        affected_regions = []
        for region in self.regions:
            if self.ranges_overlap(region.lat_range, lat_range) and \
               self.ranges_overlap(region.lon_range, lon_range):
                affected_regions.append(region)
        
        # 查询所有相关 Region
        results = []
        for region in affected_regions:
            results.extend(region.query(lat_range, lon_range))
        
        return results

地理分区的优势是数据局部性和就近访问,适合 LBS(Location-Based Service)应用。

九、实战:实现一个简单的范围分区系统

让我们实现一个简化的范围分区系统,包含分裂和合并功能:

import bisect
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

@dataclass
class Partition:
    id: int
    start_key: int
    end_key: int
    data: Dict[int, str]
    size: int = 0
    
    def contains(self, key: int) -> bool:
        return self.start_key <= key < self.end_key
    
    def put(self, key: int, value: str):
        if not self.contains(key):
            raise ValueError(f"Key {key} not in partition range")
        
        old_size = len(self.data.get(key, ""))
        self.data[key] = value
        self.size += len(value) - old_size
    
    def get(self, key: int) -> Optional[str]:
        return self.data.get(key)
    
    def scan(self, start: int, end: int) -> List[Tuple[int, str]]:
        return [(k, v) for k, v in sorted(self.data.items())
                if start <= k < end]

class RangePartitionSystem:
    def __init__(self, split_threshold: int = 1000, 
                 merge_threshold: int = 100):
        self.partitions: List[Partition] = []
        self.split_threshold = split_threshold
        self.merge_threshold = merge_threshold
        self.next_partition_id = 0
        self.lock = threading.RLock()
        
        # 初始化一个覆盖整个键空间的分区
        self.add_partition(Partition(
            id=self.next_partition_id,
            start_key=0,
            end_key=2**31,
            data={}
        ))
        self.next_partition_id += 1
    
    def add_partition(self, partition: Partition):
        with self.lock:
            # 按 start_key 有序插入
            idx = bisect.bisect_left(
                [p.start_key for p in self.partitions],
                partition.start_key
            )
            self.partitions.insert(idx, partition)
    
    def find_partition(self, key: int) -> Optional[Partition]:
        with self.lock:
            # 二分查找
            idx = bisect.bisect_right(
                [p.start_key for p in self.partitions],
                key
            ) - 1
            
            if idx >= 0 and idx < len(self.partitions):
                partition = self.partitions[idx]
                if partition.contains(key):
                    return partition
            return None
    
    def put(self, key: int, value: str):
        partition = self.find_partition(key)
        if partition is None:
            raise ValueError(f"No partition found for key {key}")
        
        partition.put(key, value)
        
        # 检查是否需要分裂
        if partition.size > self.split_threshold:
            self.split_partition(partition)
    
    def get(self, key: int) -> Optional[str]:
        partition = self.find_partition(key)
        if partition is None:
            return None
        return partition.get(key)
    
    def scan(self, start_key: int, end_key: int) -> List[Tuple[int, str]]:
        results = []
        
        with self.lock:
            for partition in self.partitions:
                # 检查分区是否与查询范围重叠
                if partition.start_key < end_key and partition.end_key > start_key:
                    # 计算重叠范围
                    scan_start = max(start_key, partition.start_key)
                    scan_end = min(end_key, partition.end_key)
                    results.extend(partition.scan(scan_start, scan_end))
        
        return sorted(results)
    
    def split_partition(self, partition: Partition):
        with self.lock:
            if len(partition.data) == 0:
                return
            
            # 找到中点键(按数据量加权)
            sorted_keys = sorted(partition.data.keys())
            mid_idx = len(sorted_keys) // 2
            split_key = sorted_keys[mid_idx]
            
            # 创建两个新分区
            partition1 = Partition(
                id=self.next_partition_id,
                start_key=partition.start_key,
                end_key=split_key,
                data={}
            )
            self.next_partition_id += 1
            
            partition2 = Partition(
                id=self.next_partition_id,
                start_key=split_key,
                end_key=partition.end_key,
                data={}
            )
            self.next_partition_id += 1
            
            # 分配数据
            for key, value in partition.data.items():
                if key < split_key:
                    partition1.put(key, value)
                else:
                    partition2.put(key, value)
            
            # 替换旧分区
            self.partitions.remove(partition)
            self.add_partition(partition1)
            self.add_partition(partition2)
            
            print(f"Split partition {partition.id} "
                  f"[{partition.start_key}, {partition.end_key}) into "
                  f"[{partition1.start_key}, {partition1.end_key}) and "
                  f"[{partition2.start_key}, {partition2.end_key})")
    
    def merge_partitions(self):
        with self.lock:
            i = 0
            while i < len(self.partitions) - 1:
                partition1 = self.partitions[i]
                partition2 = self.partitions[i + 1]
                
                # 检查是否相邻且都小于合并阈值
                if (partition1.end_key == partition2.start_key and
                    partition1.size < self.merge_threshold and
                    partition2.size < self.merge_threshold):
                    
                    # 合并
                    merged = Partition(
                        id=self.next_partition_id,
                        start_key=partition1.start_key,
                        end_key=partition2.end_key,
                        data={}
                    )
                    self.next_partition_id += 1
                    
                    # 合并数据
                    for key, value in partition1.data.items():
                        merged.put(key, value)
                    for key, value in partition2.data.items():
                        merged.put(key, value)
                    
                    # 替换
                    self.partitions.pop(i)
                    self.partitions.pop(i)
                    self.add_partition(merged)
                    
                    print(f"Merged partitions {partition1.id} and {partition2.id} "
                          f"into partition {merged.id} "
                          f"[{merged.start_key}, {merged.end_key})")
                else:
                    i += 1
    
    def print_partitions(self):
        print("\n=== Partition Status ===")
        for p in self.partitions:
            print(f"Partition {p.id}: [{p.start_key}, {p.end_key}), "
                  f"size={p.size}, keys={len(p.data)}")

# 使用示例
def demo():
    system = RangePartitionSystem(split_threshold=500, merge_threshold=100)
    
    # 插入数据触发分裂
    print("Inserting data...")
    for i in range(100):
        system.put(i, "x" * 10)  # 每个值 10 字节
    
    system.print_partitions()
    
    # 继续插入,触发更多分裂
    for i in range(100, 300):
        system.put(i, "y" * 10)
    
    system.print_partitions()
    
    # 范围查询
    print("\nRange scan [50, 150):")
    results = system.scan(50, 150)
    print(f"Found {len(results)} keys")
    
    # 删除一些数据
    print("\nDeleting data...")
    for i in range(50, 150):
        partition = system.find_partition(i)
        if partition and i in partition.data:
            del partition.data[i]
            partition.size -= 10
    
    system.print_partitions()
    
    # 触发合并
    print("\nMerging partitions...")
    system.merge_partitions()
    system.print_partitions()

if __name__ == "__main__":
    demo()

运行结果:

Inserting data...

=== Partition Status ===
Partition 0: [0, 2147483648), size=1000, keys=100

Split partition 0 [0, 2147483648) into [0, 50) and [50, 2147483648)

=== Partition Status ===
Partition 1: [0, 50), size=500, keys=50
Partition 2: [50, 2147483648), size=2500, keys=250

Split partition 2 [50, 2147483648) into [50, 175) and [175, 2147483648)

Range scan [50, 150):
Found 100 keys

Deleting data...

=== Partition Status ===
Partition 1: [0, 50), size=500, keys=50
Partition 3: [50, 175), size=250, keys=25
Partition 4: [175, 2147483648), size=1250, keys=125

Merging partitions...
Merged partitions 1 and 3 into partition 5 [0, 175)

=== Partition Status ===
Partition 5: [0, 175), size=750, keys=75
Partition 4: [175, 2147483648), size=1250, keys=125

这个示例展示了范围分区的基本机制,包括动态分裂和合并。生产系统需要考虑更多细节:副本、故障恢复、并发控制等。

十、范围分区 vs 哈希分区:决策指南

在实际系统设计中,选择范围分区还是哈希分区,不能一概而论。下表从多个维度对比两种策略,并给出具体的工作负载推荐:

维度 范围分区 哈希分区
范围查询 高效,只需访问少量分区 必须扫描所有分区
点查询 高效,定位到单一分区 高效,哈希直接定位
写入均匀性 差,时序/自增键容易热点 好,哈希天然打散
有序扫描 天然支持 不支持
分裂/合并 需要动态调整边界 通常固定分区数,无需分裂
实现复杂度 高(需要分裂/合并/热点治理) 低(一致性哈希即可)
元数据开销 中等(分区边界随数据变化) 低(哈希函数固定)

10.1 按工作负载选择

选择范围分区的场景

  1. 时序数据分析:监控系统需要按时间范围聚合指标(如”最近 1 小时的 P99 延迟”),范围分区可将查询限制在少数分区内。典型系统:TiKV、HBase。
  2. 有序扫描密集型:数据仓库的 ETL 作业、排序合并连接(Sort-Merge Join)等操作依赖数据的有序性。
  3. 前缀匹配查询:如按用户 ID 前缀查询某个租户的所有数据,范围分区天然支持这种局部性。
  4. 地理位置数据:地理围栏查询、附近搜索等场景中,将地理编码作为范围键可以提高空间查询效率。

选择哈希分区的场景

  1. 纯 KV 点查:缓存系统(如 Memcached、Redis Cluster)的核心操作是 Get/Set,不需要范围查询,哈希分区的均匀性和简单性是最优选择。
  2. 高写入吞吐量:日志采集系统中,如果不需要按时间范围查询,哈希分区可以避免写入热点。
  3. 自增主键场景:使用自增 ID 作为键时,范围分区会导致所有写入集中在最后一个分区,哈希分区可以打散。
  4. 负载均衡优先:对延迟一致性要求极高的场景(如在线广告竞价),哈希分区的均匀负载分布更有优势。

混合策略的场景

  1. Cassandra 式复合键:使用哈希分区键 + 范围排序键,兼顾写入均匀和分区内有序扫描。适合 IoT 传感器数据(按设备 ID 哈希,按时间戳排序)。
  2. 两级分区:先按租户 ID 哈希分配到节点,再在节点内按时间范围分区。适合多租户 SaaS 系统。

十一、总结

范围分区是分布式系统中的重要技术,它在保持数据顺序性的同时实现了水平扩展。本文深入探讨了范围分区的各个方面:

核心机制

分裂策略

合并策略

热点处理

生产实现

混合方案

范围分区没有银弹。它在范围查询方面的优势需要付出热点处理和动态调整的代价。选择范围分区还是哈希分区,取决于应用的访问模式。如果范围查询是核心需求,范围分区是正确的选择。如果只需要点查询且负载均衡更重要,哈希分区可能更合适。

在下一篇文章中,我们将探讨二级索引(Secondary Index)的分区策略,这是另一个充满挑战的话题。

参考文献

  1. Corbett, J. C., et al. (2013). “Spanner: Google’s Globally Distributed Database.” ACM Transactions on Computer Systems (TOCS), 31(3).

  2. Chang, F., et al. (2008). “Bigtable: A Distributed Storage System for Structured Data.” ACM Transactions on Computer Systems (TOCS), 26(2).

  3. Huang, D., et al. (2020). “TiDB: A Raft-based HTAP Database.” Proceedings of the VLDB Endowment, 13(12).

  4. Lakshman, A., & Malik, P. (2010). “Cassandra: A Decentralized Structured Storage System.” ACM SIGOPS Operating Systems Review, 44(2).

  5. DeCandia, G., et al. (2007). “Dynamo: Amazon’s Highly Available Key-value Store.” ACM SIGOPS Operating Systems Review, 41(6).

  6. Apache HBase Documentation. “Region Splits.” https://hbase.apache.org/book.html#regions.arch

  7. TiKV Documentation. “Region Merge.” https://tikv.org/docs/

  8. Kleppmann, M. (2017). “Designing Data-Intensive Applications.” O’Reilly Media.


上一篇:哈希分区 | 下一篇:二级索引

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13

【分布式系统百科】数据再平衡:固定分区、动态分区与 TiKV 的调度策略

在上一篇文章中,我们讨论了分布式系统中的二级索引问题。本文将深入探讨数据再平衡(Rebalancing)的核心策略和实现细节。当分布式系统运行一段时间后,数据分布可能会变得不均匀,节点可能会加入或离开集群,这时就需要再平衡机制来重新分配数据,保证系统的负载均衡和高可用性。

2026-04-13 · distributed

【分布式系统百科】Raft 深度重写:从论文的 18 页到 etcd 的 15000 行

Raft 论文 18 页就能读完,但 etcd/raft 用了 15000 行 Go 才把它变成能在生产环境跑的代码。这篇文章从论文的每一个核心机制出发,逐一拆解工程实现中论文没说的东西:PreVote、ReadIndex、LeaderTransfer、ConfChange V2、流水线复制、Async Apply,以及 TiKV 的 Multi-Raft 实践。最后做一次精确的 Paxos 对比,并坦诚讨论 Raft 的已知缺陷。


By .