采用Flink CDC操作SQL Server数据库获取增量变更数据

采用Flink CDC操作SQL Server数据库获取增量变更数据

Flink CDC 1.12版本引入了对SQL Server的支持,包括SqlServerCatalogSqlServerTable。在SqlServerCatalog中,你可以根据表名获取对应的字段和字段类型。

SQL Server 2008 开始支持变更数据捕获 (CDC) 功能。CDC 允许你捕获对表中数据更改的数据,这样你就可以查询更改的数据而不需要扫描整个表。

1、准备工作

软件版本

Flink 1.17.1

数据库版本 Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64)

1.1、数据库准备 启动CDC

--  开启SQL Server数据库CDC。  在需要开启CDC的数据库执行此命令
EXEC sys.sp_cdc_enable_db
-- 查询开启CDC的数据库
select name, is_cdc_enabled from sys.databases 

1.2、开启SQL Server代理

打开 SQL Server配置管理器 => 选择SQL Server服务 => 选择SQL Server代理 右击开启

在这里插入图片描述

1.3、为需要跟踪更改的表启用 CDC。

-- 开启表级别的CDC   --需要开启先SQL Server代理  然后执行 EXEC sys.sp_cdc_enable_table@source_schema = 'dbo', -- source_schema@source_name = 'AIR_STATION_HOUR_DATA', -- table_name@capture_instance = NULL, -- capture_instance@supports_net_changes = 1, -- supports_net_changes@role_name = NULL -- role_name--  验证表是否开启cdc成功EXEC sys.sp_cdc_help_change_data_capture

2、代码编写

2.1、引入依赖

    <properties><flink.version>1.17.1</flink.version></properties><dependencies><dependency><groupId>com.microsoft.sqlserver</groupId><artifactId>mssql-jdbc</artifactId><version>9.4.1.jre8</version></dependency>        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-sqlserver-cdc</artifactId><version>2.4.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency></dependencies>

2.2、代码编写

2.2.1 、数据库配置文件编写

public class SQLServerConstant {public static final String SQLSERVER_HOST = "0.0.0.0";  //数据库地址public static final Integer SQLSERVER_PORT = 1433; //端口public static final String SQLSERVER_DATABASE = "HBDC_AQI";  //库public static final String SQLSERVER_TABLE_LIST= "dbo.AIR_STATION_HOUR_DATA"; // 表public static final String SQLSERVER_USER_NAME = "sa"; //用户public static final String SQLSERVER_PASSWORD = "*******"; //密码
}

2.2.2 CDC数据实体类

@Data
public class DataChangeInfo implements Serializable {/*** 数据库名*/private String database;/*** 表名*/private String tableName;/*** 变更时间*/private LocalDateTime changeTime;/*** 变更类型 1新增 2修改 3删除*/private Integer eventType;/*** 变更前数据*/private String beforeData;/*** 变更后数据*/private String afterData;}

2.2.2 、SQLServer消息读取自定义序列化

@Slf4j
public class SQLServerJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {public static final String TS_MS = "ts_ms";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String CREATE = "CREATE";public static final String UPDATE = "UPDATE";@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {try {String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct struct = (Struct) sourceRecord.value();final Struct source = struct.getStruct(SOURCE);DataChangeInfo dataChangeInfo = new DataChangeInfo();dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());// 获取操作类型  CREATE UPDATE DELETE  1新增 2修改 3删除Envelope.Operation operation = Envelope.operationFor(sourceRecord);String type = operation.toString().toUpperCase();int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;dataChangeInfo.setEventType(eventType);dataChangeInfo.setDatabase(database);dataChangeInfo.setTableName(tableName);ZoneId zone = ZoneId.systemDefault();Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> 		         Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));//7.输出数据collector.collect(dataChangeInfo);} catch (Exception e) {log.error("SQLServer消息读取自定义序列化报错:{}", e.getMessage());}}/**** 从源数据获取出变更之前或之后的数据*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<DataChangeInfo> getProducedType() {return TypeInformation.of(DataChangeInfo.class);}
}

2.2.3 、功能工具类

public class FlinkSourceUtil {/*** 构造SQL Server CDC数据源*/public static DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {String[] tables = SQLSERVER_TABLE_LIST.replace(" ", "").split(",");return SqlServerSource.<DataChangeInfo>builder().hostname(SQLSERVER_HOST).port(SQLSERVER_PORT).database(SQLSERVER_DATABASE) // monitor sqlserver database.tableList(tables) // monitor products table.username(SQLSERVER_USER_NAME).password(SQLSERVER_PASSWORD)/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)* latest:只进行增量导入(不读取历史变化)*/.startupOptions(com.ververica.cdc.connectors.base.options.StartupOptions.initial()).deserializer(new SQLServerJsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();DataStream<DataChangeInfo> streamSource = env.addSource(dataChangeInfoMySqlSource, "SQLServer-source").setParallelism(1);streamSource.print();env.execute("SQLServer-stream-cdc");}
}

2.3、运行main方法测试

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/302912.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】简单介绍进制和操作符

&#x1f308;个人主页&#xff1a;是店小二呀 &#x1f308;C语言笔记专栏&#xff1a;C语言笔记 &#x1f308;C笔记专栏&#xff1a; C笔记 &#x1f308;喜欢的诗句:无人扶我青云志 我自踏雪至山巅 本文简要介绍进制和操作符&#xff0c;愿能为您提供帮助&#xff01;文章…

论文解读:吴恩达来信AI Agent技巧—利用自我反馈的迭代细化技术

《自我完善&#xff1a;利用自我反馈的迭代细化技术》 https://arxiv.org/pdf/2303.17651.pdf 摘要 Large language models (LLMs) 经常无法在一次尝试中生成最佳输出。受人类在修改书面文本时所表现出的迭代精炼过程的启发&#xff0c; 我们提出了 SELF-REFINE&#xff0c…

R语言绘图 | 散点小提琴图

原文链接&#xff1a;R语言绘图 | 散点小提琴图 本期教程 写在前面 本期的图形来自发表在Nature期刊中的文章&#xff0c;这样的基础图形在日常分析中使用频率较高。 获得本期教程数据及代码&#xff0c;后台回复关键词&#xff1a;20240405 绘图 设置路径 setwd("You…

CSDN 广告太多,停更通知,转移到博客园

文章目录 前言新博客地址 前言 CSDN的广告实在是太多了&#xff0c;我是真的有点忍不了。直接把广告插在我的文章中间。而且我已经懒得找工作了&#xff0c;我当初写CSDN的目的就是为了找工作&#xff0c;有个博客排名。当时经济环境实在是太差了。我也没必要纠结这个2000粉丝…

因为使用ArrayList.removeAll(List list)导致的机器重启

背景 先说一下背景&#xff0c;博主所在的业务组有一个核心系统&#xff0c;需要同步两个不同数据源给过来的数据到redis中&#xff0c;但是每次同步之前需要过滤掉一部分数据&#xff0c;只存储剩下的数据。每次同步的数据与需要过滤掉的数据量级大概在0-100w的数据不等。 由…

快排序解读

排序算法是计算机科学中不可或缺的一部分&#xff0c;它们在各种数据处理场景中发挥着关键作用。在众多排序算法中&#xff0c;快速排序以其高效的性能和简洁的实现成为了许多程序员的首选。今天&#xff0c;我们就来深入剖析快速排序算法&#xff0c;了解其原理、实现方式以及…

比特币革命:刚刚开始

作者&#xff1a;Marius Farashi Tasooji 编译&#xff1a;秦晋 要充分理解比特币及其含义&#xff0c;首先必须理解什么是价值&#xff0c;什么是货币。以及是什么赋予资产价值&#xff1f; 这个问题看似愚蠢&#xff0c;但实际上非常有趣。我们的生活是由我们消费或出售的物品…

【问题解决】ubuntu安装新版vscode报code-insiders相关错误

问题 目前 vscode官网 最新的包为 insiders_1.89.0-1712297812_amd64.deb &#xff0c;双击或者使用sudo dpkg -i code-insiders_1.89.0-1712297812_amd64.deb安装后报错&#xff0c;执行其他命令也报错。 安装环境&#xff1a;ubuntu18.04 dpkg: 处理软件包 code-insiders (…

Taro框架中的H5 模板基本搭建

1.H5 模板框架的搭建 一个h5 的基本框架的搭建 基础template 阿乐/H5 Taro 的基础模板

在Spring中使用Redis

端口怎么设置&#xff0c;看我前一篇文章 前面使用jedis&#xff0c;通过Jedis对象中各种方法来操作redis的。 此处Spring中则是通过StringRedisTemplate来操作redis。 最原始提供的类是RedisTemplate StringRedisTemplate是RedisTemplate的子类&#xff0c;专门处理文本数据的…

PUBG绝地求生29.1版本延迟高/卡顿/掉帧/丢包的快速解决方法

要想在绝地求生中获得好成绩&#xff0c;咱们需求把握一些根本的游戏技巧。比方&#xff0c;在挑选降落点时&#xff0c;咱们可以运用u标签来着重“安全”二字。挑选一个相对较为安全的降落点可以防止与其他玩家过早触摸&#xff0c;给自己争夺更多时间来搜集资源和配备。接下来…

Vant DropdownMenu 下拉菜单带搜索功能

Vant DropdownMenu 下拉菜单带搜索功能 效果图&#xff1a; 上代码&#xff1a; <van-dropdown-menu active-color"#E33737"><van-dropdown-item ref"dropdownItem"><template #title><span>{{ dropdownItem.text }}</span…

Mysql密码修改问题

docker安装mysql&#xff0c;直接拉取镜像&#xff0c;挂载关键目录即可启动&#xff0c;默认3306端口。此时无法直接连接&#xff0c;需要配置密码。docker进入mysql容器中 docker exec -it mysql bash #mysq是容器名称&#xff0c;也可以用容器id通过修改mysql的配置进行免密…

应用商店备案登记流程解析

引言&#xff1a; 随着智能手机的普及和移动互联网的发展&#xff0c;移动应用程序&#xff08;App&#xff09;已成为人们日常生活中不可或缺的一部分。在开发一个App之后&#xff0c;开发者需要将其上传到应用商店进行审核和上架。然而&#xff0c;在上架之前&#xff0c;开…

智慧运维解决方案

1&#xff1a;排口截污 控源截污、内源治理、生态修复 通过传感器对周围环境进行监测&#xff0c;将雨水和污水分别流入不同的管道&#xff0c;进行分流和净化处理&#xff0c;守好排污口&#xff0c;解决城市雨水和污水污染问题&#xff0c;减少城市环境污染。 2&#xff1…

html骨架以及常见标签

推荐一个网站mdn。 html语法 双标签&#xff1a;<标签 属性"属性值">内容</标签> 属性&#xff1a;给标签提供附加信息。大多数属性以键值对的形式存在。如果属性名和属性值一样&#xff0c;可以致谢属性值。 单标签&#xff1a;<标签 属性"属…

私域电商客户要挨一刀的“订单发货管理”,微信:必须强制接入

文丨微三云营销总监胡佳东&#xff0c;点击上方“关注”&#xff0c;为你分享市场商业模式电商干货。 - 引言&#xff1a;超90%的私域运营商家都见到了或者说遇到了这个问题&#xff0c;如果没有读懂这个微信的模型机制&#xff0c;一定会懵逼&#xff0c;微三云营销总监胡佳…

计算机网络:数据链路层 - 点对点协议PPP

计算机网络&#xff1a;数据链路层 - 点对点协议PPP PPP协议的帧格式透明传输字节填充法零比特填充法 差错检测循环冗余校验 对于点对点链路&#xff0c;PPP协议是目前使用最广泛的数据链路层协议。比如说&#xff0c;当用户想要接入互联网&#xff0c;就需要通过因特网服务提供…

被狠狠拷打!想冲 PDD 机器学习算法岗,一面直接挂了。。。

节前&#xff0c;我们社群组织了一场技术&面试讨论会&#xff0c;邀请了一些互联网大厂朋友、今年参加社招和校招面试的同学&#xff0c;针对新手如何机器学习算法、企业级落地场景、大模型的发展趋势与落地实践、新人该如何备考、面试常考点等热门话题进行了深入的讨论。 …

LoRa自组网络设计 6

1 深入了解LoRaWan 1.1 LoRaWan概述 LoRaWAN采用星型无线拓扑 End Nodes 节点 Gateway 网关 Network Server 网络服务器 Application Server 应用服务器 LoRa联盟是2015年3月Semtech牵头成立的一个开放的、非盈利的组织&#xff0c;发起成员还有法国Actility&#xff0c;中国…