FairScheduler job初始化过程源码浅析

上一篇文章()说到了jobTracker中的submitJob()方法,这个方法最终会调用listener.jobAdded(job),将Job注册到TaskScheduler中,由其进行调度。今天接着研究。Hadoop中默认的TaskScheduler是JobQueueTaskScheduler,采用的是FIFO(先进先出)原则进行调度,还有FiarScheduler和CapacityTaskScheduler两种调度类(非hadoop自带,不过hadoop也把他们加入到类库中),这两个类可以在hadoop目录下的lib包下找到,源码在src/contrib下可以找到。主要对FairScheduler进行解读。

上文提到jobTracker最终将job注册到jobListener中,下面就来看看FairScheduler的JobListener。

1.FairScheduler.JobListener.addJob():这个方法比较简单,JobSchedulable mapSched = ReflectionUtils.newInstance(conf.getClass("mapred.jobtracker.jobSchedulable", JobSchedulable.class, JobSchedulable.class), conf)这里通过反射获得两个JobSchedulable对象,也就是默认的FairScheduler.JobSchedulable对象,一个是mapSched,一个是redSched,然后进行JobSchedulable的初始化,比较简单。infos.put(job, info)将job添加到infos(存放所有的jobInPorgress对象)中,同时将job添加到PoolScheduable中,主要是根据配置的poolName获取对应的pool。下面的是重点,update()方法,下面看看这个方法。

public void jobAdded(JobInProgress job) {
      synchronized (FairScheduler.this) {
        eventLog.log("JOB_ADDED", job.getJobID());
        JobSchedulable mapSched = ReflectionUtils.newInstance(
            conf.getClass("mapred.jobtracker.jobSchedulable", JobSchedulable.class,
                JobSchedulable.class), conf);
        mapSched.init(FairScheduler.this, job, TaskType.MAP);

JobSchedulable redSched = ReflectionUtils.newInstance(
            conf.getClass("mapred.jobtracker.jobSchedulable", JobSchedulable.class,
                JobSchedulable.class), conf);
        redSched.init(FairScheduler.this, job, TaskType.REDUCE);

JobInfo info = new JobInfo(mapSched, redSched);
        infos.put(job, info);
        poolMgr.addJob(job); // Also adds job into the right PoolScheduable
        update();
      }
    }

2.FairScheduler.update():跳过看不懂的,直接看poolMgr.reloadAllocsIfNecessary(),这个方法主要是读取FairScheduler的配置文件(fair-scheduler.xml),由mapred.fairscheduler.allocation.file参数设置,这里是根据配置文件的最后修改时间+ALLOC_RELOAD_INTERVAL决定是否重新加载配置文件,加载文件的时候就是简单地读取xml文件。接着看update方法,加载完配置文件之后会遍历infos(保存了FairScheduler所有的jobInProgress),遍历的时候去除成功了的job和失败了的job以及被kill掉的job,同时也会从pool中去掉该job。接下来就是updateRunnability(),这个方法会根据userMaxJob以及poolMaxJob数量进行判断是否启动job。

List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
      for (JobInProgress job: infos.keySet()) {
        int runState = job.getStatus().getRunState();
        if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
          || runState == JobStatus.KILLED) {
            toRemove.add(job);
        }
      }
      for (JobInProgress job: toRemove) {
        jobNoLongerRunning(job);
      }

3.FairScheduler.updateRunnability():第一步将所有infos中剩余的job(成功以及失败的任务会在update时清除)状态全部设为notrunning。接着对infos中的job进行排序,Collections.sort(jobs, new FifoJobComparator()),排序规则是FIFO原则(奇怪,不懂)。然后接着对jobs进行遍历,同时根据该job的提交用户和提交的pool的最大提交job数量决定是否将其添加到任务队列中(就是两个list),如果该job状态=RUNNING,则jobinfo.running=true,如果job状态=PREP(准备中),则对其进行初始化(注意这里只对job状态=RUNNING和PREP的job进行操作)。jobInitializer.initJob(jobInfo, job)进行job初始化,这里使用到jdk的threadPool(其实就是将thread加入到线程池中,由线程池绝对什么时候对其进行执行,总之都会调用thread的run方法),看看thread的run方法。run方法中调用ttm.initJob(job),此处的ttm就是jobTracker,现在回到jobTracker去。

if (userCount < poolMgr.getUserMaxJobs(user) &&
          poolCount < poolMgr.getPoolMaxJobs(pool)) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING ||
            job.getStatus().getRunState() == JobStatus.PREP) {
          userJobs.put(user, userCount + 1);
          poolJobs.put(pool, poolCount + 1);
          JobInfo jobInfo = infos.get(job);
          if (job.getStatus().getRunState() == JobStatus.RUNNING) {
            jobInfo.runnable = true;
          } else {
            // The job is in the PREP state. Give it to the job initializer
            // for initialization if we have not already done it.
            if (jobInfo.needsInitializing) {
              jobInfo.needsInitializing = false;
              jobInitializer.initJob(jobInfo, job);
            }
          }
        }
      }

更多详情见请继续阅读下一页的精彩内容:

相关阅读

Ubuntu 13.04上搭建Hadoop环境

Ubuntu 12.10 +Hadoop 1.2.1版本集群配置

Ubuntu上搭建Hadoop环境(单机模式+伪分布模式)

Ubuntu下Hadoop环境的配置

单机版搭建Hadoop环境图文教程详解

搭建Hadoop环境(在Winodws环境下用虚拟机虚拟两个Ubuntu系统进行搭建)

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/b863d2b6f31e8e2afdef814c23e98022.html