5). Step4.java,计算推荐结果列表
源代码:
package org.conan.myhadoop.recommend;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;
/**
 * Step 4 of the recommendation pipeline: multiplies co-occurrence matrix rows by user
 * preference records in the mapper and sums the partial scores per user and item in
 * the reducer.
 */
public class Step4 {

    /** Counter group used to report records that were skipped instead of failing the task. */
    private static final String COUNTER_GROUP = "Step4";

    /**
     * Reads two kinds of text records and emits partial recommendation scores keyed by user.
     *
     * <p>Each line is split with {@code Recommend.DELIMITER} (declared elsewhere in the
     * project; presumably a compiled regex pattern - confirm). Two shapes are recognised:
     * <ul>
     *   <li>co-occurrence: first token is {@code itemID1:itemID2}, second token is an
     *       integer count;</li>
     *   <li>user vector: first token is an integer item id, second token is
     *       {@code userID:preference}.</li>
     * </ul>
     *
     * <p>NOTE(review): co-occurrence rows are cached in a static in-memory map and are only
     * usable by user-vector records processed later in the same JVM. Nothing in this class
     * guarantees that ordering across input files or splits; user-vector records whose
     * item row is not cached are skipped and counted under
     * {@code Step4/MISSING_COOCCURRENCE_ROW}.
     */
    public static class Step4_PartialMultiplyMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
        private final static IntWritable k = new IntWritable();
        private final static Text v = new Text();
        // itemID1 -> all co-occurrence cells seen so far for that item.
        private final static Map<Integer, List<Cooccurrence>> cooccurrenceMatrix = new HashMap<Integer, List<Cooccurrence>>();

        /**
         * Caches a co-occurrence cell, or multiplies a user preference by the cached row.
         *
         * @param key      input key supplied by the input format (unused)
         * @param values   one input line
         * @param output   receives {@code userID -> "itemID2,partialScore"} pairs
         * @param reporter used to count skipped records
         * @throws IOException if the collector fails
         */
        @Override
        public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            if (tokens.length < 2) {
                // Blank or truncated line: both record shapes need two tokens.
                reporter.incrCounter(COUNTER_GROUP, "MALFORMED_LINE", 1);
                return;
            }
            String[] v1 = tokens[0].split(":");
            String[] v2 = tokens[1].split(":");
            if (v1.length > 1) {// cooccurrence
                int itemID1 = Integer.parseInt(v1[0]);
                int itemID2 = Integer.parseInt(v1[1]);
                int num = Integer.parseInt(tokens[1]);
                List<Cooccurrence> list = cooccurrenceMatrix.get(itemID1);
                if (list == null) {
                    list = new ArrayList<Cooccurrence>();
                    cooccurrenceMatrix.put(itemID1, list);
                }
                list.add(new Cooccurrence(itemID1, itemID2, num));
            }
            if (v2.length > 1) {// userVector
                int itemID = Integer.parseInt(tokens[0]);
                int userID = Integer.parseInt(v2[0]);
                double pref = Double.parseDouble(v2[1]);
                List<Cooccurrence> row = cooccurrenceMatrix.get(itemID);
                if (row == null) {
                    // No cached row for this item (yet); previously this was a NullPointerException.
                    reporter.incrCounter(COUNTER_GROUP, "MISSING_COOCCURRENCE_ROW", 1);
                    return;
                }
                k.set(userID);
                for (Cooccurrence co : row) {
                    v.set(co.getItemID2() + "," + pref * co.getNum());
                    output.collect(k, v);
                }
            }
        }
    }

    /**
     * Sums the partial scores of each item for one user.
     *
     * <p>Input and output values share the {@code "itemID,score"} format, which is why
     * {@link #run(Map)} can also register this class as the combiner.
     */
    public static class Step4_AggregateAndRecommendReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
        private final static Text v = new Text();

        /**
         * Adds up all scores per item id and emits one {@code "itemID,total"} value per item.
         *
         * @param key      user id
         * @param values   partial scores formatted as {@code "itemID,score"}
         * @param output   receives {@code userID -> "itemID,total"} pairs
         * @param reporter progress reporter (unused)
         * @throws IOException if the collector fails
         */
        @Override
        public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
            Map<String, Double> result = new HashMap<String, Double>();
            while (values.hasNext()) {
                String[] str = values.next().toString().split(",");
                double partial = Double.parseDouble(str[1]);
                Double soFar = result.get(str[0]);
                result.put(str[0], soFar == null ? partial : soFar + partial);
            }
            for (Map.Entry<String, Double> entry : result.entrySet()) {
                v.set(entry.getKey() + "," + entry.getValue().doubleValue());
                output.collect(key, v);
            }
        }
    }

    /**
     * Configures and runs the Step 4 job.
     *
     * @param path lookup of job paths; reads the keys {@code "Step4Input1"},
     *             {@code "Step4Input2"} and {@code "Step4Output"}
     * @throws IOException if the HDFS cleanup or the job submission fails
     */
    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();
        String input1 = path.get("Step4Input1");
        String input2 = path.get("Step4Input2");
        String output = path.get("Step4Output");

        // Clear the previous output before the job runs (presumably a recursive delete - confirm in HdfsDAO).
        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(Step4_PartialMultiplyMapper.class);
        conf.setCombinerClass(Step4_AggregateAndRecommendReducer.class);
        conf.setReducerClass(Step4_AggregateAndRecommendReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(input1), new Path(input2));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }
}
/**
 * Mutable holder for a pair of item ids and an integer value attached to that pair
 * (presumably the number of times the two items co-occur - confirm with the producer).
 */
class Cooccurrence {
    private int itemID1;
    private int itemID2;
    private int num;

    /**
     * Creates a holder with all three fields set.
     *
     * @param firstItem  value for {@code itemID1}
     * @param secondItem value for {@code itemID2}
     * @param count      value for {@code num}
     */
    public Cooccurrence(int firstItem, int secondItem, int count) {
        itemID1 = firstItem;
        itemID2 = secondItem;
        num = count;
    }

    // --- read accessors ---

    /** @return the first item id */
    public int getItemID1() {
        return this.itemID1;
    }

    /** @return the second item id */
    public int getItemID2() {
        return this.itemID2;
    }

    /** @return the integer value attached to the pair */
    public int getNum() {
        return this.num;
    }

    // --- write accessors ---

    /** @param value new first item id */
    public void setItemID1(int value) {
        itemID1 = value;
    }

    /** @param value new second item id */
    public void setItemID2(int value) {
        itemID2 = value;
    }

    /** @param value new integer value attached to the pair */
    public void setNum(int value) {
        num = value;
    }
}
计算结果:
~ hadoop fs -cat /user/hdfs/recommend/step4/part-00000
1	107,5.0
1	106,18.0
1	105,15.5
1	104,33.5
1	103,39.0
1	102,31.5
1	101,44.0
2	107,4.0
2	106,20.5
2	105,15.5
2	104,36.0
2	103,41.5
2	102,32.5
2	101,45.5
3	107,15.5
3	106,16.5
3	105,26.0
3	104,38.0
3	103,24.5
3	102,18.5
3	101,40.0
4	107,9.5
4	106,33.0
4	105,26.0
4	104,55.0
4	103,53.5
4	102,37.0
4	101,63.0
5	107,11.5
5	106,34.5
5	105,32.0
5	104,59.0
5	103,56.5
5	102,42.5
5	101,68.0
对Step4过程优化,请参考本文最后的补充内容。
6). HdfsDAO.java,HDFS操作工具类
详细解释,请参考文章:Hadoop编程调用HDFS