社交网站用户的访问日期在格式上属于文本格式,访问次数为整型数值格式。其组成的键值对为<访问日期,访问次数>,因此Mapper的输出与Reducer的输出都选用Text类与IntWritable类。
(2)Mapper 类的逻辑实现
Mapper类中最主要的部分就是map函数。map函数的主要任务就是读取用户访问文件中的数据,输出所有访问日期与初始次数的键值对。因此访问日期是数据文件的第二列,所有先定义一个数组,再提取第二个元素,与初始次数1一起构成要输出的键值对,即<访问日期,1>。
(3)Reducer的逻辑实现
Reducer类中最主要的部分就是reduce函数。reduce的主要任务就是读取Mapper输出的键值对<访问日期,1>。这一部分与官网给出的WordCount中的Reducer完全相同。
3.2 编写核心模块代码
目录结构:
package test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class DailyAccessCount { // Mapper模块 public static class MyMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); public void map(Object key, Text value, Context context) //map函数的编写要根据读取的文件内容和业务逻辑来写 throws IOException, InterruptedException { String line = value.toString(); String array[] = line.split(",");//指定,为分隔符,组成数组 String keyOutput = array[1];//提取数组中的访问日期作为Key context.write(new Text(keyOutput), one);//形成键值对 } } // Reducer模块 public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; //定义累加器,初始值为0 for (IntWritable val : values) { sum += val.get(); //将相同键的所有值进行累加 } result.set(sum); context.write(key, result); } } //Driver模块,主要是配置参数 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "DailyAccessCount"); job.setJarByClass(DailyAccessCount.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < args.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(args[i])); } FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }3.3 任务实现
将文件编译生成JAR包文件,提交Hadoop集群执行
在运行过程中报错,原因是我在外面的JDK用的是1.9的,等级过高了(Linux系统的JDK是1.8的),所以要重新配置JDK。
记住,在用Windows环境下的JDK要和Hadoop集群环境下的JDK环境相同。
历史各个版本的JDK下载地址:
https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8-2177648.html
经过配置,终于可以运行啦,哈哈哈 o(* ̄︶ ̄*)o
hadoop jar NewDaily.jar test.NewDaily /user/dftest/user_login.txt /user/dftest/AccessCount结果如下: