目录
Reactor完整代码连接
前置知识:
1.普通的epoll读写有什么问题?
2.Connection内的回调函数是什么
3.服务器的初始化(Connection只是使用的一个结构体)
4.等待就绪事件:有事件就绪,对使用Connection的不同对象(封装fd,回调方法)调用对应的回调方法;
5._listenSock读取 :Accepter函数获取新链接怎么处理的
6.普通套接字读取:Recver
7.对写事件的关心是按需关系的:
8.执行效果:
9.Reactor的优势:
Reactor完整代码连接
前置知识:
Reactor叫做反应堆模式;反应:对已经就绪的事件(读、写、异常)进行处理;
我们使用epoll来实现,select、poll、epoll是多路转接的发展史,epoll完善了select、poll的缺点;
- 需要程序员维护数组 select/poll都有这个缺点
- 有大量的遍历 select/poll都有这个缺点
- 大量参数为输入输出型参数,需要重新设置 select有
- 管理的fd有上限 select有
1.普通的epoll读写有什么问题?
- 使用的是一个静态数组读取;报文长,读到就是不完整报文,报文短,一次读取可能有多个报文,最后一个报文可能不是完整的;
- 这样的错误报文没法分析和处理,就不能构造响应报文;
综上所述:问题为没法保证读取到的是完整报文,导致没法分析和处理、构建响应报文;
void Read(int fd){char buff[1024];ssize_t s = read(fd, buff, 1023);if(s > 0){buff[s] = 0;LOG2(INFO, buff, fd);}
解决办法:将文件描述符封装,并且有接受发送缓冲区,使用string就可以;
- 使用静态数组读取,然后添加到接受缓冲区保存;
- 读取完毕,对接受缓冲区分析是否有完整报文;
- 处理完整请求报文,构建响应报文添加到发送缓冲区;
using func_t = std::function<void(Connection*)>;
using cals_t = std::function<void(std::string &, Connection*)>;class Connection{
public:Connection(int fd = -1 ):_fd(fd), _ts(nullptr){}~Connection(){if(_fd >= 0)close(_fd);}
public:int _fd;//读写异常回调方法func_t _recver;func_t _sender;func_t _exception;//接受缓冲区std::string _outbuff;//发送缓冲区std::string _inbuff;//TcpServer的回指指针,对写事件的关心是按需打开TcpServer *_ts;//连接最近活跃活动的时间time_t _times;
};
2.Connection内的回调函数是什么
一个包装器;返回值为void,参数为Connection*,的函数指针、仿函数、lamada表达,它都可以接受;
using func_t = std::function<void(Connection*)>;
优势:
- _listenSock是读方法是接受新连接,普通是读取请求报文
- 初始化时设置读写异常回调方法(回调:使用函数指针执行的函数),不需要判断是_listenSock还是普通套接字,统一使用con->_recver;
3.服务器的初始化(Connection只是使用的一个结构体)
- 套接字创建,bind,监听;
- 构建epoll模型,epoll函数也封装了,_epfd封装在Epoll类内;
- 初始化_listenSock Connection结构体;读取回调方法Accept是类函数,多一个this指针,需要使用std::bind来改变参数个数,进行传递给包装器;
- epoll_wait的事件就绪队列初始化;
class TcpServer{const static int gport = 8080;const static int gnum = 128;TcpServer(int port = gport, int num = gnum):_port(gport), _evts_num(num){//套接字,创建bind监听_listenSock = Sock::Socket();Sock::Bind(_listenSock,_port);Sock::Listen(_listenSock);//构建epoll模型_epoll.CreateEpoll();//listensock添加epoll模型和_connections管理起来AddConnection(_listenSock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);//epoll_wait就绪队列,获取已就绪的事件_evts = new epoll_event[_evts_num];}~TcpServer(){if(_listenSock >= 0)close(_listenSock);if(_evts != nullptr)delete[] _evts;for(auto &pr : _connections){_connections.erase(pr.first);delete pr.second;}}
private:int _listenSock;//epollEpoll _epoll;//就绪队列epoll_event* _evts;int _evts_num;//管理connection对象std::unordered_map<int, Connection*> _connections;int _port;//业务处理的回调指针cals_t _cb;
};
4.等待就绪事件:有事件就绪,对使用Connection的不同对象(封装fd,回调方法)调用对应的回调方法;
void LoopOnce(){int n = _epoll.WaitEpoll(_evts, _evts_num);//有事件就绪if(n > 0){//LOG2(INFO, "epoll wait success",fd);for(int i=0; i<n; i++){int fd = _evts[i].data.fd;int events = _evts->events;//连接关闭或者错误,改为读写统一处理,读写失败调异常处理;if( events & EPOLLHUP){LOG2(INFO,"连接关闭",fd);events |= (EPOLLIN | EPOLLOUT);}if( events & EPOLLERR){LOG2(INFO,"错误",fd);events |= (EPOLLIN | EPOLLOUT);} if(_connections.count(fd) && events & EPOLLIN){if(IsConnectionExits(fd) && _connections[fd]->_recver != nullptr){_connections[fd]->_recver(_connections[fd]);}}if(_connections.count(fd) && events & EPOLLOUT){if(IsConnectionExits(fd) && _connections[fd]->_sender != nullptr)_connections[fd]->_sender(_connections[fd]);}}}else if(n == 0){LOG(INFO, "timeout");}else{LOG(FATAL,"epoll wait fail");exit(4);}}void Dispatcher(cals_t cb){_cb = cb;while(true){//去除不活跃的连接DeleteInactivity();LoopOnce();}}
5._listenSock读取 :Accepter函数获取新链接怎么处理的
- 得到新连接,如果新的fd是合法的,设置对应的读写异常回调方法,读:读取请求报文,写:发送响应报文,异常:出现错误进行处理;
- 所有的套接字都是使用ET模式(通知效率高,只支持非阻塞读写),EPOLLET因该被设置;
void Accepter(Connection * con){while(true){con->_times = time(nullptr);struct sockaddr_in tmp;socklen_t tlen = sizeof(tmp);int new_sock = accept(con->_fd, (struct sockaddr *)&tmp, &tlen);if(new_sock < 0){//所以事件处理完毕if(errno == EAGAIN || errno == EWOULDBLOCK)break;else if(errno == EINTR)//可能被信号中断,概率极小continue;else{std::cout << "accept fail , errno :" << errno << strerror(errno) << std::endl;break;}} else//添加到epoll模型和_connections中管理;{if(AddConnection(new_sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1), std::bind(&TcpServer::Sender, this, std::placeholders::_1),std::bind(&TcpServer::Exception, this, std::placeholders::_1)))LOG2(INFO, "add connection success",new_sock);elseLOG2(RISK, "add connection fail", new_sock);}}}
6.普通套接字读取:Recver
- 一直读取,直到错误或者读取完毕;每次读取的结果都放到接受缓冲区;
- 读取完毕,对接受缓冲区处理,拿出一个个完整的报文,对请求报文进行业务处理;
void Recver(Connection *con){con->_times = time(nullptr);const int buff_size = 1024;char buff[buff_size];while(true){ssize_t s = recv(con->_fd, buff, buff_size - 1, 0);if(s > 0){buff[s] = 0;con->_outbuff += buff;}else if(s == 0){LOG2(INFO, "写端关闭", con->_fd);con->_exception(con);return;}else{//读取完毕if(errno == EAGAIN || errno == EWOULDBLOCK ){LOG2(INFO, "处理完毕", con->_fd);break;}else if(errno == EINTR)continue;else{LOG2(ERROR, "recv fail ,fd: ", con->_fd);con->_exception(con);return;}}}std::cout << "fd: " << con->_fd << "outbuff: " << con->_outbuff <<std::endl;//对outbuff内的完整报文,进行处理std::vector<std::string> out;//分隔报文,函数在protocol.hppSplitMessage(out, con->_outbuff);for(auto &s : out)_cb(s, con);//业务逻辑回调指针,在主函数}
7.对写事件的关心是按需关系的:
- 如果开启关心,还没有数据发送,写事件会一直就绪;所以按需关心;
- 请求报文业务处理完毕,构建好响应报文,一定有响应,打开对写事件的关心;
- 也是Connection为什么封装一个Tcperver指针的原因,这里开启写事件的关心;
//业务处理
void CalArguments(std::string &str, Connection *con)
{//请求报文反序列化Request req;//std::cout<<str <<std::endl;if(!req.Deserialize(str)){LOG2(ERROR, "deseroalize fail" ,con->_fd);return;}//对数据处理Response res;calculator(req, res);//响应报文序列化std::string s = res.Serialize();//添加到inbuffcon->_inbuff += s;//一定有响应报文,打开写事件的关系con->_ts->EnableReadWrite(con->_fd, true, true);
}
8.执行效果:
- 我写的协议是对任意两个数加减乘除;
- 每个请求或者响应都是用 x 做为分割符的;
9.Reactor的优势:
和进程/线程做比较:
- 它是一个单进程的就可以并发处理请求的服务器,比进程/线程减少了创建、销毁、调度的时间;
- 它的等待一批fd,减少了单位等待时间;一个线程等待对应一个fd;
- 有很高的复用性,替换业务逻辑就行了;