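canal official QuickStart: https://github.com/alibaba/canal/wiki/QuickStart
Following the official steps is basically all you need.
Preparation
The server must have a JDK installed, because canal needs one to run. Also make sure canal's port is open on the server, otherwise clients will not be able to connect.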
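A quick way to tell whether the port is actually reachable from the client machine is a plain TCP connect. The sketch below is only an illustration: the class name `PortCheck` is made up for this example, and the default port 11111 is an assumption taken from the server log and client code later in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, dropped by a firewall, or timed out.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults; pass your own host and port as arguments.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 11111;
        System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 2000));
    }
}
```

If this prints `false` from the client machine while canal is running, the port is most likely still blocked on the server side.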
For a self-hosted MySQL, first enable binlog writing and set binlog-format to ROW mode.
Check whether binlog is already enabled:
SHOW VARIABLES LIKE 'log_bin';
If it is not, configure my.cnf as follows:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
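The settings canal depends on can be summarized as a small check. This is a sketch under assumptions: `BinlogCheck` is a hypothetical helper, and the map it receives stands in for the rows you would read from `SHOW VARIABLES` (`log_bin`, `binlog_format`, `server_id`); it does not talk to MySQL itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BinlogCheck {
    /** Returns a list of problems; an empty list means the settings look fine for canal. */
    static List<String> problems(Map<String, String> vars) {
        List<String> out = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            out.add("log_bin is not ON: binlog writing is disabled");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            out.add("binlog_format is not ROW");
        }
        String serverId = vars.get("server_id");
        if (serverId == null || serverId.equals("0")) {
            out.add("server_id is not set");
        }
        return out;
    }

    public static void main(String[] args) {
        // Hand-written sample values standing in for SHOW VARIABLES output.
        Map<String, String> vars = new HashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "MIXED");
        vars.put("server_id", "1");
        for (String problem : problems(vars)) {
            System.out.println(problem);
        }
    }
}
```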
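Grant the MySQL account that canal connects with the privileges of a MySQL slave. If the account already exists, you can run the GRANT directly.
Note: it is best to run these statements on the server itself. A backend user you created yourself and use to log in from a remote client, even one named root, may not have the privilege to grant, in which case the statements fail.
DROP USER IF EXISTS 'canal'@'%'; -- drop the user if it exists
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; -- create the user
-- The password is already set by CREATE USER; MySQL 8.0 rejects IDENTIFIED BY inside GRANT.
-- The official QuickStart grants only SELECT, REPLICATION SLAVE, REPLICATION CLIENT.
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;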
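Startup
Download canal: go to the release page and pick the package you need. I downloaded the latest one:
https://github.com/alibaba/canal/releases/tag/canal-1.1.6
Extract
Upload the package to a location of your choice on the server and extract it into the target directory:
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
After extraction, go into the /tmp/canal directory; you should see the following structure: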
drwxr-xr-x 2 jianghang jianghang 136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang 160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang 48 2013-02-05 21:29 logs
Configuration changes
vi conf/example/instance.properties
canal.instance.connectionCharset is the database's character encoding expressed as a Java charset name, for example UTF-8, GBK, or ISO-8859-1.
If the machine has only one CPU, set canal.instance.parser.parallel to false.
## mysql serverId
# commented out by default
canal.instance.mysql.slaveId = 1234
# position info; change to your own database
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password; change to your own database
# canal/canal is the default username and password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex: the tables to watch; the default watches all tables
canal.instance.filter.regex = .*\\..*
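Two details of this file are easy to get wrong, and the sketch below illustrates both with plain `java.util.Properties`. It assumes the file is parsed with standard Java properties semantics: a `#` only starts a comment at the beginning of a line, so text after a value becomes part of the value, and a doubled backslash in the file is read as a single one. The `matches` helper is a simplified stand-in for canal's table filter (comma-separated regexes tested against `schema.table`), not canal's actual implementation, and the class name is made up.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

class InstancePropertiesDemo {
    /** Parses properties text the way java.util.Properties does. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** True if "schema.table" fully matches any of the comma-separated regexes. */
    static boolean matches(String filter, String schemaDotTable) {
        for (String regex : filter.split(",")) {
            if (Pattern.matches(regex.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "# a comment on its own line is ignored",
                "canal.instance.dbUsername = canal # trailing text is NOT a comment",
                "canal.instance.filter.regex = .*\\\\..*");
        Properties props = parse(text);
        // The username now contains the trailing text as well.
        System.out.println("[" + props.getProperty("canal.instance.dbUsername") + "]");
        // The doubled backslash in the file is loaded as a single backslash.
        String filter = props.getProperty("canal.instance.filter.regex");
        System.out.println(filter + " matches test.test1: " + matches(filter, "test.test1"));
    }
}
```

For this reason it is safer to keep every comment in instance.properties on its own line.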
Start
sh bin/startup.sh
Check the server log
vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
Check the instance log
vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
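Instead of reading both logs by eye, the same check can be scripted. This is a minimal sketch: the marker strings are copied from the sample log lines above and may differ in other canal versions, the `/tmp/canal` base directory is the one used earlier in this post, and `StartupLogCheck` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

class StartupLogCheck {
    // Marker text taken from the sample logs in this post (assumption for other versions).
    static final String SERVER_MARKER = "the canal server is running now";
    static final String INSTANCE_MARKER = "start successful";

    /** True if any line contains the marker text. */
    static boolean containsMarker(List<String> lines, String marker) {
        for (String line : lines) {
            if (line.contains(marker)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        Path base = Paths.get(args.length > 0 ? args[0] : "/tmp/canal");
        List<String> server = Files.readAllLines(base.resolve("logs/canal/canal.log"));
        List<String> instance = Files.readAllLines(base.resolve("logs/example/example.log"));
        System.out.println("server started:   " + containsMarker(server, SERVER_MARKER));
        System.out.println("instance started: " + containsMarker(instance, INSTANCE_MARKER));
    }
}
```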
DEMO
Dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.73</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.3.10</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- MySQL driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.11</version>
    </dependency>
    <!-- data source -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.9</version>
    </dependency>
    <!-- canal -->
    <dependency>
        <groupId>top.javatool</groupId>
        <artifactId>canal-spring-boot-starter</artifactId>
        <version>1.2.1-RELEASE</version>
    </dependency>
</dependencies>
Code
Configuration file
server:
  port: 8081
spring:
  datasource:
    url: jdbc:mysql://42.192.46.136:3306/test?useUnicode=true&useSSL=false&characterEncoding=utf-8
    username: root
    password: lkzroot
    type: com.alibaba.druid.pool.DruidDataSource
  redis:
    password: 123456
    host: 42.192.43.136
    port: 16379
    lettuce:
      pool:
        max-active: 200
        max-idle: 10
        max-wait: -1ms
        min-idle: 3
Test class
package com.boot.canal.client;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @Author: lkz
 * @Title: RedisCanalClientExample
 * @Description: TODO
 * @Date: 2023/9/18 14:06
 */
public class RedisCanalClientExample {

    public static final Integer _60SECONDS = 60;
    public static final String Canal_host_ip = "127.0.0.1";

    // INSERT: store the row as JSON, keyed by the value of the first column
    private static void redisInsert(List<CanalEntry.Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // DELETE: remove the key built from the first column of the row before deletion
    private static void redisDelete(List<CanalEntry.Column> columns) {
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.del(columns.get(0).getValue());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // UPDATE: overwrite the stored JSON with the row after the update
    private static void redisUpdate(List<CanalEntry.Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
                System.out.println("---------update after: " + jedis.get(columns.get(0).getValue()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            // skip transaction begin/end markers
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                // parse the changed row data
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:" + entry.toString(), e);
            }

            // get the type of change
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {
                    // EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("--------- initCanal() main method -----------");

        // create the connection to the canal server
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(Canal_host_ip, 11111), "example", "", "");
        int batchSize = 1000;
        // counts consecutive polls that returned no data
        int emptyCount = 0;
        System.out.println("---------------------canal init OK, start listening for MySQL changes------");
        try {
            connector.connect();
            //connector.subscribe(".*\\..*");
            connector.subscribe("test.test1");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("canal is listening, polling once per second: " + UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    // reset the counter
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on failure, roll back the batch
            }
            System.out.println("Listened for " + totalEmptyCount + " seconds without any message; please restart and try again......");
        } finally {
            connector.disconnect();
        }
    }
}
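The core idea of the client is the mapping from a row change to a cache operation: the first column's value is the key, INSERT and UPDATE store the row as JSON, and DELETE removes the key. The sketch below isolates that mapping so it can be run without canal or Redis. Everything in it is a stand-in: a `Map` replaces Redis, a `LinkedHashMap` of column name to value replaces `CanalEntry.Column`, the local `EventType` enum replaces canal's, and `toJson` is a minimal encoder that only escapes backslashes and double quotes.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class RowChangeToCache {
    enum EventType { INSERT, UPDATE, DELETE }

    /** Escapes backslashes and double quotes; enough for this illustration only. */
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Encodes the row as a JSON object with string values, in column order. */
    static String toJson(Map<String, String> row) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, String> e : row.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
        }
        return sb.append('}').toString();
    }

    /** Applies one row change to the cache; the first column's value is the key. */
    static void apply(Map<String, String> cache, EventType type, Map<String, String> row) {
        if (row.isEmpty()) {
            return;
        }
        String key = row.values().iterator().next();
        if (type == EventType.DELETE) {
            cache.remove(key);
        } else {
            cache.put(key, toJson(row)); // INSERT and UPDATE both overwrite
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "1");
        row.put("name", "alice");
        apply(cache, EventType.INSERT, row);
        System.out.println(cache);
        apply(cache, EventType.DELETE, row);
        System.out.println(cache);
    }
}
```

Note that this only works as intended when the first column of the table is a unique identifier such as the primary key, which is what the client above implicitly relies on.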
Redis configuration
package com.boot.canal.client;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtils {

    public static final String REDIS_IP_ADDR = "127.0.0.1";
    public static final String REDIS_pwd = "123456";
    public static JedisPool jedisPool;

    // build the shared connection pool once, when the class is loaded
    static {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setMaxIdle(10);
        jedisPool = new JedisPool(jedisPoolConfig, REDIS_IP_ADDR, 16379, 10000, REDIS_pwd);
    }

    // borrow a connection from the pool; the caller is responsible for closing it
    public static Jedis getJedis() throws Exception {
        if (null != jedisPool) {
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool is not ok");
    }
}