【踩坑记录】
本人下载的Nacos 服务端版本是2.3.2,在开始进行源码编译便遇到问题,下面是各个问题记录
源码大量爆红
在最开始用Idea加载Maven项目的时候,发现项目中大量的代码爆红,提示其类或者包不存在,后来结果查阅资料发现,是Nacos在2.x的版本引入了Grpc通信方式,并且采用Protobuf作为序列化协议。
导致源码中有一些类是采用其进行编写的,我们需要对其进行编译,才能生成对应的包和Class,大家如果在阅读其他源码时遇到这种情况,可以向这个方面去想一想。
具体来说有图中两个模块,需要编译,但是直接在父项目进行Compile即可,编译后重新刷新项目或者重新构建即可。
断点失效?
在成功启动Nacos服务的时候,是采用断点调试方式启动的
同时也对其进行设置了单机模式
最后也成功启动了,就在我创建Demo项目后,进行启动。
控制台也有了对应的服务(这里需要提示一下,Nacos的客户端必须要有 name 这个配置,否则不会注册
spring:application:name: nacos-democloud:nacos:discovery:server-addr: 127.0.0.1:8841ephemeral: true // 是否临时注册,默认为true
通过上一篇客户端的文章,跨域知道,其底层进行注册时是调用
this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST");
可以知道,其接口地址是/nacos/v1/ns/instance,于是我兴高采烈的去服务的打断点,如下
然后重新启动客户端,奇怪的是,并没有在断点处阻塞,而是直接注册成功了,这我非常蒙,就在绞尽脑汁的时候,突然想到前面不是说了,引入了Grpc吗,后面一搜,果然,2.x版本默认采用grpc进行交互。那这个坑到这就解决了。
对于Nacos 2.x版本,默认是通过gRPC协议进行通信的
正确的入口在这。
实例注册源码分析
1.0 入口
客户端调用入口,首先通过传入的元信息,插件Service,然后根据操作类型,进入对应的函数;
2.0注册流程开始
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)throws NacosException {clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),request.getInstance().getIp(), request.getInstance().getPort()));return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}
2.1 注册并发布事件
@Overridepublic void registerInstance(Service service, Instance instance, String clientId) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);// 通过单例模式 + map 获取对应的服务(没有就添加到对应的map中)Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM,String.format("Current service %s is persistent service, can't register ephemeral instance.",singleton.getGroupedServiceName()));}// 获取当前连接ID,也是通过并发map 管理连接,注意不要长时间阻塞断点,否则连接会断开,导致拿不到Client client = clientManager.getClient(clientId);checkClientIsLegal(client, clientId);InstancePublishInfo instanceInfo = getPublishInfo(instance);client.addServiceInstance(singleton, instanceInfo);// 设置当前连接最新更新的时间client.setLastUpdatedTime();// 重新设置版本号,用于版本控制client.recalculateRevision();// 发布事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}
3.0 NotifyCenter - 统一事件通知中心。
/** Copyright 1999-2018 Alibaba Group Holding Ltd.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR;
/*** Unified Event Notify Center.* 统一事件通知中心 - 实现了一个事件发布-订阅框架,用于处理系统内的事件通知** @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>* @author zongtanghu*/
public class NotifyCenter {// 日志记录器private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);// 环形缓冲区大小 - 用于普通事件发布器的队列大小public static int ringBufferSize;// 共享缓冲区大小 - 用于慢速事件共享发布器的队列大小public static int shareBufferSize;// 通知中心关闭状态标志 - 使用原子布尔值确保线程安全private static final AtomicBoolean CLOSED = new AtomicBoolean(false);// 默认事件发布器工厂 - 用于创建不同类型的事件发布器private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;// NotifyCenter单例实例 - 通过静态final保证单例private static final NotifyCenter INSTANCE = new NotifyCenter();// 共享发布器实例 - 用于处理所有SlowEvent慢速事件private DefaultSharePublisher sharePublisher;// 事件发布器类型 - 通过SPI加载或使用默认实现private static Class<? extends EventPublisher> clazz;/*** 发布器管理容器 - 存储不同事件类型对应的发布器* key: 事件类的规范名称* value: 对应的事件发布器*/private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);// 静态初始化块 - 在类首次加载时执行初始化static {// 从系统属性读取环形缓冲区大小,默认为16384// 对于写入吞吐量高的应用,需要适当增加这个值String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);// 从系统属性读取共享缓冲区大小,默认为1024// 用于公共发布器的消息暂存队列缓冲区大小String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);// 通过SPI机制加载EventPublisher的实现类final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);Iterator<EventPublisher> iterator = publishers.iterator();// 如果找到自定义实现类,使用第一个;否则使用默认实现if (iterator.hasNext()) {clazz = iterator.next().getClass();} else {clazz = DefaultPublisher.class;}// 初始化默认发布器工厂 - 使用lambda表达式实现工厂接口DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {try {// 创建发布器实例并初始化EventPublisher publisher = clazz.newInstance();publisher.init(cls, buffer);return publisher;} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);throw new NacosRuntimeException(SERVER_ERROR, ex);}};try {// 创建并初始化共享发布器实例 - 用于处理所有SlowEventINSTANCE.sharePublisher = new DefaultSharePublisher();INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);}// 添加JVM关闭钩子,确保系统关闭时能够正确释放资源ThreadUtils.addShutdownHook(NotifyCenter::shutdown);}/*** 获取发布器映射表 - 仅用于测试** @return 发布器映射表*/@JustForTestpublic static Map<String, EventPublisher> getPublisherMap() {return INSTANCE.publisherMap;}/*** 根据事件类型获取对应的发布器** @param topic 事件类型* @return 对应的事件发布器*/public static EventPublisher getPublisher(Class<? extends Event> topic) {// 如果是SlowEvent类型,使用共享发布器if (ClassUtils.isAssignableFrom(SlowEvent.class, topic)) {return INSTANCE.sharePublisher;}// 否则从映射表中获取对应的发布器return INSTANCE.publisherMap.get(topic.getCanonicalName());}/*** 获取共享发布器实例** @return 共享发布器实例*/public static EventPublisher getSharePublisher() {return INSTANCE.sharePublisher;}/*** 关闭通知中心及其包含的所有发布器实例* 使用CAS操作确保只执行一次*/public static void shutdown() {// 如果已经关闭,则直接返回,避免重复关闭if (!CLOSED.compareAndSet(false, true)) {return;}LOGGER.warn("[NotifyCenter] Start destroying Publisher");// 关闭所有普通发布器for (Map.Entry<String, EventPublisher> entry : INSTANCE.publisherMap.entrySet()) {try {EventPublisher eventPublisher = entry.getValue();eventPublisher.shutdown();} catch (Throwable e) {LOGGER.error("[EventPublisher] shutdown has error : ", e);}}// 关闭共享发布器try {INSTANCE.sharePublisher.shutdown();} catch (Throwable e) {LOGGER.error("[SharePublisher] shutdown has error : ", e);}LOGGER.warn("[NotifyCenter] Destruction of the end");}/*** 注册订阅者(使用默认发布器工厂)* 如果发布器不存在,会使用默认工厂创建一个新的发布器** @param consumer 订阅者实例*/public static void registerSubscriber(final Subscriber consumer) {registerSubscriber(consumer, DEFAULT_PUBLISHER_FACTORY);}/*** 使用指定工厂注册订阅者* 如果发布器不存在,会使用指定工厂创建一个新的发布器** @param consumer 订阅者实例* @param factory 发布器工厂*/public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {// 处理智能订阅者 - 可以订阅多种事件类型// 如果要监听多个事件,需要分别进行处理// 基于子类的subscribeTypes方法返回的列表,可以注册到发布器if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {// 对于慢速事件,注册到共享发布器if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);} else {// 对于普通事件,注册到对应的发布器addSubscriber(consumer, subscribeType, factory);}}return;}// 处理普通订阅者 - 只订阅一种事件类型final Class<? extends Event> subscribeType = consumer.subscribeType();// 如果是慢速事件,注册到共享发布器if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);return;}// 对于普通事件,注册到对应的发布器addSubscriber(consumer, subscribeType, factory);}/*** 将订阅者添加到发布器中* 如果发布器不存在,会先创建发布器** @param consumer 订阅者实例* @param subscribeType 订阅的事件类型* @param factory 发布器工厂*/private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,EventPublisherFactory factory) {// 获取事件类型的规范名称作为topicfinal String topic = ClassUtils.getCanonicalName(subscribeType);synchronized (NotifyCenter.class) {// 确保发布器存在,如果不存在则创建// 注释说MapUtils.computeIfAbsent是不安全的方法,这里使用自定义的MapUtilMapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);}// 获取发布器并添加订阅者EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher instanceof ShardedEventPublisher) {// 如果是分片发布器,需要传入订阅类型((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);} else {// 普通发布器直接添加订阅者publisher.addSubscriber(consumer);}}/*** 取消订阅者的注册** @param consumer 订阅者实例* @throws NoSuchElementException 如果订阅者没有对应的发布器*/public static void deregisterSubscriber(final Subscriber consumer) {// 处理智能订阅者 - 需要逐一取消多个事件类型的订阅if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {// 从共享发布器中移除订阅INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);} else {// 从普通发布器中移除订阅removeSubscriber(consumer, subscribeType);}}return;}// 处理普通订阅者 - 只有一个订阅类型final Class<? extends Event> subscribeType = consumer.subscribeType();if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {// 从共享发布器中移除订阅INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);return;}// 从普通发布器中移除订阅,如果移除失败则抛出异常if (removeSubscriber(consumer, subscribeType)) {return;}throw new NoSuchElementException("The subscriber has no event publisher");}/*** 从发布器中移除订阅者** @param consumer 订阅者实例* @param subscribeType 订阅的事件类型* @return 移除是否成功*/private static boolean removeSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {// 获取事件类型的规范名称作为topicfinal String topic = ClassUtils.getCanonicalName(subscribeType);// 查找对应的发布器EventPublisher eventPublisher = INSTANCE.publisherMap.get(topic);if (null == eventPublisher) {return false;}// 根据发布器类型调用不同的移除方法if (eventPublisher instanceof ShardedEventPublisher) {((ShardedEventPublisher) eventPublisher).removeSubscriber(consumer, subscribeType);} else {eventPublisher.removeSubscriber(consumer);}return true;}/*** 请求发布器发布事件* 发布器采用懒加载模式,只有在实际发布事件时才会调用publisher.start()** @param event 事件实例* @return 发布是否成功*/public static boolean publishEvent(final Event event) {try {// 通过事件实例获取事件类型,并调用内部发布方法return publishEvent(event.getClass(), event);} catch (Throwable ex) {// 捕获所有异常,确保不影响调用方LOGGER.error("There was an exception to the message publishing : ", ex);return false;}}/*** 请求发布器发布事件的内部实现** @param eventType 事件类型* @param event 事件实例* @return 发布是否成功*/private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {// 如果是慢速事件,使用共享发布器发布if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}// 获取事件类型的规范名称作为topicfinal String topic = ClassUtils.getCanonicalName(eventType);// 查找对应的发布器并发布事件EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {return publisher.publish(event);}// 对于插件事件,允许没有对应发布器if (event.isPluginEvent()) {return true;}// 找不到发布器,记录警告日志LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;}/*** 注册到共享发布器* 用于将慢速事件类型注册到共享发布器** @param eventType 慢速事件类型* @return 共享发布器实例*/public static EventPublisher registerToSharePublisher(final Class<? extends SlowEvent> eventType) {return INSTANCE.sharePublisher;}/*** 使用默认工厂注册发布器** @param eventType 事件类型* @param queueMaxSize 发布器队列最大大小* @return 注册的发布器实例*/public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);}/*** 使用指定工厂注册发布器** @param eventType 事件类型* @param factory 发布器工厂* @param queueMaxSize 发布器队列最大大小* @return 注册的发布器实例*/public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,final EventPublisherFactory factory, final int queueMaxSize) {// 如果是慢速事件,直接返回共享发布器if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher;}// 获取事件类型的规范名称作为topicfinal String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {// 确保发布器存在,如果不存在则创建// 注释说MapUtils.computeIfAbsent是不安全的方法,这里使用自定义的MapUtilMapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);}return INSTANCE.publisherMap.get(topic);}/*** 注册指定的发布器* 允许用户提供自定义的发布器实例** @param eventType 事件类型* @param publisher 指定的事件发布器*/public static void registerToPublisher(final Class<? extends Event> eventType, final EventPublisher publisher) {// 空检查,避免空指针异常if (null == publisher) {return;}// 获取事件类型的规范名称作为topicfinal String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {// 只有在不存在时才放入,避免覆盖已有发布器INSTANCE.publisherMap.putIfAbsent(topic, publisher);}}/*** 取消注册发布器* 将指定事件类型的发布器从管理容器中移除并关闭** @param eventType 事件类型*/public static void deregisterPublisher(final Class<? extends Event> eventType) {// 获取事件类型的规范名称作为topicfinal String topic = ClassUtils.getCanonicalName(eventType);// 从管理容器中移除发布器EventPublisher publisher = INSTANCE.publisherMap.remove(topic);try {// 关闭发布器,释放资源publisher.shutdown();} catch (Throwable ex) {LOGGER.error("There was an exception when publisher shutdown : ", ex);}}}
Nacos NotifyCenter 核心设计与功能
一、整体架构
NotifyCenter 是 Nacos 中的统一事件通知中心,实现了一个高效的事件发布-订阅框架,用于系统内部组件间的解耦通信。它采用单例模式,集中管理事件的发布和订阅,支持高吞吐量的事件处理。
┌────────────────────────────────────────────────────────┐
│ NotifyCenter │
├────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ │
│ │ 事件发布管理 │ │ 订阅者管理 │ │ 资源管理 │ │
│ └─────────────┘ └─────────────┘ └───────────┘ │
│ │
└────────────────────────────────────────────────────────┘▲ ▲ ▲│ │ │
┌─────────┴──────┐ ┌────────┴────────┐ ┌────┴─────┐
│ Publisher │ │ Subscriber │ │ Event │
└────────────────┘ └─────────────────┘ └──────────┘
二、核心设计特点
1. 双类型事件机制
NotifyCenter 将事件分为两类,并采用不同的处理策略:
- 普通事件:每种事件类型对应一个专用发布器,存储在
publisherMap
中 - 慢速事件(SlowEvent):所有慢速事件共享一个发布器
sharePublisher
,避免阻塞普通事件处理
// 决定事件路由到哪个发布器
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);
} else {EventPublisher publisher = INSTANCE.publisherMap.get(topic);return publisher.publish(event);
}
2. 延迟加载与按需创建
发布器采用懒加载模式,仅在需要时才创建,节约系统资源:
// 发布器不存在时才创建
synchronized (NotifyCenter.class) {MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, ringBufferSize);
}
3. 可配置的缓冲区大小
通过系统属性支持自定义缓冲区大小,适应不同场景的性能需求:
// 读取系统配置设置缓冲区大小
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
4. SPI 扩展机制
使用 Java SPI 机制支持自定义发布器实现,提高框架扩展性:
// 通过SPI加载自定义发布器实现
final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
5. 线程安全设计
使用并发工具和同步机制确保多线程环境下的正确性:
- 采用
ConcurrentHashMap
存储发布器 - 使用
synchronized
块保护关键操作 - 使用
AtomicBoolean
确保关闭操作仅执行一次
三、核心功能模块
1. 订阅者管理
支持注册和注销订阅者,支持两类订阅者:
- 普通订阅者(Subscriber):只订阅一种事件类型
- 智能订阅者(SmartSubscriber):可同时订阅多种事件类型
public static void registerSubscriber(final Subscriber consumer) {// 针对SmartSubscriber,遍历其所有订阅类型if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {// 分别注册每种事件类型// ...}} else {// 针对普通Subscriber处理单一事件类型// ...}
}
2. 发布器管理
维护事件类型到发布器的映射,支持注册、获取和注销发布器:
// 发布器容器
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);// 注册发布器
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {// ...
}// 获取发布器
public static EventPublisher getPublisher(Class<? extends Event> topic) {// ...
}// 注销发布器
public static void deregisterPublisher(final Class<? extends Event> eventType) {// ...
}
3. 事件发布
提供统一的事件发布入口,根据事件类型路由到对应发布器:
public static boolean publishEvent(final Event event) {try {return publishEvent(event.getClass(), event);} catch (Throwable ex) {LOGGER.error("There was an exception to the message publishing : ", ex);return false;}
}
4. 资源管理
提供优雅的资源释放机制,确保系统关闭时能正确清理资源:
public static void shutdown() {if (!CLOSED.compareAndSet(false, true)) {return;}// 关闭所有普通发布器for (EventPublisher publisher : publisherMap.values()) {publisher.shutdown();}// 关闭共享发布器sharePublisher.shutdown();
}
四、使用流程示例
1. 定义事件
// 普通事件
public class ConfigChangeEvent extends Event {// 事件内容
}// 慢速事件
public class ServiceChangeEvent extends SlowEvent {// 事件内容
}
2. 创建订阅者
// 普通订阅者
public class ConfigChangeListener implements Subscriber<ConfigChangeEvent> {@Overridepublic void onEvent(ConfigChangeEvent event) {// 处理事件}@Overridepublic Class<? extends Event> subscribeType() {return ConfigChangeEvent.class;}
}
3. 注册订阅者
NotifyCenter.registerSubscriber(new ConfigChangeListener());
4. 发布事件
ConfigChangeEvent event = new ConfigChangeEvent();
NotifyCenter.publishEvent(event);
五、设计优势
- 高性能:通过区分快慢事件、使用环形缓冲区提高事件处理吞吐量
- 高可靠:完善的异常处理和资源管理,确保系统稳定性
- 良好扩展性:通过SPI机制支持定制发布器实现
- 灵活配置:可通过系统属性调整性能参数
- 解耦设计:发布者和订阅者完全解耦,提高系统模块化程度
NotifyCenter 作为 Nacos 中的核心基础设施,为整个系统提供了高效可靠的事件通知机制,是理解 Nacos 内部通信机制的关键组件。