SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

    • 一、前言
    • 二、技术介绍(Flink CDC)
      • 1、Flink CDC
      • 2、Postgres CDC
    • 三、准备工作
    • 四、代码示例
    • 五、总结

一、前言

在工作中经常会遇到要实时获取数据库(postgresql、mysql等)的变更数据,主要体现数据的实时性;mysql数据库有canal工具实现很简单,但是基于postgresql数据库获取实时数据就比较复杂,之前已经写过一篇获取postgresql数据库实时数据的文章,如下:

【技术实现】java实时同步postgresql变更数据,基于WAL日志

但是,之前的实现方式比较繁琐,不利于维护,所有本文整合Flink CDC通过一个比较简单的方式实现;

二、技术介绍(Flink CDC)

1、Flink CDC

Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源数据变更捕获(CDC)框架。其核心功能是从各种关系型数据库(如MySQL、PostgreSQL、Oracle等)中捕获数据变更(如增删改操作),并将这些变更以流的形式提供给Flink等流处理引擎进行处理;
1)CDC(Change Data Capture):数据变更捕获的简称,用于监测并捕获数据库的变动,然后将这些变更按照发生顺序捕获,并写入到目标存储系统(如数据仓库、数据湖、消息队列等)。
2)Flink CDC:基于Flink的CDC实现,将CDC技术与Flink流处理引擎相结合,实现数据的实时捕获、处理和传输。

2、Postgres CDC

1)Postgres CDC(Change Data Capture)连接器是用于从PostgreSQL数据库捕获数据变更(如增删改操作)并将其以流的形式提供给数据处理引擎(如Flink)的组件;
2)PostgreSQL版本:Postgres CDC连接器通常支持PostgreSQL的多个版本,具体版本可能因连接器版本不同而有所差异。常见的支持版本包括9.6、10、11、12、13、14等;

三、准备工作

1、安装postgresql数据库,并创建库和测试使用的表,这里不再列举详细步骤;
在这里插入图片描述
2、修改postgresql数据库配置,通过wal日志监听变更数据

修改postgresql.conf文件,重启服务
wal_level=logical

3、springboot关键maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>3.0.1</version>
</dependency>

注:其它依赖不在列举,可以通过获取源码查看

四、代码示例

InitAction02.java

package com.sk.proxytest.init;import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;@Configuration
public class InitAction02 {@PostConstructpublic void run() throws Exception {DebeziumDeserializationSchema<String> deserializer =new JsonDebeziumDeserializationSchema();JdbcIncrementalSource<String> postgresIncrementalSource =PostgresSourceBuilder.PostgresIncrementalSource.<String>builder().hostname("127.0.0.1").port(5432).database("postgres").schemaList("public").tableList("public.student").username("postgres").password("password").slotName("flink").decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+.deserializer(deserializer).includeSchemaChanges(true) // output the schema changes as well.splitSize(2) // the split size of each snapshot split.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(postgresIncrementalSource,WatermarkStrategy.noWatermarks(),"PostgresParallelSource").setParallelism(2).addSink(new CustomSink());//.print();env.execute("Output Postgres Snapshot");}}

CustomSink.java

package com.sk.proxytest.init;import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;@Log4j2
public class CustomSink extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {log.info("============数据发生变化:{}", value);}
}

执行结果:

1)新增数据
在这里插入图片描述

2)变更数据输出

2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.util.Threads$3: Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.connector.postgresql.PostgresStreamingChangeEventSource: Processing messages
2024-07-31T00:00:15,762 INFO  [debezium-reader-0] io.debezium.connector.postgresql.connection.WalPositionLocator: Message with LSN 'LSN{0/3588018}' arrived, switching off the filtering
2024-07-31T00:00:16,678 INFO  [Sink: Unnamed (1/4)#0] com.sk.proxytest.init.CustomSink: ============数据发生变化:{"before":null,"after":{"id":8,"name":"8","age":8,"remark":"8"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres_cdc_source","ts_ms":1722355215252,"snapshot":"false","db":"postgres","sequence":"[null,\"56131608\"]","schema":"public","table":"student","txId":932,"lsn":56131608,"xmin":null},"op":"c","ts_ms":1722355216336,"transaction":null}

五、总结

Postgres CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取二进制日志,即使发生故障,也会进行一次处理;

Postgres CDC 连接器

👇🏻 👇🏻 👇🏻注:文章源代码关注下面公众号获取👇🏻 👇🏻 👇🏻

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/389305.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

为何重视文件加密?用哪款加密软件好呢?

一、公司都重视文件加密的原因有哪些&#xff1f;保护数据安全&#xff1a;在数字化时代&#xff0c;数据是企业重要的资产之一。文件加密可以确保数据在存储和传输过程中不被未经授权的人员访问或窃取&#xff0c;从而保护数据的机密性和完整性。这对于包含敏感信息&#xff0…

Reat hook开源库推荐

Channelwill Hooks 安装 npm i channelwill/hooks # or yarn add channelwill/hooks # or pnpm add channelwill/hooksAPI 文档 工具 Hooks useArrayComparison: 比较两个数组的变化。useCommunication: 处理组件之间的通信。useCurrencyConverter: 货币转换工具。useCurre…

【Docomo】5G

我们想向您介绍第五代移动通信系统“5G”。 5G 什么是5G&#xff1f;支持5G的技术什么是 5G SA&#xff08;独立&#xff09;&#xff1f;实现高速率、大容量的5G新频段Docomo的“瞬时5G”使用三个宽广的新频段 什么是5G&#xff1f; 5G&#xff08;第五代移动通信系统&#x…

【Elasticsearch】Elasticsearch的分片和副本机制

文章目录 &#x1f4d1;前言一、分片&#xff08;Shard&#xff09;1.1 分片的定义1.2 分片的重要性1.3 分片的类型1.4 分片的分配 二、副本&#xff08;Replica&#xff09;2.1 副本的定义2.2 副本的重要性2.3 副本的分配 三、分片和副本的机制3.1 分片的创建和分配3.2 数据写…

Github Benefits 学生认证/学生包 新版申请指南

本教程适用于2024年之后的Github学生认证申请&#xff0c;因为现在的认证流程改变了很多&#xff0c;所以重新进行了总结这方面的指南。 目录 验证教育邮箱修改个人资料制作认证文件图片转换Base64提交验证 验证教育邮箱 进入Email settings&#xff0c;找到Add email address…

【一图学技术】5.OSI模型和TCP/IP模型关系图解及应用场景

OSI模型和TCP/IP模型关系图解 OSI模型和TCP/IP模型都是网络通信的参考模型&#xff0c;用于描述网络协议的层次结构和功能。下面是它们的定义和区别&#xff1a; OSI模型&#xff08;Open Systems Interconnection Model&#xff09; OSI模型是一个理论上的七层模型&#xff…

揭秘线性代数秩的奥秘:从理论到机器学习的跨越

一、线性代数中的秩&#xff1a;定义与性质 1.1 定义 在线性代数中&#xff0c;秩是一个核心概念&#xff0c;用于描述矩阵或向量组的复杂性和独立性。具体而言&#xff0c;一个矩阵的秩定义为该矩阵中非零子式的最高阶数&#xff0c;而一个向量组的秩则是其最大无关组所含的…

双 Token 三验证解决方案

更好的阅读体验 \huge{\color{red}{更好的阅读体验}} 更好的阅读体验 问题分析 以往的项目大部分解决方案为单 token&#xff1a; 用户登录后&#xff0c;服务端颁发 jwt 令牌作为 token 返回每次请求&#xff0c;前端携带 token 访问&#xff0c;服务端解析 token 进行校验和…

Ubuntu配置项目环境

目录 一、Xshell连接云服务器 二、切换到root用户 三、安装jdk 四、安装tomcat 五、安装mysql 1、安装mysql服务器 2、卸载mysql服务器 六、正式进行程序的部署 一、Xshell连接云服务器 要想使用xshell连接上云服务器就需要明确云服务器的几个信息&#xff1a; 1&…

科研绘图系列:R语言GWAS曼哈顿图(Manhattan plot)

介绍 曼哈顿图(Manhattan Plot)是一种常用于展示全基因组关联研究(Genome-Wide Association Study, GWAS)结果的图形。GWAS是一种研究方法,用于识别整个基因组中与特定疾病或性状相关的遗传变异。 特点: 染色体表示:曼哈顿图通常将每个染色体表示为一个水平条,染色体…

tarojs项目启动篇

TaroJS 是一个开放式跨端开发解决方案&#xff0c;使用 React 语法规范来开发多端应用&#xff08;包括小程序、H5、React Native 等&#xff09;。它可以帮助开发者高效地构建出在不同端上运行一致的应用。以下是启动 TaroJS 项目&#xff08;本来就有的旧项目&#xff09;的步…

⭐️2024年7月全球排名前二十开发语言全面对比横向竖向PK(TIOBE指数榜单)编程语言介绍 适用场景 优势 举例 详细说明 编写第一个语言程序Hello world源代码

2024年7月全球排名前二十开发语言全面对比横向竖向PK&#xff08;TIOBE指数榜单&#xff09;编程语言介绍 适用场景 优势 举例 详细说明 编写第一个语言程序Hello world源代码 2024年7月全球排名前二十开发语言全面对比横向竖向PK&#xff08;TIOBE指数榜单&#xff09;编程语言…

反序列化靶机serial

1.创建虚拟机 2.渗透测试过程 探测主机存活&#xff08;目标主机IP地址&#xff09; 使用nmap探测主机存活或者使用Kali里的netdicover进行探测 -PS/-PA/-PU/-PY:这些参数即可以探测主机存活&#xff0c;也可以同时进行端口扫描。&#xff08;例如&#xff1a;-PS&#xff0…

【python】Python中采集Prometheus数据,进行数据分析和可视化展示

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

如何在 Debian 上安装运行极狐GitLab Runner?【二】

极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门面向中国程序员和企业提供企业级一体化 DevOps 平台&#xff0c;用来帮助用户实现需求管理、源代码托管、CI/CD、安全合规&#xff0c;而且所有的操作都是在一个平台上进行&#xff0c;省事省心省钱。可以一键安装极狐GitL…

本地生活服务商公司有哪些?一文教你搭建本地生活系统!

当前&#xff0c;本地生活领域群雄环伺&#xff0c;日益激烈的竞争推动各家互联网大厂调整布局模式的同时&#xff0c;也让本地生活市场持续迸发新的活力。在此背景下&#xff0c;想要通过本地生活服务商身份入局的创业者数量不断增多&#xff0c;以本地生活服务商公司有哪些等…

BEVGPT展示自动驾驶的“全知视角”,预测决策规划三合一的革新之作!

前言 本篇文章由原paper一作Pengqin Wang&#xff08;王鹏钦&#xff09;全权翻译分享&#xff0c;王鹏钦为香港科技大学博士生&#xff0c;师从沈劭劼教授、朱美新教授。他的研究方向为自动驾驶和机器人系统中的决策、预测和规划。他的研究成果发表于TMECH、RAL、IROS、TRB等…

互联网政务应用安全管理规定

互联网政务应用安全管理规定 &#xff08;2024年2月19日中央网络安全和信息化委员会办公室、中央机构编制委员会办公室、工业和信息化部、公安部制定 2024年5月15日发布&#xff09; 第一章 总则 第一条为保障互联网政务应用安全&#xff0c;根据《中华人民共和国网络安全法…

【前端新手小白】学习Javascript的【开源好项目】推荐

目录 前言 1 项目介绍 1.1 时间日期类 1.2 网页store类 1.3 事件类 1.4 Number类 1.5 String类 1.6 正则验证类 1.7 ajax类 1.8 data数据类 1.9 browser浏览器类 2 学习js-tool-big-box开源项目时有哪些收获 2.1 你可以这样做 2.2 如果你需要使用本项目 2.3 你…

内网穿透的应用-Windows系统如何ssh连接群晖nas使用docker安装内网穿透软件

文章目录 前言1. 检查安装Container Manager2. 检查开启群晖SSH连接3. Windows SSH 连接群晖4. 下载Cpolar 镜像5. 群晖Docker安装Cpolar 前言 在某些群晖NAS型号版本&#xff0c;无法使用套件安装的时候&#xff0c;我们可以采用Docker的方式进行安装cpolar内网穿透工具&…