在MapReduce中利用MultipleOutputs输出多个文件(2)


    public static void main(String[] args) throws Exception {
        args =new String[] {"hdfs://caozw:9100/user/hadoop/hadooprealword","hdfs://caozw:9100/user/hadoop/hadooprealword/output"};
        ToolRunner.run(new Configuration(), new NamedCountryOutputJob(), args);
    }

public int run(String[] args) throws Exception {
        if(args.length != 2) {
            System.err.println("Usage: named_output <input> <output>");
            System.exit(1);
        }

Job job = new Job(conf, "IP count by country to named files");
        job.setInputFormatClass(TextInputFormat.class);

job.setMapperClass(IPCountryMapper.class);
        job.setReducerClass(IPCountryReducer.class);

job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setJarByClass(NamedCountryOutputJob.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

return job.waitForCompletion(true) ? 1 : 0;

}

public void setConf(Configuration conf) {
        this.conf = conf;
    }

public Configuration getConf() {
        return conf;
    }

public static class IPCountryMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

private static final int country_pos = 1;
        private static final Pattern pattern = Pattern.compile("\\t");

@Override
        protected void map(LongWritable key, Text value,
                          Context context) throws IOException, InterruptedException {
            String country = pattern.split(value.toString())[country_pos];
            context.write(new Text(country), new IntWritable(1));
        }
    }

测试结果:

在MapReduce中利用MultipleOutputs输出多个文件

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/76ddccb68dadf36206d7756256c709ae.html