Python学习路线 - Python高阶技巧 - PySpark案例实战

Python学习路线 - Python高阶技巧 - PySpark案例实战

    • 前言介绍
      • Spark是什么
      • Python On Spark
      • PySpark
      • Why PySpark
    • 基础准备
      • PySpark库的安装
      • 构建PySpark执行环境入口对象
      • PySpark的编程模型
    • 数据输入
      • RDD对象
      • Python数据容器转RDD对象
      • 读取文件转RDD对象
    • 数据计算
      • map方法
      • flatMap方法
      • reduceByKey方法
      • 练习案例1
      • filter方法
      • distinct方法
      • sortBy方法
      • 练习案例2
        • 案例
    • 数据输出
      • 输出为Python对象
        • collect算子
        • reduce算子
        • take算子
        • count算子
      • 输出文件中
        • saveAsTextFile算子
        • 修改rdd分区为1
    • 综合案例
      • 搜索引擎日志分析
    • 分布式集群运行

前言介绍

Spark是什么

定义:Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。
在这里插入图片描述

简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃致EB级别的海量数据
在这里插入图片描述

Python On Spark

Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发。而Python语言,则是Spark重点支持的方向。
在这里插入图片描述
在这里插入图片描述

PySpark

Spark对Python语言的支持,重点体现在,Python第三方库:PySpark之上。

PySpark是由Spark官方开发的Python语言第三方库。
Python开发者可以使用pip程序快速的安装PySpark并像其它三方库那样直接使用。
在这里插入图片描述

Why PySpark

Python应用场景和就业方向是十分丰富的,其中,最为亮点的方向为:
大数据开发 和 人工智能
在这里插入图片描述

总结
1.什么是Spark、什么是PySpark

  • Spark是Apache基金会旗下的顶级开源项目,用于对海量数据进行大规模分布式计算。
  • PySpark是Spark的Python实现,是Spark为Python开发者提供的编程入口,用于以Python代码完成Spark任务的开发
  • PySpark不仅可以作为Python第三方库使用,也可以将程序提交的Spark集群环境中,调度大规模集群进行执行。

2.为什么要学习PySpark?
大数据开发是Python众多就业方向中的明星赛道,薪资高岗位多,Spark(PySpark)又是大数据开发中的核心技术

基础准备

PySpark库的安装

同其它的Python第三方库一样,PySpark同样可以使用pip程序进行安装。

在"CMD"命令提示符程序内,输入:

pip install pyspark

或者使用国内代理镜像网站(清华大学源)

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

在这里插入图片描述

构建PySpark执行环境入口对象

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。
PySpark的执行环境入口对象是:类SparkContext的类对象
在这里插入图片描述
在这里插入图片描述
代码示例:

"""
演示获取PySpark的执行环境入口对象:SparkContext
并通过SparkContext对象获取当前PySpark的版本
"""# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf = conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

执行结果:
在这里插入图片描述

PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。
PySpark的编程,主要分为如下三大步骤:

在这里插入图片描述
在这里插入图片描述

  • 通过SparkContext对象,完成数据输入
  • 输入数据后得到RDD对象,对RDD对象进行迭代计算
  • 最终通过RDD对象的成员方法,完成数据输出工作

总结
1.如何安装PySpark库
pip install pyspark
2.为什么要构建SparkContext对象作为执行入口
PySpark的功能都是从SparkContext对象作为开始
3.PySpark的编程模型是?

  • 数据输入:通过SparkContext完成数据读取
  • 数据计算:读取到的数据转换为RDD对象,调用RDD的成员方法完成计算
  • 数据输出:调用RDD的数据输出相关成员方法,将结果输出到list、元组、字典、文本文件、数据库等

数据输入

RDD对象

如图可见,PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)
PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象
    在这里插入图片描述
    PySpark的编程模型(上图)可以归纳为:
  • 准备数据到RDD -> RDD迭代计算 -> RDD导出为list、文本文件等
  • 即:源数据 -> RDD -> 结果数据

Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将:

  • list
  • tuple
  • set
  • dict
  • str

转换为PySpark的RDD对象
在这里插入图片描述

注意:

  • 字符串会被拆分出1个个的字符,存入RDD对象
  • 字典仅有key会被存入RDD对象

读取文件转RDD对象

PySpark也支持通过SparkContext入口对象,来读取文件,来构建出RDD对象。
在这里插入图片描述
代码示例:

"""
演示通过PySpark代码加载数据,即数据输入
"""
from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})# 如果要查看RDD里面有什么内容,需要使用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())# 通过textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd = sc.textFile("D:/yuancheng/20231120/资料/第15章资料/资料/hello.txt")
print(rdd.collect())sc.stop()

输出结果:

D:\python\python-learn\venv\Scripts\python.exe D:\python\python-learn\模块\02_数据输入.py 
24/01/14 09:52:11 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/14 09:52:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
['a', 'b', 'c', 'd', 'e', 'f', 'g']
[1, 2, 3, 4, 5]
['key1', 'key2']
['mry mry itcast mry', 'spark python spark python mry', 'mry itcast itcast mry python', 'python python spark pyspark pyspark', 'mry python pyspark itcast spark']进程已结束,退出代码为 0

总结
1.RDD对象是什么?为什么要使用它?
RDD对象称之为分布式弹性数据集,是PySpark中数据计算的载体,它可以:

  • 提供数据存储
  • 提供数据计算的各类方法
  • 数据计算的方法,返回值依旧是RDD(RDD迭代计算)

后续对数据进行各类计算,都是基于RDD对象进行
2.如何输入数据到Spark(即得到RDD对象)

  • 通过SparkContext的parallelize成员方法,将Python数据容器转换为RDD对象
  • 通过SparkContext的textFile成员方法,读取文本文件得到RDD对象

数据计算

map方法

PySpark的数据计算,都是基于RDD对象来进行的,那么如何进行呢?
自然是依赖,RDD对象内置丰富的:成员方法(算子)

map算子

功能:map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
在这里插入图片描述
在这里插入图片描述

代码示例:

"""
演示RDD的map成员方法的使用
"""
import timefrom pyspark import SparkConf, SparkContext
import os
# os.environ['PYSPARK_PYTHON'] = 'D:/python/python-learn/venv/Scripts/python.exe'
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())
# 通过map方法将全部数据都乘以10
def func(data):return data * 10# rdd2 = rdd.map(func)
# 链式调用
rdd2 = rdd.map(lambda x: x*10).map(lambda x : x + 5)
print(rdd2.collect())
# (T) -> U
# (T) -> Tsc.stop()

执行结果:
在这里插入图片描述

总结
1.map算子(成员方法)

  • 接受一个处理函数,可用lambda表达式快速编写
  • 对RDD内的元素逐个处理,并返回一个新的RDD

2.链式调用

  • 对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子。

flatMap方法

功能:对RDD执行map操作,然后进行解除嵌套操作。
在这里插入图片描述
在这里插入图片描述

代码示例:

"""
演示RDD的flatMap成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize(["mry go 666", "mry mry go", "python mry"])# 需求,将RDD数据里面的一个个单词提取出来
rdd2 = rdd.map(lambda x: x.split(" "))
print(rdd2.collect())rdd3 = rdd.flatMap(lambda x: x.split(" "))
print(rdd3.collect())

输出结果:
在这里插入图片描述

总结:
1.flatMap算子

  • 计算逻辑和map一样
  • 可以比map多出,解除一层嵌套的功能

reduceByKey方法

功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组数据(value)的聚合操作。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

代码示例:

"""
演示RDD的reduceByKey成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])# 求男生和女生两个组的成绩之和
rdd2 = rdd.reduceByKey(lambda a,b : a + b)
print(rdd2.collect())

输出结果:
在这里插入图片描述

总结:
1.reduceByKey算子

  • 接受一个处理函数,对数据进行两两计算
    在这里插入图片描述

练习案例1

WordCount案例
使用学习到的内容,完成:

  • 读取文件
  • 统计文件内,单词的出现数量
    在这里插入图片描述

代码示例:

"""
完成练习案例:单词计数统计
"""# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 2.读取数据文件
rdd = sc.textFile("D:/yuancheng/20231120/资料/第15章资料/资料/hello.txt")
print(rdd.collect())# 3.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
print(word_rdd.collect())# 4.将所有单词都转换成二元元组,单词为key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
print(word_with_one_rdd.collect())# 5.分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a, b : a + b)# 6.打印输出结果
print(result_rdd.collect())

输出结果:
在这里插入图片描述

filter方法

功能:过滤想要的数据进行保留
在这里插入图片描述

代码示例:

"""
演示RDD的filter成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# 对RDD的数据进行过滤
rdd2 = rdd.filter(lambda num: num % 2 == 0)
print(rdd2.collect())

输出结果:
在这里插入图片描述
总结
1.filter算子

  • 接受一个处理函数,可用lambda快速编写
  • 函数对RDD数据逐个处理,得到True的保留至返回值的RDD中

distinct方法

功能:对RDD数据进行去重,返回新RDD
在这里插入图片描述
在这里插入图片描述

代码示例:

"""
演示RDD的distinct成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5, 7, 8, 8, 9, 10])# 对RDD的数据进行去重
rdd2 = rdd.distinct()
print(rdd2.collect())

输出结果:
在这里插入图片描述

总结
1.distinct算子

  • 完成对RDD内数据的去重操作

sortBy方法

功能:对RDD数据进行排序,基于你指定的排序依据。
在这里插入图片描述

代码示例:

"""
演示RDD的sortBy方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 1.读取数据文件
rdd = sc.textFile("D:/yuancheng/20231120/资料/第15章资料/资料/hello.txt")
# 2.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 3.将所有单词都转换成二元元组,单词为key,value设置为1
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# 4.分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
print(result_rdd.collect())
# 5.对结果进行排序
final_rdd = result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print(final_rdd.collect())

输出结果:
在这里插入图片描述

总结
1.sortBy算子

  • 接收一个处理函数,可用lambda快速编写
  • 函数表示用来决定排序的依据
  • 可以控制升序或降序
  • 全局排序需要设置分区数为1

练习案例2

案例

在这里插入图片描述
需求,复制以上内容到文件中,使用Spark读取文件进行计算:

  • 各个城市销售额排名,从大到小
  • 全部城市,有哪些商品类别在售卖
  • 北京市有哪些商品类别在售卖

代码示例:

"""
练习案例:JSON商品统计
需求:
1. 各个城市销售额排名,从大到小
2. 全部城市,有哪些商品类别在售卖
3. 北京市有哪些商品类别在售卖
"""
from pyspark import SparkConf, SparkContext
import json
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# TODD 需求1:城市销售额排名
# 1.1 读取数据文件到RDD
file_rdd = sc.textFile("D:/yuancheng/20231120/资料/第15章资料/资料/orders.txt")# 1.2 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
print(json_str_rdd.collect())# 1.3 将一个个JSON字符串转换为字典
dict_rdd =json_str_rdd.map(lambda x: json.loads(x))
print(dict_rdd.collect())# 1.4 取出城市和销售额数据
# (城市,销售额)
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a,b : a + b)# 1.6 按销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("需求1的结果:", result1_rdd.collect())# TODD 需求2: 全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
# 2.2 对全部商品类别进行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:", category_rdd.collect())# TODD 需求3:北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')# 3.2 取出全部商品类别
# 3.3 进行商品类别去重
result3_rdd = beijing_data_rdd.map(lambda x: x['areaName']).distinct()
print("需求2的结果:", result3_rdd.collect())

输出结果:
在这里插入图片描述

数据输出

数据输入:

  • sc.parallelize
  • sc.textFile

数据计算:

  • rdd.map
  • rdd.flatMap
  • rdd.reduceByKey

在这里插入图片描述

输出为Python对象

collect算子

功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:
rdd.collect()
返回值是一个list

代码示例:

"""
演示将RDD输出为Python对象
"""
from pyspark import SparkConf, SparkContext
import json
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# collect算子,输出RDD为List对象
rdd_list: list = rdd.collect()
print(rdd_list)
print(type(rdd_list))

输出结果:
在这里插入图片描述

reduce算子

功能:对RDD数据集按照你传入的逻辑进行聚合
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

返回值等同于计算函数的返回值

代码示例:

"""
演示将RDD输出为Python对象
"""
from pyspark import SparkConf, SparkContext
import json
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# reduce算子,对RDD进行两两聚合
num = rdd.reduce(lambda a, b: a + b)
print(num)

输出结果:
在这里插入图片描述

take算子

功能:取RDD的前N个元素,组合成list返回给你
在这里插入图片描述

代码示例:

"""
演示将RDD输出为Python对象
"""
from pyspark import SparkConf, SparkContext
import json
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# take算子,输出RDDN个元素,组成List返回
take_list = rdd.take(3)
print(take_list)

输出结果:
在这里插入图片描述

count算子

功能:计算RDD有多少条数据,返回值是一个数字
在这里插入图片描述

代码示例:

"""
演示将RDD输出为Python对象
"""
from pyspark import SparkConf, SparkContext
import json
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# count,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"rdd内有{num_count}个元素")

输出结果:
在这里插入图片描述
总结
1.Spark的编程流程就是:

  • 将数据加载为RDD(数据输入)
  • 对RDD进行计算(数据计算)
  • 将RDD转换为Python对象(数据输出)
    2.数据输出的方法
  • collect:将RDD内容转换为list
  • reduce:对RDD内容进行自定义聚合
  • take:取出RDD的前N个元素组成list
  • count:统计RDD元素个数

数据输出可用的方法是很多的,本小节简单的介绍了4个。

输出文件中

saveAsTextFile算子

功能:将RDD的数据写入文本文件中
支持 本地写出,hdfs等文件系统

代码:
在这里插入图片描述

注意事项
调用保存文件的算子,需要配置Hadoop依赖

  • 下载Hadoop安装包
    • http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
  • 解压到电脑任意位置
  • 在Python代码中使用os模块配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解压文件夹路径’
  • 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
    • https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe
  • 下载hadoop.dll,并放入:C:/Windows/System32 文件夹内
    • https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll

代码示例:

"""
演示将RDD输出到文件中
"""
from pyspark import SparkConf, SparkContext
import json
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'
os.environ['HADOOP_HOME'] = 'D:/tool/hadoop/hadoop-3.0.0'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5])# 准备RDD2
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)])# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]])# 输出到文件中
rdd1.saveAsTextFile("D:/yuancheng/20231120/资料/第15章资料/资料/output1")
rdd2.saveAsTextFile("D:/yuancheng/20231120/资料/第15章资料/资料/output2")
rdd3.saveAsTextFile("D:/yuancheng/20231120/资料/第15章资料/资料/output3")

输出结果:
在这里插入图片描述

修改rdd分区为1

方式1,SparkConf对象设置属性全局并行度为1:
在这里插入图片描述

方式2,创建RDD的时候设置(parallelize方法传入numSlices参数为1)
在这里插入图片描述
在这里插入图片描述

代码示例:

"""
演示将RDD输出到文件中
"""
from pyspark import SparkConf, SparkContext
import json
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'
os.environ['HADOOP_HOME'] = 'D:/tool/hadoop/hadoop-3.0.0'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)# 准备RDD2
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)# 输出到文件中
rdd1.saveAsTextFile("D:/yuancheng/20231120/资料/第15章资料/资料/output1")
rdd2.saveAsTextFile("D:/yuancheng/20231120/资料/第15章资料/资料/output2")
rdd3.saveAsTextFile("D:/yuancheng/20231120/资料/第15章资料/资料/output3")

输出结果:
在这里插入图片描述

总结
1.RDD输出到文件的方法

  • rdd.saveAsTextFile(路径)
  • 输出的结果是一个文件夹
  • 有几个分区就输出多少个结果文件

2.如何修改RDD分区

  • SparkConf对象设置conf.set(“spark.default.parallelism”, “1”)
  • 创建RDD的时候,sc.parallelize方法传入numSlices参数为1

综合案例

搜索引擎日志分析

在这里插入图片描述
读取文件转换成RDD,并完成:

  • 打印输出:热门搜索时间段(小时精度)Top3
  • 打印输出:热门搜索词Top3
  • 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
  • 将数据转换为JSON格式,写出为文件

代码示例:

"""
演示PySpark综合案例
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = 'D:/install/python/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)# 读取文件转换成RDD
file_rdd = sc.textFile("D:/yuancheng/20231120/资料/第15章资料/资料/search_log.txt")# TODD 需求1:热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1)的二元元组
# 1.3 Key分组聚合Value
# 1.4 排序(降序)
# 1.5 取前3
result1 = file_rdd.map(lambda x: x.split("\t")).\map(lambda x: x[0][:2]).\map(lambda x: (x, 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求1的结果:", result1)# TODD 需求2:热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求2的结果:", result2)# TODD 需求3:统计黑马程序员关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时, 1)的二元元组
# 3.3 Key分组聚合Value
# 3.4 排序(降序)
# 3.5 取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\filter(lambda x: x[2] == '黑马程序员').\map(lambda x: (x[0][:2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(1)
print("需求3的结果:", result3)# TODD 需求4:将数据转换为JSON格式,写出到文件中
# 4.1 转换为文件
file_rdd.map(lambda x: x.split("\t")).\map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\saveAsTextFile("D:/yuancheng/20231120/资料/第15章资料/资料/output_json")

输出结果:

D:\install\python\python.exe D:\python\python-learn\模块\13_综合案例.py 
需求1的结果: [('20', 3479), ('23', 3087), ('21', 2989)]
需求2的结果: [('scala', 2310), ('hadoop', 2268), ('博学谷', 2002)]
需求3的结果: [('22', 245)]进程已结束,退出代码为 0

在这里插入图片描述

分布式集群运行

提交命令:


bin/spark-submit --master yarn --num-executors 3 --queue root.teach --executor-cores 4 --executor-memory 4g /home/hadoop/demo.py

输出结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/252243.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HuggingFace库中BERTForxxx模型代码详细分析 使用BERT进行无监督预训练

HuggingFace库中BERTForxxx模型代码详细分析 使用BERT进行无监督预训练 引言 HF库封装的各种任务列举 BertModel的结构分析 BertForPreTraining的结构分析 BertForMaskedLM的结构分析 BertForNextSentencePrediction的结构分析 BertForSequenceClassification的结构分析 …

sqli.labs靶场(23关到28a关)

23、第二十三关 id1单引号闭合 找位置1 and 12 union select 1,2,3 爆库:1 and 12 union select 1,2,database() 爆表名:1 and 12 union select 1,2,group_concat(table_name) from information_schema.tables where table_schemasecurity 爆字段&#…

推动海外云手机发展的几个因素

随着科技的不断发展,海外云手机作为一种新兴技术,在未来呈现出令人瞩目的发展趋势。本文将在用户需求、技术创新和全球市场前景等方面,探讨海外云手机在未来的发展。 1. 用户需求的引领: 随着人们对移动性和便捷性的需求不断增长&…

Linux|Grep 命令的 12 个实用示例

您是否曾经遇到过在文件中查找特定字符串或模式的任务,但不知道从哪里开始查找?那么,grep 命令可以拯救你! grep 是一个功能强大的文件模式搜索器,每个 Linux 发行版都配备了它。如果出于某种原因,它没有安…

JavaScript运行机制

在web前端开发中,JavaScript无疑是一种非常重要的编程语言。它能够为网页添加动态交互功能,提升用户体验。然而,要充分发挥JavaScript的威力,我们需要对它的运行机制有一定的了解。 JavaScript是一种解释执行的脚本语言&#xff…

【LeetCode力扣】单调栈解决Next Greater Number(下一个更大值)问题

目录 1、题目介绍 2、解题思路 2.1、暴力破解法 2.2、经典Next Greater Number问题解法 1、题目介绍 原题链接:496. 下一个更大元素 I - 力扣(LeetCode) 示例1: 输入:nums1 [4,1,2], nums2 [1,3,4,2].输出&…

SpringSecurity(17)——OAuth2令牌管理策略

刷新令牌策略 注意&#xff1a;刷新令牌只有在授权码模式和密码模式中才有&#xff0c;对应的指定这两种模式时&#xff0c;在类型上加上refresh_token <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-se…

Prometheus 采集Oracle监控数据

前言 oracledb_exporter是一个开源的Prometheus Exporter,用于从Oracle数据库中收集关键指标并将其暴露给Prometheus进行监控和告警。它可以将Oracle数据库的性能指标转换为Prometheus所需的格式,并提供一些默认的查询和指标。 download Oracle Oracle Windows Install …

【前端web入门第四天】02 CSS三大特性+背景图

文章目录: 1. CSS三大特性 1.1继承性 1.2 层叠性 1.3 优先级 1.3.1 优先级1.3.2 优先级-叠加计算规则 2. 背景图 2.1 背景属性2.2 背景图2.3 背景图的平铺方式2.4 背景图位置2.5 背景图缩放2.6 背景图固定2.7 背景复合属性 1. CSS三大特性 1.1继承性 什么是继承性? 子级默…

2023_中国零售业人工智能行业应用 发展图谱

01 零售人工智能行业应用发展背景 02 零售人工智能行业应用发展图谱及行业应用案例 案例&#xff1a;京东云、蓝色光标、京东言犀智能服务、腾讯企点、 案例&#xff1a;淘天集团、极睿科技、百度电商数字人直播 案例&#xff1a;中国联通、云拿科技AI智能商店&#xff1b; 0…

[设计模式Java实现附plantuml源码~结构型]实现对象的复用——享元模式

前言&#xff1a; 为什么之前写过Golang 版的设计模式&#xff0c;还在重新写Java 版&#xff1f; 答&#xff1a;因为对于我而言&#xff0c;当然也希望对正在学习的大伙有帮助。Java作为一门纯面向对象的语言&#xff0c;更适合用于学习设计模式。 为什么类图要附上uml 因为很…

【Iceberg学习二】Branch和Tag在Iceberg中的应用

Iceberg 表元数据保持一个快照日志&#xff0c;记录了对表所做的更改。快照在 Iceberg 中至关重要&#xff0c;因为它们是读者隔离和时间旅行查询的基础。为了控制元数据大小和存储成本&#xff0c;Iceberg 提供了快照生命周期管理程序&#xff0c;如 expire_snapshots&#xf…

基于Vue的移动端UI框架整理

一、Vant 官方地址&#xff1a;https://youzan.github.io/vant/#/zh-CN/ 简介&#xff1a;有赞公司开发。 特性&#xff1a;60 高质量组件、90% 单元测试覆盖率、完善的中英文文档和示例、支持按需引入、支持主题定制、支持国际化、支持 TS、支持 SSR。 特别说明&#xff1…

RabbitMQ-2.SpringAMQP

SpringAMQP 2.SpringAMQP2.1.创建Demo工程2.2.快速入门2.1.1.消息发送2.1.2.消息接收2.1.3.测试 2.3.WorkQueues模型2.2.1.消息发送2.2.2.消息接收2.2.3.测试2.2.4.能者多劳2.2.5.总结 2.4.交换机类型2.5.Fanout交换机2.5.1.声明队列和交换机2.5.2.消息发送2.5.3.消息接收2.5.4…

C语言第十八弹---指针(二)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 指针 1、const修饰指针 1.1、const修饰变量 1.2、const修饰指针变量 2、指针运算 2.1、指针- 整数 2.2、指针-指针 2.3、指针的关系运算 3、野指针 3.1、…

Stable Diffusion 模型下载:国风3 GuoFeng3

文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八案例九案例十推荐提示词下载地址模型介绍 欢迎使用GuoFeng3模型 - 这是一个中国华丽古风风格模型,也可以说是一个古风游戏角色模型,具有2.5D的质感。 条目内

2024年Java面试题大全 面试题附答案详解,BTA内部面试题

基础篇 1、 Java语言有哪些特点 1、简单易学、有丰富的类库 2、面向对象&#xff08;Java最重要的特性&#xff0c;让程序耦合度更低&#xff0c;内聚性更高&#xff09; 阿里内部资料 基本类型 大小&#xff08;字节&#xff09; 默认值 封装类 6、Java自动装箱与拆箱 装箱就是…

Python中的while循环,知其然知其所以然

文章目录 while循环结构1.用循环打印1 ~ 100步骤解析2. 1 ~ 100的累加和3.死循环1. 用死循环的方法实现 1 ~ 100累加和 4. 单向循环(1)打印 一行十个小星星*(2)通过打印一个变量的形式,展现一行十个小星星(3)一行十个换色的星星 ★☆★☆★☆★☆★☆(4)用一个循环,打印十行十列…

Docker 一小时从入门到实战 —— Docker commands | Create your own image | vs VM ... 基本概念扫盲

Docker crash course 文章目录 Docker crash course1. What and Why of Docker?2.1 What2.2 What problem does it solve?2.2.1 before containers2.1.2 with containers 2. Docker vs Virtual Machines2.1 Difference2.2 Benefits 3. Install docker locally4. Images vs Co…

深入PyTorch——reshape方法和view方法的差异

深入PyTorch——reshape方法和view方法的差异 &#x1f335;文章目录&#x1f335; &#x1f333;引言&#x1f333;&#x1f333;reshape方法&#x1f333;&#x1f333;view方法&#x1f333;&#x1f333;总结&#x1f333;&#x1f333;结尾&#x1f333; &#x1f333;引言…