上一篇拆解了 hash 和 array 的核心数据结构与并发模型。这一篇文章处理剩下的重要 map 类型——它们不遵循简单的 key-value 模式,而是各自解决一个特定工程问题:ring buffer 解决事件流输出,bloom filter 解决快速排除,LPM trie 解决最长前缀匹配,queue/stack 解决 BPF 程序间的数据传递,devmap/cpumap 解决包重定向。
理解这些 map 的实现,才能在做架构决策时选对工具——例如用 ringbuf 而不是 perfbuf 降低 per-event 头部开销(ringbuf 8 字节头 vs perf 256 字节头,小 payload 时差距显著),或者用 LPM trie 而不是遍历 hash map 来做路由查找。
源码基准:Linux 6.8,路径
kernel/bpf/ringbuf.c、kernel/bpf/bloom_filter.c、kernel/bpf/lpm_trie.c、kernel/bpf/devmap.c、kernel/events/core.c。
一、BPF_MAP_TYPE_RINGBUF:mmap 双缓冲与 record 提交语义
Ring buffer map(Linux 5.8 引入)是 BPF 程序向用户态输出事件的标准方式。它取代了 perf buffer 成为 libbpf 推荐的默认事件输出机制。
1.1 为什么 ringbuf 要取代 perfbuf
Perf
buffer(BPF_MAP_TYPE_PERF_EVENT_ARRAY)是
ringbuf 的前身。它的主要问题:
- 每记录开销大:perf buffer
每条记录需要一个 256
字节的管理头(
perf_event_header),即使你只输出 4 字节。 - 没有记录边界保护:如果生产者写入 100 字节后消费者正好读到第 50 字节处,消费者会读到损坏的记录。需要在应用层重建记录边界。
- 内存浪费:每个 CPU 需要独立的一块缓冲区,但各 CPU 的负载可能极不均匀——CPU 3 在丢事件,CPU 7 的缓冲区闲置。
Ringbuf 用共享的环形缓冲区 + 忙位协议解决了这三个问题。
1.2 struct bpf_ringbuf 的内存布局
/* kernel/bpf/ringbuf.c — Linux 6.8 */
struct bpf_ringbuf {
wait_queue_head_t waitq; /* epoll 等待队列 */
struct irq_work work; /* 跨 CPU 唤醒机制 */
u64 mask; /* 缓冲区大小掩码(2 的幂 - 1) */
struct page **pages; /* 数据页面数组 */
int nr_pages; /* 页面数 */
spinlock_t spinlock; /* 保留操作的自旋锁 */
atomic_t busy; /* 忙位:标记记录正在写入 */
unsigned long consumer_pos; /* 消费者已读位置 */
unsigned long producer_pos; /* 生产者已写入位置 */
unsigned long pending_pos; /* 已提交但未通知消费者的位置 */
struct bpf_ringbuf_map *ringbuf_map; /* 关联的 ringbuf map */
};
struct bpf_ringbuf_map {
struct bpf_map map;
struct bpf_ringbuf *rb;
};Ring buffer 使用两页的特殊布局:
flowchart TD
subgraph layout["Ring Buffer 内存布局"]
direction LR
P0["Page 0\n(consumer meta-page)\nconsumer_pos\nproducer_pos"]
P1["Page 1\n(data page)\nrecord header + payload\nrecord header + payload\n..."]
P2["Page 2\n(data page)\n... more data ..."]
PN["Page N\n(data page)\n..."]
end
subgraph meta["Consumer Meta-Page (Page 0)"]
CP["consumer_pos: u64"]
PP["producer_pos: u64"]
PD["pending_pos: u64"]
end
subgraph rec["Record 结构"]
R_HDR["len: u32\nlen_discard: u32\n→ 总大小 = (len + 7) & ~7"]
end
P0 --> meta
P1 --> rec
- Page 0(consumer meta-page):包含
consumer_pos和producer_pos,用户态 mmap 后直接读取。 - Pages 1-N(data pages):环形排列,存储
record。每条 record 带 8 字节头:4 字节
len(有效载荷长度)+ 4 字节len_discard。
1.3 mmap 双缓冲语义
用户态通过 mmap 访问 ring buffer:
/* 用户态 mmap 的伪结构 */
struct ringbuf_consumer {
unsigned long *consumer_pos; /* mmap page 0 */
unsigned long *producer_pos; /* mmap page 0 */
void *data; /* mmap page 1+ */
unsigned long mask; /* data 区域大小 - 1 */
};内核和用户态通过 consumer meta-page 中的
consumer_pos 和 producer_pos
无锁同步:
- 生产者(BPF 程序)写入后更新
producer_pos。 - 消费者(用户态)读取后更新
consumer_pos。 - 双方通过这两个位置坐标协调:
consumer_pos <= producer_pos始终成立,两者之间的区域是待消费的数据。
1.4 忙位协议(Busy-Bit Protocol)
Ringbuf 的核心并发机制是忙位协议。它保证生产者和消费者在不使用内核锁的情况下安全协调。
每条 record 的 len_discard
字段的最高位是”忙位”:
/* kernel/bpf/ringbuf.c — 忙位定义 */
#define BPF_RINGBUF_BUSY_BIT (1 << 31)
#define BPF_RINGBUF_DISCARD_BIT (1 << 30)生产者流程(bpf_ringbuf_reserve
+ bpf_ringbuf_submit):
/* kernel/bpf/ringbuf.c — bpf_ringbuf_reserve() 简化 */
static void *bpf_ringbuf_reserve(struct bpf_map *map, u64 size, u64 flags)
{
struct bpf_ringbuf_map *rb_map = container_of(map, ...);
struct bpf_ringbuf *rb = rb_map->rb;
u64 cons_pos, prod_pos, new_prod_pos;
void *rec;
if (size > RINGBUF_MAX_RECORD_SZ)
return NULL;
/* 1. 计算对齐后的大小(8 字节对齐) */
u32 total_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
/* 2. 尝试在缓冲区中保留空间 */
prod_pos = smp_load_acquire(&rb->producer_pos);
new_prod_pos = prod_pos + total_size;
/* 3. 检查空间:如果 new_prod_pos 越过 consumer_pos,缓冲区满 */
cons_pos = smp_load_acquire(&rb->consumer_pos);
if (new_prod_pos - cons_pos > rb->mask + 1)
return NULL; /* 缓冲区满 */
/* 4. 获取 record 指针并设置头部 */
rec = (void *)rb->pages[1] + (prod_pos & rb->mask);
struct bpf_ringbuf_hdr *hdr = rec;
hdr->len = size;
hdr->pg_off = 0;
hdr->len_discard = BPF_RINGBUF_BUSY_BIT; /* 设置忙位! */
/* 5. 更新 producer_pos(带 release 语义) */
smp_store_release(&rb->producer_pos, new_prod_pos);
return rec + BPF_RINGBUF_HDR_SZ; /* 返回 payload 指针 */
}提交(bpf_ringbuf_submit):
/* kernel/bpf/ringbuf.c — bpf_ringbuf_submit() 简化 */
static void bpf_ringbuf_submit(void *data, u64 flags)
{
struct bpf_ringbuf_hdr *hdr = data - BPF_RINGBUF_HDR_SZ;
/* 清除忙位:此操作使记录对消费者可见 */
smp_store_release(&hdr->len_discard,
hdr->len_discard & ~BPF_RINGBUF_BUSY_BIT);
}消费者流程(用户态):
/* 用户态消费逻辑(伪代码) */
static int consume_ringbuf(struct ringbuf_consumer *c,
void (*callback)(void *data, size_t len))
{
unsigned long cons_pos = *c->consumer_pos;
unsigned long prod_pos = smp_load_acquire(c->producer_pos);
while (cons_pos < prod_pos) {
struct bpf_ringbuf_hdr *hdr = c->data + (cons_pos & c->mask);
/* 检查忙位:如果设置了,记录还在写入中,跳过 */
if (hdr->len_discard & BPF_RINGBUF_BUSY_BIT) {
/* 更新 consumer_pos 以跳过忙的记录 */
smp_store_release(c->consumer_pos, cons_pos);
return 0;
}
/* 检查丢弃位 */
if (hdr->len_discard & BPF_RINGBUF_DISCARD_BIT) {
/* 此记录被丢弃(bpf_ringbuf_discard),跳过 */
cons_pos += round_up(hdr->len_discard & ~BPF_RINGBUF_DISCARD_BIT, 8);
continue;
}
/* 调用回调处理数据 */
callback(hdr + 1, hdr->len);
cons_pos += round_up(hdr->len + BPF_RINGBUF_HDR_SZ, 8);
}
/* 更新 consumer_pos */
smp_store_release(c->consumer_pos, cons_pos);
return 0;
}忙位协议的精妙之处:如果消费者碰到忙位,它知道生产者正在写那条记录,于是仅提交之前的消费进度并返回,留下那条未完成的记录不做处理。下次消费时重新检查。生产者一侧,smp_store_release
保证有效的载荷写入在清除忙位之前对消费者可见。
1.5 bpf_ringbuf_discard
除了 submit,BPF 程序还可以
discard:
/* kernel/bpf/ringbuf.c — bpf_ringbuf_discard() 简化 */
static void bpf_ringbuf_discard(void *data, u64 flags)
{
struct bpf_ringbuf_hdr *hdr = data - BPF_RINGBUF_HDR_SZ;
/* 设置丢弃位并清除忙位 */
hdr->len_discard |= BPF_RINGBUF_DISCARD_BIT;
smp_store_release(&hdr->len_discard,
hdr->len_discard & ~BPF_RINGBUF_BUSY_BIT);
}这允许 BPF 程序先 reserve 空间,如果发现数据不需要发送(例如过滤条件不满足),就 discard——消费者看到丢弃位后直接跳过。
1.6 用户态消费者:ring_buffer__new
libbpf 封装了 ring buffer 的用户态消费逻辑:
/* libbpf: ring_buffer__new() 创建消费者 */
struct ring_buffer *ring_buffer__new(int map_fd,
ring_buffer_sample_fn sample_cb,
void *ctx,
const struct ring_buffer_opts *opts);
/* 在事件循环中调用 */
int ring_buffer__consume(struct ring_buffer *rb);
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);ring_buffer__new() 内部做 mmap: 1. mmap
consumer meta-page 2. mmap data pages 3. 设置 epoll 监听(当
producer_pos > consumer_pos 时触发)
1.7 Ringbuf 与 Perfbuf 对比
| 维度 | Ring Buffer | Perf Buffer |
|---|---|---|
| 记录头部大小 | 8 字节(len +
len_discard) |
256 字节(perf_event_header +
padding) |
| 记录边界 | 显式(忙位协议) | 无保护,需应用层重建 |
| 跨 CPU 共享 | 共享一个缓冲区 | 每 CPU 独立缓冲区 |
| 可变大小记录 | 原生支持 | 支持但更复杂 |
| 内存效率 | 高(无 CPU 乘数) | 低(每 CPU 独立) |
| 通知机制 | epoll(bpf_ringbuf_notify) |
perf_event 的 poll |
| 用户态 API | ring_buffer__new() /
ring_buffer__consume() |
perf_buffer__new() /
perf_buffer__poll() |
| 引入版本 | Linux 5.8 | Linux 4.x 起 |
经验法则:新项目始终使用
ringbuf,除非需要与旧版工具(perf、老版本
bcc 等)兼容。
二、BPF_MAP_TYPE_PERF_EVENT_ARRAY:perf 事件输出路径
Perf event array map 的 map 条目存储的是
struct perf_event * 指针——由用户态通过
perf_event_open() 创建,然后通过
bpf_map_update_elem() 存入 map。BPF 程序能通过
bpf_perf_event_output() helper
向这些事件写入数据。
2.1 存储的指针:perf_event
/* kernel/trace/bpf_trace.c — perf_event_array_map_update_elem() 简化 */
static long perf_event_array_map_update_elem(struct bpf_map *map, void *key,
void *value, u64 map_flags)
{
struct bpf_array *array = container_of(map, struct bpf_array, map);
u32 index = *(u32 *)key;
/* value 是一个 fd,内核将其转换为 struct perf_event * */
struct perf_event *event = perf_event_get_from_fd(*(int *)value);
if (IS_ERR(event))
return PTR_ERR(event);
WRITE_ONCE(array->ptrs[index], event); /* 存储 perf_event 指针 */
return 0;
}Perf event array 底层仍然是一个
bpf_array,但 value 不是普通数据,而是
struct perf_event *。
2.2 bpf_perf_event_output 路径
/* kernel/trace/bpf_trace.c — bpf_perf_event_output() 简化逻辑 */
/*
* BPF_CALL_5(bpf_perf_event_output, struct pt_regs *, regs,
* struct bpf_map *, map, u64, flags, void *, data, u64, size)
*
* 执行路径:
* 1. 从 map 获取 perf_event *(通过 index 或 key)
* 2. 调用 perf_event_output(event, data, size)
* → perf_output_begin(&handle, event, size) // 保留空间
* → perf_output_copy(&handle, data, size) // 复制数据
* → perf_output_end(&handle) // 提交
* 3. perf_output_begin 内部:
* - 检查缓冲区空间
* - 写入 perf_event_header(type, size, misc)
* - 如果缓冲区满,返回 NULL
*/Perf buffer
的数据格式(perf_event_header):
/* include/uapi/linux/perf_event.h */
struct perf_event_header {
__u32 type; /* PERF_RECORD_SAMPLE 等 */
__u16 misc; /* 混杂标志(CPU、时间戳模式等) */
__u16 size; /* 记录总大小 */
};在 perf_event_header 之后是可变长度的
payload,BPF 程序写入的 data 紧接其后。
2.3 为什么 perfbuf 还在用
尽管 ringbuf 在几乎所有方面都更好,perfbuf 仍然存在,因为:
- 工具链兼容:
perf工具、老版本bcc、许多已有的观测工具基于 perf buffer。ringbuf 的数据格式与perf不兼容。 - per-CPU 隐式语义:perf buffer 的每 CPU
独立缓冲区恰好适合某些场景——例如
perf工具对每 CPU 事件的采样。ringbuf 是共享的,需要记录中显式包含 CPU 信息。 - 内核内部使用:某些内核子系统(
bpf_trace_printk的底层、bpf_perf_event_read)内部已经深度绑定 perf event。
三、BPF_MAP_TYPE_BLOOM_FILTER:概率集合成员测试
Bloom filter map(Linux 5.16 引入,通过 kfunc 而非传统
helper
访问)提供概率集合成员测试:bpf_bloom_filter_lookup()
最坏情况下错误地返回 “存在”(false
positive),但永远不会错误地返回 “不存在”(no false
negative)。
3.1 数据结构
/* kernel/bpf/bloom_filter.c — Linux 6.8 */
struct bpf_bloom_filter {
struct bpf_map map;
u32 bitset_mask; /* 位图大小掩码:nr_hash_funcs * BITS_PER_LONG - 1 */
u32 hash_seed; /* 哈希种子 */
u32 nr_hash_funcs; /* 哈希函数数量 */
/* 紧随: unsigned long bitset[] ——实际位图 */
unsigned long bitset[];
};3.2 核心操作
Bloom filter 通过 kfunc 暴露,而不是传统的 BPF helper:
/* kernel/bpf/bloom_filter.c */
__bpf_kfunc u64 bpf_bloom_filter_lookup(void *map, void *data,
u32 data_len, void *data2,
u32 data2_len)
{
struct bpf_bloom_filter *bloom = container_of(*(struct bpf_map **)map, ...);
u32 n = bloom->nr_hash_funcs;
u32 i;
for (i = 0; i < n; i++) {
/* 使用 jhash2 计算第 i 个 hash */
u32 hash = jhash2((const u32 *)data, data_len / 4,
bloom->hash_seed + i);
/* 检查对应位是否设置 */
if (!test_bit(hash & bloom->bitset_mask, bloom->bitset))
return 0; /* 确定不存在 */
}
return 1; /* 可能存在(false positive 可能) */
}
__bpf_kfunc void bpf_bloom_filter_push(void *map, void *data,
u32 data_len, void *data2,
u32 data2_len)
{
struct bpf_bloom_filter *bloom = ...;
u32 n = bloom->nr_hash_funcs;
u32 i;
for (i = 0; i < n; i++) {
u32 hash = jhash2(...);
set_bit(hash & bloom->bitset_mask, bloom->bitset);
}
}关键参数 nr_hash_funcs 决定了 false positive
rate。假设位图有 m 位,预期插入 k
个元素,h 个哈希函数,则 false positive rate
约为:
\[p \approx \left(1 - e^{-kh/m}\right)^h\]
3.3 典型用例
/* BPF 程序:用 bloom filter 去重连接 */
struct {
__uint(type, BPF_MAP_TYPE_BLOOM_FILTER);
__type(value, __u32);
__uint(max_entries, 1 << 20); /* 1M 位 */
__uint(map_extra, 4); /* 4 个哈希函数 */
__uint(nr_hash_funcs, 4);
} conn_seen SEC(".maps");
SEC("kprobe/tcp_connect")
int filter_new_conn(struct pt_regs *ctx)
{
struct sock *sk = (struct sock *)PT_REGS_PARM1(ctx);
__u32 daddr = BPF_CORE_READ(sk, __sk_common.skc_daddr);
__u16 dport = BPF_CORE_READ(sk, __sk_common.skc_dport);
/* 先查 bloom filter:这个连接之前见过吗? */
if (bpf_bloom_filter_lookup(&conn_seen, &daddr, sizeof(daddr),
&dport, sizeof(dport)))
return 0; /* 可能见过,跳过 */
/* 确实没见过,加入并处理 */
bpf_bloom_filter_push(&conn_seen, &daddr, sizeof(daddr),
&dport, sizeof(dport));
/* ... 记录新连接事件 ... */
return 0;
}Bloom filter 在这里做第一关快速排除,避免对每个包都执行完整的事件记录——如果新连接占比 0.1%,那么 99.9% 的包在 bloom filter 这关就被快速过滤掉了。
四、BPF_MAP_TYPE_QUEUE 与 BPF_MAP_TYPE_STACK
Queue 和 stack map 提供简单的 FIFO / LIFO 语义。它们没有 key,只有 value——本质上是 BPF 程序之间的数据通道。
4.1 数据结构
/* kernel/bpf/queue_stack_maps.c — Linux 6.8 */
struct bpf_queue_stack {
struct bpf_map map;
raw_spinlock_t lock;
u64 head; /* 队列头 / 栈顶 */
u64 tail; /* 队列尾 */
u64 size; /* 当前元素数 */
u32 elem_size; /* 每个元素的大小(含填充) */
/* 紧随:value 数组 */
char elements[] __aligned(8);
};虽然名字里有 “queue” 和
“stack”,但实现上用的是同一个结构体:一个环形缓冲区(circular
buffer),head 和 tail
指针管理可用空间。区别只在于 push 和
pop 的方向:
| 操作 | Queue(FIFO) | Stack(LIFO) |
|---|---|---|
| push | 写到 tail,tail++ |
写到 head,head++ |
| pop | 读 head,head++ |
读 head-1,head-- |
| peek | 读 head |
读 head-1 |
4.2 操作语义
/* kernel/bpf/queue_stack_maps.c — push / pop / peek */
static long queue_stack_map_push_elem(struct bpf_map *map, void *value, u64 flags)
{
struct bpf_queue_stack *qs = container_of(map, ...);
raw_spin_lock_irqsave(&qs->lock, flags);
if (qs->size >= qs->map.max_entries) {
ret = -ENOSPC; /* 队列满 */
goto out;
}
/* 复制 value 到环形缓冲区的尾部 */
memcpy(&qs->elements[qs->tail * qs->elem_size], value, qs->map.value_size);
qs->tail = (qs->tail + 1) % qs->map.max_entries;
qs->size++;
out:
raw_spin_unlock_irqrestore(&qs->lock, flags);
return ret;
}关键设计: - 一把全局自旋锁:queue/stack
的操作全部序列化。这是因为 head/tail
指针的更新需要原子性,而环形缓冲区本身是紧凑的连续内存,无法像
hash map 那样做 per-bucket 锁。 -
固定大小:环形缓冲区,满了就
-ENOSPC。 -
非抢占安全:pop/push 操作持锁更新
head、tail 和 size。如果在 NMI 上下文中 push 而另一个 CPU
持有锁,会失败。因此 queue/stack 不适合在热路径上用于
BPF-用户态通信,更适合 BPF-BPF
程序间的数据传递(如尾调用程序之间的消息传递)。
4.3 适用场景
/* 例:BPF 程序 A 收集事件,尾调用程序 B 批量处理 */
struct {
__uint(type, BPF_MAP_TYPE_QUEUE);
__type(value, struct event);
__uint(max_entries, 1024);
} event_queue SEC(".maps");
/* 程序 A:推入事件 */
SEC("kprobe/some_func")
int producer(struct pt_regs *ctx)
{
struct event evt = {};
evt.pid = bpf_get_current_pid_tgid() >> 32;
/* ... 填充其他字段 ... */
bpf_map_push_elem(&event_queue, &evt, BPF_EXIST);
return 0;
}
/* 程序 B:批处理事件(通过尾调用触发) */
SEC("kprobe/another_func")
int consumer(struct pt_regs *ctx)
{
struct event evt;
while (bpf_map_pop_elem(&event_queue, &evt) == 0) {
/* 处理事件 */
}
return 0;
}五、BPF_MAP_TYPE_LPM_TRIE:最长前缀匹配
LPM trie
map(BPF_MAP_TYPE_LPM_TRIE)实现最长前缀匹配。key
的结构是:
/* include/uapi/linux/bpf.h */
struct bpf_lpm_trie_key {
__u32 prefixlen; /* 前缀长度(位数) */
__u8 data[]; /* 实际 key 数据(仅前 prefixlen 位有效) */
};5.1 压缩 Trie 结构
LPM trie 使用 Patricia trie(radix tree 的一种压缩变体):
/* kernel/bpf/lpm_trie.c — Linux 6.8 */
struct lpm_trie_node {
struct rcu_head rcu;
struct lpm_trie_node __rcu *child[2]; /* 两个子节点(bit 0 和 bit 1) */
u32 prefixlen; /* 该节点的前缀长度 */
u32 flags;
u8 data[]; /* key 前缀 + value */
};
struct lpm_trie {
struct bpf_map map;
struct lpm_trie_node __rcu *root; /* 根节点 */
size_t n_entries; /* 节点数 */
size_t max_prefixlen; /* 最大前缀长度(bits) */
size_t data_size; /* 每个节点的总数据大小 */
raw_spinlock_t lock;
};5.2 插入算法
/* kernel/bpf/lpm_trie.c — trie_update_elem() 简化逻辑 */
/*
* 对于要插入的 key(prefixlen=P, data=D):
*
* 1. 从根节点开始遍历
* 2. 在每个节点 N:
* a. 计算 N 的 key 与新 key 的公共前缀长度(common prefix length)
* b. 如果 common >= N->prefixlen:确定下一个分支位
* - 用新 key 在那一位的 bit 值决定走 child[0] 或 child[1]
* - 继续向下
* c. 如果 common < N->prefixlen:
* - 需要在此位置插入一个中间节点
* - 中间节点的 prefixlen = common
* - 根据 common+1 位的值,将 N 和新节点分别挂在中间节点的两个 child 上
*
* 3. 到达叶子节点或找到匹配位置:
* - 如果 prefixlen 相同:替换 value
* - 如果 prefixlen 不同:在正确位置插入
*
* 整个操作在 trie->lock 下进行。
*/5.3 查找算法
/* kernel/bpf/lpm_trie.c — trie_lookup_elem() 简化 */
static void *trie_lookup_elem(struct bpf_map *map, void *_key)
{
struct lpm_trie *trie = container_of(map, struct lpm_trie, map);
struct bpf_lpm_trie_key *key = _key;
struct lpm_trie_node *node, *found = NULL;
/* RCU 读锁已由 BPF 运行上下文的 preempt_disable 保证 */
node = rcu_dereference_check(trie->root, rcu_read_lock_bh_held());
/* 沿 trie 向下,记录沿途所有匹配节点 */
while (node) {
/* 计算 public prefix:查找 key 的前缀与该节点 key 的公共部分 */
u32 limit = min(key->prefixlen, node->prefixlen);
u32 prefixlen = longest_prefix_match(trie, node, key);
if (prefixlen >= limit) {
/* 如果完全匹配该节点的前缀 */
if (node->prefixlen >= key->prefixlen) {
/* 节点比查找 key 更具体:可以返回 */
found = node;
}
/* 继续向下查找 */
u32 next_bit = extract_bit(key->data, node->prefixlen);
node = rcu_dereference_check(node->child[next_bit], ...);
} else {
/* 不匹配:这条路径上没有更多可能 */
break;
}
}
if (found)
return found->data + trie->data_size;
return NULL;
}查找返回与 key 的前 prefixlen 位完全匹配且具有最长前缀长度的节点的 value。例如:
| 已插入的 key | prefixlen | 查找 key | 查找 prefixlen | 结果 |
|---|---|---|---|---|
10.0.0.0 |
8 | 10.0.0.1 |
32 | 匹配(8 位前缀) |
10.0.0.0 |
16 | 10.0.0.1 |
32 | 匹配(16 位前缀,更精确) |
10.0.0.0 |
8 | 192.168.0.1 |
32 | 不匹配 |
5.4 与 IPv4 路由的对应关系
LPM trie 最典型的应用是 IP 路由表:
/* 例:BPF 路由表 */
struct {
__uint(type, BPF_MAP_TYPE_LPM_TRIE);
__type(key, struct ipv4_lpm_key); /* { u32 prefixlen; u32 addr; } */
__type(value, struct route_info); /* { u32 ifindex; u8 dmac[6]; } */
__uint(max_entries, 65536);
__uint(map_flags, BPF_F_NO_PREALLOC);
} route_table SEC(".maps");
/* 查找:匹配最长前缀路由 */
SEC("xdp")
int xdp_router(struct xdp_md *ctx)
{
struct ipv4_lpm_key key = {
.prefixlen = 32,
.addr = dst_ip,
};
struct route_info *ri;
ri = bpf_map_lookup_elem(&route_table, &key); /* LPM lookup */
if (ri) {
return bpf_redirect(ri->ifindex, 0);
}
return XDP_PASS;
}这种方式比在 hash map 中手动遍历所有前缀做 try-and-match 要高效得多——trie 可以在 \(O(L)\) 时间内完成查找,其中 \(L\) 是 key 的位宽,而不是尝试 \(O(n)\) 个条目。
六、重定向 Map:devmap / cpumap / xskmap
这三类 map 用于 BPF 程序的包重定向。它们的 key
是一个索引(通常是 ifindex 或 CPU 号),value
包含重定向的目标信息。
6.1 BPF_MAP_TYPE_DEVMAP / DEVMAP_HASH
Devmap 用于 bpf_redirect_map()
将包重定向到另一块网卡:
/* kernel/bpf/devmap.c — struct bpf_devmap_val */
struct bpf_devmap_val {
__u32 ifindex; /* 目标网络接口 */
union {
int fd; /* bpf_prog FD(可选:在重定向前执行) */
__u32 id; /* bpf_prog ID */
} bpf_prog;
};重定向路径:bpf_redirect_map(devmap, key, flags)
→ dev_map_enqueue() →
__dev_xmit_enqueue() → 目标设备的 XDP 或 TC
入口。
Devmap
有两种模式:BPF_MAP_TYPE_DEVMAP(array
实现,整数 key)和
BPF_MAP_TYPE_DEVMAP_HASH(hash 实现,任意
key)。
6.2 BPF_MAP_TYPE_CPUMAP
Cpumap 将包重定向到另一个 CPU:
/* kernel/bpf/cpumap.c — struct bpf_cpumap_val */
struct bpf_cpumap_val {
__u32 qsize; /* 每 CPU 队列大小 */
union {
int fd; /* 在目标 CPU 上执行的 BPF 程序 */
__u32 id;
} bpf_prog;
};重定向路径:bpf_redirect_map(cpumap, cpu, flags)
→ cpu_map_enqueue() →
__ptr_ring_produce()(每 CPU 环形队列)→
ksoftirqd 在目标 CPU 上执行 BPF 程序。
Cpumap 的典型用途:将需要 CPU 亲和性的处理从收到包的 CPU 卸载到另一个 CPU 上——例如 RSS 把 HTTP 流分配到 CPU 3,但 CPU 3 正在跑高优先级任务,可以用 cpumap 将处理重定向到 CPU 7。
6.3 BPF_MAP_TYPE_XSKMAP
XSK map(AF_XDP socket map)用于将包重定向到用户态 AF_XDP socket:
/* kernel/bpf/xskmap.c — struct bpf_xskmap_val */
struct xskmap_entry {
struct xdp_sock __rcu *xsk;
struct xdp_sock __rcu *xsk_map_entry;
};重定向路径:bpf_redirect_map(xskmap, key, flags)
→ xsk_map_redirect() →
__xsk_map_redirect() → xsk_rcv() →
AF_XDP socket 的 RX ring。
AF_XDP 使用共享的 UMEM(User Memory)区域,数据通过 DMA 直接写入用户态预注册的内存——零拷贝从网卡到用户态。
6.4 重定向 Map 对比
| Map 类型 | 重定向目标 | 执行上下文变化 | 典型用途 |
|---|---|---|---|
| DEVMAP | 另一块网卡 | 驱动程序上下文(可能不同设备) | XDP 负载均衡、容器网络 |
| CPUMAP | 另一个 CPU | 软中断上下文(目标 CPU) | CPU 亲和性、分布式处理 |
| XSKMAP | AF_XDP socket | 用户态进程 | 高性能数据包捕获(DPDK 替代) |
七、全类型对比速查
综合第 06、07 两篇,所有常用 BPF map 类型一览:
| Map 类型 | 寻址方式 | 查找复杂度 | 写入复杂度 | 读并发 | 写并发 | mmap | 删除 | 典型场景 |
|---|---|---|---|---|---|---|---|---|
| HASH | 任意 key | O(1) avg | O(1) | RCU | per-bucket 锁 | 否 | 是 | 连接跟踪、通用 KVS |
| ARRAY | u32 索引 | O(1) | O(1) | 无锁 | 无锁 | 是 | 否 | 配置表、静态路由 |
| PERCPU_HASH | 任意 key | O(1) | O(1) | RCU + per-CPU | per-bucket 锁 + per-CPU | 否 | 是 | per-CPU 统计 |
| PERCPU_ARRAY | u32 索引 | O(1) | O(1) | per-CPU 无锁 | per-CPU 无锁 | 否 | 否 | 计数器、临时缓冲区 |
| RINGBUF | N/A(追加写) | N/A | O(1) append | N/A | 忙位协议 | 是(消费者) | N/A | 事件输出 |
| PERF_EVENT_ARRAY | N/A(写入 preset event) | N/A | O(1) | N/A | perf lock | 否(perf API) | N/A | 兼容旧工具 |
| BLOOM_FILTER | 集合成员 | O(h)(h=hash 数) | O(h) | 无锁(atomic bit) | atomic bit | 否 | 否(只能消亡重建) | 去重、排除 |
| QUEUE | N/A(FIFO) | N/A | O(1) push/pop | 全局锁 | 全局锁 | 否 | N/A(pop 即删) | BPF-BPF 通信 |
| STACK | N/A(LIFO) | N/A | O(1) push/pop | 全局锁 | 全局锁 | 否 | N/A(pop 即删) | BPF-BPF 通信 |
| LPM_TRIE | 前缀 key | O(L)(L=位宽) | O(L) | RCU | trie 锁 | 否 | 是 | IP 路由表 |
| DEVMAP | u32 索引 | O(1) | O(1) | 无锁 | per-CPU 队列 | 否 | 是 | 包重定向 |
| CPUMAP | u32 索引 | O(1) | O(1) | 无锁 | per-CPU 队列 | 否 | 是 | CPU 间卸载 |
| XSKMAP | u32 索引 | O(1) | O(1) | 无锁 | per-socket 队列 | 否 | 是 | AF_XDP 重定向 |
八、Map 类型选择的工程原则
选择 map 类型时可参考以下原则:
- 读多写少、key 可顺序编号:用 array。连续内存、O(1) 无锁读、支持 mmap——没有比这更快的了。
- 稀疏 key、需要删除、动态大小需求:用 hash。per-bucket 锁最大化写入并发,RCU 保证读不阻塞写。
- 多 CPU 并发读写同一逻辑对象:用 per-CPU
变体。
this_cpu_ptr()消除了缓存行竞争。 - 输出事件流:用 ringbuf。如果必须兼容
perf工具,用 perf event array。 - 快速排除不存在的 key:用 bloom filter 做第一关过滤,再查 hash map。
- IP 路由 / CIDR 匹配:用 LPM trie,不要手工在 hash map 里做前缀匹配。
- BPF 程序间数据传递:用 queue/stack。不要用 hash map 模拟队列——queue/stack 的环形缓冲区更紧凑,且 pop 后内存自动复用。
- 跨设备/跨 CPU 包重定向:用
devmap/cpumap/xskmap。不要手工在 BPF 里伪造
__sk_buff——内核提供了专门优化的重定向路径。
下一篇(第 08 篇)将从 map 转到 helper 函数子系统——map 创建和操作最终都要通过 helper 函数来调用,理解 helper 的注册、类型检查和参数验证,是把整个 BPF 运行时图景拼完整的关键一步。
参考
内核源码
- Linux 6.8
kernel/bpf/ringbuf.c— ring buffer 实现(bpf_ringbuf_reserve、bpf_ringbuf_submit、bpf_ringbuf_discard) - Linux 6.8
kernel/bpf/bloom_filter.c— bloom filter 实现 - Linux 6.8
kernel/bpf/lpm_trie.c— LPM trie 实现(trie_lookup_elem、trie_update_elem) - Linux 6.8
kernel/bpf/queue_stack_maps.c— queue/stack 实现 - Linux 6.8
kernel/bpf/devmap.c— devmap 实现 - Linux 6.8
kernel/bpf/cpumap.c— cpumap 实现 - Linux 6.8
kernel/bpf/xskmap.c— xsk map 实现 - Linux 6.8
kernel/trace/bpf_trace.c—bpf_perf_event_output实现 - Linux 6.8
kernel/events/core.c— perf event 内核实现
规范与文档
Documentation/bpf/maps.rst— 内核 BPF map 文档Documentation/bpf/ringbuf.rst— ring buffer 设计文档include/uapi/linux/bpf.h—enum bpf_map_type和 map 属性定义
补充资料
上一篇:Map 内核实现(上):hash / array / per-CPU(第 06 篇)
下一篇:Helper 函数子系统:注册、类型检查与参数传递(第 08 篇)
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【eBPF 内核实现深度拆解】Map 内核实现(上):hash / array / per-CPU 的数据结构与并发模型
从 bpf_map_ops 虚函数表出发,逐层拆解 BPF_MAP_TYPE_HASH、BPF_MAP_TYPE_ARRAY、per-CPU 变体的内核实现——htab 的 bucket 链表与 prealloc、bpf_array 的零拷贝共享、per-CPU 分配器的无锁语义。
【eBPF 内核实现深度拆解】从验证器到 JIT,从 BTF 到调度器
eBPF 内核虚拟机内部实现系统讲解:BPF 指令集与寄存器机器、验证器的抽象解释与状态裁剪、JIT 编译器后端、Map 各类型的并发与内存模型、helper 函数注册与类型检查、BTF 格式规范与 CO-RE 重定位引擎、libbpf 加载器工程、fentry/fexit 蹦床机制、sched_ext 调度器内核接口。面向想读懂 eBPF 内核源码、写生产级 BPF 程序的系统工程师。
【eBPF 内核实现深度拆解】BPF 指令集解码:寄存器机器、调用约定与指令编码
从 eBPF 虚拟机的 11 个 64-bit 寄存器和 struct bpf_insn 出发,逐条拆解 ALU64/ALU32、跳转、加载存储、call 四类指令的字段语义与编码格式,建立后续 verifier 和 JIT 讨论的精确基础。
【eBPF 内核实现深度拆解】验证器框架:从 BPF_PROG_LOAD 到 do_check()
跟踪 BPF_PROG_LOAD 系统调用的内核执行路径,逐层拆解 bpf_prog_load()→bpf_check()→do_check_main() 的调用链,建立 verifier 执行全景——这是理解 verifier 安全保证的入口。