ok,开始说说今天的主题Hadoop的RPC机制,之所以在HDFS源码解析的系列中添加这部分的内容,是因为DN和NN交互使用的就是RPC的机制,而RPC机制这部分代码年前也是比较深入的研究过,但是是模仿RPC的机制进行分布式检索的实现。
开始先介绍一下RPC几个主要的组成类RPC.java、Client.java、Server.java,其中RPC类主要是提供对外服务的函数实现动态代理机制,Client是RPC进行服务的函数,主要是连接服务器、传递函数名和相应的参数、等待结果返回,Server主要接受Client的请求、执行相应的函数、返回结果。
Hadoop中的RPC比较难理解的部分就是动态代理的实现,感觉下面这个图画的比较清楚。从图中可以看出动态代理其实只能代理某一个接口,所以所有需要被动态代理的实现类都要实现该接口,在Hadoop中接口是VersionedProtocol。在动态代理中结合了InvocationHandler的功能,保证匿名实现类的方法能够得到正确的执行。使用匿名实现类的对象调用某个方法,实际上调用的是InvocationHandler中的invoke方法,通过该方法将要调用的函数名以及参数告知服务器端,后边会详细介绍一些细节。
结合NN和DN之间的交互作为例子对RPC的原理做一个简单的剖析
在DataNode.java的startDataNode函数中有这样一个调用
348 this.namenode = (DatanodeProtocol) 349 RPC.waitForProxy(DatanodeProtocol.class, 350 DatanodeProtocol.versionID, 351 nameNodeAddr, 352 conf);
我们可以看到通过调用RPC的waitForProxy方法创建了一个DatanodeProtocol对象namenode(其中DatanodeProtocol实现了接口VersionedProtocol),那么namenode应该就是前边提到的匿名实现类的对象,通过这个对象就可以与NN进行交互了。下边看一下RPC中的相关实现:
首先看一下waitForProxy函数干了些什么
329 while (true) { 330 try { 331 return getProxy(protocol, clientVersion, addr, conf, rpcTimeout); 332 } catch(ConnectException se) { // namenode has not been started
很明显waitForProxy是保证NN出现一些意外的情况下,我们还是可以获得相应的交互对象。那么接着来看getProxy的作用:392 VersionedProtocol proxy = 393 (VersionedProtocol) Proxy.newProxyInstance( 394 protocol.getClassLoader(), new Class[] { protocol }, 395 new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
这里才是生成RPC对象的入口(姑且就称作RPC对象),前两个参数 protocol.getClassLoader(), new Class[] { protocol },比较好理解,一个是获得相应的classloader(因为需要在运行是动态的生成对象),第二个就是说明相应的接口类。可能现在最感兴趣的是第三个参数new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout),我们前边提到过 Proxy需要与InvocationHandler相结合才能达到想要的效果,那么第三个参数到底是不是InvocationHandler的对象呢,我们看一下Invoke类的相关实现。