用Hadoop AVRO进行大量小文件的处理(2)

public static void writeToAvro(File srcPath, OutputStream outputStream) throws IOException {
  DataFileWriter<Object> writer = new  DataFileWriter<Object>(new GenericDatumWriter<Object>()).setSyncInterval(100);
  writer.setCodec(CodecFactory.snappyCodec());
  writer.create(SCHEMA, outputStream);
  for (Object obj : FileUtils.listFiles(srcPath, null, false)){
   File file = (File) obj;
   String filename = file.getAbsolutePath();
   byte content[] = FileUtils.readFileToByteArray(file);
   GenericRecord record = new GenericData.Record(SCHEMA);
   record.put(FIELD_FILENAME, filename);
   record.put(FIELD_CONTENTS, ByteBuffer.wrap(content));
   writer.append(record);
   System.out.println(file.getAbsolutePath() + ":"+ DigestUtils.md5Hex(content));
  }
  IOUtils.cleanup(null, writer);
  IOUtils.cleanup(null, outputStream);
 }

public static void main(String args[]) throws Exception {
  Configuration config = new Configuration();
  FileSystem hdfs = FileSystem.get(config);
  File sourceDir = new File(args[0]);
  Path destFile = new Path(args[1]);
  OutputStream os = hdfs.create(destFile);
  writeToAvro(sourceDir, os);
 }
}

public class Demo {
 private static final String FIELD_FILENAME = "filename";
 private static final String FIELD_CONTENTS = "contents";

public static void readFromAvro(InputStream is) throws  IOException {
  DataFileStream<Object> reader = new DataFileStream<Object>(is,new GenericDatumReader<Object>());
  for (Object o : reader) {
   GenericRecord r = (GenericRecord) o;
   System.out.println(r.get(FIELD_FILENAME)+ ":"+DigestUtils.md5Hex(((ByteBuffer)r.get(FIELD_CONTENTS)).array()));
  }
  IOUtils.cleanup(null, is);
  IOUtils.cleanup(null, reader);
 }

public static void main(String... args) throws Exception {
  Configuration config = new Configuration();
  FileSystem hdfs = FileSystem.get(config);
  Path destFile = new Path(args[0]);
  InputStream is = hdfs.open(destFile);
  readFromAvro(is);
 }
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/6e8aaa407f3c3a7d4a3323bc53f77c55.html