FPGrowthDriver is the driver class for the FPGrowth algorithm and extends AbstractJob. A Hadoop job is usually launched by running the bin/hadoop script from the command line and passing in a set of arguments; the GenericOptionsParser used inside ToolRunner picks up these command-line arguments. AbstractJob wraps helper methods such as addInputOption, addOutputOption, addOption, and parseArguments, which simplify command-line parsing. The params object holds all parameters the algorithm needs. Depending on the command-line arguments, FPGrowthDriver either calls the runFPGrowth method in this file for sequential execution, or the runPFPGrowth method in PFPGrowth.java for parallel (MapReduce) execution.
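To make the ToolRunner/GenericOptionsParser flow concrete, here is a minimal sketch of the generic Hadoop Tool pattern that AbstractJob builds on. The class MyDriver and its body are purely illustrative, not Mahout code; only the Tool interface, Configured base class, and ToolRunner.run call come from Hadoop itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Illustrative driver skeleton, not Mahout code.
public class MyDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // Generic Hadoop options (-D key=value, -fs, -jt, -files, ...) have already been
    // consumed by GenericOptionsParser inside ToolRunner.run(); only the
    // application-specific arguments reach this method.
    for (String arg : args) {
      System.out.println("application argument: " + arg);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner.run() parses the generic options, applies them to the Configuration,
    // and then delegates to MyDriver.run() with the remaining arguments.
    int exitCode = ToolRunner.run(new Configuration(), new MyDriver(), args);
    System.exit(exitCode);
  }
}

FPGrowthDriver follows the same pattern, except that the option definition and lookup work is delegated to AbstractJob, as the listing below shows.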
public final class FPGrowthDriver extends AbstractJob {

  private static final Logger log = LoggerFactory.getLogger(FPGrowthDriver.class);

  private FPGrowthDriver() {
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner's static run() method uses GenericOptionsParser internally;
    // GenericOptionsParser.getRemainingArgs() returns the remaining command-line
    // arguments. ToolRunner.run() then calls FPGrowthDriver.run().
    ToolRunner.run(new Configuration(), new FPGrowthDriver(), args);
  }

  /**
   * Run TopK FPGrowth given the input file,
   */
  @Override
  public int run(String[] args) throws Exception {
    addInputOption();  // add the default input directory option
    addOutputOption(); // add the default output directory option

    addOption("minSupport", "s", "(Optional) The minimum number of times a co-occurrence must be present."
        + " Default Value: 3", "3"); // minimum support threshold
    addOption("maxHeapSize", "k", "(Optional) Maximum Heap Size k, to denote the requirement to mine top K items."
        + " Default value: 50", "50"); // size of the max-heap (top k)
    addOption("numGroups", "g", "(Optional) Number of groups the features should be divided in the map-reduce version."
        + " Doesn't work in sequential version Default Value:" + PFPGrowth.NUM_GROUPS_DEFAULT,
        Integer.toString(PFPGrowth.NUM_GROUPS_DEFAULT)); // number of groups g
    addOption("splitterPattern", "regex", "Regular Expression pattern used to split given string transaction into"
        + " itemsets. Default value splits comma separated itemsets. Default Value:"
        + " \"[ ,\\t]*[,|\\t][ ,\\t]*\" ", "[ ,\t]*[,|\t][ ,\t]*"); // transaction splitter pattern
    addOption("numTreeCacheEntries", "tc", "(Optional) Number of entries in the tree cache to prevent duplicate"
        + " tree building. (Warning) a first level conditional FP-Tree might consume a lot of memory, "
        + "so keep this value small, but big enough to prevent duplicate tree building. "
        + "Default Value:5 Recommended Values: [5-10]", "5");
    addOption("method", "method", "Method of processing: sequential|mapreduce", "sequential"); // processing method: sequential or parallel (MapReduce)
    addOption("encoding", "e", "(Optional) The file encoding.  Default value: UTF-8", "UTF-8"); // file encoding
    addFlag("useFPG2", "2", "Use an alternate FPG implementation");

    // exit if command-line argument parsing fails
    if (parseArguments(args) == null) {
      return -1;
    }

    Parameters params = new Parameters();

    if (hasOption("minSupport")) {
      String minSupportString = getOption("minSupport");
      params.set("minSupport", minSupportString);
    }
    if (hasOption("maxHeapSize")) {
      String maxHeapSizeString = getOption("maxHeapSize");
      params.set("maxHeapSize", maxHeapSizeString);
    }
    if (hasOption("numGroups")) {
      String numGroupsString = getOption("numGroups");
      params.set("numGroups", numGroupsString);
    }

    if (hasOption("numTreeCacheEntries")) {
      String numTreeCacheString = getOption("numTreeCacheEntries");
      params.set("treeCacheSize", numTreeCacheString);
    }

    if (hasOption("splitterPattern")) {
      String patternString = getOption("splitterPattern");
      params.set("splitPattern", patternString);
    }

    String encoding = "UTF-8";
    if (hasOption("encoding")) {
      encoding = getOption("encoding");
    }
    params.set("encoding", encoding);

    if (hasOption("useFPG2")) {
      params.set(PFPGrowth.USE_FPG2, "true");
    }

    Path inputDir = getInputPath();
    Path outputDir = getOutputPath();

    params.set("input", inputDir.toString());
    params.set("output", outputDir.toString());

    String classificationMethod = getOption("method");
    if ("sequential".equalsIgnoreCase(classificationMethod)) {
      runFPGrowth(params);
    } else if ("mapreduce".equalsIgnoreCase(classificationMethod)) {
      Configuration conf = new Configuration();
      HadoopUtil.delete(conf, outputDir);
      PFPGrowth.runPFPGrowth(params);
    }

    return 0;
  }
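The Parameters object that the driver fills is essentially a string-keyed property bag that both the sequential runFPGrowth path and the parallel PFPGrowth path read their settings from. The simplified sketch below is an illustrative stand-in (the class name SimpleParameters and its fields are assumptions, not Mahout's actual Parameters implementation); it only demonstrates the set/get pattern the driver relies on.

import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a string-keyed parameter bag; not Mahout's Parameters class.
public class SimpleParameters {

  private final Map<String, String> params = new HashMap<>();

  public void set(String key, String value) {
    params.put(key, value);
  }

  public String get(String key, String defaultValue) {
    String value = params.get(key);
    return value == null ? defaultValue : value;
  }

  public static void main(String[] args) {
    SimpleParameters params = new SimpleParameters();
    params.set("minSupport", "3");
    params.set("maxHeapSize", "50");
    // A consumer such as the FP-Growth runner would read values back by key:
    int minSupport = Integer.parseInt(params.get("minSupport", "3"));
    System.out.println("minSupport = " + minSupport);
  }
}

Passing a single parameter object rather than individual arguments keeps the sequential and MapReduce entry points interchangeable: both receive the same bag of strings and decide themselves which keys they need.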
PFPGrowth.java