StructuredStreaming (一)

一、sparkStreaming的不足

1.基于微批,延迟高不能做到真正的实时

2.DStream基于RDD,不直接支持SQL

3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件,一个是事件处理的时间)

5.数据的Exactly-Once(恰好一次语义)需要手动实现

二、StructuredStreaming 的介绍 

1、2016年Spark2.0版本中发布

2、基于SparkSQL引擎的可扩展、容错的全新的流处理引擎。

3、并不是对Spark Streaming的简单改进,而是重新开发的全新流式引擎

准实时技术:来一批处理一批 实时:来一条处理一条 离线:一般都是处理一些静止的数据

三、socket+console

1、在虚拟机中下载nc
yum install -y nc2、启动 nc -lk 9999

案例:wordcount

import osfrom pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()socketDf = spark.readStream.format("socket") \.option("host", "bigdata01") \.option("port", 9999) \.load()# 处理# 方式一:使用dsl语法splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))resultDf1 = splitDf.groupBy("word").count()# 方式二:使用sqlsocketDf.createOrReplaceTempView("wordcount")resultDf2 = spark.sql("""with t1 as( select num from wordcount lateral view explode(split(value," ")) c as num)select num,count(*) counts from t1 group by num;""")# 下面的就是sink的写法 后续会写query1 = resultDf1.writeStream \.outputMode("complete") \.format("console") \.start()query2 = resultDf2.writeStream \.outputMode("complete") \.format("console") \.start() \.awaitTermination()spark.stop()

四、file+console

文件中的数据:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import osfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructField, StringType, DoubleType, LongType, IntegerType, StructTypeif __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()# score_schema = StructType([#     StructField(name="stu_id", dataType=IntegerType(), nullable=False),#     StructField(name="subject_name", dataType=StringType(), nullable=True),#     StructField(name="score", dataType=DoubleType(), nullable=True)# ])score_schema = StructType().add("stu_id", IntegerType()).add("subject_name", StringType()).add("score",DoubleType())socketDf = spark.readStream.format("csv") \.option("sep", ";") \.schema(score_schema) \.load("../../resources/input1")socketDf.writeStream \.outputMode("append") \.format("console") \.option("truncate", False) \.start() \.awaitTermination()spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/471194.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

信号-3-信号处理

main 信号捕捉的操作 sigaction struct sigaction OS不允许信号处理方法进行嵌套:某一个信号正在被处理时,OS会自动block改信号,之后会自动恢复 同理,sigaction.sa_mask 为捕捉指定信号后临时屏蔽的表 pending什么时候清零&…

软件工程师简历(精选篇)

【#软件工程师简历#】 一份专业而精准的软件工程师简历,不仅能够全面展示技术实力和项目经验,更是赢得理想工作机会的重要敲门砖。那么,如何撰写一份令人印象深刻的软件工程师简历呢?以下是幻主简历整理的软件工程师简历&#xf…

基于springboot的汽车租赁管理系统的设计与实现

项目描述 临近学期结束,还是毕业设计,你还在做java程序网络编程,期末作业,老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。这里根据疫情当下,你想解决的问…

vscode远程连接服务器并启用tmux挂载进程

使用vscode连接远程服务器,有时候由于主机问题,比如中断,断网,超时,重启,关机等等情况,导致进程中断,如果是一个长时间的进程,会很麻烦,毕竟不能长时间一直盯…

设计模式之装饰器模式(SSO单点登录功能扩展,增加拦截用户访问方法范围场景)

前言: 两个本想描述一样的意思的词,只因一字只差就让人觉得一个是好牛,一个好搞笑。往往我们去开发编程写代码时也经常将一些不恰当的用法用于业务需求实现中,但却不能意识到。一方面是由于编码不多缺少较大型项目的实践&#xff…

鸿蒙HarmonyOS 地图不显示解决方案

基于地图的开发准备已完成的情况下,地图还不显式的问题 首先要获取设备uuid 获取设备uuid 安装DevEco Studio的路径下 有集成好的hdc工具 E:\install_tools\DevEco Studio\sdk\default\openharmony\toolchains 这个路径下打开cmd运行 进入“设置 > 关于手机…

【C语言】值传递和地址传递

值传递 引用传递(传地址,传引用)的区别 传值,是把实参的值赋值给行参 ,那么对行参的修改,不会影响实参的值。 传地址,是传值的一种特殊方式,只是他传递的是地址,不是普通…

C语言入门到精通(第六版)——第十六章

16、网络套接字编程 16.1、计算机网络基础 计算机网络技术是计算机技术和通信技术相结合的产物,代表计算机的一个重要发展方向。了解计算机的网络结构,有助于用户开发网络应用程序。 16.1.1、IP地址 为了使网络上的计算机能够彼此识别对方,…

Cyberchef配合Wireshark提取并解析HTTP/TLS流量数据包中的文件

本文将介绍一种手动的轻量级的方式,还原HTTP/TLS协议中传输的文件,为流量数据包中的文件分析提供帮助。 如果捕获的数据包中存在非文本类文件,例如png,jpg等图片文件,或者word,Excel等office文件异或是其他类型的二进…

记录使用documents4j来将word文件转化为pdf文件

本文记录使用documents4j来将word文件转化为pdf文件 文章目录 程序实例maven导入代码实现程序结果 本文小结 程序实例 maven导入 <!--word转pdf--><dependency><groupId>com.documents4j</groupId><artifactId>documents4j-local</artifactI…

SQL面试题——奔驰SQL面试题 车辆在不同驾驶模式下的时间

SQL面试题——奔驰SQL面试题 我们的表大致如下 CREATE TABLE signal_log( vin STRING COMMENTvehicle frame id, signal_name STRING COMMENTfunction name, signal_value STRING COMMENT signal value , ts BIGINT COMMENTevent timestamp, dt STRING COMMENTformat yyyy-mm…

使用 unicorn 和 capstone 库来模拟 ARM Thumb 指令的执行(一)

import binascii import unicorn import capstonedef printArm32Regs(mu):for i in range(66,78):print("R%d,value:%x"%(i-66,mu.reg_read(i)))def testhumb():CODE b\x1C\x00\x0A\x46\x1E\x00"""MOV R3, R0 的机器码&#xff1a;0x1C 0x00&#xf…

WordPress 6.7 “Rollins”发布

每个 WordPress 版本都会向一位在音乐界留下不可磨灭印记的艺术家致敬。WordPress 6.7 的代号为“Rollins”&#xff0c;旨在向传奇爵士萨克斯演奏家桑尼罗林斯致敬。罗林斯是爵士乐界最伟大的即兴演奏家和先驱之一&#xff0c;他以精湛的技术、创新精神和无畏的音乐表达方式影…

844.比较含退格的字符串

java用 O&#xff08;1&#xff09;空间这个方法&#xff0c;容易挺多bug的… O&#xff08;1&#xff09;空间 #&#xff1a;删除前一个字符 》 从后面开始判断&#xff08;这样可以用跳过的思想&#xff09;不能使用两次 i- - 来处理 # 的操作&#xff0c;会造成误删了前面…

WLAN消失或者已连接但是访问不了互联网

目录 1、WLAN已连接但是访问不了互联网 2、WLAN图标消失 今晚电脑突然连不上网了&#xff0c;重启试了好多种办法都没有用。 1、WLAN已连接但是访问不了互联网 这个的问题很多&#xff0c;建议直接网络重置&#xff0c;即将网络驱动全部删除&#xff0c;然后重新安装。 首先…

Linux源码阅读笔记-V4L2框架基础介绍

V4L2视频设备驱动基础 V4L2 是专门为 Linux 设备设计的整套视频框架&#xff08;其主要核心在 Linux 内核&#xff0c;相当于 Linux 操作系统上层的视频源捕获驱动框架&#xff09;。为上层访问系统底层的视频设备提供一个统一的标准接口。V4L2 驱动框架能够支持多种类型设备&…

C 语言 【模拟实现内存库函数】

1、memcpy memcpy函数是C/C语言中的一个用于内存复制的函数&#xff0c;声明在 string.h 中&#xff08;C是 cstring&#xff09;。其原型是&#xff1a; void * memcpy ( void * destination, const void * source, size_t num ); 其中&#xff0c;destination表示的是要拷贝…

【大数据学习 | flume】flume的概述与组件的介绍

1. flume概述 Flume是cloudera(CDH版本的hadoop) 开发的一个分布式、可靠、高可用的海量日志收集系统。它将各个服务器中的数据收集起来并送到指定的地方去&#xff0c;比如说送到HDFS、Hbase&#xff0c;简单来说flume就是收集日志的。 Flume两个版本区别&#xff1a; ​ 1&…

01:(手撸HAL+CubeMX)时钟篇

&#xff08;手撸HALCubeMX&#xff09;时钟篇 1、对SystemInit函数的分析2、使用HSI将总线时钟配置为最高频率3、使用HSE将总线时钟配置为最高频率4、使用Cube配置时钟树的参数5、对HAL_Init函数分析6、对系统定时器中断服务函数分析 有关时钟树和上电/复位的基础知识请参考“…

大数据新视界 -- 大数据大厂之 Impala 存储格式转换:从原理到实践,开启大数据性能优化星际之旅(下)(20/30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…