Hadoop Study Notes: Map (4)

When a TaskTracker sends a heartbeat to the JobTracker over RPC, the JobTracker's heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) method is invoked:

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
    boolean initialContact, boolean acceptNewTasks, short responseId)
    throws IOException {
  ……
  String trackerName = status.getTrackerName();
  ……
  short newResponseId = (short)(responseId + 1);
  ……
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();

  // If the TaskTracker is asking the JobTracker for a task to run
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      // Setup and cleanup tasks have the highest priority
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null ) {
        // Otherwise the task scheduler assigns tasks
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          // Put each task into the actions list that is returned to the TaskTracker
          expireLaunchingTasks.addNewTask(task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
  ……
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(
      actions.toArray(new TaskTrackerAction[actions.size()]));
  ……
  return response;
}
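The task-selection branch of heartbeat can be reduced to a small standalone model. The sketch below is only an illustration: HeartbeatSketch, chooseActions and the string task names are hypothetical stand-ins, not Hadoop classes, and the two Supplier arguments play the roles of getSetupAndCleanupTasks and taskScheduler.assignTasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Simplified model of the task-selection branch in heartbeat().
// All names here are hypothetical stand-ins, not Hadoop classes.
final class HeartbeatSketch {

    static List<String> chooseActions(boolean acceptNewTasks,
                                      Supplier<List<String>> setupAndCleanup,
                                      Supplier<List<String>> scheduler) {
        List<String> actions = new ArrayList<>();
        if (!acceptNewTasks) {
            return actions; // the tracker did not ask for work
        }
        // Setup and cleanup tasks take priority over the scheduler.
        List<String> tasks = setupAndCleanup.get();
        if (tasks == null) {
            tasks = scheduler.get();
        }
        if (tasks != null) {
            for (String task : tasks) {
                actions.add("LAUNCH " + task);
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        // No setup/cleanup work pending, so the scheduler's task is launched.
        System.out.println(chooseActions(true,
                () -> null,
                () -> Collections.singletonList("map_0")));
        // Setup work pending, so the scheduler is never consulted.
        System.out.println(chooseActions(true,
                () -> Collections.singletonList("setup_0"),
                () -> Collections.singletonList("map_0")));
    }
}
```

Passing the two sources as suppliers keeps the priority rule visible: the scheduler is evaluated only when the first source returns null.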

 

The default task scheduler is JobQueueTaskScheduler. Its assignTasks method looks like this:

public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();

  // Compute the remaining map and reduce workload across all running jobs
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }

  // Compute the load each TaskTracker should carry on average:
  // remaining / numTaskTrackers is the remaining workload divided by the
  // number of TaskTrackers, capped by this tracker's maximum task count.
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad /
                                          numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad
                                             / numTaskTrackers));
  }
  ……

  // Maps take priority over reduces: if this TaskTracker is running fewer
  // map tasks than its average load, assign it a map task.
  if (numMaps < maxMapLoad) {
    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }

  // After map tasks have been considered, assign a reduce task
  if (numReduces < maxReduceLoad) {
    int totalNeededReduces = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }
        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  return null;
}
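The maxMapLoad and maxReduceLoad arithmetic is easier to follow with concrete numbers. The following is a minimal sketch that repeats the same min/ceil calculation in isolation; LoadCapSketch and maxLoad are hypothetical names introduced for illustration, and the input values are made up.

```java
// Hypothetical helper, not part of Hadoop: repeats the maxMapLoad / maxReduceLoad
// arithmetic from assignTasks so it can be tried with concrete numbers.
final class LoadCapSketch {

    // Cap for one TaskTracker: the smaller of its own maximum task count and
    // its even share (rounded up) of the remaining work across all trackers.
    static int maxLoad(int maxTasksOnTracker, int remainingLoad, int numTaskTrackers) {
        if (numTaskTrackers <= 0) {
            return 0; // same default as assignTasks when no trackers are known
        }
        int evenShare = (int) Math.ceil((double) remainingLoad / numTaskTrackers);
        return Math.min(maxTasksOnTracker, evenShare);
    }

    public static void main(String[] args) {
        // 10 remaining maps over 4 trackers: share is ceil(2.5) = 3, capped at 2.
        System.out.println(maxLoad(2, 10, 4)); // prints 2
        // 3 remaining maps over 4 trackers: share is ceil(0.75) = 1.
        System.out.println(maxLoad(2, 3, 4));  // prints 1
        // Nothing left to run: the cap is 0, so no task would be assigned.
        System.out.println(maxLoad(2, 0, 4));  // prints 0
    }
}
```

With a cap of 1, a tracker that already runs one map task fails the numMaps < maxMapLoad check and receives no further map task in that heartbeat, which is how a small remaining workload gets spread across trackers rather than piled onto one.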

 

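From the code above we can see that JobInProgress.obtainNewMapTask is what assigns map tasks. It mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache based on the Node where the TaskTracker runs. JobInProgress.obtainNewReduceTask is what assigns reduce tasks. It mainly calls findNewReduceTask, which looks up a TaskInProgress in nonRunningReduces.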
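The idea of looking up a pending map task by the tracker's location can be sketched as follows. This is a deliberately simplified model under stated assumptions: the cache is keyed by a plain host-name string rather than Hadoop's Node type, tasks are plain string ids rather than TaskInProgress objects, and LocalityLookupSketch, addPending and findLocalMapTask are hypothetical names. The real findNewMapTask handles more cases than this article covers.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of a locality-keyed cache of pending map tasks.
final class LocalityLookupSketch {

    // host name -> ids of not-yet-running map tasks whose input is on that host
    private final Map<String, List<String>> nonRunningMapCache = new HashMap<>();

    void addPending(String host, String taskId) {
        nonRunningMapCache.computeIfAbsent(host, h -> new LinkedList<>()).add(taskId);
    }

    // Returns (and removes) a pending task local to the host, or null if none.
    String findLocalMapTask(String host) {
        List<String> pending = nonRunningMapCache.get(host);
        if (pending == null || pending.isEmpty()) {
            return null;
        }
        return pending.remove(0);
    }

    public static void main(String[] args) {
        LocalityLookupSketch cache = new LocalityLookupSketch();
        cache.addPending("host-a", "map_0");
        cache.addPending("host-b", "map_1");
        System.out.println(cache.findLocalMapTask("host-a")); // prints map_0
        System.out.println(cache.findLocalMapTask("host-a")); // prints null
        System.out.println(cache.findLocalMapTask("host-c")); // prints null
    }
}
```

Keying the cache by location means a heartbeat from a given host can be answered with a task whose input data is already on that host, without scanning every pending task.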
