SparkSql读取数据的方式

一、读取普通文件 

方式一:给定读取数据源的类型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接调用对应数据源类型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

1、代码演示最普通的文件读取方式: 

from pyspark.sql import SparkSession
import osif __name__ == '__main__':# 构建环境变量# 配置环境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取sparkSession对象spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config("spark.sql.shuffle.partitions", 2).getOrCreate()df01 = spark.read.json("../../datas/resources/people.json")df01.printSchema()df02 = spark.read.format("json").load("../../datas/resources/people.json")df02.printSchema()df03 = spark.read.parquet("../../datas/resources/users.parquet")df03.printSchema()#spark.read.orc("")df04 = spark.read.format("orc").load("../../datas/resources/users.orc")df04.printSchema()df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")df05.printSchema()df06 = spark.read.load(path="../../datas/resources/people.csv",format="csv",sep=";")df06.printSchema()spark.stop()

二、 通过jdbc读取数据库数据

先在本地数据库或者linux数据库中插入一张表:

CREATE TABLE `emp`  (`empno` int(11) NULL DEFAULT NULL,`ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`mgr` int(11) NULL DEFAULT NULL,`hiredate` date NULL DEFAULT NULL,`sal` decimal(7, 2) NULL DEFAULT NULL,`comm` decimal(7, 2) NULL DEFAULT NULL,`deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);

dept的数据:

CREATE TABLE `dept`  (`deptno` int(11) NULL DEFAULT NULL,`dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');

查询时会报如下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driverat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)

接着放驱动程序: 

Python环境放入MySQL连接驱动

  • 找到工程中pyspark库包所在的环境,将驱动包放入环境所在的jars目录中
  • 如果是Linux上:注意集群模式所有节点都要放。

第一种情况:

假如你是windows环境:

我的最终的路径是在这里:

第二种情况:linux环境下,按照如下方式进行

# 进入目录
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars# 上传jar包:mysql-connector-java-5.1.32.jar

代码演示: 

import osfrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongTypeif __name__ == '__main__':# 获取sparkSession对象# 设置 任务的环境变量os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 得到sparkSession对象spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()dict = {"user":"root","password":"root"}jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)jdbcDf.show()# jdbc的另一种写法jdbcDf2 = spark.read.format("jdbc") \.option("driver", "com.mysql.cj.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark") \.option("dbtable", "spark.dept") \.option("user", "root") \.option("password", "root").load()jdbcDf2.show()# 关闭spark.stop()

三、 读取table中的数据【hive】

  • 场景:Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
    • 存储:数据仓库:Hive:将HDFS文件映射成表
    • 计算:计算引擎:SparkSQL、Impala、Presto:对Hive中的数据表进行处理
  • 问题:SparkSQL怎么能访问到Hive中有哪些表,以及如何知道Hive中表对应的

1)集群环境操作hive 

需要启动的服务: 

先退出base环境:conda deactivate
启动服务:
启动hdfs:  start-dfs.sh  因为hive的数据在那里存储着
启动yarn:  start-yarn.sh 因为spark是根据yarn部署的,假如你的spark是standalone模式,不需要启动yarn.
日志服务也需要启动一下:
mapred --daemon start historyserver
# 启动Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
启动metastore服务: 因为sparkSQL需要知道表结构,和表数据的位置
hive-server-manager.sh start metastore
启动spark服务: 啥服务也没有了,已经启动完了。
查看metastore服务:
hive-server-manager.sh status metastore

修改配置: 

cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml在这个文件中,编写如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration><property><name>hive.metastore.uris</name><value>thrift://bigdata01:9083</value></property>
</configuration>接着将该文件进行分发:
xsync.sh hive-site.xml

操作sparkSQL: 

/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2

此处的pyspark更像是一个客户端,里面可以通过python编写spark代码而已。而我们以前安装的pyspark更像是spark的python运行环境。

进入后,通过内置对象spark:

>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
|     yhdb|
+---------+>>> spark.sql("select * from yhdb.student").show()
+---+------+                                                                    
|sid| sname|
+---+------+
|  1|laoyan|
|  1|廉德枫|
|  2|  刘浩|
|  3|  王鑫|
|  4|  司翔|
+---+------+

2)开发环境如何编写代码,操作hive:

代码实战:

from pyspark.sql import SparkSession
import osif __name__ == '__main__':# 构建环境变量# 配置环境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 防止在本地操作hdfs的时候,出现权限问题os.environ['HADOOP_USER_NAME'] = 'root'# 获取sparkSession对象spark = SparkSession \.builder \.appName("HiveAPP") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()spark.sql("select * from yhdb.student").show()spark.read.table("yhdb.student").show()spark.stop()

不要在一个python 文件中,创建两个不同的sparkSession对象,否则对于sparksql获取hive的元数据,有影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/468318.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux相关概念和易错知识点(19)(HDD、Block group)

目录 1.HDD &#xff08;1&#xff09;HDD存储描述 &#xff08;2&#xff09;HDD结构图 &#xff08;3&#xff09;磁盘管理的分治思想 &#xff08;4&#xff09;硬盘中文件系统的整体划分图 2.Block group &#xff08;1&#xff09;文件管理 ①文件属性的存储 ②in…

IDEA构建JavaWeb项目,并通过Tomcat成功运行

目录 一、Tomcat简介 二、Tomcat安装步骤 1.选择分支下载 2.点击下载zip安装包 3.解压到没有中文、空格和特殊字符的目录下 4.双击bin目录下的startup.bat脚本启动Tomcat 5.浏览器访问Tomcat 6.关闭Tomcat服务器 三、Tomcat目录介绍 四、WEB项目的标准结构 五、WEB…

【C#】选课程序增加、删除统计学时

文章目录 【例6-2】编写选课程序。利用利用列表框和组合框增加和删除相关课程&#xff0c;并统计学时数1. 表6-2 属性设置2. 设计窗体及页面3. 代码实现4. 运行效果 【例6-2】编写选课程序。利用利用列表框和组合框增加和删除相关课程&#xff0c;并统计学时数 分析&#xff1…

Sigrity SPEED2000 Power Ground Noise Simulation模式如何进行电源地噪声分析操作指导-SODIMM

Sigrity SPEED2000 Power Ground Noise Simulation模式如何进行电源地噪声分析操作指导-SODIMM Sigrity Speed2000是时域仿真分析工具&#xff0c;Power Ground Noise Simulation模式可以观测器件的时域电压波形和观测电源地空间电压分布&#xff0c; 以下图为例进行分析 用Sp…

【CLIP系列】开篇

在多模态学习领域&#xff0c;CLIP无疑是一项具有里程碑意义的工作&#xff0c;自发布以来便引发了广泛关注。其在视觉-语言基础模型中的影响力极为深远&#xff0c;截至目前&#xff0c;该研究的引用量已突破23,000次&#xff0c;充分体现了其在学术界和工业界的重要地位。 为…

dell服务器安装ESXI8

1.下载镜像在官网 2.打开ipmi&#xff08;idrac&#xff09;&#xff0c;将esxi镜像挂载&#xff0c;然后服务器开机 3.进入bios设置cpu虚拟化开启&#xff0c;进入boot设置启动选项为映像方式 4..进入安装引导界面3.加载完配置进入安装 系统提示点击继 5.选择安装磁盘进行…

深度学习-神经网络基础-激活函数与参数初始化(weight, bias)

一. 神经网络介绍 神经网络概念 神经元构建 神经网络 人工神经网络是一种模仿生物神经网络结构和功能的计算模型, 由神经元构成 将神经元串联起来 -> 神经网络 输入层: 数据 输出层: 目标(加权和) 隐藏层: 加权和 激活 全连接 第N层的每个神经元和第N-1层的所有神经元…

栈(Stack)和队列(Deque、Queue)

文章目录 一、栈1.1 栈 VS 虚拟机栈 VS 栈帧1.2 数据结构 -- 栈介绍1.3 用数组模拟实现栈1.4 栈的功能&#xff1a;逆序打印 二、队列2.1 数据结果 -- 队列介绍2.2 用单链表模拟实现Queue队列 一、栈 1.1 栈 VS 虚拟机栈 VS 栈帧 区别&#xff1a; 栈&#xff1a;是一种数据结…

Spring Boot2.0之九 使用EasyExcel导出Excel

前言 SpringBoot项目实现Excel文件导出功能&#xff0c;可以使用alibaba开源项目EasyExcel实现。默认导出的Excel表头为宋体14加粗&#xff0c;表内容为宋体11。 一、引入EasyExcel依赖 <dependency><groupId>com.alibaba</groupId><artifactId>eas…

Java关于暴力破解MD5加密字符串示例

最近看到一个系统的用户密码直接就是用MD5加密的方式存在数据库的&#xff0c;而且也没有加盐&#xff0c;顿时有些好奇&#xff0c;因为一直听说MD5加密不够安全&#xff0c;很容易碰撞攻击&#xff0c;但是这个容易是有多容易&#xff0c;如果要破解一个MD5加密的密码大概要多…

我国成功发射航天宏图PIESAT-2 01~04星

11月9日11时39分&#xff0c;我国在酒泉卫星发射中心使用长征二号丙运载火箭&#xff0c;成功将航天宏图PIESAT-2 01&#xff5e;04星发射升空&#xff0c;卫星顺利进入预定轨道&#xff0c;发射任务获得圆满成功[1]。 航天宏图信息技术股份有限公司&#xff08;以下简称“航天…

三十四、VB基本知识与提高篇

一、代码编写规则: (一)标识符的使用规则: 标识符有两种:一种是系统关键字,另一种是自己定义标识符。 1、不能与系统关键字相同。 2、同一作用域(块)中不同出现重名标识符。用户自定义的标识符是不区分大小写的。 3、自定义标识符必须以字母开头,长度不能超过255…

[代码随想录打卡Day8] 344.反转字符串 541. 反转字符串II 54. 替换数字

反转字符串 难度&#xff1a;易。 问题描述&#xff1a;编写一个函数&#xff0c;其作用是将输入的字符串反转过来。输入字符串以字符数组 s 的形式给出。不要给另外的数组分配额外的空间&#xff0c;你必须原地修改输入数组、使用 O(1) 的额外空间解决这一问题。 这个就是开头…

【双十一特惠】腾讯云省钱攻略:如何智取云计算资源

前言 双十一不仅是购物的狂欢节&#xff0c;对于云计算用户来说&#xff0c;更是一个节省成本的绝佳时机。腾讯云&#xff0c;作为国内领先的云计算服务商&#xff0c;每年双十一都会推出一系列优惠活动。本文将为您揭开如何在这个购物节中&#xff0c;最大化利用腾讯云的优惠…

mean_x2 = (x**2).mean(dim=dims, keepdims=True)

这行代码的作用是计算输入张量 x 在指定维度上的平方均值&#xff0c;并保持原始维度的形状。具体来说&#xff1a; mean_x2 (x**2).mean(dimdims, keepdimsTrue) # [b,1,1] 参数解释 x**2&#xff1a;对输入张量 x 的每个元素进行平方运算。.mean(dimdims, keepdimsTrue)…

如何在 Android 上增加 SELinux 权限

SELinux&#xff08;Security-Enhanced Linux&#xff09;是一种强制访问控制&#xff08;MAC&#xff09;机制&#xff0c;它为 Android 系统提供了额外的安全层。通过 SELinux&#xff0c;系统管理员可以定义细粒度的安全策略&#xff0c;限制进程对文件、网络和其他资源的访…

数字化转型实践:金蝶云星空与钉钉集成提升企业运营效率

数字化转型实践&#xff1a;金蝶云星空与钉钉集成提升企业运营效率 本文介绍了深圳一家电子设备制造企业在数字化转型过程中&#xff0c;如何通过金蝶云星空与钉钉的高效集成应对挑战、实施解决方案&#xff0c;并取得显著成果。集成项目在提高沟通效率、自动化审批流程和监控异…

『事善能』MySQL基础 — 2.MySQL 5.7安装(一)

1、通过msi安装软件进行MySQL安装 &#xff08;1&#xff09;点击运行MySQL安装文件 &#xff08;2&#xff09;选择安装类型 我们选择自定义安装&#xff0c;点击Next。 说明 Develop Default&#xff1a;默认开发类型&#xff0c;安装MySQL服务器以及开发MySQL应用所需要的工…

DICOM图像知识:DICOM图像排序与坐标系解析

目录 引言 1. 概述 2. DICOM图像排序规则 2.1 Patient的Study按Study Date排序 2.2 Study的Series按Series Number排序 2.3 Series的SOP按Instance Number或Slice Location排序 2.3.1 Instance Number排序 2.3.2 Slice Location排序 2.3.3 使用Image Position (Patien…

uniapp—android原生插件开发(2原生插件开发)

本篇文章从实战角度出发&#xff0c;将UniApp集成新大陆PDA设备RFID的全过程分为四部曲&#xff0c;涵盖环境搭建、插件开发、AAR打包、项目引入和功能调试。通过这份教程&#xff0c;轻松应对安卓原生插件开发与打包需求&#xff01; ***环境问题移步至&#xff1a;uniapp—an…