ParallelFPGrowthReducer的输入是<key=groupID, value={TransactionTree1, TransactionTree2, … , TransactionTreeN}>。setup方法获取了参数params,并且通过PFPGrowth.readFList(conf)方法获取了缓存文件flist,将频繁项存入featureReverseMap,将频繁项对应的支持度存入freqList。之前分析到ParallelFPGrowthMapper输出的TransactionTree其实是List<Pair<IntArrayList, Long>> transactionSet。在ParallelFPGrowthReducer内初始化了一个TransactionTree,虽然这个TransactionTree与之前的TransactionTree是同一个类,但是是一棵用二维数组实现的树。考虑到文章篇幅,建树的过程这里不作分析。假设已经建好了这棵树,cTree.generateFList方法遍历这棵树,返回Map<Integer, MutableLong> frequencyList。具体的遍历方法这里不作详细分析,提一下其调用过程:TransactionTree实现Iterable<Pair<IntArrayList, Long>>接口时重写了iterator方法,在generateFList方法中通过iterator方法生成一个迭代器来遍历整棵树。iterator方法返回的是TransactionTreeIterator对象。TransactionTreeIterator对象继承自AbstractIterator<Pair<IntArrayList, Long>>,实现了对TransactionTree进行遍历。localFList收集了generateFList的结果并按照支持度递减进行排序。生成频繁模式的方法有两种,用户可以通过参数选择调用FPGrowthIds.generateTopKFrequentPatterns方法或者fpGrowth.generateTopKFrequentPatterns方法来生成频繁模式,本文将对后者进行分析。在ParallelFPGrowthReducer中还有一个IteratorAdapter类。它是设计模式中十分经典的适配器模式的具体应用,可以将两个不同类型的迭代器解耦(这里是把Iterator<Pair<IntArrayList, Long>>适配为Iterator<Pair<List<Integer>, Long>>)。ParallelFPGrowthReducer的输出是<key=item, value={Top K Frequent Patterns}>。
1 /** 2 * takes each group of transactions and runs Vanilla FPGrowth on it and 3 * outputs the the Top K frequent Patterns for each group. 4 * 5 */ 6 public final class ParallelFPGrowthReducer extends Reducer<IntWritable,TransactionTree,Text,TopKStringPatterns> { 7 8 private final List<String> featureReverseMap = Lists.newArrayList(); 9 private final LongArrayList freqList = new LongArrayList(); 10 private int maxHeapSize = 50; 11 private int minSupport = 3; 12 private int numFeatures; 13 private int maxPerGroup; 14 private boolean useFP2; 15 16 private static final class IteratorAdapter implements Iterator<Pair<List<Integer>,Long>> { 17 private final Iterator<Pair<IntArrayList,Long>> innerIter; 18 19 private IteratorAdapter(Iterator<Pair<IntArrayList,Long>> transactionIter) { 20 innerIter = transactionIter; 21 } 22 23 @Override 24 public boolean hasNext() { 25 return innerIter.hasNext(); 26 } 27 28 @Override 29 public Pair<List<Integer>,Long> next() { 30 Pair<IntArrayList,Long> innerNext = innerIter.next(); 31 return new Pair<List<Integer>,Long>(innerNext.getFirst().toList(), innerNext.getSecond()); 32 } 33 34 @Override 35 public void remove() { 36 throw new UnsupportedOperationException(); 37 } 38 } 39 40 @Override 41 protected void reduce(IntWritable key, Iterable<TransactionTree> values, Context context) throws IOException { 42 TransactionTree cTree = new TransactionTree(); 43 for (TransactionTree tr : values) { 44 for (Pair<IntArrayList,Long> p : tr) { 45 cTree.addPattern(p.getFirst(), p.getSecond()); 46 } 47 } 48 49 List<Pair<Integer,Long>> localFList = Lists.newArrayList(); 50 for (Entry<Integer,MutableLong> fItem : cTree.generateFList().entrySet()) { 51 localFList.add(new Pair<Integer,Long>(fItem.getKey(), fItem.getValue().toLong())); 52 } 53 54 Collections.sort(localFList, new CountDescendingPairComparator<Integer,Long>()); 55 56 if (useFP2) { 57 FPGrowthIds.generateTopKFrequentPatterns( 58 cTree.iterator(), 59 freqList, 60 minSupport, 61 maxHeapSize, 62 
PFPGrowth.getGroupMembers(key.get(), maxPerGroup, numFeatures), 63 new IntegerStringOutputConverter( 64 new ContextWriteOutputCollector<IntWritable, TransactionTree, Text, TopKStringPatterns>(context), 65 featureReverseMap), 66 new ContextStatusUpdater<IntWritable, TransactionTree, Text, TopKStringPatterns>(context)); 67 } else { 68 FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>(); 69 fpGrowth.generateTopKFrequentPatterns( 70 new IteratorAdapter(cTree.iterator()), 71 localFList, 72 minSupport, 73 maxHeapSize, 74 Sets.newHashSet(PFPGrowth.getGroupMembers(key.get(), 75 maxPerGroup, 76 numFeatures).toList()), 77 new IntegerStringOutputConverter( 78 new ContextWriteOutputCollector<IntWritable,TransactionTree,Text,TopKStringPatterns>(context), 79 featureReverseMap), 80 new ContextStatusUpdater<IntWritable,TransactionTree,Text,TopKStringPatterns>(context)); 81 } 82 } 83 84 @Override 85 protected void setup(Context context) throws IOException, InterruptedException { 86 87 super.setup(context); 88 Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, "")); 89 90 for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) { 91 featureReverseMap.add(e.getFirst()); 92 freqList.add(e.getSecond()); 93 } 94 95 maxHeapSize = Integer.valueOf(params.get(PFPGrowth.MAX_HEAPSIZE, "50")); 96 minSupport = Integer.valueOf(params.get(PFPGrowth.MIN_SUPPORT, "3")); 97 98 maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0); 99 numFeatures = featureReverseMap.size(); 100 useFP2 = "true".equals(params.get(PFPGrowth.USE_FPG2)); 101 }
TransactionTree.java