package com.wn.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that converts one tab-separated input line into a (key, {@link FlowwBean}) pair.
 *
 * <p>The output key is the field at index 1 (stored in {@code phone}). The two flow values are
 * read relative to the END of the line: third-to-last field as the up-flow and second-to-last
 * field as the down-flow. NOTE(review): indexing from the end presumably tolerates a variable
 * number of middle columns; confirm against the actual input format.
 *
 * <p>Lines that cannot be parsed (fewer than {@value #MIN_FIELDS} fields, or a non-numeric flow
 * field) are skipped and counted in the {@code FlowMapper/MALFORMED_RECORDS} job counter instead
 * of failing the task with an unchecked exception.
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowwBean> {

    /** Counter group used to report records that were skipped. */
    private static final String COUNTER_GROUP = "FlowMapper";

    /** Counter name for lines that could not be parsed. */
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /** Fewest fields for which both {@code [1]} and {@code [length - 3]} are valid indexes. */
    private static final int MIN_FIELDS = 3;

    // Output objects are allocated once and refilled on every call to map().
    private final Text phone = new Text();
    private final FlowwBean flow = new FlowwBean();

    /**
     * Parses one input line and emits its key together with its up/down flow values.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one line of tab-separated text
     * @param context task context used to emit output and to update counters
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");

        // An empty or truncated line would otherwise raise ArrayIndexOutOfBoundsException.
        if (fields.length < MIN_FIELDS) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        long upFlow;
        long downFlow;
        try {
            upFlow = Long.parseLong(fields[fields.length - 3]);
            downFlow = Long.parseLong(fields[fields.length - 2]);
        } catch (NumberFormatException e) {
            // A header row or corrupted value: count it and move on rather than kill the job.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        phone.set(fields[1]);
        flow.set(upFlow, downFlow);
        context.write(phone, flow);
    }
}
// Section 3.3: Writing FlowReducer
package com.wn.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that adds up all {@link FlowwBean} values received for one key and emits a single
 * bean holding the totals.
 *
 * <p>The key type is Hadoop's {@code org.apache.hadoop.io.Text}, matching the key type emitted
 * by {@code FlowMapper} and the output key class configured in {@code FlowDriver}.
 */
public class FlowReducer extends Reducer<Text, FlowwBean, Text, FlowwBean> {

    // Output bean is allocated once and refilled on every call to reduce().
    private final FlowwBean sumFlow = new FlowwBean();

    /**
     * Sums the up-flow and down-flow of every value for {@code key} and writes the totals.
     *
     * @param key     the grouping key shared by all {@code values}
     * @param values  the beans emitted for this key
     * @param context task context used to emit the aggregated pair
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<FlowwBean> values, Context context)
            throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;

        for (FlowwBean value : values) {
            sumUpFlow += value.getUpFlow();
            sumDownFlow += value.getDownFlow();
        }

        sumFlow.set(sumUpFlow, sumDownFlow);
        context.write(key, sumFlow);
    }
}
3.4 编写FlowDriver
package com.wn.flow;

// NOTE(review): the com.wn.wordcount.* and IntWritable imports are not used by this class;
// they are kept because other code in the file may not be visible here.
import com.wn.wordcount.WcDriver;
import com.wn.wordcount.WcMapper;
import com.wn.wordcount.WcReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Entry point that configures and submits the flow-summing MapReduce job
 * ({@link FlowMapper} + {@link FlowReducer}).
 */
public class FlowDriver {

    /**
     * Builds the job, runs it to completion and exits with 0 on success or 1 on failure.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if the thread is interrupted while waiting for the job
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: FlowDriver <input path> <output path>");
            System.exit(2);
        }

        // Obtain a Job instance.
        Job job = Job.getInstance(new Configuration());

        // Locate the job jar through this driver class (previously pointed at WcDriver,
        // a leftover from the word-count example).
        job.setJarByClass(FlowDriver.class);

        // Set the mapper and reducer.
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // Set the mapper and reducer output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowwBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowwBean.class);

        // Set the input and output locations.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}