使用Flink CDC实时监控MySQL数据库变更

在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。

环境准备

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version>
</dependency>
  1. 获取Flink执行环境

首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 启用检查点和设置并行度

为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。

env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
  1. 使用Debezium Source读取MySQL的binlog

接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海.hostname("localhost") // MySQL的IP地址.port(3306) // MySQL的端口.username("root") // MySQL的用户名.password("123456") // MySQL的密码.databaseList("my_db") // 监控的数据库.tableList("my_db.user") // 监控的数据库下的表.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化.startupOptions(StartupOptions.initial()) // 启动选项.build();

这里 JsonDebeziumDeserializationSchema类的代码如下:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;/**
*  自定义DeserializationSchema进行反序列化。
*/public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//创建JSON对象用于存储最终数据JSONObject result = new JSONObject();String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct value  = (Struct)sourceRecord.value();//获取before数据Struct before = value.getStruct("before");JSONObject beforeJson = getJson(before);//获取after数据Struct after = value.getStruct("after");JSONObject afterJson = getJson(after);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//将字段写入JSON对象result.put("database",database);result.put("tableName",tableName);result.put("type",operation);result.put("before",beforeJson);result.put("after",afterJson);//输出数据collector.collect(result.toJSONString());}/***  获取字段值并写入result对象* @param before* @return*/private JSONObject getJson(Struct before) {JSONObject jsonObject = new JSONObject();if(before != null){Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue = before.get(field);jsonObject.put(field.name(), beforeValue);}}return jsonObject;}@Overridepublic TypeInformation getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}
  1. 添加数据源并打印数据

将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。

DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
  1. 启动任务

最后,启动Flink作业,开始处理数据流。

env.execute("Flink-CDC");

6.测试

在这里插入图片描述

总结

通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/363029.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生成随机函数f3,利用f3生成f18(python)

一、题目 给定一个完全随机函数f3。能够完全随机产生1~3之间任意一个自然数。现在要构造一个f18&#xff0c;让其能随机产生1~18之间任意一个自然数&#xff0c;要求写出f18的函数&#xff0c;另外要测试是否符合预期&#xff0c;f18要用f3 二、代码 欢迎大家给我更优解&…

DIY:在您的 PC 上本地使用 Stable Diffusion AI 模型生成图像

前言 随着DALL-E-2和Midjourney的发布&#xff0c;您可能听说过最近 AI 生成艺术的繁荣。这些人工智能模型如何在几秒钟内创造性地生成逼真的图像&#xff0c;这绝对是令人兴奋的。您可以在这里查看其中的一些&#xff1a;DALL-E-2 gallery和Midjourney gallery 但是这些模型…

【深度学习】深度学习基础

李宏毅深度学习笔记 局部极小值与鞍点 鞍点其实就是梯度是零且区别于局部极小值和局部极大值的点。 鞍点的叫法是因为其形状像马鞍。鞍点的梯度为零&#xff0c;但它不是局部极小值。我们把梯度为零的点统称为临界点&#xff08;critical point&#xff09;。损失没有办法再下…

学生信息管理系统

DDL和DML -- 创建学生表 CREATE TABLE students (student_id INT PRIMARY KEY AUTO_INCREMENT,name VARCHAR(50),age INT,gender VARCHAR(10) );-- 创建课程表 CREATE TABLE courses (course_id INT PRIMARY KEY AUTO_INCREMENT,course_name VARCHAR(50) );-- 创建教师表 CREA…

HTML静态网页成品作业(HTML+CSS+JS)——家乡莆田介绍网页(5个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;使用Javacsript代码实现图片轮播&#xff0c;共有5个页面。 二、作品…

C语言基础笔记(全)

一、数据类型 数据的输入输出 1.数据类型 常量变量 1.1 数据类型 1.2 常量 程序运行中值不发生变化的量&#xff0c;常量又可分为整型、实型(也称浮点型)、字符型和字符串型 1.3 变量 变量代表内存中具有特定属性的存储单元&#xff0c;用来存放数据&#xff0c;即变量的值&a…

【Echarts】散点图 制作 气泡 类型图表

目录 需求主要代码效果展示注 需求 需参照设计图画出对应图表 主要代码 /**** 数据 ****/ this.dataList [...Array(8).keys()].map((item) > {return {ywlxmc: 业务类型 (item 1),sl: item > 4 ? 50 : 70} })/**** 气泡样式 ****/ const styleList [{offset: [56…

MySQL实训

项目名称与项目简介 股票交易系统是一个综合性的金融服务平台&#xff0c;它提供了股票买卖、交易查询、用户管理、股票信息管理以及资金账户管理等功能。系统旨在为用户提供一个安全、高效、便捷的股票交易环境&#xff0c;让用户能够实时掌握市场动态&#xff0c;做出合理的…

探索Facebook的未来世界:数字社交的演进之路

在数字化和全球化的浪潮中&#xff0c;社交网络如Facebook已经成为了人们日常生活不可或缺的一部分。然而&#xff0c;随着技术的迅猛发展和用户需求的不断变化&#xff0c;Facebook正在经历着社交平台的演进之路。本文将探索Facebook的未来世界&#xff0c;分析数字社交的发展…

用英文介绍美国总统Trump: Donald J. Trump Twice Impeached (2017 – 2021)

Donald J. Trump: Twice Impeached (2017 – 2021) Link: https://www.youtube.com/watch?vJ7RC2DKf6rs&listPLybg94GvOJ9E-ZM1U6PAjgPUmz-V4-Yja&index45 Summary Summary of Donald Trump’s Rise and Presidency Donald John Trump, originally from Queens, Ne…

【MTK平台】如何学习Bluedroid A2DP Code

一 Bluedroid A2DP架构图 备注: vendor/mediatek/proprietary/packages/modules/Bluetooth/system/audio_a2dp_hw/src 目录下编译生成audio.a2dp.default.so,主要实现a2dp做为设备的功能 二 A2DP File Hierarchy ModuleFileDescriptionAudio HAL (hardware/libhardware/…

Arcgis 计算经纬度坐标并补齐6位小数

工作中我们经常需要在Arcgis中计算点的经纬度或者线的起点、终点坐标&#xff0c;为确保数据的准确性&#xff0c;我们必须保留6位小数&#xff0c;但我们在默认计算的时候偶尔会遇到算出来的经纬度坐标小数位不足6位&#xff0c;那我们应该如何补齐呢&#xff0c;这里我将方法…

智芯开发板----环境配置

一、软件准备 Keil IDE/ IAR IdeSupport_Install_Package已经上传到资源中自行下载即可。 二、IAR环境配置 1.首先将IdeSupport_Install_Package内的IAR文件复制到你的IAR安装路径中如图所示&#xff1a; 2.按如图所示的路径进行复制即可 3.以记事本的方式打开这个xml文件…

RK3588 Android13 TvSetting 中性能浮窗RAM显示bug

前言 电视产品,客户发现在设备偏好设置->高级设置->性能浮窗菜单里显示的 RAM 大小是错误的, 要求改成正确的,并且屏幕密度修改后,这个浮窗显示不全,也需要一起处理。 效果图 TvSetting 部分修改文件清单 bug 原因在于 Formatter.formatFileSize 这个 API,我们…

聚观早报 | iPhone 16核心硬件曝光;三星Galaxy全球新品发布会

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 6月28日消息 iPhone 16核心硬件曝光 三星Galaxy全球新品发布会 苹果正多方下注布局AI商店 黄仁勋2024年薪酬3400…

OpenHarmony开发实战:HDF驱动开发流程

概述 HDF&#xff08;Hardware Driver Foundation&#xff09;驱动框架&#xff0c;为驱动开发者提供驱动框架能力&#xff0c;包括驱动加载、驱动服务管理、驱动消息机制和配置管理。并以组件化驱动模型作为核心设计思路&#xff0c;让驱动开发和部署更加规范&#xff0c;旨在…

构建个人文件上传服务:Python Flask实现上传和下载完整指南

介绍 在本教程中&#xff0c;我们将学习如何使用Python Flask框架将文件上传到服务器&#xff0c;并使用SQLite数据库来跟踪上传的文件。我们将提供后端代码和一个示例项目的Git链接&#xff0c;以便您可以轻松地跟随本教程。 准备工作 首先&#xff0c;您需要安装Python和F…

Java基础知识-线程

Java基础知识-线程 1、在 Java 中要想实现多线程代码有几种手段&#xff1f; 1. 一种是继承 Thread 类 2. 另一种就是实现 Runnable 接口 3. 最后一种就是实现 Callable 接口 4. 第四种也是实现 callable 接口&#xff0c;只不过有返回值而已 2、Thread 类中的 start() 和 …

Python 面试【★★★】

阐述以下方法 classmethod, staticmethod, property&#xff1f; 解释什么是lambda函数&#xff1f;它有什么好处&#xff1f;

手机远程控制另一台手机的全新使用教程(安卓版)

看完这篇文章&#xff0c;你可以了解到安卓手机如何远程控制安卓手机&#xff0c;以及苹果手机如何远程控制安卓手机。 如果想要用安卓手机远程管控苹果手机&#xff0c;或者苹果手机远程管控另一台苹果手机&#xff0c;请点击查看视频《手机远程管控另一台手机的全新使用教程…