基于SpringBoot实现MySQL与Redis的数据最终一致性

问题场景

在并发场景下,MySQL和Redis之间的数据不一致性可能成为一个突出问题。这种不一致性可能由网络延迟、并发写入冲突以及异常情况处理等因素引起,导致MySQL和Redis中的数据在某些时间点不同步或出现不一致的情况。数据一致性问题的级别可以分为三种:

  • 强一致性:写入何值,读出何值,但在实现中,性能较差。
  • 弱一致性:写入新数据后,承诺在某个时间级别(分、秒、毫秒)后,达到数据一致。
  • 最终一致性:写入新数据后,承诺在规定时间内达到数据一致。

解决方案

强一致性: 强一致性解决方案在高并发场景下实现过于苛刻,本案例暂不讨论。

弱一致性: 一致性的解决方案可以使用“先写MySQL,再删除Redis”策略,这种方案在极限条件下有不一致的可能性,但结合需求和技术实现可以综合评判。弱一致性的应用场景如:社交平台点赞功能,用户可以实时看到点赞的更新,尽管MySQL和Redis可能存在短暂的数据不一致。

最终一致性: 采用“先写MySQL,通过MySQL的Binlog特性,异步写入Redis”。这种方案一般适用于库存、金融等业务场景,但是需要建立相关失败重试、告警、补偿机制,以及容灾措施。

在本案例中,弱一致性采用 Cache Aside 方案,最终一致性采用阿里巴巴开源组件 canal 实现。

Cache Aside

  1. 该方案在读取数据库时,首先从缓存中查询数据库:
    • 如果缓存中存在数据,则直接返回给应用程序。
    • 如果缓存中不存在数据,则从数据库中读取数据,并将数据存储到缓存中,然后返回给应用程序。
  1. 写入数据时,先更数据库的数据,当数据库更新成功后,再删除缓存中的数据。

Cache Aside注意事项
  • 缓存失效:缓存中的数据可能会过期或失效,需要考虑设置合适的缓存过期时间,或使用合适的缓存失效策略(如LRU)来管理缓存中的数据。
  • 缓存穿透:当请求查询一个不存在的数据时,会导致缓存层无法命中,从而直接访问数据库。为了避免缓存穿透问题,可以使用空值缓存或布隆过滤器等技术来减轻数据库的负载。

综上所述,Cache Aside方案适用于读取频率较高、对数据实时性要求不高的场景,通过合理地使用缓存来提高系统性能和扩展性,并通过维护数据的一致性来避免数据不一致的问题。

Cache Aside demo

基于Cache Aside实现点赞功能。

实体类信息

public class Like {private String postId;private int likeCount;// 构造函数、getter和setter方法
}

逻辑层

@Service
public class LikeService {private final LikeRepository likeRepository;private final RedisUtils redisUtils;public LikeService(LikeRepository likeRepository, RedisUtils redisUtils) {this.likeRepository = likeRepository;this.redisUtils = redisUtils;}public Like getLikeInfo(String postId) {String cacheKey = "like:" + postId;// 从缓存中获取点赞信息Like like = (Like) redisUtils.get(cacheKey);// 如果缓存中不存在,则从持久层(数据库)获取if (like == null) {like = likeRepository.findByPostId(postId);// 如果数据库中存在数据,则保存到缓存中if (like != null) {redisUtils.set(cacheKey, like);}}// 如果点赞信息为空,则初始化为0if (like == null) {like = new Like(postId, 0);}return like;}public void addLike(String postId) {String cacheKey = "like:" + postId;// 在持久层(数据库)新增点赞信息Like like = likeRepository.findByPostId(postId);if (like == null) {like = new Like(postId, 1);} else {like.setLikeCount(like.getLikeCount() + 1);}likeRepository.save(like);// 更新缓存中的数据redisUtils.set(cacheKey, like);}
}

canal

引用canal官方说明:

canal [kə’næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

前置知识:MySQL主从复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
环境搭建

需要的开发环境:

  • MySQL
  • Redis
  • Canal

特别说明:canal只支持JDK 8和JDK 11,如果您在本地物理机安装,请切换JDK默认版本。笔者更建议您使用Docker安装开发环境,由于canal安装后需要修改的配置较多,可以通过Docker-Compose安装。

那么,麻烦ChatGPT写一个Docker-Compose文件吧:

  • version请按本地安装的Docker-Compose版本定义。
  • Docker-Compose安装请自行查询。
version: '2.4'services:mysql:image: mysql:8.0container_name: mysqlrestart: falseenvironment:MYSQL_ROOT_PASSWORD: rootports:- "33060:3306"volumes:- ./mysql-data:/var/lib/mysqlcanal:image: canal/canal-server:v1.1.5container_name: canalrestart: falseports:- "11111:11111"- "11112:11112"depends_on:- mysqlenvironment:- canal.destinations=example- canal.instance.mysql.slaveId=1234- canal.instance.master.address=mysql:3306- canal.instance.dbUsername=root- canal.instance.dbPassword=root- canal.instance.connectionCharset=UTF-8- canal.instance.tsdb.enable=false- canal.instance.gtidon=false- canal.instance.filter.regex=.*- canal.instance.filter.black.regex=mysql\.slave_.*redis:image: redis:latestrestart: alwaysports:- 6379:6379volumes:- ./redis_data:/data

将文件命名为:docker-compose.yml,开始安装。

docker-compose up -d

本案例使用balance余额表来演示,数据库表设计如下:

CREATE TABLE `balance` (`id` varchar(50) NOT NULL COMMENT '主键',`account` varchar(50) NOT NULL COMMENT '账户',`amount` decimal(10,2) NOT NULL COMMENT '金额',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 
COMMENT='余额表';
开发环境
  • JDK 17
  • SpringBoot 3.1.2
  • MyBatis-Plus 3.5.3.1
  • druid
  • lettuce

开发环境根据您的实际需要选择即可。

环境启动后,进入编码阶段。

/*** @author: liu_pc* Date:        2023/8/25* Description: 余额信息变更Redis变成处理类* Version:     1.0*/
@Component
public class BalanceRedisProcessorService implements EntryHandler<Balance>, Runnable {private final Logger logger = LoggerFactory.getLogger(BalanceRedisProcessorService.class);private final RedisUtils redisUtils;private final CanalConfig canalConfig;private final Executor executor;@Value("${canal.server.open}")private boolean open;@Autowiredpublic BalanceRedisProcessorService(RedisUtils redisUtils,CanalConfig canalConfig,@Qualifier("ownThreadPoolExecutor") Executor executor) {this.redisUtils = redisUtils;this.canalConfig = canalConfig;this.executor = executor;}@PostConstructpublic void init() {Map<String, String> mainMdcContext = Maps.newHashMap();mainMdcContext.put("canal-thread", "balance-redis-processor-service");MDC.setContextMap(mainMdcContext);executor.execute(this);logger.info("MySQL-Balance数据自动同步到Redis:线程已经启动");}@Overridepublic void run() {CanalConnector canalConnector = canalConfig.canalConnector();canalConnector.connect();// 回滚到未进行ack的地方canalConnector.rollback();try {while (open) {// 获取数据 每次获取一百条改变数据Message message = canalConnector.getWithoutAck(100);//获取这条消息的idlong batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);continue;}// 处理数据for (CanalEntry.Entry entry : message.getEntries()) {if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.UPDATE || eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.DELETE) {for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {List<CanalEntry.Column> columns = rowData.getAfterColumnsList();String tableName = entry.getHeader().getTableName();// 判断是否是 Balance 表的 amount 字段变更if ("balance".equals(tableName)) {StringBuilder redisKey = new StringBuilder("balance:");for (CanalEntry.Column column : columns) {logger.info("Balance changed in 'balance' dataInfo: {}", column);if ("id".equals(column.getName())) {String changeId = column.getValue();logger.info("当前变更id为:{}", changeId);redisKey.append(changeId);}if ("amount".equals(column.getName())) {String changeValue = column.getValue();logger.info(changeValue);redisUtils.set(redisKey.toString(), changeValue);}}}}}}}// 确认消费完成这条消息canalConnector.ack(message.getId());logger.info("消费成功");}} catch (Exception e) {logger.warn("canal-消费失败");} finally {// 关闭连接canalConnector.disconnect();}}
}
测试

使用接口调用或者手动改库的方式,制造数据变更,查看日志打印情况:

Redis数据:

完成。

我已将canal实现数据同步代码开源,请自行下载领取,笔者不介意您宝贵的Star,如果能帮到您,十分荣幸。

mdc_logback

同时,如果您对笔者其他文章感兴趣,可以扫一扫关注笔者的公众号:种颗代码技术树

公众号文章更新更及时,以及一些程序员周边相关更新。

感谢您阅读到这里,不胜感激。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/107883.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

wazuh--sql检测

官网&#xff1a;Virtual Machine (OVA) - Installation alternatives Wazuh(Wazuh The Open Source Security Platform)&#xff1a;是一整套基于ossec安全检测工具和EFK日志工具构成的终端安全管理工具。不管是将其分类至HIDS&#xff0c;还是EDR&#xff0c;它都是一套通过…

QCC_BES 音频重采样算法实现

+V hezkz17进数字音频系统研究开发交流答疑群(课题组) 这段代码是一个用于将音频数据进行立体声重采样的函数。以下是对代码的解读: 函数接受以下参数: pcm_buf:16位有符号整型的音频缓冲区,存储了输入的音频数据。pcm_len:音频缓冲区的长度。mic1:16位有符号整型的音频…

本地私有仓库、harbor私有仓库部署与管理

本地私有仓库、harbor私有仓库部署与管理 一、本地私有仓库1.本地私有仓库简介2.搭建本地私有仓库3.容器重启策略介绍 二、harbor私有仓库部署与管理1.什么是harbor2.Harbor的特性3.Harbor的构成4.harbor部署及配置5.客户端测试 三、Harbor维护1.创建2.普通用户操作私有仓库3.日…

不用循环数组,js+html实现贪吃蛇

功能描述&#xff1a;每走10步随机改变一个方方向&#xff0c;当键盘按下方向键 w,s,a,d时&#xff0c;使用键盘方向控制蛇的移动&#xff0c;蛇头每撞到一次自身时改变屏幕颜色&#xff0c;蛇头碰到边界时从另一边回来。 实现思路&#xff1a;用个30大小的数组存放每个结点&a…

学习率调整策略

学习率是可以控制更新的步伐的。 我们在训练模型的时候&#xff0c;一般开始的时候学习率会比较大&#xff0c;这样可以以一个比较快的速度到达最优点的附近&#xff0c;然后再把学习率降下来&#xff0c; 缓慢的去收敛到最优值。学习率前期要大&#xff0c;后期要小 在学习学…

命令行环境

sleep 20 延迟20秒 这个是操作系统的信号机制 ctrl z ^z可以恢复 jobs 可以查看 终端运行工作列表&#xff0c;bg可以将暂停的作业重新运行 通过kill暂停作业 通过 -KILL 之后才可以将 -HUP 作业悬挂起来 终端复路多用 会话 ^a p 上一个会话 ^ a n 下一个会话 别名 左右不能…

学习中ChatGPT的17种用法

ChatGPT本质上是一个聊天工具&#xff0c;旧金山的人工智能企业OpenAI于2022年11月正式推出ChatGPT。那么&#xff0c;ChatGPT与其他人工智能产品相比有什么特殊呢&#xff1f; 它除了可以回答结构性的问题&#xff0c;例如语法修正、翻译和查找答案之外。最关键的是它能够去解…

LeetCode 1267. 统计参与通信的服务器

【LetMeFly】1267.统计参与通信的服务器 力扣题目链接&#xff1a;https://leetcode.cn/problems/count-servers-that-communicate/ 这里有一幅服务器分布图&#xff0c;服务器的位置标识在 m * n 的整数矩阵网格 grid 中&#xff0c;1 表示单元格上有服务器&#xff0c;0 表…

UE4 材质学习笔记

CheapContrast与CheapContrast_RGB都是提升对比度的&#xff0c;一个是一维输入&#xff0c;一个是三维输入&#xff0c;让亮的地方更亮&#xff0c;暗的地方更暗&#xff0c;不像power虽然也是提升对比度&#xff0c;但是使用过后的结果都是变暗或者最多不变&#xff08;值为1…

【Java基础增强】Stream流

1.Stream流 1.1体验Stream流【理解】 案例需求 按照下面的要求完成集合的创建和遍历 创建一个集合&#xff0c;存储多个字符串元素 把集合中所有以"张"开头的元素存储到一个新的集合 把"张"开头的集合中的长度为3的元素存储到一个新的集合 遍历上一步得…

HDLBits-Verilog学习记录 | Verilog Language-Modules(1)

文章目录 20.Module21.Connecting ports by position | Moudle pos22.Connecting ports by name | Module name23.Three modules | Module shift24.Modules and vectors | Module shift8 20.Module practice:You may connect signals to the module by port name or port posi…

ubuntu 22.04 LTS openai triton 安装

第一种方法&#xff1a; pip install triton 第二种方法&#xff0c;安装最新的版本&#xff1a; pip install -U --index-url https://aiinfra.pkgs.visualstudio.com/PublicPackages/_packaging/Triton-Nightly/pypi/simple/ triton-nightly 第三种方法&#xff1a; git c…

【Unity细节】Unity制作汽车时,为什么汽车会被弹飞?为什么汽车会一直抖动?

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 秩沅 原创 &#x1f636;‍&#x1f32b;️收录于专栏&#xff1a;unity细节和bug &#x1f636;‍&#x1f32b;️优质专栏 ⭐【…

【生态经济学】利用R语言进行经济学研究技术——从数据的收集与清洗、综合建模评价、数据的分析与可视化、因果推断等方面入手

查看原文>>>如何快速掌握利用R语言进行经济学研究技术——从数据的收集与清洗、综合建模评价、数据的分析与可视化、因果推断等方面入手 近年来&#xff0c;人工智能领域已经取得突破性进展&#xff0c;对经济社会各个领域都产生了重大影响&#xff0c;结合了统计学、…

Java入职第十一天,深入了解静态代理和动态代理(jdk、cglib)

一、代理模式 一个类代表另一个类去完成扩展功能,在主体类的基础上,新增一个代理类,扩展主体类功能,不影响主体,完成额外功能。比如买车票,可以去代理点买,不用去火车站,主要包括静态代理和动态代理两种模式。 代理类中包含了主体类 二、静态代理 无法根据业务扩展,…

LeetCode 43题:字符串相乘

题目 给定两个以字符串形式表示的非负整数 num1 和 num2&#xff0c;返回 num1 和 num2 的乘积&#xff0c;它们的乘积也表示为字符串形式。 注意&#xff1a;不能使用任何内置的 BigInteger 库或直接将输入转换为整数。 示例 1: 输入: num1 "2", num2 "3&…

0201hdfs集群部署-hadoop-大数据学习

文章目录 1 前言2 集群规划3 hadoop安装包上传与安装3.1 上传解压 4 hadoop配置5 从节点同步和环境变量配置6 创建用户7 集群启动8 问题集8.1 Invalid URI for NameNode address (check fs.defaultFS): file:/// has no authority. 结语 1 前言 下面我们配置下单namenode节点h…

C# 实现 国密SM4/ECB/PKCS7Padding对称加密解密

C# 实现 国密SM4/ECB/PKCS7Padding对称加密解密&#xff0c;为了演示方便本问使用的是Visual Studio 2022 来构建代码的 1、新建项目&#xff0c;之后选择 项目 鼠标右键选择 管理NuGet程序包管理&#xff0c;输入 BouncyCastle 回车 添加BouncyCastle程序包 2、代码如下&am…

4.19 20

服务端没有 listen&#xff0c;客户端发起连接建立&#xff0c;会发生什么&#xff1f; 服务端如果只 bind 了 IP 地址和端口&#xff0c;而没有调用 listen 的话&#xff0c;然后客户端对服务端发起了连接建立&#xff0c;服务端会回 RST 报文。 没有 listen&#x…

如何开发一款唯一艺术平台 区块链 /数字藏品

艺术作品是人类文化的瑰宝&#xff0c;而艺术平台则是连接艺术家与观众的桥梁。如何开发一款独一无二的艺术平台&#xff0c;既要满足专业艺术作品展示的要求&#xff0c;又要提供深度思考的空间&#xff0c;这是我们所面临的挑战。本文将从专业性、思考深度和逻辑性等多个方面…