利用Map Reduce 过滤大数据的解决方案

问题引入:假设从200亿条记录中(大约200G)查找100多条其中的记录,不考虑集群的计算能力,我们可以这样写mapreduce: 直接不考虑数据量大小,reduce阶段一次行过滤。

public static class UserChainSixMapper extends Mapper<LongWritable, Text, Text, Text> 
    { 
          private static String prefix1 = "tm"; 
          private Text outKey = new Text(); 
          private Text outVal = new Text(); 
         
          public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ 
              String path =  ((FileSplit)context.getInputSplit()).getPath().toString(); 
               
              if(path.contains("userchain")){ 
                    String [] vals = value.toString().split(","); 
                    if(vals.length == 8){ 
                        if(UserChainFirst.isUid(vals[0]) && UserChainFirst.isUid(vals[1])){ 
                              outKey.set(vals[0]); 
                              outVal.set(value); 
                              context.write(outKey, outVal); 
                        } 
                    } 
              } 
              else if(path.contains("userid")){ 
                    String val = value.toString(); 
                    if(UserChainFirst.isUid(val)){ 
                        outKey.set(val); 
                        outVal.set(prefix1); 
                        context.write(outKey, outVal); 
                    } 
              } 
          } 
    } 
 
public static class UserChainSixReducer extends Reducer<Text,Text,Text,Text>{ 
          private static String prefix1 = "tm"; 
          private Text outKey = new Text(); 
          Boolean flag = false; 
          int count = 0; 
          List<String> lists = new ArrayList<String>(); 
         
          public void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException{ 
              Iterator<Text> iter = values.iterator(); 
              flag = false; 
              count = 0; 
              lists.clear(); 
               
              while(iter.hasNext()){ 
                    String value = iter.next().toString(); 
//                    System.out.println("key:" + key +"," + "value:" + value); 
                        if(value.contains(prefix1)){ 
                              flag = true; 
                        } 
                    else{ 
                        lists.add(value); 
                    } 
              } 
               
              if(flag){ 
                    for(String s : lists){ 
                        count ++; 
                        if(0 == count % 1000){ 
                              context.progress(); 
                              Thread.sleep(1*1000); 
                        } 
                        outKey.set(s); 
                        context.write(outKey, null); 
                    } 
              } 
              else{ 
                    lists.clear(); 
              } 
          } 
    } 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/fb2cc95b34ac3e360cfa26bf6ec22eaf.html