多线程使用 libevent

Table of Contents

1 前言

libevent 封装了底层多路复用接口,让我们可以更方便地跨平台使用异步网络IO。同时, libevent 还实现了定时任务,使用它我们就不用自己实现一遍了,还是比较方便。

libevent 官方提供了 libevent的教程libevent的例子 以及libevent的接口文档,写得相当好。我在阅读过一遍之后,开始尝试使用它实现一个负责与物联网设备通信的接入程序,也就是普通的TCP/UDP服务端,承担接收连接请求、接收数据、下发数据、验证身份、转发设备请求、管理连接超时、以及实现一些简单的接口,当然还有其它功能。这个程序跟 nginx 是很像的,之前我直接用 epoll 实现过很多个类似的程序,最近看到 libevent 开发的程序很多,于是开始尝试使用它。

然后遇到了问题。我尝试创建多个IO线程,然而事情并不是想象的那样, event_del() 阻塞了。libevent 的事件操作只能在 dispatch 循环同一个线程中执行,也就是循环退出后,或者在回调函数中。

一番调试后有了这个echo-server 例子,以及这篇说明,记录我调试的过程。

2 libevent 入门

libevent 有两个概念,event_base 和 event。

event 是事件,可以设定触发条件(可读/可写/超时/信号),以及条件触发后需要执行的函数。event 相当于 epoll 中的 epoll_event。

事件循环在 event_base 上执行,event_base 里记录了所有事件的触发条件,循环中会检查条件,如果条件满足,则调用 event 中指定的函数。event_base 相当于 epoll 中 epoll_create() 创建的结构。

例如,要读取一个文件描述符 fd ,可以创建一个读事件,在事件的回调函数中读 fd

// 事件的回调函数
void event_cb(evutil_socket_t fd, short what, void* arg) {
  // 在这里读 fd
  // ...
}

void* arg = NULL;
// 创建 event_base
struct event_base* ev_base = event_base_new();
// 创建 event, 可以从一个event_base中创建多个event
struct event* ev = event_new(ev_base, fd, EV_READ, event_cb, arg);

// 把事件注册到循环中
event_add(ev, NULL);

// 开始事件循环,如果事件的条件满足,则调用事件的回调函数
event_base_dispatch(ev_base);

3 设计结构

有两种方式结构能达到我的目的,一种是一个线程监听事件,线程池处理事件;第二种是多个线程监听事件,事件出发后直接在本线程中执行。我使用的是第二种。

3.1 单事件循环,多处理线程

event_base 操作只能与事件循环在同一个线程中,为了在多线程中都可以进行事件处理,第一个想法是在事件线程中创建 event_base 循环,回调函数中将事件的处理交给线程池。事件循环中存在一个超时事件A,这个超时事件的回调函数专门负责执行线程池发过来的操作事件的代码。如果在线程池中还需要操作事件,则将操作事件的代码发给事件线程,并将超时事件激活以执行这些代码。

single_loop_multi_thread.png

我们在使用 epoll 时经常使用这种方式,一个线程监听事件,事件触发后再交由线程池处理,如果要添加或操作事件,则把操作函数发到队列中,由监听事件的线程执行。这种方式需要在多个线程中交流,会造成一些性能损失,但在我实际的项目中,这些性能损失跟业务消耗的性能比起来微不足道。但这次开发的程序我还是用了另一种模式。

3.2 多事件循环

创建多个事件线程,每个线程创建一个 event_base 事件循环,事件触发时直接在事件线程中执行。

multiloop.png

实际的项目中,事件触发后,不只进行IO操作,还有很多阻塞的任务需要处理,最常见的是请求数据库。数据库操作也可以写成异步IO放在事件循环中,但为了方便,我都把数据库操作放到线程池中运行,运行结束后将事件操作放到队列,向上一个结构一样,由一个超时任务来处理事件操作。

4 代码说明

代码都上传了,去除了业务相关的逻辑,只实现了 echo-server 功能。

4.1 buffer

buffer.cc 和 buffer.h 实现了可变长度的读写缓冲区。libevent 有 evbuffer 结构,自己实现 buffer 是因为业务的程序中不止用了 libevent,还有很多遗留的IO代码,为了在IO以后统一业务接口,还是沿用自己实现的 buffer。libevent 的实现比我不知高到哪里去,今后我会考虑使用它。

buffer 本质上是个缓存队列,数据从头部插入,从尾部取出。数据取出的顺序与数据插入的顺序相同。 buffer.png

4.2 dispatcher

Dispatcher 是对 event_base 事件循环的封装。 event_base_loop() 函数在此处运行,使用中每个IO线程拥有一个 Dispather 实例。其中的队列 post_callbacks_ 保存来自其它线程通过 post() 方法对 Dispatcher 的操作。 post() 函数中对超时事件激活,而在超时事件中则将函数从队列中取出执行。

代码中只演示了一个队列,实际由于业务需要,可能需要多个队列以满足任务优先级的需求。在我们的业务程序中,关闭连接、释放资源等操作被认为是优先级低的,我们设立一个单独的队列,在其它队列中的内容都处理完之后,再处理低优先级队列的任务。

disp.png

4.3 thread_pool

一个简单的线程池实现。多个线程循环从任务队列中获取任务后执行。任务队列存在多个,以实现优先级的目的。在我们的业务中,设备身份认证包含很多密码学和数据库操作,非常耗时,我们将这个操作的优先级设置很低;相对地,设备数据上传优先级较高。这样可以保证现有业务不因大量的高性能消耗的设备接入请求中断。

thread.png

4.4 listener

打开监听端口,注册事件,触发事件后调用 accept 函数接收新连接,接收新连接后调用指定函数(handler)处理连接。

监听链接设置了 SO_REUSEPORT 选项,这样可以在多个线程中同时监听一个端口。

4.5 handler

连接处理的方法。listener 接收新连接后,实例化这个类处理新连接。

handler 的读事件接收客户端发来的数据,放到 read_buf 中,再从 read_buf 写到 write_buf (这里有点多余),随后注册写事件。在写事件中,将 write_buf 的内容发送给客户端。完成 echo-server 的功能。

实际的程序中,经常需要查询某个客户端连接,将数据从服务端主动发送给客户端,所以我们将 handler 放到一个哈希表中存储,并设置引用计数。但这个例子里没有这个必要。

4.6 main

程序入口,做一些初始化工作。

5 总结

搞这些花里胡哨的不见得比新员工学一周go开发的业务程序性能高。如果你真的需要一个与业务关系不是那么密切,更新不频繁而且对效率要求高的程序,可以尝试使用这里介绍的方法。


By .