土法炼钢兴趣小组的算法知识备份

【分布式系统百科】分区环境下的二级索引:本地索引 vs 全局索引

文章导航

标签入口
#分布式系统#数据分区#二级索引#DynamoDB#Elasticsearch

目录

在分布式系统中,当数据量增长到单机无法容纳时,我们需要将数据分区(Partition)到多个节点上。然而,分区带来了一个棘手的问题:如何高效地支持非主键查询?主键决定了数据的分区位置,但实际业务中,我们经常需要根据其他字段进行查询。这就引出了分区环境下的二级索引(Secondary Index)问题——这是分布式数据库设计中最具挑战性的课题之一。

一、问题的本质:分区与查询的矛盾

1.1 主键分区的困境

当我们使用主键对数据进行分区时,通过主键查询数据非常高效——只需根据分区函数计算出数据所在的分区,然后直接访问该分区即可。但现实世界的查询需求往往更加复杂。

假设我们有一个电商系统,用户表按 user_id 进行哈希分区:

# 用户数据按 user_id 分区
class User:
    user_id: str      # 主键,决定分区
    email: str        # 需要查询
    country: str      # 需要查询
    registration_date: datetime  # 需要查询
    
# 哈希分区函数
def get_partition(user_id: str, num_partitions: int) -> int:
    return hash(user_id) % num_partitions

这种设计下,以下查询会遇到不同的性能特征:

# 查询 1:通过主键查询 - O(1) 复杂度
user = db.get_user(user_id="user_12345")  # 直接定位到分区 3

# 查询 2:通过邮箱查询 - O(N) 复杂度,需要扫描所有分区
user = db.find_user_by_email(email="alice@example.com")  # 需要查询所有分区

# 查询 3:按国家统计 - O(N) 复杂度
count = db.count_users_by_country(country="China")  # 需要扫描所有分区

对于查询 2 和查询 3,由于 emailcountry 不是分区键,系统无法确定数据在哪个分区,只能向所有分区发送查询请求。这种操作被称为散射-聚集(Scatter-Gather),代价高昂。

1.2 二级索引的必要性

为了解决这个问题,我们需要在非主键字段上建立索引。在单机数据库中,这很简单——B+ 树或哈希索引就能搞定。但在分布式环境中,二级索引的实现面临两个根本性的设计选择:

  1. 本地索引(Local Index):也称为文档分区索引(Document-Partitioned Index),每个分区维护自己的本地数据的索引
  2. 全局索引(Global Index):也称为词项分区索引(Term-Partitioned Index),索引条目按照索引键进行分区,与原始数据的分区方式独立

这两种方案在写入路径、读取路径、一致性保证、系统复杂度等方面有着截然不同的权衡。

二、本地索引:每个分区自治

2.1 设计原理

本地索引的核心思想是将索引与数据放在一起。每个分区不仅存储属于它的数据,还维护这些数据的所有索引。

分区 0:                    分区 1:                    分区 2:
数据:                      数据:                      数据:
  user_1 (CN)                user_2 (US)                user_4 (CN)
  user_7 (JP)                user_5 (US)                user_8 (UK)
  
本地索引 (country):          本地索引 (country):          本地索引 (country):
  CN -> [user_1]              US -> [user_2, user_5]       CN -> [user_4]
  JP -> [user_7]                                           UK -> [user_8]

2.2 写入路径:简单高效

写入操作只涉及单个分区,保持了原子性:

class LocalIndexPartition:
    def __init__(self, partition_id: int):
        self.partition_id = partition_id
        self.data = {}  # user_id -> User
        self.indexes = {
            'email': {},      # email -> user_id
            'country': {}     # country -> [user_ids]
        }
    
    def write_user(self, user: User) -> None:
        """写入用户数据,同时更新本地索引"""
        # 1. 检查主键冲突
        if user.user_id in self.data:
            old_user = self.data[user.user_id]
            self._remove_from_indexes(old_user)
        
        # 2. 写入数据
        self.data[user.user_id] = user
        
        # 3. 更新本地索引(在同一个事务中)
        self._add_to_indexes(user)
    
    def _add_to_indexes(self, user: User) -> None:
        """将用户添加到本地索引"""
        # 邮箱唯一索引
        self.indexes['email'][user.email] = user.user_id
        
        # 国家多值索引
        if user.country not in self.indexes['country']:
            self.indexes['country'][user.country] = []
        self.indexes['country'][user.country].append(user.user_id)
    
    def _remove_from_indexes(self, user: User) -> None:
        """从本地索引中移除用户"""
        if user.email in self.indexes['email']:
            del self.indexes['email'][user.email]
        
        if user.country in self.indexes['country']:
            self.indexes['country'][user.country].remove(user.user_id)
            if not self.indexes['country'][user.country]:
                del self.indexes['country'][user.country]

写入的关键优势:

2.3 查询路径:散射-聚集

查询非主键字段时,由于不知道数据在哪个分区,必须向所有分区发送请求:

class DistributedDatabase:
    def __init__(self, num_partitions: int):
        self.partitions = [
            LocalIndexPartition(i) for i in range(num_partitions)
        ]
    
    def find_users_by_country(self, country: str) -> List[User]:
        """通过国家查询用户 - 需要查询所有分区"""
        results = []
        
        # 1. 散射阶段:并行查询所有分区
        futures = []
        for partition in self.partitions:
            future = self._query_partition_async(partition, country)
            futures.append(future)
        
        # 2. 聚集阶段:收集所有结果
        for future in futures:
            partition_results = future.result()
            results.extend(partition_results)
        
        return results
    
    def _query_partition_async(self, partition: LocalIndexPartition, 
                               country: str) -> Future:
        """异步查询单个分区"""
        def query():
            if country not in partition.indexes['country']:
                return []
            
            user_ids = partition.indexes['country'][country]
            return [partition.data[uid] for uid in user_ids]
        
        return ThreadPoolExecutor().submit(query)

查询的关键特征:

2.4 MongoDB 的本地索引实现

MongoDB 在分片(Sharding)环境中使用本地索引。每个分片维护自己的数据和索引:

// MongoDB 分片集合
db.users.createIndex({ country: 1, registration_date: -1 })

// 查询会路由到所有分片
db.users.find({ country: "China" }).sort({ registration_date: -1 }).limit(10)

MongoDB 的查询路由器(mongos)会:

  1. 将查询发送到所有分片
  2. 每个分片使用本地索引执行查询
  3. mongos 合并结果并应用 sortlimit

优化技巧:如果查询包含分片键(Shard Key),mongos 可以只查询相关分片:

// 如果 users 按 user_id 分片,这个查询可以定向路由
db.users.find({ user_id: "user_12345", country: "China" })

// 但这个查询仍然需要散射到所有分片
db.users.find({ country: "China" })

2.5 本地索引的适用场景

本地索引适合以下场景:

  1. 写多读少:写入性能是瓶颈,可以接受查询扫描所有分区
  2. 分析型查询:需要扫描大量数据的 OLAP 查询,散射-聚集可以并行化
  3. 强一致性要求:索引和数据在同一分区,容易保证一致性

三、全局索引:索引独立分区

3.1 设计原理

全局索引将索引数据与原始数据解耦,按照索引键对索引条目进行分区。这样,相同索引值的所有条目会集中在同一个索引分区中。

数据分区(按 user_id):
分区 0: user_1(CN), user_7(JP)
分区 1: user_2(US), user_5(US)
分区 2: user_4(CN), user_8(UK)

全局索引分区(按 country):
索引分区 A: CN -> [(partition_0, user_1), (partition_2, user_4)]
索引分区 B: JP -> [(partition_0, user_7)]
               US -> [(partition_1, user_2), (partition_1, user_5)]
索引分区 C: UK -> [(partition_2, user_8)]

3.2 写入路径:跨分区更新

写入单条记录时,需要更新多个地方:

class GlobalIndexSystem:
    def __init__(self, num_data_partitions: int, num_index_partitions: int):
        self.data_partitions = [DataPartition(i) for i in range(num_data_partitions)]
        self.index_partitions = [IndexPartition(i) for i in range(num_index_partitions)]
    
    def write_user(self, user: User) -> None:
        """写入用户 - 需要更新数据分区和索引分区"""
        # 1. 确定数据分区
        data_partition_id = self._get_data_partition(user.user_id)
        data_partition = self.data_partitions[data_partition_id]
        
        # 2. 确定需要更新的索引分区
        country_index_partition_id = self._get_index_partition(user.country)
        email_index_partition_id = self._get_index_partition(user.email)
        
        # 3. 使用分布式事务或异步更新
        try:
            # 写入数据分区
            old_user = data_partition.write(user)
            
            # 更新索引分区
            self._update_country_index(
                user, old_user, country_index_partition_id, data_partition_id
            )
            self._update_email_index(
                user, old_user, email_index_partition_id, data_partition_id
            )
        except Exception as e:
            # 回滚或记录不一致
            self._handle_write_failure(user, e)
    
    def _get_data_partition(self, user_id: str) -> int:
        """根据用户 ID 确定数据分区"""
        return hash(user_id) % len(self.data_partitions)
    
    def _get_index_partition(self, index_value: str) -> int:
        """根据索引值确定索引分区"""
        return hash(index_value) % len(self.index_partitions)
    
    def _update_country_index(self, user: User, old_user: Optional[User],
                              index_partition_id: int, 
                              data_partition_id: int) -> None:
        """更新国家索引"""
        index_partition = self.index_partitions[index_partition_id]
        
        # 删除旧索引条目
        if old_user and old_user.country != user.country:
            old_index_partition_id = self._get_index_partition(old_user.country)
            old_index_partition = self.index_partitions[old_index_partition_id]
            old_index_partition.remove_entry(
                'country', old_user.country, data_partition_id, user.user_id
            )
        
        # 添加新索引条目
        index_partition.add_entry(
            'country', user.country, data_partition_id, user.user_id
        )

写入的关键挑战:

索引写入路径涉及多个状态转换,任何一步失败都需要恰当处理。下图展示了索引更新的状态机:

stateDiagram-v2
    [*] --> IndexCurrent: 初始状态
    IndexCurrent --> WritePending: 收到数据写入请求
    WritePending --> Updating: 开始更新索引条目
    Updating --> IndexCurrent: 更新成功
    Updating --> RetryPending: 更新失败(网络/超时)
    RetryPending --> Updating: 重试更新
    RetryPending --> IndexStale: 超过最大重试次数
    IndexStale --> Updating: 后台修复任务触发
    IndexStale --> IndexCurrent: 修复成功

上述状态机描述了全局索引写入路径的完整生命周期。正常路径下,索引从 IndexCurrent 经过 WritePending 和 Updating 后回到 IndexCurrent。异常路径中,更新失败会进入 RetryPending 状态进行有限次数的重试。如果重试耗尽,索引进入 IndexStale 状态,等待后台修复任务进行异步补偿。这种设计保证了索引最终一定会与主数据一致。

3.3 查询路径:精确定位

查询时可以直接定位到索引分区:

class GlobalIndexSystem:
    def find_users_by_country(self, country: str) -> List[User]:
        """通过国家查询用户 - 只需查询一个索引分区"""
        # 1. 确定索引分区
        index_partition_id = self._get_index_partition(country)
        index_partition = self.index_partitions[index_partition_id]
        
        # 2. 查询索引分区,获取数据位置
        index_entries = index_partition.query('country', country)
        # index_entries: [(data_partition_id, user_id), ...]
        
        # 3. 根据数据位置批量获取数据
        results = []
        data_requests = {}  # data_partition_id -> [user_ids]
        
        for data_partition_id, user_id in index_entries:
            if data_partition_id not in data_requests:
                data_requests[data_partition_id] = []
            data_requests[data_partition_id].append(user_id)
        
        # 4. 并行查询数据分区
        for data_partition_id, user_ids in data_requests.items():
            partition = self.data_partitions[data_partition_id]
            for user_id in user_ids:
                user = partition.get(user_id)
                if user:
                    results.append(user)
        
        return results

查询的关键优势:

3.4 全局索引的适用场景

全局索引适合以下场景:

  1. 读多写少:查询性能是瓶颈,可以接受写入的额外开销
  2. 点查询:通过索引查找少量记录,不需要全表扫描
  3. 低延迟要求:不能接受散射-聚集的尾延迟

四、DynamoDB GSI 深度剖析

4.1 GSI 架构设计

Amazon DynamoDB 的全局二级索引(Global Secondary Index, GSI)是全局索引的典型实现。GSI 本质上是一个独立的表,有自己的分区键和排序键。

# DynamoDB 表定义
class DynamoDBTable:
    """
    主表:按 user_id 分区
    GSI:按 country 分区,按 registration_date 排序
    """
    table_name = "Users"
    partition_key = "user_id"  # 主键
    
    # 定义 GSI
    global_secondary_indexes = [
        {
            "index_name": "CountryDateIndex",
            "partition_key": "country",      # GSI 分区键
            "sort_key": "registration_date", # GSI 排序键
            "projection": {
                "type": "INCLUDE",
                "attributes": ["email", "name"]  # 投影属性
            },
            "provisioned_throughput": {
                "read_capacity_units": 100,
                "write_capacity_units": 50
            }
        }
    ]

4.2 异步复制机制

DynamoDB GSI 使用异步复制从主表更新到索引表:

class DynamoDBGSIReplication:
    def __init__(self):
        self.main_table = MainTable()
        self.gsi_table = GSITable()
        self.replication_stream = StreamProcessor()
    
    def write_to_main_table(self, item: dict) -> None:
        """写入主表"""
        # 1. 同步写入主表
        self.main_table.put_item(item)
        
        # 2. 发布到 Stream(类似 WAL)
        change_event = {
            "event_type": "INSERT",
            "table": "Users",
            "keys": {"user_id": item["user_id"]},
            "new_image": item,
            "timestamp": time.time()
        }
        self.replication_stream.publish(change_event)
        
        # 3. 异步处理器消费 Stream 并更新 GSI
        # (在后台线程中执行)
    
    def process_stream_record(self, event: dict) -> None:
        """Stream 处理器:更新 GSI"""
        try:
            new_item = event["new_image"]
            
            # 构建 GSI 条目
            gsi_item = {
                "country": new_item["country"],  # GSI 分区键
                "registration_date": new_item["registration_date"],  # GSI 排序键
                "user_id": new_item["user_id"],  # 回指主表的键
                # 投影属性
                "email": new_item.get("email"),
                "name": new_item.get("name")
            }
            
            # 写入 GSI 表
            self.gsi_table.put_item(gsi_item)
            
        except Exception as e:
            # 重试机制
            self._retry_with_backoff(event)

4.3 最终一致性

由于异步复制,GSI 只能保证最终一致性(Eventual Consistency)。下图展示了 GSI 复制的时间线以及潜在的过期窗口(Staleness Window):

sequenceDiagram
    participant Client as 客户端
    participant Main as 主表分区
    participant Stream as 复制流
    participant GSI as GSI 分区

    Client->>Main: 写入 user_12345(country=CN)
    Main-->>Client: 写入成功
    Main->>Stream: 产生变更事件(异步)
    Note over Stream: 复制延迟窗口开始
    Client->>GSI: 查询 country=CN
    GSI-->>Client: 未找到(过期读取)
    Stream->>GSI: 异步更新 GSI 条目
    Note over Stream: 复制延迟窗口结束
    Client->>GSI: 再次查询 country=CN
    GSI-->>Client: 返回 user_12345

上图清晰地展示了 GSI 最终一致性的根源:主表写入成功到 GSI 更新完成之间存在一个不确定长度的延迟窗口。在此窗口内,通过 GSI 查询可能读到过期数据或完全读不到新写入的记录。DynamoDB 通常在几百毫秒内完成 GSI 复制,但这并非硬性保证。

import time

def demonstrate_gsi_eventual_consistency():
    """演示 GSI 的最终一致性"""
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('Users')
    
    # 写入新用户
    table.put_item(Item={
        'user_id': 'user_12345',
        'country': 'China',
        'email': 'alice@example.com',
        'registration_date': '2026-04-13'
    })
    
    # 立即查询 GSI - 可能查不到!
    response = table.query(
        IndexName='CountryDateIndex',
        KeyConditionExpression='country = :country',
        ExpressionAttributeValues={':country': 'China'}
    )
    print(f"立即查询结果: {len(response['Items'])} 条")  # 可能是 0
    
    # 等待一段时间后查询
    time.sleep(1)
    response = table.query(
        IndexName='CountryDateIndex',
        KeyConditionExpression='country = :country',
        ExpressionAttributeValues={':country': 'China'}
    )
    print(f"1秒后查询结果: {len(response['Items'])} 条")  # 可能还是 0
    
    # 通常在几百毫秒内复制完成,但没有保证

4.4 GSI 限流与反压

GSI 有自己的吞吐量配额,当 GSI 更新跟不上主表写入时,会对主表施加反压(Backpressure)

class DynamoDBThrottling:
    def __init__(self):
        self.main_table_wcu = 1000  # 主表写容量单位
        self.gsi_wcu = 100          # GSI 写容量单位
    
    def write_with_throttling_check(self, item: dict) -> bool:
        """写入时检查 GSI 限流"""
        # 计算写入此 item 需要的 WCU
        item_size_kb = len(json.dumps(item).encode()) / 1024
        required_wcu = max(1, int(item_size_kb))
        
        # 检查 GSI 容量
        if not self._check_gsi_capacity(required_wcu):
            raise ProvisionedThroughputExceededException(
                "GSI CountryDateIndex 吞吐量不足,主表写入被限流"
            )
        
        # 写入主表
        self.main_table.put_item(item)
        return True
    
    def _check_gsi_capacity(self, required_wcu: int) -> bool:
        """检查 GSI 是否有足够容量"""
        current_usage = self._get_gsi_current_wcu()
        return current_usage + required_wcu <= self.gsi_wcu

实践建议

  1. GSI 的 WCU 应该与主表相同,避免成为瓶颈
  2. 使用按需计费(On-Demand)模式可以自动扩展
  3. 监控 ThrottledRequests 指标

4.5 投影属性优化

GSI 的投影(Projection)决定了索引中包含哪些属性:

# 1. KEYS_ONLY - 只包含键属性(最省存储)
gsi_keys_only = {
    "projection": {"type": "KEYS_ONLY"}
}
# GSI 包含:country, registration_date, user_id

# 2. INCLUDE - 包含指定属性(平衡)
gsi_include = {
    "projection": {
        "type": "INCLUDE",
        "attributes": ["email", "name", "phone"]
    }
}
# GSI 包含:country, registration_date, user_id, email, name, phone

# 3. ALL - 包含所有属性(最快查询)
gsi_all = {
    "projection": {"type": "ALL"}
}
# GSI 包含:主表的所有属性

权衡分析:

def compare_projection_types():
    """对比不同投影类型的性能"""
    
    # KEYS_ONLY - 需要回表查询
    def query_with_keys_only():
        # 第一步:查询 GSI(1 次读取)
        response = table.query(
            IndexName='CountryDateIndex',
            KeyConditionExpression='country = :country',
            ExpressionAttributeValues={':country': 'China'}
        )
        
        # 第二步:对每个结果回表查询完整数据(N 次读取)
        users = []
        for item in response['Items']:
            full_user = table.get_item(Key={'user_id': item['user_id']})
            users.append(full_user['Item'])
        
        return users  # 总计 1 + N 次读取
    
    # ALL - 直接返回完整数据
    def query_with_all():
        # 一步完成(1 次读取)
        response = table.query(
            IndexName='CountryDateIndex',
            KeyConditionExpression='country = :country',
            ExpressionAttributeValues={':country': 'China'}
        )
        return response['Items']  # 总计 1 次读取
    
    # 存储成本对比
    storage_keys_only = "仅键:~100 字节/项"
    storage_all = "所有属性:~1KB/项"
    
    return {
        "KEYS_ONLY": {"reads": "1 + N", "storage": storage_keys_only},
        "ALL": {"reads": "1", "storage": storage_all}
    }

五、Elasticsearch 分片与索引

5.1 Lucene 索引架构

Elasticsearch 的每个分片(Shard)本质上是一个 Lucene 索引,使用本地索引模型:

class ElasticsearchShard:
    """ES 分片 - 包含完整的 Lucene 索引"""
    
    def __init__(self, shard_id: int):
        self.shard_id = shard_id
        self.lucene_index = LuceneIndex()
        
        # Lucene 倒排索引结构
        self.inverted_index = {
            # term -> posting list
            "china": PostingList([doc1, doc5, doc9]),
            "beijing": PostingList([doc1, doc9]),
            "shanghai": PostingList([doc5]),
        }
        
        # 正排索引(列存储)
        self.doc_values = {
            "country": ["China", "US", "China", ...],
            "timestamp": [1640000000, 1640000100, ...]
        }
    
    def index_document(self, doc_id: str, document: dict) -> None:
        """索引文档到 Lucene"""
        # 1. 分词
        for field, value in document.items():
            if isinstance(value, str):
                tokens = self._analyze(value, field)
                for token in tokens:
                    self._add_to_inverted_index(token, doc_id, field)
        
        # 2. 存储原始文档
        self.lucene_index.store_document(doc_id, document)
        
        # 3. 构建列存储(用于聚合)
        self._build_doc_values(doc_id, document)
    
    def search(self, query: str) -> List[str]:
        """在本地分片搜索"""
        # 查询本地 Lucene 索引
        tokens = self._analyze(query, "content")
        doc_ids = set()
        
        for token in tokens:
            if token in self.inverted_index:
                doc_ids.update(self.inverted_index[token].doc_ids)
        
        return list(doc_ids)

5.2 查询的散射-聚集

ES 的查询协调节点(Coordinating Node)执行散射-聚集:

class ElasticsearchCoordinator:
    def __init__(self, shards: List[ElasticsearchShard]):
        self.shards = shards
    
    def search(self, query: dict) -> dict:
        """执行分布式搜索"""
        # 1. Query Phase - 散射到所有分片
        shard_results = []
        for shard in self.shards:
            # 每个分片返回 top-N 文档 ID 和得分
            result = shard.search_phase_query(query)
            shard_results.append(result)
        
        # 2. 在协调节点合并结果
        merged_doc_ids = self._merge_top_results(shard_results, query['size'])
        
        # 3. Fetch Phase - 获取完整文档
        documents = []
        fetch_requests = self._group_by_shard(merged_doc_ids)
        
        for shard_id, doc_ids in fetch_requests.items():
            shard = self.shards[shard_id]
            docs = shard.fetch_documents(doc_ids)
            documents.extend(docs)
        
        return {
            "hits": {
                "total": sum(r['total'] for r in shard_results),
                "hits": documents
            }
        }
    
    def _merge_top_results(self, shard_results: List[dict], 
                           size: int) -> List[tuple]:
        """合并各分片的 top 结果"""
        import heapq
        
        # 使用最小堆合并
        all_results = []
        for result in shard_results:
            for hit in result['hits']:
                all_results.append((hit['score'], hit['_id'], hit['_shard']))
        
        # 按得分排序,取 top N
        top_results = heapq.nlargest(size, all_results, key=lambda x: x[0])
        return [(doc_id, shard) for score, doc_id, shard in top_results]

5.3 相关性评分的挑战

在分片环境中,TF-IDF 评分会出现问题:

class RelevanceScoring:
    """演示分片环境下的 TF-IDF 问题"""
    
    def compute_tf_idf_score(self, term: str, doc_id: str, 
                             shard: ElasticsearchShard) -> float:
        """计算 TF-IDF 得分"""
        # TF (词频)
        tf = self._compute_tf(term, doc_id, shard)
        
        # IDF (逆文档频率) - 问题在这里!
        idf = self._compute_idf_local(term, shard)  # 只基于本地分片
        
        return tf * idf
    
    def _compute_idf_local(self, term: str, 
                           shard: ElasticsearchShard) -> float:
        """基于本地分片计算 IDF - 不准确!"""
        # IDF = log(N / df)
        # N = 分片中的文档总数(不是全局总数)
        # df = 包含该词的文档数(不是全局数量)
        
        num_docs = shard.doc_count()  # 仅本地
        doc_freq = len(shard.inverted_index.get(term, []))  # 仅本地
        
        import math
        return math.log((num_docs + 1) / (doc_freq + 1))
    
    def _compute_idf_global(self, term: str, 
                            all_shards: List[ElasticsearchShard]) -> float:
        """基于全局统计计算 IDF - 准确但慢"""
        total_docs = sum(s.doc_count() for s in all_shards)
        total_doc_freq = sum(
            len(s.inverted_index.get(term, [])) for s in all_shards
        )
        
        import math
        return math.log((total_docs + 1) / (total_doc_freq + 1))

问题场景

# 场景:某个词在不同分片的分布不均匀
# 分片 0:文档 1-1000,其中 500 个包含 "python"
# 分片 1:文档 1001-2000,其中 10 个包含 "python"

# 在分片 1 中,"python" 的 IDF 很高(罕见词)
# 在分片 0 中,"python" 的 IDF 很低(常见词)

# 结果:相同的查询在不同分片得到不同的得分
# 可能导致排序不准确

5.4 DFS Query Then Fetch

为了解决相关性评分问题,ES 提供了 dfs_query_then_fetch 模式:

class ElasticsearchDFS:
    def search_with_dfs(self, query: dict) -> dict:
        """使用 DFS 模式搜索"""
        # 1. DFS Phase - 收集全局词频统计
        global_stats = {}
        for shard in self.shards:
            shard_stats = shard.get_term_statistics(query)
            self._merge_statistics(global_stats, shard_stats)
        
        # 2. Query Phase - 使用全局统计计算得分
        shard_results = []
        for shard in self.shards:
            result = shard.search_with_global_stats(query, global_stats)
            shard_results.append(result)
        
        # 3. Fetch Phase
        return self._fetch_and_merge(shard_results, query['size'])
    
    def _merge_statistics(self, global_stats: dict, 
                         shard_stats: dict) -> None:
        """合并词频统计"""
        for term, stats in shard_stats.items():
            if term not in global_stats:
                global_stats[term] = {'doc_freq': 0, 'total_term_freq': 0}
            
            global_stats[term]['doc_freq'] += stats['doc_freq']
            global_stats[term]['total_term_freq'] += stats['total_term_freq']

权衡

在实践中,如果数据分布均匀,默认模式通常足够好。

六、物化视图:预计算的查询结果

6.1 物化视图的概念

物化视图(Materialized View)是另一种支持复杂查询的方式——预先计算并存储查询结果

-- 传统视图(虚拟视图)- 每次查询时计算
CREATE VIEW user_country_stats AS
SELECT country, COUNT(*) as user_count, AVG(age) as avg_age
FROM users
GROUP BY country;

-- 物化视图 - 结果持久化存储
CREATE MATERIALIZED VIEW user_country_stats_mv AS
SELECT country, COUNT(*) as user_count, AVG(age) as avg_age
FROM users
GROUP BY country;

6.2 物化视图维护策略

物化视图的核心挑战是保持与基表同步

class MaterializedView:
    """物化视图实现"""
    
    def __init__(self, base_table: str, view_query: str):
        self.base_table = base_table
        self.view_query = view_query
        self.materialized_data = {}
        self.refresh_strategy = "INCREMENTAL"
    
    def refresh_full(self) -> None:
        """完全刷新 - 重新计算整个视图"""
        print("执行完全刷新...")
        # 删除旧数据
        self.materialized_data.clear()
        
        # 重新执行查询
        result = self._execute_query(self.view_query)
        self.materialized_data = result
    
    def refresh_incremental(self, change_log: List[dict]) -> None:
        """增量刷新 - 只更新变化的部分"""
        print(f"处理 {len(change_log)} 条变更...")
        
        for change in change_log:
            if change['operation'] == 'INSERT':
                self._apply_insert(change['new_row'])
            elif change['operation'] == 'UPDATE':
                self._apply_update(change['old_row'], change['new_row'])
            elif change['operation'] == 'DELETE':
                self._apply_delete(change['old_row'])
    
    def _apply_insert(self, new_row: dict) -> None:
        """应用插入操作到物化视图"""
        # 例:视图是按国家统计用户数
        country = new_row['country']
        
        if country not in self.materialized_data:
            self.materialized_data[country] = {
                'user_count': 0,
                'total_age': 0
            }
        
        self.materialized_data[country]['user_count'] += 1
        self.materialized_data[country]['total_age'] += new_row['age']
    
    def _apply_update(self, old_row: dict, new_row: dict) -> None:
        """应用更新操作"""
        # 先删除旧值的影响
        self._apply_delete(old_row)
        # 再添加新值
        self._apply_insert(new_row)
    
    def _apply_delete(self, old_row: dict) -> None:
        """应用删除操作"""
        country = old_row['country']
        self.materialized_data[country]['user_count'] -= 1
        self.materialized_data[country]['total_age'] -= old_row['age']

6.3 刷新策略对比

class MVRefreshStrategies:
    """物化视图刷新策略对比"""
    
    def strategy_eager_sync(self, base_table_write: dict) -> None:
        """策略 1:同步立即刷新(强一致性)"""
        # 在同一个事务中更新基表和物化视图
        with transaction():
            base_table.write(base_table_write)
            materialized_view.refresh_incremental([base_table_write])
        
        # 优点:物化视图始终是最新的
        # 缺点:写入延迟高,基表写入被阻塞
    
    def strategy_async_near_realtime(self) -> None:
        """策略 2:异步准实时刷新(最终一致性)"""
        # 使用 CDC(变更数据捕获)异步更新
        change_stream = ChangeDataCapture(base_table)
        
        for change in change_stream.subscribe():
            # 在后台线程异步处理
            async_executor.submit(
                lambda: materialized_view.refresh_incremental([change])
            )
        
        # 优点:不阻塞写入,写入性能好
        # 缺点:物化视图可能落后几秒钟
    
    def strategy_periodic_batch(self) -> None:
        """策略 3:定期批量刷新"""
        # 每隔一段时间完全重建
        schedule.every(1).hours.do(
            lambda: materialized_view.refresh_full()
        )
        
        # 优点:简单,适合对实时性要求不高的场景
        # 缺点:刷新期间资源消耗大,数据可能非常陈旧

6.4 写放大与存储成本

物化视图带来显著的写放大:

def analyze_write_amplification():
    """分析物化视图的写放大"""
    
    # 基表
    base_table_size = 1_000_000  # 100 万用户
    
    # 物化视图
    materialized_views = [
        "user_by_country",         # 按国家统计
        "user_by_age_group",       # 按年龄段统计
        "user_by_registration_year",  # 按注册年份统计
        "user_email_index",        # 邮箱索引
        "user_country_city_stats"  # 国家-城市二维统计
    ]
    
    # 写放大分析
    def write_user(user: dict):
        # 1. 写入基表 - 1 次写
        base_table.insert(user)
        
        # 2. 更新 5 个物化视图 - 5 次写
        for mv in materialized_views:
            mv.refresh_incremental(user)
        
        # 总计:6 次写入,写放大 = 6x
    
    # 存储放大分析
    base_storage = base_table_size * 1024  # 1KB per user
    mv_storage = len(materialized_views) * base_table_size * 512  # 每个 MV 平均 0.5KB
    
    total_storage = base_storage + mv_storage
    storage_amplification = total_storage / base_storage
    
    print(f"写放大系数: {len(materialized_views) + 1}x")
    print(f"存储放大系数: {storage_amplification:.2f}x")
    
    return {
        "write_amplification": len(materialized_views) + 1,
        "storage_amplification": storage_amplification
    }

七、权衡分析:写放大 vs 读放大

7.1 性能特征对比

from dataclasses import dataclass
from typing import Optional

@dataclass
class IndexStrategy:
    name: str
    write_latency: str
    read_latency: str
    write_amplification: int
    read_amplification: int
    consistency: str
    complexity: str

def compare_strategies() -> dict:
    """对比三种索引策略"""
    
    strategies = {
        "本地索引": IndexStrategy(
            name="Local Index (Document-Partitioned)",
            write_latency="低(单分区写入)",
            read_latency="高(散射-聚集)",
            write_amplification=1,  # 只写入一个分区
            read_amplification=100,  # 假设 100 个分区
            consistency="强一致性",
            complexity="低"
        ),
        
        "全局索引": IndexStrategy(
            name="Global Index (Term-Partitioned)",
            write_latency="中(需要更新索引分区)",
            read_latency="低(精确定位)",
            write_amplification=2,  # 写入数据分区 + 索引分区
            read_amplification=2,  # 查询索引分区 + 数据分区
            consistency="最终一致性(如果异步)",
            complexity="高(需要处理分布式一致性)"
        ),
        
        "物化视图": IndexStrategy(
            name="Materialized View",
            write_latency="高(更新多个物化视图)",
            read_latency="极低(预计算结果)",
            write_amplification=5,  # 假设 5 个物化视图
            read_amplification=1,  # 直接查询物化视图
            consistency="取决于刷新策略",
            complexity="高(需要维护同步逻辑)"
        )
    }
    
    return strategies

# 决策树
def choose_strategy(workload: dict) -> str:
    """根据工作负载选择策略"""
    
    read_write_ratio = workload['reads'] / workload['writes']
    latency_requirement = workload['max_latency_ms']
    consistency_requirement = workload['consistency']
    
    if read_write_ratio > 100:
        # 读多写少
        if latency_requirement < 50:
            return "物化视图(预计算结果,查询极快)"
        else:
            return "全局索引(平衡读写性能)"
    
    elif read_write_ratio < 10:
        # 写多读少
        return "本地索引(写入快,读取可以慢)"
    
    else:
        # 读写平衡
        if consistency_requirement == "strong":
            return "本地索引(容易保证一致性)"
        else:
            return "全局索引(读写都不错)"

7.2 数学模型分析

import math

class IndexCostModel:
    """索引成本模型"""
    
    def __init__(self, num_partitions: int, num_indexes: int):
        self.P = num_partitions  # 分区数量
        self.I = num_indexes     # 索引数量
    
    def local_index_cost(self, writes_per_sec: float, 
                        reads_per_sec: float) -> dict:
        """本地索引成本"""
        # 写入成本:O(1) - 只写入一个分区
        write_cost = writes_per_sec * 1
        
        # 读取成本:O(P) - 扇出到所有分区
        read_cost = reads_per_sec * self.P
        
        # 网络开销
        network_roundtrips = reads_per_sec * 1  # 并行查询,1 个 RTT
        
        return {
            "write_cost": write_cost,
            "read_cost": read_cost,
            "total_cost": write_cost + read_cost,
            "network_roundtrips": network_roundtrips
        }
    
    def global_index_cost(self, writes_per_sec: float, 
                         reads_per_sec: float,
                         async_replication: bool = True) -> dict:
        """全局索引成本"""
        if async_replication:
            # 异步复制:写入成本低
            write_cost = writes_per_sec * 1  # 只写主表
            write_async_cost = writes_per_sec * self.I  # 后台异步更新索引
        else:
            # 同步更新:写入成本高
            write_cost = writes_per_sec * (1 + self.I)
            write_async_cost = 0
        
        # 读取成本:O(1) - 精确定位
        # 平均每个查询需要访问 1 个索引分区 + K 个数据分区
        avg_data_partitions = 2  # 假设平均返回 2 个分区的数据
        read_cost = reads_per_sec * (1 + avg_data_partitions)
        
        # 网络开销
        network_roundtrips = reads_per_sec * 2  # 索引查询 + 数据获取
        
        return {
            "write_cost": write_cost,
            "write_async_cost": write_async_cost,
            "read_cost": read_cost,
            "total_cost": write_cost + write_async_cost + read_cost,
            "network_roundtrips": network_roundtrips
        }
    
    def compare(self, writes_per_sec: float, reads_per_sec: float) -> None:
        """对比两种策略"""
        local = self.local_index_cost(writes_per_sec, reads_per_sec)
        global_async = self.global_index_cost(writes_per_sec, reads_per_sec, 
                                              async_replication=True)
        
        print(f"分区数: {self.P}, 索引数: {self.I}")
        print(f"工作负载: {writes_per_sec} 写/秒, {reads_per_sec} 读/秒")
        print(f"\n本地索引总成本: {local['total_cost']:.2f}")
        print(f"  - 写入: {local['write_cost']:.2f}")
        print(f"  - 读取: {local['read_cost']:.2f}")
        print(f"\n全局索引总成本: {global_async['total_cost']:.2f}")
        print(f"  - 写入(同步): {global_async['write_cost']:.2f}")
        print(f"  - 写入(异步): {global_async['write_async_cost']:.2f}")
        print(f"  - 读取: {global_async['read_cost']:.2f}")
        
        if local['total_cost'] < global_async['total_cost']:
            print(f"\n推荐: 本地索引(成本低 {(1 - global_async['total_cost']/local['total_cost'])*100:.1f}%)")
        else:
            print(f"\n推荐: 全局索引(成本低 {(1 - local['total_cost']/global_async['total_cost'])*100:.1f}%)")

# 示例分析
model = IndexCostModel(num_partitions=100, num_indexes=3)

print("场景 1: 写多读少")
model.compare(writes_per_sec=1000, reads_per_sec=100)

print("\n" + "="*50 + "\n")

print("场景 2: 读多写少")
model.compare(writes_per_sec=100, reads_per_sec=1000)

八、实践模式:CQRS 与 CDC

8.1 CQRS 架构

命令查询职责分离(Command Query Responsibility Segregation, CQRS)将读写模型分离:

class CQRSSystem:
    """CQRS 架构实现"""
    
    def __init__(self):
        # 写模型:优化写入性能
        self.write_model = CommandModel()
        
        # 读模型:优化查询性能
        self.read_models = {
            'user_by_email': ReadModelByEmail(),
            'user_by_country': ReadModelByCountry(),
            'user_stats': ReadModelStats()
        }
        
        # 事件流:连接写模型和读模型
        self.event_store = EventStore()
    
    def handle_command(self, command: dict) -> None:
        """处理写命令"""
        if command['type'] == 'CreateUser':
            # 1. 验证命令
            self._validate_create_user(command['data'])
            
            # 2. 写入写模型(简单、快速)
            user_id = self.write_model.create_user(command['data'])
            
            # 3. 发布事件到事件流
            event = {
                'type': 'UserCreated',
                'aggregate_id': user_id,
                'data': command['data'],
                'timestamp': time.time(),
                'version': 1
            }
            self.event_store.append(event)
            
            # 4. 异步传播到读模型(不阻塞写入)
            self._publish_event_async(event)
    
    def handle_query(self, query: dict) -> dict:
        """处理读查询"""
        if query['type'] == 'FindUserByEmail':
            # 直接查询优化的读模型
            return self.read_models['user_by_email'].find(query['email'])
        
        elif query['type'] == 'GetUsersByCountry':
            return self.read_models['user_by_country'].find(query['country'])
        
        elif query['type'] == 'GetUserStats':
            return self.read_models['user_stats'].get_stats()
    
    def _publish_event_async(self, event: dict) -> None:
        """异步传播事件到读模型"""
        for read_model in self.read_models.values():
            # 在后台线程中更新读模型
            threading.Thread(
                target=read_model.apply_event,
                args=(event,)
            ).start()

class ReadModelByEmail:
    """针对邮箱查询优化的读模型"""
    
    def __init__(self):
        # 使用哈希索引
        self.email_index = {}  # email -> user_data
    
    def apply_event(self, event: dict) -> None:
        """应用事件更新读模型"""
        if event['type'] == 'UserCreated':
            user_data = event['data']
            self.email_index[user_data['email']] = user_data
        
        elif event['type'] == 'UserUpdated':
            old_email = event['old_data']['email']
            new_email = event['data']['email']
            
            if old_email != new_email:
                del self.email_index[old_email]
                self.email_index[new_email] = event['data']
            else:
                self.email_index[new_email] = event['data']
    
    def find(self, email: str) -> Optional[dict]:
        """O(1) 查询"""
        return self.email_index.get(email)

class ReadModelByCountry:
    """针对国家查询优化的读模型"""
    
    def __init__(self):
        # 使用倒排索引
        self.country_index = {}  # country -> [user_data]
    
    def apply_event(self, event: dict) -> None:
        if event['type'] == 'UserCreated':
            user_data = event['data']
            country = user_data['country']
            
            if country not in self.country_index:
                self.country_index[country] = []
            self.country_index[country].append(user_data)
    
    def find(self, country: str) -> List[dict]:
        """O(1) 查询"""
        return self.country_index.get(country, [])

8.2 变更数据捕获(CDC)

CDC 从数据库的事务日志中捕获变更,用于同步索引和物化视图:

class ChangeDataCapture:
    """CDC 实现 - 从 WAL 捕获变更"""
    
    def __init__(self, database: str):
        self.database = database
        self.wal_reader = WALReader(database)
        self.subscribers = []
    
    def subscribe(self, callback: callable) -> None:
        """订阅变更事件"""
        self.subscribers.append(callback)
    
    def start(self) -> None:
        """启动 CDC 进程"""
        print("启动 CDC,监听 WAL...")
        
        # 从上次检查点继续
        last_lsn = self._load_checkpoint()
        
        for wal_entry in self.wal_reader.read_from(last_lsn):
            # 解析 WAL 条目
            change_event = self._parse_wal_entry(wal_entry)
            
            if change_event:
                # 通知所有订阅者
                for subscriber in self.subscribers:
                    try:
                        subscriber(change_event)
                    except Exception as e:
                        print(f"订阅者处理失败: {e}")
                
                # 保存检查点
                self._save_checkpoint(wal_entry.lsn)
    
    def _parse_wal_entry(self, wal_entry: WALEntry) -> Optional[dict]:
        """解析 WAL 条目为变更事件"""
        if wal_entry.type == WALEntryType.INSERT:
            return {
                'operation': 'INSERT',
                'table': wal_entry.table,
                'new_row': wal_entry.new_tuple,
                'timestamp': wal_entry.timestamp,
                'lsn': wal_entry.lsn
            }
        
        elif wal_entry.type == WALEntryType.UPDATE:
            return {
                'operation': 'UPDATE',
                'table': wal_entry.table,
                'old_row': wal_entry.old_tuple,
                'new_row': wal_entry.new_tuple,
                'timestamp': wal_entry.timestamp,
                'lsn': wal_entry.lsn
            }
        
        elif wal_entry.type == WALEntryType.DELETE:
            return {
                'operation': 'DELETE',
                'table': wal_entry.table,
                'old_row': wal_entry.old_tuple,
                'timestamp': wal_entry.timestamp,
                'lsn': wal_entry.lsn
            }
        
        return None

# 使用 CDC 维护全局索引
class CDCBasedGlobalIndex:
    """基于 CDC 的全局索引"""
    
    def __init__(self):
        self.index_partitions = [IndexPartition(i) for i in range(10)]
        self.cdc = ChangeDataCapture('users_db')
        
        # 订阅 CDC 事件
        self.cdc.subscribe(self.handle_change_event)
    
    def handle_change_event(self, event: dict) -> None:
        """处理变更事件,更新索引"""
        if event['table'] != 'users':
            return
        
        if event['operation'] == 'INSERT':
            self._index_new_user(event['new_row'])
        
        elif event['operation'] == 'UPDATE':
            self._update_user_index(event['old_row'], event['new_row'])
        
        elif event['operation'] == 'DELETE':
            self._remove_user_index(event['old_row'])
    
    def _index_new_user(self, user: dict) -> None:
        """索引新用户"""
        # 根据 country 确定索引分区
        partition_id = hash(user['country']) % len(self.index_partitions)
        partition = self.index_partitions[partition_id]
        
        partition.add_entry(
            index_name='country',
            index_value=user['country'],
            doc_ref=(user['user_id'], user)
        )

8.3 Debezium 实战示例

Debezium 是流行的开源 CDC 平台:

# Debezium 配置示例
debezium_config = {
    "name": "users-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "secret",
        "database.dbname": "users_db",
        "database.server.name": "users",
        "table.include.list": "public.users",
        "plugin.name": "pgoutput",  # PostgreSQL 逻辑复制插件
        "slot.name": "debezium_users"
    }
}

# 消费 Debezium 事件
from kafka import KafkaConsumer
import json

def consume_debezium_events():
    """从 Kafka 消费 Debezium CDC 事件"""
    consumer = KafkaConsumer(
        'users.public.users',  # Debezium topic
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    
    for message in consumer:
        event = message.value
        
        # Debezium 事件结构
        operation = event['op']  # c=create, u=update, d=delete
        
        if operation == 'c':  # Create
            new_user = event['after']
            global_index.index_new_user(new_user)
        
        elif operation == 'u':  # Update
            old_user = event['before']
            new_user = event['after']
            global_index.update_user_index(old_user, new_user)
        
        elif operation == 'd':  # Delete
            old_user = event['before']
            global_index.remove_user_index(old_user)

九、生产环境的实践建议

9.1 索引设计清单

class IndexDesignChecklist:
    """索引设计清单"""
    
    @staticmethod
    def evaluate_index_design(requirements: dict) -> dict:
        """评估索引设计"""
        
        questions = {
            "查询模式": [
                "主要查询是点查询还是范围查询?",
                "查询的选择性如何?(返回多少行)",
                "是否需要排序、聚合?",
                "查询 QPS 是多少?"
            ],
            
            "写入模式": [
                "写入 QPS 是多少?",
                "写入是否有突发(burst)?",
                "能否接受异步索引更新?",
                "写入延迟预算是多少?"
            ],
            
            "一致性": [
                "读需要强一致性还是最终一致性?",
                "索引可以落后主数据多久?",
                "不一致的窗口期对业务的影响?"
            ],
            
            "扩展性": [
                "预期数据量增长速度?",
                "索引是否支持在线扩容?",
                "每个索引的基数(cardinality)?"
            ],
            
            "成本": [
                "存储成本预算?",
                "计算资源预算?",
                "运维复杂度接受度?"
            ]
        }
        
        return questions

### 9.2 常见反模式

```python
class AntiPatterns:
    """索引设计反模式"""
    
    @staticmethod
    def antipattern_1_too_many_indexes():
        """反模式 1: 索引过多"""
        print("问题:为每个字段都建索引")
        print("后果:")
        print("  - 写入性能严重下降")
        print("  - 存储成本翻倍甚至更高")
        print("  - 索引维护成本高")
        print("\n建议:只为频繁查询的字段建索引")
        print("  - 分析查询日志,找到热点查询")
        print("  - 使用复合索引覆盖多个查询")
        print("  - 定期审查和删除无用索引")
    
    @staticmethod
    def antipattern_2_global_index_with_sync_update():
        """反模式 2: 全局索引使用同步更新"""
        print("问题:全局索引每次写入都同步更新所有索引分区")
        print("后果:")
        print("  - 写入延迟非常高")
        print("  - 分布式事务复杂,容易失败")
        print("  - 系统可用性下降")
        print("\n建议:使用异步复制")
        print("  - 接受最终一致性")
        print("  - 使用 CDC 或事件流")
        print("  - 在应用层处理不一致窗口期")
    
    @staticmethod
    def antipattern_3_scatter_gather_without_pruning():
        """反模式 3: 散射-聚集时不做分区裁剪"""
        print("问题:本地索引查询时无脑扇出到所有分区")
        print("后果:")
        print("  - 查询延迟高")
        print("  - 浪费网络和 CPU 资源")
        print("\n建议:尽量包含分区键进行分区裁剪")
        print("示例:")
        print("  差:SELECT * FROM users WHERE country = 'China'")
        print("  好:SELECT * FROM users WHERE user_id_prefix = '86' AND country = 'China'")
        print("      (假设 user_id 按前缀分区)")
    
    @staticmethod
    def antipattern_4_hot_partition_in_global_index():
        """反模式 4: 全局索引的热点分区"""
        print("问题:索引键分布不均匀,导致某些索引分区过热")
        print("示例:按国家分区,中国、美国的索引分区远大于其他")
        print("后果:")
        print("  - 热点分区成为瓶颈")
        print("  - 负载不均衡")
        print("\n建议:")
        print("  - 使用复合分区键:(country, user_id_hash)")
        print("  - 对热点键进行拆分")
        print("  - 监控分区大小,动态再平衡")

9.3 监控指标

class IndexMonitoring:
    """索引监控指标"""
    
    def collect_metrics(self) -> dict:
        """收集关键指标"""
        return {
            "写入指标": {
                "index_write_latency_p99": "索引写入 P99 延迟",
                "index_write_throughput": "索引写入吞吐量",
                "index_update_failures": "索引更新失败次数",
                "index_replication_lag": "索引复制延迟(秒)"
            },
            
            "查询指标": {
                "index_query_latency_p99": "索引查询 P99 延迟",
                "index_hit_rate": "索引命中率",
                "scatter_gather_fanout": "散射-聚集扇出数",
                "index_scan_rows": "索引扫描行数"
            },
            
            "存储指标": {
                "index_size_bytes": "索引大小",
                "index_data_ratio": "索引/数据比例",
                "index_fragmentation": "索引碎片率"
            },
            
            "一致性指标": {
                "index_data_inconsistency_count": "索引数据不一致数量",
                "stale_index_reads": "读取到陈旧索引的次数",
                "index_repair_operations": "索引修复操作次数"
            }
        }
    
    def set_alerts(self) -> dict:
        """设置告警阈值"""
        return {
            "index_replication_lag > 10s": "告警:索引复制延迟过高",
            "index_update_failures > 100/min": "告警:索引更新失败率高",
            "index_data_ratio > 2.0": "告警:索引占用空间过大",
            "scatter_gather_fanout > 100": "告警:查询扇出过多"
        }

十、本地索引与全局索引的延迟/一致性权衡

在实际系统中,本地索引和全局索引的选择本质上是写入延迟、查询延迟、一致性保证三者之间的权衡。本节从工程实践角度深入分析这组权衡关系。

10.1 写入路径延迟对比

操作 本地索引 全局索引(同步) 全局索引(异步)
数据写入 单分区本地写 跨分区分布式事务 单分区本地写
索引更新 与数据在同一分区,原子操作 需要 2PC 或类似协议 异步队列推送
写入延迟 低(1-5ms) 高(10-50ms,取决于参与者数量) 低(1-5ms)
一致性保证 强一致:数据和索引原子更新 强一致:分布式事务保证 最终一致:存在延迟窗口

同步全局索引的延迟开销来源:每次写入需要跨分区协调。假设数据分区和索引分区在不同节点,一次写入至少需要一个额外的跨节点 RTT(Round-Trip Time)。如果使用 2PC 协调,则需要两个额外 RTT。对于写入密集型工作负载,这个开销可能成为瓶颈。

异步全局索引的一致性窗口:DynamoDB GSI 的复制延迟通常在几百毫秒以内,但在主表写入突增或 GSI 限流时,延迟可能达到秒级甚至更长。应用层必须容忍这种不一致,或者通过”读主表确认”的方式进行补偿查询。

10.2 查询路径延迟对比

查询类型 本地索引 全局索引
按索引键点查 必须散射-聚集所有分区(P 个 RPC) 精确定位 1-2 个分区(1-2 个 RPC)
按索引键范围查询 散射-聚集,结果需合并排序 索引分区内有序扫描
尾延迟(P99) 受最慢分区影响,随分区数线性增长 基本稳定,不受分区数影响

当分区数量增长到数百甚至数千时,本地索引的散射-聚集查询延迟会显著恶化。全局索引的查询延迟则相对稳定,这是大规模系统倾向于使用全局索引的核心原因。

10.3 一致性级别的实际影响

在生产系统中,一致性要求往往因业务场景而异:

十一、再平衡对二级索引的影响

当主数据发生再平衡(Rebalancing)时,二级索引面临的挑战因索引类型不同而差异巨大。

11.1 本地索引的再平衡

本地索引与数据绑定在同一分区,因此数据迁移时索引必须随数据一起迁移。核心影响:

11.2 全局索引的再平衡

全局索引独立于数据分区,主数据再平衡对全局索引的影响更为复杂:

11.3 减轻再平衡影响的策略

策略 适用索引类型 描述
索引与数据共迁移 本地索引 将索引文件作为数据迁移的一部分,保证原子性
双写过渡期 全局索引 迁移期间同时更新新旧分区的索引条目,完成后清理旧条目
惰性修复 全局索引 查询时发现指针失效后异步修复索引条目,避免全量重建
后台全量校验 两者 再平衡完成后启动后台任务,逐条对比数据与索引,修复不一致

十二、总结

分区环境下的二级索引是分布式系统中最复杂的问题之一,没有银弹解决方案,只有根据具体场景的权衡:

本地索引: - ✅ 写入简单、快速、强一致 - ✅ 索引与数据同分区,易于管理 - ❌ 查询需要散射-聚集,延迟高 - 适用:写多读少、分析型查询、强一致性要求

全局索引: - ✅ 查询精确定位,延迟低 - ✅ 支持高效的点查询 - ❌ 写入涉及多个分区,复杂度高 - ❌ 通常是最终一致性 - 适用:读多写少、点查询、可接受最终一致性

物化视图: - ✅ 查询极快(预计算) - ✅ 支持复杂聚合 - ❌ 写放大严重 - ❌ 存储开销大 - 适用:读写比极高、固定查询模式

在实践中,这三种方案往往组合使用: - 主表使用本地索引支持强一致查询 - 热点查询字段使用全局索引加速 - 复杂报表使用物化视图预计算 - CQRS 架构分离读写模型 - CDC 管道保持索引同步

设计索引时,务必: 1. 分析真实查询模式和访问频率 2. 评估读写比例和性能要求 3. 权衡一致性与性能 4. 监控索引效率,持续优化 5. 避免过度索引

记住:索引是为查询服务的,不是为了数据库的美观。每个索引都有成本,只建立真正需要的索引。

参考文献

  1. Martin Kleppmann. Designing Data-Intensive Applications. O’Reilly Media, 2017. Chapter 6: Partitioning.

  2. Amazon Web Services. Amazon DynamoDB Developer Guide: Global Secondary Indexes.

  3. Elasticsearch B.V. Elasticsearch: The Definitive Guide. O’Reilly Media, 2015. Chapter 13: Distributed Search.

  4. Rick Houlihan. Advanced Design Patterns for Amazon DynamoDB. AWS re:Invent, 2019.

  5. Pat Helland. Immutability Changes Everything. CIDR 2015.

  6. Giuseppe DeCandia et al. Dynamo: Amazon’s Highly Available Key-value Store. SOSP 2007.

  7. Apache Cassandra Project. Secondary Indexes in Cassandra.

  8. MongoDB Inc. MongoDB Manual: Sharding and Indexes.

  9. Jay Kreps. The Log: What every software engineer should know about real-time data’s unifying abstraction. LinkedIn Engineering Blog, 2013.

  10. Kleppmann, Martin and Beresford, Alastair R. A Conflict-Free Replicated JSON Datatype. IEEE Transactions on Parallel and Distributed Systems, 2017.


上一篇:范围分区 | 下一篇:数据再平衡

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13

【分布式系统百科】数据再平衡:固定分区、动态分区与 TiKV 的调度策略

在上一篇文章中,我们讨论了分布式系统中的二级索引问题。本文将深入探讨数据再平衡(Rebalancing)的核心策略和实现细节。当分布式系统运行一段时间后,数据分布可能会变得不均匀,节点可能会加入或离开集群,这时就需要再平衡机制来重新分配数据,保证系统的负载均衡和高可用性。

2026-04-13

【分布式系统百科】Dynamo 论文精读:最终一致性的工业级范本

2007 年,Amazon 在 SOSP 会议上发表了《Dynamo: Amazon's Highly Available Key-value Store》论文,这篇论文彻底改变了分布式存储系统的设计思路。与追求强一致性的传统数据库不同,Dynamo 选择了一条完全不同的道路:牺牲一致性,换取可用性和分区容错性。这个设…


By .