在数十亿网页中找出近似重复的内容,逐对比较需要天文数字的计算量。MinHash 和 SimHash 用概率方法将复杂度从 O(n^2) 降到近线性。本文完整推导两种算法的数学原理,给出工业级实现,并分析在搜索引擎、推荐系统等场景中的实际部署经验。
一、问题背景:为什么需要大规模相似度检测
1.1 现实场景
互联网每天产生数以亿计的文本内容。以下场景都需要高效识别”相似”文档:
- 搜索引擎去重:互联网上约 30% 的网页是近似重复的(Google 2007)
- 抄袭检测:学术论文、新闻稿件的原创性检验
- 推荐系统:识别内容相似的商品或文章,避免重复推荐
- 广告反作弊:检测相似的广告文案,识别批量作弊行为
1.2 朴素方法的困境
N = 10^9(十亿)篇网页,逐对比较需要 C(N, 2) ≈ 5 × 10^17 次比较。即使每次 1 微秒,也需约 16 年。我们需要将每篇文档压缩成紧凑的”签名”,然后只比较签名相似的文档对,使总体复杂度接近 O(N)。
1.3 两种相似度度量
在讨论算法之前,需要明确”相似”的定义。文本相似度有多种度量方式,MinHash 和 SimHash 分别对应两种不同的度量:
| 度量 | 数学定义 | 适用场景 | 对应算法 |
|---|---|---|---|
| Jaccard 相似度 | J(A,B) = |A∩B| / |A∪B| | 集合比较、token 级去重 | MinHash |
| Cosine 相似度 | cos(A,B) = A·B / (||A|| × ||B||) | 向量空间、TF-IDF 权重 | SimHash |
两者的选择取决于数据表示方式和业务需求,后文会详细对比。
二、文本的集合表示:Shingling
在应用 MinHash 之前,首先需要将文本转换为集合。标准做法是 k-Shingling(k-gram)。
2.1 定义
给定文本字符串 s 和参数 k,其 k-Shingle 集合是 s 中所有长度为 k 的连续子串组成的集合。
例如,文本 “abcdab” 的 2-Shingle 集合为:
{ab, bc, cd, da, ab} → {ab, bc, cd, da} (集合自动去重)
2.2 k 值的选择
| 文本类型 | 推荐 k 值 | 说明 |
|---|---|---|
| 短文本(推文、标题) | 3~5 | 区分度要求低 |
| 中等文本(新闻、博客) | 5~9 | 通用场景 |
| 长文本(网页、论文) | 8~12 | 需要较高区分度 |
2.3 字符级 vs 词级
除了字符级 shingle,还可以使用词级(word-level)shingle:
文本:"the quick brown fox jumps"
词级 2-shingle:{"the quick", "quick brown", "brown fox", "fox jumps"}
词级 shingle 语义更丰富,但词表更大。实践中常对 shingle 取哈希值,映射到固定范围的整数集合,减少存储开销。
import hashlib
def shingle_hash(text, k=5):
"""将文本转为 k-shingle 的哈希集合"""
shingles = set()
for i in range(len(text) - k + 1):
token = text[i:i+k]
h = int(hashlib.md5(token.encode('utf-8')).hexdigest(), 16) % (2**32)
shingles.add(h)
return shingles三、Jaccard 相似度
3.1 定义与性质
给定两个集合 A 和 B,Jaccard 相似度定义为:
J(A, B) = |A ∩ B| / |A ∪ B|
性质: - 0 <= J(A, B) <= 1 - J(A, B) = 1 当且仅当 A = B - J(A, B) = 0 当且仅当 A ∩ B = ∅ - 对称性:J(A, B) = J(B, A) - 满足距离公理:d(A, B) = 1 - J(A, B) 是合法的度量(满足三角不等式)
3.2 直接计算的代价
两个大小为 m 的集合,计算 Jaccard 需要 O(m) 时间。N 个文档,每个 m = 1000 个 shingle:所有文档对的比较 O(N^2 × m)。N = 10^6 时约 10^15 次操作——仍然不可接受。
3.3 从精确到近似
关键洞察:我们不需要精确的 Jaccard 值,只需要找出 J(A, B) > t 的文档对(t 是预设阈值,如 0.8)。这正是概率算法大显身手的舞台。
四、MinHash 算法
4.1 核心思想
MinHash 的核心定理极其优雅:
对全集 U 上的一个随机排列(permutation)π,定义 h_π(S) = min{π(x) : x ∈ S},则:
Pr[h_π(A) = h_π(B)] = J(A, B)
即,两个集合在随机排列下的最小哈希值相等的概率,恰好等于它们的 Jaccard 相似度。
4.2 证明
设全集 U 中有一个随机排列 π。考虑 A ∪ B 中的元素在 π 下的排序。
设 A ∪ B = {e_1, e_2, …, e_n},其中 π(e_1) < π(e_2) < … < π(e_n)。
h_π(A) = h_π(B) 成立,当且仅当 A ∪ B 中 π 值最小的元素同时属于 A 和 B,即属于 A ∩ B。
因为 π 是随机排列,A ∪ B 中每个元素成为最小值的概率相等,都是 1/|A ∪ B|。
因此:
Pr[h_π(A) = h_π(B)] = |A ∩ B| / |A ∪ B| = J(A, B)
证毕。这个证明的简洁性令人赞叹——它将一个组合问题归结为均匀分布下的计数。
4.3 从排列到哈希函数
实践中不可能真正对全集做随机排列(全集可能有 2^32 个元素)。替代方案是使用哈希函数模拟排列。
选择一个哈希函数 h: U → [0, 2^64),对集合 S 计算:
MinHash_h(S) = min{h(x) : x ∈ S}
只要 h 足够”随机”(接近独立均匀分布),上述概率等式近似成立。
4.4 签名矩阵
单个哈希函数的估计方差太大。用 k 个独立哈希函数 h_1, h_2, …, h_k 得到 k 维签名向量:
Sig(S) = [MinHash_{h_1}(S), MinHash_{h_2}(S), ..., MinHash_{h_k}(S)]
Jaccard 估计值为签名中相等分量的比例:J_hat(A, B) = |{i : Sig_i(A) = Sig_i(B)}| / k。
由 Hoeffding 不等式:Pr[|J_hat - J| > ε] <= 2 × exp(-2kε^2)。要使误差 ≤ 0.05 且置信度 99%,需要 k ≈ 1060。实践中 k = 100~200 通常足够。
4.5 哈希函数的构造
常用线性哈希函数族:h_{a,b}(x) = (a * x + b) mod p,其中 p 是大素数,a ∈ [1, p-1],b ∈ [0, p-1] 随机选取。这是 2-独立哈希函数族,对 MinHash 已经足够。
import random
class MinHashFamily:
"""MinHash 签名生成器"""
def __init__(self, num_hashes, prime=2**31 - 1):
self.num_hashes = num_hashes
self.prime = prime
self.a = [random.randint(1, prime - 1) for _ in range(num_hashes)]
self.b = [random.randint(0, prime - 1) for _ in range(num_hashes)]
def hash_func(self, i, x):
return (self.a[i] * x + self.b[i]) % self.prime
def signature(self, shingle_set):
"""计算集合的 MinHash 签名"""
sig = [float('inf')] * self.num_hashes
for x in shingle_set:
for i in range(self.num_hashes):
val = self.hash_func(i, x)
if val < sig[i]:
sig[i] = val
return sig
def jaccard_estimate(self, sig_a, sig_b):
"""根据签名估计 Jaccard 相似度"""
equal = sum(1 for a, b in zip(sig_a, sig_b) if a == b)
return equal / self.num_hashes五、LSH:局部敏感哈希加速
5.1 签名比较的瓶颈
有了签名矩阵,每篇文档被压缩成 k 维签名。但仍需比较所有 O(N^2) 个文档对。LSH 解决的正是这个问题:只比较同桶内的文档。
5.2 Banding 技术
将 k 维签名分成 b 个 band,每个 band 包含 r 行(k = b × r)。
对每个 band: 1. 将该 band 对应的 r 个签名值拼接,计算哈希 2. 哈希到一个桶中 3. 如果两个文档在某个 band 中落入同一个桶,则它们成为候选对
5.3 概率分析
两个 Jaccard 相似度为 s 的文档:
- 在一个 band 中完全匹配的概率:s^r
- 在一个 band 中不匹配的概率:1 - s^r
- 在所有 b 个 band 中都不匹配的概率:(1 - sr)b
- 至少在一个 band 中匹配(成为候选对)的概率:
P(s) = 1 - (1 - s^r)^b
5.4 S-曲线与阈值
函数 P(s) = 1 - (1 - sr)b 形成一条 S 形曲线:
- 当 s 接近 0 时,P(s) ≈ 0(不相似的文档几乎不会成为候选对)
- 当 s 接近 1 时,P(s) ≈ 1(相似的文档几乎一定成为候选对)
- 曲线的陡峭过渡发生在阈值 t 附近
阈值的近似公式:
t ≈ (1/b)^(1/r)
例如 b = 20, r = 5 → t ≈ (1/20)^(1/5) ≈ 0.55。
5.5 参数选择
给定签名长度 k = 100,不同 b、r 组合的效果:
| b | r | 阈值 t | 特点 |
|---|---|---|---|
| 50 | 2 | 0.14 | 极低阈值,高召回 |
| 20 | 5 | 0.55 | 中低阈值 |
| 10 | 10 | 0.69 | 中等阈值 |
| 5 | 20 | 0.83 | 高阈值,高精确 |
通常根据业务需求先确定目标阈值,再反推 b 和 r。
5.6 误差控制
| 错误类型 | 含义 | 控制方法 |
|---|---|---|
| 假阳性 | 不相似的文档成为候选对 | 增大 r,或候选对上精确验证 |
| 假阴性 | 相似的文档未成为候选对 | 增大 b,或使用多轮 LSH |
实际系统中,LSH 筛选出候选对后,通常再做一次精确 Jaccard 计算消除假阳性。
class LSH:
"""基于 Banding 的 LSH 索引"""
def __init__(self, num_bands, rows_per_band):
self.b = num_bands
self.r = rows_per_band
self.buckets = [{} for _ in range(num_bands)]
def _band_hash(self, band_values):
"""将一个 band 的值映射到桶 ID"""
return hash(tuple(band_values))
def index(self, doc_id, signature):
"""将文档签名加入索引"""
for band_idx in range(self.b):
start = band_idx * self.r
end = start + self.r
band = signature[start:end]
bucket_id = self._band_hash(band)
if bucket_id not in self.buckets[band_idx]:
self.buckets[band_idx][bucket_id] = set()
self.buckets[band_idx][bucket_id].add(doc_id)
def query(self, signature):
"""查询与给定签名相似的候选文档"""
candidates = set()
for band_idx in range(self.b):
start = band_idx * self.r
end = start + self.r
band = signature[start:end]
bucket_id = self._band_hash(band)
if bucket_id in self.buckets[band_idx]:
candidates.update(self.buckets[band_idx][bucket_id])
return candidates六、SimHash 算法
6.1 背景
SimHash 由 Moses Charikar 于 2002 年提出,后来被 Google 大规模应用于网页去重系统。与 MinHash 不同,SimHash 对应的是 Cosine 相似度而非 Jaccard 相似度。
6.2 随机超平面方法
SimHash 的理论基础是随机超平面的 LSH 方案。
对 d 维空间中的两个向量 u 和 v,随机选择一个超平面(通过原点),则 u 和 v 被分到超平面同侧的概率为:
Pr[sign(r·u) = sign(r·v)] = 1 - θ(u,v)/π
其中 θ(u,v) 是 u 和 v 之间的夹角,r 是随机向量。
这意味着两个向量越相似(夹角越小),被随机超平面分到同侧的概率越高。
6.3 SimHash 的构造
给定文档的特征向量 v ∈ R^d(例如 TF-IDF 向量),SimHash 生成一个 f-bit 指纹:选择 f 个随机超平面 r_1, …, r_f,对第 i 位取 sign(r_i · v)。
关键性质:E[Hamming(SimHash(u), SimHash(v))] = f × θ(u,v) / π。
6.4 高效实现
实践中不需要存储 f 个随机向量,而是利用每个特征的哈希值来模拟随机超平面投影:
- 初始化 f 维累加向量 V = [0, …, 0]
- 对每个特征 feature(权重 w):计算 f-bit 哈希 h,对每位 j:若第 j 位为 1 则 V[j] += w,否则 V[j] -= w
- 最终对 V 的每位,正数映射为 1,负数映射为 0
import hashlib
import struct
def simhash(features, weights=None, f=64):
"""
计算 SimHash 指纹
features: 特征列表(字符串)
weights: 对应权重(默认全为 1)
f: 指纹位数
"""
if weights is None:
weights = [1.0] * len(features)
v = [0.0] * f
for feature, weight in zip(features, weights):
h = _hash_feature(feature, f)
for j in range(f):
if h & (1 << j):
v[j] += weight
else:
v[j] -= weight
fingerprint = 0
for j in range(f):
if v[j] > 0:
fingerprint |= (1 << j)
return fingerprint
def _hash_feature(feature, f=64):
"""将特征映射到 f-bit 哈希值"""
digest = hashlib.md5(feature.encode('utf-8')).digest()
if f <= 64:
return struct.unpack('<Q', digest[:8])[0] & ((1 << f) - 1)
else:
val = int.from_bytes(digest, 'little')
return val & ((1 << f) - 1)
def hamming_distance(a, b):
"""计算两个整数的 Hamming 距离"""
x = a ^ b
count = 0
while x:
count += 1
x &= x - 1 # Brian Kernighan 算法
return count
def simhash_similarity(a, b, f=64):
"""将 Hamming 距离转换为相似度 [0, 1]"""
d = hamming_distance(a, b)
return 1.0 - d / f6.5 64-bit SimHash 的实际应用
Google 的网页去重系统使用 64-bit SimHash,判定标准为 Hamming(SimHash(A), SimHash(B)) <= 3。
64 位 SimHash 中 Hamming 距离与 Cosine 相似度的对应关系:
| Hamming 距离 | Cosine 相似度 |
|---|---|
| 0 | 1.000 |
| 3 | 0.989 |
| 8 | 0.924 |
| 16 | 0.707 |
| 32 | 0.000 |
3 位对应 Cosine 约 0.989,对于网页去重是合理的阈值。
6.6 Hamming 距离的高效检索
给定查询 SimHash,如何在数十亿指纹中快速找到 Hamming 距离 ≤ 3 的文档?
分块索引:将 64-bit 分成 4 块(每块 16 bit)。由鸽巢原理,Hamming ≤ 3 的两个指纹至少有一块完全相同。因此建立 4 份索引,查询时分别精确匹配对应块,对候选集计算实际 Hamming 距离。
class SimHashIndex:
"""基于分块的 SimHash 索引,支持 Hamming 距离 <= k 的查询"""
def __init__(self, f=64, num_blocks=4):
self.f = f
self.num_blocks = num_blocks
self.block_bits = f // num_blocks
self.tables = [{} for _ in range(num_blocks)]
self.fingerprints = {} # doc_id -> fingerprint
def _get_block(self, fingerprint, block_idx):
"""提取指纹的第 block_idx 块"""
shift = block_idx * self.block_bits
mask = (1 << self.block_bits) - 1
return (fingerprint >> shift) & mask
def add(self, doc_id, fingerprint):
"""添加文档指纹"""
self.fingerprints[doc_id] = fingerprint
for i in range(self.num_blocks):
block = self._get_block(fingerprint, i)
if block not in self.tables[i]:
self.tables[i][block] = set()
self.tables[i][block].add(doc_id)
def query(self, fingerprint, max_distance=3):
"""查询 Hamming 距离不超过 max_distance 的文档"""
candidates = set()
for i in range(self.num_blocks):
block = self._get_block(fingerprint, i)
if block in self.tables[i]:
candidates.update(self.tables[i][block])
results = []
for doc_id in candidates:
d = hamming_distance(fingerprint, self.fingerprints[doc_id])
if d <= max_distance:
results.append((doc_id, d))
return sorted(results, key=lambda x: x[1])七、完整工程实现
7.1 C 语言实现:MinHash
下面给出 MinHash 的完整 C 实现,侧重性能与内存效率。
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <limits.h>
#define MAX_SHINGLES 100000
#define PRIME 2147483647ULL /* 2^31 - 1,梅森素数 */
typedef struct {
uint64_t *a; /* 哈希系数 a */
uint64_t *b; /* 哈希系数 b */
int k; /* 哈希函数个数 */
} MinHasher;
typedef struct {
uint32_t *values; /* 签名值数组 */
int k;
} Signature;
/* 初始化 MinHash 生成器 */
MinHasher *minhash_create(int num_hashes)
{
MinHasher *mh = malloc(sizeof(MinHasher));
mh->k = num_hashes;
mh->a = malloc(sizeof(uint64_t) * num_hashes);
mh->b = malloc(sizeof(uint64_t) * num_hashes);
for (int i = 0; i < num_hashes; i++) {
mh->a[i] = (uint64_t)rand() % (PRIME - 1) + 1;
mh->b[i] = (uint64_t)rand() % PRIME;
}
return mh;
}
void minhash_free(MinHasher *mh)
{
free(mh->a);
free(mh->b);
free(mh);
}
/* 单个哈希函数计算 */
static inline uint32_t hash_func(uint64_t a, uint64_t b, uint32_t x)
{
return (uint32_t)((a * (uint64_t)x + b) % PRIME);
}
/* 计算一个集合的 MinHash 签名 */
Signature *minhash_signature(MinHasher *mh,
uint32_t *shingles, int num_shingles)
{
Signature *sig = malloc(sizeof(Signature));
sig->k = mh->k;
sig->values = malloc(sizeof(uint32_t) * mh->k);
for (int i = 0; i < mh->k; i++)
sig->values[i] = UINT32_MAX;
for (int s = 0; s < num_shingles; s++) {
for (int i = 0; i < mh->k; i++) {
uint32_t val = hash_func(mh->a[i], mh->b[i], shingles[s]);
if (val < sig->values[i])
sig->values[i] = val;
}
}
return sig;
}
void signature_free(Signature *sig)
{
free(sig->values);
free(sig);
}
/* 根据签名估计 Jaccard 相似度 */
double jaccard_estimate(Signature *a, Signature *b)
{
if (a->k != b->k) return -1.0;
int match = 0;
for (int i = 0; i < a->k; i++) {
if (a->values[i] == b->values[i])
match++;
}
return (double)match / a->k;
}
/*
* LSH 索引(简化版,单桶哈希)
* 生产系统中应使用更高效的哈希表实现
*/
#define MAX_DOCS 1000000
#define MAX_BUCKET (1 << 20)
typedef struct BucketNode {
int doc_id;
struct BucketNode *next;
} BucketNode;
typedef struct {
int num_bands;
int rows_per_band;
BucketNode **buckets; /* num_bands * MAX_BUCKET */
} LSHIndex;
LSHIndex *lsh_create(int num_bands, int rows_per_band)
{
LSHIndex *lsh = malloc(sizeof(LSHIndex));
lsh->num_bands = num_bands;
lsh->rows_per_band = rows_per_band;
size_t total = (size_t)num_bands * MAX_BUCKET;
lsh->buckets = calloc(total, sizeof(BucketNode *));
return lsh;
}
/* FNV-1a 哈希,将一个 band 的值映射到桶 */
static uint32_t band_hash(uint32_t *values, int r)
{
uint32_t h = 2166136261u;
for (int i = 0; i < r; i++) {
h ^= values[i];
h *= 16777619u;
}
return h % MAX_BUCKET;
}
void lsh_insert(LSHIndex *lsh, int doc_id, Signature *sig)
{
for (int b = 0; b < lsh->num_bands; b++) {
int start = b * lsh->rows_per_band;
uint32_t bucket = band_hash(&sig->values[start],
lsh->rows_per_band);
size_t idx = (size_t)b * MAX_BUCKET + bucket;
BucketNode *node = malloc(sizeof(BucketNode));
node->doc_id = doc_id;
node->next = lsh->buckets[idx];
lsh->buckets[idx] = node;
}
}
/*
* 查询候选对:返回与给定签名在任何 band 中同桶的文档 ID
* 调用者负责去重和精确验证
*/
int lsh_query(LSHIndex *lsh, Signature *sig,
int *result_buf, int max_results)
{
int count = 0;
for (int b = 0; b < lsh->num_bands; b++) {
int start = b * lsh->rows_per_band;
uint32_t bucket = band_hash(&sig->values[start],
lsh->rows_per_band);
size_t idx = (size_t)b * MAX_BUCKET + bucket;
BucketNode *node = lsh->buckets[idx];
while (node && count < max_results) {
result_buf[count++] = node->doc_id;
node = node->next;
}
}
return count;
}
void lsh_free(LSHIndex *lsh)
{
size_t total = (size_t)lsh->num_bands * MAX_BUCKET;
for (size_t i = 0; i < total; i++) {
BucketNode *node = lsh->buckets[i];
while (node) {
BucketNode *tmp = node;
node = node->next;
free(tmp);
}
}
free(lsh->buckets);
free(lsh);
}
/* 演示 */
int main(void)
{
srand(42);
uint32_t set_a[] = {1, 2, 3, 4, 5, 6, 7, 8};
uint32_t set_b[] = {1, 2, 3, 4, 5, 9, 10, 11};
uint32_t set_c[] = {20, 21, 22, 23, 24, 25, 26, 27};
MinHasher *mh = minhash_create(100);
Signature *sig_a = minhash_signature(mh, set_a, 8);
Signature *sig_b = minhash_signature(mh, set_b, 8);
Signature *sig_c = minhash_signature(mh, set_c, 8);
printf("J_hat(A,B) = %.3f (真实: 5/11 ≈ 0.455)\n",
jaccard_estimate(sig_a, sig_b));
printf("J_hat(A,C) = %.3f (真实: 0.000)\n",
jaccard_estimate(sig_a, sig_c));
LSHIndex *lsh = lsh_create(20, 5);
lsh_insert(lsh, 0, sig_a);
lsh_insert(lsh, 1, sig_b);
lsh_insert(lsh, 2, sig_c);
int results[100];
int n = lsh_query(lsh, sig_a, results, 100);
printf("LSH 候选对:");
for (int i = 0; i < n; i++) printf(" %d", results[i]);
printf("\n");
signature_free(sig_a); signature_free(sig_b); signature_free(sig_c);
lsh_free(lsh); minhash_free(mh);
return 0;
}7.2 C 语言实现:SimHash
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
/* FNV-1a 64-bit 哈希 */
static uint64_t fnv1a_64(const char *data, size_t len)
{
uint64_t h = 14695981039346656037ULL;
for (size_t i = 0; i < len; i++) {
h ^= (uint8_t)data[i];
h *= 1099511628211ULL;
}
return h;
}
/* 计算 64-bit SimHash */
uint64_t simhash_compute(const char **features, const double *weights,
int n)
{
double v[64];
memset(v, 0, sizeof(v));
for (int i = 0; i < n; i++) {
uint64_t h = fnv1a_64(features[i], strlen(features[i]));
double w = weights ? weights[i] : 1.0;
for (int j = 0; j < 64; j++) {
if (h & (1ULL << j))
v[j] += w;
else
v[j] -= w;
}
}
uint64_t fingerprint = 0;
for (int j = 0; j < 64; j++) {
if (v[j] > 0)
fingerprint |= (1ULL << j);
}
return fingerprint;
}
/* 利用 popcount 的快速 Hamming 距离 */
static inline int hamming_dist(uint64_t a, uint64_t b)
{
return __builtin_popcountll(a ^ b);
}
/* 分块索引:64-bit 分成 4 个 16-bit 块 */
#define NUM_BLOCKS 4
#define BLOCK_BITS 16
#define TABLE_SIZE (1 << BLOCK_BITS)
typedef struct DocEntry {
int doc_id;
uint64_t fingerprint;
struct DocEntry *next;
} DocEntry;
typedef struct {
DocEntry *tables[NUM_BLOCKS][TABLE_SIZE];
} SimHashIndex;
static inline uint16_t get_block(uint64_t fp, int block)
{
return (uint16_t)((fp >> (block * BLOCK_BITS)) & 0xFFFF);
}
void simhash_index_add(SimHashIndex *idx, int doc_id, uint64_t fp)
{
for (int b = 0; b < NUM_BLOCKS; b++) {
uint16_t key = get_block(fp, b);
DocEntry *entry = malloc(sizeof(DocEntry));
entry->doc_id = doc_id;
entry->fingerprint = fp;
entry->next = idx->tables[b][key];
idx->tables[b][key] = entry;
}
}
int simhash_index_query(SimHashIndex *idx, uint64_t fp,
int max_dist, int *results, int max_results)
{
int count = 0;
for (int b = 0; b < NUM_BLOCKS; b++) {
uint16_t key = get_block(fp, b);
for (DocEntry *e = idx->tables[b][key]; e && count < max_results; e = e->next) {
if (hamming_dist(fp, e->fingerprint) <= max_dist)
results[count++] = e->doc_id;
}
}
return count;
}
int main(void)
{
const char *doc_a[] = {"hello", "world", "foo", "bar", "test"};
const char *doc_b[] = {"hello", "world", "foo", "baz", "test"};
const char *doc_c[] = {"alpha", "beta", "gamma", "delta", "epsilon"};
uint64_t fp_a = simhash_compute(doc_a, NULL, 5);
uint64_t fp_b = simhash_compute(doc_b, NULL, 5);
uint64_t fp_c = simhash_compute(doc_c, NULL, 5);
printf("Hamming(A,B) = %d\n", hamming_dist(fp_a, fp_b));
printf("Hamming(A,C) = %d\n", hamming_dist(fp_a, fp_c));
SimHashIndex idx = {{{0}}};
simhash_index_add(&idx, 0, fp_a);
simhash_index_add(&idx, 1, fp_b);
simhash_index_add(&idx, 2, fp_c);
int results[100];
int n = simhash_index_query(&idx, fp_a, 3, results, 100);
printf("SimHash 近似文档(Hamming<=3):");
for (int i = 0; i < n; i++)
printf(" doc_%d", results[i]);
printf("\n");
return 0;
}7.3 Python 完整流水线
"""
MinHash + LSH + SimHash 完整流水线
用于文本近似重复检测
"""
import hashlib
import random
import struct
from collections import defaultdict
# ==================== Shingling ====================
def text_to_shingles(text, k=5):
"""将文本转为字符级 k-shingle 哈希集合"""
shingles = set()
text = text.lower().strip()
for i in range(len(text) - k + 1):
token = text[i:i + k]
h = struct.unpack('<I',
hashlib.md5(token.encode('utf-8')).digest()[:4])[0]
shingles.add(h)
return shingles
# ==================== MinHash ====================
class MinHash:
"""MinHash 签名生成器"""
def __init__(self, num_perm=128, seed=42):
self.num_perm = num_perm
self.prime = (1 << 31) - 1
rng = random.Random(seed)
self.a = [rng.randint(1, self.prime - 1) for _ in range(num_perm)]
self.b = [rng.randint(0, self.prime - 1) for _ in range(num_perm)]
def signature(self, shingle_set):
"""计算集合的 MinHash 签名"""
sig = [float('inf')] * self.num_perm
for elem in shingle_set:
for i in range(self.num_perm):
val = (self.a[i] * elem + self.b[i]) % self.prime
if val < sig[i]:
sig[i] = val
return sig
@staticmethod
def jaccard_from_sigs(sig_a, sig_b):
"""根据签名估计 Jaccard 相似度"""
matches = sum(1 for a, b in zip(sig_a, sig_b) if a == b)
return matches / len(sig_a)
# ==================== LSH ====================
class LSHIndex:
"""基于 Banding 的 LSH 索引"""
def __init__(self, num_bands, rows_per_band):
self.num_bands = num_bands
self.rows_per_band = rows_per_band
self.hash_tables = [defaultdict(set) for _ in range(num_bands)]
def insert(self, doc_id, signature):
"""将文档签名加入索引"""
for band_idx in range(self.num_bands):
start = band_idx * self.rows_per_band
end = start + self.rows_per_band
band = tuple(signature[start:end])
self.hash_tables[band_idx][band].add(doc_id)
def query(self, signature):
"""查询候选相似文档"""
candidates = set()
for band_idx in range(self.num_bands):
start = band_idx * self.rows_per_band
end = start + self.rows_per_band
band = tuple(signature[start:end])
candidates.update(self.hash_tables[band_idx][band])
return candidates
# ==================== SimHash ====================
class SimHash:
"""64-bit SimHash 指纹生成器"""
def __init__(self, f=64):
self.f = f
def fingerprint(self, features, weights=None):
"""计算 SimHash 指纹"""
if weights is None:
weights = [1.0] * len(features)
v = [0.0] * self.f
for feat, w in zip(features, weights):
h = self._hash_feature(feat)
for j in range(self.f):
if h & (1 << j):
v[j] += w
else:
v[j] -= w
result = 0
for j in range(self.f):
if v[j] > 0:
result |= (1 << j)
return result
def _hash_feature(self, feature):
digest = hashlib.md5(feature.encode('utf-8')).digest()
val = struct.unpack('<Q', digest[:8])[0]
return val & ((1 << self.f) - 1)
@staticmethod
def hamming(a, b):
x = a ^ b
count = 0
while x:
count += 1
x &= x - 1
return count
def similarity(self, a, b):
return 1.0 - self.hamming(a, b) / self.f
# ==================== 完整流水线 ====================
def dedup_pipeline(documents, threshold=0.5,
num_perm=128, num_bands=16, rows_per_band=8):
"""
文本去重完整流水线
documents: dict of {doc_id: text}
threshold: Jaccard 相似度阈值
返回: 近似重复的文档对列表
"""
mh = MinHash(num_perm=num_perm)
lsh = LSHIndex(num_bands=num_bands, rows_per_band=rows_per_band)
# 第一步:计算签名并索引
signatures = {}
shingle_sets = {}
for doc_id, text in documents.items():
shingles = text_to_shingles(text)
shingle_sets[doc_id] = shingles
sig = mh.signature(shingles)
signatures[doc_id] = sig
lsh.insert(doc_id, sig)
# 第二步:查找候选对
candidate_pairs = set()
for doc_id, sig in signatures.items():
candidates = lsh.query(sig)
for cand_id in candidates:
if cand_id != doc_id:
pair = tuple(sorted([doc_id, cand_id]))
candidate_pairs.add(pair)
# 第三步:精确验证
duplicates = []
for id_a, id_b in candidate_pairs:
set_a = shingle_sets[id_a]
set_b = shingle_sets[id_b]
jaccard = len(set_a & set_b) / len(set_a | set_b) if set_a | set_b else 0
if jaccard >= threshold:
duplicates.append((id_a, id_b, jaccard))
return sorted(duplicates, key=lambda x: -x[2])
# ==================== 演示 ====================
if __name__ == '__main__':
docs = {
'doc1': '今天天气真好,适合出去散步和锻炼身体',
'doc2': '今天天气真好,非常适合出去散步锻炼身体',
'doc3': '量子计算机可以在某些问题上超越经典计算机',
'doc4': '深度学习模型需要大量的训练数据和计算资源',
}
print('=== MinHash + LSH 去重 ===')
results = dedup_pipeline(docs, threshold=0.3,
num_perm=128, num_bands=16, rows_per_band=8)
for id_a, id_b, sim in results:
print(f' {id_a} <-> {id_b}: Jaccard = {sim:.3f}')
print('\n=== SimHash Hamming 距离 ===')
sh = SimHash(f=64)
fps = {did: sh.fingerprint(list(text)) for did, text in docs.items()}
ids = list(docs.keys())
for i in range(len(ids)):
for j in range(i + 1, len(ids)):
d = SimHash.hamming(fps[ids[i]], fps[ids[j]])
print(f' {ids[i]} <-> {ids[j]}: Hamming = {d}')八、MinHash vs SimHash 对比
8.1 理论对比
| 维度 | MinHash | SimHash |
|---|---|---|
| 对应相似度 | Jaccard(集合) | Cosine(向量) |
| 数据表示 | 无序集合(shingle 集合) | 加权特征向量 |
| 签名大小 | k 个 32-bit 值(通常 k=100~200) | 单个 64-bit 值 |
| 存储开销 | 较高(每文档数百字节) | 极低(每文档 8 字节) |
| 查询方式 | LSH Banding | Hamming 距离分块索引 |
| 适合场景 | 精确 token 级去重 | 语义级近似匹配 |
| 权重支持 | 不直接支持 | 天然支持特征权重 |
8.2 精度与召回
MinHash 可以通过增加 k 来提高精度。k = 100 时,Jaccard 估计标准差约 0.05。SimHash 极其紧凑(64-bit),但在低相似度区间(J < 0.5)区分能力较弱。
8.3 选择建议
场景 推荐算法 理由
─────────────────────────────────────────────────────────
搜索引擎网页去重 SimHash 存储紧凑,亿级规模
新闻聚类/相似文章推荐 MinHash 需要精细的相似度阈值
抄袭检测 MinHash token 级匹配更准确
商品去重 MinHash 特征是离散集合
推荐系统(向量表示) SimHash 向量空间的自然选择
实时流处理 SimHash 计算和比较都更快
8.4 混合方案
在某些系统中,两者可以配合使用:
- 用 SimHash 做粗粒度过滤(每文档 8 字节,全量比较代价低)
- 对 SimHash 粗筛通过的文档对,再用 MinHash 做精细验证
这种两阶段方案兼顾了效率和精度。
九、工程实践要点
9.1 常见陷阱与应对
| 陷阱 | 现象 | 解决方案 |
|---|---|---|
| 哈希冲突 | 不同 shingle 哈希到相同值 | 使用 64-bit 哈希,或双哈希 |
| 空集合 | 极短文本生成空 shingle 集 | 设下限,集合为空时跳过 |
| 高频 shingle | “的”、“是”等停用词主导 | 去停用词或使用词级 shingle |
| 内存爆炸 | 签名矩阵过大 | 流式计算,不存储原始集合 |
| 桶倾斜 | 某些桶包含过多文档 | 对大桶截断或二次哈希 |
| 字符编码 | UTF-8 中文切分位置错误 | 按字符而非字节切分 |
| 数值溢出 | 哈希计算中间值超出范围 | 用 64-bit 或 128-bit 中间变量 |
| 参数不匹配 | b×r != k 导致签名越界 | 断言 num_bands * rows_per_band == num_perm |
9.2 性能优化
MinHash 向量化(NumPy 广播避免 Python 循环):
import numpy as np
def minhash_vectorized(shingles, a_coeffs, b_coeffs, prime):
"""shingles: (n,), a/b: (k,) → 返回 (k,) 签名"""
s = shingles.astype(np.uint64)
a = a_coeffs.astype(np.uint64).reshape(-1, 1)
b = b_coeffs.astype(np.uint64).reshape(-1, 1)
return ((a * s + b) % prime).min(axis=1)SimHash 位并行:利用 CPU popcount 指令加速 Hamming 距离。
9.3 分布式部署
在大规模系统中,MinHash/LSH 需要分布式部署。常见方案:
MapReduce:Map 阶段对每个 band emit ((band_idx, band_hash), doc_id),Reduce 阶段输出同桶内的所有文档对,最后验证阶段计算精确 Jaccard。
Redis:用 Set 存储每个桶的文档 ID,查询时取 SUNION。
# Redis 索引/查询伪代码
for band_idx in range(b):
key = f"lsh:band:{band_idx}:{band_hash}"
redis.sadd(key, doc_id) # 索引
candidates |= redis.smembers(key) # 查询十、在搜索引擎与推荐系统中的部署
10.1 Google 的网页去重
Google 在 2007 年的论文《Detecting Near-Duplicates for Web Crawling》中描述了基于 SimHash 的去重系统:
- 爬取阶段:对每个网页计算 64-bit SimHash
- 索引阶段:将 SimHash 存入分块索引
- 去重阶段:对新爬取的网页,查询 Hamming 距离 ≤ 3 的已有网页
- 决策:如果找到近似重复,保留 PageRank 更高的版本
系统规模:数十亿网页,每天处理数亿新网页。64-bit SimHash 使得全量指纹可以驻留在内存中——80 亿网页仅需约 64 GB 内存。
10.2 推荐系统与新闻聚类
电商和内容平台需要避免推荐重复内容。典型流水线:离线计算 MinHash 签名并建立 LSH 索引,在线生成候选后用 LSH 检查相似度,贪心去重——从高分到低分遍历,与已选内容相似则跳过。
新闻聚类类似:对每篇新闻计算签名,用 LSH 找候选相似对,构建相似度图后跑连通分量算法,每个分量即为一个”故事”。
10.3 实际部署数据
| 系统 | 算法 | 文档规模 | 签名维度 | 阈值 | 延迟 |
|---|---|---|---|---|---|
| Google 去重 | SimHash | 80 亿 | 64-bit | Hamming ≤ 3 | < 1ms |
| 某新闻聚类 | MinHash | 千万级 | k=128, b=16, r=8 | J > 0.5 | < 10ms |
| 某电商去重 | MinHash | 亿级 | k=200, b=20, r=10 | J > 0.7 | < 5ms |
十一、扩展话题
11.1 变体算法
- 加权 MinHash(ICWS,Ioffe 2010):将 TF-IDF 等加权集合映射为无权签名,解决标准 MinHash 不支持权重的问题。
- b-bit MinHash(Li & K”onig 2011):只保留每个 MinHash 值的低 b 位(如 b=1),存储压缩 32 倍,代价是引入可控偏差。
- One Permutation Hashing(Li et al. 2012):只用一个排列代替 k 个哈希函数,大幅降低计算量。
11.2 超越文本
MinHash 的思想不限于文本:基因组比较中的 k-mer 集合(Mash 工具)、图的邻接集合相似度、用户行为集合的重叠检测等。
11.3 与深度学习方法的比较
| 维度 | MinHash/SimHash | 深度学习(BERT 等) |
|---|---|---|
| 速度 | 极快(微秒级) | 较慢(毫秒级) |
| 语义理解 | 弱(依赖词面匹配) | 强(理解同义词、改写) |
| 资源需求 | CPU,内存少 | GPU,内存大 |
| 适用规模 | 百亿级 | 亿级(需近似最近邻加速) |
工业实践中,常用 MinHash/SimHash 做初筛,再用深度模型做精排。
十二、个人思考
关于概率算法的哲学
MinHash 的证明只有三行,SimHash 的核心也只是一个内积的符号。这两个算法让我深刻体会到:最优雅的算法往往源于对问题本质的深刻理解,而不是复杂的技巧堆砌。
Pr[h(A) = h(B)] = J(A, B) 这个等式,将一个组合优化问题(集合比较)完美地转化为一个概率事件。这种转化的美感在于它不是近似——它是精确的。每一次随机排列都是一个无偏估计器,而 LSH 的 Banding 技术则是对多个估计器的巧妙聚合。
关于工程与理论的鸿沟
理论上,MinHash + LSH 将 O(N^2) 降到了 O(N)。但工程中的挑战往往不在算法本身:数据倾斜(模板页面形成超级桶)、增量更新(不能从头重建索引)、阈值调优(误报/漏报的代价不同)。每一条都需要针对具体业务场景的解决方案。
关于”足够好”的哲学
在精确算法和概率算法之间选择时,工程师需要思考的不是”哪个更好”,而是”什么精度是足够的”。对于搜索引擎去重,5% 的误报率是完全可以接受的——用户几乎不会注意到。但对于抄袭检测,同样的误报率可能导致学术生涯的毁灭。
概率算法的价值在于:它让我们用可控的精度损失,换取了数量级的效率提升。这种权衡贯穿了整个大数据时代的算法设计。
关于技术选型
在 2025 年的今天,BERT 和 Sentence Transformer 的语义理解能力远超 MinHash 和 SimHash。但在百亿规模的去重场景中,你不会用 BERT 做第一层过滤——它太慢了。MinHash/SimHash 解决的不是”理解文本”的问题,而是”在天文数字的候选对中快速缩小范围”的问题。这两个问题的最优解,可能永远不同。
参考文献
- A. Z. Broder. “On the Resemblance and Containment of Documents.” SEQUENCES 1997.
- A. Z. Broder, M. Charikar, A. M. Frieze, M. Mitzenmacher. “Min-wise Independent Permutations.” STOC 1998.
- M. S. Charikar. “Similarity Estimation Techniques from Rounding Algorithms.” STOC 2002.
- G. S. Manku, A. Jain, A. Das Sarma. “Detecting Near-Duplicates for Web Crawling.” WWW 2007.
- P. Indyk, R. Motwani. “Approximate Nearest Neighbors: Towards Removing the Curse of Dimensionality.” STOC 1998.
- J. Leskovec, A. Rajaraman, J. D. Ullman. “Mining of Massive Datasets.” Cambridge University Press, Chapter 3.
- P. Li, A. B. Owen, C. H. Zhang. “One Permutation Hashing.” NIPS 2012.
- A. Shrivastava, P. Li. “Densifying One Permutation Hashing via Rotation for Fast Near Neighbor Search.” ICML 2014.
- S. Ioffe. “Improved Consistent Sampling, Weighted Minhash and L1 Sketching.” ICDM 2010.
- P. Li, C. K”onig. “b-Bit Minwise Hashing.” WWW 2011.
算法系列导航:上一篇:水塘抽样 | 下一篇:频率估计的理论极限
相关阅读:LSH 局部敏感哈希 | HNSW