10. Dubbo原理解析-Listener & filter

Listener

ExporterListener:

dubbo在服务暴露(exporter)以及销毁暴露(unexporter)服务的过程中提供了回调窗口,供用户做业务处理。ProtocolListenerWrapper在暴露过程中构建了监听器链

public class ProtocolListenerWrapper implements Protocol {

   public <T> Exporter<T> export(Invoker<T>invoker)throws RpcException {

    ……. //注册中心代码

     return newListenerExporterWrapper<T>( protocol.export(invoker),                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(),Constants.EXPORTER_LISTENER_KEY)));

}

}

1.      根据Dubbo的SPI扩展机制获取所有实现了ExporterListener的监听器listeners

2.      Protocol.export(invoker)暴露服务返回结果exporter对象

3.      ListenerExporterWrapper装饰exporter, 在构造器中遍历listeners构建export的监听链

4.      ListenerExporterWrapper实现Exproter<T>接口,在unexport方法实现中构建unexport的监听链

 

InvokerListener:

dubbo在服务引用(refer)以及销毁引用(destroy)服务的过程中提供了回调窗口,供用户做业务处理。ProtocolListenerWrapper在暴露过程中构建了监听器链

public <T> Invoker<T> refer(Class<T> type,URL url)throws RpcException {

return new ListenerInvokerWrapper<T>( protocol.refer(type, url),               Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(InvokerListener.class).getActivateExtension(url,Constants.INVOKER_LISTENER_KEY)));

}

1.      根据Dubbo的SPI扩展机制获取所有实现了InvokerListener的监听器listeners

2.      Protocol.refer(type, url)暴露服务返回结果invoker对象

  3. ListenerInvokerWrapper装饰invoker, 在构造器中遍历listeners构建referer的监听链3.      ListenerInvokerWrapper装饰invoker, 在构造器中遍历listeners构建referer的监听链

4.      ListenerInvokerWrapper实现Invoker<T>接口,在destory方法实现中构建destory的监听链

 

Dubbo的开源版本中没有监听的实现,但是开放了口子,业务方如有需要可以利用这个功能实现特定的业务

 

 

Filter

Filter:是一种递归的链式调用,用来在远程调用真正执行的前后加入一些逻辑,跟aop的拦截器servlet中filter概念一样的

Filter接口定义

@SPI

public interface Filter {

   Result invoke(Invoker<?> invoker,Invocation invocation) ** throws** RpcException;

}

 

ProtocolFilterWrapper:在服务的暴露与引用的过程中根据KEY是PROVIDER还是CONSUMER来构建服务提供者与消费者的调用过滤器链

public <T> Exporter<T> export(Invoker<T>invoker)throws RpcException {

return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));

}

public <T> Invoker<T> refer(Class<T> type,URL url)throws RpcException {

     return buildInvokerChain( protocol.refer(type, url),Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);

}

 

Filter的实现类需要打上@Activate注解, @Activate的group属性是个string数组,我们可以通过这个属性来指定这个filter是在consumer, provider还是两者情况下激活,所谓激活就是能够被获取,组成filter链

List<Filter> filters =ExtensionLoader.getExtensionLoader(Filter.class).getAct ivateExtension(invoker.getUrl(),key, group);

Key就是SERVICE_FILTER_KEY还是REFERENCE_FILTER_KEY

Group就是consumer或者provider

 

构建filter链,当我们获取激活的filter集合后就通过buildInvokerChain方法来构建

for (int i = filters.size() - 1; i >= 0; i --) {

      final Filter filter = filters.get(i);

      final Invoker<T> next = last;

      last = new Invoker<T>() {

            public Result invoke(Invocation invocation)throws RpcException {

                 return filter.invoke(next, invocation);

            }

           。。。。。。。 //其他方法

       };

 }

以上代码展示了构建filter链的过程

 

Dubbo内容提供了大量内部实现,用来实现调用过程额外功能, 如向监控中心发送调用数据, Tps限流等等, 每个filer专注一块功能。用户同样可以通过Dubbo的SPI扩展机制现在自己的功能

代码交流 2021