Reactor模式
Reactor模式的定义
Reactor反应器模式,也叫做分发者模式或通知者模式,是一种将就绪事件派发给对应服务处理程序的事件设计模式。
Reactor模式的角色构成
Reactor主要由以下五个角色构成:
角色 | 解释 |
Handle(句柄) | 用于标识不同的事件,本质就是一个文件描述符 |
Sychronous Event Demultiplexer(同步事件分离器) | 本质就是一个系统调用,用于等待时间发声。对于Linux来说,同步事件分离器指的就是IO/多路复用,比如select、poll、epoll等 |
Event Handler(事件处理器) | 由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的处理反馈 |
Concrete Event Handler(具体事件处理器) | 事件处理器中各种回调方法的具体实现 |
Initiation Dispatcher(初始分发器) | 初始分发器实际上就是reactor角色,初始分发器会通过同步事件分离器来等待事件的发生,当对应事件就绪就调用事件处理器,最后调用对应的回调方法来处理这个事件 |
Reactor模式的工作流程
Reactor模式的工作流程如下:
- 当应用向初始分发器注册具体事件处理器时,应用会标识出该事件处理器希望初始分发器在某个事件发生时向其通知,该事件于Handle关联。
- 初始分发器会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器。
- 当所有的事件处理器注册完毕后,应用会启动初始分发器的事件循环,这时初始分发器会将每个事件处理器的Handle合并起来,并使用同步事件分离器等待这些事件的发生。
- 当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初始分发器。
- 初始分发器会将Ready状态的Handle作为key,来寻找其对应的事件处理器。
- 初始分发器会调用其对应事件处理器当中的回调方法来响应该事件。
epoll ET服务器(Reactor模式)
如果在此之前没有了解过Reactor模式,相信在看了Reactor模式的工作流程后一定是一头雾水,下面我们实现一个Reactor模式下的epoll ET服务器,来感受一下Reactor模式。
设计思路
功能完善于问题提出
之前echo版的EpollServer,只能做了检测读取事件的就绪,现在需要添加以下功能:
- 读取数据保存的问题:
- 如何保证读取上来的数据就是一条完整的报文?一次读取不能保证,需要边读取边检测。
- 既然一次的读取数据不能保证读完,那读上来的数据如何保存呢?建议一个堆的缓存区。套接字会有很多,那又如何确保套接字和缓冲区一 一对应呢?
- 在ET的工作模式下,怎么能确保通知一次,就把内核中缓冲区的数据全部读取完呢?循环读取,非阻塞的方式。那什么时候停止读取操作呢?读取上来的数据小于预期的值。
- 如果得到了一条完整的报文,如何提取提取有效载荷呢?定制协议+Json的序列化和反序列化!
- 如果读、写和异常事件发生了,如何知道?Epoll接口。怎么样执行对应的方法呢?
- 写事件的就绪条件是缓冲区没有满就是就绪的,所以,如果一直监视写事件的话,就需要一直调用写事件所对应的方法,而大部分时候是没有数据可以发送的,这样调用方法就直接返回了,浪费CPU资源 —— 结论&细节:写事件是按需进行监视的!也就是说有数据要发送的时候才开启对写事件的监视。
- 在ET的工作模式下,要循环写入,确保一次通知就把写的工作干完。当输出缓冲区的数据为空的时候结束写的事件!
设计思路
epoll ET服务器
在epoll ET服务器中,我们需要处理如下几种事件:
- 读事件:如果是监听套接字的读事件就绪则调用accept函数获取底层的连接,如果是其他套接字的读事件就绪则调用recv函数读取客户端发来的数据。
- 写事件:写事件就绪则将待发送的数据写入到发送缓冲区当中。
- 异常事件:当某个套接字的异常事件就绪时我们不做过多处理,直接关闭该套接字。
当epoll ET服务器监测到某一事件就绪后,就会将该事件交给对应的服务处理程序进行处理。
Reactor模式的五个角色
在这个epoll ET服务器中,Reactor模式中的五个角色对应如下:
- 句柄:文件描述符
- 同步事件分离器:IO/多路复用epoll。
- 事件处理器:包括读回调、写回调和异常回调。
- 具体事件处理器:读回调、写回调和异常回调的具体实现。
- 初始分发器:Reactor类当中的Dispatcher函数
Dispatcher函数要做的就是调用epoll_wait函数等待事件发生,当有事件发生后就将就绪的事件派发给对应的服务处理程序即可。
EventItem类
- 在Reactor的工作流程中说道,在注册事件处理器时需要将其与Handle关联,本质上就是将读回调、写回调和异常回调与某个文件描述符关联起来。
- 这样做的目的就是为了当某个文件描述符上的事件就绪就可以找到其对应的各种回调函数,进而执行对应的回调方法来处理该事件。
所以我们可以设计一个Eventtem类,该类中的成员就包括一个文件描述符,以及该文件描述符对应的各种回调函数。
Reactor类
- 在Reactor的工作流程中说道,当所有事件处理器注册完毕后,会使用同步事件分离器等待这些事件发生,当某个事件处理的Handle变成为Ready状态时,同步事件分离器会通知初始分发器,然后初始分发器会将Ready状态的Handle作为key来寻找其对应的事件处理器,并调用该事件处理器中对应的回调方法来响应该事件。
- 本质就是当事件注册完毕后,会调用epoll_wait函数来等待这些事件发生,当某个事件就绪时epoll_wait函数就会告诉调用方,然后调用方就根据就绪的文件描述符来找到其对应的各种回调函数,并调用对应的回调函数进行事件处理。
对此我们可以设计一个Reactor类。
- 该类当中有一个成员函数叫做Dispatcher,这个函数就是所谓的初始分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件。
- 该类当中有一个成员函数叫做Dispatcher,这个函数就是所谓的初始分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件。
- 我们可以使用C++ STL当中的unordered_map,来建立各个文件描述符与其对应的EventItem结构之间的映射,这个unordered_map可以作为Reactor类的一个成员变量,当需要找某个文件描述符的EventItem结构时就可以通过该成员变量找到。
- 当然,Reactor类当中还需要提供成员函数AddEvent和DelEvent,用于向Dispatcher当中注册和删除事件。
此外,在Reactor类当中还有一些其他成员,后面实现的时候再做详细论述。
epoll ET服务器的工作流程
这个epoll ET服务器在Reactor模式下的工作流程如下:
- 首先epoll ET服务器需要进行套接字的创建、绑定和监听。
- 然后定义一个Reactor对象并初始化,初始化时要做的就是创建epoll模型。
- 紧接着需要为监听套接字创建对应的EventItem结构,并调用Reactor类中提供的AddEvent函数将监听套接字添加到epoll模型中,并建立监听套接字与其对应的EventItem结构之间的映射关系。
- 之后就可以不断调用Reactor类中的Dispatcher函数进行事件派发。
在事件处理过程中,会不断向Dispatcher当中新增或删除事件,而每个事件就绪时都会自动调用其对应的回调函数进行处理,所以我们要做的就是不断调用Dispatcher函数进行事件派发即可。
EventItem结构
EventItem结构中除了包含文件描述符和其对应的读回调、写回调和异常回调之外,还包含一个输入缓冲区inbuffer、一个输出缓冲区outbuffer以及一个回指指针R。
- 当某个文件描述符的读事件就绪时,我们会调用recv函数读取客户端发来的数据,但我们并不能保证我们读取到了一个完整的报文,因此需要将读取到的数据暂时存放到该文件描述符对应的inbuffer当中,当inbuffer当中可以分离出一个完整的报文后再将其分离出来进行数据处理,这里的inbuffer本质就是用来解决粘包问题的。
- 当处理完一个报文请求后,需要将响应数据发送给客户端,但我们并不能保证底层TCP的发送缓冲区中有足够的空间供我们写入,因此需要将要发送的数据暂时存放到该文件描述符对应的outbuffer当中,当底层TCP的发送缓冲区中有空间,即写事件就绪时,再依次发送outbuffer当中的数据。
- EventItem结构当中设置回指指针R,便于快速找到我们定义的Reactor对象,因为后续我们需要根据EventItem结构找到这个Reactor对象。比如当连接事件就绪时,需要调用Reactor类当中的AddEvent函数将其添加到Dispatcher当中。
此外,EventItem结构当中需要提供一个管理回调的成员函数,便于外部对EventItem结构当中的各种回调进行设置。
代码如下:
typedef int(*callback_t)(EventItem*);class EventItem{
public:int _sock; //文件描述符Reactor* _R; //回指指针callback_t _recv_handler; //读回调callback_t _send_handler; //写回调callback_t _error_handler; //异常回调std::string _inbuffer; //输入缓冲区std::string _outbuffer; //输出缓冲区
public:EventItem(): _sock(-1), _R(nullptr), _recv_handler(nullptr), _send_handler(nullptr), _error_handler(nullptr){}//管理回调void ManageCallbacks(callback_t recv_handler, callback_t send_handler, callback_t error_handler){_recv_handler = recv_handler;_send_handler = send_handler;_error_handler = error_handler;}~EventItem(){}
};
Reactor类
在Reactor类当中有一个unordered_map成员,用于建立文件描述符和与其对应的EventItem结构之间的映射,还有一个epfd成员,该成员是epoll模型对应的文件描述符。
- 在初始化Reactor对象的时候就可以调用epoll_create函数创建epoll模型,并将该epoll模型对应的文件描述符用epfd成员记录下来,便于后续使用。
- Reactor对象在析构的时候,需要调用close函数将该epoll模型进行关闭。
代码如下:
#define SIZE 256class Reactor{
private:int _epfd; //epoll模型std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:Reactor(): _epfd(-1){}void InitReactor(){//创建epoll模型_epfd = epoll_create(SIZE);if (_epfd < 0){std::cerr << "epoll_create error" << std::endl;exit(5);}}~Reactor(){if (_epfd >= 0){close(_epfd);}}
};
Dispatcher函数(事件分派器)
Reactor类当中的Dispatcher函数就是之前所说的初始分发器,这里我们更形象的将其称之为事件分派器。
- 事件分派器要做的就是调用epoll_wait函数等待事件发生。
- 当某个文件描述符上的事件发生后,先通过unordered_map找到该文件描述符对应的EventItem结构,然后调用EventItem结构当中对应的回调函数对该事件进行处理即可。
代码如下:
#define MAX_NUM 64class Reactor{
private:int _epfd; //epoll模型std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public://事件分派器void Dispatcher(int timeout){struct epoll_event revs[MAX_NUM];int num = epoll_wait(_epfd, revs, MAX_NUM, timeout);for (int i = 0; i < num; i++){int sock = revs[i].data.fd; //就绪的文件描述符if ((revs[i].events&EPOLLERR) || (revs[i].events&EPOLLHUP)){ //异常事件就绪(优先处理)if (_event_items[sock]._error_handler)_event_items[sock]._error_handler(&_event_items[sock]); //调用异常回调}if (revs[i].events&EPOLLIN){ //读事件就绪if (_event_items[sock]._recv_handler)_event_items[sock]._recv_handler(&_event_items[sock]); //调用读回调}if (revs[i].events&EPOLLOUT){ //写事件就绪if (_event_items[sock]._send_handler)_event_items[sock]._send_handler(&_event_items[sock]); //调用写回调}}}
};
说明一下:
- 这里没有用switch或if语句对epoll_wait函数的返回值进行判断,而是借用for循环对其返回值进行了判断。
- 如果epoll_wait的返回值为-1则说明epoll_wait函数调用失败,此时不会进入到for循环内部进行事件处理。
- 如果epoll_wait的返回值为0则说明epoll_wait函数超时返回,此时也不会进入到for循环内部进行事件处理。
- 如果epoll_wait的返回值大于0则说明epoll_wait函数调用成功,此时才会进入到for循环内部调用对应的回调函数对事件进行处理。
- 事件处理时最好先对异常事件进行处理,因此代码中将异常事件的判断放在了最前面。
AddEvent函数
Reactor类当中的AddEvent函数是用于进行事件注册的。
- 在注册事件时需要传入一个文件描述符和一个事件集合,表示需要监视哪个文件描述符上的哪些事件。
- 还需要传入该文件描述符对应的EventItem结构,表示当该文件描述符上的事件就绪后应该执行的回调方法。
- 在AddEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符及其对应的事件集合注册到epoll模型当中,然后建立该文件描述符与其对应的EventItem结构的映射关系。
代码如下:
class Reactor{
private:int _epfd; //epoll模型std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:void AddEvent(int sock, uint32_t event, const EventItem& item){struct epoll_event ev;ev.data.fd = sock;ev.events = event;if (epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev) < 0){ //将该文件描述符添加到epoll模型当中std::cerr << "epoll_ctl add error, fd: " << sock << std::endl;}else{//建立sock与EventItem结构的映射关系_event_items.insert({ sock, item });std::cout << "添加: " << sock << " 到epoll模型中,成功" << std::endl;}}
};
DelEvent函数
Reactor类当中的DelEvent函数是用于进行事件删除的。
- 在删除事件时只需要传入一个文件描述符即可。
- 在DelEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符从epoll模型中删除,并取消该文件描述符与其对应的EventItem结构的映射关系。
代码如下:
class Reactor{
private:int _epfd; //epoll模型std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:void DelEvent(int sock){if (epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr) < 0){ //将该文件描述符从epoll模型中删除std::cerr << "epoll_ctl del error, fd: " << sock << std::endl;}else{//取消sock与EventItem结构的映射关系_event_items.erase(sock);std::cout << "从epoll模型中删除: " << sock << ",成功" << std::endl;}}
};
EnableReadWrite函数
Reactor类当中的EnableReadWrite函数,用于使能或使能某个文件描述符的读写事件。
- 调用EnableReadWrite函数时需要传入一个文件描述符,表示需要设置的是哪个文件描述符对应的事件。
- 还需要传入两个bool值,分别表示需要使能还是使能读写事件。
- EnableReadWrite函数内部会调用epoll_ctl函数修改将该文件描述符的监听事件。
代码如下:
class Reactor{
private:int _epfd; //epoll模型std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:void EnableReadWrite(int sock, bool read, bool write){struct epoll_event ev;ev.data.fd = sock;ev.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0){ //修改该文件描述符所需要监视的事件std::cerr << "epoll_ctl mod error, fd: " << sock << std::endl;}}
};
回调函数
下面我们就可以实现一些回调函数,这里主要实现四个回调函数。
- accepter:当连接事件到来时可以调用该回调函数获取底层建立好的连接。
- recver:当读事件就绪时可以调用该回调函数读取客户端发来的数据并进行处理。
- sender:当写事件就绪时可以调用该回调函数向客户端发送响应数据。
- errorer:当异常事件就绪时可以调用该函数将对应的文件描述符进行关闭。
当我们为某个文件描述符创建EventItem结构时,就可以调用EventItem类提供的ManageCallbacks函数,将这些回调函数到EventItem结构当中。
- 我们会将监听套接字对应的EventItem结构当中的recv_handler设置为accepter,因为监听套接字的读事件就绪就意味着连接事件就绪了,而监听套接字一般只关心读事件,因此监听套接字对应的send_handler和error_handler可以设置为nullptr。
- 当Dispatcher监测到监听套接字的读事件就绪时,会调用监听套接字对应的EventItem结构当中的recv_handler回调,此时就会调用accepter回调获取底层建立好的连接。
- 而对于与客户端建立连接的套接字,我们会将其对应的EventItem结构当中的recv_handler、send_handler和error_handler分别设置为这里的recver、sender和error。
- 当Dispatcher监测到这些套接字的事件就绪时,就会调用其对应的EventItem结构当中对应的回调函数,也就是这里的recver、sender和error。
accepter回调
accepter回调用于处理连接事件,其工作流程如下:
- 调用accept函数获取底层建立好的连接。
- 将获取到的套接字设置为非阻塞,并为其创建EventItem结构,填充EventItem结构当中的各个字段,并注册该套接字相关的回调方法。
- 将该套接字及其对应需要关心的事件注册到Dispatcher当中。
下一次Dispatcher在进行事件派发时就会帮我们关注该套接字对应的事件,当事件就绪时就会执行该套接字对应的EventItem结构中对应的回调方法。
代码如下:
int accepter(EventItem* item)
{while (true){struct sockaddr_in peer;memset(&peer, 0, sizeof(peer));socklen_t len = sizeof(peer);int sock = accept(item->_sock, (struct sockaddr*)&peer, &len);if (sock < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){ //并没有读取出错,只是底层没有连接了return 0;}else if (errno == EINTR){ //读取的过程被信号中断了continue;}else{ //获取连接失败std::cerr << "accept error" << std::endl;return -1;}}SetNonBlock(sock); //将该套接字设置为非阻塞//构建EventItem结构EventItem sock_item;sock_item._sock = sock;sock_item._R = item->_R;sock_item.ManageCallbacks(recver, sender, errorer); //注册回调方法Reactor* R = item->_R;R->AddEvent(sock, EPOLLIN | EPOLLET, sock_item); //将该套接字及其对应的事件注册到Dispatcher中}return 0;
}
需要注意的是,因为这里实现的ET模式下的epoll服务器,因此在获取底层连接时需要循环调用accept函数进行读取,并且监听套接字必须设置为非阻塞。
- 因为ET模式下只有当底层建立的连接从无到有或是从有到多时才会通知上层,如果没有一次性将底层建立好的连接全部获取,并且此后再也没有建立好的连接,那么底层没有读取完的连接就相当于丢失了,所以需要循环多次调用accept函数获取底层建立好的连接。
- 循环调用accept函数也就意味着,当底层连接全部被获取后再调用accept函数,此时就会因为底层已经没有连接了而被阻塞住,因此需要将监听套接字设置为非阻塞,这样当底层没有连接时accept就会返回,而不会被阻塞住。
accept获取到的新的套接字也需要设置为非阻塞,就是为了避免将来循环调用recv、send等函数时被阻塞。
设置文件描述符为非阻塞
设置文件描述符为非阻塞时,需要先调用fcntl函数获取该文件描述符对应的文件状态标记,然后在该文件状态标记的基础上添加非阻塞标记O_NONBLOCK
,最后调用fcntl函数对该文件描述符的状态标记进行设置即可。
代码如下:
//设置文件描述符为非阻塞
bool SetNonBlock(int sock)
{int fl = fcntl(sock, F_GETFL);if (fl < 0){std::cerr << "fcntl error" << std::endl;return false;}fcntl(sock, F_SETFL, fl | O_NONBLOCK);return true;
}
监听套接字设置为非阻塞后,当底层连接不就绪时,accept函数会以出错的形式返回,因此当调用accept函数的返回值小于0时,需要继续判断错误码。
- 如果错误码为
EAGAIN
或EWOULDBLOCK
,说明本次出错返回是因为底层已经没有可获取的连接了,此时底层连接全部获取完毕,这时我们可以返回0,表示本次accepter调用成功。 - 如果错误码为
EINTR
,说明本次调用accept函数获取底层连接时被信号中断了,这时还应该继续调用accept函数进行获取。 - 除此之外,才说明accept函数是真正调用失败了,这时我们可以返回-1,表示本次accepter调用失败。
accept、recv和send等IO系统调用为什么会被信号中断?
IO系统调用函数出错返回并且将错误码设置为EINTR,表明本次在进行数据读取或数据写入之前就被信号中断了,也就是说IO系统调用在陷入内核,但并没有返回用户态的时候内核跑去处理其他信号了。
- 在内核态返回用户态之前检查信号的pending位图,也就是未决信号集,如果pending位图中有未处理的信号,那么内核就会对该信号进行处理。
- 但IO系统调用函数在进行IO操作之前就被信号中断了,这实际上就是一个特例,因为IO过程分为“等”和“拷贝”两个步骤,而一般“等”的过程比较漫长,而这个过程中我们的执行流其实是处于闲置状态的,因此在“等”的过程中如果有信号产生,内核就会立即进行信号的处理。
写事件是按需打开的
这里调用accept获取上来的套接字在添加到Dispatcher中时,只添加了EOPLLIN
和EPOLLET
事件,也就是说只让epoll帮我们关心该套接字的读事件。
- 这里之所以没有添加写事件,是因为当前我们并没有要发送的数据,因此没有必要让epoll帮我们关心写事件。
- 一般读事件是经常会被设置的,而写事件则是按需打开的,只有当我们有数据要发送时才会将写事件打开,并且在数据全部写入完毕后又会立即将写事件关闭。
recver回调
recver回调用于处理读事件,其工作流程如下:
- 循环调用recv函数读取数据,并将读取到的数据添加到该套接字对应EventItem结构的inbuffer当中。
- 对inbuffer当中的数据进行切割,将完整的报文切割出来,剩余的留在inbuffer当中。
- 对切割出来的完整报文进行反序列化。
- 业务处理。
- 业务处理后形成响应报文。
- 将响应报头添加到对应EventItem结构的outbuffer当中,并打开写事件。
下一次Dispatcher在进行事件派发时就会帮我们关注该套接字的写事件,当写事件就绪时就会执行该套接字对应的EventItem结构中写回调方法,进而将outbuffer中的响应数据发送给客户端。
代码如下:
int recver(EventItem* item)
{if (item->_sock < 0) //该文件描述符已经被关闭return -1;//1、数据读取if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){ //读取失败item->_error_handler(item);return -1;}//2、报文切割std::vector<std::string> datagrams;StringUtil::Split(item->_inbuffer, &datagrams, "X");for (auto s : datagrams){//3、反序列化struct data d;StringUtil::Deserialize(s, &d._x, &d._y, &d._op);//4、业务处理int result = 0;switch (d._op){case '+':result = d._x + d._y;break;case '-':result = d._x - d._y;break;case '*':result = d._x * d._y;break;case '/':if (d._y == 0){std::cerr << "Error: div zero!" << std::endl;continue; //继续处理下一个报文}else{result = d._x / d._y;}break;case '%':if (d._y == 0){std::cerr << "Error: mod zero!" << std::endl;continue; //继续处理下一个报文}else{result = d._x % d._y;}break;default:std::cerr << "operation error!" << std::endl;continue; //继续处理下一个报文}//5、形成响应报文std::string response;response += std::to_string(d._x);response += d._op;response += std::to_string(d._y);response += "=";response += std::to_string(result);response += "X"; //报文与报文之间的分隔符//6、将响应报文添加到outbuffer中item->_outbuffer += response;if (!item->_outbuffer.empty())item->_R->EnableReadWrite(item->_sock, true, true); //打开写事件}return 0;
}
一、数据读取
我们可以将循环调用recv函数读取数据的过程封装成一个recver_helper函数。
- recver_helper函数要做的就是循环调用recv函数将读取到的数据添加到inbuffer当中。
- 当recv函数的返回值小于0时同样需要进一步判断错误码,如果错误码为
EAGAIN
或EWOULDBLOCK
则说明底层数据读取完毕了,如果错误码为EINTR
则说明读取过程被信号中断了,此时还需要继续调用recv函数进行读取,否则就是读取出错了。 - 当读取出错时直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。
代码如下:
int recver_helper(int sock, std::string* out)
{while (true){char buffer[128];ssize_t size = recv(sock, buffer, sizeof(buffer)-1, 0);if (size < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){ //数据读取完毕return 0;}else if (errno == EINTR){ //被信号中断,继续尝试读取continue;}else{ //读取出错return -1;}}else if (size == 0){ //对端连接关闭return -1;}//读取成功buffer[size] = '\0';*out += buffer; //将读取到的数据添加到该套接字对应EventItem结构的inbuffer中}
}
二、报文切割
报文切割本质就是为了防止粘包问题,而粘包问题实际是涉及到协议定制的。
- 因为我们需要根据协议知道如何将各个报文进行分离,比如UDP分离报文采用的就是定长报头+自描述字段。
- 我们的目的是演示整个数据处理的过程,为了简单起见就不进行过于复杂的协议定制了,这里我们就以“X”作为各个报文之间的分隔符,每个报文的最后都会以一个“X”作为报文结束的标志。
- 因此现在要做的就是以“X”作为分隔符对inbuffer当中的字符串进行切割,这里将这个过程封装成一个Split函数并放到一个StringUtil工具类当中。
- Split函数要做的就是对inbuffer当中的字符串进行切割,将切割出来的一个个报文放到vector当中,对于最后无法切出完整报文的数据就留在inbuffer当中即可。
代码如下:
class StringUtil{
public:static void Split(std::string& in, std::vector<std::string>* out, std::string sep){int start = 0;size_t pos = in.find(sep, start);while (pos != std::string::npos){out->push_back(in.substr(start, pos - start));start = pos + sep.size();pos = in.find(sep, start);}in = in.substr(start);}
};
三、反序列化
在数据发送之前需要进行序列化encode,接收到数据之后需要对数据进行反序列化decode。
- 序列化就是将对象的状态信息转换为可以存储或传输的形式(字节序列)的过程。
- 反序列化就是把字节序列恢复为原对象的过程。
实际反序列化也是与协议定制相关的,假设这里的epoll服务器向客户端提供的就是计算服务,客户端向服务器发来的都是需要服务器计算的计算表达式,因此可以用一个结构体来描述这样一个计算表达式,结构体当中包含两个操作数x和y,以及一个操作符op。
struct data{int _x;int _y;char _op;
};
此时这里所谓的反序列化就是将一个计算表达式转换成这样一个结构体,
- 因此现在要做的就是将形如“1+2”这样的计算表达式转换成一个结构体,该结构体当中的x成员的值就是1,y的值就是2,op的值就是‘+’,这里将这个过程封装成一个Deserialize函数并放到StringUtil工具类当中。
- Deserialize函数要做的工作其实也很简单,就是在传入的字符串当中找到操作符op,此时操作符左边的就是操作数x,右边的就是操作数y。
代码如下:
class StringUtil{
public:static void Deserialize(std::string& in, int* x, int* y, char* op){size_t pos = 0;for (pos = 0; pos < in.size(); pos++){if (in[pos] == '+' || in[pos] == '-' || in[pos] == '*' || in[pos] == '/' || in[pos] == '%')break;}if (pos < in.size()){std::string left = in.substr(0, pos);std::string right = in.substr(pos + 1);*x = atoi(left.c_str());*y = atoi(right.c_str());*op = in[pos];}else{*op = -1;}}
};
说明一下: 实际在做项目时不需要我们自己进行序列化和反序列化,我们一般会直接用JSON或XML这样的序列化反序列化工具。
四、业务处理
业务处理就是服务器拿到客户端发来的数据后,对数据进行数据分析,最终拿到客户端想要的资源。
- 我们这里要做的业务处理非常简单,就是用反序列化后的数据进行数据计算,此时得到的计算结果就是客户端想要的。
五、形成响应报文
在业务处理后我们已经拿到了客户端想要的数据,现在我们要做的就是形成响应报文,由于我们这里规定每个报文都以“X”作为报文结束的标志,因此在形成响应报文的时候,就需要在每一个计算结果后面都添加上一个“X”,表示这是之前某一个请求报文的响应报文,因为协议制定后就需要双方遵守。
六、将响应报文添加到outbuffer中
响应报文构建完后需要将其添加到该套接字对应的outbuffer中,并打开该套接字的写事件,此后当写事件就绪时就会将outbuffer当中的数据发送出去。
sender回调
sender回调用于处理写事件,其工作流程如下:
- 循环调用send函数发送数据,并将发送出去的数据从该套接字对应EventItem结构的outbuffer中删除。
- 如果循环调用send函数后该套接字对应的outbuffer当中的数据被全部发送,此时就需要将该套接字对应的写事件关闭,因为已经没有要发送的数据了,如果outbuffer当中的数据还有剩余,那么该套接字对应的写事件就应该继续打开。
代码如下:
int sender(EventItem* item)
{if (item->_sock < 0) //该文件描述符已经被关闭return -1;int ret = sender_helper(item->_sock, item->_outbuffer);if (ret == 0){ //全部发送成功,不再关心写事件item->_R->EnableReadWrite(item->_sock, true, false);}else if (ret == 1){ //没有发送完毕,还需要继续关心写事件item->_R->EnableReadWrite(item->_sock, true, true);}else{ //写入出错item->_error_handler(item);}return 0;
}
我们可以将循环调用send函数发送数据的过程封装成一个sender_helper函数。
- sender_helper函数要做的就是循环调用send函数将outbuffer中的数据发送出去。
- 当send函数的返回值小于0时也需要进一步判断错误码,如果错误码为
EAGAIN
或EWOULDBLOCK
则说明底层TCP发送缓冲区已经被写满了,这时需要将已经发送的数据从outbuffer中移除。 - 如果错误码为
EINTR
则说明发送过程被信号中断了,此时还需要继续调用send函数进行发送,否则就是发送出错了。 - 当发送出错时也直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。
- 如果最终outbuffer当中的数据全部发送成功,则将outbuffer清空即可。
代码如下:
int sender_helper(int sock, std::string& in)
{size_t total = 0; //累加已经发送的字节数while (true){ssize_t size = send(sock, in.c_str() + total, in.size() - total, 0);if (size < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){ //底层发送缓冲区已经没有空间了in.erase(0, total); //将已经发送的数据移出outbufferreturn 1; //缓冲区写满,没写完}else if (errno == EINTR){ //被信号中断,继续尝试写入continue;}else{ //写入出错return -1;}}total += size;if (total >= in.size()){in.clear(); //清空outbufferreturn 0; //全部写入完毕}}
}
errorer回调
errorer回调用于处理异常事件。
- 对于异常事件就绪的套接字我们这里不做其他过多的处理,简单的调用close函数将该套接字关闭即可。
- 但是在关闭该套接字之前,需要先调用DelEvent函数将该套接字从epoll模型中删除,并取消该套接字与其对应的EventItem结构的映射关系。
- 由于在Dispatcher当中是先处理的异常事件,为了避免该套接字被关闭后继续进行读写操作,然后因为读写操作失败再次调用errorer回调重复关闭该文件描述符,因此在关闭该套接字后将其EventItem当中的文件描述符值设置为-1。
- 在调用recver和sender回调执行读写操作之前,都会判断该EventItem结构当中的文件描述符值是否有效,如果无效则不会进行后续操作。
代码如下:
int errorer(EventItem* item)
{item->_R->DelEvent(item->_sock); //将该文件描述符从epoll模型中删除,并取消该文件描述符与其EventItem结构的映射关系close(item->_sock); //关闭该文件描述符item->_sock = -1; //防止关闭后继续执行读写回调return 0;
}
套接字相关
这里可以编写一个Socket类,对套接字相关的接口进行一定程度的封装,为了让外部能够直接调用Socket类当中封装的函数,于是将这些函数定义成了静态成员函数。
代码如下:
class Socket{
public://创建套接字static int SocketCreate(){int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){std::cerr << "socket error" << std::endl;exit(2);}//设置端口复用int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));return sock;}//绑定static void SocketBind(int sock, int port){struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;socklen_t len = sizeof(local);if (bind(sock, (struct sockaddr*)&local, len) < 0){std::cerr << "bind error" << std::endl;exit(3);}}//监听static void SocketListen(int sock, int backlog){if (listen(sock, backlog) < 0){std::cerr << "listen error" << std::endl;exit(4);}}
};
运行epoll ET服务器
运行我们的epoll ET服务器的步骤如下:
- 首先需要进行的就是套接字的创建、绑定和监听,因为是ET模式下的epoll服务器,因此监听套接字创建出来后需要将其设置为非阻塞。
- 然后就可以实例化一个Reactor对象,并对其进行初始化,也就是创建epoll模型。
- 紧接着需要为监听套接字定义一个EventItem结构,填充EventItem结构当中的各个字段,并将accepter回调设置为监听套接字的读回调方法。
- 然后调用AddEvent函数将监听套接字及其需要关系的事件添加到Dispatcher当中,该过程包括将监听套接字注册到epoll模型中,以及建立监听套接字与其对应EventItem结构的映射。
- 最后就可以循环调用Reactor类当中的Dispatcher函数进行事件派发了。
代码如下:
#include "app_interface.hpp"
#include "reactor.hpp"
#include "socket.hpp"
#include "util.hpp"
#include <string>#define BACK_LOG 5static void Usage(std::string proc)
{std::cout << "Usage: " << proc << " port" << std::endl;
}
int main(int argc, char* argv[])
{if (argc != 2){Usage(argv[0]);exit(1);}int port = atoi(argv[1]);//服务器监听套接字的创建、绑定和监听int listen_sock = Socket::SocketCreate();SetNonBlock(listen_sock); //将监听套接字设置为非阻塞Socket::SocketBind(listen_sock, port);Socket::SocketListen(listen_sock, BACK_LOG);//创建Reactor,并初始化Reactor R;R.InitReactor();//创建监听套接字对应的EventItem结构 EventItem item;item._sock = listen_sock;item._R = &R;item.ManageCallbacks(accepter, nullptr, nullptr); //监听套接字只需要关心读事件//将监听套接字托管给DispatcherR.AddEvent(listen_sock, EPOLLIN | EPOLLET, item);//循环进行事件派发int timeout = 1000;while (true){R.Dispatcher(timeout);}return 0;
}
参考文献:
http://t.csdn.cn/pN4A9