MapReduce剥洋葱 (2)

  MapReduce本质上就是方法三,但是如何拆分文件集,如何copy程序,如何整合结果这些都是框架定义好的。我们只要定义好这个任务(用户程序),其它都交给MapReduce。


map函数和reduce函数  


map函数和reduce函数是交给用户实现的,这两个函数定义了任务本身。 

  map函数:接受一个键值对(key-value pair),产生一组中间键值对。MapReduce框架会将map函数产生的中间键值对里键相同的值传递给一个reduce函数。 

  reduce函数:接受一个键,以及相关的一组值,将这组值进行合并产生一组规模更小的值(通常只有一个或零个值)

 

 

MapReduce实现wordcount

 

 

 

 

 

编程实现wordcount

 

/*

 * KEYIN:输入kv数据对中key的数据类型

 * VALUEIN:输入kv数据对中value的数据类型

 * KEYOUT:输出kv数据对中key的数据类型

 * VALUEOUT:输出kv数据对中value的数据类型

 */

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

        

         /*

          * map方法是提供给map task进程来调用的,map task进程是每读取一行文本来调用一次我们自定义的map方法

          * map task在调用map方法时,传递的参数:

          *             一行的起始偏移量LongWritable作为key

          *             一行的文本内容Text作为value

          */

         @Override

         protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

                   //拿到一行文本内容,转换成String 类型

                   String line = value.toString();

                   //将这行文本切分成单词

                   String[] words=line.split(" ");

                  

                   //输出<单词,1>

                   for(String word:words){

                            context.write(new Text(word), new IntWritable(1));

                   }

         }

}

 

/*

 * KEYIN:对应mapper阶段输出的key类型

 * VALUEIN:对应mapper阶段输出的value类型

 * KEYOUT:reduce处理完之后输出的结果kv对中key的类型

 * VALUEOUT:reduce处理完之后输出的结果kv对中value的类型

 */

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

         @Override

         /*

          * reduce方法提供给reduce task进程来调用

          *

          * reduce task会将shuffle阶段分发过来的大量kv数据对进行聚合,聚合的机制是相同key的kv对聚合为一组

          * 然后reduce task对每一组聚合kv调用一次我们自定义的reduce方法

          * 比如:<hello,1><hello,1><hello,1><tom,1><tom,1><tom,1>

          *  hello组会调用一次reduce方法进行处理,tom组也会调用一次reduce方法进行处理

          *  调用时传递的参数:

          *                     key:一组kv中的key

          *                     values:一组kv中所有value的迭代器

          */

         protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

                   //定义一个计数器

                   int count = 0;

                   //通过value这个迭代器,遍历这一组kv中所有的value,进行累加

                   for(IntWritable value:values){

                            count+=value.get();

                   }

                  

                   //输出这个单词的统计结果

                   context.write(key, new IntWritable(count));

         }

}

 

 

public class WordCountJobSubmitter {

        

         public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

                   Configuration conf = new Configuration();

                   Job wordCountJob = Job.getInstance(conf);

                  

                   //重要:指定本job所在的jar包

                   wordCountJob.setJarByClass(WordCountJobSubmitter.class);

                  

                   //设置wordCountJob所用的mapper逻辑类为哪个类

                   wordCountJob.setMapperClass(WordCountMapper.class);

                   //设置wordCountJob所用的reducer逻辑类为哪个类

                   wordCountJob.setReducerClass(WordCountReducer.class);

                  

                   //设置map阶段输出的kv数据类型

                   wordCountJob.setMapOutputKeyClass(Text.class);

                   wordCountJob.setMapOutputValueClass(IntWritable.class);

                  

                   //设置最终输出的kv数据类型

                   wordCountJob.setOutputKeyClass(Text.class);

                   wordCountJob.setOutputValueClass(IntWritable.class);

                  

                   //设置要处理的文本数据所存放的路径

                   FileInputFormat.setInputPaths(wordCountJob, "hdfs://192.168.77.70:9000/wordcount/srcdata/");

                   FileOutputFormat.setOutputPath(wordCountJob, new Path("hdfs://192.168.77.70:9000/wordcount/output/"));

                  

                   //提交job给hadoop集群

                   wordCountJob.waitForCompletion(true);

         }

}

 

 

 

 

三、 MapReduce过程详解

 

 

MapReduce过程:

Input --》 Map --》 shuffle --》 Reduce  --》 Output

 

 

 

 

 

MapReduce中,分片、分区、排序和分组(Group)的关系图:

 

 

分片大小

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zydsjp.html