flink系列之:使用flink cdc3从mysql数据库同步数据到doris和starrocks

flink系列之:使用flink cdc3从mysql数据库同步数据到doris和starrocks

  • 一、下载部署flink
  • 二、下载部署flink cdc3
  • 三、下载mysql-connector-java到flink和flink cdc的lib目录
  • 四、flink设置checkpoint支持增量同步数据
  • 五、mysql到doris和starrocks的yaml配置文件
  • 六、启动flink和flink cdc
  • 七、查看flink cdc任务同步日志
  • 八、查看mysql表和starrocks表
  • 九、flink cdc技术生产环境应用

一、下载部署flink

  • 下载flink

解压flink

tar -zxvf flink-1.19.1-bin-scala_2.12.tgz

修改flink配置文件config.yaml

taskmanager:bind-host: localhosthost: localhostnumberOfTaskSlots: 6memory:process:size: 1728mparallelism:default: 1
rest:address: 10.66.77.104# network interface, such as 0.0.0.0.bind-address: 10.66.77.104# port: 8081# # Port range for the REST and web server to bind to.# bind-port: 8080-8090

设置flink 环境变零

cd /etc/profile.d
cat flink.sh #export HADOOP_CLASSPATH=`hadoop classpath`
FLINK_HOME=/data/src/flink/flink-1.19.1
PATH=$PATH:$FLINK_HOME/bin:$FLINK_HOME/sbinexport PATH
export FLINK_HOME

启动flink

./start-cluster.sh

查看jps

jps
760234 StandaloneSessionClusterEntrypoint
390132 Jps
760880 TaskManagerRunner

查看flink web ui,{ip}:{port}
在这里插入图片描述

二、下载部署flink cdc3

  • https://github.com/apache/flink-cdc/releases
    在这里插入图片描述
    解压flink-cdc3
tar -zxvf flink-cdc-3.3.0-bin.tar.gz

下载Pipeline Connectors Jars和Source Connector Jars到lib目录

/data/src/flink/flink-cdc-3.3.0/lib   ls
flink-cdc-dist-3.3.0.jar                              flink-cdc-pipeline-connector-maxcompute-3.3.0.jar  flink-sql-connector-tidb-cdc-3.3.0.jar
flink-cdc-pipeline-connector-doris-3.3.0.jar          flink-cdc-pipeline-connector-mysql-3.3.0.jar       mysql-connector-java-8.0.28.jar
flink-cdc-pipeline-connector-elasticsearch-3.3.0.jar  flink-cdc-pipeline-connector-paimon-3.3.0.jar
flink-cdc-pipeline-connector-kafka-3.3.0.jar          flink-cdc-pipeline-connector-starrocks-3.3.0.jar

三、下载mysql-connector-java到flink和flink cdc的lib目录

https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28

在这里插入图片描述

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar

四、flink设置checkpoint支持增量同步数据

  • execution.checkpointing.interval: 3000

参数说明

  • execution.checkpointing.interval: 这个参数用于指定 Flink 作业执行检查点的频率。检查点是 Flink 用于实现容错机制的一种机制,通过定期保存作业的状态,可以在发生故障时恢复到最近的一个检查点。
  • 3000: 这个值表示检查点的间隔时间,单位是毫秒(ms)。因此,3000 毫秒等于 3 秒。

五、mysql到doris和starrocks的yaml配置文件

放到任意目录下

mysql-to-doris.yaml

   source:type: mysqlhostname: ipport: 3306username: *********password: ************tables: data_entry_test.debeziumOfflineClusterInfo,data_entry_test.debeziumRealtimeClusterInfoserver-id: 5400-5404server-time-zone: Asia/Shanghaisink:type: dorisfenodes: ip:8030username: ***********password: *************route:- source-table: data_entry_test.debeziumOfflineClusterInfosink-table: optics.debeziumOfflineClusterInfo- source-table: data_entry_test.debeziumRealtimeClusterInfosink-table: optics.debeziumRealtimeClusterInfopipeline:name: Sync MySQL Database to Dorisparallelism: 2

mysql-to-starrocks.yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: ipport: 3306username: *********password: **********tables: data_entry_test.debeziumOfflineClusterInfo,data_entry_test.debeziumRealtimeClusterInfoserver-id: 5400-5404server-time-zone: Asia/Shanghaisink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://ip:9030load-url: ip:8030username: ****************password: ****************
route:- source-table: data_entry_test.debeziumOfflineClusterInfosink-table: dd_test_starrocks.debeziumOfflineClusterInfo- source-table: data_entry_test.debeziumRealtimeClusterInfosink-table: dd_test_starrocks.debeziumRealtimeClusterInfo
pipeline:name: MySQL to StarRocks Pipelineparallelism: 6

六、启动flink和flink cdc

启动flink

./start-cluster.sh

启动flink cdc

/data/src/flink/flink-cdc-3.3.0/bin/flink-cdc.sh
/data/src/flink/flink-cdc-3.3.0/conf/mysql-to-starrocks.yaml

flink web ui查看任务
在这里插入图片描述

七、查看flink cdc任务同步日志

2025-02-18 13:48:49,973 INFO  com.starrocks.connector.flink.catalog.StarRocksCatalog       [] - Success to create table dd_test_starrocks.dd_test_starrocks, sql: CREATE TABLE IF NOT EXISTS dd_test_starrocks.debeziumOfflineClusterInfo (
id VARCHAR(21) NOT NULL,
servername VARCHAR(6168) NOT NULL,
connectorname VARCHAR(6168) NOT NULL,
databasename VARCHAR(6168) NOT NULL,
url VARCHAR(6168) NOT NULL,
topicname VARCHAR(6168) NOT NULL,
clustername VARCHAR(6168) NOT NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);
2025-02-18 14:04:25,298 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (1/2)#0 (2069f3b2a289abd02012736f795a34b7_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2025-02-18 14:04:25,333 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (2/2)#0 (2069f3b2a289abd02012736f795a34b7_cbc357ccb763df2852fee8c4fc7d55f2_1_0) switched from INITIALIZING to RUNNING.
2025-02-18 14:09:35,729 INFO  com.starrocks.data.load.stream.DefaultStreamLoader           [] - Stream load completed, label : flink-84c2fdac-3341-4b5b-8bf1-3946098c0a97, database : dd_test_starrocks, table : debeziumOfflineClusterInfo, body : {"Status": "OK","Message": "","Label": "flink-84c2fdac-3341-4b5b-8bf1-3946098c0a97","TxnId": 108875857,"LoadBytes": 133959,"StreamLoadPlanTimeMs": 0,"ReceivedDataTimeMs": 0
}

八、查看mysql表和starrocks表

mysql表

-- data_entry_test.debeziumOfflineClusterInfo definitionCREATE TABLE `debeziumOfflineClusterInfo` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',`servername` varchar(2056) NOT NULL COMMENT 'connector标识名',`connectorname` varchar(2056) NOT NULL COMMENT 'connector名称',`databasename` varchar(2056) NOT NULL COMMENT '数据库名',`url` varchar(2056) NOT NULL COMMENT '数据库名',`topicname` varchar(2056) NOT NULL COMMENT 'topic名称',`clustername` varchar(2056) NOT NULL COMMENT '集群名称',`database_server_id` varchar(256) NOT NULL COMMENT '集群名称',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=765 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

starrocks表

-- dd_test_starrocks.debeziumOfflineClusterInfo definitionCREATE TABLE `debeziumOfflineClusterInfo` (`id` varchar(21) NOT NULL COMMENT "",`servername` varchar(6168) NOT NULL COMMENT "",`connectorname` varchar(6168) NOT NULL COMMENT "",`databasename` varchar(6168) NOT NULL COMMENT "",`url` varchar(6168) NOT NULL COMMENT "",`topicname` varchar(6168) NOT NULL COMMENT "",`clustername` varchar(6168) NOT NULL COMMENT ""
) ENGINE=OLAP 
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"compression" = "LZ4"
);

如上所示,成功在starrocks表中创建了表,并完成了历史数据和增量数据的同步

九、flink cdc技术生产环境应用

  • 阿里云基于 Flink CDC 的现代数据栈云上实践

细粒度变更策略控制:

  • 支持新增表、新增列、修改列名、修改列定义、删除列、删除表和清空表等操作
    在这里插入图片描述

当上游数据库新增表时,CDC YAML 能够自动识别并同步这些表的数据,而无需重新配置作业。此功能分为两种情况:

  • 历史数据同步:通过开启 scan.newly-added-table.enabled 选项,并通过 savepoint 重启作业来读取新增表的历史数据。
  • 增量数据同步:只需开启 scan.binlog.newly-added-table.enabled 选项,自动同步新增表的增量数据。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/25958.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

xr-frame 3D Marker识别,扬州古牌坊 3D识别技术稳定调研

目录 识别物体规范 3D Marker 识别目标文件 map 生成 生成任务状态解析 服务耗时: 对传入的视频有如下要求: 对传入的视频建议: 识别物体规范 为提高Marker质量,保证算法识别效果,可参考Marker规范文档 Marker规…

【pytest框架源码分析一】pluggy源码分析之hook常用方法

简单看一下pytest的源码,其实很多地方是依赖pluggy来实现的。这里我们先看一下pluggy的源码。 pluggy的目录结构如下: 这里主要介绍下_callers.py, _hooks.py, _manager.py,其中_callers.py主要是提供具体调用的功能,_hooks.py提…

一周学会Flask3 Python Web开发-Jinja2模板过滤器使用

锋哥原创的Flask3 Python Web开发 Flask3视频教程: 2025版 Flask3 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 在Jinja2中,过滤器(filter)是一些可以用来修改和过滤变量值的特殊函数,过滤器和变量用一个竖线 | &a…

【官方配图】win10/win11 安装cuda 和 cudnn

文章目录 参考资料1.安装cuda toolkit1. 下载安装包2.安装验证 2. 安装cudnn下载cudnn安装包安装cudnn安装后的配置 参考资料 官方nvidia安装cuda官方nvidia安装cudnn 1.安装cuda toolkit 1. 下载安装包 下载地址 https://developer.nvidia.com/cuda-downloads?target_osW…

Linux Mem -- 关于AArch64 MTE功能的疑问

目录 1.虚拟地址和物理地址映射完成后,才可以设置虚拟地址对应的memory tag ? 2.各种memory allocator中的address tag从哪来,怎么产生? 2.1 vmalloc allocator 2.2 slub分配器 2.3 用户可以指定IRG指令产生的address tag 3.kasan…

FS800DTU联动OneNET平台数据可视化View

目录 1 前言 2 环境搭建 2.1 硬件准备 2.2 软件环境 2.3 硬件连接 3 注册OneNET云平台并建立物模型 3.1 参数获取 3.2 连接OneNET 3.3上报数据 4 数据可视化View 4.1 用户信息获取 4.2 启用数据可视化View 4.3 创建项目 4.4 编辑项目 4.5 新增数据源 4.6 数据过滤器配置 4.6 项…

vscode脚本 shell 调试

插件,按照图片

纯代码实战--用Deepseek+SQLite+Ollama搭建数据库助手

如何用Python调用本地模型实现DeepSeek提示词模板:一步步教你高效解决13种应用场景 从零到一:纯代码联合PyQt5、Ollama、Deepseek打造简易版智能聊天助手 用外接知识库武装大模型:基于Deepseek、Ollama、LangChain的RAG实战解析 纯代码实战–…

Qt监控系统远程回放/录像文件远程下载/录像文件打上水印/批量多线程极速下载

一、前言说明 在做这个功能的时候,着实费了点心思,好在之前做ffmpeg加密解密的时候,已经打通了极速加密保存文件,主要就是之前的类中新增了进度提示信号,比如当前已经处理到哪个position位置,发个信号出来…

《Qt动画编程实战:轻松实现头像旋转效果》

《Qt动画编程实战:轻松实现头像旋转效果》 Qt 提供了丰富的动画框架,可以轻松实现各种平滑的动画效果。其中,旋转动画是一种常见的 UI 交互方式,广泛应用于加载指示器、按钮动画、场景变换等。本篇文章将详细介绍如何使用 Qt 实现…

AIGC生图产品PM必须知道的Lora训练知识!

hihi,其实以前在方向AIGC生图技术原理和常见应用里面已经多次提到Lora的概念了,但是没有单独拿出来讲过,今天就耐心来一下! 🔥 一口气摸透AIGC文生图产品SD(Stable Diffusion)! 一、…

Spring Boot 3.x 基于 Redis 实现邮箱验证码认证

文章目录 依赖配置开启 QQ 邮箱 SMTP 服务配置文件代码实现验证码服务邮件服务接口实现执行流程 依赖配置 <dependencies> <!-- Spring Boot Starter Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spr…

QT day1

作业 代码 class Widget: public QWidget {QPushButton* button; //按钮Widget* other; //显示对面 public:Widget(){button new QPushButton("按钮",this); //控件 认this作父this->resize(300,300); //界面大小button->resize(100,10…

Go红队开发—语法补充

文章目录 错误控制使用自定义错误类型错误包装errors.Is 和 errors.Aspanic捕获、recover 、defer错误控制练习 接口结构体实现接口基本类型实现接口切片实现接口 接口练习Embed嵌入文件 之前有师傅问这个系列好像跟红队没啥关系&#xff0c;前几期确实没啥关系&#xff0c;因为…

linux--多进程开发(5)--进程间通信(IPC)、linux间通信的方式、管道

进程间通讯概念 每两个进程之间都是独立的资源分配单元&#xff0c;不同进程之间不能直接访问另一个进程的资源。 但不同的进程需要进行信息的交互和状态的传递等&#xff0c;因此需要进程间通信&#xff08;IPC,inter processes cimmunication) 进程通信的目的&#xff1a; …

Uniapp开发微信小程序插件的一些心得

一、uniapp 开发微信小程序框架搭建 1. 通过 vue-cli 创建 uni-ap // nodejs使用18以上的版本 nvm use 18.14.1 // 安装vue-cli npm install -g vue/cli4 // 选择默认模版 vue create -p dcloudio/uni-preset-vue plugindemo // 运行 uniapp2wxpack-cli npx uniapp2wxpack --…

RabbitMQ 的介绍与使用

一. 简介 1> 什么是MQ 消息队列&#xff08;Message Queue&#xff0c;简称MQ&#xff09;&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO先入先出&#xff0c;只不过队列中存放的内容是message而已。 其主要用途&#xff1a;不同进程Process/线程T…

对比Grok3 普通账户与 30 美元 Super 账户:默认模式、Think 和 DeepSearch 次数限制以及如何升级

面对这个马斯克旗下的"最聪明"的人工智能&#xff0c;很多人都不知道他们的基本模式&#xff0c;本期将从几个方面开始说明&#xff1a; Grok3的背景与功能 账户类型及其详细背景 使用限制 使用限制对比表 如何充值使用 Super 账户 纯干货&#xff0c;带你了解…

【含文档+PPT+源码】基于过滤协同算法的旅游推荐管理系统设计与实现

项目介绍 本课程演示的是一款基于过滤协同算法的旅游推荐管理系统设计与实现&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 1.包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系…

牛客NC288803 和+和

​import java.util.Comparator;import java.util.PriorityQueue;import java.util.Scanner;​public class Main {public static void main(String[] args) {// 创建Scanner对象用于读取输入Scanner sc new Scanner(System.in);// 读取两个整数n和m&#xff0c;分别表示数组的…