Spark Knowledge System 03: RDDs, Accumulators, Broadcasts (5)

Example:
scala> val aa = sc.parallelize(List( ("cat",2), ("mouse", 2),("cat", 3), ("dog", 4), ("mouse", 2), ("cat", 1)),2 )
aa: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[18] at parallelize at

scala> def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
     |   iter.map(x => "[partID:" + index + ", val: " + x + "]")
     | }
myfunc: (index: Int, iter: Iterator[(String, Int)])Iterator[String]

scala> aa.mapPartitionsWithIndex(myfunc).foreach(println)
[partID:1, val: (dog,4)]
[partID:0, val: (cat,2)]
[partID:1, val: (mouse,2)]
[partID:0, val: (mouse,2)]
[partID:1, val: (cat,1)]
[partID:0, val: (cat,3)]

scala> aa.foldByKey(0)(_+_).collect()
res10: Array[(String, Int)] = Array((dog,4), (cat,6), (mouse,4))

scala> aa.foldByKey(2)(_+_).collect()
res11: Array[(String, Int)] = Array((dog,6), (cat,10), (mouse,8))

scala> val bb = sc.parallelize(List( ("cat",2), ("mouse", 2),("cat", 3), ("dog", 4), ("mouse", 2), ("cat", 1)),3 )
bb: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[22] at parallelize at <console>:25

scala> bb.mapPartitionsWithIndex(myfunc).foreach(println)
[partID:0, val: (cat,2)]
[partID:2, val: (mouse,2)]
[partID:1, val: (cat,3)]
[partID:2, val: (cat,1)]
[partID:0, val: (mouse,2)]
[partID:1, val: (dog,4)]

scala> bb.foldByKey(2)(_+_).collect()
res13: Array[(String, Int)] = Array((cat,12), (mouse,8), (dog,6))

scala> aa.foldByKey(0)(_*_).collect()
res14: Array[(String, Int)] = Array((dog,0), (cat,0), (mouse,0))

scala> aa.foldByKey(1)(_*_).collect()
res15: Array[(String, Int)] = Array((dog,4), (cat,6), (mouse,4))

First, look at how the data in aa is distributed: there are two partitions, with three elements in partition 0 ((cat,2), (mouse,2), (cat,3)) and three elements in partition 1 ((dog,4), (mouse,2), (cat,1)). The computation process is illustrated below:

[Figure: per-partition computation of aa.foldByKey(2)(_+_)]

As the figure shows, after the zeroValue is applied, (cat,2) in partition 0 becomes (cat,4), while (cat,3) in the same partition is unchanged; in partition 1, (cat,1) becomes (cat,3), so the final result for cat is 4 + 3 + 3 = 10. In other words, the zeroValue 2 is not added to every element; it is added exactly once per key within each partition, namely to the first element of that key encountered in the partition. The results for bb (three partitions) can be analyzed the same way for comparison.
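To make the rule concrete, the following plain-Scala sketch (not Spark code; partitions, zero, perPartition and merged are names made up for this illustration) reproduces aa.foldByKey(2)(_ + _) by hand: the zero value participates once per key inside each partition, and the per-partition results are then merged without it.

val partitions = Seq(
  Seq(("cat", 2), ("mouse", 2), ("cat", 3)),  // partition 0 of aa
  Seq(("dog", 4), ("mouse", 2), ("cat", 1))   // partition 1 of aa
)
val zero = 2
// Within each partition: fold each key's values, starting from the zero value.
val perPartition = partitions.map { part =>
  part.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).foldLeft(zero)(_ + _) }
}
// Across partitions: merge the per-partition results without the zero value.
val merged = perPartition.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
println(merged)  // cat -> 10, mouse -> 8, dog -> 6, matching res11 above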
This is easy to understand once you look at the implementation: foldByKey is built on combineByKeyWithClassTag, with the zeroValue baked into the createCombiner argument, and combineByKeyWithClassTag in turn performs the actual aggregation through org.apache.spark.Aggregator.
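To see exactly where the zeroValue goes, here is a simplified excerpt of PairRDDFunctions.foldByKey, reproduced from memory (the exact code may differ slightly between Spark versions); note that zeroValue appears only inside the createCombiner function handed to combineByKeyWithClassTag:

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V](
    (v: V) => cleanedFunc(createZero(), v),  // createCombiner: zeroValue folded in, once per key per partition
    cleanedFunc,                             // mergeValue
    cleanedFunc,                             // mergeCombiners
    partitioner)
}

The key part of combineByKeyWithClassTag itself is as follows: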

val aggregator = new Aggregator[K, V, C](
  self.context.clean(createCombiner),
  self.context.clean(mergeValue),
  self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
  self.mapPartitions(iter => {
    val context = TaskContext.get()
    new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  }, preservesPartitioning = true)
} else {
  new ShuffledRDD[K, V, C](self, partitioner)
    .setSerializer(serializer)
    .setAggregator(aggregator)
    .setMapSideCombine(mapSideCombine)
}

Aggregation within a partition is done by calling aggregator.combineValuesByKey(iter, context), where iter is the iterator over a single partition:

def combineValuesByKey(
    iter: Iterator[_ <: Product2[K, V]],
    context: TaskContext): Iterator[(K, C)] = {
  val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
  combiners.insertAll(iter)
  updateMetrics(context, combiners)
  combiners.iterator
}

The insertAll method of org.apache.spark.util.collection.ExternalAppendOnlyMap is as follows:

def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
  if (currentMap == null) {
    throw new IllegalStateException(
      "Cannot insert new elements into a map after calling iterator")
  }
  // An update function for the map that we reuse across entries to avoid allocating
  // a new closure each time
  var curEntry: Product2[K, V] = null
  val update: (Boolean, C) => C = (hadVal, oldVal) => {
    if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
  }
  while (entries.hasNext) {
    curEntry = entries.next()
    val estimatedSize = currentMap.estimateSize()
    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
    if (maybeSpill(currentMap, estimatedSize)) {
      currentMap = new SizeTrackingAppendOnlyMap[K, C]
    }
    currentMap.changeValue(curEntry._1, update)
    addElementsRead()
  }
}

The update closure shows the decision: if the key has not yet appeared in the partition, createCombiner is called (which is where the zeroValue is folded in); otherwise mergeValue is applied to the existing combiner and the new value.
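Here is a minimal in-memory sketch of that decision (this ignores spilling and the rest of the ExternalAppendOnlyMap machinery; combineValuesByKeySketch is a made-up name for illustration):

def combineValuesByKeySketch[K, V, C](
    iter: Iterator[(K, V)],
    createCombiner: V => C,
    mergeValue: (C, V) => C): Map[K, C] = {
  val combiners = scala.collection.mutable.Map.empty[K, C]
  iter.foreach { case (k, v) =>
    combiners(k) = combiners.get(k) match {
      case Some(c) => mergeValue(c, v)    // key already seen in this partition
      case None    => createCombiner(v)   // first occurrence of the key: zeroValue folded in here
    }
  }
  combiners.toMap
}

// Partition 0 of aa, with zeroValue = 2 and func = _ + _ :
val part0 = Iterator(("cat", 2), ("mouse", 2), ("cat", 3))
println(combineValuesByKeySketch[String, Int, Int](part0, v => 2 + v, _ + _))
// cat -> 7, mouse -> 4 (element order may vary)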

sortByKey([ascending], [numPartitions])

Meaning
Sorts the elements of the RDD by key, repartitioning the data accordingly, and returns the result as a new RDD.

Input/Output:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

Example:

scala> val a = sc.parallelize(List( ("cat",2), ("mouse", 2),("bear", 3), ("dog", 4), ("ant", 2), ("horse", 1)),2 )
a: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[49] at parallelize at <console>:25

scala> def myfunc3(index: Int, iter: Iterator[(String,Int)]) : Iterator[String] = {
     |   iter.map(x => index + "," + x.toString())
     | }
myfunc3: (index: Int, iter: Iterator[(String, Int)])Iterator[String]

scala> a.mapPartitionsWithIndex(myfunc3).collect().sorted.foreach(println)
0,(bear,3)
0,(cat,2)
0,(mouse,2)
1,(ant,2)
1,(dog,4)
1,(horse,1)

scala> a.sortByKey(true).mapPartitionsWithIndex(myfunc3).collect().sorted.foreach(println)
0,(ant,2)
0,(bear,3)
0,(cat,2)
1,(dog,4)
1,(horse,1)
1,(mouse,2)
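As a small follow-up not in the original transcript: passing false sorts in descending key order, and an optional second argument controls the number of output partitions.

a.sortByKey(false).collect()
// expected: Array((mouse,2), (horse,1), (dog,4), (cat,2), (bear,3), (ant,2))
a.sortByKey(true, 1).collect()  // same sorted data, placed in a single partition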

Common Action operations
