上一篇说到jobSubmitClient.submitJob( jobId, submitJobDir.toString(), jobCopy.getCredentials())这里,这里就是jobTracker进行job的提交过程,还有一个JobSubmissionProtocol的实现是LocalJobRunner,这是本地执行的时候使用的,真正集群运行Job还是使用的jobTracker,所以只看jobTracker类的submitJob。
1.jobTracker.submitJob():第一句就是checkJobTrackerState()这个是检查jobTracker状态,是否运行中,这里说一句,jobTracker是在Hadoop集群启动的时候启动的,也就是在执行start-all或者start-mapred的时候启动,启动的时候会调用JobTracker的main方法,然后在jps的时候就可以看见一个jobTracker的进程了。下面来看一下JobTracker.main()方法。
2.JobTracker.main():第一句是JobTracker tracker = startTracker(new JobConf()),这是实例化一个jobTracke实例。
3.JobTracker.startTracker():result = new JobTracker(conf, identifier),实例化一个jobTracker对象,在实例化的时候会做很多事,所以还是进去瞅瞅。
4.JobTracker.JobTracker():实例化的时候会初始化很多参数,记也记不住,主要看下实例化taskScheduler的内容:Class<? extends TaskScheduler> schedulerClass
= conf.getClass("mapred.jobtracker.taskScheduler",JobQueueTaskScheduler.class, TaskScheduler.class); taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf),这两句就是根据配置文件设置的taskScheduler类名,通过反射获得对应的taskScheduler对象,在实例化的时候虽然不同的TaskScheduler具体操作不一样,但是统一的都会初始化一个JobListener对象,这个对象就是后面将要监听job的listener。剩下的内容就不说了。回到JobTracker.startTracker()方法。
5.JobTracker.JobTracker():在实例化jobTracker之后,会执行result.taskScheduler.setTaskTrackerManager(result),这个就是将jobTracker对象设置给taskScheduler。后面就什么了,现在可以回到main方法了
public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize)
throws IOException, InterruptedException {
DefaultMetricsSystem.initialize("JobTracker");
JobTracker result = null;
while (true) {
try {
result = new JobTracker(conf, identifier);
result.taskScheduler.setTaskTrackerManager(result);
break;
} catch (VersionMismatch e) {
throw e;
} catch (BindException e) {
throw e;
} catch (UnknownHostException e) {
throw e;
} catch (AccessControlException ace) {
// in case of jobtracker not having right access
// bail out
throw ace;
} catch (IOException e) {
LOG.warn("Error starting tracker: " +
StringUtils.stringifyException(e));
}
Thread.sleep(1000);
}
if (result != null) {
JobEndNotifier.startNotifier();
MBeans.register("JobTracker", "JobTrackerInfo", result);
if(initialize == true) {
result.setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
result.initializeFilesystem();
result.setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
result.initialize();
}
}
return result;
}
相关阅读:
Ubuntu 13.04上搭建Hadoop环境
Ubuntu 12.10 +Hadoop 1.2.1版本集群配置
搭建Hadoop环境(在Winodws环境下用虚拟机虚拟两个Ubuntu系统进行搭建)