你负责的订单系统每天处理 500 万笔交易,数据库写入是瓶颈。DBA 告诉你 MySQL 单条 INSERT 的 TPS 上限大约 3000,但你用批量 INSERT 一次提交 500 行,TPS 跳到了 15 万行每秒——提升了 50 倍。你很高兴,把批大小调到 5000,结果 TPS 反而下降了,而且 P99 延迟从 20ms 涨到了 800ms。
为什么批量越大反而越慢?最优的批大小到底是多少?这个问题没有统一答案,因为它取决于网络 RTT、磁盘 IOPS、内存缓冲区大小、事务日志刷盘策略等多个变量。但分析的方法是确定的。
这篇文章讨论三件事:批处理(Batching)如何减少固定开销、流水线(Pipelining)如何掩盖延迟、并发模型如何影响吞吐量上限。它们是提升单机吞吐量的三条基本路径。
一、吞吐量的理论上限
1.1 定义与度量
吞吐量(Throughput)是单位时间内系统完成的工作量。度量单位因场景而异:
| 场景 | 吞吐量单位 |
|---|---|
| Web 服务 | 请求/秒(RPS) |
| 数据库 | 事务/秒(TPS)或行/秒 |
| 消息队列 | 消息/秒或 MB/秒 |
| 网络 | 比特/秒(bps)或包/秒(pps) |
| 磁盘 | IOPS 或 MB/秒 |
| CPU | 指令/周期(IPC) |
吞吐量和延迟(Latency)是一对共生指标。降低单次操作的延迟通常会提升吞吐量,但反过来不一定成立——批处理可以大幅提升吞吐量,但单条记录的延迟反而会增加,因为每条记录要等到凑够一批才被处理。这是吞吐量优化中最常见的权衡。
1.2 Amdahl 定律在吞吐量上的应用
Amdahl 定律(Amdahl’s Law)最初描述的是并行化对程序加速比的限制。原始公式:
\[S = \frac{1}{(1 - P) + \frac{P}{N}}\]
其中 \(S\) 是加速比,\(P\) 是可并行化的比例,\(N\) 是并行度(核心数、线程数等)。
这个公式直接约束吞吐量上限。假设一个请求的处理时间中 80% 可以并行化(\(P = 0.8\)),20% 是串行的(锁竞争、日志刷盘、全局状态更新等),那么:
- 1 个核心:加速比 1.0
- 4 个核心:加速比 \(\frac{1}{0.2 + 0.2} = 2.5\)
- 16 个核心:加速比 \(\frac{1}{0.2 + 0.05} = 4.0\)
- 64 个核心:加速比 \(\frac{1}{0.2 + 0.0125} \approx 4.7\)
- 无穷核心:加速比上限 \(\frac{1}{0.2} = 5.0\)
20% 的串行部分把吞吐量上限锁死在单核的 5 倍。增加到 64 核只用到了理论上限的 94%,剩下的 6% 需要从减少串行部分入手,而不是增加核心数。
这给吞吐量优化指出了一个基本方向:在增加并行度之前,先减少串行部分的比例。常见的串行瓶颈包括:
- 全局锁(Global Lock):Python 的 GIL、数据库的表级锁
- 共享状态更新:计数器、序列号生成器
- 磁盘同步写入:事务日志的 fsync
- 网络协议的串行约束:TCP 的有序交付
1.3 Universal Scalability Law
Amdahl 定律假设并行化没有额外开销,但现实中并行度增加会带来协调成本。Neil Gunther 提出的通用可扩展性定律(Universal Scalability Law, USL)增加了一个相干性延迟项(Coherency Delay):
\[C(N) = \frac{N}{1 + \sigma(N-1) + \kappa N(N-1)}\]
其中 \(\sigma\) 是串行化系数(对应 Amdahl 的 \(1-P\)),\(\kappa\) 是相干性系数。当 \(\kappa > 0\) 时,吞吐量在某个 \(N\) 之后会下降,而不仅仅是增长放缓。
这解释了一个常见现象:线程池从 16 调到 32,吞吐量没变;调到 64,吞吐量反而降了。\(\kappa\) 项捕捉的是缓存失效、锁争用、上下文切换等随并行度二次增长的开销。
graph LR
subgraph 吞吐量与并行度的关系
A["线性扩展<br/>理想情况"] --> B["Amdahl 上限<br/>增长放缓"]
B --> C["USL 回退<br/>吞吐量下降"]
end
style A fill:#d4edda,stroke:#28a745
style B fill:#fff3cd,stroke:#ffc107
style C fill:#f8d7da,stroke:#dc3545
1.4 瓶颈识别:利特尔定律
利特尔定律(Little’s Law)是排队论的基础公式:
\[L = \lambda \times W\]
其中 \(L\) 是系统中的平均请求数,\(\lambda\) 是吞吐量(到达率),\(W\) 是平均逗留时间(延迟)。
这个公式的工程意义在于:已知任意两个变量就能推出第三个。当你测量到系统中平均有 100 个并发请求(\(L = 100\)),平均延迟是 50ms(\(W = 0.05s\)),那么吞吐量就是 \(\lambda = 100 / 0.05 = 2000\) RPS。如果你想把吞吐量提到 4000 RPS 而不增加延迟,就需要系统能容纳 200 个并发请求——这意味着线程池、连接池、内存都要翻倍。
二、批处理:减少固定开销
2.1 为什么批处理有效
每次 I/O 操作都有固定开销(Fixed Overhead)和可变开销(Variable Overhead)。以数据库 INSERT 为例:
- 固定开销:网络 RTT(1ms)、SQL 解析(0.1ms)、事务开始/提交(0.5ms)、日志刷盘(1ms)
- 可变开销:每行数据的写入(0.01ms/行)
单条 INSERT 的总耗时 ≈ 2.6ms + 0.01ms = 2.61ms,吞吐量 ≈ 383 行/秒。
100 条批量 INSERT 的总耗时 ≈ 2.6ms + 1ms = 3.6ms(可变部分增加),吞吐量 ≈ 100/3.6ms ≈ 27,778 行/秒。
批处理把固定开销在多条记录间分摊,减少了每条记录的平均处理时间。这是批处理提升吞吐量的核心机制。
2.2 最优批大小分析
设固定开销为 \(F\),每条记录的可变开销为 \(V\),批大小为 \(B\),则:
- 一批的处理时间:\(T_{batch} = F + B \times V\)
- 每条记录的平均时间:\(T_{avg} = \frac{F}{B} + V\)
- 吞吐量:\(\lambda = \frac{B}{F + B \times V}\)
当 \(B \to \infty\) 时,\(T_{avg} \to V\),吞吐量趋近 \(\frac{1}{V}\)。看起来批越大越好,但实际中存在三个约束:
约束一:延迟约束。 凑齐一批需要等待时间。如果请求到达率为 \(r\),凑齐 \(B\) 条的等待时间为 \(\frac{B}{r}\)。当 \(B\) 增大,单条记录从到达到被处理的延迟线性增长。如果系统有延迟 SLA(例如 P99 < 100ms),就必须限制 \(B\)。
约束二:内存约束。 缓冲区大小有限。\(B\) 条记录需要 \(B \times S\) 字节的内存(\(S\) 是单条记录大小)。当 \(B\) 太大,可能触发 GC、OOM 或缓冲区溢出。
约束三:非线性开销。 实际中可变开销不是常数。数据库批量 INSERT 在行数超过某个阈值后,日志写入从顺序变为随机、索引维护成本急剧增加、锁持有时间过长导致其他事务阻塞。这使得 \(V\) 变成 \(V(B)\),是关于 \(B\) 的递增函数。
最优批大小通常需要实测。一种实用方法是:固定延迟预算 \(D\),求满足 \(\frac{B}{r} + F + B \times V \leq D\) 的最大 \(B\)。
2.3 批处理的经典模式
数据库批量写入
MySQL 的多行 INSERT 语法:
-- 单条插入:每条都要解析、开事务、提交、刷日志
INSERT INTO orders (user_id, amount) VALUES (1, 100);
INSERT INTO orders (user_id, amount) VALUES (2, 200);
-- 批量插入:一次解析、一次事务、一次刷日志
INSERT INTO orders (user_id, amount) VALUES
(1, 100),
(2, 200),
(3, 300),
-- ... 更多行
(500, 50000);Go 语言中实现批量写入的常见模式:
package main
import (
"context"
"database/sql"
"fmt"
"strings"
)
const batchSize = 500
type Order struct {
UserID int64
Amount int64
}
func BatchInsertOrders(ctx context.Context, db *sql.DB, orders []Order) error {
for i := 0; i < len(orders); i += batchSize {
end := i + batchSize
if end > len(orders) {
end = len(orders)
}
batch := orders[i:end]
valueStrings := make([]string, 0, len(batch))
valueArgs := make([]interface{}, 0, len(batch)*2)
for j, o := range batch {
valueStrings = append(valueStrings,
fmt.Sprintf("($%d, $%d)", j*2+1, j*2+2))
valueArgs = append(valueArgs, o.UserID, o.Amount)
}
query := fmt.Sprintf(
"INSERT INTO orders (user_id, amount) VALUES %s",
strings.Join(valueStrings, ","))
_, err := db.ExecContext(ctx, query, valueArgs...)
if err != nil {
return fmt.Errorf("batch insert at offset %d: %w", i, err)
}
}
return nil
}消息队列批量发送
Kafka 生产者的批处理由两个参数控制:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class BatchProducerExample {
public static KafkaProducer<String, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 批处理核心参数
// batch.size: 单个分区的批缓冲区大小(字节)
// 默认 16384(16KB),增大可提升吞吐量
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB
// linger.ms: 等待凑批的最大时间(毫秒)
// 默认 0(立即发送),增大可让更多消息凑成一批
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待 10ms
// 压缩:批越大压缩效果越好
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// 内存缓冲区总大小,限制所有分区的批缓冲总量
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB
return new KafkaProducer<>(props);
}
public static void main(String[] args) {
try (KafkaProducer<String, String> producer = createProducer()) {
for (int i = 0; i < 1000000; i++) {
producer.send(new ProducerRecord<>(
"orders", String.valueOf(i), "order-data-" + i));
}
}
}
}batch.size 和 linger.ms
形成一个”或”关系:哪个条件先满足就触发发送。batch.size = 64KB, linger.ms = 10
的含义是”凑够 64KB 或等满 10ms,先到的触发发送”。
2.4 批处理的权衡
| 维度 | 小批量(B < 100) | 中批量(100 < B < 1000) | 大批量(B > 1000) |
|---|---|---|---|
| 吞吐量 | 低,固定开销占比大 | 高,固定开销被充分分摊 | 可能回退,非线性开销显现 |
| 延迟 | 低,几乎不等待 | 中等,等待凑批 | 高,等待时间长 |
| 内存 | 低 | 中等 | 高,可能 OOM |
| 失败影响 | 小,重试代价低 | 中等 | 大,整批失败需重试 |
| 事务完整性 | 容易保证 | 需要注意事务大小 | 可能超过数据库事务限制 |
| 背压处理 | 灵活 | 需要控制缓冲区 | 容易造成上游阻塞 |
2.5 自适应批大小
固定批大小在负载变化时表现不佳。高负载时批很快凑满,低负载时等待超时才发送,浪费延迟预算。自适应策略根据当前负载动态调整批大小:
package main
import (
"sync"
"time"
)
type AdaptiveBatcher[T any] struct {
mu sync.Mutex
buffer []T
minBatch int
maxBatch int
maxWait time.Duration
flush func([]T) error
timer *time.Timer
}
func NewAdaptiveBatcher[T any](
minBatch, maxBatch int,
maxWait time.Duration,
flush func([]T) error,
) *AdaptiveBatcher[T] {
return &AdaptiveBatcher[T]{
minBatch: minBatch,
maxBatch: maxBatch,
maxWait: maxWait,
flush: flush,
}
}
func (b *AdaptiveBatcher[T]) Add(item T) error {
b.mu.Lock()
b.buffer = append(b.buffer, item)
if len(b.buffer) >= b.maxBatch {
batch := b.buffer
b.buffer = nil
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
b.mu.Unlock()
return b.flush(batch)
}
if b.timer == nil {
b.timer = time.AfterFunc(b.maxWait, func() {
b.mu.Lock()
if len(b.buffer) > 0 {
batch := b.buffer
b.buffer = nil
b.timer = nil
b.mu.Unlock()
_ = b.flush(batch)
return
}
b.timer = nil
b.mu.Unlock()
})
}
b.mu.Unlock()
return nil
}三、流水线:掩盖延迟
3.1 流水线的基本原理
批处理减少的是固定开销的次数,流水线解决的是另一个问题:让多个阶段重叠执行。
经典的 CPU 指令流水线(Instruction Pipeline)把一条指令的执行分为取指(Fetch)、译码(Decode)、执行(Execute)、访存(Memory)、写回(Writeback)五个阶段。没有流水线时,一条指令 5 个周期,10 条指令需要 50 个周期。有流水线时,第一条指令进入执行阶段的同时第二条指令进入译码阶段、第三条指令进入取指阶段——10 条指令只需要 14 个周期(5 + 9 × 1)。
软件系统中同样存在多阶段处理。一个 HTTP 请求的处理可以分为:接收请求 → 认证鉴权 → 业务逻辑 → 数据库查询 → 序列化响应 → 发送响应。如果每个阶段分配给不同的处理单元,就可以在处理请求 A 的业务逻辑时,同时对请求 B 进行认证鉴权、接收请求 C。
3.2 流水线吞吐量分析
设流水线有 \(K\) 个阶段,第 \(i\) 个阶段的处理时间为 \(t_i\),则:
- 流水线的周期(Cycle Time):\(T_{cycle} = \max(t_1, t_2, ..., t_K)\)
- 理论吞吐量:\(\lambda = \frac{1}{T_{cycle}}\)
- 处理 \(N\) 个任务的总时间:\(T_{total} = (K - 1) \times T_{cycle} + N \times T_{cycle} = (K + N - 1) \times T_{cycle}\)
关键结论:流水线的吞吐量由最慢的阶段决定。即使其他阶段都只需要 1ms,只要有一个阶段需要 10ms,整个流水线的吞吐量就被限制在 100 请求/秒。
3.3 级间平衡
流水线效率的核心是级间平衡(Stage Balancing)——让每个阶段的处理时间尽量相等。不平衡意味着快阶段在等慢阶段,计算资源被浪费。
三种实现平衡的方法:
方法一:拆分慢阶段。 如果数据库查询阶段需要 10ms,其他阶段只需要 2ms,可以把数据库查询拆成两个子阶段:先查缓存(2ms),缓存未命中才查数据库(8ms)。缓存命中率足够高时,平均处理时间接近 2ms。
方法二:并行化慢阶段。 不拆分阶段本身,而是在慢阶段部署多个处理器。如果数据库查询需要 10ms、其他阶段需要 2ms,在数据库查询阶段部署 5 个并行处理器,每个处理器 10ms 处理一个请求,5 个处理器交替工作,等效处理时间 2ms。
方法三:批量合并请求。 在慢阶段的入口设置缓冲区,把多个请求的同类操作合并成一次批量操作。例如把 5 个独立的数据库查询合并成一个包含 WHERE id IN (…) 的批量查询。
graph LR
subgraph 不平衡流水线
A1["接收<br/>2ms"] --> B1["认证<br/>2ms"]
B1 --> C1["查询 DB<br/>10ms ⬤"]
C1 --> D1["序列化<br/>2ms"]
end
subgraph 并行化后
A2["接收<br/>2ms"] --> B2["认证<br/>2ms"]
B2 --> C2a["DB Worker 1<br/>10ms"]
B2 --> C2b["DB Worker 2<br/>10ms"]
B2 --> C2c["DB Worker 3<br/>10ms"]
B2 --> C2d["DB Worker 4<br/>10ms"]
B2 --> C2e["DB Worker 5<br/>10ms"]
C2a --> D2["序列化<br/>2ms"]
C2b --> D2
C2c --> D2
C2d --> D2
C2e --> D2
end
3.4 Go 语言中的流水线实现
Go 的 channel 天然适合流水线模式。每个阶段是一个 goroutine,阶段间通过 channel 连接:
package main
import (
"context"
"fmt"
"sync"
)
type Request struct {
ID int
Data string
}
type Result struct {
ID int
Output string
}
// 阶段一:接收与验证
func validate(ctx context.Context, in <-chan Request) <-chan Request {
out := make(chan Request)
go func() {
defer close(out)
for req := range in {
if req.Data == "" {
continue
}
select {
case out <- req:
case <-ctx.Done():
return
}
}
}()
return out
}
// 阶段二:业务处理(慢阶段,部署多个 worker 并行化)
func process(ctx context.Context, in <-chan Request, workers int) <-chan Result {
out := make(chan Result)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for req := range in {
result := Result{
ID: req.ID,
Output: fmt.Sprintf("processed-%s", req.Data),
}
select {
case out <- result:
case <-ctx.Done():
return
}
}
}()
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// 阶段三:结果收集
func collect(ctx context.Context, in <-chan Result) []Result {
var results []Result
for r := range in {
results = append(results, r)
}
return results
}
func main() {
ctx := context.Background()
input := make(chan Request, 100)
go func() {
defer close(input)
for i := 0; i < 10000; i++ {
input <- Request{ID: i, Data: fmt.Sprintf("order-%d", i)}
}
}()
validated := validate(ctx, input)
processed := process(ctx, validated, 8) // 8 个 worker 并行处理慢阶段
results := collect(ctx, processed)
fmt.Printf("processed %d results\n", len(results))
}3.5 流水线的陷阱
陷阱一:流水线气泡(Pipeline Bubble)。 当某个阶段因异常或条件判断跳过时,后续阶段无事可做,产生空闲周期。CPU 的分支预测错误会导致流水线冲刷(Pipeline Flush),软件流水线中的错误处理也会造成类似问题。
陷阱二:级间缓冲区溢出。 快阶段生产速度大于慢阶段消费速度,中间的缓冲区会不断增长。必须设置有界缓冲区(Bounded Buffer)并实现背压(Backpressure)。Go channel 的容量限制就是天然的背压机制。
陷阱三:延迟累加。 流水线提升吞吐量但不降低单个请求的延迟。一个 \(K\) 阶段的流水线,单个请求的延迟是所有阶段之和,至少 \(K \times T_{cycle}\)。如果 \(K\) 太大,延迟会超过 SLA。
四、Nagle 算法与 TCP 写优化
4.1 小包问题
TCP 每个数据包(Segment)都有固定的头部开销:20 字节 IP 头 + 20 字节 TCP 头 = 40 字节。如果应用层每次只发送 1 字节数据,有效载荷率只有 \(\frac{1}{41} \approx 2.4\%\),97.6% 的带宽浪费在头部上。这在 1984 年被 John Nagle 称为”小包问题”(Small Packet Problem),在当时的低带宽网络中尤其严重。
4.2 Nagle 算法的机制
Nagle 算法(RFC 896)的规则很简单:
- 如果当前没有未被确认(unacknowledged)的数据包在网络中飞行,立即发送数据。
- 如果有未确认的数据包,将新数据放入缓冲区等待,直到:
- 之前的数据包被 ACK 确认,或者
- 缓冲区累积到一个完整的 MSS(Maximum Segment Size,通常 1460 字节)
伪代码描述:
// Nagle 算法伪代码
void nagle_send(socket *s, const char *data, size_t len) {
if (s->unacked_bytes == 0) {
// 没有未确认的数据,立即发送
tcp_send(s, data, len);
s->unacked_bytes += len;
} else if (s->send_buffer_size + len >= MSS) {
// 缓冲区凑够一个 MSS,立即发送
buffer_append(s, data, len);
tcp_send(s, s->send_buffer, s->send_buffer_size);
s->unacked_bytes += s->send_buffer_size;
buffer_clear(s);
} else {
// 既有未确认数据,又没凑够 MSS,等待
buffer_append(s, data, len);
}
}
// 收到 ACK 时
void on_ack_received(socket *s) {
s->unacked_bytes = recalculate_unacked(s);
if (s->send_buffer_size > 0 && s->unacked_bytes == 0) {
tcp_send(s, s->send_buffer, s->send_buffer_size);
s->unacked_bytes += s->send_buffer_size;
buffer_clear(s);
}
}4.3 Nagle 与延迟 ACK 的交互
Nagle 算法单独看是合理的,但它和 TCP 延迟确认(Delayed ACK, RFC 1122)的交互会产生意外的延迟。
延迟 ACK 的规则是:收到数据包后不立即回复 ACK,而是等待最多 40ms(Linux 默认),希望在回复 ACK 的同时顺便携带应用层数据(ACK 搭便车)。
两者组合的糟糕场景:
- 客户端发送第一个小包(比如 HTTP 请求头),进入”等 ACK”状态
- 服务端收到请求头,延迟 ACK 启动 40ms 计时器,等待顺便搭载响应数据
- 服务端应用层还在处理请求,没有数据要发
- 客户端在 Nagle 等服务端的 ACK,服务端在延迟 ACK 等应用层数据
- 40ms 后延迟 ACK 超时,服务端发送纯 ACK
- 客户端收到 ACK,发送第二个小包(比如 HTTP 请求体)
本来几毫秒就能完成的两次发送,被拖到了 40ms 以上。这就是为什么几乎所有高性能网络程序都会关闭 Nagle 算法。
4.4 TCP_NODELAY:关闭 Nagle
设置 TCP_NODELAY 选项关闭 Nagle 算法,每次
write() 立即发送:
#include <sys/socket.h>
#include <netinet/tcp.h>
int enable_nodelay(int sockfd) {
int flag = 1;
return setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY,
&flag, sizeof(flag));
}Go 中 net.TCPConn 默认就是
TCP_NODELAY = true:
conn, err := net.Dial("tcp", "server:8080")
if err != nil {
log.Fatal(err)
}
tcpConn := conn.(*net.TCPConn)
tcpConn.SetNoDelay(true) // Go 默认已开启Rust 中使用 TcpStream:
use std::net::TcpStream;
fn main() -> std::io::Result<()> {
let stream = TcpStream::connect("127.0.0.1:8080")?;
stream.set_nodelay(true)?;
Ok(())
}4.5 TCP_CORK:应用层的写合并
关闭 Nagle
解决了延迟问题,但又回到了小包问题——如果应用层频繁调用
write() 发送小数据,每次都会立即生成一个 TCP
段。
Linux 提供了 TCP_CORK 选项(又称 Nagle
的替代方案)。设置 TCP_CORK = 1 后,TCP
会把后续的 write()
数据累积在内核缓冲区中,直到:
- 应用层设置
TCP_CORK = 0(“拔掉瓶塞”),或 - 缓冲区满(MSS),或
- 超时(200ms)
这让应用层精确控制发送边界。典型用法是在构造一个完整响应时:
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <string.h>
#include <unistd.h>
int send_http_response(int sockfd,
const char *headers,
const char *body,
size_t body_len) {
int cork = 1;
// 塞上瓶塞,开始积攒数据
setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, &cork, sizeof(cork));
// 多次 write 不会产生多个小包
write(sockfd, headers, strlen(headers));
write(sockfd, "\r\n", 2);
write(sockfd, body, body_len);
// 拔掉瓶塞,一次性发送
cork = 0;
setsockopt(sockfd, IPPROTO_TCP, TCP_CORK, &cork, sizeof(cork));
return 0;
}4.6 TCP_NODELAY vs TCP_CORK
| 特性 | TCP_NODELAY | TCP_CORK |
|---|---|---|
| 功能 | 关闭 Nagle 算法 | 阻止发送直到显式释放 |
| 可移植性 | POSIX 标准,所有系统支持 | Linux 特有(FreeBSD 有 TCP_NOPUSH) |
| 控制粒度 | 连接级,一旦设置全程生效 | 可以反复开关,精确控制发送窗口 |
| 适用场景 | 交互式协议(SSH、游戏) | 构造完整消息后一次发送 |
| 风险 | 可能产生大量小包 | 忘记解除 cork 会导致数据不发送 |
| 与 sendfile 配合 | 无特殊优势 | 能合并 sendfile 和前后的 write |
在实际工程中,常见的组合策略是:设置
TCP_NODELAY = true 作为基线(避免 Nagle-Delayed
ACK 交互),然后在需要合并写入时使用
writev()(Scatter/Gather I/O)代替
TCP_CORK。writev() 是 POSIX
标准,可移植性更好:
#include <sys/uio.h>
#include <string.h>
int send_response_writev(int sockfd,
const char *headers,
const char *body,
size_t body_len) {
struct iovec iov[3];
iov[0].iov_base = (void *)headers;
iov[0].iov_len = strlen(headers);
iov[1].iov_base = "\r\n";
iov[1].iov_len = 2;
iov[2].iov_base = (void *)body;
iov[2].iov_len = body_len;
return writev(sockfd, iov, 3);
}五、I/O 调度与写合并
5.1 磁盘 I/O 调度器
操作系统的 I/O 调度器(I/O Scheduler)对写吞吐量有直接影响。Linux 提供多种调度算法:
| 调度器 | 策略 | 适用场景 |
|---|---|---|
| noop / none | 不做排序,先来先服务 | SSD、NVMe(硬件自带调度) |
| mq-deadline | 保证读写请求在截止时间内完成 | 数据库、延迟敏感型负载 |
| bfq | 按进程公平分配带宽 | 桌面、多租户系统 |
| kyber | 针对快速设备的低延迟调度 | NVMe 高 IOPS 场景 |
对于数据库服务器使用 SSD 的场景,通常选择
none(NVMe)或 mq-deadline(SATA
SSD),让硬件自身的 FTL(Flash Translation
Layer)处理写合并和磨损均衡。
5.2 写合并(Write Combining)
写合并(Write Combining)是将多次小写操作合并为一次大写操作的技术,在多个层次上存在:
CPU 层面: Write Combining Buffer(WCB)将对同一缓存行的多次写入合并为一次总线事务。这对显存写入尤其重要,GPU 驱动通常把显存区域映射为 Write-Combining 内存类型。
文件系统层面: ext4 的 Journal
模式会把多个小写操作合并到日志缓冲区,然后一次性刷盘。XFS
的延迟分配(Delayed Allocation)更进一步,在
write()
时不立即分配磁盘块,而是累积多次写入后批量分配连续的磁盘空间,减少碎片。
应用层面: 使用
BufferedWriter(Java)、bufio.Writer(Go)等缓冲写入器,减少系统调用次数:
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
public class BufferedWriteExample {
public static void writeLines(String path, Iterable<String> lines)
throws IOException {
// 默认缓冲区 8KB,可自定义
try (BufferedWriter writer =
new BufferedWriter(new FileWriter(path), 65536)) {
for (String line : lines) {
writer.write(line);
writer.newLine();
}
// close() 自动 flush
}
}
}5.3 fsync 的成本与策略
fsync() 把文件数据和元数据从页缓存(Page
Cache)刷到磁盘,是保证持久性的关键调用,也是写吞吐量的最大瓶颈之一。
一次 fsync()
在不同存储介质上的典型耗时:
| 存储介质 | fsync 延迟 | 对应 IOPS 上限 |
|---|---|---|
| HDD 7200rpm | 5-10ms | 100-200 |
| SATA SSD | 0.1-0.5ms | 2,000-10,000 |
| NVMe SSD | 0.02-0.1ms | 10,000-100,000 |
| Intel Optane | 0.01ms | 100,000+ |
| 电池备份 RAID 控制器 | <0.01ms | 立即返回 |
数据库系统通过”组提交”(Group Commit)优化
fsync 的影响:多个事务共享一次
fsync。MySQL InnoDB 的
innodb_flush_log_at_trx_commit
参数控制刷盘策略:
= 1:每次事务提交都 fsync(最安全,性能最低)= 2:每次提交写到 OS 缓存,每秒 fsync 一次(折中)= 0:每秒写入并 fsync 一次(最快,最多丢 1 秒数据)
六、并发模型对吞吐量的影响
6.1 线程模型
不同的并发模型(Concurrency Model)对吞吐量有根本性影响。
每请求一线程(Thread-per-Request): 最简单的模型。每个请求分配一个操作系统线程。线程数受限于内存(每个线程约 1MB 栈空间)和上下文切换开销。典型上限:几千个并发连接。
import java.net.ServerSocket;
import java.net.Socket;
public class ThreadPerRequestServer {
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(8080);
while (true) {
Socket client = server.accept();
new Thread(() -> handleRequest(client)).start();
}
}
static void handleRequest(Socket client) {
// 处理请求
}
}线程池(Thread Pool): 限制线程数量,用队列缓冲请求。解决了线程爆炸问题,但线程在 I/O 阻塞时被占用,无法服务其他请求。
事件驱动(Event-Driven): 单线程或少量线程通过非阻塞 I/O 和事件循环处理大量并发连接。代表:Nginx、Node.js、Redis。吞吐量高但编程模型复杂,不适合 CPU 密集型任务。
协程/用户态线程: Go 的 goroutine、Java 21 的虚拟线程(Virtual Thread)。保留同步编程风格的同时实现高并发。goroutine 栈初始只有几 KB,可以轻松创建数十万个。
6.2 并发模型对比
| 模型 | 并发上限 | 编程复杂度 | CPU 利用率 | I/O 密集型 | CPU 密集型 |
|---|---|---|---|---|---|
| 每请求一线程 | 数千 | 低 | 低(上下文切换多) | 差 | 中等 |
| 线程池 | 数千~数万 | 低 | 中等 | 中等 | 良好 |
| 事件驱动 | 数十万 | 高(回调地狱) | 高 | 优秀 | 差(阻塞事件循环) |
| 协程 | 数十万~百万 | 低(同步风格) | 高 | 优秀 | 良好(配合多核调度) |
| Actor 模型 | 数百万 | 中等 | 高 | 优秀 | 良好 |
6.3 Java 虚拟线程实战
Java 21 正式引入虚拟线程(Virtual Thread),在 I/O 密集型场景下能显著提升吞吐量:
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class VirtualThreadServer {
public static void main(String[] args) throws Exception {
// 虚拟线程执行器:每个任务一个虚拟线程,没有线程数上限
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
try (ServerSocket server = new ServerSocket(8080)) {
while (true) {
Socket client = server.accept();
executor.submit(() -> handleRequest(client));
}
}
}
static void handleRequest(Socket client) {
// 虚拟线程在 I/O 阻塞时自动让出载体线程
// 不需要修改业务代码
}
}虚拟线程的吞吐量优势来自两个方面:
- 栈内存极小(初始几百字节 vs 操作系统线程的 1MB),可以创建百万级虚拟线程
- I/O 阻塞时自动让出底层平台线程(Carrier Thread),不浪费 CPU 时间片
6.4 Rust 的 async/await 与 Tokio
Rust 的 async/await 编译为状态机,配合 Tokio 运行时实现高效的用户态调度:
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("0.0.0.0:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0u8; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => n,
Err(_) => return,
};
if socket.write_all(&buf[..n]).await.is_err() {
return;
}
}
});
}
}Tokio 的工作窃取调度器(Work-Stealing Scheduler)在多核环境下自动平衡负载,空闲线程会从忙线程的队列中窃取任务,减少了手动调优线程池大小的负担。
七、工程案例:Kafka 生产者吞吐量调优
7.1 背景
某在线广告平台的点击事件采集系统需要将用户点击数据写入 Kafka。业务需求:
- 峰值流量:50 万事件/秒
- 单条事件大小:~200 字节
- 延迟要求:从事件产生到写入 Kafka 不超过 100ms(P99)
- 可靠性:允许极少量丢失(
acks=1)
7.2 初始配置与问题
使用默认配置的 Kafka 生产者,测试结果:
吞吐量:8 万事件/秒
P99 延迟:5ms
CPU 使用率:15%
网络带宽使用率:3%
吞吐量远低于需求。CPU 和网络都没有成为瓶颈,问题出在哪里?
7.3 分析过程
第一步:识别瓶颈。 使用 Kafka 生产者的
JMX 指标
record-send-rate、batch-size-avg、request-latency-avg
分析:
batch-size-avg: 1200 字节(远小于默认 batch.size 的 16KB)
record-queue-time-avg: 0.5ms
request-latency-avg: 3ms
records-per-request-avg: 6
平均每批只有 6 条记录,1200
字节。批太小了。原因:linger.ms 默认为
0,消息到达后立即发送,来不及凑批。
第二步:调大 linger.ms。 设
linger.ms = 5,让生产者等待 5ms 凑批:
吞吐量:25 万事件/秒
batch-size-avg: 12KB
records-per-request-avg: 60
P99 延迟:12ms
吞吐量提升 3 倍,但还不够。
第三步:增大 batch.size 和缓冲区。 设
batch.size = 131072(128KB),buffer.memory = 134217728(128MB):
吞吐量:42 万事件/秒
batch-size-avg: 45KB
records-per-request-avg: 225
P99 延迟:18ms
第四步:启用压缩。 设
compression.type = lz4,压缩在批级别进行,批越大压缩率越高:
吞吐量:55 万事件/秒
batch-size-avg: 48KB(压缩前)
compressed-size-avg: 14KB(压缩后)
P99 延迟:22ms
网络带宽使用率:8%
第五步:增加生产者线程数。 单个生产者实例受限于单线程的序列化和压缩。启动 4 个生产者实例,每个绑定不同的分区范围:
总吞吐量:65 万事件/秒
P99 延迟:25ms
CPU 使用率:55%
7.4 最终配置
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
props.put("acks", "1");
props.put("batch.size", 131072); // 128KB
props.put("linger.ms", 5); // 等待 5ms 凑批
props.put("buffer.memory", 134217728); // 128MB 总缓冲
props.put("compression.type", "lz4"); // LZ4 压缩
props.put("max.in.flight.requests.per.connection", 5);
props.put("send.buffer.bytes", 1048576); // 1MB TCP 发送缓冲区7.5 调优总结
flowchart TD
A["识别瓶颈<br/>JMX 指标分析"] --> B{"批大小是否<br/>接近 batch.size?"}
B -- 否 --> C["增大 linger.ms<br/>给更多时间凑批"]
B -- 是 --> D{"压缩是否启用?"}
C --> B
D -- 否 --> E["启用 LZ4 压缩<br/>减少网络传输量"]
D -- 是 --> F{"CPU 是否瓶颈?"}
E --> D
F -- 否 --> G["增大 batch.size<br/>增大 buffer.memory"]
F -- 是 --> H["增加生产者实例<br/>分担序列化负载"]
G --> F
H --> I["达到目标吞吐量"]
八、工程案例:数据库批量写入优化
8.1 PostgreSQL COPY 协议
对于大批量数据导入,PostgreSQL 的 COPY
命令比 INSERT 快一个数量级。COPY
使用专用的二进制协议,跳过 SQL
解析、查询规划、逐行事务等开销:
-- INSERT 方式:100 万行约 120 秒
INSERT INTO events (ts, user_id, event_type, payload)
VALUES ('2026-01-01', 1, 'click', '{}');
-- 重复 100 万次...
-- COPY 方式:100 万行约 5 秒
COPY events (ts, user_id, event_type, payload)
FROM STDIN WITH (FORMAT csv);
2026-01-01,1,click,{}
2026-01-01,2,view,{}
-- ...
\.Go 中使用 pgx 库的
CopyFrom:
func BulkInsertEvents(ctx context.Context, pool *pgxpool.Pool,
events []Event) (int64, error) {
rows := make([][]interface{}, len(events))
for i, e := range events {
rows[i] = []interface{}{e.Ts, e.UserID, e.EventType, e.Payload}
}
return pool.CopyFrom(
ctx,
pgx.Identifier{"events"},
[]string{"ts", "user_id", "event_type", "payload"},
pgx.CopyFromRows(rows),
)
}8.2 批量写入的索引策略
大批量写入时,索引维护是主要开销。每插入一行,B-Tree 索引需要找到插入位置、可能触发页分裂。当批量写入 100 万行时,索引维护可能占据 60% 以上的总时间。
优化策略:
先删索引后重建: 对于初始加载场景,先
DROP INDEX,加载数据,再CREATE INDEX。批量构建索引比逐行维护快得多,因为可以使用排序-合并算法一次性构建 B-Tree。调大 maintenance_work_mem: PostgreSQL 构建索引时使用
maintenance_work_mem作为排序缓冲区。默认 64MB,大批量加载时建议调到 1GB 以上。禁用 WAL: PostgreSQL 的
UNLOGGED TABLE跳过 WAL(Write-Ahead Log)写入,性能提升 2-3 倍,但数据库崩溃时会丢数据。适用于临时表或可重建的数据。
九、实用批处理模式
9.1 令牌桶与批量限流
高吞吐系统通常需要限流(Rate Limiting)。令牌桶(Token Bucket)算法在批处理场景下的变种是:不是每次请求消耗一个令牌,而是每批消耗令牌数等于批大小。这避免了批内逐条检查限流的开销:
package main
import (
"context"
"time"
"golang.org/x/time/rate"
)
func FlushWithRateLimit(ctx context.Context,
limiter *rate.Limiter, batch [][]byte) error {
// 整批一次性申请令牌,避免逐条检查的开销
if err := limiter.WaitN(ctx, len(batch)); err != nil {
return err
}
return processBatch(batch)
}
func processBatch(batch [][]byte) error {
return nil
}
func main() {
_ = rate.NewLimiter(rate.Every(time.Second/10000), 1000)
}9.2 分区并行批处理
当目标系统支持分区(如 Kafka Topic 的 Partition、数据库的分表),可以按分区分别凑批、并行发送。每个分区有独立的缓冲区和刷新逻辑,互不阻塞:
package main
import (
"fmt"
"hash/fnv"
)
type PartitionedBatcher struct {
partitions int
batchers []*AdaptiveBatcher[Record]
}
type Record struct {
Key string
Value []byte
}
func NewPartitionedBatcher(partitions int) *PartitionedBatcher {
pb := &PartitionedBatcher{
partitions: partitions,
batchers: make([]*AdaptiveBatcher[Record], partitions),
}
for i := 0; i < partitions; i++ {
partID := i
pb.batchers[i] = NewAdaptiveBatcher[Record](
10, 500, 0,
func(batch []Record) error {
fmt.Printf("flushing %d records to partition %d\n",
len(batch), partID)
return nil
},
)
}
return pb
}
func (pb *PartitionedBatcher) Add(record Record) error {
h := fnv.New32a()
h.Write([]byte(record.Key))
partition := int(h.Sum32()) % pb.partitions
return pb.batchers[partition].Add(record)
}十、吞吐量优化的系统性方法
10.1 优化步骤
吞吐量优化不是随机调参,而是一个系统性的排查过程:
确定目标。 明确需要多少吞吐量、在什么延迟约束下。没有明确目标的优化是浪费时间。
测量基线。 在当前配置下测量实际吞吐量。不要猜测,不要依赖理论计算。用真实负载或接近真实的压测工具(如 wrk、vegeta、k6)。
识别瓶颈。 用
perf、strace、bpftrace、async-profiler等工具定位瓶颈。常见瓶颈按优先级排序:- 锁竞争(检查
perf lock或mutex contention指标) - I/O 等待(检查
iostat、iotop) - 网络延迟(检查
ss -ti、tcpdump) - CPU 饱和(检查
mpstat、htop) - 内存不足导致的 GC 或 swap
- 锁竞争(检查
针对性优化。 只优化瓶颈环节。优化非瓶颈环节不会提升整体吞吐量(Amdahl 定律的推论)。
验证效果。 重新测量,确认吞吐量提升符合预期,延迟没有超出 SLA。
回到第 3 步。 瓶颈可能转移。原来的 I/O 瓶颈解决后,CPU 可能成为新的瓶颈。
10.2 常见反模式
反模式一:过早优化并发度。 在没有识别瓶颈的情况下就增加线程数。如果瓶颈是磁盘 I/O,增加线程只会增加锁争用,不会提升吞吐量。
反模式二:忽略延迟约束。 通过无限增大批大小来追求极端吞吐量,导致延迟不可控。批处理的吞吐量和延迟是一对权衡,必须在 SLA 约束下优化。
反模式三:微观优化。 花大量时间优化单条记录的处理时间(比如用 SIMD 加速 JSON 解析),但瓶颈实际在网络 RTT 或磁盘 fsync。方向错误的优化,无论多精妙都是浪费。
反模式四:忽略尾部延迟。 只看平均吞吐量和平均延迟,忽略 P99/P999 的毛刺。批处理和缓冲区溢出常常导致尾部延迟爆炸。
10.3 吞吐量 vs 延迟的全局视角
吞吐量和延迟的关系可以用排队论的经典模型描述。在 M/M/1 队列中,系统利用率 \(\rho = \frac{\lambda}{\mu}\)(到达率 / 服务率),平均等待时间为:
\[W = \frac{1}{\mu - \lambda} = \frac{1}{\mu(1 - \rho)}\]
当 \(\rho\) 接近 1(吞吐量接近系统容量)时,等待时间趋向无穷。这意味着:
- 系统利用率 50% 时,平均延迟是空载的 2 倍
- 系统利用率 80% 时,平均延迟是空载的 5 倍
- 系统利用率 90% 时,平均延迟是空载的 10 倍
实际工程中,通常把目标利用率控制在 60-70%,为流量尖峰留出余量。超过 80% 的持续利用率几乎必然导致延迟问题。
十一、总结
吞吐量优化的三条路径各有适用场景:
| 技术 | 核心机制 | 适用场景 | 主要风险 |
|---|---|---|---|
| 批处理 | 分摊固定开销 | I/O 密集、固定开销大 | 延迟增加、失败影响范围大 |
| 流水线 | 阶段重叠执行 | 多阶段串行处理 | 级间不平衡、延迟累加 |
| 并发 | 多处理单元并行 | 可并行化比例高 | 锁竞争、协调开销 |
三者可以组合使用。Kafka 生产者就是一个典型:批处理凑批减少网络请求次数,多分区并行发送提升并行度,生产者内部的 Sender 线程和应用线程形成两级流水线。
最重要的原则不是”哪种技术更好”,而是”瓶颈在哪里”。不识别瓶颈就开始优化,大概率走弯路。用测量替代猜测,用数据驱动决策。
下一篇将深入线程模型的细节:操作系统线程、用户态线程、协程、事件循环——它们的调度机制有什么区别,在不同负载模式下应该如何选择。
参考资料
书籍
- John L. Hennessy, David A. Patterson, Computer Architecture: A Quantitative Approach, 6th Edition, Morgan Kaufmann, 2017. 指令流水线、存储层次和吞吐量分析的权威教材。
- Martin Kleppmann, Designing Data-Intensive Applications, O’Reilly, 2017. 批处理、流处理、数据系统的架构设计,第十、十一章对批处理有深入讨论。
- Brendan Gregg, Systems Performance: Enterprise and the Cloud, 2nd Edition, Addison-Wesley, 2020. 系统性能分析的方法论和工具,覆盖 CPU、内存、I/O、网络各层的吞吐量分析。
- W. Richard Stevens, TCP/IP Illustrated, Volume 1: The Protocols, 2nd Edition, Addison-Wesley, 2011. TCP 协议细节,包括 Nagle 算法、延迟 ACK、滑动窗口。
- Neil J. Gunther, Guerrilla Capacity Planning, Springer, 2007. Universal Scalability Law 的原始出处,量化分析并行系统的扩展性上限。
论文与 RFC
- John Nagle, “Congestion Control in IP/TCP Internetworks”, RFC 896, January 1984. Nagle 算法的原始提案。
- Robert Braden, “Requirements for Internet Hosts – Communication Layers”, RFC 1122, October 1989. 延迟 ACK 的规范。
- Gene M. Amdahl, “Validity of the Single Processor Approach to Achieving Large Scale Computing Capabilities”, AFIPS Conference Proceedings, 1967. Amdahl 定律的原始论文。
- John D. C. Little, “A Proof for the Queuing Formula: L = λW”, Operations Research, 9(3):383-387, 1961. Little 定律的数学证明。
- Neil J. Gunther, “A General Theory of Computational Scalability Based on Rational Functions”, arXiv:0808.1431, 2008. USL 的形式化描述。
在线资源
- Apache Kafka Documentation, “Producer Configs”, kafka.apache.org. Kafka 生产者批处理参数的官方文档。
- PostgreSQL Documentation, “COPY”, postgresql.org. COPY 命令的语法和性能特性。
- Brendan Gregg, “Linux Performance Analysis in 60 Seconds”, brendangregg.com. 快速定位性能瓶颈的实用清单。
- Linux Kernel Documentation, “Block I/O Schedulers”, kernel.org. Linux I/O 调度器的实现细节。
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【系统架构设计百科】架构质量属性:不只是"高可用高性能"
需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。
【系统架构设计百科】告警策略:如何避免"狼来了"
大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。
【系统架构设计百科】复杂性管理:架构的核心战场
系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略
【系统架构设计百科】微服务架构深度审视:优势、代价与适用边界
微服务不是免费的午餐。本文从分布式系统八大谬误出发,拆解微服务真正解决的问题与引入的代价,梳理服务边界划分的工程方法论,还原 Amazon 和 Netflix 从单体到微服务的真实演进时间线,给出微服务适用与不适用的判断框架。