SparkSQL配置及使用教程


当前第2页 返回上一页

注意:如果需要启动Spark ThriftServer 服务,需要关闭hiveserver2 服务

SparkSQL的ThriftServer服务测试

??? -1. 查看进程是否存在
jps -ml | grep HiveThriftServer2
-2. 查看WEB界面是否正常
有JDBC/ODBC Server这个选项就是正常的
-3. 通过spark自带的beeline命令
./bin/beeline
-4. 通过jdbc来访问spark的ThriftServer接口

Spark中beeline的使用

$ bin/beeline    #启动beeline
#可以使用!help查看相应的命令
beeline> !help
#如connect
beeline> !connect
Usage: connect <url> <username> <password> [driver]
#这样可以多个用户连接
beeline> !connect jdbc:hive2://hadoop01.com:10000
#退出
beeline> !quit

连接成功,在4040 页面也可以看到我们连接的hive

注:如果报错
No known driver to handle "jdbc:hive2://hadoop01.com:10000"
说明缺少了hive的驱动jar,在我们编译好的源码中hive-jdbc-1.2.1.spark2.jar 找到并copy到spark的jars中

通过jdbc来访问spark的ThriftServer接口

向我们java连接mysql一样,我们使用scala来连接ThriftServer

package com.jeffrey
 
import java.sql.DriverManager
 
object SparkJDBCThriftServerDemo {
    def main(args: Array[String]): Unit = {
        //1 添加驱动
        val driver = "org.apache.hive.jdbc.HiveDriver"
        Class.forName(driver)
 
        //2 构建连接对象
        val url = "jdbc:hive2://hadoop01.com:10000"
        val conn = DriverManager.getConnection(url,"ijeffrey","123456")
 
        //3 sql 语句执行
        conn.prepareStatement("use common").execute()
 
        var pstmt = conn.prepareStatement("select empno,ename,sal from emp")
 
        var rs = pstmt.executeQuery()
 
        while (rs.next()){
            println(s"empno = ${rs.getInt("empno")}  " +
                    s"ename=${rs.getString("ename")}   " +
                    s" sal=${rs.getDouble("sal")}")
        }
 
        println("---------------------------------------------------------------------------")
 
        pstmt = conn.prepareStatement("select empno,ename,sal from emp where sal > ? and ename = ?")
        pstmt.setDouble(1,3000)
        pstmt.setString(2,"KING")
 
        rs = pstmt.executeQuery()
 
        while (rs.next()){
            println(s"empno = ${rs.getInt("empno")}  " +
                    s"ename=${rs.getString("ename")}   " +
                    s" sal=${rs.getDouble("sal")}")
        }
 
        rs.close()
        pstmt.close()
        conn.close()
    }
}

执行结果:

SparkSQL案例

案例一:SparkSQL读取HDFS上Json格式的文件

?? ?1. 将案例数据上传到HDFS上
样例数据在${SPARK_HOME}/examples/src/main/resources/*

?? ?2. 编写SparkSQL程序
启动一个spark-shell进行编写

scala> val path = "/spark/data/people.json"
scala> val df = spark.read.json(path)
scala> df.registerTempTable("tmp04") //通过DataFrame注册一个临时表
scala> spark.sql("show tables").show  //通过SQL语句进行操作
scala> spark.sql("select * from tmp04").show
 
#saveAsTable 使用之前 先要use table
scala> spark.sql("select * from tmp04").write.saveAsTable("test01")
#overwrite 覆盖  append 拼接  ignore 忽略
scala> spark.sql("select * from tmp01").write.mode("overwrite").saveAsTable("test01")
scala> spark.sql("select * from tmp01").write.mode("append").saveAsTable("test01")
scala> spark.sql("select * from tmp01").write.mode("ignore").saveAsTable("test01")

? ? saveAsTable("test01")默认保存到一张不存在的表中(test01不是临时表),如果表存在的话就会报错

??? SaveMode四种情况:
Append:拼接
Overwrite: 重写
ErrorIfExists:如果表已经存在,则报错,默认就是这一种,存在即报错
Ignore:如果表已经存在了,则忽略这一步操作

除了spark.read.json的方式去读取数据外,还可以使用spark.sql的方式直接读取数据

scala> spark.sql("select * from json.`/spark/data/people.json` where age is not null").show 
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+
# hdfs上的路径使用`(反票号)引起来

案例二:DataFrame和Dataset和RDD之间的互相转换

??? 在IDEA中集成Hive的话,需要将hive-site.xml文件放到resources目录下面

package com.jeffrey.sql
 
import java.util.Properties
 
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 
object HiveJoinMySQLDemo {
    def main(args: Array[String]): Unit = {
        System.setProperty("hadoop.home.dir","D:\\hadoop-2.7.3")
        // 1.构建SparkSession
        val warehouseLocation = "/user/hive/warehouse"
 
        val spark = SparkSession
                .builder()
                .master("local")    //如果放到集群运行需要注释掉
                .appName("RDD 2 DataFrame")
                .config("spark.sql.warehouse.dir",warehouseLocation)
                .enableHiveSupport()
                .getOrCreate()
 
        import spark.implicits._
        import spark.sql
 
        val url = "jdbc:mysql://hadoop01.com:3306/test"
        val table = "tb_dept"
 
        val props = new Properties()
        props.put("user","root")
        props.put("password","123456")
 
        // 1.Hive表数据导入到MySQL中    在shell中可以使用paste写多行
        spark.read.table("common.dept")
                .write
                .mode(SaveMode.Overwrite)
                .jdbc(url,table,props)
 
        // 2.Hive和MySQL的join操作
        //2.1 读取MySQL的数据
       val df: DataFrame = spark
                .read
                .jdbc(url,table,props)
 
        df.createOrReplaceTempView("tmp_tb_dept")
        //2.1 数据聚合
        spark.sql(
            """
              |select a.*,b.dname,b.loc
              |from common.emp a
              |join tmp_tb_dept b on a.deptno = b.deptno
            """.stripMargin).createOrReplaceTempView("tmp_emp_join_dept_result")
 
        spark.sql("select * from tmp_emp_join_dept_result").show()
 
        // 对表进行缓存的方法
        spark.read.table("tmp_emp_join_dept_result").cache()
        spark.catalog.cacheTable("tmp_emp_join_dept_result")
 
        //输出到HDFS上
        // 方法一
        /*spark
                .read
                .table("tmp_emp_join_dept_result")
                .write.parquet("/spark/sql/hive_join_mysql")*/
 
        // 方法二
        spark
                .read
                .table("tmp_emp_join_dept_result")
                .write
                .format("parquet")
                .save(s"hdfs://hadoop01.com:8020/spark/sql/hive_join_mysql/${System.currentTimeMillis()}")
 
 
        //输出到Hive中,并且是parquet格式 按照deptno分区
        spark
                .read
                .table("tmp_emp_join_dept_result")
                .write
                .format("parquet")
                .partitionBy("deptno")
                .mode(SaveMode.Overwrite)
                .saveAsTable("hive_emp_dept")
 
        println("------------------------------------------------------------")
 
        spark.sql("show tables").show()
 
        //清空缓存
        spark.catalog.uncacheTable("tmp_emp_join_dept_result")
 
    }
}

可以打成jar文件放在集群上执行

bin/spark-submit \
--class com.jeffrey.sql.HiveJoinMySQLDemo \
--master yarn \
--deploy-mode client \
/opt/datas/jar/hivejoinmysql.jar
 
 
bin/spark-submit \
--class com.jeffrey.sql.HiveJoinMySQLDemo \
--master yarn \
--deploy-mode cluster \
/opt/datas/logAnalyze.jar

以上即使Spark?SQL的基本使用。

SparkSQL的函数

HIve支持的函数,SparkSQL基本都是支持的,SparkSQL支持两种自定义函数,分别是:UDF和UDAF,两种函数都是通过SparkSession的udf属性进行函数的注册使用的;SparkSQL不支持UDTF函数的 自定义使用。

☆ UDF:一条数据输入,一条数据输出,一对一的函数,即普通函数

☆ UDAF:多条数据输入,一条数据输出,多对一的函数,即聚合函数

下一篇会写一下SparkSQL自定义函数的案例以及其关于SparkSQL其他的案例 ^_^

到此这篇关于Spark SQL配置及使用的文章就介绍到这了,更多相关Spark SQL配置内容请搜索

更多SQL内容来自木庄网络博客


标签:SQL

返回前面的内容

相关阅读 >>

隐藏在sqlserver 字段中的超诡异字符解决过程

sql语法 分隔符理解小结

mysql常见优化方案汇总

mysql中存储过程的详细详解

oracle中 关于数据库存储过程和存储函数的使用

sqlserver触发器详解

sql中的连接查询详解

oracle存储过程基本语法介绍

sql查询方法精华集

sqlserver、mysql、oracle三种数据库的优缺点总结

更多相关阅读请进入《sql》频道 >>


数据库系统概念 第6版
书籍

数据库系统概念 第6版

机械工业出版社

本书主要讲述了数据模型、基于对象的数据库和XML、数据存储和查询、事务管理、体系结构等方面的内容。



打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

分享从这里开始,精彩与您同在

评论

管理员已关闭评论功能...