Debezium-MySqlConnectorTask

文章目录

    • 概要
    • 整体架构流程
    • 技术名词解释
    • 技术细节
    • 小结

概要

MySqlConnectorTask,用于读取MySQL的二进制日志并生成对应的数据变更事件

整体架构流程

技术名词解释

数据库模式(Database Schema)
数据库模式是指数据库中数据的组织结构和定义,它描述了数据库中所有对象(如表、视图、索引、存储过程等)的结构和关系。具体来说,数据库模式包括以下几个方面:
1  表结构:定义了数据库中各个表的名称、列的名称、数据类型、约束条件(如主键、外键、唯一性约束等)。
2  关系:描述了表与表之间的关系,如一对多、多对多等。
3  索引:定义了表上的索引,用于提高查询性能。
4  视图:定义了虚拟表,这些虚拟表基于SQL查询结果,可以简化复杂的查询操作。
5  存储过程和函数:定义了数据库中的存储过程和函数,用于执行特定的业务逻辑。
6  触发器:定义了在特定事件发生时自动执行的操作。

在 DatabaseHistory 接口中的应用
在 DatabaseHistory 接口中,数据库模式的变更记录和恢复功能主要用于以下场景:
    1  记录变更:当数据库模式发生变化时(如添加新表、修改表结构、删除表等),通过 record 方法记录这些变更。
    2  恢复:当需要恢复到某个历史点的数据库模式时,通过 recover 方法恢复到指定的历史状态。
通过这些功能,可以有效地管理和追踪数据库模式的变化,确保数据的一致性和完整性。

技术细节

@Overridepublic void start(Map<String, String> props) {if (context == null) {throw new ConnectException("Unexpected null context");}// Validate the configuration ...final Configuration config = Configuration.from(props);if (!config.validate(MySqlConnectorConfig.ALL_FIELDS, logger::error)) {throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");}// Create and configure the database history ...this.dbHistory = config.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);if (this.dbHistory == null) {throw new ConnectException("Unable to instantiate the database history class " +config.getString(MySqlConnectorConfig.DATABASE_HISTORY));}Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false); // do not remove// prefixthis.dbHistory.configure(dbHistoryConfig); // validatesthis.dbHistory.start();this.running.set(true);// Read the configuration ...final String user = config.getString(MySqlConnectorConfig.USER);final String password = config.getString(MySqlConnectorConfig.PASSWORD);final String host = config.getString(MySqlConnectorConfig.HOSTNAME);final int port = config.getInteger(MySqlConnectorConfig.PORT);final String initialBinLogFilename = config.getString(MySqlConnectorConfig.INITIAL_BINLOG_FILENAME);final long serverId = config.getLong(MySqlConnectorConfig.SERVER_ID);serverName = config.getString(MySqlConnectorConfig.SERVER_NAME.name(), host + ":" + port);final boolean keepAlive = config.getBoolean(MySqlConnectorConfig.KEEP_ALIVE);final int maxQueueSize = config.getInteger(MySqlConnectorConfig.MAX_QUEUE_SIZE);final long timeoutInMilliseconds = config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS);final boolean includeSchemaChanges = config.getBoolean(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);final long pollIntervalMs = config.getLong(MySqlConnectorConfig.POLL_INTERVAL_MS);maxBatchSize = config.getInteger(MySqlConnectorConfig.MAX_BATCH_SIZE);metronome = Metronome.parker(pollIntervalMs, TimeUnit.MILLISECONDS, Clock.SYSTEM);// Define the filter using the whitelists and blacklists for tables and database names ...Predicate<TableId> tableFilter = TableId.filter(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST),config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST),config.getString(MySqlConnectorConfig.TABLE_WHITELIST),config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {Predicate<TableId> isBuiltin = (id) -> {return BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase()) || BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase());};tableFilter = tableFilter.and(isBuiltin.negate());}// Create the queue ...events = new LinkedBlockingDeque<>(maxQueueSize);batchEvents = new ArrayDeque<>(maxBatchSize);// Set up our handlers for specific kinds of events ...tables = new Tables();tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables, tableFilter);eventHandlers.put(EventType.ROTATE, tableConverters::rotateLogs);eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert);eventHandlers.put(EventType.EXT_UPDATE_ROWS, tableConverters::handleUpdate);eventHandlers.put(EventType.EXT_DELETE_ROWS, tableConverters::handleDelete);// Set up the log reader ...client = new BinaryLogClient(host, port, user, password);client.setServerId(serverId);client.setKeepAlive(keepAlive);if (logger.isDebugEnabled()) client.registerEventListener(this::logEvent);client.registerEventListener(this::enqueue);client.registerLifecycleListener(traceLifecycleListener());// Set up the event deserializer with additional types ...EventDeserializer eventDeserializer = new EventDeserializer();eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());client.setEventDeserializer(eventDeserializer);// Check if we've already processed some of the log for this database ...source.setServerName(serverName);// Get the offsets for our partition ...Map<String, ?> offsets = context.offsetStorageReader().offset(source.partition());if (offsets != null) {source.setOffset(offsets);// And set the client to start from that point ...client.setBinlogFilename(source.binlogFilename());client.setBinlogPosition(source.binlogPosition());// The event row number will be used when processing the first event ...logger.info("Restarting MySQL connector '{}' from binlog file {}, position {}, and event row {}",serverName, source.binlogFilename(), source.binlogPosition(), source.eventRowNumber());// We have to make our Tables reflect the state of the database at the above source partition (e.g., the location// in the MySQL log where we last stopped reading. Since the TableConverts writes out all DDL statements to the// TopicSelector.getTopic(serverName) topic, we can consume that topic and apply each of the DDL statements// to our Tables object. Each of those DDL messages is keyed by the database name, and contains a single string// of DDL. However, we should consume no further than offset we recovered above.try {logger.info("Recovering MySQL connector '{}' database schemas from history stored in {}", serverName, dbHistory);DdlParser ddlParser = new MySqlDdlParser();dbHistory.recover(source.partition(), source.offset(), tables, ddlParser);tableConverters.loadTables();logger.debug("Recovered MySQL connector '{}' database schemas: {}", serverName, tables.subset(tableFilter));} catch (Throwable t) {throw new ConnectException("Failure while recovering database schemas", t);}} else {// initializes this position, though it will be reset when we see the first event (should be a rotate event) ...client.setBinlogFilename(initialBinLogFilename);logger.info("Starting MySQL connector from beginning of binlog file {}, position {}",source.binlogFilename(), source.binlogPosition());}// Start the log reader, which starts background threads ...try {logger.debug("Connecting to MySQL server");client.connect(timeoutInMilliseconds);logger.debug("Successfully connected to MySQL server and beginning to read binlog");} catch (TimeoutException e) {double seconds = TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds);throw new ConnectException("Timed out after " + seconds + " seconds while waiting to connect to the MySQL database at " + host+ ":" + port + " with user '" + user + "'", e);} catch (AuthenticationException e) {throw new ConnectException("Failed to authenticate to the MySQL database at " + host + ":" + port + " with user '" + user + "'",e);} catch (Throwable e) {throw new ConnectException("Unable to connect to the MySQL database at " + host + ":" + port + " with user '" + user + "': " + e.getMessage(), e);}}

 

  1. 验证配置:从传入的属性中创建配置对象并验证其有效性。
  2. 创建数据库历史记录:根据配置实例化 DatabaseHistory 对象并启动。
  3. 读取配置参数:从配置中读取各种必要的参数,如用户名、密码、主机、端口等。
  4. 定义表过滤器:根据白名单和黑名单定义表过滤器,忽略内置表。
  5. 创建队列:初始化事件队列和批处理队列。
  6. 设置事件处理器:为不同的事件类型设置处理器。
  7. 设置日志读取器:创建并配置 BinaryLogClient,注册事件监听器和生命周期监听器。
  8. 设置事件反序列化器:配置事件反序列化器以处理特定类型的事件。
  9. 恢复数据库状态:检查是否有已处理的日志,如果有则恢复数据库模式。
  10. 连接到 MySQL 服务器:尝试连接到 MySQL 服务器并开始读取二进制日志。

小结

/**
 * 该类负责配置和初始化MySQL连接器,包括设置数据库和表的过滤条件、创建事件队列、注册事件处理器、设置二进制日志客户端、恢复数据库模式等。
 * 主要功能包括:
 * - 应用数据库和表的黑白名单过滤条件。
 * - 配置是否忽略内置表。
 * - 创建事件队列和批处理事件队列。
 * - 注册不同类型的事件处理器。
 * - 初始化二进制日志客户端并设置相关参数。
 * - 检查并恢复已处理的日志位置。
 * - 连接到MySQL服务器并开始读取二进制日志。
 */

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/474250.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SDF,一个从1978年运行至今的公共Unix Shell

关于SDF 最近发现了一个很古老的公共Unix Shell服务器&#xff0c;这个项目从1978年运行至今&#xff0c;如果对操作系统&#xff0c;对Unix感兴趣&#xff0c;可以进去玩一玩体验一下 SDF Public Access UNIX System - Free Shell Account and Shell Access 注册方式 我一…

逆向攻防世界CTF系列41-EASYHOOK

逆向攻防世界CTF系列41-EASYHOOK 看题目是一个Hook类型的&#xff0c;第一次接触&#xff0c;虽然学过相关理论&#xff0c;可以看我的文章 Hook入门(逆向)-CSDN博客 题解参考&#xff1a;https://www.cnblogs.com/c10udlnk/p/14214057.html和攻防世界逆向高手题之EASYHOOK-…

C# 面向对象

C# 面向对象编程 面向过程&#xff1a;一件事情分成多个步骤来完成。 把大象装进冰箱 (面向过程化设计思想)。走一步看一步。 1、打开冰箱门 2、把大象放进冰箱 3、关闭冰箱门 面向对象&#xff1a;以对象作为主体 把大象装进冰箱 1、抽取对象 大象 冰箱 门 &#xff0…

【AI图像生成网站Golang】项目架构

AI图像生成网站 目录 一、项目介绍 二、雪花算法 三、JWT认证与令牌桶算法 四、项目架构 五、图床上传与图像生成API搭建 六、项目测试与调试(等待更新) 四、项目架构 本项目的后端基于Golang和Gin框架开发&#xff0c;主要包括的模块有&#xff1a; backend/ ├── …

Acme PHP - Let‘s Encrypt

Lets Encrypt是一个于2015年三季度推出的数字证书认证机构&#xff0c;旨在以自动化流程消除手动创建和安装证书的复杂流程&#xff0c;并推广使万维网服务器的加密连接无所不在&#xff0c;为安全网站提供免费的SSL/TLS证书。 使用PHP来更新证书&#xff1a; Acme PHP | Rob…

前后端交互之动态列

一. 情景 在做项目时&#xff0c;有时候后会遇到后端使用了聚合函数&#xff0c;导致生成的对象的属性数量或数量不固定&#xff0c;因此无法建立一个与之对应的对象来向前端传递数据&#xff0c;这时可以采用NameDataListVO向前端传递数据。 Data Builder AllArgsConstructo…

【LeetCode 题】只出现一次的数字--其余数字都出现3次

&#x1f536;力扣上一道有意思的题&#xff0c;参考了评论区的解法&#xff0c;一起来学习 &#x1f354;思路说明&#xff1a; &#x1f31f;举例说明 &#xff1a; nums [2,2,3,2] 我们需要把其中的数字 ‘3’ 找出来 1️⃣把每个数都想成32位的二进制数&#xff08;这里举…

如何在 Ubuntu 上安装 Jupyter Notebook

本篇文章将教你在 Ubuntu 服务器上安装 Jupyter Notebook&#xff0c;并使用 Nginx 和 SSL 证书进行安全配置。 我将带你一步步在云服务器上搭建 Jupyter Notebook 服务器。Jupyter Notebook 在数据科学和机器学习领域被广泛用于交互式编码、可视化和实验。在远程服务器上运行…

一文了解Android的核心系统服务

在 Android 系统中&#xff0c;核心系统服务&#xff08;Core System Services&#xff09;是应用和系统功能正常运行的基石。它们负责提供系统级的资源和操作支持&#xff0c;包含了从启动设备、管理进程到提供应用基础组件的方方面面。以下是 Android 中一些重要的核心系统服…

学者观察 | 元计算、人工智能和Web 3.0——山东大学教授成秀珍

导语 成秀珍教授提出元计算是在开放的零信任环境下整合算力资源打通数据壁垒构建自进化智能的新质生产力技术&#xff0c;是一种新计算范式&#xff1b;区块链是Web3.0的核心技术之一&#xff0c;有助于保障开放零信任环境下&#xff0c;用户、设备和服务间去中心化数据流通的…

集群聊天服务器(9)一对一聊天功能

目录 一对一聊天离线消息服务器异常处理 一对一聊天 先新添一个消息码 在业务层增加该业务 没有绑定事件处理器的话消息会派发不出去 聊天其实是服务器做一个中转 现在同时登录两个账号 收到了聊天信息 再回复一下 离线消息 声明中提供接口和方法 张三对离线的李…

MySQL —— MySQL索引介绍、索引数据结构、聚集索引和辅助索引、索引覆盖

文章目录 索引概念索引分类索引数据结构种类Innodb 索引数据结构聚集索引和辅助索引&#xff08;非聚集索引&#xff09;聚集索引辅助索引&#xff08;非聚集索引&#xff09; 索引覆盖 索引概念 索引是对数据库表中一列或多列的值进行排序后的一种数据结构。用于帮助 mysql 提…

4A架构之间的关系和集成

首先我们还是来看业务架构业务域&#xff0c;大家都知道在业务架构里面其实有三个核心的内容&#xff0c;一个是价值流&#xff0c;一个是业务能力&#xff0c;一个是业务流程。 价值流往往就是顶端的流程&#xff0c;业务能力的分解往往是2~4级&#xff0c;对于详细的业务流程…

RadSystems 自定义页面全攻略:个性化任务管理系统的实战设计

系列文章目录 探索RadSystems&#xff1a;低代码开发的新选择&#xff08;一&#xff09;&#x1f6aa; 探索RadSystems&#xff1a;低代码开发的新选择&#xff08;二&#xff09;&#x1f6aa; 探索RadSystems&#xff1a;低代码开发的新选择&#xff08;三&#xff09;&…

([LeetCode仓颉解题报告] 661. 图片平滑器

[LeetCode仓颉解题报告] 661. 图片平滑器 一、 题目1. 题目描述2. 原题链接 二、 解题报告1. 思路分析2. 复杂度分析3. 代码实现 三、 本题小结四、 参考链接 一、 题目 1. 题目描述 2. 原题链接 链接: 661. 图片平滑器 二、 解题报告 1. 思路分析 由于只需要3*39个格子&am…

若依权限控制

springbootvue2项目中的权限控制(若依项目) 步骤: 1.登录管理员账号,为普通用户增加权限按钮 绿色部分为权限控制字符 2.在后端对应的方法上增加权限控制(这里以删除操作为例):PreAuthorize(“ss.hasPermi(‘area:store:remove’)”) 3.在前端对应的按钮上增加权限控制:v-ha…

gvim添加至右键、永久修改配置、放大缩小快捷键、ctrl + c ctrl +v 直接复制粘贴、右键和还原以前版本(V)冲突

一、将 vim 添加至右键 进入安装目录找到 vim91\install.exe 管理员权限执行 Install will do for you:1 Install .bat files to use Vim at the command line:2 Overwrite C:\Windows\vim.bat3 Overwrite C:\Windows\gvim.bat4 Overwrite C:\Windows\evim.bat…

使用 OpenAI 进行数据探索性分析(EDA)

探索性数据分析&#xff08;Exploratory Data Analysis, 简称 EDA&#xff09;是数据分析中不可或缺的环节&#xff0c;帮助分析师快速了解数据的分布、特征和潜在模式。传统的 EDA 通常需要手动编写代码或使用工具完成。现在&#xff0c;通过 OpenAI 的 GPT-4 模型&#xff0c…

汽车资讯新篇章:Spring Boot技术启航

4系统概要设计 4.1概述 本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式&#xff0c;是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示&#xff1a; 图4-1系统工作原理…

【EasyExcel】复杂导出操作-自定义颜色样式等(版本3.1.x)

文章目录 前言一、自定义拦截器二、自定义操作1.自定义颜色2.合并单元格 三、复杂操作示例1.实体(使用了注解式样式)&#xff1a;2.自定义拦截器3.代码4.最终效果 前言 本文简单介绍阿里的EasyExcel的复杂导出操作&#xff0c;包括自定义样式&#xff0c;根据数据合并单元格等。…