reduceByKey(func, [numTasks]) :在 (K, V) pairs 的 dataset 上调用时, 返回 dataset of (K, V) pairs 的 dataset, 其中的 values 是针对每个 key 使用给定的函数 func 来进行聚合的, 它必须是 type (V,V) => V 的类型. 像 groupByKey 一样, reduce tasks 的数量是可以通过第二个可选的参数来配置的。
reduceBykey 操作产生一个新的 RDD,其中 key 所有相同的的值组合成为一个 tuple - key 以及与 key 相关联的所有值在 reduce 函数上的执行结果。面临的挑战是,一个 key 的所有值不一定都在一个同一个 paritition 分区里,甚至是不一定在同一台机器里,但是它们必须共同被计算。
在 spark 里,特定的操作需要数据不跨分区分布。在计算期间,一个任务在一个分区上执行,为了所有数据都在单个 reduceByKey 的 reduce 任务上运行,我们需要执行一个 all-to-all 操作。它必须从所有分区读取所有的 key 和 key对应的所有的值,并且跨分区聚集去计算每个 key 的结果 - 这个过程就叫做 shuffle。
尽管每个分区新 shuffle 的数据集将是确定的,分区本身的顺序也是这样,但是这些数据的顺序是不确定的。如果希望 shuffle 后的数据是有序的,可以使用:
mapPartitions 对每个 partition 分区进行排序,例如sorted
repartitionAndSortWithinPartitions 在分区的同时对分区进行高效的排序.
sortBy 对 RDD 进行全局的排序
触发的 shuffle 操作包括 repartition 操作,如 repartition 和 coalesce, ‘ByKey 操作 (除了 counting 之外) 像 groupByKey 和 reduceByKey, 和 join 操作, 像 cogroup 和 join.
性能影响shuffle 是一个代价比较高的操作,它涉及磁盘 I/O、数据序列化、网络 I/O。为了准备 shuffle 操作的数据,Spark 启动了一系列的任务,map 任务组织数据,reduce 完成数据的聚合。
在内部,一个 map 任务的所有结果数据会保存在内存,直到内存不能全部存储为止。然后,这些数据将基于目标分区进行排序并写入一个单独的文件中。在 reduce 时,任务将读取相关的已排序的数据块。
某些 shuffle 操作会大量消耗堆内存空间,因为 shuffle 操作在数据转换前后,需要在使用内存中的数据结构对数据进行组织。需要特别说明的是,reduceByKey 和 aggregateByKey 在 map 时会创建这些数据结构,'ByKey 操作在 reduce 时创建这些数据结构。当内存满的时候,Spark 会把溢出的数据存到磁盘上,这将导致额外的磁盘 I/O 开销和垃圾回收开销的增加。
shuffle 操作还会在磁盘上生成大量的中间文件。在 Spark 1.3 中,这些文件将会保留至对应的 RDD 不在使用并被垃圾回收为止。这么做的好处是,如果在 Spark 重新计算 RDD 的血统关系(lineage)时,shuffle 操作产生的这些中间文件不需要重新创建。如果 Spark 应用长期保持对 RDD 的引用,或者垃圾回收不频繁,这将导致垃圾回收的周期比较长。这意味着,长期运行 Spark 任务可能会消耗大量的磁盘空间。临时数据存储路径可以通过 SparkContext 中设置参数 spark.local.dir 进行配置。
Spark 中一个很重要的能力是将数据 persisting 持久化(或称为 caching 缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD 在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。
RDD 可以使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。
另外,每个持久化的 RDD 可以使用不同的 storage level 存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个 StorageLevel 对象 (Scala, Java, Python) 给 persist() 方法进行设置。cache() 方法是使用默认存储级别的快捷设置方法,默认的存储级别是 StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)。详细的存储级别介绍如下:
在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据.这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法. 存储级别选择
Spark 的存储级别的选择,核心问题是在 memory 内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择:
如果您的 RDD 适合于默认存储级别 (MEMORY_ONLY),那就这样. 这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行.
如果不是, 试着使用 MEMORY_ONLY_SER 和选择快速的序列化库以使对象更加节省空间,但仍然能够快速访问。 (Java和Scala)
不要溢出到磁盘,除非计算您的数据集的函数是昂贵的, 或者它们过滤大量的数据. 否则, 重新计算分区可能与从磁盘读取分区一样快.