Example:
In the session below, b converts an RDD[String] into an RDD[Int], and c converts an RDD[String] into an RDD[(String, Int)].
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:25
scala> val b = a.map(_.length)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:27
scala> b.collect()
res1: Array[Int] = Array(3, 6, 6, 3, 8)
scala> val c= a.map(x=>(x,x.length))
c: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:27
scala> c.collect()
res2: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
flatMap(func)
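Meaning:
Returns a new RDD by applying a function to every element of this RDD and then flattening the results. Unlike map, the function passed to flatMap may return zero, one, or several output elements for each input element.
Signature: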
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
Example:
Below, the element dog is first mapped to (d,1), (o,1), (g,1); the results for all elements are then flattened into a single RDD.
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at
scala> val d=a.flatMap(x=>{
| for (i<-0 until x.length) yield (x.charAt(i),1)
| })
d: org.apache.spark.rdd.RDD[(Char, Int)] = MapPartitionsRDD[7] at flatMap at <console>:27
scala> d.collect()
res3: Array[(Char, Int)] = Array((d,1), (o,1), (g,1), (s,1), (a,1), (l,1), (m,1), (o,1), (n,1), (s,1), (a,1), (l,1), (m,1), (o,1), (n,1), (r,1), (a,1), (t,1), (e,1), (l,1), (e,1), (p,1), (h,1), (a,1), (n,1), (t,1))
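To make the difference from map concrete, here is a minimal sketch that runs the same function through both map and flatMap. It assumes spark-core is on the classpath and uses a local master; the object name FlatMapSketch, the helper words, and the sample input are hypothetical and not part of the original session.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FlatMapSketch {
  // Hypothetical helper: splits a line on whitespace and drops empty tokens,
  // so a blank line produces no words at all.
  def words(line: String): Seq[String] =
    line.split("\\s+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for illustration only.
    val conf = new SparkConf().setAppName("flatMap-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.parallelize(List("dog salmon", "", "rat"), 3)

      // map keeps exactly one output per input: 3 lines in, 3 sequences out.
      val nested = lines.map(line => words(line)).collect()
      // flatMap concatenates the sequences, so the blank line contributes nothing.
      val flat = lines.flatMap(line => words(line)).collect()

      println(nested.length)       // 3
      println(flat.mkString(","))  // dog,salmon,rat
    } finally {
      sc.stop()
    }
  }
}
```

The blank line is the case to watch: map still emits an (empty) sequence for it, while flatMap emits no element.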
mapPartitions(func)
Meaning:
func runs once on each partition of the RDD rather than once per element, so on an RDD of type T, func must have the type Iterator[T] => Iterator[U].
Signature:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
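The preservesPartitioning flag in this signature is easiest to understand on a key-value RDD that already has a partitioner. The following is a rough sketch, assuming spark-core is on the classpath and a local master; the object name PreservesPartitioningSketch, the method demo, and the sample data are hypothetical.

```scala
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object PreservesPartitioningSketch {
  // Returns the partitioner seen after mapPartitions with the flag set, and with the default.
  def demo(): (Option[Partitioner], Option[Partitioner]) = {
    // Assumption: local mode with two threads, for illustration only.
    val conf = new SparkConf().setAppName("preservesPartitioning-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val pairs = sc.parallelize(List(("dog", 3), ("salmon", 6), ("rat", 3)), 3)
        .partitionBy(new HashPartitioner(3))

      // Only the values change, so the keys still sit in the partitions
      // chosen by the HashPartitioner and the flag can safely be set.
      val kept = pairs.mapPartitions(
        iter => iter.map { case (k, v) => (k, v * 2) },
        preservesPartitioning = true)

      // Same function with the default flag: the result has no partitioner.
      val dropped = pairs.mapPartitions(iter => iter.map { case (k, v) => (k, v * 2) })

      (kept.partitioner, dropped.partitioner)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (kept, dropped) = demo()
    println(kept.isDefined)   // true
    println(dropped.isEmpty)  // true
  }
}
```

Setting the flag to true is only appropriate when func leaves the keys untouched; otherwise the recorded partitioner would no longer describe where the keys actually are.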
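Example:
The function below pairs each element with its neighbor within the same partition. From the result we can infer that (1,2,3) are in one partition, (4,5,6) in another, and (7,8,9) in a third.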
scala> val e = sc.parallelize(1 to 9, 3)
e: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:25
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
| var res = List[(T, T)]()
| var pre = iter.next
| while (iter.hasNext)
| {
| val cur = iter.next;
| res .::= (pre, cur)
| pre = cur;
| }
| res.iterator
| }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
scala> e.mapPartitions(myfunc).collect
res4: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
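Note that myfunc calls iter.next before checking hasNext, so it would throw on an empty partition, and it builds the whole result list in memory in reverse order. A possible alternative, sketched here under the assumption that spark-core is on the classpath and a local master is used, relies on Iterator.sliding and uses glom to check the partition layout directly instead of inferring it. The names MapPartitionsSketch, adjacentPairs, and demo are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsSketch {
  // Pairs each element with its successor inside one partition.
  // sliding(2) yields a single short window when fewer than two elements
  // exist; the pattern match drops it, so empty partitions are safe.
  def adjacentPairs[T](iter: Iterator[T]): Iterator[(T, T)] =
    iter.sliding(2).collect { case Seq(a, b) => (a, b) }

  // Returns the partition layout and the adjacent pairs for 1 to 9 in 3 partitions.
  def demo(): (List[List[Int]], List[(Int, Int)]) = {
    // Assumption: local mode with two threads, for illustration only.
    val conf = new SparkConf().setAppName("mapPartitions-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val e = sc.parallelize(1 to 9, 3)
      // glom turns every partition into one array, exposing the layout.
      val layout = e.glom().collect().map(_.toList).toList
      val pairs = e.mapPartitions(it => adjacentPairs(it)).collect().toList
      (layout, pairs)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (layout, pairs) = demo()
    println(layout) // List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9))
    println(pairs)  // List((1,2), (2,3), (4,5), (5,6), (7,8), (8,9))
  }
}
```

Unlike myfunc, this version emits the pairs in their natural order and stays lazy, so a partition is never fully materialized as a list.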
mapPartitionsWithIndex(func)