Hadoop大数据开发基础系列:四、MapReduce初级编程 (5)

Hadoop大数据开发基础系列:四、MapReduce初级编程

此任务顺利完成 哈哈。

5.小结

本章介绍了MapReduce编程的基础知识,通过对Hadoop官方的示例代码的分析及解读,深入了解了MapReduce的执行过程。MapReduce把复杂的、运行在Hadoop集群上的并行计算过程集成到了两个模块——Mapper和Reducer上。开发人员只需要把业务处理逻辑通过其中的map函数和reduce函数来实现,就可以达到分布式并行编程的目的。

MapReduce执行过程主要包括以下几个部分:读取分布式文件系统的数据,进行数据分片,执行map任务以输出中间结果,shuffle阶段把中间结果进行汇合、排序,再传到Reduce任务,在Reduce阶段对数据进行处理,输出最终结果到分布式文件系统内。

6.实训

实训目的是,掌握MapReduce编程的基本方法,通过MapReduce编程来实现一些常用的数据处理方法,包括求最大值、去重等。

  实训1.获取成绩表的最高分记录

(1)需求说明:对于样例文件subject_score,即成绩表A。文件中的每一行数据包含两个字段:科目和分数。要求获得成绩列表中每个科目成绩最高的记录,并将结果输出到最高成绩表B。

表A的部分内容:

语文

 

96

 

数学

 

102

 

英语

 

130

 

物理

 

19

 

化学

 

44

 

生物

 

44

 

语文

 

109

 

数学

 

118

 

英语

 

141

 

要输出的表B结构:

化学

 

99

 

数学

 

149

 

物理

 

99

 

生物

 

99

 

英语

 

144

 

语文

 

114

 

(2)实现思路与步骤:

①在Mapper中,map函数读取成绩表A中的数据,直接将读取的数据以空格分隔,组成键值对<科目,成绩>,即设置输出键值对类型为<Text,IntWritable>。

②在Reducer中,由于map函数输出键值对类型是<Text,IntWritable>,所以在Reducer中接收的键值对类型就是<Text,Iterable<IntWritable>>。针对相同的键遍历它的值,找到最高值,最后输出的键值对为<科目,最高成绩>。

(3)实现及输出结果:

①代码实现:

package test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ScoreSorting { // Mapper模块 public static class MyMapper extends Mapper<LongWritable, Text, Text ,IntWritable>{ Text course=new Text(); IntWritable score=new IntWritable(); public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text ,IntWritable>.Context context) //map函数的编写要根据读取的文件内容和业务逻辑来写 throws IOException, InterruptedException { String line = value.toString(); String array[] = line.trim().split(" ");//trim函数去掉两边多余的空格,指定空格为分隔符,组成数组 course.set(array[0]);//第一列是科目 score.set(Integer.parseInt(array[1]));//第二列是分数 context.write(course, score); } } // Reducer模块 public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {//注意与上面输出对应 private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text,IntWritable,Text,IntWritable>.Context context) throws IOException, InterruptedException { int maxscore=0; //初始化最大值 for (IntWritable score:values) { if(maxscore < score.get()) { maxscore=score.get(); //相同键内找最大值 } } result.set(maxscore); context.write(key, result); } } //Driver模块,主要是配置参数 public static void main(String[] args) throws Exception { //对有几个参数要有很强的敏感性,如果多可以用前面的遍历方式,如果少就可以直接指定。 if(args.length!=2) { System.err.println("ScoreSorting <input> <output>"); System.exit(-1); } Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "ScoreSorting"); job.setJarByClass(ScoreSorting.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setNumReduceTasks(1); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzpsww.html