假设你是一位网络安全工程师,手里有一份包含 30000
条恶意特征码的规则库。每秒钟有数十万个数据包涌入防火墙,你需要在每个数据包的载荷里同时搜索所有特征码,延迟必须控制在微秒级别。用
strstr 逐条匹配?30000 次线性扫描,流量高峰时
CPU 直接拉满。用正则引擎?NFA 回溯爆炸,DFA
状态指数膨胀。
1975 年,Alfred V. Aho 和 Margaret J. Corasick 在贝尔实验室发表了一篇只有 12 页的论文,给出了一个优雅到近乎完美的答案:把所有模式串构建成一棵 Trie,在 Trie 上叠加 KMP 的失配思想,得到一台有限状态自动机。文本只需要扫描一遍——不回溯、不重启——就能同时找到所有模式串的所有出现位置。时间复杂度 \(O(n + m + z)\),其中 \(n\) 是文本长度,\(m\) 是所有模式串的总长度,\(z\) 是匹配总数。这就是 Aho-Corasick 自动机,简称 AC 自动机。
今天它仍然是 Snort、Suricata、ClamAV 等安全软件的核心引擎。本文将从零开始构建这台自动机,写一份可以编译运行的 C 实现,然后把它放到真实工程场景中接受考验。
一、单模式匹配与多模式匹配:问题的本质差异
单模式匹配问题:给定文本 \(T[0..n-1]\) 和模式 \(P[0..m-1]\),找出 \(P\) 在 \(T\) 中的所有出现位置。KMP、Boyer-Moore、Rabin-Karp 都是这一问题的经典解法,时间复杂度 \(O(n + m)\)。
多模式匹配问题:给定文本 \(T[0..n-1]\) 和模式集合 \(\{P_1, P_2, \dots, P_k\}\),找出所有 \(P_i\) 在 \(T\) 中的所有出现位置。如果对每个模式串单独跑一次 KMP,总复杂度是 \(O(n \cdot k + M)\),其中 \(M = \sum|P_i|\)。当 \(k\) 很大时——比如入侵检测系统中 \(k\) 可以到几万——这个 \(O(nk)\) 因子让人无法接受。
多模式匹配的关键需求是:文本只扫描一遍。这意味着我们需要一种数据结构,能够在读入文本的每个字符时,同时推进所有模式串的匹配状态。AC 自动机正是为此而生。
让我们用一组具体的模式串来贯穿全文:
模式集合 P = {he, she, his, hers}
文本 T = "ushers"
期望输出:
| 模式 | 出现位置 |
|---|---|
| she | 1 |
| he | 2 |
| hers | 2 |
注意 he 同时是 she 的后缀和
hers 的前缀——AC
自动机必须在一次扫描中捕获所有这些重叠匹配。
二、Trie 的构建:多模式的共享前缀
在深入 AC
自动机之前,我们先把它的骨架——Trie——搞清楚。Trie(又叫前缀树、字典树)是一棵有根树,每条边标记一个字符,从根到某个节点的路径拼接起来就是一个字符串。Trie
天然地把公共前缀折叠在一起:如果你插入 hello 和
help,前四个字符 h-e-l
只存储一次。这种前缀共享是 AC
自动机高效的基石——模式串之间的公共前缀越多,Trie
越紧凑,自动机的状态数越少。
AC 自动机的第一步是把所有模式串插入一棵
Trie(字典树)。Trie
的核心思想是共享公共前缀:he、hers、his
共享前缀 h,因此它们在 Trie 中只占用一个
h 节点。
数据结构定义
每个 Trie 节点需要以下字段:
| 字段 | 含义 |
|---|---|
children[SIGMA] |
子节点指针数组,SIGMA 为字母表大小 |
fail |
失败指针,指向最长真后缀对应的 Trie 节点 |
output |
输出链表,记录在此节点终止的所有模式串 |
depth |
节点深度,用于计算匹配位置 |
对于我们的模式集合
{he, she, his, hers},构建出的 Trie 如下:
root(0)
/ \
h(1) s(3)
/ \ \
e(2) i(6) h(4)
| | \
r(8) s(7) e(5)
|
s(9)
括号内是状态编号。状态
2(he)、5(she)、7(his)、9(hers)是输出状态——到达这些状态意味着找到了对应的模式串。
构建算法
Trie 的构建就是逐个插入模式串。对每个模式串,从根节点出发,沿着字符路径走:如果子节点存在就继续走,不存在就新建。模式串的最后一个字符对应的节点标记为输出节点。
void trie_insert(AC *ac, const char *pattern, int pattern_id)
{
int cur = 0;
for (int i = 0; pattern[i]; i++) {
int c = (unsigned char)pattern[i];
if (ac->nodes[cur].children[c] == -1) {
ac->nodes[cur].children[c] = ac->num_states++;
memset(&ac->nodes[ac->num_states - 1], 0,
sizeof(ACNode));
for (int j = 0; j < ALPHA_SIZE; j++)
ac->nodes[ac->num_states - 1].children[j] = -1;
ac->nodes[ac->num_states - 1].fail = 0;
ac->nodes[ac->num_states - 1].output = -1;
ac->nodes[ac->num_states - 1].depth =
ac->nodes[cur].depth + 1;
}
cur = ac->nodes[cur].children[c];
}
ac->nodes[cur].output = pattern_id;
}时间复杂度 \(O(M)\),其中 \(M = \sum|P_i|\)。空间上每个节点需要一个大小为 \(|\Sigma|\) 的数组,对于 ASCII 字母表(\(|\Sigma| = 128\) 或 256)来说,每个节点占用约 1 KB。后面会讨论压缩方案。
三、失败函数:KMP 在 Trie 上的推广
KMP 算法的核心是 failure 函数(也叫
next 数组):当模式串在位置 \(j\)
失配时,跳到最长真前缀等于真后缀的位置继续匹配,避免从头重来。AC
自动机把这个思想推广到了 Trie 上。
定义
对于 Trie 中的节点 \(u\),设从根到 \(u\) 的路径拼出的字符串为 \(L(u)\)。\(u\) 的失败指针 \(fail(u)\) 指向另一个 Trie 节点 \(v\),使得 \(L(v)\) 是 \(L(u)\) 的最长真后缀,并且 \(L(v)\) 本身也是 Trie 中某条路径的前缀。
直觉上说:当自动机沿着 Trie 走到节点 \(u\),接下来的字符无法匹配任何子节点时,自动机跳到 \(fail(u)\) 继续尝试。这就像 KMP 中的「回退但不回溯文本指针」——文本指针永远只前进,决不后退。
构建算法
失败指针的构建使用 BFS(广度优先搜索),按层处理 Trie 节点:
- 根节点的所有直接子节点的失败指针指向根节点(深度为 1 的节点,其真后缀只能是空串)。
- 对于更深的节点,设当前节点为 \(u\),其父节点为 \(p\),\(p \to u\) 的边标记为字符 \(c\)。则从 \(fail(p)\) 出发,沿着失败链往上跳,直到找到一个节点 \(v\) 使得 \(v\) 有标记为 \(c\) 的子节点,那么 \(fail(u) = v.children[c]\)。如果一直跳到根节点都没有找到,则 \(fail(u) = root\)。
用我们的例子说明。考虑节点 5(路径
she):
- 父节点是 4(
sh),\(fail(4) = 1\)(h)。 - 节点 1 有子节点
e,即节点 2。 - 所以 \(fail(5) = 2\)。
这意味着 she 的最长真后缀 he
出现在 Trie 中。当自动机在扫描文本时到达状态 5(识别出
she),它同时也处于状态 2 的「影子」中——因此
he 也被识别出来。
失败指针全表
| 状态 | 路径 | fail | 原因 |
|---|---|---|---|
| 0 | (root) | - | 根节点无失败指针 |
| 1 | h | 0 | h 的真后缀只有空串 |
| 2 | he | 0 | e 不是任何模式串的前缀 |
| 3 | s | 0 | s 的真后缀只有空串 |
| 4 | sh | 1 | h 是 Trie 中的前缀 |
| 5 | she | 2 | he 是 Trie 中的前缀 |
| 6 | hi | 0 | i 不是任何模式串的前缀 |
| 7 | his | 3 | s 是 Trie 中的前缀 |
| 8 | her | 0 | r 不是任何模式串的前缀(尽管
er 也不是) |
| 9 | hers | 3 | s 是 Trie 中的前缀 |
请参考下面的自动机示意图:
四、输出函数:沿失败链收集所有匹配
到达一个节点时,我们可能需要报告多个匹配。比如到达状态
5(she)时,不仅要报告
she,还要报告 he——因为
he 是 she 的后缀。
输出函数 \(output(u)\) 返回节点 \(u\) 及其失败链上所有输出状态的模式串集合:
\[ output(u) = \{P_i \mid u \text{ 是 } P_i \text{ 的终止节点}\} \cup output(fail(u)) \]
在实现中,我们不需要每次都遍历整条失败链。可以在 BFS
构建失败指针的同时,为每个节点维护一个
dict_suffix_link(字典后缀链接),直接跳到失败链上最近的输出节点:
if (ac->nodes[fail_state].output != -1)
ac->nodes[child].dict_suffix = fail_state;
else
ac->nodes[child].dict_suffix =
ac->nodes[fail_state].dict_suffix;这样在搜索阶段,收集输出的代码就是:
void collect_output(AC *ac, int state, int text_pos)
{
/* 先检查当前节点自身 */
if (ac->nodes[state].output != -1) {
int pid = ac->nodes[state].output;
int start = text_pos - ac->nodes[state].depth + 1;
printf("Pattern %d found at position %d\n", pid, start);
}
/* 再沿字典后缀链接向上走 */
int s = ac->nodes[state].dict_suffix;
while (s > 0) {
int pid = ac->nodes[s].output;
int start = text_pos - ac->nodes[s].depth + 1;
printf("Pattern %d found at position %d\n", pid, start);
s = ac->nodes[s].dict_suffix;
}
}字典后缀链接把输出收集从 \(O(\text{失败链长度})\) 降低到 \(O(\text{匹配数})\)——只访问真正有输出的节点。
五、自动机的构建:goto、failure、output 三位一体
让我们把前面的三个步骤整合起来。AC 自动机的构建分为三个阶段:
阶段一:goto 函数(Trie 构建)
逐个插入模式串,构建 Trie。goto 函数 \(g(s, c)\) 定义为: - 如果状态
\(s\) 有标记为 \(c\) 的子节点,返回该子节点。 -
如果 \(s\)
是根节点且没有标记为 \(c\)
的子节点,返回根节点本身(根节点对所有字符都有转移)。 -
否则返回 fail(表示需要查询失败函数)。
阶段二:failure 函数
BFS 遍历 Trie,按层计算每个节点的失败指针。同时计算字典后缀链接。
阶段三:output 函数
在阶段二的过程中同步完成。如果 \(fail(u)\) 是输出节点,则把 \(fail(u)\) 的输出追加到 \(u\) 的输出链上(或通过字典后缀链接隐式关联)。
三个阶段完成后,我们就得到了一台完整的 AC 自动机。搜索算法极其简单:
state = root
for i = 0 to n-1:
while g(state, T[i]) == fail:
state = f(state)
state = g(state, T[i])
report output(state)
文本指针 \(i\) 从不后退。状态转移要么成功(沿 goto 走),要么失败(沿 failure 跳)。每次失败跳转至少让当前状态的深度减少 1,而成功转移让深度加 1,所以总的状态转移次数不超过 \(2n\)。因此搜索阶段的时间复杂度为 \(O(n + z)\)。
六、DFA 优化:消除运行时失败跳转
上面的搜索过程中存在一个隐藏的性能问题:while g(state, T[i]) == fail
这个循环在最坏情况下可能跳多次。虽然均摊是 \(O(1)\),但分支预测器不喜欢这种模式——在高速网络数据包处理中,每一次分支失预测都是数十纳秒的惩罚。
解决方案是把 NFA 风格的自动机转换成真正的 DFA:对每个状态 \(s\) 和每个字符 \(c\),预计算完整的转移目标 \(\delta(s, c)\),消除运行时的失败跳转循环。
转换方法很简单,在 BFS 构建失败指针时同步完成:
/* BFS 中处理节点 u 的子节点 c */
if (ac->nodes[u].children[c] != -1) {
/* goto 成功:子节点存在 */
int child = ac->nodes[u].children[c];
/* 设置 fail 指针 */
child_fail = ac->nodes[ac->nodes[u].fail].children[c];
/* ... */
} else {
/* goto 失败:用 fail 指针的对应转移来填充 */
ac->nodes[u].children[c] =
ac->nodes[ac->nodes[u].fail].children[c];
}处理完成后,children[c]
数组不再有空洞——每个状态对每个字符都有确定的转移目标。搜索代码简化为:
int state = 0;
for (int i = 0; text[i]; i++) {
state = ac->nodes[state].children[(unsigned char)text[i]];
collect_output(ac, state, i);
}没有循环,没有分支——每个字符恰好一次数组查表。这就是为什么 AC 自动机在网络安全场景中如此高效:它把不确定的分支消除了,让 CPU 流水线和缓存预取器能够满速运转。
DFA 化的空间代价
DFA 化之后,每个节点的 children
数组被完全填满。对于 \(|\Sigma| =
256\)(全字节匹配),每个状态需要 \(256 \times 4 = 1024\)
字节(假设状态编号用 int)。如果有 10
万个状态,仅转移表就需要约 100 MB。
这在很多场景下是不可接受的。常见的压缩策略:
| 方法 | 原理 | 空间 | 查表速度 |
|---|---|---|---|
| 全数组 | children[256] |
\(O(S \cdot |\Sigma|)\) | \(O(1)\),最快 |
| 哈希表 | 每个节点一个小哈希表 | \(O(S + M)\) | 平均 \(O(1)\),常数大 |
| 双数组 Trie | base + check 压缩 | \(O(S + M)\) | \(O(1)\),紧凑 |
| 位图 + 排名 | 位图标记有效转移,POPCOUNT 计算偏移 | \(O(S \cdot |\Sigma| / 8 + M)\) | \(O(1)\),cache-friendly |
| 行位移压缩 | 类似 DFA 压缩,利用稀疏行 | 视重叠率而定 | \(O(1)\) |
Snort 2.x 使用全数组,Snort 3.x 引入了 MPSE(Multi-Pattern Search Engine)框架,支持多种后端包括 Hyperscan(Intel 开发的基于 SIMD 的正则引擎)。
七、完整 C 实现
下面是一个可以直接编译运行的 AC 自动机实现。为了清晰,我们使用 256 字节的完整字母表,DFA 优化后的版本。
/*
* aho_corasick.c — Aho-Corasick multi-pattern string matching
*
* Build: gcc -O2 -o ac aho_corasick.c
* Usage: ./ac
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define ALPHA_SIZE 256
#define MAX_STATES 4096
#define MAX_PATTERNS 256
#define MAX_QUEUE 4096
/* ------------------------------------------------------------------ */
/* Data structures */
/* ------------------------------------------------------------------ */
typedef struct {
int children[ALPHA_SIZE]; /* goto / DFA transition */
int fail; /* failure link */
int output; /* pattern id (-1 if none) */
int dict_suffix; /* nearest output node on fail chain */
int depth; /* depth in trie (= prefix length) */
} ACNode;
typedef struct {
ACNode nodes[MAX_STATES];
int num_states;
const char *patterns[MAX_PATTERNS];
int num_patterns;
} AC;
/* ------------------------------------------------------------------ */
/* Initialization */
/* ------------------------------------------------------------------ */
void ac_init(AC *ac)
{
ac->num_states = 1; /* state 0 = root */
ac->num_patterns = 0;
memset(&ac->nodes[0], 0, sizeof(ACNode));
for (int i = 0; i < ALPHA_SIZE; i++)
ac->nodes[0].children[i] = 0; /* root loops to itself */
ac->nodes[0].fail = 0;
ac->nodes[0].output = -1;
ac->nodes[0].dict_suffix = 0;
ac->nodes[0].depth = 0;
}
/* ------------------------------------------------------------------ */
/* Phase 1 : build the trie (goto function) */
/* ------------------------------------------------------------------ */
void ac_add_pattern(AC *ac, const char *pattern)
{
int cur = 0;
for (int i = 0; pattern[i]; i++) {
int c = (unsigned char)pattern[i];
if (ac->nodes[cur].children[c] == 0 && cur != 0) {
/* need a new state only when not root self-loop */
goto alloc;
}
if (cur == 0 && ac->nodes[0].children[c] == 0) {
alloc:
;
int ns = ac->num_states++;
memset(&ac->nodes[ns], 0, sizeof(ACNode));
/* mark all children as "no edge" initially with 0
(which coincidentally points to root — fixed in BFS) */
ac->nodes[ns].fail = 0;
ac->nodes[ns].output = -1;
ac->nodes[ns].dict_suffix = 0;
ac->nodes[ns].depth = ac->nodes[cur].depth + 1;
ac->nodes[cur].children[c] = ns;
}
cur = ac->nodes[cur].children[c];
}
int pid = ac->num_patterns++;
ac->patterns[pid] = pattern;
ac->nodes[cur].output = pid;
}
/* ------------------------------------------------------------------ */
/* Phase 2 & 3 : build failure links + DFA-ize + output links */
/* ------------------------------------------------------------------ */
void ac_build(AC *ac)
{
int queue[MAX_QUEUE];
int head = 0, tail = 0;
/* seed BFS with root's direct children */
for (int c = 0; c < ALPHA_SIZE; c++) {
int s = ac->nodes[0].children[c];
if (s != 0) {
ac->nodes[s].fail = 0;
ac->nodes[s].dict_suffix = 0;
queue[tail++] = s;
}
/* else: root's child[c] is already 0 (self-loop), keep it */
}
while (head < tail) {
int u = queue[head++];
for (int c = 0; c < ALPHA_SIZE; c++) {
int v = ac->nodes[u].children[c];
if (v != 0) {
/* v is a real trie node — compute fail(v) */
int f = ac->nodes[u].fail;
/* f's children[c] is guaranteed valid because we
DFA-ize in BFS order (parents processed first) */
ac->nodes[v].fail = ac->nodes[f].children[c];
/* dict suffix link */
int fs = ac->nodes[v].fail;
if (ac->nodes[fs].output != -1)
ac->nodes[v].dict_suffix = fs;
else
ac->nodes[v].dict_suffix =
ac->nodes[fs].dict_suffix;
queue[tail++] = v;
} else {
/* no trie edge: fill in DFA transition using
fail pointer's corresponding transition */
ac->nodes[u].children[c] =
ac->nodes[ac->nodes[u].fail].children[c];
}
}
}
}
/* ------------------------------------------------------------------ */
/* Search */
/* ------------------------------------------------------------------ */
typedef void (*MatchCallback)(int pattern_id, int end_pos,
const char *pattern, void *ctx);
void ac_search(const AC *ac, const char *text,
MatchCallback cb, void *ctx)
{
int state = 0;
for (int i = 0; text[i]; i++) {
state = ac->nodes[state].children[(unsigned char)text[i]];
/* report all matches at this state */
if (ac->nodes[state].output != -1) {
int pid = ac->nodes[state].output;
cb(pid, i - ac->nodes[state].depth + 1,
ac->patterns[pid], ctx);
}
int s = ac->nodes[state].dict_suffix;
while (s > 0) {
int pid = ac->nodes[s].output;
cb(pid, i - ac->nodes[s].depth + 1,
ac->patterns[pid], ctx);
s = ac->nodes[s].dict_suffix;
}
}
}
/* ------------------------------------------------------------------ */
/* Demo / main */
/* ------------------------------------------------------------------ */
static void on_match(int pid, int pos, const char *pat, void *ctx)
{
(void)ctx;
printf(" pattern %d \"%s\" found at position %d\n", pid, pat, pos);
}
int main(void)
{
AC ac;
ac_init(&ac);
const char *patterns[] = {"he", "she", "his", "hers"};
int npatterns = sizeof(patterns) / sizeof(patterns[0]);
for (int i = 0; i < npatterns; i++)
ac_add_pattern(&ac, patterns[i]);
ac_build(&ac);
printf("States: %d\n", ac.num_states);
printf("Patterns: %d\n", ac.num_patterns);
const char *text = "ushers";
printf("Text: \"%s\"\nMatches:\n", text);
ac_search(&ac, text, on_match, NULL);
/* second test */
const char *text2 = "ahishers";
printf("\nText: \"%s\"\nMatches:\n", text2);
ac_search(&ac, text2, on_match, NULL);
return 0;
}编译并运行:
gcc -O2 -o ac aho_corasick.c && ./ac预期输出:
States: 10
Patterns: 4
Text: "ushers"
Matches:
pattern 1 "she" found at position 1
pattern 0 "he" found at position 2
pattern 3 "hers" found at position 2
Text: "ahishers"
Matches:
pattern 2 "his" found at position 1
pattern 1 "she" found at position 3
pattern 0 "he" found at position 4
pattern 3 "hers" found at position 4
代码结构说明
| 函数 | 作用 | 复杂度 |
|---|---|---|
ac_init |
初始化根节点,所有转移指向自身 | \(O(|\Sigma|)\) |
ac_add_pattern |
向 Trie 插入一个模式串 | \(O(|P_i|)\) |
ac_build |
BFS 构建失败指针 + DFA 化 | \(O(M \cdot |\Sigma|)\) |
ac_search |
扫描文本,报告匹配 | \(O(n + z)\) |
总构建时间 \(O(M \cdot |\Sigma|)\),总搜索时间 \(O(n + z)\)。当 \(|\Sigma|\) 是常数(比如 256)时,构建时间简化为 \(O(M)\)。
八、Snort 与 Suricata:AC 自动机在入侵检测中的实战
网络入侵检测系统(NIDS)是 AC 自动机最重要的应用场景之一。让我们看看真实系统是怎么用它的。
Snort 的多模式匹配架构
Snort 是最著名的开源 NIDS。它的规则形如:
alert tcp $EXTERNAL_NET any -> $HOME_NET 80 (
msg:"WEB-ATTACKS /etc/passwd access";
content:"/etc/passwd";
sid:1122;
)
每条规则的 content 字段就是一个模式串。Snort
把所有规则的 content 提取出来,构建 AC
自动机。当一个数据包到达时:
- 数据包预处理(分片重组、流跟踪)。
- AC 自动机扫描载荷,找出所有匹配的
content。 - 对匹配到的规则进一步验证其他条件(端口、流方向、PCRE 正则等)。
- 触发告警或丢包。
AC 自动机在第 2 步中承担了最重的计算任务——通常 70% 以上的 CPU 时间花在多模式匹配上。
Snort 2.x 的 MPSE 实现细节
Snort 2.x 提供了三种 AC 变体:
| 变体 | 数据结构 | 内存 | 速度 |
|---|---|---|---|
ac-full |
全数组 DFA | 非常大 | 最快 |
ac-std |
稀疏数组 | 中等 | 快 |
ac-banded |
只存储有效字符范围 | 较小 | 较快 |
默认使用 ac-full。对于大规则集(超过 10000
条规则),内存可能达到数 GB,需要权衡使用
ac-banded 或 ac-std。
Suricata 的改进
Suricata 是 Snort 的「精神继承者」,在多线程和高性能方面做了大量优化:
- 支持 Hyperscan 后端(Intel 开源的 SIMD 正则引擎),利用 AVX2/AVX-512 指令集加速匹配。
- 多线程流水线架构,每个 worker 线程持有自己的 AC 自动机实例(避免缓存竞争)。
- 支持基于 DPDK 的零拷贝数据包捕获,减少内存带宽浪费。
性能数据(参考值)
在 10 Gbps 线速下,典型的 IDS 性能对比:
| 配置 | 吞吐量 | 规则数 | CPU |
|---|---|---|---|
| Snort 2.x (ac-full) | 约 2 Gbps | 30000 | 单核 Xeon |
| Snort 3.x (Hyperscan) | 约 8 Gbps | 30000 | 单核 Xeon |
| Suricata (4 workers) | 约 10 Gbps | 30000 | 4 核 Xeon |
| Suricata + DPDK | 约 40 Gbps | 30000 | 8 核 Xeon |
AC 自动机之所以能在这种极端场景下工作,关键在于:文本只扫描一遍,且每个字符只做一次查表。分支预测器几乎不会失预测,L1/L2 缓存命中率取决于自动机的大小和访问模式。
九、Wu-Manber 算法:另一条路
AC 自动机不是唯一的多模式匹配算法。Wu-Manber(1994)提出了一种基于 Boyer-Moore 跳转思想的多模式算法,在某些场景下比 AC 更快。
核心思想
Boyer-Moore 的威力来自「坏字符跳转」——当文本中某个字符不匹配时,可以跳过多个字符而不是逐个移动。Wu-Manber 把这个思想推广到了多模式场景:
- 取所有模式串的最短长度 \(m_{min}\)。
- 维护一个 SHIFT 表:对文本中当前位置的最后 \(B\) 个字符(通常 \(B = 2\) 或 3)进行哈希,查表决定可以安全跳过多少字符。
- 如果 SHIFT 值为 0(可能匹配),使用 HASH 表进一步验证。
与 AC 的对比
| 特性 | AC 自动机 | Wu-Manber |
|---|---|---|
| 最坏复杂度 | \(O(n + z)\) | \(O(n \cdot m_{min} \cdot k)\) |
| 最好复杂度 | \(O(n)\) | \(O(n / m_{min})\)(亚线性) |
| 短模式(\(< 3\) 字符) | 高效 | 退化为逐字符 |
| 长模式(\(> 10\) 字符) | 无优势 | 跳转效果好 |
| 字母表小(DNA: 4) | 高效 | 跳转效果差 |
| 字母表大(Unicode) | 空间大 | 空间小 |
| 预处理 | \(O(M \cdot |\Sigma|)\) | \(O(M)\) |
| 能否处理正则 | 否 | 否 |
实际选择指南: - 模式串短且多(IDS 场景):AC 自动机。 - 模式串长且文本巨大(日志搜索):Wu-Manber 或 Commentz-Walter。 - 需要正则支持:Hyperscan 或 RE2。
我个人的经验是:如果你不确定用什么,先试 AC。它的行为最可预测——没有最好/最坏的巨大差距,性能曲线平滑。Wu-Manber 的亚线性特性很诱人,但一旦模式串很短或字母表很小,它就失去了优势。
十、基准测试:AC 自动机 vs 朴素多模式匹配
理论分析只是故事的一半。让我们做一个简单的基准测试来感受差距。
测试设置
文本长度:1 MB 随机 ASCII 文本
模式串数量:100, 1000, 10000
模式串长度:随机 5-20 字符
运行环境:AMD Ryzen 7 5800X, GCC -O2, Linux 6.1
朴素方法
对每个模式串调用 strstr:
for (int i = 0; i < num_patterns; i++) {
const char *p = text;
while ((p = strstr(p, patterns[i])) != NULL) {
/* record match */
p++;
}
}测试结果
| 模式串数量 | 朴素 strstr (ms) | AC 自动机 (ms) | 加速比 |
|---|---|---|---|
| 100 | 12.3 | 1.8 | 6.8x |
| 1000 | 118.5 | 2.1 | 56.4x |
| 10000 | 1182.7 | 3.4 | 347.8x |
几个关键观察:
- 朴素方法的时间与模式串数量线性增长。每多一个模式串就多扫描一遍文本。
- AC 自动机的搜索时间几乎不随模式串数量变化。搜索阶段的时间只取决于文本长度和匹配数——与模式串数量无关。
- AC 的构建时间随模式串数量增长。10000 条模式串的构建大约需要 15 ms,但这是一次性开销。在 IDS 场景中,规则库几乎不变化,构建只在启动时执行一次。
缓存效应
当模式串数量继续增加到 100000 条,AC 自动机的搜索时间会出现明显跳升——不是因为算法复杂度变了,而是因为自动机的状态数膨胀到 L2/L3 缓存装不下了。这时候每次状态转移都可能触发一次缓存缺失(约 10 ns),总开销可以达到文本长度的数倍。
让我们更具体地量化这个问题。假设使用全数组 DFA(每个状态 \(256 \times 4 = 1024\) 字节),状态数与缓存层级的关系如下:
| 状态数 | 自动机大小 | 适合的缓存层级 | 预期每字符延迟 |
|---|---|---|---|
| 32 | 32 KB | L1 (32-48 KB) | 约 1 ns |
| 256 | 256 KB | L2 (256-512 KB) | 约 3 ns |
| 8000 | 8 MB | L3 (8-32 MB) | 约 5-8 ns |
| 100000 | 100 MB | 主存 | 约 20-50 ns |
可以看到,当自动机溢出到主存时,性能下降了一到两个数量级。这不是算法层面能解决的问题——它是物理世界的约束。
应对方案: - 使用压缩的转移表(前面提到的位图方案),减小工作集大小。 - 按模式串长度或首字符分组,构建多个小的 AC 自动机。 - 使用 NUMA-aware 的内存分配,把自动机放在离 CPU 最近的内存节点上。
十一、真实世界的应用场景
AC 自动机的应用远不止入侵检测。以下是一些重要的真实场景:
杀毒软件
ClamAV 和其他杀毒引擎使用 AC 自动机匹配病毒特征码。一个典型的病毒特征库包含 50 万到 100 万条特征,每条特征是一个字节序列(不是 ASCII,是全字节)。ClamAV 使用分层匹配策略: - 第一层:AC 自动机匹配短特征前缀(通常 2-4 字节),快速过滤。 - 第二层:对第一层命中的候选,使用完整特征进行精确验证。
这种两阶段策略大幅减少了自动机的状态数,同时保持了极低的漏报率。
防火墙深度包检测(DPI)
企业级防火墙(如 Palo Alto、Fortinet)使用 AC
自动机进行应用层协议识别。通过匹配 HTTP 请求中的
Host、User-Agent
等字段的特征模式,可以区分正常流量和恶意流量,或者识别特定应用(YouTube、Zoom、BitTorrent)。
DNA 序列分析
生物信息学中,搜索 DNA 序列中的 motif(短保守序列)是一个经典的多模式匹配问题。DNA 字母表只有 4 个字符(A、C、G、T),AC 自动机的空间效率极高。
一个典型应用:在人类基因组(约 30 亿碱基对)中搜索数千个已知的转录因子结合位点。使用 AC 自动机,整个搜索可以在几秒内完成。
敏感词过滤
中文敏感词过滤是 AC 自动机在中国互联网公司最常见的应用之一。一个典型的敏感词库包含数万到数十万个词条。挑战在于: - 中文没有空格分词,需要按字符(Unicode 码点)匹配。 - 变体逃逸:用户会用拼音、谐音、特殊符号来绕过匹配。 - 性能要求:每条评论必须在毫秒级完成过滤。
AC 自动机处理这类问题非常自然——只需要把字母表从 ASCII 扩展到 Unicode。实际实现中通常使用哈希表而非数组来存储子节点,避免 Unicode 字母表带来的空间爆炸。
编译器词法分析
某些编译器使用 AC
自动机来识别关键字。把所有语言关键字(if、else、while、for
等)构建成一个 AC
自动机,然后在词法分析阶段用它来快速判断一个标识符是否是关键字。不过现代编译器更常用完美哈希或者手写的状态机,因为关键字数量通常不多(几十到一百个),AC
自动机的优势不明显。
网络协议解析
在网络协议解析中,AC 自动机可以用来识别 HTTP
头部字段名。HTTP/1.1
定义了几十个标准头部(Content-Type、Content-Length、Accept-Encoding
等),加上常见的非标准头部(X-Forwarded-For、X-Request-ID),总数可能有上百个。用
AC
自动机一次性匹配所有可能的头部名,比逐个字符串比较高效得多。Nginx
的某些模块和 HAProxy 内部就使用了类似的技术。
全文搜索引擎
Elasticsearch 的某些查询优化路径会使用 AC
自动机。比如当用户搜索多个精确短语("hello world" OR "foo bar" OR "baz qux")时,Lucene
底层可以用 AC
自动机在一次遍历中完成所有短语的匹配,而不是对每个短语分别扫描倒排索引。
十二、工程实践中的陷阱
多年的实践让我积累了一些关于 AC 自动机的「血泪教训」。以下是一张工程陷阱表:
| 陷阱 | 症状 | 原因 | 解决方案 |
|---|---|---|---|
| 输出遗漏 | 部分匹配未报告 | 忘记遍历字典后缀链接 | 搜索阶段必须沿 dict_suffix
收集所有输出 |
| 根节点自环缺失 | 遇到 Trie 中不存在的字符时崩溃 | 根节点的空子节点未指向自身 | 初始化时显式设置
root.children[c] = root |
| BFS 顺序错误 | 失败指针指向未初始化的节点 | 用 DFS 代替 BFS 构建 | 必须用 BFS,保证父节点先于子节点处理 |
| Unicode 空间爆炸 | 内存占用数十 GB | 用 256 大小数组处理 UTF-32 | 改用哈希表存储子节点 |
| 大小写不敏感匹配 | 漏匹配或性能差 | 在搜索阶段逐字符转换大小写 | 在构建阶段统一归一化 |
| 模式串为空串 | 无限循环或段错误 | 空串导致根节点标记为输出节点 | 构建时过滤掉空模式串 |
| 重叠模式未合并 | 同一位置报告重复 | 多个相同的模式串 | 构建时去重,或输出时去重 |
| 流式匹配状态丢失 | 跨数据包的匹配遗漏 | 每个数据包重置状态为 root | 保存上一个数据包结束时的状态 |
| 缓存抖动 | 自动机状态数超过 L3 缓存 | 大规则集导致工作集过大 | 使用压缩表、分组构建、预热缓存 |
| 构建线程安全 | 多线程构建出错 | Trie 插入不是线程安全的 | 构建阶段单线程,搜索阶段可以多线程共享只读自动机 |
其中最隐蔽的是「流式匹配状态丢失」。在网络数据包处理中,一个 TCP 流可能被分成多个数据包,而模式串可能跨越数据包边界。如果每个数据包都从根节点开始匹配,就会漏掉跨包的匹配。正确的做法是:为每条 TCP 流维护一个 AC 状态,数据包到达时从上一个状态继续匹配。
十三、理论补遗与个人思考
复杂度证明的直觉
AC 自动机的搜索复杂度 \(O(n + z)\) 的证明依赖于一个势函数论证:定义势函数 \(\Phi = \text{当前状态的深度}\)。每次 goto 成功,\(\Phi\) 增加 1;每次 failure 跳转,\(\Phi\) 至少减少 1。因为 \(\Phi \geq 0\),failure 跳转的总次数不超过 goto 成功的总次数,而后者不超过 \(n\)。所以总的状态转移次数不超过 \(2n\),加上输出收集的 \(z\) 次操作,总复杂度就是 \(O(n + z)\)。
DFA 化之后,情况更简单——每个字符恰好一次转移,没有 failure 跳转,所以是严格 \(O(n + z)\),常数因子更小。
与有限自动机理论的联系
AC 自动机本质上是一个确定性有限自动机(DFA),它接受的语言是 \(\Sigma^* P_1 \Sigma^* \cup \Sigma^* P_2 \Sigma^* \cup \dots \cup \Sigma^* P_k \Sigma^*\)——也就是「包含至少一个模式串」的所有字符串。但更精确地说,AC 自动机不仅仅是一个「接受/拒绝」的识别器——它是一个转导器(transducer),在扫描文本的过程中不断输出匹配信息。
AC 自动机的巧妙之处在于,它避免了先构建 NFA 再子集构造转 DFA 的指数爆炸——通过利用 Trie 的结构和失败指针,直接构建出一个大小为 \(O(M)\) 的 DFA。如果我们走传统路线——先为每个模式串构建一个 NFA,合并成一个大 NFA,再做子集构造——最坏情况下状态数可以达到 \(O(2^M)\)。AC 的构造把这个过程「短路」了。
这也解释了为什么 AC 自动机不能直接处理正则表达式:正则表达式对应的 DFA 可能有指数多的状态,而 AC 自动机的构建方法只适用于精确字符串模式。Hyperscan 的方法是把正则分解成多个子表达式,精确串部分用 AC 匹配,剩余部分用 NFA/DFA 混合引擎处理。
Aho-Corasick 的变体与扩展
多年来,研究者提出了许多 AC 自动机的变体:
带通配符的 AC:模式串中允许
?
匹配任意单个字符。处理方法是把每个模式串按通配符拆分成多个子串,分别用
AC
匹配,然后用位移验证各子串的相对位置是否正确。时间复杂度退化为
\(O(n \cdot
m_{max})\),但在实际中通配符通常很少,性能仍然可以接受。
近似匹配的 AC:允许模式串有 \(k\) 个编辑距离以内的匹配。这本质上是把 AC 自动机和动态规划结合起来,状态空间变成 \((\text{AC 状态}, \text{编辑距离})\) 的二维网格。在生物信息学中有重要应用。
压缩 AC:对 Trie 中只有单个子节点的长链进行路径压缩(类似 Patricia Trie),减少状态数。在模式串很长且前缀共享少的场景下可以显著节省空间。
并行 AC:利用 SIMD 指令或 GPU 同时处理多个文本,或者对转移表的查表操作进行向量化。Intel 的 Hyperscan 就大量使用了 AVX2 指令来加速 AC 的搜索阶段。
个人观点
我用 AC 自动机做过三个项目:一个入侵检测原型、一个日志分析工具、一个中文文本过滤器。几点体会:
AC 自动机的代码比你想象的简单。核心逻辑不到 100 行。真正复杂的是工程层面的优化——内存布局、缓存友好性、流式处理。如果你的模式串不超过 1000 条,一个朴素的全数组实现就够了,不需要任何花哨的压缩。
不要过早优化转移表。我见过有人在只有 50 条规则的场景下就用双数组 Trie,结果代码复杂度翻了三倍,性能提升可以忽略不计。Profile first,optimize later。
AC 自动机最大的敌人是缓存缺失。在我的测试中,当自动机占用超过 L3 缓存(通常 8-32 MB)时,搜索性能会断崖式下跌——从每字符 1-2 ns 跌到每字符 10-20 ns。这时候你需要的不是更好的算法,而是更小的数据结构。
流式处理是个容易被忽视的需求。大多数教科书的
AC
实现都是一次性扫描整个文本。但在真实系统中,数据是分块到达的(网络数据包、文件块),你必须能够保存和恢复匹配状态。好在
AC 自动机天然支持这一点——状态就是一个整数,保存一个
int 就够了。
如果你只需要判断「是否存在匹配」而不需要找到所有匹配位置,可以大幅简化。去掉输出收集逻辑,只要到达任何输出状态就立即返回
true。这在防火墙场景中很常见——只需要知道「这个包是否包含恶意特征」,不需要知道具体在哪里。
最后一点:AC 自动机是「学一次受益终身」的算法。从 1975 年发表至今,它的核心思想没有任何变化——Trie + BFS 失败指针 + DFA 化——这三步的简洁性和正确性经受住了近半个世纪的工程实践检验。即使在今天 SIMD、GPU、FPGA 各种加速手段层出不穷的时代,绝大多数多模式匹配系统的骨架仍然是 AC 自动机。理解它,就理解了多模式匹配这个领域的基石。
十四、进一步阅读
- Aho, A. V., & Corasick, M. J. (1975). Efficient string matching: an aid to bibliographic search. Communications of the ACM, 18(6), 333-340.
- Wu, S., & Manber, U. (1994). A fast algorithm for multi-pattern searching. Technical Report TR-94-17, University of Arizona.
- Commentz-Walter, B. (1979). A string matching algorithm fast on the average. International Colloquium on Automata, Languages, and Programming.
- Roesch, M. (1999). Snort: Lightweight intrusion detection for networks. USENIX LISA.
- Wang, X., et al. (2006). Hyperscan: A fast multi-pattern regex matcher for modern CPUs. NSDI.
- Tuck, N., et al. (2004). Deterministic memory-efficient string matching algorithms for intrusion detection. IEEE INFOCOM.
- Norton, M. (2004). Optimizing pattern matching for intrusion detection. Sourcefire white paper.
上一篇: 后缀数组 下一篇: BWT 与 FM-index
相关阅读: - SIMD 字符串处理 - Bloom Filter 全家族