改造muduo,不依赖boost,用C++11重构

组件的实现

1. 序

1.1. 总述

        muduo库是基于多Reactor-多线程模型实现的TCP网络编程库,性能良好。如libev作者:“One loop per thread is usually a good model”,muduo库的作者陈硕在其《Linux多线程服务端编程》中也力荐这种“One loop per thread”的IO模型,使我们仅需要关注EventLoop的设计与实现,然后每个线程run一个loop即可。不过由于当时C++11并没有进入实用,在这一书中,作者没有谈及C++11,整个muduo库的实现,也依赖了boost库。

        而在项目设计与实现中,按照C++11标准对muduo库中核心部分进行重写,主要涉及了以下模块:Channel、Poller、EventLoop、Thread、EventLoopThread、EventLoopThreadPool、Socket、Acceptor、Buffer、TcpConnection、TcpServer,下面将进行分述。

1.2. One loop per thread

        在多Reactor-多线程模型中,运用one loop per thread的思想,由一个mainReactor负责accept连接,然后把该连接挂载到某个subReactor,多个连接分配到多个线程,充分利用CPU。

2. 核心部分

        在手写muduo库项目之中,存在三个核心部分,分别是Channel类、Poller类和EventLoop类,这三大类的组合,实现了reactor用以监听fd并同时处理相应的回调函数。其中Poller和Channel之间通过EventLoop相互通信。

2.1. Channel
  1. fd_:封装sockfd,两种Channel:listenfd-acceptorChannel,connfd-ConnectionChannel;
  2. events_:fd监听的事件类型;
  3. revents_:Poller返回的具体监听到的事件。
  4. callback:上层设置的各种类型事件回调;
  5. tie_:weak_ptr<void>,在事件监听器返回监听结果后,就会调用Channel中的handleEvent()函数。首先会把tie_这个weak_ptr提升为shared_ptr,它会指向当前的TcpConnection对象,即使外面调用了删除析构了其他所有指向该TcpConnection的智能指针,只要没有handleEvent()完,这个TcpConnection都不会被析构释放堆内存。
2.2. Poller/EpollPoller

        muduo库提供poll和epoll两种IO多路复用方法来实现事件监听,重写时,通过基类Poller和派生类EpollPoller,支持了Epoll。Poller主要扮演Reactor模型中Demultiplex事件分发器(也可以说是事件监听器)的角色。

  1. epollfd_:记录epoll_create返回的句柄
  2. channels_:用来记录注册在其上的Channel的unordered_map。
2.3. EventLoop

        EventLoop扮演Reactor模型中Reactor的角色,是对epoll的封装。EventLoop在epoll_create,注册各个Channel之后,处于epoll_wait阻塞状态,要想唤醒当前的EventLoop去执行新的连接,通过往wakefd上写入一个字符,唤醒当前的EventLoop。(而并非生产者-消费者模型)。

  1. 包含了所有的Channel
  2. 每一个loop都有一个wakeupFd
2.4. 具体方法的部分代码实现
  • EventLoop::loop()——开启事件循环
// 开启事件循环
void EventLoop::loop()
{// ...   while(!quit_){activeChannels_.clear();// 监听两种fd: client的fd、wakeupfdpollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); // epoll_wait发生的位置for(Channel *channel : activeChannels_){   // Poller监听哪些Channel发生事件了,然后上报给EventLoop,EventLoop通知处理相应的事件// handleEevent根据具体事件类型调用相应类型的回调函数channel->handleEvent(pollReturnTime_);}// ...}LOG_INFO("EventLoop %p stop looping. \n", this);// ...
}
  • EpollPoller::poll()——开启Poller事件监听,调用了::epoll_wait()
// 通过epoll_wait监听哪些Channel/fd发生事件
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
{// ...int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);int savedErrno = errno;Timestamp now(Timestamp::now());if(numEvents > 0)   // 有事件发生{LOG_DEBUG("%d events happened \n", numEvents);fillActiveChannels(numEvents, activeChannels);if(numEvents == events_.size()){events_.resize(events_.size() * 2);}}else if(numEvents == 0) // 超时{LOG_DEBUG("%s timeout! \n", __FUNCTION__);}else{if(savedErrno != EINTR){errno = savedErrno;LOG_ERROR("EPollPoller::poll() err! \n");}}return now;
}
  • 唤醒机制——通过向eventfd写一个数据

        在Linux操作系统上,可以通过三种方式唤醒fd:1. 通过管道pipe向绑定到epollfd的一端写一个字节;2. 使用Linux内核2.6版本之后的eventfd;3. 使用socketpair。而在本项目中,采用的是创建eventfd然后在需要唤醒的时候写数据(8个字节)来唤醒subLoop。

// 创建wakeupfd,用来notify唤醒subReactor处理新的Channel
//O_CLOEXEC避免文件描述符被继承到子进程中
int createEventFd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);// ...return evtfd;
}// 用于唤醒loop所在线程: 向wakefd写一个数据
//wakeupFd_在构造函数中通过createsEventFd()函数初始化
void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = write(wakeupFd_, &one, sizeof(one));if(n != sizeof(one)){LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);}
}

3. 其他部分

3.1. EventLoopThreadPool

        EventLoopThreadPool类,可以理解为subLoop池,主要是对EventLoopThread的封装,而EventLoopThread又是对EventLoop(Reactor)和Thread(记录线程的详细信息)的封装。

        其中,初始化时,会提供一个baseLoop(mainLoop)来进行基本的事件循环。通过设置numthreads_来创建对应数量的subReactor,每当创建一个线程,就会生成一个EventLoop。

        在工作方式上,通过getNextLoop()方法,实现对subReactor的轮询。

// ...
class EventLoopThreadPool : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>;EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg);~EventLoopThreadPool();void setThreadNum(int numThreads) { numThreads_ = numThreads; }void start(const ThreadInitCallback &cb = ThreadInitCallback());// 如果工作在多线程中,baseLoop默认以轮询的方式分配Channel给subLoopEventLoop *getNextLoop();std::vector<EventLoop *> getAllLoops();bool started() const { return started_; }const std::string &name() const { return name_;}private:EventLoop *baseLoop_;   //EventLoop loop 用户线程std::string name_;bool started_;int numThreads_;int next_;std::vector<std::unique_ptr<EventLoopThread>> threads_;std::vector<EventLoop *> loops_;
};
3.2. Acceptor

        Acceptor类,封装的是服务器监听socketfd和相关处理函数。接收新用户连接后,通过轮询来选择subReactor并给它分发连接。

3.3. TcpConnection

        每个连接进来的客户端,对应一个TcpConnection,封装了一个connfd,一个Channel,各种回调函数(Callback)和读写缓冲区(Buffer)。

        state_:记录当前连接状态,一共有四种:kConnected、kConnecting、kDisconnecting、kDisconnected。

整个TcpConnection的工作流程

  1. TcpServer通过Acceptor监听用户新连接,用accept拿到connfd
  2. TcpConnection设置回调给Channel,Channel注册到Poller
  3. Poller监听到事件就通知调用Channel的回调
3.4. Buffer

        Buffer缓冲区通过vector来实现,空间不足时,通过vector类的成员函数resize()即可实现扩容。在空间的设计上,主要分为如下图三个区域(和Netty中Buffer的设计类似?)

3.5. TcpServer

        在TcpServer类中,有一个Acceptor,一个EventLoopThreadPool,一些回调函数,一个记录所有连接的unordered_map<string, TcpConnectionPtr>。

// 对外服务器编程需要使用的类
class TcpServer : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>;enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg,Option option = kNoReusePort);~TcpServer();void setThreadInitCallback(const ThreadInitCallback &cb) { threadInitCallback_ = cb; }void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }// 设置subLoop个数void setThreadNum(int numThreads);// 开启服务器监听void start();
private:void newConnection(int sockfd, const InetAddress &peerAddr);void removeConnection(const TcpConnectionPtr &conn);void removeConnectionInLoop(const TcpConnectionPtr &conn);using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;EventLoop *loop_;   // baseLoopconst std::string ipPort_;const std::string name_;std::unique_ptr<Acceptor> acceptor_;    // 运行在mainLoop,监听新连接事件std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop per threadConnectionCallback connectionCallback_;         // 有新连接时的回调MessageCallback messageCallback_;               // 有读写消息时的回调WriteCompleteCallback writeCompleteCallback_;   // 消息发送完成后的回调ThreadInitCallback threadInitCallback_;         // loop线程初始化的回调std::atomic_int started_; int nextConnId_;ConnectionMap connections_; // 保存连接的HashMap
};

        start():启动EventLoopThreadPool,调用acceptor_的listen()方法,监听客户端的连接套接字。

        newConnection():该方法被注册到了acceptor_中,当acceptor_监听到新用户连接时会执行该回调,轮询选择一个subReactor;根据连接成功的sockfd,创建一个连接对象并加入到TcpServer的存储连接信息的connections_中;给这个连接设置回调;然后在mainLoop执行connectEstablished();

        上面提到的关闭连接的回调函数,真实的调用过程:TcpConnection::setCloseCallBack() --> TcpServer::removeConnection() --> TcpServer::removeConnectionInLoop() --> TcpConnection::connectionDestroyed()

4. 工作流程

4.1. 安装

下载到文件夹后,sudo ./autobuild.sh,运行编译和安装脚本,相关头文件也会添加到系统路径。

4.2. 测试代码

下面的内容是一个回射服务器,可以编译运行后,使用telnet、netcat等工具进行简单测试。

#include <ee_muduo_cpp11/TcpServer.h>
#include <ee_muduo_cpp11/Logger.h>#include <string>
#include <functional>class EchoServer
{
public:EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name): server_(loop, addr, name), loop_(loop){// 注册回调函数server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, std::placeholders::_1));server_.setMessageCallback(std::bind(&EchoServer::onMessage, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 设置合适的loop线程数量 loopthreadserver_.setThreadNum(3);}void start(){server_.start();}
private:// 连接建立或者断开的回调void onConnection(const TcpConnectionPtr &conn){if (conn->connected()){LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());}else{LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());}}// 可读写事件回调void onMessage(const TcpConnectionPtr &conn,Buffer *buf,Timestamp time){std::string msg = buf->retrieveAllAsString();conn->send(msg);conn->shutdown(); // 写端   EPOLLHUP =》 closeCallback_}EventLoop *loop_;TcpServer server_;
};int main()
{EventLoop loop;InetAddress addr(8000);EchoServer server(&loop, addr, "EchoServer"); // Acceptor non-blocking listenfd  create bind server.start(); // listen  loopthread  listenfd => acceptChannel => mainLoop =>loop.loop(); // 启动mainLoop的底层Pollerreturn 0;
}
4.3. 工作流程

对于整个库:

  1. 用户创建mainLoop,主线程作为mainReactor,主要用来接收/断开用户连接。
  2. 给TcpServer设置连接和读写事件回调,TcpServer再给TcpConnection设置回调(用户设置的),TcpConnection再给Channel设置回调(先执行这个,再执行用户回调)。
  3. TcpServer根据用户设置传入的线程数,去ThreadPool中开启几个线程。如果没有设置,mainLoop还要负责读写事件的任务。
  4. 当有新连接进来,创建一个TcpConnection,然后由Acceptor轮询唤醒subLoop来提供服务。
  5. 每个subLoop在服务时,其所包含的Poller没有事件就会处于循环阻塞状态,发生事件之后,根据类型再去执行相应的回调操作。

5. 参考资料

  • 《高性能服务结构设计思想——one-thread-one-loop》,张小方,CppGuide,05. 高性能服务结构设计思想——one-thread-one-loop
  • 《Linux多线程服务器编程:使用muduo C++网络库》,陈硕
  • 《Muduo网络库源代码分析:EventLoopThread和EventLoopThreadPool的封装》,blfbuaa,https://www.cnblogs.com/blfbuaa/p/7263398.html
  • 《图解操作系统》,小林coding,https://mp.weixin.qq.com/mp/appmsgalbum?action=getalbum&__biz=MzUxODAzNDg4NQ==&scene=1&album_id=1408057986861416450&count=3#wechat_redirect

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/269380.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用生成式人工智能探索视频博客的魅力?

视频博客&#xff0c;尤其是关于旅游的视频博客&#xff0c;为观众提供了一种全新的探索世界的方式。通过图像和声音的结合&#xff0c;观众可以身临其境地体验到旅行的乐趣和发现的喜悦。而对于内容创作者来说&#xff0c;旅游视频博客不仅能分享他们的旅行故事&#xff0c;还…

【教程】APP开发后如何上架?

摘要 本文介绍了移动应用程序&#xff08;APP&#xff09;开发后如何上架的步骤和注意事项。内容包括选择合适的应用商店、遵循应用商店的规则和政策、准备上架所需材料、创建开发者账号、提交APP并等待审核等环节&#xff0c;以及上架成功后的推广和维护工作。 引言 移动应…

自动化测试基础——allure下载安装及配置及pytest + allure-pytest插件生成allure企业级测试报告及企业级定制

文章目录 前言一、allure下载二、allure安装三、allure目录介绍四、allure环境变量配置五、pytest allure-pytest插件生成allure企业级测试报告六、allure企业级报告的log定制七、allure企业级报告功能内容定制1.功能左边层级定制2.功能右边优先级定制3.功能右边测试用例描述定…

用HTML5的<canvas>元素实现刮刮乐游戏

用HTML5的&#xff1c;canvas&#xff1e;元素实现刮刮乐游戏 用HTML5的<canvas>元素实现刮刮乐&#xff0c;要求&#xff1a;将上面的“图层”的图像可用鼠标刮去&#xff0c;露出下面的“图层”的图像。 示例从简单到复杂。 简单示例 准备两张图像&#xff0c;我这…

java 版本企业招标投标管理系统源码+功能描述+tbms+及时准确+全程电子化

功能描述 1、门户管理&#xff1a;所有用户可在门户页面查看所有的公告信息及相关的通知信息。主要板块包含&#xff1a;招标公告、非招标公告、系统通知、政策法规。 2、立项管理&#xff1a;企业用户可对需要采购的项目进行立项申请&#xff0c;并提交审批&#xff0c;查看所…

使用GitOps自动化推动AI/ML工作流程

作为一名深耕自动化和人工智能领域的开发人员&#xff0c;我们逐渐认识到尖端工具和方法之间的显着协同作用&#xff0c;这些协同作用突破了可能性的界限。在这次探索中&#xff0c;我们想分享一个概念&#xff0c;它不仅彻底改变了我们的软件开发和基础设施管理方法&#xff0…

cmd模式下启动mysql

1.打开cmd输入services.msc&#xff0c;找到MYSQL&#xff0c;右击属性&#xff0c;找到可执行文件路径&#xff0c;加载到环境变量。 2.打开cmd&#xff0c;启动MYSQL&#xff1a;输入net start mysql; 3.登陆MYSQL&#xff0c;需要管理权限&#xff1b; 输入&#xff1a;my…

java实现图片转pdf,并通过流的方式进行下载(前后端分离)

首先需要导入相关依赖&#xff0c;由于具体依赖本人也不是记得很清楚了&#xff0c;所以简短的说一下。 iText&#xff1a;PDF 操作库&#xff0c;用于创建和操作 PDF 文件。可通过 Maven 或 Gradle 引入 iText 依赖。 MultipartFile&#xff1a;Spring 框架中处理文件上传的类…

【深蓝学院】移动机器人运动规划--第7章 集群机器人运动规划--笔记

文章目录 0. Contents1. Multi-Agent Path Finding (MAPF)1.1 HCA*1.2 Single-Agent A*1.3 ID1.4 M*1.5 Conflict-Based Search(CBS)1.6 ECBS1.6.1 heuristics1.6.2 Focal Search 2. Velocity Obstacle (VO&#xff0c;速度障碍物)2.1 VO2.2. RVO2.3 ORCA 3. Flocking model&am…

金三银四,程序员如何备战面试季

金三银四&#xff0c;程序员如何备战面试季 一个人简介二前言三面试技巧分享3.1 自我介绍 四技术问题回答4.1 团队协作经验展示 五职业规划建议5.1 短期目标5.2 中长期目标 六后记 一个人简介 &#x1f3d8;️&#x1f3d8;️个人主页&#xff1a;以山河作礼。 &#x1f396;️…

Golang 程序启动原理详解

一.编译 go源代码首先要通过 go build 编译为可执行文件,然后去机器上直接执行的&#xff0c;在 linux 平台上为 ELF 格式的可执行文件&#xff0c;linux 能直接执行这个文件,而编译阶段会经过编译器、汇编器、链接器三个过程最终生成可执行文件 编译器&#xff1a;*.go 源码通…

23.基于springboot + vue实现的前后端分离-在线旅游网站系统(项目 + 论文PPT)

项目介绍 本旅游网站系统采用的数据库是MYSQL &#xff0c;使用 JSP 技术开发&#xff0c;在设计过程中&#xff0c;充分保证了系统代码的良好可读性、实用性、易扩展性、通用性、便于后期维护、操作方便以及页面简洁等特点。 技术选型 后端: SpringBoot Mybatis 数据库 : MyS…

视频生成模型Sora的全面解析:从AI绘画、ViT到ViViT、DiT、VDT、NaViT、VideoPoet

前言 真没想到&#xff0c;距离视频生成上一轮的集中爆发(详见《Sora之前的视频生成发展史&#xff1a;从Gen2、Emu Video到PixelDance、SVD、Pika 1.0》)才过去三个月&#xff0c;没想OpenAI一出手&#xff0c;该领域又直接变天了 自打2.16日OpenAI发布sora以来(其开发团队包…

第十七天-反爬与反反爬-验证码识别

目录 反爬虫介绍 基于身份识别反爬和解决思路 Headers反爬-使用User-agent Headers反爬-使用coookie字段 Headers反爬-使用Referer字段 基于参数反爬 验证码反爬 1.验证码介绍 2.验证码分类&#xff1a; 3.验证码作用 4.处理方案 5.图片识别引擎:ocr 6.使用打码平…

AWTK 开源串口屏开发(11) - 天气预报

# AWTK 开源串口屏开发 - 天气预报 天气预报是一个很常用的功能&#xff0c;在很多设备上都有这个功能。实现天气预报的功能&#xff0c;不能说很难但是也绝不简单&#xff0c;首先需要从网上获取数据&#xff0c;再解析数据&#xff0c;最后更新到界面上。 在 AWTK 串口屏中…

如何在jupyter notebook 中下载第三方库

在anconda 中找到&#xff1a; Anaconda Prompt 进入页面后的样式&#xff1a; 在黑色框中输入&#xff1a; 下载第三方库的命令 第三方库&#xff1a; 三种输入方式 标准保证正确 pip instsall 包名 -i 镜像源地址 pip install pip 是 Python 包管理工具&#xff0c;…

牛客练习赛122

D:圆 正着求删除的最小代价不好做&#xff0c;采用逆向思维&#xff0c;求选择一些不相交的线段使得构成一个圆的代价尽量大&#xff0c;最后答案就是所有线段权值之和减去最大代价。 那么如何求这个最大代价呢&#xff1f;显然区间DP 老套路&#xff1a;破环成链&#xff0…

Java实现手机库存管理

一、实验任务 编写一个程序&#xff0c;模拟库存管理系统。该系统主要包括系统首页、商品入库、商品显示和删除商品功能。每个功能的具体要求如下&#xff1a; 1.系统的首页&#xff1a;用于显示系统所有的操作&#xff0c;并且可以选择使用某一个功能。 2.商品入库功能&…

Java 数据结构篇-深入了解排序算法(动态图 + 实现七种基本排序算法)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 实现冒泡排序 2.0 实现选择排序 2.1 选择排序的改良升级 3.0 实现堆排序 4.0 实现插入排序 5.0 实现希尔排序 6.0 实现归并排序 6.1 递归实现归并排序 6.2 使用…

用FPGA CORDIC IP核实现信号的相位检测,计算相位角

用FPGA CORDIC IP核实现信号的相位检测 1.matlab仿真 波形仿真代码&#xff1a; 代码功能&#xff1a;生成一个点频信号s&#xff0c;求出s的实部和虚部&#xff1b;并且结算相位角atan2。画出图形&#xff0c;并且将Q和I数据写入文件中。 %代码功能&#xff1a;生成一个点…