The #loadInvolvedFiles method lists all data files (in parquet format) under the specified partitions. If hoodie.bloom.index.prune.by.ranges is enabled, it also reads each file's minimum and maximum record key, which speeds up the subsequent lookup.
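The range-based pruning reduces to a lexicographic containment check: a file can only contain a record key if the key falls within the file's [minKey, maxKey] range. The following is an illustrative sketch with hypothetical names, not Hudi's actual API:

```java
// Illustrative sketch of range-based pruning (hypothetical helper, not Hudi's API):
// a file is a candidate for a record key only if the key lies within the file's
// [minKey, maxKey] range, compared lexicographically like string record keys.
public class RangePruning {
    public static boolean mayContain(String minKey, String maxKey, String recordKey) {
        return minKey.compareTo(recordKey) <= 0 && maxKey.compareTo(recordKey) >= 0;
    }
}
```

A file whose range excludes the key can be skipped entirely, before any Bloom Filter is even loaded.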
The core code of HoodieBloomIndex#findMatchingFilesForRecordKeys is as follows:
```java
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
    final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
    JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism,
    HoodieTable hoodieTable, Map<String, Long> fileGroupToComparisons) {
  JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
      explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);

  if (config.useBloomIndexBucketizedChecking()) {
    Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
        config.getBloomIndexKeysPerBucket());

    fileComparisonsRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
        .repartitionAndSortWithinPartitions(partitioner).map(Tuple2::_2);
  } else {
    fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
  }

  return fileComparisonsRDD.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
      .flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)
      .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
          .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
              new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
          .collect(Collectors.toList()).iterator());
}
```

This method first finds the files each record needs to be checked against, then looks up the actual location of each record.
The #explodeRecordRDDWithFileComparisons method uses a file filter built on a tree or list structure to speed up finding the candidate files for each record (one record may map to multiple files).
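The "explode" step can be sketched as follows: for each record key, emit one (fileId, recordKey) comparison tuple per candidate file whose key range covers the key, and keep files without range information as candidates unconditionally. This is a simplified list-based filter with illustrative names; it is not Hudi's actual implementation, which may also use an interval tree for the same purpose.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, list-based sketch of exploding a record key into per-file
// comparison tuples. Types and names are illustrative, not Hudi's actual API.
public class ExplodeSketch {
    public static class FileInfo {
        final String fileId;
        final String minKey;  // null when no range info is available
        final String maxKey;

        public FileInfo(String fileId, String minKey, String maxKey) {
            this.fileId = fileId;
            this.minKey = minKey;
            this.maxKey = maxKey;
        }

        boolean isCandidate(String key) {
            // Without range info we cannot prune, so the file stays a candidate.
            if (minKey == null || maxKey == null) {
                return true;
            }
            return minKey.compareTo(key) <= 0 && maxKey.compareTo(key) >= 0;
        }
    }

    // For one record key, return the (fileId, recordKey) comparison tuples.
    public static List<String[]> explode(List<FileInfo> filesInPartition, String recordKey) {
        List<String[]> tuples = new ArrayList<>();
        for (FileInfo f : filesInPartition) {
            if (f.isCandidate(recordKey)) {
                tuples.add(new String[] {f.fileId, recordKey});
            }
        }
        return tuples;
    }
}
```

Each emitted tuple later becomes one Bloom Filter check against the corresponding file.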
The core logic that actually uses the Bloom Filter lives in HoodieBloomIndexCheckFunction. Its inner iterator, HoodieBloomIndexCheckFunction$LazyKeyCheckIterator, performs the actual lookup of the files for each record; the heart of that lookup is in computeNext, whose core code is as follows:
```java
protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
  List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>();
  try {
    // process one file in each go.
    while (inputItr.hasNext()) {
      Tuple2<String, HoodieKey> currentTuple = inputItr.next();
      String fileId = currentTuple._1;
      String partitionPath = currentTuple._2.getPartitionPath();
      String recordKey = currentTuple._2.getRecordKey();
      Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);

      // lazily init state
      if (keyLookupHandle == null) {
        keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
      }

      // if continue on current file
      if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
        keyLookupHandle.addKey(recordKey);
      } else {
        // do the actual checking of file & break out
        ret.add(keyLookupHandle.getLookupResult());
        keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
        keyLookupHandle.addKey(recordKey);
        break;
      }
    }

    // handle case, where we ran out of input, close pending work, update return val
    if (!inputItr.hasNext()) {
      ret.add(keyLookupHandle.getLookupResult());
    }
  } catch (Throwable e) {
    if (e instanceof HoodieException) {
      throw (HoodieException) e;
    }
    throw new HoodieIndexException("Error checking bloom filter index. ", e);
  }
  return ret;
}
```

Each iteration of this method processes a single file: it creates a HoodieKeyLookupHandle for the file, adds the record keys to it one by one, and fetches the lookup result once the file has been fully processed.
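Since the comparison tuples arrive sorted by file id (via sortBy or repartitionAndSortWithinPartitions), consecutive keys for the same file form one contiguous batch, so each file's Bloom Filter only needs to be loaded once per batch. That batching can be sketched in isolation (this is a standalone illustration, not Hudi's code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch (not Hudi's code) of the batching done by
// LazyKeyCheckIterator: tuples arrive sorted by fileId, so consecutive
// keys belonging to the same file can be grouped and checked together.
public class BatchByFile {
    public static Map<String, List<String>> groupByFile(List<String[]> sortedTuples) {
        // LinkedHashMap preserves the order in which files are first seen.
        Map<String, List<String>> batches = new LinkedHashMap<>();
        for (String[] t : sortedTuples) {
            String fileId = t[0];
            String recordKey = t[1];
            batches.computeIfAbsent(fileId, k -> new ArrayList<>()).add(recordKey);
        }
        return batches;
    }
}
```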
The core code of HoodieKeyLookupHandle#addKey is as follows:
```java
public void addKey(String recordKey) {
  // check record key against bloom filter of current file & add to possible keys if needed
  if (bloomFilter.mightContain(recordKey)) {
    ...
    candidateRecordKeys.add(recordKey);
  }
  totalKeysChecked++;
}
```

As shown, a Bloom Filter is used to test whether the record may exist in the file: if the test passes, the key is added to the candidate list for further verification; if not, no further work is needed. The Bloom Filter itself is initialized when the HoodieKeyLookupHandle instance is created, by reading it from the corresponding file.
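To illustrate why keys passing mightContain are only *candidates*, here is a minimal, self-contained Bloom Filter sketch (not Hudi's implementation): the structure can return false positives but never false negatives, so a true result only qualifies the key for further verification against the file's actual keys, while a false result safely rules it out.

```java
import java.util.BitSet;

// Minimal Bloom-filter sketch (not Hudi's implementation). mightContain()
// may report false positives but never false negatives, which is exactly
// why addKey() above only collects *candidate* record keys.
public class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashes;

    public SimpleBloomFilter(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    // Simple seeded string hash; real implementations use stronger hashes.
    private int hash(String key, int seed) {
        int h = seed;
        for (int i = 0; i < key.length(); i++) {
            h = h * 31 + key.charAt(i);
        }
        return Math.floorMod(h, size);
    }

    public void add(String key) {
        for (int i = 0; i < hashes; i++) {
            bits.set(hash(key, i));
        }
    }

    public boolean mightContain(String key) {
        for (int i = 0; i < hashes; i++) {
            if (!bits.get(hash(key, i))) {
                return false;  // at least one bit unset: key is definitely absent
            }
        }
        return true;  // all bits set: key is *possibly* present (may be a false positive)
    }
}
```

The false-positive rate is tuned by the bit-array size and the number of hash functions, which is why the candidate keys must still be checked against the file before the index reports a match.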