【探索SpringCloud】服务发现-Nacos服务端数据结构和模型

前言

上一文中,我们从官方的图示了解到Nacos的服务数据结构。但我关心的是,Nacos2.x不是重构了吗?怎么还是这种数据结构?我推测,必然是为了对Nacos1.x的兼容,实际存储应该不是这样的。于是,沿着这个问题出发我们一起来翻一下源码。

从NamingService的使用开始

在扎入源码之前,我们需要回忆一下,我们是怎么使用Nacos的?

  1. 构建NamingService
    NamingService serviceRegistry = NacosFactory.createNamingService(properties);
    实际上,这个动作的背后,意味着我们连接了Nacos服务端。
  2. 注册服务
    serviceRegistry.registerInstance(serviceName, groupName, instance);
  3. 查询服务
    serviceRegistry.getAllInstances(serviceName, groupName, List.of(clusterName));
    因此,我们就沿着这几个操作,摸一摸源码。

!!!高能警告!!!

没有耐心看源码的同学,可以直接翻到总结,直接看结论。

构建NamingService

客户端

    // com.alibaba.nacos.client.naming.NacosNamingService/*** 初始化方法* <p>由NacosNamingService构造器调用,用于初始NamingService</p>*/private void init(Properties properties) throws NacosException {final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);// 省略...// 创建客户端this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);}// com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate/*** NamingClientProxyDelegate构造器*/    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, NacosClientProperties properties,InstancesChangeNotifier changeNotifier) throws NacosException {// 省略...// 初始化了两个客户端,一个是Http,另一个是Grpc。不过,在注册实例时,如果该实例为临时实例,则使用Grpc,因此我们重点关注Grpcthis.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,serviceInfoHolder);}// com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy/*** NamingGrpcClientProxy构造器*/    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {// 省略...// 创建RPC客户端this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);this.redoService = new NamingGrpcRedoService(this);// 启动客户端start(serverListFactory, serviceInfoHolder);}// com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxyprivate void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {rpcClient.serverListFactory(serverListFactory);rpcClient.registerConnectionListener(redoService);rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));// 启动客户端rpcClient.start();NotifyCenter.registerSubscriber(this);}// com.alibaba.nacos.common.remote.client.RpcClient#start/*** 启动客户端*/public final void start() throws NacosException {// 控制只启动一次boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);if (!success) {return;}// 创建一个只有2个线程的定时任务线程池clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.remote.worker");t.setDaemon(true);return t;});// 提交-处理连接事件的TaskclientEventExecutor.submit(() -> {while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {ConnectionEvent take;take = eventLinkedBlockingQueue.take();if (take.isConnected()) {notifyConnected();} else if (take.isDisConnected()) {notifyDisConnected();}}});// 提交-心跳任务clientEventExecutor.submit(() -> {while (true) {// 由于这里有一大堆逻辑,省略。// 1. 超过时间间隔,发起心跳请求// 1.1 心跳请求失败,记录当前状态为不健康,并记录上下文。// 1.2 检查当前配置的推荐的Nacos服务器是否在服务器列表中。在,则尝试重新连接推荐的服务器。});// connect to server, try to connect to server sync retryTimes times, async starting if failed.// 连接服务端,尝试retryTimes次,同步地连接服务端,如果依然失败,则改为异步连接。Connection connectToServer = null;rpcClientStatus.set(RpcClientStatus.STARTING);int startUpRetryTimes = rpcClientConfig.retryTimes();while (startUpRetryTimes > 0 && connectToServer == null) {try {startUpRetryTimes--;ServerInfo serverInfo = nextRpcServer();// 连接服务器connectToServer = connectToServer(serverInfo);} catch (Throwable e) {LoggerUtils.printIfWarnEnabled(LOGGER,"[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);}}if (connectToServer != null) {this.currentConnection = connectToServer;rpcClientStatus.set(RpcClientStatus.RUNNING);eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));} else {switchServerAsync();}registerServerRequestHandler(new ConnectResetRequestHandler());// register client detection request.// 注册客户端检测请求处理器,用于响应服务端的探测registerServerRequestHandler(request -> {if (request instanceof ClientDetectionRequest) {return new ClientDetectionResponse();}return null;});}

服务端-处理连接请求

服务端的源码首先我们得找到GrpcServer

@Overridepublic void startServer() throws Exception {// 1. 创建请求处理器注册器final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();// 2. 注册请求处理器,并封装拦截器器。封装后,有点类似于SpringMVC的HandlerAdapteraddServices(handlerRegistry, new GrpcConnectionInterceptor());NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());// 省略server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry).compressorRegistry(CompressorRegistry.getDefaultInstance()).decompressorRegistry(DecompressorRegistry.getDefaultInstance()).addTransportFilter(new AddressTransportFilter(connectionManager)).keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS).keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS).permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS).build();// 启动服务server.start();}private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {// unary common call register.// 通用调用注册final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder().setType(MethodDescriptor.MethodType.UNARY).setFullMethodName(MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_SERVICE_NAME,GrpcServerConstants.REQUEST_METHOD_NAME)).setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();// 定义服务器调用处理器。核心处理逻辑可就在这lambda表达式定义的匿名内部类里了。也只有一个方法:// grpcCommonRequestAcceptor.request(request, responseObserver)final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(GrpcServerConstants.REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build();handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));// bi stream register.// bi流式调用服务,主要是连接请求、连接断开// 核心处理逻辑:// grpcBiStreamRequestAcceptor.requestBiStream(responseObserver)final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall((responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder().setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME,GrpcServerConstants.REQUEST_BI_STREAM_METHOD_NAME)).setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build())).setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));}

处理连接请求:

	// com.alibaba.nacos.core.remote.grpc.GrpcBiStreamRequestAcceptor@Overridepublic StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {final String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();final Integer localPort = GrpcServerConstants.CONTEXT_KEY_CONN_LOCAL_PORT.get();final int remotePort = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_PORT.get();String remoteIp = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_IP.get();String clientIp = "";@Overridepublic void onNext(Payload payload) {// 处理连接请求clientIp = payload.getMetadata().getClientIp();traceDetailIfNecessary(payload);Object parseObj;// 省略...// 检查if (parseObj instanceof ConnectionSetupRequest) {ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;// 设置label,省略// 构建ConnectionConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());metaInfo.setTenant(setUpRequest.getTenant());// 第三个参数Channel,是发生网路数据的关键Connection connection = new GrpcConnection(metaInfo, responseObserver, GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());connection.setAbilities(setUpRequest.getAbilities());boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();// 注册连接, 重点在 “或” 条件上// connectionManager.registerif (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {//Not register to the connection manager if current server is over limit or server is starting.// 如果当前服务器已超限制,或者服务器还在启动过程中,则注册失败。connection.request(new ConnectResetRequest(), 3000L);connection.close();}}            // 省略。。。}// 省略。。。};return streamObserver;}

这里出现了我们接触到的第一个概念:Connection-连接,他有个属性ConnectionMeta,记录连接相关的信息。当需要发起请求时,他会将这些信息设置到Request中,然后通过GrpcUtils转换成Payload发出请求
继续看com.alibaba.nacos.core.remote.ConnectionManager#register

    public synchronized boolean register(String connectionId, Connection connection) {if (connection.isConnected()) {String clientIp = connection.getMetaInfo().clientIp;// 省略入参检查// 注册客户端connections.put(connectionId, connection);// 登记客户端IPif (!connectionForClientIp.containsKey(clientIp)) {connectionForClientIp.put(clientIp, new AtomicInteger(0));}connectionForClientIp.get(clientIp).getAndIncrement();// 通知客户端连接ListenerclientConnectionEventListenerRegistry.notifyClientConnected(connection);return true;}return false;}

此处出现第一个Manager:ConnectionManager。用来管理所有客户端的连接。登记连接后,调用了所有的Listener的clientConnected方法。其中,有个ConnectionBasedClientManager,看名字就知道,可能是负责管理客户端的。

	// > ConnectionBasedClientManager#clientConnected(com.alibaba.nacos.core.remote.Connection)//  > ConnectionBasedClientManager#clientConnected(java.lang.String, com.alibaba.nacos.naming.core.v2.client.ClientAttributes)// ConnectionBasedClientManager@Overridepublic boolean clientConnected(String clientId, ClientAttributes attributes) {String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);// 通过ClientFactory创建客户端// 从以上的两行代码,我们通过ClientConstants.CONNECTION_TYPE就知道工厂是ConnectionBasedClientFactory,对应的客户端自然是ConnectionBasedClientreturn clientConnected(clientFactory.newClient(clientId, attributes));}@Overridepublic boolean clientConnected(final Client client) {// 登记客户端clients.computeIfAbsent(client.getClientId(), s -> {return (ConnectionBasedClient) client;});return true;}

至此,我们又发现一个新概念:Client-客户端。由Grpc连接的客户端,都由ConnectionBasedClientManager进行管理。

小结

概念管理者
连接com.alibaba.nacos.core.remote.ConnectionConnectionManager
客户端com.alibaba.nacos.naming.core.v2.client.ClientClientManager

注册实例

客户端

在这里插入图片描述
我们重点看看

    public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {// 创建请求。每个Request在Nacos服务端都由对应的HandlerInstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,NamingRemoteConstants.REGISTER_INSTANCE, instance);requestToServer(request, Response.class);redoService.instanceRegistered(serviceName, groupName);}

服务端

我们前面说服务端启动时,说这个是负责处理通用请求的:

final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));

我们就顺着往下看

	// com.alibaba.nacos.core.remote.grpc.GrpcRequestAcceptor#request@Overridepublic void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {String type = grpcRequest.getMetadata().getType();// 省略如下内容:// 检查服务是否已启动// 如果是客户端对服务端的健康检查,则直接响应// ----------------------------// 从对应的请求处理器RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);// 省略:no handler found. 的异常处理// ----------------------------String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();// 省略:检查连接是否正常.Object parseObj = null;parseObj = GrpcUtils.parse(grpcRequest);// 省略:转换异常、无效请求异常Request request = (Request) parseObj;// 从ConnectionManager获取到对应的ConnectionConnection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());// 组装RequestMetaRequestMeta requestMeta = new RequestMeta();requestMeta.setClientIp(connection.getMetaInfo().getClientIp());requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());requestMeta.setClientVersion(connection.getMetaInfo().getVersion());requestMeta.setLabels(connection.getMetaInfo().getLabels());connectionManager.refreshActiveTime(requestMeta.getConnectionId());// 调用requestHandler处理请求Response response = requestHandler.handleRequest(request, requestMeta);Payload payloadResponse = GrpcUtils.convert(response);traceIfNecessary(payloadResponse, false);responseObserver.onNext(payloadResponse);responseObserver.onCompleted();}

这些便是通用请求处理的核心逻辑。现在我们便来看InstanceRequest的处理com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler

    @Override@Secured(action = ActionTypes.WRITE)public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE:return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE:return deregisterInstance(service, request, meta);default:throw new NacosException}}private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)throws NacosException {// 1. 注册实例clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());// 2. 发布事件:RegisterInstanceTraceEventNotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),request.getInstance().getIp(), request.getInstance().getPort()));return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}// 注册实例:
// com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl#registerInstance@Overridepublic void registerInstance(Service service, Instance instance, String clientId) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);// 从ServiceManager获取已注册服务。而我们当前是要注册实例,所以,这个方法肯定还内含玄机Service singleton = ServiceManager.getInstance().getSingleton(service);// 省略:如果获取到的是持久化实例,意味着当前注册临时实例冲突了,返回异常。Client client = clientManager.getClient(clientId);InstancePublishInfo instanceInfo = getPublishInfo(instance);// 记录当前客户端发布的实例client.addServiceInstance(singleton, instanceInfo);client.setLastUpdatedTime();client.recalculateRevision();// 发布服务注册事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}// com.alibaba.nacos.naming.core.v2.ServiceManager/*** Get singleton service. Put to manager if no singleton.* 获取单例服务(单例意味着整个应用只有一个对象),如果不存在,则注册到Manager*/public Service getSingleton(Service service) {// 如果不存在就注册singletonRepository.computeIfAbsent(service, key -> {// 发布服务元信息数据事件。不过该事件对于持久实例才有用处。NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, false));return service;});Service result = singletonRepository.get(service);// 将服务登记到namespacenamespaceSingletonMaps.computeIfAbsent(result.getNamespace(), namespace -> new ConcurrentHashSet<>());namespaceSingletonMaps.get(result.getNamespace()).add(result);return result;}// 再看看ClientOperationEvent.ClientRegisterServiceEvent// > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#onEvent//  > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation//   > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#addPublisherIndexes// 登记发布服务的客户端private void addPublisherIndexes(Service service, String clientId) {publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());publisherIndexes.get(service).add(clientId);NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}

小结

我们总结一下,以上涉及到的概念。

概念管理者描述
服务com.alibaba.nacos.naming.core.v2.pojo.ServiceServiceManager

除了这个概念,实际上我们还看到Client的内部结构:
AbstractClient:

  • 记录客户端发布的服务:ConcurrentHashMap<Service, InstancePublishInfo> publishers
  • 记录客户端订阅的服务:ConcurrentHashMap<Service, Subscriber> subscribers

    这个点其实要到订阅服务请求才会分析到,但为了信息不会太分散,所以就放到一起了。

ClientServiceIndexesManager

  • 客户端索引管理者。这里的索引指的是,通过Service快速找到客户端,只是客户端有ClientManager,如果这里再存一份也不合适,不利于数据维护。因此这里存的是clientId。估计也是如此,他才叫客户端索引管理者。

查询和订阅实例

> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>)> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>, boolean)> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe)> com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#subscribe> com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doSubscribepublic ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {// 重点SubscribeServiceRequest,看服务端代码需要知道是什么请求SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true);SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();}

服务端

	// com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler@Override@Secured(action = ActionTypes.READ)public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String serviceName = request.getServiceName();String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);Service service = Service.newService(namespaceId, groupName, serviceName, true);// 订阅者Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),namespaceId, groupedServiceName, 0, request.getClusters());// 服务信息,这里有几个参数是需要通过方法来获取的// 重点是:serviceStorage.getData(service)// 而这个方法也是个重要的方法,过滤cluster、健康实例,并执行自动保护机制,都是他实现的ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,true, subscriber.getIp());if (request.isSubscribe()) {// 订阅服务clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());// 发布订阅事件NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));} else {// 取消订阅clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));}return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);}// > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getData//  > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getPushData//   > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getAllInstancesFromIndex// > com.alibaba.nacos.naming.utils.ServiceUtil#selectInstancesWithHealthyProtection(com.alibaba.nacos.api.naming.pojo.ServiceInfo, com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata, java.lang.String, boolean, boolean, java.lang.String)//  > com.alibaba.nacos.naming.utils.ServiceUtil#doSelectInstances// 上面是调用路径,这里把生产数据的方法重点捞出来// ServiceStorage的数据生产public ServiceInfo getPushData(Service service) {ServiceInfo result = emptyServiceInfo(service);if (!ServiceManager.getInstance().containSingleton(service)) {return result;}Service singleton = ServiceManager.getInstance().getSingleton(service);result.setHosts(getAllInstancesFromIndex(singleton));// 从ServiceManager拿到服务的实例信息,并登记到ServiceStorage#serviceDataIndexes中serviceDataIndexes.put(singleton, result);return result;}private List<Instance> getAllInstancesFromIndex(Service service) {Set<Instance> result = new HashSet<>();Set<String> clusters = new HashSet<>();for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {// 获取实例信息Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);if (instancePublishInfo.isPresent()) {InstancePublishInfo publishInfo = instancePublishInfo.get();//If it is a BatchInstancePublishInfo type, it will be processed manually and added to the instance listif (publishInfo instanceof BatchInstancePublishInfo) {BatchInstancePublishInfo batchInstancePublishInfo = (BatchInstancePublishInfo) publishInfo;List<Instance> batchInstance = parseBatchInstance(service, batchInstancePublishInfo, clusters);result.addAll(batchInstance);} else {Instance instance = parseInstance(service, instancePublishInfo.get());result.add(instance);clusters.add(instance.getClusterName());}}}// cache clusters of this service// 缓存集群信息serviceClusterIndex.put(service, clusters);return new LinkedList<>(result);}private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {// 通过客户端ID,获取到Client,进而从其获取客户端发布的服务。Client client = clientManager.getClient(clientId);if (null == client) {return Optional.empty();}return Optional.ofNullable(client.getInstancePublishInfo(service));}

从查询实例这里,我们看到有个数据存储:ServiceStorage。重要的是,这个虽然叫存储,但是实际上里面的数据却是从别处获取来的。来源于:ServiceManager、ServiceIndexesManager、ClientManager。从这个角度说,更像是个缓存。

总结

上面的整了一堆源代码,容易看烦了。感兴趣的,可以根据上面的源码深入看看。为了方便大家,我画了图给大家:
在这里插入图片描述
为了让大家重点看到数据生产过程:
在这里插入图片描述
从图中,我们可以看到,nacos2.x的数据结构并不像官方的Service->Group->Instance。而是按照Connection、Client、Service分别通过对应的管理器进行管理。此外,为了避免数据多处存储,还有ClientServiceIndexesManager作为Client和Service的桥梁。
除此之外,还有ServiceStorage,作为数据缓存。不过,当我们深入了解ServiceStorage时,会发现他的数据一致性/数据的更新,是在给订阅服务的客户端定时推送时通过调用com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getPushData来实现的。个人认为这是有可以优化空间的,他们完全可以通过各种事件来监听实例的生死来更新数据。

总而言之,如果不算ServiceStorage这个缓存,那么数据主要存在于一下的Manager中:
ConnectionManager、ClientManager、ServiceManager、ClientServiceIndexesManager。

到这里,可能有同学就有疑问了。那么Group、Cluster这些概念去哪了呢?
这些概念都成为了属性/字段了。
com.alibaba.nacos.naming.core.v2.pojo.Service#group
com.alibaba.nacos.api.naming.pojo.Instance#clusterName
即使在ServiceStorage封装ServiceInfo时,他们也是作为属性来存储的。通过ServiceUtil来过滤目标实例。

最后,提醒大家一下,我们这里只是分析了临时实例。是最常用的场景。当然,如果我们用Nacos的持久实例,SpringCloud也就自然支持了持久实例。不过,咱们不深究了,感兴趣的同学,可以顺着往下挖一挖持久实例。

后记

这种深度刨析源码、深挖一个技术细节的实现,太费时间、也太费篇幅了。我自己都感觉差点把整个nacos的源码都搬上来了。莫见怪。。。
关于nacos的一致性协议,就不在这里聊了,这个东西得单独倒腾,还要与其他分布式中间件相互对比,还有理论。。
下次,咱们先往后聊OpenFeign。

推荐

Nacos的实现原理在官网有一电子书《Nacos架构&原理》,想要了解顶层设计原理的同学,建议看看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/117146.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue的第2篇 第一个vue程序

一 环境的搭建 1.1常见前端开发ide 1.2 安装vs.code 1.下载地址&#xff1a;Visual Studio Code - Code Editing. Redefined 2.进行安装 1.2.1 vscode的中文插件安装 1.在搜索框输入“chinese” 2.安装完成重启&#xff0c;如下变成中文 1.2.2 修改工作区的颜色 选中[浅色]…

opencv 提取选中区域内指定hsv颜色的水印

基于《QT 插件化图像算法研究平台》做的功能插件。提取选中区域内指定hsv颜色的水印。 《QT 插件化图像算法研究平台》有个HSV COLOR PICK功能&#xff0c;可以很直观、方便地分析出水印 的hsv颜色&#xff0c;比如, 蓝色&#xff1a;100,180,0,255,100,255。 然后利用 opencv …

Django(10)-项目实战-对发布会管理系统进行测试并获取测试覆盖率

在发布会签到系统中使用django开发了发布会签到系统&#xff0c; 本文对该系统进行测试。 django.test django.test是Django框架中的一个模块&#xff0c;提供了用于编写和运行测试的工具和类。 django.test模块包含了一些用于测试的类和函数&#xff0c;如&#xff1a; Tes…

每日一题 1372二叉树中的最长交错路径

题目 给你一棵以 root 为根的二叉树&#xff0c;二叉树中的交错路径定义如下&#xff1a; 选择二叉树中 任意 节点和一个方向&#xff08;左或者右&#xff09;。如果前进方向为右&#xff0c;那么移动到当前节点的的右子节点&#xff0c;否则移动到它的左子节点。改变前进方…

vscode 清除全部的console.log

在放页面的大文件夹view上面右键点击在文件夹中查找 console.log.*$ 注意&#xff1a;要选择使用正则匹配 替换为 " " (空字符串)

跨模态可信感知

文章目录 跨模态可信感知综述摘要引言跨协议通信模式PCP网络架构 跨模态可信感知跨模态可信感知的概念跨模态可信感知的热点研究场景目前存在的挑战可能改进的方案 参考文献 跨模态可信感知综述 摘要 随着人工智能相关理论和技术的崛起&#xff0c;通信和感知领域的研究引入了…

ELK日志收集系统(四十九)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、概述 二、组件 1. elasticsearch 2. logstash 2.1 工作过程 2.2 INPUT 2.3 FILETER 2.4 OUTPUTS 3. kibana 三、架构类型 3.1 ELK 3.2 ELKK 3.3 ELFK 3.5 EF…

VScode远程连接主机

一、前期准备 1、Windows安装VSCode&#xff1b; 2、在VSCode中安装PHP Debug插件&#xff1b; 3、安装好Docker 4、在容器中安装Xdebug ①写一个展现phpinfo的php文件 <?php phpinfo(); ?>②在浏览器上打开该文件 ③复制所有信息丢到Xdebug: Installation instr…

【C进阶】深度剖析数据在内存中的存储

目录 一、数据类型的介绍 1.类型的意义&#xff1a; 2.类型的基本分类 二、整形在内存中的存储 1.原码 反码 补码 2.大小端介绍 3.练习 三、浮点型在内存中的存储 1.一个例子 2.浮点数存储规则 一、数据类型的介绍 前面我们已经学习了基本的内置类型以及他们所占存储…

封装(个人学习笔记黑马学习)

1、格式 #include <iostream> using namespace std;const double PI 3.14;//设计一个圆类&#xff0c;求圆的周长 class Circle {//访问权限//公共权限 public://属性//半径int m_r;//行为//获取圆的周长double calculateZC() {return 2 * PI * m_r;} };int main() {//通…

Linux 学习笔记(1)——系统基本配置与开关机命令

目录 0、起步 0-1&#xff09;命令使用指引 0-2&#xff09;查看历史的命令记录 0-3&#xff09;清空窗口内容 0-4&#xff09;获取本机的内网 IP 地址 0-5&#xff09;获取本机的公网ip地址 0-6&#xff09;在window的命令行窗口中远程连接linux 0-7&#xff09;修改系…

VScode 国内下载源 以及 nvm版本控制器下载与使用

VScode 国内下载源 进入官网 https://code.visualstudio.com/ 点击下载 复制下载链接到新的浏览器标签 将地址中的/stable前的az764295.vo.msecnd.net换成vscode.cdn.azure.cn&#xff0c;再回车就会直接在下载列表啦。 参考大神博客 2.使用nvm 对 node 和npm进行版本控制…

ARM编程模型-内存空间和数据

ARM属于RISC体系&#xff0c;许多指令单周期指令&#xff0c;是32位读取/存储架构&#xff0c;对内存访问是32位&#xff0c;Load and store的架构&#xff0c;只有寄存器对内存&#xff0c;不能内存对内存存储&#xff0c;CPU通过寄存器对内存进行读写操作。 ARM的寻址空间是线…

go Session的实现(一)

〇、前言 众所周知&#xff0c;http协议是无状态的&#xff0c;这对于服务器确认是哪一个客户端在发请求是不可能的&#xff0c;因此为了能确认到&#xff0c;通常方法是让客户端发送请求时带上身份信息。容易想到的方法就是客户端在提交信息时&#xff0c;带上自己的账户和密…

17.看楼房

目录 Description Input Output Notes 思路 注意事项 C完整代码&#xff08;含详细注释&#xff09; Description 小张在暑假时间进行了暑期社会调查。调查的内容是楼房的颜色如何影响人们的心情。于是他找到了一个楼房从左到右排成一排的小区&#xff0c;这个小区一共有…

51单片机项目(7)——基于51单片机的温湿度测量仿真

本次做的设计&#xff0c;是利用DHT11传感器&#xff0c;测量环境的温度以及湿度&#xff0c;同时具备温度报警的功能&#xff1a;利用两个按键&#xff0c;设置温度阈值的加和减&#xff0c;当所测温度大于温度阈值的时候&#xff0c;蜂鸣器就会响起&#xff0c;进行报警提示。…

安卓 tcp 客户端

安卓 tcp 客户端 Server:8888 是Qt 写的Tcp 服务器 ip 是 192.168.2.103 port是8888 安卓手机运行 kotlin 语法的Tcp Client &#xff0c;连接&#xff0c;收发数据 效果如下图 Tcpclient package com.example.myapplicationimport android.os.Handler import android.os.Loo…

【leetcode 力扣刷题】数学题之计算次幂//次方:快速幂

利用乘法求解次幂问题—快速幂 50. Pow(x, n)372. 超级次方 50. Pow(x, n) 题目链接&#xff1a;50. Pow(x, n) 题目内容&#xff1a; 题目就是要求我们去实现计算x的n次方的功能函数&#xff0c;类似c的power()函数。但是我们不能使用power()函数直接得到答案&#xff0c;那…

ARM Cortex-M 的 SP

文章目录 1、栈2、栈操作3、Cortex-M中的栈4、MDK中的SP操作流程5、Micro-Lib的SP差别1. 使用 Micro-Lib2. 未使用 Micro-Lib 在嵌入式开发中&#xff0c;堆栈是一个很基础&#xff0c;同时也是非常重要的名词&#xff0c;堆栈可分为堆 (Heap) 和栈 (Stack) 。 栈(Stack): 一种…

EG1164大功率同步整流升压模块开源,最高效率97%

EG1164大功率同步整流Boost升压电源模块&#xff0c;最高效率97%&#xff0c;输入电压8~50V&#xff0c;输出电压8~60V可调&#xff0c;最大功率300瓦以上&#xff0c;开关频率219kHz。 白嫖了张嘉立创的彩色丝印券就随便画了个板试试&#xff0c;第一次打彩色丝印。 因为我测…