BloomFilter在Hudi中的应用

Bloom Filter在Hudi中的应用 介绍

Bloom Filter可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,主要缺点是存在一定的误判率:当其判断元素存在时,实际上元素可能并不存在。而当判定不存在时,则元素一定不存在,Bloom Filter在对精确度要求不太严格的大数据量场景下运用十分广泛。

引入

为何要引入Bloom Filter?这是Hudi为加快数据upsert采用的一种解决方案,即判断record是否已经在文件中存在,若存在,则更新,若不存在,则插入。对于upsert显然无法容忍出现误判,否则可能会出现应该插入和变成了更新的错误,那么Hudi是如何解决误判问题的呢?一种简单办法是当Bloom Filter判断该元素存在时,再去文件里二次确认该元素是否真的存在;而当Bloom Filter判断该元素不存在时,则无需读文件,通过二次确认的方法来规避Bloom Filter的误判问题,实际上这也是Hudi采取的方案,值得一提的是,现在Delta暂时还不支持Bloom Filter,其判断一条记录是否存在是直接通过一次全表join来实现,效率比较低下。接下来我们来分析Bloom Filter在Hudi中的应用。

流程

Hudi从上游系统(Kafka、DFS等)消费一批数据后,会根据用户配置的写入模式(insert、upsert、bulkinsert)写入Hudi数据集。而当配置为upsert时,意味着需要将数据插入更新至Hudi数据集,而第一步是需要标记哪些记录已经存在,哪些记录不存在,然后,对于存在的记录进行更新,不存在记录进行插入。

在HoodieWriteClient中提供了对应三种写入模式的方法(#insert、#upsert、#bulkinsert),对于使用了Bloom Filter的#upsert方法而言,其核心源代码如下

public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) { ... // perform index loop up to get existing location of records JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table); ... return upsertRecordsInternal(taggedRecords, commitTime, table, true); }

可以看到首先利用索引给记录打标签,然后再进行更新,下面主要分析打标签的过程。

对于索引,Hudi提供了四种索引方式的实现:HBaseIndex、HoodieBloomIndex、HoodieGlobalBloomIndex、InMemoryHashIndex,默认使用HoodieBloomIndex。其中HoodieGlobalBloomIndex与HoodieBloomIndex的区别是前者会读取所有分区文件,而后者只读取记录所存在的分区下的文件。下面以HoodieBloomIndex为例进行分析。

HoodieBloomIndex#tagLocation核心代码如下

public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable) { // Step 0: cache the input record RDD if (config.getBloomIndexUseCaching()) { recordRDD.persist(config.getBloomIndexInputStorageLevel()); } // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey) JavaPairRDD<String, String> partitionRecordKeyPairRDD = recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())); // Lookup indexes for all the partition/recordkey pair JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable); // Cache the result, for subsequent stages. if (config.getBloomIndexUseCaching()) { keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER()); } // Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys // Cost: 4 sec. JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD); if (config.getBloomIndexUseCaching()) { recordRDD.unpersist(); // unpersist the input Record RDD keyFilenamePairRDD.unpersist(); } return taggedRecordRDD; }

该过程会缓存记录以便优化数据的加载。首先从记录中解析出对应的分区路径 -> key,接着查看索引,然后将位置信息(存在于哪个文件)回推到记录中。

HoodieBloomIndex#lookup核心代码如下

private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex( JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext jsc, final HoodieTable hoodieTable) { // Obtain records per partition, in the incoming records Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); // Step 2: Load all involved files as <Partition, filename> pairs List<Tuple2<String, BloomIndexFileInfo>> fileInfoList = loadInvolvedFiles(affectedPartitionPathList, jsc, hoodieTable); final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo = fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList()))); // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id, // that contains it. Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD); int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup); int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism); return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable, comparisonsPerFileGroup); }

该方法首先会计算出每个分区有多少条记录和影响的分区有哪些,然后加载影响的分区的文件,最后计算并行度后,开始找记录真正存在的文件。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgydpg.html