mapreduce过程详解(基于hadoop2.x架构)

本文基于hadoop2.x架构详细描述了mapreduce的执行过程,包括partition,combine,shuffle等组件以及yarn平台与mapreduce编程模型的关系。

mapreduce的简介和优点

mapreduce是一个分布式运算程序的编程框架,是hadoop数据分析的核心.

mapreduce的核心思想是将用户编写的逻辑代码和架构中的各个组件整合成一个分布式运算程序,实现一定程序的并行处理海量数据,提高效率.

海量数据难以在单机上处理,而一旦将单机版程序扩展到集群上进行分布式运行势必将大大增加程序的复杂程度.引入mapreduce架构,开发人员可以将精力集中于数据处理的核心业务逻辑上,而将分布式程序中的公共功能封装成框架,以降低开发的难度.

一个完整的mapreduce程序有三类实例进程

MRAppMaster:负责整个程序的协调过程

MapTask:负责map阶段的数据处理

ReduceTask:负责reduce阶段的数据处理

案例(统计各个手机号的上传和下载流量总和)

数据展示

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200

数据解释:

每行数据的第二列数据是手机号,倒数第三列表示上行流量,倒数第二列表示下行流量

要求:

根据总流量降序排列

输出格式要求:

手机号 上行流量 下行流量 总流量

创建bean对象用于封装上行流量,下行流量和总流量:

package com.xiaojie.flowcount; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; //作为key输出的时候都要排序 //不要排序的话,可实现Writable //实现WritableComparable是为了实现比较大小,排序的功能 public class FlowBean implements WritableComparable<FlowBean>{ private Long upFlow; private Long downFlow; private Long sumFlow; //反序列化时需要反射调用空参构造函数,显示地定义一个 public FlowBean(){} public FlowBean(Long upFlow, Long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public void set(Long upFlow, Long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public Long getUpFlow() { return upFlow; } public void setUpFlow(Long upFlow) { this.upFlow = upFlow; } public Long getDownFlow() { return downFlow; } public void setDownFlow(Long downFlow) { this.downFlow = downFlow; } //反序列化方法 public void readFields(DataInput in) throws IOException { //反序列化的顺序和序列化的顺序一致 upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } //序列化方法 public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } public Long getSumFlow() { return sumFlow; } public void setSumFlow(Long sumFlow) { this.sumFlow = sumFlow; } //toString方法可控制bean对象被写出在文件时的格式 @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow ; } //大的话返回-1,表示排在前面,即降序排序 public int compareTo(FlowBean o) { return this.sumFlow > o.getSumFlow()?-1:1; } }

第一个map方法:

static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException { // 每一行读进来的数据转化为String类型 String line = value.toString(); //根据tab分割 String[] fields = line.split("\t"); //取出手机号 String phonenum = fields[1]; //取出上行流量 将String转为Long Long upFlow = Long.parseLong(fields[fields.length-3]); //取出下行流量 long downFlow = Long.parseLong(fields[fields.length-2]); // 把数据发送给reduce context.write(new Text(phonenum), new FlowBean(upFlow, downFlow)); } }

partition(分区方法):

//根据省份分发给不同的reduce程序,其输入数据是map的输出 public class ProvincePartitioner extends Partitioner<Text, FlowBean>{ public static HashMap<String, Integer> provinceDict = new HashMap<String, Integer>(); static{ provinceDict.put("136", 0); provinceDict.put("137", 1); provinceDict.put("138", 2); provinceDict.put("139", 3); } //返回的是分区号 给哪个reduce @Override public int getPartition(Text key, FlowBean value, int num_partitioner) { // 根据手机号前三位分省份,分给不同的reduce String phone_num = key.toString().substring(0, 3); Integer provinceId = provinceDict.get(phone_num); return provinceId==null?4:provinceId; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppygp.html