3.jobTracker.submit():实例化JobInProgress之后,会根据jobProfile获取job的队列信息,并判断相应的队列是否在运行中,不在则任务失败。然后检查内存情况checkMemoryRequirements(job),再调用taskScheduler的taskScheduler.checkJobSubmission(job)检查任务提交情况(具体是啥玩意,不太情况)。接下来就是执行status = addJob(jobId, job),为Job设置listener。
4.jobTracker.addJob():前面说过,在初始化jobTracker的时候会实例化taskScheduler,然后调用taskScheduler的start()方法,为jobTracker添加JobListener对象,所以这里的JobInProgressListener对象就是相应的taskScheduler的JobListener,这里为job添加了JobListener。
private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
throws IOException {
totalSubmissions++;
synchronized (jobs) {
synchronized (taskScheduler) {
jobs.put(job.getProfile().getJobID(), job);
for (JobInProgressListener listener : jobInProgressListeners) {
listener.jobAdded(job);
}
}
}
myInstrumentation.submitJob(job.getJobConf(), jobId);
job.getQueueMetrics().submitJob(job.getJobConf(), jobId);
LOG.info("Job " + jobId + " added successfully for user '"
+ job.getJobConf().getUser() + "' to queue '"
+ job.getJobConf().getQueueName() + "'");
AuditLogger.logSuccess(job.getUser(),
Operation.SUBMIT_JOB.name(), jobId.toString());
return job.getStatus();
}
到这里整个JobTracker的job提交过程就结束了,中间很多东西没有深入去研究,只是浅显的了解了下,如有错误,请指出,谢谢