一、从一个简单问题说起:这个元素出现了多少次
在数据流的世界里,有一类看似简单的问题:给定一个元素,它在数据流中出现了多少次?
这个问题被称为 频率估计(Frequency Estimation) 或 点查询(Point Query)。精确回答它很容易——维护一个哈希表,key 是元素,value 是计数。但在以下场景中,精确计数变得不可行:
- 数据流无限长:网络路由器每秒处理数百万个数据包,流永远不会停止。
- 元素空间巨大:IP 地址有 2^32 种可能,URL 的空间更是无限。
- 内存有限:嵌入式设备或 FPGA 上只有 KB 级别的 SRAM。
- 单遍扫描:数据只能看一次,不能回溯。
如果数据流中有 n 个不同的元素,精确计数需要 O(n) 的空间。当 n 达到数十亿甚至更大时,这个开销是不可接受的。
我们需要的是一种 亚线性(sublinear) 空间的数据结构——用远小于 O(n) 的空间,给出一个”足够好”的近似答案。
Count-Min Sketch(以下简称 CMS)正是为这个问题而生的。它由 Graham Cormode 和 S. Muthukrishnan 在 2005 年的论文 “An Improved Data Stream Summary: The Count-Min Sketch and its Applications” 中提出。这个数据结构简洁优美,实现简单,理论保证强大,已经成为流式算法工具箱中最常用的工具之一。
在这一篇中,我会从问题定义出发,完整推导 CMS 的结构、操作、误差保证,讨论它的变体和改进,给出一份可以直接编译运行的 C 实现,并结合实际工程经验分享我的看法。
二、Count-Min Sketch 的结构
CMS 的核心是一个二维计数器数组,加上一组独立的哈希函数。
基本组成
CMS 由两个参数决定:
- w(宽度):每行的列数
- d(深度):行数,也就是哈希函数的个数
数据结构包含:
- 一个
d × w的二维整数数组count[d][w],所有元素初始化为 0。 - d 个独立的哈希函数
h_1, h_2, ..., h_d,每个将元素映射到{0, 1, ..., w-1}。
可以将其想象成一个矩阵:每一行对应一个哈希函数,每一列对应一个桶。每个元素通过 d 个哈希函数被映射到 d 个不同行中的 d 个桶。
列 0 列 1 列 2 列 3 列 4 ... 列 w-1
行 0 (h₁) [ ] [ ] [ ] [ ] [ ] ... [ ]
行 1 (h₂) [ ] [ ] [ ] [ ] [ ] ... [ ]
行 2 (h₃) [ ] [ ] [ ] [ ] [ ] ... [ ]
...
行 d-1(hd) [ ] [ ] [ ] [ ] [ ] ... [ ]
参数的选择
w 和 d 并不是随意选择的。它们由两个精度参数 ε(epsilon)和 δ(delta)决定:
- ε:误差上界。查询结果最多比真实值大 ε·N,其中 N 是数据流中所有元素的总频率之和。
- δ:失败概率上界。上述误差保证以至少 1-δ 的概率成立。
参数的设定公式:
w = ⌈e / ε⌉ (e 是自然常数 ≈ 2.718)
d = ⌈ln(1/δ)⌉
总空间为 w × d 个计数器。例如,设 ε =
0.001,δ = 0.01:
w = ⌈2.718 / 0.001⌉ = 2718
d = ⌈ln(100)⌉ = ⌈4.605⌉ = 5
总共只需要 2718 × 5 = 13590 个计数器。如果每个计数器用 4 字节,总共约 53 KB。这个空间可以跟踪任意数量的不同元素,精度为千分之一。
与 Bloom Filter 的结构对比
CMS 的结构和 Bloom Filter 有几分相似:
| 特征 | Bloom Filter | Count-Min Sketch |
|---|---|---|
| 底层数组 | 一维比特数组 | 二维整数数组(d × w) |
| 哈希函数 | k 个 | d 个 |
| 存储内容 | 0 或 1 | 计数值 |
| 回答的问题 | 元素是否存在 | 元素出现了多少次 |
| 假阳性 | 会误报”存在” | 会高估频率 |
| 假阴性 | 不会 | 不会低估频率 |
可以把 CMS 看作 Bloom Filter 从”存在性”到”频率”的自然推广。
三、更新与查询操作
更新操作(Update / Add)
当数据流中到达一个元素 e 时,执行以下操作:
对于 i = 0, 1, ..., d-1:
count[i][h_i(e)] += 1
也就是说,将 e 通过每个哈希函数映射到对应行的一个列,然后将这 d 个位置的计数器各加 1。
更新的时间复杂度是 O(d),因为需要计算 d 个哈希值并执行 d 次加法。
更一般地,CMS 支持加权更新。当元素 e 带有权重 c 时:
对于 i = 0, 1, ..., d-1:
count[i][h_i(e)] += c
这使得 CMS 可以用于统计字节数、金额等非单位权重的场景。
查询操作(Query / Estimate)
要估计元素 e 的频率,执行以下操作:
f̂(e) = min{count[i][h_i(e)] : i = 0, 1, ..., d-1}
也就是说,找到 e 在 d 行中对应的 d 个计数器的值,取其中的最小值作为频率估计。
为什么取最小值?因为每个计数器可能因为哈希碰撞而被其他元素”污染”(增加),所以每个计数器的值都 大于等于 真实频率。取最小值可以最大程度地减少碰撞带来的高估。
查询的时间复杂度同样是 O(d)。
一个具体的例子
假设 w = 5,d = 3,数据流为
[a, b, a, c, a, b, d, a]。
哈希函数的映射(假设):
h₁: a→2, b→0, c→3, d→1
h₂: a→1, b→4, c→1, d→3
h₃: a→3, b→2, c→0, d→4
处理完所有元素后,矩阵状态:
列0 列1 列2 列3 列4
行0(h₁): [2] [1] [4] [1] [0] ← b=2, d=1, a=4, c=1
行1(h₂): [0] [5] [0] [1] [2] ← a+c=5, d=1, b=2
行2(h₃): [1] [0] [2] [4] [1] ← c=1, b=2, a=4, d=1
查询 a 的频率:
f̂(a) = min(count[0][2], count[1][1], count[2][3])
= min(4, 5, 4)
= 4 ← 真实值就是 4,精确!
查询 c 的频率:
f̂(c) = min(count[0][3], count[1][1], count[2][0])
= min(1, 5, 1)
= 1 ← 真实值就是 1,精确!
注意 count[1][1] 的值是 5,因为 a 和 c 在行
1 碰撞了(都映射到列
1)。但取最小值后,碰撞的影响被其他行”纠正”了。
删除操作
标准 CMS 不支持删除。因为将计数器减 1 可能导致某些计数器变为负数(当碰撞元素被”过度删除”时),破坏”只高估不低估”的性质。
如果确实需要删除功能,可以使用 Count Sketch(下文讨论)或维护两个 CMS(一个跟踪插入,一个跟踪删除),但这会使误差分析复杂化。
四、误差分析:为什么 CMS 只高估不低估
CMS 的核心理论保证可以用一句话概括:
对于任意元素 e,真实频率 f(e) 和估计值 f̂(e) 满足: f(e) ≤ f̂(e) ≤ f(e) + ε·‖f‖₁ 其中 ‖f‖₁ 是所有元素频率之和(即数据流长度 N),以概率至少 1 - δ 成立。
下界证明:为什么不会低估
这部分是平凡的。对于元素 e,每次 e
出现在数据流中时,count[i][h_i(e)] 都会加
1。其他元素的出现只会增加(而非减少)这些计数器的值。因此:
count[i][h_i(e)] ≥ f(e),对所有 i 成立
取最小值后:
f̂(e) = min_i count[i][h_i(e)] ≥ f(e)
上界证明:Markov 不等式 + 独立放大
定义随机变量 X_i 为行 i 中与 e
碰撞的”噪音”:
X_i = count[i][h_i(e)] - f(e) = Σ_{j ≠ e, h_i(j) = h_i(e)} f(j)
对于一个好的哈希函数,碰撞概率为 1/w。由期望的线性性:
E[X_i] = Σ_{j ≠ e} f(j) · Pr[h_i(j) = h_i(e)]
= Σ_{j ≠ e} f(j) · (1/w)
≤ ‖f‖₁ / w
当 w = ⌈e/ε⌉ 时:
E[X_i] ≤ ‖f‖₁ · ε / e
由 Markov 不等式:
Pr[X_i ≥ ε · ‖f‖₁] ≤ E[X_i] / (ε · ‖f‖₁) ≤ 1/e
也就是说,每一行的碰撞噪音超过 ε·‖f‖₁ 的概率不超过 1/e ≈ 0.368。
由于 d 行是独立的(哈希函数独立),所有行同时碰撞过大的概率为:
Pr[f̂(e) - f(e) ≥ ε · ‖f‖₁] = Pr[所有行的 X_i 都 ≥ ε · ‖f‖₁]
≤ (1/e)^d
当 d = ⌈ln(1/δ)⌉ 时:
(1/e)^d = e^{-d} ≤ e^{-ln(1/δ)} = δ
证毕。这就是为什么参数设为 w = ⌈e/ε⌉ 和 d = ⌈ln(1/δ)⌉。
关于 L1 范数
误差上界中的 ‖f‖₁ 是数据流中所有元素频率的绝对值之和。在纯计数场景下,它就等于数据流长度 N。这意味着:
- CMS 的误差是 加性的,与数据流总量成比例。
- 对于高频元素(频率远大于 ε·N 的),相对误差很小。
- 对于低频元素(频率接近或小于 ε·N 的),相对误差可能很大。
这是 CMS 的一个固有限制:它更擅长估计”大象”(高频元素),而非”老鼠”(低频元素)。
五、三种查询类型
CMS 不仅支持点查询,还可以扩展到范围查询和内积查询。
Point Query(点查询)
这是最基本的查询:给定元素 e,估计 f(e)。
f̂(e) = min_i count[i][h_i(e)]
我们已经详细讨论过了。
Range Query(范围查询)
给定一个范围 [l, r],估计所有元素 e 满足 l ≤ e ≤ r 的频率之和。
直觉上,可以将范围分解为 O(log U) 个二进制区间(U 是元素空间大小),每个区间对应一个 CMS 实例。这种技术叫做 Dyadic Ranges。
具体做法:
- 构建 log U + 1 层 CMS:第 0 层对应单个元素,第 1 层对应长度为 2 的区间,第 k 层对应长度为 2^k 的区间。
- 每个元素到达时,更新它所属的所有层的 CMS(共 log U + 1 个 CMS)。
- 查询 [l, r] 时,将其分解为 O(log U) 个二进制对齐的区间,对每个区间查询对应层的 CMS,求和。
误差保证变为:
Pr[|f̂([l,r]) - f([l,r])| ≤ ε · log(U) · ‖f‖₁] ≥ 1 - δ · log(U)
空间乘以 O(log U),误差也乘以 O(log U)。在实践中,如果范围查询不是核心需求,通常不值得这个开销。
Inner Product Query(内积查询)
给定两个数据流的频率向量 f 和 g,估计它们的内积 Σ_e f(e)·g(e)。
做法:对两个数据流分别维护 CMS(使用相同的哈希函数),查询时:
⟨f̂, ĝ⟩ = min_i Σ_j count_f[i][j] · count_g[i][j]
即对每一行计算两个 CMS 对应位置的逐元素乘积之和,然后取行间最小值。
内积查询在以下场景中很有用:
- 连接大小估计(Join Size Estimation):数据库中估计两个表的连接结果大小。
- 相似度计算:估计两个流之间的余弦相似度。
- 自连接大小(Self-Join Size):Σ f(e)^2,用于衡量数据流的偏斜程度。
六、Conservative Update:减少高估的改进策略
标准 CMS 在更新时”无脑”地给所有 d 个位置加 1。但其中某些位置可能已经因为碰撞被高估了,继续加 1 会使高估更严重。
Conservative Update(保守更新) 是由 Estan 和 Varghese 在 2002 年提出的改进策略。核心思想是:更新时只增加那些”真正需要增加”的计数器。
算法描述
对于到达的元素 e(权重为 1):
1. 先查询当前估计值:est = min_i count[i][h_i(e)]
2. 对于每个 i = 0, 1, ..., d-1:
count[i][h_i(e)] = max(count[i][h_i(e)], est + 1)
也就是说,只把那些值不超过 est + 1
的计数器提升到 est + 1,而不是无条件加 1。
为什么有效
考虑一个计数器
count[i][h_i(e)],它因为碰撞已经达到了 10,而 e
的真实频率只有 3。标准更新会把它从 10 变成
11,但保守更新会把它保持在 10(因为 est + 1 = 4 <
10)。
这样做的效果是:
- 减少了高估的程度。
- 不会影响”只高估不低估”的性质。
- 不增加时间或空间开销。
理论与实践
保守更新在理论上不改变最坏情况的误差保证,但在实践中显著减少了平均误差。在 Zipf 分布等有偏分布下,改善尤为明显。
我个人认为,没有理由不使用保守更新。它的实现代价几乎为零,收益却很可观。唯一的”代价”是更新操作从一次写入变成了”一次读+一次条件写”,在某些需要极致写入吞吐的场景(如 FPGA 流水线)中可能有影响。
保守更新的伪代码
void cms_add_conservative(CMS *cms, const char *key, uint32_t count) {
uint32_t min_val = cms_query(cms, key);
uint32_t new_val = min_val + count;
for (uint32_t i = 0; i < cms->depth; i++) {
uint32_t idx = hash(cms, i, key) % cms->width;
if (cms->table[i][idx] < new_val) {
cms->table[i][idx] = new_val;
}
}
}七、Count-Min-Log Sketch:用对数计数器减少空间
标准 CMS 的每个计数器用 32 位或 64 位整数存储。当计数值很大时,低几位的精度其实并不重要——我们可以用对数尺度来压缩计数器。
Count-Min-Log Sketch 由 Pitel 和 Fouquier 在 2015 年提出,核心思想是将每个计数器替换为一个对数计数器(类似 Morris Counter)。
Morris Counter 回顾
Morris Counter 是一种概率计数器,用 c 位存储可以近似计数到 2{2c} 的数量级。其工作原理:
- 计数器存储的是”对数级别”的值 v。
- 代表的近似计数为 2^v - 1。
- 增加 1 时:以概率 1/2^v 将 v 加 1,否则不变。
这样一个 8 位的计数器可以表示高达约 2^256 的计数,精度随计数增大而降低。
在 CMS 中的应用
将 CMS 的每个 32 位计数器替换为 8 位或 16 位的 Morris 计数器:
- 空间减少 2-4 倍:每个计数器从 4 字节降到 1-2 字节。
- 更新变为概率性的:不是每次都增加,而是以递减的概率增加。
- 查询时需要解码:将对数值转换回线性估计。
误差分析变为两层:CMS 本身的碰撞误差 + 对数计数的近似误差。
权衡
Count-Min-Log 在以下场景中有优势:
- 需要跟踪的计数器数量非常大(数千万级别)。
- 内存预算极其有限(如嵌入式设备)。
- 可以容忍额外的概率误差。
在一般服务器端应用中,我不太推荐使用 Count-Min-Log。内存通常不是瓶颈,而额外的概率误差和实现复杂度不值得。但在物联网、边缘计算等场景下,它是值得考虑的。
八、Count-Min Sketch vs Count Sketch:有偏 vs 无偏
Count Sketch(CS)由 Charikar、Chen 和 Farach-Colton 在 2002 年提出,比 CMS 早三年。两者经常被混淆,但有本质区别。
Count Sketch 的结构
CS 也使用 d × w 的矩阵和 d 个哈希函数。但它额外引入了 d 个 符号函数(sign function) s_1, s_2, …, s_d,每个将元素映射到 {+1, -1}。
更新:
对于 i = 0, 1, ..., d-1:
count[i][h_i(e)] += s_i(e) (注意是加符号值,不是加 1)
查询:
f̂(e) = median_i { s_i(e) · count[i][h_i(e)] }
注意取的是中位数(median),不是最小值。
核心区别
| 特性 | Count-Min Sketch | Count Sketch |
|---|---|---|
| 更新 | count += 1 | count += sign(e) |
| 查询 | 取最小值 | 取中位数 |
| 估计偏差 | 有偏(只高估) | 无偏 |
| 误差范围 | f(e) ± ε·‖f‖₁ | f(e) ± ε·‖f‖₂ |
| 范数 | L1 范数 | L2 范数 |
| 支持删除 | 不支持 | 支持 |
| 支持负权重 | 不支持 | 支持 |
L1 vs L2 范数的影响
CMS 的误差与 ‖f‖₁(频率之和)成比例,CS 的误差与 ‖f‖₂(频率平方和的平方根)成比例。
在 Zipf 分布等有偏分布下,‖f‖₂ ≪ ‖f‖₁,所以 CS 的误差保证更紧。但 CMS 的”只高估”性质在某些场景中更有价值——例如,如果你用频率估计来检测超过阈值的重要元素,高估不会导致遗漏,而低估会。
实际选择建议
- 如果需要单边误差保证(只高估),选 CMS。
- 如果需要无偏估计或支持删除,选 CS。
- 如果数据分布高度偏斜且关心精度,CS 通常更好。
- 如果实现简单性是首要考虑,选 CMS。
我个人在大多数场景下倾向于使用 CMS,因为:(1)实现更简单;(2)“只高估”的性质在异常检测中更安全;(3)配合 Conservative Update 后实际精度并不差。
九、Heavy Hitter 检测与 Space-Saving 联动
Heavy Hitter 问题是频率估计的一个重要应用:找出数据流中频率超过 φ·N 的所有元素(N 是数据流长度,φ 是给定的阈值)。
用 CMS 检测 Heavy Hitter
最直接的方法是将 CMS 与一个小的候选集结合:
1. 维护一个 CMS 和一个大小为 O(1/φ) 的候选集(哈希表)。
2. 每个元素到达时:
a. 更新 CMS。
b. 查询 CMS 估计。
c. 如果估计值 ≥ φ·N,将元素加入候选集。
3. 输出时:对候选集中的每个元素用 CMS 验证,过滤掉估计值低于阈值的。
由于 CMS 只高估不低估,真正的 Heavy Hitter 一定会被发现(无假阴性)。但候选集中可能包含一些非 Heavy Hitter(假阳性),这由 CMS 的精度参数 ε 控制。
Space-Saving 算法
Space-Saving(Metwally et al., 2005)是另一种 Heavy Hitter 检测算法。它维护一个固定大小为 k 的”摘要”,其中存储 k 个 (元素, 计数) 对:
1. 元素到达时:
a. 如果元素已在摘要中,将其计数加 1。
b. 如果不在但摘要未满,加入并计数设为 1。
c. 如果不在且摘要已满,替换计数最小的元素,并将其计数加 1。
Space-Saving 的优势是能给出频率的上界和下界,且实现也很简单。
CMS + Space-Saving 联合使用
在实际系统中,常见的做法是:
- 用 CMS 做粗粒度的频率估计,筛选出潜在的 Heavy Hitter。
- 对候选 Heavy Hitter 用 Space-Saving 或精确计数器做精细跟踪。
这种两层结构在网络流量分析中非常常见——第一层用硬件(FPGA/ASIC)上的 CMS 进行线速过滤,第二层用软件做精确分析。
十、实际系统中的应用
CMS 在工业界的应用远比学术论文中描述的广泛。以下是一些典型的应用场景。
网络流量监控
这是 CMS 最经典的应用场景。网络路由器需要实时统计每个源IP/目的IP/流的数据包数量或字节数。在 10Gbps 的链路上,每秒可能有数百万个数据包,精确计数不现实。
CMS 的优势在于:
- 固定的、可预测的内存使用。
- 每次更新只需 O(d) 次内存访问,适合流水线化的硬件实现。
- 可以检测 DDoS 攻击中的 Heavy Hitter。
推荐系统
在推荐系统中,CMS 用于:
- 统计物品(item)的展示次数和点击次数,计算近似 CTR。
- 统计用户-物品的交互频率,做近似协同过滤。
- 在 A/B 测试中快速估计各变体的曝光和转化。
例如,YouTube 的推荐系统曾使用类似 CMS 的结构来跟踪视频的观看次数,用于在推荐排序中纳入流行度信号。
数据库近似查询
多个数据库系统使用 CMS 来加速查询优化:
- Apache Spark:在 Structured Streaming 中提供了 CMS 的内置支持。
- PostgreSQL:
pg_statistic中的频率直方图可以视为 CMS 的一种替代。 - Redis:通过 RedisBloom 模块支持 CMS。
在查询优化器中,CMS 用于估计 JOIN 的输出大小、GROUP BY 的基数等。
自然语言处理
在大规模 NLP 任务中,CMS 用于:
- 统计词频(word frequency),用于 TF-IDF 计算。
- 在分布式训练中近似跟踪 n-gram 频率。
- 在特征哈希(feature hashing)中估计碰撞率。
分布式系统中的可合并性
CMS 的一个重要性质是可合并的(mergeable):两个使用相同哈希函数的 CMS 可以通过逐元素相加来合并。
merged[i][j] = cms_a[i][j] + cms_b[i][j]
这使得 CMS 天然适合 MapReduce / 分布式计算框架:每个节点维护自己的 CMS,最后合并。这个性质是 CMS 在 Spark、Flink 等系统中广泛使用的重要原因。
十一、完整 C 实现
下面是一个功能完整的 CMS C 实现,包含标准更新、保守更新、查询、合并等操作。
/*
* count_min_sketch.c
*
* 一个功能完整的 Count-Min Sketch 实现。
* 支持标准更新、保守更新、查询、合并。
*
* 编译:gcc -O2 -o cms count_min_sketch.c -lm
* 运行:./cms
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <math.h>
#include <limits.h>
/* ================================================================
* 哈希函数:使用 MurmurHash3 的 32 位变体
* ================================================================ */
static uint32_t murmur3_32(const void *key, size_t len, uint32_t seed) {
const uint8_t *data = (const uint8_t *)key;
const int nblocks = (int)(len / 4);
uint32_t h1 = seed;
const uint32_t c1 = 0xcc9e2d51;
const uint32_t c2 = 0x1b873593;
/* body */
const uint32_t *blocks = (const uint32_t *)(data + nblocks * 4);
for (int i = -nblocks; i; i++) {
uint32_t k1;
memcpy(&k1, &blocks[i], sizeof(k1));
k1 *= c1;
k1 = (k1 << 15) | (k1 >> 17);
k1 *= c2;
h1 ^= k1;
h1 = (h1 << 13) | (h1 >> 19);
h1 = h1 * 5 + 0xe6546b64;
}
/* tail */
const uint8_t *tail = (const uint8_t *)(data + nblocks * 4);
uint32_t k1 = 0;
switch (len & 3) {
case 3: k1 ^= (uint32_t)tail[2] << 16; /* fall through */
case 2: k1 ^= (uint32_t)tail[1] << 8; /* fall through */
case 1: k1 ^= (uint32_t)tail[0];
k1 *= c1;
k1 = (k1 << 15) | (k1 >> 17);
k1 *= c2;
h1 ^= k1;
}
/* finalization */
h1 ^= (uint32_t)len;
h1 ^= h1 >> 16;
h1 *= 0x85ebca6b;
h1 ^= h1 >> 13;
h1 *= 0xc2b2ae35;
h1 ^= h1 >> 16;
return h1;
}
/* ================================================================
* Count-Min Sketch 数据结构
* ================================================================ */
typedef struct {
uint32_t width; /* w:每行的列数 */
uint32_t depth; /* d:行数(哈希函数个数) */
uint64_t total; /* 所有更新的权重之和 */
uint32_t *seeds; /* 每个哈希函数的种子 */
uint32_t *table; /* d × w 的二维数组(展平存储) */
} CMS;
/* 通过 (ε, δ) 参数创建 CMS */
CMS *cms_create(double epsilon, double delta) {
CMS *cms = (CMS *)calloc(1, sizeof(CMS));
if (!cms) return NULL;
cms->width = (uint32_t)ceil(M_E / epsilon);
cms->depth = (uint32_t)ceil(log(1.0 / delta));
cms->total = 0;
if (cms->depth < 1) cms->depth = 1;
if (cms->width < 1) cms->width = 1;
cms->seeds = (uint32_t *)malloc(cms->depth * sizeof(uint32_t));
cms->table = (uint32_t *)calloc((size_t)cms->width * cms->depth, sizeof(uint32_t));
if (!cms->seeds || !cms->table) {
free(cms->seeds);
free(cms->table);
free(cms);
return NULL;
}
/* 用简单的线性序列作为种子 */
for (uint32_t i = 0; i < cms->depth; i++) {
cms->seeds[i] = i * 0x9E3779B9 + 0x12345678;
}
return cms;
}
/* 通过 (width, depth) 直接创建 CMS */
CMS *cms_create_wd(uint32_t width, uint32_t depth) {
CMS *cms = (CMS *)calloc(1, sizeof(CMS));
if (!cms) return NULL;
cms->width = width;
cms->depth = depth;
cms->total = 0;
cms->seeds = (uint32_t *)malloc(depth * sizeof(uint32_t));
cms->table = (uint32_t *)calloc((size_t)width * depth, sizeof(uint32_t));
if (!cms->seeds || !cms->table) {
free(cms->seeds);
free(cms->table);
free(cms);
return NULL;
}
for (uint32_t i = 0; i < depth; i++) {
cms->seeds[i] = i * 0x9E3779B9 + 0x12345678;
}
return cms;
}
void cms_destroy(CMS *cms) {
if (cms) {
free(cms->seeds);
free(cms->table);
free(cms);
}
}
/* 计算元素在第 row 行的列索引 */
static inline uint32_t cms_hash(const CMS *cms, uint32_t row,
const char *key, size_t keylen) {
return murmur3_32(key, keylen, cms->seeds[row]) % cms->width;
}
/* 标准更新:每个计数器无条件加 count */
void cms_add(CMS *cms, const char *key, size_t keylen, uint32_t count) {
cms->total += count;
for (uint32_t i = 0; i < cms->depth; i++) {
uint32_t col = cms_hash(cms, i, key, keylen);
cms->table[i * cms->width + col] += count;
}
}
/* 点查询:取所有行中的最小值 */
uint32_t cms_query(const CMS *cms, const char *key, size_t keylen) {
uint32_t min_val = UINT32_MAX;
for (uint32_t i = 0; i < cms->depth; i++) {
uint32_t col = cms_hash(cms, i, key, keylen);
uint32_t val = cms->table[i * cms->width + col];
if (val < min_val) {
min_val = val;
}
}
return min_val;
}
/* 保守更新:只增加"需要增加"的计数器 */
void cms_add_conservative(CMS *cms, const char *key, size_t keylen,
uint32_t count) {
uint32_t cur_min = cms_query(cms, key, keylen);
uint32_t new_val = cur_min + count;
cms->total += count;
for (uint32_t i = 0; i < cms->depth; i++) {
uint32_t col = cms_hash(cms, i, key, keylen);
uint32_t idx = i * cms->width + col;
if (cms->table[idx] < new_val) {
cms->table[idx] = new_val;
}
}
}
/* 合并两个 CMS(使用相同的哈希函数) */
int cms_merge(CMS *dest, const CMS *src) {
if (dest->width != src->width || dest->depth != src->depth) {
return -1; /* 维度不匹配 */
}
dest->total += src->total;
size_t n = (size_t)dest->width * dest->depth;
for (size_t i = 0; i < n; i++) {
dest->table[i] += src->table[i];
}
return 0;
}
/* 内积查询 */
uint64_t cms_inner_product(const CMS *a, const CMS *b) {
if (a->width != b->width || a->depth != b->depth) {
return UINT64_MAX;
}
uint64_t min_product = UINT64_MAX;
for (uint32_t i = 0; i < a->depth; i++) {
uint64_t row_product = 0;
for (uint32_t j = 0; j < a->width; j++) {
uint32_t idx = i * a->width + j;
row_product += (uint64_t)a->table[idx] * b->table[idx];
}
if (row_product < min_product) {
min_product = row_product;
}
}
return min_product;
}
/* 打印 CMS 的统计信息 */
void cms_info(const CMS *cms) {
size_t mem = sizeof(CMS)
+ cms->depth * sizeof(uint32_t)
+ (size_t)cms->width * cms->depth * sizeof(uint32_t);
printf("Count-Min Sketch:\n");
printf(" width = %u\n", cms->width);
printf(" depth = %u\n", cms->depth);
printf(" total = %lu\n", (unsigned long)cms->total);
printf(" memory = %zu bytes (%.2f KB)\n", mem, (double)mem / 1024.0);
}
/* ================================================================
* 测试与示例
* ================================================================ */
int main(void) {
/* 创建一个 ε=0.001, δ=0.01 的 CMS */
CMS *cms = cms_create(0.001, 0.01);
if (!cms) {
fprintf(stderr, "Failed to create CMS\n");
return 1;
}
cms_info(cms);
printf("\n");
/* 模拟一个 Zipf 分布的数据流 */
const char *elements[] = {
"apple", "banana", "cherry", "date", "elderberry",
"fig", "grape", "honeydew", "kiwi", "lemon"
};
int frequencies[] = {
10000, 5000, 2500, 1250, 625,
312, 156, 78, 39, 20
};
int n = 10;
printf("Inserting elements (Zipf distribution)...\n");
for (int i = 0; i < n; i++) {
const char *elem = elements[i];
size_t len = strlen(elem);
for (int j = 0; j < frequencies[i]; j++) {
cms_add(cms, elem, len, 1);
}
}
printf("\nStandard update results:\n");
printf("%-15s %10s %10s %10s\n", "Element", "True", "Estimated", "Error");
printf("%-15s %10s %10s %10s\n", "-------", "----", "---------", "-----");
for (int i = 0; i < n; i++) {
const char *elem = elements[i];
uint32_t est = cms_query(cms, elem, strlen(elem));
int error = (int)est - frequencies[i];
printf("%-15s %10d %10u %+10d\n", elem, frequencies[i], est, error);
}
/* 查询一个不存在的元素 */
const char *missing = "watermelon";
uint32_t missing_est = cms_query(cms, missing, strlen(missing));
printf("%-15s %10d %10u %+10d\n", missing, 0, missing_est, (int)missing_est);
/* 测试保守更新 */
printf("\n--- Conservative Update Test ---\n");
CMS *cms_cu = cms_create(0.001, 0.01);
for (int i = 0; i < n; i++) {
const char *elem = elements[i];
size_t len = strlen(elem);
for (int j = 0; j < frequencies[i]; j++) {
cms_add_conservative(cms_cu, elem, len, 1);
}
}
printf("%-15s %10s %10s %10s\n", "Element", "True", "Estimated", "Error");
printf("%-15s %10s %10s %10s\n", "-------", "----", "---------", "-----");
for (int i = 0; i < n; i++) {
const char *elem = elements[i];
uint32_t est = cms_query(cms_cu, elem, strlen(elem));
int error = (int)est - frequencies[i];
printf("%-15s %10d %10u %+10d\n", elem, frequencies[i], est, error);
}
/* 测试合并 */
printf("\n--- Merge Test ---\n");
CMS *cms_a = cms_create_wd(cms->width, cms->depth);
CMS *cms_b = cms_create_wd(cms->width, cms->depth);
/* 确保使用相同的种子 */
memcpy(cms_a->seeds, cms->seeds, cms->depth * sizeof(uint32_t));
memcpy(cms_b->seeds, cms->seeds, cms->depth * sizeof(uint32_t));
/* 分片插入 */
for (int i = 0; i < n / 2; i++) {
const char *elem = elements[i];
size_t len = strlen(elem);
for (int j = 0; j < frequencies[i]; j++) {
cms_add(cms_a, elem, len, 1);
}
}
for (int i = n / 2; i < n; i++) {
const char *elem = elements[i];
size_t len = strlen(elem);
for (int j = 0; j < frequencies[i]; j++) {
cms_add(cms_b, elem, len, 1);
}
}
cms_merge(cms_a, cms_b);
printf("Merged result for 'apple': %u (expected %d)\n",
cms_query(cms_a, "apple", 5), frequencies[0]);
printf("Merged result for 'grape': %u (expected %d)\n",
cms_query(cms_a, "grape", 5), frequencies[6]);
/* 清理 */
cms_destroy(cms);
cms_destroy(cms_cu);
cms_destroy(cms_a);
cms_destroy(cms_b);
printf("\nDone.\n");
return 0;
}这份实现包含以下功能:
- MurmurHash3 哈希函数,用不同种子实现多个独立的哈希。
- 标准更新(
cms_add)和 保守更新(cms_add_conservative)。 - 点查询(
cms_query)和 内积查询(cms_inner_product)。 - 合并(
cms_merge)操作。 - 一个完整的测试程序,模拟 Zipf 分布并比较标准更新和保守更新的效果。
十二、工程实践要点
哈希函数的选择
| 哈希函数 | 速度 | 分布质量 | 备注 |
|---|---|---|---|
| MurmurHash3 | 快 | 优秀 | 最常用,用不同种子实现多个哈希 |
| xxHash | 极快 | 优秀 | 适合吞吐量敏感的场景 |
| CityHash | 快 | 优秀 | Google 出品 |
| SipHash | 中等 | 优秀 | 抗哈希洪水攻击 |
| FNV-1a | 快 | 一般 | 简单但分布质量略差 |
| 多项式哈希 | 快 | 理论保证 | h(x) = (ax+b) mod p mod w |
在实践中,MurmurHash3 + 不同种子
是最简单且效果最好的方案。理论上需要两两独立(pairwise
independent)的哈希族,多项式哈希
h(x) = ((a·x + b) mod p) mod w
就够了,但实际中非密码学哈希函数的效果更好。
计数器溢出
32 位计数器最大值约 42 亿,对于大多数应用足够。但在以下场景中需要注意:
- 超长运行时间:如果数据流持续数月不重置,计数器可能溢出。
- 加权更新:如果权重很大(如字节数),溢出更容易发生。
解决方案:
- 使用 64 位计数器(空间翻倍)。
- 定期”衰减”:将所有计数器除以 2(类似指数衰减),保持相对比例。
- 使用饱和计数器(到达最大值后不再增加)。
工程陷阱表
| 陷阱 | 症状 | 解决方案 |
|---|---|---|
| 哈希种子相同 | 所有行完全相同,等于只有一行 | 确保每行使用不同的种子或不同的哈希函数 |
| w 和 d 设反了 | 空间浪费或精度极差 | w(宽)远大于 d(深),典型比例 1000:5 |
| 忘记计入总权重 | 无法计算相对频率 | 维护 total 字段 |
| 合并时哈希不一致 | 结果完全错误 | 确保所有分片使用相同的哈希函数和种子 |
| 对低频元素过度信任 | 估计值远高于真实值 | CMS 对低频元素误差大,不适合精确统计低频元素 |
| 线程不安全 | 计数丢失或竞争 | 每行用原子操作,或每个线程维护独立 CMS 后合并 |
| 字节序问题 | 跨平台合并结果错误 | 统一字节序或在合并前转换 |
| 只用一个哈希然后分桶 | 哈希值的高低位有相关性 | 每行用完全独立的哈希计算 |
参数选择指南
场景 ε δ w d 空间(4B 计数器)
----------------------------------------------------------------------
网络流量粗估计 0.01 0.05 272 3 ~3.2 KB
数据库查询优化 0.001 0.01 2718 5 ~53 KB
精确 Heavy Hitter 0.0001 0.001 27183 7 ~740 KB
高精度科学计算 0.00001 0.0001 271829 10 ~10.3 MB
关于内存对齐
在高性能场景中,CMS 的矩阵行应该对齐到缓存行(通常 64 字节)。这可以避免伪共享(false sharing)并提高 CPU 缓存命中率。
/* 将 width 对齐到 16 的倍数(假设 4 字节计数器,64 字节缓存行) */
uint32_t aligned_width = (width + 15) & ~15u;十三、我的个人看法
写到这里,我想分享一些关于 CMS 的个人思考。
CMS 是否被高估了
在学术界,CMS 是引用量最高的流式算法之一。但在工业实践中,我发现很多场景其实不需要 CMS 的全部能力:
- 如果你只关心 Top-K,Space-Saving 更直接。
- 如果你可以容忍更大的误差,一个简单的采样就够了。
- 如果你的数据可以分批处理,精确计数 + 合并可能更简单。
CMS 真正的甜蜜点是:你需要在严格的内存约束下,对任意元素进行实时频率查询,且数据流不能回溯。不是所有场景都满足这些条件。
保守更新应该是默认行为
我前面说过,没有理由不使用保守更新。让我再强调一次:如果你在实现 CMS 库,保守更新应该是默认选项,标准更新是”兼容模式”。现有的很多 CMS 库(包括一些知名的)默认使用标准更新,这是一个历史遗留问题。
哈希函数质量比数量重要
理论上需要 d 个”完全独立”的哈希函数,但实践中使用一个好的哈希函数配不同种子就够了。我见过有人为了”独立性”用了 d 个不同的哈希算法,反而引入了实现复杂度和维护负担。
MurmurHash3 + 不同种子,或者用两个独立的哈希做线性组合(h_i(x) = h1(x) + i * h2(x)),都是实践中验证过的好方案。后一种方法来自 Kirsch 和 Mitzenmacher 2006 年的论文,只需要两次哈希计算就能生成任意多个”看起来独立”的哈希值。
CMS 在机器学习中的应用被低估了
Feature hashing(特征哈希)本质上就是一个单行的 CMS。如果你在做大规模稀疏特征的在线学习,CMS 可以用来:
- 监控特征的更新频率,自适应调整学习率。
- 检测特征漂移(feature drift)。
- 在分布式训练中近似同步特征统计。
这些应用在学术论文中讨论不多,但在实际系统中很有价值。
什么时候不该用 CMS
最后列几个不适合用 CMS 的场景:
- 需要精确计数:如果误差不可接受,老老实实用哈希表。
- 需要枚举元素:CMS 不能告诉你有哪些元素,只能回答”这个元素出现了多少次”。
- 数据流可以多遍扫描:如果数据在磁盘上,精确计数可能更划算。
- 低频元素很重要:CMS 对低频元素的估计很差,此时考虑 Count Sketch 或 Lossy Counting。
十四、总结与延伸
Count-Min Sketch 是流式算法工具箱中的基础构件。它的设计体现了概率数据结构的核心哲学:用少量空间和可控的误差换取处理无限数据流的能力。
本文的要点回顾:
- CMS 是一个 d × w 的计数器矩阵,用 d 个独立哈希函数将元素映射到 d 行。
- 更新时 d 个计数器各加 1,查询时取 d 个计数器的最小值。
- 参数 w = ⌈e/ε⌉,d = ⌈ln(1/δ)⌉,空间与数据流长度无关。
- CMS 只会高估不会低估,误差上界为 ε·‖f‖₁,概率至少 1-δ。
- 保守更新在不增加开销的情况下显著减少高估。
- Count Sketch 提供无偏估计和更紧的 L2 范数保证,但不具备单边误差性质。
- CMS 可合并,天然适合分布式计算。
如果你对流式算法感兴趣,建议进一步阅读以下主题:
- Lossy Counting:另一种频率估计算法,提供确定性保证。
- Misra-Gries 算法:Heavy Hitter 检测的经典算法。
- Turnstile 模型:允许删除操作的流模型,需要更强的数据结构。
- 滑动窗口模型:只关心最近 W 个元素的频率估计。
参考文献
- Cormode, G., & Muthukrishnan, S. (2005). An improved data stream summary: the count-min sketch and its applications. Journal of Algorithms, 55(1), 58-75.
- Charikar, M., Chen, K., & Farach-Colton, M. (2002). Finding frequent items in data streams. Proceedings of the 29th International Colloquium on Automata, Languages and Programming (ICALP).
- Estan, C., & Varghese, G. (2002). New directions in traffic measurement and accounting. Proceedings of the 2002 conference on Applications, technologies, architectures, and protocols for computer communications (SIGCOMM).
- Metwally, A., Agrawal, D., & El Abbadi, A. (2005). Efficient computation of frequent and top-k elements in data streams. Proceedings of the 10th International Conference on Database Theory (ICDT).
- Pitel, G., & Fouquier, G. (2015). Count-Min-Log sketch: Approximately counting with approximate counters. arXiv preprint arXiv:1502.04885.
- Kirsch, A., & Mitzenmacher, M. (2006). Less hashing, same performance: Building a better Bloom filter. European Symposium on Algorithms (ESA).
- Cormode, G., & Hadjieleftheriou, M. (2008). Finding frequent items in data streams. Proceedings of the VLDB Endowment, 1(2), 1530-1541.
- Muthukrishnan, S. (2005). Data streams: Algorithms and applications. Foundations and Trends in Theoretical Computer Science, 1(2), 117-236.
- Morris, R. (1978). Counting large numbers of events in small registers. Communications of the ACM, 21(10), 840-842.
- Flajolet, P. (1985). Approximate counting: a detailed analysis. BIT Numerical Mathematics, 25(1), 113-134.
算法系列导航:上一篇:HyperLogLog | 下一篇:t-digest