Hadoop大数据开发基础系列:四、MapReduce初级编程 (3)

社交网站用户的访问日期在格式上属于文本格式,访问次数为整型数值格式。其组成的键值对为<访问日期,访问次数>,因此Mapper的输出与Reducer的输出都选用Text类与IntWritable类。

(2)Mapper 类的逻辑实现

Mapper类中最主要的部分就是map函数。map函数的主要任务就是读取用户访问文件中的数据,输出所有访问日期与初始次数的键值对。因此访问日期是数据文件的第二列,所有先定义一个数组,再提取第二个元素,与初始次数1一起构成要输出的键值对,即<访问日期,1>。

(3)Reducer的逻辑实现

Reducer类中最主要的部分就是reduce函数。reduce的主要任务就是读取Mapper输出的键值对<访问日期,1>。这一部分与官网给出的WordCount中的Reducer完全相同。

3.2 编写核心模块代码

目录结构:

Hadoop大数据开发基础系列:四、MapReduce初级编程

package test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class DailyAccessCount { // Mapper模块 public static class MyMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); public void map(Object key, Text value, Context context) //map函数的编写要根据读取的文件内容和业务逻辑来写 throws IOException, InterruptedException { String line = value.toString(); String array[] = line.split(",");//指定,为分隔符,组成数组 String keyOutput = array[1];//提取数组中的访问日期作为Key context.write(new Text(keyOutput), one);//形成键值对 } } // Reducer模块 public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; //定义累加器,初始值为0 for (IntWritable val : values) { sum += val.get(); //将相同键的所有值进行累加 } result.set(sum); context.write(key, result); } } //Driver模块,主要是配置参数 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "DailyAccessCount"); job.setJarByClass(DailyAccessCount.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < args.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(args[i])); } FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

  3.3 任务实现

将文件编译生成JAR包文件,提交Hadoop集群执行

Hadoop大数据开发基础系列:四、MapReduce初级编程

Hadoop大数据开发基础系列:四、MapReduce初级编程

Hadoop大数据开发基础系列:四、MapReduce初级编程

Hadoop大数据开发基础系列:四、MapReduce初级编程

在运行过程中报错,原因是我在外面的JDK用的是1.9的,等级过高了(Linux系统的JDK是1.8的),所以要重新配置JDK。

记住,在用Windows环境下的JDK要和Hadoop集群环境下的JDK环境相同。

历史各个版本的JDK下载地址: 

https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8-2177648.html

经过配置,终于可以运行啦,哈哈哈 o(* ̄︶ ̄*)o

hadoop jar NewDaily.jar test.NewDaily /user/dftest/user_login.txt /user/dftest/AccessCount

结果如下:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzpsww.html