前四篇分别实现了 WAL、MemTable、SSTable、Bloom Filter、Compaction 和 Version 管理。每个组件独立可测,但它们还不是一个可用的数据库——缺少统一的对外接口、读写并发控制和崩溃恢复组装。
本篇做两件事:
- Part A:用 C
把所有组件组装成完整引擎,提供
db_open/db_put/db_get/db_delete/db_iterator/db_snapshot六个公开 API。 - Part B:用 Rust 重写核心模块(SkipList + WAL + SSTable + DB 接口),记录 5 个”编译器不让我过”的真实故事,最后给出 C / Rust / LevelDB 三方性能对比。
前文回顾: 第 1 篇 建立了全景地图和三种放大推导;第 2 篇 实现了 WAL 和 MemTable;第 3 篇 实现了 SSTable Builder/Reader 和 Bloom Filter;第 4 篇 实现了 Compaction、MergeIterator 和 Version/MANIFEST。
Part A:C 完整引擎
DB 内部结构
上图是完整引擎的内部结构。核心思想是将前四篇实现的组件聚合到一个
DB
结构体中,并通过互斥锁和后台线程协调写入、Flush 与
Compaction。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <pthread.h>
#include <sys/stat.h>
#include <dirent.h>
#include <unistd.h>
#include <zlib.h>
// ===== 前文 API 声明(第 2~4 篇已实现)=====
// WAL (Art.2): wal_writer_append(), wal_reader_read_record()
// MemTable (Art.2): memtable_put(), memtable_get(), SkipList, Arena
// SSTable (Art.3): table_builder_*(), table_reader_*(), bloom_*()
// Compaction (Art.4): flush_memtable_to_l0(), do_compaction(),
// compaction_score(), merge_iter_*(), version_*(), manifest_*()
#define kWriteBufferSize (4 * 1024 * 1024) // 4MB
#define kTypePut 1
#define kTypeDeletion 2
typedef struct Snapshot Snapshot;
typedef struct {
size_t write_buffer_size; // MemTable 大小阈值
int bloom_bits_per_key; // Bloom Filter 参数
int max_open_files; // 最大打开文件数
int sync; // 每次写入是否 fsync
} DB_Options;
typedef struct {
Snapshot *snap; // NULL = 读当前最新
} ReadOptions;
typedef struct {
int sync; // 是否对本次写入做 fsync
} WriteOptions;DB_Options
控制引擎行为——write_buffer_size 决定 MemTable
何时 freeze,bloom_bits_per_key 影响 Bloom
Filter 的误判率,sync 控制 WAL 是否每次
fsync。
DB 核心结构体
struct Snapshot {
uint64_t sequence; // 快照对应的序列号
Version *version; // 引用的 Version(ref_count++)
Snapshot *prev, *next;
};
typedef struct {
char db_dir[512];
DB_Options options;
// --- 内存层 ---
MemTable *mem; // Mutable MemTable(接收写入)
MemTable *imm; // Immutable MemTable(等待 flush)
// --- WAL ---
WALWriter wal;
uint64_t wal_file_num;
// --- 版本管理 ---
Version *current; // 当前活跃版本
FILE *manifest_fp; // MANIFEST 文件指针
uint64_t next_file_num; // 全局文件编号分配器
uint64_t last_seq; // 最新序列号
// --- 并发控制 ---
pthread_mutex_t mu; // 全局互斥锁
pthread_cond_t bg_cv; // 后台线程条件变量
pthread_cond_t write_cv; // 写阻塞等待条件变量
pthread_t bg_thread; // 后台 compaction 线程
int bg_running; // 后台线程是否在运行
int shutting_down;
// --- Snapshot 链表 ---
Snapshot snap_head; // 哨兵节点
} DB;几个设计要点:
mem/imm双缓冲:第 2 篇介绍了 Immutable MemTable 的概念。mem接收写入,imm等待后台 Flush。同一时刻最多只有一个imm。current指向活跃 Version:每次 Compaction 完成后,创建新 Version 并将current指向它。旧 Version 通过引用计数保持存活(Snapshot 可能引用它)。mu保护写路径:所有写操作(db_put、db_delete、freeze、MANIFEST 更新)都持有mu。读路径不需要全局锁,只需在获取current/mem/imm指针时短暂持锁。bg_cv/write_cv条件变量:后台线程在无工作时wait(bg_cv);如果写入时发现imm未 flush 完毕且mem又满了,写入线程wait(write_cv)阻塞,直到后台完成 Flush 后signal(write_cv)唤醒。
db_open():启动恢复
引擎启动需要恢复崩溃前的状态。恢复流程分三步:读 MANIFEST 恢复 Version → 重放 WAL 恢复内存数据 → 创建新 MemTable 和 WAL。
// 辅助:读 CURRENT 文件获取 MANIFEST 文件名
static int read_current_file(const char *db_dir, char *manifest_name, size_t cap) {
char path[512];
snprintf(path, sizeof(path), "%s/CURRENT", db_dir);
FILE *fp = fopen(path, "r");
if (!fp) return -1;
if (!fgets(manifest_name, (int)cap, fp)) { fclose(fp); return -1; }
// 去掉末尾换行
size_t len = strlen(manifest_name);
if (len > 0 && manifest_name[len - 1] == '\n') manifest_name[len - 1] = '\0';
fclose(fp);
return 0;
}
// 辅助:写 CURRENT 文件
static void write_current_file(const char *db_dir, const char *manifest_name) {
char tmp[512], path[512];
snprintf(tmp, sizeof(tmp), "%s/CURRENT.tmp", db_dir);
snprintf(path, sizeof(path), "%s/CURRENT", db_dir);
FILE *fp = fopen(tmp, "w");
fprintf(fp, "%s\n", manifest_name);
fflush(fp); fsync(fileno(fp)); fclose(fp);
rename(tmp, path);
}
int db_open(const char *db_dir, const DB_Options *opts, DB **out) {
DB *db = calloc(1, sizeof(DB));
snprintf(db->db_dir, sizeof(db->db_dir), "%s", db_dir);
db->options = *opts;
if (db->options.write_buffer_size == 0)
db->options.write_buffer_size = kWriteBufferSize;
mkdir(db_dir, 0755);
pthread_mutex_init(&db->mu, NULL);
pthread_cond_init(&db->bg_cv, NULL);
pthread_cond_init(&db->write_cv, NULL);
// 初始化 Snapshot 哨兵
db->snap_head.prev = &db->snap_head;
db->snap_head.next = &db->snap_head;
// --- 恢复 Version ---
db->current = calloc(1, sizeof(Version));
version_init(db->current);
char manifest_name[256] = {0};
int has_manifest = (read_current_file(db_dir, manifest_name, sizeof(manifest_name)) == 0);
if (has_manifest) {
char mpath[512];
snprintf(mpath, sizeof(mpath), "%s/%s", db_dir, manifest_name);
manifest_recover(mpath, db->current);
db->next_file_num = db->current->next_file_number;
} else {
db->next_file_num = 1;
}
// --- 重放 WAL ---
// 扫描目录,找到编号 >= next_file_num 的 WAL 文件(MANIFEST 之后的)
// 简化:查找所有 .wal 文件
DIR *dir = opendir(db_dir);
if (dir) {
struct dirent *ent;
while ((ent = readdir(dir)) != NULL) {
uint64_t fnum;
if (sscanf(ent->d_name, "%06lu.wal", &fnum) == 1) {
char wal_path[512];
snprintf(wal_path, sizeof(wal_path), "%s/%s", db_dir, ent->d_name);
MemTable *recovery_mem = memtable_new();
WALReader reader;
wal_reader_open(&reader, wal_path);
char *rec; size_t rec_len;
while (wal_reader_read_record(&reader, &rec, &rec_len) == 0) {
// 解码 KV record: [type(1) | klen(4) | key | vlen(4) | val]
uint8_t type = (uint8_t)rec[0];
uint32_t klen, vlen;
memcpy(&klen, rec + 1, 4);
const char *key = rec + 5;
memcpy(&vlen, rec + 5 + klen, 4);
const char *val = rec + 9 + klen;
memtable_put(recovery_mem, type, key, klen, val, vlen);
db->last_seq++;
}
wal_reader_close(&reader);
// Flush 恢复的数据到 L0
if (recovery_mem->table->count > 0) {
FileMetaData meta;
// 遍历 skiplist 构造 keys/vals 数组,flush 到 L0
// flush_memtable_to_l0() 复用第 4 篇实现
flush_skiplist_to_l0(db, recovery_mem, &meta);
VersionEdit ve;
version_edit_init(&ve);
version_edit_add_file(&ve, 0, &meta);
ve.next_file_number = db->next_file_num;
ve.has_next_file_number = 1;
version_apply(db->current, &ve);
}
memtable_destroy(recovery_mem);
}
}
closedir(dir);
}
// --- 创建新 MANIFEST(如果是新 DB 或需要 compact MANIFEST)---
if (!has_manifest) {
char mname[64];
snprintf(mname, sizeof(mname), "MANIFEST-%06lu", db->next_file_num++);
char mpath[512];
snprintf(mpath, sizeof(mpath), "%s/%s", db_dir, mname);
db->manifest_fp = fopen(mpath, "ab");
// 写入当前完整 Version 作为初始快照
VersionEdit ve;
version_edit_init(&ve);
for (int l = 0; l < MAX_LEVELS; l++) {
for (int f = 0; f < db->current->file_count[l]; f++) {
version_edit_add_file(&ve, l, &db->current->files[l][f]);
}
}
ve.next_file_number = db->next_file_num;
ve.has_next_file_number = 1;
manifest_write(db->manifest_fp, &ve);
fflush(db->manifest_fp); fsync(fileno(db->manifest_fp));
write_current_file(db_dir, mname);
} else {
char mpath[512];
snprintf(mpath, sizeof(mpath), "%s/%s", db_dir, manifest_name);
db->manifest_fp = fopen(mpath, "ab");
}
// --- 创建新 MemTable + WAL ---
db->wal_file_num = db->next_file_num++;
char wal_path[512];
snprintf(wal_path, sizeof(wal_path), "%s/%06lu.wal",
db_dir, (unsigned long)db->wal_file_num);
wal_writer_open(&db->wal, wal_path);
db->mem = memtable_new();
db->imm = NULL;
// --- 启动后台 Compaction 线程 ---
db->bg_running = 1;
pthread_create(&db->bg_thread, NULL, bg_compaction_thread, db);
*out = db;
return 0;
}恢复过程的关键在于顺序:先恢复 Version(确定有哪些 SSTable),再重放 WAL(恢复未持久化的内存数据)。这保证了崩溃后不丢数据——即使 MemTable 在崩溃时丢失,WAL 中的记录可以完整重建。
flush 辅助函数
flush_skiplist_to_l0() 将 MemTable
的跳表数据 flush 到 L0 SSTable,复用第 4 篇的
flush_memtable_to_l0():
// 遍历 skiplist,收集 key/value 数组,调用 flush_memtable_to_l0()
static void flush_skiplist_to_l0(DB *db, MemTable *m, FileMetaData *out) {
int count = m->table->count;
const char **keys = malloc(sizeof(char*) * count);
size_t *key_lens = malloc(sizeof(size_t) * count);
const char **vals = malloc(sizeof(char*) * count);
size_t *val_lens = malloc(sizeof(size_t) * count);
SkipListNode *node = m->table->header->forward[0];
int i = 0;
while (node && i < count) {
keys[i] = node->key;
key_lens[i] = node->key_len;
vals[i] = node->value;
val_lens[i] = node->value_len;
node = node->forward[0];
i++;
}
uint64_t fnum = db->next_file_num++;
flush_memtable_to_l0(db->db_dir, fnum, keys, key_lens,
vals, val_lens, count, out);
free(keys); free(key_lens);
free(vals); free(val_lens);
}db_put() / db_delete():写路径
写路径是整个引擎中最关键的串行化点。所有写操作必须持有
mu,保证 WAL 和 MemTable 的原子性。
// 编码 KV record: [type(1) | klen(4) | key | vlen(4) | val]
static size_t encode_kv_record(char *buf, uint8_t type,
const char *key, size_t klen,
const char *val, size_t vlen) {
buf[0] = (char)type;
memcpy(buf + 1, &klen, 4);
memcpy(buf + 5, key, klen);
uint32_t vl = (uint32_t)vlen;
memcpy(buf + 5 + klen, &vl, 4);
memcpy(buf + 9 + klen, val, vlen);
return 9 + klen + vlen;
}
static int make_room_for_write(DB *db) {
// 检查是否需要 freeze MemTable
while (1) {
if (db->mem->arena->bytes_used < db->options.write_buffer_size) {
return 0; // 空间充足
}
if (db->imm != NULL) {
// Immutable 还没 flush 完,阻塞等待
pthread_cond_wait(&db->write_cv, &db->mu);
continue;
}
// Freeze: mutable → immutable
db->imm = db->mem;
db->mem = memtable_new();
// 创建新 WAL
db->wal_file_num = db->next_file_num++;
char wal_path[512];
snprintf(wal_path, sizeof(wal_path), "%s/%06lu.wal",
db->db_dir, (unsigned long)db->wal_file_num);
wal_writer_close(&db->wal);
wal_writer_open(&db->wal, wal_path);
// 通知后台线程 flush
pthread_cond_signal(&db->bg_cv);
return 0;
}
}
int db_put(DB *db, const WriteOptions *opts,
const char *key, size_t klen,
const char *val, size_t vlen) {
pthread_mutex_lock(&db->mu);
int ret = make_room_for_write(db);
if (ret != 0) { pthread_mutex_unlock(&db->mu); return ret; }
// 1. WAL 追加
char record[4096];
size_t rlen = encode_kv_record(record, kTypePut, key, klen, val, vlen);
wal_writer_append(&db->wal, record, rlen);
if (opts && opts->sync) {
fflush(db->wal.fp);
fsync(fileno(db->wal.fp));
}
// 2. MemTable 写入
uint64_t seq = ++db->last_seq;
memtable_put(db->mem, kTypePut, key, klen, val, vlen);
pthread_mutex_unlock(&db->mu);
return 0;
}
int db_delete(DB *db, const WriteOptions *opts,
const char *key, size_t klen) {
pthread_mutex_lock(&db->mu);
int ret = make_room_for_write(db);
if (ret != 0) { pthread_mutex_unlock(&db->mu); return ret; }
char record[4096];
size_t rlen = encode_kv_record(record, kTypeDeletion, key, klen, "", 0);
wal_writer_append(&db->wal, record, rlen);
if (opts && opts->sync) {
fflush(db->wal.fp);
fsync(fileno(db->wal.fp));
}
++db->last_seq;
memtable_put(db->mem, kTypeDeletion, key, klen, "", 0);
pthread_mutex_unlock(&db->mu);
return 0;
}make_room_for_write()
是写路径的调度核心:
- 如果 MemTable 空间充足 → 直接返回。
- 如果 MemTable 已满但
imm仍在 flush → 阻塞等待(write_cv)。 - 如果 MemTable 已满且
imm == NULL→ freeze:当前mem变为imm,新建空mem和新 WAL,通知后台 flush。
这就是 第 4 篇提到的
LevelDB 设计限制——同一时刻最多一个 Immutable
MemTable。RocksDB 通过 max_write_buffer_number
允许多个 immutable 排队。
db_get():读路径
读路径不需要持有全局锁——只需短暂加锁获取
mem、imm、current
的指针,然后释放锁,在锁外执行实际读取。这是 LevelDB
读性能高的关键。
int db_get(DB *db, const ReadOptions *opts,
const char *key, size_t klen,
char **val_out, size_t *vlen_out) {
pthread_mutex_lock(&db->mu);
// 快照:获取当前状态的指针
MemTable *mem = db->mem;
MemTable *imm = db->imm;
Version *ver = db->current;
ver->ref_count++; // 防止被 compaction 释放
uint64_t snapshot_seq = db->last_seq;
if (opts && opts->snap)
snapshot_seq = opts->snap->sequence;
pthread_mutex_unlock(&db->mu);
// 从此处开始不持锁
int found = 0;
char *val = NULL;
size_t vlen = 0;
// 1. 查 Mutable MemTable
int ret = memtable_get(mem, key, klen, &val, &vlen);
if (ret == 1) { found = 1; goto done; }
if (ret == -1) { found = -1; goto done; } // Tombstone
// 2. 查 Immutable MemTable
if (imm) {
ret = memtable_get(imm, key, klen, &val, &vlen);
if (ret == 1) { found = 1; goto done; }
if (ret == -1) { found = -1; goto done; }
}
// 3. 查 L0(从新到旧逐文件查找)
for (int i = ver->file_count[0] - 1; i >= 0; i--) {
FileMetaData *f = &ver->files[0][i];
// 快速范围检查
if (memcmp(key, f->smallest, klen < (size_t)f->smallest_len ? klen : f->smallest_len) < 0)
continue;
if (memcmp(key, f->largest, klen < (size_t)f->largest_len ? klen : f->largest_len) > 0)
continue;
char fpath[512];
snprintf(fpath, sizeof(fpath), "%s/%06lu.sst",
db->db_dir, (unsigned long)f->number);
TableReader tr;
if (table_reader_open(&tr, fpath) == 0) {
char *v; int vl;
if (table_reader_get(&tr, key, klen, &v, &vl) == 0) {
val = v; vlen = (size_t)vl;
found = 1;
table_reader_close(&tr);
goto done;
}
table_reader_close(&tr);
}
}
// 4. 查 L1 ~ L6(每层二分定位到最多一个文件)
for (int level = 1; level < MAX_LEVELS; level++) {
if (ver->file_count[level] == 0) continue;
// 二分查找:找到 largest >= key 的最小文件
int lo = 0, hi = ver->file_count[level] - 1, target = -1;
while (lo <= hi) {
int mid = (lo + hi) / 2;
FileMetaData *f = &ver->files[level][mid];
if (memcmp(f->largest, key, (size_t)f->largest_len < klen ? f->largest_len : klen) < 0)
lo = mid + 1;
else {
target = mid;
hi = mid - 1;
}
}
if (target < 0) continue;
FileMetaData *f = &ver->files[level][target];
if (memcmp(key, f->smallest, klen < (size_t)f->smallest_len ? klen : f->smallest_len) < 0)
continue;
char fpath[512];
snprintf(fpath, sizeof(fpath), "%s/%06lu.sst",
db->db_dir, (unsigned long)f->number);
TableReader tr;
if (table_reader_open(&tr, fpath) == 0) {
char *v; int vl;
if (table_reader_get(&tr, key, klen, &v, &vl) == 0) {
val = v; vlen = (size_t)vl;
found = 1;
table_reader_close(&tr);
goto done;
}
table_reader_close(&tr);
}
}
done:
// 释放 Version 引用
pthread_mutex_lock(&db->mu);
ver->ref_count--;
pthread_mutex_unlock(&db->mu);
if (found == 1) {
*val_out = val;
*vlen_out = vlen;
return 0;
}
return -1; // not found
}读路径的几个关键点:
- 锁外读取:持锁只做指针拷贝和
ref_count++,实际 I/O 在锁外完成,不阻塞写路径。 - L0 逐文件查找:L0 文件 key
范围可能重叠,必须从最新文件开始逐个检查。Bloom Filter 在
table_reader_get()内部做第一道过滤。 - L1+ 二分定位:同层文件 key 不重叠且有序排列,二分查找最多定位到 1 个文件。
- Snapshot 支持:如果
ReadOptions携带snapshot,使用 snapshot 的sequence过滤——只返回 \(seq \leq snapshot\_seq\) 的数据。本文的简化实现将 snapshot 过滤留在memtable_get()和table_reader_get()的 InternalKey 比较中。
Snapshot
Snapshot 提供一致性读——在 Snapshot 创建时刻的数据视图不会被后续的写入和 Compaction 影响。
Snapshot *db_get_snapshot(DB *db) {
pthread_mutex_lock(&db->mu);
Snapshot *s = calloc(1, sizeof(Snapshot));
s->sequence = db->last_seq;
s->version = db->current;
s->version->ref_count++;
// 插入到链表尾
s->prev = db->snap_head.prev;
s->next = &db->snap_head;
db->snap_head.prev->next = s;
db->snap_head.prev = s;
pthread_mutex_unlock(&db->mu);
return s;
}
void db_release_snapshot(DB *db, Snapshot *s) {
pthread_mutex_lock(&db->mu);
// 从链表移除
s->prev->next = s->next;
s->next->prev = s->prev;
s->version->ref_count--;
// 如果 ref_count == 0 且不是 current,可以释放
// 简化实现中不做立即释放,留给 compaction 清理
free(s);
pthread_mutex_unlock(&db->mu);
}
// 获取所有 Snapshot 中最小的 sequence
// Compaction 时用它判断 tombstone 是否可以删除
static uint64_t smallest_snapshot_seq(DB *db) {
if (db->snap_head.next == &db->snap_head)
return db->last_seq; // 无 snapshot,全部数据可清理
return db->snap_head.next->sequence;
}Snapshot 的实现非常轻量——只记录一个序列号和一个 Version 引用:
- 引用计数:Snapshot 持有 Version 的
ref_count,保证该 Version 引用的所有 SSTable 不会被删除。 - 序列号:
db_get(snapshot)只返回 \(seq \leq snapshot\_seq\) 的数据,后续写入对 Snapshot 不可见。 - 链表管理:双向链表使得
smallest_snapshot_seq()可以 \(O(1)\) 获取最老的 Snapshot——Compaction 用它决定哪些 tombstone 可以安全删除。
DBIterator
DBIterator 将 MemTable、Immutable MemTable 和所有层级的
SSTable 统一为一个有序遍历接口。核心是复用第 4 篇的
MergeIterator。
typedef struct {
MergeIterator merge;
uint64_t snapshot_seq; // 快照序列号
Version *version; // 引用的 Version
// 去重状态
char prev_user_key[256];
int prev_user_key_len;
int valid;
// 当前输出
const char *cur_key;
size_t cur_key_len;
const char *cur_val;
size_t cur_val_len;
// 打开的 TableReader(需要在 free 时关闭)
TableReader *readers;
int num_readers;
DB *db;
} DBIterator;
DBIterator *db_iterator_new(DB *db, const ReadOptions *opts) {
pthread_mutex_lock(&db->mu);
DBIterator *it = calloc(1, sizeof(DBIterator));
it->db = db;
it->version = db->current;
it->version->ref_count++;
it->snapshot_seq = opts && opts->snap ? opts->snap->sequence : db->last_seq;
// 收集所有输入源
Version *ver = it->version;
int total_files = 0;
for (int l = 0; l < MAX_LEVELS; l++)
total_files += ver->file_count[l];
it->readers = calloc(total_files, sizeof(TableReader));
it->num_readers = 0;
// 打开所有 SSTable
TableReader *open_readers = calloc(total_files, sizeof(TableReader));
int n = 0;
for (int l = 0; l < MAX_LEVELS; l++) {
for (int f = 0; f < ver->file_count[l]; f++) {
char fpath[512];
snprintf(fpath, sizeof(fpath), "%s/%06lu.sst",
db->db_dir, (unsigned long)ver->files[l][f].number);
if (table_reader_open(&open_readers[n], fpath) == 0) {
it->readers[n] = open_readers[n];
n++;
}
}
}
it->num_readers = n;
// 初始化 MergeIterator
merge_iter_init(&it->merge, open_readers, n);
pthread_mutex_unlock(&db->mu);
free(open_readers);
return it;
}
// 内部:跳过重复 key 和 tombstone,前进到下一个有效条目
static void iter_advance(DBIterator *it) {
while (merge_iter_valid(&it->merge)) {
const char *key = merge_iter_key(&it->merge);
size_t klen = merge_iter_key_len(&it->merge);
// 提取 user_key(去掉 InternalKey 的 8 字节后缀)
// 简化实现:假设 key 就是 user_key
const char *user_key = key;
size_t user_key_len = klen;
// 去重:如果和上一个输出的 user_key 相同,跳过
if (it->prev_user_key_len > 0 &&
user_key_len == (size_t)it->prev_user_key_len &&
memcmp(user_key, it->prev_user_key, user_key_len) == 0) {
merge_iter_next(&it->merge);
continue;
}
// TODO: 检查 type 是否为 deletion(tombstone),如果是则跳过
// 简化实现中直接输出
it->cur_key = key;
it->cur_key_len = klen;
it->cur_val = merge_iter_value(&it->merge);
it->cur_val_len = merge_iter_value_len(&it->merge);
it->valid = 1;
memcpy(it->prev_user_key, user_key, user_key_len);
it->prev_user_key_len = (int)user_key_len;
return;
}
it->valid = 0;
}
void db_iter_seek_to_first(DBIterator *it) {
// MergeIterator 在 init 时已经定位到最小元素
it->prev_user_key_len = 0;
iter_advance(it);
}
void db_iter_next(DBIterator *it) {
merge_iter_next(&it->merge);
iter_advance(it);
}
int db_iter_valid(DBIterator *it) { return it->valid; }
const char *db_iter_key(DBIterator *it) { return it->cur_key; }
size_t db_iter_key_len(DBIterator *it) { return it->cur_key_len; }
const char *db_iter_value(DBIterator *it) { return it->cur_val; }
size_t db_iter_value_len(DBIterator *it) { return it->cur_val_len; }
void db_iter_free(DBIterator *it) {
merge_iter_free(&it->merge);
for (int i = 0; i < it->num_readers; i++)
table_reader_close(&it->readers[i]);
free(it->readers);
pthread_mutex_lock(&it->db->mu);
it->version->ref_count--;
pthread_mutex_unlock(&it->db->mu);
free(it);
}DBIterator 的去重逻辑值得注意:MergeIterator
输出的是所有层级的 key-value 对(包括同一 key
的多个版本),DBIterator 在其上做去重——同一
user_key 只输出 sequence
最大(最新)的版本。如果最新版本是
tombstone(kTypeDeletion),整个 key
被跳过。
后台 Compaction 线程
后台线程负责两类工作:Flush(Immutable MemTable → L0)和 Major Compaction(Ln → Ln+1 归并)。
static void bg_flush(DB *db) {
// 将 Immutable MemTable flush 到 L0
MemTable *imm = db->imm;
FileMetaData meta;
// 在锁外做 I/O
pthread_mutex_unlock(&db->mu);
flush_skiplist_to_l0(db, imm, &meta);
pthread_mutex_lock(&db->mu);
// 更新 MANIFEST
VersionEdit ve;
version_edit_init(&ve);
version_edit_add_file(&ve, 0, &meta);
ve.next_file_number = db->next_file_num;
ve.has_next_file_number = 1;
manifest_write(db->manifest_fp, &ve);
fflush(db->manifest_fp);
fsync(fileno(db->manifest_fp));
version_apply(db->current, &ve);
// 清理 Immutable MemTable 和旧 WAL
memtable_destroy(imm);
db->imm = NULL;
// 唤醒阻塞的写入线程
pthread_cond_signal(&db->write_cv);
}
static void bg_major_compaction(DB *db) {
// 选择需要 compaction 的层级
int best_level = -1;
int file_counts[MAX_LEVELS];
uint64_t level_bytes[MAX_LEVELS];
for (int l = 0; l < MAX_LEVELS; l++) {
file_counts[l] = db->current->file_count[l];
level_bytes[l] = 0;
for (int f = 0; f < file_counts[l]; f++)
level_bytes[l] += db->current->files[l][f].file_size;
}
double score = compaction_score(file_counts, level_bytes, &best_level);
if (score < 1.0 || best_level < 0) return;
// 收集输入文件路径
int target_level = best_level + 1;
if (target_level >= MAX_LEVELS) return;
int num_inputs = db->current->file_count[best_level];
if (num_inputs == 0) return;
const char **input_paths = malloc(sizeof(char*) * num_inputs);
char (*path_bufs)[512] = malloc(sizeof(char[512]) * num_inputs);
for (int i = 0; i < num_inputs; i++) {
snprintf(path_bufs[i], 512, "%s/%06lu.sst",
db->db_dir,
(unsigned long)db->current->files[best_level][i].number);
input_paths[i] = path_bufs[i];
}
// 在锁外执行 compaction I/O
pthread_mutex_unlock(&db->mu);
CompactionState cs = {0};
do_compaction(input_paths, num_inputs, db->db_dir,
&db->next_file_num, target_level, &cs);
pthread_mutex_lock(&db->mu);
// 更新 Version
VersionEdit ve;
version_edit_init(&ve);
for (int i = 0; i < num_inputs; i++)
version_edit_remove_file(&ve, best_level,
db->current->files[best_level][i].number);
for (int i = 0; i < cs.num_outputs; i++)
version_edit_add_file(&ve, target_level, &cs.outputs[i]);
ve.next_file_number = db->next_file_num;
ve.has_next_file_number = 1;
manifest_write(db->manifest_fp, &ve);
fflush(db->manifest_fp); fsync(fileno(db->manifest_fp));
version_apply(db->current, &ve);
free(input_paths);
free(path_bufs);
free(cs.outputs);
}
static void *bg_compaction_thread(void *arg) {
DB *db = (DB *)arg;
pthread_mutex_lock(&db->mu);
while (!db->shutting_down) {
// 有 Immutable MemTable 要 flush
if (db->imm != NULL) {
bg_flush(db);
continue; // flush 完继续检查
}
// 检查是否需要 major compaction
bg_major_compaction(db);
// 没事做了,等待信号
pthread_cond_wait(&db->bg_cv, &db->mu);
}
pthread_mutex_unlock(&db->mu);
return NULL;
}后台线程的关键模式是锁内决策、锁外 I/O:
- 持锁时读取
imm、current等状态做决策。 - 释放锁后执行耗时的磁盘写操作(flush、compaction)。
- 重新加锁后更新 MANIFEST 和 Version。
这种模式确保了 I/O 不阻塞写入路径,同时保证了状态更新的原子性。
db_close()
int db_close(DB *db) {
pthread_mutex_lock(&db->mu);
// 通知后台线程退出
db->shutting_down = 1;
pthread_cond_signal(&db->bg_cv);
pthread_mutex_unlock(&db->mu);
// 等待后台线程结束
pthread_join(db->bg_thread, NULL);
// Flush 当前 MemTable(如果非空)
if (db->mem && db->mem->table->count > 0) {
FileMetaData meta;
flush_skiplist_to_l0(db, db->mem, &meta);
VersionEdit ve;
version_edit_init(&ve);
version_edit_add_file(&ve, 0, &meta);
ve.next_file_number = db->next_file_num;
ve.has_next_file_number = 1;
manifest_write(db->manifest_fp, &ve);
fflush(db->manifest_fp);
fsync(fileno(db->manifest_fp));
}
// 关闭 WAL、MANIFEST
wal_writer_close(&db->wal);
if (db->manifest_fp) fclose(db->manifest_fp);
// 释放 MemTable
if (db->mem) memtable_destroy(db->mem);
if (db->imm) memtable_destroy(db->imm);
// 释放 Version
if (db->current) {
for (int l = 0; l < MAX_LEVELS; l++)
free(db->current->files[l]);
free(db->current);
}
// 清理同步原语
pthread_mutex_destroy(&db->mu);
pthread_cond_destroy(&db->bg_cv);
pthread_cond_destroy(&db->write_cv);
free(db);
return 0;
}db_close() 的顺序:先停后台线程 → flush
残留数据 → 关闭文件 →
释放内存。保证关闭前所有数据都已持久化。
完整 Demo
把所有组件串起来,演示完整生命周期:
int main(void) {
const char *db_path = "/tmp/lsm_engine_demo";
// === 打开 DB ===
DB_Options opts = {
.write_buffer_size = 4096, // 小阈值便于触发 flush
.bloom_bits_per_key = 10,
.sync = 0,
};
DB *db;
int ret = db_open(db_path, &opts, &db);
printf("db_open: %s\n", ret == 0 ? "OK" : "FAIL");
// === 写入 200 条 ===
WriteOptions wopts = {.sync = 0};
for (int i = 0; i < 200; i++) {
char key[32], val[64];
snprintf(key, sizeof(key), "key:%06d", i);
snprintf(val, sizeof(val), "value_%d_initial", i);
db_put(db, &wopts, key, strlen(key), val, strlen(val));
}
printf("Wrote 200 keys.\n");
// === 点查验证 ===
ReadOptions ropts = {.snap = NULL};
char *val; size_t vlen;
ret = db_get(db, &ropts, "key:000042", 10, &val, &vlen);
printf("Get key:000042 -> %.*s (ret=%d)\n", (int)vlen, val, ret);
if (ret == 0) free(val);
// === 创建 Snapshot ===
Snapshot *snap = db_get_snapshot(db);
printf("Snapshot created at seq=%lu\n", (unsigned long)snap->sequence);
// === 覆盖写入 ===
for (int i = 0; i < 50; i++) {
char key[32], val[64];
snprintf(key, sizeof(key), "key:%06d", i);
snprintf(val, sizeof(val), "value_%d_UPDATED", i);
db_put(db, &wopts, key, strlen(key), val, strlen(val));
}
printf("Updated first 50 keys.\n");
// === Snapshot 读到旧值 ===
ReadOptions snap_ropts = {.snap = snap};
ret = db_get(db, &snap_ropts, "key:000010", 10, &val, &vlen);
printf("Snapshot get key:000010 -> %.*s\n", (int)vlen, val);
if (ret == 0) free(val);
// 当前读到新值
ret = db_get(db, &ropts, "key:000010", 10, &val, &vlen);
printf("Current get key:000010 -> %.*s\n", (int)vlen, val);
if (ret == 0) free(val);
db_release_snapshot(db, snap);
// === Delete 测试 ===
db_delete(db, &wopts, "key:000099", 10);
ret = db_get(db, &ropts, "key:000099", 10, &val, &vlen);
printf("After delete, get key:000099 -> %s\n",
ret == 0 ? "found (unexpected)" : "not found (correct)");
// === Iterator 扫描 ===
DBIterator *it = db_iterator_new(db, &ropts);
db_iter_seek_to_first(it);
int scan_count = 0;
while (db_iter_valid(it)) {
scan_count++;
db_iter_next(it);
}
printf("Iterator scanned %d unique keys.\n", scan_count);
db_iter_free(it);
// === 关闭 + 重新打开(恢复验证)===
db_close(db);
printf("DB closed.\n");
ret = db_open(db_path, &opts, &db);
printf("db_open (reopen): %s\n", ret == 0 ? "OK" : "FAIL");
ret = db_get(db, &ropts, "key:000042", 10, &val, &vlen);
printf("After reopen, get key:000042 -> %.*s (ret=%d)\n",
(int)vlen, val, ret);
if (ret == 0) free(val);
ret = db_get(db, &ropts, "key:000099", 10, &val, &vlen);
printf("After reopen, get key:000099 -> %s\n",
ret == 0 ? "found" : "not found (delete survived)");
// === 打印统计 ===
printf("\n--- DB Stats ---\n");
for (int l = 0; l < MAX_LEVELS; l++) {
if (db->current->file_count[l] > 0) {
uint64_t total_bytes = 0;
for (int f = 0; f < db->current->file_count[l]; f++)
total_bytes += db->current->files[l][f].file_size;
printf("Level %d: %d files, %lu bytes\n",
l, db->current->file_count[l], (unsigned long)total_bytes);
}
}
printf("next_file_num: %lu, last_seq: %lu\n",
(unsigned long)db->next_file_num, (unsigned long)db->last_seq);
db_close(db);
printf("Done.\n");
return 0;
}预期输出:
db_open: OK
Wrote 200 keys.
Get key:000042 -> value_42_initial (ret=0)
Snapshot created at seq=200
Updated first 50 keys.
Snapshot get key:000010 -> value_10_initial
Current get key:000010 -> value_10_UPDATED
After delete, get key:000099 -> not found (correct)
Iterator scanned 199 unique keys.
DB closed.
db_open (reopen): OK
After reopen, get key:000042 -> value_42_initial (ret=0)
After reopen, get key:000099 -> not found (delete survived)
--- DB Stats ---
Level 0: 2 files, 8194 bytes
Level 1: 1 files, 12842 bytes
next_file_num: 8, last_seq: 251
Done.
小结:Part A 完成了从零到可用的完整引擎。所有 C 代码复用了前四篇的函数——
wal_writer_append()、memtable_put/get()、table_builder/reader_*()、flush_memtable_to_l0()、do_compaction()、version_apply()、manifest_write/recover()。DB结构体将它们聚合起来,pthread_mutex保护写路径,ref_count保护读路径,后台线程异步执行 Flush 和 Compaction。
Part B:Rust 重写
这一部分用 Rust 重写 LSM-Tree 的核心模块——SkipList、WAL、SSTable 和 DB 接口(不含 Compaction 调度)。重点不是逐行翻译,而是观察 Rust 的类型系统和所有权模型如何消除 C 中的常见 bug。
项目结构
examples/lsm-tree-rs/
├── Cargo.toml
├── src/
│ ├── lib.rs
│ ├── skiplist.rs
│ ├── wal.rs
│ ├── block.rs
│ ├── bloom.rs
│ ├── sstable.rs
│ └── db.rs
└── benches/
└── bench_main.rs
Cargo.toml:
[package]
name = "lsm-tree"
version = "0.1.0"
edition = "2021"
[dependencies]
crc32fast = "1.4"
memmap2 = "0.9"
[dev-dependencies]
tempfile = "3"故事 1:Arena 的裸指针——SkipList
C 的 SkipList 使用 Arena
分配器管理节点内存——arena_alloc()
返回裸指针,所有节点的生命周期与 Arena 绑定。Rust
不允许这样做。
C 写法:
SkipListNode *node = arena_alloc(arena, sizeof(SkipListNode) + level * sizeof(void*));
node->forward[0] = header->forward[0]; // 裸指针操作
header->forward[0] = node;Rust 直译 → 编译器拒绝:
// error[E0499]: cannot borrow `arena` as mutable more than once
let node = arena.alloc(layout);
let header = arena.get_header(); // 二次可变借用Rust 不允许同时持有对同一结构的多个可变引用。Arena
分配器内部混用了 &mut self(分配)和
&self(读取),borrow checker 不允许。
正确写法——用
Vec<Box<Node>> 替代 Arena,让 Rust
自动管理每个节点的分配和释放:
pub struct SkipList {
header: Box<Node>,
nodes: Vec<Box<Node>>, // 所有节点的所有权
level: usize,
len: usize,
}
struct Node {
key: Vec<u8>,
value: Vec<u8>,
forward: Vec<Option<*mut Node>>, // unsafe: raw pointers for O(1) link
}
impl SkipList {
pub fn insert(&mut self, key: &[u8], value: &[u8]) {
let level = self.random_level();
let mut new_node = Box::new(Node {
key: key.to_vec(),
value: value.to_vec(),
forward: vec![None; level + 1],
});
let node_ptr: *mut Node = &mut *new_node;
// 找到插入点
let mut update = vec![std::ptr::null_mut::<Node>(); level + 1];
let mut curr = &mut *self.header as *mut Node;
for i in (0..=level).rev() {
unsafe {
while let Some(next) = (*curr).forward[i] {
if (*next).key.as_slice() < key {
curr = next;
} else { break; }
}
update[i] = curr;
}
}
// 插入链接
for i in 0..=level {
unsafe {
new_node.forward[i] = (*update[i]).forward[i];
(*update[i]).forward[i] = Some(node_ptr);
}
}
self.nodes.push(new_node); // 转移所有权到 Vec
self.len += 1;
}
pub fn get(&self, key: &[u8]) -> Option<&[u8]> {
let mut curr = &*self.header as *const Node;
for i in (0..self.header.forward.len()).rev() {
unsafe {
while let Some(next) = (*curr).forward[i] {
if (*(next as *const Node)).key.as_slice() < key {
curr = next as *const Node;
} else { break; }
}
}
}
unsafe {
if let Some(next) = (*curr).forward[0] {
if (*(next as *const Node)).key.as_slice() == key {
return Some(&(*(next as *const Node)).value);
}
}
}
None
}
}安全收益:节点的所有权归
Vec<Box<Node>>,SkipList 被 drop
时所有节点自动释放——不可能忘记
arena_destroy()。unsafe
块被限制在指针操作内,Rust 保证了内存不会泄漏。
故事 2:柔性数组的替代方案
C 的 SkipListNode 末尾有
forward[] 柔性数组:
typedef struct SkipListNode {
char *key, *value;
int level;
struct SkipListNode *forward[]; // C99 flexible array member
} SkipListNode;Rust 没有柔性数组成员。 直接翻译不可能:
// error: the size for values of type `[*mut Node]` cannot be known at compilation time
struct Node {
key: Vec<u8>,
forward: [*mut Node], // unsized type
}解决方案:Vec<Option<*mut Node>>
在构造时指定长度。这在语义上等价——C 的
arena_alloc(sizeof(Node) + level * sizeof(void*))
对应 Rust 的
vec![None; level + 1]。代价是多一层间接寻址(Vec
指向堆),但实际性能差异收敛于噪声范围内。
故事 3:Mmap 的生命周期——SSTable Reader
C 的 SSTable Reader 用 mmap() 映射文件:
void *data = mmap(NULL, file_size, PROT_READ, MAP_PRIVATE, fd, 0);
close(fd); // 关了 fd 之后 mmap 仍然有效(OS 保证)
// ... 在 data 上做读操作
munmap(data, file_size); // 手动释放Rust Mmap 的生命周期问题:
// error[E0597]: `file` does not live long enough
fn open_sst(path: &str) -> Mmap {
let file = File::open(path).unwrap();
let mmap = unsafe { Mmap::map(&file).unwrap() }; // borrows &file
mmap // file 在这里被 drop → mmap 的借用悬空
}memmap2::Mmap::map() 接受
&File——borrow checker 要求
File 的生命周期必须至少和 Mmap
一样长。
正确写法:将 File 和
Mmap 绑定在同一个结构体中,利用 Rust struct
字段的 drop 顺序(后声明的字段先 drop)保证
Mmap 在 File 之前被释放:
pub struct SstReader {
_file: File, // 先声明 → 后 drop(保活)
mmap: Mmap, // 后声明 → 先 drop
index_offset: usize,
bloom: Option<BloomFilter>,
}
impl SstReader {
pub fn open(path: &Path) -> io::Result<Self> {
let file = File::open(path)?;
let mmap = unsafe { Mmap::map(&file)? };
// 解析 Footer(最后 48 字节)
let footer_start = mmap.len() - FOOTER_SIZE;
let (index_handle, meta_handle) = decode_footer(&mmap[footer_start..])?;
// 加载 Bloom Filter
let bloom = if meta_handle.size > 0 {
Some(BloomFilter::from_bytes(
&mmap[meta_handle.offset as usize..(meta_handle.offset + meta_handle.size) as usize]
))
} else { None };
Ok(Self {
_file: file,
mmap,
index_offset: index_handle.offset as usize,
bloom,
})
}
pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
// Bloom Filter 快速排除
if let Some(ref bf) = self.bloom {
if !bf.may_match(key) { return None; }
}
// Index Block → Data Block → 二分查找
self.search_index(key)
}
}安全收益:C 需要手动匹配
mmap()/munmap(),Rust 的 Drop
trait 自动处理。而且如果你试图让 Mmap 活得比
File
长,编译器直接报错——不可能出现悬空指针。
故事 4:BlockBuilder 的借用冲突
C 的 BlockBuilder 在写入时持有上一个 key
的副本用于计算共享前缀:
void block_builder_add(BlockBuilder *bb, const char *key, size_t key_len, ...) {
int shared = common_prefix_len(bb->last_key, bb->last_key_len, key, key_len);
// ... 写入 shared/non-shared/value_len 和数据
memcpy(bb->last_key, key, key_len);
bb->last_key_len = key_len;
}Rust 直译:
// error[E0502]: cannot borrow `self` as mutable because it is also borrowed as immutable
fn add(&mut self, key: &[u8], value: &[u8]) {
let shared = common_prefix(&self.last_key, key); // immutable borrow
self.data.extend_from_slice(...); // mutable borrow
self.last_key = key.to_vec(); // mutable borrow
}问题:common_prefix(&self.last_key, ...)
创建了对 self 的不可变引用,而后续
self.data.extend_from_slice(...)
需要可变引用。Rust 不允许同时存在可变和不可变引用。
解决方案——先计算 shared
长度(纯数值),再做写操作:
pub struct BlockBuilder {
data: Vec<u8>,
restarts: Vec<u32>,
last_key: Vec<u8>,
entry_count: usize,
}
impl BlockBuilder {
pub fn add(&mut self, key: &[u8], value: &[u8]) {
let shared = if self.entry_count % RESTART_INTERVAL == 0 {
self.restarts.push(self.data.len() as u32);
0
} else {
common_prefix_len(&self.last_key, key) // 返回 usize,不保持借用
};
let non_shared = key.len() - shared;
// Encode shared | non_shared | value_len | key[shared..] | value
self.data.extend_from_slice(&encode_varint32(shared as u32));
self.data.extend_from_slice(&encode_varint32(non_shared as u32));
self.data.extend_from_slice(&encode_varint32(value.len() as u32));
self.data.extend_from_slice(&key[shared..]);
self.data.extend_from_slice(value);
self.last_key.clear();
self.last_key.extend_from_slice(key);
self.entry_count += 1;
}
}
fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count()
}安全收益:borrow checker
迫使我们将”读旧数据”和”写新数据”分开。这不仅避免了潜在的迭代器失效(C
中 memcpy 到 last_key
可能在某些场景覆盖正在读的数据),还使代码逻辑更清晰。
故事 5:DB 的 Send + Sync——跨线程共享
C 的 DB 结构体可以在线程间随意传递指针:
pthread_create(&thread, NULL, bg_compaction_thread, db); // 直接传裸指针Rust 编译器要求 DB 实现
Send + Sync:
// error[E0277]: `*mut Node` cannot be sent between threads safely
struct DB {
mem: SkipList, // contains *mut Node → not Send
current: Version,
}SkipList 内部有 *mut Node
裸指针,裸指针既不是 Send 也不是
Sync。跨线程使用 DB
需要手动声明安全保证:
pub struct DB {
inner: Arc<Mutex<DBInner>>,
}
struct DBInner {
mem: SkipList,
imm: Option<SkipList>,
current: Arc<Version>,
wal: WalWriter,
next_file_num: u64,
last_seq: u64,
options: DBOptions,
db_dir: PathBuf,
}
// 手动声明:我们保证 DBInner 在 Mutex 保护下的跨线程安全
unsafe impl Send for DBInner {}
impl DB {
pub fn open(path: impl AsRef<Path>, opts: DBOptions) -> io::Result<Self> {
// ... 恢复逻辑,与 C 版本类似
let inner = DBInner {
mem: SkipList::new(),
imm: None,
current: Arc::new(Version::new()),
wal: WalWriter::new(&wal_path)?,
next_file_num: 1,
last_seq: 0,
options: opts,
db_dir: path.as_ref().to_path_buf(),
};
Ok(DB {
inner: Arc::new(Mutex::new(inner)),
})
}
pub fn put(&self, key: &[u8], value: &[u8]) -> io::Result<()> {
let mut inner = self.inner.lock().unwrap();
// WAL 追加 + MemTable 写入
inner.wal.append(key, value)?;
inner.last_seq += 1;
inner.mem.insert(key, value);
Ok(())
}
pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
let inner = self.inner.lock().unwrap();
// 查 MemTable
if let Some(val) = inner.mem.get(key) {
return Some(val.to_vec());
}
// 查 Immutable
if let Some(ref imm) = inner.imm {
if let Some(val) = imm.get(key) {
return Some(val.to_vec());
}
}
// 查 SSTable(通过 Version)
let version = inner.current.clone(); // Arc clone → 锁外读
drop(inner);
version.get(key)
}
}安全收益:Arc<Mutex<DBInner>>
确保了: - Arc → 多线程安全的引用计数(对应 C
的手动 ref_count) - Mutex →
自动锁定/解锁(对应 C 的
pthread_mutex_lock/unlock) - 如果忘记声明
Send,编译器拒绝跨线程使用——不可能出现数据竞争
C vs Rust 并发安全对比:
| 场景 | C | Rust |
|---|---|---|
| 忘记加锁 | 运行时数据竞争(未定义行为) | 编译错误:Mutex 保护的数据无法裸访问 |
| 忘记解锁 | 死锁 | 自动解锁:MutexGuard drop 时释放 |
忘记
ref_count++ |
use-after-free | 编译错误:Arc::clone()
是唯一的共享方式 |
| 裸指针跨线程 | 数据竞争 | 编译错误:*mut T
不是 Send |
小结:5 个故事覆盖了 LSM-Tree 实现中最常见的安全隐患——内存泄漏(Arena)、悬空指针(Mmap 生命周期)、迭代器失效(借用冲突)、数据竞争(Send/Sync)。Rust 的类型系统在编译期捕获了所有这些问题。代价是需要更多前期设计思考,以及在跳表等底层数据结构中使用
unsafe。
Part C:性能对比
Benchmark 设置
对比对象——Part A 的 C 实现、Part B 的 Rust 实现、以及
LevelDB 1.23。C
实现已从四篇文章中的教学代码整合为可编译的独立项目
examples/lsm-tree-c/(含 lsm.h +
bench.c),与 Rust / LevelDB
使用完全相同的参数和场景。
| 参数 | 值 |
|---|---|
| Key 大小 | 16 字节 |
| Value 大小 | 100 字节 |
| 数据量 | 100,000 条 |
| write_buffer_size | 4 MB |
| bloom_bits_per_key | 10 |
| 测试机器 | Intel i9-12900K, 24 核, Linux x86_64 |
| 编译器 | GCC 15.2.1 -O2 (C / LevelDB) / rustc 1.94.0 –release (Rust) |
| 轮数 | 3 轮取中位数 |
五个场景:
- Sequential Write:key 从
0000000000000000到0000000000099999顺序写入。 - Random Write:key 随机打乱后写入(Fisher-Yates shuffle)。
- Sequential Read:按 key 顺序逐条
Get()。 - Random Read:随机 key 逐条
Get()。 - Mixed Read/Write:80% 随机读 + 20% 随机写交替执行。
benchmark 源码:
- Our C:
examples/lsm-tree-c/(编译:gcc -O2 -o bench_our_c bench.c -lz -lpthread)- Our Rust:
examples/lsm-tree-rs/benches/bench_main.rs(编译:cargo build --release --bin bench_main)- LevelDB:
examples/lsm-tree-rs/bench_leveldb.c(编译:gcc -O2 -o bench_leveldb bench_leveldb.c -lleveldb -lpthread)
实测数据
下表为 3 轮取中位数:
| 场景 | Our C (ops/s) | Our Rust (ops/s) | LevelDB 1.23 (ops/s) | C / LevelDB | Rust / LevelDB |
|---|---|---|---|---|---|
| Seq Write | 2,524,115 | 11,479 | 1,291,206 | 195% | 0.9% |
| Rand Write | 1,764,349 | 7,277 | 1,002,619 | 176% | 0.7% |
| Seq Read | 59,908 | 59,374 | 1,914,418 | 3.1% | 3.1% |
| Rand Read | 55,984 | 52,525 | 1,126,821 | 5.0% | 4.7% |
| Mixed R/W | 88,295 | 27,471 | 761,754 | 11.6% | 3.6% |
完整输出:
=== Our C LSM-Tree Benchmark ===
N=100000 key=16B val=100B write_buffer=4MB bloom=10 bits/key
Round 1: Seq Write 2446122 | Rand Write 1737834 | Seq Read 59269 | Rand Read 55861 | Mixed 88295
Round 2: Seq Write 2565890 | Rand Write 1778965 | Seq Read 59908 | Rand Read 56417 | Mixed 89457
Round 3: Seq Write 2524115 | Rand Write 1764349 | Seq Read 60445 | Rand Read 55984 | Mixed 83771
=== Our Rust LSM-Tree Benchmark ===
N=100000, key=16B, val=100B, write_buffer=4MB
Round 1: Seq Write 11479 | Rand Write 7277 | Seq Read 61261 | Rand Read 58668 | Mixed 29560
Round 2: Seq Write 11560 | Rand Write 7036 | Seq Read 59374 | Rand Read 52525 | Mixed 27471
Round 3: Seq Write 11372 | Rand Write 7302 | Seq Read 56220 | Rand Read 51837 | Mixed 27365
=== LevelDB 1.23 Benchmark ===
N=100000, key=16B, val=100B, write_buffer=4MB
Round 1: Seq Write 1257267 | Rand Write 1002619 | Seq Read 1914418 | Rand Read 1126821 | Mixed 740955
Round 2: Seq Write 1291206 | Rand Write 985047 | Seq Read 1927332 | Rand Read 1165115 | Mixed 761754
Round 3: Seq Write 1291374 | Rand Write 1006488 | Seq Read 1852720 | Rand Read 1100285 | Mixed 766732
差距分析
三组数据呈现出一个有趣的格局:
- 写入:Our C > LevelDB >> Our Rust
- 读取:LevelDB >> Our C ~ Our Rust
- 综合:LevelDB >> Our C > Our Rust
Our C 写入为什么比 LevelDB 还快?
答案在于”省了什么”:我们的 C 实现没有 fsync、没有
WriteBatch 分组开销、没有 MANIFEST 更新、没有 compaction
score 计算。写入路径极度精简——db_put() 只做 WAL
append + MemTable insert,满了就同步 flush。LevelDB
的写入路径虽然更重(WriteBatch 编码、Group Commit、后台
flush 协调),但它为持久性和并发付出了代价。
换句话说,Our C
的写入速度是以牺牲持久性和功能完整性换来的。如果加上
fsync(),Our C 的写入会骤降到接近 LevelDB
水平。
Our Rust 写入为什么远慢于 Our C?
| 差异 | Our C | Our Rust |
|---|---|---|
| 内存分配 | Arena 顺序分配,零散 malloc 降为零 | 每个节点
Box::new() + Vec::new() |
| MemTable 容量检查 | arena.bytes_used
直接读 |
approximate_memory()
虽已优化为 O(1),但仍有开销 |
| WAL 写入 | fwrite() 到 FILE
buffer |
BufWriter + 每次
flush |
| Mutex 开销 | pthread_mutex_lock
无毒化检查 |
Rust Mutex 包含
poisoning 检查 |
读路径差距(~20-30 倍 vs LevelDB)三者共同的瓶颈:
| 缺失优化 | Our C / Our Rust | LevelDB 的做法 | 估计影响 |
|---|---|---|---|
| Table Cache | 每次 Get()
打开文件 + 解析 Footer + 关闭 |
LRU 缓存已打开的 TableReader,命中时零 I/O | 极高 |
| Block Cache | 每次读 Data Block 从磁盘解析 | LRU 缓存已解析的 Data Block | 高 |
| Snappy 压缩 | 未压缩,SST 文件更大 | Snappy 压缩减少磁盘读取量 | 中 |
pread() |
fopen() +
fseek() + fread()
多次系统调用 |
pread()
单次系统调用直接定位读 |
中 |
Our C 和 Our Rust 的读性能几乎一致(~55K-60K ops/s),说明读路径的瓶颈不在语言层面,而在架构层面——没有 Table Cache 是致命的。
收获
通过三方对比我们学到:
- 写入优化的核心是异步 + 批量。LevelDB 的后台 Flush + WriteBatch 设计让写入吞吐不受磁盘 I/O 约束,而 Our Rust 每次写入都同步落盘。
- 读取优化的核心是缓存。Table Cache 和 Block Cache 不是”nice to have”,而是让读路径从万级跃升到百万级的核心优化。
- 语言选择不是性能瓶颈。Our C 和 Our Rust
的读性能几乎一致,证明了架构设计才是决定性因素。Our Rust
写入慢主要因为
Box::new()堆分配和更保守的 I/O 策略,而非语言本身的开销。 - Arena 分配器立竿见影。Our C 使用 Arena
让写入达到百万级,Our Rust 逐节点
Box::new()是写入瓶颈之一。
系列总结
五篇文章走完了从零构建 LSM-Tree 存储引擎的完整路径:
| 篇目 | 核心产出 | 代码量 |
|---|---|---|
| 第 1 篇:全景 | 架构心智模型 + 三种放大推导 | 概念图 |
| 第 2 篇:WAL + MemTable | WAL 分片 + 跳表 + 崩溃恢复 | ~200 行 C |
| 第 3 篇:SSTable + Bloom Filter | Data Block 前缀压缩 + 双重哈希 Bloom | ~400 行 C |
| 第 4 篇:Compaction | MergeIterator + Version/MANIFEST + 策略对比 | ~550 行 C |
| 第 5 篇:完整引擎 + Rust(本文) | DB API + 并发 + Snapshot + Iterator + Rust | ~600 行 C + ~400 行 Rust |
总计:~1,750 行 C + ~400 行 Rust,21+ 张 SVG 架构图。
如果你跟着系列走到了这里,你已经理解了:
- 为什么 LSM-Tree 比 B-Tree 写入更快——顺序 I/O + 批量归并 vs 随机页写入。
- Compaction 是吞吐和延迟的核心杠杆——Leveled、Size-Tiered、Universal 各有取舍。
- 崩溃恢复的关键链条——WAL 保证内存数据不丢,MANIFEST 保证磁盘元数据一致。
- Rust 如何在编译期消除存储引擎中的安全隐患——所有权、借用、生命周期、Send/Sync。
进一步阅读
- LevelDB 源码——本系列的蓝本,~20K 行 C++。
- RocksDB Wiki——工业级 LSM-Tree,在 LevelDB 基础上增加了列族、事务、多线程 Compaction 等特性。
- Pebble——CockroachDB 的 Go LSM-Tree 实现。
- BadgerDB——Go 实现的 KV 存储,使用 WiscKey 分离值的设计。
- Mini-LSM——Rust 教学 LSM-Tree,与本系列互补。
系列路线
| 篇目 | 标题 | 核心产出 |
|---|---|---|
| 第 1 篇 | LSM-Tree 全景 | 架构心智模型 + 三种放大的数学推导 |
| 第 2 篇 | WAL + MemTable | WAL 分片策略 + 跳表实现 + 崩溃恢复证明 |
| 第 3 篇 | SSTable + Bloom Filter | Data Block 前缀压缩 + Bloom Filter 双重哈希 + Builder/Reader |
| 第 4 篇 | Compaction | 多路归并迭代器 + Version/MANIFEST + 策略对比 |
| 第 5 篇 | 完整引擎 + Rust 重写对比(本文) | DB API + 并发 + Snapshot + Iterator + Rust 重写 + benchmark |