Fork me on GitHub
0%

Dubbo 远程调用链路源码分析

首先声明,由于 Dubbo 目前一直在不断迭代更新中,尤其对于一些比较老的版本实现方式可能完全是不一样的,就算新的版本之间也有一些细微的差别,而我下面的源码分析用的是 Dubbo 官方的消费端发起请求调用服务端接口的一个例子来进行源码分析,而且是基于 Dubbo 3.2.0-beta.4 的版本。

服务端定义和实现

这里用的是 Dubbo 官方最简单的一个 dubbo 接口请求的例子,服务端接口定义如下:

1
2
3
public interface GreetingsService {
String sayHi(String name);
}

服务端接口实现如下:

1
2
3
4
5
6
public class GreetingsServiceImpl implements GreetingsService {
@Override
public String sayHi(String name) {
return "hi, " + name;
}
}

服务端接口暴露

服务端接口定义好之后,需要将接口注册到注册中心暴露出去,这样消费端就可以通过注册中心获取到服务端的接口定义和请求地址,接口注册到 zookeeper 并启动服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Application {
private static final String ZOOKEEPER_HOST = System.getProperty("zookeeper.address", "127.0.0.1");
private static final String ZOOKEEPER_PORT = System.getProperty("zookeeper.port", "2181");
private static final String ZOOKEEPER_ADDRESS = "zookeeper://" + ZOOKEEPER_HOST + ":" + ZOOKEEPER_PORT;

public static void main(String[] args) {
// 配置服务端接口
ServiceConfig<GreetingsService> service = new ServiceConfig<>();
service.setInterface(GreetingsService.class);
service.setRef(new GreetingsServiceImpl());
// 暴露及注册当前服务
DubboBootstrap.getInstance()
.application("first-dubbo-provider")
.registry(new RegistryConfig(ZOOKEEPER_ADDRESS))
.protocol(new ProtocolConfig("dubbo", -1))
.service(service)
.start()
.await();
}
}

消费端发起调用

服务端将自身接口定义信息注册到 zookeeper,消费端同样需要连接 zookeeper 获取到相关定义信息,然后发起远程请求调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Application {
private static final String ZOOKEEPER_HOST = System.getProperty("zookeeper.address", "127.0.0.1");
private static final String ZOOKEEPER_PORT = System.getProperty("zookeeper.port", "2181");
private static final String ZOOKEEPER_ADDRESS = "zookeeper://" + ZOOKEEPER_HOST + ":" + ZOOKEEPER_PORT;

public static void main(String[] args) throws IOException {
// 引用远程服务端接口,此实例很重,封装了与注册中心的连接以及与提供者的连接,需自行缓存,否则可能造成内存和连接泄漏
ReferenceConfig<GreetingsService> reference = new ReferenceConfig<>();
reference.setInterface(GreetingsService.class);

// 连接注册中心配置
DubboBootstrap.getInstance()
.application("first-dubbo-consumer")
.registry(new RegistryConfig(ZOOKEEPER_ADDRESS))
.reference(reference)
.start();

// 消费端创建服务端接口代理对象,让使用者看起来像是调用本地方法一样
// 该代理对象封装了所有和服务端通讯的细节,对象较重,需缓存复用
GreetingsService service = reference.get();
String message = service.sayHi("dubbo");
System.out.println("Receive result ======> " + message);
System.in.read();
System.exit(0);
}
}

首先启动服务端,然后再启动消费端,正常的话就能在消费端的控制台打印如下信息:

1
Receive result ======> hi, dubbo

上面就是一次完整的 RPC 请求,看起来似乎挺简单的,代码量也很少,但 dubbo 框架底层可是为我们做了大量的工作,我们可以尝试跟着消费端的请求链路一直到服务端处理请求,再到消费端接收响应,看看整个链路到底是怎样的。

消费端请求链路源码跟踪

dubbo_consumer_proxy_obj

从上面断点处可以看到从 reference 里面获取到服务端对象是个代理对象,代理对象代理的是 InvokerInvocationHandler 属性对象,具体可以跟踪下 reference.get 方法,最后就是到 JavassistProxyFactory 类里面的 getProxy 方法,创建代理对象的代码如下:

1
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));

InvokerInvocationHandler 类里面只有一个 invoke 方法,先是做了一些特殊方法的校验,若是直接本地执行后返回,否则组装对应的请求参数,调用 InvocationUtil.invoke 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# InvokerInvocationHandler.invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 校验是否是一些特殊方法,若是,直接执行
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
// 组装请求调用参数,
RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);

if (serviceModel instanceof ConsumerModel) {
rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
}
// 调用 InvocationUtil.invoke 方法,底层异步调用返回
return InvocationUtil.invoke(invoker, rpcInvocation);
}

到了 InvocationUtil.invoke 方法里面则是调用 invoker.invoke 方法,而 invoker 从上面可以看到是 MigrationInvoker 对象,也就是调用 MigrationInvoker 对象里面的 invoke 方法,里面主要是做了些校验然后继续调用 MockClusterInvoker.invoke 方法,再是 AbstractCluster$ClusterFilterInvoker.invoke,然后到 FilterChainBuilder.CopyOfFilterChainNode.invoke 方法,在 FilterChainBuilder 里面将会执行一系列的 Filter 链,这个后面单独再讲,这次主要是先跟着源码整体走一遍,等执行完这些 Filter 链后继续走到 FailoverClusterInvoker.doInvoke 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# FailoverClusterInvoker.doInvoke
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 计算方法调用失败时重试次数,默认重试 2 次,总共执行 3 次
int len = calculateInvokeTimes(methodName);
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// 在这里进行循环调用
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getServiceContext().setInvokers((List) invoked);
boolean success = false;
try {
// 带上 Context 调用 DubboInvoker.invoke 方法
Result result = invokeWithContext(invoker, invocation);
// 请求成功,立即退出循环返回,否则继续循环调用
success = true;
return result;
} catch (RpcException e) {
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
if (!success) {
providers.add(invoker.getUrl().getAddress());
}
}
}
throw new RpcException();
}

从上面可以看到 Dubbo 里面的重试逻辑,通过获取方法上配置的重试次数进行相应的 for 循环调用,成功立即退出循环返回,for 循环里面调用的是 DubboInvoker.invoke 方法:DubboInvoker 继承了 AbstractInvoker 类,调用的是 AbstractInvoker 的 invoke 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#  AbstractInvoker.invoke
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
if (isDestroyed()) {
logger.warn(PROTOCOL_FAILED_REQUEST, "", "", "Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}

RpcInvocation invocation = (RpcInvocation) inv;

// 准备远程调用的一些参数
prepareInvocation(invocation);

// 开始异步执行远程调用,返回得到异步的结果
AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);

// 这里要注意,针对上一步的返回结果,判断是哪种调用方式,决定是否需要阻塞等待执行结果
waitForResultIfSync(asyncResult, invocation);

return asyncResult;
}
# DubboInvoker.doInvoke
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);

ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

// 计算本次调用的超时时间
int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, methodName, DEFAULT_TIMEOUT);
if (timeout <= 0) {
// 发起调用前就已经超时了,直接返回超时调用终止异常
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
+ invocation.getMethodName() + ", terminate directly."), invocation);
}

invocation.setAttachment(TIMEOUT_KEY, String.valueOf(timeout));

Integer payload = getUrl().getParameter(PAYLOAD, Integer.class);
// 组装请求传输参数
Request request = new Request();
if (payload != null) {
request.setPayload(payload);
}
request.setData(inv);
request.setVersion(Version.getProtocolVersion());

if (isOneway) {
// 单向请求,也就是不关心请求结果的调用,发出请求后立即返回
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
request.setTwoWay(false);
currentClient.send(request, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 双向请求,关心请求结果的调用,发出请求后需接收响应结果
request.setTwoWay(true);
// 构建一个回调 ExecutorService 来处理响应结果
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 请求 ReferenceCountExchangeClient.request-> ReferenceCountExchangeClient.request
// -> HeaderExchangeClient.request -> HeaderExchangeChannel.request
// 注意这里返回的是 CompletableFuture 对象,用的是 concurrent 包下面的一个对象
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
}
}
# DubboInvoker.getCallbackExecutor
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
// 如果是同步调用每次创建 ThreadlessExecutor 返回,后面通过这个线程池来处理响应结果
return new ThreadlessExecutor(sharedExecutor);
} else {
return sharedExecutor;
}
}

这里做的逻辑就更多一些了,上面我列了一些,可能很多没有跟进去执行,比如上面计算超时时间,构建回调 ExecutorService 里面的逻辑都比较多,这里先把主流程熟悉,最后调用 currentClient.request 方法,请求的是 ReferenceCountExchangeClient.request-> ReferenceCountExchangeClient.request -> HeaderExchangeClient.request -> HeaderExchangeChannel.request,最后到 HeaderExchangeChannel.request 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# HeaderExchangeChannel.request
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
Request req;
if (request instanceof Request) {
req = (Request) request;
} else {
// create request.
req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
}
// 构建 DefaultFuture 对象,设置响应结果处理线程池,同时配置响应超时检测
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
// 底层通过集成 netty 和服务端进行通讯,将消费端要请求服务端的方法以及参数传递到服务端
// 这里可以看出来消费端是异步请求服务端的,将请求消息发出去之后立马就返回了,只不过返回的是 CompletableFuture 对象
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

创建 DefaultFuture 对象也会做一些关联处理,以便收到服务端响应的时候能找到对应的请求进行匹配:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# DefaultFuture
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
// future 保存当前回调处理的线程池
future.setExecutor(executor);
// 如果是 ThredlessExecutor 类型,也就是同步调用时,将 future 关联上当前线程池
if (executor instanceof ThreadlessExecutor) {
((ThreadlessExecutor) executor).setWaitingFuture(future);
}
// 进行超时检测
timeoutCheck(future);
return future;
}
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// 放入等待响应的 Future Map 集合,响应回来的时候通过这个可以设置请求结果
FUTURES.put(id, this);
// 放入等待响应 Channel 集合,优雅关闭的时候通过这个 Map 来判断是否还有等待响应的请求
CHANNELS.put(id, channel);
}

从上面的流程可以看到消费端在把请求发送给服务端后立马就返回了,返回的是一个 Future,Future 里面包含了当前回调处理的线程池以及超时检测任务,而并没有一直在等服务端的执行结果,这个过程是异步执行的,然后在下一步 waitForResultIfSync 里面来决定什么时候取服务端返回的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#  AbstractInvoker.invoke
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
if (InvokeMode.SYNC != invocation.getInvokeMode()) {
// 如果不是同步调用,直接返回,不需要等待结果
return;
}
try {
/*
* NOTICE!
* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
*/
Object timeout = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
if (timeout instanceof Integer) {
// 配置了超时时间,阻塞一定时间等待结果,超时则抛出异常,在下面进行捕获对应异常抛出
asyncResult.get((Integer) timeout, TimeUnit.MILLISECONDS);
} else {
// 没有配置超时时间,无限等待阻塞等待结果,一般不会无限等待,有默认的超时时间
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (ExecutionException e) {
}
}
# AsyncRpcResult.get
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 获取刚才设置的回调处理线程池,同步调用则是调用 threadlessExecutor 里面的 waitAndDrain 来阻塞等待结果
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
threadlessExecutor.waitAndDrain();
}
// 否则直接通过 CompletableFuture 里面的 get 方法阻塞等结果
return responseFuture.get(timeout, unit);
}

如果不是同步调用则直接返回,不需要阻塞等待执行结果,同样直接返回给业务代码一个 Future,业务代码拿到 Future 后自行决定在什么地方拿执行结果,或者也可以直接绑定回调;如果是同步调用的话根据超时时间配置来阻塞等待一定时间等待响应结果,具体阻塞方式根据调用方式的不同阻塞的方式也不同,同步调用采用的是 ThreadlessExecutor 线程池来阻塞,否则是 CompletableFuture 里面的 get 方法阻塞等结果。这里可能得先熟悉下 ThreadlessExecutor 这个类:

1.正常线程池的作用是管理池中的线程和调度线程执行,而 ThreadlessExecutor 线程池则并不管理任何线程

2.通过 execute 方法提交给这个执行器的任务并不会被安排线程执行,而正常的线程池是会进行执行调度的,对于 ThreadlessExecutor 线程池来说提交的任务直接存储在阻塞队列中,只有调用 waitAndDrain 方法时才会真正执行,而且执行任务的线程正是调用 waitAndDrain 方法的线程

通过查看 ThreadlessExecutor 类中的 waitAndDrain 和 execute 方法就能知道该类是如何实现上面的特性了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
// 通常,#waitAndDrain 方法只会被调用一次。它第一次阻止响应,一旦响应结果到达,正在执行 waitAndDrain 等待的线程将会立即返回,然后整个请求过程然后完成。
// 对 #waitAndDrain 方法的后续调用(如果有的话)都会立即返回
public void waitAndDrain() throws InterruptedException {
if (isFinished()) {
// 如果任务状态已经完成,直接返回不做任何处理
return;
}

Runnable runnable;
try {

// 从阻塞队列里面取要执行的任务,如果队列为空则阻塞等待,主要通过 LinkedBlockingQueue 里面的 ReentrantLock 来实现阻塞
// 等待直到有任务,则继续往下执行任务和所有排队的任务(如果有的话)
runnable = queue.take();
} catch (InterruptedException e) {
setWaiting(false);
throw e;
}
// 获取对象锁,设置状态,开始执行任务,注意这里并没有再开线程执行,而是直接由调用 waitAndDrain 方法的线程来执行
synchronized (lock) {
setWaiting(false);
runnable.run();
}
// 如果队列里面还有任务则继续执行
runnable = queue.poll();
while (runnable != null) {
runnable.run();
runnable = queue.poll();
}
// 任务状态设置已完成
setFinished(true);
}

public void execute(Runnable runnable) {
runnable = new RunnableWrapper(runnable);
// 获取对象锁,判断调用线程是否还在等待回调任务,如果是则将该任务加入阻塞队列,然后上面 waitAndDrain 方法里面的阻塞结束,拿到任务后往下执行
// 否则,直接在当前线程中执行完成,这也间接说明了当 waitAndDrain 被调用结束,已经将 waiting 状态设置为 true 后,就不会再往队列里面添加任务
synchronized (lock) {
if (!isWaiting()) {
runnable.run();
return;
}
queue.add(runnable);
}
}

综上也就能看出,如果是同步调用的话,消费端在将请求信息发送给服务端后,通过调用 ThreadlessExecutor 类里面的 waitAndDrain 进行阻塞等待服务端的响应结果。那么后续消费端在接收到服务端的响应结果后必然会调用 execute 方法来唤醒阻塞等待结果的地方继续往下执行。

到这里消费端的请求流程就算是结束了,然后就是服务端收到请求执行相应方法然后响应返回给消费端,消费端这里拿到结果之后继续往下走,返回给业务代码相应的执行结果,下面看看服务端收到消费端的请求后的流程。

服务端处理请求链路源码跟踪

首先,Dubbo 底层集成的改造后 Netty 通讯框架,端点收到消息后将数据解析出来,通过 AllChannelHandler.received 方法来处理,首先获取一个线程池,这里要注意的是如果作为服务端收到的是请求消息,则是获取共享线程池中的线程来进行处理,如果是作为消费端收到的是响应消息,那就是获取在请求时创建的回调处理线程池。

拿到线程池后将消息封装成 ChannelEventRunnable 任务提交到线程池中执行,在 ChannelEventRunnable 线程中处理各种连接和断开连接以及请求响应的消息,再通过 Handler 来根据不同消息做不同的逻辑处理,这里对应的是 HeaderExchangeHandler 处理器,服务端收到的是请求消息,对应的也就是请求消息处理逻辑,后面消费端接收响应的代码逻辑也是在这里,只不过对应的是响应消息,走的是响应消息的处理分支。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# AllChannelHandler.received
public void received(Channel channel, Object message) throws RemotingException {
// 获取线程池处理消息,如果是请求消息则从共享线程池中获取,但如果是响应消息的话这里返回的就是在请求时创建的回调处理线程池
ExecutorService executor = getPreferredExecutorService(message);
try {
// 将消息封装成 ChannelEventRunnable 任务提交到线程池
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
# ChannelEventRunnable.received
public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
if (message instanceof Request) {
// 作为服务端接收到请求消息,执行请求消息处理逻辑
Request request = (Request) message;
if (request.isEvent()) {
// 事件处理
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
// 双向请求,需要返回结果,调用 DubboProtocol.reply 方法得到一个异步结果,绑定回调,结果出来后立马将结果发送给消费端
// reply 方法里面紧接着调用 FilterChainBuilder.CallbackRegistrationInvoker.invoke 方法,
// 在里面根据请求的方法和参数异步执行真正的方法,同时绑定一系列 Filter 过滤器回调,异步结果出来后立马执行
handleRequest(exchangeChannel, request);
} else {
// 单向请求,不需要返回结果
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
// 作为消费端收到服务端的调用响应,执行响应消息处理逻辑
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(TRANSPORT_UNSUPPORTED_MESSAGE, "", "", e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (StringUtils.isNotEmpty(echo)) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
}

这里我们先看作为服务端收到请求消息的处理链路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# ChannelEventRunnable.handleRequest
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
// 构建响应消息
Response res = new Response(req.getId(), req.getVersion());
// find handler by message class.
Object msg = req.getData();
try {
// 调用 DubboProtocol 里面 reply 方法得到异步 Future
CompletionStage<Object> future = handler.reply(channel, msg);
// 绑定异步完成回调,将执行结果发送给消费端
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 发送响应结果给消费端
channel.send(res);
} catch (RemotingException e) {
logger.warn(TRANSPORT_FAILED_RESPONSE, "", "", "Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}

从这里可以看到服务端在收到消费的请求消息后,调用相应 DubboProtocol 的 reply 方法得到异步 Future,然后在 Future 上绑定一个执行完成的回调,执行完成后将结果发送给消费端,说明服务端也是异步去执行消费端的请求方法逻辑,等到执行完成后异步返回给消费端。而在 DubboProtocol.reply 方法里面调用的是FilterChainBuilder.CallbackRegistrationInvoker.invoke 方法,紧接着请求 FilterChainBuilder.CopyOfFilterChainNode.invoke 方法,在里面执行一系列的Filter 链,然后通过反射执行最终要执行的方法,当然执行结果也是异步返回的,返回的是一个异步 Future,然后同样在 Future 上绑定一系列的回调过滤器。

消费端接收响应链路源码跟踪

如刚才所说 Dubbo 底层集成的是改造后 Netty 通讯框架,同样消费端收到响应消息后将数据解析出来,然后通过 AllChannelHandler.received 方法来处理,获取线程池时由于是作为消费端接收的是响应消息,那么这里拿到的线程池就是发起请求时创建的回调处理线程池,而且如果是同步调用的话返回的就是 ThreadlessExecutor,然后调用 ThreadlessExecutor 的 execute 方法将消息封装成 ChannelEventRunnable 任务加入到阻塞队列,然后刚才请求时阻塞地方被唤醒,拿到提交 ChannelEventRunnable 任务开始往下执行,在 ChannelEventRunnable 里面判断是响应消息,执行响应消息处理逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# AllChannelHandler.getPreferredExecutorService
public ExecutorService getPreferredExecutorService(Object msg) {
if (msg instanceof Response) {
// 收到的是响应消息
Response response = (Response) msg;
DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
// 根据响应消息id获取对应请求时在 DefaultFuture 里面通过 Map 关联的 future,
// 当然这里可能由于超时等原因 future 已经被移除了,然后通过共享线程池来处理
if (responseFuture == null) {
return getSharedExecutorService();
} else {
// 通过 future 获取请求时设置的回调线程池,如果是同步调用的话返回的是 ThreadlessExecutor,
// 然后将消息封装成 ChannelEventRunnable 任务提交给它去执行设置返回结果,之前阻塞的地方阻塞结束,拿到执行结果后继续往下执行
ExecutorService executor = responseFuture.getExecutor();
if (executor == null || executor.isShutdown()) {
executor = getSharedExecutorService(msg);
}
return executor;
}
} else {
return getSharedExecutorService(msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# HeaderExchangeHandler.handleResponse
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}

# DefaultFuture.received
public static void received(Channel channel, Response response) {
received(channel, response, false);
}

# DefaultFuture.received
public static void received(Channel channel, Response response, boolean timeout) {
try {
// 通过请求响应id映射请求时的 future
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// 如果收到响应结果后还没有超时,将超时检测任务取消
t.cancel();
}
future.doReceived(response);
} else {
// 没有匹配到说明服务端执行可能已经超时了,消费端已经返回,不再处理超时的响应结果
logger.warn(PROTOCOL_TIMEOUT_SERVER, "", "", "The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response status is " + response.getStatus()
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
}
} finally {
CHANNELS.remove(response.getId());
}
}
# DefaultFuture.doReceived
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
// 响应成功,将 future 状态设置已完成并设置结果
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}

// 如果回调处理线程池是 ThreadlessExecutor,可能由于某种原因 ThreadlessExecutor 还在等待结果,需要通知该线程池结果已经返回
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
if (threadlessExecutor.isWaiting()) {
threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
" which is not an expected state, interrupt the thread manually by returning an exception."));
}
}
}

在处理响应之前需先找到当前响应是对应哪个请求,通过请求和响应 id 进行匹配,将当时保存在 Map 中的映射关系移出,如果没有匹配上,说明服务端响应超时,则不在处理该响应结果,打印出告警日志,如果能匹配上说明还没有超时,继续往下执行,判断超时检测任务是否还存在,如果在则将超时检测任务取消, 获取到请求时的 future 将状态设置为已完成并设置响应结果,这样刚才阻塞那里的第二步就能够获取到执行结果了:

1
2
3
4
5
6
7
8
9
10
11
# AsyncRpcResult.get
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 获取刚才设置的回调处理线程池,同步调用则是调用 threadlessExecutor 里面的 waitAndDrain 来阻塞等待结果
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
// 阻塞等待响应任务入队列
threadlessExecutor.waitAndDrain();
}
// 通过 CompletableFuture 里面的 get 获取响应结果
return responseFuture.get(timeout, unit);
}

到这里从消费端发起请求,服务端接收请求,处理请求并返回执行结果给消费端,再到消费端收到响应结果整个链路的分析就结束了,主要就是消费端异步发送请求到服务端,然后根据调用方式来决定是否需要阻塞获取结果,上面主要分析了同步调用的方式,同步调用是采用 ThreadlessExecutor 这个类来阻塞获取执行结果,通过 threadlessExecutor.waitAndDrain() 方法来阻塞等待,然后当服务端执行结果返回给消费端后,消费端则根据响应 id 找到对应的请求信息,然后找到发起请求时设置的 threadlessExecutor,再调用 execute 方法来唤醒阻塞等待的地方,结合 CompletableFuture 拿到执行结果后交给业务逻辑代码处理。

上面的流程还是涉及到比较多的东西的,这里只是把 Dubbo 请求的整体主链路走通了,还有很多细节的地方没有深入的分析,比如说 Dubbo 的负载均衡,过滤器链,异步调用流程处理,线程池模型等等,这些等后续再针对具体的功能来进行分析研究了。下面再列一下 Dubbo 里面整体主链路涉及的一些类和方法:

consumer:

1
2
3
4
5
6
7
8
request:
InvokerInvocationHandler.invoke->RpcInvocation->InvocationUtil.invoke->MigrationInvoker.invoke
->MockClusterInvoker.invoke->AbstractCluster$ClusterFilterInvoker.invoke->FilterChainBuilder.CopyOfFilterChainNode.invoke(执行一系列的Filter链)
->FailoverClusterInvoker.doInvoke->DubboInvoker.doInvoke->ReferenceCountExchangeClient.request->HeaderExchangeClient.request->AsyncRpcResult.get

response:
AllChannelHandler.received->AllChannelHandler.getPreferredExecutorService->ChannelEventRunnable.run
->HeaderExchangeHandler.received->HeaderExchangeHandler.handleResponse->DefaultFuture.received->DefaultFuture.doReceived

provider:

1
2
3
AllChannelHandler.received->AllChannelHandler.getPreferredExecutorService->ChannelEventRunnable.run
->HeaderExchangeHandler.received->HeaderExchangeHandler.handleRequest->DubboProtocol.reply->FilterChainBuilder.CallbackRegistrationInvoker.invoke
->FilterChainBuilder.CopyOfFilterChainNode.invoke(执行一系列的Filter链)->绑定一系列回调过滤器->future.whenComplete
 wechat
扫描上面图中二维码关注微信公众号