dubbo源码总结

源码导读:http://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html

概念
Invoker:实体域。Invoker是Dubbo的核心模型,其它模型都向它靠扰或转换成它,它代表一个可执行体,可向它发起invoke调用。
它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
提供者的invoker
1.本质上应该是一个代理,经过层层包装最终进行了发布。当消费者发起请求的时候,会获得这个invoker进行调用。
2.最终发布出去的invoker也不是单纯的一个代理,也是经过多层包装:InvokerDelegate(DelegateProviderMetaDataInvoker(JavassistProxyFactory.AbstractProxyInvoker))
AbstractProxyInvoker invoker = JavassistProxyFactory.getInvoke();
消费者的2层invoker
1.外层invoker:Cluster.join(Directory directory);
把Directory中的多个DubboInvoker合并为1个Invoker,这个Invoker代表消费者进行远程调用时的操作对象。
Invoker=MockClusterInvoker(FailoverClusterInvoker(Directory)。对应1个服务提供者。
2.内层invoker:RegistryDirectory.urlInvokerMap 中缓存的invoker
invoker=InvokerDelegate(ListenerInvokerWrapper(CallbackRegistrationInvoker(AsyncToSyncInvoker(DubboInvoker))))。对应1个服务。
Exporter:服务暴露。Exporter exporter= Protocol.export(Invoker invoker):暴露某种协议的服务。
Proxy:Proxy层封装了所有接口的透明化代理。其它层都以Invoker为中心,只有到了暴露给用户使用时,才用Proxy将Invoker转成接口,或将接口实现转成Invoker。
也就是去掉Proxy层RPC是可以Run的,只是不那么透明,不那么看起来像调本地服务一样调远程服务。
Registry:注册中心。有多种实现:Zookeeper/Nacos/Consul/Redis/Sofa/Etcd/Dubbo
Directory:目录。分为:StaticDirectory/RegistryDirectory。
服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。
如果服务提供者的数量或者配置发生变动,服务目录中的记录也要做相应的更新。
服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个Invoker对象,并把这个Invoker对象存储起来,这个Invoker才是服务目录最终持有的对象。Invoker是一个具有远程调用功能的对象。
服务目录本质就是Invoker集合,且这个集合中的元素会随注册中心的变化而进行动态调整。
Mock:降级。
Router:路由。包括:条件路由/标签路由/脚本路由
LoadBalance:负载均衡。策略:基于权重随机(RandomLoadBalance)/最少活跃调用数(LeastActiveLoadBalance)/一致性hash(ConsistentHashLoadBalance)/加权轮询(RoundRobinLoadBalance)
Cluster:集群容错。包括: Failover/Failfast/Failsafe/Failback/Forking => 失败自动切换/快速失败/失败安全/失败自动恢复/并行调用多个服务提供者
Cluster的目的是将多个Invoker伪装成一个Invoker。只有一个提供者时,是不需要Cluster的。
Filter:在每次的调用过程中,Filter的拦截都会被执行。
Dubbo已经实现的Filter有二十几个,它们的入口都是ProtocolFilterWrapper,ProtocolFilterWrapper对Protocol做了Wrapper,会在加载扩展的时候被加载进来。
用户自己配置的Filter中,有些是默认激活,有些是需要通过配置文件来激活。
而所有Filter的加载顺序,也是先处理Dubbo的默认Filter,再来处理用户自己定义并且配置的Filter。
通过"-"配置,可以替换掉Dubbo的原生Filter,通过这样的设计,可以灵活地替换或者修改Filter的加载顺序。
Protocol:协议层。支持:dubbo/http/rmi/hessian/rest/webservice/thrift/grpc/memcached/redis
Protocol是核心层,只要有Protocol+Invoker+Exporter就可以完成非透明的RPC调用,然后在Invoker的主过程上Filter拦截点。
AbstractProtocol.exporterMap:保存了提供者暴露的所有服务。
Transport:netty3/netty4/mina/jetty/grizzly/p2p/zookeeper
Serialization:Hessian2/java/json/fastjson/kryo/NativeHessian/Avro
RpcInvocation:参数组装。它持有调用过程中的变量,比如方法名,参数等。
Request:客户端请求。封装了Invocation。
Response:
Result:一次RPC调用的结果。
ProviderConsumerRegTable:本地注册表。服务暴露时注册提供者,服务发现时注册消费者,服务重新暴露时重新注册提供者。

invoker代理的服务提供方接口类(interface属性)可以有多个方法。
消费者调用时怎么知道调哪个方法:配置接口级别的引用,代码中直接调API接口的某个方法。
ProviderConfig是提供方配置:当 ProtocolConfig 和 ServiceConfig 某属性没有配置时,采用此缺省值
ConsumerConfig是消费方配置:当 ReferenceConfig 某属性没有配置时,采用此缺省值
GenericService:泛化服务:http://dubbo.apache.org/zh-cn/docs/user/demos/generic-service.html http://dubbo.apache.org/zh-cn/docs/user/demos/generic-reference.html

SPI
1.ExtensionLoader.getExtensionLoader:创建指定类型的ExtensionLoader对象
1.new ExtensionLoader(type):创建指定类型的ExtensionLoader对象
->AdaptiveExtensionFactory objectFactory = ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension():初始化这个extensionLoader对象的objectFactory
objectFactory作用:扩展点依赖注入(injectExtension)时会通过objectFactory生成待注入的属性所需类型的自适应扩展点
2.把extensionLoader对象缓存到Map中:EXTENSION_LOADERS.putIfAbsent(type,extensionLoader);
2.ExtensionLoader.getExtension:创建指定类型静态扩展点的实例
createExtension
1.getExtensionClasses->loadExtensionClasses->loadDirectory(加载指定路径下的文件中配置的扩展点Class,保存到cachedClasses属性中)->loadResource->loadClass
1.cacheAdaptiveClass:把标注了@Adaptive注解的类赋值给cachedAdaptiveClass属性。
2.cacheWrapperClass:保存type类型的包装类到cachedWrapperClasses属性。type为Protocol时包装类为:QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper
3.cacheActivateClass:保存标注了@Activate注解的类到cachedActivates属性。
4.saveInExtensionClass
2.clazz.newInstance
3.injectExtension
4.instance = wrapperClass.getConstructor(type).newInstance(instance);包装instance,例如生成DubboProtocol时经过包装后变为:QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol))
3.ExtensionLoader.getAdaptiveExtension:创建指定类型自适应扩展点的实例
createAdaptiveExtension
1.getAdaptiveExtensionClass
1.getExtensionClasses->loadExtensionClasses->loadDirectory(加载指定路径下的文件中配置的扩展点Class,保存到cachedClasses属性中)->loadResource->loadClass
1.cacheAdaptiveClass:把标注了@Adaptive注解的类赋值给cachedAdaptiveClass属性。
2.cacheWrapperClass:保存type类型的包装类到cachedWrapperClasses属性。
1.type为Protocol时包装类为:QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper
2.程序运行过程中,当调用到Protocol$Adaptive的@Adaptive方法时,会根据URL(例如dubbo://ip:port)获取静态扩展点(DubboProtocol),其中会把DubboProtocol包装为QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol)) 3.cacheActivateClass:保存标注了@Activate注解的类到cachedActivates属性。 4.saveInExtensionClass 2.createAdaptiveExtensionClass:为标注了@Adaptive的方法创建Class对象 1.String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();生成动态代理类的字符串 2.ClassLoader classLoader = findClassLoader(); 3.Compiler compiler = ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();Compiler实际上是JavassistCompiler 4.Class clazz = compiler.compile(code, classLoader);编译1生成的code代码为Class对象 2.Class.newInstance:实例化1中的Class对象 3.injectExtension 4.ExtensionLoader.getActivateExtension:创建指定类型激活扩展点的实例 5.ExtensionLoader.injectExtension:扩展点依赖注入 0.ExtensionLoader.createExtension/createAdaptiveExtension 创建扩展点时会调用injectExtension,把当前扩展点实例需要通过setter注入赋值的属性赋上值 1.instance:需要通过setter注入属性的对象,其本身也是一个扩展点。例如:RegistryProtocol,需要通过setProtocol注入Protocol 2.method:RegistryProtocol的setProtocol方法 3.pt:Protocol.class。其本身是一个扩展点。 4.property:"Protocol" 5.object:Protocol$Adaptive。objectFactory.getExtension(pt, property)->AdaptiveExtensionFactory.getExtension:遍历SpiExtensionFactory/SpringExtensionFactory获取扩展点实例。
1.SpiExtensionFactory.getExtension->ExtensionLoader.getAdaptiveExtension(Protocol.class)
2.SpringExtensionFactory.getExtension->ApplicationContext.getBean:从spring容器中按name查找
6.method.invoke(instance, object);调用RegistryProtocol的setProtocol方法,把Protocol$Adaptive对象注入到RegistryProtocol的protocol属性 7.例子 1.RegistryProtocol的protocol属性赋值:RegistryProtocol registryProtocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry")->createExtension->injectExtension:把5中生成的Protocol$Adaptive对象赋值给protocol属性
2.RegistryProtocol的registryFactory属性赋值:RegistryProtocol registryProtocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(“registry”)->createExtension->injectExtension:把5中生成的RegistryFactory$Adaptive对象赋值给registryFactory属性 3.ZookeeperRegistryFactory的zookeeperTransporter属性赋值:ZookeeperRegistryFactory zookeeperRegistryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper")->createExtension->injectExtension:把5中生成的ZookeeperTransporter$Adaptive对象赋值给zookeeperTransporter属性

服务暴露
1.DubboBeanDefinitionParser.parse方法解析spring.xml中的dubbo:service标签内容保存到ServiceBean
2.ServiceBean.afterPropertiesSet把dubbo中配置的application/registry/service/protocol等信息,加载到对应的config实体中,便于后续的使用
3.ServiceBean.onApplicationEvent(监听到spring上下文被刷新或者加载的时候触发)->export->ServiceConfig.export->doExport->doExportUrls:循环遍历protocols暴露服务
4.ServiceConfig.doExportUrlsFor1Protocol
1.解析dubbo:method配置,保存到map集合中
2.获得当前服务需要暴露的ip和端口
3.把解析到的所有数据,组装成一个URL
4.发布Local或Remote服务:Remote循环遍历registryURLs暴露服务。registryURL类型:registry://ip:port
1.AbstractProxyInvoker invoker = JavassistProxyFactory.getInvoker
2.protocol.export
5.RegistryProtocol.export
1.URLregistryUrl=getRegistryUrl:registryUrl类型从registry://ip:port变为zookeeper://ip:port
2.URLproviderUrl=getProviderUrl:providerUrl类型:dubbo://ip:port
3.订阅dubbo-admin,监听配置变动
4.doLocalExport->protocol.export:protocol经过包装变为QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol))
->QosProtocolWrapper.export->ProtocolListenerWrapper.export->ProtocolFilterWrapper.export->DubboProtocol.export
0.ExtensionLoader.loadClass中会判断:isWrapperClass(clazz)。此处是一个扩展点。
1.QosprotocolWrapper:如果当前配置了注册中心,则会启动一个Qosserver.qos是dubbo的在线运维命令
2.ProtocolFilterWrapper:基于激活扩展点使用责任链模式构建filter链,来对invoker进行filter包装,从而在实现远程调用的时候,会经过这些filter进行过滤。
3.ProtocolListenerWrapper:用于服务export时候插入监听机制,暂未实现
6.DubboProtocol.export
1.openServer->createServer->Exchangers.bind->HeaderExchanger.bind->Transporters.bind->Transporter$Adaptive.bind
->NettyTransporter(默认netty4).bind->newNettyServer->newAbstractServer->NettyServer(默认netty4).doOpen
提供者启动netty服务与注册中心建立nio连接
ServerBootstrap.nettyServerHandler = NettyServerHandler(NettyServer(MultiMessageHandler(HeartbeatHandler(AllChannelHandler(DecodeHandler(HeaderExchangeHandler(DubboProtocol.ExchangeHandlerAdapter)))))))
2.exporterMap.put(key, exporter):exporterMap缓存本次exporter,方便调用时提供者从缓存中得到exporter进而获取invoker。
7.总结
1.基于spring解析配置文件存储到config
2.各种判断,保障配置信息安全
3.组装URL:registry/zookeeper/dubbo/injvm
4.使用动态代理构建1个invoker:AbstractProxyInvoker = JavassistProxyFactory.getInvoker
5.RegistryProtocol.export
6.对DubboProtocol进行包装:QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol))
7.DubboProtocol.export发布服务:Exporter exporter= DubboProtocol.export(Invoker invoker)
8.启动NettyServer
9.扩展点依赖注入:ExtensionLoader.injectExtension

服务注册
1.RegistryProtocol.export:此处为入口
1.URL registryUrl = getRegistryUrl(originInvoker);得到的registryUrl形式为:zookeeper://ip:port
2.URL providerUrl = getProviderUrl(originInvoker);得到的providerUrl形式为:dubbo://ip:port
2.RegistryProtocol.getRegistry:获取注册中心
1.getRegistryUrl:把registryUrl从配置的协议(registry://)转换为具体的注册中心协议(例如zookeeper://)
2.AbstractRegistryFactory.getRegistry->createRegistry(根据URL创建注册中心)->ZookeeperRegistryFactory.createRegistry
->new ZookeeperRegistry->AbstractZookeeperTransporter.connect->createZookeeperClient->CuratorZookeeperTransporter.createZookeeperClient
->new CuratorZookeeperClient->CuratorFrameworkFactory.Builder.build
3.RegistryProtocol.register:把提供者URL写到注册中心zookeeper节点上
1.Registry registry = registryFactory.getRegistry(registryUrl);获取到的注册中心(registry)为ZookeeperRegistry
->RegistryFactory$Adaptive.getRegistry(registryUrl)
->ZookeeperRegistryFactory extension=ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(“zookeeper”)
->ZookeeperRegistryFactory.getRegistry(registryUrl)->AbstractRegistryFactory.getRegistry:2.2生成的注册中心会缓存起来,这里可以从缓存中取到
2.FailbackRegistry.register->doRegister->ZookeeperRegistry.doRegister->AbstractZookeeperClient.create->createEphemeral
->CuratorZookeeperClient.createEphemeral->CuratorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(path)

服务发现
1.DubboBeanDefinitionParser.parse方法解析spring.xml中的dubbo:reference标签内容保存到ReferenceBean
->ReferenceBean.afterPropertiesSet把dubbo中配置的consumer/application/module/monitor/metrics等信息,加载到对应的config实体中,便于后续的使用
->getObject->ReferenceConfig.get
2.ReferenceConfig.get
->init->createProxy
1.checkRegistry
2.loadRegistries:根据注册中心配置进行解析得到URL=> registry://ip:port/org.apache.dubbo.service.RegsitryService
3.Invoker invoker=RegistryProtocol.refer(type,url)->doRefer
1.new RegistryDirectory:初始化RegistryDirectory
2.ZookeeperRegistry.register(registeredConsumerUrl)😕/注册consumerUrl到注册中心
3.RegistryDirectory.buildRouterChain
4.RegistryDirectory.subscribe:订阅注册中心指定节点的变化,如果发生变化,则通知到RegistryDirectory
5.Invoker invoker=MockClusterWrapper(FailoverCluster).join(RegistryDirectory):构建消费者的外层invoker=MockClusterInvoker(FailoverClusterInvoker(RegistryDirectory)
4.ProxyFactory$Adaptive.getProxy(invoker)->AbstractProxyFactory.getProxy->JavassistProxyFactory.getProxy
->Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
@Reference注入的一个对象实例本质上就是一个动态代理类。通过调用这个类中的方法,会触发InvokerInvocationHandler.invoke()
3.RegistryDirectory.subscribe
1.setConsumerUrl
2.ConsumerConfigurationListener.addNotifyListener(this):把当前RegistryDirectory作为listener,去监听zk上节点的变化
Dubbo2.7.X新增了针对配置节点的监听回调:
ReferenceConfigurationListener:负责reference的配置监听。监听/dubbo/config/org.apache.dubbo.demo.DemoService.configurators节点的data变化。
ConsumerConfigurationListener:负责consumer的配置监听。监听/dubbo/config/consumer-application-name.configurators节点的data变化。
ServiceConfigurationListener:负责service的配置监听。
ProviderConfigurationListener:负责provider的配置监听。
3.new ReferenceConfigurationListener(this, url)
4.ZookeeperRegsitry.subscribe->FailbackRegistry.subscribe->doSubscribe->ZookeeperRegistry.doSubscribe:监听某个服务下3个节点的子节点变动:providers/configurators/routers
1.ZookeeperClient.addChildListener:监听zk上providers/configurators/routers节点的子节点
2.notify->doNotify->AbstractRegistry.notify->RegistryDirectory.notify:对已经可用的列表通知provider的变动。Invoker的网络连接以及后续的配置变更,都会调用这个notify方法。
1.toRouters(routerURLs).ifPresent(this::addRouters):如果router路由节点有变化,则从新router下的数据生成router
2.refreshOverrideAndInvoker->refreshInvoker:获得providerURLs,然后刷新providerURLs
toInvokers:根据providerURL生成新的invoker。1个服务有n个提供者,就会生成n个providerURL。
Invoker invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl):构建消费者的内层invoker,缓存到RegistryDirectory.urlInvokerMap
invoker=InvokerDelegate(ListenerInvokerWrapper(CallbackRegistrationInvoker(AsyncToSyncInvoker(DubboInvoker))))
protocol.refer = ProtocolListenerWrapper(ProtocolFilterWrapper(QosProtocolWrapper(DubboProtocol))).refer
其中 CallbackRegistrationInvoker=ProtocolFilterWrapper.refer:包装了Filter链
4.ProtocolListenerWrapper(ProtocolFilterWrapper(QosProtocolWrapper(DubboProtocol))).refer->DubboProtocol.refer->AbstractProtocol.refer
Protocol.refer:通过指定的协议来调用refer生成一个invoker对象。消费者生成的对远程服务引用。invoker主要用于执行远程调用。
->DubboProtocol.protocolBindingRefer->getClients:获得客户端连接。
DubboInvoker就是通过调用 client.request()方法完成网络通信的请求发送和响应接收功能。
DubboInvoker.clients:1个DubboInvoker中可以保存多个ExchangeClient。1个ExchangeClient代表consumer到provider的1个连接。client = ReferenceCountExchangeClient(HeaderExchangeClient)
->getSharedClient:如果没有配置共享连接数,则connections为默认值1。如果配置了共享连接数,则connections为配置的值,表示consumer可以与每个provider节点建立多个连接。
->buildReferenceCountExchangeClientList->buildReferenceCountExchangeClient
->initClient->Exchangers.connect->HeaderExchanger.connect->Transporters.connect:通过netty建立连接
->NettyTransporter.connect->AbstractClient.connect->NettyClient.doConnect->Bootstrap.connect
5.总结
1.生成远程服务的代理
2.从注册中心获取获得目标服务的url地址,订阅注册中心上的节点变化
3.实现远程网络通信,建立通信连接
4.实现负载均衡
5.实现集群容错
6.服务降级(Mock)
7.序列化

服务调用
1.客户端调用服务
@Reference注入的一个对象实例本质上就是一个动态代理类。
通过调用这个类中的方法,会触发InvokerInvocationHandler(MockClusterInvoker(FailoverClusterInvoker(RegistryDirectory))).invoke()
MockClusterInvoker.invoke->FailoverClusterInvoker(extends AbstractClusterInvoker).invoke
1.RpcInvocation.addAttachments:隐式传参
2.List invokers = list(RpcInvocation)->RegistryDirectory.doList->RouterChain.route->Router.route:执行路由策略筛选RegistryDirectory中保存的多个invoker
3.initLoadBalance:初始化负载均衡策略
4.doInvoke->FailoverClusterInvoker.doInvoke
1.获得1个内层invoker:Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked)->AbstractClusterInvoker.doSelect
->AbstractLoadBalance.select->RandomLoadBalance.doSelect:执行负载均衡策略选出1个内层invoker
invoker=InvokerDelegate(ListenerInvokerWrapper(CallbackRegistrationInvoker(AsyncToSyncInvoker(DubboInvoker))))
2.远程调用服务:Result result = 内层invoker.invoke(invocation)->…->DubboInvoker(extends AbstractInvoker).invoke->DubboInvoker.doInvoke
->ExchangeClient.request->ReferenceCountExchangeClient.request->HeaderExchangeClient.request->HeaderExchangeChannel.request
->AbstractPeer.send->AbstractClient.send->NettyChannel.send->Channel.writeAndFlush
2.服务端处理请求
nettyServerHandler.channelRead->NettyServer(extends AbstractPeer).received->MultiMessageHandler.received->HeartbeatHandler.received
->AllChannelHandler.received->ChannelEventRunnable.run->DecodeHandler.received->HeaderExchangeHandler.received->handleRequest
->DubboProtocol.ExchangeHandlerAdapter.reply
1.Invoker invoker = getInvoker(HeaderExchangeChannel,Invocation)
->DubboExporter exporter =AbstractProtocol.exporterMap.get(serviceKey)->DubboExporter.getInvoker()
2.Result result = invoker.invoke(Invocation)
服务提供者的invoker = InvokerDelegate(DelegateProviderMetaDataInvoker(JavassistProxyFactory.AbstractProxyInvoker))
InvokerDelegate(extends InvokerWrapper).invoke->DelegateProviderMetaDataInvoker.invoke
->AbstractProxyInvoker.AbstractProxyInvoker.doInvoke->Wrapper.invokeMethod->DemoServiceImpl.sayHello
MultiMessageHandler: 复合消息处理
HeartbeatHandler:心跳消息处理,接收心跳并发送心跳响应
AllChannelHandler:业务线程转化处理器,把接收到的消息封装成ChannelEventRunnable可执行任务给线程池处理
DecodeHandler:业务解码处理器

代码交流 2021