第一个reduce方法:
static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ //(18989,[bean1,bean2,bean3]) @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sum_upflow = 0; long sum_downflow = 0; // 将上行流量和下行流量分别累加 for(FlowBean bean:values){ sum_upflow += bean.getUpFlow(); sum_downflow += bean.getDownFlow(); } FlowBean resultBean = new FlowBean(sum_upflow,sum_downflow); context.write(key, resultBean); } }第一个驱动类
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // 将默认配置文件传给job Job job = Job.getInstance(conf); // 告诉yarn jar包在哪 job.setJarByClass(FlowCount.class); //指定job要使用的map和reduce job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); // 指定map的输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 指定最终输出的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // job的输入数据所在的目录 // 第一个参数:给哪个job设置 // 第二个参数:输入数据的目录,多个目录用逗号分隔 FileInputFormat.setInputPaths(job, new Path("/home/miao/input/flowcount/")); // job的数据输出在哪个目录 FileOutputFormat.setOutputPath(job, new Path("/home/miao/output/flowcount/")); //将jar包和配置文件提交给yarn // submit方法提交作业就退出该程序 // job.submit(); // waitForCompletion方法提交作业并等待作业执行 // true表示将作业信息打印出来,该方法会返回一个boolean值,表示是否成功运行 boolean result = job.waitForCompletion(true); // mr运行成功返回true,输出0表示运行成功,1表示失败 System.exit(result?0:1); }执行结果:
13726230503 4962 49362 54324 13826544101 528 0 528 13926251106 480 0 480 13926435656 264 3024 3288结果分析:
输出数据的格式已经符合了要求,但是并没有按照总流量大小降序排列,需要再写第二个mapreduce来达到最终结果第二个map方法:
static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{ FlowBean bean = new FlowBean(); Text phone = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //拿到的是上一个mapreduce程序的输出结果,各手机号和流量信息 String line = value.toString(); String[] fields = line.split("\t"); //获取手机号 String phonenum = fields[0]; //获取上行流量 long upFlow = Long.parseLong(fields[1]); //获取下行流量 long downFlow = Long.parseLong(fields[2]); //多次调用map函数时,只创建一个对象 bean.set(upFlow, downFlow); phone.set(phonenum); // write时,就将bean对象序列化出去了 reducer那边反序列化回对象 根据bean对象的sumFlow排序 //map结束后会分发给reduce,默认根据key的hash函数进行分发 //reduce要实现全局有序,必须只有一个reduce,否则分成多个reduce,只有在每个reduce产生的文件里是有序的 context.write(bean, phone); }第二个reduce方法:
static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{ //<bean(),phonenum> 相同key的被分为一组,一起执行一次reduce //对于key是对象的情况下,不可能有两个对象相同(即使上行流量下行流量都相同),所以每组都只有一条数据 @Override protected void reduce(FlowBean bean, Iterable<Text> values, Context context) throws IOException, InterruptedException { context.write(values.iterator().next(), bean); } }第二个驱动方法:
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // 将默认配置文件传给job Job job = Job.getInstance(conf); //指定自定义的map数据分区器 //job.setPartitionerClass(ProvincePartitioner.class); //根据partitioner里的分区数量,设置reduce的数量 //job.setNumReduceTasks(5); // 告诉yarn jar包在哪 job.setJarByClass(FlowCountSort.class); //指定job要使用的map和reduce job.setMapperClass(FlowCountSortMapper.class); job.setReducerClass(FlowCountSortReducer.class); // 指定map的输出类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); // 指定最终输出的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // job的输入数据所在的目录 // 第一个参数:给那个job设置 // 第二个参数:输入数据的目录,多个目录用逗号分隔 FileInputFormat.setInputPaths(job, new Path(args[0])); //适用于做测试,不建议这么做 Path outpath = new Path(args[1]); //根据配置文件获取hdfs客户端对象 FileSystem fs = FileSystem.get(conf); // 如果输出目录存在就将其删除 if(fs.exists(outpath)){ fs.delete(outpath, true); } // job的数据输出在哪个目录 FileOutputFormat.setOutputPath(job, outpath); //将jar包和配置文件提交给yarn // submit方法提交作业就退出该程序 // job.submit(); // waitForCompletion方法提交作业并等待作业执行 // true表示将作业信息打印出来,该方法会返回一个boolean值,表示是否成功运行 boolean result = job.waitForCompletion(true); // mr运行成功返回true,输出0表示运行成功,1表示失败 System.exit(result?0:1); }输出结果:
13726230503 4962 49362 54324 13926435656 264 3024 3288 13826544101 528 0 528 13926251106 480 0 480结果分析:
已满足格式要求,并按总流量降序 mapreduce详细流程图文详解