JobTracker任务调度器之JobQueueTaskScheduler

在客户端把作业提交给JobTracker之后,JobTracker就需要开始考虑把这个Job交给那些TaskTracker来执行——Job任务调度。当然在JobTracker调度该Job之前,必须要确保该Job的JobInProgress被初始化了,即将Job划分为若干个map任务和reduce任务。在JobTracker中有一个默认的任务调度器JobQueueTaskScheduler,先来看看它的类图:

JobTracker任务调度器之JobQueueTaskScheduler



在上面的类图中,我们可以看出JobQueueTaskScheduler类依赖于两个JobInProgressListener的实现类,其中JobQueueJobInProgressListener类被用来按照队列来管理Job,EagerTaskInitializationListener类被用来初始化Job,即对Job进行map任务和reduce任务的切分(关于这个我将会在以后详细讨论)。关于这两个JobInProgressListener类的具体实现,我在这里不再作详细的阐述,因为他们的实现都比较的简单,有兴趣的童鞋可以自己查看它的源代码。所以,当JobTracker给某个TaskTracker分配任务时,它就会调用TaskScheduler的assignTasks(TaskTrackerStatus)方法,让TaskScheduler给该TaskTracker分配任务。那么,究竟TaskScheduler是如何给TaskTracker任务分配任务的,这就得看TaskScheduler的具体实现了,Hadoop允许用户自定义TaskScheduler来根据自己的实际情况来调度Job任务,这个具体实现可以通过配置文件中的mapred.jobtracker.taskScheduler项来设置。

下面以JobQueueTaskScheduler为例,来详细的讨论如何给一个TaskTracker分配任务。

JobTracker任务调度器之JobQueueTaskScheduler


1.分配map任务

JobTracker任务调度器之JobQueueTaskScheduler


2.分配reduce任务

JobTracker任务调度器之JobQueueTaskScheduler


2.计算一个TaskTracker是否需要预留任务slots

在任务调度器JobQueueTaskScheduler中,同时集群中的TaskTracker又比较多的情况下,它总是会想办法预留一些比较空闲的TaskTracker,以便能够快速的处理优先级比较高的Task或者发生错误的Task。它的具体实现如下:

private boolean exceededPadding(boolean isMapTask, ClusterStatus clusterStatus, int maxTaskTrackerSlots) {
      
    int numTaskTrackers = clusterStatus.getTaskTrackers();
    int totalTasks = (isMapTask) ? clusterStatus.getMapTasks() : clusterStatus.getReduceTasks();
    int totalTaskCapacity = isMapTask ? clusterStatus.getMaxMapTasks() : clusterStatus.getMaxReduceTasks();
    Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();

    boolean exceededPadding = false;
    synchronized (jobQueue) {
      int totalNeededTasks = 0;
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) {
          continue;
        }
        totalNeededTasks += isMapTask ? job.desiredMaps() : job.desiredReduces();
        int padding = 0;
        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
          padding = Math.min(maxTaskTrackerSlots,(int) (totalNeededTasks * padFraction));
        }
        if (totalTasks + padding >= totalTaskCapacity) {
          exceededPadding = true;
          break;
        }

      }
    }

    return exceededPadding;
  }

其中,全局变量padFraction的默认值为0.01,也可通过配置文件中的mapred.jobtracker.taskalloc.capacitypad项来设置。

      对于JobQueueTaskScheduler的任务调度原则可总结如下:

1.任务先进先出;

2.尽量使集群每一个TaskTracker达到负载均衡(这个均衡是task数量上的而不是实际的工作强度);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/pszgd.html