scala> val soGouQRdd = sc.textFile("hdfs://SparkSingleNode:9000/SogouQ.mini")
scala> soGouQRdd.count
took 10.753423 s
res0: Long = 2000
可以看出,count之后有2000条记录
首先过滤出有效的数据:
scala> val mapSoGouQRdd = soGouQRdd.map((_.split("\t"))).filter(_.length == 6)
mapSoGouQRdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at filter at <console>:23
scala> mapSoGouQRdd.count
took 2.175379 s
res1: Long = 2000
可以发现该文件中的数据都是有效数据。
该文件的格式如下所示:
访问时间 \t 用户ID \t 查询词 \t 该URL在返回结果中的排名 \ t用户点击的顺序号 \t 用户点击的URL
下面使用spark获得搜索结果排名第一同时点击结果排名也是第一的数据量:
scala> val filterSoGouQRdd = mapSoGouQRdd.filter(_(3).toInt == 1).filter(_(4).toInt == 1)
filterSoGouQRdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at filter at <console>:25
scala> filterSoGouQRdd.count
可以发现搜索结果排名第一同时点击结果排名也是第一的数据量为794条;
使用toDebugString查看一下其lineage:
scala> filterSoGouQRdd.toDebugString
res3: String =
(2) MapPartitionsRDD[5] at filter at <console>:25 []
| MapPartitionsRDD[4] at filter at <console>:25 []
| MapPartitionsRDD[3] at filter at <console>:23 []
| MapPartitionsRDD[2] at map at <console>:23 []
| MapPartitionsRDD[1] at textFile at <console>:21 []
| hdfs://SparkSingleNode:9000/SogouQ.mini HadoopRDD[0] at textFile at <console>:21 []
scala>
为什么没有?
HadoopRDD->MappedRDD->MappedRDD->FilteredRDD->FilteredRDD->FilteredRDD
3、搜狗日志文件深入实战
下面看,用户ID查询次数排行榜:
该文件的格式如下所示:
访问时间 \t 用户ID \t 查询词 \t 该URL在返回结果中的排名 \ t用户点击的顺序号 \t 用户点击的URL
scala> val sortedSoGouQRdd = mapSoGouQRdd.map(x => (x(1),1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))
对sortedSogouQRdd进行collect操作:(不要乱collect 会出现OOM的)
scala> sortedSoGouQRdd.collect