Hadoop学习总结之三:Map(2)

2、编写Map-Reduce程序

编写Map-Reduce程序,一般需要实现两个函数:mapper中的map函数和reducer中的reduce函数。

一般遵循以下格式:

map: (K1, V1)  ->  list(K2, V2)

public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {

void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)

throws IOException;

}

 
reduce: (K2, list(V))  ->  list(K3, V3) 

public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

void reduce(K2 key, Iterator<V2> values,

OutputCollector<K3, V3> output, Reporter reporter)

throws IOException;

}

 

对于上面的例子,则实现的mapper如下:

public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

@Override

public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

String line = value.toString();

String year = line.substring(15, 19);

int airTemperature;

if (line.charAt(25) == '+') {

airTemperature = Integer.parseInt(line.substring(26, 30));

} else {

airTemperature = Integer.parseInt(line.substring(25, 30));

}

output.collect(new Text(year), new IntWritable(airTemperature));

}

}

 

实现的reducer如下:

public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

int maxValue = Integer.MIN_VALUE;

while (values.hasNext()) {

maxValue = Math.max(maxValue, values.next().get());

}

output.collect(key, new IntWritable(maxValue));

}

}

 

欲运行上面实现的Mapper和Reduce,则需要生成一个Map-Reduce得任务(Job),其基本包括以下三部分:

输入的数据,也即需要处理的数据 Map-Reduce程序,也即上面实现的Mapper和Reducer 此任务的配置项JobConf

欲配置JobConf,需要大致了解Hadoop运行job的基本原理:

Hadoop将Job分成task进行处理,共两种task:map task和reduce task Hadoop有两类的节点控制job的运行:JobTracker和TaskTracker JobTracker协调整个job的运行,将task分配到不同的TaskTracker上 TaskTracker负责运行task,并将结果返回给JobTracker Hadoop将输入数据分成固定大小的块,我们称之input split Hadoop为每一个input split创建一个task,在此task中依次处理此split中的一个个记录(record) Hadoop会尽量让输入数据块所在的DataNode和task所执行的DataNode(每个DataNode上都有一个TaskTracker)为同一个,可以提高运行效率,所以input split的大小也一般是HDFS的block的大小。 Reduce task的输入一般为Map Task的输出,Reduce Task的输出为整个job的输出,保存在HDFS上。 在reduce中,相同key的所有的记录一定会到同一个TaskTracker上面运行,然而不同的key可以在不同的TaskTracker上面运行,我们称之为partition partition的规则为:(K2, V2) –> Integer, 也即根据K2,生成一个partition的id,具有相同id的K2则进入同一个partition,被同一个TaskTracker上被同一个Reducer进行处理。

public interface Partitioner<K2, V2> extends JobConfigurable {

int getPartition(K2 key, V2 value, int numPartitions);

}

 

下图大概描述了Map-Reduce的Job运行的基本原理:

image

下面我们讨论JobConf,其有很多的项可以进行配置:

setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数 setMapperClass:设置Mapper,默认为IdentityMapper setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数 setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式 setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式 setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数 setReducerClass:设置Reducer,默认为IdentityReducer setOutputFormat:设置任务的输出格式,默认为TextOutputFormat FileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径 FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在

当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:

public class MaxTemperature {

public static void main(String[] args) throws IOException {

if (args.length != 2) {

System.err.println("Usage: MaxTemperature <input path> <output path>");

System.exit(-1);

}

JobConf conf = new JobConf(MaxTemperature.class);

conf.setJobName("Max temperature");

FileInputFormat.addInputPath(conf, new Path(args[0]));

FileOutputFormat.setOutputPath(conf, new Path(args[1]));

conf.setMapperClass(MaxTemperatureMapper.class);

conf.setReducerClass(MaxTemperatureReducer.class);

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

JobClient.runJob(conf);

}

}

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/ppfgz.html