canal同步数据教程

canal简介

官网:https://github.com/alibaba/canal

主要是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,是一个实时同步的方案。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

工作原理

   canal相当于一个mysql slave节点,工作原理如下:

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

canal各个组件

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client(消费端)支持多种语言:

  • canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
  • canal c# 客户端: https://github.com/dotnetcore/CanalSharp
  • canal go客户端: https://github.com/CanalClient/canal-go
  • canal php客户端: https://github.com/xingwenge/canal-php
  • canal Python客户端:https://github.com/haozi3156666/canal-python
  • canal Rust客户端:https://github.com/laohanlinux/canal-rs
  • canal Nodejs客户端:https://github.com/marmot-z/canal-nodejs

记住:canal是基于订阅和消费机制的,这边的client就是消费端,为了支持更多的语言或者防止消费能力不足,可以把消息直接投递到mq,借助mq的削峰平谷的能力

  • 最重要的组件是deployer,也就是server
  • admin是一个webUI的动态管理组件,根据需要搭建
  • example是client的一些例子
  • adapter是一种客户端数据落地的适配,不如你想把数据同步到hbase,你可以直接用adapter,adapter会帮忙你做数据格式转换等,当然也可以通过客户端自己写。

实践

环境版本

  • mysql版本:5.7.24 docker版
  • canal版本:1.1.8-alpha-3

docker部署mysql

 启动一个实例,如下

docker run -d --name mysql-test -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 mysql:5.7.24

修改容器内/etc/mysql/mysql.conf.d/mysqld.cnf的内容,在末尾增加如下3行内容,可以通过docker cp来修改

[mysqld]
pid-file	= /var/run/mysqld/mysqld.pid
socket		= /var/run/mysqld/mysqld.sock
datadir		= /var/lib/mysql
#log-error	= /var/log/mysql/error.log
# By default we only accept connections from localhost
#bind-address	= 127.0.0.1
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
# 以下3行是新增的
server_id=1
log_bin=mysql-bin
binlog_format=ROW

重新启动容器

docker restart mysql-test

通过客户端(比如DBeaver连接mysql),查看这个配置参数

创建数据库test

创建表sys_user

CREATE TABLE `sys_user` (`user_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户ID',`user_name` varchar(60) NOT NULL COMMENT '用户昵称',`email` varchar(50) DEFAULT '' COMMENT '用户邮箱',`sex` char(1) DEFAULT '0' COMMENT '用户性别(0男 1女 2未知)',`password` varchar(50) DEFAULT '' COMMENT '密码',`status` char(1) DEFAULT '0' COMMENT '帐号状态(0正常 1停用)',PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='用户信息表';

到此mysql的准备工作已结束,重点在于开启bin log,并设置format为ROW,因为canal是作为mysql slave存在的,需要通过binlog来同步数据。

部署canal-server

通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到

canal.deployer-1.1.8-SNAPSHOT.tar.gz,解压缩得到如下的目录

打开conf/example/instance.properties配置源数据库(mysql)的信息,我这边偷懒直接用root账号,密码来自docker部署mysql启动时设置的密码

进入bin,直接点击startup.bat(windows环境)

然后打开logs/canal/canal.log,如果出现以下信息表示启动成功。接下来就可以通过客户端去订阅了。

例子1:canal-client的使用

在idea中新建一个maven项目,引入client的依赖

        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.7</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.7</version></dependency>

新增client类-CanalClient,代码如下:

package com.example.demo.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class CanalClient {public static void main(String[] args) throws InvalidProtocolBufferException {// 1. 创建链接,即server的地址和端口,其中example就是源数据库的名称(来自canal.deployer下的conf下的example文件及名称)CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",11111), "example", "", "");while (true) {// 2. 获取连接connector.connect();// 3. 指定订阅的数据库testconnector.subscribe("test.*");// 4. 获取MessageMessage message = connector.get(100);List<CanalEntry.Entry> entries = message.getEntries();if (entries.size() == 0) {System.out.println("没有数据,休息下");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}} else {for (CanalEntry.Entry entry : entries) {// 获取表名String tableName = entry.getHeader().getTableName();// Entry类型CanalEntry.EntryType entryType = entry.getEntryType();if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// 获取序列化数据ByteString storeValue = entry.getStoreValue();// 反序列化CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);// 获取事件类型CanalEntry.EventType eventType = rowChange.getEventType();// 获取具体数据List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();// 遍历并打印数据for (CanalEntry.RowData rowData : rowDatasList) {List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();Map<String, Object> bMap = new HashMap<>();for (CanalEntry.Column column : beforeColumnsList) {bMap.put(column.getName(), column.getValue());}List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();Map<String, Object> aMap = new HashMap<>();for (CanalEntry.Column column : afterColumnsList) {aMap.put(column.getName(), column.getValue());}System.out.println("表名:" + tableName + ",操作类型:" + eventType);System.out.println(",变化前:" + bMap);System.out.println(",变化后:" + aMap);}}}}}}
}

启动后,控制台打印如下:

当往数据库插入一条记录

insert sys_user(user_name, email, sex, `password`, status) values('张三','zhangsan@qq.com', 0, '123',0);

结果,控制台打印如下:

当修改记录时,控制台打印如下

update sys_user set `password`='123456' where user_name='张三';

当删除记录时,控制台打印如下

delete from sys_user where user_name='张三';

至此获取到的数据的变化,你可以把他手动转换、处理等等操作,也可以同步到其他的数据库,灵活性很大,但是如果当只是想把数据同步到其他数据库时,可以直接选择使用client-adapter,不需要编码,client-adapter可以认为是针对特定场景的加强版的客户端。

例子2:springboot集成canal-client

这边采用非官方封装的jar包,但是使用很方便,首先新建一个springboot项目(略)

引入以下maven依赖

<dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.6-RELEASE</version>
</dependency>

使用很简单,比如同步数据到redis,详细参考官网:GitHub - NormanGyllenhaal/canal-client: spring boot canal starter 易用的canal 客户端 canal client

例子3:同步数据到clickhouse(client-adapter的使用)

clickhouse-adapter是1.18新增的,不知是我的配置还有问题,当sys_user没有id字段时,只有新增能正常同步到clickhouse,故把sys_user.user_id改为id。

先部署clickhouse,在default新建表,建表语句如下:

CREATE TABLE default.test_user
(`id` Int64,`user_name` String,`email` String,`sex` String,`password` String,`status` String
)
ENGINE = MergeTree
PRIMARY KEY (id)  -- 明确指定 PRIMARY KEY
ORDER BY (id)     -- ORDER BY 是必须的
SETTINGS index_granularity = 8192;

通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到

canal.adapter-1.1.8-SNAPSHOT.tar.gz

解压缩得到如下的目录

在conf下新增clickhouse文件夹,并且新建test_user.yml文件,内容如下:

#dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: clickhouse1
concurrent: true
dbMapping:database: testtable: sys_usertargetTable: default.test_usertargetPk:id: id
#  mapAll: truetargetColumns:id:user_name:email:sex:password:status:  #etlCondition: "where id>={}"commitBatch: 3000 # 批量提交的大小

修改conf下的application.yml,把outerAdapters: 下的配置改为如下:主要是配置下目标clickhouse数据库的信息。

启动bin下的startup.bat(windows系统),当看到如下信息表示启动成功

此时去mysql中新增几条记录,adapter的控制台会打印出对应的信息:

查看clickhouse,会发现记录已同步,删除和修改类似的。

部署canal-admin

新建数据库canal_manager,用于admin,这边偷个懒,还是用root账号

通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到

canal.admin-1.1.8-SNAPSHOT.tar.gz

解压缩得到如下的目录

把conf下的canal_manager.sql sql导入数据库中,进行数据初始化。

修改conf/application.yml的配置,如下:

server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: rootpassword: 123456driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: admin

启动bin/startup.bat

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456

关闭admin,接下去要把canal-server对接到admin上,

修改canal-server的conf下的canal_local.properties,内容如下:

# register ip
canal.register.ip =# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name = 

然后在canal-deploy目录下以local的配置启动canal-server

$ bin/startup.bat local

重启canal-admin

进入网页端,能看到如下信息,server管理已经存在一个新的节点了

使用心得

canal定位是一个增量日志同步,但是也有方式可以设置全量同步,但都需要canal-adapter支持

  • canal可以设置全库同步,详见参考:Sync RDB · alibaba/canal Wiki · GitHub
  • 对于表级别的全量同步,可以使用etl接口,详见:ClientAdapter · alibaba/canal Wiki · GitHub

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/480712.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《Learn Three.js》学习(3)光源

前言&#xff1a; WebGL本身不支持光源&#xff0c;不使用three.js,则需使用着色程序来模拟光源。 学习大纲&#xff1a; Three.js中的光源 特定光源的使用时机 如何调整和配置所有光源的行为 如何创建镜头光晕 光源表 基础光源&#xff1a;THRER.AmbientLight、THERE.Point…

shell(9)

声明&#xff01; 学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章&#xff0c;笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他均与本人以及泷羽sec团队无关&…

【头歌实训:递归实现斐波那契数列】

头歌实训&#xff1a;递归实现斐波那契数列 文章目录 任务描述相关知识递归相关知识递归举例何时使用递归定义是递归的数据结构是递归的问题的求解方法是递归的 编程要求测试说明源代码&#xff1a; 任务描述 本关任务&#xff1a;递归求解斐波那契数列。 相关知识 为了完成…

回声消除延时估计的一些方法

在音频信号处理&#xff0c;尤其是在回声消除和语音通信中&#xff0c;延时估计是一个至关重要的任务。回声消除技术旨在减少或消除在语音通信中由于信号反射而产生的回声。为了有效地实现这一点&#xff0c;系统需要准确估计发送信号和接收信号之间的延迟。通过了解延迟&#…

我们来学mysql -- 事务之概念(原理篇)

事务的概念 题记一个例子一致性隔离性原子性持久性 题记 在漫长的编程岁月中&#xff0c;存在一如既往地贯穿着工作&#xff0c;面试的概念这类知识点&#xff0c;事不关己当然高高挂起&#xff0c;精准踩坑时那心情也的却是日了&#x1f436;请原谅我的粗俗&#xff0c;遇到B…

剪映自动批量替换视频、图片素材教程,视频批量复刻、混剪裂变等功能介绍

一、三种批量替换模式的区别 二、混剪裂变替换素材 三、分区混剪裂变替换素材 四、按组精确替换素材 五、绿色按钮教程 &#xff08;一&#xff09;如何附加音频和srt字幕 &#xff08;二&#xff09;如何替换固定文本的内容和样式 &#xff08;三&#xff09;如何附加…

【力扣】389.找不同

问题描述 思路解析 只有小写字母&#xff0c;这种设计参数小的&#xff0c;直接桶排序我最开始的想法是使用两个不同的数组&#xff0c;分别存入他们单个字符转换后的值&#xff0c;然后比较是否相同。也确实通过了 看了题解后&#xff0c;发现可以优化&#xff0c;首先因为t相…

HarmonyOS Next 模拟器安装与探索

HarmonyOS 5 也发布了有一段时间了&#xff0c;不知道大家实际使用的时候有没有发现一些惊喜。当然随着HarmonyOS 5的更新也带来了很多新特性&#xff0c;尤其是 HarmonyOS Next 模拟器。今天&#xff0c;我们就来探索一下这个模拟器&#xff0c;看看它能给我们的开发过程带来什…

net9 abp vnext 多语言通过数据库动态管理

通过数据库加载实现动态管理&#xff0c;用户可以自己修改界面显示的文本&#xff0c;满足国际化需求 如图所示,前端使用tdesign vnext 新建表TSYS_Localization与TSYS_LocalizationDetail 国旗图标下载网址flag-icons: Free Country Flags in SVG 在Shared下创建下图3个文件 …

ceph的用户管理和cephx认证

用户权限概述 用户格式 参考链接&#xff1a; 权限&#xff1a;https://docs.ceph.com/en/latest/rados/operations/user-management/#authorization-capabilities 用户&#xff1a;https://docs.ceph.com/en/reef/rados/operations/user-management/ ceph的用户格式TYPEID…

springboot340“共享书角”图书借还管理系统(论文+源码)_kaic

摘 要 随着社会的发展&#xff0c;图书借还的管理形势越来越严峻。越来越多的借阅者利用互联网获得信息&#xff0c;但图书借还信息量大。为了方便借阅者更好的获得本图书借还信息&#xff0c;因此&#xff0c;设计一种安全高效的“共享书角”图书借还管理系统极为重要。 为…

爬虫笔记24——纷玩岛自动抢票脚本笔记

纷玩岛自动抢票&#xff0c;协议抢票思路实现 一、获取Authorization凭证二、几个关键的参数三、几个关键的接口获取参数v&#xff0c;这个参数其实可以写死&#xff0c;可忽略通过价位获取演出的参数信息获取观演人信息&#xff0c;账号提前录入即可提交订单接口 先看实现图&a…

配置泛微e9后端开发环境

配置泛微e9的后端开发环境 1.安装jdk1.8&#xff08;请自行安装并设置环境变量&#xff09; 2.将服务器上的WEARVER文件夹拷贝到开发环境下(其中要包含ecology和Resin目录) 3.通过idea创建一个基础Java项目,将jdk设置为1.8 4.添加依赖,需要将3个文件夹的所有jar包添加到项目中…

52-基于单片机的超声波、温湿度、光照检测分阶段报警

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 1.通过DHT11模块读取环境温度和湿度: 2.将湿度、障碍物距显示在lcd1602上面&#xff0c;第一行显示温度和湿度,格式为:xxCyy%&#xff0c;第二行显示超声波传感器测得的距离&#xff0c;格式为:Di…

C++类的自动转换和强制类型转换

目录 一、类型转换 二、转换函数 一、类型转换 C⽀持内置类型隐式类型转换为类类型对象&#xff0c;需要有相关内置类型为参数的构造函数 简单说就是可以将内置类型转化为自定义类型 示例&#xff1a; class Test { public:Test(int n1 0):num1(n1){}void pr…

w~视觉~合集26

我自己的原文哦~ https://blog.51cto.com/whaosoft/12663170 #InternVL 本文设计了一个大规模的视觉-语言基础模型&#xff08;InternVL&#xff09;&#xff0c;将视觉基础模型的参数扩展到60亿&#xff0c;并逐步与LLM对齐&#xff0c;利用来自不同来源的网络规模的图像-文…

C++优选算法十六 BFS解决最短路问题

1.BFS解决最短路问题的优势与局限 BFS是一种有效的解决最短路问题的算法&#xff0c;特别适用于无权图或边权相等的图。 优势&#xff1a; BFS能够逐层遍历图中的所有节点&#xff0c;直到找到目标节点或遍历完所有可达节点。对于无权图&#xff08;即边权为1的图&#xff0…

服务器创建容器时报错: no main manifest attribute

1.出现问题的原因 springboot项目快速搭建完成以后&#xff0c;打包 > 制作容器 > 启动 在创建完成docker容器以后,启动时出现以下问题 查询了一下百度,说的是没有main文件信息, 2.解决方法 在pom文件里面加入以下代码即可 <plugins><plugin><groupI…

【小白学机器学习34】基础统计2种方法:用numpy的方法np().mean()等进行统计,pd.DataFrame.groupby() 分组统计

目录 1 用 numpy 快速求数组的各种统计量&#xff1a;mean, var, std 1.1 数据准备 1.2 直接用np的公式求解 1.3 注意问题 1.4 用print() 输出内容&#xff0c;显示效果 2 为了验证公式的背后的理解&#xff0c;下面是详细的展开公式的求法 2.1 均值mean的详细 2.2 方差…

无需插件,如何以二维码网址直抵3D互动新世界?

随着Web技术的飞速发展&#xff0c;一个无需额外插件&#xff0c;仅凭二维码或网址即可直接访问的三维互动时代已经悄然来临。这一变革&#xff0c;得益于WebGL技术与先进web3D引擎的完美融合&#xff0c;它们共同构建了51建模网这样一个既便捷又高效的在线三维互动平台&#x…