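Nacos 2.0 Architecture Design and New Model
Reference: https://zhuanlan.zhihu.com/p/344572647
Flow chart: registering an ephemeral instance over gRPC
Spring Boot auto-configuration
Injecting the service-registration beans
Listening for the Tomcat startup event
NacosAutoServiceRegistration extends AbstractAutoServiceRegistration, which implements the ApplicationListener interface. Once Tomcat has started, a WebServerInitializedEvent is published; AbstractAutoServiceRegistration listens for that event and carries out the subsequent registration.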
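The listener chain described above can be illustrated with a plain-Java sketch. It deliberately does not depend on Spring: StartupEvent, StartupListener, AutoRegistrationSketch and EventBusSketch are hypothetical stand-ins for WebServerInitializedEvent, ApplicationListener, AbstractAutoServiceRegistration and the Spring event publisher, and only show the idea that registration is deferred until the startup event arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spring's WebServerInitializedEvent.
final class StartupEvent {
    final int port;

    StartupEvent(int port) {
        this.port = port;
    }
}

// Hypothetical stand-in for ApplicationListener<WebServerInitializedEvent>.
interface StartupListener {
    void onStartup(StartupEvent event);
}

// Plays the role of AbstractAutoServiceRegistration: it registers only after the event fires.
class AutoRegistrationSketch implements StartupListener {
    final List<String> registered = new ArrayList<>();
    private final String serviceName;

    AutoRegistrationSketch(String serviceName) {
        this.serviceName = serviceName;
    }

    @Override
    public void onStartup(StartupEvent event) {
        // Registration is deferred until the startup event arrives.
        registered.add(serviceName + ":" + event.port);
    }
}

// Minimal event publisher standing in for the Spring application context.
class EventBusSketch {
    private final List<StartupListener> listeners = new ArrayList<>();

    void addListener(StartupListener listener) {
        listeners.add(listener);
    }

    void publish(StartupEvent event) {
        for (StartupListener listener : listeners) {
            listener.onStartup(event);
        }
    }
}
```

Nothing is registered when the listener is created; the registration entry appears only after publish is called with a StartupEvent.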
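Starting service registration
The constructed Instance
Ephemeral instances register over gRPC
Here the client checks whether the instance is ephemeral; the ephemeral flag defaults to true. Ephemeral instances go through the register method of NamingGrpcClientProxy, which uses gRPC; in Nacos 2.x, ephemeral instances communicate over this protocol by default. Persistent instances communicate over HTTP requests.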
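The protocol choice above boils down to a delegate that picks a proxy based on the ephemeral flag. The sketch below is a simplified illustration under that assumption: NamingProxySketch, GrpcProxySketch, HttpProxySketch and DelegateProxySketch are hypothetical names, and the real NamingGrpcClientProxy and NamingHttpClientProxy perform network calls rather than returning strings.

```java
// Hypothetical, simplified proxy contract; the real proxies perform network I/O.
interface NamingProxySketch {
    String register(String serviceName, boolean ephemeral);
}

// Stands in for NamingGrpcClientProxy.
class GrpcProxySketch implements NamingProxySketch {
    @Override
    public String register(String serviceName, boolean ephemeral) {
        return "grpc:" + serviceName;
    }
}

// Stands in for NamingHttpClientProxy.
class HttpProxySketch implements NamingProxySketch {
    @Override
    public String register(String serviceName, boolean ephemeral) {
        return "http:" + serviceName;
    }
}

// Chooses the transport from the ephemeral flag, as the text describes.
class DelegateProxySketch implements NamingProxySketch {
    private final NamingProxySketch grpc = new GrpcProxySketch();
    private final NamingProxySketch http = new HttpProxySketch();

    @Override
    public String register(String serviceName, boolean ephemeral) {
        // Ephemeral instances use gRPC; persistent instances use HTTP.
        NamingProxySketch target = ephemeral ? grpc : http;
        return target.register(serviceName, ephemeral);
    }
}
```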
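Caching the instance information
Registering the service information
Sending the registration request
Marking the service as registered
Server side: receiving the registration request
InstanceRequestHandler extends RequestHandler, so when RequestHandler dispatches a request, InstanceRequestHandler's handle method runs the processing logic.
clientOperationService is a proxy that covers both ephemeral and persistent instances and delegates to the registration behavior appropriate for each kind.
The Client model
Nacos 2.x introduces the Client model. Each client gRPC long-lived connection corresponds to one Client, and every Client has its own unique id (clientId). A Client manages the service instances that one client registers (publish) and the services it subscribes to (subscribe). The model itself is simply an interface: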
public interface Client {

    // Client id / gRPC connectionId
    String getClientId();

    // Whether this is an ephemeral client
    boolean isEphemeral();

    // Client last-updated time
    void setLastUpdatedTime();

    long getLastUpdatedTime();

    // Register / deregister / query service instances
    boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);

    InstancePublishInfo removeServiceInstance(Service service);

    InstancePublishInfo getInstancePublishInfo(Service service);

    Collection<Service> getAllPublishedService();

    // Subscribe / unsubscribe / query subscriptions
    boolean addServiceSubscriber(Service service, Subscriber subscriber);

    boolean removeServiceSubscriber(Service service);

    Subscriber getSubscriber(Service service);

    Collection<Service> getAllSubscribeService();

    // Generate the client data that is synchronized to other nodes
    ClientSyncData generateSyncData();

    // Whether the client has expired
    boolean isExpire(long currentTime);

    // Release resources
    void release();
}
Creating the Service and Instance information
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
    // Cache the service, and the relationship between namespace and service
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (!singleton.isEphemeral()) {
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                String.format("Current service %s is persistent service, can't register ephemeral instance.",
                        singleton.getGroupedServiceName()));
    }
    // The connection id serves as the client id; look up the client
    Client client = clientManager.getClient(clientId);
    // Check that the client is legal: it exists and it is ephemeral
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // Build the instance information stored on the server and record it in the Client
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    // Add the service-to-instance relationship to the client
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();
    // Publish the register-service event; see "service change event handling" below
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    // Publish the metadata event; see "metadata management source" below
    NotifyCenter
            .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
ServiceManager
The container for Service objects is ServiceManager; note that this is the one in the com.alibaba.nacos.naming.core.v2 package. Every Service in the container is a singleton.
public class ServiceManager {

    private static final ServiceManager INSTANCE = new ServiceManager();

    // Singleton Services; see the equals and hashCode methods of Service
    private final ConcurrentHashMap<Service, Service> singletonRepository;

    // All services under each namespace
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    .....
}
So when the registration method is called, ServiceManager is responsible for managing the Service singletons.
// Singleton Services are stored in a Map
public Service getSingleton(Service service) {
    singletonRepository.putIfAbsent(service, service);
    Service result = singletonRepository.get(service);
    namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
    namespaceSingletonMaps.get(result.getNamespace()).add(result);
    return result;
}
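The singleton lookup above depends on equals and hashCode: two Service objects that compare equal resolve to the same stored instance. The following sketch shows that mechanism in isolation. ServiceKeySketch and SingletonRegistrySketch are hypothetical names, and the choice of namespace, group and name as the identity fields is an assumption made for illustration.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical value type; identity is assumed to be (namespace, group, name).
final class ServiceKeySketch {
    final String namespace;
    final String group;
    final String name;

    ServiceKeySketch(String namespace, String group, String name) {
        this.namespace = namespace;
        this.group = group;
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ServiceKeySketch)) {
            return false;
        }
        ServiceKeySketch other = (ServiceKeySketch) o;
        return namespace.equals(other.namespace) && group.equals(other.group) && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(namespace, group, name);
    }
}

// Stores each distinct service once and always hands back the first stored object.
class SingletonRegistrySketch {
    private final ConcurrentHashMap<ServiceKeySketch, ServiceKeySketch> repository = new ConcurrentHashMap<>();

    ServiceKeySketch getSingleton(ServiceKeySketch service) {
        // putIfAbsent returns the previously stored value, or null if this call stored it.
        ServiceKeySketch existing = repository.putIfAbsent(service, service);
        return existing != null ? existing : service;
    }
}
```

Using the map's key as its own value is what turns a ConcurrentHashMap into an interning table: callers may construct as many equal objects as they like, but everything downstream works with a single canonical one.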
ClientManager
ClientManager is an interface; the implementation to look at here is ConnectionBasedClientManager, which manages the mapping between a long-lived connection's clientId and its Client model.
// Look up a Client by clientId
public Client getClient(String clientId) {
    return clients.get(clientId);
}
Adding the service instance information
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
}
The Client implementation AbstractClient stores the current client's service registry, that is, the mapping between Service and Instance. Note that for a single client, only one instance can be registered per service.
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    // Cache the service-to-instance mapping in the client's container; one client corresponds to one connection
    if (null == publishers.put(service, instancePublishInfo)) {
        MetricsMonitor.incrementInstanceCount();
    }
    NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
    Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
    return true;
}
As the figure below shows, I started two instances of the same service on different ports. Each one has a different client id, so the publishers map inside Client holds one instance per service.
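The one-instance-per-service rule follows directly from publishers being a map keyed by service. A minimal sketch of that behavior is below; ClientSketch is a hypothetical class that uses plain strings for the service and instance, and unlike the real addServiceInstance (which always returns true) it returns whether the service was newly published, purely to make the effect visible.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified client: one connection, one publishers map.
class ClientSketch {
    final String clientId;
    // Keyed by service, so a second registration for the same service replaces the first.
    private final Map<String, String> publishers = new ConcurrentHashMap<>();

    ClientSketch(String clientId) {
        this.clientId = clientId;
    }

    // Returns true when the service had no instance on this client before (sketch-only return value).
    boolean addServiceInstance(String service, String instanceAddress) {
        return publishers.put(service, instanceAddress) == null;
    }

    String getInstance(String service) {
        return publishers.get(service);
    }

    int publishedCount() {
        return publishers.size();
    }
}
```

Two processes of the same service therefore appear as two clients with one entry each, while a repeated registration on one connection overwrites that connection's entry.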
Registering persistent instances
In Nacos 2.0, a persistent instance is registered with an HTTP request through NamingHttpClientProxy.
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // Assemble the request parameters
    final Map<String, String> params = new HashMap<String, String>(32);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, groupedServiceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put(IP_PARAM, instance.getIp());
    params.put(PORT_PARAM, String.valueOf(instance.getPort()));
    params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
    params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
    params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
    params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
    params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));

    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
    return reqApi(api, params, Collections.EMPTY_MAP, method);
}

public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
        throws NacosException {
    return reqApi(api, params, body, serverListManager.getServerList(), method);
}
Sending the HTTP request
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
        String method) throws NacosException {

    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();

    if (serverListManager.isDomain()) {
        // Single domain name: retry the same address up to maxRetry times
        String nacosDomain = serverListManager.getNacosDomain();
        for (int i = 0; i < maxRetry; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    } else {
        // Server list: start at a random server and try each one in turn
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());

        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index + 1) % servers.size();
        }
    }

    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
            exception.getErrMsg());

    throw new NacosException(exception.getErrCode(),
            "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
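The server-list branch of reqApi is a round-robin failover that starts at a random position. The sketch below isolates that loop so it can be run without a Nacos server. FailoverSketch and callWithFailover are hypothetical names, the call is modeled as a plain Function, and the overload that takes an explicit start index exists only to make the behavior deterministic for demonstration.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Hypothetical helper that mirrors the "random start, then try each server once" loop.
class FailoverSketch {

    // Tries every server exactly once, starting at startIndex and wrapping around.
    static <T> T callWithFailover(List<String> servers, int startIndex, Function<String, T> call) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = Math.floorMod(startIndex, servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next server.
                last = e;
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }

    // Same loop with a random starting position, which spreads load across servers.
    static <T> T callWithFailover(List<String> servers, Function<String, T> call) {
        int start = servers.isEmpty() ? 0 : new Random().nextInt(servers.size());
        return callWithFailover(servers, start, call);
    }
}
```

Starting at a random index means that many clients sharing the same server list do not all hit the first entry, while the wrap-around still guarantees each server is tried once before giving up.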
Server side: receiving the HTTP request and registering
According to the official documentation, the server handles HTTP registration at the following API address (as a POST request):
http://ip:port/nacos/v1/ns/instance
InstanceController
/**
 * Register new instance.
 *
 * @param request http request
 * @return 'ok' if success
 * @throws Exception any error during register
 */
@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // Read the namespace from the request; fall back to the default namespace if it is absent
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Read the service name from the request; it is a required parameter
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // Check that the service name is well formed
    NamingUtils.checkServiceNameFormat(serviceName);
    // Build an Instance object from the request
    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
    // Dispatch according to gRPC support; when gRPC features are enabled, InstanceOperatorClientImpl is used
    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
Check whether gRPC is supported: if it is, the V2 implementation is used; otherwise V1.
private InstanceOperator getInstanceOperator() {
    return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}
Registration via the V1 path
First, the V1 path: registerInstance of InstanceOperatorServiceImpl is called to perform the registration.
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create the service if it does not exist, along with the service-to-cluster relationship.
    // Initializing the service also creates the server-side heartbeat check task.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Fetch the service again
    Service service = getService(namespaceId, serviceName);
    // Verify again that the service exists
    checkServiceIsNull(service, namespaceId, serviceName);
    // Add the instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
Creating the service information
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // Look up the service in the cache
    Service service = getService(namespaceId, serviceName);
    if (service == null) {

        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        // Initialize the service
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            // Link the service and the cluster
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        // Validate the service name and other fields
        service.validate();
        // Initialize the service and create the heartbeat check task
        putServiceAndInit(service);
        if (!local) {
            // Not local-only: hand the service to the consistency layer
            addOrReplaceService(service);
        }
    }
}
The core logic of putServiceAndInit is to put the service into the cache and call its init method, which starts the server-side heartbeat check task. That task detects changes in the instances under the service and notifies clients when something changes.
public void init() {
    // Start the heartbeat check task for this service
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
Adding the instance information
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // Build the key
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // Look up the service in the cache
    Service service = getService(namespaceId, serviceName);
    // Lock on the service
    synchronized (service) {
        // This is the core step: the in-memory registration is completed here,
        // on top of the namespace -> service -> cluster -> instance data model
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // This is where the Distro protocol takes effect: the call goes through Distro's put logic,
        // which also keeps the instance data in memory and schedules a delayed asynchronous
        // sync task to replicate the data
        consistencyService.put(key, instances);
    }
}
For addIpAddresses, the core job is to build these associations. It delegates to updateIpAddresses with the add action:
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {

    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

    List<Instance> currentIPs = service.allIPs(ephemeral);
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    Set<String> currentInstanceIds = CollectionUtils.set();

    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }

    Map<String, Instance> instanceMap;
    if (datum != null && null != datum.value) {
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        instanceMap = new HashMap<>(ips.length);
    }

    for (Instance instance : ips) {
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }

        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }
            instanceMap.put(instance.getDatumKey(), instance);
        }
    }

    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(instanceMap.values()));
    }

    return new ArrayList<>(instanceMap.values());
}
When this method returns, the storage hierarchy namespace -> service -> cluster -> instance has been established.
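The namespace -> service -> cluster -> instance hierarchy can be pictured as nested maps. The sketch below is only a mental model under that assumption: RegistryTreeSketch is a hypothetical class, instances are reduced to "ip:port" strings, and the real V1 model uses dedicated Service, Cluster and Instance classes rather than raw maps.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical nested-map model of namespace -> service -> cluster -> instances.
class RegistryTreeSketch {
    private final Map<String, Map<String, Map<String, Set<String>>>> namespaces = new ConcurrentHashMap<>();

    // Creates any missing level on the way down, then records the instance address.
    void addInstance(String namespace, String service, String cluster, String ipPort) {
        namespaces
                .computeIfAbsent(namespace, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(service, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(cluster, k -> ConcurrentHashMap.newKeySet())
                .add(ipPort);
    }

    // Returns an immutable snapshot of the instances, or an empty set if any level is missing.
    Set<String> instances(String namespace, String service, String cluster) {
        Map<String, Map<String, Set<String>>> services = namespaces.get(namespace);
        if (services == null) {
            return Set.of();
        }
        Map<String, Set<String>> clusters = services.get(service);
        if (clusters == null) {
            return Set.of();
        }
        Set<String> found = clusters.get(cluster);
        return found == null ? Set.of() : Set.copyOf(found);
    }
}
```

Creating a missing cluster on demand in addInstance mirrors the "cluster not found, will create new cluster" branch in updateIpAddresses.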
Registration via the V2 path
InstanceOperatorClientImpl
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
    // Whether this is an ephemeral instance
    boolean ephemeral = instance.isEphemeral();
    // Derive the client id
    String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
    createIpPortClientIfAbsent(clientId);
    // Get the service
    Service service = getService(namespaceId, serviceName, ephemeral);
    // Register the instance
    clientOperationService.registerInstance(service, instance, clientId);
}
Persistent instances are registered through PersistentClientOperationServiceImpl, which uses the JRaft protocol to write the data into the Raft cluster.
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (singleton.isEphemeral()) {
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                String.format("Current service %s is ephemeral service, can't register persistent instance.",
                        singleton.getGroupedServiceName()));
    }
    final InstanceStoreRequest request = new InstanceStoreRequest();
    request.setService(service);
    request.setInstance(instance);
    request.setClientId(clientId);
    final WriteRequest writeRequest = WriteRequest.newBuilder().setGroup(group())
            .setData(ByteString.copyFrom(serializer.serialize(request))).setOperation(DataOperation.ADD.name())
            .build();

    try {
        protocol.write(writeRequest);
    } catch (Exception e) {
        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
    }
}
Handling the service mapping events
In the flow above, the notification center published two events:
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId)
new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false)
Their purpose is to build the relationship between Service and Client, so that the final Instance list for a target service can be filtered out quickly. In other words, the Service-to-Client relationship exists to speed up queries.
ClientServiceIndexesManager handles these events. It maintains two indexes:
Service to publishing clientIds
Service to subscribing clientIds
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        removeSubscriberIndexes(service, clientId);
    }
}

// Build the relationship between a Service and its publishing Clients
private void addPublisherIndexes(Service service, String clientId) {
    publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
    publisherIndexes.get(service).add(clientId);
    NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
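The publisher index is an inverted index: instead of scanning every Client for a service, the server keeps a map from service to the set of clientIds that publish it. A minimal, runnable sketch of that idea follows. PublisherIndexSketch is a hypothetical class that uses strings for services, and its removePublisher cleanup of empty sets is a choice made for this sketch, not a statement about the Nacos implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical inverted index from service name to publishing client ids.
class PublisherIndexSketch {
    private final ConcurrentMap<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

    // Called on a "register service" event.
    void addPublisher(String service, String clientId) {
        publisherIndexes.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    // Called on a "deregister service" event; drops the entry once no publisher is left (sketch-only choice).
    void removePublisher(String service, String clientId) {
        publisherIndexes.computeIfPresent(service, (k, ids) -> {
            ids.remove(clientId);
            return ids.isEmpty() ? null : ids;
        });
    }

    // Answers "which clients publish this service?" in one lookup; returns an immutable snapshot.
    Set<String> publishersOf(String service) {
        return Set.copyOf(publisherIndexes.getOrDefault(service, Set.of()));
    }
}
```

With the index in place, assembling the instance list for a service means looking up its clientIds and then asking each Client for its published instance, rather than iterating over all connected clients.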
The source of ClientServiceIndexesManager shows that it subscribes to five event types:
client registers service, client deregisters service, client subscribes to service, client unsubscribes from service, and client disconnect
@Override
public List<Class<? extends Event>> subscribeTypes() {
    List<Class<? extends Event>> result = new LinkedList<>();
    result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);
    result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);
    result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);
    result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);
    result.add(ClientEvent.ClientDisconnectEvent.class);
    return result;
}
Once the index is built, a ServiceChangedEvent is also fired to signal that the service registry has changed. A registry change is followed by two things:
1. Notifying subscribed clients.
2. Synchronizing data across the Nacos cluster.