目录
Reactor demo
引入
比喻
修改代码
connection
tcp_server
ET模式
主逻辑
处理事件
运行结果
代码
完善功能
读取数据
运行结果
编辑
代码
处理数据
回指指针
如何处理写事件
引入
循环内
处理对写事件的关心
异常处理
代码
server.hpp
server.cpp
运行结果
Reactor demo
引入
Reactor 模式是一种设计模式,广泛应用于处理高效的事件驱动编程和网络编程中
- 用于管理和分发来自不同事件源的事件
- 核心目的是优化系统在处理大量事件时的性能,特别是在 I/O 操作密集的环境中
Reactor 是一个半同步半异步模型
- reactor直译过来就是反应堆
- 同步 -- 调用epoll接口等待的过程
- 异步 -- 事件以回调方式进行处理
比喻
回想起我们曾经玩过的打地鼠游戏:
- 整个游戏界面就是reactor模型,操作的人就是多路转接方案,检测每个洞(连接)有没有地鼠出来(事件就绪),一旦出来就砸他(执行注册好的回调方法)
- 这里我们写的代码算是同步,因为需要在内部处理事件
- 如果要写异步,可以搞个线程池,将收到的数据直接push进任务队列中,交给线程池处理,我们只进行io,然后在内部搞个字段来接收线程池返回的结果
这里我们只是写一个demo版本的(当然,demo版本也很麻烦)
(大家如果遇到什么问题,可以评论交流)
修改代码
之前我们只调用一次read,无法确定是否读完了一份完整数据,并且只有读功能 -- epoll接口使用 -- 非阻塞式网络io(仅读事件)-CSDN博客
- 在这份代码中,我们并没有保证读完所有数据,并且也不会有机会拿到完整数据,因为每次读取都会创建新的临时缓冲区
- 所以,我们需要把没读完的数据临时缓存起来
因为应用层上存在大量连接,每个连接都对应一个套接字文件,这些连接都会遇到这个问题
- 所以需要给每个文件都设置输入输出缓冲区,并定义结构来管理
这次主要有两个模块:
connection
对应我们上面说的,是对文件的管理结构,结合网络通信+epoll,可以确定里面的成员变量:
- 要有每个连接对应的套接字fd
- 缓冲区肯定输入输出都要有(这里我们就用string就行,虽然它并不适合处理二进制流,应该用vector,但vector会有很多拷贝,所以方便起见,还是用string)
- 可以将处理读/写/异常事件就绪时的回调函数也放在里面,刚好可以实现自定义特定文件的处理方式 -- 这样可以使用类内的缓冲区,而不是在读数据时,调用公共函数,将数据添加到公共临时缓冲区中
- 定义一个回指指针,指向tcp_server(按下不表,在后续说明)
tcp_server
是我们服务器的类
- 肯定先要包括之前我们封装好的epoll接口和socket接口对象
需要管理多个连接,也就是需要一个结构来将多个connection对象组织起来
- 使用unordered_map结构,建立fd->连接结构的映射关系
- 每次将新的要关注的文件添加到connections中,一旦有文件上的事件就绪,就可以通过fd,找到处理事件的方法
我们目前将读事件分为两类,所以需要两种回调函数
- 获取新连接 和 读取数据
- 那么,最好是先定义出针对各种类型的处理函数,然后根据文件类型,手动设置好我们需要的方法
- 因为我们是在服务器内部进行回调函数的设置,所以将回调函数定义在类内,使用会更方便
以上,我们可以定义一个函数来解决,总的来说分为两步:
- 将需要关注的[新文件上的特定事件]添加进epoll模型中 (内核层)
- 将[新文件+如何处理特定事件]添加进connections中 (用户层)
除此之外,我们可以直接在类内定义一个struct epoll_event数组,存放从内核捞取出的就绪事件,然后交给事件派发器
ET模式
保证服务器以ET模式工作,要设置相应的标志位:
以及,为了保证全部读取,需要将fd设置为非阻塞io方式 -- fcntl()
主逻辑
服务器不断循环,循环过程中派发事件
- 然后在派发器逻辑中,每次获取一个就绪事件,分辨是哪个文件上的哪个事件就绪了,然后调用注册好的回调函数
判断是否就绪:
- 我们检测是否是读事件就绪,和epoll的工作模式无关,只检测EPOLLIN就行了
如果出现异常(EPOLLERR,EPOLLHUP),统一转化为读写问题(设置两个标记位)
- 因为一旦出现异常,读写一定会受到影响,只要转化,就能在读写函数内部解决 ??
而且,在正式处理前,我们需要一个函数,来判断当前连接是否安全(是否是我们需要关注的文件),以及当前对应的处理函数是否被设置
处理事件
连接/数据到来时,我们无法确定只有一个连接/一份数据,并且这里是在ET模式下,必须要读取出所有连接
- 所以,我们写一个循环来获取连接/读取数据,直到读完
运行结果
我们从本地连接上该服务,会发现连接成功:
最后一条的警告,是我们在socket.hpp中封装的accept里显示的,因为此时是非阻塞式+底层没有数据,所以系统调用的accept走了返回值<0的情况,然后打印到日志上
- 我们可以在内部添加判断语句,如果错误码=EAGAIN,就不打印日志
代码
#pragma once#include <memory> #include <errno.h> #include <string> #include <functional> #include <fcntl.h>#include "Log.hpp" #include "socket.hpp" #include "myepoll.hpp"#include <unordered_map>class connection;using func_t = std::function<void(std::shared_ptr<connection>)>;class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区 {int fd_;std::string in_buffer_;std::string out_buffer_;public:func_t read_cb_;func_t write_cb_;func_t except_cb_;// 回指指针 public:connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb): fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {}~connection() {}int get_fd(){return fd_;}private: };class epoll_server {static const int def_timeout = 1000;static const int def_num = 64;static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET;int port_;std::shared_ptr<MY_SOCKET> p_listen_sock_;std::shared_ptr<MY_EPOLL> p_epoll_;std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系struct epoll_event events_[def_num];public:epoll_server(int port): port_(port), p_listen_sock_(new MY_SOCKET), p_epoll_(new MY_EPOLL(def_timeout)) {}~epoll_server() {}void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb){// 添加到connections中 -- 用户层connections_.insert(std::make_pair(fd, std::make_shared<connection>(fd, read_cb, write_cb, except_cb)));// 添加到epoll模型 -- 内核p_epoll_->ctl(EPOLL_CTL_ADD, fd, event);}void init(){p_listen_sock_->Socket();set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式p_listen_sock_->Bind(port_);p_listen_sock_->Listen();// 添加监听套接字add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr);lg(DEBUG, "listen_socket add success\n");}void loop(){init();while (true){int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件for (int i = 0; i < n; ++i){Dispatcher(events_[i]); // 每次处理一个就绪事件}}}private:void accept(std::shared_ptr<connection> conn) // 处理连接事件{// 连接到来时,我们要循环处理,直到无数据while (true){std::string clientip;uint16_t clientport;int sock = p_listen_sock_->Accept(clientip, clientport);if (sock > 0){lg(INFO, "get a new connection ,fd : %d\n", sock);set_no_block(sock); // 设置为非阻塞式// 将新套接字添加进connections和epoll模型add_sock(sock, EVENT_IN,std::bind(&epoll_server::receiver, this, std::placeholders::_1), nullptr, nullptr);}else{// 如果底层无数据,也会错误返回,并设置错误码11if (errno == EAGAIN) // 无数据{break;}else if (errno == EINTR) // 被信号中断{continue;}else{lg(ERROR, "accept error\n");break;}}}}void receiver(std::shared_ptr<connection> conn){}void Dispatcher(struct epoll_event &sock){int fd = sock.data.fd; // 需要判断是否是我们关注的文件if (!is_safe(fd)){lg(DEBUG, "fd: %s is not safe\n", fd);return;}int event = sock.events;if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件{event |= EPOLLIN;event |= EPOLLOUT;}if ((event & EPOLLIN) && connections_[fd]->read_cb_) // 如果读取回调存在{connections_[fd]->read_cb_(connections_[fd]); // 调用读回调}if ((event & EPOLLOUT) && connections_[fd]->write_cb_) // 如果读取回调存在{connections_[fd]->write_cb_(connections_[fd]); // 调用读回调}}bool is_safe(int fd){auto it = connections_.find(fd);if (it != connections_.end()){return true;}else{return false;}}void set_no_block(int fd){int ret = fcntl(fd, F_GETFL);if (ret < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, ret | O_NONBLOCK);} };
完善功能
读取数据
完善普通文件的回调函数(监听套接字只需要处理读事件,但通信用的套接字需要三种事件都处理)
我们先写好读取数据的函数:
- 循环读取至读完全部数据
- 读取一段就放入输入缓冲区中(服务器不应该关心数据格式/是否是一份完整数据,只要把数据全拿到手就行)
- 因为我们是非阻塞式,所以一旦读取完毕,会返回错误,我们需要将返回值<0的情况分类: 读完数据 / 因异常信号中断读取 / 真的出错
- 然后,我们在真的出错时,调用异常处理函数
- 同理,在对方关闭连接时,也需要进入异常处理阶段
这里为了日志好看,可以在connection结构中增加两个字段 -- ip和port
运行结果
可以看到,随着我们的输入,打印出的[输入缓冲区的内容]变得更多:
代码
#pragma once#include <memory> #include <errno.h> #include <string> #include <functional> #include <fcntl.h>#include "Log.hpp" #include "socket.hpp" #include "myepoll.hpp"#include <unordered_map>class connection;using func_t = std::function<void(std::shared_ptr<connection>)>;class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区 {int fd_;std::string in_buffer_;std::string out_buffer_;public:func_t read_cb_;func_t write_cb_;func_t except_cb_;// 方便日志打印std::string ip_;uint16_t port_;// 回指指针 public:connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb): fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {}~connection() {}int get_fd(){return fd_;}void append(const std::string &str){in_buffer_ += str;}std::string& inbuffer(){return in_buffer_;}private: };class epoll_server {static const int def_timeout = 1000;static const int def_num = 64;static const int def_buffsize = 128;static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET;int port_;std::shared_ptr<MY_SOCKET> p_listen_sock_;std::shared_ptr<MY_EPOLL> p_epoll_;std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系struct epoll_event events_[def_num];public:epoll_server(int port): port_(port), p_listen_sock_(new MY_SOCKET), p_epoll_(new MY_EPOLL(def_timeout)) {}~epoll_server() {}void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb){// 添加到connections中 -- 用户层connections_.insert(std::make_pair(fd, std::make_shared<connection>(fd, read_cb, write_cb, except_cb)));// 添加到epoll模型 -- 内核p_epoll_->ctl(EPOLL_CTL_ADD, fd, event);}void init(){p_listen_sock_->Socket();set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式p_listen_sock_->Bind(port_);p_listen_sock_->Listen();// 添加监听套接字add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr);lg(DEBUG, "listen_socket add success\n");}void loop(){init();while (true){int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件for (int i = 0; i < n; ++i){Dispatcher(events_[i]); // 每次处理一个就绪事件}}}private:void accept(std::shared_ptr<connection> conn) // 处理连接事件{// 连接到来时,我们要循环处理,直到无数据while (true){std::string clientip;uint16_t clientport;int sock = p_listen_sock_->Accept(clientip, clientport);if (sock > 0){lg(DEBUG, "get a new client, get info-> [%s:%d], sockfd : %d", clientip.c_str(), clientport, sock);set_no_block(sock); // 设置为非阻塞式// 将新套接字添加进connections和epoll模型add_sock(sock, EVENT_IN,std::bind(&epoll_server::receiver, this, std::placeholders::_1),std::bind(&epoll_server::sender, this, std::placeholders::_1),std::bind(&epoll_server::excepter, this, std::placeholders::_1));}else{// 如果底层无数据,也会错误返回,并设置错误码11if (errno == EAGAIN) // 无数据{break;}else if (errno == EINTR) // 被信号中断{continue;}else{lg(ERROR, "accept error\n");break;}}}}void receiver(std::shared_ptr<connection> conn){while (true) // 读取至底层无数据{char buffer[def_buffsize];int n = read(conn->get_fd(), buffer, sizeof(buffer) - 1);if (n > 0) // 还没读完{buffer[n] = 0;conn->append(buffer);}else if (n == 0) // 对方关闭连接{lg(INFO, "sockfd: %d, client info %s:%d quit...", conn->get_fd(), conn->ip_.c_str(), conn->port_);conn->except_cb_(conn); // 关注异常事件return;}else // 出错/读完{if (errno == EAGAIN) // 读完全部数据{break;}else if (errno == EINTR){continue;}else // 真的出错{conn->except_cb_(conn); // 关注异常事件return;}}}}void sender(std::shared_ptr<connection> conn) {}void excepter(std::shared_ptr<connection> conn) {}void Dispatcher(struct epoll_event &sock){int fd = sock.data.fd; // 需要判断是否是我们关注的文件if (!is_safe(fd)){lg(DEBUG, "fd: %s is not safe\n", fd);return;}int event = sock.events;if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件{event |= EPOLLIN;event |= EPOLLOUT;}if ((event & EPOLLIN) && connections_[fd]->read_cb_) // 如果读回调存在{connections_[fd]->read_cb_(connections_[fd]); // 调用读回调}if ((event & EPOLLOUT) && connections_[fd]->write_cb_) // 如果写回调存在{connections_[fd]->write_cb_(connections_[fd]); // 调用写回调}print(connections_[fd]);}bool is_safe(int fd){auto it = connections_.find(fd);if (it != connections_.end()){return true;}else{return false;}}void set_no_block(int fd){int ret = fcntl(fd, F_GETFL);if (ret < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, ret | O_NONBLOCK);}void print(std::shared_ptr<connection> conn){std::cout << "fd: " << conn->get_fd() << " , ";std::cout << "in_buffer: " << conn->inbuffer().c_str() << std::endl;} };
处理数据
虽然我们存入了数据,但我们还没有处理数据
处理数据应该交给用户来决定(也就是使用回调函数,在参数中传入connection对象即可,里面包含该文件读到的所有数据),因为这部分属于应用层的事情
- 检测数据是否完整(协议定制,序列化/反序列化)
- 如果包含一份完整数据,进行处理(具体业务处理)
我们将之前写过的网络计算机代码拿过来用 -- 网络计算器(使用json序列化/反序列化,条件编译,注意点),json介绍+语法介绍_json序列化和反序列化工具-CSDN博客
网络计算器代码编写+注意点(序列化,反序列化,报头封装和解包,服务端和客户端,计算),客户端和服务端数据传递流程图,守护进程化+日志重定向到文件_计算器封装-CSDN博客
- 直接定义一个函数,使用自定义协议来处理数据(将输入缓冲区中的数据做处理,然后把结果写回输出缓冲区)
- 然后在实例化服务器时,将该函数作为实参传进去
- 这样,就可以在读取完数据之后进行处理,并且发送 (注意,发送数据应该由服务器来做,也就需要我们的回指指针发挥用处 -- 回指服务器,然后由服务器调用发送函数)
回指指针
因为connection和epoll_server会互相引用
- server里保存了全部connection结构的指针,用于管理
- 而connection里也需要引用epoll_server,来使用里面的函数
- 这样就造成了循环引用问题
所以,需要我们将connection里存放的回指指针的类型定义成weak_ptr,当需要使用时再转换为shared_ptr
- 而我们传参时,必须通过shared_ptr来转换为weak_ptr,所以我们调用shared_from_this()来返回当前对象的shared_ptr
- 而调用该函数的前提是,让需要使用的类继承enable_shared_from_this这个模板类
如何处理写事件
引入
因为写事件关注的是发送缓冲区是否有空间
- 缓冲区经常都是有空间的,所以写事件经常会就绪
- 而一旦事件就绪,wait就会返回
- 但我们通常真正关心的是"是否有数据可以发送",而不是"是否有空间"
- 所以写事件,要按需设置是否关心 -- 代码体现
对于读事件来说,我们设置常关心
- 因为读事件看的就是有无数据
循环内
那我们该如何写入呢?
- 直接调用写函数(send/write)
- 并且要像读数据一样,需要把输出缓冲区内的数据全部写进文件的发送缓冲区中才行
- 所以要循环写入
并且,和读取不一样,需要我们手动删除输出缓冲区中的数据
- 因为读取是从内核读到用户层,内核会自动帮我们删除已读出的数据
接下来,说说函数返回值的问题:
- 如果返回值>0,说明此时成功写入了数据,需要我们删除已经写入的数据(如果已经将数据全部写完了,退出循环)
- 返回值为0,说明此时缓冲区内没有数据,压根没有写入数据,直接返回
- 注意这里[退出循环]和[直接返回]的区别,因为我们要在循环结束后,处理对写事件的关心
发送出错,分几种情况(和处理读数据一样)
- 底层缓冲区没有空间了,返回EWOULDBLOCK(=EAGIN=11)
- 被信号中断
- 真的出错
处理对写事件的关心
出循环后分为两种情况:
如果outbuffer里还有数据没写完 -- 设置对写事件的关心
- 因为此时受限于底层的缓冲区空间,所以需要关注写事件
- 一旦发送缓冲区有空间了,就会通知我们,然后回调我们的写处理函数,继续发送数据
outbuffer里的数据已经被写完了 -- 取消对写事件的关心
- 数据已经写完了,即使有空间也不需要,所以不用关注
以上可以自定义一个使能事件的函数,可以自主决定是否开启读/写事件
- 是否开启 -- bool类型字段
- 然后在内部调用epoll_ctl函数,来修改特定文件对事件的关注
异常处理
一旦走到异常处理的函数中,一定是出问题了
- 那就直接移除epoll中对该文件上事件的关心
- 关闭这个连接
- 从自定义的连接管理结构中移除
代码
server.hpp
#pragma once#include <memory> #include <errno.h> #include <string> #include <functional> #include <fcntl.h> #include <unordered_map>#include "Log.hpp" #include "socket.hpp" #include "myepoll.hpp"class connection; class epoll_server;using func_t = std::function<void(std::shared_ptr<connection>)>;class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区 {int fd_;std::string in_buffer_;std::string out_buffer_;public:func_t read_cb_;func_t write_cb_;func_t except_cb_;// 方便日志打印std::string ip_;uint16_t port_;// 使用 weak_ptr 防止循环引用std::weak_ptr<epoll_server> p_svr_;public:connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb): fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {}~connection() {}void set_p_svr(std::weak_ptr<epoll_server> ptr){p_svr_ = ptr;}int get_fd() { return fd_; }void in_append(const std::string &str) { in_buffer_ += str; }void out_append(const std::string &str) { out_buffer_ += str; }std::string &inbuffer() { return in_buffer_; }std::string &outbuffer() { return out_buffer_; } };class epoll_server : public std::enable_shared_from_this<epoll_server>, public no_copy {static const int def_timeout = 1000;static const int def_num = 64;static const int def_buffsize = 128;static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET;int port_;func_t handle_;std::shared_ptr<MY_SOCKET> p_listen_sock_;std::shared_ptr<MY_EPOLL> p_epoll_;std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系struct epoll_event events_[def_num];public:epoll_server(int port, func_t handle): port_(port), handle_(handle), p_listen_sock_(new MY_SOCKET()), p_epoll_(new MY_EPOLL(def_timeout)) {}~epoll_server() {}void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb, const std::string &ip = "0.0.0.0", uint16_t port = 0){std::shared_ptr<connection> new_connection(new connection(fd, read_cb, write_cb, except_cb));new_connection->set_p_svr(shared_from_this()); // shared_from_this(): 返回当前对象的shared_ptr,要确保epoll_server已经以shared_ptr的形式存在(主函数中以shared_ptr形式实例化对象)new_connection->ip_ = ip;new_connection->port_ = port;connections_.insert(std::make_pair(fd, new_connection));p_epoll_->ctl(EPOLL_CTL_ADD, fd, event);}void loop(){init();while (true){int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件for (int i = 0; i < n; ++i){Dispatcher(events_[i]); // 每次处理一个就绪事件}}}void receiver(std::shared_ptr<connection> conn){int fd = conn->get_fd();while (true) // 读取至底层无数据{char buffer[def_buffsize];memset(buffer, 0, sizeof(buffer));int n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0) // 还没读完{conn->in_append(buffer);}else if (n == 0) // 对方关闭连接{lg(INFO, "sockfd: %d, client info %s:%d quit", fd, conn->ip_.c_str(), conn->port_);conn->except_cb_(conn); // 关注异常事件return;}else // 出错/读完{if (errno == EAGAIN) // 读完全部数据{break;}else if (errno == EINTR){continue;}else // 真的出错{lg(WARNING, "sockfd: %d, client info %s:%d error", fd, conn->ip_.c_str(), conn->port_);conn->except_cb_(conn); // 关注异常事件return;}}}// 读完了数据,就该处理了,但不一定包含了一份完整报文handle_(conn);}void excepter(std::shared_ptr<connection> conn){int fd = conn->get_fd();lg(WARNING, "Excepter hander sockfd: %d, client info %s:%d excepter handler", fd, conn->ip_.c_str(), conn->port_);p_epoll_->ctl(EPOLL_CTL_DEL, fd, 0);close(fd);lg(DEBUG, "close %d done\n", fd);connections_.erase(fd);lg(DEBUG, "remove %d from _connections done\n", fd);}void sender(std::shared_ptr<connection> conn){auto &buffer = conn->outbuffer();int fd = conn->get_fd();while (true){ssize_t n = write(fd, buffer.c_str(), buffer.size()); // 将输出缓冲区的内容写入内核if (n > 0) // 写入一定数据{buffer.erase(0, n);if (buffer.empty()) // 数据写完了{break;}}else if (n == 0) // 没有数据可写{return;}else{if (errno == EAGAIN){break;}else if (errno == EINTR){continue;}else{lg(WARNING, "sockfd: %d, client info %s:%d send error...", conn->get_fd(), conn->ip_.c_str(), conn->port_);conn->except_cb_(conn);return;}}}// 判断接下来是否需要关注写事件if (buffer.empty()){enable_event(fd, true, false);}else{enable_event(fd, true, true);}}private:void init(){p_listen_sock_->Socket();set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式p_listen_sock_->Bind(port_);p_listen_sock_->Listen();// 添加监听套接字add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr);lg(DEBUG, "listen_socket add success\n");}void accept(std::shared_ptr<connection> conn) // 处理连接事件{while (true){std::string clientip;uint16_t clientport;int sock = p_listen_sock_->Accept(clientip, clientport);if (sock > 0){lg(DEBUG, "get a new client, get info-> [%s:%d], sockfd : %d", clientip.c_str(), clientport, sock);set_no_block(sock); // 设置为非阻塞式// 将新套接字添加进connections和epoll模型add_sock(sock, EVENT_IN,std::bind(&epoll_server::receiver, this, std::placeholders::_1),std::bind(&epoll_server::excepter, this, std::placeholders::_1),std::bind(&epoll_server::excepter, this, std::placeholders::_1),clientip, clientport);}else{if (errno == EAGAIN) // 无数据{break;}else if (errno == EINTR) // 被信号中断{continue;}else{lg(ERROR, "accept error\n");break;}}}}void Dispatcher(struct epoll_event &sock){int fd = sock.data.fd; // 需要判断是否是我们关注的文件if (!is_safe(fd)){lg(DEBUG, "fd: %d is not safe\n", fd);return;}auto conn = connections_[fd];if (!conn)return;int event = sock.events;if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件{event |= EPOLLIN;event |= EPOLLOUT;}if ((event & EPOLLIN) && conn->read_cb_) // 如果读回调存在{conn->read_cb_(conn); // 调用读回调}if ((event & EPOLLOUT) && conn->write_cb_) // 如果写回调存在{conn->write_cb_(conn); // 调用写回调}}bool is_safe(int fd){return connections_.find(fd) != connections_.end(); // 是否在connections结构中存在}void set_no_block(int fd){int ret = fcntl(fd, F_GETFL);if (ret < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, ret | O_NONBLOCK);}void enable_event(int fd, bool f_read, bool f_write){if (fd < 0){lg(ERROR, "Invalid file descriptor: %d", fd);return;}uint32_t event = 0;event |= ((f_read ? EPOLLIN : 0) | (f_write ? EPOLLOUT : 0) | EPOLLET);p_epoll_->ctl(EPOLL_CTL_MOD, fd, event);} };
server.cpp
#include "server.hpp" #include "cal.hpp"void def_handle(std::weak_ptr<connection> conne) {if (conne.expired())return;auto conn = conne.lock();calculate Cal;std::string str = Cal.cal(conn->inbuffer()); // 处理数据,得到结果if (str.empty()){return;}//lg(DEBUG, "get data: %s\n", str.c_str());conn->out_append(str); // 添加到输出缓冲区//lg(DEBUG, "out_append success\n");// 写入auto server = conn->p_svr_.lock(); // weak_ptr不拥有对象的所有权,需要转换为shared_ptrserver->sender(conn); // 需要让服务器调用写处理函数,后续让服务器擦屁股(也许没有写入全部数据)//lg(DEBUG, "sender success\n"); }int main() {std::shared_ptr<epoll_server> epoll_svr(new epoll_server(8080, def_handle));epoll_svr->loop();return 0; }
其他代码在压缩包里
运行结果
我们把网络计算器的客户端也拿过来 -- 网络计算器代码编写+注意点(序列化,反序列化,报头封装和解包,服务端和客户端,计算),客户端和服务端数据传递流程图,守护进程化+日志重定向到文件_计算器封装-CSDN博客
直接做测试: