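1. format: specifies the file format explicitly. One useful consequence is that data read in one format can be saved in another; for example, a JSON file can be read and then saved as Parquet.
Spark SQL lets you specify the type of file to read, for example JSON or Parquet.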
/**
 * Specifies the input data source format. Built-in options include "parquet", "json", etc.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
  this.source = source // FileType
  this
}
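To make the JSON-to-Parquet point concrete, here is a minimal sketch using the Spark 1.x `SQLContext` API that the excerpts come from. The object name `JsonToParquet`, the helper `convert`, the local master setting, and both file paths are assumptions made for illustration; they do not appear in the Spark source.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object JsonToParquet {
  // Read JSON from inputPath and write the same rows out as Parquet at outputPath.
  def convert(sqlContext: SQLContext, inputPath: String, outputPath: String): Unit = {
    val df = sqlContext.read.format("json").load(inputPath)
    df.write.format("parquet").mode(SaveMode.Overwrite).save(outputPath)
  }

  def main(args: Array[String]): Unit = {
    // Local master and both file paths are assumptions for illustration.
    val conf = new SparkConf().setAppName("JsonToParquet").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      convert(new SQLContext(sc), "people.json", "people.parquet")
    } finally {
      sc.stop()
    }
  }
}
```

The reader and the writer each take their own `format`, so the input and output formats are chosen independently.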
DataFrame.write()
1. Create a DataFrameWriter instance
/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
2. The DataFrameWriter source is shown below:
It writes the contents of a DataFrame to external storage systems.
/**
 * :: Experimental ::
 * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
 * key-value stores, etc). Use [[DataFrame.write]] to access this.
 *
 * @since 1.4.0
 */
@Experimental
final class DataFrameWriter private[sql](df: DataFrame) {
DataFrameWriter.mode()
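1. Overwrite replaces whatever data already exists at the target.
Append adds the new data to what is already there. For file-based output such as Parquet, appending creates new files alongside the existing ones instead of extending an existing file.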
/**
 * Specifies the behavior when data or table already exists. Options include:
 *   - `SaveMode.Overwrite`: overwrite the existing data.
 *   - `SaveMode.Append`: append the data.
 *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}
2. The String overload uses pattern matching to map an externally supplied mode name to a SaveMode
/**
 * Specifies the behavior when data or table already exists. Options include:
 *   - `overwrite`: overwrite the existing data.
 *   - `append`: append the data.
 *   - `ignore`: ignore the operation (i.e. no-op).
 *   - `error`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: String): DataFrameWriter = {
  this.mode = saveMode.toLowerCase match {
    case "overwrite" => SaveMode.Overwrite
    case "append" => SaveMode.Append
    case "ignore" => SaveMode.Ignore
    case "error" | "default" => SaveMode.ErrorIfExists
    case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
      "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
  }
  this
}
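The string-to-mode mapping above can be tried without a Spark installation. The following is a minimal sketch, assuming a hypothetical `Mode` type that stands in for `org.apache.spark.sql.SaveMode`; the object name `SaveModeSketch` and the method `parse` are also made up for illustration. Only the match expression mirrors the excerpt.

```scala
object SaveModeSketch {
  // Hypothetical stand-in for org.apache.spark.sql.SaveMode, so the sketch runs without Spark.
  sealed trait Mode
  case object Overwrite extends Mode
  case object Append extends Mode
  case object Ignore extends Mode
  case object ErrorIfExists extends Mode

  // Case-insensitive parsing, mirroring the match expression in mode(saveMode: String).
  def parse(saveMode: String): Mode = saveMode.toLowerCase match {
    case "overwrite" => Overwrite
    case "append" => Append
    case "ignore" => Ignore
    case "error" | "default" => ErrorIfExists
    case _ => throw new IllegalArgumentException(
      s"Unknown save mode: $saveMode. " +
        "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
  }

  def main(args: Array[String]): Unit = {
    println(parse("APPEND"))  // input is lower-cased before matching
    println(parse("default")) // "default" is accepted as an alias for "error"
  }
}
```

Because the input is lower-cased first, callers can pass the mode name in any case; an unrecognized name fails fast with an IllegalArgumentException.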
DataFrameWriter.save()
1. save writes the result to the path that is passed in.
/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
  this.extraOptions += ("path" -> path)
  save()
}
2. Tracing the no-argument save method:
/**
 * Saves the content of the [[DataFrame]] as the specified table.
 *
 * @since 1.4.0
 */
def save(): Unit = {
  ResolvedDataSource(
    df.sqlContext,
    source,
    partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
    mode,
    extraOptions.toMap,
    df)
}
3. Here source is initialized from defaultDataSourceName in SQLConf
private var source: String = df.sqlContext.conf.defaultDataSourceName
The default value of DEFAULT_DATA_SOURCE_NAME is parquet.
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
  defaultValue = Some("org.apache.spark.sql.parquet"),
  doc = "The default data source to use in input/output.")
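The way save(path), save() and the default source fit together can be sketched in plain Scala. `Conf` and `MiniWriter` below are hypothetical, simplified stand-ins for `SQLConf` and `DataFrameWriter`; nothing is written to storage, and save() just returns the source name and options that the real method would pass on to `ResolvedDataSource`.

```scala
object WriterSketch {
  // Hypothetical stand-in for SQLConf: falls back to the Parquet source when
  // spark.sql.sources.default is not set.
  final class Conf(settings: Map[String, String] = Map.empty) {
    def defaultDataSourceName: String =
      settings.getOrElse("spark.sql.sources.default", "org.apache.spark.sql.parquet")
  }

  // Hypothetical, simplified writer: only the state handling is modeled.
  final class MiniWriter(conf: Conf) {
    private var source: String = conf.defaultDataSourceName
    private val extraOptions = scala.collection.mutable.HashMap.empty[String, String]

    // An explicit format replaces the configured default and returns this for chaining.
    def format(source: String): MiniWriter = { this.source = source; this }

    // save(path) records the path as an option, then delegates to save().
    def save(path: String): (String, Map[String, String]) = {
      extraOptions += ("path" -> path)
      save()
    }

    // Returns the source name and options instead of resolving a data source.
    def save(): (String, Map[String, String]) = (source, extraOptions.toMap)
  }
}
```

With no call to `format`, the sketch resolves to the Parquet source, which matches the behavior described above: a plain `df.write.save(path)` produces Parquet unless `spark.sql.sources.default` is changed.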
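Selected functions in DataFrame.scala:
1. toDF: the no-argument version shown here simply returns the DataFrame itself. Converting an RDD to a DataFrame is done by the toDF that the SQLContext implicits make available on RDDs.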
/**
 * Returns the object itself.
 * @group basic
 * @since 1.3.0
 */
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = this
2. show(): displays the result
/**
 * Displays the [[DataFrame]] in a tabular form. For example:
 * {{{
 *   year  month AVG('Adj Close) MAX('Adj Close)
 *   1980  12    0.503218        0.595103
 *   1981  01    0.523289        0.570307
 *   1982  02    0.436504        0.475256
 *   1983  03    0.410516        0.442194
 *   1984  04    0.450090        0.483521
 * }}}
 * @param numRows Number of rows to show
 * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
 *              be truncated and all cells will be aligned right
 *
 * @group action
 * @since 1.5.0
 */
// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// scalastyle:on println
The beginning of the showString source is shown below. showString triggers an action (take) to collect the data.
/**
 * Compose the string representing rows for output
 * @param _numRows Number of rows to show
 * @param truncate Whether truncate long strings and align cells right
 */
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
  val numRows = _numRows.max(0)
  val sb = new StringBuilder
  val takeResult = take(numRows + 1)
  val hasMoreData = takeResult.length > numRows
  val data = takeResult.take(numRows)
  val numCols = schema.fieldNames.length
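The excerpt fetches numRows + 1 rows so that it can tell whether more data exists beyond what is displayed. A minimal sketch of that idea in plain Scala follows; `ShowSketch` and `firstRows` are hypothetical names, and the `take` function parameter stands in for `DataFrame.take`, which is the call that actually runs a job.

```scala
object ShowSketch {
  // Hypothetical helper: return at most numRows rows plus a flag saying whether
  // the source holds more rows than were requested.
  def firstRows[A](take: Int => Seq[A], _numRows: Int): (Seq[A], Boolean) = {
    val numRows = _numRows.max(0)                 // negative requests are treated as 0
    val takeResult = take(numRows + 1)            // ask for one extra row
    val hasMoreData = takeResult.length > numRows // the extra row arrived, so more data exists
    (takeResult.take(numRows), hasMoreData)
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq("a", "b", "c")
    println(firstRows[String](n => rows.take(n), 2))
    println(firstRows[String](n => rows.take(n), 5))
  }
}
```

Requesting one extra row costs almost nothing and avoids a separate count over the whole dataset just to decide whether the output was cut short.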