Dubbo 源码分析 - 服务调用过程 (3)

上面的代码包含了 Dubbo 对同步和异步调用的处理逻辑,搞懂了上面的代码,会对 Dubbo 的同步和异步调用方式有更深入的了解。Dubbo 实现同步和异步调用比较关键的一点就在于由谁调用 ResponseFuture 的 get 方法。同步调用模式下,由框架自身调用 ResponseFuture 的 get 方法。异步调用模式下,则由用户调用该方法。ResponseFuture 是一个接口,下面我们来看一下它的默认实现类 DefaultFuture 的源码。

public class DefaultFuture implements ResponseFuture { private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); private final long id; private final Channel channel; private final Request request; private final int timeout; private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private volatile Response response; public DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; // 获取请求 id,这个 id 很重要,后面还会见到 this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 存储 <requestId, DefaultFuture> 键值对到 FUTURES 中 FUTURES.put(id, this); CHANNELS.put(id, channel); } @Override public Object get() throws RemotingException { return get(timeout); } @Override public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { // 循环检测服务提供方是否成功返回了调用结果 while (!isDone()) { // 如果调用结果尚未返回,这里等待一段时间 done.await(timeout, TimeUnit.MILLISECONDS); // 如果调用结果成功返回,或等待超时,此时跳出 while 循环,执行后续的逻辑 if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } // 如果调用结果仍未返回,则抛出超时异常 if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } // 返回调用结果 return returnFromResponse(); } @Override public boolean isDone() { // 通过检测 response 字段为空与否,判断是否收到了调用结果 return response != null; } private Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } // 如果调用结果的状态为 Response.OK,则表示调用过程正常,服务提供方成功返回了调用结果 if (res.getStatus() == Response.OK) { return res.getResult(); } // 抛出异常 if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage()); } // 省略其他方法 }

如上,当服务消费者还未接收到调用结果时,用户线程调用 get 方法会被阻塞住。同步调用模式下,框架获得 DefaultFuture 对象后,会立即调用 get 方法进行等待。而异步模式下则是将该对象封装到 FutureAdapter 实例中,并将 FutureAdapter 实例设置到 RpcContext 中,供用户使用。FutureAdapter 是一个适配器,用于将 Dubbo 中的 ResponseFuture 与 JDK 中的 Future 进行适配。这样当用户线程调用 Future 的 get 方法时,经过 FutureAdapter 适配后,最终会调用 ResponseFuture 实现类对象的 get 方法,也就是 DefaultFuture 的 get 方法。

到这里关于 Dubbo 几种调用方式的代码逻辑就分析完了,下面来分析请求数据的发送与接收,以及响应数据的发送与接收过程。

2.2 服务消费方发送请求 2.2.1 发送请求

本节我们来看一下同步调用模式下,服务消费方是如何发送调用请求的。在深入分析源码前,我们先来看一张图。

Dubbo 源码分析 - 服务调用过程

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppyfd.html