基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。
此前我们学习了调用createProxy方法,根据服务引用参数map创建服务接口代理引用对象的整体流程,我们知道会调用createInvokerForRemote方法创建远程引用Invoker,这是Dubbo 3 服务引用的核心方法,我们现在来接着学习createInvokerForRemote方法。
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
文章目录
- 1 createInvokerForRemote创建远程引用Invoker
- 2 Protocol$Adaptive自适应Protocol
- 3 ProtocolSerializationWrapper协议序列化包装器
- 4 ProtocolFilterWrapper协议过滤器包装器
- 5 ProtocolListenerWrapper协议监听器包装器
- 6 总结
1 createInvokerForRemote创建远程引用Invoker
远程引用或者直连引用情况下,将会调用该方法,创建远程引用Invoker。该方法对于一个注册中心url和多个注册中心url的处理不一样,我们仅看一个注册中心的情况。
一个注册中心的情况下,该方法主要逻辑就是执行protocolSPI.refer方法,通过协议protocolSPI引用服务Invoker。
这里的protocolSPI是Protocol的自适应扩展实现,即Protocol$Adaptive,将会根据url的协议选择Protocol实现,然后调用Protocol#refer方法引用服务。
我们下面主要看protocolSPI#refer方法方法的相关源码,这也是Dubbo服务引入的核心流程之一。
/*** ReferenceConfig的方法* <p>* 创建远程引用Invoker*/
@SuppressWarnings({"unchecked", "rawtypes"})
private void createInvokerForRemote() {//一个url,表示一个注册中心或者直连地址,这是大多数情况//url例如: registry://47.94.229.245:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=demo1&application=demo-consumer&dubbo=2.0.2&pid=34457®istry=zookeeper&timeout=20000×tamp=1666854892012if (urls.size() == 1) {//获取注册中心协议urlURL curUrl = urls.get(0);/** 通过协议protocolSPI引用服务,该方法固定返回InjvmInvoker实例,内部没有NettyClient,因为不需要发起网络调用** 这里的protocolSPI是Protocol的自适应扩展实现,即Protocol$Adaptive* 将会根据url的协议选择Protocol实现,然后调用Protocol#refer方法引用服务*/invoker = protocolSPI.refer(interfaceClass, curUrl);// registry url, mesh-enable and unloadClusterRelated is true, not need Cluster.//如果是registry url(非直连地址), 或者unloadClusterRelated为true,那么不需要Cluster,否则需要Cluster包装//对于远程注册中心协议来说,在protocolSPI.refer方法中就已经进行了Cluster包装,这里不再需要了if (!UrlUtils.isRegistry(curUrl) &&!curUrl.getParameter(UNLOAD_CLUSTER_RELATED, false)) {List<Invoker<?>> invokers = new ArrayList<>();invokers.add(invoker);invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);}}//多个url,表示多个注册中心或者直连地址else {List<Invoker<?>> invokers = new ArrayList<>();URL registryUrl = null;for (URL url : urls) {// For multi-registry scenarios, it is not checked whether each referInvoker is available.// Because this invoker may become available later.invokers.add(protocolSPI.refer(interfaceClass, url));if (UrlUtils.isRegistry(url)) {// use last registry urlregistryUrl = url;}}if (registryUrl != null) {// registry url is available// for multi-subscription scenario, use 'zone-aware' policy by defaultString cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker// (RegistryDirectory, routing happens here) -> Invokerinvoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);} else {// not a registry url, must be direct invoke.if (CollectionUtils.isEmpty(invokers)) {throw new IllegalArgumentException("invokers == null");}URL curUrl = invokers.get(0).getUrl();String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory(curUrl, invokers), true);}}
}
2 Protocol$Adaptive自适应Protocol
Protocol$Adaptive的refer方法将会根据url的协议基于Dubbo SPI机制选择Protocol实现,然后调用Protocol#refer方法引用服务,默认dubbo协议。
/*** Protocol$Adaptive的方法* 基于url协议参数的自适应引用服务** @param arg0 服务class* @param arg1 远程服务的url地址* @return Invoker*/
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {if (arg1 == null) throw new IllegalArgumentException("url == null");org.apache.dubbo.common.URL url = arg1;//获取url协议作为扩展名,默认dubboString extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());if (extName == null)throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);//基于DUbbo SPI机制查找指定扩展名的Protocol实现类,默认DubboProtocol,这里会进行wrapper包装org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);//通过具体的Protocol包装实现类的refer方法实现服务的引用return extension.refer(arg0, arg1);
}
同服务导出时一样,获取Protocol的时候,将会经过wrapper的包装。以InjvmProtocol协议为例,可以看到经过了三层包装,调用时由外向内调用,即ProtocolSerializationWrapper -> ProtocolFilterWrapper -> ProtocolListenerWrapper -> InjvmProtocol(具体的Protocol实现)。实际上Dubbo正式采用warpper机制和装饰设计模式实现类似aop的功能。
本地引入的injvm协议对应InjvmProtocol,需要引入远程接口级注册中心的registry对应InterfaceCompatibleRegistryProtocol,需要引入远程应用级注册中心的service-discovery-registry对应RegistryProtocol。
下面我们分别讲解这些wrapper和protocol是如何进行服务引入的!
3 ProtocolSerializationWrapper协议序列化包装器
protocol的最外层wrapper,它仅会在导出服务的export方法中起作用,在引入服务的refer方法中没有其他处理。
/*** ProtocolSerializationWrapper的方法*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {//无特殊处理return protocol.refer(type, url);
}
4 ProtocolFilterWrapper协议过滤器包装器
这个包装器首先会判断如果是注册中心的协议,例如registry或者service-discovery-registry,那么直接调用下一层refer方法。
否则,获取服务url对应的Filter并且构建为一个InvokerChain对象返回,内部包含了一个下层refer方法的Invoker和一条过滤器调用链。
/*** ProtocolFilterWrapper的方法*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {//如果是注册中心的协议,例如registry或者service-discovery-registry,那么直接调用下一层refer方法if (UrlUtils.isRegistry(url)) {return protocol.refer(type, url);}//获取服务url对应的Filter并且构建为一个InvokerChain对象返回,内部包含了一个下层refer方法的Invoker和一条过滤器调用链。FilterChainBuilder builder = getFilterChainBuilder(url);return builder.buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
5 ProtocolListenerWrapper协议监听器包装器
这个包装器首先会判断如果是注册中心的协议,例如registry或者service-discovery-registry,那么直接调用下一层refer方法。
否则,调用下一层refer方法获取返回的Invoker,如果url不包含registry-cluster-type参数,将返回的Invoker包装为ListenerInvokerWrapper,内部包含了一个Invoker和一个监听器列表。
/*** ProtocolListenerWrapper的方法*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {//如果是注册中心的协议,例如registry或者service-discovery-registry,那么直接调用下一层refer方法if (UrlUtils.isRegistry(url)) {return protocol.refer(type, url);}//调用下一层refer方法获取返回的InvokerInvoker<T> invoker = protocol.refer(type, url);//如果url不包含registry-cluster-type参数if (StringUtils.isEmpty(url.getParameter(REGISTRY_CLUSTER_TYPE_KEY))) {//将返回的Invoker包装为ListenerInvokerWrapper,内部包含了一个Invoker和一个监听器列表invoker = new ListenerInvokerWrapper<>(invoker,Collections.unmodifiableList(ScopeModelUtil.getExtensionLoader(InvokerListener.class, invoker.getUrl().getScopeModel()).getActivateExtension(url, INVOKER_LISTENER_KEY)));}return invoker;
}
6 总结
本次我们学习了createInvokerForRemote方法中的Wrapper有哪些以及作用,接下来我们将会的学习真正的本地、应用级别、接口级别的Protocol的引入逻辑。