When a TaskTracker sends a heartbeat to the JobTracker over RPC, the JobTracker's heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) method is invoked:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean initialContact, boolean acceptNewTasks, short responseId)
throws IOException {
……
String trackerName = status.getTrackerName();
……
short newResponseId = (short)(responseId + 1);
……
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
// The TaskTracker is asking the JobTracker for a task to run
if (acceptNewTasks) {
TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
// Setup and cleanup tasks have the highest priority
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null ) {
// No setup or cleanup task is pending, so let the task scheduler assign tasks
tasks = taskScheduler.assignTasks(taskTrackerStatus);
}
if (tasks != null) {
for (Task task : tasks) {
// Add the task to the actions list that is returned to the TaskTracker
expireLaunchingTasks.addNewTask(task.getTaskID());
actions.add(new LaunchTaskAction(task));
}
}
}
}
……
int nextInterval = getNextHeartbeatInterval();
response.setHeartbeatInterval(nextInterval);
response.setActions(
actions.toArray(new TaskTrackerAction[actions.size()]));
……
return response;
}
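The selection order in heartbeat() can be illustrated with a minimal, self-contained sketch. It is not Hadoop code: the class HeartbeatSketch, its method names, and the use of plain strings in place of Task and LaunchTaskAction are assumptions made for illustration. It only mirrors two things visible in the excerpt above: setup and cleanup tasks are tried first with the scheduler as the fallback, and the response id is incremented with a cast to short, so it wraps around.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the task selection part of heartbeat().
class HeartbeatSketch {
    // Same arithmetic as the excerpt: the cast makes the id wrap at Short.MAX_VALUE.
    static short nextResponseId(short responseId) {
        return (short) (responseId + 1);
    }

    // Setup/cleanup tasks win; the scheduler is consulted only when there are none.
    static List<String> pickTasks(List<String> setupAndCleanupTasks,
                                  Supplier<List<String>> scheduler) {
        List<String> tasks = setupAndCleanupTasks;
        if (tasks == null) {
            tasks = scheduler.get();
        }
        return tasks;
    }

    // Wrap every chosen task in a launch action (a plain string in this sketch).
    static List<String> toActions(List<String> tasks) {
        List<String> actions = new ArrayList<>();
        if (tasks != null) {
            for (String task : tasks) {
                actions.add("LaunchTaskAction(" + task + ")");
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        List<String> fromScheduler = Collections.singletonList("map_000001");
        // No setup/cleanup work pending: the scheduler's task is launched.
        System.out.println(toActions(pickTasks(null, () -> fromScheduler)));
        // A setup task is pending: the scheduler is not consulted.
        System.out.println(toActions(
                pickTasks(Collections.singletonList("job_setup"), () -> fromScheduler)));
        // Prints -32768: the id wraps around.
        System.out.println(nextResponseId(Short.MAX_VALUE));
    }
}
```

Passing the scheduler as a Supplier keeps the sketch lazy in the same way as the original: the scheduler is not called at all when a setup or cleanup task is available.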
The default task scheduler is JobQueueTaskScheduler; its assignTasks method is as follows:
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException {
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
int numTaskTrackers = clusterStatus.getTaskTrackers();
Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
int numMaps = taskTracker.countMapTasks();
int numReduces = taskTracker.countReduceTasks();
// Compute the remaining map and reduce workload across all running jobs
int remainingReduceLoad = 0;
int remainingMapLoad = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
int totalMapTasks = job.desiredMaps();
int totalReduceTasks = job.desiredReduces();
remainingMapLoad += (totalMapTasks - job.finishedMaps());
remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
}
}
}
// Compute the load each TaskTracker should carry: the remaining workload divided by the number of TaskTrackers, capped at this tracker's maximum task count
int maxMapLoad = 0;
int maxReduceLoad = 0;
if (numTaskTrackers > 0) {
maxMapLoad = Math.min(maxCurrentMapTasks,
(int) Math.ceil((double) remainingMapLoad /
numTaskTrackers));
maxReduceLoad = Math.min(maxCurrentReduceTasks,
(int) Math.ceil((double) remainingReduceLoad
/ numTaskTrackers));
}
……
// Map takes priority over reduce: if this TaskTracker runs fewer map tasks than its computed load, assign it a map task
if (numMaps < maxMapLoad) {
int totalNeededMaps = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
return Collections.singletonList(t);
}
……
}
}
}
// If no map task was assigned above, try to assign a reduce task
if (numReduces < maxReduceLoad) {
int totalNeededReduces = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING ||
job.numReduceTasks == 0) {
continue;
}
Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
return Collections.singletonList(t);
}
……
}
}
}
return null;
}
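The per-tracker cap computed in assignTasks is easier to follow with concrete numbers. The sketch below isolates that one formula; the class name LoadCapSketch and the helper maxLoad are hypothetical, and the figures are made up for illustration.

```java
// Hypothetical helper isolating the maxMapLoad / maxReduceLoad formula.
class LoadCapSketch {
    // slots: the tracker's maximum number of tasks of this kind
    // remainingLoad: unfinished tasks of this kind across all running jobs
    // numTaskTrackers: number of trackers in the cluster
    static int maxLoad(int slots, int remainingLoad, int numTaskTrackers) {
        if (numTaskTrackers <= 0) {
            return 0; // matches the initial value used in assignTasks
        }
        int fairShare = (int) Math.ceil((double) remainingLoad / numTaskTrackers);
        return Math.min(slots, fairShare);
    }

    public static void main(String[] args) {
        // 10 remaining maps on 4 trackers: ceil(2.5) = 3, capped by 2 slots.
        System.out.println(maxLoad(2, 10, 4)); // prints 2
        // Same load with 4 slots: the fair share of 3 is the limit.
        System.out.println(maxLoad(4, 10, 4)); // prints 3
        // Nothing left to run: the cap is 0, so no task is assigned.
        System.out.println(maxLoad(4, 0, 4));  // prints 0
    }
}
```

A tracker is then offered a new map task only when its running count is strictly below this cap, which is the numMaps < maxMapLoad test in the code above. With a cap of 3, a tracker running 2 maps qualifies and one running 3 does not.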
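From the code above we can see that JobInProgress's obtainNewMapTask is what assigns map tasks. It mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache according to the Node on which the TaskTracker resides. JobInProgress's obtainNewReduceTask is what assigns reduce tasks. It mainly calls findNewReduceTask, which looks up a TaskInProgress in nonRunningReduces.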
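The difference between the two lookups can be sketched as follows. This is a simplified illustration and not the actual JobInProgress implementation: the class PendingTaskSketch, its methods, the use of host name strings as keys, and the assumption that each map task belongs to exactly one host are all hypothetical. Only the two field names, nonRunningMapCache and nonRunningReduces, come from the text above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the two pending-task structures.
class PendingTaskSketch {
    // Pending map tasks grouped by the host that holds their input (assumed key type).
    private final Map<String, Deque<String>> nonRunningMapCache = new HashMap<>();
    // Pending reduce tasks have no locality, so a single queue is enough here.
    private final Deque<String> nonRunningReduces = new ArrayDeque<>();

    void addPendingMap(String host, String taskId) {
        nonRunningMapCache.computeIfAbsent(host, h -> new ArrayDeque<>()).add(taskId);
    }

    void addPendingReduce(String taskId) {
        nonRunningReduces.add(taskId);
    }

    // Returns a pending map task local to the given host, or null if there is none.
    String findLocalMap(String trackerHost) {
        Deque<String> pending = nonRunningMapCache.get(trackerHost);
        return (pending == null) ? null : pending.poll();
    }

    // Returns any pending reduce task, or null if there is none.
    String findReduce() {
        return nonRunningReduces.poll();
    }

    public static void main(String[] args) {
        PendingTaskSketch job = new PendingTaskSketch();
        job.addPendingMap("host-a", "map_000000");
        job.addPendingReduce("reduce_000000");
        System.out.println(job.findLocalMap("host-b")); // prints null: nothing local
        System.out.println(job.findLocalMap("host-a")); // prints map_000000
        System.out.println(job.findReduce());           // prints reduce_000000
    }
}
```

The sketch stops at the node-local lookup. What the real findNewMapTask does when no local task exists is not covered by the text above, so it is left out here.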