文章目录
- 1. 关于Reactor模式的了解
- 2. 基于Reactor模式实现epoll ET服务器
- 2.1 EventItem类的实现
- 2.2 Reactor类的实现
- Dispatcher函数
- AddEvent函数
- DelEvent函数
- EnableReadWrite函数
- 2.3 四个回调函数的实现
- acceptor回调函数
- recver回调函数
- sender回调函数
- errorer回调函数
- 3. epoll ET服务器的运行
- 4. 线程池的使用
1. 关于Reactor模式的了解
Reactor反应器模式,也叫做分发者模式或通知者模式,是一种将就绪事件派发给对应服务处理程序的设计模式。
Reactor模式的五个角色构成
- 句柄:文件描述符
- 同步事件分离器:本质就是一个系统调用,用于等待事件的发生。对于Linux来说,这个角色就是IO多路复用,select、poll、epoll等。
- 事件处理器:由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的处理反馈。
- 具体事件处理器:事件处理器中各个回调方法的具体实现。
- 初识分发器:它本质上就是Reactor角色,初始分发器会通过同步事件分离器来等待事件的就绪,当对应事件就绪时就调用事件处理器,最后调用对应的回调方法来处理这个事件。
Reactor模式的工作流程
- 当应用向初识分发器注册具体事件处理器时,应用会标识出该事件处理器希望初识分发器在某个事件发生时向其通知,该事件与Handle关联。
- 初识分发器会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器。
- 当所有的事件处理器注册完毕后,应用会启动初识分发器的事件循环,这时初识分发器会将每个事件处理器的Handle葛冰起来,并使用同步事件分离器来等待这些事件的发生。
- 初识分发器会将Ready状态的Handle作为key,来寻找其对应的事件处理器。
- 初识分发器会将Ready状态的Handle作为key,来寻找其对应的事件处理器。
- 初识分发器会调用其对应事件处理器当中对应的回调方法来响应该事件。
2. 基于Reactor模式实现epoll ET服务器
下面我们根据Reactor的五个角色构成以及其工作流程,实现一个Reactor模式下的epoll服务器,从而更直观地感受一下Reactor模式。
2.1 EventItem类的实现
EventItem类的介绍
- 在Reactor的工作流程中说到,在注册事件处理器时需要将其与Handle关联,本质上就是需要将读回调、写回调和异常回调等与某个文件描述符关联起来。
- 这样做的目的就是为了当某个文件描述符上的事件就绪时可以找到其对应的各种回调函数,进而执行对应的回调方法来处理该事件。
EventItem类的设计
所以我们可以设计一个EventItem类。,该类中有以下成员:
- 文件描述符Handle。
- 对应的读回调函数指针、写回调函数指针、异常回调函数指针。
- 输入缓冲区inbuffer和输出缓冲区outbuffer。
- 回指指针R,指向我们定义的Reactor对象。
对于前两种成员文章上面已经介绍过了,下面介绍一下后两种成员的作用。
- 为什么要有inbuffer?当某个文件描述符的读事件就绪时,我们会调用recv函数读取客户端发送过来的数据,但是我们其实并不能保证我们读取到的是一个完整的报文,因此需要将读取到的数据暂时保存到该文件描述符对应的inbuffer中,当inbuffer当中可用分离出一个完整的报文后再将其分离出来进行数据处理,这里inbuffer的本质就是用来解决粘包问题的。
- 为什么要有outbuffer?当处理完一个请求报文后,需要将响应数据发送给客户端,但我们并不能保证底层TCP的发送缓冲区中足够的空间功,因此需要将发送的数据暂时保存在文件描述符对应对应outbuffer中,当底层TCP的发送缓冲区中有足够的空间时,即当写事件就绪时,再一次发送outbuffer当中的数据。
- 为什么要有回指指针R?后续我们需要根据EventItem对象去寻找Reactor对象,比如当连接事件就绪时,需要调用Reactor类中的AddEventt函数将其添加到Dispatcher当中。有了回指指针R,可以让我们快速地找到对应的Reactor对象。
并且,EventItem类当中需要提供一个管理回调的成员函数,便于外部对EventItem结构当中的各种回调进行设置。
EventItem类代码如下:
class EventItem
{
public:int _sock; // 文件描述符Reactor *_R; // 回指指针callback_t _recv_handler; // 读回调callback_t _send_handler; // 写回调callback_t _error_handler; // 异常回调std::string _inbuffer; // 输入缓冲区std::string _outbuffer; // 输出缓冲区public:EventItem(): _sock(-1), _R(nullptr), _recv_handler(nullptr), _send_handler(nullptr), _error_handler(nullptr){}// 管理回调void ManageCallbacks(callback_t recv_handler, callback_t send_handler, callback_t error_handler){_recv_handler = recv_handler;_send_handler = send_handler;_error_handler = error_handler;}~EventItem() {}
};
2.2 Reactor类的实现
Reactor类的介绍
- 在Reactor的工作流程中提到,当所有的时间处理器注册完毕之后,会使用同步事件分离器等待这些事件发生,当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初识分发器,然后初识分发器会将Ready状态的Handle作为key来寻找其对应的事件处理器,并调用该事件处理器的回调方法来响应该事件。
- 本质就是当事件注册完毕之后,会调用epoll_wait函数来等待这些事件的就绪,当某个事件就绪时epoll_wait函数会告知调用方,然后调用方就会根据就绪的文件描述符来找到其对应的各种回调函数,并调用其对应的回调函数进行事件处理。
Reactor类的设计
- 该类当中有一个成员函数叫做Dispatcher,这个函数其实就是所谓的初识分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件。
- 当事件就绪之后根据就绪的文件描述符来找到其对应的各种回调函数,由于我们会将每个文件描述符及其对应的各种回调函数都封装到一个EventItme结构当中,所以实际我们就是要根据文件描述符找到其对应的EventItem结构。
- 我们可以使用哈希表,建立各个文件描述符与其对应的EventItem结构之间的映射,这个哈希表可以作为Reactor类中的一个成员变量,当需要找某个文件描述符对应的EventItem结构时就可以根据该成员变量找到。
- 当然,Reactor类当中还需要提供成员函数AddEvent和DelEvent,用于向Dispatcher当中注册和删除事件。
Reactor类的基本成员变量以及epoll模型创建的代码
#define SIZE 256
#define MAX_NUM 64class Reactor
{
private:int _epfd; // epoll模型std::unordered_map<int, EventItem> _event_items; // 建立sock与EventItem结构的映射public:Reactor() : _epfd(-1) {}void InitReactor(){// 创建epoll模型_epfd = epoll_create(SIZE);if (_epfd < 0){std::cerr << "epoll_create error" << std::endl;exit(5);} }~Reactor(){if (_epfd >= 0) close(_epfd);}
};
Dispatcher函数
Reactor类当中的Dispatcher函数就是之前所说的初识分发器,这里我们可以将其更形象地称为事件分派器。
- 事件分派器要做的就是调用epoll_wait函数等待事件发生。
- 当某个文件描述上的事件发生后,使用哈希表根据文件描述符找到对应的EventItem结构,然后调用EventItem结构当对应的回到函数对该事件进行处理即可。
// 事件分派器void Dispatcher(int timeout){struct epoll_event revs[MAX_NUM];int num = epoll_wait(_epfd, revs, MAX_NUM, timeout);for (int i = 0; i < num; ++i){int sock = revs[i].data.fd;if ((revs[i].events & EPOLLIN) || (revs[i].events & EPOLLHUP)){// 优先处理异常事件就绪if (_event_items[sock]._error_handler)_event_items[sock]._error_handler(&_event_items[sock]);}if (revs[i].events & EPOLLIN){if (_event_items[sock]._recv_handler)_event_items[sock]._recv_handler(_event_items[sock]);}if (revs[i].events & EPOLLOUT){if (_event_items[sock]._send_handler)_event_items[sock]._send_handler(&_event_items[sock]);}}}
- 这里没有用switch语句或者if语句对epoll_wait函数的返回值进行判断,而是借用for循环对其返回值进行了判断。
- 如果epoll_wait的返回值为-1则说明epoll_wait函数调用失败,此时不会进入到for内部。
- 如果epoll_wait的返回值为0则说明epoll_wait函数超时返回,此时也不好进入for循环内部。
- 如果epoll_wait的返回值大于0则说明epoll_wait函数调用成功,此时才会进入到for循环内部调用对应的回调函数进行事件处理。
- 事件处理时最好先对异常事件进行处理,因此代码中将异常事件的判断放在了最前面。
AddEvent函数
Reactor类当中的AddEvent函数是用于进行事件注册的。
- 在注册事件时需要传入一个文件描述符和一个事件集合,表示需要监视哪个文件描述符上的哪些事件。
- 还需要传入该文件描述符对应的EventItem结构,表示该文件描述符上的事件就绪后一个执行的回调方法。
- 在AddEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符及其对应的事件集合注册到epoll模型当中,然后建立该文件描述符与其对应的EventItem结构的映射关系。
void AddEvent(int sock, uint32_t event, const EventItem &item){struct epoll_event ev;ev.data.fd = sock;ev.events = event;if (epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev) < 0){std::cerr << "epoll_ctl add error, fd: " << sock << std::endl;return;}// 建立sock与EventItem直接的结构映射关系_event_items.insert({sock, item});std::cout << "添加:" << sock << " 到epoll模型中,成功" << std::endl;}
DelEvent函数
- Reactor类当中的DelEvent函数是用于事件删除的。
- 在删除事件时只需要传入一个文件描述符即可。
- 在DelEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符从epoll模型中删除,并取消该文件描述符与其对应的EventItem结构的映射关系。
void DelEvent(int sock){if (epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr) < 0){std::cerr << "epoll_ctl del error, fd: " << sock << std::endl;return;}// 取消sock与EventItem之间的映射关系_event_items.erase(sock);std::cout << "从epoll模型中删除:" << sock << " 成功" << std::endl;}
EnableReadWrite函数
- Reactor类当中的EnableReadWrite函数,用于使能某个文件描述符的读写事件。
- 调用EnableReadWrite函数需要传入一个文件描述符,表示需要设置的是哪个文件描述符的事件。
- 还需要传入两个bool值,分别表示是否需要使能读和写事件。
- EnableReadWrite函数内部会调用epoll_ctl函数修改该文件描述符的监听事件。
void EnableReadWrite(int sock, bool read, bool write){struct epoll_event ev;ev.data.fd = sock;// EPOLLET表示当前epoll服务器为边缘触发模式ev.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0){std::cerr << "epoll mod error, fd: " << sock << std::endl;}}
2.3 四个回调函数的实现
下面介绍并实现四个回调函数
- acceptor:当连接事件到来时可以调用该回调函数获取底层建立好的连接。
- recver:当读事件就绪时可以调用该回调函数读取客户端发来的数据并进行处理。
- sender:当写事件就绪时可以调用该回调函数向客户端发送响应数据。
- errorer:当异常事件就绪时可以调用该函数将对应的文件描述符关闭。
当我们为某个文件描述符创建EventItem结构时,就可以调用EventItem类提供的ManageCallbacks函数,将这些回调函数添加到EventItem结构当中。
- 我们会将监听套接字对应的EventItem结构当中的recv_handler设置为acceptor,因为监听套接字的读事件就绪就意味中有连接事件就绪了,而监听套接字一般只关心读事件,因此监听套接字对应的send_handler和error_handler可以设置为nullptr。
- 当Dispatcher监测到监听套接字的读事件就绪时,会调用监听套接字对应的EventItem结构当中的recv_handler回调,此时就会调用acceptor回调获取建立好的连接。
- 而对于客户端建立连接的套接字,我们会将其对应的EventItem结构当中的recv_handler、send_handler和error_handler分别设置为这里的recver、sender和error。
- 当Dispatcher监测到这些套接字的事件就绪时,就会调用其对应的EventItem结构当中对应的回调函数,也就是这里的recver、sender和error。
acceptor回调函数
acceptor回调用于处理连接事件,其工作流程如下:
- 调用accept函数获取底层建立好的连接。
- 将获取到的套接字设置为非阻塞,并未其创建EventItem结构,填充EventItem结构当中的各个字段,并注册该套接字相关的回调方法。
- 将该套接字及其对应需要关心的事件注册到epoll当中。
int acceptor(EventItem *item)
{while (1){struct sockaddr_in peer;memset(&peer, 0, sizeof(peer));socklen_t len = sizeof(peer);int sock = accept(item->_sock, (struct sockaddr*)&peer, &len);if (sock < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){// 并没有读取出错,只是底层没有链接了return 0;}else if (errno == EINTR){continue;}else{std::cerr << " accept error" << std::endl;return -1;}}// 将该套接字设置为非阻塞SetNonBlock(sock);// 构建EventItem结构EventItem sock_item;sock_item._sock = sock;sock_item._R = item->_R;// 注册回调方法sock_item.ManageCallbacks(recver, sender, errorer); Reactor *R = item->_R;// 将该套接字以及其对应的事件注册到epoll中R->AddEvent(sock, EPOLLIN | EPOLLET, sock_item);}
}
需要注意的是,因为这里实现的是ET模式下的epoll服务器,因此在获取底层连接时需要循环调用accept函数进行读取,并且监听套接字必须设置为非阻塞。
- 因为ET模式下只有当底层建立的连接从无到有或是从右到多时才会通知上层,如果没有一次性将底层建立好的连接全部获取,并且此后再也没有建立好的连接,那么底层没有获取完的连接就相当于丢失了,所以需要循环多次调用accept函数获取底层建立好的连接。
- 循环调用accept函数也就意味着,当底层连接全部被获取后再调用accept函数,此时就会因为底层已经没有连接了而被阻塞住。因此需要将监听套接字设置为非阻塞,这样当底层没有连接时accept就会返回,而不会被阻塞住。
accept获取到的新的套接字也需要设置为非阻塞,就是为了避免将来循环调用recv、send等函数时被阻塞。
设置文件描述符为非阻塞
设置文件描述符为非阻塞时,需要先调用fcntl函数获取获取该文件描述符对应的文件状态标记,然后在该文件状态标记的基础上添加非阻塞标记O_NONBLOCK,最后调用fcntl函数对该文件描述符的状态标记进行设置即可。
代码如下:
// 设置文件描述符为非阻塞
bool SetNonBlock(int sock)
{int fl = fcntl(sock, F_GETFL);if (fl < 0){std::cerr << "fcntl error" << std::endl;return false;}fcntl(sock, F_SETFL, fl | O_NONBLOCK);return true;
}
监听套接字设置为非阻塞后,当底层连接不就绪时,accept函数以出错的形式返回,因此当调用accept函数的返回值小于0时,需要继续判断错误码。
- 如果错误码为EAGAIN或EWOULDBLOCK,说明本次出错是因为底层已经没有可获取的连接了,此时底层连接全部获取完毕,这时我们可以返回0,表示本次acceptor调用成功。
- 错误码为EINTR,说明本次调用accept函数获取底层连接时被信号中断了,这时还应该继续调用accept函数进行获取。
- 除此之外,才说明accept函数是真正调用失败了,这时我们可以返回-1,表示本次acceptor调用失败。
accept、recv和send等IO系统调用为什么会被信号中断?
IO系统调用函数出错返回并且将错误码设置为EINTR,表明本次在进行数据读取或数据写入之前被信号中断了,也就是说IO系统调用在陷入内核,但是却在没有返回用户态的时候内核就去处理其他信号了。
- 一般来说,在从内核态返回用户态之前会检查信号的pending位图,也就是未决信号集,如果pending位图中有未处理的信号,内核就会对该信号进行处理。
- 但IO系统调用函数在进行IO操作之前就被信号中断了,这实际上是一个特例,因为IO过程分为 等 和 拷贝 两个步骤,而一般等的时间是比较长的,而在这个过程中我们的执行流其实是处于闲置的状态的,因此在等的过程中如果有信号产生,内核就会立即去进行信号的处理。
写事件是按需打开的
这里调用accept获取上来的套接字添加到epoll模型中时,只添加了EPOLLIN和EPOLLET事件,也就是说只让epoll关心套接字的读事件。
- 这里之所以没有添加写事件,是因为当前我们并没有要发送的数据,因此密钥必要让epoll帮我们关心写事件。
- 一般读事件是经常被设置的,而写事件是按需打开的,只有当我们有数据要发送时才会将写事件打开,并且在数据全部写入完毕后又会立即将写事件关闭。
recver回调函数
recver回调函数用于处理读事件,其工作流程如下:
- 循环调用recv函数读取数据,并将读取到的数据添加到该套接字对应的EventItem结构的inbuffer中。
- 对inbuffer中的数据进行切割,将完整的报文切割出来,剩下的留在inbuffer中。
- 对切割出来的完整报文进行反序列化。
- 业务处理。
- 业务处理后形成响应报文。
- 将响应报文添加到对应EventItem结构的outbuffer中,并打开写事件。
下一次Dispatcher在进行事件派发的时候就会帮我们关注该套接字的写事件,当写事件就绪就会执行该套接字对应的EventItem结构中的写回调方法,进而将outbuffer中的响应数据发送给客户端。
int recver(EventItem *item)
{if (item->_sock < 0) return -1; // 说明该文件描述符已经被关闭// 1. 数据读取if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){// 读取失败item->_error_handler(item);return -1;}// 2. 报文切割,以 X 作为分隔符std::vector<std::string> datagrams;StringUtil::Split(item->_inbuffer, &datagrams, "X");for (auto s : datagrams){// 3. 反序列化struct data d;StringUtil::Deserialize(s, &d._x, &d._y, &d._op);// 4. 业务处理int result = 0;switch(d._op){case '+' : result = d._x + d._y; break;case '-' : result = d._x - d._y; break;case '*' : result = d._x * d._y; break;case '/' : if (d._y == 0){std::cerr << "Error: div zero!" << std::endl;continue; // 继续处理下一个报文}else{result = d._x / d._y;}break;case '%' :if (d._y == 0){std::cerr << "Error: mod zero! " << std::endl;continue;}else{result = d._x % d._y;}break;default:std::cerr << "operation error!" << std::endl;continue; // 继续处理下一个报文}// 5. 形成响应报文std::string response;response += std::to_string(d._x);response += d._op;response += std::to_string(d._y);response += "=";response += std::to_string(result);response += "X"; // 报文与报文之间的分隔符// 6. 将响应报文添加到outbuffer中item->_outbuffer += response;// 打开写事件if (!item->_outbuffer.empty()) item->_R->EnableReadWrite(item->_sock, true, true);}
}
数据读取函数recver_helper
我们可以将循环调用recv函数读取数据的过程封装成一个recv_helper函数。
- recver_helper函数要做的就是循环调用recv函数将读取到的数据添加到inbuffer中。
- 当recv函数的返回值小于0时同一需要进一步判断错误码,如果错误码为EAGAIN或EWOULDBLOCK说明底层数据读取完毕了,如果错误码为EINTR则说明读取说错被信号中断了,此时还需要继续调用recv函数进行读取,否则就是读取出错了。
- 当读取出错时直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。
int recver_helper(int sock, std::string *out)
{while (true){char buffer[128];ssize_t size = recv(sock, buffer, sizeof(buffer) - 1, 0);if (size < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){// 数据读取完毕return 0;}else if (errno == EINTR){// 被信号中断,继续尝试读取continue;}else{// 读取出错return -1;}}else if (size == 0){// 对端连接关闭return -1;}// 读取成功buffer[size] = '\0';// 这里这个out不是输出缓冲区,是输出型参数的意思*out += buffer; // 将读取到的数据添加到该套接字对应EventItem结构对应的inbuffer中}
}
报文切割函数Split
报文切割本质就是为了防止粘包问题,而粘包问题实际是设计到协议定制的。
- 因为我们需要根据协议知道如何将各个报文进行分离,比如UDP分离报文采用的就是定长报头+自描述字段。
- 我们的目的是演示整个数据处理的过程,为了简单起见就不进行过于复杂的协议定制了,这里我们就以"X"作为各个报文之间的分隔符,每个报文的最后队徽以一个"X"作为报文结束的标志。
- 因此现在要做的就是以"X"作为分隔符对inbuffer当中的字符串进行切割,这里将这个过程封装成一个Split函数并放到一个StringUtil工具类中。
- Split函数要做就是对inbuffer中的报文进行分割,将切割出来的报文放到vector中,对于最后无法切割的报文的数据就留在inbuffer即可。
static void Split(std::string &in, std::vector<std::string> *out, const std::string sep){int start = 0;size_t pos = in.find(sep, start);while (pos != std::string::npos){out->push_back(in.substr(start, pos - start));start = pos + sep.size();pos = in.find(sep, start);}in = in.substr(start);}
反序列化函数Deserialize
在数据发送之前需要进行序列化encode,接收数据之后需要对数据进行反序列化decode。
- 序列化就是将对象的状态信息转换为可以存储或传输的形式(字节序列)的过程。
- 反序列化就是把字节序列恢复为原对象的过程。
实际反序列化也是与协议定制相关的,假设这里的epoll服务器向客户端提供的就是计算服务,客户端向服务器发送的都是需要计算的算术表达式。可以用结构体来描述这样一个算术表达式。
static void Deserialize(std::string &in, int *x, int *y, char *op){size_t pos = 0;for (pos = 0; pos < in.size(); pos++){if (in[pos] == '+' || in[pos] == '-' || in[pos] == '*' || in[pos] == '/' || in[pos] == '%')break; }if (pos < in.size()){std::string left = in.substr(0, pos);std::string right = in.substr(pos + 1);*x = atoi(left.c_str());*y = atoi(right.c_str());*op = in[pos];}else{*op = -1;}}
业务处理
业务处理就是服务器拿到客户端发来的数据后,对数据进行数据分析,最终拿到客户端想要的数据。
我们这里要做的业务处理非常简单,就是用反序列化后的数据计算,此时得到的计算结果就是客户端想要的。
形成响应报文
在业务处理后我们已经拿到了客户端想要的数据,现在我们要的就是形成响应报文,由于我们这里规定每个报文都以"X"作为报文结束的标志,因此在形成响应报文的时候,就需要在每一个计算结果后面加上一个"X",表示这是之前某一个请求报文的响应报文,因此协议定制后就需要双方遵守。
将响应报文添加到outbuffer中
响应报文构建完后需要将其添加到该套接字对应的outbuffer中,并打开该套接字的写事件,此后当写事件就绪时就会将outbuffer当中的数据发送出去。
sender回调函数
sender回调函数用于处理写事件,其工作流程如下:
- 循环调用send函数发送数据,并将发送出去的数据从该套接字对应EventItem结构删除。
- 如果循环调用send函数后该套接字对应的outbuffer当中的数据被全部发送,此时就要将该套接字对应的写事件关闭,因为已经没有要发送的数据,如果outbuffer中还有数据,那么该套接字对应的写事件就应该继续打开。
int sender(EventItem *item)
{if (item->_sock < 0) return -1;int ret = sender_helper(item->_sock, item->_outbuffer);if (ret == 0) // 全部发送成功,不再关心写事件item->_R->EnableReadWrite(item->_sock, true, false);else if (ret == 1) // 没有发送完毕,还需要继续关心写事件item->_R->EnableReadWrite(item->_sock, true, true);else // 写入出错item->_error_handler(item);return 0;
}
我们可以将循环调用send函数发送数据的过程封装成一个sender_helper函数。
- sender_helper函数要做的就是循环调用send函数将outbuffer中的数据发送出去。
- 当send函数的返回值小于0时需进一步判断错误码,如果错误码为EAGAIN或EWOULDBLOCK说明底层TCP发送缓冲区已经被写满了,这时需要将已经发送的数据从outbuffer中移除。
- 如果错误码为EINTR则说明发生过程中被信号中断了,此时还需要继续调用send函数进行发送,否则就是发送出错了。
- 当发送出错时也直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。
- 如果最终outbuffer当中的数据全部发送成功,则将outbuffer清空即可。
// in为输入型参数的意思
int sender_helper(int sock, std::string &in)
{size_t total = 0; // 累计已经发送的字节数while (true){ssize_t size = send(sock, in.c_str() + total, in.size() - total, 0);if (size < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){// 底层缓冲区已经没有空间了in.erase(0, total); // 将已经发送的数据移除outbufferreturn 1; // 缓冲区写满,没写完}else if (errno == EINTR){// 被信号中断continue;}else{// 写入出错return -1;}}total += size;if (total >= in.size()){in.clear(); // 清空outbufferreturn 0; // 全部写入完毕}}
}
errorer回调函数
errorer回调用于处理异常事件
- 对于异常事件就绪的套接字这里不作其他处理,直接调用close函数关闭该套接字即可。
- 但是在关闭套接字之前,需要先调用DelEvent函数将该套接字从epoll模型删除,并取消套接字与其对应的EventItem结构的映射关系。
- 由于在Dispatcher当中是先处理的异常事件,为了避免该套接字被关闭后继续进行读写操作,然后读写操作失败再次调用errorer回调函数重复关闭该文件描述符,因此在关闭该套接字后将其EventItem当中的文件描述符设置为-1。
- 在调用recver和sender回调执行读写操作之前,都会判断该EventItem结构当中的文件描述符是否有效,如果无效则不会进行后续操作。
int errorer(EventItem *item)
{item->_R->DelEvent(item->_sock); // 将该文件描述符从epoll模型删除close(item->_sock); // 关闭该文件描述符item->_sock = -1; // 防止关闭后继续执行读写回调return 0;
}
3. epoll ET服务器的运行
服务器的运行步骤如下:
- 套接字的创建、绑定和监听,因为是ET模式下的服务器,因此监听套接字创建出来后需要将其设置为非阻塞。
- 实例化一个Reactor对象,并将其进行初始化,也就是创建epoll模型。
- 为监听套接字定义一个EventItem结构,提案重复EventItem结构中的各个字段,并将acceptor设置为监听套接字的回调方法。
- 调用AddEvent函数将监听套接字及其需要监视的事件添加到Dispatcher当中,该过程包括将监听套接字注册到epoll模型中,1以及监听套接字与其对应EventItem结构的映射。
- 最后循环调用Reactor类中的Dispatcher函数进行事件派发。
将sockt套接字进行封装
这里编写一个Socket类,对套接字相关的接口进行一定程度的封装,并且为了让外部能够直接调用Socket类当中封装的函数,将这些函数定义为了静态成员函数。
class Socket
{
public:// 创建套接字static int SocketeCreate(){int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){std::cerr << "socket error" << std::endl;exit(2);}// 设置端口复用int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));return sock;}// 绑定端口号static void Bind(int sock, int port){struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_port = htons(port);local.sin_family = AF_INET;local.sin_addr.s_addr = INADDR_ANY;socklen_t len = sizeof(local);if (bind(sock, (struct sockaddr*)&local, len) < 0){std::cerr << "bind error" << std::endl;exit(3);}}// 监听static void Listen(int sock, int backlog){if (listen(sock, backlog) < 0){std::cerr << "listen error" << std::endl;exit(4);}}
};
主函数代码
#include "Reactor.hpp"
#include "Util.hpp"
#include "Socket.hpp"#define BACKLOG 5static void Usage(const std::string s)
{std::cout << "Usage: " << s << " port" << std::endl;
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(1);}int port = 8081;int listen_sock = Socket::SocketeCreate();Socket::Bind(listen_sock, port);SetNonBlock(listen_sock);Socket::Listen(listen_sock, BACKLOG);// 创建Reactor,并初始化Reactor R;R.InitReactor();// 创建套接字对应的EventItem结构EventItem item;item._sock = listen_sock;item._R = &R;item.ManageCallbacks(acceptor, nullptr, nullptr); // 套接字只关心读事件// 将监听套接字托管给DispatcherR.AddEvent(listen_sock, EPOLLIN | EPOLLET, item);// 循环进行事件派发int timeout = 1000;while (1) R.Dispatcher(timeout);return 0;
}
至此,一个简单的单Reactor单线程服务器就编写完毕了。
运行这个服务器:
这就可以看到,服务器可以接收客户端发来的请求并且进行业务处理后响应给客户端。
4. 线程池的使用
因为当前的epoll服务器的业务处理比较简单,所以但进程的epoll服务器看起来没有什么压力,但是如果服务器的业务处理逻辑比较复杂,那么某些客户端发来的数据请求就可能长时间得不到响应,因为这时epoll服务器需要花费大量时间进行业务处理,而在这个过程中服务器无法为其他客户端提供服务。
解决方法
可以在当前服务器的基础上接入线程池,当recver回调读取完数据并完成报文的切割和反序列化之后,就可以将其构建成一个任务然后放到线程池的任务队列中,然后服务器就可以继续进行事件派发,而不需要将事件耗费到业务处理上面,而放到任务队列当中的任务,则由线程池当中的若干个线程进行处理。
接入线程池
线程池的代码如下:
#pragma once#include <iostream>
#include <unistd.h>
#include <queue>
#include <pthread.h>#define NUM 5//线程池
template<class T>
class ThreadPool
{
public://提供一个全局访问点static ThreadPool* GetInstance(){return &_sInst;}
private:bool IsEmpty(){return _task_queue.size() == 0;}void LockQueue(){pthread_mutex_lock(&_mutex);}void UnLockQueue(){pthread_mutex_unlock(&_mutex);}void Wait(){pthread_cond_wait(&_cond, &_mutex);}void WakeUp(){pthread_cond_signal(&_cond);}
public:~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}//线程池中线程的执行例程static void* Routine(void* arg){pthread_detach(pthread_self());ThreadPool* self = (ThreadPool*)arg;//不断从任务队列获取任务进行处理while (true){self->LockQueue();while (self->IsEmpty()){self->Wait();}T task;self->Pop(task);self->UnLockQueue();task.Run(); //处理任务}}void ThreadPoolInit(){pthread_t tid;for (int i = 0; i < _thread_num; i++){pthread_create(&tid, nullptr, Routine, this); //注意参数传入this指针}}//往任务队列塞任务(主线程调用)void Push(const T& task){LockQueue();_task_queue.push(task);UnLockQueue();WakeUp();}//从任务队列获取任务(线程池中的线程调用)void Pop(T& task){task = _task_queue.front();_task_queue.pop();}
private:ThreadPool(int num = NUM) //构造函数私有: _thread_num(num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool&) = delete; //防拷贝std::queue<T> _task_queue; //任务队列int _thread_num; //线程池中线程的数量pthread_mutex_t _mutex;pthread_cond_t _cond;static ThreadPool<T> _sInst;
};template<class T>
ThreadPool<T> ThreadPool<T>::_sInst;
设计一个任务类
我们需要定义出来一个任务类,该任务类当中需要提供一个Run方法,这个Run方法就是将线程池中的若干线程池从任务队列当中拿到任务后执行的方法。
- 在任务类中包含两个成员变量,成员bianld就是反序列化后用于进行业务处理的数据,成员变量item就是该套接字的EventItem结构,因为数据处理完之后需要将形成的响应报文添加到该套接字对应的outbuffer中。
- Run方法中处理数据的逻辑与之前的意义,只是将那部分方法放到了Run方法中。
此时recver回调函数中在读取数据、报文切割、反序列化后就可以构建出一个任务对象,然后将该任务放到任务队列当中就行了。
int recver(EventItem* item)
{if (item->_sock < 0) //该文件描述符已经被关闭return -1;//1、数据读取if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){ //读取失败item->_error_handler(item);return -1;}//2、报文切割std::vector<std::string> datagrams;StringUtil::Split(item->_inbuffer, &datagrams, "X");for (auto s : datagrams){//3、反序列化struct data d;StringUtil::Deserialize(s, &d._x, &d._y, &d._op);Task t(d, item); //构建任务ThreadPool<Task>::GetInstance()->Push(t); //将任务push到线程池的任务队列中}return 0;
}
这样线程池就是接入完毕了,下面再次尝试运行这个服务器。需要注意的是在运行之前需要对线程池进行初始化。
//初始化线程池ThreadPool<Task>::GetInstance()->ThreadPoolInit();
运行结果如下: