目录
一.MsgQueue模块介绍
二.MsgQueue类的实现
成员变量
构造函数与析构函数
成员函数
参数设置函数 setArgs
参数获取函数 getArgs
三.MsgQueueMapper类的实现
成员变量
构造函数
成员函数
创建表格函数 createTable
删除表格函数 dropTable
插入数据函数 insert
删除数据函数 remove
数据恢复函数 recover
msgQueueMapCb
四.MsgQueueMapper类的实现
成员变量
构造函数
成员函数
声明队列函数 declareQueue
删除队列函数 removeQueue
查询队列函数 selectQueue
查询所有队列函数 selectAll
队列是否存在函数 exists
队列数量函数 size
清空所有队列函数 clear
一.MsgQueue模块介绍
二.MsgQueue类的实现
成员变量
MsgQueue
结构体用于描述一个消息队列的基本属性。
std::string _name; // 队列名称
bool _durable; // 队列是否持久化
bool _exclusive; // 队列是否独占
bool _auto_del; // 队列是否自动删除
google::protobuf::Map<std::string, std::string> _args; // 队列的附加参数
构造函数与析构函数
结构体提供了一个带参数的构造函数用于初始化各成员变量。同时提供了一个默认构造函数以支持无参初始化。
MsgQueue(const std::string &name, bool durable, bool exclusive, bool auto_del, const google::protobuf::Map<std::string, std::string> &_args): _name(name), _durable(durable), _exclusive(exclusive), _auto_del(auto_del), _args(_args) {}
成员函数
MsgQueue
提供了两个主要的成员函数,用于设置和获取队列的附加参数。
参数设置函数 setArgs
setArgs
方法接收一个格式化的字符串参数,并将其解析为键值对,存储在 _args
中。
该方法使用了 StrHelper::split
函数来分割字符串,通过 =
符号区分键和值,并存储在 _args
中。
参数获取函数 getArgs
getArgs
方法用于将 _args
中的键值对组合成格式化字符串,返回给调用者。
std::string getArgs()
{std::string ret;for (auto &kv : _args){ret += kv.first + "=" + kv.second + "&";}return ret;
}
三.MsgQueueMapper类的实现
成员变量
SqliteHelper sql_helper; //数据库管理句柄
sql_helper
:SqliteHelper
是一个辅助类,封装了SQLite的基本操作,如执行SQL语句和管理数据库连接。
构造函数
MsgQueueMapper(const std::string &dbname) : sql_helper(dbname)
{std::string path = FileHelper::getParentDirName(dbname);FileHelper::createDir(path);if (!sql_helper.open()){ELOG("MsgQueueMapper open db failed:%s", dbname.c_str());assert(0);}createTable();
}
构造函数接受一个数据库名称参数,初始化 sql_helper
,并在必要时创建数据库目录和表。
成员函数
创建表格函数 createTable
void createTable()
{std::stringstream sql;sql << "create table if not exists msg_queue(";sql << "name varchar(64) primary key,";sql << "durable int,";sql << "exclusive int,";sql << "auto_del int,";sql << "args varchar(64));";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper create table failed:%s", sql.str().c_str());assert(0);}
}
删除表格函数 dropTable
dropTable
方法用于删除现有的消息队列表格。该方法执行删除表格的SQL语句,如果表格存在,它将被移除。
void dropTable()
{std::stringstream sql;sql << "drop table if exists msg_queue;";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper drop table failed:%s", sql.str().c_str());assert(0);}
}
插入数据函数 insert
insert
方法将 MsgQueue
对象插入到数据库中。
void insert(MsgQueue::ptr &msg_queue_ptr)
{std::stringstream sql;sql << "insert into msg_queue(name, durable, exclusive, auto_del, args) values(";sql << "'" << msg_queue_ptr->_name << "',";sql << msg_queue_ptr->_durable << ",";sql << msg_queue_ptr->_exclusive << ",";sql << msg_queue_ptr->_auto_del << ",";sql << "'" << msg_queue_ptr->getArgs() << "');";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper insert msg_queue failed:%s", sql.str().c_str());assert(0);}
}
删除数据函数 remove
remove
方法用于从数据库中删除指定名称的消息队列。
void remove(const std::string &name)
{std::stringstream sql;sql << "delete from msg_queue where name = '" << name << "';";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper remove msg_queue failed:%s", sql.str().c_str());assert(0);}
}
数据恢复函数 recover
recover
方法用于从数据库中恢复所有消息队列。该方法执行SQL查询语句,遍历结果集,并将每个消息队列恢复到内存中的 _msgQueues
容器中。
msgqueue_map recover(){msgqueue_map ret;std::string sql = "select * from msg_queue;";if (!sql_helper.exec(sql, MsgQueueMapper::msgQueueMapCb, &ret)){ELOG("MsgQueueMapper recover failed:%s", sql.c_str());assert(0);}return ret;}
msgQueueMapCb
- 该静态回调函数在
recover
函数中被调用,用于将数据库查询结果中的每一行转化为一个 MsgQueue对象,并将其存储到 msgqueue_map中。
static int msgQueueMapCb(void *arg, int col_count, char **col_values, char **col_names){msgqueue_map *ret = (msgqueue_map *)arg;MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>();msg_queue_ptr->_name = col_values[0];msg_queue_ptr->_durable = std::stoi(col_values[1]);msg_queue_ptr->_exclusive = std::stoi(col_values[2]);msg_queue_ptr->_auto_del = std::stoi(col_values[3]);if (col_values[4]){msg_queue_ptr->setArgs((std::string)col_values[4]);}elseELOG("没有其它参数");ret->insert(std::make_pair(msg_queue_ptr->_name, msg_queue_ptr));return 0;}
四.MsgQueueMapper类的实现
MsgQueueManager
类用于管理内存中的消息队列对象,并与 MsgQueueMapper
协作实现消息队列的持久化。
成员变量
_msgQueues
:一个键值对映射,用于存储当前内存中的所有消息队列。_mapper
:MsgQueueMapper
对象,用于与数据库进行持久化操作。_mutex
:用于保护消息队列操作的线程安全。
构造函数
构造函数在初始化时会调用 MsgQueueMapper
的 recover
方法,从数据库恢复所有消息队列。
MsgQueueManager(const std::string &dbname) : _mapper(dbname)
{_mapper.recover(_msgQueues);
}
成员函数
声明队列函数 declareQueue
declareQueue
方法用于声明一个新的消息队列,并将其加入到内存和数据库中。
该方法首先检查队列是否已经存在,若不存在,则创建并插入新的队列。如果队列需要持久化,则还会将其插入到数据库中。
bool declareQueue(const std::string &name, bool durable, bool exclusive, bool auto_del, const google::protobuf::Map<std::string, std::string> &args)
{std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it != _msgQueues.end()){ILOG("MsgQueueManager declareQueue:%s already exists", name.c_str())return true;}MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>(name, durable, exclusive, auto_del, args);_msgQueues.insert(std::make_pair(name, msg_queue_ptr));if (msg_queue_ptr->_durable)_mapper.insert(msg_queue_ptr);return true;
}
删除队列函数 removeQueue
removeQueue
方法用于删除指定名称的消息队列。
bool removeQueue(const std::string &name)
{std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ELOG("MsgQueueManager removeQueue:%s not exists", name.c_str());return false;}if (it->second->_durable)_mapper.remove(name);_msgQueues.erase(it);return true;
}
查询队列函数 selectQueue
selectQueue
方法根据队列名称查询并返回消息队列对象
该方法用于在内存中查找指定名称的队列,并返回指向该队列的智能指针。
MsgQueue::ptr selectQueue(const std::string &name)
{std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ELOG("MsgQueueManager selectQueue:%s not exists", name.c_str());return nullptr;}return it->second;
}
查询所有队列函数 selectAll
msgqueue_map selectAll(){return _msgQueues;}
队列是否存在函数 exists
exists
方法用于判断指定名称的消息队列是否存在。
bool exists(const std::string &name) { std::lock_guard<std::mutex> lock(_mutex); return _msgQueues.find(name) != _msgQueues.end(); }
该方法返回一个布尔值,指示队列是否存在。
队列数量函数 size
size
方法用于返回当前存在的消息队列数量。
size_t size() { std::lock_guard<std::mutex> lock(_mutex); return _msgQueues.size(); }
该方法返回一个整数值,表示内存中队列的数量。
清空所有队列函数 clear
clear
方法用于清空所有当前存在的消息队列。
void clear() { std::lock_guard<std::mutex> lock(_mutex); _msgQueues.clear(); _mapper.dropTable(); _mapper.createTable(); }
该方法清空内存中的所有队列,并在数据库中删除和重建消息队列表格。
五.MsgQueue.hpp所有代码
#pragma once
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include <string>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include <google/protobuf/map.h>namespace mq
{struct MsgQueue{std::string _name;bool _durable; // 是否持久化bool _exclusive; // 是否独占bool _auto_del; // 是否自动删除google::protobuf::Map<std::string, std::string> _args;using ptr = std::shared_ptr<MsgQueue>;MsgQueue(const std::string &name, bool durable, bool exclusive,bool auto_del, const google::protobuf::Map<std::string, std::string> &_args): _name(name),_durable(durable),_exclusive(exclusive),_auto_del(auto_del),_args(_args){}MsgQueue() {}void setArgs(const std::string &str_args){std::vector<std::string> args;size_t sz = StrHelper::split(str_args, "&", args);for (auto &kv : args){size_t pos = kv.find("=");if (pos == std::string::npos){ELOG("MsgQueue args format error:%s", kv.c_str());assert(0);}std::string key = kv.substr(0, pos);std::string val = kv.substr(pos + 1);_args[key] = val;}}std::string getArgs(){std::string ret;for (auto &kv : _args){ret += kv.first + "=" + kv.second + "&";}return ret;}};using msgqueue_map = std::unordered_map<std::string, std::shared_ptr<MsgQueue>>;class MsgQueueMapper{private:SqliteHelper sql_helper;public:MsgQueueMapper(const std::string &dbname): sql_helper(dbname){// 数据库有path即可,open时自动创建文件std::string path = FileHelper::getParentDirName(dbname);FileHelper::createDir(path);if (!sql_helper.open()){ELOG("MsgQueueMapper open db failed:%s", dbname.c_str());assert(0);}createTable();}// 1.创建,删除表void createTable(){std::stringstream sql;sql << "create table if not exists msg_queue(";sql << "name varchar(64) primary key,";sql << "durable int,";sql << "exclusive int,";sql << "auto_del int,";sql << "args varchar(64));";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper create table failed:%s", sql.str().c_str());assert(0);}}void dropTable(){std::stringstream sql;sql << "drop table if exists msg_queue;";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper drop table failed:%s", sql.str().c_str());assert(0);}}// 2.插入/删除数据void insert(MsgQueue::ptr &msg_queue_ptr){std::stringstream sql;sql << "insert into msg_queue (name,durable,exclusive,auto_del,args) values(";sql << "'" << msg_queue_ptr->_name << "',";sql << msg_queue_ptr->_durable << ",";sql << msg_queue_ptr->_exclusive << ",";sql << msg_queue_ptr->_auto_del << ",";sql << "'" << msg_queue_ptr->getArgs() << "');";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper insert failed:%s", sql.str().c_str());assert(0);}}void remove(const std::string &name){std::stringstream sql;sql << "delete from msg_queue where name='" << name << "';";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper remove failed:%s", sql.str().c_str());assert(0);}}// 3.recovermsgqueue_map recover(){msgqueue_map ret;std::string sql = "select * from msg_queue;";if (!sql_helper.exec(sql, MsgQueueMapper::msgQueueMapCb, &ret)){ELOG("MsgQueueMapper recover failed:%s", sql.c_str());assert(0);}return ret;}private:static int msgQueueMapCb(void *arg, int col_count, char **col_values, char **col_names){msgqueue_map *ret = (msgqueue_map *)arg;MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>();msg_queue_ptr->_name = col_values[0];msg_queue_ptr->_durable = std::stoi(col_values[1]);msg_queue_ptr->_exclusive = std::stoi(col_values[2]);msg_queue_ptr->_auto_del = std::stoi(col_values[3]);if (col_values[4]){msg_queue_ptr->setArgs((std::string)col_values[4]);}elseELOG("没有其它参数");ret->insert(std::make_pair(msg_queue_ptr->_name, msg_queue_ptr));return 0;}};class MsgQueueManager{public:using ptr = std::shared_ptr<MsgQueueManager>;private:msgqueue_map _msgQueues;std::mutex _mutex;MsgQueueMapper _mapper;public:MsgQueueManager(const std::string &dbname): _mapper(dbname){_msgQueues = _mapper.recover();}// 1. 插入/删除数据bool declareQueue(const std::string &name,bool durable,bool exclusive,bool auto_del,const google::protobuf::Map<std::string, std::string> &args){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it != _msgQueues.end()){ILOG("MsgQueueManager declareQueue:%s already exists", name.c_str())return true;}MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>(name, durable, exclusive, auto_del, args);_msgQueues.insert(std::make_pair(name, msg_queue_ptr));if (msg_queue_ptr->_durable)_mapper.insert(msg_queue_ptr);// std::cout<<"declare队列:"<<msg_queue_ptr->_name<<std::endl;// std::cout<<"队列的个数:"<<_msgQueues.size()<<std::endl;return true;}bool removeQueue(const std::string &name){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ILOG("MsgQueueManager removeQueue:%s not exists", name.c_str());return true;}_msgQueues.erase(it);if (it->second->_durable)_mapper.remove(name);return true;}// 2. 查询一个/所有 queueMsgQueue::ptr selectQueue(const std::string &name){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ILOG("MsgQueueManager select:%s not exists", name.c_str());return MsgQueue::ptr();}return it->second;}msgqueue_map selectAll(){return _msgQueues;}// 3. 其它操作bool exists(const std::string &name){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ILOG("MsgQueueManager :%s not exists", name.c_str());return false;}return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _msgQueues.size();}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.dropTable();_msgQueues.clear();}};
};