1、动手实战和调试Spark文件操作
这里,我以指定executor-memory参数的方式,启动spark-shell。
启动hadoop集群
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ jps
8457 Jps
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh
启动spark集群
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g
在命令行中,我指定了spark-shell运行时暂时用的每个机器上executor的内存大小为1GB。
从HDFS上读取该文件
scala> val rdd1 = sc.textFile("/README.md")
或
scala> val rdd1 = sc.textFile("hdfs:SparkSingleNode:9000/README.md")
返回,MapPartitionsRDD
使用,toDebugString,可以查看其lineage的关系。
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
scala> rdd1.toDebugString
16/09/26 22:47:01 INFO mapred.FileInputFormat: Total input paths to process : 1
res0: String =
(2) MapPartitionsRDD[1] at textFile at <console>:21 []
| /README.md HadoopRDD[0] at textFile at <console>:21 []
scala>
可以看出,MapPartitionsRDD是HadoopRDD转换而来的。
hadoopFile,这个方法,产生HadoopRDD
map,这个方法,产生MapPartitionsRDD
从源码分析过程
scala> val result = rdd1.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
le>:23, took 15.095588 s
result: Array[(String, Int)] = Array((package,1), (this,1), (Version"](#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (DataFram...
scala>
不可这样使用toDebugString
scala> result.toDebugString
<console>:26: error: value toDebugString is not a member of Array[(String, Int)]
result.toDebugString