土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】数据迁移与版本化:在线不停机的数据演进

文章导航

分类入口
architecture
标签入口
#data-migration#expand-contract#gh-ost#online-DDL#schema-evolution

目录

凌晨三点,某电商平台的 DBA 在公司群里发了一条消息:“开始执行 ALTER TABLE orders ADD COLUMN shipping_address_id BIGINT,预计锁表 40 分钟,期间订单服务不可用。”40 分钟后,表锁释放,主从复制延迟从 0 秒飙到 12 分钟,从库的读请求全部返回过期数据。凌晨四点半,复制追上了,DBA 发了第二条消息:“迁移完成,可以恢复流量。”

这是 2015 年之前大多数团队做数据库 schema 迁移的方式——挑一个流量最低的时间窗口停机维护。当业务规模还小的时候,凌晨三点停半个小时没人在意。但当你的系统服务全球用户、每秒处理上万笔交易、任何停机都意味着真金白银的损失时,“找个没人的时间窗口”这个前提就不存在了。

在线数据迁移(Online Data Migration)要解决的核心问题是:在系统持续对外提供服务的同时,完成数据库 schema 变更、数据格式升级、甚至底层存储引擎的更换。这件事的难度远超大多数工程师的直觉——它不只是一个 SQL 语句的问题,而是一个涉及并发控制、数据一致性、回滚安全性的系统工程问题。

上一篇 中我们讨论了时序数据的存储与查询架构,本文聚焦另一个工程现实:当数据模型需要演进时,如何在不停机的前提下安全地完成迁移。


一、为什么在线迁移如此困难

要理解在线迁移方案为什么设计得这么复杂,先要搞清楚传统迁移方式的三个致命问题。

1.1 锁表:ALTER TABLE 的隐藏代价

MySQL 的 InnoDB 引擎在执行 DDL(Data Definition Language)语句时,不同操作对表的锁定行为差异很大。以 MySQL 5.6 之前的版本为例,ALTER TABLE ... ADD COLUMN 的执行流程如下:

  1. 获取表的元数据锁(Metadata Lock,MDL)。
  2. 创建一张临时表,按照新的 schema 结构建表。
  3. 逐行拷贝原表数据到临时表。
  4. 交换原表和临时表的名字。
  5. 删除旧表。

在步骤 3 期间,整张表处于只读或完全锁定状态,所有写入操作被阻塞。如果表有 1 亿行数据,拷贝过程可能持续几十分钟甚至数小时。这期间所有对该表的 INSERT、UPDATE、DELETE 操作全部排队等待。

MySQL 5.6 引入了在线 DDL(Online DDL),允许部分 ALTER TABLE 操作在不锁表的情况下执行。但”在线”并不意味着完全无影响:

-- MySQL 5.6+ 的在线 DDL 语法
ALTER TABLE orders ADD COLUMN shipping_address_id BIGINT, ALGORITHM=INPLACE, LOCK=NONE;

即使使用 ALGORITHM=INPLACE, LOCK=NONE,以下问题仍然存在:

PostgreSQL 的情况稍好一些。ALTER TABLE ... ADD COLUMN 如果新列有默认值且版本在 11 以上,只需要修改系统目录(System Catalog),不需要重写整张表,基本可以做到瞬间完成。但 ALTER TABLE ... ALTER COLUMN TYPE 仍然需要重写全表。

1.2 长事务与复制延迟

在主从复制架构(Primary-Replica Replication)中,DDL 语句的执行会被记录到二进制日志(Binlog)中,然后传播到从库执行。问题在于:从库是单线程回放 Binlog 的(MySQL 5.7 之前),DDL 在主库执行了 30 分钟,从库也需要 30 分钟来回放。

在这 30 分钟内,从库的复制位点停滞不前,所有后续的 DML(Data Manipulation Language)操作都在排队。对于依赖从库做读分离的应用来说,这意味着 30 分钟的数据不一致。

更危险的情况是 DDL 导致主从切换失败。如果主库在执行 DDL 期间宕机,切换到从库时从库还在回放 DDL,无法立即接管写入流量。这种场景在生产环境中并不罕见。

时间线示意:

主库:  |----- ALTER TABLE (30min) -----|--> 正常写入
         |                              |
Binlog:  DDL event                      DML events
         |                              |
从库:   |- 等待 DDL event -|- ALTER TABLE (30min) -|- 回放 DML -|
         |                              |
         ←-------- 复制延迟窗口 --------→

1.3 回滚的不可逆性

传统的 DDL 操作很难回滚。一旦 ALTER TABLE ... DROP COLUMN 执行完成,被删除的列和数据就永久丢失了。即使你在执行前做了备份,从备份恢复也意味着停机——你需要停止写入、导入备份、追赶备份之后的增量数据。这个过程的时间取决于表的大小和增量数据量,可能从几分钟到几小时不等。

这三个问题叠加在一起,使得在线数据迁移成为一个需要精心设计的系统工程问题,而不是一条 SQL 语句能解决的事情。

1.4 传统方案对比

方案 锁表影响 复制延迟 回滚能力 适用场景
直接 ALTER TABLE 严重,可能锁表数小时 等于 DDL 执行时间 无法回滚 DROP 操作 小表、允许停机
MySQL Online DDL 部分操作免锁 仍有延迟 有限 中等规模表、支持 INPLACE 的操作
pt-online-schema-change 不锁原表 触发器带来额外开销 可通过保留旧表回滚 大表,但写入量不能太高
gh-ost 不锁原表、无触发器 可控 可随时中止和回滚 大表、高写入场景
蓝绿部署 + 双写 无锁表 无复制延迟问题 完全可回滚 存储引擎更换、大规模重构

二、Expand-Contract 模式:安全迁移的基本范式

Expand-Contract 模式(扩展-收缩模式)是在线数据迁移的基本思想框架。它的核心理念是:不要试图一步到位地完成迁移,而是分成”扩展”和”收缩”两个阶段,中间留出足够的过渡期和验证窗口

2.1 模式概述

Expand-Contract 模式将一次数据迁移拆分为三个阶段:

  1. 扩展(Expand):在不破坏现有功能的前提下,添加新的数据结构。旧结构保持不变,新旧结构并存。
  2. 迁移(Migrate):逐步将数据从旧结构迁移到新结构,同时保持应用对两种结构的兼容。
  3. 收缩(Contract):确认迁移完成且新结构工作正常后,删除旧的数据结构。

这种分阶段的做法带来两个关键好处:

2.2 具体案例:拆分 address 字段

假设你的 users 表有一个 address 文本字段,存储的是用户的完整地址字符串。现在产品需求要求支持按省份、城市筛选用户,你需要把 address 拆分为 provincecitydistrictstreet 四个独立字段。

第一阶段:扩展

添加新的列,不删除旧列,不修改应用读取逻辑:

-- 扩展阶段:添加新列
ALTER TABLE users ADD COLUMN province VARCHAR(50);
ALTER TABLE users ADD COLUMN city VARCHAR(50);
ALTER TABLE users ADD COLUMN district VARCHAR(100);
ALTER TABLE users ADD COLUMN street VARCHAR(200);

修改应用的写入逻辑,同时写入旧字段和新字段:

def update_user_address(user_id: int, province: str, city: str, district: str, street: str):
    full_address = f"{province}{city}{district}{street}"
    db.execute("""
        UPDATE users
        SET address = %s,
            province = %s,
            city = %s,
            district = %s,
            street = %s
        WHERE id = %s
    """, (full_address, province, city, district, street, user_id))

注意此时应用的读取逻辑仍然使用旧的 address 字段,所以即使新列的数据有问题,也不影响现有功能。

第二阶段:迁移

编写后台任务,批量将历史数据从 address 字段解析并填充到新字段:

import re
from typing import Optional, Tuple

def parse_address(address: str) -> Tuple[str, str, str, str]:
    """解析地址字符串为省市区街道四个部分"""
    province_pattern = r'([\u4e00-\u9fa5]{2,}(?:|自治区|))'
    city_pattern = r'([\u4e00-\u9fa5]{2,}(?:|自治州|地区|))'
    district_pattern = r'([\u4e00-\u9fa5]{2,}(?:|||))'

    province = re.search(province_pattern, address)
    city = re.search(city_pattern, address)
    district = re.search(district_pattern, address)

    province_str = province.group(0) if province else ''
    city_str = city.group(0) if city else ''
    district_str = district.group(0) if district else ''

    remaining = address
    for part in [province_str, city_str, district_str]:
        remaining = remaining.replace(part, '', 1)
    street_str = remaining.strip()

    return province_str, city_str, district_str, street_str

def backfill_addresses(batch_size: int = 1000):
    """批量回填地址数据"""
    last_id = 0
    total_migrated = 0
    total_failed = 0

    while True:
        rows = db.query("""
            SELECT id, address FROM users
            WHERE id > %s AND province IS NULL AND address IS NOT NULL
            ORDER BY id ASC
            LIMIT %s
        """, (last_id, batch_size))

        if not rows:
            break

        for row in rows:
            try:
                province, city, district, street = parse_address(row['address'])
                db.execute("""
                    UPDATE users
                    SET province = %s, city = %s, district = %s, street = %s
                    WHERE id = %s AND province IS NULL
                """, (province, city, district, street, row['id']))
                total_migrated += 1
            except Exception as e:
                logging.error(f"Failed to parse address for user {row['id']}: {e}")
                total_failed += 1

            last_id = row['id']

        logging.info(f"Migrated {total_migrated} rows, failed {total_failed}, last_id={last_id}")
        time.sleep(0.1)  # 控制回填速度,避免影响在线业务

    return total_migrated, total_failed

回填完成后,运行校验脚本,确认新旧数据一致:

def verify_migration(sample_size: int = 10000) -> dict:
    """抽样校验迁移数据的正确性"""
    rows = db.query("""
        SELECT id, address, province, city, district, street
        FROM users
        WHERE province IS NOT NULL
        ORDER BY RAND()
        LIMIT %s
    """, (sample_size,))

    results = {'total': len(rows), 'match': 0, 'mismatch': 0, 'mismatches': []}

    for row in rows:
        reconstructed = f"{row['province']}{row['city']}{row['district']}{row['street']}"
        if reconstructed == row['address']:
            results['match'] += 1
        else:
            results['mismatch'] += 1
            if len(results['mismatches']) < 100:
                results['mismatches'].append({
                    'id': row['id'],
                    'original': row['address'],
                    'reconstructed': reconstructed
                })

    results['match_rate'] = results['match'] / results['total'] if results['total'] > 0 else 0
    return results

校验通过后,切换应用的读取逻辑,从新字段读取:

def get_user_address(user_id: int) -> dict:
    row = db.query_one("SELECT province, city, district, street FROM users WHERE id = %s", (user_id,))
    return {
        'province': row['province'],
        'city': row['city'],
        'district': row['district'],
        'street': row['street'],
        'full': f"{row['province']}{row['city']}{row['district']}{row['street']}"
    }

第三阶段:收缩

确认新字段工作正常、没有代码再引用旧的 address 字段后,删除旧列:

-- 收缩阶段:删除旧列(确认安全后执行)
ALTER TABLE users DROP COLUMN address;

2.3 Expand-Contract 工作流

以下是 Expand-Contract 模式的完整工作流:

flowchart TD
    A[当前状态:旧 Schema] --> B[扩展:添加新列/新表]
    B --> C[部署新代码:双写新旧结构]
    C --> D[后台回填历史数据]
    D --> E{数据校验通过?}
    E -->|否| F[修复数据/修复解析逻辑]
    F --> D
    E -->|是| G[切换读取逻辑到新结构]
    G --> H[观察期:监控错误率和延迟]
    H --> I{新结构稳定?}
    I -->|否| J[回切到旧结构读取]
    J --> H
    I -->|是| K[移除旧结构的写入代码]
    K --> L[收缩:删除旧列/旧表]
    L --> M[迁移完成]

2.4 关键原则

Expand-Contract 模式有几条必须遵守的原则:

向后兼容(Backward Compatibility):扩展阶段添加的新结构不能破坏现有代码对旧结构的读写。这意味着新列必须允许 NULL 或有默认值,新表不能要求旧代码也写入。

向前兼容(Forward Compatibility):收缩阶段删除旧结构之前,必须确认所有代码已经切换到新结构。在微服务架构中,这意味着所有消费该表的服务都必须完成代码更新。

独立部署(Independent Deployment):Schema 变更和代码变更应该是独立的部署步骤。不要在同一个发布中既修改 schema 又修改代码——如果发布失败需要回滚,你需要能独立回滚代码而不影响 schema,反之亦然。

可观测性(Observability):迁移的每个阶段都应该有监控指标。回填进度、校验结果、新旧结构的读写比例、错误率——这些指标决定了你是否可以安全地进入下一个阶段。


三、双写双读的一致性保证

当迁移涉及更换底层存储(例如从 MySQL 迁移到 PostgreSQL,或从自建数据库迁移到云数据库),简单的 Expand-Contract 模式不够用——你需要在两个完全独立的存储系统之间保持数据一致。这就引出了双写双读(Dual Write / Dual Read)策略。

3.1 双写的基本思路

双写是指应用同时向旧存储和新存储写入数据。基本的实现方式有两种:

同步双写:在同一个请求处理流程中,先写旧库,再写新库(或反过来)。两个写入都成功才返回成功。

public class DualWriteRepository {
    private final OldRepository oldRepo;
    private final NewRepository newRepo;
    private final MigrationConfig config;

    public void save(Order order) {
        // 先写旧库(主库)
        oldRepo.save(order);

        try {
            // 再写新库(目标库)
            newRepo.save(transform(order));
        } catch (Exception e) {
            // 新库写入失败不影响业务
            // 但需要记录到修复队列
            repairQueue.enqueue(new RepairTask(order.getId(), "save", e));
            metrics.increment("dual_write.new_store.failure");
        }
    }

    private NewOrder transform(Order order) {
        // 将旧数据模型转换为新数据模型
        return NewOrder.builder()
            .id(order.getId())
            .customerId(order.getCustomerId())
            .totalAmount(order.getTotalCents() / 100.0)
            .currency(order.getCurrency())
            .shippingAddress(AddressMapper.fromLegacy(order.getAddress()))
            .createdAt(order.getCreatedAt())
            .build();
    }
}

异步双写:旧库写入成功后,通过消息队列(Message Queue)或变更数据捕获(Change Data Capture,CDC)将变更异步推送到新库。

class AsyncDualWriter:
    def __init__(self, old_db, change_publisher):
        self.old_db = old_db
        self.change_publisher = change_publisher

    def save_order(self, order: Order):
        # 只写旧库
        self.old_db.save(order)

        # 发布变更事件,由消费者异步写入新库
        event = ChangeEvent(
            table='orders',
            operation='INSERT',
            data=order.to_dict(),
            timestamp=datetime.utcnow()
        )
        self.change_publisher.publish('order-changes', event)

3.2 双写的一致性问题

双写看起来简单,但一致性保证极其困难。以下是三个常见的一致性问题:

问题一:写入顺序不一致

假设用户先创建订单,再修改订单地址。在同步双写场景下:

请求1: INSERT order-123    → 旧库成功 → 新库成功
请求2: UPDATE order-123    → 旧库成功 → 新库失败(网络超时)
请求3: UPDATE order-123    → 旧库成功 → 新库成功

最终状态:
  旧库: order-123 的地址是请求3的值 (正确)
  新库: order-123 的地址是请求3的值 (正确,但跳过了请求2)

这个例子看起来没问题,但考虑并发场景:

线程A: UPDATE order-123 SET status='paid'     → 旧库成功(T1) → 新库写入中(T3)
线程B: UPDATE order-123 SET status='cancelled' → 旧库成功(T2) → 新库成功(T4)

如果 T4 < T3(线程B先写入新库完成):
  旧库: status = 'cancelled' (T2 > T1,后写入的覆盖前写入的)
  新库: status = 'paid' (T3 > T4,线程A的写入覆盖了线程B)

数据不一致。

问题二:部分写入失败

旧库写入成功但新库写入失败,或者反过来。如果不使用分布式事务(Distributed Transaction),两个库的数据就会不一致。而分布式事务(如两阶段提交 2PC)性能开销大、可用性低,在在线迁移场景中通常不可接受。

问题三:幂等性要求

在异步双写场景中,消息可能被重复消费。如果消费者没有做幂等处理,同一条变更可能被应用多次,导致数据异常。

3.3 一致性保证机制

针对上述问题,工业实践中通常采用以下机制来保证一致性:

机制一:以旧库为准(Source of Truth)

在迁移过程中,明确旧库是数据的唯一权威来源。新库的数据只是旧库的副本。所有读取操作仍然从旧库读取,新库的数据仅用于验证。

class MigrationAwareRepository:
    def __init__(self, old_db, new_db, config):
        self.old_db = old_db
        self.new_db = new_db
        self.config = config

    def find_order(self, order_id: str) -> Order:
        # 始终从旧库读取
        order = self.old_db.find_order(order_id)

        # 如果开启了影子读取,同时从新库读取并比较
        if self.config.shadow_read_enabled:
            try:
                new_order = self.new_db.find_order(order_id)
                self._compare_and_report(order, new_order)
            except Exception as e:
                metrics.increment("shadow_read.failure")

        return order

    def _compare_and_report(self, old_order, new_order):
        """比较新旧库的数据,上报差异"""
        if new_order is None:
            metrics.increment("shadow_read.missing_in_new")
            return

        diffs = []
        for field in ['status', 'total_amount', 'customer_id']:
            old_val = getattr(old_order, field)
            new_val = getattr(new_order, field)
            if old_val != new_val:
                diffs.append(f"{field}: old={old_val}, new={new_val}")

        if diffs:
            metrics.increment("shadow_read.mismatch")
            logging.warning(f"Data mismatch for order {old_order.id}: {diffs}")
        else:
            metrics.increment("shadow_read.match")

机制二:CDC(变更数据捕获)代替应用层双写

与其在应用层实现双写,不如使用 CDC 工具(如 Debezium、Maxwell、Canal)直接从数据库的事务日志(Binlog / WAL)中捕获变更,然后推送到新库。这种方式的优势是:

# Debezium 连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: orders-cdc-connector
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.hostname: old-mysql-primary
    database.port: "3306"
    database.user: cdc_reader
    database.password: "${CDC_PASSWORD}"
    database.server.id: "10001"
    database.server.name: "orders-db"
    database.include.list: "ecommerce"
    table.include.list: "ecommerce.orders,ecommerce.order_items"
    database.history.kafka.bootstrap.servers: "kafka:9092"
    database.history.kafka.topic: "schema-changes.orders"
    snapshot.mode: "initial"
    decimal.handling.mode: "string"
    tombstones.on.delete: "true"

机制三:校验与修复闭环

即使使用了 CDC,也不能假设新旧库的数据一定一致。生产环境中总会有各种意外:网络抖动导致 CDC 事件丢失、消费者 bug 导致数据转换错误、存储引擎差异导致精度丢失。因此需要一个持续运行的校验和修复闭环:

class ConsistencyChecker:
    def __init__(self, old_db, new_db, repair_queue):
        self.old_db = old_db
        self.new_db = new_db
        self.repair_queue = repair_queue

    def check_batch(self, table: str, start_id: int, end_id: int) -> dict:
        """批量比对一段 ID 范围内的数据"""
        old_rows = self.old_db.query(
            f"SELECT * FROM {table} WHERE id BETWEEN %s AND %s ORDER BY id",
            (start_id, end_id)
        )
        new_rows = self.new_db.query(
            f"SELECT * FROM {table} WHERE id BETWEEN %s AND %s ORDER BY id",
            (start_id, end_id)
        )

        old_map = {row['id']: row for row in old_rows}
        new_map = {row['id']: row for row in new_rows}

        stats = {'checked': 0, 'match': 0, 'mismatch': 0, 'missing_new': 0, 'extra_new': 0}

        # 检查旧库有但新库没有的记录
        for id, old_row in old_map.items():
            stats['checked'] += 1
            if id not in new_map:
                stats['missing_new'] += 1
                self.repair_queue.enqueue(RepairTask(table, id, 'missing_in_new'))
            else:
                if self._rows_match(old_row, new_map[id]):
                    stats['match'] += 1
                else:
                    stats['mismatch'] += 1
                    self.repair_queue.enqueue(RepairTask(table, id, 'mismatch'))

        # 检查新库有但旧库没有的记录
        for id in new_map:
            if id not in old_map:
                stats['extra_new'] += 1
                self.repair_queue.enqueue(RepairTask(table, id, 'extra_in_new'))

        return stats

    def _rows_match(self, old_row: dict, new_row: dict, ignored_fields=None) -> bool:
        """比较两行数据是否一致"""
        ignored = ignored_fields or {'updated_at', 'created_at'}
        for key in old_row:
            if key in ignored:
                continue
            if key not in new_row:
                return False
            if old_row[key] != new_row[key]:
                return False
        return True

3.4 双写双读策略对比

策略 一致性保证 性能影响 实现复杂度 适用场景
同步双写 强,但有并发竞争风险 高,每次写入延迟翻倍 写入量低、数据一致性要求极高
异步双写(消息队列) 最终一致 写入量中等、可接受短暂不一致
CDC(Binlog/WAL 捕获) 最终一致,顺序保证 极低(对应用无感知) 高(需要运维 CDC 管道) 大规模迁移、不想修改应用代码
影子读取 + 校验修复 补充机制,非独立方案 读取延迟略增 配合上述任一策略使用

3.5 回切方案

在线迁移必须有回切(Rollback)方案。回切不是”出了问题再想办法”,而是在迁移开始之前就设计好的能力。

class MigrationSwitch:
    """迁移切换控制器,支持灰度切换和紧急回切"""

    def __init__(self, feature_flag_client):
        self.ff = feature_flag_client

    def get_read_source(self, user_id: str) -> str:
        """决定读取来源:old / new / both"""
        phase = self.ff.get_string('migration.read_source', default='old')

        if phase == 'shadow':
            return 'both'  # 新旧都读,以旧库为准,新库用于校验

        if phase == 'new_with_fallback':
            return 'new_fallback_old'  # 优先新库,失败回退旧库

        if phase == 'new':
            return 'new'  # 只读新库

        return 'old'  # 默认只读旧库

    def get_write_target(self, user_id: str) -> str:
        """决定写入目标:old / both / new"""
        phase = self.ff.get_string('migration.write_target', default='both')
        return phase

    def emergency_rollback(self):
        """紧急回切:将所有读写切回旧库"""
        self.ff.set_string('migration.read_source', 'old')
        self.ff.set_string('migration.write_target', 'old')
        logging.critical("Migration emergency rollback triggered!")
        alerting.send("Migration rollback", severity="critical")

四、GitHub gh-ost:基于 Binlog 的无触发器在线 DDL

GitHub 在 2016 年开源了 gh-ost(GitHub Online Schema Transmogrifier),这是目前业界最成熟的在线 DDL 工具之一。它的设计思路和传统工具(如 pt-online-schema-change)有根本性的不同。

4.1 pt-online-schema-change 的问题

在 gh-ost 出现之前,Percona 的 pt-online-schema-change 是大规模 MySQL 在线 DDL 的标准工具。pt-osc 的工作原理如下:

  1. 创建一张与原表结构相同的影子表(Ghost Table)。
  2. 在原表上创建三个触发器(AFTER INSERT、AFTER UPDATE、AFTER DELETE),将原表的增量变更实时同步到影子表。
  3. 批量将原表的历史数据拷贝到影子表。
  4. 拷贝完成后,用 RENAME TABLE 原子交换原表和影子表。

这个方案的问题在于触发器:

4.2 gh-ost 的设计思路

gh-ost 用 Binlog 替代了触发器。它的核心设计决策是:不在数据库内部做增量同步,而是在数据库外部通过读取 Binlog 来捕获变更

gh-ost 的执行流程如下:

  1. 创建影子表:在目标数据库上创建一张按照新 schema 定义的影子表(_tablename_gho)。
  2. 连接 Binlog:gh-ost 伪装成一个 MySQL 从库(Replica),连接到主库(或另一个从库)的 Binlog 流,实时读取原表的 DML 事件。
  3. 行拷贝(Row Copy):使用 INSERT ... SELECT 批量将原表的历史数据拷贝到影子表。拷贝策略是按主键范围分批执行,每批的大小可以动态调整。
  4. Binlog 回放:在行拷贝的同时,持续将从 Binlog 捕获的增量变更应用到影子表。这确保了影子表始终与原表保持同步。
  5. 切表(Cut-over):当历史数据拷贝完成且增量同步追上后,通过原子的 RENAME TABLE 操作交换原表和影子表。
gh-ost 架构示意:

                    ┌─────────────────┐
                    │   MySQL 主库     │
                    │  ┌───────────┐   │
                    │  │  原表      │   │
                    │  └───────────┘   │
                    │  ┌───────────┐   │
                    │  │  影子表    │   │
                    │  └───────────┘   │
                    └────┬────────┬────┘
                         │        │
               Binlog 流 │        │ INSERT...SELECT
                         │        │ (行拷贝)
                    ┌────▼────────▼────┐
                    │     gh-ost       │
                    │  ┌───────────┐   │
                    │  │ Binlog    │   │
                    │  │ 解析器    │   │
                    │  └───────────┘   │
                    │  ┌───────────┐   │
                    │  │ 行拷贝    │   │
                    │  │ 控制器    │   │
                    │  └───────────┘   │
                    │  ┌───────────┐   │
                    │  │ 节流阀    │   │
                    │  └───────────┘   │
                    └─────────────────┘

4.3 为什么 Binlog 比触发器好

gh-ost 选择 Binlog 而不是触发器,有以下几个关键理由:

解耦:触发器在数据库进程内部运行,和业务写入争用相同的 CPU、内存、锁资源。Binlog 消费在数据库进程外部运行,资源消耗独立于数据库。

可暂停和可恢复:gh-ost 可以随时暂停行拷贝和 Binlog 消费,不需要修改数据库的任何状态。暂停期间,Binlog 事件会在上游积累,恢复后从断点继续消费。触发器做不到这一点——你不能”暂停”触发器。

可测试:gh-ost 可以连接到从库的 Binlog 流,在从库上执行迁移,而不影响主库。这使得在生产环境中做”预演”成为可能。

动态节流:gh-ost 内置了节流阀(Throttler),可以根据以下指标动态调节迁移速度:

# gh-ost 的命令行示例
gh-ost \
  --host=mysql-primary \
  --database=ecommerce \
  --table=orders \
  --alter="ADD COLUMN shipping_address_id BIGINT" \
  --allow-on-master \
  --max-load="Threads_running=25" \
  --critical-load="Threads_running=100" \
  --chunk-size=1000 \
  --nice-ratio=0.5 \
  --throttle-query="SELECT IF(COUNT(*)>1000, 1, 0) FROM ecommerce.replication_lag_monitor" \
  --execute

4.4 切表的原子性问题

gh-ost 的切表过程是最关键也是最危险的步骤。它需要在极短的时间内完成原表和影子表的名字交换,同时不能丢失这个窗口内的任何写入。

gh-ost 使用了一种精巧的切表策略,基于 MySQL 的元数据锁(Metadata Lock)机制:

-- 步骤 1: 创建一个占位表
CREATE TABLE _tablename_del LIKE tablename;

-- 步骤 2: 在一个会话中锁定原表(阻塞所有写入)
LOCK TABLES tablename WRITE, _tablename_del WRITE;

-- 步骤 3: 在锁定期间进行原子交换
-- 注意:RENAME TABLE 是原子操作
RENAME TABLE
  tablename TO _tablename_del,
  _tablename_gho TO tablename;

-- 步骤 4: 释放锁
UNLOCK TABLES;

-- 步骤 5: 清理占位表
DROP TABLE _tablename_del;

整个锁定窗口通常在毫秒级别。在这个窗口内,所有对原表的写入请求会短暂阻塞,等待锁释放后自动重定向到新表。对于绝大多数业务场景来说,毫秒级的阻塞是可以接受的。

4.5 gh-ost 在 GitHub 的实战

GitHub 在 2016 年的博客中披露了以下数据:

gh-ost 的工程价值不仅在于解决了锁表问题,更在于它将 schema 变更从”高风险运维操作”降级为”常规开发任务”。这使得团队可以更频繁地演进数据模型,而不必积攒变更到一次”大迁移”中集中处理。


五、Stripe 的在线数据迁移实践:四阶段迁移法

Stripe 的支付系统处理着全球数以亿计的交易记录,任何停机都直接导致商家无法收款。Stripe 的工程团队在 2017 年的技术博客中公开了他们的在线数据迁移方法论,被称为四阶段迁移法(Four-Phase Data Migration)。

5.1 四阶段概述

Stripe 的四阶段迁移法将每次数据迁移分为以下步骤:

阶段一:双写(Dual Writing)

修改应用代码,使得每次写操作同时写入旧存储和新存储。关键点是:

阶段二:回填(Backfilling)

将历史数据从旧存储批量迁移到新存储。回填过程中注意:

阶段三:切换读取(Changing Reads)

逐步将读取流量从旧存储切换到新存储:

阶段四:清理(Cleanup)

确认新存储完全接管后,清理遗留代码和旧存储:

5.2 Stripe 的工程纪律

Stripe 在迁移过程中强调以下工程纪律:

每个阶段独立部署和回滚:四个阶段是四个独立的代码变更,可以独立部署和回滚。不要把多个阶段的代码放在一个 Pull Request 里。

写入是最危险的操作:Stripe 的工程师特别强调,双写阶段是最容易出错的阶段。写入的顺序、重试策略、错误处理——每一个细节都可能导致数据不一致。

校验是迁移的核心:Stripe 在每次迁移中都会编写专门的校验工具,持续比对新旧存储的数据。校验不是迁移的最后一步,而是贯穿整个过程。

class StripeMigrationValidator:
    """Stripe 风格的迁移校验器"""

    def __init__(self, old_store, new_store, metrics):
        self.old_store = old_store
        self.new_store = new_store
        self.metrics = metrics

    def validate_continuous(self, table: str, interval_seconds: int = 60):
        """持续校验:每隔一段时间抽样比对"""
        while True:
            sample_ids = self.old_store.random_sample_ids(table, count=1000)
            results = self.validate_batch(table, sample_ids)

            self.metrics.gauge('migration.consistency_rate', results['match_rate'])
            self.metrics.counter('migration.checked_total', results['total'])
            self.metrics.counter('migration.mismatches_total', results['mismatch'])

            if results['match_rate'] < 0.999:
                alerting.warn(
                    f"Migration consistency below threshold: "
                    f"{results['match_rate']:.4f} for table {table}"
                )

            time.sleep(interval_seconds)

    def validate_batch(self, table: str, ids: list) -> dict:
        results = {'total': len(ids), 'match': 0, 'mismatch': 0, 'missing': 0}

        for id in ids:
            old_record = self.old_store.get(table, id)
            new_record = self.new_store.get(table, id)

            if old_record is None:
                continue

            if new_record is None:
                results['missing'] += 1
                continue

            if self._records_equal(old_record, new_record):
                results['match'] += 1
            else:
                results['mismatch'] += 1

        results['match_rate'] = (
            results['match'] / (results['total'] - results['missing'])
            if (results['total'] - results['missing']) > 0 else 1.0
        )
        return results

    def _records_equal(self, old, new) -> bool:
        """比较记录,忽略已知的差异字段"""
        ignored_keys = {'internal_id', 'migrated_at', 'legacy_field'}
        for key in old:
            if key in ignored_keys:
                continue
            if old.get(key) != new.get(key):
                return False
        return True

5.3 Stripe 迁移案例:从 MongoDB 到自研存储

Stripe 早期使用 MongoDB 存储商家数据。随着业务增长,MongoDB 在事务支持、查询灵活性方面的限制越来越明显。Stripe 决定将核心数据迁移到基于关系型数据库的自研存储引擎。

这次迁移涉及数十亿条记录、数百个微服务,耗时超过一年。核心挑战包括:

Stripe 为这次迁移建立了以下基础设施:

迁移完成后,Stripe 将这套框架沉淀为内部工具,后续的存储迁移都基于这套框架执行。这是一个典型的”投资基础设施降低后续迁移成本”的案例。


六、数据格式版本化:Protobuf 和 Avro 的 Schema 演进

数据迁移不仅发生在数据库层面。当系统使用序列化格式(如 Protocol Buffers、Apache Avro、Apache Thrift)在服务之间传递数据、在磁盘上持久化数据时,数据格式的版本化(Schema Evolution)同样是一个关键问题。

6.1 为什么需要 Schema 演进

在微服务架构中,一个服务生产的数据可能被多个消费者读取。生产者升级了数据格式(例如添加了一个新字段),消费者可能还没有升级。如果数据格式不支持版本兼容,新格式的数据会导致旧消费者崩溃。

这个问题在以下场景中尤为突出:

6.2 兼容性的三种类型

Schema 演进中的兼容性分为三种:

向后兼容(Backward Compatibility):新版本的代码可以读取旧版本的数据。这是最基本的要求——你升级了消费者,它必须能处理队列中积压的旧格式消息。

向前兼容(Forward Compatibility):旧版本的代码可以读取新版本的数据。这意味着你先升级了生产者,旧的消费者不会崩溃。

完全兼容(Full Compatibility):同时满足向后兼容和向前兼容。新旧版本的代码可以互相读取对方的数据。

6.3 Protobuf 的 Schema 演进规则

Protocol Buffers(Protobuf)通过字段编号(Field Number)而非字段名来标识字段,这使得它天然支持一定程度的 Schema 演进。以下是 Protobuf 的安全演进规则:

// 版本 1:初始定义
syntax = "proto3";

message Order {
  int64 id = 1;
  string customer_id = 2;
  int64 total_cents = 3;
  string currency = 4;
  string status = 5;
  int64 created_at = 6;
}

安全的变更(向后 + 向前兼容)

// 版本 2:添加新字段
message Order {
  int64 id = 1;
  string customer_id = 2;
  int64 total_cents = 3;
  string currency = 4;
  string status = 5;
  int64 created_at = 6;

  // 新增字段,使用新的字段编号
  string shipping_address = 7;
  int64 updated_at = 8;
  repeated string tags = 9;
}
// 版本 3:删除字段
message Order {
  reserved 5;  // 原来的 status 字段已废弃
  reserved "status";

  int64 id = 1;
  string customer_id = 2;
  int64 total_cents = 3;
  string currency = 4;
  // status 字段已删除
  int64 created_at = 6;
  string shipping_address = 7;
  int64 updated_at = 8;
  repeated string tags = 9;

  // 用新字段替代旧字段
  OrderStatus order_status = 10;
}

enum OrderStatus {
  ORDER_STATUS_UNSPECIFIED = 0;
  ORDER_STATUS_PENDING = 1;
  ORDER_STATUS_PAID = 2;
  ORDER_STATUS_SHIPPED = 3;
  ORDER_STATUS_CANCELLED = 4;
}

不安全的变更(会破坏兼容性)

// 错误示范:以下变更会导致数据损坏

// 1. 修改字段编号
message Order {
  int64 id = 1;
  string customer_id = 3;  // 从 2 改为 3,旧数据中编号 2 的字段会被错误解析
}

// 2. 修改字段类型(不兼容的类型转换)
message Order {
  int64 id = 1;
  int64 customer_id = 2;  // 从 string 改为 int64,旧数据无法正确解码
}

// 3. 复用已删除字段的编号
message Order {
  int64 id = 1;
  string email = 2;  // 复用了 customer_id 的编号 2,旧数据会被错误解读为 email
}

6.4 Avro 的 Schema 演进规则

Apache Avro 的 Schema 演进机制与 Protobuf 有本质区别。Avro 不使用字段编号,而是通过字段名和读写 Schema 的配对来处理兼容性。

{
  "type": "record",
  "name": "Order",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "customer_id", "type": "string"},
    {"name": "total_cents", "type": "long"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "status", "type": "string", "default": "pending"}
  ]
}

Avro 的关键特性是 Schema Resolution(Schema 解析):读取数据时,不仅需要数据本身,还需要写入时使用的 Writer Schema 和当前使用的 Reader Schema。Avro 运行时会自动处理两个 Schema 之间的差异。

添加字段:新字段必须有默认值,否则旧数据无法被新代码读取。

{
  "type": "record",
  "name": "Order",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "customer_id", "type": "string"},
    {"name": "total_cents", "type": "long"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "status", "type": "string", "default": "pending"},
    {"name": "shipping_address", "type": ["null", "string"], "default": null}
  ]
}

删除字段:被删除的字段必须在写入时有默认值,否则新代码写入的数据无法被旧代码读取。

6.5 Schema Registry 的角色

在使用 Avro 或 Protobuf 做数据序列化的系统中,Schema Registry(如 Confluent Schema Registry)扮演着关键角色:

# 向 Schema Registry 注册新版本 Schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"customer_id\",\"type\":\"string\"},{\"name\":\"total_cents\",\"type\":\"long\"},{\"name\":\"shipping_address\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}' \
  http://schema-registry:8081/subjects/orders-value/versions

# 检查兼容性
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "..."}' \
  http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest

6.6 Protobuf 与 Avro Schema 演进对比

特性 Protobuf Avro
字段标识 字段编号(Field Number) 字段名(Field Name)
Schema 是否随数据传输 否,编解码需要同一份 .proto 文件 是,Writer Schema 通常与数据一起存储
添加字段 安全(新编号) 安全(需要默认值)
删除字段 安全(保留编号) 安全(需要默认值)
重命名字段 安全(编号不变即可) 不安全(依赖字段名匹配)
修改字段类型 部分安全(int32 → int64) 部分安全(需要 promotion 规则)
Schema 演进验证 需要人工遵守规则 Schema Registry 自动验证
典型使用场景 gRPC、移动端、游戏 Kafka、Hadoop、数据湖

七、存储引擎更换的分步策略

存储引擎更换(Storage Engine Migration)是最复杂的数据迁移类型。它不只是改变数据的存储位置,还涉及查询语言、事务模型、一致性保证、运维工具等方方面面的变化。

7.1 为什么要更换存储引擎

常见的更换动机包括:

7.2 分步策略

存储引擎更换的推荐策略是将整个过程分为六个步骤:

步骤一:抽象存储层

在更换存储之前,先将应用代码中直接操作数据库的部分抽象为接口(Repository Pattern 或 Data Access Object)。这一步的目的是让应用代码不直接依赖特定的存储引擎。

// 定义存储接口
type OrderRepository interface {
    FindByID(ctx context.Context, id string) (*Order, error)
    FindByCustomerID(ctx context.Context, customerID string, opts QueryOptions) ([]*Order, error)
    Save(ctx context.Context, order *Order) error
    Update(ctx context.Context, order *Order) error
    Delete(ctx context.Context, id string) error
}

// MySQL 实现
type MySQLOrderRepository struct {
    db *sql.DB
}

func (r *MySQLOrderRepository) FindByID(ctx context.Context, id string) (*Order, error) {
    row := r.db.QueryRowContext(ctx, "SELECT id, customer_id, total_cents, status FROM orders WHERE id = ?", id)
    order := &Order{}
    err := row.Scan(&order.ID, &order.CustomerID, &order.TotalCents, &order.Status)
    if err == sql.ErrNoRows {
        return nil, ErrNotFound
    }
    return order, err
}

func (r *MySQLOrderRepository) Save(ctx context.Context, order *Order) error {
    _, err := r.db.ExecContext(ctx,
        "INSERT INTO orders (id, customer_id, total_cents, status, created_at) VALUES (?, ?, ?, ?, ?)",
        order.ID, order.CustomerID, order.TotalCents, order.Status, order.CreatedAt)
    return err
}

步骤二:实现新存储的适配器

基于相同的接口,实现新存储引擎的适配器(如 PostgresOrderRepository)。接口签名与旧实现完全一致,只有 SQL 方言不同(例如参数占位符从 ? 变为 $1)。

步骤三:构建双写代理

创建一个代理实现,将写入操作同时分发到新旧存储。核心逻辑是先写主存储(旧),然后异步写副存储(新),副存储写入失败不影响业务但记录到指标系统。同时实现影子读取(Shadow Read),从副存储异步读取并与主存储结果比较,用于验证数据一致性。这个模式与第三节讨论的双写双读一致,只是通过 Repository 接口实现了更好的封装。

步骤四:数据回填与校验

批量将历史数据从旧存储迁移到新存储。回填逻辑与第二节中的 backfill_addresses 函数类似——按主键范围分批读取、写入新存储、控制速度。回填完成后运行校验,确认新旧存储数据一致。

步骤五:灰度切换读取

通过功能开关(Feature Flag)逐步将读取流量从旧存储切换到新存储:

type GradualSwitchRepository struct {
    old       OrderRepository
    new       OrderRepository
    flagStore FeatureFlagStore
}

func (r *GradualSwitchRepository) FindByID(ctx context.Context, id string) (*Order, error) {
    readPercent := r.flagStore.GetInt("migration.read_new_percent")

    if shouldUseNew(id, readPercent) {
        order, err := r.new.FindByID(ctx, id)
        if err != nil {
            // 新存储失败,降级到旧存储
            metrics.Increment("read.new.fallback")
            return r.old.FindByID(ctx, id)
        }
        metrics.Increment("read.new.success")
        return order, nil
    }

    metrics.Increment("read.old")
    return r.old.FindByID(ctx, id)
}

func shouldUseNew(id string, percent int) bool {
    // 基于 ID 的哈希值决定是否使用新存储
    // 确保同一 ID 在同一百分比下的决策一致
    hash := crc32.ChecksumIEEE([]byte(id))
    return int(hash%100) < percent
}

步骤六:清理旧存储

确认新存储完全接管后,移除双写逻辑、移除旧存储的读取路径、最终下线旧存储实例。

7.3 存储引擎更换的常见陷阱

语义差异:不同存储引擎的语义可能存在微妙差异。例如 MySQL 的 CHARSET=utf8 实际上是 3 字节的 UTF-8,不支持 4 字节的 emoji 字符;而 PostgreSQL 的 UTF8 编码支持完整的 Unicode。这种差异可能导致迁移后部分数据无法正确显示。

事务边界变化:如果从支持事务的存储迁移到不支持事务的存储(如从 MySQL 到 DynamoDB),原来依赖事务保证原子性的代码需要重新设计。

查询模式适配:关系型数据库支持灵活的 JOIN 查询,NoSQL 数据库通常不支持。如果应用代码中有大量 JOIN 查询,迁移到 NoSQL 数据库时需要在应用层做数据拼接,性能和代码复杂度都会增加。

# 关系型数据库:一条 SQL 搞定
result = db.query("""
    SELECT o.id, o.total_cents, c.name, c.email
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    WHERE o.status = 'pending'
""")

# NoSQL 数据库:需要在应用层拼接
pending_orders = dynamo.query(
    TableName='orders',
    IndexName='status-index',
    KeyConditionExpression='#s = :status',
    ExpressionAttributeNames={'#s': 'status'},
    ExpressionAttributeValues={':status': {'S': 'pending'}}
)

customer_ids = [o['customer_id']['S'] for o in pending_orders['Items']]
customers = dynamo.batch_get_item(
    RequestItems={
        'customers': {
            'Keys': [{'id': {'S': cid}} for cid in customer_ids]
        }
    }
)

# 在应用层做 JOIN
customer_map = {c['id']['S']: c for c in customers['Responses']['customers']}
result = []
for order in pending_orders['Items']:
    customer = customer_map.get(order['customer_id']['S'])
    result.append({
        'order_id': order['id']['S'],
        'total_cents': int(order['total_cents']['N']),
        'customer_name': customer['name']['S'] if customer else None,
        'customer_email': customer['email']['S'] if customer else None,
    })

索引策略差异:不同存储引擎的索引机制不同。MySQL 的 B+Tree 索引和 DynamoDB 的 GSI(Global Secondary Index)在查询能力、一致性保证、成本模型上都有显著差异。迁移时需要重新设计索引策略。


八、迁移过程的可观测性

在线迁移不是”启动然后等待完成”的一次性操作,而是一个需要持续监控和调整的过程。可观测性(Observability)是迁移成功的基础保障。

8.1 关键监控指标

迁移过程中需要监控的核心指标包括:

class MigrationMetrics:
    """迁移过程的核心监控指标"""

    def __init__(self, metrics_client):
        self.m = metrics_client

    def record_backfill_progress(self, table: str, migrated: int, total: int):
        """回填进度"""
        self.m.gauge(f'migration.backfill.progress.{table}', migrated / total)
        self.m.gauge(f'migration.backfill.remaining.{table}', total - migrated)

    def record_consistency(self, table: str, match_rate: float):
        """数据一致性率"""
        self.m.gauge(f'migration.consistency.{table}', match_rate)

    def record_dual_write_latency(self, table: str, old_ms: float, new_ms: float):
        """双写延迟对比"""
        self.m.histogram(f'migration.write_latency.old.{table}', old_ms)
        self.m.histogram(f'migration.write_latency.new.{table}', new_ms)
        self.m.histogram(f'migration.write_latency.overhead.{table}', new_ms - old_ms)

    def record_read_source(self, table: str, source: str):
        """读取来源分布"""
        self.m.increment(f'migration.read.{source}.{table}')

    def record_repair(self, table: str, repair_type: str, success: bool):
        """修复操作"""
        status = 'success' if success else 'failure'
        self.m.increment(f'migration.repair.{repair_type}.{status}.{table}')

8.2 迁移仪表板与告警

一个好的迁移仪表板应该包含以下面板:整体回填进度与预计完成时间、过去 1 小时和 24 小时的一致性率趋势(阈值 99.9%)、新旧存储的写入延迟对比、双写/校验/修复的错误率、CDC 管道延迟、源和目标数据库的 CPU/内存/IOPS/连接数。

关键告警规则应覆盖四个场景:数据一致性低于 99.9% 持续 5 分钟(warning)、双写失败率超过 1%(critical)、回填进度停滞超过 30 分钟(warning)、CDC 延迟超过 60 秒(critical)。以下是 Prometheus AlertManager 格式的示例:

groups:
  - name: migration_alerts
    rules:
      - alert: MigrationConsistencyLow
        expr: migration_consistency < 0.999
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "数据一致性低于 99.9%"

      - alert: MigrationDualWriteFailureHigh
        expr: rate(migration_dual_write_failure_total[5m]) > 0.01
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "双写失败率超过 1%"

      - alert: MigrationCDCLagHigh
        expr: migration_cdc_lag_seconds > 60
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "CDC 延迟超过 60 秒"

九、迁移检查清单与最佳实践

9.1 迁移前检查清单

在启动任何在线迁移之前,确认以下事项:

迁移前检查清单:

[ ] 已明确迁移的目标和范围
[ ] 已评估数据量和迁移时间估算
[ ] 已识别所有读写该数据的服务和代码路径
[ ] 已设计回滚方案并在测试环境验证
[ ] 已准备数据校验工具
[ ] 已建立监控仪表板和告警规则
[ ] 已在测试环境完成端到端演练
[ ] 已通知所有相关团队迁移计划
[ ] 已评估迁移对数据库负载的影响
[ ] 已准备修复队列和修复脚本

9.2 最佳实践总结

小步快跑:将大迁移拆分为多个小迁移。每次只迁移一张表或一个数据模型。小迁移的风险可控、回滚简单、验证快速。

可观测性优先:在编写迁移代码之前,先搭建监控和告警。没有可观测性的迁移就是盲飞。

校验驱动:不要相信”代码逻辑没问题数据一定没问题”。始终用独立的校验工具验证数据一致性。

灰度发布:切换读写源时使用灰度策略。先切 1% 的流量,观察一天;再切 10%,再观察;逐步扩大到 100%。

保留回切能力:在迁移完全结束之前(通常是收缩阶段之前),始终保留回到旧存储的能力。功能开关(Feature Flag)是实现回切的最佳工具。

文档化:记录迁移的决策、步骤、时间线、遇到的问题和解决方案。这些文档是下次迁移的最佳参考。用 ADR(架构决策记录)格式记录关键决策,具体方法可参考 架构决策与 ADR 一文。

9.3 迁移方案选型决策树

根据迁移的具体场景,选择合适的方案:

迁移需求分析:
│
├── 只是添加/删除列?
│   ├── 表小于 100 万行 → 直接 ALTER TABLE(MySQL Online DDL / PostgreSQL 快速 DDL)
│   └── 表大于 100 万行 → gh-ost 或 pt-online-schema-change
│
├── 数据格式需要转换?
│   ├── 同一个数据库内 → Expand-Contract 模式
│   └── 跨存储引擎 → 双写 + 回填 + 灰度切换
│
├── 存储引擎需要更换?
│   ├── 同类型更换(MySQL → MySQL / PostgreSQL → PostgreSQL)→ 逻辑复制 + 切换
│   └── 跨类型更换(MySQL → DynamoDB / MongoDB → PostgreSQL)→ 四阶段迁移法
│
└── 数据序列化格式需要变更?
    ├── Protobuf / Avro → 遵循 Schema 演进规则,使用 Schema Registry
    └── 自定义格式 → 版本字段 + 多版本解析器

十、总结

在线数据迁移的核心挑战不在于技术本身,而在于如何在保证系统持续可用的前提下,安全地完成数据结构和存储的演进。

传统的停机迁移方式——锁表、修改 schema、重新上线——在现代系统中已经不可接受。Expand-Contract 模式提供了一个基本框架:先扩展、再迁移、最后收缩,每一步都可回滚。双写双读策略解决了跨存储迁移的一致性问题,但需要精心设计写入顺序、校验机制和回切方案。

GitHub 的 gh-ost 用 Binlog 取代触发器,将大表 schema 变更从高风险运维操作降级为日常开发任务。Stripe 的四阶段迁移法为跨存储引擎迁移提供了工业级的方法论。Protobuf 和 Avro 各自通过不同的机制支持数据格式的版本化演进。

所有这些方案都有一个共同的前提:可观测性。没有监控、没有校验、没有告警的迁移,和蒙着眼睛过马路没有本质区别。

在线迁移不是一个技术问题,而是一个工程纪律问题。它要求团队在速度和安全性之间做出平衡,在自动化和人工判断之间找到边界。每一次成功的迁移,都是团队工程能力的一次检验。


上一篇:时序数据架构

下一篇:应用层数据一致性模式


参考资料

  1. Shlomi Noach. “gh-ost: GitHub’s Online Schema Migration Tool for MySQL.” GitHub Engineering Blog, 2016.
  2. Jacqueline Xu. “Online Migrations at Scale.” Stripe Engineering Blog, 2017.
  3. Martin Kleppmann. Designing Data-Intensive Applications. O’Reilly Media, 2017. Chapter 4: Encoding and Evolution.
  4. Percona. “pt-online-schema-change Documentation.” Percona Toolkit.
  5. MySQL Documentation. “InnoDB Online DDL Operations.” MySQL 8.0 Reference Manual.
  6. Confluent. “Schema Evolution and Compatibility.” Confluent Documentation.
  7. Google. “Protocol Buffers Language Guide (proto3).” Google Developers Documentation.
  8. Apache Avro. “Avro Specification: Schema Resolution.” Apache Avro Documentation.
  9. Debezium. “Debezium Connector for MySQL.” Debezium Documentation.
  10. Sam Newman. Monolith to Microservices. O’Reilly Media, 2019. Chapter 4: Decomposing the Database.

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】契约测试与 Schema 演进:服务间的信任协议

微服务拆分之后,服务间的接口兼容性成为系统稳定性的最大隐患。Pact 的消费者驱动契约、Protobuf 和 Avro 各自不同的 Schema 演进规则、Confluent Schema Registry 的兼容性策略——这些机制共同构成了服务间的信任协议。本文从一个真实的 Breaking Change 事故出发,拆解契约测试与 Schema 演进的工程实践,给出可落地的 CI/CD 集成方案。

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。

2026-04-13 · architecture

【系统架构设计百科】复杂性管理:架构的核心战场

系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略


By .