目录
一、安装须知
二、安装Spark
1、下载安装包
2、修改配置文件spark-env.sh
3、验证Spark是否安装成功
三、安装py4j
四、配置环境变量
五、基于PySpark的机器学习实战
1、将数据文件上传HDFS
2、创建代码文件
3、提交应用程序
一、安装须知
前置依赖:
JDK 1.8
Hadoop 3.1.3
安装教程可见我的另一篇文章:Linux CentOS安装Hadoop3.1.3(单机版)详细教程
本教程将对以下软件进行安装:
Spark 3.5.2
py4j
二、安装Spark
1、下载安装包
进入下载地址
下载“spark-3.5.2-bin-without-hadoop.tgz”,然后上传到linux服务器上。
或者在服务器使用以下命令下载安装包:
wget https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-without-hadoop.tgz
将安装包解压:
tar -zxf spark-3.5.2-bin-without-hadoop.tgz -C /opt
mv /opt/spark-3.5.2-bin-without-hadoop/ /opt/spark
2、修改配置文件spark-env.sh
修改配置文件spark-env.sh,命令如下:
cd /opt/spark
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
编辑spark-env.sh文件:
vim ./conf/spark-env.sh
设置SPARK_DIST_CLASSPATH环境变量并保存:
export SPARK_DIST_CLASSPATH=$(/opt/hadoop/bin/hadoop classpath)
通过执行 /opt/hadoop/bin/hadoop classpath 命令获取的 Hadoop 类路径(classpath)。
在 Hadoop 中,classpath 命令用于输出一个命令行,该命令行可以用来设置 Java 的类路径,以便 Hadoop 能够找到它需要的所有库和配置文件。这个命令行通常包含了 Hadoop 的 conf 目录、HDFS 的 data 目录以及 Hadoop 的 lib 目录中的所有 JAR 文件。
将 classpath 命令的输出设置为 SPARK_DIST_CLASSPATH 环境变量,有助于确保 Spark 能够正确地识别和访问 Hadoop 的依赖和配置。这样,Spark 就可以与 Hadoop 协同工作,利用 Hadoop 的文件系统和其他 Hadoop 相关的功能。
3、验证Spark是否安装成功
运行以下命令:
bin/run-example SparkPi
“bin/run-example SparkPi”是 Spark 的一个内置测试用例,用于演示如何使用 Spark 计算圆周率π的近似值。
输出结果为:
Pi is roughly 3.1426557132785664
使用 Spark Shell 编写代码:
bin/spark-shell
测试Scala,计算两数之和:
def add(a: Int, b: Int): Int = a + b
val result = add(5, 7)
退出Spark Shell:
:quit
三、安装py4j
py4j是一个 Python 和 Java 之间的桥接库,它允许 Python 代码调用 Java 代码,反之亦然。py4j 提供了丰富的 API,包括 Java 对象与 Python 对象的相互转换、网络通信、日志记录等。它被广泛用于在 Python 环境中集成 Java 库,尤其是在数据科学和机器学习中,因为它使得可以轻松地使用 Java 编写的库,如 Hadoop 和 Spark。
使用以下命令安装py4j:
pip install py4j
四、配置环境变量
在安装 Spark 时还需要配置一些环境变量,为了确保 Spark 能够正确地找到 Hadoop 和 Java 的相关文件,以及配置 Python 环境以支持 PySpark(Spark 的 Python API)。
①编辑~/.bashrc文件:
vim ~/.bashrc
②添加以下内容:
export PATH=$PATH:/opt/hadoop/bin:/opt/hadoop/sbin
export JAVA_HOME=/usr/java8 # 注意,这里写你的java安装目录
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
export HADOOP_HOME=/opt/hadoop
export SPARK_HOME=/opt/spark
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
export PYSPARK_PYTHON=python3
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PATH
其中,java目录可通过以下命令查看:
which java
③重新加载配置:
source ~/.bashrc
然后,不出意外的话,就可以运行PySpark了
五、基于PySpark的机器学习实战
以下内容基于我的另一篇文章:(四)PySpark3:Mlib机器学习实战-信用卡交易数据异常检测
不同的是,该篇文章使用Windows环境,从本地导入csv文件。本文在linux环境下运行,使用HDFS存储数据文件。
1、将数据文件上传HDFS
数据集“creditcard.csv”中的数据来自2013年9月由欧洲持卡人通过信用卡进行的交易。共284807行交易记录,其中数据文件中Class==1表示该条记录是欺诈行为,总共有 492 笔。输入数据中存在 28 个特征 V1,V2,……V28(通过PCA变换得到,不用知道其具体含义),以及交易时间 Time 和交易金额 Amount。
百度云链接:https://pan.baidu.com/s/1_GLiEEqIZqXVG7M1lcnewg
提取码:abcd
目标:构建一个信用卡欺诈分析的分类器。通过以往的交易数据分析出每笔交易是否正常,是否存在盗刷风险。
将creditcard.csv文件下载后,传到linux服务器上。
创建data目录,存放数据文件:
hdfs dfs -mkdir -p /data
将数据文件上传到HDFS:
hdfs dfs -put creditcard.csv /data
查看HDFS存储情况:
hdfs dfs -ls /data
2、创建代码文件
创建test.py文件:
vim test.py
将以下代码写入test.py文件:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator #初始化SparkSession
spark = SparkSession.builder.appName("CreditCardFraudDetection").getOrCreate() # 读取数据
data = spark.read.csv("hdfs://localhost:9000/data/creditcard.csv", header=True, inferSchema=True)
data = data.drop('Time', 'Amount').withColumnRenamed("Class","label")# 组装特征向量
vectorAssembler = VectorAssembler(inputCols=data.columns[:-1], outputCol="features")
data_with_vector = vectorAssembler.transform(data) # 分离标签和特征
label_column = "label"
features_column = "features"
data_with_vector = data_with_vector.select(features_column, label_column) # 划分训练集和测试集
(train_data, test_data) = data_with_vector.randomSplit([0.7, 0.3], seed=0) # 计算少数类和多数类的数量
fraud_count = train_data.filter(train_data[label_column] == 1).count()
normal_count = train_data.filter(train_data[label_column] == 0).count() # 下采样多数类以匹配少数类数量
downsampled_normal = train_data.filter(train_data[label_column] == 0).sample(False, fraud_count / normal_count) # 合并下采样后的多数类样本和原始的少数类样本
balanced_train_data = downsampled_normal.union(train_data.filter(train_data[label_column] == 1)) # 训练逻辑回归模型
lr = LogisticRegression(labelCol=label_column)
lr_model = lr.fit(balanced_train_data)
lr_predictions = lr_model.transform(test_data)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('逻辑回归AUC分数:', evaluator.evaluate(lr_predictions))# 训练决策树模型
dt = DecisionTreeClassifier(labelCol=label_column)
dt_model = dt.fit(balanced_train_data)
dt_predictions = dt_model.transform(test_data)
print('决策树AUC分数:', evaluator.evaluate(dt_predictions))# 训练随机森林模型
rf = RandomForestClassifier(labelCol=label_column)
rf_model = rf.fit(balanced_train_data)
rf_predictions = rf_model.transform(test_data)
print('随机森林AUC分数:', evaluator.evaluate(rf_predictions))spark.stop()
3、提交应用程序
spark-submit test.py
输出结果如下: