Introduction to Spark and PySpark
Setting up PySpark
Installing the PySpark library
pip install pyspark
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
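The second command installs from the Tsinghua PyPI mirror, which is usually faster from within mainland China. To confirm the installation worked, a quick check along these lines should print the installed version (the exact number will depend on your environment):
import pyspark
print(pyspark.__version__)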
Building the PySpark execution environment entry-point object
# Import the required classes
from pyspark import SparkConf, SparkContext

# Create a SparkConf object
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
"""
The line above is equivalent to:
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("test_spark_app")
"""

# Create a SparkContext object from the SparkConf object
sc = SparkContext(conf=conf)

# Print the PySpark version that is running
print(sc.version)

# Stop the SparkContext (shut down the PySpark program)
sc.stop()
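As a side note, other Spark settings can be attached to the same SparkConf object before the SparkContext is created. A minimal sketch, assuming the defaults are otherwise fine (the spark.executor.memory value below is only an illustration):
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("test_spark_app")
        .set("spark.executor.memory", "1g"))  # illustrative extra setting
sc = SparkContext(conf=conf)
print(sc.version)
sc.stop()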
The PySpark programming model
RDD objects
Converting Python containers into RDD objects
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# parallelize turns a local Python container into an RDD
rdd1 = sc.parallelize([1, 2, 3, 4, 5])        # list
rdd2 = sc.parallelize("12345")                # string: each character becomes an element
rdd3 = sc.parallelize((1, 2, 3, 4, 5))        # tuple
rdd4 = sc.parallelize({1, 2, 3, 4, 5})        # set
rdd5 = sc.parallelize({"name": 1, "age": 2})  # dict: only the keys are kept

print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
[1, 2, 3, 4, 5]
['1', '2', '3', '4', '5']
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
['name', 'age']
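parallelize also accepts an optional numSlices argument that controls how many partitions the RDD is split into; the sketch below (the partition count 3 is arbitrary) assumes the same sc as above and uses getNumPartitions() to check the result:
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=3)
print(rdd.getNumPartitions())  # 3
print(rdd.collect())           # [1, 2, 3, 4, 5]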
Reading a file into an RDD object
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# textFile makes every line of the text file one RDD element
rdd = sc.textFile("C:/Users/18757/Desktop/pythontext/bill.txt")  # file path
print(rdd.collect())
['周杰轮,2022-01-01,100000,消费,正式', '周杰轮,2022-01-02,300000,消费,正式', '周杰轮,2022-01-03,100000,消费,测试', '林俊节,2022-01-01,300000,消费,正式', '林俊节,2022-01-02,100000,消费,正式', '林俊节,2022-01-03,100000,消费,测试', '林俊节,2022-01-02,100000,消费,正式']
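textFile likewise takes an optional minPartitions argument, and the path may use a glob pattern to read several files at once; a sketch reusing the sc created above (the paths are placeholders for files on your own machine):
rdd_a = sc.textFile("C:/Users/18757/Desktop/pythontext/bill.txt", minPartitions=3)
rdd_b = sc.textFile("C:/Users/18757/Desktop/pythontext/*.txt")  # glob over the folder
print(rdd_a.getNumPartitions())
print(rdd_b.collect())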
The map operator
from pyspark import SparkConf, SparkContext
# Tell PySpark which Python interpreter to use
import os
os.environ['PYSPARK_PYTHON'] = r"D:\dev\python\python3.10.4\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5])

# Use map to multiply every element by 10, then chain a second map that adds 1
def func(data):
    return data * 10

rdd2 = rdd.map(func).map(lambda x: x + 1)
print(rdd2.collect())
[11, 21, 31, 41, 51]
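The two chained map calls can just as well be collapsed into a single lambda, and the same operator is a natural fit for parsing the bill.txt lines read earlier; a sketch reusing sc and rdd from the example above:
rdd3 = rdd.map(lambda x: x * 10 + 1)
print(rdd3.collect())  # [11, 21, 31, 41, 51]

# split each comma-separated line of bill.txt into a list of fields
bill_rdd = sc.textFile("C:/Users/18757/Desktop/pythontext/bill.txt")
print(bill_rdd.map(lambda line: line.split(",")).collect())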