目录
1. I/O 的重新理解
2. 五种 I/O 模型
① 阻塞 && 非阻塞
② 同步 I/O && 异步 I/O
3. select 模型
① 非阻塞 I/O
② select 接口介绍
③ 代码实现
④ select 模型的优缺点
4. poll 模型
① poll 接口介绍
② 代码实现
③ 与 select 模型的异同
5. epoll 模型
① epoll 接口介绍
② epoll 的实现原理
③ epoll 的优势
④ 代码实现
⑤ epoll 的两种工作模式
1. 为什么 ET 模式的文件描述符 fd 要设置成非阻塞形式?
2. ET 模式的效率一定比 LT 高吗?
3. LT 和 ET 的本质区别
6. 对多路转接代码进行设计与整合
① 代码的设计与整合
1. Comm.hpp
2. Epoller.hpp
3. Log.hpp
4. nocopy.hpp
5. Protocol.hpp
6. ServerCal.hpp
7. Socket.hpp
8. TimeManager.hpp
9. TcpServer.hpp
10. Main.cc
② Reactor 理论
1. I/O 的重新理解
I/O 即 Input && Output(输入/输出),在 Linux 中主要使用 read && write 函数
1. 应用层在使用 read && write 函数时,实际上是把数据从用户层写入到 OS 中,本质是两个拷贝函数!
2. I/O = 等待条件满足 + 调用拷贝函数,一般来说,要进行拷贝必须要先进行判断,只有条件满足才能进行读写事件,因此我们大部分时候都处在等待条件满足的状态!
那么什么才叫做高效 I/O 呢?——在单位时间内,I/O 过程中,等待条件满足的时间占比越小,I/O 的效率越高!几乎所有提高 I/O 效率的策略其本质都是使等待条件满足的时间占比变小。
2. 五种 I/O 模型
5 种 I/O 模型分别是 阻塞式 I/O、非阻塞式 I/O、信号驱动 I/O、多路复用 I/O 和 异步 I/O。我们在这里以买票为具体例子来讲讲它们
- 阻塞式 I/O (Blocking I/O)
老王去火车站排队买票,他站在队伍中等待,直到轮到他买到票。在这个过程中,老王不能做其他任何事情,他被阻塞在买票这个操作上。- 非阻塞式 I/O (Non-blocking I/O)
老王去火车站买票,但他告诉售票员他不等待,如果票还没好,他就先去做其他事情,等票好了再回来。这样老王就不会一直站在售票窗口前等待,而是可以自由地去处理其他事务。- 信号驱动 I/O (Signal-driven I/O)
老王通过电话预订火车票,他告诉售票员一旦票准备好就给他打电话。然后老王就去做其他事情,当票准备好后,售票员会给他打电话,他再去取票。在这个过程中,老王不需要一直等待,而是通过信号(电话)来得知票的状态。- 多路复用 I/O (I/O Multiplexing)
老王是票务代理,他负责多个客户的购票需求。他通过一个系统(比如select/poll/epoll)来监控多个客户的购票状态,一旦某个客户的票状态更新(比如票已准备好),系统就会通知他去处理这个客户的购票请求。这样老王就可以同时处理多个客户的购票需求,而不需要分别去每个窗口排队。- 异步 I/O (Asynchronous I/O)
老王在线预订火车票,他提交订单后就可以去做其他事情了。售票系统会在后台处理他的订单,包括支付、出票等所有步骤。当所有步骤完成后,售票系统会将票直接快递到老王家,或者通过电子邮件发送电子票给他。老王不需要参与购票过程中的任何步骤,一切都由系统自动完成。
① 阻塞 && 非阻塞
我们知道 I/O = 等待条件满足 + 拷贝,而从买票的例子中我们可以看出,阻塞与非阻塞只在等待条件满足的方式上不同,阻塞是条件没满足就一直等,而非阻塞是条件没满足就先去做其他事。
② 同步 I/O && 异步 I/O
前面四种 I/O 方式都属于同步 I/O,从例子中我们可以看出,异步 I/O 与同步 I/O 最大的不同就是他并不参与 I/O,只负责发起 I/O,只需要在最后得到结果就行。
我们主要介绍多路转接 I/O 技术。
3. select 模型
① 非阻塞 I/O
一个文件描述符被创建出来时默认都是阻塞 I/O 的模式,我们可以使用 fcntl() 函数来将其设置为非阻塞模式,函数接口如下
int fcntl(int fd, int cmd, ... /* arg */ );
这个函数有 5 种功能
- 复制一个现有的描述符 (cmd=F_DUPFD)
- 获得/设置文件描述符标记 (cmd=F_GETFD或F_SETFD)
- 获得/设置文件状态标记 (cmd=F_GETFL或F_SETFL)
- 获得/设置异步I/O所有权 (cmd=F_GETOWN或F_SETOWN)
- 获得/设置记录锁 (cmd=F_GETLK,F_SETLK或F_SETLKW)
在这里我们只使用第三种功能,获取/设置文件状态标记,就可以将一个文件描述符设置为非阻塞。举个例子,在正常情况下,文件描述符都是阻塞形式的,代码如下
#include <iostream>
#include <unistd.h>using namespace std;int main()
{char buffer[1024];while(true){cout << "Please Enter: " << endl;ssize_t n = read(0, buffer, sizeof(buffer)-1);if (n > 0){buffer[n] = 0;cout << "echo: " << buffer << endl;}else if (n == 0){cout << "read done" << endl;}else{cerr << "read error" << endl;}}return 0;
}
运行效果如下
接下来我们使用 fcntl 函数将其设置为非阻塞,代码如下
#include <iostream>
#include <unistd.h>
#include <fcntl.h>using namespace std;void SetNonBlock(int fd)
{// 获取文件描述符 fd 当前的文件状态标志int fl = fcntl(fd, F_GETFL);if(fl < 0) // 调用失败处理{perror("fcntl");return;}// 将 fd 设置为非阻塞模式fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}int main()
{char buffer[1024];SetNonBlock(0);sleep(1);while(true){cout << "Please Enter: " << endl;ssize_t n = read(0, buffer, sizeof(buffer)-1);if (n > 0){buffer[n] = 0;cout << "echo: " << buffer << endl;}else if (n == 0){cout << "read done" << endl;}else{cerr << "read error" << endl;}}return 0;
}
运行后发现
显示 read 出错了?这是怎么回事呢?
1. 一旦将一个文件描述符设置成非阻塞状态,如果底层 fd 数据没有准备就绪,那么 recv/read/write/send 等函数的返回值会以出错的形式返回
2. 也就是说,如果显示 read 出错可能有两种情况,一种是真的 read 出错了;另一种是底层 fd 没有就绪
3. 那我们如何区分这两种出错呢?——使用 errno!
修改后代码如下
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <fcntl.h>using namespace std;void SetNonBlock(int fd)
{// 获取文件描述符 fd 当前的文件状态标志int fl = fcntl(fd, F_GETFL);if(fl < 0) // 调用失败处理{perror("fcntl");return;}// 将 fd 设置为非阻塞模式fcntl(fd, F_SETFL, fl | O_NONBLOCK);cout << "set " << fd << " nonblock done!" << endl;
}int main()
{char buffer[1024];SetNonBlock(0);sleep(1);while(true){cout << "Please Enter: " << endl;ssize_t n = read(0, buffer, sizeof(buffer)-1);if (n > 0){buffer[n-1] = 0;cout << "echo: " << buffer << endl;}else if (n == 0){cout << "read done" << endl;break;}else{if (errno == EWOULDBLOCK){cout << "0 fd data not ready, try again!" << endl;sleep(1);}else{cerr << "read error, n = " << n << ", errno code: " << errno << ", error str: " << strerror(errno) << endl;}}}return 0;
}
运行效果如下
② select 接口介绍
我们来介绍一下 select 模型,我们先前说 I/O = 等待 + 拷贝,而 select 只负责进行等,一次可以等待多个 fd,我们来看看它的接口
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
- 对于返回值 n 来说,若 n > 0 则表示有 n 个 fd 已经就绪了;若 n = 0 则表示超时了,既没有错误也没有 fd 就绪; 若 n < 0 则表示等待出错了
- 对于第一个参数 int nfds 来说,这个参数它指定了
readfds
、writefds
和exceptfds
这三个集合中监控的最大文件描述符值加一,也就是说,nfds
是我们所监控的文件描述符中最大的那个值加一。- 对于 2、3、4 个参数来说,fd_set 是 Linux 内核提供的数据类型位图,在这里我们以 readfds 为例来讲解,剩下的两个参数含义都与其相似
fd_set *readfds 是一个输入输出型参数。在输入时,它的意思是用户告诉 Linux 内核,我会给你一个或者多个 fd ,你要帮我关心 fd 上的读事件,如果读事件就绪就告诉我;在输出时,它的意思是 Linux 内核告诉用户,你让我关心的多个 fd 中,有哪些已经准备就绪了,你可以进行读取了!我们可以通过画图来理解
除了读事件 readfds 外,还有写事件 writefds 和异常事件 exceptfds,此外,我们可以看到 fd_set 是一张位图,它是用来让用户与内核互相传递 fd 是否就绪的信息的!而 fd_set 是操作系统提供的,同时 select 在使用中一定会存在大量的位图操作,因此 OS 会为位图的操作提供一些接口
void FD_CLR(int fd, fd_set *set); // 从fd_set集合中移除指定的文件描述符fd
int FD_ISSET(int fd, fd_set *set); // 检查fd_set集合中是否包含指定的文件描述符fd
void FD_SET(int fd, fd_set *set); // 向fd_set集合中添加指定的文件描述符fd
void FD_ZERO(fd_set *set); // 清空整个fd_set集合,移除其中所有的文件描述符
- 对于最后一个参数 struct timeval *timeout 来说,这是一个输入输出型参数,它指向 timeval 结构的指针,指定了 select 调用的超时时间。如果 timeout 设置为 NULL,select 调用会一直阻塞,直到至少有一个文件描述符准备好。如果 timeout 不是 NULL,select 会在指定的时间内阻塞,等待文件描述符准备好或者超时。举个例子,如果 timeout = [0, 0] 则会立马返回,其相当于非阻塞的一种;如果 timeout = [5, 0] 表示每隔 5s 超时一次。
③ 代码实现
我们实现一个基于 TCP 的 select 服务器
#pragma once
#include <iostream>
#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>#include "Log.hpp"
extern Log lg;// 默认端口号
static const uint16_t defaultport = 8080;// select 中的 fd_set 位图比特位的个数上限为 1024, 也就是说在一个事件中, 最多能同时监视 1024 个 fd
// 因此我们需要创建一个辅助数组来进行 select 函数的信息传递
static const int fd_num_max = (sizeof(fd_set)*8); // 辅助数组的最大长度
const int defaultfd = -1; // 辅助数组中元素的默认值class SelectServer
{
public:SelectServer(uint16_t port = defaultport): _port(port){// 初始化辅助数组for (int i = 0; i < fd_num_max; i++){fd_array[i] = defaultfd;}}bool Init(){_listensock.Socket();_listensock.Bind(_port);_listensock.Listen();return true;}void HandlerEvent(fd_set &rfds){if(FD_ISSET(_listensock.Fd(), &rfds)){// 连接事件已经就绪std::string clientip;uint16_t clientport = 0;// 会在这里阻塞吗?不会int sock = _listensock.Accept(&clientip, &clientport); // 获取 sock 后不能直接 read, 因为可能不会立刻发送数据 -> 交由 select 关心// 服务器接受一个新的客户端连接(sock)之后,不应该立即尝试从这个新的套接字读取数据。// 原因是客户端可能不会立即发送数据,如果服务器立即尝试读取,那么读取操作可能会阻塞,因为没有任何数据可读。if (sock < 0) return;lg(Info, "accept success, %s:%d", clientip.c_str(), clientport);// "交由 select 关心"意味着服务器应该将这个新的套接字(sock)添加到 fd_set 集合中,然后继续使用 select 函数来监控这个套接字。// 将 sock 设置进 fd_array 中int pos = 1;for(; pos < fd_num_max; pos++){if (fd_array[pos] != defaultfd) continue;else break;}if (pos == fd_num_max){lg(Warning, "server is full, close %d now!", sock);close(sock);}else{fd_array[pos] = sock;}}}void Start(){int listensock =_listensock.Fd();fd_array[0] = listensock;for (;;){// 每次都要重新设置 rfdsfd_set rfds;FD_ZERO(&rfds);// 使用辅助数组 fd_array 进行 select 函数信息传递// 设置 select 函数需要监控的文件描述符集合int maxfd = fd_array[0]; // select 的第一个参数for (int i = 0; i < fd_num_max; i++){// fd_array 若为默认值则表明不需要关心这个 fdif (fd_array[i] == defaultfd){continue;}FD_SET(fd_array[i], &rfds);// 不断更新 maxfdif (maxfd < fd_array[i]){maxfd = fd_array[i];lg(Debug, "maxfd update, maxfd: %d", maxfd);sleep(1);}}// 不能直接accept! accept 的功能是检测并获取 listensock 上面的事件,新连接到来,等价于读事件就绪// 输入输出型参数,需要周期性的重复设置,否则可能会陷入死循环struct timeval timeout = {0, 0};// select 的特点: 如果事件就绪,上层一直不处理,select 就会一直通知用户!// select 告诉用户资源就绪后,接下来的一次读取,在读取 fd 的时候,不会被阻塞int n = select(maxfd+1, &rfds, nullptr, nullptr, /*&timeout*/nullptr);switch (n){case 0:lg(Info, "time out, timeout: %ld, %ld", (long)timeout.tv_sec, (long)timeout.tv_usec);break;case -1:lg(Error, "select error");break;default: // 有事件就绪lg(Info, "get a new link!");HandlerEvent(rfds);break;}}}~SelectServer(){}
private:Sock _listensock; // 监听套接字uint16_t _port; // 端口号int fd_array[fd_num_max]; // 辅助数组
};
我们使用本地环回测试有
而除了监听套接字外还应该有接收处理,完整代码如下
#pragma once
#include <iostream>
#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>#include "Log.hpp"
extern Log lg;// 默认端口号
static const uint16_t defaultport = 8080;// select 中的 fd_set 位图比特位的个数上限为 1024, 也就是说在一个事件中, 最多能同时监视 1024 个 fd
// 因此我们需要创建一个辅助数组来进行 select 函数的信息传递
static const int fd_num_max = (sizeof(fd_set)*8); // 辅助数组的最大长度
const int defaultfd = -1; // 辅助数组中元素的默认值class SelectServer
{
public:SelectServer(uint16_t port = defaultport): _port(port){// 初始化辅助数组for (int i = 0; i < fd_num_max; i++){fd_array[i] = defaultfd;}}bool Init(){_listensock.Socket();_listensock.Bind(_port);_listensock.Listen();return true;}void PrintFd(){std::cout << "online fd list: ";for (int i = 0; i < fd_num_max; i++){if (fd_array[i] == defaultfd) continue;std::cout << fd_array[i] << " ";}std::cout << std::endl;}void HandlerEvent(fd_set &rfds){for (int i = 0; i < fd_num_max; i++){int fd = fd_array[i];if (fd == defaultfd) continue;// 事件已经就绪if(FD_ISSET(fd, &rfds)){// 当前 fd 是监听套接字if (fd == _listensock.Fd()){// 连接事件已经就绪std::string clientip;uint16_t clientport = 0;// 会在这里阻塞吗?不会int sock = _listensock.Accept(&clientip, &clientport); // 获取 sock 后不能直接 read, 因为可能不会立刻发送数据 -> 交由 select 关心// 服务器接受一个新的客户端连接(sock)之后,不应该立即尝试从这个新的套接字读取数据。// 原因是客户端可能不会立即发送数据,如果服务器立即尝试读取,那么读取操作可能会阻塞,因为没有任何数据可读。if (sock < 0) return;lg(Info, "accept success, %s:%d, sockfd: %d", clientip.c_str(), clientport, sock);// "交由 select 关心"意味着服务器应该将这个新的套接字(sock)添加到 fd_set 集合中,然后继续使用 select 函数来监控这个套接字。 // 找到一个最新的 pos 来放置 sockint pos = 1;for(; pos < fd_num_max; pos++){if (fd_array[pos] != defaultfd) continue;else break;}if (pos == fd_num_max) // 服务器已满{lg(Warning, "server is full, close %d now!", sock);close(sock);}else // 将 sock 设置进 fd_array 中{fd_array[pos] = sock;PrintFd(); // 打印正在使用的 sockfd}} else // 当前 fd 不是监听套接字{char buffer[1024];ssize_t n = read(fd, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;std::cout << "get a messge: " << buffer << std::endl;}else if(n == 0){lg(Info, "client quit, me too, close fd %d", fd);close(fd);fd_array[i] = defaultfd; // 这里本质是从select中移除}else{lg(Warning, "recv error: fd is %d", fd);close(fd);fd_array[i] = defaultfd; // 这里本质是从select中移除}} }} }void Start(){int listensock =_listensock.Fd();fd_array[0] = listensock;for (;;){// 每次都要重新设置 rfdsfd_set rfds;FD_ZERO(&rfds);// 使用辅助数组 fd_array 进行 select 函数信息传递// 设置 select 函数需要监控的文件描述符集合int maxfd = fd_array[0]; // select 的第一个参数for (int i = 0; i < fd_num_max; i++){// fd_array 若为默认值则表明不需要关心这个 fdif (fd_array[i] == defaultfd){continue;}FD_SET(fd_array[i], &rfds);// 不断更新 maxfdif (maxfd < fd_array[i]){maxfd = fd_array[i];lg(Debug, "maxfd update, maxfd: %d", maxfd);sleep(1);}}// 不能直接accept! accept 的功能是检测并获取 listensock 上面的事件,新连接到来,等价于读事件就绪// 输入输出型参数,需要周期性的重复设置,否则可能会陷入死循环struct timeval timeout = {0, 0};// select 的特点: 如果事件就绪,上层一直不处理,select 就会一直通知用户!// select 告诉用户资源就绪后,接下来的一次读取,在读取 fd 的时候,不会被阻塞int n = select(maxfd+1, &rfds, nullptr, nullptr, /*&timeout*/nullptr);switch (n){case 0:lg(Info, "time out, timeout: %ld, %ld", (long)timeout.tv_sec, (long)timeout.tv_usec);break;case -1:lg(Error, "select error");break;default: // 有事件就绪lg(Info, "get a new link!");HandlerEvent(rfds);break;}}}~SelectServer(){_listensock.Close();}
private:Sock _listensock; // 监听套接字uint16_t _port; // 端口号int fd_array[fd_num_max]; // 辅助读事件的数组// int wfd_array[fd_num_max]; // 辅助写事件的数组// int expfd_array[fd_num_max]; // 辅助异常事件的数组
};
④ select 模型的优缺点
优点
就算是单进程也能等待多个文件描述符
缺点
1. 等待的 fd 有上限
2. 输入输出型参数比较多,数据拷贝频率高(数据不断地在内核和用户之间切换,而且遍历式查询的效率相对比较低下)
3. 输入输出型参数比较多,每次都要重置需要关心的 fd 事件
4. 用户在使用时,要使用辅助数组管理用户的 fd,所以用户需要很多次遍历;而在内核中,内核检测 fd 事件是否就绪,也要进行遍历,这样就会导致效率大大降低
4. poll 模型
① poll 接口介绍
poll 模型和 select 类似,都只负责等待描述符,其接口如下
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- 第一个参数的结构如下
struct pollfd
{int fd; /* 文件描述符 */short events; /* 要查询的事件 */short revents; /* 实际发生的事件 */
};
这个参数有效地将输入和输出的事件进行了分离!
events和revents的取值如图所示
- 第二个参数表示一共有多少元素,即需要监视的文件描述符的数量
- 第三个参数表示要设置的状态,若为 0 表示设置为非阻塞状态,若为 -1 表示设置为永久阻塞状态
② 代码实现
poll 的整体代码与 select 的服务器类似,代码如下
#pragma once
#include <iostream>
#include "Socket.hpp"
#include <poll.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>#include "Log.hpp"
extern Log lg;// 默认端口号
static const uint16_t defaultport = 8080;static const int fd_num_max = 64; // 辅助数组的最大长度
const int defaultfd = -1; // 辅助数组中 fd 的默认值
const int non_event = 0; // 辅助数组中 events && revents 的默认值class PollServer
{
public:PollServer(uint16_t port = defaultport): _port(port){// 初始化辅助数组for (int i = 0; i < fd_num_max; i++){_event_fds[i].fd = defaultfd;_event_fds[i].events = non_event;_event_fds[i].revents = non_event;}}bool Init(){_listensock.Socket();_listensock.Bind(_port);_listensock.Listen();return true;}void PrintFd(){std::cout << "online fd list: ";for (int i = 0; i < fd_num_max; i++){if (_event_fds[i].fd == defaultfd) continue;std::cout << _event_fds[i].fd << " ";}std::cout << std::endl;}void HandlerEvent(){for (int i = 0; i < fd_num_max; i++){int fd = _event_fds[i].fd;if (fd == defaultfd) continue;// 事件已经就绪if(_event_fds[i].revents & POLLIN){// 当前 fd 是监听套接字if (fd == _listensock.Fd()){// 连接事件已经就绪std::string clientip;uint16_t clientport = 0;// 会在这里阻塞吗?不会int sock = _listensock.Accept(&clientip, &clientport); // 获取 sock 后不能直接 read, 因为可能不会立刻发送数据 -> 交由 poll 关心// 服务器接受一个新的客户端连接(sock)之后,不应该立即尝试从这个新的套接字读取数据。// 原因是客户端可能不会立即发送数据,如果服务器立即尝试读取,那么读取操作可能会阻塞,因为没有任何数据可读。if (sock < 0) return;lg(Info, "accept success, %s:%d, sockfd: %d", clientip.c_str(), clientport, sock);// 找到一个最新的 pos 来放置 sockint pos = 1;for(; pos < fd_num_max; pos++){if (_event_fds[pos].fd != defaultfd) continue;else break;}if (pos == fd_num_max) // 服务器已满,可以进行扩容操作{lg(Warning, "server is full, close %d now!", sock);close(sock);}else // 将 sock 设置进 fd_array 中{_event_fds[pos].fd = sock;_event_fds[pos].events = POLLIN;_event_fds[pos].revents = non_event;PrintFd(); // 打印正在使用的 sockfd}} else // 当前 fd 不是监听套接字{char buffer[1024];ssize_t n = read(fd, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;std::cout << "get a messge: " << buffer << std::endl;}else if(n == 0){lg(Info, "client quit, me too, close fd %d", fd);close(fd);_event_fds[i].fd = defaultfd; }else{lg(Warning, "recv error: fd is %d", fd);close(fd);_event_fds[i].fd = defaultfd; }} }} }void Start(){_event_fds[0].fd = _listensock.Fd();_event_fds[0].events = POLLIN;int timeout = 3000;// 3sfor (;;){int n = poll(_event_fds, fd_num_max, timeout);switch(n){case 0:lg(Info, "time out...");break;case -1:lg(Error, "poll error");break;default:lg(Info, "get a new link!");HandlerEvent();break;}}}~PollServer(){_listensock.Close();}
private:Sock _listensock; // 监听套接字uint16_t _port; // 端口号struct pollfd _event_fds[fd_num_max]; // 辅助数组
};
运行效果如下
③ 与 select 模型的异同
从接口上我们可以看到,select 模型等待的 fd 有上限,但是 poll 数组通过用户手动设置来解决这个上限问题;但是 poll 模型在效率问题上还是没有什么改变,因为它同样需要在用户层与内核间多次进行拷贝!因此,我们在 poll 的基础上对其进行延伸,即—— epoll(extend poll)!
5. epoll 模型
① epoll 接口介绍
我们先来看看 epoll 的相关接口,如图所示
我们首先来看看这些接口
int epoll_create(int size);
- 返回值 int 返回 epoll 模型的句柄,这个句柄是 epoll_wait、epoll_ctl 的第一个参数,用于对特定示例进行控制;
- 第一个参数 size 指定了需要监听的文件描述符数量。虽然在 Linux 2.6.8 版本之后, size 参数已经不再被使用,因为内核会根据需要动态分配内存来存储文件描述符,但是为了保持向后兼容性,size 参数仍然被保留。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
- 返回值 int 表示已经就绪的 fd 个数;
- 第一个参数 epfd 表示可以通过这个描述符向 epoll 实例中添加、删除或修改要监视的文件描述符
- 第二个参数是一个指向 epoll_event 结构数组的指针,用于接收准备就绪的事件。epoll_event 结构包含与事件相关的文件描述符和数据。在调用 epoll_wait 后,events 数组会被填充为准备就绪的文件描述符和它们关联的事件(例如读就绪、写就绪等),其结构如下
typedef union epoll_data
{void *ptr; // 通用指针,可以指向任何类型的数据,通常用于传递上下文信息给回调函数int fd; // 文件描述符,用于标识某个特定的文件或者socketuint32_t u32; // 无符号32位整数,可以用于传递一些整数值uint64_t u64; // 无符号64位整数,可以用于传递更大的整数值或者两个32位整数
} epoll_data_t;struct epoll_event
{uint32_t events; // 通过位图形式传递标记位epoll_data_t data; // 用户级数据
};
- 第三个参数指定了 events 数组可以容纳的最大事件数。epoll_wait 最多会返回这个数目的准备就绪事件。如果少于这个数目的事件准备就绪,那么实际返回的事件数会少于 maxevents。如果这个参数设置为 0,那么 epoll_wait 将立即返回,不等待任何事件发生。
- 第四个参数表示等待事件的超时时间,单位是毫秒。如果设置为 -1,表示无限期等待;如果设置为 0,则表示立即返回,不会等待任何事件;如果设置为一个正数,则表示等待事件的最大时间。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- 返回值 int 表示已经就绪的 fd 个数;
- 第一个参数 epfd 表示可以通过这个描述符向 epoll 实例中添加、删除或修改要监视的文件描述符
- 第二个参数 op 是一个整数,表示要对 epoll 实例执行的操作类型。其可能的值如下:
EPOLL_CTL_ADD:在 epoll 的监视列表中添加一个文件描述符(即参数 fd),并指定监视的事件类型(参数 event)。
EPOLL_CTL_MOD:修改监视列表中已经存在的描述符(即参数 fd)对应的监视事件类型(参数 event)。
EPOLL_CTL_DEL:从 epoll 的监视列表中删除一个文件描述符(即参数 fd)。在这种情况下,参数 event 被忽略,可以为 NULL。- 第三和第四个参数表示要对哪个 fd 操作哪些 events。
② epoll 的实现原理
如图所示
当数据链路层接收到信息时,OS 会在网卡驱动层调用 callback 回调函数(OS 如何在硬件层面知道网卡上有数据的呢?——硬件中断),而回调函数主要会完成四个任务
1.向上层交付
2.通过 struct file 中包含的 void* private_data 指针,将其指向内核的 struct socket 结构体,在这个结构体中再有一个指针指向 struct sock,而 sock 中有一个 recv_queue 和send_queue,最终将数据交付给TCP的接收队列
3.查找 rb_tree节点,将当前数据中的 fd 作为键值,如果当前 fd 是被用户关心的,且事件(EPOLLIN/EPOLLOUT)也是被用户关心的,就执行4
4.构建就绪节点,插入到就绪节点中
作为用户,我们只需要从就绪队列中获取就绪节点即可!红黑树、就绪队列和回调函数这三种机制共同组成了 epoll 模型。而我们先前所说的三个接口就与这三种机制相关
- epoll_create 本质就是向红黑树中添加一个节点
- epoll_ctl 本质就是在红黑树中修改一个节点的具体内容
- epoll_wait 本质就是在遍历就绪队列,并将其取出
因此我们可以将红黑树看成 select 与 poll 中的数组!
③ epoll 的优势
- 检测是否就绪的时间复杂度为 O(1),获取就绪事件的时间复杂度为 O(n)
- fd 与 event 没有上限
- 返回值 n 表示有几个 fd 准备就绪了,而就绪事件是连续的!也就是有返回值个连续就绪事件!
④ 代码实现
接下来我们就粗略实现一下 epoll 的 TCP 服务器,代码如下
#pragma once// 将构造函数、拷贝构造函数与赋值运算符重载都设置成不可拷贝
class nocopy
{
public:nocopy() {};nocopy(const nocopy&) = delete;const nocopy& operator=(const nocopy&) = delete;
};#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <cerrno>
#include "nocopy.hpp"#include "Log.hpp"class Epoller : public nocopy // 禁止拷贝
{static const int size = 128; // epoll_create 的参数
public:Epoller() {// 创建 epoll 句柄_epfd = epoll_create(size);if (_epfd == -1){lg(Error, "epoll_create error: %s", strerror(errno));}else{lg(Info, "epoll_craete success: %d", _epfd);}}// 封装 epoll_wait 函数int EpollerWait(struct epoll_event revents[], int num){int n = epoll_wait(_epfd, revents, num, /*_timeout 0*/ -1);return n;}// 封装 epoll_ctl 函数int EpollerUpdate(int oper, int sock, uint32_t event){int n = 0;if (oper == EPOLL_CTL_DEL) // -> 从 epoll 的监视列表中删除一个文件描述符(即参数 fd){std::cout << "EPOLL_CTL_DEL" << std::endl;n = epoll_ctl(_epfd, oper, sock, nullptr);if (n != 0){lg(Error, "epoll_ctl delete error!");}else lg(Info, "epoll_ctl delete success!");}else {// EPOLL_CTL_MOD -> 修改监视列表中已经存在的描述符(即参数 fd)对应的监视事件类型(参数 event)// or // EPOLL_CTL_ADD -> 在 epoll 的监视列表中添加一个文件描述符(即参数 fd), 并指定监视的事件类型(参数 event)struct epoll_event ev;ev.events = event;ev.data.fd = sock; // 方便我们以后得知是哪一个 fd 就绪了n = epoll_ctl(_epfd, oper, sock, &ev);if (n != 0){lg(Error, "epoll_ctl error! ");}if (oper == EPOLL_CTL_ADD) lg(Info, "epoll_ctl add success!");else lg(Info, "epoll_ctl mod success!");}return n;}~Epoller() {if (_epfd >= 0){close(_epfd);}}
private:int _epfd;int _timeout{3000};
};#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <memory>
#include <sys/epoll.h>
#include <cerrno>
#include "Socket.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
#include "Log.hpp"// epoll_ctl 的最后一个参数
uint32_t EVENT_IN = EPOLLIN; // 读事件
uint32_t EVENT_OUT = EPOLLOUT; // 写事件class EpollServer : public nocopy // 将服务器设置为禁止拷贝
{static const int num = 64; // epoll_create 的参数
public:EpollServer(uint16_t port = 8080) :_port(port),_listensocket_ptr(new Sock()),_epoller_ptr(new Epoller()){}// 初始化服务器void Init() {_listensocket_ptr->Socket();_listensocket_ptr->Bind(_port);_listensocket_ptr->Listen();lg(Info, "create listen socket success: %d", _listensocket_ptr->Fd());}// 链接操作void Accepter(){std::string clientip;uint16_t clientport;int sock = _listensocket_ptr->Accept(&clientip, &clientport);if (sock > 0){// 我们不能直接读取_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, EVENT_IN);lg(Info, "get a new link, client info: %s:%d", clientip.c_str(), clientport);sleep(5);}}// 读取操作void Recver(int fd){ while (true){char buffer[1024];ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // bug?if (n > 0){buffer[n] = 0;std::cout << "get a messge: " << buffer << std::endl;// writestd::string echo_str = "server echo: ";echo_str += buffer;write(fd, echo_str.c_str(), echo_str.size());}else if (n == 0){lg(Info, "client quit, me too, close fd is: %d", fd);// 先从 epoll 模型中移除,再 close _epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);close(fd);}else{lg(Warning, "recv error: fd is: %d", fd);_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);close(fd);}sleep(1);}}// 根据事件类型(如读就绪、写就绪等)执行相应的操作, 即 分派器(调度器)void Dispatcher(struct epoll_event revs[], int num){// 读取 revs 中的所有事件与 fdfor(int i = 0; i < num; i++){uint32_t events = revs[i].events;int fd = revs[i].data.fd;if (events & EVENT_IN) // 对读事件的处理{if (fd == _listensocket_ptr->Fd()){//获取了一个新连接Accepter();}else{//其他fd上面的普通读取事件就绪Recver(fd);}}else if (events & EVENT_OUT) // 对写事件的处理{}else // 对其他事件的处理{}}}void Start() {// 将 listensock 添加到 epoll 中 -> 将 listensock 和他关心的事件,添加到内核 epoll 模型的 rb_tree 中_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, _listensocket_ptr->Fd(), EVENT_IN);struct epoll_event revs[num];for (;;){int n = _epoller_ptr->EpollerWait(revs, num);if (n > 0) // 有事件就绪{lg(Debug, "event happend, fd is: %d", revs[0].data.fd);Dispatcher(revs, num);}else if (n == 0){lg(Info, "time out...");}else{lg(Error, "epoll wait error!");}}}~EpollServer() {_listensocket_ptr->Close();}
private:std::shared_ptr<Sock> _listensocket_ptr; // 监听套接字std::shared_ptr<Epoller> _epoller_ptr; // epoll 句柄uint16_t _port; // 端口号
};
⑤ epoll 的两种工作模式
实际上,epoll 模型有两种工作模式,分别是 LT(Level Triggered) 和 ET(Edge Triggered)
我们举个简单的例子,来理解它们
假如现在你正在打游戏,眼看着快赢了,你妈饭做好了,叫你吃饭有两种方式:
1.如果你妈喊你一次,你没动,那么你妈会继续喊你第二次,第三次...(亲妈,水平触发)2.如果你妈喊你一次,你没动,你妈就不管你了(后妈,边缘触发)
即
- LT: 水平触发工作模式(epoll的默认工作模式),事件到来后,上层如果一直不处理,epoll 就会一直告知上层来取数据
- ET: 边缘触发工作模式,只有当数据或链接从无到有、从有到多,也就是数据发生变化时,才会通知上层一次
对比两种工作模式,我们明显可以看到 ET 模式的通知效率更高!
1. 为什么 ET 模式的文件描述符 fd 要设置成非阻塞形式?
ET 的工作模式会倒闭程序员每次都必须将本轮数据全部取走,为此我们就需要不断地进行循环读取直到循环读取出错,但是 fd 默认是阻塞的工作模式,为了防止我们循环读取时,fd 的阻塞导致 read 操作被挂起,我们必须将 fd 设置为 non_block(非阻塞) 形式!
而在倒逼程序员将数据全部取走时,本质就是让 TCP 给对方通告一个更大的窗口,从而从概率上让对方一次发送更多的数据,也就是说 ET 模式的 I/O 效率也会更高!
2. ET 模式的效率一定比 LT 高吗?
既然 ET 模式可以设置成非阻塞形式,那 LT 模式就不能设置 fd 为 non_block,然后循环读取吗?或者 LT 在第一次通知的时候就将数据全部取走,不就和 ET 一样了吗?因此,具体的效率比较还是需要看具体的代码怎么写
3. LT 和 ET 的本质区别
我们先前提到过 epoll 在 OS 中的模型
- 在 LT 模式下,只要文件描述符上有数据可读,epoll 就会持续将该文件描述符标记为就绪状态,并将其放入就绪队列中。也就是说,如果应用程序没有读取完所有数据,epoll 会不断地在每次 epoll_wait 调用时将该文件描述符重新加入就绪队列,直到应用程序读取了所有数据。
- 在 ET 模式下,epoll 仅在文件描述符的状态发生变化时(例如,从未有数据到有数据可读)将其标记为就绪状态,并将其放入就绪队列中。一旦状态变化被检测到并且文件描述符被加入就绪队列,epoll 就不会再次将其加入,除非再次检测到新的状态变化(例如,新的数据到达)。
所以我们可以说, LT 和 ET 在本质上就是向就绪队列添加一次和次次添加的区别!
6. 对多路转接代码进行设计与整合
① 代码的设计与整合
我们使用边缘触发的方式实现一下服务器,代码如下
// Comm.hpp
#pragma once#include <unistd.h>
#include <fcntl.h>
#include "Log.hpp"// 设置为非阻塞模式
void SetNonBlockOrDie(int sock)
{int fl = fcntl(sock, F_GETFL);if (fl < 0) {lg(Error, "fcntl error!");exit(5);}fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}// Epoller.hpp
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <cerrno>
#include <cstring>
#include "nocopy.hpp"#include "Log.hpp"class Epoller : public nocopy // 禁止拷贝
{static const int size = 128; // epoll_create 的参数
public:Epoller() {// 创建 epoll 句柄_epfd = epoll_create(size);if (_epfd == -1){lg(Error, "epoll_create error: %s", strerror(errno));}else{lg(Info, "epoll_craete success: %d", _epfd);}}// 封装 epoll_wait 函数int EpollerWait(struct epoll_event revents[], int num, int timeout = -1){int n = epoll_wait(_epfd, revents, num, timeout);return n;}// 封装 epoll_ctl 函数int EpollerUpdate(int oper, int sock, uint32_t event){int n = 0;if (oper == EPOLL_CTL_DEL) // -> 从 epoll 的监视列表中删除一个文件描述符(即参数 fd){std::cout << "EPOLL_CTL_DEL" << std::endl;n = epoll_ctl(_epfd, oper, sock, nullptr);if (n != 0){lg(Error, "epoll_ctl delete error!");}else lg(Info, "epoll_ctl delete success!");}else {// EPOLL_CTL_MOD -> 修改监视列表中已经存在的描述符(即参数 fd)对应的监视事件类型(参数 event)// or // EPOLL_CTL_ADD -> 在 epoll 的监视列表中添加一个文件描述符(即参数 fd), 并指定监视的事件类型(参数 event)struct epoll_event ev;ev.events = event;ev.data.fd = sock; // 方便我们以后得知是哪一个 fd 就绪了n = epoll_ctl(_epfd, oper, sock, &ev);if (n != 0){lg(Error, "epoll_ctl error! ");}if (oper == EPOLL_CTL_ADD) lg(Info, "epoll_ctl add success!");else lg(Info, "epoll_ctl mod success!");}return n;}~Epoller() {if (_epfd >= 0){close(_epfd);}}
private:int _epfd;int _timeout{3000};
};// TcpServer.hpp
#pragma once
#include <iostream>
#include <string>
#include <unordered_map>
#include <memory>
#include <functional>
#include "Epoller.hpp"
#include "Comm.hpp"
#include "Log.hpp"
#include "Socket.hpp"class Connection;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET); // 边缘触发方式 读事件
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET); // 边缘触发方式 写事件// 设置回调函数形式
using func_t = std::function<void(std::shared_ptr<Connection>)>;// 通过 Connection 类实现
// 1. 封装连接信息 2. 回调机制 3. 与服务器关联 4. 资源管理
class Connection
{
public:Connection(int sock, std::shared_ptr<TcpServer> tcp_server_ptr) :_sock(sock),_tcp_server_ptr(tcp_server_ptr){}// 获取套接字int SockFd() { return _sock; }// 设置实例的回调函数void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}~Connection() {}
private:int _sock; // 套接字描述符std::string _inbuffer; // 输入缓冲区std::string _outbuffer; // 输出缓冲区public:func_t _recv_cb; // 读事件回调函数func_t _send_cb; // 写件回调函数func_t _except_cb; // 异常事件回调函数// 回指指针std::shared_ptr<TcpServer> _tcp_server_ptr;
};class TcpServer
{static const int num = 64; // epoll_create 的参数
public:TcpServer(uint16_t port = 8080):_port(port),_epoller_ptr(new Epoller()),_listensock_ptr(new Sock()){}void Init() {_listensock_ptr->Socket();SetNonBlockOrDie(_listensock_ptr->Fd()); // ET 模式需要将套接字设置成非阻塞模式_listensock_ptr->Bind(_port);_listensock_ptr->Listen();lg(Info, "create listen socket success, sockfd: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, // Accepter 作为成员函数, 第一个参数必须是 this, 这之后我们需要再传入一个该函数自己的参数// 将传入的参数传递到 Accepter 函数的参数, 然后调用 std::bind(&TcpServer::Accepter, this, std::placeholders::_1),nullptr, nullptr);}void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){// 将 fd 和 event 加到内核中_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, event);// 给 sock 建立一个 connection 对象, 将 sock 添加到 Connection 中std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sock, std::shared_ptr<TcpServer>(this));new_connection->SetHandler(recv_cb, send_cb, except_cb);// 同时, 将 sock 和 Connecion 放入 _connections_connections.insert(std::make_pair(sock, new_connection));lg(Debug, "add a new connection success, sockfd is: %d" , sock);}// 监听套接字的读事件处理函数void Accepter(std::shared_ptr<Connection> connection){while(true){// 获取客户端信息struct sockaddr_in peer;socklen_t len = sizeof(peer);// 调用全局命名空间中的 accept 函数,而不是当前类或命名空间中的任何同名函数int sock = ::accept(connection->SockFd(), (struct sockaddr*)&peer, &len);if(sock > 0){uint16_t peerport = ntohs(peer.sin_port);char ipbuf[128];inet_ntop(AF_INET, &peer.sin_addr, ipbuf, sizeof(ipbuf));lg(Debug, "get a new client, client info -> [%s:%d], sockfd: %d" , ipbuf, peerport, sock);// ET 模式需要将套接字设置成非阻塞模式SetNonBlockOrDie(sock); // 普通套接字的处理AddConnection(sock, EVENT_IN, nullptr, nullptr, nullptr); }else{if(errno == EWOULDBLOCK) break;else if(errno == EINTR) continue;else break;}}}// 判断 fd 是否被用户关心(是否存在 epoll 模型的红黑树中)bool IsConnectionSafe(int fd){auto iter = _connections.find(fd);if (iter == _connections.end()){return false;}else{ return true;}}// 任务派发器void Dispatcher(int timeout){// 查询当前有多少的事件就绪int n = _epoller_ptr->EpollerWait(revs, num, timeout);for (int i = 0; i < n; i++){uint32_t events = revs[i].events;int sock = revs[i].data.fd;//统一把事件异常转换成为读写问题if (events & EPOLLERR)events |= (EPOLLIN | EPOLLOUT);if (events & EPOLLHUP)events |= (EPOLLIN | EPOLLOUT);// 只处理 EPOLLIN 和 EPOLLOUTif ((events & EPOLLIN) && IsConnectionSafe(sock)){if(_connections[sock]->_recv_cb)_connections[sock]->_recv_cb(_connections[sock]);}if ((events & EPOLLOUT) && IsConnectionSafe(sock)){if(_connections[sock]->_send_cb)_connections[sock]->_send_cb(_connections[sock]);}}}void Loop() {_quit = false;// 我们添加对应的事件时, 除了要将 fd 和 event 加到内核中// 还要给 listensock 建立一个 connection 对象, 将 listensock 添加到 Connection 中// 同时, 将 listensock 和 Connecion 放入 _connections// AddConnection(); -> 移至初始化函数while (!_quit){Dispatcher(3000);}_quit = true;}~TcpServer() {}
private:std::shared_ptr<Epoller> _epoller_ptr; // epoll 句柄指针std::shared_ptr<Sock> _listensock_ptr; // 监听套接字指针std::unordered_map<int, std::shared_ptr<Connection>> _connections; // <sock, Connection> 对,表示每一个 sock 与其对应的各个处理方案struct epoll_event revs[num]; // epoll_wait 函数参数uint16_t _port; // 端口号bool _quit; // 服务器是否退出
};
粗略运行效果如下
接下来我们继续完成代码
1. Comm.hpp
#pragma once#include <unistd.h>
#include <fcntl.h>
#include "Log.hpp"// 设置为非阻塞模式
void SetNonBlockOrDie(int sock)
{int fl = fcntl(sock, F_GETFL);if (fl < 0) {lg(Error, "fcntl error!");exit(5);}fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}
2. Epoller.hpp
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <cerrno>
#include <cstring>
#include "nocopy.hpp"#include "Log.hpp"class Epoller : public nocopy // 禁止拷贝
{static const int size = 128; // epoll_create 的参数
public:Epoller() {// 创建 epoll 句柄_epfd = epoll_create(size);if (_epfd == -1){lg(Error, "epoll_create error: %s", strerror(errno));}else{lg(Info, "epoll_craete success: %d", _epfd);}}// 封装 epoll_wait 函数int EpollerWait(struct epoll_event revents[], int num, int timeout = -1){int n = epoll_wait(_epfd, revents, num, timeout);return n;}// 封装 epoll_ctl 函数int EpollerUpdate(int oper, int sock, uint32_t event){int n = 0;if (oper == EPOLL_CTL_DEL) // -> 从 epoll 的监视列表中删除一个文件描述符(即参数 fd){std::cout << "EPOLL_CTL_DEL" << std::endl;n = epoll_ctl(_epfd, oper, sock, nullptr);if (n != 0){lg(Error, "epoll_ctl delete error!");}else lg(Info, "epoll_ctl delete success!");}else {// EPOLL_CTL_MOD -> 修改监视列表中已经存在的描述符(即参数 fd)对应的监视事件类型(参数 event)// or // EPOLL_CTL_ADD -> 在 epoll 的监视列表中添加一个文件描述符(即参数 fd), 并指定监视的事件类型(参数 event)struct epoll_event ev;ev.events = event;ev.data.fd = sock; // 方便我们以后得知是哪一个 fd 就绪了n = epoll_ctl(_epfd, oper, sock, &ev);if (n != 0){lg(Error, "epoll_ctl error! ");}if (oper == EPOLL_CTL_ADD) lg(Info, "epoll_ctl add success!");else lg(Info, "epoll_ctl mod success!");}return n;}~Epoller() {if (_epfd >= 0){close(_epfd);}}
private:int _epfd;int _timeout{3000};
};
3. Log.hpp
#pragma once#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>#define SIZE 1024#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4#define Screen 1
#define Onefile 2
#define Classfile 3#define LogFile "log.txt"class Log
{
public:// 构造函数Log(){// 设置默认日志模式为向屏幕打印printMethod = Screen;path = "./log/";}// 调整日志模式void Enable(int method){printMethod = method;}// 返回分级字符串std::string levelToString(int level){switch (level){case Info:return "Info";case Debug:return "Debug";case Warning:return "Warning";case Error:return "Error";case Fatal:return "Fatal";default:return "None";}}// 根据不同模式,向不同位置打印日志void printLog(int level, const std::string &logtxt){switch (printMethod){case Screen:std::cout << logtxt;break;case Onefile:printOneFile(LogFile, logtxt);break;case Classfile:printClassFile(level, logtxt);break;default:break;}}// 向一个文件中写入日志void printOneFile(const std::string &logname, const std::string &logtxt){std::string _logname = path + logname;int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"if (fd < 0)return;write(fd, logtxt.c_str(), logtxt.size());close(fd);}// 向一个文件夹中写入日志void printClassFile(int level, const std::string &logtxt){std::string filename = LogFile;filename += ".";filename += levelToString(level); // "log.txt.Debug/Warning/Fatal"printOneFile(filename, logtxt);}~Log(){}// 运算符重载 便于直接使用void operator()(int level, const char *format, ...){time_t t = time(nullptr);struct tm *ctime = localtime(&t);// leftbuffer - 存储等级信息与日期信息char leftbuffer[SIZE];snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,ctime->tm_hour, ctime->tm_min, ctime->tm_sec);// 将可变参数中传入的参数写入到rightbuffer中// rightbuffer - 存储用户提示va_list s;va_start(s, format);char rightbuffer[SIZE];vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);va_end(s);// 格式:默认部分+自定义部分char logtxt[SIZE * 2];snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);printLog(level, logtxt);}private:int printMethod;std::string path;
};Log lg;
4. nocopy.hpp
#pragma once// 将拷贝构造函数与赋值运算符重载都设置成不可拷贝
class nocopy
{
public:nocopy() {};nocopy(const nocopy&) = delete;const nocopy& operator=(const nocopy&) = delete;
};
5. Protocol.hpp
#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>// 我们可以通过设置 MySelf 来控制是否使用 Json
// #define MySelf 1// 空白分隔符
const std::string blank_space_sep = " ";
// 报文分隔符
const std::string protocol_sep = "\n";// 封装报头
// "len"\n + content + "\n"
std::string Encode(std::string &content)
{std::string package = std::to_string(content.size());package += protocol_sep;package += content;package += protocol_sep;return package;
}// 去除报头
// "len"\n"x op y"\n
bool Decode(std::string &package, std::string *content)
{ // 找到第一个分隔符 '\n'std::size_t pos = package.find(protocol_sep);if(pos == std::string::npos) return false;// 获取 len 部分的信息std::string len_str = package.substr(0, pos);std::size_t len = std::stoi(len_str);// package = len_str + content_str +2std::size_t total_len = len_str.size() + len + 2;if(package.size() < total_len) return false;*content = package.substr(pos+1, len);// 移除报文package.erase(0, total_len);return true;
}class Request
{
public:Request(int data1 = 0, int data2 = 0, char oper = '+'):x(data1), y(data2), op(oper){}
public:// 序列化bool Serialize(std::string *out){
#ifdef MySelf// 构建报文的有效载荷// struct => string, "x op y”std::string s = std::to_string(x);s += blank_space_sep;s += op;s += blank_space_sep;s += std::to_string(y);*out += s;return true;
#elseJson::Value root;root["x"] = x;root["y"] = y;root["op"] = op;// 为了方便我们观察,使用 StyledWriterJson::StyledWriter w;*out = w.write(root);return true;
#endif}// 反序列化bool Deserialize(const std::string &in)// "x op y"{
#ifdef MySelf// 找到第一个 ' ' 的位置std::size_t left = in.find(blank_space_sep);if(left == std::string::npos) return false;std::string part_x = in.substr(0, left);// 找到最后一个 ' ' 的位置std::size_t right = in.rfind(blank_space_sep);if(right == std::string::npos) return false;std::string part_y = in.substr(right+1);// 判断位置是否合法if(left + 2 != right) return false;op = in[left+1];x = std::stoi(part_x);y = std::stoi(part_y);return true;
#elseJson::Value root;Json::Reader r;r.parse(in, root);x = root["x"].asInt();y = root["y"].asInt();op = root["op"].asInt();return true;
#endif}void DebugPrint(){std::cout << "新请求构建完成: " << x << op << y << "=?" << std::endl;}
public:// x op yint x;int y;char op; // +-*/%
};class Response
{
public:Response(int res = 0, int c = 0):result(res), code(c){}
public:bool Serialize(std::string *out){
#ifdef MySelf// "len"\n"result code"//构建报文的有效载荷std::string s = std::to_string(result);s += blank_space_sep;s += std::to_string(code);*out += s;return true;
#elseJson::Value root;root["result"] = result;root["code"] = code;Json::StyledWriter w;*out = w.write(root);return true;
#endif}bool Deserialize(const std::string &in)// "result code"{
#ifdef MySelfstd::size_t pos = in.find(blank_space_sep);if (pos == std::string::npos) return false;std::string part_left = in.substr(0, pos);std::string part_right = in.substr(pos+1);result = std::stoi(part_left);code = std::stoi(part_right);return true;
#elseJson::Value root;Json::Reader r;r.parse(in, root);result = root["result"].asInt();code = root["code"].asInt();return true;
#endif}void DebugPrint( ){std::cout << "结果响应完成, result: " << result << ", code: "<< code << std::endl;}
public:int result;int code; //为0则表示可信,否则!0具体是几, 表明对应的错误原因
};
6. ServerCal.hpp
#pragma once
#include <iostream>
#include "Protocol.hpp"enum
{Div_Zero = 1,Mod_Zero,Other_Oper
};class ServerCal
{
public:ServerCal(){}Response CalculatorHelper(const Request &req) // "10 + 20"{Response resp(0, 0);switch (req.op){case '+':resp.result = req.x + req.y;break;case '-':resp.result = req.x - req.y;break;case '*':resp.result = req.x * req.y;break;case '/':{if (req.y == 0)resp.code = Div_Zero;elseresp.result = req.x / req.y;}break;case '%':{if (req.y == 0)resp.code = Mod_Zero;elseresp.result = req.x % req.y;}break;default:resp.code = Other_Oper;break;}return resp;}// "len"\n"10 + 20"\nstd::string Handler(std::string &package){std::string content;// 尝试解码bool r = Decode(package, &content); // "len"\n"10 + 20"\n// 解码失败直接返回if (!r) return "";// "10 + 20"// 得到数据后尝试反序列化Request req;r = req.Deserialize(content); // "10 + 20" ->x=10 op=+ y=20if (!r) return "";content = ""; // 尝试计算 Response resp = CalculatorHelper(req); // result=30 code=0;// 序列化 resp 并加上报头传回resp.Serialize(&content); // "30 0"content = Encode(content); // "len"\n"30 0"return content;}~ServerCal(){}
};
7. Socket.hpp
#pragma once #include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstring>
#include <functional>#include "Log.hpp"
extern Log lg;enum
{SocketErr = 2,BindErr,ListenErr,NonBlockErr
};// 监听时套接字排队的未处理连接请求的最大数量
const int backlog = 10;class Sock
{
public:Sock(){}~Sock(){}
public:// 创建套接字void Socket(){// 创建字节流型(TCP)套接字sockfd_ = socket(AF_INET, SOCK_STREAM, 0);if(sockfd_ < 0){lg(Fatal, "socker error, %s: %d" ,strerror(errno) ,errno);exit(SocketErr);}// 让 TCP 的服务器方不要进入 TIME_WAIT 状态int opt = 1;setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR| SO_REUSEPORT, &opt, sizeof(opt));}// 绑定// 将一个地址(包括IP地址和端口号)分配给套接字,使得套接字与特定的网络接口和端口关联起来void Bind(uint16_t port){struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);// 绑定到 INADDR_ANY 后,允许套接字监听所有可用的网络接口 local.sin_addr.s_addr = INADDR_ANY;if (bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0){lg(Fatal,"bind error, %s:%d" , strerror(errno), errno);exit(BindErr);}}// 监听// 使服务器端套接字进入监听状态,准备接受来自客户端的连接请求。// 在没有调用监听操作之前,套接字不能接收连接。void Listen(){if (listen(sockfd_, backlog) < 0){lg(Fatal, "listen error, %s:%d", strerror(errno), errno);exit(ListenErr);}}// 接收// 从监听队列中取出一个连接请求,并建立一个新的连接。将一个被动监听的服务器端套接字转换为一个活跃的连接。// 接收成功时,返回一个新的套接字,用于与客户端进行通信。它提供了客户端的地址信息,包括IP地址和端口号。// 通过接收操作,服务器可以同时与多个客户端建立连接。每个客户端连接都有自己的套接字,允许服务器并行处理多个连接。int Accept(std::string *clientip, uint16_t *clientport){struct sockaddr_in peer;socklen_t len = sizeof(peer);int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);if (newfd < 0){lg(Warning, "accept error, %s:%d", strerror(errno), errno);return -1;}char ipstr[64];inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));*clientip = ipstr;*clientport = ntohs(peer.sin_port);return newfd;}// 连接// 连接操作让客户端套接字与服务器端套接字建立一个双向的通信通道。这是客户端能够发送和接收数据的前提。bool Connect(const std::string &ip, const uint16_t &port){struct sockaddr_in peer;memset(&peer, 0, sizeof(peer));peer.sin_family = AF_INET;peer.sin_port = htons(port);inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));int n = connect(sockfd_, (struct sockaddr*)&peer, sizeof(peer));if(n == -1) {std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;return false;}return true;}// 关闭服务器监听套接字void Close(){close(sockfd_);}// 获取套接字int Fd(){return sockfd_;}
private:int sockfd_;
};
8. TimeManager.hpp
#pragma once
#include <queue>class Timer
{uint64_t expired_time;bool type;func_t cb;
};class TimeManager
{
private:std::priority_queue<std::vector<Timer>> pq; //堆
}
9. TcpServer.hpp
#pragma once
#include <iostream>
#include <string>
#include <unordered_map>
#include <memory>
#include <functional>
#include "Epoller.hpp"
#include "Comm.hpp"
#include "Log.hpp"
#include "Socket.hpp"
#include "nocopy.hpp"
#include "TimeManager.hpp"class Connection;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET); // 边缘触发方式 读事件
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET); // 边缘触发方式 写事件
const static int g_buffer_size = 128; // 事件管理器中缓冲区大小
const uint16_t defaultport = 8080;// 设置回调函数形式
using func_t = std::function<void(std::shared_ptr<Connection>)>;// 通过 Connection 类实现
// 1. 封装连接信息 2. 回调机制 3. 与服务器关联 4. 资源管理
class Connection
{
public:Connection(int sock, std::shared_ptr<TcpServer> tcp_server_ptr) :_sock(sock),_tcp_server_ptr(tcp_server_ptr){}// 获取套接字int SockFd() { return _sock; }// 向输入缓冲区添加void AppendInBuffer(const std::string &info){_inbuffer += info;}// 向输出缓冲区添加void AppendOutBuffer(const std::string &info){_outbuffer += info;}// 获取输入缓冲区内容std::string &Inbuffer(){return _inbuffer;}// 获取输出缓冲区内容std::string &Outbuffer(){return _outbuffer;}// 设置实例的回调函数void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}~Connection() {}
private:int _sock; // 套接字描述符std::string _inbuffer; // 输入缓冲区std::string _outbuffer; // 输出缓冲区public:func_t _recv_cb; // 读事件回调函数func_t _send_cb; // 写件回调函数func_t _except_cb; // 异常事件回调函数// 回指指针std::shared_ptr<TcpServer> _tcp_server_ptr;std::string _ip;uint16_t _port;time_t last_active_time;
};class TcpServer : public nocopy
{static const int num = 64; // epoll_create 的参数
public:TcpServer(uint16_t port, func_t OnMessage):_port(port),_epoller_ptr(new Epoller()),_listensock_ptr(new Sock()),_OnMessage(OnMessage){}void Init() {_listensock_ptr->Socket();SetNonBlockOrDie(_listensock_ptr->Fd()); // ET 模式需要将套接字设置成非阻塞模式_listensock_ptr->Bind(_port);_listensock_ptr->Listen();lg(Info, "create listen socket success, sockfd: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, // Accepter 作为成员函数, 第一个参数必须是 this, 这之后我们需要再传入一个该函数自己的参数// 将传入的参数传递到 Accepter 函数的参数, 然后调用 std::bind(&TcpServer::Accepter, this, std::placeholders::_1),nullptr, nullptr);}void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb,const std::string &ip = "0.0.0.0", uint16_t port = 8081){// 将 fd 和 event 加到内核中_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, event);// 给 sock 建立一个 connection 对象, 将 sock 添加到 Connection 中std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sock, std::shared_ptr<TcpServer>(this));new_connection->SetHandler(recv_cb, send_cb, except_cb);new_connection->_ip = ip;new_connection->_port = port;// 同时, 将 sock 和 Connecion 放入 _connections_connections.insert(std::make_pair(sock, new_connection));// 再给每一个链接添加一个定时器Timer timer;time_manager.push(timer);lg(Debug, "add a new connection success, sockfd is: %d" , sock);}// 监听套接字的读事件处理函数(链接管理器) void Accepter(std::shared_ptr<Connection> connection){while(true){// 获取客户端信息struct sockaddr_in peer;socklen_t len = sizeof(peer);// 调用全局命名空间中的 accept 函数,而不是当前类或命名空间中的任何同名函数int sock = ::accept(connection->SockFd(), (struct sockaddr*)&peer, &len);if(sock > 0){uint16_t peerport = ntohs(peer.sin_port);char ipbuf[128];inet_ntop(AF_INET, &peer.sin_addr, ipbuf, sizeof(ipbuf));lg(Debug, "get a new client, client info -> [%s:%d], sockfd: %d" , ipbuf, peerport, sock);// ET 模式需要将套接字设置成非阻塞模式SetNonBlockOrDie(sock); // listensock 只需要设置 _recv_cb,而一般 sock 的读,写,异常指针都要设置AddConnection(sock, EVENT_IN,std::bind(&TcpServer::Recver, this, std::placeholders::_1),std::bind(&TcpServer::Sender, this, std::placeholders::_1),std::bind(&TcpServer::Excepter, this, std::placeholders::_1),ipbuf, peerport); }else{if(errno == EWOULDBLOCK) break;else if(errno == EINTR) continue;else break;}}}// 读事件管理器// 服务器不用关心数据的格式, 只要IO数据就行, 至于有没有读完, 报文的具体格式细节, 服务器都不用管void Recver(std::shared_ptr<Connection> connection){connection->last_active_time = time(nullptr);int sock = connection->SockFd();while(true){char buffer[g_buffer_size];memset(buffer, 0, sizeof(buffer));ssize_t n = recv(sock, buffer, sizeof(buffer)-1, 0); // 非阻塞读取if(n > 0){connection->AppendInBuffer(buffer);}else if(n == 0){lg(Info, "sockfd: %d, client info [%s:%d] quit...", connection->_ip.c_str(), connection->_port);connection->_tcp_server_ptr->Excepter(connection);}else{if (errno == EWOULDBLOCK) break;else if (errno == EINTR) continue;else {lg(Warning, "sockfd: %d, client info [%s:%d] recv error...", sock, connection->_ip.c_str(), connection->_port);connection->_tcp_server_ptr->Excepter(connection);break;}}}// 数据有了, 但是不一定是完整的// 因此我们需要 1. 检测 2. 如果有完整报文,就处理_OnMessage(connection);}// 写事件管理器void Sender(std::shared_ptr<Connection> connection) {connection->last_active_time = time(nullptr);int sock = connection->SockFd();auto &outbuffer = connection->Outbuffer();while(true){ssize_t n = send(connection->SockFd(), outbuffer.c_str(), outbuffer.size(), 0);if(n > 0){outbuffer.erase(0, n);if (outbuffer.empty()) break;}else if(n == 0){return;}else{if (errno == EWOULDBLOCK) break;else if (errno == EINTR) continue;else {lg(Warning, "sockfd: %d, client info [%s:%d] send error...", sock, connection->_ip.c_str(), connection->_port);connection->_tcp_server_ptr->Excepter(connection);break;}}}if (!outbuffer.empty()) // 对写事件进行关心{EnableEvent(connection->SockFd(), true, true);}else // 对写事件取消关心{EnableEvent(connection->SockFd(), true, false);}}// 异常事件管理器void Excepter(std::shared_ptr<Connection> connection) {lg(Warning, "Excepter hander sockfd: %d, client info [%s:%d] excepter handler", connection->SockFd(), connection->_ip.c_str(), connection->_port);// 验证 sock 是否合法if (!IsConnectionSafe(connection->SockFd())) return;//1.移除对特定fd的关心_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, connection->SockFd(), 0);// 2.关闭异常的文件描述符lg(Debug, "close %d done!", connection->SockFd());close(connection->SockFd());// 3.从unordered_map中移除lg(Debug, "remove %d from _connections!", connection->SockFd());_connections.erase(connection->SockFd());}// 对事件的状态进行设置void EnableEvent(int sock, bool readable, bool writeable){uint32_t events = 0;events |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);_epoller_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);}// 判断 fd 是否被用户关心(是否存在 epoll 模型的红黑树中)bool IsConnectionSafe(int fd){auto iter = _connections.find(fd);if (iter == _connections.end()){return false;}else{ return true;}}// 任务派发器void Dispatcher(int timeout){// 查询当前有多少的事件就绪int n = _epoller_ptr->EpollerWait(revs, num, timeout);for (int i = 0; i < n; i++){uint32_t events = revs[i].events;int sock = revs[i].data.fd;//统一把事件异常转换成为读写问题if (events & EPOLLERR)events |= (EPOLLIN | EPOLLOUT);if (events & EPOLLHUP)events |= (EPOLLIN | EPOLLOUT);// 只处理 EPOLLIN 和 EPOLLOUTif ((events & EPOLLIN) && IsConnectionSafe(sock)){if(_connections[sock]->_recv_cb)_connections[sock]->_recv_cb(_connections[sock]);}if ((events & EPOLLOUT) && IsConnectionSafe(sock)){if(_connections[sock]->_send_cb)_connections[sock]->_send_cb(_connections[sock]);}}}// 在任务派发后的空闲期所做的其他事void do_other_thing(){// 检测最小堆 top(); 是否超时// 没有超时: int timeout = top()->expired - now();// 如果超时:top()->cb(); pop(); }void Loop() {_quit = false;// 我们添加对应的事件时, 除了要将 fd 和 event 加到内核中// 还要给 listensock 建立一个 connection 对象, 将 listensock 添加到 Connection 中// 同时, 将 listensock 和 Connecion 放入 _connections// AddConnection(); -> 移至初始化函数while (!_quit){Dispatcher(1000);do_other_thing();}_quit = true;}~TcpServer() {}
private:std::shared_ptr<Epoller> _epoller_ptr; // epoll 句柄指针std::shared_ptr<Sock> _listensock_ptr; // 监听套接字指针std::unordered_map<int, std::shared_ptr<Connection>> _connections; // <sock, Connection> 对,表示每一个 sock 与其对应的各个处理方案struct epoll_event revs[num]; // epoll_wait 函数参数uint16_t _port; // 端口号bool _quit; // 服务器是否退出TimeManager time_manager; // 时间管理确认func_t _OnMessage; // 上层处理函数// std::vector<int> fds; // 也可以使用 vector 数组来管理众多 fd
};
10. Main.cc
#include "TcpServer.hpp"
#include "ServerCal.hpp"
#include <memory>
#include <iostream>ServerCal calculator;void DefaultOnMessage(std::shared_ptr<Connection> connection_ptr)
{std::cout << "上层获取数据成功, data: " << connection_ptr->Inbuffer() << std::endl;std::string response_str = calculator.Handler(connection_ptr->Inbuffer());if (response_str.empty()) return;lg(Debug, "%s", response_str.c_str());// 发送回去connection_ptr->AppendOutBuffer(response_str);connection_ptr->_tcp_server_ptr->Sender(connection_ptr);// 我们如何正确理解发送?// 在 select/poll/epoll 中, 写事件经常是就绪的(因为发送缓冲区一般来说都是有空间的)// 如果我们对 EPOLLOUT 设置为关心, 那么在大部分情况下 EPOLLOUT 几乎都是就绪的// 这样就会导致 epollserver 经常返回, 从而导致浪费 CPU 资源// 因为上面的原因, 我们需要对读事件设置为总是关心, 而对写事件设置为按需关心 // 那我们怎么处理写事件呢?——直接写入, 如果写入完成就直接结束; 如果写入完成, 但是数据没有写完// 就说明 outbuffer 中还有内容, 此时就需要对写事件进行关心了! 如果写完了所有数据, 再去掉对写事件的关心
}int main()
{std::unique_ptr<TcpServer> epoll_svr(new TcpServer(8080, DefaultOnMessage));epoll_svr->Init();epoll_svr->Loop();return 0;
}
② Reactor 理论
从我们编写的代码中我们可以看出,Reactor 模型实际上是一个半同步半异步模型,它的同步体现在它需要自己去进行 select/poll/epoll ,它的异步则体现在它可以调用各种回调函数!