使用Flink实现MySQL到Kafka的数据流转换

使用Flink实现MySQL到Kafka的数据流转换

本篇博客将介绍如何使用Flink将数据从MySQL数据库实时传输到Kafka,这是一个常见的用例,适用于需要实时数据connector的场景。
在这里插入图片描述

环境准备

在开始之前,确保你的环境中已经安装了以下软件:
Apache Flink 准备相关pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>EastMoney</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency></dependencies></project>

MySQL数据库,初始化mysql表

CREATE TABLE `t_stock_code_price` (`id` bigint NOT NULL AUTO_INCREMENT,`code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',`name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',`close` double DEFAULT NULL COMMENT '最新价',`change_percent` double DEFAULT NULL COMMENT '涨跌幅',`change` double DEFAULT NULL COMMENT '涨跌额',`volume` double DEFAULT NULL COMMENT '成交量(手)',`amount` double DEFAULT NULL COMMENT '成交额',`amplitude` double DEFAULT NULL COMMENT '振幅',`turnover_rate` double DEFAULT NULL COMMENT '换手率',`peration` double DEFAULT NULL COMMENT '市盈率',`volume_rate` double DEFAULT NULL COMMENT '量比',`hign` double DEFAULT NULL COMMENT '最高',`low` double DEFAULT NULL COMMENT '最低',`open` double DEFAULT NULL COMMENT '今开',`previous_close` double DEFAULT NULL COMMENT '昨收',`pb` double DEFAULT NULL COMMENT '市净率',`create_time` varchar(64) NOT NULL COMMENT '写入时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

Kafka消息队列

1. 启动zookeeperzkServer start
2. 启动kafka服务kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建topickafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
4. 消费数据kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic east_money --from-beginning

步骤解释

获取流执行环境:首先,我们通过StreamExecutionEnvironment.getExecutionEnvironment获取Flink的流执行环境,并设置其运行模式为流处理模式。

创建流表环境:接着,我们通过StreamTableEnvironment.create创建一个流表环境,这个环境允许我们使用SQL语句来操作数据流。

val senv = StreamExecutionEnvironment.getExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(senv)

定义MySQL数据源表:我们使用一个SQL语句创建了一个临时表t_stock_code_price,这个表代表了我们要从MySQL读取的数据结构和连接信息。

val source_table ="""|CREATE TEMPORARY TABLE t_stock_code_price (|  id BIGINT NOT NULL,|  code STRING NOT NULL,|  name STRING NOT NULL,|  `close` DOUBLE,|  change_percent DOUBLE,|  change DOUBLE,|  volume DOUBLE,|  amount DOUBLE,|  amplitude DOUBLE,|  turnover_rate DOUBLE,|  peration DOUBLE,|  volume_rate DOUBLE,|  hign DOUBLE,|  low DOUBLE,|  `open` DOUBLE,|  previous_close DOUBLE,|  pb DOUBLE,|  create_time STRING NOT NULL,|  PRIMARY KEY (id) NOT ENFORCED|) WITH (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://localhost:3306/mydb',|   'driver' = 'com.mysql.cj.jdbc.Driver',|   'table-name' = 't_stock_code_price',|   'username' = 'root',|   'password' = '12345678'|)|""".stripMargintEnv.executeSql(source_table)

定义Kafka目标表:然后,我们定义了一个Kafka表re_stock_code_price_kafka,指定了Kafka的连接参数和表结构。

tEnv.executeSql("CREATE TABLE re_stock_code_price_kafka (" +"`id` BIGINT," +"`code` STRING," +"`name` STRING," +"`close` DOUBLE," +"`change_percent` DOUBLE," +"`change` DOUBLE," +"`volume` DOUBLE," +"`amount` DOUBLE," +"`amplitude` DOUBLE," +"`turnover_rate` DOUBLE," +"`operation` DOUBLE," +"`volume_rate` DOUBLE," +"`high` DOUBLE," +"`low` DOUBLE," +"`open` DOUBLE," +"`previous_close` DOUBLE," +"`pb` DOUBLE," +"`create_time` STRING," +"rise int"+") WITH (" +"'connector' = 'kafka'," +"'topic' = 'east_money'," +"'properties.bootstrap.servers' = '127.0.0.1:9092'," +"'properties.group.id' = 'mysql2kafka'," +"'scan.startup.mode' = 'earliest-offset'," +"'format' = 'csv'," +"'csv.field-delimiter' = ','" +")")

数据转换和写入:最后,我们执行了一个插入操作,将从MySQL读取的数据转换(这里通过case when语句添加了一个新字段rise)并写入到Kafka中。这个可以实现任何的sql etl 来满足我们的需求。

    tEnv.executeSql("insert into re_stock_code_price_kafka select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")

全部代码

package org.eastimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject Mysql2Kafka {def main(args: Array[String]): Unit = {val senv = StreamExecutionEnvironment.getExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(senv)val source_table ="""|CREATE TEMPORARY TABLE t_stock_code_price (|  id BIGINT NOT NULL,|  code STRING NOT NULL,|  name STRING NOT NULL,|  `close` DOUBLE,|  change_percent DOUBLE,|  change DOUBLE,|  volume DOUBLE,|  amount DOUBLE,|  amplitude DOUBLE,|  turnover_rate DOUBLE,|  peration DOUBLE,|  volume_rate DOUBLE,|  hign DOUBLE,|  low DOUBLE,|  `open` DOUBLE,|  previous_close DOUBLE,|  pb DOUBLE,|  create_time STRING NOT NULL,|  PRIMARY KEY (id) NOT ENFORCED|) WITH (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://localhost:3306/mydb',|   'driver' = 'com.mysql.cj.jdbc.Driver',|   'table-name' = 't_stock_code_price',|   'username' = 'root',|   'password' = '12345678'|)|""".stripMargintEnv.executeSql(source_table)val result = tEnv.executeSql("select * from t_stock_code_price")result.print()tEnv.executeSql("CREATE TABLE re_stock_code_price_kafka (" +"`id` BIGINT," +"`code` STRING," +"`name` STRING," +"`close` DOUBLE," +"`change_percent` DOUBLE," +"`change` DOUBLE," +"`volume` DOUBLE," +"`amount` DOUBLE," +"`amplitude` DOUBLE," +"`turnover_rate` DOUBLE," +"`operation` DOUBLE," +"`volume_rate` DOUBLE," +"`high` DOUBLE," +"`low` DOUBLE," +"`open` DOUBLE," +"`previous_close` DOUBLE," +"`pb` DOUBLE," +"`create_time` STRING," +"rise int"+") WITH (" +"'connector' = 'kafka'," +"'topic' = 'east_money'," +"'properties.bootstrap.servers' = '127.0.0.1:9092'," +"'properties.group.id' = 'mysql2kafka'," +"'scan.startup.mode' = 'earliest-offset'," +"'format' = 'csv'," +"'csv.field-delimiter' = ','" +")")tEnv.executeSql("insert into re_stock_code_price_kafka select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")}
}

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/292029.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux课程____shell脚本应用

:一、认识shell 常用解释器 Bash , ksh , csh 登陆后默认使用shell&#xff0c;一般为/bin/bash&#xff0c;不同的指令&#xff0c;运行的环境也不同 二、 编写简单脚本并使用 # vim /frist.sh //编写脚本文件&#xff0c;简单内容 #&#xff01;/bin/bash …

Astro 宣布:将超过 500 多个测试从 Mocha 迁移到了 Node.js

近期&#xff0c;Astro 在其官方博客中宣布&#xff0c;虽然我们对 Mocha 感到满意&#xff0c;但也在寻求让我们的 CI 作业更快的方法。最终将超过 500 多个测试从 Mocha 迁移到了 Node.js。 先了解下 Astro 是什么&#xff1f;Astro 是适合构建像博客、营销网站、电子商务网站…

简单了解策略模式

什么是策略模式&#xff1f; 策略模式提供生成某一种产品的不同方式 Strategy策略类定义了某个各种算法的公共方法&#xff0c;不同的算法类通过继承Strategy策略类&#xff0c;实现自己的算法 Context的作用是减少客户端和Strategy策略类之间的耦合&#xff0c;客户端只需要…

基于单片机温湿度PM2.5报警设置系统

**单片机设计介绍&#xff0c;基于单片机温湿度PM2.5报警设置系统 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机温湿度PM2.5报警设置系统概要主要涵盖了系统的整体设计思路、硬件组成、软件实现以及报警功能等关键方…

在Arduino IDE中使用文件夹组织源文件和头文件

在Arduino IDE中使用文件夹组织源文件和头文件 如果你是一名Arduino爱好者&#xff0c;你可能会发现随着项目的复杂度增加&#xff0c;代码的管理变得越来越困难。在Arduino IDE中&#xff0c;你可以通过使用文件夹来更好地组织你的源文件和头文件&#xff0c;使得代码更加清晰…

腾讯云2核2G服务器优惠价格,61元一年

腾讯云2核2G服务器多少钱一年&#xff1f;轻量服务器61元一年&#xff0c;CVM 2核2G S5服务器313.2元15个月&#xff0c;轻量2核2G3M带宽、40系统盘&#xff0c;云服务器CVM S5实例是2核2G、50G系统盘。腾讯云2核2G服务器优惠活动 txybk.com/go/txy 链接打开如下图&#xff1a;…

深入理解React的setState机制

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

前端工程师————HTML5学习

HTML5基础 开发工具很多&#xff0c;其中Hbulider较好用&#xff0c;下载网址如下&#xff1a; DCloud - HBuilder、HBuilderX、uni-app、uniapp、5、5plus、mui、wap2app、流应用、HTML5、小程序开发、跨平台App、多端框架 html表示整个页面 head表示搜素框 body表示内容 ti…

【第十二届“泰迪杯”数据挖掘挑战赛】【2024泰迪杯】B题基于多模态特征融合的图像文本检索—解题全流程(论文更新)

【第十二届“泰迪杯”数据挖掘挑战赛】【2024泰迪杯】B题基于多模态特征融合的图像文本检索更新&#xff08;论文更新&#xff09; ​ 本节主要更新了论文、训练日志的log数据提取&#xff08;Loss、ACC、RK&#xff09;等数据可视化作图的代码 B题交流QQ群&#xff1a; 4583…

rabbitMQ的基础操作与可视化界面

当你安装好RabbitMq时&#xff0c;可以 尝试一下&#xff0c;这些命令 启动rabbitMQ服务 #启动服务 systemctl start rabbitmq-server #查看服务状态 systemctl status rabbitmq-server #停止服务 systemctl stop rabbitmq-server #开机启动服务 systemctl enable rabbitmq-…

09_Web组件

文章目录 Web组件Listener监听器ServletContextListener执行过程 Filter过滤器Filter与Servlet的执行 案例&#xff08;登录案例&#xff09; 小结Web组件 Web组件 JavaEE的Web组件&#xff08;三大Web组件&#xff09;&#xff1a; Servlet → 处理请求对应的业务Listener →…

RVM安装ruby笔记

环境 硬件&#xff1a;Macbook Pro 系统&#xff1a;macOS 14.1 安装公钥 通过gpg安装公钥失败&#xff0c;报错如下&#xff1a; 换了几个公钥地址&#xff08;hkp://subkeys.pgp.net&#xff0c;hkp://keys.gnupg.net&#xff0c;hkp://pgp.mit.edu&#xff09;&#xff0c;…

ML-Decoder: Scalable and Versatile Classification Head

1、引言 论文链接&#xff1a;https://openaccess.thecvf.com/content/WACV2023/papers/Ridnik_ML-Decoder_Scalable_and_Versatile_Classification_Head_WACV_2023_paper.pdf 因为 transformer 解码器分类头[1] 在少类别多标签分类数据集上表现得很好&#xff0c;但由于其查询…

css3之动画animation

动画animation 一.优点二.定义和使用三.动画序列和解释四.常见属性及解释五.简写&#xff08;名字和时间不能省略&#xff09;&#xff08;持续时间在何时开始的时间前&#xff09;&#xff08;简写中无animation-play-state)六.例子1.大数据热点图2.奔跑的熊大&#xff08;一个…

设计模式6--抽象工厂模式

定义 案例一 案例二 优缺点

代码随想录-二叉树(路径)

目录 257. 二叉树的所有路径 题目描述&#xff1a; 输入输出描述&#xff1a; 思路和想法&#xff1a; 404. 左叶子之和 题目描述&#xff1a; 输入输出描述&#xff1a; 思路和想法&#xff1a; 513.找树左下角的值 题目描述&#xff1a; 输入输出描述&#xff1a;…

Android裁剪图片为波浪形或者曲线形的ImageView

如果需要做一个自定义的波浪效果的进度条&#xff0c;裁剪图片&#xff0c;对ImageView的图片进行裁剪&#xff0c;比如下面2张图&#xff0c;如何实现&#xff1f; 先看下面的效果&#xff0c;看到其实只需要对第一张高亮的图片进行处理即可&#xff0c;灰色状态的作为背景图。…

基于SSM的戒烟网站(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的戒烟网站&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring SpringMv…

腾讯云优惠券领取方法大公开,省钱不再是难事

腾讯云—腾讯倾力打造的云计算品牌&#xff0c;以卓越科技能力助力各行各业数字化转型&#xff0c;为全球客户提供领先的云计算、大数据、人工智能服务&#xff0c;以及定制化行业解决方案和提供可靠上云服务&#xff0c;助力企业和开发者稳定上云&#xff01; 然而&#xff0…

数据结构进阶篇 之 【二叉树顺序存储(堆)】的整体实现讲解(赋完整实现代码)

做人要谦虚&#xff0c;多听听别人的意见&#xff0c;然后记录下来&#xff0c;看看谁对你有意见 一、二叉树的顺序&#xff08;堆&#xff09;结构及实现 1.二叉树的顺序结构 2.堆的概念及结构 3.堆的实现 3.1 向下调整算法 AdJustDown 3.2 向上调整算法 AdJustUP 3.3 …