Hadoop mr heartbeat rpc代码分析(2)

                this.wait(1000);

              }  


图:Server.Listener线程

3)、Server.Reader主要是读取数据,做一些分析,再生成Server.call放进BlockingQueue<Call> callQueue队列中。完成后会阻塞在readSelector.select();


图:Server.Reader线程

4)、Server.Handler阻塞在队列上面,当callQueue队列中存在数据的时候就会被唤醒。执行任务主要是 调用 Server的子类(RPC.Server)���现的Call()函数。此函数会根据TaskTreacker传入的函数来反射调用JobTracker的实现(这里是InterTrackerProtocol接口的实现heartbeat(x,x,x)),完成后,再调用Server.Responder的函数。在一些特别的条件下会与Server.Responder线程协调工作。由Server.Responder线程完成数据压入网络的工作。

图:Server.Handler线程

5)、Server.Handler线程会反射调用TaskTreacker.heartbeat(x,x,x)会执行相关的工作,拿到结果HeartbeatResponse。

6)、Server.Responder线程在一些情况下会把响应结果压入网络中。

7)、Client.Connection线程被唤醒,他再调用call.nodify()唤醒TaskTreacker主线程。

此点,在开始Connection发送请求的时候,会把call的id存入在map中,在被唤醒的时候DataInputStream.readInt()直接返回一个call的id。用id到map查找相应的call,再调用call.nodify()唤醒TaskTracker,TaskTracker被唤醒后再根据call.value做相应的事情。如:启动一个任务。


备注:

其实在心跳中,最复杂的一部分还是RPC的实现,分为大约6组线程的协调。
以上的代码分析是基于0.19.1,高版本的代码可能所有不同。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/ceb7cf125fc74a2baf1ecd4355213117.html