实时同步:使用 Canal 和 Kafka 解决 MySQL 与缓存的数据一致性问题

目录

1. 准备工作

2. 将需要缓存的数据存储 Redis

3. 监听 canal 存储在 Kafka Topic 中数据


1. 准备工作

1. 开启并配置MySQL的 BinLog(MySQL 8.0 默认开启)

修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini

log-bin="HELONG-bin"
binlog_format=ROW     # 只能配置行模式, 因为 Cannal 不具备将SQL转化成数据的能力
binlog-do-db=aicloud    # 监控 AI Cloud 项目

如果要同步多个项目:

binlog-do-db=aicloud
binlog-do-db=aicloud2
binlog-do-db=aicloud3

2. 重启MySQL服务

3. 赋值数据同步权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4. 安装并配置 Canal

下载地址:https://github.com/alibaba/canal/releases

① 修改canal.properties

canal.serverMode=kafka
canal.mq.servers=127.0.0.1:9092

canal 监控 binlog 日志,binlog 日志的传输默认使用 MySQL 的复制协议(基于 TCP/IP),

可以使用写代码的方式直接从 MySQL 服务器读取数据,此处使用本地 kafka 进行存储。

② 修改instance.properties

canal.instance.mysql.slaveId=100   # 大于 1 即可
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=ai-cloud-canal-to-kafka

slaveId 表示从节点 id,canal 的执行原理就是伪装成一个从库去主库同步数据

(主节点的 slaveId = 1)

address 配置连接本地的 MySQL

topic 配置数据发送到 Kafka 的某个主题下

5. 拷贝 Jar 包到 lib

将 canal 下 plugin 下的所有 jar 包拷贝到 lib 目录下。

6. 删除 bin 目录下 startup.bat 里的参数

如果启动时报错:

Unrecognized VM option 'PermSize=128m'

Error: Could not create the Java Virtual Machine.

Error: A fatal exception has occurred. Program will exit.

删除 -XX:PermSize=128m 参数即可。

7. 启动 canal

打开 cmd ,cd 到 bin 目录下,输入 startup.bat 回车

2. 将需要缓存的数据存储 Redis

此时我将这个查询列表接口的数据,存储在 Redis 中:

/*** 获取历史聊天记录(对话/绘图)** @param type* @return {@link ResponseEntity }*/
@RequestMapping("/list")
public ResponseEntity getHistoryList(Integer type, Integer model) {String listCacheKey = RedisUtil.getListCacheKey(SecurityUtil.getCurrentUser().getUid(), model, type);Object list = redisTemplate.opsForValue().get(listCacheKey);if (ObjectUtil.isNull(list)) {LambdaQueryWrapper<Answer> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.eq(Answer::getUid, SecurityUtil.getCurrentUser().getUid());queryWrapper.eq(Answer::getType, type);queryWrapper.eq(Answer::getModel, model);queryWrapper.orderByDesc(Answer::getAid);List<Answer> answerList = answerService.list(queryWrapper);List<Long> userIds = answerList.stream().map(Answer::getUid).collect(Collectors.toList());Map<Long, User> userIdMap = userService.selectByIds(userIds).stream().collect(Collectors.toMap(User::getUid, Function.identity()));List<AnswerVo> answerVoList = answerList.stream().map(answer -> AnswerVoUtil.getListAnswerVo(answer, userIdMap)).collect(Collectors.toList());// 缓存 1 天redisTemplate.opsForValue().set(listCacheKey, answerVoList, 1, TimeUnit.DAYS);return ResponseEntity.success(answerVoList);} else {return ResponseEntity.success(list);}
}
/*** 查询列表存储 Redis 缓存** @param uid* @param model* @param type* @return {@link String }*/
public static String getListCacheKey(Long uid, Integer model, Integer type) {return "LIST_CACHE_KEY_" + uid + "_" + model + "_" + type;
}

3. 监听 Kafka Topic 中数据并删除 Redis 缓存

首先对数据库中需要缓存的数据进行一些修改操作:

此时,使用 kafka ui(下载地址划到最底下),刷新 kafka 对应 topic 下的 message,就可以看到当前所作出的修改:

执行修改操作:将 “如何学习Spring???”修改成 “如何学习Spring??”

执行删除操作:

由此可见,对数据库的每一个修改操作,都是对应固定格式的一个数据,所以可以监听对应的  topic 并针对 data 中的数据进行一个提取,得到一个  cacheKey,然后删除对应的缓存,使得下一次的查询去访问数据库,并同步缓存。

【代码示例】

/*** canal 监控 binlog 日志,将修改的数据存储 kafka topic 中* 监听 kafka topic 中的数据** @param data* @param ack* @throws JsonProcessingException*/
@KafkaListener(topics = {KafkaConstant.CANAL_TOPIC})
public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {HashMap<String, Object> map = objectMapper.readValue(data, HashMap.class);if (map.isEmpty()) {ack.acknowledge();return;}// 匹配上对应的数据库和数据表if (KafkaConstant.TARGET_DATABASE.equals(map.get(KafkaConstant.DATABASE_KEY).toString()) &&KafkaConstant.TARGET_TABLE.equals(map.get(KafkaConstant.TABLE_KEY).toString())) {// 更新缓存 List<Map<String, Object>> list = (List<Map<String, Object>>) map.get(KafkaConstant.DATA_KEY);if (!CollectionUtils.isEmpty(list)) {for (Map<String, Object> answerMap : list) {String answerListCacheKey = RedisUtil.getListCacheKey(Long.valueOf(answerMap.get("uid").toString()),Integer.parseInt(answerMap.get("model").toString()),Integer.parseInt(answerMap.get("type").toString()));// 删除缓存,让下一次查询走数据库,并同步缓存redisTemplate.delete(answerListCacheKey);}}}//  手动确认应答ack.acknowledge();
}
/*** canal 同步数据到 kafka*/
public static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";/*** 数据库,缓存数据一致性的*/public static final String DATABASE_KEY = "database";public static final String TABLE_KEY = "table";public static final String DATA_KEY = "data";public static final String TARGET_DATABASE = "aicloud";public static final String TARGET_TABLE = "answer";

【补充】

kafka ui 下载地址:​​​​​​https://github.com/provectus/kafka-ui/tags

修改配置

kafka:clusters:- name: kafka3_clusterbootstrapServers: 127.0.0.1:9092

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/383788.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

本田Honda EDI项目案例:非EDI标准的数据格式转换与传输

近期知行帮助东风本田Honda的供应商E公司成功实施EDI项目&#xff0c;与以往采用X12、EDIFACT等EDI标准的项目不同&#xff0c;Honda向其供应商提供API接口&#xff0c;以JSON的格式传输库存信息以及生产计划。 EDI需求概览 Honda提供公开的WSRM系统供应商API接口&#xff0c…

数据库中的事务

一、理解事务 1、本质 事务由一组DML语句组成&#xff0c;这一组语句要么全部成功&#xff0c;要么全部失败。在逻辑上&#xff0c;事务就是一组sql语句&#xff0c;但在实际中&#xff0c;公共的数据库一定会高并发地接受各种事务的请求&#xff0c;所以一个事务要有4个属性…

【RT摩拳擦掌】RT600 4路音频同步输入1路TDM输出方案

【RT摩拳擦掌】RT600 4路音频同步输入1路TDM输出方案 一&#xff0c; 文章简介二&#xff0c;硬件平台构建2.1 音频源板2.2 音频收发板2.3 双板硬件连接 三&#xff0c;软件方案与软件实现3.1 方案实现3.2 软件代码实现3.2.1 4路I2S接收3.2.2 I2S DMA pingpong配置3.2.3 音频数…

经典文献阅读之--World Models for Autonomous Driving(自动驾驶的世界模型:综述)

Tip: 如果你在进行深度学习、自动驾驶、模型推理、微调或AI绘画出图等任务&#xff0c;并且需要GPU资源&#xff0c;可以考虑使用UCloud云计算旗下的Compshare的GPU算力云平台。他们提供高性价比的4090 GPU&#xff0c;按时收费每卡2.6元&#xff0c;月卡只需要1.7元每小时&…

ctfshow-web入门-php特性(web132-web136)

目录 1、web132 2、web133 3、web134 4、web135 5、web136 1、web132 存在 robots.txt 访问 /admin 需要传三个参数&#xff0c;并且需要满足&#xff1a; if($code mt_rand(1,0x36D) && $password $flag || $username "admin"){if($code admin){ech…

设计模式-Git-其他

目录 设计模式&#xff1f; 创建型模式 单例模式&#xff1f; 啥情况需要单例模式 实现单例模式的关键点&#xff1f; 常见的单例模式实现&#xff1f; 01、饿汉式如何实现单例&#xff1f; 02、懒汉式如何实现单例&#xff1f; 03、双重检查锁定如何实现单例&#xff…

dsp c6657 SYS/BIOS学习笔记

1 SYS/BIOS简介 SYS/BIOS是一种用于TI的DSP平台的嵌入式操作系统&#xff08;RTOS&#xff09;。 2 任务 2.1 任务调度 SYS/BIOS任务线程有0-31个优先级&#xff08;默认0-15&#xff0c;优先级0被空闲线程使用&#xff0c;任务最低优先级为1&#xff0c;最高优先级为15&am…

Superset二次开发之筛选器native Filters 水平布局

引言 Apache Superset作为一个功能强大的开源数据探索和可视化平台&#xff0c;提供了丰富的配置选项来定制化用户体验。其中&#xff0c;HORIZONTAL_FILTER_BAR 是一个重要的配置项&#xff0c;专注于优化和改进Superset中的筛选器条布局与交互。 什么是HORIZONTAL_FILTER_B…

Linux嵌入书学习—数据结构——栈(seqstak)

一、栈&#xff1b; 定义&#xff1a; 是限定仅在表尾&#xff08;栈顶&#xff09;进行插入和删除操作的线性表 栈又称为 后进先出&#xff08;Last In First Out&#xff09; 的线性表&#xff0c;简称 LIFO 结构 栈顶&#xff08;Top&#xff09; 栈顶是栈中允许进行添加&…

开源邮箱套件介绍系列1:SOGo

项目网站&#xff1a;SOGo | Free Open Source Webmail 提示&#xff1a;如下内容大部分来自官方网站&#xff0c;通过AI智能翻译而来。 1. SOGo功能概述 SOGo提供了多种访问日历和消息数据的方式。您的用户可以使用网页浏览器、Microsoft Outlook、Mozilla Thunderbird、Ap…

jackson序列化(jackson codec)

Jackson 是一个用于 Java 平台的开源 JSON 库&#xff0c;它提供了灵活且高效的方式来处理 JSON 数据的序列化(Java对象 → JSON字符串)和反序列化(JSON 字符串→ Java对象)。 以下是 Jackson 的一些主要特点和功能&#xff1a; 高性能&#xff1a;Jackson 通过使用基于流的处理…

32单片机开发bootloader程序

一&#xff0c;单片机为什么要使用bootloader 1、使用bootloader的好处 1) 程序隔离&#xff1a;可以同时存在多个程序&#xff0c;只要flash空间够大&#xff0c;或者通过外挂flash&#xff0c;可以实现多个程序共存&#xff0c;在多个程序之间切换使用。 2&#xff09;方便程…

【树状数组】2659. 将数组清空

本文涉及知识点 树状数组 LeetCode2659. 将数组清空 给你一个包含若干 互不相同 整数的数组 nums &#xff0c;你需要执行以下操作 直到数组为空 &#xff1a; 如果数组中第一个元素是当前数组中的 最小值 &#xff0c;则删除它。 否则&#xff0c;将第一个元素移动到数组的…

监测Nginx访问日志状态码,并做相应动作

文章目录 引言I 监测 Nginx 访问日志情况,并做相应动作1.1 前提准备1.2 访问日志 502 情况,重启 bttomcat9服务1.3 其他案例:访问日志 502 情况,重启 php-fpm 服务II 将Shell 脚本check499.sh包装成systemd服务2.1 创建systemd服务2.2 配置service2.3 开机启动2.4 其他常用…

内网对抗-隧道技术篇防火墙组策略FRPNPSChiselSocks代理端口映射C2上线

知识点&#xff1a; 1、隧道技术篇-传输层-工具项目-Frp&Nps&Chisel 2、隧道技术篇-传输层-端口转发&Socks建立&C2上线Frp Frp是专注于内网穿透的高性能的反向代理应用&#xff0c;支持TCP、UDP、HTTP、HTTPS等多种协议。可以将内网服务以安全、便捷的方式通过…

垃圾桶为什么要装缓冲器?

在我们日常生活中&#xff0c;垃圾桶是一个再常见不过的物品。然而&#xff0c;您是否留意过垃圾桶盖上的缓冲器&#xff1f;这个看似不起眼的小装置&#xff0c;其实有着不可忽视的重要作用。首先&#xff0c;垃圾桶装缓冲器能够有效地降低噪音。想象一下&#xff0c;在一个安…

【文心智能体】00后疯感工牌生成器,低代码工作流的简单应用以及图片快速响应解决方案,干活满满,不容错过哦

背景 文心智能体平台&#xff0c;开启新一轮活动&#xff0c;超级创造营持续百日活动。 在AI 浪潮席卷的今天&#xff0c;如雨后春笋般丛生的 AI 应用&#xff0c;昭告着时代风口显然已随之到来。 如何能把握住时代红利&#xff0c;占据风口&#xff0c;甚至打造新风向&#x…

基于微信小程序+SpringBoot+Vue的自习室选座与门禁系统(带1w+文档)

基于微信小程序SpringBootVue的自习室选座与门禁系统(带1w文档) 基于微信小程序SpringBootVue的自习室选座与门禁系统(带1w文档) 本课题研究的研学自习室选座与门禁系统让用户在小程序端查看座位&#xff0c;预定座位&#xff0c;支付座位价格&#xff0c;该系统让用户预定座位…

人工智能:大语言模型提示注入攻击安全风险分析报告下载

大语言模型提示注入攻击安全风险分析报告下载 今天分享的是人工智能AI研究报告&#xff1a;《大语言模型提示注入攻击安全风险分析报告》。&#xff08;报告出品方&#xff1a;大数据协同安全技术国家工程研究中心安全大脑国家新一代人工智能开放创新平台&#xff09; 研究报告…

57 数据链路层

用于两个设备&#xff08;同一种数据链路节点&#xff09;之间传递 目录 对比理解“数据链路层” 和 “网络层”以太网 2.1 认识以太网 2.2 以太网帧格式MAC地址 3.1 认识MAC地址 3.2 对比理解MAC地址和IP地址局域网通信MTU 5.1 认识MTU 5.2 MTU对ip协议的影响 5.3 MTU对UDP的…