社交网站用户的访问日期在格式上属于文本格式,访问次数为整型数值格式。其组成的键值对为<访问日期,访问次数>,因此Mapper的输出与Reducer的输出都选用Text类与IntWritable类。
(2)Mapper 类的逻辑实现
Mapper类中最主要的部分就是map函数。map函数的主要任务就是读取用户访问文件中的数据,输出所有访问日期与初始次数的键值对。因此访问日期是数据文件的第二列,所有先定义一个数组,再提取第二个元素,与初始次数1一起构成要输出的键值对,即<访问日期,1>。
(3)Reducer的逻辑实现
Reducer类中最主要的部分就是reduce函数。reduce的主要任务就是读取Mapper输出的键值对<访问日期,1>。这一部分与官网给出的WordCount中的Reducer完全相同。
3.2 编写核心模块代码
目录结构:

package test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DailyAccessCount {
// Mapper模块
public static class MyMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
public void map(Object key, Text value, Context context) //map函数的编写要根据读取的文件内容和业务逻辑来写
throws IOException, InterruptedException {
String line = value.toString();
String array[] = line.split(",");//指定,为分隔符,组成数组
String keyOutput = array[1];//提取数组中的访问日期作为Key
context.write(new Text(keyOutput), one);//形成键值对
}
}
// Reducer模块
public static class MyReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
//定义累加器,初始值为0
for (IntWritable val : values) {
sum += val.get();
//将相同键的所有值进行累加
}
result.set(sum);
context.write(key, result);
}
}
//Driver模块,主要是配置参数
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "DailyAccessCount");
job.setJarByClass(DailyAccessCount.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < args.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(args[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(args[args.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3.3 任务实现
将文件编译生成JAR包文件,提交Hadoop集群执行




在运行过程中报错,原因是我在外面的JDK用的是1.9的,等级过高了(Linux系统的JDK是1.8的),所以要重新配置JDK。
记住,在用Windows环境下的JDK要和Hadoop集群环境下的JDK环境相同。
历史各个版本的JDK下载地址:
https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8-2177648.html
经过配置,终于可以运行啦,哈哈哈 o(* ̄︶ ̄*)o
hadoop jar NewDaily.jar test.NewDaily /user/dftest/user_login.txt /user/dftest/AccessCount
结果如下: