目录
- 1. Libevent简介
- 2. Libevent事件处理流程
- 3. Libevent常用API接口
- 3.1 地基——event_base
- 3.2 事件——event
- 3.3 循环等待事件
- 3.4 自带 buffer 的事件——bufferevent
- 3.5 链接监听器——evconnlistener
- 3.6 基于event的服务器程序
- 3.7 基于 bufferevent 的服务器和客户端实现
- 4. Libevent的数据结构
- 5. I/O,信号,定时统一原理
- 5.1 I/O
- 5.2 信号
- 5.3 定时
- 6. Libevent+多线程
1. Libevent简介
Libevent是以个基于C语言编写的轻量级的开源高性能网络库,被广泛使用。Libevent本质上是对已有的系统I/O多路复用技术进行了特殊封装,并统一接口实现I/O,定时器和信号事件,主要有以下几个特点:
- Reactor模式,事件驱动,轻量级,性能高;
- 支持I/O多路复用技术,例如select,poll,epoll、kqueue等;
- 跨平台,支持Windows,Linux,BSD 和 Mac Os;
- 支持I/O,定时器和信号等事件,并将它们统一于库内。
2. Libevent事件处理流程
本节先介绍事件处理流程。流程中每一个步骤都可以在第三节中找到对应的API接口执行相应的操作。
- 首先创建libevent实例even_base,准备并初始化 event,设置好事件类型和回调函数;
- 之后添加添加初始化的event到even_base中,开始等待事件发生。对于定时事件,libevent 使用一个小根堆管理,key 为超时时间;对于 Signal 和 I/O 事件,libevent 将其放入到等待链表(双向链表)中;
- 程序调用 event_base_dispatch()系列函数进入无限循环,等待事件,以 select()函数为例;每次循环前 libevent 会检查定时事件的最小超时时间 tv,根据 tv 设置 select()的最大等待时间,以便于后面及时处理超时事件;当 select()返回后,首先检查超时事件,然后检查 I/O 事件。
3. Libevent常用API接口
3.1 地基——event_base
在使用 libevent 的函数之前,需要先申请一个或 event_base 结构,相当于盖房子时的地基。
(1) 通常情况下可以通过 event_base_new 函数获得 event_base 结构。
struct event_base *event_base_new(void);
(2) 申请到 event_base 结构指针可以通过 event_base_free 进行释放。
void event_base_free(struct event_base *);
(3) 如果 fork 出子进程,想在子进程继续使用 event_base,那么子进程需要对 event_base 重新初始化,函数如下:
int event_reinit(struct event_base *base);
3.2 事件——event
Libevent 的事件驱动对应的结构体为 struct event,其主要存在3个状态:
- 非未决:相当于创建了事件,但是事件还没有处于被监听状态,类似于我们使用 epoll 的时候定义了 struct epoll_event ev 并且对 ev 的两个字段进行了赋值,但是此时尚未调用 epoll_ctl。
- 未决:就是对事件开始监听,暂时未有事件产生。相当于调用 epoll_ctl。
- 激活:代表监听的事件已经产生,这时需要处理,相当于我们 epoll 所说的事件就绪。
(1)创建/设置新的event,二者用其一即可:
struct event *event_new(struct event_base *base, evutil_socket_t fd, short events, event_callback_fn cb, void *arg);@base 将要给event关联的event_base
@fd 要监听的文件描述符
@events 要监听的事件,默认触发一次,可用与操作符|同时监听多个事件#define EV_TIMEOUT 0x01 //超时事件 #define EV_READ 0x02 //读事件 #define EV_WRITE 0x04 //写事件 #define EV_SIGNAL 0x08 //信号事件 #define EV_PERSIST 0x10 //周期性触发,(事件触发后事件将会重新被添加)#define EV_ET 0x20 //边缘触发,如果底层模型支持
@cb 回调函数,原型如下: typedef void (*event_callback_fn)(evutil_socket_t fd, short events, void *arg);
void event_set(struct event *ev, int fd, short events, void (*callback)(int, short, void *), void *arg) 1.设置事件 ev 绑定的文件描述符或者信号,对于定时事件,设为-1 即可;
2.设置事件类型,比如 EV_READ|EV_PERSIST, EV_WRITE, EV_SIGNAL 等;
3.设置事件的回调函数以及参数 arg;
4.初始化其它字段,比如缺省的 event_base 和优先级;
(2)注册并激活事件,event_base开始监听事件,将非未决态事件转为未决态,相当于调用 epoll_ctl 函数:
int event_add(struct event *ev, const struct timeval *timeout)@ev 为前面第一步中创建的event事件
@timeout 限时等待事件的产生,也可以设置为 NULL,没有限时;如果 tv 不是 NULL,则会同时注册定时事件,将 ev 添加到 timer堆上
(3)删除事件,将事件从未决态变为非未决态,相当于 epoll 的下树(epoll_ctl 调用 EPOLL_CTL_DEL 操作)操作。
int event_del(struct event *ev)@ev 为处理的event事件
(4)删除event事件:
void event_free(struct event *ev)@ev 为处理的event事件,该函数将删除事件 ev,对于 I/O 事件,从 I/O 的
demultiplexer 上将事件注销;对于 Signal事件,将从 Signal 事件链表中
删除;对于定时事件,将从堆上删除
3.3 循环等待事件
(1)调用该函数,相当于没有设置标志位的 event_base_loop。程序将会一直运行,直到没有需要检测的事件了,或者被结束循环的 api 终止。
//最常用
int event_base_dispatch(struct event_base *base);
(2) 上述event_base_dispatch内部也是调用event_base_loop函数完成的。
//不常用
int event_base_loop(struct event_base *base, int flags); flags取值: #define EVLOOP_ONCE 0x01 只触发一次,如果事件没有被触发,阻塞等待 #define EVLOOP_NONBLOCK 0x02 非阻塞方式检测事件是否被触发,不管事件触发与否,都会立即返回
(3)跳出循环或终止循环。两个函数的区别是,而 event_base_loopbreak 会立即终止循环。:
int event_base_loopexit(struct event_base *base, const struct timeval *tv);
//如果正在执行激活事件的回调函数,那么 event_base_loopexit 将在事件回调执行结
//束后终止循环(如果 tv 时间非 NULL,那么将等待 tv 设置的时间后立即结束循环)
struct timeval { long tv_sec; long tv_usec;}int event_base_loopbreak(struct event_base *base);
//如果正在执行激活事件的回调函数,event_base_loopbreak 会立即终止循环
3.4 自带 buffer 的事件——bufferevent
bufferevent 是普通event 的升级版。它的内部有两个缓冲区(用户区),以及一个文件描述符(网络套接字)。一个bufferevent可以同时设置三个回调函数,分别是读回调、写回调、事件回调。用户区的缓冲区与底层(内核)的缓冲区之间的数据交换是自动的。
- 读回调 – 当 bufferevent 将底层读缓冲区的数据读到自身的读缓冲区时触发读事件回调
- 写回调 – 当 bufferevent 将自身写缓冲的数据写到底层写缓冲区的时候出发写事件回调
- 事件回调 – 当 bufferevent 绑定的 socket 连接,断开或者异常的时候触发事件回调
(1)创建一个bufferevent事件:
struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options); @base – 对应根节点
@fd -- 文件描述符
@options – bufferevent 的选项BEV_OPT_CLOSE_ON_FREE -- 释放 bufferevent 自动关闭底层接口 BEV_OPT_THREADSAFE -- 使 bufferevent 能够在多线程下是安全的
(2)设置bufferevent的三个回调函数,设置完后自动注册并开始监听事件:
void bufferevent_setcb(struct bufferevent *bufev, bufferevent_data_cb readcb,
bufferevent_data_cb writecb, bufferevent_event_cb eventcb, void *cbarg);readcb,writecb,eventcb 分别对应了读回调,写回调,事件回调,cbarg 代表回调函数的参数。 回调函数的原型: typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx); typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short what,void *ctx); What 代表 对应的事件:BEV_EVENT_EOF 对方关闭连接BEV_EVENT_ERROR 出错BEV_EVENT_TIMEOUT 超时BEV_EVENT_CONNECTED 建立连接
(3)创建一个套接字,主动连接服务器,连接成功/断开都会触发bufferevent的事件回调:
int bufferevent_socket_connect(struct bufferevent *bev, struct sockaddr *serv, int socklen); @bev – 需要提前初始化的 bufferevent 事件(提前创建一个bufferevent) @serv – 对端的 ip 地址(一般指服务器),端口,协议的结构指针 @socklen – 描述 serv 的长度
(4)可设置一个bufferevent的三个回调事件是否生效:
int bufferevent_enable(struct bufferevent *bufev, short event);
int bufferevent_disable(struct bufferevent *bufev, short event);
如果设置为disable,事件回调将不会被触发。event为#define EV_READ 0x02 //读事件
#define EV_WRITE 0x04 //写事件
事件回调也需要读事件被设置为触发
(5)从缓冲区读取数据或写数据到缓冲区:
int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
将data的数据写到bufferevent的写缓冲区 int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
将数据写到写缓冲区另外一个写法,实际上bufferevent的内部的两个缓冲区结构就是struct evbuffer。 size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
将bufferevent的读缓冲区数据读到data中,同时将读到的数据从bufferevent的读缓冲清除。 int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf);
将bufferevent读缓冲数据读到buf中,接口的另外一种。
(6)释放 bufferevent:
void bufferevent_free(struct bufferevent *bufev);
3.5 链接监听器——evconnlistener
链接监听器封装了底层的 socket 通信相关函数,比如 socket,bind,listen,accept 这几个函数。链接监听器创建后实际上相当于调用了 socket,bind,listen,此时等待新的客户端连接到来,如果有新的客户端连接,那么内部先进行 accept 处理,然后调用用户指定的回调函数。
(1)创建链接监听器,evconnlistener_new_bind 是在当前没有套接字的情况下对链接监听器进行初始化:
struct evconnlistener *evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, const struct sockaddr *sa, int socklen); @base 为event_base结构@cb 是有新连接之后的回调函数,但是注意这个回调函数触发的时候,链接器已经处理好新连接了,并将与新连接通信的描述符交给回调函数。回调函数原型为typedef void (*evconnlistener_cb)(struct evconnlistener *evl, evutil_socket_t fd, struct sockaddr *cliaddr, int socklen, void *ptr); 回调函数 fd 参数会与客户端通信的描述符,并非是等待连接的监听的那个描述符,所以 cliaddr 对应的也是新连接的对端地址信息,已经是 accept 处理好的。@ptr 是回调函数的参数@Flags 需要参考几个值:LEV_OPT_LEAVE_SOCKETS_BLOCKING 文件描述符为阻塞的LEV_OPT_CLOSE_ON_FREE 关闭时自动释放LEV_OPT_REUSEABLE 端口复用LEV_OPT_THREADSAFE 分配锁,线程安全@backlog 是 listen 函数的关键参数(略有不同的是,如果 backlog 是-1,那么监听器会自动选择一个合适的值,如果填 0,那么监听器会认为 listen 函数已经被调用过了)@sa 为bind中的绑定的套接字地址@socklen 为bind中套接字地址结构体的大小
(2)设置链接监听器生效或失效:
int evconnlistener_enable(struct evconnlistener *lev);
使链接监听器生效int evconnlistener_disable(struct evconnlistener *lev);
使链接监听器失效
(3)释放链接监听器:
void evconnlistener_free(struct evconnlistener *lev);
3.6 基于event的服务器程序
#include <event.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
struct event *readev = NULL;
void readcb(evutil_socket_t fd, short event, void * arg)
{ //处理读事件char buf[256] = { 0 };int ret = read(fd, buf, sizeof(buf)); if (ret < 0){ perror("read err"); close(fd); event_del(readev); } else if (ret == 0){ printf("client closed\n"); close(fd); event_del(readev); } else{ write(fd, buf, ret);//反射}
}
void conncb(evutil_socket_t fd, short event, void * arg)
{ //处理连接struct event_base *base = (struct event_base*)arg; struct sockaddr_in client; socklen_t len = sizeof(client); int cfd = accept(fd, (struct sockaddr*)&client, &len); if (cfd > 0){ //连接成功//需要将新的文件描述符上树readev = event_new(base, cfd, EV_READ | EV_PERSIST, readcb, base); event_add(readev, NULL); }
} int main()
{ int fd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serv; bzero(&serv, sizeof(serv)); serv.sin_addr.s_addr = htonl(INADDR_ANY); serv.sin_port = htons(8888); serv.sin_family = AF_INET; bind(fd, (struct sockaddr*)&serv, sizeof(serv)); listen(fd, 120); struct event_base *base = event_base_new();//创建根节点//struct event *event_new(struct event_base *, evutil_socket_t, short, event_callback_fn, void *); struct event *connev = event_new(base, fd, EV_READ | EV_PERSIST, conncb, base); event_add(connev, NULL);//开始监听//循环event_base_dispatch(base); event_base_free(base);//释放event_free(connev); event_free(readev); return 0;
}
3.7 基于 bufferevent 的服务器和客户端实现
(1)基于bufferevent 的服务器
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
#include <ctype.h> static const char MESSAGE[] = "Hello, World!\n";
static const int PORT = 9995;
static void listener_cb(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *);
static void conn_writecb(struct bufferevent *, void *);
static void conn_readcb(struct bufferevent *, void *);
static void conn_eventcb(struct bufferevent *, short, void *);
static void signal_cb(evutil_socket_t, short, void *); int main(int argc, char **argv)
{ struct event_base *base;//根节点定义struct evconnlistener *listener;//监听器定义struct event *signal_event;//信号事件struct sockaddr_in sin; base = event_base_new();//创建根节点if (!base) { fprintf(stderr, "Could not initialize libevent!\n"); return 1; } memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(PORT); //创建监听器-端口复用-关闭自动释放listener = evconnlistener_new_bind(base, listener_cb, (void *)base, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr*)&sin, sizeof(sin)); if (!listener) { fprintf(stderr, "Could not create a listener!\n"); return 1; }//定义信号回调事件 -SIGINT signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base); //event_add 上树 -开始监听信号事件if (!signal_event || event_add(signal_event, NULL)<0) { fprintf(stderr, "Could not create/add a signal event!\n"); return 1; } //循环等待事件event_base_dispatch(base); //释放链接侦听器evconnlistener_free(listener); event_free(signal_event); event_base_free(base); printf("done\n"); return 0;
} //链接监听器帮助处理了 accept 连接,得到新的文件描述符,作为参数传入
static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data)
{ printf("---call------%s----\n",__FUNCTION__); struct event_base *base = user_data; struct bufferevent *bev;//定义 bufferevent 事件//创建新的 bufferevent 事件,对应的与客户端通信的 socket bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);if (!bev) { fprintf(stderr, "Error constructing bufferevent!"); event_base_loopbreak(base); return; } //设置回调函数 只设置了写回调和事件产生回调bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, NULL); //启用读写缓冲区bufferevent_enable(bev, EV_WRITE|EV_READ); //禁用读缓冲//bufferevent_disable(bev, EV_READ); //将 MESSAGE 写到输出缓冲区bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
} //自定义读回调函数
static void conn_readcb(struct bufferevent *bev, void *user_data)
{ printf("---calll-----%s\n",__FUNCTION__); //何时被触发?读入缓冲区有数据的时候,非底层的char buf[256]={0}; size_t ret = bufferevent_read(bev, buf, sizeof(buf)); if(ret > 0){ //转为大写int i; for(i = 0; i < ret ; i ++){ buf[i] = toupper(buf[i]); } //写到 bufferevent 的输出缓冲区bufferevent_write(bev, buf, ret); }
} static void conn_writecb(struct bufferevent *bev, void *user_data)
{ printf("---call------%s----\n",__FUNCTION__); struct evbuffer *output = bufferevent_get_output(bev); if (evbuffer_get_length(output) == 0) { printf("flushed answer\n"); // bufferevent_free(bev); }
} static void conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{ printf("---call------%s----\n",__FUNCTION__); if (events & BEV_EVENT_EOF) { printf("Connection closed.\n"); } else if (events & BEV_EVENT_ERROR) { printf("Got an error on the connection: %s\n", strerror(errno));/*XXX win32*/ } /* None of the other events can happen here, since we haven't enabled * timeouts */ bufferevent_free(bev);
} static void signal_cb(evutil_socket_t sig, short events, void *user_data)
{ printf("---call------%s----\n",__FUNCTION__); struct event_base *base = user_data; struct timeval delay = { 2, 0 };//设置延迟时间 2sprintf("Caught an interrupt signal; exiting cleanly in two seconds.\n"); event_base_loopexit(base, &delay);//延时 2s 退出
}
(2)基于bufferevent 的客户端:实现客户在输入终端输入内容,发送给服务器。客户端主要是建立一个event_base,管理一个event(监听终端输入,与客户终端交流)和一个bufferevent(与服务器建立链接,与服务器交流)。其中,event监听客户端终端标准输入,如果客户输入内容并回车,触发event的写事件。该写事件为往bufferevent写缓冲区中,触发bufferevent的写事件(将内容传输给服务器)。
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/util.h>
int tcp_connect_server(const char* server_ip, int port);
void cmd_msg_cb(int fd, short events, void* arg);
void server_msg_cb(struct bufferevent* bev, void* arg);
void event_cb(struct bufferevent *bev, short event, void *arg);
int main(int argc, char** argv)
{ if( argc < 3 ) { //两个参数依次是服务器端的 IP 地址、端口号printf("please input 2 parameter\n"); return -1; } //创建根节点struct event_base *base = event_base_new(); //创建并且初始化 buffer 缓冲区struct bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); //监听终端输入事件struct event* ev_cmd = event_new(base, STDIN_FILENO, EV_READ | EV_PERSIST, cmd_msg_cb, (void*)bev); //开始监听标准输入的读事件event_add(ev_cmd, NULL); struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr) ); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(atoi(argv[2])); //将 ip 地址转换为网络字节序inet_aton(argv[1], &server_addr.sin_addr); //连接到 服务器 ip 地址和端口 初始化了 socket 文件描述符bufferevent_socket_connect(bev, (struct sockaddr *)&server_addr, sizeof(server_addr)); //设置 buffer 的回调函数 主要设置了读回调 server_msg_cb ,传入参数是标准输入的读事件bufferevent_setcb(bev, server_msg_cb, NULL, event_cb, (void*)ev_cmd); bufferevent_enable(bev, EV_READ | EV_PERSIST); event_base_dispatch(base); printf("finished \n"); return 0;
} //终端输入回调
void cmd_msg_cb(int fd, short events, void* arg)
{ char msg[1024]; int ret = read(fd, msg, sizeof(msg)); if( ret < 0 ) { perror("read fail "); exit(1); } struct bufferevent* bev = (struct bufferevent*)arg; //把终端的消息发送给服务器端bufferevent_write(bev, msg, ret);
} void server_msg_cb(struct bufferevent* bev, void* arg)
{ char msg[1024]; size_t len = bufferevent_read(bev, msg, sizeof(msg)); msg[len] = '\0'; printf("recv %s from server\n", msg);
} void event_cb(struct bufferevent *bev, short event, void *arg)
{ if (event & BEV_EVENT_EOF) printf("connection closed\n"); else if (event & BEV_EVENT_ERROR) printf("some other error\n"); else if( event & BEV_EVENT_CONNECTED) { printf("the client has connected to server\n"); return ; } //这将自动 close 套接字和 free 读写缓冲区bufferevent_free(bev); //释放 event 事件 监控读终端struct event *ev = (struct event*)arg; event_free(ev);
}
4. Libevent的数据结构
Libevent对事件的管理主要采用了三个双向链表和一个小根堆。三个链表分别存放I/O事件的双向链表,信号事件的双向链表,激活(就绪)事件的双向链表。小根堆用于存放定时器事件。每次当有事件 event 转变为就绪状态时,libevent 就会把它移入到 active event list[priority]中,其中 priority 是 event 的优先级。
event_base为libevent的实例,其中数据结构中主要包含eventop结构体,该结构体主要由若干个函数指针组成。event_base之所以能够管理I/O事件,是因为其对I/O多路复用的封装。event_base的操作都是基于select、poll、epoll等等的接口实现的,本质上就是用eventop结构体中的函数指针保存这些接口,之后通过函数指针实现对相应功能的调用。
struct eventop { const char *name; void *(*init)(struct event_base *); // 初始化 int (*add)(void *, struct event *); // 注册事件 int (*del)(void *, struct event *); // 删除事件 int (*dispatch)(struct event_base *, void *, struct timeval *); // 事件分发 void (*dealloc)(struct event_base *, void *); // 注销,释放资源 /* set if we need to reinitialize the event base */ int need_reinit;
};
5. I/O,信号,定时统一原理
5.1 I/O
对于I/O没有什么说的,主要是对系统支持的I/O多路复用模型进行封装,libevent的主要功能也是进行I/O处理。
5.2 信号
为了将信号事件集成到libevent中,libevent借用了一个socket pair,即一个socket对,包含两个socket。一个socket专门负责读,一个socket专门负责写。libevent就对读的socket套接字进行监听。由于信号可以绑定对应的信号处理函数(注册信号),在该信号处理函数中将信号发生标记置1,然后往写套接字里面写入数据,I/O事件发生导致event_base返回,检查是否为信号触发。若为信号触发,则将信号事件event添加到激活链表中,等待调用信号事件回调函数。
5.3 定时
由于系统的 I/O 机制像 select()和 epoll_wait()都允许程序制定一个最大等待时间(也称为最大超时时间)timeout,即使没有 I/O 事件发生,它们也保证能在 timeout 时间内返回。那么根据所有 Timer 事件的最小超时时间来设置系统 I/O 的 timeout 时间;当系统 I/O返回时,再激活所有就绪的 Timer 事件就可以了,这样就能将 Timer 事件完美的融合到系统的 I/O 机制中了。
6. Libevent+多线程
Libevent中,如果只有一个libevent实例,即event_base,多个线程间可能会同时对event_base进行修改,例如同时注册事件或删除事件等等。这就导致多线程不安全。分布式项目Memcached中的网络部分就是基于libevent和多线程完成的,每个工作线程都有一个独立的libevent实例,管理自己任务队列里的任务事件,主线程负责分发任务。