一、The Evolution of Spark SQL
Hive -> Shark -> Spark SQL
二、Spark SQL Dependency (Maven)
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.2</version>
</dependency>
三、Spark SQL DataFrame
A DataFrame is a distributed collection of data organized into named columns. It is conceptually similar to a table in a relational database, but with many optimizations under the hood. A DataFrame can be constructed from a wide range of sources, including structured data files, Hive tables, external relational databases, and existing RDDs.
四、Creating a SparkSession
In Spark 2.x, SparkSession replaces the old SQLContext as the unified entry point; the variable keeps the name sqlContext here.
import org.apache.spark.sql.SparkSession
val sqlContext = SparkSession.builder().appName("RDD2DataFrameReflection").master("local").getOrCreate()
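Importing the session's implicits enables the toDF() conversions used later in this article. A minimal sketch (the variable and column names are illustrative):
import sqlContext.implicits._ // enables Seq/RDD .toDF() conversions
val demoDF = Seq((1, "leo"), (2, "jack")).toDF("id", "username") // illustrative data and columns
demoDF.show()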
五、Creating a DataFrame
// Read the user table from MySQL over JDBC
import java.util.Properties
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "root")
val testDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/test", "user", properties)
// Equivalent read via the generic DataFrameReader; note that setting "dbtable" twice
// would let the second value override the first, so only one is active here
val testDF2 = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8")
  .option("user", "root")
  .option("password", "root")
  .option("dbtable", "user")
  // .option("dbtable", "(select id from user where id < 50) as t") // a SQL statement must be passed as an aliased subquery
  .load()
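Reading from MySQL also requires the JDBC driver on the classpath; a typical Maven coordinate looks like the following (the version is an assumption, pick one matching your MySQL server):
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.40</version>
</dependency>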
六、DataFrame Operations
// Basic queries
testDF.show()                                               // print the first rows
testDF.printSchema()                                        // print the schema in tree form
testDF.select("username").show()                            // project a single column
testDF.select(testDF("username"), testDF("id") + 1).show()  // column expression
testDF.filter(testDF("id") % 2 === 0).show()                // keep rows with an even id
testDF.groupBy("sex").count().show()                        // count rows per sex
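The same operations can be expressed in SQL by first registering the DataFrame as a temporary view. A sketch (the view name "users" is arbitrary):
// Sketch: the grouped count above, expressed in SQL
testDF.createOrReplaceTempView("users")
sqlContext.sql("select sex, count(*) as cnt from users group by sex").show()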
七、Converting an RDD to a DataFrame
1、Use reflection to infer the schema of an RDD containing a specific data type
// Schema class: field names map to the table's column names
// (define the case class at top level, outside the method that uses it)
case class Student(id: Int, name: String, age: Int)
// Convert the RDD to a DataFrame with RDD.toDF(); expects lines like "1,leo,17"
// (sc is the SparkContext, i.e. sqlContext.sparkContext)
import sqlContext.implicits._
val studentDF = sc.textFile("data/students.txt", 1)
  .map { line => line.split(",") }
  .map { arr => Student(arr(0).trim().toInt, arr(1).trim(), arr(2).trim().toInt) }
  .toDF()
// Alternatively, map to tuples and name the columns explicitly:
//.map { arr => (arr(0).trim().toInt, arr(1).trim(), arr(2).trim().toInt) }
//.toDF("id", "name", "age")
// Write the DataFrame back to MySQL, appending to the student table
import org.apache.spark.sql.SaveMode
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "root")
studentDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8", "student", properties)
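A quick way to verify the write is to read the table back; a sketch reusing the same connection properties:
// Sketch: read the student table back to confirm the appended rows
val writtenDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/test", "student", properties)
writtenDF.show()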
2、Convert an RDD to a DataFrame programmatically using Row
// Step 1: build an RDD of Rows (split each line only once)
import org.apache.spark.sql.Row
val studentRDD = sc.textFile("data/students.txt", 1)
  .map { line =>
    val arr = line.split(",")
    Row(arr(0).toInt, arr(1), arr(2).toInt)
  }
// Step 2: build the schema (StructType) describing the Row fields
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val structType = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))
// Step 3: combine the Row RDD and the schema into a DataFrame
val studentDF = sqlContext.createDataFrame(studentRDD, structType)
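The resulting DataFrame behaves exactly like the reflection-based one. For example (a sketch; the view name is arbitrary):
// Sketch: query the programmatically built DataFrame via SQL
studentDF.createOrReplaceTempView("students")
sqlContext.sql("select id, name, age from students where age <= 18").show()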
八、Register Each Data Source as a Temporary View and Query the Views with a Single SQL Statement
// Register each JSON source as a temporary view
val studentScoreDF = sqlContext.read.json("data/student_score.json")
studentScoreDF.createOrReplaceTempView("student_score")
val studentInfoDF = sqlContext.read.json("data/student_info.json")
studentInfoDF.createOrReplaceTempView("student_info")
// Join the two views in one SQL statement
val goodStudentInfoDF = sqlContext.sql("select s.name, s.score, i.gender from student_score s, student_info i where s.score > 80 and s.name = i.name")
goodStudentInfoDF.rdd.collect().foreach(row => println(row(0) + " " + row(1) + " " + row(2)))
goodStudentInfoDF.write.json("data/result/goodstudents") // fails if the path exists; add .mode(SaveMode.Overwrite) to replace it
九、Spark SQL UDFs (User-Defined Functions)
UDF: User-Defined Function, applied to one row at a time.
UDAF: User-Defined Aggregate Function, aggregates values across multiple rows.
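A minimal sketch of registering and using a UDF; the function name and logic are illustrative, and it assumes the "users" view registered in section 六's sketch:
// Sketch: register a UDF that computes string length, then call it from SQL
sqlContext.udf.register("strLen", (s: String) => s.length)
sqlContext.sql("select username, strLen(username) as name_len from users").show()
UDAFs are more involved: they extend org.apache.spark.sql.expressions.UserDefinedAggregateFunction and maintain an aggregation buffer across rows.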