土法炼钢兴趣小组的算法知识备份

Compaction:LSM-Tree 的心脏手术

目录

前三篇走通了写入 → 内存 → 磁盘的完整链路:WAL 保证持久性,MemTable 在内存中攒批,SSTable 把有序数据固化到磁盘。

但有一个问题一直被搁置——SSTable 只增不删

每次 MemTable flush 都会在 Level 0 新增一个 SSTable 文件。随着写入持续,L0 文件越来越多,造成三个后果:

  1. 读性能退化——L0 文件 key 范围可能重叠,Get() 最坏要逐个搜索。
  2. 空间浪费——同一个 key 被更新 10 次,前 9 次的旧值仍然占据磁盘。
  3. Tombstone 积压——Delete() 只是写入一条墓碑标记,真正的数据并未释放。

Compaction 是 LSM-Tree 的后台整理机制:把多个旧的 SSTable 归并成更少、更有序的文件,回收无效数据并维持读性能。

它是 LSM-Tree 中实现最复杂的子系统之一,也是决定系统长期读写表现的关键机制。

前文回顾: 第 1 篇 建立了全景地图和三种放大的数学推导;第 2 篇 实现了 WAL 和 MemTable;第 3 篇 实现了 SSTable Builder/Reader 和 Bloom Filter。本文实现 Compaction 全链路。


Level 层级设计

分层结构

第 1 篇已经介绍了 LSM-Tree 的分层架构。这里展开每一层的具体规则:

Level 分层结构
层级 容量上限 key 范围 触发条件
L0 文件数 ≤ 4 可重叠 文件数 ≥ 4 → compaction 到 L1
L1 10 MB 不重叠 总大小 > 10 MB
L2 100 MB 不重叠 总大小 > 100 MB
L3 1 GB 不重叠 总大小 > 1 GB
×10 不重叠
L6 1 TB 不重叠 —(最底层)

L0 的特殊性:L0 的每个 SSTable 来自一次 MemTable flush,各文件之间 key 范围可能重叠。这意味着查找一个 key 时,必须按时间倒序搜索所有 L0 文件——这也是 L0 文件数不能太多的原因。

L1+ 的不变式:同层内任意两个 SSTable 的 key 范围不重叠,且整层按 key 有序排列。查找时只需二分定位到最多 1 个文件。这个不变式由 compaction 过程维护。

容量递增设计的直觉:为什么每层容量是上层的 10 倍?这是写放大和空间利用的权衡。设放大因子为 T:

LevelDB 选择 T=10,层数约 7(覆盖 ~1TB 数据),是经验上较好的平衡点。工程上可以通过调整 max_bytes_for_level_basemax_bytes_for_level_multiplier 来微调。

Compaction 打分

LevelDB 用一个简单的打分机制决定哪一层需要 compaction:

#define MAX_LEVELS       7
#define L0_COMPACTION_TRIGGER 4   // L0 文件数阈值

// 每层的容量上限(字节)
static uint64_t max_bytes_for_level(int level) {
    uint64_t result = 10 * 1024 * 1024;  // L1 = 10MB
    while (level > 1) {
        result *= 10;
        level--;
    }
    return result;
}

// 为每一层打分,返回得分最高的层级
// 得分 > 1.0 表示需要 compaction
static double compaction_score(const int *file_counts,
                               const uint64_t *level_bytes,
                               int *best_level) {
    double max_score = 0;
    *best_level = -1;

    // L0 按文件数打分
    double score0 = (double)file_counts[0] / L0_COMPACTION_TRIGGER;
    if (score0 > max_score) {
        max_score = score0;
        *best_level = 0;
    }

    // L1+ 按总大小 / 目标大小打分
    for (int level = 1; level < MAX_LEVELS - 1; level++) {
        double score = (double)level_bytes[level] / max_bytes_for_level(level);
        if (score > max_score) {
            max_score = score;
            *best_level = level;
        }
    }

    return max_score;
}

核心思想:L0 按文件数、L1+ 按总字节数。得分最高的层优先做 compaction。L0 用文件数而非字节数是因为每个 L0 文件都可能与目标 key 重叠,文件越多读越慢。

何时执行打分? LevelDB 在每次 compaction 完成后和每次 flush 完成后都会调用打分。如果最高分 > 1.0,就在后台调度一次新的 compaction。多个层可能同时需要 compaction,但 LevelDB 每次只执行一个——处理完一个后再打分、再选下一个。

Seek-based Compaction:除了 score 触发外,LevelDB 还有一种触发方式:如果某个文件在点查中被反复”经过但未命中”(allowed_seeks 计数器耗尽),说明它的 Bloom Filter 和 key 范围已不能有效过滤读请求,需要通过 compaction 重新整理。

FileMetaData

每个 SSTable 文件在内存中用 FileMetaData 记录元信息:

typedef struct {
    uint64_t number;        // 全局唯一的文件编号
    uint64_t file_size;     // 文件大小(字节)
    char     smallest[256]; // 最小 key
    int      smallest_len;
    char     largest[256];  // 最大 key
    int      largest_len;
} FileMetaData;

number 用于构造文件名(如 000042.sst),也用于在 Version 中唯一标识文件。


Minor Compaction:MemTable → L0

Minor Compaction(也叫 Flush)是最简单的 compaction:把 Immutable MemTable 的全部内容写成一个 L0 SSTable。

触发时机

Minor Compaction 触发流程

当 Mutable MemTable 大小达到 write_buffer_size(LevelDB 默认 4MB)时触发 Flush:

  1. 冻结:把当前 Mutable MemTable 标记为 Immutable(只读)。
  2. 切换:新建一个空的 Mutable MemTable 和对应的 WAL 文件。后续写入切换到新 MemTable,因此在后台 Flush 能及时完成的前提下,写路径通常不会被阻塞。
  3. 后台 Flush:调度后台线程遍历 Immutable MemTable 的跳表,写出一个 L0 SSTable。
  4. 清理:Flush 完成后,删除 Immutable MemTable 和对应的旧 WAL 文件。

注意:如果后台 Flush 还没完成,当前 Mutable MemTable 又满了,此时会阻塞写入——因为 LevelDB 同一时刻最多只有一个 Immutable MemTable。这是 LevelDB 的设计限制。RocksDB 通过 max_write_buffer_number 允许多个 Immutable MemTable 排队,缓解写入阻塞问题。

Flush 实现

第 2 篇实现了 MemTable 的跳表,forward[0] 链表就是按 InternalKey 有序的完整遍历。第 3 篇实现了 TableBuilder——顺序喂入 key-value 就能生成 SSTable。把两者串起来:

// 遍历跳表写入 SSTable
// 此处 SkipListNode、TableBuilder 接口引用自第 2、3 篇
// 为使本文代码自包含,仅展示核心逻辑

static int flush_memtable_to_l0(const char *db_dir, uint64_t file_num,
                                const char **keys, const size_t *key_lens,
                                const char **vals, const size_t *val_lens,
                                int count, FileMetaData *out) {
    char path[512];
    snprintf(path, sizeof(path), "%s/%06lu.sst", db_dir, (unsigned long)file_num);

    FILE *fp = fopen(path, "wb");
    if (!fp) return -1;

    TableBuilder tb;
    table_builder_init(&tb, fp);

    for (int i = 0; i < count; i++) {
        table_builder_add(&tb, keys[i], key_lens[i], vals[i], val_lens[i]);
    }
    table_builder_finish(&tb);

    out->number = file_num;
    out->file_size = tb.offset;

    // 记录最小/最大 key
    if (count > 0) {
        memcpy(out->smallest, keys[0], key_lens[0]);
        out->smallest_len = (int)key_lens[0];
        memcpy(out->largest, keys[count - 1], key_lens[count - 1]);
        out->largest_len = (int)key_lens[count - 1];
    }

    block_builder_free(&tb.data_block);
    block_builder_free(&tb.index_block);
    fclose(fp);

    return 0;
}

Flush 后产生一个新的 L0 文件。此时需要: 1. 记录一条 VersionEdit(添加新文件到 L0),写入 MANIFEST 日志。 2. 更新 Version——安装新文件到 L0 列表。 3. 检查 L0 文件数是否达到阈值(L0_COMPACTION_TRIGGER = 4),如果是,触发 L0 → L1 的 Major Compaction。

Flush 的 key 范围覆盖整个 MemTable——从跳表的第一个节点到最后一个节点。由于不同 MemTable 的 key 范围可能重叠(两次写入可能修改相同的 key),L0 文件之间 key 范围也会重叠。这正是 L0 特殊性的根源。


MergeIterator:多路归并的核心

问题

Major Compaction 需要同时读取多个 SSTable,按 key 有序输出。这就是经典的 k 路归并问题。

SSTable 顺序迭代器

第 3 篇TableReader 只提供了点查接口 table_reader_get()。要做归并,需要一个可以顺序扫描的迭代器。我们基于 TableReader 的 Index Block 和 Data Block 封装一个:

typedef struct {
    TableReader *reader;

    // Index Block 扫描状态
    size_t  idx_offset;        // 当前在 index block 中的偏移
    char    idx_prev_key[256]; // index block 前缀解码用
    int     idx_prev_key_len;

    // 当前 Data Block 状态
    uint8_t *block_data;       // 当前已加载的 data block
    size_t   block_size;
    size_t   blk_offset;       // 当前在 data block 内的偏移
    char     blk_prev_key[256];
    int      blk_prev_key_len;

    // 当前 key-value
    char     cur_key[256];
    int      cur_key_len;
    const char *cur_val;
    int      cur_val_len;
    int      valid;            // 迭代器是否有效
} TableIterator;

// 加载下一个 data block(通过 index block 的当前 entry)
static int table_iter_load_block(TableIterator *it,
                                 const char *handle_raw, int handle_len) {
    if (it->block_data) { free(it->block_data); it->block_data = NULL; }

    BlockHandle bh;
    if (decode_block_handle((const uint8_t *)handle_raw, handle_len, &bh) < 0)
        return -1;

    it->block_data = read_block(it->reader->fp, &bh, &it->block_size);
    if (!it->block_data) return -1;

    // data block 内从头开始
    it->blk_offset = 0;
    it->blk_prev_key_len = 0;

    // 跳过 restart array 区域:只扫描 entry 部分
    // restarts_offset = block_size - 4 - num_restarts * 4
    return 0;
}

static size_t block_entries_end(const uint8_t *data, size_t size) {
    if (size < 4) return 0;
    uint32_t num_restarts;
    memcpy(&num_restarts, data + size - 4, 4);
    return size - 4 - num_restarts * 4;
}

// 在当前 data block 内前进一条 entry
static int table_iter_next_in_block(TableIterator *it) {
    size_t entries_end = block_entries_end(it->block_data, it->block_size);

    if (it->blk_offset >= entries_end) return -1; // 当前 block 耗尽

    const char *v;
    int vl;
    int consumed = decode_entry(it->block_data, entries_end, it->blk_offset,
                                it->blk_prev_key, it->blk_prev_key_len,
                                it->cur_key, &it->cur_key_len, &v, &vl);
    if (consumed < 0) return -1;

    it->cur_val = v;
    it->cur_val_len = vl;

    memcpy(it->blk_prev_key, it->cur_key, it->cur_key_len);
    it->blk_prev_key_len = it->cur_key_len;
    it->blk_offset += consumed;

    return 0;
}

// 前进到下一个 index entry,加载对应的 data block
static int table_iter_next_block(TableIterator *it) {
    uint32_t num_restarts;
    memcpy(&num_restarts,
           it->reader->index_data + it->reader->index_size - 4, 4);
    size_t idx_entries_end = it->reader->index_size - 4 - num_restarts * 4;

    if (it->idx_offset >= idx_entries_end) return -1; // 所有 block 耗尽

    char sep_key[256];
    int sep_len;
    const char *handle_raw;
    int handle_len;
    int consumed = decode_entry(it->reader->index_data, idx_entries_end,
                                it->idx_offset,
                                it->idx_prev_key, it->idx_prev_key_len,
                                sep_key, &sep_len, &handle_raw, &handle_len);
    if (consumed < 0) return -1;

    memcpy(it->idx_prev_key, sep_key, sep_len);
    it->idx_prev_key_len = sep_len;
    it->idx_offset += consumed;

    return table_iter_load_block(it, handle_raw, handle_len);
}

static void table_iter_init(TableIterator *it, TableReader *reader) {
    memset(it, 0, sizeof(*it));
    it->reader = reader;
    it->valid = 0;

    // 加载第一个 data block
    if (table_iter_next_block(it) == 0) {
        if (table_iter_next_in_block(it) == 0) {
            it->valid = 1;
        }
    }
}

static void table_iter_next(TableIterator *it) {
    if (!it->valid) return;

    // 先尝试在当前 block 内前进
    if (table_iter_next_in_block(it) == 0) return;

    // 当前 block 耗尽,加载下一个 block
    if (table_iter_next_block(it) == 0) {
        if (table_iter_next_in_block(it) == 0) return;
    }

    it->valid = 0; // 所有数据耗尽
}

static void table_iter_free(TableIterator *it) {
    if (it->block_data) { free(it->block_data); it->block_data = NULL; }
}

TableIterator 按 Data Block 顺序、Block 内按 entry 顺序输出所有 key-value。这是 MergeIterator 的输入。

最小堆归并

k 路归并的经典做法:维护一个大小为 k 的最小堆,每次取堆顶(最小 key),然后让该路前进一步、重新调整堆。每次 next() 的时间复杂度 \(O(\log k)\)

为什么用最小堆而非简单线性扫描? 如果每次 next() 都遍历所有 k 路找最小值,复杂度是 \(O(k)\)。用最小堆降为 \(O(\log k)\)。当 Compaction 涉及 10+ 个文件、总共数百万条 key 时,这个差距是数量级的:

k (输入路数) 线性扫描 最小堆
4 4 次比较/key 2 次比较/key
16 16 次比较/key 4 次比较/key
64 64 次比较/key 6 次比较/key

堆的核心操作

MergeIterator 最小堆归并逐步演示

上图展示了 3 路归并的前 3 步。每步操作完全一致:弹出堆顶 → 输出到结果 → 该迭代器 next() → 放回堆顶位置 → sift_down() 恢复堆序。这个循环持续到所有路都耗尽。

#define MAX_MERGE_INPUTS 16

typedef struct {
    TableIterator iters[MAX_MERGE_INPUTS]; // 子迭代器数组
    int heap[MAX_MERGE_INPUTS];            // 堆(存储 iters 的下标)
    int heap_size;
    int num_inputs;
} MergeIterator;

// 比较两个迭代器当前 key
static int merge_compare(const MergeIterator *mi, int a, int b) {
    const TableIterator *ia = &mi->iters[a];
    const TableIterator *ib = &mi->iters[b];
    int min_len = ia->cur_key_len < ib->cur_key_len
                ? ia->cur_key_len : ib->cur_key_len;
    int cmp = memcmp(ia->cur_key, ib->cur_key, min_len);
    if (cmp != 0) return cmp;
    return ia->cur_key_len - ib->cur_key_len;
}

// 堆下沉
static void heap_sift_down(MergeIterator *mi, int i) {
    while (2 * i + 1 < mi->heap_size) {
        int child = 2 * i + 1;
        if (child + 1 < mi->heap_size &&
            merge_compare(mi, mi->heap[child + 1], mi->heap[child]) < 0)
            child++;
        if (merge_compare(mi, mi->heap[i], mi->heap[child]) <= 0) break;
        int tmp = mi->heap[i];
        mi->heap[i] = mi->heap[child];
        mi->heap[child] = tmp;
        i = child;
    }
}

// 堆上浮
static void heap_sift_up(MergeIterator *mi, int i) {
    while (i > 0) {
        int parent = (i - 1) / 2;
        if (merge_compare(mi, mi->heap[parent], mi->heap[i]) <= 0) break;
        int tmp = mi->heap[i];
        mi->heap[i] = mi->heap[parent];
        mi->heap[parent] = tmp;
        i = parent;
    }
}

static void merge_iter_init(MergeIterator *mi, TableReader *readers, int n) {
    mi->num_inputs = n;
    mi->heap_size = 0;

    for (int i = 0; i < n; i++) {
        table_iter_init(&mi->iters[i], &readers[i]);
        if (mi->iters[i].valid) {
            mi->heap[mi->heap_size] = i;
            mi->heap_size++;
        }
    }

    // 建堆
    for (int i = mi->heap_size / 2 - 1; i >= 0; i--) {
        heap_sift_down(mi, i);
    }
}

static int merge_iter_valid(const MergeIterator *mi) {
    return mi->heap_size > 0;
}

// 获取当前最小 key
static void merge_iter_key(const MergeIterator *mi,
                           const char **key, int *key_len) {
    int idx = mi->heap[0];
    *key = mi->iters[idx].cur_key;
    *key_len = mi->iters[idx].cur_key_len;
}

// 获取当前值
static void merge_iter_value(const MergeIterator *mi,
                             const char **val, int *val_len) {
    int idx = mi->heap[0];
    *val = mi->iters[idx].cur_val;
    *val_len = mi->iters[idx].cur_val_len;
}

static void merge_iter_next(MergeIterator *mi) {
    if (mi->heap_size == 0) return;

    int idx = mi->heap[0];
    table_iter_next(&mi->iters[idx]);

    if (mi->iters[idx].valid) {
        // 该路还有数据,下沉调整堆
        heap_sift_down(mi, 0);
    } else {
        // 该路耗尽,从堆中移除
        mi->heap[0] = mi->heap[mi->heap_size - 1];
        mi->heap_size--;
        if (mi->heap_size > 0) heap_sift_down(mi, 0);
    }
}

static void merge_iter_free(MergeIterator *mi) {
    for (int i = 0; i < mi->num_inputs; i++) {
        table_iter_free(&mi->iters[i]);
    }
}

MergeIterator 是 Compaction 的核心组件——它将多个 SSTable 的有序输出融合成一条统一的有序流。

算法正确性保证:最小堆的不变式是”堆顶元素 ≤ 所有其他元素”。由于每个 TableIterator 内部已经是有序的(SSTable 本身有序),堆顶永远是全局最小的 key。next() 后该路前进到下一个更大的 key(或耗尽),下沉操作保证新的堆顶仍然是全局最小——因此输出流严格有序。

具体执行流程举例——假设 3 个 SSTable 的 key 分别为:

迭代器 key 序列
iter0 key:000, key:003, key:007
iter1 key:001, key:003, key:008
iter2 key:002, key:005, key:009
  1. 建堆:堆 = [000(iter0), 001(iter1), 002(iter2)],堆顶 = 000
  2. 弹出 000 → 输出 000,iter0 前进到 003,下沉 → 堆 = [001(iter1), 003(iter0), 002(iter2)] → 下沉后 [001, 003, 002]
  3. 弹出 001 → 输出 001,iter1 前进到 003,下沉 → 堆 = [002(iter2), 003(iter0), 003(iter1)]
  4. 弹出 002 → 输出 002,iter2 前进到 005,下沉 → 堆 = [003(iter0), 005(iter2), 003(iter1)]
  5. 弹出 003(iter0) → 输出 003,iter0 前进到 007 → 堆 = [003(iter1), 005(iter2), 007(iter0)]
  6. 弹出 003(iter1) → 这是重复 key,Compaction 的去重逻辑会跳过它(后面详述)

最终有序输出:000, 001, 002, 003, 005, 007, 008, 009(去重后)。

小结:MergeIterator 用最小堆将多路有序输入归并为一条全局有序流。堆的不变式保证了输出的正确排序,每次 next() 的时间复杂度为 \(O(\log k)\)\(k\) 为输入路数)。它是 Minor/Major Compaction 共用的底层归并引擎。


Major Compaction

选择输入文件

Compaction 打分确定了需要 compact 的层级。接下来要选择具体的输入文件。

Compaction 完整流程

Ln(L1+)的文件选择

对于 L1 及以上层,选文件相对简单:

  1. 从 Ln 选 1 个文件:LevelDB 采用 round-robin 策略——记录上次 compact 到哪个 key(compact_pointer[level]),下次从那个 key 之后继续。这确保同层所有文件被均匀 compact,不会出现”热点文件”被反复 compact 而冷文件一直不动的情况。
  2. 从 Ln+1 找所有 key 范围重叠的文件:遍历 Ln+1 的文件列表,收集所有 key 范围与选中文件有交集的文件。

L0 的特殊处理

L0 的文件选择要复杂得多:

L0 → L1 Compaction 文件选择

L0 文件之间 key 范围可以重叠。这意味着不能只选一个文件——如果选了 SST-001(范围 [aaa…ggg]),而 SST-002(范围 [ddd…mmm])也与之重叠,那么 SST-002 也必须参与这次 compaction。否则:

LevelDB 的做法是递归扩展GetOverlappingInputs):

  1. 初始选一个 L0 文件,记其 key 范围 [smallest, largest]
  2. 扫描所有 L0 文件,收集所有与 [smallest, largest] 重叠的文件
  3. 合并所有选中文件的 key 范围,得到新的 [smallest’, largest’]
  4. 如果新范围比旧范围大,回到第 2 步继续扩展
  5. 重复直到范围不再增长

最坏情况下,所有 L0 文件都互相重叠,全部参与 compaction。这也是为什么 L0 文件数触发阈值设得较低(默认 4)——太多 L0 文件会导致 compaction 输入集过大。

Major Compaction 归并过程
// 判断两个 key 范围是否重叠
static int ranges_overlap(const char *a_small, int a_small_len,
                          const char *a_large, int a_large_len,
                          const char *b_small, int b_small_len,
                          const char *b_large, int b_large_len) {
    // a.largest < b.smallest → 不重叠
    int cmp1_len = a_large_len < b_small_len ? a_large_len : b_small_len;
    int cmp1 = memcmp(a_large, b_small, cmp1_len);
    if (cmp1 == 0) cmp1 = a_large_len - b_small_len;
    if (cmp1 < 0) return 0;

    // b.largest < a.smallest → 不重叠
    int cmp2_len = b_large_len < a_small_len ? b_large_len : a_small_len;
    int cmp2 = memcmp(b_large, a_small, cmp2_len);
    if (cmp2 == 0) cmp2 = b_large_len - a_small_len;
    if (cmp2 < 0) return 0;

    return 1; // 重叠
}

// 在 Ln+1 中找出所有与指定 key 范围重叠的文件
static int find_overlapping_files(const FileMetaData *files, int num_files,
                                  const char *smallest, int smallest_len,
                                  const char *largest, int largest_len,
                                  int *indices, int max_indices) {
    int count = 0;
    for (int i = 0; i < num_files && count < max_indices; i++) {
        if (ranges_overlap(smallest, smallest_len, largest, largest_len,
                           files[i].smallest, files[i].smallest_len,
                           files[i].largest, files[i].largest_len)) {
            indices[count++] = i;
        }
    }
    return count;
}

归并执行

Compaction 的核心流程分为 5 步:

  1. 打开输入文件:为每个参与的 SSTable 创建 TableReader
  2. 初始化 MergeIterator:每个 TableReader 生成一个 TableIterator,所有迭代器的当前 key 放入最小堆建堆。
  3. 归并循环:不断弹出堆顶,执行去重和 tombstone 过滤(详见下一节),写入输出 SSTable。
  4. 文件切割:输出文件超过 MAX_OUTPUT_SIZE(通常 2MB)时,关闭当前文件,打开新文件。切割点选在一个完整 key 之后,保证一个 key 不会被拆到两个文件。
  5. 清理:关闭所有输入文件的 Reader,释放 MergeIterator。

为什么切割为 2MB? 输出文件太大会导致后续 compaction 读写更多数据;太小会导致文件数膨胀,增加元数据开销。LevelDB 默认 2MB 是读写开销和文件管理的平衡点。

扩展输入范围(Expanding):LevelDB 在确定 Ln+1 的输入文件后,还会尝试扩大 Ln 的输入范围——把更多 key 邻近的文件也加进来。前提是不会因此增加 Ln+1 的文件数。这样做可以一次 compaction 处理更多数据,减少后续的重复工作。具体逻辑在 SetupOtherInputs() 中实现。

#define MAX_OUTPUT_SIZE (2 * 1024 * 1024) // 输出文件 2MB 切割

typedef struct {
    FileMetaData *outputs;   // 输出文件元信息数组
    int           num_outputs;
    int           output_cap;
} CompactionState;

// 执行一次 compaction
// input_files: 所有参与合并的文件路径
// num_inputs: 输入文件个数
// db_dir: 数据库目录
// next_file_num: 下一个可用文件编号(会被更新)
// state: 输出结果
static int do_compaction(const char **input_files, int num_inputs,
                         const char *db_dir, uint64_t *next_file_num,
                         int target_level,  // 输出到哪一层
                         CompactionState *state) {
    // 打开所有输入文件
    TableReader *readers = malloc(num_inputs * sizeof(TableReader));
    for (int i = 0; i < num_inputs; i++) {
        if (table_reader_open(&readers[i], input_files[i]) < 0) {
            // 清理已打开的
            for (int j = 0; j < i; j++) table_reader_close(&readers[j]);
            free(readers);
            return -1;
        }
    }

    // 初始化多路归并
    MergeIterator mi;
    merge_iter_init(&mi, readers, num_inputs);

    // 输出状态
    state->output_cap = 16;
    state->outputs = malloc(state->output_cap * sizeof(FileMetaData));
    state->num_outputs = 0;

    FILE *out_fp = NULL;
    TableBuilder *out_tb = NULL;
    FileMetaData *cur_meta = NULL;

    char last_user_key[256];
    int  last_user_key_len = 0;
    int  has_last_user_key = 0;

    while (merge_iter_valid(&mi)) {
        const char *key;
        int key_len;
        const char *val;
        int val_len;

        merge_iter_key(&mi, &key, &key_len);
        merge_iter_value(&mi, &val, &val_len);

        // === 去重:相同 user_key 只保留第一个(sequence 最大) ===
        // 简化处理:在教学实现中,key 就是 user_key
        int is_dup = 0;
        if (has_last_user_key) {
            int min_l = last_user_key_len < key_len
                      ? last_user_key_len : key_len;
            int cmp = memcmp(last_user_key, key, min_l);
            if (cmp == 0 && last_user_key_len == key_len) {
                is_dup = 1; // 重复 key,跳过
            }
        }

        if (!is_dup) {
            memcpy(last_user_key, key, key_len);
            last_user_key_len = key_len;
            has_last_user_key = 1;

            // 需要新建输出文件?
            if (!out_fp) {
                uint64_t fnum = (*next_file_num)++;
                char out_path[512];
                snprintf(out_path, sizeof(out_path), "%s/%06lu.sst",
                         db_dir, (unsigned long)fnum);
                out_fp = fopen(out_path, "wb");
                out_tb = malloc(sizeof(TableBuilder));
                table_builder_init(out_tb, out_fp);

                if (state->num_outputs >= state->output_cap) {
                    state->output_cap *= 2;
                    state->outputs = realloc(state->outputs,
                        state->output_cap * sizeof(FileMetaData));
                }
                cur_meta = &state->outputs[state->num_outputs];
                memset(cur_meta, 0, sizeof(*cur_meta));
                cur_meta->number = fnum;
                memcpy(cur_meta->smallest, key, key_len);
                cur_meta->smallest_len = key_len;
                state->num_outputs++;
            }

            // 写入 key-value
            table_builder_add(out_tb, key, key_len, val, val_len);

            // 更新 largest
            memcpy(cur_meta->largest, key, key_len);
            cur_meta->largest_len = key_len;

            // 检查是否需要切割文件
            if (out_tb->offset >= MAX_OUTPUT_SIZE) {
                table_builder_finish(out_tb);
                cur_meta->file_size = out_tb->offset;
                block_builder_free(&out_tb->data_block);
                block_builder_free(&out_tb->index_block);
                fclose(out_fp);
                free(out_tb);
                out_fp = NULL;
                out_tb = NULL;
                cur_meta = NULL;
            }
        }

        merge_iter_next(&mi);
    }

    // 关闭最后一个输出文件
    if (out_fp) {
        table_builder_finish(out_tb);
        cur_meta->file_size = out_tb->offset;
        block_builder_free(&out_tb->data_block);
        block_builder_free(&out_tb->index_block);
        fclose(out_fp);
        free(out_tb);
    }

    // 清理
    merge_iter_free(&mi);
    for (int i = 0; i < num_inputs; i++) table_reader_close(&readers[i]);
    free(readers);

    return 0;
}

do_compaction() 是整个 compaction 的执行器。注意它完全不关心”从哪一层到哪一层”——它只接收一组文件、归并、输出。层级管理由上层的 Version 系统负责。

输出键的正确性do_compaction() 保证输出的 key 严格递增(去重后没有重复 key),且每个输出文件内部 key 有序、文件间 key 不重叠。这维护了 L1+ 的核心不变式。

Compaction 期间的并发读取:正在被 compact 的输入文件不会被立即删除。当前 Version 仍然引用这些文件,直到新 Version 安装完毕、旧 Version 引用计数归零后才物理删除。这是 Version 系统(下一节)的职责。


Tombstone 下推与去重

去重规则

LevelDB 中每个 key 实际上是 InternalKey = user_key + sequence + type。InternalKey 的排序规则是:user_key 升序,sequence 降序。这意味着在归并输出中,同一个 user_key 的多个版本会连续出现,且最新版本排在最前面。

去重逻辑因此很简单:遇到与上一条相同的 user_key,直接跳过(已在前面 is_dup 判断中实现)。

为什么这样就够了? InternalKey 排序保证的是:

key:foo#100 < key:foo#50 < key:foo#10 < key:goo#200

即同一个 user_key 的多个版本按 sequence 降序排列(最新在前)。当 MergeIterator 输出 key:foo#100 后,接下来输出的 key:foo#50key:foo#10 都是旧版本,可以直接丢弃。

本文简化实现:我们的教学代码中 key 就是 user_key(没有 sequence number),所以去重就是简单的 memcmp 相等判断。生产级实现中,比较器需要先提取 user_key 部分再比较。

Tombstone 处理

Delete() 操作写入一条 type = kTypeDeletion 的墓碑标记。Compaction 时需要决定何时可以安全丢弃这个 tombstone。这是 Compaction 中最微妙的部分之一。

Tombstone 下推与安全删除

上图展示了一个具体的例子。用户对 key foo 执行了 Delete() 操作,tombstone 写入 L0。Compaction 必须把这个 tombstone 一路下推,直到它覆盖了所有层中 foo 的旧值,才能安全丢弃。

生产级实现中 Tombstone 的安全删除条件:

条件 能否丢弃 原因
输出层是最底层,或更低层没有包含该 key 的文件 可以丢弃 不存在需要被”遮蔽”的旧值
更低层可能有该 key 的旧版本 不能 丢弃 tombstone 后 Get() 会穿透看到旧值
有活跃的 Snapshot 引用了该 sequence 不能 Snapshot 读取可能需要看到这个删除操作

LevelDB 用 IsBaseLevelForKey() 检查条件 1:从 compaction 输出层 +1 开始一直到最底层,对每层做二分搜索,如果所有层都没有包含该 user_key 的文件,返回 true。

本文的简化实现中不处理 InternalKey 的 sequence/type 拆分和 snapshot 检查。生产级实现(如 LevelDB)会在归并循环中检查这些条件:

// 伪代码:生产级 tombstone + 去重处理
for each key from MergeIterator:
    // 1. 去重:同一 user_key 只保留 sequence 最大的第一条
    if same_user_key_as_previous:
        // 旧版本,但如果有 snapshot 需要该版本则保留
        if snapshot_needs(key.seq):
            output(key)
        else:
            drop(key)         // 丢弃旧版本
        continue

    // 2. Tombstone 处理
    if key.type == kTypeDeletion:
        if is_base_level_for_key(key.user_key)
           and no_snapshot_before(key.seq):
            drop(key)         // 安全丢弃 tombstone
        else:
            output(key)       // 必须保留,继续下推到更低层
    else:
        output(key)           // 正常数据,输出

为什么不能过早丢弃? 如上图所示:假设 L2 有 key:foo = "old_value",L1 有 Delete(key:foo) 的 tombstone。如果 L0→L1 的 compaction 丢弃了这个 tombstone,Get("foo") 就会穿透到 L2 并错误返回旧值。这是一个正确性 bug,而非性能问题——Tombstone 必须一路下推到覆盖所有层的旧数据后才能删除

空间影响:如果应用层频繁执行 Delete() 但旧数据分布在很深的层(比如 L5),tombstone 需要经过 L0→L1→L2→L3→L4→L5 的 5 次 compaction 才能最终被清理。这期间 tombstone 会占据空间。这也是 RocksDB 引入 DeleteRange 范围删除的原因之一——用一条范围标记替代大量单点 tombstone。

小结:Tombstone 不能在产生时立即删除,必须逐层下推直到覆盖所有更低层的旧值。安全删除还需考虑 Snapshot 引用。这是 LSM-Tree 空间回收延迟的主要来源。


Version / VersionEdit / MANIFEST

为什么需要版本管理

每次 Compaction 都会增删文件——新文件产生、旧文件淘汰。但 Compaction 过程中可能有并发的读操作正在使用旧文件。直接删除会导致读操作崩溃。

LevelDB 的解决方案是 Version:一个不可变的文件集合快照。每次 Compaction 产生一个新的 Version,旧 Version 在所有引用释放后才清理。

Version / VersionEdit / MANIFEST

VersionEdit

VersionEdit 是 Version 状态变更的增量描述——不记录完整快照,只记录”加了哪些文件、删了哪些文件”:

#define TAG_NEXT_FILE_NUMBER  1
#define TAG_LAST_SEQUENCE     2
#define TAG_ADD_FILE          3
#define TAG_REMOVE_FILE       4

typedef struct {
    // 新增文件
    struct { int level; FileMetaData meta; } added[64];
    int num_added;

    // 删除文件
    struct { int level; uint64_t number; } removed[64];
    int num_removed;

    uint64_t next_file_number;
    uint64_t last_sequence;
    int      has_next_file_number;
    int      has_last_sequence;
} VersionEdit;

static void version_edit_init(VersionEdit *ve) {
    memset(ve, 0, sizeof(*ve));
}

static void version_edit_add_file(VersionEdit *ve, int level,
                                  const FileMetaData *f) {
    ve->added[ve->num_added].level = level;
    ve->added[ve->num_added].meta = *f;
    ve->num_added++;
}

static void version_edit_remove_file(VersionEdit *ve, int level,
                                     uint64_t number) {
    ve->removed[ve->num_removed].level = level;
    ve->removed[ve->num_removed].number = number;
    ve->num_removed++;
}

VersionEdit 编码

VersionEdit 使用 Tag-Length-Value 格式序列化,长度字段复用 varint 编码:

// 编码 VersionEdit → 字节流
static int version_edit_encode(const VersionEdit *ve,
                               uint8_t *buf, size_t cap) {
    int n = 0;

    if (ve->has_next_file_number) {
        n += encode_varint32(buf + n, TAG_NEXT_FILE_NUMBER);
        n += encode_varint64(buf + n, ve->next_file_number);
    }

    if (ve->has_last_sequence) {
        n += encode_varint32(buf + n, TAG_LAST_SEQUENCE);
        n += encode_varint64(buf + n, ve->last_sequence);
    }

    for (int i = 0; i < ve->num_added; i++) {
        n += encode_varint32(buf + n, TAG_ADD_FILE);
        n += encode_varint32(buf + n, (uint32_t)ve->added[i].level);
        n += encode_varint64(buf + n, ve->added[i].meta.number);
        n += encode_varint64(buf + n, ve->added[i].meta.file_size);
        // smallest key: length-prefixed
        n += encode_varint32(buf + n, (uint32_t)ve->added[i].meta.smallest_len);
        memcpy(buf + n, ve->added[i].meta.smallest,
               ve->added[i].meta.smallest_len);
        n += ve->added[i].meta.smallest_len;
        // largest key: length-prefixed
        n += encode_varint32(buf + n, (uint32_t)ve->added[i].meta.largest_len);
        memcpy(buf + n, ve->added[i].meta.largest,
               ve->added[i].meta.largest_len);
        n += ve->added[i].meta.largest_len;
    }

    for (int i = 0; i < ve->num_removed; i++) {
        n += encode_varint32(buf + n, TAG_REMOVE_FILE);
        n += encode_varint32(buf + n, (uint32_t)ve->removed[i].level);
        n += encode_varint64(buf + n, ve->removed[i].number);
    }

    return n;
}

// 解码 字节流 → VersionEdit
static int version_edit_decode(const uint8_t *buf, size_t len,
                               VersionEdit *ve) {
    version_edit_init(ve);
    size_t pos = 0;

    while (pos < len) {
        uint32_t tag;
        int r = decode_varint32(buf + pos, len - pos, &tag);
        if (r < 0) return -1;
        pos += r;

        switch (tag) {
        case TAG_NEXT_FILE_NUMBER: {
            r = decode_varint64(buf + pos, len - pos, &ve->next_file_number);
            if (r < 0) return -1;
            pos += r;
            ve->has_next_file_number = 1;
            break;
        }
        case TAG_LAST_SEQUENCE: {
            r = decode_varint64(buf + pos, len - pos, &ve->last_sequence);
            if (r < 0) return -1;
            pos += r;
            ve->has_last_sequence = 1;
            break;
        }
        case TAG_ADD_FILE: {
            int idx = ve->num_added;
            uint32_t level;
            r = decode_varint32(buf + pos, len - pos, &level);
            if (r < 0) return -1;
            pos += r;
            ve->added[idx].level = (int)level;

            r = decode_varint64(buf + pos, len - pos,
                                &ve->added[idx].meta.number);
            if (r < 0) return -1;
            pos += r;

            r = decode_varint64(buf + pos, len - pos,
                                &ve->added[idx].meta.file_size);
            if (r < 0) return -1;
            pos += r;

            // smallest key
            uint32_t slen;
            r = decode_varint32(buf + pos, len - pos, &slen);
            if (r < 0) return -1;
            pos += r;
            memcpy(ve->added[idx].meta.smallest, buf + pos, slen);
            ve->added[idx].meta.smallest_len = (int)slen;
            pos += slen;

            // largest key
            uint32_t llen;
            r = decode_varint32(buf + pos, len - pos, &llen);
            if (r < 0) return -1;
            pos += r;
            memcpy(ve->added[idx].meta.largest, buf + pos, llen);
            ve->added[idx].meta.largest_len = (int)llen;
            pos += llen;

            ve->num_added++;
            break;
        }
        case TAG_REMOVE_FILE: {
            int idx = ve->num_removed;
            uint32_t level;
            r = decode_varint32(buf + pos, len - pos, &level);
            if (r < 0) return -1;
            pos += r;
            ve->removed[idx].level = (int)level;

            r = decode_varint64(buf + pos, len - pos,
                                &ve->removed[idx].number);
            if (r < 0) return -1;
            pos += r;

            ve->num_removed++;
            break;
        }
        default:
            return -1; // 未知 tag
        }
    }

    return 0;
}

编码格式简洁高效——内存平均一条 add_file 约 30-50 字节。

Version

Version 是某一时刻所有 SSTable 文件的完整集合:

#define MAX_FILES_PER_LEVEL 128

typedef struct {
    FileMetaData files[MAX_LEVELS][MAX_FILES_PER_LEVEL];
    int          file_count[MAX_LEVELS];
    uint64_t     next_file_number;
    uint64_t     last_sequence;
} Version;

static void version_init(Version *v) {
    memset(v, 0, sizeof(*v));
    v->next_file_number = 1;
}

// 应用 VersionEdit,产生新的 Version 状态
static void version_apply(Version *v, const VersionEdit *ve) {
    // 删除文件
    for (int i = 0; i < ve->num_removed; i++) {
        int level = ve->removed[i].level;
        uint64_t num = ve->removed[i].number;
        for (int j = 0; j < v->file_count[level]; j++) {
            if (v->files[level][j].number == num) {
                // 移除:用最后一个元素覆盖
                v->files[level][j] = v->files[level][v->file_count[level] - 1];
                v->file_count[level]--;
                break;
            }
        }
    }

    // 添加文件
    for (int i = 0; i < ve->num_added; i++) {
        int level = ve->added[i].level;
        v->files[level][v->file_count[level]] = ve->added[i].meta;
        v->file_count[level]++;
    }

    // 更新元数据
    if (ve->has_next_file_number)
        v->next_file_number = ve->next_file_number;
    if (ve->has_last_sequence)
        v->last_sequence = ve->last_sequence;
}

MANIFEST 日志

MANIFEST 文件是 VersionEdit 的持久化日志。它复用第 2 篇的 WAL record 格式(CRC32 + Length + Type + Data),每条 record 的 data 就是一个经过编码的 VersionEdit。

// MANIFEST 写入一条 VersionEdit
static int manifest_write(FILE *fp, const VersionEdit *ve) {
    uint8_t data[4096];
    int data_len = version_edit_encode(ve, data, sizeof(data));
    if (data_len < 0) return -1;

    // WAL record 格式: CRC32(4) + Length(2) + Type(1) + Data
    uint8_t header[7];
    uint16_t len16 = (uint16_t)data_len;
    uint8_t type = 1; // FULL record

    // CRC32 覆盖 type + data
    uint32_t crc = crc32(0, &type, 1);
    crc = crc32(crc, data, data_len);

    memcpy(header, &crc, 4);
    memcpy(header + 4, &len16, 2);
    header[6] = type;

    fwrite(header, 1, 7, fp);
    fwrite(data, 1, data_len, fp);
    fflush(fp);

    return 0;
}

// Recovery:从 MANIFEST 读取所有 VersionEdit,重建 Version
static int manifest_recover(const char *manifest_path, Version *v) {
    version_init(v);

    FILE *fp = fopen(manifest_path, "rb");
    if (!fp) return -1;

    while (1) {
        // 读取 WAL record header
        uint8_t header[7];
        if (fread(header, 1, 7, fp) != 7) break; // EOF

        uint32_t stored_crc;
        memcpy(&stored_crc, header, 4);
        uint16_t data_len;
        memcpy(&data_len, header + 4, 2);
        uint8_t type = header[6];

        if (data_len > 4096) { fclose(fp); return -1; }

        uint8_t data[4096];
        if (fread(data, 1, data_len, fp) != data_len) {
            fclose(fp);
            return -1;
        }

        // 验证 CRC
        uint32_t actual_crc = crc32(0, &type, 1);
        actual_crc = crc32(actual_crc, data, data_len);
        if (actual_crc != stored_crc) {
            fclose(fp);
            return -1;
        }

        // 解码并应用 VersionEdit
        VersionEdit ve;
        if (version_edit_decode(data, data_len, &ve) < 0) {
            fclose(fp);
            return -1;
        }
        version_apply(v, &ve);
    }

    fclose(fp);
    return 0;
}

启动时的恢复流程:

  1. 读取 MANIFEST 文件。
  2. 逐条解码 VersionEdit。
  3. 依次 version_apply(),叠加所有增量。
  4. 最终得到崩溃前的完整文件集合。

整个 Version 系统的精妙之处在于:永远不需要写完整快照。初始时写一条包含所有文件的 edit,之后每次 compaction 只追加增量。即使运行数年,MANIFEST 也只有几 MB(必要时可以 compact MANIFEST 本身——写一个新的完整快照到新 MANIFEST 文件,然后原子切换)。

完整 Compaction 与 Version 更新的原子性

Compaction 的最后一步是”安装结果”,这个操作必须是原子的:

  1. 构建 VersionEditremove 所有输入文件 + add 所有输出文件。
  2. 写 MANIFEST:将 VersionEdit 编码后追加到 MANIFEST 日志。fflush + fsync 确保持久化。
  3. 安装到内存version_apply() 生成新的 Version 对象,挂到 Version 链表头部。
  4. 删除旧文件:遍历输入文件列表,在新 Version 中检查是否仍被引用(可能有并发读操作持有旧 Version)。只有不再被任何 Version 引用的文件才物理删除。

如果在第 2 步之后、第 3 步之前崩溃,恢复时 MANIFEST 能重放到新状态——Compaction 的结果不丢失。如果在第 2 步之前崩溃,MANIFEST 没有记录这次 compaction——重启后重新执行即可(输入文件都还在,输出文件是多余的但会被清理)。

小结:Version 系统通过 VersionEdit 增量记录 + MANIFEST 持久化日志,实现了 Compaction 结果的原子安装和崩溃恢复。它保证任何时刻都有一致的文件视图,并发读操作通过引用计数安全地持有旧版本。


Compaction 策略对比

第 1 篇已经给出了策略对比表。这里展开每种策略的工作原理和数学分析。

Compaction 策略对比

Leveled Compaction(LevelDB / RocksDB 默认)

核心思想:维护严格的层级结构,每层容量 = 上层 × T,同层 key 不重叠。Compaction 时从 Ln 选一个文件,与 Ln+1 中重叠的文件归并,输出回 Ln+1。

写放大分析

每个 key-value 从 L0 一路推到最底层 L(L ≈ 7)。在每一层,每字节数据最多被”读出 → 归并 → 写回”T 次(因为该层容量是上层的 T 倍,数据不断有新内容从上层推下来)。

\[ WA_{\text{leveled}} \approx T \times L \]

LevelDB 默认 T=10, L≈7,写放大约 70。意味着用户写 1 字节,磁盘实际写 70 字节。

读放大:每层至多命中 1 个文件(因为 key 不重叠),加上 Bloom Filter 可以跳过大部分层。实际读放大接近 1-2 次磁盘 I/O。

空间放大:Compaction 过程中需要同时存在输入和输出文件。稳态下约:

\[ SA_{\text{leveled}} \approx 1 + \frac{1}{T} \approx 1.1 \]

空间利用率非常高。

适用场景:读多写少、空间敏感——典型如 LevelDB、RocksDB 在大多数 OLTP 场景下的默认配置。

Size-Tiered Compaction(Cassandra / ScyllaDB)

核心思想:不严格分层。把大小相近的 SSTable 分到同一个”tier”,每个 tier 攒够 T 个同大小的文件后合并为一个更大的文件。

写放大分析

每个 tier 合并一次产生 T 倍大小的文件,总层数约 \(\lceil \log_T(N/F_0) \rceil\)\(F_0\) 是初始文件大小)。每层合并只触发一次归并,写放大约:

\[ WA_{\text{size\text{-}tiered}} \approx T \times \lceil \log_T(N/F_0) \rceil \]

但因为同 tier 内文件不需要像 Leveled 那样反复归并,实际写放大远低于 Leveled。典型值约 10。

读放大:问题在于同一个 tier 内可能有多个文件包含相同的 key 范围。查找时需要搜索每个 tier 的所有文件。读放大较高。

空间放大:合并时需要同时容纳输入(T 个文件)和输出(1 个大文件),最坏约 T 倍:

\[ SA_{\text{size\text{-}tiered}} \approx T \]

T=4 时空间放大可达 4 倍——对存储敏感的场景不友好。

适用场景:写多读少、吞吐优先——Cassandra 的时序数据、日志写入等场景。

Universal Compaction(RocksDB 可选)

核心思想:维护一组按时间排序的 sorted run(每个 run 可以是一个或多个文件)。RocksDB 根据相邻 run 的大小比值和总 run 数来决定合并策略:

这两个旋钮让 Universal Compaction 可以在 Leveled 和 Size-Tiered 之间滑动:

参数 偏向 Leveled 偏向 Size-Tiered
max_size_amplification_percent 小(如 25%) 大(如 200%)
size_ratio

\[ WA_{\text{universal}} \in [WA_{\text{size\text{-}tiered}},\; WA_{\text{leveled}}] \]

适用场景:混合负载,需要根据实际读写比例调优——RocksDB 在一些大规模部署中(如 Facebook 内部的 MyRocks)使用 Universal Compaction。

小结

指标 Leveled Size-Tiered Universal
写放大 高(~70) 低(~10) 可调
读放大 可调
空间放大 低(~1.1) 高(~T) 可调
实现复杂度
典型系统 LevelDB, RocksDB Cassandra RocksDB

本系列选择 Leveled Compaction 作为实现蓝本,不是因为它在所有场景下都最优,而是因为它最能体现 LSM-Tree 在读放大、写放大和空间放大之间的经典取舍,也是理解其他策略的基础。


完整 Demo

把所有部件串起来——构建多个 SSTable(模拟 L0 堆积),执行 compaction,验证归并结果正确:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <zlib.h>
#include <sys/stat.h>

// [前文全部函数定义省略,依次拼接即可编译]
// 包括: varint, BlockBuilder, decode_entry, block_search, bloom,
//       BlockHandle, Footer, TableBuilder, TableReader, read_block,
//       TableIterator, MergeIterator, FileMetaData, VersionEdit,
//       Version, compaction_score, flush/compaction/manifest 函数

int main(void) {
    const char *db_dir = "/tmp/compaction_demo";
    mkdir(db_dir, 0755);

    uint64_t next_file_num = 1;

    // === 构建 3 个 L0 SSTable(模拟 3 次 flush) ===
    FileMetaData l0_files[3];

    for (int batch = 0; batch < 3; batch++) {
        const int N = 50;
        const char *keys[50];
        size_t key_lens[50];
        const char *vals[50];
        size_t val_lens[50];
        char key_bufs[50][32];
        char val_bufs[50][32];

        for (int i = 0; i < N; i++) {
            // 每个 batch 覆盖不同但有重叠的 key 范围
            int k = batch * 20 + i;
            snprintf(key_bufs[i], 32, "key:%06d", k);
            snprintf(val_bufs[i], 32, "val_b%d_%d", batch, k);
            keys[i] = key_bufs[i];
            key_lens[i] = strlen(key_bufs[i]);
            vals[i] = val_bufs[i];
            val_lens[i] = strlen(val_bufs[i]);
        }

        flush_memtable_to_l0(db_dir, next_file_num, keys, key_lens,
                             vals, val_lens, N, &l0_files[batch]);
        printf("Flush #%d: file %06lu, keys [%.*s .. %.*s], %lu bytes\n",
               batch, (unsigned long)next_file_num,
               l0_files[batch].smallest_len, l0_files[batch].smallest,
               l0_files[batch].largest_len, l0_files[batch].largest,
               (unsigned long)l0_files[batch].file_size);
        next_file_num++;
    }

    // === Version 管理:记录 L0 文件 ===
    Version ver;
    version_init(&ver);

    for (int i = 0; i < 3; i++) {
        VersionEdit ve;
        version_edit_init(&ve);
        version_edit_add_file(&ve, 0, &l0_files[i]);
        version_apply(&ver, &ve);
    }

    printf("\nBefore compaction: L0 has %d files\n", ver.file_count[0]);

    // === 写 MANIFEST ===
    char manifest_path[512];
    snprintf(manifest_path, sizeof(manifest_path), "%s/MANIFEST", db_dir);
    FILE *mfp = fopen(manifest_path, "wb");
    for (int i = 0; i < 3; i++) {
        VersionEdit ve;
        version_edit_init(&ve);
        version_edit_add_file(&ve, 0, &l0_files[i]);
        ve.next_file_number = next_file_num;
        ve.has_next_file_number = 1;
        manifest_write(mfp, &ve);
    }
    fclose(mfp);

    // === 执行 Compaction: L0 所有文件 → L1 ===
    const char *input_paths[3];
    char path_bufs[3][512];
    for (int i = 0; i < 3; i++) {
        snprintf(path_bufs[i], 512, "%s/%06lu.sst", db_dir,
                 (unsigned long)l0_files[i].number);
        input_paths[i] = path_bufs[i];
    }

    CompactionState cs = {0};
    int ret = do_compaction(input_paths, 3, db_dir, &next_file_num, 1, &cs);
    printf("Compaction result: %s, produced %d output files\n",
           ret == 0 ? "OK" : "FAIL", cs.num_outputs);

    // 更新 Version
    VersionEdit compact_edit;
    version_edit_init(&compact_edit);
    for (int i = 0; i < 3; i++) {
        version_edit_remove_file(&compact_edit, 0, l0_files[i].number);
    }
    for (int i = 0; i < cs.num_outputs; i++) {
        version_edit_add_file(&compact_edit, 1, &cs.outputs[i]);
        printf("  Output: %06lu [%.*s .. %.*s] %lu bytes\n",
               (unsigned long)cs.outputs[i].number,
               cs.outputs[i].smallest_len, cs.outputs[i].smallest,
               cs.outputs[i].largest_len, cs.outputs[i].largest,
               (unsigned long)cs.outputs[i].file_size);
    }
    compact_edit.next_file_number = next_file_num;
    compact_edit.has_next_file_number = 1;
    version_apply(&ver, &compact_edit);

    printf("\nAfter compaction: L0=%d files, L1=%d files\n",
           ver.file_count[0], ver.file_count[1]);

    // 写 MANIFEST
    mfp = fopen(manifest_path, "ab");
    manifest_write(mfp, &compact_edit);
    fclose(mfp);

    // === 验证读取:从 L1 的输出文件中读取 key ===
    printf("\n=== Verify reads from compacted files ===\n");
    for (int i = 0; i < cs.num_outputs; i++) {
        char fpath[512];
        snprintf(fpath, 512, "%s/%06lu.sst", db_dir,
                 (unsigned long)cs.outputs[i].number);
        TableReader tr;
        if (table_reader_open(&tr, fpath) == 0) {
            // 抽样验证
            const char *samples[] = {"key:000000", "key:000025", "key:000050",
                                     "key:000070", "key:000089"};
            for (int s = 0; s < 5; s++) {
                char *val = NULL; int vlen = 0;
                if (table_reader_get(&tr, samples[s], strlen(samples[s]),
                                     &val, &vlen) == 0) {
                    printf("  GET %-12s => %.*s\n", samples[s], vlen, val);
                    free(val);
                }
            }
            table_reader_close(&tr);
        }
    }

    // === 验证 MANIFEST Recovery ===
    printf("\n=== MANIFEST Recovery ===\n");
    Version recovered;
    if (manifest_recover(manifest_path, &recovered) == 0) {
        printf("Recovered: L0=%d files, L1=%d files, next_file=%lu\n",
               recovered.file_count[0], recovered.file_count[1],
               (unsigned long)recovered.next_file_number);
    }

    free(cs.outputs);
    return 0;
}

预期输出:

Flush #0: file 000001, keys [key:000000 .. key:000049], XXXX bytes
Flush #1: file 000002, keys [key:000020 .. key:000069], XXXX bytes
Flush #2: file 000003, keys [key:000040 .. key:000089], XXXX bytes

Before compaction: L0 has 3 files
Compaction result: OK, produced 1 output files
  Output: 000004 [key:000000 .. key:000089] XXXX bytes

After compaction: L0=0 files, L1=1 files

=== Verify reads from compacted files ===
  GET key:000000   => val_b0_0
  GET key:000025   => val_b1_25
  GET key:000050   => val_b1_50
  GET key:000070   => val_b2_70
  GET key:000089   => val_b2_89

=== MANIFEST Recovery ===
Recovered: L0=0 files, L1=1 files, next_file=5

3 个 batch 的 key 范围有重叠(0-49、20-69、40-89),归并后去重为 90 个 key(key:000000 到 key:000089),输出 1 个文件。key:000025 同时出现在 batch 0 和 batch 1 中,去重后保留归并顺序中先出现的版本。MANIFEST recovery 正确恢复了 compaction 后的文件状态。

注意:本文的简化实现中,key 不含 sequence number,因此”先出现”取决于堆的顺序而非写入时间。生产级实现(如 LevelDB)使用 InternalKey = user_key + sequence,sequence 降序排列保证最新版本总是先出现。完整的 InternalKey 比较器将在第 5 篇的引擎组装中实现。


系列路线图

篇目 标题 核心产出
第 1 篇 LSM-Tree 全景 架构心智模型 + 三种放大的数学推导
第 2 篇 WAL + MemTable WAL 分片策略 + 跳表实现 + 崩溃恢复证明
第 3 篇 SSTable + Bloom Filter Data Block 前缀压缩 + Bloom Filter 双重哈希 + Builder/Reader
第 4 篇 Compaction(本文) 多路归并迭代器 + Version/MANIFEST + 策略对比
第 5 篇 完整引擎 + Rust 重写对比 DB API + 并发 + Snapshot + Iterator + Rust 重写 + benchmark

→ 第 5 篇我们组装完整引擎:DB 接口(Open/Put/Get/Delete)、读写并发控制、启动恢复流程,然后用 Rust 重写并对比性能。



By .