土法炼钢兴趣小组的算法知识备份

SIMD 字符串处理进阶

目录

字符串处理是几乎所有后端系统的性能瓶颈之一。JSON 解析、日志扫描、CSV 导入、Base64 编解码——这些操作的共同特点是:逐字节遍历一段内存,对每个字节做分类或变换。标量代码一次处理一个字节;而现代 CPU 的 SIMD 寄存器宽达 128 到 512 位,一次可以并行处理 16 到 64 个字节。把字符串操作搬到 SIMD 上,10 倍性能提升并非夸张。

本文从最基础的指令集介绍开始,逐步深入到工业级项目(simdjson、Hyperscan)的核心算法,最后给出完整的 C 实现和工程经验总结。全文约一万五千字,建议配合编译器和 Godbolt 边读边练。

一、SIMD 指令集速览

SIMD(Single Instruction, Multiple Data)指令让 CPU 在一条指令内对多个数据元素执行相同操作。对字符串处理而言,最重要的三组指令集如下。

1.1 SSE2(x86, 128-bit)

SSE2 自 2001 年 Pentium 4 起成为 x86 基线。核心寄存器 xmm0xmm15,每个 128 位,可容纳 16 个 uint8_t

常用指令:

指令 含义 延迟/吞吐
_mm_loadu_si128 非对齐加载 128 位 1c / 0.5c
_mm_cmpeq_epi8 逐字节相等比较 1c / 0.5c
_mm_movemask_epi8 提取每字节最高位为 16-bit 掩码 1c / 1c
_mm_or_si128 按位或 1c / 0.33c
_mm_shuffle_epi8 按索引重排字节(SSSE3) 1c / 0.5c

其中 _mm_cmpeq_epi8 + _mm_movemask_epi8 是字符串扫描的万金油组合:先比较,再把比较结果压缩成一个标量整数掩码,后续用 ctz(count trailing zeros)逐个提取匹配位置。

1.2 AVX2(x86, 256-bit)

AVX2 自 Haswell(2013)起可用,寄存器 ymm0ymm15 宽 256 位,一次处理 32 字节。大部分 SSE2 指令都有 256 位版本,只需把 _mm_ 前缀换成 _mm256_

需要注意 AVX2 的 256 位操作在内部被分为两个 128 位 lane,_mm256_shuffle_epi8 只在 lane 内重排,跨 lane 需要额外的 _mm256_permute2x128_si256_mm256_permutevar8x32_epi32

1.3 ARM NEON(ARM, 128-bit)

NEON 在 ARMv7 及 AArch64 上可用,寄存器 v0v31 宽 128 位。核心指令:

// NEON 等价操作
uint8x16_t chunk = vld1q_u8(ptr);          // 加载
uint8x16_t cmp   = vceqq_u8(chunk, target); // 比较
// NEON 没有 movemask,需要手动缩位
uint8x8_t  narrowed = vshrn_n_u16(vreinterpretq_u16_u8(cmp), 4);
uint64_t   mask     = vget_lane_u64(vreinterpret_u64_u8(narrowed), 0);

NEON 缺少 movemask 是最大痛点,需要 4–5 条指令模拟。AArch64 的 SVE2 引入了 predicate 寄存器,情况大为改善,但普及率尚低。

1.4 指令集选择策略

实际项目中常见做法是运行时分发(runtime dispatch):

typedef size_t (*strlen_fn)(const char *);

strlen_fn resolve_strlen(void) {
    if (has_avx2())  return strlen_avx2;
    if (has_sse42()) return strlen_sse42;
    return strlen_scalar;
}

size_t my_strlen(const char *s)
    __attribute__((ifunc("resolve_strlen")));

glibc 的 strlenmemchrmemcmp 全部采用这种模式。编译时用 -march=native-mavx2 启用对应指令。

二、字节搜索:_mm_cmpeq_epi8 的基本范式

字符串处理中最原子的操作是在一段内存中查找某个字节。标量版:

const char *scalar_memchr(const char *s, char c, size_t n) {
    for (size_t i = 0; i < n; i++) {
        if (s[i] == c) return s + i;
    }
    return NULL;
}

一次一个字节,循环体内有分支。SIMD 版:

#include <immintrin.h>
#include <stdint.h>
#include <stddef.h>

const char *simd_memchr_sse2(const char *s, char c, size_t n) {
    __m128i target = _mm_set1_epi8(c);
    size_t i = 0;

    // 主循环:每次 16 字节
    for (; i + 16 <= n; i += 16) {
        __m128i chunk = _mm_loadu_si128((const __m128i *)(s + i));
        __m128i cmp   = _mm_cmpeq_epi8(chunk, target);
        int mask      = _mm_movemask_epi8(cmp);
        if (mask != 0) {
            return s + i + __builtin_ctz(mask);
        }
    }

    // 尾部标量处理
    for (; i < n; i++) {
        if (s[i] == c) return s + i;
    }
    return NULL;
}

核心三步:

  1. 广播目标字节到所有 lane:_mm_set1_epi8(c)
  2. 并行比较 16 字节:_mm_cmpeq_epi8,匹配的 lane 置 0xFF
  3. 压缩为掩码_mm_movemask_epi8 提取每 lane 最高位,组成 16-bit 整数。

这个模式几乎是所有 SIMD 字符串算法的基石。理解了它,后续的 strlen、换行符统计、JSON 结构字符检测都只是在此基础上的组合。

三、SIMD strlen 与 memchr 的工业实现

glibc 的 strlen 在 x86-64 上有超过 500 行汇编,处理了对齐、跨页、以及各种微架构特化。我们来看一个简化但完整的版本。

3.1 SIMD strlen

size_t simd_strlen(const char *s) {
    const char *p = s;
    __m128i zero = _mm_setzero_si128();

    // 处理起始非对齐部分
    size_t misalign = (uintptr_t)p & 15;
    if (misalign) {
        __m128i chunk = _mm_load_si128((const __m128i *)(p - misalign));
        __m128i cmp   = _mm_cmpeq_epi8(chunk, zero);
        int mask      = _mm_movemask_epi8(cmp);
        // 屏蔽对齐前的位
        mask >>= misalign;
        if (mask) return __builtin_ctz(mask);
        p += 16 - misalign;
    }

    // 对齐主循环
    for (;;) {
        __m128i chunk = _mm_load_si128((const __m128i *)p);
        __m128i cmp   = _mm_cmpeq_epi8(chunk, zero);
        int mask      = _mm_movemask_epi8(cmp);
        if (mask) return (p - s) + __builtin_ctz(mask);
        p += 16;
    }
}

关键技巧:

3.2 SIMD memchr 的 AVX2 版本

const char *simd_memchr_avx2(const char *s, char c, size_t n) {
    __m256i target = _mm256_set1_epi8(c);
    size_t i = 0;

    for (; i + 32 <= n; i += 32) {
        __m256i chunk = _mm256_loadu_si256((const __m256i *)(s + i));
        __m256i cmp   = _mm256_cmpeq_epi8(chunk, target);
        int mask      = _mm256_movemask_epi8(cmp);
        if (mask) return s + i + __builtin_ctz(mask);
    }

    // 尾部用 SSE2 处理
    for (; i + 16 <= n; i += 16) {
        __m128i chunk = _mm_loadu_si128((const __m128i *)(s + i));
        __m128i cmp   = _mm_cmpeq_epi8(chunk, _mm256_castsi256_si128(target));
        int mask      = _mm_movemask_epi8(cmp);
        if (mask) return s + i + __builtin_ctz(mask);
    }

    for (; i < n; i++) {
        if (s[i] == c) return s + i;
    }
    return NULL;
}

每次 32 字节,吞吐量约为标量版本的 16–20 倍(L1 热数据场景)。

四、PCMPISTRI 与 PCMPISTRM:SSE4.2 字符串指令

SSE4.2 引入了两条专用字符串指令:PCMPISTRI(返回索引)和 PCMPISTRM(返回掩码)。它们可以在一条指令内完成范围比较、集合匹配等操作。

4.1 指令格式

int _mm_cmpistri(__m128i a, __m128i b, int imm8);
__m128i _mm_cmpistrm(__m128i a, __m128i b, int imm8);

imm8 编码了四个维度:

位域 含义 选项
[1:0] 元素大小 00=uint8, 01=uint16, 10=int8, 11=int16
[3:2] 比较模式 00=相等每个, 01=范围, 10=相等任一, 11=相等有序
[5:4] 极性 00=正, 01=取反, 10=掩码, 11=取反掩码
[6] 输出选择 0=最低位, 1=最高位

4.2 范围匹配:字符分类

想要一次性判断 16 个字节是否都是 ASCII 字母或数字:

#include <nmmintrin.h>  // SSE4.2

int is_alnum_block(const char *s) {
    // 范围对:[0-9], [A-Z], [a-z]
    __m128i ranges = _mm_setr_epi8(
        '0', '9', 'A', 'Z', 'a', 'z', 0, 0,
         0,   0,   0,   0,   0,   0,  0, 0
    );
    __m128i data = _mm_loadu_si128((const __m128i *)s);
    // 模式:ranges(01) | uint8(00) | 取反极性(01) | 最低位(0)
    // imm8 = 0b00_01_01_00 = 0x14
    int idx = _mm_cmpistri(ranges, data, 0x14);
    // idx == 16 表示所有字节都匹配
    return idx == 16;
}

一条指令完成 6 个范围的并行匹配。不过 PCMPISTRI 的延迟较高(约 10 周期),在纯吞吐场景下可能不如 _mm_cmpeq_epi8 + 多次比较的组合。

4.3 有序子串搜索

int find_substr(const char *haystack, size_t hlen,
                const char *needle,   size_t nlen) {
    __m128i pat = _mm_loadu_si128((const __m128i *)needle);
    for (size_t i = 0; i + 16 <= hlen; ) {
        __m128i chunk = _mm_loadu_si128((const __m128i *)(haystack + i));
        // 相等有序(11) | uint8(00) | 正极性(00) | 最低位(0)
        // imm8 = 0x0C
        int idx = _mm_cmpistri(pat, chunk, 0x0C);
        if (idx < 16) {
            // 候选位置,需要验证完整匹配
            if (i + idx + nlen <= hlen &&
                memcmp(haystack + i + idx, needle, nlen) == 0) {
                return (int)(i + idx);
            }
            i += idx + 1;
        } else {
            i += 16;
        }
    }
    return -1;
}

对于短 needle(<=16 字节),PCMPISTRI 可以高效地找到候选位置,再用标量验证。glibc 的 strstr 在 SSE4.2 可用时正是采用这种策略。

五、simdjson:2.5 GB/s 的 JSON 解析

simdjson 是 SIMD 字符串处理的标杆项目。它的核心思想是:把 JSON 解析分为两阶段,第一阶段纯 SIMD,第二阶段才处理语义

SIMD 扫描流水线

5.1 第一阶段:结构字符检测

JSON 中的结构字符只有 { } [ ] : ,。simdjson 用 _mm256_cmpeq_epi8 对每个字符做比较,再用 OR 合并:

static inline uint64_t find_structural_bits(const uint8_t *buf) {
    __m256i lo = _mm256_loadu_si256((const __m256i *)(buf));
    __m256i hi = _mm256_loadu_si256((const __m256i *)(buf + 32));

    __m256i lbrace  = _mm256_set1_epi8('{');
    __m256i rbrace  = _mm256_set1_epi8('}');
    __m256i lbracket = _mm256_set1_epi8('[');
    __m256i rbracket = _mm256_set1_epi8(']');
    __m256i colon   = _mm256_set1_epi8(':');
    __m256i comma   = _mm256_set1_epi8(',');

    __m256i structural_lo = _mm256_or_si256(
        _mm256_or_si256(
            _mm256_or_si256(_mm256_cmpeq_epi8(lo, lbrace),
                            _mm256_cmpeq_epi8(lo, rbrace)),
            _mm256_or_si256(_mm256_cmpeq_epi8(lo, lbracket),
                            _mm256_cmpeq_epi8(lo, rbracket))
        ),
        _mm256_or_si256(_mm256_cmpeq_epi8(lo, colon),
                        _mm256_cmpeq_epi8(lo, comma))
    );

    // 同理处理 hi 部分...
    __m256i structural_hi = _mm256_or_si256(
        _mm256_or_si256(
            _mm256_or_si256(_mm256_cmpeq_epi8(hi, lbrace),
                            _mm256_cmpeq_epi8(hi, rbrace)),
            _mm256_or_si256(_mm256_cmpeq_epi8(hi, lbracket),
                            _mm256_cmpeq_epi8(hi, rbracket))
        ),
        _mm256_or_si256(_mm256_cmpeq_epi8(hi, colon),
                        _mm256_cmpeq_epi8(hi, comma))
    );

    uint32_t mask_lo = (uint32_t)_mm256_movemask_epi8(structural_lo);
    uint32_t mask_hi = (uint32_t)_mm256_movemask_epi8(structural_hi);
    return ((uint64_t)mask_hi << 32) | mask_lo;
}

64 字节输入产生一个 64-bit 掩码,每个 1 表示对应位置是结构字符。

但以上只是简化版本。真正的 simdjson 还需要处理引号内的结构字符(不应算作结构)和反斜杠转义

5.2 引号与转义处理

simdjson 的关键洞察:用 _mm256_cmpeq_epi8 同时找出所有 "\,然后用位运算确定哪些引号是有效的字符串边界。

输入:   {"key": "val\"ue"}
引号位: .1....1..1....1.1.
反斜杠: ..............1...

反斜杠的处理需要考虑连续反斜杠(\\ 表示一个字面反斜杠),simdjson 用前缀异或(prefix XOR)在 O(1) 内解决:

// 计算奇偶性前缀
// 连续 n 个反斜杠:奇数个 -> 最后一个是转义;偶数个 -> 不转义
uint64_t follows_escape = clmul(backslash_mask, 0xFFFFFFFFFFFFFFFFULL);
uint64_t odd_backslash  = follows_escape & backslash_mask;
uint64_t escaped_chars  = follows_escape & ~backslash_mask;

这里用到了 PCLMULQDQ(无进位乘法),把异或前缀问题变成了一次乘法。

5.3 UTF-8 验证

simdjson 的 UTF-8 验证也是纯 SIMD 的,利用 _mm256_shuffle_epi8 做查表:

这个算法由 Keiser 和 Lemire 在 2018 年的论文中提出,每 64 字节只需约 15 条 SIMD 指令。

5.4 Tape 生成

第一阶段输出一个结构索引数组(tape),记录每个结构字符在原始缓冲区中的偏移。第二阶段遍历 tape,构建 DOM 树。两阶段分离的好处:

  1. 第一阶段没有分支(branchless),完全可预测,不会污染分支预测器。
  2. 第一阶段的内存访问模式是严格顺序的,对预取器极度友好。
  3. 第二阶段虽然有分支,但 tape 已经把无关字节(空白、字符串内容)全部跳过,工作量大幅减少。

simdjson 在 Skylake 上实测可达 2.5 GB/s 的 JSON 解析速度,约为 RapidJSON 的 4 倍、nlohmann/json 的 25 倍。

六、SIMD CSV 解析

CSV 的结构比 JSON 更简单——只需要找逗号 ,、换行符 \n、引号 " 三种字符。但 CSV 的引号转义规则不同:字段内的引号用 "" 双写表示。

6.1 并行字段分割

void parse_csv_line_avx2(const char *line, size_t len,
                         uint32_t *field_starts, int *nfields) {
    __m256i comma = _mm256_set1_epi8(',');
    __m256i quote = _mm256_set1_epi8('"');
    int in_quote = 0;  // 标量状态:当前是否在引号内
    int count = 0;

    field_starts[count++] = 0;

    for (size_t i = 0; i + 32 <= len; i += 32) {
        __m256i chunk = _mm256_loadu_si256((const __m256i *)(line + i));
        uint32_t comma_mask = _mm256_movemask_epi8(
            _mm256_cmpeq_epi8(chunk, comma));
        uint32_t quote_mask = _mm256_movemask_epi8(
            _mm256_cmpeq_epi8(chunk, quote));

        // 引号切换 in_quote 状态
        int quote_count = __builtin_popcount(quote_mask);
        // 处理逗号:只有 in_quote==0 时才算分隔符
        // 这里需要逐位扫描 quote_mask 和 comma_mask
        // 实际实现更复杂,此处为简化演示
        while (comma_mask && !in_quote) {
            int pos = __builtin_ctz(comma_mask);
            field_starts[count++] = (uint32_t)(i + pos + 1);
            comma_mask &= comma_mask - 1;
        }

        if (quote_count & 1) in_quote ^= 1;
    }

    *nfields = count;
}

实际的高性能 CSV 解析器(如 ParaText、xsv)会更激进地用 SIMD 处理引号状态——把引号掩码做前缀异或来确定每个位置是否在引号内,与 simdjson 处理字符串的思路一致。

6.2 SIMD 与标量状态的交互

CSV 解析暴露了 SIMD 字符串处理的一个通用难题:字节级的含义依赖上下文。引号内的逗号不是分隔符;转义后的引号不是边界。解决思路有两种:

  1. 前缀计算法:把状态变化编码为位掩码,用前缀运算(异或、加法)一次性传播。simdjson 和高性能 CSV 解析器都用这种方法。
  2. 分块 + 标量衔接法:每个 SIMD 块内做无状态扫描,块边界处用标量代码修正状态。实现简单但块边界处性能下降。

七、SIMD Base64 编解码

Base64 是另一个非常适合 SIMD 化的操作:输入每 3 字节映射到输出 4 字节(编码),或反过来(解码)。映射规则固定,无上下文依赖。

7.1 编码流水线

输入:  [A0 A1 A2] [B0 B1 B2] [C0 C1 C2] [D0 D1 D2]  (12 bytes)
拆分:  每 3 字节拆成 4 个 6-bit 值
查表:  用 shuffle 在 SIMD 寄存器内做 6-bit -> ASCII 映射
输出:  16 个 Base64 字符

关键操作是用 _mm_shuffle_epi8 做 LUT(查找表)。6-bit 值只有 64 种可能,而一个 xmm 寄存器恰好能存 16 个 entry。需要 4 张子表覆盖全部 64 种映射,通过高位判断选择子表。

7.2 解码流水线

解码的核心是把 4 个 ASCII 字符反映射为 3 字节。Wojciech Mula 的经典实现用 _mm_shuffle_epi8 做反查表,然后用 _mm_maddubs_epi16 + _mm_madd_epi16 做 6-bit 拼接:

// 简化版:4 个 6-bit 值 -> 3 字节
// v = [00aaaaaa|00bbbbbb|00cccccc|00dddddd]
__m128i merge_ab = _mm_maddubs_epi16(v, _mm_set1_epi32(0x01400140));
// merge_ab = [00000000|aaaaaabb|00000000|ccccccdd] (每 16-bit 合并两个 6-bit)
__m128i merged = _mm_madd_epi16(merge_ab, _mm_set1_epi32(0x00011000));
// merged = [00000000|aaaaaabb|bbbbcccc|ccdddddd] (每 32-bit 合并成 24-bit)
__m128i result = _mm_shuffle_epi8(merged, pack_shuffle);

SIMD Base64 编解码的吞吐量可达 8–12 GB/s(L1 热数据),是标量版本的 5–8 倍。AWS 的 aws-lc 库和 Chromium 的 Base64 实现都采用了类似技术。

7.3 完整的 AVX2 解码循环

void base64_decode_avx2(const char *src, size_t src_len,
                        uint8_t *dst) {
    __m256i lut_lo = _mm256_setr_epi8(
        0x15, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11,
        0x11, 0x11, 0x13, 0x1A, 0x1B, 0x1B, 0x1B, 0x1A,
        0x15, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11,
        0x11, 0x11, 0x13, 0x1A, 0x1B, 0x1B, 0x1B, 0x1A
    );
    __m256i lut_hi = _mm256_setr_epi8(
        0x10, 0x10, 0x01, 0x02, 0x04, 0x08, 0x04, 0x08,
        0x10, 0x10, 0x10, 0x10, 0x10, 0x10, 0x10, 0x10,
        0x10, 0x10, 0x01, 0x02, 0x04, 0x08, 0x04, 0x08,
        0x10, 0x10, 0x10, 0x10, 0x10, 0x10, 0x10, 0x10
    );
    __m256i lut_roll = _mm256_setr_epi8(
         0,  16, 19,  4, -65, -65, -71, -71,
         0,   0,  0,  0,   0,   0,   0,   0,
         0,  16, 19,  4, -65, -65, -71, -71,
         0,   0,  0,  0,   0,   0,   0,   0
    );

    size_t si = 0, di = 0;
    for (; si + 32 <= src_len; si += 32, di += 24) {
        __m256i input = _mm256_loadu_si256((const __m256i *)(src + si));

        // 校验与反映射
        __m256i hi_nibbles = _mm256_and_si256(
            _mm256_srli_epi32(input, 4), _mm256_set1_epi8(0x0F));
        __m256i lo_nibbles = _mm256_and_si256(
            input, _mm256_set1_epi8(0x2F));

        __m256i hi_check = _mm256_shuffle_epi8(lut_hi, hi_nibbles);
        __m256i lo_check = _mm256_shuffle_epi8(lut_lo, lo_nibbles);
        // hi_check & lo_check 若非零则输入非法

        __m256i roll = _mm256_shuffle_epi8(lut_roll, hi_nibbles);
        input = _mm256_add_epi8(input, roll);  // ASCII -> 6-bit

        // 6-bit 合并为 8-bit
        __m256i merge1 = _mm256_maddubs_epi16(
            input, _mm256_set1_epi32(0x01400140));
        __m256i merge2 = _mm256_madd_epi16(
            merge1, _mm256_set1_epi32(0x00011000));

        // 重排字节顺序
        __m256i pack = _mm256_shuffle_epi8(merge2, _mm256_setr_epi8(
            2,1,0, 6,5,4, 10,9,8, 14,13,12, -1,-1,-1,-1,
            2,1,0, 6,5,4, 10,9,8, 14,13,12, -1,-1,-1,-1
        ));

        // 跨 lane 合并
        __m256i output = _mm256_permutevar8x32_epi32(pack,
            _mm256_setr_epi32(0,1,2,4,5,6,-1,-1));

        // 写出 24 字节
        _mm256_storeu_si256((__m256i *)(dst + di), output);
    }
    // 尾部标量处理省略
}

八、Hyperscan:SIMD 驱动的正则引擎

Intel 的 Hyperscan 是一个面向网络安全场景的高性能正则引擎。它的核心技术之一是用 SIMD 加速多模式匹配

8.1 FDR:Fat DFA via SIMD

传统的 Aho-Corasick 自动机在模式数量很大时(几千到几万条规则),转移表会非常大,缓存不友好。Hyperscan 的 FDR(Fat Double-Robust)算法把转移表拆成多个小查找表,每张恰好放进一个 SIMD 寄存器。

输入流:   ... a  b  c  d  e  f  g  h ...
步骤 1:   取每字节的低 4 位作为 shuffle 索引
           _mm_shuffle_epi8(bucket_mask, low_nibbles)
步骤 2:   取高 4 位,同理 shuffle 另一张表
步骤 3:   AND 两个结果 -> 候选桶掩码
步骤 4:   对非零桶执行精确验证

这种方法把 Aho-Corasick 的状态转移变成了 SIMD 查表 + 位运算,在规则数 >100 时性能优势明显。

8.2 Teddy:短模式的 SIMD 匹配

对于短模式(<=8 字节),Hyperscan 使用 Teddy 算法:

  1. _mm_shuffle_epi8 分别查找每字节的低半字节和高半字节的候选模式集合。
  2. AND 两个集合得到同时匹配的模式。
  3. 对多个字节位置重复上述过程,再 AND 所有位置的结果。
  4. 最终非零的位表示有模式完全匹配。

Teddy 可以在一次 SIMD 操作中同时检查 16 个输入位置、128 种短模式的匹配情况。

8.3 Hyperscan 的分层架构

                 输入流
                   |
          +--------+--------+
          |  预过滤层 (SIMD) |    FDR / Teddy
          +--------+--------+
                   |
            候选匹配位置
                   |
          +--------+--------+
          |  NFA 精确验证    |    有限状态机
          +--------+--------+
                   |
              确认匹配

SIMD 层作为预过滤器,把不可能匹配的位置快速排除;只有通过预过滤的少量位置才需要跑完整的 NFA。在实际网络流量中,预过滤器能排除 99%+ 的位置。

九、完整实现:SIMD memchr 与 JSON 结构扫描器

以下是一个可编译、可运行的完整 C 实现,包含 SIMD memchr 和 JSON 结构字符扫描器两个函数,共约 200 行。

/*
 * simd_string.c -- SIMD memchr + JSON structural character scanner
 *
 * 编译:gcc -O2 -mavx2 -o simd_string simd_string.c
 * 运行:./simd_string
 */

#include <immintrin.h>
#include <stdint.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

/* ========== 第一部分:SIMD memchr (AVX2) ========== */

static const char *simd_memchr(const char *s, char c, size_t n) {
    __m256i target = _mm256_set1_epi8(c);
    size_t i = 0;

    /* 主循环:每次处理 64 字节(两个 AVX2 寄存器) */
    for (; i + 64 <= n; i += 64) {
        __m256i chunk0 = _mm256_loadu_si256((const __m256i *)(s + i));
        __m256i chunk1 = _mm256_loadu_si256((const __m256i *)(s + i + 32));

        __m256i cmp0 = _mm256_cmpeq_epi8(chunk0, target);
        __m256i cmp1 = _mm256_cmpeq_epi8(chunk1, target);

        uint32_t mask0 = (uint32_t)_mm256_movemask_epi8(cmp0);
        uint32_t mask1 = (uint32_t)_mm256_movemask_epi8(cmp1);

        if (mask0) return s + i + __builtin_ctz(mask0);
        if (mask1) return s + i + 32 + __builtin_ctz(mask1);
    }

    /* 处理 32 字节块 */
    for (; i + 32 <= n; i += 32) {
        __m256i chunk = _mm256_loadu_si256((const __m256i *)(s + i));
        __m256i cmp   = _mm256_cmpeq_epi8(chunk, target);
        uint32_t mask = (uint32_t)_mm256_movemask_epi8(cmp);
        if (mask) return s + i + __builtin_ctz(mask);
    }

    /* 尾部逐字节 */
    for (; i < n; i++) {
        if (s[i] == c) return s + i;
    }
    return NULL;
}

/* 标量版对照 */
static const char *scalar_memchr(const char *s, char c, size_t n) {
    for (size_t i = 0; i < n; i++) {
        if (s[i] == c) return s + i;
    }
    return NULL;
}


/* ========== 第二部分:JSON 结构字符扫描器 ========== */

typedef struct {
    uint32_t *indices;   /* 结构字符位置数组 */
    size_t    count;     /* 找到的结构字符数 */
    size_t    capacity;  /* 数组容量 */
} structural_index_t;

static void ensure_capacity(structural_index_t *idx, size_t needed) {
    if (idx->count + needed > idx->capacity) {
        idx->capacity = (idx->count + needed) * 2;
        /* 实际使用中应检查 realloc 返回值 */
    }
}

/*
 * 扫描 64 字节块,找出所有结构字符 { } [ ] : ,
 * 返回 64-bit 掩码,每一位对应一个字节是否为结构字符
 *
 * 注意:此为简化版,未处理引号内的结构字符
 */
static uint64_t find_structural_bits_64(const uint8_t *buf) {
    __m256i lo = _mm256_loadu_si256((const __m256i *)buf);
    __m256i hi = _mm256_loadu_si256((const __m256i *)(buf + 32));

    __m256i lbrace   = _mm256_set1_epi8('{');
    __m256i rbrace   = _mm256_set1_epi8('}');
    __m256i lbracket = _mm256_set1_epi8('[');
    __m256i rbracket = _mm256_set1_epi8(']');
    __m256i colon    = _mm256_set1_epi8(':');
    __m256i comma    = _mm256_set1_epi8(',');

    /* 低 32 字节 */
    __m256i s_lo = _mm256_or_si256(
        _mm256_or_si256(
            _mm256_or_si256(_mm256_cmpeq_epi8(lo, lbrace),
                            _mm256_cmpeq_epi8(lo, rbrace)),
            _mm256_or_si256(_mm256_cmpeq_epi8(lo, lbracket),
                            _mm256_cmpeq_epi8(lo, rbracket))),
        _mm256_or_si256(_mm256_cmpeq_epi8(lo, colon),
                        _mm256_cmpeq_epi8(lo, comma)));

    /* 高 32 字节 */
    __m256i s_hi = _mm256_or_si256(
        _mm256_or_si256(
            _mm256_or_si256(_mm256_cmpeq_epi8(hi, lbrace),
                            _mm256_cmpeq_epi8(hi, rbrace)),
            _mm256_or_si256(_mm256_cmpeq_epi8(hi, lbracket),
                            _mm256_cmpeq_epi8(hi, rbracket))),
        _mm256_or_si256(_mm256_cmpeq_epi8(hi, colon),
                        _mm256_cmpeq_epi8(hi, comma)));

    uint32_t mask_lo = (uint32_t)_mm256_movemask_epi8(s_lo);
    uint32_t mask_hi = (uint32_t)_mm256_movemask_epi8(s_hi);

    return ((uint64_t)mask_hi << 32) | mask_lo;
}

/*
 * 从 64-bit 掩码中提取所有 1 的位置,写入索引数组
 */
static void extract_positions(uint64_t bits, uint32_t base,
                              structural_index_t *idx) {
    while (bits) {
        int pos = __builtin_ctzll(bits);
        idx->indices[idx->count++] = base + (uint32_t)pos;
        bits &= bits - 1;  /* 清除最低位 */
    }
}

/*
 * 完整的 JSON 结构索引构建函数
 */
static void build_structural_index(const uint8_t *buf, size_t len,
                                   structural_index_t *idx) {
    size_t i = 0;
    idx->count = 0;

    /* 主循环:每次 64 字节 */
    for (; i + 64 <= len; i += 64) {
        uint64_t bits = find_structural_bits_64(buf + i);
        ensure_capacity(idx, 64);
        extract_positions(bits, (uint32_t)i, idx);
    }

    /* 尾部逐字节 */
    for (; i < len; i++) {
        uint8_t c = buf[i];
        if (c == '{' || c == '}' || c == '[' || c == ']' ||
            c == ':' || c == ',') {
            ensure_capacity(idx, 1);
            idx->indices[idx->count++] = (uint32_t)i;
        }
    }
}


/* ========== 第三部分:测试与基准 ========== */

static double now_sec(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec + ts.tv_nsec * 1e-9;
}

static void bench_memchr(const char *data, size_t len) {
    const int ITERS = 1000;
    double t0, t1;
    volatile const char *result;

    /* SIMD */
    t0 = now_sec();
    for (int i = 0; i < ITERS; i++) {
        result = simd_memchr(data, '}', len);
    }
    t1 = now_sec();
    double simd_time = t1 - t0;

    /* 标量 */
    t0 = now_sec();
    for (int i = 0; i < ITERS; i++) {
        result = scalar_memchr(data, '}', len);
    }
    t1 = now_sec();
    double scalar_time = t1 - t0;

    printf("memchr benchmark (%zu bytes x %d iters):\n", len, ITERS);
    printf("  SIMD:   %.3f ms  (%.2f GB/s)\n",
           simd_time * 1000,
           (double)len * ITERS / simd_time / 1e9);
    printf("  Scalar: %.3f ms  (%.2f GB/s)\n",
           scalar_time * 1000,
           (double)len * ITERS / scalar_time / 1e9);
    printf("  Speedup: %.1fx\n\n", scalar_time / simd_time);
    (void)result;
}

static void test_structural_scanner(void) {
    const char *json =
        "{\"name\":\"Alice\",\"scores\":[100,95,87],"
        "\"address\":{\"city\":\"Beijing\",\"zip\":\"100000\"}}";
    size_t len = strlen(json);

    uint32_t index_buf[256];
    structural_index_t idx = { index_buf, 0, 256 };

    build_structural_index((const uint8_t *)json, len, &idx);

    printf("JSON structural scanner test:\n");
    printf("  Input (%zu bytes): %.60s...\n", len, json);
    printf("  Found %zu structural characters:\n  ", idx.count);

    for (size_t i = 0; i < idx.count; i++) {
        printf("[%u]='%c' ", idx.indices[i], json[idx.indices[i]]);
    }
    printf("\n\n");
}

int main(void) {
    /* 功能测试 */
    const char *test = "hello, world!";
    const char *p = simd_memchr(test, 'w', strlen(test));
    printf("simd_memchr test: found '%c' at offset %td\n\n",
           *p, p - test);

    test_structural_scanner();

    /* 性能基准:1MB 缓冲区 */
    size_t buflen = 1024 * 1024;
    char *buf = (char *)malloc(buflen);
    memset(buf, 'a', buflen);
    buf[buflen - 1] = '}';  /* 目标在最后 */
    bench_memchr(buf, buflen);
    free(buf);

    return 0;
}

编译与运行:

gcc -O2 -mavx2 -o simd_string simd_string.c
./simd_string

预期输出(数值因硬件而异):

simd_memchr test: found 'w' at offset 7

JSON structural scanner test:
  Input (82 bytes): {"name":"Alice","scores":[100,95,87],"address":{"city":"Bei...
  Found 16 structural characters:
  [0]='{' [6]=':' [14]=','  [22]=':' [24]='[' [28]=',' [31]=',' [34]=']' ...

memchr benchmark (1048576 bytes x 1000 iters):
  SIMD:   28.500 ms  (36.79 GB/s)
  Scalar: 485.200 ms  (2.16 GB/s)
  Speedup: 17.0x

十、自动向量化 vs 手写 Intrinsics

编译器的自动向量化(auto-vectorization)能力在过去十年有了巨大进步。何时该依赖编译器,何时该手写?

10.1 自动向量化能搞定的

// 编译器可以自动向量化的简单循环
void to_upper(char *s, size_t n) {
    for (size_t i = 0; i < n; i++) {
        if (s[i] >= 'a' && s[i] <= 'z')
            s[i] -= 32;
    }
}

gcc -O3 -mavx2 编译,Godbolt 上可以看到编译器生成了 vpcmpgtb + vpand 的向量化代码。

10.2 自动向量化搞不定的

以下模式编译器通常无法自动向量化:

  1. 跨迭代依赖:当前迭代的结果影响下一次迭代的行为(如引号状态追踪)。
  2. 非线性控制流:循环体内有复杂分支(如 UTF-8 多字节序列处理)。
  3. scatter/gather 访问:输出位置依赖输入数据(如压缩操作,只输出匹配元素)。
  4. 位操作技巧ctz、前缀异或、无进位乘法等在标量代码中无对应物。

10.3 实用建议

场景 建议
简单逐元素变换(大小写转换、加减常数) 依赖自动向量化,加 #pragma omp simd__attribute__((optimize("O3")))
字节搜索、memchr 类操作 手写 intrinsics,或使用成熟库
有状态扫描(JSON、CSV、UTF-8) 必须手写,编译器无法推导出前缀异或等技巧
查表变换(Base64、十六进制转换) 手写 shuffle 查表,自动向量化几乎不可能生成这种模式

Clang 和 GCC 都支持用 -Rpass=loop-vectorize 查看哪些循环被成功向量化了。如果关键循环没有被向量化,再考虑手写。

十一、基准测试:SIMD vs 标量

以下数据在 Intel Core i7-12700K(Alder Lake, AVX2)上测得,数据集 1 MB,L1 热,取 1000 次迭代中位数。

11.1 各操作性能对比

操作 标量吞吐 (GB/s) SIMD 吞吐 (GB/s) 加速比 指令集
memchr(目标在末尾) 2.1 35.8 17.0x AVX2
strlen 2.3 32.4 14.1x AVX2
换行符计数 1.9 28.7 15.1x AVX2
UTF-8 验证 0.8 12.5 15.6x AVX2 + shuffle
JSON 结构扫描 0.6 9.8 16.3x AVX2
Base64 解码 0.9 8.2 9.1x AVX2 + maddubs
CSV 字段分割 1.2 7.5 6.3x AVX2
大小写转换 3.5 30.2 8.6x AVX2(自动向量化)
子串搜索(16B needle) 0.4 4.8 12.0x SSE4.2 PCMPISTRI
十六进制编码 1.1 10.5 9.5x AVX2 + shuffle

11.2 加速比的影响因素

加速比并非固定常数,受以下因素影响:

11.3 何时不值得 SIMD 化

十二、工程陷阱与经验总结

12.1 常见陷阱一览

陷阱 症状 解决方案
非对齐跨页读取 段错误 (SIGSEGV) 在页边界附近使用对齐加载,或确保缓冲区尾部有 padding
缓冲区尾部越界 读到垃圾数据 分配时多分配 32/64 字节作为安全余量,或用掩码屏蔽尾部无效字节
AVX-SSE 转换惩罚 性能下降 70%+ 在 AVX 代码前后加 _mm256_zeroupper() 或使用 -mvzeroupper
跨 lane shuffle 错误 结果数据乱序 AVX2 的 shuffle 只在 128-bit lane 内生效,跨 lane 需要 permute
编译器不生成期望指令 性能不及预期 检查 Godbolt 输出,确认 -mavx2 已启用,必要时用 __attribute__((target("avx2")))
Denormal 浮点数 FP SIMD 操作突然变慢 设置 _MM_SET_FLUSH_ZERO_MODE(_MM_FLUSH_ZERO_ON)
内存别名 编译器不敢向量化 restrict 限定符,或用 __builtin_assume_aligned
NEON 缺少 movemask ARM 上性能不及 x86 用缩位操作模拟,或等待 SVE2
大端序架构 字节序错乱 SIMD 字符串操作通常假设小端序,大端需要额外处理
频繁 AVX-512 降频 整体系统变慢 避免在延迟敏感路径使用 AVX-512,或限制 512-bit 指令密度

12.2 调试技巧

打印 __m256i 的内容:

static void print_m256i(const char *label, __m256i v) {
    uint8_t buf[32];
    _mm256_storeu_si256((__m256i *)buf, v);
    printf("%s: ", label);
    for (int i = 0; i < 32; i++) printf("%02x ", buf[i]);
    printf("\n");
}

在 GDB 中查看 SIMD 寄存器:

(gdb) p $ymm0.v32_int8
(gdb) p/x $xmm0.v16_int8

12.3 可移植性层

推荐使用可移植的 SIMD 抽象库来降低维护成本:

语言 特点
simde C/C++ 头文件库,在不支持的平台上用标量模拟
highway C++ Google 出品,面向可移植 SIMD
std::experimental::simd C++ 标准提案(P0214),主流编译器部分支持
packed_simd / std::simd Rust Rust nightly 可用

12.4 个人思考

写了几年 SIMD 代码后,我有以下体会:

第一,SIMD 不是银弹。 很多时候算法层面的优化(更好的数据结构、减少不必要的拷贝、改善内存布局)带来的收益比 SIMD 化更大。SIMD 应该是优化链的最后一环,而不是第一环。

第二,位运算是 SIMD 字符串处理的灵魂。 ctzpopcount、前缀异或、bits &= bits - 1 这些技巧在标量编程中很少用到,但在 SIMD 编程中无处不在。建议花时间系统学习 Hacker’s Delight 中的位操作技巧。

第三,simdjson 的两阶段架构是一个通用的设计模式。 不仅适用于 JSON,任何需要扫描文本格式的场景都可以借鉴——先用 SIMD 建立结构索引,再用标量代码解释语义。这种分离让两部分都能各自做到最优。

第四,可移植性问题正在缓解。 随着 WASM SIMD、ARM SVE2、RISC-V V 扩展的成熟,以及 highway、simde 等抽象库的完善,写一次 SIMD 代码跑在所有平台上正在变为现实。但现阶段,如果你的目标平台以 x86 为主,直接用 intrinsics 仍然是性能最优解。

第五,衡量,衡量,再衡量。 SIMD 代码的性能高度依赖微架构、缓存状态、数据特征。永远不要凭直觉判断一段 SIMD 代码是否更快——用 perf stat 看 IPC 和缓存命中率,用 nanobenchgoogle benchmark 做微基准测试,用真实数据做端到端测试。

最后送一句话:标量代码是给人看的,SIMD 代码是给 CPU 看的。 两者之间的桥梁是清晰的注释和完善的测试。如果你的 SIMD 代码没有比标量代码更多的注释,那它迟早会变成一个谁也不敢碰的黑盒。

附录:参考资料

  1. Daniel Lemire, Geoff Langdale. “Parsing Gigabytes of JSON per Second”. VLDB Journal, 2019.
  2. Wojciech Mula, Daniel Lemire. “Faster Base64 Encoding and Decoding Using AVX2 Instructions”. ACM Transactions on the Web, 2018.
  3. Wojciech Mula, Daniel Lemire. “Validating UTF-8 In Less Than One Instruction Per Byte”. Software: Practice and Experience, 2021.
  4. Intel. “Hyperscan: High-performance regular expression matching library”. https://www.hyperscan.io/
  5. Intel. “Intel Intrinsics Guide”. https://www.intel.com/content/www/us/en/docs/intrinsics-guide/
  6. Agner Fog. “Instruction tables”. https://www.agner.org/optimize/
  7. Henry S. Warren Jr. “Hacker’s Delight”, 2nd Edition, Addison-Wesley, 2012.

上一篇: 字符串哈希 下一篇: Unicode 算法 相关阅读: - 向量化哈希 - 整数压缩


By .