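Requirements:
❖ Task: implement each of the analyses below twice, once with Spark RDDs and once with Spark SQL, use the Spark web UI to compare their performance, and give a reasoned explanation of the results.
1. Count the users, and count them by gender and by occupation.
2. Show the age distribution (ages grouped into 7 segments).
3. Show the occupation distribution (number of users per occupation).
4. Compute the highest rating, lowest rating, average rating, median rating, average number of ratings per user, and average number of ratings per movie.
5. Show the rating distribution.
6. Count the ratings made by each user.
7. Show the distribution of movies by genre.
8. Count the movies released in each year.
9. For each movie, count how many users rated it and compute its total and average rating.
10. For each user, count their ratings and compute their total and average rating.
11. Find the 10 most-rated movies and give their rating counts (movie title, rating count).
12. For men and for women separately, find the 10 highest-rated movies (gender, movie title, rating).
13. For men and for women separately, find the 10 most-watched movies (gender, movie title).
14. Find the 10 favorite movies of men in the "18-24" age group.
15. For the movie with movieid = 2116, compute the average rating in each age group (7 groups) (age group, rating).
16. For the woman who watches the most movies (the most ratings), take the 10 movies she rated highest and give their average ratings (viewer, movie title, rating).
17. Find the year with the most good movies (rating >= 4.0) and give that year's 10 best movies.
18. Find the 10 highest-rated comedies among movies released in 1997.
19. For each genre in the rating database, find the 5 highest-rated movies (genre, movie title, average rating).
20. Find the highest-rated genre of each year (year, genre, rating).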
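Requirement 2 (and requirement 15) group users into 7 age segments. Below is a minimal plain-Scala sketch of that grouping. It assumes users.dat follows the MovieLens 1M layout `UserID::Gender::Age::Occupation::Zip-code`, in which the Age field is already one of 7 codes. The code-to-label table comes from the MovieLens 1M README and is an assumption about this data set. The object name `AgeDistributionSketch` and the sample lines are hypothetical.

```scala
// Plain-Scala sketch of requirement 2 (age distribution in 7 segments).
// Assumption: users.dat uses the MovieLens 1M layout UserID::Gender::Age::Occupation::Zip-code
// and the Age field holds one of 7 codes (1, 18, 25, 35, 45, 50, 56).
object AgeDistributionSketch {
  // Labels for the 7 age codes (assumed from the MovieLens 1M README)
  val ageLabels: Map[Int, String] = Map(
    1 -> "Under 18", 18 -> "18-24", 25 -> "25-34", 35 -> "35-44",
    45 -> "45-49", 50 -> "50-55", 56 -> "56+")

  // Counts users per age code, sorts by code, and maps each code to its label
  def ageDistribution(lines: Seq[String]): Seq[(String, Int)] =
    lines
      .filter(_.nonEmpty)
      .map(line => line.split("::")(2).toInt) // third field = age code
      .groupBy(identity)
      .toSeq
      .sortBy(_._1)
      .map { case (code, members) => (ageLabels.getOrElse(code, s"unknown($code)"), members.size) }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample lines, not taken from the real data set
    val sample = Seq("1::F::1::10::00001", "2::M::56::16::00002", "3::M::25::15::00003")
    ageDistribution(sample).foreach { case (label, n) => println(s"$label: $n") }
  }
}
```

On an RDD, the same grouping would map each line to `(ageCode, 1)` and then call `reduceByKey(_ + _)`. In Spark SQL it would be a `GROUP BY` on the age column.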
Build the Maven project
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.dataAnalysis</groupId>
    <artifactId>SparkRddAndSparkSQL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>MovieDataAnalysisBySparkRDD</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>Run</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- MySQL connection settings -->
    <!-- JDBC connection URL -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false</value>
    </property>
    <!-- JDBC driver -->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>
    <!-- JDBC user name -->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>
    <!-- JDBC password -->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>
    <!-- Default Hive working directory on HDFS -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
    <!-- Port for HiveServer2 connections -->
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>
    <!-- Host for HiveServer2 connections -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop102</value>
    </property>
    <!-- Metastore event notification API authorization -->
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>
    <!-- Hive metastore schema version verification -->
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>
    <!-- HiveServer2 high-availability parameter; enabling it can speed up HiveServer2 startup -->
    <property>
        <name>hive.server2.active.passive.ha.enable</name>
        <value>true</value>
    </property>
    <!-- HiveServer2 high availability (service discovery through ZooKeeper) -->
    <property>
        <name>hive.server2.support.dynamic.service.discovery</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.server2.zookeeper.namespace</name>
        <value>hiveserver2_zk</value>
    </property>
    <property>
        <name>hive.zookeeper.quorum</name>
        <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
    </property>
    <property>
        <name>hive.zookeeper.client.port</name>
        <value>2181</value>
    </property>
    <!-- Metastore high availability -->
    <!-- Addresses of the metastore services that store the metadata -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop102:9083,thrift://hadoop104:9083</value>
    </property>
    <!-- Location of the Spark dependency jars. This uses the HDFS HA nameservice yang-HA;
         if a host:port URI is used instead, the port (e.g. 8020) must match the NameNode RPC port -->
    <property>
        <name>spark.yarn.jars</name>
        <value>hdfs://yang-HA/spark-jars/*</value>
    </property>
    <!-- Hive execution engine -->
    <property>
        <name>hive.execution.engine</name>
        <value>spark</value>
    </property>
    <!-- Connection timeout between Hive and Spark -->
    <property>
        <name>hive.spark.client.connect.timeout</name>
        <value>10000ms</value>
    </property>
</configuration>
run.scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
 * @description:
 * @author: 宇文智
 * @date 2022/5/18 10:22
 * @version 1.0
 */
object MovieDataAnalysisBySparkRDD {

  // Spark RDD version: count users, genders and occupations
  def demand_1_by_sparkRdd(spark: SparkSession): Unit = {
    val sc = spark.sparkContext
    // users.dat format: UserID::Gender::Age::Occupation::Zip-code
    // local[*] (the default master of spark-shell) uses one worker thread per CPU core,
    // e.g. 8 threads on an 8-core CPU. This job itself is submitted with master = yarn.
    val users: RDD[String] = sc.textFile("hdfs://yang-HA/movie/users.dat")
    println("-------- lineage (dependencies) of the users RDD --------")
    println(users.toDebugString)
    // users is reused by several jobs, so cache it.
    // users.cache()
    // persist() lets us choose the storage level; cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
    users.persist(StorageLevel.MEMORY_AND_DISK_2)
    // Checkpoint directory: if a failure happens after the checkpoint, the data need not be
    // recomputed from the beginning. Checkpointing runs a separate job that recomputes the RDD,
    // so cache an RDD before checkpointing it; the checkpoint job then reads from the cache.
    sc.setCheckpointDir("hdfs://yang-HA/spark_checkpoint")
    val quotas: ListBuffer[MovieQuota] = ListBuffer()

    sc.setJobGroup("008", "RDD: count total users")
    val cnt: Long = users.filter(!_.isEmpty).count() // count() is an action and triggers a job => job_00
    println("Total users: " + cnt)
    quotas.append(MovieQuota("001", "Total users", cnt.toString))

    println("--------- built-in accumulator: count total users ---------")
    sc.setJobGroup("007", "Built-in accumulator: count total users")
    val sum: LongAccumulator = sc.longAccumulator("sum")
    users.foreach(_ => sum.add(1))
    println("Total users: " + sum.value)

    sc.setJobGroup("2", "RDD operators: count men and women")
    val gender: RDD[(String, Int)] = users.map(line => {
      val lineArr: Array[String] = line.split("::")
      (lineArr(1), 1)
    }).reduceByKey(_ + _) // pre-aggregates on the map side; its shuffle output is reused by later jobs
    val genderCount: Array[String] = gender.map(x => {
      if (x._1.equals("M")) {
        "Male count: " + x._2
      } else {
        "Female count: " + x._2
      }
    }).collect() // collect() is the action that triggers this job => job_02
    quotas.append(MovieQuota("002", "Gender counts", genderCount.mkString(",")))

    sc.setJobGroup("3", "Custom accumulator: count men and women")
    val accumulator = new MyAccumulator()
    sc.register(accumulator)
    // Update accumulators inside an action (foreach): Spark applies each task's update exactly once
    // only for actions; inside transformations such as map, retried tasks may double-count.
    users.foreach(data => {
      val arr: Array[String] = data.split("::")
      accumulator.add(arr(1))
    }) // job_03
    println(accumulator.value)

    println("--------- custom accumulator: count by occupation ---------")
    sc.setJobGroup("4", "Custom accumulator: count by occupation")
    val professionAcc = new MyAccumulator()
    sc.register(professionAcc)
    users.foreach(line => {
      professionAcc.add(line.split("::")(3))
    }) // job_04
    println(professionAcc.value)

    sc.setJobGroup("b", "checkpoint: fault tolerance, a separate job recomputes the data and stores it on HDFS")
    val professionCount: RDD[(String, Int)] = users.map(line => {
      val arr: Array[String] = line.split("::")
      (arr(3), 1)
    }).reduceByKey(_ + _)
    professionCount.cache() // cache in memory only
    // checkpoint() is lazy: it only marks the RDD, and the checkpoint job (job_05)
    // runs right after the first action on this RDD (the collect() below)
    professionCount.checkpoint()

    sc.setJobGroup("a", "RDD operators: count by occupation")
    val profession: RDD[String] = sc.textFile("hdfs://yang-HA/movie/profession.dat")
    val professionRelation: RDD[(String, String)] = profession.map(line => {
      val arr: Array[String] = line.split(":")
      (arr(0), arr(1))
    })
    val quotas1: Array[MovieQuota] = professionCount.join(professionRelation).map(line => {
      MovieQuota("003", "Occupation counts", line._2._2.trim + ": " + line._2._1)
    }).collect() // job_06
    quotas.appendAll(quotas1)
    loadDataToHiveLocation(quotas, spark)
  }

  // Spark SQL version (currently only the occupation counts are computed and written)
  def demand_1_by_sparkSql(spark: SparkSession): Unit = {
    spark.sparkContext.setJobGroup("sparksql", "sparksql")
    val ds: Dataset[String] = spark.read.textFile("hdfs://yang-HA/movie/users.dat")
    import spark.implicits._
    val userDS: Dataset[user] = ds.map(line => {
      val lineArr: Array[String] = line.split("::")
      user(lineArr(0), lineArr(1), lineArr(2), lineArr(3), lineArr(4))
    })
    val professionDS: Dataset[profession] = spark.read.textFile("hdfs://yang-HA/movie/profession.dat").map(line => {
      val lineArr: Array[String] = line.split(":")
      profession(lineArr(0), lineArr(1).trim)
    })
    userDS.groupBy("professionId").count()
      .join(professionDS, List("professionId"), "left")
      .orderBy("professionId")
      .createOrReplaceTempView("tmp1")
    userDS.createOrReplaceTempView("user")
    spark.sql(
      """
        |set hive.exec.dynamic.partition.mode=nonstrict
        |""".stripMargin)
    spark.sql(
      """
        |insert into table spark_data_analysis_quota.movie_quota partition(dt)
        |select '004', 'Occupation counts (Spark SQL)', concat_ws(':', trim(professionName), count), current_date() dt from tmp1
        |""".stripMargin)
  }

  // Highest, lowest, average and median rating; average ratings per user and per movie
  def demand_2_by_sparkRdd(spark: SparkSession): Unit = {
    val sc = spark.sparkContext
    // Average ratings per user = total number of ratings / number of distinct users
    // Highest rating = maximum of each movie's ratings; the lowest rating likewise
    val rating: RDD[String] = sc.textFile("hdfs://yang-HA/movie/ratings.dat")
    val quotas = new ListBuffer[MovieQuota]
    /*
    var rank = 0
    var beforeVal = -1.0
    rating.map(line => {
      val arr: Array[String] = line.split("::")
      (arr(1), arr(2).toDouble)
    }).groupByKey().map {
      case (k, v) =>
        val sum1: Double = v.sum
        (k, sum1 / v.size)
    }.collect().sortWith((kv1, kv2) => kv1._2 > kv2._2).map(kv => {
      if (kv._2 != beforeVal) {
        beforeVal = kv._2
        rank += 1
      }
      (kv, rank)
    }).filter(_._2 == 1).foreach(println)
    */
    import spark.implicits._
    // userId, movieID, rating, timestamp
    val ratingTuples: RDD[(String, String, String, String)] = rating.map(line => {
      val arr: Array[String] = line.split("::")
      (arr(0), arr(1), arr(2), arr(3))
    })
    ratingTuples.cache()
    val ratingCnt: Long = ratingTuples.count()
    val numberPeoples: Long = ratingTuples.map(_._1).distinct(8).count()
    val movieCnt: Long = ratingTuples.map(_._2).distinct(8).count()
    // Convert to Double before dividing: Long / Long would drop the fractional part
    quotas.append(MovieQuota("005", "Average ratings per user, average ratings per movie",
      f"${ratingCnt.toDouble / numberPeoples}%.3f,${ratingCnt.toDouble / movieCnt}%.3f"))
    ratingTuples.map(line => (line._2, line._3.toDouble)).groupByKey().map(kv => {
      // Highest, lowest, average and median rating of one movie
      val sorted: List[Double] = kv._2.toList.sorted
      val n = sorted.size
      // Odd count: the middle value; even count: the mean of the two middle values
      val medianVal: Double =
        if (n % 2 == 1) sorted(n / 2)
        else (sorted(n / 2 - 1) + sorted(n / 2)) / 2
      val avgVal: Double = sorted.sum / n
      MovieQuota("006", "Highest, lowest, average, median rating",
        (kv._1, sorted.last, sorted.head, f"$avgVal%.3f", medianVal).toString())
    }).toDS.createOrReplaceTempView("tmp2")
    spark.sql(
      """
        |insert into table spark_data_analysis_quota.movie_quota partition(dt)
        |select *, current_date() dt from tmp2
        |""".stripMargin)
    loadDataToHiveLocation(quotas, spark)
  }

  // Load metric rows into the Hive table
  def loadDataToHiveLocation(quotas: Seq[MovieQuota], spark: SparkSession): Unit = {
    import spark.implicits._
    val sc: SparkContext = spark.sparkContext
    // Set the job group before the jobs it should label are triggered
    sc.setJobGroup("c", "Save data to the Hive table location")
    quotas.toDS.createOrReplaceTempView("quotas")
    spark.sql(
      """
        |msck repair table spark_data_analysis_quota.movie_quota
        |""".stripMargin)
    spark.sql(
      """
        |insert into table spark_data_analysis_quota.movie_quota partition(dt)
        |select *, current_date() dt from quotas
        |""".stripMargin)
  }
}

object Run {
  def main(args: Array[String]): Unit = {
    // User name for accessing the HDFS cluster
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    System.setProperty("file.encoding", "UTF-8")
    // 1. Create the Spark configuration
    val conf: SparkConf = new SparkConf()
      .setAppName("movie_data_analysis")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // replace the default Java serializer
      .registerKryoClasses(Array(classOf[MovieQuota])) // register custom classes serialized with Kryo
      .setMaster("yarn")
    // 2. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    spark.sql(
      """
        |set hive.exec.dynamic.partition.mode=nonstrict
        |""".stripMargin)
    // Spark RDD: count users, genders and occupations
    MovieDataAnalysisBySparkRDD.demand_1_by_sparkRdd(spark)
    // Spark SQL: occupation counts
    MovieDataAnalysisBySparkRDD.demand_1_by_sparkSql(spark)
    // Spark RDD: highest, lowest, average and median rating; average ratings per user and per movie
    MovieDataAnalysisBySparkRDD.demand_2_by_sparkRdd(spark)
    spark.close()
  }
}

case class MovieQuota(var quota_id: String, var quota_name: String, var quota_value: String) {
  override def toString: String = {
    quota_id + '\t' + quota_name + '\t' + quota_value
  }
}

case class profession(professionId: String, professionName: String)

case class rating(userId: String, movieID: String, rating: String, timestamp: String)

case class user(userId: String, gender: String, ageGrades: String, professionId: String, postalCode: String)

case class movie(movieID: String, title: String, genres: String)

// Counts how often each input value occurs ("M" / "F" are reported as "Male" / "Female")
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  private val countMap: mutable.Map[String, Long] = mutable.Map[String, Long]()

  override def isZero: Boolean = countMap.isEmpty

  // copy() must carry over the current state, not just return an empty accumulator
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val acc = new MyAccumulator
    acc.countMap ++= this.countMap
    acc
  }

  override def reset(): Unit = countMap.clear()

  override def add(v: String): Unit = {
    if (v.equals("M")) {
      countMap("Male") = countMap.getOrElse("Male", 0L) + 1L
    } else if (v.equals("F")) {
      countMap("Female") = countMap.getOrElse("Female", 0L) + 1L
    } else {
      countMap(v) = countMap.getOrElse(v, 0L) + 1L
    }
  }

  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach { case (key, value) =>
      countMap(key) = countMap.getOrElse(key, 0L) + value
    }
  }

  override def value: mutable.Map[String, Long] = this.countMap
}
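Requirement 4 asks for a median rating. The usual definition takes the middle value for an odd count and averages the two middle values for an even count. The per-user and per-movie averages also need floating-point division, because dividing one Long by another drops the fraction. The plain-Scala sketch below shows both points without Spark. It assumes one movie's ratings have already been collected into a `Seq[Double]`. `RatingStatsSketch` and its methods are hypothetical names, not part of the job above.

```scala
// Plain-Scala sketch of the per-movie statistics in requirement 4.
// Assumption: one movie's ratings are already collected into a Seq[Double].
object RatingStatsSketch {
  // Median: middle value for an odd count, mean of the two middle values for an even count
  def median(values: Seq[Double]): Double = {
    require(values.nonEmpty, "median of an empty sequence is undefined")
    val sorted = values.sorted
    val n = sorted.size
    if (n % 2 == 1) sorted(n / 2)
    else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
  }

  // (highest, lowest, average, median) for one movie's ratings
  def stats(values: Seq[Double]): (Double, Double, Double, Double) =
    (values.max, values.min, values.sum / values.size, median(values))

  // Average count per key; converting to Double avoids Long integer division
  def averagePer(totalRatings: Long, distinctKeys: Long): Double =
    totalRatings.toDouble / distinctKeys

  def main(args: Array[String]): Unit = {
    // Hypothetical ratings for one movie
    val ratings = Seq(5.0, 3.0, 4.0, 1.0)
    println(stats(ratings))      // (5.0,1.0,3.25,3.5)
    println(averagePer(10L, 4L)) // 2.5, whereas 10L / 4L == 2
  }
}
```

In the RDD job, a helper like `stats` could be applied to each movie's ratings after `groupByKey()`. Sorting each group in memory is fine for per-movie rating lists of this size. For a global median over very large data, Spark SQL's `percentile_approx` avoids collecting all values onto one node.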
Target table DDL
CREATE TABLE `spark_data_analysis_quota.movie_quota`(
  `quota_id` string COMMENT 'metric id',
  `quota_name` string COMMENT 'metric name',
  `quota_value` string COMMENT 'metric value')
COMMENT 'movie metric analysis table'
PARTITIONED BY (`dt` string)
clustered by (quota_id) into 3 buckets
stored as orc
Build the jar with mvn install and copy it to the Spark cluster. Start the big-data cluster components, then run run_spark_job_byJar.sh:
$SPARK_HOME/bin/spark-submit \
--class Run \
--master yarn \
--deploy-mode cluster \
--queue spark \
--conf spark.executor.extraJavaOptions="-Dfile.encoding=UTF-8" \
--conf spark.driver.extraJavaOptions="-Dfile.encoding=UTF-8" \
MovieDataAnalysisBySparkRDD.jar
Open the YARN ResourceManager web UI at http://hadoop104:8088/cluster.
Click History to jump to the Spark history server at http://hadoop102:4000 (started on hadoop102 with sbin/start-history-server.sh).
View the Spark job logs there.
Appendix:
Cluster start/stop scripts
cat hadoopHA.sh
#!/bin/bash
if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi

start_cluster(){
    echo " =================== Starting the Hadoop cluster ==================="
    echo " --------------- Starting HDFS ---------------"
    ssh hadoop102 "/opt/module/hadoopHA/sbin/start-dfs.sh"
    echo " --------------- Starting YARN ---------------"
    ssh hadoop103 "/opt/module/hadoopHA/sbin/start-yarn.sh"
    echo " --------------- Starting the MapReduce history server ---------------"
    ssh hadoop102 "/opt/module/hadoopHA/bin/mapred --daemon start historyserver"
    echo "--------- Starting the Spark history server ----------"
    ssh hadoop102 "/opt/module/spark/sbin/start-history-server.sh "
    echo "----- Starting the Hive services ------"
    ssh hadoop102 "/home/atguigu/bin/hiveservices.sh start"
}

stop_cluster(){
    echo " =================== Stopping the Hadoop cluster ==================="
    echo "---------- Stopping the Hive services -------------"
    ssh hadoop102 "/home/atguigu/bin/hiveservices.sh stop"
    echo " --------------- Stopping the MapReduce history server ---------------"
    ssh hadoop102 "/opt/module/hadoopHA/bin/mapred --daemon stop historyserver"
    echo " --------------- Stopping YARN ---------------"
    ssh hadoop103 "/opt/module/hadoopHA/sbin/stop-yarn.sh"
    echo " --------------- Stopping HDFS ---------------"
    ssh hadoop102 "/opt/module/hadoopHA/sbin/stop-dfs.sh"
    echo "--------- Stopping the Spark history server ----------"
    ssh hadoop102 "/opt/module/spark/sbin/stop-history-server.sh "
}

case $1 in
"start")
    echo "-------- Starting ZooKeeper ----------"
    sh /home/atguigu/bin/dataCollectSystem/zk.sh start
    echo "------- Starting the high-availability big-data cluster -------"
    start_cluster
;;
"stop")
    stop_cluster
    echo "---------- Stopping ZooKeeper ------------"
    sh /home/atguigu/bin/dataCollectSystem/zk.sh stop
;;
"restart")
    echo "--------- Restarting the cluster ---------"
    stop_cluster
    start_cluster
;;
"status")
    echo " ================= hadoopHA cluster: status of each node ==========="
    echo " ==========hadoop102,nn1========="
    n1_port=`ssh hadoop102 "jps | grep -v Jps | grep NameNode"`
    nn1=`hdfs haadmin -getServiceState nn1`
    echo ${n1_port}" "${nn1}
    echo " ==========hadoop103,nn2,rm1========="
    n2_port=`ssh hadoop103 "jps | grep -v Jps | grep NameNode"`
    nn2=`hdfs haadmin -getServiceState nn2`
    echo ${n2_port}" "${nn2}
    rm1_port=`ssh hadoop103 "jps | grep -v Jps | grep ResourceManager"`
    rm1=`yarn rmadmin -getServiceState rm1`
    echo ${rm1_port}" "${rm1}
    echo " ==========hadoop104,rm2========="
    rm2_port=`ssh hadoop104 "jps | grep -v Jps | grep ResourceManager"`
    rm2=`yarn rmadmin -getServiceState rm2`
    echo ${rm2_port}" "${rm2}
;;
*)
    echo "Input Args Error..."
;;
esac
cat hiveservices.sh
#!/bin/bash
HIVE_LOG_DIR=$HIVE_HOME/logs
if [ ! -d $HIVE_LOG_DIR ]
then
    mkdir -p $HIVE_LOG_DIR
fi
# Check whether a process is running correctly; $1 = process name, $2 = process port
function check_process()
{
    pid=$(ps -ef 2>/dev/null | grep -v grep | grep -i $1 | awk '{print $2}')
    ppid=$(netstat -nltp 2>/dev/null | grep $2 | awk '{print $7}' | cut -d '/' -f 1)
    echo $pid
    [[ "$pid" =~ "$ppid" ]] && [ "$ppid" ] && return 0 || return 1
}

function hive_start()
{
    metapid=$(check_process HiveMetastore 9083)
    cmd="nohup hive --service metastore >$HIVE_LOG_DIR/metastore.log 2>&1 &"
    cmd=$cmd" sleep 4; hdfs dfsadmin -safemode wait >/dev/null 2>&1"
    [ -z "$metapid" ] && eval $cmd || echo "Metastore service is already running"
    server2pid=$(check_process HiveServer2 10000)
    cmd="nohup hive --service hiveserver2 >$HIVE_LOG_DIR/hiveServer2.log 2>&1 &"
    [ -z "$server2pid" ] && eval $cmd || echo "HiveServer2 service is already running"
}

function hive_stop()
{
    metapid=$(check_process HiveMetastore 9083)
    [ "$metapid" ] && kill $metapid || echo "Metastore service is not running"
    server2pid=$(check_process HiveServer2 10000)
    [ "$server2pid" ] && kill $server2pid || echo "HiveServer2 service is not running"
}

case $1 in
"start")
    hive_start
    ;;
"stop")
    hive_stop
    ;;
"restart")
    hive_stop
    sleep 2
    hive_start
    ;;
"status")
    check_process HiveMetastore 9083 >/dev/null && echo "Metastore service is running normally" || echo "Metastore service is not running properly"
    check_process HiveServer2 10000 >/dev/null && echo "HiveServer2 service is running normally" || echo "HiveServer2 service is not running properly"
    ;;
*)
    echo Invalid Args!
    echo 'Usage: '$(basename $0)' start|stop|restart|status'
    ;;
esac