public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
//显然这里的Mapper<Object,Text,Text,IntWritable>是范型,其实是
//Mapper<input_Key_Type,input_Value_Type,output_key_type,output_value_type>
//也就是借此规定map中用到的数据类型
//这几种类型除Object之外,其它是jdk中没有的,这是hadoop对它相应的jdk中数据类型的封装,
//这里的Text相当于jdk中的String,IntWritable相当于jdk的int类型,
//这样做的原因主要是为了hadoop的数据序化而做的。
private final static IntWritable one = new IntWritable(1);
//声时一个IntWritable变量,作计数用,每出现一个key,给其一个value=1的值
private Text word = new Text(); //用来暂存map输出中的key值,Text类型的,故有此声明
//这里就是map函数,也用到了范型,它是和Mapper抽象类中的相对应的,
//此处的Object key,Text value的类型和上边的Object,Text是相对应的,而且最好一样,
//不然的话,多数情况运行时会报错。
public void map(Object key, Text value, Context context) throws IOException,
InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
//Hadoop读入的value是以行为单位的,其key为该行所对应的行号
//因为我们要计算每个单词的数目,默认以空格作为间隔,故用StringTokenizer辅助做一下字符串的拆分,
//也可以用string.split("")来作。
while (itr.hasMoreTokens()) {//遍历一下每行字符串中的单词,
word.set(itr.nextToken());//出现一个单词就给它设成一个key并将其值设为1
context.write(word, one);//输出设成的key/value值。
//以上就是打散的过程
}
}
}
public static class IntSumReducer //reduce所在的静态类
extends Reducer<Text,IntWritable,Text,IntWritable> {
//这里和Map中的作用是一样的,设定输入/输出的值的类型
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
//由于map的打散,这里会得到如,{key,values}={"hello",{1,1,1,1,1,1,....}},这样的集合
sum += val.get(); //这里需要逐一将它们的value取出来予以相加,取得总的出现次数,即为汇和
}
result.set(sum); //将values的和取得,并设成result对应的值
context.write(key, result);
//此时的key即为map打散之后输出的key,没有变化,
//变化的是result,以前得到的是一个数字的集合,此时已经//给算出和了,并做为key/value输出。
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); //取得系统的参数
if (args.length != 2) {//判断一下命令行输入路径/输出路径是否齐全,即是否为两个参数
System.err.println("Usage: wordcount <in> <out>");
System.exit(2); //若非两个参数,即退出
}
//此程序的执行,在hadoop看来是一个Job,故进行初始化job操作
Job job = new Job(conf, "Word Count");
//配置作业名,此程序要执行WordCount.class这个字节码文件
job.setJarByClass(WordCount.class);
//配置作业的各个类
//此处设置了使用 TokenizerMapper 完成 Map 过程中的处理
//使用 IntSumReducer 完成 Combine 和 Reduce 过程中的处理。
//在这个job中,用TokenizerMapper这个类的map函数
job.setMapperClass(TokenizerMapper.class);
//在这个job中,用IntSumReducer这个类的reduce函数
job.setReducerClass(IntSumReducer.class);
//在reduce的输出时,key的输出类型为Text
job.setOutputKeyClass(Text.class);
//在reduce的输出时,value的输出类型为IntWritable
job.setOutputValueClass(IntWritable.class);
//任务的输出和输入路径则由命令行参数指定,并由 FileInputFormat 和 FileOutputFormat 分别设定
//初始化要计算word的文件的路径
FileInputFormat.addInputPath(job, new Path(args[0]));
//初始化要计算word的文件的之后的结果的输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));