凌晨三点,某电商平台的 DBA 在公司群里发了一条消息:“开始执行 ALTER TABLE orders ADD COLUMN shipping_address_id BIGINT,预计锁表 40 分钟,期间订单服务不可用。”40 分钟后,表锁释放,主从复制延迟从 0 秒飙到 12 分钟,从库的读请求全部返回过期数据。凌晨四点半,复制追上了,DBA 发了第二条消息:“迁移完成,可以恢复流量。”
这是 2015 年之前大多数团队做数据库 schema 迁移的方式——挑一个流量最低的时间窗口停机维护。当业务规模还小的时候,凌晨三点停半个小时没人在意。但当你的系统服务全球用户、每秒处理上万笔交易、任何停机都意味着真金白银的损失时,“找个没人的时间窗口”这个前提就不存在了。
在线数据迁移(Online Data Migration)要解决的核心问题是:在系统持续对外提供服务的同时,完成数据库 schema 变更、数据格式升级、甚至底层存储引擎的更换。这件事的难度远超大多数工程师的直觉——它不只是一个 SQL 语句的问题,而是一个涉及并发控制、数据一致性、回滚安全性的系统工程问题。
在 上一篇 中我们讨论了时序数据的存储与查询架构,本文聚焦另一个工程现实:当数据模型需要演进时,如何在不停机的前提下安全地完成迁移。
一、为什么在线迁移如此困难
要理解在线迁移方案为什么设计得这么复杂,先要搞清楚传统迁移方式的三个致命问题。
1.1 锁表:ALTER TABLE 的隐藏代价
MySQL 的 InnoDB 引擎在执行 DDL(Data Definition
Language)语句时,不同操作对表的锁定行为差异很大。以 MySQL
5.6 之前的版本为例,ALTER TABLE ... ADD COLUMN
的执行流程如下:
- 获取表的元数据锁(Metadata Lock,MDL)。
- 创建一张临时表,按照新的 schema 结构建表。
- 逐行拷贝原表数据到临时表。
- 交换原表和临时表的名字。
- 删除旧表。
在步骤 3 期间,整张表处于只读或完全锁定状态,所有写入操作被阻塞。如果表有 1 亿行数据,拷贝过程可能持续几十分钟甚至数小时。这期间所有对该表的 INSERT、UPDATE、DELETE 操作全部排队等待。
MySQL 5.6 引入了在线 DDL(Online DDL),允许部分 ALTER TABLE 操作在不锁表的情况下执行。但”在线”并不意味着完全无影响:
-- MySQL 5.6+ 的在线 DDL 语法
ALTER TABLE orders ADD COLUMN shipping_address_id BIGINT, ALGORITHM=INPLACE, LOCK=NONE;即使使用
ALGORITHM=INPLACE, LOCK=NONE,以下问题仍然存在:
- 并非所有 DDL 操作都支持 INPLACE 算法。修改列类型、删除主键等操作仍然需要 COPY 算法,必须锁表。
- 在线 DDL 执行期间会产生大量的行日志(Row Log),记录执行过程中的并发写入,执行结束时需要回放这些日志。如果并发写入量很大,回放时间也会很长。
- 执行期间会占用大量的磁盘 I/O 和 CPU 资源,影响正常业务查询的性能。
PostgreSQL
的情况稍好一些。ALTER TABLE ... ADD COLUMN
如果新列有默认值且版本在 11 以上,只需要修改系统目录(System
Catalog),不需要重写整张表,基本可以做到瞬间完成。但
ALTER TABLE ... ALTER COLUMN TYPE
仍然需要重写全表。
1.2 长事务与复制延迟
在主从复制架构(Primary-Replica Replication)中,DDL 语句的执行会被记录到二进制日志(Binlog)中,然后传播到从库执行。问题在于:从库是单线程回放 Binlog 的(MySQL 5.7 之前),DDL 在主库执行了 30 分钟,从库也需要 30 分钟来回放。
在这 30 分钟内,从库的复制位点停滞不前,所有后续的 DML(Data Manipulation Language)操作都在排队。对于依赖从库做读分离的应用来说,这意味着 30 分钟的数据不一致。
更危险的情况是 DDL 导致主从切换失败。如果主库在执行 DDL 期间宕机,切换到从库时从库还在回放 DDL,无法立即接管写入流量。这种场景在生产环境中并不罕见。
时间线示意:
主库: |----- ALTER TABLE (30min) -----|--> 正常写入
| |
Binlog: DDL event DML events
| |
从库: |- 等待 DDL event -|- ALTER TABLE (30min) -|- 回放 DML -|
| |
←-------- 复制延迟窗口 --------→
1.3 回滚的不可逆性
传统的 DDL 操作很难回滚。一旦
ALTER TABLE ... DROP COLUMN
执行完成,被删除的列和数据就永久丢失了。即使你在执行前做了备份,从备份恢复也意味着停机——你需要停止写入、导入备份、追赶备份之后的增量数据。这个过程的时间取决于表的大小和增量数据量,可能从几分钟到几小时不等。
这三个问题叠加在一起,使得在线数据迁移成为一个需要精心设计的系统工程问题,而不是一条 SQL 语句能解决的事情。
1.4 传统方案对比
| 方案 | 锁表影响 | 复制延迟 | 回滚能力 | 适用场景 |
|---|---|---|---|---|
| 直接 ALTER TABLE | 严重,可能锁表数小时 | 等于 DDL 执行时间 | 无法回滚 DROP 操作 | 小表、允许停机 |
| MySQL Online DDL | 部分操作免锁 | 仍有延迟 | 有限 | 中等规模表、支持 INPLACE 的操作 |
| pt-online-schema-change | 不锁原表 | 触发器带来额外开销 | 可通过保留旧表回滚 | 大表,但写入量不能太高 |
| gh-ost | 不锁原表、无触发器 | 可控 | 可随时中止和回滚 | 大表、高写入场景 |
| 蓝绿部署 + 双写 | 无锁表 | 无复制延迟问题 | 完全可回滚 | 存储引擎更换、大规模重构 |
二、Expand-Contract 模式:安全迁移的基本范式
Expand-Contract 模式(扩展-收缩模式)是在线数据迁移的基本思想框架。它的核心理念是:不要试图一步到位地完成迁移,而是分成”扩展”和”收缩”两个阶段,中间留出足够的过渡期和验证窗口。
2.1 模式概述
Expand-Contract 模式将一次数据迁移拆分为三个阶段:
- 扩展(Expand):在不破坏现有功能的前提下,添加新的数据结构。旧结构保持不变,新旧结构并存。
- 迁移(Migrate):逐步将数据从旧结构迁移到新结构,同时保持应用对两种结构的兼容。
- 收缩(Contract):确认迁移完成且新结构工作正常后,删除旧的数据结构。
这种分阶段的做法带来两个关键好处:
- 每一步都是可回滚的。扩展阶段只是添加了新结构,如果出问题可以直接删除新结构,不影响旧结构。迁移阶段如果数据校验失败,可以回退到只使用旧结构。只有在收缩阶段(删除旧结构)才是不可逆的,而此时你已经充分验证了新结构的正确性。
- 每一步都不需要停机。添加新列、新表不需要锁旧表。数据迁移可以后台批量执行。删除旧列在确认安全后执行。
2.2 具体案例:拆分 address 字段
假设你的 users 表有一个 address
文本字段,存储的是用户的完整地址字符串。现在产品需求要求支持按省份、城市筛选用户,你需要把
address 拆分为
province、city、district、street
四个独立字段。
第一阶段:扩展
添加新的列,不删除旧列,不修改应用读取逻辑:
-- 扩展阶段:添加新列
ALTER TABLE users ADD COLUMN province VARCHAR(50);
ALTER TABLE users ADD COLUMN city VARCHAR(50);
ALTER TABLE users ADD COLUMN district VARCHAR(100);
ALTER TABLE users ADD COLUMN street VARCHAR(200);修改应用的写入逻辑,同时写入旧字段和新字段:
def update_user_address(user_id: int, province: str, city: str, district: str, street: str):
full_address = f"{province}{city}{district}{street}"
db.execute("""
UPDATE users
SET address = %s,
province = %s,
city = %s,
district = %s,
street = %s
WHERE id = %s
""", (full_address, province, city, district, street, user_id))注意此时应用的读取逻辑仍然使用旧的 address
字段,所以即使新列的数据有问题,也不影响现有功能。
第二阶段:迁移
编写后台任务,批量将历史数据从 address
字段解析并填充到新字段:
import re
from typing import Optional, Tuple
def parse_address(address: str) -> Tuple[str, str, str, str]:
"""解析地址字符串为省市区街道四个部分"""
province_pattern = r'([\u4e00-\u9fa5]{2,}(?:省|自治区|市))'
city_pattern = r'([\u4e00-\u9fa5]{2,}(?:市|自治州|地区|盟))'
district_pattern = r'([\u4e00-\u9fa5]{2,}(?:区|县|市|旗))'
province = re.search(province_pattern, address)
city = re.search(city_pattern, address)
district = re.search(district_pattern, address)
province_str = province.group(0) if province else ''
city_str = city.group(0) if city else ''
district_str = district.group(0) if district else ''
remaining = address
for part in [province_str, city_str, district_str]:
remaining = remaining.replace(part, '', 1)
street_str = remaining.strip()
return province_str, city_str, district_str, street_str
def backfill_addresses(batch_size: int = 1000):
"""批量回填地址数据"""
last_id = 0
total_migrated = 0
total_failed = 0
while True:
rows = db.query("""
SELECT id, address FROM users
WHERE id > %s AND province IS NULL AND address IS NOT NULL
ORDER BY id ASC
LIMIT %s
""", (last_id, batch_size))
if not rows:
break
for row in rows:
try:
province, city, district, street = parse_address(row['address'])
db.execute("""
UPDATE users
SET province = %s, city = %s, district = %s, street = %s
WHERE id = %s AND province IS NULL
""", (province, city, district, street, row['id']))
total_migrated += 1
except Exception as e:
logging.error(f"Failed to parse address for user {row['id']}: {e}")
total_failed += 1
last_id = row['id']
logging.info(f"Migrated {total_migrated} rows, failed {total_failed}, last_id={last_id}")
time.sleep(0.1) # 控制回填速度,避免影响在线业务
return total_migrated, total_failed回填完成后,运行校验脚本,确认新旧数据一致:
def verify_migration(sample_size: int = 10000) -> dict:
"""抽样校验迁移数据的正确性"""
rows = db.query("""
SELECT id, address, province, city, district, street
FROM users
WHERE province IS NOT NULL
ORDER BY RAND()
LIMIT %s
""", (sample_size,))
results = {'total': len(rows), 'match': 0, 'mismatch': 0, 'mismatches': []}
for row in rows:
reconstructed = f"{row['province']}{row['city']}{row['district']}{row['street']}"
if reconstructed == row['address']:
results['match'] += 1
else:
results['mismatch'] += 1
if len(results['mismatches']) < 100:
results['mismatches'].append({
'id': row['id'],
'original': row['address'],
'reconstructed': reconstructed
})
results['match_rate'] = results['match'] / results['total'] if results['total'] > 0 else 0
return results校验通过后,切换应用的读取逻辑,从新字段读取:
def get_user_address(user_id: int) -> dict:
row = db.query_one("SELECT province, city, district, street FROM users WHERE id = %s", (user_id,))
return {
'province': row['province'],
'city': row['city'],
'district': row['district'],
'street': row['street'],
'full': f"{row['province']}{row['city']}{row['district']}{row['street']}"
}第三阶段:收缩
确认新字段工作正常、没有代码再引用旧的
address 字段后,删除旧列:
-- 收缩阶段:删除旧列(确认安全后执行)
ALTER TABLE users DROP COLUMN address;2.3 Expand-Contract 工作流
以下是 Expand-Contract 模式的完整工作流:
flowchart TD
A[当前状态:旧 Schema] --> B[扩展:添加新列/新表]
B --> C[部署新代码:双写新旧结构]
C --> D[后台回填历史数据]
D --> E{数据校验通过?}
E -->|否| F[修复数据/修复解析逻辑]
F --> D
E -->|是| G[切换读取逻辑到新结构]
G --> H[观察期:监控错误率和延迟]
H --> I{新结构稳定?}
I -->|否| J[回切到旧结构读取]
J --> H
I -->|是| K[移除旧结构的写入代码]
K --> L[收缩:删除旧列/旧表]
L --> M[迁移完成]
2.4 关键原则
Expand-Contract 模式有几条必须遵守的原则:
向后兼容(Backward Compatibility):扩展阶段添加的新结构不能破坏现有代码对旧结构的读写。这意味着新列必须允许 NULL 或有默认值,新表不能要求旧代码也写入。
向前兼容(Forward Compatibility):收缩阶段删除旧结构之前,必须确认所有代码已经切换到新结构。在微服务架构中,这意味着所有消费该表的服务都必须完成代码更新。
独立部署(Independent Deployment):Schema 变更和代码变更应该是独立的部署步骤。不要在同一个发布中既修改 schema 又修改代码——如果发布失败需要回滚,你需要能独立回滚代码而不影响 schema,反之亦然。
可观测性(Observability):迁移的每个阶段都应该有监控指标。回填进度、校验结果、新旧结构的读写比例、错误率——这些指标决定了你是否可以安全地进入下一个阶段。
三、双写双读的一致性保证
当迁移涉及更换底层存储(例如从 MySQL 迁移到 PostgreSQL,或从自建数据库迁移到云数据库),简单的 Expand-Contract 模式不够用——你需要在两个完全独立的存储系统之间保持数据一致。这就引出了双写双读(Dual Write / Dual Read)策略。
3.1 双写的基本思路
双写是指应用同时向旧存储和新存储写入数据。基本的实现方式有两种:
同步双写:在同一个请求处理流程中,先写旧库,再写新库(或反过来)。两个写入都成功才返回成功。
public class DualWriteRepository {
private final OldRepository oldRepo;
private final NewRepository newRepo;
private final MigrationConfig config;
public void save(Order order) {
// 先写旧库(主库)
oldRepo.save(order);
try {
// 再写新库(目标库)
newRepo.save(transform(order));
} catch (Exception e) {
// 新库写入失败不影响业务
// 但需要记录到修复队列
repairQueue.enqueue(new RepairTask(order.getId(), "save", e));
metrics.increment("dual_write.new_store.failure");
}
}
private NewOrder transform(Order order) {
// 将旧数据模型转换为新数据模型
return NewOrder.builder()
.id(order.getId())
.customerId(order.getCustomerId())
.totalAmount(order.getTotalCents() / 100.0)
.currency(order.getCurrency())
.shippingAddress(AddressMapper.fromLegacy(order.getAddress()))
.createdAt(order.getCreatedAt())
.build();
}
}异步双写:旧库写入成功后,通过消息队列(Message Queue)或变更数据捕获(Change Data Capture,CDC)将变更异步推送到新库。
class AsyncDualWriter:
def __init__(self, old_db, change_publisher):
self.old_db = old_db
self.change_publisher = change_publisher
def save_order(self, order: Order):
# 只写旧库
self.old_db.save(order)
# 发布变更事件,由消费者异步写入新库
event = ChangeEvent(
table='orders',
operation='INSERT',
data=order.to_dict(),
timestamp=datetime.utcnow()
)
self.change_publisher.publish('order-changes', event)3.2 双写的一致性问题
双写看起来简单,但一致性保证极其困难。以下是三个常见的一致性问题:
问题一:写入顺序不一致
假设用户先创建订单,再修改订单地址。在同步双写场景下:
请求1: INSERT order-123 → 旧库成功 → 新库成功
请求2: UPDATE order-123 → 旧库成功 → 新库失败(网络超时)
请求3: UPDATE order-123 → 旧库成功 → 新库成功
最终状态:
旧库: order-123 的地址是请求3的值 (正确)
新库: order-123 的地址是请求3的值 (正确,但跳过了请求2)
这个例子看起来没问题,但考虑并发场景:
线程A: UPDATE order-123 SET status='paid' → 旧库成功(T1) → 新库写入中(T3)
线程B: UPDATE order-123 SET status='cancelled' → 旧库成功(T2) → 新库成功(T4)
如果 T4 < T3(线程B先写入新库完成):
旧库: status = 'cancelled' (T2 > T1,后写入的覆盖前写入的)
新库: status = 'paid' (T3 > T4,线程A的写入覆盖了线程B)
数据不一致。
问题二:部分写入失败
旧库写入成功但新库写入失败,或者反过来。如果不使用分布式事务(Distributed Transaction),两个库的数据就会不一致。而分布式事务(如两阶段提交 2PC)性能开销大、可用性低,在在线迁移场景中通常不可接受。
问题三:幂等性要求
在异步双写场景中,消息可能被重复消费。如果消费者没有做幂等处理,同一条变更可能被应用多次,导致数据异常。
3.3 一致性保证机制
针对上述问题,工业实践中通常采用以下机制来保证一致性:
机制一:以旧库为准(Source of Truth)
在迁移过程中,明确旧库是数据的唯一权威来源。新库的数据只是旧库的副本。所有读取操作仍然从旧库读取,新库的数据仅用于验证。
class MigrationAwareRepository:
def __init__(self, old_db, new_db, config):
self.old_db = old_db
self.new_db = new_db
self.config = config
def find_order(self, order_id: str) -> Order:
# 始终从旧库读取
order = self.old_db.find_order(order_id)
# 如果开启了影子读取,同时从新库读取并比较
if self.config.shadow_read_enabled:
try:
new_order = self.new_db.find_order(order_id)
self._compare_and_report(order, new_order)
except Exception as e:
metrics.increment("shadow_read.failure")
return order
def _compare_and_report(self, old_order, new_order):
"""比较新旧库的数据,上报差异"""
if new_order is None:
metrics.increment("shadow_read.missing_in_new")
return
diffs = []
for field in ['status', 'total_amount', 'customer_id']:
old_val = getattr(old_order, field)
new_val = getattr(new_order, field)
if old_val != new_val:
diffs.append(f"{field}: old={old_val}, new={new_val}")
if diffs:
metrics.increment("shadow_read.mismatch")
logging.warning(f"Data mismatch for order {old_order.id}: {diffs}")
else:
metrics.increment("shadow_read.match")机制二:CDC(变更数据捕获)代替应用层双写
与其在应用层实现双写,不如使用 CDC 工具(如 Debezium、Maxwell、Canal)直接从数据库的事务日志(Binlog / WAL)中捕获变更,然后推送到新库。这种方式的优势是:
- 不需要修改应用代码。
- 捕获的是数据库级别的所有变更,包括直接 SQL 操作和运维脚本。
- 事务日志本身就保证了变更的顺序性。
# Debezium 连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: orders-cdc-connector
spec:
class: io.debezium.connector.mysql.MySqlConnector
config:
database.hostname: old-mysql-primary
database.port: "3306"
database.user: cdc_reader
database.password: "${CDC_PASSWORD}"
database.server.id: "10001"
database.server.name: "orders-db"
database.include.list: "ecommerce"
table.include.list: "ecommerce.orders,ecommerce.order_items"
database.history.kafka.bootstrap.servers: "kafka:9092"
database.history.kafka.topic: "schema-changes.orders"
snapshot.mode: "initial"
decimal.handling.mode: "string"
tombstones.on.delete: "true"机制三:校验与修复闭环
即使使用了 CDC,也不能假设新旧库的数据一定一致。生产环境中总会有各种意外:网络抖动导致 CDC 事件丢失、消费者 bug 导致数据转换错误、存储引擎差异导致精度丢失。因此需要一个持续运行的校验和修复闭环:
class ConsistencyChecker:
def __init__(self, old_db, new_db, repair_queue):
self.old_db = old_db
self.new_db = new_db
self.repair_queue = repair_queue
def check_batch(self, table: str, start_id: int, end_id: int) -> dict:
"""批量比对一段 ID 范围内的数据"""
old_rows = self.old_db.query(
f"SELECT * FROM {table} WHERE id BETWEEN %s AND %s ORDER BY id",
(start_id, end_id)
)
new_rows = self.new_db.query(
f"SELECT * FROM {table} WHERE id BETWEEN %s AND %s ORDER BY id",
(start_id, end_id)
)
old_map = {row['id']: row for row in old_rows}
new_map = {row['id']: row for row in new_rows}
stats = {'checked': 0, 'match': 0, 'mismatch': 0, 'missing_new': 0, 'extra_new': 0}
# 检查旧库有但新库没有的记录
for id, old_row in old_map.items():
stats['checked'] += 1
if id not in new_map:
stats['missing_new'] += 1
self.repair_queue.enqueue(RepairTask(table, id, 'missing_in_new'))
else:
if self._rows_match(old_row, new_map[id]):
stats['match'] += 1
else:
stats['mismatch'] += 1
self.repair_queue.enqueue(RepairTask(table, id, 'mismatch'))
# 检查新库有但旧库没有的记录
for id in new_map:
if id not in old_map:
stats['extra_new'] += 1
self.repair_queue.enqueue(RepairTask(table, id, 'extra_in_new'))
return stats
def _rows_match(self, old_row: dict, new_row: dict, ignored_fields=None) -> bool:
"""比较两行数据是否一致"""
ignored = ignored_fields or {'updated_at', 'created_at'}
for key in old_row:
if key in ignored:
continue
if key not in new_row:
return False
if old_row[key] != new_row[key]:
return False
return True3.4 双写双读策略对比
| 策略 | 一致性保证 | 性能影响 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|
| 同步双写 | 强,但有并发竞争风险 | 高,每次写入延迟翻倍 | 中 | 写入量低、数据一致性要求极高 |
| 异步双写(消息队列) | 最终一致 | 低 | 中 | 写入量中等、可接受短暂不一致 |
| CDC(Binlog/WAL 捕获) | 最终一致,顺序保证 | 极低(对应用无感知) | 高(需要运维 CDC 管道) | 大规模迁移、不想修改应用代码 |
| 影子读取 + 校验修复 | 补充机制,非独立方案 | 读取延迟略增 | 中 | 配合上述任一策略使用 |
3.5 回切方案
在线迁移必须有回切(Rollback)方案。回切不是”出了问题再想办法”,而是在迁移开始之前就设计好的能力。
class MigrationSwitch:
"""迁移切换控制器,支持灰度切换和紧急回切"""
def __init__(self, feature_flag_client):
self.ff = feature_flag_client
def get_read_source(self, user_id: str) -> str:
"""决定读取来源:old / new / both"""
phase = self.ff.get_string('migration.read_source', default='old')
if phase == 'shadow':
return 'both' # 新旧都读,以旧库为准,新库用于校验
if phase == 'new_with_fallback':
return 'new_fallback_old' # 优先新库,失败回退旧库
if phase == 'new':
return 'new' # 只读新库
return 'old' # 默认只读旧库
def get_write_target(self, user_id: str) -> str:
"""决定写入目标:old / both / new"""
phase = self.ff.get_string('migration.write_target', default='both')
return phase
def emergency_rollback(self):
"""紧急回切:将所有读写切回旧库"""
self.ff.set_string('migration.read_source', 'old')
self.ff.set_string('migration.write_target', 'old')
logging.critical("Migration emergency rollback triggered!")
alerting.send("Migration rollback", severity="critical")四、GitHub gh-ost:基于 Binlog 的无触发器在线 DDL
GitHub 在 2016 年开源了 gh-ost(GitHub Online Schema Transmogrifier),这是目前业界最成熟的在线 DDL 工具之一。它的设计思路和传统工具(如 pt-online-schema-change)有根本性的不同。
4.1 pt-online-schema-change 的问题
在 gh-ost 出现之前,Percona 的 pt-online-schema-change 是大规模 MySQL 在线 DDL 的标准工具。pt-osc 的工作原理如下:
- 创建一张与原表结构相同的影子表(Ghost Table)。
- 在原表上创建三个触发器(AFTER INSERT、AFTER UPDATE、AFTER DELETE),将原表的增量变更实时同步到影子表。
- 批量将原表的历史数据拷贝到影子表。
- 拷贝完成后,用 RENAME TABLE 原子交换原表和影子表。
这个方案的问题在于触发器:
- 性能开销:每一次对原表的写操作都会触发一个额外的写操作到影子表。在高写入场景下,这相当于将写入吞吐量减半。
- 锁争用:触发器在原表的事务上下文中执行,增加了行锁的持有时间,加剧了锁竞争。
- 不可暂停:一旦开始迁移,触发器持续工作。如果数据库负载过高需要暂停迁移,你必须先删除触发器——但删除触发器的过程本身也需要元数据锁。
- 不可测试:触发器的行为无法在非生产环境完全模拟,因为它依赖生产环境的真实写入模式。
4.2 gh-ost 的设计思路
gh-ost 用 Binlog 替代了触发器。它的核心设计决策是:不在数据库内部做增量同步,而是在数据库外部通过读取 Binlog 来捕获变更。
gh-ost 的执行流程如下:
- 创建影子表:在目标数据库上创建一张按照新
schema 定义的影子表(
_tablename_gho)。 - 连接 Binlog:gh-ost 伪装成一个 MySQL 从库(Replica),连接到主库(或另一个从库)的 Binlog 流,实时读取原表的 DML 事件。
- 行拷贝(Row Copy):使用
INSERT ... SELECT批量将原表的历史数据拷贝到影子表。拷贝策略是按主键范围分批执行,每批的大小可以动态调整。 - Binlog 回放:在行拷贝的同时,持续将从 Binlog 捕获的增量变更应用到影子表。这确保了影子表始终与原表保持同步。
- 切表(Cut-over):当历史数据拷贝完成且增量同步追上后,通过原子的 RENAME TABLE 操作交换原表和影子表。
gh-ost 架构示意:
┌─────────────────┐
│ MySQL 主库 │
│ ┌───────────┐ │
│ │ 原表 │ │
│ └───────────┘ │
│ ┌───────────┐ │
│ │ 影子表 │ │
│ └───────────┘ │
└────┬────────┬────┘
│ │
Binlog 流 │ │ INSERT...SELECT
│ │ (行拷贝)
┌────▼────────▼────┐
│ gh-ost │
│ ┌───────────┐ │
│ │ Binlog │ │
│ │ 解析器 │ │
│ └───────────┘ │
│ ┌───────────┐ │
│ │ 行拷贝 │ │
│ │ 控制器 │ │
│ └───────────┘ │
│ ┌───────────┐ │
│ │ 节流阀 │ │
│ └───────────┘ │
└─────────────────┘
4.3 为什么 Binlog 比触发器好
gh-ost 选择 Binlog 而不是触发器,有以下几个关键理由:
解耦:触发器在数据库进程内部运行,和业务写入争用相同的 CPU、内存、锁资源。Binlog 消费在数据库进程外部运行,资源消耗独立于数据库。
可暂停和可恢复:gh-ost 可以随时暂停行拷贝和 Binlog 消费,不需要修改数据库的任何状态。暂停期间,Binlog 事件会在上游积累,恢复后从断点继续消费。触发器做不到这一点——你不能”暂停”触发器。
可测试:gh-ost 可以连接到从库的 Binlog 流,在从库上执行迁移,而不影响主库。这使得在生产环境中做”预演”成为可能。
动态节流:gh-ost 内置了节流阀(Throttler),可以根据以下指标动态调节迁移速度:
# gh-ost 的命令行示例
gh-ost \
--host=mysql-primary \
--database=ecommerce \
--table=orders \
--alter="ADD COLUMN shipping_address_id BIGINT" \
--allow-on-master \
--max-load="Threads_running=25" \
--critical-load="Threads_running=100" \
--chunk-size=1000 \
--nice-ratio=0.5 \
--throttle-query="SELECT IF(COUNT(*)>1000, 1, 0) FROM ecommerce.replication_lag_monitor" \
--execute--max-load:当 MySQL 的Threads_running超过 25 时,自动暂停迁移。--critical-load:当Threads_running超过 100 时,立即中止迁移。--nice-ratio:每完成一批行拷贝后,等待一个与拷贝耗时成比例的时间。0.5表示拷贝了 100ms 就等 50ms。--throttle-query:自定义的节流查询。返回非零值时暂停迁移。这里的例子是监控复制延迟。
4.4 切表的原子性问题
gh-ost 的切表过程是最关键也是最危险的步骤。它需要在极短的时间内完成原表和影子表的名字交换,同时不能丢失这个窗口内的任何写入。
gh-ost 使用了一种精巧的切表策略,基于 MySQL 的元数据锁(Metadata Lock)机制:
-- 步骤 1: 创建一个占位表
CREATE TABLE _tablename_del LIKE tablename;
-- 步骤 2: 在一个会话中锁定原表(阻塞所有写入)
LOCK TABLES tablename WRITE, _tablename_del WRITE;
-- 步骤 3: 在锁定期间进行原子交换
-- 注意:RENAME TABLE 是原子操作
RENAME TABLE
tablename TO _tablename_del,
_tablename_gho TO tablename;
-- 步骤 4: 释放锁
UNLOCK TABLES;
-- 步骤 5: 清理占位表
DROP TABLE _tablename_del;整个锁定窗口通常在毫秒级别。在这个窗口内,所有对原表的写入请求会短暂阻塞,等待锁释放后自动重定向到新表。对于绝大多数业务场景来说,毫秒级的阻塞是可以接受的。
4.5 gh-ost 在 GitHub 的实战
GitHub 在 2016 年的博客中披露了以下数据:
- GitHub 的 MySQL 集群包含数千张表,部分表超过数十亿行。
- 在引入 gh-ost 之前,大表的 schema 变更需要数天的计划和协调。
- gh-ost 投入生产后,schema 变更变成了日常操作,每周执行数百次。
- gh-ost 在迁移 2 亿行、200 GB 的大表时,对生产流量的影响控制在 5% 以内。
gh-ost 的工程价值不仅在于解决了锁表问题,更在于它将 schema 变更从”高风险运维操作”降级为”常规开发任务”。这使得团队可以更频繁地演进数据模型,而不必积攒变更到一次”大迁移”中集中处理。
五、Stripe 的在线数据迁移实践:四阶段迁移法
Stripe 的支付系统处理着全球数以亿计的交易记录,任何停机都直接导致商家无法收款。Stripe 的工程团队在 2017 年的技术博客中公开了他们的在线数据迁移方法论,被称为四阶段迁移法(Four-Phase Data Migration)。
5.1 四阶段概述
Stripe 的四阶段迁移法将每次数据迁移分为以下步骤:
阶段一:双写(Dual Writing)
修改应用代码,使得每次写操作同时写入旧存储和新存储。关键点是:
- 旧存储仍然是权威来源(Source of Truth)。
- 新存储的写入失败不影响业务流程。
- 所有新存储的写入失败都被记录下来,用于后续修复。
阶段二:回填(Backfilling)
将历史数据从旧存储批量迁移到新存储。回填过程中注意:
- 使用幂等操作,确保重复执行不会产生错误数据。
- 控制回填速度,避免对旧存储造成过大压力。
- 回填完成后运行全量校验,确保新旧存储的数据一致。
阶段三:切换读取(Changing Reads)
逐步将读取流量从旧存储切换到新存储:
- 先在内部工具和低风险场景中切换。
- 然后通过灰度发布(Gradual Rollout)逐步扩大切换比例。
- 保留旧存储的读取能力作为降级方案。
阶段四:清理(Cleanup)
确认新存储完全接管后,清理遗留代码和旧存储:
- 移除双写逻辑。
- 移除旧存储的读取逻辑。
- 在确认期过后,删除旧存储中的数据。
5.2 Stripe 的工程纪律
Stripe 在迁移过程中强调以下工程纪律:
每个阶段独立部署和回滚:四个阶段是四个独立的代码变更,可以独立部署和回滚。不要把多个阶段的代码放在一个 Pull Request 里。
写入是最危险的操作:Stripe 的工程师特别强调,双写阶段是最容易出错的阶段。写入的顺序、重试策略、错误处理——每一个细节都可能导致数据不一致。
校验是迁移的核心:Stripe 在每次迁移中都会编写专门的校验工具,持续比对新旧存储的数据。校验不是迁移的最后一步,而是贯穿整个过程。
class StripeMigrationValidator:
"""Stripe 风格的迁移校验器"""
def __init__(self, old_store, new_store, metrics):
self.old_store = old_store
self.new_store = new_store
self.metrics = metrics
def validate_continuous(self, table: str, interval_seconds: int = 60):
"""持续校验:每隔一段时间抽样比对"""
while True:
sample_ids = self.old_store.random_sample_ids(table, count=1000)
results = self.validate_batch(table, sample_ids)
self.metrics.gauge('migration.consistency_rate', results['match_rate'])
self.metrics.counter('migration.checked_total', results['total'])
self.metrics.counter('migration.mismatches_total', results['mismatch'])
if results['match_rate'] < 0.999:
alerting.warn(
f"Migration consistency below threshold: "
f"{results['match_rate']:.4f} for table {table}"
)
time.sleep(interval_seconds)
def validate_batch(self, table: str, ids: list) -> dict:
results = {'total': len(ids), 'match': 0, 'mismatch': 0, 'missing': 0}
for id in ids:
old_record = self.old_store.get(table, id)
new_record = self.new_store.get(table, id)
if old_record is None:
continue
if new_record is None:
results['missing'] += 1
continue
if self._records_equal(old_record, new_record):
results['match'] += 1
else:
results['mismatch'] += 1
results['match_rate'] = (
results['match'] / (results['total'] - results['missing'])
if (results['total'] - results['missing']) > 0 else 1.0
)
return results
def _records_equal(self, old, new) -> bool:
"""比较记录,忽略已知的差异字段"""
ignored_keys = {'internal_id', 'migrated_at', 'legacy_field'}
for key in old:
if key in ignored_keys:
continue
if old.get(key) != new.get(key):
return False
return True5.3 Stripe 迁移案例:从 MongoDB 到自研存储
Stripe 早期使用 MongoDB 存储商家数据。随着业务增长,MongoDB 在事务支持、查询灵活性方面的限制越来越明显。Stripe 决定将核心数据迁移到基于关系型数据库的自研存储引擎。
这次迁移涉及数十亿条记录、数百个微服务,耗时超过一年。核心挑战包括:
- MongoDB 的文档模型(Document Model)和关系模型之间的映射不是一对一的。嵌套文档需要拆分为多张关联表。
- 数百个微服务中有不同的代码路径读写 MongoDB,需要逐一排查和修改。
- 部分旧数据的格式不规范,需要在迁移过程中做数据清洗。
Stripe 为这次迁移建立了以下基础设施:
- 一个迁移框架(Migration Framework),封装了双写、回填、校验、切换的通用逻辑。
- 一个仪表板(Dashboard),实时展示每张表的迁移进度、一致性率、错误率。
- 一个修复队列(Repair Queue),自动修复校验发现的不一致数据。
迁移完成后,Stripe 将这套框架沉淀为内部工具,后续的存储迁移都基于这套框架执行。这是一个典型的”投资基础设施降低后续迁移成本”的案例。
六、数据格式版本化:Protobuf 和 Avro 的 Schema 演进
数据迁移不仅发生在数据库层面。当系统使用序列化格式(如 Protocol Buffers、Apache Avro、Apache Thrift)在服务之间传递数据、在磁盘上持久化数据时,数据格式的版本化(Schema Evolution)同样是一个关键问题。
6.1 为什么需要 Schema 演进
在微服务架构中,一个服务生产的数据可能被多个消费者读取。生产者升级了数据格式(例如添加了一个新字段),消费者可能还没有升级。如果数据格式不支持版本兼容,新格式的数据会导致旧消费者崩溃。
这个问题在以下场景中尤为突出:
- 消息队列:Kafka 中的消息可能被不同版本的消费者读取。
- 持久化存储:数据库或对象存储中的数据可能在数月甚至数年后被读取,届时代码已经更新了很多个版本。
- RPC 通信:gRPC 服务端升级了 Protobuf 定义,客户端可能还在使用旧版本。
6.2 兼容性的三种类型
Schema 演进中的兼容性分为三种:
向后兼容(Backward Compatibility):新版本的代码可以读取旧版本的数据。这是最基本的要求——你升级了消费者,它必须能处理队列中积压的旧格式消息。
向前兼容(Forward Compatibility):旧版本的代码可以读取新版本的数据。这意味着你先升级了生产者,旧的消费者不会崩溃。
完全兼容(Full Compatibility):同时满足向后兼容和向前兼容。新旧版本的代码可以互相读取对方的数据。
6.3 Protobuf 的 Schema 演进规则
Protocol Buffers(Protobuf)通过字段编号(Field Number)而非字段名来标识字段,这使得它天然支持一定程度的 Schema 演进。以下是 Protobuf 的安全演进规则:
// 版本 1:初始定义
syntax = "proto3";
message Order {
int64 id = 1;
string customer_id = 2;
int64 total_cents = 3;
string currency = 4;
string status = 5;
int64 created_at = 6;
}安全的变更(向后 + 向前兼容):
// 版本 2:添加新字段
message Order {
int64 id = 1;
string customer_id = 2;
int64 total_cents = 3;
string currency = 4;
string status = 5;
int64 created_at = 6;
// 新增字段,使用新的字段编号
string shipping_address = 7;
int64 updated_at = 8;
repeated string tags = 9;
}- 添加新字段:安全。旧代码读取新数据时,忽略未知字段。新代码读取旧数据时,新字段为默认值。
- 删除字段:安全(但需要保留字段编号)。用
reserved关键字标记已删除的字段编号,防止未来被复用。
// 版本 3:删除字段
message Order {
reserved 5; // 原来的 status 字段已废弃
reserved "status";
int64 id = 1;
string customer_id = 2;
int64 total_cents = 3;
string currency = 4;
// status 字段已删除
int64 created_at = 6;
string shipping_address = 7;
int64 updated_at = 8;
repeated string tags = 9;
// 用新字段替代旧字段
OrderStatus order_status = 10;
}
enum OrderStatus {
ORDER_STATUS_UNSPECIFIED = 0;
ORDER_STATUS_PENDING = 1;
ORDER_STATUS_PAID = 2;
ORDER_STATUS_SHIPPED = 3;
ORDER_STATUS_CANCELLED = 4;
}不安全的变更(会破坏兼容性):
// 错误示范:以下变更会导致数据损坏
// 1. 修改字段编号
message Order {
int64 id = 1;
string customer_id = 3; // 从 2 改为 3,旧数据中编号 2 的字段会被错误解析
}
// 2. 修改字段类型(不兼容的类型转换)
message Order {
int64 id = 1;
int64 customer_id = 2; // 从 string 改为 int64,旧数据无法正确解码
}
// 3. 复用已删除字段的编号
message Order {
int64 id = 1;
string email = 2; // 复用了 customer_id 的编号 2,旧数据会被错误解读为 email
}6.4 Avro 的 Schema 演进规则
Apache Avro 的 Schema 演进机制与 Protobuf 有本质区别。Avro 不使用字段编号,而是通过字段名和读写 Schema 的配对来处理兼容性。
{
"type": "record",
"name": "Order",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "long"},
{"name": "customer_id", "type": "string"},
{"name": "total_cents", "type": "long"},
{"name": "currency", "type": "string", "default": "USD"},
{"name": "status", "type": "string", "default": "pending"}
]
}Avro 的关键特性是 Schema Resolution(Schema 解析):读取数据时,不仅需要数据本身,还需要写入时使用的 Writer Schema 和当前使用的 Reader Schema。Avro 运行时会自动处理两个 Schema 之间的差异。
添加字段:新字段必须有默认值,否则旧数据无法被新代码读取。
{
"type": "record",
"name": "Order",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "long"},
{"name": "customer_id", "type": "string"},
{"name": "total_cents", "type": "long"},
{"name": "currency", "type": "string", "default": "USD"},
{"name": "status", "type": "string", "default": "pending"},
{"name": "shipping_address", "type": ["null", "string"], "default": null}
]
}删除字段:被删除的字段必须在写入时有默认值,否则新代码写入的数据无法被旧代码读取。
6.5 Schema Registry 的角色
在使用 Avro 或 Protobuf 做数据序列化的系统中,Schema Registry(如 Confluent Schema Registry)扮演着关键角色:
- Schema 存储:集中存储所有 Schema 版本。
- 兼容性检查:在注册新版本 Schema 时,自动检查是否与之前的版本兼容。不兼容的变更会被拒绝。
- Schema ID 映射:每个 Schema 版本分配一个唯一 ID,数据中只需要携带 Schema ID(通常是 4 字节),而不是完整的 Schema 定义。
# 向 Schema Registry 注册新版本 Schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"customer_id\",\"type\":\"string\"},{\"name\":\"total_cents\",\"type\":\"long\"},{\"name\":\"shipping_address\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}' \
http://schema-registry:8081/subjects/orders-value/versions
# 检查兼容性
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "..."}' \
http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest6.6 Protobuf 与 Avro Schema 演进对比
| 特性 | Protobuf | Avro |
|---|---|---|
| 字段标识 | 字段编号(Field Number) | 字段名(Field Name) |
| Schema 是否随数据传输 | 否,编解码需要同一份 .proto 文件 | 是,Writer Schema 通常与数据一起存储 |
| 添加字段 | 安全(新编号) | 安全(需要默认值) |
| 删除字段 | 安全(保留编号) | 安全(需要默认值) |
| 重命名字段 | 安全(编号不变即可) | 不安全(依赖字段名匹配) |
| 修改字段类型 | 部分安全(int32 → int64) | 部分安全(需要 promotion 规则) |
| Schema 演进验证 | 需要人工遵守规则 | Schema Registry 自动验证 |
| 典型使用场景 | gRPC、移动端、游戏 | Kafka、Hadoop、数据湖 |
七、存储引擎更换的分步策略
存储引擎更换(Storage Engine Migration)是最复杂的数据迁移类型。它不只是改变数据的存储位置,还涉及查询语言、事务模型、一致性保证、运维工具等方方面面的变化。
7.1 为什么要更换存储引擎
常见的更换动机包括:
- 扩展性瓶颈:单机 MySQL 扛不住流量增长,需要迁移到分布式数据库(如 TiDB、CockroachDB)。
- 功能需求变化:关系型数据库不适合存储时序数据,需要迁移到 InfluxDB 或 TimescaleDB。
- 成本优化:自建数据库的运维成本过高,迁移到云托管服务(如 Amazon Aurora、Google Cloud Spanner)。
- 技术债务清理:历史原因使用了不合适的存储引擎,积累了大量 workaround 代码。
7.2 分步策略
存储引擎更换的推荐策略是将整个过程分为六个步骤:
步骤一:抽象存储层
在更换存储之前,先将应用代码中直接操作数据库的部分抽象为接口(Repository Pattern 或 Data Access Object)。这一步的目的是让应用代码不直接依赖特定的存储引擎。
// 定义存储接口
type OrderRepository interface {
FindByID(ctx context.Context, id string) (*Order, error)
FindByCustomerID(ctx context.Context, customerID string, opts QueryOptions) ([]*Order, error)
Save(ctx context.Context, order *Order) error
Update(ctx context.Context, order *Order) error
Delete(ctx context.Context, id string) error
}
// MySQL 实现
type MySQLOrderRepository struct {
db *sql.DB
}
func (r *MySQLOrderRepository) FindByID(ctx context.Context, id string) (*Order, error) {
row := r.db.QueryRowContext(ctx, "SELECT id, customer_id, total_cents, status FROM orders WHERE id = ?", id)
order := &Order{}
err := row.Scan(&order.ID, &order.CustomerID, &order.TotalCents, &order.Status)
if err == sql.ErrNoRows {
return nil, ErrNotFound
}
return order, err
}
func (r *MySQLOrderRepository) Save(ctx context.Context, order *Order) error {
_, err := r.db.ExecContext(ctx,
"INSERT INTO orders (id, customer_id, total_cents, status, created_at) VALUES (?, ?, ?, ?, ?)",
order.ID, order.CustomerID, order.TotalCents, order.Status, order.CreatedAt)
return err
}步骤二:实现新存储的适配器
基于相同的接口,实现新存储引擎的适配器(如
PostgresOrderRepository)。接口签名与旧实现完全一致,只有
SQL 方言不同(例如参数占位符从 ? 变为
$1)。
步骤三:构建双写代理
创建一个代理实现,将写入操作同时分发到新旧存储。核心逻辑是先写主存储(旧),然后异步写副存储(新),副存储写入失败不影响业务但记录到指标系统。同时实现影子读取(Shadow Read),从副存储异步读取并与主存储结果比较,用于验证数据一致性。这个模式与第三节讨论的双写双读一致,只是通过 Repository 接口实现了更好的封装。
步骤四:数据回填与校验
批量将历史数据从旧存储迁移到新存储。回填逻辑与第二节中的
backfill_addresses
函数类似——按主键范围分批读取、写入新存储、控制速度。回填完成后运行校验,确认新旧存储数据一致。
步骤五:灰度切换读取
通过功能开关(Feature Flag)逐步将读取流量从旧存储切换到新存储:
type GradualSwitchRepository struct {
old OrderRepository
new OrderRepository
flagStore FeatureFlagStore
}
func (r *GradualSwitchRepository) FindByID(ctx context.Context, id string) (*Order, error) {
readPercent := r.flagStore.GetInt("migration.read_new_percent")
if shouldUseNew(id, readPercent) {
order, err := r.new.FindByID(ctx, id)
if err != nil {
// 新存储失败,降级到旧存储
metrics.Increment("read.new.fallback")
return r.old.FindByID(ctx, id)
}
metrics.Increment("read.new.success")
return order, nil
}
metrics.Increment("read.old")
return r.old.FindByID(ctx, id)
}
func shouldUseNew(id string, percent int) bool {
// 基于 ID 的哈希值决定是否使用新存储
// 确保同一 ID 在同一百分比下的决策一致
hash := crc32.ChecksumIEEE([]byte(id))
return int(hash%100) < percent
}步骤六:清理旧存储
确认新存储完全接管后,移除双写逻辑、移除旧存储的读取路径、最终下线旧存储实例。
7.3 存储引擎更换的常见陷阱
语义差异:不同存储引擎的语义可能存在微妙差异。例如
MySQL 的 CHARSET=utf8 实际上是 3 字节的
UTF-8,不支持 4 字节的 emoji 字符;而 PostgreSQL 的
UTF8 编码支持完整的
Unicode。这种差异可能导致迁移后部分数据无法正确显示。
事务边界变化:如果从支持事务的存储迁移到不支持事务的存储(如从 MySQL 到 DynamoDB),原来依赖事务保证原子性的代码需要重新设计。
查询模式适配:关系型数据库支持灵活的 JOIN 查询,NoSQL 数据库通常不支持。如果应用代码中有大量 JOIN 查询,迁移到 NoSQL 数据库时需要在应用层做数据拼接,性能和代码复杂度都会增加。
# 关系型数据库:一条 SQL 搞定
result = db.query("""
SELECT o.id, o.total_cents, c.name, c.email
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'pending'
""")
# NoSQL 数据库:需要在应用层拼接
pending_orders = dynamo.query(
TableName='orders',
IndexName='status-index',
KeyConditionExpression='#s = :status',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':status': {'S': 'pending'}}
)
customer_ids = [o['customer_id']['S'] for o in pending_orders['Items']]
customers = dynamo.batch_get_item(
RequestItems={
'customers': {
'Keys': [{'id': {'S': cid}} for cid in customer_ids]
}
}
)
# 在应用层做 JOIN
customer_map = {c['id']['S']: c for c in customers['Responses']['customers']}
result = []
for order in pending_orders['Items']:
customer = customer_map.get(order['customer_id']['S'])
result.append({
'order_id': order['id']['S'],
'total_cents': int(order['total_cents']['N']),
'customer_name': customer['name']['S'] if customer else None,
'customer_email': customer['email']['S'] if customer else None,
})索引策略差异:不同存储引擎的索引机制不同。MySQL 的 B+Tree 索引和 DynamoDB 的 GSI(Global Secondary Index)在查询能力、一致性保证、成本模型上都有显著差异。迁移时需要重新设计索引策略。
八、迁移过程的可观测性
在线迁移不是”启动然后等待完成”的一次性操作,而是一个需要持续监控和调整的过程。可观测性(Observability)是迁移成功的基础保障。
8.1 关键监控指标
迁移过程中需要监控的核心指标包括:
class MigrationMetrics:
"""迁移过程的核心监控指标"""
def __init__(self, metrics_client):
self.m = metrics_client
def record_backfill_progress(self, table: str, migrated: int, total: int):
"""回填进度"""
self.m.gauge(f'migration.backfill.progress.{table}', migrated / total)
self.m.gauge(f'migration.backfill.remaining.{table}', total - migrated)
def record_consistency(self, table: str, match_rate: float):
"""数据一致性率"""
self.m.gauge(f'migration.consistency.{table}', match_rate)
def record_dual_write_latency(self, table: str, old_ms: float, new_ms: float):
"""双写延迟对比"""
self.m.histogram(f'migration.write_latency.old.{table}', old_ms)
self.m.histogram(f'migration.write_latency.new.{table}', new_ms)
self.m.histogram(f'migration.write_latency.overhead.{table}', new_ms - old_ms)
def record_read_source(self, table: str, source: str):
"""读取来源分布"""
self.m.increment(f'migration.read.{source}.{table}')
def record_repair(self, table: str, repair_type: str, success: bool):
"""修复操作"""
status = 'success' if success else 'failure'
self.m.increment(f'migration.repair.{repair_type}.{status}.{table}')8.2 迁移仪表板与告警
一个好的迁移仪表板应该包含以下面板:整体回填进度与预计完成时间、过去 1 小时和 24 小时的一致性率趋势(阈值 99.9%)、新旧存储的写入延迟对比、双写/校验/修复的错误率、CDC 管道延迟、源和目标数据库的 CPU/内存/IOPS/连接数。
关键告警规则应覆盖四个场景:数据一致性低于 99.9% 持续 5 分钟(warning)、双写失败率超过 1%(critical)、回填进度停滞超过 30 分钟(warning)、CDC 延迟超过 60 秒(critical)。以下是 Prometheus AlertManager 格式的示例:
groups:
- name: migration_alerts
rules:
- alert: MigrationConsistencyLow
expr: migration_consistency < 0.999
for: 5m
labels:
severity: warning
annotations:
summary: "数据一致性低于 99.9%"
- alert: MigrationDualWriteFailureHigh
expr: rate(migration_dual_write_failure_total[5m]) > 0.01
for: 3m
labels:
severity: critical
annotations:
summary: "双写失败率超过 1%"
- alert: MigrationCDCLagHigh
expr: migration_cdc_lag_seconds > 60
for: 5m
labels:
severity: critical
annotations:
summary: "CDC 延迟超过 60 秒"九、迁移检查清单与最佳实践
9.1 迁移前检查清单
在启动任何在线迁移之前,确认以下事项:
迁移前检查清单:
[ ] 已明确迁移的目标和范围
[ ] 已评估数据量和迁移时间估算
[ ] 已识别所有读写该数据的服务和代码路径
[ ] 已设计回滚方案并在测试环境验证
[ ] 已准备数据校验工具
[ ] 已建立监控仪表板和告警规则
[ ] 已在测试环境完成端到端演练
[ ] 已通知所有相关团队迁移计划
[ ] 已评估迁移对数据库负载的影响
[ ] 已准备修复队列和修复脚本
9.2 最佳实践总结
小步快跑:将大迁移拆分为多个小迁移。每次只迁移一张表或一个数据模型。小迁移的风险可控、回滚简单、验证快速。
可观测性优先:在编写迁移代码之前,先搭建监控和告警。没有可观测性的迁移就是盲飞。
校验驱动:不要相信”代码逻辑没问题数据一定没问题”。始终用独立的校验工具验证数据一致性。
灰度发布:切换读写源时使用灰度策略。先切 1% 的流量,观察一天;再切 10%,再观察;逐步扩大到 100%。
保留回切能力:在迁移完全结束之前(通常是收缩阶段之前),始终保留回到旧存储的能力。功能开关(Feature Flag)是实现回切的最佳工具。
文档化:记录迁移的决策、步骤、时间线、遇到的问题和解决方案。这些文档是下次迁移的最佳参考。用 ADR(架构决策记录)格式记录关键决策,具体方法可参考 架构决策与 ADR 一文。
9.3 迁移方案选型决策树
根据迁移的具体场景,选择合适的方案:
迁移需求分析:
│
├── 只是添加/删除列?
│ ├── 表小于 100 万行 → 直接 ALTER TABLE(MySQL Online DDL / PostgreSQL 快速 DDL)
│ └── 表大于 100 万行 → gh-ost 或 pt-online-schema-change
│
├── 数据格式需要转换?
│ ├── 同一个数据库内 → Expand-Contract 模式
│ └── 跨存储引擎 → 双写 + 回填 + 灰度切换
│
├── 存储引擎需要更换?
│ ├── 同类型更换(MySQL → MySQL / PostgreSQL → PostgreSQL)→ 逻辑复制 + 切换
│ └── 跨类型更换(MySQL → DynamoDB / MongoDB → PostgreSQL)→ 四阶段迁移法
│
└── 数据序列化格式需要变更?
├── Protobuf / Avro → 遵循 Schema 演进规则,使用 Schema Registry
└── 自定义格式 → 版本字段 + 多版本解析器
十、总结
在线数据迁移的核心挑战不在于技术本身,而在于如何在保证系统持续可用的前提下,安全地完成数据结构和存储的演进。
传统的停机迁移方式——锁表、修改 schema、重新上线——在现代系统中已经不可接受。Expand-Contract 模式提供了一个基本框架:先扩展、再迁移、最后收缩,每一步都可回滚。双写双读策略解决了跨存储迁移的一致性问题,但需要精心设计写入顺序、校验机制和回切方案。
GitHub 的 gh-ost 用 Binlog 取代触发器,将大表 schema 变更从高风险运维操作降级为日常开发任务。Stripe 的四阶段迁移法为跨存储引擎迁移提供了工业级的方法论。Protobuf 和 Avro 各自通过不同的机制支持数据格式的版本化演进。
所有这些方案都有一个共同的前提:可观测性。没有监控、没有校验、没有告警的迁移,和蒙着眼睛过马路没有本质区别。
在线迁移不是一个技术问题,而是一个工程纪律问题。它要求团队在速度和安全性之间做出平衡,在自动化和人工判断之间找到边界。每一次成功的迁移,都是团队工程能力的一次检验。
上一篇:时序数据架构
下一篇:应用层数据一致性模式
参考资料
- Shlomi Noach. “gh-ost: GitHub’s Online Schema Migration Tool for MySQL.” GitHub Engineering Blog, 2016.
- Jacqueline Xu. “Online Migrations at Scale.” Stripe Engineering Blog, 2017.
- Martin Kleppmann. Designing Data-Intensive Applications. O’Reilly Media, 2017. Chapter 4: Encoding and Evolution.
- Percona. “pt-online-schema-change Documentation.” Percona Toolkit.
- MySQL Documentation. “InnoDB Online DDL Operations.” MySQL 8.0 Reference Manual.
- Confluent. “Schema Evolution and Compatibility.” Confluent Documentation.
- Google. “Protocol Buffers Language Guide (proto3).” Google Developers Documentation.
- Apache Avro. “Avro Specification: Schema Resolution.” Apache Avro Documentation.
- Debezium. “Debezium Connector for MySQL.” Debezium Documentation.
- Sam Newman. Monolith to Microservices. O’Reilly Media, 2019. Chapter 4: Decomposing the Database.
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【系统架构设计百科】契约测试与 Schema 演进:服务间的信任协议
微服务拆分之后,服务间的接口兼容性成为系统稳定性的最大隐患。Pact 的消费者驱动契约、Protobuf 和 Avro 各自不同的 Schema 演进规则、Confluent Schema Registry 的兼容性策略——这些机制共同构成了服务间的信任协议。本文从一个真实的 Breaking Change 事故出发,拆解契约测试与 Schema 演进的工程实践,给出可落地的 CI/CD 集成方案。
【系统架构设计百科】架构质量属性:不只是"高可用高性能"
需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。
【系统架构设计百科】告警策略:如何避免"狼来了"
大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。
【系统架构设计百科】复杂性管理:架构的核心战场
系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略