1.什么是canal?
canal 是阿里开源的一款 MySQL 数据库增量日志解析工具,提供增量数据订阅和消费。
工作原理
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志(binary log), 日志中的记录叫做二进制日志事件(binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映到它自己的数据
Canal 工作原理
- Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 Canal )
- Canal 解析 binary log 对象(原始为 byte 流)
2.环境搭建
搭建mysql
version: '3'
services:mysql:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/mysql:5.7container_name: mysql_3306restart: unless-stoppedvolumes:- "./mysql/my.cnf:/etc/mysql/my.cnf"- "./mysql/init-file.sql:/etc/mysql/init-file.sql"- "./mysql/data:/var/lib/mysql"
# - "./mysql/conf.d:/etc/mysql/conf.d"- "./mysql/log/mysql/error.log:/var/log/mysql/error.log"- "./mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d" # init sql script directory -- tips: it can be excute when `/var/lib/mysql` is emptyenvironment: # set environment,equals docker run -eTZ: Asia/ShanghaiLANG: en_US.UTF-8MYSQL_ROOT_PASSWORD: root # set root passwordMYSQL_DATABASE: root # init database nameports: # port mappping- "3306:3306"
注意my.cnf在windows系统必须为只读,否则忽略
docker-compose -f docker-compose.yml -p mysql57 up -d
查看日志是否开启
show variables like 'log_%';
如果启用了,这个查询将返回log_bin的值为ON。
搭建canal-server
下载我canal客户端,官方地址进行相应版本的安装包进行下载(注意:如果下载翻到本文最下面联系我): https://github.com/alibaba/canal/releases
修改配置文件
example/instance.properties
instance.properties 基本上只用改动 dbUsername 和 dbPassword就可以了
启动
3.代码工程
实验目的
实现canal读取mysql的变更数据
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>canal</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
</project>
client
package com.et.canal.client;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.context.CanalContext;
import top.javatool.canal.client.handler.EntryHandler;
import top.javatool.canal.client.model.CanalModel;import java.util.Map;@Slf4j
@Component
@CanalTable(value = "all")
public class AllConsumer implements EntryHandler<Map<String, String>> {@Overridepublic void insert(Map<String, String> map) {log.info("add,{}", map);}@Overridepublic void update(Map<String, String> before, Map<String, String> after) {// CanalModelCanalModel canal = CanalContext.getModel();log.info("update,update before={},update after={}", before, after);}@Overridepublic void delete(Map<String, String> map) {log.info("delete,{}", map);}
}
application.yaml
canal:mode: simplefilter: # 过滤表名,可以为空batch-size: 1timeout: 1server: 127.0.0.1:11111destination: example # canal-server中定义的实例名,不可以为空user-name: rootpassword: rootasync: true # 必须是true,设为false在启动时会报MessageHandler bean找不到,具体原因没看
以上只是一些关键代码,所有代码请参见下面代码仓库
代码仓库
- https://github.com/Harries/springboot-demo(canal)
4.测试
- 启动springboot应用
- 更新数据库数据
- 观察控制台变化
5.引用
- https://github.com/alibaba/canal
- Spring Boot集成canal快速入门demo | Harries Blog™