brpc的二次封装以及brpc与etcd的联合

目的:

搭配etcd的注册中心管理能知道谁能提供什么服务,并用rpc进行服务调用

封装思想:

信道管理,将不同服务主机的通信信道管理起来

封装:

1.指定的信道管理类

一个服务通常会有多个节点,每个节点都会有自己的信道类,建立信道与服务的映射关系,服务一对多信道。

2.总体的信道管理类

管理多个服务的信道管理类管理起来

tips:我们没必要将所有的服务信道都建立起来,我们得申明我们关心的服务,不关心的就可以不管理这个服务

#pragma once
#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include <iostream>
#include "./logger.hpp"
namespace common{
class ServerChannel
{
public:using ChannelPtr = std::shared_ptr<brpc::Channel>;using Ptr = std::shared_ptr<ServerChannel>;ServerChannel(const std::string &servername): _service_name(servername), _index(0){}void append(const std::string &host){ChannelPtr newchannel=std::make_shared<brpc::Channel>();brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 尝试连接时长Default: 200 (milliseconds)options.timeout_ms = -1; // rpc调用等待时间// Max duration of RPC over this Channel RPC调用在信道的期间options.protocol = "baidu_std";options.max_retry = 3;int ret = newchannel->Init(host.c_str(), &options);if (ret == -1){LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_channels.push_back(newchannel);_hosts.insert({host, newchannel});}// 服务器下线了一个节点,则把这个信道删除void remove(const std::string &host){std::unique_lock<std::mutex> lock(_mutex);auto score = _hosts.find(host);if (score == _hosts.end()){// 没找到LOG_WARN("没有找到该服务中的服务为{}主机名为{}的信道", _service_name, host);return;}// 找到了ChannelPtr scoreptr = score->second;for (auto it = _channels.begin(); it != _channels.end(); ++it){if (*it == scoreptr){_channels.erase(it);break;}}_hosts.erase(host);LOG_INFO("服务为{}主机名为{}的信道已经删除", _service_name, host);}// 选择一个信道给这个服务ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.size() == 0){LOG_INFO("该服务{}中暂时没有信道", _service_name);return ChannelPtr();}int pos = _index++ % _channels.size();return _channels[pos];}private:int32_t _index;std::mutex _mutex;std::string _service_name;std::vector<ChannelPtr> _channels;// 节点与信道的位图std::unordered_map<std::string, ChannelPtr> _hosts;
};class ServerManager
{
public:using Ptr = std::shared_ptr<ServerManager>;ServerManager() {}ServerChannel::ChannelPtr choose(const std::string &instance_name){std::string servicename=instance_name;std::unique_lock<std::mutex> lock(_mutex);auto serversret = _servers.find(servicename);if (serversret == _servers.end()){LOG_INFO("当前服务{}没有可使用的节点!", servicename);return ServerChannel::ChannelPtr();}// 该服务被订阅且上线了,提供一个该服务的节点else{       auto score = _servers.find(servicename);ServerChannel::Ptr serverchannal = score->second;return serverchannal->choose();}}void onlinechannel(const std::string &instance_name, const std::string &host){ServerChannel::Ptr onlineserver;std::string servicename=get_service_name(instance_name);{std::unique_lock<std::mutex> lock(_mutex);auto followret = _follows.find(servicename);if (followret == _follows.end()){LOG_INFO("该服务{}-{}上线了,暂时没有被订阅", servicename, host);return;}// 新增该服务中的一个节点// 当该服务未上线auto serversret = _servers.find(servicename);if (serversret == _servers.end()){// 没有找到该服务 则添加该服务onlineserver = std::make_shared<ServerChannel>(servicename);_servers.insert({servicename, onlineserver});}else{onlineserver = serversret->second;}if (!onlineserver){LOG_ERROR("新增 {} 服务管理节点失败!", servicename);return;}// 已经存在该服务}onlineserver->append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", servicename, host);}void offlinechannel(const std::string &instance_name, const std::string &host){ServerChannel::Ptr onlineserver;std::string servicename=get_service_name(instance_name);{std::unique_lock<std::mutex> lock(_mutex);auto followret = _follows.find(servicename);if (followret == _follows.end()){LOG_INFO("该服务{}-{}下线了,暂时没有被订阅", servicename, host);return;}// 删除该服务中的一个节点// 当该服务未上线auto serversret = _servers.find(servicename);if (serversret == _servers.end()){// 没有找到该服务LOG_INFO("没有找到该服务", servicename);return;}else{onlineserver = serversret->second;}if (!onlineserver){LOG_ERROR("删除 {} 服务管理节点失败!", servicename);return;}// 已经存在该服务}onlineserver->remove(host);LOG_DEBUG("{}-{} 服务下线该节点", servicename, host);}void declared(const std::string &servername){std::unique_lock<std::mutex> lock(_mutex);_follows.insert(servername);}std::string get_service_name(const std::string &server_instance_name){int pos=server_instance_name.find_last_of('/');if(pos==std::string::npos){return server_instance_name;}std::string retstring = server_instance_name.substr(0,pos);return retstring;}private:std::mutex _mutex;// 订阅的服务 没有被订阅的服务不提供节点使用std::unordered_set<std::string> _follows;// 服务名称 和 该服务的信道管理器std::unordered_map<std::string, ServerChannel::Ptr> _servers;
};
}

brpc与etcd的联合

服务端:1.构建echo服务 2.搭建RPC服务器 3.启动RPC服务器 4.在etcd上注册这个RPC服务

客户端: 1.构造RPC信道管理对象 2.再构造etcd的监控对象 3.再从RPC信道管理对象中获取提供echo服务的信道 4.发起echo业务

​​

​​

由于choose找的是servicename : /service/echo

而在新增服务时是 /service/echo/instance 所以在新增的回调函数中要采取截断的方法

​rigistry

#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include "../common/logger.hpp"
#include "../common/etcd.hpp"
#include "../common/channel.hpp"
#include<gflags/gflags.h>
#include <functional>
#include <brpc/server.h>
#include <brpc/closure_guard.h>
#include <butil/logging.h>
#include "main.pb.cc"
DEFINE_bool(run_mode,false,"这是运行的模式默认为调试模式false");
DEFINE_string(log_file,"","发布模式下指定的文件默认为空");
DEFINE_int32(log_level,0,"调试模式下默认的等级TRACE");DEFINE_string(Host,"http://127.0.0.1:2379","服务注册中心地址");
DEFINE_string(instance_Host,"127.0.0.1:7070","新上线服务的访问地址");
DEFINE_string(base_service,"/service","服务器监控目录");
DEFINE_string(instance_service,"/echo/instance","新上线的服务");
//一个 echo 中应该有多个 key - value ,如果定义成echo的话 只能对应一个服务的访问地址
//一个 echo 中应该有多个instance - value
DEFINE_int32(listen_post,7070,"新上线服务的访问地址");
class EchoServiceImpl:public example::EchoService               
{public:EchoServiceImpl(){}~EchoServiceImpl() override{} void Echo(google::protobuf::RpcController* controller,const ::example::EchoRequest* request,::example::EchoResponse* response,::google::protobuf::Closure* done) override{brpc::ClosureGuard rpc_guard(done);// ~ClosureGuard() {//if (_done) {//_done->Run();//}std::cout<<"收到了消息"<<request->message()<<std::endl;response->set_message(request->message()+"好的,我知道了");//_done->Run();}};
int main(int argc,char * argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode,FLAGS_log_file,FLAGS_log_level);//1.启动rpc服务器 并添加服务//2.etcd内注册该服务 以及 访问该服务的 地址logging::LoggingSettings logger;logger.logging_dest= logging :: LoggingDestination::LOG_TO_NONE;logging::InitLogging(logger);//2.创建服务器并添加业务brpc::Server server;EchoServiceImpl echosservice;int n=server.AddService(&echosservice,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if(n<0){std::cout<<"AddService error"<<std::endl;exit(0);}//函数不匹配,源码有可能设置的是指针类型,tips//3.设置服务器选项,并且启动服务器brpc::ServerOptions opt;opt.idle_timeout_sec=-1;//尝试连接时间 超时则退出opt.num_threads=1;//number of pthreads that server runs on. 单线程; //number of requests processed in parallel(并行)// Default: 0 (unlimited)int m=server.Start(FLAGS_listen_post,&opt);if(m<0){std::cout<<"server Start error"<<std::endl;exit(1);}Rigistry::ptr rigserver=std::make_shared<Rigistry>(FLAGS_Host);rigserver->registry(FLAGS_base_service+FLAGS_instance_service,FLAGS_instance_Host);//运行等待服务结束server.RunUntilAskedToQuit();//运行等待服务结束
}

discovey

#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include "../common/logger.hpp"
#include "../common/etcd.hpp"
#include "../common/channel.hpp"
#include <gflags/gflags.h>
#include <functional>
#include <brpc/server.h>
#include <brpc/closure_guard.h>
#include <butil/logging.h>
#include "main.pb.cc"
DEFINE_bool(run_mode, false, "这是运行的模式默认为调试模式false");
DEFINE_string(log_file, "", "发布模式下指定的文件默认为空");
DEFINE_int32(log_level, 0, "调试模式下默认的等级TRACE");DEFINE_string(Host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(instance_Host, "168.198.0.1::8080", "新上线服务的访问地址");DEFINE_string(instance_service, "/echo", "新上线的服务");
DEFINE_string(base_dir, "/service", "新上线的服务");
DEFINE_string(call_service, "/service/echo", "新上线的服务");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);// 1.创建服务管理类 并申明订阅的服务ServerManager::Ptr server = std::make_shared<ServerManager>();server->declared(FLAGS_call_service);// 2.将两个回调函数绑定,创建发现对象auto onlineput = std::bind(&ServerManager::onlinechannel, (server.get()), std::placeholders::_1, std::placeholders::_2);auto offlineput = std::bind(&ServerManager::offlinechannel, (server.get()), std::placeholders::_1, std::placeholders::_2);// 3.发现对象发现该服务,创建Echo_ServicestubDiscovery::ptr Disserver = std::make_shared<Discovery>(FLAGS_Host, FLAGS_base_dir, onlineput, offlineput);while (1){auto channel = server->choose(FLAGS_call_service);if (!channel){std::this_thread::sleep_for(std::chrono::seconds(1));return -1;}example::EchoService_Stub echoServiceStub(channel.get());brpc::Controller *control = new brpc::Controller();control->Reset();example::EchoRequest request1;request1.set_message("你好啊少年");example::EchoResponse *response1 = new example::EchoResponse();// 4.Echo_Servicestub调用Echo服务// rpc业务调用echoServiceStub.Echo(control, &request1, response1, nullptr);if (control->Failed()){std::cout << "rpc echo service failed" << control->ErrorText() << std::endl;delete control;delete response1;std::this_thread::sleep_for(std::chrono::seconds(1));continue;}std::cout << "client收到响应" << response1->message() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));}return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/486025.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Adminer源码编译 精简语言中英文和基本使用方法

Adminer是一个小而强悍的基于web的数据库管理工具&#xff0c; 官方默认支持几十种语言&#xff0c;但是对于中国的用户而言只需要有中文和英文就够了&#xff0c;其他语言基本无用。这就需要我们下载Adminer源码自己编译 Adminer.php , 如下图所示 adminer 中英文语言精简版本…

OpenStack-Glance组件

Glance Glance使用磁盘格式和容器格式基础配置镜像转换 Glance 是 OpenStack 的镜像服务&#xff0c;负责存储、发现和管理虚拟机镜像。它允许用户创建和共享镜像&#xff0c;用于启动虚拟机实例。 Glance 的主要功能 &#xff08;1&#xff09;虚拟机镜像的管理 支持镜像的上…

Leetcode 每日一题 56.合并区间

目录 问题描述 示例 示例 1 示例 2 问题分析 算法设计 步骤 1&#xff1a;排序 步骤 2&#xff1a;合并区间 步骤 3&#xff1a;返回结果 过题图片 代码实现 复杂度分析 题目链接 结语 问题描述 给定一个区间数组 intervals&#xff0c;其中每个区间由两个整数 s…

Oceanbase离线集群部署

准备工作 两台服务器 服务器的配置参照官网要求来 服务器名配置服务器IPoceanbase116g8h192.168.10.239oceanbase216g8h192.168.10.239 这里选oceanbase1作为 obd机器 oceanbase安装包 选择社区版本的时候自己系统的安装包 ntp时间同步rpm包 联网机器下载所需的软件包 …

Bert的Transformer原理

多义词如何应对&#xff1a; 答&#xff1a;通过Self attention&#xff0c;不同的上下文&#xff0c;对同一个"苹果"&#xff0c;得到截然不同的embedding激活值&#xff1b; Multi-head的作用&#xff1a; 有些类似CNN里用的多个卷积核得到多个Channel的特征图&…

AIDD-人工智能药物设计-化学自然语言引导的扩散式类药分子编辑:DiffIUPAC的魔法之旅

J. Pharm. Anal. | 化学自然语言引导的扩散式类药分子编辑&#xff1a;DiffIUPAC的魔法之旅 AIDD药研. 制药工程和生命科学背景&#xff0c;重点关注于计算机辅助药物设计&#xff08;CADD&#xff09;/药物筛选、分子动力学模拟MD&#xff0c;兽药信息学VetInformatics&…

ThinkPHP+Layui开发的ERP管理系统

ERP采购生产销售系统&#xff0c;一款基于ThinkPHPLayui开发的ERP管理系统&#xff0c;帮助中小企业实现ERP管理规范化&#xff0c;此系统能为你解决五大方面的经营问题&#xff1a;1.采购管理 2.销售管理 3.仓库管理 4.资金管理 5.生产管理&#xff0c;适用于&#xff1a;服装…

Elasticsearch:使用 Elastic APM 监控 Android 应用程序

一、前言 人们通过私人和专业的移动应用程序在智能手机上处理越来越多的事情。 拥有成千上万甚至数百万的用户&#xff0c;确保出色的性能和可靠性是移动应用程序和相关后端服务的提供商和运营商面临的主要挑战。 了解移动应用程序的行为、崩溃的发生和类型、响应时间慢的根本…

杂发单的单据类型一个参数的逻辑

【核准中可改】被产线滥用了。它们可以这样做&#xff0c;开立一张杂发单&#xff0c;打印出来交领导层签名。单据要交财务做核算的。然后去修改杂发单的材料。以为可以瞒天过海。2个仓库&#xff0c;一个中掉坑里&#xff0c;一个发现了它们的拙劣的手段&#xff0c;上报之后没…

事务的介绍(spring)

什么是事务&#xff1a; 事务是一组操作的集合&#xff0c;是不可分割的操作。比如一系列sql语句在一个操作中执行&#xff0c;要么成功要么失败。 比如在转账的时候&#xff0c;a钱包-100&#xff0c;b钱包100&#xff0c;两个要么同时成功要么同时失败。 &#xff08;复习&a…

CSS一些小点 —— 12.7

1. box-sizing: border-box box-sizing 属性&#xff0c;默认值为 content-box box-sizing: border-box 使padding和border的值不会再影响元素的宽高&#xff1b;padding和border的值算在指定宽高的内部&#xff08;但是外边距依然算做外部&#xff09; 2. overflow: hidden …

51c嵌入式~单片机合集3

我自己的原文哦~ https://blog.51cto.com/whaosoft/12581900 一、STM32代码远程升级之IAP编程 IAP是什么 有时项目上需要远程升级单片机程序&#xff0c;此时需要接触到IAP编程。 IAP即为In Application Programming&#xff0c;解释为在应用中编程&#xff0c;用户自己的程…

将军令游戏源码(​全套源代码+数据库+全套工具+客户端+服务端)

将军令游戏源码&#xff08;​全套源代码数据库全套工具客户端服务端&#xff09; 下载地址&#xff1a; 通过网盘分享的文件&#xff1a;【源码】将军令游戏源码&#xff08;全套源代码数据库全套工具客户端服务端&#xff09; 链接: https://pan.baidu.com/s/1A5oOn7NsDU1woH…

渐冻症患者的饮食希望:五种食物带来的可能

渐冻症&#xff0c;一个令人胆寒的医学难题。目前&#xff0c;全球约有 50 万渐冻症患者&#xff0c;这个数字还在不断增长。渐冻症会逐渐剥夺患者的行动能力、语言能力&#xff0c;甚至呼吸能力&#xff0c;给患者及其家庭带来沉重的打击。然而&#xff0c;有传言称 “渐冻症最…

文件IO——01

1. 认识文件 1&#xff09;文件概念 “文件”是一个广义的概念&#xff0c;可以代表很多东西 操作系统里&#xff0c;会把很多的硬件设备和软件资源抽象成“文件”&#xff0c;统一管理 但是大部分情况下的文件&#xff0c;都是指硬盘的文件&#xff08;文件相当于是对“硬…

安装 pytorch lighting

1 搜寻配对版本 进入lighting官网&#xff0c;查看配对版本 比如我就选择Python3.11、torch2.4、lightning2.4.0 2 搜寻pytorch安装命令 进入pytorch官网&#xff0c;查看以前版本的下载命令 注意要选择是 gpu版本的pytorch查看自己显卡驱动命令&#xff1a;nvidia-smi查看…

2030. gitLab A仓同步到B仓

文章目录 1 A 仓库备份 到 B 仓库2 B 仓库修改main分支的权限 1 A 仓库备份 到 B 仓库 #!/bin/bash# 定义变量 REPO_DIR"/home/xhome/opt/git_sync/zz_xx_xx" # 替换为你的本地库A的实际路径 REMOTE_ORIGIN"http://192.168.1.66:8181/zzkj_software/zz_xx_xx.…

在GITHUB上传本地文件指南(详细图文版)

这份笔记简述了如何在GITHUB上上传文件夹的详细策略。 既是对自己未来的一个参考&#xff0c;又希望能给各位读者带来帮助。 详细步骤 打开目标文件夹&#xff08;想要上传的文件夹&#xff09; 右击点击git bash打开 GitHub创立新的仓库后&#xff0c;点击右上方CODE绿色按…

[每周一更]-(第126期):MQ解耦场景

消息队列&#xff08;MQ&#xff09;解耦是一种软件架构设计模式&#xff0c;主要通过中间件将系统中的生产者和消费者模块分离&#xff0c;减少模块之间的直接依赖&#xff0c;使系统具有更高的扩展性和灵活性。这种模式尤其适用于需要处理复杂业务逻辑、频繁请求或异步处理的…

学习Python的笔记--面向对象-继承

1、概念 多个类之间的所属关系&#xff0c;即子类默认继承父类的所有属性和方法。 注&#xff1a;所有类默认继承object类&#xff0c;object类是顶级类或基类&#xff1b; 其他子类叫做派生类。 #父类A class A(object):def __init__(self):self.num1def info_print(self)…