Flink实现数据写入MySQL

 先准备一个文件里面数据有:

a, 1547718199, 1000000
b, 1547718200, 1000000
c, 1547718201, 1000000
d, 1547718202, 1000000
e, 1547718203, 1000000
f, 1547718204, 1000000
g, 1547718205, 1000000
h, 1547718210, 1000000
i, 1547718210, 1000000
j, 1547718210, 1000000

 scala代码:

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._case class SensorReading(name: String, timestamp: Long, salary: Double)object a1 {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//数据源val dataStream: DataStream[String] = env.readTextFile("D:\\wlf.备份24.1.3\\wlf\\ideaProgram\\bbbbbb\\src\\main\\resources\\salary.txt")val stream = dataStream.map(data => {val splited = data.split(",")SensorReading(splited(0), splited(1).trim.toLong, splited(2).trim.toDouble)})stream.addSink( new JDBCSink() )env.execute("  job")}}class JDBCSink() extends RichSinkFunction[SensorReading]{// 定义sql连接、预编译器var conn: Connection = _var insertStmt: PreparedStatement = _var updateStmt: PreparedStatement = _// 初始化,创建连接和预编译语句override def open(parameters: Configuration): Unit = {super.open(parameters)conn = DriverManager.getConnection("jdbc:mysql://bigdata1:3306/flink?serverTimezone=UTC", "root", "123456")insertStmt = conn.prepareStatement("INSERT INTO salary_table (name, salary) VALUES (?,?)")updateStmt = conn.prepareStatement("UPDATE salary_table SET salary = ? WHERE name = ?")}override def invoke(value: SensorReading): Unit = {// 执行更新语句updateStmt.setString(1, value.name)updateStmt.setDouble(2, value.salary)updateStmt.execute()// 如果update没有查到数据,那么执行插入语句if( updateStmt.getUpdateCount == 0 ){insertStmt.setString(1, value.name)insertStmt.setDouble(2, value.salary)insertStmt.execute()}}// 关闭时做清理工作override def close(): Unit = {insertStmt.close()updateStmt.close()conn.close()}
}

MySQL中查看表 :

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/246413.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于51单片机智能电子秤

实物显示效果: https://www.bilibili.com/video/BV1Wb4y1A7Aw/?vd_source6ff7cd03af95cd504b60511ef9373a1d 功能介绍: (1)用键盘设计单价; (2)称重后同时显示该物品的重量、单价和总额&…

记签名机制

签名过程: 首先将数据源通过摘要算法获取到数字摘要 对数字摘要用私钥进行加密得到签名 将原始消息 以及签名发送给消息接收方 接收方用公钥解密得到数字摘要 用同样的摘要算法将原始消息进行计算 比较得到的数字摘要与解密后的是否一致 Android学习笔记——Androi…

【精选推荐】3款强大的API渗透测试工具

1免责声明 请勿利用文章内的相关技术从事非法测试,由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,作者不为此承担任何责任。工具来自网络,安全性自测。 2前言 给大家介绍三款优秀的…

智能GPT图书管理系统(SpringBoot2+Vue2)、接入GPT接口,支持AI智能图书馆

☀️技术栈介绍 ☃️前端主要技术栈 技术作用版本Vue提供前端交互2.6.14Vue-Router路由式编程导航3.5.1Element-UI模块组件库,绘制界面2.4.5Axios发送ajax请求给后端请求数据1.2.1core-js兼容性更强,浏览器适配3.8.3swiper轮播图插件(快速实…

VR拍摄+制作

1.VR制作需要的图片宽高是2:1,需要360✖️180的图片,拍摄设备主要有两种: 1)通过鱼眼相机拍摄,拍摄一组图片,然后通过PTGui来合成(拍摄复杂) 2)全景相机,一键拍摄直接就能合成需要的…

C# Graphics对象学习

Graphics对象用于进行绘制; 从哪个对象获取的Graphics,然后进行绘制,就绘制到该对象上; 从位图获取Graphics,然后进行绘制,绘制到该位图上; 从某个控件获取Graphics,然后绘制&…

智慧文旅:提升旅游体验与推动经济发展的新动力

一、智慧文旅的定义与意义 智慧文旅,即智慧文化旅游,是一种以当地特色文化元素为核心驱动,利用现代科技手段实现旅游景区全面智慧升级的旅游模式。其意义在于为游客提供高效便捷的旅游信息化服务,提升旅游体验,同时推…

蓝桥杯备战——6.串口通讯

1.分析原理图 由上图我们可以看到串口1通过CH340接到了USB口上,通过串口1我们就能跟电脑进行数据交互。 另外需要注意的是STC15F是有两组高速串口的,而且可以切换端口。 2.配置串口 由于比赛时间紧,我们最好不要去现场查寄存器手册&#x…

Unity应用在车机上启动有概率黑屏的解决方案

问题描述 最近将游戏适配到车机上(Android系统),碰到了一个严重bug,启动的时候有概率会遇到黑屏,表现就是全黑,无法进入Unity的场景。 经过查看LogCat日志,也没有任何报错,也没有任…

Python网络爬虫分步走之 – 第一步:什么是网络爬虫?

Python网络爬虫分步走之第一步:什么是网络爬虫? Web Scraping in Python Step by Step – 1st Step, What is Web Crawler? By JacksonML 1. 什么是网络爬虫? 在能够使用Google搜索引擎的场合,你是否尝试过简单搜索&#xff…

经典目标检测YOLO系列(三)YOLOV3的复现(1)总体网络架构及前向处理过程

经典目标检测YOLO系列(三)YOLOV3的复现(1)总体网络架构及前向处理过程 和之前实现的YOLOv2一样,根据《YOLO目标检测》(ISBN:9787115627094)一书,在不脱离YOLOv3的大部分核心理念的前提下,重构一款较新的YOLOv3检测器,来对YOLOv3有…

Go 命令行解析 flag 包之快速上手

本篇文章是 Go 标准库 flag 包的快速上手篇。 概述 开发一个命令行工具,视复杂程度,一般要选择一个合适的命令行解析库,简单的需求用 Go 标准库 flag 就够了,flag 的使用非常简单。 当然,除了标准库 flag 外&#x…

vue3预览pdf文件的几种方法

文章目录 vue3预览pdf集中方法方法一:方法二:展示效果:需要包依赖:代码: 方法三:展示效果:需要包依赖:代码:自己调参数,选择符合自己的 vue3预览pdf集中方法 …

蓝桥杯备战——7.DS18B20温度传感器

1.分析原理图 通过上图我们可以看到DS18B20通过单总线接到了单片机的P14上。 2.查阅DS18B20使用手册 比赛的时候是会提供DS18B20单总线通讯协议的代码,但是没有提供读取温度数据的代码,所以还是需要我们去查看手册,我只把重要部分截下来了 …

一款强大的矢量图形设计软件:Adobe Illustrator 2023 (AI2023)软件介绍

Adobe Illustrator 2023 (AI2023) 是一款强大的矢量图形设计软件,为设计师提供了无限创意和畅行无阻的设计体验。AI2023具备丰富的功能和工具,让用户可以轻松创建精美的矢量图形、插图、徽标和其他设计作品。 AI2023在界面和用户体验方面进行了全面升级…

基于LLaMA-Factory的微调记录

文章目录 数据模型准备基于网页的简单微调基于网页的简单评测基于网页的简单聊天 LLaMA-Factory是一个非常好用的无代码微调框架,不管是在模型、微调方式还是参数设置上都提供了非常完备的支持,下面是对微调全过程的一个记录。 数据模型准备 微调时一般…

Dockerfile里ADD * 保留原来的目录结构

1、问题 给新模块写Dockerfile,很多静态资源分散在各个目录,于是Dockerfile里我直接一句: ADD ./* /dest/镜像出来后,启动容器,进入容器种后发现:文件拷贝成功,但原来的目录结构都不在了&…

SAP EXCEL上传如何实现指定读取某一个sheet页(ALSM_EXCEL_TO_INTERNAL_TABLE)

如何读取指定的EXCEL sheet 页签,比如要读取下图中第二个输出sheet页签 具体实现方法如下: 拷贝标准的函数ALSM_EXCEL_TO_INTERNAL_TABLE封装成一个自定义函数ZCALSM_EXCEL_TO_INTERNAL_TABLE 在自定义函数导入参数页签新增一个参数SHEET_NAME 在源代码…

云原生离线工作流编排利器 -- 分布式工作流 Argo 集群

作者:庄宇 在现代的软件开发和数据处理领域,批处理作业(Batch)扮演着重要的角色。它们通常用于数据处理,仿真计算,科学计算等领域,往往需要大规模的计算资源。随着云计算的兴起,阿里…

外包干了一个月,技术退步明显。。。。。

先说一下自己的情况,本科生,19年通过校招进入南京某软件公司,干了接近4年的功能测试,今年年初,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…