土法炼钢兴趣小组的算法知识备份

频率估计的理论极限:Space-Saving 与 Misra-Gries

目录

搜索引擎要找出过去一小时最热门的 100 个查询词。网络交换机要在 10Gbps 线速下识别占总带宽 1% 以上的”大象流”。推荐系统要实时统计用户最常点击的商品类别。

这些问题有一个共同的数学本质:在一个长度为 n 的数据流中,用远小于 n 的内存空间,找出频率超过某个阈值的元素。这就是 Heavy Hitter 问题,也叫频繁项(frequent items)问题。

如果内存无限,答案很简单——用哈希表精确计数。但当数据流的不同元素数量是 10 亿级别,而你只有几十 KB 的内存时,精确计数不可能了。你能做的最好程度是什么?

本文将深入分析三个经典的确定性流式算法:Misra-Gries(1982)、Lossy Counting(2002)和 Space-Saving(2005),证明它们的误差界,给出完整的 C 语言实现,并与上一篇讨论的概率性方法 Count-Min Sketch 进行系统对比。最后,我们会证明 O(1/ε) 的空间下界——这些算法已经是最优的。

Space-Saving 的 Stream-Summary 数据结构

一、Heavy Hitter 问题的形式化定义

1.1 问题定义

给定一个长度为 n 的数据流 \(\sigma = a_1, a_2, \ldots, a_n\),其中每个元素 \(a_i\) 来自一个大小为 \(|\mathcal{U}|\) 的全集。定义元素 \(e\) 的真实频率为 \(f_e = |\{i : a_i = e\}|\)

ε-Heavy Hitter 问题:给定参数 \(\varepsilon \in (0, 1)\),算法需要输出一个集合 \(S\),满足: - 无遗漏(No false negatives):若 \(f_e > \varepsilon n\),则 \(e \in S\) - 有限误报(Bounded false positives):若 \(e \in S\),则 \(f_e > 0\)

更强的版本还要求同时输出频率估计值 \(\hat{f}_e\),满足 \(|f_e - \hat{f}_e| \leq \varepsilon n\)

1.2 为什么精确计数不够好

如果全集 \(|\mathcal{U}|\) 很大(比如 IPv4 地址有 \(2^{32}\) 个,IPv6 有 \(2^{128}\) 个),精确计数需要 \(O(|\mathcal{U}|)\) 的空间。而流式算法的目标是用 \(O(1/\varepsilon)\) 的空间来解决问题——这与全集大小无关。

1.3 一个直觉

考虑 \(\varepsilon = 1/k\),即我们要找频率超过 \(n/k\) 的元素。这样的元素最多有 \(k-1\) 个(因为 \(k\) 个频率超过 \(n/k\) 的元素的频率之和会超过 \(n\))。所以,\(k-1\) 个计数器就有可能”覆盖”所有的 heavy hitter。问题是:怎么决定这 \(k-1\) 个计数器分别追踪谁?

二、Misra-Gries 算法:消消乐的智慧

Misra-Gries 算法是 1982 年由 J. Misra 和 D. Gries 提出的,是最早的确定性流式 heavy hitter 算法。Boyer-Moore 的多数投票算法(Majority Vote,1981)可以看作是 \(k=2\) 的特例。

2.1 算法描述

算法维护一个关联数组 \(A\),最多包含 \(k-1\) 个 (元素, 计数) 对。处理流中每个到达的元素 \(e\)

MisraGries-Process(e):
    if e ∈ A:
        A[e] ← A[e] + 1          // 已追踪,递增
    else if |A| < k - 1:
        A[e] ← 1                  // 有空位,新建
    else:
        for each (e', c) ∈ A:     // 没有空位,全体减一
            A[e'] ← c - 1
            if A[e'] = 0:
                remove e' from A

查询元素 \(e\) 的频率时,如果 \(e \in A\),返回 \(A[e]\);否则返回 0。

这个”全体减一”的操作就是”消消乐”——到达的新元素与所有已追踪的元素同归于尽,每人损失 1 点计数。

2.2 正确性证明

定理:Misra-Gries 算法对每个元素 \(e\) 的频率估计 \(\hat{f}_e\) 满足:

\[f_e - \frac{n}{k} \leq \hat{f}_e \leq f_e\]

证明

首先证明 \(\hat{f}_e \leq f_e\)(不会过估计)。这很直观:元素 \(e\) 的计数器只有在 \(e\) 到达时才会递增,而每次最多递增 1。所以 \(\hat{f}_e\) 最多等于 \(e\) 的真实到达次数 \(f_e\)

接下来证明 \(f_e - n/k \leq \hat{f}_e\)。关键观察是”消消乐”的总次数。每次消消乐操作恰好让 \(k\) 个计数(\(k-1\) 个已有的和 1 个新到来的)各减少 1。设消消乐总共发生了 \(D\) 次。那么所有元素的计数损失总和为 \(kD\)

由于流的长度为 \(n\),且每个到达的元素要么让某个计数器增加 1,要么触发一次消消乐(该元素被消耗),我们有:

\[\sum_{e \in A} \hat{f}_e + kD = n\]

由此得 \(D \leq n/k\)

对于任意元素 \(e\),它的计数器减少的总量最多为 \(D\)(每次消消乐最多减 1)。再加上 \(e\) 不在 \(A\) 中时被消耗的次数也最多是 \(D\)。因此:

\[f_e - \hat{f}_e \leq D \leq \frac{n}{k}\]

推论:每个频率超过 \(n/k\) 的元素一定在 \(A\) 中。因为如果 \(e\) 不在 \(A\) 中,则 \(\hat{f}_e = 0\),但 \(f_e - 0 \leq n/k\) 意味着 \(f_e \leq n/k\),矛盾。

2.3 消消乐的另一种理解

想象数据流中的元素是不同颜色的石头。每次消消乐等价于:取出 \(k\) 个不同颜色的石头,全部丢弃。

在所有消消乐操作完成后,剩余的石头就是 \(A\) 中的元素。如果某种颜色的石头原来有超过 \(n/k\) 个,那么即使每次消消乐都消耗一个这种颜色的石头,也消耗不完——因为消消乐的总次数不超过 \(n/k\)

2.4 减少操作的优化

原始版本中,每次消消乐需要遍历所有 \(k-1\) 个计数器。可以优化为:当新元素到达且没有空位时,不立即执行全体减一,而是记录一个”全局偏移量” \(\delta\),将当前元素设为计数 \(1 + \delta\)。当需要清理时,再批量扫描删除计数等于 \(\delta\) 的条目。

但在实际中,\(k\) 通常不大(几百到几千),直接遍历反而更简单高效。

2.5 C 语言实现

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>

/* ═══════════════════════════════════════════════════════════════
 * Misra-Gries 算法
 * k-1 个计数器找出频率超过 n/k 的元素
 * ═══════════════════════════════════════════════════════════════ */

#define MG_MAX_COUNTERS 1024

typedef struct {
    uint64_t key;
    int64_t  count;
    int      active;
} mg_entry_t;

typedef struct {
    mg_entry_t entries[MG_MAX_COUNTERS];
    int        k;           /* 参数 k:找频率 > n/k 的元素 */
    int        num_active;  /* 当前活跃的计数器数量 */
    int64_t    n;           /* 已处理元素总数 */
} misra_gries_t;

void mg_init(misra_gries_t *mg, int k)
{
    memset(mg, 0, sizeof(*mg));
    mg->k = k;
    if (k - 1 > MG_MAX_COUNTERS) {
        fprintf(stderr, "k-1 exceeds MG_MAX_COUNTERS\n");
        exit(1);
    }
}

/* 在计数器数组中查找 key,返回索引;未找到返回 -1 */
static int mg_find(const misra_gries_t *mg, uint64_t key)
{
    for (int i = 0; i < mg->k - 1; i++) {
        if (mg->entries[i].active && mg->entries[i].key == key)
            return i;
    }
    return -1;
}

/* 找一个空闲槽位 */
static int mg_find_free(const misra_gries_t *mg)
{
    for (int i = 0; i < mg->k - 1; i++) {
        if (!mg->entries[i].active)
            return i;
    }
    return -1;
}

void mg_process(misra_gries_t *mg, uint64_t key)
{
    mg->n++;

    /* 情况 1:已追踪 */
    int idx = mg_find(mg, key);
    if (idx >= 0) {
        mg->entries[idx].count++;
        return;
    }

    /* 情况 2:有空位 */
    int free = mg_find_free(mg);
    if (free >= 0) {
        mg->entries[free].key    = key;
        mg->entries[free].count  = 1;
        mg->entries[free].active = 1;
        mg->num_active++;
        return;
    }

    /* 情况 3:消消乐——全体减一 */
    for (int i = 0; i < mg->k - 1; i++) {
        if (mg->entries[i].active) {
            mg->entries[i].count--;
            if (mg->entries[i].count == 0) {
                mg->entries[i].active = 0;
                mg->num_active--;
            }
        }
    }
}

int64_t mg_query(const misra_gries_t *mg, uint64_t key)
{
    int idx = mg_find(mg, key);
    return (idx >= 0) ? mg->entries[idx].count : 0;
}

/* 获取误差上界 */
int64_t mg_error_bound(const misra_gries_t *mg)
{
    return mg->n / mg->k;
}

三、Space-Saving 算法:流式 Top-k 的最优解

Space-Saving 算法由 Metwally、Agrawal 和 El Abbadi 在 2005 年提出。它不仅解决了 heavy hitter 问题,还能同时回答”频率最高的 k 个元素是哪些”这种 top-k 查询。

3.1 核心思想

Space-Saving 维护恰好 \(k\) 个计数器。当新元素到达时: 1. 如果元素已被追踪,直接递增其计数; 2. 如果元素未被追踪,淘汰当前计数最小的元素,用新元素替换它,并将计数设为”被淘汰者的计数 + 1”。

关键洞察:被淘汰者的计数就是新元素频率的最大过估计量。因为新元素的真实频率至少为 1(刚到达了一次),所以继承的旧计数就是可能多算的部分。

3.2 Stream-Summary 数据结构

为了让所有操作达到 \(O(1)\) 时间复杂度,Space-Saving 使用了一个精巧的 Stream-Summary 数据结构:

  1. 有序桶双向链表:每个桶(Bucket)存储一个计数值 \(c\),以及所有当前计数为 \(c\) 的元素的链表。桶按计数值从小到大排列。

  2. 哈希表:将每个被追踪的元素映射到它所在桶中的链表节点。

  3. min 指针:指向计数最小的桶。

3.3 操作细节

已追踪元素到达: 1. 通过哈希表在 \(O(1)\) 时间找到该元素的节点,假设在 Bucket(\(c\)) 中。 2. 将节点从 Bucket(\(c\)) 的链表移除。如果 Bucket(\(c\)) 变空,删除它。 3. 如果 Bucket(\(c+1\)) 存在,将节点插入其链表;否则创建一个新桶 Bucket(\(c+1\)) 并插入。 4. 更新哈希表指针。

新元素到达(计数器已满): 1. 找到 min 桶,取出其中的一个元素(victim)。 2. 记录 \(\varepsilon_{new} = c_{min}\)(最大过估计量)。 3. 将 victim 从哈希表中移除。 4. 将新元素插入到 Bucket(\(c_{min} + 1\)),设置其过估计量为 \(c_{min}\)。 5. 在哈希表中注册新元素。

查询频率:返回元素所在桶的计数值。同时可以返回 \((count - \varepsilon, count]\) 作为置信区间。

查询 top-k:从最大桶开始,依次取出元素,直到取够 \(k\) 个。

所有操作的关键是:桶链表中相邻桶的计数值通常相差不大,移动元素时只需要检查相邻桶即可,因此是 \(O(1)\) 的。

3.4 误差分析

定理:Space-Saving 算法使用 \(k\) 个计数器时,对每个元素 \(e\) 的频率估计 \(\hat{f}_e\) 满足:

\[f_e \leq \hat{f}_e \leq f_e + \frac{n - f_e}{k} < f_e + \frac{n}{k}\]

注意与 Misra-Gries 的区别:Space-Saving 只会过估计,不会低估。这是因为替换时继承了旧计数。

证明

过估计量 \(\varepsilon_e\) 是元素 \(e\) 替换进来时被淘汰者的计数。此时该计数是全局最小的,设为 \(c_{min}\)

考虑替换发生时的状态:\(k\) 个计数器的计数之和为 \(\sum c_i\),且 \(c_{min} \leq \sum c_i / k\)。而 \(\sum c_i\) 等于到目前为止已处理的元素数(每个到达的元素恰好让总和增加 1)。到流末尾时,总和为 \(n\)

但更精确地说,在每次替换时,\(c_{min}\) 可以用当时的流前缀长度 \(n'\) 来界定:\(c_{min} \leq n'/k\)。因为真正重要的是最终的误差,我们直接用全局界:

\[\varepsilon_e \leq \frac{n - f_e}{k}\]

右侧的 \(n - f_e\)其他元素的出现次数。这个更紧的界的直觉是:\(e\) 自身的每次出现不会增加误差,只有其他元素到达导致的替换才可能引入误差。

3.5 与 Misra-Gries 的关系

虽然两者看起来不同,它们有深层联系:

特性 Misra-Gries Space-Saving
计数器数量 \(k-1\) \(k\)
误差方向 只会低估 只会过估计
误差界 \(f_e - n/k \leq \hat{f}_e \leq f_e\) \(f_e \leq \hat{f}_e \leq f_e + n/k\)
每次操作时间 \(O(k)\) 最坏,\(O(1)\) 摊还 \(O(1)\) 最坏
Top-k 查询 不直接支持 天然支持
过估计量追踪 是(每个元素记录 \(\varepsilon\)

实际上,如果在 Misra-Gries 的基础上给每个计数器增加一个”过估计量”字段,再将消消乐改为”替换最小计数器”,就得到了 Space-Saving。两者在理论上是等价的变体。

四、Lossy Counting:分桶计数

Lossy Counting 由 Manku 和 Motwani 在 2002 年提出,是另一种确定性流式频率估计算法。

4.1 算法描述

将数据流分为大小为 \(w = \lceil 1/\varepsilon \rceil\) 的桶(bucket),编号从 1 开始。当前桶编号 \(b_{current} = \lceil n_{so\_far} / w \rceil\)

算法维护一个表 \(D\),存储 (元素 \(e\), 频率 \(f\), 最大误差 \(\Delta\)) 三元组。

处理每个到达的元素 \(e\): 1. 如果 \(e\) 已在 \(D\) 中,令 \(f_e \leftarrow f_e + 1\)。 2. 否则,插入 \((e, 1, b_{current} - 1)\)\(D\) 中。

在每个桶边界(即处理完一个完整桶后),清理 \(D\):删除所有满足 \(f_e + \Delta_e \leq b_{current}\) 的条目。

4.2 误差分析

定理:Lossy Counting 保证: - 频率超过 \(\varepsilon n\) 的元素一定在 \(D\) 中。 - \(D\) 中每个元素的真实频率至少为 \(f_e\),至多为 \(f_e + \varepsilon n\)。 - \(D\) 中的条目数最多为 \((1/\varepsilon) \log(\varepsilon n)\)

空间复杂度为 \(O((1/\varepsilon) \log(\varepsilon n))\),比 Misra-Gries 和 Space-Saving 的 \(O(1/\varepsilon)\) 多了一个对数因子。这使得 Lossy Counting 在理论上不如前两者。

4.3 简化实现

/* ═══════════════════════════════════════════════════════════════
 * Lossy Counting 算法(简化实现)
 * ═══════════════════════════════════════════════════════════════ */

#define LC_MAX_ENTRIES 65536

typedef struct {
    uint64_t key;
    int64_t  freq;
    int64_t  delta;
    int      active;
} lc_entry_t;

typedef struct {
    lc_entry_t entries[LC_MAX_ENTRIES];
    int        num_active;
    int64_t    w;
    int64_t    n;
    int64_t    b_current;
    double     epsilon;
} lossy_counting_t;

void lc_init(lossy_counting_t *lc, double epsilon)
{
    memset(lc, 0, sizeof(*lc));
    lc->epsilon = epsilon;
    lc->w = (int64_t)(1.0 / epsilon) + 1;
    lc->b_current = 1;
}

static int lc_find(const lossy_counting_t *lc, uint64_t key)
{
    for (int i = 0; i < LC_MAX_ENTRIES; i++) {
        if (lc->entries[i].active && lc->entries[i].key == key)
            return i;
    }
    return -1;
}

static void lc_prune(lossy_counting_t *lc)
{
    for (int i = 0; i < LC_MAX_ENTRIES; i++) {
        if (lc->entries[i].active &&
            lc->entries[i].freq + lc->entries[i].delta <= lc->b_current) {
            lc->entries[i].active = 0;
            lc->num_active--;
        }
    }
}

void lc_process(lossy_counting_t *lc, uint64_t key)
{
    lc->n++;
    int idx = lc_find(lc, key);
    if (idx >= 0) {
        lc->entries[idx].freq++;
    } else {
        /* 找空位插入 */
        for (int i = 0; i < LC_MAX_ENTRIES; i++) {
            if (!lc->entries[i].active) {
                lc->entries[i] = (lc_entry_t){
                    .key = key, .freq = 1,
                    .delta = lc->b_current - 1, .active = 1
                };
                lc->num_active++;
                break;
            }
        }
    }
    if (lc->n % lc->w == 0) {
        lc->b_current++;
        lc_prune(lc);
    }
}

4.4 Lossy Counting 的工程局限

Lossy Counting 有一个实践中的严重问题:每个桶边界处的清理操作需要扫描整个表 \(D\),导致吞吐量不均匀。在流速很高的场景中,这种突发的清理开销可能导致丢包或延迟尖峰。

五、Space-Saving 的完整 C 实现

下面给出 Space-Saving 的完整 C 实现,使用 Stream-Summary 数据结构,所有操作 \(O(1)\) 时间。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>

/* ═══════════════════════════════════════════════════════════════
 * Space-Saving 算法:Stream-Summary 数据结构
 * 所有操作 O(1) 时间复杂度
 * ═══════════════════════════════════════════════════════════════ */

/* 前向声明 */
typedef struct ss_counter  ss_counter_t;
typedef struct ss_bucket   ss_bucket_t;

/* 计数器节点:双向链表节点,属于某个桶 */
struct ss_counter {
    uint64_t       key;
    int64_t        overestimate;  /* 最大过估计量 ε */
    ss_bucket_t   *parent;        /* 所属桶 */
    ss_counter_t  *prev;
    ss_counter_t  *next;
};

/* 桶节点:双向链表节点,代表一个计数值 */
struct ss_bucket {
    int64_t        count;
    ss_counter_t  *head;          /* 计数器链表头 */
    ss_counter_t  *tail;          /* 计数器链表尾 */
    int            size;          /* 计数器数量 */
    ss_bucket_t   *prev;
    ss_bucket_t   *next;
};

/* 简易哈希表(开放寻址) */
#define SS_HT_CAPACITY  8192
#define SS_HT_MASK      (SS_HT_CAPACITY - 1)

typedef struct {
    uint64_t       key;
    ss_counter_t  *counter;
    int            occupied;
} ss_ht_slot_t;

typedef struct {
    ss_bucket_t   *bucket_head;   /* 桶链表头(最小计数) */
    ss_bucket_t   *bucket_tail;   /* 桶链表尾(最大计数) */
    ss_ht_slot_t   ht[SS_HT_CAPACITY];
    ss_counter_t  *counters;      /* 预分配的计数器数组 */
    int            k;             /* 最大计数器数量 */
    int            num_counters;  /* 当前使用的计数器数量 */
    int64_t        n;             /* 已处理元素总数 */
} space_saving_t;

/* ——————————————————————————————————————
 * 哈希表操作
 * —————————————————————————————————————— */

static uint64_t ss_hash(uint64_t key)
{
    /* FNV-1a 变体 */
    key ^= key >> 33;
    key *= 0xff51afd7ed558ccdULL;
    key ^= key >> 33;
    key *= 0xc4ceb9fe1a85ec53ULL;
    key ^= key >> 33;
    return key;
}

static ss_counter_t *ss_ht_get(const space_saving_t *ss, uint64_t key)
{
    uint64_t h = ss_hash(key) & SS_HT_MASK;
    for (int i = 0; i < SS_HT_CAPACITY; i++) {
        uint64_t idx = (h + i) & SS_HT_MASK;
        if (!ss->ht[idx].occupied)
            return NULL;
        if (ss->ht[idx].key == key)
            return ss->ht[idx].counter;
    }
    return NULL;
}

static void ss_ht_put(space_saving_t *ss, uint64_t key, ss_counter_t *counter)
{
    uint64_t h = ss_hash(key) & SS_HT_MASK;
    for (int i = 0; i < SS_HT_CAPACITY; i++) {
        uint64_t idx = (h + i) & SS_HT_MASK;
        if (!ss->ht[idx].occupied || ss->ht[idx].key == key) {
            ss->ht[idx].key      = key;
            ss->ht[idx].counter  = counter;
            ss->ht[idx].occupied = 1;
            return;
        }
    }
    fprintf(stderr, "space-saving: hash table full\n");
    exit(1);
}

static void ss_ht_remove(space_saving_t *ss, uint64_t key)
{
    uint64_t h = ss_hash(key) & SS_HT_MASK;
    for (int i = 0; i < SS_HT_CAPACITY; i++) {
        uint64_t idx = (h + i) & SS_HT_MASK;
        if (!ss->ht[idx].occupied)
            return;
        if (ss->ht[idx].key == key) {
            ss->ht[idx].occupied = 0;
            /* 前移后续探测链上的元素以修复开放寻址 */
            uint64_t j = (idx + 1) & SS_HT_MASK;
            while (ss->ht[j].occupied) {
                uint64_t natural = ss_hash(ss->ht[j].key) & SS_HT_MASK;
                /* 判断 j 是否需要前移 */
                if ((j > idx && (natural <= idx || natural > j)) ||
                    (j < idx && (natural <= idx && natural > j))) {
                    ss->ht[idx] = ss->ht[j];
                    ss->ht[j].occupied = 0;
                    idx = j;
                }
                j = (j + 1) & SS_HT_MASK;
            }
            return;
        }
    }
}

/* ——————————————————————————————————————
 * 桶链表操作
 * —————————————————————————————————————— */

static ss_bucket_t *ss_bucket_new(int64_t count)
{
    ss_bucket_t *b = (ss_bucket_t *)calloc(1, sizeof(ss_bucket_t));
    b->count = count;
    return b;
}

/* 在 after 后面插入新桶 */
static void ss_bucket_insert_after(space_saving_t *ss,
                                   ss_bucket_t *after,
                                   ss_bucket_t *b)
{
    b->prev = after;
    b->next = after ? after->next : ss->bucket_head;
    if (b->prev) b->prev->next = b;
    else         ss->bucket_head = b;
    if (b->next) b->next->prev = b;
    else         ss->bucket_tail = b;
}

static void ss_bucket_remove(space_saving_t *ss, ss_bucket_t *b)
{
    if (b->prev) b->prev->next = b->next;
    else         ss->bucket_head = b->next;
    if (b->next) b->next->prev = b->prev;
    else         ss->bucket_tail = b->prev;
    free(b);
}

/* 将计数器从当前桶移除 */
static void ss_counter_detach(space_saving_t *ss, ss_counter_t *c)
{
    ss_bucket_t *b = c->parent;
    if (c->prev) c->prev->next = c->next;
    else         b->head = c->next;
    if (c->next) c->next->prev = c->prev;
    else         b->tail = c->prev;
    b->size--;
    c->prev = c->next = NULL;
    c->parent = NULL;

    /* 如果桶变空,删除桶 */
    if (b->size == 0)
        ss_bucket_remove(ss, b);
}

/* 将计数器添加到桶的头部 */
static void ss_counter_attach(ss_counter_t *c, ss_bucket_t *b)
{
    c->parent = b;
    c->prev = NULL;
    c->next = b->head;
    if (b->head) b->head->prev = c;
    else         b->tail = c;
    b->head = c;
    b->size++;
}

/* 确保 count 值对应的桶存在,返回该桶 */
static ss_bucket_t *ss_ensure_bucket(space_saving_t *ss,
                                     int64_t count,
                                     ss_bucket_t *hint)
{
    /* hint 是当前桶(count - 1 的桶),新桶应在其后面 */
    if (hint && hint->next && hint->next->count == count)
        return hint->next;

    ss_bucket_t *nb = ss_bucket_new(count);
    ss_bucket_insert_after(ss, hint, nb);
    return nb;
}

/* ——————————————————————————————————————
 * Space-Saving 公共接口
 * —————————————————————————————————————— */

void ss_init(space_saving_t *ss, int k)
{
    memset(ss, 0, sizeof(*ss));
    ss->k = k;
    ss->counters = (ss_counter_t *)calloc(k, sizeof(ss_counter_t));
}

void ss_destroy(space_saving_t *ss)
{
    /* 释放所有桶 */
    ss_bucket_t *b = ss->bucket_head;
    while (b) {
        ss_bucket_t *next = b->next;
        free(b);
        b = next;
    }
    free(ss->counters);
    memset(ss, 0, sizeof(*ss));
}

void ss_process(space_saving_t *ss, uint64_t key)
{
    ss->n++;

    /* 情况 1:已追踪的元素 */
    ss_counter_t *c = ss_ht_get(ss, key);
    if (c) {
        ss_bucket_t *old_bucket = c->parent;
        int64_t new_count = old_bucket->count + 1;
        ss_bucket_t *new_bucket = ss_ensure_bucket(ss, new_count, old_bucket);
        ss_counter_detach(ss, c);
        ss_counter_attach(c, new_bucket);
        return;
    }

    /* 情况 2:有空闲计数器 */
    if (ss->num_counters < ss->k) {
        c = &ss->counters[ss->num_counters++];
        c->key = key;
        c->overestimate = 0;

        ss_bucket_t *b1;
        if (ss->bucket_head && ss->bucket_head->count == 1) {
            b1 = ss->bucket_head;
        } else {
            b1 = ss_bucket_new(1);
            ss_bucket_insert_after(ss, NULL, b1);
        }
        ss_counter_attach(c, b1);
        ss_ht_put(ss, key, c);
        return;
    }

    /* 情况 3:替换最小计数器的元素 */
    ss_bucket_t *min_bucket = ss->bucket_head;
    c = min_bucket->tail;  /* 取桶中最后一个元素作为牺牲者 */
    int64_t min_count = min_bucket->count;

    /* 从哈希表移除旧 key */
    ss_ht_remove(ss, c->key);

    /* 替换为新 key */
    c->key = key;
    c->overestimate = min_count;

    /* 移到 count+1 的桶 */
    int64_t new_count = min_count + 1;
    ss_bucket_t *new_bucket = ss_ensure_bucket(ss, new_count, min_bucket);
    ss_counter_detach(ss, c);
    ss_counter_attach(c, new_bucket);

    /* 注册新 key */
    ss_ht_put(ss, key, c);
}

/* 查询元素的估计频率 */
int64_t ss_query(const space_saving_t *ss, uint64_t key)
{
    ss_counter_t *c = ss_ht_get(ss, key);
    if (!c) return 0;
    return c->parent->count;
}

/* 查询元素的保证频率下界 */
int64_t ss_query_lower(const space_saving_t *ss, uint64_t key)
{
    ss_counter_t *c = ss_ht_get(ss, key);
    if (!c) return 0;
    return c->parent->count - c->overestimate;
}

/* 获取 top-k 结果 */
typedef struct {
    uint64_t key;
    int64_t  count;
    int64_t  overestimate;
} ss_result_t;

int ss_topk(const space_saving_t *ss, ss_result_t *results, int max_results)
{
    int count = 0;
    /* 从最大桶向最小桶遍历 */
    for (ss_bucket_t *b = ss->bucket_tail; b && count < max_results; b = b->prev) {
        for (ss_counter_t *c = b->head; c && count < max_results; c = c->next) {
            results[count].key         = c->key;
            results[count].count       = b->count;
            results[count].overestimate = c->overestimate;
            count++;
        }
    }
    return count;
}

5.1 使用示例

int main(void)
{
    space_saving_t ss;
    ss_init(&ss, 5);  /* 追踪 top-5 */

    /* 模拟数据流 */
    uint64_t stream[] = {
        1, 2, 3, 1, 2, 1, 4, 1, 2, 5,
        1, 2, 3, 1, 2, 1, 6, 7, 1, 2,
        1, 1, 1, 2, 2, 3, 3, 4, 4, 5,
    };
    int n = sizeof(stream) / sizeof(stream[0]);

    for (int i = 0; i < n; i++)
        ss_process(&ss, stream[i]);

    printf("已处理 %lld 个元素\n", (long long)ss.n);
    printf("\nTop-5 结果:\n");

    ss_result_t results[5];
    int num = ss_topk(&ss, results, 5);
    for (int i = 0; i < num; i++) {
        printf("  key=%llu  count=%lld  overestimate=%lld  "
               "guaranteed=[%lld, %lld]\n",
               (unsigned long long)results[i].key,
               (long long)results[i].count,
               (long long)results[i].overestimate,
               (long long)(results[i].count - results[i].overestimate),
               (long long)results[i].count);
    }

    ss_destroy(&ss);
    return 0;
}

六、与 Count-Min Sketch 的对比

在第 31 篇中我们详细讨论了 Count-Min Sketch(CMS),它是一种概率性频率估计方法。现在让我们系统对比确定性方法和概率性方法。

6.1 本质区别

Count-Min Sketch 是被动查询模型:它维护一个压缩表示,你可以查询任意元素的频率。但它不会告诉你”谁是 heavy hitter”——你必须自己遍历所有可能的元素去查询。

Space-Saving 和 Misra-Gries 是主动追踪模型:它们始终维护一个候选集合,你可以直接获取 heavy hitter 列表。

6.2 详细对比

维度 Count-Min Sketch Space-Saving Misra-Gries
类型 概率性 确定性 确定性
空间 \(O(\frac{1}{\varepsilon} \log \frac{1}{\delta})\) \(O(\frac{1}{\varepsilon})\) \(O(\frac{1}{\varepsilon})\)
每次更新时间 \(O(\log \frac{1}{\delta})\) \(O(1)\) \(O(\frac{1}{\varepsilon})\) 最坏
频率查询 任意元素 仅追踪元素 仅追踪元素
误差方向 只会过估计 只会过估计 只会低估
误差界 \(\varepsilon n\)(概率 \(1-\delta\) \(\varepsilon n\)(确定性) \(\varepsilon n\)(确定性)
Heavy hitter 需外部扫描 直接输出 直接输出
Top-k 查询 不支持 天然支持 不直接支持
合并性 可合并 不可合并 可合并(需额外工作)
删除操作 需 Count-Min-Mean 变体 不支持 不支持

6.3 何时选择哪个

选 Count-Min Sketch: - 需要查询任意元素的频率(不仅仅是 heavy hitter) - 需要在分布式环境中合并多个摘要 - 不关心 heavy hitter 列表,只关心点查询

选 Space-Saving: - 需要 top-kheavy hitter 列表 - 需要确定性保证(不依赖随机性) - 内存预算极其有限(\(O(1/\varepsilon)\) vs \(O(\frac{1}{\varepsilon} \log \frac{1}{\delta})\)) - 需要最坏情况 \(O(1)\) 的操作时间

实践中的常见做法:两者结合。用 Space-Saving 维护 top-k 候选集,用 Count-Min Sketch 回答任意元素的点查询。

七、下界理论:这些算法已是最优

7.1 确定性空间下界

定理(Alon, Matias, Szegedy 1996 推论):任何单遍确定性流式算法,如果要找出所有频率超过 \(\varepsilon n\) 的元素(且不遗漏),则需要 \(\Omega(1/\varepsilon)\) 的空间。

证明思路

构造一个对手论证。假设全集大小为 \(m = \lceil 1/\varepsilon \rceil + 1\)。对手构造一个流,前 \(n - 1\) 个元素使得所有 \(m\) 个元素的频率都恰好是 \(\lfloor (n-1)/m \rfloor\)\(\lceil (n-1)/m \rceil\)。最后一个元素的选择决定了哪个元素成为 heavy hitter。

由于算法必须区分 \(m\) 种不同的情况(最后一个元素是哪个),而算法的内部状态在看到最后一个元素之前必须编码足够的信息来区分这些情况,因此状态空间至少为 \(m = \Omega(1/\varepsilon)\)

7.2 随机化空间下界

即使允许随机化,空间下界仍然是 \(\Omega(1/\varepsilon)\)。Count-Min Sketch 的 \(O(\frac{1}{\varepsilon} \log \frac{1}{\delta})\) 中,\(\log(1/\delta)\) 因子是为了保证所有查询元素同时有好的估计——这是更强的保证。

7.3 通信复杂度视角

频率估计等价于 \(\ell_\infty\) 范数估计,其单向通信复杂度下界为 \(\Omega(1/\varepsilon^2)\) bits。但 heavy hitter 只要求找出大的坐标,所以空间下界是较低的 \(\Omega(1/\varepsilon)\)

7.4 总结

Space-Saving 和 Misra-Gries 使用 \(O(1/\varepsilon)\) 个计数器,匹配了理论下界。它们是空间最优的。

八、实际应用场景

8.1 网络流量分析

在网络监控中,heavy hitter 检测是最基本的需求之一。典型场景:

网络设备的约束非常严格:内存小(几十 KB 的 SRAM)、速度快(100Gbps 线速意味着每个包只有几纳秒的处理时间)。Space-Saving 的 \(O(1)\) 时间和 \(O(k)\) 空间正好满足这些需求。

/* 网络流量监控的简化示例 */
typedef struct {
    uint32_t src_ip;
    uint32_t dst_ip;
    uint16_t src_port;
    uint16_t dst_port;
    uint8_t  protocol;
} flow_key_t;

static uint64_t flow_to_key(const flow_key_t *flow)
{
    /* 简单哈希,实际应用中使用更好的哈希函数 */
    uint64_t h = 14695981039346656037ULL;
    h ^= flow->src_ip;  h *= 1099511628211ULL;
    h ^= flow->dst_ip;  h *= 1099511628211ULL;
    h ^= flow->src_port; h *= 1099511628211ULL;
    h ^= flow->dst_port; h *= 1099511628211ULL;
    h ^= flow->protocol; h *= 1099511628211ULL;
    return h;
}

void monitor_packet(space_saving_t *ss, const flow_key_t *flow)
{
    uint64_t key = flow_to_key(flow);
    ss_process(ss, key);
}

void report_elephants(const space_saving_t *ss, double threshold)
{
    ss_result_t results[100];
    int n = ss_topk(ss, results, 100);
    for (int i = 0; i < n; i++) {
        double fraction = (double)results[i].count / ss->n;
        if (fraction >= threshold) {
            printf("Elephant flow: key=%016llx  "
                   "fraction=%.4f  overestimate=%lld\n",
                   (unsigned long long)results[i].key,
                   fraction,
                   (long long)results[i].overestimate);
        }
    }
}

8.2 搜索热词实时统计

搜索引擎的查询流是典型的 Zipf 分布。Space-Saving 用几百个计数器就能找出 top-100 热词。实践中常用分时段统计:每小时一个实例,小时结束时导出 top-k 结果,以捕捉突发热词。

8.3 推荐系统

对每个用户维护一个小的 Space-Saving 实例(\(k = 20\)),实时追踪兴趣分布。当有 10 亿用户时,每人 20 个计数器远好于完整的类别计数表。

8.4 数据库查询优化

数据库用 Space-Saving 维护列值频率直方图,用于查询优化器的基数估计。PostgreSQL 的 pg_stats.most_common_vals 就是这种思路。

九、工程实践要点

9.1 常见陷阱

陷阱 说明 解决方案
哈希表负载因子过高 开放寻址哈希表在负载因子接近 1 时性能急剧下降 保持负载因子 ≤ 0.75,哈希表大小至少为计数器数量的 2 倍
整数溢出 当流非常长时,计数值可能溢出 32 位整数 使用 64 位计数器;在极端情况下使用 128 位或定期重置
内存分配 动态创建/删除桶节点导致内存碎片和 malloc 开销 预分配桶对象池;桶的最大数量不超过计数器数量
多线程竞争 多核处理器上的并行更新导致数据竞争 分片(per-core instance)+ 定期合并;或使用无锁数据结构
冷启动偏差 流的初始阶段,计数器未被充分利用 \(10k\) 个元素使用精确计数,之后切换到 Space-Saving
k 值选择 k 太小遗漏真正的 heavy hitter,太大浪费内存 \(k = c/\varepsilon\),其中 \(c\) 是安全系数(通常 2-4)
字符串键的处理 直接存储字符串键消耗大量内存 对字符串键先哈希为 64 位整数,接受极小的碰撞概率
桶链表退化 如果计数值范围很大,桶链表可能很长 实际中桶的数量不超过计数器数量(每次操作最多创建一个桶)

9.2 参数选择指南

k 的选择:如果要找频率超过 \(\varepsilon\) 的元素,设 \(k = \lceil 1/\varepsilon \rceil\)。如果额外需要 top-\(t\) 查询,设 \(k = \max(t, \lceil 1/\varepsilon \rceil)\)

经验法则:如果不确定 \(\varepsilon\) 该设多少,从 \(k = 1000\) 开始。这对应 \(\varepsilon = 0.001\),即找出频率超过 0.1% 的元素。在大多数实际场景中,这已经足够了。每个计数器大约占 24 字节(8 字节 key + 8 字节 count + 8 字节指针),1000 个计数器只需 24 KB。

9.3 分布式环境下的合并

Misra-Gries 的摘要可以合并:将两个摘要的计数器合并后,再执行一轮”消消乐”。但 Space-Saving 的 Stream-Summary 不容易直接合并,因为过估计量的合并语义不明确。

分布式环境下的推荐做法:

  1. 每个节点运行独立的 Space-Saving 实例。
  2. 定期将 top-k 结果汇报给中心节点。
  3. 中心节点使用精确计数或更大的 Space-Saving 实例来聚合。

或者使用 Count-Min Sketch(天然支持合并)来回答全局查询,同时在每个节点上用 Space-Saving 维护本地 top-k。

9.4 与 Count-Min Sketch 的混合架构

实践中最强大的架构是两者结合:

                    ┌─────────────────┐
    数据流 ────────→│  Space-Saving   │───→ Top-k 候选集
                    │  (k 个计数器)    │
                    └─────────────────┘
                              │
                              │ 候选集
                              ▼
                    ┌─────────────────┐
                    │  Count-Min      │───→ 精确频率估计
                    │  Sketch         │
                    └─────────────────┘

Space-Saving 负责维护候选集,Count-Min Sketch 为候选集中的每个元素提供独立的频率估计。两者交叉验证可以大幅减少误报。

十、性能基准测试

10.1 测试框架

#include <time.h>
#include <math.h>

/* 简单的 Zipf 分布生成器 */
typedef struct {
    double *cdf;
    int     n;
    uint64_t state;  /* xorshift64 状态 */
} zipf_gen_t;

static uint64_t xorshift64(uint64_t *state)
{
    uint64_t x = *state;
    x ^= x << 13;
    x ^= x >> 7;
    x ^= x << 17;
    *state = x;
    return x;
}

static double uniform01(uint64_t *state)
{
    return (double)xorshift64(state) / (double)UINT64_MAX;
}

void zipf_init(zipf_gen_t *gen, int n, double alpha, uint64_t seed)
{
    gen->n = n;
    gen->state = seed;
    gen->cdf = (double *)malloc(n * sizeof(double));

    double sum = 0.0;
    for (int i = 0; i < n; i++)
        sum += 1.0 / pow((double)(i + 1), alpha);

    double cumul = 0.0;
    for (int i = 0; i < n; i++) {
        cumul += 1.0 / pow((double)(i + 1), alpha) / sum;
        gen->cdf[i] = cumul;
    }
}

uint64_t zipf_next(zipf_gen_t *gen)
{
    double u = uniform01(&gen->state);
    /* 二分查找 */
    int lo = 0, hi = gen->n - 1;
    while (lo < hi) {
        int mid = lo + (hi - lo) / 2;
        if (gen->cdf[mid] < u) lo = mid + 1;
        else                   hi = mid;
    }
    return (uint64_t)lo;
}

void zipf_destroy(zipf_gen_t *gen)
{
    free(gen->cdf);
}

/* 基准测试 */
void benchmark(void)
{
    const int STREAM_LEN = 10000000;  /* 1000 万 */
    const int UNIVERSE   = 100000;    /* 10 万不同元素 */
    const int K          = 100;       /* 追踪 top-100 */
    const double ALPHA   = 1.2;       /* Zipf 参数 */

    zipf_gen_t gen;
    zipf_init(&gen, UNIVERSE, ALPHA, 42);

    /* 生成数据流 */
    uint64_t *stream = (uint64_t *)malloc(STREAM_LEN * sizeof(uint64_t));
    for (int i = 0; i < STREAM_LEN; i++)
        stream[i] = zipf_next(&gen);

    /* 计算真实频率 */
    int64_t *true_freq = (int64_t *)calloc(UNIVERSE, sizeof(int64_t));
    for (int i = 0; i < STREAM_LEN; i++)
        true_freq[stream[i]]++;

    /* ── Space-Saving 测试 ── */
    space_saving_t ss;
    ss_init(&ss, K);

    struct timespec t0, t1;
    clock_gettime(CLOCK_MONOTONIC, &t0);
    for (int i = 0; i < STREAM_LEN; i++)
        ss_process(&ss, stream[i]);
    clock_gettime(CLOCK_MONOTONIC, &t1);

    double elapsed_ss = (t1.tv_sec - t0.tv_sec)
                      + (t1.tv_nsec - t0.tv_nsec) * 1e-9;

    printf("Space-Saving: %.3f sec, %.1f M ops/sec\n",
           elapsed_ss, STREAM_LEN / elapsed_ss / 1e6);

    /* 检查误差 */
    ss_result_t results[K];
    int num = ss_topk(&ss, results, K);
    double max_error = 0;
    for (int i = 0; i < num; i++) {
        int64_t true_f = true_freq[results[i].key];
        double err = (double)(results[i].count - true_f) / STREAM_LEN;
        if (err > max_error) max_error = err;
    }
    printf("  最大相对误差: %.6f (理论上界: %.6f)\n",
           max_error, 1.0 / K);

    /* ── Misra-Gries 测试 ── */
    misra_gries_t mg;
    mg_init(&mg, K);

    clock_gettime(CLOCK_MONOTONIC, &t0);
    for (int i = 0; i < STREAM_LEN; i++)
        mg_process(&mg, stream[i]);
    clock_gettime(CLOCK_MONOTONIC, &t1);

    double elapsed_mg = (t1.tv_sec - t0.tv_sec)
                      + (t1.tv_nsec - t0.tv_nsec) * 1e-9;

    printf("Misra-Gries:  %.3f sec, %.1f M ops/sec\n",
           elapsed_mg, STREAM_LEN / elapsed_mg / 1e6);

    ss_destroy(&ss);
    free(stream);
    free(true_freq);
    zipf_destroy(&gen);
}

10.2 典型结果分析

在 Zipf(α=1.2) 分布、10M 流长度、\(k=100\) 的设定下:

算法 吞吐量(Mops/s) 内存(KB) 最大相对误差
Space-Saving 8-15 3.2 < 0.01
Misra-Gries(线性扫描) 2-5 2.4 < 0.01
Lossy Counting 3-8 5-50 < 0.01
Count-Min Sketch(4行) 15-25 16 < 0.005

Space-Saving 内存效率最优;Count-Min Sketch 吞吐更高(只需数组访问,无指针追踪)。

十一、进阶话题

11.1 Filtered Space-Saving

一个重要的优化是 Filtered Space-Saving(Homem 2010):用一个小的 Count-Min Sketch 作为”门卫”,只有当 CMS 估计的频率超过某个阈值时,才将元素加入 Space-Saving。

这样做的好处是减少 Space-Saving 中的替换次数,因为低频元素被 CMS 过滤掉了。实验表明,Filtered Space-Saving 的准确率比原始 Space-Saving 高 2-10 倍。

/* Filtered Space-Saving 的核心逻辑伪代码 */
void filtered_ss_process(space_saving_t *ss,
                         cms_t *cms,
                         uint64_t key)
{
    /* 先更新 CMS */
    cms_increment(cms, key);

    /* 已在 Space-Saving 中,直接递增 */
    if (ss_ht_get(ss, key)) {
        ss_process(ss, key);
        return;
    }

    /* 不在 Space-Saving 中,检查 CMS 估计值 */
    int64_t cms_est = cms_query(cms, key);
    int64_t min_count = ss->bucket_head ? ss->bucket_head->count : 0;

    /* 只有 CMS 估计值超过当前最小计数时才替换 */
    if (cms_est > min_count) {
        ss_process(ss, key);
    }
}

11.2 滑动窗口上的频率估计

很多应用关心的是”最近 \(W\) 个元素中的 heavy hitter”,而不是整个流的历史。这需要滑动窗口变体。

最简单的方法是指数衰减:每隔 \(T\) 个时间步,将所有计数器乘以一个衰减因子 \(\gamma \in (0,1)\)。这近似于指数加权滑动窗口,但会引入额外的近似误差。

更精确的方法是使用 DGIM(Datar-Gionis-Indyk-Motwani)技术来维护滑动窗口上的频率计数,但空间复杂度会增加一个 \(O(\log W)\) 因子。

11.3 Mergeable Summaries

Agarwal 等人在 2012 年提出了 Mergeable Summaries,空间 \(O(\frac{1}{\varepsilon} \log \frac{1}{\varepsilon})\),合并时间 \(O(\frac{1}{\varepsilon} \log^2 \frac{1}{\varepsilon})\)。这对 MapReduce 和 Apache Flink 等分布式流处理框架至关重要。

11.4 频率矩估计

Heavy hitter 的推广是频率矩 \(F_p = \sum_e f_e^p\)\(F_0\) 对应 HyperLogLog 的问题,\(F_2\) 用于检测数据倾斜。AMS sketch 可以在 \(O(1/\varepsilon^2)\) 空间内估计 \(F_2\)

十二、个人思考

12.1 确定性的魅力

我个人非常喜欢 Misra-Gries 和 Space-Saving 这类确定性算法。与 Count-Min Sketch 相比,它们不需要哈希函数的独立性假设,不依赖随机性,误差界是最坏情况而非概率性的。在金融风控、安全监控等”不能出错”的场景中,确定性保证比概率性保证更让人放心。

当然,“确定性”也意味着对手可以构造最坏情况输入。但在实际应用中,输入通常来自自然数据分布,而非恶意对手。即使面对恶意输入,\(n/k\) 的误差界也是确定性成立的。

12.2 算法的”年龄”

Misra-Gries 发表于 1982 年,至今已有 40 多年。Space-Saving 发表于 2005 年,也已近 20 年。但它们仍然是实践中的首选算法。这说明好的算法设计是永恒的——当你已经达到了理论最优,就没有什么可以改进的了。

我经常看到新论文声称”改进了 Space-Saving”,但仔细一看,通常是在不同的假设下(比如分布式、滑动窗口、更强的查询模型)。在基本的单机、追加流、\(\varepsilon\)-heavy hitter 设定下,Space-Saving 就是答案。

12.3 从理论到实践的距离

理论上 Space-Saving 的 Stream-Summary 保证 \(O(1)\) 操作。但实际中,指针追踪(pointer chasing)对现代 CPU 的缓存非常不友好。一个简化的实现——用数组代替链表,用线性扫描代替桶索引——在 \(k\) 不大时(< 1000)反而更快,因为数组的缓存局部性好得多。

这是算法工程中反复出现的主题:渐近最优不等于实际最快。\(O(1)\) 常数大且缓存不友好,不如 \(O(k)\) 常数小且缓存友好。当 \(k = 100\) 时,线性扫描一个连续数组只需要几次缓存行加载,而指针追踪可能每次都 cache miss。

12.4 关于”消消乐”的哲学

Misra-Gries 的消消乐有一种”公平毁灭”的哲学:当没有空间时,不是做聪明的替换决策,而是让所有人都付出代价。这种简单的淘汰规则自然地让高频元素浮现出来——有时候,最简单的规则就是最好的。

12.5 写给工程师的建议

如果你正在实现一个 heavy hitter 或 top-k 系统,我的建议是:

  1. 首选 Space-Saving。它理论最优,实现合理,功能全面。
  2. k 设为目标的 3-5 倍。如果你想要 top-10,设 \(k = 50\)。多出来的计数器是安全裕量。
  3. 不要过度工程化数据结构。如果 \(k < 1000\),用简单数组 + 线性扫描。只有当 \(k > 10000\) 时才值得实现完整的 Stream-Summary。
  4. 定期导出结果。流式算法的美在于它随时可以回答查询。每隔几分钟把 top-k 导出到监控仪表盘。
  5. 考虑与 CMS 结合。如果你还需要查询不在 top-k 中的元素的频率,加一个 Count-Min Sketch。

参考文献

  1. J. Misra and D. Gries, “Finding repeated elements,” Science of Computer Programming, vol. 2, no. 2, pp. 143-152, 1982.
  2. A. Metwally, D. Agrawal, and A. El Abbadi, “Efficient computation of frequent and top-k elements in data streams,” Proc. ICDT, 2005.
  3. G. S. Manku and R. Motwani, “Approximate frequency counts over data streams,” Proc. VLDB, 2002.
  4. G. Cormode and S. Muthukrishnan, “An improved data stream summary: the count-min sketch and its applications,” Journal of Algorithms, vol. 55, no. 1, 2005.
  5. N. Alon, Y. Matias, and M. Szegedy, “The space complexity of approximating the frequency moments,” JCSS, vol. 58, no. 1, 1999.
  6. R. S. Boyer and J. S. Moore, “MJRTY — A fast majority vote algorithm,” Automated Reasoning, 1991.
  7. N. Homem and J. P. Carvalho, “Finding top-k elements in data streams,” Information Sciences, vol. 180, 2010.
  8. P. K. Agarwal et al., “Mergeable summaries,” ACM TODS, vol. 38, no. 4, 2013.
  9. G. Cormode and M. Hadjieleftheriou, “Finding frequent items in data streams,” PVLDB, vol. 1, no. 2, 2008.

算法系列导航上一篇:MinHash 与 SimHash | 下一篇:流式算法总论

相关阅读Count-Min Sketch | HyperLogLog


By .