JAVA通过debezium实时采集mysql数据

前期准备

需要提前安装mysql并且开启binlog,需要准备kafka和zookeeper环境

示例采用debezium1.9.0版本

Maven配置

<version.debezium>1.9.0.Final</version.debezium>

<dependency>

    <groupId>io.debezium</groupId>

    <artifactId>debezium-api</artifactId>

    <version>${version.debezium}</version>

</dependency>

<dependency>

    <groupId>io.debezium</groupId>

    <artifactId>debezium-embedded</artifactId>

    <version>${version.debezium}</version>

</dependency>

<dependency>

    <groupId>io.debezium</groupId>

    <artifactId>debezium-connector-mysql</artifactId>

    <version>${version.debezium}</version>

</dependency>

Java代码

import io.debezium.engine.ChangeEvent;

import io.debezium.engine.DebeziumEngine;

import io.debezium.engine.format.Json;

import java.io.IOException;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

public class DebeziumTest {

    private static DebeziumEngine<ChangeEvent<String, String>> engine;

    public static void main(String[] args) throws Exception {

        final Properties props = new Properties();

        props.setProperty("name", "dbz-engine");

        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        //offset config begin - 使用文件来存储已处理的binlog偏移量

        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");

//        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");

        props.setProperty("offset.storage.file.filename", "D:/tmp/dbz/storage/mysql_offsets.dat");

        props.setProperty("offset.flush.interval.ms", "0");

        //offset config end

        props.setProperty("database.server.name", "mysql-connector");

        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");

        props.setProperty("database.history.file.filename", "D:/tmp/dbz/storage/mysql_dbhistory.txt");

        props.setProperty("database.server.id", "122110"); //随机设置

        props.setProperty("database.hostname", "localhost");

        props.setProperty("database.port", "3306");

        props.setProperty("database.user", "root");

        props.setProperty("database.password", "123456");

        props.setProperty("database.include.list", "test");//要捕获的数据库名

        props.setProperty("topic.prefix", "mysql-");

//        props.setProperty("table.include.list", "inventory.a");//要捕获的数据表

        props.setProperty("snapshot.mode", "initial");//全量+增量

        props.setProperty("bootstrap.servers", "localhost:9092");

        props.setProperty("topic", "test");

//        props.setProperty("offset.storage.topic", "log-t");

//        props.setProperty("offset.storage.partitions", "1");

//        props.setProperty("offset.storage.replication.factor", "1");

//        KafkaProducerTest test = new KafkaProducerTest("test1841");

        // 使用上述配置创建Debezium引擎,输出样式为Json字符串格式

        engine = DebeziumEngine.create(Json.class)

                .using(props)

                .notifying(record -> {

                    //test.sendMsg(record.value());

                    System.out.println(record.value());//输出到控制台

                })

                .using((success, message, error) -> {

                    if (error != null) {

                        error.printStackTrace();

                        // 报错回调

                        System.out.println("------------error, message:" + message);

                        System.out.println( "exception:" + error);

                    }

                    closeEngine(engine);

                })

                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.execute(engine);

        addShutdownHook(engine);

        awaitTermination(executor);

        System.out.println("------------main finished.");

    }

    private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {

        try {

            engine.close();

        } catch (IOException ignored) {

        }

    }

    private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {

        Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));

    }

    private static void awaitTermination(ExecutorService executor) {

        if (executor != null) {

            try {

                executor.shutdown();

                while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {

                }

            } catch (InterruptedException e) {

                Thread.currentThread().interrupt();

            }

        }

    }

}

运行效果

随意修改一条mysql表中的数据

修改后

代码日志

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"},{"type":"string","optional":true,"field":"phone"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"create_time"}],"optional":true,"name":"mysql_connector.di_test.t_user.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"},{"type":"string","optional":true,"field":"phone"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"create_time"}],"optional":true,"name":"mysql_connector.di_test.t_user.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.di_test.t_user.Envelope"},"payload":{"before":{"id":2,"name":"lisi2","age":19,"phone":"18444444","address":"北京海淀1","create_time":"2023-12-28T06:47:07Z"},"after":{"id":2,"name":"lisi2","age":19,"phone":"18444444","address":"北京海淀","create_time":"2023-12-28T06:47:07Z"},"source":{"version":"1.9.0.Final","connector":"mysql","name":"mysql-connector","ts_ms":1722423711000,"snapshot":"false","db":"di_test","sequence":null,"table":"t_user","server_id":1,"gtid":null,"file":"binlog.000008","pos":3134,"row":0,"thread":17,"query":null},"op":"u","ts_ms":1722423711663,"transaction":null}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/391984.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java获取exe文件详细信息:产品名称,产品版本等

使用Maven项目&#xff0c;在pom.xml文件中注入&#xff1a; <dependency><groupId>com.kichik.pecoff4j</groupId><artifactId>pecoff4j</artifactId><version>0.4.1</version></dependency> 程序代码&#xff1a; import …

Sun Frame:基于 SpringBoot 的轻量级开发框架(个人开源项目)

文章目录 &#x1f31e; Sun Frame&#xff1a;基于 SpringBoot 的轻量级开发框架&#xff08;个人开源项目&#xff09;&#x1f680; 欢迎使用 Sun Frame&#x1f31f; 项目亮点&#x1f4e6; 模块结构&#x1f310; Sun-Cloud&#x1f4e6; Sun-Common &#x1f4a1; 示例与…

微力同步如何安装使用并使用内网穿透配置公网地址远程访问

文章目录 1.前言2. 微力同步网站搭建2.1 微力同步下载和安装2.2 微力同步网页测试2.3 内网穿透工具安装 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语 1.前言 私有云盘作为云存储概念的延伸&#xff0c;虽然谈不上多么新颖&#xff0c;但是其广…

主成分分析和线性判别分析

主成分分析 (PCA) PCA 是一种线性降维方法&#xff0c;通过投影到主成分空间&#xff0c;尽可能保留数据的方差。 原理 PCA 通过寻找数据投影后方差最大的方向&#xff0c;主成分是这些方向上的正交向量。 公式推理 对数据中心化&#xff1a; 其中&#xff0c;μ 是数据的…

“微软蓝屏”事件敲响网络安全的警钟

文章目录 前言一、对网络安全的警醒二、我们如何应对&#xff1f;总结 前言 “微软蓝屏”事件是一次由微软合作伙伴CrowdStrike的终端安全产品更新与操作系统内核冲突导致的全球性技术故障。这一事件不仅影响了多个国家的航空、银行、金融、零售、餐饮等多个行业&#xff0c;还…

吴恩达老师机器学习-ex8

data1 导入库&#xff0c;读取数据并进行可视化 因为这次的数据是mat文件&#xff0c;需要使用scipy库中的loadmat进行读取数据。 通过对数据类型的分析&#xff0c;发现是字典类型&#xff0c;查看该字典的键&#xff0c;可以发现又X等关键字。 import numpy as np import…

十七、Intellij IDEA2022.1.1下载、安装、激活

目录 &#x1f33b;&#x1f33b; 一、下载二、 安装三、激活 一、下载 官网下载地址 本地直接下载 目前Intellij IDEA的最新版本已经更新到了 2024.1.4&#xff0c;由于最新版本可能存在不稳定的问题&#xff0c;此处选择其他版本进行下载&#xff0c;此处以2022.1.1为例进行下…

【Spring】Bean详细解析

1.Spring Bean的生命周期 整体上可以简单分为四步&#xff1a;实例化 —> 属性赋值 —> 初始化 —> 销毁。初始化这一步涉及到的步骤比较多&#xff0c;包含 Aware 接口的依赖注入、BeanPostProcessor 在初始化前后的处理以及 InitializingBean 和 init-method 的初始…

基于STM32的智能家居安防系统教程

目录 引言环境准备智能家居安防系统基础代码实现&#xff1a;实现智能家居安防系统 门窗传感器模块视频监控模块报警与通知模块用户界面与远程控制应用场景&#xff1a;家庭安防与监控常见问题与解决方案收尾与总结 引言 随着智能家居的普及&#xff0c;家庭安防系统成为保护…

艾瑞白皮书解读(一)丨为什么说数据工程是中国企业数据治理的最佳实践?

2024年7月 艾瑞咨询公司对国内数据治理行业进行了研究&#xff0c;访问了国内多位大中型企业数据治理相关负责人&#xff0c;深度剖析中国企业在数字化转型过程中面临到的核心数据问题后&#xff0c;重磅发布《2024中国企业数据治理白皮书》&#xff08;以下简称“白皮书”&…

算法通关:017_1:二叉树及三种顺序的递归遍历

文章目录 题目思路代码运行结果 题目 二叉树及三种顺序的递归遍历 思路 代码 /*** Author: ggdpzhk* CreateTime: 2024-08-04** 二叉树及三种顺序的递归遍历* LeetCode 144. 二叉树的前序遍历* LeetCode 94. 二叉树的中序遍历* LeetCode 145. 二叉树的后序遍历* LeetCode 10…

龙迅LT8713SX 高性能TYPE-C/DP转三端口DP1.4/HDMI 2.0转换器,带音频

龙迅LT8713SX描述&#xff1a; LT8713SX是一个高性能类型-C/DP1.4到Type-C/DP1.4/HDMI2.0转换器&#xff0c;具有三个可配置的DP1.4/HDMI2.0/DP输出接口和音频输出接口。LT8713SX同时支持显示端口™单流传输&#xff08;SST&#xff09;模式和多流传输&#xff08;MST&#xf…

Adobe Acrobat不支持图片格式转换PDF文件

我在将图片格式&#xff08;PNG,JPEG&#xff09;转换为PDF的过程中遇到了如下问题&#xff1a; 单文件的解决办法——在软件外实现转换&#xff1a; 使用照片打开图片 选择打印 打印机选择Adobe PDF&#xff0c;执行打印 选择PDF文件的保存位置&#xff0c;过一会儿可以正…

基本K8s搭建Jekins+gitee项目自动部署

这里写目录标题 1.基本K8s部署安装Jekins2.设置Jenkins国内镜像源2.安装Gitee插件1.安装Gitee Plugin2.验证安装Gitee Plugin 3.新建任务1.输入任务名称2.输入你gitee上的项目链接3.测试构建 4.查看项目在k8s集群master节点的位置1.确认 Jenkins Pod 名称2.使用kubectl exec到 …

视频如何生成二维码(自动生成二维码)完整教程

在企业中&#xff0c;产品视频二维码怎么制作&#xff0c;产品二维码怎么实现微信扫码便捷观看&#xff1f;上图文教程&#xff1a;视频二维码生成器/上传视频自动生成二维码完整教程。 目前市面上有很多工具&#xff0c;可以实现&#xff0c;比如草料二维码、酷播云二维码等等…

【Web开发手礼】探索Web开发的秘密(十四)-Vue2(1)Node.js的安装、Vue入门

主要介绍了Node.js的安装教程、Vue2常用的一些指令、声明周期&#xff01;&#xff01;&#xff01; 文章目录 前言 Node.js安装 选择安装目录 验证NodeJS环境变量 配置npm的全局安装路径 切换npm的淘宝镜像 安装Vue-cli ​编辑 Vue2入门 引入vue.js文件 入门代码 常用指令 生…

前端(vue3)和后端(django)的交互

vue3中&#xff1a; <template><div><h2>注册页面</h2><form submit.prevent"submitForm"><label for"username">用户名&#xff1a;</label><input type"text" id"username" v-model…

AWS S3怎么收费的?一文带你搞懂!

Amazon Simple Storage Service&#xff08;S3&#xff09;是亚马逊网络服务&#xff08;AWS&#xff09;提供的一个高度可扩展的对象存储服务&#xff0c;广泛应用于数据存储、备份、归档和大数据分析等领域。S3的计费模式相对灵活&#xff0c;旨在满足不同用户的需求。本文中…

小试牛刀-walletconnect二维码及交互

目录 1.编写目的 2.实现功能 3.功能详解 依赖组件 3.1 二维码生成 3.1.1 初始化SignClient 3.1.2 创建会话空间获取WC协议uri 3.1.3 生成二维码供用户扫描 3.1.4 等待扫描 3.2 发送交易事务 3.2.1 创建交易事务 3.2.2 向用户发送交易事务 3.3 签名事务 3.3.1 接收…

安装VMware16

安装VMware16的步骤主要包括下载、‌安装和激活。‌ 下载VMware16 首先&#xff0c;‌需要从VMware官网下载VMware Workstation Pro 16的安装包。‌下载链接为https://customerconnect.vmware.com/cn/downloads/info/slug/desktop_end_user_computing/vmware_workstation_pro…