【API篇】六、Flink输出算子Sink

文章目录

  • 1、输出到外部系统
  • 2、输出到文件
  • 3、输出到KafKa
  • 4、输出到MySQL(JDBC)
  • 5、自定义Sink输出

Flink做为数据处理引擎,要把最终处理好的数据写入外部存储,为外部系统或应用提供支持。与输入算子Source相对应的,输出算子为Sink。

在这里插入图片描述
前面一直在用的print就是一种Sink,用来将数据流写到控制台打印

在这里插入图片描述

1、输出到外部系统

Flink程序中所有对外的输出操作,利用Sink算子完成

Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法

stream.addSink(new SinkFunction());
//重写SinkFunction接口的invoke方法,用来将指定的值写入到外部系统中
//invoke方法在每条数据记录到来时都会调用。

Flink1.12开始,Sink算子的创建是通过调用DataStream的.sinkTo()方法

stream.sinkTo()

Flink官网为我们提供了一部分的框架的Sink连接器:

Flink官方为我们提供了一部分的框架的Sink连接器

source/sink即可读可写,能做为数据源连接,也能做为下游去输出。

2、输出到文件

先引入Flink流式文件系统的连接器FileSink的依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder):

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)

下面演示实现读往d盘下的tmp目录写数据(tmp目录不用提前创建,不存在会自动创建):

public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有 并行度个数的 文件是正在写入状态env.setParallelism(1);// 必须开启checkpoint,否则文件一直都是 .inprogress状态,即正在写入env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);//生成器模拟一个数据源DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000), //每秒生成1000条数据Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");// 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀,new也行,这里展示build方式创建配置对象.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("code9527").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录。ZoneId.systemDefault()即系统默认时区,也可是ZoneId类中的其他时区.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略:  1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}

运行,看下效果:inprocess,此时文件正在写入数据,不可读。一个这个inprocess文件,因为上面并行度设置的1

在这里插入图片描述

总结:重点还是FileSink对象的创建

  • 输出行/批文件存储的文件,可指定文件路径、文件编码、文件前后缀

  • 按目录分桶,传参的接口实现类对象自选,demo中是按照时间给文件夹命名

  • 特别注意文件滚动策略,是达到指定时间或者文件到达指定大小,是或的关系

  • FileSink对象创建完后,直接流对象调用sinkTo即可完成写入到文件的动作

3、输出到KafKa

添加KafKa连接器的依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version>
</dependency>

以下用socket模拟无界流,来演示数据输出到KafKa:

public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次,必须开启checkpoint,否则无法写入Kafkaenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("node1", 9527);KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("topic1").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("test-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

关于 Kafka Sink,如果要使用精准一次写入Kafka,需要满足以下条件,缺一不可

  • 开启checkpoint(后续介绍)
  • 设置事务前缀
  • 设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟

如果要指定写入kafka的key,可以自定义序列化器:

  • 实现 一个接口,重写 序列化 方法
  • 指定key,转成 字节数组
  • 指定value,转成 字节数组
  • 返回一个 ProducerRecord对象,把key、value放进去
public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("node1", 9527);/***指定写入kafka的key,可以自定义序列化器:*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setRecordSerializer(new KafkaRecordSerializationSchema<String>() {@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas = element.split(",");  //输入的测试数据格式为a,b,c,所以这里先分割一下byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("topic1", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("test-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

4、输出到MySQL(JDBC)

添加MySQL驱动依赖:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version>
</dependency>

在这里插入图片描述

再引入flink-jdbc连接器依赖:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version>
</dependency>


PS:

教学视频中提到了另一种情况,这里记录下。即:官方还未提供flink-connector-jdbc的某高版本的正式依赖,如1.17.0(当前时间已有),暂时从apache snapshot仓库下,因此引入依赖前,先在pom文件中指定仓库路径

<repositories><repository><id>apache-snapshots</id>  <!--这个id后面setting.xml里有用--><name>apache-snapshots</name><url>https://repository.apache.org/content/repositories/snapshots/</url></repository>
</repositories>

再引入flink-jdbc连接器依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加!apache-snapshots

<mirror><id>aliyunmaven</id><mirrorOf>*,!apache-snapshots</mirrorOf>   <!--即除了apache-snapshots,其余的都去阿里仓库下,!即排除,后面的名称是pom中定义的那个--><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url>
</mirror>


根据你的数据类型,建立对应结构的表,这里根据要接收的自定义对象WaterSensor建表test:

mysql>     
CREATE TABLE `ws` (`id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

输出到MySQL的Demo代码:

public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());  //输入的信息映射转为自定义的WaterSensor实体类对象SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor,如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://node01:3306/testDB?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("admin123").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());sensorDS.addSink(jdbcSink);env.execute();}
}

总结: 写入mysql时注意只能用老的sink写法: addsink,此外JdbcSink的4个参数:

  • 第一个参数: 执行的sql,一般就是 insert into搭配占位符
  • 第二个参数: 预编译sql对象, 对占位符填充值
  • 第三个参数: 执行选项 ,比如批次大小、重试时间
  • 第四个参数: 数据库连接选项 , url、用户名、密码

运行,输入数据,查看MySQL:

在这里插入图片描述

5、自定义Sink输出

现有的Flink连接器不能满足需求时,需要自定义连接器进行输出。与Source类似,Flink提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,实现这个接口,就可通过DataStream的.addSink()方法自定义写入任何的外部存储。

public class MySinkFunction implements SinkFunction<String>{@Overridepublic void invoke(String value, Context context) throws Exception{//输出逻辑//value即流中的数据,来一条数据,invoke方法就被调用一次(所以不要在这里创建连接对象)//如果你的外部存储必须先创建连接对象,那就用富函数的生命周期方法去创建连接对象}
}
stream.addSink(new MySinkFunction<String>());

来一条数据,invoke方法就被调用一次,如果你的外部存储必须先创建连接对象,那就用富函数的生命周期方法去创建连接对象:

public class MySinkFunction implements RichSinkFunction<String>{Connection connection = null;@Overrdiepublic void open(Configuration parameters) throws Exception{connection = new xxConnection(xx);}@Overridepublic void close() throws Exception{super.close();}@Overridepublic void invoke(String value, Context context) throws Exception{//输出逻辑connection.executeXXX(xxx);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/168686.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【银河麒麟系统】备份还原工具显示“备份分区空间不足,请删除过期或者不需要的备份”解决方法

一.问题的现象 在进行银行麒麟V10的系统备份时&#xff0c;会因为所需备份的系统过大导致备份分区容量不足导致备份失败的情况&#xff1a; 二.解决方法 该问题的处理思路与之前写过的一篇文章&#xff1a;【linux】把home目录挂载到其他分区&#xff08;数据盘/data等&#xf…

万界星空科技/免费MES系统/开源MES/免费追溯管理

开源系统概述&#xff1a; 万界星空科技免费MES、开源MES、商业开源MES、市面上最好的开源MES、MES源代码、免费MES、免费智能制造系统、免费排产系统、免费排班系统、免费质检系统、免费生产计划系统、免费仓库管理系统、免费出入库管理系统、免费可视化数字大屏。 万界星空…

图像语义分割 pytorch复现DeepLab v1图像分割网络详解以及pytorch复现(骨干网络基于VGG16、ResNet50、ResNet101)

图像语义分割 pytorch复现DeepLab v1图像分割网络详解以及pytorch复现&#xff08;骨干网络基于VGG16、ResNet50、ResNet101&#xff09; 背景介绍2、 网络结构详解2.1 LarFOV效果分析 2.2 DeepLab v1-LargeFOV 模型架构2.3 MSc&#xff08;Multi-Scale&#xff0c;多尺度(预测…

深入浅出Apache SeaTunnel SQL Server Sink Connector

在大数据时代&#xff0c;数据的迁移和流动已经变得日益重要。为了使数据能够更加高效地从一个源流向另一个目标&#xff0c;我们需要可靠、高效和易于配置的工具。今天&#xff0c;我们将介绍 JDBC SQL Server Sink Connector&#xff0c;这是一个专为 SQL Server 设计的连接器…

美国国防部网络战略如何改变国家网络防御

十年前&#xff0c;时任国防部长莱昂帕内塔说了一句后来臭名昭著的短语&#xff1a;“网络珍珠港”。帕内塔利用他作为该国主要国家安全官员的平台来警告美国未来将遭受可怕的数字攻击。 他警告说&#xff0c;能源基础设施、交通系统、金融平台等都容易受到剥削。媒体、专家和…

什么是Sectigo证书?

Sectigo证书&#xff0c;早前被称为Comodo证书&#xff0c;是一种SSL&#xff08;安全套接层&#xff09;证书&#xff0c;用于保护互联网上的数据传输的安全性和隐私性。这些证书由全球领先的SSL证书颁发机构Sectigo颁发&#xff0c;被广泛用于网站、应用程序和服务器上。本文…

广告掘金全自动挂机项目,单设备30+【软件脚本+技术教程】

广告掘金项目是一种越来越受欢迎的赚钱方式&#xff0c;它通过观看广告视频来获取收益。然而&#xff0c;手动观看每个广告视频可能会耗费大量时间和精力。为了简化操作并提升效率&#xff0c;我们可以利用全自动挂机脚本来完成这一任务。接下来&#xff0c;将为您介绍如何使用…

SpringCloud 微服务全栈体系(三)

第五章 Nacos 注册中心 国内公司一般都推崇阿里巴巴的技术&#xff0c;比如注册中心&#xff0c;SpringCloudAlibaba 也推出了一个名为 Nacos 的注册中心。 一、认识和安装 Nacos 1. 认识 Nacos Nacos是阿里巴巴的产品&#xff0c;现在是SpringCloud中的一个组件。相比Eure…

【面试经典150 | 哈希表】快乐数

文章目录 写在前面Tag题目来源题目解读解题思路方法一&#xff1a;哈希集合判重方法二&#xff1a;快慢指针判重 其他语言python3 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专栏内容以分析题目为…

第二证券:风电概念强势拉升,威力传动“20cm”涨停,双一科技等大涨

风电概念20日盘中强势拉升&#xff0c;到发稿&#xff0c;威力传动“20cm”涨停&#xff0c;双一科技涨超17%&#xff0c;顺发恒业亦涨停&#xff0c;金雷股份、大金重工涨约7%&#xff0c;新强联、海力风电涨超5%。 音讯面上&#xff0c;9月以来江苏、广东海风项目加快推动&a…

第十五章 I/O(输入/输出)流

15.1 输入/输出流 流是一组有序的数据序列&#xff0c;可分为输入流和输出流两种。 程序从指向源的输入流中读取源中数据&#xff0c;源可以是文件、网络、压缩包或者其他数据源 输出流的指向是数据要到达的目的地&#xff0c;输出流的目标可以是文件、网络、压缩包、控制台和…

机器学习笔记 - 特斯拉的占用网络简述

一、简述 ​ 2022 年,特斯拉宣布即将在其车辆中发布全新算法。该算法被称为occupancy networks,它应该是对Tesla 的HydraNet 的改进。 自动驾驶汽车行业在技术上分为两类:基于视觉的系统和基于激光雷达的系统。后者使用激光传感器来确定物体的存在和距离,而视觉系统…

【tg】6: MediaManager的主要功能

【tg】2:视频采集的输入和输出 的管理者是 media manager‘ media 需要 network的支持:NetworkInterface friend class MediaManager::NetworkInterfaceImpl;NetworkInterfaceImpl 直接持有 MediaManager 的指针即可:发送rtp包、rtcp包、设置socket选项?

小程序 swiper滑动

整个红色区域为可滑动区域&#xff0c;数字1区域为展示区域&#xff0c;数字2为下一个展示模块 <scroll-view class"h_scroll_horizontal" enhanced"ture" bind:touchend"touchEnd" bind:touchstart"touchStart"><view clas…

从昏暗到明亮—改善照明环境,提升编程效率

作为一名程序员博主&#xff0c;长时间写代码、写博客&#xff0c;对着电脑屏幕的生活方式已经渐渐成为了我的日常。 然而&#xff0c;这种生活方式却给我的眼睛带来了相当大的压力。每当一天的工作结束&#xff0c;我的眼睛总是感到干涩、疲劳&#xff0c;让我感到不舒适。&am…

基于C语言 --- 自己写一个通讯录

C语言程序设计笔记---039 C语言之实现通讯录1、介绍C/C程序的内存开辟2、C语言实现通讯录2.1、ContactMain.c程序大纲2.2、Contact2.h2.3、Contact2.c2.3.1 InitContact( )初始化通讯录函数2.3.2 AddContact( )添加联系人和CheckCapaticy( )检查容量函数2.3.3、ShowContact( )显…

模式识别——高斯分类器

模式识别——高斯分类器 需知定义特殊情况&#xff08;方差一致&#xff09;Sigmoid 需知 所有问题定义在分类问题下&#xff0c;基于贝叶斯决策 定义 条件概率为多元高斯分布&#xff0c;此时观测为向量 X X 1 , X 2 , . . . , X n X{X_1,X_2,...,X_n} XX1​,X2​,...,Xn​…

Docker Service 创建

Docker Swarm Mode Docker Swarm 集群搭建 Docker Swarm 节点维护 Docker Service 创建 service 只能依附于 docker swarm 集群&#xff0c;所以 service 的创建前提是&#xff0c;swarm 集群搭建完毕。 1. 创建 service docker service create 命令用于创建 service&#xff…

【C++项目】高并发内存池第二讲中心缓存CentralCache框架+核心实现

CentralCache 1.框架介绍2.核心功能3.核心函数实现介绍3.1SpanSpanList介绍3.2CentralCache.h3.3CentralCache.cpp3.4TreadCache申请内存函数介绍3.5慢反馈算法 1.框架介绍 回顾一下ThreadCache的设计&#xff1a; 如图所示&#xff0c;ThreadCache设计是一个哈希桶结构&…

前端领域的插件式设计

插件&#xff0c;是一个常见的概念。 例如&#xff0c;当我们需要把我们前端代码中的 css 样式提取打包&#xff0c;我们可以用 webpack 的 mini-css-extract-plugin&#xff0c;或者你如果用 rollup 的话&#xff0c;可以选择 rollup-plugin-postcss。 再比如我们可以给 bab…