首先声明,由于 Dubbo 目前一直在不断迭代更新中,尤其对于一些比较老的版本实现方式可能完全是不一样的,就算新的版本之间也有一些细微的差别,而我下面的源码分析用的是 Dubbo 官方的消费端发起请求调用服务端接口的一个例子来进行源码分析,而且是基于 Dubbo 3.2.0-beta.4 的版本。
服务端定义和实现 这里用的是 Dubbo 官方最简单的一个 dubbo 接口请求的例子,服务端接口定义如下:
1 2 3 public interface GreetingsService { String sayHi (String name) ; }
服务端接口实现如下:
1 2 3 4 5 6 public class GreetingsServiceImpl implements GreetingsService { @Override public String sayHi (String name) { return "hi, " + name; } }
服务端接口暴露 服务端接口定义好之后,需要将接口注册到注册中心暴露出去,这样消费端就可以通过注册中心获取到服务端的接口定义和请求地址,接口注册到 zookeeper 并启动服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class Application { private static final String ZOOKEEPER_HOST = System.getProperty("zookeeper.address" , "127.0.0.1" ); private static final String ZOOKEEPER_PORT = System.getProperty("zookeeper.port" , "2181" ); private static final String ZOOKEEPER_ADDRESS = "zookeeper://" + ZOOKEEPER_HOST + ":" + ZOOKEEPER_PORT; public static void main (String[] args) { ServiceConfig<GreetingsService> service = new ServiceConfig<>(); service.setInterface(GreetingsService.class); service.setRef(new GreetingsServiceImpl()); DubboBootstrap.getInstance() .application("first-dubbo-provider" ) .registry(new RegistryConfig(ZOOKEEPER_ADDRESS)) .protocol(new ProtocolConfig("dubbo" , -1 )) .service(service) .start() .await(); } }
消费端发起调用 服务端将自身接口定义信息注册到 zookeeper,消费端同样需要连接 zookeeper 获取到相关定义信息,然后发起远程请求调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class Application { private static final String ZOOKEEPER_HOST = System.getProperty("zookeeper.address" , "127.0.0.1" ); private static final String ZOOKEEPER_PORT = System.getProperty("zookeeper.port" , "2181" ); private static final String ZOOKEEPER_ADDRESS = "zookeeper://" + ZOOKEEPER_HOST + ":" + ZOOKEEPER_PORT; public static void main (String[] args) throws IOException { ReferenceConfig<GreetingsService> reference = new ReferenceConfig<>(); reference.setInterface(GreetingsService.class); DubboBootstrap.getInstance() .application("first-dubbo-consumer" ) .registry(new RegistryConfig(ZOOKEEPER_ADDRESS)) .reference(reference) .start(); GreetingsService service = reference.get(); String message = service.sayHi("dubbo" ); System.out.println("Receive result ======> " + message); System.in.read(); System.exit(0 ); } }
首先启动服务端,然后再启动消费端,正常的话就能在消费端的控制台打印如下信息:
1 Receive result ======> hi, dubbo
上面就是一次完整的 RPC 请求,看起来似乎挺简单的,代码量也很少,但 dubbo 框架底层可是为我们做了大量的工作,我们可以尝试跟着消费端的请求链路一直到服务端处理请求,再到消费端接收响应,看看整个链路到底是怎样的。
消费端请求链路源码跟踪
从上面断点处可以看到从 reference 里面获取到服务端对象是个代理对象,代理对象代理的是 InvokerInvocationHandler 属性对象,具体可以跟踪下 reference.get
方法,最后就是到 JavassistProxyFactory
类里面的 getProxy
方法,创建代理对象的代码如下:
1 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
而 InvokerInvocationHandler
类里面只有一个 invoke
方法,先是做了一些特殊方法的校验,若是直接本地执行后返回,否则组装对应的请求参数,调用 InvocationUtil.invoke 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 # InvokerInvocationHandler.invoke public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0 ) { if ("toString" .equals(methodName)) { return invoker.toString(); } else if ("$destroy" .equals(methodName)) { invoker.destroy(); return null ; } else if ("hashCode" .equals(methodName)) { return invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals" .equals(methodName)) { return invoker.equals(args[0 ]); } RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args); if (serviceModel instanceof ConsumerModel) { rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel); rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method)); } return InvocationUtil.invoke(invoker, rpcInvocation); }
到了 InvocationUtil.invoke 方法里面则是调用 invoker.invoke 方法,而 invoker 从上面可以看到是 MigrationInvoker 对象,也就是调用 MigrationInvoker 对象里面的 invoke 方法,里面主要是做了些校验然后继续调用 MockClusterInvoker.invoke
方法,再是 AbstractCluster$ClusterFilterInvoker.invoke
,然后到 FilterChainBuilder.CopyOfFilterChainNode.invoke
方法,在 FilterChainBuilder
里面将会执行一系列的 Filter 链,这个后面单独再讲,这次主要是先跟着源码整体走一遍,等执行完这些 Filter 链后继续走到 FailoverClusterInvoker.doInvoke
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 # FailoverClusterInvoker.doInvoke public Result doInvoke (Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyInvokers = invokers; checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); int len = calculateInvokeTimes(methodName); RpcException le = null ; List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); Set<String> providers = new HashSet<String>(len); for (int i = 0 ; i < len; i++) { if (i > 0 ) { checkWhetherDestroyed(); copyInvokers = list(invocation); checkInvokers(copyInvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getServiceContext().setInvokers((List) invoked); boolean success = false ; try { Result result = invokeWithContext(invoker, invocation); success = true ; return result; } catch (RpcException e) { } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { if (!success) { providers.add(invoker.getUrl().getAddress()); } } } throw new RpcException(); }
从上面可以看到 Dubbo 里面的重试逻辑,通过获取方法上配置的重试次数进行相应的 for 循环调用,成功立即退出循环返回,for 循环里面调用的是 DubboInvoker.invoke
方法:DubboInvoker 继承了 AbstractInvoker 类,调用的是 AbstractInvoker 的 invoke 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 # AbstractInvoker.invoke public Result invoke (Invocation inv) throws RpcException { if (isDestroyed()) { logger.warn(PROTOCOL_FAILED_REQUEST, "" , "" , "Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer" ); } RpcInvocation invocation = (RpcInvocation) inv; prepareInvocation(invocation); AsyncRpcResult asyncResult = doInvokeAndReturn(invocation); waitForResultIfSync(asyncResult, invocation); return asyncResult; } # DubboInvoker.doInvoke protected Result doInvoke (final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1 ) { currentClient = clients[0 ]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, methodName, DEFAULT_TIMEOUT); if (timeout <= 0 ) { return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE, "No time left for making the following call: " + invocation.getServiceName() + "." + invocation.getMethodName() + ", terminate directly." ), invocation); } invocation.setAttachment(TIMEOUT_KEY, String.valueOf(timeout)); Integer payload = getUrl().getParameter(PAYLOAD, Integer.class); Request request = new Request(); if (payload != null ) { request.setPayload(payload); } request.setData(inv); request.setVersion(Version.getProtocolVersion()); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false ); request.setTwoWay(false ); currentClient.send(request, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { request.setTwoWay(true ); ExecutorService executor = getCallbackExecutor(getUrl(), inv); CompletableFuture<AppResponse> appResponseFuture = currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast); FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { } } # DubboInvoker.getCallbackExecutor protected ExecutorService getCallbackExecutor (URL url, Invocation inv) { ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url); if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) { return new ThreadlessExecutor(sharedExecutor); } else { return sharedExecutor; } }
这里做的逻辑就更多一些了,上面我列了一些,可能很多没有跟进去执行,比如上面计算超时时间,构建回调 ExecutorService 里面的逻辑都比较多,这里先把主流程熟悉,最后调用 currentClient.request 方法,请求的是 ReferenceCountExchangeClient.request-> ReferenceCountExchangeClient.request -> HeaderExchangeClient.request -> HeaderExchangeChannel.request
,最后到 HeaderExchangeChannel.request
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 # HeaderExchangeChannel.request public CompletableFuture<Object> request (Object request, int timeout, ExecutorService executor) throws RemotingException { if (closed) { throw new RemotingException(this .getLocalAddress(), null , "Failed to send request " + request + ", cause: The channel " + this + " is closed!" ); } Request req; if (request instanceof Request) { req = (Request) request; } else { req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true ); req.setData(request); } DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
创建 DefaultFuture 对象也会做一些关联处理,以便收到服务端响应的时候能找到对应的请求进行匹配:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 # DefaultFuture public static DefaultFuture newFuture (Channel channel, Request request, int timeout, ExecutorService executor) { final DefaultFuture future = new DefaultFuture(channel, request, timeout); future.setExecutor(executor); if (executor instanceof ThreadlessExecutor) { ((ThreadlessExecutor) executor).setWaitingFuture(future); } timeoutCheck(future); return future; } private DefaultFuture (Channel channel, Request request, int timeout) { this .channel = channel; this .request = request; this .id = request.getId(); this .timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); FUTURES.put(id, this ); CHANNELS.put(id, channel); }
从上面的流程可以看到消费端在把请求发送给服务端后立马就返回了,返回的是一个 Future,Future 里面包含了当前回调处理的线程池以及超时检测任务,而并没有一直在等服务端的执行结果,这个过程是异步执行的,然后在下一步 waitForResultIfSync 里面来决定什么时候取服务端返回的结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 # AbstractInvoker.invoke private void waitForResultIfSync (AsyncRpcResult asyncResult, RpcInvocation invocation) { if (InvokeMode.SYNC != invocation.getInvokeMode()) { return ; } try { Object timeout = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY); if (timeout instanceof Integer) { asyncResult.get((Integer) timeout, TimeUnit.MILLISECONDS); } else { asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } catch (ExecutionException e) { } } # AsyncRpcResult.get public Result get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; threadlessExecutor.waitAndDrain(); } return responseFuture.get(timeout, unit); }
如果不是同步调用则直接返回,不需要阻塞等待执行结果,同样直接返回给业务代码一个 Future,业务代码拿到 Future 后自行决定在什么地方拿执行结果,或者也可以直接绑定回调;如果是同步调用的话根据超时时间配置来阻塞等待一定时间等待响应结果,具体阻塞方式根据调用方式的不同阻塞的方式也不同,同步调用采用的是 ThreadlessExecutor 线程池来阻塞,否则是 CompletableFuture 里面的 get 方法阻塞等结果。这里可能得先熟悉下 ThreadlessExecutor 这个类:
1.正常线程池的作用是管理池中的线程和调度线程执行,而 ThreadlessExecutor 线程池则并不管理任何线程
2.通过 execute 方法提交给这个执行器的任务并不会被安排线程执行,而正常的线程池是会进行执行调度的,对于 ThreadlessExecutor 线程池来说提交的任务直接存储在阻塞队列中,只有调用 waitAndDrain 方法时才会真正执行,而且执行任务的线程正是调用 waitAndDrain 方法的线程
通过查看 ThreadlessExecutor 类中的 waitAndDrain 和 execute 方法就能知道该类是如何实现上面的特性了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();public void waitAndDrain () throws InterruptedException { if (isFinished()) { return ; } Runnable runnable; try { runnable = queue.take(); } catch (InterruptedException e) { setWaiting(false ); throw e; } synchronized (lock) { setWaiting(false ); runnable.run(); } runnable = queue.poll(); while (runnable != null ) { runnable.run(); runnable = queue.poll(); } setFinished(true ); } public void execute (Runnable runnable) { runnable = new RunnableWrapper(runnable); synchronized (lock) { if (!isWaiting()) { runnable.run(); return ; } queue.add(runnable); } }
综上也就能看出,如果是同步调用的话,消费端在将请求信息发送给服务端后,通过调用 ThreadlessExecutor 类里面的 waitAndDrain 进行阻塞等待服务端的响应结果。那么后续消费端在接收到服务端的响应结果后必然会调用 execute 方法来唤醒阻塞等待结果的地方继续往下执行。
到这里消费端的请求流程就算是结束了,然后就是服务端收到请求执行相应方法然后响应返回给消费端,消费端这里拿到结果之后继续往下走,返回给业务代码相应的执行结果,下面看看服务端收到消费端的请求后的流程。
服务端处理请求链路源码跟踪 首先,Dubbo 底层集成的改造后 Netty 通讯框架,端点收到消息后将数据解析出来,通过 AllChannelHandler.received
方法来处理,首先获取一个线程池,这里要注意的是如果作为服务端收到的是请求消息,则是获取共享线程池中的线程来进行处理,如果是作为消费端收到的是响应消息,那就是获取在请求时创建的回调处理线程池。
拿到线程池后将消息封装成 ChannelEventRunnable 任务提交到线程池中执行,在 ChannelEventRunnable
线程中处理各种连接和断开连接以及请求响应的消息,再通过 Handler 来根据不同消息做不同的逻辑处理,这里对应的是 HeaderExchangeHandler 处理器,服务端收到的是请求消息,对应的也就是请求消息处理逻辑,后面消费端接收响应的代码逻辑也是在这里,只不过对应的是响应消息,走的是响应消息的处理分支。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 # AllChannelHandler.received public void received (Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return ; } throw new ExecutionException(message, channel, getClass() + " error when process received event ." , t); } } # ChannelEventRunnable.received public void received (Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); if (message instanceof Request) { Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(TRANSPORT_UNSUPPORTED_MESSAGE, "" , "" , e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (StringUtils.isNotEmpty(echo)) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } }
这里我们先看作为服务端收到请求消息的处理链路:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 # ChannelEventRunnable.handleRequest void handleRequest (final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); Object msg = req.getData(); try { CompletionStage<Object> future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null ) { res.setStatus(Response.OK); res.setResult(appResult); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); } catch (RemotingException e) { logger.warn(TRANSPORT_FAILED_RESPONSE, "" , "" , "Send result to consumer failed, channel is " + channel + ", msg is " + e); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }
从这里可以看到服务端在收到消费的请求消息后,调用相应 DubboProtocol 的 reply 方法得到异步 Future,然后在 Future 上绑定一个执行完成的回调,执行完成后将结果发送给消费端,说明服务端也是异步去执行消费端的请求方法逻辑,等到执行完成后异步返回给消费端。而在 DubboProtocol.reply
方法里面调用的是FilterChainBuilder.CallbackRegistrationInvoker.invoke
方法,紧接着请求 FilterChainBuilder.CopyOfFilterChainNode.invoke
方法,在里面执行一系列的Filter 链,然后通过反射执行最终要执行的方法,当然执行结果也是异步返回的,返回的是一个异步 Future,然后同样在 Future 上绑定一系列的回调过滤器。
消费端接收响应链路源码跟踪 如刚才所说 Dubbo 底层集成的是改造后 Netty 通讯框架,同样消费端收到响应消息后将数据解析出来,然后通过 AllChannelHandler.received
方法来处理,获取线程池时由于是作为消费端接收的是响应消息,那么这里拿到的线程池就是发起请求时创建的回调处理线程池,而且如果是同步调用的话返回的就是 ThreadlessExecutor,然后调用 ThreadlessExecutor 的 execute 方法将消息封装成 ChannelEventRunnable 任务加入到阻塞队列,然后刚才请求时阻塞地方被唤醒,拿到提交 ChannelEventRunnable 任务开始往下执行,在 ChannelEventRunnable 里面判断是响应消息,执行响应消息处理逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 # AllChannelHandler.getPreferredExecutorService public ExecutorService getPreferredExecutorService (Object msg) { if (msg instanceof Response) { Response response = (Response) msg; DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId()); if (responseFuture == null ) { return getSharedExecutorService(); } else { ExecutorService executor = responseFuture.getExecutor(); if (executor == null || executor.isShutdown()) { executor = getSharedExecutorService(msg); } return executor; } } else { return getSharedExecutorService(msg); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 # HeaderExchangeHandler.handleResponse static void handleResponse (Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } } # DefaultFuture.received public static void received (Channel channel, Response response) { received(channel, response, false ); } # DefaultFuture.received public static void received (Channel channel, Response response, boolean timeout) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null ) { Timeout t = future.timeoutCheckTask; if (!timeout) { t.cancel(); } future.doReceived(response); } else { logger.warn(PROTOCOL_TIMEOUT_SERVER, "" , "" , "The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS" ).format(new Date())) + ", response status is " + response.getStatus() + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result." ); } } finally { CHANNELS.remove(response.getId()); } } # DefaultFuture.doReceived private void doReceived (Response res) { if (res == null ) { throw new IllegalStateException("response cannot be null" ); } if (res.getStatus() == Response.OK) { this .complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this .completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this .completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; if (threadlessExecutor.isWaiting()) { threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" + " which is not an expected state, interrupt the thread manually by returning an exception." )); } } }
在处理响应之前需先找到当前响应是对应哪个请求,通过请求和响应 id 进行匹配,将当时保存在 Map 中的映射关系移出,如果没有匹配上,说明服务端响应超时,则不在处理该响应结果,打印出告警日志,如果能匹配上说明还没有超时,继续往下执行,判断超时检测任务是否还存在,如果在则将超时检测任务取消, 获取到请求时的 future 将状态设置为已完成并设置响应结果,这样刚才阻塞那里的第二步就能够获取到执行结果了:
1 2 3 4 5 6 7 8 9 10 11 # AsyncRpcResult.get public Result get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; threadlessExecutor.waitAndDrain(); } return responseFuture.get(timeout, unit); }
到这里从消费端发起请求,服务端接收请求,处理请求并返回执行结果给消费端,再到消费端收到响应结果整个链路的分析就结束了,主要就是消费端异步发送请求到服务端,然后根据调用方式来决定是否需要阻塞获取结果,上面主要分析了同步调用的方式,同步调用是采用 ThreadlessExecutor 这个类来阻塞获取执行结果,通过 threadlessExecutor.waitAndDrain()
方法来阻塞等待,然后当服务端执行结果返回给消费端后,消费端则根据响应 id 找到对应的请求信息,然后找到发起请求时设置的 threadlessExecutor,再调用 execute 方法来唤醒阻塞等待的地方,结合 CompletableFuture 拿到执行结果后交给业务逻辑代码处理。
上面的流程还是涉及到比较多的东西的,这里只是把 Dubbo 请求的整体主链路走通了,还有很多细节的地方没有深入的分析,比如说 Dubbo 的负载均衡,过滤器链,异步调用流程处理,线程池模型等等,这些等后续再针对具体的功能来进行分析研究了。下面再列一下 Dubbo 里面整体主链路涉及的一些类和方法:
consumer:
1 2 3 4 5 6 7 8 request: InvokerInvocationHandler.invoke->RpcInvocation->InvocationUtil.invoke->MigrationInvoker.invoke ->MockClusterInvoker.invoke->AbstractCluster$ClusterFilterInvoker.invoke->FilterChainBuilder.CopyOfFilterChainNode.invoke(执行一系列的Filter链) ->FailoverClusterInvoker.doInvoke->DubboInvoker.doInvoke->ReferenceCountExchangeClient.request->HeaderExchangeClient.request->AsyncRpcResult.get response: AllChannelHandler.received->AllChannelHandler.getPreferredExecutorService->ChannelEventRunnable.run ->HeaderExchangeHandler.received->HeaderExchangeHandler.handleResponse->DefaultFuture.received->DefaultFuture.doReceived
provider:
1 2 3 AllChannelHandler.received->AllChannelHandler.getPreferredExecutorService->ChannelEventRunnable.run ->HeaderExchangeHandler.received->HeaderExchangeHandler.handleRequest->DubboProtocol.reply->FilterChainBuilder.CallbackRegistrationInvoker.invoke ->FilterChainBuilder.CopyOfFilterChainNode.invoke(执行一系列的Filter链)->绑定一系列回调过滤器->future.whenComplete