Map-Reduce的过程首先是由客户端提交一个任务开始的。
提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的:
public static RunningJob runJob(JobConf job) throws IOException {
//首先生成一个JobClient对象
JobClient jc = new JobClient(job);
……
//调用submitJob来提交一个任务
running = jc.submitJob(job);
JobID jobId = running.getID();
……
while (true) {
//while循环中不断得到此任务的状态,并打印到客户端console中
}
return running;
}
其中JobClient的submitJob函数实现如下:
public RunningJob submitJob(JobConf job) throws FileNotFoundException,
InvalidJobConfException, IOException {
//从JobTracker得到当前任务的id
JobID jobId = jobSubmitClient.getNewJobId();
//准备将任务运行所需要的要素写入HDFS:
//任务运行程序所在的jar封装成job.jar
//任务所要处理的input split信息写入job.split
//任务运行的配置项汇总写入job.xml
Path submitJobDir = new Path(getSystemDir(), jobId.toString());
Path submitJarFile = new Path(submitJobDir, "job.jar");
Path submitSplitFile = new Path(submitJobDir, "job.split");
//此处将-libjars命令行指定的jar上传至HDFS
configureCommandLineOptions(job, submitJobDir, submitJarFile);
Path submitJobFile = new Path(submitJobDir, "job.xml");
……
//通过input format的格式获得相应的input split,默认类型为FileSplit
InputSplit[] splits =
job.getInputFormat().getSplits(job, job.getNumMapTasks());
// 生成一个写入流,将input split得信息写入job.split文件
FSDataOutputStream out = FileSystem.create(fs,
submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
try {
//写入job.split文件的信息包括:split文件头,split文件版本号,split的个数,接着依次写入每一个input split的信息。
//对于每一个input split写入:split类型名(默认FileSplit),split的大小,split的内容(对于FileSplit,写入文件名,此split 在文件中的起始位置),split的location信息(即在那个DataNode上)。
writeSplitsFile(splits, out);
} finally {
out.close();
}
job.set("mapred.job.split.file", submitSplitFile.toString());
//根据split的个数设定map task的个数
job.setNumMapTasks(splits.length);
// 写入job的配置信息入job.xml文件
out = FileSystem.create(fs, submitJobFile,
new FsPermission(JOB_FILE_PERMISSION));
try {
job.writeXml(out);
} finally {
out.close();
}
//真正的调用JobTracker来提交任务
JobStatus status = jobSubmitClient.submitJob(jobId);
……
}