Spark Streaming编程基础

文章目录

  • 1. 流式词频统计
    • 1.1 Spark Streaming编程步骤
    • 1.2 流式词频统计项目
      • 1.2.1 创建项目
      • 1.2.2 添加项目依赖
      • 1.2.3 修改源目录
      • 1.2.4 添加scala-sdk库
      • 1.2.5 创建日志属性文件
    • 1.3 创建词频统计对象
    • 1.4 利用nc发送数据
    • 1.5 启动应用,查看结果
  • 2. 编程模型的基本概念
    • 2.1 数据源依赖的配置
    • 2.2 SparkConf概述
    • 2.3 StreamingContext概述
    • 2.4 初始化StreamingContext详解
  • 3. 离散化数据流
    • 3.1 DStream概述
    • 3.2 DStream内部原理
  • 4. 基本数据源
    • 4.1 Spark Streaming支持两种数据源
    • 4.2 Spark Streaming 资源分配注意事项
    • 4.3 网络端口使用单个接收器获取数据
      • 1. 获取程序入口
      • 2. 业务代码编写
      • 3. 启动实时流程序
      • 4. 查看程序运行结果
  • 5. 基本DStream转换操作
  • 6. DStream输出操作

1. 流式词频统计

  • 本实战演示了如何使用 Spark Streaming 实现实时词频统计。通过创建 Spark Streaming 项目,添加依赖,编写 Scala 代码,监听网络端口接收数据流,并按批次处理数据。利用 nc 工具发送数据,程序每10秒统计一次词频并输出结果。该示例展示了 Spark Streaming 的微批处理特性,适用于实时数据处理场景。

1.1 Spark Streaming编程步骤

  1. 添加SparkStreaming相关依赖
  2. 获取程序入口接收数据
  3. 对数据进行业务处理
  4. 获取最终结果
  5. 启动程序等待程序执行结束

1.2 流式词频统计项目

1.2.1 创建项目

  • 设置项目基本信息
    在这里插入图片描述
  • 单击【Create】按钮,生成项目基本骨架
    在这里插入图片描述

1.2.2 添加项目依赖

  • pom.xml文件里添加依赖
    在这里插入图片描述
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>net.huawei.streaming</groupId><artifactId>SparkStreamingDemo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency></dependencies></project>
  • 刷新项目依赖
    在这里插入图片描述

1.2.3 修改源目录

  • java修改为scala
    在这里插入图片描述

  • pom.xml里设置源目录
    在这里插入图片描述

1.2.4 添加scala-sdk库

  • 在项目结构对话里添加
    在这里插入图片描述
  • 单击【Add to Modules】菜单项
    在这里插入图片描述
  • 单击【OK】按钮以后,就可以在scala里创建Scala Class
    在这里插入图片描述

1.2.5 创建日志属性文件

  • resources里创建log4j2.properties文件
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

1.3 创建词频统计对象

  • 创建net.huawei.streaming
    在这里插入图片描述
  • net.huawei.streaming包里创建SparkStreamingWordCount对象
    在这里插入图片描述
package net.huawei.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 功能:流式词频统计* 作者:华卫* 日期:2025年01月23日*/
object SparkStreamingWordCount {def main(args: Array[String]): Unit = {// 创建SparkConf对象,2个线程,本地运行val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingWordCount")// 创建StreamingContext对象,10秒一个批次val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))// 创建ReceiverInputDStream对象接收来自网络端口的数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata1", 9999)// lines中每条数据按照空格进行切分然后扁平化处理val words: DStream[String] = lines.flatMap(_.split(" "))// words中每条数据转换成(word,1)二元组val wordmap: DStream[(String, Int)] = words.map(word => (word, 1))// wordmap中每条数据按key分组,按value进行累加求和val wordcount: DStream[(String, Int)] = wordmap.reduceByKey(_ + _)// 打印词频统计结果 wordcount.print()// 启动实时流程序ssc.start()// 等待实时流程序结束ssc.awaitTermination()}
}
  • 代码说明:这段代码实现了一个基于Spark Streaming的实时词频统计程序。它通过监听指定端口(bigdata1:9999)接收数据流,将每行数据按空格切分并扁平化为单词,然后统计每个单词的出现次数。程序每10秒处理一个批次的数据,并打印词频统计结果。代码结构清晰,适用于实时数据处理场景。

1.4 利用nc发送数据

  • bigdata1节点利用nc发送数据,执行命令:nc -lp 9999
    在这里插入图片描述

1.5 启动应用,查看结果

  • 启动SparkStreamingWordCount对象,在bigdata1节点上输入数据,在控制台查看词频统计结果
    在这里插入图片描述
  • 结果说明:Spark Streaming 采用微批处理,每批次数据独立处理,批次间不共享状态或共同计数。默认情况下,批次间数据互不影响。如需跨批次状态管理,可使用 updateStateByKeymapWithState 实现累加计数等功能。这种设计确保了流数据处理的灵活性和高效性。

2. 编程模型的基本概念

2.1 数据源依赖的配置

  • 对于Spark Streaming Core API中不存在的数据源(例如Kafka/Kinesis)获取数据,必须要在工程中引入数据源相关的依赖:
SourceArtifact
Kafkaspark-streaming-kafka-0-10_2.12
Kinesisspark-streaming-kinesis-asl_2.12 [Amazon Software License]
  • 需要注意的是引入的依赖要和环境保护一致,例如 spark-streaming-kafka-0-10_2.12 这个是 scala 的版本,要和工程的 scala 版本保持一致。

2.2 SparkConf概述

  • SparkConf 是 Apache Spark 中用于管理应用程序配置的核心类。它通过键值对的形式设置和存储配置参数,如 Master URL、应用程序名称、资源分配等。SparkConf 支持从系统属性加载配置,并允许动态设置和获取参数。其配置优先级高于默认配置文件,确保应用程序在不同环境中灵活运行。SparkConf 还提供克隆和调试功能,便于配置管理和问题排查。
    在这里插入图片描述

2.3 StreamingContext概述

  • StreamingContext 是 Apache Spark Streaming 的核心类,负责流式数据处理的初始化和管理。它通过 SparkContextSparkConf 配置运行环境,支持多种数据源(如 Socket、文件、队列等),并提供启动、停止和检查点功能。StreamingContext 还允许用户定义数据处理逻辑,并通过监听器监控作业状态,确保流式应用的可靠性和高效性。
    在这里插入图片描述

2.4 初始化StreamingContext详解

// 创建SparkConf对象,2个线程,本地运行
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingWordCount")
// 创建StreamingContext对象,10秒一个批次
val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
  1. conf对象是创建任何spark组件的入口,使用new SparkConf()可以构建对象。
  2. SparkConf实例对象管理Spark应用程序的配置,所有的配置采用key-value键值对的方式进行配置,代码中设置的参数优先级别大于配置文件中的配置。
  3. 如果想做单元测试的话,可以使用new SparkConf(false)的方式跳过加载外部的配置文件和系统属性(默认会加载spark和java系统所有配置信息)。
  4. 将SparkConf对象传递给 SparkContext 实例之后,SparkConf对象将被克隆,用户将无法再对其进行修改。Spark不支持在运行时修改配置。
  5. setMaster()可以设置Master运行模式,本地测试一般使用local方式,local[*]这种方式一般不推荐使用,默认使用当前机器所有的core,线上环境建使用spark-submit的方式进行资源配置,防止编写硬编码的程序。
  6. setAppName()设置程序运行时的名字,本地测试和集群线上环境都是通用的。
  7. 任何一个流式计算程序都需要一个StreamingContext实例,创建当前实例一般需要2个参数,一个是conf配置信息,另外一个是多长时间处理一批数据的参数,Seconds(num),num的单位是秒,具体num设置多少,取决于集群处理数据的能力。

3. 离散化数据流

3.1 DStream概述

  • SparkStreaming数据模型:DStream是Spark Streaming中最基本最重要的一个抽象概念(可以把DStream简单的理解为一个类,类中可以保存数据并且提供一系列操作数据的方法)。
  • DStream数据结构含义:DStream表示为一个连续的数据流,内部由一系列的数据组成,这些数据既可以是从数据源接收到的数据,也可以是从数据源接收到的数据经过transform操作转换后的数据。

3.2 DStream内部原理

  • 从本质上来说一个DStream是由一系列连续的RDDs组成,DStream中的每一个RDD包含了一个batch的数据,每个batch的数据都包含了特定时间间隔的数据(也可以理解为DStream中的每个RDD都是包含了特定时间间隔的数据)
    在这里插入图片描述
DStream转换本质SparkStreaming默认参数
DStream类中存在数据和对数据的操作方法(函数),如果对DStream中的数据使用函数进行操作的话,本质是对其内部的RDD进行操作的,这些底层RDD转换由Spark引擎进行计算。DStream相关操作隐藏了大部分这些细节,并为开发人员提供了更高级别的API以方便它们在SparkStreaming中默认200ms会形成一个数据块(无论是否接收到数据),当到达StreamingContext实例设置的批次时间时会触发业务逻辑算子处理这些数据
  • DStream上做的所有操作在底层都会转换成RDD的操作,每一个时间批次的数据都会被算子与算子之间的血缘关系链中的所有算子进行处理。
    在这里插入图片描述

4. 基本数据源

4.1 Spark Streaming支持两种数据源

  • 基本数据源:通过网络端口获取数据、监控文件系统
  • 扩展数据源:Kafka、Kinesis 等第三方数据存储系统
  • 注意:从扩展数据源中获取数据时,Spark Streaming 中没有直接支持的 API,需要额外引入依赖

4.2 Spark Streaming 资源分配注意事项

  • Spark Streaming 是长期运行的任务,需要占用一定的内存和 CPU 核。
  • 在分配资源时,必须确保 CPU 核数大于接收数据的核数。
  • 如果 CPU 核数不足,可能会导致所有 CPU 核都用于接收数据,而没有 CPU 核来处理数据。

4.3 网络端口使用单个接收器获取数据

  • 在正式编写程序之前,需要准备一个对外开放的端口来传输数据。可以使用 nc -lp port 命令开放一个端口用于传输数据。启动实时流程序后,从 port 获取数据形成 DStream。需要注意的是,在启动实时流程序前,必须提前开放一个传输数据的端口。

1. 获取程序入口

  • 导入相关类
    在这里插入图片描述
  • 设置日志打印级别
    在这里插入图片描述
  • 构建SparkConf实例对象
    在这里插入图片描述
  • 构建SparkContext实例对象
    在这里插入图片描述
  • 构建StreamingContext实例对象,每间隔5秒处理一批数据
    在这里插入图片描述

2. 业务代码编写

  • 测试目的:从网络端口获取数据
  • 业务代码功能:仅打印从网络端口获取的数据
    在这里插入图片描述

3. 启动实时流程序

在这里插入图片描述

4. 查看程序运行结果

  • 启动nc,准备输入数据
    在这里插入图片描述
  • 启动SparkStreamingNetcat对象,在bigdata1节点通过nc输入数据
    在这里插入图片描述
  • 查看控制台输出结果
    在这里插入图片描述

5. 基本DStream转换操作

6. DStream输出操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/8629.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript中的隐式类型转换

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

React第二十六章(createPortal)

createPortal 注意这是一个API&#xff0c;不是组件&#xff0c;他的作用是&#xff1a;将一个组件渲染到DOM的任意位置&#xff0c;跟Vue的Teleport组件类似。 用法 import { createPortal } from react-dom;const App () > {return createPortal(<div>小满zs<…

Linux学习笔记——磁盘管理命令

lsblk(list block devices):其功能是查看系统的磁盘使用情况 df(disk free):列出文件系统的整体磁盘使用量 du(disk used):检查磁盘空间使用量 fdisk:用于磁盘分区&#xff08;创建分区&#xff09; mkfs:创建并构建一个文件系统&#xff08;格式化&#xff09; mount:挂在…

QT使用eigen

QT使用eigen 1. 下载eigen https://eigen.tuxfamily.org/index.php?titleMain_Page#Download 下载后解压 2. QT引入eigen eigen源码好像只有头文件&#xff0c;因此只需要引入头文件就好了 qt新建项目后。修改pro文件. INCLUDEPATH E:\222078\qt\eigen-3.4.0\eigen-3.…

国内优秀的FPGA设计公司主要分布在哪些城市?

近年来&#xff0c;国内FPGA行业发展迅速&#xff0c;随着5G通信、人工智能、大数据等新兴技术的崛起&#xff0c;FPGA设计企业的需求也迎来了爆发式增长。很多技术人才在求职时都会考虑城市的行业分布和发展潜力。因此&#xff0c;国内优秀的FPGA设计公司主要分布在哪些城市&a…

Linux——rzsz工具

rzsz这个工具用于 windows 机器和远端的 Linux 机器通过 XShell 传输文件. 安装完毕之后可以通过拖拽的方式将文件上传过去. 安装rzsz工具 rz&#xff1a;从Windows机器上传到远程Linux机器&#xff08;或者直接把文件托进Xshell中&#xff09; sz&#xff1a;将文件从Linux远…

【linux网络(3)】应用层HTTP协议详解

目录 前言1. 序列化和反序列化2. 认识URL3. 对网络中资源的理解4. HTTP的报文格式5. HTTP方法详解6. HTTP的状态码和header7. HTTP会话管理8. 总结以及拓展 前言 在理解了网络套接字编程后, 后续的文章会从应用层到链路层, 详解的讲解每一层的协议都做了些什么工作, 并且会拆分…

利用JSON数据类型优化关系型数据库设计

利用JSON数据类型优化关系型数据库设计 前言 在关系型数据库中&#xff0c;传统的结构化存储方式要求预先定义好所有的列及其数据类型。 然而&#xff0c;随着业务的发展&#xff0c;这种设计可能会显得不够灵活&#xff0c;尤其是在需要扩展单个列的描述功能时。 JSON数据…

cursor ide配置远程ssh qt c++开发环境过程记录

cursor是啥就不介绍了&#xff0c;好像是目前最好用的ai ide&#xff0c;下面主要是配置远程ssh连接linux机器进行qt5 c程序运行的配置过程记录。 一、c_cpp_properties.json 在项目根目录的.vscode目录里面新建c_cpp_properties.json文件&#xff0c;根据你的实际情况配置该文…

npm:升级自身时报错:EBADENGINE

具体报错信息如下&#xff1a; 1.原因分析 npm和当前的node版本不兼容。 // 当前实际版本: Actual: {"npm":"10.2.4","node":"v20.11.0"}可以通过官网文档查看与自己 node 版本 兼容的是哪一版本的npm&#xff0c;相对应进行更新即可…

Excel中LOOKUP函数的使用

文章目录 VLOOKUP&#xff08;垂直查找&#xff09;&#xff1a;HLOOKUP&#xff08;水平查找&#xff09;&#xff1a;LOOKUP&#xff08;基础查找&#xff09;&#xff1a;XLOOKUP&#xff08;高级查找&#xff0c;较新版本Excel提供&#xff09;&#xff1a; 在Excel中&…

Verilog中if语句和case语句综合出的电路区别

区别是 if else 的逻辑判断有优先级&#xff0c;最内层的 if 的优先级最高&#xff0c;case 的逻辑判断是并列的。 每个 if else 综合出来的电路是一个 2 选 1 选通器。当信号有明显优先级时使用该语句&#xff0c;但是 if 嵌套太多的话会导致路径延时过大&#xff0c;降低运行…

【C语言常见概念详解】

目录 -----------------------------------------begin------------------------------------- 什么是C语言&#xff1a; 1. 基本数据类型 2. 变量与常量 3. 运算符与表达式 4. 控制结构 5. 函数 6. 指针 7. 数组与字符串 8. 结构体与联合体 9. 文件操作 结语 ----…

CE11.【C++ Cont】练习题组12(结构体专题)

目录 1.P5742【深基7.例11】评等级 题目 代码 提交结果 2.B2125 最高分数的学生姓名 题目 代码 方法1 提交结果 方法2:在方法1基础上改进 提交结果 ​编辑 方法3:先排序后选,较麻烦 提交结果 ​编辑 3.[NOIP2007 普及组] 奖学金 题目 错误代码 提交结果 调试…

开源项目Umami网站统计MySQL8.0版本Docker+Linux安装部署教程

Umami是什么&#xff1f; Umami是一个开源项目&#xff0c;简单、快速、专注用户隐私的网站统计项目。 下面来介绍如何本地安装部署Umami项目&#xff0c;进行你的网站统计接入。特别对于首次使用docker的萌新有非常好的指导、参考和帮助作用。 Umami的github和docker镜像地…

Nginx开发01:基础配置

一、下载和启动 1.下载、使用命令行启动&#xff1a;Web开发&#xff1a;web服务器-Nginx的基础介绍&#xff08;含AI文稿&#xff09;_nginx作为web服务器,可以承担哪些基本任务-CSDN博客 注意&#xff1a;我配置的端口是81 2.测试连接是否正常 访问Welcome to nginx! 如果…

20.Word:小谢-病毒知识的科普文章❗【38】

目录 题目​ NO1.2.3文档格式 NO4.5 NO6.7目录/图表目录/书目 NO8.9.10 NO11索引 NO12.13.14 每一步操作完&#xff0c;确定之后记得保存最后所有操作完记得再次删除空行 题目 NO1.2.3文档格式 样式的应用 选中应用段落段落→开始→选择→→检查→应用一个一个应用ctr…

【Python】第五弹---深入理解函数:从基础到进阶的全面解析

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】【Linux系统编程】【MySQL】【Python】 目录 1、函数 1.1、函数是什么 1.2、语法格式 1.3、函数参数 1.4、函数返回值 1.5、变量作用域 1.6、函数…

从AD的原理图自动提取引脚网络的小工具

这里跟大家分享一个我自己写的小软件&#xff0c;实现从AD的原理图里自动找出网络名称和引脚的对应。存成文本方便后续做表格或是使用简单行列编辑生成引脚约束文件&#xff08;如.XDC .UCF .TCL等&#xff09;。 我们在FPGA设计中需要引脚锁定文件&#xff0c;就是指示TOP层…

MySQL--》深度解析InnoDB引擎的存储与事务机制

目录 InnoDB架构 事务原理 MVCC InnoDB架构 从MySQL5.5版本开始默认使用InnoDB存储引擎&#xff0c;它擅长进行事务处理&#xff0c;具有崩溃恢复的特性&#xff0c;在日常开发中使用非常广泛&#xff0c;其逻辑存储结构图如下所示&#xff0c; 下面是InnoDB架构图&#xf…