Linux--epoll(ET)实现Reactor模式

Linux–多路转接之epoll

Reactor反应堆模式

Reactor反应堆模式是一种事件驱动的设计模式,通常用于处理高并发的I/O操作,尤其是在服务器或网络编程中。

基本概念

Reactor模式又称之为响应器模式,基于事件多路复用机制,使得单个线程能够同时管理大量并发连接,而不需要为每个连接创建一个独立的线程。它通过一个事件分发器(Reactor)来监听和管理不同的I/O事件,当事件发生时,分发器会将该事件分发给对应的事件处理器来处理。

核心组件

  • 事件分发器(Reactor):负责监听各种事件源(如socket、文件描述符)并将事件分发给相应的处理器。事件分发器通常使用I/O多路复用机制(如select、poll、epoll)来同时监听多个I/O事件。
  • 事件处理器(Event Handler):定义了如何处理特定事件。当事件分发器检测到某个事件时,就会触发相应的事件处理器中的回调函数。
  • 同步事件分离器(Demultiplexer):本质上是系统调用,用于监听事件源上的事件,并将事件通知给事件分发器。例如,在Linux中,可以使用select、poll或epoll等系统调用来实现同步事件分离器。

工作流程

  • 注册事件:事件分发器注册需要监听的I/O事件(如连接、读写),并关联相应的事件处理器。
  • 进入循环:事件分发器进入循环,使用I/O多路复用机制来监听注册的I/O事件。
  • 分发事件:一旦某个I/O事件发生,事件分发器会将该事件分发给对应的事件处理器。
  • 处理事件:事件处理器执行预定义的操作来处理该事件。处理完成后,可能会重新注册事件或关闭连接。
    在这里插入图片描述

epoll服务器(ET)

服务器监听一个指定的端口,当有新的连接请求到来时,服务器接受连接并将其注册到Reactor中,以便处理后续的数据读写事件。

Socket.hpp

包含了一个抽象基类 Socket 和一个继承自 Socket 的具体实现类 TcpSocket。提供一个面向对象的网络套接字编程接口,允许用户通过继承和实现基类中的纯虚函数来创建不同类型的套接字(例如 TCP 套接字)。

#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h>
#include <sys/types.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Comm.hpp"
namespace socket_ns
{class Socket;const static int gbacklog=8;//默认最大连接数using socket_sptr=std::shared_ptr<Socket>;//套接字指针enum{SOCKET_ERROR = 1,BIND_ERROR,LISTEN_ERROR,USAGE_ERROR};//在基类创建一系列虚函数,只要派生类能用到就在这里创建class Socket{public:virtual void CreateSocketOrDie() =0; //创建套接字virtual void BindSocketOrDie(InetAddr& addr) =0;  //绑定套接字virtual void ListenSocketOrDie()=0; //监听套接字virtual int Accepter(InetAddr* addr,int* code) =0; //接受客户端virtual bool Connector(InetAddr &addr) = 0; //连接客户端virtual int SockFd() = 0; //获取Sockfdvirtual int Recv(std::string *out) = 0; //接收对方信息virtual int Send(const std::string &in) = 0; //发送给对方信息virtual void Close()=0; //关闭对应文件public://创建监听套接字,将一系列操作细分化,直接引用对应函数直接创建void BuildListenSocket(InetAddr& addr){CreateSocketOrDie();BindSocketOrDie(addr);ListenSocketOrDie();}bool BuildClientSocket(InetAddr &addr){CreateSocketOrDie();return Connector(addr);}};class TcpSocket : public Socket{public:TcpSocket(int sockfd=-1):_sockfd(sockfd){}void CreateSocketOrDie() override  //override明确的重写基类函数{_sockfd=socket(AF_INET,SOCK_STREAM,0);if(_sockfd<0){LOG(FATAL, "socket error");exit(SOCKET_ERROR);}SetNonBlock(_sockfd);LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);}void BindSocketOrDie(InetAddr& addr) override{struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(addr.Port());local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());int n=bind(_sockfd,(struct sockaddr*)&local,sizeof(local));if (n < 0){LOG(FATAL, "bind error");exit(BIND_ERROR);}LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);}void ListenSocketOrDie() override{int n=listen(_sockfd,gbacklog);if (n < 0){LOG(FATAL, "listen error");exit(LISTEN_ERROR);}LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);}int Accepter(InetAddr* addr,int* code) override{struct sockaddr_in peer;socklen_t len=sizeof(peer);int sockfd = accept(_sockfd,(struct sockaddr*)&peer,&len);*code=errno;if (sockfd < 0){LOG(WARNING, "accept error\n");return -1;}*addr=peer;SetNonBlock(sockfd);//socket_sptr sock=std::make_shared<TcpSocket>(sockfd);return sockfd;}virtual bool Connector(InetAddr& addr){struct sockaddr_in server;memset(&server,0,sizeof(server));server.sin_family=AF_INET;server.sin_addr.s_addr=inet_addr(addr.Ip().c_str());server.sin_port=htons(addr.Port());int n=connect(_sockfd,(struct sockaddr*)&server,sizeof(server));if (n < 0){std::cerr << "connect error" << std::endl;return false;}return true;}int Recv(std::string *out) override{char inbuffer[1024];ssize_t n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);if (n > 0){inbuffer[n] = 0;*out += inbuffer; // 接收次数可能不只一次,一般是多次的,}return n;}int Send(const std::string &in) override{int n = send(_sockfd,in.c_str(),in.size(),0);return n;}int SockFd() override{return _sockfd;}void Close() override{if (_sockfd > -1)::close(_sockfd);}~TcpSocket(){}private:int _sockfd;};
}

代码和之前不一样的地方是实现了非阻塞套接字的设置
在这里插入图片描述

Calculate.hpp

用于执行基本的算术运算

#pragma once
#include <iostream>
#include "ProToCol.hpp"using namespace protocol_ns;class Calculate
{
public:Calculate(){}//根据输入的请求通过实际计算转换为结果Response Excute(const Request &req){Response resp(0, 0);switch (req._oper){case '+':resp._result = req._x + req._y;break;case '-':resp._result = req._x - req._y;break;case '*':resp._result = req._x * req._y;break;case '/':{if (req._y == 0){resp._code = 1;}else{resp._result = req._x / req._y;}}break;case '%':{if (req._y == 0){resp._code = 2;}else{resp._result = req._x % req._y;}}break;default:resp._code = 3;break;}return resp;}~Calculate(){}private:
};

protocol.hpp

用于处理网络通信中数据序列化和反序列化、编码和解码以及请求和响应对象生成的类和函数.

#pragma once 
#include <iostream>
#include <string>
#include<unistd.h>
#include<memory>
#include<jsoncpp/json/json.h>namespace protocol_ns
{// 协议的样子:// 报文 = 报头+有效载荷// "有效载荷的长度"\r\n"有效载荷"\r\nconst std::string SEP= "\r\n";// 解决TCP的粘报问题,TCP 读取不全的问题std::string Encode(const std::string &json_str){int json_str_len = json_str.size(); //有效载荷的长度std::string proto_str = std::to_string(json_str_len); //转为stringproto_str += SEP; //+ 分隔符proto_str += json_str;// + 数据字符串proto_str += SEP;// + 分隔符return proto_str; //返回一个报文}//将报文分析出数据字符串出来std::string Decode(std::string &inbuffer){auto pos = inbuffer.find(SEP); //找到分隔符的位置if (pos == std::string::npos)return std::string();std::string len_str = inbuffer.substr(0, pos);//前头的有效数据长度的字符串if (len_str.empty())return std::string();int packlen = std::stoi(len_str);//记录数据字符串的实际长度(传递时的差错主要出在这里)int total = packlen + len_str.size() + 2 * SEP.size(); //报文总长度if (inbuffer.size() < total)return std::string();std::string package = inbuffer.substr(pos + SEP.size(), packlen); //取出数据字符串inbuffer.erase(0, total); //删除掉原先的报文return package;}//请求将我们的数据序列化和反序列化(客户端)    class Request{public:Request(){}Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper){}   //序列化:将结构体数据转换为字符串bool Serialize(std::string* out){Json::Value root; //Json::Value: Json格式的值root["x"] = _x;root["y"] = _y;root["oper"] = _oper;Json::FastWriter writer;*out=writer.write(root); //将Json值转换为字符串return true;}//反序列化:将字符串转换为结构体数据bool DeSerialize(const std::string& in){Json::Value root;Json::Reader reader;//解析字符串bool res=reader.parse(in,root);//将字符串转为Json值,存放于root中if (!res)return false;//再将Json值转为结构体数据_x = root["x"].asInt();_y = root["y"].asInt();_oper = root["oper"].asInt();return true;}public:int _x;int _y;char _oper; //操作符 _x 加减乘除 _y};//将结果序列化和反序列化(服务端)class Response{public:Response(){}Response(int result, int code) : _result(result), _code(code){}bool Serialize(std::string *out){// 转换成为字符串Json::Value root;root["result"] = _result;root["code"] = _code;Json::FastWriter writer;// Json::StyledWriter writer;*out = writer.write(root);return true;}bool Deserialize(const std::string &in){Json::Value root;Json::Reader reader;bool res = reader.parse(in, root);if (!res)return false;_result = root["result"].asInt();_code = root["code"].asInt();return true;}public:int _result; // 结果int _code;   // 0:success 1: 除0 2: 非法操作 3. 4. 5};//创建需求class Factory{public:Factory(){srand(time(nullptr) ^ getpid());opers = "+-*/%^&|";                                                                                                                 }std::shared_ptr<Request> BuildRequest(){int x = rand() % 10 + 1;usleep(x * 10);int y = rand() % 5; // [0,1,2,3,4]usleep(y * x * 5);char oper = opers[rand() % opers.size()];std::shared_ptr<Request> req= std::make_shared<Request>(x,y,oper);return req;}std::shared_ptr<Response> BuildResponse(){return std::make_shared<Response>();}~Factory(){}private:std::string opers;};
}
  • SEP:定义了报文分隔符为 “\r\n”。
  • Encode:接受一个 JSON 字符串作为有效载荷,将其长度、分隔符和有效载荷拼接成一个完整的报文字符串返回。
  • Decode:从输入缓冲区 inbuffer 中解析出一个报文,返回有效载荷字符串,并从 inbuffer 中删除已解析的报文。

Request 类:

  • 表示一个计算请求,包含两个整数 _x 和 _y 作为操作数,以及一个字符 _oper 作为运算符。
  • Serialize:将 Request 对象序列化为 JSON 格式的字符串。
  • DeSerialize:将 JSON 格式的字符串反序列化为 Request 对象。

Response 类:

  • 表示一个计算响应,包含一个整数 _result 作为运算结果,以及一个整数 _code 作为状态码。
  • Serialize:将 Response 对象序列化为 JSON 格式的字符串。
  • Deserialize:将 JSON 格式的字符串反序列化为 Response 对象。

Factory 类:用于生成 Request 和 Response 对象的工厂类。
在构造函数中初始化了一个包含所有可能运算符的字符串 opers,并使用当前时间和进程 ID 作为随机数种子。

  • BuildRequest:生成一个随机的 Request 对象,其中包括随机的操作数和运算符。
  • BuildResponse:生成一个默认的 Response 对象。目前,这个实现只是简单地返回了一个新创建的 Response 对象,没有设置任何特定的值。

PackageParse.hpp

负责解析从连接(Connection 对象)中接收到的报文,处理这些报文,并将响应发送回客户端

#pragma once#include <iostream>
#include "Connection.hpp"
#include "ProToCol.hpp"
#include "CalCulate.hpp"using namespace protocol_ns;
//对报文进行解析
class PackageParse
{
public:static void Parse(Connection *conn){// std::cout << "inbuffer: " << conn->Inbuffer() << std::endl;// 2. 分析数据,确认完整报文std::string package;Request req;Calculate cal;while (true){// std::cout << conn->Inbuffer() << std::endl;// conn->AppendOutBuffer(conn->Inbuffer());// break;package = Decode(conn->Inbuffer());//取出缓冲区的报文if (package.empty())break;std::cout << "------------------------begin---------------" << std::endl;std::cout << "resq string:\n"<< package << std::endl;// 3.反序列化req.DeSerialize(package);// 4. 业务处理Response resp = cal.Excute(req);// 5. 对应答做序列化std::string send_str;resp.Serialize(&send_str);std::cout << "resp Serialize:" << std::endl;std::cout << send_str << std::endl;// 6. 添加长度报头send_str = Encode(send_str);std::cout << "resp Encode:" << std::endl;std::cout << send_str << std::endl;//将报文放到发送缓冲区中conn->AppendOutBuffer(send_str);}//将缓冲区内容取出,发送到客户端       if(!conn->OutbufferEmpty()&& conn->_sender!=nullptr){conn->_sender(conn);conn->_R->EnableReadWrite(conn->Sockfd(), true, true);}}
};

Comm.hpp

#pragma once#include <iostream>
#include <unistd.h>
#include <fcntl.h>//错误原因
enum
{SOCKET_ERROR = 1,BIND_ERROR,LISTEN_ERROR,USAGE_ERROR,EPOLL_CREATE_ERROR,
};//设置为非阻塞的
void SetNonBlock(int fd)
{int fl = ::fcntl(fd, F_GETFL);//获取之前的信息if(fl < 0) {return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK);//转换为非阻塞的
}

Connection.hpp

一个网络连接,用于在客户端和服务器之间传输数据;
Connection 类与 Reactor 类一起工作,实现了事件驱动的网络编程模型。
在这个模型中,Reactor 负责监听和处理各种网络事件(如连接、读取、写入等),而 Connection 对象则作为这些事件的处理者。
在这里插入图片描述

#pragma once#include <iostream>
#include <string>
#include <functional>
#include"Reactor.hpp"
#include "InetAddr.hpp"
#include <unistd.h>class Connection;
class Reactor;
using func_t = std::function<void(Connection *)>; //定义出有关连接函数的指针//连接客户端与服务端
//而客户端发出的请求称为事件,服务端称为反应堆,会将事件统收,将处理好的事件派发出去
//连接起到事件与反应堆之间的桥梁
class Connection
{
public:Connection(int sock) : _sock(sock), _R(nullptr){}int Sockfd(){return _sock;}//设置有关事件(初始化)void SetEvents(int events){_events = events;}uint32_t Events(){return _events;}//初始化对应函数指针,调用时触发函数void Register(func_t recver, func_t sender, func_t excepter){_recver = recver;_sender = sender;_excepter = excepter;}//初始化反应堆void SetSelf(Reactor *R){_R = R;}//将数据放入输入缓冲区void AppendInBuffer(const std::string &buff){_inbuffer += buff;}//将数据拿出std::string& Inbuffer() {return _inbuffer;}//将数据放到输出缓冲区void AppendOutBuffer(const std::string &buff){_outbuffer += buff;}//将数据拿出std::string &Outbuffer(){return _outbuffer;}//判断输出缓冲区是不是为空bool OutbufferEmpty(){return _outbuffer.empty();}//将输出缓冲区数据拿出多少void OutbufferRemove(int n){_outbuffer.erase(0, n);}void Close(){if(_sock>=0)::close(_sock);}~Connection(){}func_t _recver; //接收者(调用触发接收函数)func_t _sender; //发送者(调用触发发送函数)func_t _excepter; //其他,处理错误Reactor *_R; //反应堆指针(服务器)
private:int _sock; //sockfdstd::string _inbuffer;//输入缓冲区 std::string _outbuffer;//输出缓冲区 InetAddr _addr;//网络地址uint32_t _events; // Connection对象中,_sock关心的事件集合
};
  • 构造函数:接收一个套接字描述符(sock),并将其存储在私有成员 _sock 中。同时,将 _R(指向 Reactor 的指针)初始化为 nullptr。
  • Sockfd 方法:返回与这个连接关联的套接字描述符。
  • SetEventsEvents 方法:允许设置和查询这个连接关心的事件集合(如可读、可写等)。这些事件用于通知 Reactor 何时应该对这个连接进行操作。
  • Register 方法:允许为这个连接注册三个回调函数:_recver(接收数据时调用)、_sender(发送数据时调用)和 _excepter(处理错误时调用)。这些回调函数是 std::function<void(Connection *)> 类型的,意味着它们可以接受一个指向 Connection 对象的指针作为参数。
  • SetSelf 方法:允许设置这个连接所属的 Reactor 对象(通过 _R 指针)。
  • AppendInBufferInbuffer 方法:用于管理输入缓冲区。AppendInBuffer 方法将接收到的数据添加到输入缓冲区中,而 Inbuffer 方法则返回输入缓冲区的引用。

HandlerConnection.hpp

对具体处理函数的实现:

#pragma once#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include "Log.hpp"
#include "Connection.hpp"class HandlerConnection
{
public:HandlerConnection(func_t func):_func(func){}//处理接收void HanlderRecv(Connection *conn){LOG(DEBUG, "HanlderRecv fd : %d\n", conn->Sockfd());while (true){errno = 0; //表示当前没有错误char buffer[1024];//接收存储区域ssize_t n = ::recv(conn->Sockfd(), buffer, sizeof(buffer) - 1, 0);//接收函数if(n > 0){buffer[n] = 0;conn->AppendInBuffer(buffer);//将数据放入缓冲区中}else{if(errno == EWOULDBLOCK || errno == EAGAIN)//将报文接收完了(在非阻塞操作中){break;}else if(errno == EINTR)//事件操作时被中断了{continue;}else//出现错误{conn->_excepter(conn);//其他处理return; // 一定要提前返回}}}_func(conn);//调用函数 ,处理解析}//处理发送void HanlderSend(Connection *conn){errno = 0;while(true){ssize_t n = ::send(conn->Sockfd(), conn->Outbuffer().c_str(), conn->Outbuffer().size(), 0);//发送到客户端if(n > 0){// n 实际发送了多少conn->OutbufferRemove(n);//发完的在缓冲区去掉if(conn->OutbufferEmpty()) break;}else if(n == 0)//没有发送数据了{break;}else{if(errno == EWOULDBLOCK || errno == EAGAIN)//缓冲区读取完毕{break; }else if(errno == EINTR)//事件中断{continue;}else//出现错误{conn->_excepter(conn);return;}}}//发送缓冲区不为空时if(!conn->OutbufferEmpty()){conn->_R->EnableReadWrite(conn->Sockfd(), true, true); //可读可写}else//发送缓冲区为空时,不可写出{conn->_R->EnableReadWrite(conn->Sockfd(), true, false);}}//处理其他void HanlderExcpet(Connection *conn){errno = 0;LOG(DEBUG, "client quit : %d\n",conn->Sockfd());conn->_R->RemoveConnection(conn->Sockfd());//断开连接}
private:func_t _func;//函数指针
};

Epoller.hpp

封装了 Linux 中 epoll 接口的使用,用于高效地管理大量并发网络连接或文件描述符的事件通知。

#pragma once#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include "Log.hpp"
#include "Comm.hpp"static const int gsize=128;class Epoller
{
private:bool EventMethodCore(int fd,u_int32_t events,int type){struct epoll_event ev;ev.events = events;ev.data.fd = fd;int n = ::epoll_ctl(_epfd, type, fd, &ev);if(n < 0){LOG(ERROR, "epoll_ctl error!\n");return false;}LOG(DEBUG, "epoll_ctl add %d success!\n", fd); // TODOreturn true;}
public://初始化,创建epollEpoller(){_epfd = ::epoll_create(gsize);if (_epfd < 0){LOG(FATAL, "epoll create error!\n");exit(EPOLL_CREATE_ERROR);}LOG(FATAL, "epoll create success, epfd: %d\n", _epfd);}//将事件添加到epoll中bool AddEvent(int fd, uint32_t events){return EventMethodCore(fd,events,EPOLL_CTL_ADD);}//将事件进行修改bool ModEvent(int fd, uint32_t events){return EventMethodCore(fd, events, EPOLL_CTL_MOD);}//对事件进行删除bool DelEvent(int fd){return ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);}//epoll等待事件的发生int Wait(struct epoll_event revs[], int num, int timeout){int n = ::epoll_wait(_epfd, revs, num, timeout);return n;}~Epoller(){if(_epfd >= 0)//析构需要释放掉epoll的fd::close(_epfd);}
private:int _epfd;//epoll的fd
};

EventMethodCore:这是一个辅助方法,用于向 epoll 实例中添加、修改或删除事件。

Listener.hpp

Listener 的类: 用于在指定端口上监听并接受新的连接请求的

#pragma once#include <iostream>
#include <memory>
#include "Connection.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"
#include "InetAddr.hpp"
#include "HandlerConnection.hpp"using namespace socket_ns;class Listener
{
public:Listener(int port, HandlerConnection &hc): _port(port),_listensock(std::make_unique<TcpSocket>()),_hc(hc){InetAddr addr("0", _port);_listensock->BuildListenSocket(addr);//创建监听fd}//接收新的连接void Accepter(Connection *conn) {while (true){InetAddr clientaddr;int code = 0;int sockfd = _listensock->Accepter(&clientaddr, &code);//接收新连接cout<<sockfd<<endl;if (sockfd >= 0){// TODO//对反应堆增加新连接conn->_R->AddConnection(sockfd,EPOLLIN | EPOLLET,std::bind(&HandlerConnection::HanlderRecv, &_hc, std::placeholders::_1),std::bind(&HandlerConnection::HanlderSend, &_hc, std::placeholders::_1),std::bind(&HandlerConnection::HanlderExcpet, &_hc, std::placeholders::_1));}else//出现错误{//cout<<123<<endl;if (code == EWOULDBLOCK || code == EAGAIN)//表示接收了所有连接{LOG(DEBUG, "accepter all link!\n");break;}else if (code == EINTR)//事件中断{LOG(DEBUG, "accepter interupt by signal!\n");continue;}else//出现错误{LOG(WARNING, "accept error!\n");break;}}}}int Sockfd(){return _listensock->SockFd();}~Listener(){_listensock->Close();}
private:uint16_t _port; //端口号std::unique_ptr<Socket> _listensock;//监听fdHandlerConnection &_hc; //连接处理事件
};

Accepter:

将这个新的连接添加到 Epoller 中,注册相应的读、写异常事件处理函数。这些处理函数是通过 std::bind 绑定到 HandlerConnection 的成员函数上的。

如果接收过程中出现错误,根据错误码 code 的不同,采取不同的处理方式:

  • EWOULDBLOCKEAGAIN:表示所有可用的连接都已被接受,此时跳出循环。
  • EINTR:表示操作被信号中断,继续尝试接受连接。
  • 其他错误码:记录警告信息,并跳出循环。

Reactor.hpp(重点)

一个使用epoll作为底层事件通知机制的网络服务器框架的核心部分。这个类管理着网络连接,并对这些连接上的事件进行监听和处理。
在这里插入图片描述

#pragma once#include <iostream>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"//反应堆:本质是服务端,对connetion做了管理工作
class Reactor
{const static  int gnum=64;
public:Reactor() : _isrunning(false){}//添加连接到反应堆上void AddConnection(int fd, uint32_t events, func_t recver, func_t sender, func_t excepter){// 1. 构建ConnectionConnection *conn = new Connection(fd);conn->SetEvents(events);//初始化事件conn->Register(recver, sender, excepter);//注册方法conn->SetSelf(this);// 2. 对epoll添加新事件_epller.AddEvent(conn->Sockfd(), conn->Events());// 3. 向_connections添加connection对象_connections.insert(std::make_pair(conn->Sockfd(), conn));}//判断是否有该连接bool ConnectionIsExists(int sockfd){auto iter = _connections.find(sockfd);return iter != _connections.end();}//读写驱动的更改void EnableReadWrite(int sockfd, bool readable, bool writeable){uint32_t events = (readable?EPOLLIN:0) | (writeable ? EPOLLOUT : 0) | EPOLLET;if(ConnectionIsExists(sockfd)){// 1. 修改我们写的connection关心的事件_connections[sockfd]->SetEvents(events);// 2. 写透到内核中_epller.ModEvent(sockfd, events);}}//移除连接void RemoveConnection(int sockfd){if(!ConnectionIsExists(sockfd)) return;//去掉epoll中对事件的关心_epller.DelEvent(sockfd);//服务器关闭sockfd_connections[sockfd]->Close();//释放connectionsdelete _connections[sockfd];_connections.erase(sockfd);}//单次循环处理事务void LoopOnce(int timeout){ int n = _epller.Wait(revs, gnum, timeout);//等待事件的发生for (int i = 0; i < n; i++){   //处理发生的事件int sockfd = revs[i].data.fd;uint32_t revents = revs[i].events;if (revents & EPOLLHUP) //文件符被挂断时revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLERR) //文件符出现错误时revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLIN) //可读时{//判断连接是否存在并且if (ConnectionIsExists(sockfd) && (_connections[sockfd]->_recver != nullptr)){_connections[sockfd]->_recver(_connections[sockfd]);//调用处理接收函数}}if (revents & EPOLLOUT) //可写时{if (ConnectionIsExists(sockfd) && (_connections[sockfd]->_sender != nullptr)){_connections[sockfd]->_sender(_connections[sockfd]);}}}}// 事件派发核心函数void Dispatcher(){_isrunning = true;//int timeout = -1;//表示阻塞等待事件的发生int timeout = 3000;//3s为周期等待事件的发生while (_isrunning){LoopOnce(timeout);// 处理其他事情Debug();}_isrunning = false;}void Debug(){std::cout << "------------------------------------" << std::endl;for(auto &connection : _connections){std::cout << "fd : " << connection.second->Sockfd() << ", ";uint32_t events = connection.second->Events();if((events & EPOLLIN) && (events & EPOLLET))std::cout << "EPOLLIN | EPOLLET, ";if((events & EPOLLOUT) && (events & EPOLLET))std::cout << "EPOLLOUT | EPOLLET";std::cout << std::endl;}std::cout << "------------------------------------" << std::endl;}~Reactor() {}
private:std::unordered_map<int, Connection *> _connections; // int : sockfdstruct epoll_event revs[gnum]; //事件信息的数组Epoller _epller;//一个epollbool _isrunning;//是否运行
};

成员变量

  • _connections: 一个unordered_map,用于存储与每个文件描述符(sockfd)相关联的Connection对象。
  • revs: 一个epoll_event数组,用于从epoll实例中接收事件。
  • _epller: 一个Epoller对象,负责与epoll接口进行交互。
  • _isrunning: 一个布尔值,表示Reactor是否正在运行。

成员函数

  • AddConnection: 添加一个新的连接到Reactor中,包括构建Connection对象、设置事件、注册处理函数,并将连接添加到_connections映射中。同时,通过_epller对象将连接的文件描述符添加到epoll的监听列表中。
  • ConnectionIsExists: 检查给定的文件描述符是否存在于_connections映射中。
  • EnableReadWrite: 修改指定连接关心的事件(读或写),并更新epoll中的事件监听。
  • RemoveConnection: 从Reactor中移除一个连接,包括从epoll中删除事件监听、关闭文件描述符、删除Connection对象,并从_connections映射中移除。
  • LoopOnce: 等待并处理一次epoll事件循环中的事件。这包括读取事件、检查文件描述符的状态(如挂断或错误),并调用相应的处理函数(接收或发送)。
  • Dispatcher: Reactor的主循环函数,不断调用LoopOnce来处理事件,直到_isrunning变为false。
  • Debug: 打印当前Reactor中所有连接的状态和它们关心的事件。

Main.cc

这是所写头文件的逻辑思路
在这里插入图片描述

#include <iostream>
#include <memory>
#include "Reactor.hpp"
#include "Connection.hpp"
#include "Listener.hpp"
#include "PackageParse.hpp"
#include "HandlerConnection.hpp"
#include "Log.hpp"int main(int argc, char *argv[])
{if (argc != 2){std::cout << "Usage: " << argv[0] << " port" << std::endl;return 0;}uint16_t port = std::stoi(argv[1]);EnableScreen();std::unique_ptr<Reactor> react = std::make_unique<Reactor>(); // 主服务HandlerConnection hc(PackageParse::Parse); //处理连接相关函数的对象Listener listener(port,hc);//监听注册//反应堆添加监听连接react->AddConnection(listener.Sockfd(),EPOLLIN| EPOLLET,std::bind(&Listener::Accepter,&listener,std::placeholders::_1),nullptr,nullptr);react->Dispatcher();//事件派发
}
  • 创建一个Reactor类的智能指针实例,这是主服务组件。
  • 创建一个HandlerConnection对象hc,它使用PackageParse::Parse函数来处理数据包的解析。
  • 创建一个Listener对象listener,负责监听指定端口上的连接请求,并将新的连接请求通过hc(连接处理器)处理。
  • 通过react->AddConnection()方法,将监听套接字(listener.Sockfd())注册到Reactor中,设置监听事件为读事件(EPOLLIN)和边缘触发模式(EPOLLET),并绑定Listener::Accepter方法作为事件处理函数。这里使用了std::bind来绑定Listener对象的Accepter成员函数。
  • 调用react->Dispatcher()开始事件分发循环,这是Reactor模式的核心,它不断监听事件并调用相应的事件处理函数。

结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/457129.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UDP(用户数据报协议)端口监控

随着网络的扩展&#xff0c;确保高效的设备通信对于优化网络功能变得越来越重要。在这个过程中&#xff0c;端口发挥着重要作用&#xff0c;它是实现外部设备集成的物理连接器。通过实现数据的无缝传输和交互&#xff0c;端口为网络基础设施的顺畅运行提供了保障。端口使数据通…

vuex使用modules模块化

1、main.js引入 //引入vuex import store from ./store new Vue({el: #app,router,store,components: { App },template: <App/>,data:function(){return{wbWinList: [] // 定义的变量&#xff0c;全局参数}}, })2、index.js import Vue from vue; import Vuex from …

python实战(一)——iris鸢尾花数据集分类

一、任务背景 本文是python实战系列专栏的第一篇文章&#xff0c;我们将从分类开始由浅入深逐步学习如何使用python完成常规的机器学习/深度学习任务。iris数据集是经典的机器学习入门数据集&#xff0c;许多分类任务教程都会以这个数据集作为示例&#xff0c;它的数据量是150条…

路由器 相关知识

一、路由器是什么 参考&#xff1a;图解系列--路由器和它庞大的功能_路由功能-CSDN博客 路由器是指&#xff1a;主要负责 OSI参考模型中网络层的处理工作&#xff0c;并根据路由表信息在不同的网络 之间转发IP 分组的网络硬件(图3-1)。这里的网络一般是指IP 子网&#xff0c;…

Redis 发布订阅 总结

前言 相关系列 《Redis & 目录》&#xff08;持续更新&#xff09;《Redis & 发布订阅 & 源码》&#xff08;学习过程/多有漏误/仅作参考/不再更新&#xff09;《Redis & 发布订阅 & 总结》&#xff08;学习总结/最新最准/持续更新&#xff09;《Redis &a…

回顾项目测试全过程,测试如何回答“测完了吗?”

“测完了吗&#xff1f;” 是系统测试岗位同学经常被问到的问题&#xff0c;提问的人可能是合作的研发&#xff0c; 合作的产品经理&#xff0c;甚至是项目的业务方&#xff0c;也有可能是测试自己。 这个问题至少有两层意思&#xff0c;不仅问新功能测试进度是否完成&#xf…

qt QMediaPlaylist

QMediaPlaylist 是 Qt Multimedia 模块中的一个类&#xff0c;用于管理媒体文件的播放列表。它提供了一种方便的方式来组织和控制多媒体内容的播放&#xff0c;如音频和视频文件。 主要方法 QMediaPlaylist(00bject *parent nullptr):构造一个新的媒体播放列表对象。void add…

论文解析八: GAN:Generative Adversarial Nets(生成对抗网络)

目录 1.GAN&#xff1a;Generative Adversarial Nets&#xff08;生成对抗网络&#xff09;1、标题 作者2、摘要 Abstract3、导言 IntroductionGAN的介绍 4、相关工作 Related work5、模型 Adversarial nets总结 6.理论计算 Theoretical Results具体算法公式全局优化 Global O…

【Java网络编程】从套接字(Socket)概念到UDP与TCP套接字编程

目录 网络编程 1.socket套接字 2.udp数据报套接字编程 DatagramSocket API DatagramPacket API Java基于UDP实现客户端-服务器代码实例 3.tcp流套接字编程 ServerSocket API Socket API TCP中的长短连接 Java基于TCP客户端-服务器代码实例 网络编程 1.socket套接字 S…

正则表达式基本语法(快速认知)

正则表达式:一种用于匹配字符串的模式。它可以用于搜索、替换、验证字符串等多种操作。 基本语法: 字符类: [abc]: 匹配 a、b 或 c。[a-z]: 匹配小写字母。[A-Z]: 匹配大写字母。[0-9]: 匹配数字母 比如我们的电话号码是11个数字组成, 则可以表示为: String tel"[0-9][0…

uniapp 引入了uview-ui后,打包错误,主包过大解决方案

原因&#xff1a;由于使用uniapp来设计小程序&#xff0c;使用uview的组件库&#xff0c;导致了主包过大&#xff0c;无法打包 前提条件&#xff1a;已经完成了分包&#xff0c;如果还没有分包的先分包&#xff0c;需要上传代码时用到 1. 通常情况&#xff0c;大多数都是通过点…

构建中小企业设备管理平台:Spring Boot应用

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…

ReactOS系统中平衡二叉树按从左到右的顺序找到下一个结点

ReactOS系统中平衡二叉树按从左到右的顺序找到下一个结点MmIterateNextNode()按从左到右的顺序找到下一个结点 文章目录 ReactOS系统中平衡二叉树按从左到右的顺序找到下一个结点MmIterateNextNode()按从左到右的顺序找到下一个结点MmIterateNextNode() MmIterateNextNode() /*…

深入剖析 C 与 C++ 动态内存管理之术

亲爱的读者朋友们&#x1f603;&#xff0c;此文开启知识盛宴与思想碰撞&#x1f389;。 快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f970;&#xff0c;共创活力社区。 &#x1f525;&#x1f525;&#x1f525;【C】进阶&#xff1a;类相关…

Python实现基于WebSocket的stomp协议调试助手工具

stomp协议很简单&#xff0c;但是搜遍网络竟没找到一款合适的客户端工具。大多数提供的都是客户端库的使用。可能是太简单了吧&#xff01;可是即便这样&#xff0c;假如有一可视化的工具&#xff0c;将方便的对stomp协议进行抓包调试。网上类似MQTT的客户端工具有很多&#xf…

linux shell 脚本语言教程(超详细!)

Shell 编程详细指南 什么是 Shell&#xff1f; Shell 是用户与操作系统内核之间的接口&#xff0c;允许用户通过命令行输入来控制操作系统。它充当命令解释器&#xff0c;读取用户输入的命令并执行相应的操作。Shell 提供了强大的脚本编程能力&#xff0c;可以自动化许多任务…

【javax maven项目缺少_Maven的依赖管理 引入依赖】

javax maven项目缺少_Maven的依赖管理 引入依赖 Maven的依赖管理 - 引入依赖依赖管理(引入依赖)导入依赖 https://blog.csdn.net/weixin_28932089/article/details/112381468 Maven的依赖管理 - 引入依赖 依赖管理(引入依赖) 能够掌握依赖引入的配置方式 导入依赖 导入依赖练…

银行客户贷款行为数据挖掘与分析

#1024程序员节 | 征文# 在新时代下&#xff0c;消费者的需求结构、内容与方式发生巨大改变&#xff0c;企业要想获取更多竞争优势&#xff0c;需要借助大数据技术持续创新。本文分析了传统商业银行面临的挑战&#xff0c;并基于knn、逻辑回归、人工神经网络三种算法&#xff0…

重构案例:将纯HTML/JS项目迁移到Webpack

我们已经了解了许多关于 Webpack 的知识&#xff0c;但要完全熟练掌握它并非易事。一个很好的学习方法是通过实际项目练习。当我们对 Webpack 的配置有了足够的理解后&#xff0c;就可以尝试重构一些项目。本次我选择了一个纯HTML/JS的PC项目进行重构&#xff0c;项目位于 GitH…

[旧日谈]高清画面撕裂问题考

背景 无边框透明背景透明的窗口&#xff0c;在随着缩放比例非整数倍数放大时的画面发生了露底、撕裂问题。 当我们在使用Qt开发的时候&#xff0c;遇到了一个结构性问题。因为我们的软件是自己做的&#xff0c;所以要自己定义标题栏&#xff0c;所以我们设置了软件为FrameLess…