Hadoop学习总结:Map(5)

在向JobTracker发送heartbeat后,返回的response中有分配好的任务LaunchTaskAction,调用addToTaskQueue将其加入队列:如果是map task则放入mapLauncher(类型为TaskLauncher),如果是reduce task则放入reduceLauncher(类型为TaskLauncher):

/**
 * Routes a launch action to the launcher that matches its task type:
 * map tasks go to {@code mapLauncher}, everything else to {@code reduceLauncher}.
 *
 * @param action the launch action whose task decides the target queue
 */
private void addToTaskQueue(LaunchTaskAction action) {
  // Guard clause: handle the non-map case first and leave.
  if (!action.getTask().isMapTask()) {
    reduceLauncher.addToTaskQueue(action);
    return;
  }
  mapLauncher.addToTaskQueue(action);
}

 

TaskLauncher是一个线程,其run函数从上面放入的queue中取出一个TaskInProgress,然后调用startNewTask(TaskInProgress tip)来启动一个task,而startNewTask又主要调用了localizeJob(TaskInProgress tip):

/**
 * Makes the job's files available on the local file system and then launches the task.
 * The job file is copied to a local job.xml and, when the job has a jar, the jar is
 * copied to a local "jars" directory and unpacked there. This is done only while
 * {@code rjob.localized} is false; the flag is set to true afterwards.
 * Parts of the method are elided ("……") in this excerpt.
 *
 * @param tip the task whose job must be localized before the task is launched
 * @throws IOException if the local work or jars directory cannot be created
 *         (thrown explicitly below); presumably also if a copy fails
 */
private void localizeJob(TaskInProgress tip) throws IOException {

  // First, copy the files the task needs from the shared file system (systemFS) to the
  // TaskTracker's local file system: job.xml and job.jar. (The original note also lists
  // job.split, but no such copy is visible in this excerpt.)

Path localJarFile = null;

Task t = tip.getTask();

JobID jobId = t.getJobID();

Path jobFile = new Path(t.getJobFile());

……

Path localJobFile = lDirAlloc.getLocalPathForWrite(

getLocalJobDir(jobId.toString())

+ Path.SEPARATOR + "job.xml",

jobFileSize, fConf);

RunningJob rjob = addTaskToJob(jobId, tip);

// The localized check and the localization itself happen while holding the rjob lock.
synchronized (rjob) {

if (!rjob.localized) {

FileSystem localFs = FileSystem.getLocal(fConf);

Path jobDir = localJobFile.getParent();

……

      // Copy the job file to the local file system, where it is stored as job.xml

systemFS.copyToLocalFile(jobFile, localJobFile);

JobConf localJobConf = new JobConf(localJobFile);

Path workDir = lDirAlloc.getLocalPathForWrite(

(getLocalJobDir(jobId.toString())

+ Path.SEPARATOR + "work"), fConf);

if (!localFs.mkdirs(workDir)) {

throw new IOException("Mkdirs failed to create "

+ workDir.toString());

}

// Publish the job-level work directory both as a JVM system property and in the job conf.
System.setProperty("job.local.dir", workDir.toString());

localJobConf.set("job.local.dir", workDir.toString());

// copy Jar file to the local FS and unjar it.

String jarFile = localJobConf.getJar();

long jarFileSize = -1;

if (jarFile != null) {

Path jarFilePath = new Path(jarFile);

// NOTE(review): in this excerpt jarFileSize is still -1 at this point, so the size
// hint passed below is -5; the lookup of the real jar size appears to have been
// elided from the article - confirm against the full source.
localJarFile = new Path(lDirAlloc.getLocalPathForWrite(

getLocalJobDir(jobId.toString())

+ Path.SEPARATOR + "jars",

5 * jarFileSize, fConf), "job.jar");

if (!localFs.mkdirs(localJarFile.getParent())) {

throw new IOException("Mkdirs failed to create jars directory ");

}

        // Copy job.jar to the local file system

systemFS.copyToLocalFile(jarFilePath, localJarFile);

localJobConf.setJar(localJarFile.toString());

       // Write the job configuration (now pointing at the local jar) to the local job.xml

OutputStream out = localFs.create(localJobFile);

try {

localJobConf.writeXml(out);

} finally {

out.close();

}

        // Unpack job.jar into the directory that contains it

RunJar.unJar(new File(localJarFile.toString()),

new File(localJarFile.getParent().toString()));

}

rjob.localized = true;

rjob.jobConf = localJobConf;

}

}

  // Actually launch the task, handing it its own copy of the job configuration

launchTaskForJob(tip, new JobConf(rjob.jobConf));

}

 

当所有的task运行所需要的资源都拷贝到本地后,则调用launchTaskForJob,其又调用TaskInProgress的launchTask函数:

/**
 * Localizes the task, moves an UNASSIGNED task to RUNNING, then creates and starts
 * the task's runner and records the start time. The beginning of the method is
 * elided ("……") in this excerpt.
 *
 * @throws IOException declared by the method; presumably raised by localizeTask or
 *         createRunner - confirm against the full source
 */
public synchronized void launchTask() throws IOException {

……

    // Per the original note: creates the directory the task runs in

localizeTask(task);

// Only a task that is still UNASSIGNED is moved to RUNNING; other states are left as-is.
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {

this.taskStatus.setRunState(TaskStatus.State.RUNNING);

}

    // Create and start the TaskRunner. Per the original note, a MapTask yields a
    // MapTaskRunner and a ReduceTask yields a ReduceTaskRunner.

this.runner = task.createRunner(TaskTracker.this, this);

this.runner.start();

// The start time is recorded after the runner has been started.
this.taskStatus.setStartTime(System.currentTimeMillis());

}

 

TaskRunner是一个线程,其run函数如下:

/**
 * Assembles the command line of the child JVM that executes this task (classpath,
 * JVM options, library path, temp directory, logging properties, main class) and
 * hands it to the JVM manager for launching. Parts of the method are elided ("……")
 * in this excerpt, including the declarations of several variables used at the end.
 */
public final void run() {
  ……
  TaskAttemptID taskid = t.getTaskID();
  LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");

  // Directory that holds the localized job jar; stays null when the job has no jar.
  File jobCacheDir = null;
  if (conf.getJar() != null) {
    jobCacheDir = new File(
        new Path(conf.getJar()).getParent().toString());
  }

  // Working directory of this task attempt.
  File workDir = new File(lDirAlloc.getLocalPathToRead(
      TaskTracker.getLocalTaskDir(
          t.getJobID().toString(),
          t.getTaskID().toString(),
          t.isTaskCleanupTask())
      + Path.SEPARATOR + MRConstants.WORKDIR,
      conf).toString());
  FileSystem fileSystem;
  Path localPath;
  ……

  // Build the classpath
  String baseDir;
  String sep = System.getProperty("path.separator");
  StringBuffer classPath = new StringBuffer();
  // start with same classpath as parent process
  classPath.append(System.getProperty("java.class.path"));
  classPath.append(sep);
  if (!workDir.mkdirs()) {
    // mkdirs() also returns false when the directory already exists, so only
    // report a failure when it is really missing.
    if (!workDir.isDirectory()) {
      LOG.fatal("Mkdirs failed to create " + workDir.toString());
    }
  }
  String jar = conf.getJar();
  if (jar != null) {
    // If the job has a jar, add the contents of its lib/ directory, its classes/
    // directory and the jar directory itself to the classpath.
    File[] libs = new File(jobCacheDir, "lib").listFiles();
    if (libs != null) {
      for (int i = 0; i < libs.length; i++) {
        classPath.append(sep);            // add libs from jar to classpath
        classPath.append(libs[i]);
      }
    }
    classPath.append(sep);
    classPath.append(new File(jobCacheDir, "classes"));
    classPath.append(sep);
    classPath.append(jobCacheDir);
  }
  ……
  classPath.append(sep);
  classPath.append(workDir);

  // Build the java command line and its arguments
  Vector<String> vargs = new Vector<String>(8);
  File jvm =
      new File(new File(System.getProperty("java.home"), "bin"), "java");
  vargs.add(jvm.toString());

  // User-configurable JVM options; "@taskid@" is replaced with this attempt's id.
  String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
  javaOpts = javaOpts.replace("@taskid@", taskid.toString());
  String [] javaOptsSplit = javaOpts.split(" ");

  // The child's library path is the parent's (if any) plus the task work directory.
  String libraryPath = System.getProperty("java.library.path");
  if (libraryPath == null) {
    libraryPath = workDir.getAbsolutePath();
  } else {
    libraryPath += sep + workDir;
  }

  // If the user options already set java.library.path, append ours to that entry;
  // otherwise add a separate -Djava.library.path argument.
  boolean hasUserLDPath = false;
  for (int i = 0; i < javaOptsSplit.length; i++) {
    if (javaOptsSplit[i].startsWith("-Djava.library.path=")) {
      javaOptsSplit[i] += sep + libraryPath;
      hasUserLDPath = true;
      break;
    }
  }
  if (!hasUserLDPath) {
    vargs.add("-Djava.library.path=" + libraryPath);
  }
  for (int i = 0; i < javaOptsSplit.length; i++) {
    vargs.add(javaOptsSplit[i]);
  }

  // Temp directory of the child process; a relative path is resolved against workDir
  String tmp = conf.get("mapred.child.tmp", "./tmp");
  Path tmpDir = new Path(tmp);
  if (!tmpDir.isAbsolute()) {
    tmpDir = new Path(workDir.toString(), tmp);
  }
  FileSystem localFs = FileSystem.getLocal(conf);
  if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
    throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  }
  vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());

  // Add classpath.
  vargs.add("-classpath");
  vargs.add(classPath.toString());

  // Logging: pass the parent's hadoop.log.dir on to the child under the same
  // (case-sensitive) property name, plus the task-log settings.
  long logSize = TaskLog.getTaskLogLength(conf);
  vargs.add("-Dhadoop.log.dir=" +
      new File(System.getProperty("hadoop.log.dir")
      ).getAbsolutePath());
  vargs.add("-Dhadoop.root.logger=INFO,TLA");
  vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
  vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);

  // The main class of the child process that runs map and reduce tasks is Child
  vargs.add(Child.class.getName());  // main of Child
  ……

  // Launch the child process
  jvmManager.launchJvm(this,
      jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
          workDir, env, pidFile, conf));
}

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/ppfzg.html