[flink 实时流基础] 输出算子(Sink)

学习笔记
Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。
image.png


文章目录

      • **连接到外部系统**
      • **输出到文件**
      • 输出到 Kafka
      • 输出到 mysql
      • 自定义 sink

连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,
stream.sinkTo(…)
当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/
image.png

我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
image.png
除此以外,就需要用户自定义实现sink连接器了。

输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint,否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");// 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("atguigu-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略:  1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}

输出到 Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码

public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次,必须开启checkpoint(后续章节介绍)env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** Kafka Sink:* TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可* 1、开启checkpoint(后续介绍)* 2、设置事务前缀* 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** 如果要指定写入kafka的key,可以自定义序列化器:* 1、实现 一个接口,重写 序列化 方法* 2、指定key,转成 字节数组* 3、指定value,转成 字节数组* 4、返回一个 ProducerRecord对象,把key、value放进去*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setRecordSerializer(new KafkaRecordSerializationSchema<String>() {@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("atguigu-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

输出到 mysql

写入数据的MySQL的测试步骤如下。
(1)添加依赖
添加MySQL驱动:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version>
</dependency>

官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径:

<repositories><repository><id>apache-snapshots</id><name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url></repository>
</repositories>

添加依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加如下标红内容:

<mirror><id>aliyunmaven</id><mirrorOf>*,!apache-snapshots</mirrorOf><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url>
</mirror>

(2)启动MySQL,在test库下建表ws

mysql>
CREATE TABLE ws (
id varchar(100) NOT NULL,
ts bigint(20) DEFAULT NULL,
vc int(11) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的示例代码

public class SinkMySQL {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法: addsink* 2、JDBCSink的4个参数:*    第一个参数: 执行的sql,一般就是 insert into*    第二个参数: 预编译sql, 对占位符填充值*    第三个参数: 执行选项 ---》 攒批、重试*    第四个参数: 连接选项 ---》 url、用户名、密码*/
SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor,如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("000000").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build()
);sensorDS.addSink(jdbcSink);env.execute();
}
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

自定义 sink

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction());
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/294211.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

128:忆往昔,迎春来

机缘 时间真的是过得好快啊&#xff0c;128&#xff0c;距离我的第一篇博客已经128天了&#xff0c;从大一上的冬天&#xff0c;到大一下的春夏&#xff0c;从一个初入大学的新人&#xff0c;到一个开始思考未来的新人。我开始不断更新博客&#xff0c;开始不断吸收知识&#…

【21-40】计算机网络基础知识(非常详细)从零基础入门到精通,看完这一篇就够了

【21-40】计算机网络基础知识&#xff08;非常详细&#xff09;从零基础入门到精通&#xff0c;看完这一篇就够了 以下是本文参考的资料 欢迎大家查收原版 本版本仅作个人笔记使用21、HTTPS是如何保证数据传输的安全&#xff0c;整体的流程是什么&#xff1f;&#xff08;SSL是…

【Servlet】Servlet入门

文章目录 一、介绍二、入门案例导入servlet-api的解决办法 一、介绍 概念&#xff1a;server applet&#xff0c;即&#xff1a;运行在服务器端的小程序 Servlet就是一个接口&#xff0c;定义了Java类被浏览器访问到&#xff08;tomcat识别&#xff09;的规则。 将来我们定义…

Spring Boot | Spring Boot的“数据访问“、Spring Boot“整合MyBatis“

目录: 一、Spring Boot”数据访问概述“二、Spring Boot”整合MyBatis”1. 基础环境搭建 (引入对应的“依赖启动器” 配置数据库的“相关参数”)① 数据准备 (导入Sql文件)② 创建项目&#xff0c;引入相应的启动器&#xff0c;编写数据库对应的“实体类”③额外添加pom.xml文…

海康Ehome2.0与5.0设备接入EasyCVR视频汇聚平台时的配置区别

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…

【Ubuntu】用 VMware 安装 macOS

本教程使用 Ubuntu 20.04.6 LTS&#xff0c;VMware Workstation Pro 17.5.1&#xff0c;macOS Sonoma 14.4。文中所有需要的下载链接均以 Markdown 的形式体现在文字上。 下载 VMware Workstation Pro&#xff0c;目前最新版本是 17.5.1。 使用密钥&#xff0c;进行破解。 VM…

SpringBoot+uniApp宠物领养小程序系统 附带详细运行指导视频

文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.保存宠物信息代码2.提交订单信息代码3.查询评论信息代码 一、项目演示 项目演示地址&#xff1a; 视频地址 二、项目介绍 项目描述&#xff1a;这是一个基于SpringBootuniApp框架开发的宠物领养微信小程序系统。…

R语言,数据类型转换

原文链接&#xff1a;R语言技能 | 不同数据类型的转换 本期教程 写在前面 今天是4月份的第一天&#xff0c;再过2天后再一次迎来清明小假期。木鸡大家是否正常放假呢&#xff1f; 我们在使用R语言做数据分析时&#xff0c;会一直对数据进行不同类型的转换&#xff0c;有时候…

【Java】API——Calendar日期类使用+题目演示

目录 Calendar日期类简单介绍 导入对应包&#xff1a; 获取 Calendar 对象&#xff1a; 设置日期和时间&#xff1a; 获取日期和时间的各个部分&#xff1a; 日期和时间的加减操作&#xff1a; 例题&#xff1a;世纪末的星期 题目描述 题目代码 Calendar日期类简单介绍…

typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存

Manjaro安装RabbitMQ 安装 sudo pacman -S rabbitmq rabbitmqadmin启动管理模块 sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server管理界面 http://127.0.0.1:15672/ 默认用户名和密码都是guest。 要使用 rabbitmqctl 命令添加用户并分配权限&#xf…

Java类和对象练习题

练习一 下面代码的运行结果是&#xff08;&#xff09; public static void main(String[] args){String s;System.out.println("s"s);} 解析&#xff1a;本题中的代码不能编译通过&#xff0c;因为在Java当中局部变量必须先初始化&#xff0c;后使用。所以此处编译不…

【Python刷题】将有序数组转换为二叉搜索树

问题描述 给你一个整数数组 nums &#xff0c;其中元素已经按 升序 排列&#xff0c;请你将其转换为一棵 平衡 二叉搜索树。 高度平衡的意思是&#xff1a;二叉树是一颗满足“每个结点的左右两个子树的高度差的绝对值不超过1”的二叉树。 示例 1&#xff1a; 输入&#xf…

农村集中式生活污水分质处理及循环利用技术指南

立项单位&#xff1a;生态环境部土壤与农业农村生态环境监管技术中心、山东文远环保科技股份有限公司、北京易境创联环保有限公司、中国环境科学研究院、广东省环境科学研究院、中铁第五勘察设计院集团有限公司、中华环保联合会水环境治理专业委员会 本文件规定了集中式村镇生活…

Stable Diffusion 模型下载:epiCPhotoGasm(真实、照片)

本文收录于《AI绘画从入门到精通》专栏,专栏总目录:点这里,订阅后可阅读专栏内所有文章。 文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八下载地址模型介绍

语音克隆技术浪潮:探索OpenAI Voice Engine的奇妙之旅

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

基于SpringBoot的“校园志愿者管理系统”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“校园志愿者管理系统”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统总体结构图 系统首页界面图 志愿者注册…

游戏引擎中的声音系统

一、声音基础 1.1 音量 声音振幅的大小 压强p&#xff1a;由声音引起的与环境大气压的局部偏差 1.2 音调 1.3 音色 1.4 降噪 1.5 人的听觉范围 1.6 电子音乐 将自然界中连续的音乐转换成离散的信号记录到内存中 采样 - 量化 - 编码 香农定理&#xff1a;采样频率是信…

探究云手机的海外原生IP优势

随着全球数字化进程的加速&#xff0c;企业越来越依赖于网络来扩展其业务。在这个数字时代&#xff0c;云手机作为一种创新的通信技术&#xff0c;已经成为了企业网络优化的重要组成部分。云手机支持海外原生IP的特性&#xff0c;为企业在国际市场上的拓展提供了全新的可能性。…

idea中 错误:找不到或无法加载主类

很神奇的就是maven打包是正常的&#xff0c;本来也是好好的&#xff0c;突然启动就报错了&#xff0c;我百度了很急&#xff0c;没什么结果&#xff0c;找了公司6年工作经验的老员工&#xff0c;还是搞了好久&#xff0c;我站了好久也是没解决。后来我也是在想maven的jar包都能…

【每日一题】2810. 故障键盘-2024.4.1

题目&#xff1a; 2810. 故障键盘 你的笔记本键盘存在故障&#xff0c;每当你在上面输入字符 i 时&#xff0c;它会反转你所写的字符串。而输入其他字符则可以正常工作。 给你一个下标从 0 开始的字符串 s &#xff0c;请你用故障键盘依次输入每个字符。 返回最终笔记本屏幕…