在 Linux Socket 服务器短编程时,为了处理大量客户的连接请求,需要使用非阻塞I/O和复用,select,poll 和 epoll 是 Linux API 提供的I/O复用方式。
\ | select | poll | epoll |
---|---|---|---|
操作方式 | 遍历 | 遍历 | 回调 |
底层实现 | 数组 | 链表 | 哈希表 |
IO效率 | 每次调用都进行线性遍历,时间复杂度为O(n) | 每次调用都进行线性遍历,时间复杂度为O(n) | 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪 fd 放到 rdllist 里面。时间复杂度O(1) |
最大连接数 | 1024(x86)或 2048(x64) | 无上限 | 无上限 |
fd拷贝 | 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态 | 每次调用 poll,都需要把 fd 集合从用户态拷贝到内核态 | 调用 epoll_ctl 时拷贝进内核并保存,之后每次 epoll_wait 不拷贝 |
select
在一段指定的时间内,监听用户感兴趣的文件描述符上可读、可写和异常等事件。
select 实现多路复用的方式是,将已连接的 Socket 都放到一个文件描述符集合,然后调用 select 函数将文件描述符集合拷贝到内核里,让内核来检查是否有网络事件产生,检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写, 接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理。
所以,对于 select 这种方式,需要进行 2 次「遍历」文件描述符集合,一次是在内核态里,一个次是在用户态里 ,而且还会发生 2 次「拷贝」文件描述符集合,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中。
select 使用固定长度的 BitsMap,表示文件描述符集合,而且所支持的文件描述符的个数是有限制的,在 Linux 系统中,由内核中的 FD_SETSIZE 限制, 默认最大值为 1024,只能监听 0~1023 的文件描述符。
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。
参数:
nfds
: 最大的文件描述符加1。
readfds
: 用于检查可读的。
writefds
:用于检查可写性。
exceptfds
:用于检查异常的数据。
timeout
:一个指向 timeval
结构的指针,用于决定 select
等待 I/O
的最长时间。如果为空将一直等待。
timeval
结构的定义:
struct timeval{long tv_sec; // secondslong tv_usec; // microseconds
}
返回值: >0
是已就绪的文件句柄的总数, =0
超时,<0
表示出错,错误: errno
void FD_CLR(int fd, fd_set *set); // 把文件描述符集合里fd清0
int FD_ISSET(int fd, fd_set *set); // 测试文件描述符集合里fd是否置1
void FD_SET(int fd, fd_set *set); // 把文件描述符集合里fd位置1
void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0
传统 select/pol l的另一个致命弱点就是当你拥有一个很大的 socket 集合,由于网络得延时,使得任一时间只有部分的 socket 是"活跃" 的,而 select/poll 每次调用都会线性扫描全部的集合,导致效率呈现线性下降。
但是 epoll 不存在这个问题,它只会对"活跃"的 socket 进行操作—这是因为在内核实现中 epoll 是根据每个 fd 上面的 callback 函数实现的。于是,只有"活跃"的 socket 才会主动去调用 callback函数,其他idle 状态的 socket 则不会,在这点上,epoll实现了一个 "伪"AIO,因为这时候推动力在os内核。
示例
select_server.c
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>int main()
{int server_sockfd, client_sockfd;int server_len, client_len;struct sockaddr_in server_address;struct sockaddr_in client_address;int result;fd_set readfds, testfds;server_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立服务器端socket server_address.sin_family = AF_INET;server_address.sin_addr.s_addr = htonl(INADDR_ANY);server_address.sin_port = htons(9000);server_len = sizeof(server_address);bind(server_sockfd, (struct sockaddr*)&server_address, server_len);listen(server_sockfd, 5); //监听队列最多容纳5个 FD_ZERO(&readfds);FD_SET(server_sockfd, &readfds);//将服务器端socket加入到集合中while (1){char ch;int fd;int nread;testfds = readfds;//将需要监视的描述符集copy到select查询队列中,select会对其修改,所以一定要分开使用变量 printf("server waiting\n");/*无限期阻塞,并测试文件描述符变动 */result = select(FD_SETSIZE, &testfds, (fd_set*)0, (fd_set*)0, (struct timeval*)0); //FD_SETSIZE:系统默认的最大文件描述符if (result < 1){perror("server5");exit(1);}/*扫描所有的文件描述符*/for (fd = 0; fd < FD_SETSIZE; fd++){/*找到相关文件描述符*/if (FD_ISSET(fd, &testfds)){/*判断是否为服务器套接字,是则表示为客户请求连接。*/if (fd == server_sockfd){client_len = sizeof(client_address);client_sockfd = accept(server_sockfd,(struct sockaddr*)&client_address, &client_len);FD_SET(client_sockfd, &readfds);//将客户端socket加入到集合中printf("adding client on fd %d\n", client_sockfd);}/*客户端socket中有数据请求时*/else{ioctl(fd, FIONREAD, &nread);//取得数据量交给nread/*客户数据请求完毕,关闭套接字,从集合中清除相应描述符 */if (nread == 0){close(fd);FD_CLR(fd, &readfds); //去掉关闭的fdprintf("removing client on fd %d\n", fd);}/*处理客户数据请求*/else{read(fd, &ch, 1);sleep(5);printf("serving client on fd %d\n", fd);ch++;write(fd, &ch, 1);}}}}}return 0;
}
select_client.c
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/time.h>int main()
{int client_sockfd;int len;struct sockaddr_in address;//服务器端网络地址结构体 int result;char ch = 'A';client_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立客户端socket address.sin_family = AF_INET;address.sin_addr.s_addr = inet_addr("127.0.0.1");address.sin_port = htons(9000);len = sizeof(address);result = connect(client_sockfd, (struct sockaddr*)&address, len);if (result == -1){perror("oops: client2");exit(1);}//第一次读写write(client_sockfd, &ch, 1);read(client_sockfd, &ch, 1);printf("the first time: char from server = %c\n", ch);sleep(5);//第二次读写write(client_sockfd, &ch, 1);read(client_sockfd, &ch, 1);printf("the second time: char from server = %c\n", ch);close(client_sockfd);return 0;
}
poll
poll 不再用 BitsMap 来存储所关注的文件描述符,取而代之用动态数组,以链表形式来组织,突破了 select 的文件描述符个数限制,当然还会受到系统文件描述符限制。
但是 poll 和 select 并没有太大的本质区别,都是使用 「线性结构」 存储进程关注的 Socket 集合,因此都需要遍历文件描述符集合来找到可读或可写的 Socket,时间复杂度为 O(n),而且也需要在用户态与内核态之间拷贝文件描述符集合,这种方式随着并发数上来,性能的损耗会呈指数级增长。
poll 和select 区别: select 有文件句柄上线设置,值为 FD_SETSIZE,而poll 理论上没有限制!
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
输入参数:
fds
:可以传递多个结构体,也就是说可以监测多个驱动设备所产生的事件,只要有一个产生了请求事件,就能立即返回。
struct pollfd {int fd; /*文件描述符 open打开的那个*/short events; /*请求的事件类型,监视驱动文件的事件掩码*/ POLLIN | POLLOUTshort revents; /*驱动文件实际返回的事件*/
}
nfds
:监测驱动文件的个数。
timeout
:超时时间,单位是ms。
事件类型 events
可以为下列值:
POLLIN
有数据可读
POLLRDNORM
有普通数据可读,等效与POLLIN
POLLPRI
有紧迫数据可读
POLLOUT
写数据不会导致阻塞
POLLER
指定的文件描述符发生错误
POLLHUP
指定的文件描述符挂起事件
POLLNVAL
无效的请求,打不开指定的文件描述符
返回值:
有事件发生 返回revents
域不为0
的文件描述符个数
超时:return 0
失败:return -1
错误:errno
示例
poll_server.c
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>#define MAX_FD 8192
struct pollfd fds[MAX_FD];
int cur_max_fd = 0;int main()
{int server_sockfd, client_sockfd;int server_len, client_len;struct sockaddr_in server_address;struct sockaddr_in client_address;int result;//fd_set readfds, testfds;server_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立服务器端socketserver_address.sin_family = AF_INET;server_address.sin_addr.s_addr = htonl(INADDR_ANY);server_address.sin_port = htons(9000);server_len = sizeof(server_address);bind(server_sockfd, (struct sockaddr*)&server_address, server_len);listen(server_sockfd, 5); //监听队列最多容纳5个//FD_ZERO(&readfds);//FD_SET(server_sockfd, &readfds);//将服务器端socket加入到集合中fds[server_sockfd].fd = server_sockfd;fds[server_sockfd].events = POLLIN;fds[server_sockfd].revents = 0;if(cur_max_fd <= server_sockfd){cur_max_fd = server_sockfd + 1;}while (1){char ch;int i, fd;int nread;//testfds = readfds;//将需要监视的描述符集copy到select查询队列中,select会对其修改,所以一定要分开使用变量printf("server waiting\n");/*无限期阻塞,并测试文件描述符变动 */result = poll(fds, cur_max_fd, 1000);//result = select(FD_SETSIZE, &testfds, (fd_set*)0, (fd_set*)0, (struct timeval*)0); //FD_SETSIZE:系统默认的最大文件描述符if (result < 0){perror("server5");exit(1);}/*扫描所有的文件描述符*/for (i = 0; i < cur_max_fd; i++){/*找到相关文件描述符*/if (fds[i].revents){fd = fds[i].fd;/*判断是否为服务器套接字,是则表示为客户请求连接。*/if (fd == server_sockfd){client_len = sizeof(client_address);client_sockfd = accept(server_sockfd,(struct sockaddr*)&client_address, &client_len);fds[client_sockfd].fd = client_sockfd;//将客户端socket加入到集合中fds[client_sockfd].events = POLLIN;fds[client_sockfd].revents = 0;if(cur_max_fd <= client_sockfd){cur_max_fd = client_sockfd + 1;}printf("adding client on fd %d\n", client_sockfd);//fds[server_sockfd].events = POLLIN;}/*客户端socket中有数据请求时*/else{//ioctl(fd, FIONREAD, &nread);//取得数据量交给nreadnread = read(fd, &ch, 1);/*客户数据请求完毕,关闭套接字,从集合中清除相应描述符 */if (nread == 0){close(fd);memset(&fds[i], 0, sizeof(struct pollfd)); //去掉关闭的fdprintf("removing client on fd %d\n", fd);}/*处理客户数据请求*/else{//read(fds[fd].fd, &ch, 1);sleep(5);printf("serving client on fd %d, read: %c\n", fd, ch);ch++;write(fd, &ch, 1);//fds[fd].events = POLLIN;}}}}}return 0;
}
epoll
epoll 可以理解为 event poll (基于事件的轮询)
使用场合:
- 当客户处理多个描述符时(一般是交互式输入和网络套接口),必须使用I/O复用。
- 当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。
- 如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。
- 如果一个服务器既要处理TCP,又要处理UDP,一般要使用I/O复用。
- 如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。
Reactor 设计模式
reactor 是一种事件驱动的反应堆模式,高效的事件处理模型
reactor 反应堆:事件来了,执行,事件类型可能不尽相同,所以我们需要提前注册好不同的事件处理函数;事件到来就由 epoll_wait
获取同时到来的多个事件,并且根据数据的不同类型将事件分发给事件处理机制 (事件处理器), 也就是我们提前注册的那些接口函数。
reactor 模型的设计思想和思维方式: 它需要的是事件驱动,相应的事件发生,我们需要根据事件自动的调用相应的函数,所以我们需要提前注册好处理函数的接口到 reactor 中, 函数是由 reactor 去调用的,而不是再主函数中直接进行调用的,所以需要使用回调函数。
Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O;
中心思想是将所有要处理的 I/O 事件注册到一个 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上; 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。
流程
1.注册事件和对应的事件处理器;
2.多路复用器等待事件到来;
3.事件到来,激发事件分发器分发事件到对应的处理器;
4.事件处理器处理事件,然后注册新的事件, (比如处理读事件,处理完成之后需要将其设置为写事件再注册,因为读取之后我们需要针对业务需求进行数据处理,之后将其send 回去响应客户端结果,所以自然需要改成写事件,也就需要重新注册)
多路复用器 :由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。
事件分发器 :将多路复用器中返回的就绪事件分到对应的处理函数中,分发给事件处理器。
事件处理器 :处理对应的IO事件。
Reactor优点:
1)响应快,不必为单个同步事件所阻塞,虽然 Reactor 本身依然是同步的;
2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
3)可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
4)可复用性,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性;
I/O 多路复用
1.创建 EPOLL 句柄
int epoll_create(int size);
2.向 EPOLL 对象中添加、修改或者删除感兴趣的事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
op 取值:
EPOLL_CTL_ADD
添加新的事件到epoll中
EPOLL_CTL_MOD
修改EPOLL中的事件
EPOLL_CTL_DEL
删除epoll中的事件
events 取值:
EPOLLIN
表示有数据可以读出(接受连接、关闭连接)
EPOLLOUT
表示连接可以写入数据发送(向服务器发起连接,连接成功事件)
EPOLLERR
表示对应的连接发生错误
EPOLLHUP
表示对应的连接被挂起
3.收集在epoll监控的事件中已经发生的事件
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
参数:
epfd
: epoll 的描述符。
events
:分配好的 epoll_event 结构体数组,epoll 将会把发生的事件复制到 events 数组中(events 不可以是空指针,内核只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)。
struct epoll_event{__uint32_t events;epoll_data_t data;
}
typedef union epoll_data{void *ptr;int fd;uint32_t u32;uint64_t u64;
}epoll_data_t
maxevents
: 本次可以返回的最大事件数目,通常 maxevents
参数与预分配的 events
数组的大小是相等的。
timeout
: 表示在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout
为0
,立刻返回,不会等待。-1
表示无限期阻塞。
epoll 支持两种事件触发模式:边缘触发(edge-triggered,ET)和水平触发(level-triggered,LT)。
边缘触发:当被监控的文件描述符上有可读写事件发生时,epoll_wait()
会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait(
)时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!
这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符!!!
设置方式: stat->_ev.events = EPOLLIN | EPOLLET
水平触发:当被监控的文件描述符上有可读写事件发生时,epoll_wait()
会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()
时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!
如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。
两者的区别:水平触发的意思是只要满足事件的条件,比如内核中有数据需要读,就一直不断地把这个事件传递给用户;而边缘触发的意思是只有第一次满足条件的时候才触发,之后就不会再传递同样的事件了。
epoll为什么高效?
1、红黑树提高 epoll 事件增删查改效率;
2、回调通知机制;
当epoll监听套接字有数据读或者写时,会通过注册到socket的回调函数通知epoll,epoll检测到事件后,将事件存储在就绪队列(rdllist)。
就绪队列
epoll_wait 返回成功后,会将所有就绪事件存储在事件数组,用户不需要进行无效的轮询,从而提高了效率。
示例1:使用 epoll 实现简单的 web 服务器
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>// int fd;
typedef struct _ConnectStat ConnectStat;typedef void(*response_handler) (ConnectStat * stat);struct _ConnectStat {int fd;char name[64];char age[64];struct epoll_event _ev;int status;//0 -未登录 1 - 已登陆response_handler handler;//不同页面的处理函数
};//http协议相关代码
ConnectStat * stat_init(int fd);
void connect_handle(int new_fd);
void do_http_respone(ConnectStat * stat);
void do_http_request(ConnectStat * stat);
void welcome_response_handler(ConnectStat * stat);
void commit_respone_handler(ConnectStat * stat);const char *main_header = "HTTP/1.0 200 OK\r\nServer: Martin Server\r\nContent-Type: text/html\r\nConnection: Close\r\n";static int epfd = 0;void usage(const char* argv)
{printf("%s:[ip][port]\n", argv);
}void set_nonblock(int fd)
{int fl = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}int startup(char* _ip, int _port) //创建一个套接字,绑定,检测服务器
{//sock//1.创建套接字int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){perror("sock");exit(2);}int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));//2.填充本地 sockaddr_in 结构体(设置本地的IP地址和端口)struct sockaddr_in local;local.sin_port = htons(_port);local.sin_family = AF_INET;local.sin_addr.s_addr = inet_addr(_ip);//3.bind()绑定if (bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0){perror("bind");exit(3);}//4.listen()监听 检测服务器if (listen(sock, 5) < 0){perror("listen");exit(4);}//sleep(1000);return sock; //这样的套接字返回
}int main(int argc, char *argv[])
{if (argc != 3) //检测参数个数是否正确{usage(argv[0]);exit(1);}int listen_sock = startup(argv[1], atoi(argv[2])); //创建一个绑定了本地 ip 和端口号的套接字描述符//1.创建epoll epfd = epoll_create(256); //可处理的最大句柄数256个if (epfd < 0){perror("epoll_create");exit(5);}struct epoll_event _ev; //epoll结构填充 ConnectStat * stat = stat_init(listen_sock);_ev.events = EPOLLIN; //初始关心事件为读_ev.data.ptr = stat;//_ev.data.fd = listen_sock; // //2.托管epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &_ev); //将listen sock添加到epfd中,关心读事件struct epoll_event revs[64];int timeout = -1;int num = 0;int done = 0;while (!done){//epoll_wait()相当于在检测事件switch ((num = epoll_wait(epfd, revs, 64, timeout))) //返回需要处理的事件数目 64表示 事件有多大{case 0: //返回0 ,表示监听超时printf("timeout\n");break;case -1: //出错perror("epoll_wait");break;default: //大于零 即就是返回了需要处理事件的数目{struct sockaddr_in peer;socklen_t len = sizeof(peer);int i;for (i = 0; i < num; i++){ConnectStat * stat = (ConnectStat *)revs[i].data.ptr;int rsock = stat->fd; //准确获取哪个事件的描述符if (rsock == listen_sock && (revs[i].events) && EPOLLIN) //如果是初始的 就接受,建立链接{int new_fd = accept(listen_sock, (struct sockaddr*)&peer, &len);if (new_fd > 0){printf("get a new client:%s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));//sleep(1000);connect_handle(new_fd);}}else // 接下来对num - 1 个事件处理{if (revs[i].events & EPOLLIN){do_http_request((ConnectStat *)revs[i].data.ptr);}else if (revs[i].events & EPOLLOUT){do_http_respone((ConnectStat *)revs[i].data.ptr);}else{}}}}break;}//end switch}//end whilereturn 0;
}ConnectStat * stat_init(int fd) {ConnectStat * temp = NULL;temp = (ConnectStat *)malloc(sizeof(ConnectStat));if (!temp) {fprintf(stderr, "malloc failed. reason: %m\n");return NULL;}memset(temp, '\0', sizeof(ConnectStat));temp->fd = fd;temp->status = 0;//temp->handler = welcome_response_handler;}//初始化连接,然后等待浏览器发送请求
void connect_handle(int new_fd) {ConnectStat *stat = stat_init(new_fd);set_nonblock(new_fd);stat->_ev.events = EPOLLIN;stat->_ev.data.ptr = stat;epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &stat->_ev); //二次托管}void do_http_respone(ConnectStat * stat) {stat->handler(stat);
}void do_http_request(ConnectStat * stat) {//读取和解析http 请求char buf[4096];char * pos = NULL;//while header \r\n\r\ndatassize_t _s = read(stat->fd, buf, sizeof(buf) - 1);if (_s > 0){buf[_s] = '\0';printf("receive from client:%s\n", buf);pos = buf;//Demo 仅仅演示效果,不做详细的协议解析if (!strncasecmp(pos, "GET", 3)) {stat->handler = welcome_response_handler;}else if (!strncasecmp(pos, "Post", 4)) {//获取 uriprintf("---Post----\n");pos += strlen("Post");while (*pos == ' ' || *pos == '/') ++pos;if (!strncasecmp(pos, "commit", 6)) {//获取名字和年龄int len = 0;printf("post commit --------\n");pos = strstr(buf, "\r\n\r\n");char *end = NULL;if (end = strstr(pos, "name=")) {pos = end + strlen("name=");end = pos;while (('a' <= *end && *end <= 'z') || ('A' <= *end && *end <= 'Z') || ('0' <= *end && *end <= '9')) end++;len = end - pos;if (len > 0) {memcpy(stat->name, pos, end - pos);stat->name[len] = '\0';}}if (end = strstr(pos, "age=")) {pos = end + strlen("age=");end = pos;while ('0' <= *end && *end <= '9') end++;len = end - pos;if (len > 0) {memcpy(stat->age, pos, end - pos);stat->age[len] = '\0';}}stat->handler = commit_respone_handler;}else {stat->handler = welcome_response_handler;}}else {stat->handler = welcome_response_handler;}//生成处理结果 html ,writestat->_ev.events = EPOLLOUT;//stat->_ev.data.ptr = stat;epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev); //二次托管}else if (_s == 0) //client:close{printf("client: %d close\n", stat->fd);epoll_ctl(epfd, EPOLL_CTL_DEL, stat->fd, NULL);close(stat->fd);free(stat);}else{perror("read");}}void welcome_response_handler(ConnectStat * stat) {const char * welcome_content = "\<html lang=\"zh-CN\">\n\<head>\n\<meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\<title>This is a test</title>\n\</head>\n\<body>\n\<div align=center height=\"500px\" >\n\<br/><br/><br/>\n\<h2>大家好,欢迎来到奇牛学院VIP 课!</h2><br/><br/>\n\<form action=\"commit\" method=\"post\">\n\尊姓大名: <input type=\"text\" name=\"name\" />\n\<br/>芳龄几何: <input type=\"password\" name=\"age\" />\n\<br/><br/><br/><input type=\"submit\" value=\"提交\" />\n\<input type=\"reset\" value=\"重置\" />\n\</form>\n\</div>\n\</body>\n\</html>";char sendbuffer[4096];char content_len[64];strcpy(sendbuffer, main_header);snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", (int)strlen(welcome_content));strcat(sendbuffer, content_len);strcat(sendbuffer, welcome_content);printf("send reply to client \n%s", sendbuffer);write(stat->fd, sendbuffer, strlen(sendbuffer));stat->_ev.events = EPOLLIN;//stat->_ev.data.ptr = stat;epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);}void commit_respone_handler(ConnectStat * stat) {const char * commit_content = "\<html lang=\"zh-CN\">\n\<head>\n\<meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\<title>This is a test</title>\n\</head>\n\<body>\n\<div align=center height=\"500px\" >\n\<br/><br/><br/>\n\<h2>欢迎学霸同学 %s ,你的芳龄是 %s!</h2><br/><br/>\n\</div>\n\</body>\n\</html>\n";char sendbuffer[4096];char content[4096];char content_len[64];int len = 0;len = snprintf(content, 4096, commit_content, stat->name, stat->age);strcpy(sendbuffer, main_header);snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", len);strcat(sendbuffer, content_len);strcat(sendbuffer, content);printf("send reply to client \n%s", sendbuffer);write(stat->fd, sendbuffer, strlen(sendbuffer));stat->_ev.events = EPOLLIN;//stat->_ev.data.ptr = stat;epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);
}
示例2:使用 epoll 封装的库的使用
globals.h
#ifndef GLOBALS_H
#define GLOBALS_H#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/stat.h>#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <errno.h>
#include <fcntl.h>
#include <assert.h>#define FD_DESC_SZ 64#define COMM_OK (0)
#define COMM_ERROR (-1)
#define COMM_NOMESSAGE (-3)
#define COMM_TIMEOUT (-4)
#define COMM_SHUTDOWN (-5)
#define COMM_INPROGRESS (-6)
#define COMM_ERR_CONNECT (-7)
#define COMM_ERR_DNS (-8)
#define COMM_ERR_CLOSING (-9)//调试相关
#define DEBUG_LEVEL 0
#define DEBUG_ONLY 8
#define debug(m, n) if( m >= DEBUG_LEVEL && n <= DEBUG_ONLY ) printf#define safe_free(x) if (x) { free(x); x = NULL; }typedef void PF(int, void *);struct _fde {unsigned int type;__u_short local_port;__u_short remote_port;struct in_addr local_addr;char ipaddr[16]; /* dotted decimal address of peer */PF *read_handler;void *read_data;PF *write_handler;void *write_data;PF *timeout_handler;time_t timeout;void *timeout_data;
};typedef struct _fde fde;extern fde *fd_table;
extern int Biggest_FD; /*系统时间相关,设置成全局变量,供所有模块使用*/
extern struct timeval current_time;
extern double current_dtime;
extern time_t sys_curtime;/* epoll 相关接口实现 */
extern void do_epoll_init(int max_fd);
extern void do_epoll_shutdown();
extern void epollSetEvents(int fd, int need_read, int need_write);
extern int do_epoll_select(int msec);/*框架外围接口*/
void comm_init(int max_fd);
extern int comm_select(int msec);
extern inline void comm_call_handlers(int fd, int read_event, int write_event);
void commUpdateReadHandler(int fd, PF * handler, void *data);
void commUpdateWriteHandler(int fd, PF * handler, void *data);extern const char *xstrerror(void);
int ignoreErrno(int ierrno);#endif /* GLOBALS_H */
comm_epoll.c
#include "globals.h"
#include <sys/epoll.h>#define MAX_EVENTS 256 /* 一次处理的最大的事件 *//* epoll structs */
static int kdpfd;
static struct epoll_event events[MAX_EVENTS];
static int epoll_fds = 0;
static unsigned *epoll_state; /* 保存每个epoll 的事件状态 */static const char *
epolltype_atoi(int x)
{switch (x) {case EPOLL_CTL_ADD:return "EPOLL_CTL_ADD";case EPOLL_CTL_DEL:return "EPOLL_CTL_DEL";case EPOLL_CTL_MOD:return "EPOLL_CTL_MOD";default:return "UNKNOWN_EPOLLCTL_OP";}
}void do_epoll_init(int max_fd)
{kdpfd = epoll_create(max_fd);if (kdpfd < 0)fprintf(stderr,"do_epoll_init: epoll_create(): %s\n", xstrerror());//fd_open(kdpfd, FD_UNKNOWN, "epoll ctl");//commSetCloseOnExec(kdpfd);epoll_state = calloc(max_fd, sizeof(*epoll_state));
}void do_epoll_shutdown()
{close(kdpfd);kdpfd = -1;safe_free(epoll_state);
}void epollSetEvents(int fd, int need_read, int need_write)
{int epoll_ctl_type = 0;struct epoll_event ev;assert(fd >= 0);debug(5, 8) ("commSetEvents(fd=%d)\n", fd);memset(&ev, 0, sizeof(ev));ev.events = 0;ev.data.fd = fd;if (need_read)ev.events |= EPOLLIN;if (need_write)ev.events |= EPOLLOUT;if (ev.events)ev.events |= EPOLLHUP | EPOLLERR;if (ev.events != epoll_state[fd]) {/* If the struct is already in epoll MOD or DEL, else ADD */if (!ev.events) {epoll_ctl_type = EPOLL_CTL_DEL;} else if (epoll_state[fd]) {epoll_ctl_type = EPOLL_CTL_MOD;} else {epoll_ctl_type = EPOLL_CTL_ADD;}/* Update the state */epoll_state[fd] = ev.events;if (epoll_ctl(kdpfd, epoll_ctl_type, fd, &ev) < 0) {debug(5, 1) ("commSetEvents: epoll_ctl(%s): failed on fd=%d: %s\n",epolltype_atoi(epoll_ctl_type), fd, xstrerror());}switch (epoll_ctl_type) {case EPOLL_CTL_ADD:epoll_fds++;break;case EPOLL_CTL_DEL:epoll_fds--;break;default:break;}}
}int do_epoll_select(int msec)
{int i;int num;int fd;struct epoll_event *cevents;/*if (epoll_fds == 0) {assert(shutting_down);return COMM_SHUTDOWN;}statCounter.syscalls.polls++;*/num = epoll_wait(kdpfd, events, MAX_EVENTS, msec);if (num < 0) {getCurrentTime();if (ignoreErrno(errno))return COMM_OK;debug(5, 1) ("comm_select: epoll failure: %s\n", xstrerror());return COMM_ERROR;}//statHistCount(&statCounter.select_fds_hist, num);if (num == 0)return COMM_TIMEOUT;for (i = 0, cevents = events; i < num; i++, cevents++) {fd = cevents->data.fd;comm_call_handlers(fd, cevents->events & ~EPOLLOUT, cevents->events & ~EPOLLIN);}return COMM_OK;
}
comm.c
#include "globals.h"double current_dtime;
time_t sys_curtime;
struct timeval current_time;int Biggest_FD = 1024; /*默认的最大文件描述符数量 1024*/
static int MAX_POLL_TIME = 1000; /* see also comm_quick_poll_required() */
fde *fd_table = NULL; time_t getCurrentTime(void)
{gettimeofday(¤t_time, NULL);current_dtime = (double) current_time.tv_sec +(double) current_time.tv_usec / 1000000.0;return sys_curtime = current_time.tv_sec;
}void
comm_close(int fd)
{assert(fd>0);fde *F = &fd_table[fd];if(F) memset((void *)F,'\0',sizeof(fde));epollSetEvents(fd, 0, 0);close(fd);
}void
comm_init(int max_fd)
{if(max_fd > 0 ) Biggest_FD = max_fd;fd_table = calloc(Biggest_FD, sizeof(fde));do_epoll_init(Biggest_FD);
}void
comm_select_shutdown(void)
{do_epoll_shutdown();if(fd_table) free(fd_table);
}//static int comm_select_handled;inline void
comm_call_handlers(int fd, int read_event, int write_event)
{fde *F = &fd_table[fd];debug(5, 8) ("comm_call_handlers(): got fd=%d read_event=%x write_event=%x F->read_handler=%p F->write_handler=%p\n",fd, read_event, write_event, F->read_handler, F->write_handler);if (F->read_handler && read_event) {PF *hdl = F->read_handler;void *hdl_data = F->read_data;/* If the descriptor is meant to be deferred, don't handle */debug(5, 8) ("comm_call_handlers(): Calling read handler on fd=%d\n", fd);//commUpdateReadHandler(fd, NULL, NULL);hdl(fd, hdl_data);}if (F->write_handler && write_event) {PF *hdl = F->write_handler;void *hdl_data = F->write_data;//commUpdateWriteHandler(fd, NULL, NULL);hdl(fd, hdl_data);}
}int
commSetTimeout(int fd, int timeout, PF * handler, void *data)
{fde *F;debug(5, 3) ("commSetTimeout: FD %d timeout %d\n", fd, timeout);assert(fd >= 0);assert(fd < Biggest_FD);F = &fd_table[fd];if (timeout < 0) {F->timeout_handler = NULL;F->timeout_data = NULL;return F->timeout = 0;}assert(handler || F->timeout_handler);if (handler || data) {F->timeout_handler = handler;F->timeout_data = data;}return F->timeout = sys_curtime + (time_t) timeout;
}void
commUpdateReadHandler(int fd, PF * handler, void *data)
{fd_table[fd].read_handler = handler;fd_table[fd].read_data = data;epollSetEvents(fd,1,0);
}void
commUpdateWriteHandler(int fd, PF * handler, void *data)
{fd_table[fd].write_handler = handler;fd_table[fd].write_data = data;epollSetEvents(fd,0,1);
}static void
checkTimeouts(void)
{int fd;fde *F = NULL;PF *callback;for (fd = 0; fd <= Biggest_FD; fd++) {F = &fd_table[fd];/*if (!F->flags.open)continue;*/if (F->timeout == 0)continue;if (F->timeout > sys_curtime)continue;debug(5, 5) ("checkTimeouts: FD %d Expired\n", fd);if (F->timeout_handler) {debug(5, 5) ("checkTimeouts: FD %d: Call timeout handler\n", fd);callback = F->timeout_handler;F->timeout_handler = NULL;callback(fd, F->timeout_data);} else {debug(5, 5) ("checkTimeouts: FD %d: Forcing comm_close()\n", fd);comm_close(fd);}}
}int
comm_select(int msec)
{static double last_timeout = 0.0;int rc;double start = current_dtime;debug(5, 3) ("comm_select: timeout %d\n", msec);if (msec > MAX_POLL_TIME)msec = MAX_POLL_TIME;//statCounter.select_loops++;/* Check timeouts once per second */if (last_timeout + 0.999 < current_dtime) {last_timeout = current_dtime;checkTimeouts();} else {int max_timeout = (last_timeout + 1.0 - current_dtime) * 1000;if (max_timeout < msec)msec = max_timeout;}//comm_select_handled = 0;rc = do_epoll_select(msec);getCurrentTime();//statCounter.select_time += (current_dtime - start);if (rc == COMM_TIMEOUT)debug(5, 8) ("comm_select: time out\n");return rc;
}const char *
xstrerror(void)
{static char xstrerror_buf[BUFSIZ];const char *errmsg;errmsg = strerror(errno);if (!errmsg || !*errmsg)errmsg = "Unknown error";snprintf(xstrerror_buf, BUFSIZ, "(%d) %s", errno, errmsg);return xstrerror_buf;
}int
ignoreErrno(int ierrno)
{switch (ierrno) {case EINPROGRESS:case EWOULDBLOCK:
#if EAGAIN != EWOULDBLOCKcase EAGAIN:
#endifcase EALREADY:case EINTR:
#ifdef ERESTARTcase ERESTART:
#endifreturn 1;default:return 0;}/* NOTREACHED */
}
main.c
#include "globals.h"
typedef struct _ConnectStat ConnectStat;#define BUFLEN 1024struct _ConnectStat {int fd;char send_buf[BUFLEN];PF *handler;//不同页面的处理函数
};//echo 服务实现相关代码
ConnectStat * stat_init(int fd);
void accept_connection(int fd, void *data);
void do_echo_handler(int fd, void *data);
void do_echo_response(int fd,void *data);
void do_echo_timeout(int fd, void *data);void usage(const char* argv)
{printf("%s:[ip][port]\n", argv);
}void set_nonblock(int fd)
{int fl = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}int startup(char* _ip, int _port) //创建一个套接字,绑定,检测服务器
{//sock//1.创建套接字int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){perror("sock");exit(2);}int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));//2.填充本地 sockaddr_in 结构体(设置本地的IP地址和端口)struct sockaddr_in local;local.sin_port = htons(_port);local.sin_family = AF_INET;local.sin_addr.s_addr = inet_addr(_ip);//3.bind()绑定if (bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0){perror("bind");exit(3);}//4.listen()监听 检测服务器if (listen(sock, 5) < 0){perror("listen");exit(4);}return sock; //这样的套接字返回
}ConnectStat * stat_init(int fd) {ConnectStat * temp = NULL;temp = (ConnectStat *)malloc(sizeof(ConnectStat));if (!temp) {fprintf(stderr, "malloc failed. reason: %m\n");return NULL;}memset(temp, '\0', sizeof(ConnectStat));temp->fd = fd;//temp->status = 0;
}void do_welcome_handler(int fd, void *data) {const char * WELCOME= "Welcome.\n";int wlen = strlen(WELCOME);int n ;ConnectStat * stat = (ConnectStat *)(data);if( (n = write(fd, "Welcome.\n",wlen)) != wlen ){if(n<=0){fprintf(stderr, "write failed[len:%d], reason: %s\n",n,strerror(errno));}else fprintf(stderr, "send %d bytes only ,need to send %d bytes.\n",n,wlen);}else {commUpdateReadHandler(fd, do_echo_handler,(void *)stat);commSetTimeout(fd, 10, do_echo_timeout, (void *)stat);}
}void do_echo_handler(int fd, void *data) {ConnectStat * stat = (ConnectStat *)(data);char * p = NULL;assert(stat!=NULL);p = stat->send_buf;*p++ = '-';*p++ = '>';ssize_t _s = read(fd, p, BUFLEN-(p-stat->send_buf)-1); //2字节"->" +字符结束符.if (_s > 0){*(p+_s) = '\0';printf("receive from client: %s\n", p);//_s--;//while( _s>=0 && ( stat->send_buf[_s]=='\r' || stat->send_buf[_s]=='\n' ) ) stat->send_buf[_s]='\0';if(!strncasecmp(p, "quit", 4)){//退出.comm_close(fd);free(stat);return ;}//write(fd,commUpdateWriteHandler(fd, do_echo_response, (void *)stat);commSetTimeout(fd, 10, do_echo_timeout, (void *)stat);}else if (_s == 0) //client:close{fprintf(stderr,"Remote connection[fd: %d] has been closed\n", fd);comm_close(fd);free(stat);}else //err occurred.{fprintf(stderr,"read faield[fd: %d], reason:%s [%d]\n",fd , strerror(errno), _s);}
}void do_echo_response(int fd, void *data) {ConnectStat * stat = (ConnectStat *)(data);int len = strlen(stat->send_buf);int _s = write(fd, stat->send_buf, len);if(_s>0){commSetTimeout(fd, 10, do_echo_timeout, (void *)stat);commUpdateReadHandler(fd, do_echo_handler, (void *)stat);}else if(_s==0){fprintf(stderr,"Remote connection[fd: %d] has been closed\n", fd);comm_close(fd);free(stat);}else {fprintf(stderr,"read faield[fd: %d], reason:%s [%d]\n",fd ,_s ,strerror(errno));}
}//read()
//注册写事件//写事件就绪
//write()void accept_connection(int fd, void *data){struct sockaddr_in peer;socklen_t len = sizeof(peer);ConnectStat * stat = (ConnectStat *)data;int new_fd = accept(fd, (struct sockaddr*)&peer, &len);if (new_fd > 0){ConnectStat *stat = stat_init(new_fd);set_nonblock(new_fd);printf("new client: %s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));commUpdateWriteHandler(new_fd, do_welcome_handler, (void *)stat);commSetTimeout(new_fd, 30,do_echo_timeout, (void *)stat);}
}void do_echo_timeout(int fd, void *data){fprintf(stdout,"---------timeout[fd:%d]----------\n",fd);comm_close(fd);free(data);
}int main(int argc,char **argv){if (argc != 3) //检测参数个数是否正确{usage(argv[0]);exit(1);}int listen_sock = startup(argv[1], atoi(argv[2])); //创建一个绑定了本地 ip 和端口号的套接字描述符//初始化异步事件处理框架epollcomm_init(102400);ConnectStat * stat = stat_init(listen_sock);commUpdateReadHandler(listen_sock,accept_connection,(void *)stat);do{//不断循环处理事件comm_select(1000);}while(1==1);comm_select_shutdown();
}
参考:
Select、Poll、Epoll详解
小林coding-I/O 多路复用
epoll高度封装reactor,几乎所有可见服务器的底层框架