在分布式系统中,当数据量增长到单机无法容纳时,我们需要将数据分区(Partition)到多个节点上。然而,分区带来了一个棘手的问题:如何高效地支持非主键查询?主键决定了数据的分区位置,但实际业务中,我们经常需要根据其他字段进行查询。这就引出了分区环境下的二级索引(Secondary Index)问题——这是分布式数据库设计中最具挑战性的课题之一。
一、问题的本质:分区与查询的矛盾
1.1 主键分区的困境
当我们使用主键对数据进行分区时,通过主键查询数据非常高效——只需根据分区函数计算出数据所在的分区,然后直接访问该分区即可。但现实世界的查询需求往往更加复杂。
假设我们有一个电商系统,用户表按 user_id
进行哈希分区:
# 用户数据按 user_id 分区
class User:
user_id: str # 主键,决定分区
email: str # 需要查询
country: str # 需要查询
registration_date: datetime # 需要查询
# 哈希分区函数
def get_partition(user_id: str, num_partitions: int) -> int:
return hash(user_id) % num_partitions这种设计下,以下查询会遇到不同的性能特征:
# 查询 1:通过主键查询 - O(1) 复杂度
user = db.get_user(user_id="user_12345") # 直接定位到分区 3
# 查询 2:通过邮箱查询 - O(N) 复杂度,需要扫描所有分区
user = db.find_user_by_email(email="alice@example.com") # 需要查询所有分区
# 查询 3:按国家统计 - O(N) 复杂度
count = db.count_users_by_country(country="China") # 需要扫描所有分区对于查询 2 和查询 3,由于 email 和
country
不是分区键,系统无法确定数据在哪个分区,只能向所有分区发送查询请求。这种操作被称为散射-聚集(Scatter-Gather),代价高昂。
1.2 二级索引的必要性
为了解决这个问题,我们需要在非主键字段上建立索引。在单机数据库中,这很简单——B+ 树或哈希索引就能搞定。但在分布式环境中,二级索引的实现面临两个根本性的设计选择:
- 本地索引(Local Index):也称为文档分区索引(Document-Partitioned Index),每个分区维护自己的本地数据的索引
- 全局索引(Global Index):也称为词项分区索引(Term-Partitioned Index),索引条目按照索引键进行分区,与原始数据的分区方式独立
这两种方案在写入路径、读取路径、一致性保证、系统复杂度等方面有着截然不同的权衡。
二、本地索引:每个分区自治
2.1 设计原理
本地索引的核心思想是将索引与数据放在一起。每个分区不仅存储属于它的数据,还维护这些数据的所有索引。
分区 0: 分区 1: 分区 2:
数据: 数据: 数据:
user_1 (CN) user_2 (US) user_4 (CN)
user_7 (JP) user_5 (US) user_8 (UK)
本地索引 (country): 本地索引 (country): 本地索引 (country):
CN -> [user_1] US -> [user_2, user_5] CN -> [user_4]
JP -> [user_7] UK -> [user_8]
2.2 写入路径:简单高效
写入操作只涉及单个分区,保持了原子性:
class LocalIndexPartition:
def __init__(self, partition_id: int):
self.partition_id = partition_id
self.data = {} # user_id -> User
self.indexes = {
'email': {}, # email -> user_id
'country': {} # country -> [user_ids]
}
def write_user(self, user: User) -> None:
"""写入用户数据,同时更新本地索引"""
# 1. 检查主键冲突
if user.user_id in self.data:
old_user = self.data[user.user_id]
self._remove_from_indexes(old_user)
# 2. 写入数据
self.data[user.user_id] = user
# 3. 更新本地索引(在同一个事务中)
self._add_to_indexes(user)
def _add_to_indexes(self, user: User) -> None:
"""将用户添加到本地索引"""
# 邮箱唯一索引
self.indexes['email'][user.email] = user.user_id
# 国家多值索引
if user.country not in self.indexes['country']:
self.indexes['country'][user.country] = []
self.indexes['country'][user.country].append(user.user_id)
def _remove_from_indexes(self, user: User) -> None:
"""从本地索引中移除用户"""
if user.email in self.indexes['email']:
del self.indexes['email'][user.email]
if user.country in self.indexes['country']:
self.indexes['country'][user.country].remove(user.user_id)
if not self.indexes['country'][user.country]:
del self.indexes['country'][user.country]写入的关键优势:
- 单分区事务:数据和索引在同一个分区,可以使用本地事务保证一致性
- 低延迟:只需要一次网络往返
- 无写放大:不需要更新其他分区的索引
2.3 查询路径:散射-聚集
查询非主键字段时,由于不知道数据在哪个分区,必须向所有分区发送请求:
class DistributedDatabase:
def __init__(self, num_partitions: int):
self.partitions = [
LocalIndexPartition(i) for i in range(num_partitions)
]
def find_users_by_country(self, country: str) -> List[User]:
"""通过国家查询用户 - 需要查询所有分区"""
results = []
# 1. 散射阶段:并行查询所有分区
futures = []
for partition in self.partitions:
future = self._query_partition_async(partition, country)
futures.append(future)
# 2. 聚集阶段:收集所有结果
for future in futures:
partition_results = future.result()
results.extend(partition_results)
return results
def _query_partition_async(self, partition: LocalIndexPartition,
country: str) -> Future:
"""异步查询单个分区"""
def query():
if country not in partition.indexes['country']:
return []
user_ids = partition.indexes['country'][country]
return [partition.data[uid] for uid in user_ids]
return ThreadPoolExecutor().submit(query)查询的关键特征:
- 尾延迟问题(Tail Latency):整体查询速度取决于最慢的分区
- 网络放大:需要与所有分区通信
- 计算分散:无法在单点进行优化,如排序、聚合需要在协调节点完成
2.4 MongoDB 的本地索引实现
MongoDB 在分片(Sharding)环境中使用本地索引。每个分片维护自己的数据和索引:
// MongoDB 分片集合
db.users.createIndex({ country: 1, registration_date: -1 })
// 查询会路由到所有分片
db.users.find({ country: "China" }).sort({ registration_date: -1 }).limit(10)MongoDB 的查询路由器(mongos)会:
- 将查询发送到所有分片
- 每个分片使用本地索引执行查询
- mongos 合并结果并应用
sort和limit
优化技巧:如果查询包含分片键(Shard Key),mongos 可以只查询相关分片:
// 如果 users 按 user_id 分片,这个查询可以定向路由
db.users.find({ user_id: "user_12345", country: "China" })
// 但这个查询仍然需要散射到所有分片
db.users.find({ country: "China" })2.5 本地索引的适用场景
本地索引适合以下场景:
- 写多读少:写入性能是瓶颈,可以接受查询扫描所有分区
- 分析型查询:需要扫描大量数据的 OLAP 查询,散射-聚集可以并行化
- 强一致性要求:索引和数据在同一分区,容易保证一致性
三、全局索引:索引独立分区
3.1 设计原理
全局索引将索引数据与原始数据解耦,按照索引键对索引条目进行分区。这样,相同索引值的所有条目会集中在同一个索引分区中。
数据分区(按 user_id):
分区 0: user_1(CN), user_7(JP)
分区 1: user_2(US), user_5(US)
分区 2: user_4(CN), user_8(UK)
全局索引分区(按 country):
索引分区 A: CN -> [(partition_0, user_1), (partition_2, user_4)]
索引分区 B: JP -> [(partition_0, user_7)]
US -> [(partition_1, user_2), (partition_1, user_5)]
索引分区 C: UK -> [(partition_2, user_8)]
3.2 写入路径:跨分区更新
写入单条记录时,需要更新多个地方:
class GlobalIndexSystem:
def __init__(self, num_data_partitions: int, num_index_partitions: int):
self.data_partitions = [DataPartition(i) for i in range(num_data_partitions)]
self.index_partitions = [IndexPartition(i) for i in range(num_index_partitions)]
def write_user(self, user: User) -> None:
"""写入用户 - 需要更新数据分区和索引分区"""
# 1. 确定数据分区
data_partition_id = self._get_data_partition(user.user_id)
data_partition = self.data_partitions[data_partition_id]
# 2. 确定需要更新的索引分区
country_index_partition_id = self._get_index_partition(user.country)
email_index_partition_id = self._get_index_partition(user.email)
# 3. 使用分布式事务或异步更新
try:
# 写入数据分区
old_user = data_partition.write(user)
# 更新索引分区
self._update_country_index(
user, old_user, country_index_partition_id, data_partition_id
)
self._update_email_index(
user, old_user, email_index_partition_id, data_partition_id
)
except Exception as e:
# 回滚或记录不一致
self._handle_write_failure(user, e)
def _get_data_partition(self, user_id: str) -> int:
"""根据用户 ID 确定数据分区"""
return hash(user_id) % len(self.data_partitions)
def _get_index_partition(self, index_value: str) -> int:
"""根据索引值确定索引分区"""
return hash(index_value) % len(self.index_partitions)
def _update_country_index(self, user: User, old_user: Optional[User],
index_partition_id: int,
data_partition_id: int) -> None:
"""更新国家索引"""
index_partition = self.index_partitions[index_partition_id]
# 删除旧索引条目
if old_user and old_user.country != user.country:
old_index_partition_id = self._get_index_partition(old_user.country)
old_index_partition = self.index_partitions[old_index_partition_id]
old_index_partition.remove_entry(
'country', old_user.country, data_partition_id, user.user_id
)
# 添加新索引条目
index_partition.add_entry(
'country', user.country, data_partition_id, user.user_id
)写入的关键挑战:
- 分布式事务:需要协调多个分区的更新
- 写放大:一次写入影响多个分区
- 一致性问题:索引可能与数据不一致
索引写入路径涉及多个状态转换,任何一步失败都需要恰当处理。下图展示了索引更新的状态机:
stateDiagram-v2
[*] --> IndexCurrent: 初始状态
IndexCurrent --> WritePending: 收到数据写入请求
WritePending --> Updating: 开始更新索引条目
Updating --> IndexCurrent: 更新成功
Updating --> RetryPending: 更新失败(网络/超时)
RetryPending --> Updating: 重试更新
RetryPending --> IndexStale: 超过最大重试次数
IndexStale --> Updating: 后台修复任务触发
IndexStale --> IndexCurrent: 修复成功
上述状态机描述了全局索引写入路径的完整生命周期。正常路径下,索引从 IndexCurrent 经过 WritePending 和 Updating 后回到 IndexCurrent。异常路径中,更新失败会进入 RetryPending 状态进行有限次数的重试。如果重试耗尽,索引进入 IndexStale 状态,等待后台修复任务进行异步补偿。这种设计保证了索引最终一定会与主数据一致。
3.3 查询路径:精确定位
查询时可以直接定位到索引分区:
class GlobalIndexSystem:
def find_users_by_country(self, country: str) -> List[User]:
"""通过国家查询用户 - 只需查询一个索引分区"""
# 1. 确定索引分区
index_partition_id = self._get_index_partition(country)
index_partition = self.index_partitions[index_partition_id]
# 2. 查询索引分区,获取数据位置
index_entries = index_partition.query('country', country)
# index_entries: [(data_partition_id, user_id), ...]
# 3. 根据数据位置批量获取数据
results = []
data_requests = {} # data_partition_id -> [user_ids]
for data_partition_id, user_id in index_entries:
if data_partition_id not in data_requests:
data_requests[data_partition_id] = []
data_requests[data_partition_id].append(user_id)
# 4. 并行查询数据分区
for data_partition_id, user_ids in data_requests.items():
partition = self.data_partitions[data_partition_id]
for user_id in user_ids:
user = partition.get(user_id)
if user:
results.append(user)
return results查询的关键优势:
- 低延迟:只需查询相关的索引分区和数据分区
- 可扩展性:查询开销与结果集大小成正比,而非分区数量
3.4 全局索引的适用场景
全局索引适合以下场景:
- 读多写少:查询性能是瓶颈,可以接受写入的额外开销
- 点查询:通过索引查找少量记录,不需要全表扫描
- 低延迟要求:不能接受散射-聚集的尾延迟
四、DynamoDB GSI 深度剖析
4.1 GSI 架构设计
Amazon DynamoDB 的全局二级索引(Global Secondary Index, GSI)是全局索引的典型实现。GSI 本质上是一个独立的表,有自己的分区键和排序键。
# DynamoDB 表定义
class DynamoDBTable:
"""
主表:按 user_id 分区
GSI:按 country 分区,按 registration_date 排序
"""
table_name = "Users"
partition_key = "user_id" # 主键
# 定义 GSI
global_secondary_indexes = [
{
"index_name": "CountryDateIndex",
"partition_key": "country", # GSI 分区键
"sort_key": "registration_date", # GSI 排序键
"projection": {
"type": "INCLUDE",
"attributes": ["email", "name"] # 投影属性
},
"provisioned_throughput": {
"read_capacity_units": 100,
"write_capacity_units": 50
}
}
]4.2 异步复制机制
DynamoDB GSI 使用异步复制从主表更新到索引表:
class DynamoDBGSIReplication:
def __init__(self):
self.main_table = MainTable()
self.gsi_table = GSITable()
self.replication_stream = StreamProcessor()
def write_to_main_table(self, item: dict) -> None:
"""写入主表"""
# 1. 同步写入主表
self.main_table.put_item(item)
# 2. 发布到 Stream(类似 WAL)
change_event = {
"event_type": "INSERT",
"table": "Users",
"keys": {"user_id": item["user_id"]},
"new_image": item,
"timestamp": time.time()
}
self.replication_stream.publish(change_event)
# 3. 异步处理器消费 Stream 并更新 GSI
# (在后台线程中执行)
def process_stream_record(self, event: dict) -> None:
"""Stream 处理器:更新 GSI"""
try:
new_item = event["new_image"]
# 构建 GSI 条目
gsi_item = {
"country": new_item["country"], # GSI 分区键
"registration_date": new_item["registration_date"], # GSI 排序键
"user_id": new_item["user_id"], # 回指主表的键
# 投影属性
"email": new_item.get("email"),
"name": new_item.get("name")
}
# 写入 GSI 表
self.gsi_table.put_item(gsi_item)
except Exception as e:
# 重试机制
self._retry_with_backoff(event)4.3 最终一致性
由于异步复制,GSI 只能保证最终一致性(Eventual Consistency)。下图展示了 GSI 复制的时间线以及潜在的过期窗口(Staleness Window):
sequenceDiagram
participant Client as 客户端
participant Main as 主表分区
participant Stream as 复制流
participant GSI as GSI 分区
Client->>Main: 写入 user_12345(country=CN)
Main-->>Client: 写入成功
Main->>Stream: 产生变更事件(异步)
Note over Stream: 复制延迟窗口开始
Client->>GSI: 查询 country=CN
GSI-->>Client: 未找到(过期读取)
Stream->>GSI: 异步更新 GSI 条目
Note over Stream: 复制延迟窗口结束
Client->>GSI: 再次查询 country=CN
GSI-->>Client: 返回 user_12345
上图清晰地展示了 GSI 最终一致性的根源:主表写入成功到 GSI 更新完成之间存在一个不确定长度的延迟窗口。在此窗口内,通过 GSI 查询可能读到过期数据或完全读不到新写入的记录。DynamoDB 通常在几百毫秒内完成 GSI 复制,但这并非硬性保证。
import time
def demonstrate_gsi_eventual_consistency():
"""演示 GSI 的最终一致性"""
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Users')
# 写入新用户
table.put_item(Item={
'user_id': 'user_12345',
'country': 'China',
'email': 'alice@example.com',
'registration_date': '2026-04-13'
})
# 立即查询 GSI - 可能查不到!
response = table.query(
IndexName='CountryDateIndex',
KeyConditionExpression='country = :country',
ExpressionAttributeValues={':country': 'China'}
)
print(f"立即查询结果: {len(response['Items'])} 条") # 可能是 0
# 等待一段时间后查询
time.sleep(1)
response = table.query(
IndexName='CountryDateIndex',
KeyConditionExpression='country = :country',
ExpressionAttributeValues={':country': 'China'}
)
print(f"1秒后查询结果: {len(response['Items'])} 条") # 可能还是 0
# 通常在几百毫秒内复制完成,但没有保证4.4 GSI 限流与反压
GSI 有自己的吞吐量配额,当 GSI 更新跟不上主表写入时,会对主表施加反压(Backpressure):
class DynamoDBThrottling:
def __init__(self):
self.main_table_wcu = 1000 # 主表写容量单位
self.gsi_wcu = 100 # GSI 写容量单位
def write_with_throttling_check(self, item: dict) -> bool:
"""写入时检查 GSI 限流"""
# 计算写入此 item 需要的 WCU
item_size_kb = len(json.dumps(item).encode()) / 1024
required_wcu = max(1, int(item_size_kb))
# 检查 GSI 容量
if not self._check_gsi_capacity(required_wcu):
raise ProvisionedThroughputExceededException(
"GSI CountryDateIndex 吞吐量不足,主表写入被限流"
)
# 写入主表
self.main_table.put_item(item)
return True
def _check_gsi_capacity(self, required_wcu: int) -> bool:
"""检查 GSI 是否有足够容量"""
current_usage = self._get_gsi_current_wcu()
return current_usage + required_wcu <= self.gsi_wcu实践建议:
- GSI 的 WCU 应该与主表相同,避免成为瓶颈
- 使用按需计费(On-Demand)模式可以自动扩展
- 监控
ThrottledRequests指标
4.5 投影属性优化
GSI 的投影(Projection)决定了索引中包含哪些属性:
# 1. KEYS_ONLY - 只包含键属性(最省存储)
gsi_keys_only = {
"projection": {"type": "KEYS_ONLY"}
}
# GSI 包含:country, registration_date, user_id
# 2. INCLUDE - 包含指定属性(平衡)
gsi_include = {
"projection": {
"type": "INCLUDE",
"attributes": ["email", "name", "phone"]
}
}
# GSI 包含:country, registration_date, user_id, email, name, phone
# 3. ALL - 包含所有属性(最快查询)
gsi_all = {
"projection": {"type": "ALL"}
}
# GSI 包含:主表的所有属性权衡分析:
def compare_projection_types():
"""对比不同投影类型的性能"""
# KEYS_ONLY - 需要回表查询
def query_with_keys_only():
# 第一步:查询 GSI(1 次读取)
response = table.query(
IndexName='CountryDateIndex',
KeyConditionExpression='country = :country',
ExpressionAttributeValues={':country': 'China'}
)
# 第二步:对每个结果回表查询完整数据(N 次读取)
users = []
for item in response['Items']:
full_user = table.get_item(Key={'user_id': item['user_id']})
users.append(full_user['Item'])
return users # 总计 1 + N 次读取
# ALL - 直接返回完整数据
def query_with_all():
# 一步完成(1 次读取)
response = table.query(
IndexName='CountryDateIndex',
KeyConditionExpression='country = :country',
ExpressionAttributeValues={':country': 'China'}
)
return response['Items'] # 总计 1 次读取
# 存储成本对比
storage_keys_only = "仅键:~100 字节/项"
storage_all = "所有属性:~1KB/项"
return {
"KEYS_ONLY": {"reads": "1 + N", "storage": storage_keys_only},
"ALL": {"reads": "1", "storage": storage_all}
}五、Elasticsearch 分片与索引
5.1 Lucene 索引架构
Elasticsearch 的每个分片(Shard)本质上是一个 Lucene 索引,使用本地索引模型:
class ElasticsearchShard:
"""ES 分片 - 包含完整的 Lucene 索引"""
def __init__(self, shard_id: int):
self.shard_id = shard_id
self.lucene_index = LuceneIndex()
# Lucene 倒排索引结构
self.inverted_index = {
# term -> posting list
"china": PostingList([doc1, doc5, doc9]),
"beijing": PostingList([doc1, doc9]),
"shanghai": PostingList([doc5]),
}
# 正排索引(列存储)
self.doc_values = {
"country": ["China", "US", "China", ...],
"timestamp": [1640000000, 1640000100, ...]
}
def index_document(self, doc_id: str, document: dict) -> None:
"""索引文档到 Lucene"""
# 1. 分词
for field, value in document.items():
if isinstance(value, str):
tokens = self._analyze(value, field)
for token in tokens:
self._add_to_inverted_index(token, doc_id, field)
# 2. 存储原始文档
self.lucene_index.store_document(doc_id, document)
# 3. 构建列存储(用于聚合)
self._build_doc_values(doc_id, document)
def search(self, query: str) -> List[str]:
"""在本地分片搜索"""
# 查询本地 Lucene 索引
tokens = self._analyze(query, "content")
doc_ids = set()
for token in tokens:
if token in self.inverted_index:
doc_ids.update(self.inverted_index[token].doc_ids)
return list(doc_ids)5.2 查询的散射-聚集
ES 的查询协调节点(Coordinating Node)执行散射-聚集:
class ElasticsearchCoordinator:
def __init__(self, shards: List[ElasticsearchShard]):
self.shards = shards
def search(self, query: dict) -> dict:
"""执行分布式搜索"""
# 1. Query Phase - 散射到所有分片
shard_results = []
for shard in self.shards:
# 每个分片返回 top-N 文档 ID 和得分
result = shard.search_phase_query(query)
shard_results.append(result)
# 2. 在协调节点合并结果
merged_doc_ids = self._merge_top_results(shard_results, query['size'])
# 3. Fetch Phase - 获取完整文档
documents = []
fetch_requests = self._group_by_shard(merged_doc_ids)
for shard_id, doc_ids in fetch_requests.items():
shard = self.shards[shard_id]
docs = shard.fetch_documents(doc_ids)
documents.extend(docs)
return {
"hits": {
"total": sum(r['total'] for r in shard_results),
"hits": documents
}
}
def _merge_top_results(self, shard_results: List[dict],
size: int) -> List[tuple]:
"""合并各分片的 top 结果"""
import heapq
# 使用最小堆合并
all_results = []
for result in shard_results:
for hit in result['hits']:
all_results.append((hit['score'], hit['_id'], hit['_shard']))
# 按得分排序,取 top N
top_results = heapq.nlargest(size, all_results, key=lambda x: x[0])
return [(doc_id, shard) for score, doc_id, shard in top_results]5.3 相关性评分的挑战
在分片环境中,TF-IDF 评分会出现问题:
class RelevanceScoring:
"""演示分片环境下的 TF-IDF 问题"""
def compute_tf_idf_score(self, term: str, doc_id: str,
shard: ElasticsearchShard) -> float:
"""计算 TF-IDF 得分"""
# TF (词频)
tf = self._compute_tf(term, doc_id, shard)
# IDF (逆文档频率) - 问题在这里!
idf = self._compute_idf_local(term, shard) # 只基于本地分片
return tf * idf
def _compute_idf_local(self, term: str,
shard: ElasticsearchShard) -> float:
"""基于本地分片计算 IDF - 不准确!"""
# IDF = log(N / df)
# N = 分片中的文档总数(不是全局总数)
# df = 包含该词的文档数(不是全局数量)
num_docs = shard.doc_count() # 仅本地
doc_freq = len(shard.inverted_index.get(term, [])) # 仅本地
import math
return math.log((num_docs + 1) / (doc_freq + 1))
def _compute_idf_global(self, term: str,
all_shards: List[ElasticsearchShard]) -> float:
"""基于全局统计计算 IDF - 准确但慢"""
total_docs = sum(s.doc_count() for s in all_shards)
total_doc_freq = sum(
len(s.inverted_index.get(term, [])) for s in all_shards
)
import math
return math.log((total_docs + 1) / (total_doc_freq + 1))问题场景:
# 场景:某个词在不同分片的分布不均匀
# 分片 0:文档 1-1000,其中 500 个包含 "python"
# 分片 1:文档 1001-2000,其中 10 个包含 "python"
# 在分片 1 中,"python" 的 IDF 很高(罕见词)
# 在分片 0 中,"python" 的 IDF 很低(常见词)
# 结果:相同的查询在不同分片得到不同的得分
# 可能导致排序不准确5.4 DFS Query Then Fetch
为了解决相关性评分问题,ES 提供了
dfs_query_then_fetch 模式:
class ElasticsearchDFS:
def search_with_dfs(self, query: dict) -> dict:
"""使用 DFS 模式搜索"""
# 1. DFS Phase - 收集全局词频统计
global_stats = {}
for shard in self.shards:
shard_stats = shard.get_term_statistics(query)
self._merge_statistics(global_stats, shard_stats)
# 2. Query Phase - 使用全局统计计算得分
shard_results = []
for shard in self.shards:
result = shard.search_with_global_stats(query, global_stats)
shard_results.append(result)
# 3. Fetch Phase
return self._fetch_and_merge(shard_results, query['size'])
def _merge_statistics(self, global_stats: dict,
shard_stats: dict) -> None:
"""合并词频统计"""
for term, stats in shard_stats.items():
if term not in global_stats:
global_stats[term] = {'doc_freq': 0, 'total_term_freq': 0}
global_stats[term]['doc_freq'] += stats['doc_freq']
global_stats[term]['total_term_freq'] += stats['total_term_freq']权衡:
query_then_fetch(默认):快,但评分可能不准确dfs_query_then_fetch:准确,但增加一轮网络往返
在实践中,如果数据分布均匀,默认模式通常足够好。
六、物化视图:预计算的查询结果
6.1 物化视图的概念
物化视图(Materialized View)是另一种支持复杂查询的方式——预先计算并存储查询结果。
-- 传统视图(虚拟视图)- 每次查询时计算
CREATE VIEW user_country_stats AS
SELECT country, COUNT(*) as user_count, AVG(age) as avg_age
FROM users
GROUP BY country;
-- 物化视图 - 结果持久化存储
CREATE MATERIALIZED VIEW user_country_stats_mv AS
SELECT country, COUNT(*) as user_count, AVG(age) as avg_age
FROM users
GROUP BY country;6.2 物化视图维护策略
物化视图的核心挑战是保持与基表同步:
class MaterializedView:
"""物化视图实现"""
def __init__(self, base_table: str, view_query: str):
self.base_table = base_table
self.view_query = view_query
self.materialized_data = {}
self.refresh_strategy = "INCREMENTAL"
def refresh_full(self) -> None:
"""完全刷新 - 重新计算整个视图"""
print("执行完全刷新...")
# 删除旧数据
self.materialized_data.clear()
# 重新执行查询
result = self._execute_query(self.view_query)
self.materialized_data = result
def refresh_incremental(self, change_log: List[dict]) -> None:
"""增量刷新 - 只更新变化的部分"""
print(f"处理 {len(change_log)} 条变更...")
for change in change_log:
if change['operation'] == 'INSERT':
self._apply_insert(change['new_row'])
elif change['operation'] == 'UPDATE':
self._apply_update(change['old_row'], change['new_row'])
elif change['operation'] == 'DELETE':
self._apply_delete(change['old_row'])
def _apply_insert(self, new_row: dict) -> None:
"""应用插入操作到物化视图"""
# 例:视图是按国家统计用户数
country = new_row['country']
if country not in self.materialized_data:
self.materialized_data[country] = {
'user_count': 0,
'total_age': 0
}
self.materialized_data[country]['user_count'] += 1
self.materialized_data[country]['total_age'] += new_row['age']
def _apply_update(self, old_row: dict, new_row: dict) -> None:
"""应用更新操作"""
# 先删除旧值的影响
self._apply_delete(old_row)
# 再添加新值
self._apply_insert(new_row)
def _apply_delete(self, old_row: dict) -> None:
"""应用删除操作"""
country = old_row['country']
self.materialized_data[country]['user_count'] -= 1
self.materialized_data[country]['total_age'] -= old_row['age']6.3 刷新策略对比
class MVRefreshStrategies:
"""物化视图刷新策略对比"""
def strategy_eager_sync(self, base_table_write: dict) -> None:
"""策略 1:同步立即刷新(强一致性)"""
# 在同一个事务中更新基表和物化视图
with transaction():
base_table.write(base_table_write)
materialized_view.refresh_incremental([base_table_write])
# 优点:物化视图始终是最新的
# 缺点:写入延迟高,基表写入被阻塞
def strategy_async_near_realtime(self) -> None:
"""策略 2:异步准实时刷新(最终一致性)"""
# 使用 CDC(变更数据捕获)异步更新
change_stream = ChangeDataCapture(base_table)
for change in change_stream.subscribe():
# 在后台线程异步处理
async_executor.submit(
lambda: materialized_view.refresh_incremental([change])
)
# 优点:不阻塞写入,写入性能好
# 缺点:物化视图可能落后几秒钟
def strategy_periodic_batch(self) -> None:
"""策略 3:定期批量刷新"""
# 每隔一段时间完全重建
schedule.every(1).hours.do(
lambda: materialized_view.refresh_full()
)
# 优点:简单,适合对实时性要求不高的场景
# 缺点:刷新期间资源消耗大,数据可能非常陈旧6.4 写放大与存储成本
物化视图带来显著的写放大:
def analyze_write_amplification():
"""分析物化视图的写放大"""
# 基表
base_table_size = 1_000_000 # 100 万用户
# 物化视图
materialized_views = [
"user_by_country", # 按国家统计
"user_by_age_group", # 按年龄段统计
"user_by_registration_year", # 按注册年份统计
"user_email_index", # 邮箱索引
"user_country_city_stats" # 国家-城市二维统计
]
# 写放大分析
def write_user(user: dict):
# 1. 写入基表 - 1 次写
base_table.insert(user)
# 2. 更新 5 个物化视图 - 5 次写
for mv in materialized_views:
mv.refresh_incremental(user)
# 总计:6 次写入,写放大 = 6x
# 存储放大分析
base_storage = base_table_size * 1024 # 1KB per user
mv_storage = len(materialized_views) * base_table_size * 512 # 每个 MV 平均 0.5KB
total_storage = base_storage + mv_storage
storage_amplification = total_storage / base_storage
print(f"写放大系数: {len(materialized_views) + 1}x")
print(f"存储放大系数: {storage_amplification:.2f}x")
return {
"write_amplification": len(materialized_views) + 1,
"storage_amplification": storage_amplification
}七、权衡分析:写放大 vs 读放大
7.1 性能特征对比
from dataclasses import dataclass
from typing import Optional
@dataclass
class IndexStrategy:
name: str
write_latency: str
read_latency: str
write_amplification: int
read_amplification: int
consistency: str
complexity: str
def compare_strategies() -> dict:
"""对比三种索引策略"""
strategies = {
"本地索引": IndexStrategy(
name="Local Index (Document-Partitioned)",
write_latency="低(单分区写入)",
read_latency="高(散射-聚集)",
write_amplification=1, # 只写入一个分区
read_amplification=100, # 假设 100 个分区
consistency="强一致性",
complexity="低"
),
"全局索引": IndexStrategy(
name="Global Index (Term-Partitioned)",
write_latency="中(需要更新索引分区)",
read_latency="低(精确定位)",
write_amplification=2, # 写入数据分区 + 索引分区
read_amplification=2, # 查询索引分区 + 数据分区
consistency="最终一致性(如果异步)",
complexity="高(需要处理分布式一致性)"
),
"物化视图": IndexStrategy(
name="Materialized View",
write_latency="高(更新多个物化视图)",
read_latency="极低(预计算结果)",
write_amplification=5, # 假设 5 个物化视图
read_amplification=1, # 直接查询物化视图
consistency="取决于刷新策略",
complexity="高(需要维护同步逻辑)"
)
}
return strategies
# 决策树
def choose_strategy(workload: dict) -> str:
"""根据工作负载选择策略"""
read_write_ratio = workload['reads'] / workload['writes']
latency_requirement = workload['max_latency_ms']
consistency_requirement = workload['consistency']
if read_write_ratio > 100:
# 读多写少
if latency_requirement < 50:
return "物化视图(预计算结果,查询极快)"
else:
return "全局索引(平衡读写性能)"
elif read_write_ratio < 10:
# 写多读少
return "本地索引(写入快,读取可以慢)"
else:
# 读写平衡
if consistency_requirement == "strong":
return "本地索引(容易保证一致性)"
else:
return "全局索引(读写都不错)"7.2 数学模型分析
import math
class IndexCostModel:
"""索引成本模型"""
def __init__(self, num_partitions: int, num_indexes: int):
self.P = num_partitions # 分区数量
self.I = num_indexes # 索引数量
def local_index_cost(self, writes_per_sec: float,
reads_per_sec: float) -> dict:
"""本地索引成本"""
# 写入成本:O(1) - 只写入一个分区
write_cost = writes_per_sec * 1
# 读取成本:O(P) - 扇出到所有分区
read_cost = reads_per_sec * self.P
# 网络开销
network_roundtrips = reads_per_sec * 1 # 并行查询,1 个 RTT
return {
"write_cost": write_cost,
"read_cost": read_cost,
"total_cost": write_cost + read_cost,
"network_roundtrips": network_roundtrips
}
def global_index_cost(self, writes_per_sec: float,
reads_per_sec: float,
async_replication: bool = True) -> dict:
"""全局索引成本"""
if async_replication:
# 异步复制:写入成本低
write_cost = writes_per_sec * 1 # 只写主表
write_async_cost = writes_per_sec * self.I # 后台异步更新索引
else:
# 同步更新:写入成本高
write_cost = writes_per_sec * (1 + self.I)
write_async_cost = 0
# 读取成本:O(1) - 精确定位
# 平均每个查询需要访问 1 个索引分区 + K 个数据分区
avg_data_partitions = 2 # 假设平均返回 2 个分区的数据
read_cost = reads_per_sec * (1 + avg_data_partitions)
# 网络开销
network_roundtrips = reads_per_sec * 2 # 索引查询 + 数据获取
return {
"write_cost": write_cost,
"write_async_cost": write_async_cost,
"read_cost": read_cost,
"total_cost": write_cost + write_async_cost + read_cost,
"network_roundtrips": network_roundtrips
}
def compare(self, writes_per_sec: float, reads_per_sec: float) -> None:
"""对比两种策略"""
local = self.local_index_cost(writes_per_sec, reads_per_sec)
global_async = self.global_index_cost(writes_per_sec, reads_per_sec,
async_replication=True)
print(f"分区数: {self.P}, 索引数: {self.I}")
print(f"工作负载: {writes_per_sec} 写/秒, {reads_per_sec} 读/秒")
print(f"\n本地索引总成本: {local['total_cost']:.2f}")
print(f" - 写入: {local['write_cost']:.2f}")
print(f" - 读取: {local['read_cost']:.2f}")
print(f"\n全局索引总成本: {global_async['total_cost']:.2f}")
print(f" - 写入(同步): {global_async['write_cost']:.2f}")
print(f" - 写入(异步): {global_async['write_async_cost']:.2f}")
print(f" - 读取: {global_async['read_cost']:.2f}")
if local['total_cost'] < global_async['total_cost']:
print(f"\n推荐: 本地索引(成本低 {(1 - global_async['total_cost']/local['total_cost'])*100:.1f}%)")
else:
print(f"\n推荐: 全局索引(成本低 {(1 - local['total_cost']/global_async['total_cost'])*100:.1f}%)")
# 示例分析
model = IndexCostModel(num_partitions=100, num_indexes=3)
print("场景 1: 写多读少")
model.compare(writes_per_sec=1000, reads_per_sec=100)
print("\n" + "="*50 + "\n")
print("场景 2: 读多写少")
model.compare(writes_per_sec=100, reads_per_sec=1000)八、实践模式:CQRS 与 CDC
8.1 CQRS 架构
命令查询职责分离(Command Query Responsibility Segregation, CQRS)将读写模型分离:
class CQRSSystem:
"""CQRS 架构实现"""
def __init__(self):
# 写模型:优化写入性能
self.write_model = CommandModel()
# 读模型:优化查询性能
self.read_models = {
'user_by_email': ReadModelByEmail(),
'user_by_country': ReadModelByCountry(),
'user_stats': ReadModelStats()
}
# 事件流:连接写模型和读模型
self.event_store = EventStore()
def handle_command(self, command: dict) -> None:
"""处理写命令"""
if command['type'] == 'CreateUser':
# 1. 验证命令
self._validate_create_user(command['data'])
# 2. 写入写模型(简单、快速)
user_id = self.write_model.create_user(command['data'])
# 3. 发布事件到事件流
event = {
'type': 'UserCreated',
'aggregate_id': user_id,
'data': command['data'],
'timestamp': time.time(),
'version': 1
}
self.event_store.append(event)
# 4. 异步传播到读模型(不阻塞写入)
self._publish_event_async(event)
def handle_query(self, query: dict) -> dict:
"""处理读查询"""
if query['type'] == 'FindUserByEmail':
# 直接查询优化的读模型
return self.read_models['user_by_email'].find(query['email'])
elif query['type'] == 'GetUsersByCountry':
return self.read_models['user_by_country'].find(query['country'])
elif query['type'] == 'GetUserStats':
return self.read_models['user_stats'].get_stats()
def _publish_event_async(self, event: dict) -> None:
"""异步传播事件到读模型"""
for read_model in self.read_models.values():
# 在后台线程中更新读模型
threading.Thread(
target=read_model.apply_event,
args=(event,)
).start()
class ReadModelByEmail:
"""针对邮箱查询优化的读模型"""
def __init__(self):
# 使用哈希索引
self.email_index = {} # email -> user_data
def apply_event(self, event: dict) -> None:
"""应用事件更新读模型"""
if event['type'] == 'UserCreated':
user_data = event['data']
self.email_index[user_data['email']] = user_data
elif event['type'] == 'UserUpdated':
old_email = event['old_data']['email']
new_email = event['data']['email']
if old_email != new_email:
del self.email_index[old_email]
self.email_index[new_email] = event['data']
else:
self.email_index[new_email] = event['data']
def find(self, email: str) -> Optional[dict]:
"""O(1) 查询"""
return self.email_index.get(email)
class ReadModelByCountry:
"""针对国家查询优化的读模型"""
def __init__(self):
# 使用倒排索引
self.country_index = {} # country -> [user_data]
def apply_event(self, event: dict) -> None:
if event['type'] == 'UserCreated':
user_data = event['data']
country = user_data['country']
if country not in self.country_index:
self.country_index[country] = []
self.country_index[country].append(user_data)
def find(self, country: str) -> List[dict]:
"""O(1) 查询"""
return self.country_index.get(country, [])8.2 变更数据捕获(CDC)
CDC 从数据库的事务日志中捕获变更,用于同步索引和物化视图:
class ChangeDataCapture:
"""CDC 实现 - 从 WAL 捕获变更"""
def __init__(self, database: str):
self.database = database
self.wal_reader = WALReader(database)
self.subscribers = []
def subscribe(self, callback: callable) -> None:
"""订阅变更事件"""
self.subscribers.append(callback)
def start(self) -> None:
"""启动 CDC 进程"""
print("启动 CDC,监听 WAL...")
# 从上次检查点继续
last_lsn = self._load_checkpoint()
for wal_entry in self.wal_reader.read_from(last_lsn):
# 解析 WAL 条目
change_event = self._parse_wal_entry(wal_entry)
if change_event:
# 通知所有订阅者
for subscriber in self.subscribers:
try:
subscriber(change_event)
except Exception as e:
print(f"订阅者处理失败: {e}")
# 保存检查点
self._save_checkpoint(wal_entry.lsn)
def _parse_wal_entry(self, wal_entry: WALEntry) -> Optional[dict]:
"""解析 WAL 条目为变更事件"""
if wal_entry.type == WALEntryType.INSERT:
return {
'operation': 'INSERT',
'table': wal_entry.table,
'new_row': wal_entry.new_tuple,
'timestamp': wal_entry.timestamp,
'lsn': wal_entry.lsn
}
elif wal_entry.type == WALEntryType.UPDATE:
return {
'operation': 'UPDATE',
'table': wal_entry.table,
'old_row': wal_entry.old_tuple,
'new_row': wal_entry.new_tuple,
'timestamp': wal_entry.timestamp,
'lsn': wal_entry.lsn
}
elif wal_entry.type == WALEntryType.DELETE:
return {
'operation': 'DELETE',
'table': wal_entry.table,
'old_row': wal_entry.old_tuple,
'timestamp': wal_entry.timestamp,
'lsn': wal_entry.lsn
}
return None
# 使用 CDC 维护全局索引
class CDCBasedGlobalIndex:
"""基于 CDC 的全局索引"""
def __init__(self):
self.index_partitions = [IndexPartition(i) for i in range(10)]
self.cdc = ChangeDataCapture('users_db')
# 订阅 CDC 事件
self.cdc.subscribe(self.handle_change_event)
def handle_change_event(self, event: dict) -> None:
"""处理变更事件,更新索引"""
if event['table'] != 'users':
return
if event['operation'] == 'INSERT':
self._index_new_user(event['new_row'])
elif event['operation'] == 'UPDATE':
self._update_user_index(event['old_row'], event['new_row'])
elif event['operation'] == 'DELETE':
self._remove_user_index(event['old_row'])
def _index_new_user(self, user: dict) -> None:
"""索引新用户"""
# 根据 country 确定索引分区
partition_id = hash(user['country']) % len(self.index_partitions)
partition = self.index_partitions[partition_id]
partition.add_entry(
index_name='country',
index_value=user['country'],
doc_ref=(user['user_id'], user)
)8.3 Debezium 实战示例
Debezium 是流行的开源 CDC 平台:
# Debezium 配置示例
debezium_config = {
"name": "users-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "secret",
"database.dbname": "users_db",
"database.server.name": "users",
"table.include.list": "public.users",
"plugin.name": "pgoutput", # PostgreSQL 逻辑复制插件
"slot.name": "debezium_users"
}
}
# 消费 Debezium 事件
from kafka import KafkaConsumer
import json
def consume_debezium_events():
"""从 Kafka 消费 Debezium CDC 事件"""
consumer = KafkaConsumer(
'users.public.users', # Debezium topic
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
event = message.value
# Debezium 事件结构
operation = event['op'] # c=create, u=update, d=delete
if operation == 'c': # Create
new_user = event['after']
global_index.index_new_user(new_user)
elif operation == 'u': # Update
old_user = event['before']
new_user = event['after']
global_index.update_user_index(old_user, new_user)
elif operation == 'd': # Delete
old_user = event['before']
global_index.remove_user_index(old_user)九、生产环境的实践建议
9.1 索引设计清单
class IndexDesignChecklist:
"""索引设计清单"""
@staticmethod
def evaluate_index_design(requirements: dict) -> dict:
"""评估索引设计"""
questions = {
"查询模式": [
"主要查询是点查询还是范围查询?",
"查询的选择性如何?(返回多少行)",
"是否需要排序、聚合?",
"查询 QPS 是多少?"
],
"写入模式": [
"写入 QPS 是多少?",
"写入是否有突发(burst)?",
"能否接受异步索引更新?",
"写入延迟预算是多少?"
],
"一致性": [
"读需要强一致性还是最终一致性?",
"索引可以落后主数据多久?",
"不一致的窗口期对业务的影响?"
],
"扩展性": [
"预期数据量增长速度?",
"索引是否支持在线扩容?",
"每个索引的基数(cardinality)?"
],
"成本": [
"存储成本预算?",
"计算资源预算?",
"运维复杂度接受度?"
]
}
return questions
### 9.2 常见反模式
```python
class AntiPatterns:
"""索引设计反模式"""
@staticmethod
def antipattern_1_too_many_indexes():
"""反模式 1: 索引过多"""
print("问题:为每个字段都建索引")
print("后果:")
print(" - 写入性能严重下降")
print(" - 存储成本翻倍甚至更高")
print(" - 索引维护成本高")
print("\n建议:只为频繁查询的字段建索引")
print(" - 分析查询日志,找到热点查询")
print(" - 使用复合索引覆盖多个查询")
print(" - 定期审查和删除无用索引")
@staticmethod
def antipattern_2_global_index_with_sync_update():
"""反模式 2: 全局索引使用同步更新"""
print("问题:全局索引每次写入都同步更新所有索引分区")
print("后果:")
print(" - 写入延迟非常高")
print(" - 分布式事务复杂,容易失败")
print(" - 系统可用性下降")
print("\n建议:使用异步复制")
print(" - 接受最终一致性")
print(" - 使用 CDC 或事件流")
print(" - 在应用层处理不一致窗口期")
@staticmethod
def antipattern_3_scatter_gather_without_pruning():
"""反模式 3: 散射-聚集时不做分区裁剪"""
print("问题:本地索引查询时无脑扇出到所有分区")
print("后果:")
print(" - 查询延迟高")
print(" - 浪费网络和 CPU 资源")
print("\n建议:尽量包含分区键进行分区裁剪")
print("示例:")
print(" 差:SELECT * FROM users WHERE country = 'China'")
print(" 好:SELECT * FROM users WHERE user_id_prefix = '86' AND country = 'China'")
print(" (假设 user_id 按前缀分区)")
@staticmethod
def antipattern_4_hot_partition_in_global_index():
"""反模式 4: 全局索引的热点分区"""
print("问题:索引键分布不均匀,导致某些索引分区过热")
print("示例:按国家分区,中国、美国的索引分区远大于其他")
print("后果:")
print(" - 热点分区成为瓶颈")
print(" - 负载不均衡")
print("\n建议:")
print(" - 使用复合分区键:(country, user_id_hash)")
print(" - 对热点键进行拆分")
print(" - 监控分区大小,动态再平衡")9.3 监控指标
class IndexMonitoring:
"""索引监控指标"""
def collect_metrics(self) -> dict:
"""收集关键指标"""
return {
"写入指标": {
"index_write_latency_p99": "索引写入 P99 延迟",
"index_write_throughput": "索引写入吞吐量",
"index_update_failures": "索引更新失败次数",
"index_replication_lag": "索引复制延迟(秒)"
},
"查询指标": {
"index_query_latency_p99": "索引查询 P99 延迟",
"index_hit_rate": "索引命中率",
"scatter_gather_fanout": "散射-聚集扇出数",
"index_scan_rows": "索引扫描行数"
},
"存储指标": {
"index_size_bytes": "索引大小",
"index_data_ratio": "索引/数据比例",
"index_fragmentation": "索引碎片率"
},
"一致性指标": {
"index_data_inconsistency_count": "索引数据不一致数量",
"stale_index_reads": "读取到陈旧索引的次数",
"index_repair_operations": "索引修复操作次数"
}
}
def set_alerts(self) -> dict:
"""设置告警阈值"""
return {
"index_replication_lag > 10s": "告警:索引复制延迟过高",
"index_update_failures > 100/min": "告警:索引更新失败率高",
"index_data_ratio > 2.0": "告警:索引占用空间过大",
"scatter_gather_fanout > 100": "告警:查询扇出过多"
}十、本地索引与全局索引的延迟/一致性权衡
在实际系统中,本地索引和全局索引的选择本质上是写入延迟、查询延迟、一致性保证三者之间的权衡。本节从工程实践角度深入分析这组权衡关系。
10.1 写入路径延迟对比
| 操作 | 本地索引 | 全局索引(同步) | 全局索引(异步) |
|---|---|---|---|
| 数据写入 | 单分区本地写 | 跨分区分布式事务 | 单分区本地写 |
| 索引更新 | 与数据在同一分区,原子操作 | 需要 2PC 或类似协议 | 异步队列推送 |
| 写入延迟 | 低(1-5ms) | 高(10-50ms,取决于参与者数量) | 低(1-5ms) |
| 一致性保证 | 强一致:数据和索引原子更新 | 强一致:分布式事务保证 | 最终一致:存在延迟窗口 |
同步全局索引的延迟开销来源:每次写入需要跨分区协调。假设数据分区和索引分区在不同节点,一次写入至少需要一个额外的跨节点 RTT(Round-Trip Time)。如果使用 2PC 协调,则需要两个额外 RTT。对于写入密集型工作负载,这个开销可能成为瓶颈。
异步全局索引的一致性窗口:DynamoDB GSI 的复制延迟通常在几百毫秒以内,但在主表写入突增或 GSI 限流时,延迟可能达到秒级甚至更长。应用层必须容忍这种不一致,或者通过”读主表确认”的方式进行补偿查询。
10.2 查询路径延迟对比
| 查询类型 | 本地索引 | 全局索引 |
|---|---|---|
| 按索引键点查 | 必须散射-聚集所有分区(P 个 RPC) | 精确定位 1-2 个分区(1-2 个 RPC) |
| 按索引键范围查询 | 散射-聚集,结果需合并排序 | 索引分区内有序扫描 |
| 尾延迟(P99) | 受最慢分区影响,随分区数线性增长 | 基本稳定,不受分区数影响 |
当分区数量增长到数百甚至数千时,本地索引的散射-聚集查询延迟会显著恶化。全局索引的查询延迟则相对稳定,这是大规模系统倾向于使用全局索引的核心原因。
10.3 一致性级别的实际影响
在生产系统中,一致性要求往往因业务场景而异:
- 金融交易类:必须强一致。用户转账后立即查询余额,必须看到最新值。此场景适合本地索引或同步全局索引。
- 社交信息流:可接受最终一致。用户发布内容后,其他用户的信息流在几秒内更新即可。此场景适合异步全局索引。
- 电商库存查询:可接受短暂不一致,但需要在关键操作(如下单)时进行强一致校验。此场景适合异步全局索引 + 主表回查确认。
十一、再平衡对二级索引的影响
当主数据发生再平衡(Rebalancing)时,二级索引面临的挑战因索引类型不同而差异巨大。
11.1 本地索引的再平衡
本地索引与数据绑定在同一分区,因此数据迁移时索引必须随数据一起迁移。核心影响:
- 迁移数据量翻倍:每迁移一个分区的数据,必须同时迁移该分区的所有本地索引。如果有 N 个本地索引,实际迁移的数据量约为原始数据的 (1 + N) 倍。
- 索引重建时机:在目标节点上,可以选择直接迁移索引文件(快,但需要格式兼容),或者只迁移数据后在目标节点重建索引(慢,但更安全)。
- 过期读取窗口:迁移过程中,如果客户端缓存了旧的路由信息,可能从旧分区读到过期的索引数据。需要 Epoch 机制或版本号来检测和重试。
11.2 全局索引的再平衡
全局索引独立于数据分区,主数据再平衡对全局索引的影响更为复杂:
- 索引条目指向失效:全局索引中存储的是”数据所在分区 ID + 主键”的指针。当数据从分区 A 迁移到分区 B 后,索引中的指针仍然指向分区 A,导致查询失败或需要二次重定向。
- 索引重建成本:最简单的修复方式是全量重建全局索引,但对于大规模数据集,重建可能需要数小时甚至数天。增量修复(只更新受影响的索引条目)更高效,但实现复杂。
- 过渡期的过期读取:在索引更新完成之前,通过全局索引查询可能返回过期结果(指向旧分区)或缺失结果(新分区的数据尚未被索引)。
11.3 减轻再平衡影响的策略
| 策略 | 适用索引类型 | 描述 |
|---|---|---|
| 索引与数据共迁移 | 本地索引 | 将索引文件作为数据迁移的一部分,保证原子性 |
| 双写过渡期 | 全局索引 | 迁移期间同时更新新旧分区的索引条目,完成后清理旧条目 |
| 惰性修复 | 全局索引 | 查询时发现指针失效后异步修复索引条目,避免全量重建 |
| 后台全量校验 | 两者 | 再平衡完成后启动后台任务,逐条对比数据与索引,修复不一致 |
十二、总结
分区环境下的二级索引是分布式系统中最复杂的问题之一,没有银弹解决方案,只有根据具体场景的权衡:
本地索引: - ✅ 写入简单、快速、强一致 - ✅ 索引与数据同分区,易于管理 - ❌ 查询需要散射-聚集,延迟高 - 适用:写多读少、分析型查询、强一致性要求
全局索引: - ✅ 查询精确定位,延迟低 - ✅ 支持高效的点查询 - ❌ 写入涉及多个分区,复杂度高 - ❌ 通常是最终一致性 - 适用:读多写少、点查询、可接受最终一致性
物化视图: - ✅ 查询极快(预计算) - ✅ 支持复杂聚合 - ❌ 写放大严重 - ❌ 存储开销大 - 适用:读写比极高、固定查询模式
在实践中,这三种方案往往组合使用: - 主表使用本地索引支持强一致查询 - 热点查询字段使用全局索引加速 - 复杂报表使用物化视图预计算 - CQRS 架构分离读写模型 - CDC 管道保持索引同步
设计索引时,务必: 1. 分析真实查询模式和访问频率 2. 评估读写比例和性能要求 3. 权衡一致性与性能 4. 监控索引效率,持续优化 5. 避免过度索引
记住:索引是为查询服务的,不是为了数据库的美观。每个索引都有成本,只建立真正需要的索引。
参考文献
Martin Kleppmann. Designing Data-Intensive Applications. O’Reilly Media, 2017. Chapter 6: Partitioning.
Amazon Web Services. Amazon DynamoDB Developer Guide: Global Secondary Indexes.
Elasticsearch B.V. Elasticsearch: The Definitive Guide. O’Reilly Media, 2015. Chapter 13: Distributed Search.
Rick Houlihan. Advanced Design Patterns for Amazon DynamoDB. AWS re:Invent, 2019.
Pat Helland. Immutability Changes Everything. CIDR 2015.
Giuseppe DeCandia et al. Dynamo: Amazon’s Highly Available Key-value Store. SOSP 2007.
Apache Cassandra Project. Secondary Indexes in Cassandra.
MongoDB Inc. MongoDB Manual: Sharding and Indexes.
Jay Kreps. The Log: What every software engineer should know about real-time data’s unifying abstraction. LinkedIn Engineering Blog, 2013.
Kleppmann, Martin and Beresford, Alastair R. A Conflict-Free Replicated JSON Datatype. IEEE Transactions on Parallel and Distributed Systems, 2017.
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【分布式系统百科】数据再平衡:固定分区、动态分区与 TiKV 的调度策略
在上一篇文章中,我们讨论了分布式系统中的二级索引问题。本文将深入探讨数据再平衡(Rebalancing)的核心策略和实现细节。当分布式系统运行一段时间后,数据分布可能会变得不均匀,节点可能会加入或离开集群,这时就需要再平衡机制来重新分配数据,保证系统的负载均衡和高可用性。
【系统架构设计百科】搜索引擎架构:倒排索引之上的系统设计
一个生产级搜索系统除了倒排索引,还需要哪些架构组件?本文分析分布式索引的分片策略、相关性排序的工程实现,深入 Elasticsearch 的集群架构与运维陷阱。
【分布式系统百科】Dynamo 论文精读:最终一致性的工业级范本
2007 年,Amazon 在 SOSP 会议上发表了《Dynamo: Amazon's Highly Available Key-value Store》论文,这篇论文彻底改变了分布式存储系统的设计思路。与追求强一致性的传统数据库不同,Dynamo 选择了一条完全不同的道路:牺牲一致性,换取可用性和分区容错性。这个设…
【分布式系统百科】大规模故障复盘:从真实事故中学习分布式系统设计
精选 8 个真实大规模分布式系统故障案例,逐一分析根因、传播路径、恢复过程与事后改进,提炼分布式系统可靠性设计的共性教训。