Hadoop中一些采样器的实现(2)

首先通过InputFormat的getSplits方法得到所有的输入分区;然后确定需要抽样扫描的分区数目,取输入分区总数与用户输入的maxSplitsSampled两者的较小的值得到splitsToSample;然后对输入分区数组shuffle排序,打乱其原始顺序;然后循环逐个扫描每个分区中的记录进行采样,循环的条件是当前已经扫描的分区数小于splitsToSample或者当前已经扫描的分区数超过了splitsToSample但是小于输入分区总数并且当前的采样数小于最大采样数numSamples。

每个分区中记录采样的具体过程如下:

从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq,如果大于则放弃这条记录,然后判断当前的采样数是否小于最大采样数,如果小于则这条记录被选中,被放进采样集合中,否则从【0,numSamples】中选择一个随机数,如果这个随机数不等于最大采样数numSamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numSamples-1)/numSamples。然后依次遍历分区中的其它记录。

SplitSampler从s个分区中采样前n个记录,是采样随机数据的一种简便方式。SplitSampler类有两个属性:numSamples(最大采样数),maxSplitsSampled(最大分区数)。其getSample方法实现如下:

public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>(numSamples);
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
      int splitStep = splits.length / splitsToSample;
      int samplesPerSplit = numSamples / splitsToSample;
      long records = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          samples.add(key);
          key = reader.createKey();
          ++records;
          if ((i+1) * samplesPerSplit <= records) {
            break;
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/504db5dd42194dbdf647da41d1d54c8b.html