spring-boot 整合 redisson 实现延时队列(文末有彩蛋)

应用场景

通常在一些需要经历一段时间或者到达某个指定时间节点才会执行的功能,比如以下这些场景:

  • 订单超时提醒
  • 收货自动确认
  • 会议提醒
  • 代办事项提醒

为什么使用延时队列

对于数据量小且实时性要求不高的需求来说,最简单的方法就是定时扫描数据库。

但是,当数量达到数百万、上千万级别且时,定时扫库就显得非常低效且消耗资源,

甚至有些时间间隔小实时性要求高的情况,上一次扫描还没结束,下一次就又开始了,

这时候如果使用延时队列就会比较合适

延时队列的几种方式:

  • Quartz 定时任务实现扫库
  • DelayQueue JDK中提供了一组实现延迟队列的API
  • Redis sorted set
  • Redis 过期键监听回调
  • RabbitMQ 死信队列
  • RabbitMQ 基于插件实现延迟队列
  • Wheel 时间轮训算法

Redisson 实现延时队列

顾名思义 Redis son 就是 Redis 的儿子,举个栗子先:

1.引入 pom

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>${lastest.version}</version>
</dependency>

2.封装一个 RedissonQueue 类

@Service
public class RedissonQueue {public static final String QUEUE = "delayQueue";// 默认超时时间,30秒public static final Integer DEFAULT_TIMEOUT = 30;@Resourceprivate RedissonClient redissonClient;// 加入任务并设置到期时间public void offer(String taskId, Integer timeout) {RDelayedQueue<String> delayedQueue = delayedQueue();delayedQueue.offer(taskId, Objects.isNull(timeout) ? DEFAULT_TIMEOUT : timeout, TimeUnit.SECONDS);}// 移除任务public void remove(String taskId) {RDelayedQueue<String> delayedQueue = delayedQueue();delayedQueue.removeIf(messageId -> messageId.equals(taskId));}// 任务列表public RDelayedQueue<String> delayedQueue() {RBlockingDeque<String> blockingDeque = blockingDeque();return redissonClient.getDelayedQueue(blockingDeque);}public RBlockingDeque<String> blockingDeque() {return redissonClient.getBlockingDeque(QUEUE);}public boolean isShutdown() {return redissonClient.isShutdown();}public void shutdown() {redissonClient.shutdown();}}

3.交给 Spring 管理

@Slf4j
@Service
public class RedissonService implements ApplicationRunner {@Resourceprivate RedissonQueue redissonQueue;@Resource(name = "threadPoolTaskExecutor")private ThreadPoolTaskExecutor executor;@Overridepublic void run(ApplicationArguments args) {RBlockingDeque<String> blockingDeque = redissonQueue.blockingDeque();executor.execute(() -> {while (true) {if (redissonQueue.isShutdown()) {return;} else {String messageId = null;try {messageId = blockingDeque.take();} catch (InterruptedException e) {log.warn("RedissonConsumer error:{}", e.getMessage());}if (!Objects.isNull(messageId) && !messageId.isEmpty()) {log.warn("timeout messageId : {}", messageId);}}}});}// 初始化,启动服务就执行一次@PostConstructpublic void init() {redissonQueue.delayedQueue();}@PreDestroypublic void shutdown() {redissonQueue.shutdown();}}

4.测试接口

@Operation(summary = "添加任务", description = "添加任务")
@PostMapping
public ResponseEntity<?> add(@RequestParam(value = "taskId", required = false) String taskId,@RequestParam(value = "timeout", required = false) Integer timeout) {taskId = StringUtils.isEmpty(taskId) ? String.valueOf(snowflake.nextId()) : taskId;redissonQueue.offer(taskId, timeout);return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}@Operation(summary = "移除任务", description = "移除任务")
@DeleteMapping(value = "/{taskId}")
public ResponseEntity<?> remove(@PathVariable("taskId") String taskId) {redissonQueue.remove(taskId);return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

5.测试结果

添加10个任务

在这里插入图片描述

删除第1个任务

在这里插入图片描述

可以看到第一个任务删除后没有被执行(没有设置到期时间,默认为30秒到期)

在这里插入图片描述

实现原理

  • redisson_delay_queue_timeout:delayQueue,sorted set 数据类型,存放所有延迟任务,按延迟任务的到期时间戳(提交任务时间戳 +
    延迟时间)排序,所以列表最前面第一个元素就是整个延迟队列中最早被执行的任务。
  • redisson_delay_queue:delayQueue,list 数据类型,也是存放所有任务。
  • delayQueue,list 数据类型,被称为目标队列,这个里面存放的任务都是已经到延迟时间的,可以被消费者获取的任务,所以上面示例中
    RBlockingQueue 的 take 方法是从此目标队列中获取任务的。
  • redisson_delay_queue_channel:delayQueue,是一个 channel,用来通知客户端开启一个延迟任务
  • 生产者提交任务时将任务放到 redisson_delay_queue_timeout:delayQueue 中,提交任务的时间戳+延迟时间
  • 客户端会有一个延迟任务,这个延迟任务会向 Redis Server 发送一段 lua 脚本,Redis 执行 lua 脚本中的命令,此操作是原子性的

lua 脚本主要干两件事

  • 将到了延迟时间的任务从 redisson_delay_queue_timeout:delayQueue 中移除,存到 delayQueue 这个目标队列
  • 获取到 redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务的到期时间戳,发布到 redisson_delay_queue_channel:
    delayQueue channel 中

当客户端监听到 redisson_delay_queue_channel:delayQueue 这个 channel 的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到期时间任务的到期时间戳)当前时间戳
这个时间其实也就是 redisson_delay_queue_channel:delayQueue 中最早到期时间的任务的剩余的延迟时间。
一旦时间来到最早到期时间任务的到期时间戳,redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务已经到期,客户端的延迟任务也同时到期,
于是开始执行 lua 脚本操作,及时将到期任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期任务的到期时间戳到 channel
中,
如此循环运行下去,保证 redisson_delay_queue_timeout:delayQueue 中到期数据能及时放到目标队列中。
这里存在一个特殊情况,需要项目启动时就执行一次延时队列。因为由于没有客户端延迟任务的执行,
可能会出现 redisson_delay_queue_timeout:delayQueue 队列中有到期但是没有被放到目标队列的可能,启动就执行一次是为了保证到期的数据能被及时放到目标队列中。

结论

  • Redisson 方案理论上没有延迟,但当消息数量剧增,消费者消费缓慢这种情况下,可能会导致延迟任务消费的延迟。

  • 消息丢失问题 Redisson 方案最大程度上减轻消息丢失的可能性,因为所有任务都是存在 list 和 sorted set 两种数据类型中,Redis
    有持久化机制。除非整个 redis 集群宕机,可能丢失一小部分数据。

  • 广播任务问题,是不会出现的,因为每个客户端都是从同一个目标队列中获取任务。

Redisson 这种实现方案是比较合适且靠谱的,一般中小型项目建议用 Redisson 实现延迟队列,规模较大的项目直接上 MQ。

整合DEMO仓库地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/380432.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电机泵盖机器人打磨去毛刺,选德国进口高精度主轴

机器人打磨去毛刺该如何选择主轴呢&#xff1f;首先我们需要考虑的是工件的材质&#xff0c;电机泵盖通常使用铸铁、不锈钢、合金钢等金属材质&#xff0c;因此这类保持的硬度较高&#xff0c;一般会选择功率、扭矩较大的德国进口高精度主轴Kasite 4060 ER-S。 Kasite 4060 ER-…

【Espressif-ESP32S3】【VScode】安装【ESP-IDF】插件及相关工具链

一、ESP-IDF简介 二、VScode安装ESP-IDF插件 三、安装ESP-IDF、ESP-IDF-Tools以及相关工具链 四、测试例程&编译烧录 五、IDF常用指令 资料下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/15Q2rl2jpIaKfj5rATkYE6g?pwdGLNG 提取码&#xff1a;GLNG 一、ESP-…

浏览器缓存:强缓存与协商缓存实现原理有哪些?

1、强缓存&#xff1a;设置缓存时间的&#xff0c;那么在这个时间内浏览器向服务器发送请求更新数据&#xff0c;但是服务器会让其从缓存中获取数据。 可参考&#xff1a;彻底弄懂强缓存与协商缓存 - 简书 2、协商缓存每次都会向浏览器询问&#xff0c;那么是怎么询问的呢&…

「MQTT over QUIC」与「MQTT over TCP」与 「TCP 」通信测试报告

一、结论 在实车5G测试中「MQTT Over QUIC」整体表现优于「TCP」&#xff0c;可在系统架构升级时采用MQTT Over QUIC替换原有的TCP通讯&#xff1b;从实现原理上基于QUIC比基于TCP在弱网、网络抖动导致频繁重连场景延迟更低。 二、测试方案 网络类型&#xff1a;实车5G、实车…

【Apache Doris】周FAQ集锦:第 14 期

【Apache Doris】周FAQ集锦&#xff1a;第 14 期 SQL问题数据操作问题运维常见问题其它问题关于社区 欢迎查阅本周的 Apache Doris 社区 FAQ 栏目&#xff01; 在这个栏目中&#xff0c;每周将筛选社区反馈的热门问题和话题&#xff0c;重点回答并进行深入探讨。旨在为广大用户…

【VUE】v-if和v-for的优先级

v-if和v-for v-if 用来显示和隐藏元素 flag为true时&#xff0c;dom元素会被删除达到隐藏效果 <div class"boxIf" v-if"flag"></div>v-for用来进行遍历&#xff0c;可以遍历数字对象数组&#xff0c;会将整个元素遍历指定次数 <!-- 遍…

Django 执行原生SQL

在Django中&#xff0c;你可以使用Raw SQL queries来执行原生的SQL查询。这对于需要进行复杂查询或Django的ORM无法满足的查询非常有用。 1&#xff0c;添加模型 Test/app11/models.py from django.db import modelsclass Post(models.Model):title models.CharField(max_le…

前端Vue组件技术实践:构建自定义动态宫格菜单按钮组件

随着前端技术的不断发展&#xff0c;复杂度和开发难度也随之增加。传统的整体式开发方式已经难以满足现代前端应用的需求&#xff0c;特别是在业务场景复杂、产品迭代频繁的情况下。组件化开发作为一种有效的解决方案&#xff0c;通过拆分和组合独立的组件&#xff0c;实现了单…

【Nacos】Nacos服务注册与发现 心跳检测机制源码解析

在前两篇文章&#xff0c;介绍了springboot的自动配置原理&#xff0c;而nacos的服务注册就依赖自动配置原理。 Nacos Nacos核心功能点 服务注册 :Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务&#xff0c;提供自身的元数据&#xff0c;比如ip地址、端…

19.x86游戏实战-创建MFC动态链接库

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 工具下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1rEEJnt85npn7N38Ai0_F2Q?pwd6tw3 提…

HarmonyOS鸿蒙- 跳转系统应用能力

一、通过弹窗点击设置跳转系统应用能力 1、 自定义弹窗效果图 2、 自定义弹窗代码 import { common, Want } from kit.AbilityKit; import { BusinessError } from kit.BasicServicesKit;export function alertDialog() {AlertDialog.show({title: ,message: 当前功能依赖定位…

HarmonyOS ArkUi @CustomDialog 和promptAction.openCustomDialog踩坑以及如何选择

CustomDialog 内使用Link&#xff0c;如何正常使用 错误使用方式&#xff1a; 定义一个函数&#xff0c;在函数内使用弹窗&#xff0c;如下面代码showDialog&#xff1a; 这种使用方式&#xff0c;无法在自定义的CustomDialog内使用 Link&#xff0c;进行父子双向绑定&#x…

C++基础语法:STL之容器(4)--序列容器中的list(一)

前言 "打牢基础,万事不愁" .C的基础语法的学习 引入 序列容器的学习.以<C Prime Plus> 6th Edition(以下称"本书")内容理解 本书中容器内容不多只有几页.最好是有数据结构方面的知识积累,如果没有在学的同时补上. 序列容器回顾:序列容器内元素按严格…

css前端面试题

1.什么是css盒子模型&#xff1f; 盒子模型包含了元素内容&#xff08;content&#xff09;、内边距&#xff08;padding&#xff09;、边框&#xff08;border&#xff09;、外边距&#xff08;margin&#xff09;几个要素。 标准盒子模型和IE盒子模型的区别在于其对元素的w…

食家巷香豆烤馍:传统美味,唇齿留香

你是否在寻找一种能唤醒童年记忆的美食&#xff1f;是否又在渴望一种既能充饥又能享受的美味&#xff1f;那么&#xff0c;食家巷的香豆烤馍&#xff0c;一定能满足你的味蕾。 香豆烤馍&#xff0c;以优质的原料、精致的制作和独特的口味&#xff0c;让食家巷香豆烤馍在众…

[React 进阶系列] useSyncExternalStore hook

[React 进阶系列] useSyncExternalStore hook 前情提要&#xff0c;包括 yup 的实现在这里&#xff1a;yup 基础使用以及 jest 测试 简单的提一下&#xff0c;需要实现的功能是&#xff1a; yup schema 需要访问外部的 storage外部的 storage 是可变的React 内部也需要访问同…

数据库第二次作业

1.建立数据库 2.插入数据 3.完成查询 &#xff08;1&#xff09;、显示所有职工的基本信息。 &#xff08;2&#xff09;、查询所有职工所属部门的部门号&#xff0c;不显示重复的部门号。 &#xff08;3&#xff09;、求出所有职工的人数。 &#xff08;4&#xff09;、列…

iPhone手机上备忘录怎么设置字数显示

在日常生活和工作中&#xff0c;我经常会使用iPhone的备忘录功能来记录一些重要的想法、待办事项或临时笔记。备忘录的便捷性让我可以随时捕捉灵感&#xff0c;但有时候&#xff0c;我也会苦恼于不知道自己记录了多少内容&#xff0c;尤其是在需要控制字数的时候。 想象一下&a…

NAS新品“翻车”后,绿联科技要上市了

在消费电子市场回暖的东风中&#xff0c;又一消费电子知名企业登陆A股。 近日&#xff0c;深圳市绿联科技股份有限公司&#xff08;下称“绿联科技”&#xff09;开启申购&#xff0c;将在创业板上市。本次上市&#xff0c;绿联科技的发行价为21.21元/股&#xff0c;发行数量为…

鸿蒙OpenHarmony Native API【HiLog】

HiLog Overview Description: HiLog模块实现日志打印功能。 开发者可以通过使用这些接口实现日志相关功能&#xff0c;输出日志时可以指定日志类型、所属业务领域、日志TAG标识、日志级别等。 syscap SystemCapability.HiviewDFX.HiLog Since: 8 Summary Files File …