Hadoop学习总结:Map(6)

真正的map task和reduce task都是在Child进程中运行的,Child的main函数的主要逻辑如下:

// Main loop of Child (excerpt; "……" marks code omitted by the article):
// repeatedly fetches a task through the umbilical, configures it and runs it.
while (true) {

  // Obtain the JvmTask object from the TaskTracker over the network

JvmTask myTask = umbilical.getTask(jvmId);

……

idleLoopCount = 0;

task = myTask.getTask();

taskid = task.getTaskID();

isCleanup = task.isTaskCleanupTask();

// Build the job configuration from the job file referenced by the task

JobConf job = new JobConf(task.getJobFile());

TaskRunner.setupWorkDir(job);

numTasksToExecute = job.getNumTasksToExecutePerJvm();

task.setConf(job);

defaultConf.addResource(new Path(task.getJobFile()));

……

  // Run the task

task.run(job, umbilical);             // run the task

// Leave the loop once this JVM has run numTasksToExecute tasks;
// the limit only applies when numTasksToExecute is positive.

if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {

break;

}

}

 
6.1、MapTask

如果task是MapTask,则其run函数如下:

// MapTask.run (excerpt; "……" marks code omitted by the article): sets up
// reporting, chooses an output collector, rebuilds the input split, opens a
// RecordReader on it and hands reader and collector to a MapRunnable.
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)

throws IOException {

  // Used to communicate with the TaskTracker and report the running status

final Reporter reporter = getReporter(umbilical);

startCommunicationThread(umbilical);

initialize(job, reporter);

……

  // Output of the map task: a MapOutputBuffer when there are reduce tasks,
  // otherwise a DirectMapOutputCollector

int numReduceTasks = conf.getNumReduceTasks();

MapOutputCollector collector = null;

if (numReduceTasks > 0) {

collector = new MapOutputBuffer(umbilical, job, reporter);

} else {

collector = new DirectMapOutputCollector(umbilical, job, reporter);

}

  // Read the input split and, based on its contents, create a RecordReader to read the data

instantiatedSplit = (InputSplit)

ReflectionUtils.newInstance(job.getClassByName(splitClass), job);

DataInputBuffer splitBuffer = new DataInputBuffer();

splitBuffer.reset(split.getBytes(), 0, split.getLength());

instantiatedSplit.readFields(splitBuffer);

// For file-based splits, publish the file path, start offset and length
// through the job configuration.

if (instantiatedSplit instanceof FileSplit) {

FileSplit fileSplit = (FileSplit) instantiatedSplit;

job.set("map.input.file", fileSplit.getPath().toString());

job.setLong("map.input.start", fileSplit.getStart());

job.setLong("map.input.length", fileSplit.getLength());

}

RecordReader rawIn =                  // open input

job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);

// Wrap the raw reader: SkippingRecordReader when isSkipping() is true,
// TrackedRecordReader otherwise.

RecordReader in = isSkipping() ?

new SkippingRecordReader(rawIn, getCounters(), umbilical) :

new TrackedRecordReader(rawIn, getCounters());

job.setBoolean("mapred.skip.on", isSkipping());

  // For a map task, create a MapRunnable; the default is MapRunner

MapRunnable runner =

ReflectionUtils.newInstance(job.getMapRunnerClass(), job);

try {

    // MapRunner's run function reads the records from the RecordReader one by one and calls the Mapper's map function on each.

runner.run(in, collector, reporter);     

collector.flush();

} finally {

in.close();                               // close input

collector.close();

}

done(umbilical);

}

 

MapRunner的run函数就是依次读取RecordReader中的数据,然后调用Mapper的map函数进行处理:

// MapRunner.run: iterates over every record supplied by the RecordReader and
// passes each key/value pair to the mapper; the mapper is closed in all cases.
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,

Reporter reporter)

throws IOException {

try {

// The same key and value objects are reused for every call to input.next()

K1 key = input.createKey();

V1 value = input.createValue();

while (input.next(key, value)) {

mapper.map(key, value, output, reporter);

// When incrProcCount is set, count one processed record per map() call

if(incrProcCount) {

reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,

SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);

}

}

} finally {

mapper.close();

}

}

 

当存在reduce task时(numReduceTasks > 0),map的输出结果全部收集到MapOutputBuffer中,其collect函数如下:

// MapOutputBuffer.collect (excerpt; "……" marks code omitted by the article):
// waits for or triggers a spill as needed, then serializes the key and value
// into the buffer and records their partition and offsets in the index arrays.
public synchronized void collect(K key, V value)

throws IOException {

reporter.progress();

……

  // As can be seen here, this buffer is a ring (circular) data structure

final int kvnext = (kvindex + 1) % kvoffsets.length;

spillLock.lock();

try {

boolean kvfull;

do {

      // In the ring, if the next free slot meets the start position, the buffer is full

kvfull = kvnext == kvstart;

      // Compute, within the ring, whether the threshold for writing the buffer to disk has been reached

final boolean kvsoftlimit = ((kvnext > kvend)

? kvnext - kvend > softRecordLimit

: kvend - kvnext <= kvoffsets.length - softRecordLimit);

      // If the threshold is reached, start writing the buffer to disk as a spill file.

      // startSpill mainly notifies the background SpillThread, whose run() calls sortAndSpill() to sort, combine and write to disk

if (kvstart == kvend && kvsoftlimit) {

startSpill();

}

      // If the buffer is full, we can only wait until the write completes

if (kvfull) {

while (kvstart != kvend) {

reporter.progress();

spillDone.await();

}

}

} while (kvfull);

} finally {

spillLock.unlock();

}

try {

    // If the buffer is not full, write key and value into the buffer

int keystart = bufindex;

keySerializer.serialize(key);

final int valstart = bufindex;

valSerializer.serialize(value);

int valend = bb.markRecord();

    // Call the configured partitioner to get the partition id from key and value

final int partition = partitioner.getPartition(key, value, partitions);

mapOutputRecordCounter.increment(1);

// Byte count of the record; when valend < keystart the length is computed
// across bufvoid (presumably the record wrapped around the buffer end - confirm).

mapOutputByteCounter.increment(valend >= keystart

? valend - keystart

: (bufvoid - keystart) + valend);

    // Write the partition id and the buffer offsets of key and value into the index arrays

int ind = kvindex * ACCTSIZE;

kvoffsets[kvindex] = ind;

kvindices[ind + PARTITION] = partition;

kvindices[ind + KEYSTART] = keystart;

kvindices[ind + VALSTART] = valstart;

kvindex = kvnext;

} catch (MapBufferTooSmallException e) {

// The record did not fit in the in-memory buffer: log it, pass it to
// spillSingleRecord, and still count it as one output record.

LOG.info("Record too large for in-memory buffer: " + e.getMessage());

spillSingleRecord(key, value);

mapOutputRecordCounter.increment(1);

return;

}

}

 

内存buffer的格式如下:

(参见几位Hadoop大侠的分析)

clip_image001

kvoffsets是为了在将buffer写入硬盘前进行排序而使用的索引数组。

从上面可知,内存buffer写入硬盘spill文件的函数为sortAndSpill:

// sortAndSpill (excerpt; "……" marks code omitted by the article): creates a
// spill file, sorts the buffered records, then writes them out partition by
// partition, running the combiner first when one is configured.
private void sortAndSpill() throws IOException {

……

FSDataOutputStream out = null;

FSDataOutputStream indexOut = null;

IFileOutputStream indexChecksumOut = null;

  // Create the spill file on disk

Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),

numSpills, size);

out = rfs.create(filename);

……

// End of the range to sort; when kvend is not past kvstart, kvoffsets.length
// is added (presumably to unwrap the ring index - confirm).

final int endPosition = (kvend > kvstart)

? kvend

: kvoffsets.length + kvend;

  // Sort the data in the buffer in partition order

sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);

int spindex = kvstart;

InMemValBytes value = new InMemValBytes();

  // Write to the file one partition at a time

for (int i = 0; i < partitions; ++i) {

IFile.Writer<K, V> writer = null;

long segmentStart = out.getPos();

writer = new Writer<K, V>(job, out, keyClass, valClass, codec);

    // If there is no combiner, write directly to the file

if (null == combinerClass) {

……

writer.append(key, value);

++spindex;

}

else {

……

        // If there is a combiner, combine first: combiner.reduce(…) is called before writing to the file

combineAndSpill(kvIter, combineInputCounter);

}

}

……

}

 

当map阶段结束的时候,MapOutputBuffer的flush函数会被调用,它同样会调用sortAndSpill将buffer中剩余的数据写入文件,然后再调用mergeParts来合并已写入硬盘的多个spill文件:

// mergeParts (excerpt; "……" marks code omitted by the article): for every
// partition, gathers that partition's segment from each spill file, merges the
// segments and writes the merged result to the final output.
private void mergeParts() throws IOException {

……

    // For each partition

for (int parts = 0; parts < partitions; parts++){

//create the segments to be merged

List<Segment<K, V>> segmentList =

new ArrayList<Segment<K, V>>(numSpills);

TaskAttemptID mapId = getTaskID();

       // Collect, from each spill file in turn, the segment belonging to the current partition

for(int i = 0; i < numSpills; i++) {

// The index record supplies the offset and length of this partition's
// segment inside spill file i.

final IndexRecord indexRecord =

getIndexInformation(mapId, i, parts);

long segmentOffset = indexRecord.startOffset;

long segmentLength = indexRecord.partLength;

Segment<K, V> s =

new Segment<K, V>(job, rfs, filename[i], segmentOffset,

segmentLength, codec, true);

segmentList.add(i, s);

}

      // Merge the segments belonging to the same partition together

RawKeyValueIterator kvIter =

Merger.merge(job, rfs,

keyClass, valClass,

segmentList, job.getInt("io.sort.factor", 100),

new Path(getTaskID().toString()),

job.getOutputKeyComparator(), reporter);

      // Write the merged segment to the file

long segmentStart = finalOut.getPos();

Writer<K, V> writer =

new Writer<K, V>(job, finalOut, keyClass, valClass, codec);

// Write directly when no combiner is configured or there are fewer than
// minSpillsForCombine spills; otherwise run the combiner while writing.

if (null == combinerClass || numSpills < minSpillsForCombine) {

Merger.writeFile(kvIter, writer, reporter, job);

} else {

combineCollector.setWriter(writer);

combineAndSpill(kvIter, combineInputCounter);

}

……

}

}

 
6.2、ReduceTask

ReduceTask的run函数如下:

// ReduceTask.run (excerpt; "……" marks code omitted by the article): runs the
// copy, sort and reduce phases in order, then closes the reducer and output
// and reports completion through the umbilical.
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)

throws IOException {

job.setBoolean("mapred.skip.on", isSkipping());

  // A reduce consists of three steps: copy, sort, reduce

if (isMapOrReduce()) {

copyPhase = getProgress().addPhase("copy");

sortPhase  = getProgress().addPhase("sort");

reducePhase = getProgress().addPhase("reduce");

}

startCommunicationThread(umbilical);

final Reporter reporter = getReporter(umbilical);

initialize(job, reporter);

  // Copy phase: mainly uses ReduceCopier.fetchOutputs to obtain the map outputs. It creates multiple MapOutputCopier threads, whose copyOutput does the copying.

// The job is treated as local when mapred.job.tracker equals "local"
// ("local" is also the fallback value passed to job.get).

boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));

// Map outputs are fetched through ReduceCopier only when not running locally

if (!isLocal) {

reduceCopier = new ReduceCopier(umbilical, job);

if (!reduceCopier.fetchOutputs()) {

……

}

}

copyPhase.complete();

  // Sort phase: merge the fetched map outputs until the number of files is less than io.sort.factor, then return an Iterator for accessing the key-values

setPhase(TaskStatus.Phase.SORT);

statusUpdate(umbilical);

final FileSystem rfs = FileSystem.getLocal(job).getRaw();

// Local runs merge the map files directly; otherwise the iterator comes from
// the ReduceCopier.

RawKeyValueIterator rIter = isLocal

? Merger.merge(job, rfs, job.getMapOutputKeyClass(),

job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),

!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),

new Path(getTaskID().toString()), job.getOutputKeyComparator(),

reporter)

: reduceCopier.createKVIterator(job, rfs, reporter);

mapOutputFilesOnDisk.clear();

sortPhase.complete();

  // Reduce phase

setPhase(TaskStatus.Phase.REDUCE);

……

Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);

Class keyClass = job.getMapOutputKeyClass();

Class valClass = job.getMapOutputValueClass();

// Pick the values iterator: the skipping variant when isSkipping() is true

ReduceValuesIterator values = isSkipping() ?

new SkippingReduceValuesIterator(rIter,

job.getOutputValueGroupingComparator(), keyClass, valClass,

job, reporter, umbilical) :

new ReduceValuesIterator(rIter,

job.getOutputValueGroupingComparator(), keyClass, valClass,

job, reporter);

  // Read out each key with its value list in turn and call the Reducer's reduce function

while (values.more()) {

reduceInputKeyCounter.increment(1);

reducer.reduce(values.getKey(), values, collector, reporter);

values.nextKey();

values.informReduceProgress();

}

reducer.close();

out.close(reporter);

done(umbilical);

}

 
七、总结

Map-Reduce的过程总结如下图:

Hadoop学习总结:Map

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/ppfzg.html