Hadoop实例RandomWriter

RandomWriter(随机写)例子利用 Map/Reduce把 数据随机的写到dfs中。每个map输入单个文件名,然后随机写BytesWritable的键和值到DFS顺序文件。map没有产生任何输出,所以reduce没有执行。产生的数据是可以配置的。配置变量如下

名字

 

默认值

 

描述

 

test.randomwriter.maps_per_host

 

10

 

Number of maps/host

 

test.randomwrite.bytes_per_map

 

1073741824

 

Number of bytes written/map

 

test.randomwrite.min_key

 

10

 

minimum size of the key in bytes

 

test.randomwrite.max_key

 

1000

 

maximum size of the key in bytes

 

test.randomwrite.min_value

 

0

 

minimum size of the value

 

test.randomwrite.max_value

 

20000

 

maximum size of the value

 

test.randomwriter.maps_per_host表示每个slave节点上运行map的次数。默认情况下,即只有一个数据节点,那么就有10个map,每个map的数据量为1G,因此要将10G数据写入到hdfs中。不过我配置的试验环境中只有2个slave节点,因此有两个map。

test.randomwrite.bytes_per_map我原本以为是随机写输出的测试文件的大小,默认为1G=1*1024*1024*1024,但是我将这个数据改成1*1024*1024以后,输出的测试文件还是1G,这让我很不解。(?)

代码实例

其中test.randomwrite.bytes_per_map=1*1024*1024,test.randomwriter.maps_per_host=1。

/**   * Licensed to the Apache Software Foundation (ASF) under one   * or more contributor license agreements.  See the NOTICE file   * distributed with this work for additional information   * regarding copyright ownership.  The ASF licenses this file   * to you under the Apache License, Version 2.0 (the   * "License"); you may not use this file except in compliance   * with the License.  You may obtain a copy of the License at   *   *        *   * Unless required by applicable law or agreed to in writing, software   * distributed under the License is distributed on an "AS IS" BASIS,   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   * See the License for the specific language governing permissions and   * limitations under the License.   */      package org.apache.Hadoop.examples;      import java.io.IOException;   import java.util.Date;   import java.util.Random;      import org.apache.hadoop.conf.Configuration;   import org.apache.hadoop.conf.Configured;   import org.apache.hadoop.fs.Path;   import org.apache.hadoop.io.BytesWritable;   import org.apache.hadoop.io.Text;   import org.apache.hadoop.io.Writable;   import org.apache.hadoop.io.WritableComparable;   import org.apache.hadoop.mapred.ClusterStatus;   import org.apache.hadoop.mapred.FileOutputFormat;   import org.apache.hadoop.mapred.FileSplit;   import org.apache.hadoop.mapred.InputFormat;   import org.apache.hadoop.mapred.InputSplit;   import org.apache.hadoop.mapred.JobClient;   import org.apache.hadoop.mapred.JobConf;   import org.apache.hadoop.mapred.MapReduceBase;   import org.apache.hadoop.mapred.Mapper;   import org.apache.hadoop.mapred.OutputCollector;   import org.apache.hadoop.mapred.RecordReader;   import org.apache.hadoop.mapred.Reporter;   import org.apache.hadoop.mapred.SequenceFileOutputFormat;   import org.apache.hadoop.mapred.lib.IdentityReducer;   import org.apache.hadoop.util.GenericOptionsParser;   import org.apache.hadoop.util.Tool;   import org.apache.hadoop.util.ToolRunner;      /**   * This program uses map/reduce to just run a distributed job where there is   * no interaction between the tasks and each task write a large unsorted   * random binary sequence file of BytesWritable.   * In order for this program to generate data for terasort with 10-byte keys   * and 90-byte values, have the following config:   * <xmp>   * <?xml version="1.0"?>   * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>   * <configuration>   *   <property>   *     <name>test.randomwrite.min_key</name>   *     <value>10</value>   *   </property>   *   <property>   *     <name>test.randomwrite.max_key</name>   *     <value>10</value>   *   </property>   *   <property>   *     <name>test.randomwrite.min_value</name>   *     <value>90</value>   *   </property>   *   <property>   *     <name>test.randomwrite.max_value</name>   *     <value>90</value>   *   </property>   *   <property>   *     <name>test.randomwrite.total_bytes</name>   *     <value>1099511627776</value>   *   </property>   * </configuration></xmp>   *    * Equivalently, {@link RandomWriter} also supports all the above options   * and ones supported by {@link GenericOptionsParser} via the command-line.   */   public class RandomWriter extends Configured implements Tool {          /**     * User counters     */     static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }          /**     * A custom input format that creates virtual inputs of a single string     * for each map.     */     static class RandomInputFormat implements InputFormat<Text, Text> {          /**        * Generate the requested number of file splits, with the filename       * set to the filename of the output file.       */       public InputSplit[] getSplits(JobConf job,                                      int numSplits) throws IOException {         InputSplit[] result = new InputSplit[numSplits];         Path outDir = FileOutputFormat.getOutputPath(job);         for(int i=0; i < result.length; ++i) {           result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,                                      (String[])null);         }         return result;       }          /**       * Return a single record (filename, "") where the filename is taken from       * the file split.       */       static class RandomRecordReader implements RecordReader<Text, Text> {         Path name;         public RandomRecordReader(Path p) {           name = p;         }         public boolean next(Text key, Text value) {           if (name != null) {             key.set(name.getName());             name = null;             return true;           }           return false;         }         public Text createKey() {           return new Text();         }         public Text createValue() {           return new Text();         }         public long getPos() {           return 0;         }         public void close() {}         public float getProgress() {           return 0.0f;         }       }          public RecordReader<Text, Text> getRecordReader(InputSplit split,                                           JobConf job,                                            Reporter reporter) throws IOException {         return new RandomRecordReader(((FileSplit) split).getPath());       }     }        static class Map extends MapReduceBase       implements Mapper<WritableComparable, Writable,                         BytesWritable, BytesWritable> {              private long numBytesToWrite;       private int minKeySize;       private int keySizeRange;       private int minValueSize;       private int valueSizeRange;       private Random random = new Random();       private BytesWritable randomKey = new BytesWritable();       private BytesWritable randomValue = new BytesWritable();              private void randomizeBytes(byte[] data, int offset, int length) {         for(int i=offset + length - 1; i >= offset; --i) {           data[i] = (byte) random.nextInt(256);         }       }              /**       * Given an output filename, write a bunch of random records to it.       */       public void map(WritableComparable key,                        Writable value,                       OutputCollector<BytesWritable, BytesWritable> output,                        Reporter reporter) throws IOException {         int itemCount = 0;         while (numBytesToWrite > 0) {           int keyLength = minKeySize +              (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);           randomKey.setSize(keyLength);           randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());           int valueLength = minValueSize +             (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);           randomValue.setSize(valueLength);           randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());           output.collect(randomKey, randomValue);           numBytesToWrite -= keyLength + valueLength;           reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);           reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);           if (++itemCount % 200 == 0) {             reporter.setStatus("wrote record " + itemCount + ". " +                                 numBytesToWrite + " bytes left.");           }         }         reporter.setStatus("done with " + itemCount + " records.");       }              /**       * Save the values out of the configuaration that we need to write       * the data.       */       @Override       public void configure(JobConf job) {         numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",                                       1*1024*1024);         minKeySize = job.getInt("test.randomwrite.min_key", 10);         keySizeRange =            job.getInt("test.randomwrite.max_key", 1000) - minKeySize;         minValueSize = job.getInt("test.randomwrite.min_value", 0);         valueSizeRange =            job.getInt("test.randomwrite.max_value", 20000) - minValueSize;       }            }          /**     * This is the main routine for launching a distributed random write job.     * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.     * The reduce doesn't do anything.     *      * @throws IOException      */     public int run(String[] args) throws Exception {           if (args.length == 0) {         System.out.println("Usage: writer <out-dir>");         ToolRunner.printGenericCommandUsage(System.out);         return -1;       }              Path outDir = new Path(args[0]);       JobConf job = new JobConf(getConf());              job.setJarByClass(RandomWriter.class);       job.setJobName("random-writer");       FileOutputFormat.setOutputPath(job, outDir);              job.setOutputKeyClass(BytesWritable.class);       job.setOutputValueClass(BytesWritable.class);              job.setInputFormat(RandomInputFormat.class);       job.setMapperClass(Map.class);               job.setReducerClass(IdentityReducer.class);       job.setOutputFormat(SequenceFileOutputFormat.class);              JobClient client = new JobClient(job);       ClusterStatus cluster = client.getClusterStatus();       int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 1);       long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",                                                1*1024*1024);       if (numBytesToWritePerMap == 0) {         System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");         return -2;       }       long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",             numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());       int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);       if (numMaps == 0 && totalBytesToWrite > 0) {         numMaps = 1;         job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);       }              job.setNumMapTasks(numMaps);       System.out.println("Running " + numMaps + " maps.");              // reducer NONE        job.setNumReduceTasks(0);              Date startTime = new Date();       System.out.println("Job started: " + startTime);       JobClient.runJob(job);       Date endTime = new Date();       System.out.println("Job ended: " + endTime);       System.out.println("The job took " +                           (endTime.getTime() - startTime.getTime()) /1000 +                           " seconds.");              return 0;     }          public static void main(String[] args) throws Exception {       int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);       System.exit(res);     }      }  

输出信息:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/pppzz.html