msgqueue.hpp队列模块

目录

一.MsgQueue模块介绍

二.MsgQueue类的实现

成员变量

构造函数与析构函数

成员函数

参数设置函数 setArgs

参数获取函数 getArgs

三.MsgQueueMapper类的实现

成员变量

构造函数

成员函数

创建表格函数 createTable

删除表格函数 dropTable

插入数据函数 insert

删除数据函数 remove

数据恢复函数 recover

msgQueueMapCb

四.MsgQueueMapper类的实现

成员变量

构造函数

成员函数

声明队列函数 declareQueue

删除队列函数 removeQueue

查询队列函数 selectQueue

查询所有队列函数 selectAll

队列是否存在函数 exists

队列数量函数 size

清空所有队列函数 clear

一.MsgQueue模块介绍

二.MsgQueue类的实现

成员变量

MsgQueue 结构体用于描述一个消息队列的基本属性。

std::string _name; // 队列名称
bool _durable;     // 队列是否持久化
bool _exclusive;   // 队列是否独占
bool _auto_del;    // 队列是否自动删除
google::protobuf::Map<std::string, std::string> _args; // 队列的附加参数

构造函数与析构函数

结构体提供了一个带参数的构造函数用于初始化各成员变量。同时提供了一个默认构造函数以支持无参初始化。

MsgQueue(const std::string &name, bool durable, bool exclusive, bool auto_del, const google::protobuf::Map<std::string, std::string> &_args): _name(name), _durable(durable), _exclusive(exclusive), _auto_del(auto_del), _args(_args) {}

成员函数

MsgQueue 提供了两个主要的成员函数,用于设置和获取队列的附加参数。

参数设置函数 setArgs

setArgs 方法接收一个格式化的字符串参数,并将其解析为键值对,存储在 _args 中。
该方法使用了 StrHelper::split 函数来分割字符串,通过 = 符号区分键和值,并存储在 _args 中。

参数获取函数 getArgs

getArgs 方法用于将 _args 中的键值对组合成格式化字符串,返回给调用者。

std::string getArgs()
{std::string ret;for (auto &kv : _args){ret += kv.first + "=" + kv.second + "&";}return ret;
}

三.MsgQueueMapper类的实现

成员变量

SqliteHelper sql_helper; //数据库管理句柄
  • sql_helperSqliteHelper 是一个辅助类,封装了SQLite的基本操作,如执行SQL语句和管理数据库连接。

构造函数

MsgQueueMapper(const std::string &dbname) : sql_helper(dbname)
{std::string path = FileHelper::getParentDirName(dbname);FileHelper::createDir(path);if (!sql_helper.open()){ELOG("MsgQueueMapper open db failed:%s", dbname.c_str());assert(0);}createTable();
}

构造函数接受一个数据库名称参数,初始化 sql_helper,并在必要时创建数据库目录和表。

成员函数

创建表格函数 createTable
void createTable()
{std::stringstream sql;sql << "create table if not exists msg_queue(";sql << "name varchar(64) primary key,";sql << "durable int,";sql << "exclusive int,";sql << "auto_del int,";sql << "args varchar(64));";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper create table failed:%s", sql.str().c_str());assert(0);}
}
删除表格函数 dropTable

dropTable 方法用于删除现有的消息队列表格。该方法执行删除表格的SQL语句,如果表格存在,它将被移除。

void dropTable()
{std::stringstream sql;sql << "drop table if exists msg_queue;";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper drop table failed:%s", sql.str().c_str());assert(0);}
}
插入数据函数 insert

insert 方法将 MsgQueue 对象插入到数据库中。

void insert(MsgQueue::ptr &msg_queue_ptr)
{std::stringstream sql;sql << "insert into msg_queue(name, durable, exclusive, auto_del, args) values(";sql << "'" << msg_queue_ptr->_name << "',";sql << msg_queue_ptr->_durable << ",";sql << msg_queue_ptr->_exclusive << ",";sql << msg_queue_ptr->_auto_del << ",";sql << "'" << msg_queue_ptr->getArgs() << "');";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper insert msg_queue failed:%s", sql.str().c_str());assert(0);}
}
删除数据函数 remove

remove 方法用于从数据库中删除指定名称的消息队列。

void remove(const std::string &name)
{std::stringstream sql;sql << "delete from msg_queue where name = '" << name << "';";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper remove msg_queue failed:%s", sql.str().c_str());assert(0);}
}
数据恢复函数 recover

recover 方法用于从数据库中恢复所有消息队列。该方法执行SQL查询语句,遍历结果集,并将每个消息队列恢复到内存中的 _msgQueues 容器中。

   msgqueue_map recover(){msgqueue_map ret;std::string sql = "select * from msg_queue;";if (!sql_helper.exec(sql, MsgQueueMapper::msgQueueMapCb, &ret)){ELOG("MsgQueueMapper recover failed:%s", sql.c_str());assert(0);}return ret;}
msgQueueMapCb
  • 该静态回调函数在 recover 函数中被调用,用于将数据库查询结果中的每一行转化为一个 MsgQueue对象,并将其存储到 msgqueue_map中。
      static int msgQueueMapCb(void *arg, int col_count, char **col_values, char **col_names){msgqueue_map *ret = (msgqueue_map *)arg;MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>();msg_queue_ptr->_name = col_values[0];msg_queue_ptr->_durable = std::stoi(col_values[1]);msg_queue_ptr->_exclusive = std::stoi(col_values[2]);msg_queue_ptr->_auto_del = std::stoi(col_values[3]);if (col_values[4]){msg_queue_ptr->setArgs((std::string)col_values[4]);}elseELOG("没有其它参数");ret->insert(std::make_pair(msg_queue_ptr->_name, msg_queue_ptr));return 0;}

四.MsgQueueMapper类的实现

MsgQueueManager 类用于管理内存中的消息队列对象,并与 MsgQueueMapper 协作实现消息队列的持久化。

成员变量

  • _msgQueues:一个键值对映射,用于存储当前内存中的所有消息队列。
  • _mapperMsgQueueMapper 对象,用于与数据库进行持久化操作。
  • _mutex:用于保护消息队列操作的线程安全。

构造函数

构造函数在初始化时会调用 MsgQueueMapperrecover 方法,从数据库恢复所有消息队列。

MsgQueueManager(const std::string &dbname) : _mapper(dbname)
{_mapper.recover(_msgQueues);
}

成员函数

声明队列函数 declareQueue

declareQueue 方法用于声明一个新的消息队列,并将其加入到内存和数据库中。

该方法首先检查队列是否已经存在,若不存在,则创建并插入新的队列。如果队列需要持久化,则还会将其插入到数据库中。

bool declareQueue(const std::string &name, bool durable, bool exclusive, bool auto_del, const google::protobuf::Map<std::string, std::string> &args)
{std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it != _msgQueues.end()){ILOG("MsgQueueManager declareQueue:%s already exists", name.c_str())return true;}MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>(name, durable, exclusive, auto_del, args);_msgQueues.insert(std::make_pair(name, msg_queue_ptr));if (msg_queue_ptr->_durable)_mapper.insert(msg_queue_ptr);return true;
}
删除队列函数 removeQueue

removeQueue 方法用于删除指定名称的消息队列。

bool removeQueue(const std::string &name)
{std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ELOG("MsgQueueManager removeQueue:%s not exists", name.c_str());return false;}if (it->second->_durable)_mapper.remove(name);_msgQueues.erase(it);return true;
}
查询队列函数 selectQueue

selectQueue 方法根据队列名称查询并返回消息队列对象

该方法用于在内存中查找指定名称的队列,并返回指向该队列的智能指针。

MsgQueue::ptr selectQueue(const std::string &name)
{std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ELOG("MsgQueueManager selectQueue:%s not exists", name.c_str());return nullptr;}return it->second;
}
查询所有队列函数 selectAll
  msgqueue_map selectAll(){return _msgQueues;}
队列是否存在函数 exists

exists 方法用于判断指定名称的消息队列是否存在。


bool exists(const std::string &name) { std::lock_guard<std::mutex> lock(_mutex); return _msgQueues.find(name) != _msgQueues.end(); }

该方法返回一个布尔值,指示队列是否存在。

队列数量函数 size

size 方法用于返回当前存在的消息队列数量。

size_t size() { std::lock_guard<std::mutex> lock(_mutex); return _msgQueues.size(); }

该方法返回一个整数值,表示内存中队列的数量。

清空所有队列函数 clear

clear 方法用于清空所有当前存在的消息队列。


void clear() { std::lock_guard<std::mutex> lock(_mutex); _msgQueues.clear(); _mapper.dropTable(); _mapper.createTable(); }

该方法清空内存中的所有队列,并在数据库中删除和重建消息队列表格。

五.MsgQueue.hpp所有代码

#pragma once
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include <string>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include <google/protobuf/map.h>namespace mq
{struct MsgQueue{std::string _name;bool _durable;   // 是否持久化bool _exclusive; // 是否独占bool _auto_del;  // 是否自动删除google::protobuf::Map<std::string, std::string> _args;using ptr = std::shared_ptr<MsgQueue>;MsgQueue(const std::string &name, bool durable, bool exclusive,bool auto_del, const google::protobuf::Map<std::string, std::string> &_args): _name(name),_durable(durable),_exclusive(exclusive),_auto_del(auto_del),_args(_args){}MsgQueue() {}void setArgs(const std::string &str_args){std::vector<std::string> args;size_t sz = StrHelper::split(str_args, "&", args);for (auto &kv : args){size_t pos = kv.find("=");if (pos == std::string::npos){ELOG("MsgQueue args format error:%s", kv.c_str());assert(0);}std::string key = kv.substr(0, pos);std::string val = kv.substr(pos + 1);_args[key] = val;}}std::string getArgs(){std::string ret;for (auto &kv : _args){ret += kv.first + "=" + kv.second + "&";}return ret;}};using msgqueue_map = std::unordered_map<std::string, std::shared_ptr<MsgQueue>>;class MsgQueueMapper{private:SqliteHelper sql_helper;public:MsgQueueMapper(const std::string &dbname): sql_helper(dbname){// 数据库有path即可,open时自动创建文件std::string path = FileHelper::getParentDirName(dbname);FileHelper::createDir(path);if (!sql_helper.open()){ELOG("MsgQueueMapper open db failed:%s", dbname.c_str());assert(0);}createTable();}// 1.创建,删除表void createTable(){std::stringstream sql;sql << "create table if not exists msg_queue(";sql << "name varchar(64) primary key,";sql << "durable int,";sql << "exclusive int,";sql << "auto_del int,";sql << "args varchar(64));";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper create table failed:%s", sql.str().c_str());assert(0);}}void dropTable(){std::stringstream sql;sql << "drop table if exists msg_queue;";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper drop table failed:%s", sql.str().c_str());assert(0);}}// 2.插入/删除数据void insert(MsgQueue::ptr &msg_queue_ptr){std::stringstream sql;sql << "insert into msg_queue (name,durable,exclusive,auto_del,args) values(";sql << "'" << msg_queue_ptr->_name << "',";sql << msg_queue_ptr->_durable << ",";sql << msg_queue_ptr->_exclusive << ",";sql << msg_queue_ptr->_auto_del << ",";sql << "'" << msg_queue_ptr->getArgs() << "');";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper insert failed:%s", sql.str().c_str());assert(0);}}void remove(const std::string &name){std::stringstream sql;sql << "delete from msg_queue where name='" << name << "';";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper remove failed:%s", sql.str().c_str());assert(0);}}// 3.recovermsgqueue_map recover(){msgqueue_map ret;std::string sql = "select * from msg_queue;";if (!sql_helper.exec(sql, MsgQueueMapper::msgQueueMapCb, &ret)){ELOG("MsgQueueMapper recover failed:%s", sql.c_str());assert(0);}return ret;}private:static int msgQueueMapCb(void *arg, int col_count, char **col_values, char **col_names){msgqueue_map *ret = (msgqueue_map *)arg;MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>();msg_queue_ptr->_name = col_values[0];msg_queue_ptr->_durable = std::stoi(col_values[1]);msg_queue_ptr->_exclusive = std::stoi(col_values[2]);msg_queue_ptr->_auto_del = std::stoi(col_values[3]);if (col_values[4]){msg_queue_ptr->setArgs((std::string)col_values[4]);}elseELOG("没有其它参数");ret->insert(std::make_pair(msg_queue_ptr->_name, msg_queue_ptr));return 0;}};class MsgQueueManager{public:using ptr = std::shared_ptr<MsgQueueManager>;private:msgqueue_map _msgQueues;std::mutex _mutex;MsgQueueMapper _mapper;public:MsgQueueManager(const std::string &dbname): _mapper(dbname){_msgQueues = _mapper.recover();}// 1. 插入/删除数据bool declareQueue(const std::string &name,bool durable,bool exclusive,bool auto_del,const google::protobuf::Map<std::string, std::string> &args){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it != _msgQueues.end()){ILOG("MsgQueueManager declareQueue:%s already exists", name.c_str())return true;}MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>(name, durable, exclusive, auto_del, args);_msgQueues.insert(std::make_pair(name, msg_queue_ptr));if (msg_queue_ptr->_durable)_mapper.insert(msg_queue_ptr);// std::cout<<"declare队列:"<<msg_queue_ptr->_name<<std::endl;// std::cout<<"队列的个数:"<<_msgQueues.size()<<std::endl;return true;}bool removeQueue(const std::string &name){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ILOG("MsgQueueManager removeQueue:%s not exists", name.c_str());return true;}_msgQueues.erase(it);if (it->second->_durable)_mapper.remove(name);return true;}// 2. 查询一个/所有 queueMsgQueue::ptr selectQueue(const std::string &name){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ILOG("MsgQueueManager select:%s not exists", name.c_str());return MsgQueue::ptr();}return it->second;}msgqueue_map selectAll(){return _msgQueues;}// 3. 其它操作bool exists(const std::string &name){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ILOG("MsgQueueManager :%s not exists", name.c_str());return false;}return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _msgQueues.size();}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.dropTable();_msgQueues.clear();}};
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/400710.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GPT-4o:开启多模态AI识别新纪元

GPT-4o功能简介 在人工智能的演变历程中&#xff0c;图像识别技术始终占据着核心地位。技术的发展日新月异&#xff0c;使得AI不仅能够识别图像内容&#xff0c;还能将其转化为文字描述。特别值得一提的是&#xff0c;OpenAI在春季发布的GPT-4o模型&#xff0c;将图像识别技术…

微软Detours Hook库编译与使用

Detours 是微软开发的一个强大的Windows API钩子库&#xff0c;用于监视和拦截函数调用。它广泛应用于微软产品团队和众多独立软件开发中&#xff0c;旨在无需修改原始代码的情况下实现函数拦截和修改。Detours 在调试、监控、日志记录和性能分析等方面表现出色&#xff0c;已成…

shell命令行解释器—既陌生有熟悉的东西

今天做一个感性的认识来&#xff0c;用一个生活的例子。 你生活在有一条村子里面&#xff0c;在村的东边就是王婆&#xff0c;王婆呢&#xff1f;她主要做什么呢啊&#xff1f;她在村儿里面呢&#xff0c;也不种地啊&#xff0c;那她干什么呢&#xff1f;他主要做帮别人进行婚嫁…

【TabBar嵌套Navigation案例-发现页面-按钮上的图片旋转 Objective-C语言】

一、接下来,我们来做这个,点击以后,让它出一个蓝色的View 1.就是我们示例程序的这种效果, 一点击,让这个按钮旋转,然后呢,再让它出来一个蓝色的View, 首先,我们要去监听它的点击事件,这是第一,我点击以后,我要做一些什么样的操作,要有点击事件, 所以呢,我要把…

JS基础进阶Webs-API、HTML 、DOM

一、JS中的API 1. 定义 JavaScript API是指为JavaScript提供的一组编程接口和对象&#xff0c;用以允许开发者访问和操作Web浏览器或其他JavaScript环境&#xff08;如Node.js&#xff09;提供的特定功能。这些API使得开发者能够编写更加动态和交互式的Web应用程序。 2. 主要…

服务器数据恢复—raid5阵列热备盘未全部启用导致阵列崩溃的数据恢复案例

服务器存储数据恢复环境&#xff1a; 一台EMC某型号存储中有一组RAID5磁盘阵列。该raid5阵列中有12块硬盘&#xff0c;其中2块硬盘为热备盘。 服务器存储故障&#xff1a; 该存储raid5阵列中有两块硬盘离线&#xff0c;只有1块热备盘启用替换掉其中一块离线盘&#xff0c;另外…

​产品经理-​你如何理解“互联网思维(35)

在产品规划和功能改版中&#xff0c;确实非常重视用户需求和体验。产品需求是互联网产品的核心 用户体验是互联网产品的重点。在互联网新产品规划中&#xff0c;会非常重视用户验证环节 确保做出来的东西确实是用户想要的&#xff1b;而在已经上线的产品中&#xff0c;往往会有…

人工智能与机器学习原理精解【12】

文章目录 分级聚类理论分级聚类的详细说明1. 定义2. 算法3. 计算4. 例子5. 例题 皮尔逊相关系数 julia实现 参考文献 分级聚类 理论 分级聚类的详细说明 1. 定义 分级聚类&#xff08;Hierarchical Clustering&#xff09;&#xff0c;又称为层次聚类&#xff0c;是一种通过…

谷歌反垄断官司败诉后,或又面临被拆分风险?

KlipC报道&#xff1a;上周8月5日&#xff0c;美国法院裁定谷歌的搜索业务违反了美国反垄断法&#xff0c;非法垄断在线搜索和搜索文本广告市场。据悉&#xff0c;胜诉的美国司法部正在考虑拆分谷歌。其他选项包括强制谷歌与竞争对手分享更多数据&#xff0c;以及防止其在人工智…

【二叉树进阶】--- 根据二叉树创建字符串

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏&#xff1a; 数据结构 从本篇文章开始&#xff0c;博主将分享一些结合二叉树的进阶算法题。 &#x1f3e0; 根据二叉树创建字符串 &#x1f4cc; 题目内容 根据二叉…

从行为面试问题(behavioral questions)看中美程序员差异。

中美程序员在职场中的工作状态和职能、福利等有很大区别&#xff0c;从面试中的BQ轮就可见一斑。 中美程序员的面试轮差异&#xff1f; 国内的面试轮在不同公司间差异很大&#xff0c;但总体的问题类型包含笔试面试&#xff08;算法题、概念题、项目深挖、职业目标、职场文化…

FGUI+TS如何实现数字翻滚

FGUITS如何实现数字翻滚 实现效果如下&#xff1a; 实现步骤&#xff1a; fgui制作组件和特效 fgui制作组件&#xff0c;设置一条竖向数字包含1-9或者小数点符号等&#xff0c;可见区域为一个数字大小&#xff0c;最好可见区域紧贴数字&#xff0c;这样滚动的时候滚动区域范围…

深度学习------------------卷积神经网络(LeNet)

目录 LeNet网络手写的数字识别MNIST总结卷积神经网络&#xff08;LeNet&#xff09; 问题 LeNet网络 手写的数字识别 MNIST ①输入的是&#xff1a;3232的image ②放到一个55的卷积层里面&#xff08;为什么是5&#xff1f;因为32-x128&#xff0c;∴x5&#xff09;&#xff0c…

【教程】Ubuntu给pycharm添加侧边栏快捷方式

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 以下教程不仅限于pycharm&#xff0c;其他软件也是一样操作 1、进入到pycharm的目录&#xff0c;先通过命令行打开pycharm&#xff1a; ./bin/pycharm…

keepalived+haproxy高可用负载均衡集群

简介 使用haproxy制作负载均衡集群&#xff0c;keepalived通过状态检测脚本检测本机haproxy状态&#xff0c;若为离线状态&#xff0c;则会降低该节点的优先级。 实验准备 四台虚拟机&#xff1a;KA1、KA2为keepalivedhaproxy&#xff0c;web1、web2为后端服务器&#xff0c;均…

阿里云-java调用短信服务,第三方接口的开启(傻瓜式教程)

第一步&#xff1a;在浏览器中&#xff0c;搜索阿里云 第二步&#xff1a;打开aly的主页 第三步&#xff1a;在最上方的导航栏中&#xff0c;找到云市场&#xff0c;注意不要点击&#xff0c;会自动有触发悬浮框出现&#xff0c;在悬浮框中找到 短信 第四步&#xff1a;点击 短…

无人机之电池注意事项

1、外场作业时&#xff0c;电池一定要放置在阴凉处&#xff0c;避免太阳直射&#xff1b; 2、刚作业完的电池发热严重时&#xff0c;请降至室温再充电&#xff1b; 3、注意电池状态&#xff0c;一旦发现电池出现鼓包、漏液等现象&#xff0c;必须马上停止使用&#xff1b; 4…

UE5 C++项目的配置

创建项目 首先启动UE5,然后选择要创建的项目&#xff0c;选择c进行创建 创建项目完毕之后&#xff0c;会自动打开visual studio&#xff0c;页面如下图所示 点击总体配置状态的刷新按钮&#xff0c;会自动检测总体的配置状态 一般会在下图所示的两项出现警告 Unreal Engi…

舵机模块学习

舵机是一种根据输入PWM信号占空比来控制输出角度的装置 执行逻辑&#xff1a;PWM信号输入到控制板&#xff0c;给控制版一个指定的目标角度&#xff0c;然后电位器检测输出轴的当前角度&#xff0c;如果大于目标角度&#xff0c;电机反转&#xff0c;小于正转&#xff0c;最终使…

Linux--HTTP协议(http服务器构建)

目录 1.HTTP 协议 2.认识 URL 3.urlencode 和 urldecode&#xff08;编码&#xff09; urlencode&#xff08;URL编码&#xff09; urldecode&#xff08;URL解码&#xff09; 4.HTTP 协议请求与响应格式 4.1HTTP 常见方法&#xff08;三种&#xff09; 5.HTTP 的状态码…