使用flink sqlserver cdc 同步数据到StarRocks

前沿: flink cdc功能越发强大,支持的数据源也越多,本篇介绍使用flink cdc实现:

sqlserver-》(using flink cdc)-〉flink -》(using flink starrocks connector)-〉starrocks整个流程

1.sqlserver 环境准备(得使用sqlserver 16以下版本,flink cdc当前只支持16以下sqlserver版本)

我这个使用的是docker环境:

xiuchenggong@xiuchengdeMacBook-Pro ~ % docker images
REPOSITORY                                          TAG                            IMAGE ID       CREATED         SIZE
starrocks.docker.scarf.sh/starrocks/allin1-ubuntu   latest                         4d3c0066a012   3 days ago      4.71GB
mcr.microsoft.com/mssql/server                      2019-latest                    e7fc0b49be3c   4 weeks ago     1.47GB
mcr.microsoft.com/mssql/server                      2022-latest                    683d523cd395   5 weeks ago     2.9GB
federatedai/standalone_fate                         latest                         6019ec787699   9 months ago    5.29GB
milvusdb/milvus                                     v2.1.4                         d9a5c977c414   11 months ago   711MB
starrocks/dev-env                                   main                           8f4edba3b115   16 months ago   7.65GB
minio/minio                                         RELEASE.2022-03-17T06-34-49Z   239acc52a73a   17 months ago   228MB
kicbase/stable                                      v0.0.29                        64d09634c60d   20 months ago   1.14GB
quay.io/coreos/etcd                                 v3.5.0                         a7908fd5fb88   2 years ago     110MB
docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest
docker exec -it --user root sql_server_2019 bash

开启代理,重启sqlserver环境,连接: 

xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash
root@99e196828047:/# /opt/mssql/bin/mssql-conf set sqlagent.enabled true
SQL Server needs to be restarted in order to apply this setting. Please run
'systemctl restart mssql-server.service'.
root@99e196828047:/# exit
exit
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker restart sql_server_2019
sql_server_2019
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bashroot@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"

开启sqlserver cdc功能:


root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"
1> use cdc_test;
2> go
Changed database context to 'cdc_test'.
1> EXEC sys.sp_cdc_enable_db;
2> go
1> SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
2> go
is_cdc_enabled1> CREATE TABLE orders (id int,order_date date,purchaser int,quantity int,product_id int,PRIMARY KEY ([id]))
2> go
1>
2>
3> EXEC sys.sp_cdc_enable_table
4> @source_schema = 'dbo',
5> @source_name   = 'orders',
6> @role_name     = 'cdc_role';
7> goJob 'cdc.cdc_test_capture' started successfully.
Job 'cdc.cdc_test_cleanup' started successfully.

插入一些数据:

1> select * from orders;
2> go
id          order_date       purchaser   quantity    product_id
----------- ---------------- ----------- ----------- -----------1       2023-07-07           1           1           12       2023-07-07           2           2           23       2023-07-07           3           3           34       2023-07-07           4           4           445       2023-07-07           5           5           5(5 rows affected)
1> update orders set quantity = 100 where id =1 ;
2> go(1 rows affected)
1> select * from orders;
2> go
id          order_date       purchaser   quantity    product_id
----------- ---------------- ----------- ----------- -----------1       2023-07-07           1         100           12       2023-07-07           2           2           23       2023-07-07           3           3           34       2023-07-07           4           4           445       2023-07-07           5           5           5(5 rows affected)
1> update orders set quantity = 200 where id = 2;
2> go

2.准备flink环境:

  • 下载flink 1.16.2 (官网下载)
  • 下载flink sqlserver cdc 2.2.0 (Central Repository: com/ververica/flink-cdc-connectors)
  • 下载flink starrocks connector 1.15(这个应该也要下载对应版本1.16.2,但官方还没出,我拿1.15测试了也ok)下载链接:Release Release 1.2.6 · StarRocks/starrocks-connector-for-apache-flink · GitHub

3.准备starrocks docker环境:

见链接:使用 Docker 部署 StarRocks @ deploy_with_docker @ StarRocks Docs

4.启动flink环境(cd {FLINK_HOME}):

xiuchenggong@xiuchengdeMacBook-Pro bin % ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host xiuchengdeMacBook-Pro.local.
Starting taskexecutor daemon on host xiuchengdeMacBook-Pro.local.
xiuchenggong@xiuchengdeMacBook-Pro bin % ./sql-client.sh embedded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/flink-1.16.2/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]▒▓██▓██▒▓████▒▒█▓▒▓███▓▒▓███▓░░        ▒▒▒▓██▒  ▒░██▒   ▒▒▓▓█▓▓▒░      ▒██████▒         ░▒▓███▒    ▒█▒█▒░▓█            ███   ▓░▒██▓█       ▒▒▒▒▒▓██▓░▒░▓▓██░ █   ▒▒░       ███▓▓█ ▒█▒▒▒████░   ▒▓█▓      ██▒▒▒ ▓███▒░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓███▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓█▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒▓█   ▒█▓   ░     █░                ▒█              █▓█▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░█▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒███   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░______ _ _       _       _____  ____  _         _____ _ _            _  BETA   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|| |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.Command history file path: /Users/xiuchenggong/.flink-sql-historyFlink SQL> 

建sqlsever到flink的表:

Flink SQL> CREATE TABLE t_source_sqlserver (
>     id INT,
>     order_date DATE,
>     purchaser INT,
>     quantity INT,
>     product_id INT,
>     PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
> ) WITH (
>     'connector' = 'sqlserver-cdc',  -- 使用SQL Server CDC连接器
>     'hostname' = 'localhost',  -- SQL Server主机名
>     'port' = '30027',               -- SQL Server端口
>     'username' = 'sa',              -- SQL Server用户名
>     'password' = 'abc@123456',      -- SQL Server密码
>     'database-name' = 'cdc_test',   -- 数据库名称
>     'schema-name' = 'dbo',          -- 模式名称
>     'table-name' = 'orders'         -- 要捕获更改的表名
> );

 再建flink到starrocks的表:

Flink SQL> 
> 
> CREATE TABLE IF NOT EXISTS `orders_sink` (
>      id int,
>      order_date date,
>      purchaser int,
>      quantity int,
>      product_id int,
>      PRIMARY KEY(`id`) NOT ENFORCED
> ) with (
> 'load-url' = 'localhost:8030',
> 'sink.buffer-flush.interval-ms' = '15000',
> 'sink.properties.row_delimiter' = '\x02',
> 'sink.properties.column_separator' = '\x01',
> 'connector' = 'starrocks',
> 'database-name' = 'test',
> 'table-name' = 'orders',
> 'jdbc-url' = 'jdbc:mysql://localhost:9030',
> 'password' = '',
> 'username' = 'root'
> )
> ;
[INFO] Execute statement succeed.
Flink SQL> show tables;
+--------------------+
|         table name |
+--------------------+
|        orders_sink |
| t_source_sqlserver |
+--------------------+
2 rows in set

提交作业:

Flink SQL> insert into orders_sink select * from t_source_sqlserver;
[INFO] Submitting SQL update statement to the cluster...
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/xiuchenggong/flink/flink-1.16.2/lib/flink-dist-1.16.2.jar) to field java.lang.Class.ANNOTATION
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 746cc173cd71133e96d080f25327e9bc

flink webui看到长期驻留的作业:

5.验证在sqlserver中的数据是不是已经同步到starrocks中了,insert/update/delete:


StarRocks > select * from orders;
+------+------------+-----------+----------+------------+
| id   | order_date | purchaser | quantity | product_id |
+------+------------+-----------+----------+------------+
|    1 | 2023-07-07 |         1 |      100 |          1 |
|    3 | 2023-07-07 |         3 |        3 |          3 |
|    4 | 2023-07-07 |         4 |        4 |          4 |
|   45 | 2023-07-07 |         5 |        5 |          5 |
|    2 | 2023-07-07 |         2 |      200 |          2 |
+------+------------+-----------+----------+------------+
5 rows in set (0.016 sec)StarRocks >

数据的增删改都同步过去了;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/115582.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小游戏分发平台如何以技术拓流?

2023年&#xff0c;小游戏的发展将受到多方面的影响&#xff0c;例如新技术的引入、参与小游戏的新玩家以及游戏市场的激烈竞争等。首先&#xff0c;新技术如虚拟现实&#xff08;VR&#xff09;、增强现实&#xff08;AR&#xff09;和机器人技术都可以带来新颖的游戏体验。其…

嘉泰实业和您共创未来财富生活

每一次暖心的沟通都是一次公益,真诚不会因为它的渺小而被忽略;每一声问候都是一次公益,善意不会因为它的普通而被埋没。熟悉嘉泰实业的人都知道,这家企业不但擅长在金融理财领域里面呼风唤雨,同时也非常擅长在公益事业当中践行,属于企业的责任心,为更多有困难的群体带来大爱的传…

大数据课程K13——Spark的距离度量相似度度量

文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 掌握Spark的距离度量和相似度度量; ⚪ 掌握Spark的欧氏距离; ⚪ 掌握Spark的曼哈顿距离; ⚪ 掌握Spark的切比雪夫距离; ⚪ 掌握Spark的最小二乘法; 一、距离度量和相似度度量 1. …

打磨 8 个月、功能全面升级,Milvus 2.3.0 文字发布会现在开始!

Milvus 社区的各位伙伴&#xff1a; 大家晚上好&#xff01;欢迎来到 Milvus 2.3.0 文字发布会&#xff01; 作为整个团队的匠心之作&#xff0c;Milvus 2.3.0 历经 8 个月的设计与打磨&#xff0c;无论在新功能、应用场景还是可靠度方面都有不小的提升。 具体来看&#xff1a;…

UG\NX CAM二次开发 插入工序 UF_OPER_create

文章作者:代工 来源网站:NX CAM二次开发专栏 简介: UG\NX CAM二次开发 插入工序 UF_OPER_create 效果: 代码: void MyClass::do_it() {tag_t setup_tag=NULL_TAG;UF_SETUP_ask_setup(&setup_tag);if (setup_tag==NULL_TAG){uc1601("请先初始化加工环境…

【Ubuntu】解决ubuntu虚拟机和物理机之间复制粘贴问题(无需桌面工具)

解决Ubuntu虚拟机和物理机之间复制粘贴问题 第一步 先删除原来的vmware tools&#xff08;如果有的话&#xff09; sudo apt-get autoremove open-vm-tools第二步 安装软件包&#xff0c;一般都是用的desktop版本&#xff08;如果是server换一下&#xff09; sudo apt-get …

开源vue动态表单组件

一、项目简介 vueelement的动态表单组件&#xff0c;拖拽组件到面板即可实现一个表单 二、实现功能 支持拖拽 支持输入框 支持文本框 支持数字输入框 支持下拉选择器 支持多选框 支持日期控件 支持开关 支持动态表格 支持上传图片 支持上传文件 支持标签 支持ht…

简单了解网络传输介质

目录 一、同轴电缆 二、双绞线 三、光纤 四、串口电缆 一、同轴电缆 10BASE前面的数字表示传输带宽为10M&#xff0c;由于带宽较低、现在已不再使用。 50Ω同轴电缆主要用来传送基带数字信号&#xff0c;因此也被称作为基带同轴电缆&#xff0c;在局域网中得到了广泛的应用…

基于OFDM的水下图像传输通信系统matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 function [rx_img] func_TR(tx_img, num_path, pathdelays, pathgains, snr) rng(default); …

AI图像行为分析算法 opencv

AI图像行为分析算法通过pythonopencv深度学习框架对现场操作行为进行全程实时分析&#xff0c;AI图像行为分析算法通过人工智能视觉能够准确判断出现场人员的作业行为是否符合SOP流程规定&#xff0c;并对违规操作行为进行自动抓拍告警。OpenCV是一个基于Apache2.0许可&#xf…

Ubuntu 22.04安装 —— Win11 22H2

目录 Ubuntu使用下载UbuntuVmware 安装图示安装步骤图示 Ubuntu使用 系统环境&#xff1a; Windows 11 22H2Vmware 17 ProUbutun 22.04.3 Server Ubuntu Server documentation | Ubuntu 下载 Ubuntu 官网下载 建议安装长期支持版本 ——> 可以选择桌面版或服务器版(仅包…

cocos 2.4 版本 设置物理引擎步长 解决帧数不一致的设备 物理表现不一致问题 设置帧刷新率

官网地址Cocos Creator 3.8 手册 - 2D 物理系统 官网好像写的不太对 下面是我自己运行好使的 PhysicsManager.openPhysicsSystem()var manager cc.director.getPhysicsManager();// 开启物理步长的设置manager.enabledAccumulator true;// cc.PhysicsManagercc.PhysicsManag…

飞桨中的李宏毅课程中的第一个项目——PM2.5的预测

所谓的激活函数&#xff0c;就是李宏毅老师讲到的sigmoid函数 和 hard sigmoid函数 &#xff0c;ReLU函数那些 现在一点点慢慢探索&#xff0c;会成为日后想都做不到的经历&#xff0c;当你啥也不会的时候&#xff0c;才是慢慢享受探索的过程。 有一说一&#xff0c;用chatGP…

双基证券:游戏版号发放整体趋势的向好将持续优化供给端

双基证券表示&#xff0c;版号发放整体趋势的向好将继续优化供应端&#xff0c;游戏新产品周期正逐渐开启&#xff0c;各家游戏公司盈余端将逐渐企稳&#xff0c;同时将推进游戏商场规划进一步增加。长时间来看&#xff0c;AIGC等技术对游戏全工业链具有降本增效&#xff0c;提…

WPF+Prism+WebApi 学习总结

一、基本概念 WPF:WPF&#xff08;Windows Presentation Foundation&#xff09;是&#xff08;微软推出的&#xff09;基于Windows的用户界面框架&#xff0c;提供了统一的编程模型&#xff0c;语言和框架&#xff0c;做到了分离界面设计人员与开发人员的工作&#xff1b;WPF…

2.Redis 通用命令

Redis 中最核心的两个命令&#xff1a; set 作用&#xff1a;设置 key 对应的 value 值并存储进去。若key已包含一个值&#xff0c;则无论其类型如何&#xff0c;都会覆盖该值。在SET操作成功时&#xff0c;将丢弃与密钥相关联的任何先前生存时间。 对于上述这里的 key和val…

vue3组合式api bus总线式通信

vue2中可以创建一个 vue 实例&#xff0c; 做为 总结来完成组件间的通信 但是在vue3中&#xff0c; 这种方法是不能使用的。 因为vue3中main.js中&#xff0c; 使用的createApp() 没有机会再写 new Vue了 但是我们可以使用 mitt 的插件来解决这个问题 vue3 bus组件的用法 安装…

前端速查速记系列----评论列表

小程序评论列表 效果图 wxml代码 <view id"econtent"><block wx:for"{{commentlist}}" wx:for-item"item" wx:for-index"index" wx:key"{{item.id}}"><view class"box1"><view class"…

Thymeleaf

这就是自动装配的原理 1) .SpringBoot启动会加载大量的自动配置类 2)、我们看我们需要的功能有没有在SpringBoot默认写好的自动配置类当中; 3)、我们再来看这个自动配置类中到底配置了哪些组件;(只要我们要用的组件存在在其中&#xff0c;我们就不需要再手动配置了) 4)、给容器…

pdfh5在线预览pdf文件

前言 pc浏览器和ios的浏览器都可以直接在线显示pdf文件&#xff0c;但是android浏览器不能在线预览pdf文件&#xff0c;如何预览pdf文件&#xff1f; Github: https://github.com/gjTool/pdfh5 Gitee: https://gitee.com/gjTool/pdfh5 使用pdfh5预览pdf 编写预览页面 <…