Hadoop-Based Book Recommendation (7)

Add file: Step4_Update.java

package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step4_Update {

    public static class Step4_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// A: co-occurrence matrix, B: rating matrix

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();// determine which data set this split comes from
            // System.out.println(flag);
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());

            if (flag.equals("step3_2")) {// co-occurrence matrix
                String[] v1 = tokens[0].split(":");
                String itemID1 = v1[0];
                String itemID2 = v1[1];
                String num = tokens[1];

                Text k = new Text(itemID1);
                Text v = new Text("A:" + itemID2 + "," + num);
                context.write(k, v);
                // System.out.println(k.toString() + " " + v.toString());

            } else if (flag.equals("step3_1")) {// rating matrix
                String[] v2 = tokens[1].split(":");
                String itemID = tokens[0];
                String userID = v2[0];
                String pref = v2[1];

                Text k = new Text(itemID);
                Text v = new Text("B:" + userID + "," + pref);
                context.write(k, v);
                // System.out.println(k.toString() + " " + v.toString());
            }
        }
    }

    public static class Step4_AggregateReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            System.out.println(key.toString() + ":");

            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            for (Text line : values) {
                String val = line.toString();
                System.out.println(val);

                if (val.startsWith("A:")) {
                    String[] kv = Recommend.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);

                } else if (val.startsWith("B:")) {
                    String[] kv = Recommend.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);
                }
            }

            double result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();// itemID
                int num = Integer.parseInt(mapA.get(mapk));

                Iterator<String> iterb = mapB.keySet().iterator();
                while (iterb.hasNext()) {
                    String mapkb = iterb.next();// userID
                    double pref = Double.parseDouble(mapB.get(mapkb));
                    result = num * pref;// matrix multiplication: multiply step

                    Text k = new Text(mapkb);
                    Text v = new Text(mapk + "," + result);
                    context.write(k, v);
                    System.out.println(k.toString() + " " + v.toString());
                }
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input1 = path.get("Step5Input1");
        String input2 = path.get("Step5Input2");
        String output = path.get("Step5Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step4_Update.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step4_Update.Step4_PartialMultiplyMapper.class);
        job.setReducerClass(Step4_Update.Step4_AggregateReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}
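To make the partial-multiply step concrete, here is a small worked trace; the IDs, counts, and preference values below are invented for illustration and are not taken from the series' data set. A step3_2 (co-occurrence) line such as "101:102 2" is mapped to key 101 with value "A:102,2", and a step3_1 (rating) line such as "101 3:2.5" is mapped to key 101 with value "B:3,2.5". In the reducer, every A entry for an item is multiplied with every B entry for the same item, so this pair produces key 3 (the user) and value "102,5.0" (the item and the partial score 2 × 2.5). Step4_Update2 below then groups these partial products by user and sums the scores per item.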

Add file: Step4_Update2.java

package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step4_Update2 {

    public static class Step4_RecommendMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            Text k = new Text(tokens[0]);
            Text v = new Text(tokens[1] + "," + tokens[2]);
            context.write(k, v);
        }
    }

    public static class Step4_RecommendReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            System.out.println(key.toString() + ":");

            Map<String, Double> map = new HashMap<String, Double>();// accumulated results

            for (Text line : values) {
                System.out.println(line.toString());
                String[] tokens = Recommend.DELIMITER.split(line.toString());
                String itemID = tokens[0];
                Double score = Double.parseDouble(tokens[1]);

                if (map.containsKey(itemID)) {
                    map.put(itemID, map.get(itemID) + score);// matrix multiplication: sum step
                } else {
                    map.put(itemID, score);
                }
            }

            Iterator<String> iter = map.keySet().iterator();
            while (iter.hasNext()) {
                String itemID = iter.next();
                double score = map.get(itemID);
                Text v = new Text(itemID + "," + score);
                context.write(key, v);
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input = path.get("Step6Input");
        String output = path.get("Step6Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step4_Update2.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step4_Update2.Step4_RecommendMapper.class);
        job.setReducerClass(Step4_Update2.Step4_RecommendReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}
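Both run() methods read their input and output locations from the same path map that the Recommend driver from the earlier parts of this series passes to the other steps. Below is a minimal sketch of how the two new steps might be wired into that driver; the path keys match exactly what the code above reads, but the HDFS locations themselves are placeholders, not the values used in the original project.

// Sketch only: keys match Step4_Update.run() / Step4_Update2.run(),
// the HDFS locations are placeholder values.
Map<String, String> path = new HashMap<String, String>();
path.put("Step5Input1", "/user/hdfs/recommend/step3_2"); // co-occurrence matrix
path.put("Step5Input2", "/user/hdfs/recommend/step3_1"); // rating matrix
path.put("Step5Output", "/user/hdfs/recommend/step5");   // partial products
path.put("Step6Input", path.get("Step5Output"));
path.put("Step6Output", "/user/hdfs/recommend/step6");   // final user -> item,score list

Step4_Update.run(path);   // multiply: co-occurrence count * preference
Step4_Update2.run(path);  // aggregate: sum the partial products per user and item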
