文章目录
- 前言
- 特性
- 使用场景
- brpc && grpc 对比
- 相关类与接口
- 日志输出类与接口
- protobuf类与接口
- 服务端类与接口
- 客户端类与接口
- 使用
- 同步调用 & 异步调用
- 封装
- 封装思想
- 代码
前言
brpc 是用 c++语言编写的工业级 RPC 框架,常用于搜索、存储、机器学习、广告、推荐等高性能系统
RPC(Remote Procedure Call,远程过程调用)框架指用于在网络中实现进程间通信的技术,使得程序能够调用远程计算机上的程序或服务,就像调用本地程序一样。
特性
特性 | 描述 |
---|---|
高性能 | 针对高并发场景优化,支持异步 IO,低延迟和高吞吐量。 |
多种传输协议 | 支持 HTTP/2、gRPC、TCP 等多种协议,灵活选择适合的方案。 |
流式支持 | 支持单向和双向流式 RPC 调用,适合实时数据传输场景。 |
负载均衡 | 内置多种负载均衡策略,支持客户端和服务端负载均衡。 |
服务发现 | 提供内置服务发现机制,支持与外部工具(如 Etcd、Consul)集成。 |
灵活序列化方式 | 默认使用 Protobuf,还支持 JSON 等其他序列化格式。 |
易用性 | 提供简单易懂的 API,快速上手和实现服务。 |
扩展性 | 支持插件机制,可以根据需求扩展功能。 |
使用场景
- 微服务架构:作为微服务间通信的基础框架,支持快速构建和部署微服务应用。
- 高并发处理:适用于需要处理大量并发请求的应用,如在线支付、即时通讯等。
- 实时数据传输:支持实时数据流的场景,例如视频直播、在线游戏等。
- 大规模分布式系统:适合于构建大规模的分布式系统,支持动态扩展和负载均衡。
- 跨语言服务调用:支持多种编程语言的服务调用。
brpc && grpc 对比
下面是 gRPC 与 brpc 的对比表格,涵盖了 主要特性和优缺点:
特性 | gRPC | brpc |
---|---|---|
语言支持 | 多种语言(C++, Java, Python, Go等) | 多种语言(C++, Python, Java等) |
传输协议 | HTTP/2 | 自定义协议,支持多种传输方式 |
序列化方式 | Protobuf | Protobuf、JSON等 |
性能 | 高性能,适合微服务架构 | 较高性能,优化了并发处理 |
流式支持 | 支持单向和双向流式 | 支持双向流式 |
负载均衡 | 需要外部支持或使用 gRPC 的内置功能 | 内置支持多种负载均衡策略 |
服务发现 | 依赖外部服务发现(如 Consul) | 内置服务发现机制 |
生态系统 | 强大的生态系统,广泛应用 | 相对较小但在特定场景下表现出色 |
社区活跃度 | 活跃,广泛使用 | 较小,但有特定用户群体 |
文档与支持 | 丰富的文档和社区支持 | 文档相对较少,社区较小 |
总结
- gRPC 适合需要跨语言和高性能通信的微服务架构,具有强大的生态系统和社区支持。
- brpc 更加专注于高性能的 RPC 通信,并且在某些场景下具有更好的灵活性和效率,适合特定需求的使用者。
相关类与接口
日志输出类与接口
日志输出类 包含头文件: #include <butil/logging.h>
在编写项目时,日志输出完全根据个人以及项目需求以使用不同的日志输出类,这里介绍如何关闭brpc自带的日志输出类:
namespace logging {// 日志输出目标枚举enum LoggingDestination {LOG_TO_NONE = 0 // 不输出日志};// 日志设置结构体struct BUTIL_EXPORT LoggingSettings {// 构造函数,初始化日志设置LoggingSettings();// 日志输出目标,决定日志将被发送到何处LoggingDestination logging_dest;};// 初始化日志系统// 参数:// settings - 包含日志设置的 LoggingSettings 对象// 返回:// bool - 初始化是否成功bool InitLogging(const LoggingSettings& settings);
}// 0. 关闭 brpc 默认日志输出
logging::LoggingSettings settings; // 创建一个 LoggingSettings 对象
settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE; // 设置日志输出为不输出
logging::InitLogging(settings); // 初始化日志系统,应用上述设置
protobuf类与接口
namespace google
{namespace protobuf{// Closure 类用于定义可调用对象的接口class PROTOBUF_EXPORT Closure{public:// 默认构造函数Closure() {}// 虚析构函数,用于确保派生类被正确析构virtual ~Closure();// 纯虚函数,派生类需要实现该函数以定义具体的操作virtual void Run() = 0;};// 创建一个新的回调对象// 参数:// function - 指向要调用的无参数函数的指针// 返回:// Closure* - 指向新创建的 Closure 对象的指针inline Closure *NewCallback(void (*function)());// RpcController 类用于控制 RPC 调用的状态class PROTOBUF_EXPORT RpcController{public:// 检查 RPC 调用是否失败// 返回:// bool - 如果调用失败则返回 true,否则返回 falsebool Failed();// 获取错误信息文本// 返回:// std::string - 表示错误的文本信息std::string ErrorText();};}
}
服务端类与接口
namespace brpc
{// ServerOptions 结构用于配置服务器选项struct ServerOptions{// 空闲超时时间,超过该时间后关闭连接int idle_timeout_sec; // 默认值: -1(禁用)// 服务器线程数量,默认值为 CPU 核心数int num_threads; // 默认值: #cpu-cores// 其他可能的选项...};// ServiceOwnership 枚举定义服务的所有权管理方式enum ServiceOwnership {// 当添加服务失败时,服务器负责删除服务对象SERVER_OWNS_SERVICE,// 当添加服务失败时,服务器不会删除服务对象SERVER_DOESNT_OWN_SERVICE};// Server 类表示一个 BRPC 服务器class Server{public:// 添加服务到服务器// 参数:// service - 要添加的服务对象// ownership - 服务的所有权类型// 返回:// int - 返回操作结果的状态码int AddService(google::protobuf::Service *service, ServiceOwnership ownership);// 启动服务器// 参数:// port - 监听的端口号// opt - 服务器选项// 返回:// int - 返回启动状态int Start(int port, const ServerOptions *opt);// 停止服务器// 参数:// closewait_ms - 等待关闭的时间(不再使用)// 返回:// int - 返回停止状态int Stop(int closewait_ms /*not used anymore*/);// 等待服务器完成所有任务并退出// 返回:// int - 返回加入状态int Join();// 运行服务器,直到收到退出请求void RunUntilAskedToQuit();};// ClosureGuard 类用于确保在作用域结束时执行回调class ClosureGuard{public:explicit ClosureGuard(google::protobuf::Closure *done): _done(done) {} // 初始化回调指针~ClosureGuard(){if (_done)_done->Run(); // 在析构时调用回调}private:google::protobuf::Closure *_done; // 存储回调指针};// HttpHeader 类表示 HTTP 请求或响应的头部信息class HttpHeader{public:// 设置内容类型void set_content_type(const std::string &type);// 获取指定键的头部值const std::string *GetHeader(const std::string &key);// 设置指定键的头部值void SetHeader(const std::string &key, const std::string &value);// 获取 URIconst URI &uri() const { return _uri; }// 获取 HTTP 方法HttpMethod method() const { return _method; }// 设置 HTTP 方法void set_method(const HttpMethod method);// 获取状态码int status_code();// 设置状态码void set_status_code(int status_code);private:URI _uri; // 存储 URI 信息HttpMethod _method; // 存储 HTTP 方法// 其他可能的成员...};// Controller 类用于管理 RPC 调用class Controller : public google::protobuf::RpcController{public:// 设置超时时间void set_timeout_ms(int64_t timeout_ms);// 设置最大重试次数void set_max_retry(int max_retry);// 获取响应消息google::protobuf::Message *response();// 获取 HTTP 响应头HttpHeader &http_response();// 获取 HTTP 请求头HttpHeader &http_request();// 检查 RPC 调用是否失败bool Failed();// 获取错误文本std::string ErrorText();// 定义 RPC 响应后的回调函数类型using AfterRpcRespFnType = std::function<void(Controller *cntl,const google::protobuf::Message *req,const google::protobuf::Message *res)>; // 设置 RPC 响应后的回调函数void set_after_rpc_resp_fn(AfterRpcRespFnType &&fn);private:// 其他成员...};
}
客户端类与接口
namespace brpc
{// ChannelOptions 结构用于配置通道选项struct ChannelOptions{// 请求连接超时时间,单位为毫秒int32_t connect_timeout_ms; // 默认值: 200 毫秒// RPC 请求超时时间,单位为毫秒int32_t timeout_ms; // 默认值: 500 毫秒// 最大重试次数int max_retry; // 默认值: 3// 序列化协议类型AdaptiveProtocolType protocol; // 默认值: "baidu_std"// 其他可能的选项...};// Channel 类表示一个 RPC 通道,用于和服务器进行通信class Channel : public ChannelBase{public:// 初始化接口// 参数:// server_addr_and_port - 服务器地址和端口// options - 指向 ChannelOptions 的指针,用于配置通道// 返回:// int - 成功返回 0,失败返回错误码int Init(const char *server_addr_and_port,const ChannelOptions *options);};// 其他相关类和功能...
}
使用
同步调用 & 异步调用
同步调用是指在程序中,当一个函数被调用时,调用者会等待被调用的函数执行完毕并返回结果后,才会继续执行后面的代码。
放在brpc中,同步调用是指客户端在发送请求后,会以阻塞的方式等待服务端的响应;
首先编写proto文件:
syntax = "proto3";package emp;option cc_generic_services = true; // 生成通用服务代码 (用于rpc)message EchoRequest {string message = 1;
}message EchoResponse {string message = 1;
}service EchoService {rpc Echo(EchoRequest) returns (EchoResponse);
}
编辑后开始写server代码:
#include <butil/logging.h>
#include <brpc/server.h>
#include "main.pb.h"class EchoServiceT : public emp::EchoService {
public:EchoServiceT() {}~EchoServiceT() {}// 重写父类回声函数void Echo(google::protobuf::RpcController* controller,const ::emp::EchoRequest* request,::emp::EchoResponse* response,::google::protobuf::Closure* done){brpc::ClosureGuard done_guard(done); // 自动调用done->Run()std::cout << "接收到消息: " << request->message() << std::endl;std::string msg = "响应消息: " + request->message();response->set_message(msg);}
};int main(int argc, char* argv[]) {// 0. 关闭brpc默认日志输出logging::LoggingSettings settings;settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;logging::InitLogging(settings);// 1. 初始化服务器对象brpc::Server server;// 2. 注册服务EchoServiceT echo_service;// SERVER_OWNS_SERVICE 服务器负责管理销毁 该服务// SERVER_DOESNT_OWN_SERVICE 服务器不负责该服务的生命周期auto ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret != 0) {std::cout << "添加RPC服务失败。" << std::endl;return -1;}// 3. 启动服务器brpc::ServerOptions options;options.idle_timeout_sec = -1; // 设置超时时间 为-1,表示不超时options.num_threads = 1; // 设置线程数ret = server.Start(8080, &options);if (ret != 0) {std::cout << "启动服务器失败。" << std::endl;return -1;}server.RunUntilAskedToQuit(); // 阻塞等待直到收到退出信号return 0;
}
客户端代码:
#include <brpc/channel.h>
#include <thread>
#include <iostream>
#include "main.pb.h"#define SYNC 0// 异步回调函数
void callback(brpc::Controller *cntl, emp::EchoResponse *resp) {std::unique_ptr<brpc::Controller> cntl_guard(cntl);std::unique_ptr<emp::EchoResponse> resp_guard(resp);if (cntl->Failed()) {std::cout << "RPC调用失败: " << cntl->ErrorText() << std::endl;return;}std::cout << "收到响应: " << resp->message() << std::endl;
}int main(int argc, char* argv[]) {// 1. 创建信道 连接服务器brpc::ChannelOptions options;options.protocol = "baidu_std"; // 序列化协议 默认options.connect_timeout_ms = -1; // 连接超时时间 -1表示永不超时options.timeout_ms = -1; // 超时时间 -1表示永不超时options.max_retry = 3; // 最大重试次数brpc::Channel channel;if (channel.Init("127.0.0.1:8080", &options) != 0) {std::cout << "初始化信道失败" << std::endl;return -1;}// 2. 构造EchoService_Stub对象(用于RPC调用)emp::EchoService_Stub stub(&channel);// 3. 进行RPC调用emp::EchoRequest req;std::cout << "请输入消息: ";std::string msg;getline(std::cin, msg);req.set_message(msg);// 4. 构造Controller对象(用于控制RPC调用)brpc::Controller *cntl = new brpc::Controller();emp::EchoResponse *resp = new emp::EchoResponse();#if SYNC // 同步调用stub.Echo(cntl, &req, resp, nullptr); // google::protobuf::Closure *done: 传入nullptr代表同步调用if(cntl->Failed()) {std::cout << "RPC调用失败: " << cntl->ErrorText() << std::endl;return -1;}std::cout << "RPC调用成功, 响应信息: " << resp->message() << std::endl;delete cntl;delete resp;#else // 异步调用auto clusure = google::protobuf::NewCallback(callback, cntl, resp);stub.Echo(cntl, &req, resp, clusure);std::cout << "异步调用成功" << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));#endifreturn 0;
}
封装
封装思想
由于不同的服务调用使用不同的 Stub,其封装意义不大。因此,我们主要封装通信所需的 Channel。当需要进行服务调用时,只需通过服务名称获取对应的 Channel,然后实例化 Stub 进行调用即可。
设计概要
-
Channel 管理类:
- 每个服务可能有多个主机提供服务,因此一个服务可能对应多个 Channel。需要管理这些 Channel,并提供获取指定服务 Channel 的接口。
- 在进行 RPC 调用时,根据 Round Robin(RR)轮转策略选择 Channel。
-
服务声明接口:
- 整体项目中通常会提供多个服务,当前可能并不需要用到所有服务。因此,通过声明来告知模块当前关心的服务,并建立连接进行管理。未声明的服务即使上线也不需要进行连接的建立。
-
服务上线处理接口:
- 提供新增指定服务的 Channel 的接口,以便在服务上线时进行管理。
-
服务下线处理接口:
- 提供删除指定服务下的 Channel 的接口,以便在服务下线时进行管理。
代码
class ServiceChannel
{
public:using ptr = std::shared_ptr<ServiceChannel>;using ChannelPtr = std::shared_ptr<brpc::Channel>;ServiceChannel(const std::string& service_name) : _index(0), _service_name(service_name) {}void append(const std::string& host){// 创建信道auto channel = std::make_shared<brpc::Channel>();brpc::ChannelOptions options;options.protocol = "baidu_std";options.timeout_ms = -1;options.connect_timeout_ms = -1;options.max_retry = 3;int ret = channel->Init(host.c_str(), &options);if (ret != 0){LOG_ERROR("初始化{}-{}信道失败", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_hosts.insert({host, channel});_channels.push_back(channel);} /* 服务上线一个节点 - 调用append新增信道 */void remove(const std::string& host){std::unique_lock<std::mutex> lock(_mutex);auto it = _hosts.find(host);if(it == _hosts.end()) {LOG_WARN("{}-{}删除时,未找到信道信息", _service_name, host);return;}for(auto vit = _channels.begin(); vit != _channels.end(); ++vit) {if(*vit == it->second) {_channels.erase(vit);break;}}_hosts.erase(it);LOG_INFO("{}-{}删除成功", _service_name, host);} /* 服务下线一个节点 - 调用remove释放信道 */ChannelPtr getChannel() {std::unique_lock<std::mutex> lock(_mutex);if(_channels.empty()) {LOG_ERROR("当前没有能提供{}服务的节点", _service_name);return ChannelPtr();}int32_t idx = _index++ % _channels.size(); // 轮转索引return _channels[idx];}private:std::mutex _mutex; // 互斥锁int32_t _index; // 轮转索引std::string _service_name; // 服务名称std::vector<ChannelPtr> _channels; // 服务对应的信道集合std::unordered_map<std::string, ChannelPtr> _hosts; // // 主机地址到信道映射
};class ServiceManager
{
public:using ptr = std::shared_ptr<ServiceManager>;ServiceManager() {} /* 获取指定服务的信道节点 */ServiceChannel::ChannelPtr getChannel(const std::string& service_name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(service_name);if(it == _services.end()) {LOG_ERROR("当前没有能提供{}服务的节点", service_name);return ServiceChannel::ChannelPtr();}return it->second->getChannel();}/* 声明关注的服务 */void declareTrackService(const std::string& service_name) {std::unique_lock<std::mutex> lock(_mutex);_track_services.insert(service_name);}/* 服务上线回调 */void onServiceOnline(const std::string& service_instance, const std::string& host) {std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto tit = _track_services.find(service_name);if(tit == _track_services.end()) {LOG_DEBUG("{}-{}服务上线了 (不在关注列表中)", service_name, host);return;}auto sit = _services.find(service_name);if(sit == _services.end()) {service = std::make_shared<ServiceChannel>(service_name);_services.insert({service_name, service});} else {service = sit->second;}}if(!service) {LOG_ERROR("{}服务新增失败", service_name);return;}service->append(host);LOG_DEBUG("{}服务新增成功", service_name);}/* 服务下线回调 */void onServiceOffline(const std::string& service_instance, const std::string& host) {std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto tit = _track_services.find(service_name);if(tit == _track_services.end()) {LOG_DEBUG("{}-{}服务下线了 (不在关注列表中)", service_name, host);return;}auto sit = _services.find(service_name);if(sit == _services.end()) {LOG_WARN("删除{}服务时,未找到管理对象", service_name);return;}service = sit->second;}service->remove(host);LOG_DEBUG("{}服务删除成功", service_name);}
private:std::string getServiceName(const std::string& service_instance) {auto pos = service_instance.find_last_of('/');if(pos == std::string::npos) {return service_instance;}return service_instance.substr(0, pos);}private:std::mutex _mutex;std::unordered_set<std::string> _track_services; // 跟踪的服务集合std::unordered_map<std::string, ServiceChannel::ptr> _services; // 服务名称到信道集合的映射
};