Spring Boot 集成 Redisson 实现消息队列

包含组件内容

  • RedisQueue:消息队列监听标识
  • RedisQueueInit:Redis队列监听器
  • RedisQueueListener:Redis消息队列监听实现
  • RedisQueueService:Redis消息队列服务工具

代码实现

RedisQueue

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** Redis消息队列注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisQueue {/*** 队列名*/String value();
}

RedisQueueInit

import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** 初始化Redis队列监听器** @author 十八* @createTime 2024-09-09 22:49*/
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {final AtomicBoolean shutdownRequested = new AtomicBoolean(false);@Resourceprivate RedissonClient redissonClient;private ExecutorService executorService;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisQueueListener> map = applicationContext.getBeansOfType(RedisQueueListener.class);executorService = createThreadPool("redis-queue");for (Map.Entry<String, RedisQueueListener> entry : map.entrySet()) {RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);if (redisQueue != null) {String queueName = redisQueue.value();executorService.submit(() -> listenQueue(queueName, entry.getValue()));}}}private ExecutorService createThreadPool(String namePrefix) {return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),new NamedThreadFactory(namePrefix));}private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);log.info("Redis队列监听: {}", queueName);while (true) {if (shutdownRequested.get() || redissonClient.isShutdown()) {log.info("Redisson已关闭,停止监听队列: {}", queueName);break;}try {Object message = blockingQueue.take();redisQueueListener.invoke(message);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("监听队列线程被中断", e);break;} catch (Exception e) {log.error("监听队列线程错误", e);}}}public void shutdown() {if (executorService != null) {executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}shutdownRequested.set(true);if (redissonClient != null && !redissonClient.isShuttingDown()) {redissonClient.shutdown();}}private static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String prefix) {this.namePrefix = prefix;}@Overridepublic Thread newThread(@NotNull Runnable r) {return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());}}
}

RedisQueueListener

/*** Redis消息队列监听实现** @author 十八* @createTime 2024-09-09 22:51*/
public interface RedisQueueListener<T> {/*** 队列消费方法** @param content 消息内容*/void invoke(T content);
}

RedisQueueService

import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;/*** Redis 消息队列服务** @author 十八* @createTime 2024-09-09 22:52*/
@Component
public class RedisQueueService {@Resourceprivate RedissonClient redissonClient;/*** 添加队列** @param queueName 队列名称* @param content   消息* @param <T>       泛型*/public <T> void send(String queueName, T content) {RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(queueName);blockingQueue.add(content);}/*** 添加延迟队列** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param timeUnit  单位* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, timeUnit);}/*** 发送延迟队列消息(单位毫秒)** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS);}
}

测试

创建监听对象

import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 十八* @createTime 2024-09-10 00:09*/
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {@Overridepublic void invoke(String content) {log.info("队列消息接收 >>> {}", content);}
}

测试用例

import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 十八* @createTime 2024-09-10 00:11*/
@RestController
@RequestMapping("queue")
public class QueueController {@Resourceprivate RedisQueueService redisQueueService;@PostMapping("send")public void send(String message) {redisQueueService.send("test", message);redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000);}}

测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/420163.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

原生 iOS 引入 Flutter 报错 kernel_blob.bin 找不到

情况 在一次原生 iOS 项目中引入 Flutter 的过程中&#xff0c;在模拟器中运行出现报错&#xff1a; 未能打开文件“kernel_blob.bin”&#xff0c;因为它不存在。 如下图&#xff1a; 模拟器中一片黑 原因&解决方案 这个是因为 Flutter 的打包 iOS framework 命令中…

OCR技术视角:智能文档管理中的票据自动化识别与处理

在数字化转型的浪潮中&#xff0c;企业对于高效、自动化的文档管理需求日益增长。票据作为企业运营中不可或缺的部分&#xff0c;其识别与管理的智能化成为了提升工作效率的关键。本文将深入探讨智能文档系统中票据识别功能的原理、技术优势以及在不同行业中的应用实践&#xf…

Java、python、php、node.js版 铁路售票自动选座系统 高铁购票系统 火车订票平台(源码、调试、LW、开题、PPT)

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人 八年开发经验&#xff0c;擅长Java、Python、PHP、.NET、Node.js、Android、微信小程序、爬虫、大数据、机器学习等&#xff0c;大家有这一块的问题可以一起交流&…

Mac无法安装软件怎么解决?mac安装软件提示无法验证开发者怎么办

在使用 macOS 系统时&#xff0c;你可能会遇到一个常见的问题&#xff1a;当你尝试安装或打开某些应用程序时&#xff0c;系统会弹出一个警告&#xff0c;提示“无法验证开发者”。出现这个提示导致自己无法去进行程序安装&#xff0c;接下来我们就来看看如何解决此问题的方法吧…

云计算实训43——部署k8s基础环境、配置内核模块、基本组件安装

一、前期系统环境准备 1、关闭防火墙与selinux [rootk8s-master ~]# systemctl stop firewalld[rootk8s-master ~]# systemctl disable firewalldRemoved symlink /etc/systemd/system/multi-user.target.wants/firewalld.service. Removed symlink /etc/systemd/system/dbus…

VuePress搭建个人博客(一键安装)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

【第26章】Spring Cloud之Sentinel适配API Gateway

文章目录 前言一、准备1. 引入库2. 注册过滤器3. 添加配置4. 效果展示 二、基于网关的流控1. 新增流控规则2. 测试准备3. 测试结果 总结 前言 Sentinel从1.6.0 版本开始&#xff0c;Sentinel 提供了 Spring Cloud Gateway 的适配模块&#xff0c;可以提供两种资源维度的限流&a…

Django + websocket 连不上

看了网上的几个简单例子&#xff0c;一步一步做&#xff0c;但无一成功。都连不上websocket。 后来按一个视频教程的操作步骤来做&#xff0c;成功了。差别在于视频教程中加了 pip install daphne 和setting.py中 连不上的表现&#xff1a; 前端报错&#xff1a; WebSock…

Linux网络协议栈的实现

网络协议栈是操作系统核心的一个重要组成部分&#xff0c;负责管理网络通信中的数据包处理。在 Linux 操作系统中&#xff0c;网络协议栈&#xff08;Network Stack&#xff09;负责实现 TCP/IP 协议簇&#xff0c;处理应用程序发起的网络请求并与底层的网络硬件进行交互。本文…

[SWPUCTF 2022 新生赛]

目录 [SWPUCTF 2022 新生赛]ez_rce 什么是poc&#xff1f; [SWPUCTF 2022 新生赛]where_am_i [SWPUCTF 2022 新生赛]js_sign [SWPUCTF 2022 新生赛]xff ​[SWPUCTF 2022 新生赛]numgame call_user_func()函数 ::双冒号运算符 [SWPUCTF 2022 新生赛]ez_sql [SWPUCTF 2…

TortoiseGit无法安装解决方案

Win11安装TortoiseGit报错&#xff0c;错误码&#xff1a;2503&#xff0c;如下图&#xff1a; 开始-右键-Windows PowerShell&#xff08;管理员&#xff09;/终端 (管理员) 输入 msiexec /package 安装程序所在绝对路径&#xff0c; 例如 : msiexec /package D:\我的资料…

jenkins 部署应用到多个环境

在日常开发的过程中&#xff0c;我们经常会遇到将应用程序部署到多个环境的需求场景&#xff0c;如会先发布到测试环境&#xff0c;由测试人员进行测试&#xff0c;成功之后&#xff0c;会继续将当前应用部署到集成环境&#xff0c;进行集成测试&#xff0c;全部通过后&#xf…

RAG 聊天机器人:用 Langchain 和 Streamlit开启与 PDF 的智能对话

与大量 PDF 文档的交互如今变得前所未有地便捷与智能。想象一下,您可以轻松与您的笔记、书籍和各种文档进行无缝对话,不再需要繁琐的手动查找和处理。 这篇文章将带您逐步构建一个基于 Multi-RAG 和 Streamlit 的 Web 应用程序,该应用程序通过 AI 驱动的聊天机器人来读取、…

以实时,见未来——DolphinDB 2024 年度峰会圆满举办

2024年9月6日&#xff0c;“以实时&#xff0c;见未来”—— DolphinDB 2024 年度峰会在杭州圆满落下帷幕。本次峰会由主会场与三个专题分会场组成&#xff0c;众多金融机构领导与专家、行业领袖、高校与研究机构学者等近300位嘉宾共襄盛举&#xff0c;一同探讨数智化浪潮下金融…

基于JAVA+SpringBoot+Vue的工程教育认证的计算机课程管理平台

基于JAVASpringBootVue的工程教育认证的计算机课程管理平台 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末附源码下载链接…

第三届人工智能与智能信息处理国际学术会议(AIIIP 2024)

目录 大会介绍 基本信息 合作单位 主讲嘉宾 会议组委 征文主题 ​ 参会方式 会议日程 中国-天津 | 2024年10月25-27日 | 会议官网&#xff1a;www.iiip.net 大会介绍 第三届人工智能与智能信息处理国际学术会议&#xff08;AIIIP 2024&#xff09;将于202…

MNIST数据集内容查看

测试数据集&#xff1a;t10k-images-idx3-ubyte.gz&#xff08;1.57 MB &#xff0c;包含10,000个样本&#xff09;。测试数据集标签&#xff1a;t10k-labels-idx1-ubyte.gz&#xff08;4.43 KB&#xff0c;包含10,000个样本的标签&#xff09;训练数据集&#xff1a;train-ima…

使用 Parallel 类进行多线程编码(上)

用 C# 进行多线程编程有很多方式&#xff0c;比如使用 Thread 对象开启一个新线程&#xff0c;但这已经是一种落后的写法了&#xff0c;现在推荐的写法是使用 Parallel 类&#xff0c;它可以让我们像写传统代码一样编写多线程的程序&#xff0c;Parallel 类有三个常用的方法如下…

PyTorch 创建数据集

图片数据和标签数据准备 1.本文所用图片数据在同级文件夹中 ,文件路径为train/’ 2.标签数据在同级文件&#xff0c;文件路径为train.csv 3。将标签数据提取 train_csvpd.read_csv(train.csv)创建继承类 第一步&#xff0c;首先创建数据类对象 此时可以想象为单个数据单元的…

在这12种场景下会使Spring事务失效--注意防范

在某些业务场景下&#xff0c;如果一个请求中&#xff0c;需要同事写入多张表的数据&#xff0c;但为了保证操作的原子性&#xff08;要么同事插入数据成功&#xff0c;要么同事插入失败&#xff09;&#xff0c;例如&#xff0c;当我们创建用户的时候&#xff0c;往往会给用户…