C++使用线程池模拟异步事件处理机制

  在C++很多框架中都有异步事件处理机制,这导致我们在看源码时经常很疑惑,难以理解,而其中包含的编程套路可能是一些成熟的技术,只是我们不熟悉,比如WebRTC中类似于Qt的信号槽机制,线程事件处理, 或者使用系统异步IO等等,如果看不懂这些套路,理解代码会很难,本篇博客来尝使用用C++线程池实现一种异步事件处理机制。

异步事件处理机制的基本实现

  C++可以使用std::future和std::promise来实现异步操作。然而,为了实现一个异步事件绑定的框架,我们需要更复杂的设计。下面是一个简单的例子,说明如何实现一个异步事件处理器。

  首先,定义一个事件处理器类,该类将接收并处理事件:

class EventHandler {
public:virtual ~EventHandler() = default;virtual void handleEvent(int eventID) = 0;
};

  然后,我们需要创建一个事件分发器,它将异步地调用事件处理器:

/*事件注册,分发*/#pragma once#include "EventHandler.hpp"
#include <map>
#include <thread>
#include <future>
#include <functional>
#include <memory>class EventDispatcher {
public:// 注册事件处理器void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {handlers[eventID] = handler;}// 异步事件分发函数void postEvent(int eventID) {auto it = handlers.find(eventID);if (it != handlers.end()) {std::thread eventThread(&EventDispatcher::dispatchEvent, this, it->second, eventID);eventThread.detach();}}private:// 事件分发函数void dispatchEvent(std::shared_ptr<EventHandler> handler, int eventID) {handler->handleEvent(eventID);}private:std::map<int, std::shared_ptr<EventHandler>> handlers;  // 存储事件,int 事件id, std::shared_ptr<EventHandler> 事件处理器
};

  在这个例子中,EventDispatcher类的postEvent方法接收一个事件ID,并在新线程中调用相应的事件处理器。这样做可以实现事件的异步处理。

  然后,你可以创建一个或多个处理器类,比如下面的打印事件处理器PrintEventHandler ,它实现EventHandler接口,

/*具体的事件处理器*/#include "EventHandler.hpp"
#include <iostream>using namespace std;class PrintEventHandler : public EventHandler {
public:void handleEvent(int eventID) override {std::cout << "Handling event " << eventID << std::endl;}
};

  然后再main函数中进行注册:

/*C++异步事件框架demo01*/#include <iostream>
#include <memory>
#include <thread>
#include <chrono>
#include "EventDispatcher.hpp"
#include "PrintEventHandler.hpp"int main() {EventDispatcher dispatcher;std::shared_ptr<EventHandler> printHandler = std::make_shared<PrintEventHandler>();dispatcher.registerHandler(1, printHandler);dispatcher.postEvent(1);// Sleep main thread to let the event thread finish.std::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}

运行结果:

Handling event 1

代码组织如下,有兴趣的可以自行编写实现:
在这里插入图片描述

cmake脚本

#[[编译方法cmake -S . -B buildcd buildmake./demo01]]cmake_minimum_required(VERSION 3.20)project(demo01)set(INCLUDE_PATH1  "./")# 添加头文件目录
include_directories(${INCLUDE_PATH1}
)# 添加子目录src
aux_source_directory("./" SRC)add_executable(demo01 ${SRC})

  这个实现是非常基础的,并没有考虑到线程安全问题和异常处理等等。在实际的项目中,你需要更复杂的设计,并使用更高级的并发编程技术,如线程池、任务队列、互斥锁等等。

添加线程池、任务队列

  如果想要更复杂的设计,包括线程池、任务队列、互斥锁等,你可以考虑使用以下的设计。下面的例子使用了C++17的std::async和std::future来实现线程池和任务队列。

  首先,我们需要一个线程安全的任务队列:

#pragma once#include <queue>
#include <mutex>
#include <condition_variable>template <typename T>
class ThreadSafeQueue {
public:ThreadSafeQueue() = default;ThreadSafeQueue(const ThreadSafeQueue<T> &) = delete;ThreadSafeQueue& operator=(const ThreadSafeQueue<T> &) = delete;void push(T value) {std::lock_guard<std::mutex> lock(mMutex);mQueue.push(std::move(value));mCondition.notify_one();}bool try_pop(T& value) {std::lock_guard<std::mutex> lock(mMutex);if (mQueue.empty()) {return false;}value = std::move(mQueue.front());mQueue.pop();return true;}void wait_and_pop(T& value) {std::unique_lock<std::mutex> lock(mMutex);mCondition.wait(lock, [this](){ return !mQueue.empty(); });value = std::move(mQueue.front());mQueue.pop();}private:std::queue<T> mQueue;std::mutex mMutex;std::condition_variable mCondition;
};

  然后,我们需要一个线程池来处理这些任务:

#pragma once#include "ThreadSafeQueue.hpp"
#include <vector>
#include <future>class ThreadPool {
public:ThreadPool(size_t numThreads) {start(numThreads);}~ThreadPool() {stop();}template<typename T>void enqueue(T task) {mTasks.push(std::make_shared<std::packaged_task<void()>>(std::move(task)));}private:std::vector<std::thread> mThreads;ThreadSafeQueue<std::shared_ptr<std::packaged_task<void()>>> mTasks;std::atomic<bool> mContinue { true };void start(size_t numThreads) {for (auto i = 0u; i < numThreads; ++i) {mThreads.emplace_back([=] {while (mContinue) {std::shared_ptr<std::packaged_task<void()>> task;if (mTasks.try_pop(task)) {(*task)();} else {std::this_thread::yield();}}});}}void stop() noexcept {mContinue = false;for (auto &thread : mThreads) {thread.join();}}
};

  然后,我们可以在EventDispatcher中使用线程池来异步地处理事件:

#pragma once
#include "ThreadPool.hpp"
#include <mutex>
#include <map>
#include <memory>
#include "EventHandler.hpp"class EventDispatcher {
public:EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {std::lock_guard<std::mutex> lock(mMutex);mHandlers[eventID] = handler;}void postEvent(int eventID) {std::lock_guard<std::mutex> lock(mMutex);auto it = mHandlers.find(eventID);if (it != mHandlers.end()) {mThreadPool.enqueue([handler = it->second, eventID]() {handler->handleEvent(eventID);});}}private:std::map<int, std::shared_ptr<EventHandler>> mHandlers;ThreadPool mThreadPool;std::mutex mMutex;
};

  这个实现现在是线程安全的,并且使用了线程池来处理事件。对于异常处理,你可以在EventHandler的handleEvent方法内部进行。

添加异常处理

  如何在EventDispatcher中处理异常?
EventDispatcher中处理异常,我们可以在任务的执行过程中增加一个try/catch块,捕获并处理可能出现的异常。下面是一个示例:

class EventDispatcher {
public:EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {std::lock_guard<std::mutex> lock(mMutex);mHandlers[eventID] = handler;}void postEvent(int eventID) {std::lock_guard<std::mutex> lock(mMutex);auto it = mHandlers.find(eventID);if (it != mHandlers.end()) {mThreadPool.enqueue([handler = it->second, eventID]() {try {handler->handleEvent(eventID);} catch (const std::exception &ex) {std::cerr << "Exception occurred while handling event " << eventID << ": " << ex.what() << '\n';} catch (...) {std::cerr << "Unknown exception occurred while handling event " << eventID << '\n';}});}}private:std::map<int, std::shared_ptr<EventHandler>> mHandlers;ThreadPool mThreadPool;std::mutex mMutex;
};

  在这个示例中,如果handleEvent方法抛出了一个异常,那么我们将在控制台上打印出一个错误消息。具体如何处理这些异常,取决于你的应用程序的特定需求。

  请注意,这种方法仅处理了在handleEvent方法中抛出的异常。如果在其他地方(如ThreadPool的构造函数或enqueue方法)抛出异常,你可能需要额外的代码来处理这些情况。

注册多个事件

  在我们的事件分发器设计中,注册多个事件是非常简单的。你只需要创建不同的事件处理器,然后使用不同的事件ID将它们注册到事件分发器中。以下是一个示例:

class PrintEventHandler : public EventHandler {
public:PrintEventHandler(int id) : id(id) {}void handleEvent(int eventID) override {std::cout << "Handler " << id << " handling event " << eventID << std::endl;}
private:int id;
};int main() {EventDispatcher dispatcher(4);  // 创建一个有4个线程的事件分发器std::shared_ptr<EventHandler> handler1 = std::make_shared<PrintEventHandler>(1);dispatcher.registerHandler(1, handler1);std::shared_ptr<EventHandler> handler2 = std::make_shared<PrintEventHandler>(2);dispatcher.registerHandler(2, handler2);dispatcher.postEvent(1);dispatcher.postEvent(2);// Sleep main thread to let the event threads finishstd::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}

  在这个示例中,我们创建了两个PrintEventHandler对象,每个对象都有一个唯一的ID。然后,我们使用不同的事件ID将这两个处理器注册到EventDispatcher中。当我们发布事件时,相应的处理器将会被调用。

  这就是如何在我们的异步事件绑定框架中注册多个事件的方法。

如何取消注册

  要从事件分发器中取消注册一个事件处理器,你可以在EventDispatcher类中添加一个新的方法,例如unregisterHandler,用于从处理器映射中删除指定的事件ID。下面是如何实现这个函数的例子:

class EventDispatcher {
public:EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {std::lock_guard<std::mutex> lock(mMutex);mHandlers[eventID] = handler;}void unregisterHandler(int eventID) {std::lock_guard<std::mutex> lock(mMutex);mHandlers.erase(eventID);}void postEvent(int eventID) {std::lock_guard<std::mutex> lock(mMutex);auto it = mHandlers.find(eventID);if (it != mHandlers.end()) {mThreadPool.enqueue([handler = it->second, eventID]() {try {handler->handleEvent(eventID);} catch (const std::exception &ex) {std::cerr << "Exception occurred while handling event " << eventID << ": " << ex.what() << '\n';} catch (...) {std::cerr << "Unknown exception occurred while handling event " << eventID << '\n';}});}}private:std::map<int, std::shared_ptr<EventHandler>> mHandlers;ThreadPool mThreadPool;std::mutex mMutex;
};

  这个unregisterHandler函数将删除与给定事件ID关联的事件处理器。请注意,这个函数并不会停止已经在处理该事件的任何线程。如果你想要取消正在进行的事件处理,你可能需要一个更复杂的设计,例如使用std::futurestd::promise来控制线程的执行。

如何停止正在进行的事件

  要停止正在进行的事件处理,我们需要更复杂的设计,它可能包括使用std::futurestd::promise来控制线程的执行。在这种设计中,每当一个事件被发布时,我们将创建一个std::promise,并将相应的std::future存储在某个地方,以便我们可以稍后在需要时停止事件处理。

  但是,要注意的是,根据C++的设计,没有一个简单且安全的方法可以强制停止正在运行的线程,因为这可能会导致资源泄漏或其他未定义的行为。因此,更常见的做法是让事件处理器定期检查一个“停止标记”,然后在检查到该标记时优雅地停止执行。以下是一个简单的示例,演示了如何实现这种设计:

class StoppableEvent {
public:StoppableEvent(std::future<void> future, std::function<void()> func): mFuture(std::move(future)), mFunc(std::move(func)) {}void operator()() {while(mFuture.wait_for(std::chrono::milliseconds(100)) == std::future_status::timeout) {mFunc();}}private:std::future<void> mFuture;std::function<void()> mFunc;
};class EventDispatcher {
public:EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {std::lock_guard<std::mutex> lock(mMutex);mHandlers[eventID] = handler;}void postEvent(int eventID) {std::lock_guard<std::mutex> lock(mMutex);auto it = mHandlers.find(eventID);if (it != mHandlers.end()) {std::promise<void> stopSignal;auto stopFuture = stopSignal.get_future();mStopSignals[eventID] = std::move(stopSignal);mThreadPool.enqueue(StoppableEvent(std::move(stopFuture), [handler = it->second, eventID]() {handler->handleEvent(eventID);}));}}void stopEvent(int eventID) {std::lock_guard<std::mutex> lock(mMutex);auto it = mStopSignals.find(eventID);if (it != mStopSignals.end()) {it->second.set_value();mStopSignals.erase(it);}}private:std::map<int, std::shared_ptr<EventHandler>> mHandlers;std::map<int, std::promise<void>> mStopSignals;ThreadPool mThreadPool;std::mutex mMutex;
};

  在这个例子中,我们定义了一个StoppableEvent类,它将一个std::future和一个函数组合在一起。当operator()被调用时,它将定期检查future,如果future的状态不是timeout,则停止执行函数。

  然后,当我们在EventDispatcher中发布一个事件时,我们将创建一个新的std::promise和相应的std::future,并将这个future和事件处理器的handleEvent方法一起传递给StoppableEvent。我们还将promise存储在一个映射中,以便我们可以稍后通过调用set_value来发出停止信号。

  最后,我们添加了一个stopEvent方法,它将查找与给定事件ID关联的promise,并通过调用set_value来发出停止信号。然后,它将从映射中删除这个promise,因为我们不再需要它。

  这是一个基本的示例,你可能需要根据你的具体需求来修改和扩展它。请注意,这个设计假设事件处理器的handleEvent方法将被调用多次,每次调用都可能被中断。如果你的事件处理器只执行一次长时间运行的任务,那么这个设计可能并不适合。

  以上是一个简易的异步事件处理demo, 在项目开发中,需要根据具体的业务需求进行调整完善。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/188255.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Sensor 点亮出图后,颜色偏红或者偏绿是为什么?

这是因为 sensor balck level 的值配置的不正确导致&#xff0c;black level 的值一般在效果参数的 calibration 参数里面。 在驱动调试阶段&#xff0c;我们一般都是复用其他已调试好的&#xff0c;sensor 的驱动文件及效果文件&#xff0c; 而不同 sensor 的 balck level 的…

【qemu逃逸】XCTF 华为高校挑战赛决赛-pipeline

前言 虚拟机用户名: root 无密码 设备逆向与漏洞分析 程序没有去符合, 还是比较简单. 实例结构体如下: 先总体说一下流程: encode 为 base64 编码函数, decode 为 base64 解码函数. 然后 encPipe 和 decPipe 分别存放编码数据和解码数据, 分别有四个: 其中 EncPipeLine 中…

网页推理游戏

目录 python challenge &#xff08;0&#xff09; &#xff08;1&#xff09; &#xff08;2&#xff09; The Riddle &#xff08;1&#xff09; &#xff08;2&#xff09; &#xff08;3&#xff09; &#xff08;4&#xff09; Nazo &#xff08;1&#xff09;…

【Spring】SpringBoot日志

SpringBoot日志 日志概述日志使用打印日志获取日志对象使用日志对象打印日志日志框架介绍门面模式SLF4J框架介绍(simple logging facade for java) 日志格式说明日志级别日志级别的分类日志级别的使用 日志配置配置日志级别日志持久化配置日志文件的路径和文件名配置日志文件的…

【tgcalls】Instance接口的实例类的创建

tg 里有多个版本,因此设计了版本管理的map,每次可以选择一个版本进行实例创建这样,每个客户端就可以定制开发了。tg使用了c++20创建是要传递一个描述者,里面是上下文信息 G:\CDN\P2P-DEV\tdesktop-offical\Telegram\ThirdParty\tgcalls\tgcalls\Instance.cpp可以看到竟然是…

【数据结构】顺序表 | 详细讲解

在计算机中主要有两种基本的存储结构用于存放线性表&#xff1a;顺序存储结构和链式存储结构。本篇文章介绍采用顺序存储的结构实现线性表的存储。 顺序存储定义 线性表的顺序存储结构&#xff0c;指的是一段地址连续的存储单元依次存储链性表的数据元素。 线性表的&#xf…

单词规律问题

给定一种规律 pattern 和一个字符串 s &#xff0c;判断 s 是否遵循相同的规律。 这里的 遵循 指完全匹配&#xff0c;例如&#xff0c; pattern 里的每个字母和字符串 s 中的每个非空单词之间存在着双向连接的对应规律。 示例1: 输入: pattern “abba”, s “dog cat cat d…

【Git】Git分支与标签掌握这些技巧让你成为合格的码农

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《Git》。&#x1f3af;&#x1f3af; &#x1f449…

vue Sts认证后直传图片到阿里云OSS

后端进行sts认证生成临时身份凭证&#xff0c;前端通过凭证直传图片等文件到OSS中 一 OSS配置 增加用户和角色&#xff0c;创建OSS bucket 1.1 添加用户 登录阿里云管理控制台&#xff0c;右侧头像&#xff0c;进入访问控制 点击左侧导航栏的身份管理的用户&#xff0c;点击…

MySQL的索引和复合索引

由于MySQL自动将主键加入到二级索引&#xff08;自行建立的index&#xff09;里&#xff0c;所以当select的是主键或二级索引就会很快&#xff0c;select *就会慢。因为有些列是没在索引里的 假设CA有1kw人咋整&#xff0c;那我这个索引只起了前一半作用。 所以用复合索引&am…

探索微信小程序框架的精华——高质量的优秀选择

目录 引言&#xff1a; 1. 框架性能 2. 开发者工具支持 3. 文档和社区支持 4. 扩展能力 5. 使用率和稳定性 结语&#xff1a; 引言&#xff1a; 微信小程序作为一种轻量级、高效便捷的应用形式&#xff0c;已经在移动应用领域占据了重要地位。而其中&#xff0c;选择一个…

【EI会议征稿】JPCS独立出版-第五届新材料与清洁能源国际学术会议(ICAMCE 2024)

JPCS独立出版-第五届新材料与清洁能源国际学术会议&#xff08;ICAMCE 2024&#xff09; 2024 5th International Conference on Advanced Material and Clean Energy 第五届新材料与清洁能源国际学术会议&#xff08;ICAMCE 2024&#xff09;将于2024年2月23-25日在中国▪长沙…

【论文阅读】多模态NeRF:Cross-Spectral Neural Radiance Fields

https://cvlab-unibo.github.io/xnerf-web intro 从不同的light spectrum sensitivity获取信息&#xff0c;同时需要obtain a unified Cross-Spectral scene representation – allowing for querying, for any single point, any of the information sensed across spectra。…

xcode SDK does not contain ‘libarclite‘

SDK does not contain libarclite at the path /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/lib/arc/libarclite_iphonesimulator.a; try increasing the minimum deployment target解决方法 iOS13以上

macOS Sonoma 14.2beta2(23C5041e)发布(附黑白苹果镜像地址)

系统介绍 黑果魏叔11 月 10 日消息&#xff0c;今日向 Mac 电脑用户推送了 macOS 14.2 开发者预览版 Beta 2 更新&#xff08;内部版本号&#xff1a;23C5041e&#xff09;&#xff0c;本次更新距离上次发布隔了 14 天。 macOS Sonoma 14.2 添加了 Music 收藏夹播放列表&…

“Git实践指南:深入探索开发测试上线、分支管理与标签“

文章目录 引言一、Git的分支的使用1.分支2.标签3.分支与标签的关系4. 分支在实际中的作用5. 四个环境以及各自的功能特点6. 分支策略分支应用场景 二、Git的标签3.1 标签的基本使用3.3 标签的共享与推送 总结 引言 在现代软件开发中&#xff0c;版本控制是一个关键的环节&…

11 抽象向量空间

抽象向量空间 向量是什么函数什么是线性推论向量空间 这是关于3Blue1Brown "线性代数的本质"的学习笔记。 向量是什么 可以是一个箭头&#xff0c;可以是一组实数&#xff0c;即一个坐标对。 箭头在高维&#xff08;4维&#xff0c;甚至更高&#xff09;空间&…

云效流水线docker部署 :node.js镜像部署VUE项目

文章目录 引言I 流水线配置1.1 项目dockerfile1.2 Node.js 镜像构建1.3 docker 部署引言 云效流水线配置实现docker 部署微服务项目:https://blog.csdn.net/z929118967/article/details/133687120?spm=1001.2014.3001.5501 配置dockerfile-> 镜像构建->docker部署。 …

电脑怎么录制视频,录制的视频怎么剪辑?

在现今数字化的时代&#xff0c;视频成为了人们日常生活中不可或缺的一部分。因此&#xff0c;对于一些需要制作视频教程、录制游戏或者是进行视频演示的人来说&#xff0c;电脑录屏已经成为了一个必不可少的工具。那么&#xff0c;对于这些人来说&#xff0c;如何选择一个好用…

C语言 每日一题 PTA 11.7 day13

1.求e的近似值 自然常数 e 可以用级数 1 1 / 1! 1 / 2! ⋯ 1 / n! ⋯ 来近似计算。 本题要求对给定的非负整数 n&#xff0c;求该级数的前 n 1 项和。 代码实现 #include<stdio.h> void main() {int a, i, j; double b 1; double c 1;printf("请输入一个数\n…