入门Hadoop的WordCount程序(3)

public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable>{
//显然这里的Mapper<Object,Text,Text,IntWritable>是范型,其实是
  //Mapper<input_Key_Type,input_Value_Type,output_key_type,output_value_type>
  //也就是借此规定map中用到的数据类型
//这几种类型除Object之外,其它是jdk中没有的,这是hadoop对它相应的jdk中数据类型的封装,
//这里的Text相当于jdk中的String,IntWritable相当于jdk的int类型,
//这样做的原因主要是为了hadoop的数据序化而做的。 
    private final static IntWritable one = new IntWritable(1);
 //声时一个IntWritable变量,作计数用,每出现一个key,给其一个value=1的值
    private Text word = new Text();  //用来暂存map输出中的key值,Text类型的,故有此声明

//这里就是map函数,也用到了范型,它是和Mapper抽象类中的相对应的,
 //此处的Object key,Text value的类型和上边的Object,Text是相对应的,而且最好一样,
 //不然的话,多数情况运行时会报错。
    public void map(Object key, Text value, Context context) throws IOException,
 InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
  //Hadoop读入的value是以行为单位的,其key为该行所对应的行号
//因为我们要计算每个单词的数目,默认以空格作为间隔,故用StringTokenizer辅助做一下字符串的拆分,
//也可以用string.split("")来作。
      while (itr.hasMoreTokens()) {//遍历一下每行字符串中的单词,
        word.set(itr.nextToken());//出现一个单词就给它设成一个key并将其值设为1
        context.write(word, one);//输出设成的key/value值。
//以上就是打散的过程
      }
    }
  }

public static class IntSumReducer  //reduce所在的静态类
      extends Reducer<Text,IntWritable,Text,IntWritable> {
    //这里和Map中的作用是一样的,设定输入/输出的值的类型
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values,
                      Context context
                      ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
    //由于map的打散,这里会得到如,{key,values}={"hello",{1,1,1,1,1,1,....}},这样的集合
        sum += val.get(); //这里需要逐一将它们的value取出来予以相加,取得总的出现次数,即为汇和
      }
      result.set(sum);                  //将values的和取得,并设成result对应的值
      context.write(key, result); 
  //此时的key即为map打散之后输出的key,没有变化,
  //变化的是result,以前得到的是一个数字的集合,此时已经//给算出和了,并做为key/value输出。
    }
  }

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); //取得系统的参数
    if (args.length != 2) {//判断一下命令行输入路径/输出路径是否齐全,即是否为两个参数
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);  //若非两个参数,即退出
    }

//此程序的执行,在hadoop看来是一个Job,故进行初始化job操作
    Job job = new Job(conf, "Word Count");

//配置作业名,此程序要执行WordCount.class这个字节码文件
    job.setJarByClass(WordCount.class);

//配置作业的各个类
 //此处设置了使用 TokenizerMapper 完成 Map 过程中的处理
 //使用 IntSumReducer 完成 Combine 和 Reduce 过程中的处理。
 //在这个job中,用TokenizerMapper这个类的map函数
    job.setMapperClass(TokenizerMapper.class);

//在这个job中,用IntSumReducer这个类的reduce函数
    job.setReducerClass(IntSumReducer.class);

//在reduce的输出时,key的输出类型为Text
    job.setOutputKeyClass(Text.class);

//在reduce的输出时,value的输出类型为IntWritable
    job.setOutputValueClass(IntWritable.class);

//任务的输出和输入路径则由命令行参数指定,并由 FileInputFormat 和 FileOutputFormat 分别设定

//初始化要计算word的文件的路径
    FileInputFormat.addInputPath(job, new Path(args[0]));

//初始化要计算word的文件的之后的结果的输出路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/18144f6b2c477466d0b3bf6f1cc73cfd.html