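Prerequisites
JDK 11 must be installed on the server
JDK download page
Kafka installation
Official download page
Installation guide
Debezium installation
Running the Debezium connectors requires Java 11 or later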
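Debezium is not so much a single standalone program as a collective name for a family of Kafka connectors, each targeting a different database such as MySQL or Oracle. Following the usual Kafka connector naming convention, one of them might be called the MySQL Kafka Source Connector.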
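Deployment
1. Download the Debezium plugin for the matching version
Plugin download page
2. Extract the archive
Extract the downloaded archive and move the extracted directory into Kafka's plugin directory (a directory you create yourself; this guide uses /usr/local/kafka/plugin).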
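3. Deploy through Kafka Connect
Kafka Connect can be deployed in two ways: standalone or distributed. Standalone mode is configured in kafka/config/connect-standalone.properties, distributed mode in kafka/config/connect-distributed.properties. Distributed mode is managed through the REST API; a standalone worker also exposes the REST API (port 8083 by default), which step 5 relies on.
This guide uses standalone mode, so the file to edit is kafka/config/connect-standalone.properties. The main change is: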
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/local/kafka/plugin
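Before starting the worker it can help to confirm which directories the active plugin.path entry actually names, since a commented-out line is easy to mistake for the live one. The following is a minimal sketch, not part of Kafka itself; the helper name read_plugin_paths is hypothetical and the parsing only covers simple key=value lines.

```python
def read_plugin_paths(properties_text: str) -> list:
    """Return the directories listed in the active plugin.path entry."""
    paths = []
    for raw_line in properties_text.splitlines():
        line = raw_line.strip()
        # Skip blank lines and comments ('#' or '!' start a comment in .properties files).
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep and key.strip() == "plugin.path":
            # A later assignment replaces an earlier one; empty items are dropped.
            paths = [p.strip() for p in value.split(",") if p.strip()]
    return paths


sample = """\
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/local/kafka/plugin
"""
print(read_plugin_paths(sample))
```

With the two lines from this guide, only /usr/local/kafka/plugin is reported, which is the directory the extracted Debezium plugin has to live in.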
4. Start Kafka Connect
Kafka must be running first
bin/connect-standalone.sh config/connect-standalone.properties
5. Create the Debezium connector configuration
curl -X POST -H "Content-Type: application/json" http://${DEBEZIUM_HOST}:8083/connectors -d '
{
  "name": "cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin",
    "collection.include.list": "db_cdc_1.c_cdc_2",
    "topic.prefix": "mycdc",
    "capture.mode": "change_streams"
  }
}'
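The same registration call can be prepared from a script, which makes it easier to keep the connector settings under version control. This is a minimal sketch using only the Python standard library; the worker URL http://localhost:8083, the placeholder credentials and host names, and the function names are assumptions for illustration. The request is only built here, and nothing is sent unless register is called against a running worker.

```python
import json
import urllib.request


def build_register_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) the POST /connectors request."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def register(request: urllib.request.Request) -> str:
    """Send the request; needs a reachable Kafka Connect worker."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.read().decode("utf-8")


config = {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    # Placeholder credentials and hosts: replace with your own replica set.
    "mongodb.connection.string": "mongodb://user:password@mongo1:27017,mongo2:27017/?authSource=admin",
    "collection.include.list": "db_cdc_1.c_cdc_2",
    "topic.prefix": "mycdc",
    "capture.mode": "change_streams",
}
request = build_register_request("http://localhost:8083", "cdc-connector", config)
print(request.get_method(), request.full_url)
```

Calling register(request) performs the actual POST; building and sending are kept separate so the payload can be inspected or logged first.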
- If the CDC output needs to include the before state, enable changeStreamPreAndPostImages (a feature added in MongoDB 6.0) on the collection and set capture.mode to change_streams_with_pre_image or change_streams_update_full_with_pre_image.
- If capture.mode is not set to one of those two values, delete events carry no before data.
db.runCommand({collMod: "<collection name>", changeStreamPreAndPostImages: {enabled: true}
})
For example:
use db_cdc_1
db.runCommand({collMod: "c_cdc_2", changeStreamPreAndPostImages: {enabled: true}
})
Then register the connector with a pre-image capture mode:
{"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams_with_pre_image"}
}
Key parameters
Parameter | Description |
---|---|
connector.class | Fixed value: io.debezium.connector.mongodb.MongoDbConnector |
mongodb.connection.string | MongoDB connection string |
collection.include.list | Collections to capture, given as database.collection names |
topic.prefix | Prefix for the Kafka topic names |
capture.mode | Capture mode (default: change_streams_update_full) |
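capture.mode
Mode | Description |
---|---|
change_streams | Emits the change stream; update events do not include the after field |
change_streams_update_full | Same as change_streams, plus an after field on update events holding the full document as it is after the change |
change_streams_with_pre_image | Same as change_streams, plus the before field; requires changeStreamPreAndPostImages to be enabled on the collection |
change_streams_update_full_with_pre_image | Same as change_streams_with_pre_image, plus an after field holding the full document as it is after the change |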
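The four modes differ along two axes: whether update events carry a full after document, and whether events can carry a before document. The lookup below is a small sketch that restates the table as data so a consumer can check what to expect; the names UpdateFields, CAPTURE_MODES, and needs_pre_images are hypothetical and not part of Debezium.

```python
from typing import NamedTuple


class UpdateFields(NamedTuple):
    full_after: bool  # update events include the full document in "after"
    before: bool      # events can include the previous document in "before"


CAPTURE_MODES = {
    "change_streams": UpdateFields(full_after=False, before=False),
    "change_streams_update_full": UpdateFields(full_after=True, before=False),
    "change_streams_with_pre_image": UpdateFields(full_after=False, before=True),
    "change_streams_update_full_with_pre_image": UpdateFields(full_after=True, before=True),
}


def update_fields(mode: str) -> UpdateFields:
    """Look up what a capture.mode value provides, rejecting unknown values."""
    try:
        return CAPTURE_MODES[mode]
    except KeyError:
        raise ValueError(f"unknown capture.mode: {mode!r}") from None


def needs_pre_images(mode: str) -> bool:
    """True when the collection must have changeStreamPreAndPostImages enabled."""
    return update_fields(mode).before


for mode, fields in CAPTURE_MODES.items():
    print(f"{mode}: full_after={fields.full_after}, before={fields.before}")
```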
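Other parameters not used here
Parameter | Description |
---|---|
database.include.list | Databases to capture |
database.exclude.list | Databases to skip (do not set together with database.include.list) |
collection.exclude.list | Collections to skip (do not set together with collection.include.list) |
snapshot.mode | Controls when the connector takes a snapshot at startup. initial (default): if the connector finds no stored offsets, it snapshots the database first. never: the connector skips the snapshot and immediately starts streaming change events from the current position. |
For more parameters, see the reference documentation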
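The include and exclude lists are easy to set at the same time by accident. The check below is a minimal sketch under the assumption that an include list and an exclude list for the same scope should never both be set; the function name filter_conflicts is hypothetical, and the check is purely local and does not contact Kafka Connect.

```python
def filter_conflicts(config: dict) -> list:
    """Return a message for every scope that sets both an include and an exclude list."""
    conflicts = []
    for scope in ("database", "collection"):
        include_key = f"{scope}.include.list"
        exclude_key = f"{scope}.exclude.list"
        # Treat missing or blank values as "not set".
        if config.get(include_key, "").strip() and config.get(exclude_key, "").strip():
            conflicts.append(f"{include_key} and {exclude_key} are both set")
    return conflicts


good = {"collection.include.list": "db_cdc_1.c_cdc_2"}
bad = {
    "database.include.list": "db_cdc_1",
    "database.exclude.list": "db_cdc_1",
}
print(filter_conflicts(good))
print(filter_conflicts(bad))
```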
CDC output
Original document
{"userId": "1000000","allPoints": 190,"createTime": {"$date": "2024-04-25T13:31:59.678Z"},"updateTime": {"$date": "2024-04-25T13:31:59.678Z"}
}
Adding data
Both capture.mode settings produce the same output here
Pushing data with $push
{$push: {"history":{"historyId": "1","changerPoints": 0,"beforePoints": 0,"afterPoints": 0,"status": "0","createTime": {"$date": "2024-01-01T16:00:00.000Z"},"comment": "测试数据","versionNo": 0}},
}
First push
{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history\": [{\"historyId\": \"1\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}]}","truncatedArrays": null}}
}
Second push
{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history.1\": {\"historyId\": \"2\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}}","truncatedArrays": null}}
}
With capture.mode set to change_streams_update_full, the after field additionally carries the complete document as it is after the change.
Modifying a value inside the array
{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history.1.historyId\": \"100\"}","truncatedArrays": null}}
}
With capture.mode set to change_streams_update_full, the after field additionally carries the complete current document.
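Note that updatedFields in the events above is a JSON document encoded as a string, so it has to be parsed a second time after the event itself. The following is a minimal sketch of a consumer-side decoder; the function name decode_update is hypothetical, the sample event is the array-modification event from this section, and treating a non-null after value as a JSON string is an assumption.

```python
import json

# The array-modification event from this section, as one JSON text.
raw_event = r'''{"payload": {"before": null, "after": null, "updateDescription": {"removedFields": null, "updatedFields": "{\"history.1.historyId\": \"100\"}", "truncatedArrays": null}}}'''


def decode_update(event_json: str) -> dict:
    """Parse an update event and decode the string-encoded fields inside it."""
    payload = json.loads(event_json)["payload"]
    description = payload.get("updateDescription") or {}
    updated = description.get("updatedFields")
    after = payload.get("after")
    return {
        # updatedFields is a JSON string, so it needs its own json.loads call.
        "updated": json.loads(updated) if updated else {},
        "removed": description.get("removedFields") or [],
        # Assumption: when present, "after" is also a JSON string.
        "after": json.loads(after) if isinstance(after, str) else after,
    }


print(decode_update(raw_event))
```

The decoded keys are dotted paths such as history.1.historyId, which identify the changed element by array position rather than by historyId.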
$pull operation
{$pull: {history: {historyId: "100",},},
}
In this case the whole remaining array is returned in updatedFields:
{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history\": [{\"historyId\": \"1\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"2\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"3\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"4\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}]}","truncatedArrays": null}}
}
With capture.mode set to change_streams_update_full, the after field additionally carries the complete current document.
Removing a field
{ "payload": {"before": null,"after": null,"updateDescription": {"removedFields": ["updateTime"],"updatedFields": "{}","truncatedArrays": null}}
}
With capture.mode set to change_streams_update_full, the after field additionally carries the complete current document, that is, the document without the removed field.
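In plain change_streams mode a consumer that wants the current document has to replay updatedFields and removedFields itself. The sketch below shows one way to do that on an in-memory copy. It is an illustration rather than Debezium's own logic: the function names are hypothetical, every parent in a dotted path is assumed to exist already, and numeric path segments are treated as array positions only when the container is a list.

```python
import copy


def _resolve(root, keys):
    """Walk down a dotted path; lists are indexed by position, dicts by name."""
    target = root
    for key in keys:
        target = target[int(key)] if isinstance(target, list) else target[key]
    return target


def apply_update(document: dict, updated: dict, removed=None) -> dict:
    """Return a copy of document with the update description applied."""
    result = copy.deepcopy(document)
    for path, value in updated.items():
        *parents, leaf = path.split(".")
        target = _resolve(result, parents)
        if isinstance(target, list):
            index = int(leaf)
            if index == len(target):
                target.append(value)  # e.g. "history.1" on a one-element array
            else:
                target[index] = value
        else:
            target[leaf] = value
    for path in removed or []:
        *parents, leaf = path.split(".")
        target = _resolve(result, parents)
        if isinstance(target, dict):
            target.pop(leaf, None)
    return result


doc = {
    "userId": "1000000",
    "allPoints": 190,
    "updateTime": "2024-04-25T13:31:59.678Z",
    "history": [{"historyId": "1"}],
}
doc = apply_update(doc, {"history.1": {"historyId": "2"}})   # second push
doc = apply_update(doc, {"history.1.historyId": "100"})      # modify array value
doc = apply_update(doc, {}, ["updateTime"])                  # remove a field
print(doc)
```

Replaying events this way only works if no event is missed, which is one reason to prefer change_streams_update_full when consumers need the whole document.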
Deleting documents
If capture.mode is not set to change_streams_with_pre_image or change_streams_update_full_with_pre_image, delete events carry no before data.
To fix this, enable changeStreamPreAndPostImages (a feature added in MongoDB 6.0) on the collection and set capture.mode to change_streams_with_pre_image or change_streams_update_full_with_pre_image:
db.runCommand({collMod: "<collection name>", changeStreamPreAndPostImages: {enabled: true}
})
For example:
use db_cdc_1
db.runCommand({collMod: "c_cdc_2", changeStreamPreAndPostImages: {enabled: true}
})
Then register the connector with a pre-image capture mode:
{"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams_with_pre_image"}
}