Our job is to process the output with MongoDB's Hadoop driver. In the MongoInputFormat we rewrote, data is passed by writing it into the conf and then reading it back out of the conf inside the MongoSplitter. For example:
Putting the data into the conf:
List<Long> tagsUrns =null;
// ... tagsUrns is populated here ...
conf.set("tagUrns",
ObjectSerializer.serialize((Serializable) tagsUrns));
Getting the data back out of the conf in the mapper, the reducer, or the MongoSplitter:
List<Long> tagUrns = (List<Long>) ObjectSerializer
.deserialize(context.getConfiguration().get("tagUrns"));
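For context, here is a minimal sketch of what the read side can look like in a Mapper's setup(). The class name and key/value types below are placeholders of mine for illustration; the same read works wherever the data is needed, e.g. in the custom MongoSplitter:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: pull the serialized list back out of the job
// Configuration once, in setup(), and keep it for the whole task.
public class TagUrnMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private List<Long> tagUrns;

    @Override
    @SuppressWarnings("unchecked")
    protected void setup(Context context) throws IOException, InterruptedException {
        tagUrns = (List<Long>) ObjectSerializer
                .deserialize(context.getConfiguration().get("tagUrns"));
    }
}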
Since the conf only takes simple values such as boolean, int, and String, while what I needed to put into the Hadoop Configuration was a List (or some other object), a serialization utility class is needed.
The serialization utility class:
import java.io.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ObjectSerializer {

    private static final Log log = LogFactory.getLog(ObjectSerializer.class);

    // Serializes any Serializable object into a plain-ASCII string that is
    // safe to store as a Configuration value.
    public static String serialize(Serializable obj) throws IOException {
        if (obj == null)
            return "";
        try {
            ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
            ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
            objStream.writeObject(obj);
            objStream.close();
            return encodeBytes(serialObj.toByteArray());
        } catch (Exception e) {
            throw new IOException("Serialization error: " + e.getMessage(), e);
        }
    }

    // Restores the object from a string produced by serialize().
    public static Object deserialize(String str) throws IOException {
        if (str == null || str.length() == 0)
            return null;
        try {
            ByteArrayInputStream serialObj = new ByteArrayInputStream(
                    decodeBytes(str));
            ObjectInputStream objStream = new ObjectInputStream(serialObj);
            return objStream.readObject();
        } catch (Exception e) {
            throw new IOException("Deserialization error: " + e.getMessage(), e);
        }
    }

    // Encodes each byte as two characters in the range 'a'..'p'
    // (one character per 4-bit nibble).
    public static String encodeBytes(byte[] bytes) {
        StringBuffer strBuf = new StringBuffer();
        for (int i = 0; i < bytes.length; i++) {
            strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int) 'a')));
            strBuf.append((char) (((bytes[i]) & 0xF) + ((int) 'a')));
        }
        return strBuf.toString();
    }

    // Reverses encodeBytes(): turns every pair of characters back into one byte.
    public static byte[] decodeBytes(String str) {
        byte[] bytes = new byte[str.length() / 2];
        for (int i = 0; i < str.length(); i += 2) {
            char c = str.charAt(i);
            bytes[i / 2] = (byte) ((c - 'a') << 4);
            c = str.charAt(i + 1);
            bytes[i / 2] += (c - 'a');
        }
        return bytes;
    }
}
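As a quick sanity check (a standalone snippet added here for illustration, not part of the original class), the round trip behaves like this:

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ObjectSerializerDemo {
    public static void main(String[] args) throws IOException {
        List<Long> tagUrns = new ArrayList<Long>();
        tagUrns.add(42L);
        tagUrns.add(1001L);

        // Encode to a conf-safe string made of the characters 'a'..'p'.
        String encoded = ObjectSerializer.serialize((Serializable) tagUrns);

        // Decode back into the original list.
        @SuppressWarnings("unchecked")
        List<Long> decoded = (List<Long>) ObjectSerializer.deserialize(encoded);

        System.out.println(decoded); // prints [42, 1001]
    }
}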
But when the data I put in was too large, running the Hadoop job threw an error.
The error message shows that Hadoop's conf is size-limited; a quick search turned up a limit of 5 MB.
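A rough guard that can be applied before calling conf.set() looks like this (a sketch of mine; the 5 MB figure is simply the limit mentioned above, and the class name is invented):

import java.io.IOException;
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;

public class ConfSizeGuard {

    // Roughly the limit reported above; the encoded string is pure ASCII,
    // so its length approximates its size in bytes.
    private static final int CONF_VALUE_LIMIT = 5 * 1024 * 1024;

    public static boolean trySet(Configuration conf, String key, Serializable value)
            throws IOException {
        String serialized = ObjectSerializer.serialize(value);
        if (serialized.length() > CONF_VALUE_LIMIT) {
            return false; // too big for the conf; the caller needs another channel
        }
        conf.set(key, serialized);
        return true;
    }
}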
So at that point I was stumped: if the data is not passed in through the conf, there seems to be no way to get at it. In the end I wondered whether I could read the data file directly from HDFS instead. But my data has to be fetched inside the MongoSplitter, and the only thing available there is the conf. Then I discovered that Hadoop can obtain a FileSystem from exactly that conf:
@Resource(name = "hadoopConfiguration")
private Configuration configuration = null;
...........
fileSystem = FileSystem.get(configuration);
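With a FileSystem in hand inside the MongoSplitter, the large list no longer has to travel through the conf at all; only a short HDFS path does. A minimal sketch of the read side (the "tagUrns.path" key and the one-number-per-line file format are assumptions of mine, not part of the original code):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSideData {

    // Reads the tag URNs from an HDFS file whose path was stored in the conf.
    public static List<Long> readTagUrns(Configuration conf) throws IOException {
        FileSystem fileSystem = FileSystem.get(conf);
        Path path = new Path(conf.get("tagUrns.path")); // hypothetical key holding the path

        List<Long> tagUrns = new ArrayList<Long>();
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(fileSystem.open(path)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (line.length() > 0) {
                    tagUrns.add(Long.valueOf(line)); // one URN per line
                }
            }
        } finally {
            reader.close();
        }
        return tagUrns;
    }
}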