Hadoop-based Book Recommendation (3)

Computed result:

~ hadoop fs -cat /user/hdfs/recommend/step1/part-00000

1       102:3.0,103:2.5,101:5.0
2       101:2.0,102:2.5,103:5.0,104:2.0
3       107:5.0,101:2.0,104:4.0,105:4.5
4       101:5.0,103:3.0,104:4.5,106:4.0
5       101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0
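Each line of the Step1 output is one user's preference vector: the key is the userID, and the value is a comma-separated list of itemID:score pairs. For example, the first line means that user 1 rated book 102 with 3.0, book 103 with 2.5, and book 101 with 5.0. This is exactly the input that Step2 reads below.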

3). Step2.java: count the item-pair combinations and build the item co-occurrence matrix

Source code:

package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step2 {

    public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final static Text k = new Text();
        private final static IntWritable v = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text values,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // Split one user vector line: userID, then itemID:score tokens
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            for (int i = 1; i < tokens.length; i++) {
                String itemID = tokens[i].split(":")[0];
                for (int j = 1; j < tokens.length; j++) {
                    String itemID2 = tokens[j].split(":")[0];
                    // Emit every ordered item pair rated by this user with count 1
                    k.set(itemID + ":" + itemID2);
                    output.collect(k, v);
                }
            }
        }
    }

    public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // Sum the counts of each item pair across all users
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step2Input");
        String output = path.get("Step2Output");

        // Remove any previous output directory before the job starts
        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
        conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
        conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }
}
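The mapper reads one user vector per line, drops the userID, and emits every ordered pair of items that the user has rated (including an item paired with itself) with a count of 1. The reducer, which is also reused as the combiner, sums these counts, so each itemA:itemB key ends up holding the number of users who rated both books, i.e. one cell of the co-occurrence matrix. The following standalone sketch (no Hadoop; the class name and the tab/comma delimiter are assumptions standing in for Recommend.DELIMITER from the earlier part of this series) replays the same counting logic on the Step1 output shown above, so you can verify the pair counts locally.

import java.util.Map;
import java.util.TreeMap;

public class CooccurrenceSketch {
    public static void main(String[] args) {
        // The Step1 output lines listed above
        String[] step1Lines = {
            "1\t102:3.0,103:2.5,101:5.0",
            "2\t101:2.0,102:2.5,103:5.0,104:2.0",
            "3\t107:5.0,101:2.0,104:4.0,105:4.5",
            "4\t101:5.0,103:3.0,104:4.5,106:4.0",
            "5\t101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0"
        };
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String line : step1Lines) {
            // Same tokenization as the mapper: skip the userID, keep the item:score tokens
            String[] tokens = line.split("[\t,]");
            for (int i = 1; i < tokens.length; i++) {
                String itemID = tokens[i].split(":")[0];
                for (int j = 1; j < tokens.length; j++) {
                    String itemID2 = tokens[j].split(":")[0];
                    String pair = itemID + ":" + itemID2;        // same key format as Step2
                    Integer c = counts.get(pair);
                    counts.put(pair, c == null ? 1 : c + 1);     // the reducer's sum
                }
            }
        }
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}

Running this on the five sample lines prints the full co-occurrence matrix; for instance, the 101:101 entry comes out to 5 because all five users rated book 101.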
