Mahout Source Code Analysis: Parallel FP-Growth (3)

  PFPGrowth is the driver class for the parallel FP-Growth algorithm. The runPFPGrowth(params) method initializes a Configuration object and then calls runPFPGrowth(params, conf), which carries out the five key steps of the parallel FP-Growth algorithm. startParallelCounting(params, conf) corresponds to Step 1 and Step 2: it counts the support of each item with a WordCount-style job, and its output is consumed by readFList() and saveFList() to build the FList. Next, the number of items contained in each group is computed from the command-line parameter NUM_GROUPS and stored in params. startParallelFPGrowth(params, conf) corresponds to Step 3 and Step 4, and startAggregating(params, conf) corresponds to Step 5.

public static void runPFPGrowth(Parameters params, Configuration conf)
    throws IOException, InterruptedException, ClassNotFoundException {
  conf.set("io.serializations",
      "org.apache.hadoop.io.serializer.JavaSerialization,"
      + "org.apache.hadoop.io.serializer.WritableSerialization");

  startParallelCounting(params, conf); // corresponds to Step 1 and Step 2

  // save feature list to dcache
  List<Pair<String,Long>> fList = readFList(params);
  saveFList(fList, params, conf);

  // set param to control group size in MR jobs
  int numGroups = params.getInt(NUM_GROUPS, NUM_GROUPS_DEFAULT);
  int maxPerGroup = fList.size() / numGroups;
  if (fList.size() % numGroups != 0) {
    maxPerGroup++;
  }
  params.set(MAX_PER_GROUP, Integer.toString(maxPerGroup));

  startParallelFPGrowth(params, conf); // corresponds to Step 3 and Step 4

  startAggregating(params, conf); // corresponds to Step 5
}
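
  The group-size calculation above is just a ceiling division: maxPerGroup = ⌈fList.size() / numGroups⌉, which guarantees that numGroups groups of at most maxPerGroup items cover the whole FList. A minimal standalone sketch of the same arithmetic with hypothetical numbers (not Mahout code):

// Illustrative only: same ceiling division as in runPFPGrowth, with made-up sizes.
public final class GroupSizeDemo {
  public static void main(String[] args) {
    int fListSize = 10;  // hypothetical FList length
    int numGroups = 3;   // hypothetical NUM_GROUPS value
    int maxPerGroup = (fListSize + numGroups - 1) / numGroups; // ceil(10/3) = 4
    System.out.println("maxPerGroup = " + maxPerGroup);
    // If items are assigned to groups by rank / maxPerGroup, ranks 0-3 fall in
    // group 0, ranks 4-7 in group 1, and ranks 8-9 in group 2.
  }
}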

  The startParallelCounting method initializes a Job object, which runs ParallelCountingMapper and ParallelCountingReducer to perform the support counting.
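
  The overall shape of such a job is easy to picture before reading Mahout's source. The sketch below is not the Mahout implementation; it is a minimal WordCount-style mapper/reducer pair that assumes each input line is one transaction with items separated by whitespace (the real ParallelCountingMapper reads its splitter pattern and other settings from the job's Parameters). Its output, item → total support, is the raw material that readFList() later turns into the FList.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative only: a WordCount-style support counter, not Mahout's actual classes.
// Assumes one transaction per input line, items separated by whitespace.
public class SupportCountSketch {

  public static class ItemCountMapper
      extends Mapper<LongWritable, Text, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private final Text item = new Text();

    @Override
    protected void map(LongWritable offset, Text transaction, Context context)
        throws IOException, InterruptedException {
      // Emit (item, 1) for each item token in the transaction line.
      for (String token : transaction.toString().trim().split("\\s+")) {
        if (!token.isEmpty()) {
          item.set(token);
          context.write(item, ONE);
        }
      }
    }
  }

  public static class ItemCountReducer
      extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text item, Iterable<LongWritable> counts, Context context)
        throws IOException, InterruptedException {
      // Sum the 1s to get the item's support over all transactions.
      long support = 0;
      for (LongWritable count : counts) {
        support += count.get();
      }
      context.write(item, new LongWritable(support));
    }
  }
}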
