【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

把DStream写入到MySQL数据库中

  • Spark 3.4.1
  • MySQL 8.0.30
  • sbt 1.9.2

文章目录

  • 【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
  • 前言
  • 一、背景说明
  • 二、使用步骤
    • 1.引入库
    • 2.开发代码
    • 运行测试
  • 总结


前言

需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL


提示:本项目通过sbt控制依赖

一、背景说明

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中

Spark Streaming是一个基于Spark的实时计算框架,它可以从多种数据源消费数据,并对数据进行高效、可扩展、容错的处理。Spark Streaming的工作原理有以下几个步骤:

  • 数据接收:Spark Streaming可以从各种输入源接收数据,如Kafka、Flume、Twitter、Kinesis等,然后将数据分发到Spark集群中的不同节点上。每个节点上有一个接收器(Receiver)负责接收数据,并将数据存储在内存或磁盘中。
  • 数据划分:Spark Streaming将连续的数据流划分为一系列小批量(Batch)的数据,每个批次包含一定时间间隔内的数据。这个时间间隔称为批处理间隔(Batch Interval),可以根据应用的需求进行设置。每个批次的数据都被封装成一个RDD,RDD是Spark的核心数据结构,表示一个不可变的分布式数据集。
  • 数据处理:Spark Streaming对每个批次的RDD进行转换和输出操作,实现对流数据的处理和分析。转换操作可以使用Spark Core提供的各种函数,如map、reduce、join等,也可以使用Spark Streaming提供的一些特殊函数,如window、updateStateByKey等。输出操作可以将处理结果保存到外部系统中,如HDFS、数据库等。
  • 数据输出:Spark Streaming将处理结果以DStream的形式输出,DStream是一系列连续的RDD组成的序列,表示一个离散化的数据流。DStream可以被进一步转换或输出到其他系统中。

DStream有状态转换操作是指在Spark Streaming中,对DStream进行一些基于历史数据或中间结果的转换,从而得到一个新的DStream。
在这里插入图片描述

二、使用步骤

1.引入库

ThisBuild / version := "0.1.0-SNAPSHOT"ThisBuild / scalaVersion := "2.13.11"lazy val root = (project in file(".")).settings(name := "SparkLearning",idePackagePrefix := Some("cn.lh.spark"),libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1",libraryDependencies += "org.apache.hadoop" % "hadoop-auth" % "3.3.6",libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1" % "provided",libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.30"
)

2.开发代码

为了实现通过spark Streaming 监控控制台输入,需要开发两个代码:

  • NetworkWordCountStatefultoMysql.scala
  • StreamingSaveMySQL8.scala

NetworkWordCountStatefultoMysql.scala

package cn.lh.spark  import org.apache.spark.SparkConf  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}  object NetworkWordCountStatefultoMysql {  def main(args: Array[String]): Unit = {  //    定义状态更新函数  val updateFunc = (values: Seq[Int], state: Option[Int]) => {  val currentCount = values.foldLeft(0)(_ + _)  val previousCount = state.getOrElse(0)  Some(currentCount + previousCount)  }  //    设置log4j日志级别  StreamingExamples.setStreamingLogLevels()  val conf: SparkConf = new SparkConf().setAppName("NetworkCountStateful").setMaster("local[2]")  val scc: StreamingContext = new StreamingContext(conf, Seconds(5))  //    设置检查点,具有容错机制  scc.checkpoint("F:\\niit\\2023\\2023_2\\Spark\\codes\\checkpoint")  val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.137.110", 9999)  val words: DStream[String] = lines.flatMap(_.split(" "))  val wordDstream: DStream[(String, Int)] = words.map(x => (x, 1))  val stateDstream: DStream[(String, Int)] = wordDstream.updateStateByKey[Int](updateFunc)  // 打印出状态  stateDstream.print()  // 将统计结果保存到MySQL中  stateDstream.foreachRDD(rdd =>{  val repartitionedRDD = rdd.repartition(3)  repartitionedRDD.foreachPartition(StreamingSaveMySQL8.writeToMySQL)  })  scc.start()  scc.awaitTermination()  scc.stop()  }  }

StreamingSaveMySQL8.scala

package cn.lh.spark  import java.sql.DriverManager  object StreamingSaveMySQL8 {  // 定义写入 MySQL 的函数  def writeToMySQL(iter: Iterator[(String,Int)]): Unit = {  // 保存到MySQL  val ip = "192.168.137.110"  val port = "3306"  val db = "sparklearning"  val username = "lh"  val pwd = "Lh123456!"  val jdbcurl = s"jdbc:mysql://$ip:$port/$db"  val conn = DriverManager.getConnection(jdbcurl, username, pwd)  val statement = conn.prepareStatement("INSERT INTO wordcount (word,count) VALUES (?,?)")  try {  // 写入数据  iter.foreach { wc =>  statement.setString(1, wc._1.trim)  statement.setInt(2, wc._2.toInt)  statement.executeUpdate()  }  } catch {  case e:Exception => e.printStackTrace()  } finally {  if(statement != null){  statement.close()  }  if(conn!=null){  conn.close()  }  }  }  }

运行测试

准备工作:

  1. 提前在mysql中新建数据表保存Spark Streaming写入的数据
    在这里插入图片描述

  2. 启动nc -lk 9999
    在这里插入图片描述

  3. 启动 NetworkWordCountStatefultoMysql.scala
    ![[Pasted image 20230804214904.png]]在这里插入图片描述

  4. 在nc端口输入字符,再分别到idea控制台和MySQL检查结果

在这里插入图片描述


总结

本次实验通过IDEA基于Spark Streaming 3.4.1开发程序监控套接字流,并统计字符串,实现实时统计单词出现的数量。试验成功,相对简单。
后期改善点如下:

  • 通过配置文件读取mysql数据库相应的配置信息,不要写死在代码里
  • 写入数据时,sql语句【插入的表信息】,可以在调用方法时,当作参数输入
  • iter: Iterator[(String,Int)] 应用泛型
  • 插入表时,自动保存插入时间

欢迎各位开发者一同改进代码,有问题有疑问提出来交流。谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/82426.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

时序数据库 TDengine 与 WhaleStudio 完成相互兼容性测试认证

近年来,开源及其价值获得社会各界的广泛认可,无论是国家政策导向还是企业数字化转型,都在加速拥抱开源。对于如操作系统、数据库等基础软件来说,开源更是成为驱动技术创新的有力途径。 在此背景下,近日,涛…

Feign

一、为什么使用Feign 二、Feign和RestTemplate的区别 三、自定义配置 四、Feign日志 1.配置文件方式 2.java代码方式

TS 踩坑之路(四)之 Vue3

一、在使用定义默认值withDefaults和defineProps 组合时,默认值设置报错 代码案例 报错信息 不能将类型“{ isBackBtn: false; }”分配给类型“(props: PropsType) > RouteMsgType”。 对象字面量只能指定已知属性,并且“isBackBtn”不在类型“(pro…

【机器学习】在 MLOps构建项目 ( MLOps2)

My MLOps tutorials: Tutorial 1: A Beginner-Friendly Introduction to MLOps教程 2:使用 MLOps 构建机器学习项目 一、说明 如果你希望将机器学习项目提升到一个新的水平,MLOps 是该过程的重要组成部分。在本文中,我们将以经典手写数字分类…

【车道线】TwinLiteNet 复现过程全纪录

码字不易,喜欢的请点赞收藏!!!!! 论文全文翻译:【freespace】TwinLiteNet: An Efficient and Lightweight Model for Driveable Area and Lane Segmentation_莫克_Cheney的博客-CSDN博客 目录…

Vue2:组件高级(下)

Vue2:组件高级(下) Date: May 25, 2023 Sum: 自定义指令、插槽、商品列表、动态组件 目标: 自定义指令 基础概念: 概念: 内置指令:vue 官方提供了 v-for、v-model、v-if 等常用的内置指令。…

机器学习基础08-回归算法矩阵分析(基于波士顿房价(Boston House Price)数据集)

回归算法通常涉及到使用矩阵来表示数据和模型参数。线性回归是最常见的回归算法之一,它可以用矩阵形式来表示。 考虑一个简单的线性回归模型: y m x b y mx b ymxb,其中 y y y 是因变量, x x x 是自变量, m m m 是…

nginx服务

目录 基本介绍 nginx的主要功能 nginx的主要应用场景 nginx常用命令 nginx另外一种安装方式 nginx常用的信号符: nginx配置文件详解 全局配置 event模块 http模块 server模块 location模块: 模块的划分 基本介绍 nginx:高性能、…

【Docker】AUFS、BTRFS、ZFS、储存池详解

洁洁的个人主页 我就问你有没有发挥! 知行合一,志存高远。 前言 Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux或Windows操作系统的机器上,也可以实现虚拟化,容器是…

元组是什么,python怎么使用元组

目录 引言 元组的概念 作用和优势 元组的应用 Python中如何使用元组 注意事项 总结 引言 在Python编程语言中,元组(Tuple)是一种不可变的数据结构,它允许我们以有序且不可修改的方式存储多个元素。与列表不同,…

FFmpeg 编码详细流程

介绍 FFmpeg的 libavcodec 模块完成音视频多媒体的编解码模块。FFmpeg 本身不具有音视频编码的功能和底层能力,只是对各类第三方的编码器API 进行封装调用。老版本的 FFmpeg 将avcodec_encode_video2()作为视频的解码函数 API,将avcodec_encode_audio2(…

【C语言】初识C语言+进阶篇导读

✨个人主页: Anmia.🎉所属专栏: C Language 🎃操作环境: Visual Studio 2019 版本 本篇目的是面向编程新手,没接触过编程的人。以及C进阶的导读。 内容是C语言重要知识点的简单解释,不做详解。给…

web-ssrf

目录 ssrf介绍 以pikachu靶场为例 curl 访问外网链接 利用file协议查看本地文件 利用dict协议扫描内网主机开放端口 file_get_content 利用file协议查看本地文件: fsockopen() 防御方式: ssrf介绍 服务器端请求伪造,是一种由攻击者构造形成…

ClickHouse(九):Clickhouse表引擎 - Log系列表引擎

进入正文前,感谢宝子们订阅专题、点赞、评论、收藏!关注IT贫道,获取高质量博客内容! 🏡个人主页:含各种IT体系技术,IT贫道_Apache Doris,Kerberos安全认证,大数据OLAP体系技术栈-CSDN博客 &…

使用ResponseBodyAdvice做分页处理

目录 父pom文件 pom文件 配置文件 MyResponseBodyAdvice ResponseDto MyBatisConfig UsersController UsersMapper UserMapper.xml 结果 父pom文件 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/PO…

代码反向生成时序图类图-Visual_Paradigm实践

目录 前言 一、Visual_Paradigm介绍 1、主界面介绍 2、功能简介 二、基于代码的序列图生成 1、新建VP项目工程 2、序列图反向生成 三、VP类图生成 1、生成主入口 四、数据ER模型生成 1、SQL脚本 2、ER反向生成 总结 前言 不知道作为研发的小伙伴们&#xff0c;在平时的…

【Git】保姆级详解:Git配置SSH Key(密钥和公钥)到github

博主简介&#xff1a;22级计算机科学与技术本科生一枚&#x1f338;博主主页&#xff1a;是瑶瑶子啦每日一言&#x1f33c;: “当人们做不到一些事情的时候&#xff0c;他们会对你说你也同样不能。”——《当幸福来敲门》 克里斯加德纳 Git配置SSH Key 一、什么是Git?二、什么…

【D3S】集成smart-doc并同步配置到Torna

目录 一、引言二、maven插件三、smart-doc.json配置四、smart-doc-maven-plugin相关命令五、推送文档到Torna六、通过Maven Profile简化构建 一、引言 D3S&#xff08;DDD with SpringBoot&#xff09;为本作者使用DDD过程中开发的框架&#xff0c;目前已可公开查看源码&#…

攻防世界-web-shrine

1. 题目描述 打开链接&#xff0c;发现是一串源码&#xff1a; 从源码中不难发现关键词是flask.render_template_string(safe_jinja(shrine)) &#xff0c;这个函数说明了题目的关键点在于模板渲染&#xff0c;即存在模板注入 2. 思路分析 从代码中不难发现&#xff0c;即使…

探索泛型与数据结构:解锁高效编程之道

文章目录 引言第一部分&#xff1a;了解泛型1.1 为什么使用泛型1.2 使用泛型的好处 第二部分&#xff1a;泛型的使用场景2.1 类的泛型2.2 方法的泛型2.3 接口的泛型 第三部分&#xff1a;泛型通配符3.1 通配符3.2 通配符的受限泛型 第四部分&#xff1a;数据结构和泛型的应用4.…