PySpark——Python与大数据

一、Spark 与 PySpark

Apache Spark 是用于大规模数据( large-scala data )处理的统一( unified )分析引擎。简单来说, Spark 是一款分布式的计算框架,用于调度成百上千的服务器集群,计算 TB 、 PB 乃至 EB 级别的海量数据。

Spark 作为全球顶级的分布式计算框架,支持众多的编程语言进行开发,Python 语言是 Spark 重点支持的方向,PySpark 是由 Spark 官方开发的 Python 语言第三方库。

1.1 PySpark的安装

使用快捷键Win+R打开运行窗口,然后输入"cmd"并按下回车键,调出CMD命令窗口,在命令窗口中输入“ pip install pyspark ”,按下回车键,等待安装成功即可。(若无法直接顺利下载,可使用国内代理镜像网站(清华大学源)pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark)

1.2构建 PySpark 执行环境入口对象

SparkContext 类对象,是 PySpark 编程中一切功能的入口。想要使用 PySpark 库完成数据处理,首先需要构建一个执行环境入口对象,PySpark 的执行环境入口对象是类 SparkContext 的类对象。

#导包
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#上述写法是一种链式调用,等同于下面的写法
# conf=SparkConf()
# conf.setMaster('local[*]')
# conf.setAppName('test_spark_app')#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#打印PySpark的运行版本
print(sc.version)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/09 15:34:18 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/09 15:34:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

3.5.3

1.3 PySpark 的编程模型

PySpark 的编程,主要分为如下三大步骤:

PySpark 的编程模型 图1
PySpark 的编程模型 图2

通过 SparkContext 对象,完成数据输入,输入数据后得到 RDD 对象,对 RDD 对象进行迭代计算,最终通过 RDD 对象的成员方法,完成数据输出工作。

二、数据输入

2.1 RDD对象

PySpark 支持多种数据的输入,在输入完成后,都会得到一个 RDD 类的对象,RDD 全称为弹性分布式数据集( Resilient Distributed Datasets )。
为什么要使用RDD对象呢?因为PySpark 针对数据的处理,都是以 RDD 对象作为载体,即:

  • 数据存储在 RDD 内
  • 各类数据的计算方法也都是 RDD 的成员方法
  • RDD 的数据计算方法,返回值依旧是 RDD 对象

可以结合 -PySpark 的编程模型 图2- 理解。

2.2数据转换为RDD对象

2.2.1 Python 数据容器转换为 RDD 对象

PySpark 支持通过 SparkContext 对象的 parallelize 成员方法,将Python 数据容器( list、tuple、 set、 dict、str)转换为 PySpark 的 RDD 对象。

语法:rdd=SparkContext类对象.parallelize(Python 数据容器)

代码示例如下:

#导包
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#数据输入——Python 数据容器转 RDD 对象
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize((1,2,3,4,5))
rdd3=sc.parallelize('my heart go on')
rdd4=sc.parallelize({'a',2,4,'abc'})
rdd5=sc.parallelize({1:'a',2:'b',3:'c'})
#输出RDD的内容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 15:01:20 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 15:01:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
['m', 'y', ' ', 'h', 'e', 'a', 'r', 't', ' ', 'g', 'o', ' ', 'o', 'n']
[2, 'a', 4, 'abc']
[1, 2, 3]

由输出结果可知:

  • 字符串会被拆分成 一个一个的字符,存入 RDD 对象
  • 字典中仅有 key 会被存入 RDD 对象

2.2.2文件转换为 RDD 对象

PySpark 支持通过 SparkContext 的 textFile 成员方法,读取文本文件,转换成 RDD 对象。

语法:rdd=SparkContext类对象.textFile(文件路径)

我们新建一个文件“hello.txt”,在其中写入一句“Say goodbye to all your troubles”,然后保存。

读取文件“hello.txt”,将其转换成 RDD 对象,代码如下:

#导包
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#数据输入——文件数据转 RDD 对象
rdd=sc.textFile('E:/可视化案例数据/hello.txt')
#输出RDD的内容
print(rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 15:31:40 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 15:31:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['Say goodbye to all your troubles']

三、数据计算

PySpark 的数据计算,都是基于 RDD 对象来进行的,依赖 RDD 对象内置丰富的成员方法,也称为算子,本节主要介绍以下6种算子。

3.1 map算子

功能:对 RDD内的数据逐个处理,处理的具体步骤基于其接收的处理函数,处理完成后返回一个新的RDD。

语法:rdd.map ( func )

f : (T)  ->  U 表示这是一个函数(方法),接收一个参数传入,参数类型不限,返回一个返回值,返回值类型不限。

f : (T)  ->  T 表示这是一个函数(方法),接收一个参数传入,参数类型不限,返回一个返回值,返回值类型和传入参数类型一致。

写好代码后直接运行会报错“SparkException: Python worker failed to connect back.”,简单来说就是spark找不到python在哪里,我们需要通过OS模块设置一个环境变量来告诉spark python在哪里,代码如下所示,environ是一个字典,key为“PYSPARK_PYTHON”,value为python.exe的文件路径(因人而异,安装在哪里就写哪里)。

import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'

利用map算子将RDD中的数据扩大10倍,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5])
print(rdd1.collect())
def func(x):return x*10
rdd2=rdd1.map(func)
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

也可以使用 lambda匿名函数,让代码更加简洁高效:

 lambda匿名函数语法:lambda  传入参数:函数体(只能写一行代码)

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5])
print(rdd1.collect())
rdd2=rdd1.map(lambda data:data*10)
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

两种方法输出结果相同:

24/11/10 17:20:52 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 17:20:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
[10, 20, 30, 40, 50]

如果函数体复杂,用直接定义方式写比较方便,如果函数体一行就能写完,用lambda匿名函数写比较方便。

如果需要经历数次处理,可以通过链式调用的方式多次调用算子,更为简便:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5])
print(rdd1.collect())
def func1(x):return x*10
def func2(x):return x+10
def func3(x):return x-2
rdd2=rdd1.map(func1).map(func2).map(func3) #链式调用map算子
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 17:29:21 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 17:29:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
[18, 28, 38, 48, 58]

注意:此处的函数func1,func2,func3只是为了演示链式调用,其中的操作很简单,可以合并。遇到无法合并的函数又需要调用多次map算子时,可以使用链式调用。

3.2 flatMap算子

功能:对RDD执行map操作,然后进行解除嵌套操作。

语法:rdd.flatMap ( func )

将数据 ['my heart','go on'] 转换成 ['my' , 'heart' , 'go' , 'on'] 这种格式,使用字符串分割函数split,按照空格切分数据,然后解除嵌套。

字符串分割函数split语法:字符串.split(分隔符字符串)    

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd3=sc.parallelize(['my heart','go on'])
print(rdd3.collect())
#按照空格切分数据,然后解除嵌套
rdd4=rdd3.flatMap(lambda x:x.split(' '))
print(rdd4.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 18:32:16 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 18:32:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['my heart','go on']
['my', 'heart', 'go', 'on']

flatMap 算子计算逻辑和map算子 一样,只是比map算子多出一层解除嵌套的功能。

3.3 reduceByKey算子

功能:针对KV型RDD,自动按照Key分组,然后根据提供的聚合逻辑(函数),完成对组内数据(value)的聚合操作。

语法:rdd.reduceByKey ( func )  #func为聚合函数

reduceByKey的聚合逻辑

统计数据[('a',1),('a',1),('b',1),('b',1),('b',1)]中a和b的出现次数,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd5=sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])
rdd6=rdd5.reduceByKey(lambda a,b:a+b)
print(rdd6.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 19:20:56 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 19:20:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('a', 2), ('b', 3)]

注意:rdd.reduceByKey ( func )中,func只负责聚合,分组是自动ByKey来分组的。

图解

3.4 filter算子

功能:过滤数据,即对 RDD 数据逐个进行处理,返回值为True的数据被保留,返回值为False的数据被丢弃。

语法:rdd.filter ( func )

过滤掉数据[1,2,3,4,5,6]中的偶数,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5,6])
print(rdd1.collect())
#过滤掉数据中的偶数
rdd2=rdd1.filter(lambda x:x%2==1)
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 21:20:46 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 21:20:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1,2,3,4,5,6]

[1, 3, 5]

3.5 distinct算子

功能:对 RDD 内数据去重。

语法:rdd.distinct() #无需传参

对数据[1,2,1,4,6,6]进行去重,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,1,4,6,6])
print(rdd1.collect())
#去重
rdd2=rdd1.distinct()
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 21:29:46 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 21:29:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 1, 4, 6, 6]
[1, 2, 4, 6]

3.6 sortBy算子

功能:对 RDD 数据进行排序,基于指定的排序依据(由函数规定)。

语法:rdd.sortBy(func,ascending,numPartitions) 

对数据[('苹果',6),('香蕉',3),('橙子',4),('西瓜',9),('火龙果',10)]进行排序,按照value值进行排序(降序),代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([('苹果',6),('香蕉',3),('橙子',4),('西瓜',9),('火龙果',10)])
print(rdd1.collect())
#按照value值进行排序(降序)
rdd2=rdd1.sortBy(lambda x:x[1],ascending=False,numPartitions=1) #此处设置分区数量为1,非要点,无需理解
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 22:05:04 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 22:05:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('苹果', 6), ('香蕉', 3), ('橙子', 4), ('西瓜', 9), ('火龙果', 10)]
[('火龙果', 10), ('西瓜', 9), ('苹果', 6), ('橙子', 4), ('香蕉', 3)]

四、数据输出

4.1输出为 Python 对象

数据输出为 Python 对象的方法是很多的,本小节简单介绍了以下 4 个。

4.1.1 collect算子

功能:将RDD各个分区内的数据统一收集到Driver中,返回一个list对象。

语法:rdd.collect()#无须传参

代码验证:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.collect())
print(type(rdd.collect()))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:33:52 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:33:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
<class 'list'>

4.1.2 reduce算子

功能:对RDD数据集按照传入的逻辑进行聚合

语法:rdd.reduce(func)

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.reduce(lambda a,b:a*b))
print(type(rdd.reduce(lambda a,b:a*b)))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:52:33 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:52:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

120
<class 'int'>

图解
​​​​

4.1.3 take算子

功能:取RDD的前N个元素,组合成列表返回。

语法:rdd.take(N)

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.take(3))
print(type(rdd.take(3)))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:41:18 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:41:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3]
<class 'list'>

4.1.4 count算子

功能:统计RDD内有多少条数据,返回一个数字

语法:rdd.count() #无须传参

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.count())
print(type(rdd.count()))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:44:34 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:44:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

5
<class 'int'>

4.2输出到文件中

将数据输出到文件中的方法本小节主要介绍了saveAsTextFile算子。

saveAsTextFile算子功能:将RDD中的数据写入文本文件中。

语法:rdd.saveAsTextFile(文件路径)

注意:调用saveAsTextFile这类保存文件的算子,需要配置 Hadoop 依赖,步骤如下:

  1. 下载 Hadoop 安装包,解压到电脑任意位置。(文件链接: https://pan.baidu.com/s/1HHEE2oIp2kYohTpObgVN8g?pwd=mwdy 提取码: mwdy 复制这段内容后打开百度网盘手机App,操作更方便哦)
  2. 在 Python 代码中使用 os 模块配置: os.environ[‘HADOOP_HOME’] = ‘HADOOP 解压文件夹路径’
  3. 下载 winutils.exe ,并放入 Hadoop 解压文件夹的 bin 目录内(文件链接: https://pan.baidu.com/s/1z67mUP3yrci93uLOXAuOxg?pwd=txv2 提取码: txv2 复制这段内容后打开百度网盘手机App,操作更方便哦)
  4. 下载 hadoop.dll ,并放入 :C:/Windows/System32 文件夹内(文件链接: https://pan.baidu.com/s/15vA0o6j8W9f5kNaEYRopDA?pwd=pbww 提取码: pbww 复制这段内容后打开百度网盘手机App,操作更方便哦)

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
os.environ[‘HADOOP_HOME’] = ‘E:/HADOOP’#具体看个人的HADOOP保存路径
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5],numSlices=1)#修改 rdd 分区为 1 个,便于文件输出演示
#输出到文件file_test.txt中
rdd.saveAsTextFile('E:/可视化案例数据/file_test.txt')
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

运行代码后,打开该文件路径,发现文件“file_test.txt”已被建立,且通过右侧的文件预览可见文件内容与代码内容一致,如下图所示:

五、案例

4.1统计出现的单词数量

我们新建一个文件“test.txt”,在其中写入如下图所示的内容,然后保存。

案例要求:统计文件内所出现的单词的数量。

思路:

  1. 利用flatMap算子,将单词依次独立地取出。
  2. 利用map算子,给所有单词加上value(值为1),单词本身作为key。
  3. 利用reduceByKey算子,分组聚合,从而得出每个单词的出现总数。

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取数据文件
file=sc.textFile('E:/可视化案例数据/test.txt')
#将单词依次独立地取出
words_rdd=file.flatMap(lambda line:line.split(' '))
print(words_rdd.collect())
#给所有单词加上value(值为1),单词本身作为key
words_with_one_rdd=words_rdd.map(lambda x:(x,1))
print(words_with_one_rdd.collect())
#分组聚合
result_rdd=words_with_one_rdd.reduceByKey(lambda a,b:a+b)
print(result_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 21:02:57 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 21:02:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['goodbye', 'to', 'troubles', 'comes', 'to', 'comes', 'to', 'ease', 'troubles', 'comes', 'to', 'ease', 'goodbye']
[('goodbye', 1), ('to', 1), ('troubles', 1), ('comes', 1), ('to', 1), ('comes', 1), ('to', 1), ('ease', 1), ('troubles', 1), ('comes', 1), ('to', 1), ('ease', 1), ('goodbye', 1)]
[('goodbye', 2), ('troubles', 2), ('to', 4), ('comes', 3), ('ease', 2)]

4.2销售数据计算

读取文件“orders.txt”中的数据,并按要求进行计算。

文件链接: https://pan.baidu.com/s/1rEiJm3-BNZeo5MrzXiSZnw?pwd=g3a6 提取码: g3a6 复制这段内容后打开百度网盘手机App,操作更方便哦

案例要求:

  1. 各个城市销售额排名,从大到小
  2. 全部城市,有哪些商品类别在售卖
  3. 北京市有哪些商品类别在售卖

1.各个城市销售额排名,从大到小

如上图所示,文件数据是json格式,但并不是每个json数据都独占一行,其中有些json数据之间用“|”隔开了,我们要先利用flatMap算子,接收字符串分割函数,将json数据独立地分隔开。然后利用map算子将json数据转换成python字典,数据初步处理完成,

文件内同一城市的不同商品类别的销售数据是分散的,我们需要先取出城市和销售额数据,然后利用reduceByKey算子分组聚合,聚合同一城市不同商品的销售额,最后利用sortBy算子排序。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import json
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/orders.txt')
json_rdd=file_rdd.flatMap(lambda x:x.split('|'))
#将json数据转换成python字典
dict_rdd=json_rdd.map(lambda x:json.loads(x))#将json数据转换成python字典
#取出城市和销售额数据
city_with_money_rdd=dict_rdd.map(lambda x:(x['areaName'],int(x['money']))) #注意,文件内money数据是字符串格式,为了满足排序的需求,必须将其强制转换为int类型
#聚合同一城市不同商品的销售额
city_result_money_rdd=city_with_money_rdd.reduceByKey(lambda a,b:a+b)
#按销售额排序,从大到小
result1_rdd=city_result_money_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(result1_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 20:54:15 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 20:54:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('北京', 91556), ('杭州', 28831), ('天津', 12260), ('上海', 1513), ('郑州', 1120)]

2.全部城市,有哪些商品类别在售卖

从文件中可以看出,同一商品可能多个城市都在售卖,那取出全部商品类别数据后,可能存在重复情况,就需要利用distinct算子去重。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import json
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/orders.txt')
json_rdd=file_rdd.flatMap(lambda x:x.split('|'))
#将json数据转换成python字典
dict_rdd=json_rdd.map(lambda x:json.loads(x))#将json数据转换成python字典
#取出全部商品类别数据
category_rdd=dict_rdd.map(lambda x:x['category'])
#去重
result2_rdd=category_rdd.distinct()
print(result2_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:11:54 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:11:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']

3.北京市有哪些商品类别在售卖

首先利用filter算子过滤掉areaName不是北京的数据,因为北京同一商品类别不同时间的数据也而不同,需要去重。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import json
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/orders.txt')
json_rdd=file_rdd.flatMap(lambda x:x.split('|'))
#将json数据转换成python字典
dict_rdd=json_rdd.map(lambda x:json.loads(x))#将json数据转换成python字典
#取出北京市的数据
BeiJing_rdd=dict_rdd.filter(lambda x:x['areaName']=='北京')
#取出北京市的商品类别数据并去重
result3_rdd=BeiJing_rdd.map(lambda x:x['category']).distinct() #链式调用
print(result3_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:23:00 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:23:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']

4.3搜索引擎日志分析

文件链接: https://pan.baidu.com/s/1y8Ydf2JErNWfIUU5jWMJ7Q?pwd=m81n 提取码: m81n 复制这段内容后打开百度网盘手机App,操作更方便哦

文件“search_log.txt”数据格式如下图所示:

案例要求:

  1. 打印输出:热门搜索时间段(小时精度) Top3
  2. 打印输出:热门搜索词 Top3
  3. 打印输出:统计黑马程序员关键字在哪个时段被搜索最多

1.打印输出:热门搜索时间段(小时精度) Top3

​​​​​​​思路如下:

  • ​​​​​​​从文件中可以看出每列数据之间使用“\t”对齐,利用字符串分割函数取出第一列时间数据。
  • 案例要求小时精度,时间数据格式为【小时:分钟:秒】,我们只需要小时,利用数据容器的切片操作取前两位即可。
  • 将数据转换成(小时,1)的二元元组形式,然后分组聚合,按降序从高到低排序。
  • 取前3个小时时间段即为“热门搜索时间段(小时精度) Top3”。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result1=file_rdd.map(lambda x:x.split('\t')).map(lambda x:x[0][:2]).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],ascending=False,numPartitions=1).take(3)
print(result1)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/14 14:25:49 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 14:25:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('20', 3479), ('23', 3087), ('21', 2989)]

我们得到了搜索次数最多的时间段:晚上8点、晚上11点、晚上9点。

代码可以进一步改进:

改进后的代码:

​
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result1=file_rdd.map(lambda x:(x.split('\t')[0][:2],1)).\reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(3)
print(result1)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()​

2.打印输出:热门搜索词 Top3

​​​​​​​思路如下:

  • 从文件中可以看出每列数据之间使用“\t”对齐,利用字符串分割函数取出第三列热搜词数据。
  • 将数据转换成(热搜词,1)的二元元组形式,然后分组聚合,按降序从高到低排序。
  • 取前3个数据即为“热门搜索词 Top3”。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result2=file_rdd.map(lambda x:x.split('\t')).\map(lambda x:x[2]).\map(lambda x:(x,1)).\reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(3)
print(result2)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

​​​​​​​输出:

24/11/14 15:02:21 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 15:02:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('scala', 2310), ('hadoop', 2268), ('博学谷', 2002)]

我们得到了被检索次数最多的3个热搜词及其具体被检索次数。

此处依然可以简化3个map算子,但为了代码易读性,此处不再进行简化,简化可看下图:

3.打印输出:统计黑马程序员关键字在哪个时段(小时精度)被搜索最多

思路如下:

  • 从文件中可以看出每列数据之间使用“\t”对齐利用字符串分割函数取出第一列时间数据和第三列关键字数据。案例要求小时精度,时间数据格式为【小时:分钟:秒】,我们只需要小时,利用数据容器的切片操作取前两位即可。
  • 过滤出关键词=黑马程序员的数据,过滤完成后不再需要关键字数据,将数据转换成(时间,1)的二元元组形式,然后分组聚合,统计不同时间段内关键词黑马程序员被搜索的次数。
  • 最后按降序从高到低排序,取第一个数据即为“黑马程序员关键字被搜索次数最多的时段”。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result3=file_rdd.map(lambda x:x.split('\t')).\map(lambda x:(x[0][:2],x[2])).\filter(lambda x:x[1]=='黑马程序员').\map(lambda x:(x[0],1)).\reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(1)
print(result3)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

​​​​​​​输出:

24/11/14 15:21:20 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 15:21:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('22', 245)]

黑马程序员关键字在晚上10点这个时间段被检索次数最多,被检索次数为245次。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/473679.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Java Springboot编程语言在线学习平台

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据…

WebRTC视频 02 - 视频采集类 VideoCaptureModule

WebRTC视频 01 - 视频采集整体架构 WebRTC视频 02 - 视频采集类 VideoCaptureModule&#xff08;本文&#xff09; WebRTC视频 03 - 视频采集类 VideoCaptureDS 上篇 WebRTC视频 04 - 视频采集类 VideoCaptureDS 中篇 WebRTC视频 05 - 视频采集类 VideoCaptureDS 下篇 一、前言…

深度学习笔记14-卷积神经网络2

1.卷积神经网络的结构 卷积神经网络&#xff0c;是包含卷积运算且具有深度结构的前馈神经网络。在卷积神经网络中&#xff0c;包含卷积层、池化层和全连接层三种重要的结构。相比前馈神经网络&#xff0c;卷积层和池化层是新增的网络结构&#xff0c;在提取特征时&#xff0c;卷…

Python 正则表达式使用指南

Python 正则表达式使用指南 正则表达式&#xff08;Regular Expression, 简称 regex&#xff09;是处理字符串和文本的强大工具。它使用特定的语法定义一组规则&#xff0c;通过这些规则可以对文本进行匹配、查找、替换等操作。Python 提供了 re 模块&#xff0c;使得正则表达…

FPGA开发-逻辑分析仪的应用-数字频率计的设计

目录 逻辑分析仪的应用 数字频率计的设计 -基于原理图方法 主控电路设计 分频器设计 顶层电路设计 数字系统开发不但需要进行仿真分析&#xff0c;更重要的是需要进行实际测试。 逻辑分析仪的应用 测试方式&#xff1a;&#xff08;1&#xff09;传统的测试方式&#…

.NET 9.0 中 System.Text.Json 的全面使用指南

以下是一些 System.Text.Json 在 .NET 9.0 中的使用方式&#xff0c;包括序列化、反序列化、配置选项等&#xff0c;并附上输出结果。 基本序列化和反序列化 using System; using System.Text.Json; public class Program {public class Person{public string Name { get; se…

Linux 命令 | 每日一学,文本处理三剑客之awk命令实践

[ 知识是人生的灯塔&#xff0c;只有不断学习&#xff0c;才能照亮前行的道路 ] 0x00 前言简述 描述&#xff1a;前面作者已经介绍了文本处理三剑客中的 grep 与 sed 文本处理工具&#xff0c;今天将介绍其最后一个且非常强大的 awk 文本处理输出工具&#xff0c;它可以非常方便…

【第五课】Rust所有权系统(一)

目录 前言 所有权机制的核心 再谈变量绑定 主人变更-所有权转移 总结 前言 这节课我们来介绍下rust中最重要的一个点&#xff1a;所有权系统。这是网上经常说rust无gc的秘密所在。在开始之前&#xff0c;我们来想想JVM系语言&#xff0c;在做垃圾回收的过程&#xff0c;1.…

三周精通FastAPI:42 手动运行服务器 - Uvicorn Gunicorn with Uvicorn

官方文档&#xff1a;Server Workers - Gunicorn with Uvicorn - FastAPI 使用 fastapi 运行命令 可以直接使用fastapi run命令来启动FastAPI应用&#xff1a; fastapi run main.py如创建openapi.py文件&#xff1a; from fastapi import FastAPIapp FastAPI(openapi_url&…

任意文件下载漏洞

1.漏洞简介 任意文件下载漏洞是指攻击者能够通过操控请求参数&#xff0c;下载服务器上未经授权的文件。 攻击者可以利用该漏洞访问敏感文件&#xff0c;如配置文件、日志文件等&#xff0c;甚至可以下载包含恶意代码的文件。 这里再导入一个基础&#xff1a; 你要在网站下…

编写一个生成凯撒密码的程序

plain list(input("请输入需要加密的明文&#xff08;只支持英文字母&#xff09;&#xff1a;"))key int(input("请输入移动的位数&#xff1a;"))base_A ord(A)base_a ord(a)cipher []for each in plain:if each :cipher.append( )else:if each.i…

RDIFramework.NET CS敏捷开发框架 V6.1发布(.NET6+、Framework双引擎、全网唯一)

RDIFramework.NET C/S敏捷开发框架V6.1版本迎来重大更新与调整&#xff0c;全面重新设计业务逻辑代码&#xff0c;代码量减少一半以上&#xff0c;开发更加高效。全系统引入全新字体图标&#xff0c;整个界面焕然一新。底层引入最易上手的ORM框架SqlSugar&#xff0c;让开发更加…

华为USG5500防火墙配置NAT

实验要求&#xff1a; 1.按照拓扑图部署网络环境&#xff0c;使用USG5500防火墙&#xff0c;将防火墙接口加入相应的区域&#xff0c;添加区域访问规则使内网trust区域可以访问DMZ区域的web服务器和untrust区域的web服务器。 2.在防火墙上配置easy-ip&#xff0c;使trust区域…

Java基础-I/O流

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 字节流 定义 说明 InputStream与OutputStream示意图 说明 InputStream的常用方法 说明 OutputStrea…

RabbitMQ的工作队列在Spring Boot中实现(详解常⽤的⼯作模式)

上文着重介绍RabbitMQ 七种工作模式介绍RabbitMQ 七种工作模式介绍_rabbitmq 工作模式-CSDN博客 本篇讲解如何在Spring环境下进⾏RabbitMQ的开发.&#xff08;只演⽰部分常⽤的⼯作模式&#xff09; 目录 引⼊依赖 一.工作队列模式 二.Publish/Subscribe(发布订阅模式) …

<项目代码>YOLOv8 番茄识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…

Python数据分析NumPy和pandas(三十五、时间序列数据基础)

时间序列数据是许多不同领域的结构化数据的重要形式&#xff0c;例如金融、经济、生态学、神经科学和物理学。在许多时间点重复记录的任何内容都会形成一个时间序列。许多时间序列是固定频率的&#xff0c;也就是说&#xff0c;数据点根据某些规则定期出现&#xff0c;例如每 1…

【C++滑动窗口】1248. 统计「优美子数组」|1623

本文涉及的基础知识点 C算法&#xff1a;滑动窗口及双指针总结 LeetCode1248. 统计「优美子数组」 给你一个整数数组 nums 和一个整数 k。如果某个连续子数组中恰好有 k 个奇数数字&#xff0c;我们就认为这个子数组是「优美子数组」。 请返回这个数组中 「优美子数组」 的数…

【不写for循环】玩玩行列

利用numpy的并行操作可以比纯用Python的list快很多&#xff0c;不仅如此&#xff0c;代码往往精简得多。 So, 这篇来讲讲进阶的广播和花哨索引操作&#xff0c;少写几个for循环&#xff08;&#xff09;。 目录 一个二维的例题 一个三维的例题 解法一 解法二 更难的三维例题…

《Java核心技术 卷I》用户界面中首选项API

首选项API 在桌面程序中&#xff0c;通常都会存储用户首选项&#xff0c;如用户最后处理的文件、窗口的最后位置等。 利用Properties类可以很容易的加载和保存程序的配置信息&#xff0c;但有以下缺点&#xff1a; 有些操作系统没有主目录概念&#xff0c;很难为匹配文件找到…