Debezium2.7 数据同步 MySQL/Oracle -- AI生成

        Debezium是Red Hat开源的一个工具,用于实时捕获多种数据源(包括MySQL、PostgreSQL、SQL Server、Oracle等)的变更数据,并将这些数据作为事件流输出到Kafka等消息中间件中。通过Debezium,可以实现数据的实时同步和变更数据捕获(CDC)。

        Debezium官网:Reference Documentation

一、环境准备

1、安装 MySQL

        确保 MySQL 版本至少为 5.7.6,因为 Debezium 需要此版本或更高版本的 MySQL 来支持 Binlog。安装 MySQL 并配置好基本的访问权限和网络设置。

# 编辑 MySQL 的配置文件(通常是 /etc/my.cnf 或 /etc/mysql/my.cnf),添加或修改以下配置:
[mysqld]  
server-id = 1  # 确保每个 MySQL 实例的 server-id 是唯一的  
log_bin = mysql-bin  # 开启 Binlog 并指定日志文件名前缀  
binlog_format = ROW  # 使用行格式记录 Binlog,以捕获详细的行级变更  
binlog_row_image = FULL  # 记录更详细的数据变更信息  
expire_logs_days = 10  # 设置 Binlog 的过期时间

        在 MySQL 中创建用于 Debezium 的用户,并授权访问数据库: 

CREATE USER 'debezium'@'%' IDENTIFIED BY 'password';  
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';  
FLUSH PRIVILEGES;

2、安装Oracle

        确保 Oracle 数据库已安装并配置好,归档日志(Archive Log)和补充日志(Supplemental Log Data)已开启。创建一个用户并授予足够的权限以访问需要同步的表和日志。

-- 1. 查看数据库是否处于归档模式
SELECT log_mode FROM v$database;-- 如果结果显示为ARCHIVELOG,则数据库已处于归档模式,可以跳过转换步骤-- 2. 如果不是归档模式,转换为归档模式
SHUTDOWN IMMEDIATE; -- 立即关闭数据库
STARTUP MOUNT; -- 启动实例并挂载数据库
ALTER DATABASE ARCHIVELOG; -- 更改数据库为归档模式
ALTER DATABASE OPEN; -- 打开数据库-- 3. 指定归档日志的位置,可以是一个本地目录或一个网络位置
-- 首先查看归档日志的配置
SELECT destination FROM v$archived_log WHERE destination IS NOT NULL;-- 如果需要更改归档日志的位置,可以使用以下命令
-- 例如,指定归档日志的位置为+ARCH/orcl
ALTER SYSTEM SET log_archive_dest_1='LOCATION=+ARCH/orcl' SCOPE=BOTH;-- 确保归档进程正在运行
ALTER SYSTEM ARCHIVE LOG START;-- 可以通过以下命令查看归档进程状态
SELECT status FROM v$instance WHERE status = 'ACTIVE';

3、安装 Kafka 和 Zookeeper

        Kafka 和 Zookeeper 是 Debezium 实现数据同步所必需的组件。

        安装 Kafka 和 Zookeeper,并确保它们能够正常运行,且 Kafka 集群的配置符合生产环境的要求。

4、下载 Debezium 插件

        访问 Debezium 官网((https://debezium.io/documentation/)下载对应版本的 MySQL/Oracle Connector 插件(这里以2.7版本为例),如 debezium-connector-mysql-x.y.z.Final-plugin.tar.gz

二、配置 Kafka Connect

1、解压 Debezium 插件

        将下载的 Debezium MySQL/Oracle Connector 插件解压到 Kafka 的插件目录中,例如 /opt/kafka/plugins ,没有plugins 目录,创建一个plugins 目录

2、修改 Kafka Connect 配置

        编辑 Kafka Connect 的配置文件(如 connect-distributed.properties),确保包含以下关键配置:

bootstrap.servers=localhost:9092  # Kafka 集群地址  
group.id=connect-cluster  # Kafka Connect 集群的组 ID  
key.converter=org.apache.kafka.connect.json.JsonConverter  # 键转换器  
value.converter=org.apache.kafka.connect.json.JsonConverter  # 值转换器  
key.converter.schemas.enable=false  # 禁用键的 schema  
value.converter.schemas.enable=false  # 禁用值的 schema  
offset.storage.topic=connect-offsets  # 偏移量存储主题  
config.storage.topic=connect-configs  # 配置存储主题  
status.storage.topic=connect-status  # 状态存储主题  
plugin.path=/opt/kafka/plugins  # 插件路径

3、启动 Kafka Connect

        使用命令启动 Kafka Connect 服务,并确保它以分布式模式运行:

/opt/kafka/bin/connect-distributed.sh -daemon /opt/kafka/config/connect-distributed.properties

三、配置 Debezium Connector

        创建一个 JSON 文件来定义 Debezium MySql/Oracle Connector 的配置,指定 MySql/Oracle 数据库的连接信息、Kafka 主题、以及需要同步的表。

1、创建 Debezium Mysql Connector 配置

        编写一个配置文件mysql-source.json,用于启动Debezium MySQL Connector。可参照官网例子:Debezium connector for MySQL :: Debezium Documentation

{"name": "mysql-connector", //在Kafka Connect服务中注册时的连接器名称。"config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector"", "database.hostname": "mysql_host", "database.port": "3306", "database.user": "debezium-user", "database.password": "debezium-user-pw", "database.server.id": "184054", "topic.prefix": "fullfillment", "database.include.list": "inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.fullfillment", "include.schema.changes": "true" }
}

 

        更多配置可查看:Debezium connector for MySQL :: Debezium Documentation 

        使用Kafka Connect REST API或CLI工具部署Debezium MySQL Connector。

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @mysql-source.json http://connect_cluster:8083/connectors

2、创建 Debezium Oracle Connector 配置

        编写一个配置文件oracle-source.json,用于启动Debezium Oracle Connector。

{"name": "oracle-connector",  "config": {"connector.class" : "io.debezium.connector.oracle.OracleConnector",  "database.hostname" : "<ORACLE_IP_ADDRESS>",  "database.port" : "1521",  "database.user" : "c##dbzuser",  "database.password" : "dbz",   "database.dbname" : "ORCLCDB",  "topic.prefix" : "server1",  "tasks.max" : "1",  "database.pdb.name" : "ORCLPDB1",  "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.inventory"  }
}

        还可以使用JDBC URL连接到数据库。

{"name": "oracle-connector","config": {"connector.class" : "io.debezium.connector.oracle.OracleConnector","tasks.max" : "1","topic.prefix" : "server1","database.user" : "c##dbzuser","database.password" : "dbz","database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=OFF)(FAILOVER=ON)(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 1>)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 2>)(PORT=1521)))(CONNECT_DATA=SERVICE_NAME=)(SERVER=DEDICATED)))","database.dbname" : "ORCLCDB","database.pdb.name" : "ORCLPDB1","schema.history.internal.kafka.bootstrap.servers" : "kafka:9092","schema.history.internal.kafka.topic": "schema-changes.inventory"}
}

         以上是容器数据库(CDB)配置方式,下面是非容器数据库(非CDB)配置方式。

{"config": {"connector.class" : "io.debezium.connector.oracle.OracleConnector","tasks.max" : "1","topic.prefix" : "server1","database.hostname" : "<oracle ip>","database.port" : "1521","database.user" : "c##dbzuser","database.password" : "dbz","database.dbname" : "ORCLCDB","schema.history.internal.kafka.bootstrap.servers" : "kafka:9092","schema.history.internal.kafka.topic": "schema-changes.inventory"}
}

当您配置Debezium Oracle连接器以与Oracle CDB一起使用时,必须为属性database.pdb.name指定一个值,该值命名了您希望连接器从中捕获更改的pdb。对于非CDB安装,不要指定database.pdb.name属性。

        更多配置可查看:Debezium Connector for Oracle :: Debezium Documentation

        使用Kafka Connect REST API或CLI工具部署Debezium Oracle Connector。 

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @oracle-source.json http://connect_cluster:8083/connectors

四、使用JDBC Sink Connector同步数据到数据库

        以JDBC Sink Connector为例,以下是一个简化的配置示例,用于将数据从Kafka订阅消费事件JSON:

{"name": "jdbc-connector",  //在Kafka Connect服务中注册连接器时分配给连接器的名称。"config": {"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",  //JDBC接收器连接器类的名称。"tasks.max": "1",  //为此连接器创建的最大任务数。"connection.url": "jdbc:postgresql://localhost/db",  //连接器用于连接到其写入的接收器数据库的JDBC URL。"connection.username": "pguser",  //用于身份验证的数据库用户的名称。"connection.password": "pgpassword",  //用于身份验证的数据库用户的密码。"insert.mode": "upsert",  // 插入模式,可以选择 `insert`, `update`, 或者 `upsert`"delete.enabled": "true",  //允许删除数据库中的记录"primary.key.mode": "record_key",  //指定用于解析主键列的方法。"schema.evolution": "basic",  //指定连接器如何演化目标表架构,可选择`none`, 或者 `basic`, basic:指定发生基本演变。连接器通过将传入事件的记录架构与数据库表结构进行比较,将缺失的列添加到表中。"database.time_zone": "UTC",  //指定写入时间字段类型时使用的时区。"topics": "orders" //要使用的主题列表,用逗号分隔。}
}

        更多配置可查看:Debezium connector for JDBC :: Debezium Documentation 

        使用Kafka Connect REST API或CLI工具部署Oracle Sink Connector。 

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @sink.json http://connect_cluster:8083/connectors

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/409651.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Qt】常用控件QCalendarWidget

常用控件QCalendarWidget的使用 QCalendarWidget表示一个日历 核心属性 属性说明 selectDate 当前选中的⽇期 minimumDate 最⼩⽇期 maximumDate 最⼤⽇期 firstDayOfWeek 每周的第⼀天(也就是⽇历的第⼀列) 是周⼏. gridVisible 是否显⽰表格的边框 selectionMode…

何为MethodHandles?

最近在梳理ThreadPoolExecutor&#xff0c;无意间看到其内部类Worker实现了一个名字叫做AbstractQueuedSynchronizer的抽象类。看到它&#xff0c;我便想起当年为了面试而疯狂学习这个知识点的场景。不过这种临时抱佛脚的行为&#xff0c;并未给我带来即时的收益。也是这次的疯…

软件上显示“mfc140.dll丢失”错误信息?那么mfc140.dll丢失该如何修复

mfc140.dll是 Microsoft Foundation Class (MFC) 库的一部分&#xff0c;这个库被用于基于 C 的 Windows 应用程序的开发。当 Windows 或软件上显示“mfc140.dll丢失”或“找不到 mfc140.dll”这类错误信息时&#xff0c;表示你的系统可能缺少与 Visual C 相关的组件或这些组件…

文本处理函数

1.文本的提取 left mid right 2.文本的查找与替换 replace&#xff0c;substitute 3.字符个数 len字符 lenb字节, office365好像没有此功能 4.数据的清理 clean , trim 5.找不同 exact

【Qt】多元素控件QTableWidget

多元素控件QTableWidget 使用QTableWidget表示一个表格控件&#xff0c;一个表格中包含若干行、每一个行又包含若干列。 表格中的每一个单元格&#xff0c;都是一个QTableWidget对象。 QTableWidget核心方法 方法说明 item(int row, int column) 根据⾏数列数获取指定的 Q…

WIN32实现远程桌面监控

文章目录 完整代码API简介调试代码 后记reference 完整代码 server.cpp #include <winsock2.h> #include <Ws2tcpip.h> #include <windows.h> #include <stdio.h> #include <vector> #pragma comment(lib, "ws2_32.lib")LRESULT CAL…

免费JSON在线解析工具网址

1&#xff0c;https://tool.juhe.cn/ JSON在线解析 (juhe.cn) 2&#xff0c;https://www.sojson.com/ JSON在线 | JSON解析格式化—SO JSON在线工具

Android Studio:模拟器页面闪烁,手机模拟器输入画面闪烁 android studio闪屏

主要解决&#xff0c;android studio 启动app测试&#xff0c;输入数据时&#xff0c;手机画面就会闪烁&#xff0c;闪屏 1. 如图所示&#xff0c;依照顺序找到Edit &#xff0c;并点击Edit 2. 找到Graphics 选择为SoftWare &#xff0c;并保存修改即可 3. 如果此处不能选择S…

MongoDB Compass初体验

入坑Mongodb也好多年了&#xff0c;客户端一直都是使用的Robomongo&#xff0c;后改名为Robo 3T了&#xff0c;现在又改名为Studio 3T&#xff0c;还分了免费版和付费版。 最近换了新电脑&#xff0c;需要重新安装Mongodb的客户端&#xff0c;加上公司对安装软件的各种限制&…

【C语言报错已解决】 `Buffer Overflow`

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 引言一、问题描述&#x1f469;‍&#x1f52c;报错示例&#x1f4da;报错分析&#x1f4da;解决思路 二、解决方法&a…

加速自动驾驶模型迭代,数据存算一体是关键

自动驾驶的每一个业务阶段都会涉及到 AI 深度学习算法和算力的参与&#xff0c;机器视觉&#xff0c;深度学习&#xff0c;传感器技术等均在自动驾驶领域发挥着重要的作用。自动驾驶系统不断迭代的前提是算法的持续优化&#xff0c;目前&#xff0c;自动驾驶发展的瓶颈主要在于…

Vue3.0项目实战(二)——大事件管理系统登录注册功能实现

目录 1. 登录注册页面 [element-plus 表单 & 表单校验] 1.1 注册登录 静态结构 & 基本切换 2. 注册功能 2.1 实现注册校验 2.2 注册前的预校验 2.3 封装 api 实现注册功能 3. 登录功能 3.1 实现登录校验 3.2 登录前的预校验 & 登录成功 1. 登录注册页面 […

交换排序(冒泡排序和快速排序)

一、基本思想 所谓交换&#xff0c;就是根据序列中两个记录键值的比较结果来对换这两个记录在序列中的位置。 交换排序的特点是&#xff1a;将键值较大的记录向序列的尾部移动&#xff0c;键值较小的记录向序列的前部移动。 二、冒泡排序 1.核心思想 两两相邻的元素进行比…

大二必做项目贪吃蛇超详解之上篇win32库介绍

文章目录 1. 游戏背景2. 游戏效果演示3. 项目目标4. 前置知识5. Win32 API5. 1 控制台程序(Console)5. 2 控制台屏幕上的坐标 COORD5. 3 GetStdHandle5. 4 GetConsoleCursorlnfo5. 4. 1 CONSOLE_CURSOR_INFO5. 4. 2 SetConsoleCursorlnfo 5. 5 SetconsoleCursorPosition5. 6 Ge…

Linux(面试篇)

目录 什么是Linux 什么是Linux内核&#xff1f; Linux的基本组件是什么&#xff1f; Bash和Dos之间基本区别是什么&#xff1f; 什么是Root账户 什么是Bash? 什么时CLI? Linux的目录结构时怎样的&#xff1f; 什么是硬链接和软链接&#xff1f; 什么叫CC攻击&#…

【项目日记】高并发内存池 ---项目介绍及组件定长池的实现

余生还长&#xff0c;你别慌&#xff0c;也别回头&#xff0c;别念旧. --- 余华 --- 1 高并发内存池简介 高并发内存池项目是实现一个高并发的内存池&#xff0c;他的原型是google的一个开源项目tcmalloc&#xff0c;tcmalloc全称Thread-Caching Malloc&#xff0c;即线程缓存…

RocketMQ Dashboard

rocketmq-dashboard是一个可视化查看和管理RocketMQ消息队列的工具 官方地址&#xff1a;RocketMQ Dashboard | RocketMQ 1、点击下载源码 2、下载并解压&#xff0c;切换至源码目录rocketmq-dashboard-1.0.0 3、修改配置文件 4、编译 rocketmq-dashboard打成jar包 &#xf…

MySQL中的回表查询、索引覆盖、索引下推

本文重点介绍索引中的常见概念&#xff1a;回表查询、索引覆盖、索引下推 一、回表查询 我们首先理解&#xff1a;在InnoDB存储引擎中&#xff0c;根据索引的存储形式&#xff0c;又可以分为以下两种&#xff1a; 分类含义特点聚集索引 (Clustered Index)将数据存储与索引放到…

leetcode 438.找到字符串中所有字母异位词

目录 题目描述 示例1&#xff1a; 示例2&#xff1a; 提示&#xff1a; 解题思路 Collections库 介绍 滑动窗口法 概念 应用场景及特点&#xff1a; 思路 流程展示 代码 复杂度分析 题目描述 给定两个字符串s和p&#xff0c;找到s中所有p的异位词的子串&#xf…

cdga|让数据治理真正内嵌于企业本身,释放企业数字化建设的最大价值

在当今这个数据驱动的时代&#xff0c;企业数据已成为最宝贵的资产之一&#xff0c;它不仅记录着企业的运营轨迹&#xff0c;更是指导决策、优化流程、创新产品与服务的关键力量。然而&#xff0c;要充分发挥数据的潜力&#xff0c;实现数字化转型的深度与广度&#xff0c;就必…