土法炼钢兴趣小组的算法知识备份

【分布式系统百科】分布式日志:Kafka 的日志抽象与 Pulsar 的分层架构

文章导航

标签入口
#分布式系统#Kafka#Pulsar#BookKeeper#日志存储

目录

一、日志作为统一抽象

1.1 The Log:分布式系统的核心

Jay Kreps 在 2013 年的博客文章”The Log: What every software engineer should know about real-time data’s unifying abstraction”中提出了日志(Log)作为分布式系统基础抽象的思想。日志不是应用程序的调试日志,而是一个仅追加(append-only)、完全有序的记录序列。

日志的数学本质:

Log = [e₀, e₁, e₂, ..., eₙ]
其中 eᵢ 是第 i 个记录,按时间顺序排列
每个记录有唯一的序列号(offset)

日志提供了三个关键保证:

1.2 日志在分布式系统中的应用

日志抽象统一了多个分布式系统问题:

复制状态机(Replicated State Machine):将日志作为副本间同步的基础。每个副本按相同顺序应用日志中的操作,保证最终状态一致。

Primary: [op1, op2, op3, op4]
         ↓
Replica: [op1, op2, op3, op4]

消息队列:生产者向日志追加消息,消费者按顺序读取。日志天然保证了消息的顺序性和可重放性。

事件溯源(Event Sourcing):不存储当前状态,而是存储导致该状态的所有事件。当前状态通过重放日志计算得出。

CDC(Change Data Capture):数据库的变更日志(binlog、WAL)本质上就是一个日志,捕获所有修改操作。

1.3 从日志到流处理

日志的出现催生了流处理范式。传统批处理系统处理有界数据集,流处理系统处理无界日志:

批处理:[dataset] → process → [result]
流处理:[log stream] → continuous process → [continuous results]

Kafka 将日志抽象实现为分布式流平台,成为流处理生态的基础设施。

二、Kafka 架构深入

2.1 核心概念与架构

Kafka 的层次结构:

Topic
 ├── Partition 0 (Leader: Broker 1, Replicas: [1, 2, 3])
 ├── Partition 1 (Leader: Broker 2, Replicas: [2, 3, 1])
 └── Partition 2 (Leader: Broker 3, Replicas: [3, 1, 2])

主题(Topic):消息的逻辑分类。

分区(Partition):主题的物理分割单元,每个分区是一个有序的、不可变的日志文件序列。分区是 Kafka 并行性的基础。

副本(Replica):每个分区有多个副本分布在不同 Broker 上,提供容错能力。副本分为 Leader 和 Follower。

Broker:Kafka 集群中的服务器节点,存储分区副本并处理读写请求。

2.2 分区内部结构

每个分区在磁盘上由多个日志段(Log Segment)组成:

partition-0/
├── 00000000000000000000.log    (消息数据)
├── 00000000000000000000.index  (偏移量索引)
├── 00000000000000000000.timeindex (时间戳索引)
├── 00000000000000368769.log
├── 00000000000000368769.index
└── 00000000000000368769.timeindex

日志段文件名是该段第一条消息的偏移量。当段文件达到配置的大小(log.segment.bytes,默认 1GB)或时间(log.segment.ms)时,会创建新段并关闭旧段。

索引结构

.index 文件采用稀疏索引,存储 <offset, physical position> 映射:

Offset    Position
  0         0
  100       4096
  200       8192
  ...

查找偏移量 150 的消息: 1. 二分查找索引,找到最大的小于 150 的条目(offset=100, pos=4096) 2. 从位置 4096 顺序扫描到 offset 150

2.3 生产者写入流程

生产者发送消息到分区的详细流程:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("acks", "all");  // 等待所有 ISR 副本确认
props.put("retries", 3);
props.put("max.in.flight.requests.per.connection", 5);
props.put("enable.idempotence", "true");  // 开启幂等性

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record = 
    new ProducerRecord<>("my-topic", "key", "value");

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // 处理失败
    } else {
        System.out.printf("Sent to partition %d with offset %d%n",
            metadata.partition(), metadata.offset());
    }
});

分区选择策略: - 指定分区:直接发送到指定分区 - 有 key:hash(key) % num_partitions - 无 key:轮询(Round-Robin)或粘性分区(Sticky Partitioning)

批处理与压缩

生产者在内存中按分区缓存消息,达到 batch.sizelinger.ms 超时后发送:

batch.size=16384          # 16KB 批次大小
linger.ms=10              # 最多等待 10ms
compression.type=snappy   # 压缩算法

2.4 ISR 机制与副本同步

Kafka ISR 机制

同步副本集(In-Sync Replica Set, ISR):与 Leader 保持同步的副本集合。副本必须满足以下条件才能在 ISR 中:

  1. 与 ZooKeeper/KRaft Controller 保持心跳
  2. replica.lag.time.max.ms(默认 10 秒)内从 Leader 拉取过消息
  3. replica.lag.time.max.ms 内追上了 Leader 的最新消息

高水位(High Watermark, HW):ISR 中所有副本都已复制的最大偏移量。只有 HW 之前的消息对消费者可见,保证已提交的消息不会因 Leader 切换而丢失。

LEO(Log End Offset):日志末端偏移量,副本中下一条消息的偏移量。

Leader:   [m0, m1, m2, m3, m4, m5]  LEO=6, HW=4
Follower1:[m0, m1, m2, m3, m4]      LEO=5
Follower2:[m0, m1, m2, m3]          LEO=4
                         ↑
                        HW (所有副本都已复制)

消费者只能读取到 offset 3 (HW-1)

ACK 配置

props.put("acks", "all");  // 或 "0", "1", "all"

最小 ISR 配置

min.insync.replicas=2  # ISR 至少要有 2 个副本

如果 ISR 副本数少于 min.insync.replicasacks=all,生产者写入会失败,保证数据安全性。

下面的时序图完整展示了一条消息从生产者写入到消费者可见的全过程,包括 ISR 确认和高水位推进机制:

sequenceDiagram
    participant P as Producer
    participant L as Leader Broker
    participant F1 as Follower-1
    participant F2 as Follower-2
    participant C as Consumer

    Note over P,C: acks=all, ISR={Leader, F1, F2}, min.insync.replicas=2
    P->>L: 发送消息 m5(offset=5)
    L->>L: 写入本地日志,LEO=6
    Note over L: HW 仍为 4(等待 Follower 追赶)
    F1->>L: Fetch 请求(fetch_offset=5)
    L-->>F1: 返回 m5 + 当前 HW=4
    F1->>F1: 写入本地日志,LEO=6
    F2->>L: Fetch 请求(fetch_offset=5)
    L-->>F2: 返回 m5 + 当前 HW=4
    F2->>F2: 写入本地日志,LEO=6
    Note over L: 所有 ISR 副本 LEO >= 6
    L->>L: 推进 HW=6
    L-->>P: ACK(acks=all 满足)
    C->>L: Fetch 请求(fetch_offset=5)
    L-->>C: 返回 m5(offset=5 < HW=6,可见)

该时序图揭示了 Kafka 消息可见性的核心机制:消息写入 Leader 后并不立即对消费者可见,必须等待所有 ISR 副本通过 Fetch 请求拉取并确认后,Leader 才会推进高水位。消费者只能读取 HW 之前的消息,这保证了即使 Leader 宕机,已对消费者可见的消息不会丢失。

2.5 Leader 选举与故障恢复

当分区 Leader 失败时,Controller 从 ISR 中选举新 Leader:

  1. Controller 从 ISR 列表中选择第一个存活的副本作为新 Leader
  2. 更新 ISR 列表,移除失败的副本
  3. 通知所有 Broker 新的 Leader 和 ISR 信息
  4. 新 Leader 开始处理读写请求

Unclean Leader Election

如果所有 ISR 副本都不可用,可以配置是否允许非 ISR 副本成为 Leader:

unclean.leader.election.enable=false  # 默认 false,优先可用性设为 true

2.6 故障场景实战推演:Leader Broker 宕机

为了深入理解 Kafka 的故障恢复机制,我们以一个具体场景完整推演 Leader Broker 宕机后的全链路恢复过程。

初始状态:Topic-A Partition-0,副本分布在 Broker-1(Leader)、Broker-2、Broker-3,ISR = {1, 2, 3},当前 HW = 1000,Leader LEO = 1005。

第一阶段:故障发生

Broker-1 因硬件故障突然下线。此时 Broker-2 的 LEO = 1003,Broker-3 的 LEO = 1001。offset 1001-1004 的消息尚未被所有 ISR 确认(未超过 HW),offset 1000 及之前的消息已提交。

第二阶段:Controller 检测与选举

KRaft Controller(或 ZooKeeper)在心跳超时(默认 broker.session.timeout.ms = 18s)后检测到 Broker-1 失联。Controller 从 ISR 列表中选择 Broker-2 作为新 Leader(ISR 中第一个存活节点)。Controller 更新元数据:Partition-0 的 Leader 变为 Broker-2,ISR 缩小为 {2, 3}。

第三阶段:日志截断与一致性恢复

新 Leader Broker-2 将 HW 设为 min(自身 LEO, 旧 HW) = min(1003, 1000) = 1000。Broker-3 发现自己的 LEO(1001) > 新 Leader 的 HW(1000),截断 offset 1000 之后的日志,LEO 退回到 1000。随后 Broker-3 从 Broker-2 拉取 offset 1000-1002 的消息,LEO 追赶到 1003。

第四阶段:生产者重试

生产者收到网络错误后,根据 retries 配置进行重试。幂等性生产者(enable.idempotence=true)通过 PID + Sequence Number 保证重试不会产生重复消息。生产者通过元数据刷新发现新 Leader 是 Broker-2,后续消息发送到 Broker-2。

第五阶段:消费者再平衡

消费者组检测到分区 Leader 变更,触发 Rebalance。消费者从新 Leader Broker-2 继续消费。由于消费者只能读取 HW 之前的消息,不会读到未提交的数据,因此不存在消息一致性问题。消费者从上次提交的 offset 继续消费即可。

关键结论:在 acks=all + min.insync.replicas=2 的配置下,HW 之前的已提交消息在此故障场景中零丢失。HW 与 LEO 之间的未提交消息(offset 1001-1004)可能丢失,但这些消息从未对生产者确认成功,生产者会通过重试机制重新发送。

2.7 日志压缩机制

除了基于时间或大小的日志保留策略(log.retention.hours),Kafka 还提供日志压缩(Log Compaction)机制,为每个 Key 只保留最新的 Value。

适用场景:当 Topic 作为变更日志(Changelog)使用时,例如数据库 CDC 流、KV 缓存同步、用户配置存储等。消费者需要的是每个 Key 的最终状态,而非完整的变更历史。

压缩过程

Kafka 后台线程 Log Cleaner 负责压缩。每个分区的日志被分为两部分:

|<--- Clean 区域 --->|<--- Dirty 区域 --->|
[已压缩的消息段]        [待压缩的新消息段]
  每个 Key 唯一          可能有重复 Key

压缩算法的核心步骤:

  1. 扫描 Dirty 区域,构建 offset map:{key -> latest_offset}
  2. 遍历整个日志(Clean + Dirty),对于每个消息,如果其 offset 不是该 key 的最新 offset,则标记为可删除
  3. 生成新的压缩日志段,仅保留每个 Key 的最新版本
  4. 值为 null 的消息(tombstone)在保留 delete.retention.ms(默认 24 小时)后被彻底删除

配置

log.cleanup.policy=compact            # 启用压缩
log.cleaner.min.cleanable.ratio=0.5   # Dirty 比例超过 50% 时触发压缩
log.cleaner.threads=2                 # 压缩线程数
min.compaction.lag.ms=0               # 消息写入后多久才能被压缩
delete.retention.ms=86400000          # tombstone 保留 24 小时

注意事项:压缩不保证实时性——Dirty 区域中的重复 Key 在压缩完成前仍然存在。活跃段(Active Segment)永远不会被压缩,只有已关闭的段才参与压缩。消费者在压缩前后读取到的消息顺序可能略有不同,但每个 Key 的最终状态保持一致。

三、Kafka Exactly-Once 语义

3.1 幂等性生产者

Kafka 0.11 引入幂等性生产者,解决网络重试导致的消息重复问题:

props.put("enable.idempotence", "true");

实现原理:

  1. Producer 启动时向 Broker 申请唯一的 Producer ID(PID)
  2. 每条消息携带 PID 和单调递增的 Sequence Number
  3. Broker 为每个 <PID, Partition> 维护最后写入的 Sequence Number
  4. 如果新消息的 Sequence Number <= 已存储的值,Broker 丢弃重复消息并返回成功
Producer (PID=100):
  send(partition=0, seq=0, msg="A") ✓
  send(partition=0, seq=1, msg="B") ✓
  send(partition=0, seq=1, msg="B") ✗ 重复,丢弃

Broker 内存:
  partition-0: {PID=100: last_seq=1}

限制: - 幂等性仅保证单分区、单会话(Producer 重启后 PID 改变) - 必须设置 max.in.flight.requests.per.connection <= 5 - 必须设置 retries > 0

3.2 事务 API

事务 API 提供跨分区、跨会话的 Exactly-Once 语义:

Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-transactional-id");  // 必须唯一

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();  // 初始化事务

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic-A", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic-B", "key2", "value2"));
    
    // 提交消费位移(用于 consume-transform-produce 模式)
    Map<TopicPartition, OffsetAndMetadata> offsets = ...;
    producer.sendOffsetsToTransaction(offsets, "consumer-group-id");
    
    producer.commitTransaction();  // 原子提交
} catch (Exception e) {
    producer.abortTransaction();  // 回滚
}

事务协调器(Transaction Coordinator)

每个 transactional.id 映射到一个 Transaction Coordinator(类似 Consumer Group Coordinator)。Coordinator 管理事务状态,存储在内部主题 __transaction_state 中。

两阶段提交流程

  1. Begin:Producer 向 Coordinator 发送 BeginTransaction 请求
  2. AddPartitions:Producer 写入分区前,先向 Coordinator 注册该分区
  3. Prepare:Producer 调用 commitTransaction(),Coordinator 将事务状态改为 PrepareCommit,写入 __transaction_state
  4. Commit:Coordinator 向所有参与的分区写入事务标记(Transaction Marker),标记消息为已提交
  5. Complete:所有分区确认后,Coordinator 将事务状态改为 CompleteCommit

消费者配置:

props.put("isolation.level", "read_committed");  // 只读已提交的消息

消费者会过滤掉未提交或已回滚的消息,只看到已提交事务的消息。

3.3 Exactly-Once 的成本

事务机制带来额外开销:

典型场景:

四、KRaft 模式:移除 ZooKeeper

4.1 从 ZooKeeper 到 KRaft 的演进

Kafka 长期依赖 ZooKeeper 管理 Broker 成员、主题配置、Controller 选举和 ACL 等元数据,但 ZooKeeper 引入了运维复杂性(需维护两套分布式系统)、扩展瓶颈(10 万+分区时元数据操作性能显著下降)、故障恢复慢(Controller 从 ZooKeeper 加载全量元数据耗时可达分钟级)等问题。

KIP-500 提出使用 Raft 共识算法管理元数据,完全移除 ZooKeeper 依赖。Kafka 3.3 起 KRaft 模式标记为生产就绪。其核心架构包含三个组件:Quorum Controller(基于 Raft 的 Controller 集群)、元数据日志__cluster_metadata 主题,通过 Raft 复制)、以及每个 Broker 维护的 Metadata Cache(通过订阅元数据日志保持同步)。

节点可配置为纯 Controller、纯 Broker 或混合模式:

process.roles=controller          # 纯 Controller 节点
process.roles=broker              # 纯 Broker 节点
process.roles=broker,controller   # 混合模式节点
controller.quorum.voters=1@host1:9093,2@host2:9094,3@host3:9095

4.2 元数据管理与快照

元数据变更流程:管理请求发送到 Active Controller,经验证后生成元数据记录,通过 Raft 复制到多数派并提交。Broker 通过 Fetch 请求获取元数据日志增量更新,应用到本地缓存。元数据日志定期生成快照避免无限增长,Controller 启动时加载最新快照后重放后续日志。

4.3 KRaft Controller 故障切换

当 Active Controller 宕机时,Raft 协议自动触发选举。以下时序图展示了 Controller 故障切换的完整过程:

sequenceDiagram
    participant C1 as Controller-1<br/>(Active)
    participant C2 as Controller-2<br/>(Follower)
    participant C3 as Controller-3<br/>(Follower)
    participant B as Broker 集群

    Note over C1,B: 正常运行:C1 为 Active Controller
    C1->>C2: Raft 心跳
    C1->>C3: Raft 心跳
    C1-xC1: Controller-1 宕机
    Note over C2,C3: 心跳超时(election.timeout.ms)
    C2->>C3: RequestVote(term=2)
    C3-->>C2: VoteGranted
    Note over C2: C2 当选新 Active Controller
    C2->>C2: 加载元数据快照 + 重放日志
    C2->>B: 推送元数据更新通知
    B->>C2: Fetch 元数据增量
    C2-->>B: 返回最新元数据
    Note over C2,B: 服务恢复,切换耗时通常 < 10秒

该时序图展示了 KRaft 模式下 Controller 故障切换的三个关键阶段:心跳超时触发选举、Raft 投票完成 Leader 切换、新 Controller 加载状态并通知 Broker 集群。相比 ZooKeeper 时代分钟级的恢复时间,KRaft 的故障切换通常在 10 秒以内完成,这得益于 Raft 协议的高效选举机制和元数据快照的快速加载能力。

4.4 KRaft 的核心收益

KRaft 带来三方面改进:性能方面,Controller 故障恢复从分钟级降到秒级,元数据变更延迟显著降低,可支持数百万分区;运维方面,消除了 ZooKeeper 依赖,只需部署和监控单一系统;一致性方面,Kafka 完全掌控元数据的一致性语义,避免了 ZooKeeper ZAB 协议与 Kafka 副本协议之间的语义差异。

五、Pulsar 分层架构

5.1 架构哲学

Pulsar 采用计算与存储分离的架构,由 Apache BookKeeper 提供持久化存储:

Pulsar 架构

架构层次

┌─────────────────────────────────────────┐
│  Producers / Consumers (Clients)        │
└────────────────┬────────────────────────┘
                 │
┌────────────────▼────────────────────────┐
│  Pulsar Brokers (Stateless)             │
│  - 路由与负载均衡                         │
│  - 协议处理                              │
│  - 缓存与分发                            │
└────────────────┬────────────────────────┘
                 │
┌────────────────▼────────────────────────┐
│  Apache BookKeeper (Stateful)           │
│  - 持久化存储                            │
│  - 分片与复制                            │
│  - 一致性保证                            │
└─────────────────────────────────────────┘

5.2 Broker 层:无状态服务

Pulsar Broker 不存储数据,只负责:

主题所有权

每个主题由一个 Broker 负责(单写多读)。主题到 Broker 的映射存储在元数据存储(ZooKeeper/etcd)中。Broker 故障时,其负责的主题快速重新分配到其他 Broker。

Metadata Store:
  persistent://tenant/namespace/topic-1 -> broker-A
  persistent://tenant/namespace/topic-2 -> broker-B
  persistent://tenant/namespace/topic-3 -> broker-A

Broker 宕机恢复:

  1. Broker-A 宕机
  2. 其他 Broker 检测到故障(通过心跳)
  3. Topic-1 和 Topic-3 重新分配给 Broker-B 和 Broker-C
  4. 新 Broker 从 BookKeeper 加载主题元数据,立即提供服务(无需复制数据)

5.3 BookKeeper:分段日志存储

Pulsar 主题的日志分割为多个 Ledger(账本),每个 Ledger 是一个不可变的日志段。

Ledger 结构

Topic: persistent://public/default/my-topic
├── Ledger 123 [entry 0..999]    (已关闭)
├── Ledger 456 [entry 0..999]    (已关闭)
└── Ledger 789 [entry 0..450]    (活跃中)

当 Ledger 达到配置的大小或时间后关闭(immutable),创建新 Ledger。

Ledger 写入流程

// Pulsar Broker 写入消息
BookKeeper bk = ...;
LedgerHandle lh = bk.createLedger(
    3,  // 集合大小 (Ensemble)
    3,  // 写 Quorum 大小
    2,  // 确认 Quorum 大小
    DigestType.CRC32C,
    password
);

byte[] data = message.serialize();
lh.addEntry(data);  // 异步写入,返回 Entry ID

lh.close();  // 关闭 Ledger

5.4 BookKeeper Quorum 协议

BookKeeper 使用 Quorum 协议保证持久性和可用性:

三个关键参数

配置示例:E=5, Qw=3, Qa=2

写入流程

Entry 0: Bookies [A, B, C] -> 等待 2 个 ACK
Entry 1: Bookies [B, C, D] -> 等待 2 个 ACK
Entry 2: Bookies [C, D, E] -> 等待 2 个 ACK
Entry 3: Bookies [D, E, A] -> 等待 2 个 ACK
Entry 4: Bookies [E, A, B] -> 等待 2 个 ACK

Entry 按轮询方式分布到 E 个 Bookie,每个 Entry 写入 Qw 个副本,等待 Qa 个确认后返回成功。

持久性保证

只要至少 Qw - Qa + 1 个副本存活,数据就不会丢失。例如 Qw=3, Qa=2,至少需要 2 个副本存活。

可用性保证

读取 Entry 需要联系 Qw 个 Bookie 中的任意 Qa 个。只要少于 Qw - Qa + 1 个 Bookie 故障,就能读取数据。

5.5 Ledger 恢复

当 Bookie 故障导致某些 Entry 副本不足时,触发 Ledger 恢复:

  1. 检测到 Bookie 不可用
  2. 关闭该 Bookie 上的所有活跃 Ledger
  3. 对每个 Ledger,从其他副本读取 Entry
  4. 将缺失的副本写入新的 Bookie
  5. 更新 Ledger 元数据

Fencing

关闭 Ledger 前需要 Fencing,防止旧 Writer 继续写入:

1. Client 向所有 Bookie 发送 Fence 请求
2. Bookie 标记 Ledger 为 Fenced,拒绝后续写入
3. Client 确认 Ledger 最后的 Entry ID
4. 恢复缺失的 Entry 副本

5.6 分层存储(Tiered Storage)

Pulsar 支持将旧数据卸载到对象存储(S3、GCS、Azure Blob):

# namespace 配置
pulsar-admin namespaces set-offload-policies \
  --bucket my-bucket \
  --region us-west-2 \
  --offloadThresholdInSeconds 3600 \
  public/default

数据生命周期:

Hot:  Recent data in BookKeeper (fast access)
Warm: Offloaded to Object Storage (slower, cheaper)
Cold: Archived or deleted

Broker 自动处理跨层读取,对客户端透明:

Consumer read offset 1000:
  1. Broker 检查 Ledger 是否 offloaded
  2. 如果在 BookKeeper,直接读取
  3. 如果已 offloaded,从对象存储读取

六、Kafka vs Pulsar 对比

6.1 架构对比

Kafka

Pulsar

6.2 性能特征

吞吐量

Kafka 的单节点设计在高吞吐量场景下效率更高,避免了网络往返。Pulsar 的分层架构增加了 Broker 到 BookKeeper 的网络延迟。

基准测试(OpenMessaging Benchmark):

Kafka (3 brokers, RF=3):
  - Throughput: ~600MB/s
  - Latency P99: 15ms

Pulsar (3 brokers, 3 bookies, Qw=3, Qa=2):
  - Throughput: ~400MB/s
  - Latency P99: 25ms

延迟

Kafka 的 Leader 副本处理读写,延迟较低。Pulsar 需要跨层通信,延迟稍高。

可扩展性

Pulsar 在需要动态扩展的场景下优势明显。Kafka 的分区迁移(数据复制)耗时长,Pulsar 只需重新分配 Topic 所有权。

6.3 运维复杂度

Kafka

Pulsar

6.4 多租户与隔离

Pulsar

原生支持多租户:

persistent://{tenant}/{namespace}/{topic}

Tenant: 组织级隔离
Namespace: 应用级隔离,配置独立的策略
Topic: 具体主题

每个 Namespace 可配置独立的:

Kafka

多租户支持有限,主要通过 ACL 和配额实现隔离。缺少 Namespace 层次,大规模多租户场景下管理困难。

6.5 消息模型

Kafka

消费者组(Consumer Group)模型,一个分区只能被组内一个消费者消费。

Partition 0 -> Consumer A (Group 1)
Partition 1 -> Consumer B (Group 1)

Pulsar

支持多种订阅模式:

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscriptionType(SubscriptionType.Key_Shared)  // 选择订阅类型
    .subscribe();

6.6 生态系统

Kafka

Pulsar

6.7 故障恢复机制深度对比

Kafka 和 Pulsar 在故障恢复上的差异源于架构本质的不同。

Kafka Broker 故障:Leader Broker 宕机后,该 Broker 上所有 Leader 分区需要重新选举。新 Leader 的数据取决于 ISR 中 Follower 的同步进度,可能需要截断日志以保证一致性。恢复后的旧 Broker 作为 Follower 重新加入,需要从新 Leader 拉取缺失的数据,追赶 LEO。如果数据量大,追赶过程可能持续数分钟到数小时。

Pulsar Broker 故障:由于 Broker 无状态,故障恢复仅需将 Topic 所有权转移到其他 Broker,耗时通常在秒级。无需数据迁移或副本同步,新 Broker 直接从 BookKeeper 读取数据为消费者服务。

BookKeeper Bookie 故障:某个 Bookie 宕机后,当前活跃 Ledger 需要 Fencing(防止旧 Writer 继续写入),然后创建新 Ledger 选择其他健康 Bookie。已关闭 Ledger 中存储在故障 Bookie 上的 Entry 副本数降低,系统后台触发 Auto-Recovery,从其他 Bookie 读取这些 Entry 并复制到新 Bookie,恢复副本因子。这个过程对客户端完全透明。

核心差异:Kafka 的恢复涉及数据重新同步,恢复时间与数据量成正比;Pulsar 的计算层(Broker)恢复与数据无关,存储层(BookKeeper)恢复可独立后台进行。在需要快速弹性伸缩和频繁故障切换的云原生环境下,Pulsar 的分层架构具有显著优势。

七、实战配置与调优

7.1 Kafka 生产配置

Broker 配置

# 数据目录
log.dirs=/data/kafka-logs-1,/data/kafka-logs-2

# 副本配置
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# 日志保留
log.retention.hours=168       # 7 天
log.segment.bytes=1073741824  # 1GB
log.retention.check.interval.ms=300000

# 性能调优
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
num.replica.fetchers=4

# JVM 配置
-Xms6g -Xmx6g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35

生产者调优

props.put("batch.size", 32768);           // 32KB 批次
props.put("linger.ms", 10);               // 等待 10ms 凑批
props.put("compression.type", "lz4");     // LZ4 压缩
props.put("buffer.memory", 67108864);     // 64MB 缓冲区
props.put("max.in.flight.requests.per.connection", 5);

消费者调优

props.put("fetch.min.bytes", 1024);           // 至少 1KB 才返回
props.put("fetch.max.wait.ms", 500);          // 最多等待 500ms
props.put("max.partition.fetch.bytes", 1048576); // 每个分区最多 1MB
props.put("session.timeout.ms", 10000);       // 会话超时 10s
props.put("max.poll.records", 500);           // 每次 poll 最多 500 条

7.2 Pulsar 生产配置

Broker 配置

# ZooKeeper
zookeeperServers=zk1:2181,zk2:2181,zk3:2181

# 集群
clusterName=pulsar-cluster
brokerServicePort=6650
webServicePort=8080

# 存储
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=3
managedLedgerDefaultAckQuorum=2
managedLedgerMaxEntriesPerLedger=50000
managedLedgerMinLedgerRolloverTimeMinutes=10

# 缓存
managedLedgerCacheSizeMB=2048
managedLedgerCacheEvictionWatermark=0.9

# 性能
numIOThreads=16
numWorkerThreads=16

BookKeeper 配置

# 存储
journalDirectory=/data/bk/journal
ledgerDirectories=/data/bk/ledgers

# 写入
journalSyncData=true
journalMaxGroupWaitMSec=1
journalBufferedWritesThreshold=524288  # 512KB

# Ledger 缓存
ledgerStorageClass=org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage
dbStorage_writeCacheMaxSizeMb=2048
dbStorage_readAheadCacheMaxSizeMb=1024

# 压缩
compactionRateByBytes=10485760  # 10MB/s
compactionMaxOutstandingRequests=100000

7.3 监控指标

Kafka 关键指标

# Broker
- UnderReplicatedPartitions:副本不足的分区数
- OfflinePartitionsCount:离线分区数
- ActiveControllerCount:Active Controller 数量(应为 1)
- LeaderElectionRateAndTimeMs:Leader 选举速率和耗时

# Producer
- record-send-rate:消息发送速率
- record-error-rate:错误率
- request-latency-avg:请求平均延迟

# Consumer
- records-lag-max:最大消费延迟(条数)
- fetch-rate:拉取速率

Pulsar 关键指标

# Broker
- pulsar_topics_count:主题数量
- pulsar_rate_in:流入速率
- pulsar_rate_out:流出速率
- pulsar_storage_size:存储大小
- pulsar_msg_backlog:消息积压

# BookKeeper
- bookie_write_bytes:写入字节数
- bookie_read_bytes:读取字节数
- bookie_journal_sync_latency:Journal 同步延迟
- bookie_ledger_write_latency:Ledger 写入延迟

八、总结

分布式日志系统是现代数据架构的基石。Kafka 和 Pulsar 代表了两种不同的设计哲学:

Kafka: - 紧耦合架构,Broker 直接管理存储 - 极致的性能和吞吐量 - 成熟的生态和广泛的应用 - KRaft 模式简化运维 - 适合高吞吐量、性能敏感的场景

Pulsar: - 计算存储分离,Broker 无状态 - 灵活的扩展性和快速恢复 - 多租户和多种订阅模式 - 分层存储降低成本 - 适合云原生、多租户、需要弹性的场景

选择取决于具体需求: - 性能优先:Kafka - 弹性优先:Pulsar - 生态需求:Kafka(更成熟) - 多租户需求:Pulsar(原生支持) - 运维成熟度:都需要深入理解,Kafka 相对简单(KRaft 后)

两者都在持续演进:Kafka 引入 KRaft 和 Tiered Storage,Pulsar 提升性能和完善生态。分布式日志的未来是更高的性能、更低的延迟、更灵活的架构、更简单的运维。

参考资料

  1. Jay Kreps. “The Log: What every software engineer should know about real-time data’s unifying abstraction”. LinkedIn Engineering Blog, 2013.
  2. Apache Kafka Documentation. https://kafka.apache.org/documentation/
  3. KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
  4. Apache Pulsar Documentation. https://pulsar.apache.org/docs/
  5. Apache BookKeeper Documentation. https://bookkeeper.apache.org/docs/
  6. Sijie Guo, et al. “Apache BookKeeper: A High Performance and Low Latency Storage Service”. VLDB 2020.
  7. OpenMessaging Benchmark Framework. https://github.com/openmessaging/benchmark
  8. Confluent Blog: “Exactly-once Semantics are Possible: Here’s How Kafka Does it”. https://www.confluent.io/blog/exactly-once-semantics-are-possible/

上一篇:分布式事务实战对比 下一篇:Dynamo 论文精读

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】消息队列架构:异步解耦的设计与陷阱

在分布式系统中,服务之间的直接同步调用会导致强耦合、级联故障和性能瓶颈。消息队列(Message Queue)作为异步通信的核心基础设施,在现代架构中承担着解耦、削峰、容错等关键职责。然而,引入消息队列并非没有代价——投递语义的选择、顺序性保证、消费者组再平衡、幂等消费等问题,每一个都隐藏着工程陷阱。本文将从原理到实践…

2026-04-13 · architecture

【系统架构设计百科】零拷贝与内存映射:数据搬运的极致优化

一次普通的文件传输在 Linux 内核中要经历 4 次数据拷贝和 4 次上下文切换。sendfile、splice、mmap、io_uring、DPDK 各自用不同的方式缩减这条路径,但每种方案都有自己的使用条件和工程限制。本文从 Linux 内核的数据搬运路径出发,拆解五种零拷贝(Zero-Copy)技术的机制与取舍,结合 Kafka、Nginx、DPDK 的工程实践,讨论什么场景该用、什么场景不该用。

2026-04-13 · architecture

【系统架构设计百科】事件驱动架构:从消息通知到事件溯源

事件通知、事件携带状态转移、事件溯源三种模式经常被混为一谈,但它们在耦合度、数据一致性、存储成本和调试难度上有本质差异。本文基于 Martin Fowler 的 EDA 分类,拆解三种模式的机制与取舍,分析 Kafka 在事件驱动架构中的角色与局限,讨论事件排序的工程挑战和 schema 演进策略。

2026-04-13

【分布式系统百科】Dynamo 论文精读:最终一致性的工业级范本

2007 年,Amazon 在 SOSP 会议上发表了《Dynamo: Amazon's Highly Available Key-value Store》论文,这篇论文彻底改变了分布式存储系统的设计思路。与追求强一致性的传统数据库不同,Dynamo 选择了一条完全不同的道路:牺牲一致性,换取可用性和分区容错性。这个设…


By .