Hadoop JobTracker提交job源码浅析(3)

3.jobTracker.submit():实例化JobInProgress之后,会根据jobProfile获取job的队列信息,并判断相应的队列是否在运行中,不在则任务失败。然后检查内存情况checkMemoryRequirements(job),再调用taskScheduler的taskScheduler.checkJobSubmission(job)检查任务提交情况(具体是啥玩意,不太情况)。接下来就是执行status = addJob(jobId, job),为Job设置listener。

4.jobTracker.addJob():前面说过,在初始化jobTracker的时候会实例化taskScheduler,然后调用taskScheduler的start()方法,为jobTracker添加JobListener对象,所以这里的JobInProgressListener对象就是相应的taskScheduler的JobListener,这里为job添加了JobListener。

private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
  throws IOException {
    totalSubmissions++;

synchronized (jobs) {
      synchronized (taskScheduler) {
        jobs.put(job.getProfile().getJobID(), job);
        for (JobInProgressListener listener : jobInProgressListeners) {
          listener.jobAdded(job);
        }
      }
    }
    myInstrumentation.submitJob(job.getJobConf(), jobId);
    job.getQueueMetrics().submitJob(job.getJobConf(), jobId);

LOG.info("Job " + jobId + " added successfully for user '"
            + job.getJobConf().getUser() + "' to queue '"
            + job.getJobConf().getQueueName() + "'");
    AuditLogger.logSuccess(job.getUser(),
        Operation.SUBMIT_JOB.name(), jobId.toString());
    return job.getStatus();
  }

到这里整个JobTracker的job提交过程就结束了,中间很多东西没有深入去研究,只是浅显的了解了下,如有错误,请指出,谢谢

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/67d76d6fd816a2fbe1b1d392c3c9baca.html