SparkSQL 创建引用入口 val spark=SparkSession.builder() .master("local[6]") #spark://host:port .appName("PM") .getOrCreate() 读取数据 spark.read .option("header",value = true) #第一行设为表头 .option("inferSchema",value = true) #自动推断类型 .csv("dataset/BeijingPM.csv") 写入数据 df.write .partitionBy("year","month") #安分区写入 .mode(SaveMode.Overwrite) #指定写入类型,overwrite,append .csv("dataset/pm_partitions") #保存路径 SparkSQL整合Hive
修改hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?> <?xml-stylesheet type="text/xsl" href="http://www.likecs.com/configuration.xsl"?> <configuration> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://node03:3306/hive?createDatabaseIfNotExist=true&useSSL=false</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> </property> <property> <name>hive.metastore.schema.verification</name> <value>false</value> </property> <property> <name>datanucleus.schema.autoCreateAll</name> <value>true</value> </property> <property> <name>hive.server2.thrift.bind.host</name> <value>node03</value> </property> <property> <name>hive.metastore.local</name> <value>false</value> </property> <property> <name>hive.metastore.uris</name> <value>thirft://node03:9083</value> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> </property> </configuration>
以remote模式启动metastore进程
#后台运行metastore,并指定日志输出路径 nohup bin/hive --service metastore 2>&1 >> var/log.log &
文件分发
1、将hive-site.xml文件复制到spark/conf下 2、将hadoop的hdfs-site.xml、core-site.xml文件复制到spark/conf下
SparkSQL访问Hive
#创建引用 val spark=SparkSession.builder() .appName("Hive") .enableHiveSupport() #启用Hive支持 .config("hive.metastore.uris","thrift://node03:9083") #通过thirft服务访问hive .config("spark.sql.warehouse.dir","/dataset/hive") .getOrCreate() #读取hdfs文件,spark集群运行任务必须将文件等公共资源放在所有集群均可以访问到的地方 #创建Schema,为数据指定类型 #DataFrame数据需要指定Schema val schema=StructType{ List( StructField("name",StringType), StructField("age",IntegerType), StructField("gpa",FloatType) ) } val df=spark.read .option("delimiter","\t") .schema(schema) .csv("hdfs://node01:8020/dataset/studenttab10k") #清洗数据,并将文件写入hive表中 val rsDF=df.where(\'age>50) #数据筛选 rsDF.write .mode(SaveMode.Overwrite) .saveAsTable("spark1.student") #保存成表的形式 SparkSql访问Mysql val spark=SparkSession.builder() .master("local[6]") .appName("mysql") .getOrCreate() import spark.implicits._ val schema=StructType{ List( StructField("name",StringType), StructField("age",IntegerType), StructField("gpa",FloatType) ) } val df=spark.read .schema(schema) .option("delimiter","\t") .csv("dataset/studenttab10k") val optionMap=Map( "url"->"jdbc:mysql://192.168.2.136:3306/spark", "dbtable"->"student", "user"->"root", "password"->"root" ) val rsDF=df.where(\'age>55) rsDF.write .format("jdbc") .options(optionMap) .mode(SaveMode.Overwrite) .save()