DecodeHandler 主要是包含了一些解码逻辑。2.2.1 节分析请求解码时说过,请求解码可在 IO 线程上执行,也可在线程池中执行,这个取决于运行时配置。DecodeHandler 存在的意义就是保证请求或响应对象可在线程池中被解码。解码完毕后,完全解码后的 Request 对象会继续向后传递,下一站是 HeaderExchangeHandler。
public class HeaderExchangeHandler implements ChannelHandlerDelegate { private final ExchangeHandler handler; public HeaderExchangeHandler(ExchangeHandler handler) { if (handler == null) { throw new IllegalArgumentException("handler == null"); } this.handler = handler; } @Override public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { Request request = (Request) message; if (request.isEvent()) { // 处理事件 handlerEvent(channel, request); } // 处理普通的请求 else { // 双向通信 if (request.isTwoWay()) { // 向后调用服务,并得到调用结果 Response response = handleRequest(exchangeChannel, request); // 将调用结果返回给服务消费端 channel.send(response); } // 如果是单向通信,仅向后执行指定服务即可,无需返回执行结果 else { handler.received(exchangeChannel, request.getData()); } } } // 处理响应,服务消费方会执行此处逻辑,后面分析 else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { // telnet 相关,忽略 } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); // 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应 if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) msg = null; else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); else msg = data.toString(); res.setErrorMessage("Fail to decode request due to: " + msg); // 设置 BAD_REQUEST 状态 res.setStatus(Response.BAD_REQUEST); return res; } // 获取 data,也就是 RpcInvocation 对象 Object msg = req.getData(); try { // 继续向下调用 Object result = handler.reply(channel, msg); // 设置 OK 状态码 res.setStatus(Response.OK); // 设置调用结果 res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; } }到这里,我们看到了比较清晰的请求和响应逻辑。对于双向通信,HeaderExchangeHandler 首先向后进行调用,得到调用结果。然后将调用结果封装到 Response 对象中,最后再将该对象返回给服务消费方。如果请求不合法,或者调用失败,则将错误信息封装到 Response 对象中,并返回给服务消费方。接下来我们继续向后分析,把剩余的调用过程分析完。下面分析定义在 DubboProtocol 类中的匿名类对象逻辑,如下:
public class DubboProtocol extends AbstractProtocol { public static final String NAME = "dubbo"; private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; // 获取 Invoker 实例 Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { // 回调相关,忽略 } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); // 通过 Invoker 调用具体的服务 return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: ..."); } // 忽略其他方法 } Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { // 忽略回调和本地存根相关逻辑 // ... int port = channel.getLocalAddress().getPort(); // 计算 service key,格式为 groupName/serviceName:serviceVersion:port。比如: // dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880 String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); // 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象, // 服务导出过程中会将 <serviceKey, DubboExporter> 映射关系存储到 exporterMap 集合中 DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null) throw new RemotingException(channel, "Not found exported service ..."); // 获取 Invoker 对象,并返回 return exporter.getInvoker(); } // 忽略其他方法 }