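In modern distributed systems, remote procedure call (RPC) is a key mechanism: it lets services or components talk to each other as if they were calling a local function. This article shows how to build a lightweight RPC framework with ZeroMQ and MessagePack and walks through a simple usage example.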
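Introduction to ZeroMQ
ZeroMQ (also written 0MQ) is a high-performance asynchronous messaging library. It is brokerless: sockets exchange messages directly over standard transports such as TCP, and the library takes care of queuing and reconnection. ZeroMQ supports several messaging patterns, including request/reply and publish/subscribe. In the code later in this article, ZMQ_REP is the reply socket type used on the server side, and ZMQ_REQ is the request socket type used on the client side: the client sends a request and then waits for the response.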
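A REQ socket enforces a strict alternation: one send, then one receive, then the next send. The toy model below illustrates that rule with the standard library only. It is not ZeroMQ code, and the class and method names are hypothetical; a real REQ socket reports an EFSM error where this sketch throws.

```cpp
#include <cassert>
#include <stdexcept>

// Toy model of the send/receive lockstep of a REQ socket.
// Hypothetical names; this does not call into ZeroMQ.
class ReqStateModel {
public:
    // A request may only be sent when no reply is outstanding.
    void send() {
        if (m_awaitingReply) {
            throw std::logic_error("send() while a reply is still pending");
        }
        m_awaitingReply = true;
    }

    // A reply may only be received after a request was sent.
    void recv() {
        if (!m_awaitingReply) {
            throw std::logic_error("recv() without a pending request");
        }
        m_awaitingReply = false;
    }

    bool awaitingReply() const { return m_awaitingReply; }

private:
    bool m_awaitingReply = false;
};
```

The same lockstep is why a request that times out matters for an RPC client: until the pending reply is dealt with, the socket will not accept another send.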
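Introduction to MessagePack
MessagePack is an efficient binary serialization format. Its data model is similar to JSON, but the encoding is more compact and usually faster to parse, which makes data transfer cheaper. Implementations exist for many programming languages, including C++, and the C++ library can be built without extra dependencies such as Boost.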
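To make the size difference concrete, the sketch below hand-encodes the map {"a": 10, "b": 20} using three rules from the MessagePack format (fixmap, fixstr, positive fixint). The helper functions are hypothetical and cover only these three cases; real code would use msgpack-c instead.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helpers for illustration only; not part of msgpack-c.

// fixstr: one header byte 0xa0 | length (length <= 31), then the raw bytes.
void packFixStr(std::vector<std::uint8_t>& out, const std::string& s) {
    assert(s.size() <= 31);
    out.push_back(static_cast<std::uint8_t>(0xa0 | s.size()));
    out.insert(out.end(), s.begin(), s.end());
}

// positive fixint: values 0..127 are stored in a single byte.
void packFixInt(std::vector<std::uint8_t>& out, int v) {
    assert(v >= 0 && v <= 127);
    out.push_back(static_cast<std::uint8_t>(v));
}

// fixmap header: 0x80 | number of key/value pairs (at most 15).
void packFixMapHeader(std::vector<std::uint8_t>& out, std::size_t pairs) {
    assert(pairs <= 15);
    out.push_back(static_cast<std::uint8_t>(0x80 | pairs));
}

// Encodes the map {"a": 10, "b": 20}.
std::vector<std::uint8_t> encodeExample() {
    std::vector<std::uint8_t> out;
    packFixMapHeader(out, 2);
    packFixStr(out, "a");
    packFixInt(out, 10);
    packFixStr(out, "b");
    packFixInt(out, 20);
    return out;
}
```

The encoded map takes 7 bytes, while the equivalent JSON text {"a":10,"b":20} takes 15.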
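Installing MessagePack and ZeroMQ
Installing MessagePack
The C++ version of MessagePack can be installed in several ways, for example with a package manager or by building from source. The steps below build and install it with CMake:
- Download the MessagePack source:
git clone https://github.com/msgpack/msgpack-c.git
cd msgpack-c
- Create a build directory, then build and install:
mkdir build
cd build
cmake ..
make
sudo make install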
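Installing ZeroMQ
Downloading the ZeroMQ source
The latest source is available from ZeroMQ's official GitHub repository. Download it with git clone:
git clone https://github.com/zeromq/libzmq.git
cd libzmq
Building and installing
From the source directory, build and install as follows:
- Generate the configure script by running autogen.sh:
./autogen.sh
- Configure the build with the configure script. Options such as the install path can be set at this step:
./configure
To choose the install path (for example /usr/local), add the --prefix option:
./configure --prefix=/usr/local
- Compile the source with make:
make
- Install the compiled library files into the system with make install:
sudo make install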
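To run on embedded Linux, cross-compile instead, for example:
# Download the source
git clone https://github.com/zeromq/libzmq.git
cd libzmq
# Generate the configure script
./autogen.sh
# Set up the cross-compilation toolchain (example: ARM)
export CC=arm-linux-gnueabihf-gcc
export CXX=arm-linux-gnueabihf-g++
# Configure the build with non-essential features disabled:
# --without-libsodium turns off encryption, --without-docs skips the documentation
./configure --host=arm-linux-gnueabihf \
    --prefix=/opt/zmq-embedded \
    --without-libsodium \
    --without-docs
# Build and install
make -j4 && make install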
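Verifying the installation
After installation, a small ZeroMQ program is a quick way to check that everything works. The examples below include zmq.hpp, which comes from cppzmq, the header-only C++ binding for libzmq; install it separately if it is not already on the include path.
Server code (server.cpp)
#include <zmq.hpp>
#include <chrono>
#include <cstring>
#include <iostream>
#include <thread>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REP);
    socket.bind("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        // Wait for a client request
        auto received = socket.recv(request, zmq::recv_flags::none);
        if (!received) {
            continue;  // nothing was received, try again
        }

        // Process the request
        std::cout << "Received " << request.to_string() << std::endl;

        // Send the response
        zmq::message_t reply(5);
        std::memcpy(reply.data(), "World", 5);
        socket.send(reply, zmq::send_flags::none);

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    return 0;
}
Client code (client.cpp)
#include <zmq.hpp>
#include <cstring>
#include <iostream>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REQ);
    socket.connect("tcp://localhost:5555");

    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
        // Send "Hello"
        zmq::message_t request(5);
        std::memcpy(request.data(), "Hello", 5);
        socket.send(request, zmq::send_flags::none);

        // Wait for the reply
        zmq::message_t reply;
        auto received = socket.recv(reply, zmq::recv_flags::none);
        if (received) {
            std::cout << "Received " << reply.to_string() << std::endl;
        }
    }
    return 0;
}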
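Compiling the example programs
Compile both programs with g++, making sure to link against the ZeroMQ library:
g++ -o server server.cpp -lzmq
g++ -o client client.cpp -lzmq
Running the example programs
Start the server first, then run the client. The client sends requests to the server, and the server answers each one with a response.
./server
# In another terminal
./client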
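Implementing a simple RPC framework
Below is an example implementation of a simple RPC framework built on ZeroMQ and MessagePack. It supports registering remote methods, calling them, and calling them asynchronously.
Defining the RpcHandler type
// A handler receives the unpacked parameters and returns the result object
using RpcHandler = std::function<msgpack::object(const msgpack::object&)>;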
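Giving every method the same handler signature is what allows them all to live in one lookup table. The sketch below shows that idea with the standard library only: Payload is a stand-in for msgpack::object, and MethodTable is a hypothetical class, not part of the framework described in this article.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>

// Stand-in for msgpack::object so the sketch needs no external library.
using Payload = std::string;
using Handler = std::function<Payload(const Payload&)>;

// Hypothetical registry: maps a method name to its type-erased handler.
class MethodTable {
public:
    void registerMethod(const std::string& name, Handler handler) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_methods[name] = std::move(handler);
    }

    // Looks the handler up under the lock, then calls it outside the lock
    // so a slow handler does not block registration.
    Payload dispatch(const std::string& name, const Payload& params) {
        Handler handler;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            auto it = m_methods.find(name);
            if (it == m_methods.end()) {
                throw std::runtime_error("Method not found: " + name);
            }
            handler = it->second;
        }
        return handler(params);
    }

private:
    std::mutex m_mutex;
    std::unordered_map<std::string, Handler> m_methods;
};
```

Copying the handler out of the map before calling it keeps the critical section short, which is the same choice the request-handling code in this article makes.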
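Initializing the server and the client
bool LightweightRPC::initServer(const std::string& serverAddress) {
    try {
        // Create the server socket
        m_serverSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REP);

        // Bind it to the address
        m_serverSocket->bind(serverAddress);
        return true;
    } catch (const zmq::error_t& e) {
        std::cerr << "ZeroMQ error: " << e.what() << std::endl;
        return false;
    }
}

bool LightweightRPC::initClient(const std::string& serverAddress) {
    try {
        // Create the client socket
        m_clientSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REQ);

        // Allow a new request after one has timed out, and match replies to requests
        m_clientSocket->set(zmq::sockopt::req_relaxed, true);
        m_clientSocket->set(zmq::sockopt::req_correlate, true);

        // Do not block shutdown on requests that were never delivered
        m_clientSocket->set(zmq::sockopt::linger, 0);

        // Connect to the server
        m_clientSocket->connect(serverAddress);
        return true;
    } catch (const zmq::error_t& e) {
        std::cerr << "ZeroMQ error: " << e.what() << std::endl;
        return false;
    }
}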
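Registering and calling methods
void LightweightRPC::registerMethod(const std::string& methodName, RpcHandler handler) {
    std::lock_guard<std::mutex> lock(m_methodsMutex);
    m_methods[methodName] = std::move(handler);
}

msgpack::object LightweightRPC::callMethod(const std::string& methodName,
                                           const msgpack::object& params,
                                           int timeout_ms) {
    if (!m_clientSocket) {
        throw std::runtime_error("RPC client not initialized");
    }

    // A ZeroMQ socket must not be used from several threads at once
    static std::mutex socketMutex;
    std::lock_guard<std::mutex> lock(socketMutex);

    // Serialize the request
    zmq::message_t request = serializeRequest(methodName, params);

    // Set the receive timeout before sending
    m_clientSocket->set(zmq::sockopt::rcvtimeo, timeout_ms);

    // Send the request
    m_clientSocket->send(request, zmq::send_flags::none);

    // Receive the response; an empty result means the timeout expired
    zmq::message_t reply;
    auto result = m_clientSocket->recv(reply);
    if (!result) {
        throw std::runtime_error("RPC call timeout");
    }

    // Deserialize the response
    return deserializeResponse(reply);
}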
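Calling methods asynchronously
std::future<msgpack::object> LightweightRPC::callMethodAsync(const std::string& methodName,
                                                             const msgpack::object& params) {
    // Create the promise and its future
    auto promise = std::make_shared<std::promise<msgpack::object>>();
    auto future = promise->get_future();

    // Deep-copy the parameters so they outlive the caller's buffer
    auto ownedParams = std::make_shared<msgpack::object_handle>(msgpack::clone(params));

    // Call the method on a new thread
    std::thread([this, methodName, ownedParams, promise]() {
        try {
            // Scalar results are stored inline in msgpack::object; string, array,
            // and map results point into memory that does not outlive this thread
            msgpack::object result = this->callMethod(methodName, ownedParams->get());
            promise->set_value(result);
        } catch (...) {
            // current_exception() preserves the original exception type
            promise->set_exception(std::current_exception());
        }
    }).detach();

    return future;
}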
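The promise/future hand-off used for asynchronous calls can be tried in isolation. The sketch below is a standard-library stand-in: runAsync is a hypothetical helper, and task plays the role of the blocking RPC call. It also shows why std::current_exception() is the safer way to forward errors: std::make_exception_ptr(e) on a const std::exception& copies only the base class, so the original exception type is lost.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <thread>

// Hypothetical helper: runs `task` on a detached thread and reports its
// outcome, value or exception, through the returned future.
std::future<int> runAsync(std::function<int()> task) {
    auto promise = std::make_shared<std::promise<int>>();
    std::future<int> future = promise->get_future();

    std::thread([promise, task]() {
        try {
            promise->set_value(task());
        } catch (...) {
            // Forward the exception with its dynamic type intact.
            promise->set_exception(std::current_exception());
        }
    }).detach();

    return future;
}
```

Calling get() on the returned future either yields the value or rethrows the original exception, so the caller can still catch std::runtime_error specifically.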
Starting and stopping the server
void LightweightRPC::startServer(int threadCount) {
    if (!m_serverSocket) {
        throw std::runtime_error("RPC server not initialized");
    }

    // Mark the server as running
    m_running = true;

    // Create the worker threads
    for (int i = 0; i < threadCount; ++i) {
        m_workerThreads.emplace_back(&LightweightRPC::workerThread, this);
    }
}

void LightweightRPC::stopServer() {
    // Signal the workers to stop
    m_running = false;

    // Wait for all worker threads to finish
    for (auto& thread : m_workerThreads) {
        if (thread.joinable()) {
            thread.join();
        }
    }

    // Discard the finished threads
    m_workerThreads.clear();
}
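The start/stop logic is a common pattern: workers poll a shared atomic flag, and stopping means clearing the flag and joining every thread. A minimal standard-library sketch follows; WorkerPool and its members are hypothetical names, and the loop body is a placeholder for polling the socket and handling a request.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical worker pool mirroring the start/stop pattern.
class WorkerPool {
public:
    ~WorkerPool() { stop(); }

    void start(int threadCount) {
        stop();  // makes start() safe to call twice
        m_running = true;
        for (int i = 0; i < threadCount; ++i) {
            m_threads.emplace_back([this] {
                while (m_running) {
                    ++m_iterations;  // placeholder for real work
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                }
            });
        }
    }

    void stop() {
        m_running = false;  // workers see this on their next loop check
        for (auto& t : m_threads) {
            if (t.joinable()) {
                t.join();
            }
        }
        m_threads.clear();
    }

    std::size_t threadCount() const { return m_threads.size(); }
    long iterations() const { return m_iterations.load(); }

private:
    std::atomic<bool> m_running{false};
    std::atomic<long> m_iterations{0};
    std::vector<std::thread> m_threads;
};
```

Because the workers only check the flag between iterations, stop() can take up to one iteration to return; this is why the loop body should never block indefinitely.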
How the server handles a request
void LightweightRPC::handleRequest(zmq::message_t& request, zmq::message_t& reply) {
    try {
        // Deserialize the request
        auto [methodName, params] = deserializeRequest(request);

        // Look up the handler while holding the lock
        RpcHandler handler;
        {
            std::lock_guard<std::mutex> lock(m_methodsMutex);
            auto it = m_methods.find(methodName);
            if (it == m_methods.end()) {
                reply = serializeError("Method not found: " + methodName);
                return;
            }
            handler = it->second;
        }

        // Call the handler outside the lock
        msgpack::object result = handler(params);

        // Serialize the response
        reply = serializeResponse(result);
    } catch (const std::exception& e) {
        reply = serializeError(std::string("RPC Error: ") + e.what());
    }
}
Serializing and deserializing messages
zmq::message_t LightweightRPC::serializeRequest(const std::string& methodName,
                                                const msgpack::object& params) {
    msgpack::sbuffer sbuf;
    msgpack::packer<msgpack::sbuffer> packer(sbuf);

    // A request is a map with two entries: "method" and "params"
    packer.pack_map(2);
    packer.pack("method");
    packer.pack(methodName);
    packer.pack("params");
    packer.pack(params);

    return zmq::message_t(sbuf.data(), sbuf.size());
}

std::pair<std::string, msgpack::object> LightweightRPC::deserializeRequest(const zmq::message_t& request) {
    // The handle is thread-local so the returned params object stays valid
    // until this thread unpacks its next request
    thread_local msgpack::object_handle oh;
    oh = msgpack::unpack(static_cast<const char*>(request.data()), request.size());
    msgpack::object obj = oh.get();

    std::string methodName;
    msgpack::object params;

    if (obj.type != msgpack::type::MAP || obj.via.map.size != 2) {
        throw std::runtime_error("Invalid RPC request format");
    }

    for (uint32_t i = 0; i < obj.via.map.size; ++i) {
        auto key = obj.via.map.ptr[i].key;
        auto val = obj.via.map.ptr[i].val;
        if (key.type == msgpack::type::STR) {
            std::string keyStr(key.via.str.ptr, key.via.str.size);
            if (keyStr == "method") {
                if (val.type != msgpack::type::STR) {
                    throw std::runtime_error("Method name must be a string");
                }
                methodName = std::string(val.via.str.ptr, val.via.str.size);
            } else if (keyStr == "params") {
                params = val;
            }
        }
    }

    if (methodName.empty()) {
        throw std::runtime_error("Method name not found in RPC request");
    }
    return std::make_pair(methodName, params);
}

zmq::message_t LightweightRPC::serializeResponse(const msgpack::object& result) {
    msgpack::sbuffer sbuf;
    msgpack::packer<msgpack::sbuffer> packer(sbuf);

    // A successful response is a map with a single "result" entry
    packer.pack_map(1);
    packer.pack("result");
    packer.pack(result);

    return zmq::message_t(sbuf.data(), sbuf.size());
}

msgpack::object LightweightRPC::deserializeResponse(const zmq::message_t& response) {
    // The handle is thread-local so the returned object stays valid
    // until this thread unpacks its next response
    thread_local msgpack::object_handle oh;
    oh = msgpack::unpack(static_cast<const char*>(response.data()), response.size());
    msgpack::object obj = oh.get();

    if (obj.type != msgpack::type::MAP || obj.via.map.size != 1) {
        throw std::runtime_error("Invalid RPC response format");
    }

    auto key = obj.via.map.ptr[0].key;
    auto val = obj.via.map.ptr[0].val;
    if (key.type == msgpack::type::STR) {
        std::string keyStr(key.via.str.ptr, key.via.str.size);
        if (keyStr == "result") {
            return val;
        } else if (keyStr == "error") {
            if (val.type != msgpack::type::STR) {
                throw std::runtime_error("Unknown RPC error");
            }
            std::string errorMsg(val.via.str.ptr, val.via.str.size);
            throw std::runtime_error(errorMsg);
        }
    }
    throw std::runtime_error("Invalid RPC response format");
}

zmq::message_t LightweightRPC::serializeError(const std::string& errorMessage) {
    msgpack::sbuffer sbuf;
    msgpack::packer<msgpack::sbuffer> packer(sbuf);

    // A failed response is a map with a single "error" entry
    packer.pack_map(1);
    packer.pack("error");
    packer.pack(errorMessage);

    return zmq::message_t(sbuf.data(), sbuf.size());
}
Usage example
#include "lightweight_rpc.h"
#include <iostream>
#include <string>
#include <thread>
#include <chrono>

// Example RPC method: addition
int add(int a, int b) {
    std::cout << "RPC Server: Calculating " << a << " + " << b << std::endl;
    return a + b;
}

// Example RPC method: string concatenation
std::string concat(const std::string& a, const std::string& b) {
    std::cout << "RPC Server: Concatenating \"" << a << "\" and \"" << b << "\"" << std::endl;
    return a + b;
}

// Server example
void runServer() {
    try {
        std::cout << "Starting RPC server..." << std::endl;

        // Create the RPC server
        hub::RpcServer server("tcp://*:5555");

        // Register the RPC methods
        server.registerMethod("add", add);
        server.registerMethod("concat", concat);

        // Start the server
        server.start();
        std::cout << "RPC server started. Press Ctrl+C to stop." << std::endl;

        while (true) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    } catch (const std::exception& e) {
        std::cerr << "Server error: " << e.what() << std::endl;
    }
}

// Client example
void runClient() {
    try {
        std::cout << "Starting RPC client..." << std::endl;

        // Create the RPC client
        hub::RpcClient client("tcp://localhost:5555");

        // Call the addition method with a 2000 ms timeout
        auto result1 = client.call<int, int>("add", 2000, 10, 20);
        int sum;
        result1.convert(sum);
        std::cout << "RPC Client: 10 + 20 = " << sum << std::endl;

        // Call the string concatenation method
        auto result2 = client.call<std::string, std::string>("concat", 2000, "Hello, ", "World!");
        std::string concatResult;
        result2.convert(concatResult);
        std::cout << "RPC Client: concat result = " << concatResult << std::endl;

        // Asynchronous call example
        auto future = client.callAsync<int, int>("add", 30, 40);
        std::cout << "RPC Client: Async call made, waiting for result..." << std::endl;

        // Wait for the result
        auto asyncResult = future.get();
        int asyncSum;
        asyncResult.convert(asyncSum);
        std::cout << "RPC Client: Async result 30 + 40 = " << asyncSum << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "Client error: " << e.what() << std::endl;
    }
}

int main(int argc, char* argv[]) {
    if (argc < 2) {
        std::cerr << "Usage: " << argv[0] << " [server|client]" << std::endl;
        return 1;
    }

    std::string mode = argv[1];
    if (mode == "server") {
        runServer();
    } else if (mode == "client") {
        runClient();
    } else {
        std::cerr << "Invalid mode. Use 'server' or 'client'." << std::endl;
        return 1;
    }
    return 0;
}
Complete source code
Header (lightweight_rpc.h)
#ifndef LIGHTWEIGHT_RPC_H
#define LIGHTWEIGHT_RPC_H

#include <string>
#include <functional>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <vector>
#include <future>
#include <thread>
#include <atomic>
#include <stdexcept>
#include <tuple>
#include <type_traits>
#include <utility>
#include <zmq.hpp>
#include <msgpack.hpp>

namespace hub {

/**
 * @brief Lightweight RPC framework based on ZeroMQ and MessagePack
 */
class LightweightRPC {
public:
    // Type of an RPC method handler
    using RpcHandler = std::function<msgpack::object(const msgpack::object&)>;

    /**
     * @brief Get the singleton instance of LightweightRPC
     *
     * @return LightweightRPC& the singleton instance
     */
    static LightweightRPC& getInstance();

    /**
     * @brief Initialize the RPC server
     *
     * @param serverAddress server address, for example "tcp://*:5555"
     * @return true if initialization succeeded, false otherwise
     */
    bool initServer(const std::string& serverAddress);

    /**
     * @brief Initialize the RPC client
     *
     * @param serverAddress server address, for example "tcp://localhost:5555"
     * @return true if initialization succeeded, false otherwise
     */
    bool initClient(const std::string& serverAddress);

    /**
     * @brief Register an RPC method
     *
     * @param methodName method name
     * @param handler handler function
     */
    void registerMethod(const std::string& methodName, RpcHandler handler);

    /**
     * @brief Call a remote method
     *
     * @param methodName method name
     * @param params parameters
     * @param timeout_ms timeout in milliseconds
     * @return msgpack::object the result
     */
    msgpack::object callMethod(const std::string& methodName, const msgpack::object& params, int timeout_ms = 1000);

    /**
     * @brief Call a remote method asynchronously
     *
     * @param methodName method name
     * @param params parameters
     * @return std::future<msgpack::object> future holding the result
     */
    std::future<msgpack::object> callMethodAsync(const std::string& methodName, const msgpack::object& params);

    /**
     * @brief Start the RPC server
     *
     * @param threadCount number of worker threads
     */
    void startServer(int threadCount = 1);

    /**
     * @brief Stop the RPC server
     */
    void stopServer();

    /**
     * @brief Close the RPC client
     */
    void closeClient();

private:
    LightweightRPC();
    ~LightweightRPC();
    LightweightRPC(const LightweightRPC&) = delete;
    LightweightRPC& operator=(const LightweightRPC&) = delete;

    // Shared state; the context is declared first so it is destroyed last
    std::unique_ptr<zmq::context_t> m_context;
    std::unique_ptr<zmq::socket_t> m_serverSocket;
    std::unique_ptr<zmq::socket_t> m_clientSocket;
    std::unordered_map<std::string, RpcHandler> m_methods;
    std::mutex m_methodsMutex;
    std::vector<std::thread> m_workerThreads;
    std::atomic<bool> m_running;

    // Worker thread function
    void workerThread();

    // Handle one RPC request
    void handleRequest(zmq::message_t& request, zmq::message_t& reply);

    // Serialize an RPC request
    zmq::message_t serializeRequest(const std::string& methodName, const msgpack::object& params);

    // Deserialize an RPC request
    std::pair<std::string, msgpack::object> deserializeRequest(const zmq::message_t& request);

    // Serialize an RPC response
    zmq::message_t serializeResponse(const msgpack::object& result);

    // Deserialize an RPC response
    msgpack::object deserializeResponse(const zmq::message_t& response);

    // Serialize an RPC error
    zmq::message_t serializeError(const std::string& errorMessage);
};

// Convenience RPC client class
class RpcClient {
public:
    /**
     * @brief Constructor
     *
     * @param serverAddress server address
     */
    RpcClient(const std::string& serverAddress);

    /**
     * @brief Destructor
     */
    ~RpcClient();

    /**
     * @brief Call a remote method
     *
     * @param methodName method name
     * @param timeout_ms timeout in milliseconds
     * @param args arguments forwarded to the remote method
     * @return msgpack::object the result
     */
    template <typename... Args>
    msgpack::object call(const std::string& methodName, int timeout_ms = 1000, Args... args) {
        // Pack the arguments as a MessagePack array
        msgpack::type::tuple<Args...> params(args...);
        msgpack::sbuffer sbuf;
        msgpack::pack(sbuf, params);
        msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());
        msgpack::object obj = oh.get();

        // Call the method; oh stays alive for the whole synchronous call
        return LightweightRPC::getInstance().callMethod(methodName, obj, timeout_ms);
    }

    /**
     * @brief Call a remote method asynchronously
     *
     * @param methodName method name
     * @param args arguments forwarded to the remote method
     * @return std::future<msgpack::object> future holding the result
     */
    template <typename... Args>
    std::future<msgpack::object> callAsync(const std::string& methodName, Args... args) {
        // Pack the arguments as a MessagePack array
        msgpack::type::tuple<Args...> params(args...);
        msgpack::sbuffer sbuf;
        msgpack::pack(sbuf, params);
        msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());
        msgpack::object obj = oh.get();

        // Call the method asynchronously
        return LightweightRPC::getInstance().callMethodAsync(methodName, obj);
    }
};

// Convenience RPC server class
class RpcServer {
public:
    /**
     * @brief Constructor
     *
     * @param serverAddress server address
     * @param threadCount number of worker threads
     */
    RpcServer(const std::string& serverAddress, int threadCount = 1);

    /**
     * @brief Destructor
     */
    ~RpcServer();

    /**
     * @brief Register a plain function as an RPC method
     *
     * @param methodName method name
     * @param handler pointer to the handler function
     */
    template <typename R, typename... Args>
    void registerMethod(const std::string& methodName, R (*handler)(Args...)) {
        LightweightRPC::getInstance().registerMethod(
            methodName, [handler](const msgpack::object& params) -> msgpack::object {
                // Invoke the handler with the unpacked parameters
                return invokeHandler(handler, params);
            });
    }

    /**
     * @brief Start the RPC server
     */
    void start();

    /**
     * @brief Stop the RPC server
     */
    void stop();

private:
    int m_threadCount;

    // Call the handler and return its packed result. Exceptions propagate to
    // the caller, which reports them to the client as an RPC error.
    template <typename R, typename... Args>
    static msgpack::object invokeHandler(R (*handler)(Args...), const msgpack::object& params) {
        // Unpack the parameters; Args is deduced from the handler's signature
        std::tuple<typename std::decay<Args>::type...> args;
        params.convert(args);

        // Call the handler
        R result = callHandlerWithTuple(handler, args);

        // Pack the result. The handle is thread-local so the returned object
        // stays valid until this thread packs its next result.
        msgpack::sbuffer sbuf;
        msgpack::pack(sbuf, result);
        thread_local msgpack::object_handle packed;
        packed = msgpack::unpack(sbuf.data(), sbuf.size());
        return packed.get();
    }

    // Call the handler with the elements of a tuple as arguments
    template <typename Func, typename Tuple, std::size_t... I>
    static auto callHandlerWithTupleImpl(Func handler, Tuple&& tuple, std::index_sequence<I...>) {
        return handler(std::get<I>(std::forward<Tuple>(tuple))...);
    }

    template <typename Func, typename Tuple>
    static auto callHandlerWithTuple(Func handler, Tuple&& tuple) {
        constexpr auto size = std::tuple_size<typename std::decay<Tuple>::type>::value;
        return callHandlerWithTupleImpl(handler, std::forward<Tuple>(tuple), std::make_index_sequence<size>{});
    }
};

} // namespace hub

#endif // LIGHTWEIGHT_RPC_H
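Registering a plain function such as add or concat requires recovering its parameter types, so that the packed arguments can be unpacked into a tuple and forwarded to it. The sketch below shows that step with the standard library only, assuming handlers are plain function pointers. applyTuple and applyImpl are hypothetical helper names; in C++17, std::apply covers the forwarding part.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>

// Expands the tuple elements into individual call arguments.
template <typename R, typename... Args, std::size_t... I>
R applyImpl(R (*fn)(Args...),
            const std::tuple<typename std::decay<Args>::type...>& args,
            std::index_sequence<I...>) {
    return fn(std::get<I>(args)...);
}

// Hypothetical helper: R and Args... are deduced from the function pointer,
// so the caller never spells out the parameter list. The tuple stores
// decayed types, e.g. std::string for a `const std::string&` parameter.
template <typename R, typename... Args>
R applyTuple(R (*fn)(Args...),
             const std::tuple<typename std::decay<Args>::type...>& args) {
    return applyImpl(fn, args, std::index_sequence_for<Args...>{});
}

int add(int a, int b) { return a + b; }

std::string concat(const std::string& a, const std::string& b) { return a + b; }
```

For example, applyTuple(add, std::make_tuple(10, 20)) calls add(10, 20). Deducing the types from a function pointer does not work for lambdas or functors with overloaded call operators; those would need a separate traits helper.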
Implementation
#include "lightweight_rpc.h"
#include <iostream>
#include <chrono>
#include <stdexcept>
#include <utility>

namespace hub {

// Singleton instance
LightweightRPC& LightweightRPC::getInstance() {
    static LightweightRPC instance;
    return instance;
}

LightweightRPC::LightweightRPC() : m_running(false) {
    // Create the ZeroMQ context
    m_context = std::make_unique<zmq::context_t>(1);
}

LightweightRPC::~LightweightRPC() {
    // Stop the server
    stopServer();

    // Close the client
    closeClient();
}

bool LightweightRPC::initServer(const std::string& serverAddress) {
    try {
        // Create the server socket
        m_serverSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REP);

        // Bind it to the address
        m_serverSocket->bind(serverAddress);
        return true;
    } catch (const zmq::error_t& e) {
        std::cerr << "ZeroMQ error: " << e.what() << std::endl;
        return false;
    }
}

bool LightweightRPC::initClient(const std::string& serverAddress) {
    try {
        // Create the client socket
        m_clientSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REQ);

        // Allow a new request after one has timed out, and match replies to requests
        m_clientSocket->set(zmq::sockopt::req_relaxed, true);
        m_clientSocket->set(zmq::sockopt::req_correlate, true);

        // Do not block shutdown on requests that were never delivered
        m_clientSocket->set(zmq::sockopt::linger, 0);

        // Connect to the server
        m_clientSocket->connect(serverAddress);
        return true;
    } catch (const zmq::error_t& e) {
        std::cerr << "ZeroMQ error: " << e.what() << std::endl;
        return false;
    }
}

void LightweightRPC::registerMethod(const std::string& methodName, RpcHandler handler) {
    std::lock_guard<std::mutex> lock(m_methodsMutex);
    m_methods[methodName] = std::move(handler);
}

msgpack::object LightweightRPC::callMethod(const std::string& methodName,
                                           const msgpack::object& params,
                                           int timeout_ms) {
    if (!m_clientSocket) {
        throw std::runtime_error("RPC client not initialized");
    }

    // A ZeroMQ socket must not be used from several threads at once
    static std::mutex socketMutex;
    std::lock_guard<std::mutex> lock(socketMutex);

    // Serialize the request
    zmq::message_t request = serializeRequest(methodName, params);

    // Set the receive timeout before sending
    m_clientSocket->set(zmq::sockopt::rcvtimeo, timeout_ms);

    // Send the request
    m_clientSocket->send(request, zmq::send_flags::none);

    // Receive the response; an empty result means the timeout expired
    zmq::message_t reply;
    auto result = m_clientSocket->recv(reply);
    if (!result) {
        throw std::runtime_error("RPC call timeout");
    }

    // Deserialize the response; RPC errors are reported as exceptions
    return deserializeResponse(reply);
}

std::future<msgpack::object> LightweightRPC::callMethodAsync(const std::string& methodName,
                                                             const msgpack::object& params) {
    // Create the promise and its future
    auto promise = std::make_shared<std::promise<msgpack::object>>();
    auto future = promise->get_future();

    // Deep-copy the parameters so they outlive the caller's buffer
    auto ownedParams = std::make_shared<msgpack::object_handle>(msgpack::clone(params));

    // Call the method on a new thread
    std::thread([this, methodName, ownedParams, promise]() {
        try {
            // Scalar results are stored inline in msgpack::object; string, array,
            // and map results point into memory that does not outlive this thread
            msgpack::object result = this->callMethod(methodName, ownedParams->get());
            promise->set_value(result);
        } catch (...) {
            // current_exception() preserves the original exception type
            promise->set_exception(std::current_exception());
        }
    }).detach();

    return future;
}

void LightweightRPC::startServer(int threadCount) {
    if (!m_serverSocket) {
        throw std::runtime_error("RPC server not initialized");
    }

    // Stop any server that is already running
    stopServer();

    // Mark the server as running
    m_running = true;

    // Create the worker threads
    for (int i = 0; i < threadCount; ++i) {
        m_workerThreads.emplace_back(&LightweightRPC::workerThread, this);
    }
}

void LightweightRPC::stopServer() {
    // Signal the workers to stop
    m_running = false;

    // Wait for all worker threads to finish
    for (auto& thread : m_workerThreads) {
        if (thread.joinable()) {
            thread.join();
        }
    }

    // Discard the finished threads
    m_workerThreads.clear();
}

void LightweightRPC::closeClient() {
    // Close the client socket
    m_clientSocket.reset();
}

void LightweightRPC::workerThread() {
    // ZeroMQ sockets are not thread-safe, so workers take turns on the REP socket
    static std::mutex socketMutex;

    while (m_running) {
        bool handled = false;
        try {
            std::lock_guard<std::mutex> lock(socketMutex);

            // Receive a request without blocking
            zmq::message_t request;
            auto result = m_serverSocket->recv(request, zmq::recv_flags::dontwait);
            if (result) {
                // Handle the request
                zmq::message_t reply;
                handleRequest(request, reply);

                // Send the response
                m_serverSocket->send(reply, zmq::send_flags::none);
                handled = true;
            }
        } catch (const std::exception& e) {
            std::cerr << "RPC server error: " << e.what() << std::endl;
        }

        if (!handled) {
            // No request was pending; sleep briefly before polling again
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }
}

void LightweightRPC::handleRequest(zmq::message_t& request, zmq::message_t& reply) {
    try {
        // Deserialize the request
        auto [methodName, params] = deserializeRequest(request);

        // Look up the handler while holding the lock
        RpcHandler handler;
        {
            std::lock_guard<std::mutex> lock(m_methodsMutex);
            auto it = m_methods.find(methodName);
            if (it == m_methods.end()) {
                // The method does not exist
                reply = serializeError("Method not found: " + methodName);
                return;
            }
            handler = it->second;
        }

        // Call the handler outside the lock
        msgpack::object result = handler(params);

        // Serialize the response
        reply = serializeResponse(result);
    } catch (const std::exception& e) {
        // Report the failure to the client
        reply = serializeError(std::string("RPC Error: ") + e.what());
    }
}

zmq::message_t LightweightRPC::serializeRequest(const std::string& methodName,
                                                const msgpack::object& params) {
    msgpack::sbuffer sbuf;
    msgpack::packer<msgpack::sbuffer> packer(sbuf);

    // A request is a map with two entries: "method" and "params"
    packer.pack_map(2);
    packer.pack("method");
    packer.pack(methodName);
    packer.pack("params");
    packer.pack(params);

    // Copy the buffer into a ZeroMQ message
    return zmq::message_t(sbuf.data(), sbuf.size());
}

std::pair<std::string, msgpack::object> LightweightRPC::deserializeRequest(const zmq::message_t& request) {
    // The handle is thread-local so the returned params object stays valid
    // until this thread unpacks its next request
    thread_local msgpack::object_handle oh;
    oh = msgpack::unpack(static_cast<const char*>(request.data()), request.size());
    msgpack::object obj = oh.get();

    // Extract the method name and the parameters
    std::string methodName;
    msgpack::object params;

    if (obj.type != msgpack::type::MAP || obj.via.map.size != 2) {
        throw std::runtime_error("Invalid RPC request format");
    }

    for (uint32_t i = 0; i < obj.via.map.size; ++i) {
        auto key = obj.via.map.ptr[i].key;
        auto val = obj.via.map.ptr[i].val;
        if (key.type == msgpack::type::STR) {
            std::string keyStr(key.via.str.ptr, key.via.str.size);
            if (keyStr == "method") {
                if (val.type != msgpack::type::STR) {
                    throw std::runtime_error("Method name must be a string");
                }
                methodName = std::string(val.via.str.ptr, val.via.str.size);
            } else if (keyStr == "params") {
                params = val;
            }
        }
    }

    if (methodName.empty()) {
        throw std::runtime_error("Method name not found in RPC request");
    }
    return std::make_pair(methodName, params);
}

zmq::message_t LightweightRPC::serializeResponse(const msgpack::object& result) {
    msgpack::sbuffer sbuf;
    msgpack::packer<msgpack::sbuffer> packer(sbuf);

    // A successful response is a map with a single "result" entry
    packer.pack_map(1);
    packer.pack("result");
    packer.pack(result);

    // Copy the buffer into a ZeroMQ message
    return zmq::message_t(sbuf.data(), sbuf.size());
}

msgpack::object LightweightRPC::deserializeResponse(const zmq::message_t& response) {
    // The handle is thread-local so the returned object stays valid
    // until this thread unpacks its next response
    thread_local msgpack::object_handle oh;
    oh = msgpack::unpack(static_cast<const char*>(response.data()), response.size());
    msgpack::object obj = oh.get();

    // Extract the result
    if (obj.type != msgpack::type::MAP || obj.via.map.size != 1) {
        throw std::runtime_error("Invalid RPC response format");
    }

    auto key = obj.via.map.ptr[0].key;
    auto val = obj.via.map.ptr[0].val;
    if (key.type == msgpack::type::STR) {
        std::string keyStr(key.via.str.ptr, key.via.str.size);
        if (keyStr == "result") {
            return val;
        } else if (keyStr == "error") {
            if (val.type == msgpack::type::STR) {
                std::string errorMsg(val.via.str.ptr, val.via.str.size);
                throw std::runtime_error(errorMsg);
            } else {
                throw std::runtime_error("Unknown RPC error");
            }
        }
    }
    throw std::runtime_error("Invalid RPC response format");
}

zmq::message_t LightweightRPC::serializeError(const std::string& errorMessage) {
    msgpack::sbuffer sbuf;
    msgpack::packer<msgpack::sbuffer> packer(sbuf);

    // A failed response is a map with a single "error" entry
    packer.pack_map(1);
    packer.pack("error");
    packer.pack(errorMessage);

    // Copy the buffer into a ZeroMQ message
    return zmq::message_t(sbuf.data(), sbuf.size());
}

// RpcClient implementation
RpcClient::RpcClient(const std::string& serverAddress) {
    if (!LightweightRPC::getInstance().initClient(serverAddress)) {
        throw std::runtime_error("Failed to initialize RPC client");
    }
}

RpcClient::~RpcClient() {
    LightweightRPC::getInstance().closeClient();
}

// RpcServer implementation
RpcServer::RpcServer(const std::string& serverAddress, int threadCount) : m_threadCount(threadCount) {
    if (!LightweightRPC::getInstance().initServer(serverAddress)) {
        throw std::runtime_error("Failed to initialize RPC server");
    }
}

RpcServer::~RpcServer() {
    stop();
}

void RpcServer::start() {
    LightweightRPC::getInstance().startServer(m_threadCount);
}

void RpcServer::stop() {
    LightweightRPC::getInstance().stopServer();
}

} // namespace hub
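Summary
The steps above produce a simple RPC framework built on ZeroMQ and MessagePack. It supports registering remote methods, calling them, and calling them asynchronously, and it relies on ZeroMQ for efficient transport and on MessagePack for compact encoding. Hopefully this article helps you understand ZeroMQ and MessagePack better and use them for efficient distributed communication.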