public static void main(String[] args) throws Exception {
args =new String[] {"hdfs://caozw:9100/user/hadoop/hadooprealword","hdfs://caozw:9100/user/hadoop/hadooprealword/output"};
ToolRunner.run(new Configuration(), new NamedCountryOutputJob(), args);
}
public int run(String[] args) throws Exception {
if(args.length != 2) {
System.err.println("Usage: named_output <input> <output>");
System.exit(1);
}
Job job = new Job(conf, "IP count by country to named files");
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(IPCountryMapper.class);
job.setReducerClass(IPCountryReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setJarByClass(NamedCountryOutputJob.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 1 : 0;
}
public void setConf(Configuration conf) {
this.conf = conf;
}
public Configuration getConf() {
return conf;
}
public static class IPCountryMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int country_pos = 1;
private static final Pattern pattern = Pattern.compile("\\t");
@Override
protected void map(LongWritable key, Text value,
Context context) throws IOException, InterruptedException {
String country = pattern.split(value.toString())[country_pos];
context.write(new Text(country), new IntWritable(1));
}
}
测试结果: