Dubbo 源码分析 - 服务调用过程 (4)

这张图展示了服务消费方发送请求动作的部分调用栈,略为复杂。从上图可以看出,经过多次调用后,才将请求数据送至 Netty NioClientSocketChannel。这样做的原因是通过 Exchange 层为系统引入 Request 和 Response 语义,这一点会在接下来的代码分析过程中会看到。那其他的就说了,下面开始进行分析。首先分析 ReferenceCountExchangeClient 的源码。

final class ReferenceCountExchangeClient implements ExchangeClient { private final URL url; private final AtomicInteger referenceCount = new AtomicInteger(0); public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) { this.client = client; // 引用计数自增 referenceCount.incrementAndGet(); this.url = client.getUrl(); // ... } @Override public ResponseFuture request(Object request) throws RemotingException { // 直接调用被装饰对象的同签名方法 return client.request(request); } @Override public ResponseFuture request(Object request, int timeout) throws RemotingException { // 直接调用被装饰对象的同签名方法 return client.request(request, timeout); } /** 引用计数自增,该方法由外部调用 */ public void incrementAndGetCount() { // referenceCount 自增 referenceCount.incrementAndGet(); } @Override public void close(int timeout) { // referenceCount 自减 if (referenceCount.decrementAndGet() <= 0) { if (timeout == 0) { client.close(); } else { client.close(timeout); } client = replaceWithLazyClient(); } } // 省略部分方法 }

ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。所以这里就不多说了,继续向下分析,这次是 HeaderExchangeClient。

public class HeaderExchangeClient implements ExchangeClient { private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true)); private final Client client; private final ExchangeChannel channel; private ScheduledFuture<?> heartbeatTimer; private int heartbeat; private int heartbeatTimeout; public HeaderExchangeClient(Client client, boolean needHeartbeat) { if (client == null) { throw new IllegalArgumentException("client == null"); } this.client = client; // 创建 HeaderExchangeChannel 对象 this.channel = new HeaderExchangeChannel(client); // 以下代码均与心跳检测逻辑有关 String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } if (needHeartbeat) { // 开启心跳检测定时器 startHeartbeatTimer(); } } @Override public ResponseFuture request(Object request) throws RemotingException { // 直接 HeaderExchangeChannel 对象的同签名方法 return channel.request(request); } @Override public ResponseFuture request(Object request, int timeout) throws RemotingException { // 直接 HeaderExchangeChannel 对象的同签名方法 return channel.request(request, timeout); } @Override public void close() { doClose(); channel.close(); } private void doClose() { // 停止心跳检测定时器 stopHeartbeatTimer(); } private void startHeartbeatTimer() { stopHeartbeatTimer(); if (heartbeat > 0) { heartbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels() { return Collections.<Channel>singletonList(HeaderExchangeClient.this); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } } private void stopHeartbeatTimer() { if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) { try { heartbeatTimer.cancel(true); scheduled.purge(); } catch (Throwable e) { if (logger.isWarnEnabled()) { logger.warn(e.getMessage(), e); } } } heartbeatTimer = null; } // 省略部分方法 }

HeaderExchangeClient 中很多方法只有一行代码,即调用 HeaderExchangeChannel 对象的同签名方法。那 HeaderExchangeClient 有什么用处呢?答案是封装了一些关于心跳检测的逻辑。心跳检测并非本文所关注的点,因此就不多说了,继续向下看。

final class HeaderExchangeChannel implements ExchangeChannel { private final Channel channel; HeaderExchangeChannel(Channel channel) { if (channel == null) { throw new IllegalArgumentException("channel == null"); } // 这里的 channel 指向的是 NettyClient this.channel = channel; } @Override public ResponseFuture request(Object request) throws RemotingException { return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); } @Override public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(..., "Failed to send request ...); } // 创建 Request 对象 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); // 设置双向调用标志为 true req.setTwoWay(true); // 这里的 request 变量类型为 RpcInvocation req.setData(request); // 创建 DefaultFuture 对象 DefaultFuture future = new DefaultFuture(channel, req, timeout); try { // 调用 NettyClient 的 send 方法发送请求 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } // 返回 DefaultFuture 对象 return future; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppyfd.html