在一个job中,发现在跑到99.6%的时候,job就过不去了,其中一个reduce过不去。内存溢出。考虑到在reduce阶段缓存到内存的数据过多 lists.add(value);造成内存溢出,job根本跑不过去。
分析问题想到其中的key分步不均造成一个reduce的负载过重,所以考虑用partition进行分区,但是不了解uid key的分步情况,猜用前8位uid hashcode分区(uid为10位或11位)。
public static class PartitionByUid extends
Partitioner<TextInt, Text> {
@Override
public int getPartition(Text key, Text value,
int numPartitions) {
// TODO Auto-generated method stub
return (key.subString(0,8).hashCode() & Integer.MAX_VALUE)
% numPartitions;
}
}
重新跑job,没有效果。这条路行不通,因为我们不知道其中的数据分步情况。(曾考虑用hive分析一下数据分步情况。)
既然分区这条路行不通,考虑到在reduce阶段不缓存数据,(刚开始我并不知道reduce阶段的iterator只能遍历一遍),写下错误的代码:
public static class UserChainSixReducer extends Reducer<Text,Text,Text,Text>{
private static String prefix1 = "tm";
private Text outKey = new Text();
List<String> lists = new ArrayList<String>();
Boolean flag = false;
int count = 0;
public void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException{
Iterator<Text> iter = values.iterator();
lists.clear();
flag = false;
count = 0;
while(iter.hasNext()){
String value = iter.next().toString();
if(value.contains(prefix1)){
flag = true;
}
}
if(flag){
iter = values.iterator();
while(iter.hasNext()){
String value = iter.next().toString();
if(!value.contains(prefix1)){
count ++;
if(0 == count % 1000){
context.progress();
Thread.sleep(1*1000);
}
outKey.set(value);
context.write(outKey, null);
}
}
}
}
}