自定义Hadoop Writable(2)

write和readFields主要实现了把对象序列化成byte数组并写入到DataOutput中和从DataInput中读取byte数组并反序列化成对象。这样实现了一个自定义的Writable。

可以使用这个Writable来写SequenceFile,但是如果想用这个Writable在map/reduce中使用,会报错。因为在map/reduce过程中需要对key做sort,因此需要key实现org.apache.Hadoop.io.WritableComparable,此接口继承java.lang.Comparable和Writable,需要实现一个compareTo方法,用于在sort的时候比较两个对象。

一开始在项目中在compareTo中自己实现了一个逻辑,就是比较Writable中对象的属性。后来在map/reduce job中使用这个类的时候,发现在运行job的时候爆慢无比。后来发现主要的耗时都花在比较对象上,原因其实很简单,因为我们在write data的时候把对象序列化成byte数组,在compare的时候又要把byte数组反序列成对象再比较,如果数据多的话,不慢才怪。。。。其实在测试的时候也只用了100多M的数据,就需要运行3个小时左右。后来去看了一下hadoop Text的实现,发现在Text中实现的compare是直接使用byte数组来比较,因此没有了反序列化操作,再运行一下此job就很快运行完毕。最终的Writable实现如下:

public class MultipleObjectWritable extends BinaryComparable implements 
        Writable, WritableComparable<BinaryComparable> { 
    private MultipleObject multipleObject; 
    private static final byte[] EMPTY_BYTES = new byte[0]; 
    private byte[] bytes; 
    private int length; 
 
    public MultipleObjectWritable() { 
        bytes = EMPTY_BYTES; 
    } 
 
    @Override 
    public void readFields(DataInput dataInput) throws IOException { 
        length = dataInput.readInt(); 
        bytes = new byte[length]; 
        dataInput.readFully(bytes); 
        if (multipleObject == null) { 
            multipleObject = new MultipleObject(); 
        } 
        multipleObject = SerializeUtil.deserialize(bytes, length, 
                multipleObject.getClass()); 
    } 
 
    @Override 
    public void write(DataOutput dataOutput) throws IOException { 
        if (multipleObject == null) { 
            throw new IOException("Inner multiple object is null"); 
        } 
        DataOutputBuffer out = SerializeUtil.serialize(multipleObject); 
        if (out != null) { 
            bytes = out.getData(); 
            length = out.getData().length; 
            dataOutput.writeInt(length); 
            dataOutput.write(bytes); 
        } 
    } 
 
     
    @Override 
    public int getLength() { 
        return length; 
    } 
 
    @Override 
    public byte[] getBytes() { 
        return bytes; 
    } 
 
    public MultipleObject getMultipleObject() { 
        return multipleObject; 
    } 
 
    public void setMultipleObject(MultipleObject multipleObject) { 
        this.multipleObject = multipleObject; 
    } 
 
    /** A WritableComparator optimized for Text keys. */ 
    public static class Comparator extends WritableComparator { 
        public Comparator() { 
            super(MultipleObjectWritable.class); 
        } 
 
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
            int n1 = WritableUtils.decodeVIntSize(b1[s1]); 
            int n2 = WritableUtils.decodeVIntSize(b2[s2]); 
            return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2); 
        } 
    } 
 
    static { 
        // register this comparator 
        WritableComparator.define(MultipleObjectWritable.class, 
                new Comparator()); 
    } 
 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/1048eb3d0c0da8e0ff8e3d761beb1acc.html