本篇文章所分析的源码版本为 2.6.4,该版本下的 Response 支持 attachments 集合,所以上面仅对部分 case 分支进行了注释。其他 case 分支的逻辑比被注释分支的逻辑更为简单,所以这里就忽略了。我们所使用的测试服务接口 DemoService 包含了一个具有返回值的方法,所以正常调用下,线程会进入 RESPONSE_VALUE_WITH_ATTACHMENTS 分支中。线程首先会从 invocation 变量(大家探索一下 invocation 变量的由来)中获取返回值类型,接着对调用结果进行反序列化,并将序列化后的结果存储起来。最后对 attachments 集合进行反序列化,并存到指定字段中。到此,关于响应数据的解码过程就分析完了。接下来,我们再来探索一下响应对象 Response 的去向。
2.5.2 向用户线程传递调用结果响应数据解码完成后,Dubbo 会将响应对象派发到线程池上。要注意的是,线程池中的线程并非用户的调用线程,所以要想办法将响应对象从线程池线程传递到用户线程上。我们在 2.1 节分析过用户线程在发送完请求后的动作,即调用 DefaultFuture 的 get 方法等待响应对象的到来。当响应对象到来后,用户线程会被唤醒,并通过调用编号获取属于自己的响应对象。下面我们就来看一下整个过程。
public class HeaderExchangeHandler implements ChannelHandlerDelegate { @Override public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // 处理请求,前面已分析过,省略 } else if (message instanceof Response) { // 处理响应 handleResponse(channel, (Response) message); } else if (message instanceof String) { // telnet 相关,忽略 } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { // 继续向下调用 DefaultFuture.received(channel, response); } } } public class DefaultFuture implements ResponseFuture { private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private volatile Response response; public static void received(Channel channel, Response response) { try { // 根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象 DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { // 继续向下调用 future.doReceived(response); } else { logger.warn("The timeout response finally returned at ..."); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { lock.lock(); try { // 保存响应对象 response = res; if (done != null) { // 唤醒用户线程 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } } }以上逻辑是将响应对象保存到相应的 DefaultFuture 实例中,然后再唤醒用户线程,随后用户线程即可从 DefaultFuture 实例中获取到相应结果。
本篇文章在多个地方都强调过调用编号很重要,但一直没有解释原因,这里简单说明一下。一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用不同 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错。答案是通过调用编号。DefaultFuture 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中取出调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图:
3. 总结本篇文章主要对 Dubbo 中的几种服务调用方式,以及从双向通信的角度对整个通信过程进行了详细的分析。按照通信顺序,通信过程包括服务消费方发送请求,服务提供方接收请求,服务提供方返回响应数据,服务消费方接收响应数据等过程。理解这些过程需要大家对网络编程,尤其是 Netty 有一定的了解。限于篇幅原因,本篇文章无法将服务调用的所有内容都进行分析。对于本篇文章未讲到或未详细分析的内容,比如服务降级、过滤器链、以及序列化等。对于这些内容,大家若感兴趣,可自行进行分析。并将分析整理成文,分享给社区。