Dubbo 源码分析 - 服务调用过程 (13)

以上就是 Response 对象编码的过程,和前面分析的 Request 对象编码过程很相似。如果大家能看 Request 对象的编码逻辑,那么这里的代码逻辑也不难理解,这里就不多说了。接下来我们再来分析双向通信的最后一环 —— 服务消费方接收调用结果。

2.5 服务消费方接收调用结果

服务消费方再收到响应数据后,第一件要做的事情是对响应数据进行解码,得到 Response 对象。然后再将该对象传给下一个入站处理器,这个入站处理器就是 NettyHandler。接下来 NettyHandler 会将这个对象继续向下传递,最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。这个过程和服务提供方接收请求的过程是一样的,因此这里就不重复分析了。本节我们重点分析两个方面的内容,一是响应数据的解码过程,二是 Dubbo 如何将调用结果传递给用户线程的。下面先来分析响应数据解码过程。

2.5.1 响应数据解码

响应数据解码逻辑主要的逻辑封装在 DubboCodec 中,我们直接分析这个类的代码。如下:

public class DubboCodec extends ExchangeCodec implements Codec2 { @Override protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // 获取请求编号 long id = Bytes.bytes2long(header, 4); // 检测消息类型,若下面的条件成立,表明消息类型为 Response if ((flag & FLAG_REQUEST) == 0) { // 创建 Response 对象 Response res = new Response(id); // 检测事件标志位 if ((flag & FLAG_EVENT) != 0) { // 设置心跳事件 res.setEvent(Response.HEARTBEAT_EVENT); } // 获取响应状态 byte status = header[3]; // 设置响应状态 res.setStatus(status); // 如果响应状态为 OK,表明调用过程正常 if (status == Response.OK) { try { Object data; if (res.isHeartbeat()) { // 反序列化心跳数据,已废弃 data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (res.isEvent()) { // 反序列化事件数据 data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { DecodeableRpcResult result; // 根据 url 参数决定是否在 IO 线程上执行解码逻辑 if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { // 创建 DecodeableRpcResult 对象 result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); // 进行后续的解码工作 result.decode(); } else { // 创建 DecodeableRpcResult 对象 result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } // 设置 DecodeableRpcResult 对象到 Response 对象中 res.setResult(data); } catch (Throwable t) { // 解码过程中出现了错误,此时设置 CLIENT_ERROR 状态码到 Response 对象中 res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } } // 响应状态非 OK,表明调用过程出现了异常 else { // 反序列化异常信息,并设置到 Response 对象中 res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); } return res; } else { // 对请求数据进行解码,前面已分析过,此处忽略 } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppyfd.html