大数据技术 - 通俗理解MapReduce之WordCount(三)

上一章我们编写了简单的 MapReduce 程序,掌握这些就能编写大多数数据处理的代码。但是 MapReduce 框架提供给用户的能力并不止如此,本章我们仍然以上一章 word count 为例,继续完善我们的数据处理代码。本章主要关注的重点包括三个部分:

  1. 完整的 map / reduce 任务,完整的 map 任务除了 map 方法里的逻辑外,还包括任务运行前的准备工作以及任务结束后的清理工作,reduce 任务也一样

  2. Counter 的作用,有时候为了统计程序运行中任务的状态,比如:某个异常出现的次数,因此需要一个计数器进行统计并输出

  3. 给 MapReduce 任务传自定义配置,命令行可以实现传参数,但是参数比较多的情况下,命令行参数不好维护且不具备很好的可读性,最好能够使用 Hadoop 配置文件中的那种格式配置

在这里, 我们仍然用上一章 word count 中的 map 任务, 区别是我们可以通过自定义配置实现只统计某个单词出现的次数,同时增加了计数功能。下面看下如何在 map 任务中实现上面这三个内容。

package com.cnblogs.duma.mapreduce; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * WordCountMapper 继承 Mapper 类,需要指定4个泛型类型,分别是 * 输入 key 类型:本例中输入的 key 为每行文本的行号,例子中用不到所以这里是 Object * 输入 value 类型:本例中输入的 value 是每行文本,因此是Text * 输出 key 类型:map 输出的是每个单词,类型为 Text * 输出 value 类型:单词出现的次数,为 1,因此类型 IntWritable */ public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> { /** * 把每个单词映射成 <word, 1> 的格式 */ private final static IntWritable one = new IntWritable(1); private Text outWord = new Text(); private String filterWord; /** * 每一个 map 进程调用一次 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); filterWord = conf.get("wordcount.filter.word", null); /** * 初始化工作, 比如连接数据库 */ } /** * 每个 map 方法处理一行数据 * @param key 输入的行号 * @param value 每一行文本 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split(" "); //空格分割一行中的每个单词 Counter counter = context.getCounter("group1", "counter1"); //第一个参数代表计数组,第二个参数代表计数名称 for (String word : words) { if (filterWord != null && !filterWord.equals(word)) //判断是否只统计过滤词 continue; counter.increment(1); // 计数 outWord.set(word); context.write(outWord, one); // map输出 } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { /** * 做清理工作, 比如释放数据库连接 */ } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyzjsx.html