文章目录
- 1 Nacos原理
- 1.1 Nacos架构
- 1.2 注册中心原理
- 1.3 SpringCloud服务注册
- 1.4 NacosServiceRegistry实现
- 1.4.1 心跳机制
- 1.4.2 注册原理
- 1.4.3 总结
- 1.5 服务提供者地址查询
- 1.6 Nacos服务地址动态感知原理
1 Nacos原理
1.1 Nacos架构
Provider APP
:服务提供者Consumer APP
:服务消费者Name Server
:通过VIP
(Virtual IP
)或DNS
的方式实现Nacos
高可用集群的服务路由Nacos Server
:Nacos
服务提供者,里面包含的Open API
是功能访问入口,Conig Service
、Naming Service
是Nacos
提供的配置服务、命名服务模块。Consitency Protocol
是一致性协议,用来实现Nacos
集群节点的数据同步,这里使用的是Raft
算法(Etcd、Redis哨兵选举)Nacos Console
:控制台
1.2 注册中心原理
注册中心原理:
- 服务实例在启动时注册到服务注册表,并在关闭时注销
- 服务消费者查询服务注册表,获得可用实例
- 服务注册中心需要调用服务实例的健康检查
API
来验证它是否能够处理请求
1.3 SpringCloud服务注册
在Spring-Cloud-Common
包中有一个类org.springframework.cloud. client.serviceregistry .ServiceRegistry
,它是Spring Cloud
提供的服务注册的标准。集成到Spring Cloud
中实现服务注册的组件,都会实现该接口。
该接口有一个实现类是 NacoServiceRegistry
。
SpringCloud
集成Nacos
的实现过程:
在spring-clou-commons
包的META-INF/spring.factories
中包含自动装配的配置信息如下:
其中AutoServiceRegistrationAutoConfiguration
就是服务注册相关的配置类:
@Configuration(proxyBeanMethods = false)
@Import(AutoServiceRegistrationConfiguration.class)
@ConditionalOnProperty(value ="spring.cloud.service-registry.auto-registration.enabled",matchIfMissing = true)
public class AutoServiceRegistrationAutoConfiguration{@Autowired(required = false)private AutoServiceRegistration autoServiceRegistration;@Autowiredprivate AutoServiceRegistrationProperties properties;@PostConstructprotected void init() {if (this.autoServiceRegistration == null && this.properties.isFailFast()) {throw new IllegalStateException("Auto Service Registration has been requested,but there is no AutoServiceRegistration bean");
}}}
在AutoServiceRegistrationAutoConfiguration
配置类中,可以看到注入了一个AutoServiceRegistration
实例,该类的关系图如下所示。
可以看出, AbstractAutoServiceRegistration
抽象类实现了该接口,并且最重要的是NacosAutoServiceRegistration
继承了AbstractAutoServiceRegistration
。
看到EventListener
我们就应该知道,Nacos
是通过Spring
的事件机制集成到SpringCloud
中去的。
AbstractAutoServiceRegistration
实现了onApplicationEvent
抽象方法,并且监听WebServerInitializedEvent
事件(当Webserver
初始化完成之后) , 调用this.bind ( event )
方法。
@Override
public void onApplicationEvent(WebServerInitializedEvent event) {bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {ApplicationContext context = event.getApplicationContext();if (context instanceof ConfigurableWebServerApplicationContext){if ("management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace )))return;}}this.port.compareAndSet( 0,event.getWebServer() .getPort());this.start();
}
最终会调用NacosServiceREgistry.register()
方法进行服务注册。
public void start() {if (!isEnabled()) {if (logger.isDebugEnabled()) [logger.debug("Discovery Lifecycle disabled. Not starting");}return;}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer belowif (!this.running.get()){this.context.publishEvent(new InstancePreRegisteredEvent(this,getRegistration()));register();if (shouldRegisterManagement()){registerManagement();}this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));this.running.compareAndSet(false,true);}
}protected void register(){this.serviceRegistry.register(getRegistration());
}
1.4 NacosServiceRegistry实现
在NacosServiceRegistry.registry
方法中,调用了Nacos Client SDK
中的namingService.registerInstance
完成服务的注册。
@Override
public void register(Registration registration){if (StringUtils.isEmpty(registration.getServiceId())) {log.warn("No service to register for nacos client...");return;}String serviceId = registration.getServiceId();Instance instance = getNacosInstanceFromRegistration(registration):try{namingService.registerInstance(serviceId,instance);log.info("nacos registry,{} {} : {}register finished", serviceId,instance.getIp(),instance.getPort());}catch (Exception e) {log.error("nacos registry, {} register failed... {},",serviceId,registration.toString(),e);}
}
跟踪NacosNamingService
的registerInstance()
方法:
@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {registerInstance(serviceName,Constants.DEFAULT_GROUP,instance);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {if (instance.isEphemeralO){BeatInfo beatInfo = new BeatInfo();beatInfo. setServiceName(NamingUtils.getGroupedName(serviceName, groupName));beatInfo.setIp(instance.getIp());beatInfo.setPort(instance.getPort());beatInfo.setCluster(instance.getClusterName());beatInfo.setWeight(instance.getWeight());beatInfo.setMetadata(instance .getMetadata());beatInfo.setScheduled(false);long instanceInterval = instance.getInstanceHeartBeatInterval();beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);}
serverProxy.registerService(Namingutils.getGroupedName(serviceName, groupName), groupName, instance);
}
通过beatReactor.addBeatInfo()
创建心跳信息实现健康检测, Nacos Server
必须要确保注册的服务实例是健康的,而心跳检测就是服务健康检测的手段。
serverProxy.registerService()
实现服务注册
1.4.1 心跳机制
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
从上述代码看,所谓心跳机制就是客户端通过schedule
定时向服务端发送一个数据包 ,然后启动一个线程不断检测服务端的回应,如果在设定时间内没有收到服务端的回应,则认为服务器出现了故障。Nacos
服务端会根据客户端的心跳包不断更新服务的状态。
1.4.2 注册原理
Nacos
提供了SDK
和Open API
两种形式来实现服务注册。
Open API:
curl -X POST ‘http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=192.16813.1&port=8080’
SDK:
void registerInstance(String serviceName, String ip, int port) throws NacosException;
这两种形式本质都一样,底层都是基于HTTP
协议完成请求的。所以注册服务就是发送一个HTTP请求:
public void registerService(String serviceName,
String groupName,Instance instance) throws NacosException {NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",namespaceId,serviceName,instance);final Map<String,String> params = new HashMap<>(9);params.put(CommonParams.NAMESPACE_ID,namespaceId);params.put(CommonParams.SERVICE_NAME,serviceName);params.put(CommonParams.GROUP_NAME,groupName);params.put(CommonParams.CLUSTERNAME,instance.getClusterName());params.put("ip",instance.getIp());params .put("port",String. valueOf(instance.getPort()));params.put("weight",String.valueOf(instance.getWeight()));params.put("enable",String.valueOf(instance.isEnabled()));params.put("healthy",String.valueOf(instance.isHealthy()));params.put("ephemeral",String.valueOf(instance.isEphemeral()));params.put("metadata",JSON.toJSONString(instance.getMetadata()));regAPI(UtilAndComs .NACOS_URL_INSTANCE,params,HttpMethod.POSD);
}
对于nacos
服务端,对外提供的服务接口请求地址为nacos/v1/ns/instance
,实现代码在nacos-naming
模块下的InstanceController
类中:
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT+"/instance")
public class InstanceController{
//省略部分代码
@CanDistro
@PostMapping
public String register(HttpServletRequest request) throws Exception {String serviceName = WebUtils.required(request,CommonParams.SERVICENAME);String namespaceId = WebUtils.optional(request,CommonParams,NAMESPACE_ID,Constants.DEFAULT_NAMESPACE_ID);serviceManager.registerInstance(namespaceId,serviceName,parseInstance(request));return"ok";}
//省略部分代码
}
-
从请求参数汇总获得
serviceName
(服务名)和namespaceId
(命名空间Id) -
调用
registerInstance
注册实例
public void registerInstance(String namespaceld, String serviceName, Instance instance)throws NacosException{createEmptyService(namespaceId,serviceNameinstance.isEphemeral());Service service=getService(namespaceId,serviceName);if (service== null){throw new NacosException(NacosException.INVALID_PARAM,"service not found,namespace:"+namespaceId +",service:"+serviceName);}addInstance(namespaceId,serviceName,instance.isEphemeral(),instance);
}
- 创建一个控服务(在
Nacos
控制台服务列表
中展示的服务信息),实际上是初始化一个serviceMap
,它是一个ConcurrentHashMap
集合 getService
,从serviceMap
中根据namespaceId
和serviceName
得到一个服务对象- 调用
addInstance
添加服务实例
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local,Cluster cluster) throws NacosException {Service service = getService(namespaceId,serviceName);if(service== null){service= new Service();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));service.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if(cluster != null){cluster.setService(service);service.getClusterMap().put(cluster.getName(),cluster);}service.validate();putServiceAndInit(service);if(!local){addOrReplaceService(service);}}
}
- 根据
namespaceId
、serviceName
从缓存中获取Service
实例 - 如果
Service
实例为空,则创建并保存到缓存中
private void putServiceAndInit(Service service) throws NacosException{putService(service);service.init();consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),true),service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),false),service);Loggers.SRV_LOG.info("[NEW-SERVICE]{}",service.toJSON());
}
- 通过
putService()
方法将服务缓存到内存 service.init()
建立心跳机制consistencyService.listen
实现数据一致性监听
service.init()
方法的如下图所示,它主要通过定时任务不断检测当前服务下所有实例最后发送心跳包的时间。如果超时,则设置healthy
为false
表示服务不健康,并且发送服务变更事件。
在这里请大家思考一一个问题,服务实例的最后心跳包更新时间是谁来触发的?实际上前面有讲到, Nacos
客户端注册服务的同时也建立了心跳机制。
putService
方法,它的功能是将Service
保存到serviceMap
中:
public void putService(Service service)(if(!serviceMap.containsKey(service.getNamespaceId())){synchronized (putServiceLock){if(!serviceMap.containsKey(service,getNamespaceId())){serviceMap.put(service.getNamespaceId(),new ConcurrentHashMap<>(16));}}}serviceMap.get(service.getNamespaceId()).put(service.getName(),service);
}
继续调用addInstance
方法把当前注册的服务实例保存到Service中:
addInstance(namespaceId,serviceName,instance.isEphemeral(),instance)
1.4.3 总结
Nacos
客户端通过Open API
的形式发送服务注册请求Nacos
服务端收到请求后,做以下三件事:- 构建一个
Service
对象保存到ConcurrentHashMap
集合中 - 使用定时任务对当前服务下的所有实例建立心跳检测机制
- 基于数据一致性协议服务数据进行同步
- 构建一个
1.5 服务提供者地址查询
Open API:
curl -X GET127.00.1:8848/nacos/v1/ns/instance/list?serviceName=example
SDK:
List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException;
InstanceController
中的list
方法:
@GetMapping("/list")
public JSONObject list(HttpServletRequest request) throws Exception {String namespaceId = WebUtils.optional(request,CommonParams,NAMESPACE_ID,Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request,CommonParams.SERVICE_NAME);String agent =WebUtils.getUserAgent(request);String clusters = WebUtils.optional(request,"clusters",StringUtils.EMPTY);String clientIP = WebUtils.optional(request,"clientIp", StringUtils.EMPTY);Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort","0"));String env= WebUtils.optional(request,"env",StringUtils.EMPTY);boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request,"isCheck","false"));String app= WebUtils.optional(request,"app",StringUtils.EMPTY);String tenant = WebUtils.optional(request,"tid",StringUtils.EMPTY);boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request,"healthyOnly""false"));return doSrvIPXT(namespaceld, serviceName, agent, clusters, clientIP, udpPort, env,isCheck,app,tenant,healthyOnly);
}
- 解析请求参数
- 通过
doSrvIPXT
返回服务列表数据
public JSONObject doSrvIPXT(String namespaceld, String serviceName, String agent, String clusters,String clientIP,int udpPort,String env,boolean isCheck,String app,String tid,boolean healthyonly)
throws Exception {//以下代码中移除了很多非核心代码ClientInfo clientInfo = new ClientInfo(agent);JSONObject result=new JSONObject();Service service= serviceManager.getService(namespaceId,serviceName);List<Instance> srvedIPs;//获取指定服务下的所有实例 IPsrvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters,",")));Map<Boolean,List<Instance>>ipMap =new HashMap<>(2);ipMap.put(Boolean.TRUE,new ArrayList<>());ipMap.put(Boolean.FALSE,new ArrayList<>());for (Instance ip : srvedIPs){ipMap.get(ip.isHealthy()).add(ip);}//遍历,完成JSON字管中的纠装JSONArray hosts = new JSONArray();for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {List<Instance> ips = entry.getValue();if (healthyOnly && !entry.getKey()){continue;}for (Instance instance :ips) {if (!instanceisEnabled()) {continue;}JSONObject ipobj=new JSONObject();ipobj.put("ip",instance.getIp());ipObj.put("port",instance.getPort());ipObj.put("valid",entry.getKey());ipObj.put("healthy",entry.getKey());ipObj.put("marked",instance.isMarked());ipObj.put("instanceId",instance.getInstanceId());ipObj.put("metadata",instance.getMetadata());ipObj.put("enabled",instance.isEnabled());ipObj.put("weight",instance.getweight());ipObj.put("clusterName",instance.getClusterName());if(clientInfo.type== ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0."))>=0){ipObj.put("serviceName",instance.getServiceName());}else{ipObj.put("serviceName",NamingUtils.getServiceName(instance.getServiceName()));}ipObj.put("ephemeral",instance.isEphemeral());hosts.add(ipobj);}}result.put("hosts",hosts);result.put("name",serviceName);result.put("cacheMillis",cacheMillis);result.put("lastRefTime",System.currentTimeMillis());result.put("checksum",service.getChecksum());result.put("useSpecifiedURL",false);result.put("clusters",clusters);result.put("env",env);result.put("metadata",service.getMetadata());return result;
}
- 根据
namespaceId
、serviceName
获得Service实例 - 从
Service
实例中基于srvIPs
得到所有服务提供者实例 - 遍历组装
JSON
字符串并返回
1.6 Nacos服务地址动态感知原理
可以通过subscribe
方法来实现监听,其中serviceName
表示服务名、EventListener
表示监听到的事件:
void subscribe(String serviceName, EventListener listener) throws NacosException;
具体调用方式如下:
NamingService naming = NamingFactory.createNamingService(System.getProperty("serveAddr"));
naming.subscribe("example",event->(if (event instanceof NamingEvent) {System.out.println(((NamingEvent) event).getServceName());System.out.printIn(((NamingEvent) event).getInstances());}
});
或者调用selectInstance
方法,如果将subscribe
属性设置为true
,会自动注册监听:
public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy,boolean subscribe)
Nacos
客户端中有一个HostReactor
类,它的功能是实现服务的动态更新,基本原理是:
- 客户端发起时间订阅后,在
HostReactor
中有一个UpdateTask
线程,每10s
发送一次Pull
请求,获得服务端最新的地址列表 - 对于服务端,它和服务提供者的实例之间维持了心跳检测,一旦服务提供者出现异常,则会发送一个
Push
消息给Nacos
客户端,也就是服务端消费者 - 服务消费者收到请求之后,使用
HostReactor
中提供的processServiceJSON
解析消息,并更新本地服务地址列表