【Redis】Redis 的学习教程(九)之 发布 Pub、订阅 Sub

1. Pub/Sub 介绍

Redis 的发布订阅(Pub/Sub)模式是一种消息传递机制,它允许在发送者和接收者之间建立松耦合的通信关系。在这种模式中,发送者(发布者)将消息发布到一个指定的频道或模式,而接收者(订阅者)可以订阅一个或多个频道,以便接收发布的消息。

以下是Redis发布订阅模式的主要组件:

  • 发布者(Publisher):发布者是产生并发布消息的实体。它可以将消息发送到指定的频道或模式。
  • 订阅者(Subscriber):订阅者是接收并处理消息的实体。它可以订阅一个或多个频道或模式,以便接收相关的消息。
  • 频道(Channel):频道是发布者和订阅者之间的通信渠道。发布者将消息发送到频道,而订阅者从频道接收消息。

可以看下图,Publisher 和 Subscriber、Channel 的关系很清晰:

在这里插入图片描述
发布者往 “Channel A” 通道发布消息:Hello World!,消息的所有订阅者就会收到这个消息

2. 使用 Pub/Sub 实现发布订阅

Redis实现 发布/订阅 一共有两种模式:

  1. 使用频道(Channel)进行发布订阅
  2. 使用模式(Pattern)进行发布订阅

Redis 可以支持多个数据库,每个数据库都有自己的命名空间和数据。通过使用多个数据库,可以实现数据隔离、分区和组织

但是值得注意的是:这种发布订阅机制与 数据分区空间无关,比如在 db 0 发布消息, 其他区的订阅者都会收到消息

Redis 使用以下命令操作 Pub/Sub 工作:

  • SUBSCRIBE:订阅一个或多个频道
    • 语法:SUBSCRIBE channel [channel ...]
  • UNSUBSCRIBE :取消订阅一个或多个频道
    • 语法:UNSUBSCRIBE [channel [channel ...]]
  • PSUBSCRIBE:订阅一个或多个模式
    • 语法:PSUBSCRIBE pattern [pattern ...]
  • PUNSUBSCRIBE取消订阅一个或多个模式
    • 语法:PUNSUBSCRIBE [pattern [pattern ...]]
  • PUBSUB CHANNELS [pattern]:列出活跃的 channel
  • PUBSUB NUMSUB [channel-1 ... channel-N]:列出 channel 的订阅者个数

2.1 通过频道(Channel)进行发布订阅

通过频道(Channel)进行发布订阅过程如下:

  1. Subscriber 订阅某个 Channel,实现对 Channel 的监听
  2. Publisher 对 Channel 这个服务中心媒介发布消息
  3. 所有订阅 Channel 的 Subscriber 接收到消息

2.1.1 订阅者订阅频道

订阅后:

在这里插入图片描述
使用客户端 [subscriber A] 订阅 Channel [mychannel] 来接收消息。从上面可以看出响应的信息:

  • “subscribe” :消息类型,枚举是 subscribe、message、unsubscribe
  • “mychannel” :频道的名称
  • 最后的消息内容:不同的消息类型代表不同含义。

进入订阅后的客户端可以收到 3 种枚举类型的消息:

  • subscribe:订阅成功的消息类型,第 2 个值是订阅成功的频道名称,第 3 个值是当前客户端订阅的频道数量。
  • message:客户端接收消息的消息类型,第 2 个值表示产生消息的频道名称,第 3 个值是消息的内容。
  • unsubscribe:取消订阅的消息类型,第 2 个值是对应的频道名称,第 3 个值是当前客户端订阅的频道数量。值为 0 时说明客户端一个订阅的都没有了,退出订阅状态。

2.1.2 发布者发布消息

发布消息:

在这里插入图片描述
发布的消息并不会持久化存储下来,所以消息发布之后被某个 Subcriber 订阅到的话,消息生命周期基本就完成了

2.1.3 订阅者接收消息

想要收到上面 发布者发布的消息,我们的客户端首先需要关注了 [mychannel] 频道,才能收到 “Hello, World!” 这条消息

在这里插入图片描述

2.1.4 退订频道

如果你不想收到某个频道的消息了,你可以取消预订

2.2 使用模式(Pattern)匹配实现发布订阅

来看看另一种实现发布订阅的方案 ,就是模式匹配的方式:除了直接订阅的客户端之外,还会检查是否有与我们模式相匹配的 Channel,如果有,消息也会发布到对应匹配的频道上,订阅这个 Channel 的客户端也会收到消息

如下图:

在这里插入图片描述
当 Message.Queue.Area1 频道接收到消息之后,除了订阅自身频道的 Actor A 和 Actor B 能收到消息之外。因为频道与模式匹配成功,消息还会发送给订阅 Message.Queue.* 模式的所有人员。

在这里插入图片描述
因为使用匹配模式,PUBLISH 消息发布到 Message.Queue.Area2 之外,还会将该 Channel 与匹配模式的Channel进行对比,如果 Channel 与某个模式匹配的话,也将这个消息发布到订阅这个模式的客户端。

所以图中红色线条部分,包括 Actor C、Actor D、Actor E 都接受到了消息

2.2.1 订阅者订阅频道

Client A 订阅 Message.Queue.Area1:

在这里插入图片描述

Client B 订阅 Message.Queue.Area2:

在这里插入图片描述
Client C 订阅 Message.Queue.*:

在这里插入图片描述

2.2.2 发布者发布消息

在这里插入图片描述

2.2.3 订阅者接收消息

对应频道的订阅者收到消息(Client A ):

在这里插入图片描述
匹配模式的订阅者收到消息(Client C):

在这里插入图片描述

因为没有筛重策略,所以如果你既订阅了匹配模式(如 Message.Queue.* ),又订阅了对应的频道(如 Message.Queue.Area2),那么你的客户端会收到两条同样的消息,一条消息类型是message,一条类型是 pmessage

3. SpringBoot 整合 Redis 实现发布订阅模式

3.1 概述

订阅消息就是接收消息,这个比较复杂。既有对 Redis 连接的管理,也有对消费消息的线程池的管理。不过 Spring 已经把这个“重活”给干了。

Spring 提供了一个全套的解决方案,这里面包括:

  1. 订阅/取消订阅这些相关的用户操作
  2. 接收所有来自Redis的消息
  3. 把这些消息按照订阅关系分发给具体的消费者
  4. 触发消费消息的回调代码在线程池中运行

由于 Spring 已经全权代理,用户只需要提供要消费的 topic 以及对应的消费回调代码即可。

我们需要了解Spring提供的几个接口和类,才可以很好的使用:

  1. Topic 接口,表示一个订阅对象:它有两个实现类,ChannelTopicPatternTopic,前者对应 redis的 channel,后者对应 redis 的 pattern
  2. MessageListener 接口,回调接口,通过它来执行业务代码
  3. Message 接口,表示从 redis 接收到的消息
  4. RedisMessageListenerContainer 类,这个核心类,相当于一个代理,就是它负责接收 redis 的消息,并分发给 MessageListener
  5. RedisConnectionFactoryRedisMessageListenerContainer 需要此类 RedisConnectionFactory:redis连接工厂,用来获取一个redis连接,由于这个连接用于接收消息,所以它是一直阻塞着的
  6. 还可以为这个类指定一个 Executor,即线程池,这不是必须的,如果不指定它会生成一个默认的

SpringBoot集成Redis Messaging (Pub/Sub)

3.2 在 Springboot 中使用发布订阅

先说下在 springboot 中使用 redis 的发布订阅的步骤:

  1. 配置消息监听类(实现 MessageListener 接口,重写 onMessage() 方法)。
  2. 添加监听容器(配置 RedisMessageListenerContainer)。
  3. 订阅频道。
  4. 向频道发布消息。

3.2.1 配置消息监听类

添加一个订单消息监听器:

@Component
public class OrderSubscriber implements MessageListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {// 获取消息byte[] messageBody = message.getBody();// 使用值序列化器转换Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);// 获取监听的频道byte[] channelByte = message.getChannel();// 使用字符串序列化器转换Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);// 渠道名称转换String patternStr = new String(pattern);System.out.println(patternStr);System.out.println("---频道---: " + channel);System.out.println("---消息内容---: " + msg);}}

3.2.2 容器添加监听器、订阅频道

@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);// json 序列化配置Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);jackson2JsonRedisSerializer.setObjectMapper(om);// String 序列化StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();// 所有的 key 采用 string 的序列化template.setKeySerializer(stringRedisSerializer);// 所有的 value 采用 jackson 的序列化template.setValueSerializer(jackson2JsonRedisSerializer);// hash 的 key 采用 string 的序列化template.setHashKeySerializer(stringRedisSerializer);// hash 的 value 采用 jackson 的序列化template.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, OrderSubscriber orderSubscriber) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 设置连接工厂container.setConnectionFactory(redisConnectionFactory);// 监听器订阅频道container.addMessageListener(orderSubscriber, new ChannelTopic("order"));container.addMessageListener(orderSubscriber, new ChannelTopic("sms"));// 序列化Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);seria.setObjectMapper(objectMapper);container.setTopicSerializer(seria);return container;}}

3.2.3.1 容器添加多个监听器

添加一个短信监听器:

@Component
public class SmsSubscriber implements MessageListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {// 获取消息byte[] messageBody = message.getBody();// 使用值序列化器转换}
}

修改配置:

@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, OrderSubscriber orderSubscriber, SmsSubscriber smsSubscriber) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 设置连接工厂container.setConnectionFactory(redisConnectionFactory);// 监听器订阅频道container.addMessageListener(orderSubscriber, Arrays.asList(new ChannelTopic("order"), new ChannelTopic("sms")));container.addMessageListener(smsSubscriber, new ChannelTopic("sms"));// 序列化Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);seria.setObjectMapper(objectMapper);container.setTopicSerializer(seria);return container;
}

3.2.3.2 使用 PatternTopic

container.addMessageListener(smsSubscriber, new PatternTopic("redis.*"));

3.2.3 向频道发布消息

@RestController
@RequestMapping("/pub")
public class PubController {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@GetMapping("/publish")public String publish() {redisTemplate.convertAndSend("order", "该订单已过期");redisTemplate.convertAndSend("sms", "该短信已发送");return "publish";}}

3.3 使用 MessageListenerAdapter 实现发布订阅

1、定义 一个消息接受类

@Component
public class OrderMessageReceiver {public void receiveMessage(String message, String channel){System.out.println("---频道---: " + channel);System.out.println("---消息内容---: " + message);}
}

2、配置一个 MessageListenerAdapter

@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter adapter, SmsSubscriber smsSubscriber) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 设置连接工厂container.setConnectionFactory(redisConnectionFactory);// 监听器订阅频道container.addMessageListener(adapter, Arrays.asList(new ChannelTopic("order"), new ChannelTopic("sms")));container.addMessageListener(smsSubscriber, new PatternTopic("redis.*"));// 序列化Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);seria.setObjectMapper(objectMapper);container.setTopicSerializer(seria);return container;
}@Bean
public MessageListenerAdapter smsExpirationListener(OrderMessageReceiver messageListener) {MessageListenerAdapter receiveMessage = new MessageListenerAdapter(messageListener, "receiveMessage");// 序列化Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);seria.setObjectMapper(objectMapper);receiveMessage.setSerializer(seria);return receiveMessage;
}

Spring boot整合Redis实现发布订阅(超详细)
springboot中使用redis发布订阅

4. 总结

当使用 Pattern 进行发布订阅的时候。如果有消息发布出来,除了订阅该 Channel 的 Client 之外,所有订阅了与 Channel 匹配的模式的 Client 同样会收到消息。

另外,Redis 发布订阅的消息不会被持久化,所以无历史消息,也不支持 ACK 机制,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/129129.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mysql--技术文档--索引-《索引为什么查找数据快?》-超底层详细说明索引

索引的概念 在MySQL中&#xff0c;索引是一种数据结构&#xff0c;它被用于快速查找、读取或插入数据。索引能够极大地提高数据库查询的速度。 索引的工作方式类似于图书的索引。如果你想在图书馆找到一本书&#xff0c;你可以按照书名进行查找。书名就像是一个索引&#xf…

C#winform导出DataGridView数据到Excel表

前提&#xff1a;NuGet安装EPPlus&#xff0c;选择合适的能兼容当前.net framwork的版本 主要代码&#xff1a; private void btn_export_Click(object sender, EventArgs e) {SaveFileDialog saveFileDialog new SaveFileDialog();saveFileDialog.Filter "Excel Files…

TCP三次握手和四次挥手

目录 TCP连接建立 问题思考 1.为什么要三次握手&#xff1f; 2.三次握手一定要保证成功吗&#xff1f; TCP连接释放 问题思考 ​ 1.理解TIME-WAIT状态 2.理解CLOSE-WAIT状态 TCP连接建立 TCP建立连接的过程叫作握手&#xff0c;握手需要在客户和服务器之间交换三个TCP…

【LeetCode-简单题】844. 比较含退格的字符串

文章目录 题目方法一&#xff1a;单指针方法二&#xff1a;双指针方法三&#xff1a;栈 题目 方法一&#xff1a;单指针 首先每次进入循环处理之前需要对第一个字符进行判断&#xff0c;若是退格符&#xff0c;直接删掉&#xff0c;结束此次循环fast从0开始&#xff0c;如果fa…

无涯教程-JavaScript - COUPNCD函数

描述 COUPNCD函数返回一个数字,该数字表示结算日期之后的下一个息票日期。 语法 COUPNCD (settlement, maturity, frequency, [basis])争论 Argument描述Required/OptionalSettlement 证券的结算日期。 证券结算日期是指在发行日期之后将证券交易给买方的日期。 RequiredMa…

OSPF路由计算

1、Router LSA LSA 链路状态通告&#xff0c;是OSPF进行路由计算的主要依据&#xff0c;在OSPF的LSU报文中携带&#xff0c;其头重要字段及解释&#xff1a; LS Type&#xff08;链路状态类型&#xff09;&#xff1a;指示本LSA的类型。 在域内、域间、域外…

OpenResume简历解析官方技术文档(翻译)

OpenResume简历解析官方技术文档(翻译) 本文是对OpenResume建立解析器官方技术文档《Resume Parser Playground》的翻译。 相关连接&#xff1a; OpenResume官网 OpenResume简历解析器的官方地址 OpenResume的Github 简历解析测试环境 该测试环境展示了 OpenResume 简历…

vue页面添加水印(可用于H5,APP)

vue页面添加水印 背景实现新建vue组件使用效果 尾巴 背景 最近实现了一个小功能&#xff0c;就是给页面添加背景水印。实现思路就是定义一个宽高充满屏幕的组件&#xff0c;然后使用绝对定位并通过层级控制让水印显示在页面的最前端。 实现 代码相对简单&#xff0c;相信有点…

2023-9-11 高斯消元解异或线性方程组

题目链接&#xff1a;高斯消元解异或线性方程组 #include <iostream> #include <algorithm>using namespace std;const int N 110;int n; int a[N][N];int gauss() {int c, r;for(c r 0; c < n; c ){int t r;for(int i r; i < n; i )if(a[i][c]){t i;b…

超图聚类论文阅读1:Kumar算法

超图聚类论文阅读1&#xff1a;Kumar算法 《超图中模块化的新度量&#xff1a;有效聚类的理论见解和启示》 《A New Measure of Modularity in Hypergraphs: Theoretical Insights and Implications for Effective Clustering》 COMPLEX NETWORKS 2020, SCI 3区 具体实现源码见…

vue checkbox-group和checkbox动态生成,问题解决

源码 <el-checkbox-group v-model"form[keyItem.name]"><el-checkboxv-for"(checkboxItem,cindex) in keyItem.options.split(,)":key"cindex":label"checkboxItem"></el-checkbox></el-checkbox-group> 我是…

不关闭Tamper Protection(篡改保护)下强制卸载Windows Defender和安全中心所有组件

个人博客: xzajyjs.cn 背景介绍 由于微软不再更新arm版本的win10系统&#xff0c;因此只能通过安装insider preview的镜像来使用。而能找到的win10 on arm最新版镜像在安装之后由于内核版本过期&#xff0c;无法打开Windows安全中心面板了&#xff0c;提示如下&#xff1a; 尝…

——二叉树

二叉树种类 二叉树有两种主要的形式&#xff1a;满二叉树和完全二叉树。 满二叉树 如果一棵二叉树只有度为0的结点和度为2的结点&#xff0c;并且度为0的结点在同一层上&#xff0c;则这棵二叉树为满二叉树。 完全二叉树 在完全二叉树中&#xff0c;除了最底层节点可能没…

buuctf web 前5题

目录 一、[极客大挑战 2019]EasySQL 总结&#xff1a; 二、[极客大挑战 2019]Havefun 总结&#xff1a; 三、[HCTF 2018]WarmUp 总论&#xff1a; 四、[ACTF2020 新生赛]Include 总结&#xff1a; 五、[ACTF2020 新生赛]Exec 总结&#xff1a; 一、[极客大挑战 2019]…

VPS使用环境受限?亚马逊云科技Amazon Lightsail为开发者提供更多选择

对于开发者而言&#xff0c;当你想构建系统架构时&#xff0c;你的面前就出现了两种选择&#xff0c;选择一是花时间去亲手挑选每个亚马逊云科技组件&#xff08;云服务器、存储、IP地址等&#xff09;&#xff0c;然后自己组装起来&#xff1b;选择二是只需要一个预先配置且预…

C语言经典100例题(51-54)--学习使用按位与 ,按位或 |,按位异或 ^和按位取反~

目录 题目 问题分析 按位与操作符&#xff08;&&#xff09; 按位或操作符&#xff08;|&#xff09; 按位异或操作符&#xff08;^&#xff09; 按位取反操作符&#xff08;~&#xff09; 代码及运行结果 题目 学习使用按位与& ,按位或 |,按位异或 ^和按位取反…

解决微信开发者工具企业微信小程序模式下模拟器白屏问题

前一天晚上没有关电脑&#xff0c;第二天发现电脑自己重启了&#xff0c;然后微信开发者工具就出了问题&#xff0c;在企业微信小程序模式下&#xff0c;模拟器出现了白屏&#xff0c;只有上方title可以正常显示。点击模拟器右上角三个点都不出弹出菜单&#xff0c;并且在调试器…

初识Nacos

前言 Nacos是一个用于微服务架构下的服务发现和配置管理以及服务管理的综合解决方案&#xff08;官网介绍&#xff09;&#xff0c;这里的服务发现其实就是注册中心&#xff0c;配置管理就是配置中心&#xff0c;而服务管理是二者的综合&#xff1b; Nacos特性 1.服务发现与…

李宏毅机器学习笔记:RNN循环神经网络

RNN 一、RNN1、场景引入2、如何将一个单词表示成一个向量3种典型的RNN网络结构 二、LSTMLSTM和普通NN、RNN区别 三、 RNN的训练RNN与auto encoder和decoder 四、RNN和结构学习的区别五、pytorch实现RNN与LSTM5.1为何 H o u t h i d d e n s i z e H_{out}hidden_size Hout​hi…

一个集成的BurpSuite漏洞探测插件1.1

免责声明 本文发布的工具和脚本&#xff0c;仅用作测试和学习研究&#xff0c;禁止用于商业用途&#xff0c;不能保证其合法性&#xff0c;准确性&#xff0c;完整性和有效性&#xff0c;请根据情况自行判断。如果任何单位或个人认为该项目的脚本可能涉嫌侵犯其权利&#xff0c…