工具模块及项目整体模块框架

文章目录

  • 工具模块
    • logger.hpp
    • helper.hpp
    • threadpool.hpp
  • 核心概念
    • 核心API
    • 交换机类型
    • 持久化
    • ⽹络通信
    • 消息应答
    • 持久化数据管理中心模块
    • 虚拟机管理模块
    • 交换路由模块
    • 消费者管理模块
    • 信道管理模块
    • 连接管理模块
    • Broker服务器模块
    • 消费者管理
    • 信道请求模块
    • 通信连接模块
    • 项⽬模块关系图

工具模块

这个模块没有什么好说的,就是一些工具的编写,方便进行使用

logger.hpp

#ifndef M_LOG_H__
#define M_LOG_H__
#include <iostream>
#include <ctime>//封装一个日志宏,通过日志宏进行日志的打印,在打印的信息前带有系统时间以及文件名和行号
//     [17:26:24] [log.cpp:12] 打开文件失败! #define DBG_LEVEL 0
#define INF_LEVEL 1
#define ERR_LEVEL 2
#define DEFAULT_LEVEL DBG_LEVEL
#define LOG(lev_str, level, format, ...) {\if (level >= DEFAULT_LEVEL) {\time_t t = time(nullptr);\struct tm* ptm = localtime(&t);\char time_str[32];\strftime(time_str, 31, "%H:%M:%S", ptm);\printf("[%s][%s][%s:%d]\t" format "\n", lev_str, time_str, __FILE__, __LINE__, ##__VA_ARGS__);\}\
}#define DLOG(format, ...) LOG("DBG", DBG_LEVEL, format, ##__VA_ARGS__)
#define ILOG(format, ...) LOG("INF", INF_LEVEL, format, ##__VA_ARGS__)
#define ELOG(format, ...) LOG("ERR", ERR_LEVEL, format, ##__VA_ARGS__)#endif

helper.hpp

sqlite3

class SqliteHelper {public:typedef int(*SqliteCallback)(void*,int,char**,char**);SqliteHelper(const std::string &dbfile) : _dbfile(dbfile), _handler(nullptr){}bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX) {//int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe_leve, nullptr);if (ret != SQLITE_OK) {ELOG("创建/打开sqlite数据库失败: %s", sqlite3_errmsg(_handler));return false;}return true;}bool exec(const std::string &sql, SqliteCallback cb, void *arg) {//int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**), void* arg, char **err)int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);if (ret != SQLITE_OK) {ELOG("%s \n语句执行失败: %s", sql.c_str(), sqlite3_errmsg(_handler));return false;}return true;}void close() {if (_handler) sqlite3_close_v2(_handler);}private:std::string _dbfile;sqlite3 *_handler;
};

字符串分割

class StrHelper{public:static size_t split(const std::string &str, const std::string &sep, std::vector<std::string> &result) {size_t pos, idx = 0;while(idx < str.size()) {pos = str.find(sep, idx);if (pos == std::string::npos) {result.push_back(str.substr(idx));return result.size();}if (pos == idx) {idx = pos + sep.size();continue;}result.push_back(str.substr(idx, pos - idx));idx = pos + sep.size();}return result.size();}
};

生成随机数

class UUIDHelper {public:static std::string uuid() {std::random_device rd;std::mt19937_64 gernator(rd());std::uniform_int_distribution<int> distribution(0, 255);std::stringstream ss;for (int i = 0; i < 8; i++) {ss << std::setw(2) << std::setfill('0') << std::hex << distribution(gernator) ;if (i == 3 || i == 5 || i == 7) {ss << "-";}}static std::atomic<size_t> seq(1);size_t num = seq.fetch_add(1);for (int i = 7; i >= 0; i--) {ss << std::setw(2) << std::setfill('0') << std::hex << ((num>>(i*8)) & 0xff);if (i == 6) ss << "-";}return ss.str();}
};

文件常用操作

class FileHelper {public:FileHelper(const std::string &filename):_filename(filename){}bool exists() //判断文件是否存在{struct stat st;return (stat(_filename.c_str(), &st) == 0);}size_t size() {  //文件大小struct stat st;int ret = stat(_filename.c_str(), &st);if (ret < 0) {return 0;}return st.st_size;}bool read(char *body, size_t offset, size_t len) //读取文件,从offset位置读取len长度{//1. 打开文件std::ifstream ifs(_filename, std::ios::binary | std::ios::in); if (ifs.is_open() == false) {ELOG("%s 文件打开失败!", _filename.c_str());return false;}//2. 跳转文件读写位置ifs.seekg(offset, std::ios::beg);//3. 读取文件数据ifs.read(body, len);if (ifs.good() == false) {ELOG("%s 文件读取数据失败!!", _filename.c_str());ifs.close();return false;}//4. 关闭文件ifs.close();return true;}bool read(std::string &body) {//获取文件大小,根据文件大小调整body的空间size_t fsize = this->size();body.resize(fsize);return read(&body[0], 0, fsize);}bool write(const char *body, size_t offset, size_t len) {//向文件写入//1. 打开文件std::fstream fs(_filename, std::ios::binary | std::ios::in | std::ios::out); if (fs.is_open() == false) {ELOG("%s 文件打开失败!", _filename.c_str());return false;}//2. 跳转到文件指定位置fs.seekp(offset, std::ios::beg);//3. 写入数据fs.write(body, len);if (fs.good() == false) {ELOG("%s 文件写入数据失败!!", _filename.c_str());fs.close();return false;}//4. 关闭文件fs.close();return true;}bool write(const std::string &body) {return write(body.c_str(), 0, body.size());}bool rename(const std::string &nname) {//重命名return (::rename(_filename.c_str(), nname.c_str()) == 0);}static std::string parentDirectory(const std::string &filename) {//获取父文件目录// /aaa/bb/ccc/ddd/test.txtsize_t pos = filename.find_last_of("/");if (pos == std::string::npos) {// test.txtreturn "./";}std::string path = filename.substr(0, pos);return path;}static bool createFile(const std::string &filename) {//创建文件std::fstream ofs(filename, std::ios::binary | std::ios::out); if (ofs.is_open() == false) {ELOG("%s 文件打开失败!", filename.c_str());return false;}ofs.close();return true;}static bool removeFile(const std::string &filename) {//删除文件return (::remove(filename.c_str()) == 0);}static bool createDirectory(const std::string &path) {//创建父文件目录//  aaa/bbb/ccc    cccc// 在多级路径创建中,我们需要从第一个父级目录开始创建size_t pos, idx = 0;while(idx < path.size()) {pos = path.find("/", idx);if (pos == std::string::npos) {return (mkdir(path.c_str(), 0775) == 0);}std::string subpath = path.substr(0, pos);int ret = mkdir(subpath.c_str(), 0775);if (ret != 0 && errno != EEXIST) {ELOG("创建目录 %s 失败: %s", subpath.c_str(), strerror(errno));return false;}idx = pos + 1;}return true;}static bool removeDirectory(const std::string &path) {//删除父文件目录// rm -rf path// system()std::string cmd = "rm -rf " + path;return (system(cmd.c_str()) != -1);}private:std::string _filename;
};

threadpool.hpp

class threadpool {public:using ptr = std::shared_ptr<threadpool>;using Functor = std::function<void(void)>;threadpool(int thr_count = 1) : _stop(false){for (int i = 0; i < thr_count; i++) {_threads.emplace_back(&threadpool::entry, this);}}~threadpool() {stop();}void stop() {if (_stop == true) return;_stop = true;_cv.notify_all();for (auto &thread : _threads) {thread.join();}}//push传入的是首先有一个函数--用户要执行的函数, 接下来是不定参,表示要处理的数据也就是要传入到函数中的参数//push函数内部,会将这个传入的函数封装成一个异步任务(packaged_task),//使用lambda生成一个可调用对象(内部执行异步任务),抛入到任务池中,由工作线程取出进行执行template<typename F, typename ...Args>auto push(F &&func, Args&& ...args) -> std::future<decltype(func(args...))> {//1. 将传入的函数封装成一个packaged_task任务using return_type = decltype(func(args...));auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);std::future<return_type> fu = task->get_future();//2. 构造一个lambda匿名函数(捕获任务对象),函数内执行任务对象{std::unique_lock<std::mutex> lock(_mutex);//3. 将构造出来的匿名函数对象,抛入到任务池中_taskpool.push_back(   [task](){ (*task)(); }  );_cv.notify_one();}return fu;}private://线程入口函数---内部不断的从任务池中取出任务进行执行。void entry() {while(!_stop){std::vector<Functor> tmp_taskpool;{//加锁std::unique_lock<std::mutex> lock(_mutex);//等待任务池不为空,或者_stop被置位返回,_cv.wait(lock, [this](){ return _stop || !_taskpool.empty(); });//取出任务进行执行tmp_taskpool.swap(_taskpool);}for (auto &task : tmp_taskpool) {task();}}}private:std::atomic<bool> _stop;std::vector<Functor> _taskpool;//任务池std::mutex _mutex;std::condition_variable _cv;std::vector<std::thread> _threads;
};

核心概念

  • ⽣产者(Producer)
  • 消费者(Consumer)
  • 中间⼈(Broker)
  • 发布(Publish)
  • 订阅Subscribe)
  • ⼀个⽣产者,一个消费者
    在这里插入图片描述
  • N个⽣产者,N个消费者
    在这里插入图片描述
    其中,Broker Server是最核⼼的部分, 负责消息的存储和转发。

在这里插入图片描述
在这里插入图片描述
上述数据结构,既需要在内存中存储,也需要在硬盘中存储

  • 内存存储: ⽅便使⽤
  • 硬盘存储:重启数据不丢失

核心API

对于Broker来说,要实现以下核⼼API,通过这些API来实现消息队列的基本功能

在这里插入图片描述

另⼀⽅⾯,Producer和Consumer则通过⽹络的⽅式,远程调⽤这些API,实现⽣产者消费者模型

交换机类型

对于RabbitMQ来说,主要⽀持三种交换机类型:

  • Direct : ⽣产者发送消息时,直接指定被该交换机绑定的队列
  • Fanout : ⽣产者发送的消息会被复制到该交换机的所有队列中
  • Topic : 绑定队列到交换机上时,指定⼀个字符串为bindingKey。发送消息指定⼀个字符串为routingKey。当routingKey和bindingKey满⾜⼀定的匹配条件的时候,则把消息投递到指定队列。

持久化

Exchange,Queue,Binding,Message等数据都有持久化需求
当程序重启/主机重启保证 , 上述内容不丢失。

⽹络通信

⽣产者和消费者都是客⼾端程序,Broker则是作为服务器,通过⽹络进⾏通信。
在⽹络通信的过程中,客⼾端部分要提供对应的api,来实现对服务器的操作。

在这里插入图片描述
可以看到,在Broker的基础上,客⼾端还要增加Connection操作和Channel操作

  • Connection对应⼀个TCP连接
  • Channel则是Connection中的逻辑通道

⼀个Connection中可以包含多个Channel。Channel和Channel之间的数据是独⽴的,不会相互⼲扰。这样做主要是为了能够更好的复⽤TCP连接,达到⻓连接的效果,避免频繁的创建关闭TCP连接。

消息应答

被消费的消息,需要进⾏应答。应答模式分成两种:

在这里插入图片描述

持久化数据管理中心模块

在这里插入图片描述

虚拟机管理模块

因为交换机/队列/绑定都是基于虚拟机为单元整体进⾏操作的,因此虚拟机是对以上数据管理模块的整合模块。

在这里插入图片描述

交换路由模块

当客⼾端发布⼀条消息到交换机后,这条消息,应该被⼊队到该交换机绑定的哪些队列中?交换路由模块就是决定这件事情的。

在这里插入图片描述
在这里插入图片描述

消费者管理模块

消费者管理是以队列为单元的,因为每个消费者都会在开始的时候订阅⼀个队列的消息,当队列中有消息后,会将队列消息轮询推送给订阅了该队列的消费者。

因此操作流程通常是,从队列关联的消息管理中取出消息,从队列关联的消费者中取出⼀个消费者,然后将消息推送给消费者(这就是发布订阅中负载均衡的⽤法)

在这里插入图片描述

信道管理模块

本质上,在AMQP模型中,除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴。

在这里插入图片描述

连接管理模块

本质上,咱们仿照实现的服务器是通过muduo库来实现底层通信的,⽽这⾥的连接管理,更多的是对muduo库中的Connection进⾏⼆次封装管理,并额外提供项⽬所需操作。

在这里插入图片描述

Broker服务器模块

整合以上所有模块,并搭建⽹络通信服务器,实现与客⼾端⽹络通信,能够识别客⼾端请求,并提供客⼾端请求的处理服务。
在这里插入图片描述

消费者管理

消费者在客⼾端的存在感⽐较低,因为在⽤⼾的使⽤⻆度中,只要创建⼀个信道后,就可以通过信道完成所有的操作,因此对于消费者的感官更多是在订阅的时候传⼊了⼀个消费者标识,且当前的简单实现也仅仅是⼀个信道只能创建订阅⼀个队列,也就是只能创建⼀个消费者,它们⼀⼀对应,因此更是弱化了消费者的存在。

在这里插入图片描述

信道请求模块

与服务端的信道类似,客⼾端这边在AMQP模型中,也是除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴。

在这里插入图片描述

通信连接模块

向⽤⼾提供⼀个⽤于实现⽹络通信的Connection对象,从其内部可创建出粒度更轻的Channel对象,⽤于与服务端进⾏⽹络通信。

在这里插入图片描述

项⽬模块关系图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/438466.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ScrapeGraphAI 大模型增强的网络爬虫

在数据驱动的动态领域&#xff0c;从在线资源中提取有价值的见解至关重要。从市场分析到学术研究&#xff0c;对特定数据的需求推动了对强大的网络抓取工具的需求。 NSDT工具推荐&#xff1a; Three.js AI纹理开发包 - YOLO合成数据生成器 - GLTF/GLB在线编辑 - 3D模型格式在线…

电脑出现msvcp140.dll丢失的解决方法,总结6种解决方法

在计算机使用过程中&#xff0c;我们常常会遇到一些错误提示&#xff0c;其中最常见的就是“msvcp140.dll丢失”的错误。这个错误提示通常出现在运行某个程序时&#xff0c;它意味着计算机无法找到所需的msvcp140.dll文件。那么&#xff0c;msvcp140.dll丢失是怎么回事呢&#…

69 BERT预训练_by《李沐:动手学深度学习v2》pytorch版

系列文章目录 文章目录 系列文章目录NLP里的迁移学习Bert的动机Bert架构对输入的修改五、预训练任务1、2、3、 六、1、2、3、 七、1、2、3、 八、1、2、3、 NLP里的迁移学习 之前是使用预训练好的模型来抽取词、句子的特征&#xff0c;例如 word2vec 或语言模型这种非深度学习…

银河麒麟V10如何关闭定期锁屏功能?

银河麒麟V10如何关闭定期锁屏功能? 1. 打开终端2. 执行命令3. 验证设置 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在银河麒麟高级服务器操作系统V10中&#xff0c;关闭定期锁屏功能很简单。使用场景&#xff1a;比如&#xff0c;当你…

大模型~合集6

我自己的原文哦~ https://blog.51cto.com/whaosoft/11566566 # 深度模型融合&#xff08;LLM/基础模型/联邦学习/微调等&#xff09; 23年9月国防科大、京东和北理工的论文“Deep Model Fusion: A Survey”。 深度模型融合/合并是一种新兴技术&#xff0c;它将多个深度学习模…

.NET CORE程序发布IIS后报错误 500.19

发布IIS后浏览时报错误500.19&#xff0c;同时配置文件web.config的路径中也存在问号“?”。 可能原因&#xff1a;没有安装运行时

CMU 10423 Generative AI:lec14(Vision Language Model:CLIP、VQ-VAE)

文章目录 1 概述2 CLIP (Used in GPT-V)3 VQ-VAE (Used in Gemini)**VQ-VAE 详细笔记****VQ-VAE 的模块组成与数据流** **1. 输入数据****2. 编码器&#xff08;Encoder&#xff09;****2.1 编码器的作用****2.2 数据流与维度变化****2.3 编码器输出** **3. 量化器&#xff08;…

手机使用指南:如何在没有备份的情况下从 Android 设备恢复已删除的联系人

在本指南中&#xff0c;您将了解如何从 Android 手机内存中恢复已删除的联系人。Android 诞生、见证并征服了 80% 的智能手机行业。有些人可能将此称为“非常大胆的宣言”&#xff0c;但最近的统计数据完全支持我们的说法。灵活性、高度改进的可用性和快速性是 Android 操作系统…

Qt QWidget控件

目录 一、概述 二、Qwidget常用属性及函数介绍 2.1 enable 2.2 geometry 2.3 windowTitle 2.4 windowIcon 2.5 cursor 2.6 font 设置字体样式 2.7 toolTip 2.8 focusPolicy焦点策略 2.9 styleSheet 一、概述 widget翻译而来就是小控件&#xff0c;小部件。…

10.3作业

基于TCP的快查云词典 仿照有道云词典功能&#xff0c;实现一个自己的云词典功能&#xff0c;可以查询单词和发音。 服务器描述&#xff1a;服务器启动后&#xff0c;等待客户端连接&#xff0c;完成以下操作&#xff1a; 1.创建用户表、单词表、历史表 2.注册&#xff1a;接…

C++模拟实现vector容器【万字模拟✨】

更多精彩内容..... &#x1f389;❤️播主の主页✨&#x1f618; Stark、-CSDN博客 本文所在专栏&#xff1a; 学习专栏C语言_Stark、的博客-CSDN博客 项目实战C系列_Stark、的博客-CSDN博客 数据结构与算法_Stark、的博客-CSDN博客 座右铭&#xff1a;梦想是一盏明灯&#xff…

mysql-索引笔记

索引 1、什么是索引 索引是对数据库中数据的一种结构化表示。它像一本书的目录&#xff0c;能够快速定位信息&#xff0c;而无需逐行扫描所有数据。 索引的出现其实就是为了提高数据查询的效率&#xff0c;就像书的目录一样。 2、索引的常见模型 2.1.哈希表 用一个哈希函…

828华为云征文|华为云 Flexus X 实例初体验

一直想有自己的一款的服务器&#xff0c;为了更好的进行家庭娱乐&#xff0c;甚至偶尔可以满足个人搭建开发环境的需求&#xff0c;直到接触到了华为云 Flexus X 云服务器。Flexus 云服务器 X 实例是面向中小企业和开发者打造的轻量级云服务器。提供快速应用部署和简易的管理能…

每日论文5—06TCAS2锁相环电流匹配的gain-boosting电荷泵

《Gain-Boosting Charge Pump for Current Matching in Phase-Locked Loop》 06TCAS2 本质上和cascode来增加输出电阻&#xff0c;从而减小电流变化的思路是一样的。这里用了放大器来增加输出电阻。具体做法如下图&#xff1a; 如图1(a)&#xff0c;A3把Vb和Vx拉平&#xff0…

vscode安装及c++配置编译

1、VScode下载 VS Code官网下载地址&#xff1a;Visual Studio Code - Code Editing. Redefined。 2、安装中文插件 搜索chinese&#xff0c;点击install下载安装中文插件。 3、VS Code配置C/C开发环境 3.1、MinGW-w64下载 VS Code是一个高级的编辑器&#xff0c;只能用来写代…

基础算法--枚举

枚举算法是一种简单而有效的算法&#xff0c;它通过枚举所有可能的情况来解决问题。它通常用于解决问题规模比较小的问题&#xff0c;因为它的时间复杂度很高&#xff0c;随着问题的规模增加&#xff0c;算法的效率会急剧下降。 枚举算法的基本思路是通过循环遍历所有可能的情…

Rust和Go谁会更胜一筹

在国内&#xff0c;我认为Go语言会成为未来的主流&#xff0c;因为国内程序员号称码农&#xff0c;比较适合搬砖&#xff0c;而Rust对心智要求太高了&#xff0c;不适合搬砖。 就个人经验来看&#xff0c;Go语言简单&#xff0c;下限低&#xff0c;没有什么心智成本&#xff0c…

使用MTVerseXR SDK实现VR串流

1、概述​ MTVerseXR SDK 是摩尔线程GPU加速的虚拟现实&#xff08;VR&#xff09;流媒体平台&#xff0c;专门用于从远程服务器流式传输基于标准OpenXR的应用程序。MTVerseXR可以通过Wi-Fi和USB流式将VR内容从Windows服务器流式传输到XR客户端设备, 使相对性能低的VR客户端可…

【CSS in Depth 2 精译_043】6.5 CSS 中的粘性定位技术 + 本章小结

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第一章 层叠、优先级与继承&#xff08;已完结&#xff09;第二章 相对单位&#xff08;已完结&#xff09;第三章 文档流与盒模型&#xff08;已完结&#xff09;第四章 Flexbox 布局&#xff08;已…

【2022工业3D异常检测文献】AST: 基于归一化流的双射性产生不对称学生-教师异常检测方法

Asymmetric Student-Teacher Networks for Industrial Anomaly Detection 1、Background 所谓的学生-教师网络&#xff0c;首先&#xff0c;对教师进行训练&#xff0c;以学习语义嵌入的辅助性训练任务&#xff1b;其次&#xff0c;训练学生以匹配教师的输出。主要目的是让学生…