土法炼钢兴趣小组的算法知识备份

【eBPF 内核实现深度拆解】Map 内核实现(下):ringbuf / perfbuf / bloom / queue-stack / LPM

文章导航

分类入口
kernelebpf
标签入口
#ebpf#bpf-maps#ringbuf#bloom-filter#lpm-trie#devmap#perf-event#linux-kernel

目录

上一篇拆解了 hash 和 array 的核心数据结构与并发模型。这一篇文章处理剩下的重要 map 类型——它们不遵循简单的 key-value 模式,而是各自解决一个特定工程问题:ring buffer 解决事件流输出,bloom filter 解决快速排除,LPM trie 解决最长前缀匹配,queue/stack 解决 BPF 程序间的数据传递,devmap/cpumap 解决包重定向。

理解这些 map 的实现,才能在做架构决策时选对工具——例如用 ringbuf 而不是 perfbuf 降低 per-event 头部开销(ringbuf 8 字节头 vs perf 256 字节头,小 payload 时差距显著),或者用 LPM trie 而不是遍历 hash map 来做路由查找。

源码基准:Linux 6.8,路径 kernel/bpf/ringbuf.ckernel/bpf/bloom_filter.ckernel/bpf/lpm_trie.ckernel/bpf/devmap.ckernel/events/core.c

一、BPF_MAP_TYPE_RINGBUF:mmap 双缓冲与 record 提交语义

Ring buffer map(Linux 5.8 引入)是 BPF 程序向用户态输出事件的标准方式。它取代了 perf buffer 成为 libbpf 推荐的默认事件输出机制。

1.1 为什么 ringbuf 要取代 perfbuf

Perf buffer(BPF_MAP_TYPE_PERF_EVENT_ARRAY)是 ringbuf 的前身。它的主要问题:

  1. 每记录开销大:perf buffer 每条记录需要一个 256 字节的管理头(perf_event_header),即使你只输出 4 字节。
  2. 没有记录边界保护:如果生产者写入 100 字节后消费者正好读到第 50 字节处,消费者会读到损坏的记录。需要在应用层重建记录边界。
  3. 内存浪费:每个 CPU 需要独立的一块缓冲区,但各 CPU 的负载可能极不均匀——CPU 3 在丢事件,CPU 7 的缓冲区闲置。

Ringbuf 用共享的环形缓冲区 + 忙位协议解决了这三个问题。

1.2 struct bpf_ringbuf 的内存布局

/* kernel/bpf/ringbuf.c — Linux 6.8 */
struct bpf_ringbuf {
    wait_queue_head_t waitq;             /* epoll 等待队列 */
    struct irq_work work;                /* 跨 CPU 唤醒机制 */
    u64 mask;                            /* 缓冲区大小掩码(2 的幂 - 1) */
    struct page **pages;                 /* 数据页面数组 */
    int nr_pages;                        /* 页面数 */
    spinlock_t spinlock;                 /* 保留操作的自旋锁 */
    atomic_t busy;                       /* 忙位:标记记录正在写入 */
    unsigned long consumer_pos;          /* 消费者已读位置 */
    unsigned long producer_pos;          /* 生产者已写入位置 */
    unsigned long pending_pos;           /* 已提交但未通知消费者的位置 */
    struct bpf_ringbuf_map *ringbuf_map; /* 关联的 ringbuf map */
};

struct bpf_ringbuf_map {
    struct bpf_map map;
    struct bpf_ringbuf *rb;
};

Ring buffer 使用两页的特殊布局:

flowchart TD
    subgraph layout["Ring Buffer 内存布局"]
        direction LR
        P0["Page 0\n(consumer meta-page)\nconsumer_pos\nproducer_pos"] 
        P1["Page 1\n(data page)\nrecord header + payload\nrecord header + payload\n..."]
        P2["Page 2\n(data page)\n... more data ..."]
        PN["Page N\n(data page)\n..."]
    end

    subgraph meta["Consumer Meta-Page (Page 0)"]
        CP["consumer_pos: u64"]
        PP["producer_pos: u64"]
        PD["pending_pos: u64"]
    end

    subgraph rec["Record 结构"]
        R_HDR["len: u32\nlen_discard: u32\n→ 总大小 = (len + 7) & ~7"]
    end

    P0 --> meta
    P1 --> rec

1.3 mmap 双缓冲语义

用户态通过 mmap 访问 ring buffer:

/* 用户态 mmap 的伪结构 */
struct ringbuf_consumer {
    unsigned long *consumer_pos;    /* mmap page 0 */
    unsigned long *producer_pos;    /* mmap page 0 */
    void *data;                     /* mmap page 1+ */
    unsigned long mask;             /* data 区域大小 - 1 */
};

内核和用户态通过 consumer meta-page 中的 consumer_posproducer_pos 无锁同步:

1.4 忙位协议(Busy-Bit Protocol)

Ringbuf 的核心并发机制是忙位协议。它保证生产者和消费者在不使用内核锁的情况下安全协调。

每条 record 的 len_discard 字段的最高位是”忙位”:

/* kernel/bpf/ringbuf.c — 忙位定义 */
#define BPF_RINGBUF_BUSY_BIT  (1 << 31)
#define BPF_RINGBUF_DISCARD_BIT (1 << 30)

生产者流程bpf_ringbuf_reserve + bpf_ringbuf_submit):

/* kernel/bpf/ringbuf.c — bpf_ringbuf_reserve() 简化 */
static void *bpf_ringbuf_reserve(struct bpf_map *map, u64 size, u64 flags)
{
    struct bpf_ringbuf_map *rb_map = container_of(map, ...);
    struct bpf_ringbuf *rb = rb_map->rb;
    u64 cons_pos, prod_pos, new_prod_pos;
    void *rec;

    if (size > RINGBUF_MAX_RECORD_SZ)
        return NULL;

    /* 1. 计算对齐后的大小(8 字节对齐) */
    u32 total_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);

    /* 2. 尝试在缓冲区中保留空间 */
    prod_pos = smp_load_acquire(&rb->producer_pos);
    new_prod_pos = prod_pos + total_size;

    /* 3. 检查空间:如果 new_prod_pos 越过 consumer_pos,缓冲区满 */
    cons_pos = smp_load_acquire(&rb->consumer_pos);
    if (new_prod_pos - cons_pos > rb->mask + 1)
        return NULL;  /* 缓冲区满 */

    /* 4. 获取 record 指针并设置头部 */
    rec = (void *)rb->pages[1] + (prod_pos & rb->mask);
    struct bpf_ringbuf_hdr *hdr = rec;
    hdr->len = size;
    hdr->pg_off = 0;
    hdr->len_discard = BPF_RINGBUF_BUSY_BIT;  /* 设置忙位! */

    /* 5. 更新 producer_pos(带 release 语义) */
    smp_store_release(&rb->producer_pos, new_prod_pos);

    return rec + BPF_RINGBUF_HDR_SZ;          /* 返回 payload 指针 */
}

提交bpf_ringbuf_submit):

/* kernel/bpf/ringbuf.c — bpf_ringbuf_submit() 简化 */
static void bpf_ringbuf_submit(void *data, u64 flags)
{
    struct bpf_ringbuf_hdr *hdr = data - BPF_RINGBUF_HDR_SZ;

    /* 清除忙位:此操作使记录对消费者可见 */
    smp_store_release(&hdr->len_discard,
                       hdr->len_discard & ~BPF_RINGBUF_BUSY_BIT);
}

消费者流程(用户态):

/* 用户态消费逻辑(伪代码) */
static int consume_ringbuf(struct ringbuf_consumer *c,
                           void (*callback)(void *data, size_t len))
{
    unsigned long cons_pos = *c->consumer_pos;
    unsigned long prod_pos = smp_load_acquire(c->producer_pos);

    while (cons_pos < prod_pos) {
        struct bpf_ringbuf_hdr *hdr = c->data + (cons_pos & c->mask);

        /* 检查忙位:如果设置了,记录还在写入中,跳过 */
        if (hdr->len_discard & BPF_RINGBUF_BUSY_BIT) {
            /* 更新 consumer_pos 以跳过忙的记录 */
            smp_store_release(c->consumer_pos, cons_pos);
            return 0;
        }

        /* 检查丢弃位 */
        if (hdr->len_discard & BPF_RINGBUF_DISCARD_BIT) {
            /* 此记录被丢弃(bpf_ringbuf_discard),跳过 */
            cons_pos += round_up(hdr->len_discard & ~BPF_RINGBUF_DISCARD_BIT, 8);
            continue;
        }

        /* 调用回调处理数据 */
        callback(hdr + 1, hdr->len);

        cons_pos += round_up(hdr->len + BPF_RINGBUF_HDR_SZ, 8);
    }

    /* 更新 consumer_pos */
    smp_store_release(c->consumer_pos, cons_pos);
    return 0;
}

忙位协议的精妙之处:如果消费者碰到忙位,它知道生产者正在写那条记录,于是仅提交之前的消费进度并返回,留下那条未完成的记录不做处理。下次消费时重新检查。生产者一侧,smp_store_release 保证有效的载荷写入在清除忙位之前对消费者可见。

1.5 bpf_ringbuf_discard

除了 submit,BPF 程序还可以 discard

/* kernel/bpf/ringbuf.c — bpf_ringbuf_discard() 简化 */
static void bpf_ringbuf_discard(void *data, u64 flags)
{
    struct bpf_ringbuf_hdr *hdr = data - BPF_RINGBUF_HDR_SZ;

    /* 设置丢弃位并清除忙位 */
    hdr->len_discard |= BPF_RINGBUF_DISCARD_BIT;
    smp_store_release(&hdr->len_discard,
                       hdr->len_discard & ~BPF_RINGBUF_BUSY_BIT);
}

这允许 BPF 程序先 reserve 空间,如果发现数据不需要发送(例如过滤条件不满足),就 discard——消费者看到丢弃位后直接跳过。

1.6 用户态消费者:ring_buffer__new

libbpf 封装了 ring buffer 的用户态消费逻辑:

/* libbpf: ring_buffer__new() 创建消费者 */
struct ring_buffer *ring_buffer__new(int map_fd,
                                     ring_buffer_sample_fn sample_cb,
                                     void *ctx,
                                     const struct ring_buffer_opts *opts);

/* 在事件循环中调用 */
int ring_buffer__consume(struct ring_buffer *rb);
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);

ring_buffer__new() 内部做 mmap: 1. mmap consumer meta-page 2. mmap data pages 3. 设置 epoll 监听(当 producer_pos > consumer_pos 时触发)

1.7 Ringbuf 与 Perfbuf 对比

维度 Ring Buffer Perf Buffer
记录头部大小 8 字节(len + len_discard 256 字节(perf_event_header + padding)
记录边界 显式(忙位协议) 无保护,需应用层重建
跨 CPU 共享 共享一个缓冲区 每 CPU 独立缓冲区
可变大小记录 原生支持 支持但更复杂
内存效率 高(无 CPU 乘数) 低(每 CPU 独立)
通知机制 epoll(bpf_ringbuf_notify perf_event 的 poll
用户态 API ring_buffer__new() / ring_buffer__consume() perf_buffer__new() / perf_buffer__poll()
引入版本 Linux 5.8 Linux 4.x 起

经验法则:新项目始终使用 ringbuf,除非需要与旧版工具(perf、老版本 bcc 等)兼容。

二、BPF_MAP_TYPE_PERF_EVENT_ARRAY:perf 事件输出路径

Perf event array map 的 map 条目存储的是 struct perf_event * 指针——由用户态通过 perf_event_open() 创建,然后通过 bpf_map_update_elem() 存入 map。BPF 程序能通过 bpf_perf_event_output() helper 向这些事件写入数据。

2.1 存储的指针:perf_event

/* kernel/trace/bpf_trace.c — perf_event_array_map_update_elem() 简化 */
static long perf_event_array_map_update_elem(struct bpf_map *map, void *key,
                                              void *value, u64 map_flags)
{
    struct bpf_array *array = container_of(map, struct bpf_array, map);
    u32 index = *(u32 *)key;

    /* value 是一个 fd,内核将其转换为 struct perf_event * */
    struct perf_event *event = perf_event_get_from_fd(*(int *)value);
    if (IS_ERR(event))
        return PTR_ERR(event);

    WRITE_ONCE(array->ptrs[index], event);  /* 存储 perf_event 指针 */
    return 0;
}

Perf event array 底层仍然是一个 bpf_array,但 value 不是普通数据,而是 struct perf_event *

2.2 bpf_perf_event_output 路径

/* kernel/trace/bpf_trace.c — bpf_perf_event_output() 简化逻辑 */
/*
 * BPF_CALL_5(bpf_perf_event_output, struct pt_regs *, regs,
 *            struct bpf_map *, map, u64, flags, void *, data, u64, size)
 *
 * 执行路径:
 * 1. 从 map 获取 perf_event *(通过 index 或 key)
 * 2. 调用 perf_event_output(event, data, size)
 *    → perf_output_begin(&handle, event, size)  // 保留空间
 *    → perf_output_copy(&handle, data, size)    // 复制数据
 *    → perf_output_end(&handle)                  // 提交
 * 3. perf_output_begin 内部:
 *    - 检查缓冲区空间
 *    - 写入 perf_event_header(type, size, misc)
 *    - 如果缓冲区满,返回 NULL
 */

Perf buffer 的数据格式(perf_event_header):

/* include/uapi/linux/perf_event.h */
struct perf_event_header {
    __u32   type;         /* PERF_RECORD_SAMPLE 等 */
    __u16   misc;         /* 混杂标志(CPU、时间戳模式等) */
    __u16   size;         /* 记录总大小 */
};

perf_event_header 之后是可变长度的 payload,BPF 程序写入的 data 紧接其后。

2.3 为什么 perfbuf 还在用

尽管 ringbuf 在几乎所有方面都更好,perfbuf 仍然存在,因为:

  1. 工具链兼容perf 工具、老版本 bcc、许多已有的观测工具基于 perf buffer。ringbuf 的数据格式与 perf 不兼容。
  2. per-CPU 隐式语义:perf buffer 的每 CPU 独立缓冲区恰好适合某些场景——例如 perf 工具对每 CPU 事件的采样。ringbuf 是共享的,需要记录中显式包含 CPU 信息。
  3. 内核内部使用:某些内核子系统(bpf_trace_printk 的底层、bpf_perf_event_read)内部已经深度绑定 perf event。

三、BPF_MAP_TYPE_BLOOM_FILTER:概率集合成员测试

Bloom filter map(Linux 5.16 引入,通过 kfunc 而非传统 helper 访问)提供概率集合成员测试:bpf_bloom_filter_lookup() 最坏情况下错误地返回 “存在”(false positive),但永远不会错误地返回 “不存在”(no false negative)。

3.1 数据结构

/* kernel/bpf/bloom_filter.c — Linux 6.8 */
struct bpf_bloom_filter {
    struct bpf_map map;
    u32 bitset_mask;                     /* 位图大小掩码:nr_hash_funcs * BITS_PER_LONG - 1 */
    u32 hash_seed;                       /* 哈希种子 */
    u32 nr_hash_funcs;                   /* 哈希函数数量 */
    /* 紧随: unsigned long bitset[]  ——实际位图 */
    unsigned long bitset[];
};

3.2 核心操作

Bloom filter 通过 kfunc 暴露,而不是传统的 BPF helper:

/* kernel/bpf/bloom_filter.c */
__bpf_kfunc u64 bpf_bloom_filter_lookup(void *map, void *data,
                                         u32 data_len, void *data2,
                                         u32 data2_len)
{
    struct bpf_bloom_filter *bloom = container_of(*(struct bpf_map **)map, ...);
    u32 n = bloom->nr_hash_funcs;
    u32 i;

    for (i = 0; i < n; i++) {
        /* 使用 jhash2 计算第 i 个 hash */
        u32 hash = jhash2((const u32 *)data, data_len / 4,
                          bloom->hash_seed + i);
        /* 检查对应位是否设置 */
        if (!test_bit(hash & bloom->bitset_mask, bloom->bitset))
            return 0;  /* 确定不存在 */
    }

    return 1;  /* 可能存在(false positive 可能) */
}

__bpf_kfunc void bpf_bloom_filter_push(void *map, void *data,
                                        u32 data_len, void *data2,
                                        u32 data2_len)
{
    struct bpf_bloom_filter *bloom = ...;
    u32 n = bloom->nr_hash_funcs;
    u32 i;

    for (i = 0; i < n; i++) {
        u32 hash = jhash2(...);
        set_bit(hash & bloom->bitset_mask, bloom->bitset);
    }
}

关键参数 nr_hash_funcs 决定了 false positive rate。假设位图有 m 位,预期插入 k 个元素,h 个哈希函数,则 false positive rate 约为:

\[p \approx \left(1 - e^{-kh/m}\right)^h\]

3.3 典型用例

/* BPF 程序:用 bloom filter 去重连接 */
struct {
    __uint(type, BPF_MAP_TYPE_BLOOM_FILTER);
    __type(value, __u32);
    __uint(max_entries, 1 << 20);       /* 1M 位 */
    __uint(map_extra, 4);               /* 4 个哈希函数 */
    __uint(nr_hash_funcs, 4);
} conn_seen SEC(".maps");

SEC("kprobe/tcp_connect")
int filter_new_conn(struct pt_regs *ctx)
{
    struct sock *sk = (struct sock *)PT_REGS_PARM1(ctx);
    __u32 daddr = BPF_CORE_READ(sk, __sk_common.skc_daddr);
    __u16 dport = BPF_CORE_READ(sk, __sk_common.skc_dport);

    /* 先查 bloom filter:这个连接之前见过吗? */
    if (bpf_bloom_filter_lookup(&conn_seen, &daddr, sizeof(daddr),
                                 &dport, sizeof(dport)))
        return 0;  /* 可能见过,跳过 */

    /* 确实没见过,加入并处理 */
    bpf_bloom_filter_push(&conn_seen, &daddr, sizeof(daddr),
                           &dport, sizeof(dport));
    /* ... 记录新连接事件 ... */
    return 0;
}

Bloom filter 在这里做第一关快速排除,避免对每个包都执行完整的事件记录——如果新连接占比 0.1%,那么 99.9% 的包在 bloom filter 这关就被快速过滤掉了。

四、BPF_MAP_TYPE_QUEUE 与 BPF_MAP_TYPE_STACK

Queue 和 stack map 提供简单的 FIFO / LIFO 语义。它们没有 key,只有 value——本质上是 BPF 程序之间的数据通道。

4.1 数据结构

/* kernel/bpf/queue_stack_maps.c — Linux 6.8 */
struct bpf_queue_stack {
    struct bpf_map map;
    raw_spinlock_t lock;
    u64 head;                            /* 队列头 / 栈顶 */
    u64 tail;                            /* 队列尾 */
    u64 size;                            /* 当前元素数 */
    u32 elem_size;                       /* 每个元素的大小(含填充) */
    /* 紧随:value 数组 */
    char elements[] __aligned(8);
};

虽然名字里有 “queue” 和 “stack”,但实现上用的是同一个结构体:一个环形缓冲区(circular buffer),headtail 指针管理可用空间。区别只在于 pushpop 的方向:

操作 Queue(FIFO) Stack(LIFO)
push 写到 tailtail++ 写到 headhead++
pop headhead++ head-1head--
peek head head-1

4.2 操作语义

/* kernel/bpf/queue_stack_maps.c — push / pop / peek */
static long queue_stack_map_push_elem(struct bpf_map *map, void *value, u64 flags)
{
    struct bpf_queue_stack *qs = container_of(map, ...);
    raw_spin_lock_irqsave(&qs->lock, flags);

    if (qs->size >= qs->map.max_entries) {
        ret = -ENOSPC;  /* 队列满 */
        goto out;
    }

    /* 复制 value 到环形缓冲区的尾部 */
    memcpy(&qs->elements[qs->tail * qs->elem_size], value, qs->map.value_size);
    qs->tail = (qs->tail + 1) % qs->map.max_entries;
    qs->size++;

out:
    raw_spin_unlock_irqrestore(&qs->lock, flags);
    return ret;
}

关键设计: - 一把全局自旋锁:queue/stack 的操作全部序列化。这是因为 head/tail 指针的更新需要原子性,而环形缓冲区本身是紧凑的连续内存,无法像 hash map 那样做 per-bucket 锁。 - 固定大小:环形缓冲区,满了就 -ENOSPC。 - 非抢占安全:pop/push 操作持锁更新 head、tail 和 size。如果在 NMI 上下文中 push 而另一个 CPU 持有锁,会失败。因此 queue/stack 不适合在热路径上用于 BPF-用户态通信,更适合 BPF-BPF 程序间的数据传递(如尾调用程序之间的消息传递)。

4.3 适用场景

/* 例:BPF 程序 A 收集事件,尾调用程序 B 批量处理 */
struct {
    __uint(type, BPF_MAP_TYPE_QUEUE);
    __type(value, struct event);
    __uint(max_entries, 1024);
} event_queue SEC(".maps");

/* 程序 A:推入事件 */
SEC("kprobe/some_func")
int producer(struct pt_regs *ctx)
{
    struct event evt = {};
    evt.pid = bpf_get_current_pid_tgid() >> 32;
    /* ... 填充其他字段 ... */
    bpf_map_push_elem(&event_queue, &evt, BPF_EXIST);
    return 0;
}

/* 程序 B:批处理事件(通过尾调用触发) */
SEC("kprobe/another_func")
int consumer(struct pt_regs *ctx)
{
    struct event evt;
    while (bpf_map_pop_elem(&event_queue, &evt) == 0) {
        /* 处理事件 */
    }
    return 0;
}

五、BPF_MAP_TYPE_LPM_TRIE:最长前缀匹配

LPM trie map(BPF_MAP_TYPE_LPM_TRIE)实现最长前缀匹配。key 的结构是:

/* include/uapi/linux/bpf.h */
struct bpf_lpm_trie_key {
    __u32   prefixlen;   /* 前缀长度(位数) */
    __u8    data[];      /* 实际 key 数据(仅前 prefixlen 位有效) */
};

5.1 压缩 Trie 结构

LPM trie 使用 Patricia trie(radix tree 的一种压缩变体):

/* kernel/bpf/lpm_trie.c — Linux 6.8 */
struct lpm_trie_node {
    struct rcu_head rcu;
    struct lpm_trie_node __rcu *child[2];  /* 两个子节点(bit 0 和 bit 1) */
    u32 prefixlen;                          /* 该节点的前缀长度 */
    u32 flags;
    u8 data[];                              /* key 前缀 + value */
};

struct lpm_trie {
    struct bpf_map map;
    struct lpm_trie_node __rcu *root;       /* 根节点 */
    size_t n_entries;                        /* 节点数 */
    size_t max_prefixlen;                    /* 最大前缀长度(bits) */
    size_t data_size;                        /* 每个节点的总数据大小 */
    raw_spinlock_t lock;
};

5.2 插入算法

/* kernel/bpf/lpm_trie.c — trie_update_elem() 简化逻辑 */
/*
 * 对于要插入的 key(prefixlen=P, data=D):
 *
 * 1. 从根节点开始遍历
 * 2. 在每个节点 N:
 *    a. 计算 N 的 key 与新 key 的公共前缀长度(common prefix length)
 *    b. 如果 common >= N->prefixlen:确定下一个分支位
 *       - 用新 key 在那一位的 bit 值决定走 child[0] 或 child[1]
 *       - 继续向下
 *    c. 如果 common < N->prefixlen:
 *       - 需要在此位置插入一个中间节点
 *       - 中间节点的 prefixlen = common
 *       - 根据 common+1 位的值,将 N 和新节点分别挂在中间节点的两个 child 上
 *
 * 3. 到达叶子节点或找到匹配位置:
 *    - 如果 prefixlen 相同:替换 value
 *    - 如果 prefixlen 不同:在正确位置插入
 *
 * 整个操作在 trie->lock 下进行。
 */

5.3 查找算法

/* kernel/bpf/lpm_trie.c — trie_lookup_elem() 简化 */
static void *trie_lookup_elem(struct bpf_map *map, void *_key)
{
    struct lpm_trie *trie = container_of(map, struct lpm_trie, map);
    struct bpf_lpm_trie_key *key = _key;
    struct lpm_trie_node *node, *found = NULL;

    /* RCU 读锁已由 BPF 运行上下文的 preempt_disable 保证 */
    node = rcu_dereference_check(trie->root, rcu_read_lock_bh_held());

    /* 沿 trie 向下,记录沿途所有匹配节点 */
    while (node) {
        /* 计算 public prefix:查找 key 的前缀与该节点 key 的公共部分 */
        u32 limit = min(key->prefixlen, node->prefixlen);
        u32 prefixlen = longest_prefix_match(trie, node, key);

        if (prefixlen >= limit) {
            /* 如果完全匹配该节点的前缀 */
            if (node->prefixlen >= key->prefixlen) {
                /* 节点比查找 key 更具体:可以返回 */
                found = node;
            }
            /* 继续向下查找 */
            u32 next_bit = extract_bit(key->data, node->prefixlen);
            node = rcu_dereference_check(node->child[next_bit], ...);
        } else {
            /* 不匹配:这条路径上没有更多可能 */
            break;
        }
    }

    if (found)
        return found->data + trie->data_size;
    return NULL;
}

查找返回与 key 的前 prefixlen 位完全匹配且具有最长前缀长度的节点的 value。例如:

已插入的 key prefixlen 查找 key 查找 prefixlen 结果
10.0.0.0 8 10.0.0.1 32 匹配(8 位前缀)
10.0.0.0 16 10.0.0.1 32 匹配(16 位前缀,更精确)
10.0.0.0 8 192.168.0.1 32 不匹配

5.4 与 IPv4 路由的对应关系

LPM trie 最典型的应用是 IP 路由表:

/* 例:BPF 路由表 */
struct {
    __uint(type, BPF_MAP_TYPE_LPM_TRIE);
    __type(key, struct ipv4_lpm_key);     /* { u32 prefixlen; u32 addr; } */
    __type(value, struct route_info);      /* { u32 ifindex; u8 dmac[6]; } */
    __uint(max_entries, 65536);
    __uint(map_flags, BPF_F_NO_PREALLOC);
} route_table SEC(".maps");

/* 查找:匹配最长前缀路由 */
SEC("xdp")
int xdp_router(struct xdp_md *ctx)
{
    struct ipv4_lpm_key key = {
        .prefixlen = 32,
        .addr = dst_ip,
    };
    struct route_info *ri;

    ri = bpf_map_lookup_elem(&route_table, &key);  /* LPM lookup */
    if (ri) {
        return bpf_redirect(ri->ifindex, 0);
    }
    return XDP_PASS;
}

这种方式比在 hash map 中手动遍历所有前缀做 try-and-match 要高效得多——trie 可以在 \(O(L)\) 时间内完成查找,其中 \(L\) 是 key 的位宽,而不是尝试 \(O(n)\) 个条目。

六、重定向 Map:devmap / cpumap / xskmap

这三类 map 用于 BPF 程序的包重定向。它们的 key 是一个索引(通常是 ifindex 或 CPU 号),value 包含重定向的目标信息。

6.1 BPF_MAP_TYPE_DEVMAP / DEVMAP_HASH

Devmap 用于 bpf_redirect_map() 将包重定向到另一块网卡:

/* kernel/bpf/devmap.c — struct bpf_devmap_val */
struct bpf_devmap_val {
    __u32 ifindex;       /* 目标网络接口 */
    union {
        int   fd;        /* bpf_prog FD(可选:在重定向前执行) */
        __u32 id;        /* bpf_prog ID */
    } bpf_prog;
};

重定向路径:bpf_redirect_map(devmap, key, flags)dev_map_enqueue()__dev_xmit_enqueue() → 目标设备的 XDP 或 TC 入口。

Devmap 有两种模式:BPF_MAP_TYPE_DEVMAP(array 实现,整数 key)和 BPF_MAP_TYPE_DEVMAP_HASH(hash 实现,任意 key)。

6.2 BPF_MAP_TYPE_CPUMAP

Cpumap 将包重定向到另一个 CPU:

/* kernel/bpf/cpumap.c — struct bpf_cpumap_val */
struct bpf_cpumap_val {
    __u32 qsize;         /* 每 CPU 队列大小 */
    union {
        int   fd;        /* 在目标 CPU 上执行的 BPF 程序 */
        __u32 id;
    } bpf_prog;
};

重定向路径:bpf_redirect_map(cpumap, cpu, flags)cpu_map_enqueue()__ptr_ring_produce()(每 CPU 环形队列)→ ksoftirqd 在目标 CPU 上执行 BPF 程序。

Cpumap 的典型用途:将需要 CPU 亲和性的处理从收到包的 CPU 卸载到另一个 CPU 上——例如 RSS 把 HTTP 流分配到 CPU 3,但 CPU 3 正在跑高优先级任务,可以用 cpumap 将处理重定向到 CPU 7。

6.3 BPF_MAP_TYPE_XSKMAP

XSK map(AF_XDP socket map)用于将包重定向到用户态 AF_XDP socket:

/* kernel/bpf/xskmap.c — struct bpf_xskmap_val */
struct xskmap_entry {
    struct xdp_sock __rcu *xsk;
    struct xdp_sock __rcu *xsk_map_entry;
};

重定向路径:bpf_redirect_map(xskmap, key, flags)xsk_map_redirect()__xsk_map_redirect()xsk_rcv() → AF_XDP socket 的 RX ring。

AF_XDP 使用共享的 UMEM(User Memory)区域,数据通过 DMA 直接写入用户态预注册的内存——零拷贝从网卡到用户态。

6.4 重定向 Map 对比

Map 类型 重定向目标 执行上下文变化 典型用途
DEVMAP 另一块网卡 驱动程序上下文(可能不同设备) XDP 负载均衡、容器网络
CPUMAP 另一个 CPU 软中断上下文(目标 CPU) CPU 亲和性、分布式处理
XSKMAP AF_XDP socket 用户态进程 高性能数据包捕获(DPDK 替代)

七、全类型对比速查

综合第 06、07 两篇,所有常用 BPF map 类型一览:

Map 类型 寻址方式 查找复杂度 写入复杂度 读并发 写并发 mmap 删除 典型场景
HASH 任意 key O(1) avg O(1) RCU per-bucket 锁 连接跟踪、通用 KVS
ARRAY u32 索引 O(1) O(1) 无锁 无锁 配置表、静态路由
PERCPU_HASH 任意 key O(1) O(1) RCU + per-CPU per-bucket 锁 + per-CPU per-CPU 统计
PERCPU_ARRAY u32 索引 O(1) O(1) per-CPU 无锁 per-CPU 无锁 计数器、临时缓冲区
RINGBUF N/A(追加写) N/A O(1) append N/A 忙位协议 是(消费者) N/A 事件输出
PERF_EVENT_ARRAY N/A(写入 preset event) N/A O(1) N/A perf lock 否(perf API) N/A 兼容旧工具
BLOOM_FILTER 集合成员 O(h)(h=hash 数) O(h) 无锁(atomic bit) atomic bit 否(只能消亡重建) 去重、排除
QUEUE N/A(FIFO) N/A O(1) push/pop 全局锁 全局锁 N/A(pop 即删) BPF-BPF 通信
STACK N/A(LIFO) N/A O(1) push/pop 全局锁 全局锁 N/A(pop 即删) BPF-BPF 通信
LPM_TRIE 前缀 key O(L)(L=位宽) O(L) RCU trie 锁 IP 路由表
DEVMAP u32 索引 O(1) O(1) 无锁 per-CPU 队列 包重定向
CPUMAP u32 索引 O(1) O(1) 无锁 per-CPU 队列 CPU 间卸载
XSKMAP u32 索引 O(1) O(1) 无锁 per-socket 队列 AF_XDP 重定向

八、Map 类型选择的工程原则

选择 map 类型时可参考以下原则:

  1. 读多写少、key 可顺序编号:用 array。连续内存、O(1) 无锁读、支持 mmap——没有比这更快的了。
  2. 稀疏 key、需要删除、动态大小需求:用 hash。per-bucket 锁最大化写入并发,RCU 保证读不阻塞写。
  3. 多 CPU 并发读写同一逻辑对象:用 per-CPU 变体。this_cpu_ptr() 消除了缓存行竞争。
  4. 输出事件流:用 ringbuf。如果必须兼容 perf 工具,用 perf event array。
  5. 快速排除不存在的 key:用 bloom filter 做第一关过滤,再查 hash map。
  6. IP 路由 / CIDR 匹配:用 LPM trie,不要手工在 hash map 里做前缀匹配。
  7. BPF 程序间数据传递:用 queue/stack。不要用 hash map 模拟队列——queue/stack 的环形缓冲区更紧凑,且 pop 后内存自动复用。
  8. 跨设备/跨 CPU 包重定向:用 devmap/cpumap/xskmap。不要手工在 BPF 里伪造 __sk_buff——内核提供了专门优化的重定向路径。

下一篇(第 08 篇)将从 map 转到 helper 函数子系统——map 创建和操作最终都要通过 helper 函数来调用,理解 helper 的注册、类型检查和参数验证,是把整个 BPF 运行时图景拼完整的关键一步。

参考

内核源码

规范与文档

补充资料

上一篇Map 内核实现(上):hash / array / per-CPU(第 06 篇)

下一篇Helper 函数子系统:注册、类型检查与参数传递(第 08 篇)

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-06-12 · kernel / ebpf

【eBPF 内核实现深度拆解】从验证器到 JIT,从 BTF 到调度器

eBPF 内核虚拟机内部实现系统讲解:BPF 指令集与寄存器机器、验证器的抽象解释与状态裁剪、JIT 编译器后端、Map 各类型的并发与内存模型、helper 函数注册与类型检查、BTF 格式规范与 CO-RE 重定位引擎、libbpf 加载器工程、fentry/fexit 蹦床机制、sched_ext 调度器内核接口。面向想读懂 eBPF 内核源码、写生产级 BPF 程序的系统工程师。


By .