Rust 的 async/await 只是语法糖。
编译器帮你把 async fn
变成一个状态机,但它不负责执行。真正的魔法在
runtime——而 tokio 用大约 40000 行代码,管理你每一个
.await 的挂起、唤醒、调度和 I/O。
如果你写过 Go,你知道 go func() 背后有 GMP 调度器
在默默工作。Rust 的区别在于:语言本身不包含
runtime,你得自己选一个。tokio
是事实标准,但它到底在干什么?
本文基于 tokio 1.x 源码,从五个层面拆解这个运行时:
- Rust async 模型本身的设计
- work-stealing 多线程调度器
- Waker 唤醒机制
- I/O 驱动层(mio + epoll)
- Timer 时间轮
如果你对 Linux 底层的异步 I/O 感兴趣,推荐先看 io_uring 系列,那是另一条技术路线。
一、Rust async 模型速览
Future trait:一切的起点
Rust 的异步模型建立在一个 trait 上:
// 标准库的 Future trait,简化版
trait Future {
type Output;
// runtime 反复调用 poll,直到返回 Ready
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T), // 完成了,拿到结果
Pending, // 还没好,先挂起
}这是一个 pull-based 模型:runtime
主动去”拉”(poll)Future 的进度,而不是 Future
主动”推”结果。每次 poll
要么得到结果(Ready),要么得到”还没好”(Pending)。
编译器把 async fn 变成状态机
当你写这样的代码:
// 一个简单的 async 函数
async fn fetch_data(url: &str) -> String {
// 第一个 .await —— 状态机的第一个挂起点
let response = http_get(url).await;
// 第二个 .await —— 状态机的第二个挂起点
let body = response.text().await;
body
}编译器大致会把它变成:
// 编译器生成的状态机(伪代码)
enum FetchDataFuture<'a> {
// 初始状态:还没开始
State0 { url: &'a str },
// 第一个 await 挂起:等 http_get 完成
State1 { url: &'a str, http_future: HttpGetFuture<'a> },
// 第二个 await 挂起:等 text() 完成
State2 { response: Response, text_future: TextFuture },
// 已完成
Done,
}
impl<'a> Future for FetchDataFuture<'a> {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
loop {
match self.state {
State0 { url } => {
// 创建 http_get future,转移到 State1
let http_future = http_get(url);
*self = State1 { url, http_future };
}
State1 { http_future, .. } => {
// poll 内部的 future
match http_future.poll(cx) {
Poll::Ready(response) => {
let text_future = response.text();
*self = State2 { response, text_future };
}
Poll::Pending => return Poll::Pending,
}
}
State2 { text_future, .. } => {
match text_future.poll(cx) {
Poll::Ready(body) => {
*self = Done;
return Poll::Ready(body);
}
Poll::Pending => return Poll::Pending,
}
}
Done => panic!("poll after completion"),
}
}
}
}每个 .await
就是状态机的一个挂起点。这意味着:
- 零堆分配:状态机是一个
enum,大小在编译期确定,直接分配在调用者的栈上(直到被
spawn时才 Box 到堆上) - 零开销抽象:没有隐含的线程、没有隐含的堆分配、没有运行时类型信息
为什么 Rust 的 async 是 lazy 的
这是跟 Go 最本质的区别:
// Rust:创建 future 但不执行!
let fut = fetch_data("https://example.com");
// 到这里,没有任何网络请求发出
// 只有 poll 或 .await 才会推进
// Go:goroutine 立即开始执行
// go fetchData("https://example.com")
// 到这里,goroutine 已经在跑了Rust 创建 Future 不执行,Go 创建 goroutine 立即调度。Rust
的 Future 通过 drop 取消,Go 需要
context.Context。更完整的对比在下面调度器一节。
没有 runtime,Future 不会跑
这一点怎么强调都不为过:
fn main() {
// 这个 future 永远不会执行
// 编译器会警告:unused future that must be used
async { println!("hello") };
// 你需要一个 runtime
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async {
println!("这次真的跑了");
});
}#[tokio::main]
宏只是帮你生成了上面的样板代码。Rust
的设计哲学是”你不用的东西不应该有开销”——所以 runtime
不塞进语言里,让你自己选。
二、tokio 调度器:work-stealing 多线程
tokio 的调度器是整个 runtime 的心脏。它的任务很简单:把 Future 分配给线程去 poll。但”简单”的任务做到极致性能,需要精心设计的数据结构。
三层队列结构
┌────────────────────────────────────────────┐
│ 全局注入队列 (Global Inject) │
│ Mutex<VecDeque<Task>> │
│ 跨线程安全,但获取需要加锁 │
└──────────┬──────────────┬──────────────┬───┘
│ │ │
┌─────▼─────┐ ┌────▼─────┐ ┌────▼─────┐
│ Worker 0 │ │ Worker 1 │ │ Worker N │
│ ┌────────┐ │ │┌────────┐│ │┌────────┐│
│ │LIFO slot│ │ ││LIFO slot││ ││LIFO slot││
│ └────────┘ │ │└────────┘│ │└────────┘│
│ ┌────────┐ │ │┌────────┐│ │┌────────┐│
│ │本地队列 │ │ ││本地队列 ││ ││本地队列 ││
│ │lock-free│ │ ││lock-free││ ││lock-free││
│ └────────┘ │ │└────────┘│ │└────────┘│
└────────────┘ └──────────┘ └──────────┘
每个 Worker 线程拿任务的优先级:
- LIFO slot(最高优先级):最后放进去的任务最先执行,利用 CPU cache 热度
- 本地队列:lock-free 的固定大小环形缓冲区(256 slots)
- 全局队列:加锁访问,但不是每次都检查(每 poll 61 次检查一次全局队列,避免本地任务饿死)
- 偷其他 Worker 的本地队列:最后的手段
// tokio 调度循环的简化逻辑
fn worker_run(worker: &Worker) {
let mut tick: u32 = 0;
loop {
tick = tick.wrapping_add(1);
let task = if tick % 61 == 0 {
// 每 61 次 poll,优先检查全局队列
// 防止全局任务饿死
worker.global_queue.pop()
.or_else(|| worker.lifo_slot.take())
.or_else(|| worker.local_queue.pop())
} else {
// 正常路径:LIFO slot → 本地 → 全局 → steal
worker.lifo_slot.take()
.or_else(|| worker.local_queue.pop())
.or_else(|| worker.global_queue.pop())
.or_else(|| worker.steal_from_others())
};
match task {
Some(task) => task.poll(), // 执行任务
None => worker.park(), // 没活干了,休眠
}
}
}Work-Stealing:空闲线程从繁忙线程偷任务
当一个 Worker 线程自己的队列和全局队列都空了,它不会傻等——它会去偷别的 Worker 的任务:
// 偷取逻辑的简化版
fn steal_from_others(&self) -> Option<Task> {
let num_workers = self.workers.len();
// 从随机位置开始遍历,避免所有空闲线程都去偷同一个
let start = random::<usize>() % num_workers;
for i in 0..num_workers {
let target = (start + i) % num_workers;
if target == self.index {
continue; // 不偷自己
}
// 关键:偷一半!不是偷一个
// 这样可以减少偷取次数,摊薄同步开销
if let Some(tasks) = self.workers[target].local_queue.steal_half() {
return Some(tasks);
}
}
None
}为什么偷一半而不是偷一个?
- 偷一个:同步开销 / 获得的任务 = 高,频繁跨线程同步
- 偷一半:一次同步操作获得多个任务,摊薄了原子操作的开销
- 这是 tokio 的选择;Go 的调度器默认偷一半(从 target P 的
runq 偷
len/2),思路一致
与 Go 调度器的 GMP 模型对比
读过 Go 调度器那篇 的话,直接看表:
| 维度 | tokio (Rust) | Go GMP |
|---|---|---|
| 调度单元 | Task(Box<dyn Future>,无栈状态机) |
Goroutine(有栈协程,初始 2KB 可增长) |
| 抢占方式 | 协作式——仅在
.await 点让出 |
异步抢占——sysmon 发信号强制让出(Go 1.14+) |
| I/O 模型 | epoll readiness → Waker 唤醒 task | netpoller (epoll) → 唤醒 goroutine |
| 栈模型 | 无栈:编译期 enum,大小确定 | 有栈:运行时分配,按需 2× 扩容 |
| 典型开销 | ~数百字节/task,零堆分配(spawn 前) | ~2KB/goroutine + 栈增长拷贝开销 |
| 本地队列 | 256-slot 环形缓冲区 | 256-slot 环形缓冲区 |
| 偷取策略 | 偷一半 | 偷一半 |
| 阻塞处理 | spawn_blocking
独立线程池 |
M 与 P 解绑,创建新 M |
核心区别就一句话:tokio 无栈,只在
.await 让出;Go
有栈,死循环也能被抢占。 async fn
里写了 CPU 死循环,该 Worker 就废了;Go 里 sysmon
会发信号把它踢下来。这是两种设计哲学的根本分歧——Rust
选择零开销但要求开发者自律,Go
选择安全网但付出运行时成本。
两种运行模式
// 多线程模式(默认):N 个 Worker 线程 + work-stealing
// 适合:生产环境、CPU 密集型混合 I/O 的场景
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4) // 默认等于 CPU 核心数
.build()
.unwrap();
// 单线程模式:所有任务在一个线程上协作调度
// 适合:测试、简单脚本、确定性调试
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();单线程模式不需要 work-stealing,也不需要原子操作来保护队列——所有东西都在一个线程上。在写单元测试时特别有用,因为行为是确定性的。
三、Waker 机制:谁来叫醒睡着的 Future
Future 返回 Pending 之后,谁来负责再次 poll
它?答案是 Waker。
Context 和 Waker 的关系
回顾 poll 的签名:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;Context 里面最重要的东西就是一个
Waker:
// 标准库中的 Context,简化版
pub struct Context<'a> {
waker: &'a Waker,
// Rust 1.82+ 还有 local_waker 等扩展
}
impl<'a> Context<'a> {
pub fn waker(&self) -> &'a Waker {
self.waker
}
}当一个 Future 返回 Pending
时,它必须在某个时刻调用
waker.wake(),否则这个 Future 就永远不会被再次
poll——它会被遗忘。
wake() 到底做了什么
在 tokio 里,waker.wake()
的核心逻辑是:把 task
重新放回调度队列。
// tokio 的 waker 实现(简化)
impl Wake for TaskHandle {
fn wake(self) {
// 标记 task 为"需要 poll"
self.header.state.transition_to_notified();
// 把 task 放回调度队列
// 优先放回创建它的 worker 的本地队列
// 如果放不进去(满了或者跨线程),放全局队列
if let Some(worker) = self.bound_worker() {
worker.local_queue.push(self.task);
} else {
global_queue.push(self.task);
}
// 如果对应的 worker 正在休眠,叫醒它
self.scheduler.notify();
}
}完整的流程:
- I/O 就绪事件到达(比如 socket 可读)
- mio 通知 tokio 的 I/O Driver
- I/O Driver 找到注册在这个 socket 上的 Waker
- 调用
waker.wake() - task 被放回 Worker 的调度队列
- Worker 线程从队列取出 task,再次调用
Future::poll() - 这次 poll 能读到数据了,返回
Ready
自己实现一个最小的 Waker
为了理解 Waker 的本质,我们从零实现一个:
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::collections::VecDeque;
use std::thread;
use std::time::Duration;
// 一个极简的"运行时":就是一个任务队列
struct MiniRuntime {
// 待执行的任务列表
queue: Mutex<VecDeque<Arc<Task>>>,
}
struct Task {
// 被 Pin 住的 Future,类型擦除
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
// 持有运行时的引用,wake 时把自己重新放回队列
runtime: Arc<MiniRuntime>,
}
impl MiniRuntime {
fn new() -> Arc<Self> {
Arc::new(Self {
queue: Mutex::new(VecDeque::new()),
})
}
// 提交一个 future 到运行时
fn spawn(self: &Arc<Self>, future: impl Future<Output = ()> + Send + 'static) {
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
runtime: self.clone(),
});
self.queue.lock().unwrap().push_back(task);
}
// 不断 poll 队列里的任务,直到全部完成
fn run(&self) {
loop {
let task = self.queue.lock().unwrap().pop_front();
match task {
Some(task) => {
// 为这个 task 创建一个 waker
let waker = task_to_waker(task.clone());
let mut cx = Context::from_waker(&waker);
// poll 这个 future
let mut future = task.future.lock().unwrap();
match future.as_mut().poll(&mut cx) {
Poll::Ready(()) => {} // 完成了,不放回队列
Poll::Pending => {} // 没完成,等 wake() 放回来
}
}
None => break, // 队列空了,结束
}
}
}
}
// 注意:教学简化版 -- 生产代码请使用 Arc<Fn> 或 futures::task::ArcWake
// 下面手动管理 Arc 引用计数,必须保证 clone/wake/drop 的配对正确,
// 否则会导致 double-free 或内存泄漏。tokio 内部用宏生成来保证安全性。
fn task_to_waker(task: Arc<Task>) -> Waker {
unsafe fn clone_fn(ptr: *const ()) -> RawWaker {
// 从裸指针恢复 Arc,clone 后再 forget 原始 Arc 以维持引用计数
let task = Arc::from_raw(ptr as *const Task);
let cloned = Arc::clone(&task);
std::mem::forget(task); // 不要 drop 原来的,引用计数保持不变
RawWaker::new(Arc::into_raw(cloned) as *const (), &VTABLE)
}
unsafe fn wake_fn(ptr: *const ()) {
// 取得所有权(引用计数不变),放回队列后 Arc 由队列持有
let task = Arc::from_raw(ptr as *const Task);
task.runtime.queue.lock().unwrap().push_back(Arc::clone(&task));
// task 在此被 drop,引用计数 -1,与 clone 的 +1 抵消
}
unsafe fn drop_fn(ptr: *const ()) {
drop(Arc::from_raw(ptr as *const Task)); // 引用计数 -1
}
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_fn,
wake_fn,
wake_fn, // wake_by_ref 简化为 wake_fn(教学用途,生产中应单独实现)
drop_fn,);
let raw = RawWaker::new(Arc::into_raw(task) as *const (), &VTABLE);
unsafe { Waker::from_raw(raw) }
}这段代码虽然简陋,但展示了 Waker 的核心:wake() = 把 task 放回队列。tokio 做的事情本质上一样,只是队列换成了 work-stealing 队列,加上了各种优化。
为什么 Waker 是类型擦除的(vtable)
你可能注意到了上面代码里的
RawWakerVTable。为什么 Waker 不用 trait
object(Box<dyn Wake>)而要手动搞
vtable?
// Waker 内部结构
pub struct Waker {
waker: RawWaker,
}
pub struct RawWaker {
data: *const (), // 指向具体数据(被类型擦除)
vtable: &'static RawWakerVTable, // 函数指针表
}
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}原因有三:
- 不依赖 alloc:Waker 在
core里定义,不能用Box(那需要alloc),嵌入式场景也要能用 - 大小固定:Waker 就是两个指针大小(16 bytes on 64-bit),可以直接在栈上传递,不需要间接寻址
- 灵活性:不要求实现者使用
Arc,你可以用任何内存管理方式(arena、static、甚至裸指针)
注:Rust 1.82 稳定了
Waketrait(在alloc里),如果你不在乎 no_std,可以直接impl Wake for MyType,不用手写 vtable 了。
四、I/O Driver:mio 和 epoll 的封装
async runtime 存在的核心理由是 I/O 多路复用:一个线程同时等待成千上万个 I/O 事件。tokio 的 I/O Driver 构建在 mio 之上。
mio 的角色
mio(Metal I/O)是一个跨平台的 I/O 事件通知库,它抹平了不同操作系统的差异:
| 操作系统 | 底层机制 | mio 封装后 |
|---|---|---|
| Linux | epoll | mio::Poll |
| macOS / BSD | kqueue | mio::Poll |
| Windows | IOCP | mio::Poll |
mio 做的事情很纯粹:
// mio 的核心 API(简化)
let mut poll = mio::Poll::new()?; // 创建 epoll 实例
let mut events = mio::Events::with_capacity(1024);
// 注册一个 TCP socket,关注"可读"事件
poll.registry().register(&mut tcp_stream,
mio::Token(42), // 这个 token 就是 task 的标识
mio::Interest::READABLE,)?;
// 阻塞等待事件(带超时)
poll.poll(&mut events, Some(Duration::from_millis(100)))?;
// 遍历就绪事件
for event in events.iter() {
match event.token() {
mio::Token(42) => {
// socket 可读了!唤醒对应的 task
}
_ => {}
}
}tokio 如何集成 mio
tokio 在 mio 之上做了一层封装,核心流程:
- 注册:当你创建
TcpStream::connect()时,tokio 把底层 socket fd 注册到 mio 的Poll实例 - 等待:I/O Driver 线程(通常是某个
Worker 线程兼任)调用
poll.poll()等待事件 - 唤醒:事件到达后,找到对应 Token →
找到对应 Task 的 Waker → 调用
wake() - 调度:Task 回到 Worker 队列,被 poll
时执行实际的
read()/write()
// tokio I/O Driver 的简化流程
fn io_driver_loop(driver: &IoDriver) {
let mut events = mio::Events::with_capacity(1024);
loop {
// 计算最近的 timer 超时,作为 poll 的超时时间
let timeout = driver.timer.next_deadline();
// 阻塞等待 I/O 事件(或超时)
driver.mio_poll.poll(&mut events, timeout)?;
// 处理 I/O 事件
for event in events.iter() {
// 根据 token 找到注册的 I/O 资源
if let Some(io_resource) = driver.resources.get(event.token()) {
// 更新就绪状态
io_resource.set_readiness(event.readiness());
// 唤醒等待这个资源的 task
io_resource.wake();
}
}
// 顺便处理到期的 timer(下一节详述)
driver.timer.process_expired_timers();
}
}注意一个关键设计:I/O Driver
不是独立线程。在 tokio 的多线程 runtime 中,某个
Worker 线程在没有任务执行时,会”兼任” I/O Driver
的角色,调用 poll.poll()
等待事件。这样节省了一个线程,也减少了跨线程通信。
tokio 的 io_uring 支持现状
标准 tokio 基于 epoll 的 readiness 模型——先查询”是否就绪”,再执行 I/O。而 io_uring 是 completion 模型——直接提交 I/O 操作,完成后收通知。
两种模型的冲突意味着 io_uring 不能简单塞进现有的 tokio
I/O Driver。社区的方案是 tokio-uring
crate:
// tokio-uring 的用法:completion-based I/O
// 注意:需要单独的 runtime,不是标准 tokio runtime
tokio_uring::start(async {
let file = tokio_uring::fs::File::open("data.txt").await.unwrap();
// buffer 的所有权转移给内核,完成后还回来
// 这是 completion model 的核心特征
let buf = vec![0u8; 4096];
let (result, buf) = file.read_at(buf, 0).await;
println!("读取了 {} 字节", result.unwrap());
});tokio-uring 目前仍是实验性的,主要限制:
- 仅支持
current_threadruntime(io_uring 的 SQ/CQ 不适合跨线程共享) - Buffer 所有权语义跟标准
AsyncRead/AsyncWritetrait 不兼容 - 生态库(hyper、tonic 等)尚未适配
五、Timer:时间轮 + 层级堆
tokio::time::sleep() 和
tokio::time::timeout() 是最常用的 timer
操作。它们背后是一个精心设计的时间轮数据结构。
sleep() 和 timeout() 的实现原理
// sleep 的实质:创建一个在指定时刻被唤醒的 Future
pub async fn sleep(duration: Duration) {
let deadline = Instant::now() + duration;
let entry = timer::Entry::new(deadline);
// 把 entry 注册到时间轮
timer::register(&entry);
// 挂起,等待时间轮在 deadline 到达时调用 wake()
entry.await;
}
// timeout 就是把 sleep 和目标 future 组合起来
pub async fn timeout<F: Future>(duration: Duration, future: F) -> Result<F::Output, Elapsed> {
tokio::select! {
result = future => Ok(result), // future 先完成
_ = sleep(duration) => Err(Elapsed), // 超时了
}
}时间轮(Hierarchical Timing Wheel)
tokio 没有用
BinaryHeap(标准库的优先队列)来管理
timer,而是用了分层时间轮。为什么?
| 操作 | BinaryHeap | 时间轮 |
|---|---|---|
| 插入 timer | O(log n) | O(1) |
| 取消 timer | O(n) 查找 + O(log n) 调整 | O(1) |
| 推进到下一个到期 | O(log n) pop | O(1) 摊销 |
| 空间 | 紧凑 | 预分配 slots |
在高并发场景下(数万个活跃 timer),O(1) 插入和取消的优势是碾压级的。
时间轮的基本思路类似时钟:
层级时间轮结构(tokio 使用 6 层)
Level 0: 64 slots, 每 slot = 1ms → 覆盖 64ms
Level 1: 64 slots, 每 slot = 64ms → 覆盖 ~4s
Level 2: 64 slots, 每 slot = 4s → 覆盖 ~4min
Level 3: 64 slots, 每 slot = 4min → 覆盖 ~4hr
Level 4: 64 slots, 每 slot = 4hr → 覆盖 ~10days
Level 5: 64 slots, 每 slot = 10days → 覆盖 ~2years
每层是一个 64 格的环形数组,低层 slot 到期时会触发"级联":
把高层对应 slot 的 timer 移到更低层级,精确到毫秒。
// 时间轮的简化数据结构
struct TimerWheel {
// 6 层,每层 64 个 slot
levels: [Level; 6],
// 当前时间(毫秒精度)
elapsed: u64,
}
struct Level {
// 64 个 slot,每个 slot 是一个 timer 链表
slots: [LinkedList<TimerEntry>; 64],
// 这一层的时间粒度
granularity: u64,
}
impl TimerWheel {
// O(1) 插入:根据 deadline 计算应该放在哪一层的哪个 slot
fn insert(&mut self, entry: TimerEntry) {
let deadline = entry.deadline;
let diff = deadline - self.elapsed;
// 找到合适的层级
let level = self.level_for(diff);
// 计算 slot 索引
let slot = (deadline / level.granularity) % 64;
// 直接插入链表头部 —— O(1)
level.slots[slot].push_front(entry);
}
// 推进时间,触发到期的 timer
fn advance(&mut self, now: u64) {
while self.elapsed < now {
self.elapsed += 1;
// 检查最底层的当前 slot
let slot = (self.elapsed / 1) % 64;
for entry in self.levels[0].slots[slot].drain() {
// 到期了!调用 waker.wake()
entry.waker.wake();
}
// 在适当时机做级联(cascade)
// 高层 slot 的 timer 移到低层以获得更高精度
self.maybe_cascade();
}
}
}Timer 的精度和限制
- 精度:毫秒级。tokio 不是实时系统,timer 的精度受操作系统调度延迟影响
- 最小粒度:1ms。
sleep(Duration::from_nanos(1))实际上至少等 1ms - 最大范围:约 2 年(6 层 × 64 slots 的覆盖范围)
- 定时漂移:长时间运行的
interval()会有累积误差,tokio 提供MissedTickBehavior来选择处理策略
// interval 的漂移处理策略
use tokio::time::{interval, MissedTickBehavior};
let mut ticker = interval(Duration::from_millis(100));
// Burst:错过的 tick 立即连续触发(默认)
ticker.set_missed_tick_behavior(MissedTickBehavior::Burst);
// Delay:错过就错过,从当前时刻重新计时
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
// Skip:跳过错过的 tick,对齐到下一个整数倍时刻
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);六、性能调优和常见陷阱
理解了 tokio 的内部机制之后,很多”最佳实践”就不再是死记硬背,而是自然推导出来的结论。
陷阱 1:在 async context 里调用阻塞代码
这是最常见也最致命的错误:
// 否 错误:在 async 函数里调用同步阻塞 I/O
async fn bad_handler(path: &str) -> String {
// std::fs::read_to_string 会阻塞当前 OS 线程!
// 这个 Worker 线程在文件读取期间完全卡住
// 其他 task 无法被调度
let content = std::fs::read_to_string(path).unwrap();
content
}
// 是 正确:用 spawn_blocking 把阻塞操作移到独立线程池
async fn good_handler(path: String) -> String {
tokio::task::spawn_blocking(move || {
std::fs::read_to_string(&path).unwrap()
})
.await
.unwrap()
}
// 是 更好:直接用 tokio 的异步 fs API
async fn best_handler(path: &str) -> String {
// tokio::fs 内部也是 spawn_blocking,但 API 更干净
tokio::fs::read_to_string(path).await.unwrap()
}为什么这么严重? 因为 tokio 的 task
是协作式调度——只有在 .await 点才会让出
CPU。如果你在两个 .await
之间执行了耗时操作(无论是 CPU 密集还是阻塞 I/O),这个
Worker 线程上的所有其他 task 都会饿死。
陷阱 2:spawn_blocking 的线程池管理
spawn_blocking
不是万能的。它背后有一个独立的线程池,默认上限 512
个线程:
// spawn_blocking 线程池的配置
let rt = tokio::runtime::Builder::new_multi_thread()
.max_blocking_threads(512) // 默认值
.thread_keep_alive(Duration::from_secs(10)) // 空闲线程存活时间
.build()
.unwrap();如果你大量使用
spawn_blocking,可能会遇到:
- 线程池耗尽:512 个线程全忙,新的
spawn_blocking调用会排队等待 - 内存开销:每个 OS 线程默认 8MB 栈,512 个就是 4GB 虚拟内存
- 上下文切换开销:大量 OS 线程意味着频繁的内核态调度
经验法则:如果你发现 spawn_blocking
调用量很大,说明你应该用真正的异步替代方案,或者用专门的线程池。
陷阱 3:task 太多导致的调度延迟
tokio 的 Worker 线程数通常等于 CPU 核心数。如果你有 100 万个活跃 task,但只有 8 个 Worker:
// 模拟调度延迟
async fn measure_latency() {
let start = Instant::now();
// 创建大量 task 争抢调度
let mut handles = Vec::new();
for i in 0..1_000_000 {
handles.push(tokio::spawn(async move {
// 每个 task 做一点点工作
// 但 100 万个 task 排队等 8 个 Worker
tokio::task::yield_now().await;
}));
}
for handle in handles {
handle.await.unwrap();
}
// 在 8 核机器上,这可能需要几秒钟
// 每个 task 的平均调度延迟 = 总时间 / task 数
println!("总耗时: {:?}", start.elapsed());
}缓解策略:
- 用
tokio::task::yield_now()让长计算的 task 主动让出 - 用
tokio::sync::Semaphore控制并发 task 数量 - 对 CPU 密集型工作,用
spawn_blocking或rayon线程池
选择 multi-thread 还是 current-thread
| 场景 | 推荐 | 原因 |
|---|---|---|
| Web 服务器(生产) | multi_thread |
利用多核,I/O 密集型最优 |
| CLI 工具 | current_thread |
简单,开销低 |
| 单元测试 | current_thread |
确定性行为,便于调试 |
| 嵌入式 / WASM | current_thread |
可能只有一个核心 |
| CPU 密集型计算 | multi_thread +
spawn_blocking |
不要在 async 里做重计算 |
// 生产环境推荐配置
#[tokio::main]
async fn main() {
// 等价于 Builder::new_multi_thread()
// .worker_threads(num_cpus)
// .enable_all()
// .build()
server::run().await;
}
// 测试环境推荐配置
#[tokio::test]
async fn test_something() {
// 等价于 Builder::new_current_thread()
// .enable_all()
// .build()
assert_eq!(my_async_fn().await, 42);
}陷阱:Tokio 与标准库的混用
这是新手(甚至老手)最密集的踩坑区。Rust 标准库和 tokio 的同名 API 看起来一样,行为完全不同。
坑 1:async 里用
std::fs
// 否 灾难:阻塞整个 Worker 线程
async fn read_config() -> String {
// std::fs::read_to_string 是同步阻塞的!
// 文件在机械硬盘上?恭喜,这个 Worker 上的所有 task 陪你等 50ms
std::fs::read_to_string("config.toml").unwrap()
}
// 是 正确:用 spawn_blocking 隔离
async fn read_config() -> String {
tokio::task::spawn_blocking(|| {
std::fs::read_to_string("config.toml").unwrap()
}).await.unwrap()
}
// 是 更好:直接用 tokio::fs(内部就是 spawn_blocking 封装)
async fn read_config() -> String {
tokio::fs::read_to_string("config.toml").await.unwrap()
}坑 2:async 里用
std::sync::Mutex
// 否 危险:如果持锁期间有 .await,可能死锁
async fn bad_update(data: Arc<std::sync::Mutex<Vec<String>>>) {
let mut guard = data.lock().unwrap();
// 持有 std::sync::Mutex 的 guard 跨越了 .await 点
// 如果这个 task 在 .await 时被挂起,其他 task 无法获取锁
// 而且这个 task 可能被调度到不同的 Worker 线程上恢复——
// std::sync::MutexGuard 不是 Send 的,编译器会报错(这是好事)
let result = fetch_something().await;
guard.push(result);
}
// 是 正确:用 tokio::sync::Mutex(支持跨 .await 持有)
async fn good_update(data: Arc<tokio::sync::Mutex<Vec<String>>>) {
let mut guard = data.lock().await; // 注意这里是 .await
let result = fetch_something().await;
guard.push(result);
}
// 是 更好:如果不需要跨 .await 持锁,std::sync::Mutex 反而更快
async fn best_update(data: Arc<std::sync::Mutex<Vec<String>>>) {
let result = fetch_something().await;
// 先完成异步操作,再短暂持锁同步修改
data.lock().unwrap().push(result);
// guard 立即 drop,不跨 .await——没问题
}经验法则:锁不跨 .await → 用
std::sync::Mutex(更快,无 overhead);锁跨
.await → 用
tokio::sync::Mutex。
坑 3:在 tokio runtime 里用
std::thread::spawn
// 否 常见误解:想"并行"但绕过了调度器
async fn handle_request() {
// std::thread::spawn 创建的线程不受 tokio 管理
// 里面不能用 tokio::spawn、tokio::time::sleep 等异步 API
// 除非你手动在新线程里创建 runtime(浪费资源)
std::thread::spawn(|| {
// tokio::time::sleep(Duration::from_secs(1)).await; // 编译错误!
});
}
// 是 正确:用 tokio::spawn 做并发
async fn handle_request() {
tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(1)).await;
// 这个 task 由 tokio 调度器管理
});
}
// 是 正确:真的需要 OS 线程做 CPU 密集计算,用 spawn_blocking
async fn handle_request() {
let result = tokio::task::spawn_blocking(|| {
// 这里跑 CPU 密集逻辑,不影响 async 调度器
heavy_computation()
}).await.unwrap();
}速查表:标准库 vs tokio 对照
| 场景 | 否 不要在 async 里用 | 是 正确替代 |
|---|---|---|
| 文件 I/O | std::fs::* |
tokio::fs::* 或
spawn_blocking |
| 线程睡眠 | std::thread::sleep |
tokio::time::sleep |
| 互斥锁(跨 await) | std::sync::Mutex |
tokio::sync::Mutex |
| 通道 | std::sync::mpsc |
tokio::sync::mpsc |
| 创建并发任务 | std::thread::spawn |
tokio::spawn |
| DNS 解析 | std::net::ToSocketAddrs |
tokio::net::lookup_host |
总结
tokio 的 ~40000 行代码做了这些事:
- 调度器:work-stealing 多线程队列,让 CPU 一刻不停
- Waker:类型擦除的唤醒机制,连接 I/O 事件和任务调度
- I/O Driver:基于 mio 封装 epoll/kqueue/IOCP,一个线程等万级连接
- Timer:分层时间轮,O(1) 插入和取消
跟 Go 的 GMP 调度器 相比,tokio 选择了”无栈协程 + 编译期状态机”的路线,换来零开销抽象和极致的内存效率,代价是更陡峭的学习曲线和”必须在 .await 点让出”的协作式限制。
两种设计没有绝对优劣——Go 选择了”让大多数开发者轻松写并发”,Rust 选择了”让需要极致性能的开发者有完全的控制权”。理解它们的内部机制,才能在合适的场景选择合适的工具。
延伸阅读