Flink之JDBC Sink

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务

  • 非事务代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 构建jdbc sinkSinkFunction<CustomizeBean> jdbcSink = JdbcSink.sink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句new JdbcStatementBuilder<CustomizeBean>() {@Overridepublic void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withBatchSize(10) // 批次大小,条数.withBatchIntervalMs(5000) // 批次最大等待时间.withMaxRetries(1) // 重复次数.build(), // 写入参数配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false").withUsername("root").withPassword("password").build() // jdbc信息配置);// 添加jdbc sinkcustomizeSource.addSink(jdbcSink);env.execute();}
}
  • 事务代码
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;import javax.sql.XADataSource;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 每20秒作为checkpoint的一个周期env.enableCheckpointing(20000);// 两次checkpoint间隔最少是10秒env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);// 程序取消或者停止时不删除checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// checkpoint必须在60秒结束,否则将丢弃env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只能有一个checkpointenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置EXACTLY_ONCE语义,默认就是这个env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// checkpoint存储位置env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpointSinkFunction<CustomizeBean> exactlyOneJdbcSink = JdbcSink.exactlyOnceSink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句(JdbcStatementBuilder<CustomizeBean>) (pStmt, customizeBean) -> {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withMaxRetries(0) // 设置重复次数.withBatchSize(25) // 设置批次大小,数据条数.withBatchIntervalMs(1000) // 批次最大等待时间.build(),JdbcExactlyOnceOptions.builder()// 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的.withTransactionPerConnection(true).build(),(SerializableSupplier<XADataSource>) () -> {// XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置urlmysqlXADataSource.setUser("root"); // 设置用户mysqlXADataSource.setPassword("password"); // 设置密码return mysqlXADataSource;});// 添加jdbc sinkcustomizeSource.addSink(exactlyOneJdbcSink);env.execute();}
}
  • pom依赖
        <!-- 在原有的依赖中加入下面两个内容 --><!-- JDBC connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><!-- mysql驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency>
  • 结果
    在这里插入图片描述
    jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/75896.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

wireshark简单使用(一)

前两天为同事处理交换机故障&#xff0c;接触到wireshark使用&#xff0c;意识到这个工具对于工程师来说&#xff0c;查询报文还是必须的&#xff0c;了解基本的使用。 于是接触到一些视频&#xff0c;开始自学。 第一步 查询本机IP地址 cmd---ipconfig 查看本机IP地址和网关…

【CI/CD】图解六种分支管理模型

图解六种分支管理模型 任何一家公司乃至于一个小组织&#xff0c;只要有写代码的地方&#xff0c;就有代码版本管理的主场&#xff0c;初入职场&#xff0c;总会遇到第一个拦路虎 git 管理流程&#xff0c;但是每一个企业似乎都有自己的 git 管理流程&#xff0c;倘若我们能掌握…

学习购药系统源码:从前端到后端的技术探索

本文将带领读者探索购药系统源码&#xff0c;从前端到后端逐步深入&#xff0c;了解其核心功能和实现方式。我们将使用常见的Web技术&#xff0c;包括HTML、CSS、JavaScript、以及Python的Django框架&#xff0c;展示购药系统的技术奥秘。 前端技术探索 HTML结构搭建 购药系…

京东开源的、高效的企业级表格可视化搭建解决方案:DripTable

DripTable 是京东零售推出的一款用于企业级中后台的动态列表解决方案&#xff0c;项目基于 React 和 JSON Schema&#xff0c;旨在通过简单配置快速生成页面动态列表来降低列表开发难度、提高工作效率。 DripTable 目前包含以下子项目&#xff1a;drip-table、drip-table-gene…

Jmeter(一) - 从入门到精通 - 环境搭建(详解教程)

1.JMeter 介绍 Apache JMeter是100%纯JAVA桌面应用程序&#xff0c;被设计为用于测试客户端/服务端结构的软件(例如web应用程序)。它可以用来测试静态和动态资源的性能&#xff0c;例如&#xff1a;静态文件&#xff0c;Java Servlet,CGI Scripts,Java Object,数据库和FTP服务器…

【蓝图】p48冲刺、瞬移、多段跳

p48冲刺&#xff0c;瞬移&#xff0c;多段跳 p48冲刺&#xff0c;瞬移&#xff0c;多段跳冲刺功能实现瞬移功能实现Set Actor Location&#xff08;设置Actor位置&#xff09; 二段跳 p48冲刺&#xff0c;瞬移&#xff0c;多段跳 按shift加速&#xff0c;松开shift恢复普通速度…

swift - 如何在数组大小更改后刷新 ForEach 显示元素的数量(SwiftUI、Xcode 11 Beta 5)

我正在尝试实现一个 View &#xff0c;该 View 可以在内容数组的大小发生变化时更改显示项目的数量(由 ForEach 循环创建)&#xff0c;就像购物应用程序可能会在用户下拉刷新后更改其可用项目的数量一样 这是我到目前为止尝试过的一些代码。如果我没记错的话&#xff0c;这些适…

在word的文本框内使用Endnote引用文献,如何保证引文编号按照上下文排序

问题 如下图所示&#xff0c;我在word中插入了一个文本框&#xff08;为了插图&#xff09;&#xff0c;然后文本框内有引用&#xff0c;结果endnote自动将文本框内的引用优先排序&#xff0c;变成文献[1]了&#xff0c;而事实上应该是[31]。请问如何能让文本框内的排序也自动…

8.4 作业

1.思维导图 2.判断家目录下&#xff0c;普通文件的个数和目录文件的个数 #!/bin/bash count10 count20 cd ~ for i in $(ls) doif [ -f "$i" ]thencount1$((count11))elif [ -d "$i" ]then count2$((count21))fi done echo $count1 echo $count2 3.输入一…

Java阶段五Day17

Java阶段五Day17 文章目录 Java阶段五Day17师傅后台功能师傅审核列表相关功能启动进程和启动方式 后台审核详情查询查询审核详情流程远程调用图片服务 缓存逻辑缓存逻辑流程查询引入缓存流程完成缓存逻辑面试题整理 附录redis分布式——架构演变 师傅后台功能 师傅审核列表 相…

aws中opensearch 日志通(Centralized Logging with OpenSearch)2.0(一)

aws日志通2.0 实现全面的日志管理和分析功能 一体化日志摄取 &#xff1a;把aws服务器日志和应用日志传输到opensearch域中无代码日志处理 &#xff1a;在网页控制台中就可以实现数据处理开箱即用 &#xff1a;提供可视化模版&#xff08;nginx、HTTP server &#xff09; 架构…

【Docker】Docker容器化技术基础

Docker容器化技术 Docker&#xff08;软件跨环境迁移&#xff09;Docker概念&#xff1a;安装Dockerdocker架构配置Docker镜像加速器 一、Docker命令服务daemon相关的命令镜像相关命令Docker容器相关命令 二、Docker容器的数据卷数据卷概念配置数据卷配置数据卷容器 三、Docker…

苍穹外卖day10——订单状态定时处理(Spring Task)、来单提醒和客户催单(WebSocket)

预期效果 对于超时没处理的需要定时程序处理。基于SpringTask实现。 来单提醒和客户催单。基于WebSocket实现。 Spring Task 介绍 Cron表达式 周几通常不能和日一起指定。 cron表达式在线生成器 在线Cron表达式生成器 入门案例 创建定时任务类 /*** 定义定时任务类*/ Slf4j…

你知道HTTP与HTTPS有什么区别吗?

作者&#xff1a;Insist-- 个人主页&#xff1a;insist--个人主页 作者会持续更新网络知识和python基础知识&#xff0c;期待你的关注 目录 一、什么是HTTP&#xff1f; 二、什么是HTTPS&#xff1f; 三、HTTPS 的工作原理 1、客户端发起 HTTPS 请求 2、服务端的配置 3、…

为什么Java不支持多继承?

面试回答 因为如何要实现多继承&#xff0c;就会像C中一样&#xff0c;存在菱形继承的问题,C为了解决菱形继承问题&#xff0c;又引入了虚继承。因为支持多继承&#xff0c;引入了菱形继承问题&#xff0c;又因为要解决菱形继承问题&#xff0c;引入了虚继承。而经过分析&#…

docker【安装、存储、镜像、仓库、网络、监控】

docker-0110.0.0.51docker-0210.0.0.52docker-0310.0.0.53 【1】docker安装 docker-01 [rootdocker-01 ~]# vim /etc/yum.conf [main] cachedir/var/cache/yum/$basearch/$releasever keepcache1 debuglevel2 logfile/var/log/yum.log exactarch1 obsoletes1 gpgcheck1 plugin…

常见历史漏洞之Thinkphp

常见历史漏洞之Thinkphp 一、介绍二、Thinkphp历史漏洞三、Thinkphp特征发现四、批量漏洞检测五、漏洞总结六、5.0.23版本案例演示 一、介绍 Thinkphp是一种开源框架。是一个由国人开发的支持windows/Unix/Linux等服务器环境的轻量级PHP开发框架。很多cms就是基于thinkphp二次开…

论文阅读- Uncovering Coordinated Networks on Social Media:Methods and Case Studies

链接&#xff1a;https://arxiv.org/pdf/2001.05658.pdf 目录 摘要&#xff1a; 引言 Methods Case Study 1: Account Handle Sharing Coordination Detection 分析 Case Study 2: Image Coordination Coordination Detection Analysis Case Study 3: Hashtag Sequen…

【问题随记】

ubuntu 14.04源更新(sources.list) deb http://mirrors.aliyun.com/ubuntu/ trusty main restricted universe multiverse deb http://mirrors.aliyun.com/ubuntu/ trusty-security main restricted universe multiverse deb http://mirrors.aliyun.com/ubuntu/ trusty-update…

Mac 安装不在 Apple 商店授权的应用程序

文章目录 一、场景介绍二、实操说明 一、场景介绍 在日常的工作生活中&#xff0c;发现一些好用的应用程序&#xff0c;但是出于某些原因&#xff0c;应用程序的开发者并没有将安装包上架到苹果商店。 那么这些优秀的应用程序下载安装以后就会出现如下弹框被拒之门外 二、实操…