【Go】十五、分布式系统、Consul服务注册发现、Nacos配置中心搭建

分布式

传统开发方式的痛点:

我们的服务分为很多种:用户服务、商品服务、订单服务等,若我们一个成熟的体系内,新添加一个服务,会变得十分的繁琐与困难

当我们的负载较大时,如果选择添加机器的方式来减轻负载,那么增加的机器需要修改很多配置文件甚至代码,这样的重新部署会导致一系列的问题。

此时,选用注册中心即可对这些问题有系统性的解决:

用户服务-web、用户服务-srv、商品服务-web、商品服务1-srv、商品服务2-srv… 都需要注册到注册中心中,也就是说,具有一个注册中心,其管控着所有的服务

也就是说:当我们的一个服务要调用另一个服务时,其会首先到注册中心拉取对应的服务信息,再通过从注册中心获取的信息来访问对应的服务

注册中心技术选型

优点缺点接口一致性算法
zookeeper1. 功能强大
2. watcher 机制,实时获取服务提供者的状态
3. dubbo 等框架支持
1. 没有健康检查
2. 需要在服务中集成sdk,复杂度高
3. 不支持多数据中心
sdkPaxos
consul1. 简单易用,不需要集成sdk
2. 自带健康检查
3. 支持多数据中心
4. 提供web管理界面
不能实时获取服务变化http/dnsRaft
etcd1. 简单易用,不需要集成sdk
2. 可配置性强
1. 没有健康检查
2. 配合第三方工具一起完成服务发现
3. 不支持多数据中心
httpRaft

由于这里使用golang 进行开发,最贴合的还是 consul 作为注册中心,最为强大,也可选用 Nacos

consul

consul 的资料可以在 github 上找到

docker 拉取 consul:

docker run -d \
-p 8500:8500 \
-p 8300:8300 \
-p 8301:8301 \
-p 8302:8302 \
-p 8600:8600/udp \
consul consul agent -dev -client=0.0.0.0

使用这种命令的话,默认访问8500就是consul 的http端口、8600就是consul 的dns端口

// TODO : 1-2 5min 左右

  • consul 支持其作为 DNS 服务器:

    其可以将传过来的域名解析为对应的IP地址再进行进一步访问

consul

consul 是一个强大的服务注册中心,其可以同时作为服务注册中心和 DNS 地址解析服务器,在这种情况下,consul 还提供服务健康检查的机制,但与 Nacos 不同的是,consul 的服务注册不是直接进行发现的,而是要通过发送请求进行配置,我们要发送的请求如下:

PUT
http://192.168.202.140:8500/v1/agent/service/register
Body - json:
{"Name": "mxshop-web","ID": "mxshop-web","Tags": ["web", "mxshop", "xxx", "笑死我啦哈哈哈"],"Address": "127.0.0.1","Port": 50051
}

之后就可以在 consul 的控制台中看到我们注册的内容了,但要注意,通过这种方式的注册是没有健康监测功能的,如果我们需要健康监测功能,需要在请求中添加额外的参数

服务注销接口:

PUT
http://192.168.202.140:8500/v1/agent/service/deregister/mxshop-web

这里的请求最后一个位置要填写我们注册的服务的 id

对于每一个服务来讲,都需要将其注册到注册中心,而只有被调用的环节只需要进行服务注册,但还调用其他模块的服务还需要配置服务发现功能

consul 的 go 语言集成

go 语言集成 consul 测试,在任意一个地方创建一个 go 文件:

package mainimport "github.com/hashicorp/consul/api"func Register(address string, port int, name string, tags []string, id string) error {cfg := api.DefaultConfig()cfg.Address = "192.168.202.140:8500" // consul 的地址client, err := api.NewClient(cfg)if err != nil {panic(err)}// 生成 consul 的注册对象// 配置基础信息registration := new(api.AgentServiceRegistration)registration.Name = nameregistration.ID = idregistration.Tags = tagsregistration.Port = portregistration.Address = address// 配置检查对象,也就是健康检查机制check := &api.AgentServiceCheck{HTTP:                           "http://192.168.10.48:8021/health", // 发送 GET 请求来进行健康检查,服务的地址Timeout:                        "5s",                               // 每次健康检查中,多久没有回复视为健康检查失败Interval:                       "5s",                               // 进行健康检查的频率DeregisterCriticalServiceAfter: "10s",                              // 不健康服务允许存活的时间,当一个服务被检查为不健康时,若 10s 内其没有转为健康,则将其从服务中删除}// 将检查对象配置进 consul 的注册对象 registration 中registration.Check = check// 将配置的 consul 注册进去err = client.Agent().ServiceRegister(registration)if err != nil {panic(err)}return nil}func main() {_ = Register("192.168.10.48", 8021, "user-web", []string{"testtt"}, "user-web")
}

按照如上机制就可以将我们的服务注册进去

consul 获取服务节点信息

下面是一个获取 consul 中所有服务节点内容的示例:

package mainimport ("fmt""github.com/hashicorp/consul/api"
)func AllServices() {cfg := api.DefaultConfig()cfg.Address = "192.168.202.140:8500"client, err := api.NewClient(cfg)if err != nil {panic(err)}// 获取所有的服务内容data, err := client.Agent().Services()if err != nil {panic(err)}for key, _ := range data {fmt.Println(key)}
}func main() {AllServices()
}

这样子获取到的是所有服务节点的名称:

gulimail-user
gulimail-web
mxshop-user
mxshop-web
  • 进一步的

如果我们希望获取某些特定服务节点,就需要用到 consul 提供的过滤器来进行操作:

package mainimport ("fmt""github.com/hashicorp/consul/api"
)func AllServices() {cfg := api.DefaultConfig()cfg.Address = "192.168.202.140:8500"client, err := api.NewClient(cfg)if err != nil {panic(err)}// 获取全部 Services 名称严格等于 gulimail-web 的服务// 如果我们要获取 ID ... 就可以写: `ID == "gulimail-user"`data, err := client.Agent().ServicesWithFilter(`Service == "gulimail-web"`)for key, _ := range data {fmt.Println(key)}
}func main() {AllServices()
}

输出为:

gulimail-web

consul - GRPC 健康检查

由于 GRPC 不是以简单的 HTTP 协议进行传输数据的,其 默认使用Proto进行数据传输,这就导致其心跳机制不能简单的开启一个配置就完成,而是应该配置其自己的 Proto 的规范:

在 main.go 中添加如下依赖:

// 引入如下包
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/health"...// 在 server 创建之后添加如下监听功能
// 绑定服务健康检查
grpc_health_v1.RegisterHealthServer(server, health.NewServer())

在 config/config.go 中添加如下配置对象:

主要是 ConsulConfig MysqlConfig 是现补的

package configtype MysqlConfig struct {Host     string `mapstructure:"host" json:"host"`Port     int    `mapstructure:"port" json:"port"`Name     string `mapstructure:"db" json:"db"`User     string `mapstructure:"user" json:"user"`Password string `mapstructure:"passord" json:"password"`
}type ConsulConfig struct {Host string `mapstruce:"host" json:"host"`Port int    `mapstruct:"port" json:"port"`
}type ServerConfig struct {MysqlInfo  MysqlConfig  `mapstructure:"mysql" json:"mysql"`ConsulInfo ConsulConfig `mapstructure:"consul" json:"consul"`
}

之后添加 GRPC 服务 向 CONSUL 的添加和 状态检测机制,下面是完整的 main.go 代码

package mainimport ("flag""fmt""mxshop_srvs/user_srv/global""mxshop_srvs/user_srv/initialize""net""github.com/hashicorp/consul/api""google.golang.org/grpc""google.golang.org/grpc/health""google.golang.org/grpc/health/grpc_health_v1""mxshop_srvs/user_srv/handler""mxshop_srvs/user_srv/proto"
)func main() {// 由于ip和端口号有可能需要用户输入,所以这里摘出来// flag 包是一个命令行工具包,允许从命令行中设置参数IP := flag.String("ip", "0.0.0.0", "ip地址")Port := flag.Int("port", 50051, "端口号")initialize.InitLogger()initialize.InitConfig()flag.Parse()fmt.Println("ip: ", *IP)fmt.Println("port: ", *Port)// *************************************************************************************// 从这里开始是 GRPC 的心跳检测和服务注册功能// 创建新服务器server := grpc.NewServer()// 注册自己的已实现的方法进来proto.RegisterUserServer(server, &handler.UserServer{})//lis, err := net.Listen("tcp", fmt.Sprintf("192.168.202.140:8021"))lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", *IP, *Port))if err != nil {panic("failed to listen" + err.Error())}// 绑定服务健康检查grpc_health_v1.RegisterHealthServer(server, health.NewServer())// 服务注册cfg := api.DefaultConfig()cfg.Address = fmt.Sprintf("%s:%d", global.ServerConfig.ConsulInfo.Host, global.ServerConfig.ConsulInfo.Port)client, err := api.NewClient(cfg)if err != nil {panic(err)}check := &api.AgentServiceCheck{GRPC:                           fmt.Sprintf("192.168.0.111:50051"),Interval:                       "5s",DeregisterCriticalServiceAfter: "15s",}registration := new(api.AgentServiceRegistration)registration.Address = "192.168.0.111"	// 这里是自己服务的地址,这里我写的是本机registration.ID = global.ServerConfig.Nameregistration.Port = *Portregistration.Tags = []string{"imooc", "bobby", "user", "srv", "666"}registration.Name = global.ServerConfig.Nameregistration.Check = checkerr = client.Agent().ServiceRegister(registration)if err != nil {panic(err)}// ******************************************************************************************// 将自己的服务绑定端口err = server.Serve(lis)if err != nil {panic("fail to start grpc" + err.Error())}
}

consul-服务发现

对于我们的 gin 服务来说,更为重要的是服务发现的功能,因为具备服务发现功能,才能从consul中发现尚在服务中的服务

先进行配置:

config-debug.yaml:

consul:host: "192.168.202.140"port: "8500"

config.go:

type ServerConfig struct {Name        string        `mapstructure:"name"`Port        int32         `mapstructure:"port"`UserSrvInfo UserSrvConfig `mapstructure:"user_srv"`JWTInfo     JWTConfig     `mapstructure:"jwt"`AliSmsInfo  AliSmsConfig  `mapstructure:"sms"`RedisInfo   RedisConfig   `mapstructure:"redis"`ConsulInfo  ConsulConfig  `mapstructure:"consul"`
}type ConsulConfig struct {Host string `mapstructure:"host"`Port string `mapstructure:"port"`
}

之后我们就可以改写我们的服务,让我们在拉取 grpc 服务时通过consul 进行拉取,以实现服务发现

我们测试在 user-api 中进行服务发现的添加

import "github.com/hashicorp/consul/api"

之后进行服务的发现:

user.go

func GetUserList(ctx *gin.Context) {// 从注册中心获取用户信息:cfg := api.DefaultConfig()consulInfo := global.ServerConfig.ConsulInfocfg.Address = fmt.Sprintf("%s:%d", consulInfo.Host, consulInfo.Port)userSrvHost := ""userSrvPort := 0client, err := api.NewClient(cfg)if err != nil {panic(err)}data, err := client.Agent().ServicesWithFilter(fmt.Sprintf("Service == \"%s\"", global.ServerConfig.UserSrvInfo.Name))//data, err := client.Agent().ServicesWithFilter(fmt.Sprintf(`Service == "%s"`, global.ServerConfig.UserSrvInfo.Name))if err != nil {panic(err)}for _, value := range data {userSrvHost = value.AddressuserSrvPort = value.Port}if userSrvHost == "" {ctx.JSON(http.StatusBadRequest, gin.H{"msg": "用户服务不可达",})}//ip := "127.0.0.1"//port := 50051// 拨号连接用户 GRPC 服务//userConn, err := grpc.Dial(fmt.Sprintf("%s:%d", global.ServerConfig.UserSrvInfo.Host, global.ServerConfig.UserSrvInfo.Port), grpc.WithInsecure())// 引入consul后,这个位置就不再是普通的了,而是使用 Consul 中通过服务发现取出来的了userConn, err := grpc.Dial(fmt.Sprintf("%s:%d", userSrvHost, userSrvPort), grpc.WithInsecure())if err != nil {zap.L().Error("[GetUserList] 连接 【用户服务失败】",zap.String("msg", err.Error()))}// 生成 grpc 的 client 并调用接口userSrvClient := proto.NewUserClient(userConn)// 测试 ID 是否可以取到claims, _ := ctx.Get("claims")currentUser := claims.(*models.CustomClaims)zap.S().Infof("访问用户: %d", currentUser.ID)// 通过上下文 gin.Context 获取请求参数// 若能找到对应的请求参数,则返回传入的请求参数,若不存在,则返回默认值pn := ctx.DefaultQuery("pn", "0")pnInt, _ := strconv.Atoi(pn)pSize := ctx.DefaultQuery("psize", "10")pSizeInt, _ := strconv.Atoi(pSize)rsp, err := userSrvClient.GetUserList(context.Background(), &proto.PageInfo{Pn:    uint32(pnInt),PSize: uint32(pSizeInt),})if err != nil {zap.L().Error("[GetUserList] 查询 用户列表失败")HandleGrpcErrorToHttp(err, ctx)return}// 构建请求结果result := make([]interface{}, 0)for _, value := range rsp.Data {//data := make(map[string]interface{}) // 创建一个 map//data["id"] = value.Id//data["name"] = value.NickName//data["birth"] = value.BirthDay//data["gender"] = value.Gender//data["mobile"] = value.Mobilevar user = response.UserResponse{Id:       value.Id,NickName: value.NickName,Birthday: response.JsonTime(time.Unix(int64(value.BirthDay), 0)),//Birthday: time.Time(time.Unix(int64(value.BirthDay), 0)).Format("2006-01-02"),//Birthday: time.Time(time.Unix(Int64(value.BirthDay), 0)),Gender: value.Gender,Mobile: value.Mobile,}result = append(result, user)}// 利用上下文的 JSON 转换返回结果,在这里将结果返回给请求ctx.JSON(http.StatusOK, result)
}

这是全部的 getUserList 接口的代码,这里前面是通过consul来获取用户服务,就可以直接进行开启了,但注意这里需要配置 config-debug.yml:

这里需要配置host 和 name

user_srv:host: "192.168.102.177"port: 50051name: "user-srv"

将consul 配置由拦截器(全局变量)实现

设置一个全局变量,将这个全局变量配置进来,以实现 consul 的功能

本质上我们是通过 consul 来实现找到 我们的 GRPC 服务并生成 userSrvClient 对象来进行后续对 GRPC 服务的调用的,所以我们此时可以将 userSrvClient 定义为全局变量来实现一次定义、多处使用的效果。

在 global 中进行定义:

// 全局变量
var (// 用于读取配置ServerConfig *config.ServerConfig = &config.ServerConfig{}// 用于进行错误处理Trans ut.Translator// 进行UserClient grpc 服务的生成UserSrvClient proto.UserClient
)

在initialize 中创建 srv_conn.go 用于对GRPC 服务的连接初始化

func InitUserSrvConn() {// 从注册中心获取用户信息:cfg := api.DefaultConfig()consulInfo := global.ServerConfig.ConsulInfocfg.Address = fmt.Sprintf("%s:%d", consulInfo.Host, consulInfo.Port)userSrvHost := ""userSrvPort := 0client, err := api.NewClient(cfg)if err != nil {panic(err)}data, err := client.Agent().ServicesWithFilter(fmt.Sprintf("Service == \"%s\"", global.ServerConfig.UserSrvInfo.Name))//data, err := client.Agent().ServicesWithFilter(fmt.Sprintf(`Service == "%s"`, global.ServerConfig.UserSrvInfo.Name))if err != nil {panic(err)}for _, value := range data {userSrvHost = value.AddressuserSrvPort = value.Port}if userSrvHost == "" {zap.S().Fatal("[InitUserSrvConn] 用户服务无法获取 ")}//ip := "127.0.0.1"//port := 50051// 拨号连接用户 GRPC 服务//userConn, err := grpc.Dial(fmt.Sprintf("%s:%d", global.ServerConfig.UserSrvInfo.Host, global.ServerConfig.UserSrvInfo.Port), grpc.WithInsecure())// 引入consul后,这个位置就不再是普通的了,而是使用 Consul 中通过服务发现取出来的了userConn, err := grpc.Dial(fmt.Sprintf("%s:%d", userSrvHost, userSrvPort), grpc.WithInsecure())if err != nil {zap.L().Error("[GetUserList] 连接 【用户服务失败】",zap.String("msg", err.Error()))}// 生成 grpc 的 client 并调用接口userSrvClient := proto.NewUserClient(userConn)global.UserSrvClient = userSrvClient
}

并将其在 main.go 中进行调用,这里的详细调用就不再做详细介绍,只要在 initialize config 之后进行调用就可以

之后改造 对应的 GetUserList 接口进行尝试

func GetUserList(ctx *gin.Context) {userSrvClient := global.UserSrvClient// 测试 ID 是否可以取到claims, _ := ctx.Get("claims")currentUser := claims.(*models.CustomClaims)zap.S().Infof("访问用户: %d", currentUser.ID)// 通过上下文 gin.Context 获取请求参数// 若能找到对应的请求参数,则返回传入的请求参数,若不存在,则返回默认值pn := ctx.DefaultQuery("pn", "0")pnInt, _ := strconv.Atoi(pn)pSize := ctx.DefaultQuery("psize", "10")pSizeInt, _ := strconv.Atoi(pSize)rsp, err := userSrvClient.GetUserList(context.Background(), &proto.PageInfo{Pn:    uint32(pnInt),PSize: uint32(pSizeInt),})if err != nil {zap.L().Error("[GetUserList] 查询 用户列表失败")HandleGrpcErrorToHttp(err, ctx)return}// 构建请求结果result := make([]interface{}, 0)for _, value := range rsp.Data {var user = response.UserResponse{Id:       value.Id,NickName: value.NickName,Birthday: response.JsonTime(time.Unix(int64(value.BirthDay), 0)),Gender: value.Gender,Mobile: value.Mobile,}result = append(result, user)}// 利用上下文的 JSON 转换返回结果,在这里将结果返回给请求ctx.JSON(http.StatusOK, result)
}

但要注意的是:

  1. 此时我们的服务正在运行,但如果我们获取到的 GPC 服务如果下线了,其也不会自动重新获取 GRPC服务,或者改端口或IP了,其都无法实现自动检错和修改
  2. 我们的服务在一次启动时就进行了 TCP 的三次握手,没有在每次功能调用时进行三次握手,所以这样做的性能很高
  3. 我们这样做仅仅实现了一条连接 但这个链接由多个 groutine 来实现,我们可以考虑使用 GRPC 连接池来进行优化(grpc-pool), 不过负载均衡同样可以解决这个问题。

负载均衡

端口分配

在服务的启动过程中,每个服务都需要一个端口,一个服务若需要启动多个实例也需要多个端口,我们认为的去维护这个端口是一个比较复杂且不必要的情况,故而我们可以考虑动态分配端口、端口也动态获取的情况,可以让我们不再考虑端口带来的复杂情况。

我们创建一个 utils 目录,在目录中创建一个 addr.go 工具用来获取端口:

端口分配的核心逻辑

package utilsimport ("net"
)func GetFreePort() (int, error) {// 当指定端口号为 0 时,操作系统会自动分配一个未被使用的端口给这个 TCP 地址addr, err := net.ResolveTCPAddr("tcp", "localhost:0")if err != nil {return 0, err}l, err := net.ListenTCP("tcp", addr)if err != nil {return 0, err}defer l.Close()return l.Addr().(*net.TCPAddr).Port, nil
}

接着在 main 的位置进行设置,设置为 生产环境自动获取,开发环境固定

	viper.AutomaticEnv()debug := viper.GetBool("MXSHOP-DEBUG")fmt.Println(debug)if debug {port, err := utils.GetFreePort()if err == nil {global.ServerConfig.Port = int32(port)}}

在合适的位置添加如上代码来保证端口号的自动获取

在我们的 GPRC服务上也进行如上配置:

GRPC 的 main.go

package mainimport ("flag""fmt""mxshop_srvs/user_srv/global""mxshop_srvs/user_srv/initialize""mxshop_srvs/user_srv/utils""net""github.com/hashicorp/consul/api""google.golang.org/grpc""google.golang.org/grpc/health""google.golang.org/grpc/health/grpc_health_v1""mxshop_srvs/user_srv/handler""mxshop_srvs/user_srv/proto"
)func main() {// 由于ip和端口号有可能需要用户输入,所以这里摘出来// flag 包是一个命令行工具包,允许从命令行中设置参数IP := flag.String("ip", "0.0.0.0", "ip地址")Port := flag.Int("port", 0, "端口号")initialize.InitLogger()initialize.InitConfig()flag.Parse()fmt.Println("ip: ", *IP)// 设置端口号自动获取if *Port == 0 {*Port, _ = utils.GetFreePort()}fmt.Println("port: ", *Port)// 创建新服务器server := grpc.NewServer()// 注册自己的已实现的方法进来proto.RegisterUserServer(server, &handler.UserServer{})//lis, err := net.Listen("tcp", fmt.Sprintf("192.168.202.140:8021"))lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", *IP, *Port))if err != nil {panic("failed to listen" + err.Error())}// 绑定服务健康检查grpc_health_v1.RegisterHealthServer(server, health.NewServer())// 服务注册cfg := api.DefaultConfig()cfg.Address = fmt.Sprintf("%s:%d", global.ServerConfig.ConsulInfo.Host, global.ServerConfig.ConsulInfo.Port)client, err := api.NewClient(cfg)if err != nil {panic(err)}check := &api.AgentServiceCheck{GRPC:     fmt.Sprintf("192.168.102.177:%d", *Port),Interval: "5s",//Timeout:                        "10s",DeregisterCriticalServiceAfter: "30s",}registration := new(api.AgentServiceRegistration)registration.Address = "192.168.102.177"//registration.Address = "127.0.0.1"registration.ID = global.ServerConfig.Nameregistration.Port = *Portregistration.Tags = []string{"imooc", "bobby", "user", "srv", "666"}registration.Name = global.ServerConfig.Nameregistration.Check = checkerr = client.Agent().ServiceRegister(registration)if err != nil {panic(err)}//err = server.Serve(lis)// 将自己的服务绑定端口err = server.Serve(lis)if err != nil {panic("fail to start grpc" + err.Error())}
}

负载均衡

负载均衡就是指,我们在调用服务时,如何选取合适服务的策略,一个服务可能有很多的机器来分担压力,这些压力的分担策略就是负载均衡策略。

对于 HTTP 服务,其实 NGINX 就可以直接完成负载均衡的工作,但是对于 GRPC 服务,我们还需要进一步探究

另外的,对于用户请求,从网关发送到 GIN 服务的位置,也需要进行负载均衡考虑

负载均衡的三种策略:

  • 集中式负载均衡

    在用户调用和服务之间插入一个第三方软件或硬件,所有的用户负载均衡都通过这种方式进入,这种负载均衡策略有明显的劣势,所有的流量都要经过这个第三方负载均衡器,非常容易导致系统出现问题

  • 进程内负载均衡

    再 web 服务内启动一个 goroutine 在 gin 启动之前获取连接表,并生成 TCP 连接,在后续的调用中进行直接使用,每一个web服务单独维护自己的负载均衡机制,避免一个全服务的集中负载均衡器,优点是避免了集中式负载均衡的集中问题,缺点是需要每个web服务自己去实现一个自己独立的负载均衡机制,提高了人员工作量。

  • 独立负载均衡

    在web服务的同一台机器上部署一个独立的负载均衡 LoadBalance 器,这个独立的负载均衡器既规避了需要独立开发服务的问题,也避免了需要将所有的流量都进行集中的问题,但由于其在每个web服务的机器上都部署一个独立的负载均衡器,其维护成本会偏高,并且也需要独立编写 watchDog 机制来对负载均衡器的在线状态进行检测,也是一种弊端较大的机制。

一般来说我们使用第二种,进程内负载均衡的机制会比较多。

负载均衡的算法:

  • 轮询法

    针对于每一个请求,让他们依次按顺序访问服务

  • 随机法

    见名知意

  • 源地址哈希法

    对于同一个用户的服务,将其进行 hash 运算,生成一个固定的数,对这个固定的数和服务数进行取模来选取机器,这样的优点是每个用户访问的服务是固定的,可以独立创建数据库,大幅度降低服务压力,但是如果我们要新增服务,我们原来的数据就会全面失效,但针对于这个问题,还有一致性哈希可以解决这个问题。

  • 加权轮询法

    根据服务器的配置和负载生成一个权重,根据权重对服务进行随机选择。

  • 最小连接数法

    最小连接数法会根据当前所有服务的现存积压连接数的多少来进行负载均衡,负载均衡会根据当前积压连接数最少的服务进行分配。

基于 GRPC 的负载均衡和 Consul 的集成策略

GRPC 的负载均衡策略是基于第三方策略和内部策略两种形式的,这两种形式都是可以被允许的,只要在配置中进行配置就可。

另外,对于GRPC 对于 CONSOLE 的负载均衡连接,我们一般使用 grpc-console-resolver 组件对 console 内容进行拉取,这个组件的作用是将console注册中心的信息拉取到服务中

注意:这个包的使用可能不会真正引入到这个包的某一个变量,但这个包还是必须要引入的,因为引入这个包就是引入这个包里对于 Console连接的对应内容的 init 方法,这个方法会帮助我们直接生成Console的相关信息。

注意在此处我们测试,也将 proto 的整个文件夹复制过来进行测试

main.go:

必须引入的包:

_ "github.com/mbobakov/grpc-consul-resolver"

全部的逻辑以及对于连接的尝试

package mainimport ("GoTes/grpclb_test/proto""context""fmt"_ "github.com/mbobakov/grpc-consul-resolver""google.golang.org/grpc"
)func main() {// 注意这里是尝试连接的操作,具体行为是:第一个是 consul 的地址,第一个标识是服务名,后面的是连接等待时间,tag是服务携带的标签的过滤波器,注意,这里的过滤是且的逻辑// grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`), 这段代码标注了负载均衡方式是 轮询conn, err := grpc.Dial("consul://192.168.202.140:8500/user-srv?wait=14s&tag=srv",grpc.WithInsecure(),grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),)if err != nil {panic(err)}defer conn.Close()userSrvClient := proto.NewUserClient(conn)rsp, err := userSrvClient.GetUserList(context.Background(), &proto.PageInfo{Pn:    1,PSize: 2,})if err != nil {panic(err)}for index, data := range rsp.Data {fmt.Println(index, data)}}

注意此处的两个问题:

  1. 我们的 user-srv 在启动的时候只启动了一个,我们获取服务的时候也只会获取这一个,无法体现出负载均衡的效果
  2. 我们在终端中进行重复启动服务的时候,会进行覆盖
  3. 我们在进行consul 服务注册的时候,相同的服务ID会覆盖前一个,所以我们在进行测试的时候还需要进一步的考虑

我们可以使用终端 go run main.go 来进行服务启动,这个时候我们启动两个终端来进行服务启动就可以启动两个服务了,但是我们的后一个服务会将前一个服务覆盖。(注意这里的启动需要在外层服务启动,也就是 go.mod 的同一层也就是 main.go 的上一层 )

所以此处的解决方案是:将 consul 的注册时的ID修改为唯一的,此处选用的唯一方案是:UUID

下面是更新后的服务注册的效果:

引入包:

"github.com/satori/go.uuid"
	//registration.ID = global.ServerConfig.Name		// 此处修改为使用 UUID 生成registration.ID = fmt.Sprintf("%s", uuid.NewV4())		// 此处修改为使用 UUID 生成

再次尝试,我们就会发现 我们的 user-srv 实例中含有两个实例了

此处有一个需要理解的小案例,这个案例很适用于理解go语言的基础知识:

我们监听 go grpc 服务的优雅退出,即一旦出现 使用 ctrl + c 进行服务退出的场景,我们就实现立刻从 consul 中将服务取消并在控制台提示服务注销成功的提示(否则就只能等待一分钟 consul 检测不到服务活动才会自动将服务取消):

// 注意此处是阻塞式的所以需要一个 goroutine 来进行异步操作// 将自己的服务绑定端口go func() {err = server.Serve(lis)if err != nil {panic("fail to start grpc" + err.Error())}}()// 创建一个通道quit := make(chan os.Signal)signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)// 阻塞住,若接到请求则放通,直接将服务注销<-quitif err = client.Agent().ServiceDeregister(serviceID); err != nil {zap.S().Info("注销失败...")}zap.S().Info("注销成功")

这是一个很好的理解 goroutine 和 信号消息队列机制的例子

配置负载均衡到 gin 服务中

这里要修改的是 initialize/srv_conn.go 这个策略:

src_conn.go

import ("fmt""github.com/hashicorp/consul/api"_ "github.com/mbobakov/grpc-consul-resolver""go.uber.org/zap""google.golang.org/grpc""mxshop-api/user-web/global""mxshop-api/user-web/proto"
)func InitUserSrvConn() {consulInfo := global.ServerConfig.ConsulInfouserConn, err := grpc.Dial(fmt.Sprintf("consul://%s:%d/%s?wait=14s", consulInfo.Host, consulInfo.Port, global.ServerConfig.UserSrvInfo.Name),grpc.WithInsecure(),grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),)if err != nil {zap.S().Fatal("InitUserSrvConn, 建立用户服务连接失败")}userSrvClient := proto.NewUserClient(userConn)global.UserSrvClient = userSrvClient}

将原来的 InitUserSrvConn() 修改为 不可用,用这里的新的服务

配置中心

我们需要远程的统一配置,来避免本地配置会出现的一系列问题

  • 大型项目中,每一个服务的实例过多,一个服务可能有20多个实例,这种情况下若要一个一个修改配置文件是很不现实的,且极容易出错。
  • 大型项目中会有很多的服务,有时可能会达到100多个服务,这些服务有时候会有公共配置文件,对于这些公共配置文件来讲,如果要一个一个改的话也会极为复杂
  • 难以动态监听配置文件变化,需要服务重启,golang 中有 viper、但是其他语言的对应框架有不同的用法,很割裂
  • 极容易出错

而我们的配置中心通常来讲需要具有以下功能:配置实时推送、权限管理、集群支持度、配置回滚、环境隔离等多项能力,用以解决上述问题。

目前的主流配置中心有:apollo、nacos,当然我们也可以直接使用consul 做我们的配置中心

  • apollo 是协程开源的配置中心,其专注于配置中心,功能大且完善,但其没有丰富的多语言支持,其多语言支持都是借助于第三方开发人员实现的
  • nacos 是 alibaba 开源的配置中心,同时也集成了服务注册和发现功能,但其服务注册发现功能没有 consul 的生态更完善,但其官方支持多语言开发,较为稳定

使用 Docker 安装 Nacos

一键安装拉取:

docker run --name nacos-standalone -e MODE=standlone -e JVM_XMS=512m -e JVM_XMX=512m -e JVM_XMN=256m -p 8848:8848 -d nacos/nacos-server:latest# 下面是可以的,上面最新版本可能不支持快速启动了?docker run --name nacos-standalone -e MODE=standalone -e JVM_XMS=512m -e JVM_XMX=512m -e JVM_XMN=256m -p 8848:8848 -p 9848:9848 -d nacos/nacos-server:v2.2.0

Nacos 的基本使用

打开nacos控制台,我们一般使用 命名空间 + 配置文件的方式对配置进行管理,每一个服务对应一个命名空间,每一个命名空间下对应多个配置文件

例如 user-web 和 user-srv 就是同一个命名空间下的服务

我们使用组来区分生产、开发和测试环境

  • 通过 api 调用配置中心

我们尝试新建一个 user 命名空间中的文件:user-srv.json 组为 dev 格式为 json ,进行简单配置:

{"name": "user-srv"
}

测试:

在一个标准 main.go 中创建如下测试代码:

package mainimport ("fmt""github.com/nacos-group/nacos-sdk-go/clients""github.com/nacos-group/nacos-sdk-go/common/constant""github.com/nacos-group/nacos-sdk-go/vo"
)func main() {// 连接 nacos 服务器sc := []constant.ServerConfig{{IpAddr: "192.168.202.140",Port:   8848,},}// 创建客户端配置对象,配置命名空间,存活时间等信息cc := constant.ClientConfig{NamespaceId:         "2a856b71-60d2-44ff-8ff4-4ae698544724",TimeoutMs:           5000,NotLoadCacheAtStart: true,LogDir:              "tmp/nacos/log",CacheDir:            "tmp/nacos/cache",LogLevel:            "debug",}// 之后将创建的这两个对象传入到具体的配置对象中configClient, err := clients.CreateConfigClient(map[string]interface{}{"serverConfigs": sc,"clientConfig":  cc,})if err != nil {panic(err)}// 获取配置content, err := configClient.GetConfig(vo.ConfigParam{DataId: "user-srv.json",Group:  "dev",})if err != nil {panic(err)}fmt.Println(content)
}

标准输出:

{"name": "user-srv"
}

若要动态监听配置变化,在GetConfig 后添加 ListenConfig 的参数中添加 OnChange 参数来获取监听配置文件变化的能力:

	// 添加配置监听的变化信息err = configClient.ListenConfig(vo.ConfigParam{DataId: "user-srv.json",Group:  "dev",OnChange: func(namespace, group, dataId, data string) {fmt.Println("配置文件发生变化")fmt.Println("namespace: " + namespace)fmt.Println("group: " + group)fmt.Println("dataId: " + dataId)fmt.Println("data: " + data)},})if err != nil {panic(err)}

Nacos 集成在 GIN 中

此时我们的 Nacos 就基本配置完成了,但我们原先的配置文件还需要配置一个 Nacos 地址就可以了,剩下的我们全部在远程注册中心Nacos 中完成

我们先建立本地的 Nacos 配置文件,这个配置文件和 java 中的 bootstrap.yml 有异曲同工之妙

host: '192.168.202.140'
port: 8848
namespace: '2a856b71-60d2-44ff-8ff4-4ae698544724'
user: 'nacos'
password: 'nacos'
dataid: 'user-web.json'
group: 'dev'

之后我们在 Config.go 中将Nacos 的读取信息录入:

type NacosConfig struct {Host string `mapstructure:"host"`Port int `mapstructure:"port"`Namespace string `mapstructure:"namespace"`User string `mapstructure:"user"`Password string `mapstructure:"password"`Dataid string `mapstructure:"dataid"`Group string `mapstructure:"group"`
}

并且将 NacosConfig 作为全局变量提前生成

在 global.go 中添加

var (// 用于读取配置ServerConfig *config.ServerConfig = &config.ServerConfig{}// 用于进行错误处理Trans ut.Translator// nacos 配置NacosConfig *config.NacosConfig = &config.NacosConfig{}// 进行UserClient grpc 服务的生成UserSrvClient proto.UserClient
)

之后再 InitConfig 中将对应的信息进行初始化:

initialzie/config.go

func InitConfig() {configFileName := "user-web/config-pro.yaml"debug := GetenvInfo("MXSHOP-DEBUG")if debug {configFileName = "user-web/config-debug.yaml"}v := viper.New()v.SetConfigFile(configFileName)if err := v.ReadInConfig(); err != nil {panic(err)}// 注意这里应该是全局变量,全局变量的部署应该是在 global 目录中//serverConfig := config.ServerConfig{}if err := v.Unmarshal(global.NacosConfig); err != nil {panic(err)}zap.L().Info(fmt.Sprintf("配置信读取:%v", global.NacosConfig))fmt.Println(global.NacosConfig)sc := []constant.ServerConfig{{IpAddr: global.NacosConfig.Host,Port:   global.NacosConfig.Port,},}cc := constant.ClientConfig{NamespaceId:         global.NacosConfig.Namespace,TimeoutMs:           5000,NotLoadCacheAtStart: true,LogDir:              "tmp/nacos/log",CacheDir:            "tmp/nacos/cache",LogLevel:            "debug",}configClient, err := clients.CreateConfigClient(map[string]interface{}{"serverConfigs": sc,"clientConfig":  cc,})if err != nil {zap.S().Fatalf("initialize config fail: %s", err.Error())}content, err := configClient.GetConfig(vo.ConfigParam{DataId: global.NacosConfig.Dataid,Group:  global.NacosConfig.Group,})if err != nil {zap.S().Fatal("initialize config fail: %s", err.Error())}zap.S().Infof("config info read success: %s", content)// 监听远程配置信息变化err = configClient.ListenConfig(vo.ConfigParam{DataId: global.NacosConfig.Dataid,Group:  global.NacosConfig.Group,OnChange: func(namespace, group, dataId, data string) {fmt.Println("配置文件发生变化")fmt.Println("namespace: " + namespace)fmt.Println("group: " + group)fmt.Println("dataId: " + dataId)fmt.Println("data: " + data)},})fmt.Println(content)err = json.Unmarshal([]byte(content), &global.ServerConfig)if err != nil {zap.S().Fatalf("NACOS read fail: %s", err.Error())}fmt.Println(global.ServerConfig)//v.WatchConfig()//v.OnConfigChange(func(e fsnotify.Event) {//	zap.S().Infof("配置文件产生变化:%s", e.Name)//	v.ReadInConfig()//	v.Unmarshal(global.ServerConfig)//	zap.L().Info(fmt.Sprintf("修改了配置信息:%v\n", global.ServerConfig))//})
}

Nacos 集成在 grpc 中

操作和 集成在 GIN 中的思路完全一致:

package initializeimport ("encoding/json""fmt""github.com/nacos-group/nacos-sdk-go/clients""github.com/nacos-group/nacos-sdk-go/vo""github.com/nacos-group/nacos-sdk-go/common/constant""github.com/spf13/viper""go.uber.org/zap""mxshop_srvs/user_srv/global"
)func GetEnvInfo(env string) bool {viper.AutomaticEnv()var rs boolrs = viper.GetBool(env)return rsreturn true
}func InitConfig() {debug := GetEnvInfo("MXSHOP-DEBUG")zap.S().Info(fmt.Sprintf("------------", debug))configFileNamePrefix := "config"configFileName := fmt.Sprintf("user_srv/%s-pro.yaml", configFileNamePrefix)if debug {configFileName = fmt.Sprintf("user_srv/%s-debug.yaml", configFileNamePrefix)}v := viper.New()v.SetConfigFile(configFileName)if err := v.ReadInConfig(); err != nil {panic(err)}// 将配置文件进行解析if err := v.Unmarshal(&global.NacosConfig); err != nil {panic(err)}sc := []constant.ServerConfig{{IpAddr: global.NacosConfig.Host,Port:   global.NacosConfig.Port,},}cc := constant.ClientConfig{TimeoutMs:           5000,NamespaceId:         "2a856b71-60d2-44ff-8ff4-4ae698544724",CacheDir:            "tmp/nacos/cache",NotLoadCacheAtStart: true,LogDir:              "tmp/nacos/log",LogLevel:            "debug",}configClient, err := clients.CreateConfigClient(map[string]interface{}{"serverConfigs": sc,"clientConfig":  cc,})if err != nil {zap.S().Fatalf("%s", err.Error())}content, err := configClient.GetConfig(vo.ConfigParam{DataId: global.NacosConfig.Dataid,Group:  global.NacosConfig.Group,})if err != nil {zap.S().Fatalf("%s", err.Error())}err = configClient.ListenConfig(vo.ConfigParam{DataId: global.NacosConfig.Dataid,Group:  global.NacosConfig.Group,OnChange: func(namespace, group, dataId, data string) {fmt.Println("配置文件发生变化")fmt.Println("namespace: " + namespace)fmt.Println("group: " + group)fmt.Println("dataId: " + dataId)fmt.Println("data: " + data)},})if err != nil {zap.S().Fatalf("%s", err.Error())}err = json.Unmarshal([]byte(content), &global.ServerConfig)if err != nil {zap.S().Fatalf("%s", err.Error())}zap.S().Info(global.ServerConfig)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/427341.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

八股文-JVM

是什么&#xff1f;有什么用&#xff1f;谁发明的&#xff1f;什么时候发明的&#xff1f; Java虚拟机&#xff0c;用来运行Java程序&#xff0c;有很多个版本的虚拟机&#xff0c;比如HotSpot&#xff0c;最开始是SUN公司开发人员&#xff0c;和Java一起发布&#xff0c;现在…

通信工程学习:什么是PON无源光网络

PON&#xff1a;无源光网络 PON&#xff08;Passive Optical Network&#xff0c;无源光纤网络&#xff09;是一种采用光分路器等无源光器件进行信号传输和分配的光纤接入技术。它利用光纤作为传输媒介&#xff0c;通过无源设备将光信号从中心局&#xff08;如光线路终端OLT&am…

测试开发基础——软件测试中的bug

二、软件测试中的Bug 1. 软件测试的生命周期 软件测试贯穿于软件的整个生命周期 需求分析 测试计划 测试设计与开发 测试执行 测试评估 上线 运行维护 用户角度&#xff1a;软件需求是否合理 技术角度&#xff1a;技术上是否可行&#xff0c;是否还有优化空间 测试角度…

Android 15 正式发布至 AOSP

Google官方宣布&#xff0c;将于近期发布了 Android 15&#xff0c;而在早些时候&#xff0c;Google已经将其源代码推送至 Android 开源项目 (AOSP)。未来几周内&#xff0c;Android 15 将在受支持的 Pixel 设备上正式推出&#xff0c;并将于今年晚些时候在三星、Honor、iQOO、…

[vue2+axios]下载文件+文件下载为乱码

export function downloadKnowledage(parameter) {return axios({url: /knowledage/download,method: GET,params: parameter,responseType: blob}) }添加 responseType: blob’解决以下乱码现象 使用触发a标签下载文件 downloadKnowledage(data).then((res) > {let link …

高效处理NPE!!

相信不少小伙伴已经被java的NPE(Null Pointer Exception)所谓的空指针异常搞的头昏脑涨,有大佬说过“防止 NPE&#xff0c;是程序员的基本修养。”但是修养归修养&#xff0c;也是我们程序员最头疼的问题之一&#xff0c;那么我们今天就要尽可能的利用Java8的新特性 Optional来…

AI绘画与摄影新纪元:ChatGPT+Midjourney+文心一格 共绘梦幻世界

文章目录 一、AI艺术的新时代二、ChatGPT&#xff1a;创意的引擎与灵感的火花三、Midjourney&#xff1a;图像生成的魔法与技术的奇迹四、文心一格&#xff1a;艺术的升华与情感的共鸣五、融合创新&#xff1a;AI绘画与摄影实战的无限可能六、应用场景与实践案例AI艺术的美好未…

11 vue3之插槽全家桶

插槽就是子组件中的提供给父组件使用的一个占位符&#xff0c;用<slot></slot> 表示&#xff0c;父组件可以在这个占位符中填充任何模板代码&#xff0c;如 HTML、组件等&#xff0c;填充的内容会替换子组件的<slot></slot>标签。 匿名插槽 1.在子组…

一个场景是否可以同时选择CPU和GPU渲染

一个场景是否可以同时选择CPU和GPU渲染&#xff0c;主要取决于所使用的渲染软件及其支持的渲染引擎。在大多数情况下&#xff0c;现代渲染软件如3DMax等确实支持同时利用CPU和GPU进行渲染&#xff0c;以提高渲染效率和速度。 渲染软件的支持 以3DMax为例&#xff0c;它允许用…

xxl-job、Quartz、power-job、elastic-job对比选型

一、框架对比 1. Quartz 优点&#xff1a;稳定性和可扩展性好&#xff0c;适用于企业级应用&#xff1b;调度功能丰富&#xff0c;满足多种需求。 缺点&#xff1a;本身不提供原生的分布式支持&#xff0c;需要通过扩展或与其他组件结合来实现分布式任务调度&#xff1b;调度…

使用NotificationChannel实现后台视频上传

1、添加依赖 implementation net.gotev:uploadservice:4.8.0 implementation net.gotev:uploadservice-okhttp:4.8.02、在application中初始化服务&#xff1a; //初始化上传服务private fun initUploadService() {// 文件上传createNotificationChannel()//notificationChan…

算法里面的离散化

一、离散化&#xff08;discretization&#xff09;在算法和数据结构中指的是将连续的输入数据映射到离散的值或者范围&#xff0c;从而使得处理和计算变得更高效。通常用于处理大范围或者无限可能的输入&#xff0c;以便将其转化为有限的、可以有效处理的范围。 离散化的定义…

【深度学习】(3)--损失函数

文章目录 损失函数一、L1Loss损失函数1. 定义2. 优缺点3. 应用 二、NLLLoss损失函数1. 定义与原理2. 优点与注意3. 应用 三、MSELoss损失函数1. 定义与原理2. 优点与注意3. 应用 四、BCELoss损失函数1. 定义与原理2. 优点与注意3. 应用 五、CrossEntropyLoss损失函数1. 定义与原…

9.19总结

这几天学习了网络流 1&#xff0c;EK ek的主要思路是不断通过bfs找到增广路&#xff0c;找到增广路再建立反向边&#xff0c;直到不能再bfs到汇点&#xff0c;为什么可以通过建反向边呢&#xff1f;以上图举例&#xff0c;上图走完第一条增广路建立了一条反向边&#xff0c;当…

Maya动画基础

Maya动画基础教程&#xff08;完整&#xff09;_哔哩哔哩_bilibili 第一集 动画基础设置 altv播放动画 选择撕下副本 右键---播放预览 第二集 k帧记录物体的空间信息 初始位置清零 删除历史记录 s键key帧 自动记录位置信息 删除帧&#xff0c;按住右键选择delete 按shif…

Python if 语句优化技巧

大家好&#xff01;今天我们来聊聊Python中的if语句优化技巧。if语句是Python中最基本的控制结构之一&#xff0c;它用于根据条件执行不同的代码块。虽然if语句本身非常简单&#xff0c;但通过一些小技巧&#xff0c;可以让我们的代码更加高效、简洁。接下来&#xff0c;我们将…

LeetCode 算法笔记-第 04 章 基础算法篇

1.枚举 采用枚举算法解题的一般思路如下&#xff1a; 确定枚举对象、枚举范围和判断条件&#xff0c;并判断条件设立的正确性。一一枚举可能的情况&#xff0c;并验证是否是问题的解。考虑提高枚举算法的效率。 我们可以从下面几个方面考虑提高算法的效率&#xff1a; 抓住…

js中两种异步方式:async+await以及then

第一种方式 第二种方式 完整代码 前端代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>pywebv…

重学SpringBoot3-SpringApplicationRunListener

更多SpringBoot3内容请关注我的专栏&#xff1a;《SpringBoot3》 期待您的点赞&#x1f44d;收藏⭐评论✍ 重学SpringBoot3-SpringApplicationRunListener 1. 基本作用2. 如何实现2.1. 创建SpringApplicationRunListener2.2. 注册SpringApplicationRunListener2.3. 完整示例 3.…

【机器学习】经典数据集鸢尾花的分类识别

【机器学习】经典数据集鸢尾花的分类识别 1、数据集介绍1.1 数据集详情 2、实验内容2.1 准备数据集2.2 创建颜色映射对象2.3 绘制特征散点图2.4 数据的归一化2.5 数据的标准化 3、实验截图提取萼片长度与萼片宽度分类提取萼片长度与花瓣长度分类提取萼片长度与花瓣宽度分类提取…