SpringCloud源码:客户端分析(二)- 客户端源码分析

6f3ceaf83f7d086cdaa12f3d37927247.jpeg


背景

我们继续分析EurekaClient的两个自动化配置类:

自动化配置类
功能职责
EurekaClientAutoConfiguration配置EurekaClient确保了Eureka客户端能够正确地:
- 注册到Eureka服务端
- 周期性地发送心跳信息来更新服务租约
- 下线时通知Eureka服务端
- 获取服务实例列表;

更侧重于Eureka客户端的基本配置和功能实现
EurekaDiscoveryClientConfiguration配置EurekaDiscoveryClient创建RefreshScopeRefreshedEvent事件的监听类,用于重启注册;
更多地涉及到服务的自动注册、健康检查以及事件处理等方面

CloudEurekaClient分析

原理

客户端本质就是4个动作:

  1. 获取服务列表

  2. 注册服务实例

  3. 租约续约

  4. 取消租约

源码

让我们继续关注 第一个自动装配类 EurekaClientAutoConfiguration 对CloudEurekaClient 的构造封装,即如下代码块:

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,EurekaClientConfig config) {return new CloudEurekaClient(manager, config, this.optionalArgs,this.context);
}

分析代码:

  • CloudEurekaClient对象,并交给容器管理bean

CloudEurekaClient    

public class CloudEurekaClient extends DiscoveryClient {public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,ApplicationEventPublisher publisher) {super(applicationInfoManager, config, args);this.applicationInfoManager = applicationInfoManager;this.publisher = publisher;this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,"eurekaTransport");ReflectionUtils.makeAccessible(this.eurekaTransportField);}
}

分析代码:

  • 实际上CloudEurekaClient调用了父类DiscoveryClient的构造器

DiscoveryClient

经历了多个重载构造器的嵌套,我们进入了最终的构造器:

private final ScheduledExecutorService scheduler;
// additional executors for supervised subtasks
private final ThreadPoolExecutor heartbeatExecutor;
private final ThreadPoolExecutor cacheRefreshExecutor;@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {// .... 一些初始化工作logger.info("Initializing Eureka in region {}", clientConfig.getRegion());try {// default size of 2 - 1 each for heartbeat and cacheRefreshscheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());  // use direct handoffcacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());  // use direct handoffeurekaTransport = new EurekaTransport();scheduleServerEndpointTask(eurekaTransport, args);AzToRegionMapper azToRegionMapper;if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);} else {azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);}if (null != remoteRegionsToFetch.get()) {azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));}instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());} catch (Throwable e) {throw new RuntimeException("Failed to initialize DiscoveryClient!", e);}// .......if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {try {if (!register() ) {throw new IllegalStateException("Registration error at startup. Invalid server response.");}} catch (Throwable th) {logger.error("Registration error at startup: {}", th.getMessage());throw new IllegalStateException(th);}}// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetchinitScheduledTasks();// ...其他初始化工作
}

代码分析:

  • 这里初始化了3个异步线程池:scheduler、heartbeatExecutor、cacheRefreshExecutor

    • scheduler:coreSize=2的周期任务线程池,线程名命名是DiscoveryClient-%s

    • heartbeatExecutor:coreSize=1的异步线程池,线程名命名是DiscoveryClient-HeartbeatExecutor-%d

    • cacheRefreshExecutor:coreSize=1的异步线程池,线程名命名是DiscoveryClient-CacheRefreshExecutor-%d

  • 这三个线程池,是怎么配合工作的呢?不着急,慢慢往下看

initScheduledTasks()的代码如下:
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();cacheRefreshTask = new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread());// 【1】scheduler.schedule(cacheRefreshTask,registryFetchIntervalSeconds, TimeUnit.SECONDS);}if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerheartbeatTask = new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread());// 【2】scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS);} else {logger.info("Not registering with Eureka server per configuration");}
}

分析代码:

  • 【1】检查是否需要获取注册表信息(配置项eureka.client.fetchRegistry默认=true)

    • 用注入的异步线程池cacheRefreshExecutor,按指定时间间隔registryFetchIntervalSeconds,去执行CacheRefreshThread,即缓存刷新refreshRegistry()任务

    • 缓存刷新任务cacheRefreshTask

    • 使用调度器 scheduler 安排任务

  • 【2】检查是否需要注册入Eureka(配置项eureka.client.registerWithEureka默认=true)

    • 用注入的异步线程池heartbeatExecutor,按指定时间间隔renewalIntervalInSecs,去执行HeartbeatThread,即执行续租renew()任务

    • 心跳续租任务heartbeatTask

    • 使用调度器 scheduler 安排任务

CacheRefreshThread - 缓存刷新
class CacheRefreshThread implements Runnable {public void run() {refreshRegistry();}
}@VisibleForTesting
void refreshRegistry() {try {//.....//【1】刷新本地注册服务列表boolean success = fetchRegistry(remoteRegionsModified);//.....} catch (Throwable e) {logger.error("Cannot fetch registry from server", e);}
}private boolean fetchRegistry(boolean forceFullRegistryFetch) {try {// 【2】获取本地localRegionApps的服务列表Applications applications = getApplications();// 【3】获取远程数据并更新服务列表getAndUpdateDelta(applications);}// registry was fetched successfully, so return truereturn true;
}private void getAndUpdateDelta(Applications applications) throws Throwable {// .....//【4】检查缓存delta的服务注册列表Applications delta = null;if (delta == null) {// 【4.1】如果缓存为空,就再去拉取一次EurekaServer的数据getAndStoreFullRegistry();} else {if (fetchRegistryUpdateLock.tryLock()) {try {//【5】获取EurekaServer最新的服务注册表,并执行delta更新getAndStoreFullRegistry()updateDelta(delta);} finally {fetchRegistryUpdateLock.unlock();}}    }
} private void getAndStoreFullRegistry() throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();Applications apps = null;EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()): eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = httpResponse.getEntity();}
}private void updateDelta(Applications delta) {int deltaCount = 0;//【6】遍历服务注册列表的每个appfor (Application app : delta.getRegisteredApplications()) {//【7】遍历每个服务的所有实例instancefor (InstanceInfo instance : app.getInstances()) {//【8】获取本地cache的服务注册信息Applications applications = getApplications();String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {Applications remoteApps = remoteRegionVsApps.get(instanceRegion);if (null == remoteApps) {remoteApps = new Applications();remoteRegionVsApps.put(instanceRegion, remoteApps);}applications = remoteApps;}++deltaCount;//【9】如果实例是新增的类型if (ActionType.ADDED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {//【10】执行实例添加applications.addApplication(app);}applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);} //【11】如果实例是修改的类型else if (ActionType.MODIFIED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {//【12】没有已有实例,执行添加操作applications.addApplication(app);}//【13】存在已有实例,则注册新的实例信息applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);} //【14】如果实例是删除的类型else if (ActionType.DELETED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp != null) {//【15】删除这个服务的实例existingApp.removeInstance(instance);//【16】如果这个服务的实例数量=0,则直接删除服务信息appif (existingApp.getInstancesAsIsFromEureka().isEmpty()) {applications.removeApplication(existingApp);}}}}}
}

代码分析:见下面流程图

34a3a1ce9a8b23a83da835f99ab9195e.png

HeartbeatThread - 心跳续租
private final Counter REREGISTER_COUNTER = Monitors.newCounter(PREFIX+ "Reregister");private class HeartbeatThread implements Runnable {public void run() {//【1】更新操作if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}
}boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {//【2】客户端发送心跳包,获取响应httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);//【3】响应码=404,说明服务在EurekaServer不存在if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();long timestamp = instanceInfo.setIsDirtyWithTime();//【4】客户端重新发起一次register操作,给EurekaServerboolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}//【5】EurekaServer注册成功,则续约成功return success;}//【6】响应码=200,则在EurekaServer侧续约成功了return httpResponse.getStatusCode() == Status.OK.getStatusCode();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}
}

代码分析:见下面流程图

345ce2121267fe7585d492bf0921565c.png

取消租约
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,EurekaClientConfig config) {return new CloudEurekaClient(manager, config, this.optionalArgs,this.context);
}@PreDestroy
@Override
public synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());}cancelScheduledTasks();// If APPINFO was registeredif (applicationInfoManager != null&& clientConfig.shouldRegisterWithEureka()&& clientConfig.shouldUnregisterOnShutdown()) {applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);unregister();}if (eurekaTransport != null) {eurekaTransport.shutdown();}heartbeatStalenessMonitor.shutdown();registryStalenessMonitor.shutdown();Monitors.unregisterObject(this);logger.info("Completed shut down of DiscoveryClient");}
}private void cancelScheduledTasks() {if (instanceInfoReplicator != null) {instanceInfoReplicator.stop();}if (heartbeatExecutor != null) {heartbeatExecutor.shutdownNow();}if (cacheRefreshExecutor != null) {cacheRefreshExecutor.shutdownNow();}if (scheduler != null) {scheduler.shutdownNow();}if (cacheRefreshTask != null) {cacheRefreshTask.cancel();}if (heartbeatTask != null) {heartbeatTask.cancel();}
}

代码分析:见下面流程图

2f689c9147234a3979b9b72c3722c439.png

小结

我们回到开头的原理,知道EurekaClient客户端本质就是4个动作:

  1. 获取服务列表:在CacheRefreshThread里有实现,即CacheRefreshThread的【4.1】步骤的eurekaTransport.queryClient.getApplications

  2. 注册服务实例:在HeartbeatThread里有实现,即HeartbeatThread的【4】步骤的eurekaTransport.registrationClient.register

  3. 租约续约:在HeartbeatThread里有实现,即HeartbeatThread的【2】步骤的eurekaTransport.registrationClient.sendHeartBeat

  4. 取消租约:在定义CloudEurekaClient的@Bean(destroyMethod = "shutdown")注解有生命

但我们还想知道,CacheRefreshThread 和 HeartbeatThread的背后通信,以及在EurekaServer的原理细节。可以,我们放到下一个章节再讲。

其他文章

Kafka消息堆积问题排查

基于SpringMVC的API灰度方案

理解到位:灾备和只读数据库

SQL治理经验谈:索引覆盖

Mybatis链路分析:JDK动态代理和责任链模式的应用

大模型安装部署、测试、接入SpringCloud应用体系

Mybatis插件-租户ID的注入&拦截应用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/432194.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TypeScript 设计模式之【建造者模式】

文章目录 **建造者模式**&#xff1a;打造你的梦想之屋建造者的秘密建造者有什么利与害&#xff1f;如何使用建造者搭建各种房子代码实现案例建造者模式的主要优点建造者模式的主要缺点建造者模式的适用场景总结 建造者模式&#xff1a;打造你的梦想之屋 假设你想要一栋完美的…

SpringBoot代码实战(MyBatis-Plus+Thymeleaf)

构建项目 修改pom.xml文件&#xff0c;添加其他依赖以及设置 <!--MyBatis-Plus依赖--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-spring-boot3-starter</artifactId><version>3.5.6</version><…

LiveGBS流媒体平台GB/T28181功能-支持电子放大拉框放大直播视频拉框放大录像视频流拉框放大电子放大

LiveGBS流媒体平台GB/T28181功能-支持电子放大拉框放大直播视频拉框放大录像视频流拉框放大电子放大 1、直播播放2、录像播放3、搭建GB28181视频直播平台 1、直播播放 国标设备-》查看通道-》播放 &#xff0c;左键单击可以拉取矩形框&#xff0c;放大选中的范围&#xff0c;释…

序列化流(对象操作输出流)反序列化流(对象操作输入流)

可以把Java中的对象写到本地文件中 序列化流&#xff08;对象操作输出流&#xff09; 构造方法 成员方法 使用对象输出流将对象保存到文件会出现NotSerializableException异常 解决方案&#xff1a;需要让Javabean类实现Serializable接口 Student package myio;import java.…

家政服务预约系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;客户管理&#xff0c;员工管理&#xff0c;家政服务管理&#xff0c;服务预约管理&#xff0c;员工风采管理&#xff0c;客户需求管理&#xff0c;接单信息管理 微信端账号功能包括&#xff1a;系统首…

MySQL_子查询

课 程 推 荐我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448;入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448;虚 拟 环 境 搭 建 &#xff1a;&#x1…

力扣最热一百题——寻找重复数(中等)

目录 题目链接&#xff1a;287. 寻找重复数 - 力扣&#xff08;LeetCode&#xff09; 题目描述 示例 提示&#xff1a; 解法一&#xff1a;暴力搜寻 Java写法&#xff1a; 运行时间 解法二&#xff1a;排序搜寻 Java写法&#xff1a; 运行时间 C写法&#xff1a; 运…

2024/9/26 英语每日一段

In part, that’s because it’s harder to empathize with someone who feels distant or unknown than a close loved one. “The more shared experiences you have with someone, the more of a rich, nuanced representation you can draw on,” Cameron says. But empath…

【Java网络编程】使用Tcp和Udp实现一个小型的回声客户端服务器程序

网络编程的概念 Java中的网络编程是指使用Java语言及其库创建和管理网络应用程序的过程。这一过程使得不同的计算机可以通过网络进行通信和数据交换。Java提供了一系列强大的API&#xff08;应用程序编程接口&#xff09;来支持网络编程&#xff0c;主要涉及以下几个概念&…

简易STL实现 | 红黑树的实现

1、原理 红黑树&#xff08;Red-Black Tree&#xff09;是一种自平衡的二叉搜索树 红黑树具有以下特性&#xff0c;这些特性保持了树的平衡&#xff1a; 节点颜色&#xff1a; 每个节点要么是红色&#xff0c;要么是黑色根节点颜色&#xff1a; 根节点是黑色的。叶子节点&…

【stm32】TIM定时器输出比较-PWM驱动LED呼吸灯/舵机/直流电机

TIM定时器输出比较 一、输出比较简介1、OC&#xff08;Output Compare&#xff09;输出比较2、PWM简介3、输出比较通道(高级)4、输出比较通道(通用)5、输出比较模式6、PWM基本结构配置步骤&#xff1a;程序代码&#xff1a;PWM驱动LED呼吸灯 7、参数计算8、舵机简介程序代码&am…

【笔记】KaiOS 系统框架和应用结构(APP界面逻辑)

KaiOS系统框架 最早自下而上分成Gonk-Gecko-Gaia层,代码有同名的目录,现在已经不用这种称呼。 按照官网3.0的版本迭代介绍,2.5->3.0已经将系统更新成如下部分: 仅分为上层web应用和底层平台核心,通过WebAPIs连接上下层,这也是kaios系统升级变更较大的部分。 KaiOS P…

括号匹配问题 -------------

1.题目说明&#xff1a; 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。左括号必须以正确的顺序闭合。每个右括号都有…

Jenkins入门:从搭建到部署第一个Springboot项目(踩坑记录)

本文讲述在虚拟机环境下(模拟服务器)&#xff0c;使用docker方式搭建jenkins&#xff0c;并部署一个简单的Springboot项目。仅记录关键步骤和遇到的坑&#xff0c;后续再进行细节补充。 一、环境准备和基础工具安装 1. 环境 系统环境为本机vmware创建的Ubuntu24.04。 2. yum…

【C++】STL--string(下)

1.string类对象的修改操作 erase&#xff1a;指定位置删除 int main() {string str1("hello world");str1.push_back(c);//尾插一个ccout << str1 << endl;string str2;str2.append("hello"); // 在str后追加一个字符"hello"cout…

CNN-LSTM预测 | MATLAB实现CNN-LSTM卷积长短期记忆神经网络时间序列预测

CNN-LSTM预测 | MATLAB实现CNN-LSTM卷积长短期记忆神经网络时间序列预测 目录 CNN-LSTM预测 | MATLAB实现CNN-LSTM卷积长短期记忆神经网络时间序列预测预测效果基本介绍模型描述程序设计参考资料预测效果 基本介绍 本次运行测试环境MATLAB2020b 提出一种包含卷积神经网络和长短…

多机部署,负载均衡-LoadBalance

文章目录 多机部署,负载均衡-LoadBalance1. 开启多个服务2. 什么是负载均衡负载均衡的实现客户端负载均衡 3. Spring Cloud LoadBalance快速上手使用Spring Cloud LoadBalance实现负载均衡修改IP,端口号为服务名称启动多个服务 负载均衡策略自定义负载均衡策略 LoadBalance原理…

c++模拟真人鼠标轨迹算法

一.鼠标轨迹算法简介 鼠标轨迹底层实现采用 C / C语言&#xff0c;利用其高性能和系统级访问能力&#xff0c;开发出高效的鼠标轨迹模拟算法。通过将算法封装为 DLL&#xff08;动态链接库&#xff09;&#xff0c;可以方便地在不同的编程环境中调用&#xff0c;实现跨语言的兼…

红外辐射在大气中的衰减原理(含C++实现)

目录 一、原理 1.1 水蒸气吸收衰减 1.2 二氧化碳的吸收衰减 1.3 大气的散射衰减 1.4 气象衰减 1.5 衰减后的红外辐射强度 二、C++实现 2.1 头文件 2.2 源文件 参考论文 一、原理 红外辐射在大气中传播的影响因素主要有3个: (1)大气中某些气体分子(H2O、CO2等)…

31214324

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…