Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机(2)

4.JobTracker.heartbeat():

// Process this heartbeat
    short newResponseId = (short)(responseId + 1);
    status.setLastSeen(now);
    if (!processHeartbeat(status, initialContact, now)) {
      if (prevHeartbeatResponse != null) {
        trackerToHeartbeatResponseMap.remove(trackerName);
      }
      return new HeartbeatResponse(newResponseId,
                  new TaskTrackerAction[] {new ReinitTrackerAction()});
    }

首先将responseId+1,然后记录心跳发送时间。接着来看看processHeartbeat()方法。
 
5.JobTracker.processHeartbeat():

boolean seenBefore = updateTaskTrackerStatus(trackerName,
                                                    trackerStatus);

根据该TT的上一次心跳发送的状态信息更新JT的一些信息,如totalMaps,totalReduces,occupiedMapSlots,occupiedReduceSlots等,接着根据本次心跳发送的TT状态信息再次更新这些变量。
 
6.JobTracker.processHeartbeat():

TaskTracker taskTracker = getTaskTracker(trackerName);
        if (initialContact) {
          // If it's first contact, then clear out
          // any state hanging around
          if (seenBefore) {
            lostTaskTracker(taskTracker);
          }
        } else {
          // If not first contact, there should be some record of the tracker
          if (!seenBefore) {
            LOG.warn("Status from unknown Tracker : " + trackerName);
            updateTaskTrackerStatus(trackerName, null);
            return false;
          }
        }

如果该TT是首次连接JT,且存在oldStatus,则表明JT丢失了TT,具体意思应该是JT在一段时间内与TT失去了联系,之后TT恢复了,所以发送心跳时显示首次连接。lostTaskTracker(taskTracker):会将该TT从所有的队列中移除,并将该TT上记录的job清除掉(kill掉),当然对那些已经完成的Job不会进行次操作。当TT不是首次连接到JT,但是JT却没有该TT的历史status信息,则表示JT对该TT未知,所以重新更新TaskTracker状态信息。
 
7.JobTracker.processHeartbeat():

updateTaskStatuses(trackerStatus);
    updateNodeHealthStatus(trackerStatus, timeStamp);

更新Task和NodeHealth信息,较复杂。
 
8.JobTracker.heartbeat():如果processHeartbeat()返回false,则返回HeartbeatResponse(),并下达重新初始化TT指令。

// Initialize the response to be sent for the heartbeat
    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
    boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
    // Check for new tasks to be executed on the tasktracker
    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
      if (taskTrackerStatus == null) {
        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
      } else {
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        if (tasks == null ) {
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            if(LOG.isDebugEnabled()) {
              LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
            }
            actions.add(new LaunchTaskAction(task));
          }
        }
      }
    }

此处会实例化一个HeartbeatResponse对象,作为本次心跳的返回值,在初始化一个TaskTrackerAction队列,用于存放JT对TT下达的指令。首先需要判断recoveryManager的recoveredTrackers是否为空,即是否有需要回复的TT,然后根据TT心跳发送的acceptNewTasks值,即表明TT是否可接收新任务,并且该TT不在黑名单中,同上满足以上条件,则JT可以为TT分配任务。分配任务的选择方式是优先CleanipTask,然后是SetupTask,然后才是Map/Reduce Task。下面来看下getSetupAndCleanupTasks()方法。
 
9.JobTracker.getSetupAndCleanupTasks():

// Don't assign *any* new task in safemode
    if (isInSafeMode()) {
      return null;
    }

如果集群处于safe模式,则不分配任务。

int maxMapTasks = taskTracker.getMaxMapSlots();
    int maxReduceTasks = taskTracker.getMaxReduceSlots();
    int numMaps = taskTracker.countOccupiedMapSlots();
    int numReduces = taskTracker.countOccupiedReduceSlots();
    int numTaskTrackers = getClusterStatus().getTaskTrackers();
    int numUniqueHosts = getNumberOfUniqueHosts();

计算TT的最大map/reduce slot,以及已占用的map/reduce slot,以及集群可使用的TT数量,和集群的host数量。

for (Iterator<JobInProgress> it = jobs.values().iterator();
            it.hasNext();) {
          JobInProgress job = it.next();
          t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
                                    numUniqueHosts, true);
          if (t != null) {
            return Collections.singletonList(t);
          }
        }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/4df51c8c50c12f279f6705c9d1d9cfea.html