canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据 订阅 和 消费。应该是阿里云DTS(Data Transfer Service)的开源版本。
Canal与DTS提供的功能基本相似:
基于Mysql的Slave协议实时dump binlog流,解析为事件发送给订阅方。
单Canal instance,单DTS数据订阅通道均只支持订阅一个RDS,提供给一个消费者。
可以使用canal-client客户端进行消息消费。
也可以通过简单配置,也可以不需要自行使用canal-client消费,可以选择直接投递到kafka或者RocketMQ集群,用户只需要使用消息队列的consumer消费即可。
成功消费消息后需要进行Ack,以确保一致性,服务端则会维护客户端目前的消费位点。
Canal是怎么实现的?
MySQL的主从复制分成三步:
master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重做中继日志中的事件,将改变反映它自己的数据。
canal 就是模拟了MySQL主从复制这个过程:
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
canal 1.1.4开始支持admin管理,通过canal-admin为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作,替代了过去繁琐的配置文件管理。
多个canal-server可以组成集群模式,每个instance任务通过zookeeper在集群中实现高可用
通过多个集群,可以实现同步资源的物理隔离
可以直接抓取消费投递MQ,可以实现生产/消费解耦、消息堆积、消息回溯
可以抓取消费投递给canal-client,在用户的服务中进行消息处理,减少中间过程
Canal-server架构
server 代表一个 canal 运行实例,对应于一个 jvm进程
instance 对应于一个数据队列 (1个 canal server 对应 1…n 个 instance )
instance 下的子模块 :
eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
eventStore: 数据存储
metaManager: 增量订阅 & 消费信息管理器
EventParser子模块
主要有两个核心组件组成:
CanalLogPositionManager:用来记录最新解析成功的binlog position信息,在canal重启后,作为起始位点
CanalHAController:支持Mysql主备,基于Heartbeat判断当前数据库连接的有效性,一旦主库失去心跳,就切换连接备库
EventParser从CanalHAController确定连接mysql的位置,然后通过LogPositionManager确定binlog解析位点的起点,最后便通过dump协议拉取binlog进行解析,把解析后的消息存入EventSink.
EventSink子模块
目前只提供了一个带有实际作用的实现:GroupEventSink
GroupEventSink用于将多个instance上的数据进行归并,常用于分库后的多数据源归并。
EventStore子模块
目前只实现了基于内存存储的MemoryEventStoreWIthBuffer
MemoryEventStoreWIthBuffer内部采用的是一个RingBuffer,我们可以理解为基于内存的高性能消息队列。如果使用canal-client直接消费canal-server的数据,那么只能通过这个消息队列做一定程度的消息堆积。
- Put : Sink模块进行数据存储的最后一次写入位置
- Get : 数据订阅获取的最后一次提取位置
- Ack : 数据消费成功的最后一次消费位置
这些位点信息通过MetaManager进行管理
源码概览
deployer模块:独立部署模块,用于canal-server的独立启动,包括本地配置解析、拉取远程配置、启动canal-server。
server模块:canal-server的实现逻辑,一个canal-server一般是一个jvm进程。重点关注两种canal-server的实现方式,内嵌型的canalServerEmbed和独立使用的canalServerWithNetty。新版本中新增了直接对接mq的canal-server实现。
instance模块:具体实时订阅任务是由一个个instance组成的,每个canal-server中可以同时运行多个instance。instance由parser、sink、store三个重点模块组成。
parser模块:数据源接入,模拟slave协议和master进行交互,协议解析。parser模块依赖于dbsync、driver模块。
sink模块:将parser抓取到的数据,进行过滤,加工,然后发送到store模块进行存储。核心接口为CanalEventSink。
store模块:数据存储模块,类似内存模式到消息队列,本质上是一个RingBuffer。核心接口为CanalEventStore。
meta模块:增量订阅&消费信息管理器,核心接口为CanalMetaManager,主要用于记录canal消费到的mysql binlog的位置
client模块:项目最早的消费客户端,通过将client模块引入自己的项目中,然后直接消费canal-server获取的数据。
client-adapter模块:1.1.x后新出的模块,可以独立部署为canal-server的消费服务端,是一个springboot项目。通过SPI机制,能够加载不同plugins,将消费信息投递到ES\hbase\rdb等下游。
admin模块:1.1.x新出的模块,可以独立部署为canal-server的控制台,配置canal-server、instance相关配置,非常好用。
模块关联
server模块是服务端核心模块,用来拉取binlog的实时变更,然后投递到客户端。
server可以通过配置,选择投递到MQ,或者是启动一个netty,让客户端来拉取。
client-adapter就是一个独立部署到服务,可以直接拉取canal-server的消息(或者拉取mq的消息),转发到对应RDS/Redis/HBase,当然,你也可以自己实现一个转发到redis的adapter
admin模块是管理控制台,可以调度canal-server组成一个个集群实现instance的高可用、可以更改server、instance的配置信息。
Canal-server模块局部关系,包括deployer模块、server模块、instance模块、parser模块、sink模块、store模块、meta模块、client模块。
deployer模块是一个启动模块,可以启动canal-server。
一个server是一个独立应用,是一个jvm进程,里面可以有多个instance对象。
instance内包括了parser、sink、store、meta
parser负责获取binlog变更,然后sink将parser获取的binlog变更转换为event,存入store。
meta是元信息管理器
client模块可以内嵌入你的应用,用来消费canal-server的消息事件。
实践
集群管理
管理canal-server集群,核心配置:
#canal-admin
canal.admin.manager = 172.16.66.181:18200
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
canal.zkServers = 172.16.86.180:2181,172.16.66.181:2181,172.16.66.182:2181
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
#MQ Properties
rocketmq.producer.group = canal_default_producer_group
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = rmq-cn-4xl3ctfi70a-vpc.cn-shenzhen.rmq.aliyuncs.com:8080
rocketmq.retry.times.when.send.failed = 3
Server管理
Instance管理
# position info
canal.instance.master.address=rm-wz90f49ny39o6je81.mysql.rds.aliyuncs.com:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal2023
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex= frxs_erp_basedata\\.wproducts
# mq config
canal.mq.topic=tp_default_product_change
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=tp_product_change:frxs_erp_basedata\\.wproducts