Debezium-EmbeddedEngine

提示:一个嵌入式的Kafka Connect源连接器的工作机制

文章目录

  • 前言
  • 一、控制流图
  • 二、代码分析
    • 1.构造函数
    • 2.完成回调
    • 3.连接器回调
    • 4.RUN
  • 总结


前言

工作机制:

* 独立运行:嵌入式连接器在应用程序进程中独立运行,不需要Kafka、Kafka Connect或  Zookeeper进程

* 数据传递:应用程序设置连接器并提供一个消费者函数,连接器将所有包含数据库变更事件的SourceRecord传递给该函数。

* 责任转移:应用程序负责故障恢复、可扩展性和持久性。

* 默认存储:连接器的数据库模式历史和偏移量默认存储在内存中,重启后会丢失。

* 执行与停止:连接器设计为提交给Executor或ExecutorService单线程执行,可以通过调用stop()方法或中断线程来停止。


提示:以下是本篇文章正文内容

一、控制流图

控制流图

二、代码分析

1.构造函数

    private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer,CompletionCallback completionCallback, ConnectorCallback connectorCallback) {this.config = config;this.consumer = consumer;this.classLoader = classLoader;this.clock = clock;this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {if (!success) logger.error(msg, error);};this.connectorCallback = connectorCallback;this.completionResult = new CompletionResult();assert this.config != null;assert this.consumer != null;assert this.classLoader != null;assert this.clock != null;keyConverter = config.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> this.classLoader);keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);valueConverter = config.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> this.classLoader);Configuration valueConverterConfig = config;if (valueConverter instanceof JsonConverter) {// Make sure that the JSON converter is configured to NOT enable schemas ...valueConverterConfig = config.edit().with(INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build();}valueConverter.configure(valueConverterConfig.subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);// Create the worker config, adding extra fields that are required for validation of a worker config// but that are not used within the embedded engine (since the source records are never serialized) ...Map<String, String> embeddedConfig = config.asMap(ALL_FIELDS);embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());workerConfig = new EmbeddedConfig(embeddedConfig);}

构造函数

2.完成回调

/*** A callback function to be notified when the connector completes.*/
public interface CompletionCallback {/*** Handle the completion of the embedded connector engine.* * @param success {@code true} if the connector completed normally, or {@code false} if the connector produced an error*            that prevented startup or premature termination.* @param message the completion message; never null* @param error the error, or null if there was no exception*/void handle(boolean success, String message, Throwable error);
}

这段代码定义了一个接口 CompletionCallback,其中包含一个方法 handle。该方法用于处理嵌入式连接器引擎的完成状态。
参数:
success: 布尔值,表示连接器是否正常完成。如果为 true,表示连接器正常完成;如果为 false,表示连接器启动失败或提前终止。
message: 完成消息,不能为空。
error: 异常对象,如果没有异常则为 null。 

完成回调

3.连接器回调

    /*** Callback function which informs users about the various stages a connector goes through during startup*/public interface ConnectorCallback {/*** Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has* completed successfully*/default void connectorStarted() {// nothing by default}/*** Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has* completed successfully*/default void connectorStopped() {// nothing by default}/*** Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has* completed successfully*/default void taskStarted() {// nothing by default}/*** Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has* completed successfully*/default void taskStopped() {// nothing by default}}

这段代码定义了一个接口 ConnectorCallback,其中包含了四个默认方法:connectorStarted、connectorStopped、taskStarted 和 taskStopped。这些方法用于在连接器和任务的不同生命周期阶段通知用户。
connectorStarted:当连接器成功启动时调用。
connectorStopped:当连接器成功停止时调用。
taskStarted:当连接器任务成功启动时调用。
taskStopped:当连接器任务成功停止时调用。
这些方法的默认实现为空,子类可以根据需要重写这些方法来添加自定义的回调逻辑。


连接回调

4 RUN方法核心流程

run()

核心流程参考上图

注:debezium-0.6版本 

总结

EmbeddedEngine方法和成员变量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/474913.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ThreadLocal原理及其内存泄漏

ThreadLocal通过为每个线程创建一个共享变量的副本来保证各个线程之间变量的访问和修改互不影响。 ThreadLocal存放的值是线程内共享的&#xff0c;线程间互斥的&#xff0c;主要用于线程内共享数据&#xff0c;避免通过参数传递。 ThreadLocal有四个方法&#xff1a; initialV…

Java中日志采集框架-JUL、Slf4j、Log4j、Logstash

1. 日志采集 日志采集是指在软件系统、网络设备、服务器或其他IT基础设施中自动收集日志文件和事件信息的过程。这些日志通常包含了时间戳、事件类型、源和目标信息、错误代码、用户操作记录等关键数据。日志采集的目的是为了监控系统运行状态、分析系统性能、审计用户行为、故…

C++系列之继承

&#x1f497; &#x1f497; 博客:小怡同学 &#x1f497; &#x1f497; 个人简介:编程小萌新 &#x1f497; &#x1f497; 如果博客对大家有用的话&#xff0c;请点赞关注再收藏 &#x1f31e; 继承的概念 继承机制是面向对象程序设计使代码可以复用的最重要的手段&#xf…

记录———封装uni-app+vant(u-upload)上传图片组件

上传图片回显&#xff0c;自定义图片回显样式 这段代码是一个Vue组件&#xff0c;主要实现了图片上传和预览的功能。组件接收了父组件传递的图片列表、最大图片数量和上传状态等属性。在模板中&#xff0c;使用了uni-easyinput组件和u-upload组件来实现图片上传和预览功能。在…

Java从入门到精通笔记篇(十三)

与流处理 ambda表达式 定义 lambda表达式不能被独立执行&#xff0c;因此必须实现函数式接口&#xff0c;并且会返回一个函数式接口的对象。 可将其语法用下列的方式理解 误区警示 “->”符号是由英文状态下的“-”和“>”组成的&#xff0c;符号之间没有空格。 lambd…

kvm-dmesg:从宿主机窥探虚拟机内核dmesg日志

在虚拟化环境中&#xff0c;实时获取虚拟机内核日志对于系统管理员和开发者来说至关重要。传统的 dmesg 工具可以方便地查看本地系统的内核日志&#xff0c;但在KVM&#xff08;基于内核的虚拟机&#xff09;环境下&#xff0c;获取虚拟机内部的内核日志则复杂得多。为了简化这…

apipost下载安装教程、脚本详细使用教程

目录 apipost脚本使用教程 缘由&#xff1a; 实现流程&#xff1a; 1、设置接口需要的URL&#xff1a; 2、boby: 3、预执行操作&#xff1a; 4、断言 5、执行结果&#xff1a; 什么是ApiPost&#xff1f; 下载以及安装&#xff1a; apipost使用文档介绍&#xff1a;…

25. 架构能力

文章目录 第25章 架构能力25.1 个人能力&#xff1a;架构师的职责、技能和知识职责技能知识那经验方面呢&#xff1f; 25.2 软件架构组织的能力25.3 成为更优秀的架构师接受指导指导他人 25.4 小结25.5 扩展阅读25.6 问题讨论 第25章 架构能力 人生苦短&#xff0c;学海无涯。 …

UniApp的Vue3版本中H5配置代理的最佳方法

UniApp的Vue3版本中H5项目在本地开发时需要配置跨域请求调试 最开始在 manifest.json中配置 总是报404&#xff0c;无法通过代理请求远程的接口并返回404错误。 经过验证在项目根目录创建 vite.config.js文件 vite.config.js内容: // vite.config.js import {defineConfig }…

kafka基础

文章目录 一、Kafka入门1.1、JMS1.2、生产者-消费者模式1.3、ZooKeeper 二、kafka基础架构2.1、producer2.2、kafka cluster2.2.1、broker2.2.2、Controller2.2.3、Topic2.2.4、Partition2.2.5、Replication2.2.6、Leader & Follower 2.3、consumer 一、Kafka入门 Kafka是一…

SIMCom芯讯通A7680C在线升级:FTP升级成功;http升级腾讯云对象储存的文件失败;http升级私有服务器的文件成功

从事嵌入式单片机的工作算是符合我个人兴趣爱好的,当面对一个新的芯片我即想把芯片尽快搞懂完成项目赚钱,也想着能够把自己遇到的坑和注意事项记录下来,即方便自己后面查阅也可以分享给大家,这是一种冲动,但是这个或许并不是原厂希望的,尽管这样有可能会牺牲一些时间也有哪天原…

CSS一些练习过程

1.字体样式 代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title…

Linux系统Centos设置开机默认root用户

目录 一. 教程 二. 部分第三方工具配置也无效 一. 教程 使用 Linux 安装Centos系统的小伙伴大概都知道&#xff0c;我们进入系统后&#xff0c;通常都是自己设置的普通用户身份&#xff0c;而不是 root 超级管理员用户&#xff0c;导致我们在操作文件夹时往往爆出没有权限&am…

【机器学习】机器学习中用到的高等数学知识-7.信息论 (Information Theory)

熵 (Entropy)&#xff1a;用于评估信息的随机性&#xff0c;常用于决策树和聚类算法。交叉熵 (Cross-Entropy)&#xff1a;用于衡量两个概率分布之间的差异&#xff0c;在分类问题中常用。 信息论作为处理信息量和信息传输的数学理论&#xff0c;在机器学习中具有广泛的应用。…

【C#】C#编程入门指南:构建你的.NET开发基础

文章目录 前言&#xff1a;1. C# 开发环境 VS的基本熟悉2. 解决方案与项目的关系3. 编辑、编译、链接、运行4. 托管代码和CLR4.1 CLR&#xff1a;4.2 C# 代码第编译过程&#xff08;两次编译的&#xff09; 5. 命名空间6. 类的组成与分析7. C# 的数据类型7.1 值类型7.2 引用类型…

手摸手5-springboot开启打印sql完整语句

目录 手摸手5-springboot开启打印sql完整语句简介 p6spy简介引入依赖修改application-jdbc.yaml配置配置spy.properties文件配置项运行后效果 手摸手5-springboot开启打印sql完整语句 简介 MyBatis-Plus提供了SQL分析与打印的功能&#xff0c;通过集成p6spy组件&#xff0c;可…

深入解析TK技术下视频音频不同步的成因与解决方案

随着互联网和数字视频技术的飞速发展&#xff0c;音视频同步问题逐渐成为网络视频播放、直播、编辑等过程中不可忽视的技术难题。尤其是在采用TK&#xff08;Transmission Keying&#xff09;技术进行视频传输时&#xff0c;由于其特殊的时序同步要求&#xff0c;音视频不同步现…

力扣(leetcode)题目总结——动态规划篇

leetcode 经典题分类 链表数组字符串哈希表二分法双指针滑动窗口递归/回溯动态规划二叉树辅助栈 本系列专栏&#xff1a;点击进入 leetcode题目分类 关注走一波 前言&#xff1a;本系列文章初衷是为了按类别整理出力扣&#xff08;leetcode&#xff09;最经典题目&#xff0c…

MySQL超详细安装配置教程(亲测有效)

目录 1.下载mysql 2.环境配置 3.安装mysql ​4.navicat工具下载与连接 ​5总结 1.下载mysql mysql下载--MySQL &#xff1a;&#xff1a; 下载 MySQL 社区服务器 下载的时候这里直接逃过就行 我这里的版本是最新的mysql8.0.37 下载完成之后,将压缩包进行解压 这里我建议大…

高阶云服务-ELB+AS

ELBAS 弹性负载均衡弹性伸缩 原来1台web服务器不满足相应&#xff0c;现部署多台提供相同服务&#xff1b; 由于多个服务器多个ip该如何提供给应用呢&#xff1f; 引申出负载均衡&#xff08;HAProxy&#xff0c;LVS01四层&#xff0c;Nginx七层&#xff09; 防单点故障做主备…