Problem setup: suppose we need to look up a hundred-odd records among 20 billion records (roughly 200 GB). Ignoring the cluster's compute capacity, we could write the MapReduce job like this: take no account of the data volume and do all the filtering in a single pass in the reduce phase.
// The mapper reads two kinds of input and tells them apart by the split path:
// "userchain" files hold the 20-billion-record relation data, while "userid" files
// hold the hundred-odd uids we are looking for. Both sides are keyed by uid, and
// the userid side only emits the marker "tm", so the actual filtering is deferred
// to the reduce phase (a reduce-side join).
public static class UserChainSixMapper extends Mapper<LongWritable, Text, Text, Text> {
    private static String prefix1 = "tm";
    private Text outKey = new Text();
    private Text outVal = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Which input file does this record come from?
        String path = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (path.contains("userchain")) {
            // Relation record: 8 comma-separated fields, keyed by the first uid.
            String[] vals = value.toString().split(",");
            if (vals.length == 8) {
                if (UserChainFirst.isUid(vals[0]) && UserChainFirst.isUid(vals[1])) {
                    outKey.set(vals[0]);
                    outVal.set(value);
                    context.write(outKey, outVal);
                }
            }
        } else if (path.contains("userid")) {
            // Lookup record: the line is a single uid; emit the marker "tm" as its value.
            String val = value.toString();
            if (UserChainFirst.isUid(val)) {
                outKey.set(val);
                outVal.set(prefix1);
                context.write(outKey, outVal);
            }
        }
    }
}
// For each uid the reducer receives all of its relation records plus, possibly,
// the "tm" marker contributed by the userid side. Every record is buffered in
// memory first; only if the marker shows up are the buffered records written out.
public static class UserChainSixReducer extends Reducer<Text, Text, Text, Text> {
    private static String prefix1 = "tm";
    private Text outKey = new Text();
    private boolean flag = false;
    private int count = 0;
    private List<String> lists = new ArrayList<String>();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Iterator<Text> iter = values.iterator();
        flag = false;
        count = 0;
        lists.clear();
        while (iter.hasNext()) {
            String value = iter.next().toString();
            if (value.contains(prefix1)) {
                flag = true;          // this uid is one of the uids being looked up
            } else {
                lists.add(value);     // buffer the relation record
            }
        }
        if (flag) {
            for (String s : lists) {
                count++;
                // Every 1000 output records, report progress so the task is not
                // killed for inactivity, and back off for a second.
                if (0 == count % 1000) {
                    context.progress();
                    Thread.sleep(1 * 1000);
                }
                outKey.set(s);
                context.write(outKey, null);  // value is null, so only the record text is emitted
            }
        } else {
            lists.clear();
        }
    }
}
}
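The section above only shows the mapper and reducer. For reference, here is a minimal sketch of how such a job might be wired up, assuming the two classes above sit inside a driver class; the class name, job name, and argument layout below are illustrative and not taken from the original post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UserChainSix {   // hypothetical driver class holding the mapper/reducer above
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "user chain six");  // hypothetical job name
        job.setJarByClass(UserChainSix.class);
        job.setMapperClass(UserChainSixMapper.class);
        job.setReducerClass(UserChainSixReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Both inputs go to the same job; the mapper distinguishes them by
        // checking whether the split path contains "userchain" or "userid".
        FileInputFormat.addInputPath(job, new Path(args[0]));   // .../userchain/ input
        FileInputFormat.addInputPath(job, new Path(args[1]));   // .../userid/ input
        FileOutputFormat.setOutputPath(job, new Path(args[2])); // output directory
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

With this wiring, all the filtering really does happen in the reduce phase, which is exactly the naive approach described at the start of this section.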