前几篇分别讲了 epoll、零拷贝、DPDK 和 XDP——它们解决的是”怎么高效地收发数据”。本文讨论更高层的问题:拿到 I/O 事件后,应用程序应该用什么模式来组织处理逻辑? Reactor、Proactor、协程——三种模式各有取舍,选错了要么性能差,要么代码复杂得无法维护。
一、I/O 模式的核心问题
1.1 同步与异步的本质区别
网络 I/O 的核心矛盾:I/O 操作本身很慢,但 CPU 处理逻辑很快。
网络 I/O 的时间尺度:
CPU 指令执行: ~0.3 ns
L1 缓存访问: ~1 ns
L2 缓存访问: ~3 ns
L3 缓存访问: ~10 ns
主内存访问: ~100 ns
─────────────────────────
SSD 随机读: ~100 μs (比内存慢 1000 倍)
网络往返(同机房): ~500 μs
网络往返(跨城): ~10 ms (比 CPU 指令慢 3000 万倍)
网络往返(跨洲): ~100 ms
问题就变成了:等待 I/O 完成的那段时间,CPU 干什么?
| 模式 | CPU 在 I/O 等待期间 | 编程模型 | 资源效率 |
|---|---|---|---|
| 阻塞同步 | 休眠(让出 CPU) | 最简单 | 最差(线程多) |
| 非阻塞同步 | 轮询检查 | 复杂 | 差(浪费 CPU) |
| I/O 多路复用 | 等待多个 fd 就绪 | Reactor | 好 |
| 异步 I/O | 提交请求后做别的事 | Proactor | 最好 |
1.2 两种核心模式
Reactor 模式:当 I/O
就绪时通知应用,应用自己执行 I/O 操作 -
类比:餐厅服务员告诉你”你的菜好了”,你自己去端 -
核心调用:epoll_wait() →
read()/write()
Proactor 模式:应用提交 I/O
请求,完成后通知应用 - 类比:餐厅服务员直接把菜端到你桌上 -
核心调用:io_uring_submit() → 收到完成通知
Reactor(同步非阻塞 + 事件通知):
应用 → 注册感兴趣的事件(EPOLLIN)
↓
epoll_wait() 等待
↓
事件就绪(fd 可读)
↓
应用调用 read() ← 应用自己执行 I/O
↓
处理数据
Proactor(异步 I/O + 完成通知):
应用 → 提交读取请求(io_uring_prep_read)
↓
io_uring_submit() 提交
↓
内核异步执行 I/O ← 内核负责执行
↓
应用收到完成通知
↓
数据已在缓冲区中,直接处理
二、Reactor 模式详解
2.1 单线程 Reactor
最简单的 Reactor:一个线程处理所有 I/O 事件和业务逻辑。Redis 的核心就是这个模式。
/* single_reactor.c - 单线程 Reactor */
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#define MAX_EVENTS 1024
#define BUF_SIZE 4096
typedef void (*event_handler)(int fd, uint32_t events, void *data);
struct event_ctx {
int fd;
event_handler handler;
void *data;
};
struct reactor {
int epoll_fd;
int running;
};
int reactor_init(struct reactor *r) {
r->epoll_fd = epoll_create1(0);
r->running = 1;
return r->epoll_fd >= 0 ? 0 : -1;
}
int reactor_register(struct reactor *r, int fd, uint32_t events,
event_handler handler, void *data) {
struct event_ctx *ctx = malloc(sizeof(*ctx));
ctx->fd = fd;
ctx->handler = handler;
ctx->data = data;
struct epoll_event ev = {
.events = events,
.data.ptr = ctx
};
return epoll_ctl(r->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
}
void reactor_run(struct reactor *r) {
struct epoll_event events[MAX_EVENTS];
while (r->running) {
int nfds = epoll_wait(r->epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < nfds; i++) {
struct event_ctx *ctx = events[i].data.ptr;
ctx->handler(ctx->fd, events[i].events, ctx->data);
}
}
}
/* 连接处理器 */
void on_connection(int fd, uint32_t events, void *data) {
(void)events;
struct reactor *r = data;
char buf[BUF_SIZE];
ssize_t n = read(fd, buf, sizeof(buf));
if (n <= 0) {
epoll_ctl(r->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return;
}
/* 处理请求(在同一线程中)
* 如果这里的处理很慢,会阻塞整个 Reactor
*/
process_request(buf, n);
/* 回写响应 */
const char *resp = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK";
write(fd, resp, strlen(resp));
}
/* 监听器 */
void on_accept(int listen_fd, uint32_t events, void *data) {
(void)events;
struct reactor *r = data;
int client_fd = accept4(listen_fd, NULL, NULL, SOCK_NONBLOCK);
if (client_fd < 0) return;
reactor_register(r, client_fd, EPOLLIN | EPOLLET,
on_connection, r);
}
void process_request(const char *buf, ssize_t n) {
(void)buf; (void)n;
}单线程 Reactor 的局限:
优点:
✓ 无锁,无线程同步开销
✓ 代码简单,调试容易
✓ 适合 I/O 密集 + 计算轻量的场景(如 Redis)
缺点:
✗ 业务处理阻塞会拖垮所有连接
✗ 无法利用多核
✗ 一个慢请求影响所有其他请求
2.2 多线程 Reactor(Worker Pool)
将业务逻辑分离到工作线程池,Reactor 线程只负责 I/O:
/* mt_reactor.c - Reactor + 工作线程池 */
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define QUEUE_SIZE 4096
/* 无锁队列(简化版,实际用 rte_ring 或类似实现) */
struct task {
int fd;
char data[4096];
ssize_t len;
};
struct task_queue {
struct task tasks[QUEUE_SIZE];
int head, tail;
pthread_mutex_t lock;
pthread_cond_t cond;
};
void queue_push(struct task_queue *q, struct task *t) {
pthread_mutex_lock(&q->lock);
q->tasks[q->tail % QUEUE_SIZE] = *t;
q->tail++;
pthread_cond_signal(&q->cond);
pthread_mutex_unlock(&q->lock);
}
struct task queue_pop(struct task_queue *q) {
pthread_mutex_lock(&q->lock);
while (q->head == q->tail)
pthread_cond_wait(&q->cond, &q->lock);
struct task t = q->tasks[q->head % QUEUE_SIZE];
q->head++;
pthread_mutex_unlock(&q->lock);
return t;
}
static struct task_queue work_queue;
/* 工作线程 */
void *worker_thread(void *arg) {
(void)arg;
while (1) {
struct task t = queue_pop(&work_queue);
/* 执行耗时的业务逻辑 */
char response[4096];
int resp_len = process_business_logic(t.data, t.len,
response, sizeof(response));
/* 回写结果(注意:这里跨线程操作 fd) */
write(t.fd, response, resp_len);
}
return NULL;
}
/* Reactor 线程中的连接处理器(只做 I/O,不做业务) */
void on_readable(int fd, uint32_t events, void *data) {
(void)events; (void)data;
struct task t;
t.fd = fd;
t.len = read(fd, t.data, sizeof(t.data));
if (t.len <= 0) {
close(fd);
return;
}
/* 提交到工作队列 */
queue_push(&work_queue, &t);
}
int process_business_logic(const char *in, ssize_t in_len,
char *out, size_t out_size) {
(void)in; (void)in_len;
return snprintf(out, out_size,
"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK");
}
/* 启动 */
void start_mt_reactor(int n_workers) {
/* 初始化队列 */
pthread_mutex_init(&work_queue.lock, NULL);
pthread_cond_init(&work_queue.cond, NULL);
/* 启动工作线程 */
for (int i = 0; i < n_workers; i++) {
pthread_t tid;
pthread_create(&tid, NULL, worker_thread, NULL);
pthread_detach(tid);
}
/* Reactor 主循环(复用之前的 reactor_run) */
/* ... */
}2.3 主从 Reactor(Multiple Reactors)
Nginx 和 Netty 使用的模式:一个主 Reactor 接受连接,多个子 Reactor 处理 I/O。
/* multi_reactor.c - 主从 Reactor 模式 */
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#define NUM_SUB_REACTORS 4
#define MAX_EVENTS 1024
struct sub_reactor {
int epoll_fd;
int pipe_fd[2]; /* 用于主 Reactor 通知新连接 */
pthread_t thread;
int id;
};
static struct sub_reactor sub_reactors[NUM_SUB_REACTORS];
static int next_reactor = 0;
/* 子 Reactor 线程 */
void *sub_reactor_thread(void *arg) {
struct sub_reactor *sr = arg;
struct epoll_event events[MAX_EVENTS];
printf("Sub-reactor %d started\n", sr->id);
while (1) {
int nfds = epoll_wait(sr->epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < nfds; i++) {
int fd = events[i].data.fd;
if (fd == sr->pipe_fd[0]) {
/* 收到新连接通知 */
int client_fd;
read(sr->pipe_fd[0], &client_fd, sizeof(client_fd));
struct epoll_event ev = {
.events = EPOLLIN | EPOLLET,
.data.fd = client_fd
};
epoll_ctl(sr->epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);
} else {
/* 处理已有连接的 I/O */
char buf[4096];
ssize_t n = read(fd, buf, sizeof(buf));
if (n <= 0) {
epoll_ctl(sr->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
continue;
}
/* 处理请求并响应 */
const char *resp =
"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK";
write(fd, resp, 38);
}
}
}
return NULL;
}
/* 初始化子 Reactor */
void init_sub_reactors(void) {
for (int i = 0; i < NUM_SUB_REACTORS; i++) {
sub_reactors[i].id = i;
sub_reactors[i].epoll_fd = epoll_create1(0);
pipe(sub_reactors[i].pipe_fd);
/* 把 pipe 读端加入子 Reactor 的 epoll */
struct epoll_event ev = {
.events = EPOLLIN,
.data.fd = sub_reactors[i].pipe_fd[0]
};
epoll_ctl(sub_reactors[i].epoll_fd, EPOLL_CTL_ADD,
sub_reactors[i].pipe_fd[0], &ev);
pthread_create(&sub_reactors[i].thread, NULL,
sub_reactor_thread, &sub_reactors[i]);
}
}
/* 主 Reactor:只负责 accept */
void main_reactor(int listen_fd) {
int epoll_fd = epoll_create1(0);
struct epoll_event ev = {
.events = EPOLLIN,
.data.fd = listen_fd
};
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);
struct epoll_event events[MAX_EVENTS];
while (1) {
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < nfds; i++) {
int client_fd = accept4(listen_fd, NULL, NULL, SOCK_NONBLOCK);
if (client_fd < 0) continue;
/* Round-Robin 分发到子 Reactor */
int idx = next_reactor % NUM_SUB_REACTORS;
next_reactor++;
/* 通过 pipe 传递 fd 给子 Reactor */
write(sub_reactors[idx].pipe_fd[1],
&client_fd, sizeof(client_fd));
}
}
}2.4 Reactor 模式总结
| 变体 | 线程模型 | 代表实现 | 适用场景 |
|---|---|---|---|
| 单线程 Reactor | 1 个线程做所有事 | Redis | I/O 密集、逻辑简单 |
| Reactor + Worker Pool | 1 个 I/O 线程 + N 个业务线程 | 早期 Tomcat | 业务逻辑耗时 |
| 主从 Reactor | 1 个 accept + N 个 I/O 线程 | Nginx, Netty | 高并发通用 |
| 每线程 Reactor | 每个线程独立 Reactor | Memcached | 多核利用率高 |
三、Proactor 模式与 io_uring
3.1 传统 AIO 的失败
Linux 有两套异步 I/O API,第一套(POSIX AIO / libaio)在网络场景几乎不可用:
/*
* POSIX AIO (aio_read/aio_write):
* - 用线程池模拟异步,不是真正的内核异步
* - 性能差
*
* Linux Native AIO (io_submit/io_getevents):
* - 只支持 O_DIRECT 文件 I/O
* - 不支持 socket
* - 不支持普通文件(非 O_DIRECT)
* - 有时会退化为同步
*
* 结论:Linux 在 io_uring 之前,几乎没有可用的异步网络 I/O
* 所以 Linux 上的高性能网络编程一直用 Reactor 模式
*/3.2 io_uring:真正的异步 I/O
io_uring(Linux 5.1+)是 Linux 的新一代异步 I/O 框架,天然适合 Proactor 模式:
/* proactor_iouring.c - 基于 io_uring 的 Proactor */
#include <liburing.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define QUEUE_DEPTH 256
#define BUF_SIZE 4096
enum event_type {
EVENT_ACCEPT,
EVENT_READ,
EVENT_WRITE,
};
struct request {
enum event_type type;
int fd;
char buf[BUF_SIZE];
int buf_len;
};
struct io_uring ring;
/* 提交 accept 请求 */
void submit_accept(int listen_fd) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
struct request *req = calloc(1, sizeof(*req));
req->type = EVENT_ACCEPT;
req->fd = listen_fd;
io_uring_prep_accept(sqe, listen_fd, NULL, NULL, 0);
io_uring_sqe_set_data(sqe, req);
io_uring_submit(&ring);
}
/* 提交 read 请求 */
void submit_read(int fd) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
struct request *req = calloc(1, sizeof(*req));
req->type = EVENT_READ;
req->fd = fd;
io_uring_prep_read(sqe, fd, req->buf, BUF_SIZE, 0);
io_uring_sqe_set_data(sqe, req);
io_uring_submit(&ring);
}
/* 提交 write 请求 */
void submit_write(int fd, const char *data, int len) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
struct request *req = calloc(1, sizeof(*req));
req->type = EVENT_WRITE;
req->fd = fd;
memcpy(req->buf, data, len);
req->buf_len = len;
io_uring_prep_write(sqe, fd, req->buf, len, 0);
io_uring_sqe_set_data(sqe, req);
io_uring_submit(&ring);
}
/* Proactor 事件循环 */
void proactor_loop(int listen_fd) {
/* 提交第一个 accept 请求 */
submit_accept(listen_fd);
while (1) {
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);
struct request *req = io_uring_cqe_get_data(cqe);
int result = cqe->res;
io_uring_cqe_seen(&ring, cqe);
switch (req->type) {
case EVENT_ACCEPT:
if (result >= 0) {
int client_fd = result;
/* accept 完成:提交 read 请求 */
submit_read(client_fd);
}
/* 继续接受下一个连接 */
submit_accept(listen_fd);
break;
case EVENT_READ:
if (result > 0) {
/* 数据已经在 buf 中!
* 注意区别:Reactor 模式在这里才调用 read()
* Proactor 模式的 read 已经由内核完成了
*/
const char *resp =
"HTTP/1.1 200 OK\r\n"
"Content-Length: 2\r\n\r\nOK";
submit_write(req->fd, resp, strlen(resp));
} else {
close(req->fd);
}
break;
case EVENT_WRITE:
if (result > 0) {
/* 写入完成,继续读取下一个请求 */
submit_read(req->fd);
} else {
close(req->fd);
}
break;
}
free(req);
}
}
int main(void) {
/* 初始化 io_uring */
io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
/* 创建监听 socket */
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(8080),
.sin_addr.s_addr = INADDR_ANY
};
bind(listen_fd, (struct sockaddr *)&addr, sizeof(addr));
listen(listen_fd, 128);
printf("Proactor server on port 8080\n");
proactor_loop(listen_fd);
io_uring_queue_exit(&ring);
return 0;
}3.3 io_uring 的高级特性
/* io_uring 的批量提交和链式操作 */
/* 1. 批量提交:多个请求一次 submit */
void batch_submit(struct io_uring *ring, int *fds, int n) {
for (int i = 0; i < n; i++) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct request *req = calloc(1, sizeof(*req));
req->type = EVENT_READ;
req->fd = fds[i];
io_uring_prep_read(sqe, fds[i], req->buf, BUF_SIZE, 0);
io_uring_sqe_set_data(sqe, req);
}
/* 一次系统调用提交所有请求 */
io_uring_submit(ring);
}
/* 2. 链式操作:read → write 自动串联 */
void submit_read_then_write(struct io_uring *ring, int fd) {
struct request *req = calloc(1, sizeof(*req));
req->type = EVENT_READ;
req->fd = fd;
/* 第一步:read */
struct io_uring_sqe *sqe1 = io_uring_get_sqe(ring);
io_uring_prep_read(sqe1, fd, req->buf, BUF_SIZE, 0);
sqe1->flags |= IOSQE_IO_LINK; /* 链到下一个 */
io_uring_sqe_set_data(sqe1, req);
/* 第二步:write(在 read 完成后自动执行) */
struct io_uring_sqe *sqe2 = io_uring_get_sqe(ring);
io_uring_prep_write(sqe2, fd, "OK", 2, 0);
io_uring_sqe_set_data(sqe2, req);
io_uring_submit(ring);
}
/* 3. 固定缓冲区:避免每次 I/O 的缓冲区注册开销 */
void setup_fixed_buffers(struct io_uring *ring) {
struct iovec iovs[32];
for (int i = 0; i < 32; i++) {
iovs[i].iov_base = malloc(BUF_SIZE);
iovs[i].iov_len = BUF_SIZE;
}
/* 注册固定缓冲区 */
io_uring_register_buffers(ring, iovs, 32);
/* 使用固定缓冲区 I/O */
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_read_fixed(sqe, fd, iovs[0].iov_base,
BUF_SIZE, 0, 0 /* buf_index */);
}
/* 4. SQPOLL 模式:内核轮询 SQ,连 submit 系统调用都省了 */
void setup_sqpoll(struct io_uring *ring) {
struct io_uring_params params = {
.flags = IORING_SETUP_SQPOLL,
.sq_thread_idle = 2000, /* 2 秒无请求后停止轮询 */
};
io_uring_queue_init_params(256, ring, ¶ms);
/*
* SQPOLL 模式下:
* 内核有一个专门的线程轮询 SQ
* 应用只需要写入 SQ,不需要调用 io_uring_submit()
* 真正的零系统调用 I/O
*/
}3.4 Reactor vs Proactor 对比
| 维度 | Reactor | Proactor |
|---|---|---|
| I/O 执行者 | 应用程序调用 read/write | 内核/框架执行 |
| 通知内容 | “I/O 就绪了” | “I/O 完成了” |
| 系统调用 | epoll_wait + read/write | io_uring_submit |
| 缓冲区管理 | 应用分配 | 提前注册给内核 |
| 取消操作 | 不读就行 | 需要显式取消 |
| 代码复杂度 | 中等 | 高(状态机更复杂) |
| Linux 成熟度 | 非常成熟(epoll) | 较新(io_uring 5.1+) |
| 性能 | 高 | 更高(少系统调用) |
| Windows 对应 | select/IOCP | IOCP(原生 Proactor) |
四、协程:同步风格的异步 I/O
Reactor 和 Proactor 的共同问题是回调地狱——业务逻辑被拆散到多个回调函数中,难以编写和维护。协程(Coroutine)用同步的写法实现异步的效果。
4.1 协程的核心思想
传统回调(Reactor):
void on_read(int fd) {
read(fd, buf, ...);
// 处理请求
// 提交写事件
}
void on_write(int fd) {
write(fd, resp, ...);
// 写完了
// 提交读事件等待下一个请求
}
// 逻辑分散在多个回调中
协程(同步风格):
async function handle_connection(fd) {
while (true) {
data = await read(fd); // 挂起,不阻塞线程
response = process(data); // 同步处理
await write(fd, response); // 挂起,不阻塞线程
}
}
// 逻辑集中在一个函数中,像同步代码一样
4.2 Go goroutine 模型
Go 的网络 I/O 是协程模型的典范:用 goroutine + 网络轮询器(netpoller)实现。
// go_server.go - Go 的协程网络模型
package main
import (
"fmt"
"io"
"net"
"runtime"
)
func handleConn(conn net.Conn) {
defer conn.Close()
buf := make([]byte, 4096)
for {
// conn.Read() 看起来是同步阻塞的
// 实际上 Go runtime 会:
// 1. 尝试非阻塞 read
// 2. 如果 EAGAIN,将 goroutine 挂起
// 3. 把 fd 注册到 netpoller(底层是 epoll)
// 4. 调度其他 goroutine 运行
// 5. epoll 通知 fd 可读时,恢复这个 goroutine
n, err := conn.Read(buf)
if err != nil {
if err != io.EOF {
fmt.Println("read error:", err)
}
return
}
// 同步风格的业务处理
response := processRequest(buf[:n])
// conn.Write() 同理
_, err = conn.Write(response)
if err != nil {
return
}
}
}
func processRequest(data []byte) []byte {
return []byte("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
}
func main() {
// 使用所有 CPU 核
runtime.GOMAXPROCS(runtime.NumCPU())
ln, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
for {
conn, err := ln.Accept()
if err != nil {
continue
}
// 每个连接一个 goroutine
// goroutine 极轻量:初始栈 2-8 KB(线程通常 1-8 MB)
go handleConn(conn)
}
}
/*
* Go netpoller 的内部原理:
*
* 1. runtime 在启动时创建 epoll 实例
* 2. 每个 socket 操作(Read/Write)先尝试非阻塞
* 3. 如果 EAGAIN:
* a. 将 goroutine 的状态保存到 g 结构体
* b. 将 fd 注册到 epoll(EPOLLIN/EPOLLOUT)
* c. 调用 gopark() 挂起 goroutine
* d. 调度器(scheduler)运行其他 goroutine
* 4. sysmon goroutine 或调度循环中调用 epoll_wait()
* 5. 有事件就绪时,将对应的 goroutine 放回运行队列
*
* 结果:开发者写的是同步代码,底层是 Reactor 模式
* goroutine 切换开销:~100 ns(线程切换:~1-10 μs)
*/4.3 Rust async/await 模型
Rust 的异步模型基于 Future trait 和 executor:
// rust_server.rs - Rust 异步网络服务器(使用 tokio)
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
println!("Server on port 8080");
loop {
let (stream, _addr) = listener.accept().await.unwrap();
// tokio::spawn 创建一个异步任务(类似 goroutine)
tokio::spawn(handle_connection(stream));
}
}
async fn handle_connection(mut stream: TcpStream) {
let mut buf = vec![0u8; 4096];
loop {
// .await 点是挂起点
// 编译器将 async fn 转换为状态机
let n = match stream.read(&mut buf).await {
Ok(0) => return, // 连接关闭
Ok(n) => n,
Err(_) => return,
};
let response = process_request(&buf[..n]);
if stream.write_all(&response).await.is_err() {
return;
}
}
}
fn process_request(_data: &[u8]) -> Vec<u8> {
b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".to_vec()
}
/*
* Rust async 的内部原理:
*
* 1. async fn 被编译器转换为 impl Future 的状态机
* 2. 每个 .await 点是一个状态转换
* 3. tokio runtime(executor)驱动 Future:
* a. 调用 Future::poll()
* b. 如果返回 Poll::Pending,注册 Waker
* c. 底层 I/O(mio/epoll)就绪时,调用 Waker::wake()
* d. executor 再次 poll() 这个 Future
* 4. 零开销抽象:状态机在编译时确定,无需堆分配
*
* tokio 的调度模型:
* - 多线程 runtime:work-stealing 调度器
* - 单线程 runtime:适合嵌入到其他系统
* - 支持 io_uring 后端(tokio-uring crate)
*/4.4 Java Virtual Thread 模型
Java 21 引入了虚拟线程(Virtual Thread),类似 Go goroutine 的轻量级线程:
// VirtualThreadServer.java - Java 虚拟线程网络服务器
import java.io.*;
import java.net.*;
import java.util.concurrent.Executors;
public class VirtualThreadServer {
public static void main(String[] args) throws Exception {
// 虚拟线程执行器:每个任务运行在一个虚拟线程上
var executor = Executors.newVirtualThreadPerTaskExecutor();
var server = new ServerSocket(8080);
System.out.println("Server on port 8080");
while (true) {
Socket client = server.accept();
// 每个连接一个虚拟线程
executor.submit(() -> handleConnection(client));
}
}
static void handleConnection(Socket client) {
try (client;
var in = client.getInputStream();
var out = client.getOutputStream()) {
byte[] buf = new byte[4096];
int n;
while ((n = in.read(buf)) > 0) {
// 看起来是同步阻塞的 I/O
// 虚拟线程在阻塞时自动让出载体线程
byte[] response = processRequest(buf, n);
out.write(response);
out.flush();
}
} catch (IOException e) {
// 连接错误
}
}
static byte[] processRequest(byte[] data, int len) {
return "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK"
.getBytes();
}
}
/*
* Java Virtual Thread 的内部原理:
*
* 1. 虚拟线程运行在载体线程(carrier thread)上
* 2. 载体线程是平台线程,数量 = CPU 核数
* 3. 当虚拟线程执行阻塞 I/O:
* a. JVM 检测到阻塞操作
* b. 将虚拟线程从载体线程上"卸载"(unmount)
* c. 载体线程可以运行其他虚拟线程
* d. I/O 完成后,虚拟线程被"挂载"到(可能不同的)载体线程
* 4. 底层使用 NIO + epoll,但用户代码写同步风格
*
* 与 Go goroutine 的区别:
* - Java VT 使用 JNI/JVM 机制,Go 用自己的运行时
* - Java VT 在 synchronized 块和 native 方法中会钉住载体线程
* - Go goroutine 的调度器更成熟(多年迭代)
*/4.5 协程模型对比
| 维度 | Go goroutine | Rust async | Java Virtual Thread |
|---|---|---|---|
| 调度器 | Go runtime(M:N) | tokio/async-std | JVM(M:N) |
| 栈模型 | 动态栈(2KB 起) | 无栈(状态机) | 动态栈(几百 B 起) |
| 内存开销 | ~4-8 KB/goroutine | ~几十字节/Future | ~几百 B/VT |
| 创建成本 | ~300 ns | ~50 ns | ~1 μs |
| 切换成本 | ~100 ns | ~10 ns | ~100 ns |
| 抢占 | 有(Go 1.14+) | 无(协作式) | 有 |
| I/O 后端 | epoll(内置) | epoll/kqueue/IOCP | NIO + epoll |
| io_uring 支持 | 实验性 | tokio-uring | 计划中 |
| 生态成熟度 | 高 | 高 | 中(Java 21+) |
| 学习曲线 | 低 | 高(生命周期/Pin) | 低 |
五、模式选型决策
5.1 决策矩阵
你的网络服务应该选什么模式?
连接数量级?
├── < 1K 并发 → 线程池(每连接一线程)足够
│ 适用:内部工具、管理接口
│
├── 1K-100K 并发
│ ├── 开发效率优先?
│ │ ├── Go → goroutine(最简单)
│ │ ├── Java → Virtual Thread(Java 21+)
│ │ └── Python → asyncio
│ └── 性能优先?
│ ├── C/C++ → 主从 Reactor(epoll)
│ ├── Rust → tokio async
│ └── C → io_uring Proactor
│
├── 100K-1M 并发
│ ├── 有协议栈需求?→ 协程模型(Go/Rust)
│ └── 原始包处理?→ XDP/AF_XDP
│
└── > 1M 并发 → DPDK + 用户态协议栈
适用:电信/CDN/L4 LB
5.2 性能基准参考
典型的 HTTP echo 服务器基准测试(单机,8 核,万兆网卡):
| 实现 | 模式 | RPS | P99 延迟 | 内存/万连接 |
|---|---|---|---|---|
| Nginx | 主从 Reactor | ~300K | 1.2 ms | ~50 MB |
| Go net/http | goroutine | ~250K | 1.5 ms | ~80 MB |
| tokio (Rust) | async/await | ~400K | 0.8 ms | ~30 MB |
| io_uring (C) | Proactor | ~500K | 0.5 ms | ~20 MB |
| epoll (C) | 主从 Reactor | ~450K | 0.6 ms | ~25 MB |
注:以上数据仅供量级参考,实际性能取决于具体场景、硬件配置和调优程度。
5.3 实际选型建议
工程实践中的选择:
1. 大多数 Web 服务:Go / Java Virtual Thread
原因:开发效率 > 极致性能,维护成本低
2. 基础设施组件(代理/网关/LB):C + epoll / Rust + tokio
原因:需要极致性能和资源控制
案例:Nginx, Envoy, HAProxy, Cloudflare
3. 数据库/存储系统:io_uring Proactor / Rust async
原因:需要统一的网络+磁盘异步 I/O
案例:TiKV(Rust), ScyllaDB(Seastar/C++)
4. 包处理/NFV:DPDK / XDP
原因:需要线速处理
案例:Katran, Cilium, Open vSwitch
5. 避免的选择:
× C++ Boost.Asio 的回调模式(可读性差)
× select/poll(性能差,只适合 POC)
× POSIX AIO(不是真异步)
参考文献
- Douglas C. Schmidt, “Reactor: An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events,” 1995.
- Jens Axboe, “Efficient IO with io_uring,” Linux Kernel Documentation, 2019.
- Russ Cox, “The Go Scheduler,” Go Blog / Runtime Documentation.
- tokio.rs, “Tokio Tutorial: Bridging with Sync Code,” tokio.rs.
- JEP 444: Virtual Threads, OpenJDK, 2023.
- Shuveb Hussain, “Lord of the io_uring,” unixism.net, 2020.
- Bryan Cantrill, “The Problem with Threads,” USENIX HotOS 2006.
上一篇: XDP 与 AF_XDP:eBPF 驱动的早期包处理 下一篇: DDoS 防御架构:容量型、协议型与应用层攻击
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【网络工程】可编程数据平面与 P4:软件定义转发
传统网络设备的转发逻辑固化在硬件中。P4 语言让交换机的转发管线可编程——你可以定义自己的包头解析、匹配规则和转发动作。本文从 P4 语言核心概念出发,讲解 Parser/Match-Action/Deparser 的编程模型、可编程交换机芯片(Tofino)的架构、P4 在数据中心和运营商网络中的应用案例,以及 P4 与 eBPF 的定位差异。
【网络工程】Socket 编程模型演进:从阻塞到多路复用
网络编程模型的选择决定了服务的并发能力上限。本文从阻塞 I/O 到非阻塞、select、poll、epoll,逐步解剖每种模型的系统调用开销、性能边界与适用场景,用 C 代码实测从 C10K 到 C1M 的演进。
【网络工程】UDP 工程:何时用 UDP、怎么用好 UDP
UDP 不是'不可靠的 TCP',它是一张白纸——不做连接管理、不做流控、不做重传,把所有决策权交给应用层。本文从 UDP 的协议本质出发,剖析它在游戏、视频、DNS、QUIC 等场景中的工程用法,深入分析 MTU/分片/NAT 穿越等工程陷阱,并给出高性能 UDP 编程的内核调优方法。
跨越世纪的挑战:从C10K到C10M,现代网络架构如何突破并发极限
深度解析C10K到C10M问题的演进,涵盖从select/poll到epoll、io_uring的I/O模型变革,Reactor与Proactor模式的实现,事件驱动架构,内核旁路技术(DPDK),以及Go/Erlang的M:N调度模型,全面剖析现代高并发网络编程的理论本质与工程实践。