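The write and readFields methods do the core work: write serializes the object into a byte array and writes it to the DataOutput, and readFields reads the byte array back from the DataInput and deserializes it into an object. Together they make up a custom Writable.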
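As a minimal sketch of that write/readFields round trip, the example below serializes a small object to bytes and reads it back. To keep it runnable without Hadoop on the classpath, `SimpleWritable` is a local stand-in with the same two method signatures as org.apache.hadoop.io.Writable. `PointWritable` and `WritableRoundTrip` are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in for org.apache.hadoop.io.Writable (assumption: same two methods).
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical value type with two int fields.
class PointWritable implements SimpleWritable {
    int x;
    int y;

    PointWritable() { }  // a no-arg constructor lets the object be created before readFields fills it

    PointWritable(int x, int y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(x);
        out.writeInt(y);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order they were written.
        x = in.readInt();
        y = in.readInt();
    }
}

class WritableRoundTrip {
    // Serializes any SimpleWritable into a fresh byte array.
    static byte[] toBytes(SimpleWritable w) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Fills an existing SimpleWritable from previously serialized bytes.
    static void fromBytes(SimpleWritable w, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            w.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `WritableRoundTrip.toBytes(new PointWritable(3, -7))` and then `fromBytes` on an empty `PointWritable` should give back the same two field values.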
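This Writable can be used to write a SequenceFile, but using it as a key in a map/reduce job fails with an error. Map/reduce sorts records by key, so the key class must implement org.apache.hadoop.io.WritableComparable. That interface extends both java.lang.Comparable and Writable, so the class also has to provide a compareTo method, which the sort uses to compare two objects.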
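Here is a rough sketch of what such a key class looks like. `SimpleWritableComparable` is a local stand-in for org.apache.hadoop.io.WritableComparable so the example compiles without Hadoop. `UserKey` and `UserKeySortDemo` are hypothetical names, and the field-by-field compareTo is only one possible ordering.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-in for org.apache.hadoop.io.WritableComparable<T>,
// which combines Writable with java.lang.Comparable<T>.
interface SimpleWritableComparable<T> extends Comparable<T> {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical key type: ordered by id first, then by name.
class UserKey implements SimpleWritableComparable<UserKey> {
    long id;
    String name = "";

    UserKey() { }

    UserKey(long id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeUTF(name);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        name = in.readUTF();
    }

    @Override
    public int compareTo(UserKey other) {
        int byId = Long.compare(id, other.id);
        return byId != 0 ? byId : name.compareTo(other.name);
    }
}

class UserKeySortDemo {
    // Sorts a copy of the keys with compareTo and returns the names in sorted order.
    static List<String> sortedNames(List<UserKey> keys) {
        List<UserKey> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        List<String> names = new ArrayList<>();
        for (UserKey key : copy) {
            names.add(key.name);
        }
        return names;
    }
}
```

A comparison like this works on live objects, so anything that holds the keys in serialized form has to deserialize them before it can call compareTo.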
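In our project, I first implemented compareTo with my own logic, which compared the properties of the object wrapped by the Writable. When this class was used in a map/reduce job, the job ran extremely slowly, and it turned out that most of the time was spent comparing objects. The reason is simple: the object is serialized into a byte array when the data is written, and every comparison has to deserialize the byte array back into an object before comparing, so with a lot of data it is bound to be slow. Even a test with only a little over 100 MB of data took about 3 hours. I then looked at how Hadoop's Text is implemented and found that its compare works directly on the byte arrays, so no deserialization is needed. After switching to that approach, the job finished quickly. The final Writable implementation is as follows: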
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// MultipleObject and SerializeUtil are classes from the project itself.
public class MultipleObjectWritable extends BinaryComparable implements
        WritableComparable<BinaryComparable> {

    private static final byte[] EMPTY_BYTES = new byte[0];
    // write() emits the length with writeInt, i.e. as a fixed 4-byte prefix.
    private static final int LENGTH_BYTES = 4;

    private MultipleObject multipleObject;
    private byte[] bytes;
    private int length;

    public MultipleObjectWritable() {
        bytes = EMPTY_BYTES;
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        length = dataInput.readInt();
        bytes = new byte[length];
        dataInput.readFully(bytes);
        if (multipleObject == null) {
            multipleObject = new MultipleObject();
        }
        multipleObject = SerializeUtil.deserialize(bytes, length,
                multipleObject.getClass());
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        if (multipleObject == null) {
            throw new IOException("Inner multiple object is null");
        }
        DataOutputBuffer out = SerializeUtil.serialize(multipleObject);
        if (out == null) {
            // Writing nothing here would leave the output stream out of sync.
            throw new IOException("Serializing inner multiple object failed");
        }
        // getData() returns the whole backing buffer; only the first
        // getLength() bytes hold valid data.
        bytes = out.getData();
        length = out.getLength();
        dataOutput.writeInt(length);
        dataOutput.write(bytes, 0, length);
    }

    @Override
    public int getLength() {
        return length;
    }

    @Override
    public byte[] getBytes() {
        return bytes;
    }

    public MultipleObject getMultipleObject() {
        return multipleObject;
    }

    public void setMultipleObject(MultipleObject multipleObject) {
        this.multipleObject = multipleObject;
    }

    /** A WritableComparator optimized for MultipleObjectWritable keys. */
    public static class Comparator extends WritableComparator {
        public Comparator() {
            super(MultipleObjectWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // Skip the 4-byte length prefix and compare the serialized
            // payloads directly, without deserializing them.
            return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES,
                    b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
        }
    }

    static {
        // register this comparator
        WritableComparator.define(MultipleObjectWritable.class,
                new Comparator());
    }
}
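To show why comparing serialized bytes avoids the deserialization cost, here is a small sketch that runs without Hadoop. It assumes the same record layout as the write method above: a 4-byte length written with writeInt, followed by the payload. `RawCompareSketch` and its methods are hypothetical, and the local `compareBytes` only mimics the unsigned lexicographic comparison of WritableComparator.compareBytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class RawCompareSketch {
    // Size of the length prefix produced by DataOutput.writeInt.
    static final int LENGTH_PREFIX_BYTES = 4;

    // Serializes a payload as [4-byte length][payload bytes].
    static byte[] serialize(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(body.length);
            out.write(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Lexicographic comparison of unsigned bytes over two array slices
    // (stand-in for WritableComparator.compareBytes).
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int end1 = s1 + l1;
        int end2 = s2 + l2;
        for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
            int a = b1[i] & 0xff;
            int b = b2[j] & 0xff;
            if (a != b) {
                return a - b;
            }
        }
        return l1 - l2;
    }

    // Compares two serialized records by skipping the length prefix
    // and looking only at the payload bytes; nothing is deserialized.
    static int compareSerialized(byte[] r1, byte[] r2) {
        return compareBytes(r1, LENGTH_PREFIX_BYTES, r1.length - LENGTH_PREFIX_BYTES,
                r2, LENGTH_PREFIX_BYTES, r2.length - LENGTH_PREFIX_BYTES);
    }
}
```

Note that the byte order is a consistent ordering, but it is not necessarily the same order a field-by-field compareTo would give. That is fine when the job only needs equal keys to end up next to each other.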