Hadoop异步RPC通信机制

Hadoop的IPC是实现rpc的一种方法,不基于java的序列化机制。IPC中方法的调用参数和返回值只能是:

1、java基本类型

2、String和Writeable接口的实现类

3、以1、2元素为类型的数组

4、接口只允许抛出IOException

采用的是C/S模型(Client-NameNode,Client-DataNode,NameNode-DataNode,DataNode-DataNode)

如何区分不同的请求,hadoop的rpc通过给每个请求call一个id,当请求返回的时候,要附带上这个id,这样请求者就可以知道这是哪个请求了。

客户端Client与服务器Server的请求用Call类表示,以Call.id来区分不同的请求。不同的请求集合存放在连接类Connection的HashTable calls中:

private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();

Connection是一个线程,表示Client和Server的连接,类似socket。一个C/S对应一个连接,对应一个Connection线程。Connection保存了该socket上的所有请求集合。同时在Connection.run里面循环处理接收端发送回来的响应信息。receiveResponse处理响应消息的流程如下:

先读取请求的id,根据该请求id得到相应的call,再读取响应的状态state,根据响应的状态(成功,错误,失败)进行处理。如果请求状态为成功,则将返回值setValue到call.value里【注:此时会notify唤醒阻塞的call,使得Client.call()方法知道已经请求成功了,好继续处理】,设置请求结束标志call.done=true,再将该call移除出calls。

一个Client可以连接多个Server,所有一个Client内可以有多个Connection。每个Connection用一个ConnectionId标识,保存在Client的HashTable connections中:

private Hashtable<ConnectionId, Connection>connections =new Hashtable<ConnectionId, Connection>();

Client发送请求给server的流程client.call():

将要发送的内容param作为参数,创建一个Call实例call-->调用client的getConnection(调用addCall将call添加到calls中)获取client-server对应的connection-->利用connection.sendParam(call)将call上的数据call.param发送出去-->等待server的响应call.wait()-->接收到响应(connection线程接收并处理响应消息。处理消息时Call.setValue会调用notify方法唤醒)之后,返回响应值call.value。

Client通过调用Connection.addCall(Call)方法将一个请求添加到calls中。

总结来说:

1、Connection线程是一个client和server间的连接,保存了它们的所有请求,并不断接收和处理server发送过来的请求。如果

2、client通过Client.call()将请求发送给server,最终返回server的响应信息。

上述两者可以视为hadoop的异步通信机制。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/d0c28d5fce92a9f34a725091154f8ba5.html