redis stream restTemplate消息监听队列框架搭建

整体思路

        1. pom增加redis依赖;

        2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;

        3. 将消息订阅bean及监听器注册到配置中;

1. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2. 消息监听器实现代码

package cn.thuniwhir.fileserver.redis;import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Description: TODO**/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);// 创建一个线程池private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());@Overridepublic void onMessage(MapRecord message) {// 异步处理消息threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));});}
}

3. redis订阅bean及监听器注册

package cn.thuniwhir.fileserver.redis;import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.stream.Collectors;/*** @Description: TODO**/
@Configuration
public class RedisMQConfig {@Autowiredprivate RedisMQListener redisMQListener;@Autowiredprivate RedisUtils redisUtils;private static RedisTemplate<Object, Object> redisTemplate;private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Beanpublic Subscription subscription(RedisConnectionFactory redisConnectionFactory) {if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);if (xInfoGroups.isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);} else {if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}}} else {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);streamMessageListenerContainer.start();return subscription;}}

4. 测试生产消息 消息监听成功

4.1 生产消息

@RequestMapping("/produceMessage")public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {String key = jsonObject.getString("key");String value = jsonObject.getString("value");MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));redisTemplate.opsForStream().add(mapRecord);System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());return formatResult(null);}

4.2 消息监听器监听消息到达 代码见第二节

4.3 测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/236546.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Go并发快速入门:Goroutine

Go并发&#xff1a;Goroutine 1.并发基础概念&#xff1a;进程、线程、协程 (1) 进程 可以比作食材加工的一系列动作 进程就是程序在操作系统中的一次执行过程&#xff0c;是由系统进行资源分配和调度的基本单位&#xff0c;进程是一个动态概念&#xff0c;是程序在执行过程…

力扣hot100 路径总和Ⅲ dfs 前缀和 一题双解 超全注释

Problem: 437. 路径总和 III 思路 树的遍历 DFS 一个朴素的做法是搜索以每个节点为根的&#xff08;往下的&#xff09;所有路径&#xff0c;并对路径总和为 targetSumtargetSumtargetSum 的路径进行累加统计。 使用 dfs1 来搜索所有节点&#xff0c;复杂度为 O(n)O(n)O(n)&am…

【hyperledger-fabric】使用couchDB

简介 本文章主要参考来自于官方文档使用CouchDB以及 https://www.bilibili.com/video/BV1Li4y1f7ex/?spm_id_frompageDriver&vd_source2c5f2831e1c63d3a20045b167ae044e6 B站视频&#xff0c;还是非常感谢up主提供了学习的思路。 为什么要使用couchDB&#xff1f; 原文…

Redis:原理速成+项目实战——Redis实战7(优惠券秒杀+细节解决超卖、一人一单问题)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位大四、研0学生&#xff0c;正在努力准备大四暑假的实习 &#x1f30c;上期文章&#xff1a;Redis&#xff1a;原理速成项目实战——Redis实战6&#xff08;封装缓存工具&#xff08;高级写法&#xff09;&&缓存总…

JS中垃圾数据是如何自动回收的

JS中垃圾数据是如何自动回收的 背景垃圾回收机制调用栈中的数据回收堆空间中数据回收垃圾回收器的工作流程副垃圾回收器主垃圾回收器 全停顿 背景 在JS栈和堆&#xff1a;数据是如何存储的一文中提到了 JavaScript 中的数据是如何存储的&#xff0c;并通过示例代码分析了原始数…

React Native 桥接组件封装原生组件属性

自定义属性可以让组件具备更多的灵活性&#xff0c;所以有必要在JS 层通过自定义属性动态传值。 一、添加原生组件属性 因为 ViewManager 管理了整个组件的行为&#xff0c;所以要新增组件属性也需要在这里面&#xff08;如 InfoViewManager&#xff09;进行定义。 1、在Inf…

中通快递批量查询方法

你是否经常需要处理大量的中通快递单号&#xff0c;却苦于一个个等待查询&#xff1f;现在&#xff0c;有了固乔快递查询助手&#xff0c;这个问题迎刃而解&#xff01;通过批量查询功能&#xff0c;你可以轻松管理、追踪你的中通快递单号&#xff0c;大大提高工作效率。 一、下…

H264码流进行RTP包封装

一.H264基本概念 H.264从框架结构上分为视频编码层&#xff08;VCL&#xff09;和网络抽象层&#xff08;NAL&#xff09;&#xff0c;VCL功能是进行视频编解码&#xff0c;包括运动补偿预测&#xff0c;变换编码和熵编码等功能&#xff1b;NAL用于采用适当的格式对VCL视频数据…

Google ads谷歌广告投放详细步骤与技巧

对于跨境电商、独立站运营的卖家来说&#xff0c;谷歌广告投放是必备的流量拓展来源&#xff0c;但是在投入运营之前&#xff0c;你需要完整了解谷歌广告投放详细步骤&#xff0c;以为你丝滑地进行有效投放做好基础&#xff0c;下面为大家整理具体的谷歌投放技巧与步骤&#xf…

HCIP OSPF实验

任务&#xff1a; 1.使用三种解决ospf不规则区域的方法 2.路由器5、6、7、8、15使用mgre 3.使用各种优化 4.全网可达 5.保证更新安全 6.使用地址为172.16.0.0/16合理划分 7.每个路由器都有环回 拓扑图&IP划分如下&#xff1a; 第一步&#xff0c;配置IP&环回地址…

chat-plus部署指南

目录 1.下载代码 2.启动 3.测试 1.下载代码 cd /optwget https://github.com/yangjian102621/chatgpt-plus/archive/refs/tags/v3.2.4.1.tar.gz 2.启动 cd /opt/chatgpt-plus-3.2.4.1/deploydocker-compose up -d 3.测试 管理员地址xxx:8080/admin 账号密码admin/admin1…

vivado 使用项目摘要、配置项目设置、仿真设置

使用项目摘要 Vivado IDE包括一个交互式项目摘要&#xff0c;可根据设计动态更新命令被运行&#xff0c;并且随着设计在设计流程中的进展。项目摘要包括概览选项卡和用户可配置的仪表板&#xff0c;如下图所示。有关信息&#xff0c;请参阅《Vivado Design Suite用户指南&…

goland报错:The selected directory is not a valid home for Go SDK

原因&#xff1a; IDEA / goland无法识别到GO语言SDK版本 解决办法&#xff1a; 打开GO的安装目录下的src\runtime\internal\sys\zversion.go文件&#xff0c;添加一行&#xff08;我的go版本是1.18.10&#xff09; const TheVersion go1.18.10 重启goland再选择试试 最后…

【设计模式-04】Factory工厂模式

简要描述 简单工厂静态工厂工厂方法 FactoryMethod 产品维度扩展 抽象工厂 产品一族进行扩展Spring IOC 一、工厂的定义 任何可以产生对象的方法或类&#xff0c;都可以称之为工厂单例也是一种工厂不可咬文嚼字&#xff0c;死扣概念为什么有了new之后&#xff0c;还要有工厂&am…

科技创新领航 ,安川运动控制器为工业自动化赋能助力

迈入工业4.0时代&#xff0c;工业自动化的不断发展&#xff0c;让高精度运动控制成为制造业高质量发展的重要技术手段。北京北成新控伺服技术有限公司作为一家集工业自动化产品销售、系统设计、开发、服务于一体的高新技术企业&#xff0c;其引进推出的运动控制产品一直以卓越的…

详解Oracle数据库的启动

Oracle数据库的启动&#xff0c;其概念可参考Overview of Instance and Database Startup。 其过程可参见下图&#xff1a; 当数据库从关闭状态进入打开数据库状态时&#xff0c;它会经历以下阶段。 阶段Mount状态描述1实例在没有挂载数据库的情况下启动实例已启动&#xff…

共融共生:智慧城市与智慧乡村的协调发展之路

随着科技的飞速发展和全球化的不断深入&#xff0c;智慧城市和智慧乡村作为现代社会发展的重要组成部分&#xff0c;正逐渐成为人们关注的焦点。然而&#xff0c;在追求经济发展的过程中&#xff0c;城乡发展不平衡的问题也日益凸显。因此&#xff0c;如何实现智慧城市与智慧乡…

完整的模型验证套路

读取图片 from PIL import Imageimg_path "../Yennefer_of_Vengerberg.jpg" image Image.open(img_path) print(image)转换成灰度图&#xff08;可选&#xff09; image image.convert(L) image.show()转换成RGB格式 image image.convert(RGB)因为png格式是四…

STM32F103RCT6开发板M3单片机教程07-TIMER1CH1输出 PWM做LED呼吸灯

概述 本教程使用是&#xff08;光明谷SUN_STM32mini开发板&#xff09; 免费开发板 在谷动谷力社区注册用户&#xff0c;打卡&#xff0c;发帖求助都可以获取积分&#xff0c;当然最主要是发原创应用文档奖励更多积分&#xff0e; (可用积分换取&#xff0c;真的不用钱&…

Goby 漏洞发布|用友 NC registerServlet 反序列化远程代码执行漏洞

漏洞名称&#xff1a;用友 NC registerServlet 反序列化远程代码执行漏洞 English Name&#xff1a;Yonyou NC registerServlet Deserialize Remote Code Execute Vulnerability CVSS core: 9.8 影响资产数&#xff1a; 21320 漏洞描述&#xff1a; 用友 NC Cloud 是一种商…