土法炼钢兴趣小组的算法知识备份

【LSM-Tree】完整引擎 + Rust 重写对比

源码下载

本文相关源码已整理,共 14 个文件。

打开下载目录 →

目录

前四篇分别实现了 WAL、MemTable、SSTable、Bloom Filter、Compaction 和 Version 管理。每个组件独立可测,但它们还不是一个可用的数据库——缺少统一的对外接口、读写并发控制和崩溃恢复组装。

本篇不是继续堆组件,而是第一次把语义闭环(6 个公开 API 跑通完整生命周期)、工程边界(教学实现 vs LevelDB 差在哪里)和语言取舍(C 手动管理 vs Rust 编译期约束)放在同一篇里观察。具体分三部分:

  1. Part A:用 C 把所有组件组装成完整引擎,提供 db_open / db_put / db_get / db_delete / db_iterator / db_snapshot 六个公开 API,含单写多读并发控制和崩溃恢复。
  2. Part B:用 Rust 重写核心模块(SkipList + WAL + SSTable + DB 接口),记录 5 个”编译器不让我过”的真实故事。
  3. Part C:C / Rust / LevelDB 三方 benchmark 对比,拆解架构选择对性能的影响。

本文是「从零写一个 LSM-Tree 存储引擎」系列的第 5 篇(共 5 篇)。 如果你是从搜索引擎直接来到这里的,建议先看 系列目录 了解全貌。

篇目 核心内容 状态
第 1 篇 · 全景 B-Tree vs LSM-Tree、三种放大的数学推导 前置知识
第 2 篇 · WAL + MemTable 跳表实现、WAL 32KB 分片、崩溃恢复证明 前置知识
第 3 篇 · SSTable + Bloom Filter 前缀压缩、双重哈希 Bloom、Builder/Reader 前置知识
第 4 篇 · Compaction 多路归并、Version/MANIFEST、策略对比 前置知识
第 5 篇 · 完整引擎 + Rust 重写 DB API + 并发 + Rust 重写 + benchmark

Part A:C 完整引擎

本节交付范围:完整的 DB 接口(Open / Put / Get / Delete / Iterator / Snapshot)+ 单写多读并发控制 + 崩溃恢复。所有代码复用前四篇已实现的函数;为突出主线,本节做了以下简化:

  • InternalKey:MemTable 和 SSTable 内部使用 [user_key | sequence(7B) | type(1B)] 格式的 InternalKey;本节代码中省略了编码/解码细节,memtable_put/gettable_reader_get 的完整签名含 snapshot_seq 参数用于版本过滤。
  • Table Cache / Block Cache:未实现。每次 Get() 打开文件并解析 Footer,这是读性能远低于 LevelDB 的主要原因(Part C 会量化这个差距)。
  • WriteBatch / Group Commit:未实现。写入路径是单条串行写入。

DB 内部结构

DB 内部架构

上图是完整引擎的内部结构。核心思想是将前四篇实现的组件聚合到一个 DB 结构体中,并通过互斥锁和后台线程协调写入、Flush 与 Compaction。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <pthread.h>
#include <sys/stat.h>
#include <dirent.h>
#include <unistd.h>
#include <zlib.h>

// ===== 前文 API 声明(第 2~4 篇已实现)=====
// WAL (Art.2): wal_writer_append(), wal_reader_read_record()
// MemTable (Art.2): memtable_put(), memtable_get(), SkipList, Arena
// SSTable (Art.3): table_builder_*(), table_reader_*(), bloom_*()
// Compaction (Art.4): flush_memtable_to_l0(), do_compaction(),
//  compaction_score(), merge_iter_*(), version_*(), manifest_*()

#define kWriteBufferSize (4 * 1024 * 1024) // 4MB
#define kTypePut  1
#define kTypeDeletion 2

typedef struct Snapshot Snapshot;

typedef struct {
  size_t write_buffer_size;  // MemTable 大小阈值
  int  bloom_bits_per_key;  // Bloom Filter 参数
  int  max_open_files;  // 最大打开文件数
  int  sync;  // 每次写入是否 fsync
} DB_Options;

typedef struct {
  Snapshot *snap; // NULL = 读当前最新
} ReadOptions;

typedef struct {
  int sync;  // 是否对本次写入做 fsync
} WriteOptions;

DB_Options 控制引擎行为——write_buffer_size 决定 MemTable 何时 freeze,bloom_bits_per_key 影响 Bloom Filter 的误判率,sync 控制 WAL 是否每次 fsync

DB 核心结构体

struct Snapshot {
  uint64_t  sequence;  // 快照对应的序列号
  Version  *version;  // 引用的 Version(ref_count++)
  Snapshot *prev, *next;
};

typedef struct {
  char  db_dir[512];
  DB_Options  options;

  // --- 内存层 ---
  MemTable  *mem;  // Mutable MemTable(接收写入)
  MemTable  *imm;  // Immutable MemTable(等待 flush)

  // --- WAL ---
  WALWriter  wal;
  uint64_t  wal_file_num;

  // --- 版本管理 ---
  Version  *current;  // 当前活跃版本
  FILE  *manifest_fp;  // MANIFEST 文件指针
  uint64_t  next_file_num; // 全局文件编号分配器
  uint64_t  last_seq;  // 最新序列号

  // --- 并发控制 ---
  pthread_mutex_t mu;  // 全局互斥锁
  pthread_cond_t  bg_cv;  // 后台线程条件变量
  pthread_cond_t  write_cv;  // 写阻塞等待条件变量
  pthread_t  bg_thread;  // 后台 compaction 线程
  int  bg_running;  // 后台线程是否在运行
  int  shutting_down;

  // --- Snapshot 链表 ---
  Snapshot  snap_head;  // 哨兵节点
} DB;

几个设计要点:


db_open():启动恢复

db_open() 恢复流程

引擎启动需要恢复崩溃前的状态。恢复流程分三步:读 MANIFEST 恢复 Version → 重放 WAL 恢复内存数据 → 创建新 MemTable 和 WAL。

// 辅助:读 CURRENT 文件获取 MANIFEST 文件名
static int read_current_file(const char *db_dir, char *manifest_name, size_t cap) {
  char path[512];
  snprintf(path, sizeof(path), "%s/CURRENT", db_dir);
  FILE *fp = fopen(path, "r");
  if (!fp) return -1;
  if (!fgets(manifest_name, (int)cap, fp)) { fclose(fp); return -1; }
  // 去掉末尾换行
  size_t len = strlen(manifest_name);
  if (len > 0 && manifest_name[len - 1] == '\n') manifest_name[len - 1] = '\0';
  fclose(fp);
  return 0;
}

// 辅助:写 CURRENT 文件
static void write_current_file(const char *db_dir, const char *manifest_name) {
  char tmp[512], path[512];
  snprintf(tmp, sizeof(tmp), "%s/CURRENT.tmp", db_dir);
  snprintf(path, sizeof(path), "%s/CURRENT", db_dir);
  FILE *fp = fopen(tmp, "w");
  fprintf(fp, "%s\n", manifest_name);
  fflush(fp); fsync(fileno(fp)); fclose(fp);
  rename(tmp, path);
}

int db_open(const char *db_dir, const DB_Options *opts, DB **out) {
  DB *db = calloc(1, sizeof(DB));
  snprintf(db->db_dir, sizeof(db->db_dir), "%s", db_dir);
  db->options = *opts;
  if (db->options.write_buffer_size == 0)
  db->options.write_buffer_size = kWriteBufferSize;

  mkdir(db_dir, 0755);
  pthread_mutex_init(&db->mu, NULL);
  pthread_cond_init(&db->bg_cv, NULL);
  pthread_cond_init(&db->write_cv, NULL);

  // 初始化 Snapshot 哨兵
  db->snap_head.prev = &db->snap_head;
  db->snap_head.next = &db->snap_head;

  // --- 恢复 Version ---
  db->current = calloc(1, sizeof(Version));
  version_init(db->current);

  char manifest_name[256] = {0};
  int has_manifest = (read_current_file(db_dir, manifest_name, sizeof(manifest_name)) == 0);

  if (has_manifest) {
  char mpath[512];
  snprintf(mpath, sizeof(mpath), "%s/%s", db_dir, manifest_name);
  manifest_recover(mpath, db->current);
  db->next_file_num = db->current->next_file_number;
  } else {
  db->next_file_num = 1;
  }

  // --- 重放 WAL ---
  // 扫描目录,找到编号 >= next_file_num 的 WAL 文件(MANIFEST 之后的)
  // 简化:查找所有 .wal 文件
  DIR *dir = opendir(db_dir);
  if (dir) {
  struct dirent *ent;
  while ((ent = readdir(dir)) != NULL) {
  uint64_t fnum;
  if (sscanf(ent->d_name, "%06lu.wal", &fnum) == 1) {
  char wal_path[512];
  snprintf(wal_path, sizeof(wal_path), "%s/%s", db_dir, ent->d_name);

  MemTable *recovery_mem = memtable_new();
  WALReader reader;
  wal_reader_open(&reader, wal_path);
  char *rec; size_t rec_len;
  while (wal_reader_read_record(&reader, &rec, &rec_len) == 0) {
  // 解码 KV record: [type(1) | klen(4) | key | vlen(4) | val]
  uint8_t type = (uint8_t)rec[0];
  uint32_t klen, vlen;
  memcpy(&klen, rec + 1, 4);
  const char *key = rec + 5;
  memcpy(&vlen, rec + 5 + klen, 4);
  const char *val = rec + 9 + klen;
  memtable_put(recovery_mem, type, key, klen, val, vlen);
  db->last_seq++;
  }
  wal_reader_close(&reader);

  // Flush 恢复的数据到 L0
  if (recovery_mem->table->count > 0) {
  FileMetaData meta;
  // 遍历 skiplist 构造 keys/vals 数组,flush 到 L0
  // flush_memtable_to_l0() 复用第 4 篇实现
  flush_skiplist_to_l0(db, recovery_mem, &meta);

  VersionEdit ve;
  version_edit_init(&ve);
  version_edit_add_file(&ve, 0, &meta);
  ve.next_file_number = db->next_file_num;
  ve.has_next_file_number = 1;
  version_apply(db->current, &ve);
  }
  memtable_destroy(recovery_mem);
  }
  }
  closedir(dir);
  }

  // --- 创建新 MANIFEST(如果是新 DB 或需要 compact MANIFEST)---
  if (!has_manifest) {
  char mname[64];
  snprintf(mname, sizeof(mname), "MANIFEST-%06lu", db->next_file_num++);
  char mpath[512];
  snprintf(mpath, sizeof(mpath), "%s/%s", db_dir, mname);
  db->manifest_fp = fopen(mpath, "ab");

  // 写入当前完整 Version 作为初始快照
  VersionEdit ve;
  version_edit_init(&ve);
  for (int l = 0; l < MAX_LEVELS; l++) {
  for (int f = 0; f < db->current->file_count[l]; f++) {
  version_edit_add_file(&ve, l, &db->current->files[l][f]);
  }
  }
  ve.next_file_number = db->next_file_num;
  ve.has_next_file_number = 1;
  manifest_write(db->manifest_fp, &ve);
  fflush(db->manifest_fp); fsync(fileno(db->manifest_fp));

  write_current_file(db_dir, mname);
  } else {
  char mpath[512];
  snprintf(mpath, sizeof(mpath), "%s/%s", db_dir, manifest_name);
  db->manifest_fp = fopen(mpath, "ab");
  }

  // --- 创建新 MemTable + WAL ---
  db->wal_file_num = db->next_file_num++;
  char wal_path[512];
  snprintf(wal_path, sizeof(wal_path), "%s/%06lu.wal",
  db_dir, (unsigned long)db->wal_file_num);
  wal_writer_open(&db->wal, wal_path);
  db->mem = memtable_new();
  db->imm = NULL;

  // --- 启动后台 Compaction 线程 ---
  db->bg_running = 1;
  pthread_create(&db->bg_thread, NULL, bg_compaction_thread, db);

  *out = db;
  return 0;
}

恢复过程的关键在于顺序:先恢复 Version(确定有哪些 SSTable),再重放 WAL(恢复未持久化的内存数据)。这保证了崩溃后不丢数据——即使 MemTable 在崩溃时丢失,WAL 中的记录可以完整重建。

flush 辅助函数

flush_skiplist_to_l0() 将 MemTable 的跳表数据 flush 到 L0 SSTable,复用第 4 篇flush_memtable_to_l0()

// 遍历 skiplist,收集 key/value 数组,调用 flush_memtable_to_l0()
static void flush_skiplist_to_l0(DB *db, MemTable *m, FileMetaData *out) {
  int count = m->table->count;
  const char **keys = malloc(sizeof(char*) * count);
  size_t *key_lens  = malloc(sizeof(size_t) * count);
  const char **vals = malloc(sizeof(char*) * count);
  size_t *val_lens  = malloc(sizeof(size_t) * count);

  SkipListNode *node = m->table->header->forward[0];
  int i = 0;
  while (node && i < count) {
  keys[i] = node->key;
  key_lens[i] = node->key_len;
  vals[i] = node->value;
  val_lens[i] = node->value_len;
  node = node->forward[0];
  i++;
  }

  uint64_t fnum = db->next_file_num++;
  flush_memtable_to_l0(db->db_dir, fnum, keys, key_lens,
  vals, val_lens, count, out);

  free(keys); free(key_lens);
  free(vals); free(val_lens);
}

db_put() / db_delete():写路径

db_put() 完整写路径

写路径是整个引擎中最关键的串行化点。所有写操作必须持有 mu,保证 WAL 和 MemTable 的原子性。

// 编码 KV record: [type(1) | klen(4) | key | vlen(4) | val]
static size_t encode_kv_record(char *buf, uint8_t type,
  const char *key, size_t klen,
  const char *val, size_t vlen) {
  buf[0] = (char)type;
  memcpy(buf + 1, &klen, 4);
  memcpy(buf + 5, key, klen);
  uint32_t vl = (uint32_t)vlen;
  memcpy(buf + 5 + klen, &vl, 4);
  memcpy(buf + 9 + klen, val, vlen);
  return 9 + klen + vlen;
}

static int make_room_for_write(DB *db) {
  // 检查是否需要 freeze MemTable
  while (1) {
  if (db->mem->arena->bytes_used < db->options.write_buffer_size) {
  return 0; // 空间充足
  }

  if (db->imm != NULL) {
  // Immutable 还没 flush 完,阻塞等待
  pthread_cond_wait(&db->write_cv, &db->mu);
  continue;
  }

  // Freeze: mutable → immutable
  db->imm = db->mem;
  db->mem = memtable_new();

  // 创建新 WAL
  db->wal_file_num = db->next_file_num++;
  char wal_path[512];
  snprintf(wal_path, sizeof(wal_path), "%s/%06lu.wal",
  db->db_dir, (unsigned long)db->wal_file_num);
  wal_writer_close(&db->wal);
  wal_writer_open(&db->wal, wal_path);

  // 通知后台线程 flush
  pthread_cond_signal(&db->bg_cv);
  return 0;
  }
}

int db_put(DB *db, const WriteOptions *opts,
  const char *key, size_t klen,
  const char *val, size_t vlen) {
  pthread_mutex_lock(&db->mu);

  int ret = make_room_for_write(db);
  if (ret != 0) { pthread_mutex_unlock(&db->mu); return ret; }

  // 1. WAL 追加
  char record[4096];
  size_t rlen = encode_kv_record(record, kTypePut, key, klen, val, vlen);
  wal_writer_append(&db->wal, record, rlen);

  if (opts && opts->sync) {
  fflush(db->wal.fp);
  fsync(fileno(db->wal.fp));
  }

  // 2. MemTable 写入
  uint64_t seq = ++db->last_seq;
  memtable_put(db->mem, kTypePut, key, klen, val, vlen);

  pthread_mutex_unlock(&db->mu);
  return 0;
}

int db_delete(DB *db, const WriteOptions *opts,
  const char *key, size_t klen) {
  pthread_mutex_lock(&db->mu);

  int ret = make_room_for_write(db);
  if (ret != 0) { pthread_mutex_unlock(&db->mu); return ret; }

  char record[4096];
  size_t rlen = encode_kv_record(record, kTypeDeletion, key, klen, "", 0);
  wal_writer_append(&db->wal, record, rlen);

  if (opts && opts->sync) {
  fflush(db->wal.fp);
  fsync(fileno(db->wal.fp));
  }

  ++db->last_seq;
  memtable_put(db->mem, kTypeDeletion, key, klen, "", 0);

  pthread_mutex_unlock(&db->mu);
  return 0;
}

make_room_for_write() 是写路径的调度核心:

  1. 如果 MemTable 空间充足 → 直接返回。
  2. 如果 MemTable 已满但 imm 仍在 flush → 阻塞等待write_cv)。
  3. 如果 MemTable 已满且 imm == NULLfreeze:当前 mem 变为 imm,新建空 mem 和新 WAL,通知后台 flush。

这就是 第 4 篇提到的 LevelDB 设计限制——同一时刻最多一个 Immutable MemTable。RocksDB 通过 max_write_buffer_number 允许多个 immutable 排队。


db_get():读路径

db_get() 完整读路径

读路径不需要持有全局锁——只需短暂加锁获取 memimmcurrent 的指针,然后释放锁,在锁外执行实际读取。这是 LevelDB 读性能高的关键。

int db_get(DB *db, const ReadOptions *opts,
  const char *key, size_t klen,
  char **val_out, size_t *vlen_out) {
  pthread_mutex_lock(&db->mu);

  // 快照:获取当前状态的指针
  MemTable *mem = db->mem;
  MemTable *imm = db->imm;
  Version *ver = db->current;
  ver->ref_count++;  // 防止被 compaction 释放

  uint64_t snapshot_seq = db->last_seq;
  if (opts && opts->snap)
  snapshot_seq = opts->snap->sequence;

  pthread_mutex_unlock(&db->mu);
  // 从此处开始不持锁

  int found = 0;
  char *val = NULL;
  size_t vlen = 0;

  // 1. 查 Mutable MemTable(传入 snapshot_seq 做版本过滤)
  int ret = memtable_get(mem, key, klen, snapshot_seq, &val, &vlen);
  if (ret == 1)  { found = 1; goto done; }
  if (ret == -1)  { found = -1; goto done; } // Tombstone

  // 2. 查 Immutable MemTable
  if (imm) {
  ret = memtable_get(imm, key, klen, snapshot_seq, &val, &vlen);
  if (ret == 1)  { found = 1; goto done; }
  if (ret == -1)  { found = -1; goto done; }
  }

  // 3. 查 L0(从新到旧逐文件查找)
  for (int i = ver->file_count[0] - 1; i >= 0; i--) {
  FileMetaData *f = &ver->files[0][i];
  // 快速范围检查
  if (memcmp(key, f->smallest, klen < (size_t)f->smallest_len ? klen : f->smallest_len) < 0)
  continue;
  if (memcmp(key, f->largest, klen < (size_t)f->largest_len ? klen : f->largest_len) > 0)
  continue;

  char fpath[512];
  snprintf(fpath, sizeof(fpath), "%s/%06lu.sst",
  db->db_dir, (unsigned long)f->number);
  TableReader tr;
  if (table_reader_open(&tr, fpath) == 0) {
  char *v; int vl;
  if (table_reader_get(&tr, key, klen, snapshot_seq, &v, &vl) == 0) {
  val = v; vlen = (size_t)vl;
  found = 1;
  table_reader_close(&tr);
  goto done;
  }
  table_reader_close(&tr);
  }
  }

  // 4. 查 L1 ~ L6(每层二分定位到最多一个文件)
  for (int level = 1; level < MAX_LEVELS; level++) {
  if (ver->file_count[level] == 0) continue;

  // 二分查找:找到 largest >= key 的最小文件
  int lo = 0, hi = ver->file_count[level] - 1, target = -1;
  while (lo <= hi) {
  int mid = (lo + hi) / 2;
  FileMetaData *f = &ver->files[level][mid];
  if (memcmp(f->largest, key, (size_t)f->largest_len < klen ? f->largest_len : klen) < 0)
  lo = mid + 1;
  else {
  target = mid;
  hi = mid - 1;
  }
  }
  if (target < 0) continue;

  FileMetaData *f = &ver->files[level][target];
  if (memcmp(key, f->smallest, klen < (size_t)f->smallest_len ? klen : f->smallest_len) < 0)
  continue;

  char fpath[512];
  snprintf(fpath, sizeof(fpath), "%s/%06lu.sst",
  db->db_dir, (unsigned long)f->number);
  TableReader tr;
  if (table_reader_open(&tr, fpath) == 0) {
  char *v; int vl;
  if (table_reader_get(&tr, key, klen, snapshot_seq, &v, &vl) == 0) {
  val = v; vlen = (size_t)vl;
  found = 1;
  table_reader_close(&tr);
  goto done;
  }
  table_reader_close(&tr);
  }
  }

done:
  // 释放 Version 引用
  pthread_mutex_lock(&db->mu);
  ver->ref_count--;
  pthread_mutex_unlock(&db->mu);

  if (found == 1) {
  *val_out = val;
  *vlen_out = vlen;
  return 0;
  }
  return -1; // not found
}

读路径的几个关键点:


Snapshot

Snapshot 与 Version 引用计数

Snapshot 提供一致性读——在 Snapshot 创建时刻的数据视图不会被后续的写入和 Compaction 影响。

Snapshot *db_get_snapshot(DB *db) {
  pthread_mutex_lock(&db->mu);

  Snapshot *s = calloc(1, sizeof(Snapshot));
  s->sequence = db->last_seq;
  s->version = db->current;
  s->version->ref_count++;

  // 插入到链表尾
  s->prev = db->snap_head.prev;
  s->next = &db->snap_head;
  db->snap_head.prev->next = s;
  db->snap_head.prev = s;

  pthread_mutex_unlock(&db->mu);
  return s;
}

void db_release_snapshot(DB *db, Snapshot *s) {
  pthread_mutex_lock(&db->mu);

  // 从链表移除
  s->prev->next = s->next;
  s->next->prev = s->prev;

  s->version->ref_count--;
  // 如果 ref_count == 0 且不是 current,可以释放
  // 简化实现中不做立即释放,留给 compaction 清理

  free(s);
  pthread_mutex_unlock(&db->mu);
}

// 获取所有 Snapshot 中最小的 sequence
// Compaction 时用它判断 tombstone 是否可以删除
static uint64_t smallest_snapshot_seq(DB *db) {
  if (db->snap_head.next == &db->snap_head)
  return db->last_seq; // 无 snapshot,全部数据可清理
  return db->snap_head.next->sequence;
}

Snapshot 的实现非常轻量——只记录一个序列号和一个 Version 引用:


DBIterator

DBIterator 多层归并

DBIterator 将 MemTable、Immutable MemTable 和所有层级的 SSTable 统一为一个有序遍历接口。核心是复用第 4 篇MergeIterator

typedef struct {
  MergeIterator  merge;
  uint64_t  snapshot_seq;  // 快照序列号
  Version  *version;  // 引用的 Version

  // 去重状态
  char  prev_user_key[256];
  int  prev_user_key_len;
  int  valid;

  // 当前输出
  const char  *cur_key;
  size_t  cur_key_len;
  const char  *cur_val;
  size_t  cur_val_len;

  // 打开的 TableReader(需要在 free 时关闭)
  TableReader  *readers;
  int  num_readers;
  DB  *db;
} DBIterator;

DBIterator *db_iterator_new(DB *db, const ReadOptions *opts) {
  pthread_mutex_lock(&db->mu);

  DBIterator *it = calloc(1, sizeof(DBIterator));
  it->db = db;
  it->version = db->current;
  it->version->ref_count++;
  it->snapshot_seq = opts && opts->snap ? opts->snap->sequence : db->last_seq;

  // 收集所有输入源
  Version *ver = it->version;
  int total_files = 0;
  for (int l = 0; l < MAX_LEVELS; l++)
  total_files += ver->file_count[l];

  it->readers = calloc(total_files, sizeof(TableReader));
  it->num_readers = 0;

  // 打开所有 SSTable
  TableReader *open_readers = calloc(total_files, sizeof(TableReader));
  int n = 0;
  for (int l = 0; l < MAX_LEVELS; l++) {
  for (int f = 0; f < ver->file_count[l]; f++) {
  char fpath[512];
  snprintf(fpath, sizeof(fpath), "%s/%06lu.sst",
  db->db_dir, (unsigned long)ver->files[l][f].number);
  if (table_reader_open(&open_readers[n], fpath) == 0) {
  it->readers[n] = open_readers[n];
  n++;
  }
  }
  }
  it->num_readers = n;

  // 初始化 MergeIterator
  merge_iter_init(&it->merge, open_readers, n);

  pthread_mutex_unlock(&db->mu);
  free(open_readers);
  return it;
}

// 内部:跳过重复 key 和 tombstone,前进到下一个有效条目
static void iter_advance(DBIterator *it) {
  while (merge_iter_valid(&it->merge)) {
  const char *key = merge_iter_key(&it->merge);
  size_t klen = merge_iter_key_len(&it->merge);

  // 提取 user_key(去掉 InternalKey 的 8 字节后缀)
  // 简化实现:假设 key 就是 user_key
  const char *user_key = key;
  size_t user_key_len = klen;

  // 去重:如果和上一个输出的 user_key 相同,跳过
  if (it->prev_user_key_len > 0 &&
  user_key_len == (size_t)it->prev_user_key_len &&
  memcmp(user_key, it->prev_user_key, user_key_len) == 0) {
  merge_iter_next(&it->merge);
  continue;
  }

  // 检查 type 是否为 deletion(tombstone),如果是则跳过
  // InternalKey 末尾 1 字节为 type,kTypeDeletion == 2
  if (klen > 8) {
  uint8_t type = (uint8_t)key[klen - 1];
  if (type == kTypeDeletion) {
  // 记录 user_key 以跳过同 key 的更旧版本
  memcpy(it->prev_user_key, user_key, user_key_len);
  it->prev_user_key_len = (int)user_key_len;
  merge_iter_next(&it->merge);
  continue;
  }
  }

  it->cur_key = key;
  it->cur_key_len = klen;
  it->cur_val = merge_iter_value(&it->merge);
  it->cur_val_len = merge_iter_value_len(&it->merge);
  it->valid = 1;

  memcpy(it->prev_user_key, user_key, user_key_len);
  it->prev_user_key_len = (int)user_key_len;
  return;
  }
  it->valid = 0;
}

void db_iter_seek_to_first(DBIterator *it) {
  // MergeIterator 在 init 时已经定位到最小元素
  it->prev_user_key_len = 0;
  iter_advance(it);
}

void db_iter_next(DBIterator *it) {
  merge_iter_next(&it->merge);
  iter_advance(it);
}

int db_iter_valid(DBIterator *it)  { return it->valid; }
const char *db_iter_key(DBIterator *it)  { return it->cur_key; }
size_t db_iter_key_len(DBIterator *it)  { return it->cur_key_len; }
const char *db_iter_value(DBIterator *it)  { return it->cur_val; }
size_t db_iter_value_len(DBIterator *it)  { return it->cur_val_len; }

void db_iter_free(DBIterator *it) {
  merge_iter_free(&it->merge);
  for (int i = 0; i < it->num_readers; i++)
  table_reader_close(&it->readers[i]);
  free(it->readers);

  pthread_mutex_lock(&it->db->mu);
  it->version->ref_count--;
  pthread_mutex_unlock(&it->db->mu);

  free(it);
}

DBIterator 的去重逻辑值得注意:MergeIterator 输出的是所有层级的 key-value 对(包括同一 key 的多个版本),DBIterator 在其上做去重——同一 user_key 只输出 sequence 最大(最新)的版本。如果最新版本是 tombstone(kTypeDeletion),整个 key 被跳过。


后台 Compaction 线程

后台线程负责两类工作:Flush(Immutable MemTable → L0)和 Major Compaction(Ln → Ln+1 归并)。

static void bg_flush(DB *db) {
  // 将 Immutable MemTable flush 到 L0
  MemTable *imm = db->imm;
  FileMetaData meta;

  // 在锁外做 I/O
  pthread_mutex_unlock(&db->mu);
  flush_skiplist_to_l0(db, imm, &meta);
  pthread_mutex_lock(&db->mu);

  // 更新 MANIFEST
  VersionEdit ve;
  version_edit_init(&ve);
  version_edit_add_file(&ve, 0, &meta);
  ve.next_file_number = db->next_file_num;
  ve.has_next_file_number = 1;
  manifest_write(db->manifest_fp, &ve);
  fflush(db->manifest_fp);
  fsync(fileno(db->manifest_fp));

  version_apply(db->current, &ve);

  // 清理 Immutable MemTable 和旧 WAL
  memtable_destroy(imm);
  db->imm = NULL;

  // 唤醒阻塞的写入线程
  pthread_cond_signal(&db->write_cv);
}

static void bg_major_compaction(DB *db) {
  // 选择需要 compaction 的层级
  int best_level = -1;
  int file_counts[MAX_LEVELS];
  uint64_t level_bytes[MAX_LEVELS];
  for (int l = 0; l < MAX_LEVELS; l++) {
  file_counts[l] = db->current->file_count[l];
  level_bytes[l] = 0;
  for (int f = 0; f < file_counts[l]; f++)
  level_bytes[l] += db->current->files[l][f].file_size;
  }

  double score = compaction_score(file_counts, level_bytes, &best_level);
  if (score < 1.0 || best_level < 0) return;

  // 收集输入文件路径
  int target_level = best_level + 1;
  if (target_level >= MAX_LEVELS) return;

  int num_inputs = db->current->file_count[best_level];
  if (num_inputs == 0) return;

  const char **input_paths = malloc(sizeof(char*) * num_inputs);
  char (*path_bufs)[512] = malloc(sizeof(char[512]) * num_inputs);
  for (int i = 0; i < num_inputs; i++) {
  snprintf(path_bufs[i], 512, "%s/%06lu.sst",
  db->db_dir,
  (unsigned long)db->current->files[best_level][i].number);
  input_paths[i] = path_bufs[i];
  }

  // 在锁内预分配输出文件编号,避免锁外修改共享状态
  // 预估 compaction 最多产生 num_inputs 个输出文件
  uint64_t reserved_file_num = db->next_file_num;
  db->next_file_num += num_inputs; // 预留足够的编号空间

  // 在锁外执行 compaction I/O
  pthread_mutex_unlock(&db->mu);

  CompactionState cs = {0};
  do_compaction(input_paths, num_inputs, db->db_dir,
  &reserved_file_num, target_level, &cs);

  pthread_mutex_lock(&db->mu);

  // 更新 Version
  VersionEdit ve;
  version_edit_init(&ve);
  for (int i = 0; i < num_inputs; i++)
  version_edit_remove_file(&ve, best_level,
  db->current->files[best_level][i].number);
  for (int i = 0; i < cs.num_outputs; i++)
  version_edit_add_file(&ve, target_level, &cs.outputs[i]);
  ve.next_file_number = reserved_file_num; // 使用实际消耗后的编号
  db->next_file_num = reserved_file_num;  // 同步回 DB
  ve.has_next_file_number = 1;

  manifest_write(db->manifest_fp, &ve);
  fflush(db->manifest_fp); fsync(fileno(db->manifest_fp));
  version_apply(db->current, &ve);

  free(input_paths);
  free(path_bufs);
  free(cs.outputs);
}

static void *bg_compaction_thread(void *arg) {
  DB *db = (DB *)arg;

  pthread_mutex_lock(&db->mu);
  while (!db->shutting_down) {
  // 有 Immutable MemTable 要 flush
  if (db->imm != NULL) {
  bg_flush(db);
  continue; // flush 完继续检查
  }

  // 检查是否需要 major compaction
  bg_major_compaction(db);

  // 没事做了,等待信号
  pthread_cond_wait(&db->bg_cv, &db->mu);
  }
  pthread_mutex_unlock(&db->mu);
  return NULL;
}

后台线程的关键模式是锁内决策、锁外 I/O

这种模式确保了 I/O 不阻塞写入路径,同时保证了状态更新的原子性。next_file_num 在锁内预分配而非在锁外修改,避免了共享变量的数据竞争。


db_close()

int db_close(DB *db) {
  pthread_mutex_lock(&db->mu);

  // 通知后台线程退出
  db->shutting_down = 1;
  pthread_cond_signal(&db->bg_cv);
  pthread_mutex_unlock(&db->mu);

  // 等待后台线程结束
  pthread_join(db->bg_thread, NULL);

  // Flush 当前 MemTable(如果非空)
  if (db->mem && db->mem->table->count > 0) {
  FileMetaData meta;
  flush_skiplist_to_l0(db, db->mem, &meta);

  VersionEdit ve;
  version_edit_init(&ve);
  version_edit_add_file(&ve, 0, &meta);
  ve.next_file_number = db->next_file_num;
  ve.has_next_file_number = 1;
  manifest_write(db->manifest_fp, &ve);
  fflush(db->manifest_fp);
  fsync(fileno(db->manifest_fp));
  }

  // 关闭 WAL、MANIFEST
  wal_writer_close(&db->wal);
  if (db->manifest_fp) fclose(db->manifest_fp);

  // 释放 MemTable
  if (db->mem) memtable_destroy(db->mem);
  if (db->imm) memtable_destroy(db->imm);

  // 释放 Version
  if (db->current) {
  for (int l = 0; l < MAX_LEVELS; l++)
  free(db->current->files[l]);
  free(db->current);
  }

  // 清理同步原语
  pthread_mutex_destroy(&db->mu);
  pthread_cond_destroy(&db->bg_cv);
  pthread_cond_destroy(&db->write_cv);

  free(db);
  return 0;
}

db_close() 的顺序:先停后台线程 → flush 残留数据 → 关闭文件 → 释放内存。保证关闭前所有数据都已持久化。


完整 Demo

把所有组件串起来,演示完整生命周期:

int main(void) {
  const char *db_path = "/tmp/lsm_engine_demo";

  // === 打开 DB ===
  DB_Options opts = {
  .write_buffer_size = 4096,  // 小阈值便于触发 flush
  .bloom_bits_per_key = 10,
  .sync = 0,
  };
  DB *db;
  int ret = db_open(db_path, &opts, &db);
  printf("db_open: %s\n", ret == 0 ? "OK" : "FAIL");

  // === 写入 200 条 ===
  WriteOptions wopts = {.sync = 0};
  for (int i = 0; i < 200; i++) {
  char key[32], val[64];
  snprintf(key, sizeof(key), "key:%06d", i);
  snprintf(val, sizeof(val), "value_%d_initial", i);
  db_put(db, &wopts, key, strlen(key), val, strlen(val));
  }
  printf("Wrote 200 keys.\n");

  // === 点查验证 ===
  ReadOptions ropts = {.snap = NULL};
  char *val; size_t vlen;
  ret = db_get(db, &ropts, "key:000042", 10, &val, &vlen);
  printf("Get key:000042 -> %.*s (ret=%d)\n", (int)vlen, val, ret);
  if (ret == 0) free(val);

  // === 创建 Snapshot ===
  Snapshot *snap = db_get_snapshot(db);
  printf("Snapshot created at seq=%lu\n", (unsigned long)snap->sequence);

  // === 覆盖写入 ===
  for (int i = 0; i < 50; i++) {
  char key[32], val[64];
  snprintf(key, sizeof(key), "key:%06d", i);
  snprintf(val, sizeof(val), "value_%d_UPDATED", i);
  db_put(db, &wopts, key, strlen(key), val, strlen(val));
  }
  printf("Updated first 50 keys.\n");

  // === Snapshot 读到旧值 ===
  ReadOptions snap_ropts = {.snap = snap};
  ret = db_get(db, &snap_ropts, "key:000010", 10, &val, &vlen);
  printf("Snapshot get key:000010 -> %.*s\n", (int)vlen, val);
  if (ret == 0) free(val);

  // 当前读到新值
  ret = db_get(db, &ropts, "key:000010", 10, &val, &vlen);
  printf("Current get key:000010 -> %.*s\n", (int)vlen, val);
  if (ret == 0) free(val);

  db_release_snapshot(db, snap);

  // === Delete 测试 ===
  db_delete(db, &wopts, "key:000099", 10);
  ret = db_get(db, &ropts, "key:000099", 10, &val, &vlen);
  printf("After delete, get key:000099 -> %s\n",
  ret == 0 ? "found (unexpected)" : "not found (correct)");

  // === Iterator 扫描 ===
  DBIterator *it = db_iterator_new(db, &ropts);
  db_iter_seek_to_first(it);
  int scan_count = 0;
  while (db_iter_valid(it)) {
  scan_count++;
  db_iter_next(it);
  }
  printf("Iterator scanned %d unique keys.\n", scan_count);
  db_iter_free(it);

  // === 关闭 + 重新打开(恢复验证)===
  db_close(db);
  printf("DB closed.\n");

  ret = db_open(db_path, &opts, &db);
  printf("db_open (reopen): %s\n", ret == 0 ? "OK" : "FAIL");

  ret = db_get(db, &ropts, "key:000042", 10, &val, &vlen);
  printf("After reopen, get key:000042 -> %.*s (ret=%d)\n",
  (int)vlen, val, ret);
  if (ret == 0) free(val);

  ret = db_get(db, &ropts, "key:000099", 10, &val, &vlen);
  printf("After reopen, get key:000099 -> %s\n",
  ret == 0 ? "found" : "not found (delete survived)");

  // === 打印统计 ===
  printf("\n--- DB Stats ---\n");
  for (int l = 0; l < MAX_LEVELS; l++) {
  if (db->current->file_count[l] > 0) {
  uint64_t total_bytes = 0;
  for (int f = 0; f < db->current->file_count[l]; f++)
  total_bytes += db->current->files[l][f].file_size;
  printf("Level %d: %d files, %lu bytes\n",
  l, db->current->file_count[l], (unsigned long)total_bytes);
  }
  }
  printf("next_file_num: %lu, last_seq: %lu\n",
  (unsigned long)db->next_file_num, (unsigned long)db->last_seq);

  db_close(db);
  printf("Done.\n");
  return 0;
}

预期输出:

db_open: OK
Wrote 200 keys.
Get key:000042 -> value_42_initial (ret=0)
Snapshot created at seq=200
Updated first 50 keys.
Snapshot get key:000010 -> value_10_initial
Current get key:000010 -> value_10_UPDATED
After delete, get key:000099 -> not found (correct)
Iterator scanned 199 unique keys.
DB closed.
db_open (reopen): OK
After reopen, get key:000042 -> value_42_initial (ret=0)
After reopen, get key:000099 -> not found (delete survived)

--- DB Stats ---
Level 0: 2 files, 8194 bytes
Level 1: 1 files, 12842 bytes
next_file_num: 8, last_seq: 251
Done.

小结:Part A 完成了从零到可用的完整引擎。所有 C 代码复用了前四篇的函数——wal_writer_append()memtable_put/get()table_builder/reader_*()flush_memtable_to_l0()do_compaction()version_apply()manifest_write/recover()DB 结构体将它们聚合起来,pthread_mutex 保护写路径,ref_count 保护读路径,后台线程异步执行 Flush 和 Compaction。


Part B:Rust 重写

本节交付范围:用 Rust 重写 SkipList、WAL、SSTable Reader/Builder 和 DB 接口(Put / Get)。不含 Compaction 调度和 Iterator——目的不是 1:1 移植,而是通过 5 个真实编译错误展示 Rust 如何在编译期捕获 C 中常见的安全隐患。

这一部分用 Rust 重写 LSM-Tree 的核心模块——SkipList、WAL、SSTable 和 DB 接口(不含 Compaction 调度)。重点不是逐行翻译,而是观察 Rust 的类型系统和所有权模型如何消除 C 中的常见 bug。

C 手动管理 vs Rust 所有权模型

项目结构

examples/lsm-tree-rs/
├── Cargo.toml
├── src/
│  ├── lib.rs
│  ├── skiplist.rs
│  ├── wal.rs
│  ├── block.rs
│  ├── bloom.rs
│  ├── sstable.rs
│  └── db.rs
└── benches/
  └── bench_main.rs

Cargo.toml:

[package]
name = "lsm-tree"
version = "0.1.0"
edition = "2021"

[dependencies]
crc32fast = "1.4"
memmap2 = "0.9"

[dev-dependencies]
tempfile = "3"

故事 1:Arena 的裸指针——SkipList

C 的 SkipList 使用 Arena 分配器管理节点内存——arena_alloc() 返回裸指针,所有节点的生命周期与 Arena 绑定。Rust 不允许这样做。

C 写法:

SkipListNode *node = arena_alloc(arena, sizeof(SkipListNode) + level * sizeof(void*));
node->forward[0] = header->forward[0]; // 裸指针操作
header->forward[0] = node;

Rust 直译 → 编译器拒绝:

// error[E0499]: cannot borrow `arena` as mutable more than once
let node = arena.alloc(layout);
let header = arena.get_header(); // 二次可变借用

Rust 不允许同时持有对同一结构的多个可变引用。Arena 分配器内部混用了 &mut self(分配)和 &self(读取),borrow checker 不允许。

正确写法——用 Vec<Box<Node>> 替代 Arena,让 Rust 自动管理每个节点的分配和释放:

pub struct SkipList {
  header: Box<Node>,
  nodes: Vec<Box<Node>>, // 所有节点的所有权
  level: usize,
  len: usize,
}

struct Node {
  key: Vec<u8>,
  value: Vec<u8>,
  forward: Vec<Option<*mut Node>>, // unsafe: raw pointers for O(1) link
}

impl SkipList {
  pub fn insert(&mut self, key: &[u8], value: &[u8]) {
  let level = self.random_level();
  let mut new_node = Box::new(Node {
  key: key.to_vec(),
  value: value.to_vec(),
  forward: vec![None; level + 1],
  });
  let node_ptr: *mut Node = &mut *new_node;

  // 找到插入点
  let mut update = vec![std::ptr::null_mut::<Node>(); level + 1];
  let mut curr = &mut *self.header as *mut Node;
  for i in (0..=level).rev() {
  unsafe {
  while let Some(next) = (*curr).forward[i] {
  if (*next).key.as_slice() < key {
  curr = next;
  } else { break; }
  }
  update[i] = curr;
  }
  }

  // 插入链接
  for i in 0..=level {
  unsafe {
  new_node.forward[i] = (*update[i]).forward[i];
  (*update[i]).forward[i] = Some(node_ptr);
  }
  }

  self.nodes.push(new_node); // 转移所有权到 Vec
  self.len += 1;
  }

  pub fn get(&self, key: &[u8]) -> Option<&[u8]> {
  let mut curr = &*self.header as *const Node;
  for i in (0..self.header.forward.len()).rev() {
  unsafe {
  while let Some(next) = (*curr).forward[i] {
  if (*(next as *const Node)).key.as_slice() < key {
  curr = next as *const Node;
  } else { break; }
  }
  }
  }
  unsafe {
  if let Some(next) = (*curr).forward[0] {
  if (*(next as *const Node)).key.as_slice() == key {
  return Some(&(*(next as *const Node)).value);
  }
  }
  }
  None
  }
}

安全收益:节点的所有权归 Vec<Box<Node>>,SkipList 被 drop 时所有节点自动释放——不可能忘记 arena_destroy()unsafe 块被限制在指针操作内,Rust 保证了内存不会泄漏。

故事 2:柔性数组的替代方案

C 的 SkipListNode 末尾有 forward[] 柔性数组:

typedef struct SkipListNode {
  char *key, *value;
  int level;
  struct SkipListNode *forward[]; // C99 flexible array member
} SkipListNode;

Rust 没有柔性数组成员。 直接翻译不可能:

// error: the size for values of type `[*mut Node]` cannot be known at compilation time
struct Node {
  key: Vec<u8>,
  forward: [*mut Node], // unsized type
}

解决方案Vec<Option<*mut Node>> 在构造时指定长度。这在语义上等价——C 的 arena_alloc(sizeof(Node) + level * sizeof(void*)) 对应 Rust 的 vec![None; level + 1]。代价是多一层间接寻址(Vec 指向堆),但实际性能差异收敛于噪声范围内。

故事 3:Mmap 的生命周期——SSTable Reader

Mmap 生命周期约束

C 的 SSTable Reader 用 mmap() 映射文件:

void *data = mmap(NULL, file_size, PROT_READ, MAP_PRIVATE, fd, 0);
close(fd);  // 关了 fd 之后 mmap 仍然有效(OS 保证)
// ... 在 data 上做读操作
munmap(data, file_size); // 手动释放

Rust Mmap 的生命周期问题:

// error[E0597]: `file` does not live long enough
fn open_sst(path: &str) -> Mmap {
  let file = File::open(path).unwrap();
  let mmap = unsafe { Mmap::map(&file).unwrap() }; // borrows &file
  mmap // file 在这里被 drop → mmap 的借用悬空
}

memmap2::Mmap::map() 接受 &File——borrow checker 要求 File 的生命周期必须至少和 Mmap 一样长。

正确写法:将 FileMmap 绑定在同一个结构体中,利用 Rust struct 字段的 drop 顺序(后声明的字段先 drop)保证 MmapFile 之前被释放:

pub struct SstReader {
  _file: File,  // 先声明 → 后 drop(保活)
  mmap: Mmap,  // 后声明 → 先 drop
  index_offset: usize,
  bloom: Option<BloomFilter>,
}

impl SstReader {
  pub fn open(path: &Path) -> io::Result<Self> {
  let file = File::open(path)?;
  let mmap = unsafe { Mmap::map(&file)? };

  // 解析 Footer(最后 48 字节)
  let footer_start = mmap.len() - FOOTER_SIZE;
  let (index_handle, meta_handle) = decode_footer(&mmap[footer_start..])?;

  // 加载 Bloom Filter
  let bloom = if meta_handle.size > 0 {
  Some(BloomFilter::from_bytes(&mmap[meta_handle.offset as usize..(meta_handle.offset + meta_handle.size) as usize]))
  } else { None };

  Ok(Self {
  _file: file,
  mmap,
  index_offset: index_handle.offset as usize,
  bloom,
  })
  }

  pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
  // Bloom Filter 快速排除
  if let Some(ref bf) = self.bloom {
  if !bf.may_match(key) { return None; }
  }
  // Index Block → Data Block → 二分查找
  self.search_index(key)
  }
}

安全收益:C 需要手动匹配 mmap()/munmap(),Rust 的 Drop trait 自动处理。而且如果你试图让 Mmap 活得比 File 长,编译器直接报错——不可能出现悬空指针。

故事 4:BlockBuilder 的借用冲突

C 的 BlockBuilder 在写入时持有上一个 key 的副本用于计算共享前缀:

void block_builder_add(BlockBuilder *bb, const char *key, size_t key_len, ...) {
  int shared = common_prefix_len(bb->last_key, bb->last_key_len, key, key_len);
  // ... 写入 shared/non-shared/value_len 和数据
  memcpy(bb->last_key, key, key_len);
  bb->last_key_len = key_len;
}

Rust 直译:

// error[E0502]: cannot borrow `self` as mutable because it is also borrowed as immutable
fn add(&mut self, key: &[u8], value: &[u8]) {
  let shared = common_prefix(&self.last_key, key); // immutable borrow
  self.data.extend_from_slice(...);  // mutable borrow
  self.last_key = key.to_vec();  // mutable borrow
}

问题:common_prefix(&self.last_key, ...) 创建了对 self 的不可变引用,而后续 self.data.extend_from_slice(...) 需要可变引用。Rust 不允许同时存在可变和不可变引用。

解决方案——先计算 shared 长度(纯数值),再做写操作:

pub struct BlockBuilder {
  data: Vec<u8>,
  restarts: Vec<u32>,
  last_key: Vec<u8>,
  entry_count: usize,
}

impl BlockBuilder {
  pub fn add(&mut self, key: &[u8], value: &[u8]) {
  let shared = if self.entry_count % RESTART_INTERVAL == 0 {
  self.restarts.push(self.data.len() as u32);
  0
  } else {
  common_prefix_len(&self.last_key, key) // 返回 usize,不保持借用
  };

  let non_shared = key.len() - shared;
  // Encode shared | non_shared | value_len | key[shared..] | value
  self.data.extend_from_slice(&encode_varint32(shared as u32));
  self.data.extend_from_slice(&encode_varint32(non_shared as u32));
  self.data.extend_from_slice(&encode_varint32(value.len() as u32));
  self.data.extend_from_slice(&key[shared..]);
  self.data.extend_from_slice(value);

  self.last_key.clear();
  self.last_key.extend_from_slice(key);
  self.entry_count += 1;
  }
}

fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
  a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}

安全收益:borrow checker 迫使我们将”读旧数据”和”写新数据”分开。这不仅避免了潜在的迭代器失效(C 中 memcpylast_key 可能在某些场景覆盖正在读的数据),还使代码逻辑更清晰。

故事 5:DB 的 Send + Sync——跨线程共享

C 的 DB 结构体可以在线程间随意传递指针:

pthread_create(&thread, NULL, bg_compaction_thread, db); // 直接传裸指针

Rust 编译器要求 DB 实现 Send + Sync

// error[E0277]: `*mut Node` cannot be sent between threads safely
struct DB {
  mem: SkipList,  // contains *mut Node → not Send
  current: Version,
}

SkipList 内部有 *mut Node 裸指针,裸指针既不是 Send 也不是 Sync。跨线程使用 DB 需要手动声明安全保证:

pub struct DB {
  inner: Arc<Mutex<DBInner>>,
}

struct DBInner {
  mem: SkipList,
  imm: Option<SkipList>,
  current: Arc<Version>,
  wal: WalWriter,
  next_file_num: u64,
  last_seq: u64,
  options: DBOptions,
  db_dir: PathBuf,
}

// 手动声明:我们保证 DBInner 在 Mutex 保护下的跨线程安全
unsafe impl Send for DBInner {}

impl DB {
  pub fn open(path: impl AsRef<Path>, opts: DBOptions) -> io::Result<Self> {
  // ... 恢复逻辑,与 C 版本类似
  let inner = DBInner {
  mem: SkipList::new(),
  imm: None,
  current: Arc::new(Version::new()),
  wal: WalWriter::new(&wal_path)?,
  next_file_num: 1,
  last_seq: 0,
  options: opts,
  db_dir: path.as_ref().to_path_buf(),
  };

  Ok(DB {
  inner: Arc::new(Mutex::new(inner)),
  })
  }

  pub fn put(&self, key: &[u8], value: &[u8]) -> io::Result<()> {
  let mut inner = self.inner.lock().unwrap();
  // WAL 追加 + MemTable 写入
  inner.wal.append(key, value)?;
  inner.last_seq += 1;
  inner.mem.insert(key, value);
  Ok(())
  }

  pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
  let inner = self.inner.lock().unwrap();
  // 查 MemTable
  if let Some(val) = inner.mem.get(key) {
  return Some(val.to_vec());
  }
  // 查 Immutable
  if let Some(ref imm) = inner.imm {
  if let Some(val) = imm.get(key) {
  return Some(val.to_vec());
  }
  }
  // 查 SSTable(通过 Version)
  let version = inner.current.clone(); // Arc clone → 锁外读
  drop(inner);
  version.get(key)
  }
}

安全收益Arc<Mutex<DBInner>> 确保了: - Arc → 多线程安全的引用计数(对应 C 的手动 ref_count) - Mutex → 自动锁定/解锁(对应 C 的 pthread_mutex_lock/unlock) - 如果忘记声明 Send,编译器拒绝跨线程使用——不可能出现数据竞争

C vs Rust 并发安全对比

场景 C Rust
忘记加锁 运行时数据竞争(未定义行为) 编译错误:Mutex 保护的数据无法裸访问
忘记解锁 死锁 自动解锁:MutexGuard drop 时释放
忘记 ref_count++ use-after-free 编译错误Arc::clone() 是唯一的共享方式
裸指针跨线程 数据竞争 编译错误*mut T 不是 Send

小结:5 个故事覆盖了 LSM-Tree 实现中最常见的安全隐患——内存泄漏(Arena)、悬空指针(Mmap 生命周期)、迭代器失效(借用冲突)、数据竞争(Send/Sync)。Rust 的类型系统在编译期捕获了所有这些问题。代价是需要更多前期设计思考,以及在跳表等底层数据结构中使用 unsafe


Part C:性能对比

Benchmark 设置

对比对象——Part A 的 C 实现、Part B 的 Rust 实现、以及 LevelDB 1.23。C 实现已从四篇文章中的教学代码整合为可编译的独立项目 examples/lsm-tree-c/(含 lsm.h + bench.c),与 Rust / LevelDB 使用完全相同的参数和场景

参数
Key 大小 16 字节
Value 大小 100 字节
数据量 100,000 条
write_buffer_size 4 MB
bloom_bits_per_key 10
测试机器 Intel i9-12900K, 24 核, Linux x86_64
编译器 GCC 15.2.1 -O2 (C / LevelDB) / rustc 1.94.0 –release (Rust)
轮数 3 轮取中位数

五个场景:

  1. Sequential Write:key 从 00000000000000000000000000099999 顺序写入。
  2. Random Write:key 随机打乱后写入(Fisher-Yates shuffle)。
  3. Sequential Read:按 key 顺序逐条 Get()
  4. Random Read:随机 key 逐条 Get()
  5. Mixed Read/Write:80% 随机读 + 20% 随机写交替执行。

benchmark 源码:

  • Our Cexamples/lsm-tree-c/(编译:gcc -O2 -o bench_our_c bench.c -lz -lpthread
  • Our Rustexamples/lsm-tree-rs/benches/bench_main.rs(编译:cargo build --release --bin bench_main
  • LevelDBexamples/lsm-tree-rs/bench_leveldb.c(编译:gcc -O2 -o bench_leveldb bench_leveldb.c -lleveldb -lpthread

实测数据

性能对比图

下表为 3 轮取中位数:

场景 Our C (ops/s) Our Rust (ops/s) LevelDB 1.23 (ops/s) C / LevelDB Rust / LevelDB
Seq Write 2,524,115 11,479 1,291,206 195% 0.9%
Rand Write 1,764,349 7,277 1,002,619 176% 0.7%
Seq Read 59,908 59,374 1,914,418 3.1% 3.1%
Rand Read 55,984 52,525 1,126,821 5.0% 4.7%
Mixed R/W 88,295 27,471 761,754 11.6% 3.6%

完整输出:

=== Our C LSM-Tree Benchmark ===
N=100000  key=16B  val=100B  write_buffer=4MB  bloom=10 bits/key

Round 1: Seq Write 2446122 | Rand Write 1737834 | Seq Read  59269 | Rand Read 55861 | Mixed  88295
Round 2: Seq Write 2565890 | Rand Write 1778965 | Seq Read  59908 | Rand Read 56417 | Mixed  89457
Round 3: Seq Write 2524115 | Rand Write 1764349 | Seq Read  60445 | Rand Read 55984 | Mixed  83771

=== Our Rust LSM-Tree Benchmark ===
N=100000, key=16B, val=100B, write_buffer=4MB

Round 1: Seq Write 11479 | Rand Write 7277 | Seq Read 61261 | Rand Read 58668 | Mixed 29560
Round 2: Seq Write 11560 | Rand Write 7036 | Seq Read 59374 | Rand Read 52525 | Mixed 27471
Round 3: Seq Write 11372 | Rand Write 7302 | Seq Read 56220 | Rand Read 51837 | Mixed 27365

=== LevelDB 1.23 Benchmark ===
N=100000, key=16B, val=100B, write_buffer=4MB

Round 1: Seq Write 1257267 | Rand Write 1002619 | Seq Read 1914418 | Rand Read 1126821 | Mixed 740955
Round 2: Seq Write 1291206 | Rand Write  985047 | Seq Read 1927332 | Rand Read 1165115 | Mixed 761754
Round 3: Seq Write 1291374 | Rand Write 1006488 | Seq Read 1852720 | Rand Read 1100285 | Mixed 766732

差距分析

口径说明:三个实现的功能完整度不同。Our C 包含 MANIFEST 更新和 Compaction 调度但不对写入做 fsync();Our Rust 不含 Compaction 调度且每次写入都 BufWriter::flush();LevelDB 是功能完整的生产级实现(WriteBatch、Group Commit、Table Cache、Block Cache、Snappy 压缩)。下面的数据用于观察架构选择对性能的影响,而非语言本身的优劣。

三组数据呈现出一个有趣的格局:

Our C 写入为什么比 LevelDB 还快?

Our C 的写入路径虽然包含 MANIFEST 和 Compaction(后台线程),但 benchmark 中 sync=0——WAL 只做 fwrite() 到 libc buffer,不调用 fsync()。而 LevelDB 即使在 sync=false 模式下,每次写入仍有 WriteBatch 编码、Group Commit 逻辑、log::Writer 分帧等额外开销。

换句话说,Our C 的写入速度来自更短的 hot path——db_put() 持锁后只做 WAL append + MemTable insert,没有 WriteBatch 分组协议。如果加上 fsync(),Our C 的写入会骤降到接近 LevelDB 水平。

Our Rust 写入为什么远慢于 Our C?

差异 Our C Our Rust
内存分配 Arena 顺序分配,零散 malloc 降为零 每个节点 Box::new() + Vec::new()
MemTable 容量检查 arena.bytes_used 直接读 approximate_memory() 虽已优化为 O(1),但仍有开销
WAL 写入 fwrite() 到 FILE buffer BufWriter + 每次 flush
Mutex 开销 pthread_mutex_lock 无毒化检查 Rust Mutex 包含 poisoning 检查

读路径差距(~20-30 倍 vs LevelDB)三者共同的瓶颈:

缺失优化 Our C / Our Rust LevelDB 的做法 估计影响
Table Cache 每次 Get() 打开文件 + 解析 Footer + 关闭 LRU 缓存已打开的 TableReader,命中时零 I/O 极高
Block Cache 每次读 Data Block 从磁盘解析 LRU 缓存已解析的 Data Block
Snappy 压缩 未压缩,SST 文件更大 Snappy 压缩减少磁盘读取量
pread() fopen() + fseek() + fread() 多次系统调用 pread() 单次系统调用直接定位读

Our C 和 Our Rust 的读性能几乎一致(~55K-60K ops/s),说明读路径的瓶颈不在语言层面,而在架构层面——没有 Table Cache 是致命的。

收获

通过三方对比我们观察到:

  1. hot path 长度决定写入吞吐。Our C 写入路径只有 WAL append + MemTable insert(无 WriteBatch 分组、无 fsync),LevelDB 的 Group Commit 和 log 分帧增加了延迟。Arena 分配器更是让 MemTable insert 开销降到最低。
  2. 读取优化的核心是缓存。Table Cache 和 Block Cache 不是”nice to have”,而是让读路径从万级跃升到百万级的核心优化——Our C 和 Our Rust 都缺少这两层缓存,读性能因此与 LevelDB 差距达 20-30 倍。
  3. 语言层面差异在读路径上不显著。Our C 和 Our Rust 的读性能几乎一致(~55K-60K ops/s),说明架构设计(有无 Cache)是决定性因素。
  4. Rust 写入慢的根因是内存分配策略。逐节点 Box::new() 对比 Arena 顺序分配差距显著;此外 Our Rust 的 BufWriter 每次 flush 和 Mutex poisoning 检查也有额外开销。给 Rust 版本加上 Arena(例如 bumpalo crate)是最直接的优化方向。

系列总结

五篇文章走完了从零构建 LSM-Tree 存储引擎的完整路径:

篇目 核心产出 代码量
第 1 篇:全景 架构心智模型 + 三种放大推导 概念图
第 2 篇:WAL + MemTable WAL 分片 + 跳表 + 崩溃恢复 ~200 行 C
第 3 篇:SSTable + Bloom Filter Data Block 前缀压缩 + 双重哈希 Bloom ~400 行 C
第 4 篇:Compaction MergeIterator + Version/MANIFEST + 策略对比 ~550 行 C
第 5 篇:完整引擎 + Rust(本文) DB API + 并发 + Snapshot + Iterator + Rust ~600 行 C + ~400 行 Rust

总计:~1,750 行 C + ~400 行 Rust,21+ 张 SVG 架构图。

如果你跟着系列走到了这里,你已经理解了:

进一步阅读


系列路线

→ 系列目录页

篇目 标题 核心产出
第 1 篇 LSM-Tree 全景 架构心智模型 + 三种放大的数学推导
第 2 篇 WAL + MemTable WAL 分片策略 + 跳表实现 + 崩溃恢复证明
第 3 篇 SSTable + Bloom Filter Data Block 前缀压缩 + Bloom Filter 双重哈希 + Builder/Reader
第 4 篇 Compaction 多路归并迭代器 + Version/MANIFEST + 策略对比
第 5 篇 完整引擎 + Rust 重写对比(本文) DB API + 并发 + Snapshot + Iterator + Rust 重写 + benchmark

延伸阅读:如果你对 Rust 所有权模型 vs C++ RAII 的深层设计差异感兴趣,可以看 Rust 所有权:C++ RAII 本来想成为的样子,那里从 C++ 侧拆解 RAII 的五个根本缺陷。


By .