Redis 篇-深入了解基于 Redis 实现消息队列(比较基于 List 实现消息队列、基于 PubSub 发布订阅模型之间的区别)

🔥博客主页: 【小扳_-CSDN博客】
❤感谢大家点赞👍收藏⭐评论✍

文章目录

        1.0 消息队列的认识

        2.0 基于 List 实现消息队列

        2.1 基于 List 实现消息队列的优缺点

        3.0 基于 PubSub 实现消息队列

        3.1 基于 PubSub 的消息队列优缺点

        4.0 基于 Stream 实现消息队列

        4.1 Stream 的单消费模式

        4.2 Stream 的消费组模式


        1.0 消息队列的认识

        消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包含 3 个角色:

        1)消息队列:存储和管理消息,也被称为消息代理(Message Broker)

        2)生产者:发送消息到消息队列。

        3)消费者:从消息队列获取消息并处理消息。

        2.0 基于 List 实现消息队列

        Redis 的 list 数据结构是一个双向链表,很容易模拟出队列效果。

实现思路:

        队列时入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP 或者 RPUSH 结合 LPOP 来实现。不过需要注意的是,当队列中没有消息时 RPOP 或 LPOP 操作会直接返回 null ,并不像 JVM 的阻塞队列那样会阻塞并等待消息,因此这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果。

代码演示:

        当数据要进入队列时,那么可以使用 LPUSH KEY VALUE 命令,KEY 为队列名称,VALUE 为数据值,将数据写入 Redis 中。当要获取数据的时,使用 BRPOP KEY TIMEOUT命令,KEY 为队列名称,TIMEOUT 为最大阻塞时间,在最大阻塞时间内,仍旧没有获取数据,则返回 null 。该命令主要做了两步,将数据移除队列中,并将该数据返回。

        2.1 基于 List 实现消息队列的优缺点

        优点:

        1)利用 Redis 存储,不受限于 JVM 内存上限。

        2)基于 Redis 的持久化机制,数据安全性有保证。

        3)可以满足消息有序性。

        缺点:

        1)无法避免消息丢失。

        当在 BRPOP 获取数据的时候,出现异常,返回数据失败,从而导致数据丢失。因为数据已经从队列中移除出来了,所以队列中已经不存在之前的数据了。

        2)只支持单消费者。

        当一个消费者来消费之后,其他再来的消费者就不能再获取到第一个消费者的数据,所以说数据只能给一个消费者。

        3.0 基于 PubSub 实现消息队列

        PubSub 是 Redis2.0 版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个 channel ,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。

常用的命令:

        1)SUBSCRIBE channel [channel]:订阅一个或者多个频道。

        2)PUBLISH channel msg:向一个频道发送消息。

        3)PSUBSCRIBE pattern [pattern]:订阅与 pattern 格式匹配的所有频道。* 代表通配符,订阅所有频道。

        这就实现了支持多个消费者获取到相同的消息。当消息被发布了,那么已经订阅该频道的消费者就可以及时获取到消息了。

代码演示:

        先订阅频道:

        发送消息:

        当生产者发送完消息,消费者就会收到通知,从通道中获取到消息。

        3.1 基于 PubSub 的消息队列优缺点

        优点:

        1)采用发布订阅模型,支持多生产、多消费。

        解决了基于 List 实现的消息队列的缺点,单消费。

        缺点:

        1)不支持数据持久化。

        将消息发布出去之后,不会进行数据保存。不管有无消费者订阅,都会将消息直接发布出去。

        2)无法避免消息丢失。

        因为不支持持久化,当消息丢失之后,无法再找到原本的数据。

        3)消息堆积有上限,超出时数据丢失。 

        在消费者中,接收到的数据会暂时存放起来,一旦超过存放的大小,就会导致数据丢失。

        4.0 基于 Stream 实现消息队列

        Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

XADD key ID field string [field string ...]

         key 为队列名称,*|ID 为消息的唯一 id,* 代表由 Redis 自动生成。格式是“时间戳-递增数字”,例如 "1644804662707-0"。field value 代表发送到队列中的消息,称为  Entry 。格式就是多个 key-value 键值对。

代码演示:

        4.1 Stream 的单消费模式

单消费者获取数据的命名:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

        COUNT count 为每次读取消息的最大数量;BLOCK milliseconds 代表当没有消息时,是否阻塞,阻塞时长;STREAMS key 代表要从哪个队列读取消息,key 就是队列名;ID 代表起始 id ,只返回大于该 ID 的消息,0 为从第一个消息开始,而 $ 为从最新的消息开始。

代码演示:

        当 ID 使用 $ 时,不会从原本 s 中直接获取原本的数据,而是在 2 秒内有无最新的数据添加进来,如果有,则返回该数据;如果没有,则返回 null。

        当 ID 使用 0 时,则从原本 s 中直接获取原本的数据。

        在业务开发中,我们可以循环的调用 XREAD 阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

        需要注意的地方:

        当使用 Stream 单消费者模式的时候,我们指定起始 ID 为 $ 时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过 1 条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

        XREAD 命令特点:

        1)消息可回溯。

        2)一个消息可以被多个消费者读取。

        3)可以阻塞读取。

        4)有消息漏读的风险。

        4.2 Stream 的消费组模式

        将多个消费者划分到一个组中,监听同一个队列。

特点:

        1)消息分流:

        队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。

        2)消息标示:

        消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。

        3)消息确认:

        消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list 。当处理完成后,需要通过 XACK 来确认消息,标记消息为已处理,才会从 pending-list 移除。

创建消费者组:

XGROUP CREATE key groupname id|$ [MKSTREAM]

        key 代表队列名称,groupName 代表消费者组名称,ID 起始 ID 标示,$ 代表队列中最后一个消息,0 则代表队列中第一个消息。MKSTREAMS 代表不存在时自动创建队列。

        如果之前列表的数据要继续获取,则 ID 选为 0;如果之前的列表中的数据不需要了,则 ID 选为 $ 。

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

        group 代表组名,consumer 代表组内消费者名称,count 代表每次读取的最大数量,milliseconds 代表当没有消息时最长的等待时间,NOACK 代表无需手动 ACK,获取消息后自动确认。key 代表指定队列名称,

        ID 代表获取消息的起始 ID :

                当 ID 为 ">" :从下一个未消费的消息开始。

                当 ID 为其他:根据指定 id 从 pending-list 中获取已消费但未确认的消息,例如 0,是从 pending-list 中的第一个消息开始。

确认消息:

XACK key groupName ID

        key 为队列名,groupName 为组名,ID 为消息唯一 id 。

查看未确认的消息:

XPENDING key group [start end count] [consumer]

        key 为队列名,group 为组名,start 起始地址,count 个数,consumer 组内消费者名称。

消费者监听消息思路:

 

Java 代码实现从消息队列中获取消息:

import cn.hutool.core.bean.BeanUtil;
import com.project.volunteermanagementproject.pojo.StreamObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.util.List;
import java.util.Map;
@Component
public class StreamUtil {@AutowiredStringRedisTemplate stringRedisTemplate;//实现从消息队列中获取消息public void getStream(){while (true){try {List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("s1", ReadOffset.lastConsumed()));if (read == null || read.isEmpty()){//如果获取失败,说明没有消息,继续下一次循环continue;}//解析消息中的消息MapRecord<String, Object, Object> entries = read.get(0);Map<Object, Object> value = entries.getValue();StreamObject streamObject = BeanUtil.fillBeanWithMap(value, new StreamObject(), true);//这就拿到了消息队列中的数据了,就可以去使用该对象了System.out.println(streamObject);//这就需要确认消息队列stringRedisTemplate.opsForStream().acknowledge("s1", "g1", entries.getId());} catch (Exception e) {//如果在获取消息过程中出现异常,则需要再次执行该消息任务while (true){try {List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("s1", ReadOffset.from("0")));if (read == null || read.isEmpty()){break;}MapRecord<String, Object, Object> entries = read.get(0);Map<Object, Object> value = entries.getValue();StreamObject streamObject = BeanUtil.fillBeanWithMap(value, new StreamObject(), true);//重新拿到未确认的数据System.out.println(streamObject);//再次进行消息确认Long acknowledge = stringRedisTemplate.opsForStream().acknowledge("s1", "g1", entries.getId());} catch (Exception ex) {throw new RuntimeException(ex);}}}}}
}

XREADGROUP 命令特点:

        1)消息可回溯

        2)可以多消费者争抢消息,加快消费速度

        3)可以阻塞读取

        4)没有消息漏读的风险

        5)有消息确认机制,保证消息至少被消费一次

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/422954.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity数据持久化 之 使用Excel.DLL读写Excel表格

本文仅作笔记学习和分享&#xff0c;不用做任何商业用途 本文包括但不限于unity官方手册&#xff0c;unity唐老狮等教程知识&#xff0c;如有不足还请斧正​​ 终于找到一个比较方便容易读表的方式了&#xff0c;以前用json读写excel转的cvs格式文件我怎么使用怎么别扭&#xf…

AlmaLinux 9 上配置静态 IP 地址

在 Rocky Linux 9 中&#xff0c;密钥文件的新默认存储位置在 /etc/NetworkManager/system-connections 中 cd /etc/NetworkManager/system-connections默认dhcp配置 ~ …

免费SSL证书正在逐渐被淘汰,证书部署自动化的发展趋势即将到来!

目录 背景解决方案。1.使用自签证书&#xff08;浏览器报警、免费&#xff09;2.更换支持自签自续的CA机构&#xff08;免费&#xff09;3.付费选择CA机构 免费SSL证书正在逐渐被淘汰&#xff0c;证书部署自动化的发展趋势即将到来免费的SSL证书有以下弊端1.有效期短&#xff1…

stm32驱动开发与linux驱动的区别

stm32&#xff0c;gpio设置原理 下图&#xff0c;定义了gpio E的基地址&#xff0c;只要将这个地址强制转换成gpiotypedf的类型&#xff0c;解析时&#xff0c;结构体地址就会自增。这样就可以对不同gpio组&#xff0c;就像定义。 全部gpio定义&#xff0c;强制为结构体类型…

Linux CentOS更换阿里云源解决Could not retrieve mirrorlist http://mirrorlist.centos.org

Linux CentOS7 更新yum 操作的时候出现这个问题&#xff1a; Loading mirror speeds from cached hostfile Could not retrieve mirrorlist http://mirrorlist.centos.org 然后我执行 grep -nr "mirrorlist.centos.org" /etc/yum.repos.d/* 出现 这个问题时可以…

搭建 WordPress 及常见问题与解决办法

浪浪云活动链接 &#xff1a;https://langlangy.cn/?i8afa52 文章目录 环境准备安装 LAMP 堆栈 (Linux, Apache, MySQL, PHP)配置 MySQL 数据库 安装 WordPress配置 WordPress常见问题及解决办法数据库连接错误白屏问题插件或主题冲突内存限制错误 本文旨在介绍如何在服务器上…

爬虫使用代理IP后报错?解决方案在这里!

在数据抓取的过程中&#xff0c;使用代理IP是避免被封禁、提高抓取效率的重要手段。然而&#xff0c;有时候即使配置了代理IP&#xff0c;依然会遇到各种报错问题。本文将详细解析常见的报错类型&#xff0c;并提供解决方案&#xff0c;帮助你顺利进行数据抓取。 常见报错类型…

MySQL表的操作与数据类型

目录 前言 一、表的操作 1.创建一个表 2.查看表的结构 3.修改表 4.删除一个表 二、 MySQL的数据类型 0.数据类型一览&#xff1a; 1.整数类型 2.位类型 3.小数类型 4.字符类型 前言 在MySQL库的操作一文中介绍了有关MySQL库的操作&#xff0c;本节要讲解的是由库管理的结构——…

智能体 vs AI智能体:区别与联系,一文读懂!

​ 在AI技术蓬勃发展的今天&#xff0c;“智能体”&#xff08;Agent&#xff09;和”AI智能体”&#xff08;AI Agent&#xff09;两个概念经常被提及&#xff0c;二者在很多场合下会被混淆&#xff0c;但其实它们有着不同的定义和应用。我觉得很有必要小小科普下两者的定义与…

软件测试学习笔记丨Pytest的使用

本文转自测试人社区&#xff0c;原文链接&#xff1a;https://ceshiren.com/t/topic/22158 1. 简介 pytest是一个成熟的全功能python测试框架测试用例的skip和xfail&#xff0c;自动失败重试等处理能够支持简单的单元测试和复杂的功能测试&#xff0c;还可以用来做selenium/ap…

HTML的块级元素与行内元素

在HTML中&#xff0c;元素可以分为两大类&#xff1a;块级元素&#xff08;block-level elements&#xff09;和行内元素&#xff08;inline elements&#xff09;。这两种类型的元素在网页布局和呈现中扮演着不同的角色。 块级元素&#xff08;Block-level Elements&#xff…

CMU 10423 Generative AI:HW1(编程部分:在GPT-2模型中实现RoPE、GQA)

完整代码和PDF笔记&#xff1a;https://github.com/YM2025/CMU_10423_2024S 文章目录 1 概述Rotary Positional Embeddings (RoPE)Grouped Query Attention (GQA)实验任务 2 项目文件1. requirements.txt2. input.txt3. chargpt.py4. mingpt/a. model.pyb. trainer.pyc. utils.…

毕业论文选题难?5招帮你轻松搞定选题!

AIPaperGPT&#xff0c;论文写作神器~ https://www.aipapergpt.com/ 你是不是已经为毕业论文的选题愁得头发都要掉光了&#xff1f;每次打开文档&#xff0c;都觉得什么都想写&#xff0c;又好像什么都写不了。选题看起来很简单&#xff0c;但真正开始动手的时候&#xff0c;…

深入探索系统架构设计

目录 前言 软件的体系结构 软件架构定义 软件架构设计与生命周期 1、需求分析阶段 2、设计阶段 3、实现阶段 4、构件组装阶段 5、部署阶段 6、后开发阶段 软件架构的重要性 1、架构设计能够满足系统的品质 2、架构设计使受益人达成一致的目标 3、架构设计能够支持…

UDS 诊断 - RequestTransferExit(请求传输终止)(0x37)服务

UDS 诊断服务系列文章目录 诊断和通信管理功能单元 UDS 诊断 - DiagnosticSessionControl&#xff08;诊断会话控制&#xff09;&#xff08;0x10&#xff09;服务 UDS 诊断 - ECUReset&#xff08;ECU重置&#xff09;&#xff08;0x11&#xff09;服务 UDS 诊断 - SecurityA…

【北京迅为】《STM32MP157开发板使用手册》- 第二十六章Cortex-M4 GPIO_蜂鸣器实验

iTOP-STM32MP157开发板采用ST推出的双核cortex-A7单核cortex-M4异构处理器&#xff0c;既可用Linux、又可以用于STM32单片机开发。开发板采用核心板底板结构&#xff0c;主频650M、1G内存、8G存储&#xff0c;核心板采用工业级板对板连接器&#xff0c;高可靠&#xff0c;牢固耐…

matlab 基于选权迭代法的空间平面拟合

目录 一、算法原理1、参数平差2、选权迭代法3、参考文献二、代码实现三、结果展示本文由CSDN点云侠原创,原文链接,爬虫自重。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的抄袭狗。 一、算法原理 1、参数平差 由空间几何学知,空间平面方程可以表述为: A x…

【C++】——string类的模拟实现

目录 一、string模拟实现 1.1构造析构 1.2迭代器 1.3修改 1.4查找 1.5substr 深浅拷贝的区别 1.6比较函数与流插入流提取 二、string类的拷贝 2.1浅拷贝与深拷贝 2.2传统版与现代版区别 2.3写时拷贝&#xff08;了解&#xff09; 三、vs和g下string结构的说明 3.1v…

零信任沙箱让源代码防泄漏“如虎添翼”

"数据泄露事件频发&#xff0c;给企业带来了巨大的经济损失和声誉损害。SDC沙盒&#xff0c;一款基于零信任模型构建的数据防泄密解决方案&#xff0c;正成为企业数据安全的新防线。 &#x1f510; 零信任模型的核心&#xff1a;SDC沙盒遵循“永不信任&#xff0c;始终验…

Python爬虫案例七:抓取南京公交信息数据并将其保存成excel多表形式

测试链接: https://nanjing.8684.cn/line4 思路&#xff1a;先抓取某个类型下的某一条线路所有数据&#xff0c;然后实现批量,&#xff0c;列举出三个类型代表既可 源码&#xff1a; from lxml import etree from xlutils.copy import copy import requests, os, xlrd, xlwtd…