Flink_DataStreamAPI_输出算子Sink

Flink_DataStreamAPI_输出算子Sink

  • 1连接到外部系统
  • 2输出到文件
  • 3输出到Kafka
  • 4输出到MySQL(JDBC)
  • 5自定义Sink输出

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

在这里插入图片描述

1连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(new SinkFunction());

addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,

stream.sinkTo()

当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:
在这里插入图片描述
我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
在这里插入图片描述
除此以外,就需要用户自定义实现sink连接器了。

2输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:
-行编码: FileSink.forRowFormat(basePath,rowEncoder)。
-批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

示例:

public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint,否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");// 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("atguigu-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略:  1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}

3输出到Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码
输出无key的record:

public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次,必须开启checkpoint(后续章节介绍)env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** Kafka Sink:* TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可* 1、开启checkpoint(后续介绍)* 2、设置事务前缀* 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** 如果要指定写入kafka的key,可以自定义序列化器:* 1、实现 一个接口,重写 序列化 方法* 2、指定key,转成 字节数组* 3、指定value,转成 字节数组* 4、返回一个 ProducerRecord对象,把key、value放进去*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setRecordSerializer(new KafkaRecordSerializationSchema<String>() {@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("atguigu-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

(4)运行代码,在Linux主机启动一个消费者,查看是否收到数据

[atguigu@hadoop102 ~]$ 
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

4输出到MySQL(JDBC)

写入数据的MySQL的测试步骤如下。
(1)添加依赖
添加MySQL驱动:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version>
</dependency>

官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径:

<repositories><repository><id>apache-snapshots</id><name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url></repository>
</repositories>

添加依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加如下标红内容:

	<mirror><id>aliyunmaven</id><mirrorOf>*,!apache-snapshots</mirrorOf><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url></mirror>

(2)启动MySQL,在test库下建表ws

mysql>     
CREATE TABLE `ws` (`id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的示例代码

public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法: addsink* 2、JDBCSink的4个参数:*    第一个参数: 执行的sql,一般就是 insert into*    第二个参数: 预编译sql, 对占位符填充值*    第三个参数: 执行选项 ---》 攒批、重试*    第四个参数: 连接选项 ---》 url、用户名、密码*/SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor,如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("000000").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());sensorDS.addSink(jdbcSink);env.execute();}
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

5自定义Sink输出

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/470587.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

01-Ajax入门与axios使用、URL知识

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

HarmonyOS Next星河版笔记--界面开发(4)

布局 1.1.线性布局 线性布局通过线性容器column和row创建 column容器&#xff1a;子元素垂直方向排列row容器&#xff1a;子元素水平方向排列 1.1.1.排布主方向上的对齐方式&#xff08;主轴&#xff09; 属性&#xff1a;.justifyContent&#xff08;枚举FlexAlign&#…

webpack loader全解析,从入门到精通(10)

webpack 的核心功能是分析出各种模块的依赖关系&#xff0c;然后形成资源列表&#xff0c;最终打包生成到指定的文件中。更多复杂的功能需要借助 webpack loaders 和 plugins 来完成。 1. 什么是 Loader Loader 本质上是一个函数&#xff0c;它的作用是将某个源码字符串转换成…

如何用WordPress和Shopify提升SEO表现?

选择合适的建站程序对于SEO优化非常重要。目前&#xff0c;WordPress和Shopify是两种备受推崇的建站平台&#xff0c;各有优势。 WordPress最大的优点是灵活性。它支持大量SEO插件&#xff0c;帮助你调整元标签、生成站点地图、优化内容结构等。这些功能让你能够轻松地提升网站…

ArcGIS Pro属性表乱码与字段名3个汉字解决方案大总结

01 背景 我们之前在使用ArcGIS出现导出Excel中文乱码及shp添加字段3个字被截断的情况&#xff0c;我们有以下应对策略&#xff1a; 推荐阅读&#xff1a;ArcGIS导出Excel中文乱码及shp添加字段3个字被截断&#xff1f; 那如果我们使用ArGIS Pro出现上述问题&#xff0c;该如何…

24/11/13 算法笔记<强化学习> DQN算法

DQN算法的主要特点包括&#xff1a; 神经网络代替Q表&#xff1a;在传统的Q学习中&#xff0c;需要维护一个Q表来存储每个状态-动作对的Q值。而在DQN中&#xff0c;使用神经网络来近似这些Q值&#xff0c;这使得算法能够处理具有大量状态和动作的问题。 经验回放&#xff08;E…

Blender进阶:图像纹理节点和映射节点

13 图像纹理节点 13.1 图像纹理节点 图像纹理节点&#xff0c;用于加载一张贴图 加载图片后&#xff0c;可以从图片上取得一个像素点。 输入&#xff1a;一个坐标矢量 输出&#xff1a;该坐标的像素颜色 演示&#xff1a;使用合并xyz节点来指定坐标。。 13.2 多种贴图 一…

MYSQL 库,表 基本操作

相关的两个编码集(简单了解即可) 1.数据库编码集 :对将要存储的数据进行编码 2.数据库校验集:对将要执行的操作&#xff08;增删查改&#xff09;数据是对数据编码的校验&#xff0c;本质也是一种读取数据库中数据库采用的一种编码格式。 总结&#xff1a;数据库无论对数据做…

万字长文分析函数式编程

目录 一.认识函数式编程 一、函数式编程的定义 二、函数式编程的思想 三、函数式编程的特点 四、函数式编程的应用 二.Lambda表达式 三.Stream流 3.1 创建流对象 3.2 注意事项 3.3 Stream流的中间操作 filter map distinct sorted limit skip flatMap 3.4 St…

DOM 规范 — MutationObserver 接口

前言 最近在重学 JavaScript 中&#xff0c;再一次接触到了 MutationObserver 内容&#xff0c;接着联想到了 Vue 源码中有使用过这个接口&#xff0c;因此觉得有必要对 MutationObserver 接口进行相关了解和学习。 下面是 vue 源码中关于 MutationObserver 接口使用的代码&a…

灰狼优化算法

一、简介 1.1 灰狼优化算法-Grey Wolf Optimizer 通过模拟灰狼群体捕食行为&#xff0c;基于狼群群体协 作的机制来达到优化的目的。&#xff27;&#xff37;&#xff2f;算法具有结构简单、需 要调节的参数少、容易实现等特点&#xff0c;其中存在能够自适应调整 的收敛因子…

AI 写作(五)核心技术之文本摘要:分类与应用(5/10)

一、文本摘要&#xff1a;AI 写作的关键技术 文本摘要在 AI 写作中扮演着至关重要的角色。在当今信息爆炸的时代&#xff0c;人们每天都被大量的文本信息所包围&#xff0c;如何快速有效地获取关键信息成为了一个迫切的需求。文本摘要技术正是为了解决这个问题而诞生的&#x…

【 ElementUI 组件Steps 步骤条使用新手详细教程】

本文介绍如何使用 ElementUI 组件库中的步骤条组件完成分步表单设计。 效果图&#xff1a; 基础用法​ 简单的步骤条。 设置 active 属性&#xff0c;接受一个 Number&#xff0c;表明步骤的 index&#xff0c;从 0 开始。 需要定宽的步骤条时&#xff0c;设置 space 属性即…

尽量通俗易懂地概述.Net U nity跨语言/跨平台相关知识

本文参考来自唐老狮,Unity3D高级编程:主程手记,ai等途径 仅作学习笔记交流分享 目录 1. .Net是什么? 2. .Net框架的核心要点? 跨语言和跨平台 .Net x Unity跨平台发展史 Net Framework 2002 Unity跨平台之 Mono 2004 Unity跨平台之 IL2CPP 2015 二者区别 .NET Core …

基于yolov8、yolov5的番茄成熟度检测识别系统(含UI界面、训练好的模型、Python代码、数据集)

摘要&#xff1a;番茄成熟度检测在农业生产及质量控制中起着至关重要的作用&#xff0c;不仅能帮助农民及时采摘成熟的番茄&#xff0c;还为自动化农业监测提供了可靠的数据支撑。本文介绍了一款基于YOLOv8、YOLOv5等深度学习框架的番茄成熟度检测模型&#xff0c;该模型使用了…

应用程序部署(IIS的相关使用,sql server的相关使用)

数据服务程序&#xff08;API&#xff09;部署 1、修改配置文件 打开部署包中的web.config配置文件&#xff0c;确认数据库登录名和密码正确 修改ip为电脑IP&#xff08;winR输入cmd&#xff0c;输入ipconfig&#xff0c;IPv4对应的就是本机IP&#xff09; 2、打开IIS&#x…

网页版五子棋——对战模块(服务器端开发②)

前一篇文章&#xff1a;网页版五子棋——对战模块&#xff08;服务器端开发①&#xff09;-CSDN博客 项目源代码&#xff1a;Java: 利用Java解题与实现部分功能及小项目的代码集合 - Gitee.com 目录 前言 一、创建并注册 GameAPI 类 1.创建 GameAPI 类 2.注册 GameAPI 类 …

STM32单片机WIFI语音识别智能衣柜除湿消毒照明

实践制作DIY- GC0196-WIFI语音识别智能衣柜 一、功能说明&#xff1a; 基于STM32单片机设计-WIFI语音识别智能衣柜 二、功能介绍&#xff1a; STM32F103C系列最小系统板LCD1602显示器ULN2003控制的步进电机&#xff08;柜门开关&#xff09;5V加热片直流风扇紫外消毒灯DHT11…

网络远程操控

1.给两个设备配上ip地址让他们能通 2.开启远程管理功能&#xff0c;打开telnet 3.创建远程管理的账号和密码&#xff0c;账号权限 输入system-view进入视图&#xff0c;不敲这个命令不能进行配置 配好ip后进入AR1ping一下AR2的ip看看通不通&#xff0c;接着进入AR2开启telnet权…

【go从零单排】Timer、Epoch 时间函数

&#x1f308;Don’t worry , just coding! 内耗与overthinking只会削弱你的精力&#xff0c;虚度你的光阴&#xff0c;每天迈出一小步&#xff0c;回头时发现已经走了很远。 &#x1f4d7;概念 在 Go 语言中&#xff0c;time.Timer 是一个用于在指定时间后执行操作的计时器。…