CloudCanal x Debezium 打造实时数据流动新范式

简述

Debezium 是一个开源的数据订阅工具,主要功能为捕获数据库变更事件发送到 Kafka。

CloudCanal 近期实现了从 Kafka 消费 Debezium 格式数据,将其 同步到 StarRocks、Doris、Elasticsearch、MongoDB、ClickHouse 等 12 种数据库和数仓,补全其数据到达能力。

本文将先简单介绍该项技术实现的背景,再通过 MySQL -> Kafka -> Starrocks 示例展示此功能。

image.png

为什么要消费 Debezium 格式数据

高流行度

Debezium 是一个高质量、被大量项目集成的开源项目,社区用户活跃,官方维护积极,修复 bug、增加新特性,不断更新版本。

作为 Kafka Connect 生态系统的一部分,Debezium 能够无缝与 Kafka 进行对接,为用户后端数据处理提供了强大的 实时数据准备 能力。

由此形成的高流行度,让每一个数据行业从业者不能忽视其影响力。

合理的消息结构

Schema(数据结构) 遵循 Kafka Connect 标准,提供了详细的字段信息。

"schema": {"type": "struct","fields": [{"type": "int32", "optional": false, "field": "id"},{"type": "string", "optional": false, "field": "name"},{"type": "int32", "optional": false, "field": "age"}],"optional": false, "name": "my_database.user.Value"
}

Payload(数据)包含实际的数据库变更数据,与 Schema 中定义的字段对应。

"payload": {"id": 123,"name": "John Doe","age": 30,"source": {...}
}

此外消息还携带了源端数据源全面的关联信息,包括库、表、时间戳、位点等信息。整体格式实用、简洁。

支持 Schema 演进

Debezium 不仅捕获数据库模式的当前状态,还能感知和记录每次模式变更细节。

当数据库表结构发生变化时(如添加、删除、修改字段等),Debezium 能够 实时捕获这些结构变更,确保变更事件的精准传递。

另外 Debezium 会为每个捕获的变更事件 记录包含当前和先前 Schema 的历史记录

这意味着 可追溯任何时刻数据库 Schema,了解特定时间点表字段、数据类型等信息, 并且可精准还原数据库在某一时刻的结构,无需额外的查询或推测。

CDC 数据格式标准

Debezium 数据 Schema 基于 Kafka Connect 标准设计,这使 Debezium 产生的变更事件能够轻松地集成到各种 Kafka Connect 连接器中,实现了与 Kafka 生态系统的顺畅对接。

这个设计使得 Debezium 数据 Schema 有望成为 CDC(Change Data Capture) 领域标准,为实时数据流的流动提供了基础设施。

端到端的缺憾

Debezium 集如此众多的优点,但是其官方缺少消息到对端的能力(目前有在补充),这让一部分用户感觉束手无策,CloudCanal 支持消费 Debezium 数据即解决这个问题,为用户实时数据生态建设贡献绵薄之力。

支持 Debezium 的主流 CDC 技术比较

对于使用 Debezium 的用户来说,消费 Kafka 中的 Debezium 数据并将其写入其他数据源,有几种主流 CDC 技术可选,如下表。

Kafka-ConnectFlink-CDCCloudCanal
同步配置配置文件代码/配置(新版本)可视化
同步性能(延迟)优秀优秀优秀
社区支持一般积极积极
大规模部署使用一般优秀优秀
消息格式符合其标准的 JSON、Avro…Debezium JSON、Canal JSON、Maxwell JSONDebezium JSON、Canal JSON、CloudCanal JSON 等
插件支持Oracle、MySQL、SqlServer…Oracle、MySQL、SqlServer…StarRocks、Doris、Elasticsearch 等 12 种

CloudCanal 支持 Debezium 做了那些事

CloudCanal 之前即实现了将数据库数据以 Debezium 格式写入目标端 Kafka 的能力,并在兼容性方面做了大量优化。

此次版本更新则支持从 Kafka 消费 Debezium 格式数据,并同步到对端数据库或数仓, 形成基于 Kafka 中转的端到端数据迁移同步能力,同时可平滑对接上/下游已使用其他工具且以 Debezium 数据格式载体的需求。

操作示例

Debezium 环境准备

  • 相关资源一键部署 (Docker): debezium-test.tar.gz
    • Kafka 集群 + Kafka UI
    • Debezium
    • MySQL (源端)
    • Starrocks (目标端)
    tar -xzvf debezium-test.tar.gz
    sh install.sh
    

创建 MySQL Source Connector

  • 源端是 MySQL,通过下面的表进行创建。

    CREATE DATABASE `inventory`;CREATE TABLE `inventory`.`customer` (`c_int` int NOT NULL,`c_bigint` bigint NOT NULL, `c_decimal` decimal(10,3) NOT NULL,`c_date` date NOT NULL,`c_datetime` datetime NOT NULL,`c_timestamp` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`c_year` int NOT NULL,`c_varchar` varchar(10) NOT NULL,`c_text` text NOT NULL,PRIMARY KEY (`c_int`)
    );
    
  • 通过 Debezium 的 Api 接口创建 Connector 订阅 MySQL 的变更事件。

    curl -i -X POST http://127.0.0.1:7750/connectors \-H 'Content-Type: application/json' \-d '{"name": "connector-test-mx","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.hostname": "112.124.38.87","database.port": "25000","database.user": "root","database.password": "123456","database.server.id": "1","database.server.name": "mx","database.include.list": "inventory","topic.prefix": "mx","table.include.list": "inventory.customer","snapshot.mode": "never","database.history.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092","schema.history.internal.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092","schema.history.internal.kafka.topic": "mx.schemahistory.customer","database.history.kafka.topic": "mx.mx_history_schema","include.schema.changes": "false"     }}'
    
  • 创建后,查看 Connetor 的状态。

    curl -s http://127.0.0.1:7750/connectors/connector-test-mx/status
    

CloudCanal 订阅 Kafka 的数据变更

准备 CloudCanal

  • 下载安装 CloudCanal 私有部署版本

添加数据源

  • 数据源管理 -> 添加数据源, 添加 Kafka、Starrocks、MySQL
    image.png
    image.png
    image.png

创建同步任务

  • 任务管理-> 新建任务

  • Kafka选择 Debezium Envelope Json Format格式

  • 该消息格式的说明,参见:源端 Kafka Debezium Json 使用说明
    image.png
    image.png

  • Kafka 消息中如果有 Schema,需要在 任务详细 -> 参数修改 -> 源数据源配置 中修改 envelopSchemaIncludetrue
    image.png

同步测试

  • 源端数据库做数据变更,Debezium 将数据写入 Kafka 后,CloudCanal 会写入到 Starrocks 中。
    image.png

  • 数据同步结束后校验 MySQL 和 Starrocks 的数据,40 万左右的数据是一致的。
    image.png

总结

本文介绍了 CloudCanal 支持消费 Debezium 格式数据的背景,以及通过 MySQL -> Kafka -> Starrocks 示例介绍其使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/222109.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

simulink代码生成(一)——环境搭建

一、安装C2000的嵌入式环境; 点击matlab附加功能, 然后搜索C2000,安装嵌入式硬件支持包;点击安装即可;(目前还不知道破解版的怎么操作,目前我用的是正版的这样,完全破解的可能操作…

华为数通方向HCIP-DataCom H12-831题库(多选题:201-220)

第201题 在多集群RR组网中,每个集群中部署了一台RR设备及其客户机,各集群的RR与为非客户机关系,并建立IBGP全连接。以下关于BGP路由反射器发布路由规则的描述,正确的有哪些? A、若某RR从EBGP对等体学到的路由,此RR会传递给其他集群的RR B、若某RR从非客户机IBGP对等体学…

Axure之中继器的使用(交互动作reperter属性Item属性)

目录 一.中继器的基本使用 二.中继器的动作(增删改查) 2.1 新增 2.2 删除 2.3 更新行 2.4 效果展示 2.5 模糊查询 三.reperter属性 在Axure中,中继器(Repeater)是一种功能强大的组件,用于创建重复…

C# 使用MSTest进行单元测试

目录 写在前面 代码实现 执行结果 写在前面 MSTest是微软官方提供的.NET平台下的单元测试框架;可使用DataRow属性来指定数据,驱动测试用例所用到的值,连续对每个数据化进行运行测试,也可以使用DynamicData 属性来指定数据&…

Flink系列之:Savepoints

Flink系列之:Savepoints 一、Savepoints二、分配算子ID三、Savepoint 状态四、算子五、触发Savepoint六、Savepoint 格式七、触发 Savepoint八、使用 YARN 触发 Savepoint九、使用 Savepoint 停止作业十、从 Savepoint 恢复十一、跳过无法映射的状态恢复十二、Resto…

IEEE TASLP | 联合语音识别与口音识别的解耦交互多任务学习网络

尽管联合语音识别(ASR)和口音识别(AR)训练已被证明对处理多口音场景有效,但当前的多任务ASR-AR方法忽视了任务之间的粒度差异。细粒度单元(如音素、声韵母)可用于捕获与发音相关的口音特征&…

ruoyi若依前后端分离版部署centos7服务器(全)

目录 VMware虚拟机 centos7 安装环境如下 一、msql 5.7 二、nginx1.23.3 三、java8 四、redis 3.2.1 五、部署若依前端 六、部署若依后端 前言 虚拟机的桥接与nat模式 : 重点 重点!!! 无线不可以用桥接模式 ,而你用了nat模式会…

安全狗云原生安全-云甲·云原生容器安全管理系统

随着云计算的快速发展,容器技术逐渐成为主流。然而,随着容器的普及,安全问题也日益突出。为了解决这一问题,安全狗推出了云原生容器安全管理系统——云甲。 云甲是安全狗云原生安全的重要组成部分,它采用了先进的云原生…

版本化数据库管理工具Flyway介绍和Spring Boot集成使用

文章目录 核心功能如何使用 Flyway最佳实践Spring Boot使用 Flyway 是一个版本化数据库管理工具,用于跟踪、管理和应用数据库的变化。它非常适合在团队开发环境中使用,其中多个人员可能会在数据库结构进行更改。Flyway 通过版本控制可以帮助你确保所有人…

Redis-Day3实战篇-商户查询缓存(缓存的添加和更新, 缓存穿透/雪崩/击穿, 缓存工具封装)

Redis-Day3实战篇-商户查询缓存 什么是缓存添加Redis缓存业务流程项目实现练习 - 给店铺类型查询业务添加缓存 缓存更新策略最佳实践方案案例 - 给查询商铺的缓存添加超时剔除和主动更新 缓存穿透/雪崩/击穿缓存穿透概述项目实现 - 商铺查询缓存 缓存雪崩缓存击穿概述互斥锁逻辑…

SpringCloudGateway网关处拦截并修改请求

SpringCloudGateway网关处拦截并修改请求 需求背景 老系统没有引入Token的概念,之前的租户Id拼接在请求上,有的是以Get,Param传参形式;有的是以Post,Body传参的。需要在网关层拦截请求并进行请求修改后转发到对应服务。…

使用pytest+selenium+allure实现web页面自动化测试

测试文件 base 基本方法data 测试数据page web页面相关操作image 测试截图log 日志文件report 测试报告文件temp 临时文件tool 文件读取,发邮件文件TestCases 测试用例 在page下的__init__.py文件下配置 import os import time from selenium.webdriver.common.by…

10 Vue3中v-html指令的用法

概述 v-html主要是用来渲染富文本内容,比如评论信息,新闻信息,文章信息等。 v-html是一个特别不安全的指令,因为它会将文本以HTML的显示进行渲染,一旦文本里面包含一些恶意的js代码,可能会导致整个网页发…

12、Qt:用QProcess类启动外部程序:简单使用

一、说明 简单使用:在一个函数中,使用QProcess类的临时对象调用可执行文件exe,只有这个exe执行完了,这个函数才往下执行,一次性打印出exe所有输出信息;复杂使用:创建QProcess类的全局对象&…

Python---TCP 客户端程序开发

1. 开发 TCP 客户端程序开发步骤回顾 创建客户端套接字对象和服务端套接字建立连接发送数据接收数据关闭客户端套接字 2. socket 类的介绍 导入 socket 模块 import socket 创建客户端 socket 对象 socket.socket(AddressFamily, Type) 参数说明: AddressFamily 表示IP地…

重塑数字生产力体系,生成式AI将开启云计算未来新十年?

科技云报道原创。 今天我们正身处一个历史的洪流,一个巨变的十字路口。生成式AI让人工智能技术完全破圈,带来了机器学习被大规模采用的历史转折点。 它掀起的新一轮科技革命,远超出我们今天的想象,这意味着一个巨大的历史机遇正…

医院影像科PACS系统源码,医学影像系统,支持MPR、CPR、MIP、SSD、VR、VE三维图像处理

PACS系统是医院影像科室中应用的一种系统,主要用于获取、传输、存档和处理医学影像。它通过各种接口,如模拟、DICOM和网络,以数字化的方式将各种医学影像,如核磁共振、CT扫描、超声波等保存起来,并在需要时能够快速调取…

057:vue组件方法中加载匿名函数

第057个 查看专栏目录: VUE ------ element UI 专栏目标 在vue和element UI联合技术栈的操控下,本专栏提供行之有效的源代码示例和信息点介绍,做到灵活运用。 (1)提供vue2的一些基本操作:安装、引用,模板使…

宽带阻抗匹配的工程实现-第一步,端口驻波仿真

概要 ADS仿真,Matlab仿真,宽带阻抗匹配,smith圆图。 其实阻抗匹配我工作以来经常说,也经常做,但是基本上都是直接在印制板上进行调试。现在想先用仿真软件直接设计出来,才发现很多东西嘴上说容易&#xf…

力扣每日一题day36[112.路径总和]

给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 。判断该树中是否存在 根节点到叶子节点 的路径,这条路径上所有节点值相加等于目标和 targetSum 。如果存在,返回 true ;否则,返回 false 。 叶子节点 是指没有子节点…