public static void writeToAvro(File srcPath, OutputStream outputStream) throws IOException {
DataFileWriter<Object> writer = new DataFileWriter<Object>(new GenericDatumWriter<Object>()).setSyncInterval(100);
writer.setCodec(CodecFactory.snappyCodec());
writer.create(SCHEMA, outputStream);
for (Object obj : FileUtils.listFiles(srcPath, null, false)){
File file = (File) obj;
String filename = file.getAbsolutePath();
byte content[] = FileUtils.readFileToByteArray(file);
GenericRecord record = new GenericData.Record(SCHEMA);
record.put(FIELD_FILENAME, filename);
record.put(FIELD_CONTENTS, ByteBuffer.wrap(content));
writer.append(record);
System.out.println(file.getAbsolutePath() + ":"+ DigestUtils.md5Hex(content));
}
IOUtils.cleanup(null, writer);
IOUtils.cleanup(null, outputStream);
}
public static void main(String args[]) throws Exception {
Configuration config = new Configuration();
FileSystem hdfs = FileSystem.get(config);
File sourceDir = new File(args[0]);
Path destFile = new Path(args[1]);
OutputStream os = hdfs.create(destFile);
writeToAvro(sourceDir, os);
}
}
public class Demo {
private static final String FIELD_FILENAME = "filename";
private static final String FIELD_CONTENTS = "contents";
public static void readFromAvro(InputStream is) throws IOException {
DataFileStream<Object> reader = new DataFileStream<Object>(is,new GenericDatumReader<Object>());
for (Object o : reader) {
GenericRecord r = (GenericRecord) o;
System.out.println(r.get(FIELD_FILENAME)+ ":"+DigestUtils.md5Hex(((ByteBuffer)r.get(FIELD_CONTENTS)).array()));
}
IOUtils.cleanup(null, is);
IOUtils.cleanup(null, reader);
}
public static void main(String... args) throws Exception {
Configuration config = new Configuration();
FileSystem hdfs = FileSystem.get(config);
Path destFile = new Path(args[0]);
InputStream is = hdfs.open(destFile);
readFromAvro(is);
}
}