🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
目录
- 👉多路转接之select👈
- 初识select
- select函数原型
- socket就绪条件
- select的基本工作流程
- select服务器
- select服务器的优缺点
- select服务器的应用场景
- 👉多路转接之poll👈
- poll函数原型
- poll服务器
- poll服务器的优缺点
- 👉多路转接之epoll👈
- epoll初识
- epoll的相关系统调用
- epoll的工作原理
- epoll服务器
- epoll的优点
- epoll的工作方式
- 对比LT和ET
- epoll的使用场景
- 👉总结👈
👉多路转接之select👈
初识select
IO 操作等于 “等” + “数据拷贝”,而 select 函数就是帮助用户进行一次等待多个文件描述符。当文件描述符就绪了,select 函数就会通知用户就绪的文件有哪些,然后用户调用 read / recvfrom / recv 进行 “数据拷贝”。
select函数原型
select 函数是一个 I / O 多路复用函数,它可以同时监视多个文件描述符的可读、可写或异常状态,从而允许程序等待任何一个文件描述符准备好进行读写操作。
select 函数的基本原理是,将需要监视的文件描述符集合放在一个 fd_set 数据结构中,然后调用 select 函数等待这些文件描述符的状态变化。select 函数会一直阻塞直到文件描述符集合中有一个或多个文件描述符准备好进行读写操作或者发生错误。
select函数的函数原型如下:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
select 函数说明:
- nfds:监视的最大文件描述符 + 1,一般是文件描述符集中最大文件描述符 + 1。
- 如果 select 函数的返回值大于 0 表示就绪文件描述符的数量,可以通过对文件描述符集合进行轮询来确定是哪些文件描述符就绪。如果返回 -1 则表示出现错误,错误码 errno 被设置。如果返回 0 则表示超时。
- 错误码的可能取值:EBADF 文件描述符为无效的或该文件已关闭;EINTR 此调用被信号所中断;EINVAL 参数 nfds 为负值或者 timeout 为无效值;ENOMEM 核心内存不足。
- readfds、writefds、exceptfds 和 timeout 都是输入输出型参数。
- select 函数可以等待多个文件描述符,等待策略通过 timeout 来进行设置。如果 timeout 为 nullptr,select 将会进行阻塞式等待;如果 timeout 为 {0, 0},select 将会进行非阻塞式等待;如果 timeout 为 {5, 0},前 5 秒内,select 会进行阻塞式等待,5 秒后,进行非阻塞式等待。如果在 2 秒的时候有文件描述符就绪了,那么 timeout 就会体现其输出型参数的功能,timeout 为被设置为 {3, 0},表示距离下一次 timeout 还剩余 3 秒的时间。
- 非阻塞式等待就是没有文件描述符就绪时,立马返回。
- readfds:读文件描述符集合;writefds:写文件描述符集合;exceptfds:异常文件描述符集合。这三个参数作为输入参数时,所表示的意思是内核需要帮助用户关系这些描述符的读事件、写事件或异常时间;作为输出型参数时,所表示的意思是内核告诉用户,哪些文件描述符的哪类事件已经就绪了。
- 如果用户不关心文件描述符的异常事件,可以将 exceptfds 为 nullptr,其余事件同理。
- fd_set 是文件描述符集合,其本质是一个位图结构,比特位从低位到高位就可以表示不同文件描述符。不能直接通过按位与等操作进行位图结构的修改,需要通过系统提供的接口来修改位图结构。
认识 struct time 类型
struct timeval
{long tv_sec; // 秒long tv_usec; // 微妙
};
#include <ctime>
#include <cassert>
#include <iostream>
#include <sys/time.h>
#include <unistd.h>using namespace std;int main()
{while (true){cout << "time: " << (unsigned long)time(nullptr) << endl;struct timeval tv = {0, 0};int n = gettimeofday(&tv, nullptr);cout << "gettimeofday: " << tv.tv_sec << "." << tv.tv_usec << endl;sleep(1);}return 0;
}
fd_set 类型的定义
其实这个结构就是一个整数数组,更严格的说,是一个位图,使用位图中对应的位来表示要监视的文件描述符。
#include <sys/select.h>
#include <iostream>int main()
{std::cout << sizeof(fd_set) * 8 << std::endl;return 0;
}
说明:可监控的文件描述符个数取决与 sizeof(fd_set) 的值,其中 sizeof 求的是字节数,所以要乘上 8 表示比特位的数目。因此,我的服务器支持的最大文件描述符数是 1024,不同的服务器可能会有点不一样。
修改 fd_set 位图结构的函数
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
- FD_ZERO:清空位图结构,将所有文件描述符对应的比特位设置成 0。
- FD_SET:将指定的文件描述符对应的比特位设置成 1。
- FD_CLR:将指定的文件描述符对应的比特位设置成 0。
- FD_ISSET:判断指定的文件描述符对应比特位的数值是否为 1,如果为 1,则说明该文件描述符在位图结构中。
理解 readfds,writefds 和 exceptfds 参数同理
- readfds 作为输入型参数时,所表示的意思是用户要内核关心哪些文件描述符的读事件。比特位的位置表示文件描述符的大小,比特位的内容表示是否要求内核关心该文件描述符的读事件。假设 readfds 为 0000 1001,最低位为第 0 位,那么 0000 1010 就表示内核需要帮用户关心 0 号和 3 号文件描述符的读事件。
- readfs 作为输出型参数时,所表示的意思是用户关心的文件描述符的读事件是否就绪。比特位的位置表示文件描述符的大小,比特位的内容表示该文件描述符的读事件是否就绪。假设 readfds 为 0000 1000,最低位为第 0 位,那么 0000 1000 就表示 3 号文件描述符的读事件就绪,0 号文件描述符的读事件没有就绪,后续调用 read / recv / recvfrom 函数不会被阻塞,可以直接进行数据读取(数据拷贝)。
- 用户和内核会同时修改位图结构 readfs,因此 readfds 用一次后一定需要进行重新设定。
socket就绪条件
读就绪
- socket 内核中。接收缓冲区中的字节数大于等于低水位标记SO_RCVLOWAT,此时可以无阻塞地读该文件描述符,并且返回值大于 0。
- socket TCP 通信中,对端关闭连接,此时对该 socket 读,则返回 0。
- 监听的 socke t上有新的连接请求。
- socket 上有未处理的错误。
写就绪
- socket 内核中,发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小)大于等于低水位标记 SO_SNDLOWAT,此时可以无阻塞的写,并且返回值大于 0。
- socket 的写操作被关闭(close 或者 shutdown),对一个写操作被关闭的 socket 进行写操作, 会触发 SIGPIPE 信号。
- socket 使用非阻塞 connect 连接成功或失败之后。
- socket上有未读取的错误。
异常就绪
- socket 上收到带外数据,关于带外数据,和 TCP 紧急模式相关。
- 带外数据(out-of-band data)是指优先级高于普通数据的数据。带外数据可以在普通数据传输的同时进行传输,而不会影响普通数据的传输。
select的基本工作流程
如果想要通过 select 来编写一个网络服务器,那么网络服务器的功能是将客户端发送过的数据进行打印接口,那么这个网络服务器的编写流程应该是这样的:
- 先初始化服务器,完成套接字的创建、绑定和监听。
- 定义一个文件描述符(套接字)数组 _fdArray,该数组是用来保存 select 需要关心的文件描述符,首先将监听套接字填到 _fdArray 数组中。
- 然后服务器就开始循环调用 select 函数,监测是否有读事件就需要。如果有读事件就绪,则需要对应的操作。
- 在调用 select 函数之前,都需要定义一个位图结构 rfds。然后遍历 _fdArray 数组找出有效的文件描述符添加进 rfds 中并找出有效文件描述符的最大值,然后再让 select 函数帮我们关心这些文件描述符的读事件是否就绪。
- 当有读事件就需要,select 就会通过输出型参数 rfds 来告诉我们哪些文件描述符的读事件就绪了。因为 _fdArray 数组中保存着 select 需要关心的文件描述符,所以我们可以遍历 _fdArray 数组找出是哪些有效文件描述符的读事件就绪了。
- 如果是用于监听的文件描述符就绪了,此时就需要调用 accept 函数从全连接队列中获取已经建立好的连接,并将该连接所对应的文件描述符添加到 _fdArray 数组中,以便后续 select 帮助我们关心该文件描述符的读事件。
- 因为 select 能够关心的文件描述符个数是有上限的。所以超过该上限时,新建立的连接就需要关闭掉。
- 如果是用于通信的文件描述符就绪了,此时就需要调用 read 函数读取客户端发送过来的数据并进行打印。该读事件就绪还有一种可能就是客户端将连接关闭了,那么服务端也要将该连接关闭掉,并将该文件描述符从 _fdArray 数组中移除,表明不需要 select 再关心该文件描述符的读事件了。
select服务器
日志和套接字组件
// Log.hpp
#pragma once#include <cstdio>
#include <cstdarg>
#include <string>
#include <iostream>
#include <ctime>// 日志等级
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4#define LOGFILE "./ThreadPool.log"const char* levelMap[] =
{"DEBUG","NORMAL","WARNING","ERROR","FATAL"
};void logMessage(int level, const char* format, ...)
{// 只有定义了DEBUG_SHOW,才会打印debug信息// 利用命令行来定义即可,如-D DEBUG_SHOW
#ifndef DEBUG_SHOWif(level == DEBUG) return;
#endifchar stdBuffer[1024]; // 标准部分time_t timestamp = time(nullptr);// struct tm *localtime = localtime(×tamp);snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", levelMap[level], timestamp);char logBuffer[1024]; // 自定义部分va_list args; // va_list就是char*的别名va_start(args, format); // va_start是宏函数,让args指向参数列表的第一个位置// vprintf(format, args); // 以format形式向显示器上打印参数列表vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args); // va_end将args弄成nullptr// FILE* fp = fopen(LOGFILE, "a");printf("%s%s\n", stdBuffer, logBuffer);// fprintf(fp, "%s%s\n", stdBuffer, logBuffer); // 向文件中写入日志信息// fclose(fp);
}
// Sock.hpp
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
#include <fcntl.h>class Sock
{
private:// listen的第二个参数是底层全连接队列的长度,其数值为listen的第二个参数+1const static int gbackLog = 10;
public:Sock(){}// 创建套接字static int Socket(){int listenSock = socket(AF_INET, SOCK_STREAM, 0);if(listen < 0) exit(2);int opt = 1;setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); // 复用IP地址和端口号return listenSock;}// 绑定IP地址和端口号static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0"){struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(port);inet_pton(AF_INET, ip.c_str(), &local.sin_addr);if(bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0) exit(3);}static void Listen(int sock){if(listen(sock, gbackLog) < 0)exit(4);}// 一般经验// const std::string &: 输入型参数// std::string *: 输出型参数 // std::string &: 输入输出型参数// 获取连接static int Accept(int listenSock, std::string* ip, uint16_t* port){struct sockaddr_in src;socklen_t len = sizeof(src);int serviceSock = accept(listenSock, (struct sockaddr*)&src, &len);if(serviceSock < 0) return -1;if(port) *port = ntohs(src.sin_port);if(ip) *ip = inet_ntoa(src.sin_addr);return serviceSock;}// 发起连接static bool Connect(int sock, const std::string& serverIp, uint16_t& serverPort){struct sockaddr_in server;memset(&server, 0, sizeof server);server.sin_family = AF_INET;server.sin_port = htons(serverPort);server.sin_addr.s_addr = inet_addr(serverIp.c_str());if(connect(sock, (struct sockaddr*)&server, sizeof server) == 0) return true;else return false;}// 将sock设置为非阻塞static bool SetNonBlock(int sock){int fl = fcntl(sock, F_GETFL);if(fl < 0) return false;fcntl(sock, F_SETFL, fl | O_NONBLOCK);return true;}~Sock(){}
};
select 服务器
// SelectServer.hpp
#ifndef __SELECT_SERVER_H__
#define __SELECT_SERVER_H__#include <sys/select.h>
#include <iostream>
#include "Log.hpp"
#include "Sock.hpp"using namespace std;#define BITS 8
#define NUM (sizeof(fd_set) * BITS) // 监测文件描述符的最多个数
#define FD_NONE -1 // FD_NONE表示该文件描述符不需要关心// select只关心读取时间,不关心写入事件和异常事件
// select一开始只关心listensock,随着连接的增多,select关心的文件描述符会越来越多
// 如果看待listensock?listensock是用来获取新连接的,我们依旧把它看作IO事件,input事件
// 如果新连接没有到来,那么直接调用accept会被阻塞住,所以需要让select关心listensock的读事件
class SelectServer
{
public:SelectServer(const uint16_t& port = 8080): _port(port){_listenSock = Sock::Socket();Sock::Bind(_listenSock, _port); // 绑定IP地址和端口号Sock::Listen(_listenSock); // 将_listenSock设置为监听套接字logMessage(DEBUG, "Create ListenSock Success!");// _fdArray数组中存储的都是要求select帮我们关心的文件描述符// FD_NONE为无效的文件描述符for(int i = 0; i < NUM; ++i) _fdArray[i] = FD_NONE;// 约定: _fdArray[0] = _listenSock_fdArray[0] = _listenSock;}void Start(){while(true){Debug();fd_set rfds;FD_ZERO(&rfds);int maxfd = _listenSock; // select需要关心的最大文件描述符// 设置select需要关心的文件描述符for(int i = 0; i < NUM; ++i){// 无效的文件描述符直接跳过即可if(_fdArray[i] == FD_NONE) continue;else{FD_SET(_fdArray[i], &rfds); // 让select关心_fdArray[i]的读事件if(_fdArray[i] > maxfd) maxfd = _fdArray[i]; // 更新最大文件描述符}}// 前五秒阻塞等待,如果文件描述符的读事件还没有就绪,就从select中返回// struct timeval timeout = {5, 0};int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);// int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);switch (n){case -1:logMessage(WARNING, "Select Error %d %s", errno, strerror(errno));break;case 0:// 文件描述符的读事件未就绪,此时可以去处理其他事情logMessage(DEBUG, "Read Event Not Ready"); break;default:// 读事件就绪logMessage(DEBUG, "Read Event Ready");// 读事件就绪时,就需要处理该事件,不然的话// 内核就会一直通知你读事件就绪了HandlerEvents(rfds);break;}}}~SelectServer(){if(_listenSock >= 0) close(_listenSock);}private:void HandlerEvents(const fd_set& rfds){for(int i = 0; i < NUM; ++i){// 跳过不合法的文件描述符if(_fdArray[i] == FD_NONE) continue;// 合法的文件描述符不一定就绪,在位图结构中的文件描述符才是就绪的if(FD_ISSET(_fdArray[i], &rfds)){// 1、读事件就绪,新连接到来,需要调用Accepter// 2、读事件就绪,数据到来,需要调用Recverif(_fdArray[i] == _listenSock)Accepter(); elseRecver(i);}}}void Accepter(){string clientIp;uint16_t clientPort = 0;// 此时获取新连接不会被阻塞住// 获取到新连接后不应该立即对sock进行读取操作,因为有可能会被阻塞住int sock = Sock::Accept(_listenSock, &clientIp, &clientPort);if(sock < 0){logMessage(WARNING, "Accepter Error");return;}logMessage(NORMAL, "Accept A New Link Success: [%s:%d] fd:%d", clientIp.c_str(), clientPort, sock);// 找到一个合法的位置,将获取的新连接放到_fdArray数组中int pos = 1;for(; pos < NUM; ++pos){if(_fdArray[pos] == FD_NONE) break;}// NUM就是select能够检测文件描述符数目的上限,如果pos// 等于NUM,则说明现在select检测的文件描述符已经有NUM个// 了,无法对新获取的连接进行监测了,所以只能将该连接关闭掉if(pos == NUM){logMessage(WARNING, "SelectServer Already Full, Close: %d", sock);close(sock);}else_fdArray[pos] = sock;}void Recver(int pos){// 此时调用read/recv等函数就不会被阻塞住了logMessage(NORMAL, "Data Arrives on %d File Descriptor", _fdArray[pos]);char buffer[1024];// 这里是有bug的,因为使用TCP协议进行通信,没有进行协议定制// 无法保证此次读取的数据就是一个完整的报文,而不是多个报文或半个报文int n = read(_fdArray[pos], buffer, sizeof(buffer) - 1);if(n > 0){buffer[n] = 0;logMessage(NORMAL, "Client[%d]# %s", _fdArray[pos], buffer);}else if(n == 0){// 客户端关闭连接,服务端也要关闭连接logMessage(NORMAL, "Client[%d] Quit, Me Too", _fdArray[pos]);// 先关闭文件描述符,然后不再让select关心该文件描述符close(_fdArray[pos]);_fdArray[pos] = FD_NONE;}else{logMessage(WARNING, "%d File Descriptor Read Error %d:%s", _fdArray[pos], errno, strerror(errno));// 先关闭文件描述符,然后不再让select关心该文件描述符close(_fdArray[pos]);_fdArray[pos] = FD_NONE;}}// 打印select所关心的文件描述符void Debug(){cout << "_fd_array[]: ";for(int i = 0; i < NUM; i++){if(_fdArray[i] == FD_NONE) continue;cout << _fdArray[i] << " ";}cout << endl;}private:uint16_t _port;int _listenSock;// _fdArray数组中保存的是select需要关心的文件描述符int _fdArray[NUM];
};#endif
select 服务器说明:
- 服务端的 IP 地址和端口号没有让我们进行指定,IP 地址采用缺省值 0.0.0.0,端口号采用缺省值 8080。
- select 的第一个参数是文件描述符的最大值 + 1,随着获取的文件描述符越来越多,那么文件描述符的最大值也会变化,因此需要遍历 _fdArray 数组找出最大的文件描述符。
- readfs、writefds 和 exceptfds 都是输入输出型参数,输入和输出时可能会不一样,因此我们要将需要关心的文件描述符添加到 readfds、writefds 和 exceptfds 中。
- timeout 也是输入输出型参数,如果有需要的话,也需要进行重新设置。
- 当有读事件就绪没有处理该事件或者 timeout 为零并没有重新设置时,服务器将会一直打印
Read Event Ready
或者Read Event Not Ready
,因此需要对就绪的读事件进行处理和重新设置 timeout 参数。 - 就读的读事件有两种:用于监听的文件描述符的读事件就绪和用于通信的文件描述符的读事件就绪。
- 当用于监听的读事件就绪时,需要通过 accept 函数将建立好的连接获取上来,但并不能直接调用 read 函数来读取数据,因为底层可能并没有数据,此时读取就会被挂起阻塞住。因此,获取到的新连接应该添加到 _fdArray 数据中,然后下一次循环就可以让 select 来帮助我们对该连接的读事件进行监测了。如果 _fdArray 已经无法存放新的连接了,则说明 select 关心更多的连接了,所以只能将该连接关闭掉。
- 当用于通信的读事件就绪时,需要调用 read 函数进行数据的读取。如果 read 函数的返回值大于 0,则说明获取到了数据,可以直接进行打印;如果返回值等于 0,则说明客户端已经将连接关闭了,此时服务端也应该将连接关闭并将该连接从 _fdArray 数组中移除;如果返回值小于 0,则说明 read 函数出错了,也需要关闭连接并将该连接从 _fdArray 数组中移除。
- 小小的优化:保存文件描述符的数组 _fdArray 可以使用 vector 来代替,需要 select 关心的文件描述符尾插到 vector 中,不需要 select 关心的文件描述符直接从 vector 中 erase 掉即可。不过还需要维护一下 select 关心文件描述符的数目,避免超出上限。
select 服务器测试
由于没有编写客户端,所以我们通过 telnet 工具来充当客户端,并向服务端发送消息,此时服务器就可以将客户端发送过来的数据进行打印了。
尽管 select 服务器是单进程的服务器,但是它也可以并发地为多个客户端进行服务。其根本原因就是 select 帮助我们检测哪些文件描述符的读事件就绪了,当 select 函数返回时,就可以直接处理这些读事件,并不会被阻塞注。
当前 select 服务器存在的一些问题
当前的 select 服务器实际上还存在着一些问题:
- 服务器没有对客户端发送过来的数据进行相应。如果 select 服务器要想服务端进行数据发送,也不能直接调用 write / send 函数。因为 write / send 函数也包括 “等” 和 “数据拷贝” 两个步骤,直接调用 write / send 函数会导致进程被挂起。因此,我们需要将 “等” 的过程交给 select,那么就要增加一个数组来保存需要关心写事件的文件描述符。在调用 select 函数之前,需要将写事件的文件描述符添加到 writefds 中,让 select 帮我们关心这些文件描述符的写事件。
- 未进行协议的定制。因为没有进行协议的定制,所以 Recver 函数中读取上来的数据并不一定就是一个完整的报文,可能是多个报文,也有可能是半个报文。这个就是粘包问题,需要通过定制协议来解决。
- 以上问题都将会 epoll 中进行解决,编写出更加全面的代码。
select服务器的优缺点
select 服务器的优点
- select 可以同时等待多个文件描述符,并且负责等待这些文件描述符,具体的 IO 操作则有 read、recv 和 write 等接口来完成,调用这些接口时就不会被阻塞注了。
- select 可以让多个文件描述符的等待时间重叠起来,从而提高了 IO 的效率。
- select 服务器与多进程 / 多线程服务器相比,更加节省系统资源。使用多线程服务器的一个明显缺点是需要为每个连接分配一个线程,大量连接可能导致大量线程,从而占用大量系统资源,降低服务器的性能。而 select 函数可以通过单线程同时管理多个连接,节省了系统资源。
以上的优点,所以的多路转接接口都拥有。
select 服务器的缺点
- 检测的文件描述符数量受限,通常被限制为 1024。
- 每次调用 select 函数时,需要将所有的文件描述符从用户空间复制到内核空间或从内核空间拷贝到用户空间,开销比较大。
- 每次调用 select 函数前,需要遍历整个文件描述符数组,查找需要关心的文件描述符,将其重新添加到位图结构中,使用该接口是非常不方便的,开销也是比较大的。
- 每次调用 select 函数时,内核都需要通过遍历的方式来判断文件描述符的是否就绪,效率也会比较低。
- 无法知道哪些文件描述符准备就绪,需要遍历整个文件描述符数组来判断。
select服务器的应用场景
使用多路转接接口 select / poll / epoll 编写出来的网络服务器需要在一定的应用场景下使用,如果不这样的话,效率可能会反而下降。
- 多路转接接口编写出来的网络服务器通常是适用于有多连接,且多连接只有少部分连接是活跃的,这也就意味着大多数连接进行 IO 操作时,都需要花费较多的时间进行等待事件就绪。那么使用多路转接接口将多个事件的等待时间重叠在依次,提高 IO 的效率。面对这样的场景,多路转接接口还可以节省系统资源,因为多路转接只创建了文件描述符,并不像多线程那样直接创建一个线程去等待一个不活跃的连接。
- 对应多连接中大部分连接都很活跃的场景,就没必要使用多路转接接口了。因为每个连接都很活跃,这就意味着每个连接的时间基本都是就绪的,可以直接进行 IO 操作,根本就不用多路转接接口来进行等待,因为使用多路转接接口也是需要消耗系统的时间和空间资源的。
使用多路转接接口最常见的场景就是 QQ、WeChat 等聊天软件,因为这些软件都是会存在大量的连接但是不会有很多连接是活跃的,因此可以使用多路转接接口来进行服务器的编写。如果采取多线程方案的话,会销毁系统大量的时间资源和空间资源。
👉多路转接之poll👈
poll函数原型
poll 函数也是用于等待多个文件描述符的多路转接接口,它与 select 函数的主要区别是使用结构体数组来代替文件描述符集合,可以手动地调整数组的大小,从而避免了文件描述符数量受限的问题。
poll 函数的原型如下:
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- fds 是一个 pollfd 结构体数组,该数组中包含 nfds 个元素,每个元素包含了文件描述符,监听的事件集合和返回的事件集合。
- nfds 表示 fds 数组中元素的数量。
- timeout 表示等待的超时时间,以毫秒为单位,当 timeout 为负数时,poll 函数将一直等待直到有事件发生,当 timeout 为 0 时,poll 函数将立即返回。timeout 仅仅是一个输入型参数,因此调用 poll 函数之前,不需要对他重新进行设定。
- poll 函数的返回值表示实际发生事件的文件描述符数量。如果返回值为 0,表示超时但没有文件描述符发生事件,如果返回值为负数,表示出错(如将非法文件描述符添加到 poll 中)。
pollfd 结构体
pollfd 结构体定义如下:
struct pollfd
{int fd; // 文件描述符short events; // 等待的事件类型short revents; // 实际发生的事件类型
};
- fd 是文件描述符,调用 poll 函数或 poll 函数调用完成时,这个字段都不会被修改,因为用户和内核都会使用该字段。
- events 是用户告诉内核要关心文件描述符 fd 上的事件,内核不会修改这个字段,只会读取该字段。
- revents 是内核告诉用户该文件描述符 fd 上的哪些事件发生了,该字段由内核进行修改,用户通过读取该字段来得知什么事件发生了。
- events 和 revents 都是 short 类型,它们是通过不同的比特位来表示不同的事件。
poll 与 select 相比的优点是:poll 将要监测的时间和实际发生的时间两个参数分开了,不像 select 用一个参数来同时表示要监测的时间和实践发生的事件,因此调用 poll 函数之前不需要对这两个参数重新进行设定。
events 和 revents 的取值
- POLLPRI 是优先级数据可读,比如 TCP 带外数据。TCP 带外数据和 URG 标记位和 16 位紧急指针有关。
poll 测试代码:使用poll监控标准输入
#include <poll.h>
#include <unistd.h>
#include <stdio.h>int main()
{struct pollfd poll_fd;poll_fd.fd = 0;poll_fd.events = POLLIN;for (;;){int ret = poll(&poll_fd, 1, 1000);if (ret < 0){perror("poll");continue;}if (ret == 0){printf("poll timeout\n");continue;}if (poll_fd.revents & POLLIN){char buf[1024] = {0};read(0, buf, sizeof(buf) - 1);printf("stdin:%s", buf);}}
}
注:当 timeout 为负数时,poll 函数会进行阻塞等待。
poll服务器
#ifndef __POLL_SERVER_H__
#define __POLL_SERVER_H__#include <poll.h>
#include <iostream>
#include "Log.hpp"
#include "Sock.hpp"using namespace std;#define FD_NONE -1 // FD_NONE表示该文件描述符不需要关心class PollServer
{
private:static const int defaultnfds = 100;static const int defaulttimeout = 1000;
public:PollServer(const uint16_t& port = 8080, int nfds = defaultnfds, int timeout = defaulttimeout): _port(port), _nfds(nfds), _timeout(timeout){_listenSock = Sock::Socket();Sock::Bind(_listenSock, _port); // 绑定IP地址和端口号Sock::Listen(_listenSock); // 将_listenSock设置为监听套接字logMessage(DEBUG, "Create ListenSock Success!");_fds = new struct pollfd[_nfds];for(int i = 0; i < _nfds; ++i){_fds[i].fd = FD_NONE;_fds[i].events = _fds[i].revents = 0;}// 约定:监听套接字放在下标为0的位置// 关心监听套接字的读事件_fds[0].fd = _listenSock;_fds[0].events = POLLIN;}void Start(){while(true){int n = poll(_fds, _nfds, _timeout);switch (n){case -1:logMessage(WARNING, "Select Error %d %s", errno, strerror(errno));break;case 0:// 文件描述符的读事件未就绪,此时可以去处理其他事情logMessage(DEBUG, "Read Event Not Ready, Time Out"); break;default:// 读事件就绪logMessage(DEBUG, "Read Event Ready");HandlerEvents();break;}}}~PollServer(){if(_listenSock >= 0) close(_listenSock);if(_fds != nullptr) delete[] _fds; }private:void HandlerEvents(){for(int i = 0; i < _nfds; ++i){// 跳过不合法的文件描述符if(_fds[i].fd == FD_NONE) continue;// 合法的文件描述符不一定就绪,在位图结构中的文件描述符才是就绪的if(_fds[i].revents & POLLIN){// 1、读事件就绪,新连接到来,需要调用Accepter// 2、读事件就绪,数据到来,需要调用Recverif(_fds[i].fd == _listenSock)Accepter(); elseRecver(i);}}}void Accepter(){string clientIp;uint16_t clientPort = 0;// 此时获取新连接不会被阻塞住int sock = Sock::Accept(_listenSock, &clientIp, &clientPort);if(sock < 0){logMessage(WARNING, "Accepter Error");return;}logMessage(NORMAL, "Accept A New Link Success: [%s:%d] fd:%d", clientIp.c_str(), clientPort, sock);// 找到一个合法的位置,将获取的新连接放到_fds数组中int pos = 1;for(; pos < _nfds; ++pos){if(_fds[pos].fd == FD_NONE) break;}// poll所能监测的文件描述符是没有上限的,但是数组是有上限的// 当poll监测的文件描述符超过数组的上限时,可以进行扩容,也// 可以不进行扩容,取决于具体的应用场景。如果不进行扩容,直接// 将该连接关闭即可if(pos == _nfds){logMessage(WARNING, "SelectServer Already Full, Close: %d", sock);close(sock);}else{_fds[pos].fd = sock;// 如果想要文件描述符的写时间,可以按位或上POLLOUT// 增加对写事件的关心,那么就要对写事件进行一定的处理_fds[pos].events = POLLIN; }}void Recver(int pos){// 此时调用read/recv等函数就不会被阻塞住了logMessage(NORMAL, "Data Arrives on %d File Descriptor", _fds[pos].fd);char buffer[1024];// 这里是有bug的,因为使用TCP协议进行通信,没有进行协议定制// 无法保证此次读取的数据就是一个完整的报文,而不是多个报文或半个报文int n = read(_fds[pos].fd, buffer, sizeof(buffer) - 1);if(n > 0){buffer[n] = 0;logMessage(NORMAL, "Client[%d]# %s", _fds[pos].fd, buffer);}else if(n == 0){// 客户端关闭连接,服务端也要关闭连接logMessage(NORMAL, "Client[%d] Quit, Me Too", _fds[pos].fd);// 先关闭文件描述符,然后不再让poll关心该文件描述符close(_fds[pos].fd);_fds[pos].fd = FD_NONE;_fds[pos].events = 0;}else{logMessage(WARNING, "%d File Descriptor Read Error %d:%s", _fds[pos].fd, errno, strerror(errno));// 先关闭文件描述符,然后不再让poll关心该文件描述符close(_fds[pos].fd);_fds[pos].fd = FD_NONE;_fds[pos].events = 0;}}private:uint16_t _port;int _listenSock;struct pollfd* _fds;int _nfds; // struct pollfd数组中元素的个数int _timeout;
};#endif
poll 服务器说明:
- poll 服务器可以在 select 服务器的基础上进行改写,用 _fds 替换掉 _fdArray,同时增加了 _nfds 和 _timeout,_nfds 用来表示 fds 数组中元素的个数,_timeout 用来表示超时时间。
- poll 服务器和 select 服务器的工作流程非常类似,只是不需要对 struct pollfd 数组进行重新设定了。
- poll 服务器的事件处理分为两种:用于监听的文件描述符的读事件就绪和用于通信的文件描述符的读事件就绪。
- 当用于监听的文件描述符读事件就绪时,则调用 Accepter 函数将建立好的连接获取上来,并将该连接添加到 _fds 数组中,如果 _fds 数组已经没有空间了,此时可以选择扩容也可以选择不扩容取决于具体的业务场景。
- 如果用于通信的文件描述符读事件就绪时,此时应该调用 read 函数将数据获取上来。如果 read 函数的返回值大于 0 ,则可以对读取到的数据进行打印(注:没有进行协议定制,读取到的数据可能不是一个完整的报文);如果返回值等于 0,则说明客户端已经将连接关闭,则服务端也应该将连接关闭并让 poll 不再关心该连接;如果返回值小于 0,则说明 read 函数出现错误,此时可以将连接直接关闭并让 poll 不再关心该连接。
- 注:fd 为负数,内核会认为该文件描述符是个无效的文件描述符,自动跳过。
Test.cc
#include "PollServer.hpp"
#include <memory>int main()
{unique_ptr<PollServer> svr(new PollServer);svr->Start();return 0;
}
尽管 poll 服务器也是一个单进程的服务器,但是它也能够高并发地处理多个客户端的请求。
poll服务器的优缺点
优点
- 效率高,poll 可以同时等待多个文件描述符,让多个文件描述符的等待时间重叠在一起。因此,在有大量连接且只有少部分连接是活跃的情况下,poll 服务器比多线程服务器更加节省系统资源。
- poll 的输入参数和输出参数是分离的,不需要像 select 那样进行大量的参数重置。
- poll 可以支持的文件描述符数量没有上限,因此它更适合于大规模并发的场景。
缺点
poll 中检测的文件描述符数目增多时
- poll 依旧需要不少的遍历,不管是在用户层,还是在内核层,都需要通过遍历的方式来检测文件描述符的相关事件是否就绪。同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率会线性下降。
- 每次调用 poll 都需要把大量的 pollfd 结构从用户态拷到内核层和从内核层到用户层的拷贝,这个拷贝是少不了的。
👉多路转接之epoll👈
epoll初识
按照 man 手册的说法: 是为处理大批量句柄而作了改进的 poll。它是在 2.5.44 内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44),它几乎具备了之前所说的一切优点,其效率远高于 select 和 poll 函数,被公认为 Linux2.6 下性能最好的多路 I / O 就绪通知方法。
注:句柄(Handle)通常是一个唯一的标识符,用于标识操作系统内核中的某个对象,如文件、管道、套接字等。句柄可以用于对对象进行操作,例如读写文件、传输数据等。句柄通常是一个整数值,可以看作是操作系统内部的指针或引用,它指向了对象在内存中的位置或者描述对象的数据结构。
epoll的相关系统调用
epoll 实际上提供了三个系统调用:epoll_create、epoll_ctl 和 epoll_wait。
epoll_create
int epoll_create(int size);
- epoll_create 函数用于创建一个 epoll 实例,返回一个文件描述符,后续操作都需要使用这个文件描述符。
- 自从 Linux2.6.8 之后, size 参数是被忽略的,没有删除该参数,主要是为了向前兼容。
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- epoll_ctl 是 epoll 的事件注册函数,用于添加或修改或删除监听事件。
- epfd 是要操作的 epoll 实例的文件描述符,也就是 epoll_create 函数的返回值(epoll 的句柄)。
- op 是对事件的操作,包括添加、修改和删除,用三个宏来表示,分别是 EPOLL_CTL_ADD、EPOLL_CTL_MOD 和 EPOLL_CTL_DEL。EPOLL_CTL_ADD 是注册新的 fd 到 epfd 中,EPOLL_CTL_MOD 是修改已经注册的 fd 的监听事件,EPOLL_CTL_DEL 是从 epfd 中删除一个 fd。
- fd 是需要监测的文件描述符。
- event 是用户告诉内核需要监测什么事件,包括读事件、写事件和异常事件。
- 如果返回值为 0,表示调用成功;如果返回值为 -1,表示调用失败,错误码会被设置。
认识 struct epoll_event 结构
struct epoll_event 是 epoll 事件的数据结构,用于描述一个被监测的文件描述符上发生的事件,其结构如下:
// 联合体
typedef union epoll_data
{void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;struct epoll_event {uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */
};
其中,events 用于描述被监测文件描述符上的事件集合,包括:
- EPOLLIN:表示文件描述符上有可读数据。
- EPOLLOUT:表示文件描述符上可以写数据。
- EPOLLRDHUP:表示对端关闭了连接。
- EPOLLPRI:表示文件描述符上有紧急数据可读。
- EPOLLERR:表示文件描述符发生错误,可以通过 epoll_ctl 操作设置该事件。
- EPOLLHUP:表示文件文件描述符挂起。
- EPOLLET:表示将 epoll 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
- EPOLLONESHOT:表示只监听一次事件,事件被触发后,该事件将从 epoll 中删除。当监听完这次事件之后,如果还需要继续监听这个文件描述符的话,需要再次把这个文件加入到 epoll队列里。
epoll_wait
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
- epfd 是要操作的 epoll 实例的文件描述符,也就是 epoll_create 函数的返回值(epoll 的句柄)。
- events 是就绪事件数组(epoll_event 结构体数组),输出型参数,epoll 会将发生的事件按照顺序放入到 events 数组中,events 不能是空指针,内核只负责将数据复制到 events 数组中,不会帮助我们在用户态中分配内存。
- maxevents 是事件数组的元素个数,maxevents 的值不能大于 epoll_create 函数中的参数 size。
- timeout 是超时时间,输入型参数,单位是毫秒。如果为 0 则立即返回,如果为 -1 则一直阻塞等待时间就绪。
- 用于等待事件的发生,如果有事件发生则已经就绪的文件描述符总数。如果返回值等于 0,则表示超时;如果返回值等于 -1,则表示函数调用失败,错误码 errno 会被设置。
epoll的工作原理
select 和 poll 的工作原理
- 无论是 select 还是 poll,都需要用户自己维护一个数组来保存文件描述符和特定的时间,这是一个较大的成本。
- select 和 poll 不管是在用户层,还是在内核层,都需要通过遍历的方式来检查文件描述符的事件是否就绪,开销较大。
- 调用 select 和 poll 之前,用户都要告诉内核,它要帮助自己关心哪些文件描述符上的哪些时间。
- 调用 select 和 poll 之后,内核会告诉用户哪些文件描述符上的哪些事件发生了。
网卡接收到了数据,操作系统是如何得知的呢?
操作系统可以通过两种方式得知网卡有数据到来:
- 轮询(Polling):操作系统会定期轮询(Poll)每个网卡的状态,看是否有新的数据包到来。这种方式的缺点是会浪费系统资源,因为操作系统需要不停地查询网卡状态,即使网卡上没有数据包需要处理。
- 中断(Interrupt):当网卡接收到数据时,会产生一个硬件中断信号(interrupt),中断控制器硬件会将 8 号中断号发送到CPU。此时 CPU 会停止当前的任务,根据中断号找到硬件中断处理程序的入口(interrupt handler),转而去执行硬件中断处理程序。
- 硬件中断处理程序是操作系统内核中的一段代码,它会读取网卡中接收到的数据,并将其存放在内核中的接收缓冲区(receive buffer)中。接着,硬件中断处理程序会调用软件中断处理程序(software interrupt handler)来进一步处理这些数据。
中断向量表
中断向量表是一个存储系统中断处理程序入口地址的表格。它是操作系统内核用于响应系统中断的重要数据结构之一。
当系统发生中断时(例如,外部设备发送了一个信号或者出现了一个硬件故障),中断控制器硬件会将中断号(或中断向量)发送到 CPU。CPU 将中断号作为索引,通过访问中断向量表来获取相应中断处理程序的入口地址。操作系统内核根据获取的入口地址调用相应的中断处理程序。
在 x86 架构的计算机中,中断向量表的大小是固定的,为256个条目,每个条目对应一个中断号。前 32 个条目用于CPU内部异常处理和软件中断,而后 224 个条目用于外部设备的中断处理。
中断向量表中的每个条目包含两部分信息:中断处理程序的入口地址和处理程序的特权级别。由于操作系统内核需要访问特权级别较高的硬件资源,所以中断处理程序通常也需要在内核态下运行。因此,处理程序的特权级别必须与内核的特权级别匹配,否则处理程序将无法正常运行。
中断向量表的地址通常被保存在一个专用的寄存器中,例如 x86 架构的 IDTR 寄存器。当操作系统启动时,它会初始化中断向量表,并将其地址加载到 IDTR 寄存器中,以便 CPU 能够访问该表。
查看中断号
/proc/interrupts
是一个特殊的文件,在 Linux 系统中用于展示当前系统中各种中断的状态信息。它可以显示中断号、中断名称、中断发生的次数以及中断对应的 CPU 的编号等信息。
调用 epoll_create 函数时,操作系统会在底层做些什么呢?
调用 epoll_create 函数创建 epoll 模型时,操作系统会在底层做两件事情。
-
调用 epoll_create 函数时,操作系统会在底层创建一棵红黑树,该红黑树用来保存用户要求内核关心的文件描述符和事件,用户不需要再应用层再用数组来维护这些信息了。红黑树的作用就是将被监测的文件描述符按照其值的大小进行排序,并保证其能够高效地进行查找、插入、删除等操作。
-
调用 epoll_create 函数时,操作系统会在底层创建一个就绪队列(本质是双向链表),该就绪队列保存的是就绪的文件描述符和其对应的时间。当用户程序调用 epoll_wait 函数时,内核会将链表上的 epoll_event 结构体复制到用户空间,供用户程序使用。这样,在 O(k) 的时间复杂度内(k 表示就绪的文件描述符数),用户程序就可以获取到所有就绪的文件描述符及其对应的事件了。
- 当某一进程调用 epoll_create方法时, Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成员与 epoll 的使用方式密切相关。
struct eventpoll
{..../*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/struct rb_root rbr;/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/struct list_head rdlist;....
};
- 每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件。
- 这些事件都会挂载在红黑树中,如此重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是 l o g log logN,其中 N 为节点的个数)。
- 而所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说当响应的事件发生时会调用这个回调方法。
- 这个回调方法在内核中叫 ep_poll_callback,它会将发生的事件添加到 rdlist 双链表中。
- 在 epoll 中,对于每一个事件,都会建立一个 epitem 结构体。
struct epitem
{struct rb_node rbn;//红黑树节点struct list_head rdllink;//双向链表节点struct epoll_filefd ffd; //事件句柄信息struct eventpoll *ep; //指向其所属的eventpoll对象struct epoll_event event; //期待发生的事件类型
}
- 当调用 epoll_wait 检查是否有事件发生时,只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 元素即可。
- 如果 rdlist 不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户,这个操作的时间复杂度是 O(k),k 是就绪事件的个数。
有关 epoll 工作原理的细节
- 红黑树是要有 key 值才能进行排序的,文件描述符就是天然的 key 值。
- 用户只需要设置关心哪些文件描述符的哪些事件,然后获取结果即可,不用再关心任何文件描述符和事件的管理细节。
- epoll 为什么高效呢?epoll 是采用红黑树来管理事件的,而 select 和 epoll 是采用数组来管理事件的,所以 epoll 更加高效。还有就是文件描述符就绪时,可以通过回调的方式告诉操作系统,操作系统不需要一直检测文件描述符的事件是否发生。调用 epoll_wait 时,就绪队列可以以 O(1) 的时间复杂度告诉上层是否有事件就绪。
- 底层只要有文件描述符就绪时,操作系统会给用户构建节点并连入到就绪队列中,上层只需要不断地从就绪队列中将数据取走,就完成了获取事件的任务。其实这个过程的本质就是生产者消费者模型,操作系统是生产者,用户是消费者,就绪队列是交易场所也是共享资源。就绪队列是共享资源,那就需要进行加锁保护,而 epoll 已经保证 epoll 所有的接口都是线程安全的。
- 如果底层没有就绪事件,那么上层是否阻塞取决于第三个参数 timeout。
epoll服务器
epoll 相关接口的封装
#pragma once#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>class Epoll
{
private:const static int defaultSize = 256;public:// 创建epoll模型static int CreateEpoll(){int epfd = epoll_create(defaultSize);if(epfd <= 0) exit(5);return epfd;}static bool CtlEpoll(int epfd, int op, int sock, uint32_t events){struct epoll_event ev;ev.events = events;ev.data.fd = sock; // ev.data是一个联合体,使用其中的文件描述符字段即可int ret = epoll_ctl(epfd, op, sock, &ev);return ret == 0;}static int WaitEpoll(int epfd, struct epoll_event* readyEvents, int maxEvents, int timeout){// 如果底层就绪的事件有很多,readyEvents承装不下,可以分多次来拿取底层就绪的事件// epoll_wait的返回值是底层就绪的事件的个数,readyEvents是输出型参数// epoll_wait函数返回时,会将底层就绪的事件按照顺序放入到readyEvents// 数组中,一个有返回值个就绪的事件return epoll_wait(epfd, readyEvents, maxEvents, timeout);}
};
EpollServer 的设计
#ifndef __EPOLL_SERVER_HPP__
#define __EPOLL_SERVER_HPP__#include <functional>
#include <cassert>
#include "Epoll.hpp"
#include "Log.hpp"
#include "Sock.hpp"using func_t = std::function<void(std::string)>;class EpollServer
{
private:const static uint16_t defaultPort = 8080; // 将服务器的端口号默认为8080const static int defaultMaxEvents = 64; // 默认一次接收就绪事件的上限为64public:EpollServer(func_t callBack, const uint16_t port = defaultPort, const int maxEvents = defaultMaxEvents): _port(port), _maxEvents(maxEvents), _callBack(callBack){// 1. 创建监听套接字_listenSock = Sock::Socket();Sock::Bind(_listenSock, _port);Sock::Listen(_listenSock);// 2. 创建epoll模型_epfd = Epoll::CreateEpoll();// 3. 将_listenSock添加到epoll模型中logMessage(DEBUG, "Init Server Success! _listenSock:%d _epfd:%d", _listenSock, _epfd); // 3 4if(!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, _listenSock, EPOLLIN))exit(6); // 只关心读事件// 4. 申请保存就绪事件的空间_readyEvents = new struct epoll_event[_maxEvents];logMessage(DEBUG, "Add _listenSock To Epoll Success");}~EpollServer(){if(_listenSock >= 0) close(_listenSock);if(_epfd >= 0) close(_epfd);if(_readyEvents) delete[] _readyEvents;}void Start(){// int timeout = -1; // 阻塞等待// int timeout = 0; // 非阻塞等待int timeout = 3000; // 没事件就绪时,每隔3秒timeout一次while(true){LoopOnce(timeout); // 循环一次}}private:void LoopOnce(int timeout){int n = Epoll::WaitEpoll(_epfd, _readyEvents, _maxEvents, timeout);// if(n == _maxEvents) // 如果获取上来的事件个数等于上限,可以选择扩容switch(n){case -1:logMessage(WARNING, "WaitEpoll Error: %s", strerror(errno));break;case 0:logMessage(DEBUG, "WaitEpoll Timeout");break;default:// 事件就绪,需要处理事件logMessage(NORMAL, "WaitEpoll Success");HandlerEvents(n);break;}}void HandlerEvents(int n){assert(n > 0); // 增强健壮性for(int i = 0; i < n; ++i){uint32_t revents = _readyEvents[i].events;int sock = _readyEvents[i].data.fd;if(revents & EPOLLIN){if(sock == _listenSock){Accepter(_listenSock);}else{Recver(sock);}}if(revents & EPOLLOUT){// TODO 写事件就绪将会在Reactor模式中处理}}}void Accepter(int listenSock){ std::string clientIp;uint16_t clientPort;int sock = Sock::Accept(listenSock, &clientIp, &clientPort);if(sock < 0){logMessage(WARNING, "Accept Error!");return;}// 获取连接成功不能立即调用read等接口,因为底层数据可能没有就绪而导致进程被挂起if(!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, sock, EPOLLIN)) return;logMessage(NORMAL, "Add New Sock %d To Epoll Success", sock);}void Recver(int sock){char buffer[1024];int n = read(sock, buffer, sizeof(buffer) - 1);if(n > 0){// 假设这里获取到的数据就是一个完整的报文// 但实际上不一定是一个完整的报文,需要通过// 定制协议来保证,在Reactor模式中统一讲解buffer[n] = 0;_callBack(buffer); // 将获取到的数据交给实际的业务进行处理,实现一定程度的解耦}else if(n == 0){// 先让epoll不要关心该文件描述符了,然后才能关闭该文件描述符// 如果先关闭文件描述符的话,epoll将认为该文件描述符是无效的// 去除对文件描述符的关心,不需要设置时间bool ret = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0); assert(ret);(void)ret;close(sock);logMessage(NORMAL, "Client Quit, Me Too");}else{bool ret = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0); assert(ret);(void)ret;close(sock);logMessage(WARNING, "Read Error, Close Sock: %d", sock);}}private:uint16_t _port;int _listenSock;int _epfd;struct epoll_event* _readyEvents; // 用于接收就绪事件的集合int _maxEvents; // 一次接收就绪事件的上限func_t _callBack; // 处理业务逻辑的回调函数
};#endif
相关说明:
- EpollServer 的构造函数需要传入业务逻辑处理的回调函数,然后创建、绑定监听套接字;再然后创建 epoll 模型并让 epoll 关心监听套接字的读事件;接着申请用来保存就绪事件的数组,当有事件就绪时,内核会将就绪的事件按照顺序拷贝到该数组中,此时前 n 个位置放的就是就绪的事件了(n 为 epoll_wait 函数的返回值)。
- Start 是运行服务器的接口,在该函数内部调用了 LoopOnce 含,而 LoopOnce 函数主要是调用 epoll_wait 函数,然后根据函数的返回值 n 来进行判断。如果 n 等于 -1,则 epoll_wait 调用失败,可能是接收到信号等原因导致的;如果 n 等于 0,则说明超时了;如果 n 大于 0,则说明事件就绪了,需要调用 HandleEvents 函数来处理时间。
- 就绪的事件有 n 个且按照顺序放在 _readyEvents 数组中了,所以遍历 _readyEvents 数组前 n 个元素接口。然后根据就绪的时间类型来做出相应的处理,在这里我们只关心读事件,其他类型的事件将在 Reactor 模式中讲解。那么我们只需要判断就绪的文件描述符是不是 _listenSock,如果是则调用 Accepter 函数获取新连接;如果不是则调用 Recver 读取数据。
- Accepter 函数获取到的新连接,不能立马调用 read 等接口,因为底层的数据可能还有就绪进而导致服务器进程被挂起。此时应该将获取的连接添加到 epoll 中,让 epoll 来关心其读事件是否就绪。Recv 函数接收到的数据的字节数大于 0,则将该数据交给 _callBack 函数,_callBack 是进行业务处理的函数,这样可以让 epoll 和实际的业务逻辑实现一定程度的解耦。如果字节数小于或等于 0,都应该先将该文件描述符移出 epoll 然后再关闭该文件描述符。如果先关闭文件描述符,再从 epoll 中移除该文件描述符,那么 epoll_ctl 函数将会调用失败。
业务逻辑处理
#include "EpollServer.hpp"
#include <memory>// 业务逻辑处理:将小写字母转为大写字母
void callBack(std::string request)
{for(int i = 0; i < request.size(); ++i){if(islower(request[i])){request[i] = toupper(request[i]);}}std::cout << "callBack: " << request << std::endl;
}int main()
{std::unique_ptr<EpollServer> svr(new EpollServer(callBack));svr->Start();return 0;
}
功能测试
将 epoll 服务器与业务逻辑处理解耦的好处就是:业务逻辑处理可以千变万化,但是 epoll 服务可以一成不变地接收连接和数据。
epoll的优点
- 接口使用方便:虽然拆分成了三个函数,但是反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离开。
- 数据拷贝轻量:只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中,这个操作并不频繁,而 select / poll 都是每次循环都要进行拷贝。
- 事件回调机制:避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中。epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪,这个操作时间复杂度 O(1)。即使文件描述符数目很多,效率也不会受到影响。
- 没有数量限制:文件描述符数目无上限。
注:网上有些博客说,epoll 中使用了内存映射机制(内存映射机制:内核直接将就绪队列通过 mmap 的方式映射到用户态,避免了拷贝内存这样的额外性能开销)。这种说法是不准确的,我们定义的 struct epoll_event 是我们在用户空间中分配好的内存,势必还是需要将内核的数据拷贝到这个用户空间的内存中的。
epoll的工作方式
与 poll 模式的事件宏相比,epoll 模式新增了一个事件宏 EPOLLET,即边缘触发模式(Edge Trigger,ET),我们称默认的模式为水平触发模式(Level Trigger,LT)。这种模式的区别在于:
- 对于水平触发模式,一个事件只要有,就会一直触发,这是 select、poll 和 epoll 的默认模式,其表现就是如果没有将底层的数据全部取走,就会一直通知你事件就绪了。
- 对于边缘触发模式,在一个事件从无到有或从少到多变化时才会触发,
这两个词汇来自电学术语,我们可以将 fd 上有数据的状态认为是高电平状态,将没数据的状态认为是低电平状态,将 fd 可写状态认为是高电平状态,将 fd 不可写状态认为是低电平状态。那么水平模式的触发条件是处于高电平,而边缘模式的触发条件是新来的一次电信号将当前状态变为高电平状态。
- 水平模式的触发条件:低电平状态变成高电平状态或处于高电平状态。
- 边缘模式的触发条件:低电平状态变成高电平状态。
为什么 ET 模式一般会比 LT 模式高效呢?
在 LT模式下,当文件描述符的状态发生变化时,epoll_wait 函数会立即返回并通知应用程序。如果应用程序没有处理完这个文件描述符的事件,那么 epoll_wait 函数会再次返回这个文件描述符的事件,直到应用程序处理完它。
在 ET 模式下,只有在文件描述符状态发生变化时,epoll_wait 函数才会返回。如果应用程序没有处理完该事件,epoll_wait 函数将不会再次返回该事件。
ET 模式会要求程序员一次就把底层的数据取走,这样就可以避免过多的系统调用和数据拷贝,提高 IO 的效率。而 LT 模式是如果没有将底层的数据取完,那么 epoll_wait 会一直告诉用户底层数据没有取完,进而出现多次调用 epoll_wait 的情况。但是当 LT 模式也一次将底层数据取完,这时 LT 模式的效率和 ET 模式的效率是一样的。因此,ET 模式一般会比 LT 模式高效。
注:ET 模式会倒逼程序员尽快将接收缓冲区中的数据全部取走,应用层能够尽快地将缓冲区的数据取走,那么在单位时间内,该模式下工作的服务器就可以在一定程度上给发送方同步一个更大的接收窗口,所以对方就可以有更大的滑动窗口,就能向我们发送更多的数据,提高 IO 吞吐量。
为什么 ET 模式需要将文件描述符设置为非阻塞呢?
ET 模式要求应用层在下一次调用 epoll_wait 前就将底层的数据读取完,而一次就要将数据读取完就必须要一直循环读取。那么在最后一次读取完毕时,我们必须还有进行下一次读取(因为无法确认是否读取完)。如果此时文件描述符没有被设置成非阻塞,那么此次读取必定会导致进程被挂起,而进程被挂起在多路转接中是一定不被允许的。为了避免这个问题,所以在 ET 模式下工作的服务器都需要将文件描述符设置成非阻塞,只要一直循环读取到出错并且错误码为 EWOULDBLOCK(EAGAIN)就表明底层数据已经读取完了。
对比LT和ET
LT 模式是 epoll 的默认模式,使用 ET 模式能够减少 epoll 触发的次数,但是代价就是强逼着程序员在一次响应就绪过程中就把所有的数据都处理完。
相当于一个文件描述符就绪之后,不会反复被提示就绪,看起来就比 LT 模式更高效一些。但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理,不让这个就绪被重复提示的话,其实性能也是一样的。
另一方面,ET 模式的代码复杂程度更高了。
epoll的使用场景
epoll 的高性能,是有一定的特定场景的。如果场景选择的不适宜,epoll 的性能可能适得其反。
- 对于多连接且多连接中只有一部分连接比较活跃时,比较适合使用 epoll。
例如:典型的一个需要处理上万个客户端的服务器和各种互联网 APP 的入口服务器,这样的服务器就很适合 epoll。
如果只是系统内部的服务器和服务器之间进行通信,只有少数的几个连接,这种情况下用 epoll 就并不合适,具体要根据需求和场景特点来决定使用哪种 IO 模型。
👉总结👈
本篇博客主要讲解了多路转接之 select、poll 和 epoll,分析了它们的函数原型、优缺点和应用场景等。以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家啦!💖💝❣️