The three key parameters are the functions createCombiner, mergeValue, and mergeCombiners. Schematically, they transform the data types as follows: createCombiner: V => C, mergeValue: (C, V) => C, and mergeCombiners: (C, C) => C.
The Spark source documents these three parameters as follows:

@param createCombiner function to create the initial value of the aggregation.
@param mergeValue function to merge a new value into the aggregation result.
@param mergeCombiners function to merge outputs from multiple mergeValue function.

Because the aggregation traverses every element in a partition, each element's key falls into one of two cases: already seen or not yet seen. The first time a key appears in a partition, createCombiner is called to initialize its aggregation value; every later occurrence of that key in the same partition updates the running result via mergeValue.
Aggregation across partitions is then performed with mergeCombiners.
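For reference, the corresponding signature in org.apache.spark.rdd.PairRDDFunctions is combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]. The per-partition behavior described above can be made visible by using combiner functions that record how each value entered the aggregation. A minimal sketch (the sample data and partition count are illustrative assumptions, and the exact string depends on how elements are partitioned):

val data = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3)), 2)
val traced = data.combineByKey(
  (v: Int) => s"create($v)",                              // key not yet seen in this partition
  (c: String, v: Int) => s"mergeValue($c, $v)",           // key already seen in this partition
  (c1: String, c2: String) => s"mergeCombiners($c1, $c2)" // combiners merged across partitions
)
traced.collect()
// e.g. Array((a,mergeCombiners(create(1), mergeValue(create(2), 3))))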
Examples:
Group words by length (see the sketch below):
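The code for this example is not reproduced here; the following is a minimal sketch of one way to express it with combineByKey, building a per-length list of words (the input words and the partition count are illustrative assumptions):

val words = sc.parallelize(Seq("dog", "cat", "gnu", "salmon", "rabbit", "wolf", "bear", "bee"), 3)
val byLength = words
  .map(w => (w.length, w))                         // key each word by its length
  .combineByKey(
    (w: String) => List(w),                        // createCombiner: first word of this length in a partition
    (ws: List[String], w: String) => w :: ws,      // mergeValue: prepend to the partition-local list
    (a: List[String], b: List[String]) => a ::: b  // mergeCombiners: concatenate lists across partitions
  )
byLength.collect()
// e.g. Array((3,List(dog, cat, gnu, bee)), (4,List(wolf, bear)), (6,List(salmon, rabbit)))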
Compute the average score per person:
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> type ScoreCollector = (Int, Double)
defined type alias ScoreCollector

scala> type PersonScores = (String, (Int, Double))
defined type alias PersonScores

scala> val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
initialScores: Array[(String, Double)] = Array((Fred,88.0), (Fred,95.0), (Fred,91.0), (Wilma,93.0), (Wilma,95.0), (Wilma,98.0))

scala> val wilmaAndFredScores = sc.parallelize(initialScores).cache()
wilmaAndFredScores: org.apache.spark.rdd.RDD[(String, Double)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val createScoreCombiner = (score: Double) => (1, score)
createScoreCombiner: Double => (Int, Double) = <function1>

scala> val scoreCombiner = (collector: ScoreCollector, score: Double) => {
     |   val (numberScores, totalScore) = collector
     |   (numberScores + 1, totalScore + score)
     | }
scoreCombiner: (ScoreCollector, Double) => (Int, Double) = <function2>

scala> val scoreMerger = (collector1: ScoreCollector, collector2: ScoreCollector) => {
     |   val (numScores1, totalScore1) = collector1
     |   val (numScores2, totalScore2) = collector2
     |   (numScores1 + numScores2, totalScore1 + totalScore2)
     | }
scoreMerger: (ScoreCollector, ScoreCollector) => (Int, Double) = <function2>

scala> val scores = wilmaAndFredScores.combineByKey(createScoreCombiner, scoreCombiner, scoreMerger)
scores: org.apache.spark.rdd.RDD[(String, (Int, Double))] = ShuffledRDD[1] at combineByKey at <console>:37

scala> scores.collect
res0: Array[(String, (Int, Double))] = Array((Wilma,(3,286.0)), (Fred,(3,274.0)))

scala> val averagingFunction = (personScore: PersonScores) => {
     |   val (name, (numberScores, totalScore)) = personScore
     |   (name, totalScore / numberScores)
     | }
averagingFunction: PersonScores => (String, Double) = <function1>

scala> val averageScores = scores.collectAsMap().map(averagingFunction)
averageScores: scala.collection.Map[String,Double] = Map(Fred -> 91.33333333333333, Wilma -> 95.33333333333333)

Note that groupByKey, reduceByKey, aggregateByKey, and foldByKey are all implemented by calling combineByKey (combineByKeyWithClassTag); see org.apache.spark.rdd.PairRDDFunctions for the details.
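To make that relationship concrete: reduceByKey(func) behaves like a combineByKey whose createCombiner is the identity and whose mergeValue and mergeCombiners are both func. A conceptual sketch of the equivalence (the sample data is illustrative, and the order of collected results may vary):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
pairs.reduceByKey(_ + _).collect()
// e.g. Array((a,4), (b,2))
pairs.combineByKey((v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2).collect()
// same result, e.g. Array((a,4), (b,2))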
foldByKey(zeroValue)(func)
Meaning:
For an RDD[K, V], fold and merge the values by key K: zeroValue is first applied to the values (once per key within each partition), and then func is applied to the resulting values.
Input and output:
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
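A minimal usage sketch (assuming the same sc as in the example above; the data is illustrative):

val sales = sc.parallelize(Seq(("Fred", 3.0), ("Fred", 2.0), ("Wilma", 5.0)), 2)
// zeroValue 0.0 is folded in once per key within each partition before func is applied;
// because 0.0 is neutral for addition, the result does not depend on the partitioning.
sales.foldByKey(0.0)(_ + _).collect()
// e.g. Array((Fred,5.0), (Wilma,5.0))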