使用Apache Spark进行预测性数据分析--数据准备篇


本文是Boutros El-Gamil的使用Apache Spark进行预测性数据分析系列文章的第二篇,http://www.data-automaton.com/2019/01/04/predictive-data-analytics-with-apache-spark-part-2-data-preparation/

  • 第一篇详见使用Apache Spark进行预测性数据分析--简介篇

  • 关于Windows 下PySpark的安装教程可以查看Windows 下pyspark的最简化安装

1.下载数据并创建项目目录

我们首先从https://ti.arc.nasa.gov/tech/dash/groups/pcoe/prognostic-data-repository/下载Turbofan Engine Degradation Simulation Data Set。解压缩.zip文件,并将展开的文件夹(/CMAPSSData)添加到您的项目主目录。

数据文件夹由12个.txt文件组成,它们代表测试数据集的四个独立部分,以及它们对应的测试数据集和真实数据。本教程分析数据的第一部分。但是您可以自由添加附加数据部分,并使用相同的代码对其进行测试。

本教程中的代码可以在Python 3.7Spark 2.4.4版本下执行(原文基于Python2.7和Spark2.3.1,需要Python2.7版本代码的可以直接看原文提供的代码)。

2.清理主目录

在启动Spark编码之前,我们需要从自动生成的组件中清除Spark主目录,这些组件可能在先前的Spark执行过程中添加。从Spark 2.1.x开始,由于Spark-defaults.conf.template文件中的默认设置,Spark在代码的主目录中自动生成metastore_db目录和derby.log文件。这些自动生成的组件中的数据可能会中断当前Spark会话的执行。因此,我们需要同时删除metastore_db目录和derby.log文件,以使Spark会话能够正常启动。以下功能可以完成这项工作。

def clean_project_directory():    '''    This function deletes both '/metastore_db' folder and 'derby.log' file before     initializing Apache Spark session.    The goal is to avoid any inconsistant startout of Spark session    '''
print (os.getcwd())
# delete metastore_db folder if found if os.path.isdir(os.getcwd() + '/metastore_db'): shutil.rmtree(os.getcwd() + '/metastore_db', ignore_errors=True)
# delete derby.log file if found if os.path.exists(os.getcwd() + '/derby.log'): os.remove(os.getcwd() + '/derby.log')
# run functionclean_project_directory()

3.启动Spark会话

清理Spark主目录后,我们创建一个新的Spark会话。pyspark.sql模块包含SparkSession()  函数,使我们能够创建新的Spark会话。使用此功能,我们可以设置一些不错的属性,例如会话的主URL,Spark应用程序名称以及为每个执行程序进程保留的最大内存量。在下面的函数中,我们创建一个Spark会话,该会话在本地计算机上运行,最大保留内存为1 GB。

def create_spark_session(app_name, exe_memory):    '''    This function creates Spark session with application name and available memory to the session        INPUTS:    @app_name: name of Spark application    @exe_memory: value of reserved memory for Spark session        OUTPUTS:    @SparkSession instance    '''        # create Spark Session    return SparkSession.builder \       .master("local") \       .appName(app_name) \       .config("spark.executor.memory", exe_memory) \       .getOrCreate()
# create Spark session spark = create_spark_session('Predictive Maintenance', '1gb')
# set Spark Context objectsc = spark.sparkContext
# print configurations of current Spark sessionsc.getConf().getAll()

4.导入.CSV数据文件到Spark会话

在此步骤中,我们将.CSV格式的训练和测试数据文件作为RDD对象导入到Spark会话中。RDD(即弹性分布式数据集)是Apache Spark的核心数据格式,可以通过所有Spark支持的语言(Scala,Python,Java和SQL)调用。与Hadoop分布式文件系统(HDFS)相比,RDD在速度上有着明显优势。

将CSV文件导入为RDD对象的最简单方法是使用sparkContext模块。我们使用union()函数将所有CSV文件附加到一个RDD对象。然后我们使用map()函数将数据字段分为RDD对象。

def read_csv_as_rdd(sc, path, files_list, sep):    '''    This function reads .CSV data files into RDD object and returns the RDD object        INPUTS:    @sc: Spark Context object    @path: path of .CSV files    @files_list: list of :CVS data files    @sep: fields separator        OUTPUTS:    @rdd: RDD object contains data observations    '''        # Read files and append them to RDD    rdd = sc.union([sc.textFile(path + f) for f in files_list])
# Split lines on spaces rdd = rdd.map(lambda line: line.split(sep)) # return RDD object return rdd
# get files of training datatrain_files = sorted([filename for filename in os.listdir(path) if (filename.startswith('train') and filename.endswith('.txt'))])
# get files of test datatest_files = sorted([filename for filename in os.listdir(path) if (filename.startswith('test') and filename.endswith('.txt'))])
# read training data in RDDtrain_rdd = read_csv_as_rdd(sc, path, [train_files[0]], " ")
# read test data in RDDtest_rdd = read_csv_as_rdd(sc, path, [test_files[0]], " ")
# print num. of observations of training dataprint ("number of observations in train data: ", train_rdd.count())
# print num. of observations of test dataprint ("number of observations in test data: ", test_rdd.count())

5.将RDD转换为Spark Dataframe

PySpark的基本优点是能够将RDD对象转换为Dataframes。对于那些熟悉R或Python Dataframes的读者,使用Spark Dataframes使Spark编码容易得多。与R和Python Dataframes相似,Spark Dataframes也是将数据对象组成的组,这些数据对象被组织到命名字段(即)中。为了将RDD对象转换为Spark Dataframe,您所需要做的就是定义要分配给数据的列名列表。函数toDF()将为您完成其余工作。

def convert_rdd_to_df(rdd, header):    '''    This function converts data from RDD format to Spark Dataframe, and adds header to the dataframe        INPUTS:    @rdd: RDD object contains data features    @header: list of column names          OUTPUTS:    PySpark DF version of @rdd    '''            # convert RDD to DF with header    return rdd.toDF(header)
# Set data header, contains list of names of data columnsheader = ["id", "cycle", "setting1", "setting2", "setting3", "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10", "s11", "s12", "s13", "s14", "s15", "s16","s17", "s18", "s19", "s20", "s21"]
# get df of both train and test data out of corresponded RDD objectstrain_df = convert_rdd_to_df(train_rdd, header)test_df = convert_rdd_to_df(test_rdd, header)
# drop empty columns from DFtrain_df= train_df.drop("_27").drop("_28")test_df= test_df.drop("_27").drop("_28")
# check the dimensions of the dataprint (train_df.count(), len(train_df.columns))print (test_df.count(), len(test_df.columns))

6.删除NULL值

在将数据集作为Spark Dataframes获取之后,我们想要删除NULL观察值(即没有值的数据观察值)。为此,我们所需要做的就是将na.drop()应用于我们的数据帧。

def remove_na_rows(df):    '''    This function removes rows with empty values from Spark Dataframe        INPUTS:    @df: Spark Dataframe with possible NULL values        OUTPUTS:    @df: Spark Dataframe without NULL values    '''        return df.na.drop()
# check the dimensions of the dataprint ("Before removing NULL rows:")print (train_df.count(), len(train_df.columns))print (test_df.count(), len(test_df.columns))
# remove empty rowstrain_df = remove_na_rows(train_df)test_df = remove_na_rows(test_df)
print ("After removing NULL rows:")# check the dimensions of the dataprint (train_df.count(), len(train_df.columns))print (test_df.count(), len(test_df.columns))

7.设置数据类型

下一步是为数据框中的每一列分配一种数据类型。为此,我们将各列转换为适当的数据类型(例如Integer,Double,String等)。下面的代码将列列表转换为Integer数据类型。

from pyspark.sql.types import IntegerTypeif len(int_list) > 0:   for f in int_list:      df = df.withColumn(f, df[f].cast(IntegerType()))

8.可视化数据

Apache Spark没有用于数据可视化的原生模块。因此,为了可视化我们的数据,我们需要将Spark Dataframes转换为另一种格式。我们可以在PySpark中应用的直接方法是将PySpark Dataframes转换为Pandas Dataframes。为了以有效的方式进行这种转换,我们选择要可视化的数据特征集,以Pandas DF的方式获取之。以下函数使用SQL查询获取Pandas DF。

def get_pandasdf_from_sparkdf(spark_df, view_name, query):    '''    This function queries Spark DF, and returns the result table as Pandas DF        INPUTS:    @spark_df: Spark Dataframe to be queried    @view_name: name of SQL view to be created from Spark Dataframe    @query: SQL query to be run on @spark_df         OUTPUTS:    SQL view of @query in Pandas format    '''        spark_df.createOrReplaceTempView(view_name)    return spark.sql(query).toPandas()

获得Pandas版本的数据后,我们可以使用matplotlib库轻松地对其进行可视化。

# set SQL query for dataframe dfsqlQuery = """    SELECT cycle, setting1, setting2, setting3,                    s1, s2, s3, s4, s5, s6, s7, s8,                    s9, s10, s11, s12, s13, s14, s15,                     s16,s17, s18, s19, s20, s21    FROM df1     WHERE df1.id=15    """
# get SQL query result as Pandas DFplotdata1 = get_pandasdf_from_sparkdf(train_df, "df1", sqlQuery)

下图显示了训练数据集中15号发动机的特征随时间的变化。

正如我们从上面的图表中看到的那样,测试数据具有随时间没有变化或变化很小的特征。这种类型的特征在构建预测数据模型中几乎没有用。因此,我们将通过删除低方差特征来开始本教程的下一篇文章。

9.完整代码

可以在我的Github中https://github.com/boutrosrg/Predictive-Maintenance-In-PySpark中找到本教程的代码。(该代码经测试,仅仅print函数需要加括号,其它代码不需要修改,能顺利运行通过)

P.S.

其实使用spark.read_csv可以更方便快速地获得Spark Dataframes,有兴趣的可以自己尝试下,如果想更多了解Spark Dataframe,可以参考PySpark 之Spark DataFrame入门

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/66955.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

王者服务器维护段位掉了,王者荣耀s13赛季段位掉段规则 s13段位重置掉段继承表[图]...

王者荣耀s13赛季段位会掉多少,新赛季开始的时候会重置到什么阶段呢?或许有的玩家朋友很好奇吧,下面是友情MT为大家带来的王者荣耀s13赛季段位掉段规则,s13段位重置掉段继承表,希望能帮助到大家! 王者荣耀s1…

BRAUN A5S13B90-06速度传感器

向地球赎罪 美国加州有一个创业奇人名叫马修。 向地球赎罪少年时。他的梦想是“做爱迪生那样为世界带来光明的英雄”。 大学毕业后,他创办起圣诞灯制造厂,将美国的圣诞灯全面翻新,闪耀了整个美国,又将业务扩充到多种显示器&am…

倍福PLC标签导入威纶通触摸屏

使用到的软件TwinCAT2、威纶通EBpro V6.02.01.274 步骤1:编译PLC程序,生成后缀为".tpy"的文件。记住PLC程序路径 步骤2:打开威纶通EBpro,这里用到EBpro HMI版本是V6.02.01.274,和其他更高的版本大同小异。 …

威纶通触摸屏报错问题汇总

1、端口号被占用-在线模拟提示端口号8000被占 一旦出现这样提示以后,不管是离线模拟还是在线模拟都无法顺利往下执行,那么,怎么来解决这个问题呢?解决这个问题,通常分以下三类情况: 情况一 常规的解决方法…

威纶通触摸屏模板,直接打开就可以用 威纶通触摸屏,全部图库

威纶通触摸屏模板,直接打开就可以用,可根据自己要求修改, 威纶通触摸屏,全部图库。 ID:1219657830585655工控自动化技师

【告别传统】人工智能时代下,学习网安的成本有多低?

自我介绍⛵ 📣我是秋说,研究人工智能、大数据等前沿技术,传递Java、Python等语言知识。 🙉主页链接:秋说的博客 📆 学习专栏推荐: 人工智能:创新无限🤖 MySQL进阶之路&am…

理解unsafe-assume-no-moving-gc包

1. 背景 在之前的《Go与神经网络:张量计算》[1]一文中,不知道大家是否发现了,所有例子代码执行时,前面都加了一个环境变量ASSUME_NO_MOVING_GC_UNSAFE_RISK_IT_WITH,就像下面这样: $ASSUME_NO_MOVING_GC_UN…

探秘火山之巅:我在字节跳动的三年奇妙旅程

❤️点击上方,选择星标或置顶,每天给你送上干货❤️ 作者 | godweiyang 出品 | 公众号:算法码上来(ID:GodNLP) - BEGIN - 本文由ChatGPT润色,朱自清文笔 似乎已有半载的光阴,我未曾触…

OpenAI再获100亿美元?DoNotPay力砸100万仅为AI律师辩护复述;新冠四种亚型被机器学习算法进行归纳...

来源:AI科技大本营 本周AI界又有哪些新鲜事? 基础核心 超聚变服务器操作系统新版FusionOS 23发布 1月12日,“超聚变2023新品发布会”在北京举行。公司算力服务领域总裁郝峰会议上做了《桥接南北,融合生长,加速算力释放…

iPhone出黄色了/ 马斯克嘲笑推特前员工/ 日本火箭空中自毁…今日更多新鲜事在此...

日报君 发自 凹非寺量子位 | 公众号 QbitAI 大家好,今天是3月8日,国际妇女节。 祝女性读者们节日快乐。 今日科技圈还有哪些新鲜事儿,下滑走起~ 特斯拉下一代小型汽车价成本仅为Model 3一半 马斯克最新透露称,特斯拉正…

知网搜索论文:如何在知网上查找期刊论文

目录 一、先进入学术期刊库,然后再进行高级搜索 二、直接进行高级搜索 一、先进入学术期刊库,然后再进行高级搜索 1)进入中国知网官方主页之后,点击搜索框下方的选项功能键【学术期刊】。 2)之后页面就会跳转到期刊…

可以从知网领钱了,你知道吗?

说到知网,大家一定都不陌生。 中国知网给大家发钱了,是怎么回事儿呢? 原来是知网为了响应国家关于知识产权保护的相关政策推出了这个活动和政策。 这次活动不仅惠及今年的应届毕业生,还惠及往届毕业生,历年的研究生…

知网查重提交论文显示服务器错误,知网查重怎么会提交失败

在毕业之即,毕业生在完成论文初稿后,便要对论文进行查重。高校认证的一般都是知网查重,但在知网查重中会有许多问题出现,比如在上交论文的时候却提交失败。这是为什么呢?今天就让我们一起来聊聊知网查重怎么会提交失败…

知网获取论文参考文献

知网获取论文参考文献 进入知网搜索相应材料普通检索高级检索 选择相应的文献点击右上角左边双引号“凑”参考文献 进入知网 中国知网官方网址:https://www.cnki.net/ 搜索相应材料 搜素一般可分为普通检索和高级检索。 一般而言,普通检索即可完成我…

知网导入EndNote

首先进入知网,搜索你想要找的期刊论文。 选择EndNote 点击导出 浏览器自动下载以txt为后缀的文件 导入到EndNote中

毕业论文中计算机代码重复吗,知网查重程序代码算重复吗?

知网查重程序代码也是有源代码对比库的,因此程序代码也会被系统检测出来的。对于程序代码查重率过高的情况,我们需要通过别的方式来降低查重率,比如说通过截图。但为了保证论文字数与质量,建议大家多换个思路写代码。那么&#xf…

计算机毕业论文截图,知网查重代码截图能过吗?

知网查重代码截图在之前可能还适用,但是最新的检测系统中已经变得不可行了,最新版本中新增了源代码库,以及OCR图片识别功能,由此可见代码截图也是有可能被查到。换句话说,代码截图也不是百分百就能过的!那么…

计算机论文查重修改吗,程序符号换了知网查重能过吗

程序符号换了知网查重未必能过的,对于程序代码来说知网也是检测的。对于最新版本的检测系统来说,它对比的数据库中还新增了源代码数据库。所以说,程序代码一样容易出现非常高的重复率,与其它论文一样也是需要降重修改的。那么&…

【ChatGPT】这是一篇ChatGPT写的关于Python的文章

文章目录 Python基础语法教学1、变量2、数据类型3、运算符4、条件语句5、循环语句 更高级的概念1、函数2、模块3、面向对象编程 ChatGPT的记录 Python基础语法教学 Python是一种高级编程语言,它被广泛应用于计算机科学领域、数据分析和人工智能等各种领域。在学习P…

妙手ERP更新:Ozon支持批量编辑SKU列表的“补充属性”、速卖通支持批量自动生成1:1和3:4场景图、Lazada组包预估重量自动累加等

为了给卖家朋友带来更好的使用体验,更高效地运营跨境店铺,妙手ERP在上周优化了以下多项功能。 1、产品模块优化 全平台 - 采集箱、在线产品编辑支持使用ChatGPT智能生成产品标题、描述 Ozon - 支持批量编辑SKU列表的“补充属性” 速卖通 - 支持批…