Flink CDC系列之:基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL
- 一、技术路线
- 二、MySQL数据库建表
- 三、PostgreSQL数据库建表
- 四、在 Flink SQL CLI 中使用 Flink DDL 创建表
- 五、关联订单数据并且将其写入 Elasticsearch 中
- 六、Kibana查看商品和物流信息的订单数据
- 七、修改数据库中表的数据,Kibana查看更新
一、技术路线
二、MySQL数据库建表
mysql数据库创建数据库和表 products,orders
创建products表
-- MySQLCREATE DATABASE mydb;USE mydb;CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));ALTER TABLE products AUTO_INCREMENT = 101;
创建orders表
CREATE TABLE orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL -- Whether order has been placed) AUTO_INCREMENT = 10001;
products表插入数据
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");
orders表插入数据
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
三、PostgreSQL数据库建表
创建表 shipments
-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL);
插入数据
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;ALTER TABLE public.shipments REPLICA IDENTITY FULL;INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),(default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
四、在 Flink SQL CLI 中使用 Flink DDL 创建表
首先,开启 checkpoint,每隔3秒做一次 checkpoint
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
然后, 对于数据库中的表 products, orders, shipments, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据
-- Flink SQLFlink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders');
最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中
-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);
五、关联订单数据并且将其写入 Elasticsearch 中
使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
六、Kibana查看商品和物流信息的订单数据
创建 index pattern enriched_orders
查看写入的数据
七、修改数据库中表的数据,Kibana查看更新
修改 MySQL 和 Postgres 数据库中表的数据,Kibana中显示的订单数据也将实时更新:
在 MySQL 的 orders 表中插入一条数据
--MySQLINSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
在 Postgres 的 shipment 表中插入一条数据
--PG
INSERT INTO shipmentsVALUES (default,10004,'Shanghai','Beijing',false);
在 MySQL 的 orders 表中更新订单的状态
--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;
在 Postgres 的 shipment 表中更新物流的状态
--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
在 MYSQL 的 orders 表中删除一条数据
--MySQL
DELETE FROM orders WHERE order_id = 10004;
每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示: