springboot+canal+mysql+redis缓存双写一致性

canal官网地址:https://github.com/alibaba/canal/wiki/QuickStart
基本上按照官网的步骤来就行
在这里插入图片描述

准备

首先服务器上要安装好jdk,因为canal运行需要jdk,同时把canal对应的端口在服务中开放,否则连接不上

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

查看是否开启binlog日志功能

SHOW VARIABLES LIKE 'log_bin';

如果为OFF说明没有开启

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

注意点,这里执行命令最好是在服务器端执行,因为我们通过连接客户端登录的自己创建的后台用户比如root ,没有授权权限,会出错
出现以下情况
在这里插入图片描述

DROP USER IF EXISTS 'canal'@'%'; #删除用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';   ## 创建用户
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';  
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;

启动

下载 canal, 访问 release 页面 , 选择需要的包下载, 我这里下载最新的

https://github.com/alibaba/canal/releases/tag/canal-1.1.6

解压缩

上传服务器选择一个位置,并解压指定的文件夹下面

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

解压之后

解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs

配置修改

vi conf/example/instance.properties
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

## mysql serverId
canal.instance.mysql.slaveId = 1234 #默认是注释的,
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  #canal默认用户名
canal.instance.dbPassword = canal  #canal默认密码 
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*  #监听的表,默认监听所有表

启动

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

在这里插入图片描述

查看 instance 的日志

vi logs/example/example.log

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

DEMO

依赖

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.3.10</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- 导入mysql驱动包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.11</version></dependency><!-- 导入数据源依赖--><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.9</version></dependency><!--canal--><dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency></dependencies>

代码

配置文件
server:port: 8081
spring:datasource:url: jdbc:mysql://42.192.46.136:3306/test?useUnicode=true&useSSL=false&characterEncoding=utf-8username: rootpassword: lkzroottype: com.alibaba.druid.pool.DruidDataSourceredis:password: 123456host: 42.192.43.136port: 16379lettuce:pool:max-active: 200max-idle: 10max-wait: -1msmin-idle: 3
测试类
package com.boot.canal.client;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @Author: lkz* @Title: RedisCanalClientExample* @Description: TODO* @Date: 2023/9/18 14:06*/public class RedisCanalClientExample {public static final Integer _60SECONDS = 60;public static final String  Canal_host_ip = "127.0.0.1";private static void redisInsert(List<CanalEntry.Column> columns){JSONObject jsonObject = new JSONObject();for (CanalEntry.Column column : columns){System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());}catch (Exception e){e.printStackTrace();}}}private static void redisDelete(List<CanalEntry.Column> columns){JSONObject jsonObject = new JSONObject();for (CanalEntry.Column column : columns){jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.del(columns.get(0).getValue());}catch (Exception e){e.printStackTrace();}}}private static void redisUpdate(List<CanalEntry.Column> columns){JSONObject jsonObject = new JSONObject();for (CanalEntry.Column column : columns){System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));}catch (Exception e){e.printStackTrace();}}}public static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {//获取变更的row数据rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);}//获取变动类型CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.INSERT) {redisInsert(rowData.getAfterColumnsList());} else if (eventType == CanalEntry.EventType.DELETE) {redisDelete(rowData.getBeforeColumnsList());} else {//EventType.UPDATEredisUpdate(rowData.getAfterColumnsList());}}}}public static void main(String[] args){System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");//=================================// 创建链接canal服务端CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(Canal_host_ip, 11111),"example","","");int batchSize = 1000;//空闲空转计数器int emptyCount = 0;System.out.println("---------------------canal init OK,开始监听mysql变化------");try {connector.connect();//connector.subscribe(".*\\..*");connector.subscribe("test.test1");connector.rollback();int totalEmptyCount = 10 * _60SECONDS;while (emptyCount < totalEmptyCount) {System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }} else {//计数器重新置零emptyCount = 0;printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");} finally {connector.disconnect();}}}
redis配置
package com.boot.canal.client;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;public class RedisUtils
{public static final String  REDIS_IP_ADDR = "127.0.0.1";public static final String  REDIS_pwd = "123456";public static JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,16379,10000,REDIS_pwd);}public static Jedis getJedis() throws Exception {if(null!=jedisPool){return jedisPool.getResource();}throw new Exception("Jedispool is not ok");}}

结果

修改表数据

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/141108.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT-day5

1、添加注册功能到数据库 头文件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QMessageBox> //消息对话框类头文件 #include <QDebug> #include <QPushButton> #include <QSqlDatabase> //数据库管理类 #include…

ChatGPT详细搭建教程+支持AI绘画

一、AI创作系统 SparkAi系统是基于很火的GPT提问进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作ChatGPT系统&#xff1f;小编这里写一个详细图文教程吧&#x…

SpringBoot 之配置加密

Jasypt库的使用 Jasypt是一个Java简易加密库&#xff0c;用于加密配置文件中的敏感信息&#xff0c;如数据库密码。 Jasypt库与springboot集成&#xff0c;在实际开发中非常方便。 1、引入依赖 <dependency><groupId>com.github.ulisesbocchio</groupId>&…

构建工具Webpack简介

一、构建工具 当我们习惯了Node中使用ES模块化编写代码以后&#xff0c;用原生的HTML、CSS、JS这些东西会感觉到各种不便。比如&#xff1a;不能放心的使用模块化规范&#xff08;浏览器兼容性问题&#xff09;、即使可以使用模块化规范也会面临模块过多时的加载问题。 这时候…

当当网商品详情数据接口

当当网商品详情数据接口可以通过当当网的开放平台获取相关信息。您可以注册当当开放平台账号&#xff0c;并按照要求提交申请获取API接口的调用凭证。获得授权后&#xff0c;您将会收到一组AccessKey和SecretKey。使用编程语言&#xff08;如Java&#xff09;调用API接口&#…

《计算机网络》——应用层

2.1 应用层协议原理&#xff08;P54&#xff09; 研发网络应用的核心是写出能够运行在不同端系统和通过网络彼此交流的程序。 2.1.1 网络应用程序体系结构 两种主流的应用体系结构&#xff1a;客户-服务器体系结构、对等体系结构。 客户-服务器体系&#xff1a;服务器是一个…

适合新手自学的网络安全基础技能“蓝宝书”:《CTF那些事儿》

CTF比赛是快速提升网络安全实战技能的重要途径&#xff0c;已成为各个行业选拔网络安全人才的通用方法。但是&#xff0c;本书作者在从事CTF培训的过程中&#xff0c;发现存在几个突出的问题&#xff1a; 1&#xff09;线下CTF比赛培训中存在严重的“最后一公里”问题&#xf…

【项目实战】Linux系统下jar包自启动

什么是jar包自启动 在Linux系统中&#xff0c;"jar包自启动"是指通过配置将Java程序打包成可执行的Jar文件&#xff0c;并设置其在系统启动时自动运行。以下是与jar包自启动相关的一些概念&#xff1a; Jar文件&#xff1a;Jar&#xff08;Java Archive&#xff09…

51单片机光照强度检测自动路灯开关仿真( proteus仿真+程序+报告+讲解视频)

51单片机光照强度检测自动路灯开关仿真( proteus仿真程序报告讲解视频&#xff09; 仿真图proteus7.8及以上 程序编译器&#xff1a;keil 4/keil 5 编程语言&#xff1a;C语言 设计编号&#xff1a;S0052 讲解视频 基于51单片机的光照检测自动路灯控制仿真设计( proteus仿…

Day56:组件库封装-TypeScript入门

配置 安装tsc工具进行编译 npm i typescript -g 查看版本号&#xff1a;tsc -v 编译ts代码-需要使用tsc编译之后才能运行&#xff0c;TS为JS的衍生&#xff0c;浏览器不能直接识别TS语法&#xff1a;tsc xxx.ts 运行ts代码&#xff1a;node xxx.js 或者直接运行ts代码——t…

【从0学习Solidity】52. EIP712 类型化数据签名

【从0学习Solidity】 52. EIP712 类型化数据签名 博主简介&#xff1a;不写代码没饭吃&#xff0c;一名全栈领域的创作者&#xff0c;专注于研究互联网产品的解决方案和技术。熟悉云原生、微服务架构&#xff0c;分享一些项目实战经验以及前沿技术的见解。关注我们的主页&#…

ChatGPT的问世给哪些行业带来了冲击?

目录 引言Chat GPT 对行业的影响在线客服和智能客服行业传统自动回复机器人的局限性Chat GPT 的提升能力 教育培训行业个性化学习需求的挑战Chat GPT 的个性化优势 金融保险行业客户服务的变革Chat GPT 的智能化应用 医疗健康领域自助诊断及咨询的便利性Chat GPT 在医疗领域的应…

vue项目打包部署到服务器,报错。

这个是因为后端部署服务器时&#xff0c;名称没有对上&#xff0c;不是前端的问题&#xff0c;后端配置名称和前端的包名称保持一致就可以了。

轻量级c语言开源日志库log.c介绍 - 实现不同级别和参数化日志打印

前言 c语言没有现成的日志库&#xff0c;如果要记录日志&#xff0c;需要自己封装一个日志库。如果要实现日志级别和参数打印&#xff0c;还是比较麻烦的&#xff0c;正好在github找到了一个c语言开源日志库&#xff0c;可以实现日志级别打印&#xff0c;参数打印&#xff0c;…

SAP PO运维(四):适配器消息监控

登录SAP PO系统,点击“Configuration and Monitoring Home”,使用PISUPER账号登录: 2、选择“适配器引擎->消息监控器”: 3、查看是否有报错消息: 双击报错的数字,筛选出报错的条目(可以根据状态、接口命名空间等来筛选):常见的报错消息有: 接口配置问题:字段为空值…

好题记录 Leetcode 394.字符串解码 中等难度

方法一&#xff1a;递归 思路很简单&#xff0c;比较好理解&#xff0c;注意细节处理&#xff01;&#xff01;&#xff01; class Solution { public:string decodeString(string s) {string ans;for(int i0;s[i]!0;i){if(s[i]>a&&s[i]<z)anss[i];if(s[i]>…

十四、流式编程(4)

本章概要 终端操作 数组循环集合组合匹配查找信息数字流信息 终端操作 以下操作将会获取流的最终结果。至此我们无法再继续往后传递流。可以说&#xff0c;终端操作&#xff08;Terminal Operations&#xff09;总是我们在流管道中所做的最后一件事。 数组 toArray()&…

实时更新进度条:JavaScript中的定时器和异步编程技巧

前言 在Web开发中&#xff0c;有许多场景需要实时地更新页面上的进度&#xff0c;例如上传文件、数据处理等。本文将介绍如何利用JavaScript中的定时器和异步编程技巧来实现实时更新进度&#xff0c;并探讨一些其他解决方案。 处理进度实时更新&#xff1a; 利用异步编程实现实…

速卖通商品详情数据接口

速卖通商品详情数据接口&#xff08;aliexpress商品详情API接口&#xff09;可以获取到速卖通商品的详细信息&#xff0c;如商品标题、价格、库存、详情描述、图片等。 速卖通商品详情API接口是速卖通提供的一种产品数据接口&#xff0c;可以帮助速卖通卖家快速地将产品分类、…

Mysql主从数据恢复随笔

目录 1.使用pt-table-checksum插件安装方式如下 2.在主节点执行检查数据同步情况 3.同步检查出现的问题 3.1没有sock文件 3.2 Authentication plugin ‘sha256_password’ cannot be loaded: /usr/lib64/mysql/plugin/sha256_password.so: 无法打开共享对象文件: 没有那个文…