Spark学习笔记之Spark SQL的具体使用


本文整理自网络,侵删。

1. Spark SQL是什么?

  • 处理结构化数据的一个spark的模块
  • 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用

2. Spark SQL的特点

  • 多语言的接口支持(java python scala)
  • 统一的数据访问
  • 完全兼容hive
  • 支持标准的连接

3. 为什么学习SparkSQL?

我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

4. DataFrame(数据框)

  • 与RDD类似,DataFrame也是一个分布式数据容器
  • 然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema
  • DataFrame其实就是带有schema信息的RDD

5. SparkSQL1.x的API编程

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

5.1 使用sqlContext创建DataFrame(测试用)

object Ops3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"),Person("admin2", 16, "man"),Person("admin3", 18, "man")))
    val df1: DataFrame = sqlContext.createDataFrame(rdd1)
    df1.show(1)
  }
}
case class Person(name: String, age: Int, sex: String);

5.2 使用sqlContxet中提供的隐式转换函数(测试用)

import org.apache.spark
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
import sqlContext.implicits._
val df1: DataFrame = rdd1.toDF
df1.show()
5.3 使用SqlContext创建DataFrame(常用)
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowRDD: RDD[Row] = linesRDD.map(line => {
 val lineSplit: Array[String] = line.split(",")
 Row(lineSplit(0), lineSplit(1).toInt, lineSplit(2))
})
val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
rowDF.show()

6. 使用新版本的2.x的API

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()

7. 操作SparkSQL的方式

阅读剩余部分

相关阅读 >>

mysql建表常用的sql语句汇总

sqlserver中的触发器基本语法与作用

数据库表的创建、管理和数据操作(实验一)

sql实现leetcode(184.系里最高薪水)

oracle存储过程基本语法介绍

最全mysql数据类型梳理汇总

sql2005注射辅助脚本[粗糙版]

sql无效字符 执行sql语句报错解决方案

sqlite教程(一):sqlite数据库介绍

sql和access哪个好?

更多相关阅读请进入《sql》频道 >>


数据库系统概念 第6版
书籍

数据库系统概念 第6版

机械工业出版社

本书主要讲述了数据模型、基于对象的数据库和XML、数据存储和查询、事务管理、体系结构等方面的内容。



打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

分享从这里开始,精彩与您同在

评论

管理员已关闭评论功能...