RPC教程 7.服务发现与注册中心

0.前言

这一节的内容只能解决只有一个服务的情况。要是有多个服务(即是多个结构体)这种就解决不了,也即是没有服务ip地址和服务实例的映射关系。

1.为什么需要注册中心

在上一节中,客户端想要找到服务实例的ip,需要硬编码把ip写到代码中。这时可能会出问题,要是该服务实例ip改变了呢,该服务实例下线宕机了呢?这时如何是好。

// 调用单个服务实例
func clientCall(addr1, addr2 string) {d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()//省略其他......
}

这时,注册中心的重要性就出来了。

注册中心主要有三种角色:

  • 服务提供者(RPC Server):在启动时,向 Registry 注册自身服务,并向 Registry 定期发送心跳汇报存活状态。
  • 服务消费者(RPC Client):在启动时,向 Registry 订阅服务,把 Registry 返回的服务节点列表缓存在本地内存中,并与 RPC Sever 建立连接。
  • 服务注册中心(Registry):用于保存 RPC Server 的注册信息,当 RPC Server 节点发生变更时,Registry 会同步变更,RPC Client 感知后会刷新本地 内存中缓存的服务节点列表。

最后,RPC Client 从本地缓存的服务节点列表中,基于负载均衡算法选择一台 RPC Sever 发起调用。

 当然注册中心的功能还有很多,比如配置的动态同步、通知机制等。比较常用的注册中心有 etcd、zookeeper、consul,一般比较出名的微服务或者 RPC 框架,这些主流的注册中心都是支持的。

2.Gee Registry

主流的注册中心 etcd、zookeeper 等功能强大,与这类注册中心的对接代码量是比较大的,需要实现的接口也很多。所以这里我们选择自己实现一个简单的支持心跳保活的注册中心。

GeeRegistry 的代码独立放置在子目录 registry 中。

首先定义 GeeRegistry 结构体,默认超时时间设置为 5 min,也就是说,超过5min没有收到该注册的服务的心跳,即视其为不可用状态。

//registry.go
type ServerItem struct {Addr  stringstart time.Time //用于心跳时间计算
}// GeeRegistry is a simple register center
type GeeRegistry struct {timeout time.Durationmutex   sync.Mutex //protcect serversservers map[string]*ServerItem
}const (defaultPath    = "/_rpc_/registry"defaultTimeout = time.Minute * 5
)func New(timeout time.Duration) *GeeRegistry {return &GeeRegistry{servers: make(map[string]*ServerItem),timeout: timeout,}
}var DefalultGeeRegister = New(defaultTimeout)

然后,为 GeeRegistry 实现添加服务实例和返回服务列表的方法。

  • putServer:添加服务实例,如果服务已经存在,则更新 start。
  • aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。
func (r *GeeRegistry) putServer(addr string) {r.mutex.Lock()defer r.mutex.Unlock()s := r.servers[addr]if s == nil {r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}} else {s.start = time.Now() // if exists, update start time to keep alive}
}func (r *GeeRegistry) aliveServers() []string {r.mutex.Lock()defer r.mutex.Unlock()var alive []stringfor addr, s := range r.servers {if r.timeout == 0 || s.start.Add(r.timeout).After(time.Now()) {alive = append(alive, addr)} else {delete(r.servers, addr)}}sort.Strings(alive)return alive
}

 为了简单,那么rpc客户端通过HTTP去访问注册中心,且所有的有用信息都承载在 HTTP Header 中。

  • Get:返回所有可用的服务列表,通过自定义字段 X-rpc-Servers 承载。
  • Post:添加服务实例或发送心跳,通过自定义字段 X-rpc-Server 承载。
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {switch req.Method {case "GET":w.Header().Set("X-rpc-Servers", strings.Join(r.aliveServers(), ","))case "POST":addr := req.Header.Get("X-rpc-Servers")if addr == "" {w.WriteHeader(http.StatusInternalServerError)return}r.putServer(addr) //更新保存在注册中心的服务实例default:w.WriteHeader(http.StatusMethodNotAllowed)}
}func (r *GeeRegistry) HandleHTTP(registryPath string) {http.Handle(registryPath, r)
}func HandleHTTP() {DefalultGeeRegister.HandleHTTP(defaultPath)
}

另外,也要提供 Heartbeat 方法,便于服务启动时定时向注册中心发送心跳(也是通过HTTP),默认周期比注册中心设置的过期时间少 1 min。

// only send once
func sendHeartbeat(registryURL, addr string) error {httpClient := &http.Client{Timeout: time.Second * 10}req, _ := http.NewRequest("POST", registryURL, nil)req.Header.Set("X-rpc-Servers", addr)resp, err := httpClient.Do(req)if err != nil {fmt.Println("rpc server: heart beat err:", err)return err}defer resp.Body.Close()return nil
}// Heartbeat send a heartbeat message every once in a while
func Heartbeat(registryURL, addr string, duration time.Duration) {if duration == 0 {duration = defaultTimeout - time.Duration(1)*time.Minute}err := sendHeartbeat(registryURL, addr)go func() {//创建一个定时器t := time.NewTicker(duration)for err == nil {<-t.Cerr = sendHeartbeat(registryURL, addr)}}()
}

3.需要注册中心的服务发现

上一节我们实现了一个不需要注册中心,服务列表由手工维护的服务发现的结构体MultiServersDiscovery。

而现在我们实现了注册中心,那这一节的服务发现就可以继承上一节的,并添加与注册中心相关的细节

type GeeRegistryDiscovery struct {*MultiServerDiscoveryregistryAddr stringtimeout      time.Duration //服务列表的过期时间lastUpdate   time.Time
}const defaultUpdateTimeout = time.Second * 10func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {if timeout == 0 {timeout = defaultUpdateTimeout}return &GeeRegistryDiscovery{MultiServerDiscovery: NewMultiServerDiscovery(make([]string, 0)),registryAddr:         registerAddr,timeout:              timeout,}
}
  • GeeRegistryDiscovery 嵌套了 MultiServersDiscovery,很多能力可以复用。
  • registryAddr 即注册中心的地址
  • timeout 服务列表的过期时间
  • lastUpdate 是代表最后从注册中心更新服务列表的时间,默认 10s 过期,即 10s 之后,需要从注册中心更新新的列表。

实现 Update 和 Refresh 方法,超时重新获取的逻辑在 Refresh 中实现: 

func (d *GeeRegistryDiscovery) Update(servers []string) error {d.rwMutex.Lock()defer d.rwMutex.Unlock()d.servers = serversd.lastUpdate = time.Now()return nil
}// 刷新,有了注册中心,在客户端每次获取服务实例时候,需要刷新注册中心的保存的服务实例
func (d *GeeRegistryDiscovery) Refresh() error {d.rwMutex.Lock()defer d.rwMutex.Unlock()//注册中心保存的服务实例还没超时,不用更新if d.lastUpdate.Add(d.timeout).After(time.Now()) {return nil}httpClient := http.Client{Timeout: time.Second * 10} //http客户端最好有个超时resp, err := httpClient.Get(d.registryAddr)if err != nil {fmt.Println("rpc registry refresh err:", err)return err}defer resp.Body.Close()servers := strings.Split(resp.Header.Get("X-rpc-Servers"), ",")d.servers = make([]string, 0, len(servers))for _, server := range servers {//返回一个string类型,并将最前面和最后面的ASCII定义的空格去掉,中间的空格不会去掉s := strings.TrimSpace(server)if s != "" {d.servers = append(d.servers, s)}}d.lastUpdate = time.Now()return nil
}

 Get 和 GetAll 与 MultiServersDiscovery 相似,唯一的不同在于,GeeRegistryDiscovery 需要先调用 Refresh 确保服务列表没有过期

func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {if err := d.Refresh(); err != nil {return "", err}//d.Get(mode) 表示调用的是(GeeRegistryDiscovery).Getreturn d.MultiServerDiscovery.Get(mode) //d.MultiServerDiscovery是调用MultiServerDiscovery的Get()
}func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {if err := d.Refresh(); err != nil {return nil, err}return d.MultiServerDiscovery.GetAll()
}

4.测试

添加函数 startRegistry,之后需要稍微修改 startServer,定期向注册中心发送心跳保活(Heartbeat)。

这里使用sync.WaitGroup是为了等待该操作执行完毕才会往后执行,因为这些函数都是新开协程运行。

func startServer(registryAddr string, wg *sync.WaitGroup) {var myServie Myl, _ := net.Listen("tcp", "localhost:0") //端口是0表示端口随机server := geerpc.NewServer()//这里一定要用&myServie,因为前面Sum方法的接受者是*My;若接受者是My,myServie或者&myServie都可以server.Register(&myServie)registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)  //定时发送心跳wg.Done()server.Accept(l)
}func startRegistry(wg *sync.WaitGroup) {l, _ := net.Listen("tcp", "localhost:9999")registry.HandleHTTP()wg.Done()http.Serve(l, nil)
}

接下来,将 call 和 broadcast 的 MultiServersDiscovery 替换为 GeeRegistryDiscovery,不再需要硬编码服务列表。 

这里就重点对比下NewGeeRegistryDiscovery方法和之前的不同之处。

// 调用单个服务实例
func clientCall(registryAddr string) {// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()var reply int = 1324if err := xc.Call(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {log.Println("call Foo.Sum error:", err)}fmt.Println("reply: ", reply)}(i)}wg.Wait()
}func broadcast(registryAddr string) {// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()var reply int = 1324if err := xc.Broadcast(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {fmt.Println("Broadcast call Foo.Sum error:", err)}fmt.Println("Broadcast reply: ", reply)ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)defer cancel()var replyTimeout int = 1324if err := xc.Broadcast(ctx, "My.Sleep", &Args{Num1: i, Num2: i * i}, &replyTimeout); err != nil {fmt.Println("Broadcast call Foo.Sum error:", err)}fmt.Println("timeout Broadcast reply: ", replyTimeout)}(i)}wg.Wait()
}

最后是main函数。

确保注册中心启动后,再启动 RPC 服务端,最后客户端远程调用。

func main() {registryAddr := "http://localhost:9999/_rpc_/registry"var wg sync.WaitGroupwg.Add(1)go startRegistry(&wg) //开启注册中心服务wg.Wait()time.Sleep(time.Second)wg.Add(2)go startServer(registryAddr, &wg)go startServer(registryAddr, &wg)wg.Wait()time.Sleep(time.Second)clientCall(registryAddr)broadcast(registryAddr)
}

运行结果:

代码: https://github.com/liwook/Go-projects/tree/main/geerpc/7-registry

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/249142.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何将Mac连接到以太网?这里有详细步骤

在Wi-Fi成为最流行、最简单的互联网连接方式之前&#xff0c;每台Mac和电脑都使用以太网电缆连接。这是Mac可用端口的标准功能。 如何将Mac连接到以太网 如果你的Mac有以太网端口&#xff0c;则需要以太网电缆&#xff1a; 1、将电缆一端接入互联网端口&#xff08;可以在墙…

IDEA2023打开新项目默认SDK变成了17

问题描述 项目安装了2个sdk版本&#xff0c;jdk8和jdk17 自从升级IDEA版本到2023以后&#xff0c;每次打开新项目&#xff0c;sdk都被默认选择成了jdk17, 每次都得手动修改 &#xff08;File--Project Structure&#xff09;&#xff0c;超级麻烦。 没有用的解决方法 以下这…

正则表达式与文本三剑客

目录 一、正则表达式 1. 定义 2. 字符匹配 3. 重复限定符 4. 位置锚点 5. 分组和引用 6. 扩展正则表达式 二、文本三剑客 1. grep 1.1 定义 1.2 语法 1.3 选项 1.4 示例 2. sed 2.1 定义 2.2 通式 2.3 选项 2.4 脚本格式&#xff08;脚本语法&#xff09; 2.…

Backtrader 文档学习- Broker - Cheat-On-Open

Backtrader 文档学习- Broker - Cheat-On-Open 1.概述 V1.9.44.116增加了Cheat On Open的支持。对于全押的人来说&#xff0c;这似乎是一个必需的功能&#xff0c;用bar的收盘价后进行计算&#xff0c;希望与开盘价相匹配。 当开盘价差距&#xff08;上涨或下跌&#xff0c;取…

DjangoURL调度器(二)

一、默认值与额外参数 1.1、默认值 1.1.1、urls.py from django.urls import pathfrom . import viewsurlpatterns [# http://127.0.0.1:8000/polls/blog/ 等同于 # http://127.0.0.1:8000/polls/blog/1/path(blog/, views.page),# http://127.0.0.1:8000/polls/blo…

BSV区块链将凭借Teranode的创新在2024年大放异彩

​​发表时间&#xff1a;2024年1月15日 2024年1月15日&#xff0c;瑞士楚格 – BSV区块链协会研发团队今日官宣了Teranode的突破性功能&#xff0c;这些功能将显著提升BSV区块链网络的效率和速度。在不久的将来&#xff0c;BSV区块链的交易处理能力将达到每秒100万笔交易。 T…

怎么备份ESXi虚拟机?

ESXI备份虚拟机与快照的区别是什么&#xff1f; 关于虚拟机备份的一个常见误解是它与虚拟机快照有何不同以及它们是否可以相互替代&#xff1f;为了回答这个问题&#xff0c;让我们仔细看看这两个概念&#xff1a; VM虚拟机快照&#xff1a;快照&#xff08;或Hyper-V 的检查…

神经网络的一些常规概念

epoch&#xff1a;是指所有样本数据在神经网络训练一次&#xff08;单次epoch(全部训练样本/batchsize)/iteration1&#xff09;或者&#xff08;1个epochiteration数 batchsize数&#xff09; batch-size&#xff1a;顾名思义就是批次大小&#xff0c;也就是一次训练选取的样…

网络安全03---Nginx 解析漏洞复现

目录 一、准备环境 二、实验开始 2.1上传压缩包并解压 2.2进入目录&#xff0c;开始制作镜像 2.3可能会受之前环境影响&#xff0c;删除即可 ​编辑 2.4制作成功结果 2.5我们的环境一个nginx一个php 2.6访问漏洞 2.7漏洞触发结果 2.8上传代码不存在漏洞 2.9补充&#…

Log4j2-29-log4j2 discard policy 极端情况下的丢弃策略 同步+异步配置的例子

Log4j2异步日志、同步日志和混合日志的配置详解 Log4j 2中记录日志的方式有同步日志和异步日志两种方式&#xff0c;其中异步日志又可分为使用AsyncAppender和使用AsyncLogger两种方式。 异步日志(性能最好&#xff0c;推荐使用) 异步日志情况下&#xff0c;增加 Disruptor …

深入浅出HBase:一文理解HBase基础概念(列存储、时间戳、key-value)、架构特点以及适合的使用场景

文章目录 一. HBase 数据模型1. 行存储与列式存储1.1. 行存储1.2. 列存储 2. HBase 数据模型2.1. 模型概览2.2. 列与列族2.3. 时间戳&#xff1a;定义数据版本2.4. HBase的Key-Value 三. HBase架构1. HBase读写流程简述2. HRegionServer内部内部数据流转&#xff1a;HRegion &l…

Web性能优化之如何评估网页性能——性能指标和度量工具介绍

前言 用户在访问 web 网页时&#xff0c;大部分都希望网页能够在一秒完成。事实上&#xff0c;加载时间每多 1 秒&#xff0c;就会流失 7%的用户。如果时间超过 8s 用户就会感到不耐烦、会放弃访问。这也就是著名的 “8秒原则”。 虽然当今设备及网络环境都大幅提升&#xff…

【Spark系列3】RDD源码解析实战

本文主要讲 1、什么是RDD 2、RDD是如何从数据中构建 一、什么是RDD&#xff1f; RDD&#xff1a;弹性分布式数据集&#xff0c;Resillient Distributed Dataset的缩写。 个人理解&#xff1a;RDD是一个容错的、并行的数据结构&#xff0c;可以让用户显式的将数据存储到磁盘…

DVI接口如何连接HDMI接口显示器?DVI转HDMI转换器DHA

DVI转HDMI转换器DHA简介 DVI转HDMI转换器DHA能够将DVI信号和R/L音频信号输入转换成HDMI信号输出,独特的功能使其顺畅地整合到家庭影院中&#xff0c;并且播放出高品质的图像。主要用于数据监控中心、大型会议展示中心、学校及各个公司 DVI转HDMI转换器DHA特点 01.支持分辨率4K…

【TCP】三次握手(建立连接)

前言 在网络通信的世界里&#xff0c;可靠传输协议&#xff08;TCP&#xff09;扮演着重要的角色&#xff0c;它保证了数据包能够按顺序、完整地从发送端传送到接收端。TCP协议中有一个至关重要的机制——三次握手。这一过程确保了两个TCP设备在开始数据传输之前建立起一个稳定…

内衣洗衣机有必要买吗?最好用的迷你洗衣机推荐

随着内衣洗衣机的流行&#xff0c;很多小伙伴在纠结该不该入手一款内衣洗衣机&#xff0c;专门来洗一些贴身衣物&#xff0c;答案是非常有必要的&#xff0c;因为我们现在市面上的大型洗衣机只能做清洁&#xff0c;无法对我们的贴身衣物进行一个高强度的清洁&#xff0c;而小小…

手写分布式存储系统v0.1版本

引言 这是手写分布式存储系统v0.1版本&#xff0c;只有一个目标就是支持通过tcp接收数据并落地到磁盘文件(单机模式)&#xff0c;那接下来就开始吧 设计 实现一个系统&#xff0c;设计是最过瘾的过程没有之一&#xff0c;类似你搭积木前在脑海设计构建一副大致的“雏形”&am…

力扣hot100 最小路径和 多维DP 滚动数组 一题多解

Problem: 64. 最小路径和 文章目录 思路&#x1f496; 朴素版&#x1f496; 空间优化版 思路 &#x1f468;‍&#x1f3eb; 路飞 &#x1f496; 朴素版 ⏰ 时间复杂度: O ( n m ) O(nm) O(nm) &#x1f30e; 空间复杂度: O ( n m ) O(nm) O(nm) class Solution {public …

查看Windows 所有账户方法

目标 了解Windows查看所有账户的方法 方法 方法1&#xff1a;本地和用户组 按下Win X键&#xff0c;选择“计算机管理”。在计算机管理界面的左侧面板中&#xff0c;展开“系统工具” -> “本地用户和组” -> “用户”。在右侧窗口中&#xff0c;查看列出的所有用户账…

数据结构+算法(第01篇):走下神坛吧!算法

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 学习必须往深处挖&…