DataFrame
-
Dataframe 是什么
DataFrame
是SparkSQL
中一个表示关系型数据库中表
的函数式抽象, 其作用是让Spark
处理大规模结构化数据的时候更加容易. 一般DataFrame
可以处理结构化的数据, 或者是半结构化的数据, 因为这两类数据中都可以获取到Schema
信息. 也就是说DataFrame
中有Schema
信息, 可以像操作表一样操作DataFrame
.DataFrame
由两部分构成, 一是row
的集合, 每个row
对象表示一个行, 二是描述DataFrame
结构的Schema
DataFrame
支持SQL
中常见的操作, 例如:select
,filter
,join
,group
,sort
,join
等- code
@Testdef dataframe1(): Unit = {// 1. 创建 SparkSession 对象val spark = SparkSession.builder().master("local[6]").appName("dataframe1").getOrCreate()// 2. 创建 DataFrameimport spark.implicits._val dataFrame: DataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF()// 3. 看看 DataFrame 可以玩出什么什么花样// select name from t where t.age >10dataFrame.where('age > 10).select( 'name).show()}case class Person(name: String, age: Int)
-
DataFrame 如何创建
DataFrame如何创建数据集【BeijingPM20100101_20151231_noheader.rar】
@Testdef dataframe2():Unit = {val spark = SparkSession.builder().master("local[6]").appName("dataframe2").getOrCreate()import spark.implicits._val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))// 创建 DataFrame 的方法// 1.toDFval df1 = personList.toDF()val df2 = spark.sparkContext.parallelize(personList).toDF() // RDD.toDf()// 2. createDatFrameval df3 = spark.createDataset(personList)// 3. read val df4 = spark.read.csv("./dataset/BeijingPM20100101_20151231_noheader.csv")df4.show()}case class Person(name: String, age: Int)
-
DataFrame 操作 (案例)
DataFrame操作数据集[BeijingPM20100101_20151231.rar]
需求:查看 PM_Dongsi 每个月的统计数量
object DataFrameTest {def main(args: Array[String]): Unit = {// 1. 创建SparkSessionval spark = SparkSession.builder().master("local[6]").appName("pm_analysis").getOrCreate()import spark.implicits._// 2. 读取数据集val sourceDF = spark.read.option("header",true) // 把表头读取出来.csv("./dataset/BeijingPM20100101_20151231.csv")//sourceDF.show()//查看DataFrame 的 schema 信息,要意识到 DataFrame 中是有结构信息的,叫做SchemasourceDF.printSchema()// 3. 处理// 1. 选择列// 2. 过滤 NA 的 PM记录// 3. 分组 select year, month, count(PM_Dongsi) from .. where PM_Dongsi != NA group by year, month// 4. 聚合// 4. 得出结论sourceDF.select('year,'month,'PM_Dongsi).where('PM_Dongsi =!= "NA") // 过滤 NA 的 PM记录.groupBy('year,'month).count().show() // action// 是否能支持使用 SQL 语句进行查询println("---------接下来是SQL语句查询的--------------")// 1. 将 DataFrame 注册为临时表sourceDF.createOrReplaceTempView("pm")// 2. 执行查询val resultDF = spark.sql("select year, month, count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year,month")resultDF.show()spark.stop()} }
总结
- DataFrame 是一个类似于关系型数据库表的函数式组件
- DataFrame 一般处理结构化数据和半结构化数据
- DataFrame 具有数据对象的 Schema 信息
- 可以使用命令式的 API 操作 DataFrame, 同时也可以使用 SQL 操作 DataFrame
- DataFrame 可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建
小Tips
一般处理数据都差不多是ETL这个步骤
- E -> 抽取
- T -> 处理转换
- L -> 装载,落地
Spark代码编写的套路:
- 创建DataFrame Dataset RDD,制造或者读取数据
- 通过DataFrame Dataset RDD的API来进行数据处理
- 通过DataFrame Dataset RDD进行数据落地