Dubbo 源码分析 - 服务调用过程 (8)

上面方法通过检测消息头中的魔数书否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet 命令行发出的数据包。接着在对消息体长度,以及可读字节数进行检测。最后调用 decodeBody 方法进行后续的解码工作,ExchangeCodec 中实现了 decodeBody 方法,但因其子类 DubboCodec 覆写了该方法,所以在运行时 DubboCodec 中的 decodeBody 方法会被调用。下面我们来看一下该方法的代码。

public class DubboCodec extends ExchangeCodec implements Codec2 { @Override protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { // 获取消息头中的第三个字节,并通过逻辑与运算得到序列化器编号 byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // 获取调用编号 long id = Bytes.bytes2long(header, 4); // 通过逻辑与运算得到调用类型,0 - Response,1 - Request if ((flag & FLAG_REQUEST) == 0) { // 对响应结果进行解码,得到 Response 对象。这个非本节内容,后面再分析 // ... } else { // 创建 Request 对象 Request req = new Request(id); req.setVersion(Version.getProtocolVersion()); // 通过逻辑与运算得到调用方式,并设置到 Request 对象中 req.setTwoWay((flag & FLAG_TWOWAY) != 0); // 通过位运算检测数据包是否为事件类型 if ((flag & FLAG_EVENT) != 0) { // 设置心跳事件到 Request 对象中 req.setEvent(Request.HEARTBEAT_EVENT); } try { Object data; if (req.isHeartbeat()) { // 对心跳包进行解码,该方法已被标注为废弃 data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (req.isEvent()) { // 对事件数据进行解码 data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { DecodeableRpcInvocation inv; // 根据 url 参数判断是否在 IO 线程上对消息体进行解码 if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); // 在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将 // 调用方法名、attachment、以及调用参数解析出来 inv.decode(); } else { // 仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑 inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } // 设置 data 到 Request 对象中 req.setData(data); } catch (Throwable t) { // 若解码过程中出现异常,则将 broken 字段设为 true, // 并将异常对象设置到 Reqeust 对象中 req.setBroken(true); req.setData(t); } return req; } } }

如上,decodeBody 对部分字段进行了解码,并将解码得到的字段封装到 Request 中。随后会调用 DecodeableRpcInvocation 的 decode 方法进行后续的解码工作。此工作完成后,可将调用方法名、attachment、以及调用参数解析出来。下面我们来看一下 DecodeableRpcInvocation 的 decode 方法逻辑。

public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable { @Override public Object decode(Channel channel, InputStream input) throws IOException { ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); // 通过反序列化得到 dubbo version,并保存到 attachments 变量中 String dubboVersion = in.readUTF(); request.setVersion(dubboVersion); setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion); // 通过反序列化得到 path,version,并保存到 attachments 变量中 setAttachment(Constants.PATH_KEY, in.readUTF()); setAttachment(Constants.VERSION_KEY, in.readUTF()); // 通过反序列化得到调用方法名 setMethodName(in.readUTF()); try { Object[] args; Class<?>[] pts; // 通过反序列化得到参数类型字符串,比如 Ljava/lang/String; String desc = in.readUTF(); if (desc.length() == 0) { pts = DubboCodec.EMPTY_CLASS_ARRAY; args = DubboCodec.EMPTY_OBJECT_ARRAY; } else { // 将 desc 解析为参数类型数组 pts = ReflectUtils.desc2classArray(desc); args = new Object[pts.length]; for (int i = 0; i < args.length; i++) { try { // 解析运行时参数 args[i] = in.readObject(pts[i]); } catch (Exception e) { if (log.isWarnEnabled()) { log.warn("Decode argument failed: " + e.getMessage(), e); } } } } // 设置参数类型数组 setParameterTypes(pts); // 通过反序列化得到原 attachments 的内容 Map<String, String> map = (Map<String, String>) in.readObject(Map.class); if (map != null && map.size() > 0) { Map<String, String> attachment = getAttachments(); if (attachment == null) { attachment = new HashMap<String, String>(); } // 将 map 与当前对象中的 attachment 集合进行融合 attachment.putAll(map); setAttachments(attachment); } // 对 callback 类型的参数进行处理 for (int i = 0; i < args.length; i++) { args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]); } // 设置参数列表 setArguments(args); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read invocation data failed.", e)); } finally { if (in instanceof Cleanable) { ((Cleanable) in).cleanup(); } } return this; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppyfd.html