土法炼钢兴趣小组的算法知识备份

流式算法总论:亚线性空间的艺术

目录

假设你在运维一个日志系统。每秒有一百万条事件涌入,一天就是 864 亿条。你想回答几个看起来很简单的问题:

如果数据能全部放进内存,这些问题用一个哈希表、一个堆、一个排序就能精确回答。但 864 亿条数据放不进内存——甚至放不进磁盘。你只能看一遍,用远小于数据量的空间,给出足够好的近似答案。

这就是流式算法(Streaming Algorithms)要解决的问题。

在前几篇文章中,我们分别深入讨论了 HyperLogLogCount-Min Sketcht-digest水塘抽样MinHash/SimHash频率估计的理论极限。这篇文章退后一步,把它们放到统一的理论框架中,补全之前没有覆盖的话题——AMS sketch、图流算法、滑动窗口模型——并讨论流式算法作为一种设计哲学的意义。

一、流式计算模型

形式化定义

流式计算模型(Data Stream Model)由 Alon、Matias 和 Szegedy 在 1996 年的开创性论文中正式提出。模型的核心约束有三条:

  1. 单遍扫描(single pass):数据元素 \(a_1, a_2, \ldots, a_m\) 按到达顺序依次呈现,算法只能看一遍,不能回头。
  2. 亚线性空间(sublinear space):算法使用的空间 \(s\) 远小于数据流长度 \(m\),通常是 \(O(\log m)\)\(O(\log^2 m)\)\(O(\sqrt{m})\)
  3. 高效更新(efficient update):每个元素的处理时间应当是常数或多对数级别。

数据元素通常来自一个大小为 \(n\) 的宇宙 \(\mathcal{U} = \{1, 2, \ldots, n\}\)。流可以表示为一个频率向量 \(f = (f_1, f_2, \ldots, f_n)\),其中 \(f_i\) 表示元素 \(i\) 出现的次数。整个数据流的长度 \(m = \sum_i f_i\)

三种流模型

根据更新方式的不同,流模型有三种变体:

模型 更新方式 频率约束 典型场景
现金登记模型(Cash Register) 每次 \(f_i \leftarrow f_i + 1\) \(f_i \geq 0\) 网络流量计数
十字转门模型(Turnstile) 每次 \(f_i \leftarrow f_i + c\)\(c \in \mathbb{Z}\) \(f_i\) 可为负 数据库增删
严格十字转门模型(Strict Turnstile) 同上 \(f_i \geq 0\) 始终成立 库存管理

现金登记模型最简单——只有插入没有删除。大多数实际场景属于这个模型。十字转门模型更通用,允许”撤回”之前的更新。严格十字转门模型介于两者之间,允许撤回但保证频率不会变成负数。

模型的选择直接影响算法的可行性。例如,Count-Min Sketch 只在现金登记模型和严格十字转门模型下有意义——如果频率可以为负,“最小值”就没有直观含义了。

精确计算的不可能性

为什么需要近似?因为许多看起来简单的精确计算在流模型下需要线性空间。

定理:在单遍流模型下,精确计算不同元素个数(\(F_0\))需要 \(\Omega(n)\) 空间。

证明思路:考虑 Alice 持有集合 \(S \subseteq \{1, \ldots, n\}\),Bob 持有元素 \(j\)。Alice 把 \(S\) 编码为流发给 Bob,Bob 在流中追加 \(j\),然后查看 \(F_0\) 是否增加了 1。如果增加了,说明 \(j \notin S\);否则 \(j \in S\)。这就把集合成员查询(INDEX 问题)归约到了 \(F_0\) 计算,而 INDEX 的通信复杂度是 \(\Omega(n)\)

这个下界告诉我们:如果想用亚线性空间,就必须放弃精确性,允许近似误差。

二、通信复杂度与流式下界

通信复杂度简介

通信复杂度(Communication Complexity)是证明流式算法空间下界的主要工具。模型如下:

Alice 持有输入 \(x \in \{0,1\}^n\),Bob 持有输入 \(y \in \{0,1\}^n\),两人要协同计算某个函数 \(f(x, y)\)。他们可以来回发送消息,通信复杂度衡量的是所有消息的总比特数。

几个经典的通信复杂度下界:

问题 定义 确定性下界 随机化下界
等价(EQ) \(x = y\) \(\Omega(n)\) \(O(1)\)
不相交(DISJ) \(x \cap y = \emptyset\) \(\Omega(n)\) \(\Omega(n)\)
索引(INDEX) \(x_{y}\) \(\Omega(n)\)(单向) \(\Omega(n)\)(单向)
间隙汉明距离 \(\|x - y\|_H\) 与阈值 \(t\) 的关系 \(\Omega(n)\) \(\Omega(n)\)

从通信复杂度到流式下界

归约的关键思想:Alice 和 Bob 分别处理数据流的前半段和后半段。

具体地,假设我们想证明流式问题 \(P\) 需要空间 \(s\)。构造如下归约:

  1. Alice 根据她的输入 \(x\) 构造数据流的前半段 \(\sigma_1\)
  2. Alice 在 \(\sigma_1\) 上运行流式算法 \(\mathcal{A}\),得到内存快照 \(M\)(大小为 \(s\) 位);
  3. Alice 把 \(M\) 发送给 Bob(通信量 \(s\) 位);
  4. Bob 根据他的输入 \(y\) 构造数据流的后半段 \(\sigma_2\)
  5. Bob 从 \(M\) 恢复算法状态,继续在 \(\sigma_2\) 上运行 \(\mathcal{A}\),得到结果。

如果 \(\mathcal{A}\) 正确解决了 \(P\),那么 Alice 和 Bob 就用 \(s\) 比特通信解决了对应的通信问题。如果通信问题的下界是 \(\Omega(L)\),那么 \(s = \Omega(L)\)

主要下界结果

通过上述归约技术,可以得到以下流式算法的空间下界:

流式问题 空间下界 对应通信问题
精确 \(F_0\) \(\Omega(n)\) INDEX
\((1 \pm \varepsilon)\) 近似 \(F_0\) \(\Omega(1/\varepsilon^2 + \log n)\) 间隙汉明距离
\((1 \pm \varepsilon)\) 近似 \(F_2\) \(\Omega(1/\varepsilon^2 \cdot \log n)\) DISJ 的变体
\((\varepsilon n)\) 加性近似频率 \(\Omega(1/\varepsilon)\) INDEX
精确中位数 \(\Omega(n)\) INDEX

这些下界意味着:HyperLogLog 用 \(O(1/\varepsilon^2 \cdot \log\log n + \log n)\) 空间估计 \(F_0\)、AMS sketch 用 \(O(1/\varepsilon^2 \cdot \log n)\) 空间估计 \(F_2\),都已经接近理论最优。

我认为通信复杂度是理论计算机科学中最优雅的工具之一——它把一个看似关于”内存大小”的问题转化为两个人之间的对话游戏。

三、频率矩:从 F0 到 F2

频率矩的定义

给定频率向量 \(f = (f_1, f_2, \ldots, f_n)\),第 \(k\) 阶频率矩定义为:

\[F_k = \sum_{i=1}^{n} f_i^k\]

不同阶的频率矩有不同的统计含义:

定义 含义
\(F_0\) \(\|\{i : f_i > 0\}\|\) 不同元素个数(基数)
\(F_1\) \(\sum f_i = m\) 数据流长度
\(F_2\) \(\sum f_i^2\) 重复度/冲突度指标
\(F_\infty\) \(\max_i f_i\) 最高频率

\(F_0\) 就是基数统计问题,我们在第 30 篇中已经详细讨论了 FM sketch、LogLog 和 HyperLogLog。这里聚焦于 \(F_2\)

F2 的意义

\(F_2\) 又称 Gini 的齐性指标(index of homogeneity),或碰撞概率(collision probability)。如果从数据流中随机抽取两个元素,它们相同的概率正比于 \(F_2 / m^2\)

\(F_2\) 的应用包括:

AMS Sketch:估计 F2

Alon-Matias-Szegedy(AMS)sketch 是估计 \(F_2\) 的经典算法,也是 1996 年那篇开创性论文的核心贡献(这篇论文获得了 Gödel 奖)。

核心思想:选择一族四独立哈希函数 \(h: \mathcal{U} \to \{-1, +1\}\),维护一个随机变量 \(Z = \left(\sum_{i=1}^{n} f_i \cdot h(i)\right)^2\),则 \(\mathbb{E}[Z] = F_2\)

为什么?展开平方:

\[Z = \left(\sum_i f_i \cdot h(i)\right)^2 = \sum_i f_i^2 \cdot h(i)^2 + \sum_{i \neq j} f_i f_j \cdot h(i) \cdot h(j)\]

由于 \(h(i)^2 = 1\),第一项就是 \(\sum_i f_i^2 = F_2\)。由于 \(h\) 是四独立的,\(\mathbb{E}[h(i) \cdot h(j)] = 0\)\(i \neq j\)),所以交叉项的期望为零。因此 \(\mathbb{E}[Z] = F_2\)

方差分析:可以证明 \(\text{Var}[Z] \leq 2F_2^2\)。要得到 \((1 \pm \varepsilon)\) 的相对误差,取 \(O(1/\varepsilon^2)\) 个独立的 \(Z\) 的均值即可把方差降到 \(\varepsilon^2 F_2^2\)。再用中位数技巧(取 \(O(\log(1/\delta))\) 组的中位数)把失败概率降到 \(\delta\)

最终空间复杂度:\(O(1/\varepsilon^2 \cdot \log(1/\delta) \cdot \log n)\) 位。其中 \(\log n\) 是因为每个计数器需要 \(O(\log n)\) 位来存储。

AMS Sketch 的完整实现

import random
import hashlib
import struct

class AMSSketch:
    """AMS Sketch 用于估计 F2(二阶频率矩)。

    空间复杂度:O(depth * width * log n) 位
    更新时间:O(depth * width)
    查询时间:O(depth * width)
    """

    def __init__(self, width: int = 64, depth: int = 5, seed: int = 42):
        """
        Args:
            width: 每行的独立估计器个数,控制精度,误差约 1/sqrt(width)
            depth: 行数,用于中位数技巧,失败概率约 2^{-depth}
            seed: 随机种子
        """
        self.width = width
        self.depth = depth
        self.counters = [[0] * width for _ in range(depth)]
        self._rng = random.Random(seed)
        # 为每个 (depth, width) 对生成独立的哈希种子
        self._seeds = [
            [self._rng.getrandbits(64) for _ in range(width)]
            for _ in range(depth)
        ]

    def _four_wise_hash(self, item: int, seed: int) -> int:
        """四独立哈希函数映射到 {-1, +1}。

        使用基于 MD5 的哈希模拟四独立性。
        在生产环境中应使用多项式哈希 h(x) = (a*x^3 + b*x^2 + c*x + d) mod p。
        """
        data = struct.pack('<QQ', item, seed)
        h = hashlib.md5(data).digest()
        return 1 if h[0] & 1 else -1

    def update(self, item: int, count: int = 1):
        """处理流中的一个元素。"""
        for d in range(self.depth):
            for w in range(self.width):
                sign = self._four_wise_hash(item, self._seeds[d][w])
                self.counters[d][w] += sign * count

    def estimate_f2(self) -> float:
        """返回 F2 的估计值。

        每行取 width 个计数器的平方的均值,然后对 depth 行取中位数。
        """
        row_estimates = []
        for d in range(self.depth):
            # 每个计数器的平方就是一个 F2 的无偏估计
            # 但这里我们用的是单个计数器累加所有元素的方式
            # 所以每行只有一个 Z = (sum f_i * h(i))^2
            # 修正:width 个独立估计器取均值
            estimates = [self.counters[d][w] ** 2 for w in range(self.width)]
            row_estimates.append(sum(estimates) / self.width)
        row_estimates.sort()
        mid = self.depth // 2
        return row_estimates[mid]

    def merge(self, other: 'AMSSketch') -> 'AMSSketch':
        """合并两个 AMS Sketch(可加性)。"""
        assert self.width == other.width and self.depth == other.depth
        result = AMSSketch(self.width, self.depth)
        result._seeds = self._seeds
        for d in range(self.depth):
            for w in range(self.width):
                result.counters[d][w] = self.counters[d][w] + other.counters[d][w]
        return result


# 使用示例
def demo_ams():
    sketch = AMSSketch(width=128, depth=7)

    # 模拟一个有倾斜的数据流
    # 元素 1 出现 1000 次,元素 2 出现 500 次,其余各出现 1 次
    freq = {}
    for _ in range(1000):
        sketch.update(1)
        freq[1] = freq.get(1, 0) + 1
    for _ in range(500):
        sketch.update(2)
        freq[2] = freq.get(2, 0) + 1
    for i in range(3, 103):
        sketch.update(i)
        freq[i] = freq.get(i, 0) + 1

    true_f2 = sum(v ** 2 for v in freq.values())
    estimated_f2 = sketch.estimate_f2()

    print(f"真实 F2: {true_f2}")
    print(f"估计 F2: {estimated_f2:.0f}")
    print(f"相对误差: {abs(estimated_f2 - true_f2) / true_f2 * 100:.2f}%")

if __name__ == "__main__":
    demo_ams()

Johnson-Lindenstrauss 维度约简

频率矩估计与 Johnson-Lindenstrauss(JL)引理有深层联系。JL 引理说:\(n\) 维空间中的 \(m\) 个点可以投影到 \(O(\log m / \varepsilon^2)\) 维空间,同时保持所有点对距离在 \((1 \pm \varepsilon)\) 范围内。

AMS sketch 本质上是一种随机线性投影:频率向量 \(f \in \mathbb{R}^n\) 被投影到 \(\mathbb{R}^{O(1/\varepsilon^2)}\) 空间。投影矩阵的每个元素是独立的 \(\{-1, +1\}\) 随机变量。投影后向量的 \(L_2\) 范数的平方就是 \(F_2\) 的无偏估计。

这不是巧合。sketch 就是随机线性投影的特例——这一统一视角将在第十节中展开。

四、基数统计:从 FM 到 HyperLogLog

基数统计(\(F_0\) 估计)是流式算法中研究最深入的问题之一。这里做一个从历史到最优解的快速回顾,详细分析请参考第 30 篇

算法演进

算法 年份 空间 误差 核心思想
FM sketch(Flajolet-Martin) 1985 \(O(\log n)\) \(O(1)\) 相对误差 哈希值末尾零的最大长度
BJKST 2002 \(O(1/\varepsilon^2 \cdot \log n)\) \((1 \pm \varepsilon)\) 子采样 + 级别计数
LogLog 2003 \(O(1/\varepsilon^2 \cdot \log\log n)\) \((1 \pm \varepsilon)\) 分桶取算术平均
HyperLogLog 2007 \(O(1/\varepsilon^2 \cdot \log\log n)\) \((1 \pm \varepsilon)\) 分桶取调和平均
HyperLogLog++ 2013 同上 + 实际优化 同上 偏差修正 + 稀疏表示
Streaming HLL(Ertl) 2017 同上 改进常数因子 最大似然估计替代调和平均

核心原理速览

FM sketch 的直觉:如果你扔硬币,连续出现 \(k\) 次正面的概率是 \(2^{-k}\)。把哈希值看作”硬币序列”,观察到的最大连续零长度 \(R\) 意味着大约有 \(2^R\) 个不同元素。

HyperLogLog 的改进:把元素分到 \(m = 2^p\) 个桶中,每个桶记录其”最大零长度”\(M_j\),然后用调和平均合并:

\[E = \alpha_m \cdot m^2 \cdot \left(\sum_{j=1}^{m} 2^{-M_j}\right)^{-1}\]

这个估计器的标准误差约为 \(1.04/\sqrt{m}\)。用 \(m = 2^{14} = 16384\) 个桶(16KB 空间),误差仅为 0.81%。

我认为 HyperLogLog 是流式算法中”理论优雅与工程实用完美统一”的典范。

五、频率估计:从精确到近似

频率估计问题:给定数据流,对任意查询元素 \(q\),估计 \(f_q\)(出现次数)。这部分内容联动第 31 篇 Count-Min Sketch第 35 篇频率估计理论

Misra-Gries 算法

Misra-Gries(1982)是确定性频率估计的经典算法,思想简洁而精妙:

维护 k-1 个 (元素, 计数) 对
对于每个到达的元素 a:
  如果 a 已在表中: 对应计数 +1
  否则如果表未满: 插入 (a, 1)
  否则: 所有计数 -1,删除计数为 0 的条目

性质: - 空间 \(O(k \log n)\) - 保证 \(f_q - m/k \leq \hat{f}_q \leq f_q\)——永远不会高估,最多低估 \(m/k\) - 确定性算法,无随机性

局限在于误差与流长度 \(m\) 相关。如果你想要 \(\varepsilon m\) 的误差保证,需要 \(k = 1/\varepsilon\)

Count-Min Sketch

Count-Min Sketch(Cormode & Muthukrishnan, 2005)用 \(d\) 个独立哈希函数将元素映射到 \(d \times w\) 的计数矩阵,查询时取所有行对应位置的最小值:

\[\hat{f}_q = \min_{j=1}^{d} \text{table}[j][h_j(q)]\]

性质: - 空间 \(O(w \cdot d \cdot \log n)\),取 \(w = \lceil e/\varepsilon \rceil\)\(d = \lceil \ln(1/\delta) \rceil\) - 保证 \(f_q \leq \hat{f}_q \leq f_q + \varepsilon m\),概率 \(\geq 1 - \delta\) - 仅在现金登记模型下有单侧误差保证(永远不低估)

Space-Saving 算法

Space-Saving(Metwally, 2005)在 Misra-Gries 的基础上改进:当表满且新元素不在表中时,替换计数最小的条目,并将其计数 +1。

这个小改动带来了两个重要的好处: 1. 它自然地给出了 top-k 的候选列表(表中当前的元素)。 2. 每个元素的频率都有明确的上下界。

三者对比

特性 Misra-Gries Count-Min Sketch Space-Saving
类型 确定性 随机化 确定性
误差方向 低估 高估 上下界都有
空间 \(O(1/\varepsilon)\) 个条目 \(O(1/\varepsilon \times \log(1/\delta))\) \(O(1/\varepsilon)\) 个条目
可合并 需要额外处理 天然可加 需要额外处理
点查询
Top-k 间接 需要堆辅助 天然支持
适用模型 现金登记 现金登记/十字转门 现金登记

在实际系统中,Count-Min Sketch 因其可合并性在分布式环境中最受欢迎(多个节点分别维护 sketch,最后逐元素求和即可合并)。Space-Saving 则在单机 top-k 场景中效果最好。

六、分位数估计

分位数问题:给定数据流,估计第 \(\phi\) 分位数(排序后第 \(\lceil \phi \cdot n \rceil\) 个元素)。详细讨论见第 32 篇 t-digest

GK Sketch

Greenwald-Khanna(2001)sketch 是分位数估计的经典确定性算法。它维护一组”摘要元素”\((v_i, g_i, \Delta_i)\),其中: - \(v_i\) 是观察到的数据值 - \(g_i\)\(v_i\)\(v_{i-1}\) 之间的最小排名差 - \(\Delta_i\)\(v_i\) 的排名的最大不确定性

空间\(O((1/\varepsilon) \cdot \log(\varepsilon n))\) 个条目

保证:对任意 \(\phi\),返回的值的排名在 \([\phi n - \varepsilon n, \phi n + \varepsilon n]\) 内。

t-digest

t-digest(Dunning, 2019)是面向工程的分位数估计算法,其核心思想是对尾部分位数(P99、P999)给予更多的精度。

它维护一组质心(centroid),每个质心有均值和权重。插入新数据时,优先合并到最近的质心,但限制每个质心的大小——越靠近 0 或 1 分位数的质心允许的大小越小。

这个”不均匀精度”的设计来自一个实际观察:运维人员最关心的是极端分位数(P99、P99.9),而不是中位数。

KLL Sketch

Karnin-Lang-Liberty(2016)sketch 实现了接近最优的空间复杂度 \(O((1/\varepsilon) \cdot \log\log(1/\delta))\),采用”分层压缩(compaction)“的策略。

KLL 维护 \(O(\log(1/(\varepsilon\delta)))\) 层缓冲区。最底层直接存储到达的元素,当缓冲区满时,排序后每隔一个取一个(随机选择偶数或奇数位置),“压缩”到上一层。上层的每个元素代表的权重是下层的两倍。

分位数算法对比

算法 空间 误差保证 可合并 尾部精度
GK \(O(\frac{1}{\varepsilon}\log(\varepsilon n))\) 确定性 \(\varepsilon n\) 复杂 均匀
t-digest 经验性 无严格理论保证 极好
KLL \(O(\frac{1}{\varepsilon}\log\log\frac{1}{\delta})\) 概率性 均匀
DDSketch \(O(\frac{1}{\varepsilon}\log\frac{u}{l})\) 相对误差确定性

工程选择建议:如果需要严格理论保证且可合并,选 KLL;如果关注尾部精度且实现简单,选 t-digest;如果需要确定性保证且不需要合并,选 GK。

七、采样与相似度

水塘抽样

水塘抽样(Reservoir Sampling)解决的问题是:从未知长度的数据流中均匀随机抽取 \(k\) 个样本。详见第 33 篇

经典的 Vitter 算法(Algorithm R): - 前 \(k\) 个元素直接放入水塘 - 第 \(i\) 个元素(\(i > k\))以概率 \(k/i\) 替换水塘中随机一个元素

优先采样(Priority Sampling)是水塘抽样的推广:给每个元素赋予随机优先级 \(r_i = \text{Uniform}(0, 1)\),保留优先级最高的 \(k\) 个。它的优点是可以估计子集的聚合值(例如”来自某个前缀的 IP 的总流量”)。

MinHash 与 SimHash

MinHash 估计集合的 Jaccard 相似度,SimHash 估计向量的余弦相似度。详见第 34 篇

MinHash:对集合的每个元素计算哈希值,取最小值。两个集合的 MinHash 相同的概率等于它们的 Jaccard 相似度 \(J(A, B) = |A \cap B| / |A \cup B|\)

SimHash(Charikar, 2002):将高维向量映射为一个位串。选择一个随机超平面 \(r\),设 \(b = \text{sign}(\langle v, r \rangle)\)。两个向量的 SimHash 位不同的比例等于它们之间角度的 \(\theta/\pi\)

在流式场景中,这两种技术用于在线去重(near-duplicate detection)和相似度搜索。

八、图流算法

图流是流式算法中最具挑战性的分支之一。数据流由图的边 \((u, v)\) 构成,算法在一遍扫描后需要回答关于图结构的问题。

问题的困难性

图流算法面临的根本困难是:图的大多数性质(连通性、最短路径、匹配等)本质上是全局性质,而流式算法只能维护局部信息。

下界示例:在单遍流模型下,精确判断图是否连通需要 \(\Omega(n \log n)\) 空间(\(n\) 是顶点数),这几乎等于存储整棵生成树。

连通分量:半流模型

半流模型(Semi-Streaming Model)放宽空间约束到 \(O(n \cdot \text{polylog}(n))\)——可以存储大约一棵生成树的信息,但远小于边数 \(O(n^2)\)

在半流模型下,连通分量可以用以下简洁的算法计算:

class SemiStreamingCC:
    """半流模型下的连通分量算法。

    维护一棵生成森林。空间 O(n log n)。
    """

    def __init__(self, n: int):
        self.parent = list(range(n))
        self.rank = [0] * n

    def find(self, x: int) -> int:
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]  # 路径压缩
            x = self.parent[x]
        return x

    def union(self, u: int, v: int) -> bool:
        ru, rv = self.find(u), self.find(v)
        if ru == rv:
            return False
        if self.rank[ru] < self.rank[rv]:
            ru, rv = rv, ru
        self.parent[rv] = ru
        if self.rank[ru] == self.rank[rv]:
            self.rank[ru] += 1
        return True

    def process_edge(self, u: int, v: int):
        """处理流中的一条边。"""
        self.union(u, v)

    def query(self, u: int, v: int) -> bool:
        """查询 u 和 v 是否在同一个连通分量中。"""
        return self.find(u) == self.find(v)

    def num_components(self) -> int:
        return len(set(self.find(i) for i in range(len(self.parent))))

图素描:AGM sketch

Ahn-Guha-McGregor(2012)提出了一种基于线性 sketch 的图流算法,能在 \(O(n \cdot \text{polylog}(n))\) 空间内支持动态图流(边的插入和删除)。

核心思想是对每个顶点的邻接向量维护一个线性 sketch。连通分量的计算通过对 sketch 进行 \(O(\log n)\) 轮”压缩”来实现——每轮找到每个连通分量的一条出边,然后合并。

这是一个深刻的结果:它表明线性 sketch 不仅能处理频率向量上的统计量,还能处理图的结构性质。

图流中的其他问题

问题 模型 已知最优空间 算法
连通分量 半流/插入 \(O(n \log n)\) Union-Find
连通分量 动态流 \(O(n \cdot \text{polylog}(n))\) AGM sketch
二部图判定 半流/插入 \(O(n \log n)\) 染色
最大匹配 半流/插入 \(O(n \cdot \text{polylog}(n))\) 贪心/局部搜索
三角形计数 半流 \(\Omega(m)\)(精确) 采样估计
最短路径 半流 \(\Theta(n^{1+1/k})\) for \((2k-1)\)-近似 Spanners

图流算法是一个活跃的研究领域。我认为它最令人兴奋的地方在于,它迫使我们重新思考”什么图性质是局部可计算的”——这个问题的答案往往出人意料。

九、滑动窗口模型

前面讨论的流式算法都假设统计量是关于整个数据流的。但在实际监控中,我们通常只关心最近 \(W\) 个元素——这就是滑动窗口模型。

为什么滑动窗口更难

在无限窗口模型中,元素”只进不出”。在滑动窗口模型中,元素会”过期”——第 \(i\) 个元素在时刻 \(i + W\) 后就不再属于当前窗口。这意味着算法不仅要处理新到达的元素,还要处理旧元素的失效。

如果直接维护一个大小为 \(W\) 的环形缓冲区,空间就是 \(O(W)\)——对于大窗口来说太大了。我们的目标是用 \(o(W)\) 的空间在滑动窗口上维护统计量。

DGIM:滑动窗口上的比特计数

Datar-Gionis-Indyk-Motwani(2002)提出了 DGIM 算法,用于在滑动窗口上计数比特流中 1 的个数。

问题:比特流 \(b_1, b_2, \ldots\) 持续到达,在任意时刻查询”最近 \(W\) 个比特中有多少个 1”。

DGIM 的核心数据结构:维护一组”桶”(bucket),每个桶记录: - 时间戳:桶中最新的 1 的到达时间 - 大小:桶中包含的 1 的个数,必须是 2 的幂

不变量:桶按时间戳降序排列。大小为 \(2^j\) 的桶的个数要么是 1 个,要么是 2 个(每种大小最多 2 个,最少 1 个,最大的那种除外)。

更新规则: 1. 新比特到达。如果是 0,什么都不做。 2. 如果是 1,创建一个大小为 1 的新桶。 3. 如果大小为 1 的桶超过了 2 个,合并最旧的两个为一个大小为 2 的桶。 4. 递归检查大小为 2 的桶是否超过 2 个,以此类推。 5. 删除时间戳超出窗口 \(W\) 的桶。

查询:将窗口内所有桶的大小求和,但最旧那个桶只计一半。

精度保证:误差不超过最旧桶大小的一半,最终相对误差不超过 50%。通过允许每种大小最多 \(r\) 个桶(而不是 2 个),可以把误差降到 \(1/(r-1)\)

空间\(O((1/\varepsilon) \cdot \log^2 W)\) 位。

DGIM 的完整实现

from collections import deque

class DGIMBucket:
    """DGIM 桶。"""
    __slots__ = ['timestamp', 'size']

    def __init__(self, timestamp: int, size: int):
        self.timestamp = timestamp
        self.size = size

    def __repr__(self):
        return f"Bucket(t={self.timestamp}, size={self.size})"


class DGIM:
    """DGIM 算法:滑动窗口上的比特计数。

    空间:O((1/epsilon) * log^2(W))
    误差:最多 1/(max_same - 1) 的相对误差
    """

    def __init__(self, window_size: int, max_same: int = 2):
        """
        Args:
            window_size: 滑动窗口大小 W
            max_same: 每种大小最多允许的桶数(默认 2,误差约 50%)
                      设为 r 则误差约 1/(r-1)
        """
        self.W = window_size
        self.max_same = max_same
        self.buckets: deque[DGIMBucket] = deque()
        self.time = 0

    def _expire(self):
        """移除窗口外的桶。"""
        while self.buckets and self.buckets[0].timestamp <= self.time - self.W:
            self.buckets.popleft()

    def _merge_if_needed(self):
        """从最新的桶开始检查,合并同大小超额的桶。"""
        # 从右(最新)到左(最旧)扫描
        if len(self.buckets) < 2:
            return

        i = len(self.buckets) - 1
        while i >= 2:
            # 统计从 i 向左连续同大小桶的个数
            current_size = self.buckets[i].size
            count = 0
            j = i
            while j >= 0 and self.buckets[j].size == current_size:
                count += 1
                j -= 1

            if count > self.max_same:
                # 合并最旧的两个同大小桶
                oldest_idx = j + 1
                second_oldest_idx = j + 2
                merged = DGIMBucket(
                    self.buckets[second_oldest_idx].timestamp,
                    current_size * 2
                )
                # 删除两个旧桶,插入一个新桶
                del self.buckets[second_oldest_idx]
                del self.buckets[oldest_idx]
                self.buckets.insert(oldest_idx, merged)
                i = oldest_idx  # 重新从合并位置检查
            else:
                # 跳到下一个不同大小的位置
                i = j
                if i < 0:
                    break

    def add_bit(self, bit: int):
        """处理一个新比特。"""
        self.time += 1
        self._expire()

        if bit == 1:
            self.buckets.append(DGIMBucket(self.time, 1))
            self._merge_if_needed()

    def query(self) -> int:
        """估计当前窗口中 1 的个数。"""
        self._expire()
        if not self.buckets:
            return 0

        total = 0
        for bucket in self.buckets:
            total += bucket.size

        # 最旧的桶可能只有部分在窗口内,取其大小的一半
        if self.buckets:
            total -= self.buckets[0].size // 2

        return total

    def get_buckets(self):
        """返回当前桶的状态(调试用)。"""
        return [(b.timestamp, b.size) for b in self.buckets]


# 使用示例
def demo_dgim():
    dgim = DGIM(window_size=100, max_same=3)  # 误差约 50%

    import random
    random.seed(42)

    bits = [random.randint(0, 1) for _ in range(500)]
    for bit in bits:
        dgim.add_bit(bit)

    estimated = dgim.query()
    actual = sum(bits[-100:])

    print(f"窗口大小: 100")
    print(f"实际 1 的个数: {actual}")
    print(f"DGIM 估计: {estimated}")
    print(f"绝对误差: {abs(estimated - actual)}")
    print(f"相对误差: {abs(estimated - actual) / max(actual, 1) * 100:.1f}%")
    print(f"桶状态: {dgim.get_buckets()}")


if __name__ == "__main__":
    demo_dgim()

指数直方图

Exponential Histogram 是 DGIM 的推广,可以处理非比特流——每个元素有一个非负整数值。它的思想与 DGIM 相同,只是桶的大小不再限制为 2 的幂,而是允许任意值(但仍保持指数增长)。

滑动窗口上的其他问题

问题 空间 方法
比特计数 \(O(\frac{1}{\varepsilon}\log^2 W)\) DGIM
求和 \(O(\frac{1}{\varepsilon}\log^2 W)\) 指数直方图
\(F_0\)(基数) \(O(\frac{1}{\varepsilon^2}\log^2 W)\) DGIM + HLL
频率矩 \(O(\frac{1}{\varepsilon^2}\text{polylog}(W))\) Smooth Histogram
分位数 \(O(\frac{1}{\varepsilon}\log^2 W)\) GK + 过期管理

Braverman 和 Ostrovsky(2007)提出了 Smooth Histogram 框架,将任何”光滑”(smooth)的聚合函数的流式算法自动转化为滑动窗口版本。一个函数是光滑的,如果在移除前缀时,函数值的变化是渐进的而非突变的。\(F_2\)\(F_0\) 和频率矩都是光滑函数。

十、统一视角:Sketch 即线性投影

线性 Sketch 的定义

大多数流式 sketch 可以统一理解为一种操作:用一个随机矩阵 \(\Phi \in \mathbb{R}^{s \times n}\) 对频率向量 \(f \in \mathbb{R}^n\) 做线性投影:

\[\text{sketch}(f) = \Phi \cdot f \in \mathbb{R}^s\]

其中 \(s \ll n\) 是 sketch 的大小。每当元素 \(i\) 到达,sketch 更新为:

\[\text{sketch} \leftarrow \text{sketch} + \Phi \cdot e_i\]

其中 \(e_i\) 是第 \(i\) 个标准基向量。这等价于把 \(\Phi\) 的第 \(i\) 列加到 sketch 上。

各算法的投影矩阵

算法 \(\Phi\) 的结构 非零模式
Count-Min Sketch \(\Phi_{(j,h_j(i))} = 1\) 每行每列恰好一个非零元素
Count Sketch \(\Phi_{(j,h_j(i))} = s_j(i)\)\(\pm 1\) 同上但带符号
AMS Sketch 每个元素 \(\Phi_{(j,i)} = s_j(i)\)\(\pm 1\) 稠密
Random Projection \(\Phi_{ij} \sim N(0,1/s)\) 全稠密
Sparse JL \(\Phi_{ij} \in \{0, \pm 1/\sqrt{s'}\}\) 稀疏

线性性的三大好处

  1. 可合并性(Mergeability)\(\text{sketch}(f + g) = \text{sketch}(f) + \text{sketch}(g)\)。这意味着多个节点分别维护 sketch 后可以简单相加。在 MapReduce、Flink 等分布式系统中至关重要。

  2. 支持十字转门模型:由于线性性,元素的”删除”等价于减去对应列,天然支持频率增减。

  3. 隐私保护:线性 sketch 天然满足差分隐私的某些变体——添加适当的噪声后,sketch 不会泄露任何单个元素的信息。

线性 Sketch 的局限

并非所有问题都能用线性 sketch 高效解决。例如:

我认为”sketch = 线性投影”是理解流式算法最重要的心智模型。一旦你意识到这一点,很多看似不同的算法突然变成了同一个主题的变奏——唯一的区别是投影矩阵的选择。

十一、大数据系统中的应用

流式算法不仅是理论上的优雅结果,它们已经深深嵌入到现代大数据基础设施中。

Flink 是目前最流行的流处理引擎之一。它内置了多种流式算法的实现:

// Flink 中使用 HyperLogLog 统计独立用户数
DataStream<String> userIds = ...;

userIds
    .keyBy(uid -> "global")
    .process(new ProcessFunction<String, Long>() {
        private transient HyperLogLogPlus hll;

        @Override
        public void open(Configuration params) {
            hll = new HyperLogLogPlus(14);  // 2^14 个桶
        }

        @Override
        public void processElement(String uid, Context ctx, Collector<Long> out) {
            hll.offer(uid);
            out.collect(hll.cardinality());
        }
    });

Flink 的 State Backend 天然支持 sketch 的检查点和恢复——sketch 的状态可以序列化后存储到 RocksDB 或文件系统中。

Apache Spark Streaming

Spark Streaming(及 Structured Streaming)通过 approxQuantileapproxCountDistinct 等 API 暴露流式算法:

# Spark 中使用近似去重计数
df.select(approx_count_distinct("user_id", rsd=0.05))

# 近似分位数
df.stat.approxQuantile("latency", [0.5, 0.95, 0.99], 0.01)

Spark 的 approx_count_distinct 底层使用 HyperLogLog++。approxQuantile 使用 Greenwald-Khanna sketch。

Kafka Streams

Kafka Streams 提供了有状态的流处理,可以在 State Store 中维护 sketch:

// 概念性代码:在 Kafka Streams 中维护 CMS
builder.stream("events")
    .groupByKey()
    .aggregate(
        () -> new CountMinSketch(0.001, 0.01),
        (key, value, cms) -> { cms.add(value); return cms; },
        Materialized.as("cms-store")
    );

Redis 的概率数据结构

Redis 从 4.0 版本开始通过模块系统支持概率数据结构,后来在 Redis Stack 中原生集成:

# HyperLogLog
PFADD visitors "user:1001"
PFADD visitors "user:1002"
PFCOUNT visitors

# Count-Min Sketch(Redis Stack)
CMS.INITBYPROB urls 0.001 0.01
CMS.INCRBY urls "/index.html" 1
CMS.QUERY urls "/index.html"

# Top-K(Redis Stack)
TOPK.RESERVE popular_pages 10 50 3 0.9
TOPK.ADD popular_pages "/index.html" "/about.html"
TOPK.LIST popular_pages

ClickHouse 的流式聚合

ClickHouse 在 SQL 层面直接暴露了多种流式算法:

-- HyperLogLog
SELECT uniqHLL12(user_id) FROM events;

-- 分位数估计(t-digest)
SELECT quantileTDigest(0.99)(latency) FROM requests;

-- 分位数估计(DDSketch)
SELECT quantileDD(0.01)(latency) FROM requests;

-- 频率估计
SELECT topK(10)(url) FROM access_log;

系统集成的工程要点

在将流式算法集成到生产系统时,有几个工程问题需要特别注意:

  1. 序列化/反序列化:sketch 必须支持高效的序列化,因为在分布式系统中需要频繁传输和持久化。HyperLogLog 的序列化非常简单(就是一个 byte 数组),Count-Min Sketch 也不难(二维数组),但 t-digest 的序列化需要更多考虑(质心列表的排序和压缩)。

  2. 合并语义:在 MapReduce 和窗口聚合中,sketch 需要合并。线性 sketch(CMS、AMS)可以直接逐元素相加。HyperLogLog 取逐桶最大值。t-digest 需要把两组质心合并后重新压缩。

  3. 精度与空间的权衡:在资源受限的环境中(如边缘计算设备),需要仔细调优 sketch 的参数。下表给出了常见配置:

算法 典型配置 空间 相对误差
HyperLogLog \(p = 14\)(16384 桶) 12 KB 0.81%
Count-Min Sketch \(w = 2048, d = 5\) 40 KB \(\varepsilon \approx 0.0013\)
t-digest \(\delta = 100\) ~10 KB P99 < 1%
KLL \(k = 200\) ~4 KB \(\varepsilon \approx 0.01\)

十二、工程陷阱与最佳实践

常见陷阱一览

陷阱 症状 原因 解决方案
哈希函数质量差 估计值偏差大、方差高 哈希冲突过多 使用 MurmurHash3、xxHash 等高质量哈希
忘记处理空流 除零错误或返回 NaN 调和平均在全零时发散 添加空流检查,返回 0
sketch 参数未调优 精度不达标或内存浪费 默认参数不适合具体场景 根据误差需求和数据分布选择参数
合并不同参数的 sketch 程序崩溃或静默错误 维度不匹配 合并前校验参数一致性
十字转门模型下用 CMS 负频率导致错误 CMS 假设非负频率 改用 Count Sketch
滑动窗口未过期 统计量反映全量而非窗口 忘记删除过期桶/元素 严格实现过期逻辑
序列化字节序不一致 跨平台合并结果错误 大端/小端差异 固定字节序(通常小端)
浮点精度累积误差 长时间运行后估计偏移 浮点加法的舍入误差 定期重建 sketch 或使用整数算术
并发更新无保护 计数丢失或数据损坏 多线程竞争 per-thread sketch + 定期合并
哈希种子硬编码 攻击者可构造最坏输入 种子可预测 随机生成种子,重启后更换

选型决策树

面对一个流式计算需求,可以按以下思路选择算法:

你需要计算什么?
├── 不同元素个数 → HyperLogLog
├── 元素频率
│   ├── 点查询(某个元素出现了几次)→ Count-Min Sketch
│   ├── 内积/F2 估计 → AMS / Count Sketch
│   └── Top-k(最频繁的 k 个)→ Space-Saving
├── 分位数
│   ├── 关心尾部精度(P99/P999)→ t-digest / DDSketch
│   └── 需要严格理论保证 → KLL / GK
├── 随机样本 → 水塘抽样
├── 集合相似度 → MinHash(Jaccard)/ SimHash(余弦)
├── 滑动窗口上的计数 → DGIM / 指数直方图
└── 图的连通性 → 半流 Union-Find / AGM sketch

性能调优建议

  1. 批量更新:不要逐条更新 sketch,攒一批后批量处理。这能利用 CPU 缓存局部性和 SIMD 指令。

  2. SIMD 加速哈希:现代 CPU 支持 CRC32、AES-NI 等指令集,可以极大加速哈希计算。ClickHouse 的 HyperLogLog 实现就使用了 CRC32 指令。

  3. 内存对齐:sketch 的数组应按 cache line(64 字节)对齐,避免伪共享(false sharing)问题。

  4. 分层 sketch:在热门元素和冷门元素差异巨大的场景中,可以用一个小的精确计数器处理热门元素(Frequent 算法识别),用 sketch 处理其余元素。

十三、个人观点:流式思维作为设计哲学

流式算法的核心约束——单遍扫描、亚线性空间——最初是由物理限制驱动的。磁带只能顺序读取,内存装不下所有数据。但我认为,即使在现代计算环境中内存变得越来越大,流式思维仍然有深刻的指导意义。

近似是一种特性,不是缺陷

精确计算往往是过度设计。你真的需要知道今天的独立用户数是 12,847,293 而不是”大约 1285 万”吗?大多数情况下,0.5% 的误差对业务决策没有任何影响。

HyperLogLog 用 12KB 空间就能以 0.8% 的误差统计上亿的独立元素。一个精确的 HashSet 需要数 GB。从信息论的角度看,HyperLogLog 是一种极其高效的信息压缩——它只保留你真正需要的信息量级,丢弃你不需要的精确细节。

这种”有损但有界”的思维方式应该更广泛地应用于系统设计中。

可合并性是分布式系统的核心需求

我在前面反复强调 sketch 的可合并性,因为它直接对应于分布式计算的核心操作——reduce。任何可合并的数据结构都天然适配 MapReduce、流处理、联邦学习等分布式范式。

反过来,当你设计一个数据结构时,如果它不可合并,你就很难把它用在分布式环境中。这是一个设计约束,而非事后添加的特性。

空间-精度-时间的三角不等式

流式算法的理论告诉我们,这三者之间存在根本性的权衡:

这个三角关系在系统设计中无处不在。缓存、索引、采样、预计算——都是在这三个维度上做权衡。

流式算法对软件架构的启示

  1. 事件驱动架构天然是流式的——消息到达后被处理一次然后丢弃。
  2. CQRS(命令查询分离)中的查询端可以用 sketch 来加速——写入侧精确记录,查询侧近似汇总。
  3. 边缘计算中的设备资源极其有限,流式算法是唯一可行的统计手段。
  4. 实时机器学习中的特征计算(频率、基数、分位数)天然适合用 sketch 来维护。

我认为,理解流式算法不仅仅是学会几个数据结构,更重要的是内化一种思维方式:在信息不完全的约束下,用有限的资源做出足够好的决策。这种思维方式,从本质上说,就是工程学的精髓。

参考文献

  1. Alon, N., Matias, Y., & Szegedy, M. (1999). “The Space Complexity of Approximating the Frequency Moments.” Journal of Computer and System Sciences, 58(1), 137-147.
  2. Flajolet, P., & Martin, G. N. (1985). “Probabilistic Counting Algorithms for Data Base Applications.” Journal of Computer and System Sciences, 31(2), 182-209.
  3. Flajolet, P., Fusy, É., Gandouet, O., & Meunier, F. (2007). “HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm.” DMTCS Proceedings.
  4. Cormode, G., & Muthukrishnan, S. (2005). “An Improved Data Stream Summary: The Count-Min Sketch and its Applications.” Journal of Algorithms, 55(1), 58-75.
  5. Misra, J., & Gries, D. (1982). “Finding Repeated Elements.” Science of Computer Programming, 2(2), 143-152.
  6. Metwally, A., Agrawal, D., & El Abbadi, A. (2005). “Efficient Computation of Frequent and Top-k Elements in Data Streams.” ICDT 2005.
  7. Greenwald, M., & Khanna, S. (2001). “Space-Efficient Online Computation of Quantile Summaries.” SIGMOD 2001.
  8. Dunning, T., & Ertl, O. (2019). “Computing Extremely Accurate Quantiles Using t-Digests.” arXiv preprint.
  9. Karnin, Z., Lang, K., & Liberty, E. (2016). “Optimal Quantile Approximation in Streams.” FOCS 2016.
  10. Datar, M., Gionis, A., Indyk, P., & Motwani, R. (2002). “Maintaining Stream Statistics over Sliding Windows.” SIAM Journal on Computing, 31(6), 1794-1813.
  11. Ahn, K. J., Guha, S., & McGregor, A. (2012). “Analyzing Graph Structure via Linear Measurements.” SODA 2012.
  12. Braverman, V., & Ostrovsky, R. (2007). “Smooth Histograms for Sliding Windows.” FOCS 2007.
  13. Vitter, J. S. (1985). “Random Sampling with a Reservoir.” ACM Transactions on Mathematical Software, 11(1), 37-57.
  14. Charikar, M. S. (2002). “Similarity Estimation Techniques from Rounding Algorithms.” STOC 2002.
  15. Indyk, P., & Woodruff, D. (2005). “Optimal Approximations of the Frequency Moments of Data Streams.” STOC 2005.
  16. Kushilevitz, E., & Nisan, N. (1997). Communication Complexity. Cambridge University Press.
  17. Muthukrishnan, S. (2005). “Data Streams: Algorithms and Applications.” Foundations and Trends in Theoretical Computer Science, 1(2), 117-236.

算法系列导航上一篇:频率估计的理论极限 | 下一篇:k-d tree

相关阅读HyperLogLog | Count-Min Sketch | t-digest


By .