土法炼钢兴趣小组的算法知识备份

MinHash 与 SimHash:海量文本相似度检测

目录

在数十亿网页中找出近似重复的内容,逐对比较需要天文数字的计算量。MinHash 和 SimHash 用概率方法将复杂度从 O(n^2) 降到近线性。本文完整推导两种算法的数学原理,给出工业级实现,并分析在搜索引擎、推荐系统等场景中的实际部署经验。

一、问题背景:为什么需要大规模相似度检测

1.1 现实场景

互联网每天产生数以亿计的文本内容。以下场景都需要高效识别”相似”文档:

1.2 朴素方法的困境

N = 10^9(十亿)篇网页,逐对比较需要 C(N, 2) ≈ 5 × 10^17 次比较。即使每次 1 微秒,也需约 16 年。我们需要将每篇文档压缩成紧凑的”签名”,然后只比较签名相似的文档对,使总体复杂度接近 O(N)。

1.3 两种相似度度量

在讨论算法之前,需要明确”相似”的定义。文本相似度有多种度量方式,MinHash 和 SimHash 分别对应两种不同的度量:

度量 数学定义 适用场景 对应算法
Jaccard 相似度 J(A,B) = |A∩B| / |A∪B| 集合比较、token 级去重 MinHash
Cosine 相似度 cos(A,B) = A·B / (||A|| × ||B||) 向量空间、TF-IDF 权重 SimHash

两者的选择取决于数据表示方式和业务需求,后文会详细对比。

二、文本的集合表示:Shingling

在应用 MinHash 之前,首先需要将文本转换为集合。标准做法是 k-Shingling(k-gram)。

2.1 定义

给定文本字符串 s 和参数 k,其 k-Shingle 集合是 s 中所有长度为 k 的连续子串组成的集合。

例如,文本 “abcdab” 的 2-Shingle 集合为:

{ab, bc, cd, da, ab} → {ab, bc, cd, da}  (集合自动去重)

2.2 k 值的选择

文本类型 推荐 k 值 说明
短文本(推文、标题) 3~5 区分度要求低
中等文本(新闻、博客) 5~9 通用场景
长文本(网页、论文) 8~12 需要较高区分度

2.3 字符级 vs 词级

除了字符级 shingle,还可以使用词级(word-level)shingle:

文本:"the quick brown fox jumps"
词级 2-shingle:{"the quick", "quick brown", "brown fox", "fox jumps"}

词级 shingle 语义更丰富,但词表更大。实践中常对 shingle 取哈希值,映射到固定范围的整数集合,减少存储开销。

import hashlib

def shingle_hash(text, k=5):
    """将文本转为 k-shingle 的哈希集合"""
    shingles = set()
    for i in range(len(text) - k + 1):
        token = text[i:i+k]
        h = int(hashlib.md5(token.encode('utf-8')).hexdigest(), 16) % (2**32)
        shingles.add(h)
    return shingles

三、Jaccard 相似度

3.1 定义与性质

给定两个集合 A 和 B,Jaccard 相似度定义为:

J(A, B) = |A ∩ B| / |A ∪ B|

性质: - 0 <= J(A, B) <= 1 - J(A, B) = 1 当且仅当 A = B - J(A, B) = 0 当且仅当 A ∩ B = ∅ - 对称性:J(A, B) = J(B, A) - 满足距离公理:d(A, B) = 1 - J(A, B) 是合法的度量(满足三角不等式)

3.2 直接计算的代价

两个大小为 m 的集合,计算 Jaccard 需要 O(m) 时间。N 个文档,每个 m = 1000 个 shingle:所有文档对的比较 O(N^2 × m)。N = 10^6 时约 10^15 次操作——仍然不可接受。

3.3 从精确到近似

关键洞察:我们不需要精确的 Jaccard 值,只需要找出 J(A, B) > t 的文档对(t 是预设阈值,如 0.8)。这正是概率算法大显身手的舞台。

四、MinHash 算法

4.1 核心思想

MinHash 的核心定理极其优雅:

对全集 U 上的一个随机排列(permutation)π,定义 h_π(S) = min{π(x) : x ∈ S},则:

Pr[h_π(A) = h_π(B)] = J(A, B)

即,两个集合在随机排列下的最小哈希值相等的概率,恰好等于它们的 Jaccard 相似度。

4.2 证明

设全集 U 中有一个随机排列 π。考虑 A ∪ B 中的元素在 π 下的排序。

设 A ∪ B = {e_1, e_2, …, e_n},其中 π(e_1) < π(e_2) < … < π(e_n)。

h_π(A) = h_π(B) 成立,当且仅当 A ∪ B 中 π 值最小的元素同时属于 A 和 B,即属于 A ∩ B。

因为 π 是随机排列,A ∪ B 中每个元素成为最小值的概率相等,都是 1/|A ∪ B|。

因此:

Pr[h_π(A) = h_π(B)] = |A ∩ B| / |A ∪ B| = J(A, B)

证毕。这个证明的简洁性令人赞叹——它将一个组合问题归结为均匀分布下的计数。

4.3 从排列到哈希函数

实践中不可能真正对全集做随机排列(全集可能有 2^32 个元素)。替代方案是使用哈希函数模拟排列。

选择一个哈希函数 h: U → [0, 2^64),对集合 S 计算:

MinHash_h(S) = min{h(x) : x ∈ S}

只要 h 足够”随机”(接近独立均匀分布),上述概率等式近似成立。

4.4 签名矩阵

单个哈希函数的估计方差太大。用 k 个独立哈希函数 h_1, h_2, …, h_k 得到 k 维签名向量:

Sig(S) = [MinHash_{h_1}(S), MinHash_{h_2}(S), ..., MinHash_{h_k}(S)]

Jaccard 估计值为签名中相等分量的比例:J_hat(A, B) = |{i : Sig_i(A) = Sig_i(B)}| / k。

由 Hoeffding 不等式:Pr[|J_hat - J| > ε] <= 2 × exp(-2kε^2)。要使误差 ≤ 0.05 且置信度 99%,需要 k ≈ 1060。实践中 k = 100~200 通常足够。

4.5 哈希函数的构造

常用线性哈希函数族:h_{a,b}(x) = (a * x + b) mod p,其中 p 是大素数,a ∈ [1, p-1],b ∈ [0, p-1] 随机选取。这是 2-独立哈希函数族,对 MinHash 已经足够。

import random

class MinHashFamily:
    """MinHash 签名生成器"""

    def __init__(self, num_hashes, prime=2**31 - 1):
        self.num_hashes = num_hashes
        self.prime = prime
        self.a = [random.randint(1, prime - 1) for _ in range(num_hashes)]
        self.b = [random.randint(0, prime - 1) for _ in range(num_hashes)]

    def hash_func(self, i, x):
        return (self.a[i] * x + self.b[i]) % self.prime

    def signature(self, shingle_set):
        """计算集合的 MinHash 签名"""
        sig = [float('inf')] * self.num_hashes
        for x in shingle_set:
            for i in range(self.num_hashes):
                val = self.hash_func(i, x)
                if val < sig[i]:
                    sig[i] = val
        return sig

    def jaccard_estimate(self, sig_a, sig_b):
        """根据签名估计 Jaccard 相似度"""
        equal = sum(1 for a, b in zip(sig_a, sig_b) if a == b)
        return equal / self.num_hashes

五、LSH:局部敏感哈希加速

5.1 签名比较的瓶颈

有了签名矩阵,每篇文档被压缩成 k 维签名。但仍需比较所有 O(N^2) 个文档对。LSH 解决的正是这个问题:只比较同桶内的文档。

5.2 Banding 技术

将 k 维签名分成 b 个 band,每个 band 包含 r 行(k = b × r)。

对每个 band: 1. 将该 band 对应的 r 个签名值拼接,计算哈希 2. 哈希到一个桶中 3. 如果两个文档在某个 band 中落入同一个桶,则它们成为候选对

LSH 桶分配与相似度检测

5.3 概率分析

两个 Jaccard 相似度为 s 的文档:

P(s) = 1 - (1 - s^r)^b

5.4 S-曲线与阈值

函数 P(s) = 1 - (1 - sr)b 形成一条 S 形曲线:

阈值的近似公式:

t ≈ (1/b)^(1/r)

例如 b = 20, r = 5 → t ≈ (1/20)^(1/5) ≈ 0.55。

5.5 参数选择

给定签名长度 k = 100,不同 b、r 组合的效果:

b r 阈值 t 特点
50 2 0.14 极低阈值,高召回
20 5 0.55 中低阈值
10 10 0.69 中等阈值
5 20 0.83 高阈值,高精确

通常根据业务需求先确定目标阈值,再反推 b 和 r。

5.6 误差控制

错误类型 含义 控制方法
假阳性 不相似的文档成为候选对 增大 r,或候选对上精确验证
假阴性 相似的文档未成为候选对 增大 b,或使用多轮 LSH

实际系统中,LSH 筛选出候选对后,通常再做一次精确 Jaccard 计算消除假阳性。

class LSH:
    """基于 Banding 的 LSH 索引"""

    def __init__(self, num_bands, rows_per_band):
        self.b = num_bands
        self.r = rows_per_band
        self.buckets = [{} for _ in range(num_bands)]

    def _band_hash(self, band_values):
        """将一个 band 的值映射到桶 ID"""
        return hash(tuple(band_values))

    def index(self, doc_id, signature):
        """将文档签名加入索引"""
        for band_idx in range(self.b):
            start = band_idx * self.r
            end = start + self.r
            band = signature[start:end]
            bucket_id = self._band_hash(band)
            if bucket_id not in self.buckets[band_idx]:
                self.buckets[band_idx][bucket_id] = set()
            self.buckets[band_idx][bucket_id].add(doc_id)

    def query(self, signature):
        """查询与给定签名相似的候选文档"""
        candidates = set()
        for band_idx in range(self.b):
            start = band_idx * self.r
            end = start + self.r
            band = signature[start:end]
            bucket_id = self._band_hash(band)
            if bucket_id in self.buckets[band_idx]:
                candidates.update(self.buckets[band_idx][bucket_id])
        return candidates

六、SimHash 算法

6.1 背景

SimHash 由 Moses Charikar 于 2002 年提出,后来被 Google 大规模应用于网页去重系统。与 MinHash 不同,SimHash 对应的是 Cosine 相似度而非 Jaccard 相似度。

6.2 随机超平面方法

SimHash 的理论基础是随机超平面的 LSH 方案。

对 d 维空间中的两个向量 u 和 v,随机选择一个超平面(通过原点),则 u 和 v 被分到超平面同侧的概率为:

Pr[sign(r·u) = sign(r·v)] = 1 - θ(u,v)/π

其中 θ(u,v) 是 u 和 v 之间的夹角,r 是随机向量。

这意味着两个向量越相似(夹角越小),被随机超平面分到同侧的概率越高。

6.3 SimHash 的构造

给定文档的特征向量 v ∈ R^d(例如 TF-IDF 向量),SimHash 生成一个 f-bit 指纹:选择 f 个随机超平面 r_1, …, r_f,对第 i 位取 sign(r_i · v)。

关键性质:E[Hamming(SimHash(u), SimHash(v))] = f × θ(u,v) / π。

6.4 高效实现

实践中不需要存储 f 个随机向量,而是利用每个特征的哈希值来模拟随机超平面投影:

  1. 初始化 f 维累加向量 V = [0, …, 0]
  2. 对每个特征 feature(权重 w):计算 f-bit 哈希 h,对每位 j:若第 j 位为 1 则 V[j] += w,否则 V[j] -= w
  3. 最终对 V 的每位,正数映射为 1,负数映射为 0
import hashlib
import struct

def simhash(features, weights=None, f=64):
    """
    计算 SimHash 指纹

    features: 特征列表(字符串)
    weights: 对应权重(默认全为 1)
    f: 指纹位数
    """
    if weights is None:
        weights = [1.0] * len(features)

    v = [0.0] * f
    for feature, weight in zip(features, weights):
        h = _hash_feature(feature, f)
        for j in range(f):
            if h & (1 << j):
                v[j] += weight
            else:
                v[j] -= weight

    fingerprint = 0
    for j in range(f):
        if v[j] > 0:
            fingerprint |= (1 << j)
    return fingerprint


def _hash_feature(feature, f=64):
    """将特征映射到 f-bit 哈希值"""
    digest = hashlib.md5(feature.encode('utf-8')).digest()
    if f <= 64:
        return struct.unpack('<Q', digest[:8])[0] & ((1 << f) - 1)
    else:
        val = int.from_bytes(digest, 'little')
        return val & ((1 << f) - 1)


def hamming_distance(a, b):
    """计算两个整数的 Hamming 距离"""
    x = a ^ b
    count = 0
    while x:
        count += 1
        x &= x - 1   # Brian Kernighan 算法
    return count


def simhash_similarity(a, b, f=64):
    """将 Hamming 距离转换为相似度 [0, 1]"""
    d = hamming_distance(a, b)
    return 1.0 - d / f

6.5 64-bit SimHash 的实际应用

Google 的网页去重系统使用 64-bit SimHash,判定标准为 Hamming(SimHash(A), SimHash(B)) <= 3。

64 位 SimHash 中 Hamming 距离与 Cosine 相似度的对应关系:

Hamming 距离 Cosine 相似度
0 1.000
3 0.989
8 0.924
16 0.707
32 0.000

3 位对应 Cosine 约 0.989,对于网页去重是合理的阈值。

6.6 Hamming 距离的高效检索

给定查询 SimHash,如何在数十亿指纹中快速找到 Hamming 距离 ≤ 3 的文档?

分块索引:将 64-bit 分成 4 块(每块 16 bit)。由鸽巢原理,Hamming ≤ 3 的两个指纹至少有一块完全相同。因此建立 4 份索引,查询时分别精确匹配对应块,对候选集计算实际 Hamming 距离。

class SimHashIndex:
    """基于分块的 SimHash 索引,支持 Hamming 距离 <= k 的查询"""

    def __init__(self, f=64, num_blocks=4):
        self.f = f
        self.num_blocks = num_blocks
        self.block_bits = f // num_blocks
        self.tables = [{} for _ in range(num_blocks)]
        self.fingerprints = {}  # doc_id -> fingerprint

    def _get_block(self, fingerprint, block_idx):
        """提取指纹的第 block_idx 块"""
        shift = block_idx * self.block_bits
        mask = (1 << self.block_bits) - 1
        return (fingerprint >> shift) & mask

    def add(self, doc_id, fingerprint):
        """添加文档指纹"""
        self.fingerprints[doc_id] = fingerprint
        for i in range(self.num_blocks):
            block = self._get_block(fingerprint, i)
            if block not in self.tables[i]:
                self.tables[i][block] = set()
            self.tables[i][block].add(doc_id)

    def query(self, fingerprint, max_distance=3):
        """查询 Hamming 距离不超过 max_distance 的文档"""
        candidates = set()
        for i in range(self.num_blocks):
            block = self._get_block(fingerprint, i)
            if block in self.tables[i]:
                candidates.update(self.tables[i][block])

        results = []
        for doc_id in candidates:
            d = hamming_distance(fingerprint, self.fingerprints[doc_id])
            if d <= max_distance:
                results.append((doc_id, d))
        return sorted(results, key=lambda x: x[1])

七、完整工程实现

7.1 C 语言实现:MinHash

下面给出 MinHash 的完整 C 实现,侧重性能与内存效率。

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <limits.h>

#define MAX_SHINGLES  100000
#define PRIME         2147483647ULL  /* 2^31 - 1,梅森素数 */

typedef struct {
    uint64_t *a;      /* 哈希系数 a */
    uint64_t *b;      /* 哈希系数 b */
    int       k;      /* 哈希函数个数 */
} MinHasher;

typedef struct {
    uint32_t *values;  /* 签名值数组 */
    int       k;
} Signature;

/* 初始化 MinHash 生成器 */
MinHasher *minhash_create(int num_hashes)
{
    MinHasher *mh = malloc(sizeof(MinHasher));
    mh->k = num_hashes;
    mh->a = malloc(sizeof(uint64_t) * num_hashes);
    mh->b = malloc(sizeof(uint64_t) * num_hashes);

    for (int i = 0; i < num_hashes; i++) {
        mh->a[i] = (uint64_t)rand() % (PRIME - 1) + 1;
        mh->b[i] = (uint64_t)rand() % PRIME;
    }
    return mh;
}

void minhash_free(MinHasher *mh)
{
    free(mh->a);
    free(mh->b);
    free(mh);
}

/* 单个哈希函数计算 */
static inline uint32_t hash_func(uint64_t a, uint64_t b, uint32_t x)
{
    return (uint32_t)((a * (uint64_t)x + b) % PRIME);
}

/* 计算一个集合的 MinHash 签名 */
Signature *minhash_signature(MinHasher *mh,
                             uint32_t *shingles, int num_shingles)
{
    Signature *sig = malloc(sizeof(Signature));
    sig->k = mh->k;
    sig->values = malloc(sizeof(uint32_t) * mh->k);

    for (int i = 0; i < mh->k; i++)
        sig->values[i] = UINT32_MAX;

    for (int s = 0; s < num_shingles; s++) {
        for (int i = 0; i < mh->k; i++) {
            uint32_t val = hash_func(mh->a[i], mh->b[i], shingles[s]);
            if (val < sig->values[i])
                sig->values[i] = val;
        }
    }
    return sig;
}

void signature_free(Signature *sig)
{
    free(sig->values);
    free(sig);
}

/* 根据签名估计 Jaccard 相似度 */
double jaccard_estimate(Signature *a, Signature *b)
{
    if (a->k != b->k) return -1.0;
    int match = 0;
    for (int i = 0; i < a->k; i++) {
        if (a->values[i] == b->values[i])
            match++;
    }
    return (double)match / a->k;
}

/*
 * LSH 索引(简化版,单桶哈希)
 * 生产系统中应使用更高效的哈希表实现
 */
#define MAX_DOCS    1000000
#define MAX_BUCKET  (1 << 20)

typedef struct BucketNode {
    int doc_id;
    struct BucketNode *next;
} BucketNode;

typedef struct {
    int num_bands;
    int rows_per_band;
    BucketNode **buckets;  /* num_bands * MAX_BUCKET */
} LSHIndex;

LSHIndex *lsh_create(int num_bands, int rows_per_band)
{
    LSHIndex *lsh = malloc(sizeof(LSHIndex));
    lsh->num_bands = num_bands;
    lsh->rows_per_band = rows_per_band;

    size_t total = (size_t)num_bands * MAX_BUCKET;
    lsh->buckets = calloc(total, sizeof(BucketNode *));
    return lsh;
}

/* FNV-1a 哈希,将一个 band 的值映射到桶 */
static uint32_t band_hash(uint32_t *values, int r)
{
    uint32_t h = 2166136261u;
    for (int i = 0; i < r; i++) {
        h ^= values[i];
        h *= 16777619u;
    }
    return h % MAX_BUCKET;
}

void lsh_insert(LSHIndex *lsh, int doc_id, Signature *sig)
{
    for (int b = 0; b < lsh->num_bands; b++) {
        int start = b * lsh->rows_per_band;
        uint32_t bucket = band_hash(&sig->values[start],
                                    lsh->rows_per_band);
        size_t idx = (size_t)b * MAX_BUCKET + bucket;

        BucketNode *node = malloc(sizeof(BucketNode));
        node->doc_id = doc_id;
        node->next = lsh->buckets[idx];
        lsh->buckets[idx] = node;
    }
}

/*
 * 查询候选对:返回与给定签名在任何 band 中同桶的文档 ID
 * 调用者负责去重和精确验证
 */
int lsh_query(LSHIndex *lsh, Signature *sig,
              int *result_buf, int max_results)
{
    int count = 0;
    for (int b = 0; b < lsh->num_bands; b++) {
        int start = b * lsh->rows_per_band;
        uint32_t bucket = band_hash(&sig->values[start],
                                    lsh->rows_per_band);
        size_t idx = (size_t)b * MAX_BUCKET + bucket;

        BucketNode *node = lsh->buckets[idx];
        while (node && count < max_results) {
            result_buf[count++] = node->doc_id;
            node = node->next;
        }
    }
    return count;
}

void lsh_free(LSHIndex *lsh)
{
    size_t total = (size_t)lsh->num_bands * MAX_BUCKET;
    for (size_t i = 0; i < total; i++) {
        BucketNode *node = lsh->buckets[i];
        while (node) {
            BucketNode *tmp = node;
            node = node->next;
            free(tmp);
        }
    }
    free(lsh->buckets);
    free(lsh);
}

/* 演示 */
int main(void)
{
    srand(42);
    uint32_t set_a[] = {1, 2, 3, 4, 5, 6, 7, 8};
    uint32_t set_b[] = {1, 2, 3, 4, 5, 9, 10, 11};
    uint32_t set_c[] = {20, 21, 22, 23, 24, 25, 26, 27};

    MinHasher *mh = minhash_create(100);
    Signature *sig_a = minhash_signature(mh, set_a, 8);
    Signature *sig_b = minhash_signature(mh, set_b, 8);
    Signature *sig_c = minhash_signature(mh, set_c, 8);

    printf("J_hat(A,B) = %.3f  (真实: 5/11 ≈ 0.455)\n",
           jaccard_estimate(sig_a, sig_b));
    printf("J_hat(A,C) = %.3f  (真实: 0.000)\n",
           jaccard_estimate(sig_a, sig_c));

    LSHIndex *lsh = lsh_create(20, 5);
    lsh_insert(lsh, 0, sig_a);
    lsh_insert(lsh, 1, sig_b);
    lsh_insert(lsh, 2, sig_c);

    int results[100];
    int n = lsh_query(lsh, sig_a, results, 100);
    printf("LSH 候选对:");
    for (int i = 0; i < n; i++) printf(" %d", results[i]);
    printf("\n");

    signature_free(sig_a); signature_free(sig_b); signature_free(sig_c);
    lsh_free(lsh); minhash_free(mh);
    return 0;
}

7.2 C 语言实现:SimHash

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>

/* FNV-1a 64-bit 哈希 */
static uint64_t fnv1a_64(const char *data, size_t len)
{
    uint64_t h = 14695981039346656037ULL;
    for (size_t i = 0; i < len; i++) {
        h ^= (uint8_t)data[i];
        h *= 1099511628211ULL;
    }
    return h;
}

/* 计算 64-bit SimHash */
uint64_t simhash_compute(const char **features, const double *weights,
                         int n)
{
    double v[64];
    memset(v, 0, sizeof(v));

    for (int i = 0; i < n; i++) {
        uint64_t h = fnv1a_64(features[i], strlen(features[i]));
        double w = weights ? weights[i] : 1.0;
        for (int j = 0; j < 64; j++) {
            if (h & (1ULL << j))
                v[j] += w;
            else
                v[j] -= w;
        }
    }

    uint64_t fingerprint = 0;
    for (int j = 0; j < 64; j++) {
        if (v[j] > 0)
            fingerprint |= (1ULL << j);
    }
    return fingerprint;
}

/* 利用 popcount 的快速 Hamming 距离 */
static inline int hamming_dist(uint64_t a, uint64_t b)
{
    return __builtin_popcountll(a ^ b);
}

/* 分块索引:64-bit 分成 4 个 16-bit 块 */
#define NUM_BLOCKS   4
#define BLOCK_BITS   16
#define TABLE_SIZE   (1 << BLOCK_BITS)

typedef struct DocEntry {
    int              doc_id;
    uint64_t         fingerprint;
    struct DocEntry *next;
} DocEntry;

typedef struct {
    DocEntry *tables[NUM_BLOCKS][TABLE_SIZE];
} SimHashIndex;

static inline uint16_t get_block(uint64_t fp, int block)
{
    return (uint16_t)((fp >> (block * BLOCK_BITS)) & 0xFFFF);
}

void simhash_index_add(SimHashIndex *idx, int doc_id, uint64_t fp)
{
    for (int b = 0; b < NUM_BLOCKS; b++) {
        uint16_t key = get_block(fp, b);
        DocEntry *entry = malloc(sizeof(DocEntry));
        entry->doc_id = doc_id;
        entry->fingerprint = fp;
        entry->next = idx->tables[b][key];
        idx->tables[b][key] = entry;
    }
}

int simhash_index_query(SimHashIndex *idx, uint64_t fp,
                        int max_dist, int *results, int max_results)
{
    int count = 0;
    for (int b = 0; b < NUM_BLOCKS; b++) {
        uint16_t key = get_block(fp, b);
        for (DocEntry *e = idx->tables[b][key]; e && count < max_results; e = e->next) {
            if (hamming_dist(fp, e->fingerprint) <= max_dist)
                results[count++] = e->doc_id;
        }
    }
    return count;
}

int main(void)
{
    const char *doc_a[] = {"hello", "world", "foo", "bar", "test"};
    const char *doc_b[] = {"hello", "world", "foo", "baz", "test"};
    const char *doc_c[] = {"alpha", "beta", "gamma", "delta", "epsilon"};

    uint64_t fp_a = simhash_compute(doc_a, NULL, 5);
    uint64_t fp_b = simhash_compute(doc_b, NULL, 5);
    uint64_t fp_c = simhash_compute(doc_c, NULL, 5);

    printf("Hamming(A,B) = %d\n", hamming_dist(fp_a, fp_b));
    printf("Hamming(A,C) = %d\n", hamming_dist(fp_a, fp_c));

    SimHashIndex idx = {{{0}}};
    simhash_index_add(&idx, 0, fp_a);
    simhash_index_add(&idx, 1, fp_b);
    simhash_index_add(&idx, 2, fp_c);

    int results[100];
    int n = simhash_index_query(&idx, fp_a, 3, results, 100);
    printf("SimHash 近似文档(Hamming<=3):");
    for (int i = 0; i < n; i++)
        printf(" doc_%d", results[i]);
    printf("\n");
    return 0;
}

7.3 Python 完整流水线

"""
MinHash + LSH + SimHash 完整流水线
用于文本近似重复检测
"""

import hashlib
import random
import struct
from collections import defaultdict


# ==================== Shingling ====================

def text_to_shingles(text, k=5):
    """将文本转为字符级 k-shingle 哈希集合"""
    shingles = set()
    text = text.lower().strip()
    for i in range(len(text) - k + 1):
        token = text[i:i + k]
        h = struct.unpack('<I',
            hashlib.md5(token.encode('utf-8')).digest()[:4])[0]
        shingles.add(h)
    return shingles


# ==================== MinHash ====================

class MinHash:
    """MinHash 签名生成器"""

    def __init__(self, num_perm=128, seed=42):
        self.num_perm = num_perm
        self.prime = (1 << 31) - 1
        rng = random.Random(seed)
        self.a = [rng.randint(1, self.prime - 1) for _ in range(num_perm)]
        self.b = [rng.randint(0, self.prime - 1) for _ in range(num_perm)]

    def signature(self, shingle_set):
        """计算集合的 MinHash 签名"""
        sig = [float('inf')] * self.num_perm
        for elem in shingle_set:
            for i in range(self.num_perm):
                val = (self.a[i] * elem + self.b[i]) % self.prime
                if val < sig[i]:
                    sig[i] = val
        return sig

    @staticmethod
    def jaccard_from_sigs(sig_a, sig_b):
        """根据签名估计 Jaccard 相似度"""
        matches = sum(1 for a, b in zip(sig_a, sig_b) if a == b)
        return matches / len(sig_a)


# ==================== LSH ====================

class LSHIndex:
    """基于 Banding 的 LSH 索引"""

    def __init__(self, num_bands, rows_per_band):
        self.num_bands = num_bands
        self.rows_per_band = rows_per_band
        self.hash_tables = [defaultdict(set) for _ in range(num_bands)]

    def insert(self, doc_id, signature):
        """将文档签名加入索引"""
        for band_idx in range(self.num_bands):
            start = band_idx * self.rows_per_band
            end = start + self.rows_per_band
            band = tuple(signature[start:end])
            self.hash_tables[band_idx][band].add(doc_id)

    def query(self, signature):
        """查询候选相似文档"""
        candidates = set()
        for band_idx in range(self.num_bands):
            start = band_idx * self.rows_per_band
            end = start + self.rows_per_band
            band = tuple(signature[start:end])
            candidates.update(self.hash_tables[band_idx][band])
        return candidates


# ==================== SimHash ====================

class SimHash:
    """64-bit SimHash 指纹生成器"""

    def __init__(self, f=64):
        self.f = f

    def fingerprint(self, features, weights=None):
        """计算 SimHash 指纹"""
        if weights is None:
            weights = [1.0] * len(features)

        v = [0.0] * self.f
        for feat, w in zip(features, weights):
            h = self._hash_feature(feat)
            for j in range(self.f):
                if h & (1 << j):
                    v[j] += w
                else:
                    v[j] -= w

        result = 0
        for j in range(self.f):
            if v[j] > 0:
                result |= (1 << j)
        return result

    def _hash_feature(self, feature):
        digest = hashlib.md5(feature.encode('utf-8')).digest()
        val = struct.unpack('<Q', digest[:8])[0]
        return val & ((1 << self.f) - 1)

    @staticmethod
    def hamming(a, b):
        x = a ^ b
        count = 0
        while x:
            count += 1
            x &= x - 1
        return count

    def similarity(self, a, b):
        return 1.0 - self.hamming(a, b) / self.f


# ==================== 完整流水线 ====================

def dedup_pipeline(documents, threshold=0.5,
                   num_perm=128, num_bands=16, rows_per_band=8):
    """
    文本去重完整流水线

    documents: dict of {doc_id: text}
    threshold: Jaccard 相似度阈值
    返回: 近似重复的文档对列表
    """
    mh = MinHash(num_perm=num_perm)
    lsh = LSHIndex(num_bands=num_bands, rows_per_band=rows_per_band)

    # 第一步:计算签名并索引
    signatures = {}
    shingle_sets = {}
    for doc_id, text in documents.items():
        shingles = text_to_shingles(text)
        shingle_sets[doc_id] = shingles
        sig = mh.signature(shingles)
        signatures[doc_id] = sig
        lsh.insert(doc_id, sig)

    # 第二步:查找候选对
    candidate_pairs = set()
    for doc_id, sig in signatures.items():
        candidates = lsh.query(sig)
        for cand_id in candidates:
            if cand_id != doc_id:
                pair = tuple(sorted([doc_id, cand_id]))
                candidate_pairs.add(pair)

    # 第三步:精确验证
    duplicates = []
    for id_a, id_b in candidate_pairs:
        set_a = shingle_sets[id_a]
        set_b = shingle_sets[id_b]
        jaccard = len(set_a & set_b) / len(set_a | set_b) if set_a | set_b else 0
        if jaccard >= threshold:
            duplicates.append((id_a, id_b, jaccard))

    return sorted(duplicates, key=lambda x: -x[2])


# ==================== 演示 ====================

if __name__ == '__main__':
    docs = {
        'doc1': '今天天气真好,适合出去散步和锻炼身体',
        'doc2': '今天天气真好,非常适合出去散步锻炼身体',
        'doc3': '量子计算机可以在某些问题上超越经典计算机',
        'doc4': '深度学习模型需要大量的训练数据和计算资源',
    }

    print('=== MinHash + LSH 去重 ===')
    results = dedup_pipeline(docs, threshold=0.3,
                             num_perm=128, num_bands=16, rows_per_band=8)
    for id_a, id_b, sim in results:
        print(f'  {id_a} <-> {id_b}: Jaccard = {sim:.3f}')

    print('\n=== SimHash Hamming 距离 ===')
    sh = SimHash(f=64)
    fps = {did: sh.fingerprint(list(text)) for did, text in docs.items()}
    ids = list(docs.keys())
    for i in range(len(ids)):
        for j in range(i + 1, len(ids)):
            d = SimHash.hamming(fps[ids[i]], fps[ids[j]])
            print(f'  {ids[i]} <-> {ids[j]}: Hamming = {d}')

八、MinHash vs SimHash 对比

8.1 理论对比

维度 MinHash SimHash
对应相似度 Jaccard(集合) Cosine(向量)
数据表示 无序集合(shingle 集合) 加权特征向量
签名大小 k 个 32-bit 值(通常 k=100~200) 单个 64-bit 值
存储开销 较高(每文档数百字节) 极低(每文档 8 字节)
查询方式 LSH Banding Hamming 距离分块索引
适合场景 精确 token 级去重 语义级近似匹配
权重支持 不直接支持 天然支持特征权重

8.2 精度与召回

MinHash 可以通过增加 k 来提高精度。k = 100 时,Jaccard 估计标准差约 0.05。SimHash 极其紧凑(64-bit),但在低相似度区间(J < 0.5)区分能力较弱。

8.3 选择建议

场景                        推荐算法        理由
─────────────────────────────────────────────────────────
搜索引擎网页去重             SimHash        存储紧凑,亿级规模
新闻聚类/相似文章推荐        MinHash        需要精细的相似度阈值
抄袭检测                    MinHash        token 级匹配更准确
商品去重                    MinHash        特征是离散集合
推荐系统(向量表示)         SimHash        向量空间的自然选择
实时流处理                  SimHash        计算和比较都更快

8.4 混合方案

在某些系统中,两者可以配合使用:

  1. 用 SimHash 做粗粒度过滤(每文档 8 字节,全量比较代价低)
  2. 对 SimHash 粗筛通过的文档对,再用 MinHash 做精细验证

这种两阶段方案兼顾了效率和精度。

九、工程实践要点

9.1 常见陷阱与应对

陷阱 现象 解决方案
哈希冲突 不同 shingle 哈希到相同值 使用 64-bit 哈希,或双哈希
空集合 极短文本生成空 shingle 集 设下限,集合为空时跳过
高频 shingle “的”、“是”等停用词主导 去停用词或使用词级 shingle
内存爆炸 签名矩阵过大 流式计算,不存储原始集合
桶倾斜 某些桶包含过多文档 对大桶截断或二次哈希
字符编码 UTF-8 中文切分位置错误 按字符而非字节切分
数值溢出 哈希计算中间值超出范围 用 64-bit 或 128-bit 中间变量
参数不匹配 b×r != k 导致签名越界 断言 num_bands * rows_per_band == num_perm

9.2 性能优化

MinHash 向量化(NumPy 广播避免 Python 循环):

import numpy as np

def minhash_vectorized(shingles, a_coeffs, b_coeffs, prime):
    """shingles: (n,), a/b: (k,) → 返回 (k,) 签名"""
    s = shingles.astype(np.uint64)
    a = a_coeffs.astype(np.uint64).reshape(-1, 1)
    b = b_coeffs.astype(np.uint64).reshape(-1, 1)
    return ((a * s + b) % prime).min(axis=1)

SimHash 位并行:利用 CPU popcount 指令加速 Hamming 距离。

9.3 分布式部署

在大规模系统中,MinHash/LSH 需要分布式部署。常见方案:

MapReduce:Map 阶段对每个 band emit ((band_idx, band_hash), doc_id),Reduce 阶段输出同桶内的所有文档对,最后验证阶段计算精确 Jaccard。

Redis:用 Set 存储每个桶的文档 ID,查询时取 SUNION。

# Redis 索引/查询伪代码
for band_idx in range(b):
    key = f"lsh:band:{band_idx}:{band_hash}"
    redis.sadd(key, doc_id)       # 索引
    candidates |= redis.smembers(key)  # 查询

十、在搜索引擎与推荐系统中的部署

10.1 Google 的网页去重

Google 在 2007 年的论文《Detecting Near-Duplicates for Web Crawling》中描述了基于 SimHash 的去重系统:

  1. 爬取阶段:对每个网页计算 64-bit SimHash
  2. 索引阶段:将 SimHash 存入分块索引
  3. 去重阶段:对新爬取的网页,查询 Hamming 距离 ≤ 3 的已有网页
  4. 决策:如果找到近似重复,保留 PageRank 更高的版本

系统规模:数十亿网页,每天处理数亿新网页。64-bit SimHash 使得全量指纹可以驻留在内存中——80 亿网页仅需约 64 GB 内存。

10.2 推荐系统与新闻聚类

电商和内容平台需要避免推荐重复内容。典型流水线:离线计算 MinHash 签名并建立 LSH 索引,在线生成候选后用 LSH 检查相似度,贪心去重——从高分到低分遍历,与已选内容相似则跳过。

新闻聚类类似:对每篇新闻计算签名,用 LSH 找候选相似对,构建相似度图后跑连通分量算法,每个分量即为一个”故事”。

10.3 实际部署数据

系统 算法 文档规模 签名维度 阈值 延迟
Google 去重 SimHash 80 亿 64-bit Hamming ≤ 3 < 1ms
某新闻聚类 MinHash 千万级 k=128, b=16, r=8 J > 0.5 < 10ms
某电商去重 MinHash 亿级 k=200, b=20, r=10 J > 0.7 < 5ms

十一、扩展话题

11.1 变体算法

11.2 超越文本

MinHash 的思想不限于文本:基因组比较中的 k-mer 集合(Mash 工具)、图的邻接集合相似度、用户行为集合的重叠检测等。

11.3 与深度学习方法的比较

维度 MinHash/SimHash 深度学习(BERT 等)
速度 极快(微秒级) 较慢(毫秒级)
语义理解 弱(依赖词面匹配) 强(理解同义词、改写)
资源需求 CPU,内存少 GPU,内存大
适用规模 百亿级 亿级(需近似最近邻加速)

工业实践中,常用 MinHash/SimHash 做初筛,再用深度模型做精排。

十二、个人思考

关于概率算法的哲学

MinHash 的证明只有三行,SimHash 的核心也只是一个内积的符号。这两个算法让我深刻体会到:最优雅的算法往往源于对问题本质的深刻理解,而不是复杂的技巧堆砌。

Pr[h(A) = h(B)] = J(A, B) 这个等式,将一个组合优化问题(集合比较)完美地转化为一个概率事件。这种转化的美感在于它不是近似——它是精确的。每一次随机排列都是一个无偏估计器,而 LSH 的 Banding 技术则是对多个估计器的巧妙聚合。

关于工程与理论的鸿沟

理论上,MinHash + LSH 将 O(N^2) 降到了 O(N)。但工程中的挑战往往不在算法本身:数据倾斜(模板页面形成超级桶)、增量更新(不能从头重建索引)、阈值调优(误报/漏报的代价不同)。每一条都需要针对具体业务场景的解决方案。

关于”足够好”的哲学

在精确算法和概率算法之间选择时,工程师需要思考的不是”哪个更好”,而是”什么精度是足够的”。对于搜索引擎去重,5% 的误报率是完全可以接受的——用户几乎不会注意到。但对于抄袭检测,同样的误报率可能导致学术生涯的毁灭。

概率算法的价值在于:它让我们用可控的精度损失,换取了数量级的效率提升。这种权衡贯穿了整个大数据时代的算法设计。

关于技术选型

在 2025 年的今天,BERT 和 Sentence Transformer 的语义理解能力远超 MinHash 和 SimHash。但在百亿规模的去重场景中,你不会用 BERT 做第一层过滤——它太慢了。MinHash/SimHash 解决的不是”理解文本”的问题,而是”在天文数字的候选对中快速缩小范围”的问题。这两个问题的最优解,可能永远不同。

参考文献

  1. A. Z. Broder. “On the Resemblance and Containment of Documents.” SEQUENCES 1997.
  2. A. Z. Broder, M. Charikar, A. M. Frieze, M. Mitzenmacher. “Min-wise Independent Permutations.” STOC 1998.
  3. M. S. Charikar. “Similarity Estimation Techniques from Rounding Algorithms.” STOC 2002.
  4. G. S. Manku, A. Jain, A. Das Sarma. “Detecting Near-Duplicates for Web Crawling.” WWW 2007.
  5. P. Indyk, R. Motwani. “Approximate Nearest Neighbors: Towards Removing the Curse of Dimensionality.” STOC 1998.
  6. J. Leskovec, A. Rajaraman, J. D. Ullman. “Mining of Massive Datasets.” Cambridge University Press, Chapter 3.
  7. P. Li, A. B. Owen, C. H. Zhang. “One Permutation Hashing.” NIPS 2012.
  8. A. Shrivastava, P. Li. “Densifying One Permutation Hashing via Rotation for Fast Near Neighbor Search.” ICML 2014.
  9. S. Ioffe. “Improved Consistent Sampling, Weighted Minhash and L1 Sketching.” ICDM 2010.
  10. P. Li, C. K”onig. “b-Bit Minwise Hashing.” WWW 2011.

算法系列导航上一篇:水塘抽样 | 下一篇:频率估计的理论极限

相关阅读LSH 局部敏感哈希 | HNSW


By .