下面展示读取txt文件的例子,首先创建一个user.txt
a 23 b 24 c 25 d 26
现在我要将上面的这几行变成DataFrame,第一列表示姓名,第二列表示年龄,于是就可以像下面这样操作:
public class SqlTest2 { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder() .appName("sql") .master("local") .getOrCreate(); SparkContext sparkContext = sparkSession.sparkContext(); JavaSparkContext sc = new JavaSparkContext(sparkContext); JavaRDD<String> lines = sc.textFile("data/user.txt"); //将String类型转化为Row类型 JavaRDD<Row> rowJavaRDD = lines.map(new Function<String, Row>() { @Override public Row call(String v1) throws Exception { String[] split = v1.split(" "); return RowFactory.create( split[0], Integer.valueOf(split[1]) ); } }); //定义schema List<StructField> structFields = Arrays.asList( DataTypes.createStructField("name", DataTypes.StringType, true), DataTypes.createStructField("age", DataTypes.IntegerType, true) ); StructType structType = DataTypes.createStructType(structFields); //生成dataFrame Dataset<Row> dataFrame = sparkSession.createDataFrame(rowJavaRDD, structType); dataFrame.show(); } }
(四)通过JDBC创建DataFrame
通过JDBC可直接将对应数据库中的表放入Spark中进行一些处理,下面通过MySQL进行展示。
使用MySQL需要在依赖中引入MySQL的引擎:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.46</version> </dependency>
接着通过类似JDBC的方式读取MySQL数据:
public class SqlTest3 { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder() .appName("sql") .master("local") .getOrCreate(); Map<String,String> options = new HashMap<>(); options.put("url","jdbc:mysql://127.0.0.1:3306/books"); options.put("driver","com.mysql.jdbc.Driver"); options.put("user","root"); options.put("password","123456"); options.put("dbtable","book"); Dataset<Row> jdbc = sparkSession.read().format("jdbc").options(options).load(); jdbc.show(); sparkSession.close(); } }
读取到的数据是DataFrame,接下来的操作就是对DataFrame的操作了。
(五)总结
SparkSQL是对Spark原生RDD的增强,虽然很多功能通过RDD就可以实现,但是SparkSQL可以更加灵活地实现一些功能。
到此这篇关于SparkSQL快速入门教程的文章就介绍到这了,更多相关SparkSQL入门内容请搜索
更多SQL内容来自木庄网络博客