Linux知识点 -- 高级IO(二)

Linux知识点 – 高级IO(二)

文章目录

  • Linux知识点 -- 高级IO(二)
  • 一、IO多路转接 -- poll
    • 1.poll接口
    • 2.poll实现
    • 3.poll优缺点
  • 二、IO多路转接 -- epoll
    • 1.epoll接口
    • 2.epoll的工作原理
    • 3.epoll服务器实现
    • 4.epoll的优点
    • 5.epoll的工作模式
    • 6.Reactor服务器


一、IO多路转接 – poll

1.poll接口

在这里插入图片描述
参数:

  • fds:传入的struct pollfd结构体的起始地址;
  • nfds:传入的结构体的个数;
    前两个参数代表所有传入的struct pollfd结构体;
  • timeout:为0,表示非阻塞方式等待;为-1,表示阻塞等待;大于0 的值,表示等待n毫秒后返回;
  • 返回值:大于0,表示有几个文件描述符就绪;等于0,超时timeout;小于0,poll失败;

poll的输入输出参数是分离的,因为struct pollfd结构体的内部成员有很多,可以完成不同的功能;
在这里插入图片描述

  • fd:文件描述符,一旦设置好,调用和返回时,都不会变;
  • events:用户通知内核,需要帮助我关心这个fd的哪些事件
  • revents:内核通知用户,这个fd的哪些事件已经就绪了
  • events和revents都是使用标记位传参,如下图
    在这里插入图片描述

2.poll实现

poll和select对fd的处理过程相近,只是具体对多个fd的管理方式有所不同;
Log.hpp
同select;

Sock.hpp
同select;

main.cc

#include "pollServer.hpp"
#include<memory>int main()
{// 1. fd_set是一个固定大小位图,直接决定了select能同时关心的fd的个数是有上限的!// std::cout << sizeof(fd_set) * 8 << std::endl;std::unique_ptr<PollServer> svr(new PollServer);svr->Start();return 0;
}

pollServer.hpp

#ifndef __POLL_SVR_H__
#define __POLL_SVR_H__#include <iostream>
#include <string>
#include <vector>
#include <poll.h>
#include <sys/time.h>
#include "Log.hpp"
#include "Sock.hpp"#define FD_NONE -1 // 文件描述符初始化状态
using namespace std;class PollServer
{
public:static const int nfds = 100;public:PollServer(const uint16_t &port = 8080): _port(port), _nfds(nfds){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);logMessage(DEBUG, "%s", "create base socket success");_fds = new struct pollfd[_nfds];for(int i = 0; i < _nfds; i++){//初始化所有的struct pollfd_fds[i].fd = FD_NONE;_fds[i].events = _fds[i].revents = 0;}// 将listensock加入poll关心_fds[0].fd = _listensock;_fds[0].events = POLLIN;_timeout = 1000;}void Start(){while (true){int n = poll(_fds, _nfds, _timeout);switch (n){case 0:logMessage(DEBUG, "timeout");break;case -1:logMessage(WARNING, "poll error: %d : %s", errno, strerror(errno));break;default:// select成功logMessage(DEBUG, "get a new link event");HandlerEvent(); // 对就绪的fd进行处理break;}}}~PollServer(){if (_listensock >= 0){close(_listensock);}if(_fds){delete[] _fds;}}private:void HandlerEvent(){for (int i = 0; i < _nfds; i++){// 1.去掉不合法fdif (_fds[i].fd == FD_NONE){continue;}// 2.合法fd也不一定就绪了if (_fds[i].revents & POLLIN){// 指定的fd,读事件就绪// 读事件就绪:连接事件到来,acceptif (_fds[i].fd == _listensock){Accepter(); // listensock需要进行accept}else{Recver(i); // 普通sock进行recv}}}}void Accepter(){string clientip;uint16_t clientport = 0;// listensock上面的读事件就绪了,表示可以读取了// 获取新连接了int sock = Sock::Accept(_listensock, &clientip, &clientport); // 在这里进行accept是不会阻塞的if (sock < 0){logMessage(WARNING, "%s", "accept error");return;}logMessage(DEBUG, "get a new link success : [%s:%d] : %d", clientip.c_str(), clientport, sock);int pos = 1;for (; pos < _nfds; pos++){if (_fds[pos].fd == FD_NONE) // 找出未设置合法fd的位置{break;}}if (pos == _nfds) // 数组满了{logMessage(WARNING, "%s:%d", "poll server already full,close: %d", sock);close(sock);}else{_fds[pos].fd = sock;_fds[pos].events = POLLIN;}}void Recver(int pos){// 读事件就绪:INPUT事件到来,recv,readlogMessage(DEBUG, "message in, get IO event: %d", _fds[pos].fd);// 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞// 这样读取有bug吗?有的,你怎么保证以读到了一个完整报文呢?char buffer[1024];int n = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client[%d]# %s", _fds[pos].fd, buffer);}else if (n == 0) // 对端关闭连接{logMessage(DEBUG, "client[%d] quit, me too...", _fds[pos].fd);// 1.我们也要关闭不需要的fdclose(_fds[pos].fd);// 2.不要让select帮我关心当前的fd了_fds[pos].fd = FD_NONE;_fds[pos].events = 0;}else{logMessage(WARNING, "%d sock recv error, %d : %s", _fds[pos].fd, errno, strerror(errno));// 1.我们也要关闭不需要的fdclose(_fds[pos].fd);// 2.不要让select帮我关心当前的fd了_fds[pos].fd = FD_NONE;_fds[pos].events = 0;}}// void DebugPrint()// {//     cout << "_fd_array[]: ";//     for (int i = 0; i < _nfds; i++)//     {//         if (_fds[pos].fd == FD_NONE)//             continue;//         cout << _fds[pos].fd << " ";//     }//     cout << endl;// }private:uint16_t _port;int _listensock;struct pollfd *_fds;int _nfds;int _timeout;
};
#endif

运行结果:
在这里插入图片描述

3.poll优缺点

优点:

  1. 效率高;
  2. 有大量的连接,但是只有少量的是活跃的,节省资源;
  3. 输入输出参数分离的,不需要进行大量的重置;
  4. poll参数级别,没有可以管理的fd的上限;

poll缺点:
5. poll依旧需要不少的遍历,在用户层检测时间就绪,与内核检测fd就绪,都是一样,用户还是要维护数组;
6. pol需要内核到用户的拷贝 – 少不了的;
7. poll的代码也比较复杂 – 比select容易;

二、IO多路转接 – epoll

1.epoll接口

epoll有三个接口:
epoll_create
在这里插入图片描述

  • 创建epoll模型;
  • size参数现在多半是废弃的,一般写成512或256;
  • 返回值是一个文件描述符;

epoll_ctl
在这里插入图片描述

  • epfd:epoll_create返回的fd;
  • op:对这个epoll模型进行什么操作(增、删、改)
    在这里插入图片描述
  • fd:需要关心的文件描述符;
  • event:关心该fd的什么事件;
    event是struct epoll_event*类型的,其实是一个struct epoll_event类型的数组,每个struct epoll_event结构体里面都存储对应fd的信息和事件类型;
    在这里插入图片描述
    struct epoll_event结构体中的events成员可以是以下几个宏的集合:
    在这里插入图片描述
    struct epoll_event结构体中的data成员是epoll_data_t类型的联合体,可以储存fd的信息:
    在这里插入图片描述
  • 返回值:返回0表示调用成功;返回-1表示调用失败;

epoll_wait
在这里插入图片描述

  • 在epoll模型中获取已经就绪的事件,timeout参数与poll是一样的;
  • epfd:epoll_create返回的fd;
  • events:分配好的epoll_events结构体数组;
  • maxevents:events数组有多大;
  • 返回值:为0,表示timeout;为-1,表示wait失败;大于0,表示有几个关心的文件描述符的事件就绪了;

2.epoll的工作原理

回想select和poll的工作流程:

  1. 无论是select还是poll,都是需要用户自己维护一个数组来进行保存fd,与特定的事件的;
  2. select or poll都要遍历
  3. select or poll工作模式
    a.通过select or poll,用户告诉内核,你要帮我关心哪些fd上的哪些event;
    b.通过select or poll返回,内核告诉用户,哪些fd上的哪些event已经发生了;

操作系统是通过什么方式得知网卡里面有数据的? – 硬件中断;网卡数据就绪后,会触发硬件中断来通知OS取数据;

epoll工作原理

  • 调用epoll_create接口创建一个epoll模型,OS会为用户维护一个红黑树结构
    在这里插入图片描述
    红黑树节点:用户告诉内核,需要关心哪些fd的哪些事件,等价于poll所维护的数组;

  • 在epoll中,对于每一个事件,都会建立一个epitem结构体:
    在这里插入图片描述

  • OS还会维护一个就绪队列,用于通知用户哪些事件已经就绪;
    在这里插入图片描述
    当某个fd上的某个事件就绪了,OS会在就绪队列上生成一个节点;

  • OS可以设定一个回调函数,可以被注册进底层,一旦底层有数据,就会调用回调函数;
    在这里插入图片描述
    有了回调函数就不用OS进行频繁的遍历来查找事件是否就绪了

  • 调用epoll_create构建红黑树,建立底层回调函数,构建就绪队列;
    调用epoll_ctl向特定epoll模型中增加、修改或删除特定fd上的特定事件(修改红黑树);
    调用epoll_wait如果就绪队列不为空,则把发生的事件复制到用户态,同时将事件的数量返回给用户,这个操作的时间复杂度是O(1);

  • 对于epoll_create的返回值:完成epoll模型中所有任务的一定是一个进程;
    指向epoll模型对应的数据结构的指针保存在一个文件中,epoll_create在创建完epoll模型后就会返回该文件描述符
    后面进程在调用epoll模型时,就能够通过该fd找到对应的数据结构;
    在这里插入图片描述

细节

  1. 红黑树的时候,是要有key值的,使用文件描述符作为key值
  2. 用户只需要设置关系,获取结果即可,不用关心任何对fd与event的管理细节
  3. 底层只要有fd就绪了,OS自己会给我构建节点,连入到就绪队列中;
    上层只需要不断的从就绪队列中将数据拿走,就完成了获取就绪事件的任务;
    这也是一个生产者消费者模型;对于共享资源 – epoll已经保证所有的epoll接口都是线程安全的
  4. 如果底层没有就绪事件呢? 我们的上层应该怎么办?阻塞等待
  5. 在epoll_wait的时候,如果底层就绪的sock非常多,revs承装不下,怎么办?一次拿不完,就下一次再拿;
  6. 关于epoll_wait的返回值问题:有几个fd上的事件就绪,就返回几,epoll返回的时候,会将所有就绪的event按照顺序放入到revs数组中,一共有返回值个;
  7. epoll为什么高效
    a. 用户不用管理文件描述符;
    b. OS不用浪费精力在文件描述符的事件监测上;
    c. 事件就绪后直接放到就绪队列中,用户直接从就绪队列中取走事件就可以了;

3.epoll服务器实现

Sock.hpp
同select

Log.hpp
同select

epoll.hpp
对epoll接口的封装;

#pragma once#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>class Epoll
{
public:static const int gsize = 256;
public:static int CreateEpoll(){int epfd = epoll_create(gsize);if(epfd > 0){return epfd;}exit(5);}static bool CtlEpoll(int epfd, int oper, int sock, uint32_t events){struct epoll_event ev;ev.events = events;ev.data.fd = sock; // 存储sockint n = epoll_ctl(epfd, oper, sock, &ev);return n == 0;}static int WaitEpoll(int epfd, struct epoll_event revs[], int num, int timeout){// 细节1:如果底层就绪的sock非常多,revs承装不下,怎么办??不影响!一次拿不完,就下一次再拿// 细节2:关于epoll_wait的返回值问题:有几个fd上的事件就绪,就返回几,epoll返回的时候,会将所有//       就绪的event按照顺序放入到revs数组中!一共有返回值个!return epoll_wait(epfd, revs, num, timeout);}
};

epollServer.hpp
epoll服务器

  • 成员变量包括一个epoll_event数组的地址,该数组的大小,以及一个回调函数指针;
  • 构造时就需要开好数组的空间,创建epoll模型,并在创建好listen套接字后,就将listensock加入epoll模型中进行监测;
  • 在有事件就绪后,需要判断是listensock的事件就绪,还是普通sock的事件就绪,两者需要调用不同的方法;
  • 回调函数的作用是在接收到数据后,可以由用户自定义数据的处理方式,在写好处理函数后,传入对象内部即可;
#ifndef __EPOLL_SERVER_HPP__
#define __EPOLL_SERVER_HPP__
#include <iostream>
#include <string>
#include <functional>
#include <cassert>
#include "Log.hpp"
#include "Sock.hpp"
#include "epoll.hpp"namespace ns_epoll
{const static int default_port = 8080;const static int gnum = 64;class EpollServer{using func_t = std::function<void(std::string)>; // 处理数据时时候回调函数处理public:EpollServer(func_t HandlerRequest, const int& port = default_port): _port(port), _revs_num(gnum), _HandlerRequest(HandlerRequest){// 1.申请对应的epoll_event数组的空间_revs = new struct epoll_event[_revs_num];// 2.创建listensock_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 3.创建epoll模型_epfd = Epoll::CreateEpoll();logMessage(DEBUG, "init success, listensock: %d, epfd: %d", _listensock, _epfd);// 4.将listensock添加到epoll模型中,让它帮忙管理if(!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, _listensock, EPOLLIN)){exit(6);}logMessage(DEBUG, "add listensock to epoll success.");}void Accepter(){std::string clientip;uint16_t clientport;int sock = Sock::Accept(_listensock, &clientip, &clientport);if(sock < 0){logMessage(WARNING, "accept error!");return;}//不能直接读取,因为不知道底层数据是否就绪//因此需要将sock再加入epoll模型进行监测if(!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, sock, EPOLLIN)){return;}logMessage(DEBUG, "add new sock : %d to epoll success", sock);}void Recver(int sock){//1.读取数据char buffer[10240];ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);if(n > 0){//假设读取到了一个完整的报文buffer[n] = 0;_HandlerRequest(buffer); // 调用回调函数来处理数据,可以有不同的回调函数传进来}else if(n == 0) // 读取完成{// 1. 先在epoll中去掉对sock的关心,因为epoll操作的需要是合法文件描述符,否则会报错bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0);assert(res);(void)res;//2.再close文件close(sock);logMessage(NORMAL, "client %d quit, me too...", sock);}else // 读取异常{// 1. 先在epoll中去掉对sock的关心,因为epoll操作的需要是合法文件描述符,否则会报错bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0);assert(res);(void)res;//2.再close文件close(sock);logMessage(NORMAL, "client recv %d error, close error sock", sock);}}void HandlerEvents(int n) // 只需遍历到已经就绪的fd就好{assert(n > 0);for(int i = 0; i < n; i++){uint32_t revents = _revs[i].events;int sock = _revs[i].data.fd;//读事件就绪if(revents & EPOLLIN){// 如果是listensock就绪if(sock == _listensock){Accepter();}else {Recver(sock);}}}}void LoopOnce(int timeout) //一次循环,epoll的一次等待{int n = Epoll::WaitEpoll(_epfd, _revs, _revs_num, timeout);switch(n){case 0:logMessage(DEBUG, "timeout...");break;case -1:logMessage(WARNING, "epoll wait error: %s", strerror(errno));break;default://等待成功logMessage(DEBUG, "get a event");HandlerEvents(n); // 将已经就绪的事件的数量传入HandlerEventsbreak;}}void Start(){int timeout = -1;while(true){LoopOnce(timeout);}}~EpollServer(){if(_listensock > 0){close(_listensock);}if(_epfd > 0){close(_epfd);}if(_revs){delete[] _revs;}}private:int _listensock;int _epfd;uint16_t _port;struct epoll_event* _revs; // 已经就绪的fdint _revs_num; // 已经就绪的fd的数量func_t _HandlerRequest; // 处理数据的回调函数};
}
#endif

main.cc

#include "epollServer.hpp"
#include<memory>using namespace std;
using namespace ns_epoll;void change(std::string request)
{//完成业务逻辑cout << "change : " << request << endl;
}int main()
{// 1. fd_set是一个固定大小位图,直接决定了select能同时关心的fd的个数是有上限的!// std::cout << sizeof(fd_set) * 8 << std::endl;std::unique_ptr<EpollServer> svr(new EpollServer(change));svr->Start();return 0;
}

运行结果:
在这里插入图片描述

4.epoll的优点

  • 接口使用方便:虽然拆分成了三个函数,但是反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离开;
  • 数据拷贝轻量:只在合适的时候调用EPOLL CTL ADD将文件描述符结构拷贝到内核中,这个操作并不频繁(而select/polI都是每次循环都要进行拷贝);
  • 事件回调机制:避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中,epoll_wait返回直接访问就绪队列就知道哪些文件描述符就绪,这个操作时间复杂度0(1);即使文件描述符数目很多,效率也不会受到影响;
  • 没有数量限制:文件描述符数目无上限;

5.epoll的工作模式

epoll有两种工作模式:

  • 水平触发(LT):如果epoll服务器里面有该文件描述符的数据,就会一直通知该fd(select,poll,epoll的默认模式);
    • 当epoll检测到socket上事件就绪的时候可以不立刻进行处理,或者只处理一部分
    • 例如,由于只读了1K数据,缓冲区中还剩1 K数据,在第二次调用epoll wait时,epoll wait仍然会立刻返回并通知socket读事件就绪;
    • 直到缓冲区上所有的数据都被处理完,epoll_wait才不会立刻返回;
    • 支持阻塞读写和非阻塞读写;
  • 边缘触发(ET):如果epoll服务器是首次有该文件描述符的数据,或者是数据变多(变化)的时候,服务器才会通知该fd;
    如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志,epolli进入ET工作模式;
    • 当epoll检测到socket上事件就绪时,必须立刻处理
    • 例如,虽然只读了1 K的数据,缓冲区还剩1 K的数据,在第二次调用epoll wait的时候,epoll wait 不会再返回了;
    • 也就是说ET模式下文件描述符上的事件就绪后,只有一次处理机会
    • ET的性能比LT性能更高(epoll wait 返回的次数少了很多),Nginx默认采用ET模式使用epoll;
    • 只支持非阻塞的读写

ET模式更加高效

  1. 更少的epoll_wait返回次数;
  2. ET模式会倒逼程序员尽快将接收缓冲区中的数据全部取走,应用层尽快的取走了缓冲区中的数据,那么在单位时间下,该模式下工作的服务器,就可以在一定程度上,给发送方发送一个更大的接收窗口,所以对方就可以有更大的滑动窗口,一次发送更多的数据,提高IO吞吐;

6.Reactor服务器

Reactor服务器是epoll工作在ET模式下的服务器;

Sock.hpp

  • 增加对sock设置非阻塞的接口;
  • 在Accept接口增加输出参数,输出accept的错误码;
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
#include <fcntl.h>
#include "Log.hpp"class Sock
{
private:const static int gbacklog = 20;public:Sock() {}static int Socket(){int listensock = socket(AF_INET, SOCK_STREAM, 0);if (listensock < 0){logMessage(FATAL, "create socket error, %d:%s", errno, strerror(errno));exit(2);}logMessage(NORMAL, "create socket success, listensock: %d", listensock);return listensock;}static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0"){struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(port);inet_pton(AF_INET, ip.c_str(), &local.sin_addr);if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0){logMessage(FATAL, "bind error, %d:%s", errno, strerror(errno));exit(3);}}static void Listen(int sock){if (listen(sock, gbacklog) < 0){logMessage(FATAL, "listen error, %d:%s", errno, strerror(errno));exit(4);}logMessage(NORMAL, "init server success");}// 一般经验// const std::string &: 输入型参数// std::string *: 输出型参数// std::string &: 输入输出型参数// 输出accept的错误码,用于判断accept的状态static int Accept(int listensock, std::string *ip, uint16_t *port, int* accept_errno){struct sockaddr_in src;socklen_t len = sizeof(src);*accept_errno = 0;int servicesock = accept(listensock, (struct sockaddr *)&src, &len);if (servicesock < 0){logMessage(ERROR, "accept error, %d:%s", errno, strerror(errno));*accept_errno = errno;return -1;}if(port) *port = ntohs(src.sin_port);if(ip) *ip = inet_ntoa(src.sin_addr);return servicesock;}static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port){struct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(server_port);server.sin_addr.s_addr = inet_addr(server_ip.c_str());if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;else return false;}static bool SetNonBlock(int sock) // 设置sock为非阻塞{int fl = fcntl(sock, F_GETFL);if(fl < 0)return false;fcntl(sock, F_SETFL, fl | O_NONBLOCK);return true;}~Sock() {}
};

Log.hpp
同select;

Epoll.hpp

  • 封装epoll的三个接口;
#pragma once
#include <iostream>
#include <sys/epoll.h>class Epoll
{const static int gnum = 128;const static int gtimeout = 5000;
public:Epoll(int timeout = gtimeout): _timeout(timeout){}void CreateEpoll(){_epfd = epoll_create(gnum);if(_epfd < 0){exit(5);}}bool AddSockToEpoll(int sock, uint32_t events){struct epoll_event ev;ev.events = events;ev.data.fd = sock;int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);return n == 0;}int WaitEpoll(struct epoll_event revs[], int num){return epoll_wait(_epfd, revs, num, _timeout);}~Epoll(){}private:int _epfd;int _timeout;
};

Protocol.hpp

  • 报文的序列化和反序列化;
#pragma once
#include <iostream>
#include <cstring>
#include <string>
#include <vector>
// 1. 报文和报文之间,我们采用特殊字符来进行解决粘报问题
// 2. 获取一个一个独立完整的报文,序列和反序列化 -- 自定义
// 100+19X100+19X100+19
// 支持解决粘报问题,处理独立报文
#define SEP "X"
#define SEP_LEN strlen(SEP)
// 自己手写序列反序列化
#define SPACE " "
#define SPACE_LEN strlen(SPACE)
// 我们要把传入进来的缓冲区进行切分
// 1. buffer被切走的,也同时要从buffer中移除
// 2. 可能会存在多个报文,多个报文依次放入out
// buffer: 输入输出型参数
// out: 输出型参数
void SpliteMessage(std::string &buffer, std::vector<std::string> *out)
{// 100+// 100+19X1// 100+19X100+19while (true){auto pos = buffer.find(SEP);if (std::string::npos == pos)break;std::string message = buffer.substr(0, pos);buffer.erase(0, pos + SEP_LEN);out->push_back(message);// std::cout << "debug: " << message << " : " << buffer << std::endl;// sleep(1);}
}
// TODO
//
std::string Encode(std::string &s)
{return s + SEP;
}class Request
{
public:std::string Serialize(){std::string str;str = std::to_string(x_);str += SPACE;str += op_; // TODOstr += SPACE;str += std::to_string(y_);return str;}bool Deserialized(const std::string &str) // 1 + 1{std::size_t left = str.find(SPACE);if (left == std::string::npos)return false;std::size_t right = str.rfind(SPACE);if (right == std::string::npos)return false;x_ = atoi(str.substr(0, left).c_str());y_ = atoi(str.substr(right + SPACE_LEN).c_str());if (left + SPACE_LEN > str.size())return false;elseop_ = str[left + SPACE_LEN];return true;}public:Request(){}Request(int x, int y, char op) : x_(x), y_(y), op_(op){}~Request() {}public:int x_;   // 是什么?int y_;   // 是什么?char op_; // '+' '-' '*' '/' '%'
};
class Response
{
public:// "code_ result_"std::string Serialize(){std::string s;s = std::to_string(code_);s += SPACE;s += std::to_string(result_);return s;}// "111 100"bool Deserialized(const std::string &s){std::size_t pos = s.find(SPACE);if (pos == std::string::npos)return false;code_ = atoi(s.substr(0, pos).c_str());result_ = atoi(s.substr(pos + SPACE_LEN).c_str());return true;}public:Response(){}Response(int result, int code) : result_(result), code_(code){}~Response() {}public:// 约定!// result_? code_? code_ 0? 1?2?3?int result_; // 计算结果int code_;   // 计算结果的状态码
};

TcpServer.hpp

  • 为了保证未来正确的读取,每一个sock都要有属于自己的缓冲区;
  • 设置一个Connection类,将每个sock及其读写回调函数和缓冲区都封装起来,作为一个连接对象,TcpServer中会维护大量的Connection连接;
  • bind:绑定函数参数,返回的是一个函数对象,placeholders::_1是一个占位符,使用时需传递一个参数;
    由于类内成员函数的第一个参数默认为this指针,因此类内成员函数在作为回调函数时,需要先将第一个参数绑定为this指针;
  • 将所有Connection管理起来,通过设置一个哈希表将sock和Connection建立起映射,通过sock能够查找到Connection,进而能够调用相应方法;
  • 通过回调方法的不同,来区分listensock和普通sock;
  • 由于是ET模式,因此每次读取的时候,都需要将已经就绪的数据全部读取,sock是非阻塞模式,在底层没有数据的时候,就会报错结束读取,而不会阻塞;
  • recv每次接收到的数据都存到连接自己的接收缓冲区;
  • 将TCP服务器与上层业务解耦,在服务器类中有一个上层业务处理的回调函数接口_cb,在每次调用业务分派的时候指定数据处理方法,当每次读取到完整报文后,就会调用函数进行处理;
  • 在Recver接口中,数据保存到连接的接收缓冲区后,会进行请求分割,将字符串分割成一个个的完整请求,如果有不完整的请求,就先保留在缓冲区中,等待剩下的完整数据到来,完整的请求保存在vector中,在调用上层业务逻辑进行处理;
  • 第一次发送之前,epoll服务器并没有关心此sock的发送时间,因此在数据准备好,发送之前,需要在业务层触发该sock的写事件;
  • Sender发送的时候也是一样的逻辑,不能保证全部发送完成,但是可以保证,如果没有出错,一定是要么发完,要么发送条件不满足,下次发送;
#pragma once
#pragma once
#include <iostream>
#include <functional>
#include <string>
#include <vector>
#include <cerrno>
#include <cassert>
#include <unordered_map>
#include "Sock.hpp"
#include "Log.hpp"
#include "Epoll.hpp"
#include "Protocol.hpp"
class TcpServer;
class Connection;using func_t = std::function<void(Connection *)>;
using callback_t = std::function<void(Connection *, std::string &)>;// 我们为了能够正常工作,常规的sock必须是要有自己独立的接收缓冲区&&发送缓冲区
class Connection
{
public:Connection(int sock = -1): _sock(sock), _tsvr(nullptr){}void SetCallBack(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}~Connection(){}public:// 负责进行IO的文件描述符int _sock;// 三个回调方法,表征的就是对sock进行特定读写对应的方法func_t _recv_cb;func_t _send_cb;func_t _except_cb;// 接收缓冲区和发送缓冲区std::string _inbuffer;std::string _outbuffer;// 设置对TcpServer的回指指针TcpServer *_tsvr;
};// TcpServer中会维护大量的Connection连接class TcpServer
{const static int gport = 8080;const static int gnum = 128;public:TcpServer(int port = gport): _port(port), _revs_num(gnum){// 1.创建listensock_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 2.创建多路转接对象_epoll.CreateEpoll();// 3. 添加listensock到epoll服务器中// 设置listensock的接收回调函数// bind:绑定函数参数,返回的是一个函数对象,placeholders::_1是一个占位符,使用时需传递一个参数// 由于Accepter是类内成员,因此第一个参数是隐藏的this指针AddConnection(_listensock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);// 4.构建一个获取就绪事件的缓冲区_revs = new struct epoll_event[_revs_num];}// 将sock添加到epoll服务器中void AddConnection(int sock, func_t recv_cb, func_t send_cb, func_t except_cb){Sock::SetNonBlock(sock);// 除了listensock,未来服务器中会存在大量的socket,每一个sock都必须被封装为一个Connection// 当服务器中存在大量Connection的时候,TcpServer就需要将所有的Connection进行管理:先描述,再组织//  1.构建Connection对象,封装sockConnection *conn = new Connection(sock);// 设置回调函数conn->SetCallBack(recv_cb, send_cb, except_cb);conn->_tsvr = this;// 2.添加sock到epoll中_epoll.AddSockToEpoll(sock, EPOLLIN | EPOLLET); // epoll默认为LT模式,需设置为ET模式// 3.还要将对应的Connection*对象指针添加到Connections的映射表中_connections.insert(std::make_pair(sock, conn));}void Accepter(Connection *conn) // 所有的连接都是封装到Connection对象中的,因此所有回调函数的参数都是Connection*{// logMessage(DEBUG, "Accepter been called");//  一定是listensock就绪了,此次读取是不会阻塞的while (true) // 需要一次读完所有的数据,底层不一定只有一个连接就绪{std::string clientip;uint16_t clientport;int accept_errno = 0; // 获取accept的错误码// sock一定是常规的IO cokint sock = Sock::Accept(conn->_sock, &clientip, &clientport, &accept_errno);if (sock < 0){if (accept_errno == EAGAIN || accept_errno == EWOULDBLOCK) // 底层没链接了{break;}else if (accept_errno == EINTR) // accept被中断了{continue;}else{// accept失败logMessage(WARNING, "accept error, %d : %s", accept_errno, strerror(accept_errno));break;}}// 将sock托管给TcpServerif (sock >= 0){AddConnection(sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1),std::bind(&TcpServer::Sender, this, std::placeholders::_1),std::bind(&TcpServer::Excepter, this, std::placeholders::_1));logMessage(DEBUG, "accept client %s:%d success, add to epoll&&TcpServer success, sock: %d",clientip.c_str(), clientport, sock);}}}// 使能读写void EnableReadWrite(Connection *conn, bool readable, bool writeable){uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0));bool res = _epoll.CtlEpoll(conn->_sock, events);assert(res);}void Recver(Connection *conn){const int num = 1024;bool err = false;while (true){char buffer[num];ssize_t n = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0);if (n < 0){if (errno == EAGAIN || errno == EWOULDBLOCK)break; // 正常的else if (errno == EINTR)continue; // 被中断else{logMessage(ERROR, "recv error, %d : %s", errno, strerror(errno));conn->_except_cb(conn);err = true; // 出错break;}}else if (n == 0) // 对端关闭{logMessage(DEBUG, "client[%d] quit, server close [%d]", conn->_sock, conn->_sock);conn->_except_cb(conn);err = true;break;}else{// 读取成功buffer[n] = 0;conn->_inbuffer += buffer; // 每次接收到的数据都存到自己的接收缓冲区}}logMessage(DEBUG, "conn->_inbuffer[sock: %d]: %s", conn->_sock, conn->_inbuffer.c_str());if (!err) // 没有出错{std::vector<std::string> messages;SpliteMessage(conn->_inbuffer, &messages); // 分割成一个个的完整请求,如果有不完整求情,就留在缓冲区等待下次读取// 能保证走到这里,就是一个完整的报文for (auto &msg : messages){_cb(conn, msg);}}}// 最开始的时候,我们的连接中的写事件是没有被触发的,此时epoll服务器不关心此链接的写事件// 因此需要在业务逻辑中触发写事件void Sender(Connection *conn){while (true){ssize_t n = send(conn->_sock, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);if (n > 0){conn->_outbuffer.erase(0, n);if (conn->_outbuffer.empty())break;}else{if (errno == EAGAIN || errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;else{logMessage(ERROR, "send error, %d : %s", errno, strerror(errno));conn->_except_cb(conn);break;}}}// 不能保证全部发送完成,但是可以保证,如果没有出错,一定是要么发完,要么发送条件不满足,下次发送if (conn->_outbuffer.empty()){EnableReadWrite(conn, true, false); // 发完了,就关闭epoll对此sock的写关心}else{EnableReadWrite(conn, true, true); // 没发完,就继续发}}void Excepter(Connection *conn){if (!IsConnectionExists(conn->_sock))return;// 1. 从epoll中移除bool res = _epoll.DelFromEpoll(conn->_sock);assert(res); // 要判断// 2. 从我们的unorder_map中移除_connections.erase(conn->_sock);// 3. close(sock);close(conn->_sock);// 4. delete conn;delete conn;logMessage(DEBUG, "Excepter 回收完毕,所有的异常情况");}void LoopOnce(){int n = _epoll.WaitEpoll(_revs, _revs_num); // 获取已就绪事件for (int i = 0; i < n; i++){int sock = _revs[i].data.fd;uint32_t revents = _revs[i].events;// 将所有的异常,全部交给read或者write来统一处理!if (revents & EPOLLERR)revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLHUP)revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLIN) // 接收事件{// 如果该连接存在且连接的接收回调方法存在,就是接收事件成功触发if (IsConnectionExists(sock) && _connections[sock]->_recv_cb != nullptr){// listensock和普通sock在构建Connection对象时传入的回调函数不同,以此来区分_connections[sock]->_recv_cb(_connections[sock]);}}if (revents & EPOLLOUT){if (IsConnectionExists(sock) && _connections[sock]->_send_cb != nullptr){_connections[sock]->_send_cb(_connections[sock]);}}}}// 根据就绪的事件,进行特定事件的派发void Dispather(callback_t cb){_cb = cb;while (true){LoopOnce();}}bool IsConnectionExists(int sock){auto iter = _connections.find(sock);if (iter == _connections.end())return false;elsereturn true;}~TcpServer(){if (_listensock >= 0)close(_listensock);if (_revs)delete[] _revs;}private:int _listensock;int _port;Epoll _epoll;// sock : connection 产生映射std::unordered_map<int, Connection *> _connections; // 管理connectionstruct epoll_event *_revs;                          // 保存就绪事件的数组int _revs_num;// 上层业务处理//  将TCP服务与上层服务解耦callback_t _cb;
};

main.cc

  • 网络计算器的业务逻辑
#include "TcpServer.hpp"
#include<memory>static Response calculator(const Request &req)
{Response resp(0, 0);switch (req.op_){case '+':resp.result_ = req.x_ + req.y_;break;case '-':resp.result_ = req.x_ - req.y_;break;case '*':resp.result_ = req.x_ * req.y_;break;case '/':if (0 == req.y_)resp.code_ = 1;elseresp.result_ = req.x_ / req.y_;break;case '%':if (0 == req.y_)resp.code_ = 2;elseresp.result_ = req.x_ % req.y_;break;default:resp.code_ = 3;break;}return resp;
}void NetCal(Connection* conn, std::string& request)
{logMessage(DEBUG, "NetCal been called, get request: %s", request.c_str());// 1.反序列化Request req;if(!req.Deserialized(request))return;// 2.业务处理Response resp = calculator(req);// 3.序列化,构建应答std::string sendstr = resp.Serialize();// 4.交给服务器connconn->_outbuffer += sendstr;// 5.让底层的TcpServer开始发送// a.需要有完整的发送逻辑// b.我们触发发送的动作,一旦我们开启EPOLLOUT,epoll会自动立马触发一次发送事件就绪,// 如果后续保持发送的开启,epoll会一直发送conn->_tsvr->EnableReadWrite(conn, true, true);
}int main()
{std::unique_ptr<TcpServer> svr(new TcpServer());svr->Dispather(NetCal);return 0;
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/162728.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Django REST Framework完整教程-认证与权限-JWT的使用

文章目录 1.认证(Authentication)与权限(Permission)1.1.视图添加权限1.2.登录验证1.3.常用DRF自带权限类1.4.自定义权限类1.5.全局权限1.6.函数视图权限 2.认证详解2.1.认证方案2.2.如何使用TokenAuthentication&#xff1f; 3.JSON Web Token(JWT)认证3.1.工作原理3.2.安装3.…

Java学习笔记(四)——程序控制结构

一、顺序控制 二、分支控制 &#xff08;一&#xff09;单分支 &#xff08;二&#xff09;双分支 &#xff08;三&#xff09;多分支 &#xff08;四&#xff09;嵌套分支 &#xff08;五&#xff09;switch分支结构 &#xff08;六&#xff09;if和switch的选择 三、循…

Megatron-LM GPT 源码分析(一) Tensor Parallel分析

引用 本文基于开源代码 https://github.com/NVIDIA/Megatron-LM &#xff0c;通过GPT的模型运行示例&#xff0c;从三个维度 - 模型结构、代码运行、代码逻辑说明 对其源码做深入的分析。 Tensor Parallel源码分析

uniapp(uncloud) 使用生态开发接口详情4(wangeditor 富文本, 云对象, postman 网络请求)

wangeditor 官网: https://www.wangeditor.com/v4/pages/01-%E5%BC%80%E5%A7%8B%E4%BD%BF%E7%94%A8/01-%E5%9F%BA%E6%9C%AC%E4%BD%BF%E7%94%A8.html 这里用vue2版本,用wangeditor 4 终端命令: npm i wangeditor --save 开始使用 在项目pages > sy_news > add.vue 页面中…

免密码方式获取Hive元数据

前言 开发中可能用到hive的元数据信息 ,如获取hive表列表、hive表字段、hive表数据量大小、hive表文件大小等信息,要想获取hive元数据信息即需要hive元数据库的账号及密码,此次提供的是一种不需要hive元数据库密码及可获取元数据信息的方式,且此种方式是只读 组件:hive …

程序员必备的IP查询工具

shigen坚持日更的博客写手&#xff0c;擅长Java、python、vue、shell等编程语言和各种应用程序、脚本的开发。坚持记录和分享从业两年以来的技术积累和思考&#xff0c;不断沉淀和成长。 hello&#xff0c;今天shigen给大家分享一下如何优雅的查询IP的工具。我们先看一下效果&a…

CSS的美化(文字、背景) Day02

一、文字控制属性 分为&#xff1a;字体样式属性 、文本样式属性 1.1 CSS字体样式属性 1.color定义元素内文字颜色2.font-size 字号大小3 font-family 字体4 font-weight 字体粗细5.font-style 字体风格6.font 字体综合属性 1.1.1 > 文字颜色 color 属性名: color color …

序列化和反序列化指令在PLC通信上的应用

在了解本篇博客之前,大家可以熟悉下序列化指令的相关介绍,详细内容如下: 博途PLC 1200/1500 PLC 序列化和反序列化指令编程应用_博图序列化和反序列化-CSDN博客序列化最重要的作用:在传递和保存对象时.保证对象的完整性和可传递性。对象转换为有序字节流,以便在网络上传输…

在 Python 脚本中设置环境变量

环境变量是与系统进程交互的一种深入方式&#xff1b; 它允许用户获得有关系统属性、路径和已经存在的变量的更详细信息。 我们如何使用环境变量 如上所述&#xff0c;环境变量促使我们与系统进程进行交互。 我们可以使用环境变量来访问系统中的所有变量和键。 为此&#xff…

6 个可解锁部分 GPT-4 功能的 Chrome 扩展(无需支付 ChatGPT Plus 费用)

在过去的几个月里&#xff0c;我广泛探索了 ChatGPT 的所有可用插件。在此期间&#xff0c;我发现了一些令人惊叹的插件&#xff0c;它们改进了我使用 ChatGPT 的方式&#xff0c;但现在&#xff0c;我将透露一些您需要了解的内容。 借助 Chrome 扩展程序&#xff0c;所有 Chat…

Windows:VS Code IDE安装ESP-IDF【保姆级】

物联网开发学习笔记——目录索引 Visual Studio Code&#xff08;简称“VS Code”&#xff09;是Microsoft向开发者们提供的一款真正的跨平台编辑器。 参考&#xff1a; VS Code官网&#xff1a;Visual Studio Code - Code Editing. Redefined 乐鑫官网&#xff1a;ESP-IDF…

分类算法-逻辑回归与二分类

1、逻辑回归的应用场景 广告点击率是否为垃圾邮件是否患病金融诈骗虚假账号 看到上面的例子&#xff0c;我们可以发现其中的特点&#xff0c;那就是都属于两个类别之间的判断。逻辑回归就是解决二分类问题的利器。 2、 逻辑回归的原理 2.1 输入 逻辑回归的输入就是一个线性…

python二次开发CATIA:CATIA Automation

CATIA 软件中有一套逻辑与关系都十分严谨的自动化对象&#xff0c;它们从CATIA(Application)向下分支。每个自动化对象&#xff08;Automation Object&#xff0c;以下简称Object&#xff09;都有各自的属性与方法。我们通过程序语言调用这些 Object 的属性与方法&#xff0c;便…

C语言 内存

内存分配 内存分配的类型 C/C中内存分为5个区&#xff0c;分别为栈区、堆区、全局/静态存储区、常量存储区、代码区 静态内存分配&#xff1a;编译时分配&#xff0c;包括全局、静态全局、静态局部三种变量。 动态内存分配&#xff1a;运行时分配&#xff0c;包括栈&#x…

SVM支持向量机

定义 支持向量机&#xff08;SVM&#xff09;&#xff0c;Supported Vector Machine,基于线性划分&#xff0c;输出一个最优化的分隔超平面&#xff0c;该超平面不但能将两类正确分开,且使分类间隔(margin)最大 **所有训练数据点距离最优分类超平面的距离都要大于支持向量距离…

【docker】查看容器日志

目录 一.通过查找宿主机日志路径&#xff0c;通过Linux命令查看即可。 1.1 查看容器日志路径 1.2 按照日志路径检索日志 二、通过docker命令检索日志 2.1 查看指定时间后的日志&#xff0c;只显示最后20行 2.2 查看最近10分钟的日志 2.3 查看某时间段之后的日志 2.4 查…

SpringCloud-Nacos

一、介绍 &#xff08;1&#xff09;作为服务注册中心和配置中心 &#xff08;2&#xff09;等价于&#xff1a;EurekaConfigBus &#xff08;3&#xff09;nacos集成了ribbon&#xff0c;支持负载均衡 二、安装 &#xff08;1&#xff09;官网 &#xff08;2&#xff09; …

【算法设计zxd】第6章 回溯法

目录 6.1 回溯法的设计技术 &#xff1a; 四皇后问题 回溯法&#xff1a; 算法框架&#xff1a; 思考题&#xff1a; 回溯算法的适用条件 【例6-1】求满足下列不等式的所有整数解&#xff1a; 6.2回溯算法的经典例题 【例6-2】装载问题  问题分析 计算模型  算法设计与描…

网络安全常见问题隐患及其应对措施

随着数字化时代的到来&#xff0c;网络安全已经成为组织和个人面临的严重挑战之一。网络攻击日益普及&#xff0c;黑客和不法分子不断寻找机会侵入系统、窃取敏感信息、破坏服务和网络基础设施。在这种情况下&#xff0c;了解网络安全的常见问题隐患以及如何应对它们至关重要。…