假设你在运维一个日志系统。每秒有一百万条事件涌入,一天就是 864 亿条。你想回答几个看起来很简单的问题:
- 今天有多少个不同的 IP 地址?
- 访问量最高的 10 个 URL 是什么?
- 请求延迟的 P99 是多少?
- 最近 5 分钟的流量和前 5 分钟相比有没有异常?
如果数据能全部放进内存,这些问题用一个哈希表、一个堆、一个排序就能精确回答。但 864 亿条数据放不进内存——甚至放不进磁盘。你只能看一遍,用远小于数据量的空间,给出足够好的近似答案。
这就是流式算法(Streaming Algorithms)要解决的问题。
在前几篇文章中,我们分别深入讨论了 HyperLogLog、Count-Min Sketch、t-digest、水塘抽样、MinHash/SimHash 和频率估计的理论极限。这篇文章退后一步,把它们放到统一的理论框架中,补全之前没有覆盖的话题——AMS sketch、图流算法、滑动窗口模型——并讨论流式算法作为一种设计哲学的意义。
一、流式计算模型
形式化定义
流式计算模型(Data Stream Model)由 Alon、Matias 和 Szegedy 在 1996 年的开创性论文中正式提出。模型的核心约束有三条:
- 单遍扫描(single pass):数据元素 \(a_1, a_2, \ldots, a_m\) 按到达顺序依次呈现,算法只能看一遍,不能回头。
- 亚线性空间(sublinear space):算法使用的空间 \(s\) 远小于数据流长度 \(m\),通常是 \(O(\log m)\)、\(O(\log^2 m)\) 或 \(O(\sqrt{m})\)。
- 高效更新(efficient update):每个元素的处理时间应当是常数或多对数级别。
数据元素通常来自一个大小为 \(n\) 的宇宙 \(\mathcal{U} = \{1, 2, \ldots, n\}\)。流可以表示为一个频率向量 \(f = (f_1, f_2, \ldots, f_n)\),其中 \(f_i\) 表示元素 \(i\) 出现的次数。整个数据流的长度 \(m = \sum_i f_i\)。
三种流模型
根据更新方式的不同,流模型有三种变体:
| 模型 | 更新方式 | 频率约束 | 典型场景 |
|---|---|---|---|
| 现金登记模型(Cash Register) | 每次 \(f_i \leftarrow f_i + 1\) | \(f_i \geq 0\) | 网络流量计数 |
| 十字转门模型(Turnstile) | 每次 \(f_i \leftarrow f_i + c\),\(c \in \mathbb{Z}\) | \(f_i\) 可为负 | 数据库增删 |
| 严格十字转门模型(Strict Turnstile) | 同上 | \(f_i \geq 0\) 始终成立 | 库存管理 |
现金登记模型最简单——只有插入没有删除。大多数实际场景属于这个模型。十字转门模型更通用,允许”撤回”之前的更新。严格十字转门模型介于两者之间,允许撤回但保证频率不会变成负数。
模型的选择直接影响算法的可行性。例如,Count-Min Sketch 只在现金登记模型和严格十字转门模型下有意义——如果频率可以为负,“最小值”就没有直观含义了。
精确计算的不可能性
为什么需要近似?因为许多看起来简单的精确计算在流模型下需要线性空间。
定理:在单遍流模型下,精确计算不同元素个数(\(F_0\))需要 \(\Omega(n)\) 空间。
证明思路:考虑 Alice 持有集合 \(S \subseteq \{1, \ldots, n\}\),Bob 持有元素 \(j\)。Alice 把 \(S\) 编码为流发给 Bob,Bob 在流中追加 \(j\),然后查看 \(F_0\) 是否增加了 1。如果增加了,说明 \(j \notin S\);否则 \(j \in S\)。这就把集合成员查询(INDEX 问题)归约到了 \(F_0\) 计算,而 INDEX 的通信复杂度是 \(\Omega(n)\)。
这个下界告诉我们:如果想用亚线性空间,就必须放弃精确性,允许近似误差。
二、通信复杂度与流式下界
通信复杂度简介
通信复杂度(Communication Complexity)是证明流式算法空间下界的主要工具。模型如下:
Alice 持有输入 \(x \in \{0,1\}^n\),Bob 持有输入 \(y \in \{0,1\}^n\),两人要协同计算某个函数 \(f(x, y)\)。他们可以来回发送消息,通信复杂度衡量的是所有消息的总比特数。
几个经典的通信复杂度下界:
| 问题 | 定义 | 确定性下界 | 随机化下界 |
|---|---|---|---|
| 等价(EQ) | \(x = y\)? | \(\Omega(n)\) | \(O(1)\) |
| 不相交(DISJ) | \(x \cap y = \emptyset\)? | \(\Omega(n)\) | \(\Omega(n)\) |
| 索引(INDEX) | \(x_{y}\)? | \(\Omega(n)\)(单向) | \(\Omega(n)\)(单向) |
| 间隙汉明距离 | \(\|x - y\|_H\) 与阈值 \(t\) 的关系 | \(\Omega(n)\) | \(\Omega(n)\) |
从通信复杂度到流式下界
归约的关键思想:Alice 和 Bob 分别处理数据流的前半段和后半段。
具体地,假设我们想证明流式问题 \(P\) 需要空间 \(s\)。构造如下归约:
- Alice 根据她的输入 \(x\) 构造数据流的前半段 \(\sigma_1\);
- Alice 在 \(\sigma_1\) 上运行流式算法 \(\mathcal{A}\),得到内存快照 \(M\)(大小为 \(s\) 位);
- Alice 把 \(M\) 发送给 Bob(通信量 \(s\) 位);
- Bob 根据他的输入 \(y\) 构造数据流的后半段 \(\sigma_2\);
- Bob 从 \(M\) 恢复算法状态,继续在 \(\sigma_2\) 上运行 \(\mathcal{A}\),得到结果。
如果 \(\mathcal{A}\) 正确解决了 \(P\),那么 Alice 和 Bob 就用 \(s\) 比特通信解决了对应的通信问题。如果通信问题的下界是 \(\Omega(L)\),那么 \(s = \Omega(L)\)。
主要下界结果
通过上述归约技术,可以得到以下流式算法的空间下界:
| 流式问题 | 空间下界 | 对应通信问题 |
|---|---|---|
| 精确 \(F_0\) | \(\Omega(n)\) | INDEX |
| \((1 \pm \varepsilon)\) 近似 \(F_0\) | \(\Omega(1/\varepsilon^2 + \log n)\) | 间隙汉明距离 |
| \((1 \pm \varepsilon)\) 近似 \(F_2\) | \(\Omega(1/\varepsilon^2 \cdot \log n)\) | DISJ 的变体 |
| \((\varepsilon n)\) 加性近似频率 | \(\Omega(1/\varepsilon)\) | INDEX |
| 精确中位数 | \(\Omega(n)\) | INDEX |
这些下界意味着:HyperLogLog 用 \(O(1/\varepsilon^2 \cdot \log\log n + \log n)\) 空间估计 \(F_0\)、AMS sketch 用 \(O(1/\varepsilon^2 \cdot \log n)\) 空间估计 \(F_2\),都已经接近理论最优。
我认为通信复杂度是理论计算机科学中最优雅的工具之一——它把一个看似关于”内存大小”的问题转化为两个人之间的对话游戏。
三、频率矩:从 F0 到 F2
频率矩的定义
给定频率向量 \(f = (f_1, f_2, \ldots, f_n)\),第 \(k\) 阶频率矩定义为:
\[F_k = \sum_{i=1}^{n} f_i^k\]
不同阶的频率矩有不同的统计含义:
| 矩 | 定义 | 含义 |
|---|---|---|
| \(F_0\) | \(\|\{i : f_i > 0\}\|\) | 不同元素个数(基数) |
| \(F_1\) | \(\sum f_i = m\) | 数据流长度 |
| \(F_2\) | \(\sum f_i^2\) | 重复度/冲突度指标 |
| \(F_\infty\) | \(\max_i f_i\) | 最高频率 |
\(F_0\) 就是基数统计问题,我们在第 30 篇中已经详细讨论了 FM sketch、LogLog 和 HyperLogLog。这里聚焦于 \(F_2\)。
F2 的意义
\(F_2\) 又称 Gini 的齐性指标(index of homogeneity),或碰撞概率(collision probability)。如果从数据流中随机抽取两个元素,它们相同的概率正比于 \(F_2 / m^2\)。
\(F_2\) 的应用包括:
- 网络异常检测:正常流量的 \(F_2\) 接近 \(m\)(均匀分布),DDoS 攻击会导致 \(F_2\) 激增(少数 IP 大量出现)。
- 数据倾斜评估:MapReduce 任务中,\(F_2\) 大意味着某些 key 的数据量远超平均,需要特殊处理。
- 数据库查询优化:估计 join 的结果大小——两个表 join 后的行数与各列的 \(F_2\) 相关。
AMS Sketch:估计 F2
Alon-Matias-Szegedy(AMS)sketch 是估计 \(F_2\) 的经典算法,也是 1996 年那篇开创性论文的核心贡献(这篇论文获得了 Gödel 奖)。
核心思想:选择一族四独立哈希函数 \(h: \mathcal{U} \to \{-1, +1\}\),维护一个随机变量 \(Z = \left(\sum_{i=1}^{n} f_i \cdot h(i)\right)^2\),则 \(\mathbb{E}[Z] = F_2\)。
为什么?展开平方:
\[Z = \left(\sum_i f_i \cdot h(i)\right)^2 = \sum_i f_i^2 \cdot h(i)^2 + \sum_{i \neq j} f_i f_j \cdot h(i) \cdot h(j)\]
由于 \(h(i)^2 = 1\),第一项就是 \(\sum_i f_i^2 = F_2\)。由于 \(h\) 是四独立的,\(\mathbb{E}[h(i) \cdot h(j)] = 0\)(\(i \neq j\)),所以交叉项的期望为零。因此 \(\mathbb{E}[Z] = F_2\)。
方差分析:可以证明 \(\text{Var}[Z] \leq 2F_2^2\)。要得到 \((1 \pm \varepsilon)\) 的相对误差,取 \(O(1/\varepsilon^2)\) 个独立的 \(Z\) 的均值即可把方差降到 \(\varepsilon^2 F_2^2\)。再用中位数技巧(取 \(O(\log(1/\delta))\) 组的中位数)把失败概率降到 \(\delta\)。
最终空间复杂度:\(O(1/\varepsilon^2 \cdot \log(1/\delta) \cdot \log n)\) 位。其中 \(\log n\) 是因为每个计数器需要 \(O(\log n)\) 位来存储。
AMS Sketch 的完整实现
import random
import hashlib
import struct
class AMSSketch:
"""AMS Sketch 用于估计 F2(二阶频率矩)。
空间复杂度:O(depth * width * log n) 位
更新时间:O(depth * width)
查询时间:O(depth * width)
"""
def __init__(self, width: int = 64, depth: int = 5, seed: int = 42):
"""
Args:
width: 每行的独立估计器个数,控制精度,误差约 1/sqrt(width)
depth: 行数,用于中位数技巧,失败概率约 2^{-depth}
seed: 随机种子
"""
self.width = width
self.depth = depth
self.counters = [[0] * width for _ in range(depth)]
self._rng = random.Random(seed)
# 为每个 (depth, width) 对生成独立的哈希种子
self._seeds = [
[self._rng.getrandbits(64) for _ in range(width)]
for _ in range(depth)
]
def _four_wise_hash(self, item: int, seed: int) -> int:
"""四独立哈希函数映射到 {-1, +1}。
使用基于 MD5 的哈希模拟四独立性。
在生产环境中应使用多项式哈希 h(x) = (a*x^3 + b*x^2 + c*x + d) mod p。
"""
data = struct.pack('<QQ', item, seed)
h = hashlib.md5(data).digest()
return 1 if h[0] & 1 else -1
def update(self, item: int, count: int = 1):
"""处理流中的一个元素。"""
for d in range(self.depth):
for w in range(self.width):
sign = self._four_wise_hash(item, self._seeds[d][w])
self.counters[d][w] += sign * count
def estimate_f2(self) -> float:
"""返回 F2 的估计值。
每行取 width 个计数器的平方的均值,然后对 depth 行取中位数。
"""
row_estimates = []
for d in range(self.depth):
# 每个计数器的平方就是一个 F2 的无偏估计
# 但这里我们用的是单个计数器累加所有元素的方式
# 所以每行只有一个 Z = (sum f_i * h(i))^2
# 修正:width 个独立估计器取均值
estimates = [self.counters[d][w] ** 2 for w in range(self.width)]
row_estimates.append(sum(estimates) / self.width)
row_estimates.sort()
mid = self.depth // 2
return row_estimates[mid]
def merge(self, other: 'AMSSketch') -> 'AMSSketch':
"""合并两个 AMS Sketch(可加性)。"""
assert self.width == other.width and self.depth == other.depth
result = AMSSketch(self.width, self.depth)
result._seeds = self._seeds
for d in range(self.depth):
for w in range(self.width):
result.counters[d][w] = self.counters[d][w] + other.counters[d][w]
return result
# 使用示例
def demo_ams():
sketch = AMSSketch(width=128, depth=7)
# 模拟一个有倾斜的数据流
# 元素 1 出现 1000 次,元素 2 出现 500 次,其余各出现 1 次
freq = {}
for _ in range(1000):
sketch.update(1)
freq[1] = freq.get(1, 0) + 1
for _ in range(500):
sketch.update(2)
freq[2] = freq.get(2, 0) + 1
for i in range(3, 103):
sketch.update(i)
freq[i] = freq.get(i, 0) + 1
true_f2 = sum(v ** 2 for v in freq.values())
estimated_f2 = sketch.estimate_f2()
print(f"真实 F2: {true_f2}")
print(f"估计 F2: {estimated_f2:.0f}")
print(f"相对误差: {abs(estimated_f2 - true_f2) / true_f2 * 100:.2f}%")
if __name__ == "__main__":
demo_ams()Johnson-Lindenstrauss 维度约简
频率矩估计与 Johnson-Lindenstrauss(JL)引理有深层联系。JL 引理说:\(n\) 维空间中的 \(m\) 个点可以投影到 \(O(\log m / \varepsilon^2)\) 维空间,同时保持所有点对距离在 \((1 \pm \varepsilon)\) 范围内。
AMS sketch 本质上是一种随机线性投影:频率向量 \(f \in \mathbb{R}^n\) 被投影到 \(\mathbb{R}^{O(1/\varepsilon^2)}\) 空间。投影矩阵的每个元素是独立的 \(\{-1, +1\}\) 随机变量。投影后向量的 \(L_2\) 范数的平方就是 \(F_2\) 的无偏估计。
这不是巧合。sketch 就是随机线性投影的特例——这一统一视角将在第十节中展开。
四、基数统计:从 FM 到 HyperLogLog
基数统计(\(F_0\) 估计)是流式算法中研究最深入的问题之一。这里做一个从历史到最优解的快速回顾,详细分析请参考第 30 篇。
算法演进
| 算法 | 年份 | 空间 | 误差 | 核心思想 |
|---|---|---|---|---|
| FM sketch(Flajolet-Martin) | 1985 | \(O(\log n)\) | \(O(1)\) 相对误差 | 哈希值末尾零的最大长度 |
| BJKST | 2002 | \(O(1/\varepsilon^2 \cdot \log n)\) | \((1 \pm \varepsilon)\) | 子采样 + 级别计数 |
| LogLog | 2003 | \(O(1/\varepsilon^2 \cdot \log\log n)\) | \((1 \pm \varepsilon)\) | 分桶取算术平均 |
| HyperLogLog | 2007 | \(O(1/\varepsilon^2 \cdot \log\log n)\) | \((1 \pm \varepsilon)\) | 分桶取调和平均 |
| HyperLogLog++ | 2013 | 同上 + 实际优化 | 同上 | 偏差修正 + 稀疏表示 |
| Streaming HLL(Ertl) | 2017 | 同上 | 改进常数因子 | 最大似然估计替代调和平均 |
核心原理速览
FM sketch 的直觉:如果你扔硬币,连续出现 \(k\) 次正面的概率是 \(2^{-k}\)。把哈希值看作”硬币序列”,观察到的最大连续零长度 \(R\) 意味着大约有 \(2^R\) 个不同元素。
HyperLogLog 的改进:把元素分到 \(m = 2^p\) 个桶中,每个桶记录其”最大零长度”\(M_j\),然后用调和平均合并:
\[E = \alpha_m \cdot m^2 \cdot \left(\sum_{j=1}^{m} 2^{-M_j}\right)^{-1}\]
这个估计器的标准误差约为 \(1.04/\sqrt{m}\)。用 \(m = 2^{14} = 16384\) 个桶(16KB 空间),误差仅为 0.81%。
我认为 HyperLogLog 是流式算法中”理论优雅与工程实用完美统一”的典范。
五、频率估计:从精确到近似
频率估计问题:给定数据流,对任意查询元素 \(q\),估计 \(f_q\)(出现次数)。这部分内容联动第 31 篇 Count-Min Sketch 和第 35 篇频率估计理论。
Misra-Gries 算法
Misra-Gries(1982)是确定性频率估计的经典算法,思想简洁而精妙:
维护 k-1 个 (元素, 计数) 对
对于每个到达的元素 a:
如果 a 已在表中: 对应计数 +1
否则如果表未满: 插入 (a, 1)
否则: 所有计数 -1,删除计数为 0 的条目
性质: - 空间 \(O(k \log n)\) - 保证 \(f_q - m/k \leq \hat{f}_q \leq f_q\)——永远不会高估,最多低估 \(m/k\) - 确定性算法,无随机性
局限在于误差与流长度 \(m\) 相关。如果你想要 \(\varepsilon m\) 的误差保证,需要 \(k = 1/\varepsilon\)。
Count-Min Sketch
Count-Min Sketch(Cormode & Muthukrishnan, 2005)用 \(d\) 个独立哈希函数将元素映射到 \(d \times w\) 的计数矩阵,查询时取所有行对应位置的最小值:
\[\hat{f}_q = \min_{j=1}^{d} \text{table}[j][h_j(q)]\]
性质: - 空间 \(O(w \cdot d \cdot \log n)\),取 \(w = \lceil e/\varepsilon \rceil\),\(d = \lceil \ln(1/\delta) \rceil\) - 保证 \(f_q \leq \hat{f}_q \leq f_q + \varepsilon m\),概率 \(\geq 1 - \delta\) - 仅在现金登记模型下有单侧误差保证(永远不低估)
Space-Saving 算法
Space-Saving(Metwally, 2005)在 Misra-Gries 的基础上改进:当表满且新元素不在表中时,替换计数最小的条目,并将其计数 +1。
这个小改动带来了两个重要的好处: 1. 它自然地给出了 top-k 的候选列表(表中当前的元素)。 2. 每个元素的频率都有明确的上下界。
三者对比
| 特性 | Misra-Gries | Count-Min Sketch | Space-Saving |
|---|---|---|---|
| 类型 | 确定性 | 随机化 | 确定性 |
| 误差方向 | 低估 | 高估 | 上下界都有 |
| 空间 | \(O(1/\varepsilon)\) 个条目 | \(O(1/\varepsilon \times \log(1/\delta))\) | \(O(1/\varepsilon)\) 个条目 |
| 可合并 | 需要额外处理 | 天然可加 | 需要额外处理 |
| 点查询 | 是 | 是 | 是 |
| Top-k | 间接 | 需要堆辅助 | 天然支持 |
| 适用模型 | 现金登记 | 现金登记/十字转门 | 现金登记 |
在实际系统中,Count-Min Sketch 因其可合并性在分布式环境中最受欢迎(多个节点分别维护 sketch,最后逐元素求和即可合并)。Space-Saving 则在单机 top-k 场景中效果最好。
六、分位数估计
分位数问题:给定数据流,估计第 \(\phi\) 分位数(排序后第 \(\lceil \phi \cdot n \rceil\) 个元素)。详细讨论见第 32 篇 t-digest。
GK Sketch
Greenwald-Khanna(2001)sketch 是分位数估计的经典确定性算法。它维护一组”摘要元素”\((v_i, g_i, \Delta_i)\),其中: - \(v_i\) 是观察到的数据值 - \(g_i\) 是 \(v_i\) 和 \(v_{i-1}\) 之间的最小排名差 - \(\Delta_i\) 是 \(v_i\) 的排名的最大不确定性
空间:\(O((1/\varepsilon) \cdot \log(\varepsilon n))\) 个条目
保证:对任意 \(\phi\),返回的值的排名在 \([\phi n - \varepsilon n, \phi n + \varepsilon n]\) 内。
t-digest
t-digest(Dunning, 2019)是面向工程的分位数估计算法,其核心思想是对尾部分位数(P99、P999)给予更多的精度。
它维护一组质心(centroid),每个质心有均值和权重。插入新数据时,优先合并到最近的质心,但限制每个质心的大小——越靠近 0 或 1 分位数的质心允许的大小越小。
这个”不均匀精度”的设计来自一个实际观察:运维人员最关心的是极端分位数(P99、P99.9),而不是中位数。
KLL Sketch
Karnin-Lang-Liberty(2016)sketch 实现了接近最优的空间复杂度 \(O((1/\varepsilon) \cdot \log\log(1/\delta))\),采用”分层压缩(compaction)“的策略。
KLL 维护 \(O(\log(1/(\varepsilon\delta)))\) 层缓冲区。最底层直接存储到达的元素,当缓冲区满时,排序后每隔一个取一个(随机选择偶数或奇数位置),“压缩”到上一层。上层的每个元素代表的权重是下层的两倍。
分位数算法对比
| 算法 | 空间 | 误差保证 | 可合并 | 尾部精度 |
|---|---|---|---|---|
| GK | \(O(\frac{1}{\varepsilon}\log(\varepsilon n))\) | 确定性 \(\varepsilon n\) | 复杂 | 均匀 |
| t-digest | 经验性 | 无严格理论保证 | 是 | 极好 |
| KLL | \(O(\frac{1}{\varepsilon}\log\log\frac{1}{\delta})\) | 概率性 | 是 | 均匀 |
| DDSketch | \(O(\frac{1}{\varepsilon}\log\frac{u}{l})\) | 相对误差确定性 | 是 | 好 |
工程选择建议:如果需要严格理论保证且可合并,选 KLL;如果关注尾部精度且实现简单,选 t-digest;如果需要确定性保证且不需要合并,选 GK。
七、采样与相似度
水塘抽样
水塘抽样(Reservoir Sampling)解决的问题是:从未知长度的数据流中均匀随机抽取 \(k\) 个样本。详见第 33 篇。
经典的 Vitter 算法(Algorithm R): - 前 \(k\) 个元素直接放入水塘 - 第 \(i\) 个元素(\(i > k\))以概率 \(k/i\) 替换水塘中随机一个元素
优先采样(Priority Sampling)是水塘抽样的推广:给每个元素赋予随机优先级 \(r_i = \text{Uniform}(0, 1)\),保留优先级最高的 \(k\) 个。它的优点是可以估计子集的聚合值(例如”来自某个前缀的 IP 的总流量”)。
MinHash 与 SimHash
MinHash 估计集合的 Jaccard 相似度,SimHash 估计向量的余弦相似度。详见第 34 篇。
MinHash:对集合的每个元素计算哈希值,取最小值。两个集合的 MinHash 相同的概率等于它们的 Jaccard 相似度 \(J(A, B) = |A \cap B| / |A \cup B|\)。
SimHash(Charikar, 2002):将高维向量映射为一个位串。选择一个随机超平面 \(r\),设 \(b = \text{sign}(\langle v, r \rangle)\)。两个向量的 SimHash 位不同的比例等于它们之间角度的 \(\theta/\pi\)。
在流式场景中,这两种技术用于在线去重(near-duplicate detection)和相似度搜索。
八、图流算法
图流是流式算法中最具挑战性的分支之一。数据流由图的边 \((u, v)\) 构成,算法在一遍扫描后需要回答关于图结构的问题。
问题的困难性
图流算法面临的根本困难是:图的大多数性质(连通性、最短路径、匹配等)本质上是全局性质,而流式算法只能维护局部信息。
下界示例:在单遍流模型下,精确判断图是否连通需要 \(\Omega(n \log n)\) 空间(\(n\) 是顶点数),这几乎等于存储整棵生成树。
连通分量:半流模型
半流模型(Semi-Streaming Model)放宽空间约束到 \(O(n \cdot \text{polylog}(n))\)——可以存储大约一棵生成树的信息,但远小于边数 \(O(n^2)\)。
在半流模型下,连通分量可以用以下简洁的算法计算:
class SemiStreamingCC:
"""半流模型下的连通分量算法。
维护一棵生成森林。空间 O(n log n)。
"""
def __init__(self, n: int):
self.parent = list(range(n))
self.rank = [0] * n
def find(self, x: int) -> int:
while self.parent[x] != x:
self.parent[x] = self.parent[self.parent[x]] # 路径压缩
x = self.parent[x]
return x
def union(self, u: int, v: int) -> bool:
ru, rv = self.find(u), self.find(v)
if ru == rv:
return False
if self.rank[ru] < self.rank[rv]:
ru, rv = rv, ru
self.parent[rv] = ru
if self.rank[ru] == self.rank[rv]:
self.rank[ru] += 1
return True
def process_edge(self, u: int, v: int):
"""处理流中的一条边。"""
self.union(u, v)
def query(self, u: int, v: int) -> bool:
"""查询 u 和 v 是否在同一个连通分量中。"""
return self.find(u) == self.find(v)
def num_components(self) -> int:
return len(set(self.find(i) for i in range(len(self.parent))))图素描:AGM sketch
Ahn-Guha-McGregor(2012)提出了一种基于线性 sketch 的图流算法,能在 \(O(n \cdot \text{polylog}(n))\) 空间内支持动态图流(边的插入和删除)。
核心思想是对每个顶点的邻接向量维护一个线性 sketch。连通分量的计算通过对 sketch 进行 \(O(\log n)\) 轮”压缩”来实现——每轮找到每个连通分量的一条出边,然后合并。
这是一个深刻的结果:它表明线性 sketch 不仅能处理频率向量上的统计量,还能处理图的结构性质。
图流中的其他问题
| 问题 | 模型 | 已知最优空间 | 算法 |
|---|---|---|---|
| 连通分量 | 半流/插入 | \(O(n \log n)\) | Union-Find |
| 连通分量 | 动态流 | \(O(n \cdot \text{polylog}(n))\) | AGM sketch |
| 二部图判定 | 半流/插入 | \(O(n \log n)\) | 染色 |
| 最大匹配 | 半流/插入 | \(O(n \cdot \text{polylog}(n))\) | 贪心/局部搜索 |
| 三角形计数 | 半流 | \(\Omega(m)\)(精确) | 采样估计 |
| 最短路径 | 半流 | \(\Theta(n^{1+1/k})\) for \((2k-1)\)-近似 | Spanners |
图流算法是一个活跃的研究领域。我认为它最令人兴奋的地方在于,它迫使我们重新思考”什么图性质是局部可计算的”——这个问题的答案往往出人意料。
九、滑动窗口模型
前面讨论的流式算法都假设统计量是关于整个数据流的。但在实际监控中,我们通常只关心最近 \(W\) 个元素——这就是滑动窗口模型。
为什么滑动窗口更难
在无限窗口模型中,元素”只进不出”。在滑动窗口模型中,元素会”过期”——第 \(i\) 个元素在时刻 \(i + W\) 后就不再属于当前窗口。这意味着算法不仅要处理新到达的元素,还要处理旧元素的失效。
如果直接维护一个大小为 \(W\) 的环形缓冲区,空间就是 \(O(W)\)——对于大窗口来说太大了。我们的目标是用 \(o(W)\) 的空间在滑动窗口上维护统计量。
DGIM:滑动窗口上的比特计数
Datar-Gionis-Indyk-Motwani(2002)提出了 DGIM 算法,用于在滑动窗口上计数比特流中 1 的个数。
问题:比特流 \(b_1, b_2, \ldots\) 持续到达,在任意时刻查询”最近 \(W\) 个比特中有多少个 1”。
DGIM 的核心数据结构:维护一组”桶”(bucket),每个桶记录: - 时间戳:桶中最新的 1 的到达时间 - 大小:桶中包含的 1 的个数,必须是 2 的幂
不变量:桶按时间戳降序排列。大小为 \(2^j\) 的桶的个数要么是 1 个,要么是 2 个(每种大小最多 2 个,最少 1 个,最大的那种除外)。
更新规则: 1. 新比特到达。如果是 0,什么都不做。 2. 如果是 1,创建一个大小为 1 的新桶。 3. 如果大小为 1 的桶超过了 2 个,合并最旧的两个为一个大小为 2 的桶。 4. 递归检查大小为 2 的桶是否超过 2 个,以此类推。 5. 删除时间戳超出窗口 \(W\) 的桶。
查询:将窗口内所有桶的大小求和,但最旧那个桶只计一半。
精度保证:误差不超过最旧桶大小的一半,最终相对误差不超过 50%。通过允许每种大小最多 \(r\) 个桶(而不是 2 个),可以把误差降到 \(1/(r-1)\)。
空间:\(O((1/\varepsilon) \cdot \log^2 W)\) 位。
DGIM 的完整实现
from collections import deque
class DGIMBucket:
"""DGIM 桶。"""
__slots__ = ['timestamp', 'size']
def __init__(self, timestamp: int, size: int):
self.timestamp = timestamp
self.size = size
def __repr__(self):
return f"Bucket(t={self.timestamp}, size={self.size})"
class DGIM:
"""DGIM 算法:滑动窗口上的比特计数。
空间:O((1/epsilon) * log^2(W))
误差:最多 1/(max_same - 1) 的相对误差
"""
def __init__(self, window_size: int, max_same: int = 2):
"""
Args:
window_size: 滑动窗口大小 W
max_same: 每种大小最多允许的桶数(默认 2,误差约 50%)
设为 r 则误差约 1/(r-1)
"""
self.W = window_size
self.max_same = max_same
self.buckets: deque[DGIMBucket] = deque()
self.time = 0
def _expire(self):
"""移除窗口外的桶。"""
while self.buckets and self.buckets[0].timestamp <= self.time - self.W:
self.buckets.popleft()
def _merge_if_needed(self):
"""从最新的桶开始检查,合并同大小超额的桶。"""
# 从右(最新)到左(最旧)扫描
if len(self.buckets) < 2:
return
i = len(self.buckets) - 1
while i >= 2:
# 统计从 i 向左连续同大小桶的个数
current_size = self.buckets[i].size
count = 0
j = i
while j >= 0 and self.buckets[j].size == current_size:
count += 1
j -= 1
if count > self.max_same:
# 合并最旧的两个同大小桶
oldest_idx = j + 1
second_oldest_idx = j + 2
merged = DGIMBucket(
self.buckets[second_oldest_idx].timestamp,
current_size * 2
)
# 删除两个旧桶,插入一个新桶
del self.buckets[second_oldest_idx]
del self.buckets[oldest_idx]
self.buckets.insert(oldest_idx, merged)
i = oldest_idx # 重新从合并位置检查
else:
# 跳到下一个不同大小的位置
i = j
if i < 0:
break
def add_bit(self, bit: int):
"""处理一个新比特。"""
self.time += 1
self._expire()
if bit == 1:
self.buckets.append(DGIMBucket(self.time, 1))
self._merge_if_needed()
def query(self) -> int:
"""估计当前窗口中 1 的个数。"""
self._expire()
if not self.buckets:
return 0
total = 0
for bucket in self.buckets:
total += bucket.size
# 最旧的桶可能只有部分在窗口内,取其大小的一半
if self.buckets:
total -= self.buckets[0].size // 2
return total
def get_buckets(self):
"""返回当前桶的状态(调试用)。"""
return [(b.timestamp, b.size) for b in self.buckets]
# 使用示例
def demo_dgim():
dgim = DGIM(window_size=100, max_same=3) # 误差约 50%
import random
random.seed(42)
bits = [random.randint(0, 1) for _ in range(500)]
for bit in bits:
dgim.add_bit(bit)
estimated = dgim.query()
actual = sum(bits[-100:])
print(f"窗口大小: 100")
print(f"实际 1 的个数: {actual}")
print(f"DGIM 估计: {estimated}")
print(f"绝对误差: {abs(estimated - actual)}")
print(f"相对误差: {abs(estimated - actual) / max(actual, 1) * 100:.1f}%")
print(f"桶状态: {dgim.get_buckets()}")
if __name__ == "__main__":
demo_dgim()指数直方图
Exponential Histogram 是 DGIM 的推广,可以处理非比特流——每个元素有一个非负整数值。它的思想与 DGIM 相同,只是桶的大小不再限制为 2 的幂,而是允许任意值(但仍保持指数增长)。
滑动窗口上的其他问题
| 问题 | 空间 | 方法 |
|---|---|---|
| 比特计数 | \(O(\frac{1}{\varepsilon}\log^2 W)\) | DGIM |
| 求和 | \(O(\frac{1}{\varepsilon}\log^2 W)\) | 指数直方图 |
| \(F_0\)(基数) | \(O(\frac{1}{\varepsilon^2}\log^2 W)\) | DGIM + HLL |
| 频率矩 | \(O(\frac{1}{\varepsilon^2}\text{polylog}(W))\) | Smooth Histogram |
| 分位数 | \(O(\frac{1}{\varepsilon}\log^2 W)\) | GK + 过期管理 |
Braverman 和 Ostrovsky(2007)提出了 Smooth Histogram 框架,将任何”光滑”(smooth)的聚合函数的流式算法自动转化为滑动窗口版本。一个函数是光滑的,如果在移除前缀时,函数值的变化是渐进的而非突变的。\(F_2\)、\(F_0\) 和频率矩都是光滑函数。
十、统一视角:Sketch 即线性投影
线性 Sketch 的定义
大多数流式 sketch 可以统一理解为一种操作:用一个随机矩阵 \(\Phi \in \mathbb{R}^{s \times n}\) 对频率向量 \(f \in \mathbb{R}^n\) 做线性投影:
\[\text{sketch}(f) = \Phi \cdot f \in \mathbb{R}^s\]
其中 \(s \ll n\) 是 sketch 的大小。每当元素 \(i\) 到达,sketch 更新为:
\[\text{sketch} \leftarrow \text{sketch} + \Phi \cdot e_i\]
其中 \(e_i\) 是第 \(i\) 个标准基向量。这等价于把 \(\Phi\) 的第 \(i\) 列加到 sketch 上。
各算法的投影矩阵
| 算法 | \(\Phi\) 的结构 | 非零模式 |
|---|---|---|
| Count-Min Sketch | \(\Phi_{(j,h_j(i))} = 1\) | 每行每列恰好一个非零元素 |
| Count Sketch | \(\Phi_{(j,h_j(i))} = s_j(i)\)(\(\pm 1\)) | 同上但带符号 |
| AMS Sketch | 每个元素 \(\Phi_{(j,i)} = s_j(i)\)(\(\pm 1\)) | 稠密 |
| Random Projection | \(\Phi_{ij} \sim N(0,1/s)\) | 全稠密 |
| Sparse JL | \(\Phi_{ij} \in \{0, \pm 1/\sqrt{s'}\}\) | 稀疏 |
线性性的三大好处
可合并性(Mergeability):\(\text{sketch}(f + g) = \text{sketch}(f) + \text{sketch}(g)\)。这意味着多个节点分别维护 sketch 后可以简单相加。在 MapReduce、Flink 等分布式系统中至关重要。
支持十字转门模型:由于线性性,元素的”删除”等价于减去对应列,天然支持频率增减。
隐私保护:线性 sketch 天然满足差分隐私的某些变体——添加适当的噪声后,sketch 不会泄露任何单个元素的信息。
线性 Sketch 的局限
并非所有问题都能用线性 sketch 高效解决。例如:
- 中位数:线性 sketch 无法在 \(o(n)\) 空间内精确计算中位数(需要非线性的比较操作)。
- top-k:精确的 top-k 需要排序,不是线性操作。Space-Saving 不是线性 sketch。
- 图的直径:在动态图流中,线性 sketch 计算直径需要 \(\Omega(n^{3/2})\) 空间。
我认为”sketch = 线性投影”是理解流式算法最重要的心智模型。一旦你意识到这一点,很多看似不同的算法突然变成了同一个主题的变奏——唯一的区别是投影矩阵的选择。
十一、大数据系统中的应用
流式算法不仅是理论上的优雅结果,它们已经深深嵌入到现代大数据基础设施中。
Apache Flink
Flink 是目前最流行的流处理引擎之一。它内置了多种流式算法的实现:
// Flink 中使用 HyperLogLog 统计独立用户数
DataStream<String> userIds = ...;
userIds
.keyBy(uid -> "global")
.process(new ProcessFunction<String, Long>() {
private transient HyperLogLogPlus hll;
@Override
public void open(Configuration params) {
hll = new HyperLogLogPlus(14); // 2^14 个桶
}
@Override
public void processElement(String uid, Context ctx, Collector<Long> out) {
hll.offer(uid);
out.collect(hll.cardinality());
}
});Flink 的 State Backend 天然支持 sketch 的检查点和恢复——sketch 的状态可以序列化后存储到 RocksDB 或文件系统中。
Apache Spark Streaming
Spark Streaming(及 Structured Streaming)通过
approxQuantile 和
approxCountDistinct 等 API 暴露流式算法:
# Spark 中使用近似去重计数
df.select(approx_count_distinct("user_id", rsd=0.05))
# 近似分位数
df.stat.approxQuantile("latency", [0.5, 0.95, 0.99], 0.01)Spark 的 approx_count_distinct 底层使用
HyperLogLog++。approxQuantile 使用
Greenwald-Khanna sketch。
Kafka Streams
Kafka Streams 提供了有状态的流处理,可以在 State Store 中维护 sketch:
// 概念性代码:在 Kafka Streams 中维护 CMS
builder.stream("events")
.groupByKey()
.aggregate(
() -> new CountMinSketch(0.001, 0.01),
(key, value, cms) -> { cms.add(value); return cms; },
Materialized.as("cms-store")
);Redis 的概率数据结构
Redis 从 4.0 版本开始通过模块系统支持概率数据结构,后来在 Redis Stack 中原生集成:
# HyperLogLog
PFADD visitors "user:1001"
PFADD visitors "user:1002"
PFCOUNT visitors
# Count-Min Sketch(Redis Stack)
CMS.INITBYPROB urls 0.001 0.01
CMS.INCRBY urls "/index.html" 1
CMS.QUERY urls "/index.html"
# Top-K(Redis Stack)
TOPK.RESERVE popular_pages 10 50 3 0.9
TOPK.ADD popular_pages "/index.html" "/about.html"
TOPK.LIST popular_pagesClickHouse 的流式聚合
ClickHouse 在 SQL 层面直接暴露了多种流式算法:
-- HyperLogLog
SELECT uniqHLL12(user_id) FROM events;
-- 分位数估计(t-digest)
SELECT quantileTDigest(0.99)(latency) FROM requests;
-- 分位数估计(DDSketch)
SELECT quantileDD(0.01)(latency) FROM requests;
-- 频率估计
SELECT topK(10)(url) FROM access_log;系统集成的工程要点
在将流式算法集成到生产系统时,有几个工程问题需要特别注意:
序列化/反序列化:sketch 必须支持高效的序列化,因为在分布式系统中需要频繁传输和持久化。HyperLogLog 的序列化非常简单(就是一个 byte 数组),Count-Min Sketch 也不难(二维数组),但 t-digest 的序列化需要更多考虑(质心列表的排序和压缩)。
合并语义:在 MapReduce 和窗口聚合中,sketch 需要合并。线性 sketch(CMS、AMS)可以直接逐元素相加。HyperLogLog 取逐桶最大值。t-digest 需要把两组质心合并后重新压缩。
精度与空间的权衡:在资源受限的环境中(如边缘计算设备),需要仔细调优 sketch 的参数。下表给出了常见配置:
| 算法 | 典型配置 | 空间 | 相对误差 |
|---|---|---|---|
| HyperLogLog | \(p = 14\)(16384 桶) | 12 KB | 0.81% |
| Count-Min Sketch | \(w = 2048, d = 5\) | 40 KB | \(\varepsilon \approx 0.0013\) |
| t-digest | \(\delta = 100\) | ~10 KB | P99 < 1% |
| KLL | \(k = 200\) | ~4 KB | \(\varepsilon \approx 0.01\) |
十二、工程陷阱与最佳实践
常见陷阱一览
| 陷阱 | 症状 | 原因 | 解决方案 |
|---|---|---|---|
| 哈希函数质量差 | 估计值偏差大、方差高 | 哈希冲突过多 | 使用 MurmurHash3、xxHash 等高质量哈希 |
| 忘记处理空流 | 除零错误或返回 NaN | 调和平均在全零时发散 | 添加空流检查,返回 0 |
| sketch 参数未调优 | 精度不达标或内存浪费 | 默认参数不适合具体场景 | 根据误差需求和数据分布选择参数 |
| 合并不同参数的 sketch | 程序崩溃或静默错误 | 维度不匹配 | 合并前校验参数一致性 |
| 十字转门模型下用 CMS | 负频率导致错误 | CMS 假设非负频率 | 改用 Count Sketch |
| 滑动窗口未过期 | 统计量反映全量而非窗口 | 忘记删除过期桶/元素 | 严格实现过期逻辑 |
| 序列化字节序不一致 | 跨平台合并结果错误 | 大端/小端差异 | 固定字节序(通常小端) |
| 浮点精度累积误差 | 长时间运行后估计偏移 | 浮点加法的舍入误差 | 定期重建 sketch 或使用整数算术 |
| 并发更新无保护 | 计数丢失或数据损坏 | 多线程竞争 | per-thread sketch + 定期合并 |
| 哈希种子硬编码 | 攻击者可构造最坏输入 | 种子可预测 | 随机生成种子,重启后更换 |
选型决策树
面对一个流式计算需求,可以按以下思路选择算法:
你需要计算什么?
├── 不同元素个数 → HyperLogLog
├── 元素频率
│ ├── 点查询(某个元素出现了几次)→ Count-Min Sketch
│ ├── 内积/F2 估计 → AMS / Count Sketch
│ └── Top-k(最频繁的 k 个)→ Space-Saving
├── 分位数
│ ├── 关心尾部精度(P99/P999)→ t-digest / DDSketch
│ └── 需要严格理论保证 → KLL / GK
├── 随机样本 → 水塘抽样
├── 集合相似度 → MinHash(Jaccard)/ SimHash(余弦)
├── 滑动窗口上的计数 → DGIM / 指数直方图
└── 图的连通性 → 半流 Union-Find / AGM sketch
性能调优建议
批量更新:不要逐条更新 sketch,攒一批后批量处理。这能利用 CPU 缓存局部性和 SIMD 指令。
SIMD 加速哈希:现代 CPU 支持 CRC32、AES-NI 等指令集,可以极大加速哈希计算。ClickHouse 的 HyperLogLog 实现就使用了 CRC32 指令。
内存对齐:sketch 的数组应按 cache line(64 字节)对齐,避免伪共享(false sharing)问题。
分层 sketch:在热门元素和冷门元素差异巨大的场景中,可以用一个小的精确计数器处理热门元素(Frequent 算法识别),用 sketch 处理其余元素。
十三、个人观点:流式思维作为设计哲学
流式算法的核心约束——单遍扫描、亚线性空间——最初是由物理限制驱动的。磁带只能顺序读取,内存装不下所有数据。但我认为,即使在现代计算环境中内存变得越来越大,流式思维仍然有深刻的指导意义。
近似是一种特性,不是缺陷
精确计算往往是过度设计。你真的需要知道今天的独立用户数是 12,847,293 而不是”大约 1285 万”吗?大多数情况下,0.5% 的误差对业务决策没有任何影响。
HyperLogLog 用 12KB 空间就能以 0.8% 的误差统计上亿的独立元素。一个精确的 HashSet 需要数 GB。从信息论的角度看,HyperLogLog 是一种极其高效的信息压缩——它只保留你真正需要的信息量级,丢弃你不需要的精确细节。
这种”有损但有界”的思维方式应该更广泛地应用于系统设计中。
可合并性是分布式系统的核心需求
我在前面反复强调 sketch 的可合并性,因为它直接对应于分布式计算的核心操作——reduce。任何可合并的数据结构都天然适配 MapReduce、流处理、联邦学习等分布式范式。
反过来,当你设计一个数据结构时,如果它不可合并,你就很难把它用在分布式环境中。这是一个设计约束,而非事后添加的特性。
空间-精度-时间的三角不等式
流式算法的理论告诉我们,这三者之间存在根本性的权衡:
- 更多空间 → 更高精度
- 更多遍数(passes) → 更高精度
- 随机性 → 打破确定性下界
这个三角关系在系统设计中无处不在。缓存、索引、采样、预计算——都是在这三个维度上做权衡。
流式算法对软件架构的启示
- 事件驱动架构天然是流式的——消息到达后被处理一次然后丢弃。
- CQRS(命令查询分离)中的查询端可以用 sketch 来加速——写入侧精确记录,查询侧近似汇总。
- 边缘计算中的设备资源极其有限,流式算法是唯一可行的统计手段。
- 实时机器学习中的特征计算(频率、基数、分位数)天然适合用 sketch 来维护。
我认为,理解流式算法不仅仅是学会几个数据结构,更重要的是内化一种思维方式:在信息不完全的约束下,用有限的资源做出足够好的决策。这种思维方式,从本质上说,就是工程学的精髓。
参考文献
- Alon, N., Matias, Y., & Szegedy, M. (1999). “The Space Complexity of Approximating the Frequency Moments.” Journal of Computer and System Sciences, 58(1), 137-147.
- Flajolet, P., & Martin, G. N. (1985). “Probabilistic Counting Algorithms for Data Base Applications.” Journal of Computer and System Sciences, 31(2), 182-209.
- Flajolet, P., Fusy, É., Gandouet, O., & Meunier, F. (2007). “HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm.” DMTCS Proceedings.
- Cormode, G., & Muthukrishnan, S. (2005). “An Improved Data Stream Summary: The Count-Min Sketch and its Applications.” Journal of Algorithms, 55(1), 58-75.
- Misra, J., & Gries, D. (1982). “Finding Repeated Elements.” Science of Computer Programming, 2(2), 143-152.
- Metwally, A., Agrawal, D., & El Abbadi, A. (2005). “Efficient Computation of Frequent and Top-k Elements in Data Streams.” ICDT 2005.
- Greenwald, M., & Khanna, S. (2001). “Space-Efficient Online Computation of Quantile Summaries.” SIGMOD 2001.
- Dunning, T., & Ertl, O. (2019). “Computing Extremely Accurate Quantiles Using t-Digests.” arXiv preprint.
- Karnin, Z., Lang, K., & Liberty, E. (2016). “Optimal Quantile Approximation in Streams.” FOCS 2016.
- Datar, M., Gionis, A., Indyk, P., & Motwani, R. (2002). “Maintaining Stream Statistics over Sliding Windows.” SIAM Journal on Computing, 31(6), 1794-1813.
- Ahn, K. J., Guha, S., & McGregor, A. (2012). “Analyzing Graph Structure via Linear Measurements.” SODA 2012.
- Braverman, V., & Ostrovsky, R. (2007). “Smooth Histograms for Sliding Windows.” FOCS 2007.
- Vitter, J. S. (1985). “Random Sampling with a Reservoir.” ACM Transactions on Mathematical Software, 11(1), 37-57.
- Charikar, M. S. (2002). “Similarity Estimation Techniques from Rounding Algorithms.” STOC 2002.
- Indyk, P., & Woodruff, D. (2005). “Optimal Approximations of the Frequency Moments of Data Streams.” STOC 2005.
- Kushilevitz, E., & Nisan, N. (1997). Communication Complexity. Cambridge University Press.
- Muthukrishnan, S. (2005). “Data Streams: Algorithms and Applications.” Foundations and Trends in Theoretical Computer Science, 1(2), 117-236.
算法系列导航:上一篇:频率估计的理论极限 | 下一篇:k-d tree
相关阅读:HyperLogLog | Count-Min Sketch | t-digest