C++编程:模拟实现CyberRT的DataVisitor和DataDispatcher

文章目录

    • 0. 引言
    • 1. 设计概要
      • 1.1 主要组件
      • 1.2 类关系图
      • 1.3 工作流程
    • 2. 代码实现
      • 2.1. 定义数据结构
      • 2.2. 实现 DataVisitor
      • 2.3. 实现 DataDispatcher
      • 2.4. 实现 Receiver
      • 2.5. 实现具体的 DataVisitor
      • 2.6. 示例主程序
      • 2.7. 编译和运行

0. 引言

使用 C++ 实现一个类似CyberRT 架构的 DataVisitorDataDispatcher。在 CyberRT 中:

  • Receiver 接收到消息后,会触发回调。
  • 回调中调用 DataDispatcher(消息分发器)发布消息。
  • DataDispatcher 是一个单例,负责所有的数据分发,并将数据放入对应的缓存中。
  • 然后,DataDispatcher 会通知对应的协程(在此简化为线程)去处理消息。
  • DataVisitor(消息访问器)是辅助类,用于管理数据处理过程,包括注册通知机制和绑定回调函数。

1. 设计概要

1.1 主要组件

  • DataDispatcher

    • 单例模式。
    • 管理所有 DataVisitor
    • 分发数据到对应的 DataVisitor 的缓冲区。
    • 通知 DataVisitor 处理数据。
  • DataVisitor

    • 负责特定类型数据的处理。
    • 包含一个线程,等待 DataDispatcher 的通知。
    • 绑定一个回调函数用于处理数据。
    • 管理自己的数据缓冲区。
  • Receiver

    • 模拟消息接收器,接收到消息后调用 DataDispatcher 发布数据。

1.2 类关系图

以下类关系图,反映了 DataDispatcher 作为单例管理多个 DataVisitor,并与 Receiver 交互的关系。

管理
1
*
调用 Dispatch
Data
+int id
+std::string content
«abstract»
DataVisitor
+Notify(std::shared_ptr<data> data)
LoggingVisitor
+Create()
ProcessingVisitor
+Create()
DataDispatcher
-std::vector> visitors
-std::mutex mutex
+Instance()
+RegisterVisitor(std::shared_ptr visitor)
+UnregisterVisitor(std::shared_ptr visitor)
+Dispatch(std::shared_ptr<data> data)
Receiver
+ReceiveMessage(int id, const std::string& content)

1.3 工作流程

  1. Receiver 接收到消息后,调用 DataDispatcher::Instance()->Dispatch(data)
  2. DataDispatcher 将数据放入对应的 DataVisitor 的缓冲区。
  3. DataDispatcher 通知对应的 DataVisitor
  4. DataVisitor 的线程被唤醒,取出数据并执行绑定的回调函数进行处理。

2. 代码实现

完整代码见:data-visitor-dispatcher

2.1. 定义数据结构

首先,定义一个通用的数据结构 Data

// data.h
#ifndef DATA_H
#define DATA_H#include <string>// 定义一个通用的数据类型
struct Data {int id;std::string content;
};#endif  // DATA_H

2.2. 实现 DataVisitor

DataVisitor 负责处理特定类型的数据。它包含一个线程,该线程等待 DataDispatcher 的通知,然后处理数据。

// data_visitor.h
#ifndef DATA_VISITOR_H
#define DATA_VISITOR_H#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>#include "data.h"class DataVisitor : public std::enable_shared_from_this<DataVisitor> {public:using Callback = std::function<void(std::shared_ptr<Data>)>;// 构造函数,绑定回调函数explicit DataVisitor(Callback callback) : callback_(callback), stop_flag_(false) {worker_thread_ = std::thread(&DataVisitor::ProcessData, this);}// 析构函数,确保线程安全停止~DataVisitor() {Stop();if (worker_thread_.joinable()) {worker_thread_.join();}}// 用于 DataDispatcher 发布数据时调用void Notify(std::shared_ptr<Data> data) {{std::lock_guard<std::mutex> lock(queue_mutex_);data_queue_.push(data);}cv_.notify_one();}private:// 数据处理线程函数void ProcessData() {while (!stop_flag_) {std::unique_lock<std::mutex> lock(queue_mutex_);cv_.wait(lock, [this]() { return stop_flag_ || !data_queue_.empty(); });while (!data_queue_.empty()) {auto data = data_queue_.front();data_queue_.pop();lock.unlock();// 执行绑定的回调函数if (callback_) {try {callback_(data);} catch (const std::exception& e) {// 处理回调中的异常,防止线程终止// 可以记录日志或采取其他措施// 这里简单输出错误信息std::cerr << "Exception in callback: " << e.what() << std::endl;}}lock.lock();}}}// 停止线程void Stop() {stop_flag_ = true;cv_.notify_all();}Callback callback_;std::thread worker_thread_;std::mutex queue_mutex_;std::condition_variable cv_;std::queue<std::shared_ptr<Data>> data_queue_;std::atomic<bool> stop_flag_;
};#endif  // DATA_VISITOR_H

2.3. 实现 DataDispatcher

DataDispatcher 是一个单例,负责管理所有的 DataVisitor 并分发数据。

// data_dispatcher.h
#ifndef DATA_DISPATCHER_H
#define DATA_DISPATCHER_H#include <algorithm>
#include <memory>
#include <mutex>
#include <vector>#include "data.h"
#include "data_visitor.h"// 单例的 DataDispatcher
class DataDispatcher {public:// 获取单例实例static DataDispatcher& Instance() {static DataDispatcher instance;return instance;}// 禁止拷贝和赋值DataDispatcher(const DataDispatcher&) = delete;DataDispatcher& operator=(const DataDispatcher&) = delete;// 注册一个 DataVisitorvoid RegisterVisitor(const std::shared_ptr<DataVisitor>& visitor) {std::lock_guard<std::mutex> lock(mutex_);visitors_.emplace_back(visitor);}// 注销一个 DataVisitorvoid UnregisterVisitor(const std::shared_ptr<DataVisitor>& visitor) {std::lock_guard<std::mutex> lock(mutex_);visitors_.erase(std::remove(visitors_.begin(), visitors_.end(), visitor), visitors_.end());}// 分发数据到所有注册的 DataVisitorvoid Dispatch(const std::shared_ptr<Data>& data) {std::lock_guard<std::mutex> lock(mutex_);for (auto& visitor : visitors_) {if (visitor) {visitor->Notify(data);}}}private:// 私有构造函数DataDispatcher() = default;~DataDispatcher() = default;std::vector<std::shared_ptr<DataVisitor>> visitors_;std::mutex mutex_;
};#endif  // DATA_DISPATCHER_H

2.4. 实现 Receiver

模拟消息接收器,每当接收到消息时,调用 DataDispatcher 分发数据。

// receiver.h
#ifndef RECEIVER_H
#define RECEIVER_H#include <functional>
#include <memory>
#include <string>#include "data.h"
#include "data_dispatcher.h"// Receiver,模拟消息接收器
class Receiver {public:using Callback = std::function<void(std::shared_ptr<Data>)>;// 构造函数,绑定回调函数explicit Receiver(Callback callback) : callback_(callback) {}// 模拟接收消息void ReceiveMessage(int id, const std::string& content) {auto data = std::make_shared<Data>();data->id = id;data->content = content;// 触发回调if (callback_) {callback_(data);}}private:Callback callback_;
};#endif  // RECEIVER_H

2.5. 实现具体的 DataVisitor

例如,创建 LoggingVisitorProcessingVisitor,它们各自有不同的处理逻辑。

// logging_visitor.h
#ifndef LOGGING_VISITOR_H
#define LOGGING_VISITOR_H#include <iostream>
#include <memory>
#include "data_visitor.h"// LoggingVisitor,负责记录数据
class LoggingVisitor {public:// 创建一个 LoggingVisitor 的 DataVisitor 实例static std::shared_ptr<DataVisitor> Create() {return std::make_shared<DataVisitor>([](std::shared_ptr<Data> data) {std::cout << "[LoggingVisitor] Received data: ID=" << data->id << ", Content=\"" << data->content << "\""<< std::endl;});}
};#endif  // LOGGING_VISITOR_H
// processing_visitor.h
#ifndef PROCESSING_VISITOR_H
#define PROCESSING_VISITOR_H#include <iostream>
#include <memory>
#include "data_visitor.h"// ProcessingVisitor,负责处理数据
class ProcessingVisitor {public:// 创建一个 ProcessingVisitor 的 DataVisitor 实例static std::shared_ptr<DataVisitor> Create() {return std::make_shared<DataVisitor>([](std::shared_ptr<Data> data) {// 简单示例:打印数据长度std::cout << "[ProcessingVisitor] Processed data ID=" << data->id << ", Length=" << data->content.length()<< std::endl;});}
};#endif  // PROCESSING_VISITOR_H

2.6. 示例主程序

展示如何使用上述组件,实现数据接收、分发和处理。

// main.cpp
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>#include "data_dispatcher.h"
#include "logging_visitor.h"
#include "processing_visitor.h"
#include "receiver.h"int main() {// 创建并注册 DataVisitorauto logger = LoggingVisitor::Create();auto processor = ProcessingVisitor::Create();DataDispatcher::Instance().RegisterVisitor(logger);DataDispatcher::Instance().RegisterVisitor(processor);// 创建 Receiver,并绑定 DataDispatcher 的 Dispatch 方法Receiver receiver([](std::shared_ptr<Data> data) { DataDispatcher::Instance().Dispatch(data); });// 模拟接收消息std::cout << "=== 接收第1条消息 ===" << std::endl;receiver.ReceiveMessage(1, "Hello, CyberRT!");std::cout << "=== 接收第2条消息 ===" << std::endl;receiver.ReceiveMessage(2, "Another data packet.");// 等待一段时间以确保所有消息被处理std::this_thread::sleep_for(std::chrono::seconds(1));// 注销一个 DataVisitorstd::cout << "\n=== 移除 LoggingVisitor ===" << std::endl;DataDispatcher::Instance().UnregisterVisitor(logger);// 再次接收消息std::cout << "=== 接收第3条消息 ===" << std::endl;receiver.ReceiveMessage(3, "Data after removing logger.");// 等待一段时间以确保消息被处理std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "\n=== 程序结束 ===" << std::endl;return 0;
}

2.7. 编译和运行

假设所有文件在同一目录下,可以使用以下命令进行编译:

g++ -std=c++14 main.cpp -o dispatcher -pthread

运行程序后,您将看到类似如下的输出:

test@pi:~/dataVisitor$ g++ -std=c++14 main.cpp -o dispatcher -pthread
test@pi:~/dataVisitor$ ./dispatcher 
=== 接收第1条消息 ===
=== 接收第2条消息 ===
[LoggingVisitor] Received data: ID=[ProcessingVisitor] Processed data ID=11, Content="Hello, CyberRT!"
[LoggingVisitor] Received data: ID=2, Content="Another data packet."
, Length=15
[ProcessingVisitor] Processed data ID=2, Length=20=== 移除 LoggingVisitor ===
=== 接收第3条消息 ===
[ProcessingVisitor] Processed data ID=3, Length=27=== 程序结束 ===

注意,第三条消息只被 ProcessingVisitor 处理,因为 LoggingVisitor 已被移除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/482638.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL底层概述—9.ACID与事务

大纲 1.ACID之原子性 2.ACID之持久性 3.ACID之隔离性 4.ACID之一致性 5.ACID的关系 6.事务控制演进之排队 7.事务控制演进之排它锁 8.事务控制演进之读写锁 9.事务控制演进之MVCC 10.事务隔离级别之隔离级别的类型 11.事务隔离级别之和锁的关系 12.事务隔离级别之隔…

基于 SpringBoot 的新冠密接者跟踪系统:如何实现高效信息推送功能

第2章 程序开发技术 2.1 Mysql数据库 为了更容易理解Mysql数据库&#xff0c;接下来就对其具备的主要特征进行描述。 &#xff08;1&#xff09;首选Mysql数据库也是为了节省开发资金&#xff0c;因为网络上对Mysql的源码都已进行了公开展示&#xff0c;开发者根据程序开发需要…

手撸了一个文件传输工具

在日常的开发与运维中&#xff0c;文件传输工具是不可或缺的利器。无论是跨服务器传递配置文件&#xff0c;还是快速从一台机器下载日志文件&#xff0c;一个高效、可靠且简单的文件传输工具能够显著提高工作效率。今天&#xff0c;我想分享我自己手撸一个文件传输工具的全过程…

基于Java Springboot电子书阅读器APP且微信小程序

一、作品包含 源码数据库全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 微信开发者工具 数…

【AI系统】AI 编译器基本架构

AI 编译器基本架构 在上一篇文章中将 AI 编译器的发展大致分为了 3 个阶段&#xff0c;分别为 1&#xff09;朴素编译器、2&#xff09;专用编译器以及 3&#xff09;通用编译器。 本文作为上一篇文章 AI 编译器架构的一个延续&#xff0c;着重讨论 AI 编译器的通用架构。首先…

华为关键词覆盖应用市场ASO优化覆盖技巧

在我国的消费者群体当中&#xff0c;华为的品牌形象较高&#xff0c;且产品质量过硬&#xff0c;因此用户基数也大。与此同时&#xff0c;随着影响力的增大&#xff0c;华为不断向外扩张&#xff0c;也逐渐成为了海外市场的香饽饽。作为开发者和运营者&#xff0c;我们要认识到…

SuperMap GIS基础产品FAQ集锦(20241202)

一、SuperMap iDesktopX 问题1&#xff1a;请问一下&#xff0c;iDesktopX11.2.1如何修改启动界面 11.2.0 【解决办法】参考帮助文档的“自定义启动界面”内容&#xff1a;https://help.supermap.com/iDesktopX/zh/SpecialFeatures/Development/DevelopmentTutorial/UserCust…

Java基础访问修饰符全解析

一、Java 访问修饰符概述 Java 中的访问修饰符用于控制类、方法、变量和构造函数的可见性和访问权限&#xff0c;主要有四种&#xff1a;public、protected、default&#xff08;无修饰符&#xff09;和 private。 Java 的访问修饰符在编程中起着至关重要的作用&#xff0c;它…

浪潮X86服务器NF5280、8480、5468、5270使用inter VROC Raid key给NVME磁盘做阵列

Inter VROC技术简介 Intel Virtual RAID on CPU (Intel VROC) 简单来说就是用CPU的PCIE通道给NVME硬盘做Raid 更多信息可以访问官方支持页面 Raid Key 授权&#xff0c;即VROC SKU 授权主要有用的有2个标准和高级&#xff0c;仅Raid1的授权我暂时没见过。 标准 VROCSTANMOD …

【Pytorch】torch.view与torch.reshape的区别

文章目录 一. 简介&#xff1a;二. Pytorch中Tensor的存储方式2.1 Pytorch中张量存储的底层原理2.2 Pytorch张量步长(stride)属性 三. 对视图(view)的理解四. view()与reshape()的比较4.1 对view()的理解4.1.1 &#xff08;1&#xff09;如何理解满足条件 stride[i] stride[i1…

光伏电站设计排布前的准备

1、确定安装地点 地理位置&#xff1a;了解安装地点的经纬度&#xff0c;这对于确定太阳辐射角度和强度非常重要&#xff0c;海拔越高&#xff0c;阳光辐照就越高&#xff0c;比较适合安装光伏电站&#xff0c;根据地理位置还可以确定光伏板的安装倾角是多少&#xff0c;可以进…

5、防火墙一

防火墙的含义 firewalld&#xff1a;隔离功能 病毒防护&#xff1a; 1、入侵检测系统&#xff1a;在互联网访问的过程中&#xff0c;不阻断任何网络访问&#xff0c;也不会定位网络的威胁&#xff0c;提供告警和事后的监督&#xff0c;类似于监控。 2、入侵防御系统&#x…

代码随想录算法训练营第六十天|Day60 图论

Bellman_ford 队列优化算法&#xff08;又名SPFA&#xff09; https://www.programmercarl.com/kamacoder/0094.%E5%9F%8E%E5%B8%82%E9%97%B4%E8%B4%A7%E7%89%A9%E8%BF%90%E8%BE%93I-SPFA.html 本题我们来系统讲解 Bellman_ford 队列优化算法 &#xff0c;也叫SPFA算法&#xf…

详解LZ4文件解压缩问题

详解LZ4文件解压缩问题 一、LZ4文件解压缩方法1. 使用LZ4命令行工具2. 使用Python库3. 使用第三方工具4. 在线解压工具 二、常见问题及解决方法1. 解压显示文件损坏2. 解压后文件大小异常 三、总结 LZ4是一种快速的压缩算法&#xff0c;广泛应用于需要实时压缩和解压缩大文件的…

【Linux网络编程】第四弹---构建UDP服务器与字典翻译系统:源码结构与关键组件解析

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】【Linux系统编程】【Linux网络编程】 目录 1、UdpServer.hpp 1.1、函数对象声明 1.2、Server类基本结构 1.3、构造函数 1.4、Start() 2、Dict.hpp…

DBA面试题-1

面临失业&#xff0c;整理一下面试题&#xff0c;找下家继续搬砖 主要参考&#xff1a;https://www.csdn.net/?spm1001.2101.3001.4476 略有修改 一、mysql有哪些数据类型 1&#xff0c; 整形 tinyint,smallint,medumint,int,bigint&#xff1b;分别占用1字节、2字节、3字节…

vxe-table 树形表格序号的使用

vxe-table 树形结构支持多种方式的序号&#xff0c;可以及时带层级的序号&#xff0c;也可以是自增的序号。 官网&#xff1a;https://vxetable.cn 带层级序号 <template><div><vxe-grid v-bind"gridOptions"></vxe-grid></div> <…

精通.NET鉴权与授权

授权在.NET 中是指确定经过身份验证的用户是否有权访问特定资源或执行特定操作的过程。这就好比一个公司&#xff0c;身份验证(鉴权)是检查你是不是公司的员工&#xff0c;而授权则是看你这个员工有没有权限进入某个特定的办公室或者使用某台设备。 两个非常容易混淆的单词 鉴…

Spring Task和WebSocket使用

在现代 Web 应用中&#xff0c;WebSocket 作为一种全双工通信协议&#xff0c;为实时数据传输提供了强大的支持。若要确保 WebSocket 在生产环境中的稳定性和性能&#xff0c;使用 Nginx 作为反向代理服务器是一个明智的选择。本篇文章将带你了解如何在 Nginx 中配置 WebSocket…

机器学习任务功略

loss如果大&#xff0c;训练资料没有学好&#xff0c;此时有两个可能&#xff1a; 1.model bias太过简单&#xff08;找不到loss低的function&#xff09;。 解决办法&#xff1a;增加输入的feacture&#xff0c;设一个更大的model&#xff0c;也可以用deep learning增加弹性…