Hadoop:mapreduce程序reduce输出控制

1. Hadoop中,reduce支持多个输出,输出的文件名也是可控的:继承MultipleTextOutputFormat类,并重写generateFileNameForKeyValue方法即可。

[java]

public class LzoHandleLogMr extends Configured implements Tool {           static class LzoHandleLogMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {                              public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)                   throws IOException {               try {                   String[] sp = value.toString().split(",");                   output.collect(new Text(sp[0]), value);               }catch (Exception e) {                  e.printStackTrace();               }                  }             }       static class LzoHandleLogReducer  extends MapReduceBase implements Reducer<Text, Text, Text, NullWritable> {                            @Override           public void reduce(Text key, Iterator<Text> values,                   OutputCollector<Text, NullWritable> output, Reporter reporter)                   throws IOException {               while (values.hasNext()) {                     output.collect(values.next(), NullWritable.get());                     }                          }          }              public static class LogNameMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, NullWritable>           {                 @Override           protected String generateFileNameForKeyValue(Text key,                   NullWritable value, String name) {               String sp[] = key.toString().split(",");               String filename = sp[0];               if(sp[0].contains(".")) filename="000000000000";               return filename;           }                  }                    @Override       public int run(String[] args) throws Exception {                           JobConf jobconf = new JobConf(LzoHandleLogMr.class);               jobconf.setMapperClass(LzoHandleLogMapper.class);               jobconf.setReducerClass(LzoHandleLogReducer.class);               jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);               
jobconf.setOutputKeyClass(Text.class);               jobconf.setNumReduceTasks(12);                                          FileInputFormat.setInputPaths(jobconf,new Path(args[0]));               FileOutputFormat.setOutputPath(jobconf,new Path(args[1]));               FileOutputFormat.setCompressOutput(jobconf, true);               FileOutputFormat.setOutputCompressorClass(jobconf, LzopCodec.class);                                JobClient.runJob(jobconf);             return 0;                      }   }  

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/560e462818f4132993e4df7b47a2ec31.html