Hadoop Map/Reduce执行全流程关键代码
JobClient.runJob(conf) | 运行job
|-->JobClient jc = new JobClient(job); |-->RunningJob rj = jc.submitJob(job); |-->submitJobInternal(job); |-->int reduces = job.getNumReduceTasks(); |-->JobContext context = new JobContext(job, jobId); |-->maps = writeOldSplits(job, submitSplitFile); |-->job.setNumMapTasks(maps); |-->job.writeXml(out); |-->JobStatus status = jobSubmitClient.submitJob(jobId); JobTracker.submitJob(JobId) |提交job |-->JobInProgress job = new JobInProgress(jobId, this, this.conf); |-->checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB); |检查权限 |-->checkMemoryRequirements(job); |检查内存需求 |-->addJob(jobId, job); |添加至job队列 |-->jobs.put(job.getProfile().getJobID(), job); |--> for (JobInProgressListener listener : jobInProgressListeners) |添加至监听器,供调度使用 |-->listener.jobAdded(job); JobTracker.heartbeat() |JobTracker启动后供TaskTracker以RPC方式来调用,返回Response集合 |-->List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>(); |-->tasks = taskScheduler.assignTasks(taskTrackerStatus); |通过调度器选择合适的tasks |-->for (Task task : tasks) |-->expireLaunchingTasks.addNewTask(task.getTaskID()); |-->actions.add(new LaunchTaskAction(task)); |实际actions还会添加commitTask等 |-->response.setHeartbeatInterval(nextInterval); |-->response.setActions(actions.toArray(new TaskTrackerAction[actions.size()])); |-->return response; TaskTracker.offerService() |TaskTracker启动后通过offerService()不断发心跳至JobTracker中 |-->transmitHeartBeat() |-->HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited,askForNewTask, heartbeatResponseId); |-->TaskTrackerAction[] actions = heartbeatResponse.getActions(); |-->for(TaskTrackerAction action: actions) |-->if (action instanceof LaunchTaskAction) |-->addToTaskQueue((LaunchTaskAction)action); |添加至执行Queue,根据map/reduce task分别添加 |-->if (action.getTask().isMapTask()) { |-->mapLauncher.addToTaskQueue(action); |-->TaskInProgress tip = registerTask(action, this); |-->tasksToLaunch.add(tip); |-->tasksToLaunch.notifyAll(); |唤醒阻塞线程 |-->else 
|-->reduceLauncher.addToTaskQueue(action); TaskLauncher.run() |--> while (tasksToLaunch.isEmpty()) |-->tasksToLaunch.wait(); |-->tip = tasksToLaunch.remove(0); |-->startNewTask(tip); |-->localizeJob(tip); |-->launchTaskForJob(tip, new JobConf(rjob.jobConf)); |-->tip.setJobConf(jobConf); |-->tip.launchTask(); |TaskInProgress.launchTask() |-->this.runner = task.createRunner(TaskTracker.this, this); |区分map/reduce |-->this.runner.start(); MapTaskRunner.run() |执行MapTask |-->File workDir = new File(lDirAlloc.getLocalPathToRead(...).toString()); |准备执行路径 |-->String jar = conf.getJar(); |准备jar包 |-->File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java"); |获取jvm |-->vargs.add(Child.class.getName()); |添加参数,Child类作为main主函数启动 |-->tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile); |添加至内存管理 |-->jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, workDir, env, pidFile, conf)); |统一纳入jvm管理器当中并启动 |-->mapJvmManager.reapJvm(t, env); |区分map/reduce操作 JvmManager.reapJvm() |--> while (jvmIter.hasNext()) |-->JvmRunner jvmRunner = jvmIter.next().getValue(); |-->JobID jId = jvmRunner.jvmId.getJobId(); |-->setRunningTaskForJvm(jvmRunner.jvmId, t); |-->spawnNewJvm(jobId, env, t); |-->JvmRunner jvmRunner = new JvmRunner(env,jobId); |-->jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner); |-->jvmRunner.start(); |执行JvmRunner的run()方法 |-->jvmRunner.run() |-->runChild(env); |-->List<String> wrappedCommand = TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr, env.logSize, env.pidFile); |选取main函数 |-->shexec.execute(); |执行 |-->int exitCode = shexec.getExitCode(); |获取执行状态值 |--> updateOnJvmExit(jvmId, exitCode, killed); |更新Jvm状态 Child.main() |执行Task(map/reduce) |-->JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt); |-->TaskUmbilicalProtocol umbilical = (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID, address, defaultConf); |--> while (true) 
|-->JvmTask myTask = umbilical.getTask(jvmId); |-->task = myTask.getTask(); |-->taskid = task.getTaskID(); |-->TaskRunner.setupWorkDir(job); |-->task.run(job, umbilical); |以maptask为例 |-->TaskReporter reporter = new TaskReporter(getProgress(), umbilical); |-->if (useNewApi) |-->runNewMapper(job, split, umbilical, reporter); |-->else |-->runOldMapper(job, split, umbilical, reporter); |-->inputSplit = (InputSplit) ReflectionUtils.newInstance(job.getClassByName(splitClass), job); |-->MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job); |-->runner.run(in, new OldOutputCollector(collector, conf), reporter); MapRunner.run() |--> K1 key = input.createKey(); |-->V1 value = input.createValue(); |-->while (input.next(key, value)) |-->mapper.map(key, value, output, reporter); |--> if(incrProcCount) |-->reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1); |-->mapper.close();