Opengauss到Oracle增量同步, 使用debezium

一、概述

PGOracle的同步方案使用debezium + kafka + kafka-connect-jdbcdebezium是一款开源的变更捕获软件,它以kafkaconnector形式运行,可以捕获PostgreSQLMySQLOracle中的变更数据,保存到kafkakafka-connect-jdbcconfluent公司的一款开源软件,以connector形式运行,可以从kafka读取变更数据,转换为PostgreSQLMySQLOracleSQLServerDB2等数据库的SQL语句,通过JDBC连接到数据库。

本方案用到的软件都是从开源代码编译而来,编译过程在第6章节。

二、kafkakafka connect

概述

kafka是一个分布式消息队列服务器。从逻辑角度讲,消息存储在称为topic的对象里(可以理解为一个topic就是一个消息队)kafka的客户端读/写消息时,需要指定topic名称。
可以用命令或代码创建topic、配置/查看属性、消息数。topic中的消息被读取后并没有删除,其它客户端仍可读取。客户端读取topic时,除了名称还需指定group id对象,它用于保存此客户端已经读到的消息的位置(offset)kafka是通过group id中的offset,来管理不同客户端对同一topic消费的不同进度。kafka把所有group idoffset保存在名为__consumer_offsettopic下。可以通过命令修改某个group idoffset。消息可以设置保存时间,超时自动删除,但无法用命令删除。topic中的消息只能以先进先出(FIFO)方式读取。

kafka的启动和配置

kafka的运行依赖zookeeper,集群中需要有zookeeper服务器,zookeeper是一个分布式数据库,一般用来存储集群中需要各节点共享的信息,保证信息的一致性,也可以用来实现分布式锁。

我下载的二进制kafka中包含了zookeeper服务器,在配置文件config/zookeeper.properties中,设置一下dataDir,作为实验其它参数默认即可:

注意,zookeeper服务器默认监听端口是2181

zookeeper和kafka都是java程序,注意配置一下java运行环境,我使用的是Java HotSpot 1.8,只要配置好JAVA_HOME和PATH两个环境变量即可,JAVA_HOME指向JDK解压后的目录,启动脚本会优先到JAVA_HOME里去找Java运行环境。

启动zookeeper,在kafka根目录下执行:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

kafka的配置文件是config/server.properties,设置log.dirs,这是kafka存消息数据的目录,会占用较大磁盘,作为实验其它参数默认:

启动kafka,在kafka根目录下执行:

bin/kafka-server-start.sh -daemon config/server.properties

注意,kafka服务器默认监听端口是9092,kafka和zookeeper,客户端和kafka直接都是通过网络读写数据的。

我们用到的关于zookeeper、kafka、connect的脚本都在kafka的bin目录下:

消息的读写:topicgroup id 相关命令

创建topic

bin/kafka-topics.sh --create --topic my-test-topic --bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

查看指定topic属性:

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 -describe --topic my-test-topic

修改topic属性:

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --entity-type topics --alter --entity-name my-test-topic --add-config retention.ms=1000

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --entity-type topics --alter --entity-name my-test-topic --delete-config retention.ms

列出所有topic

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

删除topic

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic my-test-topic

查看topic中有几条消息:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic my-test-topic

查看有哪些消费者组:

./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

查看消费者组的偏移信息:

./bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group connect-sink-oracle

可以看到connect-sink-oracle中记录了多个topic的偏移量信息,LOG-END-OFFSET表示这个topic中总共有多少消息,CURRENT_OFFSET表示消费者当前已读取的消息偏移量,LAG就是还剩多少条消息没读,这个命令可以查看同步的进度。 

设置某个topic的CURRENT_OFFSET:

./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group connect-sink-oracle  --topic my-test-topic --execute --reset-offsets --to-earliest

可以使用bin目录下的kafka-console-producer.sh和kafka-console-consumer.sh,向kafka收发消息:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic-name>

./bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic <topic-name>

--from-beginning表示从topic的第一条消息开始读,否则只读最后一条消息之后的。

常用于调试,例如查看debezium在PG上做snapshot或捕获数据的状态:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic connect-offsets

kafka connect

kafka connectkafka的一个组件,是一个独立于kafka服务器的Java程序,它可以统一管理多个connector注意connectconnector不同),connector用于连接不同数据库,读取数据库写到kafka,或将kafka中的消息写到数据库。一个connectorkafka的生产者或消费者客户端,是jar形式的java程序,放在kafkalibs目录下。从编程角度讲connector实现了kafka connectsinksource接口,kafka connect启动时会加载它们。用户可以通过kafka connectREST API接口配置connector的参数。

实现source接口的connector,是kafka的生产者,连接源数据库,从源数据库获取数据写到kafka

实现sink接口的connector,是kafka的消费者,连接目标数据库,从kafka读取数据,写入目的数据库。

connector写到kafka前,数据可以序列化(压缩),从kafka读出后再反序列化(解压),以降低网络和存储开销,这个工作由称为converter的Java程序来做,它以jar形式被connect加载

一个kafka connect进程称为一个workersource connectorsink connector都运行在这个进程中。

配置和启动kafka connect

kafka connect的配置文件在config/connect-distributed.properties,作为实验参数使用默认值就行。

注意:

  1. bootstrap.serverskafka服务器的IP:PORT
  2. group.idconnect worker作为消费者所使用的group id不能与sink connectorgroup id相同。
  3. plugin.path指定了connect到哪些目录下寻找jar。jdbcconnectorconverter都是jar形式的程序。为了实验简单,只指定kafka根目录下的libs,把所有用到的jar都复制到这里,建议使用绝对路径。

启动connect

./bin/connect-distributed.sh -daemon config/connect-distributed.properties

​​​​​​​​​​​​​​配置和启动connector

kafka connect启动时connctorconverterjar已被加载,还需要通过REST API配置和启动connector

下面的命令创建/启动了一个connector

curl -H "Content-Type: application/json" -X POST http://127.0.0.1:8083/connectors -d '@sink-oracle-156.json'

connect启动以后在8083端口监听REST API命令,向这个端口以http post发送json格式的配置数据,'@sink-oracle-156.json'指定了文件路径为当前目录下sink-oracle-156.json文件,是json格式文本,内容如下:

是source类型还是sink类型,取决于connector.class指向的类是实现source接口还是sink接口。这个connector的名称是sink-oracle,实现了sink接口(类io.confluent.connect.jdbc.JdbcSinkConnector实现了sink类),key.converter和value.converter设定所使用的的converter。

删除/停止名为sink-oracle的connector:

curl -X DELETE http://127.0.0.1:8083/connectors/sink-oracle

查看所有connector:

curl -X GET http://127.0.0.1:8083/connectors

查看某个connector状态:

curl -X GET http://127.0.0.1:8083/connectors/<connector name>/status|jq

配置和部署avro-converterschema-registry

源数据库的每一条变更记录,默认被转换成了json字符串,包含了元数据和数据,然后发送到kafka,下面就是update emp set ename = 'TOMAS', job = 'ENGINEER' where empno=2 and ename='SMITH' 对应的json字符串:

这样有很大冗余,有两个办法可以降低冗余:
一、使用压缩的序列化格式;
二、使用元数据服务器共享元数据,用key值获得某个表的元数据,消息只携带数据和key值,消费者使用key值查询完整的元数据来解析消息。
avro-converterschema-registry共同完成这个工作。本方案的使用的avro-converterschema-registry来自confluentinc公司的开源代码,编译过程见后面章节。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/194390.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Linux] ssh远程访问及控制

一、ssh介绍 1.1 SSH简介 SSH&#xff08;Secure Shell&#xff09;是一种安全通道协议&#xff0c;主要用于实现远程登录、远程复制等功能的字符接口。SSH 协议包括用户在登录时输入的用户密码、双方之间的通信。 加密数据传输&#xff0c;SSH 是一种建立在应用层和传输层上…

Please No More Sigma(构造矩阵)

Please No More Sigma 给f(n)定义如下&#xff1a; f(n)1 n1,2; f(n)f(n-1)f(n-2) n>2; 给定n&#xff0c;求下式模1e97后的值 Input 第一行一个数字T&#xff0c;表示样例数 以下有T行&#xff0c;每行一个数&#xff0c;表示n。 保证T<100&#xff0c;n<100000…

【Proteus仿真】【Arduino单片机】DHT11温湿度

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真Arduino单片机控制器&#xff0c;使用PCF8574、LCD1602液晶、DHT11温湿度传感器等。 主要功能&#xff1a; 系统运行后&#xff0c;LCD1602显示传感器采集温度和湿度。 二、软件设…

SQL存储过程和函数

SQL存储过程和函数 变量系统变量用户定义变量局部变量 存储过程存储函数 变量 在MySQL中变量分为三种类型: 系统变量、用户定义变量、局部变量。 系统变量 系统变量 是MySQL服务器提供&#xff0c;不是用户定义的&#xff0c;属于服务器层面。分为全局变量&#xff08;GLOBA…

全栈工程师必须要掌握的前端Html技能

作为一名全栈工程师&#xff0c;在日常的工作中&#xff0c;可能更侧重于后端开发&#xff0c;如&#xff1a;C#&#xff0c;Java&#xff0c;SQL &#xff0c;Python等&#xff0c;对前端的知识则不太精通。在一些比较完善的公司或者项目中&#xff0c;一般会搭配前端工程师&a…

Mistral 7B 比Llama 2更好的开源大模型 (二)

Mistral 7B 论文学习 Mistral 7B 论文链接 https://arxiv.org/abs/2310.06825 代码: https://github.com/mistralai/mistral-src 网站: https://mistral.ai/news/announcing-mistral-7b/ 论文摘要 Mistral 7B是一个70亿参数的语言模型,旨在获得卓越的性能和效率。Mistral 7…

C# 使用Microsoft.Office.Interop.Excel库操作Excel

1.在NuGet管理包中搜索&#xff1a;Microsoft.Office.Interop.Excel&#xff0c;如下图红色标记处所示&#xff0c;进行安装 2. 安装完成后&#xff0c;在程序中引入命名空间如下所示&#xff1a; using Microsoft.Office.Interop.Excel; //第一步 添加excel第三方库 usi…

算法笔记-贪心1

算法笔记-贪心 什么是贪心算法分配饼干例题理解二分割字符串最优装箱整数配对最大组合整数分配区间问题买股票的最佳时机区间选点 问题什么是贪心算法 分配饼干例题 //贪心算法 //保证局部最优,从而使最后得到的结果是全局最优的 #include<iostream> #include<a…

VIVADO+FPGA调试记录

vivadoFPGA调试记录 vitis编译vivado导出的硬件平台&#xff0c;提示xxxx.h file cant findVITIS内定义的头文件找不到 vitis编译vivado导出的硬件平台&#xff0c;提示’xxxx.h file cant find’ 此硬件平台中&#xff0c;包含有AXI接口类型的ip。在vitis编译硬件平台时&…

【漏洞复现】maccms苹果cms 命令执行漏洞

漏洞描述 感谢提供更多信息。“苹果CMS” 似乎是指 “Maccms”&#xff0c;这是一款开源的内容管理系统&#xff0c;主要用于搭建视频网站。Maccms 提供了一套完整的解决方案&#xff0c;包括用户管理、视频上传、分类管理、数据统计等功能&#xff0c;使用户能够方便地创建和…

如何构建风险矩阵?3大注意事项

风险矩阵法&#xff08;RMA&#xff09;是确定威胁优先级别的最有效工具之一&#xff0c;可以帮助项目团队识别和评估项目中的风险&#xff0c;帮助项目团队对风险进行排序&#xff0c;清晰地展示风险的可能性和严重性&#xff0c;为项目团队制定风险管理策略提供依据。 如果没…

【ArcGIS处理】行政区划与流域区划间转化

【ArcGIS处理】行政区划与流域区划间转化 引言数据准备1、行政区划数据2、流域区划数据 ArcGIS详细处理步骤Step1&#xff1a;统计行政区划下子流域面积1、创建批量处理模型2、添加批量裁剪处理3、添加计算面积 Step2&#xff1a;根据子流域面积占比均化得到各行政区固定值 参考…

hadoop 大数据环境配置 配置jdk, hadoop环境变量 配置centos环境变量 hadoop(五)

1. 遗漏一步配置系统环境变量&#xff0c;下面是步骤&#xff0c;别忘输入更新系统环境命令 2. 将下载好得压缩包上传至服务器&#xff1a; /opt/module 解压缩文件存放地址 /opt/software 压缩包地址 3. 配置环境变量&#xff1a; 在/etc/profile.d 文件夹下创建shell文件 …

Linux Traefik工具Dashboard结合内网穿透实现远程访问

文章目录 前言1. Docker 部署 Trfɪk2. 本地访问traefik测试3. Linux 安装cpolar4. 配置Traefik公网访问地址5. 公网远程访问Traefik6. 固定Traefik公网地址 前言 Trfɪk 是一个云原生的新型的 HTTP 反向代理、负载均衡软件&#xff0c;能轻易的部署微服务。它支持多种后端 (D…

2023亚太杯数学建模思路 - 复盘:人力资源安排的最优化模型

文章目录 0 赛题思路1 描述2 问题概括3 建模过程3.1 边界说明3.2 符号约定3.3 分析3.4 模型建立3.5 模型求解 4 模型评价与推广5 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 描述 …

网络和Linux网络_2(套接字编程)socket+UDP网络通信代码

目录 1. 预备知识 1.1 源IP地址和目的IP地址 1.2 端口号port和套接字socket 1.3 网络通信的本质 1.4 TCP和UDP协议 1.5 网络字节序 2. socket套接字 2.1 socket创建套接字 2.2 bind绑定 2.3 sockaddr结构体 3. UDP网络编程 3.1 server的初始化服务器 3.2 server的…

如何解决3d max渲染效果图全白这类异常问题?

通过3d max渲染效果图时&#xff0c;经常会出现3Dmax渲染效果图全黑或是3Dmax渲染效果图全白这类异常问题。可能遇到这类问题较多的都是新手朋友。不知如何解决。 3dmax渲染出现异常的问题&#xff0c;该如何高效解决呢&#xff1f;今天小编这里整理几项知识点&#xff0c;大家…

打开文件 和 文件系统的文件产生关联

补充1&#xff1a;硬件级别磁盘和内存之间数据交互的基本单位 OS的内存管理 内存的本质是对数据临时存/取&#xff0c;把内存看成很大的缓冲区 物理内存和磁盘交互的单位是4KB&#xff0c;磁盘中未被打开的文件数据块也是4KB&#xff0c;所以磁盘中页帧也是4KB&#xff0c;内存…

简单理解路由重分发(用两路由器来理解)

相关命令&#xff1a; default-information originate //*重分发默认路由 redistribute rip subnets //*重分发rip redistribute ospf 1 metric 3 //*重分发ospf&#xff08;其中&#xff1a;1是ospf进程id 3是跳数&#xff09; redistribute sta…

电池故障估计:Realistic fault detection of li-ion battery via dynamical deep learning

昇科能源、清华大学欧阳明高院士团队等的最新研究成果《动态深度学习实现锂离子电池异常检测》&#xff0c;用已经处理的整车充电段数据&#xff0c;分析车辆当前或近期是否存在故障。 思想步骤&#xff1a; 用正常电池的充电片段数据构造训练集&#xff0c;用如下的方式构造…