DataFrame详解

清洗相关的API

清洗相关的API:

1.去重API: dropDupilcates

2.删除缺失值API: dropna

3.替换缺失值API: fillna

去重API: dropDupilcates

dropDuplicates(subset):删除重复数据

1.用来删除重复数据,如果没有指定参数subset,比对行中所有字段内容,如果全部相同,则认为是重复数据,会被删除

2.如果有指定参数subset,只比对subset中指定的字段范围

删除缺失值API: dropna

dropna(thresh,subset):删除缺失值数据.

1.如果不传递参数,只要任意一个字段值为null,就会删除整行数据

2.如果只指定了subset,那么空值的检查,就只会限定在subset指定范围内

3.如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除

 替换缺失值API: fillna

fillna(value,subset):替换缺失值数据

1.value:必须要传递参数,指定填充缺失值的数据

2.subset:限定缺失值的替换范围

注意:

        value如果不是字典,那么就只会替换字段类型匹配的空值

        最常用的是value传递字典形式

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式需求分析:1- 将每行内容切分得到单个的单词2- 组织DataFrame的数据结构2.1- 有两列。一列是单词,一列是次数
"""os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':print('API的清洗')# 创建Sparksession对象spark = SparkSession \.builder \.appName('api_etl_demo') \.master('local[*]') \.getOrCreate()# 数据输入init_df = spark.read.csv(path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',sep=',',header=True,inferSchema=True,encoding='utf8')# 查看数据init_df.show()init_df.printSchema()# 数据处理print('=' * 50)# 去重API:  dropDuplicatesinit_df.dropDuplicates().show()# 指定字段去重init_df.dropDuplicates(subset=['id', 'name']).show()print('=' * 50)# 删除缺失值的API:  dropnainit_df.dropna().show()# 指定字段删除init_df.dropna(subset='name').show()init_df.dropna(subset=['name', 'age', 'address']).show()init_df.dropna(thresh=1, subset=['name', 'age', 'address']).show()init_df.dropna(thresh=2, subset=['name', 'age', 'address']).show()print('=' * 50)# 替换缺失值APIinit_df.fillna(9999).show()# value传递字典形式init_df.fillna(value={'id': 9999, 'name': '刘亦菲', 'address': '北京'}).show()# 释放资源spark.stop()

Spark SQL的Shuffle分区设置

Spark SQL底层本质上还是Spark的RDD程序,认为 Spark SQL组件就是一款翻译软件,用于将SQL/DSL翻译为Spark RDD程序, 执行运行

Spark SQL中同样也是存在shuffle的分区的,在执行shuffle分区后, shuffle分区数量默认为 200个,但是实际中, 一般都是需要调整这个分区的, 因为当数据量比较少的数据, 200个分区相对来说比较大一些, 但是当数据量比较大的时候, 200个分区显得比较小

调整shuffle分区的数量:

方案一(不推荐):直接修改spark的配置文件spark-defaults.conf,全局设置,默认值为200

修改设置 spark.sql.shuffle.partitions 20

方案二(常用,推荐使用):在客户端通过指令submit命令提交的时候动态设置shuffle的分区数量,部署上线的时候,基于spark-submit提交运行的时候

        "./spark-submit --conf "spark.sql.shuffle.partitions=20"

方案三(比较常用):在代码中设置,主要在测试环境中使用,一般部署上线的时候,会删除,优先级也是最高的,一般的使用场景是数据量未来不会发生太大的波动

sparksession.conf.set("spark.sql.shuffle.partitions",20)

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理需求分析:1- 将每行内容切分得到单个的单词2- 组织DataFrame的数据结构2.1- 有两列。一列是单词,一列是次数
"""os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':print('直接基于DataFrame来处理')spark = SparkSession \.builder \.config("spark.sql.shuffle.partitions", 1) \.appName('dataFrame_world_count_demo') \.master('local[*]') \.getOrCreate()# 数据输入# text方式读取hdfs上的文件init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')# # 查看数据# init_df.show()# # 打印dataframe表结构信息# init_df.printSchema()# 创建临时视图init_df.createTempView('words')# 数据处理"""sparksql方式处理数据-子查询1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.使用子查询方式聚合统计每个单词出现的次数"""spark.sql("""select word,count(*) as cnt from (select explode(split(value,' ')) as word from words)group by word order by cnt desc""").show()"""sparksql方式处理数据-侧视图1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.使用侧视图方式聚合统计每个单词出现的次数炸裂函数配合侧视图使用如下:格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段)侧视图名 as 字段名"""spark.sql("""select word,count(*) as cntfrom words w lateral view explode(split(value,' ')) t as wordgroup by word order by cnt desc""").show()print('=' * 50)"""DSL方式处理数据-方式一1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.调用API聚合统计单词个数再排序"""init_df.select(F.explode(F.split('value', ' ')).alias('word')).groupBy('word').count().orderBy('count', ascending=False).show()"""DSL方式处理数据-方式二1.先切分每一行的数据2.使用炸裂函数获得一个word单词列3.调用API聚合统计单词个数再排序4.agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可"""init_df.select(F.explode(F.split('value', ' ')).alias('word')).groupBy('word').agg(F.count('word').alias('cnt'),F.max('word').alias('max_word'),F.min('word').alias('min_word'),).orderBy('cnt', ascending=False).show()"""DSL方式处理数据-方式三withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源"""init_df.withColumn('word',F.explode(F.split('value', ' '))).groupBy('word').agg(F.count('word').alias('cnt'),F.max('word').alias('max_word'),F.min('word').alias('min_word')).orderBy('cnt', ascending=False).show()# 数据输出# 是否资源spark.stop()

数据写出操作

统一的输出语法:

对应的简写API格式如下,以CSV为例:
init_df.write.csv(
    path='存储路径',
    mode='模式',
    header=True,
    sep='\t',
    encoding='UTF-8'
)

输出到本地文件

常用参数说明:
    1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
    2- mode:当输出目录中文件已经存在的时候处理办法
        2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
        2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
        2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
        2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path     
                    file:xxx already exists.
        
    3- sep:字段间的分隔符
    4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
    5- encoding:文件输出的编码方式

 

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式需求分析:1- 将每行内容切分得到单个的单词2- 组织DataFrame的数据结构2.1- 有两列。一列是单词,一列是次数
"""os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':print('数据输出本地文件')# 创建Sparksession对象spark = SparkSession \.builder \.appName('api_etl_demo') \.master('local[*]') \.getOrCreate()# 数据输入init_df = spark.read.csv(path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',sep=',',header=True,inferSchema=True,encoding='utf8')# 数据处理result = init_df.where('age>20')# 数据查看result.show()result.printSchema()# 数据输出# 以csv格式输出,简写APIresult.write.csv(path='file:///export/data/pyspark_projects/02_spark_sql/data/output',mode='append',header=True,sep=',',encoding='utf8')# 以json方式输出到本地文件系统,复杂APIresult.write \.format('json') \.option('encoding', 'utf8') \.mode('overwrite') \.save('file:///export/data/pyspark_projects/02_spark_sql/data/output_json')

数据输出到数据库

数据库的驱动包, 一般都是一些Jar包

如何放置【mysql-connector-java-5.1.41.jar】驱动包呢?  
    1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,
        目录位置: /export/server/spark/jars
    
    2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包
        目录位置:
            /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
    
    3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下
        hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars
        

    请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars  ....

将中文输出到了数据表中乱码
解决办法:
1- 数据库连接要加上:useUnicode=true&characterEncoding=utf-8
2- 创建数据库的时候需要指定编码character set utf8

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式需求分析:1- 将每行内容切分得到单个的单词2- 组织DataFrame的数据结构2.1- 有两列。一列是单词,一列是次数
"""os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':print('API的清洗')# 创建Sparksession对象spark = SparkSession \.builder \.appName('api_etl_demo') \.master('local[*]') \.getOrCreate()# 数据输入init_df = spark.read.csv(path='file:///export/data/pyspark_projects/02_spark_sql/data/clear_data.csv',sep=',',header=True,inferSchema=True,encoding='utf8')# 数据处理result = init_df.where('age>20')# 数据查看result.show()result.printSchema()# 数据输出# 以csv格式输出,简写APIresult.write.jdbc(url='jdbc:mysql://node1:3306/day06?useUnicode=true&characterEncoding=utf-8',table='student',mode='append',properties={'user': 'root', 'password': '123456'})

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/234481.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习笔记之——3D Gaussian Splatting及其在SLAM与自动驾驶上的应用调研

之前博客介绍了NeRF-SLAM,其中对于3D Gaussian Splatting没有太深入介绍。本博文对3D Gaussian Splatting相关的一些工作做调研。 学习笔记之——NeRF SLAM(基于神经辐射场的SLAM)-CSDN博客文章浏览阅读967次,点赞22次&#xff0…

Java中的输入输出处理(一)

文件 文件:文件是放在一起的数据的集合。比如1.TXT。 存储地方:文件一般存储在硬盘,CD里比如D盘 如何访问文件属性:我们可以通过java.io.File类对其处理 File类 常用方法: 方法名称说明boolean exists()判断文件或目…

LiveGBS流媒体平台GB/T28181常见问题-国标编号是什么设备编号和通道国标编号标记唯一的摄像头|视频|镜头通道

LiveGBS国标GB28181中国标编号是什么设备编号和通道国标编号标记唯一的摄像头|视频|镜头通道 1、什么是国标编号?2、国标设备ID和通道ID3、ID 统一编码规则4、搭建GB28181视频直播平台 1、什么是国标编号? 国标GB28181对接过程中,可能有的小…

安科瑞对电子半导体行业电能质量监测与治理系统解决方案——安科瑞赵嘉敏

摘要:在国家鼓励半导体材料国产化的政策导向下,本土半导体材料厂商不断提升半导体产品技术水平和研发能力,逐渐打破了国外半导体厂商的垄断格局半导体材料国产化进程,促进中国半导体行业的发展。半导体产品的制造使用到的设备如单…

c++|关键字extern

一个C语言项目往往由多个文件组合而成。而对于多个文件来说,它们可能会共用到一些相同的变量。而有些情况下,这些相同的变量并没有出现在本文件内,有可能在其他文件内。而一个文件可能只会搜寻该文件内部是否有该变量。 所以,需要…

ROS学习笔记(9)进一步深入了解ROS第三步

0.前提 1. (C)Why did you include the header file of the message file instead of the message file itself?(为包含消息的头文件而不是消息本身?) 回答:msg文件是描述ROS消息字段的文本文件,用于生成不同语言消息…

NX二次开发 Block UI 指定方位控件的应用

一、概述 NX二次开发中一般都是多个控件的组合,这里我首先对指定方位控件进行说明并结合选择对象控件,具体如下图所示。 二、实现功能获取方位其在选择面上原点的目标 2.1 在initialize_cb()函数中进行初始化,实现对象选择过滤面 //过滤平…

回归预测 | Matlab实现DE-BP差分算法优化BP神经网络多变量回归预测

回归预测 | Matlab实现DE-BP差分算法优化BP神经网络多变量回归预测 目录 回归预测 | Matlab实现DE-BP差分算法优化BP神经网络多变量回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现DE-BP差分算法优化BP神经网络多变量回归预测(完整源码和…

计算机组成原理19——控制单元的功能和实现1

本系列文章是学习了网课《哈尔滨工业大学–计算机组成原理》之后,用以梳理思路而整理的听课笔记及相关思维拓展。本文涉及到的观点均为个人观点,如有不同意见,欢迎在评论区讨论。 目录 四种周期下的微操作命令取指周期间址周期执行周期非访存…

大众汽车宣布将ChatGPT,批量集成在多种汽车中!

1月9日,大众汽车在官网宣布,将ChatGPT批量集成到电动、内燃机汽车中。 大众表示,将ChatGPT与其IDA语音助手相结合,用户通过自然语言就能与ChatGPT进行互动,例如,帮我看看最近的三星米其林饭店在哪里&#…

dubbo与seata集成

1.seata是什么? Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 2.seata的注解 GlobalTransactional:全局事务注解,添加了以后可实现分布式事务的回滚和提交,用法与spring…

linux 内存管理

地址类型 一个虚拟内存系统, 意味着用户程序见到的地址不直接对应于硬件使用 的物理地址. 虚拟内存引入了一个间接层, 它允许了许多好事情. 有了虚拟内存, 系统重 运行的程序可以分配远多于物理上可用的内存; 确实, 即便一个单个进程可拥有一个虚拟 地址空间大于系统的物理内存…

Docker 镜像以及镜像分层

Docker 镜像以及镜像分层 1 什么是镜像2 Docker镜像加载原理2.1 UnionFs:联合文件系统2.2 Docker镜像加载原理2.3 Docker镜像的特点 3 镜像的分层结构4 可写的容器层 1 什么是镜像 镜像是一种轻量级、可执行的独立软件包,用来打包软件运行环境和基于运行…

C语言中关于函数递归的理解

递归的概念:如果一个对象部分包含它自己,或者利用自己定义自己,则称这个对象是递归的;如果 一个过程直接或间接调用自己,则称这个过程是一个递归过程。递归的主要思考方式在于:将大事化小 我们先看一个例子 题目:输入一个无符号数&#xff0…

Open CASCADE学习|非线性方程组

非线性方程组是一组包含非线性数学表达式的方程,即方程中含有未知数的非线性项。解这类方程组通常比解线性方程组更为复杂和困难。 非线性方程组在很多领域都有应用,例如物理学、工程学、经济学等。解决非线性方程组的方法有很多种,包括数值…

CCSC,一种CPU架构

core-circuit-separate-computer 核与执行电路的分离,最初是为了省电。 用寄存器实现这种分离。 V寄存器控制着执行电路的供电,V0则不供电,进入省电模式;V1则供电,进入工作模式。 P寄存器是parameter-register&#xf…

Spark与Cassandra的集成与数据存储

Apache Spark和Apache Cassandra是大数据领域中两个重要的工具,用于数据处理和分布式数据存储。本文将深入探讨如何在Spark中集成Cassandra,并演示如何将Spark数据存储到Cassandra中。将提供丰富的示例代码,以帮助大家更好地理解这一集成过程…

【数据结构】二叉树的链式实现

树是数据结构中非常重要的一种,在计算机的各方个面都有他的身影 此篇文章主要介绍二叉树的基本操作 目录 二叉树的定义:二叉树的创建:二叉树的遍历:前序遍历:中序遍历:后序遍历:层序遍历&#…

ULINK2仿真器安装使用之工程设置

一、 ULINK2仿真器 ULINK2是ARM公司最新推出的配套RealView MDK使用的仿真器,是ULink仿真器的升级版本。ULINK2不仅具有ULINK仿真器的所有功能,还增加了串行调试(SWD)支持,返回时钟支持和实时代理等功能。开发工程师通…

vite 搭建vue3 TS项目初始框架

目录 仓库地址: 一.搭建项目 1.安装 Vite: 2.创建 Vue 3 项目: 3.进入项目目录: 4.安装依赖: 5.运行项目: 6.流程实操 二.修改项目结构,显示自定义的页面 1.整理静态样式文件 1.1.在 sr…