分析Hadoop自带WordCount例子的执行过程(4)

前面终于把命令行和Hadoop的配置类说完了,其实就是为了获取Hadoop的配置信息,在这些配置存在的环境下才能进行Tool的运行工作。

众所周之,Hadoop实现了Google的MapReduce算法,所以对于一个Hadoop的Tool必须实现Map函数和Reduce函数了,分别在处理数据的工作中进行映射和化简。

那么WordCount这个工具自然也要实现Map和Reduce函数了。

要知道,在WordCount中,定义了两个成员变量,如下所示:

     private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
 

IntWritable类是一个为整数可以进行写、可以进行比较而定义的,比如统计单词出现频率就是一个整数。

Text类是用来存储文本内容的,存储的文本内容经过了编码、解码等等操作。可以参考org.apache.hadoop.io.Text类来了解更多信息。

先看Map的实现,如下所示:

/**
   * MapClass是一个内部静态类。统计数据文件中每一行的单词。
   */
public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
   
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
   
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
}
 

StringTokenizer是将String line = value.toString();这个从文本中获取到的可能很长的不规范(带空格或者其他分隔符,这里默认就是空格作为分隔符的)的字符串进行处理,提取由空格作为分隔符的每个单词。

然后,word.set(itr.nextToken());将提取出来的单词设置到Text word中,看一下org.apache.hadoop.io.Text类的set()方法,如下所示:

   public void set(String string) {
    try {
      ByteBuffer bb = encode(string, true); // 将传进来的单词string进行编码后放到字节缓冲区ByteBuffer bb中
      bytes = bb.array(); // bytes是一个字节数组,是Text的成员
      length = bb.limit(); // length是单词字符串转化为字节后的长度,length是Text的成员
    }catch(CharacterCodingException e) {
      throw new RuntimeException("Should not have happened " + e.toString());
    }
}
 

上面map()方法中,OutputCollector<Text, IntWritable> output是一个输出收集器,因为执行一个Map任务需要输出中间结果的,以便于下一个步骤进行Reduce任务进行合并简化。

OutputCollector<Text, IntWritable>是一个接口,先看看这个接口的定义吧,非常简单:

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;


/**
* 收集<key, value>对。用于Map-Reduce框架,可以被Mapper或者Reducer使用  
*/
public interface OutputCollector<K extends WritableComparable,
                                 V extends Writable> {

/** Adds a key/value pair to the output.
   *
   * @param key the key to collect.
   * @param value to value to collect.
   * @throws IOException
   */
void collect(K key, V value) throws IOException;
}

 

收集<key,value>对很好理解的,比如,读取文本文件中的一行,使用StringTokenizer提取后,假如得到一个单词“shirdrn”,要对其统计词频,并且是第一次出现这个单词,则<key,value>对<shridrn,1>就可以表示,然后继续读取;如果再次碰到“shirdrn”这个单词,依然如上面,<key,value>对就是<shridrn,1>。在执行Map任务的时候,<key,value>对可能存在重复,因为Map任务没有对它进行合并。

如果必要的话。可以使用Combiner在Map完成之后先进行一次中间结果的合并,对上面出现两次的“shirdrn”,合并后就是<shridrn,2>。注意了,这里只是一个Map任务,假如另一个Map任务也多次出现“shirdrn”这个单词,例如执行一个Combiner合并后变为<shridrn,4>。

使用Combiner合并后得到的结果仍然是一个中间结果,也就是说,对于某项任务(对应着多个Map子任务)执行完Map任务后,例如上面的两个,全部的中间结果中存在这样两个键值对:<shridrn,2>和<shridrn,4>,而我们的目的是要统计“shirdrn”的词频,期望得到的结果是<shridrn,6>,这就要执行Reduce任务了,Reduce任务输出的不是中间结果了,是最终结果,即有一个输出或者0个输出。

另外,执行Combiner进行中间结果的合并输出中间结果之前,可能需要进行一个排序操作,对Map任务执行的输出结果进行排序后在进行Combiner合并。

OutputCollector<Text, IntWritable>是一个接口,要想知道它的collect方法是如何进行收集数据的,需要看它的具体实现类了,先看一看它的两个具体实现类:

分析Hadoop自带WordCount例子的执行过程

首先,在MapTask类中定义了一个MapOutputCollector<K extends WritableComparable, V extends Writable>接口,它继承自OutputCollector<Text, IntWritable>接口,从而DirectMapOutputCollector<K extends WritableComparable,V extends Writable>类实现了MapOutputCollector<K, V>接口,MapOutputBuffer类也实现了MapOutputCollector接口。

其中,DirectMapOutputCollector类和MapOutputBuffer类都是OutputCollector接口的间接实现,但是在这两个具体实现类中,定义的collect()方法的功能是不同的,DirectMapOutputCollector类中的collect()方法实现非常简单而且直接:

     public void collect(K key, V value) throws IOException {
      this.out.write(key, value);
    }
 

这里主要是使用了RecordWriter接口的具体实现类LineRecordWriter的write()方法来完成单词(应该是Map任务执行生成的key)的收集,LineRecordWriter类是定义在org.apache.hadoop.mapred.TextOutputFormat类内容的一个静态类,看看write()方法是如何收集写入的:

public synchronized void write(K key, V value)
      throws IOException {

boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(tab);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }

 

其实非常容易,当key和value都不为空的时候才将它们收集写入,并且适当处理TAB和换行,上面的tab和newline在类中静态定义:

     static {
      try {
        tab = "\t".getBytes(utf8);
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }
 

现在明白多了,如果在看一看writeObject()方法的实现就更加明白了:

     private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }
 

其实将一个<key,value>对设置到了一个Text���对象中。Text对象当然可以包含很多个<key,value>对了,只要使用指定的分隔符分割就行了。上面已经使用回车换行符了,具体样式是这样的:设置一个key,例如“shirdrn”,再设置一个value,例如6,然后在来一个回车换行符,多个的话,就形如下所示:

shirdrn      6
master       2
hear       4
 

在writeObject()方法中,还要进行输出呢,使用java.io.DataOutputStream.write()方法进行输出的,写到指定的文件系统或者缓存中,这里是中间结果应该是写入到缓存中,因为Map任务结束会立即执行Reduce任务来对中间结果进行合并输出。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/pxpfw.html