在进行Spark Streaming的开发时,我们常常需要将DStream转为DataFrame来进行进一步的处理,
共有两种方式,方式一:
利用map算子和tuple来完成,一般的场景下采用这种方式即可。
但是有的时候我们会遇到列数大于22的情况,这个时候会受到scala的tuple数不能超过22的影响。这时可以采用方式二:
val spark = SparkSession.builder() .appName("Test") .getOrCreate() dStream.foreachRDD{ rdd => val res:RDD[Row] = rdd.map{ row => val buffer = ArrayBuffer.empty[Any] val fields: Array[String] = row.split("\\|~\\|") buffer.append(fields(0)) buffer.append(fields(1)) buffer.append(fields(2)) // 省略 buffer.append(fields(25)) Row.fromSeq(buffer) } val schema = StructType(Seq( StructField("col1", StringType, false), StructField("col2", StringType, false), StructField("col3", StringType, false), // 省略 StructField("col26", StringType, false) )) val df: DataFrame = spark.createDataFrame(result, schema) // 业务逻辑 }