大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式

喜大普奔!破百了!

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Spark Streaming 基础数据源
  • 文件流、Socket流、RDD队列流
  • 引入依赖、Java编写多种流进行测试

在这里插入图片描述

DStream 转换

DStream上的操作与RDD类似,分为Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的方法,如:

  • updateStateByKey
  • transform
  • window相关操作

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

map(func)

对 DStream 中的每个元素应用 func 函数,并返回一个新的 DStream。
例如,将每个记录转换为其长度。
示例:val lengths = lines.map(line => line.length)

flatMap(func)

对 DStream 中的每个元素应用 func 函数,并将结果展平(即将集合的集合展开)。
例如,将每一行文本拆分为单词。
示例:val words = lines.flatMap(line => line.split(" "))

filter(func)

对 DStream 中的每个元素应用 func 函数,并保留返回值为 true 的元素。
例如,过滤掉长度小于 5 的单词。
示例:val filteredWords = words.filter(word => word.length > 5)

reduceByKey(func)

对键值对 DStream 进行聚合操作,对具有相同键的元素应用 func 函数。
例如,计算每个单词的总数。
示例:val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

groupByKey()

对键值对 DStream 中的每个键进行分组,并将具有相同键的值聚合到一个列表中。
示例:val grouped = pairs.groupByKey()

count()

统计 DStream 中每个 RDD 的元素个数。
示例:val count = words.count()

countByValue()

统计 DStream 中每个 RDD 中每个值的出现次数。
示例:val valueCounts = words.countByValue()

union(otherDStream)

将两个 DStream 合并为一个新的 DStream,包含两个 DStream 中的所有元素。
示例:val mergedStream = stream1.union(stream2)

join(otherDStream)

对两个键值对 DStream 进行连接操作,类似 SQL 中的 JOIN 操作。
示例:val joinedStream = stream1.join(stream2)

备注:

  • 在DStream与RDD上的转换操作非常类似(无状态操作)
  • DStream有自己特殊的操作(窗口操作、追踪状态变化操作)
  • 在DStream上的转换操作比RDD上的转换操作少

DStream 的转换操作可以分为 无状态(stateless)和 有状态(stateful)两种:

  • 无状态转换操作,每个批次的处理不依赖与之前批次的数据,常见的RDD转化操作,例如:map、Filter、reduceByKey等
  • 有状态转换操作,需要使用之前批次的数据或者是中间结果来计算当前批次的数据,有状态转换操作包括:基于滑动窗口的转换操作或追踪状态变化的转化操作

无状态转换

无状态转换操作就是把简单的RDD转换操作应用到每个批次上,也就是转换DStream中的每一个RDD。
常见的无状态转换包括:

  • map
  • flatMap
  • repartition
  • reduceByKey
  • groupByKey

重要的转换操作:transform,通过对源DStream的每个RDD应用RDD-To-RDD函数,创建一个新的DStream,支持在新的DStream中任何RDD操作。
这是一个功能强大的函数,它可以允许开发者直接操作其内部的RDD,也就是说开发者,可以任意提供一个RDDToRDD的函数,这个函数在数据流每个批次中都被调用,生成一个新的流。

案例1 黑名单过滤

假设:arr1为黑名单数据(自定义),true表示数据生效,需要被过滤掉;false表示数据
未生效
val arr1 = Array(("spark", true), ("scala", false))
假设:流式数据格式为"time word",需要根据黑名单中的数据对流式数据执行过滤操
作。如"2 spark"要被过滤掉
1 hadoop
2 spark
3 scala
4 java
5 hive
结果:"2 spark" 被过滤

方案1 外连接实现

package icu.wzkimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}object BlackListFilter1 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("BlackListFilter1").setMaster("local[*]")val ssc = new StreamingContext(conf, Seconds(10))// 黑名单val blackList = Array(("spark", true), ("scala", true))val blackListRDD = ssc.sparkContext.makeRDD(blackList)// 测试数据val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper".split("\\s+").zipWithIndex.map {case (word, index) => s"$index $word"}val rdd = ssc.sparkContext.makeRDD(strArray)val clickStream = new ConstantInputDStream(ssc, rdd)// 流式数据的处理val clickStreamFormatted = clickStream.map(value => (value.split(" ")(1), value))clickStreamFormatted.transform(clickRDD => {val joinedBlockListRDD: RDD[(String, (String, Option[Boolean]))] = clickRDD.leftOuterJoin(blackListRDD)joinedBlockListRDD.filter {case (word, (streamingLine, flag)) => {if (flag.getOrElse(false)) {false} else {true}}}.map {case (word, (streamingLine, flag)) => streamingLine}}).print()// 启动ssc.start()ssc.awaitTermination()}
}

方案1 运行结果

-------------------------------------------
Time: 1721618670000 ms
-------------------------------------------
5 hive
6 hbase
1 java
7 zookeeper
3 hadoop
4 kafka... 下一批

对应的结果如下图所示:
在这里插入图片描述

方案2 SQL实现

package icu.wzkimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}object BlackListFilter2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("BlackListFilter2").setMaster("local[*]")val ssc = new StreamingContext(conf, Seconds(10))ssc.sparkContext.setLogLevel("WARN")// 黑名单val blackList = Array(("spark", true), ("scala", true))val blackListRDD = ssc.sparkContext.makeRDD(blackList)// 生成测试 DStreamval strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper".split("\\s+").zipWithIndex.map {case (word, index) => s"$index $word"}val rdd = ssc.sparkContext.makeRDD(strArray)val clickStream = new ConstantInputDStream(ssc, rdd)// 流式数据的处理val clickStreamFormatted = clickStream.map(value => (value.split(" ")(1), value))clickStreamFormatted.transform {clickRDD =>val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()import spark.implicits._val clickDF: DataFrame = clickRDD.toDF("word", "line")val blackDF: DataFrame = blackListRDD.toDF("word", "flag")clickDF.join(blackDF, Seq("word"), "left").filter("flag is null or flag == false").select("line").rdd}.print()ssc.start()ssc.awaitTermination()}
}

方案2 SQL运行结果

-------------------------------------------
Time: 1721619900000 ms
-------------------------------------------
[6 hbase]
[4 kafka]
[7 zookeeper]
[1 java]
[3 hadoop]
[5 hive]

运行结果截图如下图所示:
在这里插入图片描述

方案3 直接过滤

package icu.wzkimport org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}object BlackListFilter3 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("BlackListFilter3").setMaster("local[*]")val ssc = new StreamingContext(conf, Seconds(10))ssc.sparkContext.setLogLevel("WARN")// 黑名单val blackList = Array(("spark", true), ("scala", true))val blackListBC: Broadcast[Array[String]] = ssc.sparkContext.broadcast(blackList.filter(_._2).map(_._1))// 生成测试DStreamval strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper".split("\\s+").zipWithIndex.map {case (word, index) => s"$index $word"}val rdd = ssc.sparkContext.makeRDD(strArray)val clickStream = new ConstantInputDStream(ssc, rdd)// 流式数据的处理clickStream.map(value => (value.split(" ")(1), value)).filter {case (word, _) => !blackListBC.value.contains(word)}.map(_._2).print()// 启动ssc.start()ssc.awaitTermination()}
}

方案3 直接过滤运行结果

-------------------------------------------
Time: 1721627600000 ms
-------------------------------------------
1 java
3 hadoop
4 kafka
5 hive
6 hbase
7 zookeeper... 下一批

运行结果如下图所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/407809.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java框架Shiro、漏洞工具利用、复现以及流量特征分析

Shiro流量分析 前言 博客主页: 靶场:Vulfocus 漏洞威胁分析平台 Shiro(Apache Shiro)是一个强大且灵活的开源安全框架,专为Java应用程序提供安全性解决方案。它由Apache基金会开发和维护,广泛应用于企业级…

毛利率承压连亏三年后一季度业绩暴增,百利天恒谋求A+H双上市

《港湾商业观察》施子夫 7月10日,四川百利天恒药业股份有限公司(以下简称,百利天恒)递表港交所主板,联席保荐机构高盛、摩根大通和中信证券。 此次递表港交所系百利天恒第二次谋求上市,若上市成功&#x…

PyTorch升级之旅——安装与基本知识

目录 一、安装 二、张量 创建tensor 张量的操作 广播机制 三、自动求导 四、并行计算 (一)网络结构分布到不同的设备中(Network partitioning) (二)同一层的任务分布到不同数据中(Layer-wise partitioning) (…

GoModule

GOPATH 最早的就是GOPATH构建模式, go get下载的包都在path中的src目录下 src目录是源代码存放目录。 package mainimport ("net/http""github.com/gorilla/mux" )func main() {r : mux.NewRouter()r.HandleFunc("/hello", func(w h…

解决使用matplotlib不显示中文的问题

某季度某城市某天11点到12点气温变化图 import random x range(60) y_BeiJing [random.uniform(15,18) for i in x] plt.figure(figsize(20,8),dpi80) plt.plot(x,y_BeiJing) x_label ["11点{}分".format(i) for i in x] plt.xticks(x[::5],x_label[::5]) plt.yt…

【精选】基于微信小程序的地铁站点查询系统(全网独一无二,阿龙原创设计)

博主介绍: ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台…

C# x Unity面向对象补全计划 设计模式 之 实现一个简单的有限状态机

一个简单的有限状态机可以有如下内容 1.状态基类(定义基本状态的方法,如进入(Enter)、执行(Execute)和退出(Exit),同时可以在此声明需要被管理的对象) 2.具体…

【精选】基于python的影片数据爬取与数据分析

博主介绍: ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台…

软件设计师教程(第5版)第5章 软件工程基础知识(更新中)

5.1 软件工程概述 【软件工程】是指应用计算机科学、数学及管理科学等原理,以工程化的原则和方法来解决软件问题的工程,其目的是提高软件生产率、提高软件质量、降低软件成本。P239 5.1.1 计算机软件 计算机软件是指计算机系统中的【程序】及其【文档】。P240 【…

一文解决---IDEA汉化问题(含中英文切换)

一、英文->中文: ①.下载汉化包插件: 操作顺序:File->Settings->Plugins 在搜索框输入Chinese,然后找到 Chinese (Simplified) Language (汉化插件),等待下载完→Install (安装)&…

OpenCV几何图像变换(9)仿射变换函数warpAffine()的使用

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 函数是应用一个仿射变换到图像上。 warpAffine 函数使用指定的矩阵对源图像进行仿射变换: dst ( x , y ) src ( M 11 x M 12 y M…

《机器学习》 决策树剪枝、树模型参数及案例演示

目录 一、决策树剪枝 1、什么是决策树剪枝? 2、如何剪枝 3、剪枝剪哪个位置的叶子结点 二、树模型参数及用法 1、参数种类 2、参数解释 1)criterion:gini or entropy 2)splitter:best or random 3&#xff0…

从心理学的角度,探究一下人类为什么爱玩游戏。(缓解压力、社交需求、 获得成就感)

文章目录 引言I 游戏中的美学和文化元素,是影响玩家心理状态的关键因素。音乐美工文化背景II 成年人对游戏的心理需求获得成就感社交需求缓解压力III 心流理论(Flow Theory)解释玩家虽受虐,但也其乐无穷的现象知识扩展: 心流知识扩展: 心流活动知识扩展:得性乐观(Learne…

新版本 | GreatSQL 8.0.32-26全新发布 增强“四高”诸多新特性

近日,GreatSQL开源数据库社区正式发布 GreatSQL 8.0.32-26新版本,在高可用、高性能、高兼容、高安全等诸多方面进行了特性增强,修复多个缺陷,并详细说明了多个典型应用场景下,升级/降级到GreatSQL 8.0.32-26的操作策略…

Linux自旋锁和读写锁

在前面的文章中我们已经介绍了有关互斥锁的概念与使用,本篇将开始介绍在 Linux 中的自旋锁和读写锁。这三种锁分别用于在不同的应用场景之中,其中互斥锁最为常用,但是我们需要了解一下其他的锁。 对于自旋锁和读写锁都介绍了其原理以及接口使…

游戏如何对抗 IL2cppDumper逆向分析

众所周知,Unity引擎中有两种脚本编译器,分别是 Mono 和 IL2CPP 。相较于Mono,IL2CPP 具备执行效率高、跨平台支持等优势,已被大多数游戏采用。 IL2CPP 模式下,可以将游戏 C# 代码转换为 C 代码,然后编译为…

GPT-4o System Card is released

GPT-4o System Card is released, including red teaming, frontier risk evaluations, and other key practices for industrial-strength Large Language Models. https://openai.com/index/gpt-4o-system-card/ 报告链接 企业级生成式人工智能LLM大模型技术、算法及案例实战…

UE5用蓝图实现物体A始终朝向物体B |Find Look at Rotation|

非常常用的蓝图节点 |Find Look at Rotation|:获取 物体A 到 物体B 的Rotator。 Tick中将算出的Rotator设置给物体A,即可实现永远朝向物体B

C++STL之map的使用详解

简介&#xff1a;map底层实现为红黑树&#xff0c;增删查的时间复杂度&#xff1a;O(logn), key是有序的&#xff0c;默认升序 一、初始化 #include<iostream> #include<map> #include<string> using namespace std; int main() {std::map<int, std::st…

楼顶气膜羽毛球馆:城市健身新空间—轻空间

随着城市化进程的加快&#xff0c;城市土地资源愈发紧张&#xff0c;如何高效利用有限的空间成为一大挑战。楼顶气膜羽毛球馆作为一种创新的体育场馆建设方式&#xff0c;凭借其独特的优势&#xff0c;逐渐成为城市健身的新宠。它不仅有效利用了楼顶闲置空间&#xff0c;还为市…