SparkStreaming_window_sparksql_reids

1.5 window

滚动窗口+滑动窗口

window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

  1. 红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

  2. 这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

所以基于窗口的操作,需要指定2个参数:

window length - The duration of the window (3 in the figure)

slide interval - The interval at which the window-based operation is performed (2 in the figure).

  1. 窗口大小,个人感觉是一段时间内数据的容器。

  2. 滑动间隔,就是我们可以理解的cron表达式吧。

案例实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
/*** 统计,截止到目前为止出现的每一个key的次数* window窗口操作,每个多长M时间,通过过往N长时间内产生的数据* M就是滑动长度sliding interval* N就是窗口长度window length*/
object Demo05_WCWithWindow {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("WordCountUpdateStateByKey").setMaster("local[*]")val batchInterval = 2val duration = Seconds(batchInterval)val ssc = new StreamingContext(conf, duration)val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)val pairs:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1))
​val ret:DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_+_,windowDuration = Seconds(batchInterval * 3),slideDuration = Seconds(batchInterval * 2))
​ret.print()
​ssc.start()ssc.awaitTermination()}
}

1.6 SparkSQL和SparkStreaming的整合案例

Spark最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

案例:top3的商品排序: 最新的top3

这里就是基于updatestateByKey,统计截止到目前为止的不同品类下的商品销量top3

代码实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
/*** SparkStreaming整合SparkSQL的案例之,热门品类top3排行* 输入数据格式:* id brand category* 1 huwei watch* 2 huawei phone**/
object Demo06_SQLWithStreaming {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("StreamingIntegerationSQL").setMaster("local[*]")val batchInterval = 2val duration = Seconds(batchInterval)val spark = SparkSession.builder().config(conf).getOrCreate()val ssc = new StreamingContext(spark.sparkContext, duration)ssc.checkpoint("/Users/liyadong/data/sparkdata/streamingdata/chk-1")val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)//001 mi moblieval pairs:DStream[(String, Int)] = lines.map(line => {val fields = line.split("\\s+")if(fields == null || fields.length != 3) {("", -1)} else {val brand = fields(1)val category = fields(2)(s"${category}_${brand}", 1)}}).filter(t => t._2 != -1)
​val usb:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc)
​usb.foreachRDD((rdd, bTime) => {if(!rdd.isEmpty()) {//category_brand countimport spark.implicits._val df = rdd.map{case (cb, count) => {val category = cb.substring(0, cb.indexOf("_"))val brand = cb.substring(cb.indexOf("_") + 1)(category, brand, count)}}.toDF("category", "brand", "sales")
​df.createOrReplaceTempView("tmp_category_brand_sales")val sql ="""|select|  t.category,|  t.brand,|  t.sales,|  t.rank|from (|  select|    category,|    brand,|    sales,|    row_number() over(partition by category order by sales desc) rank|  from tmp_category_brand_sales|) t|where t.rank < 4|;""".stripMarginspark.sql(sql).show()}})
​ssc.start()ssc.awaitTermination()}
​def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = {Option(seq.sum + option.getOrElse(0))}
}

1.7 SparkStreaming整合Reids

//将实时结果写入Redis中
dStream.foreachRDD((w,c)=>{val jedis = new Jedis("192.168.10.101", 6379)   //抽到公共地方即可jedis.auth("root")jedis.set(w.toString(),c.toString())  //一个key对应多个值,可以考虑hset
})

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/228006.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式——行为型模式

模板方法模式 行为型模式用于描述程序在运行时复杂的流程控制&#xff0c;即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务&#xff0c;它涉及算法与对象间职责的分配。 行为型模式分为类行为模式和对象行为模式&#xff0c;前者采用继承机制来在类间…

PHP序列化总结3--反序列化的简单利用及案例分析

反序列化中生成对象里面的值&#xff0c;是由反序列化里面的值决定&#xff0c;与原类中预定义的值的值无关&#xff0c;穷反序列化的对象可以使用类中的变量和方法 案例分析 反序列化中的值可以覆盖原类中的值 我们创建一个对象&#xff0c;对象创建的时候触发了construct方…

基于QT开发的温室气体数据记录软件

1、概述 温室气体分析仪数据记录软件用于实现温室气体分析仪数据的获取与存储&#xff0c;阀箱数据的获取与存储、冷阱数据的获取与存储、采样单元数据的获取与存储、阀箱和采样单元的远程操作以及系统功能的管理。其主操作界面如下&#xff1a; 上述软件界面分为2各区域&…

【Linux】内核编译 镜像制作

文章目录 一、Ubuntu内核编译1.1 为什么自己编译内核1.2 Ubuntu 内核源码下载1.21 内核的作用1.22 Linux内核与ubuntu内核1.23 Ubuntu内核源码获取 1.3 在Windows系统下编译ubuntu内核1.4 在Linux系统下编译ubuntu内核 二、镜像制作 一、Ubuntu内核编译 1.1 为什么自己编译内核…

IIS服务器发布PHP网站

IIS服务器&#xff0c;相信开发者都不会陌生&#xff0c;它的英文全称是Internet Information Services&#xff0c;是由微软公司提供的基于运行Microsoft Windows的互联网基本服务&#xff0c;常用于Windows系统的Web项目部署&#xff0c;本篇以PHP项目为例&#xff0c;讲解如…

Qt 中使用 MySQL 数据库保姆级教程(上)

作者&#xff1a;billy 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 前言 在 Qt 中默认只搭载了 QSqlLite 数据库驱动&#xff0c;若要使用其他数据库需要自己下载数据库&#xff0c;并将数据库驱动加载到…

GitOps实践指南:GitOps能为我们带来什么?

Git&#xff0c;作为开发过程中的核心工具&#xff0c;提供了强大的版本控制功能。即便在写代码的时候稍微手抖一下&#xff0c;我们也能通过 Git 的差异对比&#xff08;diff&#xff09;轻松追踪到庞大工程中的问题&#xff0c;确保代码的准确与可靠。这种无与伦比的自省能力…

【小沐学Python】Python实现免费天气预报获取(OpenWeatherMap)

文章目录 1、简介1.1 工具简介1.2 费用1.3 注册1.4 申请key 2、接口说明2.1 One Call 3.02.2 Current Weather and Forecasts collection2.2.1 API 调用2.2.2 API 参数 2.3 Historical Weather collection2.4 Weather Maps collection2.5 Other weather APIs 3、接口测试3.1 例…

SpringBoot集成支付宝,看这一篇就够了。

前 言 在开始集成支付宝支付之前&#xff0c;我们需要准备一个支付宝商家账户&#xff0c;如果是个人开发者&#xff0c;可以通过注册公司或者让有公司资质的单位进行授权&#xff0c;后续在集成相关API的时候需要提供这些信息。 下面我以电脑网页端在线支付为例&#xff0c;介…

HTML教程(1)——概述和第一个网页

一、什么是HTML HTML 是用来描述网页的一种语言。 HTML 指的是超文本标记语言 (Hyper Text Markup Language)HTML 不是一种编程语言&#xff0c;而是一种标记语言 (markup language)标记语言是一套标记标签 (markup tag)HTML 使用标记标签来描述网页 二、什么是HTML 标签 H…

Java项目:102SSM汽车租赁系统

博主主页&#xff1a;Java旅途 简介&#xff1a;分享计算机知识、学习路线、系统源码及教程 文末获取源码 一、项目介绍 汽车租赁系统基于SpringSpringMVCMybatis开发&#xff0c;系统使用shiro框架做权限安全控制&#xff0c;超级管理员登录系统后可根据自己的实际需求配角色…

Kasada p.js (x-kpsdk-cd、x-kpsdk-cd、integrity)

提供x-kpsdk-cd的API服务 详细请私信~ 可试用~ 一、简述 integrity是通过身份验证Kasada检测机器人流量后获得的一个检测结果&#xff08;数据完整性&#xff09; x-kpsdk-cd 是经过编码计算等等获得。当你得到正确的解决验证码值之后&#xff0c;解码会看到如下图 二、cook…

【Pytorch】学习记录分享8——PyTorch自然语言处理基础-词向量模型Word2Vec

【Pytorch】学习记录分享7——PyTorch自然语言处理基础-词向量模型Word2Vec 1. 词向量模型Word2Vec)1. 如何度量这个单词的&#xff1f;2.词向量是什么样子&#xff1f;3.词向量对应的热力图&#xff1a;4.词向量模型的输入与输出![在这里插入图片描述](https://img-blog.csdni…

Volume Control 2

为游戏添加音乐和音效总是需要一些编码来设置一个系统来控制、显示和保存应用程序的音量设置。 音量控制的设计是为了立即为您设置这些内容,让您有更多时间专注于最重要的事情——制作出色的游戏! 在版本2中,我们对系统进行了重新设计,使其更加模块化、灵活,甚至更易于使用…

RHCE9学习指南 第13章 硬盘管理

新的硬盘首先需要对硬盘进行分区和格式化&#xff0c;首先了解一下硬盘的结构&#xff0c;如图13-1所示。 图13-1 磁盘上的磁道和扇区 硬盘的磁盘上有一个个的圈&#xff0c;每两个圈组成一个磁道。从中间往外发射线&#xff0c;把每个磁道分成一个个的扇区&#xff0c;每个扇…

Qt基础之四十五:Qt国际化(I18N)

国际化的英文表述为Internationalization,通常简写为I18N(首尾字母加中间的字符数),这种奇葩的缩写方式,让我想起了NBA球星“字母哥”。 下面看下Qt实现的动态语言切换效果。 一.效果 二.源码 QHSettingDialog.h #ifndef QHSETTINGDIALOG_H #define QHSETTINGDIALOG_H#…

Grafana监控数据可视化

Grafana 是一个可视化面板&#xff0c;有着非常漂亮的图表和布局展示&#xff0c;功能齐全的度量仪表盘和图形编辑器&#xff0c;支持 Graphite、zabbix、InfluxDB、Prometheus、OpenTSDB、Elasticsearch 等作为数据源&#xff0c;比 Prometheus 自带的图表展示功能强大太多&am…

【MySQL】主从异步复制配置

您好&#xff0c;我是码农飞哥&#xff08;wei158556&#xff09;&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f4aa;&#x1f3fb; 1. Python基础专栏&#xff0c;基础知识一网打尽&#xff0c;9.9元买不了吃亏&#xff0c;买不了上当。 Python从入门到精…

删除数据后, redis 内存占用还是很高怎么办?

现象&#xff1a; reids 做了数据删除&#xff0c;数据量不大&#xff0c;使用 top 命令看&#xff0c;发现还是占用大量内存 原因&#xff1a; 1.redis 底层内存根据内存分配器分配&#xff0c;不会立刻释放 2.redis 释放的内存空间不是连续的&#xff0c;存在碎片 内存碎…

软件开发新手用哪个IDE比较好?软件开发最好的IDE都在这!

目录 IDES 的优点 最佳编程 IDE 列表 Java 开发的流行集成开发环境 JetBrains 的 IntelliJ IDEA NetBeans 适用于 C/ C、C# 编程语言的最佳 IDE Visual Studio 和 Visual Studio 代码 Eclipse PHP 开发的最佳 IDE PHPStorm Sublime Text Atom JavaScript 的顶级 I…