Spark综合练习——电影评分数据分析

文章目录

    • 引言
    • ChatGPT生成测试数据:
    • 今天给大家带来一个Spark综合练习案例--电影评分
    • 补充: 采用DSL编程的详尽注释版
    • 总结

引言

大家好,我是ChinaManor,直译过来就是中国码农的意思,俺希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,一个平凡而不平庸的人。

ChatGPT生成测试数据:

请根据如下格式生成100行数据集:
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
在这里插入图片描述
chatpgt注册:https://github.com/xianyu110/awesome-chatgpt-project

今天给大家带来一个Spark综合练习案例–电影评分

老师:给定需求统计评分次数>200的电影平均分Top10,并写入Mysql数据库中

我:所有字我都认识,怎么连在一起我就不认识了
在这里插入图片描述
不管了先new个实例对象,总没错吧

val sparkSession = SparkSession.builder().config("spark.sql.shuffle.partitions", "4").appName("电影数据分析").master("local[2]").getOrCreate()

然后大数据无非输入,转换,输出,我再弄个spark读取文件?
在这里插入图片描述

    val lines: RDD[String] = sparkSession.read.textFile("E:\\xx\\SparkDemo\\input\\ratings.dat").rdd

再然后RDD转换成DF

val rdd: RDD[(Int, Int, Int, Long)] = lines.mapPartitions { item => {item.map { line => {val strings: Array[String] = line.trim.split("::")(strings(0).toInt, strings(1).toInt, strings(2).toInt, strings(3).toLong)}}}}import sparkSession.implicits._val reusltDF: DataFrame = rdd.toDF("user_id", "item_id", "rating", "timestamp")

测试一下行不行
在这里插入图片描述

//    查看约束reusltDF.printSchema()//查看数据reusltDF.show()

好像跑通了!!笑容逐渐放肆~什么SQL不整了,上来直接DSL
在这里插入图片描述

val resultDS: Dataset[Row] = reusltDF//a.对数据按电影id进行分组.groupBy($"item_id")//b.对聚合数据求平均值和评分次数.agg(round(avg($"rating"), 2).as("avg_rating"),count($"user_id").as("cnt_rating"))//c.过滤出评分大于2000的.filter($"cnt_rating" > 2000)//d.按照评分的平均值进行降序排序.orderBy($"avg_rating".desc)//e.取前十条数据.limit(10)

最后最后保存到Mysql
SaveToMysql(resultDF);

/*** 保存数据至MySQL数据库,使用函数foreachPartition对每个分区数据操作,主键存在时更新,不存在时插入*/def saveToMySQL(dataFrame: DataFrame): Unit = {dataFrame.rdd.coalesce(1).foreachPartition{ iter =>// a. 加载驱动类Class.forName("com.mysql.cj.jdbc.Driver")// 声明变量var conn: Connection = nullvar pstmt: PreparedStatement = nulltry{// b. 获取连接conn = DriverManager.getConnection("jdbc:mysql://192.168.88.100:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true","root", //"123456")// c. 获取PreparedStatement对象val insertSql ="""|INSERT|O| db_test.demo| (item_id, avg_rating, cnt_rating)|VALUES (?, ?, ?)|""".stripMarginpstmt = conn.prepareStatement(insertSql)conn.setAutoCommit(false)// d. 将分区中数据插入到表中,批量插入iter.foreach{ row =>pstmt.setInt(1, row.getAs[Int]("item_id"))pstmt.setInt(2, row.getAs[Int]("avg_rating"))pstmt.setInt(3, row.getAs[Int]("cnt_rating"))// 加入批次pstmt.addBatch()}// TODO: 批量插入pstmt.executeBatch()conn.commit()}catch {case e: Exception => e.printStackTrace()}finally {if(null != pstmt) pstmt.close()if(null != conn) conn.close()} }

在这里插入图片描述

大功告成了!
在这里插入图片描述

补充: 采用DSL编程的详尽注释版

package cn.itcast.spark.metricsimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel/*** 电影评分数据分析,需求如下:*      需求1:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分*          电影ID    评分个数     电影名称 平均评分   更新时间*          movie_id、rating_num、title、rating_avg、update_time*      需求2:查找每个电影类别及其对应的平均评分*          电影类别  电影类别平均评分     更新时间*          genre、 rating_avg       、update_time*      需求3:查找被评分次数较多的前十部电影*          电影ID   电影名称  电影被评分的次数   更新时间*          movie_id、title、rating_num、      update_time
*/
object MetricsAppMain {// 文件路径private val RATINGS_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\ratings.csv"private val MOVIES_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\movies.csv"def main(args: Array[String]): Unit = {// step1、创建SparkSession实例对象val spark: SparkSession = createSparkSession(this.getClass)import spark.implicits._/*分析需求可知,三个需求最终结果,需要使用事实表数据和维度表数据关联,所以先数据拉宽,再指标计算TODO: 按照数据仓库分层理论管理数据和开发指标- 第一层(最底层):ODS层直接加CSV文件数据为DataFrame- 第二层(中间层):DW层将加载业务数据(电影评分数据)和维度数据(电影基本信息数据)进行Join关联,拉宽操作- 第三层(最上层):DA层/APP层依据需求开发程序,计算指标,进行存储到MySQL表*/// step2、【ODS层】:加载数据,CSV格式数据,文件首行为列名称val ratingDF: DataFrame = readCsvFile(spark, RATINGS_CSV_FILE_PATH, verbose = false)val movieDF: DataFrame = readCsvFile(spark, MOVIES_CSV_FILE_PATH, verbose = false)// step3、【DW层】:将电影评分数据与电影信息数据进行关联,数据拉宽操作val detailDF: DataFrame = joinDetail(ratingDF, movieDF)//printConsole(detailDF)// step4、【DA层】:按照业务需求,进行指标统计分析computeMetric(detailDF)Thread.sleep(1000000)// 应用结束,关闭资源spark.stop()}/*** 构建SparkSession实例对象,默认情况下本地模式运行*/def createSparkSession(clazz: Class[_], master: String = "local[2]"): SparkSession = {SparkSession.builder().appName(clazz.getSimpleName.stripSuffix("$")).master(master).config("spark.sql.shuffle.partitions", "2").getOrCreate()}/*** 读取CSV格式文本文件数据,封装到DataFrame数据集*/def readCsvFile(spark: SparkSession, path: String, verbose: Boolean = true): DataFrame = {val dataframe: DataFrame = spark.read// 设置分隔符为逗号.option("sep", ",")// 文件首行为列名称.option("header", "true")// 依据数值自动推断数据类型.option("inferSchema", "true").csv(path)if(verbose){printConsole(dataframe)}// 返回数据集dataframe}/*** 将事实表数据与维度表数据进行Join关联*/def joinDetail(df1: DataFrame, df2: DataFrame, joinType: String = "left_outer"): DataFrame = {df1// 采用leftJoin关联数据.join(df2, df1("movieId") === df2("movieId"), joinType)// 选取字段.select(df1("userId").as("user_id"), //df1("movieId").as("movie_id"), //df1("rating"), //df1("timestamp"), //df2("title"), //df2("genres") //)}/*** 按照业务需求,进行指标统计,默认情况下,结果数据打印控制台*/def computeMetric(dataframe: DataFrame): Unit = {// TODO: 缓存数据dataframe.persist(StorageLevel.MEMORY_AND_DISK)// 需求1:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分val top10FilesDF: DataFrame = top10Films(dataframe)//printConsole(top10FilesDF)upsertToMySQL(top10FilesDF, //"replace into db_metrics.top_10_files (id, movie_id, rating_num, title, rating_avg) values (null, ?, ?, ?, ?)", //(pstmt: PreparedStatement, row: Row) => {pstmt.setInt(1, row.getAs[Int]("movie_id"))pstmt.setLong(2, row.getAs[Long]("rating_num"))pstmt.setString(3, row.getAs[String]("title"))pstmt.setDouble(4, row.getAs[Double]("rating_avg"))})// 需求2:查找每个电影类别及其对应的平均评分val genresRatingDF: DataFrame = genresRating(dataframe)//printConsole(genresRatingDF)
//		upsertToMySQL(
//			genresRatingDF, //
//			"replace into db_metrics.genres_rating (id, genre, rating_avg) values (null, ?, ?)", //
//			(pstmt: PreparedStatement, row: Row) => {
//				pstmt.setString(1, row.getAs[String]("genre"))
//				pstmt.setDouble(2, row.getAs[Double]("rating_avg"))
//			}
//		)// 需求3:查找被评分次数较多的前十部电影val best10FilesDF: DataFrame = best10Files(dataframe)//printConsole(best10FilesDF)
//		upsertToMySQL(
//			best10FilesDF, //
//			"replace into db_metrics.best_10_films (id, movie_id, title, rating_num) values (null, ?, ?, ?)", //
//			(pstmt: PreparedStatement, row: Row) => {
//				pstmt.setInt(1, row.getAs[Int]("movie_id"))
//				pstmt.setString(2, row.getAs[String]("title"))
//				pstmt.setLong(3, row.getAs[Long]("rating_num"))
//			}
//		)// 释放资源dataframe.unpersist()}/*** 需求:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分*    电影ID    评分个数     电影名称 平均评分   更新时间*    movie_id、rating_num、title、rating_avg、update_time*/def top10Films(dataframe: DataFrame): DataFrame = {import dataframe.sparkSession.implicits._dataframe.groupBy($"movie_id", $"title").agg(count($"movie_id").as("rating_num"), // 统计电影被评分的个数round(avg($"rating"), 2).as("rating_avg") // 统计电影被评分的平均分)// 过滤评分个数大于50.where($"rating_num" > 50)// 降序排序,按照平均分.orderBy($"rating_avg".desc)// 获取前10电影.limit(10)// 添加日期字段.withColumn("update_time", current_timestamp())}/*** 需求:查找每个电影类别及其对应的平均评分*     电影类别  电影类别平均评分     更新时间*     genre、 rating_avg       、update_time*/def genresRating(dataframe: DataFrame): DataFrame = {import dataframe.sparkSession.implicits._dataframe// 将每个电影类别字段:genres,按照|划分,使用爆炸函数进行行转列.select(explode(split($"genres", "\\|")).as("genre"), //$"rating" //)// 按照类别分组,计算平均评分.groupBy($"genre").agg(round(avg($"rating"), 2).as("rating_avg"))// 对统计值降序排序.orderBy($"rating_avg".desc)// 添加日期字段.withColumn("update_time", current_timestamp())}/*** 需求:查找被评分次数较多的前十部电影*     电影ID   电影名称  电影被评分的次数   更新时间*     movie_id、title、rating_num、      update_time*/def best10Files(dataframe: DataFrame): DataFrame = {import dataframe.sparkSession.implicits._dataframe.groupBy($"movie_id", $"title").agg(count($"movie_id").as("rating_num") // 统计电影被评分的个数)// 降序排序,按照平均分.orderBy($"rating_num".desc)// 获取前10电影.limit(10)// 添加日期字段.withColumn("update_time", current_timestamp())}/*** 将DataFrame数据集打印控制台,显示Schema信息和前10条数据*/def printConsole(dataframe: DataFrame): Unit = {// 显示Schema信息dataframe.printSchema()// 显示前10条数据dataframe.show(10, truncate = false)}/*** 将数据保存至MySQL表中,采用replace方式,当主键存在时,更新数据;不存在时,插入数据* @param dataframe 数据集* @param sql 插入数据SQL语句* @param accept 函数,如何设置Row中每列数据到SQL语句中占位符值*/def upsertToMySQL(dataframe: DataFrame, sql: String,accept: (PreparedStatement, Row) => Unit): Unit = {// 降低分区数目,对每个分区进行操作dataframe.coalesce(1).foreachPartition{iter =>// step1. 加载驱动类Class.forName("com.mysql.cj.jdbc.Driver")// 声明变量var conn: Connection = nullvar pstmt: PreparedStatement = nulltry{// step2. 创建连接conn = DriverManager.getConnection("jdbc:mysql://192.168.88.100:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true","root","123456")pstmt = conn.prepareStatement(sql)// step3. 插入数据iter.foreach{row =>// 设置SQL语句中占位符的值accept(pstmt, row)// 加入批次中pstmt.addBatch()}// 批量执行批次pstmt.executeBatch()}catch {case e: Exception => e.printStackTrace()}finally {// step4. 关闭连接if(null != pstmt) pstmt.close()if(null != conn) conn.close()}}}}

总结

以上便是电影评分数据分析spark版,愿你读过之后有自己的收获,如果有收获不妨一键三连一下~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/72277.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

女儿米菲儿的照片

2011年3月26日,我们的宝贝女儿出生啦。 女儿非常乖而且聪明,出生第三天我就抓拍到一种微笑表情的照片。 小米菲儿出生第三天的照片: 小米菲儿出生第九天的照片:

游戏公司奇葩富豪身家仅次许家印,征集长腿美女生娃,女友房产超百套

2018年6月,多益网络在香港联交所递交了招股书。这是一个令人瞠目结舌的巨兽,背后传奇老板徐波也浮出水面。 招股书显示,多益网络在最近三年的营业收入分别达到16.29亿元、15.52亿元和19.34亿元,最后一年涨幅达到24.6%。换言之&…

超级玛丽——请你用字符画的形式输出超级玛丽中的一个场景。

#include<stdio.h> int main() {printf( " *****\n" " ****\n" " ####…#.\n" " #…###…##…\n" " ###…###### ### ### \n" " … #…# #…#\n" " ######### #.#.# #.#.#\n" " ########## …

1751 - 快乐的马里奥

马里奥是一个快乐的油漆工人&#xff0c;这天他接到了一个油漆任务&#xff0c;要求马里奥把一个 nn 行 mm 列的矩阵每一格都用油漆标记一个数字&#xff0c;标记的顺序按照广度优先搜索的方式进行&#xff0c;也就是他会按照如下方式标记&#xff1a; 1、首先标记第 11 行第 …

超级玛丽中那些不为人知的细节(下):碰撞与跳跃

"God is in the details"上帝存在于细节之中。这是20世纪前期伟大的建筑家路德维希.密斯的口头禅&#xff0c;而这句话也同样被游戏设计师们奉为圭臬。游戏被称为第九艺术&#xff0c;和其它的艺术形式一样&#xff0c;也是通过前人不断的摸索与创新才能形成自己独特…

7-7 超级玛丽 (10 分)

假定有n个城堡&#xff0c;编号为1至n&#xff0c;有的城堡之间有道路直接相连&#xff0c;有的城堡之间没有道路直接相连。马里奥现在准备从一个城堡出发前往另一个城堡&#xff0c;它有一个魔法棒&#xff0c;可以瞬时通过一条道路&#xff0c;即以0时间通过这条道路&#xf…

Keras利用卷积神经网络玛丽莲梦露与爱因斯坦的识别Part1

目的 突发奇想想会认为下面这张图片究竟是玛丽莲梦露还是爱因斯坦&#xff0c;主要目的顺便实践练习《Python深度学习》书中的例子&#xff0c;只采用了很小批量的数据&#xff0c;也没有深究如何提高正确率&#xff0c;解决过拟合的问题。详细可以参见《python深度学习》第五章…

玛丽莲·梦露从未公开的照片

世界著名的摄影师伊夫 阿诺德最近将几张玛丽莲梦露从未公开的照片限量分发给了英国几个选定的画廊。 已经94岁高龄的阿诺德当年与梦露建立了互相信任的关系&#xff0c;拍了许多反映梦露私秘生活的照片&#xff0c;但她很少将这些照片公之于众&#xff0c;借助曝露名人的隐私来…

定制化需求|一个人工智能大模型应用的算力成本有多高?

“ 人工智能的核心是算力。” 01 — 需要多少预算&#xff1f; 最近在学习大模型ChatGPT、ChatGLM&#xff0c;研究结合企业的应用场景&#xff0c;解决一些业务难点、痛点&#xff0c;不免涉及本地化部署、微调、训练、知识库文档数据提取等等方面的问题。‍‍‍‍ 同时还需要…

一键在线生成朋友圈转发点赞截图教程

1.我们首先打开朋友圈转发截图生成工具网站&#xff1a;https://oss.361s.cn/tool/pyq/ 2.输入自己的微信昵称&#xff0c;上传头像&#xff0c;撰写文本等等&#xff0c;自定义设置完成之后点击生成即可获得你想要的朋友圈截图。 3.点击保存即可完成全部流程。 本文转载自…

朋友圈转发截图生成工具HTML源码

正文: 朋友圈转发截图生成工具HTML源码&#xff0c;微信朋友圈截图模拟器源码&#xff0c;微信朋友圈装逼生成器大全&#xff0c;上传服务器即可使用&#xff0c;装逼必备&#xff0c;有条件的可以打包成AP。 下载方式: lanzou.com/iXBIb033ddgj

朋友圈转发截图生成工具源码

微信朋友圈截图模拟器源码&#xff0c;微信朋友圈装逼生成器大全&#xff0c;上传服务器即可使用&#xff01;装逼必备&#xff01;有条件的可以打包成APP 图片&#xff1a; 学习资料地址&#xff1a;朋友圈转发截图生成工具源码.zip - 蓝奏云

用itchat爬取朋友圈好友信息

用itchat爬取微信好友基本信息 Python有一个好玩的软件包itchat&#xff0c;提供了一个微信api接口&#xff0c;借此可以爬取朋友圈的一些基本信息&#xff0c;下面我们一起来玩玩吧。 import itchat import numpy as np import pandas as pd from collections import defaul…

微信转发指定的图文消息到朋友圈(JAVA版)

微信转发图文消息步骤 微信转发图文消息步骤 需求获取凭证 获取aceess_token获取jsapi_ticket缓存获取的jsapi_ticket代码 config接口注入权限 引入js文件微信权限注入接口 JS-SDK分享接口调用总结温馨提示 需求 当用户购买成功一样产品&#xff0c;为了使用户能够二次消费&a…

生成朋友圈转发点赞截图的小工具

当当当当&#xff01;开工大吉&#xff01; 新春虎年的第一个工作日&#xff0c;相信有不少的小伙伴跟TJ君一样&#xff0c;斗志满满的开始了新一年的工作之旅。 也肯定有不少的小伙伴还在休假&#xff0c;享受一年难得的相聚。 那么春节期间&#xff0c;大家都去了什么好玩的地…

微信截图不能截微信界面

有时候&#xff0c;大家用电脑微信快捷键 Alt A 时&#xff0c;不能截取 微信窗口 界面。 大家可能很迷茫&#xff0c;不要慌&#xff0c;so easy,一招搞定 1&#xff1a;点击微信聊天界面 小剪刀 2&#xff1a; 取消 截图时隐藏当前窗口 大功告成。

kafka消费指定每次最大消费消息数量 max.poll.records

一个属于new consumer的配置项&#xff0c;出现在0.10及其以上版本中。 #一次调用poll()操作时返回的最大记录数&#xff0c;默认值为500 spring.kafka.consumer.max-poll-records; Properties properties new Properties();properties.put("max.poll.records",2);…

Kafka 消费者读取数据

更多内容&#xff0c;前往 IT-BLOG 消费者不需要自行管理 offset&#xff08;分组topic分区&#xff09;&#xff0c;系统通过 broker 将 offset 存放在本地。低版本通过 zk 自行管理。系统自行管理分区和副本情况。消费者断线后会自动根据上一次记录的 offset 去获取数据&…

Kafka消费异常处理

异常 org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the …

账户系统,余额与体现

参考连接: https://blog.pingxx.com/2018/02/27/用户账户系统该怎么用?/ 账户体系的建立实际上是将结清算分开(即实时清算/定时结算)利于更复杂的支付业务(如分账/层级分润等): 建立账户体系时要根据业务需求考虑各种账户(如余额账户/冻结资金账户/红包账户(不能提现但是能…