以上逻辑用于获取与指定服务对应的 Invoker 实例,并通过 Invoker 的 invoke 方法调用服务逻辑。invoke 方法定义在 AbstractProxyInvoker 中,下面我们看一下。
public abstract class AbstractProxyInvoker<T> implements Invoker<T> { @Override public Result invoke(Invocation invocation) throws RpcException { try { // 调用 doInvoke 执行后续的调用,并将调用结果封装到 RpcResult 中,并 return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); } catch (InvocationTargetException e) { return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method ..."); } } }如上,doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory 创建的,创建逻辑如下:
public class JavassistProxyFactory extends AbstractProxyFactory { // 省略其他方法 @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 创建匿名类对象 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 调用 invokeMethod 方法进行后续的调用 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用消息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。
/** Wrapper0 是在运行时生成的,大家可使用 Arthas 进行反编译 */ public class Wrapper0 extends Wrapper implements ClassGenerator.DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; // 省略其他方法 public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException { DemoService demoService; try { // 对参数进行类型转换 demoService = (DemoService)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } try { // 根据方法名调用指定的方法 if ("sayHello".equals(string) && arrclass.length == 1) { return demoService.sayHello((String)arrobject[0]); } } catch (Throwable throwable) { throw new InvocationTargetException(throwable); } throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString()); } }到这里,整个服务调用过程就分析完了。最后把调用过程贴出来,如下:
ChannelEventRunnable#run() —> DecodeHandler#received(Channel, Object) —> HeaderExchangeHandler#received(Channel, Object) —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request) —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object) —> Filter#invoke(Invoker, Invocation) —> AbstractProxyInvoker#invoke(Invocation) —> Wrapper#invokeMethod(Object, String, Class[], Object[]) —> DemoServiceImpl#sayHello(String) 2.4 服务提供方响应请求服务提供方调用指定服务后,会将调用结果封装到 Response 对象中,并将该对象返回给服务消费方。服务提供方也是通过 NettyChannel 的 send 方法将 Response 对象返回,这个方法在 2.2.1 节分析过,这里就不在重复分析了。本节我们仅需关注 Response 对象的编码过程即可,这里仍然省略一些中间调用,直接分析具体的编码逻辑。
public class ExchangeCodec extends TelnetCodec { public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { // 对响应对象进行编码 encodeResponse(channel, buffer, (Response) msg); } else { super.encode(channel, buffer, msg); } } protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException { int savedWriteIndex = buffer.writerIndex(); try { Serialization serialization = getSerialization(channel); // 创建消息头字节数组 byte[] header = new byte[HEADER_LENGTH]; // 设置魔数 Bytes.short2bytes(MAGIC, header); // 设置序列化器编号 header[2] = serialization.getContentTypeId(); if (res.isHeartbeat()) header[2] |= FLAG_EVENT; // 获取响应状态 byte status = res.getStatus(); // 设置响应状态 header[3] = status; // 设置请求编号 Bytes.long2bytes(res.getId(), header, 4); // 更新 writerIndex,为消息头预留 16 个字节的空间 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (status == Response.OK) { if (res.isHeartbeat()) { // 对心跳响应结果进行序列化,已废弃 encodeHeartbeatData(channel, out, res.getResult()); } else { // 对调用结果进行序列化 encodeResponseData(channel, out, res.getResult(), res.getVersion()); } } else { // 对错误信息进行序列化 out.writeUTF(res.getErrorMessage()) }; out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); // 获取写入的字节数,也就是消息体长度 int len = bos.writtenBytes(); checkPayload(channel, len); // 将消息体长度写入到消息头中 Bytes.int2bytes(len, header, 12); // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备 buffer.writerIndex(savedWriteIndex); // 从 savedWriteIndex 下标处写入消息头 buffer.writeBytes(header); // 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } catch (Throwable t) { // 异常处理逻辑不是很难理解,但是代码略多,这里忽略了 } } } public class DubboCodec extends ExchangeCodec implements Codec2 { protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { Result result = (Result) data; // 检测当前协议版本是否支持带有 attachments 集合的 Response 对象 boolean attach = Version.isSupportResponseAttachment(version); Throwable th = result.getException(); // 异常信息为空 if (th == null) { Object ret = result.getValue(); // 调用结果为空 if (ret == null) { // 序列化响应类型 out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE); } // 调用结果非空 else { // 序列化响应类型 out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE); // 序列化调用结果 out.writeObject(ret); } } // 异常信息非空 else { // 序列化响应类型 out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION); // 序列化异常对象 out.writeObject(th); } if (attach) { // 记录 Dubbo 协议版本 result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); // 序列化 attachments 集合 out.writeObject(result.getAttachments()); } } }