土法炼钢兴趣小组的算法知识备份

【PG 内核】逻辑复制与逻辑解码:冲突处理与延迟放大

文章导航

分类入口
databasekernel
标签入口
#postgresql#pg-kernel#logical-replication#logical-decoding#reorder-buffer#pgoutput#publication#subscription#decode#conflict-detection#replication-slot#wal

目录

逻辑复制与逻辑解码:冲突处理与延迟放大

流复制(物理复制)能给你一个和 Primary 逐字节相同的 Standby,但它有一个硬伤:你必须复制整个集群。你不能只复制一张表、不能跨大版本复制、不能做双向同步。(关于流复制的完整内核机制,参见流复制。)逻辑复制补上了这些缺口——它从 WAL 中解码出行级变更(INSERT/UPDATE/DELETE),然后把这些变更应用到另一个数据库,甚至可以跨 PG 版本。

听起来很美好,但逻辑复制的故障形态和流复制截然不同。流复制的麻烦在 Slot 溢出和延迟;逻辑复制的麻烦在冲突——Publisher 上成功的操作,在 Subscriber 上可能因为约束冲突而失败。更隐蔽的是,大事务在 Reorder Buffer 中会引发延迟放大效应,一条 UPDATE 话在 commit 之前不会发给 Subscriber。

本文从源码路径拆解 PG 逻辑复制和逻辑解码的内核骨架,重点解释四个运维中最常遇到但理解最不充分的问题:四种冲突类型的根因和修复边界、序列不复制导致的自增主键冲突陷阱、大事务延迟放大的机制根因、以及 Subscription 被 disable 后的正确追平策略。


一、逻辑解码:从 WAL 到逻辑变更

整体架构

逻辑复制和流复制共用同一套 WAL 基础设施,但走向完全不同。流复制的 WAL Sender 直接把 WAL record 的原始字节发给 Standby;逻辑复制则多了一个逻辑解码(Logical Decoding)环节——从 wal_level = logical 级别的 WAL record 中解码出每一行的INSERT/UPDATE/DELETE,再通过 Output Plugin 序列化成协议消息发送给 Subscriber。

sequenceDiagram
    participant BE as Backend (Publisher)
    participant WB as WAL Buffer
    participant LD as Logical Decoding
    participant RB as Reorder Buffer
    participant OP as Output Plugin (pgoutput)
    participant AW as Apply Worker (Subscriber)

    BE->>WB: XLogInsert() 写入 WAL record
    WB->>LD: XLogReadRecord() 读取 record
    LD->>LD: LogicalDecodingProcessRecord()
    LD->>RB: RMGR decode → ReorderBufferQueueChange()
    Note over RB: 按 xid 暂存所有变更
    BE->>WB: XLogInsert() COMMIT record
    WB->>LD: XLogReadRecord() COMMIT
    LD->>RB: ReorderBufferCommit()
    RB->>RB: k-way merge 重建完整变更流
    RB->>OP: change_cb(INSERT/UPDATE/DELETE)
    OP->>AW: 网络传输 (replication protocol)
    AW->>AW: apply_dispatch() → INSERT/UPDATE/DELETE
```text

WAL Sender 对逻辑复制的角色变成了 WAL Reader——它不再发送原始 WAL record,而是不断调用 `LogicalDecodingProcessRecord()`,然后让 Output Plugin 把解码结果发出去。解码过程分了三个核心模块:Decode(逐 record 解码)、ReorderBuffer(按事务重排)、Output Plugin(序列化输出)。

### LogicalDecodingProcessRecord:逐 record 的解码

逻辑解码的入口函数是 `LogicalDecodingProcessRecord()`,位于 `src/backend/replication/logical/decode.c`。每次从 WAL 读取到一条 record,就调用它一次:

```c
// src/backend/replication/logical/decode.c
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
                                   XLogReaderState *record)
{
    XLogRecordBuffer buf;
    RmgrData    rmgr;

    buf.origptr = ctx->reader->ReadRecPtr;
    buf.endptr = ctx->reader->EndRecPtr;
    buf.record = record;

    // 如果有 top-level xid,注册到 reorder buffer
    topxid = XLogRecGetTopXid(record);
    if (TransactionIdIsValid(topxid))
        ReorderBufferAssignChild(ctx->reorder, xid, topxid, ...);

    // 通过 RMGR 表分发到对应的 decode 函数
    rmgr = GetRmgr(XLogRecGetRmid(record));
    if (rmgr.rm_decode != NULL)
        rmgr.rm_decode(ctx, &buf);
    else
        ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), ...);
}

这里的核心设计是 Resource Manager(RMGR)的分发表。WAL 中每条 record 都标明了它属于哪个 RMGR——RM_HEAP_ID(堆表插入/更新/删除)、RM_XACT_ID(事务提交/回滚)、RM_LOGICALMSG_ID(逻辑消息)等。不同的 RMGR 有各自对应的 rm_decode 回调,这些回调在 decode.c 中实现。

以堆表插入为例,链路是:

LogicalDecodingProcessRecord()
  → GetRmgr(RM_HEAP_ID) → rmgr.rm_decode = heap_decode()
    → switch (info) {
        case XLOG_HEAP_INSERT:   DecodeInsert(ctx, buf);
        case XLOG_HEAP_UPDATE:   DecodeUpdate(ctx, buf);
        case XLOG_HEAP_DELETE:   DecodeDelete(ctx, buf);
        case XLOG_HEAP_TRUNCATE: DecodeTruncate(ctx, buf);
      }
```text

每个 `Decode*` 函数做三件事:从 WAL record 数据中重建 tuple 内容(通过 `DecodeXLogTuple()`)、检查数据库 OID 和 origin 过滤条件、构造 `ReorderBufferChange` 并通过 `ReorderBufferQueueChange()` 入队到对应事务的变更链表中。

对于事务提交 (`RM_XACT_ID`),decode 路径不同:

```text
xact_decode()
  → switch (info) {
      case XLOG_XACT_COMMIT:
        DecodeCommit() → ReorderBufferCommit() // 触发重放
      case XLOG_XACT_ABORT:
        DecodeAbort()  → ReorderBufferAbort()  // 丢弃该事务所有变更
    }

关键点在于:直到看到事务的 COMMIT record 之前,该事务所有的 INSERT/UPDATE/DELETE 变更都只是暂存在 Reorder Buffer 中,不会发给 Output Plugin。这是逻辑复制延迟放大的根因。

DecodeXLogTuple:从 WAL 数据重建 HeapTuple

WAL record 中保存的 tuple 数据不是完整的 HeapTuple 格式——它只包含发生变更的列的数据和必要的 header 信息。DecodeXLogTuple() 负责将这些原始字节重建为 HeapTuple 结构:

// src/backend/replication/logical/decode.c
static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
{
    // data 指向 xl_heap_header 或 xl_multi_insert_tuple
    // 重建 HeapTupleData 的 t_data 指针、t_len、t_self
    // t_data 中包含 t_infomask2, t_infomask, t_hoff 以及列数据
}
```text

对于 UPDATE,WAL 中同时包含旧 tuple 和新 tuple(取决于 `REPLICA IDENTITY` 设置),解码函数分别通过 `ReorderBufferGetTupleBuf()` 分配内存并填充。REPLICA IDENTITY 决定了旧 tuple 中包含多少列——`FULL` 包含所有列,`DEFAULT`(主键)只包含主键列,这是 `UPDATE` 和 `DELETE` 在 Subscriber 端能否定位到目标行的关键。

---

## 二、Reorder Buffer:事务重排与 Snapshot 重建

### 为什么必须按 COMMIT 顺序解码

WAL 是按物理写入顺序(LSN 顺序)组织的,不是一个事务的连续记录。PG 使用 MVCC,多个事务的变更在 WAL 中交错出现——事务 A 的 INSERT、事务 B 的 UPDATE、事务 A 的 DELETE、事务 C 的 INSERT、事务 B 的 COMMIT、事务 A 的 COMMIT。

如果直接按 LSN 顺序把变更发给 Subscriber,事务边界就会丢失,Subscriber 上看到的不是原子提交。Reorder Buffer 的职责是**把按 LSN 交错的变更按事务重新组装,然后在事务 COMMIT 的时候按正确的顺序一次性输出**

```mermaid
flowchart LR
    subgraph WAL [WAL Record 序列(按 LSN)]
        direction LR
        W1["T1 INSERT"]
        W2["T2 UPDATE"]
        W3["T1 UPDATE"]
        W4["T3 INSERT"]
        W5["T2 COMMIT"]
        W6["T1 COMMIT"]
        W7["T3 COMMIT"]
    end
    subgraph RB [Reorder Buffer]
        direction TB
        T1["T1: INSERT → UPDATE"]
        T2["T2: UPDATE"]
        T3["T3: INSERT"]
    end
    subgraph OUT [输出顺序]
        direction LR
        O2["T2: BEGIN → UPDATE → COMMIT"]
        O1["T1: BEGIN → INSERT → UPDATE → COMMIT"]
        O3["T3: BEGIN → INSERT → UPDATE → COMMIT"]
    end
    W1 --> T1
    W2 --> T2
    W3 --> T1
    W4 --> T3
    W5 --> O2
    W6 --> O1
    W7 --> O3

注意:输出顺序是按 COMMIT LSN 排序的,不是按事务开始的顺序。T2 虽然比 T1 晚开始,但它先提交,所以 T2 的变更先输出。这保证了 Subscriber 上看到的提交顺序和 Publisher 一致。

ReorderBuffer 的核心数据结构

ReorderBuffersrc/backend/replication/logical/reorderbuffer.c 中的核心管理结构,维护以下关键成员:

ReorderBufferTXN 跟踪一个顶层事务及其所有子事务:

// src/include/replication/reorderbuffer.h
typedef struct ReorderBufferTXN {
    TransactionId   xid;
    XLogRecPtr      first_lsn;       // 事务第一条变更的 LSN
    XLogRecPtr      final_lsn;       // 事务 COMMIT record 的 LSN
    XLogRecPtr      end_lsn;         // COMMIT record 结束位置
    XLogRecPtr      restart_decoding_lsn;
    dlist_head      changes;         // ReorderBufferChange 双向链表
    Size            size;            // 该事务占用的内存
    uint64          nentries;        // 总变更数
    uint64          nentries_mem;    // 仍在内存中的变更数(溢出到磁盘后 < nentries)
    Snapshot        base_snapshot;   // 该事务的 catalog snapshot
    ReorderBufferTXN *subtxns;       // 子事务列表
    int             nsubtxns;
    uint32          txn_flags;       // 标志位组合
} ReorderBufferTXN;
```text

`ReorderBufferChange` 代表一条解码后的变更:

```c
typedef struct ReorderBufferChange {
    XLogRecPtr      lsn;
    ReorderBufferChangeType action;  // INSERT/UPDATE/DELETE/MESSAGE/TRUNCATE/...
    ReorderBufferTXN *txn;
    union {
        ReorderBufferTupleBuf *tuplebuf;  // INSERT/UPDATE/DELETE 的 tuple 数据
        ReorderBufferToastEnt *toast_ent; // TOAST 跨 record 重组状态
        // ... 其他类型特定数据
    } data;
} ReorderBufferChange;

k-way merge:子事务的 LSN 有序迭代

子事务(savepoint)引入了额外的复杂性——顶层事务 commit 时,其子事务的变更可能分布在不同的 LSN 区间。ReorderBufferCommit() 调用 ReorderBufferIterTXNInit() 初始化一个 k-way merge 迭代器:

  1. 收集所有子事务(包括顶层事务自身),每个子事务有一个 changes 链表。
  2. 将所有子事务的当前 change 按 LSN 插入一个 binary heap(最小堆)。
  3. 每次 ReorderBufferIterTXNNext() 弹出最小 LSN 的 change,然后将该子事务的下一个 change 入堆。
  4. 迭代结束时 ReorderBufferIterTXNFinish() 释放资源。

这保证了一个包含多个 savepoint 的事务,其变更按精确的 LSN 顺序串行化——与它们在 Publisher 上执行时的顺序完全一致。

Snapshot 重建:为什么需要 historic MVCC

解码 catalog 变更(如 ALTER TABLE)时,解码器需要能够看到该事务执行时的 catalog 状态。这被称为 historic MVCC——不能用当前的 catalog snapshot,而必须重建事务执行时的 snapshot。

Reorder Buffer 通过以下机制实现:

内存管理与溢出到磁盘

Reorder Buffer 的总内存由 logical_decoding_work_mem 控制(默认 64MB)。当所有事务累积的变更超出这个值时,Reorder Buffer 选择当前最大的事务(通过 txn_heap)将其变更序列化到磁盘:

  1. 被选中的事务的 changes 链表被序列化为 ReorderBufferDiskChange 格式写入临时文件。
  2. 该事务的 nentries_mem 递减至 0,但 nentries 保持不变,RBTXN_IS_SERIALIZED 标志被置位。
  3. 当事务 commit 时,ReorderBufferIterTXNInit() 通过 ReorderBufferRestoreChanges() 分批从磁盘读回变更。

这个溢出机制直接导致了大事务的延迟放大效应:如果一个大事务在 commit 之前被溢出到磁盘,它的变更可以在内存中持续累积,但无论如何都要等到 COMMIT 才释放——在此期间,这些变更占用的内存/磁盘空间不能被其他事务使用。


三、Output Plugin:pgoutput 协议

插件接口

PG 的逻辑解码框架支持多种 Output Plugin,包括内置的 pgoutput(用于逻辑复制)、test_decoding(调试用,输出文本格式)、以及第三方插件(如 wal2jsondecoderbufs)。每个插件实现一系列回调函数:

// src/include/replication/output_plugin.h
typedef struct OutputPluginCallbacks {
    void (*startup)(struct LogicalDecodingContext *ctx,
                    OutputPluginOptions *opt, bool is_init);
    void (*begin_txn)(...);
    void (*change)(...);
    void (*commit_txn)(...);
    void (*abort_txn)(...);
    void (*filter_by_origin_cb)(...);
    void (*shutdown)(...);
    // ... streaming, two_phase, truncate, message 回调
} OutputPluginCallbacks;
```text

`pgoutput` 是 PG 内置的二进制协议插件,实现位于 `src/backend/replication/pgoutput/pgoutput.c`。它将解码出的 `ReorderBufferChange` 序列化为二进制消息,通过 replication protocol 发送给 Subscriber。

### 消息类型

pgoutput 输出的消息类型覆盖了逻辑复制的完整生命周期:

| 消息类型 | 函数 | 用途 |
|---------|------|------|
| BEGIN | `logicalrep_write_begin()` | 事务开始,携带 commit LSN 和 timestamp |
| COMMIT | `logicalrep_write_commit()` | 事务提交,携带 commit LSN 和 end LSN |
| INSERT | `logicalrep_write_insert()` | 行插入,含新 tuple 各列值 |
| UPDATE | `logicalrep_write_update()` | 行更新,含旧/新 tuple |
| DELETE | `logicalrep_write_delete()` | 行删除,含旧 tuple(用于定位目标行) |
| RELATION | `logicalrep_write_rel()` | 发送表结构(列名、类型、replica identity) |
| TRUNCATE | `logicalrep_write_truncate()` | 表截断 |
| TYPE | `logicalrep_write_typ()` | 发送用户自定义类型信息 |
| ORIGIN | `logicalrep_write_origin()` | 复制来源标识 |
| STREAM START/STOP/COMMIT/ABORT | 系列函数 | 流式传输中大事务的变更 |

pgoutput 默认输出二进制格式(`OUTPUT_PLUGIN_BINARY_OUTPUT`)。也可以通过 `binary = false` 选项切换到文本格式。二进制格式避免了类型转换开销,要求 Publisher 和 Subscriber 的类型二进制表示兼容——这通常是主版本相同或相邻版本时成立。

### 行过滤与 UPDATE 变换

pgoutput 最复杂的逻辑在 `pgoutput_change()` 函数中。它不只是简单地转发解码结果——它需要判断该变更是否应该发送,以及如何发送:

**1. 发布资格检查**:`is_publishable_relation()` 跳过对非发布表的解码。

**2. 发布操作过滤**:检查 `relentry->pubactions`——如果发布只定义了 INSERT 操作,则跳过 UPDATE 和 DELETE。

**3. 分区路由**:如果表是分区并且发布使用了 `publish_via_partition_root`,将变更路由到顶层祖先表,并通过 `execute_attr_map_slot()` 转换列的格式。

**4. 行过滤器评估**:`pgoutput_row_filter()` 执行发布中定义的 WHERE 子句。对于 UPDATE 操作,行过滤器会产生有趣的变换:

| 旧 tuple 匹配过滤器? | 新 tuple 匹配过滤器? | 结果 |
|----------------------|----------------------|------|
||| 丢弃该变更 |
||| 变换为 **INSERT** |
||| 变换为 **DELETE** |
||| 保持为 **UPDATE** |

这个变换矩阵意味着:行过滤表达式可以影响 Subscriber 上执行的 SQL 类型。一个 Publisher 上的 UPDATE 可能在 Subscriber 上变成 INSERT(当旧值不匹配过滤器但新值匹配时)——旧值不匹配意味着 Subscriber 上之前没有这行(被过滤器排除了),现在新值匹配过滤器意味着这行第一次进入订阅范围,所以应该变成 INSERT。

### 延迟 BEGIN 优化

pgoutput 有一个关键优化:如果事务中没有任何一条变更通过了发布过滤器,则整个事务的 BEGIN/COMMIT 也不会发送。`pgoutput_change()` 通过 `txndata->sent_begin_txn` 标志跟踪——只有当至少一条变更被实际发送时,才会先发送 BEGIN。

---

## 四、Publication / Subscription 模型

### 发布端:Publication

Publication 定义了"哪些表的哪些操作需要被复制"。在 PG 内核中,Publication 由三个 catalog 表支撑:

- **`pg_publication`**:Publication 的定义——名称、是否 `all tables`、`pubinsert`/`pubupdate`/`pubdelete`/`pubtruncate` 标志位、`pubviaroot`、事件类型等。
- **`pg_publication_rel`**:Publication 与具体表的关系,可选携带列列表(`prattrs`)和行过滤表达式(`prqual`)。
- **`pg_publication_namespace`**:Publication 与 schema 的关系(PG 15+ 的 schema 级发布)。

创建 Publication 的 SQL:

```sql
CREATE PUBLICATION mypub FOR TABLE t1, t2 (c1, c2)
    WITH (publish = 'insert,update,delete');

-- PG 15+: schema 级发布
CREATE PUBLICATION mypub FOR ALL TABLES IN SCHEMA public;

-- PG 17+: 带行过滤器
CREATE PUBLICATION mypub FOR TABLE t1 WHERE (status = 'active');

pgoutput 在启动时调用 LoadPublications() 解析指定的 Publication 名称,然后通过 get_rel_sync_entry() 为每个关系构建 RelationSyncEntry,包含:

订阅端:Subscription

Subscription 定义了”从哪里接收什么数据”:

创建 Subscription 的 SQL:

CREATE SUBSCRIPTION mysub
    CONNECTION 'host=10.0.0.1 dbname=mydb'
    PUBLICATION mypub
    WITH (copy_data = true, create_slot = true, enabled = true,
          streaming = off, two_phase = false,
          disable_on_error = false);
```bash

### 初始表同步:TableSync Worker

当 Subscription 创建时,PG 对每个被订阅的表启动一个 tablesync worker:

1. 在 Publisher 端创建临时 replication slot,导出表当前快照。
2. 通过 `COPY` 将快照数据发送到 Subscriber 端的临时表。
3. 数据复制完成后,将 `pg_subscription_rel.srsubstate` 更新为 `SYNCDONE`,记录 `srsublsn` 为快照结束时的 WAL 位置。
4. 主 apply worker 在看到该表状态变为 `SYNCDONE` 后,从 `srsublsn` 之后的位置开始应用增量变更。

这个过程确保初始数据复制期间没有数据丢失——Snapshot 之后的变更被后续的增量 apply 覆盖。

### Apply Worker 的主循环

主 apply worker 是 Subscriber 端的核心进程。它通过 replication protocol 连接到 Publisher,接收 pgoutput 消息流,并在 Subscriber 上执行对应的 SQL:

```c
// src/backend/replication/logical/worker.c, LogicalRepApplyLoop()
// 主循环(简化)
for (;;) {
    // 从 publisher 接收一条消息
    msg = LogicalRepReadMessage();

    switch (msg->type) {
    case LOGICAL_REP_MSG_BEGIN:
        apply_handle_begin(msg->begin);
        break;
    case LOGICAL_REP_MSG_INSERT:
        apply_handle_insert(msg->insert);
        break;
    case LOGICAL_REP_MSG_UPDATE:
        apply_handle_update(msg->update);
        break;
    case LOGICAL_REP_MSG_DELETE:
        apply_handle_delete(msg->delete);
        break;
    case LOGICAL_REP_MSG_COMMIT:
        apply_handle_commit(msg->commit);
        break;
    case LOGICAL_REP_MSG_RELATION:
        apply_handle_relation(msg->relation);  // 缓存表结构
        break;
    // ... TRUNCATE, TYPE, ORIGIN 等
    }
}

apply_handle_insert() 执行 ExecSimpleRelationInsert()apply_handle_update() 执行 ExecSimpleRelationUpdate(),依此类推。这些操作都通过 Subscriber 端的 Executor 执行,因此会触发所有正常约束检查——唯一约束、外键约束、触发器等。这就是冲突的起点。


五、逻辑复制的冲突检测与处理

四种核心冲突类型

逻辑复制的冲突不是理论上的边缘情况,而是生产环境的高发故障。冲突的本质是:Publisher 上一个成功的操作,在 Subscriber 上由于数据状态不同而无法执行。

1. insert_exists(duplicate_key)

Publisher INSERT 了一行,但这行的主键/唯一键在 Subscriber 上已经存在。典型的根因:有人在 Subscriber 上手动插入了相同主键的数据,或者两个 Publisher 向同一个 Subscriber 写了相同主键的数据(多主到一)。

行为:这是唯一会直接报 ERROR 的冲突类型。Apply worker 遇到 duplicate key value violates unique constraint 时停止工作。

2. update_missing

Publisher UPDATE 了一行,但 Subscriber 上找不到对应行。根因:目标行在 Subscriber 上被手动 DELETE 了,或者之前的 insert_exists 冲突导致该行从未被插入。

行为静默跳过。Apply worker 在 FindReplTupleInLocalRel() 找不到目标行时,只输出一条 DEBUG1 级别的日志 "logical replication did not find row to be updated in replication target relation",不报错。

3. delete_missing

Publisher DELETE 了一行,但 Subscriber 上找不到对应行。根因与 update_missing 相同。

行为静默跳过。Apply worker 同样只输出 DEBUG1 日志,不报错。这意味着 Subscriber 上的数据可能比 Publisher 多——那些在 Publisher 上被删掉的行在 Subscriber 上残留。

4. update_differ / delete_differ(PG 17+ 冲突检测增强)

Publisher UPDATE/DELETE 了一行,但 Subscriber 上的这行是由另一个 origin(不是来自当前复制流)在本地修改过的。这需要 track_commit_timestamp = ondetect_conflict = true(PG 17+)才能检测到。

行为:在 PG 17+ 中可以通过 pg_stat_subscription_stats 的计数器追踪,但不阻断复制(除非升级为约束冲突)。

什么不算冲突:update_missingdelete_missing 的语义

update_missingdelete_missing 不报错,这是一个有意为之的设计决策——逻辑复制的目标是”将 Publisher 的数据尽可能应用到 Subscriber 上”。如果目标行不在,UPDATE/DELETE 就没有可执行的动作,跳过是合理的。但这也意味着 Subscriber 可能无声地偏离 Publisher 的状态。

问题在于:当 update_missing 发生时,之后同一个事务中的其他操作仍会继续执行。如果后续操作依赖于被跳过的 UPDATE 带来的数据一致性,Subscriber 就会进入不一致状态——跳过不是事务级别的回滚,只是操作级别的忽略

冲突修复策略

当 conflict 导致 apply worker 停止时,修复路径如下:

Step 1:确认冲突类型和 finish LSN。

错误日志中会包含关键信息:

ERROR:  duplicate key value violates unique constraint "test_pkey"
DETAIL:  Key (c)=(1) already exists.
CONTEXT:  processing remote data for replication origin "pg_16395"
          during "INSERT" for replication target relation "public.test"
          in transaction 725 finished at 0/14C0378
```text

需要记录两个值:origin name (`pg_16395`) 和 finish LSN (`0/14C0378`)。

**Step 2:选择修复策略。**

| 策略 | 操作 | 适用场景 | 风险 |
|------|------|---------|------|
| 手动修复数据 | 在 Subscriber 上删除或更新冲突行,然后 `ALTER SUBSCRIPTION ENABLE` | 数据完整性关键 | 需要确定哪个版本的数据是正确的 |
| 跳过事务 | `ALTER SUBSCRIPTION mysub SKIP (lsn = '0/14C0378')` | 确认该事务的全部变更都可以丢弃 | 整个事务被跳过,可能丢数据 |
| 推进 origin | `SELECT pg_replication_origin_advance('pg_16395', '0/14C0379')` | 和 SKIP 类似,但操作 origin 而非 subscription | 与 SKIP 效果相同,但绕过了 subscription 的 skip 记录 |

`pg_replication_origin_advance()` 必须在 subscription disabled 状态下执行,而 `ALTER SUBSCRIPTION ... SKIP` 不需要(但通常也建议先 disable)。

**Step 3:重新启用订阅。**

```sql
ALTER SUBSCRIPTION mysub ENABLE;

disable_on_error 选项

CREATE SUBSCRIPTION mysub
    CONNECTION '...' PUBLICATION mypub
    WITH (disable_on_error = true);
```text

当设置了 `disable_on_error = true` 时,任何导致 apply worker 报错的冲突都会自动将 `pg_subscription.subenabled` 设为 `false`。这避免了 apply worker 重复重试——否则 worker 会反复重启、撞上同一个冲突、再崩溃,产生日志风暴。

配合使用建议:生产环境的 Subscription 应当启用 `disable_on_error`,配合监控告警——当 `pg_stat_subscription` 中的订阅消失时,意味着有一个需要人工介入的冲突。

---

## 六、关键陷阱与排查

### 陷阱一:大事务的延迟放大效应

这是逻辑复制最隐蔽的性能杀手。Root cause 链条:

1. Publisher 上一个事务(比如批量 UPDATE 100 万行)开始后,其变更持续写入 WAL。
2. 逻辑解码器从 WAL 逐个解码,每条变更通过 `ReorderBufferQueueChange()` 追加到对应事务的 `ReorderBufferTXN.changes` 链表中。
3. 事务越大,`changes` 链表越长,内存占用越大。当超过 `logical_decoding_work_mem` 时,Reorder Buffer 将其序列化到磁盘。
4. **关键点**:不管变更是在内存中还是在磁盘上,**整个事务的变更在 COMMIT 之前不会发给 Subscriber**。`ReorderBufferCommit()` 只会当解码器读到该事务的 COMMIT record 时才被调用。
5. 因此,一个运行了 30 分钟的大事务,会使 Subscriber 端的复制延迟至少 30 分钟——而且 Subscriber 不是在线 apply 的,而是堆积在 Publisher 的 Reorder Buffer 中,等 COMMIT 后一次性发送。

在 `pg_stat_replication` 中,你会看到 `write_lag` 和 `flush_lag` 正常(因为 WAL 一直在流动),但 Subscriber 的实际数据延迟很大。根源在 `pgoutput` 的延迟 BEGIN 机制——在整个事务完成之前,Subscriber 什么都看不到。

**排查方法**

```sql
-- Publisher 端:检查 replication slot 的 restart_lsn 推进情况
SELECT slot_name, restart_lsn,
       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_type = 'logical';

-- Subscriber 端:检查 apply worker 的延迟
SELECT application_name,
       pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS lag_bytes
FROM pg_stat_replication;

缓解手段

陷阱二:序列不是逻辑复制的范围

逻辑复制复制的是表的 DML 操作(INSERT/UPDATE/DELETE/TRUNCATE),不复制序列(SEQUENCE)的当前值。这是一个常见的自增主键冲突陷阱:

  1. Publisher 上表 t 使用 SERIAL 主键,Publisher 的 sequence 已经生成到 nextval = 1000
  2. 创建 Subscription 时执行初始数据快照,Subscriber 上插入了 id 为 1-1000 的行,但 Subscriber 的 sequence 仍在初始值 1。
  3. Subscriber 上如果有人直接插入(或另一个复制源插入),会使用 nextval = 1,与已有的 id=1 行冲突。
  4. 更常见的是:Publisher 和新 Subscriber 是在同一个 table DDL 基础上创建的,两边 sequence 起始值相同——Publisher 插了 500 行(id 1-500),初始快照同步后 Subscriber 上也有 id 1-500,但 Subscriber 本地的 sequence 还在 1。之后任何本地写入都会冲突。

修复

-- 在 Subscriber 端手动推进 sequence
SELECT setval('t_id_seq', (SELECT max(id) FROM t));
```text

如果使用双向或多主逻辑复制,需要为每个节点的序列分配不重叠的范围(如 `START 1 INCREMENT 2` vs `START 2 INCREMENT 2`),或者使用 UUID 替代序列作为主键。

### 陷阱三:Subscription 被 disable 后的追平

当 Subscription 因冲突被禁用(`subenabled = false`,无论自动或手动)后,Publisher 端对应的 replication slot 在此期间**不会停止积累 WAL**。Slot 的 `restart_lsn` 停留在 Subscriber 最后确认的位置,Publisher 不能回收这个 LSN 之后的 WAL 段。

这意味着:
1. 如果 Subscriber 宕机或 disabled 时间过长,Publisher 的 `pg_wal` 目录会膨胀——这是 slot 溢出的多米诺效应,和流复制完全一样。
2. Re-enable 后,Subscriber 会从上次停止的位置继续 apply,追赶期间 Publisher 端 WAL 回收恢复正常。

**追平策略**

```sql
-- 1. 禁用订阅(如果尚未禁用)
ALTER SUBSCRIPTION mysub DISABLE;

-- 2. 处理冲突数据(根据冲突类型手动修复)
-- 3. 重新启用
ALTER SUBSCRIPTION mysub ENABLE;

-- 4. 监控追平进度
SELECT application_name, state,
       pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS lag_bytes
FROM pg_stat_replication;

如果 Publisher 端的 WAL 已被回收(slot 在 disabled 期间被删除或 advance),Subscriber 永久无法追平——必须重建 Subscription。

陷阱四:REPLICA IDENTITY 与 UPDATE/DELETE 的定位失败

逻辑复制的 UPDATE 和 DELETE 依赖表上的 REPLICA IDENTITY 来确定”哪一行应该被更新/删除”。默认情况下,REPLICA IDENTITY 使用主键(DEFAULT)。如果表没有主键,也没有设置 REPLICA IDENTITY FULL,UPDATE 和 DELETE 操作无法在 Subscriber 上定位目标行:

-- 检查表上的 REPLICA IDENTITY
SELECT relname, relreplident
FROM pg_class WHERE relname = 'your_table';
-- relreplident: 'd' = DEFAULT (PK), 'f' = FULL, 'n' = NOTHING

-- 修复:设置 REPLICA IDENTITY
ALTER TABLE your_table REPLICA IDENTITY FULL;
```text

`REPLICA IDENTITY FULL` 在 WAL 中记录旧 tuple 的全部列值,确保 Subscriber 能通过全列比对定位目标行。代价是 WAL 量显著增加(每次 UPDATE/DELETE 都要写完整旧行),且旧行中如果有 TOAST 列,其值可能因未压缩而在 WAL 中膨胀。

---

## 七、逻辑复制 vs 流复制:选型

| 维度 | 流复制(物理复制) | 逻辑复制 |
|------|-------------------|---------|
| 复制粒度 | 整个集群 | 表级 |
| 跨版本兼容 | 不允许(必须同主版本) | 允许(Publisher 版本不高于 Subscriber) |
| 复制内容 | page-level WAL | 行级 DML(INSERT/UPDATE/DELETE) |
| DDL 复制 | 自动(DDL 产生 catalog 变更的 WAL) | 不复制(PG 17+ 有限支持) |
| 序列复制 | 自动(序列变更写 WAL) | 不复制 |
| 冲突处理 | 无(物理一致性保证) | 约束冲突需手动处理 |
| 双向/多主 | 不支持 | 支持(需谨慎处理冲突) |
| 数据延迟 | 毫秒级 | 毫秒-分钟级(取决于事务大小和 streaming 模式) |
| WAL 用量 | 高(page-level) | 更高(`wal_level = logical`增加额外 WAL 信息) |

选型结论:如果目标是灾难恢复、高可用、只读负载均衡,用流复制。如果目标是表级同步、跨版本升级、双向同步、或向数据仓库/分析系统发送变更流,用逻辑复制。两种可以并用——同一个 Primary 上可以有多个物理 Standby(流复制)+ 多个逻辑 Subscriber。

---

## 八、实验:搭建逻辑复制并构造冲突

### 环境准备

```bash
# Publisher 端配置
# postgresql.conf
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

# 重启后创建 Publication
psql -c "CREATE TABLE t (id SERIAL PRIMARY KEY, val TEXT);"
psql -c "CREATE PUBLICATION mypub FOR TABLE t;"
# Subscriber 端配置
# postgresql.conf
max_replication_slots = 4
max_logical_replication_workers = 4

# 创建相同的表结构
psql -c "CREATE TABLE t (id SERIAL PRIMARY KEY, val TEXT);"

# 创建 Subscription
psql -c "CREATE SUBSCRIPTION mysub
    CONNECTION 'host=127.0.0.1 port=5433 dbname=test'
    PUBLICATION mypub;"
```bash

### 构造 duplicate_key 冲突

```bash
# Publisher 写入数据
psql -p 5433 -c "INSERT INTO t (val) VALUES ('from publisher');"

# 确认 Subscriber 已同步
psql -p 5434 -c "SELECT * FROM t;"

# 在 Subscriber 上手动插入相同主键的数据(干扰)
psql -p 5434 -c "INSERT INTO t (id, val) VALUES (1, 'from subscriber');"
# 这条可能成功(取决于 sequence 状态),或者先推进 sequence 再插

# Publisher 再次写入,id 冲突
psql -p 5433 -c "INSERT INTO t (val) VALUES ('another from publisher');"
# 如果 publisher 的下一个 id 与 subscriber 上已有的冲突,apply worker 报错

构造 update_missing

# Publisher 写入并更新
psql -p 5433 -c "INSERT INTO t (val) VALUES ('will be deleted on sub');"
# 等待同步后,在 Subscriber 上删除这行
psql -p 5434 -c "DELETE FROM t WHERE val = 'will be deleted on sub';"
# Publisher 上 UPDATE 这行
psql -p 5433 -c "UPDATE t SET val = 'updated' WHERE val = 'will be deleted on sub';"
# Subscriber 日志中会出现(DEBUG1 级别):did not find row to be updated
# 但不会报错,apply worker 继续运行
```bash

### 观察复制状态

```sql
-- Publisher 端查看 replication slot
SELECT slot_name, slot_type, database, active,
       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag
FROM pg_replication_slots;

-- Subscriber 端查看订阅状态
SELECT subname, subenabled, subconninfo FROM pg_subscription;

-- Subscriber 端查看表同步状态
SELECT srsubid::regclass, srsubstate, srsublsn
FROM pg_subscription_rel;

-- 查看 apply worker 统计(PG 17+)
SELECT * FROM pg_stat_subscription_stats;

九、关键要点

  1. 逻辑复制解码 WAL 为行级 DML——LogicalDecodingContext 经 RMGR 分发到 heap_decode(),Reorder Buffer 按事务重排,COMMIT 后才向 output plugin 输出。
  2. Reorder Buffer 是大事务延迟的根源——未 COMMIT 的变更必须缓存在内存(或溢出到磁盘),Subscriber 只能看到已提交事务。
  3. Publication/Subscription 是表级复制模型——序列、large object、DDL(PG 16 及之前)不在默认复制范围内;跨版本迁移需单独处理这些盲区。
  4. 冲突分四类duplicate_keyupdate_missingdelete_missing、apply 错误——多数场景需要人工介入或 REPLICA IDENTITY FULL 配合,不能指望自动修复。
  5. 与流复制互补而非替代——流复制做 HA 和物理一致性;逻辑复制做表级同步、跨版本升级和异构消费。

上一章:流复制 下一章:扩展系统与 FDW


参考资料

源码(PG 17)

官方文档

其他

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-06-16 · database / kernel

【PG 内核】PostgreSQL 内核机制深度拆解

从进程模型到磁盘页面、从 MVCC 到流复制——对 PostgreSQL 内核做完整的源码级拆解。不止步于源码分析:26 篇中 6 篇是运维实战——经典故障的根因与排查路径、性能调查的五层工具链、配置陷阱与恢复边界。面向想读懂 PG 内核源码、在生产环境排查过问题、准备给 PG 贡献代码的工程师。

2026-06-16 · database / kernel

【PG 内核】WAL 内部机制:从事务提交到磁盘刷写

拆解 PostgreSQL WAL 的完整内部机制:XLogInsert() 从分段锁到 WAL Buffer 的插入路径、XLogRecord 的物理布局(Header + Block Headers + Data)、Checkpoint 的两阶段流程与 IO 摊平算法、REDO 恢复的 RMGR 分发、wal_level 三级差异的 WAL 记录对比。运维部分聚焦 checkpoint IO 风暴的根因与 checkpoint_completion_target 的调优陷阱、max_wal_size 设小导致 WAL 段疯狂切换的机制,以及用 pg_waldump 定位问题 WAL record 的实操方法。

2026-06-16 · database / kernel

【PG 内核】流复制:从 WAL Sender 到 Slot 溢出的多米诺效应

拆解 PostgreSQL 流复制的完整内核路径:WAL Sender 的 WalSndLoop→XLogSendPhysical 发送链路、WAL Receiver 的 WalRcvLoop 接收与恢复链路、同步复制的三种语义与等待机制、Failover 时 Timeline 的 fork 原理与 split-brain 风险、Primary-standby 冲突的本质与 max_standby_streaming_delay 的 trade-off、Replication Slot 的内部结构。重点剖析 Slot 溢出多米诺效应——standby 宕机→slot 阻止 WAL 回收→pg_wal 填满磁盘→primary PANIC 的完整事件链,以及 wal_keep_size 与 slot 的互相影响。配合 pg_stat_replication 的三层延迟指标排查与 conflict_reason 解读。

2026-06-16 · database / kernel

【PG 内核】经典故障模式与排查手册:五个真实事故的内核根因

拆解 PG 生产环境中最危险的五种故障模式——连接风暴与 work_mem 连锁效应、事务 ID wraparound 危机完整时间线、replication slot 溢出多米诺效应、OOM 连锁 kill、长事务 idle in transaction 隐性破坏。每个故障给出可复现的触发方法、Mermaid 时序图标注事件节点和排查断点、排查 SQL 脚本和修复边界,以及监控埋点策略让下次提前发现而非事后救火。


By .