Windows系统安装Flink及实现MySQL之间数据同步

        Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink的设计目标是在所有常见的集群环境中运行,并以内存执行速度和任意规模来执行计算。它支持高吞吐、低延迟、高性能的流处理,并且是一个面向流处理和批处理的分布式计算框架,将批处理看作一种特殊的有界流。

Flink的主要特点包括:

  1. 事件驱动型:Flink是一个事件驱动型的应用,可以从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
  2. 支持有状态计算:Flink提供了Extactor-once语义及checkpoint机制,支持带有事件操作的流处理和窗口处理,以及灵活的窗口处理(如时间窗口、大小窗口等)。
  3. 轻量级容错处理:Flink使用savepoint进行错误恢复,可以在出现故障时快速恢复任务。
  4. 高吞吐、低延迟、高性能:Flink的设计目标是在保证数据处理稳定性的同时,实现高吞吐、低延迟、高性能的流处理。
  5. 支持大规模集群模式:Flink支持在yarn、Mesos、k8s等大规模集群环境中运行。
  6. 支持多种编程语言:Flink对java、scala、python都提供支持,但最适合使用java进行开发。

        Flink的应用场景非常广泛,可以用于实时流数据的分析计算、实时数据与维表数据关联计算、实时数仓建设、ETL(提取-转换-加载)多存储系统之间进行数据转化和迁移等场景。同时,Flink也适用于事件驱动型应用场景,如以kafka为代表的消息队列等。

1.Winows系统安装Flink

下载地址:Downloads | Apache Flink

选择 Apache Flink 1.16.0 - 2022-10-28 (Binaries)

下载 flink-1.16.0-bin-scala_2.12.tgz

使用CMD窗口,在Flink安装路径/bin目录下启动start-cluster.bat

访问http://localhost:8081,界面如下:

2.使用Flink实现MySQL数据库之间数据同步(JAVA)

<flink.version>1.16.0</flink.version>
<flink-cdc.version>2.3.0</flink-cdc.version>

1.创建Flink流处理运行环境。

2.设置流处理并发数。

3.设置Flink存档间隔时间,单位为ms,当同步发生异常时会恢复最近的checkpoint继续同步。

4.在Flink中创建中间同步数据库。

5.在Flink中创建中间表flink_source,来源于MySQL表source,(注意connector为mysql-cdc)。

6.在Flink中创建中间表flink_sink,来源于MySQL表sink。

7.将Flink中间表来源表数据写入flink_sink表,Flink会根据MySQL binlog中source表变化,动态更新flink_sink表,同时会将flink_sink表数据写入MySQL sink表,实现MySQL数据持续同步。

package com.demo.flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkCdcMySql {public static void main(String[] args) {System.out.println("==========start run FlinkCdcMySql#main.");// 创建Flink流处理运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081);// 设置流处理并发数env.setParallelism(3);// 设置Flink存档间隔时间,单位为ms,当同步发生异常时会恢复最近的checkpoint继续同步env.enableCheckpointing(5000);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 在Flink中创建中间同步数据库tEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_test");// 在Flink中创建中间表flink_source,来源于MySQL表source// 注意connector为mysql-cdctEnv.executeSql("CREATE TABLE flink_test.flink_source (\n" +"    id int,\n" +"    name varchar(255),\n" +"    create_time TIMESTAMP\n," + // Flink不支持datetime格式"    PRIMARY KEY (id) NOT ENFORCED" + //主键必须标明NOT ENFORCED") WITH (\n" +"  'connector'  = 'mysql-cdc',\n" +"  'hostname'   = '127.0.0.1',\n" +"  'database-name'   = 'flink-source',\n" +"  'table-name' = 'source',\n" +"  'username'   = 'root',\n" +"  'password'   = 'root'\n" +")");// 在Flink中创建中间表flink_sink,来源于MySQL表sinktEnv.executeSql("CREATE TABLE flink_test.flink_sink (\n" +"    id int,\n" +"    name varchar(255),\n" +"    create_time TIMESTAMP\n," +"    PRIMARY KEY (id) NOT ENFORCED" +") WITH (\n" +"  'connector'  = 'jdbc',\n" +"  'url'        = 'jdbc:mysql://127.0.0.1:3306/flink-sink',\n" +"  'table-name' = 'sink',\n" +"  'driver'     = 'com.mysql.jdbc.Driver',\n" +"  'username'   = 'root',\n" +"  'password'   = 'root'\n" +")");//        Table transactions = tEnv.from("flink_source");
//        transactions.executeInsert("flink_sink");System.out.println("==========begin Mysql data cdc.");// 将Flink中间表来源表数据写入flink_sink表// Flink会根据MySQL binlog中source表变化,动态更新flink_sink表,同时会将flink_sink表数据写入MySQL sink表,实现MySQL数据持续同步tEnv.executeSql("INSERT INTO flink_test.flink_sink(id, name, create_time)\n" +"select id, name, create_time\n" +"from flink_test.flink_source\n");System.out.println("==========continue Mysql data cdc.");}}

git代码地址:

flink-cdc-MySQL: FlinkCDC实现MySQL之间数据同步

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/255507.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于matlab的密度散点图绘制

1. 什么是密度散点图&#xff1f; 密度散点图就是在普通散点图的基础上&#xff0c;基于样本点一定范围的样本数计算该样本点的密度&#xff0c;以不同的颜色来显示样本点密度的大小&#xff0c;这样能够直观的显示出数据的空间聚集情况&#xff0c;如下图分别是二维和三维密度…

###C语言程序设计-----C语言学习(11)#数据的存储和基本数据类型

前言&#xff1a;感谢您的关注哦&#xff0c;我会持续更新编程相关知识&#xff0c;愿您在这里有所收获。如果有任何问题&#xff0c;欢迎沟通交流&#xff01;期待与您在学习编程的道路上共同进步。 一. 数据的存储 1.整型数据的存储 计算机处理的所有信息都以二进制形式表示…

Nginx实战:1-安装搭建

目录 前言 一、yum安装 二、编译安装 1.下载安装包 2.解压 3.生成makefile文件 4.编译 5.安装执行 6.执行命令软连接 7.Nginx命令 前言 nginx的安装有两种方式&#xff1a; 1、yum安装&#xff1a;安装快速&#xff0c;但是无法在安装的时候带上想要的第三方包 2、…

windowsserver 2016 PostgreSQL9.6.3-2升级解决其安全漏洞问题

PostgreSQL 身份验证绕过漏洞(CVE-2017-7546) PostgreSQL 输入验证错误漏洞(CVE-2019-10211) PostgreSQL adminpack扩展安全漏洞(CVE-2018-1115) PostgreSQL 输入验证错误漏洞(CVE-2021-32027) PostgreSQL SQL注入漏洞(CVE-2019-10208) PostgreSQL 安全漏洞(CVE-2018-1058) …

DolphinScheduler-3.2.0 集群搭建

本篇文章主要记录DolphinScheduler-3.2.0 集群部署流程。 注&#xff1a;参考文档&#xff1a; DolphinScheduler-3.2.0生产集群高可用搭建_dophinscheduler3.2.0 使用说明-CSDN博客文章浏览阅读1.1k次&#xff0c;点赞25次&#xff0c;收藏23次。DolphinScheduler-3.2.0生产…

NGINX upstream、stream、四/七层负载均衡以及案例示例

文章目录 前言1. 四/七层负载均衡1.1 开放式系统互联模型 —— OSI1.2 四/七层负载均衡 2. Nginx七层负载均衡2.1 upstream指令2.2 server指令和负载均衡状态与策略2.2.1 负载均衡状态2.2.2 负载均衡策略 2.3 案例 3. Nginx四层负载均衡的指令3.1 stream3.2 upstream指令3.3 四…

基于SpringBoot+Vue的服装销售商城系统

末尾获取源码作者介绍&#xff1a;大家好&#xff0c;我是墨韵&#xff0c;本人4年开发经验&#xff0c;专注定制项目开发 更多项目&#xff1a;CSDN主页YAML墨韵 学如逆水行舟&#xff0c;不进则退。学习如赶路&#xff0c;不能慢一步。 目录 一、项目简介 二、开发技术与环…

【知识整理】接手新技术团队、管理团队

引言 针对目前公司三大技术中心的不断升级&#xff0c;技术管理岗位要求越来越高&#xff0c;且团队人员特别是管理岗位的选择任命更是重中之重&#xff0c;下面针对接手新的技术团队做简要整理&#xff1b; 一、实践操作 1、前期准备 1、熟悉情况&#xff1a; 熟悉人员&am…

【Linux环境基础开发工具的使用(yum、vim、gcc、g++、gdb、make/Makefile)】

Linux环境基础开发工具的使用yum、vim、gcc、g、gdb、make/Makefile Linux软件包管理器- yumLinux下安装软件的方式认识yum查找软件包安装软件如何实现本地机器和云服务器之间的文件互传卸载软件 Linux编辑器 - vimvim的基本概念vim下各模式的切换vim命令模式各命令汇总vim底行…

【开源】SpringBoot框架开发个人健康管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 健康档案模块2.2 体检档案模块2.3 健康咨询模块 三、系统展示四、核心代码4.1 查询健康档案4.2 新增健康档案4.3 查询体检档案4.4 新增体检档案4.5 新增健康咨询 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpri…

如何入门AI Agent?

随着chatgpt问世&#xff0c;大模型已经在加速各行各业的变革&#xff0c;这是我之前对AI Agent行业的粗浅判断。 下面给大家介绍一下如何制作AI Agent&#xff0c;我会用我开发的全赞AI为例子进行简要的介绍&#xff0c;下面是一种工具型AI Agent的框架图 这是一个大量使用工具…

docker复习笔记01(小滴课堂)安装+部署mysql

查看内核版本。 关闭防火墙&#xff1a; 查看docker版本&#xff1a; 下载阿里yum源&#xff1a; 再看一下yum版本都有哪些&#xff1a; 我们可以看的docker-ce了。 安装它&#xff1a; 设置docker服务开机启动&#xff1a; 更新日志文件&#xff1a; 启动docker&#xff1a; …

LLM之LangChain(七)| 使用LangChain,LangSmith实现Prompt工程ToT

如下图所示&#xff0c;LLM仍然是自治代理的backbone&#xff0c;可以通过给LLM增加以下模块来增强LLM功能: Prompter AgentChecker ModuleMemory moduleToT controller 当解决具体问题时&#xff0c;这些模块与LLM进行多轮对话。这是基于LLM的自治代理的典型情况&#xff0c;…

15.3 Redis入门(❤❤❤❤)

15.3 Redis入门❤❤❤❤ 1. redis简介与配置1.1 简介1.2 Windows安装1.3 Linux安装1.4 守护进程方式启动1.5 客户端启动与使用1.6 指定生成日志 2. 使用2.1 客户端redis使用命令2.2 redis存储的数据类型1. String字符串类型2. Hash键值类型3. List列表类型4. Set与Zset集合类型…

JVM-双亲委派机制

双亲委派机制定义 双亲委派机制指的是&#xff1a;当一个类加载器接收到加载类的任务时&#xff0c;会自底向上查找是否加载过&#xff0c; 再由顶向下进行加载。 详细流程 每个类加载器都有一个父类加载器。父类加载器的关系如下&#xff0c;启动类加载器没有父类加载器&am…

基于springboot + Thymeleaf + vue开发的 个人博客,含前后端

作者&#xff1a;ChenZhen 本人不常看网站消息&#xff0c;有问题通过下面的方式联系&#xff1a; 邮箱&#xff1a;1583296383qq.comvx: ChenZhen_7 我的个人博客地址&#xff1a;https://www.chenzhen.space/&#x1f310; 版权&#xff1a;本文为博主的原创文章&#xff…

【深度学习】实验7布置,图像超分辨

清华大学驭风计划 因为篇幅原因实验答案分开上传&#xff0c; 实验答案链接http://t.csdnimg.cn/P1yJF 如果需要更详细的实验报告或者代码可以私聊博主 有任何疑问或者问题&#xff0c;也欢迎私信博主&#xff0c;大家可以相互讨论交流哟~~ 深度学习训练营 案例 7 &#xff1…

Python爬虫——请求库安装

目录 1.打开Anaconda Prompt 创建环境2.安装resuests3.验证是否安装成功4.安装Selenium5.安装ChromeDriver5.1获取chrom的版本5.1.1点击浏览器右上三个点5.1.2点击设置5.1.3下拉菜单&#xff0c;点击最后关于Chrome&#xff0c;获得其版本 5.2 打开网址 [chromedriver](https:/…

vue3:25—其他API

目录 1、shallowRef和shallowReactive 2、readonly与shallowReadonly readonly shallowReadonly 3、toRaw和markRaw toRaw markRaw 4、customRef 1、shallowRef和shallowReactive shallowRef 1.作用:创建一个响应式数据&#xff0c;但只对顶层属性进行响应式处理。2…

2024-02-07(Sqoop,Flume)

1.Sqoop的增量导入 实际工作中&#xff0c;数据的导入很多时候只需要导入增量的数据&#xff0c;并不需要将表中的数据每次都全部导入到hive或者hdfs中&#xff0c;因为这样会造成数据重复问题。 增量导入就是仅导入新添加到表中的行的技术。 sqoop支持两种模式的增量导入&a…