2.3 如何使用FlinkSQL读取写入到JDBC(MySQL)

1、JDBC SQL 连接器

FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据

添加Maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency>

注意:如果使用 sql-client客户端,需保证 flink-1.17.1/lib 目录下 存在相应的jar包

 相关jar可以通过官网下载:JDBC SQL 连接器 


2、读取 MySQL

FlinkSQL读取MySQL表时,为批式处理,在流式计算任务中,通常被做维表来使用

-- 在FlinkSQL中创建 MySQL Source 表
drop table mysql_source_table;
CREATE TABLE mysql_source_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01/flink','driver' = 'com.mysql.jdbc.Driver',  -- 【可选】不设置时,将自动从url中推导'username' = 'xxxx','password' = 'xxxx','table-name' = 'books'
);-- 批式 sql,查看 JDBC 表中的数据
select * from mysql_source_table;

运行结果:


3、写入MySQL

3.1 何时批量写入MySQL呢?

FlinkSQL往MySQL写入数据时,默认会在客户端缓存数据,当触发设置的阈值后,才会向服务端发送数据

开启checkpoint :

# TODO 开启checkpoint,当checkpoint后,会触发jdbc的flush操作
set execution.checkpointing.interval=300sec;

设置 flush 前缓存记录的最大值 、flush 间隔时间:

-- TODO 创建sink mysql table
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'xxxx','password' = 'xxxx','table-name' = 'books','sink.buffer-flush.max-rows' = '100', -- flush 前缓存记录的最大值,默认值为100,设置为0时,表示不缓存数据(来一条写入一条)'sink.buffer-flush.interval' = '50s' -- flush 间隔时间,超过该时间后异步线程将 flush 数据。默认为1s
);

使用说明:

FLinkSQL写入MySQL时,常通过 sink.buffer-flush.max-rows、sink.buffer-flush.interval 来控制写入数据的延迟程度

        当 对写入实时性要求较高时,可以将 sink.buffer-flush.max-rows = 0 ,表示到来一条数据后立即写入MySQL,但带来的后果是 长时间占有mysql连接

        当 数据量大且对实时要求不高时,可根据业务需求调大配置,可使实时行和性能最优


3.2 sink mysql table 中主键的作用

在FLinkSQL中创建sink mysql table时,如果表中定义了主键,则连接器将以 upsert 模式工作

否则连接器将以 append 模式工作

         upsert 模式:Flink 将根据主键判断插入新行或者更新已存在的行

                               使用这种模式时,确保MySQL中的底表定义主键和添加唯一性约束

       append 模式:对MySQL库中底表做insert操作

 upsert 模式:

-- TODO 创建MySQL 表
CREATE TABLE `books` (`id` int(11) NOT NULL,`title` varchar(99) DEFAULT NULL,`author` varchar(99) DEFAULT NULL,`price` double DEFAULT NULL,`qty` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT,PRIMARY KEY (id) NOT ENFORCED -- 指定主键字段
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = 'xxxx','table-name' = 'books','sink.buffer-flush.max-rows' = '0' -- 实时写入
);-- TODO 往 mysql中写入数据(相同key的数据写入后,会做upsert操作)
insert into mysql_sink_table
SELECT * FROM (VALUES(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);

append 模式:

-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = 'xxx','table-name' = 'books','sink.buffer-flush.max-rows' = '0' -- 实时写入
);-- TODO 往 mysql中写入数据(相同key的数据写入后,会做操作)
insert into mysql_sink_table
SELECT * FROM (VALUES(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);

注意:使用 append模式时,如果MySQL底表中存在主键或唯一性约束时,INSERT 插入可能会失败

insert into 失败:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/162206.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL双主一从高可用

MySQL双主一从高可用 文章目录 MySQL双主一从高可用环境说明1.配置前的准备工作2.配置yum源 1.在部署NFS服务2.安装主数据库的数据库服务&#xff0c;并挂载nfs3.初始化数据库4.配置两台master主机数据库5.配置m1和m2成为主数据库6.安装、配置keepalived7.安装部署从数据库8.测…

用 JHipster Azure Spring Apps 构建和部署 Spring 应用

作者&#xff1a;Hang Wang 排版&#xff1a;Alan Wang 此教程将向您展示如何使用 React、Spring Boot 和 PostgreSQL 在 Azure 上构建和部署全栈 Web 应用程序&#xff0c;由 JHipster Azure Spring Apps 提供支持。 JHipster Azure Spring Apps 为全栈 Web 开发提供了全面的…

虚拟现实VR技术在医疗行业的应用介绍

虚拟现实 (VR) 虽然经常与游戏联系在一起&#xff0c;但不可否认&#xff0c;未来科技少不了虚拟现实&#xff0c;其应用可以彻底改变许多行业。在医疗领域&#xff0c;无数人正在探索 VR 可以帮助患者和医疗从业者实现更好的治疗结果治疗方式&#xff0c;比如在手术、疼痛管理…

下拉选择器的树状结构图

类似&#xff1a;【Vue-Treeselect 和 vue3-treeselect】树形下拉框 一&#xff1a;图 二&#xff1a;如果有多层级的数据结构&#xff0c;可以用treeselect插件实现 1、安装&#xff1a; npm install --save riophae/vue-treeselect 2、实现&#xff1a; <el-form ref&qu…

[计算机入门] 应用软件介绍(娱乐类)

3.21 应用软件介绍(娱乐类) 3.21.1 音乐&#xff1a;酷狗 音乐软件是一类可以帮助人们播放、管理和发现音乐的应用程序。它们提供了丰富的音乐内容&#xff0c;用户可以通过搜索、分类浏览或个性化推荐等方式找到自己喜欢的歌曲、专辑或艺术家。音乐软件还通常支持创建和管理…

【学习笔记】RabbitMQ-5 消息的可靠性投递 以及示例代码

参考资料 RabbitMQ官方网站RabbitMQ官方文档噼咔噼咔-动力节点教程 文章目录 八、RabbitMQ的确认机制 -confirm8.1 Confirm 模式简介8.2 具体代码设置8.2.1 **设置思路**&#xff1a;8.2.2 **代码实现**8.2.2.1 开启生产者的确认模式.8.2.2.2 实现接口ComfirmCallback8.2.2.3 配…

Git 分布式版本控制工具

目录 1. 前言1.1 什么是Git1.2 使用Git能做什么 2. Git概述2.1 Git简介2.2 Git下载与安装 3. Git代码托管服务3.1 常用的Git代码托管服务3.2 码云代码托管服务3.2.1 注册码云账号3.2.2 登录码云3.2.3 创建远程仓库3.2.4 邀请其他用户成为仓库成员 4. Git常用命令4.1 Git全局设置…

Flume 整合 Kafka

1.背景 先说一下,为什么要使用 Flume + Kafka? 以实时流处理项目为例,由于采集的数据量可能存在峰值和峰谷,假设是一个电商项目,那么峰值通常出现在秒杀时,这时如果直接将 Flume 聚合后的数据输入到 Storm 等分布式计算框架中,可能就会超过集群的处理能力,这时采用 Kaf…

品牌创意二维码营销活动:MoneyLion 在纽约全城“撒钱”,月增百万级曝光!

在2023年4月——金融知识月&#xff0c;MoneyLion 在纽约策划了一场轰动全城的“撒钱”活动&#xff01; 在开始介绍这场极具创意的活动之前&#xff0c;我们先来了解一下MoneyLion这家公司。MoneyLion 是一家私营金融科技公司&#xff0c;为消费者提供贷款、财务咨询和投资服…

Apache Doris (四十二): RECOVER数据删除恢复

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录

HTML 常用标签及练习

常用标签 <head>中的标签 概述 head中的内容不显示到页面上 标签说明<title>定义网页的标题<meta>定义网页的基本信息&#xff08;供搜索引擎&#xff09;<style>定义CSS样式<link>链接外部CSS文件或脚本文件<script>定义脚本语言<…

源码编译安装部署lnmp

源码编译安装部署lnmp 文章目录 源码编译安装部署lnmp1.简介&#xff1a;2.环境说明&#xff1a;3.部署前的准备工作4.安装nginx4.1.进入官网拉取nginx源码包4.2.通过IP地址访问nginx的web页面 5.安装mysql5.1.安装依赖包5.2.创建用户和组5.3.下载源码包并解压到/usr/local/5.4…

Helm upgrade 时 no matches for kind “xxxx“ in version “xxxx“ 问题处理

1. 问题 kubernetes 升过级&#xff0c;但是 helm release 旧版本中有新版本 api 弃用的 version。 在 helm upgrade 时就出现类似如下版本不匹配的错误&#xff0c;导致 helm upgrade 失败。 当然 helm uninstall 再重新安装可能可以跳过这个问题&#xff08;只要 charts 不再…

移动端1px-从基本原理到开源解决方案介绍

1px 不够准确&#xff0c;应该说成 1 物理像素 为什么有 1px 这个问题&#xff1f;实现 1px 有哪些方法&#xff1f;这些方法分别有哪些优缺点&#xff1f;开源项目中使用的哪些解决方案&#xff1f;如何在项目中处理 1px 的相关问题&#xff1f; 基本概念 首先&#xff0c;我们…

给cmd控制台程序 套壳 美化

给cmd控制台程序套壳美化&#xff0c;可以获取程序的标准输出和报错信息。 # _*_ coding: utf-8 _*_ """ 控制台程序启动器&#xff0c;杜绝黑窗口。 Time: 2023/10/18 15:28 Author: Jyun Version: V 0.1 File: main.py Blog: https://ctrlcv.…

Python中Numpy的应用技巧

目录 1. 什么是 NumPy?2. NumPy 中的数组2.1. 创建数组2.2. 用Numpy的数据2.2.1. OpenCV2.2.2. Pandas 3. 数学计算3.1. 四则计算3.1.1. 矩阵乘法3.1.2. 点乘 3.2. 求逆矩阵与伪逆矩阵3.3. 判断矩阵相等3.4. np.eye()函数生成对角阵 4. 统计4.1. 最大值、最小值、均值条件4.2.…

在 Python 中如何向列表或数组添加元素

在这篇文章中&#xff0c;你将了解 Python 中的 .append() 方法。你还会看到 .append() 与其他用于向列表添加元素的方法有什么不同。 让我们开始吧&#xff01; Python 中的列表是什么&#xff1f;给初学者的定义 编程中的数组是一个有序的项目集合&#xff0c;所有的项目都…

【Linux-常用命令-基础命令-解压rar文件-unrar-x-命令-笔记】

【Linux-常用命令-基础命令-解压rar文件-unrar-x-命令-笔记】 1、前言2、操作3、自己的操作 1、前言 最近&#xff0c;在使用Linux的时&#xff0c;使用相关基础命令是&#xff0c;总是容易忘记&#xff0c;上网一搜&#xff0c;大部分都写的比较繁琐&#xff0c;解压不同文件…

XCode15与iOS17/17.1 真机测试问题处理

XCode15与iOS17/17.1 真机测试问题处理&#xff0c;网上相关博客很多&#xff0c;摘录了如下实践后能起作用的地址如下&#xff1a;Xcode 15 报错处理 - 简书iOS17版本适配-CSDN博客 Xcode15适配-六虎 主要介绍下&#xff1a;Assertion failure in void _UIGraphicsBeginImag…

Linux编译内核添加Bcache模块

由于Bcache是在linux kernel 3.10之后才加入的&#xff0c;所以要使用Bcache&#xff0c;首先必须确保内核版本至少是3.10或及以上&#xff0c;可以使用uname -a查看内核版本 [rootceph01 ~]# uname -a Linux ceph01 4.18.0-305.3.1.el8.x86_64 #1 SMP Tue Jun 1 16:14:33 UTC…