含义:
与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型。
输入输出:
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
示例:
通过mapPartitionsWithIndex可以更准确判定数据在分区中的分布情况,见运行结果
groupByKey(func)
含义:
在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable
输入输出:
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
示例:
将数据按照长度分组
reduceByKey(func)
含义:
在 (K, V) pairs 的 dataset 上调用时, 返回 dataset of (K, V) pairs 的 dataset, 其中的 values 是针对每个 key 使用给定的函数 func 来进行聚合的, 它必须是 type (V,V) => V 的类型. 像 groupByKey 一样, reduce tasks 的数量是可以通过第二个可选的参数来配置的。
运行时会现在分区内进行合并操作
输入输出:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(func: (V, V) => V): RDD[(K, V)]示例:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res19: Array[(Int, String)] = Array((4,lion), (6,spider), (3,dogcat), (5,tigereagle))
aggregateByKey(func)
含义:
在 (K, V) pairs 的 dataset 上调用时, 返回 (K, U) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的 combine 函数以及一个 neutral "0" 值来进行聚合的. 允许聚合值的类型与输入值的类型不一样, 同时避免不必要的配置. 像 groupByKey 一样, reduce tasks 的数量是可以通过第二个可选的参数来配置的.
输入输出:
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]需要说明的是第一个函数即(U, V) => U用于在分区内部合并数据,而第二个函数(U, U) => U则用于不同分区间数据的合并
示例:
//首先根据mapPartitionsWithIndex函数查看数据的分布情况,便于后面理解计算结果 val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2) def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = { iter.map(x => "[partID:" + index + ", val: " + x + "]") } pairRDD.mapPartitionsWithIndex(myfunc).foreach(println) /** * 0:(cat,2),(cat,5),(mouse,4) * 1:(mouse,2),(dog,12),(cat,12) * */ //先计算每个分区单个key的最大值(),然后将不同分区的值相加 pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect // res3: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect // res4: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
combineByKey
含义:
使用用户自定义的聚合函数对每个Key中的Value进行组合(combine)。可以将输入类型为RDD[(K, V)]转成成RDD[(K, C)]。
输入输出
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]