非阻塞通信下的同步API实现原理

  Netty在Java NIO领域基本算是独占鳌头,涉及到高性能网络通信,基本都会以Netty为底层通信框架,Dubbo 也不例外。以下将以Dubbo实现为例介绍其是如何在NIO非阻塞通信基础上实现同步通信的。

Dubbo为一种RPC通信框架,提供进程间的通信,在使用dubbo协议+Netty作为传输层时,提供三种API调用方式:

同步接口

异步带回调接口

异步不带回调接口

同步接口适用在大部分环境,通信方式简单、可靠,客户端发起调用,等待服务端处理,调用结果同步返回。这种方式下,在高吞吐、高性能(响应时间很快)的服务接口场景中最为适用,可以减少异步带来的额外的消耗,也方便客户端做一致性保证。

非阻塞通信下的同步API实现原理

异步带回调接口,用在任务处理时间较长,客户端应用线程不愿阻塞等待,而是为了提高自身处理能力希望服务端处理完成后可以异步通知应用线程。这种方式可以大大提升客户端的吞吐量,避免因为服务端的耗时问题拖死客户端。

非阻塞通信下的同步API实现原理

异步不带回调接口,一些场景为了进一步提升客户端的吞吐能力,只需发起一次服务端调用,不需关系调用结果,可以使用此种通信方式。一般在不需要严格保证数据一致性或者有其他补偿措施的情况下,选用这种,可以最小化远程调用带来的性能损耗。

非阻塞通信下的同步API实现原理

   

来看一下Dubbo是如何实现这三种API的。核心代码在com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker,如下图对应的位置,属于协议层的实现部分。为方便大家可以准确定位代码所在位置,使用截图的方式,而不是直接贴代码了。

非阻塞通信下的同步API实现原理

上文描述的是三种API方式,Dubbo里面通过参数isOneway、isAsync来控制,isOneway=true表示异步不带回调,isAsync=true表示异步带回调,否则是同步API。具体是如何控制,看以下代码:

非阻塞通信下的同步API实现原理

isOneway==true时,客户端send完请求后,直接return一个空结果的RpcResult;isAsync==true时,客户端发起请求,设置一个ResponseFuture,直接return一个空结果的RpcResult,接下来当服务端处理完成,客户端Netty层在收到响应后会通过Future通知应用线程;最后是同步情况下,客户端发起请求,并通过get()方法阻塞等待服务端的响应结果。

异步API情况下,结合NIO模型比较好理解是如何实现的(当然需要先了解NIO的reactor模型),接下来重点理解下,这个get()阻塞方法是如何做到基于非阻塞NIO实现同步阻塞效果。

直接进入get()方法内部。

非阻塞通信下的同步API实现原理

可以看到是利用Java的锁机制实现,循环判断是否收到响应,如果收到或者等待超时则返回。done的实例对象如下:

private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition();

使用可重入锁ReentrantLock,获取一个Condition对象在其上做await操作。这里有await操作,何时被唤醒呢,有两个条件,第一个是等待timeout超时,默认dubbo是1s,第二个就是被其他线程唤醒,即收到了服务端的响应。

非阻塞通信下的同步API实现原理

signal信号一发出,上文循环检测内的await操作会立即返回,下一次isDone判断会变成true,直接跳出循环。

仔细看代码会发现,被唤醒的地方还有一个是在DefaultFuture内部有一个超时轮询检测的线程,这个线程主要是处理响应超时后触发资源回收、记录异常日志等操作。   

private static class RemotingInvocationTimeoutScan implements Runnable {

public void run() {
            while (true) {
                try {
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                            // handle response.
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    logger.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/47d7a973e919d82b0333c779ffb8fd48.html