在客户端把作业提交给JobTracker之后,JobTracker就需要开始考虑把这个Job交给那些TaskTracker来执行——Job任务调度。当然在JobTracker调度该Job之前,必须要确保该Job的JobInProgress被初始化了,即将Job划分为若干个map任务和reduce任务。在JobTracker中有一个默认的任务调度器JobQueueTaskScheduler,先来看看它的类图:
在上面的类图中,我们可以看出JobQueueTaskScheduler类依赖于两个JobInProgressListener的实现类,其中JobQueueJobInProgressListener类被用来按照队列来管理Job,EagerTaskInitializationListener类被用来初始化Job,即对Job进行map任务和reduce任务的切分(关于这个我将会在以后详细讨论)。关于这两个JobInProgressListener类的具体实现,我在这里不再作详细的阐述,因为他们的实现都比较的简单,有兴趣的童鞋可以自己查看它的源代码。所以,当JobTracker给某个TaskTracker分配任务时,它就会调用TaskScheduler的assignTasks(TaskTrackerStatus)方法,让TaskScheduler给该TaskTracker分配任务。那么,究竟TaskScheduler是如何给TaskTracker任务分配任务的,这就得看TaskScheduler的具体实现了,Hadoop允许用户自定义TaskScheduler来根据自己的实际情况来调度Job任务,这个具体实现可以通过配置文件中的mapred.jobtracker.taskScheduler项来设置。
下面以JobQueueTaskScheduler为例,来详细的讨论如何给一个TaskTracker分配任务。
1.分配map任务
2.分配reduce任务
2.计算一个TaskTracker是否需要预留任务slots
在任务调度器JobQueueTaskScheduler中,同时集群中的TaskTracker又比较多的情况下,它总是会想办法预留一些比较空闲的TaskTracker,以便能够快速的处理优先级比较高的Task或者发生错误的Task。它的具体实现如下:
private boolean exceededPadding(boolean isMapTask, ClusterStatus clusterStatus, int maxTaskTrackerSlots) {
int numTaskTrackers = clusterStatus.getTaskTrackers();
int totalTasks = (isMapTask) ? clusterStatus.getMapTasks() : clusterStatus.getReduceTasks();
int totalTaskCapacity = isMapTask ? clusterStatus.getMaxMapTasks() : clusterStatus.getMaxReduceTasks();
Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
boolean exceededPadding = false;
synchronized (jobQueue) {
int totalNeededTasks = 0;
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) {
continue;
}
totalNeededTasks += isMapTask ? job.desiredMaps() : job.desiredReduces();
int padding = 0;
if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
padding = Math.min(maxTaskTrackerSlots,(int) (totalNeededTasks * padFraction));
}
if (totalTasks + padding >= totalTaskCapacity) {
exceededPadding = true;
break;
}
}
}
return exceededPadding;
}
其中,全局变量padFraction的默认值为0.01,也可通过配置文件中的mapred.jobtracker.taskalloc.capacitypad项来设置。
对于JobQueueTaskScheduler的任务调度原则可总结如下:
1.任务先进先出;
2.尽量使集群每一个TaskTracker达到负载均衡(这个均衡是task数量上的而不是实际的工作强度);