Spark数据源的读取与写入、自定义函数

1. 数据源的读取与写入

1.1 数据读取

  • 读文件
    • read.json
    • read.csv
      • csv文件由两个部分组成:头部数据(也就是字段数据)、行数据。
    • read.orc
  • 读数据库
    • read.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’})
      数据库创建测试数据:
create database itcast charset=utf8;create table itcast.tb_user(id int,name varchar(20),age int,gender varchar(20)
);insert into  itcast.tb_user values (1,'张三',20,'男');

表查看:
在这里插入图片描述
读取数据库数据:

# 读取数据源,将数据转为DF
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# read读取数据库数据
# 使用jdbc方法通过jdbc读取数据库数据,在读取数据库之前,需要现将数据库连接驱动放入spark的jars目录下
#
df = ss.read.jdbc('jdbc:mysql://192.168.88.100:3306/itcast',table='tb_user',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})
df.show()

运行结果:
在这里插入图片描述

1.2 数据写入

因为数据是在df中存储,所以使用DataFrame进行数据写入

使用DataFrame的的write方法


写入文件有个模式,覆盖和追加两种方式,用mode参数指定
覆盖 overwrite
追加 append

  • 写入文件
    • write.json
    • write.csv
    • write.orc
  • 写入数据库
    • write.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’},mode=‘写入方式’)
# 数据写入
from pyspark.sql import SparkSession,Row
ss = SparkSession.builder.getOrCreate()df = ss.createDataFrame([Row(id=1,name='张三',age=20),Row(id=2,name='李四',age=20),Row(id=3,name='王五',age=20)],schema='id int,name string,age int'
)# 将df数据写入hdfs文件中  mode='overwrite' 覆盖写入   append 追加写入
df.write.json('hdfs://node1:8020/data_json',mode='overwrite')# 写入数据库
# create table itcast.tb_stu(
#     id int,
#     name varchar(20),
#     age int
# );
# 在jdbc连接中指定编码字符集为utf-8
df.write.jdbc('jdbc:mysql://192.168.88.100:3306/itcast?characterEncoding=utf8',table='tb_stu',mode='overwrite',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})

运行结果:
在这里插入图片描述

2. 自定义函数

在这里插入图片描述

2.1 函数分类

  • udf
    • 自定义
    • 一进一出
  • udaf
    • 聚合
    • 自定义
    • 多进一出
  • udtf
    • 爆炸
    • 一进多出

2.2 UDF函数

对每一行数据依次进行计算,返回每一行的结果。

#UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()#自定义字符串长度计算函数
def len_func(field):if field is None:return 0else:data = len(field)return data
#将自定义的函数注册到spark中使用
len_func = ss.udf.register('len_func', len_func,returnType=IntegerType())#在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()#sql语句中使用
df.createTempView('stu')
df3= ss.sql('select *,len_func(name) from stu')
df3.show()

2.3 UDAF函数

多进一出 主要是聚合
使用pandas中的series实现,可以读取一列数据存储在pandas的series中进行数据的聚合。

#UDAF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *
import pandas as pdss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',',schema = 'id int,name string,age int,gender string,cls string')df.show()#对某个字段的整列数据进行计算,自定义udaf函数
# 第一步,装饰器注册
@F.pandas_udf(returnType=IntegerType())
def sub(field:pd.Series) -> int:n=field[0] #取出第一个值作为初始值for i in field[1::]:n-=ireturn n
#第二步,register方法注册
sub = ss.udf.register('sub', sub)df2 = df.select(sub('age'))
df2.show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/453840.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

万能工具箱小程序源码系统 带完整的安装代码包以及搭建部署教程

系统概述 万能工具箱小程序源码系统是一款集多种实用工具于一体的综合性平台。它为用户提供了便捷的操作界面和丰富的功能选项,满足了人们在日常生活和工作中的各种需求。 该系统采用先进的技术架构,具备高度的稳定性和可靠性。无论是在处理大量数据还…

python excel如何转成json,并且如何解决excel转成json时中文汉字乱码的问题

1.解决excel转成json时中文汉字乱码的问题 真的好久没有打开这个博客也好久没有想起来记录一下问题了,今天将表格测试集转成json格式的时候遇到了汉字都变成了乱码的问题,虽然这不是个大问题,但是编码问题挺烦人的,乱码之后像下图…

Flink窗口分配器WindowAssigner

前言 Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。 WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling wi…

【C++篇】栈的层叠与队列的流动:在 STL 的节奏中聆听算法的静谧旋律

文章目录 C 栈与队列详解:基础与进阶应用前言第一章:栈的介绍与使用1.1 栈的介绍1.2 栈的使用1.2.1 最小栈1.2.2 示例与输出 1.3 栈的模拟实现 第二章:队列的介绍与使用2.1 队列的介绍2.2 队列的使用2.2.1 示例与输出 2.3 队列的模拟实现2.3.…

【linux】线程(二)

10. pthread_t 类型 注意: 每一个线程的库级别的tcb的起始地址,就是线程的 tid每一个线程都有自己独立的栈结构线程和线程之间,也是可以被其他线程看到并访问的(比如全局函数) 代码 如果想要进程拥有私人的全局变量(即…

拥抱“新市民” ,数字银行的“谋与变”

【潮汐商业评论/原创】 数字银行,既是金融行业的创新物种,其在发展的过程中也彰显着普惠金融的基因。 “我劝你买点银行理财吧,选一家靠谱的银行就是最靠谱的理财方式了,踏踏实实地把钱存银行里面不会有问题的”,周日…

SpringBoot篇(二、制作SpringBoot程序)

目录 一、代码位置 二、四种方式 1. IDEA联网版 2. 官网 3. 阿里云 4. 手动 五、在IDEA中隐藏指定文件/文件夹 六、复制工程-快速操作 七、更改引导类别名 一、代码位置 二、四种方式 1. IDEA联网版 2. 官网 官网制作:Spring Boot 3. 阿里云 阿里云版制…

react18中的计算属性及useMemo的性能优化技巧

react18里面的计算属性和使用useMemo来提升组件性能的方法 计算属性 实现效果 代码实现 函数式组件极简洁的实现,就这样 import { useState } from "react"; function FullName() {const [firstName, setFirstName] useState("");const [la…

AlDente Pro for Mac电脑 充电限制保护工具 安装教程【简单,轻松上手】

Mac分享吧 文章目录 AlDente Pro for Mac 充电限制保护工具 安装完成,软件打开效果一、AlDente Pro for Mac 充电限制保护工具 Mac电脑版——v1.28.41️⃣:下载软件2️⃣:安装软件,将安装包从左侧拖入右侧文件夹中,等…

c++初阶--string类(使用)

大家好,许久不见,今天我们来学习c中的string类,在这一部分,我们首先应该学习一下string类的用法,然后再试着自己去实现一下string类。 在这里,我使用的是这个网站来查找的string类,这里面的内容…

Web,RESTful API 在微服务中的作用是什么?

大家好,我是锋哥。今天分享关于【Web,RESTful API 在微服务中的作用是什么?】面试题?希望对大家有帮助; Web,RESTful API 在微服务中的作用是什么? 在微服务架构中,Web 和 RESTful …

react18中如何实现同步的setState来实现所见即所得的效果

在react项目中,实现添加列表项,最后一项自动显示在可视区域范围!! 实现效果 代码实现 import { useState, useRef } from "react"; import { flushSync } from "react-dom"; function FlushSyncRef() {con…

基于SSM网络在线考试系统的设计

管理员账户功能包括:系统首页,个人中心,学生管理,在线考试管理,试题管理,考试管理,系统管理 前台账号功能包括:系统首页,个人中心,在线考试,公告信…

word删除空白页 | 亲测有效

想要删掉word里面的末尾空白页,但是按了delete之后也没有用 找了很久找到了以下亲测有效的方法 1. 通过鼠标右键在要删除的空白页面处显示段落标记 2. 在字号输入01,按ENTER(回车键) 3.成功删除了!!

ECharts饼图-饼图34,附视频讲解与代码下载

引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外,我还将提供详…

模型实战(27)之 YOLO11 推理、验证及训练自己的数据集

模型实战(27)之 YOLO11推理、验证及训练自己的数据集 2024年10.17,YOLO11是近期十月份刚经ultralytics团队更新优化发布的视觉算法深度学习网络模型,其网络模型结构代码实现也采用了比较新的Python数据结构,所以虚拟环境搭建安装包也比较新,经过多次踩坑,把关键环节记录…

电子便签:从偶像剧到职场的实用转变

在快节奏的现代生活中,便签已经成为了我们不可或缺的助手,无论是纸质的还是电子的,它们都以小巧的“身躯”承载着我们的日常记忆和待办事项。从偶像剧中常见的“便利贴”女孩形象,到如今电子便签的普及,它们帮助我们捕…

百度搜索竞价推广有必要做吗?怎么做效果好!

百度竞价推广,有的行业适合,有的行业则不行,下面我给大家分享下哪些行业可以。 如果是招商加盟、招代理商、招经销商,或者是高客单价咨询服务费,甚至是找合作方、渠道方的企业主都可以投放竞价推广。 总之一句话&…

网络安全的五大误区,你中招了吗?

在数字化时代,网络安全问题日益突出,许多人在使用网络过程中存在一些误区,导致个人信息泄露、财产损失等问题。本文将为您揭示网络安全的五大误区,帮助您提高安全防范意识。 误区一:使用复杂密码就一定安全 许多人认为…

基于SpringBoot+Vue+uniapp微信小程序的垃圾分类系统的详细设计和实现(源码+lw+部署文档+讲解等)

项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念,提供了一套默认的配置,让开发者可以更专注于业务逻辑而不是配置文件。Spring Boot 通过自动化配置和约…