土法炼钢兴趣小组的算法知识备份

【网络工程】网络 I/O 模式:Reactor、Proactor 与协程

文章导航

分类入口
network
标签入口
#reactor#proactor#io-uring#coroutine#async-io#network-programming

目录

前几篇分别讲了 epoll、零拷贝、DPDK 和 XDP——它们解决的是”怎么高效地收发数据”。本文讨论更高层的问题:拿到 I/O 事件后,应用程序应该用什么模式来组织处理逻辑? Reactor、Proactor、协程——三种模式各有取舍,选错了要么性能差,要么代码复杂得无法维护。

一、I/O 模式的核心问题

1.1 同步与异步的本质区别

网络 I/O 的核心矛盾:I/O 操作本身很慢,但 CPU 处理逻辑很快

网络 I/O 的时间尺度:

CPU 指令执行:      ~0.3 ns
L1 缓存访问:       ~1 ns
L2 缓存访问:       ~3 ns
L3 缓存访问:       ~10 ns
主内存访问:        ~100 ns
─────────────────────────
SSD 随机读:        ~100 μs      (比内存慢 1000 倍)
网络往返(同机房):  ~500 μs
网络往返(跨城):    ~10 ms       (比 CPU 指令慢 3000 万倍)
网络往返(跨洲):    ~100 ms

问题就变成了:等待 I/O 完成的那段时间,CPU 干什么?

模式 CPU 在 I/O 等待期间 编程模型 资源效率
阻塞同步 休眠(让出 CPU) 最简单 最差(线程多)
非阻塞同步 轮询检查 复杂 差(浪费 CPU)
I/O 多路复用 等待多个 fd 就绪 Reactor
异步 I/O 提交请求后做别的事 Proactor 最好

1.2 两种核心模式

Reactor 模式:当 I/O 就绪时通知应用,应用自己执行 I/O 操作 - 类比:餐厅服务员告诉你”你的菜好了”,你自己去端 - 核心调用:epoll_wait()read()/write()

Proactor 模式:应用提交 I/O 请求,完成后通知应用 - 类比:餐厅服务员直接把菜端到你桌上 - 核心调用:io_uring_submit() → 收到完成通知

Reactor(同步非阻塞 + 事件通知):
应用 → 注册感兴趣的事件(EPOLLIN)
        ↓
     epoll_wait() 等待
        ↓
     事件就绪(fd 可读)
        ↓
     应用调用 read() ← 应用自己执行 I/O
        ↓
     处理数据

Proactor(异步 I/O + 完成通知):
应用 → 提交读取请求(io_uring_prep_read)
        ↓
     io_uring_submit() 提交
        ↓
     内核异步执行 I/O ← 内核负责执行
        ↓
     应用收到完成通知
        ↓
     数据已在缓冲区中,直接处理

二、Reactor 模式详解

2.1 单线程 Reactor

最简单的 Reactor:一个线程处理所有 I/O 事件和业务逻辑。Redis 的核心就是这个模式。

/* single_reactor.c - 单线程 Reactor */
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>

#define MAX_EVENTS 1024
#define BUF_SIZE   4096

typedef void (*event_handler)(int fd, uint32_t events, void *data);

struct event_ctx {
    int fd;
    event_handler handler;
    void *data;
};

struct reactor {
    int epoll_fd;
    int running;
};

int reactor_init(struct reactor *r) {
    r->epoll_fd = epoll_create1(0);
    r->running = 1;
    return r->epoll_fd >= 0 ? 0 : -1;
}

int reactor_register(struct reactor *r, int fd, uint32_t events,
                     event_handler handler, void *data) {
    struct event_ctx *ctx = malloc(sizeof(*ctx));
    ctx->fd = fd;
    ctx->handler = handler;
    ctx->data = data;

    struct epoll_event ev = {
        .events = events,
        .data.ptr = ctx
    };
    return epoll_ctl(r->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
}

void reactor_run(struct reactor *r) {
    struct epoll_event events[MAX_EVENTS];

    while (r->running) {
        int nfds = epoll_wait(r->epoll_fd, events, MAX_EVENTS, -1);
        for (int i = 0; i < nfds; i++) {
            struct event_ctx *ctx = events[i].data.ptr;
            ctx->handler(ctx->fd, events[i].events, ctx->data);
        }
    }
}

/* 连接处理器 */
void on_connection(int fd, uint32_t events, void *data) {
    (void)events;
    struct reactor *r = data;
    char buf[BUF_SIZE];

    ssize_t n = read(fd, buf, sizeof(buf));
    if (n <= 0) {
        epoll_ctl(r->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
        close(fd);
        return;
    }

    /* 处理请求(在同一线程中)
     * 如果这里的处理很慢,会阻塞整个 Reactor
     */
    process_request(buf, n);

    /* 回写响应 */
    const char *resp = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK";
    write(fd, resp, strlen(resp));
}

/* 监听器 */
void on_accept(int listen_fd, uint32_t events, void *data) {
    (void)events;
    struct reactor *r = data;

    int client_fd = accept4(listen_fd, NULL, NULL, SOCK_NONBLOCK);
    if (client_fd < 0) return;

    reactor_register(r, client_fd, EPOLLIN | EPOLLET,
                     on_connection, r);
}

void process_request(const char *buf, ssize_t n) {
    (void)buf; (void)n;
}

单线程 Reactor 的局限

优点:
✓ 无锁,无线程同步开销
✓ 代码简单,调试容易
✓ 适合 I/O 密集 + 计算轻量的场景(如 Redis)

缺点:
✗ 业务处理阻塞会拖垮所有连接
✗ 无法利用多核
✗ 一个慢请求影响所有其他请求

2.2 多线程 Reactor(Worker Pool)

将业务逻辑分离到工作线程池,Reactor 线程只负责 I/O:

/* mt_reactor.c - Reactor + 工作线程池 */
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define QUEUE_SIZE 4096

/* 无锁队列(简化版,实际用 rte_ring 或类似实现) */
struct task {
    int fd;
    char data[4096];
    ssize_t len;
};

struct task_queue {
    struct task tasks[QUEUE_SIZE];
    int head, tail;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};

void queue_push(struct task_queue *q, struct task *t) {
    pthread_mutex_lock(&q->lock);
    q->tasks[q->tail % QUEUE_SIZE] = *t;
    q->tail++;
    pthread_cond_signal(&q->cond);
    pthread_mutex_unlock(&q->lock);
}

struct task queue_pop(struct task_queue *q) {
    pthread_mutex_lock(&q->lock);
    while (q->head == q->tail)
        pthread_cond_wait(&q->cond, &q->lock);
    struct task t = q->tasks[q->head % QUEUE_SIZE];
    q->head++;
    pthread_mutex_unlock(&q->lock);
    return t;
}

static struct task_queue work_queue;

/* 工作线程 */
void *worker_thread(void *arg) {
    (void)arg;
    while (1) {
        struct task t = queue_pop(&work_queue);

        /* 执行耗时的业务逻辑 */
        char response[4096];
        int resp_len = process_business_logic(t.data, t.len,
                                               response, sizeof(response));

        /* 回写结果(注意:这里跨线程操作 fd) */
        write(t.fd, response, resp_len);
    }
    return NULL;
}

/* Reactor 线程中的连接处理器(只做 I/O,不做业务) */
void on_readable(int fd, uint32_t events, void *data) {
    (void)events; (void)data;
    struct task t;
    t.fd = fd;
    t.len = read(fd, t.data, sizeof(t.data));
    if (t.len <= 0) {
        close(fd);
        return;
    }
    /* 提交到工作队列 */
    queue_push(&work_queue, &t);
}

int process_business_logic(const char *in, ssize_t in_len,
                           char *out, size_t out_size) {
    (void)in; (void)in_len;
    return snprintf(out, out_size,
                    "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK");
}

/* 启动 */
void start_mt_reactor(int n_workers) {
    /* 初始化队列 */
    pthread_mutex_init(&work_queue.lock, NULL);
    pthread_cond_init(&work_queue.cond, NULL);

    /* 启动工作线程 */
    for (int i = 0; i < n_workers; i++) {
        pthread_t tid;
        pthread_create(&tid, NULL, worker_thread, NULL);
        pthread_detach(tid);
    }

    /* Reactor 主循环(复用之前的 reactor_run) */
    /* ... */
}

2.3 主从 Reactor(Multiple Reactors)

Nginx 和 Netty 使用的模式:一个主 Reactor 接受连接,多个子 Reactor 处理 I/O。

/* multi_reactor.c - 主从 Reactor 模式 */
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define NUM_SUB_REACTORS 4
#define MAX_EVENTS 1024

struct sub_reactor {
    int epoll_fd;
    int pipe_fd[2];   /* 用于主 Reactor 通知新连接 */
    pthread_t thread;
    int id;
};

static struct sub_reactor sub_reactors[NUM_SUB_REACTORS];
static int next_reactor = 0;

/* 子 Reactor 线程 */
void *sub_reactor_thread(void *arg) {
    struct sub_reactor *sr = arg;
    struct epoll_event events[MAX_EVENTS];

    printf("Sub-reactor %d started\n", sr->id);

    while (1) {
        int nfds = epoll_wait(sr->epoll_fd, events, MAX_EVENTS, -1);
        for (int i = 0; i < nfds; i++) {
            int fd = events[i].data.fd;

            if (fd == sr->pipe_fd[0]) {
                /* 收到新连接通知 */
                int client_fd;
                read(sr->pipe_fd[0], &client_fd, sizeof(client_fd));

                struct epoll_event ev = {
                    .events = EPOLLIN | EPOLLET,
                    .data.fd = client_fd
                };
                epoll_ctl(sr->epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);
            } else {
                /* 处理已有连接的 I/O */
                char buf[4096];
                ssize_t n = read(fd, buf, sizeof(buf));
                if (n <= 0) {
                    epoll_ctl(sr->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
                    close(fd);
                    continue;
                }
                /* 处理请求并响应 */
                const char *resp =
                    "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK";
                write(fd, resp, 38);
            }
        }
    }
    return NULL;
}

/* 初始化子 Reactor */
void init_sub_reactors(void) {
    for (int i = 0; i < NUM_SUB_REACTORS; i++) {
        sub_reactors[i].id = i;
        sub_reactors[i].epoll_fd = epoll_create1(0);
        pipe(sub_reactors[i].pipe_fd);

        /* 把 pipe 读端加入子 Reactor 的 epoll */
        struct epoll_event ev = {
            .events = EPOLLIN,
            .data.fd = sub_reactors[i].pipe_fd[0]
        };
        epoll_ctl(sub_reactors[i].epoll_fd, EPOLL_CTL_ADD,
                  sub_reactors[i].pipe_fd[0], &ev);

        pthread_create(&sub_reactors[i].thread, NULL,
                       sub_reactor_thread, &sub_reactors[i]);
    }
}

/* 主 Reactor:只负责 accept */
void main_reactor(int listen_fd) {
    int epoll_fd = epoll_create1(0);
    struct epoll_event ev = {
        .events = EPOLLIN,
        .data.fd = listen_fd
    };
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);

    struct epoll_event events[MAX_EVENTS];

    while (1) {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        for (int i = 0; i < nfds; i++) {
            int client_fd = accept4(listen_fd, NULL, NULL, SOCK_NONBLOCK);
            if (client_fd < 0) continue;

            /* Round-Robin 分发到子 Reactor */
            int idx = next_reactor % NUM_SUB_REACTORS;
            next_reactor++;

            /* 通过 pipe 传递 fd 给子 Reactor */
            write(sub_reactors[idx].pipe_fd[1],
                  &client_fd, sizeof(client_fd));
        }
    }
}

2.4 Reactor 模式总结

变体 线程模型 代表实现 适用场景
单线程 Reactor 1 个线程做所有事 Redis I/O 密集、逻辑简单
Reactor + Worker Pool 1 个 I/O 线程 + N 个业务线程 早期 Tomcat 业务逻辑耗时
主从 Reactor 1 个 accept + N 个 I/O 线程 Nginx, Netty 高并发通用
每线程 Reactor 每个线程独立 Reactor Memcached 多核利用率高

三、Proactor 模式与 io_uring

3.1 传统 AIO 的失败

Linux 有两套异步 I/O API,第一套(POSIX AIO / libaio)在网络场景几乎不可用:

/*
 * POSIX AIO (aio_read/aio_write):
 * - 用线程池模拟异步,不是真正的内核异步
 * - 性能差
 *
 * Linux Native AIO (io_submit/io_getevents):
 * - 只支持 O_DIRECT 文件 I/O
 * - 不支持 socket
 * - 不支持普通文件(非 O_DIRECT)
 * - 有时会退化为同步
 *
 * 结论:Linux 在 io_uring 之前,几乎没有可用的异步网络 I/O
 * 所以 Linux 上的高性能网络编程一直用 Reactor 模式
 */

3.2 io_uring:真正的异步 I/O

io_uring(Linux 5.1+)是 Linux 的新一代异步 I/O 框架,天然适合 Proactor 模式:

/* proactor_iouring.c - 基于 io_uring 的 Proactor */
#include <liburing.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define QUEUE_DEPTH 256
#define BUF_SIZE    4096

enum event_type {
    EVENT_ACCEPT,
    EVENT_READ,
    EVENT_WRITE,
};

struct request {
    enum event_type type;
    int fd;
    char buf[BUF_SIZE];
    int buf_len;
};

struct io_uring ring;

/* 提交 accept 请求 */
void submit_accept(int listen_fd) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    struct request *req = calloc(1, sizeof(*req));
    req->type = EVENT_ACCEPT;
    req->fd = listen_fd;

    io_uring_prep_accept(sqe, listen_fd, NULL, NULL, 0);
    io_uring_sqe_set_data(sqe, req);
    io_uring_submit(&ring);
}

/* 提交 read 请求 */
void submit_read(int fd) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    struct request *req = calloc(1, sizeof(*req));
    req->type = EVENT_READ;
    req->fd = fd;

    io_uring_prep_read(sqe, fd, req->buf, BUF_SIZE, 0);
    io_uring_sqe_set_data(sqe, req);
    io_uring_submit(&ring);
}

/* 提交 write 请求 */
void submit_write(int fd, const char *data, int len) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    struct request *req = calloc(1, sizeof(*req));
    req->type = EVENT_WRITE;
    req->fd = fd;
    memcpy(req->buf, data, len);
    req->buf_len = len;

    io_uring_prep_write(sqe, fd, req->buf, len, 0);
    io_uring_sqe_set_data(sqe, req);
    io_uring_submit(&ring);
}

/* Proactor 事件循环 */
void proactor_loop(int listen_fd) {
    /* 提交第一个 accept 请求 */
    submit_accept(listen_fd);

    while (1) {
        struct io_uring_cqe *cqe;
        io_uring_wait_cqe(&ring, &cqe);

        struct request *req = io_uring_cqe_get_data(cqe);
        int result = cqe->res;
        io_uring_cqe_seen(&ring, cqe);

        switch (req->type) {
        case EVENT_ACCEPT:
            if (result >= 0) {
                int client_fd = result;
                /* accept 完成:提交 read 请求 */
                submit_read(client_fd);
            }
            /* 继续接受下一个连接 */
            submit_accept(listen_fd);
            break;

        case EVENT_READ:
            if (result > 0) {
                /* 数据已经在 buf 中!
                 * 注意区别:Reactor 模式在这里才调用 read()
                 * Proactor 模式的 read 已经由内核完成了
                 */
                const char *resp =
                    "HTTP/1.1 200 OK\r\n"
                    "Content-Length: 2\r\n\r\nOK";
                submit_write(req->fd, resp, strlen(resp));
            } else {
                close(req->fd);
            }
            break;

        case EVENT_WRITE:
            if (result > 0) {
                /* 写入完成,继续读取下一个请求 */
                submit_read(req->fd);
            } else {
                close(req->fd);
            }
            break;
        }

        free(req);
    }
}

int main(void) {
    /* 初始化 io_uring */
    io_uring_queue_init(QUEUE_DEPTH, &ring, 0);

    /* 创建监听 socket */
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    int opt = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(8080),
        .sin_addr.s_addr = INADDR_ANY
    };
    bind(listen_fd, (struct sockaddr *)&addr, sizeof(addr));
    listen(listen_fd, 128);

    printf("Proactor server on port 8080\n");
    proactor_loop(listen_fd);

    io_uring_queue_exit(&ring);
    return 0;
}

3.3 io_uring 的高级特性

/* io_uring 的批量提交和链式操作 */

/* 1. 批量提交:多个请求一次 submit */
void batch_submit(struct io_uring *ring, int *fds, int n) {
    for (int i = 0; i < n; i++) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
        struct request *req = calloc(1, sizeof(*req));
        req->type = EVENT_READ;
        req->fd = fds[i];
        io_uring_prep_read(sqe, fds[i], req->buf, BUF_SIZE, 0);
        io_uring_sqe_set_data(sqe, req);
    }
    /* 一次系统调用提交所有请求 */
    io_uring_submit(ring);
}

/* 2. 链式操作:read → write 自动串联 */
void submit_read_then_write(struct io_uring *ring, int fd) {
    struct request *req = calloc(1, sizeof(*req));
    req->type = EVENT_READ;
    req->fd = fd;

    /* 第一步:read */
    struct io_uring_sqe *sqe1 = io_uring_get_sqe(ring);
    io_uring_prep_read(sqe1, fd, req->buf, BUF_SIZE, 0);
    sqe1->flags |= IOSQE_IO_LINK;  /* 链到下一个 */
    io_uring_sqe_set_data(sqe1, req);

    /* 第二步:write(在 read 完成后自动执行) */
    struct io_uring_sqe *sqe2 = io_uring_get_sqe(ring);
    io_uring_prep_write(sqe2, fd, "OK", 2, 0);
    io_uring_sqe_set_data(sqe2, req);

    io_uring_submit(ring);
}

/* 3. 固定缓冲区:避免每次 I/O 的缓冲区注册开销 */
void setup_fixed_buffers(struct io_uring *ring) {
    struct iovec iovs[32];
    for (int i = 0; i < 32; i++) {
        iovs[i].iov_base = malloc(BUF_SIZE);
        iovs[i].iov_len = BUF_SIZE;
    }
    /* 注册固定缓冲区 */
    io_uring_register_buffers(ring, iovs, 32);

    /* 使用固定缓冲区 I/O */
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_read_fixed(sqe, fd, iovs[0].iov_base,
                              BUF_SIZE, 0, 0 /* buf_index */);
}

/* 4. SQPOLL 模式:内核轮询 SQ,连 submit 系统调用都省了 */
void setup_sqpoll(struct io_uring *ring) {
    struct io_uring_params params = {
        .flags = IORING_SETUP_SQPOLL,
        .sq_thread_idle = 2000,  /* 2 秒无请求后停止轮询 */
    };
    io_uring_queue_init_params(256, ring, &params);
    /*
     * SQPOLL 模式下:
     * 内核有一个专门的线程轮询 SQ
     * 应用只需要写入 SQ,不需要调用 io_uring_submit()
     * 真正的零系统调用 I/O
     */
}

3.4 Reactor vs Proactor 对比

维度 Reactor Proactor
I/O 执行者 应用程序调用 read/write 内核/框架执行
通知内容 “I/O 就绪了” “I/O 完成了”
系统调用 epoll_wait + read/write io_uring_submit
缓冲区管理 应用分配 提前注册给内核
取消操作 不读就行 需要显式取消
代码复杂度 中等 高(状态机更复杂)
Linux 成熟度 非常成熟(epoll) 较新(io_uring 5.1+)
性能 更高(少系统调用)
Windows 对应 select/IOCP IOCP(原生 Proactor)

四、协程:同步风格的异步 I/O

Reactor 和 Proactor 的共同问题是回调地狱——业务逻辑被拆散到多个回调函数中,难以编写和维护。协程(Coroutine)用同步的写法实现异步的效果。

4.1 协程的核心思想

传统回调(Reactor):
void on_read(int fd) {
    read(fd, buf, ...);
    // 处理请求
    // 提交写事件
}
void on_write(int fd) {
    write(fd, resp, ...);
    // 写完了
    // 提交读事件等待下一个请求
}
// 逻辑分散在多个回调中

协程(同步风格):
async function handle_connection(fd) {
    while (true) {
        data = await read(fd);        // 挂起,不阻塞线程
        response = process(data);     // 同步处理
        await write(fd, response);    // 挂起,不阻塞线程
    }
}
// 逻辑集中在一个函数中,像同步代码一样

4.2 Go goroutine 模型

Go 的网络 I/O 是协程模型的典范:用 goroutine + 网络轮询器(netpoller)实现。

// go_server.go - Go 的协程网络模型
package main

import (
    "fmt"
    "io"
    "net"
    "runtime"
)

func handleConn(conn net.Conn) {
    defer conn.Close()
    buf := make([]byte, 4096)

    for {
        // conn.Read() 看起来是同步阻塞的
        // 实际上 Go runtime 会:
        // 1. 尝试非阻塞 read
        // 2. 如果 EAGAIN,将 goroutine 挂起
        // 3. 把 fd 注册到 netpoller(底层是 epoll)
        // 4. 调度其他 goroutine 运行
        // 5. epoll 通知 fd 可读时,恢复这个 goroutine
        n, err := conn.Read(buf)
        if err != nil {
            if err != io.EOF {
                fmt.Println("read error:", err)
            }
            return
        }

        // 同步风格的业务处理
        response := processRequest(buf[:n])

        // conn.Write() 同理
        _, err = conn.Write(response)
        if err != nil {
            return
        }
    }
}

func processRequest(data []byte) []byte {
    return []byte("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
}

func main() {
    // 使用所有 CPU 核
    runtime.GOMAXPROCS(runtime.NumCPU())

    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        panic(err)
    }

    for {
        conn, err := ln.Accept()
        if err != nil {
            continue
        }
        // 每个连接一个 goroutine
        // goroutine 极轻量:初始栈 2-8 KB(线程通常 1-8 MB)
        go handleConn(conn)
    }
}

/*
 * Go netpoller 的内部原理:
 *
 * 1. runtime 在启动时创建 epoll 实例
 * 2. 每个 socket 操作(Read/Write)先尝试非阻塞
 * 3. 如果 EAGAIN:
 *    a. 将 goroutine 的状态保存到 g 结构体
 *    b. 将 fd 注册到 epoll(EPOLLIN/EPOLLOUT)
 *    c. 调用 gopark() 挂起 goroutine
 *    d. 调度器(scheduler)运行其他 goroutine
 * 4. sysmon goroutine 或调度循环中调用 epoll_wait()
 * 5. 有事件就绪时,将对应的 goroutine 放回运行队列
 *
 * 结果:开发者写的是同步代码,底层是 Reactor 模式
 * goroutine 切换开销:~100 ns(线程切换:~1-10 μs)
 */

4.3 Rust async/await 模型

Rust 的异步模型基于 Future trait 和 executor:

// rust_server.rs - Rust 异步网络服务器(使用 tokio)
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
    println!("Server on port 8080");

    loop {
        let (stream, _addr) = listener.accept().await.unwrap();
        // tokio::spawn 创建一个异步任务(类似 goroutine)
        tokio::spawn(handle_connection(stream));
    }
}

async fn handle_connection(mut stream: TcpStream) {
    let mut buf = vec![0u8; 4096];

    loop {
        // .await 点是挂起点
        // 编译器将 async fn 转换为状态机
        let n = match stream.read(&mut buf).await {
            Ok(0) => return,  // 连接关闭
            Ok(n) => n,
            Err(_) => return,
        };

        let response = process_request(&buf[..n]);

        if stream.write_all(&response).await.is_err() {
            return;
        }
    }
}

fn process_request(_data: &[u8]) -> Vec<u8> {
    b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".to_vec()
}

/*
 * Rust async 的内部原理:
 *
 * 1. async fn 被编译器转换为 impl Future 的状态机
 * 2. 每个 .await 点是一个状态转换
 * 3. tokio runtime(executor)驱动 Future:
 *    a. 调用 Future::poll()
 *    b. 如果返回 Poll::Pending,注册 Waker
 *    c. 底层 I/O(mio/epoll)就绪时,调用 Waker::wake()
 *    d. executor 再次 poll() 这个 Future
 * 4. 零开销抽象:状态机在编译时确定,无需堆分配
 *
 * tokio 的调度模型:
 * - 多线程 runtime:work-stealing 调度器
 * - 单线程 runtime:适合嵌入到其他系统
 * - 支持 io_uring 后端(tokio-uring crate)
 */

4.4 Java Virtual Thread 模型

Java 21 引入了虚拟线程(Virtual Thread),类似 Go goroutine 的轻量级线程:

// VirtualThreadServer.java - Java 虚拟线程网络服务器
import java.io.*;
import java.net.*;
import java.util.concurrent.Executors;

public class VirtualThreadServer {
    public static void main(String[] args) throws Exception {
        // 虚拟线程执行器:每个任务运行在一个虚拟线程上
        var executor = Executors.newVirtualThreadPerTaskExecutor();
        var server = new ServerSocket(8080);
        System.out.println("Server on port 8080");

        while (true) {
            Socket client = server.accept();
            // 每个连接一个虚拟线程
            executor.submit(() -> handleConnection(client));
        }
    }

    static void handleConnection(Socket client) {
        try (client;
             var in = client.getInputStream();
             var out = client.getOutputStream()) {

            byte[] buf = new byte[4096];
            int n;

            while ((n = in.read(buf)) > 0) {
                // 看起来是同步阻塞的 I/O
                // 虚拟线程在阻塞时自动让出载体线程
                byte[] response = processRequest(buf, n);
                out.write(response);
                out.flush();
            }
        } catch (IOException e) {
            // 连接错误
        }
    }

    static byte[] processRequest(byte[] data, int len) {
        return "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK"
               .getBytes();
    }
}

/*
 * Java Virtual Thread 的内部原理:
 *
 * 1. 虚拟线程运行在载体线程(carrier thread)上
 * 2. 载体线程是平台线程,数量 = CPU 核数
 * 3. 当虚拟线程执行阻塞 I/O:
 *    a. JVM 检测到阻塞操作
 *    b. 将虚拟线程从载体线程上"卸载"(unmount)
 *    c. 载体线程可以运行其他虚拟线程
 *    d. I/O 完成后,虚拟线程被"挂载"到(可能不同的)载体线程
 * 4. 底层使用 NIO + epoll,但用户代码写同步风格
 *
 * 与 Go goroutine 的区别:
 * - Java VT 使用 JNI/JVM 机制,Go 用自己的运行时
 * - Java VT 在 synchronized 块和 native 方法中会钉住载体线程
 * - Go goroutine 的调度器更成熟(多年迭代)
 */

4.5 协程模型对比

维度 Go goroutine Rust async Java Virtual Thread
调度器 Go runtime(M:N) tokio/async-std JVM(M:N)
栈模型 动态栈(2KB 起) 无栈(状态机) 动态栈(几百 B 起)
内存开销 ~4-8 KB/goroutine ~几十字节/Future ~几百 B/VT
创建成本 ~300 ns ~50 ns ~1 μs
切换成本 ~100 ns ~10 ns ~100 ns
抢占 有(Go 1.14+) 无(协作式)
I/O 后端 epoll(内置) epoll/kqueue/IOCP NIO + epoll
io_uring 支持 实验性 tokio-uring 计划中
生态成熟度 中(Java 21+)
学习曲线 高(生命周期/Pin)

五、模式选型决策

5.1 决策矩阵

你的网络服务应该选什么模式?

连接数量级?
├── < 1K 并发 → 线程池(每连接一线程)足够
│   适用:内部工具、管理接口
│
├── 1K-100K 并发
│   ├── 开发效率优先?
│   │   ├── Go → goroutine(最简单)
│   │   ├── Java → Virtual Thread(Java 21+)
│   │   └── Python → asyncio
│   └── 性能优先?
│       ├── C/C++ → 主从 Reactor(epoll)
│       ├── Rust → tokio async
│       └── C → io_uring Proactor
│
├── 100K-1M 并发
│   ├── 有协议栈需求?→ 协程模型(Go/Rust)
│   └── 原始包处理?→ XDP/AF_XDP
│
└── > 1M 并发 → DPDK + 用户态协议栈
    适用:电信/CDN/L4 LB

5.2 性能基准参考

典型的 HTTP echo 服务器基准测试(单机,8 核,万兆网卡):

实现 模式 RPS P99 延迟 内存/万连接
Nginx 主从 Reactor ~300K 1.2 ms ~50 MB
Go net/http goroutine ~250K 1.5 ms ~80 MB
tokio (Rust) async/await ~400K 0.8 ms ~30 MB
io_uring (C) Proactor ~500K 0.5 ms ~20 MB
epoll (C) 主从 Reactor ~450K 0.6 ms ~25 MB

注:以上数据仅供量级参考,实际性能取决于具体场景、硬件配置和调优程度。

5.3 实际选型建议

工程实践中的选择:

1. 大多数 Web 服务:Go / Java Virtual Thread
   原因:开发效率 > 极致性能,维护成本低

2. 基础设施组件(代理/网关/LB):C + epoll / Rust + tokio
   原因:需要极致性能和资源控制
   案例:Nginx, Envoy, HAProxy, Cloudflare

3. 数据库/存储系统:io_uring Proactor / Rust async
   原因:需要统一的网络+磁盘异步 I/O
   案例:TiKV(Rust), ScyllaDB(Seastar/C++)

4. 包处理/NFV:DPDK / XDP
   原因:需要线速处理
   案例:Katran, Cilium, Open vSwitch

5. 避免的选择:
   × C++ Boost.Asio 的回调模式(可读性差)
   × select/poll(性能差,只适合 POC)
   × POSIX AIO(不是真异步)

参考文献

  1. Douglas C. Schmidt, “Reactor: An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events,” 1995.
  2. Jens Axboe, “Efficient IO with io_uring,” Linux Kernel Documentation, 2019.
  3. Russ Cox, “The Go Scheduler,” Go Blog / Runtime Documentation.
  4. tokio.rs, “Tokio Tutorial: Bridging with Sync Code,” tokio.rs.
  5. JEP 444: Virtual Threads, OpenJDK, 2023.
  6. Shuveb Hussain, “Lord of the io_uring,” unixism.net, 2020.
  7. Bryan Cantrill, “The Problem with Threads,” USENIX HotOS 2006.

上一篇: XDP 与 AF_XDP:eBPF 驱动的早期包处理 下一篇: DDoS 防御架构:容量型、协议型与应用层攻击

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2025-08-06 · network

【网络工程】可编程数据平面与 P4:软件定义转发

传统网络设备的转发逻辑固化在硬件中。P4 语言让交换机的转发管线可编程——你可以定义自己的包头解析、匹配规则和转发动作。本文从 P4 语言核心概念出发,讲解 Parser/Match-Action/Deparser 的编程模型、可编程交换机芯片(Tofino)的架构、P4 在数据中心和运营商网络中的应用案例,以及 P4 与 eBPF 的定位差异。

2025-07-25 · network

【网络工程】UDP 工程:何时用 UDP、怎么用好 UDP

UDP 不是'不可靠的 TCP',它是一张白纸——不做连接管理、不做流控、不做重传,把所有决策权交给应用层。本文从 UDP 的协议本质出发,剖析它在游戏、视频、DNS、QUIC 等场景中的工程用法,深入分析 MTU/分片/NAT 穿越等工程陷阱,并给出高性能 UDP 编程的内核调优方法。


By .