注意:如果需要启动Spark ThriftServer 服务,需要关闭hiveserver2 服务
SparkSQL的ThriftServer服务测试
??? -1. 查看进程是否存在
jps -ml | grep HiveThriftServer2
-2. 查看WEB界面是否正常
有JDBC/ODBC Server这个选项就是正常的
-3. 通过spark自带的beeline命令
./bin/beeline
-4. 通过jdbc来访问spark的ThriftServer接口
Spark中beeline的使用
$ bin/beeline #启动beeline #可以使用!help查看相应的命令 beeline> !help #如connect beeline> !connect Usage: connect <url> <username> <password> [driver] #这样可以多个用户连接 beeline> !connect jdbc:hive2://hadoop01.com:10000 #退出 beeline> !quit
连接成功,在4040 页面也可以看到我们连接的hive
注:如果报错
No known driver to handle "jdbc:hive2://hadoop01.com:10000"
说明缺少了hive的驱动jar,在我们编译好的源码中hive-jdbc-1.2.1.spark2.jar 找到并copy到spark的jars中
通过jdbc来访问spark的ThriftServer接口
向我们java连接mysql一样,我们使用scala来连接ThriftServer
package com.jeffrey import java.sql.DriverManager object SparkJDBCThriftServerDemo { def main(args: Array[String]): Unit = { //1 添加驱动 val driver = "org.apache.hive.jdbc.HiveDriver" Class.forName(driver) //2 构建连接对象 val url = "jdbc:hive2://hadoop01.com:10000" val conn = DriverManager.getConnection(url,"ijeffrey","123456") //3 sql 语句执行 conn.prepareStatement("use common").execute() var pstmt = conn.prepareStatement("select empno,ename,sal from emp") var rs = pstmt.executeQuery() while (rs.next()){ println(s"empno = ${rs.getInt("empno")} " + s"ename=${rs.getString("ename")} " + s" sal=${rs.getDouble("sal")}") } println("---------------------------------------------------------------------------") pstmt = conn.prepareStatement("select empno,ename,sal from emp where sal > ? and ename = ?") pstmt.setDouble(1,3000) pstmt.setString(2,"KING") rs = pstmt.executeQuery() while (rs.next()){ println(s"empno = ${rs.getInt("empno")} " + s"ename=${rs.getString("ename")} " + s" sal=${rs.getDouble("sal")}") } rs.close() pstmt.close() conn.close() } }
执行结果:
SparkSQL案例
案例一:SparkSQL读取HDFS上Json格式的文件
?? ?1. 将案例数据上传到HDFS上
样例数据在${SPARK_HOME}/examples/src/main/resources/*
?? ?2. 编写SparkSQL程序
启动一个spark-shell进行编写
scala> val path = "/spark/data/people.json" scala> val df = spark.read.json(path) scala> df.registerTempTable("tmp04") //通过DataFrame注册一个临时表 scala> spark.sql("show tables").show //通过SQL语句进行操作 scala> spark.sql("select * from tmp04").show #saveAsTable 使用之前 先要use table scala> spark.sql("select * from tmp04").write.saveAsTable("test01") #overwrite 覆盖 append 拼接 ignore 忽略 scala> spark.sql("select * from tmp01").write.mode("overwrite").saveAsTable("test01") scala> spark.sql("select * from tmp01").write.mode("append").saveAsTable("test01") scala> spark.sql("select * from tmp01").write.mode("ignore").saveAsTable("test01")
? ? saveAsTable("test01")默认保存到一张不存在的表中(test01不是临时表),如果表存在的话就会报错
??? SaveMode四种情况:
Append:拼接
Overwrite: 重写
ErrorIfExists:如果表已经存在,则报错,默认就是这一种,存在即报错
Ignore:如果表已经存在了,则忽略这一步操作
除了spark.read.json的方式去读取数据外,还可以使用spark.sql的方式直接读取数据
scala> spark.sql("select * from json.`/spark/data/people.json` where age is not null").show +---+------+ |age| name| +---+------+ | 30| Andy| | 19|Justin| +---+------+ # hdfs上的路径使用`(反票号)引起来
案例二:DataFrame和Dataset和RDD之间的互相转换
??? 在IDEA中集成Hive的话,需要将hive-site.xml文件放到resources目录下面
package com.jeffrey.sql import java.util.Properties import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} object HiveJoinMySQLDemo { def main(args: Array[String]): Unit = { System.setProperty("hadoop.home.dir","D:\\hadoop-2.7.3") // 1.构建SparkSession val warehouseLocation = "/user/hive/warehouse" val spark = SparkSession .builder() .master("local") //如果放到集群运行需要注释掉 .appName("RDD 2 DataFrame") .config("spark.sql.warehouse.dir",warehouseLocation) .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql val url = "jdbc:mysql://hadoop01.com:3306/test" val table = "tb_dept" val props = new Properties() props.put("user","root") props.put("password","123456") // 1.Hive表数据导入到MySQL中 在shell中可以使用paste写多行 spark.read.table("common.dept") .write .mode(SaveMode.Overwrite) .jdbc(url,table,props) // 2.Hive和MySQL的join操作 //2.1 读取MySQL的数据 val df: DataFrame = spark .read .jdbc(url,table,props) df.createOrReplaceTempView("tmp_tb_dept") //2.1 数据聚合 spark.sql( """ |select a.*,b.dname,b.loc |from common.emp a |join tmp_tb_dept b on a.deptno = b.deptno """.stripMargin).createOrReplaceTempView("tmp_emp_join_dept_result") spark.sql("select * from tmp_emp_join_dept_result").show() // 对表进行缓存的方法 spark.read.table("tmp_emp_join_dept_result").cache() spark.catalog.cacheTable("tmp_emp_join_dept_result") //输出到HDFS上 // 方法一 /*spark .read .table("tmp_emp_join_dept_result") .write.parquet("/spark/sql/hive_join_mysql")*/ // 方法二 spark .read .table("tmp_emp_join_dept_result") .write .format("parquet") .save(s"hdfs://hadoop01.com:8020/spark/sql/hive_join_mysql/${System.currentTimeMillis()}") //输出到Hive中,并且是parquet格式 按照deptno分区 spark .read .table("tmp_emp_join_dept_result") .write .format("parquet") .partitionBy("deptno") .mode(SaveMode.Overwrite) .saveAsTable("hive_emp_dept") println("------------------------------------------------------------") spark.sql("show tables").show() //清空缓存 spark.catalog.uncacheTable("tmp_emp_join_dept_result") } }
可以打成jar文件放在集群上执行
bin/spark-submit \ --class com.jeffrey.sql.HiveJoinMySQLDemo \ --master yarn \ --deploy-mode client \ /opt/datas/jar/hivejoinmysql.jar bin/spark-submit \ --class com.jeffrey.sql.HiveJoinMySQLDemo \ --master yarn \ --deploy-mode cluster \ /opt/datas/logAnalyze.jar
以上即使Spark?SQL的基本使用。
SparkSQL的函数
HIve支持的函数,SparkSQL基本都是支持的,SparkSQL支持两种自定义函数,分别是:UDF和UDAF,两种函数都是通过SparkSession的udf属性进行函数的注册使用的;SparkSQL不支持UDTF函数的 自定义使用。
☆ UDF:一条数据输入,一条数据输出,一对一的函数,即普通函数
☆ UDAF:多条数据输入,一条数据输出,多对一的函数,即聚合函数
下一篇会写一下SparkSQL自定义函数的案例以及其关于SparkSQL其他的案例 ^_^
到此这篇关于Spark SQL配置及使用的文章就介绍到这了,更多相关Spark SQL配置内容请搜索
更多SQL内容来自木庄网络博客