Pyspark下操作dataframe方法(1)

文章目录

  • Pyspark dataframe
    • 创建DataFrame
      • 使用Row对象
      • 使用元组与scheam
      • 使用字典与scheam
      • 注意
    • agg 聚合操作
    • alias 设置别名
      • 字段设置别名
      • 设置dataframe别名
    • cache 缓存
    • checkpoint RDD持久化到外部存储
    • coalesce 设置dataframe分区数量
    • collect 拉取数据
    • columns 获取dataframe列

Pyspark dataframe

创建DataFrame

from pyspark.sql import  SparkSession,Row
from pyspark.sql.types import *def init_spark():spark  = SparkSession.builder.appName('LDSX_TEST_DATAFrame') \.config('hive.metastore.uris', 'thrift://hadoop01:9083') \.config('spark.master', "local[2]") \.enableHiveSupport().getOrCreate()return spark
spark = init_spark()# 设置字段类型
schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True),StructField("id", StringType(), True),StructField("gender", StringType(), True),
])

使用Row对象

cs = Row('name','age','id','gender')
row_list = [ cs('ldsx','12','1','男'),cs('test1','20','1','女'),cs('test2','26','1','男'),cs('test3','19','1','女'),cs('test4','51','1','女'),cs('test5','13','1','男')]
data = spark.createDataFrame(row_list)
data.show()+-----+---+---+---+
| name|age| id|gender|
+-----+---+---+---+
| ldsx| 12|  1| 男|
|test1| 20|  1| 女|
|test2| 26|  1| 男|
|test3| 19|  1| 女|
|test4| 51|  1| 女|
|test5| 13|  1| 男|
+-----+---+---+---+
data.printSchema()
root|-- name: string (nullable = true)|-- age: string (nullable = true)|-- id: string (nullable = true)|-- gender: string (nullable = true)

使用元组与scheam

park.createDataFrame([('ldsx1','12','1','男'),('ldsx2','12','1','男')],schema).show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|ldsx1| 12|  1|    男|
|ldsx2| 12|  1|    男|
+-----+---+---+------+

使用字典与scheam

spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}]).show()
+---+------+---+----+
|age|gender| id|name|
+---+------+---+----+
| 12|    女|  1|ldsx|
+---+------+---+----+

注意

scheam设置优先级高于row设置,dict设置的key

schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True),StructField("id", StringType(), True),StructField("测试", StringType(), True),
])
spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}],schema).show()
+----+---+---+----+
|name|age| id|测试|
+----+---+---+----+
|ldsx| 12|  1|null|
+----+---+---+----+

agg 聚合操作

在 PySpark 中,agg(aggregate)函数用于对 DataFrame 进行聚合操作。它允许你在一个或多个列上应用一个或多个聚合函数,并返回计算后的结果。可以结合groupby使用。

from pyspark.sql import functions as sf
data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
data.agg({'age':'max'}).show()
+--------+
|max(age)|
+--------+
|      51|
+--------+
data.agg({'age':'max','gender':"max"}).show()
+-----------+--------+
|max(gender)|max(age)|
+-----------+--------+
|         男|      51|
+-----------+--------+data.agg(sf.min(data.age)).show()
+--------+
|min(age)|
+--------+
|      12|
+--------+
data.agg(sf.min(data.age),sf.min(data.name)).show()
+--------+---------+
|min(age)|min(name)|
+--------+---------+
|      12|     ldsx|
+--------+---------+

结合groupby使用

data.groupBy('gender').agg(sf.min('age')).show()+------+--------+
|gender|min(age)|
+------+--------+
|    女|      19|
|    男|      12|
+------+--------+
data.groupBy('gender').agg(sf.min('age'),sf.max('name')).show()
+------+--------+---------+
|gender|min(age)|max(name)|
+------+--------+---------+
|    女|      19|    test4|
|    男|      12|    test5|
+------+--------+---------+

alias 设置别名

字段设置别名

#字段设置别名
data.select(data['name'].alias('rename_name')).show()
+-----------+
|rename_name|
+-----------+
|       ldsx|
|      test1|
|      test2|
|      test3|
|      test4|
|      test5|
+-----------+

设置dataframe别名

d1 = data.alias('ldsx1')
d2 = data2.alias('ldsx2')
d1.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
d2.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|测试1| 12|  1|    男|
|测试2| 20|  1|    男|
+-----+---+---+------+d3 = d1.join(d2,col('ldsx1.gender')==col('ldsx2.gender'),'inner')
d3.show()
+-----+---+---+------+-----+---+---+------+
| name|age| id|gender| name|age| id|gender|
+-----+---+---+------+-----+---+---+------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|
|test2| 26|  1|    男|测试1| 12|  1|    男|
|test2| 26|  1|    男|测试2| 20|  1|    男|
|test5| 13|  1|    男|测试1| 12|  1|    男|
|test5| 13|  1|    男|测试2| 20|  1|    男|
+-----+---+---+------+-----+---+---+------+d3[['name']].show()
#报错提示
pyspark.errors.exceptions.captured.AnalysisException: [AMBIGUOUS_REFERENCE] Reference `name` is ambiguous, could be: [`ldsx1`.`name`, `ldsx2`.`name`].
# 使用别名前缀获取
d3[['ldsx1.name']].show()
+-----+
| name|
+-----+
| ldsx|
| ldsx|
|test2|
|test2|
|test5|
|test5|
+-----+
>>> d3[['ldsx2.name']].show()
+-----+
| name|
+-----+
|测试1|
|测试2|
|测试1|
|测试2|
|测试1|
|测试2|
+-----+
d3.select('ldsx1.name','ldsx2.name').show()
+-----+-----+
| name| name|
+-----+-----+
| ldsx|测试1|
| ldsx|测试2|
|test2|测试1|
|test2|测试2|
|test5|测试1|
|test5|测试2|
+-----+-----+

cache 缓存

dataframe缓存默认缓存级别MEMORY_AND_DISK_DESER

df.cache()
# 查看逻辑计划和物理计划
df.explain()

checkpoint RDD持久化到外部存储

Checkpoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用Checkpoint比较合适,或者数据量很大的时候,采用Checkpoint比较合适。如果数据量小,或者RDD重新计算也是非常快的,直接使用缓存即可。
CheckPoint支持写入HDFS。CheckPoint被认为是安全的

sc = spark.sparkContext
# 设置检查存储目录
sc.setCheckpointDir('hdfs:///ldsx_checkpoint')
d3.count()
# 保存会在hdfs上进行存储
d3.checkpoint()
# 从hdfs读取
d3.count()

在这里插入图片描述

coalesce 设置dataframe分区数量

# 设置dataframe分区数量
d3 = d3.coalesce(3)
# 获取分区数量
d3.rdd.getNumPartitions()

collect 拉取数据

当任务提交到集群的时候collect()操作是用来将所有结点中的数据收集到dirver节点,数据量很大慎用防止dirver炸掉。

d3.collect()
[Row(name='ldsx', age='12', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='ldsx', age='12', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试2', age='20', id='1', gender='男')]

columns 获取dataframe列

>>> d3.columns
['name', 'age', 'id', 'gender', 'name', 'age', 'id', 'gender']d3.withColumn('ldsx1.name_1',col('ldsx1.name')).show()
+-----+---+---+------+-----+---+---+------+------------+
| name|age| id|gender| name|age| id|gender|ldsx1.name_1|
+-----+---+---+------+-----+---+---+------+------------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|        ldsx|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|        ldsx|
|test2| 26|  1|    男|测试1| 12|  1|    男|       test2|
|test2| 26|  1|    男|测试2| 20|  1|    男|       test2|
|test5| 13|  1|    男|测试1| 12|  1|    男|       test5|
|test5| 13|  1|    男|测试2| 20|  1|    男|       test5|
+-----+---+---+------+-----+---+---+------+------------+# 重命名列名
d3.withColumnRenamed('ldsx1.name_1',col('ldsx1.name')).show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/422179.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CnCrypt(磁盘加密工具绿色版是一款功能强大磁盘加密工具,供大家学习研究参考

CnCrypt(磁盘加密工具)特点 加密单个分区或整个硬盘,所有加密都是以分区为基础的 提供两级方案,以应对被强迫说出密码的情况(如抢劫。隐藏分区(覆盖式密码术,steganography)无法探测到CnCrypt 加密分区(加密数据会被认为是随机数据)。 CnCrypt(磁盘加密工具)特色 1、加密U…

ucx 编译安装检验方式备忘

1, 下载配置编译 预备依赖: sudo apt-get install valgrind sudo apt-get install libibverbs-dev librdmacm-dev 1.1 下载源码 git clone --recursive https://github.com/openucx/ucx.git cd ucx/ git checkout v1.16.0 git 下来的代码,…

《Diffusion Models Without Attention》CVPR2024

摘要 这篇论文探讨了在高保真图像生成领域,去噪扩散概率模型(Denoising Diffusion Probabilistic Models, DDPMs)的重要性。尽管DDPMs在捕捉复杂视觉分布方面表现出色,但在高分辨率图像生成上面临显著的计算挑战。现有的方法&…

Vue邮件发送:如何有效集成邮件发送功能?

vue邮件发送功能实现方法?Vue邮件发送性能怎么优化? 无论是用户注册验证、密码重置,还是通知提醒,邮件发送功能都能提供重要的支持。本文将详细探讨如何在Vue项目中有效集成邮件发送功能,确保邮件能够准确、及时地送达…

macos 系统文件操作时提示 Operation not permitted 异常解决方法 , 通过恢复模式 开启 /关闭 SIP方法

在macos系统中操作系统文件时提示 Operation not permitted 这个异常, 原因是因为在macos 10.11以上版本中默认启用了 SIP( System Integrity Protection )机制对系统文件进行保护, 要解决这个问题我们需要关机, 然后进入mac的恢复模式 : 在按电源键开机的同时, 一直按住 co…

【机器学习】马尔可夫随机场的基本概念、和贝叶斯网络的联系与对比以及在python中的实例

引言 马尔可夫随机场(Markov Random Field,简称MRF)是一种用于描述变量之间依赖关系的概率模型,它在机器学习和图像处理等领域有着广泛的应用 文章目录 引言一、马尔科夫随机场1.1 定义1.2 特点1.3 应用1.4 学习算法1.5 总结 二、…

UG/NX加载插件失败的原因汇总

在自己的电脑上运行得好好的插件,部署到客户的电脑上出现未注册的命令错误或者“未能加载图像”的错误 1.首先检查插件的所有依赖是否齐全,确保齐全 2.这个问题在网络上搜索一番,大多数都是不知所云,后来看到这一篇文章【UG二次…

C++的流提取(>>)(输入) 流插入(<<)(输出)

什么是输入和输出流 流提取&#xff08;<<&#xff09;(输入) 理解&#xff1a;我们可以理解为&#xff0c;输入到io流里面&#xff0c;比如是cin&#xff0c;然后从输入流中读取数据 流插入&#xff08;<<&#xff09;&#xff08;输出&#xff09; 理解&#xff…

直播相关02-录制麦克风声音,QT 信号与槽,自定义信号和槽

一 信号与槽函数 #include "mainwindow.h" #include <QPushButton> #include <iostream> using namespace std;//我们的目的是在 window中加入一个button&#xff0c;当点击这个button后&#xff0c;关闭 MainWindow 。 MainWindow::MainWindow(QWidget …

828华为云征文 | 华为云Flexus X实例上实现Docker容器的实时监控与可视化分析

前言 华为云Flexus X&#xff0c;以顶尖算力与智能调度&#xff0c;引领Docker容器管理新风尚。828企业上云节之际&#xff0c;Flexus X携手前沿技术&#xff0c;实现容器运行的实时监控与数据可视化&#xff0c;让管理变得直观高效。无论是性能瓶颈的精准定位&#xff0c;还是…

TS 常用类型

我们经常说TypeScript是JavaScript的一个超级 TypeScript 常用类型 TypeScript 是 JS 的超集&#xff0c;TS 提供了 JS 的所有功能&#xff0c;并且额外的增加了&#xff1a;类型系统 所有的 JS 代码都是 TS 代码 JS 有类型&#xff08;比如&#xff0c;number/string 等&…

客厅无主灯设计:灯位布局与灯光灯具的和谐搭配

在现代家居设计中&#xff0c;客厅作为家庭活动的中心区域&#xff0c;其照明设计的重要性不言而喻。无主灯设计以其灵活多变、氛围营造独特的优势&#xff0c;逐渐成为客厅照明的热门选择。然而&#xff0c;如何合理规划灯位布局&#xff0c;并科学搭配灯光与灯具&#xff0c;…

基于java+springboot+vue实现的林业产品推荐系统(文末源码+Lw)135

基于SpringBootVue的实现的林业产品推荐系统&#xff08;源码数据库万字Lun文流程图ER图结构图演示视频软件包&#xff09; 系统功能&#xff1a; 林业产品推荐系统是在MySQL中建立数据表保存信息&#xff0c;运用SpringBoot框架和Java语言编写。 并按照软件设计开发流程进行…

ICETEK-DM6437-AICOM—— DMA直接存储器访问设计

#一、设计目的&#xff1a; 1 进一步了解 ICETEK-DM6437-AF 的内部存储器空间的分配及指令寻址方式&#xff1a; 内部存储器空间分配&#xff1a;研究 ICETEK-DM6437-AF 的存储器架构&#xff0c;包括但不限于片内 SRAM、片外 DRAM 和其他存储器模块。了解这些存储器的大小、起…

k8s 资源管理

文章目录 ResourceQuota什么是资源配额定义一个ResourceQuotaResourceQuota的使用 LimitRangeLimitRange的用途示例1&#xff1a;配置默认的requests和limits示例2&#xff1a;配置requests和limits的范围 QoS什么是服务质量保证示例1&#xff1a;实现QoS为Guaranteed的Pod示例…

优化安防视频监控的关键体验:视频质量诊断技术如何应用在监控系统中?

随着科技的不断进步&#xff0c;视频监控平台在公安、司法、教育、基础设施等众多领域得到了广泛应用。然而&#xff0c;视频图像的质量直接关系到监控系统的应用效果&#xff0c;是反映监控系统运维效果的重要指标之一。因此&#xff0c;视频监控平台需要配备一系列先进的视频…

Active Neural SLAM 复现记录

Active Neural SLAM 复现记录 创建虚拟环境安装habitat-sim安装habitat-api安装Pytorch配置项目准备数据先搞Gibson场景数据再搞pointnav任务数据创建软链接 测试训练 创建虚拟环境 conda create -n AVSLAM python3.10 conda activate AVSLAM安装habitat-sim git clone https…

存储课程学习笔记8_spdk的安装以及简单demo测试

已经对相关的基础概念有一定的了解&#xff0c;比如裸盘&#xff0c;文件系统&#xff0c;读写相关裸盘&#xff0c;裸盘挂载使用&#xff0c;内核插入文件系统的方式&#xff0c;相关操作io的库或者函数&#xff08;io_uring, readv&#xff0c;writev, mmap等&#xff09;&am…

nlohmann::json中有中文时调用dump转string抛出异常的问题

问题描述 Winodows下C开发想使用一个json库&#xff0c;使用的nlohmann::json&#xff0c;但是遇到json中使用中文时&#xff0c;转成string&#xff0c;会抛出异常。 nlohmann::json contentJson;contentJson["chinese"] "哈哈哈";std::string test con…

前端算法(持续更新)

1、最大的钻石 1楼到n楼的每层电梯口都放着一个钻石&#xff0c;钻石大小不一。你从电梯1楼到n楼&#xff0c;每层楼电梯门都会打开一次&#xff0c;只能拿一次钻石&#xff0c;问怎样才能最大的钻石&#xff1f; 解题思路&#xff1a; 这是一个经典的动态规划问题&#xff…