Hadoop大数据开发基础系列:四、MapReduce初级编程 (4)

Hadoop大数据开发基础系列:四、MapReduce初级编程

再来查看输出结果:

Hadoop大数据开发基础系列:四、MapReduce初级编程

打开文件可以看到:

Hadoop大数据开发基础系列:四、MapReduce初级编程

第一列是已经按照自然日期排好顺序,第二列是对应日期的总访问次数,任务基本完成。

4.编程实现按访问次数排序

前一部分完成了日期统计任务,本部分要对AccessCount中的数据按照访问次数进行排序,将排序后的结果存放在相同目录下的TimesSort中。

  4.1 分析思路与处理逻辑

MapReduce只会对键值进行排序,所以我们在Mapper模块中对于输入的键值对,把Key与Value位置互换,在Mapper输出后,键值对经过shuffle的处理,已经变成了按照访问次数排序的数据顺序啦,输出格式为<访问次数,日期>。Reducer的处理和Mapper恰好相反,将键和值的位置互换,输出格式变为<日期,访问次数>。

  4.2 编写核心模块代码

package test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class AccessTimesSort { // Mapper模块 public static class MyMapper extends Mapper<Object, Text, IntWritable,Text>{ public void map(Object key, Text value, Context context) //map函数的编写要根据读取的文件内容和业务逻辑来写 throws IOException, InterruptedException { String line = value.toString(); String array[] = line.split("\t");//指定,为分隔符,组成数组 int keyOutput = Integer.parseInt(array[1]);//提取数组中的访问次数作为Key String valueOutput = array[0]; //将日期作为value context.write(new IntWritable(keyOutput), new Text(valueOutput)); } } // Reducer模块 public static class MyReducer extends Reducer<IntWritable,Text,Text,IntWritable> {//注意与上面输出对应 public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text val : values) { context.write(val, key); //进行键值位置互换 } } } //Driver模块,主要是配置参数 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "AccessTimesSort"); job.setJarByClass(AccessTimesSort.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < args.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(args[i])); } FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

    4.3 任务实现

(1)编译生成jar包

(2)将jar包上传到集群

(3)执行jar包内的AccessTimesSort类

hadoop jar NewDaily.jar test.AccessTimesSort /user/dftest/AccessCount /user/dftest/TimesSort

Hadoop大数据开发基础系列:四、MapReduce初级编程

(4)查看执行结果(由此可以看到结果为升序排列)

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzpsww.html