dubbo服务暴露

ServiceBean#onApplicationEvent(ContextRefreshedEvent)

ServiceConfig#export()

ServiceConfig#doExport()
首先校验该service的配置是否为空,则加载dubbo:provider、dubbo:module、dubbo:application缺省配置,若还为空则加载dubbo.properties的配置。
配置覆盖策略

ServiceConfig#doExportUrls()

1private void doExportUrls() { 2 //加载注册中心url 3 List<URL> registryURLs = loadRegistries(true); 4 for (ProtocolConfig protocolConfig : protocols) { 5 doExportUrlsFor1Protocol(protocolConfig, registryURLs); 6 } 7 } 8 9

首先看AbstractInterfaceConfig#loadRegistries(boolean)

1protected List<URL> loadRegistries(boolean provider) { 2 //检验registry是否为空 3 checkRegistry(); 4 //遍历所有registry 5 List<URL> registryList = new ArrayList<URL>(); 6 if (registries != null && !registries.isEmpty()) { 7 for (RegistryConfig config : registries) { 8 /*省略代码,参数校验*/ 9 if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { 10 //构造kv属性 11 Map<String, String> map = new HashMap<String, String>(); 12 appendParameters(map, application); 13 appendParameters(map, config); 14 map.put("path", RegistryService.class.getName()); 15 map.put("dubbo", Version.getProtocolVersion()); 16 map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); 17 if (ConfigUtils.getPid() > 0) { 18 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); 19 } 20 if (!map.containsKey("protocol")) { 21 if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) { 22 map.put("protocol", "remote"); 23 } else { 24 map.put("protocol", "dubbo"); 25 } 26 } 27 //生成url对象 28 List<URL> urls = UrlUtils.parseURLs(address, map); 29 for (URL url : urls) { 30 url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); 31 url = url.setProtocol(Constants.REGISTRY_PROTOCOL); 32 if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) 33 || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { 34 registryList.add(url); 35 } 36 } 37 } 38 } 39 } 40 return registryList; 41 } 42 43

生成的registry url如下图
在这里插入图片描述
接着回去看doExportUrls,循环调用ServiceConfig#doExportUrlsFor1Protocol(ProtocolConfig , List)

1private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { 2 String name = protocolConfig.getName(); 3 if (name == null || name.length() == 0) { 4 name = "dubbo"; 5 } 6 //把application、provider、protocol等配置读取到map中 7 Map<String, String> map = new HashMap<String, String>(); 8 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); 9 map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); 10 map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); 11 if (ConfigUtils.getPid() > 0) { 12 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); 13 } 14 appendParameters(map, application); 15 appendParameters(map, module); 16 appendParameters(map, provider, Constants.DEFAULT_KEY); 17 appendParameters(map, protocolConfig); 18 appendParameters(map, this); 19 if (methods != null && !methods.isEmpty()) { 20 for (MethodConfig method : methods) { 21 /*省略代码,循环方法级别配置,一般不会配置方法的配置*/ 22 } // end of methods for 23 } 24 //是否泛化实现http://dubbo.apache.org/zh-cn/docs/user/demos/generic-service.html 25 //普通service走else 26 if (ProtocolUtils.isGeneric(generic)) { 27 map.put(Constants.GENERIC_KEY, generic); 28 map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); 29 } else { 30 String revision = Version.getVersion(interfaceClass, version); 31 if (revision != null && revision.length() > 0) { 32 map.put("revision", revision); 33 } 34 //getWrapper通过javassist生成Wrapper类,保存到一个WRAPPER_MAP中 35 //最后拿到所有方法的名称,拼接成method1,method2...,放到最初的那个map中 36 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); 37 if (methods.length == 0) { 38 logger.warn("NO method found in service interface " + interfaceClass.getName()); 39 map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); 40 } else { 41 map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); 42 } 43 } 44 //令牌验证,http://dubbo.apache.org/zh-cn/docs/user/demos/token-authorization.html 45 if (!ConfigUtils.isEmpty(token)) { 46 if (ConfigUtils.isDefault(token)) { 47 map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); 48 } else { 49 map.put(Constants.TOKEN_KEY, token); 50 } 51 } 52 //是否本地方法,http://dubbo.apache.org/zh-cn/docs/user/demos/local-call.html 53 if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { 54 protocolConfig.setRegister(false); 55 map.put("notify", "false"); 56 } 57 //开始了,服务暴露 58 String contextPath = protocolConfig.getContextpath(); 59 //http协议的配置?一般为空串 60 if ((contextPath == null || contextPath.length() == 0) && provider != null) { 61 contextPath = provider.getContextpath(); 62 } 63 64 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); 65 Integer port = this.findConfigedPorts(protocolConfig, name, map);66 //构造URL 67 URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); 68 69 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) 70 .hasExtension(url.getProtocol())) { 71 url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) 72 .getExtension(url.getProtocol()).getConfigurator(url).configure(url); 73 } 74 75 String scope = url.getParameter(Constants.SCOPE_KEY); 76 // scope在dubbo.apache.org dubbo:service没有了?以前老的网站好像写了 77 // scope配置成none不暴露 78 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { 79 80 // 不是配置remote就本地暴露(配置remote,表示只远程暴露) 81 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { 82 exportLocal(url); 83 } 84 // 不是配置local就远程暴露(配置local,表示只本地暴露) 85 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { 86 /*省略代码,logger*/ 87 if (registryURLs != null && !registryURLs.isEmpty()) { 88 //遍历所有注册中心 89 for (URL registryURL : registryURLs) { 90 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); 91 //监控中心,http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-monitor.html 92 URL monitorUrl = loadMonitor(registryURL); 93 if (monitorUrl != null) { 94 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); 95 } 96 /*省略代码,logger*/ 97 // For providers, this is used to enable custom proxy to generate invoker 98 String proxy = url.getParameter(Constants.PROXY_KEY); 99 if (StringUtils.isNotEmpty(proxy)) { 100 registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); 101 } 102 //通过代理工厂获得invoker 103 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 104 //生成invoker的委托 105 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 106 //远程暴露 107 Exporter<?> exporter = protocol.export(wrapperInvoker); 108 exporters.add(exporter); 109 } 110 } else { 111 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); 112 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 113 114 Exporter<?> exporter = protocol.export(wrapperInvoker); 115 exporters.add(exporter); 116 } 117 } 118 } 119 this.urls.add(url); 120 } 121 122

小结

  1. 读取各种配置,生成注册中心URL;
  2. 读取各种配置,生成服务的URL;
  3. 通过代理工厂将服务ref对象转化成invoker对象,并且生成委托;
  4. 远程暴露,将多个提供者保存到exporters。

接下来,分别讲一下本地暴露和远程暴露的细节。
这边涉及到dubbo的Adaptive,其实就是通过传入的参数获得一个具体实现。

本地暴露

ServiceConfig#exportLocal(URL)

1private void exportLocal(URL url) { 2 if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { 3 //转换成local url 4 URL local = URL.valueOf(url.toFullString()) 5 .setProtocol(Constants.LOCAL_PROTOCOL) 6 .setHost(LOCALHOST) 7 .setPort(0); 8 //将class放到ThreadLocal 9 ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); 10 //获得ProxyFactory生成invoker,先往下看 11 //继续看protocol.export 12 Exporter<?> exporter = protocol.export( 13 proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); 14 //最终会将生成的exporter加入到ServiceConfig的实例对象exporters中 15 exporters.add(exporter); 16 logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); 17 } 18 } 19 20

ProxyFactory$Adaptive#getInvoker(java.lang.Object , java.lang.Class , com.alibaba.dubbo.common.URL )

1public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException { 2 if (arg2 == null) throw new IllegalArgumentException("url == null"); 3 com.alibaba.dubbo.common.URL url = arg2; 4 //获得url中的协议 5 String extName = url.getParameter("proxy", "javassist"); 6 if (extName == null) 7 /*省略代码,throw异常*/ 8 //根据协议名称获得具体的ProxyFactory实现类,类似反射,ExtensionLoader为dubbo的扩展机制,这边不分析 9 com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); 10 //获得invoker 11 return extension.getInvoker(arg0, arg1, arg2); 12 } 13 14

包装类,StubProxyFactoryWrapper#getInvoker(T proxy, Class type, URL url)

1public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException { 2 return proxyFactory.getInvoker(proxy, type, url); 3 } 4 5

实际调用类,JavassistProxyFactory#getInvoker(T proxy, Class type, URL url),默认协议Javassist

1public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { 2 // TODO Wrapper类不能正确处理带$的类名 3 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); 4 return new AbstractProxyInvoker<T>(proxy, type, url) { 5 @Override 6 protected Object doInvoke(T proxy, String methodName, 7 Class<?>[] parameterTypes, 8 Object[] arguments) throws Throwable { 9 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); 10 } 11 }; 12 } 13 14

生成的Wrapper中的invokeMethod方法如下所示,可以看一下AbstractProxyInvoker的调用过程就明白了:

1public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException 2{ 3 org.apache.dubbo.demo.provider.DemoServiceImpl w; 4 try{ 5 w = ((org.apache.dubbo.demo.provider.DemoServiceImpl)$1); 6 }catch(Throwable e){ 7 throw new IllegalArgumentException(e); 8 } 9 try{ 10 if( "sayHello".equals( $2 ) && $3.length == 1 ) { 11 return ($w)w.sayHello((java.lang.String)$4[0]); 12 } 13 } catch(Throwable e) { 14 throw new java.lang.reflect.InvocationTargetException(e); 15 } throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class org.apache.dubbo.demo.provider.DemoServiceImpl."); 16} 17 18

回来看protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local)),和ProxyFactory一样动态生成了Protocol$Adaptive,Protocol$Adaptive#export(com.alibaba.dubbo.rpc.Invoker arg0)方法代码如下:

1public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { 2 /*省略代码,参数null判断,throw异常*/ 3 com.alibaba.dubbo.common.URL url = arg0.getUrl(); 4 String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); 5 if (extName == null) 6 /*省略代码,throw异常*/ 7 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); 8 return extension.export(arg0); 9 } 10 11

上述代码中,Protocol为包装类,如下图所示:
在这里插入图片描述

调用了ProtocolFilterWrapper#export(Invoker invoker)

1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 //判断是不是registry协议 3 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 4 return protocol.export(invoker); 5 } 6 //本地暴露走这 7 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); 8 } 9 10

ProtocolFilterWrapper#buildInvokerChain(final Invoker invoker, String key, String group),生成责任链

1private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { 2 Invoker<T> last = invoker; 3 //根据URL的属性获得相应Activate的过滤器 4 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); 5 //生成调用的责任链 6 if (!filters.isEmpty()) { 7 for (int i = filters.size() - 1; i >= 0; i--) { 8 final Filter filter = filters.get(i); 9 final Invoker<T> next = last; 10 last = new Invoker<T>() { 11 @Override 12 public Class<T> getInterface() { 13 return invoker.getInterface(); 14 } 15 @Override 16 public URL getUrl() { 17 return invoker.getUrl(); 18 } 19 @Override 20 public boolean isAvailable() { 21 return invoker.isAvailable(); 22 } 23 @Override 24 public Result invoke(Invocation invocation) throws RpcException { 25 return filter.invoke(next, invocation); 26 } 27 @Override 28 public void destroy() { 29 invoker.destroy(); 30 } 31 @Override 32 public String toString() { 33 return invoker.toString(); 34 } 35 }; 36 } 37 } 38 return last; 39 } 40 41

回到ProtocolFilterWrapper#export(Invoker invoker),protocol.export调用了QosProtocolWrapper#export(Invoker),2.5.8 新版本增加了 QOS 模块,本地服务不涉及。

1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 //判断是不是registry协议 3 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 4 startQosServer(invoker.getUrl()); 5 return protocol.export(invoker); 6 } 7 //本地暴露走这 8 return protocol.export(invoker); 9 } 10 11

ProtocolListenerWrapper#export(Invoker),由于本地暴露protocol为InjvmProtocol。

1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 //判断是不是registry协议 3 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 4 return protocol.export(invoker); 5 } 6 //本地暴露走这,先往下看 7 //然后构造ListenerExporterWrapper,也是把参数赋值 8 return new ListenerExporterWrapper<T>(protocol.export(invoker), 9 Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) 10 .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); 11 } 12 13

InjvmProtocol#export(Invoker),终于到最后了!!!

1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 //这边的ServiceKey为“接口名称:版本号”,如果没有版本号就为“接口名称” 3 //exporterMap是InjvmProtocol抽象父类的对象变量,protected final Map<String, Exporter<?>> exporterMap 4 //构造方法,把参数赋值到对象变量,并且exporterMap.put(key, this),this为InjvmExporter 5 //所以所有本地暴露服务都放在exporterMap中 6 return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); 7 } 8 9

回到ProtocolListenerWrapper#export(Invoker),构造ListenerExporterWrapper,把参数赋值。默认ExporterListener是空的。

最终回到本地暴露小节的开始,生成的Exporter如下图:
在这里插入图片描述

远程暴露

远程暴露是服务暴露的重点,涉及的内容比较多。
下面这段代码在上面提到过,远程暴露生成Invoker和本地暴露是类似的,只是URL不同。

1private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { 2 /*省略无关代码,具体代码在上面提到过*/ 3 4 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { 5 6 // 不是配置remote就本地暴露(配置remote,表示只远程暴露) 7 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { 8 exportLocal(url); 9 } 10 // 不是配置local就远程暴露(配置local,表示只本地暴露) 11 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { 12 /*省略代码,logger*/ 13 if (registryURLs != null && !registryURLs.isEmpty()) { 14 //遍历所有注册中心 15 for (URL registryURL : registryURLs) { 16 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); 17 //监控中心,http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-monitor.html 18 URL monitorUrl = loadMonitor(registryURL); 19 if (monitorUrl != null) { 20 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); 21 } 22 /*省略代码,logger*/ 23 // For providers, this is used to enable custom proxy to generate invoker 24 String proxy = url.getParameter(Constants.PROXY_KEY); 25 if (StringUtils.isNotEmpty(proxy)) { 26 registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); 27 } 28 //通过代理工厂获得invoker 29 //registryURL.addParameterAndEncoded会将dubboURL以export为key加入到registryURL 30 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 31 //生成invoker的委托,将invoker和ServiceConfig绑定 32 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 33 //远程暴露 34 Exporter<?> exporter = protocol.export(wrapperInvoker); 35 exporters.add(exporter); 36 } 37 } else { 38 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); 39 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 40 41 Exporter<?> exporter = protocol.export(wrapperInvoker); 42 exporters.add(exporter); 43 } 44 } 45 } 46 this.urls.add(url); 47 } 48 49

远程暴露的核心类(这个流程会反复出现)为RegisterProtocol所以Protocol$Adaptive动态获得了RegisterProtocol,ProtocolFilterWrapper和本地暴露一致,只是协议不同,本地暴露为InjvmProtocol,所以直接看到RegistryProtocol#export(final Invoker)方法。

1public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 2 //导出服务部分,先往下看 3 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 4 5 /*省略代码,下面会有写*/ 6 7 //保证每次export都返回一个新的exporter实例 8 return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); 9 } 10 11

RegistryProtocol#doLocalExport(final Invoker originInvoker)

1private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { 2 //从invoker中获得服务的URL,默认是dubbo://开头的URL 3 String key = getCacheKey(originInvoker); 4 //从缓存中获得exporter,如果已经暴露就不再暴露 5 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); 6 //双重检查锁 7 if (exporter == null) { 8 synchronized (bounds) { 9 exporter = (ExporterChangeableWrapper<T>) bounds.get(key); 10 if (exporter == null) { 11 //将originInvoker和服务的URL(默认dubbo://开头的URL)封装到一个委托中 12 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); 13 //生成Exporter流程与本地暴露相同 14 //由于默认dubbo协议,所以protocol包装类最终的protocol为DubboProtocol,只是最后会调用DubboProtocol#export(Invoker<T> invoker) 15 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); 16 //写缓存 17 bounds.put(key, exporter); 18 } 19 } 20 } 21 return exporter; 22 } 23 24

DubboProtocol#export(Invoker invoker)

1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 URL url = invoker.getUrl(); 3 // 和本地暴露一样,远程暴露DubboProtocol中也有一个exporterMap,记录了暴露的服务 4 // key由服务组名,服务名,服务版本号以及端口组成 5 //没有设置group和版本号,key为pers.congzhou.service.DemoService:20880 6 //如全设置为demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880 7 String key = serviceKey(url); 8 //构造DubboExporter 9 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); 10 //加入缓存 11 exporterMap.put(key, exporter); 12 //本地存根,http://dubbo.apache.org/zh-cn/docs/user/demos/local-stub.html 13 //跳过,消费者端为调度事件导出存根服务 14 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); 15 Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); 16 if (isStubSupportEvent && !isCallbackservice) { 17 String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); 18 if (stubServiceMethods == null || stubServiceMethods.length() == 0) { 19 if (logger.isWarnEnabled()) { 20 /*省略代码,logger*/ 21 } 22 } else { 23 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); 24 } 25 } 26 //启动服务器 27 openServer(url); 28 //优化序列化 29 optimizeSerialization(url); 30 return exporter; 31 } 32 33

启动服务器实例

DubboProtocol#openServer(URL url)

1private void openServer(URL url) { 2 // 获取地址(host:port),并将其作为服务器实例的 key,用于标识当前的服务器实例 3 String key = url.getAddress(); 4 // 官方注释:client 也可以暴露一个只有server可以调用的服务。 5 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); 6 if (isServer) { 7 //缓存,不存在就创建一个,默认只启动一个 8 ExchangeServer server = serverMap.get(key); 9 if (server == null) { 10 serverMap.put(key, createServer(url)); 11 } else { 12 // 服务器已存在,则根据URL中的配置重置服务器 13 server.reset(url); 14 } 15 } 16 } 17 18

DubboProtocol#createServer(URL url)

1private ExchangeServer createServer(URL url) { 2 //往URL加配置 3 // 默认开启server关闭时发送readonly事件 4 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); 5 // 默认开启heartbeat 6 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 7 //获得server参数,http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-protocol.html 8 //dubbo协议缺省为netty,http协议缺省为servlet 9 //这边我配置了netty4 10 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); 11 12 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) 13 throw new RpcException("Unsupported server type: " + str + ", url: " + url); 14 //配置协议编码方式 15 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); 16 ExchangeServer server; 17 try { 18 //启动服务器 19 //Exchangers通过Adaptive机制获得Exchanger,默认为HeaderExchanger 20 server = Exchangers.bind(url, requestHandler); 21 } catch (RemotingException e) { 22 throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); 23 } 24 str = url.getParameter(Constants.CLIENT_KEY); 25 if (str != null && str.length() > 0) { 26 Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); 27 if (!supportedTypes.contains(str)) { 28 throw new RpcException("Unsupported client type: " + str); 29 } 30 } 31 return server; 32 } 33 34

HeaderExchanger#bind(URL url, ExchangeHandler handler)方法的三个逻辑:

  1. HeaderExchangeHandler代理了DubboProtocol#requestHandler

  2. DecodeHandler代理了HeaderExchangeHandler

  3. bind,创建 NettyServer

1public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 2 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); 3 } 4 5

Transporters#bind(URL url, ChannelHandler… handlers)

1public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { 2 /*省略代码,参数校验*/ 3 ChannelHandler handler; 4 if (handlers.length == 1) { 5 handler = handlers[0]; 6 } else { 7 handler = new ChannelHandlerDispatcher(handlers); 8 } 9 //getTransporter()获得Adaptive 10 return getTransporter().bind(url, handler); 11 } 12 13

Transporter$Adaptive#bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1)

1public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException { 2 if (arg0 == null) throw new IllegalArgumentException("url == null"); 3 com.alibaba.dubbo.common.URL url = arg0; 4 String extName = url.getParameter("server", url.getParameter("transporter", "netty")); 5 if (extName == null) 6 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])"); 7 //根据server参数获得具体Transporter实例,我使用的server参数为netty4 8 com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); 9 return extension.bind(arg0, arg1); 10 } 11 12

com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter#bind(URL url, ChannelHandler listener)
netty启动的代码不写了,其中listener会被包装,当netty触发事件时会调用listener处理。

1public Server bind(URL url, ChannelHandler listener) throws RemotingException { 2 return new NettyServer(url, listener); 3 } 4 5

服务注册

回到远程暴露的核心类RegistryProtocol#export(final Invoker)
再贴一下代码:

1public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 2 //导出服务部分 3 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 4 //获得注册中心URL,zookeeper的URL为zookeeper://开头 5 URL registryUrl = getRegistryUrl(originInvoker); 6 // 获取 Registry,先往下看 7 final Registry registry = getRegistry(originInvoker); 8 //获得注册的URL,过滤了一些不需要的参数 9 final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); 10 11 //判断是否延迟发布 12 boolean register = registeredProviderUrl.getParameter("register", true); 13 14 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); 15 16 //register过程,就刚刚回来的地方继续 17 if (register) { 18 register(registryUrl, registeredProviderUrl); 19 ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); 20 } 21 22 // 订阅override数据 23 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 24 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); 25 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 26 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 27 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 28 //保证每次export都返回一个新的exporter实例 29 return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); 30 } 31 32

RegistryProtocol#getRegistry(final Invoker<?> originInvoker)

1private Registry getRegistry(final Invoker<?> originInvoker) { 2 //获得注册中心URL 3 URL registryUrl = getRegistryUrl(originInvoker); 4 //通过RegistryFactory$Adaptive获得具体实例,和其他Adaptive一样,就不贴代码了 5 return registryFactory.getRegistry(registryUrl); 6 } 7 8

zookeeper会获得ZookeeperRegistryFactory
AbstractRegistryFactory#getRegistry(URL url)

1public Registry getRegistry(URL url) { 2 url = url.setPath(RegistryService.class.getName()) 3 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) 4 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); 5 String key = url.toServiceString(); 6 // 加锁 7 LOCK.lock(); 8 try { 9 // 缓存 10 Registry registry = REGISTRIES.get(key); 11 if (registry != null) { 12 return registry; 13 } 14 // 缓存未命中,创建 Registry 实例 15 registry = createRegistry(url); 16 if (registry == null) { 17 throw new IllegalStateException("Can not create registry " + url); 18 } 19 // 写缓存 20 REGISTRIES.put(key, registry); 21 return registry; 22 } finally { 23 LOCK.unlock(); 24 } 25 } 26 27

ZookeeperRegistryFactory#createRegistry(URL url)

1public Registry createRegistry(URL url) { 2 return new ZookeeperRegistry(url, zookeeperTransporter); 3 } 4 5
1public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { 2 super(url); 3 if (url.isAnyHost()) { 4 throw new IllegalStateException("registry address == null"); 5 } 6 //获取组名,默认dubbo 7 String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); 8 if (!group.startsWith(Constants.PATH_SEPARATOR)) { 9 //加上路径,/group 10 group = Constants.PATH_SEPARATOR + group; 11 } 12 this.root = group; 13 // 创建 Zookeeper 客户端,先往下看 14 zkClient = zookeeperTransporter.connect(url); 15 // 状态监听器 16 zkClient.addStateListener(new StateListener() { 17 @Override 18 public void stateChanged(int state) { 19 if (state == RECONNECTED) { 20 try { 21 recover(); 22 } catch (Exception e) { 23 logger.error(e.getMessage(), e); 24 } 25 } 26 } 27 }); 28 } 29 30

ZookeeperTransporter$Adaptive,默认为 CuratorZookeeperTransporter,Apache Curator一个Zookeeper客户端。
CuratorZookeeperTransporter#connect(URL url)

1public ZookeeperClient connect(URL url) { 2 return new CuratorZookeeperClient(url); 3 } 4 5
1public CuratorZookeeperClient(URL url) { 2 super(url); 3 try { 4 // CuratorFramework 工厂,各种参数加进去 5 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() 6 .connectString(url.getBackupAddress()) 7 .retryPolicy(new RetryNTimes(1, 1000)) 8 .connectionTimeoutMs(5000); 9 String authority = url.getAuthority(); 10 if (authority != null && authority.length() > 0) { 11 builder = builder.authorization("digest", authority.getBytes()); 12 } 13 // 构建 CuratorFramework 实例 14 client = builder.build(); 15 // 添加监听器 16 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { 17 @Override 18 public void stateChanged(CuratorFramework client, ConnectionState state) { 19 if (state == ConnectionState.LOST) { 20 CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); 21 } else if (state == ConnectionState.CONNECTED) { 22 CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); 23 } else if (state == ConnectionState.RECONNECTED) { 24 CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); 25 } 26 } 27 }); 28 // 启动客户端 29 client.start(); 30 } catch (Exception e) { 31 throw new IllegalStateException(e.getMessage(), e); 32 } 33 } 34 35

回到服务暴露的核心类,在贴一下代码就不需要往上看啦

1public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 2 //导出服务部分 3 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 4 //获得注册中心URL,zookeeper的URL为zookeeper://开头 5 URL registryUrl = getRegistryUrl(originInvoker); 6 // 获取 Registry,先往下看 7 final Registry registry = getRegistry(originInvoker); 8 9 //上面写到这边 10 //获得注册的URL,过滤了一些不需要的参数 11 final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); 12 13 //判断是否延迟发布 14 boolean register = registeredProviderUrl.getParameter("register", true); 15 16 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); 17 18 //register过程 19 if (register) { 20 // 向注册中心注册服务 21 register(registryUrl, registeredProviderUrl); 22 ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); 23 } 24 25 // 订阅override数据 26 // 创建监听器,FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 27 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); 28 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 29 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 30 // 向注册中心订阅 31 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 32 // 创建DestroyableExporter,保证每次export都返回一个新的exporter实例 33 return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); 34 } 35 36

RegistryProtocol#register(URL registryUrl, URL registedProviderUrl)

1 public void register(URL registryUrl, URL registedProviderUrl) { 2 Registry registry = registryFactory.getRegistry(registryUrl); 3 registry.register(registedProviderUrl); 4 } 5 6

ZookeeperRegistry的抽象父类FailbackRegistry#register(URL url)

1public void register(URL url) { 2 //加到一个hashset中 3 super.register(url); 4 failedRegistered.remove(url); 5 failedUnregistered.remove(url); 6 try { 7 // 向服务器端发送注册请求 8 doRegister(url); 9 } catch (Exception e) { 10 Throwable t = e; 11 // 如果打开启动检测,则直接抛出异常 12 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) 13 && url.getParameter(Constants.CHECK_KEY, true) 14 && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); 15 boolean skipFailback = t instanceof SkipFailbackWrapperException; 16 if (check || skipFailback) { 17 if (skipFailback) { 18 t = t.getCause(); 19 } 20 throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); 21 } else { 22 logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); 23 } 24 // 将失败的注册请求记录到失败的列表中,定期重试 25 failedRegistered.add(url); 26 } 27 } 28 29

ZookeeperRegistry#doRegister(URL url)

  1. toUrlPath 方法生成节点路径,路径格式/${group}/${serviceInterface}/providers/${url}

  2. 通过Zookeeper客户端创建节点

1protected void doRegister(URL url) { 2 try { 3 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 4 } catch (Throwable e) { 5 throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 6 } 7 } 8 9

没有了!

下一篇:Dubbo负载均衡

代码交流 2021