1 /** 2 * maps each transaction to all unique items groups in the transaction. mapper 3 * outputs the group id as key and the transaction as value 4 * 5 */ 6 public class ParallelFPGrowthMapper extends Mapper<LongWritable,Text,IntWritable,TransactionTree> { 7 8 private final OpenObjectIntHashMap<String> fMap = new OpenObjectIntHashMap<String>(); 9 private Pattern splitter; 10 private int maxPerGroup; 11 private final IntWritable wGroupID = new IntWritable(); 12 13 @Override 14 protected void map(LongWritable offset, Text input, Context context) 15 throws IOException, InterruptedException { 16 17 String[] items = splitter.split(input.toString()); 18 19 OpenIntHashSet itemSet = new OpenIntHashSet(); 20 21 for (String item : items) { 22 if (fMap.containsKey(item) && !item.trim().isEmpty()) { 23 itemSet.add(fMap.get(item)); 24 } 25 } 26 27 IntArrayList itemArr = new IntArrayList(itemSet.size()); 28 itemSet.keys(itemArr); 29 itemArr.sort(); 30 31 OpenIntHashSet groups = new OpenIntHashSet(); 32 for (int j = itemArr.size() - 1; j >= 0; j--) { 33 // generate group dependent shards 34 int item = itemArr.get(j); 35 int groupID = PFPGrowth.getGroup(item, maxPerGroup); 36 37 if (!groups.contains(groupID)) { 38 IntArrayList tempItems = new IntArrayList(j + 1); 39 tempItems.addAllOfFromTo(itemArr, 0, j); 40 context.setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: " + item); 41 wGroupID.set(groupID); 42 context.write(wGroupID, new TransactionTree(tempItems, 1L)); 43 } 44 groups.add(groupID); 45 } 46 47 } 48 49 @Override 50 protected void setup(Context context) throws IOException, InterruptedException { 51 super.setup(context); 52 53 int i = 0; 54 for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) { 55 fMap.put(e.getFirst(), i++); 56 } 57 58 Parameters params = 59 new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, "")); 60 61 splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, 62 PFPGrowth.SPLITTER.toString())); 63 64 maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0); 65 } 66 }
ParallelFPGrowthReducer.java