203 private static class Invoker implements InvocationHandler {
果然没有令我们失望,这与前面的分析完全吻合。208 public Invoker(Class<? extends VersionedProtocol> protocol, 209 InetSocketAddress address, UserGroupInformation ticket, 210 Configuration conf, SocketFactory factory, 211 int rpcTimeout) throws IOException { 212 this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, 213 ticket, rpcTimeout, conf); 214 this.client = CLIENTS.getClient(conf, factory); 215 }
看一下Invoke构造函数主要完成的工作,初始化一个 Client.ConnectionId remoteId;,可以看出这个对象主要封装了服务器地址,代理接口的类型以及连接服务器时用到的一些参数。另外一个关键的对象是client,就是同感client对象与NN进行交互。前边也提到在使用RPC对象调用相关函数的时,其实进行真正共的是InvocationHandler中的invoke方法。那么我们必须来看一个这个invoke方法的实现了,其中最核心的代码如下
225 ObjectWritable value = (ObjectWritable) 226 client.call(new Invocation(method, args), remoteId);
代码很简短,主要就是调用了client的call方法。其中比较感兴趣的参数应该是new Invocation(method, args)这个Invocation的对象,其实从字面意思我们就可以理解到这个对象封装是具体调用的方法和参数(这些信息需要传递到服务器端,执行服务器端相应的方法)。其实这个Invocation类实现了Writable接口,熟悉源代码的童鞋应该知道这是Hadoop序列化和反序列话的一个接口,作用就明显了,那就是方便把这些信息以序列化的方式传递到服务器端。下边我们需要看一下在client的call方法中发生了那些剧情,核心代码如下:
1043 public Writable call(Writable param, ConnectionId remoteId) 1044 throws InterruptedException, IOException { 1045 Call call = new Call(param); 1046 Connection connection = getConnection(remoteId, call); 1047 connection.sendParam(call); // send the parameter 1048 boolean interrupted = false; 1049 synchronized (call) { 1050 while (!call.done) { 1051 try { 1052 call.wait(); // wait for the result 1053 } catch (InterruptedException ie) { 1054 // save the fact that we were interrupted 1055 interrupted = true; 1056 } 1057 }