基于Python语言的Spark数据处理分析——2020年美国新冠肺炎疫情数据分析
- 一、实验环境
- 二、数据集
- 1.数据集下载来源
- 2.转换文件格式
- 3.上传文件至HDFS文件系统
- 三、使用Spark进行数据分析
- 1.读取文件并生成DataFrame
- 2.采用python编程语言进行数据分析
- 3.将HDFS上结果文件保存到本地文件系统
- 四、数据可视化
- 1.可视化工具
- 2.数据可视化代码
- 3.可视化结果
- 五、总结
- 六、参考材料
一、实验环境
(1)操作系统:Ubuntu 64位14.04.6
(2)Hadoop版本:3.2.1(安装教程)
(3)Python版本:3.7.6
(4)Spark版本:2.4.7(安装教程)
(5)Jupyter Notebook(安装和使用方法教程)
二、数据集
1.数据集下载来源
本次数据分析所使用的数据集来源于数据网站Kaggle的美国新冠肺炎疫情数据集,该数据集以数据表us-counties.csv组织,包含了美国发现首例新冠肺炎确诊病例至2020年5月19日的相关数据。以下为数据包含的字段:
字段名称 | 字段含义 |
---|---|
date | 日期 |
county | 区县(州的下一级单位) |
state | 州 |
cases | 截止该日期该区县的累计确诊人数 |
deaths | 截止该日期该区县的累计死亡人数 |
2.转换文件格式
(1)将下载好的us-counties.csv文件放到/home/hadoop目录下。
(2)由于数据集是以.csv文件组织的,需要先将us-counties.csv转换为.txt格式的文件,方便spark读取生成RDD或者DataFrame。使用python实现转换,首先在终端输入命令jupyter notebook来运行Jupyter Notebook;然后在浏览器中输入http://localhost:8888打开jupyter界面。
(3)在Jupyter Notebook中新建一个.ipynb文件,命名为toTxt.ipynb,并写入以下代码。
import pandas as pd#.csv->.txt
data = pd.read_csv('/home/hadoop/us-counties.csv')
with open('/home/hadoop/us-counties.txt','a+',encoding='utf-8') as f:for line in data.values:f.write((str(line[0])+'\t'+str(line[1])+'\t'+str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))
(4)运行代码查看结果如下。
3.上传文件至HDFS文件系统
(1)将.txt文件上传到HDFS中,需确保已启动Hadoop。使用如下命令:
./sbin/start-dfs.sh
(2)将本地文件系统/home/hadoop/目录下的us-counties.txt文件上传到HDFS文件系统中。使用如下命令:
./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop
可使用ls命令查看该文件是否成功上传。
三、使用Spark进行数据分析
1.读取文件并生成DataFrame
由于使用的数据集数据为结构化数据,这里使用spark读取源文件生成DataFrame,以便后续进行分析。读取us-counties.txt生成DataFrame的代码如下:
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as funcdef toDate(inputStr):newStr = ""if len(inputStr) == 8:s1 = inputStr[0:4]s2 = inputStr[5:6]s3 = inputStr[7]newStr = s1+"-"+"0"+s2+"-"+"0"+s3else:s1 = inputStr[0:4]s2 = inputStr[5:6]s3 = inputStr[7:]newStr = s1+"-"+"0"+s2+"-"+s3date = datetime.strptime(newStr, "%Y-%m-%d")return date#主程序:
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
schema = StructType(fields)rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))shemaUsInfo = spark.createDataFrame(rdd1,schema)shemaUsInfo.createOrReplaceTempView("usInfo")
2.采用python编程语言进行数据分析
在计算过程中采用了DataFrame自带的操作函数以及spark sql。
(1)以date作为分组字段,对cases和deaths字段进行汇总统计,统计美国截止每日的累计确诊人数和累计死亡人数。
#1.计算每日的累计确诊病例数和死亡数
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())#列重命名
df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.repartition(1).write.json("result/result1.json") #写入hdfs#注册为临时表供下一步使用
df1.createOrReplaceTempView("ustotal")print('Success') # 判断程序是否已成功运行
(2)因为新增数=今日数-昨日数,所以以t1.date=t2.date+1为连接条件,使用t1.totalCases-t2.totalCases计算美国每日较昨日的新增确诊人数和新增死亡人数。
#2.计算每日较昨日的新增确诊病例数和死亡病例数
df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")df2.sort(df2["date"].asc()).repartition(1).write.json("result/result2.json") #写入hdfsprint('Success') # 判断程序是否已成功运行
(3)筛选出2020.5.19的数据,以state作为分组字段,对cases和deaths字段进行汇总统计,统计截止2020.5.19美国各州的累计确诊人数和死亡人数。
#3.统计截止5.19日 美国各州的累计确诊人数和死亡人数
df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result/result3.json") #写入hdfsdf3.createOrReplaceTempView("eachStateInfo")print('Success') # 判断程序是否已成功运行
(4)对(3)的结果DataFrame注册临时表,然后按确诊人数降序排列,取前10个州即找出美国确诊最多的10个州。
#4.找出美国确诊最多的10个州
df4 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases desc limit 10")
df4.repartition(1).write.json("result/result4.json")print('Success') # 判断程序是否已成功运行
(5)对(3)的结果DataFrame注册临时表,然后按死亡人数降序排列,取前10个州即找出美国死亡最多的10个州。
#5.找出美国死亡最多的10个州
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result/result5.json")print('Success') # 判断程序是否已成功运行
(6)对(3)的结果DataFrame注册临时表,然后按确诊人数升序排列,取前10个州即找出美国确诊最少的10个州。
#6.找出美国确诊最少的10个州
df6 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases asc limit 10")
df6.repartition(1).write.json("result/result6.json")print('Success') # 判断程序是否已成功运行
(7)对(3)的结果DataFrame注册临时表,然后按死亡人数升序排列,取前10个州即找出美国死亡最少的10个州。
#7.找出美国死亡最少的10个州
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result/result7.json")print('Success') # 判断程序是否已成功运行
(8)对(3)的结果DataFrame注册临时表,按病死率=死亡人数/确诊人数这个公式进行计算,统计截止2020.5.19全美和各州的病死率。
#8.统计截止5.19全美和各州的病死率
df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result/result8.json")print('Success') # 判断程序是否已成功运行
3.将HDFS上结果文件保存到本地文件系统
(1)在本地文件系统中的目录/home/hadoop下新建result文件夹,接着在result文件夹中新建8个子文件夹,分别命名为result1、result2、result3、……
(2)为方便后续可视化处理,Spark计算结果保存.json文件。可在终端进行查看,使用命令如下:
./bin/hdfs dfs -ls /user/hadoop/result
(3)因为python读取HDFS文件系统不方便,所以将HDFS上结果文件保存到本地文件系统中,使用命令如下:
./bin/hdfs dfs -get /user/hadoop/result/result1.json/*.json /home/hadoop/result/result1/part-00000.json
./bin/hdfs dfs -get /user/hadoop/result/result1.json/*.json /home/hadoop/result/result2/part-00000.json
以此类推,对于result3等结果文件,使用相同命令,只需要改一下路径即可。这里统一名称part-00000.json是为了方便后续数据可视化的操作。
(4)查看下载结果。
四、数据可视化
1.可视化工具
选择使用python第三方库pyecharts作为可视化工具。安装pyecharts代码如下:
pip install pyecharts
2.数据可视化代码
在Jupyter Notebook中新建一个.ipynb文件,命名为showdata.ipynb。
(1)导入第三方库。
from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json
(2)用双柱状图画出每日的累计确诊病例数和死亡数。
#1.画出每日的累计确诊病例数和死亡数——>双柱状图
def drawChart_1(index):root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"date = []cases = []deaths = []with open(root, 'r') as f:while True:line = f.readline()if not line: # 到 EOF,返回空字符串,则终止循环breakjs = json.loads(line)date.append(str(js['date']))cases.append(int(js['cases']))deaths.append(int(js['deaths']))d = (Bar().add_xaxis(date).add_yaxis("累计确诊人数", cases, stack="stack1").add_yaxis("累计死亡人数", deaths, stack="stack1").set_series_opts(label_opts=opts.LabelOpts(is_show=False)).set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数")).render("/home/hadoop/result/result1/result1.html"))print('Success') # 判断程序是否已成功运行
(3)用折线图画出每日的新增确诊病例数和死亡数。
#2.画出每日的新增确诊病例数和死亡数——>折线图
def drawChart_2(index):root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"date = []cases = []deaths = []with open(root, 'r') as f:while True:line = f.readline()if not line: # 到 EOF,返回空字符串,则终止循环breakjs = json.loads(line)date.append(str(js['date']))cases.append(int(js['caseIncrease']))deaths.append(int(js['deathIncrease']))(Line(init_opts=opts.InitOpts(width="1600px", height="800px")).add_xaxis(xaxis_data=date).add_yaxis(series_name="新增确诊",y_axis=cases,markpoint_opts=opts.MarkPointOpts(data=[opts.MarkPointItem(type_="max", name="最大值")]),markline_opts=opts.MarkLineOpts(data=[opts.MarkLineItem(type_="average", name="平均值")]),).set_global_opts(title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""),tooltip_opts=opts.TooltipOpts(trigger="axis"),toolbox_opts=opts.ToolboxOpts(is_show=True),xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),).render("/home/hadoop/result/result2/result1.html"))(Line(init_opts=opts.InitOpts(width="1600px", height="800px")).add_xaxis(xaxis_data=date).add_yaxis(series_name="新增死亡",y_axis=deaths,markpoint_opts=opts.MarkPointOpts(data=[opts.MarkPointItem(type_="max", name="最大值")]),markline_opts=opts.MarkLineOpts(data=[opts.MarkLineItem(type_="average", name="平均值"),opts.MarkLineItem(symbol="none", x="90%", y="max"),opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),]),).set_global_opts(title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""),tooltip_opts=opts.TooltipOpts(trigger="axis"),toolbox_opts=opts.ToolboxOpts(is_show=True),xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),).render("/home/hadoop/result/result2/result2.html"))print('Success') # 判断程序是否已成功运行
(4)用表格画出截止2020.5.19,美国各州累计确诊、死亡人数和病死率。
#3.画出截止5.19,美国各州累计确诊、死亡人数和病死率--->表格
def drawChart_3(index):root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"allState = []with open(root, 'r') as f:while True:line = f.readline()if not line: # 到 EOF,返回空字符串,则终止循环breakjs = json.loads(line)row = []row.append(str(js['state']))row.append(int(js['totalCases']))row.append(int(js['totalDeaths']))row.append(float(js['deathRate']))allState.append(row)table = Table()headers = ["State name", "Total cases", "Total deaths", "Death rate"]rows = allStatetable.add(headers, rows)table.set_global_opts(title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle=""))table.render("/home/hadoop/result/result3/result1.html")print('Success') # 判断程序是否已成功运行
(5)用词云图画出美国确诊最多的10个州。
#4.画出美国确诊最多的10个州——>词云图
def drawChart_4(index):root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"data = []with open(root, 'r') as f:while True:line = f.readline()if not line: # 到 EOF,返回空字符串,则终止循环breakjs = json.loads(line)row=(str(js['state']),int(js['totalCases']))data.append(row)c = (WordCloud().add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND).set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10")).render("/home/hadoop/result/result4/result1.html"))print('Success') # 判断程序是否已成功运行
(6)用象柱状图画出美国死亡最多的10个州。
#5.画出美国死亡最多的10个州——>象柱状图
def drawChart_5(index):root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"state = []totalDeath = []with open(root, 'r') as f:while True:line = f.readline()if not line: # 到 EOF,返回空字符串,则终止循环breakjs = json.loads(line)state.insert(0,str(js['state']))totalDeath.insert(0,int(js['totalDeaths']))c = (PictorialBar().add_xaxis(state).add_yaxis("",totalDeath,label_opts=opts.LabelOpts(is_show=False),symbol_size=18,symbol_repeat="fixed",symbol_offset=[0, 0],is_symbol_clip=True,symbol=SymbolType.ROUND_RECT,).reversal_axis().set_global_opts(title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"),xaxis_opts=opts.AxisOpts(is_show=False),yaxis_opts=opts.AxisOpts(axistick_opts=opts.AxisTickOpts(is_show=False),axisline_opts=opts.AxisLineOpts(linestyle_opts=opts.LineStyleOpts(opacity=0)),),).render("/home/hadoop/result/result5/result1.html"))print('Success') # 判断程序是否已成功运行
(7)用词云图画出美国确诊最少的10个州。
#6.找出美国确诊最少的10个州——>词云图
def drawChart_6(index):root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"data = []with open(root, 'r') as f:while True:line = f.readline()if not line: # 到 EOF,返回空字符串,则终止循环breakjs = json.loads(line)row=(str(js['state']),int(js['totalCases']))data.append(row)c = (WordCloud().add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND).set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊最少的10个州")).render("/home/hadoop/result/result6/result1.html"))print('Success') # 判断程序是否已成功运行
(8)用漏斗图画出美国死亡最少的10个州。
#7.找出美国死亡最少的10个州——>漏斗图
def drawChart_7(index):root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"data = []with open(root, 'r') as f:while True:line = f.readline()if not line: # 到 EOF,返回空字符串,则终止循环breakjs = json.loads(line)data.insert(0,[str(js['state']),int(js['totalDeaths'])])c = (Funnel().add("State",data,sort_="ascending",label_opts=opts.LabelOpts(position="inside"),).set_global_opts(title_opts=opts.TitleOpts(title="")).render("/home/hadoop/result/result7/result1.html"))print('Success') # 判断程序是否已成功运行
(9)用饼状图画出美国的病死率。
#8.美国的病死率--->饼状图
def drawChart_8(index):root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"values = []with open(root, 'r') as f:while True:line = f.readline()if not line: # 到 EOF,返回空字符串,则终止循环breakjs = json.loads(line)if str(js['state'])=="USA":values.append(["Death(%)",round(float(js['deathRate'])*100,2)])values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])c = (Pie().add("", values).set_colors(["blcak","orange"]).set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率")).set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}")).render("/home/hadoop/result/result8/result1.html"))print('Success') # 判断程序是否已成功运行
(10)可视化主程序。
#可视化主程序:
index = 1
while index<9:funcStr = "drawChart_" + str(index)eval(funcStr)(index)index+=1print('Success') # 判断程序是否已成功运行
3.可视化结果
可视化结果是.html格式的,reslut1的结果展示图保存路径为“/home/hadoop/result/result1/result1.html”,reslut2的结果展示图保存路径为“/home/hadoop/result/result2/result1.html”,以此类推。
(1)美国每日的累计确诊病例数和死亡数。
(2)美国每日的新增确诊病例数和死亡数。
(3)截止2020.5.19,美国各州累计确诊、死亡人数和病死率。
(4)截止2020.5.19,美国累计确诊最多的10个州。
(5)截止2020.5.19,美国累计死亡最多的10个州。
(6)截止2020.5.19,美国累计确诊最少的10个州。
(7)截止2020.5.19,美国累计死亡最少的10个州。
(8)截止2020.5.19,美国的病死率。
五、总结
本文仅仅简单介绍了基于python语言的Spark数据处理分析的使用。首先搭建实验环境;其次获取数据集并转换为.txt格式以方便spark读取生成RDD或DataFrame;然后在启动Hadoop的前提下,将.txt文件上传到HDFS文件系统中;接着使用Spark进行数据分析,在此过程中,采用了python编程语言、DataFrame自带的操作函数以及spark sql,并将结果文件下载到本地文件系统中;最后,使用python第三方库pyecharts作为可视化工具,将数据可视化。在整个实验操作过程中,我遇到了两个问题,其一,在安装jupyter notebook时,基于anaconda3的python版本太高,导致jupyter notebook和spark交互出现问题,这里建议在Anaconda清华大学镜像下载anaconda3,在Anaconda官网下载的anaconda3中python3版本过高,不支持jupyter notebook和spark交互;其二,将HDFS上结果文件下载到本地文件系统中时,没有统一.json名称,在后面数据可视化时,代码中统一用part-00000.json,导致.json名称对应不上,运行结果出错,重新下载结果文件并统一.json名称就能解决问题了。
六、参考材料
[1] http://dblab.xmu.edu.cn/blog/290-2/
[2] http://dblab.xmu.edu.cn/blog/2575-2/
[3] http://dblab.xmu.edu.cn/blog/2636-2/