Libevent源码分析

Linux服务器程序处理三类事件:I/O事件、信号和定时事件,在处理这三类事件时我们一般要考虑以下三个问题:

  • 统一事件源:统一管理这些事件,既能使代码通俗易懂,也能避免一下潜在的问题,可以利用I/O复用系统调用统一管理所有的时间;
  • 可移植性:不同的系统具有不同的I/O复用的方式,例如Solaris的/dev/poll文件,FreeBSD的kqueue机制,Linux的epoll系统调用;
  • 对并发编程的支持:在多线程和多进程环境下,我们需要考虑各执行实体如何协同处理客户连接、信号和定时器,以避免竞态条件。
    开源社区提供了很多优秀的I/O框架来解决上述问题,例如ACE、ASIO和Libevent,让我们将更多的精力放在业务与逻辑开发上,但是理解这些框架的原理对我们也是非常有帮助的,这篇博文主要分析Libevent的源码。

Libevent简介

Libevent是开源社区的一款高性能I/O框架库,其学习者和使用者众多,其著名案例有:高性能分布式内存对象缓存软件memcached,Google浏览器Chromium的Linux版本。作为一个I/O框架库,Libevent具有如下特点:

  • 跨平台支持:Libevent支持Linux、Unix和Windows;
  • 统一事件源:Libevent对I/O事件、信号和定时事件提供统一的处理;
  • 线程安全:Libevent使用libevent_pthreads库来提供线程安全支持;
  • 基于Reactor模式实现。

一个实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <signal.h>
#include <event.h>
void signal_cb(int fd, short event, void *argc) {
struct event_base *base = (event_base *)argc;
struct timeval delay = {2, 0};
printf("Caught an interrupt signal; exiting cleanly in two seconds...\n");
event_base_loopexit(base, &delay);
}
void timeout_cb(int fd, short event, void *argc) {
printf("timeout\n");
}
int mian() {
struct event_base *base = envent_init();
struct event *signal_event = evsignal_new(base, SIGINT, signal_cb, NULL);
event_add(signal_event, NULL);
timeval tv = {1, 0};
struct event *timeout_event = evtimer_new(base, timeout_cb, NULL);
event_add(timeout_event, &tv);
event_base_dispatch(base);
event_free(timeout_event);
event_free(signal_event);
event_base_free(base);
}
  • 调用event_init创建一个event_base对象,一个event_base对象相当于一个Reactor实例;
  • 创建具体的事件处理器,并设置他们所从属的Reactor实例。evsignal_new和evtimer_new函数分别用于创建信号事件处理器和定时事件处理器,定义如下:

    1
    2
    3
    #define evsignal_new(b, x, cb, arg) \
    event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg))
    #define evtimer_new(b, cb, arg) event_new((b), -1, 0, (cb), (arg))
  • 调用event_add函数,将事件处理器添加到注册事件队列中,并将该事件处理器对应的事件添加到事件多路分发器中;

  • 调用event_base_dispatch函数来执行事件循环;
  • 最后释放系统资源。

Libevent源码组织结构

  • include/event2:Libevent主版本升级到2.0之后引入的,提供给应用程序使用的头文件;
    • event.h:提供核心函数;
    • http.h:提供HTTP协议相关服务;
    • rpc.h:提供远程调用支持。
  • 源码根目录下的头文件:这些分为两类,一类是对include/event2目录下的部分头文件的包装,另外一类是提供Libevent内部使用的辅助性头文件,具有*-internal.h;
  • compat/sys:通用数据结构目录,只有一个文件queue.h,封装了跨平台的基础数据结构,包括单向链表、双向链表、队列、尾队列和循环队列;
  • sample:提供一些实例代码;
  • test:提供一些测试代码;
  • WIN32-Code目录:提供Windows平台的一些专用代码;
  • event.c:该文件实现了Libevent的整体框架,主要是event和event_base两个结构体的相关操作;
  • devpoll.c、kqueue.c、evport.c、select.c、win32select.c、poll.c和epoll.c文件,分别封装了如下I/O复用机制:/dev/poll、kqueue、event ports、POSIX select、Windows select、poll和epoll。这些文件的主要内容相似,都是对结构体eventtop定义的接口函数的具体实现;
  • minheap-internal.h:时间堆的实现,提供对定时事件的支持;
  • signal.c:提供对信号的支持;
  • evmap.c:维护句柄(文件描述符或信号)与事件处理器的映射关系;
  • event_tagging.c:提供缓冲区中添加标记数据,以及从缓冲区中读取标记数据的函数;
  • event_iocp.c:提供对Windows IOCP的支持;
  • buffer*.c:提供对网络I/O缓冲的控制,包括:输入输出数据过滤,传输速率限制,使用SSL协议对应用数据进行保护,以及零拷贝文件传输等;
  • evthread*.c:提供对多线程的支持;
  • listener.c:封装了对监听socket的操作,包括监听连接和接受连接;
  • logs.c:Libevent的日志系统;
  • evutils.c、evutils_rand.c、strlcpy.c和arc4random.c:提供一些基本操作,比如生成随机数、获取socket地址信息、读取文件、设置socket属性等;
  • evdns.c、http.c和evrpc.c:提供对DNS协议、HTTP协议和RPC协议的支持;

event结构体

event结构体封装了句柄、事件类型、回调函数,以及其他必要的标志和数据,event结构体在include/event2/event_struct.h中定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
struct event {
struct event_callback ev_evcallback;
/* for managing timeouts */
union {
TAILQ_ENTRY(event) ev_next_with_common_timeout;
int min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;
struct event_base *ev_base;
union {
/* used for io events */
struct {
LIST_ENTRY (event) ev_io_next;
struct timeval ev_timeout;
} ev_io;
/* used by signal events */
struct {
LIST_ENTRY (event) ev_signal_next;
short ev_ncalls;
/* Allows deletes in callback */
short *ev_pncalls;
} ev_signal;
} ev_;
short ev_events;
short ev_res; /* result passed to event callback */
struct timeval ev_timeout;
};

下面详细的介绍事件处理器event结构的每个成员:

  • ev_events:时间类型,取值如下:

    1
    2
    3
    4
    5
    6
    7
    8
    #define EV_TIMEOUT 0x01 // 定时事件
    #define EV_READ 0x02 // 可读事件
    #define EV_WRITE 0x04 // 可写事件
    #define EV_SIGNAL 0x08 // 信号事件
    #define EV_PERSIST 0x10 // 永久事件
    #define EV_ET 0x20 // 边沿触发事件,需要结合epoll使用
    #define EV_FINALIZE 0x40 // 释放事件
    #define EV_CLOSED 0x80 // 关闭事件
  • ev_evcallback:事件处理器,其定义如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    struct event_callback {
    TAILQ_ENTRY(event_callback) evcb_active_next;
    short evcb_flags;
    ev_uint8_t evcb_pri; /* smaller numbers are higher priority */
    ev_uint8_t evcb_closure;
    /* allows us to adopt for different types of events */
    union {
    void (*evcb_callback)(evutil_socket_t, short, void *);
    void (*evcb_selfcb)(struct event_callback *, void *);
    void (*evcb_evfinalize)(struct event *, void *);
    void (*evcb_cbfinalize)(struct event_callback *, void *);
    } evcb_cb_union;
    void *evcb_arg;
    };
    • ev_active_next:所有被激活的事件处理器通过该成员串联成一个尾队列,称之为活动时间队列,宏TAILQ_ENTRY定义在compat/sys/queue.h文件中;

      1
      2
      3
      4
      5
      #define TAILQ_ENTRY(type) \
      struct { \
      struct type *tqe_next; /* next element */ \
      struct type **tqe_prev; /* address of previous next element */ \
      }
    • evcb_flags:事件标志,可选值如下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      #define EVLIST_TIMEOUT 0x01 // 事件处理器从属于通用定时器队列或时间堆
      #define EVLIST_INSERTED 0x02 // 事件处理器从属于注册事件队列
      #define EVLIST_SIGNAL 0x04 // 没有使用
      #define EVLIST_ACTIVE 0x08 /// 事件处理器从属于活动时间队列
      #define EVLIST_INTERNAL 0x10 // 内部使用
      #define EVLIST_ACTIVE_LATER 0x20 // 事件处理器延迟激活
      #define EVLIST_FINALIZING 0x40 // 事件处理器已被释放
      #define EVLIST_INIT 0x80 // 事件处理器已被初始化
      #define EVLIST_ALL 0xff // 定义所有标志
    • evcb_pri:事件处理器的优先级,值越小优先级越高,不同优先级的事件处理器会被插入到不同的活动事件队列中,Reactor会按照优先级从高到低遍历所有活动事件队列;

    • evcb_closure:指定event_base执行事件处理器的回调函数时的行为,可选值如下

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      /** 默认行为 */
      #define EV_CLOSURE_EVENT 0
      /** 执行信号的事件处理回调函数是,调用_ev.ev_signal.ev_ncalls次该回调函数 */
      #define EV_CLOSURE_EVENT_SIGNAL 1
      /** 执行完回调函数后,再次将事件添加到注册事件队列中 */
      #define EV_CLOSURE_EVENT_PERSIST 2
      /** 简单的回调函数 */
      #define EV_CLOSURE_CB_SELF 3
      /** 释放回调函数 */
      #define EV_CLOSURE_CB_FINALIZE 4
      /** 关闭回调函数 */
      #define EV_CLOSURE_EVENT_FINALIZE 5
      /** 关闭之后释放回调函数 */
      #define EV_CLOSURE_EVENT_FINALIZE_FREE 6
    • evcb_cb_union:事件处理器回调函数的联合体,支持多种回调函数;

    • evcb_arg:回调函数参数。
  • ev_timeout_pos:定时事件处理器,支持时间堆和链表管理定时时间;
  • ev_:联合体,将具有相同文件描述符的I/O时间通过ev_.evio串联成一个队列,称之为I/O事件队列,将具有相同信号值的信号处理器通过ev.ev_signal串联成一个队列,成为之信号事件队列;
  • ev_res:当前激活事件的类型;
  • ev_timeout:定时器超时值。

往注册事件队列中添加事件处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
{
struct event *ev;
ev = mm_malloc(sizeof(struct event));
if (ev == NULL)
return (NULL);
if (event_assign(ev, base, fd, events, cb, arg) < 0) {
mm_free(ev);
return (NULL);
}
return (ev);
}

event_new函数给event对象分配内存并初始化它的部分成员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int
event_add(struct event *ev, const struct timeval *tv)
{
int res;
if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
event_warnx("%s: event has no event_base set.", __func__);
return -1;
}
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
res = event_add_nolock_(ev, tv, 0);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
return (res);
}

event_add函数用于将event对象添加到注册事件列表中,调用event_add_nolock_函数;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
int
event_add_nolock_(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;
EVENT_BASE_ASSERT_LOCKED(base);
event_debug_assert_is_setup_(ev);
event_debug((
"event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%s%scall %p",
ev,
EV_SOCK_ARG(ev->ev_fd),
ev->ev_events & EV_READ ? "EV_READ " : " ",
ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_events & EV_CLOSED ? "EV_CLOSED " : " ",
tv ? "EV_TIMEOUT " : " ",
ev->ev_callback));
EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));
if (ev->ev_flags & EVLIST_FINALIZING) {
return (-1);
}
/*如果新添加的事件处理器是定时器,且它尚未添加到通用定时器队列或时间堆中,则为该定时器在时间堆上预留一个位置*/
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve_(&base->timeheap,
1 + min_heap_size_(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}
/*如果当前调用者不是主线程(执行事件循环的线程),并且被添加的事件处理器是信号事件处理器,而且主线程正在执行该信号处理器的回调函数,则当前调用者必须等待主线程完成调用,否则将会引起竞态条件*/
#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (base->current_event == event_to_event_callback(ev) &&
(ev->ev_events & EV_SIGNAL)
&& !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
#endif
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
if (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED))
/*添加I/O事件和I/O事件处理器的映射关系*/
res = evmap_io_add_(base, ev->ev_fd, ev);
else if (ev->ev_events & EV_SIGNAL)
/*添加信号事件和信号信号处理器的映射关系*/
res = evmap_signal_add_(base, (int)ev->ev_fd, ev);
if (res != -1)
/*将事件处理器插入注册事件队列*/
event_queue_insert_inserted(base, ev);
if (res == 1) {
/* 事件多路发生器中添加了新的事件,所以要通知主线程 */
notify = 1;
res = 0;
}
}
/*下面将事件处理器添加至通用定时器队列或时间堆中*/
if (res != -1 && tv != NULL) {
struct timeval now;
int common_timeout;
#ifdef USE_REINSERT_TIMEOUT
int was_common;
int old_timeout_idx;
#endif
/*对于永久性事件处理器,如果其超时事件不是绝对事件,则将该事件处理器的超时事件记录在变量ev->ev_io_timeout中*/
if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
#ifndef USE_REINSERT_TIMEOUT
/** 如果该事件处理器已经被插入通用定时器或时间堆中,则先删除 */
if (ev->ev_flags & EVLIST_TIMEOUT) {
event_queue_remove_timeout(base, ev);
}
#endif
/*如果待添加的事件处理器已被激活,且原因是超时,则从活动队列中删除它,以避免其回调函数被执行*/
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {
if (ev->ev_events & EV_SIGNAL) {
if (ev->ev_ncalls && ev->ev_pncalls) {
*ev->ev_pncalls = 0;
}
}
event_queue_remove_active(base, event_to_event_callback(ev));
}
gettime(base, &now);
common_timeout = is_common_timeout(tv, base);
#ifdef USE_REINSERT_TIMEOUT
was_common = is_common_timeout(&ev->ev_timeout, base);
old_timeout_idx = COMMON_TIMEOUT_IDX(&ev->ev_timeout);
#endif
if (tv_is_absolute) {
ev->ev_timeout = *tv;
/** 判断应该将定时器插入通用定时器队列,还是插入时间堆*/
} else if (common_timeout) {
struct timeval tmp = *tv;
tmp.tv_usec &= MICROSECONDS_MASK;
evutil_timeradd(&now, &tmp, &ev->ev_timeout);
ev->ev_timeout.tv_usec |=
(tv->tv_usec & ~MICROSECONDS_MASK);
} else {
/** 加上当前系统时间,以取得定时器超时的绝对时间*/
evutil_timeradd(&now, tv, &ev->ev_timeout);
}
event_debug((
"event_add: event %p, timeout in %d seconds %d useconds, call %p",
ev, (int)tv->tv_sec, (int)tv->tv_usec, ev->ev_callback));
#ifdef USE_REINSERT_TIMEOUT
event_queue_reinsert_timeout(base, ev, was_common, common_timeout, old_timeout_idx);
#else
event_queue_insert_timeout(base, ev);
/** 插入定时器 */
#endif
if (common_timeout) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
if (ev == TAILQ_FIRST(&ctl->events)) {
common_timeout_schedule(ctl, &now, ev);
}
} else {
struct event* top = NULL;
if (min_heap_elt_is_top_(ev))
notify = 1;
else if ((top = min_heap_top_(&base->timeheap)) != NULL &&
evutil_timercmp(&top->ev_timeout, &now, <))
notify = 1;
}
}
/** 如果必要,唤醒主线程 */
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
event_debug_note_add_(ev);
return (res);
}


往事件多路分发器中注册事件

event_add将一个事件添加到event_base的某个事件队列中,对于新添加的I/O事件处理器和信号事件处理器,我们还需要让时间多路分发器来监听对应的事件,同时建立文件描述符、信号值与事件处理器之间的映射关系。这主要通过调用evmap_io_add和evmap_signam_add两个函数来完成的,它们在evmap.c文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
int
evmap_io_add_(struct event_base *base, evutil_socket_t fd, struct event *ev)
{
/** 获得event_base的后端I/O复用机制实例 */
const struct eventop *evsel = base->evsel;
/** 获得event_base中文件描述符与I/O事件队列的映射表 */
struct event_io_map *io = &base->io;
struct evmap_io *ctx = NULL;
/** fd对应的I/O事件队列 */
int nread, nwrite, nclose, retval = 0;
short res = 0, old = 0;
struct event *old_ev;
EVUTIL_ASSERT(fd == ev->ev_fd);
if (fd < 0)
return 0;
#ifndef EVMAP_USE_HT
if (fd >= io->nentries) {
if (evmap_make_space(io, fd, sizeof(struct evmap_io *)) == -1)
return (-1);
}
#endif
GET_IO_SLOT_AND_CTOR(ctx, io, fd, evmap_io, evmap_io_init,
evsel->fdinfo_len);
nread = ctx->nread;
nwrite = ctx->nwrite;
nclose = ctx->nclose;
if (nread)
old |= EV_READ;
if (nwrite)
old |= EV_WRITE;
if (nclose)
old |= EV_CLOSED;
if (ev->ev_events & EV_READ) {
if (++nread == 1)
res |= EV_READ;
}
if (ev->ev_events & EV_WRITE) {
if (++nwrite == 1)
res |= EV_WRITE;
}
if (ev->ev_events & EV_CLOSED) {
if (++nclose == 1)
res |= EV_CLOSED;
}
if (EVUTIL_UNLIKELY(nread > 0xffff || nwrite > 0xffff || nclose > 0xffff)) {
event_warnx("Too many events reading or writing on fd %d",
(int)fd);
return -1;
}
if (EVENT_DEBUG_MODE_IS_ON() &&
(old_ev = LIST_FIRST(&ctx->events)) &&
(old_ev->ev_events&EV_ET) != (ev->ev_events&EV_ET)) {
event_warnx("Tried to mix edge-triggered and non-edge-triggered"
" events on fd %d", (int)fd);
return -1;
}
if (res) {
void *extra = ((char*)ctx) + sizeof(struct evmap_io);
/** 往事件多路分发器中注册事件,add是事件多路分发器的接口函数之一,对不同的后端I/O复用机制,这些接口函数有不同的实现 */
if (evsel->add(base, ev->ev_fd,
old, (ev->ev_events & EV_ET) | res, extra) == -1)
return (-1);
retval = 1;
}
ctx->nread = (ev_uint16_t) nread;
ctx->nwrite = (ev_uint16_t) nwrite;
ctx->nclose = (ev_uint16_t) nclose;
LIST_INSERT_HEAD(&ctx->events, ev, ev_io_next);
return (retval);
}


eventop结构体

eventop结构体封装了I/O复用机制必要的一些操作,比如注册事件、等待事件等,该结构体定义在event-internal.h中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct eventop {
/** 后端I/O复用技术的名字 */
const char *name;
/** 初始化函数 */
void *(*init)(struct event_base *);
/** 注册事件 */
int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** 删除事件 */
int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** 等待事件 */
int (*dispatch)(struct event_base *, struct timeval *);
/** 释放I/O复用机制使用的资源 */
void (*dealloc)(struct event_base *);
/** 调用fork之后是否需要重新初始化event_base */
int need_reinit;
/** I/O复用技术支持的一些特性,例如边沿触发等 */
enum event_method_feature features;
/** 有的I/O复用机制需要为每个I/O事件队列和信号队列分配额外的内存 */
size_t fdinfo_len;
};

devpoll.c、kqueue.c、evport.c、select.c、win32select.c、poll.c和epoll.c这些文件分别针对不同的I/O复用技术实现了eventop定义的这些接口。


event_base结构体

结构体event_base是Libevent的Reactor,定义在evevt_internal.h文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
struct event_base {
/** 初始化Reactor的时候选择一种后端I/O复用机制,并记录在如下字段中*/
const struct eventop *evsel;
/** 指向I/O复用机制真正存储的数据,它通过evsel成员的init函数来初始化 */
void *evbase;
struct event_changelist changelist;
const struct eventop *evsigsel;
/** 信号事件处理器使用的数据结构 */
struct evsig_info sig;
/** 添加到该event_base的虚拟事件、所有事件和激活事件的数量 */
int virtual_event_count;
int virtual_event_count_max;
int event_count;
int event_count_max;
int event_count_active;
int event_count_active_max;
/** 是否执行完活动事件队列上剩余的任务之后就退出事件循环*/
int event_gotterm;
/** 是否立即退出,而不管是否还有任务需要处理*/
int event_break;
/** 是否应该启动一个新的事件循环*/
int event_continue;
/** 目前正在处理的活动事件队列的优先级 */
int event_running_priority;
/** 事件循环是否已经启动 */
int running_loop;
int n_deferreds_queued;
/** 活动事件队列数组 */
struct evcallback_list *activequeues;
/** 活动事件队列数组大小 */
int nactivequeues;
/** 存放延迟回调函数的链表 */
struct evcallback_list active_later_queue;
/** 下面3个成员用于管理通用定时器队列 */
struct common_timeout_list **common_timeout_queues;
int n_common_timeouts;
int n_common_timeouts_allocated;
/** 文件描述符和I/O时间的映射关系表 */
struct event_io_map io;
/** 信号值和信号事件之间的映射关系表 */
struct event_signal_map sigmap;
/** 时间堆 */
struct min_heap timeheap;
/** 管理系统时间的一些成员. */
struct timeval tv_cache;
struct evutil_monotonic_timer monotonic_timer;
struct timeval tv_clock_diff;
time_t last_updated_clock_diff;
/** 多线程支持 */
#ifndef EVENT__DISABLE_THREAD_SUPPORT
/** 当前运行该event_base的事件循环的线程 */
unsigned long th_owner_id;
/** 对event_base的独占锁 */
void *th_base_lock;
/** 条件变量,用于唤醒正在等待某个事件处理完毕的线程 */
void *current_event_cond;
int current_event_waiters;
#endif
/** 当前事件循环正在执行哪个事件处理器的回调函数 */
struct event_callback *current_event;
#ifdef _WIN32
struct event_iocp_port *iocp;
#endif
/** 该event_base的一些配置参数 */
enum event_base_config_flag flags;
struct timeval max_dispatch_time;
int max_dispatch_callbacks;
int limit_callbacks_after_prio;
/** 下面这组成员变量给工作线程唤醒主线程提供了方法 */
int is_notify_pending;
evutil_socket_t th_notify_fd[2];
struct event th_notify;
int (*th_notify_fn)(struct event_base *base);
struct evutil_weakrand_state weakrand_seed;
LIST_HEAD(once_event_list, event_once) once_events;
};


事件循环

Libevent中实现事件循环的函数是event_base_loop,该函数首先调用I/O事件多路分发器的函数监听函数,以等待事件;当有事件发生时,就依次处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
/** 一个event_base仅允许一个事件循环 */
if (base->running_loop) {
event_warnx("%s: reentrant invocation. Only one event_base_loop"
" can run on each event_base at once.", __func__);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return -1;
}
base->running_loop = 1; /* 标记该event_base已经运行 */
clear_time_cache(base); /* 清除event_base的系统时间缓存 */
/** 设置信号事件的event_base实例 */
if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
evsig_set_base_(base);
done = 0;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
base->th_owner_id = EVTHREAD_GET_ID();
#endif
base->event_gotterm = base->event_break = 0;
while (!done) {
base->event_continue = 0;
base->n_deferreds_queued = 0;
if (base->event_gotterm) {
break;
}
if (base->event_break) {
break;
}
tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
/** 获取时间堆上堆顶元素的超时值,即I/O复用系统调用本次应该设置的超时值 */
timeout_next(base, &tv_p);
} else {
/** 如果有就绪事件尚未处理,则将I/O复用系统调用的超时事件“置0”,这样I/O复用系统调用直接返回,程序也就可以立即处理就绪事件了*/
evutil_timerclear(&tv);
}
/** 如果event_base中没有注册任何事件,则直接退出事件循环 */
if (0==(flags&EVLOOP_NO_EXIT_ON_EMPTY) &&
!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
event_debug(("%s: no events registered.", __func__));
retval = 1;
goto done;
}
event_queue_make_later_events_active(base);
clear_time_cache(base);
/** 调用事件多路分发器中的dispatch方法等待事件,将就绪事件插入活动时间队列 */
res = evsel->dispatch(base, tv_p);
if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}
update_time_cache(base);
timeout_process(base);
if (N_ACTIVE_CALLBACKS(base)) {
/** 调用event_process_active函数依次处理就绪的信号事件和I/O事件*/
int n = event_process_active(base);
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
event_debug(("%s: asked to terminate loop.", __func__));
done:
/** 事件循环结束,清空时间缓存,并设置停止循环标志 */
clear_time_cache(base);
base->running_loop = 0;
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (retval);
}


总结

Libevent是一个基于Reactor事件模型的I/O处理框架,支持I/O、信号和定时器三种事件,内部使用I/O复用技术,首先将事件处理器的事件添加到事件注册队列中,当有事件发生时,添加到活动事件队列中,之后处理发生的事件。本文是对Libevent执行顺序源码的一个简要分析,如果要更加深入的了解Libevent,还需要对其中的每行代码做以解析。