Dubbo 源码分析 - 服务调用过程 (10)

默认配置下,Dubbo 使用 all 派发策略,即将所有的消息都派发到线程池中。下面我们来分析一下 AllChannelHandler 的代码。

public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } /** 处理连接事件 */ @Override public void connected(Channel channel) throws RemotingException { // 获取线程池 ExecutorService cexecutor = getExecutorService(); try { // 将连接事件派发到线程池中处理 cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException(..., " error when process connected event .", t); } } /** 处理断开事件 */ @Override public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException(..., "error when process disconnected event .", t); } } /** 处理请求和响应消息,这里的 message 变量类型可能是 Request,也可能是 Response */ @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { // 将请求和响应消息派发到线程池中处理 cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; // 如果调用方式为双向调用,此时将 Server side ... threadpool is exhausted // 错误信息封装到 Response 中,并返回给服务消费方。 if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); // 返回包含错误信息的 Response 对象 channel.send(response); return; } } throw new ExecutionException(..., " error when process received event .", t); } } /** 处理异常信息 */ @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException(..., "error when process caught event ..."); } } }

如上,请求对象会被封装 ChannelEventRunnable 中,ChannelEventRunnable 将会是服务调用过程的新起点。所以接下来我们以 ChannelEventRunnable 为起点向下探索。

2.3.2.2 调用服务

本小节,我们从 ChannelEventRunnable 开始分析,该类的主要代码如下:

public class ChannelEventRunnable implements Runnable { private final ChannelHandler handler; private final Channel channel; private final ChannelState state; private final Throwable exception; private final Object message; @Override public void run() { // 检测消息类型是否为请求或响应 if (state == ChannelState.RECEIVED) { try { // 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用 handler.received(channel, message); } catch (Exception e) { logger.warn("... operation error, channel is ... message is ..."); } } // 其他消息类型通过 switch 进行处理 else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("... operation error, channel is ..."); } break; case DISCONNECTED: // ... case SENT: // ... case CAUGHT: // ... default: logger.warn("unknown state: " + state + ", message is " + message); } } } }

如上,请求和响应消息出现频率明显比其他类型消息高,所以这里对该类型的消息进行了针对性判断。ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler。

public class DecodeHandler extends AbstractChannelHandlerDelegate { public DecodeHandler(ChannelHandler handler) { super(handler); } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { // 对 Decodeable 接口实现类对象进行解码 decode(message); } if (message instanceof Request) { // 对 Request 的 data 字段进行解码 decode(((Request) message).getData()); } if (message instanceof Response) { // 对 Request 的 result 字段进行解码 decode(((Response) message).getResult()); } // 执行后续逻辑 handler.received(channel, message); } private void decode(Object message) { // Decodeable 接口目前有两个实现类, // 分别为 DecodeableRpcInvocation 和 DecodeableRpcResult if (message != null && message instanceof Decodeable) { try { // 执行解码逻辑 ((Decodeable) message).decode(); } catch (Throwable e) { if (log.isWarnEnabled()) { log.warn("Call Decodeable.decode failed: " + e.getMessage(), e); } } } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppyfd.html