土法炼钢兴趣小组的算法知识备份

Rust async 运行时拆解:tokio 的调度器到底在干什么

目录

Rust 的 async/await 只是语法糖。

编译器帮你把 async fn 变成一个状态机,但它不负责执行。真正的魔法在 runtime——而 tokio 用大约 40000 行代码,管理你每一个 .await 的挂起、唤醒、调度和 I/O。

如果你写过 Go,你知道 go func() 背后有 GMP 调度器 在默默工作。Rust 的区别在于:语言本身不包含 runtime,你得自己选一个。tokio 是事实标准,但它到底在干什么?

本文基于 tokio 1.x 源码,从五个层面拆解这个运行时:

  1. Rust async 模型本身的设计
  2. work-stealing 多线程调度器
  3. Waker 唤醒机制
  4. I/O 驱动层(mio + epoll)
  5. Timer 时间轮

如果你对 Linux 底层的异步 I/O 感兴趣,推荐先看 io_uring 系列,那是另一条技术路线。

tokio 运行时架构图

一、Rust async 模型速览

Future trait:一切的起点

Rust 的异步模型建立在一个 trait 上:

// 标准库的 Future trait,简化版
trait Future {
  type Output;

  // runtime 反复调用 poll,直到返回 Ready
  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
  Ready(T),  // 完成了,拿到结果
  Pending,  // 还没好,先挂起
}

这是一个 pull-based 模型:runtime 主动去”拉”(poll)Future 的进度,而不是 Future 主动”推”结果。每次 poll 要么得到结果(Ready),要么得到”还没好”(Pending)。

编译器把 async fn 变成状态机

当你写这样的代码:

// 一个简单的 async 函数
async fn fetch_data(url: &str) -> String {
  // 第一个 .await —— 状态机的第一个挂起点
  let response = http_get(url).await;
  // 第二个 .await —— 状态机的第二个挂起点
  let body = response.text().await;
  body
}

编译器大致会把它变成:

// 编译器生成的状态机(伪代码)
enum FetchDataFuture<'a> {
  // 初始状态:还没开始
  State0 { url: &'a str },
  // 第一个 await 挂起:等 http_get 完成
  State1 { url: &'a str, http_future: HttpGetFuture<'a> },
  // 第二个 await 挂起:等 text() 完成
  State2 { response: Response, text_future: TextFuture },
  // 已完成
  Done,
}

impl<'a> Future for FetchDataFuture<'a> {
  type Output = String;

  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
  loop {
  match self.state {
  State0 { url } => {
  // 创建 http_get future,转移到 State1
  let http_future = http_get(url);
  *self = State1 { url, http_future };
  }
  State1 { http_future, .. } => {
  // poll 内部的 future
  match http_future.poll(cx) {
  Poll::Ready(response) => {
  let text_future = response.text();
  *self = State2 { response, text_future };
  }
  Poll::Pending => return Poll::Pending,
  }
  }
  State2 { text_future, .. } => {
  match text_future.poll(cx) {
  Poll::Ready(body) => {
  *self = Done;
  return Poll::Ready(body);
  }
  Poll::Pending => return Poll::Pending,
  }
  }
  Done => panic!("poll after completion"),
  }
  }
  }
}

每个 .await 就是状态机的一个挂起点。这意味着:

为什么 Rust 的 async 是 lazy 的

这是跟 Go 最本质的区别:

// Rust:创建 future 但不执行!
let fut = fetch_data("https://example.com");
// 到这里,没有任何网络请求发出
// 只有 poll 或 .await 才会推进

// Go:goroutine 立即开始执行
// go fetchData("https://example.com")
// 到这里,goroutine 已经在跑了

Rust 创建 Future 不执行,Go 创建 goroutine 立即调度。Rust 的 Future 通过 drop 取消,Go 需要 context.Context。更完整的对比在下面调度器一节。

没有 runtime,Future 不会跑

这一点怎么强调都不为过:

fn main() {
  // 这个 future 永远不会执行
  // 编译器会警告:unused future that must be used
  async { println!("hello") };

  // 你需要一个 runtime
  tokio::runtime::Runtime::new()
  .unwrap()
  .block_on(async {
  println!("这次真的跑了");
  });
}

#[tokio::main] 宏只是帮你生成了上面的样板代码。Rust 的设计哲学是”你不用的东西不应该有开销”——所以 runtime 不塞进语言里,让你自己选。


二、tokio 调度器:work-stealing 多线程

tokio 的调度器是整个 runtime 的心脏。它的任务很简单:把 Future 分配给线程去 poll。但”简单”的任务做到极致性能,需要精心设计的数据结构。

三层队列结构

┌────────────────────────────────────────────┐
│  全局注入队列 (Global Inject)  │
│  Mutex<VecDeque<Task>>  │
│  跨线程安全,但获取需要加锁  │
└──────────┬──────────────┬──────────────┬───┘
  │  │  │
  ┌─────▼─────┐  ┌────▼─────┐  ┌────▼─────┐
  │  Worker 0  │  │ Worker 1  │  │ Worker N  │
  │ ┌────────┐ │  │┌────────┐│  │┌────────┐│
  │ │LIFO slot│ │  ││LIFO slot││  ││LIFO slot││
  │ └────────┘ │  │└────────┘│  │└────────┘│
  │ ┌────────┐ │  │┌────────┐│  │┌────────┐│
  │ │本地队列 │ │  ││本地队列 ││  ││本地队列 ││
  │ │lock-free│ │  ││lock-free││  ││lock-free││
  │ └────────┘ │  │└────────┘│  │└────────┘│
  └────────────┘  └──────────┘  └──────────┘

每个 Worker 线程拿任务的优先级:

  1. LIFO slot(最高优先级):最后放进去的任务最先执行,利用 CPU cache 热度
  2. 本地队列:lock-free 的固定大小环形缓冲区(256 slots)
  3. 全局队列:加锁访问,但不是每次都检查(每 poll 61 次检查一次全局队列,避免本地任务饿死)
  4. 偷其他 Worker 的本地队列:最后的手段
// tokio 调度循环的简化逻辑
fn worker_run(worker: &Worker) {
  let mut tick: u32 = 0;
  loop {
  tick = tick.wrapping_add(1);

  let task = if tick % 61 == 0 {
  // 每 61 次 poll,优先检查全局队列
  // 防止全局任务饿死
  worker.global_queue.pop()
  .or_else(|| worker.lifo_slot.take())
  .or_else(|| worker.local_queue.pop())
  } else {
  // 正常路径:LIFO slot → 本地 → 全局 → steal
  worker.lifo_slot.take()
  .or_else(|| worker.local_queue.pop())
  .or_else(|| worker.global_queue.pop())
  .or_else(|| worker.steal_from_others())
  };

  match task {
  Some(task) => task.poll(),  // 执行任务
  None => worker.park(),  // 没活干了,休眠
  }
  }
}

Work-Stealing:空闲线程从繁忙线程偷任务

当一个 Worker 线程自己的队列和全局队列都空了,它不会傻等——它会去别的 Worker 的任务:

// 偷取逻辑的简化版
fn steal_from_others(&self) -> Option<Task> {
  let num_workers = self.workers.len();
  // 从随机位置开始遍历,避免所有空闲线程都去偷同一个
  let start = random::<usize>() % num_workers;

  for i in 0..num_workers {
  let target = (start + i) % num_workers;
  if target == self.index {
  continue; // 不偷自己
  }

  // 关键:偷一半!不是偷一个
  // 这样可以减少偷取次数,摊薄同步开销
  if let Some(tasks) = self.workers[target].local_queue.steal_half() {
  return Some(tasks);
  }
  }
  None
}

为什么偷一半而不是偷一个?

与 Go 调度器的 GMP 模型对比

读过 Go 调度器那篇 的话,直接看表:

维度 tokio (Rust) Go GMP
调度单元 Task(Box<dyn Future>,无栈状态机) Goroutine(有栈协程,初始 2KB 可增长)
抢占方式 协作式——仅在 .await 点让出 异步抢占——sysmon 发信号强制让出(Go 1.14+)
I/O 模型 epoll readiness → Waker 唤醒 task netpoller (epoll) → 唤醒 goroutine
栈模型 无栈:编译期 enum,大小确定 有栈:运行时分配,按需 2× 扩容
典型开销 ~数百字节/task,零堆分配(spawn 前) ~2KB/goroutine + 栈增长拷贝开销
本地队列 256-slot 环形缓冲区 256-slot 环形缓冲区
偷取策略 偷一半 偷一半
阻塞处理 spawn_blocking 独立线程池 M 与 P 解绑,创建新 M

核心区别就一句话:tokio 无栈,只在 .await 让出;Go 有栈,死循环也能被抢占。 async fn 里写了 CPU 死循环,该 Worker 就废了;Go 里 sysmon 会发信号把它踢下来。这是两种设计哲学的根本分歧——Rust 选择零开销但要求开发者自律,Go 选择安全网但付出运行时成本。

两种运行模式

// 多线程模式(默认):N 个 Worker 线程 + work-stealing
// 适合:生产环境、CPU 密集型混合 I/O 的场景
let rt = tokio::runtime::Builder::new_multi_thread()
  .worker_threads(4)  // 默认等于 CPU 核心数
  .build()
  .unwrap();

// 单线程模式:所有任务在一个线程上协作调度
// 适合:测试、简单脚本、确定性调试
let rt = tokio::runtime::Builder::new_current_thread()
  .enable_all()
  .build()
  .unwrap();

单线程模式不需要 work-stealing,也不需要原子操作来保护队列——所有东西都在一个线程上。在写单元测试时特别有用,因为行为是确定性的。


三、Waker 机制:谁来叫醒睡着的 Future

Future 返回 Pending 之后,谁来负责再次 poll 它?答案是 Waker

Context 和 Waker 的关系

回顾 poll 的签名:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

Context 里面最重要的东西就是一个 Waker

// 标准库中的 Context,简化版
pub struct Context<'a> {
  waker: &'a Waker,
  // Rust 1.82+ 还有 local_waker 等扩展
}

impl<'a> Context<'a> {
  pub fn waker(&self) -> &'a Waker {
  self.waker
  }
}

当一个 Future 返回 Pending 时,它必须在某个时刻调用 waker.wake(),否则这个 Future 就永远不会被再次 poll——它会被遗忘。

wake() 到底做了什么

在 tokio 里,waker.wake() 的核心逻辑是:把 task 重新放回调度队列

// tokio 的 waker 实现(简化)
impl Wake for TaskHandle {
  fn wake(self) {
  // 标记 task 为"需要 poll"
  self.header.state.transition_to_notified();

  // 把 task 放回调度队列
  // 优先放回创建它的 worker 的本地队列
  // 如果放不进去(满了或者跨线程),放全局队列
  if let Some(worker) = self.bound_worker() {
  worker.local_queue.push(self.task);
  } else {
  global_queue.push(self.task);
  }

  // 如果对应的 worker 正在休眠,叫醒它
  self.scheduler.notify();
  }
}

完整的流程:

  1. I/O 就绪事件到达(比如 socket 可读)
  2. mio 通知 tokio 的 I/O Driver
  3. I/O Driver 找到注册在这个 socket 上的 Waker
  4. 调用 waker.wake()
  5. task 被放回 Worker 的调度队列
  6. Worker 线程从队列取出 task,再次调用 Future::poll()
  7. 这次 poll 能读到数据了,返回 Ready

自己实现一个最小的 Waker

为了理解 Waker 的本质,我们从零实现一个:

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::collections::VecDeque;
use std::thread;
use std::time::Duration;

// 一个极简的"运行时":就是一个任务队列
struct MiniRuntime {
  // 待执行的任务列表
  queue: Mutex<VecDeque<Arc<Task>>>,
}

struct Task {
  // 被 Pin 住的 Future,类型擦除
  future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
  // 持有运行时的引用,wake 时把自己重新放回队列
  runtime: Arc<MiniRuntime>,
}

impl MiniRuntime {
  fn new() -> Arc<Self> {
  Arc::new(Self {
  queue: Mutex::new(VecDeque::new()),
  })
  }

  // 提交一个 future 到运行时
  fn spawn(self: &Arc<Self>, future: impl Future<Output = ()> + Send + 'static) {
  let task = Arc::new(Task {
  future: Mutex::new(Box::pin(future)),
  runtime: self.clone(),
  });
  self.queue.lock().unwrap().push_back(task);
  }

  // 不断 poll 队列里的任务,直到全部完成
  fn run(&self) {
  loop {
  let task = self.queue.lock().unwrap().pop_front();
  match task {
  Some(task) => {
  // 为这个 task 创建一个 waker
  let waker = task_to_waker(task.clone());
  let mut cx = Context::from_waker(&waker);
  // poll 这个 future
  let mut future = task.future.lock().unwrap();
  match future.as_mut().poll(&mut cx) {
  Poll::Ready(()) => {} // 完成了,不放回队列
  Poll::Pending => {}  // 没完成,等 wake() 放回来
  }
  }
  None => break, // 队列空了,结束
  }
  }
  }
}

// 注意:教学简化版 -- 生产代码请使用 Arc<Fn> 或 futures::task::ArcWake
// 下面手动管理 Arc 引用计数,必须保证 clone/wake/drop 的配对正确,
// 否则会导致 double-free 或内存泄漏。tokio 内部用宏生成来保证安全性。
fn task_to_waker(task: Arc<Task>) -> Waker {
  unsafe fn clone_fn(ptr: *const ()) -> RawWaker {
  // 从裸指针恢复 Arc,clone 后再 forget 原始 Arc 以维持引用计数
  let task = Arc::from_raw(ptr as *const Task);
  let cloned = Arc::clone(&task);
  std::mem::forget(task); // 不要 drop 原来的,引用计数保持不变
  RawWaker::new(Arc::into_raw(cloned) as *const (), &VTABLE)
  }

  unsafe fn wake_fn(ptr: *const ()) {
  // 取得所有权(引用计数不变),放回队列后 Arc 由队列持有
  let task = Arc::from_raw(ptr as *const Task);
  task.runtime.queue.lock().unwrap().push_back(Arc::clone(&task));
  // task 在此被 drop,引用计数 -1,与 clone 的 +1 抵消
  }

  unsafe fn drop_fn(ptr: *const ()) {
  drop(Arc::from_raw(ptr as *const Task)); // 引用计数 -1
  }

  static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_fn,
  wake_fn,
  wake_fn, // wake_by_ref 简化为 wake_fn(教学用途,生产中应单独实现)
  drop_fn,);

  let raw = RawWaker::new(Arc::into_raw(task) as *const (), &VTABLE);
  unsafe { Waker::from_raw(raw) }
}

这段代码虽然简陋,但展示了 Waker 的核心:wake() = 把 task 放回队列。tokio 做的事情本质上一样,只是队列换成了 work-stealing 队列,加上了各种优化。

为什么 Waker 是类型擦除的(vtable)

你可能注意到了上面代码里的 RawWakerVTable。为什么 Waker 不用 trait object(Box<dyn Wake>)而要手动搞 vtable?

// Waker 内部结构
pub struct Waker {
  waker: RawWaker,
}

pub struct RawWaker {
  data: *const (),  // 指向具体数据(被类型擦除)
  vtable: &'static RawWakerVTable, // 函数指针表
}

pub struct RawWakerVTable {
  clone: unsafe fn(*const ()) -> RawWaker,
  wake: unsafe fn(*const ()),
  wake_by_ref: unsafe fn(*const ()),
  drop: unsafe fn(*const ()),
}

原因有三:

  1. 不依赖 alloc:Waker 在 core 里定义,不能用 Box(那需要 alloc),嵌入式场景也要能用
  2. 大小固定:Waker 就是两个指针大小(16 bytes on 64-bit),可以直接在栈上传递,不需要间接寻址
  3. 灵活性:不要求实现者使用 Arc,你可以用任何内存管理方式(arena、static、甚至裸指针)

注:Rust 1.82 稳定了 Wake trait(在 alloc 里),如果你不在乎 no_std,可以直接 impl Wake for MyType,不用手写 vtable 了。


四、I/O Driver:mio 和 epoll 的封装

async runtime 存在的核心理由是 I/O 多路复用:一个线程同时等待成千上万个 I/O 事件。tokio 的 I/O Driver 构建在 mio 之上。

mio 的角色

mio(Metal I/O)是一个跨平台的 I/O 事件通知库,它抹平了不同操作系统的差异:

操作系统 底层机制 mio 封装后
Linux epoll mio::Poll
macOS / BSD kqueue mio::Poll
Windows IOCP mio::Poll

mio 做的事情很纯粹:

// mio 的核心 API(简化)
let mut poll = mio::Poll::new()?;  // 创建 epoll 实例
let mut events = mio::Events::with_capacity(1024);

// 注册一个 TCP socket,关注"可读"事件
poll.registry().register(&mut tcp_stream,
  mio::Token(42),  // 这个 token 就是 task 的标识
  mio::Interest::READABLE,)?;

// 阻塞等待事件(带超时)
poll.poll(&mut events, Some(Duration::from_millis(100)))?;

// 遍历就绪事件
for event in events.iter() {
  match event.token() {
  mio::Token(42) => {
  // socket 可读了!唤醒对应的 task
  }
  _ => {}
  }
}

tokio 如何集成 mio

tokio 在 mio 之上做了一层封装,核心流程:

  1. 注册:当你创建 TcpStream::connect() 时,tokio 把底层 socket fd 注册到 mio 的 Poll 实例
  2. 等待:I/O Driver 线程(通常是某个 Worker 线程兼任)调用 poll.poll() 等待事件
  3. 唤醒:事件到达后,找到对应 Token → 找到对应 Task 的 Waker → 调用 wake()
  4. 调度:Task 回到 Worker 队列,被 poll 时执行实际的 read() / write()
// tokio I/O Driver 的简化流程
fn io_driver_loop(driver: &IoDriver) {
  let mut events = mio::Events::with_capacity(1024);

  loop {
  // 计算最近的 timer 超时,作为 poll 的超时时间
  let timeout = driver.timer.next_deadline();

  // 阻塞等待 I/O 事件(或超时)
  driver.mio_poll.poll(&mut events, timeout)?;

  // 处理 I/O 事件
  for event in events.iter() {
  // 根据 token 找到注册的 I/O 资源
  if let Some(io_resource) = driver.resources.get(event.token()) {
  // 更新就绪状态
  io_resource.set_readiness(event.readiness());
  // 唤醒等待这个资源的 task
  io_resource.wake();
  }
  }

  // 顺便处理到期的 timer(下一节详述)
  driver.timer.process_expired_timers();
  }
}

注意一个关键设计:I/O Driver 不是独立线程。在 tokio 的多线程 runtime 中,某个 Worker 线程在没有任务执行时,会”兼任” I/O Driver 的角色,调用 poll.poll() 等待事件。这样节省了一个线程,也减少了跨线程通信。

tokio 的 io_uring 支持现状

标准 tokio 基于 epoll 的 readiness 模型——先查询”是否就绪”,再执行 I/O。而 io_uring 是 completion 模型——直接提交 I/O 操作,完成后收通知。

两种模型的冲突意味着 io_uring 不能简单塞进现有的 tokio I/O Driver。社区的方案是 tokio-uring crate:

// tokio-uring 的用法:completion-based I/O
// 注意:需要单独的 runtime,不是标准 tokio runtime
tokio_uring::start(async {
  let file = tokio_uring::fs::File::open("data.txt").await.unwrap();

  // buffer 的所有权转移给内核,完成后还回来
  // 这是 completion model 的核心特征
  let buf = vec![0u8; 4096];
  let (result, buf) = file.read_at(buf, 0).await;

  println!("读取了 {} 字节", result.unwrap());
});

tokio-uring 目前仍是实验性的,主要限制:


五、Timer:时间轮 + 层级堆

tokio::time::sleep()tokio::time::timeout() 是最常用的 timer 操作。它们背后是一个精心设计的时间轮数据结构。

sleep() 和 timeout() 的实现原理

// sleep 的实质:创建一个在指定时刻被唤醒的 Future
pub async fn sleep(duration: Duration) {
  let deadline = Instant::now() + duration;
  let entry = timer::Entry::new(deadline);

  // 把 entry 注册到时间轮
  timer::register(&entry);

  // 挂起,等待时间轮在 deadline 到达时调用 wake()
  entry.await;
}

// timeout 就是把 sleep 和目标 future 组合起来
pub async fn timeout<F: Future>(duration: Duration, future: F) -> Result<F::Output, Elapsed> {
  tokio::select! {
  result = future => Ok(result),  // future 先完成
  _ = sleep(duration) => Err(Elapsed), // 超时了
  }
}

时间轮(Hierarchical Timing Wheel)

tokio 没有用 BinaryHeap(标准库的优先队列)来管理 timer,而是用了分层时间轮。为什么?

操作 BinaryHeap 时间轮
插入 timer O(log n) O(1)
取消 timer O(n) 查找 + O(log n) 调整 O(1)
推进到下一个到期 O(log n) pop O(1) 摊销
空间 紧凑 预分配 slots

在高并发场景下(数万个活跃 timer),O(1) 插入和取消的优势是碾压级的。

时间轮的基本思路类似时钟:

层级时间轮结构(tokio 使用 6 层)

Level 0:  64 slots, 每 slot = 1ms  → 覆盖 64ms
Level 1:  64 slots, 每 slot = 64ms  → 覆盖 ~4s
Level 2:  64 slots, 每 slot = 4s  → 覆盖 ~4min
Level 3:  64 slots, 每 slot = 4min  → 覆盖 ~4hr
Level 4:  64 slots, 每 slot = 4hr  → 覆盖 ~10days
Level 5:  64 slots, 每 slot = 10days → 覆盖 ~2years

每层是一个 64 格的环形数组,低层 slot 到期时会触发"级联":
把高层对应 slot 的 timer 移到更低层级,精确到毫秒。
// 时间轮的简化数据结构
struct TimerWheel {
  // 6 层,每层 64 个 slot
  levels: [Level; 6],
  // 当前时间(毫秒精度)
  elapsed: u64,
}

struct Level {
  // 64 个 slot,每个 slot 是一个 timer 链表
  slots: [LinkedList<TimerEntry>; 64],
  // 这一层的时间粒度
  granularity: u64,
}

impl TimerWheel {
  // O(1) 插入:根据 deadline 计算应该放在哪一层的哪个 slot
  fn insert(&mut self, entry: TimerEntry) {
  let deadline = entry.deadline;
  let diff = deadline - self.elapsed;

  // 找到合适的层级
  let level = self.level_for(diff);
  // 计算 slot 索引
  let slot = (deadline / level.granularity) % 64;
  // 直接插入链表头部 —— O(1)
  level.slots[slot].push_front(entry);
  }

  // 推进时间,触发到期的 timer
  fn advance(&mut self, now: u64) {
  while self.elapsed < now {
  self.elapsed += 1;

  // 检查最底层的当前 slot
  let slot = (self.elapsed / 1) % 64;
  for entry in self.levels[0].slots[slot].drain() {
  // 到期了!调用 waker.wake()
  entry.waker.wake();
  }

  // 在适当时机做级联(cascade)
  // 高层 slot 的 timer 移到低层以获得更高精度
  self.maybe_cascade();
  }
  }
}

Timer 的精度和限制

// interval 的漂移处理策略
use tokio::time::{interval, MissedTickBehavior};

let mut ticker = interval(Duration::from_millis(100));

// Burst:错过的 tick 立即连续触发(默认)
ticker.set_missed_tick_behavior(MissedTickBehavior::Burst);

// Delay:错过就错过,从当前时刻重新计时
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

// Skip:跳过错过的 tick,对齐到下一个整数倍时刻
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

六、性能调优和常见陷阱

理解了 tokio 的内部机制之后,很多”最佳实践”就不再是死记硬背,而是自然推导出来的结论。

陷阱 1:在 async context 里调用阻塞代码

这是最常见也最致命的错误:

// 否 错误:在 async 函数里调用同步阻塞 I/O
async fn bad_handler(path: &str) -> String {
  // std::fs::read_to_string 会阻塞当前 OS 线程!
  // 这个 Worker 线程在文件读取期间完全卡住
  // 其他 task 无法被调度
  let content = std::fs::read_to_string(path).unwrap();
  content
}

// 是 正确:用 spawn_blocking 把阻塞操作移到独立线程池
async fn good_handler(path: String) -> String {
  tokio::task::spawn_blocking(move || {
  std::fs::read_to_string(&path).unwrap()
  })
  .await
  .unwrap()
}

// 是 更好:直接用 tokio 的异步 fs API
async fn best_handler(path: &str) -> String {
  // tokio::fs 内部也是 spawn_blocking,但 API 更干净
  tokio::fs::read_to_string(path).await.unwrap()
}

为什么这么严重? 因为 tokio 的 task 是协作式调度——只有在 .await 点才会让出 CPU。如果你在两个 .await 之间执行了耗时操作(无论是 CPU 密集还是阻塞 I/O),这个 Worker 线程上的所有其他 task 都会饿死

陷阱 2:spawn_blocking 的线程池管理

spawn_blocking 不是万能的。它背后有一个独立的线程池,默认上限 512 个线程:

// spawn_blocking 线程池的配置
let rt = tokio::runtime::Builder::new_multi_thread()
  .max_blocking_threads(512)  // 默认值
  .thread_keep_alive(Duration::from_secs(10)) // 空闲线程存活时间
  .build()
  .unwrap();

如果你大量使用 spawn_blocking,可能会遇到:

经验法则:如果你发现 spawn_blocking 调用量很大,说明你应该用真正的异步替代方案,或者用专门的线程池。

陷阱 3:task 太多导致的调度延迟

tokio 的 Worker 线程数通常等于 CPU 核心数。如果你有 100 万个活跃 task,但只有 8 个 Worker:

// 模拟调度延迟
async fn measure_latency() {
  let start = Instant::now();

  // 创建大量 task 争抢调度
  let mut handles = Vec::new();
  for i in 0..1_000_000 {
  handles.push(tokio::spawn(async move {
  // 每个 task 做一点点工作
  // 但 100 万个 task 排队等 8 个 Worker
  tokio::task::yield_now().await;
  }));
  }

  for handle in handles {
  handle.await.unwrap();
  }

  // 在 8 核机器上,这可能需要几秒钟
  // 每个 task 的平均调度延迟 = 总时间 / task 数
  println!("总耗时: {:?}", start.elapsed());
}

缓解策略:

选择 multi-thread 还是 current-thread

场景 推荐 原因
Web 服务器(生产) multi_thread 利用多核,I/O 密集型最优
CLI 工具 current_thread 简单,开销低
单元测试 current_thread 确定性行为,便于调试
嵌入式 / WASM current_thread 可能只有一个核心
CPU 密集型计算 multi_thread + spawn_blocking 不要在 async 里做重计算
// 生产环境推荐配置
#[tokio::main]
async fn main() {
  // 等价于 Builder::new_multi_thread()
  //  .worker_threads(num_cpus)
  //  .enable_all()
  //  .build()
  server::run().await;
}

// 测试环境推荐配置
#[tokio::test]
async fn test_something() {
  // 等价于 Builder::new_current_thread()
  //  .enable_all()
  //  .build()
  assert_eq!(my_async_fn().await, 42);
}

陷阱:Tokio 与标准库的混用

这是新手(甚至老手)最密集的踩坑区。Rust 标准库和 tokio 的同名 API 看起来一样,行为完全不同。

坑 1:async 里用 std::fs

// 否 灾难:阻塞整个 Worker 线程
async fn read_config() -> String {
  // std::fs::read_to_string 是同步阻塞的!
  // 文件在机械硬盘上?恭喜,这个 Worker 上的所有 task 陪你等 50ms
  std::fs::read_to_string("config.toml").unwrap()
}

// 是 正确:用 spawn_blocking 隔离
async fn read_config() -> String {
  tokio::task::spawn_blocking(|| {
  std::fs::read_to_string("config.toml").unwrap()
  }).await.unwrap()
}

// 是 更好:直接用 tokio::fs(内部就是 spawn_blocking 封装)
async fn read_config() -> String {
  tokio::fs::read_to_string("config.toml").await.unwrap()
}

坑 2:async 里用 std::sync::Mutex

// 否 危险:如果持锁期间有 .await,可能死锁
async fn bad_update(data: Arc<std::sync::Mutex<Vec<String>>>) {
  let mut guard = data.lock().unwrap();
  // 持有 std::sync::Mutex 的 guard 跨越了 .await 点
  // 如果这个 task 在 .await 时被挂起,其他 task 无法获取锁
  // 而且这个 task 可能被调度到不同的 Worker 线程上恢复——
  // std::sync::MutexGuard 不是 Send 的,编译器会报错(这是好事)
  let result = fetch_something().await;
  guard.push(result);
}

// 是 正确:用 tokio::sync::Mutex(支持跨 .await 持有)
async fn good_update(data: Arc<tokio::sync::Mutex<Vec<String>>>) {
  let mut guard = data.lock().await; // 注意这里是 .await
  let result = fetch_something().await;
  guard.push(result);
}

// 是 更好:如果不需要跨 .await 持锁,std::sync::Mutex 反而更快
async fn best_update(data: Arc<std::sync::Mutex<Vec<String>>>) {
  let result = fetch_something().await;
  // 先完成异步操作,再短暂持锁同步修改
  data.lock().unwrap().push(result);
  // guard 立即 drop,不跨 .await——没问题
}

经验法则:锁不跨 .await → 用 std::sync::Mutex(更快,无 overhead);锁跨 .await → 用 tokio::sync::Mutex

坑 3:在 tokio runtime 里用 std::thread::spawn

// 否 常见误解:想"并行"但绕过了调度器
async fn handle_request() {
  // std::thread::spawn 创建的线程不受 tokio 管理
  // 里面不能用 tokio::spawn、tokio::time::sleep 等异步 API
  // 除非你手动在新线程里创建 runtime(浪费资源)
  std::thread::spawn(|| {
  // tokio::time::sleep(Duration::from_secs(1)).await;  // 编译错误!
  });
}

// 是 正确:用 tokio::spawn 做并发
async fn handle_request() {
  tokio::spawn(async {
  tokio::time::sleep(Duration::from_secs(1)).await;
  // 这个 task 由 tokio 调度器管理
  });
}

// 是 正确:真的需要 OS 线程做 CPU 密集计算,用 spawn_blocking
async fn handle_request() {
  let result = tokio::task::spawn_blocking(|| {
  // 这里跑 CPU 密集逻辑,不影响 async 调度器
  heavy_computation()
  }).await.unwrap();
}

速查表:标准库 vs tokio 对照

场景 否 不要在 async 里用 是 正确替代
文件 I/O std::fs::* tokio::fs::*spawn_blocking
线程睡眠 std::thread::sleep tokio::time::sleep
互斥锁(跨 await) std::sync::Mutex tokio::sync::Mutex
通道 std::sync::mpsc tokio::sync::mpsc
创建并发任务 std::thread::spawn tokio::spawn
DNS 解析 std::net::ToSocketAddrs tokio::net::lookup_host

总结

tokio 的 ~40000 行代码做了这些事:

  1. 调度器:work-stealing 多线程队列,让 CPU 一刻不停
  2. Waker:类型擦除的唤醒机制,连接 I/O 事件和任务调度
  3. I/O Driver:基于 mio 封装 epoll/kqueue/IOCP,一个线程等万级连接
  4. Timer:分层时间轮,O(1) 插入和取消

Go 的 GMP 调度器 相比,tokio 选择了”无栈协程 + 编译期状态机”的路线,换来零开销抽象和极致的内存效率,代价是更陡峭的学习曲线和”必须在 .await 点让出”的协作式限制。

两种设计没有绝对优劣——Go 选择了”让大多数开发者轻松写并发”,Rust 选择了”让需要极致性能的开发者有完全的控制权”。理解它们的内部机制,才能在合适的场景选择合适的工具。


延伸阅读


By .