Flink CDC系列之:基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

Flink CDC系列之:基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

  • 一、技术路线
  • 二、MySQL数据库建表
  • 三、PostgreSQL数据库建表
  • 四、在 Flink SQL CLI 中使用 Flink DDL 创建表
  • 五、关联订单数据并且将其写入 Elasticsearch 中
  • 六、Kibana查看商品和物流信息的订单数据
  • 七、修改数据库中表的数据,Kibana查看更新

一、技术路线

在这里插入图片描述

二、MySQL数据库建表

mysql数据库创建数据库和表 products,orders

创建products表

-- MySQLCREATE DATABASE mydb;USE mydb;CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));ALTER TABLE products AUTO_INCREMENT = 101;

创建orders表

CREATE TABLE orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL -- Whether order has been placed) AUTO_INCREMENT = 10001;

products表插入数据

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");

orders表插入数据

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

三、PostgreSQL数据库建表

创建表 shipments

-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL);

插入数据

ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;ALTER TABLE public.shipments REPLICA IDENTITY FULL;INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),(default,10002,'Hangzhou','Shanghai',false),   (default,10003,'Shanghai','Hangzhou',false);

四、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 products, orders, shipments, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

-- Flink SQLFlink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders');

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中

-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);

五、关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中

-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;

六、Kibana查看商品和物流信息的订单数据

创建 index pattern enriched_orders
在这里插入图片描述
查看写入的数据
在这里插入图片描述

七、修改数据库中表的数据,Kibana查看更新

修改 MySQL 和 Postgres 数据库中表的数据,Kibana中显示的订单数据也将实时更新:

在 MySQL 的 orders 表中插入一条数据

--MySQLINSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

在 Postgres 的 shipment 表中插入一条数据

--PG
INSERT INTO shipmentsVALUES (default,10004,'Shanghai','Beijing',false);

在 MySQL 的 orders 表中更新订单的状态

--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;

在 Postgres 的 shipment 表中更新物流的状态

--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;

在 MYSQL 的 orders 表中删除一条数据

--MySQL
DELETE FROM orders WHERE order_id = 10004;

每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/96249.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Hibench 】完成 HDP-Spark 性能测试

🍁 博主 "开着拖拉机回家"带您 Go to New World.✨🍁 🦄 个人主页——🎐开着拖拉机回家_Linux,Java基础学习,大数据运维-CSDN博客 🎐✨🍁 🪁🍁 希望本文能够给您带来一定的…

【C++】map和set的封装

回顾:之前我们一起学习了map,set,multimap,multiset的接口和相关介绍map和se详细介绍 还实现了AVL树以及RB树,旋转,旋转变色相信大家已经很熟悉啦手撕AVL和红黑树 本文将带大家一起封装map和set 目录 1.找…

在海外如何进行应用商店的关键词优化

分析市场,了解我们的应用类别,将我们的应用与竞争对手的优点和缺点进行比较,找到市场上的空白或所谓未满足的需求,并思考如何填补。 1、应用商店关键词优化。 关键词优化的目的是找到最相关的关键词 ,并测试应用元数据…

[保研/考研机试] KY26 10进制 VS 2进制 清华大学复试上机题 C++实现

题目链接: 10进制 VS 2进制http://www.nowcoder.com/share/jump/437195121691738172415 描述 对于一个十进制数A,将A转换为二进制数,然后按位逆序排列,再转换为十进制数B,我们称B为A的二进制逆序数。 例如对于十进制…

创作的1024天 分享月入5K的副业心得

机缘 今天早上醒来打开电脑,和往常一样点开csdn,看见有一封私信,原来是系统通知,今天是我成为创作者的1024天,那就趁着这个机会,分享一下目前我月入5K的副业心得。我是一个普通人,最初想成为创…

ARM 作业1

一、思维导图 二、 1. 2. .text 文本段 .globl _start 声明_start:mov r0,#0mov r1,#0fun:cmp r1,#100bhi stopadd r0,r0,r1add r1,r1,#1b fun stop:b stop .end

【python办公自动化】PysimpleGUI中的popup弹窗中的按钮设置居中

PysimpleGUI中的popup弹窗中的按钮设置居中 背景问题解决背景 默认的popup弹窗中的OK按钮是在最下面偏左侧一些,有时需要将按钮放置居中 问题解决 首先找到pysimplegui源代码文件中popup的部分 然后定位到19388行,源文件内容如下 关于popup弹窗OK按钮的设置,将pad属性…

《合成孔径雷达成像算法与实现》Figure3.10

代码复现如下: clc clear close all% 参数设置 TBP 100; % 时间带宽积 T 7.2e-6; % 脉冲持续时间 t_0 1e-6; % 脉冲回波时延% 参数计算 B TBP/T; …

STM32 cubemx CAN

接收用到的结构体如下:CAN概念: 全称Controller Area Network,是一种半双工,异步通讯。 物理层: 闭环:允许总线最长40m,最高速1Mbps,规定总线两端各有一个120Ω电阻,闭环…

马上七夕到了,用各种编程语言实现10种浪漫表白方式

目录 1. 直接表白:2. 七夕节表白:3. 猜心游戏:4. 浪漫诗句:5. 爱的方程式:6. 爱心Python:7. 心形图案JavaScript 代码:8. 心形并显示表白信息HTML 页面:9. Java七夕快乐:…

Jenkins改造—nginx配置鉴权

先kill掉8082的端口进程 netstat -natp | grep 8082 kill 10256 1、下载nginx nginx安装 EPEL 仓库中有 Nginx 的安装包。如果你还没有安装过 EPEL,可以通过运行下面的命令来完成安装 sudo yum install epel-release 输入以下命令来安装 Nginx sudo yum inst…

【实战】十一、看板页面及任务组页面开发(二) —— React17+React Hook+TS4 最佳实践,仿 Jira 企业级项目(二十四)

文章目录 一、项目起航:项目初始化与配置二、React 与 Hook 应用:实现项目列表三、TS 应用:JS神助攻 - 强类型四、JWT、用户认证与异步请求五、CSS 其实很简单 - 用 CSS-in-JS 添加样式六、用户体验优化 - 加载中和错误状态处理七、Hook&…

【C++】STL---list

STL---list 一、list 的介绍二、list 的模拟实现1. list 节点类2. list 迭代器类(1)前置(2)后置(3)前置- -、后置- -(4)! 和 运算符重载(5)* 解引用重载 和 …

科大讯飞星火模型申请与chatgpt 3.5模型以及new bing的对比

科大讯飞星火模型 申请科大讯飞星火认知大模型账号科大讯飞星火认知大模型使用1.界面介绍2. 在编程能力上与chatgpt 3.5对比科大讯飞星火模型chatgpt 3.5模型 3. 在图片生成能力上与new bing对比 总结 申请科大讯飞星火认知大模型账号 注册网址: 科大讯飞星火认知大…

CW12B-3A-RCWW12B-6A-RCW12B-10A-RCWW12B-20A-RCWW12B-30A-RCWW12B-40A-R导轨式滤波器

CW4L2-3A-R1 CW4L2-6A-R1 CW4L2-10A-R1 CW4L2-20A-R1 CW4L2-30A-R1导轨式滤波器 CW12B-3A-R CWW12B-6A-R CW12B-10A-R CWW12B-20A-R CWW12B-30A-R CWW12B-40A-R导轨式滤波器 CW12C-3A-R CWW12C-6A-R CWW12C-10A-R CW12C-20A-R CW12C-30A-R导轨式滤波器 CW4L2-3A-R…

步入React正殿 - State进阶

目录 扩展学习资料 State进阶知识点 状态更新扩展 shouldComponentUpdate PureComponent 为何使用不变数据【保证数据引用不会出错】 单一数据源 /src/App.js /src/components/listItem.jsx 状态提升 /src/components/navbar.jsx /src/components/listPage.jsx src/A…

reeds_sheep运动规划算法Python源码分析

本文用于记录Python版本zhm-real / PathPlanning运动规划库中reeds_sheep算法的源码分析 关于reeds sheep算法的原理介绍前文已经介绍过了,链接如下所示: 《Reeds-Shepp曲线学习笔记及相关思考》 《Reeds-Shepp曲线基础运动公式推导过程》 正文&#xff…

防丢器Airtag国产版

Airtag是什么? AirTag是苹果公司设计的一款定位神奇,它通过一款纽扣电池进行供电,即可实现长达1-2年的关键物品的定位、查找的功能。 按照苹果公司自己的话说—— 您“丢三落四这门绝技,要‍失‍传‍了”。 AirTag 可帮你轻松追…

阿里云与中国中医科学院合作,推动中医药行业数字化和智能化发展

据相关媒体消息,阿里云与中国中医科学院的合作旨在推动中医药行业的数字化和智能化发展。随着互联网的进步和相关政策的支持,中医药产业受到了国家的高度关注。这次合作将以“互联网 中医药”为载体,致力于推进中医药文化的传承和创新发展。…

2023最新红盟云卡个人自动发卡系统源码 全开源

​ 简介: 2023最新红盟云卡个人自动发卡系统源码 全开源 该系统完全开源且无任何加密,可商业使用,并支持个人免签多个接口。 ​ 图片: