ParallelCountingReducer中reduce方法的输入是<key=item, value={one, one, ... , one}>。所有key=item的键值对将被分配到一台机器上,所以只需要对values进行遍历求和就可以求出该item的支持度。
1 public class ParallelCountingReducer extends Reducer<Text,LongWritable,Text,LongWritable> { 2 3 @Override 4 protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, 5 InterruptedException { 6 long sum = 0; 7 for (LongWritable value : values) { 8 context.setStatus("Parallel Counting Reducer :" + key); 9 sum += value.get(); 10 } 11 context.setStatus("Parallel Counting Reducer: " + key + " => " + sum); 12 context.write(key, new LongWritable(sum)); 13 14 } 15 }
PFPGrowth.java
通过params中的OUTPUT参数可以获取ParallelCountingReducer的输出路径。在readFList这个方法中用到了几个数据结构。Pair实现了Comparable接口和Serializable接口,其数据成员first和second分别用来表示item和item所对应的支持度。PriorityQueue是一个用平衡二叉树实现的小顶堆,如果指定了Comparator,将按照Comparator对PriorityQueue中的元素进行排序,如果未指定Comparator,则将按照元素实现的Comparable接口进行排序。在并行化FP-Growth算法中,初始化PriorityQueue时指定了Comparator,其按照Pair的第一个元素进行排序,如果第一个元素相等,则按照第二个元素进行排序。通过初始化SequenceFileDirIterable来遍历上一次MapReduce输出的结果,每次将Pair添加到PriorityQueue的同时完成排序。最后,逐一将PriorityQueue中的元素取出放入fList。因此,fList是一个按照支持度递减的列表。
1 /** 2 * read the feature frequency List which is built at the end of the Parallel counting job 3 * 4 * @return Feature Frequency List 5 */ 6 public static List<Pair<String,Long>> readFList(Parameters params) { 7 int minSupport = Integer.valueOf(params.get(MIN_SUPPORT, "3")); 8 Configuration conf = new Configuration(); 9 10 Path parallelCountingPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING); 11 12 PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(11, 13 new Comparator<Pair<String,Long>>() { 14 @Override 15 public int compare(Pair<String,Long> o1, Pair<String,Long> o2) { 16 int ret = o2.getSecond().compareTo(o1.getSecond()); 17 if (ret != 0) { 18 return ret; 19 } 20 return o1.getFirst().compareTo(o2.getFirst()); 21 } 22 }); 23 24 for (Pair<Text,LongWritable> record 25 : new SequenceFileDirIterable<Text,LongWritable>(new Path(parallelCountingPath, FILE_PATTERN), 26 PathType.GLOB, null, null, true, conf)) { 27 long value = record.getSecond().get(); 28 if (value >= minSupport) { 29 queue.add(new Pair<String,Long>(record.getFirst().toString(), value)); 30 } 31 } 32 List<Pair<String,Long>> fList = Lists.newArrayList(); 33 while (!queue.isEmpty()) { 34 fList.add(queue.poll()); 35 } 36 return fList; 37 }