土法炼钢兴趣小组的算法知识备份

Count-Min Sketch:流式频率估计的瑞士军刀

目录

一、从一个简单问题说起:这个元素出现了多少次

在数据流的世界里,有一类看似简单的问题:给定一个元素,它在数据流中出现了多少次?

这个问题被称为 频率估计(Frequency Estimation)点查询(Point Query)。精确回答它很容易——维护一个哈希表,key 是元素,value 是计数。但在以下场景中,精确计数变得不可行:

  1. 数据流无限长:网络路由器每秒处理数百万个数据包,流永远不会停止。
  2. 元素空间巨大:IP 地址有 2^32 种可能,URL 的空间更是无限。
  3. 内存有限:嵌入式设备或 FPGA 上只有 KB 级别的 SRAM。
  4. 单遍扫描:数据只能看一次,不能回溯。

如果数据流中有 n 个不同的元素,精确计数需要 O(n) 的空间。当 n 达到数十亿甚至更大时,这个开销是不可接受的。

我们需要的是一种 亚线性(sublinear) 空间的数据结构——用远小于 O(n) 的空间,给出一个”足够好”的近似答案。

Count-Min Sketch(以下简称 CMS)正是为这个问题而生的。它由 Graham Cormode 和 S. Muthukrishnan 在 2005 年的论文 “An Improved Data Stream Summary: The Count-Min Sketch and its Applications” 中提出。这个数据结构简洁优美,实现简单,理论保证强大,已经成为流式算法工具箱中最常用的工具之一。

在这一篇中,我会从问题定义出发,完整推导 CMS 的结构、操作、误差保证,讨论它的变体和改进,给出一份可以直接编译运行的 C 实现,并结合实际工程经验分享我的看法。

二、Count-Min Sketch 的结构

CMS 的核心是一个二维计数器数组,加上一组独立的哈希函数。

基本组成

CMS 由两个参数决定:

数据结构包含:

  1. 一个 d × w 的二维整数数组 count[d][w],所有元素初始化为 0。
  2. d 个独立的哈希函数 h_1, h_2, ..., h_d,每个将元素映射到 {0, 1, ..., w-1}

可以将其想象成一个矩阵:每一行对应一个哈希函数,每一列对应一个桶。每个元素通过 d 个哈希函数被映射到 d 个不同行中的 d 个桶。

          列 0    列 1    列 2    列 3    列 4    ... 列 w-1
行 0 (h₁)  [  ]    [  ]    [  ]    [  ]    [  ]    ... [  ]
行 1 (h₂)  [  ]    [  ]    [  ]    [  ]    [  ]    ... [  ]
行 2 (h₃)  [  ]    [  ]    [  ]    [  ]    [  ]    ... [  ]
...
行 d-1(hd)  [  ]    [  ]    [  ]    [  ]    [  ]    ... [  ]

参数的选择

w 和 d 并不是随意选择的。它们由两个精度参数 ε(epsilon)和 δ(delta)决定:

参数的设定公式:

w = ⌈e / ε⌉     (e 是自然常数 ≈ 2.718)
d = ⌈ln(1/δ)⌉

总空间为 w × d 个计数器。例如,设 ε = 0.001,δ = 0.01:

w = ⌈2.718 / 0.001⌉ = 2718
d = ⌈ln(100)⌉ = ⌈4.605⌉ = 5

总共只需要 2718 × 5 = 13590 个计数器。如果每个计数器用 4 字节,总共约 53 KB。这个空间可以跟踪任意数量的不同元素,精度为千分之一。

Count-Min Sketch 结构图

与 Bloom Filter 的结构对比

CMS 的结构和 Bloom Filter 有几分相似:

特征 Bloom Filter Count-Min Sketch
底层数组 一维比特数组 二维整数数组(d × w)
哈希函数 k 个 d 个
存储内容 0 或 1 计数值
回答的问题 元素是否存在 元素出现了多少次
假阳性 会误报”存在” 会高估频率
假阴性 不会 不会低估频率

可以把 CMS 看作 Bloom Filter 从”存在性”到”频率”的自然推广。

三、更新与查询操作

更新操作(Update / Add)

当数据流中到达一个元素 e 时,执行以下操作:

对于 i = 0, 1, ..., d-1:
    count[i][h_i(e)] += 1

也就是说,将 e 通过每个哈希函数映射到对应行的一个列,然后将这 d 个位置的计数器各加 1。

更新的时间复杂度是 O(d),因为需要计算 d 个哈希值并执行 d 次加法。

更一般地,CMS 支持加权更新。当元素 e 带有权重 c 时:

对于 i = 0, 1, ..., d-1:
    count[i][h_i(e)] += c

这使得 CMS 可以用于统计字节数、金额等非单位权重的场景。

查询操作(Query / Estimate)

要估计元素 e 的频率,执行以下操作:

f̂(e) = min{count[i][h_i(e)] : i = 0, 1, ..., d-1}

也就是说,找到 e 在 d 行中对应的 d 个计数器的值,取其中的最小值作为频率估计。

为什么取最小值?因为每个计数器可能因为哈希碰撞而被其他元素”污染”(增加),所以每个计数器的值都 大于等于 真实频率。取最小值可以最大程度地减少碰撞带来的高估。

查询的时间复杂度同样是 O(d)。

一个具体的例子

假设 w = 5,d = 3,数据流为 [a, b, a, c, a, b, d, a]

哈希函数的映射(假设):

h₁: a→2, b→0, c→3, d→1
h₂: a→1, b→4, c→1, d→3
h₃: a→3, b→2, c→0, d→4

处理完所有元素后,矩阵状态:

         列0  列1  列2  列3  列4
行0(h₁): [2]  [1]  [4]  [1]  [0]   ← b=2, d=1, a=4, c=1
行1(h₂): [0]  [5]  [0]  [1]  [2]   ← a+c=5, d=1, b=2
行2(h₃): [1]  [0]  [2]  [4]  [1]   ← c=1, b=2, a=4, d=1

查询 a 的频率:

f̂(a) = min(count[0][2], count[1][1], count[2][3])
      = min(4, 5, 4)
      = 4    ← 真实值就是 4,精确!

查询 c 的频率:

f̂(c) = min(count[0][3], count[1][1], count[2][0])
      = min(1, 5, 1)
      = 1    ← 真实值就是 1,精确!

注意 count[1][1] 的值是 5,因为 a 和 c 在行 1 碰撞了(都映射到列 1)。但取最小值后,碰撞的影响被其他行”纠正”了。

删除操作

标准 CMS 不支持删除。因为将计数器减 1 可能导致某些计数器变为负数(当碰撞元素被”过度删除”时),破坏”只高估不低估”的性质。

如果确实需要删除功能,可以使用 Count Sketch(下文讨论)或维护两个 CMS(一个跟踪插入,一个跟踪删除),但这会使误差分析复杂化。

四、误差分析:为什么 CMS 只高估不低估

CMS 的核心理论保证可以用一句话概括:

对于任意元素 e,真实频率 f(e) 和估计值 f̂(e) 满足: f(e) ≤ f̂(e) ≤ f(e) + ε·‖f‖₁ 其中 ‖f‖₁ 是所有元素频率之和(即数据流长度 N),以概率至少 1 - δ 成立。

下界证明:为什么不会低估

这部分是平凡的。对于元素 e,每次 e 出现在数据流中时,count[i][h_i(e)] 都会加 1。其他元素的出现只会增加(而非减少)这些计数器的值。因此:

count[i][h_i(e)] ≥ f(e),对所有 i 成立

取最小值后:

f̂(e) = min_i count[i][h_i(e)] ≥ f(e)

上界证明:Markov 不等式 + 独立放大

定义随机变量 X_i 为行 i 中与 e 碰撞的”噪音”:

X_i = count[i][h_i(e)] - f(e) = Σ_{j ≠ e, h_i(j) = h_i(e)} f(j)

对于一个好的哈希函数,碰撞概率为 1/w。由期望的线性性:

E[X_i] = Σ_{j ≠ e} f(j) · Pr[h_i(j) = h_i(e)]
       = Σ_{j ≠ e} f(j) · (1/w)
       ≤ ‖f‖₁ / w

当 w = ⌈e/ε⌉ 时:

E[X_i] ≤ ‖f‖₁ · ε / e

由 Markov 不等式:

Pr[X_i ≥ ε · ‖f‖₁] ≤ E[X_i] / (ε · ‖f‖₁) ≤ 1/e

也就是说,每一行的碰撞噪音超过 ε·‖f‖₁ 的概率不超过 1/e ≈ 0.368。

由于 d 行是独立的(哈希函数独立),所有行同时碰撞过大的概率为:

Pr[f̂(e) - f(e) ≥ ε · ‖f‖₁] = Pr[所有行的 X_i 都 ≥ ε · ‖f‖₁]
                                ≤ (1/e)^d

当 d = ⌈ln(1/δ)⌉ 时:

(1/e)^d = e^{-d} ≤ e^{-ln(1/δ)} = δ

证毕。这就是为什么参数设为 w = ⌈e/ε⌉ 和 d = ⌈ln(1/δ)⌉。

关于 L1 范数

误差上界中的 ‖f‖₁ 是数据流中所有元素频率的绝对值之和。在纯计数场景下,它就等于数据流长度 N。这意味着:

这是 CMS 的一个固有限制:它更擅长估计”大象”(高频元素),而非”老鼠”(低频元素)。

五、三种查询类型

CMS 不仅支持点查询,还可以扩展到范围查询和内积查询。

Point Query(点查询)

这是最基本的查询:给定元素 e,估计 f(e)。

f̂(e) = min_i count[i][h_i(e)]

我们已经详细讨论过了。

Range Query(范围查询)

给定一个范围 [l, r],估计所有元素 e 满足 l ≤ e ≤ r 的频率之和。

直觉上,可以将范围分解为 O(log U) 个二进制区间(U 是元素空间大小),每个区间对应一个 CMS 实例。这种技术叫做 Dyadic Ranges

具体做法:

  1. 构建 log U + 1 层 CMS:第 0 层对应单个元素,第 1 层对应长度为 2 的区间,第 k 层对应长度为 2^k 的区间。
  2. 每个元素到达时,更新它所属的所有层的 CMS(共 log U + 1 个 CMS)。
  3. 查询 [l, r] 时,将其分解为 O(log U) 个二进制对齐的区间,对每个区间查询对应层的 CMS,求和。

误差保证变为:

Pr[|f̂([l,r]) - f([l,r])| ≤ ε · log(U) · ‖f‖₁] ≥ 1 - δ · log(U)

空间乘以 O(log U),误差也乘以 O(log U)。在实践中,如果范围查询不是核心需求,通常不值得这个开销。

Inner Product Query(内积查询)

给定两个数据流的频率向量 f 和 g,估计它们的内积 Σ_e f(e)·g(e)。

做法:对两个数据流分别维护 CMS(使用相同的哈希函数),查询时:

⟨f̂, ĝ⟩ = min_i Σ_j count_f[i][j] · count_g[i][j]

即对每一行计算两个 CMS 对应位置的逐元素乘积之和,然后取行间最小值。

内积查询在以下场景中很有用:

六、Conservative Update:减少高估的改进策略

标准 CMS 在更新时”无脑”地给所有 d 个位置加 1。但其中某些位置可能已经因为碰撞被高估了,继续加 1 会使高估更严重。

Conservative Update(保守更新) 是由 Estan 和 Varghese 在 2002 年提出的改进策略。核心思想是:更新时只增加那些”真正需要增加”的计数器。

算法描述

对于到达的元素 e(权重为 1):

1. 先查询当前估计值:est = min_i count[i][h_i(e)]
2. 对于每个 i = 0, 1, ..., d-1:
       count[i][h_i(e)] = max(count[i][h_i(e)], est + 1)

也就是说,只把那些值不超过 est + 1 的计数器提升到 est + 1,而不是无条件加 1。

为什么有效

考虑一个计数器 count[i][h_i(e)],它因为碰撞已经达到了 10,而 e 的真实频率只有 3。标准更新会把它从 10 变成 11,但保守更新会把它保持在 10(因为 est + 1 = 4 < 10)。

这样做的效果是:

  1. 减少了高估的程度。
  2. 不会影响”只高估不低估”的性质。
  3. 不增加时间或空间开销。

理论与实践

保守更新在理论上不改变最坏情况的误差保证,但在实践中显著减少了平均误差。在 Zipf 分布等有偏分布下,改善尤为明显。

我个人认为,没有理由不使用保守更新。它的实现代价几乎为零,收益却很可观。唯一的”代价”是更新操作从一次写入变成了”一次读+一次条件写”,在某些需要极致写入吞吐的场景(如 FPGA 流水线)中可能有影响。

保守更新的伪代码

void cms_add_conservative(CMS *cms, const char *key, uint32_t count) {
    uint32_t min_val = cms_query(cms, key);
    uint32_t new_val = min_val + count;
    for (uint32_t i = 0; i < cms->depth; i++) {
        uint32_t idx = hash(cms, i, key) % cms->width;
        if (cms->table[i][idx] < new_val) {
            cms->table[i][idx] = new_val;
        }
    }
}

七、Count-Min-Log Sketch:用对数计数器减少空间

标准 CMS 的每个计数器用 32 位或 64 位整数存储。当计数值很大时,低几位的精度其实并不重要——我们可以用对数尺度来压缩计数器。

Count-Min-Log Sketch 由 Pitel 和 Fouquier 在 2015 年提出,核心思想是将每个计数器替换为一个对数计数器(类似 Morris Counter)。

Morris Counter 回顾

Morris Counter 是一种概率计数器,用 c 位存储可以近似计数到 2{2c} 的数量级。其工作原理:

这样一个 8 位的计数器可以表示高达约 2^256 的计数,精度随计数增大而降低。

在 CMS 中的应用

将 CMS 的每个 32 位计数器替换为 8 位或 16 位的 Morris 计数器:

  1. 空间减少 2-4 倍:每个计数器从 4 字节降到 1-2 字节。
  2. 更新变为概率性的:不是每次都增加,而是以递减的概率增加。
  3. 查询时需要解码:将对数值转换回线性估计。

误差分析变为两层:CMS 本身的碰撞误差 + 对数计数的近似误差。

权衡

Count-Min-Log 在以下场景中有优势:

在一般服务器端应用中,我不太推荐使用 Count-Min-Log。内存通常不是瓶颈,而额外的概率误差和实现复杂度不值得。但在物联网、边缘计算等场景下,它是值得考虑的。

八、Count-Min Sketch vs Count Sketch:有偏 vs 无偏

Count Sketch(CS)由 Charikar、Chen 和 Farach-Colton 在 2002 年提出,比 CMS 早三年。两者经常被混淆,但有本质区别。

Count Sketch 的结构

CS 也使用 d × w 的矩阵和 d 个哈希函数。但它额外引入了 d 个 符号函数(sign function) s_1, s_2, …, s_d,每个将元素映射到 {+1, -1}。

更新

对于 i = 0, 1, ..., d-1:
    count[i][h_i(e)] += s_i(e)    (注意是加符号值,不是加 1)

查询

f̂(e) = median_i { s_i(e) · count[i][h_i(e)] }

注意取的是中位数(median),不是最小值。

核心区别

特性 Count-Min Sketch Count Sketch
更新 count += 1 count += sign(e)
查询 取最小值 取中位数
估计偏差 有偏(只高估) 无偏
误差范围 f(e) ± ε·‖f‖₁ f(e) ± ε·‖f‖₂
范数 L1 范数 L2 范数
支持删除 不支持 支持
支持负权重 不支持 支持

L1 vs L2 范数的影响

CMS 的误差与 ‖f‖₁(频率之和)成比例,CS 的误差与 ‖f‖₂(频率平方和的平方根)成比例。

在 Zipf 分布等有偏分布下,‖f‖₂ ≪ ‖f‖₁,所以 CS 的误差保证更紧。但 CMS 的”只高估”性质在某些场景中更有价值——例如,如果你用频率估计来检测超过阈值的重要元素,高估不会导致遗漏,而低估会。

实际选择建议

我个人在大多数场景下倾向于使用 CMS,因为:(1)实现更简单;(2)“只高估”的性质在异常检测中更安全;(3)配合 Conservative Update 后实际精度并不差。

九、Heavy Hitter 检测与 Space-Saving 联动

Heavy Hitter 问题是频率估计的一个重要应用:找出数据流中频率超过 φ·N 的所有元素(N 是数据流长度,φ 是给定的阈值)。

用 CMS 检测 Heavy Hitter

最直接的方法是将 CMS 与一个小的候选集结合:

1. 维护一个 CMS 和一个大小为 O(1/φ) 的候选集(哈希表)。
2. 每个元素到达时:
   a. 更新 CMS。
   b. 查询 CMS 估计。
   c. 如果估计值 ≥ φ·N,将元素加入候选集。
3. 输出时:对候选集中的每个元素用 CMS 验证,过滤掉估计值低于阈值的。

由于 CMS 只高估不低估,真正的 Heavy Hitter 一定会被发现(无假阴性)。但候选集中可能包含一些非 Heavy Hitter(假阳性),这由 CMS 的精度参数 ε 控制。

Space-Saving 算法

Space-Saving(Metwally et al., 2005)是另一种 Heavy Hitter 检测算法。它维护一个固定大小为 k 的”摘要”,其中存储 k 个 (元素, 计数) 对:

1. 元素到达时:
   a. 如果元素已在摘要中,将其计数加 1。
   b. 如果不在但摘要未满,加入并计数设为 1。
   c. 如果不在且摘要已满,替换计数最小的元素,并将其计数加 1。

Space-Saving 的优势是能给出频率的上界和下界,且实现也很简单。

CMS + Space-Saving 联合使用

在实际系统中,常见的做法是:

  1. 用 CMS 做粗粒度的频率估计,筛选出潜在的 Heavy Hitter。
  2. 对候选 Heavy Hitter 用 Space-Saving 或精确计数器做精细跟踪。

这种两层结构在网络流量分析中非常常见——第一层用硬件(FPGA/ASIC)上的 CMS 进行线速过滤,第二层用软件做精确分析。

十、实际系统中的应用

CMS 在工业界的应用远比学术论文中描述的广泛。以下是一些典型的应用场景。

网络流量监控

这是 CMS 最经典的应用场景。网络路由器需要实时统计每个源IP/目的IP/流的数据包数量或字节数。在 10Gbps 的链路上,每秒可能有数百万个数据包,精确计数不现实。

CMS 的优势在于:

推荐系统

在推荐系统中,CMS 用于:

例如,YouTube 的推荐系统曾使用类似 CMS 的结构来跟踪视频的观看次数,用于在推荐排序中纳入流行度信号。

数据库近似查询

多个数据库系统使用 CMS 来加速查询优化:

在查询优化器中,CMS 用于估计 JOIN 的输出大小、GROUP BY 的基数等。

自然语言处理

在大规模 NLP 任务中,CMS 用于:

分布式系统中的可合并性

CMS 的一个重要性质是可合并的(mergeable):两个使用相同哈希函数的 CMS 可以通过逐元素相加来合并。

merged[i][j] = cms_a[i][j] + cms_b[i][j]

这使得 CMS 天然适合 MapReduce / 分布式计算框架:每个节点维护自己的 CMS,最后合并。这个性质是 CMS 在 Spark、Flink 等系统中广泛使用的重要原因。

十一、完整 C 实现

下面是一个功能完整的 CMS C 实现,包含标准更新、保守更新、查询、合并等操作。

/*
 * count_min_sketch.c
 *
 * 一个功能完整的 Count-Min Sketch 实现。
 * 支持标准更新、保守更新、查询、合并。
 *
 * 编译:gcc -O2 -o cms count_min_sketch.c -lm
 * 运行:./cms
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <math.h>
#include <limits.h>

/* ================================================================
 * 哈希函数:使用 MurmurHash3 的 32 位变体
 * ================================================================ */

static uint32_t murmur3_32(const void *key, size_t len, uint32_t seed) {
    const uint8_t *data = (const uint8_t *)key;
    const int nblocks = (int)(len / 4);

    uint32_t h1 = seed;
    const uint32_t c1 = 0xcc9e2d51;
    const uint32_t c2 = 0x1b873593;

    /* body */
    const uint32_t *blocks = (const uint32_t *)(data + nblocks * 4);
    for (int i = -nblocks; i; i++) {
        uint32_t k1;
        memcpy(&k1, &blocks[i], sizeof(k1));

        k1 *= c1;
        k1 = (k1 << 15) | (k1 >> 17);
        k1 *= c2;

        h1 ^= k1;
        h1 = (h1 << 13) | (h1 >> 19);
        h1 = h1 * 5 + 0xe6546b64;
    }

    /* tail */
    const uint8_t *tail = (const uint8_t *)(data + nblocks * 4);
    uint32_t k1 = 0;
    switch (len & 3) {
        case 3: k1 ^= (uint32_t)tail[2] << 16; /* fall through */
        case 2: k1 ^= (uint32_t)tail[1] << 8;  /* fall through */
        case 1: k1 ^= (uint32_t)tail[0];
                k1 *= c1;
                k1 = (k1 << 15) | (k1 >> 17);
                k1 *= c2;
                h1 ^= k1;
    }

    /* finalization */
    h1 ^= (uint32_t)len;
    h1 ^= h1 >> 16;
    h1 *= 0x85ebca6b;
    h1 ^= h1 >> 13;
    h1 *= 0xc2b2ae35;
    h1 ^= h1 >> 16;

    return h1;
}

/* ================================================================
 * Count-Min Sketch 数据结构
 * ================================================================ */

typedef struct {
    uint32_t  width;       /* w:每行的列数 */
    uint32_t  depth;       /* d:行数(哈希函数个数) */
    uint64_t  total;       /* 所有更新的权重之和 */
    uint32_t *seeds;       /* 每个哈希函数的种子 */
    uint32_t *table;       /* d × w 的二维数组(展平存储) */
} CMS;

/* 通过 (ε, δ) 参数创建 CMS */
CMS *cms_create(double epsilon, double delta) {
    CMS *cms = (CMS *)calloc(1, sizeof(CMS));
    if (!cms) return NULL;

    cms->width = (uint32_t)ceil(M_E / epsilon);
    cms->depth = (uint32_t)ceil(log(1.0 / delta));
    cms->total = 0;

    if (cms->depth < 1) cms->depth = 1;
    if (cms->width < 1) cms->width = 1;

    cms->seeds = (uint32_t *)malloc(cms->depth * sizeof(uint32_t));
    cms->table = (uint32_t *)calloc((size_t)cms->width * cms->depth, sizeof(uint32_t));

    if (!cms->seeds || !cms->table) {
        free(cms->seeds);
        free(cms->table);
        free(cms);
        return NULL;
    }

    /* 用简单的线性序列作为种子 */
    for (uint32_t i = 0; i < cms->depth; i++) {
        cms->seeds[i] = i * 0x9E3779B9 + 0x12345678;
    }

    return cms;
}

/* 通过 (width, depth) 直接创建 CMS */
CMS *cms_create_wd(uint32_t width, uint32_t depth) {
    CMS *cms = (CMS *)calloc(1, sizeof(CMS));
    if (!cms) return NULL;

    cms->width = width;
    cms->depth = depth;
    cms->total = 0;

    cms->seeds = (uint32_t *)malloc(depth * sizeof(uint32_t));
    cms->table = (uint32_t *)calloc((size_t)width * depth, sizeof(uint32_t));

    if (!cms->seeds || !cms->table) {
        free(cms->seeds);
        free(cms->table);
        free(cms);
        return NULL;
    }

    for (uint32_t i = 0; i < depth; i++) {
        cms->seeds[i] = i * 0x9E3779B9 + 0x12345678;
    }

    return cms;
}

void cms_destroy(CMS *cms) {
    if (cms) {
        free(cms->seeds);
        free(cms->table);
        free(cms);
    }
}

/* 计算元素在第 row 行的列索引 */
static inline uint32_t cms_hash(const CMS *cms, uint32_t row,
                                const char *key, size_t keylen) {
    return murmur3_32(key, keylen, cms->seeds[row]) % cms->width;
}

/* 标准更新:每个计数器无条件加 count */
void cms_add(CMS *cms, const char *key, size_t keylen, uint32_t count) {
    cms->total += count;
    for (uint32_t i = 0; i < cms->depth; i++) {
        uint32_t col = cms_hash(cms, i, key, keylen);
        cms->table[i * cms->width + col] += count;
    }
}

/* 点查询:取所有行中的最小值 */
uint32_t cms_query(const CMS *cms, const char *key, size_t keylen) {
    uint32_t min_val = UINT32_MAX;
    for (uint32_t i = 0; i < cms->depth; i++) {
        uint32_t col = cms_hash(cms, i, key, keylen);
        uint32_t val = cms->table[i * cms->width + col];
        if (val < min_val) {
            min_val = val;
        }
    }
    return min_val;
}

/* 保守更新:只增加"需要增加"的计数器 */
void cms_add_conservative(CMS *cms, const char *key, size_t keylen,
                          uint32_t count) {
    uint32_t cur_min = cms_query(cms, key, keylen);
    uint32_t new_val = cur_min + count;

    cms->total += count;
    for (uint32_t i = 0; i < cms->depth; i++) {
        uint32_t col = cms_hash(cms, i, key, keylen);
        uint32_t idx = i * cms->width + col;
        if (cms->table[idx] < new_val) {
            cms->table[idx] = new_val;
        }
    }
}

/* 合并两个 CMS(使用相同的哈希函数) */
int cms_merge(CMS *dest, const CMS *src) {
    if (dest->width != src->width || dest->depth != src->depth) {
        return -1; /* 维度不匹配 */
    }
    dest->total += src->total;
    size_t n = (size_t)dest->width * dest->depth;
    for (size_t i = 0; i < n; i++) {
        dest->table[i] += src->table[i];
    }
    return 0;
}

/* 内积查询 */
uint64_t cms_inner_product(const CMS *a, const CMS *b) {
    if (a->width != b->width || a->depth != b->depth) {
        return UINT64_MAX;
    }
    uint64_t min_product = UINT64_MAX;
    for (uint32_t i = 0; i < a->depth; i++) {
        uint64_t row_product = 0;
        for (uint32_t j = 0; j < a->width; j++) {
            uint32_t idx = i * a->width + j;
            row_product += (uint64_t)a->table[idx] * b->table[idx];
        }
        if (row_product < min_product) {
            min_product = row_product;
        }
    }
    return min_product;
}

/* 打印 CMS 的统计信息 */
void cms_info(const CMS *cms) {
    size_t mem = sizeof(CMS)
               + cms->depth * sizeof(uint32_t)
               + (size_t)cms->width * cms->depth * sizeof(uint32_t);
    printf("Count-Min Sketch:\n");
    printf("  width  = %u\n", cms->width);
    printf("  depth  = %u\n", cms->depth);
    printf("  total  = %lu\n", (unsigned long)cms->total);
    printf("  memory = %zu bytes (%.2f KB)\n", mem, (double)mem / 1024.0);
}

/* ================================================================
 * 测试与示例
 * ================================================================ */

int main(void) {
    /* 创建一个 ε=0.001, δ=0.01 的 CMS */
    CMS *cms = cms_create(0.001, 0.01);
    if (!cms) {
        fprintf(stderr, "Failed to create CMS\n");
        return 1;
    }

    cms_info(cms);
    printf("\n");

    /* 模拟一个 Zipf 分布的数据流 */
    const char *elements[] = {
        "apple", "banana", "cherry", "date", "elderberry",
        "fig", "grape", "honeydew", "kiwi", "lemon"
    };
    int frequencies[] = {
        10000, 5000, 2500, 1250, 625,
        312, 156, 78, 39, 20
    };
    int n = 10;

    printf("Inserting elements (Zipf distribution)...\n");

    for (int i = 0; i < n; i++) {
        const char *elem = elements[i];
        size_t len = strlen(elem);
        for (int j = 0; j < frequencies[i]; j++) {
            cms_add(cms, elem, len, 1);
        }
    }

    printf("\nStandard update results:\n");
    printf("%-15s %10s %10s %10s\n", "Element", "True", "Estimated", "Error");
    printf("%-15s %10s %10s %10s\n", "-------", "----", "---------", "-----");

    for (int i = 0; i < n; i++) {
        const char *elem = elements[i];
        uint32_t est = cms_query(cms, elem, strlen(elem));
        int error = (int)est - frequencies[i];
        printf("%-15s %10d %10u %+10d\n", elem, frequencies[i], est, error);
    }

    /* 查询一个不存在的元素 */
    const char *missing = "watermelon";
    uint32_t missing_est = cms_query(cms, missing, strlen(missing));
    printf("%-15s %10d %10u %+10d\n", missing, 0, missing_est, (int)missing_est);

    /* 测试保守更新 */
    printf("\n--- Conservative Update Test ---\n");
    CMS *cms_cu = cms_create(0.001, 0.01);

    for (int i = 0; i < n; i++) {
        const char *elem = elements[i];
        size_t len = strlen(elem);
        for (int j = 0; j < frequencies[i]; j++) {
            cms_add_conservative(cms_cu, elem, len, 1);
        }
    }

    printf("%-15s %10s %10s %10s\n", "Element", "True", "Estimated", "Error");
    printf("%-15s %10s %10s %10s\n", "-------", "----", "---------", "-----");

    for (int i = 0; i < n; i++) {
        const char *elem = elements[i];
        uint32_t est = cms_query(cms_cu, elem, strlen(elem));
        int error = (int)est - frequencies[i];
        printf("%-15s %10d %10u %+10d\n", elem, frequencies[i], est, error);
    }

    /* 测试合并 */
    printf("\n--- Merge Test ---\n");
    CMS *cms_a = cms_create_wd(cms->width, cms->depth);
    CMS *cms_b = cms_create_wd(cms->width, cms->depth);
    /* 确保使用相同的种子 */
    memcpy(cms_a->seeds, cms->seeds, cms->depth * sizeof(uint32_t));
    memcpy(cms_b->seeds, cms->seeds, cms->depth * sizeof(uint32_t));

    /* 分片插入 */
    for (int i = 0; i < n / 2; i++) {
        const char *elem = elements[i];
        size_t len = strlen(elem);
        for (int j = 0; j < frequencies[i]; j++) {
            cms_add(cms_a, elem, len, 1);
        }
    }
    for (int i = n / 2; i < n; i++) {
        const char *elem = elements[i];
        size_t len = strlen(elem);
        for (int j = 0; j < frequencies[i]; j++) {
            cms_add(cms_b, elem, len, 1);
        }
    }

    cms_merge(cms_a, cms_b);

    printf("Merged result for 'apple':  %u (expected %d)\n",
           cms_query(cms_a, "apple", 5), frequencies[0]);
    printf("Merged result for 'grape':  %u (expected %d)\n",
           cms_query(cms_a, "grape", 5), frequencies[6]);

    /* 清理 */
    cms_destroy(cms);
    cms_destroy(cms_cu);
    cms_destroy(cms_a);
    cms_destroy(cms_b);

    printf("\nDone.\n");
    return 0;
}

这份实现包含以下功能:

  1. MurmurHash3 哈希函数,用不同种子实现多个独立的哈希。
  2. 标准更新cms_add)和 保守更新cms_add_conservative)。
  3. 点查询cms_query)和 内积查询cms_inner_product)。
  4. 合并cms_merge)操作。
  5. 一个完整的测试程序,模拟 Zipf 分布并比较标准更新和保守更新的效果。

十二、工程实践要点

哈希函数的选择

哈希函数 速度 分布质量 备注
MurmurHash3 优秀 最常用,用不同种子实现多个哈希
xxHash 极快 优秀 适合吞吐量敏感的场景
CityHash 优秀 Google 出品
SipHash 中等 优秀 抗哈希洪水攻击
FNV-1a 一般 简单但分布质量略差
多项式哈希 理论保证 h(x) = (ax+b) mod p mod w

在实践中,MurmurHash3 + 不同种子 是最简单且效果最好的方案。理论上需要两两独立(pairwise independent)的哈希族,多项式哈希 h(x) = ((a·x + b) mod p) mod w 就够了,但实际中非密码学哈希函数的效果更好。

计数器溢出

32 位计数器最大值约 42 亿,对于大多数应用足够。但在以下场景中需要注意:

解决方案:

  1. 使用 64 位计数器(空间翻倍)。
  2. 定期”衰减”:将所有计数器除以 2(类似指数衰减),保持相对比例。
  3. 使用饱和计数器(到达最大值后不再增加)。

工程陷阱表

陷阱 症状 解决方案
哈希种子相同 所有行完全相同,等于只有一行 确保每行使用不同的种子或不同的哈希函数
w 和 d 设反了 空间浪费或精度极差 w(宽)远大于 d(深),典型比例 1000:5
忘记计入总权重 无法计算相对频率 维护 total 字段
合并时哈希不一致 结果完全错误 确保所有分片使用相同的哈希函数和种子
对低频元素过度信任 估计值远高于真实值 CMS 对低频元素误差大,不适合精确统计低频元素
线程不安全 计数丢失或竞争 每行用原子操作,或每个线程维护独立 CMS 后合并
字节序问题 跨平台合并结果错误 统一字节序或在合并前转换
只用一个哈希然后分桶 哈希值的高低位有相关性 每行用完全独立的哈希计算

参数选择指南

场景                   ε        δ        w       d      空间(4B 计数器)
----------------------------------------------------------------------
网络流量粗估计        0.01    0.05      272     3      ~3.2 KB
数据库查询优化        0.001   0.01     2718     5      ~53 KB
精确 Heavy Hitter    0.0001  0.001   27183     7      ~740 KB
高精度科学计算       0.00001 0.0001  271829    10     ~10.3 MB

关于内存对齐

在高性能场景中,CMS 的矩阵行应该对齐到缓存行(通常 64 字节)。这可以避免伪共享(false sharing)并提高 CPU 缓存命中率。

/* 将 width 对齐到 16 的倍数(假设 4 字节计数器,64 字节缓存行) */
uint32_t aligned_width = (width + 15) & ~15u;

十三、我的个人看法

写到这里,我想分享一些关于 CMS 的个人思考。

CMS 是否被高估了

在学术界,CMS 是引用量最高的流式算法之一。但在工业实践中,我发现很多场景其实不需要 CMS 的全部能力:

CMS 真正的甜蜜点是:你需要在严格的内存约束下,对任意元素进行实时频率查询,且数据流不能回溯。不是所有场景都满足这些条件。

保守更新应该是默认行为

我前面说过,没有理由不使用保守更新。让我再强调一次:如果你在实现 CMS 库,保守更新应该是默认选项,标准更新是”兼容模式”。现有的很多 CMS 库(包括一些知名的)默认使用标准更新,这是一个历史遗留问题。

哈希函数质量比数量重要

理论上需要 d 个”完全独立”的哈希函数,但实践中使用一个好的哈希函数配不同种子就够了。我见过有人为了”独立性”用了 d 个不同的哈希算法,反而引入了实现复杂度和维护负担。

MurmurHash3 + 不同种子,或者用两个独立的哈希做线性组合(h_i(x) = h1(x) + i * h2(x)),都是实践中验证过的好方案。后一种方法来自 Kirsch 和 Mitzenmacher 2006 年的论文,只需要两次哈希计算就能生成任意多个”看起来独立”的哈希值。

CMS 在机器学习中的应用被低估了

Feature hashing(特征哈希)本质上就是一个单行的 CMS。如果你在做大规模稀疏特征的在线学习,CMS 可以用来:

  1. 监控特征的更新频率,自适应调整学习率。
  2. 检测特征漂移(feature drift)。
  3. 在分布式训练中近似同步特征统计。

这些应用在学术论文中讨论不多,但在实际系统中很有价值。

什么时候不该用 CMS

最后列几个不适合用 CMS 的场景:

  1. 需要精确计数:如果误差不可接受,老老实实用哈希表。
  2. 需要枚举元素:CMS 不能告诉你有哪些元素,只能回答”这个元素出现了多少次”。
  3. 数据流可以多遍扫描:如果数据在磁盘上,精确计数可能更划算。
  4. 低频元素很重要:CMS 对低频元素的估计很差,此时考虑 Count Sketch 或 Lossy Counting。

十四、总结与延伸

Count-Min Sketch 是流式算法工具箱中的基础构件。它的设计体现了概率数据结构的核心哲学:用少量空间和可控的误差换取处理无限数据流的能力

本文的要点回顾:

  1. CMS 是一个 d × w 的计数器矩阵,用 d 个独立哈希函数将元素映射到 d 行。
  2. 更新时 d 个计数器各加 1,查询时取 d 个计数器的最小值。
  3. 参数 w = ⌈e/ε⌉,d = ⌈ln(1/δ)⌉,空间与数据流长度无关。
  4. CMS 只会高估不会低估,误差上界为 ε·‖f‖₁,概率至少 1-δ。
  5. 保守更新在不增加开销的情况下显著减少高估。
  6. Count Sketch 提供无偏估计和更紧的 L2 范数保证,但不具备单边误差性质。
  7. CMS 可合并,天然适合分布式计算。

如果你对流式算法感兴趣,建议进一步阅读以下主题:

参考文献

  1. Cormode, G., & Muthukrishnan, S. (2005). An improved data stream summary: the count-min sketch and its applications. Journal of Algorithms, 55(1), 58-75.
  2. Charikar, M., Chen, K., & Farach-Colton, M. (2002). Finding frequent items in data streams. Proceedings of the 29th International Colloquium on Automata, Languages and Programming (ICALP).
  3. Estan, C., & Varghese, G. (2002). New directions in traffic measurement and accounting. Proceedings of the 2002 conference on Applications, technologies, architectures, and protocols for computer communications (SIGCOMM).
  4. Metwally, A., Agrawal, D., & El Abbadi, A. (2005). Efficient computation of frequent and top-k elements in data streams. Proceedings of the 10th International Conference on Database Theory (ICDT).
  5. Pitel, G., & Fouquier, G. (2015). Count-Min-Log sketch: Approximately counting with approximate counters. arXiv preprint arXiv:1502.04885.
  6. Kirsch, A., & Mitzenmacher, M. (2006). Less hashing, same performance: Building a better Bloom filter. European Symposium on Algorithms (ESA).
  7. Cormode, G., & Hadjieleftheriou, M. (2008). Finding frequent items in data streams. Proceedings of the VLDB Endowment, 1(2), 1530-1541.
  8. Muthukrishnan, S. (2005). Data streams: Algorithms and applications. Foundations and Trends in Theoretical Computer Science, 1(2), 117-236.
  9. Morris, R. (1978). Counting large numbers of events in small registers. Communications of the ACM, 21(10), 840-842.
  10. Flajolet, P. (1985). Approximate counting: a detailed analysis. BIT Numerical Mathematics, 25(1), 113-134.

算法系列导航上一篇:HyperLogLog | 下一篇:t-digest

相关阅读频率估计的理论极限 | 流式算法总论


By .