Flink将数据写入MySQL(JDBC)

一、写在前面

在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。

二、代码示例

2.1 版本说明

        <flink.version>1.14.6</flink.version><spark.version>2.4.3</spark.version><hadoop.version>2.8.5</hadoop.version><hbase.version>1.4.9</hbase.version><hive.version>2.3.5</hive.version><java.version>1.8</java.version><scala.version>2.11.8</scala.version><mysql.version>8.0.22</mysql.version><scala.binary.version>2.11</scala.binary.version>

2.2 导入相关依赖

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version>
</dependency>
<!--mysql连接器依赖-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.22</version>
</dependency>

2.3 连接数据库,创建表

mysql> CREATE TABLE `ws` ( `id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8

2.4 创建POJO类

package com.flink.POJOs;import java.util.Objects;/*** TODO POJO类的特点* 类是公有(public)的* 有一个无参的构造方法* 所有属性都是公有(public)的* 所有属性的类型都是可以序列化的*/
public class WaterSensor {//类的公共属性public String id;public Long ts;public Integer vc;//无参构造方法public WaterSensor() {//System.out.println("调用了无参数的构造方法");}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}//生成get和set方法public void setId(String id) {this.id = id;}public void setTs(Long ts) {this.ts = ts;}public void setVc(Integer vc) {this.vc = vc;}public String getId() {return id;}public Long getTs() {return ts;}public Integer getVc() {return vc;}//重写toString方法@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}//重写equals和hasCode方法@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;WaterSensor that = (WaterSensor) o;return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}
//scala的case类?

2.5 自定义map函数

package com.flink.POJOs;import org.apache.flink.api.common.functions.MapFunction;public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}
}

2.5 Flink2MySQL

package com.flink.DataStream.Sink;import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** Flink 输出到 MySQL(JDBC)*/
public class flinkSinkJdbc {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);//TODO SourceDataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO TransferSingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction());/**TODO 写入 mysql* 1、只能用老的 sink 写法* 2、JDBCSink 的 4 个参数:*   第一个参数: 执行的 sql,一般就是 insert into*   第二个参数: 预编译 sql, 对占位符填充值*   第三个参数: 执行选项 ---->攒批、重试*   第四个参数: 连接选项---->url、用户名、密码*/SinkFunction<WaterSensor> sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");}}, JdbcExecutionOptions.builder().withMaxRetries(3)         // 重试次数.withBatchSize(100)        // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("********").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());//TODO 写入到MysqlwaterSensorSingleOutputStreamOperator.addSink(sinkFunction);streamExecutionEnvironment.execute();}
}

2.6 启动necat、Flink,观察数据库写入情况

nc -lk 9999 #启动necat、并监听8888端口,写入数据

在这里插入图片描述
启动Flink程序
在这里插入图片描述
查看数据库写入是否正常
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/172699.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DBSCAN算法c++实现

首先计算出距离每个点e以内的点&#xff0c;如果个数>minPts,则找出它的直接密度可达和间接可达的点&#xff0c;用visited标记点是否已经在簇中&#xff0c;循环直到最后一个点。 #include <fstream> #include <vector> #include <iostream> #include &…

广州华锐互动:VR技术应用到工程项目施工安全培训的好处

随着科技的飞速发展&#xff0c;虚拟现实(VR)技术已经深入到各个领域。在建筑施工领域&#xff0c;VR技术的应用为工程项目施工安全培训带来了许多好处。本文将探讨VR技术在这方面的优势和应用。 首先&#xff0c;VR技术能够提供沉浸式的安全培训体验。通过VR设备&#xff0c;学…

x210项目重新回顾之十七升级到linux4.19.114 +buildroot2018再讨论

代码参考https://github.com/colourfate/x210_bsp/ 他的是linux_4.10(dtb为 s5pv210-x210..dtb)我打算用linux4.19.114(dtb为 s5pv210-smdkv210.dtb) &#xff0c;所以修改build.sh ------------------------------------------------------------------------------ 5 M…

【开源】基于SpringBoot的高校学院网站的设计和实现

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 学院院系模块2.2 竞赛报名模块2.3 教育教学模块2.4 招生就业模块2.5 实时信息模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 学院院系表3.2.2 竞赛报名表3.2.3 教育教学表3.2.4 招生就业表3.2.5 实时信息表 四、系…

python之计算平面点集的的面积

在当今数据驱动的世界中&#xff0c;计算平面点集的最小外接轮廓面积被广泛应用于各种实际场景中。它是一项重要而魅力十足的任务&#xff0c;旨在找到一个最小的矩形或多边形区域&#xff0c;能够完全包围给定的离散点集。这个看似简单的问题背后隐藏着许多挑战&#xff0c;需…

python爬虫之feapder.AirSpider轻量爬虫案例:豆瓣

创建feaderSpider项目&#xff1a;feapder create -p feapderSpider&#xff0c;已创建可忽略进入feapderSpider目录&#xff1a;cd .\ feapderSpider\spiders创建爬虫&#xff1a;feapder create -s airSpiderDouban&#xff0c;选择AirSpider爬虫模板&#xff0c;可跳过1、2直…

KMS在腾讯云的微服务实践助力其降本50%

背景介绍 KMS 是一家日本的游戏公司&#xff0c;主要经营游戏业务、数字漫画业务、广告业务、云解决方案业务等&#xff0c;出品了多款在日本畅销的漫画风游戏&#xff0c;同时有网络漫画专业厂牌&#xff0c;以内容创作为目标&#xff0c;拥有原创 IP 创作、游戏开发等多元化发…

bitlocker 加密锁定的固态硬盘,更换到别的电脑上,怎么把原密钥写进新电脑TPM芯片内,开启无需手动填密钥

环境: Win11 专业版 联想E14笔记本 512G ssd 问题描述: 一台笔记本因充电故障,需要拿去维修,不想重装系统,将bitlocker 加密锁定的固态硬盘拆下更换到别的笔记本电脑上,现在开机要手动填密钥,怎么把原密钥写进新电脑TPM芯片内,开启无需手动填密钥和之前那台电脑一…

DevOps与CI/CD的最佳实践

在当今的软件开发领域&#xff0c;DevOps&#xff08;开发与运维的结合&#xff09;和CI/CD&#xff08;持续集成/持续交付&#xff09;已经成为了不可或缺的一部分。它们不仅提高了软件开发的效率&#xff0c;还帮助团队更快地交付高质量的软件。本文将深入探讨DevOps文化和CI…

iOS Xcode15 适配:Other Linker Flags:-ld_classic

0x00 适配是一条没有尽头的路 Xcode 14 毛问题都没有&#xff0c;Xcode 15 崩溃 看图说话 0x01 解决方案 Other Linker Flags 添加 -ld_classic 即可 0x02 我的小作品 欢迎体验我的作品之一&#xff1a;小挑战-XGame 拼图游戏&#xff0c;渐变色游戏&#xff0c;经典24点游…

List 3.5 详解原码、反码、补码

前言 欢迎来到我的博客&#xff0c;我是雨空集&#xff08;全网同名&#xff09;&#xff0c;无论你是无意中发现我&#xff0c;还是有意搜索而来&#xff0c;我都感到荣幸。这里是一个分享知识、交流想法的平台&#xff0c;我希望我的博客能给你带来帮助和启发。如果你喜欢我…

Ubuntu ARMv8编译Qt源码以及QtCreator

最近需要在NVIDIA小盒子上面跑一个程序&#xff0c;一开始想着在Ubuntu x64下交叉编译一版&#xff0c;后来发现libqxcb.so 这个库在configure时就会一直报错&#xff0c;多方查找怀疑可能是由于硬件不支持在x64环境下编译AMR架构的xcb库。 所以最后在ARM下直接编译Qt源码了&am…

word页脚设置,页脚显示第几页共有几页设置步骤

word页脚设置&#xff0c;页脚显示第几页共有几页设置步骤&#xff1a; 具体步骤&#xff1a; 步骤1&#xff1a; 步骤1.1选择页脚---空白页脚 步骤1.2&#xff0c;在"[在此处键入]"&#xff0c;直接输入你需要的格式&#xff0c;如 “第页/共页” 步骤1.3选择第“…

数据分析和互联网医院小程序:提高医疗决策的准确性和效率

互联网医院小程序已经在医疗领域取得了显著的进展&#xff0c;为患者和医疗从业者提供了更便捷和高效的医疗服务。随着数据分析技术的快速发展&#xff0c;互联网医院小程序能够利用大数据来提高医疗决策的准确性和效率。本文将探讨数据分析在互联网医院小程序中的应用&#xf…

Vue图片路径问题(动态引入)

vue项目中我们经常会遇到动态路径的图片无法显示的问题&#xff0c;以下是静态路径和动态路径的常见使用方法。 1.静态路径 在日常的开发中&#xff0c;图片的静态路径通过相对路径和绝对路径的方式引入。 相对路径&#xff1a;以.开头的&#xff0c;例如./、../之类的。就是…

pytorch笔记:TRIPLETMARGINLOSS

1 介绍 创建一个衡量三元组损失的标准&#xff0c;给定输入张量 x1​、x2​ 和 x3​ 以及一个大于0的间距值。这用于测量样本之间的相对相似性。一个三元组由a、p和n组成&#xff08;锚点、正例和负例&#xff09;。所有输入张量的形状都应为 (N,D) 2 基本使用方法 torch.nn.…

iPhone手机屏幕分辨率

ios app测试时&#xff0c;需要测试应用在不同型号的苹果手机上的表现形式&#xff0c;可以自己在浏览器上配置。 代数设备逻辑像素尺寸缩放发布时间第一代iPhone 2G320 x 480480 x 3203.5寸1x2007年6月29日第二代iPhone 3320 x 480480 x 3203.5寸1x2008年7月11日第三代iPhone …

前端 :用HTML和css制作一个小米官网的静态页面

1.HTML&#xff1a; <body><div id "content"><div id "box"><div id "top"><div id "top-left"><span id "logo">MI</span><span id "text-logo">小米账…

机器视觉3D项目评估的基本要素及测量案例分析

目录 一. 检测需求确认 1、产品名称&#xff1a;【了解是什么产品上的零件&#xff0c;功能是什么】 2、*产品尺寸&#xff1a;【最大兼容尺寸】 3、*测量项目&#xff1a;【确认清楚测量点位】 4、*精度要求&#xff1a;【若客户提出的精度值过大或者过小&#xff0c;可以和客…

【API篇】十、生成Flink水位线

文章目录 1、水位线的生成原则2、有序流内置水位线3、乱序流内置水位线4、自定义周期性水位线生成器5、自定义断点式水位线生成器6、从数据源中发送水位线 1、水位线的生成原则 水位线出现&#xff0c;即代表这个时间之前的数据已经全部到齐&#xff0c;之后不会再出现之前的数…