58.Nacos源码分析2

三、服务心跳。

3.服务心跳

Nacos的实例分为临时实例和永久实例两种,可以通过在yaml 文件配置:

spring:application:name: order-servicecloud:nacos:discovery:ephemeral: false # 设置实例为永久实例。true:临时; false:永久server-addr: 192.168.150.1:8845

临时实例基于心跳方式做健康检测,而永久实例则是由Nacos主动探测实例状态。

其中Nacos提供的心跳的API接口为:

接口描述:发送某个实例的心跳

请求类型:PUT

请求路径

/nacos/v1/ns/instance/beat

请求参数

名称类型是否必选描述
serviceName字符串服务名
groupName字符串分组名
ephemeralboolean是否临时实例
beatJSON格式字符串实例心跳内容

错误编码

错误代码描述语义
400Bad Request客户端请求中的语法错误
403Forbidden没有权限
404Not Found无法找到资源
500Internal Server Error服务器内部错误
200OK正常

3.1.客户端

在2.2.4.服务注册这一节中,我们说过NacosNamingService这个类实现了服务的注册,同时也实现了服务心跳:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 判断是否是临时实例。if (instance.isEphemeral()) {// 如果是临时实例,则构建心跳信息BeatInfoBeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);// 添加心跳任务beatReactor.addBeatInfo(groupedServiceName, beatInfo);}serverProxy.registerService(groupedServiceName, groupName, instance);
}

3.1.1.BeatInfo

这里的BeanInfo就包含心跳需要的各种信息:

3.1.2.BeatReactor

BeatReactor这个类则维护了一个线程池:

当调用BeatReactor.addBeatInfo(groupedServiceName, beatInfo)方法时,就会执行心跳:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;//fix #1733if ((existBeat = dom2Beat.remove(key)) != null) {existBeat.setStopped(true);}dom2Beat.put(key, beatInfo);// 利用线程池,定期执行心跳任务,周期为 beatInfo.getPeriod()executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

心跳周期的默认值在com.alibaba.nacos.api.common.Constants类中:

可以看到是5秒,默认5秒一次心跳。

3.1.3.BeatTask

心跳的任务封装在BeatTask这个类中,是一个Runnable,其run方法如下:

@Override
public void run() {if (beatInfo.isStopped()) {return;}// 获取心跳周期long nextTime = beatInfo.getPeriod();try {// 发送心跳JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.get("clientBeatInterval").asLong();boolean lightBeatEnabled = false;if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}// 判断心跳结果int code = NamingResponseCode.OK;if (result.has(CommonParams.CODE)) {code = result.get(CommonParams.CODE).asInt();}if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {// 如果失败,则需要 重新注册实例Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {serverProxy.registerService(beatInfo.getServiceName(),NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ex) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
​} catch (Exception unknownEx) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);} finally {executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);}
}

3.1.5.发送心跳

最终心跳的发送还是通过NamingProxysendBeat方法来实现:

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
​if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());}// 组织请求参数Map<String, String> params = new HashMap<String, String>(8);Map<String, String> bodyMap = new HashMap<String, String>(2);if (!lightBeatEnabled) {bodyMap.put("beat", JacksonUtils.toJson(beatInfo));}params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());params.put("ip", beatInfo.getIp());params.put("port", String.valueOf(beatInfo.getPort()));// 发送请求,这个地址就是:/v1/ns/instance/beatString result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);return JacksonUtils.toObj(result);
}

3.2.服务端

对于临时实例,服务端代码分两部分:

  • 1)InstanceController提供了一个接口,处理客户端的心跳请求

  • 2)定时检测实例心跳是否按期执行

3.2.1.InstanceController

与服务注册时一样,在nacos-naming模块中的InstanceController类中,定义了一个方法用来处理心跳请求:

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {// 解析心跳的请求参数ObjectNode result = JacksonUtils.createEmptyJsonNode();result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
​String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);RsInfo clientBeat = null;if (StringUtils.isNotBlank(beat)) {clientBeat = JacksonUtils.toObj(beat, RsInfo.class);}String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));if (clientBeat != null) {if (StringUtils.isNotBlank(clientBeat.getCluster())) {clusterName = clientBeat.getCluster();} else {// fix #2533clientBeat.setCluster(clusterName);}ip = clientBeat.getIp();port = clientBeat.getPort();}String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);// 尝试根据参数中的namespaceId、serviceName、clusterName、ip、port等信息// 从Nacos的注册表中 获取实例Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);// 如果获取失败,说明心跳失败,实例尚未注册if (instance == null) {if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);return result;}
​Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);// 这里重新注册一个实例instance = new Instance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral());
​serviceManager.registerInstance(namespaceId, serviceName, instance);}// 尝试基于namespaceId和serviceName从 注册表中获取Service服务Service service = serviceManager.getService(namespaceId, serviceName);// 如果不存在,说明服务不存在,返回404if (service == null) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}if (clientBeat == null) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}// 如果心跳没问题,开始处理心跳结果service.processClientBeat(clientBeat);
​result.put(CommonParams.CODE, NamingResponseCode.OK);if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());}result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());return result;
}

最终,在确认心跳请求对应的服务、实例都在的情况下,开始交给Service类处理这次心跳请求。调用了Service的processClientBeat方法

3.2.2.处理心跳请求

查看Serviceservice.processClientBeat(clientBeat);方法:

public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();clientBeatProcessor.setService(this);clientBeatProcessor.setRsInfo(rsInfo);HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

可以看到心跳信息被封装到了 ClientBeatProcessor类中,交给了HealthCheckReactor处理,HealthCheckReactor就是对线程池的封装,不用过多查看。

关键的业务逻辑都在ClientBeatProcessor这个类中,它是一个Runnable,其中的run方法如下:

@Override
public void run() {Service service = this.service;if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());}
​String ip = rsInfo.getIp();String clusterName = rsInfo.getCluster();int port = rsInfo.getPort();// 获取集群信息Cluster cluster = service.getClusterMap().get(clusterName);// 获取集群中的所有实例信息List<Instance> instances = cluster.allIPs(true);
​for (Instance instance : instances) {// 找到心跳的这个实例if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());}// 更新实例的最后一次心跳时间 lastBeatinstance.setLastBeat(System.currentTimeMillis());if (!instance.isMarked()) {if (!instance.isHealthy()) {instance.setHealthy(true);Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",cluster.getService().getName(), ip, port, cluster.getName(),UtilsAndCommons.LOCALHOST_SITE);getPushService().serviceChanged(service);}}}}
}

处理心跳请求的核心就是更新心跳实例的最后一次心跳时间,lastBeat,这个会成为判断实例心跳是否过期的关键指标!

3.3.3.心跳异常检测

在服务注册时,一定会创建一个Service对象,而Service中有一个init方法,会在注册时被调用:

public void init() {// 开启心跳检测的任务HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);entry.getValue().init();}
}

其中HealthCheckReactor.scheduleCheck就是执行心跳检测的定时任务:

可以看到,该任务是5000ms执行一次,也就是5秒对实例的心跳状态做一次检测。

此处的ClientBeatCheckTask同样是一个Runnable,其中的run方法为:

@Override
public void run() {try {// 找到所有临时实例的列表List<Instance> instances = service.allIPs(true);
​// first set health status of instances:for (Instance instance : instances) {// 判断 心跳间隔(当前时间 - 最后一次心跳时间) 是否大于 心跳超时时间,默认15秒if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {// 如果超时,标记实例为不健康 healthy = falseinstance.setHealthy(false);// 发布实例状态变更的事件getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}
​if (!getGlobalConfig().isExpireInstance()) {return;}
​// then remove obsolete instances:for (Instance instance : instances) {
​if (instance.isMarked()) {continue;}// 判断心跳间隔(当前时间 - 最后一次心跳时间)是否大于 实例被删除的最长超时时间,默认30秒if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// 如果是超过了30秒,则删除实例Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));deleteIp(instance);}}
​} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}
​
}

其中的超时时间同样是在com.alibaba.nacos.api.common.Constants这个类中:

3.3.4.主动健康检测

对于非临时实例(ephemeral=false),Nacos会采用主动的健康检测,定时向实例发送请求,根据响应来判断实例健康状态。

入口在2.3.2小节的ServiceManager类中的registerInstance方法:

创建空服务时:

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {// 如果服务不存在,创建新的服务createServiceIfAbsent(namespaceId, serviceName, local, null);
}

创建服务流程:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)throws NacosException {// 尝试获取服务Service service = getService(namespaceId, serviceName);if (service == null) {// 发现服务不存在,开始创建新服务Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);service = new Service();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if (cluster != null) {cluster.setService(service);service.getClusterMap().put(cluster.getName(), cluster);}service.validate();// ** 写入注册表并初始化 **putServiceAndInit(service);if (!local) {addOrReplaceService(service);}}
}

关键在putServiceAndInit(service)方法中:

private void putServiceAndInit(Service service) throws NacosException {// 将服务写入注册表putService(service);service = getService(service.getNamespaceId(), service.getName());// 完成服务的初始化service.init();consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

进入初始化逻辑:service.init(),这个会进入Service类中:

/*** Init service.*/
public void init() {// 开启临时实例的心跳监测任务HealthCheckReactor.scheduleCheck(clientBeatCheckTask);// 遍历注册表中的集群for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);// 完成集群初识化entry.getValue().init();}
}

这里集群的初始化entry.getValue().init();会进入Cluster类型的init()方法:

/*** Init cluster.*/
public void init() {if (inited) {return;}// 创建健康检测的任务checkTask = new HealthCheckTask(this);// 这里会开启对 非临时实例的 定时健康检测HealthCheckReactor.scheduleCheck(checkTask);inited = true;
}

这里的HealthCheckReactor.scheduleCheck(checkTask);会开启定时任务,对非临时实例做健康检测。检测逻辑定义在HealthCheckTask这个类中,是一个Runnable,其中的run方法:

public void run() {
​try {if (distroMapper.responsible(cluster.getService().getName()) && switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {// 开始健康检测healthCheckProcessor.process(this);// 记录日志 。。。}} catch (Throwable e) {// 记录日志 。。。} finally {if (!cancelled) {// 结束后,再次进行任务调度,一定延迟后执行HealthCheckReactor.scheduleCheck(this);// 。。。}}
}

健康检测逻辑定义在healthCheckProcessor.process(this);方法中,在HealthCheckProcessor接口中,这个接口也有很多实现,默认是TcpSuperSenseProcessor

进入TcpSuperSenseProcessor的process方法:

@Override
public void process(HealthCheckTask task) {// 获取所有 非临时实例的 集合List<Instance> ips = task.getCluster().allIPs(false);
​if (CollectionUtils.isEmpty(ips)) {return;}
​for (Instance ip : ips) {// 封装健康检测信息到 BeatBeat beat = new Beat(ip, task);// 放入一个阻塞队列中taskQueue.add(beat);MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();}
}

可以看到,所有的健康检测任务都被放入一个阻塞队列,而不是立即执行了。这里又采用了异步执行的策略,可以看到Nacos中大量这样的设计。

TcpSuperSenseProcessor本身就是一个Runnable,在它的构造函数中会把自己放入线程池中去执行,其run方法如下:

public void run() {while (true) {try {// 处理任务processTask();// ...} catch (Throwable e) {SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);}}
}

通过processTask来处理健康检测的任务:

private void processTask() throws Exception {// 将任务封装为一个 TaskProcessor,并放入集合Collection<Callable<Void>> tasks = new LinkedList<>();do {Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);if (beat == null) {return;}
​tasks.add(new TaskProcessor(beat));} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);// 批量处理集合中的任务for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {f.get();}
}

任务被封装到了TaskProcessor中去执行了,TaskProcessor是一个Callable,其中的call方法:

@Override
public Void call() {// 获取检测任务已经等待的时长long waited = System.currentTimeMillis() - beat.getStartTime();if (waited > MAX_WAIT_TIME_MILLISECONDS) {Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");}SocketChannel channel = null;try {// 获取实例信息Instance instance = beat.getIp();// 通过NIO建立TCP连接channel = SocketChannel.open();channel.configureBlocking(false);// only by setting this can we make the socket close event asynchronouschannel.socket().setSoLinger(false, -1);channel.socket().setReuseAddress(true);channel.socket().setKeepAlive(true);channel.socket().setTcpNoDelay(true);
​Cluster cluster = beat.getTask().getCluster();int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();channel.connect(new InetSocketAddress(instance.getIp(), port));// 注册连接、读取事件SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);key.attach(beat);keyMap.put(beat.toString(), new BeatKey(key));
​beat.setStartTime(System.currentTimeMillis());
​GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);} catch (Exception e) {beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),"tcp:error:" + e.getMessage());
​if (channel != null) {try {channel.close();} catch (Exception ignore) {}}}
​return null;
}

3.3.总结

Nacos的健康检测有两种模式:

  • 临时实例:

    • 采用客户端心跳检测模式,心跳周期5秒

    • 心跳间隔超过15秒则标记为不健康

    • 心跳间隔超过30秒则从服务列表删除

  • 永久实例:

    • 采用服务端主动健康检测方式

    • 周期为2000 + 5000毫秒内的随机数

    • 检测异常只会标记为不健康,不会删除

那么为什么Nacos有临时和永久两种实例呢?

以淘宝为例,双十一大促期间,流量会比平常高出很多,此时服务肯定需要增加更多实例来应对高并发,而这些实例在双十一之后就无需继续使用了,采用临时实例比较合适。而对于服务的一些常备实例,则使用永久实例更合适。

与eureka相比,Nacos与Eureka在临时实例上都是基于心跳模式实现,差别不大,主要是心跳周期不同,eureka是30秒,Nacos是5秒。

另外,Nacos支持永久实例,而Eureka不支持,Eureka只提供了心跳模式的健康监测,而没有主动检测功能。

四、服务发现。

4.服务发现

Nacos提供了一个根据serviceId查询实例列表的接口:

接口描述:查询服务下的实例列表

请求类型:GET

请求路径

/nacos/v1/ns/instance/list

请求参数

名称类型是否必选描述
serviceName字符串服务名
groupName字符串分组名
namespaceId字符串命名空间ID
clusters字符串,多个集群用逗号分隔集群名称
healthyOnlyboolean否,默认为false是否只返回健康实例

错误编码

错误代码描述语义
400Bad Request客户端请求中的语法错误
403Forbidden没有权限
404Not Found无法找到资源
500Internal Server Error服务器内部错误
200OK正常

4.1.客户端

4.1.1.定时更新服务列表

4.1.1.1.NacosNamingService

在2.2.4小节中,我们讲到一个类NacosNamingService,这个类不仅仅提供了服务注册功能,同样提供了服务发现的功能。

多个重载的方法最终都会进入一个方法:

@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,boolean subscribe) throws NacosException {
​ServiceInfo serviceInfo;// 1.判断是否需要订阅服务信息(默认为 true)if (subscribe) {// 1.1.订阅服务信息serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));} else {// 1.2.直接去nacos拉取服务信息serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));}// 2.从服务信息中获取实例列表并返回List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<Instance>();}return list;
}
4.1.1.2.HostReactor

进入1.1.订阅服务消息,这里是由HostReactor类的getServiceInfo()方法来实现的:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
​NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());// 由 服务名@@集群名拼接 keyString key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// 读取本地服务列表的缓存,缓存是一个Map,格式:Map<String, ServiceInfo>ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 判断缓存是否存在if (null == serviceObj) {// 不存在,创建空ServiceInfoserviceObj = new ServiceInfo(serviceName, clusters);// 放入缓存serviceInfoMap.put(serviceObj.getKey(), serviceObj);// 放入待更新的服务列表(updatingMap)中updatingMap.put(serviceName, new Object());// 立即更新服务列表updateServiceNow(serviceName, clusters);// 从待更新列表中移除updatingMap.remove(serviceName);
​} else if (updatingMap.containsKey(serviceName)) {// 缓存中有,但是需要更新if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finish 等待5秒中,待更新完成synchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}// 开启定时更新服务列表的功能scheduleUpdateIfAbsent(serviceName, clusters);// 返回缓存中的服务信息return serviceInfoMap.get(serviceObj.getKey());
}

基本逻辑就是先从本地缓存读,根据结果来选择:

  • 如果本地缓存没有,立即去nacos读取,updateServiceNow(serviceName, clusters)

  • 如果本地缓存有,则开启定时更新功能,并返回缓存结果:

    • scheduleUpdateIfAbsent(serviceName, clusters)

    在UpdateTask中,最终还是调用updateService方法:

不管是立即更新服务列表,还是定时更新服务列表,最终都会执行HostReactor中的updateService()方法:

public void updateService(String serviceName, String clusters) throws NacosException {ServiceInfo oldService = getServiceInfo0(serviceName, clusters);try {// 基于ServerProxy发起远程调用,查询服务列表String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
​if (StringUtils.isNotEmpty(result)) {// 处理查询结果processServiceJson(result);}} finally {if (oldService != null) {synchronized (oldService) {oldService.notifyAll();}}}
}
4.1.1.3.ServerProxy

而ServerProxy的queryList方法如下:

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {// 准备请求参数final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put("clusters", clusters);params.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));// 发起请求,地址与API接口一致return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

4.1.2.处理服务变更通知

除了定时更新服务列表的功能外,Nacos还支持服务列表变更时的主动推送功能。

在HostReactor类的构造函数中,有非常重要的几个步骤:

基本思路是:

  • 通过PushReceiver监听服务端推送的变更数据

  • 解析数据后,通过NotifyCenter发布服务变更的事件

  • InstanceChangeNotifier监听变更事件,完成对服务列表的更新

4.1.2.1.PushReceiver

我们先看PushReceiver,这个类会以UDP方式接收Nacos服务端推送的服务变更数据。

先看构造函数:

public PushReceiver(HostReactor hostReactor) {try {this.hostReactor = hostReactor;// 创建 UDP客户端String udpPort = getPushReceiverUdpPort();if (StringUtils.isEmpty(udpPort)) {this.udpSocket = new DatagramSocket();} else {this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));}// 准备线程池this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.push.receiver");return thread;}});// 开启线程任务,准备接收变更数据this.executorService.execute(this);} catch (Exception e) {NAMING_LOGGER.error("[NA] init udp socket failed", e);}
}

PushReceiver构造函数中基于线程池来运行任务。这是因为PushReceiver本身也是一个Runnable,其中的run方法业务逻辑如下:

@Override
public void run() {while (!closed) {try {// byte[] is initialized with 0 full filled by defaultbyte[] buffer = new byte[UDP_MSS];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);// 接收推送数据udpSocket.receive(packet);// 解析为json字符串String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());// 反序列化为对象PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);String ack;if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {// 交给 HostReactor去处理hostReactor.processServiceJson(pushPacket.data);
​// send ack to server 发送ACK回执,略。。} catch (Exception e) {if (closed) {return;}NAMING_LOGGER.error("[NA] error while receiving push data", e);}}
}
4.1.2.2.HostReactor

通知数据的处理由交给了HostReactorprocessServiceJson方法:

public ServiceInfo processServiceJson(String json) {// 解析出ServiceInfo信息ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}// 查询缓存中的 ServiceInfoServiceInfo oldService = serviceInfoMap.get(serviceKey);
​// 如果缓存存在,则需要校验哪些数据要更新boolean changed = false;if (oldService != null) {// 拉取的数据是否已经过期if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "+ serviceInfo.getLastRefTime());}// 放入缓存serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例;// modHosts:需要修改的实例if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {// 发布实例变更的事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));DiskCache.write(serviceInfo, cacheDir);}
​} else {// 本地缓存不存在changed = true;// 放入缓存serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 直接发布实例变更的事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));serviceInfo.setJsonFromServer(json);DiskCache.write(serviceInfo, cacheDir);}// 。。。return serviceInfo;
}

4.2.服务端

4.2.1.拉取服务列表接口

在2.3.1小节介绍的InstanceController中,提供了拉取服务列表的接口:

/*** Get all instance of input service.** @param request http request* @return list of instance* @throws Exception any error during list*/
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {// 从request中获取namespaceId和serviceNameString namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);
​String agent = WebUtils.getUserAgent(request);String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);// 获取客户端的 UDP端口int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));String env = WebUtils.optional(request, "env", StringUtils.EMPTY);boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
​String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
​String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
​boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
​// 获取服务列表return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,healthyOnly);
}

进入doSrvIpxt()方法来获取服务列表:

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent,String clusters, String clientIP,int udpPort, String env, boolean isCheck,String app, String tid, boolean healthyOnly) throws Exception {ClientInfo clientInfo = new ClientInfo(agent);ObjectNode result = JacksonUtils.createEmptyJsonNode();// 获取服务列表信息Service service = serviceManager.getService(namespaceId, serviceName);long cacheMillis = switchDomain.getDefaultCacheMillis();
​// now try to enable the pushtry {if (udpPort > 0 && pushService.canEnablePush(agent)) {// 添加当前客户端 IP、UDP端口到 PushService 中pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);cacheMillis = switchDomain.getPushCacheMillis(serviceName);}} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);cacheMillis = switchDomain.getDefaultCacheMillis();}
​if (service == null) {// 如果没找到,返回空if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);}result.put("name", serviceName);result.put("clusters", clusters);result.put("cacheMillis", cacheMillis);result.replace("hosts", JacksonUtils.createEmptyArrayNode());return result;}// 结果的检测,异常实例的剔除等逻辑省略// 最终封装结果并返回 。。。
​result.replace("hosts", hosts);if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {result.put("dom", serviceName);} else {result.put("dom", NamingUtils.getServiceName(serviceName));}result.put("name", serviceName);result.put("cacheMillis", cacheMillis);result.put("lastRefTime", System.currentTimeMillis());result.put("checksum", service.getChecksum());result.put("useSpecifiedURL", false);result.put("clusters", clusters);result.put("env", env);result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));return result;
}

4.2.2.发布服务变更的UDP通知

在上一节中,InstanceController中的doSrvIpxt()方法中,有这样一行代码:

pushService.addClient(namespaceId, serviceName, clusters, agent,new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);

其实是把消费者的UDP端口、IP等信息封装为一个PushClient对象,存储PushService中。方便以后服务变更后推送消息。

PushService类本身实现了ApplicationListener接口:

这个是事件监听器接口,监听的是ServiceChangeEvent(服务变更事件)。

当服务列表变化时,就会通知我们:

4.3.总结

Nacos的服务发现分为两种模式:

  • 模式一:主动拉取模式,消费者定期主动从Nacos拉取服务列表并缓存起来,再服务调用时优先读取本地缓存中的服务列表。

  • 模式二:订阅模式,消费者订阅Nacos中的服务列表,并基于UDP协议来接收服务变更通知。当Nacos中的服务列表更新时,会发送UDP广播给所有订阅者。

与Eureka相比,Nacos的订阅模式服务状态更新更及时,消费者更容易及时发现服务列表的变化,剔除故障服务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/214320.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nginx负载均衡实战

&#x1f3b5;负载均衡组件 ngx_http_upstream_module https://nginx.org/en/docs/http/ngx_http_upstream_module.html upstream模块允许Nginx定义一组或多组节点服务器组&#xff0c;使用时可以通过多种方式去定义服务器组 样例&#xff1a; upstream backend {server back…

Python开源项目周排行 2023年 第39周

Python 趋势周报&#xff0c;按周浏览往期 GitHub,Gitee 等最热门的Python开源项目&#xff0c;入选的项目主要参考GitHub Trending,部分参考了Gitee和其他。排名不分先后&#xff0c;都是当周相对热门的项目。 入选公式&#xff1d;70%GitHub Trending20%Gitee10%其他 关注微…

C# WPF上位机开发(动态库dll的开发)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 很多时候&#xff0c;我们并不希望所有的程序都放到一个exe里面。因为这样相当于把所有的风险都放在了一个文件里里面&#xff0c;既不利于程序的升…

[CTFshow 红包挑战] 刷题记录

文章目录 红包挑战7红包挑战8红包挑战9 红包挑战7 考点&#xff1a;xdebug拓展 源码 <?php highlight_file(__FILE__); error_reporting(2);extract($_GET); ini_set($name,$value);system("ls ".filter($_GET[1])."" );function filter($cmd){$cmd s…

P2 Qt Creator创建第一个Qt程序

前言 &#x1f3ac; 个人主页&#xff1a;ChenPi &#x1f43b;推荐专栏1: 《C_ChenPi的博客-CSDN博客》✨✨✨ &#x1f525; 推荐专栏2: 《LLinux C应用编程&#xff08;概念类&#xff09;_ChenPi的博客-CSDN博客》✨✨✨ &#x1f33a;本篇简介 &#xff1a;这一章我们学…

简易加减运算器的制作----数字电路设计(含proteus仿真)

简易加减运算器的制作 一、功能要求—基本功能 1、自制0-9按键&#xff0c;在一个LED数码管上稳定地显示当前按下的值。&#xff08;基本功能&#xff09; 2、增加、两个按键&#xff0c;实现0-9两个一位数的加法运算&#xff0c;同时在两位LED上稳定地显示运算结果。&#…

C#大型LIS检验信息系统项目源码

LIS系统&#xff0c;一套医院检验科信息系统。它是以数据库为核心&#xff0c;将实验仪器与电脑连接成网&#xff0c;基础功能包括病人样本登录、实验数据存取、报告审核、打印分发等。除基础功能外&#xff0c;实验数据统计分析、质量控制管理、人员权限管理、试剂出入库等功能…

Java UDP 多人聊天室简易版

服务端 import java.io.*; import java.net.*; import java.util.ArrayList; public class Server{public static ServerSocket server_socket;public static ArrayList<Socket> socketListnew ArrayList<Socket>(); public static void main(String []args){try{…

实战演练 | 在 Navicat 中格式化日期和时间

Navicat 支持团队收到来自用户常问的一个问题是&#xff0c;如何将网格和表单视图中的日期和时间进行格式化。其实这个很简单。今天&#xff0c;我们将介绍在 Navicat Premium 中进行全局修改日期和时间格式的步骤。 如果你想边学边用&#xff0c;欢迎点击 这里 下载免费全功能…

EM32DX-C4【C#】站15

1外观: J301 直流 24V 电源输入 CAN0 CAN0 总线接口 CAN1 CAN1 总线接口 J201 IO 接线段子 S301-1、S301-2 输出口初始电平拨码设置 S301-3~S301-6 模块 CAN ID 站号拨码开关 S301-7 模块波特率拨码设置 S301-8 终端电阻选择开关 2DI: 公共端是: 0V【GND】 6100H …

基于ssm实验室开放管理系统论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本实验室开放管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信…

App备案、ios备案Bundle ID查询、公钥信息、SHA-1值

App备案、ios备案Bundle ID查询、公钥信息、SHA-1值 Bundle ID这个就不说了&#xff0c;都知道是啥&#xff0c;主要说公钥信息和SHA-1值的获取 打开钥匙串访问&#xff0c;找到当前需要备案App的dis证书&#xff0c;如下&#xff1a; #####右键点击显示简介 #####可以看…

Navicat 技术指引 | 适用于 GaussDB 分布式的调试器

Navicat Premium&#xff08;16.3.3 Windows 版或以上&#xff09;正式支持 GaussDB 分布式数据库。GaussDB 分布式模式更适合对系统可用性和数据处理能力要求较高的场景。Navicat 工具不仅提供可视化数据查看和编辑功能&#xff0c;还提供强大的高阶功能&#xff08;如模型、结…

[架构之路-261]:目标系统 - 设计方法 - 软件工程 - 软件设计 - 架构设计 - 网络数据交换格式

一、网络数据交换格式 1.1 什么是网络数据交换格式 网络数据交换格式指的是在计算机网络中传输和存储数据时所采用的特定格式。 它定义了数据的组织方式、结构和编码规则&#xff0c;以便不同系统和应用程序之间能够准确地解析和处理数据。 网络数据交换格式的主要目的是&a…

IDEA启动应用时报错:错误: 找不到或无法加载主类 @C:\Users\xxx\AppData\Local\Temp\idea_arg_filexxx

IDEA启动应用时报错&#xff0c;详细错误消息如下&#xff1a; C:\devel\jdk1.8.0_201\bin\java.exe -agentlib:jdwptransportdt_socket,address127.0.0.1:65267,suspendy,servern -XX:TieredStopAtLevel1 -noverify -Dspring.output.ansi.enabledalways -Dcom.sun.management…

安装python第三方库后,在pycharm中不能正常导入

python小白学习opencv&#xff0c;使用pip安装完opencv库后import cv2报错&#xff0c;按照如下设置解决&#xff1a; 需要正确设置python解释器路径

Linux权限理解

文章目录 前言概述Linux下的权限Linux权限管理文件访问者的分类&#xff1a;属性&#xff1a;文件权限值表示方法&#xff1a; 文件类型&#xff1a; 权限的修改chmod对 text.txt 文件的权限进行修改法1&#xff1a;法2&#xff1a; chownchgrpumaskfile指令目录权限粘滞位 前言…

SLAM教程:ROS学习

玩SLAM一定会遇到ROS,你可以看看稚晖君里的机器人操作系统,许多控制机器的软件代码很大程度都是基于ROS,多传感融合也是基于ROS,在GitHub上几乎大部分的多传感融合以及机器人操作代码框架都是基于ROS。ROS 的主要目标是为机器人研究和开发提供代码复用的支持。ROS是一个分布…

【附源码】完整版,Python+Selenium+Pytest+POM自动化测试框架封装

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、测试框架简介 …

Hadoop学习笔记(HDP)-Part.08 部署Ambari集群

目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger …