服务注册与发现
一、系统架构设计
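让我们先了解服务注册与发现的整体架构(原文此处的流程图未能保留,以下为文字说明):服务提供者启动后向注册中心注册实例,并定期发送心跳;注册中心维护实例列表,剔除心跳超时的实例,并把变更通知给监听者;服务消费者通过带本地缓存的服务发现组件获取实例列表,再由负载均衡器从中选出一个实例发起调用。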
二、核心组件实现
1. 服务注册中心
package discoveryimport ("context""sync""time"
)// ServiceInstance 服务实例
// ServiceInstance describes one registered instance of a service.
//
// The registry keys instances by Name and then by ID, and maintains Status and
// LastHeartbeat itself. Version, Endpoints and Metadata are carried as-is;
// nothing in this file interprets them.
type ServiceInstance struct {
	ID        string            `json:"id"`
	Name      string            `json:"name"`
	Version   string            `json:"version"`
	Endpoints []string          `json:"endpoints"`
	Metadata  map[string]string `json:"metadata"`
	// Status is set to "UP" or "DOWN" by the registry.
	Status string `json:"status"`
	// LastHeartbeat is refreshed on registration and on every heartbeat.
	LastHeartbeat time.Time `json:"last_heartbeat"`
} // Registry is the service registry.
// Registry is an in-memory service registry. It tracks instances per service
// and the channels of watchers interested in each service.
type Registry struct {
	// services maps serviceName -> instanceID -> instance.
	services map[string]map[string]*ServiceInstance
	// mutex guards services and watchers.
	mutex sync.RWMutex
	// watchers maps serviceName -> channels that receive instance updates.
	watchers map[string][]chan *ServiceInstance
} // NewRegistry creates a registry.
func NewRegistry() *Registry {r := &Registry{services: make(map[string]map[string]*ServiceInstance),watchers: make(map[string][]chan *ServiceInstance),}go r.checkHealth()return r
}// Register 注册服务实例
func (r *Registry) Register(ctx context.Context, instance *ServiceInstance) error {r.mutex.Lock()defer r.mutex.Unlock()if _, exists := r.services[instance.Name]; !exists {r.services[instance.Name] = make(map[string]*ServiceInstance)}instance.LastHeartbeat = time.Now()instance.Status = "UP"r.services[instance.Name][instance.ID] = instance// 通知观察者r.notifyWatchers(instance.Name, instance)return nil
}// Deregister 注销服务实例
func (r *Registry) Deregister(ctx context.Context, serviceName, instanceID string) error {r.mutex.Lock()defer r.mutex.Unlock()if instances, exists := r.services[serviceName]; exists {if instance, ok := instances[instanceID]; ok {instance.Status = "DOWN"delete(instances, instanceID)// 如果服务没有实例了,删除服务if len(instances) == 0 {delete(r.services, serviceName)}return nil}}return nil
}// GetService 获取服务实例列表
func (r *Registry) GetService(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {r.mutex.RLock()defer r.mutex.RUnlock()if instances, exists := r.services[serviceName]; exists {result := make([]*ServiceInstance, 0, len(instances))for _, instance := range instances {if instance.Status == "UP" {result = append(result, instance)}}return result, nil}return nil, nil
}// Watch 监听服务变化
func (r *Registry) Watch(ctx context.Context, serviceName string) (<-chan *ServiceInstance, error) {r.mutex.Lock()defer r.mutex.Unlock()ch := make(chan *ServiceInstance, 10)if _, exists := r.watchers[serviceName]; !exists {r.watchers[serviceName] = make([]chan *ServiceInstance, 0)}r.watchers[serviceName] = append(r.watchers[serviceName], ch)return ch, nil
}// Heartbeat 服务心跳
func (r *Registry) Heartbeat(ctx context.Context, serviceName, instanceID string) error {r.mutex.Lock()defer r.mutex.Unlock()if instances, exists := r.services[serviceName]; exists {if instance, ok := instances[instanceID]; ok {instance.LastHeartbeat = time.Now()instance.Status = "UP"return nil}}return nil
}// checkHealth 健康检查
func (r *Registry) checkHealth() {ticker := time.NewTicker(10 * time.Second)for range ticker.C {r.mutex.Lock()now := time.Now()for serviceName, instances := range r.services {for id, instance := range instances {if now.Sub(instance.LastHeartbeat) > 30*time.Second {instance.Status = "DOWN"delete(instances, id)if len(instances) == 0 {delete(r.services, serviceName)}}}}r.mutex.Unlock()}
}// notifyWatchers 通知观察者
func (r *Registry) notifyWatchers(serviceName string, instance *ServiceInstance) {if watchers, exists := r.watchers[serviceName]; exists {for _, ch := range watchers {select {case ch <- instance:default:// 如果channel已满,跳过}}}
}
2. 服务发现实现
package discoveryimport ("context""errors""sync""time"
)// ServiceDiscovery 服务发现
type ServiceDiscovery struct {registry *RegistrylocalCache map[string][]*ServiceInstancecacheMutex sync.RWMutexupdateInterval time.Duration
}// NewServiceDiscovery 创建服务发现实例
func NewServiceDiscovery(registry *Registry, updateInterval time.Duration) *ServiceDiscovery {sd := &ServiceDiscovery{registry: registry,localCache: make(map[string][]*ServiceInstance),updateInterval: updateInterval,}go sd.startCacheUpdate()return sd
}// GetService 获取服务实例
// GetService returns the instances of serviceName, preferring the local
// cache.
//
// A non-empty cached list is returned directly. Otherwise the registry is
// queried; a non-empty answer is stored in the cache before being returned,
// while an empty answer is returned without touching the cache. Registry
// errors are passed through with a nil slice.
func (sd *ServiceDiscovery) GetService(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {
	// Fast path: the local cache.
	sd.cacheMutex.RLock()
	cached := sd.localCache[serviceName]
	sd.cacheMutex.RUnlock()
	if len(cached) > 0 {
		return cached, nil
	}

	// Cache miss: ask the registry.
	instances, err := sd.registry.GetService(ctx, serviceName)
	if err != nil {
		return nil, err
	}

	// Remember non-empty answers for subsequent calls.
	if len(instances) > 0 {
		sd.cacheMutex.Lock()
		sd.localCache[serviceName] = instances
		sd.cacheMutex.Unlock()
	}
	return instances, nil
} // startCacheUpdate starts the cache refresh.
func (sd *ServiceDiscovery) startCacheUpdate() {ticker := time.NewTicker(sd.updateInterval)for range ticker.C {sd.updateCache()}
}// updateCache 更新缓存
func (sd *ServiceDiscovery) updateCache() {sd.cacheMutex.Lock()defer sd.cacheMutex.Unlock()ctx := context.Background()for serviceName := range sd.localCache {instances, err := sd.registry.GetService(ctx, serviceName)if err == nil && len(instances) > 0 {sd.localCache[serviceName] = instances}}
}// WatchService 监听服务变化
func (sd *ServiceDiscovery) WatchService(ctx context.Context, serviceName string) error {instanceCh, err := sd.registry.Watch(ctx, serviceName)if err != nil {return err}go func() {for {select {case instance := <-instanceCh:sd.handleInstanceUpdate(serviceName, instance)case <-ctx.Done():return}}}()return nil
}// handleInstanceUpdate 处理实例更新
func (sd *ServiceDiscovery) handleInstanceUpdate(serviceName string, instance *ServiceInstance) {sd.cacheMutex.Lock()defer sd.cacheMutex.Unlock()instances := sd.localCache[serviceName]updated := false// 更新或添加实例for i, inst := range instances {if inst.ID == instance.ID {instances[i] = instanceupdated = truebreak}}if !updated {instances = append(instances, instance)}// 过滤掉已下线的实例activeInstances := make([]*ServiceInstance, 0)for _, inst := range instances {if inst.Status == "UP" {activeInstances = append(activeInstances, inst)}}sd.localCache[serviceName] = activeInstances
}
3. 负载均衡实现
package discoveryimport ("context""errors""math/rand""sync""sync/atomic"
)// LoadBalancer 负载均衡接口
// LoadBalancer is the strategy interface for choosing one instance out of a
// candidate list.
type LoadBalancer interface {
	// Select picks one of instances, or reports an error when it cannot.
	Select(
		ctx context.Context,
		instances []*ServiceInstance,
	) (*ServiceInstance, error)
} // RoundRobinLoadBalancer is the round-robin load balancer.
// RoundRobinLoadBalancer cycles through the candidate instances in order.
type RoundRobinLoadBalancer struct {
	// counter is the number of selections made so far; Select advances it
	// atomically.
	counter uint64
} // NewRoundRobinLoadBalancer creates a round-robin load balancer.
// NewRoundRobinLoadBalancer returns a round-robin balancer whose counter
// starts at zero.
func NewRoundRobinLoadBalancer() *RoundRobinLoadBalancer {
	return new(RoundRobinLoadBalancer)
} // Select selects a service instance.
func (lb *RoundRobinLoadBalancer) Select(ctx context.Context, instances []*ServiceInstance) (*ServiceInstance, error) {if len(instances) == 0 {return nil, errors.New("no available instances")}count := atomic.AddUint64(&lb.counter, 1)return instances[int(count)%len(instances)], nil
}// RandomLoadBalancer 随机负载均衡器
// RandomLoadBalancer picks a candidate instance at random. It carries no
// state.
type RandomLoadBalancer struct{} // NewRandomLoadBalancer creates a random load balancer.
// NewRandomLoadBalancer returns a ready-to-use random balancer.
func NewRandomLoadBalancer() *RandomLoadBalancer {
	return new(RandomLoadBalancer)
} // Select selects a service instance.
func (lb *RandomLoadBalancer) Select(ctx context.Context, instances []*ServiceInstance) (*ServiceInstance, error) {if len(instances) == 0 {return nil, errors.New("no available instances")}return instances[rand.Intn(len(instances))], nil
}// WeightedRoundRobinLoadBalancer 加权轮询负载均衡器
// WeightedRoundRobinLoadBalancer distributes selections in proportion to
// per-instance weights.
type WeightedRoundRobinLoadBalancer struct {
	// mutex guards weights and current.
	mutex sync.Mutex
	// weights maps instanceID -> configured weight.
	weights map[string]int
	// current maps instanceID -> running counter used by Select.
	current map[string]int
} // NewWeightedRoundRobinLoadBalancer creates a weighted round-robin load balancer.
// NewWeightedRoundRobinLoadBalancer returns a balancer with empty, writable
// weight and running-counter tables.
func NewWeightedRoundRobinLoadBalancer() *WeightedRoundRobinLoadBalancer {
	lb := new(WeightedRoundRobinLoadBalancer)
	lb.weights = map[string]int{}
	lb.current = map[string]int{}
	return lb
} // UpdateWeight updates a weight.
func (lb *WeightedRoundRobinLoadBalancer) UpdateWeight(instanceID string, weight int) {lb.mutex.Lock()defer lb.mutex.Unlock()lb.weights[instanceID] = weight
}// Select 选择服务实例
func (lb *WeightedRoundRobinLoadBalancer) Select(ctx context.Context, instances []*ServiceInstance) (*ServiceInstance, error) {if len(instances) == 0 {return nil, errors.New("no available instances")}lb.mutex.Lock()defer lb.mutex.Unlock()var total intfor _, instance := range instances {weight := lb.weights[instance.ID]if weight <= 0 {weight = 1}lb.current[instance.ID] += weighttotal += lb.current[instance.ID]}max := 0var selected *ServiceInstancefor _, instance := range instances {current := lb.current[instance.ID]if current > max {max = currentselected = instance}}// 更新current值if selected != nil {weight := lb.weights[selected.ID]if weight <= 0 {weight = 1}lb.current[selected.ID] -= totalreturn selected, nil}return nil, errors.New("failed to select instance")
}// ConsistentHashLoadBalancer 一致性哈希负载均衡器
type ConsistentHashLoadBalancer struct {hashRing *ConsistentHashmutex sync.RWMutex
}// NewConsistentHashLoadBalancer 创建一致性哈希负载均衡器
// NewConsistentHashLoadBalancer returns a balancer whose ring is built by
// NewConsistentHash(replicas).
func NewConsistentHashLoadBalancer(replicas int) *ConsistentHashLoadBalancer {
	lb := new(ConsistentHashLoadBalancer)
	lb.hashRing = NewConsistentHash(replicas)
	return lb
} // Select selects a service instance.
func (lb *ConsistentHashLoadBalancer) Select(ctx context.Context, instances []*ServiceInstance, key string) (*ServiceInstance, error) {if len(instances) == 0 {return nil, errors.New("no available instances")}lb.mutex.Lock()// 更新哈希环lb.hashRing.Clear()for _, instance := range instances {lb.hashRing.Add(instance.ID)}lb.mutex.Unlock()// 根据key选择节点targetID := lb.hashRing.Get(key)for _, instance := range instances {if instance.ID == targetID {return instance, nil}}return nil, errors.New("failed to select instance")
}// LoadBalancerClient 负载均衡客户端
type LoadBalancerClient struct {discovery *ServiceDiscoverybalancer LoadBalancer
}// NewLoadBalancerClient 创建负载均衡客户端
// NewLoadBalancerClient returns a client that resolves instances through
// discovery and chooses among them with balancer.
func NewLoadBalancerClient(discovery *ServiceDiscovery, balancer LoadBalancer) *LoadBalancerClient {
	client := new(LoadBalancerClient)
	client.discovery = discovery
	client.balancer = balancer
	return client
} // Call calls a service.
// Call resolves serviceName through service discovery and returns the
// instance chosen by the configured balancer.
//
// Despite its name it performs no remote call; it only selects the target
// instance. Discovery errors are returned as-is; an empty instance list is
// passed on to the balancer, which decides how to report it.
func (c *LoadBalancerClient) Call(ctx context.Context, serviceName string) (*ServiceInstance, error) {
	// Fetch the candidate instances.
	instances, err := c.discovery.GetService(ctx, serviceName)
	if err != nil {
		return nil, err
	}

	// Let the strategy pick one.
	return c.balancer.Select(ctx, instances)
}
三、健康检查实现
package discoveryimport ("context""fmt""net/http""sync""time"
)// HealthChecker 健康检查器
type HealthChecker struct {registry *RegistrycheckInterval time.Durationtimeout time.Durationendpoints map[string]string // instanceID -> health check endpointmutex sync.RWMutex
}// NewHealthChecker 创建健康检查器
func NewHealthChecker(registry *Registry, checkInterval, timeout time.Duration) *HealthChecker {checker := &HealthChecker{registry: registry,checkInterval: checkInterval,timeout: timeout,endpoints: make(map[string]string),}go checker.start()return checker
}// RegisterEndpoint 注册健康检查端点
func (hc *HealthChecker) RegisterEndpoint(instanceID, endpoint string) {hc.mutex.Lock()defer hc.mutex.Unlock()hc.endpoints[instanceID] = endpoint
}// UnregisterEndpoint 注销健康检查端点
func (hc *HealthChecker) UnregisterEndpoint(instanceID string) {hc.mutex.Lock()defer hc.mutex.Unlock()delete(hc.endpoints, instanceID)
}// start 启动健康检查
func (hc *HealthChecker) start() {ticker := time.NewTicker(hc.checkInterval)defer ticker.Stop()for range ticker.C {hc.checkAll()}
}// checkAll 检查所有实例
func (hc *HealthChecker) checkAll() {hc.mutex.RLock()endpoints := make(map[string]string)for id, endpoint := range hc.endpoints {endpoints[id] = endpoint}hc.mutex.RUnlock()var wg sync.WaitGroupfor instanceID, endpoint := range endpoints {wg.Add(1)go func(id, ep string) {defer wg.Done()hc.checkInstance(id, ep)}(instanceID, endpoint)}wg.Wait()
}// checkInstance 检查单个实例
func (hc *HealthChecker) checkInstance(instanceID, endpoint string) {ctx, cancel := context.WithTimeout(context.Background(), hc.timeout)defer cancel()status := hc.doHealthCheck(ctx, endpoint)// 更新实例状态if status {hc.registry.Heartbeat(ctx, "", instanceID) // 服务名为空,因为我们只需要instanceID}
}// doHealthCheck 执行健康检查
func (hc *HealthChecker) doHealthCheck(ctx context.Context, endpoint string) bool {req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)if err != nil {return false}client := &http.Client{Timeout: hc.timeout,}resp, err := client.Do(req)if err != nil {return false}defer resp.Body.Close()return resp.StatusCode == http.StatusOK
}// CustomHealthCheck 自定义健康检查接口
// CustomHealthCheck is implemented by application-defined health probes.
type CustomHealthCheck interface {
	// Check reports whether the probed component is healthy, or an error
	// when the probe itself could not be carried out.
	Check(
		ctx context.Context,
	) (bool, error)
} // CustomHealthChecker is the custom health checker.
type CustomHealthChecker struct {checker CustomHealthCheckmutex sync.RWMutex
}// NewCustomHealthChecker 创建自定义健康检查器
// NewCustomHealthChecker wraps checker in a CustomHealthChecker.
func NewCustomHealthChecker(checker CustomHealthCheck) *CustomHealthChecker {
	wrapped := new(CustomHealthChecker)
	wrapped.checker = checker
	return wrapped
} // Check performs the health check.
func (chc *CustomHealthChecker) Check(ctx context.Context) (bool, error) {chc.mutex.RLock()defer chc.mutex.RUnlock()return chc.checker.Check(ctx)
}// HealthCheckServer 健康检查服务器
// HealthCheckServer serves a health check handler over HTTP.
type HealthCheckServer struct {
	// port is the TCP port to listen on (all interfaces).
	port int
	// handler answers the health check requests.
	handler http.Handler
	// server is created by Start and is nil before that.
	server *http.Server
} // NewHealthCheckServer creates a health check server.
// NewHealthCheckServer returns a server that will serve handler on port once
// Start is called. Nothing is opened or listened on yet.
func NewHealthCheckServer(port int, handler http.Handler) *HealthCheckServer {
	srv := new(HealthCheckServer)
	srv.port = port
	srv.handler = handler
	return srv
} // Start starts the server.
func (s *HealthCheckServer) Start() error {s.server = &http.Server{Addr: fmt.Sprintf(":%d", s.port),Handler: s.handler,}return s.server.ListenAndServe()
}// Stop 停止服务器
// Stop gracefully shuts the server down, waiting for in-flight requests until
// ctx is done, and returns the error from http.Server.Shutdown.
//
// Calling Stop on a server that was never started is a no-op and returns nil.
func (s *HealthCheckServer) Stop(ctx context.Context) error {
	if s.server == nil {
		return nil
	}
	return s.server.Shutdown(ctx)
}
四、系统配置和策略
1. 服务注册配置
配置项 | 说明 | 默认值 | 推荐值 |
---|---|---|---|
心跳间隔 | 服务实例向注册中心发送心跳的间隔 | 10s | 5-15s |
注册超时 | 服务注册的超时时间 | 5s | 3-10s |
实例缓存 | 本地缓存服务实例信息的时间 | 60s | 30-120s |
重试次数 | 注册失败时的重试次数 | 3 | 3-5 |
2. 负载均衡策略对比
策略 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
轮询 | 实现简单,请求分布均匀 | 不考虑服务器能力差异 | 服务器性能相近 |
随机 | 实现简单,请求量大时负载趋于均衡 | 请求量小时可能分布不均 | 大规模集群 |
加权轮询 | 考虑服务器能力差异 | 权重设置需要经验 | 服务器性能差异大 |
一致性哈希 | 请求分布稳定 | 实现复杂 | 需要会话保持 |
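五、服务注册流程图
(原文流程图未能保留,按上文代码将流程概述如下)
1. 服务实例启动后调用 Register 向注册中心注册,状态置为 UP,并通知该服务的监听者;
2. 实例按心跳间隔调用 Heartbeat,刷新最近一次心跳时间;
3. 注册中心每 10 秒巡检一次,超过 30 秒未收到心跳的实例会被剔除;
4. 实例正常下线时调用 Deregister 主动注销。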
六、最佳实践建议
1. 服务注册
- 合理设置心跳间隔
- 实现优雅关闭
- 添加重试机制
- 使用服务分组
2. 服务发现
- 使用本地缓存
- 实现故障转移
- 监控服务健康
- 定期清理无效实例
3. 负载均衡
- 选择合适的策略
- 动态调整权重
- 实现熔断机制
- 监控服务质量
七、常见问题处理
1. 注册中心故障
- 问题描述:注册中心不可用
- 解决方案:
  - 使用注册中心集群
  - 本地缓存备份
  - 定期同步数据
  - 自动故障转移
2. 服务实例异常
- 问题描述:服务实例不稳定
- 解决方案:
  - 实现健康检查
  - 自动下线机制
  - 故障恢复策略
  - 监控告警系统
3. 网络分区
- 问题描述:网络故障导致分区
- 解决方案:
  - 多数据中心部署
  - 网络状态监控
  - 自动切换机制
  - 数据一致性保证
八、监控指标
1. 关键指标
- 服务注册延迟
- 服务发现延迟
- 心跳成功率
- 服务可用性
- 负载均衡效果
- 健康检查成功率
2. 告警阈值
- 服务注册延迟 > 3s
- 服务发现延迟 > 1s
- 心跳成功率 < 95%
- 服务可用性 < 99.9%
- 负载不均衡度 > 20%
- 健康检查失败率 > 5%
九、扩展建议
- 开发阶段:
  - 完善的单元测试
  - 模拟各种故障场景
  - 性能压力测试
  - 可观测性设计
- 运维阶段:
  - 监控系统部署
  - 告警规则配置
  - 故障演练
  - 容量规划
- 优化方向:
  - 性能优化
  - 可靠性提升
  - 可扩展性增强
  - 运维自动化
怎么样,今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!