Canal介绍原理及安装

Canal 扩展篇

1.Canal介绍、

链接: https://github.com/alibaba/canal

Canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,工作原理如下:
Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 Canal )
Canal 解析 binary log 对象(原始为 byte 流)
可以用于以下业务场景:
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 Canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

2.原理

在这里插入图片描述

安装Canal

1. 新建文件夹logs,新建文件canal.properties instance.properties docker.compose.yml

如下图
在这里插入图片描述

  • 新建instance.properties,其中要修改两个地方,如下
    在这里插入图片描述
    在这里插入图片描述
    下面的文件是改过后的要修改成自己的数据库、用户和密码
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false
canal.instance.gtidon=false# position info
canal.instance.master.address=192.168.11.41:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=20041010
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
  • 新建 docker-compose.yml,这个不需要改
version: '3'
services:canal:container_name: canal_latestimage: canal/canal-server:v1.1.5restart: alwaysports:- 11111:11111volumes:                                - ./canal.properties:/home/admin/canal-server/conf/canal.properties- ./instance.properties:/home/admin/canal-server/conf/example/instance.properties- ./logs:/home/admin/canal-server/logs
  • 最后新建 canal.properties,这个也需要修改两个地方
    在这里插入图片描述
    在这里插入图片描述
#################################################
#########               common argument         #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation = false# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360#################################################
#########               destinations            #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = falsecanal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
#########             MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = localcanal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host = 192.168.11.81:5672
rabbitmq.virtual.host = /
rabbitmq.exchange = canal_exchange
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.deliveryMode = 2##################################################
#########                     Pulsar         #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =

2.然后就可以启动容器了

docker-compose up -d

在这里插入图片描述

3.最后查看结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/304430.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis中的集群(五)

集群 在集群中执行命令 MOVED错误。 当节点发现键所在的槽并非由自己负责处理的时候&#xff0c;节点就会向客户端返回一个MOVED错误&#xff0c;指引客户端转向至正在负责槽的节点&#xff0c;MOVED错误的格式为: MOVED <slot> <ip>:<port>其中slot为键…

深度解读C++17中的std::string_view:解锁字符串处理的新境界

深入研究C17中的std::string_view&#xff1a;解锁字符串处理的新境界 一、简介二、std::string_view的基础知识2.1、构造函数2.2、成员函数 三、std::string_view为什么性能高&#xff1f;四、std::string_view的使用陷阱五、std::string_view源码解析六、总结 一、简介 C中有…

【开源社区】openEuler、openGauss、openHiTLS、MindSpore

【开源社区】openEuler、openGauss、openHiTLS、MindSpore 写在最前面开源社区参与和贡献的一般方式开源技术的需求和贡献方向 openEuler 社区&#xff1a;开源系统官方网站官方介绍贡献攻略开源技术需求 openGauss 社区&#xff1a;开源数据库官方网站官方介绍贡献攻略开源技术…

数字乡村:科技引领新时代农村发展

随着信息技术的迅猛发展和数字化浪潮的推进&#xff0c;数字乡村作为新时代农村发展的重要战略&#xff0c;正日益成为引领农村现代化的强大引擎。数字乡村不仅代表着农村信息化建设的新高度&#xff0c;更是农村经济社会发展的重要支撑。通过数字技术的深入应用&#xff0c;农…

vue2创建项目的两种方式,配置路由vue-router,引入element-ui

提示&#xff1a;vue2依赖node版本8.0以上 文章目录 前言一、创建项目基于vue-cli二、创建项目基于vue/cli三、对吧两种创建方式四、安装Element ui并引入五、配置路由跳转四、效果五、参考文档总结 前言 使用vue/cli脚手架vue create创建 使用vue-cli脚手架vue init webpack创…

✌2024/4/6—力扣—最长公共前缀✌

代码实现&#xff1a; char *longestCommonPrefix(char **strs, int strsSize) {if (strsSize 0) {return "";}for (int i 0; i < strlen(strs[0]); i) { // 列for (int j 1; j < strsSize; j) { // 行if (strs[0][i] ! strs[j][i]) { // 如果比较字符串的第…

Java特性之设计模式【外观模式】

一、外观模式 概述 外观模式&#xff08;Facade Pattern&#xff09;隐藏系统的复杂性&#xff0c;并向客户端提供了一个客户端可以访问系统的接口。这种类型的设计模式属于结构型模式&#xff0c;它向现有的系统添加一个接口&#xff0c;来隐藏系统的复杂性 这种模式涉及到一…

uniapp中页面滚动锚点位置及滚动到对应高度显示对应按钮

可以把页面代码和组件代码放自己项目里跑一下 页面代码 <template><view class"Tracing-detail"><view class"title" v-for"i in 30">顶部信息</view><!-- tab按钮 --><Tab v-model"activeIndex" …

云手机解决海外社媒运营的诸多挑战

随着海外社交媒体运营的兴起&#xff0c;如何有效管理多个账户成为了一项挑战。云手机作为一种新兴的解决方案&#xff0c;为海外社媒运营带来了前所未有的便利。 云手机的基本原理是基于云计算和虚拟化技术&#xff0c;允许用户在物理手机之外创建和使用多个虚拟手机。这种创新…

Node.js 的 5 个常见服务器漏洞

Node.js 是一个强大且广泛使用的 JavaScript 运行时环境&#xff0c;用于构建服务器端应用程序。然而&#xff0c;与任何其他软件一样&#xff0c;Node.js 也有自己的一些漏洞&#xff0c;如果处理不当&#xff0c;可能会导致安全问题。请注意&#xff0c;这些漏洞并不是 Node.…

嵌入式面向对象学习 RT-Thread I/O 设备管理框架 设备驱动层 案例测试

嵌入式面向对象 RT-Thread I/O 设备管理框架 设备驱动层 注&#xff1a;本文介绍性内容转载于《RT-Thread记录&#xff08;十、全面认识 RT-Thread I/O 设备模型&#xff09;》 注&#xff1a; 本次使用的开发板 &#xff1a; ​ 兆易创新GD32F407VET6开发板 ​ 雅特力科技…

nginx配置证书和私钥进行SSL通信验证

文章目录 一、背景1.1 秘钥和证书是两个东西吗&#xff1f;1.2 介绍下nginx配置文件中参数ssl_certificate和ssl_certificate_key1.3介绍下nginx支持的证书类型1.4 目前nginx支持哪种证书格式&#xff1f;1.5 nginx修改配置文件目前方式也会有所不同1.6 介绍下不通格式的证书哪…

微服务-4 Nacos

目录 一、注册中心 二、配置管理 1. 添加配置 2. 配置自动刷新 3. 多环境配置共享​编辑 一、注册中心 服务列表&#xff1a; 服务详情&#xff1a; 二、配置管理 1. 添加配置 (1). 在 nacos 界面中添加配置文件&#xff1a; 配置列表&#xff1a; 配置详情&#xff1a;…

Qt之QSS样式表

QSS简介 QSS&#xff08;Qt Style Sheet&#xff09;样式表是一种用于描述图形用户界面&#xff08;GUI&#xff09;样式的语言。它允许开发者为应用程序的控件定义视觉外观&#xff0c;例如颜色、字体、尺寸和布局等。 QSS 样式表的主要目的是提供一种简洁而灵活的方式来美化…

【优选算法专栏】专题四:前缀和(一)

本专栏内容为&#xff1a;算法学习专栏&#xff0c;分为优选算法专栏&#xff0c;贪心算法专栏&#xff0c;动态规划专栏以及递归&#xff0c;搜索与回溯算法专栏四部分。 通过本专栏的深入学习&#xff0c;你可以了解并掌握算法。 &#x1f493;博主csdn个人主页&#xff1a;小…

DRF 常用功能

文章目录 一、主流认证方式Session认证Token认证JWT认证 二、DRF认证与权限Session认证所有视图&#xff08;全局&#xff09;启用认证视图级别启用认证 Token认证[推荐]安装APP启用Token认证生成数据库表(因为token要存储到数据库&#xff09;配置Token认证接口URL获取token使…

迭代器模式【行为模式C++】

1.简介 迭代器模式是一种行为设计模式&#xff0c; 让你能在不暴露集合&#xff08;聚合对象&#xff09;底层表现形式 &#xff08;列表、 栈和树等&#xff09; 的情况下遍历集合&#xff08;聚合对象&#xff09;中所有的元素。 迭代器的意义就是将这个行为抽离封装起来&a…

STM32F4 IAP跳转APP问题及STM32基于Ymodem协议IAP升级笔记

STM32F4 IAP 跳转 APP问题 ST官网IAP例程Chapter1 STM32F4 IAP 跳转 APP问题1. 概念2. 程序2.1 Bootloader 程序 问题现象2.2. APP程序 3. 代码4. 其他问题 Chapter2 STM32-IAP基本原理及应用 | ICP、IAP程序下载流程 | 程序执行流程 | 配置IAP到STM32F4xxxChapter3 STM32基于Y…

SQLite数据库在Linux系统上的使用

SQLite是一个轻量级的数据库解决方案&#xff0c;它是一个嵌入式的数据库管理系统。SQLite的特点是无需独立的服务器进程&#xff0c;可以直接嵌入到使用它的应用程序中。由于其配置简单、支持跨平台、服务器零管理&#xff0c;以及不需要复杂的设置和操作&#xff0c;SQLite非…

python如何输入多行

Python中的Input()函数在输入时&#xff0c;遇到回车符&#xff0c;那么一次输入就结束了。这不能满足输入多行文本并且行数也不确定的情形&#xff0c;当然输入空行也是允许的。 方法1&#xff1a;利用异常处理机制实现 lines[] while True:try:lines.append(input())except:…