目录
Requestor类的实现
框架
完善
onResponse处理回复
完整代码
RpcCaller类的实现
1. 同步调用 call
2. 异步调用 call
3. 回调调用 call
Requestor类的实现
(1)主要功能:
- 客户端发送请求的功能,进行请求描述
- 对服务器响应的处理机制,并对返回信息 进行对应接收
(2)具体实现:
- 意义:针对客户端的每一条请求进行管理,以便于对请求对应的响应做出合适操作。
- 对于客户端而言,其通常是主动发起请求服务的一方。然而,在多线程网络通信中,针对多个请求进行响应时可能会存在时序问题,导致无法保证一个线程发送的请求后接收到的响应就是针对这条请求的响应,这是非常危险的情况。
- 异步IO挑战:类似于Muduo库这种异步IO网络通信库,通常IO操作都是异步的,即发送数据是将数据放入发送缓冲区,而何时发送由底层网络库协调,并且不提供
recv
接口,而是连接触发可读事件后,IO读取数据完成调用处理回调进行数据处理,因此在发送请求后无法直接等待该条请求的响应。
解决方案:
- 创建请求管理模块,通过给每个请求设定一个请求ID来解决上述问题。服务端响应时会标识出响应针对的是哪个请求(即响应信息包含请求ID)。
- 客户端无论收到 哪条请求的响应,都将数据存储入hash_map中,以请求ID作为映射,并提供获取指定请求ID响应的阻塞接口。这样,只要知道自己的请求ID,就能准确获取到想要的响应。
- 进一步优化:可以将每个请求封装描述,添加异步future控制或设置回调函数的方式,不仅支持阻塞获取响应,也能实现异步获取响应及 回调处理响应。
框架
namespace bitrpc{
namespace client
{//客户端 部分class Requestor{public:using ptr = std::shared_ptr<Requestor>;using RequestCallback = std::function<void(const BaseMessage::ptr&)>;using AsyncResponse = std::future<BaseMessage::ptr>;struct RequestDescribe{using ptr=std::shared_ptr<RequestDescribe>;//智能指针 管理};//请求 信息描述//之后 好调用 所需要的rsp函数//Dispatcher调用void onResponse(const BaseConnection::ptr &conn,BaseMessage::ptr &msg){}//异步发送bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp){}//同步发送bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp) {}//回调发送bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb) {}private://对于 请求 描述进行CURD//增RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback()) {}//查 ridRequestDescribe::ptr getDescribe(const std::string &rid){}//删void delDescribe(const std::string &rid){}private:std::mutex _mutex;std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;};
}
}
完善
信息描述
struct RequestDescribe {using ptr = std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;RType rtype;std::promise<BaseMessage::ptr> response;RequestCallback callback;};
对收到的响应 通过 uid ,对应上是哪个请求发出的
实现了上面的解决问题
void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg){std::string rid = msg->rid();RequestDescribe::ptr rdp = getDescribe(rid);//根据先获取msg->rid() 来进行结果的调用if (rdp.get() == nullptr) {ELOG("收到响应 - %s,但是未找到对应的请求描述!", rid.c_str());return;}if (rdp->rtype == RType::REQ_ASYNC) {rdp->response.set_value(msg);//调用 不同的接口}else if (rdp->rtype == RType::REQ_CALLBACK){if (rdp->callback) rdp->callback(msg);}else {ELOG("请求类型未知!!");}delDescribe(rid);
}
onResponse
处理回复
onResponse
方法是对收到的消息进行处理的入口点。当收到服务器的响应时,该方法会被调用来匹配相应的请求描述(RequestDescribe
),并通过请求类型(RType
)来决定如何处理响应:
-
- 如果是 异步请求(
RType::REQ_ASYNC
),则通过设置std::promise
的值(response.set_value(msg)
)来完成对应的std::future
,使得调用者可以通过未来对象获取响应。 - 如果是带有回调的请求(
RType::REQ_CALLBACK
),则直接调用注册的回调函数(rdp->callback(msg)
)来处理响应。 - 如果请求类型未知,则记录错误日志。
- 如果是 异步请求(
onResponse
方法则是 对接收到的响应进行处理,
关于 promise set_value: C++11 异步操作 future类_文档学习
send
方法负责构建和发送请求,
//异步操作
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp)
{RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);if (rdp.get() == nullptr){ELOG("构造请求描述对象失败!");return false;}conn->send(req);async_rsp = rdp->response.get_future();return true;
}
//同步操作
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp)
{AsyncResponse rsp_future;bool ret = send(conn, req, rsp_future);if (ret == false){return false;}rsp = rsp_future.get();return true;
}
//回调函数
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb)
{RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() == nullptr){ELOG("构造请求描述对象失败!");return false;}conn->send(req);//!!!!!!!!!!!!!!!!return true;
}
1.
前文回顾:[C++#28][多态] 两个条件 | 虚函数表 | 抽象类 | override 和 final | 重载 重写 重定义
Requestor
类中的 三个重载send
方法,这些方法用于通过指定的连接对象发送消息
同:
- 基本参数:所有三个
send
方法都接受两个相同的基本参数:一个指向BaseConnection
的智能指针conn
(表示网络连接)和一个指向BaseMessage
的智能指针req
(表示要发送的消息)。 - 错误处理:每个
send
方法在无法成功创建请求描述对象时,都会记录错误日志并返回false
,指示操作失败。 - 消息发送:无论哪种方式,最终都是通过调用
conn->send(req)
来执行实际的消息发送。
异:
send
方法(有三个重载版本)用于通过网络连接conn
发送请求消息req
到服务器:
-
- 第一个
send
方法接受一个AsyncResponse &async_rsp
参数,用于异步发送请求并返回一个std::future
对象,以便于后续获取响应。(不阻塞 - 第二个
send
方法是同步的,它 等待直到接收到服务器的响应并将结果赋值给BaseMessage::ptr &rsp
。 - 第三个
send
方法允许用户在发送请求时提供一个回调函数const RequestCallback &cb
,当收到响应时会自动调用该回调进行处理。(send 后,就不管了,不会阻塞等待)
- 第一个
📒对比第一种和第三种方式:
- 结果获取方式:
-
- 第一种方法需调用者主动通过
future.get()
获取结果,可能导致阻塞。 - 第三种方法响应到达时自动调用回调函数处理结果,无需主动获取。
- 第一种方法需调用者主动通过
- 编程模型:
-
- 第一种更接近同步编程风格,通过异步手段避免长时间阻塞。
- 第三种是典型的异步编程模型,更适合处理并发任务和事件驱动架构。
- 灵活性与复杂性:
-
- 第一种方法直观但可能引入复杂的依赖关系管理。
- 第三种方法灵活,尤其适合链式异步操作,但可能导致“回调地狱”,增加代码维护难度。
<id,请求 desc> CURD
private:// 对于 请求 描述进行CURD//增RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype,const RequestCallback &cb = RequestCallback()){std::unique_lock<std::mutex> lock(_mutex);//加锁RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();rd->request = req;rd->rtype = rtype;if (rtype == RType::REQ_CALLBACK && cb){rd->callback = cb;}_request_desc.insert(std::make_pair(req->GetId(), rd));//将id 和描述 进行对应return rd;}//查RequestDescribe::ptr getDescribe(const std::string &rid){std::unique_lock<std::mutex> lock(_mutex);auto it = _request_desc.find(rid);if (it == _request_desc.end()){return RequestDescribe::ptr();}return it->second;}//删void delDescribe(const std::string &rid){std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}
完整代码
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future> //异步操作
#include <functional> //1.灵活的函数使用 bind functionnamespace bitrpc{
namespace client
{//客户端 部分class Requestor{public:using ptr = std::shared_ptr<Requestor>;using RequestCallback = std::function<void(const BaseMessage::ptr&)>;using AsyncResponse = std::future<BaseMessage::ptr>;//异步处理信息struct RequestDescribe {using ptr = std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;RType rtype;std::promise<BaseMessage::ptr> response;RequestCallback callback;};//请求 信息描述//之后 好调用 所需要的rsp函数//Dispatcher 给RSP_RPC回复调用的void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg){std::string rid = msg->GetId();RequestDescribe::ptr rdp = getDescribe(rid);if (rdp.get() == nullptr) {ELOG("收到响应 - %s,但是未找到对应的请求描述!", rid.c_str());return;}if (rdp->rtype == RType::REQ_ASYNC) {rdp->response.set_value(msg);}else if (rdp->rtype == RType::REQ_CALLBACK){if (rdp->callback) rdp->callback(msg);}else {ELOG("请求类型未知!!");}delDescribe(rid);}
//!!!!!!!对收到 的回复请求 进行id存储//异步详可见demo中的 使用//异步操作bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp){RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);if (rdp.get() == nullptr){ELOG("构造请求描述对象失败!");return false;}conn->send(req);async_rsp = rdp->response.get_future();return true;}//同步操作bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp){AsyncResponse rsp_future;bool ret = send(conn, req, rsp_future);if (ret == false){return false;}rsp = rsp_future.get();return true;}//回调函数bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb){RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() == nullptr){ELOG("构造请求描述对象失败!");return false;}conn->send(req);return true;}//private:// <id,请求 desc> CURD//增RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype,const RequestCallback &cb = RequestCallback()){std::unique_lock<std::mutex> lock(_mutex);RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();rd->request = req;rd->rtype = rtype;if (rtype == RType::REQ_CALLBACK && cb){rd->callback = cb;}_request_desc.insert(std::make_pair(req->GetId(), rd));//将id 和描述 进行对应return rd;}//查RequestDescribe::ptr getDescribe(const std::string &rid){std::unique_lock<std::mutex> lock(_mutex);auto it = _request_desc.find(rid);if (it == _request_desc.end()){return RequestDescribe::ptr();}return it->second;}//删void delDescribe(const std::string &rid){std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;};
}
}
RpcCaller类的实现
(Requestor 的处理 调用 RpcCaller)
(1)主要功能:
- 给Requestor() 提供接口。
(2)具体实现:
- 意义:向用户提供进行RPC调用的模块。这个模块相对简单,主要功能是向外提供几个RPC调用接口,内部实现向服务端发送请求并等待获取结果。
- 调用方式:
-
- 同步调用:发起调用后,等到收到响应结果后返回。
- 异步调用:发起调用后立即返回,可以在需要的时候获取结果。
- 回调调用:发起调用的同时设置结果的处理回调,收到响应后自动对结果进行回调处理。
❗❗❗❗
// requestor中的处理是针对BaseMessage进行处理的
// 用于在rpccaller中针对结果的处理是针对 RpcResponse里边的result进行的
#pragma once
#include "requestor.hpp"// request 有 rpc topic server
// 其中 rpc部分的 调用函数的 实现namespace bitrpc
{namespace client{class RpcCaller{public:using ptr = std::shared_ptr<RpcCaller>;using JsonAsyncResponse = std::future<Json::Value>;using JsonResponseCallback = std::function<void(const Json::Value &)>;RpcCaller(const Requestor::ptr &requestor) : _requestor(requestor) {}// requestor中的处理是针对BaseMessage进行处理的// 用于在rpccaller中针对结果的处理是针对 RpcResponse里边的result进行的//1.bool call(const BaseConnection::ptr &conn, const std::string &method,const Json::Value ¶ms, Json::Value &result){DLOG("开始同步rpc调用...");// 1. 组织请求auto req_msg = MessageFactory::create<RpcRequest>();req_msg->SetId(UUID::uuid());req_msg->setMethod(method);req_msg->setParams(params);req_msg->SetMType(MType::REQ_RPC);BaseMessage::ptr rsp_msg;// 2. 发送请求bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);if (ret == false){ELOG("同步Rpc请求失败!");return false;}DLOG("收到响应,进行解析,获取结果!");// 3. 等待响应auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if (!rpc_rsp_msg){ELOG("rpc响应,向下类型转换失败!");return false;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("rpc请求出错:%s", errReason(rpc_rsp_msg->rcode()));return false;}result = rpc_rsp_msg->result();DLOG("结果设置完毕!");return true;}////2.bool call(const BaseConnection::ptr &conn, const std::string &method,const Json::Value ¶ms, JsonAsyncResponse &result){// 向服务器发送异步回调请求,设置回调函数,回调函数中会传入一个promise对象,在回调函数中去堆promise设置数据auto req_msg = MessageFactory::create<RpcRequest>();req_msg->SetId(UUID::uuid());req_msg->SetMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);auto json_promise = std::make_shared<std::promise<Json::Value>>();result = json_promise->get_future();Requestor::RequestCallback cb = std::bind(&RpcCaller::Callback,this, json_promise, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);if (ret == false){ELOG("异步Rpc请求失败!");return false;}return true;}//3.bool call(const BaseConnection::ptr &conn, const std::string &method,const Json::Value ¶ms, const JsonResponseCallback &cb){auto req_msg = MessageFactory::create<RpcRequest>();req_msg->SetId(UUID::uuid());req_msg->SetMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1,this, cb, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if (ret == false){ELOG("回调Rpc请求失败!");return false;}return true;}private:void Callback1(const JsonResponseCallback &cb, const BaseMessage::ptr &msg){auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if (!rpc_rsp_msg){ELOG("rpc响应,向下类型转换失败!");return;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("rpc回调请求出错:%s", errReason(rpc_rsp_msg->rcode()));return;}cb(rpc_rsp_msg->result());}void Callback(std::shared_ptr<std::promise<Json::Value>> result, const BaseMessage::ptr &msg){auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if (!rpc_rsp_msg){ELOG("rpc响应,向下类型转换失败!");return;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("rpc异步请求出错:%s", errReason(rpc_rsp_msg->rcode()));return;}result->set_value(rpc_rsp_msg->result());}private:Requestor::ptr _requestor;};}
}
RpcCaller
类中 三个重载的call
方法,这些方法提供了不同的方式来发起RPC调用,并处理从服务器返回的响应。每个call
方法根据其参数和使用场景的不同,具有特定的功能和适用性:
1. 同步调用
call
- 目的:同步地发起一个RPC请求,并等待直到收到服务器的响应。
- 实现:
-
- 创建并配置一个
RpcRequest
消息。 - 使用
_requestor->send()
发送请求,并阻塞等待响应。 - 接收到响应后解析结果,并检查是否有错误发生。
- 将结果设置到输出参数
result
中 返回给调用者。
- 创建并配置一个
- 应用场景:适用于需要立即获取结果且可以接受当前线程被阻塞的情况。
2. 异步调用
call
- 目的:异步地发起一个RPC请求,不阻塞当前线程,允许后续通过
std::future
机制获取结果。 - 实现:
-
- 创建并配置一个
RpcRequest
消息。 - 绑定一个回调函数
Callback
用于在接收到响应时设置std::promise
的值。 - 使用
_requestor->send()
发送请求,并立即返回(非阻塞)。 - 结果 可以通过
result.get()
在未来某个时刻获取。
- 创建并配置一个
- 应用场景:适合于那些希望避免阻塞主线程,但仍然需要明确获取结果的场景。
3. 回调调用
call
- 目的:异步发起RPC请求并在接收到响应时自动调用用户提供的回调函数进行处理。
- 实现:
-
- 创建并配置一个
RpcRequest
消息。 - 绑定一个回调函数
Callback1
,该回调会在接收到响应时被调用,并进一步调用用户提供的回调函数处理结果。 - 使用
_requestor->send()
发送请求,并立即返回(非阻塞)。
- 创建并配置一个
- 应用场景:适用于不需要立即处理响应结果,到了 就回调 来处理响应数据的场景。
本节重点,通过 重载 来实现同步 回调 异步
- 同步:阻塞 返回结果参数
- 回调:非阻塞 到了就返回结果
- 异步:非阻塞 .get()获取