Spark Knowledge Series 03: RDDs, Accumulators, Broadcasts (4)

The main parameters are the three functions createCombiner, mergeValue, and mergeCombiners. The figure below illustrates the data-type transformations each of them performs:

[Figure: data-type transformations performed by createCombiner, mergeValue, and mergeCombiners]

The Spark source code documents these three parameters as follows:

@param createCombiner function to create the initial value of the aggregation.
@param mergeValue function to merge a new value into the aggregation result.
@param mergeCombiners function to merge outputs from multiple mergeValue function.
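For reference, the basic overload in org.apache.spark.rdd.PairRDDFunctions has the following shape (further overloads additionally take numPartitions or a Partitioner):

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]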

Because the aggregation walks every element in a partition, each element's key falls into one of two cases: it has already been seen in that partition, or it has not. The first time a key appears in a partition, createCombiner is invoked to build the initial combiner; on every later occurrence, mergeValue merges the new value into the existing combiner.
Across partitions, the per-partition combiners for the same key are then merged with mergeCombiners. A sketch of this control flow follows.
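To make that control flow concrete, here is a minimal sketch of the logic. This is an illustration only, not Spark's actual Aggregator implementation; the names combinePartition and mergePartitions are made up for this example.

import scala.collection.mutable

// Per-partition pass: build one combiner per key.
def combinePartition[K, V, C](
    iter: Iterator[(K, V)],
    createCombiner: V => C,
    mergeValue: (C, V) => C): mutable.Map[K, C] = {
  val combiners = mutable.Map.empty[K, C]
  iter.foreach { case (k, v) =>
    combiners(k) = combiners.get(k) match {
      case None    => createCombiner(v) // first occurrence of k in this partition
      case Some(c) => mergeValue(c, v)  // k already seen: merge the new value in
    }
  }
  combiners
}

// Cross-partition pass: per-key combiners from different partitions
// are merged pairwise with mergeCombiners.
def mergePartitions[K, C](
    m1: mutable.Map[K, C],
    m2: mutable.Map[K, C],
    mergeCombiners: (C, C) => C): mutable.Map[K, C] = {
  m2.foreach { case (k, c) =>
    m1(k) = m1.get(k) match {
      case None     => c
      case Some(c1) => mergeCombiners(c1, c)
    }
  }
  m1
}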

Example: group words by key (the keys here come from zipping with a second RDD)

scala> val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[29] at zip at <console>:28

scala> c.collect
res24: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))

scala> val d = c.combineByKey(List(_), (x: List[String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y)
d: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[30] at combineByKey at <console>:30

scala> d.collect
res25: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))

Example: compute the average score per person

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> type ScoreCollector = (Int, Double)
defined type alias ScoreCollector

scala> type PersonScores = (String, (Int, Double))
defined type alias PersonScores

scala> val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
initialScores: Array[(String, Double)] = Array((Fred,88.0), (Fred,95.0), (Fred,91.0), (Wilma,93.0), (Wilma,95.0), (Wilma,98.0))

scala> val wilmaAndFredScores = sc.parallelize(initialScores).cache()
wilmaAndFredScores: org.apache.spark.rdd.RDD[(String, Double)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val createScoreCombiner = (score: Double) => (1, score)
createScoreCombiner: Double => (Int, Double) = <function1>

scala> val scoreCombiner = (collector: ScoreCollector, score: Double) => {
     |   val (numberScores, totalScore) = collector
     |   (numberScores + 1, totalScore + score)
     | }
scoreCombiner: (ScoreCollector, Double) => (Int, Double) = <function2>

scala> val scoreMerger = (collector1: ScoreCollector, collector2: ScoreCollector) => {
     |   val (numScores1, totalScore1) = collector1
     |   val (numScores2, totalScore2) = collector2
     |   (numScores1 + numScores2, totalScore1 + totalScore2)
     | }
scoreMerger: (ScoreCollector, ScoreCollector) => (Int, Double) = <function2>

scala> val scores = wilmaAndFredScores.combineByKey(createScoreCombiner, scoreCombiner, scoreMerger)
scores: org.apache.spark.rdd.RDD[(String, (Int, Double))] = ShuffledRDD[1] at combineByKey at <console>:37

scala> scores.collect
res0: Array[(String, (Int, Double))] = Array((Wilma,(3,286.0)), (Fred,(3,274.0)))

scala> val averagingFunction = (personScore: PersonScores) => {
     |   val (name, (numberScores, totalScore)) = personScore
     |   (name, totalScore / numberScores)
     | }
averagingFunction: PersonScores => (String, Double) = <function1>

scala> val averageScores = scores.collectAsMap().map(averagingFunction)
averageScores: scala.collection.Map[String,Double] = Map(Fred -> 91.33333333333333, Wilma -> 95.33333333333333)

Note that groupByKey, reduceByKey, aggregateByKey, and foldByKey are all implemented on top of combineByKey (more precisely, combineByKeyWithClassTag); see org.apache.spark.rdd.PairRDDFunctions for the details.
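For instance, reduceByKey(func) amounts to passing the identity function as createCombiner and func as both mergeValue and mergeCombiners. A simplified spark-shell sketch (the real code in PairRDDFunctions goes through combineByKeyWithClassTag and also handles the partitioner and map-side combine settings):

scala> val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
scala> pairs.combineByKey((v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2)
// behaves the same as pairs.reduceByKey(_ + _)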

foldByKey(zeroValue)(func)

Meaning:
For an RDD[(K, V)], foldByKey folds (merges) the V values belonging to the same key K. zeroValue is first merged into the values (it is applied once per key per partition), and func is then applied to combine the results; see the sketch after the signatures below.

Signatures:

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
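A minimal spark-shell sketch (made-up data) of foldByKey summing the values per key. Because zeroValue is merged once per key per partition, any zeroValue other than the identity element of func makes the result depend on how the data is partitioned:

scala> val pairs = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)), 1)
scala> pairs.foldByKey(0)(_ + _).collect
// Array((a,3), (b,3)) -- with zeroValue 0 this matches reduceByKey(_ + _)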
