libevent中基于Reactor模式的事件处理框架对应event_base,在event在完成创建后,需要向event_base注册事件,监控事件的当前状态,当事件状态为激活状(EV_ACTIVE)时,调用回调函数执行。
本文主要从以下几方面进行分析:event_base的结构,event_base的创建,事件的注册、事件分发、事件注销
event_base结构
struct event_base {
//指定某个eventop结构体,它决定了该event_base使用哪种I/O多路复用技术(注解1)
const struct eventop *evsel;
void *evbase;
//告知后端下次执行事件分发时需要注意的哪些事件
struct event_changelist changelist;
//一个eventop,专门用来处理信号事件的
const struct eventop *evsigsel;
//存储信号处理的信息
struct evsig_info sig;
//虚拟事件的个数
int virtual_event_count;
//总事件个数
int event_count;
//活跃事件个数
int event_count_active;
//处理完当前所有的事件之后退出
int event_gotterm;
//立即退出
int event_break;
//立即启动一个新的事件循环
int event_continue;
//当前运行事件的优先级
int event_running_priority;
//是否正在进行事件循环
int running_loop;
//活跃事件的链表
struct event_list *activequeues;
//活跃事件的个数
int nactivequeues;
//要延迟处理的事件队列
struct deferred_cb_queue defer_queue;
//IO事件队列
struct event_io_map io;
//信号事件队列
struct event_signal_map sigmap;
//所有注册事件的队列
struct event_list eventqueue;
//管理定时事件的最小堆
struct min_heap timeheap;
//IO就绪的时候和缓存时间
struct timeval event_tv;
struct timeval tv_cache;
......
};
event_base的初始化
创建event_base对象的过程也即创建了一个libevent实例,需要通过event_base_new()函数分配并创建一个具有默认配置的event_base,而event_base_new调用event_base_new_with_config(…)创建event_base。
struct event_base * event_base_new(void)
{
struct event_base *base = NULL;
struct event_config *cfg = event_config_new();
if (cfg) {
base = event_base_new_with_config(cfg);
event_config_free(cfg);
}
return base;
}
从上面可以看出,首先创建一个具有默认配置的event_config,因此若需要对event_base进行配置,可以通过配置cfg达到。下面先看下event_config的结构体定义:
struct event_config {
TAILQ_HEAD(event_configq, event_config_entry) entries;
int n_cpus_hint;
enum event_method_feature require_features;
enum event_base_config_flag flags;
};
entries
TAILQ_HEAD表示一个队列,队列的元素的类型为event_config_entry。libevent是基于跨平台的,其会对IO多路复用函数(select, evport, poll, epoll等)进行封装,根据操作系统选择最高效的IO复用函数。event_config_avoid_method可以配置屏蔽使用指定的IO多路复用函数。通过字符串的方式指定method。
n_cpus_hint
指明CPU的数量,可通过event_config_set_num_cpus_hint函数来设置的。其作用是告诉event_config,系统中有多少个CPU,以便作一些对线程池作一些调整来获取更高的效率。目前,仅仅Window系统的IOCP(Windows的IOCP能够根据CPU的个数智能调整),该函数的设置才有用。event_base实际使用的CPU个数不一定等于提示的个数。
require_features
指明了event_config要求的特征,用于指定IO多路复用函数所需要的特征,不同IO复用函数其需要的特征不一样,如果IO复用函数无法满足配置的特征,那么配置失败。各个IO复用函数支持的特征,可以从select.c, poll.c, epoll.c等源文件查看。
//event.h文件
enum event_method_feature {
EV_FEATURE_ET = 0x01, //支持边沿触发
//添加、删除、或者确定哪个事件激活这些动作的时间复杂度都为O(1)
//select、poll是不能满足这个特征的,而epoll则满足
EV_FEATURE_O1 = 0x02,
EV_FEATURE_FDS = 0x04 //支持任意的文件描述符,而不能仅仅支持套接字
};
flags
可以通过event_config_set_flag函数进行设置。
EVENT_BASE_FLAG_NOLOCK:不要为event_base分配锁。设置这个选项可以为event_base节省一点加锁和解锁的时间,但是当多个线程访问event_base会变得不安全
EVENT_BASE_FLAG_IGNORE_ENV:选择多路IO复用函数时,不检测EVENT_*环境变量。使用这个标志要考虑清楚:因为这会使得用户更难调试程序与Libevent之间的交互
EVENT_BASE_FLAG_STARTUP_IOCP:仅用于Windows,这使得Libevent在启动时就启用任何必需的IOCP分发逻辑,而不是按需启用
EVENT_BASE_FLAG_NO_CACHE_TIME:不是在event loop每次准备执行timeout回调函数时检测当前时间,而是在每次执行timeout回调函数后都进行检测,这将消耗更多的CPU时间
EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST:告知Libevent,如果决定使用epoll这个多路IO复用函数,可以安全地使用更快的基于changelist 的多路IO复用函数:epoll-changelist多路IO复用可以在多路IO复用函数调用之间,同样的fd 多次修改其状态的情况下,避免不必要的系统调用。但是如果传递任何使用dup()或者其变体克隆的fd给libevent,epoll-changelist多路IO复用函数会触发一个内核bug,导致不正确的结果。在不使用epoll这个多路IO复用函数的情况下,这个标志是没有效果的。也可以通过设置EVENT_EPOLL_USE_CHANGELIST 环境变量来打开epoll-changelist选项。
struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{
int i;
struct event_base *base;
int should_check_environment;
// event_base空间分配
if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
event_warn("%s: calloc", __func__);
return NULL;
}
detect_monotonic();
gettime(base, &base->event_tv);
// 超时小根堆初始化,即base指向min-heap的指针,小根堆大小设置为0
min_heap_ctor(&base->timeheap);
// event_base的事件注册队列初始化
TAILQ_INIT(&base->eventqueue);
base->sig.ev_signal_pair[0] = -1;
base->sig.ev_signal_pair[1] = -1;
base->th_notify_fd[0] = -1;
base->th_notify_fd[1] = -1;
// event_base事件的回调函数队列初始化
event_deferred_cb_queue_init(&base->defer_queue);
base->defer_queue.notify_fn = notify_base_cbq_callback;
base->defer_queue.notify_arg = base;
if (cfg)
base->flags = cfg->flags;
// 初始化event_io_map, event_signal_map, changelist
evmap_io_initmap(&base->io);
evmap_signal_initmap(&base->sigmap);
event_changelist_init(&base->changelist);
base->evbase = NULL;
should_check_environment =
!(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));
for (i = 0; eventops[i] && !base->evbase; i++) {
if (cfg != NULL) {
// 检查IO多路复用的方法是否需要屏蔽
if (event_config_is_avoided_method(cfg,
eventops[i]->name))
continue;
// 检查IO复用函数是否支持cfg中设置的特征
if ((eventops[i]->features & cfg->require_features)
!= cfg->require_features)
continue;
}
/* also obey the environment variables */
if (should_check_environment &&
event_is_method_disabled(eventops[i]->name))
continue;
base->evsel = eventops[i];
base->evbase = base->evsel->init(base);
}
....
/* allocate a single active event queue */
if (event_base_priority_init(base, 1) < 0) {
event_base_free(base);
return NULL;
}
....
return (base);
}
接口函数
event_base管理事件主要通过以下五个接口函数:
int event_add(struct event *ev, const struct timeval *timeout);
int event_del(struct event *ev);
int event_base_loop(struct event_base *base, int loops);
void event_active(struct event *event, int res, short events);
void event_process_active(struct event_base *base);
1 . 添加事件 (event_add)
int event_add(struct event *ev, const struct timeval *tv)
{
int res;
// check event是否有event_base,无
if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {...;return -1;}
// 获取event_base的th_base_lock锁
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
/* 此处为event_add_internal函数的展开 */
// tv不为NULL,就说明是一个超时event,在小根堆中为其留一个位置
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve(&base->timeheap,
1 + min_heap_size(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}
/* If the main thread is currently executing a signal event's
* callback, and we are not the main thread, then we want to wait
* until the callback is done before we mess with the event, or
* else we can race on ev_ncalls and ev_pncalls below.
*/
if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
&& !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters; // 等待事件数 +1,进入条件等待
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
// IO事件,添加到event_io_map中
res = evmap_io_add(base, ev->ev_fd, ev);
// 信号事件,添加到event_signal_map中
res = evmap_signal_add(base, (int)ev->ev_fd, ev);
// 无论IO事件还是signal事件,添加到event queue中
event_queue_insert(base, ev, EVLIST_INSERTED);
if (res != -1 && tv != NULL) {
struct timeval now;
//用户把这个event设置成EV_PERSIST,即为永久event,可以发生多次超时.
//需要记录用户设置的超时值,tv_is_absolute = 0表示使用相对时间
if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
//该event之前被加入到超时队列。用户可以对同一个event调用多次event_add
//并且可以每次都用不同的超时值。
if (ev->ev_flags & EVLIST_TIMEOUT) {
/* XXX I believe this is needless. */
//之前为该event设置的超时值是所有超时中最小的。
//从下面的删除可知,会删除这个最小的超时值。此时多路IO复用函数
//的超时值参数就已经改变了。
if (min_heap_elt_is_top(ev))
notify = 1; //要通知主线程。可能是次线程为这个event调用本函数
//从超时队列中删除这个event。
//多次对同一个超时event调用event_add,那么只能保留最后的那个。
event_queue_remove(base, ev, EVLIST_TIMEOUT);
}
//若正在event_add的event由于超时而被激活,需要从active queue中将该event移除
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {//该event被激活的原因是超时
...
event_queue_remove(base, ev, EVLIST_ACTIVE);
}
//获取当前时间
gettime(base, &now);
//虽然用户在event_add时只需用一个相对时间,但实际上在Libevent内部
//还是要把这个时间转换成绝对时间。从存储的角度来说,存绝对时间只需
//一个变量。而相对时间则需两个,一个存相对值,另一个存参照物。
if (tv_is_absolute) { //该参数指明时间是否为一个绝对时间
ev->ev_timeout = *tv;
} else {
//参照时间 + 相对时间 ev_timeout存的是绝对时间
evutil_timeradd(&now, tv, &ev->ev_timeout);
}
//将该超时event插入到超时队列中
event_queue_insert(base, ev, EVLIST_TIMEOUT);
//若本次插入的超时值,是所有超时中最小的。那么此时就需要通知主线程。
if (min_heap_elt_is_top(ev))
notify = 1;
}
//如果需要通知,且本线程不是主线程,则需通知主线程
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
/* 此处为event_add_internal函数的结束 */
// 释放event_base的th_base_lock锁
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
return (res);
}
注: 1、对于同一个event,若为IO event、信号event,那么将无法多次添加。若为超时event,则可进行多次添加,并且超时值为最后一次设置的超时大小。
2、notify变量。主线程在执行event_base_dispatch,此时多次执行event_add,并且超时值发生了改变,那么需要更新event的超时值设置,并且以最后一次event_add为准,且通知主线程evthread_notify_base。
3 . 删除事件 (event_del)
int event_del(struct event *ev)
{
....
// 获取event_base的th_base_lock锁
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
/* 此处展开event_del_internal函数*/
struct event_base *base;
int res = 0, notify = 0;
....
// 主线程正在执行将要删除的event,且当前线程不是主线程,那么需要进入等待,
// 直至主线程完成event的回调执行,
base = ev->ev_base;
if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond,
base->th_base_lock);
}
// 若当前event为信号事件,且正在循环执行,那么应该停止循环
/* See if we are just active executing this event in a loop */
if (ev->ev_events & EV_SIGNAL) {
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
}
// 若event为定时事件,则需要从超时队列中将事件删除
if (ev->ev_flags & EVLIST_TIMEOUT) {
event_queue_remove(base, ev, EVLIST_TIMEOUT);
}
// 若event的状态为激活状态,则需要从激活队列中将事件删除
if (ev->ev_flags & EVLIST_ACTIVE)
event_queue_remove(base, ev, EVLIST_ACTIVE);
// 若event的状态为插入状态,则需要从event queue将事件删除
if (ev->ev_flags & EVLIST_INSERTED) {
event_queue_remove(base, ev, EVLIST_INSERTED);
if (ev->ev_events & (EV_READ|EV_WRITE))
res = evmap_io_del(base, ev->ev_fd, ev); // 从event_io_map删除event
else
// 从event_signal_map删除event
res = evmap_signal_del(base, (int)ev->ev_fd, ev);
if (res == 1) {
// 删除成功需要通知主线程
notify = 1;
res = 0;
}
}
//如果需要通知,且本线程不是主线程,则需通知主线程
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
/* 此处结束event_del_internal函数*/
// 释放event_base的th_base_lock锁
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
return (res);
}