我们通过下面这个天气数据处理的例子来说明Hadoop的运行原理。
1、Map-Reduce的逻辑过程假设我们需要处理一批有关天气的数据,其格式如下:
按照ASCII码存储,每行一条记录
每一行字符从0开始计数,第15个到第18个字符为年
第25个到第29个字符为温度,其中第25位是符号+/-
0067011990999991950051507+0000+
0043011990999991950051512+0022+
0043011990999991950051518-0011+
0043012650999991949032412+0111+
0043012650999991949032418+0078+
0067011990999991937051507+0001+
0043011990999991937051512-0002+
0043011990999991945051518+0001+
0043012650999991945032412+0002+
0043012650999991945032418+0078+
现在需要统计出每年的最高温度。
Map-Reduce主要包括两个步骤:Map和Reduce
每一步都有key-value对作为输入和输出:
map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的TextInputFormat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本
map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应
对于上面的例子,在map过程,输入的key-value对如下:
(0, 0067011990999991950051507+0000+)
(33, 0043011990999991950051512+0022+)
(66, 0043011990999991950051518-0011+)
(99, 0043012650999991949032412+0111+)
(132, 0043012650999991949032418+0078+)
(165, 0067011990999991937051507+0001+)
(198, 0043011990999991937051512-0002+)
(231, 0043011990999991945051518+0001+)
(264, 0043012650999991945032412+0002+)
(297, 0043012650999991945032418+0078+)
在map过程中,通过对每一行字符串的解析,得到年-温度的key-value对作为输出:
(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)
(1937, 1)
(1937, -2)
(1945, 1)
(1945, 2)
(1945, 78)
在reduce过程,将map过程中的输出,按照相同的key将value放到同一个列表中作为reduce的输入
(1950, [0, 22, –11])
(1949, [111, 78])
(1937, [1, -2])
(1945, [1, 2, 78])
在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:
(1950, 22)
(1949, 111)
(1937, 1)
(1945, 78)
其逻辑过程可用如下图表示:
下图大概描述了Map-Reduce的Job运行的基本原理:
下面我们讨论JobConf,其有很多的项可以进行配置:
setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text
setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数
setMapperClass:设置Mapper,默认为IdentityMapper
setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数
setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式
setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式
setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数
setReducerClass:设置Reducer,默认为IdentityReducer
setOutputFormat:设置任务的输出格式,默认为TextOutputFormat
FileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径
FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在
当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
相关阅读: