The actual map tasks and reduce tasks both run inside the Child process. The main logic of Child's main function is as follows:
while (true) {
  // fetch a JvmTask object from the TaskTracker over the umbilical RPC connection
  JvmTask myTask = umbilical.getTask(jvmId);
  ……
  idleLoopCount = 0;
  task = myTask.getTask();
  taskid = task.getTaskID();
  isCleanup = task.isTaskCleanupTask();
  JobConf job = new JobConf(task.getJobFile());
  TaskRunner.setupWorkDir(job);
  numTasksToExecute = job.getNumTasksToExecutePerJvm();
  task.setConf(job);
  defaultConf.addResource(new Path(task.getJobFile()));
  ……
  // run the task
  task.run(job, umbilical);
  // exit once this JVM has executed its allotted number of tasks
  if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
    break;
  }
}
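This loop is also what implements JVM reuse: a single Child process can run several tasks of the same job before exiting, controlled by numTasksToExecute above. As a minimal sketch of how a job enables this (setNumTasksToExecutePerJvm backs the mapred.job.reuse.jvm.num.tasks property in the old mapred API):
JobConf conf = new JobConf();
// -1 means the JVM may run an unlimited number of tasks of this job;
// the default of 1 disables reuse
conf.setNumTasksToExecutePerJvm(-1);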
6.1、MapTask
If the task is a MapTask, its run function is as follows:
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException {
  // the Reporter communicates with the TaskTracker to report progress
  final Reporter reporter = getReporter(umbilical);
  startCommunicationThread(umbilical);
  initialize(job, reporter);
  ……
  // choose the collector for the map task's output
  int numReduceTasks = conf.getNumReduceTasks();
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    // with reduces, output goes through the in-memory sort buffer
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    // with no reduces, output is written directly to the final output files
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  // reconstruct the input split and create a RecordReader from it to read the data
  instantiatedSplit = (InputSplit)
    ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(split.getBytes(), 0, split.getLength());
  instantiatedSplit.readFields(splitBuffer);
  if (instantiatedSplit instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) instantiatedSplit;
    job.set("map.input.file", fileSplit.getPath().toString());
    job.setLong("map.input.start", fileSplit.getStart());
    job.setLong("map.input.length", fileSplit.getLength());
  }
  RecordReader rawIn =                               // open input
    job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  RecordReader in = isSkipping() ?
      new SkippingRecordReader(rawIn, getCounters(), umbilical) :
      new TrackedRecordReader(rawIn, getCounters());
  job.setBoolean("mapred.skip.on", isSkipping());
  // create a MapRunnable for the map task; the default is MapRunner
  MapRunnable runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  try {
    // MapRunner.run reads records from the RecordReader one by one
    // and calls the Mapper's map function on each of them
    runner.run(in, collector, reporter);
    collector.flush();
  } finally {
    in.close();                                      // close input
    collector.close();
  }
  done(umbilical);
}
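As the run function shows, the collector is chosen purely by the number of reduce tasks, so a job can bypass the entire sort/spill machinery by declaring itself map-only. A minimal sketch:
JobConf conf = new JobConf();
// with zero reduce tasks, MapTask picks DirectMapOutputCollector and the
// mapper's output goes straight to the job's OutputFormat, unsorted
conf.setNumReduceTasks(0);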
MapRunner's run function simply reads records from the RecordReader one by one and calls the Mapper's map function on each:
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
    throws IOException {
  try {
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
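For illustration, the classic word-count Mapper in the old mapred API; this is exactly the kind of user code MapRunner drives, and each output.collect call lands in the MapOutputBuffer discussed next:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // emit (token, 1) for every whitespace-separated token in the line
    for (String token : value.toString().split("\\s+")) {
      word.set(token);
      output.collect(word, ONE);
    }
  }
}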
All of the map output is collected into a MapOutputBuffer, whose collect function is as follows:
public synchronized void collect(K key, V value)
    throws IOException {
  reporter.progress();
  ……
  // as this shows, the buffer is a ring (circular) data structure
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  spillLock.lock();
  try {
    boolean kvfull;
    do {
      // the ring is full when the next free slot catches up with the start position
      kvfull = kvnext == kvstart;
      // check whether the ring has reached the soft limit at which
      // the buffer should start being spilled to disk
      final boolean kvsoftlimit = ((kvnext > kvend)
          ? kvnext - kvend > softRecordLimit
          : kvend - kvnext <= kvoffsets.length - softRecordLimit);
      // once the soft limit is reached, start writing the buffer to disk as a spill file.
      // startSpill notifies the background SpillThread, whose run() calls
      // sortAndSpill() to sort, combine, and write the data to disk
      if (kvstart == kvend && kvsoftlimit) {
        startSpill();
      }
      // if the buffer is completely full, wait until the spill finishes
      if (kvfull) {
        while (kvstart != kvend) {
          reporter.progress();
          spillDone.await();
        }
      }
    } while (kvfull);
  } finally {
    spillLock.unlock();
  }
  try {
    // if the buffer is not full, serialize the key and value into it
    int keystart = bufindex;
    keySerializer.serialize(key);
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();
    // call the configured partitioner to compute the partition id from key and value
    final int partition = partitioner.getPartition(key, value, partitions);
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(valend >= keystart
        ? valend - keystart
        : (bufvoid - keystart) + valend);
    // record the partition id and the buffer offsets of key and value
    // in the accounting arrays
    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;
    kvindices[ind + PARTITION] = partition;
    kvindices[ind + KEYSTART] = keystart;
    kvindices[ind + VALSTART] = valstart;
    kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value);
    mapOutputRecordCounter.increment(1);
    return;
  }
}
The in-memory buffer is laid out as follows: the serialized key/value bytes live in kvbuffer, kvindices stores a (partition, keystart, valstart) accounting triple for each record, and kvoffsets stores the index of each record's triple.
kvoffsets is the array that actually gets sorted before the buffer is spilled to disk, so the serialized records themselves never move.
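The sizes of kvbuffer, kvoffsets, kvindices and the soft limit checked in collect are all derived from the job configuration. Roughly, following the 0.19/0.20-era MapOutputBuffer constructor (property names and defaults assumed from that version):
final int sortmb = job.getInt("io.sort.mb", 100);                   // total buffer size in MB
final float recper = job.getFloat("io.sort.record.percent", 0.05f); // share reserved for accounting
final float spillper = job.getFloat("io.sort.spill.percent", 0.8f); // soft-limit fraction
int maxMemUsage = sortmb << 20;                        // bytes
int recordCapacity = (int) (maxMemUsage * recper);     // bytes for the accounting arrays
recordCapacity -= recordCapacity % RECSIZE;            // RECSIZE = 16: 4 ints per record
kvbuffer = new byte[maxMemUsage - recordCapacity];     // serialized key/value bytes
recordCapacity /= RECSIZE;
kvoffsets = new int[recordCapacity];                   // 1 int per record
kvindices = new int[recordCapacity * ACCTSIZE];        // ACCTSIZE = 3: partition, keystart, valstart
softRecordLimit = (int) (kvoffsets.length * spillper); // the threshold tested in collect()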
As seen above, the function that spills the in-memory buffer to a disk file is sortAndSpill:
private void sortAndSpill() throws IOException {
  ……
  FSDataOutputStream out = null;
  FSDataOutputStream indexOut = null;
  IFileOutputStream indexChecksumOut = null;
  // create the spill file on disk
  Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                      numSpills, size);
  out = rfs.create(filename);
  ……
  final int endPosition = (kvend > kvstart)
      ? kvend
      : kvoffsets.length + kvend;
  // sort the records in the buffer by partition, and within each partition by key
  sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  int spindex = kvstart;
  InMemValBytes value = new InMemValBytes();
  // write the data to the file one partition at a time
  for (int i = 0; i < partitions; ++i) {
    IFile.Writer<K, V> writer = null;
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
    if (null == combinerClass) {
      // if no combiner is configured, write the records to the file directly
      ……
      writer.append(key, value);
      ++spindex;
    } else {
      // if a combiner is configured, run it first (it calls combiner.reduce(…))
      // and write the combined output to the file
      ……
      combineAndSpill(kvIter, combineInputCounter);
    }
  }
  ……
}
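The sort in sortAndSpill orders records by partition first and by key within each partition; the comparator that MapOutputBuffer supplies to the sorter looks roughly like this (paraphrased from the same-era source, so treat it as a sketch):
public int compare(int i, int j) {
  final int ii = kvoffsets[i % kvoffsets.length];
  final int ij = kvoffsets[j % kvoffsets.length];
  // order by partition id first
  if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
    return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
  }
  // then by key, comparing the serialized bytes directly in kvbuffer
  return comparator.compare(kvbuffer,
      kvindices[ii + KEYSTART],
      kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
      kvbuffer,
      kvindices[ij + KEYSTART],
      kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
}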
When the map phase finishes, MapOutputBuffer's flush function is called. It too calls sortAndSpill to write what remains in the buffer to disk, and then calls mergeParts to merge the multiple spill files on disk:
private void mergeParts() throws IOException {
  ……
  // for every partition
  for (int parts = 0; parts < partitions; parts++) {
    // create the segments to be merged
    List<Segment<K, V>> segmentList =
      new ArrayList<Segment<K, V>>(numSpills);
    TaskAttemptID mapId = getTaskID();
    // collect the segment belonging to the current partition from each spill file
    for (int i = 0; i < numSpills; i++) {
      final IndexRecord indexRecord =
        getIndexInformation(mapId, i, parts);
      long segmentOffset = indexRecord.startOffset;
      long segmentLength = indexRecord.partLength;
      Segment<K, V> s =
        new Segment<K, V>(job, rfs, filename[i], segmentOffset,
                          segmentLength, codec, true);
      segmentList.add(i, s);
    }
    // merge the segments that belong to the same partition
    RawKeyValueIterator kvIter =
      Merger.merge(job, rfs,
                   keyClass, valClass,
                   segmentList, job.getInt("io.sort.factor", 100),
                   new Path(getTaskID().toString()),
                   job.getOutputKeyComparator(), reporter);
    // write the merged segment to the final output file
    long segmentStart = finalOut.getPos();
    Writer<K, V> writer =
      new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
    if (null == combinerClass || numSpills < minSpillsForCombine) {
      Merger.writeFile(kvIter, writer, reporter, job);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(kvIter, combineInputCounter);
    }
    ……
  }
}
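Both knobs seen here are configurable per job: io.sort.factor bounds how many segments one merge pass reads, and min.num.spills.for.combine (which backs minSpillsForCombine, default 3) decides whether the combiner runs again during the merge. A sketch of the relevant job setup, assuming the word-count classes from the surrounding examples:
JobConf conf = new JobConf();
conf.setCombinerClass(WordCountReducer.class); // safe as a combiner because summing is associative
conf.setInt("io.sort.factor", 100);            // max number of segments merged at once
conf.setInt("min.num.spills.for.combine", 3);  // run the combiner at merge time
                                               // only if there are at least this many spills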
6.2、ReduceTask
ReduceTask's run function is as follows:
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException {
  job.setBoolean("mapred.skip.on", isSkipping());
  // a reduce task consists of three phases: copy, sort, and reduce
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  startCommunicationThread(umbilical);
  final Reporter reporter = getReporter(umbilical);
  initialize(job, reporter);
  // copy phase: ReduceCopier.fetchOutputs fetches the map outputs. It starts
  // several MapOutputCopier threads, whose copyOutput does the actual copying.
  boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  if (!isLocal) {
    reduceCopier = new ReduceCopier(umbilical, job);
    if (!reduceCopier.fetchOutputs()) {
      ……
    }
  }
  copyPhase.complete();
  // sort phase: merge the fetched map outputs until the number of files drops
  // below io.sort.factor, and return an Iterator over the key-value pairs
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  RawKeyValueIterator rIter = isLocal
    ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
        !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
        new Path(getTaskID().toString()), job.getOutputKeyComparator(),
        reporter)
    : reduceCopier.createKVIterator(job, rfs, reporter);
  mapOutputFilesOnDisk.clear();
  sortPhase.complete();
  // reduce phase
  setPhase(TaskStatus.Phase.REDUCE);
  ……
  Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
  Class keyClass = job.getMapOutputKeyClass();
  Class valClass = job.getMapOutputValueClass();
  ReduceValuesIterator values = isSkipping() ?
      new SkippingReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter, umbilical) :
      new ReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter);
  // read out each key with its list of values and call the Reducer's reduce function
  while (values.more()) {
    reduceInputKeyCounter.increment(1);
    reducer.reduce(values.getKey(), values, collector, reporter);
    values.nextKey();
    values.informReduceProgress();
  }
  reducer.close();
  out.close(reporter);
  done(umbilical);
}
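To close the word-count illustration from 6.1, a matching old-API Reducer; reduce receives each key together with an Iterator over its values, exactly as driven by the values.more() loop above:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {   // sum all counts for this word
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}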
7、Summary
The Map-Reduce process is summarized in the following figure: