Spark SQL编程

1. Spark SQL概述

1.1 什么是Spark SQL

Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方式有多种,包括SQL和Dataset API。计算结果时,使用相同的执行引擎,与您用于表达计算的API/语言无关。

1.2 为什么要有Spark SQL

1.3 SparkSQL的发展

1)发展历史

RDD(Spark1.0)=》Dataframe(Spark1.3)=》Dataset(Spark1.6)

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同的他们的执行效率和执行方式在现在的版本中,dataSet性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看做是特殊泛型的DataSet<Row>。

2)三者的共性

(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利

(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算

(3)三者有许多共同的函数,如filter,排序等

(4)三者都会根据Spark的内存情况自动缓存运算

(5)三者都有分区的概念

1.4 Spark SQL的特点

1)易整合

无缝的整合了SQL查询和Spark编程。

2)统一的数据访问方式

使用相同的方式连接不同的数据源。

3)兼容Hive

在已有的仓库上直接运行SQL或者HQL。

4)标准的数据连接

通过JDBC或者ODBC来连接

1.5 SparkSession新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:

  • 一个叫SQLContext,用于Spark自己提供的SQL查询;
  • 一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

[atguigu@hadoop102 spark-local]$ bin/spark-shell

20/09/12 11:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Spark context Web UI available at http://hadoop102:4040

Spark context available as 'sc' (master = local[*], app id = local-1599880621394).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1

      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)

Type in expressions to have them evaluated.

Type :help for more information.

2 常用方式

2.1 方法调用

1)创建一个maven工程SparkSQL

2)创建包名为com.atguigu.sparksql

3)输入文件夹准备:在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:

{"age":20,"name":"qiaofeng"}{"age":19,"name":"xuzhu"}{"age":18,"name":"duanyu"}

{"age":22,"name":"qiaofeng"}

{"age":11,"name":"xuzhu"}

{"age":12,"name":"duanyu"}

5)在pom.xml文件中添加spark-sql的依赖

<dependencies>

    <dependency>

       <groupId>org.apache.spark</groupId>

       <artifactId>spark-sql_2.12</artifactId>

       <version>3.3.1</version>

    </dependency>

    <dependency>

       <groupId>org.projectlombok</groupId>

       <artifactId>lombok</artifactId>

       <version>1.18.22</version>

    </dependency>

</dependencies>

6)代码实现

添加javaBean的User

package com.atguigu.sparksql.Bean;

import lombok.Data;

import java.io.Serializable;

@Data

public class User implements Serializable {

    public Long age;

    public String name;

    public User() {

    }

    public User(Long age, String name) {

        this.age = age;

        this.name = name;

    }

}

代码编写

package com.atguigu.sparksql;

import com.atguigu.sparksql.Bean.User;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.function.MapFunction;

import org.apache.spark.api.java.function.ReduceFunction;

import org.apache.spark.sql.*;

import scala.Tuple2;

public class Test01_Method {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        // 按照行读取

        Dataset<Row> lineDS = spark.read().json("input/user.json");

        // 转换为类和对象

        Dataset<User> userDS = lineDS.as(Encoders.bean(User.class));

//        userDS.show();

        // 使用方法操作

        // 函数式的方法

        Dataset<User> userDataset = lineDS.map(new MapFunction<Row, User>() {

            @Override

            public User call(Row value) throws Exception {

                return new User(value.getLong(0), value.getString(1));

            }

        },

                // 使用kryo在底层会有部分算子无法使用

                Encoders.bean(User.class));

        // 常规方法

        Dataset<User> sortDS = userDataset.sort(new Column("age"));

        sortDS.show();

        // 区分

        RelationalGroupedDataset groupByDS = userDataset.groupBy("name");

        // 后续方法不同

        Dataset<Row> count = groupByDS.count();

        // 推荐使用函数式的方法  使用更灵活

        KeyValueGroupedDataset<String, User> groupedDataset = userDataset.groupByKey(new MapFunction<User, String>() {

            @Override

            public String call(User value) throws Exception {

                return value.name;

            }

        }, Encoders.STRING());

        // 聚合算子都是从groupByKey开始

        // 推荐使用reduceGroup

        Dataset<Tuple2<String, User>> result = groupedDataset.reduceGroups(new ReduceFunction<User>() {

            @Override

            public User call(User v1, User v2) throws Exception {

                // 取用户的大年龄

                return new User(Math.max(v1.age, v2.age), v1.name);

            }

        });

        result.show();

        //4. 关闭sparkSession

        spark.close();

    }

}

在sparkSql中DS直接支持的转换算子有:map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。

2.2 SQL使用方式

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

public class Test02_SQL {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        Dataset<Row> lineDS = spark.read().json("input/user.json");

        // 创建视图 => 转换为表格 填写表名

        // 临时视图的生命周期和当前的sparkSession绑定

        // orReplace表示覆盖之前相同名称的视图

        lineDS.createOrReplaceTempView("t1");

        // 支持所有的hive sql语法,并且会使用spark的又花钱

        Dataset<Row> result = spark.sql("select * from t1 where age > 18");

        result.show();

        //4. 关闭sparkSession

        spark.close();

    }

}}

2.3 DSL特殊语法(扩展)

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;

public class Test03_DSL {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        // 导入特殊的依赖 import static org.apache.spark.sql.functions.col;

        Dataset<Row> lineRDD = spark.read().json("input/user.json");

        Dataset<Row> result = lineRDD.select(col("name").as("newName"),col("age").plus(1).as("newAge"))

                .filter(col("age").gt(18));

        result.show();

        //4. 关闭sparkSession

        spark.close();

    }

}

3 SQL语法的用户自定义函数

3.1 UDF 用户自定义函数

1)UDF:一行进入,一行出

2)代码实现

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.api.java.UDF1;

import org.apache.spark.sql.expressions.UserDefinedFunction;

import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.udf;

public class Test04_UDF {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        Dataset<Row> lineRDD = spark.read().json("input/user.json");

        lineRDD.createOrReplaceTempView("user");

        // 定义一个函数

        // 需要首先导入依赖import static org.apache.spark.sql.functions.udf;

        UserDefinedFunction addName = udf(new UDF1<String, String>() {

            @Override

            public String call(String s) throws Exception {

                return s + " 大侠";

            }

        }, DataTypes.StringType);

        spark.udf().register("addName",addName);

        spark.sql("select addName(name) newName from user")

                .show();

        // lambda表达式写法

        spark.udf().register("addName1",(UDF1<String,String>) name -> name + " 大侠",DataTypes.StringType);

        //4. 关闭sparkSession

        spark.close();

    }

}

3.2 UDAF 用户自定义聚合函数

1)UDAF:输入多行,返回一行。通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有的数据合并在一起。

2)Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。

3)Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame

4)案例实操

需求:实现求平均年龄,自定义UDAFMyAvg(age)

(1)自定义聚合函数实现-强类型

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Encoder;

import org.apache.spark.sql.Encoders;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.expressions.Aggregator;

import java.io.Serializable;

import static org.apache.spark.sql.functions.udaf;

public class Test05_UDAF {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        spark.read().json("input/user.json").createOrReplaceTempView("user");

        // 注册需要导入依赖 import static org.apache.spark.sql.functions.udaf;

        spark.udf().register("avgAge",udaf(new MyAvg(),Encoders.LONG()));

        spark.sql("select avgAge(age) newAge from user").show();

        //4. 关闭sparkSession

        spark.close();

    }

    public static class Buffer implements Serializable {

        private Long sum;

        private Long count;

        public Buffer() {

        }

        public Buffer(Long sum, Long count) {

            this.sum = sum;

            this.count = count;

        }

        public Long getSum() {

            return sum;

        }

        public void setSum(Long sum) {

            this.sum = sum;

        }

        public Long getCount() {

            return count;

        }

        public void setCount(Long count) {

            this.count = count;

        }

    }

    public static class MyAvg extends Aggregator<Long,Buffer,Double>{

        @Override

        public Buffer zero() {

            return new Buffer(0L,0L);

        }

        @Override

        public Buffer reduce(Buffer b, Long a) {

            b.setSum(b.getSum() + a);

            b.setCount(b.getCount() + 1);

            return b;

        }

        @Override

        public Buffer merge(Buffer b1, Buffer b2) {

            b1.setSum(b1.getSum() + b2.getSum());

            b1.setCount(b1.getCount() + b2.getCount());

            return b1;

        }

        @Override

        public Double finish(Buffer reduction) {

            return reduction.getSum().doubleValue() / reduction.getCount();

        }

        @Override

        public Encoder<Buffer> bufferEncoder() {

            // 可以用kryo进行优化

            return Encoders.kryo(Buffer.class);

        }

        @Override

        public Encoder<Double> outputEncoder() {

            return Encoders.DOUBLE();

        }

    }

}

3.3 UDTF(没有)

输入一行,返回多行(Hive)

SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/192279.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文讲清生产质量场景的数据分析思路及案例实战

今天&#xff0c;顺着制造业数据分析这个大主题&#xff0c;我们来讲讲质量管理数据分析。   说起质量管理&#xff0c;就是对所生产的产品质量进行管理&#xff0c;其最终目的就是保证客户收到的产品质量&#xff0c;提高客户满意度&#xff0c;减少退货和维修的数量。质量管…

Netty Review - 从BIO到NIO的进化推演

文章目录 BIODEMO 1DEMO 2小结论单线程BIO的缺陷BIO如何处理并发多线程BIO服务器的弊端 NIONIO要解决的问题模拟NIO方案一&#xff1a; &#xff08;等待连接时和等待数据时不阻塞&#xff09;方案二&#xff08;缓存Socket&#xff0c;轮询数据是否准备好&#xff09;方案二存…

【计算机网络笔记】CIDR与路由聚合

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

日历应用程序 BusyCal mac中文版软件特点

BusyCal mac是一款日历应用程序&#xff0c;它可以帮助用户轻松地管理日程安排、事件提醒、会议安排等。BusyCal 支持 macOS 和 iOS 平台&#xff0c;并且可以与 iCloud、Google 日历、Exchange 等多种日历服务进行同步。 BusyCal mac软件特点 强大的日历功能&#xff1a;Busy…

单链表经典OJ题(三)

目录 1、反转链表 2、合并两个有序链表 3、链表的中间结点 4、环形链表的约瑟夫问题 5、移除链表元素 6、移除元素 1、反转链表 206. 反转链表 - 力扣&#xff08;LeetCode&#xff09; 翻转链表的实质就是更改当前结点的前驱结点和后继结点 假设原链表为:1->2->…

【nlp】2.4 GRU模型

GRU模型 1 GRU介绍2 GRU的内部结构图2.1 GRU结构分析2.2 Bi-GRU介绍2.3 使用Pytorch构建GRU模型2.4 GRU优缺点3 RNN及其变体1 GRU介绍 GRU(Gated Recurrent Unit)也称门控循环单元结构, 它也是传统RNN的变体, 同LSTM一样能够有效捕捉长序列之间的语义关联, 缓解梯度消失或爆…

Day29力扣打卡

打卡记录 美丽塔 II&#xff08;前后缀分解 单调栈&#xff09; 链接 大佬的题解 class Solution:def maximumSumOfHeights(self, a: List[int]) -> int:n len(a)suf [0] * (n 1)st [n] # 哨兵s 0for i in range(n - 1, -1, -1):x a[i]while len(st) > 1 and …

​TechSmith Camtasia 2024破解版功能介绍及使用教程

在现在的网络互联网时代&#xff0c;越来越多的人走上了自媒体的道路。有些自媒体人会自己在网络上录制精彩视频&#xff0c;也有一些人会将精彩、热门的电影剪辑出来再加上自己给它的配音&#xff0c;做成大家喜欢看的电影剪辑片段。相信不管大家是自己平时有独特的爱好也好、…

Django(五、视图层)

文章目录 一、视图层1.视图函数返回值的问题2.三板斧的使用结论&#xff1a;在视图文件中写视图函数的时候不能没有返回值&#xff0c;默认返回的是None&#xff0c;但是页面上会报错&#xff0c;用来处理请求的视图函数都必须返回httpResponse对象。 二、JsonReponse序列化类的…

[单片机课程设计报告汇总] 单片机设计报告常用硬件元器件描述

[单片机课程设计必看] 单片机设计报告常用描述 硬件设计 AT89C51最小系统 AT89C51是美国ATMEL公司生产的低电压&#xff0c;高性能CMOS16位单片机&#xff0c;片内含4k bytes的可反复擦写的只读程序存储器和128 bytes的随机存取数据存储器&#xff0c;期间采用ATMEL公司的高…

FreeRTOS源码阅读笔记3--queue.c

消息队列可以应用于发送不定长消息的场合&#xff0c;包括任务与任务间的消息交换&#xff0c;队列是 FreeRTOS 主要的任务间通讯方式&#xff0c;可以在任务与任务间、中断和任务间传送信息&#xff0c;发送到 队列的消息是通过拷贝方式实现的&#xff0c;这意味着队列存储…

Python入门:一文详解Python列表(List)操作方法

文章目录 前言一、创建一个列表二、访问列表中的值三、更新列表四、删除列表元素六、Python列表截取七、Python列表操作的函数和方法关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②…

部署百川大语言模型Baichuan2

Baichuan2是百川智能推出的新一代开源大语言模型&#xff0c;采用 2.6 万亿 Tokens 的高质量语料训练。在多个权威的中文、英文和多语言的通用、领域 benchmark 上取得同尺寸最佳的效果。包含有 7B、13B 的 Base 和 Chat 版本&#xff0c;并提供了 Chat 版本的 4bits 量化。 模…

网络运维Day16

文章目录 Docker简介什么是容器命名空间&#xff1a; Docker 的优缺点 Docker安装Docker镜像管理什么是镜像镜像管理 Docker容器管理运行容器容器启动、停止、重启拷贝文件进入容器容器与应用 DockerfileDockerfile 语法案例 总结 Docker简介 什么是容器 容器是用来装东西的&a…

Python-Python高阶技巧:HTTP协议、静态Web服务器程序开发、循环接收客户端的连接请求

版本说明 当前版本号[20231114]。 版本修改说明20231114初版 目录 文章目录 版本说明目录HTTP协议1、网址1.1 网址的概念1.2 URL的组成1.3 知识要点 2、HTTP协议的介绍2.1 HTTP协议的概念及作用2.2 HTTP协议的概念及作用2.3 浏览器访问Web服务器的过程 3、HTTP请求报文3.1 H…

红队专题-从零开始VC++C/S远程控制软件RAT-MFC-超级终端

红队专题 招募六边形战士队员[16]超级终端(1)消息 宏的定义映射cmdshell.cpp重载 构造函数Onsize 随窗口大小事件回车键发送命令添加字符转换类 StringToTransform [17]超级终端(2)接受命令创建m_cmd c类发送 接收客户端远端进程关闭 招募六边形战士队员 一起学习 代码审计、安…

景联文科技:驾驭数据浪潮,赋能AI产业——全球领先的数据标注解决方案供应商

根据IDC相关数据统计&#xff0c;全球数据量正在经历爆炸式增长&#xff0c;预计将从2016年的16.1ZB猛增至2025年的163ZB&#xff0c;其中大部分是非结构化数据&#xff0c;被直接利用&#xff0c;必须通过数据标注转化为AI可识别的格式&#xff0c;才能最大限度地发挥其应用价…

网络运维Day17

文章目录 什么是数据库MySQL介绍实验环境准备构建MySQL服务连接数据库修改root密码 数据库基础常用的SQL命令分类SQL命令使用规则MySQL基本操作创建库创建表查看表结构 记录管理命令 数据类型数值类型 数据类型日期时间类型时间函数案例枚举类型 约束条件案例修改表结构添加新字…

C++二分查找算法:最大为 N 的数字组合

涉及知识点 二分查找 数学 题目 给定一个按 非递减顺序 排列的数字数组 digits 。你可以用任意次数 digits[i] 来写的数字。例如&#xff0c;如果 digits [‘1’,‘3’,‘5’]&#xff0c;我们可以写数字&#xff0c;如 ‘13’, ‘551’, 和 ‘1351315’。 返回 可以生成的…

基于群居蜘蛛算法优化概率神经网络PNN的分类预测 - 附代码

基于群居蜘蛛算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于群居蜘蛛算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于群居蜘蛛优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神…