土法炼钢兴趣小组的算法知识备份

t-digest:分布式系统中的分位数估计

目录

凌晨三点,你被 PagerDuty 叫醒。告警显示 P99 延迟飙升到 2 秒。你打开 Grafana,看到那条刺眼的红线,心想——这条线是怎么画出来的?

背后的答案不是”把所有请求延迟排个序,取第 99 百分位”。一个每秒处理 10 万请求的服务,一天产生 86 亿个数据点。排序?不存在的。真正在跑的,是一个叫做 t-digest 的概率数据结构。它用几十 KB 的内存,在分布式环境下,给出尾部分位数的高精度估计。

这篇文章把 t-digest 从头拆到尾:问题动机、核心数据结构、缩放函数的数学直觉、分布式合并协议、完整实现、以及它与 DDSketch / KLL sketch 的本质区别。

t-digest 的质心压缩与分位数估计示意图

一、分位数估计问题

1.1 什么是分位数

给定一组有序数据 x₁ ≤ x₂ ≤ … ≤ xₙ,q-分位数(0 ≤ q ≤ 1)定义为:使得至少有 qN 个数据点不超过它的最小值。

常见的几个分位数在监控中的含义:

分位数 别名 监控含义
0.50 P50 / 中位数 典型用户体验
0.90 P90 90% 用户的上界
0.99 P99 SLO 的黄金指标
0.999 P999 极端尾部,往往暴露系统瓶颈

1.2 为什么精确计算代价高昂

精确计算 q-分位数需要:

  1. 存储全部 N 个数据点;
  2. 排序(O(N log N))或使用选择算法(O(N));
  3. 取第 ⌈qN⌉ 个元素。

问题在于 N 可以非常大。一个在线服务的监控场景:

QPS = 100,000
时间窗口 = 5 分钟
N = 100,000 × 300 = 30,000,000

存储 float64:30M × 8B = 240 MB

若按 1 小时窗口:
N = 360,000,000
存储 = 2.88 GB

更要命的是分布式场景。你有 100 台机器,每台产出独立的延迟数据。要算全局 P99,你不能简单地对各节点的 P99 取平均——这在数学上是错的。你必须合并全部数据,或者用一种支持合并的摘要结构。

1.3 流式分位数估计的需求

我们需要一种数据结构,满足以下约束:

  1. 亚线性空间:内存占用远小于 O(N);
  2. 单遍扫描:数据点逐个到达,不能回头;
  3. 可合并:两个独立构建的摘要可以合并为一个,且误差不爆炸;
  4. 尾部精度:P99 / P999 的误差要比 P50 更小。

t-digest 恰好满足这四点。

二、t-digest 的核心思想

t-digest 由 Ted Dunning 在 2019 年正式发表(论文标题:Computing Extremely Accurate Quantiles Using t-Digests),尽管其早期实现可追溯到 2013 年。

2.1 质心(Centroid)

t-digest 的基本单元是质心(centroid),每个质心是一个二元组 (mean, weight):

一个 t-digest 由一组有序的质心 C₁, C₂, …, Cₘ 组成,按 mean 值递增排列。总数据量 N = Σwᵢ。

质心集合示例:
C₁ = (mean=2.3,  weight=1)    ← 尾部,weight 小
C₂ = (mean=5.1,  weight=3)
C₃ = (mean=12.7, weight=15)
C₄ = (mean=45.2, weight=120)  ← 中部,weight 大
C₅ = (mean=78.9, weight=150)  ← 中部
C₆ = (mean=102.4, weight=18)
C₇ = (mean=198.5, weight=2)   ← 尾部,weight 小
C₈ = (mean=350.1, weight=1)   ← 尾部

2.2 核心不变量

t-digest 的精髓在一句话:越靠近分布两端的质心,weight 越小

这意味着:

这个性质由缩放函数强制保证。

2.3 数据摘要 vs 精确排序

把 t-digest 理解为数据的一种”有损压缩”:

原始数据        [1, 2, 2, 3, 3, 3, 50, 51, 52, 100, 200, 500]
                    ↓ 压缩
t-digest        [(2.0, 3), (3.0, 3), (51.0, 3), (100, 1), (200, 1), (500, 1)]
                 中部合并多     中部合并多      尾部保持分离

中部三个质心各合并了 3 个点;尾部的 100、200、500 各保留为独立质心。查询 P99 时,尾部的高分辨率保证了估计精度。

三、缩放函数

3.1 直觉

缩放函数 k(q) 是一个从 [0, 1] 到 [0, δ/2] 的单调递增函数,其中 δ 是压缩参数。它定义了”在分位数 q 附近,允许多少个质心”。

核心思路:k(q) 的导数 k’(q) 表示单位分位数区间内的质心密度。如果我们让 k’(q) 在 q 接近 0 和 1 时很大、在 q 接近 0.5 时很小,就实现了”尾部密、中部疏”的效果。

3.2 常用缩放函数

t-digest 论文定义了多个缩放函数:

k₁(默认)

k₁(q) = (δ / 2π) · arcsin(2q - 1)

导数:

k₁'(q) = δ / (2π · √(q(1-q)))

当 q → 0 或 q → 1 时,k₁’(q) → ∞,意味着尾部的质心密度趋向无穷大。

k₂(实践中常用)

k₂(q) = (δ / 4·log(n/δ)) · log(q / (1-q))

k₂ 的优势在于对数据量 n 有自适应性。

k₀(最简单)

k₀(q) = (δ / 2) · q

k₀ 是均匀分布质心,不提供尾部增强。仅用于对照。

3.3 质心的 weight 上界

给定缩放函数 k,位于累积分位数 q 处的质心,其 weight 受如下约束:

weight(Cᵢ) ≤ N · Δq    其中 Δq 使得 k(q + Δq) - k(q) ≤ 1

即:一个质心在 k 空间中最多”占据”1 个单位。由于 k’(q) 在尾部很大,Δq 就很小,于是 weight 也很小。

以 k₁ 为例,在 q = 0.001(P99.9 附近):

k₁'(0.001) = δ / (2π · √(0.001 × 0.999))
            ≈ δ / (2π · 0.0316)
            ≈ 5.03 · δ

δ = 200 时,k₁'(0.001) ≈ 1006

最大 Δq ≈ 1/1006 ≈ 0.000994
最大 weight ≈ N × 0.000994

对于 N = 1,000,000:尾部质心最大 weight ≈ 994。而中部 q = 0.5 处:

k₁'(0.5) = δ / (2π · √(0.25)) = δ / π ≈ 63.7

最大 Δq ≈ 1/63.7 ≈ 0.0157
最大 weight ≈ 15,700

尾部质心比中部质心小 15 倍以上,这就是尾部精度的来源。

3.4 压缩参数 δ 的选择

δ 值 质心数 内存 适用场景
100 ~50-100 ~1.6 KB 资源受限的嵌入式监控
200 ~100-200 ~3.2 KB 通用默认值
500 ~250-500 ~8 KB 高精度 P999 监控
1000 ~500-1000 ~16 KB 极端精度需求

实践中 δ = 200 足以满足绝大多数 P99 监控需求。

四、插入与查询算法

4.1 插入算法

t-digest 支持逐点插入和批量插入。逐点插入的伪代码:

function Add(td, x):
    // 找到最近的质心
    candidates = FindNearest(td.centroids, x)

    for c in candidates:
        // 计算该质心的 weight 上限
        q = CumulativeWeight(c) / td.totalWeight
        maxWeight = td.totalWeight * deltaQ(k, q)

        if c.weight + 1 <= maxWeight:
            // 合并到该质心
            c.mean = (c.mean * c.weight + x) / (c.weight + 1)
            c.weight += 1
            td.totalWeight += 1
            return

    // 没有质心能接受,创建新质心
    InsertNewCentroid(td, x, weight=1)
    td.totalWeight += 1

    // 如果质心数过多,触发压缩
    if len(td.centroids) > threshold:
        Compress(td)

4.2 压缩算法

当质心数量超过阈值时(通常为 2δ 或 3δ),执行压缩:

function Compress(td):
    sorted = SortByMean(td.centroids)
    result = [sorted[0]]
    qSoFar = sorted[0].weight / td.totalWeight

    for i = 1 to len(sorted) - 1:
        c = sorted[i]
        qNext = qSoFar + c.weight / td.totalWeight
        maxWeight = td.totalWeight * deltaQ(k, qSoFar)

        if result.last.weight + c.weight <= maxWeight:
            // 合并
            merged = result.last
            merged.mean = (merged.mean * merged.weight + c.mean * c.weight)
                          / (merged.weight + c.weight)
            merged.weight += c.weight
        else:
            result.append(c)

        qSoFar = qNext

    td.centroids = result

4.3 分位数查询

给定 q,查询对应的值:

function Quantile(td, q):
    target = q * td.totalWeight
    cumulative = 0

    for i, c in enumerate(td.centroids):
        if cumulative + c.weight > target:
            // 在质心 i-1 和 i 之间插值
            innerQ = (target - cumulative) / c.weight
            if i == 0:
                return c.mean
            prev = td.centroids[i-1]
            return prev.mean + (c.mean - prev.mean) * innerQ
        cumulative += c.weight

    return td.centroids.last.mean

查询时间 O(m),其中 m 是质心数。由于 m ≈ δ,这通常是几百次比较,可以用二分查找优化到 O(log δ)。

五、分布式合并

5.1 合并算法

t-digest 的杀手特性是可合并性。两个独立构建的 t-digest 可以合并为一个,且精度损失极小。

function Merge(td1, td2):
    // 收集所有质心
    all = td1.centroids + td2.centroids

    // 随机打乱(避免偏差)
    Shuffle(all)

    // 按 mean 排序
    SortByMean(all)

    // 用新的空 t-digest 逐个吸收
    result = NewTDigest(delta = max(td1.delta, td2.delta))
    for c in all:
        result.AddCentroid(c.mean, c.weight)

    result.Compress()
    return result

合并后质心数不超过 δ,与输入规模无关。这使得 t-digest 天然适合 MapReduce / 树形聚合。

5.2 分布式聚合架构

          Node 1          Node 2          Node 3
         t-digest₁       t-digest₂       t-digest₃
            \                |                /
             \               |               /
              ↘              ↓              ↙
              ┌─────────────────────────────┐
              │   Aggregator: Merge + 压缩  │
              │   最终 t-digest (δ=200)      │
              │   质心数 ≤ 200               │
              └─────────────────────────────┘
                        ↓
                  全局 P99 查询

关键性质:

  1. 每个节点只传输 ~δ 个质心(几 KB),网络开销极小;
  2. 合并是结合律的:merge(A, merge(B, C)) = merge(merge(A, B), C);
  3. 合并后的 t-digest 精度与在全量数据上直接构建的 t-digest 精度接近。

5.3 为什么不能合并百分位数

一个常见错误是把各节点的 P99 取平均或取最大值作为全局 P99。这是错的:

Node A: [1, 1, 1, 1, 1, 1, 1, 1, 1, 100]  → P99 = 100
Node B: [1, 1, 1, 1, 1, 1, 1, 1, 1, 2]    → P99 = 2

平均 P99 = 51
最大 P99 = 100

真实全局 P99(合并排序后第 198 个)= 1

两种方法都错得离谱。

t-digest 的合并操作在数据摘要层面进行,数学上保持了正确性。

六、完整实现

6.1 Go 实现

package tdigest

import (
    "math"
    "sort"
)

// Centroid 表示一个质心。
type Centroid struct {
    Mean   float64
    Weight float64
}

// TDigest 是 t-digest 的核心结构。
type TDigest struct {
    centroids   []Centroid
    delta       float64
    totalWeight float64
    maxSize     int
    unmerged    int
}

// New 创建一个新的 t-digest,delta 为压缩参数。
func New(delta float64) *TDigest {
    if delta <= 0 {
        delta = 200
    }
    return &TDigest{
        centroids: make([]Centroid, 0, int(delta)*2),
        delta:     delta,
        maxSize:   int(delta) * 5,
    }
}

// scaleFunc 使用 k₁ 缩放函数计算分位数 q 处的缩放值。
func (td *TDigest) scaleFunc(q float64) float64 {
    return (td.delta / (2 * math.Pi)) * math.Asin(2*q-1)
}

// maxWeight 计算分位数 q 处允许的最大质心 weight。
func (td *TDigest) maxWeight(q float64) float64 {
    // k'(q) = delta / (2*pi * sqrt(q*(1-q)))
    // maxW = totalWeight / k'(q) = totalWeight * 2*pi*sqrt(q*(1-q)) / delta
    if q <= 0 || q >= 1 {
        return 1
    }
    return td.totalWeight * 2 * math.Pi * math.Sqrt(q*(1-q)) / td.delta
}

// Add 向 t-digest 中插入一个值。
func (td *TDigest) Add(x float64) {
    td.AddWeighted(x, 1)
}

// AddWeighted 向 t-digest 中插入一个带 weight 的值。
func (td *TDigest) AddWeighted(x, w float64) {
    td.centroids = append(td.centroids, Centroid{Mean: x, Weight: w})
    td.totalWeight += w
    td.unmerged++

    if len(td.centroids) > td.maxSize {
        td.Compress()
    }
}

// Compress 执行质心压缩。
func (td *TDigest) Compress() {
    if len(td.centroids) <= 1 {
        return
    }

    sort.Slice(td.centroids, func(i, j int) bool {
        return td.centroids[i].Mean < td.centroids[j].Mean
    })

    result := make([]Centroid, 0, int(td.delta)*2)
    result = append(result, td.centroids[0])
    qSoFar := td.centroids[0].Weight / td.totalWeight

    for i := 1; i < len(td.centroids); i++ {
        c := td.centroids[i]
        proposed := result[len(result)-1].Weight + c.Weight
        qNext := qSoFar + c.Weight/td.totalWeight

        // 在中间分位数处检查约束
        qMid := qSoFar + (c.Weight/td.totalWeight)/2
        limit := td.maxWeight(qMid)

        if proposed <= limit {
            last := &result[len(result)-1]
            newWeight := last.Weight + c.Weight
            last.Mean = (last.Mean*last.Weight + c.Mean*c.Weight) / newWeight
            last.Weight = newWeight
        } else {
            result = append(result, c)
        }
        qSoFar = qNext
    }

    td.centroids = result
    td.unmerged = 0
}

// Quantile 查询 q-分位数(0 ≤ q ≤ 1)对应的值。
func (td *TDigest) Quantile(q float64) float64 {
    if len(td.centroids) == 0 {
        return 0
    }

    if td.unmerged > 0 {
        td.Compress()
    }

    if q <= 0 {
        return td.centroids[0].Mean
    }
    if q >= 1 {
        return td.centroids[len(td.centroids)-1].Mean
    }

    target := q * td.totalWeight
    cumulative := 0.0

    for i, c := range td.centroids {
        if cumulative+c.Weight > target {
            // 在前一个质心和当前质心之间线性插值
            if i == 0 {
                return c.Mean
            }
            prev := td.centroids[i-1]
            innerRatio := (target - cumulative) / c.Weight
            return prev.Mean + (c.Mean-prev.Mean)*
                (0.5+(innerRatio-0.5)*c.Weight/(prev.Weight+c.Weight)*2)
        }
        cumulative += c.Weight
    }

    return td.centroids[len(td.centroids)-1].Mean
}

// Merge 将另一个 t-digest 合并到当前实例中。
func (td *TDigest) Merge(other *TDigest) {
    for _, c := range other.centroids {
        td.AddWeighted(c.Mean, c.Weight)
    }
    td.Compress()
}

// CentroidCount 返回当前质心数量。
func (td *TDigest) CentroidCount() int {
    return len(td.centroids)
}

// TotalWeight 返回已吸收的数据总量。
func (td *TDigest) TotalWeight() float64 {
    return td.totalWeight
}

// CDF 估计值 x 的累积分布函数值。
func (td *TDigest) CDF(x float64) float64 {
    if len(td.centroids) == 0 {
        return 0
    }

    if td.unmerged > 0 {
        td.Compress()
    }

    if x <= td.centroids[0].Mean {
        return 0
    }
    if x >= td.centroids[len(td.centroids)-1].Mean {
        return 1
    }

    cumulative := 0.0
    for i, c := range td.centroids {
        if c.Mean > x {
            if i == 0 {
                return 0
            }
            prev := td.centroids[i-1]
            frac := (x - prev.Mean) / (c.Mean - prev.Mean)
            return (cumulative - prev.Weight/2 + prev.Weight*frac) / td.totalWeight
        }
        cumulative += c.Weight
    }
    return 1
}

6.2 C 实现

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>

#ifndef M_PI
#define M_PI 3.14159265358979323846
#endif

typedef struct {
    double mean;
    double weight;
} centroid_t;

typedef struct {
    centroid_t *centroids;
    int         count;
    int         capacity;
    double      delta;
    double      total_weight;
    int         max_size;
    int         unmerged;
} tdigest_t;

static int centroid_cmp(const void *a, const void *b) {
    double da = ((const centroid_t *)a)->mean;
    double db = ((const centroid_t *)b)->mean;
    if (da < db) return -1;
    if (da > db) return 1;
    return 0;
}

tdigest_t *tdigest_new(double delta) {
    if (delta <= 0) delta = 200;
    tdigest_t *td = (tdigest_t *)calloc(1, sizeof(tdigest_t));
    td->delta = delta;
    td->capacity = (int)(delta * 5);
    td->max_size = td->capacity;
    td->centroids = (centroid_t *)malloc(sizeof(centroid_t) * td->capacity);
    td->count = 0;
    td->total_weight = 0;
    td->unmerged = 0;
    return td;
}

void tdigest_free(tdigest_t *td) {
    if (td) {
        free(td->centroids);
        free(td);
    }
}

static double max_weight(tdigest_t *td, double q) {
    if (q <= 0 || q >= 1) return 1;
    return td->total_weight * 2 * M_PI * sqrt(q * (1 - q)) / td->delta;
}

static void ensure_capacity(tdigest_t *td) {
    if (td->count >= td->capacity) {
        td->capacity *= 2;
        td->centroids = (centroid_t *)realloc(
            td->centroids, sizeof(centroid_t) * td->capacity);
    }
}

void tdigest_compress(tdigest_t *td) {
    if (td->count <= 1) return;

    qsort(td->centroids, td->count, sizeof(centroid_t), centroid_cmp);

    centroid_t *result = (centroid_t *)malloc(sizeof(centroid_t) * td->count);
    result[0] = td->centroids[0];
    int rcount = 1;
    double q_so_far = td->centroids[0].weight / td->total_weight;

    for (int i = 1; i < td->count; i++) {
        centroid_t c = td->centroids[i];
        double proposed = result[rcount - 1].weight + c.weight;
        double q_mid = q_so_far + (c.weight / td->total_weight) / 2.0;
        double limit = max_weight(td, q_mid);

        if (proposed <= limit) {
            centroid_t *last = &result[rcount - 1];
            double new_weight = last->weight + c.weight;
            last->mean = (last->mean * last->weight + c.mean * c.weight)
                         / new_weight;
            last->weight = new_weight;
        } else {
            result[rcount++] = c;
        }
        q_so_far += c.weight / td->total_weight;
    }

    memcpy(td->centroids, result, sizeof(centroid_t) * rcount);
    td->count = rcount;
    td->unmerged = 0;
    free(result);
}

void tdigest_add(tdigest_t *td, double x, double w) {
    ensure_capacity(td);
    td->centroids[td->count++] = (centroid_t){.mean = x, .weight = w};
    td->total_weight += w;
    td->unmerged++;

    if (td->count > td->max_size) {
        tdigest_compress(td);
    }
}

double tdigest_quantile(tdigest_t *td, double q) {
    if (td->count == 0) return 0;
    if (td->unmerged > 0) tdigest_compress(td);
    if (q <= 0) return td->centroids[0].mean;
    if (q >= 1) return td->centroids[td->count - 1].mean;

    double target = q * td->total_weight;
    double cumulative = 0;

    for (int i = 0; i < td->count; i++) {
        centroid_t c = td->centroids[i];
        if (cumulative + c.weight > target) {
            if (i == 0) return c.mean;
            centroid_t prev = td->centroids[i - 1];
            double inner = (target - cumulative) / c.weight;
            return prev.mean + (c.mean - prev.mean) *
                   (0.5 + (inner - 0.5) * c.weight /
                    (prev.weight + c.weight) * 2);
        }
        cumulative += c.weight;
    }
    return td->centroids[td->count - 1].mean;
}

double tdigest_cdf(tdigest_t *td, double x) {
    if (td->count == 0) return 0;
    if (td->unmerged > 0) tdigest_compress(td);
    if (x <= td->centroids[0].mean) return 0;
    if (x >= td->centroids[td->count - 1].mean) return 1;

    double cumulative = 0;
    for (int i = 0; i < td->count; i++) {
        centroid_t c = td->centroids[i];
        if (c.mean > x) {
            if (i == 0) return 0;
            centroid_t prev = td->centroids[i - 1];
            double frac = (x - prev.mean) / (c.mean - prev.mean);
            return (cumulative - prev.weight / 2 + prev.weight * frac)
                   / td->total_weight;
        }
        cumulative += c.weight;
    }
    return 1;
}

void tdigest_merge(tdigest_t *dst, const tdigest_t *src) {
    for (int i = 0; i < src->count; i++) {
        tdigest_add(dst, src->centroids[i].mean, src->centroids[i].weight);
    }
    tdigest_compress(dst);
}

/* ---- 使用示例 ---- */
int main(void) {
    tdigest_t *td = tdigest_new(200);

    /* 模拟长尾延迟分布 */
    for (int i = 0; i < 100000; i++) {
        double latency;
        double r = (double)rand() / RAND_MAX;
        if (r < 0.95)
            latency = 10.0 + (double)(rand() % 40);   /* 10-50ms 正常 */
        else if (r < 0.99)
            latency = 50.0 + (double)(rand() % 150);   /* 50-200ms 慢 */
        else
            latency = 200.0 + (double)(rand() % 800);  /* 200-1000ms 极慢 */

        tdigest_add(td, latency, 1);
    }

    printf("P50  = %.2f ms\n", tdigest_quantile(td, 0.50));
    printf("P90  = %.2f ms\n", tdigest_quantile(td, 0.90));
    printf("P99  = %.2f ms\n", tdigest_quantile(td, 0.99));
    printf("P999 = %.2f ms\n", tdigest_quantile(td, 0.999));
    printf("Centroids: %d\n", td->count);
    printf("Memory: ~%lu bytes\n",
           (unsigned long)(td->count * sizeof(centroid_t)));

    tdigest_free(td);
    return 0;
}

6.3 使用示例(Go)

func main() {
    td := tdigest.New(200)

    // 模拟延迟数据
    rng := rand.New(rand.NewSource(42))
    for i := 0; i < 1000000; i++ {
        var latency float64
        r := rng.Float64()
        switch {
        case r < 0.95:
            latency = 10 + rng.Float64()*40   // 正常:10-50ms
        case r < 0.99:
            latency = 50 + rng.Float64()*150   // 较慢:50-200ms
        default:
            latency = 200 + rng.Float64()*800  // 极慢:200-1000ms
        }
        td.Add(latency)
    }

    fmt.Printf("P50  = %.2f ms\n", td.Quantile(0.50))
    fmt.Printf("P90  = %.2f ms\n", td.Quantile(0.90))
    fmt.Printf("P99  = %.2f ms\n", td.Quantile(0.99))
    fmt.Printf("P999 = %.2f ms\n", td.Quantile(0.999))
    fmt.Printf("质心数: %d\n", td.CentroidCount())

    // 分布式合并
    td2 := tdigest.New(200)
    for i := 0; i < 500000; i++ {
        td2.Add(rng.Float64() * 100)
    }

    td.Merge(td2)
    fmt.Printf("合并后 P99 = %.2f ms\n", td.Quantile(0.99))
    fmt.Printf("合并后质心数: %d\n", td.CentroidCount())
}

七、与其他分位数算法的对比

7.1 GK Sketch(Greenwald-Khanna 2001)

GK sketch 是最经典的流式分位数算法。它维护一组”元组”(v, g, Δ),保证对任意 q 分位数,绝对误差不超过 εN。

优点: - 有严格的理论误差保证; - 空间 O((1/ε) · log(εN))。

缺点: - 不提供尾部增强。P50 和 P999 的绝对误差一样大; - 合并操作复杂且有精度损失; - 实际误差往往比 t-digest 差。

7.2 DDSketch(Datadog 2019)

DDSketch 由 Datadog 团队提出,核心思想是用对数桶(logarithmic bins)来保证相对误差

桶编号 i 对应区间 [γⁱ, γⁱ⁺¹)
其中 γ = (1 + α) / (1 - α)

相对误差保证:|估计值 - 真实值| / 真实值 ≤ α

优点: - 相对误差保证——对 P999 的误差也是 α 比例的,不是绝对的; - 合并只需逐桶相加,O(m) 且无精度损失; - 实现极简(一个整数数组)。

缺点: - 只适用于正数(延迟监控没问题,但不能用于温度等有负值的数据); - 当数据范围极大时(比如 1ns 到 1 小时),桶数会很多; - 不适合非延迟类的通用分位数查询。

DDSketch 参数选择:
α = 0.01 (1% 相对误差)
γ = 1.01 / 0.99 ≈ 1.0202

延迟范围 [1ms, 10s]:
桶数 = log(10000/1) / log(1.0202) ≈ 460 个桶
内存 ≈ 460 × 4B = 1.84 KB

7.3 KLL Sketch(Karnin-Lang-Liberty 2016)

KLL sketch 来自理论界,被证明是空间最优的:对于误差 ε,空间为 O((1/ε) · √(log(1/ε)))。

优点: - 理论最优空间; - 可合并; - Apache DataSketches 库有高质量实现。

缺点: - 和 GK 类似,不提供尾部增强; - 实现复杂度较高(多级压缩器); - 对 P999 的精度不如 t-digest 和 DDSketch。

7.4 综合对比

特性 t-digest DDSketch KLL GK
误差类型 秩误差(尾部增强) 相对误差 绝对秩误差 绝对秩误差
尾部精度 极高 极高 一般 一般
可合并 是(完美) 部分
空间 O(δ) O(log(范围)/α) O(1/ε·√log) O(1/ε·log)
支持负数 否(需扩展)
实现复杂度
实际内存(P99 精度 ~1%) ~3 KB ~2 KB ~4 KB ~8 KB
主要采用者 ES、ClickHouse Datadog、Go metrics Apache DataSketches 学术

7.5 选择建议

如果你在做延迟监控,且关心 P99/P999:
    → DDSketch(相对误差,合并最简单)
    → t-digest(更通用,尾部精度也很好)

如果你需要处理任意数值(含负数):
    → t-digest 或 KLL

如果你需要严格的理论误差界:
    → KLL 或 GK

如果你用 Elasticsearch / ClickHouse:
    → 你已经在用 t-digest 了

八、为什么 P999 比 P99 难估计

这是一个经常被忽视的问题。直觉上,P999 只是”比 P99 再精确一点”。但从统计学角度,两者的难度差距是数量级的。

8.1 样本量需求

要估计 P99,你至少需要 ~100 个样本才能有一个样本落在第 99 百分位附近。要估计 P999,你需要 ~1000 个样本。

但实际情况更糟。置信区间的宽度与 √(q(1-q)/N) 成正比:

P99  的标准误差 ∝ √(0.99 × 0.01 / N) = √(0.0099 / N)
P999 的标准误差 ∝ √(0.999 × 0.001 / N) = √(0.000999 / N)

要达到相同的相对精度:
P999 需要的样本量是 P99 的 ~10 倍

8.2 尾部数据稀疏

在 N = 10,000 个样本中:

P99 附近(q ∈ [0.98, 1.00])的样本数:~200
P999 附近(q ∈ [0.998, 1.000])的样本数:~20

20 个样本要精确估计一个值?误差必然很大。

这就是为什么 t-digest 的缩放函数如此重要——它把更多的”分辨率预算”分配给尾部。

8.3 实际影响

假设你的服务 P99 = 200ms,P999 = 800ms。如果估计误差为 10%:

P99 误差:200ms × 10% = 20ms(可接受)
P999 误差:800ms × 10% = 80ms(这 80ms 可能是一个慢查询的差距)

而且 P999 往往是 SLO 的关键指标。Google 的 Site Reliability Engineering 一书明确指出:看 P999,不看 P99,才能发现长尾问题

8.4 t-digest 的尾部优势

回到缩放函数。在 q = 0.999 处,k₁’(0.999) ≈ 5δ,而在 q = 0.99 处,k₁’(0.99) ≈ 1.6δ。这意味着 P999 附近的质心密度是 P99 附近的 3 倍以上。

这不是巧合,而是设计。t-digest 的缩放函数刻意给尾部分配了不成比例的精度预算。

九、在生产系统中的应用

9.1 Elasticsearch

Elasticsearch 的 percentiles 聚合默认使用 t-digest(compression 参数对应 δ):

{
  "aggs": {
    "latency_percentiles": {
      "percentiles": {
        "field": "response_time",
        "percents": [50, 90, 99, 99.9],
        "tdigest": {
          "compression": 200
        }
      }
    }
  }
}

Elasticsearch 利用 t-digest 的可合并性在分片间聚合分位数。每个分片独立计算自己的 t-digest,然后在协调节点上合并。

9.2 ClickHouse

ClickHouse 提供 quantileTDigest 函数:

SELECT
    quantileTDigest(0.50)(response_time) AS p50,
    quantileTDigest(0.99)(response_time) AS p99,
    quantileTDigest(0.999)(response_time) AS p999
FROM requests
WHERE timestamp > now() - INTERVAL 1 HOUR;

ClickHouse 的实现针对列存储做了优化,批量处理效率高于逐点插入。

9.3 Prometheus 的现状与局限

Prometheus 目前使用的是客户端直方图(histogram),而不是 t-digest。客户端预设若干桶边界,在服务端进行桶内线性插值:

# Prometheus histogram 配置
- name: http_request_duration_seconds
  type: histogram
  buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]

这种方式的问题:

  1. 桶边界是静态的,无法自适应数据分布;
  2. 尾部桶太粗(比如 [5, 10][10, +Inf]),P999 估计不准;
  3. 分位数计算假设桶内均匀分布,实际往往不是。

Prometheus 社区正在讨论引入原生直方图(native histogram),其中一种方案就是基于 t-digest 或 DDSketch 的思路。

9.4 Go 生态

Go 常用的 t-digest 库:

// github.com/caio/go-tdigest (最流行)
import "github.com/caio/go-tdigest/v4"

td, _ := tdigest.New(tdigest.Compression(200))
td.Add(42.0)
p99 := td.Quantile(0.99)

// github.com/influxdata/tdigest (InfluxDB 使用)
import "github.com/influxdata/tdigest"

td := tdigest.NewWithCompression(200)
td.Add(42.0, 1)
p99 := td.ValueAt(0.99)

十、工程陷阱

陷阱 现象 解决方案
δ 太小导致尾部失真 P999 误差超过 20%,且抖动大 增大 δ 到 500 或更高;或者换用 DDSketch
并发写入 t-digest 数据竞争导致 panic 或质心损坏 加锁(sync.Mutex);或每个 goroutine 独立 t-digest 后合并
合并顺序依赖 不同合并顺序产生略有不同的结果 这是正常的——t-digest 不保证确定性。如需确定性,使用固定种子的 shuffle
忽略极端值(min/max) t-digest 的 P0 和 P100 估计不准 单独记录 min 和 max,查询时用它们做边界
在空 t-digest 上查询 返回 0 或 NaN 查询前检查 TotalWeight > 0
数据全相同导致退化 所有质心 mean 相同,weight 极大 这不是 bug——CDF 在该点处跳变到 1。如需平滑,添加微量噪声
序列化不含 min/max 反序列化后边界查询不准 序列化时包含全局 min 和 max
float64 精度在极大值时丢失 weight 累加后精度下降 使用 Kahan 求和或定期重新计算 total_weight
桶边界假设均匀分布 Prometheus 式直方图在偏态分布上严重失准 换用 t-digest 或 DDSketch
时间窗口内数据量太少 5 秒窗口内只有 50 个样本,P99 无意义 增大窗口或使用滑动 t-digest;明确标注 “样本不足”
合并时 δ 不一致 两个 δ 不同的 t-digest 合并后精度不确定 统一所有节点的 δ 值
误以为 t-digest 给出精确值 对结果过度信赖 t-digest 是估计。在关键决策上,配合精确计算做交叉验证

十一、复杂度总结与个人思考

11.1 复杂度

操作 时间 空间
插入 均摊 O(1),最坏 O(δ log δ)(触发压缩) O(δ)
查询分位数 O(δ)(线性扫描)或 O(log δ)(二分)
合并 O(δ log δ) O(δ)
压缩 O(δ log δ) O(δ)

11.2 个人思考

t-digest 是工程品味的胜利

从理论角度看,KLL sketch 有最优空间界。从误差保证角度看,DDSketch 的相对误差更干净。t-digest 在论文里没有给出闭式的误差界——只有实验验证。在纯理论的学术评审中,这是一个弱点。

但 t-digest 赢在了实践中。它的缩放函数不需要预知数据范围(DDSketch 需要知道桶的上下界);它能处理负数(DDSketch 不行);它的合并语义简单直观(GK 的合并非常复杂)。最重要的是——Elasticsearch 选了它,ClickHouse 选了它,整个可观测性生态都在用它。工程世界里,adoption 就是最好的证明。

缩放函数是 t-digest 最深刻的洞察。把”尾部需要更高精度”这个直觉,变成一个数学上优雅的约束:k(q) 在 q 接近端点时导数趋向无穷。这不只是一个 trick,而是对”精度预算分配”问题的本质理解。

DDSketch 值得更多关注。在我经手的延迟监控场景中,DDSketch 的实现最简单、合并最快、误差保证最清晰。如果你只需要监控正数延迟,DDSketch 可能比 t-digest 更合适。它之所以不如 t-digest 知名,很大程度上是因为 Elasticsearch 的选择效应。

P999 监控被严重低估。大多数团队只看 P99,但 P999 往往暴露的是完全不同的问题——跨数据中心的网络抖动、GC 暂停、锁竞争。一个 P99 = 50ms 但 P999 = 5s 的服务,和一个 P99 = 100ms 但 P999 = 200ms 的服务,后者的用户体验可能好得多。

分布式合并是被低估的特性。单机上,排序加选择就能算分位数,t-digest 的优势不大。真正拉开差距的是多节点场景。当你有 1000 台机器,每台持有一小块数据,t-digest 让你只需传输几 KB 就能得到全局分位数。这在微服务架构里,不是”可有可无”,而是”没有就做不到”。

最后一个思考:t-digest 和 HyperLogLog 是同一类设计哲学的产物——用数学洞察换空间。HLL 用随机哈希的极值分布估计基数,t-digest 用缩放函数的不均匀性估计分位数。它们都在告诉我们:好的近似算法不是”把精确算法打个折”,而是找到问题的本质结构,然后用最少的信息捕捉它。

十二、参考资料

  1. Dunning, T., & Ertl, O. (2019). Computing Extremely Accurate Quantiles Using t-Digests. arXiv:1902.04023.
  2. Masson, C., Rim, J. E., & Lee, H. K. (2019). DDSketch: A Fast and Fully-Mergeable Quantile Sketch with Relative-Error Guarantees. PVLDB, 12(12).
  3. Karnin, Z., Lang, K., & Liberty, E. (2016). Optimal Quantile Approximation in Streams. FOCS 2016.
  4. Greenwald, M., & Khanna, S. (2001). Space-Efficient Online Computation of Quantile Summaries. SIGMOD 2001.
  5. t-digest 官方仓库:https://github.com/tdunning/t-digest
  6. Elasticsearch percentiles aggregation 文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-aggregation.html
  7. ClickHouse quantileTDigest 文档:https://clickhouse.com/docs/en/sql-reference/aggregate-functions/reference/quantiletdigest
  8. Dunning, T. (2021). The t-digest: Efficient estimates of distributions. Software Impacts, 7, 100049.
  9. Apache DataSketches KLL 文档:https://datasketches.apache.org/docs/KLL/KLLSketch.html
  10. Beyer, B., Jones, C., Petoff, J., & Murphy, N. R. (2016). Site Reliability Engineering. O’Reilly. Chapter 4: Service Level Objectives.

算法系列导航上一篇:Count-Min Sketch | 下一篇:水塘抽样

相关阅读HyperLogLog | 流式算法总论


By .