Spark 系列(十)—— Spark SQL 外部数据源

一、简介 1.1 多数据源支持

Spark 支持以下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,能够满足绝大部分使用场景。

CSV

JSON

Parquet

ORC

JDBC/ODBC connections

Plain-text files

注:以下所有测试文件均可从本仓库的resources 目录进行下载

1.2 读数据格式

所有读取 API 遵循以下调用格式:

// 格式 DataFrameReader.format(...).option("key", "value").schema(...).load() // 示例 spark.read.format("csv") .option("mode", "FAILFAST") // 读取模式 .option("inferSchema", "true") // 是否自动推断 schema .option("path", "path/to/file(s)") // 文件路径 .schema(someSchema) // 使用预定义的 schema .load()

读取模式有以下三种可选项:

读模式 描述
permissive   当遇到损坏的记录时,将其所有字段设置为 null,并将所有损坏的记录放在名为 _corruption t_record 的字符串列中  
dropMalformed   删除格式不正确的行  
failFast   遇到格式不正确的数据时立即失败  
1.3 写数据格式 // 格式 DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save() //示例 dataframe.write.format("csv") .option("mode", "OVERWRITE") //写模式 .option("dateFormat", "yyyy-MM-dd") //日期格式 .option("path", "path/to/file(s)") .save()

写数据模式有以下四种可选项:

Scala/Java 描述
SaveMode.ErrorIfExists   如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式  
SaveMode.Append   数据以追加的方式写入  
SaveMode.Overwrite   数据以覆盖的方式写入  
SaveMode.Ignore   如果给定的路径已经存在文件,则不做任何操作  


二、CSV

CSV 是一种常见的文本文件格式,其中每一行表示一条记录,记录中的每个字段用逗号分隔。

2.1 读取CSV文件

自动推断类型读取读取示例:

spark.read.format("csv") .option("header", "false") // 文件中的第一行是否为列的名称 .option("mode", "FAILFAST") // 是否快速失败 .option("inferSchema", "true") // 是否自动推断 schema .load("/usr/file/csv/dept.csv") .show()

使用预定义类型:

import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType} //预定义数据格式 val myManualSchema = new StructType(Array( StructField("deptno", LongType, nullable = false), StructField("dname", StringType,nullable = true), StructField("loc", StringType,nullable = true) )) spark.read.format("csv") .option("mode", "FAILFAST") .schema(myManualSchema) .load("/usr/file/csv/dept.csv") .show() 2.2 写入CSV文件 df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")

也可以指定具体的分隔符:

df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2") 2.3 可选配置

为节省主文篇幅,所有读写配置项见文末 9.1 小节。


三、JSON 3.1 读取JSON文件 spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)

需要注意的是:默认不支持一条数据记录跨越多行 (如下),可以通过配置 multiLine 为 true 来进行更改,其默认值为 false。

// 默认支持单行 {"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"} //默认不支持多行 { "DEPTNO": 10, "DNAME": "ACCOUNTING", "LOC": "NEW YORK" } 3.2 写入JSON文件 df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept") 3.3 可选配置

为节省主文篇幅,所有读写配置项见文末 9.2 小节。


四、Parquet

Parquet 是一个开源的面向列的数据存储,它提供了多种存储优化,允许读取单独的列非整个文件,这不仅节省了存储空间而且提升了读取效率,它是 Spark 是默认的文件格式。

4.1 读取Parquet文件 spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5) 2.2 写入Parquet文件 df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept") 2.3 可选配置

Parquet 文件有着自己的存储规则,因此其可选配置项比较少,常用的有如下两个:

读写操作 配置项 可选值 默认值 描述
Write   compression or codec   None,
uncompressed,
bzip2,
deflate, gzip,
lz4, or snappy
  None   压缩文件格式  
Read   mergeSchema   true, false   取决于配置项 spark.sql.parquet.mergeSchema   当为真时,Parquet 数据源将所有数据文件收集的 Schema 合并在一起,否则将从摘要文件中选择 Schema,如果没有可用的摘要文件,则从随机数据文件中选择 Schema。  

更多可选配置可以参阅官方文档:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html


五、ORC

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssxpj.html