文章目录
- Reactor是什么?
- LT模式 VS ET模式
- 示例代码
提示:以下是本篇文章正文内容,下面案例可供参考
Reactor是什么?
Reactor模式是一种事件驱动的并发模型,它通过将事件处理逻辑与事件分发机制解耦,实现高性能、可扩展的并发处理。Reactor模式适用于大量短时连接或需要高效I/O处理的场景,如Web服务器、聊天服务器等。
今天我们所实现的Reactor是基于ET模式下的多路转接模式。
LT模式 VS ET模式
以我们的epoll为例,我们的epoll默认是LT模式。
LT模式:只要有事件就绪,就会不断提醒。
ET模式:只有事件从无到有,从少到多的情况,才会提醒一次。
所以对于ET模式而言,就需要逼服务器一次性将所有缓冲区数据全部读完,也算是逼着你效率提高。 而只提醒一次也是高效的表现。
示例代码
#include "Epoll.hpp"
#include "Socket.hpp"
#include <vector>
#include <functional>
#include <unordered_map>
#include "Common.hpp"
#include "Calculator.hpp"class Connection;
class ReactorServer;using func_t = std::function<void(std::shared_ptr<Connection>)>;
const std::string default_ip = "0.0.0.0";
const uint16_t default_port = 8080;#define EVENT_IN (EPOLLIN | EPOLLET)
#define EVENT_OUT (EPOLLOUT | EPOLLET)void SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL);if (fl < 0){// 获取失败perror("F_GETFD Error");return;}int n = fcntl(fd, F_SETFL, fl | O_NONBLOCK);if (n < 0){perror("Set Nonblock Error");}else{lg(Info, "Set Nonblock Succeed, Fd: %d", fd);// std::cout << "Fd:" << fd << " ,set nonblock done" << std::endl;}
}class Connection : public nocopy
{
public:Connection(int sockfd, ReactorServer *reactor_server): _sockfd(sockfd), _reactor_server(reactor_server) {}void SetHandle(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}int Getfd(){return _sockfd;}std::string &GetInBuffer(){return _inbuffer;}std::string &GetOutBuffer(){return _outbuffer;}~Connection() {}private:int _sockfd;std::string _inbuffer;std::string _outbuffer;ReactorServer *_reactor_server;public:func_t _recv_cb;func_t _send_cb;func_t _except_cb;
};class ReactorServer
{
private:const int num = 128;public:ReactorServer(uint16_t port, func_t handle_message): _port(port), _listensock(new Socket), _epoller(new Epoller), _handle_message(handle_message) {}void EnableEvent(int fd, bool recv, bool send){int events = 0;events |= (recv ? EVENT_IN : 0);events |= (send ? EVENT_OUT : 0);_epoller->EpollerUpdate(EPOLL_CTL_MOD, fd, events);}void Accepter(std::shared_ptr<Connection> connection){while (true){int newsock = _listensock->Accept();if (newsock < 0){if (errno == EWOULDBLOCK){break;}else if (errno == EINTR){continue;}lg(Warning, "Accept Error...");break;}AddConnetion(newsock, EVENT_IN, std::bind(&ReactorServer::Recver, this, std::placeholders::_1),std::bind(&ReactorServer::Sender, this, std::placeholders::_1),std::bind(&ReactorServer::Excepter, this, std::placeholders::_1));}}void Recver(std::shared_ptr<Connection> connection){int fd = connection->Getfd();while (true){char buffer[1024];memset(buffer, 0, sizeof buffer);int n = recv(connection->Getfd(), buffer, sizeof buffer - 1, 0);if (n > 0){connection->GetInBuffer() += buffer;std::cout << connection->GetInBuffer();}else if (n < 0){if (errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;lg(Warning, "Read Error...");connection->_except_cb(connection);return;}else{lg(Info, "Foreign Host Closed...");connection->_except_cb(connection);return;}}_handle_message(connection);}void Sender(std::shared_ptr<Connection> connection){int fd = connection->Getfd();std::string &mes = connection->GetOutBuffer();while (1){int n = send(fd, mes.c_str(), mes.size(), 0);if (n < 0){if (errno == EWOULDBLOCK){break;}else if (errno == EINTR){continue;}connection->_except_cb(connection);return;}if (n == 0){break;}mes.erase(0, n);if (mes.empty()){break;}}if (!mes.empty()){EnableEvent(fd, true, true);}else{EnableEvent(fd, true, false);}}void Excepter(std::shared_ptr<Connection> connection){int fd = connection->Getfd();// 1.先从内核中移除_epoller->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);// 2.从_connections中移除_connections.erase(fd);// 3.关闭sockfdclose(fd);lg(Info, "Sockfd: %d Closed...", fd);}void AddConnetion(int fd, int events, func_t recv_cb, func_t send_cb, func_t except_cb){// 1.设置非阻塞SetNonBlock(fd);// 2.创建新connectionstd::shared_ptr<Connection> newcon(new Connection(fd, this));newcon->SetHandle(recv_cb, send_cb, except_cb);// 3.插入到_connetcions_connections[fd] = newcon;// 4.放入内核_epoller->EpollerUpdate(EPOLL_CTL_ADD, fd, events);}void Init(){_epoller->Init();_listensock->Init();_listensock->Bind(AF_INET, default_ip, _port);_listensock->Listen();AddConnetion(_listensock->_sockfd, EVENT_IN, std::bind(&ReactorServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}bool IsConnectionSafe(int fd){return _connections.find(fd) == _connections.end() ? false : true;}void Start(){struct epoll_event recvs[num];while (1){int n = _epoller->EpollWait(recvs, num, -1);if (n > 0){// std::cout << "检测到事件 n:" << n << std::endl;for (int i = 0; i < n; i++){int fd = recvs[i].data.fd;int events = recvs[i].events;if (events & EPOLLERR){events |= EPOLLIN;}if (events & EPOLLHUP){events |= EPOLLIN;}if ((events & EPOLLIN) && IsConnectionSafe(fd)){if (_connections[fd]->_recv_cb){_connections[fd]->_recv_cb(_connections[fd]);}}if ((events & EPOLLOUT) && IsConnectionSafe(fd)){if (_connections[fd]->_send_cb){_connections[fd]->_send_cb(_connections[fd]);}}}}else if (n == 0){lg(Info, "Time Out...");}else{if (errno == EWOULDBLOCK)continue;lg(Warning, "Epoll error...");std::cout << "errno:" << errno << " strerror:" << strerror(errno) << std::endl;exit(1);}}}~ReactorServer() {}private:std::unique_ptr<Socket> _listensock;std::unique_ptr<Epoller> _epoller;std::unordered_map<int, std::shared_ptr<Connection>> _connections;uint16_t _port;func_t _handle_message;
};
#include "ReactorServer.hpp"void default_HandleMessage(std::shared_ptr<Connection> connection)
{
//根据服务器的服务内容编写此函数
//例如我这里想做一个计算器服务Calculator cal;std::string &inbuffer = connection->GetInBuffer();while (!inbuffer.empty()){std::string mes;int type;if (!CheckType(inbuffer, &type)){// 报文内容出现问题inbuffer = "";break;}if (type == 1){IntHandle(inbuffer, &mes);}else if (type == 2){DoubleHandle(inbuffer, &mes);}else{lg(Warning, "Type Error, type: %d ...", type);}connection->GetOutBuffer() += mes;connection->_send_cb(connection);}
}int main(int argc, char *argv[])
{if (argc != 2){std::cout << "Usage: ./selectServer port[8000-9000]" << std::endl;}ReactorServer ser(atoi(argv[1]), func_t(default_HandleMessage));ser.Init();ser.Start();return 0;
}