Dubbo服务引用(消费端)启动netty源码分析

本文主要讲启动netty源码分析,前面会讲一下服务引用到netty启动部分的源码流程。

服务引用的入口方法为 ReferenceBean 的 getObject 方法

1public Object getObject() throws Exception { 2 return get(); 3 } 4

然后到com.alibaba.dubbo.config.ReferenceConfig#get方法

1public synchronized T get() { 2 if (destroyed) { 3 throw new IllegalStateException("Already destroyed!"); 4 } 5 // 检测 ref 是否为空,为空则通过 init 方法创建 6 if (ref == null) { 7 // init 方法主要用于处理配置,以及调用 createProxy 生成代理类 8 init(); 9 } 10 return ref; 11} 12

然后到com.alibaba.dubbo.config.ReferenceConfig#init

1private void init() { 2 // 避免重复初始化 3 if (initialized) { 4 return; 5 } 6 initialized = true; 7 // 检测接口名合法性 8 if (interfaceName == null || interfaceName.length() == 0) { 9 throw new IllegalStateException("interface not allow null!"); 10 } 11 12 // 检测 consumer 变量是否为空,为空则创建 13 checkDefault(); 14 appendProperties(this); 15 if (getGeneric() == null && getConsumer() != null) { 16 // 设置 generic 17 setGeneric(getConsumer().getGeneric()); 18 } 19 20 // 检测是否为泛化接口 21 if (ProtocolUtils.isGeneric(getGeneric())) { 22 interfaceClass = GenericService.class; 23 } else { 24 try { 25 // 加载类 26 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() 27 .getContextClassLoader()); 28 } catch (ClassNotFoundException e) { 29 throw new IllegalStateException(e.getMessage(), e); 30 } 31 checkInterfaceAndMethods(interfaceClass, methods); 32 } 33 34 // -------------------------------✨ 分割线1 ✨------------------------------ 35 36 // 从系统变量中获取与接口名对应的属性值 37 String resolve = System.getProperty(interfaceName); 38 String resolveFile = null; 39 if (resolve == null || resolve.length() == 0) { 40 // 从系统属性中获取解析文件路径 41 resolveFile = System.getProperty("dubbo.resolve.file"); 42 if (resolveFile == null || resolveFile.length() == 0) { 43 // 从指定位置加载配置文件 44 File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); 45 if (userResolveFile.exists()) { 46 // 获取文件绝对路径 47 resolveFile = userResolveFile.getAbsolutePath(); 48 } 49 } 50 if (resolveFile != null && resolveFile.length() > 0) { 51 Properties properties = new Properties(); 52 FileInputStream fis = null; 53 try { 54 fis = new FileInputStream(new File(resolveFile)); 55 // 从文件中加载配置 56 properties.load(fis); 57 } catch (IOException e) { 58 throw new IllegalStateException("Unload ..., cause:..."); 59 } finally { 60 try { 61 if (null != fis) fis.close(); 62 } catch (IOException e) { 63 logger.warn(e.getMessage(), e); 64 } 65 } 66 // 获取与接口名对应的配置 67 resolve = properties.getProperty(interfaceName); 68 } 69 } 70 if (resolve != null && resolve.length() > 0) { 71 // 将 resolve 赋值给 url 72 url = resolve; 73 } 74 75 // -------------------------------✨ 分割线2 ✨------------------------------ 76 if (consumer != null) { 77 if (application == null) { 78 // 从 consumer 中获取 Application 实例,下同 79 application = consumer.getApplication(); 80 } 81 if (module == null) { 82 module = consumer.getModule(); 83 } 84 if (registries == null) { 85 registries = consumer.getRegistries(); 86 } 87 if (monitor == null) { 88 monitor = consumer.getMonitor(); 89 } 90 } 91 if (module != null) { 92 if (registries == null) { 93 registries = module.getRegistries(); 94 } 95 if (monitor == null) { 96 monitor = module.getMonitor(); 97 } 98 } 99 if (application != null) { 100 if (registries == null) { 101 registries = application.getRegistries(); 102 } 103 if (monitor == null) { 104 monitor = application.getMonitor(); 105 } 106 } 107 108 // 检测 Application 合法性 109 checkApplication(); 110 // 检测本地存根配置合法性 111 checkStubAndMock(interfaceClass); 112 113 // -------------------------------✨ 分割线3 ✨------------------------------ 114 115 Map<String, String> map = new HashMap<String, String>(); 116 Map<Object, Object> attributes = new HashMap<Object, Object>(); 117 118 // 添加 side、协议版本信息、时间戳和进程号等信息到 map 中 119 map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); 120 map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); 121 map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); 122 if (ConfigUtils.getPid() > 0) { 123 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); 124 } 125 126 // 非泛化服务 127 if (!isGeneric()) { 128 // 获取版本 129 String revision = Version.getVersion(interfaceClass, version); 130 if (revision != null && revision.length() > 0) { 131 map.put("revision", revision); 132 } 133 134 // 获取接口方法列表,并添加到 map 中 135 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); 136 if (methods.length == 0) { 137 map.put("methods", Constants.ANY_VALUE); 138 } else { 139 map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); 140 } 141 } 142 map.put(Constants.INTERFACE_KEY, interfaceName); 143 // 将 ApplicationConfig、ConsumerConfig、ReferenceConfig 等对象的字段信息添加到 map 中 144 appendParameters(map, application); 145 appendParameters(map, module); 146 appendParameters(map, consumer, Constants.DEFAULT_KEY); 147 appendParameters(map, this); 148 149 // -------------------------------✨ 分割线4 ✨------------------------------ 150 151 String prefix = StringUtils.getServiceKey(map); 152 if (methods != null && !methods.isEmpty()) { 153 // 遍历 MethodConfig 列表 154 for (MethodConfig method : methods) { 155 appendParameters(map, method, method.getName()); 156 String retryKey = method.getName() + ".retry"; 157 // 检测 map 是否包含 methodName.retry 158 if (map.containsKey(retryKey)) { 159 String retryValue = map.remove(retryKey); 160 if ("false".equals(retryValue)) { 161 // 添加重试次数配置 methodName.retries 162 map.put(method.getName() + ".retries", "0"); 163 } 164 } 165 166 // 添加 MethodConfig 中的“属性”字段到 attributes 167 // 比如 onreturn、onthrow、oninvoke 等 168 appendAttributes(attributes, method, prefix + "." + method.getName()); 169 checkAndConvertImplicitConfig(method, map, attributes); 170 } 171 } 172 173 // -------------------------------✨ 分割线5 ✨------------------------------ 174 175 // 获取服务消费者 ip 地址 176 String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY); 177 if (hostToRegistry == null || hostToRegistry.length() == 0) { 178 hostToRegistry = NetUtils.getLocalHost(); 179 } else if (isInvalidLocalHost(hostToRegistry)) { 180 throw new IllegalArgumentException("Specified invalid registry ip from property..." ); 181 } 182 map.put(Constants.REGISTER_IP_KEY, hostToRegistry); 183 184 // 存储 attributes 到系统上下文中 185 StaticContext.getSystemContext().putAll(attributes); 186 187 // 创建代理类 188 ref = createProxy(map); 189 190 // 根据服务名,ReferenceConfig,代理类构建 ConsumerModel, 191 // 并将 ConsumerModel 存入到 ApplicationModel 中 192 ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods()); 193 ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); 194} 195

首先是方法开始到分割线1之间的代码。这段代码主要用于检测 ConsumerConfig 实例是否存在,如不存在则创建一个新的实例,然后通过系统变量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段。接着是检测泛化配置,并根据配置设置 interfaceClass 的值。接着来看分割线1到分割线2之间的逻辑。这段逻辑用于从系统属性或配置文件中加载与接口名相对应的配置,并将解析结果赋值给 url 字段。url 字段的作用一般是用于点对点调用。继续向下看,分割线2和分割线3之间的代码用于检测几个核心配置类是否为空,为空则尝试从其他配置类中获取。分割线3与分割线4之间的代码主要用于收集各种配置,并将配置存储到 map 中。分割线4和分割线5之间的代码用于处理 MethodConfig 实例。该实例包含了事件通知配置,比如 onreturn、onthrow、oninvoke 等。分割线5到方法结尾的代码主要用于解析服务消费者 ip,以及调用 createProxy 创建代理对象。

接下来分析 createProxy 创建代理对象

com.alibaba.dubbo.config.ReferenceConfig#createProxy

该方法不仅用于创建代理对象还会调用其他方法构建以及合并 Invoker 实例

1private T createProxy(Map<String, String> map) { 2 URL tmpUrl = new URL("temp", "localhost", 0, map); 3 final boolean isJvmRefer; 4 if (isInjvm() == null) { 5 // url 配置被指定,则不做本地引用 6 if (url != null && url.length() > 0) { 7 isJvmRefer = false; 8 // 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用 9 // 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true 10 } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { 11 isJvmRefer = true; 12 } else { 13 isJvmRefer = false; 14 } 15 } else { 16 // 获取 injvm 配置值 17 isJvmRefer = isInjvm().booleanValue(); 18 } 19 20 // 本地引用 21 if (isJvmRefer) { 22 // 生成本地引用 URL,协议为 injvm 23 URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); 24 // 调用 refer 方法构建 InjvmInvoker 实例 25 invoker = refprotocol.refer(interfaceClass, url); 26 27 // 远程引用 28 } else { 29 // url 不为空,表明用户可能想进行点对点调用 30 if (url != null && url.length() > 0) { 31 // 当需要配置多个 url 时,可用分号进行分割,这里会进行切分 32 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); 33 if (us != null && us.length > 0) { 34 for (String u : us) { 35 URL url = URL.valueOf(u); 36 if (url.getPath() == null || url.getPath().length() == 0) { 37 // 设置接口全限定名为 url 路径 38 url = url.setPath(interfaceName); 39 } 40 41 // 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心 42 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 43 // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中 44 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 45 } else { 46 // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性), 47 // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等 48 // 最后将合并后的配置设置为 url 查询字符串中。 49 urls.add(ClusterUtils.mergeUrl(url, map)); 50 } 51 } 52 } 53 } else { 54 // 加载注册中心 url 55 List<URL> us = loadRegistries(false); 56 if (us != null && !us.isEmpty()) { 57 for (URL u : us) { 58 URL monitorUrl = loadMonitor(u); 59 if (monitorUrl != null) { 60 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); 61 } 62 // 添加 refer 参数到 url 中,并将 url 添加到 urls 中 63 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 64 } 65 } 66 67 // 未配置注册中心,抛出异常 68 if (urls.isEmpty()) { 69 throw new IllegalStateException("No such any registry to reference..."); 70 } 71 } 72 73 // 单个注册中心或服务提供者(服务直连,下同) 74 if (urls.size() == 1) { 75 // 调用 RegistryProtocol 的 refer 构建 Invoker 实例 76 invoker = refprotocol.refer(interfaceClass, urls.get(0)); 77 78 // 多个注册中心或多个服务提供者,或者两者混合 79 } else { 80 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); 81 URL registryURL = null; 82 83 // 获取所有的 Invoker 84 for (URL url : urls) { 85 // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时 86 // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法 87 invokers.add(refprotocol.refer(interfaceClass, url)); 88 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 89 registryURL = url; 90 } 91 } 92 if (registryURL != null) { 93 // 如果注册中心链接不为空,则将使用 AvailableCluster 94 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 95 // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并 96 invoker = cluster.join(new StaticDirectory(u, invokers)); 97 } else { 98 invoker = cluster.join(new StaticDirectory(invokers)); 99 } 100 } 101 } 102 103 Boolean c = check; 104 if (c == null && consumer != null) { 105 c = consumer.isCheck(); 106 } 107 if (c == null) { 108 c = true; 109 } 110 111 // invoker 可用性检查 112 if (c && !invoker.isAvailable()) { 113 throw new IllegalStateException("No provider available for the service..."); 114 } 115 116 // 生成代理类 117 return (T) proxyFactory.getProxy(invoker); 118} 119

然后分析refer方法构建Invoker

Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,本节会分析最常用的两个,分别是 RegistryProtocol 和 DubboProtocol,其他的大家自行分析。下面先来分析 DubboProtocol 的 refer 方法源码。如下:

1public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { 2 optimizeSerialization(url); 3 // 创建 DubboInvoker 4 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); 5 invokers.add(invoker); 6 return invoker; 7} 8

其中有一个方法getClients(url),这个方法用于获取客户端实例,实例类型为 ExchangeClient。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑。

com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients

1private ExchangeClient[] getClients(URL url) { 2 // 是否共享连接 3 boolean service_share_connect = false; 4 // 获取连接数,默认为0,表示未配置 5 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); 6 // 如果未配置 connections,则共享连接 7 if (connections == 0) { 8 service_share_connect = true; 9 connections = 1; 10 } 11 12 ExchangeClient[] clients = new ExchangeClient[connections]; 13 for (int i = 0; i < clients.length; i++) { 14 if (service_share_connect) { 15 // 获取共享客户端 16 clients[i] = getSharedClient(url); 17 } else { 18 // 初始化新的客户端 19 clients[i] = initClient(url); 20 } 21 } 22 return clients; 23} 24

这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,默认情况下,使用共享客户端实例。getSharedClient 方法中也会调用 initClient 方法,因此下面我们一起看一下这两个方法。

1private ExchangeClient getSharedClient(URL url) { 2 String key = url.getAddress(); 3 // 获取带有“引用计数”功能的 ExchangeClient 4 ReferenceCountExchangeClient client = referenceClientMap.get(key); 5 if (client != null) { 6 if (!client.isClosed()) { 7 // 增加引用计数 8 client.incrementAndGetCount(); 9 return client; 10 } else { 11 referenceClientMap.remove(key); 12 } 13 } 14 15 locks.putIfAbsent(key, new Object()); 16 synchronized (locks.get(key)) { 17 if (referenceClientMap.containsKey(key)) { 18 return referenceClientMap.get(key); 19 } 20 21 // 创建 ExchangeClient 客户端 22 ExchangeClient exchangeClient = initClient(url); 23 // 将 ExchangeClient 实例传给 ReferenceCountExchangeClient,这里使用了装饰模式 24 client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); 25 referenceClientMap.put(key, client); 26 ghostClientMap.remove(key); 27 locks.remove(key); 28 return client; 29 } 30} 31

上面方法先访问缓存,若缓存未命中,则通过 initClient 方法创建新的 ExchangeClient 实例,并将该实例传给 ReferenceCountExchangeClient 构造方法创建一个带有引用计数功能的 ExchangeClient 实例。

1private ExchangeClient initClient(URL url) { 2 3 // 获取客户端类型,默认为 netty 4 String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); 5 6 // 添加编解码和心跳包参数到 url 中 7 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); 8 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 9 10 // 检测客户端类型是否存在,不存在则抛出异常 11 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { 12 throw new RpcException("Unsupported client type: ..."); 13 } 14 15 ExchangeClient client; 16 try { 17 // 获取 lazy 配置,并根据配置值决定创建的客户端类型 18 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { 19 // 创建懒加载 ExchangeClient 实例 20 client = new LazyConnectExchangeClient(url, requestHandler); 21 } else { 22 // 创建普通 ExchangeClient 实例 23 client = Exchangers.connect(url, requestHandler); 24 } 25 } catch (RemotingException e) { 26 throw new RpcException("Fail to create remoting client for service..."); 27 } 28 return client; 29} 30

initClient 方法首先获取用户配置的客户端类型,默认为 netty。然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。这里的 LazyConnectExchangeClient 代码并不是很复杂,该类会在 request 方法被调用时通过 Exchangers 的 connect 方法创建 ExchangeClient 客户端

下面我们分析一下 Exchangers 的 connect 方法。com.alibaba.dubbo.remoting.exchange.Exchangers#connect(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.remoting.exchange.ExchangeHandler)

1public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { 2 if (url == null) { 3 throw new IllegalArgumentException("url == null"); 4 } 5 if (handler == null) { 6 throw new IllegalArgumentException("handler == null"); 7 } 8 url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); 9 // 获取 Exchanger 实例,默认为 HeaderExchangeClient 10 return getExchanger(url).connect(url, handler); 11} 12

如上,getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单,大家自己看一下吧。接下来分析 HeaderExchangeClient 的实现。com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger#connect

1public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { 2 // 这里包含了多个调用,分别如下: 3 // 1. 创建 HeaderExchangeHandler 对象 4 // 2. 创建 DecodeHandler 对象 5 // 3. 通过 Transporters 构建 Client 实例 6 // 4. 创建 HeaderExchangeClient 对象 7 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); 8} 9 10

这里的调用比较多,我们这里重点看一下 Transporters 的 connect 方法。

1public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { 2 if (url == null) { 3 throw new IllegalArgumentException("url == null"); 4 } 5 ChannelHandler handler; 6 if (handlers == null || handlers.length == 0) { 7 handler = new ChannelHandlerAdapter(); 8 } else if (handlers.length == 1) { 9 handler = handlers[0]; 10 } else { 11 // 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器 12 handler = new ChannelHandlerDispatcher(handlers); 13 } 14 15 // 获取 Transporter 自适应拓展类,并调用 connect 方法生成 Client 实例 16 return getTransporter().connect(url, handler); 17} 18

如上,getTransporter 方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。如下:

com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter#connect

1public Client connect(URL url, ChannelHandler listener) throws RemotingException { 2 // 创建 NettyClient 对象 3 return new NettyClient(url, listener); 4} 5
1com.alibaba.dubbo.remoting.transport.netty4.NettyClient#NettyClient 2
1public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { 2 super(url, wrapChannelHandler(url, handler)); 3 } 4

com.alibaba.dubbo.remoting.transport.AbstractClient#wrapChannelHandler

1protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) { 2 url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME); 3 url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL); 4 return ChannelHandlers.wrap(handler, url); 5 } 6

然后会调用ChannelHandlers.wrap 方法com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers#wrap

1 public static ChannelHandler wrap(ChannelHandler handler, URL url) { 2 return ChannelHandlers.getInstance().wrapInternal(handler, url); 3 } 4

然后到com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers#wrapInternal

1 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { 2 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) 3 .getAdaptiveExtension().dispatch(handler, url))); 4 } 5

channelhandler.wrap 里面会调用Dispatcher 的扩展,进行dispatch操作,实际是对handler 的包装动态化。

  根据配置不同,调用Dispatch扩展包装后的handler 是不一样的。 默认采用AllDispatcher扩展,用 AllChannelHandler 包装1次。

  采用默认配置,经过包装之后,传入super方法的handler 包装链如下:

     MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler ->  DecodeHandler ->  HeaderExchangeHandler -> DubboProtocol.requestHandler

然后再回到com.alibaba.dubbo.remoting.transport.netty4.NettyClient#NettyClient

1public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { 2 super(url, wrapChannelHandler(url, handler)); 3 } 4

wrapChannelHandler(url, handler)方法会返回一个handler 包装链如下:

     MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler ->  DecodeHandler ->  HeaderExchangeHandler -> DubboProtocol.requestHandler

然后到com.alibaba.dubbo.remoting.transport.AbstractClient#AbstractClient

1 public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { 2 super(url, handler); 3 4 send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); 5 6 shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); 7 8 // The default reconnection interval is 2s, 1800 means warning interval is 1 hour. 9 reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); 10 11 try { 12 //创建netty客户端 13 doOpen(); 14 } catch (Throwable t) { 15 close(); 16 throw new RemotingException(url.toInetSocketAddress(), null, 17 "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() 18 + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); 19 } 20 try { 21 // connect. 22 connect(); 23 if (logger.isInfoEnabled()) { 24 logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress()); 25 } 26 } catch (RemotingException t) { 27 if (url.getParameter(Constants.CHECK_KEY, true)) { 28 close(); 29 throw t; 30 } else { 31 logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() 32 + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t); 33 } 34 } catch (Throwable t) { 35 close(); 36 throw new RemotingException(url.toInetSocketAddress(), null, 37 "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() 38 + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); 39 } 40 41 executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class) 42 .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); 43 ExtensionLoader.getExtensionLoader(DataStore.class) 44 .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); 45 } 46

先分析super(url, handler);com.alibaba.dubbo.remoting.transport.AbstractEndpoint#AbstractEndpoint

1 public AbstractEndpoint(URL url, ChannelHandler handler) { 2 super(url, handler); 3 this.codec = getChannelCodec(url); 4 this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 5 this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); 6 } 7

然后接着super(url, handler);com.alibaba.dubbo.remoting.transport.AbstractPeer#AbstractPeer

1public AbstractPeer(URL url, ChannelHandler handler) { 2 if (url == null) { 3 throw new IllegalArgumentException("url == null"); 4 } 5 if (handler == null) { 6 throw new IllegalArgumentException("handler == null"); 7 } 8 this.url = url; 9 this.handler = handler; 10 } 11

记得handler是handler 包装链如下:

     MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler ->  DecodeHandler ->  HeaderExchangeHandler -> DubboProtocol.requestHandler

然后再回到到com.alibaba.dubbo.remoting.transport.AbstractClient#AbstractClient方法

其中doOpen方法是重点,创建了netty客户端,跟进去看一下:com.alibaba.dubbo.remoting.transport.netty4.NettyClient#doOpen

 

1 @Override 2 protected void doOpen() throws Throwable { 3 NettyHelper.setNettyLoggerFactory(); 4 final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); 5 bootstrap = new Bootstrap(); 6 bootstrap.group(nioEventLoopGroup) 7 .option(ChannelOption.SO_KEEPALIVE, true) 8 .option(ChannelOption.TCP_NODELAY, true) 9 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 10 //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) 11 .channel(NioSocketChannel.class); 12 13 if (getTimeout() < 3000) { 14 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); 15 } else { 16 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()); 17 } 18 19 bootstrap.handler(new ChannelInitializer() { 20 21 protected void initChannel(Channel ch) throws Exception { 22 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); 23 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug 24 .addLast("decoder", adapter.getDecoder()) 25 .addLast("encoder", adapter.getEncoder()) 26 .addLast("handler", nettyClientHandler); 27 } 28 }); 29 } 30

其中nettyClientHandler是 

1public NettyClientHandler(URL url, ChannelHandler handler) { 2 if (url == null) { 3 throw new IllegalArgumentException("url == null"); 4 } 5 if (handler == null) { 6 throw new IllegalArgumentException("handler == null"); 7 } 8 this.url = url; 9 this.handler = handler; 10 } 11 12

其中com.alibaba.dubbo.remoting.transport.netty4.NettyClientHandler#channelRead

1 @Override 2 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 3 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); 4 try { 5 handler.received(channel, msg); 6 } finally { 7 NettyChannel.removeChannelIfDisconnected(ctx.channel()); 8 } 9 } 10

这里面的handler是NettyClient,由于NettyClient是AbstractPeer字类,(上面通过不断的super()方法也可得知)

所以会跳到com.alibaba.dubbo.remoting.transport.AbstractPeer#received

1 public void received(Channel ch, Object msg) throws RemotingException { 2 if (closed) { 3 return; 4 } 5 handler.received(ch, msg); 6 } 7

这里面的handler就是之前通过不断的super()传递最后初始化的

handler 包装链:

     MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler ->  DecodeHandler ->  HeaderExchangeHandler -> DubboProtocol.requestHandler

其中HeaderExchangeHandler非常重要,将调用结果封装到 Response 对象中,最后再将该对象返回给服务消费方。

这个可以看我之前写的Dubbo同步调用和超时源码 https://blog.csdn.net/u014082714/article/details/104210936

 

代码交流 2021