基于hadoop的图书推荐 (2)

1). 建立物品的同现矩阵
按用户分组,找到每个用户所选的物品,单独出现计数及两两一组计数。

[101] [102] [103] [104] [105] [106] [107] [101] 5 3 4 4 2 2 1 [102] 3 3 3 2 1 1 0 [103] 4 3 4 3 1 2 0 [104] 4 2 3 4 2 2 1 [105] 2 1 1 2 2 1 1 [106] 2 1 2 2 1 2 0 [107] 1 0 0 1 1 0 1

2). 建立用户对物品的评分矩阵
按用户分组,找到每个用户所选的物品及评分

U3 [101] 2.0 [102] 0.0 [103] 0.0 [104] 4.0 [105] 4.5 [106] 0.0 [107] 5.0

3). 矩阵计算推荐结果
同现矩阵*评分矩阵=推荐结果

alogrithm_1

图片摘自”Mahout In Action”

MapReduce任务设计

aglorithm_2

图片摘自”Mahout In Action”

解读MapRduce任务:

步骤1: 按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵

步骤2: 对物品组合列表进行计数,建立物品的同现矩阵

步骤3: 合并同现矩阵和评分矩阵

步骤4: 计算推荐结果列表

4. 架构设计:推荐系统架构

hadoop-recommand-architect

上图中,左边是Application业务系统,右边是Hadoop的HDFS, MapReduce。

业务系统记录了用户的行为和对物品的打分

设置系统定时器CRON,每xx小时,增量向HDFS导入数据(userid,itemid,value,time)。

完成导入后,设置系统定时器,启动MapReduce程序,运行推荐算法。

完成计算后,设置系统定时器,从HDFS导出推荐结果数据到数据库,方便以后的及时查询。

5. 程序开发:MapReduce程序实现

win7的开发环境 和 Hadoop的运行环境 ,请参考文章:用Maven构建Hadoop项目

新建Java类:

Recommend.java,主任务启动程序

Step1.java,按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵

Step2.java,对物品组合列表进行计数,建立物品的同现矩阵

Step3.java,合并同现矩阵和评分矩阵

Step4.java,计算推荐结果列表

HdfsDAO.java,HDFS操作工具类

1). Recommend.java,主任务启动程序
源代码:

package org.conan.myhadoop.recommend; import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern; import org.apache.hadoop.mapred.JobConf; public class Recommend { public static final String HDFS = "hdfs://192.168.1.210:9000"; public static final Pattern DELIMITER = Pattern.compile("[\t,]"); public static void main(String[] args) throws Exception { Map<String, String> path = new HashMap<String, String>(); path.put("data", "logfile/small.csv"); path.put("Step1Input", HDFS + "/user/hdfs/recommend"); path.put("Step1Output", path.get("Step1Input") + "/step1"); path.put("Step2Input", path.get("Step1Output")); path.put("Step2Output", path.get("Step1Input") + "/step2"); path.put("Step3Input1", path.get("Step1Output")); path.put("Step3Output1", path.get("Step1Input") + "/step3_1"); path.put("Step3Input2", path.get("Step2Output")); path.put("Step3Output2", path.get("Step1Input") + "/step3_2"); path.put("Step4Input1", path.get("Step3Output1")); path.put("Step4Input2", path.get("Step3Output2")); path.put("Step4Output", path.get("Step1Input") + "/step4"); Step1.run(path); Step2.run(path); Step3.run1(path); Step3.run2(path); Step4.run(path); System.exit(0); } public static JobConf config() { JobConf conf = new JobConf(Recommend.class); conf.setJobName("Recommend"); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); return conf; } }

2). Step1.java,按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵

源代码:

package org.conan.myhadoop.recommend; import java.io.IOException; import java.util.Iterator; import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.conan.myhadoop.hdfs.HdfsDAO; public class Step1 { public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text> { private final static IntWritable k = new IntWritable(); private final static Text v = new Text(); @Override public void map(Object key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException { String[] tokens = Recommend.DELIMITER.split(value.toString()); int userID = Integer.parseInt(tokens[0]); String itemID = tokens[1]; String pref = tokens[2]; k.set(userID); v.set(itemID + ":" + pref); output.collect(k, v); } } public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> { private final static Text v = new Text(); @Override public void reduce(IntWritable key, Iterator values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException { StringBuilder sb = new StringBuilder(); while (values.hasNext()) { sb.append("," + values.next()); } v.set(sb.toString().replaceFirst(",", "")); output.collect(key, v); } } public static void run(Map<String, String> path) throws IOException { JobConf conf = Recommend.config(); String input = path.get("Step1Input"); String output = path.get("Step1Output"); HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); hdfs.rmr(input); hdfs.mkdirs(input); hdfs.copyFile(path.get("data"), input); conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(Text.class); conf.setOutputKeyClass(IntWritable.class); conf.setOutputValueClass(Text.class); conf.setMapperClass(Step1_ToItemPreMapper.class); conf.setCombinerClass(Step1_ToUserVectorReducer.class); conf.setReducerClass(Step1_ToUserVectorReducer.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(input)); FileOutputFormat.setOutputPath(conf, new Path(output)); RunningJob job = JobClient.runJob(conf); while (!job.isComplete()) { job.waitForCompletion(); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgzxjf.html