Spark Knowledge Series 03 - RDDs, Accumulators, Broadcasts (2)

Example:
Below, b transforms RDD[String] into RDD[Int], and c transforms RDD[String] into RDD[(String, Int)]:

scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:25

scala> val b = a.map(_.length)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:27

scala> b.collect()
res1: Array[Int] = Array(3, 6, 6, 3, 8)

scala> val c = a.map(x => (x, x.length))
c: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:27

scala> c.collect()
res2: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
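The same one-element-in, one-element-out semantics can be checked without a cluster. A minimal sketch using plain Scala collections as a local stand-in for the RDD (the names `words`, `lengths`, and `pairs` are illustrative, not Spark API):

```scala
// Local stand-in for the RDD example above: map is strictly one-to-one,
// so the output always has the same number of elements as the input.
val words = List("dog", "salmon", "salmon", "rat", "elephant")

// Equivalent of a.map(_.length): String elements become Int elements
val lengths = words.map(_.length)

// Equivalent of a.map(x => (x, x.length)): String elements become (String, Int) pairs
val pairs = words.map(x => (x, x.length))

println(lengths) // List(3, 6, 6, 3, 8)
println(pairs)   // List((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
```

The only difference on a real RDD is that the function is shipped to the executors and applied partition by partition; the per-element contract is identical.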

flatMap(func)

Meaning:
Returns a new RDD by applying a function to every element of this RDD. Unlike map, flatMap allows each input element to produce one or more output elements, which are then flattened into a single RDD.

Signature:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

Example:
Below, the element dog is first mapped to (d,1), (o,1), (g,1), and these are then flattened together with the results for the other elements:
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:25

scala> val d = a.flatMap(x => {
     |   for (i <- 0 until x.length) yield (x.charAt(i), 1)
     | })
d: org.apache.spark.rdd.RDD[(Char, Int)] = MapPartitionsRDD[7] at flatMap at <console>:27

scala> d.collect()
res3: Array[(Char, Int)] = Array((d,1), (o,1), (g,1), (s,1), (a,1), (l,1), (m,1), (o,1), (n,1), (s,1), (a,1), (l,1), (m,1), (o,1), (n,1), (r,1), (a,1), (t,1), (e,1), (l,1), (e,1), (p,1), (h,1), (a,1), (n,1), (t,1))
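The map-then-flatten behavior is easy to see side by side on local collections. A small sketch (local stand-in, not Spark API) contrasting map, which keeps the nesting, with flatMap, which removes it:

```scala
// Local sketch of the flatMap example: each word expands to one (char, 1)
// pair per character, and the per-element results are flattened together.
val words = List("dog", "rat")

// map keeps one result collection per input element (nested structure)
val mapped = words.map(w => w.map(c => (c, 1)).toList)

// flatMap concatenates those per-element collections into one flat list
val flat = words.flatMap(w => w.map(c => (c, 1)))

println(mapped) // List(List((d,1), (o,1), (g,1)), List((r,1), (a,1), (t,1)))
println(flat)   // List((d,1), (o,1), (g,1), (r,1), (a,1), (t,1))
```

This is why the RDD example above yields a single RDD[(Char, Int)] rather than an RDD of sequences.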

mapPartitions(func)

Meaning:
Similar to map, but runs separately on each partition of the RDD, so when applied to an RDD of type T, func must be of type Iterator<T> => Iterator<U>.

Signature:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

Example:
Below, adjacent pairs of elements within each partition are emitted. From the result we can infer that (1,2,3) is in one partition, (4,5,6) in another, and (7,8,9) in a third:

scala> val e = sc.parallelize(1 to 9, 3)
e: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:25

scala> def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
     |   var res = List[(T, T)]()
     |   var pre = iter.next
     |   while (iter.hasNext) {
     |     val cur = iter.next
     |     res .::= (pre, cur)
     |     pre = cur
     |   }
     |   res.iterator
     | }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]

scala> e.mapPartitions(myfunc).collect
res4: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
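The per-partition behavior can be simulated locally by splitting the data into groups and applying the same function to each group's iterator independently. A sketch under the assumption that `sc.parallelize(1 to 9, 3)` splits the range into three contiguous partitions, as the REPL output above suggests:

```scala
// Same adjacent-pair function as the example, written as a plain method
def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next()
  while (iter.hasNext) {
    val cur = iter.next()
    res = (pre, cur) :: res // prepend, so pairs come out in reverse order
    pre = cur
  }
  res.iterator
}

// Simulate 3 partitions: List(Vector(1,2,3), Vector(4,5,6), Vector(7,8,9))
val partitions = (1 to 9).grouped(3).toList

// Each partition is processed in isolation, so no pair spans a partition
// boundary: (3,4) and (6,7) never appear.
val result = partitions.flatMap(p => myfunc(p.iterator))
println(result) // List((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
```

This is the key difference from map: the function sees a whole partition at a time, which also makes mapPartitions the natural place for per-partition setup work such as opening a database connection once per partition instead of once per element.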

mapPartitionsWithIndex(func)
