Spark Streaming DStream

Spark Streaming DStream

DStream

Discretized Stream,中文叫做离散流,Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。

DStream可以通过输入数据源来创建,比如Kafka、Flume,也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window。

DStream的内部,其实是一系列持续不断产生的RDD,RDD是Spark Core的核心抽象,即不可变的,分布式的数据集。

DStream中的每个RDD都包含了一个时间段内的数据。
spark streaming 离散数据流

对DStream应用的算子,其实在底层会被翻译为对DStream中每个RDD的操作,比如对一个DStream执行一个map操作,会产生一个新的DStream,其底层原理为,对输入DStream中的每个时间段的RDD,都应用一遍map操作,然后生成的RDD,即作为新的DStream中的那个时间段的一个RDD。

底层的RDD的transformation操作,还是由Spark Core的计算引擎来实现的,Spark Streaming对Spark core进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次API。
spark streaming DStream

操作DStream

对于从数据源得到的DStream,用户可以在其基础上进行各种操作。

与RDD类似,DStream也提供了自己的一系列操作方法,这些操作可以分成三类:普通的转换操作、窗口转换操作和输出操作。

普通的转换操作

转换描述
map(func)源 DStream的每个元素通过函数func返回一个新的DStream。
flatMap(func)类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。
filter(func)在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM 。
repartition(numPartitions)通过输入的参数numPartitions的值来改变DStream的分区大小。
union(otherStream)返回一个包含源DStream与其他 DStream的元素合并后的新DSTREAM。
count()对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。
reduce(func)使用函数func(有两个参数并返回一个结果)将源DStream 中每个RDD的元素进行聚 合操作,返回一个内部所包含的RDD只有一个元素的新DStream。
countByValue()计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。
reduceByKey(func, [numTasks])当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新 DStream,其中每个键的值V都是使用聚合函数func汇总。注意:默认情况下,使用 Spark的默认并行度提交任务(本地模式下并行度为2,集群模式下位8),可以通过配置numTasks设置不同的并行任务数。
join(otherStream, [numTasks])当被调用类型分别为(K,V)和(K,W)键值对的2个DStream 时,返回类型为(K,(V,W))键值对的一个新 DSTREAM。
cogroup(otherStream, [numTasks])当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。
transform(func)通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。
updateStateByKey(func)返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。
transform(func)

该transform操作(转换操作)及其类似的transformWith操作,允许在DStream上应用任意的RDD-to-RDD函数。它可以实现DStream API中未提供的操作,比如两个数据流的连接操作。

示例代码如下:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning...
}
updateStateByKey操作

我们使用的一般操作都是不记录历史数据的,也就说只记录当前定义时间段内的数据,跟前后时间段无关。如果要统计历史时间内的总共数据并且实时更新,如何解决呢?该updateStateByKey操作可以让你保持任意状态,同时不断有新的信息进行更新。要使用updateStateByKey操作,必须进行下面两个步骤 :

  • 定义状态: 状态可以是任意的数据类型。
  • 定义状态更新函数:用一个函数指定如何使用先前的状态和从输入流中获取的新值更新状态。

对DStream通过updateStateByKey(updateFunction)来实现实时更新。

更新函数有两个参数 :

  • newValues是当前新进入的数据。
  • runningCount 是历史数据,被封装到了Option中。

为什么历史数据要封装到Option中呢?有可能我们没有历史数据,这个时候就可以用None,有数据可以用Some(x)。当然我们的当前结果也要封装到Some()中,以便作为之后的历史数据。

我们并不用关心新进入的数据和历史数据,系统会自动帮我们产生和维护,我们只需要专心写处理方法就行。

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {//定义的更新函数val newCount = ...  // add the new values with the previous running count to get the new countSome(newCount)
}
val runningCounts = pairs.updateStateByKey[Int](updateFunction)//应用

示例:

  1. 首先我们需要了解数据的类型

  2. 编写处理方法

  3. 封装结果

    //定义更新函数
    //我们这里使用的Int类型的数据,因为要做统计个数
    def updateFunc(newValues : Seq[Int],state :Option[Int]) :Some[Int] = {//传入的newVaules将当前的时间段的数据全部保存到Seq中//调用foldLeft(0)(_+_) 从0位置开始累加到结束   val currentCount = newValues.foldLeft(0)(_+_) //获取历史值,没有历史数据时为None,有数据的时候为Some//getOrElse(x)方法,如果获取值为None则用x代替val  previousCount = state.getOrElse(0)//计算结果,封装成Some返回Some(currentCount+previousCount) 
    }
    //使用
    val stateDStream = DStream.updateStateByKey[Int](updateFunc)
    

窗口转换函数

Spark Streaming 还提供了窗口的计算,它允许你通过滑动窗口对数据进行转换,窗口转换操作如下:

转换描述
window(windowLength, slideInterval)返回一个基于源DStream的窗口批次计算后得到新的DStream。
countByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素的数量。
reduceByWindow(func, windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率。
countByValueAndWindow(windowLength,slideInterval, [numTasks])基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置。

在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的。因此在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。

对于窗口操作而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔(window duration)决定,而窗口间隔指的就是窗口的持续时间,在窗口操作中,只有窗口的长度满足了才会触发批数据的处理。除了窗口的长度,窗口操作还有另一个重要的参数就是滑动间隔(slide duration),它指的是经过多长时间窗口滑动一次形成新的窗口,滑动窗口默认情况下和批次间隔的相同,而窗口间隔一般设置的要比它们两个大。

在这里必须注意的一点是滑动间隔和窗口间隔的大小一定得设置为批处理间隔的整数倍。
spark streaming 窗口转换函数

如图所示,批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。对于初始的窗口time 1-time 3,只有窗口间隔满足了定义的长度也就是3才触发数据的处理,不够3继续等待。当间隔满足3之后进行计算后然后进行窗口滑动,滑动2个单位,会有新的数据流入窗口。然后重复等待满足窗口间隔执行计算。

输出操作

Spark Streaming允许DStream的数据被输出到外部系统,如数据库或文件系统。由于输出操作实际上使transformation操作后的数据可以通过外部系统被使用,同时输出操作触发所有DStream的transformation操作的实际执行(类似于RDD操作)。

以下表列出了目前主要的输出操作:

转换描述
print()在Driver中打印出DStream中数据的前10个元素。
saveAsTextFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsObjectFiles(prefix, [suffix])将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsHadoopFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
foreachRDD(func)最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming应用的Driver进程里执行的。

DStream持久化

与RDD一样,DStream同样也能通过persist()方法将数据流存放在内存中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/285182.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JAVA】通过JAVA实现用户界面的登录

🌈个人主页: Aileen_0v0 🔥热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法|MySQL| ​💫个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-wyCvaz0EBNwHcwsi {font-family:"trebuchet ms",verdana,arial,sans-serif;f…

2025汤家凤考研数学视频,基础网课百度网盘课程+PDF讲义资料

2025汤家凤大神及数学全程 docs.qq.com/doc/DTmtOa0Fzc0V3WElI 复制粘贴到浏览器,可以见所有的Ke 第一轮 夯实基础 1.阅读大纲考查要求,明确每章的学习目标; 2.按节学习数学理论基础知识,吃透书中例题; 3.学习每章…

数学建模综合评价模型与决策方法

评价方法主要分为两类,其主要区别在确定权重的方法上 一类是主观赋权法,多次采取综合资讯评分确定权重,如综合指数法,模糊综合评判法,层次评判法,功效系数法等 另一类是客观赋权法,根据各指标…

ubuntu上一款好用的串口工具screen

看名字,你猜他是什么? 安装 sudo apt install screen 然后将USB串口接到虚拟机,执行dmesg命令查看串口设备名: 测试: sudo screen /dev/ttyUSB0 115200确实很简单。

机器视觉学习(六)—— 图像的颜色识别

目录 一、色彩空间 1.1 RGB色彩空间 1.2 HSV色彩空间 1.3 灰度 1.4 CMYK色彩空间 1.5 Lab色彩空间 二、色彩空间转换 三、识别颜色 3.1 识别一种特定的颜色 3.2 识别多种颜色 一、色彩空间 计算机视觉中常用的色彩空间有RGB色彩空间、HSV色彩空间、CMYK色彩空间、La…

如何设计循环队列(两种方法)

文章目录 前言一、方法一:数组法二、方法二.链表法总结 前言 前面有提到过队列的知识,这次来说一下怎么设计一个循环队列 一.循环队列(力扣) . - 力扣(LeetCode). - 备战技术面试?力扣提供海量技术面试资…

Java Synchronized

Synchronized Synchronized 原理偏向锁轻量级锁重量级锁 Synchronized特征jdk1.8Synchronized优化了什么?Synchronized修饰范围Synchronized lock 区别 Synchronized 原理 在Java对象内存布局中,每个对象都有一个对象头,其中包含锁状态信息。…

【stable diffusion扩散模型】一篇文章讲透

目录 一、引言 二、Stable Diffusion的基本原理 1 扩散模型 2 Stable Diffusion模型架构 3 训练过程与算法细节 三、Stable Diffusion的应用领域 1 图像生成与艺术创作 2 图像补全与修复 3 其他领域 四、Stable Diffusion的优势与挑战 👉优势 &#x1f…

SpringBoot3集成PostgreSQL

标签:PostgreSQL.Druid.Mybatis.Plus; 一、简介 PostgreSQL是一个功能强大的开源数据库系统,具有可靠性、稳定性、数据一致性等特点,且可以运行在所有主流操作系统上,包括Linux、Unix、Windows等。 通过官方文档可以…

抠门精出游记之吉隆坡篇

我在新加坡一直是个街溜子,每天就是到处溜达,当然,时髦的词叫做citywalk。anyway,叫啥不重要,新加坡走腻了,跟老婆申请,去吉隆坡溜达一下,为啥要来吉隆坡呢,说起来还是因…

day3-QT

1>使用手动连接,将登录框中的取消按钮使用qt4版本的连接到自定义的槽函数中,在自定义的槽函数中调用关闭函。将登录按钮使用qt5版本的连接到自定义的槽函数中,在槽函数中判断ui界面上输入的账号是否为"admin",密码是…

【循环神经网络rnn】一篇文章讲透

目录 引言 二、RNN的基本原理 代码事例 三、RNN的优化方法 1 长短期记忆网络(LSTM) 2 门控循环单元(GRU) 四、更多优化方法 1 选择合适的RNN结构 2 使用并行化技术 3 优化超参数 4 使用梯度裁剪 5 使用混合精度训练 …

科技云报道:造完“大模型”,“具身智能”将引领AI下一个浪潮?

科技云报道原创。 资深机器人专家Eric Jang不久前曾预言:“ChatGPT 曾在一夜之间出现。我认为,有智慧的机器人技术也将如此。” 3月13日深夜,一段人形机器人的视频开始热传。 在视频中,Figure的人形机器人,可以完全…

研华工控机610L学习笔记2:visualstudio与第一个C#程序

今日继续学习工控机 C# 编程相关知识: 这篇结束后我将先进行一段时间的C#的学习研究,并写一些C#的笔记 后续再更新工控机编程设计相关 目录 1、安装visualstudio: 2、创建第一个C#程序: 3、寻找C#解决方案源文件: …

【Godot4.2】基础知识 - Godot中的2D向量

概述 在Godot中,乃至一切游戏编程中,你应该都躲不开向量。这是每一个初学者都应该知道和掌握的内容,否则你将很难理解和实现某些其实原理非常简单的东西。 估计很多刚入坑Godot的小伙伴和我一样,不一定是计算机专业或编程相关专…

pytorch 实现多层神经网络MLP(Pytorch 05)

一 多层感知机 最简单的深度网络称为多层感知机。多层感知机由 多层神经元 组成,每一层与它的上一层相连,从中接收输入;同时每一层也与它的下一层相连,影响当前层的神经元。 softmax 实现了 如何处理数据,如何将 输出…

SpringAOP+自定义注解实现限制接口访问频率,利用滑动窗口思想Redis的ZSet(附带整个Demo)

目录 1.创建切面 2.创建自定义注解 3.自定义异常类 4.全局异常捕获 5.Controller层 demo的地址,自行获取《《—————————————————————————— Spring Boot整合Aop面向切面编程实现权限校验,SpringAop自定义注解自定义异常全局…

【微服务】Gateway服务网关

📝个人主页:五敷有你 🔥系列专栏:微服务 ⛺️稳中求进,晒太阳 Spring Cloud Gateway 是 Spring Cloud 的一个全新项目,该项目是基于 Spring 5.0,Spring Boot 2.0 和 Project Reactor 等响…

Windows 设置多显示器显示

Windows 设置多显示器显示 1. Windows 7 设置 HDMI 输出2. Windows 11 设置多显示器显示References 1. Windows 7 设置 HDMI 输出 2. Windows 11 设置多显示器显示 ​​​ References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

Ubuntu Desktop 安装谷歌拼音输入法

Ubuntu Desktop 安装谷歌拼音输入法 1. Installation1.1. 汉语语言包​1.2. 谷歌拼音输入法1.3. 安装语言包1.4. 键盘输入方式系统1.5. 重启电脑1.6. 输入法配置 2. configuration2.1. Text Entry Settings… 3. ExecutionReferences 1. Installation 1.1. 汉语语言包 strong…