mongodb使用debezium

前置

服务器上需要安装jdk11
jdk下载地址

kafka安装

官网下载地址

安装教程

debezium 安装

运行 Debezium 连接器需要 Java 11 或更高版本
Debezium 并不是一个独立的软件,而是很多个 Kafka 连接器的总称。这些 Kafka 连接器分别对应不同的数据库,比如 MySQL、Oracle 等。按 Kafka 连接器的常见命名规则,可能我们会把它们叫做 MySQL Kafka Source Connector 之类。

部署

1.下载对应版本的debezium插件

插件地址
在这里插入图片描述
在这里插入图片描述

2.文件解压

将下载的文件解压,将解压后的文件放到kafka的plugin文件夹下(该plugin文件夹为自己创建的plugin文件夹)*,例如
在这里插入图片描述

3. 通过 kafka connect部署

kafka connect有两种部署方式,一是单机部署,二是分布式部署。单机部署配置kafka/config/connect-standalone.properties 文件,分布式部署则配置kafka/config/connect-distributed.properties。分布式部署支持通过rest api管理connector

此处是单机部署,配置文件为kafka/config/connect-standalone.properties,主要修改以下内容:

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/local/kafka/plugin

4.启动kafka-connect

需要先启动kafka

bin/connect-standalone.sh config/connect-standalone.properties

5.创建对应的debezium配置文件

在这里插入图片描述

curl -X POST http://${debezium所在服务器}:8083/connectors

{"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams"}
}
  • 如果需要在cdc输出的语句上显示before信息,需要开启mongodb版本 6.0 中的新增功能changeStreamPreAndPostImages,并且在capture.mode上使用change_streams_with_pre_image或change_streams_update_full_with_pre_image
  • 如果capture.mode未设置成change_streams_with_pre_imagechange_streams_update_full_with_pre_image的话,在进行删除时cdc输出会没有before信息
    在这里插入图片描述
db.runCommand({collMod: "对应的controllerName", changeStreamPreAndPostImages: {enabled: true} 
})
例如:
use db_cdc_1
db.runCommand({collMod: "c_cdc_2", changeStreamPreAndPostImages: {enabled: true} 
}){"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams_with_pre_image"}
}

在这里插入图片描述

重点参数

参数描述
connector.class固定值io.debezium.connector.mongodb.MongoDbConnector
mongodb.connection.stringmongodb连接信息
collection.include.list需要监听的具体collection
topic.prefixkafkaTopic前缀
capture.mode输出模式(默认:change_streams_update_full)

capture.mode

模式描述
change_streams输出变化流,但是在进行update操作时,不输出after字段
change_streams_update_full在change_streams的基础上,增加after字段,用于输出现在变化后的数据的内容
change_streams_with_pre_image在change_streams的基础上,增加before字段的输出,但需要进行配置
change_streams_update_full_with_pre_image在change_streams_with_pre_image的基础上增加,增加after字段,用于输出现在变化后的数据的内容

其他未使用参数

参数描述
database.include.list需要监听的具体database
database.exclude.list不监听的database(不要与database.include.list填写相同的db)
collection.exclude.list不监听的collection(不要与collection.include.list填写相同的collection)
snapshot.mode指定在连接器启动时执行快照的条件。Initial(默认:重头开始)当连接器启动时,如果没有在其偏移主题中检测值,它会执行数据库的快照。never(从当前位置开始)当连接器启动时,它会跳过快照进程,并立即开始将数据库记录的操作流传输到 oplog。

更多参数请参考
在这里插入图片描述

cdc结果

原数据

{"userId": "1000000","allPoints": 190,"createTime": {"$date": "2024-04-25T13:31:59.678Z"},"updateTime": {"$date": "2024-04-25T13:31:59.678Z"}
}

添加数据

在这里插入图片描述
在这里插入图片描述
capture.mode两种模式输出结果一样

push数据

{$push: {"history":{"historyId": "1","changerPoints": 0,"beforePoints": 0,"afterPoints": 0,"status": "0","createTime": {"$date": "2024-01-01T16:00:00.000Z"},"comment": "测试数据","versionNo": 0}},
}

在这里插入图片描述

第一次添加

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history\": [{\"historyId\": \"1\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}]}","truncatedArrays": null}}

在这里插入图片描述

第二次添加

在这里插入图片描述

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history.1\": {\"historyId\": \"2\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}}","truncatedArrays": null}}

在这里插入图片描述

如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据的完整数据,例如
在这里插入图片描述

修改数组中的值

在这里插入图片描述

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history.1.historyId\": \"100\"}","truncatedArrays": null}}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据,例如
在这里插入图片描述

pull操作

在这里插入图片描述

{$pull: {history: {historyId: "100",},},
}

在这里插入图片描述
此时会把现有的所有数据都返回

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history\": [{\"historyId\": \"1\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"2\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"3\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"4\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}]}","truncatedArrays": null}}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据,例如在这里插入图片描述

删除字段

在这里插入图片描述

{  "payload": {"before": null,"after": null,"updateDescription": {"removedFields": ["updateTime"],"updatedFields": "{}","truncatedArrays": null}}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据(此处删除的是另外一个字段),例如
在这里插入图片描述

删除数据

在这里插入图片描述
在这里插入图片描述
如果capture.mode未设置成change_streams_with_pre_imagechange_streams_update_full_with_pre_image的话,在进行删除时cdc输出会没有before信息
在这里插入图片描述
通过开启mongodb版本 6.0 中的新增功能changeStreamPreAndPostImages,并且在capture.mode上使用change_streams_with_pre_image或change_streams_update_full_with_pre_image即可解决

db.runCommand({collMod: "对应的controllerName", changeStreamPreAndPostImages: {enabled: true} 
})
例如:
use db_cdc_1
db.runCommand({collMod: "c_cdc_2", changeStreamPreAndPostImages: {enabled: true} 
}){"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams_with_pre_image"}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/317502.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

十一、大模型-Semantic Kernel与 LangChain 的对比

Semantic Kernel 与 LangChain 的对比 Semantic Kernel 和 LangChain 都是用于开发基于大型语言模型(LLM)的应用程序的框架,但它们各有特点和优势。 基本概念和目标 Semantic Kernel 是一个由微软开发的轻量级 SDK,旨在帮助开发…

uniapp 自定义 App启动图

由于uniapp默认的启动界面太过普通 所以需要自定义个启动图 普通的图片不可以过不了苹果的审核 所以使用storyboard启动图 生成 storyboard 的网站:初雪云-提供一站式App上传发布解决方案

短视频素材哪个App最好?短视频素材哪里有免费的?

在数字媒体的黄金时代,富有创意的视频内容已成为吸引观众的关键。高质量的视频素材不仅能增强视觉效果,还能提升整体叙述的力度。以下列出了一系列全球顶尖的视频素材提供网站,它们将为你的广告制作、社交媒体或任何视频项目提供极具影响力的…

C++浮点数format时的舍入问题

C浮点数format时的舍入问题 首先有这样一段代码&#xff1a; #include <iostream> #include <stdio.h> using namespace std;int main() {cout << " main begin : " << endl;printf("%.0f \r\n", 1.5);printf("%.0f \r\n&…

jenkins教程

jenkins 一、简介二、下载安装三、配置jdk、maven和SSH四、部署微服务 一、简介 Jenkins是一个流行的开源自动化服务器&#xff0c;用于自动化软件开发过程中的构建、测试和部署任务。它提供了一个可扩展的插件生态系统&#xff0c;支持各种编程语言和工具。 Jenkins是一款开…

ROM修改进阶教程------如何去除安卓机型系统的开机向导 几种操作步骤解析

在和很多工作室定制化系统中。手机在第一次启动的时候系统都会进入设置向导,虽然可以设置手机的基本配置。但有很多客户需要去除手机的开机向导来缩短开机时间。确保手机直接进入工作状态。那么今天的教程针去除对开机向导的几种方法做个解析。机型很多版本不同。操作也有不同…

ENVI下遥感积雪面积信息的提取

积雪是气温、降水变化的最敏感的指示因子之一&#xff0c;ENVI为积雪面积信息的提取提供了多种技术方法 光谱统计学方法 光谱统计学提取积雪面积信息主要利用感兴趣区域ROI&#xff08;样本&#xff09;的选择&#xff0c;利用传统的监督方法实现。 决策树方法 决策树方法提取…

《HCIP-openEuler实验指导手册》1.7 Apache虚拟主机配置

知识点 配置步骤 需求 域名访问目录test1.com/home/source/test1test2.com/home/source/test2test3.com/home/source/test3 创建配置文件 touch /etc/httpd/conf.d/vhost.conf vim /etc/httpd/conf.d/vhost.conf文件内容如下 <VirtualHost *.81> ServerName test1.c…

翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深度学习二

合集 ChatGPT 通过图形化的方式来理解 Transformer 架构 翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深度学习一翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深度学习二翻译: 什么是ChatGPT 通过图形化的方式来理解 Transformer 架构 深…

阿里云开源大模型开发环境搭建

ModelScope是阿里云通义千问开源的大模型开发者社区&#xff0c;本文主要描述AI大模型开发环境的搭建。 如上所示&#xff0c;安装ModelScope大模型基础库开发框架的命令行参数&#xff0c;使用清华大学提供的镜像地址 如上所示&#xff0c;在JetBrains PyCharm的项目工程终端控…

2024深圳杯数学建模竞赛D题(东三省数学建模竞赛D题):建立非均质音板振动模型与参数识别模型

更新完整代码和成品完整论文 《2024深圳杯&东三省数学建模思路代码成品论文》↓↓↓&#xff08;浏览器打开&#xff09; https://www.yuque.com/u42168770/qv6z0d/zx70edxvbv7rheu7?singleDoc# 2024深圳杯数学建模竞赛D题&#xff08;东三省数学建模竞赛D题&#xff0…

深入探索计算机视觉:高级主题与前沿应用的全面解析

引言 计算机视觉&#xff0c;作为人工智能领域的一个重要分支&#xff0c;旨在让计算机能够“看”懂世界&#xff0c;理解和解释视觉场景。随着深度学习技术的迅猛发展&#xff0c;计算机视觉已经在许多领域取得了显著的进展&#xff0c;如自动驾驶、安防监控、医疗诊断等。在…

Go 语言基础(一)【基本用法】

前言 最近心情格外不舒畅&#xff0c;不仅仅是对前途的迷茫&#xff0c;这种迷茫倒是我自己的问题还好&#xff0c;关键它是我们这种普通吗喽抗衡不了的。 那就换个脑子&#xff0c;学点新东西吧&#xff0c;比如 Go&#xff1f; 1、Go 语言入门 介绍就没必要多说了&#xff0…

Linux(ubuntu)—— 用户管理user 用户组group

一、用户 1.1、查看所有用户 cat /etc/passwd 1.2、新增用户 useradd 命令&#xff0c;我这里用的是2.4的命令。 然后&#xff0c;需要设置密码 passwd student 只有root用户才能用passwd命令设置其他用户的密码&#xff0c;普通用户只能够设置自己的密码 二、组 2.1查看…

CentOS/Anolis的Linux系统如何通过VNC登录远程桌面?

综述 需要在server端启动vncserver&#xff0c;推荐tigervnc的server 然后再本地点来启动client进行访问&#xff0c;访问方式是IPport&#xff08;本质是传递数据包到某个ip的某个port&#xff09; 然后需要防火墙开启端口 服务器上&#xff1a;安装和启动服务 安装服务 y…

Macos安装OrbStack

什么是OrbStack OrbStack 是一种在 macOS 上运行容器和 Linux 机器的快速、轻便和简单方法。它是 Docker Desktop 和 WSL 的超强替代品&#xff0c;所有这些都在一个易于使用的应用程序中。 在Macos M系列芯片上&#xff0c;经常遇到docker镜像不兼容的问题&#xff0c;此时使…

LangChain入门2 RAG详解

RAG概述 一个典型的RAG应用程序,它有两个主要组件&#xff1a; 索引&#xff1a;从源中获取数据并对其进行索引的管道。这通常在脱机情况下发生。检索和生成&#xff1a;在运行时接受用户查询&#xff0c;并从索引中检索相关数据&#xff0c;然后将其传递给模型。 从原始数据…

【PHP】安装指定版本Composer

1、下载指定版本composer.phar文件&#xff1a;https://github.com/composer/composer/releases 2、将下载的文件添加到全局路径&#xff1a; sudo mv composer.phar /usr/local/bin/composer 3、赋予权限&#xff1a; sudo chmod x /usr/local/bin/composer 4、查看compos…

【GitHub】github学生认证,在vscode中使用copilot的教程

github学生认证并使用copilot教程 写在最前面一.注册github账号1.1、注册1.2、完善你的profile 二、Github 学生认证注意事项&#xff1a;不完善的说明 三、Copilot四、在 Visual Studio Code 中安装 GitHub Copilot 扩展4.1 安装 Copilot 插件4.2 配置 Copilot 插件&#xff0…

如何使用ChatGPT进行高效的中文到科学英文翻译?

如何使用ChatGPT进行高效的中文到科学英文翻译 在全球化加速的今天&#xff0c;科学交流往往需要跨越语言障碍。特别是在科研领域&#xff0c;有效地将中文研究成果转化为精准的科学英语描述&#xff0c;对于学术发表和国际合作尤为关键。AI翻译工具如ChatGPT可以在这一过程中…