Reactor介绍,如何从简易版本的epoll修改成Reactor模型(demo版本代码+详细介绍)

目录

Reactor demo​​​​​​​

引入

比喻 

修改代码

connection

tcp_server

ET模式

主逻辑

处理事件

运行结果

代码

完善功能

读取数据

运行结果

​编辑

代码

处理数据

回指指针 

如何处理写事件

引入

循环内

处理对写事件的关心

异常处理

代码

server.hpp

server.cpp

运行结果


Reactor demo

引入

Reactor 模式是一种设计模式,广泛应用于处理高效的事件驱动编程和网络编程中

  • 用于管理和分发来自不同事件源的事件
  • 核心目的是优化系统在处理大量事件时的性能,特别是在 I/O 操作密集的环境中

Reactor 是一个半同步半异步模型 

  • reactor直译过来就是反应堆
  • 同步 -- 调用epoll接口等待的过程
  • 异步 -- 事件以回调方式进行处理

比喻 

回想起我们曾经玩过的打地鼠游戏:

  • 整个游戏界面就是reactor模型,操作的人就是多路转接方案,检测每个洞(连接)有没有地鼠出来(事件就绪),一旦出来就砸他(执行注册好的回调方法)
  • 这里我们写的代码算是同步,因为需要在内部处理事件
  • 如果要写异步,可以搞个线程池,将收到的数据直接push进任务队列中,交给线程池处理,我们只进行io,然后在内部搞个字段来接收线程池返回的结果

这里我们只是写一个demo版本的(当然,demo版本也很麻烦)

(大家如果遇到什么问题,可以评论交流)

修改代码

之前我们只调用一次read,无法确定是否读完了一份完整数据,并且只有读功能 -- epoll接口使用 -- 非阻塞式网络io(仅读事件)-CSDN博客

  • 在这份代码中,我们并没有保证读完所有数据,并且也不会有机会拿到完整数据,因为每次读取都会创建新的临时缓冲区
  • 所以,我们需要把没读完的数据临时缓存起来

因为应用层上存在大量连接,每个连接都对应一个套接字文件,这些连接都会遇到这个问题

  • 所以需要给每个文件都设置输入输出缓冲区,并定义结构来管理

这次主要有两个模块:

connection

对应我们上面说的,是对文件的管理结构,结合网络通信+epoll,可以确定里面的成员变量:

  • 要有每个连接对应的套接字fd
  • 缓冲区肯定输入输出都要有(这里我们就用string就行,虽然它并不适合处理二进制流,应该用vector,但vector会有很多拷贝,所以方便起见,还是用string)
  • 可以将处理读/写/异常事件就绪时的回调函数也放在里面,刚好可以实现自定义特定文件的处理方式 -- 这样可以使用类内的缓冲区,而不是在读数据时,调用公共函数,将数据添加到公共临时缓冲区中
  • 定义一个回指指针,指向tcp_server(按下不表,在后续说明)

tcp_server

是我们服务器的类

  • 肯定先要包括之前我们封装好的epoll接口和socket接口对象

需要管理多个连接,也就是需要一个结构来将多个connection对象组织起来

  • 使用unordered_map结构,建立fd->连接结构的映射关系
  • 每次将新的要关注的文件添加到connections中,一旦有文件上的事件就绪,就可以通过fd,找到处理事件的方法

我们目前将读事件分为两类,所以需要两种回调函数

  • 获取新连接 和 读取数据
  • 那么,最好是先定义出针对各种类型的处理函数,然后根据文件类型,手动设置好我们需要的方法
  • 因为我们是在服务器内部进行回调函数的设置,所以将回调函数定义在类内,使用会更方便

以上,我们可以定义一个函数来解决,总的来说分为两步:

  • 将需要关注的[新文件上的特定事件]添加进epoll模型中 (内核层)
  • 将[新文件+如何处理特定事件]添加进connections中 (用户层)

除此之外,我们可以直接在类内定义一个struct epoll_event数组,存放从内核捞取出的就绪事件,然后交给事件派发器

ET模式

保证服务器以ET模式工作,要设置相应的标志位:

以及,为了保证全部读取,需要将fd设置为非阻塞io方式 -- fcntl()

主逻辑

服务器不断循环,循环过程中派发事件

  • 然后在派发器逻辑中,每次获取一个就绪事件,分辨是哪个文件上的哪个事件就绪了,然后调用注册好的回调函数

判断是否就绪:

  • 我们检测是否是读事件就绪,和epoll的工作模式无关,只检测EPOLLIN就行了

如果出现异常(EPOLLERR,EPOLLHUP),统一转化为读写问题(设置两个标记位)

  • 因为一旦出现异常,读写一定会受到影响,只要转化,就能在读写函数内部解决 ??

而且,在正式处理前,我们需要一个函数,来判断当前连接是否安全(是否是我们需要关注的文件),以及当前对应的处理函数是否被设置

处理事件

连接/数据到来时,我们无法确定只有一个连接/一份数据,并且这里是在ET模式下,必须要读取出所有连接

  • 所以,我们写一个循环来获取连接/读取数据,直到读完  

运行结果

我们从本地连接上该服务,会发现连接成功:

最后一条的警告,是我们在socket.hpp中封装的accept里显示的,因为此时是非阻塞式+底层没有数据,所以系统调用的accept走了返回值<0的情况,然后打印到日志上

  • 我们可以在内部添加判断语句,如果错误码=EAGAIN,就不打印日志

代码

#pragma once#include <memory>
#include <errno.h>
#include <string>
#include <functional>
#include <fcntl.h>#include "Log.hpp"
#include "socket.hpp"
#include "myepoll.hpp"#include <unordered_map>class connection;using func_t = std::function<void(std::shared_ptr<connection>)>;class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区
{int fd_;std::string in_buffer_;std::string out_buffer_;public:func_t read_cb_;func_t write_cb_;func_t except_cb_;// 回指指针
public:connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb): fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {}~connection() {}int get_fd(){return fd_;}private:
};class epoll_server
{static const int def_timeout = 1000;static const int def_num = 64;static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET;int port_;std::shared_ptr<MY_SOCKET> p_listen_sock_;std::shared_ptr<MY_EPOLL> p_epoll_;std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系struct epoll_event events_[def_num];public:epoll_server(int port): port_(port), p_listen_sock_(new MY_SOCKET), p_epoll_(new MY_EPOLL(def_timeout)) {}~epoll_server() {}void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb){// 添加到connections中 -- 用户层connections_.insert(std::make_pair(fd, std::make_shared<connection>(fd, read_cb, write_cb, except_cb)));// 添加到epoll模型 -- 内核p_epoll_->ctl(EPOLL_CTL_ADD, fd, event);}void init(){p_listen_sock_->Socket();set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式p_listen_sock_->Bind(port_);p_listen_sock_->Listen();// 添加监听套接字add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr);lg(DEBUG, "listen_socket add success\n");}void loop(){init();while (true){int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件for (int i = 0; i < n; ++i){Dispatcher(events_[i]); // 每次处理一个就绪事件}}}private:void accept(std::shared_ptr<connection> conn) // 处理连接事件{// 连接到来时,我们要循环处理,直到无数据while (true){std::string clientip;uint16_t clientport;int sock = p_listen_sock_->Accept(clientip, clientport);if (sock > 0){lg(INFO, "get a new connection ,fd : %d\n", sock);set_no_block(sock); // 设置为非阻塞式// 将新套接字添加进connections和epoll模型add_sock(sock, EVENT_IN,std::bind(&epoll_server::receiver, this, std::placeholders::_1), nullptr, nullptr);}else{// 如果底层无数据,也会错误返回,并设置错误码11if (errno == EAGAIN) // 无数据{break;}else if (errno == EINTR) // 被信号中断{continue;}else{lg(ERROR, "accept error\n");break;}}}}void receiver(std::shared_ptr<connection> conn){}void Dispatcher(struct epoll_event &sock){int fd = sock.data.fd; // 需要判断是否是我们关注的文件if (!is_safe(fd)){lg(DEBUG, "fd: %s is not safe\n", fd);return;}int event = sock.events;if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件{event |= EPOLLIN;event |= EPOLLOUT;}if ((event & EPOLLIN) && connections_[fd]->read_cb_) // 如果读取回调存在{connections_[fd]->read_cb_(connections_[fd]); // 调用读回调}if ((event & EPOLLOUT) && connections_[fd]->write_cb_) // 如果读取回调存在{connections_[fd]->write_cb_(connections_[fd]); // 调用读回调}}bool is_safe(int fd){auto it = connections_.find(fd);if (it != connections_.end()){return true;}else{return false;}}void set_no_block(int fd){int ret = fcntl(fd, F_GETFL);if (ret < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, ret | O_NONBLOCK);}
};

完善功能

读取数据

完善普通文件的回调函数(监听套接字只需要处理读事件,但通信用的套接字需要三种事件都处理)

我们先写好读取数据的函数:

  • 循环读取至读完全部数据
  • 读取一段就放入输入缓冲区中(服务器不应该关心数据格式/是否是一份完整数据,只要把数据全拿到手就行)
  • 因为我们是非阻塞式,所以一旦读取完毕,会返回错误,我们需要将返回值<0的情况分类: 读完数据 / 因异常信号中断读取 / 真的出错
  • 然后,我们在真的出错时,调用异常处理函数
  • 同理,在对方关闭连接时,也需要进入异常处理阶段

这里为了日志好看,可以在connection结构中增加两个字段 -- ip和port

运行结果

可以看到,随着我们的输入,打印出的[输入缓冲区的内容]变得更多:

代码

​
#pragma once#include <memory>
#include <errno.h>
#include <string>
#include <functional>
#include <fcntl.h>#include "Log.hpp"
#include "socket.hpp"
#include "myepoll.hpp"#include <unordered_map>class connection;using func_t = std::function<void(std::shared_ptr<connection>)>;class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区
{int fd_;std::string in_buffer_;std::string out_buffer_;public:func_t read_cb_;func_t write_cb_;func_t except_cb_;// 方便日志打印std::string ip_;uint16_t port_;// 回指指针
public:connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb): fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {}~connection() {}int get_fd(){return fd_;}void append(const std::string &str){in_buffer_ += str;}std::string& inbuffer(){return in_buffer_;}private:
};class epoll_server
{static const int def_timeout = 1000;static const int def_num = 64;static const int def_buffsize = 128;static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET;int port_;std::shared_ptr<MY_SOCKET> p_listen_sock_;std::shared_ptr<MY_EPOLL> p_epoll_;std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系struct epoll_event events_[def_num];public:epoll_server(int port): port_(port), p_listen_sock_(new MY_SOCKET), p_epoll_(new MY_EPOLL(def_timeout)) {}~epoll_server() {}void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb){// 添加到connections中 -- 用户层connections_.insert(std::make_pair(fd, std::make_shared<connection>(fd, read_cb, write_cb, except_cb)));// 添加到epoll模型 -- 内核p_epoll_->ctl(EPOLL_CTL_ADD, fd, event);}void init(){p_listen_sock_->Socket();set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式p_listen_sock_->Bind(port_);p_listen_sock_->Listen();// 添加监听套接字add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr);lg(DEBUG, "listen_socket add success\n");}void loop(){init();while (true){int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件for (int i = 0; i < n; ++i){Dispatcher(events_[i]); // 每次处理一个就绪事件}}}private:void accept(std::shared_ptr<connection> conn) // 处理连接事件{// 连接到来时,我们要循环处理,直到无数据while (true){std::string clientip;uint16_t clientport;int sock = p_listen_sock_->Accept(clientip, clientport);if (sock > 0){lg(DEBUG, "get a new client, get info-> [%s:%d], sockfd : %d", clientip.c_str(), clientport, sock);set_no_block(sock); // 设置为非阻塞式// 将新套接字添加进connections和epoll模型add_sock(sock, EVENT_IN,std::bind(&epoll_server::receiver, this, std::placeholders::_1),std::bind(&epoll_server::sender, this, std::placeholders::_1),std::bind(&epoll_server::excepter, this, std::placeholders::_1));}else{// 如果底层无数据,也会错误返回,并设置错误码11if (errno == EAGAIN) // 无数据{break;}else if (errno == EINTR) // 被信号中断{continue;}else{lg(ERROR, "accept error\n");break;}}}}void receiver(std::shared_ptr<connection> conn){while (true) // 读取至底层无数据{char buffer[def_buffsize];int n = read(conn->get_fd(), buffer, sizeof(buffer) - 1);if (n > 0) // 还没读完{buffer[n] = 0;conn->append(buffer);}else if (n == 0) // 对方关闭连接{lg(INFO, "sockfd: %d, client info %s:%d quit...", conn->get_fd(), conn->ip_.c_str(), conn->port_);conn->except_cb_(conn); // 关注异常事件return;}else // 出错/读完{if (errno == EAGAIN) // 读完全部数据{break;}else if (errno == EINTR){continue;}else // 真的出错{conn->except_cb_(conn); // 关注异常事件return;}}}}void sender(std::shared_ptr<connection> conn) {}void excepter(std::shared_ptr<connection> conn) {}void Dispatcher(struct epoll_event &sock){int fd = sock.data.fd; // 需要判断是否是我们关注的文件if (!is_safe(fd)){lg(DEBUG, "fd: %s is not safe\n", fd);return;}int event = sock.events;if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件{event |= EPOLLIN;event |= EPOLLOUT;}if ((event & EPOLLIN) && connections_[fd]->read_cb_) // 如果读回调存在{connections_[fd]->read_cb_(connections_[fd]); // 调用读回调}if ((event & EPOLLOUT) && connections_[fd]->write_cb_) // 如果写回调存在{connections_[fd]->write_cb_(connections_[fd]); // 调用写回调}print(connections_[fd]);}bool is_safe(int fd){auto it = connections_.find(fd);if (it != connections_.end()){return true;}else{return false;}}void set_no_block(int fd){int ret = fcntl(fd, F_GETFL);if (ret < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, ret | O_NONBLOCK);}void print(std::shared_ptr<connection> conn){std::cout << "fd: " << conn->get_fd() << " , ";std::cout << "in_buffer: " << conn->inbuffer().c_str() << std::endl;}
};​

处理数据

虽然我们存入了数据,但我们还没有处理数据

处理数据应该交给用户来决定(也就是使用回调函数,在参数中传入connection对象即可,里面包含该文件读到的所有数据),因为这部分属于应用层的事情

  • 检测数据是否完整(协议定制,序列化/反序列化)
  • 如果包含一份完整数据,进行处理(具体业务处理)

我们将之前写过的网络计算机代码拿过来用 -- 网络计算器(使用json序列化/反序列化,条件编译,注意点),json介绍+语法介绍_json序列化和反序列化工具-CSDN博客

网络计算器代码编写+注意点(序列化,反序列化,报头封装和解包,服务端和客户端,计算),客户端和服务端数据传递流程图,守护进程化+日志重定向到文件_计算器封装-CSDN博客

  • 直接定义一个函数,使用自定义协议来处理数据(将输入缓冲区中的数据做处理,然后把结果写回输出缓冲区)
  • 然后在实例化服务器时,将该函数作为实参传进去
  • 这样,就可以在读取完数据之后进行处理,并且发送 (注意,发送数据应该由服务器来做,也就需要我们的回指指针发挥用处 -- 回指服务器,然后由服务器调用发送函数)
回指指针 

因为connection和epoll_server会互相引用

  • server里保存了全部connection结构的指针,用于管理
  • 而connection里也需要引用epoll_server,来使用里面的函数
  • 这样就造成了循环引用问题

所以,需要我们将connection里存放的回指指针的类型定义成weak_ptr,当需要使用时再转换为shared_ptr

  • 而我们传参时,必须通过shared_ptr来转换为weak_ptr,所以我们调用shared_from_this()来返回当前对象的shared_ptr
  • 而调用该函数的前提是,让需要使用的类继承enable_shared_from_this这个模板类

如何处理写事件

引入

因为写事件关注的是发送缓冲区是否有空间

  • 缓冲区经常都是有空间的,所以写事件经常会就绪
  • 而一旦事件就绪,wait就会返回
  • 但我们通常真正关心的是"是否有数据可以发送",而不是"是否有空间"
  • 所以写事件,要按需设置是否关心 -- 代码体现

对于读事件来说,我们设置常关心

  • 因为读事件看的就是有无数据
循环内

那我们该如何写入呢?

  • 直接调用写函数(send/write)
  • 并且要像读数据一样,需要把输出缓冲区内的数据全部写进文件的发送缓冲区中才行
  • 所以要循环写入

并且,和读取不一样,需要我们手动删除输出缓冲区中的数据

  • 因为读取是从内核读到用户层,内核会自动帮我们删除已读出的数据

接下来,说说函数返回值的问题:

  • 如果返回值>0,说明此时成功写入了数据,需要我们删除已经写入的数据(如果已经将数据全部写完了,退出循环)
  • 返回值为0,说明此时缓冲区内没有数据,压根没有写入数据,直接返回
  • 注意这里[退出循环]和[直接返回]的区别,因为我们要在循环结束后,处理对写事件的关心

发送出错,分几种情况(和处理读数据一样)

  • 底层缓冲区没有空间了,返回EWOULDBLOCK(=EAGIN=11) 
  • 被信号中断
  • 真的出错
处理对写事件的关心

出循环后分为两种情况:

如果outbuffer里还有数据没写完 -- 设置对写事件的关心 

  • 因为此时受限于底层的缓冲区空间,所以需要关注写事件
  • 一旦发送缓冲区有空间了,就会通知我们,然后回调我们的写处理函数,继续发送数据

outbuffer里的数据已经被写完了 -- 取消对写事件的关心

  • 数据已经写完了,即使有空间也不需要,所以不用关注

以上可以自定义一个使能事件的函数,可以自主决定是否开启读/写事件

  • 是否开启 -- bool类型字段
  • 然后在内部调用epoll_ctl函数,来修改特定文件对事件的关注

异常处理

一旦走到异常处理的函数中,一定是出问题了

  • 那就直接移除epoll中对该文件上事件的关心
  • 关闭这个连接
  • 从自定义的连接管理结构中移除

代码

server.hpp
#pragma once#include <memory>
#include <errno.h>
#include <string>
#include <functional>
#include <fcntl.h>
#include <unordered_map>#include "Log.hpp"
#include "socket.hpp"
#include "myepoll.hpp"class connection;
class epoll_server;using func_t = std::function<void(std::shared_ptr<connection>)>;class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区
{int fd_;std::string in_buffer_;std::string out_buffer_;public:func_t read_cb_;func_t write_cb_;func_t except_cb_;// 方便日志打印std::string ip_;uint16_t port_;// 使用 weak_ptr 防止循环引用std::weak_ptr<epoll_server> p_svr_;public:connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb): fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {}~connection() {}void set_p_svr(std::weak_ptr<epoll_server> ptr){p_svr_ = ptr;}int get_fd() { return fd_; }void in_append(const std::string &str) { in_buffer_ += str; }void out_append(const std::string &str) { out_buffer_ += str; }std::string &inbuffer() { return in_buffer_; }std::string &outbuffer() { return out_buffer_; }
};class epoll_server : public std::enable_shared_from_this<epoll_server>, public no_copy
{static const int def_timeout = 1000;static const int def_num = 64;static const int def_buffsize = 128;static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET;int port_;func_t handle_;std::shared_ptr<MY_SOCKET> p_listen_sock_;std::shared_ptr<MY_EPOLL> p_epoll_;std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系struct epoll_event events_[def_num];public:epoll_server(int port, func_t handle): port_(port), handle_(handle), p_listen_sock_(new MY_SOCKET()), p_epoll_(new MY_EPOLL(def_timeout)) {}~epoll_server() {}void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb, const std::string &ip = "0.0.0.0", uint16_t port = 0){std::shared_ptr<connection> new_connection(new connection(fd, read_cb, write_cb, except_cb));new_connection->set_p_svr(shared_from_this()); // shared_from_this(): 返回当前对象的shared_ptr,要确保epoll_server已经以shared_ptr的形式存在(主函数中以shared_ptr形式实例化对象)new_connection->ip_ = ip;new_connection->port_ = port;connections_.insert(std::make_pair(fd, new_connection));p_epoll_->ctl(EPOLL_CTL_ADD, fd, event);}void loop(){init();while (true){int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件for (int i = 0; i < n; ++i){Dispatcher(events_[i]); // 每次处理一个就绪事件}}}void receiver(std::shared_ptr<connection> conn){int fd = conn->get_fd();while (true) // 读取至底层无数据{char buffer[def_buffsize];memset(buffer, 0, sizeof(buffer));int n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0) // 还没读完{conn->in_append(buffer);}else if (n == 0) // 对方关闭连接{lg(INFO, "sockfd: %d, client info %s:%d quit", fd, conn->ip_.c_str(), conn->port_);conn->except_cb_(conn); // 关注异常事件return;}else // 出错/读完{if (errno == EAGAIN) // 读完全部数据{break;}else if (errno == EINTR){continue;}else // 真的出错{lg(WARNING, "sockfd: %d, client info %s:%d error", fd, conn->ip_.c_str(), conn->port_);conn->except_cb_(conn); // 关注异常事件return;}}}// 读完了数据,就该处理了,但不一定包含了一份完整报文handle_(conn);}void excepter(std::shared_ptr<connection> conn){int fd = conn->get_fd();lg(WARNING, "Excepter hander sockfd: %d, client info %s:%d excepter handler", fd, conn->ip_.c_str(), conn->port_);p_epoll_->ctl(EPOLL_CTL_DEL, fd, 0);close(fd);lg(DEBUG, "close %d done\n", fd);connections_.erase(fd);lg(DEBUG, "remove %d from _connections done\n", fd);}void sender(std::shared_ptr<connection> conn){auto &buffer = conn->outbuffer();int fd = conn->get_fd();while (true){ssize_t n = write(fd, buffer.c_str(), buffer.size()); // 将输出缓冲区的内容写入内核if (n > 0)                                            // 写入一定数据{buffer.erase(0, n);if (buffer.empty()) // 数据写完了{break;}}else if (n == 0) // 没有数据可写{return;}else{if (errno == EAGAIN){break;}else if (errno == EINTR){continue;}else{lg(WARNING, "sockfd: %d, client info %s:%d send error...", conn->get_fd(), conn->ip_.c_str(), conn->port_);conn->except_cb_(conn);return;}}}// 判断接下来是否需要关注写事件if (buffer.empty()){enable_event(fd, true, false);}else{enable_event(fd, true, true);}}private:void init(){p_listen_sock_->Socket();set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式p_listen_sock_->Bind(port_);p_listen_sock_->Listen();// 添加监听套接字add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr);lg(DEBUG, "listen_socket add success\n");}void accept(std::shared_ptr<connection> conn) // 处理连接事件{while (true){std::string clientip;uint16_t clientport;int sock = p_listen_sock_->Accept(clientip, clientport);if (sock > 0){lg(DEBUG, "get a new client, get info-> [%s:%d], sockfd : %d", clientip.c_str(), clientport, sock);set_no_block(sock); // 设置为非阻塞式// 将新套接字添加进connections和epoll模型add_sock(sock, EVENT_IN,std::bind(&epoll_server::receiver, this, std::placeholders::_1),std::bind(&epoll_server::excepter, this, std::placeholders::_1),std::bind(&epoll_server::excepter, this, std::placeholders::_1),clientip, clientport);}else{if (errno == EAGAIN) // 无数据{break;}else if (errno == EINTR) // 被信号中断{continue;}else{lg(ERROR, "accept error\n");break;}}}}void Dispatcher(struct epoll_event &sock){int fd = sock.data.fd; // 需要判断是否是我们关注的文件if (!is_safe(fd)){lg(DEBUG, "fd: %d is not safe\n", fd);return;}auto conn = connections_[fd];if (!conn)return;int event = sock.events;if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件{event |= EPOLLIN;event |= EPOLLOUT;}if ((event & EPOLLIN) && conn->read_cb_) // 如果读回调存在{conn->read_cb_(conn); // 调用读回调}if ((event & EPOLLOUT) && conn->write_cb_) // 如果写回调存在{conn->write_cb_(conn); // 调用写回调}}bool is_safe(int fd){return connections_.find(fd) != connections_.end(); // 是否在connections结构中存在}void set_no_block(int fd){int ret = fcntl(fd, F_GETFL);if (ret < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, ret | O_NONBLOCK);}void enable_event(int fd, bool f_read, bool f_write){if (fd < 0){lg(ERROR, "Invalid file descriptor: %d", fd);return;}uint32_t event = 0;event |= ((f_read ? EPOLLIN : 0) | (f_write ? EPOLLOUT : 0) | EPOLLET);p_epoll_->ctl(EPOLL_CTL_MOD, fd, event);}
};
server.cpp
#include "server.hpp"
#include "cal.hpp"void def_handle(std::weak_ptr<connection> conne)
{if (conne.expired())return;auto conn = conne.lock();calculate Cal;std::string str = Cal.cal(conn->inbuffer()); // 处理数据,得到结果if (str.empty()){return;}//lg(DEBUG, "get data: %s\n", str.c_str());conn->out_append(str); // 添加到输出缓冲区//lg(DEBUG, "out_append success\n");// 写入auto server = conn->p_svr_.lock(); // weak_ptr不拥有对象的所有权,需要转换为shared_ptrserver->sender(conn);             // 需要让服务器调用写处理函数,后续让服务器擦屁股(也许没有写入全部数据)//lg(DEBUG, "sender success\n");
}int main()
{std::shared_ptr<epoll_server> epoll_svr(new epoll_server(8080, def_handle));epoll_svr->loop();return 0;
}

其他代码在压缩包里

运行结果

我们把网络计算器的客户端也拿过来 -- 网络计算器代码编写+注意点(序列化,反序列化,报头封装和解包,服务端和客户端,计算),客户端和服务端数据传递流程图,守护进程化+日志重定向到文件_计算器封装-CSDN博客

直接做测试:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/427058.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 携手 7-Zip 命令行:大文件压缩的终极武器?

前言 嗨&#xff0c;大家好&#xff01; 今天咱们来聊聊如何用 C# 调用 7-Zip 命令行来压缩大文件&#xff0c;这是个既高效又稳定的好办法&#xff0c;亲测有效&#xff01; 在实际工作中&#xff0c;压缩文件几乎是家常便饭&#xff0c;但可惜的是&#xff0c;许多常用的方…

【科技论文写作与发表】论文分类

目录 一、实验性论文特点写作结构适用学科方向 二、报道性论文特点写作结构适用学科方向 三、理论性论文特点写作结构适用学科方向 一、实验性论文 通过科学实验获得的数据和结果进行详细阐述和分析&#xff0c;检验某一科学理论或假说。 特点 详细描述实验目的、实验设计、实…

ORM框架详解:为什么不直接写SQL?

想象一下&#xff0c;你正在开发一个小型的在线书店应用。你需要存储书籍信息、用户数据和订单记录。作为一个初学者&#xff0c;你可能会想&#xff1a;“我已经学会了SQL&#xff0c;为什么还要使用ORM框架呢&#xff1f;直接写SQL语句不是更简单、更直接吗&#xff1f;” 如…

Uniapp的alertDialog返回值+async/await处理确定/取消问题

今天在使用uniui的alertDialog时&#xff0c;想添加一个确定/取消的警告框时 发现alertDialog和下面的处理同步进行了&#xff0c;没有等待alaertDialog处理完才进行 查询后发现问题在于 await 关键字虽然被用来等待 alertDialog.value.open() 的完成&#xff0c;但是 alertDi…

前端mock了所有……

目录 一、背景描述 二、开发流程 1.引入Mock 2.创建文件 3.需求描述 4.Mock实现 三、总结 一、背景描述 前提&#xff1a; 事情是这样的&#xff0c;老板想要我们写一个demo拿去路演/拉项目&#xff0c;有一些数据&#xff0c;希望前端接一下&#xff0c;写几个表格&a…

vs code 跳转很慢

查看结构体、接口等非常之慢。c/c语言服务功能使用了智能引擎所致&#xff0c;设置为模糊检索即可。 修改如下&#xff1a; 1.打开"文件"&#xff0c;选"首选项"&#xff0c;"设置" 2.弹出的窗口中搜索 "C_Cpp.intelliSenseEngine" …

TCP客户端编码和解码处理:发送和接收指定编码消息

文章目录 引言基于Netty实现TCP客户端Netty发送GBK编码指令Netty接收GBK编码基于Channel发送指令基于ChannelHandlerContext发送指令:建立连接时发送登陆指令开启日志,查看报文信息基于ChannelInboundHandlerAdapter进行业务逻辑处理原生API实现TCP客户端基于DataOutputStrea…

C语言 | Leetcode C语言题解之题409题最长回文串

题目&#xff1a; 题解&#xff1a; int longestPalindrome(char * s) {int c[128]{0},ret0;for(int i0;i<strlen(s);i){c[s[i]];}for(int i0;i<128;i){retc[i]-c[i]%2;}return ret(ret!strlen(s)); }

gazebo 仿真阶段性问题汇总二

目录 写在前面的话遇到的问题问题一&#xff1a;启动了多个 robot_state_publisher解决办法 问题二&#xff1a;rviz 启动报错解决办法 问题三&#xff1a;rviz 中 wheel 一直指向 base_link解决方法 问题四&#xff1a;摄像头和opencv坐标系的问题解决方法 问题五&#xff1a;…

JavaWeb笔记整理——Redis

目录 Redis数据类型 各种数据类型的特点 Redis常用命令 字符串操作命令 哈希操作命令 列表操作命令 集合操作命令 有序集合操作命令 通用命令 在Java中操作Redis Spring Data Redis的使用方式 操作字符串类型的数据 ​编辑操作hash类型的数据 ​编辑 操作列表类…

Rasa对话模型——做一个语言助手

1、Rasa模型 1.1 模型介绍 Rasa是一个用于构建对话 AI 的开源框架&#xff0c;主要用于开发聊天机器人和语音助手。Rasa 提供了自然语言理解&#xff08;NLU&#xff09;和对话管理&#xff08;DM&#xff09;功能&#xff0c;使开发者能够创建智能、交互式的对话系统。 1.2…

《中国数据库前世今生》纪录片观感:从古至今数据库的演变与未来

我的数据库之路&#xff1a;从新手到稳步前行 三年数据库开发的经历&#xff0c;让我从一名菜鸟程序员逐步成长为能够独立解决问题的开发者。这段时间里&#xff0c;我经历过迷茫、困惑&#xff0c;也感受过技术攻关后的成就感。最近看了腾讯云推出的《中国数据库前世今生》纪…

ARM相关概念

ARM课程大纲 ARM相关的基本概念 机器码 计算机能够识别由1和0组成的编码格式 汇编&#xff1a;将汇编文件转换为二进制文件&#xff08;.bin/.elf&#xff09; 汇编指令 是一条具备特殊功能的指令 编译&#xff1a;生成汇编文件 int a 10; ------> mov r0 #10 …

架构师:在 Spring Cloud 中实现全局异常处理的技术指南

1、简述 在分布式系统中,微服务架构是最流行的设计模式之一。Spring Cloud 提供了各种工具和库来简化微服务的开发和管理。然而,随着服务的增多,处理每个服务中的异常变得尤为复杂。因此,实现统一的全局异常处理成为了关键。本篇博客将介绍如何在 Spring Cloud 微服务架构…

Docker日志管理

ELK Filebeat Filebeat 是 ELK 组件的新成员&#xff0c; 也是 Beat 成员之一。基于 Go 语言开发&#xff0c; 无任何依赖&#xff0c; 并且比 Logstash 更加轻量&#xff0c; 不会带来过高的资源占用&#xff0c; 非常适合安装在生产机器上。轻量意 味着简单&#xff0c;Fileb…

51单片机-DA(数字转模拟)

作者&#xff1a;Whappy 个人理解&#xff1a;将电压或电流信号进行等分或不等分&#xff08;高电平的电压范围和低电平的范围&#xff0c;如0-5v&#xff0c;0-1.8位低电平&#xff0c;3.8-5v为高电平&#xff09;&#xff0c;同样也是通过采样&#xff0c;量化等操作将不连续…

苍穹外卖Day01-2

目录 导入接口文档 创建项目​编辑 导入接口文件 ​编辑 导入结果界面​编辑 Swagger 介绍 如何集成 Swagger&#xff1f; 1.添加依赖 2.配置 Swagger 3.创建 Swagger 配置类 4.使用注解生成文档 5.访问 Swagger UI 6.Swagger 的优势 导入接口文档 yApi接口管理平台h…

Oracle从入门到放弃

Oracle从入门到放弃 左连接和右连接Where子查询单行子查询多行子查询 from子句的子查询select子句的子查询oracle分页序列序列的应用 索引PL/SQL变量声明与赋值select into 赋值变量属性类型 异常循环游标存储函数存储过程不带传出参数的存储过程带传出参数的存储过程 左连接和…

【数据结构】排序算法系列——堆排序(附源码+图解)

堆排序 堆排序基于一种常见的**[[二叉树]]结构**&#xff1a;堆 我们前面讲到选择排序&#xff0c;它在待排序的n个记录中选择一个最小的记录需要比较n一1次。本来这也可以理解&#xff0c;查找第一个数据需要比较这么多次是正常的&#xff0c;否则无法知道它是最小的记录。 …

BCLinux您的授权码是无效的,请获得正确的授权码来注册大云Linux操作系统

更新yum源老弹出这个&#xff0c;很烦人。 [rootlocalhost yum.repos.d]# yum clean all 服务器检查结果: ***信息***您的授权码是无效的&#xff0c;请获得正确的授权码来注册大云Linux操作系统。您可以使用bclinux-license -g命令获得机器码&#xff0c;然后与我们联系帮您产…