redisson的延时队列机制简述

概述

业务中经常会遇到一些延迟执行的需求;通常想到的都是rabbitmq或者rocketmq的延迟消息;
但是系统中不一定集成了mq,但为了控制分布式下的并发,一般redis都是有集成的;
rediskey过期监听那个时间不准确,在集群环境下节点挂了也容易丢失;

那么用redisson的延迟队列,正好可以用来解决轻量级的延时消息;
简单的来说就是消费者生产了一个消息任务,塞到ZSet里(用当前时间戳+延迟时间作为分数),等时间到了,就会放到任务List中,然后消费者真正去执行任务都是从任务List中获取任务;

redisson中的消费者并不是一直轮询获取任务;而是有具体时间的延迟任务,时间到了去任务队列中获取任务;

注意点,在消费者监听处如果使用thread相关操作因为redisson的默认线程nameredisson-netty会抛异常,我的处理方式是把相关操作都放到自己的线程池中操作.

官方解释是在netty线程中调用同步方法可能会导致超时;
issue:https://github.com/redisson/redisson/issues/3549

异常见源码

org.redisson.command.CommandAsyncService.get(org.redisson.api.RFuture<V>)

版本
redissonredisson-spring-boot-starter-3.17.6.jar
redis:6.2.7

redisson延时任务机制简述

生产者先将任务pushdelay_queue_timeout等待队列中,延迟时间到了,消费者会把任务从timeout队列挪到SANYOU任务队列中(消费者实际获取任务的队列),然后消费者就能拿到最终要执行的任务了;

这里具体要说的就是客户端通知和获取机制;
消费者在启动时通常都会去get一下队列,达到订阅队列的目的;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("SANYOU");
RDelayedQueue<String> delayQueue = redissonClient.getDelayedQueue(blockingQueue);

这样做的目的:
消费者订阅队列,从delay_queue_timeout等待延迟队列中将已经到达时间的任务挪到真正的任务List队列中,然后再将delay_queue_timeout队列中第一个(也就是第一个要执行的)的任务的时间拿到,用这个时间开启一个延迟任务,时间到了之后,会发布一个消息到时间通知channel中;然后客户端监听到这个channel中的消息后,会再次重复上述步骤,让delay_queue_timeout中的任务,可以都放到真正的任务List队列中;

这样有一个好处就是不用一直while扫描等待,客户端的延迟任务时间和delay_queue_timeout中的延迟时间是一样的,可以精准利用cpu,理论上是没有延迟的,但是实际消息数量大量增加,消费者消费比较慢,还是会造成延迟任务消费延迟;

另外由于客户端都是用lua脚本去redis的同一个List队列中获取任务,lua脚本在redis中都是原子任务,而且redis真正的操作是单线程的,所以不会存在任务广播情况(并发获取时,一个任务不会被多个消费者同时拿到);

捞一张图片
在这里插入图片描述

代码Demo


import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Component
public class RedissonDelayQueueConfig implements InitializingBean {@Resourceprivate RedissonClient redissonClient;//延时队列mapprivate final Map<String, RDelayedQueue<DelayMessageDTO>> delayQueueMap = new ConcurrentHashMap<>(16);/*** 消费者初始化所有队列,订阅对应的队列,并开启第一个过期任务的过期时间对应的延迟任务*/@PostConstructpublic void reScheduleDelayedTasks() {DelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueEnum.getCode());RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);}}@Overridepublic void afterPropertiesSet() {// 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumerDelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanName());if (delayQueueConsumer == null) {throw new ServiceException("queueName=" + queueEnum.getBeanName() + ",delayQueueConsumer=null,请检查配置...");}// Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,// 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。RBlockingQueue<DelayMessageDTO> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode());//消费者初始化队列RDelayedQueue<DelayMessageDTO> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);//set到map中方便获取delayQueueMap.put(queueEnum.getCode(), rDelayedQueue);// 订阅新元素的到来,调用的是takeAsync(),异步执行rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);}}public RedissonClient getRedissonClient() {return redissonClient;}public Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {return delayQueueMap;}
}import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class DelayQueueUtil {private static RedissonDelayQueueConfig redissonDelayQueueConfig;@Resourcepublic void setRedissonDelayQueueConfig(RedissonDelayQueueConfig redissonDelayQueueConfig) {DelayQueueUtil.redissonDelayQueueConfig = redissonDelayQueueConfig;}private static Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {if(null == redissonDelayQueueConfig) return Collections.emptyMap();return redissonDelayQueueConfig.getDelayQueueMap();}private static RedissonClient getRedissonClient() {if(null == redissonDelayQueueConfig) return null;return redissonDelayQueueConfig.getRedissonClient();}/*** 添加延迟消息*/public static void addDelayMessage(DelayMessageDTO delayMessage) {log.info("delayMessage={}", delayMessage);Assert.isTrue(getDelayQueueMap().containsKey(delayMessage.getQueueName()), "队列不存在");delayMessage.setCreateTime(DateUtil.now());if(null == delayMessage.getTimeUnit()){delayMessage.setTimeUnit(TimeUnit.SECONDS);}RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());//移除相同的消息rDelayedQueue.remove(delayMessage);//添加消息rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit());}/*** 移除指定队列中的消息*/public static void removeDelayMessage(DelayMessageDTO delayMessage) {log.info("取消:delayMessage={}", delayMessage);if (!getDelayQueueMap().containsKey(delayMessage.getQueueName())) {log.error("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());return;}RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());rDelayedQueue.remove(delayMessage);removeDelayQueue(delayMessage);}/*** 从所有队列中删除消息*/public static void removeDelayQueue(DelayMessageDTO value) {DelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {RBlockingDeque<Object> blockingDeque = getRedissonClient().getBlockingDeque(queueEnum.getCode());RDelayedQueue<Object> delayedQueue = getRedissonClient().getDelayedQueue(blockingDeque);delayedQueue.remove(value);}}}

参考了大佬的博文
https://lhalcyon.com/delay-task/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/244670.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

表白墙网站PHP源码,支持封装成APP

源码介绍 PHP表白墙网站源码&#xff0c;适用于校园内或校区间使用&#xff0c;同时支持封装成APP。告别使用QQ空间的表白墙。 简单安装&#xff0c;只需PHP版本5.6以上即可。 通过上传程序进行安装&#xff0c;并设置账号密码&#xff0c;登录后台后切换模板&#xff0c;适配…

rust使用protobuf

前言 c,java,go 等直接是用 &#xff0c;具体就不说了&#xff0c;这章主要讲述rust 使用protobuf 这章主要讲述2种 1 > protoc protoc-gen-rust plugin 2> protoc prost-build 1&#xff1a;环境 win10 rustrover64 25-2 下载地址 https://github.com/protocolbu…

构建中国人自己的私人GPT—与文档对话

先看效果 他可以从上传的文件中提取内容作为答案。上传文件摄取速度 摄取速度取决于您正在摄取的文档数量以及每个文档的大小。为了加快摄取速度&#xff0c;您可以在配置中更改摄取模式。 存在以下摄取模式&#xff1a; simple&#xff1a;历史行为&#xff0c;一次按顺序摄…

MySQL建表练习

练习题目&#xff1a;通过所提供的E-R图和数据库模型图完成库表的创建&#xff0c;并插入适量的数据.要求必须使用SQL命令进行构建。 已知如下&#xff1a; 1、创建客户信息表&#xff1a; 代码&#xff1a; CREATE DATABASE Bank; //建库CREATE TABLE Userinfo(Cust…

2024.1.23 GNSS 零散知识 学习笔记

1.天线种类 2.接收机 2.四大导航系统的介绍 3.卫星高度与轨道卫星种类 4.GNSS有哪些应用 5.在空间保持静⽌或匀速直线运动(⽆加速度)的坐标系称为惯性坐标系。 6.地⼼惯性坐标系实际上并没有满⾜能成为惯性坐标系的条件&#xff1a; ⾸先&#xff0c;地球及其质⼼都在围绕太阳…

K8S四层代理Service-02

Service的四种类型使用 ClusterIP使用示例Pod里使用service的服务名访问应用 NodePort使用示例 ExternalName使用示例 LoadBalancer K8S支持以下4种Service类型&#xff1a;ClusterIP、NodePort、ExternalName、LoadBalancer 以下是使用4种类型进行Service创建&#xff0c;应对…

MySQL45道练习题

作业需要数据表SQL语句已给 1. 查询" 01 "课程比" 02 "课程成绩高的学生的信息及课程分数 select * from Student RIGHT JOIN (select t1.SId, class1, class2 from(select SId, score as class1 from sc where sc.CId 01)as t1, (select SId, score as …

从开发、部署到维护:SAAS与源代码小程序的全流程对比

在数字化时代&#xff0c;小程序已成为企业开展业务的重要工具。然而&#xff0c;小程序开发过程中存在多种形式&#xff0c;其中SAAS版本小程序和源代码小程序是最常见的两种。乔拓云SaaS系统作为业界领先的SaaS服务平台&#xff0c;为企业提供高效、便捷的小程序解决方案。与…

01、领域驱动设计:微服务设计为什么要选择DDD总结

目录 1、前言 2、软件架构模式的演进 3、微服务设计和拆分的困境 4、为什么 DDD适合微服务 5、DDD与微服务的关系 6、总结 1、前言 我们知道&#xff0c;微服务设计过程中往往会面临边界如何划定的问题&#xff0c;不同的人会根据自己对微服务的理 解而拆分出不同的微服…

springboot119基于工程教育认证的计算机课程管理平台

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的基于工程教育认证的计算机课程管理平台 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图…

线程的同步和互斥学习笔记

目录 互斥锁的概念和使用 线程通信-互斥 互斥锁的创建和销毁 申请锁-pthread_mutex_lock 释放锁-pthread_mutex_unlock 读写锁的概念和使用 死锁的避免 互斥锁的概念和使用 线程通信-互斥 临界资源 一次只允许一个任务&#xff08;进程、线程&#xff09;访问的共享资…

ClickHouse与Doris数据库比较

概述 都说“实践是检验真理的唯一标准”&#xff0c;光说不练假把式&#xff0c;那么本文就通过实际的测试来感受一下Doris和clickhouse在读写方面的性能差距&#xff0c;看看Doris盛名之下&#xff0c;是否真有屠龙之技&#xff1b;clickhouse长锋出鞘&#xff0c;是否敢缚苍…

【GitHub项目推荐--不错的 Java 开源项目】【转载】

1 基于 Java 的沙盒塔防游戏 Mindustry 是一款用 Java 编写的沙盒塔防游戏。玩家需要建造精密的传送带供应链&#xff0c;提供炮塔弹药&#xff0c;生产建筑材料&#xff0c;保护建筑并抵御敌人。也可以在跨平台多人合作游戏中与朋友一起战斗&#xff0c;或组队进行 PVP 比赛。…

什么品牌洗地机最好?专业旗舰级洗地机推荐

作为一个打工族&#xff0c;很能理解大家对日常清洁繁琐的烦恼&#xff0c;尤其是在忙碌工作后难以有力气打扫卫生。这时候&#xff0c;洗地机就是解决问题的利器了。它不仅方便轻松&#xff0c;还能有效消菌杀毒&#xff0c;助力深度清洁。若你正在为选择哪款洗地机而烦恼&…

【Java IO】设计模式 (装饰者模式)

Java I/O 使用了装饰者模式来实现。 装饰者模式 请参考装饰者模式详解 装饰者(Decorator)和具体组件(ConcreteComponent)都继承自组件(Component)&#xff0c;具体组件的方法实现不需要依赖于其它对象&#xff0c;而装饰者组合了一个组件&#xff0c;这样它可以装饰其它装饰者…

初识汇编指令

1. ARM汇编指令 目的 认识汇编, 从而更好的进行C语言编程 RAM指令格式: 了解 4字节宽度 地址4字节对齐 方便寻址 1.1 指令码组成部分 : condition: 高4bit[31:28] 条件码 0-15 &#xff08;16个值 &#xff09; 条件码: 用于指令的 条件执行 , ARM指定绝大部分 都可…

【Midjourney】绘画风格关键词

1.松散素描(Loose Sketch) "Loose sketch"&#xff08;松散素描&#xff09;通常指的是一种艺术或设计中的手绘风格&#xff0c;其特点是线条和形状的表现相对宽松、自由&#xff0c;没有过多的细节和精确度。这样的素描通常用于表达创意、捕捉概念或者作为设计的初步…

视频监控平台EasyCVR增加fMP4流媒体视频格式及其应用场景介绍

近期我们在视频监控管理平台EasyCVR系统中新增了HTTP-FMP4播放协议&#xff0c;今天我们就来聊聊该协议的特点和应用。 fMP4&#xff08;Fragmented MPEG-4&#xff09;是基于MPEG-4 Part 12的流媒体格式&#xff0c;是流媒体的一项重要技术&#xff0c;因为它能通过互联网传送…

【论文阅读 SIGMOD18】Query-based Workload Forecasting for Self-Driving

Query-based Workload Forecasting for Self-Driving Database Management Systems My Summary ABSTRACT Autonomous DBMS的第一步就是能够建模并预测工作负载&#xff0c;以前的预测技术对查询的资源利用率进行建模。然而&#xff0c;当数据库的物理设计和硬件资源发生变化…

Unity SnapScrollRect 滚动 匹配 列表 整页

展示效果 原理: 当停止滑动时 判断Contet的horizontalNormalizedPosition 与子Item的缓存值 相减,并得到最小值&#xff0c;然后将Content horizontalNormalizedPosition滚动过去 使用方式&#xff1a; 直接将脚本挂到ScrollRect上 注意&#xff1a;在创建Content子物体时…