Nacos 进阶篇---集群:选举心跳健康检查劳动者(九)

一、引言

   本章将是我们第二阶段,开始学习集群模式下,Nacos 是怎么去操作的 ?

本章重点:

  • 在Nacos服务端当中,会去开启健康心跳检查定时任务。如果是在Nacos集群下,大家思考一下,有没有必要所有的集群实例都去执行开启健康心跳检查定时任务?还是只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 ?大家可以思考一下~
  • 既然Nacos有健康心跳检查定时任务,如果微服务健康实例状态发生了改变,这个时候Nacos是怎么把健康实例同步给其他Nacos 集群节点的 ?代码怎么实现的 ?

带着这些问题我们一起往下看吧 ~

二、目录 

目录

          一、引言

二、目录 

三、集群心跳健康检查架构分析

四、集群心跳健康检查选举源码分析

五、集群实例健康状态同步源码分析

六、本章总结


三、集群心跳健康检查架构分析

我们先来分析第一问题。 在Nacos集群下,所有的集群实例都去执行开启健康心跳检查定时任务?还是只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 ?

  • 如果是在Nacos集群下,所有的集群实例都去执行开启健康心跳检查定时任务。那么就会出现跑出来结果不一致的问题,那么以哪个集群实例结果为准呢 ?很明显这种方式很不合理。
  • 那么就是第二种方式的了,只有当中一个实例去执行健康心跳检查定时任务,然后把结果同步给其他集群实例的节点 。

第二种方式明显更加靠谱点,逻辑也更加简洁。在Nacos集群当中也是这么做的,所有集群实例都会开启健康心跳检查任务,但是真正执行健康心跳任务检查逻辑的只有一个实例,在执行完成后。会有一个定时任务,把结果同步给其他集群节点

那我们接下来看看源码当中,Nacos 是怎么去实现的~

四、集群心跳健康检查选举源码分析

既然是 ” 心跳健康检查 “ ,我们还是要看服务端实例注册接口中的 ClientBeatCheckTask 任务:

那我们直接看 ClientBeatCheckTask 当中的 run 方法,一开始有两个 if 判断方法:

// 集群下,判断自身节点是否需要执行心跳健康检查任务,如果不需要,直接 return
if (!getDistroMapper().responsible(service.getName())) {return;
}// 判断是否需要开启健康任务检查,默认为: true
if (!getSwitchDomain().isHealthCheckEnabled()) {return;
}

那么集群下是如何保证只有一台节点去执行定时任务的,关键点就在于第一个判断当中 responsible方法,那我们具体来看下代码:

public boolean responsible(String serviceName) {// 获取集群节点的数量final List<String> servers = healthyList;// 如果为单机模式,就直接返回为 trueif (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {return true;}// 没有可用健康集群的节点,就直接返回 falseif (CollectionUtils.isEmpty(servers)) {// means distro config is not ready yetreturn false;}int index = servers.indexOf(EnvUtil.getLocalAddress());int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());if (lastIndex < 0 || index < 0) {return true;}// 把 serviceName 的进行 hash操作,然后和 servers.size() 取模,最终只有一个集群节点能够返回 trueint target = distroHash(serviceName) % servers.size();return target >= index && target <= lastIndex;
}

通过这个方法我们可以得知,在Nacos集群下,只会有一个节点去执行定时任务。那么该节点定时执行完,怎么把结果同步给其他集群节点的呢 ?

我们一起来往下接着看~

五、集群实例健康状态同步源码分析

本节重点:在Nacos集群下,只会有一个节点去执行定时任务。那么该节点定时执行完,怎么把结果同步给其他集群节点的呢 ?

在 ServiceManager 类中,init() 方法被 @PostConstruct 注解修饰,在Spring 创建 Bean的时候,会去执行 init()方法。在这个方法当中,会去开启心跳健康检查同步的定时任务,我们一起来看下~

@PostConstruct
public void init() {// 同步心跳健康检查异结果异步任务GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);// 处理 同步心跳健康检查异结果异步任务  内存队列 + 异步任务GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());// 省略部分代码
}

那我们先来看下 同步心跳健康检查异结果异步任务代码,ServiceReporter 当中的 run() 方法:

我们可以把这块代码分成三个部分,这样看更容易理解:

第一部分:获取当前所有服务,key:命名空间 value:服务名称

// 获取全部服务,key:命名空间   value:服务名称
Map<String, Set<String>> allServiceNames = getAllServiceNames();if (allServiceNames.size() <= 0) {//ignorereturn;
}

第二部分:遍历 allServiceNames 中的每一个命名空间 ,封装请求参数 ,准备同步健康心跳检查结果

// 遍历 allServiceNames 中的每一个命名空间 ,封装请求参数 ,准备同步健康心跳检查结果
for (String namespaceId : allServiceNames.keySet()) {ServiceChecksum checksum = new ServiceChecksum(namespaceId);// 遍历每一个命名空间对应 serviceName 服务名称for (String serviceName : allServiceNames.get(namespaceId)) {if (!distroMapper.responsible(serviceName)) {continue;}Service service = getService(namespaceId, serviceName);if (service == null || service.isEmpty()) {continue;}service.recalculateChecksum();// 添加请求参数checksum.addItem(serviceName, service.getChecksum());}// 封装 Message 对象数据,把请求对象转换成JSONMessage msg = new Message();msg.setData(JacksonUtils.toJson(checksum));Collection<Member> sameSiteServers = memberManager.allMembers();if (sameSiteServers == null || sameSiteServers.size() <= 0) {return;}

第三部分:同步结果到其他集群节点

for (Member server : sameSiteServers) {// 判断是否是当前集群的节点,如果是就跳过if (server.getAddress().equals(NetUtils.localServer())) {continue;}// 重点:同步其他集群节点synchronizer.send(server.getAddress(), msg);
}

在 synchronizer.send(server.getAddress(), msg); 这个方法当中,会通过HTTP 方式给其他集群节点同步心跳任务健康检查结果:

@Override
public void send(final String serverIP, Message msg) {if (serverIP == null) {return;}// 创建请求参数Map<String, String> params = new HashMap<String, String>(10);params.put("statuses", msg.getData());params.put("clientIP", NetUtils.localServer());// 拼接 url 地址String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";if (IPUtil.containsPort(serverIP)) {url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT+ "/service/status";}try {// 异步发送 http 请求,请求地址:http://ip/v1/ns/service/status , 同步心跳健康检查结果HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {// 代码省略});} catch (Exception e) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);}}

通过代码可以得知,最终也是通过 HTTP 的方式来进行数据同步的,也能够看出请求地址是v1/ns/service/status。接下来我们一起来看下请求地址对应的接口代码逻辑,其实代码很好找,看下图:

这块代码就不细讲了,主要逻辑就是 判断服务状态是否有变动 ,有变动的话就 包装 ServiceKey 对象, 放入到 toBeUpdatedServicesQueue 阻塞队列当中。

代码如下:

public void addUpdatedServiceToQueue(String namespaceId, String serviceName, String serverIP, String checksum) {lock.lock();try {// 包装 ServiceKey 对象, 放入到 toBeUpdatedServicesQueue 阻塞队列当中toBeUpdatedServicesQueue.offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);} catch (Exception e) {toBeUpdatedServicesQueue.poll();toBeUpdatedServicesQueue.add(new ServiceKey(namespaceId, serviceName, serverIP, checksum));Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add service to be updated to queue.", e);} finally {lock.unlock();}
}

我们刚刚分析的在 ServiceManager类中的 init 方法(代码如下),第一个线程任务就是同步心跳健康检查结果的异步任务,那么我们接下来分析第二个线程任务。

@PostConstruct
public void init() {// 同步心跳健康检查异结果异步任务GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);// 处理 同步心跳健康检查异结果异步任务  内存队列 + 异步任务GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());// 省略部分代码
}

第二个线程任务类是:UpdatedServiceProcessor,我们从run 方法中(代码如下),能够看出是一个 while 循环,并且是没有结束条件的。在循环的逻辑当中,会从toBeUpdatedServicesQueue阻塞队列中一直取任务,取到任务之后,又是提交了一个线程池任务。

@Override
public void run() {ServiceKey serviceKey = null;try {while (true) {try {// 从阻塞队列当中一直获取任务serviceKey = toBeUpdatedServicesQueue.take();} catch (Exception e) {Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");}if (serviceKey == null) {continue;}// 把任务提交到线程池执行GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));}} catch (Exception e) {Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);}
}

那我们接着看 ServiceUpdater 当中的 run 方法,代码如下:

@Override
public void run() {try {// 调用更改健康状态方法updatedHealthStatus(namespaceId, serviceName, serverIP);} catch (Exception e) {Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,serverIP, e);}
}public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));// 解析参数JsonNode serviceJson = JacksonUtils.toObj(msg.getData());ArrayNode ipList = (ArrayNode) serviceJson.get("ips");Map<String, String> ipsMap = new HashMap<>(ipList.size());for (int i = 0; i < ipList.size(); i++) {String ip = ipList.get(i).asText();String[] strings = ip.split("_");ipsMap.put(strings[0], strings[1]);}Service service = getService(namespaceId, serviceName);if (service == null) {return;}// 是否改变标识boolean changed = false;// 遍历全部实例信息,更新健康状态List<Instance> instances = service.allIPs();for (Instance instance : instances) {boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));if (valid != instance.isHealthy()) {changed = true;instance.setHealthy(valid);Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,(instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),instance.getClusterName());}}// 如果实例健康状态改变了,那么就发布 服务改变事件,使用 upd 的方式通知客户端if (changed) {pushService.serviceChanged(service);if (Loggers.EVT_LOG.isDebugEnabled()) {StringBuilder stringBuilder = new StringBuilder();List<Instance> allIps = service.allIPs();for (Instance instance : allIps) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),service.getName(), stringBuilder.toString());}}}

在上面代码中,注意是先解析我们的 msg.getData()参数,然后获取注册表中全部的 Instance 实例列,进行遍历,在健康状态有变动的情况下,会直接更改它的 healthy 属性。在方法的最后,如果有更新 healthy属性的情况下,最终也会发布服务改变事件来通知客户端进行更新。

六、本章总结

在本章节我们首先知道了,在Nacos集群下,是只有一个集群节点去执行心跳健康检查定时任务的,然后把结果同步给其他集群的节点。那么是怎么同步给其他集群节点的呢 ?

在Nacos 服务端是有一个定时任务,来和其他集群节点进行数据同步的。通过源码分析,我们知道最终也是通过 HTTP 的方式进行同步的,采用了 异步任务 + 阻塞队列的方式 的设计架构。这样的好处就是快,先把任务都给接受放入到阻塞队列当中,就立马返回。然后后台会开启一条线程不断从阻塞队列当中获取任务进行处理。

把本章流程图补充完整:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/372228.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL存储过程创建

DQL call create_order_infos(7,2,3); delimiter $$ CREATE PROCEDURE create_order_infos( in in_user_id int, in in_product_id int, in in_count int ) BEGIN -- 业务逻辑 SELECT in_user_id 用户id,in_product_id 产品id,in_count 购买数量; end $$ delimiter ; 结果 c…

7.8洛谷 字符串

P5650 基础字符串练习题 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 思路 如果 S[i] 0&#xff0c;则 dp[i] max(dp[i-1] 1, 1)&#xff08;因为增加了 0&#xff0c;减少了 1&#xff09;。如果 S[i] 1&#xff0c;则 dp[i] max(dp[i-1] - 1, -1)&#xff08;因为减…

九浅一深Jemalloc5.3.0 -- ④浅*配置

目前市面上有不少分析Jemalloc老版本的博文&#xff0c;但最新版本5.3.0却少之又少。而且5.3.0的架构与5之前的版本有较大不同&#xff0c;本着“与时俱进”、“由浅入深”的宗旨&#xff0c;我将逐步分析最新release版本Jemalloc5.3.0的实现。 另外&#xff0c;单讲实现代码是…

这几类人,千万不要买纯电车

文 | AUTO芯球 作者 | 响铃 纯电车的冤大头真是太多了&#xff0c; 我之前劝过&#xff0c;有些人不适合买纯电车&#xff0c; 你们看&#xff0c;果然吧&#xff0c;麦卡锡最近的一份报告就披露了 去年啊&#xff0c;22%的人在买了电车后后悔了&#xff0c; 这些人说了&a…

亿康源精英盛宴暨亿康源启动成功举办

&#xff08;本台记者报&#xff09;2024年7月7日下午&#xff0c;亿康源精英盛宴暨启动仪式在杭州市中维歌德大酒店盛大举行。此次盛会不仅吸引了行业内的专业人才、著名投资界大咖和科技领域的杰出企业家&#xff0c;还汇聚了众多关注大健康产业的各界人士&#xff0c;共同见…

【qt】获取主机信息系统

话不多说,先一睹芳颜! 如果你也想达到这种效果,那咱们就开始吧! 目录 一.登录界面设计1.ui登录设计 二.加载界面1.lineEdit的密码输入模式2.lineEdit按回车跳转的信号3.密码的判断4.创建加载界面5.创建定时器来进行进度条的移动6.定时器执行的槽函数 三.主机信息界面1.主机信息…

HarmonyOS Next系列之Echarts图表组件(折线图、柱状图、饼图等)实现(八)

系列文章目录 HarmonyOS Next 系列之省市区弹窗选择器实现&#xff08;一&#xff09; HarmonyOS Next 系列之验证码输入组件实现&#xff08;二&#xff09; HarmonyOS Next 系列之底部标签栏TabBar实现&#xff08;三&#xff09; HarmonyOS Next 系列之HTTP请求封装和Token…

2-28 基于matlab提取出频域和时域信号的29个特征

基于matlab提取出频域和时域信号的29个特征&#xff0c;主运行文件feature_extraction&#xff0c;fre_statistical_compute和time_statistical_compute分别提取频域和时域的特征&#xff0c;生成的29个特征保存在生成的feature矩阵中。程序已调通&#xff0c;可直接运行。 2-2…

初学嵌入式是弄linux还是单片机?

在开始前刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「单片机的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“666”之后私信回复“666”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;1、先入门了51先学了89c52…

绝地求生PUBG没有开始游戏按钮的解决办法

绝地求生是一款特别热门的战术竞技型射击类游戏&#xff0c;游戏中玩家需要在游戏地图上收集各种资源&#xff0c;并在不断缩小的安全区域内持武器对抗其他玩家&#xff0c;让自己生存到最后。当游戏最后场上只剩下一支队伍的时候即可获得游戏胜利。然而一些玩家在游玩绝地求生…

@react-google-maps/api实现谷歌地图中添加多边围栏,并可编辑,编辑后可获得围栏各个点的经纬度

先上一张效果图 看看是不是大家想要的效果&#xff5e; ❤️ 由于该功能微微复杂一点&#xff0c;为了让大家精准了解 我精简了一下地图代码 大家根据自己的需求将center值和paths&#xff0c;用setState做活就可以了 1.第一步要加入项目package.json中或者直接yarn install它…

Powershell 获取电脑保存的所有wifi密码

一. 知识点 netsh wlan show profiles 用于显示计算机上已保存的无线网络配置文件 Measure-Object 用于统计数量 [PSCustomObject]{ } 用于创建Powershell对象 [math]::Round 四舍五入 Write-Progress 显示进度条 二. 代码 只能获取中文Windows操作系统的wifi密码如果想获取…

论文解析——Full Stack Optimization of Transformer Inference: a Survey

作者及发刊详情 摘要 正文 主要工作贡献 这篇文章的贡献主要有两部分&#xff1a; 分析Transformer的特征&#xff0c;调查高效transformer推理的方法通过应用方法学展现一个DNN加速器生成器Gemmini的case研究 1&#xff09;分析和解析Transformer架构的运行时特性和瓶颈…

Java进阶----继承

继承 一.继承概述 继承是可以通过定义新的类&#xff0c;在已有类的基础上扩展属性和功能的一种技术. 案例&#xff1a;优化 猫、狗JavaBean类的设计 狗类&#xff1a;Dog 属性&#xff1a;名字 name&#xff0c;年龄 age 方法&#xff1a;看家 watchHome()&#xff0c;Gett…

防火墙基础及登录(华为)

目录 防火墙概述防火墙发展进程包过滤防火墙代理防火墙状态检测防火墙UTM下一代防火墙&#xff08;NGFW&#xff09; 防火墙分类按物理特性划分软件防火墙硬件防火墙 按性能划分百兆级别和千兆级别 按防火墙结构划分单一主机防火墙路由集成式防火墙分布式防火墙 华为防火墙利用…

命令行运行git reflog(reference log)报错的解决办法

文章目录 1. 检查 Git 是否已安装2. 检查 PATH 环境变量3. 重新安装 Git 在Git中&#xff0c; reflog的英文全称是 “ reference log”。意思是 引用日志&#xff08;参考日志&#xff09;。它记录了本地仓库中HEAD和分支引用所指向的提交的变更历史。这包括了你所有的提交&…

推荐3款免费电脑工具

Tools-Web Tools-Web是一个在线工具箱&#xff0c;提供丰富的工具和功能&#xff0c;适用于日常工作和学习。根据用户评价&#xff0c;Tools-Web的工具种类丰富且操作简单&#xff0c;是日常工作和学习的好帮手。该工具箱涵盖了开发运维、文本处理、图片处理、图表处理、随机工…

收银系统源码-收银台副屏广告

1. 功能描述 门店广告&#xff1a;双屏收银机&#xff0c;副屏广告&#xff0c;主屏和副屏同步&#xff0c;总部可统一控制广告位&#xff0c;也可以给门店开放权限&#xff0c;门店独立上传广告位&#xff1b; 2.适用场景 新店开业、门店周年庆、节假日门店活动宣传&#x…

2通道音频ADC解码芯片ES7243L、ES7243E、ES7243,用于低成本实现模拟麦克风转换为IIS数字话筒

前言&#xff1a; 音频解码芯片某创参考价格&#xff1a; ES7243L 500&#xff1a;&#xffe5;1.36 / 个 ES7243E 500&#xff1a;&#xffe5;1.66 / 个 ES7243 500&#xff1a; &#xffe5;1.91 / 个 其中ES7243L工作电压为1.8V&#xff0c;与其他两款的3.3V工作电压不同&…

九科bit-Worker RPA 内容学习

简介&#xff1a; 什么是RPA&#xff1f; RPA&#xff08;Robotic Process Automation&#xff0c;机器人流程自动化&#xff09;本质上是一种“AI数字员工”&#xff0c;针对企业中存在的大批量、重复性、机械化人工操作&#xff0c;通过模拟人的工作流程使之实现自动化。 b…