土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】吞吐量优化:批处理、流水线与并发模型

文章导航

分类入口
architecture
标签入口
#throughput#batching#pipelining#concurrency#Nagle#TCP-CORK

目录

你负责的订单系统每天处理 500 万笔交易,数据库写入是瓶颈。DBA 告诉你 MySQL 单条 INSERT 的 TPS 上限大约 3000,但你用批量 INSERT 一次提交 500 行,TPS 跳到了 15 万行每秒——提升了 50 倍。你很高兴,把批大小调到 5000,结果 TPS 反而下降了,而且 P99 延迟从 20ms 涨到了 800ms。

为什么批量越大反而越慢?最优的批大小到底是多少?这个问题没有统一答案,因为它取决于网络 RTT、磁盘 IOPS、内存缓冲区大小、事务日志刷盘策略等多个变量。但分析的方法是确定的。

这篇文章讨论三件事:批处理(Batching)如何减少固定开销、流水线(Pipelining)如何掩盖延迟、并发模型如何影响吞吐量上限。它们是提升单机吞吐量的三条基本路径。


一、吞吐量的理论上限

1.1 定义与度量

吞吐量(Throughput)是单位时间内系统完成的工作量。度量单位因场景而异:

场景 吞吐量单位
Web 服务 请求/秒(RPS)
数据库 事务/秒(TPS)或行/秒
消息队列 消息/秒或 MB/秒
网络 比特/秒(bps)或包/秒(pps)
磁盘 IOPS 或 MB/秒
CPU 指令/周期(IPC)

吞吐量和延迟(Latency)是一对共生指标。降低单次操作的延迟通常会提升吞吐量,但反过来不一定成立——批处理可以大幅提升吞吐量,但单条记录的延迟反而会增加,因为每条记录要等到凑够一批才被处理。这是吞吐量优化中最常见的权衡。

1.2 Amdahl 定律在吞吐量上的应用

Amdahl 定律(Amdahl’s Law)最初描述的是并行化对程序加速比的限制。原始公式:

\[S = \frac{1}{(1 - P) + \frac{P}{N}}\]

其中 \(S\) 是加速比,\(P\) 是可并行化的比例,\(N\) 是并行度(核心数、线程数等)。

这个公式直接约束吞吐量上限。假设一个请求的处理时间中 80% 可以并行化(\(P = 0.8\)),20% 是串行的(锁竞争、日志刷盘、全局状态更新等),那么:

20% 的串行部分把吞吐量上限锁死在单核的 5 倍。增加到 64 核只用到了理论上限的 94%,剩下的 6% 需要从减少串行部分入手,而不是增加核心数。

这给吞吐量优化指出了一个基本方向:在增加并行度之前,先减少串行部分的比例。常见的串行瓶颈包括:

1.3 Universal Scalability Law

Amdahl 定律假设并行化没有额外开销,但现实中并行度增加会带来协调成本。Neil Gunther 提出的通用可扩展性定律(Universal Scalability Law, USL)增加了一个相干性延迟项(Coherency Delay):

\[C(N) = \frac{N}{1 + \sigma(N-1) + \kappa N(N-1)}\]

其中 \(\sigma\) 是串行化系数(对应 Amdahl 的 \(1-P\)),\(\kappa\) 是相干性系数。当 \(\kappa > 0\) 时,吞吐量在某个 \(N\) 之后会下降,而不仅仅是增长放缓。

这解释了一个常见现象:线程池从 16 调到 32,吞吐量没变;调到 64,吞吐量反而降了。\(\kappa\) 项捕捉的是缓存失效、锁争用、上下文切换等随并行度二次增长的开销。

graph LR
    subgraph 吞吐量与并行度的关系
    A["线性扩展<br/>理想情况"] --> B["Amdahl 上限<br/>增长放缓"]
    B --> C["USL 回退<br/>吞吐量下降"]
    end
    style A fill:#d4edda,stroke:#28a745
    style B fill:#fff3cd,stroke:#ffc107
    style C fill:#f8d7da,stroke:#dc3545

1.4 瓶颈识别:利特尔定律

利特尔定律(Little’s Law)是排队论的基础公式:

\[L = \lambda \times W\]

其中 \(L\) 是系统中的平均请求数,\(\lambda\) 是吞吐量(到达率),\(W\) 是平均逗留时间(延迟)。

这个公式的工程意义在于:已知任意两个变量就能推出第三个。当你测量到系统中平均有 100 个并发请求(\(L = 100\)),平均延迟是 50ms(\(W = 0.05s\)),那么吞吐量就是 \(\lambda = 100 / 0.05 = 2000\) RPS。如果你想把吞吐量提到 4000 RPS 而不增加延迟,就需要系统能容纳 200 个并发请求——这意味着线程池、连接池、内存都要翻倍。


二、批处理:减少固定开销

2.1 为什么批处理有效

每次 I/O 操作都有固定开销(Fixed Overhead)和可变开销(Variable Overhead)。以数据库 INSERT 为例:

单条 INSERT 的总耗时 ≈ 2.6ms + 0.01ms = 2.61ms,吞吐量 ≈ 383 行/秒。

100 条批量 INSERT 的总耗时 ≈ 2.6ms + 1ms = 3.6ms(可变部分增加),吞吐量 ≈ 100/3.6ms ≈ 27,778 行/秒。

批处理把固定开销在多条记录间分摊,减少了每条记录的平均处理时间。这是批处理提升吞吐量的核心机制。

2.2 最优批大小分析

设固定开销为 \(F\),每条记录的可变开销为 \(V\),批大小为 \(B\),则:

\(B \to \infty\) 时,\(T_{avg} \to V\),吞吐量趋近 \(\frac{1}{V}\)。看起来批越大越好,但实际中存在三个约束:

约束一:延迟约束。 凑齐一批需要等待时间。如果请求到达率为 \(r\),凑齐 \(B\) 条的等待时间为 \(\frac{B}{r}\)。当 \(B\) 增大,单条记录从到达到被处理的延迟线性增长。如果系统有延迟 SLA(例如 P99 < 100ms),就必须限制 \(B\)

约束二:内存约束。 缓冲区大小有限。\(B\) 条记录需要 \(B \times S\) 字节的内存(\(S\) 是单条记录大小)。当 \(B\) 太大,可能触发 GC、OOM 或缓冲区溢出。

约束三:非线性开销。 实际中可变开销不是常数。数据库批量 INSERT 在行数超过某个阈值后,日志写入从顺序变为随机、索引维护成本急剧增加、锁持有时间过长导致其他事务阻塞。这使得 \(V\) 变成 \(V(B)\),是关于 \(B\) 的递增函数。

最优批大小通常需要实测。一种实用方法是:固定延迟预算 \(D\),求满足 \(\frac{B}{r} + F + B \times V \leq D\) 的最大 \(B\)

2.3 批处理的经典模式

数据库批量写入

MySQL 的多行 INSERT 语法:

-- 单条插入:每条都要解析、开事务、提交、刷日志
INSERT INTO orders (user_id, amount) VALUES (1, 100);
INSERT INTO orders (user_id, amount) VALUES (2, 200);

-- 批量插入:一次解析、一次事务、一次刷日志
INSERT INTO orders (user_id, amount) VALUES
  (1, 100),
  (2, 200),
  (3, 300),
  -- ... 更多行
  (500, 50000);

Go 语言中实现批量写入的常见模式:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "strings"
)

const batchSize = 500

type Order struct {
    UserID int64
    Amount int64
}

func BatchInsertOrders(ctx context.Context, db *sql.DB, orders []Order) error {
    for i := 0; i < len(orders); i += batchSize {
        end := i + batchSize
        if end > len(orders) {
            end = len(orders)
        }
        batch := orders[i:end]

        valueStrings := make([]string, 0, len(batch))
        valueArgs := make([]interface{}, 0, len(batch)*2)

        for j, o := range batch {
            valueStrings = append(valueStrings,
                fmt.Sprintf("($%d, $%d)", j*2+1, j*2+2))
            valueArgs = append(valueArgs, o.UserID, o.Amount)
        }

        query := fmt.Sprintf(
            "INSERT INTO orders (user_id, amount) VALUES %s",
            strings.Join(valueStrings, ","))

        _, err := db.ExecContext(ctx, query, valueArgs...)
        if err != nil {
            return fmt.Errorf("batch insert at offset %d: %w", i, err)
        }
    }
    return nil
}

消息队列批量发送

Kafka 生产者的批处理由两个参数控制:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class BatchProducerExample {
    public static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        // 批处理核心参数
        // batch.size: 单个分区的批缓冲区大小(字节)
        // 默认 16384(16KB),增大可提升吞吐量
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB

        // linger.ms: 等待凑批的最大时间(毫秒)
        // 默认 0(立即发送),增大可让更多消息凑成一批
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待 10ms

        // 压缩:批越大压缩效果越好
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        // 内存缓冲区总大小,限制所有分区的批缓冲总量
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB

        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        try (KafkaProducer<String, String> producer = createProducer()) {
            for (int i = 0; i < 1000000; i++) {
                producer.send(new ProducerRecord<>(
                    "orders", String.valueOf(i), "order-data-" + i));
            }
        }
    }
}

batch.sizelinger.ms 形成一个”或”关系:哪个条件先满足就触发发送。batch.size = 64KB, linger.ms = 10 的含义是”凑够 64KB 或等满 10ms,先到的触发发送”。

2.4 批处理的权衡

维度 小批量(B < 100) 中批量(100 < B < 1000) 大批量(B > 1000)
吞吐量 低,固定开销占比大 高,固定开销被充分分摊 可能回退,非线性开销显现
延迟 低,几乎不等待 中等,等待凑批 高,等待时间长
内存 中等 高,可能 OOM
失败影响 小,重试代价低 中等 大,整批失败需重试
事务完整性 容易保证 需要注意事务大小 可能超过数据库事务限制
背压处理 灵活 需要控制缓冲区 容易造成上游阻塞

2.5 自适应批大小

固定批大小在负载变化时表现不佳。高负载时批很快凑满,低负载时等待超时才发送,浪费延迟预算。自适应策略根据当前负载动态调整批大小:

package main

import (
    "sync"
    "time"
)

type AdaptiveBatcher[T any] struct {
    mu         sync.Mutex
    buffer     []T
    minBatch   int
    maxBatch   int
    maxWait    time.Duration
    flush      func([]T) error
    timer      *time.Timer
}

func NewAdaptiveBatcher[T any](
    minBatch, maxBatch int,
    maxWait time.Duration,
    flush func([]T) error,
) *AdaptiveBatcher[T] {
    return &AdaptiveBatcher[T]{
        minBatch: minBatch,
        maxBatch: maxBatch,
        maxWait:  maxWait,
        flush:    flush,
    }
}

func (b *AdaptiveBatcher[T]) Add(item T) error {
    b.mu.Lock()
    b.buffer = append(b.buffer, item)

    if len(b.buffer) >= b.maxBatch {
        batch := b.buffer
        b.buffer = nil
        if b.timer != nil {
            b.timer.Stop()
            b.timer = nil
        }
        b.mu.Unlock()
        return b.flush(batch)
    }

    if b.timer == nil {
        b.timer = time.AfterFunc(b.maxWait, func() {
            b.mu.Lock()
            if len(b.buffer) > 0 {
                batch := b.buffer
                b.buffer = nil
                b.timer = nil
                b.mu.Unlock()
                _ = b.flush(batch)
                return
            }
            b.timer = nil
            b.mu.Unlock()
        })
    }

    b.mu.Unlock()
    return nil
}

三、流水线:掩盖延迟

3.1 流水线的基本原理

批处理减少的是固定开销的次数,流水线解决的是另一个问题:让多个阶段重叠执行

经典的 CPU 指令流水线(Instruction Pipeline)把一条指令的执行分为取指(Fetch)、译码(Decode)、执行(Execute)、访存(Memory)、写回(Writeback)五个阶段。没有流水线时,一条指令 5 个周期,10 条指令需要 50 个周期。有流水线时,第一条指令进入执行阶段的同时第二条指令进入译码阶段、第三条指令进入取指阶段——10 条指令只需要 14 个周期(5 + 9 × 1)。

软件系统中同样存在多阶段处理。一个 HTTP 请求的处理可以分为:接收请求 → 认证鉴权 → 业务逻辑 → 数据库查询 → 序列化响应 → 发送响应。如果每个阶段分配给不同的处理单元,就可以在处理请求 A 的业务逻辑时,同时对请求 B 进行认证鉴权、接收请求 C。

3.2 流水线吞吐量分析

设流水线有 \(K\) 个阶段,第 \(i\) 个阶段的处理时间为 \(t_i\),则:

关键结论:流水线的吞吐量由最慢的阶段决定。即使其他阶段都只需要 1ms,只要有一个阶段需要 10ms,整个流水线的吞吐量就被限制在 100 请求/秒。

3.3 级间平衡

流水线效率的核心是级间平衡(Stage Balancing)——让每个阶段的处理时间尽量相等。不平衡意味着快阶段在等慢阶段,计算资源被浪费。

三种实现平衡的方法:

方法一:拆分慢阶段。 如果数据库查询阶段需要 10ms,其他阶段只需要 2ms,可以把数据库查询拆成两个子阶段:先查缓存(2ms),缓存未命中才查数据库(8ms)。缓存命中率足够高时,平均处理时间接近 2ms。

方法二:并行化慢阶段。 不拆分阶段本身,而是在慢阶段部署多个处理器。如果数据库查询需要 10ms、其他阶段需要 2ms,在数据库查询阶段部署 5 个并行处理器,每个处理器 10ms 处理一个请求,5 个处理器交替工作,等效处理时间 2ms。

方法三:批量合并请求。 在慢阶段的入口设置缓冲区,把多个请求的同类操作合并成一次批量操作。例如把 5 个独立的数据库查询合并成一个包含 WHERE id IN (…) 的批量查询。

graph LR
    subgraph 不平衡流水线
    A1["接收<br/>2ms"] --> B1["认证<br/>2ms"]
    B1 --> C1["查询 DB<br/>10ms ⬤"]
    C1 --> D1["序列化<br/>2ms"]
    end

    subgraph 并行化后
    A2["接收<br/>2ms"] --> B2["认证<br/>2ms"]
    B2 --> C2a["DB Worker 1<br/>10ms"]
    B2 --> C2b["DB Worker 2<br/>10ms"]
    B2 --> C2c["DB Worker 3<br/>10ms"]
    B2 --> C2d["DB Worker 4<br/>10ms"]
    B2 --> C2e["DB Worker 5<br/>10ms"]
    C2a --> D2["序列化<br/>2ms"]
    C2b --> D2
    C2c --> D2
    C2d --> D2
    C2e --> D2
    end

3.4 Go 语言中的流水线实现

Go 的 channel 天然适合流水线模式。每个阶段是一个 goroutine,阶段间通过 channel 连接:

package main

import (
    "context"
    "fmt"
    "sync"
)

type Request struct {
    ID   int
    Data string
}

type Result struct {
    ID     int
    Output string
}

// 阶段一:接收与验证
func validate(ctx context.Context, in <-chan Request) <-chan Request {
    out := make(chan Request)
    go func() {
        defer close(out)
        for req := range in {
            if req.Data == "" {
                continue
            }
            select {
            case out <- req:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// 阶段二:业务处理(慢阶段,部署多个 worker 并行化)
func process(ctx context.Context, in <-chan Request, workers int) <-chan Result {
    out := make(chan Result)
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for req := range in {
                result := Result{
                    ID:     req.ID,
                    Output: fmt.Sprintf("processed-%s", req.Data),
                }
                select {
                case out <- result:
                case <-ctx.Done():
                    return
                }
            }
        }()
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// 阶段三:结果收集
func collect(ctx context.Context, in <-chan Result) []Result {
    var results []Result
    for r := range in {
        results = append(results, r)
    }
    return results
}

func main() {
    ctx := context.Background()

    input := make(chan Request, 100)
    go func() {
        defer close(input)
        for i := 0; i < 10000; i++ {
            input <- Request{ID: i, Data: fmt.Sprintf("order-%d", i)}
        }
    }()

    validated := validate(ctx, input)
    processed := process(ctx, validated, 8) // 8 个 worker 并行处理慢阶段
    results := collect(ctx, processed)

    fmt.Printf("processed %d results\n", len(results))
}

3.5 流水线的陷阱

陷阱一:流水线气泡(Pipeline Bubble)。 当某个阶段因异常或条件判断跳过时,后续阶段无事可做,产生空闲周期。CPU 的分支预测错误会导致流水线冲刷(Pipeline Flush),软件流水线中的错误处理也会造成类似问题。

陷阱二:级间缓冲区溢出。 快阶段生产速度大于慢阶段消费速度,中间的缓冲区会不断增长。必须设置有界缓冲区(Bounded Buffer)并实现背压(Backpressure)。Go channel 的容量限制就是天然的背压机制。

陷阱三:延迟累加。 流水线提升吞吐量但不降低单个请求的延迟。一个 \(K\) 阶段的流水线,单个请求的延迟是所有阶段之和,至少 \(K \times T_{cycle}\)。如果 \(K\) 太大,延迟会超过 SLA。


四、Nagle 算法与 TCP 写优化

4.1 小包问题

TCP 每个数据包(Segment)都有固定的头部开销:20 字节 IP 头 + 20 字节 TCP 头 = 40 字节。如果应用层每次只发送 1 字节数据,有效载荷率只有 \(\frac{1}{41} \approx 2.4\%\),97.6% 的带宽浪费在头部上。这在 1984 年被 John Nagle 称为”小包问题”(Small Packet Problem),在当时的低带宽网络中尤其严重。

4.2 Nagle 算法的机制

Nagle 算法(RFC 896)的规则很简单:

  1. 如果当前没有未被确认(unacknowledged)的数据包在网络中飞行,立即发送数据。
  2. 如果有未确认的数据包,将新数据放入缓冲区等待,直到:
    • 之前的数据包被 ACK 确认,或者
    • 缓冲区累积到一个完整的 MSS(Maximum Segment Size,通常 1460 字节)

伪代码描述:

// Nagle 算法伪代码
void nagle_send(socket *s, const char *data, size_t len) {
    if (s->unacked_bytes == 0) {
        // 没有未确认的数据,立即发送
        tcp_send(s, data, len);
        s->unacked_bytes += len;
    } else if (s->send_buffer_size + len >= MSS) {
        // 缓冲区凑够一个 MSS,立即发送
        buffer_append(s, data, len);
        tcp_send(s, s->send_buffer, s->send_buffer_size);
        s->unacked_bytes += s->send_buffer_size;
        buffer_clear(s);
    } else {
        // 既有未确认数据,又没凑够 MSS,等待
        buffer_append(s, data, len);
    }
}

// 收到 ACK 时
void on_ack_received(socket *s) {
    s->unacked_bytes = recalculate_unacked(s);
    if (s->send_buffer_size > 0 && s->unacked_bytes == 0) {
        tcp_send(s, s->send_buffer, s->send_buffer_size);
        s->unacked_bytes += s->send_buffer_size;
        buffer_clear(s);
    }
}

4.3 Nagle 与延迟 ACK 的交互

Nagle 算法单独看是合理的,但它和 TCP 延迟确认(Delayed ACK, RFC 1122)的交互会产生意外的延迟。

延迟 ACK 的规则是:收到数据包后不立即回复 ACK,而是等待最多 40ms(Linux 默认),希望在回复 ACK 的同时顺便携带应用层数据(ACK 搭便车)。

两者组合的糟糕场景:

  1. 客户端发送第一个小包(比如 HTTP 请求头),进入”等 ACK”状态
  2. 服务端收到请求头,延迟 ACK 启动 40ms 计时器,等待顺便搭载响应数据
  3. 服务端应用层还在处理请求,没有数据要发
  4. 客户端在 Nagle 等服务端的 ACK,服务端在延迟 ACK 等应用层数据
  5. 40ms 后延迟 ACK 超时,服务端发送纯 ACK
  6. 客户端收到 ACK,发送第二个小包(比如 HTTP 请求体)

本来几毫秒就能完成的两次发送,被拖到了 40ms 以上。这就是为什么几乎所有高性能网络程序都会关闭 Nagle 算法。

4.4 TCP_NODELAY:关闭 Nagle

设置 TCP_NODELAY 选项关闭 Nagle 算法,每次 write() 立即发送:

#include <sys/socket.h>
#include <netinet/tcp.h>

int enable_nodelay(int sockfd) {
    int flag = 1;
    return setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY,
                      &flag, sizeof(flag));
}

Go 中 net.TCPConn 默认就是 TCP_NODELAY = true

conn, err := net.Dial("tcp", "server:8080")
if err != nil {
    log.Fatal(err)
}
tcpConn := conn.(*net.TCPConn)
tcpConn.SetNoDelay(true) // Go 默认已开启

Rust 中使用 TcpStream

use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080")?;
    stream.set_nodelay(true)?;
    Ok(())
}

4.5 TCP_CORK:应用层的写合并

关闭 Nagle 解决了延迟问题,但又回到了小包问题——如果应用层频繁调用 write() 发送小数据,每次都会立即生成一个 TCP 段。

Linux 提供了 TCP_CORK 选项(又称 Nagle 的替代方案)。设置 TCP_CORK = 1 后,TCP 会把后续的 write() 数据累积在内核缓冲区中,直到:

这让应用层精确控制发送边界。典型用法是在构造一个完整响应时:

#include <sys/socket.h>
#include <netinet/tcp.h>
#include <string.h>
#include <unistd.h>

int send_http_response(int sockfd,
                       const char *headers,
                       const char *body,
                       size_t body_len) {
    int cork = 1;
    // 塞上瓶塞,开始积攒数据
    setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, &cork, sizeof(cork));

    // 多次 write 不会产生多个小包
    write(sockfd, headers, strlen(headers));
    write(sockfd, "\r\n", 2);
    write(sockfd, body, body_len);

    // 拔掉瓶塞,一次性发送
    cork = 0;
    setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, &cork, sizeof(cork));

    return 0;
}

4.6 TCP_NODELAY vs TCP_CORK

特性 TCP_NODELAY TCP_CORK
功能 关闭 Nagle 算法 阻止发送直到显式释放
可移植性 POSIX 标准,所有系统支持 Linux 特有(FreeBSD 有 TCP_NOPUSH)
控制粒度 连接级,一旦设置全程生效 可以反复开关,精确控制发送窗口
适用场景 交互式协议(SSH、游戏) 构造完整消息后一次发送
风险 可能产生大量小包 忘记解除 cork 会导致数据不发送
与 sendfile 配合 无特殊优势 能合并 sendfile 和前后的 write

在实际工程中,常见的组合策略是:设置 TCP_NODELAY = true 作为基线(避免 Nagle-Delayed ACK 交互),然后在需要合并写入时使用 writev()(Scatter/Gather I/O)代替 TCP_CORKwritev() 是 POSIX 标准,可移植性更好:

#include <sys/uio.h>
#include <string.h>

int send_response_writev(int sockfd,
                         const char *headers,
                         const char *body,
                         size_t body_len) {
    struct iovec iov[3];
    iov[0].iov_base = (void *)headers;
    iov[0].iov_len = strlen(headers);
    iov[1].iov_base = "\r\n";
    iov[1].iov_len = 2;
    iov[2].iov_base = (void *)body;
    iov[2].iov_len = body_len;

    return writev(sockfd, iov, 3);
}

五、I/O 调度与写合并

5.1 磁盘 I/O 调度器

操作系统的 I/O 调度器(I/O Scheduler)对写吞吐量有直接影响。Linux 提供多种调度算法:

调度器 策略 适用场景
noop / none 不做排序,先来先服务 SSD、NVMe(硬件自带调度)
mq-deadline 保证读写请求在截止时间内完成 数据库、延迟敏感型负载
bfq 按进程公平分配带宽 桌面、多租户系统
kyber 针对快速设备的低延迟调度 NVMe 高 IOPS 场景

对于数据库服务器使用 SSD 的场景,通常选择 none(NVMe)或 mq-deadline(SATA SSD),让硬件自身的 FTL(Flash Translation Layer)处理写合并和磨损均衡。

5.2 写合并(Write Combining)

写合并(Write Combining)是将多次小写操作合并为一次大写操作的技术,在多个层次上存在:

CPU 层面: Write Combining Buffer(WCB)将对同一缓存行的多次写入合并为一次总线事务。这对显存写入尤其重要,GPU 驱动通常把显存区域映射为 Write-Combining 内存类型。

文件系统层面: ext4 的 Journal 模式会把多个小写操作合并到日志缓冲区,然后一次性刷盘。XFS 的延迟分配(Delayed Allocation)更进一步,在 write() 时不立即分配磁盘块,而是累积多次写入后批量分配连续的磁盘空间,减少碎片。

应用层面: 使用 BufferedWriter(Java)、bufio.Writer(Go)等缓冲写入器,减少系统调用次数:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class BufferedWriteExample {
    public static void writeLines(String path, Iterable<String> lines)
            throws IOException {
        // 默认缓冲区 8KB,可自定义
        try (BufferedWriter writer =
                new BufferedWriter(new FileWriter(path), 65536)) {
            for (String line : lines) {
                writer.write(line);
                writer.newLine();
            }
            // close() 自动 flush
        }
    }
}

5.3 fsync 的成本与策略

fsync() 把文件数据和元数据从页缓存(Page Cache)刷到磁盘,是保证持久性的关键调用,也是写吞吐量的最大瓶颈之一。

一次 fsync() 在不同存储介质上的典型耗时:

存储介质 fsync 延迟 对应 IOPS 上限
HDD 7200rpm 5-10ms 100-200
SATA SSD 0.1-0.5ms 2,000-10,000
NVMe SSD 0.02-0.1ms 10,000-100,000
Intel Optane 0.01ms 100,000+
电池备份 RAID 控制器 <0.01ms 立即返回

数据库系统通过”组提交”(Group Commit)优化 fsync 的影响:多个事务共享一次 fsync。MySQL InnoDB 的 innodb_flush_log_at_trx_commit 参数控制刷盘策略:


六、并发模型对吞吐量的影响

6.1 线程模型

不同的并发模型(Concurrency Model)对吞吐量有根本性影响。

每请求一线程(Thread-per-Request): 最简单的模型。每个请求分配一个操作系统线程。线程数受限于内存(每个线程约 1MB 栈空间)和上下文切换开销。典型上限:几千个并发连接。

import java.net.ServerSocket;
import java.net.Socket;

public class ThreadPerRequestServer {
    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(8080);
        while (true) {
            Socket client = server.accept();
            new Thread(() -> handleRequest(client)).start();
        }
    }

    static void handleRequest(Socket client) {
        // 处理请求
    }
}

线程池(Thread Pool): 限制线程数量,用队列缓冲请求。解决了线程爆炸问题,但线程在 I/O 阻塞时被占用,无法服务其他请求。

事件驱动(Event-Driven): 单线程或少量线程通过非阻塞 I/O 和事件循环处理大量并发连接。代表:Nginx、Node.js、Redis。吞吐量高但编程模型复杂,不适合 CPU 密集型任务。

协程/用户态线程: Go 的 goroutine、Java 21 的虚拟线程(Virtual Thread)。保留同步编程风格的同时实现高并发。goroutine 栈初始只有几 KB,可以轻松创建数十万个。

6.2 并发模型对比

模型 并发上限 编程复杂度 CPU 利用率 I/O 密集型 CPU 密集型
每请求一线程 数千 低(上下文切换多) 中等
线程池 数千~数万 中等 中等 良好
事件驱动 数十万 高(回调地狱) 优秀 差(阻塞事件循环)
协程 数十万~百万 低(同步风格) 优秀 良好(配合多核调度)
Actor 模型 数百万 中等 优秀 良好

6.3 Java 虚拟线程实战

Java 21 正式引入虚拟线程(Virtual Thread),在 I/O 密集型场景下能显著提升吞吐量:

import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadServer {
    public static void main(String[] args) throws Exception {
        // 虚拟线程执行器:每个任务一个虚拟线程,没有线程数上限
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

        try (ServerSocket server = new ServerSocket(8080)) {
            while (true) {
                Socket client = server.accept();
                executor.submit(() -> handleRequest(client));
            }
        }
    }

    static void handleRequest(Socket client) {
        // 虚拟线程在 I/O 阻塞时自动让出载体线程
        // 不需要修改业务代码
    }
}

虚拟线程的吞吐量优势来自两个方面:

  1. 栈内存极小(初始几百字节 vs 操作系统线程的 1MB),可以创建百万级虚拟线程
  2. I/O 阻塞时自动让出底层平台线程(Carrier Thread),不浪费 CPU 时间片

6.4 Rust 的 async/await 与 Tokio

Rust 的 async/await 编译为状态机,配合 Tokio 运行时实现高效的用户态调度:

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("0.0.0.0:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            loop {
                let n = match socket.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(_) => return,
                };
                if socket.write_all(&buf[..n]).await.is_err() {
                    return;
                }
            }
        });
    }
}

Tokio 的工作窃取调度器(Work-Stealing Scheduler)在多核环境下自动平衡负载,空闲线程会从忙线程的队列中窃取任务,减少了手动调优线程池大小的负担。


七、工程案例:Kafka 生产者吞吐量调优

7.1 背景

某在线广告平台的点击事件采集系统需要将用户点击数据写入 Kafka。业务需求:

7.2 初始配置与问题

使用默认配置的 Kafka 生产者,测试结果:

吞吐量:8 万事件/秒
P99 延迟:5ms
CPU 使用率:15%
网络带宽使用率:3%

吞吐量远低于需求。CPU 和网络都没有成为瓶颈,问题出在哪里?

7.3 分析过程

第一步:识别瓶颈。 使用 Kafka 生产者的 JMX 指标 record-send-ratebatch-size-avgrequest-latency-avg 分析:

batch-size-avg: 1200 字节(远小于默认 batch.size 的 16KB)
record-queue-time-avg: 0.5ms
request-latency-avg: 3ms
records-per-request-avg: 6

平均每批只有 6 条记录,1200 字节。批太小了。原因:linger.ms 默认为 0,消息到达后立即发送,来不及凑批。

第二步:调大 linger.ms。linger.ms = 5,让生产者等待 5ms 凑批:

吞吐量:25 万事件/秒
batch-size-avg: 12KB
records-per-request-avg: 60
P99 延迟:12ms

吞吐量提升 3 倍,但还不够。

第三步:增大 batch.size 和缓冲区。batch.size = 131072(128KB),buffer.memory = 134217728(128MB):

吞吐量:42 万事件/秒
batch-size-avg: 45KB
records-per-request-avg: 225
P99 延迟:18ms

第四步:启用压缩。compression.type = lz4,压缩在批级别进行,批越大压缩率越高:

吞吐量:55 万事件/秒
batch-size-avg: 48KB(压缩前)
compressed-size-avg: 14KB(压缩后)
P99 延迟:22ms
网络带宽使用率:8%

第五步:增加生产者线程数。 单个生产者实例受限于单线程的序列化和压缩。启动 4 个生产者实例,每个绑定不同的分区范围:

总吞吐量:65 万事件/秒
P99 延迟:25ms
CPU 使用率:55%

7.4 最终配置

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
props.put("acks", "1");
props.put("batch.size", 131072);         // 128KB
props.put("linger.ms", 5);              // 等待 5ms 凑批
props.put("buffer.memory", 134217728);   // 128MB 总缓冲
props.put("compression.type", "lz4");    // LZ4 压缩
props.put("max.in.flight.requests.per.connection", 5);
props.put("send.buffer.bytes", 1048576); // 1MB TCP 发送缓冲区

7.5 调优总结

flowchart TD
    A["识别瓶颈<br/>JMX 指标分析"] --> B{"批大小是否<br/>接近 batch.size?"}
    B -- 否 --> C["增大 linger.ms<br/>给更多时间凑批"]
    B -- 是 --> D{"压缩是否启用?"}
    C --> B
    D -- 否 --> E["启用 LZ4 压缩<br/>减少网络传输量"]
    D -- 是 --> F{"CPU 是否瓶颈?"}
    E --> D
    F -- 否 --> G["增大 batch.size<br/>增大 buffer.memory"]
    F -- 是 --> H["增加生产者实例<br/>分担序列化负载"]
    G --> F
    H --> I["达到目标吞吐量"]

八、工程案例:数据库批量写入优化

8.1 PostgreSQL COPY 协议

对于大批量数据导入,PostgreSQL 的 COPY 命令比 INSERT 快一个数量级。COPY 使用专用的二进制协议,跳过 SQL 解析、查询规划、逐行事务等开销:

-- INSERT 方式:100 万行约 120 秒
INSERT INTO events (ts, user_id, event_type, payload)
VALUES ('2026-01-01', 1, 'click', '{}');
-- 重复 100 万次...

-- COPY 方式:100 万行约 5 秒
COPY events (ts, user_id, event_type, payload)
FROM STDIN WITH (FORMAT csv);
2026-01-01,1,click,{}
2026-01-01,2,view,{}
-- ...
\.

Go 中使用 pgx 库的 CopyFrom

func BulkInsertEvents(ctx context.Context, pool *pgxpool.Pool,
    events []Event) (int64, error) {

    rows := make([][]interface{}, len(events))
    for i, e := range events {
        rows[i] = []interface{}{e.Ts, e.UserID, e.EventType, e.Payload}
    }

    return pool.CopyFrom(
        ctx,
        pgx.Identifier{"events"},
        []string{"ts", "user_id", "event_type", "payload"},
        pgx.CopyFromRows(rows),
    )
}

8.2 批量写入的索引策略

大批量写入时,索引维护是主要开销。每插入一行,B-Tree 索引需要找到插入位置、可能触发页分裂。当批量写入 100 万行时,索引维护可能占据 60% 以上的总时间。

优化策略:

  1. 先删索引后重建: 对于初始加载场景,先 DROP INDEX,加载数据,再 CREATE INDEX。批量构建索引比逐行维护快得多,因为可以使用排序-合并算法一次性构建 B-Tree。

  2. 调大 maintenance_work_mem: PostgreSQL 构建索引时使用 maintenance_work_mem 作为排序缓冲区。默认 64MB,大批量加载时建议调到 1GB 以上。

  3. 禁用 WAL: PostgreSQL 的 UNLOGGED TABLE 跳过 WAL(Write-Ahead Log)写入,性能提升 2-3 倍,但数据库崩溃时会丢数据。适用于临时表或可重建的数据。


九、实用批处理模式

9.1 令牌桶与批量限流

高吞吐系统通常需要限流(Rate Limiting)。令牌桶(Token Bucket)算法在批处理场景下的变种是:不是每次请求消耗一个令牌,而是每批消耗令牌数等于批大小。这避免了批内逐条检查限流的开销:

package main

import (
    "context"
    "time"

    "golang.org/x/time/rate"
)

func FlushWithRateLimit(ctx context.Context,
    limiter *rate.Limiter, batch [][]byte) error {
    // 整批一次性申请令牌,避免逐条检查的开销
    if err := limiter.WaitN(ctx, len(batch)); err != nil {
        return err
    }
    return processBatch(batch)
}

func processBatch(batch [][]byte) error {
    return nil
}

func main() {
    _ = rate.NewLimiter(rate.Every(time.Second/10000), 1000)
}

9.2 分区并行批处理

当目标系统支持分区(如 Kafka Topic 的 Partition、数据库的分表),可以按分区分别凑批、并行发送。每个分区有独立的缓冲区和刷新逻辑,互不阻塞:

package main

import (
    "fmt"
    "hash/fnv"
)

type PartitionedBatcher struct {
    partitions int
    batchers   []*AdaptiveBatcher[Record]
}

type Record struct {
    Key   string
    Value []byte
}

func NewPartitionedBatcher(partitions int) *PartitionedBatcher {
    pb := &PartitionedBatcher{
        partitions: partitions,
        batchers:   make([]*AdaptiveBatcher[Record], partitions),
    }
    for i := 0; i < partitions; i++ {
        partID := i
        pb.batchers[i] = NewAdaptiveBatcher[Record](
            10, 500, 0,
            func(batch []Record) error {
                fmt.Printf("flushing %d records to partition %d\n",
                    len(batch), partID)
                return nil
            },
        )
    }
    return pb
}

func (pb *PartitionedBatcher) Add(record Record) error {
    h := fnv.New32a()
    h.Write([]byte(record.Key))
    partition := int(h.Sum32()) % pb.partitions
    return pb.batchers[partition].Add(record)
}

十、吞吐量优化的系统性方法

10.1 优化步骤

吞吐量优化不是随机调参,而是一个系统性的排查过程:

  1. 确定目标。 明确需要多少吞吐量、在什么延迟约束下。没有明确目标的优化是浪费时间。

  2. 测量基线。 在当前配置下测量实际吞吐量。不要猜测,不要依赖理论计算。用真实负载或接近真实的压测工具(如 wrk、vegeta、k6)。

  3. 识别瓶颈。perfstracebpftraceasync-profiler 等工具定位瓶颈。常见瓶颈按优先级排序:

    • 锁竞争(检查 perf lockmutex contention 指标)
    • I/O 等待(检查 iostatiotop
    • 网络延迟(检查 ss -titcpdump
    • CPU 饱和(检查 mpstathtop
    • 内存不足导致的 GC 或 swap
  4. 针对性优化。 只优化瓶颈环节。优化非瓶颈环节不会提升整体吞吐量(Amdahl 定律的推论)。

  5. 验证效果。 重新测量,确认吞吐量提升符合预期,延迟没有超出 SLA。

  6. 回到第 3 步。 瓶颈可能转移。原来的 I/O 瓶颈解决后,CPU 可能成为新的瓶颈。

10.2 常见反模式

反模式一:过早优化并发度。 在没有识别瓶颈的情况下就增加线程数。如果瓶颈是磁盘 I/O,增加线程只会增加锁争用,不会提升吞吐量。

反模式二:忽略延迟约束。 通过无限增大批大小来追求极端吞吐量,导致延迟不可控。批处理的吞吐量和延迟是一对权衡,必须在 SLA 约束下优化。

反模式三:微观优化。 花大量时间优化单条记录的处理时间(比如用 SIMD 加速 JSON 解析),但瓶颈实际在网络 RTT 或磁盘 fsync。方向错误的优化,无论多精妙都是浪费。

反模式四:忽略尾部延迟。 只看平均吞吐量和平均延迟,忽略 P99/P999 的毛刺。批处理和缓冲区溢出常常导致尾部延迟爆炸。

10.3 吞吐量 vs 延迟的全局视角

吞吐量和延迟的关系可以用排队论的经典模型描述。在 M/M/1 队列中,系统利用率 \(\rho = \frac{\lambda}{\mu}\)(到达率 / 服务率),平均等待时间为:

\[W = \frac{1}{\mu - \lambda} = \frac{1}{\mu(1 - \rho)}\]

\(\rho\) 接近 1(吞吐量接近系统容量)时,等待时间趋向无穷。这意味着:

实际工程中,通常把目标利用率控制在 60-70%,为流量尖峰留出余量。超过 80% 的持续利用率几乎必然导致延迟问题。


十一、总结

吞吐量优化的三条路径各有适用场景:

技术 核心机制 适用场景 主要风险
批处理 分摊固定开销 I/O 密集、固定开销大 延迟增加、失败影响范围大
流水线 阶段重叠执行 多阶段串行处理 级间不平衡、延迟累加
并发 多处理单元并行 可并行化比例高 锁竞争、协调开销

三者可以组合使用。Kafka 生产者就是一个典型:批处理凑批减少网络请求次数,多分区并行发送提升并行度,生产者内部的 Sender 线程和应用线程形成两级流水线。

最重要的原则不是”哪种技术更好”,而是”瓶颈在哪里”。不识别瓶颈就开始优化,大概率走弯路。用测量替代猜测,用数据驱动决策。

下一篇将深入线程模型的细节:操作系统线程、用户态线程、协程、事件循环——它们的调度机制有什么区别,在不同负载模式下应该如何选择。


上一篇:【系统架构设计百科】延迟分析:从硬件时钟到尾部延迟

下一篇:【系统架构设计百科】线程模型:从操作系统线程到协程


参考资料

书籍

论文与 RFC

在线资源

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。

2026-04-13 · architecture

【系统架构设计百科】复杂性管理:架构的核心战场

系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略


By .