网络编程(16)——asio多线程模型IOServicePool

目录

十六、day16

1. 什么是多线程?

2. IOServicePool实现

3. 服务器修改

4. 客户端修改

5. 总结

1. boost::asio::io_context::work的作用?


十六、day16

在之前的设计中,我们对 ASIO 的使用都是采用单线程模式。为了提升网络 I/O 并发处理的效率,这次我们设计了在多线程模式下使用 ASIO 的方法。总体而言,ASIO 有两种多线程模型

  • 启动多个线程,每个线程管理一个独立的 io_context。
  • 启动一个 io_context,由多个线程共享。

在后续文章中,我们会对比这两种模式的区别。这里我们先介绍第一种模式,即多个线程,每个线程管理一个独立的 io_context 服务。

1. 什么是多线程?

之前在完善消息节点的章节学习过asio服务器底层通信的流程,它是基于单线程运行的,可参考

知乎用户​www.zhihu.com/people/zhi-chi-tian-ya-10-23/posts

单线程模式

而今天将设计IOServicePool类型的多线程模型,如下图所示

多线程模式

IOServicePool 服务池中,IOServicePool 类会根据系统的 CPU 核数创建相应数量的 io_context 实例,并将每个 io_context 运行在一个独立的线程中。例如,如果系统有两个 CPU 核,就会有两个独立的线程分别运行各自的 io_context。io_context 是一个调度器,用于管理异步事件。例如,对于 Session1 会话,如果想在线程 1 上注册一个读事件,可以通过 async_read 将读事件注册到 io_context1 中,这样它的回调函数就会在线程 1 中执行。同样,线程 2 也是独立运行的,并处理它对应的 io_context 的事件。

IOServicePool多线程模式特点:

  • 每个 io_context 都在独立的线程中运行,因此同一个 socket 会被注册在同一个 io_context 上,它的回调函数也会在同一个线程中执行。这样,对于同一个 socket 来说,每次回调函数触发都会在同一个线程中执行,从而避免了线程安全问题,确保网络 I/O 层面的并发是线程安全的。
  • 但是,对于不同的 socket,回调函数的触发可能会在同一个线程中(如果两个 socket 被分配到同一个 io_context),也可能在不同的线程中(如果两个 socket 分配到不同的 io_context)。多个socket由同一个ioc调度的话,不会发生逻辑安全或线程问题,但如果不同的socket由不同的ioc调度,那么可能会发生安全问题。比如,两个 socket 对应的上层逻辑有交互或共享数据,就可能存在线程安全问题。如果 socket1 代表玩家1,socket2 代表玩家2,而这两个玩家在逻辑层面上有交互(如同属一个工会并且共同完成任务),则涉及的工会积分是共享的数据区域,需要保证线程安全。可以通过加锁或使用逻辑队列来解决这个问题,目前我们采用的是逻辑队列的方法。
  • 与单线程相比,多线程显著提高了并发能力。在单线程模式下,只有一个 io_context 来监听读写事件,事件就绪后回调函数在同一个线程中串行执行,如果一个回调函数执行时间较长,会影响后续的回调函数。而在多线程模式下,可以在一定程度上减少一个逻辑调用对下一个调用的影响。例如,如果两个 socket 被分配到不同的 io_context 上,它们的回调就不会互相影响。但如果两个 socket 分配到同一个 io_context,仍然可能有调用时间的影响。不过,我们已经通过逻辑队列的方式将网络线程和逻辑线程解耦,从而避免了前一个调用时间影响下一个回调触发的问题。

2. IOServicePool实现

IOServicePool本质上是一个线程池,基本功能就是根据构造函数传入的数量创建n个线程和iocontext,然后每个线程跑一个iocontext,这样就可以并发处理不同iocontext读写事件了

a. IOServicePool.h

#pragma once
#include "Singleton.h"
#include <boost/asio.hpp>
#include <vector>class AsioIOServicePool : public Singleton<AsioIOServicePool>
{friend Singleton<AsioIOServicePool>;
public:using IOService = boost::asio::io_context;using Work = boost::asio::io_context::work; // work的作用?using WorkPtr = std::unique_ptr<Work>; // 希望该work不会被拷贝,只能移动或者从头用到尾不被改变~AsioIOServicePool();AsioIOServicePool(const AsioIOServicePool&) = delete;AsioIOServicePool& operator = (const AsioIOServicePool&) = delete;// 使用round-robin 的方式返回一个io_contextboost::asio::io_context& GetIOService();void Stop();
private:AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency()); // hardware_concurrency获取CPu核数std::vector<IOService> _ioServices;std::vector<WorkPtr> _works;std::vector<std::thread> _threads;// 通过轮询返回ioc时,需要记录当前ioc的下标,累加,当超过vector的size时就归零,然后继续按轮询的方式返回// 记录ioc在vector的下标std::size_t _nextIOService;
};
  • IOServicePool也是单例模式,有且仅有唯一实例
  • IOService :io_context
  • Work :用于绑定ioc,避免ioc.run()提前返回, work的详细作用请看文章末的总结部分
  • WorkPtr :使用unique_ptr管理work,希望该work不会被拷贝,只能移动或者从头用到尾不被改变
  • _ioServices:存储指定数量的ioc
  • _works:存储与ioc数量对应的work
  • _threads:存储指定数量的线程
  • _nextIOService:记录ioc在vector的下标,通过轮询返回ioc时,需要记录当前ioc的下标,累加,当超过vector的size时就归零,然后继续按轮询的方式返回

b. IOServicePool构造函数

AsioIOServicePool::AsioIOServicePool(std::size_t size) : _ioServices(size), _works(size), _nextIOService(0) {for (std::size_t i = 0; i < size; i++) {_works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));}// 遍历多个ioservice,创建多个线程,每个线程内部启动ioservicefor (std::size_t i = 0; i < _ioServices.size(); i++) {_threads.emplace_back([this, i]() {_ioServices[i].run();});}
}

size的默认值是std::thread::hardware_concurrency(),该函数用于获取CPU核数。如果不主动更改size,那么IOServicePool会构造数量等于CPU核数的上下文服务、work和线程。

因为work通过std::unique_ptr进行管理,所以下面这段代码是错的,因为std::unique_ptr 不允许将一个普通指针直接赋值给另一个 std::unique_ptr, std::unique_ptr是独占有权的。

auto unptr = std::unique_ptr<Work>(new Work(_ioServices[i]));
_works[i] = unptr;

但是,可以通过移动语义将自动将创建的 unique_ptr 的所有权转移到 _works[i] ,实际上是在 _works[i] 中创建或替换一个 unique_ptr。

std::unique_ptr 不允许复制(即同一对象不能被多个 unique_ptr 同时拥有),但支持移动操作。使用 std::unique_ptr 时,_works[i] 直接接收新创建的 unique_ptr,所有权被有效地转移。

可以将unique_ptr作为右值赋值给另一个unique_ptr

_works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));

注意,Work(ioc_context)是asio库的函数,用于将work与ioc进行绑定,避免ioc.run()返回。原型为

boost::asio::io_context::work::work(boost::asio::io_context& io_context)

最后,遍历多个ioservice,创建多个线程,每个线程内部启动ioservice。

c. GetIOService()

boost::asio::io_context& AsioIOServicePool::GetIOService() {auto& service = _ioServices[_nextIOService++];if (_nextIOService == _ioServices.size())_nextIOService = 0;return service;
}

该段代码用于从ioc存储容器_ioServices中获取io_context&,其中_nextIOService为索引,轮询获取io_context&

d. Stop()

void AsioIOServicePool::Stop(){for (auto& work : _works) {work.reset();}for (auto& t : _threads) {t.join();}
}

同样我们要实现Stop函数,控制AsioIOServicePool停止所有ioc的工作,并等待所有线程结束。因为我们要保证每个线程安全退出后再让AsioIOServicePool停止。

3. 服务器修改

a. void CServer::start_accept()

void CServer::start_accept() {auto& ioc = AsioIOServicePool::GetInstance()->GetIOService();// make_shared分配并构造一个 std::shared_ptr,_ioc, this是传给Session的参数std::shared_ptr<CSession> new_session = std::make_shared<CSession>(ioc, this);// 开始一个异步接受操作,当new_session的socket与客户端连接成功时,调用回调函数handle_accept_acceptor.async_accept(new_session->Socket(), std::bind(&CServer::handle_accept, this, new_session,std::placeholders::_1));
}

该段函数在CServer实例化的时候被CServer构造函数调用,使服务器启动异步接收(相当于异步读,之前的代码不需要work的原因是因为ioc.run()是在CServer实例化后运行的,start_accept()函数会执行异步接收操作,相当于异步读注册给ioc,ioc.run不会返回),等待客户端连接。

之前的代码中,new_session使用的ioc是acceptor绑定的ioc,该ioc负责异步接收、异步读和写。但是在多线程模式中,该ioc只需要执行异步接收操作,而异步读写通过从AsioIOServicePool池中获取的ioc运行。

std::shared_ptr<CSession> new_session = std::make_shared<CSession>(_ioc, this); // 修改前
std::shared_ptr<CSession> new_session = std::make_shared<CSession>(ioc, this); // 修改后

b. AsyncServer_MsgNode.cpp

主函数也需要修改,因为现在的ioc不止用于执行异步接受,还有线程池中的ioc,所以需要将二者均stop

int main()
{try {auto pool = AsioIOServicePool::GetInstance();boost::asio::io_context ioc;boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);// 必须异步等待,否则建立线程进行处理signals.async_wait([&ioc, pool](const boost::system::error_code& error, int signal_number) {if (!error) {std::cout << "Signal " << signal_number << " received." << std::endl;ioc.stop();  // 停止 io_contextpool->Stop();}});CServer s(ioc, 10086);ioc.run();}catch (std::exception& e) {std::cerr << "Exception: " << e.what() << '\n';}boost::asio::io_context io_context;
}

4. 客户端修改

#include <boost/asio.hpp>
#include <iostream>
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>
#include <chrono>
#include <thread>using namespace boost::asio::ip;
using std::cout;
using std::endl;
const int MAX_LENGTH = 1024 * 2; // 发送和接收的长度为1024 * 2字节
const int HEAD_LENGTH = 2;
const int HEAD_TOTAL = 4;std::vector<std::thread> vec_threads;int main()
{auto start = std::chrono::high_resolution_clock::now();for (int i = 0; i < 50; i++) { //建立100个线程vec_threads.emplace_back([]() {try {boost::asio::io_context ioc; // 创建上下文服务// 127.0.0.1是本机的回路地址,也就是服务器和客户端在一个机器上tcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086); // 构造endpointtcp::socket sock(ioc);boost::system::error_code error = boost::asio::error::host_not_found; // 错误:主机未找到sock.connect(remote_ep, error);if (error) {cout << "connect failed, code is " << error.value() << " error msg is " << error.message();return 0;}int i = 0;while(i++ < 200) {Json::Value root;root["id"] = 1001;root["data"] = "hello world";std::string request = root.toStyledString();size_t request_length = request.length();char send_data[MAX_LENGTH] = { 0 };int msgid = 1001;int msgid_host = boost::asio::detail::socket_ops::host_to_network_short(msgid);memcpy(send_data, &msgid_host, 2);//转为网络字节序int request_host_length = boost::asio::detail::socket_ops::host_to_network_short(request_length);memcpy(send_data + 2, &request_host_length, 2);memcpy(send_data + 4, request.c_str(), request_length);boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 4));char reply_head[HEAD_TOTAL]; // 首先读取对端发送消息的总长度size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_TOTAL));msgid = 0;memcpy(&msgid, reply_head, HEAD_LENGTH);short msglen = 0; // 消息总长度memcpy(&msglen, reply_head + 2, HEAD_LENGTH); // 将消息总长度赋值给msglen//转为本地字节序msglen = boost::asio::detail::socket_ops::network_to_host_short(msglen);char msg[MAX_LENGTH] = { 0 }; // 构建消息体(不含消息总长度)size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));Json::Reader reader;reader.parse(std::string(msg, msg_length), root);std::cout << "msg id is " << root["id"] << " msg is " << root["data"] << endl;}}catch (std::exception& e) {std::cerr << "Exception: " << e.what() << endl;}});std::this_thread::sleep_for(std::chrono::seconds(1));for (auto& t : vec_threads) {t.join();}auto end = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);cout << "Time spent: " << duration.count() << " microsencods" << endl;}return 0;
}

5. 总结

1. boost::asio::io_context::work的作用?

在实际使用中,我们通常会将一些异步操作提交给io_context进行处理,然后该操作会被异步执行,而不会立即返回结果。如果没有其他任务需要执行,那么io_context就会停止工作,导致所有正在进行的异步操作都被取消。这时,我们需要使用boost::asio::io_context::work对象来防止io_context停止工作。

boost::asio::io_context::work的作用是持有一个指向io_context的引用,并通过创建一个“工作”项来保证io_context不会停止工作,直到work对象被销毁或者调用reset()方法为止。当所有异步操作完成后,程序可以使用work.reset()方法来释放io_context,从而让其正常退出。

在之前的代码中,ioc不会被阻塞是因为我们已经提前给ioc注册了一个读事件(acceptor通过async_accept注册了一个读事件监听对端连接,而acceptor又绑定了此io_context),所以此时的ioc不会退出。

        boost::asio::io_context ioc;boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);// 必须异步等待,否则建立线程进行处理signals.async_wait([&ioc](const boost::system::error_code& error, int signal_number) {if (!error) {std::cout << "Signal " << signal_number << " received." << std::endl;ioc.stop();  // 停止 io_context}});CServer s(ioc, 10086);ioc.run();CServer::CServer(boost::asio::io_context& ioc, short port) : _ioc(ioc),
_acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {cout << "Server start success, on port: " << port << endl;// 开始异步地接受客户端连接请求。服务器启动后就进入等待客户端连接的状态start_accept();
}void CServer::start_accept() {// make_shared分配并构造一个 std::shared_ptr,_ioc, this是传给Session的参数std::shared_ptr<CSession> new_session = std::make_shared<CSession>(_ioc, this);// 开始一个异步接受操作,当new_session的socket与客户端连接成功时,调用回调函数handle_accept_acceptor.async_accept(new_session->Socket(), std::bind(&CServer::handle_accept, this, new_session,std::placeholders::_1));
}

而我们实现的IOServicePool中,在它的构造函数中初始化了n个io_context,且ioc运行在独立的线程中调用ioc.run(),如果不写work,相当于ioc没有绑定任何事件,那么ioc就会退出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/448292.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

持续科技创新 高德亮相2024中国测绘地理信息科技年会

图为博览会期间, 自然资源部党组成员、副部长刘国洪前往高德企业展台参观。 10月15日&#xff0c;2024中国测绘地理信息科学技术年会暨中国测绘地理信息技术装备博览会在郑州召开。作为国内领先的地图厂商&#xff0c;高德地图凭借高精度高动态导航地图技术应用受邀参会。 本…

一文详解流处理、批处理和流批一体

一、流处理 定义&#xff1a;流处理是一种对实时流入的数据进行连续处理的方式&#xff0c;流式数据处理中的数据集是不固定和无边界的。 特点&#xff1a; 1.实时性&#xff1a;能够在数据产生的瞬间就对其进行处理&#xff0c;具有非常高的实时性。这使得企业可以及时响应…

光伏项目开发360°管控,规避潜在风险

光伏项目建设在国家的大力支持下如雨后春笋般涌现&#xff0c;投资者对回报率和项目质量的要求越来越高。在这样的背景下&#xff0c;光伏项目全生命周期管理愈发重要。 一、业主沟通开发 积极和业主进行沟通&#xff0c;了解其合作意愿。向业主科普安装光伏的好处&#xff0c…

[权威出版|稳定检索]2024年信息控制、电气与智慧交通国际会议(ICEIT 2024)

2024年信息控制、电气与智慧交通国际会议 2024 International Conference on Information Control, Electrical and Intelligent Transportation 【1】大会信息 会议名称&#xff1a;2024年信息控制、电气与智慧交通国际会议 会议简称&#xff1a;ICEIT 2024 大会时间&#x…

Open-WebUI

Open-WebUI特点⭐ ️直观的界面&#xff1a;聊天界面从 ChatGPT 中汲取灵感&#xff0c;确保用户友好的体验。响应式设计&#xff1a;在桌面和移动设备上享受无缝体验。⚡快速响应&#xff1a;享受快速响应的性能。轻松设置&#xff1a;使用 Docker 或 Kubernetes&#xff08;…

101、QT摄像头录制视频问题

视频和音频录制类QMediaRecorder QMediaRecorder 通过摄像头和音频输入设备进行录像。 注意: 使用Qt多媒体模块的摄像头相关类无法在Windows平台上进行视频录制&#xff0c;只能进行静态图片抓取但是在Linux平台上可以实现静态图片抓取和视频录制。 Qt多媒体模块的功能实现是依…

msql事务隔离级别 线上问题

1. 对应代码 解决方式&#xff1a; 在事务隔离级别为可重复读&#xff08;RR&#xff09;时&#xff0c;数据库确实通常会记录当前数据的快照。 在可重复读隔离级别下&#xff0c;事务在执行期间看到的数据是事务开始时的数据快照&#xff0c;即使其他事务对数据进行了修改&am…

实战篇:(六)创建属于自己的 Vue 3 组件库:主题切换与样式管理

创建属于自己的 Vue 3 组件库&#xff1a;主题切换与样式管理 构建一个主题化的 Vue 3 组件库需要多个步骤&#xff0c;包括项目的初始化、组件的创建、主题的实现和样式的管理。以下是详细的步骤和实现代码。 1. 初始化项目 使用 Vite 创建 Vue 3 项目&#xff1a; npm cre…

Java基础14-网络编程

十四、网络编程 java.net.*包下提供了网络编程的解决方案! 基本的通信架构 基本的通信架构有2种形式: CS架构( Client客户端/Server服务端)、BS架构(Browser浏 览器/Server服务端)。无论是CS架构&#xff0c;还是BS架构的软件都必须依赖网络编程!。 1、网络通信的三要素 网络通…

堡垒机安装、链接服务器、数据库

堡垒机 JumpServer - 开源堡垒机 - 官网 下载安装包 jumpserver jumpserver.org (.org开源) 1、将安装包上传至虚拟机 &#xff08;1&#xff09;rz上传 &#xff08;2&#xff09;lftp登录 put下载 2、解压 [roothostname ~]# tar -xf jumpserver-ce-v4.2.0-x86_64.t…

认识Java的异常

异常机制 异常机制指的是程序出现错误时&#xff0c;程序的处理方式。 程序的错误分为三种&#xff1a; 编译错误&#xff1a;由于没有遵循对于语言的语法规则&#xff0c;编辑器可以自动发现并提示的错误位置和原因。逻辑错误&#xff1a;程序没有按照预期的顺序执行。运行…

Reality Capture 软件安装 附下载链接

Reality Capture 软件安装 文章目录 Reality Capture 软件安装一、Reality Capture v1.4汉化版安装包下载并解压二、Epic Games Launcher安装三、设置路径并安装![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/f077210990674d9fa9c10b52338b52fe.png)四、启动Epic Ga…

第十四届中国国际健康产品展览会在沪举办,无限未来品牌大放异彩

2024 年&#xff0c;第十四届中国国际健康产品展览会、2024 亚洲天然及营养保健品展在上海成功举办。 此次展会聚焦天然及营养保健品领域&#xff0c;来自香港的 INFINITE FUTURE 无限未来品牌脱颖而出。无限未来将先进的营养科学与尖端数字技术融合&#xff0c;开发专业级营养…

Stylized Far East 古代国风建筑城镇宫殿场景模型

古代国风建筑城镇宫殿场景模型。内容: -演示场景(截图) - 种类繁多的建筑,如宫殿、商店、神社、房屋、餐馆、宝塔、寺庙等 -带有塔楼、门楼的模块化城堡墙 -树木、岩石、悬崖和其他自然资产 -传统装饰,如纸灯笼、绘画、瓷器等 - 城镇道具,如手推车、栅栏、板条箱、市场、…

【JavaEE】——TCP应答报文机制,超时重传机制

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 一&#xff1a;TCP协议&#xff08;面试重点重点&#xff09; 1&#xff1a;报头长度 2&#xff1a;…

今年双十一可以买啥?2024双十一不用做功课闭眼入的品牌好物分享!

今年的双十一购物狂欢节即将来临&#xff0c;许多消费者已经开始规划他们的购物清单&#xff0c;期待在这个一年一度的促销盛会上抢购到心仪的商品。2024年的双十一&#xff0c;你无需再做繁琐的功课&#xff0c;因为这里将为你分享一些闭眼入的品牌好物&#xff0c;让你轻松享…

unity Gpu优化

不一样的视角&#xff0c;深度解读unity性能优化。unity性能优化&#xff0c;unity内存优化&#xff0c;cpu优化&#xff0c;gpu优化&#xff0c;资源优化&#xff0c;资源包、资源去重优化&#xff0c;ugui优化。 gpu优化静态批处理静态批处理原理规则静态合批的原理静态合批的…

【Sceneform-EQR】(手势优化)通过手势事件实现在AR/VR等三维场景中的控制模型旋转、平移与缩放

在上一篇文档中&#xff0c;我们实现了通过手势控制模型节点的旋转、缩放和平移。现在本文将介绍如何优化上一篇做的手势控制器&#xff0c;从而实现更好的跟手效果。 相关链接&#xff1a;【Sceneform-EQR】&#xff08;手势控制器实现&#xff09;通过手势事件实现在AR/VR等…

网络安全中的RCE命令执行漏洞----入门小白必看

RCE命令执行&代码执行漏洞 RCE命令执行漏洞 RCE漏洞简介 RCE(remote code/command execute) 远程代码/命令执行漏洞 RCE漏洞是两个漏洞&#xff1a; 代码执行漏洞 # 针对后端语言!命令执行漏洞 # 针对系统! 如何产生 在 Web应用中有时候程序员为了考虑灵活性、简洁性…

【SEO】什么是SEO?

什么是SEO&#xff08;搜索引擎优化&#xff09;&#xff1f;为什么SEO对于⼀个⽹站⾄关重要&#xff1f; SEO 全称是搜索引擎优化&#xff08;Search Engine Optimization&#xff09; 因为我们目前开发的网址&#xff0c;需要人看到&#xff0c;除了通过宣传营销的方式展现…