dubbo入门

dubbo是阿里巴巴开源的单一长连接服务框架,底层通信采用nio框架,支持netty,mina,grizzly,默认是netty。对dubbo比较感兴趣的是:

  1. client端的线程模型是什么样的?

传统的io client是请求应答模式,发送请求-->等待远程应答。dubbo底层是异步IO的,所有请求复用单一长连接,所以调用都不会阻在IO上,而是阻在Future超时wait上。
2. server端的线程模型是什么样的?
这个比较成熟了,现在一般的server都是基于nio,一批io thread负责处理io,一批worker thread负责处理业务。

 

 一. 快速启动

学习dubbo最好的方式是快速运行起来,由于dubbo还是比较重量级的产品,之前遇到一些问题。
server端:

 

1import java.io.IOException; 2 3import com.alibaba.dubbo.config.ApplicationConfig; 4import com.alibaba.dubbo.config.ProtocolConfig; 5import com.alibaba.dubbo.config.ServiceConfig; 6import com.duitang.dboss.client.test.BlogQueryService; 7import com.duitang.dboss.client.test.BlogQueryServiceImpl; 8 9public class DubboServerTester { 10 11 public static void main(String[] args) throws IOException { 12 BlogQueryService blogQueryService = new BlogQueryServiceImpl(); 13 ApplicationConfig application = new ApplicationConfig(); 14 application.setName("dubbo-test"); 15 16 ProtocolConfig protocol = new ProtocolConfig(); 17 protocol.setName("dubbo"); 18 protocol.setPort(8989); 19 protocol.setThreads(200); 20 21 // RegistryConfig registry = new RegistryConfig(); 22 // registry.setAddress("10.20.130.230:9090"); 23 // registry.setUsername("aaa"); 24 // registry.setPassword("bbb"); 25 26 ServiceConfig<BlogQueryService> service = new ServiceConfig<BlogQueryService>(); // 此实例很重,封装了与注册中心的连接,请自行缓存,否则可能造成内存和连接泄漏 27 service.setApplication(application); 28 29 // service.setRegistry(registry); 30 service.setRegister(false); 31 service.setProtocol(protocol); // 多个协议可以用setProtocols() 32 service.setInterface(BlogQueryService.class); 33 service.setRef(blogQueryService); 34 service.setVersion("1.0.0"); 35 // 暴露及注册服务 36 service.export(); 37 38 System.out.println("Press any key to exit."); 39 System.in.read(); 40 } 41} 42
1  注意:dubbo export服务默认依赖于RegistryConfig,如果没有配置RegistryConfig会报错.可以通过service.setRegister(false)禁用。 2

 

client:

 

1import java.io.IOException; 2import java.util.ArrayList; 3import java.util.List; 4import java.util.concurrent.Callable; 5import java.util.concurrent.ExecutionException; 6import java.util.concurrent.ExecutorService; 7import java.util.concurrent.Executors; 8import java.util.concurrent.Future; 9import java.util.concurrent.ThreadFactory; 10import java.util.concurrent.atomic.AtomicInteger; 11 12import com.alibaba.dubbo.config.ApplicationConfig; 13import com.alibaba.dubbo.config.ReferenceConfig; 14import com.duitang.dboss.client.test.BlogQueryService; 15 16public class DubboClientTester { 17 18 public static void main(String[] args) throws InterruptedException, IOException { 19 ApplicationConfig application = new ApplicationConfig(); 20 application.setName("dubbo-test"); 21 22 ReferenceConfig<BlogQueryService> reference = new ReferenceConfig<BlogQueryService>(); 23 reference.setUrl("dubbo://127.0.0.1:8989/com.duitang.dboss.client.test.BlogQueryService"); 24 reference.setTimeout(500); 25 reference.setConnections(10); 26 reference.setApplication(application); 27 reference.setInterface(BlogQueryService.class); 28 reference.setVersion("1.0.0"); 29 final BlogQueryService blogQueryService = reference.get(); 30 31 long begin = System.currentTimeMillis(); 32 System.out.println(blogQueryService.test()); 33 long end = System.currentTimeMillis(); 34 System.out.println(" cost:" + (end - begin)); 35 36 ExecutorService es = Executors.newFixedThreadPool(50, new NamedThreadFactory("my test")); 37 List<Callable<String>> tasks = new ArrayList<Callable<String>>(); 38 for (int i = 0; i < 100000; ++i) { 39 tasks.add(new Callable<String>() { 40 41 @Override 42 public String call() throws Exception { 43 System.out.println("run"); 44 System.out.println(blogQueryService.test()); 45 System.out.println("run success"); 46 return null; 47 } 48 }); 49 } 50 List<Future<String>> futurelist = es.invokeAll(tasks); 51 for (Future<String> future : futurelist) { 52 try { 53 String result = future.get(); 54 } catch (ExecutionException e) { 55 e.printStackTrace(); 56 } 57 System.out.println("------------------------------------------------------------------------------------------------------------------------------------------------\r\n"); 58 } 59 es.shutdown(); 60 System.out.println("end"); 61 System.in.read(); 62 } 63 64 static class NamedThreadFactory implements ThreadFactory { 65 66 private static final AtomicInteger POOL_SEQ = new AtomicInteger(1); 67 68 private final AtomicInteger mThreadNum = new AtomicInteger(1); 69 70 private final String mPrefix; 71 72 private final boolean mDaemo; 73 74 private final ThreadGroup mGroup; 75 76 public NamedThreadFactory(){ 77 this("pool-" + POOL_SEQ.getAndIncrement(), false); 78 } 79 80 public NamedThreadFactory(String prefix){ 81 this(prefix, false); 82 } 83 84 public NamedThreadFactory(String prefix, boolean daemo){ 85 mPrefix = prefix + "-thread-"; 86 mDaemo = daemo; 87 SecurityManager s = System.getSecurityManager(); 88 mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup(); 89 } 90 91 public Thread newThread(Runnable runnable) { 92 String name = mPrefix + mThreadNum.getAndIncrement(); 93 Thread ret = new Thread(mGroup, runnable, name, 0); 94 ret.setDaemon(mDaemo); 95 return ret; 96 } 97 98 public ThreadGroup getThreadGroup() { 99 return mGroup; 100 } 101 102 } 103} 104
1   2

 

  1. 通过setUrl("")来实现远程服务直连。
  2. 需要注意的是默认connection只有一个,可以通过setConnections()来指定connection pool。在高负载环境下,nio的单连接也会遇到瓶颈,此时你可以通过设置连接池来让更多的连接分担dubbo的请求负载,从而提高系统的吞吐量。”

二. 代码流程
这里重点分析一下client的调用过程,client调用分为三个部分:
1). 初始化,建立连接。
2). 发送请求。
3). 等待远程应答。
(一).初始化

  1. DubboProtocol.initClient()
  2. Exchangers.connect(URL url, ExchangeHandler handler)   
  3. Exchangers.getExchanger(url).connect(url, handler)
  4. HeaderExchanger.connect(URL url, ExchangeHandler handler)
  5. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
  6. Transporters.getTransporter().connect(URL url, ChannelHandler handler)
  7. NettyTransporter.connect(URL url, ChannelHandler listener)
  8. new NettyClient(url, listener) //timeout默认值:timeout=1000;connectTimeout=3000;
  9. NettyClient.doOpen()        //创建netty的ClientBootstrap

bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout()); //注意:此timeout是timeout,而非connectTimeout
10. AbstractClient.connect()
11. NettyClient.doConnect()  //如果远程地址无法连接,抛出timeout异常流程结束。
ChannelFuture future = bootstrap.connect(getConnectAddress());
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
(二).发送请求
1.DubboInvoker.doInvoke(Invocation invocation) //currentClient.request(invocation, timeout).get()
2.HeaderExchangeClient.request(invocation, timeout)
3.HeaderExchangeChannel.request(Invocation invocation,timeout)
4.AbstractPeer.send(Request request)
5.NettyChannel.send(Object message, boolean sent)
6.NioClientSocketChannel.write(message)
7.NettyHandler.writeRequested(ChannelHandlerContext ctx, MessageEvent e)
8.AbstractPeer.sent(Channel ch, Request request)
(三).等待远程应答
在调用DubboInvoker.doInvoke(Invocation invocation)中实际是调用currentClient.request(invocation, timeout).get(),此方法会返回DefaultFuture,调用get方法会阻塞直到超时,在阻塞的同时netty的io线程会接收到远程应答,如果收到响应会产生io事件调用NettyHandler.messageReceived。
1.NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
2.AbstractPeer.received(Channel ch, Object msg)
3.MultiMessageHandler.received(Channel channel, Object message) 
4.AllChannelHandler.received(Channel channel, Object message)
5.DecodeHandler.received(Channel channel, Object message)
6.HeaderExchangeHandler.received(Channel channel, Object message)
7.DefaultFuture.received(Channel channel, Response response)  //注意是static方法
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
}
三. dubbo client的核心

我认为dubbo client的核心在DefaultFuture。所以远程调用都不会阻在IO上,而是阻在Future超时wait上,下面忽略掉远程调用把future抽取出来。


下面是代码实现

1package executor; 2 3import java.util.concurrent.Callable; 4import java.util.concurrent.ExecutionException; 5import java.util.concurrent.ExecutorService; 6import java.util.concurrent.Executors; 7import java.util.concurrent.Future; 8import java.util.concurrent.atomic.AtomicLong; 9 10public class Commands { 11 12 private ExecutorService senders = Executors.newCachedThreadPool(); 13 private ExecutorService receviers = Executors.newCachedThreadPool(); 14 private AtomicLong counter = new AtomicLong(); 15 16 public CommandResponse execute(Callable<Object> task, int timeout) { 17 Future<Object> result = senders.submit(task); 18 long id = counter.getAndIncrement(); 19 CommandFuture commandFuture = new CommandFuture(id); 20 receviers.submit(new ReceiveWorker(id, result)); 21 return commandFuture.get(timeout); 22 } 23 24 static class ReceiveWorker implements Runnable { 25 26 private Future<Object> result; 27 private Long id; 28 29 public ReceiveWorker(Long id, Future<Object> result){ 30 super(); 31 this.result = result; 32 this.id = id; 33 } 34 35 @Override 36 public void run() { 37 try { 38 Object obj = result.get(); 39 CommandFuture.received(new CommandResponse(id, obj)); 40 } catch (InterruptedException e) { 41 e.printStackTrace(); 42 } catch (ExecutionException e) { 43 e.printStackTrace(); 44 } 45 } 46 } 47 48 public void shutdown() { 49 senders.shutdown(); 50 receviers.shutdown(); 51 } 52} 53 54

 

1package executor; 2 3import java.util.Map; 4import java.util.concurrent.ConcurrentHashMap; 5import java.util.concurrent.TimeUnit; 6import java.util.concurrent.locks.Condition; 7import java.util.concurrent.locks.Lock; 8import java.util.concurrent.locks.ReentrantLock; 9 10public class CommandFuture { 11 12 private final Lock lock = new ReentrantLock(); 13 14 private final Condition done = lock.newCondition(); 15 16 private CommandResponse response; 17 18 private static final Map<Long, CommandFuture> FUTURES = new ConcurrentHashMap<Long, CommandFuture>(); 19 20 21 public CommandFuture(Long id){ 22 FUTURES.put(id, this); 23 } 24 25 public boolean isDone() { 26 return response != null; 27 } 28 29 public CommandResponse get(int timeout) { 30 31 if (!isDone()) { 32 long start = System.currentTimeMillis(); 33 lock.lock(); 34 try { 35 while (!isDone()) { 36 done.await(timeout, TimeUnit.MILLISECONDS); 37 if (isDone() || System.currentTimeMillis() - start >= timeout) { 38 break; 39 } 40 } 41 } catch (InterruptedException e) { 42 throw new RuntimeException(e); 43 } finally { 44 lock.unlock(); 45 } 46 if (!isDone()) { 47 throw new TimeoutException("timeout"); 48 } 49 } 50 return response; 51 } 52 53 public void doReceived(CommandResponse response) { 54 lock.lock(); 55 try { 56 this.response = response; 57 if (done != null) { 58 done.signal(); 59 } 60 } finally { 61 lock.unlock(); 62 } 63 64 } 65 66 public static void received(CommandResponse response) { 67 try { 68 CommandFuture future = FUTURES.remove(response.getId()); 69 if (future != null) { 70 future.doReceived(response); 71 } else { 72 System.out.println("some error!"); 73 } 74 } finally { 75 // CHANNELS.remove(response.getId()); 76 } 77 } 78} 79 80

 

1package executor; 2 3import java.util.concurrent.Callable; 4import java.util.concurrent.ExecutionException; 5import java.util.concurrent.ExecutorService; 6import java.util.concurrent.Executors; 7import java.util.concurrent.Future; 8import java.util.concurrent.atomic.AtomicLong; 9 10public class Commands { 11 12 private ExecutorService senders = Executors.newCachedThreadPool(); 13 private ExecutorService receviers = Executors.newCachedThreadPool(); 14 private AtomicLong counter = new AtomicLong(); 15 16 public CommandResponse execute(Callable<Object> task, int timeout) { 17 Future<Object> result = senders.submit(task); 18 long id = counter.getAndIncrement(); 19 CommandFuture commandFuture = new CommandFuture(id); 20 receviers.submit(new ReceiveWorker(id, result)); 21 return commandFuture.get(timeout); 22 } 23 24 static class ReceiveWorker implements Runnable { 25 26 private Future<Object> result; 27 private Long id; 28 29 public ReceiveWorker(Long id, Future<Object> result){ 30 super(); 31 this.result = result; 32 this.id = id; 33 } 34 35 @Override 36 public void run() { 37 try { 38 Object obj = result.get(); 39 CommandFuture.received(new CommandResponse(id, obj)); 40 } catch (InterruptedException e) { 41 e.printStackTrace(); 42 } catch (ExecutionException e) { 43 e.printStackTrace(); 44 } 45 } 46 } 47 48 public void shutdown() { 49 senders.shutdown(); 50 receviers.shutdown(); 51 } 52} 53 54

 

下面是jstack

代码交流 2021