Spring boot 使用Redis 消息发布订阅

Spring boot 使用Redis 消息发布订阅

文章目录

  • Spring boot 使用Redis 消息发布订阅
    • Redis 消息发布订阅
        • Redis 发布订阅 命令
    • Spring boot 实现消息发布订阅
      • 发布消息
      • 消息监听
      • 主题订阅
    • Spring boot 监听 Key 过期事件
      • 消息监听
      • 主题订阅

最近在做请求风控的时候,在网上搜集了大量的解决方案,最后使用Redis 消息发布订阅 比较符合业务。做一下记录

img

Redis 消息发布订阅

img

Redis 发布订阅 命令:redis命令手册

1、Redis 中"pub/sub"的消息,为"即发即失",server 不会保存消息,如果 publish 的消息没有任何 client 处于 “subscribe” 状态,消息将会被丢弃;如果 client 在 subcribe 时,链接断开后重连,那在么此期间的消息也将丢失。

2、Redis server 将会"尽力"将消息发送给处于 subscribe 状态的 client,但是仍不会保证每条消息都能被正确接收。

**优点:**支持发布订阅,支持多组生产者、消费者处理消息

缺点:

  1. 消费者下线数据会丢失

  2. 不支持数据持久化,Redis宕机则数据也会丢失

  3. 消息堆积,缓存区溢出,消费者会被强制踢下线,数据也会丢失

Redis 发布订阅 命令
命令描述
Redis Unsubscribe 命令指退订给定的频道。
Redis Subscribe 命令订阅给定的一个或多个频道的信息。
Redis Pubsub 命令查看订阅与发布系统状态。
Redis Punsubscribe 命令退订所有给定模式的频道。
Redis Publish 命令将信息发送到指定的频道。
Redis Psubscribe 命令订阅一个或多个符合给定模式的频道。

Spring boot 实现消息发布订阅

1、引入 Redis 依赖

    <!--Spring Boot redis 启动器--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2、Redis 数据库配置

spring:data:redis:database: 0host: localhostport: 6379password:

发布消息

	/*** redis 将信息发送到指定的频道* @param topic   :消息所属的主题/频道* @param context :消息内容* @return*/redisTemplate.convertAndSend(topic, context);
@RequiredArgsConstructor
@Service
public class RequestRateLimiterService {private final RedisTemplate<String, Object> redisTemplate;// Redis 中的 key 前缀private static final String REDIS_KEY_PREFIX = "select_rate_limit:";// Redis 中的通道名称private static final String REDIS_CHANNEL = "select_rate_limit_channel";// 根据用户名 请求风控public boolean allowRequest(String username) {// 每分钟最大请求次数Long MAX_REQUESTS_PER_MINUTE = 60L;String key = REDIS_KEY_PREFIX + username;Long currentRequests = redisTemplate.opsForValue().increment(key);if (currentRequests != null && currentRequests > MAX_REQUESTS_PER_MINUTE) {redisTemplate.convertAndSend(REDIS_CHANNEL, username);return false; // 超过阈值,拒绝请求}if (currentRequests != null && currentRequests == 1) {redisTemplate.expire(key, 1, TimeUnit.MINUTES); // 设置过期时间为1分钟}return true; // 允许请求}}

消息监听

1、 Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理。

/*** Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理* <p>* 1、可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.* 2、自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。**/
@Component
public class RequestRateLimitSubscriber implements MessageListener {// 直接从容器中获取@Resourceprivate RedisTemplate<String, Object> redisTemplate;/*** 监听到的消息必须进行与发送时相同的方式进行反序列* 1、订阅端与发布端 Redis 序列化的方式必须相同,否则会乱码。** @param message :消息实体* @param pattern :匹配模式*/@Overridepublic void onMessage(Message message, byte[] pattern) {// 消息订阅的匹配规则,如 new PatternTopic("basic-*") 中的 basic-*String msgPattern = new String(pattern);// 消息所属的通道,可以根据不同的通道做不同的业务逻辑String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());// 接收的消息内容,可以根据自己需要强转为自己需要的对象,但最好先使用 instanceof 判断一下Object body = redisTemplate.getValueSerializer().deserialize(message.getBody());log.info("收到 Redis 订阅消息: channel={} body={} pattern={} ", channel, body, msgPattern);// 模拟数据处理 ********// 发送警告通知,可以通过邮件、短信等方式进行通知log.info("------------数据处理完成.......");}
}

主题订阅

1、自定义 RedisTemplate 序列化方式(发布者和订阅者必须相同)。

2、配置主题订阅 - Redis 消息监听器绑定监听指定通道。

/*** 自定义 RedisTemplate 序列化方式* 配置主题订阅 - Redis 消息监听器绑定监听指定通道*/
@Configuration
public class RedisConfig {// 自定义的消息订阅监听器,当收到阅订的消息时,会将消息交给这个类处理@Resourceprivate RequestRateLimitSubscriber requestRateLimitSubscriber;//  自定义 RedisTemplate 序列化方式   @Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setKeySerializer(RedisSerializer.string());// key 序列化规则redisTemplate.setHashKeySerializer(RedisSerializer.string());// hash key 序列化规则redisTemplate.setValueSerializer(RedisSerializer.java());// value 序列化规则redisTemplate.setHashValueSerializer(RedisSerializer.java()); // hash value 序列化规则redisTemplate.setConnectionFactory(factory); //绑定 RedisConnectionFactoryreturn redisTemplate; //返回设置好的 RedisTemplate}/*** 配置主题订阅* RedisMessageListenerContainer - Redis 消息监听器绑定监听指定通道* 1、可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。* 2、订阅的通道可以配置在全局配置文件中,也可以配置在数据库中,* <p>* addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定* addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定** @param connectionFactory* @return*/@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取container.setConnectionFactory(factory);// 订阅名称叫 select_rate_limit_channel 的通道, 类似 Redis 中的 subscribe 命令container.addMessageListener(requestRateLimitSubscriber, new ChannelTopic("*"));// 订阅名称以 'basic-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令container.addMessageListener(requestRateLimitSubscriber, new PatternTopic("*"));return container;}
}

Spring boot 监听 Key 过期事件

1、Redis 数据库可以通过命令设置 Key 的有效时间,当一个 Key 过期后会自动从数据库中删除,释放空间。得益于于这个特性,可以很轻松地实现诸多类似于 “Session” 管理、数据缓存等功能。它们都有一个共同点就是,数据不会永久保存!

2、在有些场景中,可能希望在某些 Key 过期的时候获取到通知,进行一些业务处理。或者是干脆用于 “定时通知/任务” 功能,例如:下单 30 分钟后未支付,则取消订单。那么可以在用户下单的时候使用订单号作为 key 设置到 Redis 数据库中,并且设置过期时间为 30 分钟。当超时后,可以在 “key 过期通知” 中获取到 key 也就是订单号,判断用户是否已经支付从而是否取消订单。

3、Redis 的 Key 过期通知功能本质上是通过 发布/订阅 功能实现的,所以它「不能保证通知消息的交付」,当 Key 过期时如果服务器停机、重启后则该通知消息会永久丢失。

消息监听

1、Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。

2、doHandleMessage 方法用于处理 Redis Key 过期通知事件,其中 Message 参数表示通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。

3、在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/*** Redis 缓存 Key 过期监听器* Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,* 自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。*/
@Component
public class KeyExpireListener extends KeyExpirationEventMessageListener {private static final Logger logger = LoggerFactory.getLogger(KeyExpireListener.class);/*** 通过构造函数注入 RedisMessageListenerContainer 给 KeyExpirationEventMessageListener** @param listenerContainer : Redis消息侦听器容器*/public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/*** doHandleMessage 方法用于处理 Redis Key 过期通知事件,* 在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」** @param message:通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。*/@Overridepublic void doHandleMessage(Message message) {// 过期的 keyString key = new String(message.getBody());// 消息通道String channel = new String(message.getChannel());logger.info("过期key={} 消息通道(channel)={}", key, channel);}
}

主题订阅

1、与上面稍微有点不同,因为 key 过期事件属于 Redis 内部消息,内部频道/通道,所以只需要往容器中注入 RedisMessageListenerContainer 就行,不需要 addMessageListener 手动设置监听器 监听指定的通道/频道(topic 表达式)。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisConfig {@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);//  container.setTaskExecutor(null);            // 设置用于执行监听器方法的 Executor//  container.setErrorHandler(null);            // 设置监听器方法执行过程中出现异常的处理器//  container.addMessageListener(null, null);   // 手动设置监听器 & 监听的 topic 表达式return container;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/212393.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ESP32-Web-Server编程- 在 Web 上开发动态纪念册

ESP32-Web-Server编程- 在 Web 上开发动态纪念册 概述 Web 有很多有趣的玩法&#xff0c;在打开网页的同时送她一个惊喜。 需求及功能解析 本节演示在 ESP32 上部署一个 Web&#xff0c;当打开对应的网页时&#xff0c;将运行动态的网页内容&#xff0c;显示炫酷的纪念贺词…

.NET 8 中 Android 资源生成的改进和变化

作者&#xff1a;Dean Ellis 排版&#xff1a;Alan Wang 随着 .NET 8 的发布&#xff0c;我们引入了一个新系统&#xff0c;用于生成访问 Android 资源的 C# 代码。 在 Xamarin.Android、.NET 6 和 .NET 7 中生成 Resource.designer.cs 文件的系统已经被弃用。 新系统生成一个名…

苍穹外卖+git开源

搁置了很久重新开始学 为了学习方便&#xff0c;苍穹外卖的前后端代码已放至git开源。前端源代码请看给i他-->sky-take-out: 苍穹外卖 git学习-->Git基础使用-CSDN博客 后端接口员工管理和分类管理模块 添加员工&#xff0c;添加的表单账号、手机号、身份证都…

Spring Boot的日志

打印日志 打印日志的步骤: • 在程序中得到日志对象. • 使用日志对象输出要打印的内容 在程序中得到日志对象 在程序中获取日志对象需要使用日志工厂LoggerFactory,代码如下: package com.example.demo;import org.slf4j.Logger; import org.slf4j.LoggerFactory;public c…

安装you-get(mac)

1、首先要有python环境 2、更新pip python -m pip install --upgrade pip 3、安装you-get pip install you-get;

T天池SQL训练营(五)-窗口函数等

–天池龙珠计划SQL训练营 5.1窗口函数 5.1.1窗口函数概念及基本的使用方法 窗口函数也称为OLAP函数。OLAP 是OnLine AnalyticalProcessing 的简称&#xff0c;意思是对数据库数据进行实时分析处理。 为了便于理解&#xff0c;称之为窗口函数。常规的SELECT语句都是对整张表进…

创建vue项目:node.js下载安装、配置环境变量,下载安装cnpm,配置npm的目录、镜像,安装vue、搭建vue项目开发环境(保姆级教程一)

今天讲解 Windows 如何创建 vue 项目&#xff0c;搭建 vue 开发环境&#xff0c;这是这个系列的第一章&#xff0c;有什么问题请留言&#xff0c;请点赞收藏&#xff01;&#xff01;&#xff01; 文章目录 一、Vue简单介绍二、开始搭建1、安装node.js环境2、配置npm下载时的默…

一文3000字从0到1用Python进行gRPC接口测试!

gRPC 是一个高性能、通用的开源RPC框架&#xff0c;其由 Google 主要面向移动应用开发并基于HTTP/2 协议标准而设计&#xff0c;基于 ProtoBuf(Protocol Buffers) 序列化协议开发&#xff0c;且支持众多开发语言。 自gRPC推出以来&#xff0c;已经广泛应用于各种服务之中。在测…

数据可视化免费化的双面影响探析

近年来数据可视化的免费化也越来越明显&#xff0c;今天就以我作为可视化设计师的经验来和大家分析一下&#xff0c;数据可视化工具免费化所带来的利与弊。 先从好处入手&#xff0c;最明显的就是免费化可以让数据可视化工具得到更广泛的使用。 免费数据可视化工具使得更多人可…

docker搭建nginx实现负载均衡

docker搭建nginx实现负载均衡 安装nginx 查询安装 [rootlocalhost ~]# docker search nginx [rootlocalhost ~]# docker pull nginx准备 创建一个空的nginx文件夹里面在创建一个nginx.conf文件和conf.d文件夹 运行映射之前创建的文件夹 端口&#xff1a;8075映射80 docker…

电脑版便签软件怎么设置在桌面上显示?

对于不少上班族来说&#xff0c;如果想要在使用电脑办公的时候&#xff0c;随手记录一些常用的工作资料、工作注意事项等内容&#xff0c;直接在电脑上使用便签软件记录是比较方便的。电脑桌面便签工具不仅方便我们随时记录各类工作事项&#xff0c;而且支持我们快速便捷使用这…

长城之上的无人机:文化遗产的守护者

长城之上的无人机&#xff1a;文化遗产的守护者 在八达岭长城景区&#xff0c;两架无人机分别部署在了长城的南、北楼两点。根据当前的保护焦点和需求&#xff0c;制定了5条无人机综合巡查航线&#xff0c;以确保长城景区的所有开放区域都能得到有效监管。每天&#xff0c;无人…

Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

一、相关API的handler1、接收HTTP请求的hander2、每一个数据节点(node)执行分片刷新的action是TransportShardFlushAction 二、对indexShard执行刷新请求1、首先获取读锁&#xff0c;再获取刷新锁&#xff0c;如果获取不到根据参数决定是否直接返回还是等待2、在刷新之后transl…

Java的三种代理模式实现

代理模式的定义&#xff1a; Provide a surrogate or placeholder for another object to control access to it.&#xff08;为其他对象提供一种代理以控制对这个对象的访问。&#xff09; 简单说&#xff0c;就是设置一个中间代理来控制访问原目标对象&#xff0c;达到增强原…

ProEasy机器人案例:电池边包胶

如下图所示&#xff0c;对一个电池三边包边&#xff0c;因客户现场有很多规格电池的大小&#xff0c;所以就需要建立动态的工具坐标来实现适配所有种类的电池 程序如下&#xff1a;Ddome程序 function Speed(num) --速度设置 MaxSpdL(2000) --movl最大速度…

茄子科技张韶全:跨多云大数据平台DataCake在OceanBase的实践

11 月 16 日&#xff0c;OceanBase 在北京顺利举办 2023 年度发布会&#xff0c;正式宣布&#xff1a;将持续践行“一体化”产品战略&#xff0c;为关键业务负载打造一体化数据库。其中&#xff0c;在“数字化转型升级实践专场”&#xff0c;我们有幸邀请到了茄子科技大数据技术…

数据库:JDBC编程

专栏目录 MySQL基本操作-CSDN博客 MySQL基本操作-CSDN博客 数据库的增删查改&#xff08;CRUD&#xff09;基础版-CSDN博客 数据库增删改查&#xff08;CRUD&#xff09;进阶版-CSDN博客 数据库的索引-CSDN博客 基本概念 JDBC编程就是通过Java代码来操作数据库 api 数据库是…

Apache+mod_jk模块代理Tomcat容器

一、背景介绍 最近在看Tomcat运行架构原理, 正好遇到了AJP协议(Apache JServ Protocol). 顺道来研究下这个AJP协议和具体使用方法. 百度百科是这么描述AJP协议的: AJP&#xff08;Apache JServ Protocol&#xff09;是定向包协议。因为性能原因&#xff0c;使用二进制格式来传输…

postcss-pxtorem实现页面自适应的原理

先声明一点这玩意本身不能实现哈&#xff0c;他只是一个工具&#xff0c;更是一个postcss的插件 帮助我们从px转化成为rem比如我们的代码 div {height: 100px;width: 100px; }经过这个插件转化之后变成 假设变成下面这样哈 div {height: 1rem;width: 1rem; }其他没啥子太大作…

2023年江西省“振兴杯”网络信息行业职业技能竞赛 Web4 Writeup

这次振兴杯碰到的一道题&#xff0c;某些姿势之前貌似没有碰过&#xff0c;简单记一下吧 源码 <?php class Bird{public $funcs;public $salt;public $flag;function say_flag(){$secret hash_hmac(sha256, $_GET[salt], file_get_contents(/flag));$hmac hash_hmac(sha…