土法炼钢兴趣小组的算法知识备份

【流式数据处理】Debezium 与 Change Data Capture

文章导航

分类入口
databasedistributed
标签入口
#debezium#cdc#kafka-connect#mysql#postgresql#binlog#snapshot#schema-history

目录

lakehouse 第 19 章入湖侧讲清楚了 Flink/Iceberg sink 怎么提交、CDC 怎么借 equality delete 做 upsert。但 CDC 管道在到达 Flink 或 Iceberg 之前,还有一整段「数据库 → Kafka」的传输层:谁读 binlog/WAL、事件长什么样、快照和增量怎么切换、断点存在哪。这一段通常由 Debezium + Kafka Connect 承担。

读完 lakehouse/19,读者仍可能卡在这些问题:

本文只讲 CDC 采集与事件模型,不重复 lakehouse/19 的表格式提交协议,也不展开 Flink checkpoint 内部(见本系列第 1015 篇)。目标是把 Debezium 产出的事件读懂、把 Connect 架构里的断点与 schema 主题管清楚,并为入湖 upsert 准备好主键、顺序、幂等前提。

环境说明:本机为 Arch Linux on WSL2(内核 6.6.87.2)、i9-12900K / 32 GB,未安装 JVM、Kafka Connect、MySQL、Debezium。下文 JSON 示例来自 Debezium MySQL Connector 官方文档(标注「经删减」);配置片段按官方文档可复现,不粘贴未执行环境的命令输出,也不伪造吞吐/延迟数字。

版本锚定:Debezium 3.x / 2.x 稳定版文档;Kafka Connect 随 Kafka 3.x(KRaft);MySQL connector 以 binlog + GTID 场景为主叙述,PostgreSQL 差异在文中标注。


一、CDC 管道在整体链路中的位置

典型实时入湖链路可以切成四段:

flowchart LR
  DB[(OLTP<br/>MySQL / PG)] -->|binlog / WAL| DBZ[Debezium<br/>Kafka Connect Source]
  DBZ -->|change events| K[Kafka topics]
  K --> ENG[Flink / Spark /<br/>Connect Sink]
  ENG --> LAKE[("Iceberg / Hudi / Delta")]
负责方 本文覆盖
数据库 → Kafka Debezium + Connect
Kafka → 有状态计算 Flink 等 仅衔接点
计算 → 对象存储文件 Flink writer 见第 17 篇
文件 → 表 snapshot 表格式 committer 见 lakehouse/19

Debezium 的角色是 log-based CDC:持续读取数据库事务日志(MySQL binlog、PostgreSQL logical decoding WAL 等),把每条行级变更封装成统一信封,写入 Kafka。它保证下游 exactly-once——那是 Kafka + Flink + 表格式三层叠加的结果(本系列第 1415 篇)。Debezium 保证的是:在 connector 正常运行且 offset 未丢失的前提下,按日志顺序 emit 变更,并在 envelope 里携带足够的源库位点行镜像供下游还原语义。


二、变更事件信封(Envelope)

每条 Debezium 变更事件(change event)在 Kafka 里通常是一条 JSON / Avro 记录。核心结构固定为四块:

字段 含义
op 操作类型:c / r / u / d(及扩展码,见第三节)
before 变更行镜像;INSERT 时为 null
after 变更行镜像;DELETE 时为 null
source 源库坐标:connector 名、表名、日志位点、snapshot 标记等

外层还有 ts_ms(Connect 处理时间戳)、可选 transaction(事务边界元数据,需显式开启)等。Debezium 文档称这一结构为 envelope——下游无论用 Flink SQL、Spark 还是 Iceberg Connect sink,首先都要按 envelope 解析语义,而不是直接把 Kafka 字节当 Parquet 列。

2.1 INSERTop: c

创建行:beforenullafter 为完整新行。source.snapshot 通常为 "false"(来自 streaming)。

官方文档风格的示例(经删减):

{
  "before": null,
  "after": {
    "id": 1002,
    "email": "alice@example.com",
    "name": "Alice"
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "mysql",
    "name": "inventory",
    "ts_ms": 1705305600000,
    "snapshot": "false",
    "db": "inventory",
    "table": "customers",
    "server_id": 223344,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "thread": 7
  },
  "op": "c",
  "ts_ms": 1705305601050
}

入湖侧把 after 当作待 upsert 的新版本行;主键字段必须出现在 after 里(见第九节)。

2.2 UPDATEop: u

更新行:beforeafter 同时非空(除非 column 过滤或 REPLICA IDENTITY 限制导致 PG 旧值不全)。Debezium 用这一对镜像表达「改了什么」——下游 equality delete + insert 入湖时,通常用 after 写新行、用主键(有时加上 before 里变化前的键)定位旧行。

{
  "before": {
    "id": 1002,
    "email": "alice@example.com",
    "name": "Alice"
  },
  "after": {
    "id": 1002,
    "email": "alice@example.com",
    "name": "Alice Smith"
  },
  "source": {
    "connector": "mysql",
    "name": "inventory",
    "ts_ms": 1705305700000,
    "snapshot": "false",
    "db": "inventory",
    "table": "customers",
    "file": "mysql-bin.000003",
    "pos": 892
  },
  "op": "u",
  "ts_ms": 1705305700120
}

若下游只关心当前状态而非变更 diff,常用 ExtractNewRecordState SMT 把 envelope 压平为「仅 after 列 + __op 头」——但入湖 upsert 若要做 delete,仍需保留 op 或 tombstone 语义(lakehouse/19 第四节)。

2.3 DELETEop: d

删除行:afternullbefore 携带被删行的最后镜像。Iceberg equality delete 入湖时,关键是从 before(或 Kafka key)提取主键,写 delete 文件。

{
  "before": {
    "id": 1002,
    "email": "alice@example.com",
    "name": "Alice Smith"
  },
  "after": null,
  "source": {
    "connector": "mysql",
    "name": "inventory",
    "snapshot": "false",
    "db": "inventory",
    "table": "customers",
    "file": "mysql-bin.000003",
    "pos": 1204
  },
  "op": "d",
  "ts_ms": 1705305800300
}

2.4 快照读(op: r

初始快照(blocking snapshot)与增量快照(incremental snapshot)阶段,Debezium 把表扫描结果 emit 为 READ 操作,即 op: rbeforenullafter 为行内容;source.snapshot"true""last""incremental" 等,标记事件来自快照而非 binlog streaming。

增量快照事件示例(Debezium 文档 Example 1,经删减):

{
  "before": null,
  "after": {
    "pk": "1",
    "value": "New data"
  },
  "source": {
    "snapshot": "incremental"
  },
  "op": "r",
  "ts_ms": "1620393591654"
}

常见误区:把 r 当成「普通 insert」再写一遍,会导致入湖重复行。处理方式有三类:

  1. 下游按 source.snapshot 过滤,快照阶段与 streaming 阶段用不同 sink 策略;
  2. 用 upsert 按主键覆盖(幂等,lakehouse/19);
  3. 配置 ReadToInsertEvent SMT 把 r 改写成 c(仅当业务明确需要统一 op 码)。

MySQL connector 文档还说明:默认快照 emit r;若希望快照行表现为 c,可启用 ReadToInsertEvent SMT。

2.5 截断与其它 op 码

MySQL TRUNCATE 等 DDL 可能产生 op: t(truncate)类事件(视 connector 与表类型而定)。下游若只做行级 upsert,必须显式处理 truncate——否则湖表保留被截断表的历史行。PostgreSQL connector 对 TRUNCATE 的支持与配置相关;生产上常在下游把 truncate 映射为分区 overwrite 或人工运维,而不是假设 Debezium 自动等价于 DELETE 全表。


三、before / after 的工程语义

3.1 为什么需要成对镜像

关系型数据库日志里,一条 UPDATE 可能只记录「改了哪些列」。Debezium 在 connector 内维护表 schema 内存模型(见第五节 schema history),把日志条目 decode 成完整行镜像。对下游而言:

场景 应读字段 入湖动作(概念)
insert after append / upsert 新行
update after(主键),必要时 before(变键) equality delete 旧键 + 写新行
delete before 中的主键 equality delete
snapshot read after upsert 覆盖或单独快照管道

3.2 PostgreSQL 的 REPLICA IDENTITY

PostgreSQL logical replication 下,若表为 REPLICA IDENTITY DEFAULTUPDATE/DELETEbefore 可能只有主键列,非主键旧值缺失。需要完整 before 镜像时,应把表设为 REPLICA IDENTITY FULL(代价是 WAL 体积增大)。这是 CDC 入湖前必须在源库确认的 DBA 项,不是 Debezium 单方能补的。

3.3 列过滤与 schema 演进

Debezium 支持 column.include.list / column.exclude.list。过滤后 before/after 只含子集列,但 主键列必须保留,否则无法 upsert。schema 加列后,新事件 after 多字段、旧事件少字段——Avro/Schema Registry 场景靠 schema id 演进;JSON 场景靠下游「缺列填 null」策略。


四、source 元数据:位点、快照标记与顺序

source 块是 Debezium 的断点凭证,也是下游判断事件先后顺序的辅助依据(最终顺序仍由 Kafka 分区内 offset 保证)。

4.1 常用字段

字段 MySQL PostgreSQL 作用
connector mysql postgresql connector 类型
name 逻辑 server 名(database.server.name 同左 offset / schema history 的命名空间
db / schema / table 库表 schema + table 路由与 metrics
file + pos 或 GTID binlog 文件与偏移 MySQL 恢复位点
lsn WAL LSN PG 恢复位点
ts_ms 源库变更时间(毫秒) 同左 事件时间分析
snapshot true / false / last / incremental 同左 是否快照事件
sequence [last_commit_lsn, current_lsn] PG 事务内顺序

关键区分source.ts_ms 是数据库里变更发生的时间;记录顶层的 ts_ms 是 Kafka Connect 处理该事件的时间。做事件时间窗口(本系列第 2 篇)时,应优先 source.ts_ms,并处理 clock skew。

4.2 Kafka 分区内顺序 vs 跨表顺序

Debezium 默认按 发 topic(topic.prefix + . + schema + . + table)。单表变更在单分区内有序(若 producer 按主键 hash 到固定分区);跨表无全局顺序。多表 JOIN 物化或宽表入湖时,要么在 Flink 里按业务键重分区,要么接受最终一致。


五、Snapshot 阶段 vs Streaming 阶段

Debezium connector 生命周期分两阶段:

stateDiagram-v2
  [*] --> Snapshot: 首次启动或无有效 offset
  Snapshot --> Streaming: 快照完成,切入 binlog/WAL
  Streaming --> Streaming: 持续 emit c/u/d
  Streaming --> Snapshot: snapshot.mode=always 等
  note right of Snapshot: emit op=r(或 SMT 改为 c)
  note right of Streaming: source.snapshot=false

5.1 为什么要 snapshot

MySQL binlog、PostgreSQL WAL 都有保留策略——日志会被 purge。Connector 不能假设「从很久以前的位置还能读」。首次启动时,Debezium 通常先做 consistent snapshot(一致快照):在某一时间点冻结表视图(MySQL 用全局读锁或 table-level lock 变体),导出表数据,再从快照对应的 binlog 位点开始 streaming,避免快照与日志之间的缝。

PostgreSQL 使用 replication slot,原理类似:快照边界与 slot 起始 LSN 对齐。

5.2 Blocking snapshot 工作流(概念)

MySQL connector 文档描述的标准流程(简化):

  1. 若配置了 snapshot,锁定/协调源表(视 snapshot.locking.mode);
  2. 读取当前 binlog 位置,作为 streaming 起点;
  3. 扫描 table.include.list 中的表,emit op=r 事件;
  4. 释放锁,切换到 binlog streaming;
  5. 对快照开始后新建表的 DDL,在 streaming 阶段补 capture。

快照进行中 connector 失败重启,会从快照进度恢复而非重头读 binlog(具体 checkpoint 由 Debezium 内部 offset 记录)。

5.3 Streaming 阶段

快照完成后,connector 只读事务日志:

此阶段 source.snapshotfalse。Offset 持续写入 Connect offset storage(第六节)。若 binlog 被 purge 且 offset 指向已不存在的位置,connector 失败;snapshot.mode=when_needed 可触发重新快照(MySQL 文档 Behavior when things go wrong)。

5.4 Incremental snapshot(增量快照)

大表全量 blocking snapshot 可能耗时数小时。Debezium 支持 incremental snapshot:在不长时间锁表的前提下分 chunk 扫描,并与 streaming 并行。增量快照仍 emit op=rsource.snapshot=incremental。文档指出:streaming 与 snapshot 可能乱序到达,connector 内部有 buffer 做碰撞解析,之后才对 Kafka emit——下游仍可能短暂看到「先 update 后 read」的错觉,upsert 按主键可吸收。

触发方式: signaling table、execute-snapshot Kafka 信号 topic 等(Debezium Incremental snapshots 章节)。


六、snapshot.mode 选型

snapshot.mode 决定 connector 启动时是否以及如何跑快照。MySQL connector 官方文档(stable)主要取值如下:

模式 行为摘要 典型用途
initial(默认) 无 offset 时跑全量快照+schema,然后 streaming;有 offset 则跳过快照 新管道标准起点
initial_only 只快照,不 streaming 一次性灌 Kafka
never 不快照,直接从当前日志位置 stream(可能丢历史 下游已有全量
no_data 只抓 schema,不导数据 只要结构变更
when_needed offset 丢失或 binlog 不可用时自动快照 生产容错
always 每次启动都全量快照 测试或强制全量刷新
recovery 重建丢失的 schema history topic 灾难恢复
configuration_based 用前缀属性细粒度控制 高级
custom 自定义 Snapshotter SPI 特殊策略

与 offset 的交互(工程上最易踩坑):

PostgreSQL connector 有类似概念,但依赖 replication slot;删 slot 的后果比删 Kafka offset 更严重——可能要从头快照。


七、Kafka Connect 架构

Debezium 以 Kafka Connect source connector 插件形式运行,不单独起进程(除非用 Debezium Engine 嵌入应用,本文不展开)。

flowchart TB
  subgraph Connect Cluster
    W[Connect Worker]
    C[Debezium MySQLConnector]
    T1[Task 0]
    T2[Task 1]
    W --> C
    C --> T1
    C --> T2
  end
  DB[(MySQL)] --> T1
  DB --> T2
  T1 --> K1[Kafka topic<br/>inventory.db.orders]
  T2 --> K2[Kafka topic<br/>inventory.db.customers]
  T1 --> OS[(offset.storage.topic)]
  T1 --> SH[(schema.history.internal.kafka.topic)]

7.1 Worker、Connector、Task

概念 职责
Worker JVM 进程,加载 connector 插件,执行 task
Connector 配置与任务拆分逻辑(MySqlConnector
Task 实际读 binlog / 发 Kafka 的并行单元

MySQL connector 通常 1 个 database server → 1 个 task(单线程读 binlog)。PostgreSQL 一般也是单 task。并行度来自多 connector 实例捕获不同库表,而非单 connector 多 task 读同一 binlog。

7.2 配置与 REST 管理

Connect 通过 REST API 注册 connector:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "inventory",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "include.schema.changes": "true"
  }
}

tasks.max 对 MySQL 实际常为 1;调大不会加速单库 binlog 读取。

7.3 内部 topic 与 Connect 框架 topic

除业务 change event topic 外,还有三类基础设施 topic

Topic 默认名 compaction 作用
Offset storage connect-offsets(可配置) 存 connector 读库位点
Config storage connect-configs Connect 集群配置
Status storage connect-status connector/task 状态

Debezium 另用 schema history topic(connector 级配置,非 Connect 框架内置):

Topic 配置项 compaction
Schema history schema.history.internal.kafka.topic 不应 compact(需完整 DDL 链)

生产必须给 connect-offsets 与 schema history 设 replication.factor ≥ 3,否则 broker 故障可能丢断点,触发全量重快照。


八、Offset storage:读库位点存在哪

Debezium 的 offset 不是 Kafka consumer group offset。它记录的是源数据库日志坐标,例如:

Connect 把这些 JSON 以 key = [connector 名, partition 标识] 写入 offset.storage.topic(默认 connect-offsets),value 为位点 + 可选 schema history 进度。

sequenceDiagram
  participant DB as MySQL binlog
  participant T as Debezium Task
  participant OS as connect-offsets
  participant K as data topic
  DB->>T: event at pos 892
  T->>K: emit change event
  T->>OS: commit offset pos 892
  Note over T,OS: task 崩溃重启后<br/>从 OS 读 pos 892 续读

与下游 Flink 的分工

三层独立。清空 Flink checkpoint 不会回退 Debezium;重置 Debezium offset 不会自动重置 Flink。

8.1 重置 offset 的后果

社区 FAQ 常见操作:向 connect-offsetstombstone(key 相同、value null)触发 compaction 删除位点。后果:

任何「从某个 binlog pos 重放」的运维都必须同时规划 湖侧幂等下游 consumer


九、Schema history topic:DDL 链为何不能 compact

数据库 schema 随时间变化。同一张表在 t0 可能是 (id, name),t1 加了 email 列。Binlog 里 t0 时刻的 UPDATE 只有两列,t1 时刻有三列——connector 必须知道每个位点对应的 schema 才能 decode。

Debezium 做法:

  1. 在内存维护每张表的当前 schema;
  2. 遇到 DDL,解析并更新内存,同时 append 一条记录到 schema.history.internal.kafka.topic
  3. Restart 时从 offset 位点出发,重放 schema history 中截至该位点的全部 DDL,重建内存 schema,再读 binlog。

因此 schema history topic 必须 保留完整 append-only 历史(文档明确:不要 compact)。若 topic 丢失或损坏,需 snapshot.mode=recovery 等模式重建(MySQL 文档 Restoring a database schema history topic)。

9.1 Schema change 事件 topic

include.schema.changes=true,Debezium 还会向 {topic.prefix} 或配置的 topic 发送 schema change 事件(含 DDL 语句与 tableChanges 结构),供下游 Registry 或 Flink 动态建表。这与 history topic 不同:history 给 connector 自己恢复用;schema change topic 给应用消费。

Flink CDC 或 KafkaTableSource 监听 schema change 来 ALTER TABLE 湖表——否则新列进不了 Iceberg equality field。


十、Kafka 消息键、topic 命名与 tombstone

10.1 Topic 命名

默认:{topic.prefix}.{database}.{table},例如 inventory.inventory.customers。可通过 topic.naming.strategy 定制。

10.2 消息 Key

默认 key 为 主键 JSON。同一主键的变更进入同一 Kafka 分区(若 key-based partitioner),保证分区内 per-key 顺序——这对 upsert 入湖至关重要:同一 uidu/d 若乱序,湖表可能短暂出现旧版本覆盖新版本。

无主键表:key 可能为 null 或整行 hash,顺序语义弱,入湖 upsert 需额外设计( surrogate key 或全行 equality)。

10.3 Delete 与 compacted topic

若对 change topic 启用 log compactionDELETE 事件可能以 tombstone(value null)保留 key。下游 Flink 必须启用 deletion 语义解析。未 compact 的 topic 则保留完整 op=d envelope。


十一、Single Message Transform(SMT)与下游简化

Connect 链上常加 SMT 再入 Kafka:

SMT 作用
ExtractNewRecordState 展平 after,把 op 放入 header(__op
ExtractChangedRecordState 仅保留变更字段
ReadToInsertEvent 快照 rc
Filter / ContentBasedRouter 按库表路由

入湖管道若用 ExtractNewRecordState,Iceberg sink 仍须从 header 读 __op 区分 delete。Flink SQL 解析 Debezium JSON 时可用 DebeziumJsonDeserializationSchema 或官方 CDC connector 直接产出 RowKind+I/+U/-D 等)。


十二、与 lakehouse 第 19 章 upsert 的衔接

lakehouse/19 第四节说明:CDC 入湖要把 c/u/d/r 按主键 reduce 成当前状态,Iceberg 用 equality delete + 新行,Hudi 用 record index,Delta 用 MERGE。Debezium 层必须为这三条机制提供输入前提:

12.1 主键全局唯一

Debezium 只转发源库主键。分库分表若各库都有 id=1,Kafka key 冲突,湖表 upsert 互相覆盖。引擎侧需加 shard id 列或合成全局键(本系列第 17 篇写 Flink 侧 keyBy 与 bucket)。

12.2 顺序与幂等

语义 Debezium 提供 引擎/湖需提供
至少一次 emit 可能重复最后批次 checkpoint + 幂等 sink
分区内 per-key 顺序 主键 hash 到固定分区 同 key 进同一 Flink subtask
快照 vs 增量 op=r + source.snapshot upsert 或分阶段消费
Delete op=d + before PK equality delete

详细表格式提交与 equality delete 机制见 lakehouse/19 第四节第 10 篇行级删除

12.3 两条入湖路径对比

flowchart LR
  subgraph PathA["路径 A: Kafka 中转"]
    DB1[(MySQL)] --> DBZ[Debezium]
    DBZ --> K[Kafka]
    K --> FL1[Flink SQL]
    FL1 --> ICE1[Iceberg upsert]
  end
  subgraph PathB["路径 B: Flink CDC 直连"]
    DB2[(MySQL)] --> FL2[Flink CDC source]
    FL2 --> ICE2[Iceberg upsert]
  end

lakehouse/19 的 Flink SQL 示例走路径 B;本文架构图走路径 A。Upsert 表 DDL 与 write.upsert.enabled 对两条路径相同。


十三、生产故障模式(CDC 侧)

现象 可能原因 方向
Connector FAILED,提示 binlog 不可用 日志 purge,offset 过旧 when_needed 或重快照 + 湖幂等
Schema history 丢失 topic 被误删/compact recovery 模式
下游列对不齐 加列后未消费 schema change 更新 Flink/Iceberg schema
湖表重复主键 重快照 + append 表 改 upsert;检查 offset
PG 磁盘涨 replication slot 未消费 监控 slot lag,避免 orphan slot
快照时间过长 大表 blocking snapshot incremental snapshot + 信号触发

这些问题的 入湖侧 后果(小文件、commit 冲突)在本系列第 17 篇从 Flink 作业角度继续展开。


十四、边界

本文展开:

本文覆盖


参考资料

  1. Debezium Documentation, MySQL Connector(Event structure、Snapshot、snapshot.modeschema.history.internal.kafka.topic、incremental snapshot)。A 级。
  2. Debezium Documentation, PostgreSQL Connector(logical decoding、REPLICA IDENTITYsequence)。A 级。
  3. Apache Kafka Documentation, Kafka Connect(worker、offset/config/status storage、REST API)。A 级。
  4. MySQL Reference Manual, Binary Log;PostgreSQL Documentation, Logical Replication。A 级。
  5. 本系列:第 15 篇 两阶段提交与 EOS、第 17 篇 流式入湖深化
  6. 跨系列:lakehouse 第 19 章 流式写入与 CDC 入湖第 10 章 行级删除

返回 系列目录 · 上一篇 两阶段提交与端到端 Exactly-Once · 下一篇 流式入湖深化

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-07-01 · database / distributed

【流式数据处理】Kafka · Flink · 状态 · Exactly-Once

承接数据湖流式入湖:从 Kafka 日志与副本语义,到 Flink 事件时间、watermark、窗口、RocksDB 状态与 checkpoint,再到端到端 exactly-once 与 Debezium CDC 入湖。面向数据平台与实时工程师,补全批式湖仓之外的实时计算层。

2026-06-30 · database / storage

【数据湖与开放表格式】流式写入与 CDC 入湖

拆解流式数据进入 Iceberg/Delta/Hudi 的入湖侧机制:Flink/Kafka Connect/Spark sink 如何提交、exactly-once 怎样把引擎 checkpoint 与表格式的原子提交对齐、CDC 如何借 equality delete 与 record index 做 upsert,以及高频提交与小文件、compaction 的拉扯。只讲入湖侧,流处理引擎本身的窗口与状态留给后续。

2026-07-01 · database / distributed

【流式数据处理】事件时间、处理时间与 Watermark

拆解 event time、processing time、ingestion time 三种时间语义,给出 watermark 的形式化含义与 bounded-out-of-orderness 等生成策略,并说明侧输出、allowed lateness 如何处理迟到数据;附 event-time 与 processing-time 窗口对比的可复现实验步骤。


By .