Dubbo中暴露服务的过程解析

原文链接
dubbo暴露服务有两种情况,一种是设置了延迟暴露(比如delay=”5000”),另外一种是没有设置延迟暴露或者延迟设置为-1(delay=”-1”):

  • 设置了延迟暴露,dubbo在Spring实例化bean(initializeBean)的时候会对实现了InitializingBean的类进行回调,回调方法是afterPropertySet(),如果设置了延迟暴露,dubbo在这个方法中进行服务的发布。
  • 没有设置延迟或者延迟为-1,dubbo会在Spring实例化完bean之后,在刷新容器最后一步发布ContextRefreshEvent事件的时候,通知实现了ApplicationListener的类进行回调onApplicationEvent,dubbo会在这个方法中发布服务。

但是不管延迟与否,都是使用ServiceConfig的export()方法进行服务的暴露。使用export初始化的时候会将Bean对象转换成URL格式,所有Bean属性转换成URL的参数。

过程

以没有设置延迟暴露熟属性的过程为例。

简易的暴露流程

  1. 首先将服务的实现封装成一个Invoker,Invoker中封装了服务的实现类。
  2. 将Invoker封装成Exporter,并缓存起来,缓存里使用Invoker的url作为key。
  3. 服务端Server启动,监听端口。(请求来到时,根据请求信息生成key,到缓存查找Exporter,就找到了Invoker,就可以完成调用。)

Spring容器初始化调用

当Spring容器实例化bean完成,走到最后一步发布ContextRefreshEvent事件的时候,ServiceBean会执行onApplicationEvent方法,该方法调用ServiceConfig的export方法。

ServiceConfig初始化的时候,会先初始化静态变量protocol和proxyFactory,这两个变量初始化的结果是通过dubbo的spi扩展机制得到的。

生成的protocol实例是:

1package com.alibaba.dubbo.rpc; 2import com.alibaba.dubbo.common.extension.ExtensionLoader; 3 4public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol { 5 public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class { 6 if (arg1 == null) 7 throw new IllegalArgumentException("url == null"); 8 9 com.alibaba.dubbo.common.URL url = arg1; 10 String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); 11 if(extName == null) 12 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); 13 14 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); 15 16 return extension.refer(arg0, arg1); 17 } 18 19 public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { 20 if (arg0 == null) 21 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); 22 23 if (arg0.getUrl() == null) 24 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); 25 //根据URL配置信息获取Protocol协议,默认是dubbo 26 String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); 27 if(extName == null) 28 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); 29 //根据协议名,获取Protocol的实现 30 //获得Protocol的实现过程中,会对Protocol先进行依赖注入,然后进行Wrapper包装,最后返回被修改过的Protocol 31 //包装经过了ProtocolFilterWrapper,ProtocolListenerWrapper,RegistryProtocol 32 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); 33 34 return extension.export(arg0); 35 } 36 37 public void destroy() { 38 throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); 39 } 40 41 public int getDefaultPort() { 42 throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); 43 } 44} 45

生成的proxyFactory实例:

1package com.alibaba.dubbo.rpc; 2import com.alibaba.dubbo.common.extension.ExtensionLoader; 3public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory { 4 public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object { 5 if (arg2 == null) 6 throw new IllegalArgumentException("url == null"); 7 8 com.alibaba.dubbo.common.URL url = arg2; 9 String extName = url.getParameter("proxy", "javassist"); 10 if(extName == null) 11 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); 12 13 com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); 14 15 return extension.getInvoker(arg0, arg1, arg2); 16 } 17 18 public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { 19 if (arg0 == null) 20 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); 21 22 if (arg0.getUrl() == null) 23 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); 24 25 String extName = url.getParameter("proxy", "javassist"); 26 if(extName == null) 27 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); 28 29 com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); 30 31 return extension.getProxy(arg0); 32 } 33} 34

生成的代码中可以看到,默认的Protocol实现是dubbo,默认的proxy是javassist。

ServiceConfig的export

export的步骤简介

  1. 首先会检查各种配置信息,填充各种属性,总之就是保证我在开始暴露服务之前,所有的东西都准备好了,并且是正确的。
  2. 加载所有的注册中心,因为我们暴露服务需要注册到注册中心中去。
  3. 根据配置的所有协议和注册中心url分别进行导出。
  4. 进行导出的时候,又是一波属性的获取设置检查等操作。
  5. 如果配置的不是remote,则做本地导出。
  6. 如果配置的不是local,则暴露为远程服务。
  7. 不管是本地还是远程服务暴露,首先都会获取Invoker。
  8. 获取完Invoker之后,转换成对外的Exporter,缓存起来。

export方法先判断是否需要延迟暴露(这里我们使用的是不延迟暴露),然后执行doExport方法。

doExport方法先执行一系列的检查方法,然后调用doExportUrls方法。检查方法会检测dubbo的配置是否在Spring配置文件中声明,没有的话读取properties文件初始化。

doExportUrls方法先调用loadRegistries获取所有的注册中心url,然后遍历调用doExportUrlsFor1Protocol方法。对于在标签中指定了registry属性的Bean,会在加载BeanDefinition的时候就加载了注册中心。

获取注册中心url,会把注册的信息都放在一个URL对象中,一个URL内容如下:

1registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product&organization=china&owner=cheng.xi&pid=2939&registry=zookeeper&timestamp=1488898049284 2

doExportUrlsFor1Protocol根据不同的协议将服务以URL形式暴露。如果scope配置为none则不暴露,如果服务未配置成remote,则本地暴露exportLocal,如果未配置成local,则注册服务registryProcotol。

这里的URL是:

1dubbo://192.168.1.100:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider&application.version=1.0&delay=5000&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&methods=sayHello&organization=china&owner=cheng.xi&pid=2939&side=provider&timestamp=1488898464953 2

这时候会先做本地暴露,exportLocal(url);:

1private void exportLocal(URL url) { 2 if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { 3 //这时候转成本地暴露的url:injvm://127.0.0.1/dubbo.common.hello.service.HelloService?anyhost=true& 4 //application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product& 5 //interface=dubbo.common.hello.service.HelloService&methods=sayHello& 6 //organization=china&owner=cheng.xi&pid=720&side=provider&timestamp=1489716708276 7 URL local = URL.valueOf(url.toFullString()) 8 .setProtocol(Constants.LOCAL_PROTOCOL) 9 .setHost(NetUtils.LOCALHOST) 10 .setPort(0); 11 //首先还是先获得Invoker 12 //然后导出成Exporter,并缓存 13 //这里的proxyFactory实际是JavassistProxyFactory 14 //有关详细的获得Invoke以及exporter会在下面的流程解析,在本地暴露这个流程就不再说明。 15 Exporter<?> exporter = protocol.export( 16 proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); 17 exporters.add(exporter); 18 logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry"); 19 } 20} 21

接下来是暴露为远程服务,跟本地暴露的流程一样还是先获取Invoker,然后导出成Exporter:

1//根据服务具体实现,实现接口,以及registryUrl通过ProxyFactory将HelloServiceImpl封装成一个本地执行的Invoker 2//invoker是对具体实现的一种代理。 3//这里proxyFactory是上面列出的生成的代码 4 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 5 //使用Protocol将invoker导出成一个Exporter 6 //暴露封装服务invoker 7 //调用Protocol生成的适配类的export方法 8 //这里的protocol是上面列出的生成的代码 9 Exporter<?> exporter = protocol.export(invoker); 10

关于Invoker,Exporter等的解释参见最下面的内容。

暴露远程服务时的获取Invoker过程

服务实现类转换成Invoker,大概的步骤是:

  1. 根据上面生成的proxyFactory方法调用具体的ProxyFactory实现类的getInvoker方法获取Invoker。

  2. getInvoker的过程是,首先对实现类做一个包装,生成一个包装后的类。

  3. 然后新创建一个Invoker实例,这个Invoker中包含着生成的Wrapper类,Wrapper类中有具体的实现类。

1Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 2

这行代码中包含服务实现类转换成Invoker的过程,其中proxyFactory是上面列出的动态生成的代码,其中getInvoker的代码为(做了精简,把包都去掉了):

1public Invoker getInvoker(Object arg0, Class arg1, URL arg2) throws Object { 2 if (arg2 == null) throw new IllegalArgumentException("url == null"); 3 //传进来的url是dubbo://192.168.110.197:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider 4 //&application.version=1.0&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&methods=sayHello&organization=china&owner=cheng.xi 5 //&pid=28191&side=provider&timestamp=1489027396094 6 URL url = arg2; 7 //没有proxy参数配置,默认使用javassist 8 String extName = url.getParameter("proxy", "javassist"); 9 if(extName == null) throw new IllegalStateException("Fail to get extension(ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); 10 //这一步就使用javassist来获取ProxyFactory的实现类JavassistProxyFactory 11 ProxyFactory extension = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName); 12 //JavassistProxyFactory的getInvoker方法 13 return extension.getInvoker(arg0, arg1, arg2); 14} 15

JavassistProxyFactory的getInvoker方法:

1public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { 2 // TODO Wrapper类不能正确处理带$的类名 3 //第一步封装一个Wrapper类 4 //该类是手动生成的 5 //如果类是以$开头,就使用接口类型获取,其他的使用实现类获取 6 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); 7 //返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod 8 //关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用 9 return new AbstractProxyInvoker<T>(proxy, type, url) { 10 @Override 11 protected Object doInvoke(T proxy, String methodName, 12 Class<?>[] parameterTypes, 13 Object[] arguments) throws Throwable { 14 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); 15 } 16 }; 17} 18

生成wrapper类的过程,首先看getWrapper方法:

1public static Wrapper getWrapper(Class<?> c){ 2 while( ClassGenerator.isDynamicClass(c) ) // can not wrapper on dynamic class. 3 c = c.getSuperclass(); 4 //Object类型的 5 if( c == Object.class ) 6 return OBJECT_WRAPPER; 7 //先去Wrapper缓存中查找 8 Wrapper ret = WRAPPER_MAP.get(c); 9 if( ret == null ) { 10 //缓存中不存在,生成Wrapper类,放到缓存 11 ret = makeWrapper(c); 12 WRAPPER_MAP.put(c,ret); 13 } 14 return ret; 15} 16

makeWrapper方法代码不在列出,太长了。就是生成一个继承自Wrapper的类,最后的结果大概是:

1public class Wrapper1 extends Wrapper { 2 public static String[] pns; 3 public static Map pts; 4 public static String[] mns; // all method name array. 5 public static String[] dmns; 6 public static Class[] mts0; 7 8 public String[] getPropertyNames() { 9 return pns; 10 } 11 12 public boolean hasProperty(String n) { 13 return pts.containsKey($1); 14 } 15 16 public Class getPropertyType(String n) { 17 return (Class) pts.get($1); 18 } 19 20 public String[] getMethodNames() { 21 return mns; 22 } 23 24 public String[] getDeclaredMethodNames() { 25 return dmns; 26 } 27 28 public void setPropertyValue(Object o, String n, Object v) { 29 dubbo.provider.hello.service.impl.HelloServiceImpl w; 30 try { 31 w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1); 32 } catch (Throwable e) { 33 throw new IllegalArgumentException(e); 34 } 35 throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $2 + "\" filed or setter method in class dubbo.provider.hello.service.impl.HelloServiceImpl."); 36 } 37 38 public Object getPropertyValue(Object o, String n) { 39 dubbo.provider.hello.service.impl.HelloServiceImpl w; 40 try { 41 w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1); 42 } catch (Throwable e) { 43 throw new IllegalArgumentException(e); 44 } 45 throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $2 + "\" filed or setter method in class dubbo.provider.hello.service.impl.HelloServiceImpl."); 46 } 47 48 public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { 49 dubbo.provider.hello.service.impl.HelloServiceImpl w; 50 try { 51 w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1); 52 } catch (Throwable e) { 53 throw new IllegalArgumentException(e); 54 } 55 try { 56 if ("sayHello".equals($2) && $3.length == 0) { 57 w.sayHello(); 58 return null; 59 } 60 } catch (Throwable e) { 61 throw new java.lang.reflect.InvocationTargetException(e); 62 } 63 throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class dubbo.provider.hello.service.impl.HelloServiceImpl."); 64 } 65} 66

生成完Wrapper以后,返回一个AbstractProxyInvoker实例。至此生成Invoker的步骤就完成了。可以看到Invoker执行方法的时候,会调用Wrapper的invokeMethod,这个方法中会有真实的实现类调用真实方法的代码。

暴露远程服务时导出Invoker为Exporter

Invoker导出为Exporter分为两种情况,第一种是Registry类型的Invoker,第二种是其他协议类型的Invoker,分开解析。

代码入口:

1Exporter<?> exporter = protocol.export(invoker); 2

Registry类型的Invoker处理过程

大概的步骤是:

  1. 经过两个不用做任何处理的Wrapper类,然后到达RegistryProtocol中。
  2. 通过具体的协议导出Invoker为Exporter。
  3. 注册服务到注册中心。
  4. 订阅注册中心的服务。
  5. 生成一个新的Exporter实例,将上面的Exporter进行引入,然后返回。

protocol是上面列出的动态生成的代码,会先调用ProtocolListenerWrapper,这个Wrapper负责初始化暴露和引用服务的监听器。对于Registry类型的不做处理,代码如下:

1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 //registry类型的Invoker,不需要做处理 3 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 4 return protocol.export(invoker); 5 } 6 //非Registry类型的Invoker,需要被监听器包装 7 return new ListenerExporterWrapper<T>(protocol.export(invoker), 8 Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) 9 .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); 10} 11

接着调用ProtocolFilterWrapper中的export方法,ProtocolFilterWrapper负责初始化invoker所有的Filter。代码如下:

1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 //Registry类型的Invoker不做处理 3 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 4 return protocol.export(invoker); 5 } 6 //非Registry类型的Invoker需要先构建调用链,然后再导出 7 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); 8} 9

这里我们先解析的是Registry类型的Invoker,接着就会调用RegistryProtocol的export方法,RegistryProtocol负责注册服务到注册中心和向注册中心订阅服务。代码如下:

1public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 2 //export invoker 3 //这里就交给了具体的协议去暴露服务(先不解析,留在后面,可以先去后面看下导出过程) 4 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 5 //registry provider 6 //根据invoker中的url获取Registry实例 7 //并且连接到注册中心 8 //此时提供者作为消费者引用注册中心核心服务RegistryService 9 final Registry registry = getRegistry(originInvoker); 10 //注册到注册中心的URL 11 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 12 //调用远端注册中心的register方法进行服务注册 13 //若有消费者订阅此服务,则推送消息让消费者引用此服务。 14 //注册中心缓存了所有提供者注册的服务以供消费者发现。 15 registry.register(registedProviderUrl); 16 // 订阅override数据 17 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 18 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 19 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); 20 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 21 //提供者向注册中心订阅所有注册服务的覆盖配置 22 //当注册中心有此服务的覆盖配置注册进来时,推送消息给提供者,重新暴露服务,这由管理页面完成。 23 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 24 //保证每次export都返回一个新的exporter实例 25 //返回暴露后的Exporter给上层ServiceConfig进行缓存,便于后期撤销暴露。 26 return new Exporter<T>() { 27 public Invoker<T> getInvoker() { 28 return exporter.getInvoker(); 29 } 30 public void unexport() { 31 try { 32 exporter.unexport(); 33 } catch (Throwable t) { 34 logger.warn(t.getMessage(), t); 35 } 36 try { 37 registry.unregister(registedProviderUrl); 38 } catch (Throwable t) { 39 logger.warn(t.getMessage(), t); 40 } 41 try { 42 overrideListeners.remove(overrideSubscribeUrl); 43 registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); 44 } catch (Throwable t) { 45 logger.warn(t.getMessage(), t); 46 } 47 } 48 }; 49} 50

交给具体的协议去暴露服务

先不解析,留在后面,可以先去后面看下导出过程,然后再回来接着看注册到注册中心的过程。具体协议暴露服务主要是打开服务器和端口,进行监听。

注册到注册中心

具体的协议进行暴露并且返回了一个ExporterChangeableWrapper之后,接下来看下一步连接注册中心并注册到注册中心,代码是在RegistryProtocol的export方法:

1//此步已经分析完 2final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 3//得到具体的注册中心,连接注册中心,此时提供者作为消费者引用注册中心核心服务RegistryService 4final Registry registry = getRegistry(originInvoker); 5final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 6//调用远端注册中心的register方法进行服务注册 7//若有消费者订阅此服务,则推送消息让消费者引用此服务 8registry.register(registedProviderUrl); 9//提供者向注册中心订阅所有注册服务的覆盖配置 10registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 11//返回暴露后的Exporter给上层ServiceConfig进行缓存 12return new Exporter<T>() {。。。} 13

getRegistry(originInvoker)方法:

1//根据invoker的地址获取registry实例 2private Registry getRegistry(final Invoker<?> originInvoker){ 3 //获取invoker中的registryUrl 4 URL registryUrl = originInvoker.getUrl(); 5 if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { 6 //获取registry的值,这里获得是zookeeper,默认值是dubbo 7 String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); 8 //这里获取到的url为: 9 //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? 10 //application=dubbo-provider&application.version=1.0&dubbo=2.5.3& 11 //environment=product&export=dubbo%3A%2F%2F192.168.1.100%3A20880%2F 12 //dubbo.common.hello.service.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-provider%26 13 //application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26 14 //interface%3Ddubbo.common.hello.service.HelloService%26methods%3DsayHello%26 15 //organization%3Dchina%26owner%3Dcheng.xi%26pid%3D9457%26side%3Dprovider%26timestamp%3D1489807681627&organization=china&owner=cheng.xi& 16 //pid=9457&timestamp=1489807680193 17 registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); 18 } 19 //根据SPI机制获取具体的Registry实例,这里获取到的是ZookeeperRegistry 20 return registryFactory.getRegistry(registryUrl); 21} 22

这里的registryFactory是动态生成的代码,如下:

1import com.alibaba.dubbo.common.extension.ExtensionLoader; 2public class RegistryFactory$Adpative implements com.alibaba.dubbo.registry.RegistryFactory { 3 public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) { 4 5 if (arg0 == null) throw new IllegalArgumentException("url == null"); 6 7 com.alibaba.dubbo.common.URL url = arg0; 8 String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); 9 10 if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])"); 11 12 com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName); 13 14 return extension.getRegistry(arg0); 15 } 16} 17

所以这里registryFactory.getRegistry(registryUrl)用的是ZookeeperRegistryFactory。

先看下getRegistry方法,会发现该方法会在AbstractRegistryFactory中实现:

1public Registry getRegistry(URL url) { 2 url = url.setPath(RegistryService.class.getName()) 3 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) 4 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); 5 //这里key为: 6 //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService 7 String key = url.toServiceString(); 8 // 锁定注册中心获取过程,保证注册中心单一实例 9 LOCK.lock(); 10 try { 11 //先从缓存中获取Registry实例 12 Registry registry = REGISTRIES.get(key); 13 if (registry != null) { 14 return registry; 15 } 16 //创建registry,会直接new一个ZookeeperRegistry返回 17 //具体创建实例是子类来实现的 18 registry = createRegistry(url); 19 if (registry == null) { 20 throw new IllegalStateException("Can not create registry " + url); 21 } 22 //放到缓存中 23 REGISTRIES.put(key, registry); 24 return registry; 25 } finally { 26 // 释放锁 27 LOCK.unlock(); 28 } 29} 30

createRegistry(url);是在子类中实现的,这里是ZookeeperRegistry,首先需要经过AbstractRegistry的构造:

1public AbstractRegistry(URL url) { 2 //url保存起来 3 setUrl(url); 4 // 启动文件保存定时器 5 // 6 syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); 7 //保存的文件为: 8 ///home/xxx/.dubbo/dubbo-registry-127.0.0.1.cache 9 String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache"); 10 File file = null; 11 if (ConfigUtils.isNotEmpty(filename)) { 12 file = new File(filename); 13 if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists()){ 14 if(! file.getParentFile().mkdirs()){ 15 throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); 16 } 17 } 18 } 19 this.file = file; 20 //加载文件中的属性 21 loadProperties(); 22 //通知订阅 23 notify(url.getBackupUrls()); 24} 25

notify()方法:

1protected void notify(List<URL> urls) { 2 if(urls == null || urls.isEmpty()) return; 3 //getSubscribed()方法获取订阅者列表 4 //订阅者Entry里每个URL都对应着n个NotifyListener 5 for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { 6 URL url = entry.getKey(); 7 8 if(! UrlUtils.isMatch(url, urls.get(0))) { 9 continue; 10 } 11 12 Set<NotifyListener> listeners = entry.getValue(); 13 if (listeners != null) { 14 for (NotifyListener listener : listeners) { 15 try { 16 //通知每个监听器 17 notify(url, listener, filterEmpty(url, urls)); 18 } catch (Throwable t) {} 19 } 20 } 21 } 22} 23

notify(url, listener, filterEmpty(url, urls));代码:

1protected void notify(URL url, NotifyListener listener, List<URL> urls) { 2 Map<String, List<URL>> result = new HashMap<String, List<URL>>(); 3 for (URL u : urls) { 4 if (UrlUtils.isMatch(url, u)) { 5 //分类 6 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); 7 List<URL> categoryList = result.get(category); 8 if (categoryList == null) { 9 categoryList = new ArrayList<URL>(); 10 result.put(category, categoryList); 11 } 12 categoryList.add(u); 13 } 14 } 15 if (result.size() == 0) { 16 return; 17 } 18 Map<String, List<URL>> categoryNotified = notified.get(url); 19 if (categoryNotified == null) { 20 notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); 21 categoryNotified = notified.get(url); 22 } 23 for (Map.Entry<String, List<URL>> entry : result.entrySet()) { 24 String category = entry.getKey(); 25 List<URL> categoryList = entry.getValue(); 26 categoryNotified.put(category, categoryList); 27 //保存到主目录下的.dubbo目录下 28 saveProperties(url); 29 //上面获取到的监听器进行通知 30 listener.notify(categoryList); 31 } 32} 33

AbstractRegistry构造器初始化完,接着调用FailbackRegistry构造器初始化:

1public FailbackRegistry(URL url) { 2 super(url); 3 //重试时间,默认5000ms 4 int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); 5 //启动失败重试定时器 6 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { 7 public void run() { 8 // 检测并连接注册中心 9 try { 10 //重试方法由每个具体子类实现 11 //获取到注册失败的,然后尝试注册 12 retry(); 13 } catch (Throwable t) { // 防御性容错} 14 } 15 }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); 16} 17

最后回到ZookeeperRegistry的构造初始化:

1public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { 2 super(url); 3 if (url.isAnyHost()) { 4 throw new IllegalStateException("registry address == null"); 5 } 6 //获得到注册中心中的分组,默认dubbo 7 String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); 8 if (! group.startsWith(Constants.PATH_SEPARATOR)) { 9 group = Constants.PATH_SEPARATOR + group; 10 } 11 //注册到注册中心的节点 12 this.root = group; 13 //使用zookeeperTansporter去连接 14 //ZookeeperTransport这里是生成的自适应实现,默认使用ZkClientZookeeperTransporter 15 //ZkClientZookeeperTransporter的connect去实例化一个ZkClient实例 16 //并且订阅状态变化的监听器subscribeStateChanges 17 //然后返回一个ZkClientZookeeperClient实例 18 zkClient = zookeeperTransporter.connect(url); 19 //ZkClientZookeeperClient添加状态改变监听器 20 zkClient.addStateListener(new StateListener() { 21 public void stateChanged(int state) { 22 if (state == RECONNECTED) { 23 try { 24 recover(); 25 } catch (Exception e) { 26 logger.error(e.getMessage(), e); 27 } 28 } 29 } 30 }); 31} 32

获取到了Registry,Registry实例中保存着连接到了zookeeper的zkClient实例之后,下一步获取要注册到注册中心的url(在RegistryProtocol中)。

1final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 2//得到的URL是: 3//dubbo://192.168.1.100:20880/dubbo.common.hello.service.HelloService? 4//anyhost=true&application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product& 5//interface=dubbo.common.hello.service.HelloService&methods=sayHello& 6//organization=china&owner=cheng.xi&pid=9457&side=provider&timestamp=1489807681627 7

然后调用registry.register(registedProviderUrl)注册到注册中心(在RegistryProtocol中)。register方法的实现在FailbackRegistry中:

1public void register(URL url) { 2 super.register(url); 3 failedRegistered.remove(url); 4 failedUnregistered.remove(url); 5 try { 6 // 向服务器端发送注册请求 7 //调用子类具体实现,发送注册请求 8 doRegister(url); 9 } catch (Exception e) { 10 Throwable t = e; 11 12 // 如果开启了启动时检测,则直接抛出异常 13 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) 14 && url.getParameter(Constants.CHECK_KEY, true) 15 && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); 16 boolean skipFailback = t instanceof SkipFailbackWrapperException; 17 if (check || skipFailback) { 18 if(skipFailback) { 19 t = t.getCause(); 20 } 21 throw 。。。 22 } else { } 23 24 // 将失败的注册请求记录到失败列表,定时重试 25 failedRegistered.add(url); 26 } 27} 28

doRegister(url);在这里是ZookeeperRegistry中具体实现的,这里将会注册到注册中心:

1protected void doRegister(URL url) { 2 try { 3 //这里zkClient就是我们上面调用构造的时候生成的 4 //ZkClientZookeeperClient 5 //保存着连接到Zookeeper的zkClient实例 6 //开始注册,也就是在Zookeeper中创建节点 7 //这里toUrlPath获取到的path为: 8 ///dubbo/dubbo.common.hello.service.HelloService/providers/dubbo%3A%2F%2F192.168.1.100%3A20880%2F 9 //dubbo.common.hello.service.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-provider%26 10 //application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26interface%3D 11 //dubbo.common.hello.service.HelloService%26methods%3DsayHello%26 12 //organization%3Dchina%26owner%3Dcheng.xi%26pid%3D8920%26side%3Dprovider%26timestamp%3D1489828029449 13 //默认创建的节点是临时节点 14 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 15 } catch (Throwable e) { } 16} 17

经过这一步之后,Zookeeper中就有节点存在了,具体节点为:

1/dubbo 2 dubbo.common.hello.service.HelloService 3 providers 4 /dubbo/dubbo.common.hello.service.HelloService/providers/ 5 dubbo%3A%2F%2F192.168.1.100%3A20880%2Fdubbo.common.hello.service.HelloService%3F 6 anyhost%3Dtrue%26application%3Ddubbo-provider%26 7 application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26 8 interface%3Ddubbo.common.hello.service.HelloService%26methods%3DsayHello%26 9 organization%3Dchina%26owner%3Dcheng.xi%26pid%3D13239%26side%3D 10 provider%26timestamp%3D1489829293525 11

订阅注册中心的服务

在注册到注册中心之后,registry会去订阅覆盖配置的服务,这一步之后就会在/dubbo/dubbo.common.hello.service/HelloService节点下多一个configurators节点。(具体过程暂先不解析)。

返回新Exporter实例

最后返回Exporter新实例,返回到ServiceConfig中。服务的发布就算完成了。

交给具体的协议进行服务暴露

这里也就是非Registry类型的Invoker的导出过程。主要的步骤是将本地ip和20880端口打开,进行监听。最后包装成exporter返回。

doLocalExport(invoker):

1private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){ 2 //原始的invoker中的url: 3 //registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? 4 //application=dubbo-provider&application.version=1.0&dubbo=2.5.3 5 //&environment=product&export=dubbo%3A%2F%2F10.42.0.1%3A20880%2F 6 //dubbo.common.hello.service.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-provider%26 7 //application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26 8 //interface%3Ddubbo.common.hello.service.HelloService%26methods%3DsayHello%26 9 //organization%3Dchina%26owner%3Dcheng.xi%26pid%3D7876%26side%3Dprovider%26timestamp%3D1489057305001& 10 //organization=china&owner=cheng.xi&pid=7876&registry=zookeeper&timestamp=1489057304900 11 12 //从原始的invoker中得到的key: 13 //dubbo://10.42.0.1:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider& 14 //application.version=1.0&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService& 15 //methods=sayHello&organization=china&owner=cheng.xi&pid=7876&side=provider&timestamp=1489057305001 16 String key = getCacheKey(originInvoker); 17 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); 18 if (exporter == null) { 19 synchronized (bounds) { 20 exporter = (ExporterChangeableWrapper<T>) bounds.get(key); 21 if (exporter == null) { 22 //得到一个Invoker代理,里面包含原来的Invoker 23 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); 24 //此处protocol还是最上面生成的代码,调用代码中的export方法,会根据协议名选择调用具体的实现类 25 //这里我们需要调用DubboProtocol的export方法 26 //这里的使用具体协议进行导出的invoker是个代理invoker 27 //导出完之后,返回一个新的ExporterChangeableWrapper实例 28 exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); 29 bounds.put(key, exporter); 30 } 31 } 32 } 33 return (ExporterChangeableWrapper<T>) exporter; 34} 35

这里protocol.export(invokerDelegete)就要去具体的DubboProtocol中执行了,DubboProtocol的外面包裹着ProtocolFilterWrapper,再外面还包裹着ProtocolListenerWrapper。会先经过ProtocolListenerWrapper:

1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 //Registry类型的Invoker 3 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 4 return protocol.export(invoker); 5 } 6 //其他具体协议类型的Invoker 7 //先进行导出protocol.export(invoker) 8 //然后获取自适应的监听器 9 //最后返回的是包装了监听器的Exporter 10 return new ListenerExporterWrapper<T>(protocol.export(invoker), 11 Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) 12 .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); 13} 14

再经过ProtocolFilterWrapper:

1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 //Registry类型的Invoker 3 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 4 return protocol.export(invoker); 5 } 6 //其他具体协议类型的Invoker 7 //先构建Filter链,然后再导出 8 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); 9} 10

查看下构建Invoker链的方法:

1private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { 2 //我们要处理的那个Invoker作为处理链的最后一个 3 Invoker<T> last = invoker; 4 //根据key和group获取自动激活的Filter 5 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); 6 if (filters.size() > 0) { 7 //把所有的过滤器都挨个连接起来,最后一个是我们真正的Invoker 8 for (int i = filters.size() - 1; i >= 0; i --) { 9 final Filter filter = filters.get(i); 10 final Invoker<T> next = last; 11 last = new Invoker<T>() { 12 13 public Class<T> getInterface() { 14 return invoker.getInterface(); 15 } 16 17 public URL getUrl() { 18 return invoker.getUrl(); 19 } 20 21 public boolean isAvailable() { 22 return invoker.isAvailable(); 23 } 24 25 public Result invoke(Invocation invocation) throws RpcException { 26 return filter.invoke(next, invocation); 27 } 28 29 public void destroy() { 30 invoker.destroy(); 31 } 32 33 @Override 34 public String toString() { 35 return invoker.toString(); 36 } 37 }; 38 } 39 } 40 return last; 41} 42

接着就到了DubboProtocol的export方法,这里进行暴露服务:

1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 //dubbo://10.42.0.1:20880/dubbo.common.hello.service.HelloService? 3 //anyhost=true&application=dubbo-provider& 4 //application.version=1.0&dubbo=2.5.3&environment=product& 5 //interface=dubbo.common.hello.service.HelloService& 6 //methods=sayHello&organization=china&owner=cheng.xi& 7 //pid=7876&side=provider&timestamp=1489057305001 8 URL url = invoker.getUrl(); 9 10 // export service. 11 //key由serviceName,port,version,group组成 12 //当nio客户端发起远程调用时,nio服务端通过此key来决定调用哪个Exporter,也就是执行的Invoker。 13 //dubbo.common.hello.service.HelloService:20880 14 String key = serviceKey(url); 15 //将Invoker转换成Exporter 16 //直接new一个新实例 17 //没做啥处理,就是做一些赋值操作 18 //这里的exporter就包含了invoker 19 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); 20 //缓存要暴露的服务,key是上面生成的 21 exporterMap.put(key, exporter); 22 23 //export an stub service for dispaching event 24 //是否支持本地存根 25 //远程服务后,客户端通常只剩下接口,而实现全在服务器端, 26 //但提供方有些时候想在客户端也执行部分逻辑,比如:做ThreadLocal缓存, 27 //提前验证参数,调用失败后伪造容错数据等等,此时就需要在API中带上Stub, 28 //客户端生成Proxy实,会把Proxy通过构造函数传给Stub, 29 //然后把Stub暴露组给用户,Stub可以决定要不要去调Proxy。 30 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); 31 Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); 32 if (isStubSupportEvent && !isCallbackservice){ 33 String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); 34 if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){ 35 } else { 36 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); 37 } 38 } 39 //根据URL绑定IP与端口,建立NIO框架的Server 40 openServer(url); 41 42 return exporter; 43} 44

上面得到的Exporter会被放到缓存中去,key就是上面生成的,客户端就可以发请求根据key找到Exporter,然后找到invoker进行调用了。接下来是创建服务器并监听端口。

接着调用openServer方法创建NIO Server进行监听:

1private void openServer(URL url) { 2 // find server. 3 //key是IP:PORT 4 //192.168.110.197:20880 5 String key = url.getAddress(); 6 //client 也可以暴露一个只有server可以调用的服务。 7 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); 8 if (isServer) { 9 10 ExchangeServer server = serverMap.get(key); 11 //同一JVM中,同协议的服务,共享同一个Server, 12 //第一个暴露服务的时候创建server, 13 //以后相同协议的服务都使用同一个server 14 if (server == null) { 15 serverMap.put(key, createServer(url)); 16 } else { 17 //同协议的服务后来暴露服务的则使用第一次创建的同一Server 18 //server支持reset,配合override功能使用 19 //accept、idleTimeout、threads、heartbeat参数的变化会引起Server的属性发生变化 20 //这时需要重新设置Server 21 server.reset(url); 22 } 23 } 24} 25

继续看createServer方法:

1//url为: 2//dubbo://192.168.110.197:20880/dubbo.common.hello.service.HelloService? 3//anyhost=true&application=dubbo-provider& 4//application.version=1.0&dubbo=2.5.3&environment=product& 5//interface=dubbo.common.hello.service.HelloService& 6//methods=sayHello&organization=china&owner=cheng.xi& 7//pid=720&side=provider&timestamp=1489716708276 8private ExchangeServer createServer(URL url) { 9 //默认开启server关闭时发送readonly事件 10 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); 11 //默认开启heartbeat 12 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 13 //默认使用netty 14 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); 15 16 if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) 17 throw new RpcException("Unsupported server type: " + str + ", url: " + url); 18 19 url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); 20 ExchangeServer server; 21 try { 22 //Exchangers是门面类,里面封装的是Exchanger的逻辑。 23 //Exchanger默认只有一个实现HeaderExchanger. 24 //Exchanger负责数据交换和网络通信。 25 //从Protocol进入Exchanger,标志着程序进入了remote层。 26 //这里requestHandler是ExchangeHandlerAdapter 27 server = Exchangers.bind(url, requestHandler); 28 } catch (RemotingException e) { } 29 str = url.getParameter(Constants.CLIENT_KEY); 30 if (str != null && str.length() > 0) { 31 Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); 32 if (!supportedTypes.contains(str)) { 33 throw new RpcException("Unsupported client type: " + str); 34 } 35 } 36 return server; 37} 38

Exchangers.bind方法:

1public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 2 url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); 3 //getExchanger方法根据url获取到一个默认的实现HeaderExchanger 4 //调用HeaderExchanger的bind方法 5 return getExchanger(url).bind(url, handler); 6} 7

HeaderExchanger的bind方法:

1public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 2 //直接返回一个HeaderExchangeServer 3 //先创建一个HeaderExchangeHandler 4 //再创建一个DecodeHandler 5 //最后调用Transporters.bind 6 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); 7} 8

这里会先创建一个HeaderExchangerHandler,包含着ExchangeHandlerAdapter,接着创建一个DecodeHandler,会包含前面的handler,接下来调用Transporters的bind方法,返回一个Server,接着用HeaderExchangeServer包装一下,就返回给Protocol层了。

在HeaderExchangerServer包装的时候会启动心跳定时器startHeatbeatTimer();,暂不解析。

Transports的bind方法:

1public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { 2 ChannelHandler handler; 3 if (handlers.length == 1) { 4 handler = handlers[0]; 5 } else { 6 //如果有多个handler的话,需要使用分发器包装下 7 handler = new ChannelHandlerDispatcher(handlers); 8 } 9 //getTransporter()获取一个Adaptive的Transporter 10 //然后调用bind方法(默认是NettyTransporter的bind方法) 11 return getTransporter().bind(url, handler); 12} 13

getTransporter()生成的Transporter的代码如下:

1import com.alibaba.dubbo.common.extension.ExtensionLoader; 2public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter { 3 public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { 4 if (arg0 == null) throw new IllegalArgumentException("url == null"); 5 com.alibaba.dubbo.common.URL url = arg0; 6 //Server默认使用netty 7 String extName = url.getParameter("server", url.getParameter("transporter", "netty")); 8 if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])"); 9 //获取到一个NettyTransporter 10 com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); 11 //调用NettyTransporter的bind方法 12 return extension.bind(arg0, arg1); 13 } 14 15public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { 16 if (arg0 == null) throw new IllegalArgumentException("url == null"); 17 com.alibaba.dubbo.common.URL url = arg0; 18 19 String extName = url.getParameter("client", url.getParameter("transporter", "netty")); 20 21 if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])"); 22 23 com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); 24 25 return extension.connect(arg0, arg1); 26} 27} 28

NettyTransporter的bind方法:

1 public Server bind(URL url, ChannelHandler listener) throws RemotingException { 2 //创建一个Server 3 return new NettyServer(url, listener); 4} 5
1public NettyServer(URL url, ChannelHandler handler) throws RemotingException{ 2 //handler先经过ChannelHandlers的包装方法 3 //然后再初始化 4 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); 5} 6

ChannelHandlers.wrap方法中会根据SPI扩展机制动态生成Dispatcher的自适应类,生成的代码不在列出,默认使用AllDispatcher处理,会返回一个AllChannelHandler,会把线程池和DataStore都初始化了。然后经过HeartbeatHandler封装,再经过MultiMessageHandler封装后返回。

NettyServer构造,会依次经过AbstractPeer,AbstractEndpoint,AbstractServer,NettyServer的初始化。重点看下AbstractServer的构造方法:

1public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { 2 super(url, handler); 3 localAddress = getUrl().toInetSocketAddress(); 4 String host = url.getParameter(Constants.ANYHOST_KEY, false) 5 || NetUtils.isInvalidLocalHost(getUrl().getHost()) 6 ? NetUtils.ANYHOST : getUrl().getHost(); 7 bindAddress = new InetSocketAddress(host, getUrl().getPort()); 8 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); 9 this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); 10 try { 11 //初始化的时候会打开Server 12 //具体实现这里是NettyServer中 13 doOpen(); 14 } catch (Throwable t) { } 15 if (handler instanceof WrappedChannelHandler ){ 16 executor = ((WrappedChannelHandler)handler).getExecutor(); 17 } 18} 19

然后调用doOpen方法:

1protected void doOpen() throws Throwable { 2 NettyHelper.setNettyLoggerFactory(); 3 //boss线程池 4 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); 5 //worker线程池 6 ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); 7 //ChannelFactory,没有指定工作者线程数量,就使用cpu+1 8 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); 9 bootstrap = new ServerBootstrap(channelFactory); 10 11 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); 12 channels = nettyHandler.getChannels(); 13 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 14 public ChannelPipeline getPipeline() { 15 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); 16 ChannelPipeline pipeline = Channels.pipeline(); 17 pipeline.addLast("decoder", adapter.getDecoder()); 18 pipeline.addLast("encoder", adapter.getEncoder()); 19 pipeline.addLast("handler", nettyHandler); 20 return pipeline; 21 } 22 }); 23 // bind之后返回一个Channel 24 channel = bootstrap.bind(getBindAddress()); 25} 26

doOpen方法创建Netty的Server端并打开,具体的事情就交给Netty去处理了,Netty的过程,原理,代码有时间再另行研究。

  • NIO框架接受到消息后,先由NettyCodecAdapter解码,再由NettyHandler处理具体的业务逻辑,再由NettyCodecAdapter编码后发送。
  • NettyServer既是Server又是Handler。
  • HeaderExchangerServer只是Server。
  • MultiMessageHandler是多消息处理Handler。
  • HeartbeatHandler是处理心跳事件的Handler。
  • AllChannelHandler是消息派发器,负责将请求放入线程池,并执行请求。
  • DecodeHandler是编解码Handler。
  • HeaderExchangerHandler是信息交换Handler,将请求转化成请求响应模式与同步转异步模式。
  • RequestHandler是最后执行的Handler,会在协议层选择Exporter后选择Invoker,进而执行Filter与Invoker,最终执行请求服务实现类方法。
  • Channel直接触发事件并执行Handler,Channel在有客户端连接Server的时候触发创建并封装成NettyChannel,再由HeaderExchangerHandler创建HeaderExchangerChannel,负责请求响应模式的处理。
  • NettyChannel其实是个Handler,HeaderExchangerChannel是个Channel,
  • 消息的序列化与反序列化工作在NettyCodecAdapter中发起完成。

当有客户端连接Server时的连接过程:

  • NettyHandler.connected()
  • NettyServer.connected()
  • MultiMessageHandler.connected()
  • HeartbeatHandler.connected()
  • AllChannelHandler.connected()
  • DecodeHandler.connected()
  • HeaderExchangerHandler.connected()
  • requestHandler.connected()
  • 执行服务的onconnect事件的监听方法

名词解释

Invoker

可执行的对象,执行具体的远程调用,能够根据方法名称,参数得到相应的执行结果。

Invocation,包含了需要执行的方法,参数等信息。目前实现类只有RpcInvocation。

有三种类型的Invoker:

  • 本地执行类的Invoker。
  • 远程通信执行类的Invoker。
  • 多个远程通信执行类的Invoker聚合成集群版的Invoker。

以HelloService为例:

  • 本地执行类的Invoker:在Server端有HelloServiceImpl实现,要执行该接口,只需要通过反射执行对应的实现类即可。
  • 远程通信执行类的Invoker:在Client端要想执行该接口的实现方法,需要先进行远程通信,发送要执行的参数信息给Server端,Server端利用本地执行Invoker的方式执行,最后将结果发送给Client。
  • 集群版的Invoker:Client端使用的时候,通过集群版的Invoker操作,Invoker会挑选一个远程通信类型的Invoker来执行。

提供者端的Invoker封装了服务实现类,URL,Type,状态都是只读并且线程安全。通过发起invoke来具体调用服务类。

ProxyFactory

在服务提供者端,ProxyFactory主要服务的实现统一包装成一个Invoker,Invoker通过反射来执行具体的Service实现对象的方法。默认的实现是JavassistProxyFactory,代码如下:

1public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { 2 // TODO Wrapper类不能正确处理带$的类名 3 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); 4 return new AbstractProxyInvoker<T>(proxy, type, url) { 5 @Override 6 protected Object doInvoke(T proxy, String methodName, 7 Class<?>[] parameterTypes, 8 Object[] arguments) throws Throwable { 9 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); 10 } 11 }; 12} 13

Protocol

服务地址的发布和订阅。

Protocol是dubbo中的服务域,只在服务启用时加载,无状态,线程安全,是实体域Invoker暴露和引用的主功能入口,负责Invoker的生命周期管理,是Dubbo中远程服务调用层。

Protocol根据指定协议对外公布服务,当客户端根据协议调用这个服务时,Protocol会将客户端传递过来的Invocation参数交给Invoker去执行。

Protocol加入了远程通信协议,会根据客户端的请求来获取参数Invocation。

1@Extension("dubbo") 2public interface Protocol { 3 4 int getDefaultPort(); 5 6 //对于服务提供端,将本地执行类的Invoker通过协议暴漏给外部 7 //外部可以通过协议发送执行参数Invocation,然后交给本地Invoker来执行 8 @Adaptive 9 <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; 10 11 //这个是针对服务消费端的,服务消费者从注册中心获取服务提供者发布的服务信息 12 //通过服务信息得知服务提供者使用的协议,然后服务消费者仍然使用该协议构造一个Invoker。这个Invoker是远程通信类的Invoker。 13 //执行时,需要将执行信息通过指定协议发送给服务提供者,服务提供者接收到参数Invocation,然后交给服务提供者的本地Invoker来执行 14 @Adaptive 15 <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; 16 17 void destroy(); 18 19} 20

关于RegistryProtocol和DubboProtocol的疑惑

以下是官方文档说明:

暴露服务:

(1) 只暴露服务端口:

在没有注册中心,直接暴露提供者的情况下,即:
<dubbo:service regisrty="N/A" /> or <dubbo:registry address="N/A" />

ServiceConfig解析出的URL的格式为:
dubbo://service-host/com.foo.FooService?version=1.0.0

基于扩展点的Adaptive机制,通过URL的”dubbo://”协议头识别,直接调用DubboProtocol的export()方法,打开服务端口。

(2) 向注册中心暴露服务:

在有注册中心,需要注册提供者地址的情况下,即:
<dubbo:registry address="zookeeper://10.20.153.10:2181" />

ServiceConfig解析出的URL的格式为:
registry://registry-host/com.alibaba.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/com.foo.FooService?version=1.0.0")

基于扩展点的Adaptive机制,通过URL的”registry://”协议头识别,就会调用RegistryProtocol的export()方法,将export参数中的提供者URL,先注册到注册中心,再重新传给Protocol扩展点进行暴露:
dubbo://service-host/com.foo.FooService?version=1.0.0

基于扩展点的Adaptive机制,通过提供者URL的”dubbo://”协议头识别,就会调用DubboProtocol的export()方法,打开服务端口。

RegistryProtocol,注册中心协议集成,装饰真正暴露引用服务的协议,增强注册发布功能。

ServiceConfig中的protocol是被多层装饰的Protocol,是DubboProtocol+RegistryProtocol+ProtocolListenerWrapper+ProtocolFilterWrapper。

  • ProtocolFilterWrapper负责初始化invoker所有的Filter。
  • ProtocolListenerWrapper负责初始化暴露或引用服务的监听器。
  • RegistryProtocol负责注册服务到注册中心和向注册中心订阅服务。
  • DubboProtocol负责服务的具体暴露与引用,也负责网络传输层,信息交换层的初始化,以及底层NIO框架的初始化。

Exporter

负责invoker的生命周期,包含一个Invoker对象,可以撤销服务。

Exchanger

负责数据交换和网络通信的组件。每个Invoker都维护了一个ExchangeClient的 引用,并通过它和远端server进行通信。

代码交流 2021