5. Developing a MapReduce Program
Let's write a simple MapReduce program that implements word count.
Create a new Java file: WordCount.java
package org.conan.myHadoop.mr;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class WordCountMapper extends MapReduceBase
            implements Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            // Split each input line on whitespace and emit a (word, 1) pair per token
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class WordCountReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Sum all the counts emitted for this word
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        String input = "hdfs://10.1.32.91:9000/user/licz/hdfs/o_t_account/test.txt";
        String output = "hdfs://10.1.32.91:9000/user/licz/hdfs/o_t_account/result";

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(WordCountMapper.class);
        conf.setCombinerClass(WordCountReducer.class);
        conf.setReducerClass(WordCountReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        System.exit(0);
    }
}
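The core counting logic can be understood (and checked) without a Hadoop cluster. The following is a minimal standalone sketch in plain Java of what the mapper and reducer together compute: the same whitespace tokenization as the mapper, with the reducer's per-key summation done in a HashMap. The class and method names here are illustrative, not part of the Hadoop program above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class WordCountLocal {
    // Tokenize on whitespace (like the mapper) and accumulate
    // per-word counts (like the reducer's summation loop).
    static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            String word = itr.nextToken();
            Integer prev = counts.get(word);
            counts.put(word, prev == null ? 1 : prev + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("hadoop mapreduce hadoop"));
    }
}
```

This also clarifies why the reducer can double as a combiner in the job configuration: summing partial counts on the map side and summing again on the reduce side gives the same totals.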
Console error on the first run:
2013-9-30 19:25:02 org.apache.hadoop.util.NativeCodeLoader
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-9-30 19:25:02 org.apache.hadoop.security.UserGroupInformation doAs
SEVERE: PriviledgedActionException as:Administrator cause:java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator1702422322\.staging to 0700
Exception in thread "main" java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator1702422322\.staging to 0700
    at org.apache.hadoop.fs.FileUtil.checkReturnValue(FileUtil.java:689)
    at org.apache.hadoop.fs.FileUtil.setPermission(FileUtil.java:662)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:509)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)
    at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:189)
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:116)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:856)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824)
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261)
    at org.conan.myhadoop.mr.WordCount.main(WordCount.java:78)
This error only occurs when developing on Windows. It is a file-permission problem; the same program runs fine on Linux.
The fix is to edit the /hadoop-1.2.1/src/core/org/apache/hadoop/fs/FileUtil.java file,
comment out lines 688-692, then recompile the source code and repackage hadoop.jar.
685  private static void checkReturnValue(boolean rv, File p,
686                                       FsPermission permission
687                                       ) throws IOException {
688    /*if (!rv) {
689      throw new IOException("Failed to set permissions of path: " + p +
690                            " to " +
691                            String.format("%04o", permission.toShort()));
692    }*/
693  }
Note: for convenience, I simply downloaded a pre-built hadoop-core-1.2.1.jar from the internet instead of compiling it myself.
We also need to replace the Hadoop library in the local Maven repository:
~cp lib/hadoop-core-1.2.1.jar C:\Users\licz\.m2\repository\org\apache\hadoop\hadoop-core\1.2.1\hadoop-core-1.2.1.jar
Start the Java app again; the console outputs:
2014-1-17 11:25:27 org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-1-17 11:25:27 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2014-1-17 11:25:27 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2014-1-17 11:25:27 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
2014-1-17 11:25:27 org.apache.hadoop.mapred.FileInputFormat listStatus
INFO: Total input paths to process : 1
2014-1-17 11:25:27 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local2052617578_0001
2014-1-17 11:25:27 org.apache.hadoop.mapred.LocalJobRunner$Job run
INFO: Waiting for map tasks
2014-1-17 11:25:27 org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable run
INFO: Starting task: attempt_local2052617578_0001_m_000000_0
2014-1-17 11:25:28 org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
2014-1-17 11:25:28 org.apache.hadoop.mapred.MapTask updateJobWithSplit
INFO: Processing split: hdfs://10.1.32.91:9000/user/licz/hdfs/o_t_account/test.txt:0+46
2014-1-17 11:25:28 org.apache.hadoop.mapred.MapTask runOldMapper
INFO: numReduceTasks: 1
2014-1-17 11:25:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
2014-1-17 11:25:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
2014-1-17 11:25:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
2014-1-17 11:25:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
2014-1-17 11:25:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
2014-1-17 11:25:28 org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local2052617578_0001_m_000000_0 is done. And is in the process of commiting
2014-1-17 11:25:28 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: hdfs://10.1.32.91:9000/user/licz/hdfs/o_t_account/test.txt:0+46
2014-1-17 11:25:28 org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local2052617578_0001_m_000000_0' done.
2014-1-17 11:25:28 org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable run
INFO: Finishing task: attempt_local2052617578_0001_m_000000_0
2014-1-17 11:25:28 org.apache.hadoop.mapred.LocalJobRunner$Job run
INFO: Map task executor complete.
2014-1-17 11:25:28 org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
2014-1-17 11:25:28 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-17 11:25:28 org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 1 sorted segments
2014-1-17 11:25:28 org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 1 segments left of total size: 80 bytes
2014-1-17 11:25:28 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-17 11:25:28 org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local2052617578_0001_r_000000_0 is done. And is in the process of commiting
2014-1-17 11:25:28 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
2014-1-17 11:25:28 org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local2052617578_0001_r_000000_0 is allowed to commit now
2014-1-17 11:25:28 org.apache.hadoop.mapred.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local2052617578_0001_r_000000_0' to hdfs://10.1.32.91:9000/user/licz/hdfs/o_t_account/result
2014-1-17 11:25:28 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
2014-1-17 11:25:28 org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local2052617578_0001_r_000000_0' done.
2014-1-17 11:25:28 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 100% reduce 100%
2014-1-17 11:25:28 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local2052617578_0001
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO: Counters: 20
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:   File Input Format Counters
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Bytes Read=46
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:   File Output Format Counters
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Bytes Written=50
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:   FileSystemCounters
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_READ=430
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     HDFS_BYTES_READ=92
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_WRITTEN=137344
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     HDFS_BYTES_WRITTEN=50
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:   Map-Reduce Framework
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Map output materialized bytes=84
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Map input records=5
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Reduce shuffle bytes=0
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Spilled Records=14
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Map output bytes=81
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Total committed heap usage (bytes)=1029046272
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Map input bytes=46
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Combine input records=9
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     SPLIT_RAW_BYTES=111
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Reduce input records=7
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Reduce input groups=7
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Combine output records=7
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Reduce output records=7
2014-1-17 11:25:28 org.apache.hadoop.mapred.Counters log
INFO:     Map output records=9
The wordcount program ran successfully, and we can view the output with a command.
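For example, the result can be inspected with the HDFS shell. The part file name below (part-00000) is the typical default for a single-reducer job, so treat it as an assumption; `hadoop fs -ls` will show the actual name.

```shell
# List the job output directory
hadoop fs -ls hdfs://10.1.32.91:9000/user/licz/hdfs/o_t_account/result
# Print the reducer output (one "word<TAB>count" line per word)
hadoop fs -cat hdfs://10.1.32.91:9000/user/licz/hdfs/o_t_account/result/part-00000
```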
Alternatively, export a JAR from Eclipse and upload it to the server to run:
File -> Export -> Java -> JAR file
Then click Next.
If you do not select a Main class when exporting, you must specify the main class at run time, as follows:
[licz@nticket1 ~]$ hadoop jar /app/hadoop/WordCount.jar WordCount