Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性

目录

1. 背景

2. Windows系统安装canal

3.Mysql准备工作

4. 公共依赖包

5. Redis缓存设计

6. mall-canal-service


1. 背景

canal [kə'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。其诞生的背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。所以其核心功能如下:

  • 数据实时备份
  • 异构数据源(elasticsearch、Hbase)与数据库数据增量同步
  • 业务缓存cache 刷新,保证缓存一致性
  • 带业务逻辑的增量数据处理,如监听某个数据的变化做一定的逻辑处理

canal是借助于MySQL主从复制原理实现

  • master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  • slave将master的binary log events拷贝到它的中继日志(relay log);
  • slave重做中继日志中的事件,将改变反映它自己的数据。

 canal的工作原理:

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)

2. Windows系统安装canal

Github下载链接:Releases · alibaba/canal · GitHub

 

 解压

 进入/conf/example,修改instance.properties

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

之后进入到bin目录下启动startup就可以了。

3.Mysql准备工作

/*
SQLyog Community v13.2.0 (64 bit)
MySQL - 8.0.33 : Database - shop
*********************************************************************
*//*!40101 SET NAMES utf8 */;/*!40101 SET SQL_MODE=''*/;/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;USE `shop`;/*Table structure for table `brand` */DROP TABLE IF EXISTS `brand`;CREATE TABLE `brand` (`id` INT NOT NULL AUTO_INCREMENT COMMENT '品牌id',`name` VARCHAR(100) NOT NULL COMMENT '品牌名称',`image` VARCHAR(1000) DEFAULT '' COMMENT '品牌图片地址',`initial` VARCHAR(1) DEFAULT '' COMMENT '品牌的首字母',`sort` INT DEFAULT NULL COMMENT '排序',PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 COMMENT='品牌表';/*Data for the table `brand` */INSERT  INTO `brand`(`id`,`name`,`image`,`initial`,`sort`) VALUES 
(11,'华为','https://sklll.oss-cn-beijing.aliyuncs.com/secby/eed72cc4-a9c1-4010-949a-03cef5b933d6.jpg','',NULL),
(12,'中兴','https://sklll.oss-cn-beijing.aliyuncs.com/secby/4fedb361-5ab3-4ad0-a667-580c1f37dff0.jpg','',NULL),
(13,'大疆','https://sklll.oss-cn-beijing.aliyuncs.com/secby/e8382c48-0487-4a9b-8fd0-a3716c3eea19.jpg','',NULL);/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

4. 公共依赖包

<dependencies><!--web包--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--MyBatis Plus--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.3.2</version></dependency><!--MySQL--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!--Redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!--Nacos--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency></dependencies>

5. Redis缓存设计

这里基于Springcloud Alibaba进行设计,对应的服务名称为mall-goods

bootstrap.yaml代码如下:

server:port: 8081
spring:application:name: mall-goodsdatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTCusername: rootpassword: 123456cloud:nacos:config:file-extension: yamlserver-addr: localhost:8848discovery:#Nacos的注册地址server-addr: localhost:8848redis:host: xx //改为自己redis ip地址port: 6379

导入RedisConfig配置

@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(factory);GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer();// 值采用json序列化redisTemplate.setValueSerializer(serializer);//使用StringRedisSerializer来序列化和反序列化redis的key值redisTemplate.setKeySerializer(new StringRedisSerializer());// 设置hash key 和value序列化模式redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(serializer);redisTemplate.afterPropertiesSet();return redisTemplate;}@Beanpublic RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(12))//设置默认缓存时间.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer()));return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration);}
}

对于Brand进行redis缓存设计

在主启动类上添加@EnableCaching开启缓存。

@Cacheable 注解可以标记一个方法需要被缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会先查找缓存,如果缓存中存在相应的数据,则直接从缓存中读取,否则执行方法并将结果缓存到缓存中。

@CachePut 注解可以标记一个方法需要更新缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会更新缓存中的数据。

@CacheEvict 注解可以标记一个方法需要删除缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会删除缓存中对应的数据。

代码如下:

@RestController
@RequestMapping("/brand")
public class BrandController {@AutowiredBrandService brandService;@GetMapping("/{id}")@Cacheable(value = "brand",key = "#id")public Brand search(@PathVariable(value = "id")Integer id){return brandService.getById(id);}/***** 添加缓存*/@PostMapping@CachePut(value = "brand",key = "#brand.id")public Brand add(@RequestBody Brand brand){return brand;}/***** 修改缓存*/@PutMapping@CachePut(value = "brand",key = "#brand.id")public Brand update(@RequestBody Brand brand){return brand;}/***** 删除缓存*/@DeleteMapping("/{id}")@CacheEvict(value = "brand",key = "#id")public void delete(@PathVariable(value = "id")Integer id){}
}

6. mall-canal-service

这里微服务名称为mall-canal-service,监控Mysql数据库数据变化进行实时的更新缓存。

除公共依赖外导入依赖

     <dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-loadbalancer</artifactId></dependency>

bootstrap.yaml设计如下:

server:port: 8083
spring:application:name: mall-canalcloud:nacos:config:file-extension: yamlserver-addr: localhost:8848discovery:#Nacos的注册地址server-addr: localhost:8848
#Canal配置
canal:server: localhost:11111destination: example

设计Feign接口

@FeignClient(value = "mall-goods",contextId = "brand")//服务名字
public interface BrandFeign {@GetMapping("/brand/{id}")Brand search(@PathVariable(value = "id")Integer id);@PostMapping("/brand")Brand add(@RequestBody Brand brand);/***** 修改方法*/@PutMapping("/brand")Brand update(@RequestBody Brand brand);/***** 删除方法*/@DeleteMapping("/brand/{id}")void delete(@PathVariable(value = "id")Integer id);}

Canal实时监控Mysql数据库变化,代码设计如下:

@Component
@CanalTable(value = "brand")
public class BrandHandler implements EntryHandler<Brand> {@AutowiredBrandFeign brandFeign;@Overridepublic void insert(Brand brand) {System.out.println(brand);brandFeign.add(brand);}@Overridepublic void update(Brand before, Brand after) {System.out.println(after);brandFeign.update(after);}@Overridepublic void delete(Brand brand) {System.out.println(brand);brandFeign.delete(brand.getId());}
}

在MallCanalApplication主启动类上添加@EnableFeignClients(basePackages = {"org.example.feign"}),包名为feign接口路径。

注意:本人在使用canal时,数据库有非varchar类型的null值,在运行时会抛异常错误。

	at top.javatool.canal.client.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:52) ~[canal-client-1.2.1-RELEASE.jar:na]at top.javatool.canal.client.handler.impl.AsyncMessageHandlerImpl.lambda$handleMessage$0(AsyncMessageHandlerImpl.java:30) ~[canal-client-1.2.1-RELEASE.jar:na]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_281]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_281]at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_281]
Caused by: java.lang.NumberFormatException: For input string: ""at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_281]

代码运行后,经过测试成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/228028.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【深度解析C++】const成员函数

系列文章目录 &#x1f308;座右铭&#x1f308;&#xff1a;人的一生这么长、你凭什么用短短的几年去衡量自己的一生&#xff01; &#x1f495;个人主页:清灵白羽 漾情天殇_计算机底层原理,深度解析C,自顶向下看Java-CSDN博客 ❤️相关文章❤️&#xff1a;Cthis指针&#xf…

PostgreSQL | FunctionProcedure | 函数与存储过程的区别

文章目录 PostgreSQL | Function&Procedure | 函数与存储过程的区别1. 简述书面说法大白话讲 2. 函数&#xff08;Function&#xff09;2.1 定义2.2 用途2.3 执行2.4 事务处理2.5 说点例子1. 当参数都是IN类时2. 参数中出现OUT、INOUT参数时 3. 存储过程&#xff08;Proced…

C语言之整型提升

文章目录 1 有可能出现的问题2 产生以上问题的原因&#xff08;整型提升&#xff09;3 整型提升的过程4 整型提升示例5 总结 1 有可能出现的问题 代码如下 #include <stdio.h>int main () {int a -1;unsigned int b 1;if (a < b) {printf("a < b");}…

【STM32】SPI通信

1 SPI通信 SPI&#xff08;Serial Peripheral Interface&#xff0c;串行外设接口&#xff09;是由Motorola公司开发的一种通用数据总线 四根通信线&#xff1a;SCK&#xff08;Serial Clock&#xff0c;串行时钟&#xff09;、MOSI&#xff08;Master Output Slave Input&am…

使用react+vite开发项目时候,部署上线后刷新页面无法访问解决办法

说一下我这边的环境和使用的路由模式&#xff1a;vitereactBrowserRouter路由模式&#xff0c;所以如果你和我一样的话&#xff0c;可以试试我的这种解决办法&#xff0c;我是将项目打包后直接丢到服务器上的目录里面&#xff0c;然后配置nginx直接访问根目录。 我的nginx配置…

React快速入门之交互性

响应事件 创建事件处理函数 处理函数名常以handle事件名命名 function handlePlayClick() {alert(Playing);}传递事件处理函数 函数名、匿名两种方式&#xff01; function PlayButton() {function handlePlayClick() {alert(Playing);}return (<Button handleClick{handl…

java虚拟机内存管理

文章目录 概要一、jdk7与jdk8内存结构的差异二、程序计数器三、虚拟机栈3.1 什么是虚拟机栈3.2 什么是栈帧3.3 栈帧的组成 四、本地方法栈五、堆5.1 堆的特点5.2 堆的结构5.3 堆的参数配置 六、方法区6.1 方法区结构6.2 运行时常量池 七、元空间 概要 根据 JVM 规范&#xff0…

探索小红书笔记API:挖掘数据背后的故事

随着数字化时代的到来&#xff0c;数据已经成为企业和个人决策的重要依据。小红书作为一个流行的社交电商平台&#xff0c;积累了大量的用户数据和内容。通过探索小红书笔记API&#xff0c;我们可以深入挖掘这些数据背后的故事&#xff0c;从而更好地理解用户需求和市场趋势。 …

弱电工程计算机网络系统基础知识

我们周围无时无刻不存在一张网&#xff0c;如电话网、电报网、电视网、计算机网络等&#xff1b;即使我们身体内部也存在许许多多的网络系统&#xff0c;如神经系统、消化系统等。最为典型的代表即计算机网络&#xff0c;它是计算机技术与通信技术两个领域的结合。 计算机网络的…

《Spring Cloud学习笔记:微服务保护Sentinel + JMeter快速入门》

Review 解决了服务拆分之后的服务治理问题&#xff1a;Nacos解决了服务治理问题OpenFeign解决了服务之间的远程调用问题网关与前端进行交互&#xff0c;基于网关的过滤器解决了登录校验的问题 流量控制&#xff1a;避免因为突发流量而导致的服务宕机。 隔离和降级&#xff1a…

八股文打卡day12——计算机网络(12)

面试题&#xff1a;HTTPS的工作原理&#xff1f;HTTPS是怎么建立连接的&#xff1f; 我的回答&#xff1a; 1.客户端向服务器发起请求&#xff0c;请求建立连接。 2.服务器收到请求之后&#xff0c;向客户端发送其SSL证书&#xff0c;这个证书包含服务器的公钥和一些其他信息…

LVS那点事

LVS 原理 IPVS LVS 的 IP 负载均衡技术是通过 IPVS 模块来实现的&#xff0c;IPVS 是 LVS 集群系统的核心软件&#xff0c;它的主要作用是&#xff1a;安装在 Director Server 上&#xff0c;同时在 Director Server 上虚拟出一个 IP 地址&#xff0c;用户必须通过这个虚拟的…

k8s的二进制部署: 源码包部署-----node节点部署

服务器IP软件包k8s--master0120.0.0.61kube-aplserver&#xff0c;kube-controer-manager&#xff0c;kube-scheduler&#xff0c;etcdk8s--master0220.0.0.62kube-controer-manager&#xff0c;kube-schedulernode节点0120.0.0.62kubelet&#xff0c;kube-proxy&#xff0c;et…

初识javaWeb

一、JavaWeb是什么&#xff1f; 1、概念 javaWeb指的是使用java语言进行互联网领域项目开发的技术栈——进行web项目开发所需的技术的集合。 -Web前端——在浏览器中用户可以看到的网页 -Web后端——为前端提供数据的程序 2、Web项目 java语言是可以进行多种类型的项目开发&a…

信号处理设计模式

问题 如何编写信号安全的应用程序&#xff1f; Linux 应用程序安全性讨论 场景一&#xff1a;不需要处理信号 应用程序实现单一功能&#xff0c;不需要关注信号 如&#xff1a;数据处理程序&#xff0c;文件加密程序&#xff0c;科学计算程序 场景二&#xff1a;需要处理信…

计算机毕业设计------ssm茶叶溯源系统

项目介绍 茶叶溯源系统&#xff0c;分为前台与后台。普通用户可在前台通过18位的编码查询茶叶的出售历史。 后台分为两种角色&#xff0c;管理员与经销商&#xff1b; 管理员主要功能包括&#xff1a; 主界面&#xff1b; 管理员管理&#xff1a;管理员列表、添加管理员&am…

SparkStreaming_window_sparksql_reids

1.5 window 滚动窗口滑动窗口 window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持&#xff0c;从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据&#xff0c;会被聚合起来执行计算操作&#xff0c;然后生成的RDD&#xff0c;会…

设计模式——行为型模式

模板方法模式 行为型模式用于描述程序在运行时复杂的流程控制&#xff0c;即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务&#xff0c;它涉及算法与对象间职责的分配。 行为型模式分为类行为模式和对象行为模式&#xff0c;前者采用继承机制来在类间…

PHP序列化总结3--反序列化的简单利用及案例分析

反序列化中生成对象里面的值&#xff0c;是由反序列化里面的值决定&#xff0c;与原类中预定义的值的值无关&#xff0c;穷反序列化的对象可以使用类中的变量和方法 案例分析 反序列化中的值可以覆盖原类中的值 我们创建一个对象&#xff0c;对象创建的时候触发了construct方…

基于QT开发的温室气体数据记录软件

1、概述 温室气体分析仪数据记录软件用于实现温室气体分析仪数据的获取与存储&#xff0c;阀箱数据的获取与存储、冷阱数据的获取与存储、采样单元数据的获取与存储、阀箱和采样单元的远程操作以及系统功能的管理。其主操作界面如下&#xff1a; 上述软件界面分为2各区域&…