基于Rabbitmq和Redis的延迟消息实现

1 基于Rabbitmq延迟消息实现

支付时间设置为30,未支付的消息会积压在mq中,给mq带来巨大压力。我们可以利用Rabbitmq的延迟队列插件实现消息前一分钟尽快处理
在这里插入图片描述
在这里插入图片描述

1.1定义延迟消息实体

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体

@Data
public class MultiDelayMessage<T> {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 获取并移除下一个延迟时间* @return 队列中的第一个延迟时间*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否还有下一个延迟时间*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}

1.2 定义常量,用于记录交换机、队列、RoutingKey等常量

package com.hmall.trade.constants;public interface MqConstants {String DELAY_EXCHANGE = "trade.delay.topic";String DELAY_ORDER_QUEUE = "trade.order.delay.queue";String DELAY_ORDER_ROUTING_KEY = "order.query";
}

1.3 抽取mq配置到nacos中

spring:rabbitmq:host: ${hm.mq.host:192.168.150.101} # 主机名port: ${hm.mq.port:5672} # 端口virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机username: ${hm.mq.un:hmall} # 用户名password: ${hm.mq.pw:123} # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

1.4 定义消息处理器

使用延迟消息处理器发送消息
在这里插入图片描述
在这里插入图片描述

1.5 消息监听与延迟消息再次发送

在这里插入图片描述
在这里插入图片描述

2 延迟消息实现

DelayQueue:基于JVM,保存在内存中,会出现消息丢失
在这里插入图片描述

Rabbitmq的延迟任务:基于TTL和死信交换机
在这里插入图片描述

2.1 redis的延迟任务:基于zset的去重和排序功能

在这里插入图片描述
1.为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑

2.为什么使用redis中的两种数据类型,list和zset?

  • 原因一: list存储立即执行的任务,zset存储未来的数据
  • 原因二:任务量过大以后,zset的性能会下降

时间复杂度:执行时间(次数) 随着数据规模增长的变化趋势

  • 操作redis中的list命令LPUSH: 时间复杂度: O(1)
  • 操作redis中的zset命令zadd: 时间复杂度: (Mlog(n))

2.2 设计mybatis映射实体类:

/*** 版本号,用乐观锁*/@Versionprivate Integer version;乐观锁支持:
/*** mybatis-plus乐观锁支持* @return*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;
}

2.3 创建task类,用于接收添加任务的参数

@Data
public class Task implements Serializable {/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;/*** 优先级*/private Integer priority;/*** 执行id*/private long executeTime;/*** task参数*/private byte[] parameters;}

2.4 添加任务

2.4.1 添加任务到数据库中

addTaskToDb(task);修改任务表和日志表

@Autowiredprivate TaskinfoMapper taskinfoMapper;@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;/*** 添加任务到数据库中** @param task* @return*/private boolean addTaskToDb(Task task) {boolean flag = false;try {//保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);//设置taskIDtask.setTaskId(taskinfo.getTaskId());//保存任务日志数据TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}
2.4.2 添加任务到redis

addTaskToCache(task);判断任务执行之间是否在现在还是未来五分钟内

@Autowiredprivate CacheService cacheService;/*** 把任务添加到redis中** @param task*/private void addTaskToCache(Task task) {String key = task.getTaskType() + "_" + task.getPriority();//获取5分钟之后的时间  毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime = calendar.getTimeInMillis();//2.1 如果任务的执行时间小于等于当前时间,存入listif (task.getExecuteTime() <= System.currentTimeMillis()) {cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));} else if (task.getExecuteTime() <= nextScheduleTime) {//2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());}}

2.5 删除任务

1、删除数据库任务表,更改日志表任务状态
2、删除list或者zset中的任务

在TaskService中添加方法

/*** 取消任务* @param taskId        任务id* @return              取消结果*/
public boolean cancelTask(long taskId);
/*** 取消任务* @param taskId* @return*/
@Override
public boolean cancelTask(long taskId) {boolean flag = false;//删除任务,更新日志Task task = updateDb(taskId,ScheduleConstants.EXECUTED);//删除redis的数据if(task != null){removeTaskFromCache(task);flag = true;}return false;
}/*** 删除redis中的任务数据* @param task*/
private void removeTaskFromCache(Task task) {String key = task.getTaskType()+"_"+task.getPriority();if(task.getExecuteTime()<=System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));}else {cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));}
}/*** 删除任务,更新任务日志状态* @param taskId* @param status* @return*/
private Task updateDb(long taskId, int status) {Task task = null;try {//删除任务taskinfoMapper.deleteById(taskId);TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(status);taskinfoLogsMapper.updateById(taskinfoLogs);task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());}catch (Exception e){log.error("task cancel exception taskid={}",taskId);}return task;}

2.6 消费任务

1、删除list中的数据
2、使用updateDB删除任务表、跟新日志表

在TaskService中添加方法

/*** 按照类型和优先级来拉取任务* @param type* @param priority* @return*/
public Task poll(int type,int priority);

实现

/*** 按照类型和优先级拉取任务* @return*/
@Override
public Task poll(int type,int priority) {Task task = null;try {String key = type+"_"+priority;String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error("poll task exception");}return task;
}

2.7 未来定时任务更新-reids管道

减少与redis的交互次数
1、在引导类中添加开启任务调度注解:@EnableScheduling
2、在service中添加定时任务 @Scheduled(cron = “0 */1 * * * ?”),每分钟一次

@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}
}
public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {@Nullable@Overridepublic Object doInRedis(RedisConnection redisConnection) throws DataAccessException {StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;String[] strings = values.toArray(new String[values.size()]);stringRedisConnection.rPush(topic_key,strings);stringRedisConnection.zRem(future_key,strings);return null;}});return objects;}

总结

1、使用rebbitmq使用的场景是在支付和订单微服务中,用于实现消息可以延迟30分钟付款的功能。并借用该中间件的插件实现支付的异步下单功能,并可以快速处理前几分钟,防止消息堆积
2、使用redis是基于zset的去重和排序功能,相当于将一定数据的保存在数据库,使用定时任务同步数据库符合五分钟的任务到zset中,然后,在在zest中定时更新可以运行的任务到list集合中,相当于实现了延迟功能和缓存功能。
3、第二种还可以扩展为将rabbitmq中等待时间较长的数据存到redis中,然后定时的去同步redis中的数据到数据库中,防止消息堆积。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/192985.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

应用协议安全:Rsync-common 未授权访问.

应用协议安全&#xff1a;Rsync-common 未授权访问. Rsync 是 Linux 下一款数据备份工具&#xff0c;支持通过 rsync 协议、ssh 协议进行远程文件传输。其中 rsync 协议默认监听 873 端口&#xff0c;如果目标开启了 rsync 服务&#xff0c;并且没有配置 ACL 或访问密码&#…

delete 与 truncate 命令的区别

直接去看原文 原文链接:【SQL】delete 与 truncate 命令的区别_truncate和delete的区别-CSDN博客 -------------------------------------------------------------------------------------------------------------------------------- 1. 相同点 二者都能删除表中的数据…

uniapp插件开发

安装android studio&#xff1a;安装目录下bin下的此文件&#xff0c;是用来修改分配给android studio的占用内存。 Android 11足够用。 创建新项目&#xff1a; 目录结构介绍&#xff1a; UI组件介绍&#xff1a;在设计程序界面时可以使用可视化拖拽的方式&#xff0c;没有必要…

【Apifox】国产测试工具雄起

在开发过程中&#xff0c;我们总是避免不了进行接口的测试&#xff0c; 而相比手动敲测试代码&#xff0c;使用测试工具进行测试更为便捷&#xff0c;高效 今天发现了一个非常好用的接口测试工具Apifox 相比于Postman&#xff0c;他还拥有一个非常nb的功能&#xff0c; 在接…

Maven 的 spring-boot-maven-plugin 红色报错

1、想要处理此情况&#xff0c;在工具下面加上指定的版本号。 2、给自己的maven的setting文件加工一下。 <mirrors><!--阿里云镜像1--><mirror><id>aliyunId</id><mirrorOf>central</mirrorOf><name>aliyun maven</name>…

数据存储和内存对齐

校内课复习笔记 非数值数据表示 在计算机中&#xff0c;只有01序列&#xff0c;这串01序列是什么意思&#xff0c;由人为定义。 西文字符 在ASCII码中&#xff0c;通过一个65的偏移量&#xff0c;使得一部分无符号数指向A-Za-z。 在C语言中&#xff0c;通过char类型的转换规范…

算法萌新闯力扣:同构字符串

力扣题&#xff1a;同构字符串 开篇 对于字符串相关的题目&#xff0c;哈希表经常会使用到&#xff0c;这道题更是如此&#xff0c;还用到了两个哈希表。拿下它&#xff0c;你对字符串题目的理解就会更上一层楼。 题目链接:205.同构字符串 题目描述 代码思路 看完题目后&a…

C语言--五子棋项目【图文详解 经典】

今天小编带领大家学一学C语言入门必写的五子棋项目&#xff0c;题目非常经典&#xff0c;值得一学。 目录 一.目标效果 二.五子棋的元素 1.棋子 2.棋盘 三,需要准备的工具 四.具体内容 1.加载背景图片 2.画横线与竖线 3. 画小黑点 4.获取鼠标消息 5.画棋子 6.如何判断比…

【ERROR】ERR_PNPM_NO_IMPORTER_MANIFEST_FOUND No package.json

1、报错 启动项目的时候&#xff0c;报这个错误&#xff0c;是因为根目录错误&#xff0c;查看&#xff0c;根目录是否错误。

Bobo Python 学习笔记

安装 Bobo 可以通过通常的方式安装&#xff0c;包括使用setup.py install 命令。当然&#xff0c;您可以使用Easy Install、Buildout或pip。 安装bobo Collecting boboDownloading bobo-2.4.0.tar.gz (17 kB) Collecting WebObDownloading WebOb-1.8.7-py2.py3-none-any.whl…

Python爬取股票交易数据代码示例及可视化展示。

文章目录 前言一、开发环境二、第三方模块三、爬虫案例步骤四、爬虫程序全部代码1.分析网页2.导入模块3.请求数据4.解析数据5.翻页6.保存数据 五、实现效果六、数据可视化全部代码1.导入数据2.读取数据3.可视化图表4.效果展示关于Python技术储备一、Python所有方向的学习路线二…

创建一个前后端分离项目:Vue+SpringBoot

这是一个基于SpringBootVue3的前后端分离的项目&#xff0c;麻雀虽小&#xff0c;五脏俱全&#xff0c;开箱即用&#xff01; 这先粗略描述一下它的前后端。JNPF开发平台的前端采用的是Vue.js&#xff0c;这是一种流行的前端JavaScript框架&#xff0c;用于构建用户界面。 后端…

第三方软件测试服务有哪些形式?选择时如何避雷?

高新技术的快速发展&#xff0c;人们对于软件产品越来越依赖&#xff0c;因此软件质量对于软件企业来说至关重要。产品质量的好坏需要通过检测才得知&#xff0c;软件企业为了获得更客观公正的检验结果&#xff0c;会将软件测试交由第三方软件测试服务机构进行?那么有哪些形式…

【milkv】0、duo编译环境搭建

一、开发资料整理 Docker https://hub.docker.com/repository/docker/dreamcmi/cv1800-docker/general GitHub https://github.com/milkv-duo/duo-buildroot-sdk CV181x/CV180x MMF SDK 开发文档汇总 https://developer.sophgo.com/thread/471.html cv181x芯片使用的交叉…

盘点72个ASP.NET Core源码Net爱好者不容错过

盘点72个ASP.NET Core源码Net爱好者不容错过 学习知识费力气&#xff0c;收集整理更不易。 知识付费甚欢喜&#xff0c;为咱码农谋福利。 链接&#xff1a;https://pan.baidu.com/s/1nlQLLly_TqGrs5O8eOmZjA?pwd8888 提取码&#xff1a;8888 项目名称 (Chinese) 物业收费…

【git】解决git报错:ssh:connect to host github.com port 22: Connection timed out 亲测有效

如题&#xff0c;git使用中突然报错 ssh:connect to host github.com port 22: Connection timed out 通过查阅各种资料&#xff0c;得知原因可能是由于电脑的防火墙或者其他网络原因导致ssh连接方式 端口22被封锁。 解决方法 一&#xff1a;抛弃ssh连接方式&#xff0c;使…

图解系列--密码

1.概念 _1.对称密码与公钥密码 对称密码是指在加密和解密时使用同一密钥的方式。 公钥密码则是指在加密和解密时使用不同密钥的方式。因此&#xff0c;公钥密码又称为非对称密码。 _2.混合密码系统 对称密码和公钥密码结合起来的密码方式 _3.散列值 散列值就是用单向散列函数计…

使用c++程序,实现图像平移变换,图像缩放、图像裁剪、图像对角线镜像以及图像的旋转

数字图像处理–实验三A图像的基本变换 实验内容 A实验&#xff1a; &#xff08;1&#xff09;使用VC设计程序&#xff1a;实现图像平移变换&#xff0c;图像缩放、图像裁剪、图像对角线镜像。 &#xff08;2&#xff09;使用VC设计程序&#xff1a;对一幅高度与宽度均相等的…

LOWORD, HIWORD, LOBYTE, HIBYTE的解释

文章目录 实验结论 实验 int 类型大小正常为4Byte 以小端序来看 0x12345678在内存中的存储为 0x78 0x56 0x34 0x120x78在低地址&#xff0c;0x12在高地址 程序输出 #include <stdio.h> #include <string.h> #include<windows.h>int main() {int a 0x12345…