离线数据处理生态系统包含许多关键任务,最大限度的提高数据管道基础设施的稳定性和效率是至关重要的。这边博客将分享Hive和Spark分区的各种策略,以最大限度的提高数据工程生态系统的稳定性和效率。
2.内容大多数Spark Job可以通过三个阶段来表述,即读取输入数据、使用Spark处理、保存输出数据。这意味着虽然实际数据转换主要发生在内存中,但是Job通常以大量的I/O开始和结束。使用Spark常用堆栈是使用存储在HDFS上的Hive表作为输入和输出数据存储。Hive分区有效地表示为分布式文件系统上的文件目录。理论上,尽可能多的文件写入是有意义的,但是,这个也是有代价的。HDFS不能很好的支持大量小文件,每个文件在NameNode内存中大概有150字节的开销,而HDFS的整体IOPS数量有限。文件写入中的峰值绝对会导致HDFS基础架构的某些部分产生性能瓶颈。
比如从某个历史日期到当前日期重新计算表,通常用于修复错误或者数据质量问题。在处理包含一年数据的大型数据集(比如1TB以上)时,可能会将数据分成几千个Spark分区进行处理。虽然从表面上看,这种处理方法并不是最合适的,使用动态分区并将数据结果写入按照日期分区的Hive表中将产生多大100+万个文件。
假如有一个包含3个分区的Spark任务,并且想将数据写入到包含3个分区的Hive中。在这种情况下,希望发送的是将3个文件写入到HDFS,所有数据都存储在每个分区键的单个文件中。实际发生的是将生成9个文件,并且每个文件都有1个记录。使用动态分区写入Hive时,每个Spark分区都由执行程序并行处理。处理Spark分区数据时,每次执行程序在给定Spark分区中遇到新的分区键时,它都会打开一个新文件。默认情况下,Spark对数据会使用Hash或者Round Robin分区器。当应用于任意数据时,可以假设这2中方法在整个Spark分区中相对均匀但是随机分布数据行。如下图所示:
理想情况下,目标文件大小应该大约是HDFS Block大小的倍数,默认情况下为128MB。在Hive管道中,提供了一些配置来自动将结果收集到合理大小的文件中,从开发人员的角度来看几乎是透明的,比如hive.merge.smallfiles.avgsize和hive.merge.size.per.task。但是,Spark中不存在此类功能,因此,我们需要自己开发实现,来给定一个数据集,应该写入多少文件。
2.1 基于Size的计算理论上,这是最直接的方法,设置目标大小,估计数据的大小,然后进行划分。但是,在很多情况下,文件被写入磁盘时会进行压缩,并且其格式与存储在Java堆中的记录格式有所不同。这意味着估算写入磁盘时内存的记录大小不是一件容易的事情。
虽然可以使用Spark SizeEstimator实用程序通过内存中数据的大小进行估计,然后应用某种估计的压缩文件格式因此,但是SizeEstimator会考虑数据帧、数据集的内部消耗,以及数据的大小。总体来说,这种方式不太容易准确实现。
2.2 基于行数的计算这种方法是设置目标行数,计算数据集的大小,然后执行除法以估计目标。我们的目标行数可以通过多种方式确定,或者通过为所有数据集选择一个静态数字,或者通过确定磁盘上单个记录的大小并执行必要的计算。哪种方式是最好取决于你的数据集数量及其复杂性。计数相对来说成本较低,但是需要在计数前缓存以避免重新计算数据集。
2.3 静态文件计数最简单的解决方案是只要求开发人员在每个插入的基础上告诉Spark总共应该写入多少个文件,这种方式需要给开发人员一些其他方法来获得具体的数字,可以通过这种方式来替换昂贵的计算。
3.如何让Spark以合理的方式分发处理数据?即使我们知道希望如何将文件写入磁盘,我们仍然必须让Spark以符合实际的方式生成这些文件来构建我们的分区。Spark提供了许多工具来确定数据在整个分区中的分布方式。但是,各种功能中隐藏着很多复杂性,在某些情况下,它们的含义并不明显。下面将介绍Spark提供的一些选项来控制Spark输出文件的数量。
3.1 合并Spark Coalesce是一个特殊版本的重新分区,它只允许减少总的分区,但是不需要完全的Shuffle,因此比重新分区要快得多。它通过有效的合并分区来实现这一点。如下图所示:
Coalesce在某些情况下看起来不错,但是也有一些问题。首先,Coalesce有一个让我们难以使用的行为。以一个非常基本的Spark应用程序为例,代码如下:
load().map(…).filter(…).save()