基于 Redis Stream 实现消息队列功能

好长时间没更新了。。。。。。

背景:举个例子在某个接口执行完成后只需要前半段返回结果,后半段可能是日志记录、下游系统调用等功能的情况下,将耗时的消息进行异步发送就显得很有必要,这时就有很多种选择,单体项目甚至可以选择自定义线程池+DelayQueue 这种操作去进行异步操作,而大多数人会在第一时间想到消息丢列,但是消息引入消息队列这件事对于一个并发量不大、后半段消息允许失败的情况单独引入一个中间件对系统的开发维护难度都会提升一个等级,所以我就想到应用 Redis Stream 这种方式来实现异步任务的执行

废话不多说,直接上代码

@Component
@Slf4j
@RequiredArgsConstructor
public class RedisCountdownTaskProducer implements CountdownStrategy {private final StringRedisTemplate stringRedisTemplate;@Overridepublic void startCountdown(long duration, Runnable onFinish, String userId, String taskId) {log.info("使用redis stream 的延时任务开始执行 userId:{},taskId:{}",userId,taskId);Map<String,String> producerMap = Map.of("userId",userId,"taskId",taskId,"duration",String.valueOf(duration));// 发送延迟消费信息  topic: pickUpTheLightRecordstringRedisTemplate.opsForStream().add("streamKey", producerMap);}@Overridepublic void cancelCountdown(String userId, String taskId) {}
}

以我的应用场景为例,大家可以忽略这个继承的 CountdownStrategy 的接口,我这是用策略模式来实现多种方式的动态切换

最核心的代码就是一行 stringRedisTemplate.opsForStream().add(“pickUpTheLightRecord”, producerMap);
应用 redis 提供的 stream 功能,直接发送你的 topic 和你的 key(这个 key 可以是你的某个实体,某个信息,或者说某种标识,以便后续取出的时候可以知道自己要进行什么操作)

@Slf4j
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {private final RedisConnectionFactory redisConnectionFactory;private final RedisCountdownTaskConsumer redisCountdownTaskConsumer;@Beanpublic ExecutorService asyncStreamConsumer() {AtomicInteger index = new AtomicInteger();int processors = Runtime.getRuntime().availableProcessors();return new ThreadPoolExecutor(processors,processors + processors >> 1,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),runnable -> {Thread thread = new Thread(runnable);thread.setName("stream_consumer_countdown_task_" + index.incrementAndGet());thread.setDaemon(true);return thread;});}@Bean(initMethod = "start", destroyMethod = "stop")public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(ExecutorService asyncStreamConsumer) {// 创建消费者组String consumerGroup = "your_comsumer_group";String streamKey = "streamKey"; // 与生产者topic一致try {redisConnectionFactory.getConnection().xGroupCreate(streamKey.getBytes(), consumerGroup, ReadOffset.from("0-0"), true);} catch (Exception e) {// 捕获异常,检查是否是因为消费者组已存在导致的错误if (e.getMessage().contains("BUSYGROUP")) {// 如果消费者组已存在,则复用现有的消费者组log.warn("消费者组已存在,复用现有的消费者组: {}", consumerGroup);} else {// 如果是其他错误,则记录日志log.warn("消费者组创建失败: {}", e.getMessage());}}StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多获取多少条消息.batchSize(10)// 执行从 Stream 拉取到消息的任务流程.executor(asyncStreamConsumer)// 如果没有拉取到消息,需要阻塞的时间。不能大于 ${spring.data.redis.timeout},否则会超时.pollTimeout(Duration.ofSeconds(3)).build();StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =StreamMessageListenerContainer.create(redisConnectionFactory, options);streamMessageListenerContainer.receiveAutoAck(Consumer.from(consumerGroup,"countdownTaskConsumer"),StreamOffset.create(streamKey, ReadOffset.lastConsumed()),redisCountdownTaskConsumer);return streamMessageListenerContainer;}
}

以上是对消费者的配置,配置过后我们通过 Bean 的形式直接注入 Spring 容器,方便在应用启动时它可以自动创建,应用结束时可以自动销毁,避免资源浪费

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisCountdownTaskConsumer implements StreamListener<String, MapRecord<String, String, String>> {private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;private final TaskCommonPomodoroTechniqueMapper taskCommonPomodoroTechniqueMapper;private final StringRedisTemplate stringRedisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {log.info("receive message:{},倒计时任务开始执行 userId:{},taskId:{}",message);// 执行倒计时任务String stream = message.getStream();RecordId id = message.getId();String consumerGroup = "pickUpTheLightRecord_consumer_group";//消息幂等处理if(!messageQueueIdempotentHandler.isMessageProcessed(id.toString())){//判断当前消息是否执行完成if(messageQueueIdempotentHandler.isAccomplish(id.toString())){log.info("消息已处理完成: {}", id);stringRedisTemplate.opsForStream().acknowledge(stream, consumerGroup, id); // 显式ACreturn;}throw new RuntimeException("消息未完成流程,选择消息队列重试");}try {Map<String, String> value = message.getValue();String userId = value.get("userId");String taskId = value.get("taskId");LambdaUpdateWrapper<TaskCommonPomodoroTechnique> wrapper = new LambdaUpdateWrapper<>();wrapper.eq(TaskCommonPomodoroTechnique::getUserId,userId);wrapper.eq(TaskCommonPomodoroTechnique::getId,taskId);TaskCommonPomodoroTechnique taskCommonPomodoroTechnique = taskCommonPomodoroTechniqueMapper.selectOne(wrapper);if(taskCommonPomodoroTechnique == null){log.error("倒计时任务不存在,userId:{},taskId:{}",userId,taskId);return;}taskCommonPomodoroTechnique.setCompletionTimes(taskCommonPomodoroTechnique.getCompletionTimes()+1);taskCommonPomodoroTechniqueMapper.updateById(taskCommonPomodoroTechnique);log.info("倒计时任务执行成功,userId:{},taskId:{}",userId,taskId);stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());}catch (Throwable e){messageQueueIdempotentHandler.deleteMessageProcessed(id.toString());log.error("记录倒计时任务异常",e);throw e;}messageQueueIdempotentHandler.setAccomplish(id.toString());}
}

最后我们来实现具体的消费者,直接继承 StreamListener 这个类,重写 onMessage 方法,在这个方法中定义你要执行的业务逻辑(大家可以忽略幂等的处理,这个不是讲解的重点)

这样就可以使用基于 Redis Stream 的消息丢列啦

总的来说和 RocketMQ 的使用差不多,但是多了一些配置的过程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/34202.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue.js 中 class 和 style 绑定的全面解析

目录 引言 6.1 v-bind 指令 介绍 使用方法 6.2 绑定 HTML class 介绍 用法 6.3 绑定内联样式 介绍 用法 6.4 实战&#xff1a;制作消息提示框 介绍 用法 总结 引言 在Vue.js构建用户界面的宏伟蓝图里&#xff0c;样式的动态呈现与交互性的完美融合是吸引用户目光…

【红黑树】—— 我与C++的不解之缘(二十五)

前言 学习了avl树&#xff0c;现在来学习红黑树。 一、什么是红黑树 红黑树是一颗平衡二叉搜索树&#xff0c;它每一个节点增加了一个存储位表示节点的颜色&#xff0c;可以是红色或者黑色。 相比较于AVL树&#xff0c;红黑树也是一个自平衡二叉搜索树&#xff0c;但是它与AVL树…

SFT数据处理部分的思考

SFT数据及处理的业内共识 1&#xff0e;prompt的质量和多样性远重要于数据量级&#xff0c;微调一个 30 b 量级的base model只需要 10 w 量级的数据即可 参考&#xff1a;《LIMA&#xff1a;Less Is More for Alignment》 2&#xff0e;合成数据很重要&#xff01;一般需要通过…

Python(学习一)

做网站有成熟的框架像FLASK、DJANGO、TORNADO&#xff0c;写爬虫有好用到哭的REQUESTS&#xff0c;还有强大到没盆友的SCRAPY 随着NUMPY、SCIPY、MATLOTLIB等众多第三方模块的开发和完善&#xff0c;不仅支持py支持各种数学运算&#xff0c;还可以绘制高质量的2D和3D图像&…

ArcGIS Pro将有文字标注底图切换为无标注底图(在线地图图源)

今天介绍一下在ArcGIS Pro将有标注的地形底图换成无标注的底图。 大家在这项目底图时候会经常调用ArcGIS Pro自带的地形图&#xff0c;但是这个地形图自带是有注记的&#xff0c;如下图。 如何更改&#xff0c;才可以调用无文字注记的呢&#xff1f; 对于一个已经切好图的有注记…

Linux第三次练习

1、创建根目录结构中的所有的普通文件 首先在根目录下面新创建一个test目录&#xff0c;然后将查找到的普通文件新建到test目录下 2、列出所有账号的账号名 3、将/etc/passwd中内容按照冒号隔开的第三个字符从大到小排序后输出所有内容 4、列出/etc/passwd中的第20行-25行内容…

[CISCN 2022 初赛]ezpop(没成功复现)

打开在线环境可以看到&#xff1a; 记得之前做过一个类似的就是有点像照着漏洞去复现。应该可以直接在网上找到链子去打。 www.zip查看路由是 Index/test&#xff0c;然后 post 传参 a&#xff1a; exp&#xff08;参考了别的大神的wp&#xff09;&#xff1a; <?php //…

技术-NBIOT

是什么&#xff1f; 窄带物联网&#xff08;Narrow Band Internet of Things, NB-IoT&#xff09;成为万物互联网络的一个重要分支支持低功耗设备在广域网的蜂窝数据连接&#xff0c;也被叫作低功耗广域网(LPWAN)NB-IoT支持待机时间长、对网络连接要求较高设备的高效连接NB-Io…

Spring @Bean注解使用场景二

bean:最近在写一篇让Successfactors顾问都能搞明白的sso的逻辑的文章&#xff0c;所以一致在研究IAS的saml2.0的协议&#xff0c;希望用代码去解释SP、idp的一些概念&#xff0c;让顾问了解SSO与saml的关系&#xff0c;在github找代码的时候发现一些代码的调用关系很难理解&…

pip install和conda install的区别

这里写目录标题 一、什么是 Python 依赖&#xff08;Python Dependencies&#xff09;&#xff1f;1. 依赖的作用2. 如何管理 Python 依赖3. 依赖管理问题4. 依赖锁定总结 二、使用pip安装包venv隔离环境方法 1&#xff1a;使用 venv&#xff08;推荐&#xff09;创建虚拟环境激…

R语言高效数据处理-自定义EXCEL数据排版

注&#xff1a;以下代码均为实际数据处理中的笔记摘录&#xff0c;所以很零散 1、自定义excel表数据输出格式、布局 在实际数据处理中为了提升效率&#xff0c;将Excel报表交付给需求方时减少手动调整的环节很有必要 #1.1设置表头格式 header_style <- createStyle(font…

Word 小黑第4套

对应大猫41 上下日期是一起变动的&#xff0c;删掉第一个&#xff0c;第二个日期格式&#xff08;文件 -选项 -自定义功能区 -选上开发工具&#xff09; 点开发工具 -属性 选择相应的日期格式&#xff09; 修改标题样式时&#xff0c;标题三只有点标题二时才会显示 右击正文样…

酒店宾馆IPTV数字电视系统:创新宾客体验,引领智慧服务新潮流

酒店宾馆IPTV数字电视系统&#xff1a;创新宾客体验&#xff0c;引领智慧服务新潮流 北京海特伟业科技有限公司任洪卓于2025年3月15日发布 随着智慧酒店的不断发展&#xff0c;宾客对于酒店内的娱乐和信息服务需求日益多样化&#xff0c;传统的电视服务已难以满足现代宾客的高…

jupyter无法转换为PDF,HTMLnbconvert failed: Pandoc wasn‘t found.

无法转为PDF 手动下载工具 https://github.com/jgm/pandoc/releases/tag/3.6.3 似乎跟我想的不大一样&#xff0c;还有新的报错 https://nbconvert.readthedocs.io/en/latest/install.html#installing-tex 不知道下的啥玩意儿 sudo apt-get install texlive-xetex texlive-fon…

如何在 VS编译器上使用 C99规定的变长数组------使用Clang工具

VS编译器默认处理代码的工具是 MSVC&#xff0c;而MSVC工具是无法处理变长数组的&#xff0c;这个时候我们就要换一个处理代码的工具了----Clang 1 int n 9; 2 int arr[n];// 数组长度可以拟定1.打开 Visual Stdudio Intaller 2.点击修改&#xff0c;鼠标下滑找到>>使用…

vue echarts封装使用

echarts 尺寸自动调节 resize.js 柱状图 components/dashboard/lineChart.vue <template><div :class"className" :style"{height:height,width:width}" /> </template><script> import echarts from echarts require(echarts/…

《计算机图形学》第二课笔记-----二维变换的推导

前言&#xff1a;为什么这么突兀的把这一节内容放在了第二课&#xff0c;第一是因为我急于求成&#xff0c;第二是因为这一章节太重要了&#xff0c;这几乎是二维三维变换的最核心的东西&#xff0c;理解了这一章节内容&#xff0c;后面的就会像打通了任督二脉一样&#xff0c;…

OTP单片机调试工具之—单线数据编码

OTP单片机调试工具在实现过程中离不开单线数据的传输&#xff0c;那么使用哪一种方式的数据编码会比较好呢&#xff1f; 我所了解的主要有以下三种&#xff1a; 1.UART&#xff08;串口&#xff09;&#xff0c;这种方式在单片机和pc之间进行传输都非常常见&#xff0c;效率比较…

背诵--2

DAY01 面向对象回顾、继承、抽象类 学习目标 能够写出类的继承格式public class 子类 extends 父类{}public class Cat extends Animal{} 能够说出继承的特点子类继承父类,就会自动拥有父类非私有的成员 能够说出子类调用父类的成员特点1.子类有使用子类自己的2.子类没有使用…

穷举vs暴搜vs深搜vs回溯vs剪枝刷题 + 总结

文章目录 全排列题解代码 子集题解代码 总结 全排列 题目链接 题解 1. 画一颗决策树 2. 全局变量&#xff1a; int[ ][ ] ret&#xff1a;用于存结果的二维数组 int[ ] path&#xff1a;用于存每次路径的答案 bool[ ] check&#xff1a;判断这个数是否已经用过&#xff0c;…