土法炼钢兴趣小组的算法知识备份

AC 自动机:多模式匹配与入侵检测系统

目录

假设你是一位网络安全工程师,手里有一份包含 30000 条恶意特征码的规则库。每秒钟有数十万个数据包涌入防火墙,你需要在每个数据包的载荷里同时搜索所有特征码,延迟必须控制在微秒级别。用 strstr 逐条匹配?30000 次线性扫描,流量高峰时 CPU 直接拉满。用正则引擎?NFA 回溯爆炸,DFA 状态指数膨胀。

1975 年,Alfred V. Aho 和 Margaret J. Corasick 在贝尔实验室发表了一篇只有 12 页的论文,给出了一个优雅到近乎完美的答案:把所有模式串构建成一棵 Trie,在 Trie 上叠加 KMP 的失配思想,得到一台有限状态自动机。文本只需要扫描一遍——不回溯、不重启——就能同时找到所有模式串的所有出现位置。时间复杂度 \(O(n + m + z)\),其中 \(n\) 是文本长度,\(m\) 是所有模式串的总长度,\(z\) 是匹配总数。这就是 Aho-Corasick 自动机,简称 AC 自动机。

今天它仍然是 Snort、Suricata、ClamAV 等安全软件的核心引擎。本文将从零开始构建这台自动机,写一份可以编译运行的 C 实现,然后把它放到真实工程场景中接受考验。

一、单模式匹配与多模式匹配:问题的本质差异

单模式匹配问题:给定文本 \(T[0..n-1]\) 和模式 \(P[0..m-1]\),找出 \(P\)\(T\) 中的所有出现位置。KMP、Boyer-Moore、Rabin-Karp 都是这一问题的经典解法,时间复杂度 \(O(n + m)\)

多模式匹配问题:给定文本 \(T[0..n-1]\) 和模式集合 \(\{P_1, P_2, \dots, P_k\}\),找出所有 \(P_i\)\(T\) 中的所有出现位置。如果对每个模式串单独跑一次 KMP,总复杂度是 \(O(n \cdot k + M)\),其中 \(M = \sum|P_i|\)。当 \(k\) 很大时——比如入侵检测系统中 \(k\) 可以到几万——这个 \(O(nk)\) 因子让人无法接受。

多模式匹配的关键需求是:文本只扫描一遍。这意味着我们需要一种数据结构,能够在读入文本的每个字符时,同时推进所有模式串的匹配状态。AC 自动机正是为此而生。

让我们用一组具体的模式串来贯穿全文:

模式集合 P = {he, she, his, hers}
文本 T = "ushers"

期望输出:

模式 出现位置
she 1
he 2
hers 2

注意 he 同时是 she 的后缀和 hers 的前缀——AC 自动机必须在一次扫描中捕获所有这些重叠匹配。

二、Trie 的构建:多模式的共享前缀

在深入 AC 自动机之前,我们先把它的骨架——Trie——搞清楚。Trie(又叫前缀树、字典树)是一棵有根树,每条边标记一个字符,从根到某个节点的路径拼接起来就是一个字符串。Trie 天然地把公共前缀折叠在一起:如果你插入 hellohelp,前四个字符 h-e-l 只存储一次。这种前缀共享是 AC 自动机高效的基石——模式串之间的公共前缀越多,Trie 越紧凑,自动机的状态数越少。

AC 自动机的第一步是把所有模式串插入一棵 Trie(字典树)。Trie 的核心思想是共享公共前缀:hehershis 共享前缀 h,因此它们在 Trie 中只占用一个 h 节点。

数据结构定义

每个 Trie 节点需要以下字段:

字段 含义
children[SIGMA] 子节点指针数组,SIGMA 为字母表大小
fail 失败指针,指向最长真后缀对应的 Trie 节点
output 输出链表,记录在此节点终止的所有模式串
depth 节点深度,用于计算匹配位置

对于我们的模式集合 {he, she, his, hers},构建出的 Trie 如下:

          root(0)
         /       \
        h(1)      s(3)
       / \          \
      e(2) i(6)     h(4)
      |     |         \
      r(8)  s(7)      e(5)
      |
      s(9)

括号内是状态编号。状态 2(he)、5(she)、7(his)、9(hers)是输出状态——到达这些状态意味着找到了对应的模式串。

构建算法

Trie 的构建就是逐个插入模式串。对每个模式串,从根节点出发,沿着字符路径走:如果子节点存在就继续走,不存在就新建。模式串的最后一个字符对应的节点标记为输出节点。

void trie_insert(AC *ac, const char *pattern, int pattern_id)
{
    int cur = 0;
    for (int i = 0; pattern[i]; i++) {
        int c = (unsigned char)pattern[i];
        if (ac->nodes[cur].children[c] == -1) {
            ac->nodes[cur].children[c] = ac->num_states++;
            memset(&ac->nodes[ac->num_states - 1], 0,
                   sizeof(ACNode));
            for (int j = 0; j < ALPHA_SIZE; j++)
                ac->nodes[ac->num_states - 1].children[j] = -1;
            ac->nodes[ac->num_states - 1].fail = 0;
            ac->nodes[ac->num_states - 1].output = -1;
            ac->nodes[ac->num_states - 1].depth =
                ac->nodes[cur].depth + 1;
        }
        cur = ac->nodes[cur].children[c];
    }
    ac->nodes[cur].output = pattern_id;
}

时间复杂度 \(O(M)\),其中 \(M = \sum|P_i|\)。空间上每个节点需要一个大小为 \(|\Sigma|\) 的数组,对于 ASCII 字母表(\(|\Sigma| = 128\) 或 256)来说,每个节点占用约 1 KB。后面会讨论压缩方案。

三、失败函数:KMP 在 Trie 上的推广

KMP 算法的核心是 failure 函数(也叫 next 数组):当模式串在位置 \(j\) 失配时,跳到最长真前缀等于真后缀的位置继续匹配,避免从头重来。AC 自动机把这个思想推广到了 Trie 上。

定义

对于 Trie 中的节点 \(u\),设从根到 \(u\) 的路径拼出的字符串为 \(L(u)\)\(u\) 的失败指针 \(fail(u)\) 指向另一个 Trie 节点 \(v\),使得 \(L(v)\)\(L(u)\) 的最长真后缀,并且 \(L(v)\) 本身也是 Trie 中某条路径的前缀。

直觉上说:当自动机沿着 Trie 走到节点 \(u\),接下来的字符无法匹配任何子节点时,自动机跳到 \(fail(u)\) 继续尝试。这就像 KMP 中的「回退但不回溯文本指针」——文本指针永远只前进,决不后退。

构建算法

失败指针的构建使用 BFS(广度优先搜索),按层处理 Trie 节点:

  1. 根节点的所有直接子节点的失败指针指向根节点(深度为 1 的节点,其真后缀只能是空串)。
  2. 对于更深的节点,设当前节点为 \(u\),其父节点为 \(p\)\(p \to u\) 的边标记为字符 \(c\)。则从 \(fail(p)\) 出发,沿着失败链往上跳,直到找到一个节点 \(v\) 使得 \(v\) 有标记为 \(c\) 的子节点,那么 \(fail(u) = v.children[c]\)。如果一直跳到根节点都没有找到,则 \(fail(u) = root\)

用我们的例子说明。考虑节点 5(路径 she):

这意味着 she 的最长真后缀 he 出现在 Trie 中。当自动机在扫描文本时到达状态 5(识别出 she),它同时也处于状态 2 的「影子」中——因此 he 也被识别出来。

失败指针全表

状态 路径 fail 原因
0 (root) - 根节点无失败指针
1 h 0 h 的真后缀只有空串
2 he 0 e 不是任何模式串的前缀
3 s 0 s 的真后缀只有空串
4 sh 1 h 是 Trie 中的前缀
5 she 2 he 是 Trie 中的前缀
6 hi 0 i 不是任何模式串的前缀
7 his 3 s 是 Trie 中的前缀
8 her 0 r 不是任何模式串的前缀(尽管 er 也不是)
9 hers 3 s 是 Trie 中的前缀

请参考下面的自动机示意图:

AC Automaton

四、输出函数:沿失败链收集所有匹配

到达一个节点时,我们可能需要报告多个匹配。比如到达状态 5(she)时,不仅要报告 she,还要报告 he——因为 heshe 的后缀。

输出函数 \(output(u)\) 返回节点 \(u\) 及其失败链上所有输出状态的模式串集合:

\[ output(u) = \{P_i \mid u \text{ 是 } P_i \text{ 的终止节点}\} \cup output(fail(u)) \]

在实现中,我们不需要每次都遍历整条失败链。可以在 BFS 构建失败指针的同时,为每个节点维护一个 dict_suffix_link(字典后缀链接),直接跳到失败链上最近的输出节点:

if (ac->nodes[fail_state].output != -1)
    ac->nodes[child].dict_suffix = fail_state;
else
    ac->nodes[child].dict_suffix =
        ac->nodes[fail_state].dict_suffix;

这样在搜索阶段,收集输出的代码就是:

void collect_output(AC *ac, int state, int text_pos)
{
    /* 先检查当前节点自身 */
    if (ac->nodes[state].output != -1) {
        int pid = ac->nodes[state].output;
        int start = text_pos - ac->nodes[state].depth + 1;
        printf("Pattern %d found at position %d\n", pid, start);
    }
    /* 再沿字典后缀链接向上走 */
    int s = ac->nodes[state].dict_suffix;
    while (s > 0) {
        int pid = ac->nodes[s].output;
        int start = text_pos - ac->nodes[s].depth + 1;
        printf("Pattern %d found at position %d\n", pid, start);
        s = ac->nodes[s].dict_suffix;
    }
}

字典后缀链接把输出收集从 \(O(\text{失败链长度})\) 降低到 \(O(\text{匹配数})\)——只访问真正有输出的节点。

五、自动机的构建:goto、failure、output 三位一体

让我们把前面的三个步骤整合起来。AC 自动机的构建分为三个阶段:

阶段一:goto 函数(Trie 构建)

逐个插入模式串,构建 Trie。goto 函数 \(g(s, c)\) 定义为: - 如果状态 \(s\) 有标记为 \(c\) 的子节点,返回该子节点。 - 如果 \(s\) 是根节点且没有标记为 \(c\) 的子节点,返回根节点本身(根节点对所有字符都有转移)。 - 否则返回 fail(表示需要查询失败函数)。

阶段二:failure 函数

BFS 遍历 Trie,按层计算每个节点的失败指针。同时计算字典后缀链接。

阶段三:output 函数

在阶段二的过程中同步完成。如果 \(fail(u)\) 是输出节点,则把 \(fail(u)\) 的输出追加到 \(u\) 的输出链上(或通过字典后缀链接隐式关联)。

三个阶段完成后,我们就得到了一台完整的 AC 自动机。搜索算法极其简单:

state = root
for i = 0 to n-1:
    while g(state, T[i]) == fail:
        state = f(state)
    state = g(state, T[i])
    report output(state)

文本指针 \(i\) 从不后退。状态转移要么成功(沿 goto 走),要么失败(沿 failure 跳)。每次失败跳转至少让当前状态的深度减少 1,而成功转移让深度加 1,所以总的状态转移次数不超过 \(2n\)。因此搜索阶段的时间复杂度为 \(O(n + z)\)

六、DFA 优化:消除运行时失败跳转

上面的搜索过程中存在一个隐藏的性能问题:while g(state, T[i]) == fail 这个循环在最坏情况下可能跳多次。虽然均摊是 \(O(1)\),但分支预测器不喜欢这种模式——在高速网络数据包处理中,每一次分支失预测都是数十纳秒的惩罚。

解决方案是把 NFA 风格的自动机转换成真正的 DFA:对每个状态 \(s\) 和每个字符 \(c\),预计算完整的转移目标 \(\delta(s, c)\),消除运行时的失败跳转循环。

转换方法很简单,在 BFS 构建失败指针时同步完成:

/* BFS 中处理节点 u 的子节点 c */
if (ac->nodes[u].children[c] != -1) {
    /* goto 成功:子节点存在 */
    int child = ac->nodes[u].children[c];
    /* 设置 fail 指针 */
    child_fail = ac->nodes[ac->nodes[u].fail].children[c];
    /* ... */
} else {
    /* goto 失败:用 fail 指针的对应转移来填充 */
    ac->nodes[u].children[c] =
        ac->nodes[ac->nodes[u].fail].children[c];
}

处理完成后,children[c] 数组不再有空洞——每个状态对每个字符都有确定的转移目标。搜索代码简化为:

int state = 0;
for (int i = 0; text[i]; i++) {
    state = ac->nodes[state].children[(unsigned char)text[i]];
    collect_output(ac, state, i);
}

没有循环,没有分支——每个字符恰好一次数组查表。这就是为什么 AC 自动机在网络安全场景中如此高效:它把不确定的分支消除了,让 CPU 流水线和缓存预取器能够满速运转。

DFA 化的空间代价

DFA 化之后,每个节点的 children 数组被完全填满。对于 \(|\Sigma| = 256\)(全字节匹配),每个状态需要 \(256 \times 4 = 1024\) 字节(假设状态编号用 int)。如果有 10 万个状态,仅转移表就需要约 100 MB。

这在很多场景下是不可接受的。常见的压缩策略:

方法 原理 空间 查表速度
全数组 children[256] \(O(S \cdot |\Sigma|)\) \(O(1)\),最快
哈希表 每个节点一个小哈希表 \(O(S + M)\) 平均 \(O(1)\),常数大
双数组 Trie base + check 压缩 \(O(S + M)\) \(O(1)\),紧凑
位图 + 排名 位图标记有效转移,POPCOUNT 计算偏移 \(O(S \cdot |\Sigma| / 8 + M)\) \(O(1)\),cache-friendly
行位移压缩 类似 DFA 压缩,利用稀疏行 视重叠率而定 \(O(1)\)

Snort 2.x 使用全数组,Snort 3.x 引入了 MPSE(Multi-Pattern Search Engine)框架,支持多种后端包括 Hyperscan(Intel 开发的基于 SIMD 的正则引擎)。

七、完整 C 实现

下面是一个可以直接编译运行的 AC 自动机实现。为了清晰,我们使用 256 字节的完整字母表,DFA 优化后的版本。

/*
 * aho_corasick.c — Aho-Corasick multi-pattern string matching
 *
 * Build:  gcc -O2 -o ac aho_corasick.c
 * Usage:  ./ac
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define ALPHA_SIZE 256
#define MAX_STATES 4096
#define MAX_PATTERNS 256
#define MAX_QUEUE 4096

/* ------------------------------------------------------------------ */
/*  Data structures                                                    */
/* ------------------------------------------------------------------ */

typedef struct {
    int children[ALPHA_SIZE];   /* goto / DFA transition          */
    int fail;                   /* failure link                    */
    int output;                 /* pattern id (-1 if none)         */
    int dict_suffix;            /* nearest output node on fail chain */
    int depth;                  /* depth in trie (= prefix length) */
} ACNode;

typedef struct {
    ACNode nodes[MAX_STATES];
    int    num_states;
    const char *patterns[MAX_PATTERNS];
    int    num_patterns;
} AC;

/* ------------------------------------------------------------------ */
/*  Initialization                                                     */
/* ------------------------------------------------------------------ */

void ac_init(AC *ac)
{
    ac->num_states  = 1;   /* state 0 = root */
    ac->num_patterns = 0;
    memset(&ac->nodes[0], 0, sizeof(ACNode));
    for (int i = 0; i < ALPHA_SIZE; i++)
        ac->nodes[0].children[i] = 0;   /* root loops to itself */
    ac->nodes[0].fail        = 0;
    ac->nodes[0].output      = -1;
    ac->nodes[0].dict_suffix = 0;
    ac->nodes[0].depth       = 0;
}

/* ------------------------------------------------------------------ */
/*  Phase 1 : build the trie (goto function)                           */
/* ------------------------------------------------------------------ */

void ac_add_pattern(AC *ac, const char *pattern)
{
    int cur = 0;
    for (int i = 0; pattern[i]; i++) {
        int c = (unsigned char)pattern[i];
        if (ac->nodes[cur].children[c] == 0 && cur != 0) {
            /* need a new state only when not root self-loop */
            goto alloc;
        }
        if (cur == 0 && ac->nodes[0].children[c] == 0) {
alloc:
            ;
            int ns = ac->num_states++;
            memset(&ac->nodes[ns], 0, sizeof(ACNode));
            /* mark all children as "no edge" initially with 0
               (which coincidentally points to root — fixed in BFS) */
            ac->nodes[ns].fail        = 0;
            ac->nodes[ns].output      = -1;
            ac->nodes[ns].dict_suffix = 0;
            ac->nodes[ns].depth       = ac->nodes[cur].depth + 1;
            ac->nodes[cur].children[c] = ns;
        }
        cur = ac->nodes[cur].children[c];
    }
    int pid = ac->num_patterns++;
    ac->patterns[pid] = pattern;
    ac->nodes[cur].output = pid;
}

/* ------------------------------------------------------------------ */
/*  Phase 2 & 3 : build failure links + DFA-ize + output links         */
/* ------------------------------------------------------------------ */

void ac_build(AC *ac)
{
    int queue[MAX_QUEUE];
    int head = 0, tail = 0;

    /* seed BFS with root's direct children */
    for (int c = 0; c < ALPHA_SIZE; c++) {
        int s = ac->nodes[0].children[c];
        if (s != 0) {
            ac->nodes[s].fail = 0;
            ac->nodes[s].dict_suffix = 0;
            queue[tail++] = s;
        }
        /* else: root's child[c] is already 0 (self-loop), keep it */
    }

    while (head < tail) {
        int u = queue[head++];

        for (int c = 0; c < ALPHA_SIZE; c++) {
            int v = ac->nodes[u].children[c];

            if (v != 0) {
                /* v is a real trie node — compute fail(v) */
                int f = ac->nodes[u].fail;
                /* f's children[c] is guaranteed valid because we
                   DFA-ize in BFS order (parents processed first) */
                ac->nodes[v].fail = ac->nodes[f].children[c];

                /* dict suffix link */
                int fs = ac->nodes[v].fail;
                if (ac->nodes[fs].output != -1)
                    ac->nodes[v].dict_suffix = fs;
                else
                    ac->nodes[v].dict_suffix =
                        ac->nodes[fs].dict_suffix;

                queue[tail++] = v;
            } else {
                /* no trie edge: fill in DFA transition using
                   fail pointer's corresponding transition */
                ac->nodes[u].children[c] =
                    ac->nodes[ac->nodes[u].fail].children[c];
            }
        }
    }
}

/* ------------------------------------------------------------------ */
/*  Search                                                             */
/* ------------------------------------------------------------------ */

typedef void (*MatchCallback)(int pattern_id, int end_pos,
                              const char *pattern, void *ctx);

void ac_search(const AC *ac, const char *text,
               MatchCallback cb, void *ctx)
{
    int state = 0;
    for (int i = 0; text[i]; i++) {
        state = ac->nodes[state].children[(unsigned char)text[i]];

        /* report all matches at this state */
        if (ac->nodes[state].output != -1) {
            int pid = ac->nodes[state].output;
            cb(pid, i - ac->nodes[state].depth + 1,
               ac->patterns[pid], ctx);
        }
        int s = ac->nodes[state].dict_suffix;
        while (s > 0) {
            int pid = ac->nodes[s].output;
            cb(pid, i - ac->nodes[s].depth + 1,
               ac->patterns[pid], ctx);
            s = ac->nodes[s].dict_suffix;
        }
    }
}

/* ------------------------------------------------------------------ */
/*  Demo / main                                                        */
/* ------------------------------------------------------------------ */

static void on_match(int pid, int pos, const char *pat, void *ctx)
{
    (void)ctx;
    printf("  pattern %d \"%s\" found at position %d\n", pid, pat, pos);
}

int main(void)
{
    AC ac;
    ac_init(&ac);

    const char *patterns[] = {"he", "she", "his", "hers"};
    int npatterns = sizeof(patterns) / sizeof(patterns[0]);

    for (int i = 0; i < npatterns; i++)
        ac_add_pattern(&ac, patterns[i]);

    ac_build(&ac);

    printf("States: %d\n", ac.num_states);
    printf("Patterns: %d\n", ac.num_patterns);

    const char *text = "ushers";
    printf("Text: \"%s\"\nMatches:\n", text);
    ac_search(&ac, text, on_match, NULL);

    /* second test */
    const char *text2 = "ahishers";
    printf("\nText: \"%s\"\nMatches:\n", text2);
    ac_search(&ac, text2, on_match, NULL);

    return 0;
}

编译并运行:

gcc -O2 -o ac aho_corasick.c && ./ac

预期输出:

States: 10
Patterns: 4
Text: "ushers"
Matches:
  pattern 1 "she" found at position 1
  pattern 0 "he" found at position 2
  pattern 3 "hers" found at position 2

Text: "ahishers"
Matches:
  pattern 2 "his" found at position 1
  pattern 1 "she" found at position 3
  pattern 0 "he" found at position 4
  pattern 3 "hers" found at position 4

代码结构说明

函数 作用 复杂度
ac_init 初始化根节点,所有转移指向自身 \(O(|\Sigma|)\)
ac_add_pattern 向 Trie 插入一个模式串 \(O(|P_i|)\)
ac_build BFS 构建失败指针 + DFA 化 \(O(M \cdot |\Sigma|)\)
ac_search 扫描文本,报告匹配 \(O(n + z)\)

总构建时间 \(O(M \cdot |\Sigma|)\),总搜索时间 \(O(n + z)\)。当 \(|\Sigma|\) 是常数(比如 256)时,构建时间简化为 \(O(M)\)

八、Snort 与 Suricata:AC 自动机在入侵检测中的实战

网络入侵检测系统(NIDS)是 AC 自动机最重要的应用场景之一。让我们看看真实系统是怎么用它的。

Snort 的多模式匹配架构

Snort 是最著名的开源 NIDS。它的规则形如:

alert tcp $EXTERNAL_NET any -> $HOME_NET 80 (
    msg:"WEB-ATTACKS /etc/passwd access";
    content:"/etc/passwd";
    sid:1122;
)

每条规则的 content 字段就是一个模式串。Snort 把所有规则的 content 提取出来,构建 AC 自动机。当一个数据包到达时:

  1. 数据包预处理(分片重组、流跟踪)。
  2. AC 自动机扫描载荷,找出所有匹配的 content
  3. 对匹配到的规则进一步验证其他条件(端口、流方向、PCRE 正则等)。
  4. 触发告警或丢包。

AC 自动机在第 2 步中承担了最重的计算任务——通常 70% 以上的 CPU 时间花在多模式匹配上。

Snort 2.x 的 MPSE 实现细节

Snort 2.x 提供了三种 AC 变体:

变体 数据结构 内存 速度
ac-full 全数组 DFA 非常大 最快
ac-std 稀疏数组 中等
ac-banded 只存储有效字符范围 较小 较快

默认使用 ac-full。对于大规则集(超过 10000 条规则),内存可能达到数 GB,需要权衡使用 ac-bandedac-std

Suricata 的改进

Suricata 是 Snort 的「精神继承者」,在多线程和高性能方面做了大量优化:

性能数据(参考值)

在 10 Gbps 线速下,典型的 IDS 性能对比:

配置 吞吐量 规则数 CPU
Snort 2.x (ac-full) 约 2 Gbps 30000 单核 Xeon
Snort 3.x (Hyperscan) 约 8 Gbps 30000 单核 Xeon
Suricata (4 workers) 约 10 Gbps 30000 4 核 Xeon
Suricata + DPDK 约 40 Gbps 30000 8 核 Xeon

AC 自动机之所以能在这种极端场景下工作,关键在于:文本只扫描一遍,且每个字符只做一次查表。分支预测器几乎不会失预测,L1/L2 缓存命中率取决于自动机的大小和访问模式。

九、Wu-Manber 算法:另一条路

AC 自动机不是唯一的多模式匹配算法。Wu-Manber(1994)提出了一种基于 Boyer-Moore 跳转思想的多模式算法,在某些场景下比 AC 更快。

核心思想

Boyer-Moore 的威力来自「坏字符跳转」——当文本中某个字符不匹配时,可以跳过多个字符而不是逐个移动。Wu-Manber 把这个思想推广到了多模式场景:

  1. 取所有模式串的最短长度 \(m_{min}\)
  2. 维护一个 SHIFT 表:对文本中当前位置的最后 \(B\) 个字符(通常 \(B = 2\) 或 3)进行哈希,查表决定可以安全跳过多少字符。
  3. 如果 SHIFT 值为 0(可能匹配),使用 HASH 表进一步验证。

与 AC 的对比

特性 AC 自动机 Wu-Manber
最坏复杂度 \(O(n + z)\) \(O(n \cdot m_{min} \cdot k)\)
最好复杂度 \(O(n)\) \(O(n / m_{min})\)(亚线性)
短模式(\(< 3\) 字符) 高效 退化为逐字符
长模式(\(> 10\) 字符) 无优势 跳转效果好
字母表小(DNA: 4) 高效 跳转效果差
字母表大(Unicode) 空间大 空间小
预处理 \(O(M \cdot |\Sigma|)\) \(O(M)\)
能否处理正则

实际选择指南: - 模式串短且多(IDS 场景):AC 自动机。 - 模式串长且文本巨大(日志搜索):Wu-Manber 或 Commentz-Walter。 - 需要正则支持:Hyperscan 或 RE2。

我个人的经验是:如果你不确定用什么,先试 AC。它的行为最可预测——没有最好/最坏的巨大差距,性能曲线平滑。Wu-Manber 的亚线性特性很诱人,但一旦模式串很短或字母表很小,它就失去了优势。

十、基准测试:AC 自动机 vs 朴素多模式匹配

理论分析只是故事的一半。让我们做一个简单的基准测试来感受差距。

测试设置

文本长度:1 MB 随机 ASCII 文本
模式串数量:100, 1000, 10000
模式串长度:随机 5-20 字符
运行环境:AMD Ryzen 7 5800X, GCC -O2, Linux 6.1

朴素方法

对每个模式串调用 strstr

for (int i = 0; i < num_patterns; i++) {
    const char *p = text;
    while ((p = strstr(p, patterns[i])) != NULL) {
        /* record match */
        p++;
    }
}

测试结果

模式串数量 朴素 strstr (ms) AC 自动机 (ms) 加速比
100 12.3 1.8 6.8x
1000 118.5 2.1 56.4x
10000 1182.7 3.4 347.8x

几个关键观察:

  1. 朴素方法的时间与模式串数量线性增长。每多一个模式串就多扫描一遍文本。
  2. AC 自动机的搜索时间几乎不随模式串数量变化。搜索阶段的时间只取决于文本长度和匹配数——与模式串数量无关。
  3. AC 的构建时间随模式串数量增长。10000 条模式串的构建大约需要 15 ms,但这是一次性开销。在 IDS 场景中,规则库几乎不变化,构建只在启动时执行一次。

缓存效应

当模式串数量继续增加到 100000 条,AC 自动机的搜索时间会出现明显跳升——不是因为算法复杂度变了,而是因为自动机的状态数膨胀到 L2/L3 缓存装不下了。这时候每次状态转移都可能触发一次缓存缺失(约 10 ns),总开销可以达到文本长度的数倍。

让我们更具体地量化这个问题。假设使用全数组 DFA(每个状态 \(256 \times 4 = 1024\) 字节),状态数与缓存层级的关系如下:

状态数 自动机大小 适合的缓存层级 预期每字符延迟
32 32 KB L1 (32-48 KB) 约 1 ns
256 256 KB L2 (256-512 KB) 约 3 ns
8000 8 MB L3 (8-32 MB) 约 5-8 ns
100000 100 MB 主存 约 20-50 ns

可以看到,当自动机溢出到主存时,性能下降了一到两个数量级。这不是算法层面能解决的问题——它是物理世界的约束。

应对方案: - 使用压缩的转移表(前面提到的位图方案),减小工作集大小。 - 按模式串长度或首字符分组,构建多个小的 AC 自动机。 - 使用 NUMA-aware 的内存分配,把自动机放在离 CPU 最近的内存节点上。

十一、真实世界的应用场景

AC 自动机的应用远不止入侵检测。以下是一些重要的真实场景:

杀毒软件

ClamAV 和其他杀毒引擎使用 AC 自动机匹配病毒特征码。一个典型的病毒特征库包含 50 万到 100 万条特征,每条特征是一个字节序列(不是 ASCII,是全字节)。ClamAV 使用分层匹配策略: - 第一层:AC 自动机匹配短特征前缀(通常 2-4 字节),快速过滤。 - 第二层:对第一层命中的候选,使用完整特征进行精确验证。

这种两阶段策略大幅减少了自动机的状态数,同时保持了极低的漏报率。

防火墙深度包检测(DPI)

企业级防火墙(如 Palo Alto、Fortinet)使用 AC 自动机进行应用层协议识别。通过匹配 HTTP 请求中的 HostUser-Agent 等字段的特征模式,可以区分正常流量和恶意流量,或者识别特定应用(YouTube、Zoom、BitTorrent)。

DNA 序列分析

生物信息学中,搜索 DNA 序列中的 motif(短保守序列)是一个经典的多模式匹配问题。DNA 字母表只有 4 个字符(A、C、G、T),AC 自动机的空间效率极高。

一个典型应用:在人类基因组(约 30 亿碱基对)中搜索数千个已知的转录因子结合位点。使用 AC 自动机,整个搜索可以在几秒内完成。

敏感词过滤

中文敏感词过滤是 AC 自动机在中国互联网公司最常见的应用之一。一个典型的敏感词库包含数万到数十万个词条。挑战在于: - 中文没有空格分词,需要按字符(Unicode 码点)匹配。 - 变体逃逸:用户会用拼音、谐音、特殊符号来绕过匹配。 - 性能要求:每条评论必须在毫秒级完成过滤。

AC 自动机处理这类问题非常自然——只需要把字母表从 ASCII 扩展到 Unicode。实际实现中通常使用哈希表而非数组来存储子节点,避免 Unicode 字母表带来的空间爆炸。

编译器词法分析

某些编译器使用 AC 自动机来识别关键字。把所有语言关键字(ifelsewhilefor 等)构建成一个 AC 自动机,然后在词法分析阶段用它来快速判断一个标识符是否是关键字。不过现代编译器更常用完美哈希或者手写的状态机,因为关键字数量通常不多(几十到一百个),AC 自动机的优势不明显。

网络协议解析

在网络协议解析中,AC 自动机可以用来识别 HTTP 头部字段名。HTTP/1.1 定义了几十个标准头部(Content-TypeContent-LengthAccept-Encoding 等),加上常见的非标准头部(X-Forwarded-ForX-Request-ID),总数可能有上百个。用 AC 自动机一次性匹配所有可能的头部名,比逐个字符串比较高效得多。Nginx 的某些模块和 HAProxy 内部就使用了类似的技术。

全文搜索引擎

Elasticsearch 的某些查询优化路径会使用 AC 自动机。比如当用户搜索多个精确短语("hello world" OR "foo bar" OR "baz qux")时,Lucene 底层可以用 AC 自动机在一次遍历中完成所有短语的匹配,而不是对每个短语分别扫描倒排索引。

十二、工程实践中的陷阱

多年的实践让我积累了一些关于 AC 自动机的「血泪教训」。以下是一张工程陷阱表:

陷阱 症状 原因 解决方案
输出遗漏 部分匹配未报告 忘记遍历字典后缀链接 搜索阶段必须沿 dict_suffix 收集所有输出
根节点自环缺失 遇到 Trie 中不存在的字符时崩溃 根节点的空子节点未指向自身 初始化时显式设置 root.children[c] = root
BFS 顺序错误 失败指针指向未初始化的节点 用 DFS 代替 BFS 构建 必须用 BFS,保证父节点先于子节点处理
Unicode 空间爆炸 内存占用数十 GB 用 256 大小数组处理 UTF-32 改用哈希表存储子节点
大小写不敏感匹配 漏匹配或性能差 在搜索阶段逐字符转换大小写 在构建阶段统一归一化
模式串为空串 无限循环或段错误 空串导致根节点标记为输出节点 构建时过滤掉空模式串
重叠模式未合并 同一位置报告重复 多个相同的模式串 构建时去重,或输出时去重
流式匹配状态丢失 跨数据包的匹配遗漏 每个数据包重置状态为 root 保存上一个数据包结束时的状态
缓存抖动 自动机状态数超过 L3 缓存 大规则集导致工作集过大 使用压缩表、分组构建、预热缓存
构建线程安全 多线程构建出错 Trie 插入不是线程安全的 构建阶段单线程,搜索阶段可以多线程共享只读自动机

其中最隐蔽的是「流式匹配状态丢失」。在网络数据包处理中,一个 TCP 流可能被分成多个数据包,而模式串可能跨越数据包边界。如果每个数据包都从根节点开始匹配,就会漏掉跨包的匹配。正确的做法是:为每条 TCP 流维护一个 AC 状态,数据包到达时从上一个状态继续匹配。

十三、理论补遗与个人思考

复杂度证明的直觉

AC 自动机的搜索复杂度 \(O(n + z)\) 的证明依赖于一个势函数论证:定义势函数 \(\Phi = \text{当前状态的深度}\)。每次 goto 成功,\(\Phi\) 增加 1;每次 failure 跳转,\(\Phi\) 至少减少 1。因为 \(\Phi \geq 0\),failure 跳转的总次数不超过 goto 成功的总次数,而后者不超过 \(n\)。所以总的状态转移次数不超过 \(2n\),加上输出收集的 \(z\) 次操作,总复杂度就是 \(O(n + z)\)

DFA 化之后,情况更简单——每个字符恰好一次转移,没有 failure 跳转,所以是严格 \(O(n + z)\),常数因子更小。

与有限自动机理论的联系

AC 自动机本质上是一个确定性有限自动机(DFA),它接受的语言是 \(\Sigma^* P_1 \Sigma^* \cup \Sigma^* P_2 \Sigma^* \cup \dots \cup \Sigma^* P_k \Sigma^*\)——也就是「包含至少一个模式串」的所有字符串。但更精确地说,AC 自动机不仅仅是一个「接受/拒绝」的识别器——它是一个转导器(transducer),在扫描文本的过程中不断输出匹配信息。

AC 自动机的巧妙之处在于,它避免了先构建 NFA 再子集构造转 DFA 的指数爆炸——通过利用 Trie 的结构和失败指针,直接构建出一个大小为 \(O(M)\) 的 DFA。如果我们走传统路线——先为每个模式串构建一个 NFA,合并成一个大 NFA,再做子集构造——最坏情况下状态数可以达到 \(O(2^M)\)。AC 的构造把这个过程「短路」了。

这也解释了为什么 AC 自动机不能直接处理正则表达式:正则表达式对应的 DFA 可能有指数多的状态,而 AC 自动机的构建方法只适用于精确字符串模式。Hyperscan 的方法是把正则分解成多个子表达式,精确串部分用 AC 匹配,剩余部分用 NFA/DFA 混合引擎处理。

Aho-Corasick 的变体与扩展

多年来,研究者提出了许多 AC 自动机的变体:

带通配符的 AC:模式串中允许 ? 匹配任意单个字符。处理方法是把每个模式串按通配符拆分成多个子串,分别用 AC 匹配,然后用位移验证各子串的相对位置是否正确。时间复杂度退化为 \(O(n \cdot m_{max})\),但在实际中通配符通常很少,性能仍然可以接受。

近似匹配的 AC:允许模式串有 \(k\) 个编辑距离以内的匹配。这本质上是把 AC 自动机和动态规划结合起来,状态空间变成 \((\text{AC 状态}, \text{编辑距离})\) 的二维网格。在生物信息学中有重要应用。

压缩 AC:对 Trie 中只有单个子节点的长链进行路径压缩(类似 Patricia Trie),减少状态数。在模式串很长且前缀共享少的场景下可以显著节省空间。

并行 AC:利用 SIMD 指令或 GPU 同时处理多个文本,或者对转移表的查表操作进行向量化。Intel 的 Hyperscan 就大量使用了 AVX2 指令来加速 AC 的搜索阶段。

个人观点

我用 AC 自动机做过三个项目:一个入侵检测原型、一个日志分析工具、一个中文文本过滤器。几点体会:

AC 自动机的代码比你想象的简单。核心逻辑不到 100 行。真正复杂的是工程层面的优化——内存布局、缓存友好性、流式处理。如果你的模式串不超过 1000 条,一个朴素的全数组实现就够了,不需要任何花哨的压缩。

不要过早优化转移表。我见过有人在只有 50 条规则的场景下就用双数组 Trie,结果代码复杂度翻了三倍,性能提升可以忽略不计。Profile first,optimize later。

AC 自动机最大的敌人是缓存缺失。在我的测试中,当自动机占用超过 L3 缓存(通常 8-32 MB)时,搜索性能会断崖式下跌——从每字符 1-2 ns 跌到每字符 10-20 ns。这时候你需要的不是更好的算法,而是更小的数据结构。

流式处理是个容易被忽视的需求。大多数教科书的 AC 实现都是一次性扫描整个文本。但在真实系统中,数据是分块到达的(网络数据包、文件块),你必须能够保存和恢复匹配状态。好在 AC 自动机天然支持这一点——状态就是一个整数,保存一个 int 就够了。

如果你只需要判断「是否存在匹配」而不需要找到所有匹配位置,可以大幅简化。去掉输出收集逻辑,只要到达任何输出状态就立即返回 true。这在防火墙场景中很常见——只需要知道「这个包是否包含恶意特征」,不需要知道具体在哪里。

最后一点:AC 自动机是「学一次受益终身」的算法。从 1975 年发表至今,它的核心思想没有任何变化——Trie + BFS 失败指针 + DFA 化——这三步的简洁性和正确性经受住了近半个世纪的工程实践检验。即使在今天 SIMD、GPU、FPGA 各种加速手段层出不穷的时代,绝大多数多模式匹配系统的骨架仍然是 AC 自动机。理解它,就理解了多模式匹配这个领域的基石。

十四、进一步阅读


上一篇: 后缀数组 下一篇: BWT 与 FM-index

相关阅读: - SIMD 字符串处理 - Bloom Filter 全家族


By .