数据集albums.csv包含了10万条音乐专辑的数据。主要字段说明如下:
album_title:音乐专辑名称
genre:专辑类型
year_of_pub: 专辑发行年份
num_of_tracks: 每张专辑中单曲数量
num_of_sales:专辑销量
rolling_stone_critic:滚石网站的评分
mtv_critic:全球最大音乐电视网MTV的评分
music_maniac_critic:音乐达人的评分
统计各类型专辑的滚石网站的平均评分
统计各类型专辑的销量总数
统计每年销售的专辑数量
统计各类型专辑的MTV平均评分
统计各类型专辑的数量
第一题 统计各类型专辑的滚石网站的平均评分
Map阶段代码
/** * @author HuangShen * Map阶段代码 * @date 2021-06-21 11:09 */ public class AlbumsPro1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); //排除第一行 if (!split[0].equals("id")) { Albums albums = new Albums( Long.parseLong(split[0]), Long.parseLong(split[1]), split[2], split[3], Integer.parseInt(split[4]), Integer.parseInt(split[5]), Integer.parseInt(split[6]), Double.parseDouble(split[7]), Double.parseDouble(split[8]), Double.parseDouble(split[9])); //k2 专辑类型 v2 评分集合 context.write(new Text(albums.getGenre()),new DoubleWritable(albums.getRolling_stone_critic())); } } }Reducer阶段代码
/** * @author ShuangShen * Reducer阶段代码。 * @date 2021-06-21 11:10 */ public class AlbumsPro1Reducer extends Reducer<Text, DoubleWritable, Text, Text> { @Override protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { double count=0.0; int i=0; for (DoubleWritable value : values) { count+=value.get(); i++; } DecimalFormat df = new DecimalFormat("#.00"); context.write(key,new Text(df.format(count/i)+"分")); } }执行任务
/** * @author HuangShen * @date 2021-06-21 12:07 */ public class AlbumsMain extends Configured implements Tool { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); int run = ToolRunner.run(configuration, new AlbumsMain(), args); System.out.println("统计完成 请到 :50070/explorer.html#/albums_out 进行下载"); System.exit(run); } @Override public int run(String[] strings) throws Exception { //1 创建一个job 任务对象 Job job = Job.getInstance(super.getConf(), "Albums"); //防止 jar 包运行出错 job.setJarByClass(AlbumsMain.class); // 2 配置Job 任务对象 (8个步骤) job.setInputFormatClass(TextInputFormat.class); //第一步:指定Map 文件的读取方式 和读取路径 TextInputFormat.addInputPath(job, new Path("hdfs://10.158.83.240:9000/albums_input")); //第二步 指定Map 阶段的处理方式和数据类型 System.out.println("请选择抽到的题目:"); System.out.println("1)\t统计各类型专辑的滚石网站的平均评分:"); System.out.println("2)\t统计各类型专辑的销量总数。:"); System.out.println("3)\t统计每年销售的专辑数量。:"); System.out.println("4)\t统计各类型专辑的MTV平均评分。:"); System.out.println("5)\t统计各类型专辑的数量。:"); System.out.println("请输入题号:"); Scanner scanner = new Scanner(System.in); String num = scanner.nextLine(); switch (num){ case "1":{ System.out.println("选择了 1)\t统计各类型专辑的滚石网站的平均评分:"); creatJob(job, AlbumsPro1Mapper.class,Text.class,DoubleWritable.class,AlbumsPro1Reducer.class,Text.class,DoubleWritable.class); break; } case "2":{ System.out.println("选择了 2)\t统计各类型专辑的销量总数。:"); creatJob(job, AlbumsPro2Mapper.class,Text.class, LongWritable.class, AlbumsPro2Reducer.class,Text.class,LongWritable.class); break; } case "3":{ System.out.println("选择了 3)\t统计每年销售的专辑数量。:"); creatJob(job, AlbumsPro3Mapper.class,LongWritable.class, LongWritable.class, AlbumsPro3Reducer.class,LongWritable.class,LongWritable.class); break; } case "4":{ System.out.println("选择了 4)\t统计各类型专辑的MTV平均评分。:"); creatJob(job, AlbumsPro4Mapper.class,Text.class, DoubleWritable.class, AlbumsPro4Reducer.class,Text.class,DoubleWritable.class); break; } case "5":{ System.out.println("选择了 5)\t统计各类型专辑的数量。:"); creatJob(job, AlbumsPro5Mapper.class,Text.class, LongWritable.class, AlbumsPro5Reducer.class,Text.class,LongWritable.class); break; } default:{ System.out.println("输入有误! 请输入题号"); break; } } //第八步设置输出类型 job.setOutputFormatClass(TextOutputFormat.class); //设置输出的路径 Path outputDir = new Path("hdfs://10.158.83.240:9000/albums_out"); TextOutputFormat.setOutputPath(job, outputDir); //获取文件系统 目标文件存在就删除 FileSystem fileSystem = FileSystem.get(new URI("hdfs://10.158.83.240:9000"), new Configuration()); if (fileSystem.exists(outputDir)) { fileSystem.delete(outputDir,true); } //等待任务结束 return job.waitForCompletion(true)?0:1; } //创建 job private Job creatJob(Job job,Class<? extends Mapper> mapper,Class<?> k1,Class<?> v1,Class<? extends Reducer> reducer,Class<?> k3,Class<?> v3){ //设置Map 阶段处理方式 job.setMapperClass(mapper); //设置 Map 阶段K2 的数据类型 job.setMapOutputKeyClass(k1); //设置 Map 阶段 V2 的数据类型 job.setMapOutputValueClass(v1); //第三 四 五 六 采用默认方式 //第七步指定Reduce阶段的处理方式和数据类型 job.setReducerClass(reducer); //设置K3 类型 job.setMapOutputKeyClass(k3); //设置V3 类型 job.setMapOutputValueClass(v3); return job; } }完整代码 点击下载