Canal笔记:安装与整合Springboot模式Mysql同步Redis

官方文档

https://github.com/alibaba/canal

使用场景

学习一件东西前,要知道为什么使用它。

1、同步mysql数据到redis

常规情况下,产生数据的方法可能有很多地方,那么就需要在多个地方中,都去做mysql数据同步到redis的处理,相对麻烦很多。
可以使用canal,对mysql进行集中,统一的处理。

概述

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

原理

MySQL主备复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relaylog 中事件,将数据变更反映它自己的数据

image.png

canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

架构

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)
  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

image.png

安装和准备

Centos7安装Canal

1、Mysql配置

开启binlog日志

如果是使用Linux安装的话,则直接找my.cnf,直接修改内容即可

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

docker安装

1、安装my.cnf文件

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2、修改docker-compose.yaml内容
配置挂载卷 前面路径为my.cnf的路径,/etc/mysql/conf.d的路径
image.png

3、查询是否成功

show variables like "%server_id%";

image.png

show variables like 'log_bin';

image.png

获取bin_log当前位置

show master status;

image.png
获取后,记录下来,然后不要动数据库了

创建canal数据库用户
这里可以使用
mysql -uroot -p登录进入设置,
或者直接可视化页面

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2、Canal下载

下载
方式一:关注公众号 I am Walker 回复canal

方式二:https://github.com/alibaba/canal/releases?page=2
image.png

# 创建文件夹
mkdir /opt/env/canal
# 解压
tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/env/canal
3、配置文件修改

进入canal/conf/example/instance.properties
主要修改下列相关参数

# 数据库
canal.instance.master.address=127.0.0.1:3306
# bin log日志
canal.instance.master.journal.name=mysql-bin.000001
# bin log写入位置
canal.instance.master.position=157#数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

之后进入 canal/bin 执行 ./startup.sh
查看是否启动
image.png
有CanalLauncher则代表ok,或者看日志也ok

场景

springboot整合

简单整合

1、依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>
2、配置文件
canal:# 服务地址serverAddress: localhost# 端口serverPort: 11111# 订阅 库 表subscrie: ".*\\..*"# batchSize: 100# 实例instance:- example

subscrie配置


全库全表	
connector.subscribe(".*\\..*")
指定库全表	
connector.subscribe("test\\..*")
单表
connector.subscribe("test.user")
多规则组合使用
connector.subscribe("test\\..*,test2.user1,test3.user2")
3、properties类
package com.walker.mybatisplus.canal;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.List;@Data
@Component
@ConfigurationProperties(value = "canal")
public class CanalProperties {private String serverAddress;private Integer serverPort;private String subcribe;private Integer batchSize;private List<String> instance;}
4、监听类编写
package com.walker.mybatisplus.canal;import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Slf4j
@Component
public class CanalListener {@Autowiredprivate CanalProperties canalProperties;public static Map<String,Integer> NUM_MAP=new HashMap<>();/*** 可能会有多业务,不同的业务,应该有多个处理类,不要使用if等*/@PostConstructpublic void run() throws InterruptedException, InvalidProtocolBufferException {//创建Canal连接对象CanalConnector conn = CanalConnectors.newSingleConnector(new InetSocketAddress(canalProperties.getServerAddress(),canalProperties.getServerPort()),canalProperties.getInstance().get(0),null, null);while(true){//连接conn.connect();
//            监听的数据库和表conn.subscribe(canalProperties.getSubcribe());//回滚操作conn.rollback();//获取信息Message message = conn.getWithoutAck(canalProperties.getBatchSize());long id = message.getId();List<CanalEntry.Entry> entries = message.getEntries();if(id!=-1&&entries.size()>0){//处理数据messageProcess(entries);}else{//防止重复链接数据库Thread.sleep(1000);}//确认消费信息conn.ack(id);//释放连接conn.disconnect();}}private void messageProcess(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entries) {log.info("接收Entry:{}", entry);CanalEntry.Header header = entry.getHeader();//数据库String schemaName = header.getSchemaName();//表名String tableName = header.getTableName();//事件类型CanalEntry.EventType eventType = header.getEventType();//这里可以对数据库和表进行一个重新判断 虽然在subscribe已经定义,但是一般可以配置一个库,然后表的可能可以是全部表
//            对库进行判断if(!"walker_share".equals(schemaName)){continue;}//对表进行判断//这里只是一个案例,如果是实际场景,可以使用工厂模式去编写,不然会有很多的ifif("dish".equals(tableName)){//获取修改数据List<CanalEntry.RowData> rowDataList = getRowDataList(entry);//新增if(eventType.getNumber()==CanalEntry.EventType.INSERT_VALUE){log.info("新增事件");if(CollUtil.isEmpty(rowDataList)) continue;//模拟场景:获取新增的数据,并存储到redis中,这里是直接存储到Map中for (CanalEntry.RowData rowData : rowDataList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {//获取name的类型if("type".equals(column.getName())){//模拟redis  根据类型进行分类String key = column.getValue();NUM_MAP.put(key,NUM_MAP.getOrDefault(key,0)+1);log.info("NUM_MAP {}",NUM_MAP);continue;}}}}if(eventType.getNumber()==CanalEntry.EventType.UPDATE_VALUE){log.info("修改事件");}if(eventType.getNumber()==CanalEntry.EventType.DELETE_VALUE){log.info("删除事件");}}}}//获取row数据private List<CanalEntry.RowData> getRowDataList(CanalEntry.Entry entry) {CanalEntry.RowChange rowChange=null;try {//解析数据rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);}List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();return rowDatasList;}
}

相关类和配置

CanalConnector

image.png

CanalEntry

image.png

EntryType

image.png

Header

image.png

EventType

事件类型,可以根据事件类型去做不一样的操作
image.png

RowChange

image.png

获取数据

CanalEntry.RowChange rowChange=null;
try {//解析数据rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
}
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
log.info("rowDatasList:{}",rowDatasList);
RowData

image.png

CanalEntry.Column

属性
image.png

问题

IOException: caching_sha2_password Auth failed

因为mysql8.0.3后身份检验方式为caching_sha2_password,但canal使用的是mysql_native_password,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed

参考文档

Java开发 - Canal的基本用法_canal java-CSDN博客
15分钟学会Canal安装与部署-CSDN博客
SpringBoot整合Canal1.1.6并同步数据到Redis(超详细和很多踩坑点)_canal同步数据到redis-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/210974.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2005-2021年地级市绿色发展注意力数据(根据政府报告文本词频统计)

2005-2021年地级市绿色发展注意力数据&#xff08;根据政府报告文本词频统计&#xff09; 1、时间&#xff1a;2005-2021年 2、指标&#xff1a;省、市、年份、一级指标、关键词、关键词词频、总词频 3、范围&#xff1a;270个地级市 4、来源&#xff1a;地级市政府工作报告…

深度学习TensorFlow2基础知识学习前半部分

目录 测试TensorFlow是否支持GPU&#xff1a; 自动求导&#xff1a; 数据预处理 之 统一数组维度 定义变量和常量 训练模型的时候设备变量的设置 生成随机数据 交叉熵损失CE和均方误差函数MSE 全连接Dense层 维度变换reshape 增加或减小维度 数组合并 广播机制&#…

CCKS2023-面向金融领域的主体事件检测-亚军方案分享

赛题分析 大赛地址 https://tianchi.aliyun.com/competition/entrance/532098/introduction?spma2c22.12281925.0.0.52b97137bpVnmh 任务描述 主体事件检测是语言文本分析和金融领域智能应用的重要任务之一&#xff0c;如在金融风控领域往往会对公司主体进行风险事件的检测…

杂散表的阅读

杂散表得阅读 —— 以Marki公司得手册为例 混频杂散&#xff08;Mixing Spurs&#xff09;是指信号经过混频器时&#xff0c;不仅会与本振混频&#xff0c;还会与本振的高次谐波混频&#xff08;对于第二章说的方波本振&#xff0c;信号只与本振的奇次谐波混频因为方波只含有奇…

VSC改造MD编辑器及图床方案分享

VSC改造MD编辑器及图床方案分享 用了那么多md编辑器&#xff0c;到头来还是觉得VSC最好用。这次就来分享一下我的blog文件编辑流吧。 这篇文章包括&#xff1a;VSC下md功能扩展插件推荐、图床方案、blog文章管理方案 VSC插件 Markdown All in One Markdown Image - 粘粘图片…

gitLab创建新项目

1.进入git2.选择创建项目3.勾选生成readme.md文件4.邀请成员

【MATLAB源码-第93期】基于matlab的白鲸优化算法(BWO)和鲸鱼优化算法(WOA)机器人栅格路径规划对比。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 白鲸优化算法&#xff08;BWO&#xff09; 白鲸优化算法是受到白鲸捕食和迁徙行为启发的一种算法。其主要特点和步骤包括&#xff1a; 1. 搜索食物&#xff08;全局搜索&#xff09;&#xff1a;算法模仿白鲸寻找食物的行为。…

流媒体音视频/安防视频云平台/可视化监控平台EasyCVR无法启动且打印panic报错,是什么原因?

国标GB视频监控管理平台/视频集中存储/云存储EasyCVR能在复杂的网络环境中&#xff0c;将分散的各类视频资源进行统一汇聚、整合、集中管理&#xff0c;实现视频资源的鉴权管理、按需调阅、全网分发、智能分析等。AI智能大数据视频分析EasyCVR平台已经广泛应用在工地、工厂、园…

分布式搜索引擎(Elastic Search)+消息队列(RabbitMQ)部署

一、分布式搜索引擎&#xff1a;Elastic Search Elastic Search的目标就是实现搜索。是一款非常强大的开源搜索引擎&#xff0c;可以帮助我们从海量数据中快速找到需要的内容。在数据量少的时候&#xff0c;我们可以通过索引去搜索关系型数据库中的数据&#xff0c;但是如果数…

【C++11】线程库/异常

一&#xff1a;线程库 1.1:线程库(thread) 1.1.1&#xff1a;为什么要有线程库 1.1.2&#xff1a;thread库中的成员函数 1.1.3&#xff1a;线程函数参数 1.2:互斥锁(mutex) 1.2.1&#xff1a;为什么要有互斥锁 1.2.2&#xff1a;C11中的互斥锁 1.3:原子操作(atomic) 1.4:条件变…

Apollo新版本Beta技术沙龙

有幸参加Apollo开发者社区于12月2日举办的Apollo新版本(8.0)的技术沙龙会&#xff0c;地址在首钢园百度Apollo Park。由于去的比较早&#xff0c;先参观了一下这面的一些产品&#xff0c;还有专门的讲解&#xff0c;主要讲了一下百度无人驾驶的发展历程和历代产品。我对下面几个…

Java画爱心

Java画爱心代码&#xff0c;每个人都可以被需要 效果图 源代码 package com.example.test; import java.awt.Color; import java.awt.Font; import java.awt.Graphics; import java.awt.Image; import java.awt.Toolkit; import javax.swing.JFrame; class Cardioid extend…

Java+Swing+Mysql实现超市管理系统

一、系统介绍 1.开发环境 操作系统&#xff1a;Win10 开发工具 &#xff1a;IDEA2018 JDK版本&#xff1a;jdk1.8 数据库&#xff1a;Mysql8.0 2.技术选型 JavaSwingMysql 3.功能模块 4.系统功能 1.系统登录登出 管理员可以登录、退出系统 2.商品信息管理 管理员可以对商品信息…

超完整的mysql安装配置方法(包含idea和navicat连接mysql,并实现建表)

mysql安装配置方法 1、下载mysql2、解压到指定的安装目录3、配置初始化文件my.ini4、配置用户变量和系统变量5、初始化mysql6、安装mysql服务并启动修改密码7、使用idea连接mysql8、使用Navicat可视化工具连接mysql&#xff0c;并实现新建数据库&#xff0c;新建表 1、下载mysq…

C语言-详解指针

目录 一.内存 1.内存的定义 2.内存的结构图 二.地址 1.什么是地址 2.什么是变量的地址 三.什么是指针 1.指针的定义 四.如何获取数据存储空间的地址 1.&运算符 五.指针变量 1.什么是指针变量&#xff08;一级指针变量&#xff09; 2.指针变量的定义 3…

<JavaEE> 什么是线程安全?产生线程不安全的原因和处理方式

目录 一、线程安全的概念 二、线程不安全经典示例 三、线程不安全的原因和处理方式 3.1 线程的随机调度和抢占式执行 3.2 修改共享数据 3.3 关键代码或指令不是“原子”的 3.4 内存可见性和指令重排序 四、Java标准库自带的线程安全类 一、线程安全的概念 线程安全是指…

Electron+Ts+Vue+Vite桌面应用系列:sqlite增删改查操作篇

文章目录 1️⃣ sqlite应用1.1 sqlite数据结构1.2 初始化数据库1.3 初始化实体类1.4 操作数据类1.5 页面调用 优质资源分享 作者&#xff1a;xcLeigh 文章地址&#xff1a;https://blog.csdn.net/weixin_43151418/article/details/134692751 ElectronTsVueVite桌面应用系列 &am…

nacos启动报错 java.lang.RuntimeException: [db-load-error]load jdbc.properties error

以standalone mode sh startup.sh -m standalone 为例子 启动nacos 报错&#xff1a; Caused by: org.springframework.boot.web.server.WebServerException: Unable to start embedded Tomcatat org.springframework.boot.web.embedded.tomcat.TomcatWebServer.initialize(To…

Linux中shell的运行原理

在Linux中&#xff0c;每次输入命令时&#xff0c;前面都会出现一串字母&#xff0c;我们称之为命令行提示符 实际上&#xff0c;命令行提示符是一种外壳程序 外壳程序的概念&#xff1a; 前面我们提到过&#xff0c;在Linux中&#xff0c;一切皆文件&#xff0c;所谓的命令就…

C语言 - 字符函数和字符串函数

系列文章目录 文章目录 系列文章目录前言1. 字符分类函数islower 是能够判断参数部分的 c 是否是⼩写字⺟的。 通过返回值来说明是否是⼩写字⺟&#xff0c;如果是⼩写字⺟就返回⾮0的整数&#xff0c;如果不是⼩写字⺟&#xff0c;则返回0。 2. 字符转换函数3. strlen的使⽤和…