Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案

Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks方案

  • 准备
    • 准备 Flink Standalone 集群
    • 准备 docker compose
    • 为 MySQL 准备记录
    • 使用 Flink CDC CLI 提交作业
  • 同步架构和数据更改
  • 路由变更
  • 清理

本教程将展示如何使用 Flink CDC 快速构建从 MySQL 到 StarRocks 的 Streaming ELT 作业,包括同步一个数据库的所有表、模式变更演变和将分片表同步到一张表的功能。
本教程中的所有练习都在 Flink CDC CLI 中执行,整个过程使用标准 SQL 语法,无需一行 Java/Scala 代码或 IDE 安装。

准备

准备一台安装了 Docker 的 Linux 或 MacOS 电脑。

准备 Flink Standalone 集群

下载 Flink 1.18.0 ,解压得到 flink-1.18.0 目录。

使用以下命令进入 Flink 目录,并将 FLINK_HOME 设置为 flink-1.18.0 所在的目录。

cd flink-1.18.0

通过将以下参数附加到 conf/flink-conf.yaml 配置文件来启用检查点,每 3 秒执行一次检查点。

execution.checkpointing.interval: 3000

使用以下命令启动 Flink 集群。

./bin/start-cluster.sh

如果启动成功,你就可以通过http://localhost:8081/访问Flink Web UI,如下所示。
在这里插入图片描述
多次执行start-cluster.sh可以启动多个TaskManager。

准备 docker compose

以下教程将使用 docker-compose 准备所需的组件。使用下面提供的内容创建 docker-compose.yml 文件:

version: '2.1'
services:StarRocks:image: starrocks/allin1-ubuntu:3.2.6ports:- "8080:8080"- "9030:9030"MySQL:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw

Docker Compose 应包含以下服务(容器):

  • MySQL:包含一个名为 app_db 的数据库
  • StarRocks:存储来自 MySQL 的表
docker-compose up -d

该命令会自动以分离模式启动 Docker Compose 配置中定义的所有容器。运行 docker ps 检查这些容器是否正常运行。您也可以访问 http://localhost:8030/ 检查 StarRocks 是否正在运行。

为 MySQL 准备记录

进入 MySQL 容器

docker-compose exec mysql mysql -uroot -p123456

创建 app_db 数据库和订单、产品、发货表,然后插入记录

-- create database
CREATE DATABASE app_db;USE app_db;-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

使用 Flink CDC CLI 提交作业

  • 下载下面列出的二进制压缩包并解压到目录 flink cdc-3.1.0’:
    flink-cdc-3.1.0-bin.tar.gz flink-cdc-3.1.0 目录下会包含四个目录:bin、lib、log、conf。
  • 下载下面列出的连接器包并移动到 lib 目录
    下载链接只针对稳定版本,SNAPSHOT 依赖需要自行基于 master 或 release 分支构建。请注意,需要将 jar 移动到 Flink CDC Home 的 lib 目录,而不是 Flink Home 的 lib 目录。
    • MySQL 管道连接器 3.1.0
    • StarRocks 管道连接器 3.1.0

您还需要将 MySQL 连接器放入 Flink lib 文件夹或使用 --jar 参数传递它,因为它们不再与 CDC 连接器一起打包:

  • MySQL Connector Java

编写任务配置yaml文件。下面是同步整个数据库的示例文件mysql-to-starrocks.yaml:

################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksname: StarRocks Sinkjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8080username: rootpassword: ""table.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to StarRocksparallelism: 2

注意:

  • source 中的 tables: app_db..* 通过正则匹配同步 app_db 中的所有表。
  • sink 中的 table.create.properties.replication_num 是因为 Docker 镜像中只有一个 StarRocks BE 节点。

最后,使用Cli将作业提交到Flink Standalone集群。

bash bin/flink-cdc.sh mysql-to-starrocks.yaml

提交成功后返回信息如下:

Pipeline has been submitted to cluster.
Job ID: 02a31c92f0e7bc9a1f4c0051980088a0
Job Description: Sync MySQL Database to StarRocks

我们可以通过 Flink Web UI 找到一个名为“Sync MySQL Database to StarRocks“的作业正在运行。
在这里插入图片描述
通过Dbeaver等数据库连接工具使用mysql://127.0.0.1:9030连接jdbc,可以在StarRocks中查看写入三张表的数据。

在这里插入图片描述

同步架构和数据更改

进入MySQL容器

docker-compose exec mysql mysql -uroot -p123456

然后修改MySQL中的schema和记录,Doris的表也会实时改变:

在MySQL中的orders中插入一条记录:

INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

在 MySQL 的订单中添加一列:

ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

从 MySQL 更新订单中的一条记录:

UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

从 MySQL 中删除订单中的一条记录:

DELETE FROM app_db.orders WHERE id=2;

每执行一步刷新一下Dbeaver,可以看到StarRocks中展示的订单表会实时更新,如下图:
在这里插入图片描述
同样的,通过修改shipping和products表,你也可以在StarRocks中实时看到同步修改的结果。

路由变更

Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置。
利用此功能,我们可以实现表名、数据库名替换、全库同步等功能。以下是使用路由功能的示例文件:

################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: starrocksjdbc-url: jdbc:mysql://127.0.0.1:9030load-url: 127.0.0.1:8030username: rootpassword: ""table.create.properties.replication_num: 1route:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to StarRocksparallelism: 2

通过上面的路由配置,我们可以将app_db.orders的表结构和数据同步到ods_db.ods_orders中,从而实现数据库迁移的功能。具体来说,source-table支持正则匹配多表来同步分库分表,如下:

route:- source-table: app_db.order\.*sink-table: ods_db.ods_orders

这样我们就可以将app_db.order01、app_db.order02、app_db.order03等分片表同步到一张ods_db.ods_orders表中了。

注意,目前还不支持多张表存在相同主键数据的场景,后续版本会支持。

清理

完成教程后,运行以下命令停止docker-compose.yml目录中的所有容器:

docker-compose down

在Flink flink-1.18.0目录下,执行以下命令停止Flink集群:

./bin/stop-cluster.sh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/457619.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Rust 力扣 - 1. 两数相加

文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 我们使用一个全局的备忘录,然后我们遍历数组,如果当前元素在备忘录里面找到了,就返回备忘录里面记录的下标和当前下标记录,没找到就把当前元素匹配的元素和当前元素…

DEVOPS: 容器与虚拟化与云原生

概述 传统虚拟机,利用 hypervisor,模拟出独立的硬件和系统,在此之上创建应用虚拟机是一个主机模拟出多个主机虚拟机需要先拥有独立的系统docker 是把应用及配套环境独立打包成一个单位docker 是在主机系统中建立多个应用及配套环境docker 是…

【WiFi7】 支持wifi7的手机

数据来源 Smartphones with WiFi 7 - list of all latest phones 2024 Motorola Moto X50 Ultra 6.7" 1220x2712 Snapdragon 8s Gen 3 16GB RAM 1024 GB 4500 mAh a/b/g/n/ac/6e/7 Sony Xperia 1 VI 6.5" 1080x2340 Snapdragon 8 Gen 3 12GB RAM 512 G…

基于JAVASE的题

字符集合 描述: 每组数据输入一个字符串,字符串最大长度为100,且只包含字母,不可能为空串,区分大小写。 每组数据一行,按字符串原有的字符顺序,输出字符集合,记重复出现并靠后的字…

【数学二】多元函数积分学-重积分-二重积分定义、性质、计算

考试要求 1、了解多元函数的概念,了解二元函数的几何意义. 2、了解二元函数的极限与连续的概念,了解有界闭区域上二元连续函数的性质. 3、了解多元函数偏导数与全微分的概念,会求多元复合函数一阶、二阶偏导数,会求全微分&#x…

以 6502 为例讲讲怎么阅读 CPU 电路图

开篇 你是否曾对 CPU 的工作原理充满好奇,以及简单的晶体管又是如何组成逻辑门,进而构建出复杂的逻辑电路实现?本文将以知名的 6502 CPU 的电路图为例,介绍如何阅读 CPU 电路图,并向你演示如何从晶体管电路还原出逻辑…

RISC-V笔记——显式同步

1. 前言 RISC-V的RVWMO模型主要包含了preserved program order、load value axiom、atomicity axiom、progress axiom和I/O Ordering。今天主要记录下preserved program order(保留程序顺序)中的Explicit Synchronization(显示同步)。 2. 显示同步 显示同步指的是&#xff1a…

ArcGIS计算落入面图层中的线的长度或面的面积

本文介绍在ArcMap软件中,计算落入某个指定矢量面图层中的另一个线图层的长度、面图层的面积等指标的方法。 如下图所示,现在有2个矢量要素集,其中一个为面要素,表示某些区域;另一个为线要素,表示道路路网。…

软考系统分析师知识点二四:错题集11-20

前言 今年报考了11月份的软考高级:系统分析师。 考试时间:11月9日。 倒计时:13天。 目标:优先应试,其次学习,再次实践。 复习计划第二阶段:刷选择题,搜集错题集反复查看&#x…

Pr 视频效果:波形变形

视频效果/扭曲/波形变形 Distort/Wave Warp 波形变形 Wave Warp效果用于在剪辑上创建类似波浪的动态变形效果。 此效果会自动动画化,波形以恒定速度移动。要改变速度或停止波动,需要设置关键帧。 ◆ ◆ ◆ 效果选项说明 通过调整波形的类型、高度、宽度…

《分布式机器学习模式》:解锁分布式ML的实战宝典

在大数据和人工智能时代,机器学习已经成为推动技术进步的重要引擎。然而,随着数据量的爆炸性增长和模型复杂度的提升,单机环境下的机器学习已经难以满足实际需求。因此,将机器学习应用迁移到分布式系统上,成为了一个不…

Flutter鸿蒙next 中如何实现 WebView【跳、显、适、反】等一些基础问题

✅近期推荐:求职神器 https://bbs.csdn.net/topics/619384540 🔥欢迎大家订阅系列专栏:flutter_鸿蒙next 💬淼学派语录:只有不断的否认自己和肯定自己,才能走出弯曲不平的泥泞路,因为平坦的大路…

【计算机操作系统】课程 作业二 进程与线程 408考研

作业二 进程与线程 1.根据下图,回答问题。(共65分) (1) 请简述进程发生状态变迁1、3、4、6、7的原因。(每条5分.共25分) 1表示操作系统把处于创建状态的进程移入就绪队列;3表示进程…

.Net 8 Web API CRUD 操作

本次介绍分为3篇文章: 1:.Net 8 Web API CRUD 操作https://blog.csdn.net/hefeng_aspnet/article/details/143228383 2:在 .Net 8 API 中实现 Entity Framework 的 Code First 方法https://blog.csdn.net/hefeng_aspnet/article/details/1…

【LeetCode:264. 丑数 II + 小根堆】

在这里插入代码片 🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕…

stm32 使用J-Link RTT Viewer打印日志

文章目录 stm32 使用J-Link RTT Viewer一、RTT功能简介二、准备工作安装J-Link软件驱动:获取RTT驱动文件:配置工程: 三、使用RTT打印日志初始化RTT:打印日志:查看日志: 四、高级功能封装print_log函数&…

021、深入解析前端请求拦截器

目录 深入解析前端请求拦截器: 1. 引言 2. 核心实现与基础概念 2.1 基础拦截器实现 2.2 响应拦截器配置 3. 实际应用场景 3.1 完整的用户认证系统 3.2 文件上传系统 3.3 API请求缓存系统 3.4 请求重试机制 3.5 国际化处理 4. 性能优化实践 4.1 请求合并…

三周精通FastAPI:15 请求文件和同时请求表单+文件

官网文档:请求文件 - FastAPI 请求文件 File 用于定义客户端的上传文件。 from fastapi import FastAPI, File, UploadFileapp FastAPI()app.post("/files/") async def create_file(file: bytes File()):return {"file_size": len(file)}…

直播系统源码技术搭建部署流程及配置步骤

系统环境要求 PHP版本:5.6、7.3 Mysql版本:5.6,5.7需要关闭严格模式 Nginx:任何版本 Redis:需要给所有PHP版本安装Redis扩展,不需要设置Redis密码 最好使用面板安装:宝塔面板 - 简单好用的…

Kafka消费者故障,出现活锁问题如何解决?

大家好,我是锋哥。今天分享关于【Kafka消费者故障,出现活锁问题如何解决?】面试题?希望对大家有帮助; Kafka消费者故障,出现活锁问题如何解决? 1000道 互联网大厂Java工程师 精选面试题-Java资…