spark知识体系03-Rdds,Accumulators,Broadcasts (3)

含义:
与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型。

输入输出:
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

示例:
通过mapPartitionsWithIndex可以更准确判定数据在分区中的分布情况,见运行结果

scala> val e = sc.parallelize(1 to 9, 3) e: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:25 scala> scala> def myfunc2(index: Int, iter: Iterator[Int]) : Iterator[String] = { | iter.map(x => index + "," + x) | } myfunc2: (index: Int, iter: Iterator[Int])Iterator[String] scala> e.mapPartitionsWithIndex(myfunc2).collect() res5: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9)

groupByKey(func)

含义:
在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable

输入输出:

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

示例:
将数据按照长度分组

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2) val b = a.keyBy(_.length) b.groupByKey.collect res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))

reduceByKey(func)

含义:
在 (K, V) pairs 的 dataset 上调用时, 返回 dataset of (K, V) pairs 的 dataset, 其中的 values 是针对每个 key 使用给定的函数 func 来进行聚合的, 它必须是 type (V,V) => V 的类型. 像 groupByKey 一样, reduce tasks 的数量是可以通过第二个可选的参数来配置的。
运行时会现在分区内进行合并操作

输入输出:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(func: (V, V) => V): RDD[(K, V)]

示例:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res19: Array[(Int, String)] = Array((4,lion), (6,spider), (3,dogcat), (5,tigereagle))

aggregateByKey(func)

含义:
在 (K, V) pairs 的 dataset 上调用时, 返回 (K, U) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的 combine 函数以及一个 neutral "0" 值来进行聚合的. 允许聚合值的类型与输入值的类型不一样, 同时避免不必要的配置. 像 groupByKey 一样, reduce tasks 的数量是可以通过第二个可选的参数来配置的.

输入输出:

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]

需要说明的是第一个函数即(U, V) => U用于在分区内部合并数据,而第二个函数(U, U) => U则用于不同分区间数据的合并

示例:

//首先根据mapPartitionsWithIndex函数查看数据的分布情况,便于后面理解计算结果 val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2) def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = { iter.map(x => "[partID:" + index + ", val: " + x + "]") } pairRDD.mapPartitionsWithIndex(myfunc).foreach(println) /** * 0:(cat,2),(cat,5),(mouse,4) * 1:(mouse,2),(dog,12),(cat,12) * */ //先计算每个分区单个key的最大值(),然后将不同分区的值相加 pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect // res3: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect // res4: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

combineByKey

含义:
使用用户自定义的聚合函数对每个Key中的Value进行组合(combine)。可以将输入类型为RDD[(K, V)]转成成RDD[(K, C)]。

输入输出

def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyzwjz.html