现在的位置: 首页 > 自动控制 > 工业·编程 > 正文

libevent事件处理框架

2018-04-01 23:00 工业·编程 ⁄ 共 10017字 ⁄ 字号 暂无评论

    libevent中基于Reactor模式的事件处理框架对应event_base,在event在完成创建后,需要向event_base注册事件,监控事件的当前状态,当事件状态为激活状(EV_ACTIVE)时,调用回调函数执行。

    本文主要从以下几方面进行分析:event_base的结构,event_base的创建,事件的注册、事件分发、事件注销

event_base结构

struct event_base {

  //指定某个eventop结构体,它决定了该event_base使用哪种I/O多路复用技术(注解1)

  const struct eventop *evsel;

  void *evbase;

  //告知后端下次执行事件分发时需要注意的哪些事件

  struct event_changelist changelist;

  //一个eventop,专门用来处理信号事件的

  const struct eventop *evsigsel;

  //存储信号处理的信息

  struct evsig_info sig;

  //虚拟事件的个数

  int virtual_event_count;

  //总事件个数

  int event_count;

  //活跃事件个数

  int event_count_active;

  //处理完当前所有的事件之后退出

  int event_gotterm;

  //立即退出

  int event_break;

  //立即启动一个新的事件循环

  int event_continue;

  //当前运行事件的优先级

  int event_running_priority;

  //是否正在进行事件循环

  int running_loop;

  //活跃事件的链表

  struct event_list *activequeues;

  //活跃事件的个数

  int nactivequeues;

  //要延迟处理的事件队列

  struct deferred_cb_queue defer_queue;

  //IO事件队列

  struct event_io_map io;

  //信号事件队列

  struct event_signal_map sigmap;

  //所有注册事件的队列

  struct event_list eventqueue;

  //管理定时事件的最小堆

  struct min_heap timeheap;

  //IO就绪的时候和缓存时间

  struct timeval event_tv;

  struct timeval tv_cache;

  ......

};

event_base的初始化

创建event_base对象的过程也即创建了一个libevent实例,需要通过event_base_new()函数分配并创建一个具有默认配置的event_base,而event_base_new调用event_base_new_with_config(…)创建event_base。

struct event_base * event_base_new(void)

{

    struct event_base *base = NULL;

    struct event_config *cfg = event_config_new();

    if (cfg) {

        base = event_base_new_with_config(cfg);

        event_config_free(cfg);

    }

    return base;

}

从上面可以看出,首先创建一个具有默认配置的event_config,因此若需要对event_base进行配置,可以通过配置cfg达到。下面先看下event_config的结构体定义:

struct event_config {

  TAILQ_HEAD(event_configq, event_config_entry) entries;

  int n_cpus_hint;

  enum event_method_feature require_features;

  enum event_base_config_flag flags;

};

entries

TAILQ_HEAD表示一个队列,队列的元素的类型为event_config_entry。libevent是基于跨平台的,其会对IO多路复用函数(select, evport, poll, epoll等)进行封装,根据操作系统选择最高效的IO复用函数。event_config_avoid_method可以配置屏蔽使用指定的IO多路复用函数。通过字符串的方式指定method。

n_cpus_hint

指明CPU的数量,可通过event_config_set_num_cpus_hint函数来设置的。其作用是告诉event_config,系统中有多少个CPU,以便作一些对线程池作一些调整来获取更高的效率。目前,仅仅Window系统的IOCP(Windows的IOCP能够根据CPU的个数智能调整),该函数的设置才有用。event_base实际使用的CPU个数不一定等于提示的个数。

require_features

指明了event_config要求的特征,用于指定IO多路复用函数所需要的特征,不同IO复用函数其需要的特征不一样,如果IO复用函数无法满足配置的特征,那么配置失败。各个IO复用函数支持的特征,可以从select.c, poll.c, epoll.c等源文件查看。

//event.h文件

enum event_method_feature {

    EV_FEATURE_ET = 0x01, //支持边沿触发

    //添加、删除、或者确定哪个事件激活这些动作的时间复杂度都为O(1)

    //select、poll是不能满足这个特征的,而epoll则满足

    EV_FEATURE_O1 = 0x02,

    EV_FEATURE_FDS = 0x04 //支持任意的文件描述符,而不能仅仅支持套接字

};

flags

可以通过event_config_set_flag函数进行设置。

EVENT_BASE_FLAG_NOLOCK:不要为event_base分配锁。设置这个选项可以为event_base节省一点加锁和解锁的时间,但是当多个线程访问event_base会变得不安全

EVENT_BASE_FLAG_IGNORE_ENV:选择多路IO复用函数时,不检测EVENT_*环境变量。使用这个标志要考虑清楚:因为这会使得用户更难调试程序与Libevent之间的交互

EVENT_BASE_FLAG_STARTUP_IOCP:仅用于Windows,这使得Libevent在启动时就启用任何必需的IOCP分发逻辑,而不是按需启用

EVENT_BASE_FLAG_NO_CACHE_TIME:不是在event loop每次准备执行timeout回调函数时检测当前时间,而是在每次执行timeout回调函数后都进行检测,这将消耗更多的CPU时间

EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST:告知Libevent,如果决定使用epoll这个多路IO复用函数,可以安全地使用更快的基于changelist 的多路IO复用函数:epoll-changelist多路IO复用可以在多路IO复用函数调用之间,同样的fd 多次修改其状态的情况下,避免不必要的系统调用。但是如果传递任何使用dup()或者其变体克隆的fd给libevent,epoll-changelist多路IO复用函数会触发一个内核bug,导致不正确的结果。在不使用epoll这个多路IO复用函数的情况下,这个标志是没有效果的。也可以通过设置EVENT_EPOLL_USE_CHANGELIST 环境变量来打开epoll-changelist选项。

struct event_base *

event_base_new_with_config(const struct event_config *cfg)

{

    int i;

    struct event_base *base;

    int should_check_environment;

    // event_base空间分配

    if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {

        event_warn("%s: calloc", __func__);

        return NULL;

    }

    detect_monotonic();

    gettime(base, &base->event_tv);

    // 超时小根堆初始化,即base指向min-heap的指针,小根堆大小设置为0

    min_heap_ctor(&base->timeheap);

    // event_base的事件注册队列初始化

    TAILQ_INIT(&base->eventqueue);

    base->sig.ev_signal_pair[0] = -1;

    base->sig.ev_signal_pair[1] = -1;

    base->th_notify_fd[0] = -1;

    base->th_notify_fd[1] = -1;

    // event_base事件的回调函数队列初始化

    event_deferred_cb_queue_init(&base->defer_queue);

    base->defer_queue.notify_fn = notify_base_cbq_callback;

    base->defer_queue.notify_arg = base;

    if (cfg)

        base->flags = cfg->flags;

    // 初始化event_io_map, event_signal_map, changelist

    evmap_io_initmap(&base->io);

    evmap_signal_initmap(&base->sigmap);

    event_changelist_init(&base->changelist);

    base->evbase = NULL;

    should_check_environment =

        !(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));

    for (i = 0; eventops[i] && !base->evbase; i++) {

        if (cfg != NULL) {

            // 检查IO多路复用的方法是否需要屏蔽

            if (event_config_is_avoided_method(cfg,

                eventops[i]->name))

                continue;

            // 检查IO复用函数是否支持cfg中设置的特征

            if ((eventops[i]->features & cfg->require_features)

                != cfg->require_features)

                continue;

        }

        /* also obey the environment variables */

        if (should_check_environment &&

            event_is_method_disabled(eventops[i]->name))

            continue;

        base->evsel = eventops[i];

        base->evbase = base->evsel->init(base);

    }

    ....

    /* allocate a single active event queue */

    if (event_base_priority_init(base, 1) < 0) {

        event_base_free(base);

        return NULL;

    }

    ....

    return (base);

}

接口函数

event_base管理事件主要通过以下五个接口函数:

int event_add(struct event *ev, const struct timeval *timeout);

int event_del(struct event *ev);

int event_base_loop(struct event_base *base, int loops);

void event_active(struct event *event, int res, short events);

void event_process_active(struct event_base *base);

1 . 添加事件 (event_add)

int event_add(struct event *ev, const struct timeval *tv)

{

    int res;

    // check event是否有event_base,无

    if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {...;return -1;}

    // 获取event_base的th_base_lock锁

    EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

    /* 此处为event_add_internal函数的展开 */

    // tv不为NULL,就说明是一个超时event,在小根堆中为其留一个位置 

    if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {

        if (min_heap_reserve(&base->timeheap,

            1 + min_heap_size(&base->timeheap)) == -1)

            return (-1);  /* ENOMEM == errno */

    }

    /* If the main thread is currently executing a signal event's

     * callback, and we are not the main thread, then we want to wait

     * until the callback is done before we mess with the event, or 

     * else we can race on ev_ncalls and ev_pncalls below.

     */

    if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)

        && !EVBASE_IN_THREAD(base)) {

        ++base->current_event_waiters; // 等待事件数 +1,进入条件等待

        EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);

    }

    // IO事件,添加到event_io_map中

    res = evmap_io_add(base, ev->ev_fd, ev);

    // 信号事件,添加到event_signal_map中

    res = evmap_signal_add(base, (int)ev->ev_fd, ev);

    // 无论IO事件还是signal事件,添加到event queue中

    event_queue_insert(base, ev, EVLIST_INSERTED);

    if (res != -1 && tv != NULL) { 

        struct timeval now;

        //用户把这个event设置成EV_PERSIST,即为永久event,可以发生多次超时. 

        //需要记录用户设置的超时值,tv_is_absolute = 0表示使用相对时间

        if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)

            ev->ev_io_timeout = *tv;

        //该event之前被加入到超时队列。用户可以对同一个event调用多次event_add 

        //并且可以每次都用不同的超时值。

        if (ev->ev_flags & EVLIST_TIMEOUT) { 

            /* XXX I believe this is needless. */ 

            //之前为该event设置的超时值是所有超时中最小的。

            //从下面的删除可知,会删除这个最小的超时值。此时多路IO复用函数 

            //的超时值参数就已经改变了。 

            if (min_heap_elt_is_top(ev)) 

                notify = 1; //要通知主线程。可能是次线程为这个event调用本函数 

            //从超时队列中删除这个event。

            //多次对同一个超时event调用event_add,那么只能保留最后的那个。 

            event_queue_remove(base, ev, EVLIST_TIMEOUT); 

        }

        //若正在event_add的event由于超时而被激活,需要从active queue中将该event移除 

        if ((ev->ev_flags & EVLIST_ACTIVE) && 

            (ev->ev_res & EV_TIMEOUT)) {//该event被激活的原因是超时 

            ... 

            event_queue_remove(base, ev, EVLIST_ACTIVE); 

        }

        //获取当前时间

        gettime(base, &now); 

        //虽然用户在event_add时只需用一个相对时间,但实际上在Libevent内部 

        //还是要把这个时间转换成绝对时间。从存储的角度来说,存绝对时间只需 

        //一个变量。而相对时间则需两个,一个存相对值,另一个存参照物。 

        if (tv_is_absolute) { //该参数指明时间是否为一个绝对时间 

            ev->ev_timeout = *tv; 

        } else { 

            //参照时间 + 相对时间  ev_timeout存的是绝对时间 

            evutil_timeradd(&now, tv, &ev->ev_timeout); 

        } 

        //将该超时event插入到超时队列中

        event_queue_insert(base, ev, EVLIST_TIMEOUT);

        //若本次插入的超时值,是所有超时中最小的。那么此时就需要通知主线程。 

        if (min_heap_elt_is_top(ev)) 

            notify = 1;

    }

    //如果需要通知,且本线程不是主线程,则需通知主线程

    if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))

        evthread_notify_base(base);

    /* 此处为event_add_internal函数的结束 */

    // 释放event_base的th_base_lock锁

    EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

    return (res);

}

注: 1、对于同一个event,若为IO event、信号event,那么将无法多次添加。若为超时event,则可进行多次添加,并且超时值为最后一次设置的超时大小。

2、notify变量。主线程在执行event_base_dispatch,此时多次执行event_add,并且超时值发生了改变,那么需要更新event的超时值设置,并且以最后一次event_add为准,且通知主线程evthread_notify_base。

3 . 删除事件 (event_del)

int event_del(struct event *ev)

{

    ....

    // 获取event_base的th_base_lock锁

    EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

    /* 此处展开event_del_internal函数*/

    struct event_base *base;

    int res = 0, notify = 0;

    ....

    // 主线程正在执行将要删除的event,且当前线程不是主线程,那么需要进入等待,

    // 直至主线程完成event的回调执行,

    base = ev->ev_base;

    if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {

        ++base->current_event_waiters;

        EVTHREAD_COND_WAIT(base->current_event_cond,

                            base->th_base_lock);

    }

    // 若当前event为信号事件,且正在循环执行,那么应该停止循环

    /* See if we are just active executing this event in a loop */

    if (ev->ev_events & EV_SIGNAL) {

        if (ev->ev_ncalls && ev->ev_pncalls) {

            /* Abort loop */

            *ev->ev_pncalls = 0;

        }

    }

    // 若event为定时事件,则需要从超时队列中将事件删除

    if (ev->ev_flags & EVLIST_TIMEOUT) {

        event_queue_remove(base, ev, EVLIST_TIMEOUT);

    }

    // 若event的状态为激活状态,则需要从激活队列中将事件删除

    if (ev->ev_flags & EVLIST_ACTIVE)

        event_queue_remove(base, ev, EVLIST_ACTIVE);

    // 若event的状态为插入状态,则需要从event queue将事件删除

    if (ev->ev_flags & EVLIST_INSERTED) {

        event_queue_remove(base, ev, EVLIST_INSERTED);

        if (ev->ev_events & (EV_READ|EV_WRITE))

            res = evmap_io_del(base, ev->ev_fd, ev); // 从event_io_map删除event

        else

            // 从event_signal_map删除event

            res = evmap_signal_del(base, (int)ev->ev_fd, ev);

        if (res == 1) {

            // 删除成功需要通知主线程

            notify = 1;

            res = 0;

        }

    }

    //如果需要通知,且本线程不是主线程,则需通知主线程

    if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))

        evthread_notify_base(base);

    /* 此处结束event_del_internal函数*/

    // 释放event_base的th_base_lock锁

    EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

    return (res);

}

给我留言

留言无头像?