Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制

Hadoop中JT(JobTracker)与TT(TaskTracker)之间的通信是通过心跳机制完成的。JT实现InterTrackerProtocol协议,该协议定义了JT与TT之间的通信机制——心跳。心跳机制实际上就是一个RPC请求,JT作为Server,而TT作为Client,TT通过RPC调用JT的heartbeat方法,将TT自身的一些状态信息发送给JT,同时JT通过返回值返回对TT的指令。

目录

Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——TT篇

Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——JT篇

Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——命令篇

心跳有三个作用:

1)判断TT是否活着

2)报告TT的资源情况以及任务运行情况

3)为TT发送指令(如运行task,kill task等)

下面详细阅读下涉及到心跳调用的源码。

首先我们需要清楚,心跳机制是TT调用JT的方法,而非JT主动调用TT的方法。TT通过transmitHeartBeat方法调用JT的heartbeat方法。

1.TaskTracker.transmitHeartBeat:

// Send Counters in the status once every COUNTER_UPDATE_INTERVAL
    boolean sendCounters;
    if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
      sendCounters = true;
      previousUpdate = now;
    }
    else {
      sendCounters = false;
    }

根据sendCounters的间隔判断此次心跳是否发送计算器信息。
 
2.TaskTracker.transmitHeartBeat:

1.TaskTracker.transmitHeartBeat:

// Check if the last heartbeat got through...
    // if so then build the heartbeat information for the JobTracker;
    // else resend the previous status information.
    //
    if (status == null) {
      synchronized (this) {
        status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                      httpPort,
                                      cloneAndResetRunningTaskStatuses(
                                        sendCounters),
                                      taskFailures,
                                      localStorage.numFailures(),
                                      maxMapSlots,
                                      maxReduceSlots);
      }
    } else {
      LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
              "' with reponseId '" + heartbeatResponseId);
    }

此处根据status变量是否为null,判断上次的心跳是否成功发送。tatus!=null,则表示上次的心跳尚未发送,所以直接将上次收集到的TT状态信息(封装在status中)发送给JT;相反,status==null,则表示上次心跳已完成,重新收集TT的状态信息,同样封装到status中。下面详细看下new TaskTrackerStatus()方法。注意此处有个cloneAndResetRunningTaskStatuses(sendCounters)方法:

private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
                                          boolean sendCounters) {
    List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
    for(TaskInProgress tip: runningTasks.values()) {
      TaskStatus status = tip.getStatus();
      status.setIncludeCounters(sendCounters);
      // send counters for finished or failed tasks and commit pending tasks
      if (status.getRunState() != TaskStatus.State.RUNNING) {
        status.setIncludeCounters(true);
      }
      result.add((TaskStatus)status.clone());
      status.clearStatus();
    }
    return result;
  }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/62516c0392a42080fbd76bd24d9549b5.html