Hadoop 1.2.1 Source Code Analysis Series: The Heartbeat Communication Mechanism Between JT (JobTracker) and TT (TaskTracker) (3)

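First, the Job's cleanup task is obtained (via obtainJobCleanupTask). Each Job has two cleanup tasks, one for the map side and one for the reduce side.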

    for (Iterator<JobInProgress> it = jobs.values().iterator();
         it.hasNext();) {
      JobInProgress job = it.next();
      t = job.obtainTaskCleanupTask(taskTracker, true);
      if (t != null) {
        return Collections.singletonList(t);
      }
    }

Next, a cleanup TaskAttempt for an individual task is obtained (obtainTaskCleanupTask, shown above).
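To make the task-cleanup step more concrete, here is a minimal Java sketch of the bookkeeping it relies on. It assumes, based on our reading of Hadoop 1.x, that JobInProgress queues the IDs of attempts that ended in an unclean state (failed or killed before their output was cleaned up) separately for map and reduce, and hands out one per call. The class and method names (TaskCleanupQueue, addAttemptNeedingCleanup, obtainTaskCleanup) are hypothetical, and task attempt IDs are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for the per-job bookkeeping behind
// JobInProgress.obtainTaskCleanupTask(): attempt IDs that still need a cleanup
// attempt are queued separately for map and reduce slots.
class TaskCleanupQueue {
    private final List<String> mapCleanupAttempts = new ArrayList<>();
    private final List<String> reduceCleanupAttempts = new ArrayList<>();

    // Record an attempt that ended without cleaning up its output.
    void addAttemptNeedingCleanup(String attemptId, boolean isMapTask) {
        if (isMapTask) {
            mapCleanupAttempts.add(attemptId);
        } else {
            reduceCleanupAttempts.add(attemptId);
        }
    }

    // Hand out at most one cleanup attempt for the requested slot type,
    // oldest first; null means nothing is pending for that slot type.
    String obtainTaskCleanup(boolean isMapSlot) {
        List<String> queue = isMapSlot ? mapCleanupAttempts : reduceCleanupAttempts;
        return queue.isEmpty() ? null : queue.remove(0);
    }
}
```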

    for (Iterator<JobInProgress> it = jobs.values().iterator();
         it.hasNext();) {
      JobInProgress job = it.next();
      t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
                                 numUniqueHosts, true);
      if (t != null) {
        return Collections.singletonList(t);
      }
    }

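Then the Job's setup task is obtained (obtainJobSetupTask, shown above). All three of these lookups (job cleanup, task cleanup, job setup) obtain map tasks, as the final `true` argument indicates; the code that follows obtains reduce tasks in essentially the same way, passing `false`.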

If this method (getSetupAndCleanupTasks) returns null, there are no cleanup or setup tasks to run, so ordinary map/reduce tasks are assigned instead.
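The overall priority order can be summarized in a short Java sketch. It is a simplified model, not Hadoop's code: SketchJob, SetupAndCleanupPicker and the free-slot parameters are hypothetical, and task IDs are plain strings. Within one slot type, every job is scanned for job-cleanup work, then task-cleanup work, then job-setup work; map slots are tried before reduce slots, at most one task is returned, and null signals that the scheduler should be consulted.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical job abstraction: each method returns a task ID, or null if the
// job has nothing of that kind to run on the given slot type.
interface SketchJob {
    String obtainJobCleanupTask(boolean isMapSlot);
    String obtainTaskCleanupTask(boolean isMapSlot);
    String obtainJobSetupTask(boolean isMapSlot);
}

class SetupAndCleanupPicker {
    // For one slot type, scan all jobs for a job-cleanup task, then a
    // task-cleanup attempt, then a job-setup task.
    private static String pickForSlot(List<SketchJob> jobs, boolean isMapSlot) {
        for (SketchJob job : jobs) {
            String t = job.obtainJobCleanupTask(isMapSlot);
            if (t != null) return t;
        }
        for (SketchJob job : jobs) {
            String t = job.obtainTaskCleanupTask(isMapSlot);
            if (t != null) return t;
        }
        for (SketchJob job : jobs) {
            String t = job.obtainJobSetupTask(isMapSlot);
            if (t != null) return t;
        }
        return null;
    }

    // Map slots first, then reduce slots; at most one task is returned.
    // A null result means the caller should fall back to the TaskScheduler.
    static List<String> getSetupAndCleanupTasks(List<SketchJob> jobs,
                                                int freeMapSlots,
                                                int freeReduceSlots) {
        if (freeMapSlots > 0) {
            String t = pickForSlot(jobs, true);
            if (t != null) return Collections.singletonList(t);
        }
        if (freeReduceSlots > 0) {
            String t = pickForSlot(jobs, false);
            if (t != null) return Collections.singletonList(t);
        }
        return null;
    }
}
```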

10. JobTracker.heartbeat():

    if (tasks == null) {
      tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
    }

Here the TaskScheduler is used to schedule tasks. This is one of the hard parts and will be analyzed later.
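The fallback itself is simple; the complexity lives inside the scheduler, which Hadoop makes pluggable (the default in 1.x is JobQueueTaskScheduler, a priority-ordered FIFO scheduler). The sketch below only illustrates that shape under the assumption of a single pool of free slots: SketchScheduler, FifoSketchScheduler and HeartbeatTaskSelection are hypothetical names, and the toy FIFO policy ignores priorities, data locality and the map/reduce split that real schedulers handle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a pluggable scheduler: given a tracker and its
// free slots, return the tasks to launch there (possibly none).
abstract class SketchScheduler {
    abstract List<String> assignTasks(String trackerName, int freeSlots);
}

// Toy FIFO policy: fill the tracker's free slots from one global queue.
class FifoSketchScheduler extends SketchScheduler {
    private final Deque<String> pending = new ArrayDeque<>();

    void submit(String taskId) {
        pending.addLast(taskId);
    }

    @Override
    List<String> assignTasks(String trackerName, int freeSlots) {
        List<String> assigned = new ArrayList<>();
        while (assigned.size() < freeSlots && !pending.isEmpty()) {
            assigned.add(pending.pollFirst());
        }
        return assigned;
    }
}

class HeartbeatTaskSelection {
    // Setup/cleanup work wins; only when there is none (null) is the
    // scheduler asked for ordinary map/reduce tasks.
    static List<String> chooseTasks(List<String> setupOrCleanup,
                                    SketchScheduler scheduler,
                                    String trackerName, int freeSlots) {
        List<String> tasks = setupOrCleanup;
        if (tasks == null) {
            tasks = scheduler.assignTasks(trackerName, freeSlots);
        }
        return tasks;
    }
}
```

Keeping the policy behind an abstract `assignTasks` is what lets a cluster swap FIFO for a fair-share or capacity-based policy without touching the heartbeat code.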

11. JobTracker.heartbeat():

    if (tasks != null) {
      for (Task task : tasks) {
        expireLaunchingTasks.addNewTask(task.getTaskID());
        if (LOG.isDebugEnabled()) {
          LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
        }
        actions.add(new LaunchTaskAction(task));
      }
    }

For each assigned task, the task is registered with expireLaunchingTasks and a LaunchTaskAction command is generated.
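expireLaunchingTasks guards against tasks that are handed out but never actually start. As far as we understand Hadoop 1.x, a background thread periodically fails any task that a TaskTracker has not reported within an expiry interval after launch. The Java sketch below models only that bookkeeping; LaunchingTaskExpiry, removeTask and collectExpired are hypothetical names, and the timeout and clock values are supplied by the caller so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: remember when each task was handed to a TaskTracker and
// treat it as lost if the tracker never reports it within timeoutMs.
class LaunchingTaskExpiry {
    private final long timeoutMs;
    private final Map<String, Long> launchTimes = new HashMap<>();

    LaunchingTaskExpiry(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called when a LaunchTaskAction is generated for the task.
    void addNewTask(String taskId, long nowMs) {
        launchTimes.put(taskId, nowMs);
    }

    // Called once the TaskTracker reports a status for the task.
    void removeTask(String taskId) {
        launchTimes.remove(taskId);
    }

    // Periodic check: return (and stop tracking) every task launched
    // more than timeoutMs ago that has never been reported.
    List<String> collectExpired(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = launchTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() > timeoutMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```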

    // Check for tasks to be killed
    List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
    if (killTasksList != null) {
      actions.addAll(killTasksList);
    }

    // Check for jobs to be killed/cleanedup
    List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
    if (killJobsList != null) {
      actions.addAll(killJobsList);
    }

    // Check for tasks whose outputs can be saved
    List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
    if (commitTasksList != null) {
      actions.addAll(commitTasksList);
    }

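The three snippets above issue kill-task, kill/cleanup-job, and commit-task commands respectively. Together with LaunchTaskAction these make four kinds of commands; adding ReinitTrackerAction gives all five kinds of commands the JT sends to a TT through the heartbeat. Each of them can be analyzed in detail later.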
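The way these lists are merged into a single response can be modeled as below. The enum constant names follow Hadoop 1.x's TaskTrackerAction.ActionType, but SketchActionType, SketchAction and HeartbeatActions are hypothetical simplified stand-ins; the point is the null-tolerant merging used by the three checks above and the final conversion to an array.

```java
import java.util.ArrayList;
import java.util.List;

// The five command kinds a heartbeat response can carry (names mirror
// Hadoop 1.x's TaskTrackerAction.ActionType; the classes here are hypothetical).
enum SketchActionType { LAUNCH_TASK, KILL_TASK, KILL_JOB, REINIT_TRACKER, COMMIT_TASK }

class SketchAction {
    final SketchActionType type;
    final String target; // task or job ID; null for REINIT_TRACKER

    SketchAction(SketchActionType type, String target) {
        this.type = type;
        this.target = target;
    }
}

class HeartbeatActions {
    private final List<SketchAction> actions = new ArrayList<>();

    void add(SketchAction a) {
        actions.add(a);
    }

    // Same pattern as the getTasksToKill / getJobsForCleanup / getTasksToSave
    // checks: a null list simply contributes nothing.
    void addAllIfPresent(List<SketchAction> maybeActions) {
        if (maybeActions != null) {
            actions.addAll(maybeActions);
        }
    }

    // Counterpart of actions.toArray(new TaskTrackerAction[actions.size()]).
    SketchAction[] toArray() {
        return actions.toArray(new SketchAction[0]);
    }
}
```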

12. JobTracker.heartbeat():

    // calculate next heartbeat interval and put in heartbeat response
    int nextInterval = getNextHeartbeatInterval();
    response.setHeartbeatInterval(nextInterval);
    response.setActions(
        actions.toArray(new TaskTrackerAction[actions.size()]));

    // check if the restart info is required
    if (addRestartInfo) {
      response.setRecoveredJobs(recoveryManager.getJobsToRecover());
    }

    // Update the trackerToHeartbeatResponseMap
    trackerToHeartbeatResponseMap.put(trackerName, response);

    // Done processing the heartbeat, now remove 'marked' tasks
    removeMarkedTasks(trackerName);

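What remains is wrap-up work: computing the interval until the next heartbeat, attaching the jobs the TT needs to recover (when restart info is required), recording the response in the trackerToHeartbeatResponseMap, and removing the marked tasks. Finally the HeartbeatResponse object is returned, completing the heartbeat request/response cycle.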
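The heartbeat interval computed here is not a fixed constant. In our reading of Hadoop 1.x, getNextHeartbeatInterval() grows the interval linearly with the number of TaskTrackers, so that the JT receives roughly a target number of heartbeats per second, and never lets it drop below a minimum. A sketch of that formula follows; the class name and the explicit parameters are hypothetical (the real method takes them from the JobTracker's own state and configuration). Worked example: with 1000 trackers, a target of 100 heartbeats per second and a scaling factor of 1.0, each tracker heartbeats every 1000 × 1.0 × (1000 / 100) = 10,000 ms.

```java
// Hypothetical sketch of scaling the heartbeat interval with cluster size.
// All inputs are passed explicitly instead of being read from configuration.
final class HeartbeatIntervalSketch {
    private HeartbeatIntervalSketch() {}

    /**
     * @param clusterSize         number of live TaskTrackers
     * @param heartbeatsPerSecond heartbeats per second the JT aims to handle
     * @param scalingFactor       multiplier applied to the raw interval
     * @param minIntervalMs       lower bound on the interval
     * @return milliseconds until the tracker's next heartbeat
     */
    static int nextHeartbeatIntervalMs(int clusterSize, int heartbeatsPerSecond,
                                       double scalingFactor, int minIntervalMs) {
        // Spread clusterSize trackers evenly over heartbeatsPerSecond per second.
        int scaled = (int) (1000 * scalingFactor
                * ((double) clusterSize / heartbeatsPerSecond));
        return Math.max(scaled, minIntervalMs);
    }
}
```

Small clusters hit the lower bound, while large clusters automatically slow their trackers down so the JT's RPC load stays roughly constant.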

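That completes the JT's heartbeat(). Many of the steps along the way are fairly complex and were not explored in depth; I may continue studying them when I have time. If you spot any mistakes, corrections are very welcome. Thank you.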

Copyright notice: unless otherwise noted, all articles on this site are original.

When reposting, please credit the source: https://www.heiqu.com/4df51c8c50c12f279f6705c9d1d9cfea.html