Spark(39):Streaming DataFrame 和 Streaming DataSet 输出

目录

0. 相关文章链接

1. 输出的选项

2. 输出模式(output mode)

2.1. Append 模式(默认)

2.2. Complete 模式

2.3. Update 模式

2.4. 输出模式总结

3. 输出接收器(output sink)

3.1. file sink

3.2. kafka sink

3.2.1. 以 Streaming 方式输出数据

3.2.2. 以 batch 方式输出数据

3.3. console sink

3.4. memory sink

3.5. foreach sink

3.6. ForeachBatch Sink


0. 相关文章链接

 Spark文章汇总 

1. 输出的选项

一旦定义了最终结果DataFrame / Dataset,剩下的就是开始流式计算。为此,必须使用返回的 DataStreamWriter Dataset.writeStream()。

需要指定一下选项:

  • 输出接收器的详细信息:数据格式,位置等。
  • 输出模式:指定写入输出接收器的内容。
  • 查询名称:可选,指定查询的唯一名称以进行标识。
  • 触发间隔:可选择指定触发间隔。如果未指定,则系统将在前一处理完成后立即检查新数据的可用性。如果由于先前的处理尚未完成而错过了触发时间,则系统将立即触发处理。
  • 检查点位置:对于可以保证端到端容错的某些输出接收器,请指定系统写入所有检查点信息的位置。这应该是与HDFS兼容的容错文件系统中的目录。

2. 输出模式(output mode)

2.1. Append 模式(默认)

        默认输出模式, 仅仅添加到结果表的新行才会输出。采用这种输出模式, 可以保证每行数据仅输出一次。在查询过程中, 如果没有使用 watermask 机制, 则不能使用聚合操作。 如果使用了 watermask 机制, 则只能使用基于 event-time 的聚合操作。watermask 用于高速 append 模式如何输出不会再发生变动的数据。 即只有过期的聚合结果才会在 Append 模式中被“有且仅有一次”的输出。

2.2. Complete 模式

每次触发, 整个结果表的数据都会被输出。 仅仅聚合操作才支持。同时该模式使用 watermask 无效。

2.3. Update 模式

        该模式在 从 spark 2.1.1 可用. 在处理完数据之后, 该模式只输出相比上个批次变动的内容(新增或修改)。如果没有聚合操作, 则该模式与 append 模式一样。如果有聚合操作, 则可以基于 watermast 清理过期的状态。

2.4. 输出模式总结

不同的查询支持不同的输出模式

3. 输出接收器(output sink)

spark 提供了几个内置的 output-sink,不同 output sink 所适用的 output mode 不尽相同:

SinkSupported Output ModesOptionsFault-tolerantNotes
File SinkAppendpath: path to the output directory, must be specified. For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for “parquet” format options see DataFrameWriter.parquet()Yes (exactly-once)Supports writes to partitioned tables. Partitioning by time may be useful.
Kafka SinkAppend, Update, CompleteSee the Kafka Integration GuideYes (at-least-once)More details in the Kafka Integration Guide
Foreach SinkAppend, Update, CompleteNoneDepends on ForeachWriter implementationMore details in the next section
ForeachBatch SinkAppend, Update, CompleteNoneDepends on the implementationMore details in the next section
Console SinkAppend, Update, CompletenumRows: Number of rows to print every trigger (default: 20) truncate: Whether to truncate the output if too long (default: true)No
Memory SinkAppend, CompleteNoneNo. But in Complete Mode, restarted query will recreate the full table.Table name is the query name.

3.1. file sink

存储输出到目录中 仅仅支持 append 模式

需求: 把单词和单词的反转组成 json 格式写入到目录中。

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 设置数据源,并接收数据val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 数据计算val words: DataFrame = lines.as[String].flatMap((line: String) => {line.split("\\W+").map((word: String) => {(word, word.reverse)})}).toDF("原单词", "反转单词")// 结果输出words.writeStream.outputMode("append").format("json") // 支持 "orc", "json", "csv".option("path", "./filesink") // 输出目录.option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录.start.awaitTermination()// 关闭执行环境spark.stop()}
}

输出的数据:

{"原单词":"abc","反转单词":"cba"}

3.2. kafka sink

将 wordcount 结果写入到 kafka

写入到 kafka 的时候应该包含如下列:

ColumnType
key (optional)string or binary
value (required)string or binary
topic (optional)string

注意:

  • 如果没有添加 topic option 则 topic 列必须有.
  • kafka sink 三种输出模式都支持

3.2.1. 以 Streaming 方式输出数据

这种方式使用流的方式源源不断的向 kafka 写入数据:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 设置数据源,并接收数据val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 数据计算val words = lines.as[String].flatMap((_: String).split("\\W+")).groupBy("value").count().map((row: Row) => row.getString(0) + "," + row.getLong(1)).toDF("value") // 写入数据时候, 必须有一列 "value"words.writeStream.outputMode("update").format("kafka").trigger(Trigger.ProcessingTime(0)).option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092") // kafka 配置.option("topic", "update") // kafka 主题.option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录.start.awaitTermination()// 关闭执行环境spark.stop()}
}

3.2.2. 以 batch 方式输出数据

这种方式输出离线处理的结果, 将已存在的数据分为若干批次进行处理. 处理完毕后程序退出:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._val wordCount: DataFrame = spark.sparkContext.parallelize(Array("hello hello abc", "abc, hello")).toDF("word").groupBy("word").count().map(row => row.getString(0) + "," + row.getLong(1)).toDF("value") // 写入数据时候, 必须有一列 "value"wordCount.write // batch 方式.format("kafka").option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092") // kafka 配置.option("topic", "update") // kafka 主题.save()// 关闭执行环境spark.stop()}
}

3.3. console sink

主要用于测试数据输出

3.4. memory sink

该 sink 也是用于测试, 将其统计结果全部输入内中指定的表中, 然后可以通过 sql 与从表中查询数据。

如果数据量非常大, 可能会发送内存溢出:

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestamp
import java.util.{Timer, TimerTask}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 设置数据源,并接收数据val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).loadval words: DataFrame = lines.as[String].flatMap((_: String).split("\\W+")).groupBy("value").count()val query: StreamingQuery = words.writeStream.outputMode("complete").format("memory") // memory sink.queryName("word_count") // 内存临时表名.start// 测试使用定时器执行查询表val timer: Timer = new Timer(true)val task: TimerTask = new TimerTask {override def run(): Unit = spark.sql("select * from word_count").show}timer.scheduleAtFixedRate(task, 0, 2000)query.awaitTermination()// 关闭执行环境spark.stop()}
}

3.5. foreach sink

foreach sink 会遍历表中的每一行, 允许将流查询结果按开发者指定的逻辑输出;把 wordcount 数据写入到 mysql。

注意(需要在依赖中添加MySQL的驱动依赖):

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
</dependency>

建表语句如下所示:

create database ss;
use ss;
create table word_count
(word  varchar(255) primary key not null,count bigint                   not null
);

代码示例如下:

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.util.{Timer, TimerTask}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 设置数据源,并接收数据val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).loadval wordCount: DataFrame = lines.as[String].flatMap((_: String).split("\\W+")).groupBy("value").count()val query: StreamingQuery = wordCount.writeStream.outputMode("update")// 使用 foreach 的时候, 需要传递ForeachWriter实例, 三个抽象方法需要实现. 每个批次的所有分区都会创建 ForeeachWriter 实例.foreach(new ForeachWriter[Row] {var conn: Connection = _var ps: PreparedStatement = _var batchCount = 0// 一般用于 打开链接. 返回 false 表示跳过该分区的数据,override def open(partitionId: Long, epochId: Long): Boolean = {println("open ..." + partitionId + "  " + epochId)Class.forName("com.mysql.jdbc.Driver")conn = DriverManager.getConnection("jdbc:mysql://hadoop201:3306/ss", "root", "aaa")// 插入数据, 当有重复的 key 的时候更新val sql = "insert into word_count values(?, ?) on duplicate key update word=?, count=?"ps = conn.prepareStatement(sql)conn != null && !conn.isClosed && ps != null}// 把数据写入到连接override def process(value: Row): Unit = {println("process ...." + value)val word: String = value.getString(0)val count: Long = value.getLong(1)ps.setString(1, word)ps.setLong(2, count)ps.setString(3, word)ps.setLong(4, count)ps.execute()}// 用户关闭连接override def close(errorOrNull: Throwable): Unit = {println("close...")ps.close()conn.close()}}).startquery.awaitTermination()// 关闭执行环境spark.stop()}
}

3.6. ForeachBatch Sink

ForeachBatch Sink 是 spark 2.4 才新增的功能, 该功能只能用于输出批处理的数据。将统计结果同时输出到本地文件和 mysql 中。

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.util.{Properties, Timer, TimerTask}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 设置数据源,并接收数据val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).loadval wordCount: DataFrame = lines.as[String].flatMap(_.split("\\W+")).groupBy("value").count()val props: Properties = new Properties()props.setProperty("user", "root")props.setProperty("password", "aaa")val query: StreamingQuery = wordCount.writeStream.outputMode("complete").foreachBatch((df: Dataset[Row], batchId: Long) => { // 当前分区id, 当前批次idif (df.count() != 0) {df.cache()df.write.json(s"./$batchId")df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop201:3306/ss", "word_count", props)}}).start()query.awaitTermination()// 关闭执行环境spark.stop()}
}

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/85127.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

R语言4_安装BayesSpace

环境Ubuntu22/20, R4.1 你可能会报错说你的R语言版本没有这个库&#xff0c;但其实不然。这是一个在Bioconductor上的库。 同时我也碰到了这个问题&#xff0c;ERROR: configuration failed for package systemfonts’等诸多类似问题&#xff0c;下面的方法可以一并解决。 第…

数据结构刷题训练——链表篇(二)

目录 前言 1.题目一&#xff1a;链表分割 1.1 思路 1.2 分析 1.3 题解 2. 题目二&#xff1a;相交链表 2.1 思路 2.2 分析 2.3 题解 3. 题目三&#xff1a;环形链表 3.1 思路 3.2 分析 3.3 题解 总结 前言 本期继续分享链表相关的OJ题目&#xff0c;在这个专栏博客…

elasticsearch简单入门语法

基本操作 创建不同的分词器 ik_smart&#xff1a; 极简分词 &#xff1b; ik_max_word: 最细力再度分词 基本的rest命令 methodurl地址描述PUTlocalhost:9200/索引名称/类型名称/文档id创建文档&#xff08;指定文档id&#xff09;POSTlocalhost:9200/索引名称/类型名称创建文…

蝉妈妈:2023年抖音电商半年报(附下载)

关于报告的所有内容&#xff0c;公众【营销人星球】获取下载查看 核心观点 平台流量竞争从愈发激烈变为趋于愈加缓和商家直攝总时长与观众君播总时长的总体趋势并没有愈加激烈&#xff0c;从23年上半年各自流量的同比增速来看&#xff0c;观众看摄总时长增速高于商家直攝总时…

电脑合上盖子无线网络不会断开

控制面板\硬件和声音\电源选项\系统设置 最终选择不会采取任何操作 选择不会采取任何操作

Cocos Creator的 Cannot read property ‘applyForce‘ of undefined报错

序&#xff1a; 1、博主是看了这个教程操作的时候出的bug>游戏开发 | 17节课学会如何用Cocos Creator制作3D跑酷游戏 | P9 代码控制对象移动_哔哩哔哩_bilibili 2、其实问题不是出在代码上&#xff0c;但是发现物体就是不平移 3、node全栈的资料》node全栈框架 正文…

逆向破解学习-雷电星海战歌

apk 雷电星海战歌 https://download.csdn.net/download/AdrianAndroid/88200826 安装apk&#xff0c;并试玩 # 通过关键字搜索jad 找到统一支付接口 找到匿名内部类的名称 Hook代码 public class HookComAstPlane extends HookImpl {Overridepublic String packageNam…

竞赛项目 深度学习手势识别算法实现 - opencv python

文章目录 1 前言2 项目背景3 任务描述4 环境搭配5 项目实现5.1 准备数据5.2 构建网络5.3 开始训练5.4 模型评估 6 识别效果7 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习手势识别算法实现 - opencv python 该项目较为新颖…

LeetCode 31题:下一个排列

目录 题目 思路 代码 题目 整数数组的一个 排列 就是将其所有成员以序列或线性顺序排列。 例如&#xff0c;arr [1,2,3] &#xff0c;以下这些都可以视作 arr 的排列&#xff1a;[1,2,3]、[1,3,2]、[3,1,2]、[2,3,1] 。 整数数组的 下一个排列 是指其整数的下一个字典序…

Jenkins 中 shell 脚本执行失败却不自行退出

Jenkins 中 执行 shell 脚本时&#xff0c;有时候 shell 执行失败了&#xff0c;或者判断结果是错误的&#xff0c;但是 Jenkins 执行完成后确提示成功 success 。 此时&#xff0c;可以通过条件判断来解决这个问题&#xff0c;让 Jenkins 强制退出并提示执行失败 failed 。 …

Nginx使用proxy_cache指令设置反向代理缓存静态资源

场景 CentOS7中解压tar包的方式安装Nginx&#xff1a; CentOS7中解压tar包的方式安装Nginx_centos7 tar文件 怎么load_霸道流氓气质的博客-CSDN博客 参考上面流程实现搭建Nginx的基础上&#xff0c;实现静态资源的缓存设置。 注意上面安装时的目录是在/opt/nginx目录下&…

win10 + VS2022 安装opencv C++

最近需要用到C opencv&#xff0c;看了很多帖子都需要自己编译opencv源码。为避免源码编译&#xff0c;可以使用VS来配置opencv C。下面是主要过程&#xff1a; 目录 1. 从官网下载 opencv - Get Started - OpenCV 2. 点击这个exe文件进行安装 3. 配置环境变量 4. VS中的项…

java spring cloud 企业电子招标采购系统源码:营造全面规范安全的电子招投标环境,促进招投标市场健康可持续发展 tbms

​ 项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&#xff0c;以…

Word转PDF工具哪家安全?推荐好用的文件格式转换工具

Word文档是我们最常见也是最常用的办公软件&#xff0c;想必大家都知道了Word操作起来十分的简单&#xff0c;而且功能也是比较齐全的。随着科技的不断进步&#xff0c;如今也是有越来越多类型的办公文档&#xff0c;PDF就是其中之一&#xff0c;那么word转pdf怎么转?Word转PD…

DSP学习笔记

TI公司提供的c/c编译器&#xff0c;可以将其变成dsp语言。char类型本来是8位&#xff0c;在dsp里面是16位&#xff0c;int也是16位&#xff0c;long才是32位&#xff0c;float也是32位&#xff0c;enum是16位&#xff0c;double32位&#xff0c;long double是32位&#xff0c;p…

客户端脚本安全

客户端脚本安全 白帽子讲web安全 ———— a.了解web安全测试的基本知识 b.掌握前端的脚本安全知识&#xff0c;了解基本的前端安全测试条目&#xff0c;如同源策略、xss攻击测试、CSRF测试、点击劫持测试 c.webinsepct nessus 绿盟扫描 数据流 输入输出 浏览器安全 同源…

【HCIP】重发布实验

题目&#xff1a; 配置&#xff1a; R1 //配置ip地址 [r1]int g0/0/0 [r1-GigabitEthernet0/0/0]ip add 12.1.1.1 24 [r1-GigabitEthernet0/0/0]int g0/0/1 [r1-GigabitEthernet0/0/1]ip add 13.1.1.1 24 [r1-GigabitEthernet0/0/1]int lo0 [r1-LoopBack0]ip add 1.1.1.1 24 /…

MySQL—日志

这里写目录标题 undo logundo log的作用undo log页记录的是什么 Buffer Pool为什么需要Buffer PoolBuffer Pool缓存什么 redo log什么是redo logredo log的作用redo log什么时候刷盘undo和redo的区别 binlogbinlog 作用redo log和binlog区别如果数据数据被删了&#xff0c;能用…

【redis】redis的认识和安装

目录 1.redis是什么2.Redis的特点3.安装redis4.设置远程连接4.1 开启隧道4.2 可视化客户端连接4.3 开启防火墙 5.redis常见数据类型5.1 redis的一些全局命令5.2 数据结构 6. redis的典型应用---缓存&#xff08;cache&#xff09;6.1 使用redis做缓存6.2 缓存穿透&#xff0c;缓…

并发——Atomic 原子类总结

文章目录 1 Atomic 原子类介绍2 基本类型原子类2.1 基本类型原子类介绍2.2 AtomicInteger 常见方法使用2.3 基本数据类型原子类的优势2.4 AtomicInteger 线程安全原理简单分析 3 数组类型原子类3.1 数组类型原子类介绍3.2 AtomicIntegerArray 常见方法使用 4 引用类型原子类4.1…