Mahout源码分析:并行化FP(6)

  由于已经生成了fList,上一次MapReduce的输出结果已经没有用了,因此,saveFList方法首先删除了这些文件。之后,saveFList方法将flist写入到hdfs上。对于存储在hdfs上的文件,DistributedCache提供了缓存文件的功能,在Slave Node进行计算之前可将hdfs上的文件复制到这些节点上。

1 /** 2 * Serializes the fList and returns the string representation of the List 3 */ 4 public static void saveFList(Iterable<Pair<String,Long>> flist, Parameters params, Configuration conf) 5 throws IOException { 6 Path flistPath = new Path(params.get(OUTPUT), F_LIST); 7 FileSystem fs = FileSystem.get(flistPath.toUri(), conf); 8 flistPath = fs.makeQualified(flistPath); 9 HadoopUtil.delete(conf, flistPath); 10 SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath, Text.class, LongWritable.class); 11 try { 12   for (Pair<String,Long> pair : flist) { 13 writer.append(new Text(pair.getFirst()), new LongWritable(pair.getSecond())); 14 } 15 } finally { 16 writer.close(); 17 } 18 DistributedCache.addCacheFile(flistPath.toUri(), conf); 19 }  

  startParallelFPGrowth方法初始化了一个Job对象。该Job对象将调用ParallelFPGrowthMapper和ParallelFPGrowthReducer��实现Step3和Step4。

1 /** 2 * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards 3 */ 4 public static void startParallelFPGrowth(Parameters params, Configuration conf) 5 throws IOException, InterruptedException, ClassNotFoundException { 6 conf.set(PFP_PARAMETERS, params.toString()); 7 conf.set("mapred.compress.map.output", "true"); 8 conf.set("mapred.output.compression.type", "BLOCK"); 9 Path input = new Path(params.get(INPUT)); 10 Job job = new Job(conf, "PFP Growth Driver running over input" + input); 11 job.setJarByClass(PFPGrowth.class); 12 13 job.setMapOutputKeyClass(IntWritable.class); 14 job.setMapOutputValueClass(TransactionTree.class); 15 16 job.setOutputKeyClass(Text.class); 17 job.setOutputValueClass(TopKStringPatterns.class); 18 19 FileInputFormat.addInputPath(job, input); 20 Path outPath = new Path(params.get(OUTPUT), FPGROWTH); 21 FileOutputFormat.setOutputPath(job, outPath); 22 23 HadoopUtil.delete(conf, outPath); 24 25 job.setInputFormatClass(TextInputFormat.class); 26 job.setMapperClass(ParallelFPGrowthMapper.class); 27 job.setCombinerClass(ParallelFPGrowthCombiner.class); 28 job.setReducerClass(ParallelFPGrowthReducer.class); 29 job.setOutputFormatClass(SequenceFileOutputFormat.class); 30 31 boolean succeeded = job.waitForCompletion(true); 32 if (!succeeded) { 33  throw new IllegalStateException("Job failed!"); 34 } 35 } 

ParallelFPGrowthMapper.java

  ParallelFPGrowthMapper中的setup方法将在map方法之前被运行。setup方法中调用了readFList方法。注意这里的readFList方法与之前分析的readFList方法参数不一样,所以是两个完全不同的方法。这里的readFList方法通过HadoopUtil.getCachedFiles(conf)来获取缓存文件flist,将其存储到fMap,其中item作为fMap的键,item在flist中的位置序号作为fMap的值,例如flist中的第一个item,其在fMap中将是<key=item, value=0>。这样做的原因是之后将fMap分Q个group时需要用到这个位置序号。在map方法中,输入是字节偏移量和事务数据库中的某一行数据。根据用户指定的分隔符splitter来切分数据。为了过滤非频繁项,通过fMap.containsKey(item)方法来查找该项是否存在于fList中。若存在,将其所对应的位置序号加入到itemSet,否则,将其丢弃。itemArr复制itemSet中的数据,并按照位置序号递增进行排序,即按照支持度递减进行排序。之后的for循环从itemArr的最后一个元素向前遍历,如果其所对应的groupID不在groups中,那么将初始化TransactionTree,将itemArr[0],itemArr[1],…,itemArr[j]存入该TransactionTree中。groupID的计算非常简单,将位置序号除以maxPerGroup即可。TransactionTree实现了Writable和Iterable<Pair<IntArrayList, Long>>接口,初始化TransactionTree时,构造方法将参数赋值给TransactionTree中的数据成员List<Pair<IntArrayList, Long>> transactionSet。这里Pair对象存储的两个元素分别是位置序号列表和1。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/6097db1c5fd01edfd736d2c0dae61d62.html