C++使用ZeroMQ和MessagePack实现简单又轻量级的RPC框架

在现代的分布式系统中,远程过程调用(RPC)是一个非常重要的机制,它允许不同的服务或组件之间的通信,就像调用本地函数一样。本文将介绍如何使用ZeroMQ和MessagePack来构建一个轻量级的RPC框架,并提供一个简单的使用示例。

ZeroMQ简介

ZeroMQ(也称为0MQ)是一个高性能的异步消息库,旨在使用标准的、对等的传输协议实现消息的发送与接收。ZeroMQ的核心是提供一个消息队列,使得消息发送和接收更加高效和可靠。ZeroMQ支持多种消息模式,如请求/响应(Request/Reply)、发布/订阅(Publish/Subscribe)等。在上述代码中,ZMQ_REP表示响应模式,用于服务器端,而ZMQ_REQ则用于客户端,发起请求并等待响应。

MessagePack简介

MessagePack是一种高效的二进制序列化库,类似于JSON,但更紧凑、更快。它使用二进制格式来表示数据,使得数据传输更加高效。MessagePack可以用于多种编程语言,包括C++,并且不需要额外的依赖,如Boost。

安装MessagePack和ZeroMQ

安装MessagePack
MessagePack的C++版本可以通过多种方式进行安装,例如使用包管理器或从源代码编译安装。以下是通过CMake安装MessagePack的方法:

  1. 下载MessagePack源码:

    git clone https://github.com/msgpack/msgpack-c.git
    cd msgpack-c
    
  2. 创建构建目录并编译安装:

    mkdir build
    cd build
    cmake ..
    make
    sudo make install
    

安装ZeroMQ

下载ZeroMQ源码

可以从ZeroMQ的官方GitHub仓库下载最新的源码。使用git clone命令进行下载:

git clone https://github.com/zeromq/libzmq.git
cd libzmq
编译和安装

进入源码目录后,可以按照以下步骤进行编译和安装:

  1. 生成配置文件:使用autogen.sh脚本生成配置文件。

    ./autogen.sh
    
  2. 配置编译选项:使用configure脚本进行配置。可以添加各种选项来定制编译过程,例如指定安装路径等。

    ./configure
    

    如果需要指定安装路径(例如/usr/local),可以添加--prefix选项:

    ./configure --prefix=/usr/local
    
  3. 编译源码:使用make命令进行编译。

   make
  1. 安装库文件:使用make install命令安装编译好的库文件到系统中。
   sudo make install

如果需要运行在嵌入式linux上则需要交叉编译,如下:

# 下载源码
git clone https://github.com/zeromq/libzmq.git
cd libzmq# 配置交叉编译环境(示例:ARM架构)
export CC=arm-linux-gnueabihf-gcc
export CXX=arm-linux-gnueabihf-g++# 配置编译选项(禁用非必要功能)
./configure --host=arm-linux-gnueabihf \--prefix=/opt/zmq-embedded \--without-libsodium \      # 禁用加密--without-docs# 编译并安装
make -j4 && make install
验证安装

安装完成后,可以通过编写一个简单的ZeroMQ程序来验证安装是否成功。以下是一个简单的示例程序:

服务器端代码 (server.cpp)

#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>int main() {zmq::context_t context(1);zmq::socket_t socket(context, ZMQ_REP);socket.bind("tcp://*:5555");while (true) {zmq::message_t request;// 等待客户端请求socket.recv(request, zmq::recv_flags::none);// 处理请求std::cout << "Received Hello" << std::endl;// 发送响应zmq::message_t reply(5);memcpy(reply.data(), "World", 5);socket.send(reply, zmq::send_flags::none);std::this_thread::sleep_for(std::chrono::seconds(1));}return 0;
}

客户端代码 (client.cpp)

#include <zmq.hpp>
#include <iostream>int main() {zmq::context_t context(1);zmq::socket_t socket(context, ZMQ_REQ);socket.connect("tcp://localhost:5555");for (int request_nbr = 0; request_nbr != 10; request_nbr++) {zmq::message_t request(5);memcpy(request.data(), "Hello", 5);socket.send(request, zmq::send_flags::none);zmq::message_t reply;socket.recv(reply, zmq::recv_flags::none);std::cout << "Received " << reply.to_string() << std::endl;}return 0;
}
编译示例程序

使用g++编译上述两个示例程序。确保编译时链接ZeroMQ库:

g++ -o server server.cpp -lzmq
g++ -o client client.cpp -lzmq
运行示例程序

首先运行服务器端程序,然后运行客户端程序。客户端将向服务器发送请求,服务器收到请求后将返回响应。

./server
# 在另一个终端中
./client
实现简单的RPC框架

下面是一个基于ZeroMQ和MessagePack的简单RPC框架的实现示例。该框架支持注册、调用和异步调用远程方法。

定义RpcHandler类型

using RpcHandler = std::function<msgpack::object(const msgpack::object&)>;

初始化服务器和客户端

bool LightweightRPC::initServer(const std::string& serverAddress) {try {// 创建服务器套接字m_serverSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REP);// 绑定地址m_serverSocket->bind(serverAddress);return true;} catch (const zmq::error_t& e) {std::cerr << "ZeroMQ error: " << e.what() << std::endl;return false;}
}bool LightweightRPC::initClient(const std::string& serverAddress) {try {// 创建客户端套接字m_clientSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REQ);// 连接到服务器m_clientSocket->connect(serverAddress);return true;} catch (const zmq::error_t& e) {std::cerr << "ZeroMQ error: " << e.what() << std::endl;return false;}
}

注册和调用方法

void LightweightRPC::registerMethod(const std::string& methodName, RpcHandler handler) {std::lock_guard<std::mutex> lock(m_methodsMutex);m_methods[methodName] = handler;
}msgpack::object LightweightRPC::callMethod(const std::string& methodName, const msgpack::object& params, int timeout_ms) {if (!m_clientSocket) {throw std::runtime_error("RPC client not initialized");}// 序列化请求zmq::message_t request = serializeRequest(methodName, params);// 发送请求m_clientSocket->send(request, zmq::send_flags::none);// 设置接收超时m_clientSocket->set(zmq::sockopt::rcvtimeo, timeout_ms);// 接收响应zmq::message_t reply;auto result = m_clientSocket->recv(reply);// 反序列化响应return deserializeResponse(reply);
}

异步调用方法

std::future<msgpack::object> LightweightRPC::callMethodAsync(const std::string& methodName, const msgpack::object& params) {// 创建promise和futureauto promise = std::make_shared<std::promise<msgpack::object>>();auto future = promise->get_future();// 在新线程中调用方法std::thread([this, methodName, params, promise]() {try {msgpack::object result = this->callMethod(methodName, params);promise->set_value(result);} catch (const std::exception& e) {promise->set_exception(std::make_exception_ptr(e));}}).detach();return future;
}

启动和停止服务器

void LightweightRPC::startServer(int threadCount) {if (!m_serverSocket) {throw std::runtime_error("RPC server not initialized");}// 启动服务器m_running = true;// 创建工作线程for (int i = 0; i < threadCount; ++i) {m_workerThreads.emplace_back(&LightweightRPC::workerThread, this);}
}void LightweightRPC::stopServer() {// 停止服务器m_running = false;// 等待所有工作线程结束for (auto& thread : m_workerThreads) {if (thread.joinable()) {thread.join();}}// 清空工作线程m_workerThreads.clear();
}

服务器处理请求的逻辑

void LightweightRPC::handleRequest(zmq::message_t& request, zmq::message_t& reply) {try {// 反序列化请求auto [methodName, params] = deserializeRequest(request);// 查找方法并调用RpcHandler handler;{std::lock_guard<std::mutex> lock(m_methodsMutex);auto it = m_methods.find(methodName);if (it == m_methods.end()) {reply = serializeError("Method not found: " + methodName);return;}handler = it->second;}// 调用方法处理函数msgpack::object result = handler(params);// 序列化响应reply = serializeResponse(result);} catch (const std::exception& e) {reply = serializeError(std::string("RPC Error: ") + e.what());}
}

序列化和反序列化消息

zmq::message_t LightweightRPC::serializeRequest(const std::string& methodName, const msgpack::object& params) {msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);packer.pack_map(2);packer.pack("method");packer.pack(methodName);packer.pack("params");packer.pack(params);return zmq::message_t(sbuf.data(), sbuf.size());
}std::pair<std::string, msgpack::object> LightweightRPC::deserializeRequest(const zmq::message_t& request) {msgpack::object_handle oh = msgpack::unpack(static_cast<const char*>(request.data()), request.size());msgpack::object obj = oh.get();std::string methodName;msgpack::object params;if (obj.type != msgpack::type::MAP || obj.via.map.size != 2) {throw std::runtime_error("Invalid RPC request format");}for (uint32_t i = 0; i < obj.via.map.size; ++i) {auto key = obj.via.map.ptr[i].key;auto val = obj.via.map.ptr[i].val;if (key.type == msgpack::type::STR) {std::string keyStr(key.via.str.ptr, key.via.str.size);if (keyStr == "method") {methodName = std::string(val.via.str.ptr, val.via.str.size);} else if (keyStr == "params") {params = val;}}}if (methodName.empty()) {throw std::runtime_error("Method name not found in RPC request");}return std::make_pair(methodName, params);
}zmq::message_t LightweightRPC::serializeResponse(const msgpack::object& result) {msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);packer.pack_map(1);packer.pack("result");packer.pack(result);return zmq::message_t(sbuf.data(), sbuf.size());
}msgpack::object LightweightRPC::deserializeResponse(const zmq::message_t& response) {msgpack::object_handle oh = msgpack::unpack(static_cast<const char*>(response.data()), response.size());msgpack::object obj = oh.get();if (obj.type != msgpack::type::MAP || obj.via.map.size != 1) {throw std::runtime_error("Invalid RPC response format");}auto key = obj.via.map.ptr[0].key;auto val = obj.via.map.ptr[0].val;if (key.type == msgpack::type::STR) {std::string keyStr(key.via.str.ptr, key.via.str.size);if (keyStr == "result") {return val;} else if (keyStr == "error") {std::string errorMsg(val.via.str.ptr, val.via.str.size);throw std::runtime_error(errorMsg);}}throw std::runtime_error("Invalid RPC response format");
}zmq::message_t LightweightRPC::serializeError(const std::string& errorMessage) {msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);packer.pack_map(1);packer.pack("error");packer.pack(errorMessage);return zmq::message_t(sbuf.data(), sbuf.size());
}

使用示例

#include "lightweight_rpc.h"
#include <iostream>
#include <string>
#include <thread>
#include <chrono>// 示例RPC方法:加法
int add(int a, int b) {std::cout << "RPC Server: Calculating " << a << " + " << b << std::endl;return a + b;
}// 示例RPC方法:字符串连接
std::string concat(const std::string& a, const std::string& b) {std::cout << "RPC Server: Concatenating \"" << a << "\" and \"" << b << "\"" << std::endl;return a + b;
}// 服务器示例
void runServer() {try {std::cout << "Starting RPC server..." << std::endl;// 创建RPC服务器hub::RpcServer server("tcp://*:5555");// 注册RPC方法server.registerMethod("add", add);server.registerMethod("concat", concat);// 启动服务器server.start();std::cout << "RPC server started. Press Ctrl+C to stop." << std::endl;while (true) {std::this_thread::sleep_for(std::chrono::seconds(1));}} catch (const std::exception& e) {std::cerr << "Server error: " << e.what() << std::endl;}
}// 客户端示例
void runClient() {try {std::cout << "Starting RPC client..." << std::endl;// 创建RPC客户端hub::RpcClient client("tcp://localhost:5555");// 调用加法方法auto result1 = client.call<int, int>("add", 2000, 10, 20);int sum;result1.convert(sum);std::cout << "RPC Client: 10 + 20 = " << sum << std::endl;// 调用字符串连接方法auto result2 = client.call<std::string, std::string>("concat", 2000, "Hello, ", "World!");std::string concatResult;result2.convert(concatResult);std::cout << "RPC Client: concat result = " << concatResult << std::endl;// 异步调用示例auto future = client.callAsync<int, int>("add", 30, 40);std::cout << "RPC Client: Async call made, waiting for result..." << std::endl;// 等待结果auto result5 = future.get();int asyncSum;result5.convert(asyncSum);std::cout << "RPC Client: Async result 30 + 40 = " << asyncSum << std::endl;} catch (const std::exception& e) {std::cerr << "Client error: " << e.what() << std::endl;}
}int main(int argc, char* argv[]) {if (argc < 2) {std::cerr << "Usage: " << argv[0] << " [server|client]" << std::endl;return 1;}std::string mode = argv[1];if (mode == "server") {runServer();} else if (mode == "client") {runClient();} else {std::cerr << "Invalid mode. Use 'server' or 'client'." << std::endl;return 1;}return 0;
}
完整源码
#ifndef LIGHTWEIGHT_RPC_H
#define LIGHTWEIGHT_RPC_H#include <string>
#include <functional>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <vector>
#include <future>
#include <thread>
#include <atomic>
#include <zmq.hpp>
#include <msgpack.hpp>namespace hub {/*** @brief 轻量级RPC框架,基于ZeroMQ和MessagePack*/
class LightweightRPC {
public:// RPC方法处理函数类型using RpcHandler = std::function<msgpack::object(const msgpack::object&)>;/*** @brief 获取LightweightRPC的单例实例* * @return LightweightRPC& 单例实例*/static LightweightRPC& getInstance();/*** @brief 初始化RPC服务器* * @param serverAddress 服务器地址,例如"tcp://*:5555"* @return true 如果初始化成功,false 否则*/bool initServer(const std::string& serverAddress);/*** @brief 初始化RPC客户端* * @param serverAddress 服务器地址,例如"tcp://localhost:5555"* @return true 如果初始化成功,false 否则*/bool initClient(const std::string& serverAddress);/*** @brief 注册RPC方法* * @param methodName 方法名* @param handler 处理函数*/void registerMethod(const std::string& methodName, RpcHandler handler);/*** @brief 调用远程方法* * @param methodName 方法名* @param params 参数* @param timeout_ms 超时时间(毫秒)* @return msgpack::object 返回结果*/msgpack::object callMethod(const std::string& methodName, const msgpack::object& params, int timeout_ms = 1000);/*** @brief 异步调用远程方法* * @param methodName 方法名* @param params 参数* @return std::future<msgpack::object> 返回结果的future*/std::future<msgpack::object> callMethodAsync(const std::string& methodName, const msgpack::object& params);/*** @brief 启动RPC服务器* * @param threadCount 工作线程数量*/void startServer(int threadCount = 1);/*** @brief 停止RPC服务器*/void stopServer();/*** @brief 关闭RPC客户端*/void closeClient();private:LightweightRPC();~LightweightRPC();LightweightRPC(const LightweightRPC&) = delete;LightweightRPC& operator=(const LightweightRPC&) = delete;// 服务器相关std::unique_ptr<zmq::context_t> m_context;std::unique_ptr<zmq::socket_t> m_serverSocket;std::unique_ptr<zmq::socket_t> m_clientSocket;std::unordered_map<std::string, RpcHandler> m_methods;std::mutex m_methodsMutex;std::vector<std::thread> m_workerThreads;std::atomic<bool> m_running;// 工作线程函数void workerThread();// 处理RPC请求void handleRequest(zmq::message_t& request, zmq::message_t& reply);// 序列化RPC请求zmq::message_t serializeRequest(const std::string& methodName, const msgpack::object& params);// 反序列化RPC请求std::pair<std::string, msgpack::object> deserializeRequest(const zmq::message_t& request);// 序列化RPC响应zmq::message_t serializeResponse(const msgpack::object& result);// 反序列化RPC响应msgpack::object deserializeResponse(const zmq::message_t& response);// 序列化RPC错误zmq::message_t serializeError(const std::string& errorMessage);
};// 便捷的RPC客户端类
class RpcClient {
public:/*** @brief 构造函数* * @param serverAddress 服务器地址*/RpcClient(const std::string& serverAddress);/*** @brief 析构函数*/~RpcClient();/*** @brief 调用远程方法* * @param methodName 方法名* @param params 参数* @param timeout_ms 超时时间(毫秒)* @return msgpack::object 返回结果*/template<typename... Args>msgpack::object call(const std::string& methodName, int timeout_ms = 1000, Args... args) {// 打包参数msgpack::type::tuple<Args...> params(args...);msgpack::sbuffer sbuf;msgpack::pack(sbuf, params);msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());msgpack::object obj = oh.get();// 调用方法return LightweightRPC::getInstance().callMethod(methodName, obj, timeout_ms);}/*** @brief 异步调用远程方法* * @param methodName 方法名* @param params 参数* @return std::future<msgpack::object> 返回结果的future*/template<typename... Args>std::future<msgpack::object> callAsync(const std::string& methodName, Args... args) {// 打包参数msgpack::type::tuple<Args...> params(args...);msgpack::sbuffer sbuf;msgpack::pack(sbuf, params);msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());msgpack::object obj = oh.get();// 异步调用方法return LightweightRPC::getInstance().callMethodAsync(methodName, obj);}
};// 便捷的RPC服务器类
class RpcServer {
public:/*** @brief 构造函数* * @param serverAddress 服务器地址* @param threadCount 工作线程数量*/RpcServer(const std::string& serverAddress, int threadCount = 1);/*** @brief 析构函数*/~RpcServer();/*** @brief 注册RPC方法* * @param methodName 方法名* @param handler 处理函数*/template<typename Func>void registerMethod(const std::string& methodName, Func handler) {LightweightRPC::getInstance().registerMethod(methodName, [handler](const msgpack::object& params) -> msgpack::object {// 调用处理函数return invokeHandler(handler, params);});}/*** @brief 启动RPC服务器*/void start();/*** @brief 停止RPC服务器*/void stop();private:int m_threadCount;// 调用处理函数并返回结果template<typename Func, typename... Args>static msgpack::object invokeHandler(Func handler, const msgpack::object& params) {try {// 解包参数msgpack::type::tuple<Args...> args;params.convert(args);// 调用处理函数auto result = callHandlerWithTuple(handler, args);// 打包结果msgpack::sbuffer sbuf;msgpack::pack(sbuf, result);msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());return oh.get();} catch (const std::exception& e) {// 处理异常msgpack::sbuffer sbuf;msgpack::pack(sbuf, std::string("RPC Error: ") + e.what());msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());return oh.get();}}// 使用tuple调用处理函数template<typename Func, typename Tuple, std::size_t... I>static auto callHandlerWithTupleImpl(Func handler, Tuple&& tuple, std::index_sequence<I...>) {return handler(std::get<I>(std::forward<Tuple>(tuple))...);}template<typename Func, typename Tuple>static auto callHandlerWithTuple(Func handler, Tuple&& tuple) {constexpr auto size = std::tuple_size<typename std::decay<Tuple>::type>::value;return callHandlerWithTupleImpl(handler, std::forward<Tuple>(tuple), std::make_index_sequence<size>{});}
};} // namespace hub#endif // LIGHTWEIGHT_RPC_H 
#include "lightweight_rpc.h"
#include <iostream>
#include <chrono>namespace hub {// 单例实例
LightweightRPC& LightweightRPC::getInstance() {static LightweightRPC instance;return instance;
}LightweightRPC::LightweightRPC() : m_running(false) {// 创建ZeroMQ上下文m_context = std::make_unique<zmq::context_t>(1);
}LightweightRPC::~LightweightRPC() {// 停止服务器stopServer();// 关闭客户端closeClient();
}bool LightweightRPC::initServer(const std::string& serverAddress) {try {// 创建服务器套接字m_serverSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REP);// 绑定地址m_serverSocket->bind(serverAddress);return true;} catch (const zmq::error_t& e) {std::cerr << "ZeroMQ error: " << e.what() << std::endl;return false;}
}bool LightweightRPC::initClient(const std::string& serverAddress) {try {// 创建客户端套接字m_clientSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REQ);// 连接到服务器m_clientSocket->connect(serverAddress);return true;} catch (const zmq::error_t& e) {std::cerr << "ZeroMQ error: " << e.what() << std::endl;return false;}
}void LightweightRPC::registerMethod(const std::string& methodName, RpcHandler handler) {std::lock_guard<std::mutex> lock(m_methodsMutex);m_methods[methodName] = handler;
}msgpack::object LightweightRPC::callMethod(const std::string& methodName, const msgpack::object& params, int timeout_ms) {if (!m_clientSocket) {throw std::runtime_error("RPC client not initialized");}try {// 序列化请求zmq::message_t request = serializeRequest(methodName, params);// 发送请求m_clientSocket->send(request, zmq::send_flags::none);// 设置接收超时m_clientSocket->set(zmq::sockopt::rcvtimeo, timeout_ms);// 接收响应zmq::message_t reply;auto result = m_clientSocket->recv(reply);if (!result.has_value() || result.value() == 0) {throw std::runtime_error("RPC call timeout");}// 反序列化响应return deserializeResponse(reply);} catch (const std::exception& e) {std::cerr << "RPC call error: " << e.what() << std::endl;// 返回错误msgpack::sbuffer sbuf;msgpack::pack(sbuf, std::string("RPC Error: ") + e.what());msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());return oh.get();}
}std::future<msgpack::object> LightweightRPC::callMethodAsync(const std::string& methodName, const msgpack::object& params) {// 创建promise和futureauto promise = std::make_shared<std::promise<msgpack::object>>();auto future = promise->get_future();// 在新线程中调用方法std::thread([this, methodName, params, promise]() {try {// 调用方法msgpack::object result = this->callMethod(methodName, params);// 设置结果promise->set_value(result);} catch (const std::exception& e) {// 设置异常promise->set_exception(std::make_exception_ptr(e));}}).detach();return future;
}void LightweightRPC::startServer(int threadCount) {if (!m_serverSocket) {throw std::runtime_error("RPC server not initialized");}// 停止现有的服务器stopServer();// 启动服务器m_running = true;// 创建工作线程for (int i = 0; i < threadCount; ++i) {m_workerThreads.emplace_back(&LightweightRPC::workerThread, this);}
}void LightweightRPC::stopServer() {// 停止服务器m_running = false;// 等待所有工作线程结束for (auto& thread : m_workerThreads) {if (thread.joinable()) {thread.join();}}// 清空工作线程m_workerThreads.clear();
}void LightweightRPC::closeClient() {// 关闭客户端套接字m_clientSocket.reset();
}void LightweightRPC::workerThread() {while (m_running) {try {// 接收请求zmq::message_t request;auto result = m_serverSocket->recv(request, zmq::recv_flags::dontwait);if (result.has_value() && result.value() > 0) {// 处理请求zmq::message_t reply;handleRequest(request, reply);// 发送响应m_serverSocket->send(reply, zmq::send_flags::none);} else {// 没有请求,休眠一段时间std::this_thread::sleep_for(std::chrono::milliseconds(10));}} catch (const std::exception& e) {std::cerr << "RPC server error: " << e.what() << std::endl;}}
}void LightweightRPC::handleRequest(zmq::message_t& request, zmq::message_t& reply) {try {// 反序列化请求auto [methodName, params] = deserializeRequest(request);// 查找方法处理函数RpcHandler handler;{std::lock_guard<std::mutex> lock(m_methodsMutex);auto it = m_methods.find(methodName);if (it == m_methods.end()) {// 方法不存在reply = serializeError("Method not found: " + methodName);return;}handler = it->second;}// 调用方法处理函数msgpack::object result = handler(params);// 序列化响应reply = serializeResponse(result);} catch (const std::exception& e) {// 处理异常reply = serializeError(std::string("RPC Error: ") + e.what());}
}zmq::message_t LightweightRPC::serializeRequest(const std::string& methodName, const msgpack::object& params) {// 创建请求对象msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);// 打包方法名和参数packer.pack_map(2);packer.pack("method");packer.pack(methodName);packer.pack("params");packer.pack(params);// 创建ZeroMQ消息return zmq::message_t(sbuf.data(), sbuf.size());
}std::pair<std::string, msgpack::object> LightweightRPC::deserializeRequest(const zmq::message_t& request) {// 解包请求msgpack::object_handle oh = msgpack::unpack(static_cast<const char*>(request.data()), request.size());msgpack::object obj = oh.get();// 提取方法名和参数std::string methodName;msgpack::object params;if (obj.type != msgpack::type::MAP || obj.via.map.size != 2) {throw std::runtime_error("Invalid RPC request format");}for (uint32_t i = 0; i < obj.via.map.size; ++i) {auto key = obj.via.map.ptr[i].key;auto val = obj.via.map.ptr[i].val;if (key.type == msgpack::type::STR) {std::string keyStr(key.via.str.ptr, key.via.str.size);if (keyStr == "method") {if (val.type != msgpack::type::STR) {throw std::runtime_error("Method name must be a string");}methodName = std::string(val.via.str.ptr, val.via.str.size);} else if (keyStr == "params") {params = val;}}}if (methodName.empty()) {throw std::runtime_error("Method name not found in RPC request");}return std::make_pair(methodName, params);
}zmq::message_t LightweightRPC::serializeResponse(const msgpack::object& result) {// 创建响应对象msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);// 打包结果packer.pack_map(1);packer.pack("result");packer.pack(result);// 创建ZeroMQ消息return zmq::message_t(sbuf.data(), sbuf.size());
}msgpack::object LightweightRPC::deserializeResponse(const zmq::message_t& response) {// 解包响应msgpack::object_handle oh = msgpack::unpack(static_cast<const char*>(response.data()), response.size());msgpack::object obj = oh.get();// 提取结果if (obj.type != msgpack::type::MAP || obj.via.map.size != 1) {throw std::runtime_error("Invalid RPC response format");}auto key = obj.via.map.ptr[0].key;auto val = obj.via.map.ptr[0].val;if (key.type == msgpack::type::STR) {std::string keyStr(key.via.str.ptr, key.via.str.size);if (keyStr == "result") {return val;} else if (keyStr == "error") {if (val.type == msgpack::type::STR) {std::string errorMsg(val.via.str.ptr, val.via.str.size);throw std::runtime_error(errorMsg);} else {throw std::runtime_error("Unknown RPC error");}}}throw std::runtime_error("Invalid RPC response format");
}zmq::message_t LightweightRPC::serializeError(const std::string& errorMessage) {// 创建错误对象msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);// 打包错误packer.pack_map(1);packer.pack("error");packer.pack(errorMessage);// 创建ZeroMQ消息return zmq::message_t(sbuf.data(), sbuf.size());
}// RpcClient实现
RpcClient::RpcClient(const std::string& serverAddress) {if (!LightweightRPC::getInstance().initClient(serverAddress)) {throw std::runtime_error("Failed to initialize RPC client");}
}RpcClient::~RpcClient() {LightweightRPC::getInstance().closeClient();
}// RpcServer实现
RpcServer::RpcServer(const std::string& serverAddress, int threadCount) : m_threadCount(threadCount) {if (!LightweightRPC::getInstance().initServer(serverAddress)) {throw std::runtime_error("Failed to initialize RPC server");}
}RpcServer::~RpcServer() {stop();
}void RpcServer::start() {LightweightRPC::getInstance().startServer(m_threadCount);
}void RpcServer::stop() {LightweightRPC::getInstance().stopServer();
}} // namespace hub 
总结

通过上述步骤,我们可以构建一个简单的、基于ZeroMQ和MessagePack的RPC框架。该框架支持注册、调用和异步调用远程方法,具备较高的性能和可靠性。希望本文能帮助大家更好地理解和使用ZeroMQ与MessagePack来实现高效的分布式通信。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/34193.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux第三次练习

1、创建根目录结构中的所有的普通文件 首先在根目录下面新创建一个test目录&#xff0c;然后将查找到的普通文件新建到test目录下 2、列出所有账号的账号名 3、将/etc/passwd中内容按照冒号隔开的第三个字符从大到小排序后输出所有内容 4、列出/etc/passwd中的第20行-25行内容…

[CISCN 2022 初赛]ezpop(没成功复现)

打开在线环境可以看到&#xff1a; 记得之前做过一个类似的就是有点像照着漏洞去复现。应该可以直接在网上找到链子去打。 www.zip查看路由是 Index/test&#xff0c;然后 post 传参 a&#xff1a; exp&#xff08;参考了别的大神的wp&#xff09;&#xff1a; <?php //…

技术-NBIOT

是什么&#xff1f; 窄带物联网&#xff08;Narrow Band Internet of Things, NB-IoT&#xff09;成为万物互联网络的一个重要分支支持低功耗设备在广域网的蜂窝数据连接&#xff0c;也被叫作低功耗广域网(LPWAN)NB-IoT支持待机时间长、对网络连接要求较高设备的高效连接NB-Io…

Spring @Bean注解使用场景二

bean:最近在写一篇让Successfactors顾问都能搞明白的sso的逻辑的文章&#xff0c;所以一致在研究IAS的saml2.0的协议&#xff0c;希望用代码去解释SP、idp的一些概念&#xff0c;让顾问了解SSO与saml的关系&#xff0c;在github找代码的时候发现一些代码的调用关系很难理解&…

pip install和conda install的区别

这里写目录标题 一、什么是 Python 依赖&#xff08;Python Dependencies&#xff09;&#xff1f;1. 依赖的作用2. 如何管理 Python 依赖3. 依赖管理问题4. 依赖锁定总结 二、使用pip安装包venv隔离环境方法 1&#xff1a;使用 venv&#xff08;推荐&#xff09;创建虚拟环境激…

R语言高效数据处理-自定义EXCEL数据排版

注&#xff1a;以下代码均为实际数据处理中的笔记摘录&#xff0c;所以很零散 1、自定义excel表数据输出格式、布局 在实际数据处理中为了提升效率&#xff0c;将Excel报表交付给需求方时减少手动调整的环节很有必要 #1.1设置表头格式 header_style <- createStyle(font…

Word 小黑第4套

对应大猫41 上下日期是一起变动的&#xff0c;删掉第一个&#xff0c;第二个日期格式&#xff08;文件 -选项 -自定义功能区 -选上开发工具&#xff09; 点开发工具 -属性 选择相应的日期格式&#xff09; 修改标题样式时&#xff0c;标题三只有点标题二时才会显示 右击正文样…

酒店宾馆IPTV数字电视系统:创新宾客体验,引领智慧服务新潮流

酒店宾馆IPTV数字电视系统&#xff1a;创新宾客体验&#xff0c;引领智慧服务新潮流 北京海特伟业科技有限公司任洪卓于2025年3月15日发布 随着智慧酒店的不断发展&#xff0c;宾客对于酒店内的娱乐和信息服务需求日益多样化&#xff0c;传统的电视服务已难以满足现代宾客的高…

jupyter无法转换为PDF,HTMLnbconvert failed: Pandoc wasn‘t found.

无法转为PDF 手动下载工具 https://github.com/jgm/pandoc/releases/tag/3.6.3 似乎跟我想的不大一样&#xff0c;还有新的报错 https://nbconvert.readthedocs.io/en/latest/install.html#installing-tex 不知道下的啥玩意儿 sudo apt-get install texlive-xetex texlive-fon…

如何在 VS编译器上使用 C99规定的变长数组------使用Clang工具

VS编译器默认处理代码的工具是 MSVC&#xff0c;而MSVC工具是无法处理变长数组的&#xff0c;这个时候我们就要换一个处理代码的工具了----Clang 1 int n 9; 2 int arr[n];// 数组长度可以拟定1.打开 Visual Stdudio Intaller 2.点击修改&#xff0c;鼠标下滑找到>>使用…

vue echarts封装使用

echarts 尺寸自动调节 resize.js 柱状图 components/dashboard/lineChart.vue <template><div :class"className" :style"{height:height,width:width}" /> </template><script> import echarts from echarts require(echarts/…

《计算机图形学》第二课笔记-----二维变换的推导

前言&#xff1a;为什么这么突兀的把这一节内容放在了第二课&#xff0c;第一是因为我急于求成&#xff0c;第二是因为这一章节太重要了&#xff0c;这几乎是二维三维变换的最核心的东西&#xff0c;理解了这一章节内容&#xff0c;后面的就会像打通了任督二脉一样&#xff0c;…

OTP单片机调试工具之—单线数据编码

OTP单片机调试工具在实现过程中离不开单线数据的传输&#xff0c;那么使用哪一种方式的数据编码会比较好呢&#xff1f; 我所了解的主要有以下三种&#xff1a; 1.UART&#xff08;串口&#xff09;&#xff0c;这种方式在单片机和pc之间进行传输都非常常见&#xff0c;效率比较…

背诵--2

DAY01 面向对象回顾、继承、抽象类 学习目标 能够写出类的继承格式public class 子类 extends 父类{}public class Cat extends Animal{} 能够说出继承的特点子类继承父类,就会自动拥有父类非私有的成员 能够说出子类调用父类的成员特点1.子类有使用子类自己的2.子类没有使用…

穷举vs暴搜vs深搜vs回溯vs剪枝刷题 + 总结

文章目录 全排列题解代码 子集题解代码 总结 全排列 题目链接 题解 1. 画一颗决策树 2. 全局变量&#xff1a; int[ ][ ] ret&#xff1a;用于存结果的二维数组 int[ ] path&#xff1a;用于存每次路径的答案 bool[ ] check&#xff1a;判断这个数是否已经用过&#xff0c;…

深度学习中学习率调整策略

学习率衰减策略是深度学习优化过程中的一个关键因素&#xff0c;它决定了训练过程中学习率的调整方式&#xff0c;从而影响模型收敛的速度和效果。不同的衰减策略在不同的任务和模型上可能有不同的表现&#xff0c;下面从我用到过的几个衰减策略进行记录&#xff0c;后续慢慢跟…

《Electron 学习之旅:从入门到实践》

前言 Electron 简介 Electron 是由 GitHub 开发的一个开源框架&#xff0c;基于 Chromium 和 Node.js。 它允许开发者使用 Web 技术&#xff08;HTML、CSS、JavaScript&#xff09;构建跨平台的桌面应用程序。 Electron 的优势 跨平台&#xff1a;支持 Windows、macOS 和 Linux…

UBuntu24.04-JDK7-TOMCAT7安装

jdk7 apt-get 找不到。 tomcat7 也没找到。 以下是安装成功的&#xff0c;供大家参考。 1.JAVA openjdk-7-jdk /usr/lib/jvm/java-7-openjdk-amd641.安装指定版本apt search jdk //查找版本sudo apt install default-jdk //此为默认版本sudo apt install ope…

美畅物联丨WebRTC 技术详解:构建实时通信的数字桥梁

在互联网技术飞速发展的今天&#xff0c;实时通信已成为数字生活的核心需求。WebRTC作为一个开源项目&#xff0c;凭借卓越的技术实力与创新理念&#xff0c;为网页和移动应用带来了颠覆性的实时通信能力。它突破了传统通信方式的限制&#xff0c;实现了音频、视频和数据在用户…

驾驭 DeepSeek 科技之翼,翱翔现代学习新天际

在当今这个信息爆炸的时代&#xff0c;学习的方式和途径正在经历着前所未有的变革。人工智能技术的飞速发展&#xff0c;为我们的学习带来了全新的机遇和挑战。DeepSeek 作为一款强大的大语言模型&#xff0c;凭借其卓越的性能和丰富的功能&#xff0c;为现代学习注入了新的活力…