1 /** 2 * Count the frequencies of various features in parallel using Map/Reduce 3 */ 4 public static void startParallelCounting(Parameters params, Configuration conf) 5 throws IOException, InterruptedException, ClassNotFoundException { 6 conf.set(PFP_PARAMETERS, params.toString()); 7 8 conf.set("mapred.compress.map.output", "true"); 9 conf.set("mapred.output.compression.type", "BLOCK"); 10 11 String input = params.get(INPUT); 12 Job job = new Job(conf, "Parallel Counting Driver running over input: " + input); 13 job.setJarByClass(PFPGrowth.class); 14 15 job.setOutputKeyClass(Text.class); 16 job.setOutputValueClass(LongWritable.class); 17 18 FileInputFormat.addInputPath(job, new Path(input)); 19 Path outPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING); 20 FileOutputFormat.setOutputPath(job, outPath); 21 22 HadoopUtil.delete(conf, outPath); 23 24 job.setInputFormatClass(TextInputFormat.class); 25 job.setMapperClass(ParallelCountingMapper.class); 26 job.setCombinerClass(ParallelCountingReducer.class); 27 job.setReducerClass(ParallelCountingReducer.class); 28 job.setOutputFormatClass(SequenceFileOutputFormat.class); 29 30 boolean succeeded = job.waitForCompletion(true); 31 if (!succeeded) { 32 throw new IllegalStateException("Job failed!"); 33 } 34 35 }
ParallelCountingMapper.java
ParallelCountingMapper中map方法的输入分别是字节偏移量offset和事务数据库中的每一行数据input。同一行input中多次出现的项只被视为出现一次,所以将input数据split之后存储到HashSet中去重。map方法的输出是<key=item, value=ONE>,其中ONE是值为1的LongWritable常量。
1 public class ParallelCountingMapper extends Mapper<LongWritable,Text,Text,LongWritable> { 2 3 private static final LongWritable ONE = new LongWritable(1); 4 5 private Pattern splitter; 6 7 @Override 8 protected void map(LongWritable offset, Text input, Context context) throws IOException, InterruptedException { 9 10 String[] items = splitter.split(input.toString()); 11 Set<String> uniqueItems = Sets.newHashSet(Arrays.asList(items)); 12 for (String item : uniqueItems) { 13 if (item.trim().isEmpty()) { 14 continue; 15 } 16 context.setStatus("Parallel Counting Mapper: " + item); 17 context.write(new Text(item), ONE); 18 } 19 } 20 21 @Override 22 protected void setup(Context context) throws IOException, InterruptedException { 23 super.setup(context); 24 Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, "")); 25 splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, PFPGrowth.SPLITTER.toString())); 26 } 27 }
ParallelCountingReducer.java