【Redis】Redis 的消息队列 List、Streams—(六)

目录

    • 一、消息队列
    • 二、List 方案
    • 三、Streams 方案

在这里插入图片描述
在这里插入图片描述

一、消息队列

我们一般把消息队列中发送消息的组件称为生产者,把接收消息的组件称为消费者,下图是一个通用的消息队列的架构模型:
消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。

  • (1)消息保序

虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。

  • (2)重复消息处理

消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。如果多次处理重复消息的话,就可能造成一个业务逻辑被多次执行,从而出现数据问题。

  • (3)消息可靠性保证

消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。

二、List 方案

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,已经能满足消息 保序 的需求了。具体来说,生产者可以使用 LPUSH 命令把要发送的消息依次写入 List,而消费者则可以使用 RPOP 命令,从 List 的另一端按照消息的写入顺序,依次读取消息并进行处理。
在这里插入图片描述

List 并不会主动地通知消费者有新消息写入,如果消费者循环调用 RPOP 命令又会带来 CPU 开销问题。Redis 提供了 BRPOP 命令,称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

在解决 重复消息处理 的问题上,一方面,消息队列要能给每一个消息提供全局唯一的 ID,另一方面,消费者程序要把已经处理过的消息的 ID 记录下来,如果已经处理过,消费者程序就不再进行处理了。这种处理特性也称为幂等性,指对于同一条消息,消费者收到一次或多次的处理结果是一致的。

不过,List 本身是不会为每个消息生成 ID,所以,消息的全局唯一 ID 需要生产者程序在发送消息前自行生成,并包含在消息中以供消费者处理。

为了 保证消息可靠性 ,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存,这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
在这里插入图片描述

三、Streams 方案

如果生产者消息发送很快,而消费者处理消息的速度比较慢,会导致 List 中的消息越积越多,给 Redis 的内存带来很大压力,而 List 并不支持多个消费者同时处理。这时候就要用到 Redis 从 5.0 版本开始提供的 Streams 数据类型了。Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令:

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID
  • XREAD:用于读取消息,可以按 ID 读取数据
  • XREADGROUP:按消费组形式读取消息
  • XPENDING:命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息
  • XACK:命令用于向消息队列确认消息处理已完成
  • XADD 命令插入新消息的格式是键 - 值对形式,例如往名称为 mqstream 的消息队

列中插入一条消息:

XADD mqstream * repo 5
"1599203861727-0"

其中,* 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID,例如“1599203861727-0”。也可以不用 *,直接在消息队列名称后自行设定一个 ID,只要保证全局唯一就行。

自动生成的 ID 由两部分组成,第一部分“1599203861727”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,从 0 开始。例如,“1599203861727-0”就表示在“1599203861727”毫秒内的第 1 条消息。

XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。设定 block 配置项,可实现类似于 BRPOP 的阻塞读取操作,单位是毫秒。例如,从 ID 为 1599203861727-0 的消息开始,读取后续的所有消息(共 3 条)

XREAD BLOCK 100 STREAMS  mqstream 1599203861727-0
1) 1) "mqstream"2) 1) 1) "1599274912765-0"2) 1) "repo"2) "3"2) 1) "1599274925823-0"2) 1) "repo"2) "2"3) 1) "1599274927910-0"2) 1) "repo"2) "1"

再看一个例子,命令以 $ 结尾表示读取最新的消息,同时设置了 block 10000 的配置项,表明 XREAD 在读取最新消息时,如果没有消息到来将阻塞 10000 毫秒(即 10 秒),然后再返回。当消息队列 mqstream 中一直没有消息时,XREAD 在 10 秒后返回空值(nil)

XREAD block 10000 streams mqstream $
(nil)
(10.00s)

XGROUP 创建消费组,是区别于 List 的功能,创建后 Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。

例如,我们执行下面的命令,创建一个名为 group1 的消费组,这个消费组消费的消息队列是 mqstream

XGROUP create mqstream group1 0
OK

执行命令,让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。

在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以,consumer1 就得到 mqstream 消息队列中的所有消息共4条。

XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"2) 1) 1) "1599203861727-0"2) 1) "repo"2) "5"2) 1) "1599274912765-0"2) 1) "repo"2) "3"3) 1) "1599274925823-0"2) 1) "repo"2) "2"4) 1) "1599274927910-0"2) 1) "repo"2) "1"

如果队列中的消息已经被其他消费者读取,则其他消费者无法读取,例如,再让 group1 内的 consumer2 读取消息时,返回空值。

XREADGROUP group group1 consumer2  streams mqstream 0
1) 1) "mqstream"2) (empty list or set)

消费组的目的是让组内的多个消费者共同分担读取,从而实现负载均衡,例如,让 group2 中的 consumer1、2、3 各自读取一条消息

XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"2) 1) 1) "1599203861727-0"2) 1) "repo"2) "5"XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"2) 1) 1) "1599274912765-0"2) 1) "repo"2) "3"XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"2) 1) 1) "1599274925823-0"2) 1) "repo"2) "2"

为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

例如,查看一下 group2 中各个消费者已读取、但尚未确认的消息个数。其中,XPENDING 返回结果的第二、三行分别表示 group2 中所有消费者读取的消息最小 ID 和最大 ID。

XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"2) "1"2) 1) "consumer2"2) "1"3) 1) "consumer3"2) "1"

如果需要进一步查看某个消费者具体读取了哪些数据,可以执行以下命令,consumer2 已读取的消息的 ID 是 1599274912765-0

XPENDING mqstream group2 - + 10 consumer2
1) 1) "1599274912765-0"2) "consumer2"3) (integer) 5133364) (integer) 1

当 1599274912765-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

 XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)

一张表格,汇总了用 List 和 Streams 实现消息队列的特点和区别
在这里插入图片描述

Redis 是一个非常轻量级的键值数据库,Kafka、RabbitMQ 是专门面向消息队列场景的重量级软件,例如 Kafka 的运行就需要再部署 ZooKeeper。

如果分布式系统中的组件消息通信量不大,那么,Redis 只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis 的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/411630.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

超详细超实用!!!java开发之IntelliJ IDEA下载与安装破解以及汉化教程(三)

云风网 云风笔记 云风知识库 一、安装包下载 1、官网下载 2、ideaIU-2024.2.0.2.exe 百度网盘资源 安装包下载完成后进行傻瓜式下一步安装就可以了 二、破解激活 由于IntelliJ IDEA可免费一个月,后续需要付费购买激活码,这里采用破解激活的方式 将…

从零上手CV竞赛Task2 # Datawhale AI夏令营

文章目录 平台参赛平台云平台 Task 1 从零上手CV竞赛下载baseline相关文件一键运行baseline!(大约需要25分钟)赛题解析数据集提交结果违法标准注意事项 下载生成的文件结果如图最后要记得关机 不然一直消耗算力 Task 2 建模方案解读与进阶物体…

从 0 到 1 的Prompt 教程,来自Claude 官方,不会写 prompt的看这个足够

Claude 的强大,最近得到了很多网友的验证,甚至效果上面大有超越 GPT的许多声音。 所以从优秀的 Claude 中学习,将会是一个很好的起点。 这里,Claude 的开发者们提供了一个相当详细的 Prompt Engineering 教程。 这个教程能够全面…

若依后端 MyBatis改为MyBatis-Plus

引用 1.引入MyBatis-Plus依赖 在总目录的pom.xml&#xff0c;导入依赖 <mybatis-plus.version>3.4.3</mybatis-plus.version> <!-- mybatis-plus 增强CRUD --> <dependency> <groupId>com.baomidou</groupId> <artifactId>…

计算机毕业设计选题推荐-医院门诊预约-医院预约挂号微信小程序/安卓APP-项目实战

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

Unity(2022.3.41LTS) - 着色器

目录 一、着色器的基本概念 二、表面着色器 一、着色器的基本概念 定义与作用&#xff1a; 着色器是一种在图形硬件上运行的程序&#xff0c;用于控制物体的颜色、纹理、光照、透明度等视觉属性。它通过对输入的几何数据&#xff08;如顶点位置、法线、纹理坐标等&#xff09…

项目:基于TCP的文件传输系统

项目介绍: 模拟FTP原理&#xff1a;客户端连接服务器后&#xff0c;向服务器发送一个文件。文件名可以通过参数指定&#xff0c;服务器端接收客户端传来的文件&#xff08;文件名随意&#xff09;&#xff0c;如果文件不存在自动创建文件&#xff0c;如果文件存在&#xff0c;…

代码随想录Day 23|回溯Part02,39.组合总和、40.组合总和Ⅱ、131.分割回文串

提示&#xff1a;DDU&#xff0c;供自己复习使用。欢迎大家前来讨论~ 文章目录 第七章 回溯算法part03一、题目题目一&#xff1a; 39. 组合总和解题思路&#xff1a;回溯三部曲剪枝优化小结&#xff1a; 题目二&#xff1a;40.组合总和Ⅱ解题思路&#xff1a;回溯三部曲 题目…

【项目源码】终于有人将打字游戏和编程英语结合起来啦!编程初学者的福音

Hello&#xff01;各位彦祖&#xff0c;亦菲们&#xff01;又是美好的一天&#xff01;今天给大家分享一个Java项目源码&#xff1a;Java打字游戏项目源码&#xff01; 看到这里&#xff0c;你可能会说&#xff01; 一个破打字游戏有什么可神气的&#xff01;&#xff01;&…

Leetcode面试经典150题-28.找出字符串第一个匹配项的下标

解法都在代码里&#xff0c;不懂就留言或者私信&#xff0c;比第一题稍微难点 用KMP解这个题简直就像大炮打蚂蚁&#xff0c;但是没办法&#xff0c;现在都是这么卷 package dataStructure.bigFactory;public class _28Strstr {public static int strStr(String s1, String s…

springboot3.x入门系列【5】支持unix sock 套接字服务

目录 一、简介 二、springBoot3.x 套接字的支持 1. 环境要求 2. springboot内置tomcat 2.1 支持unix 设置 unixDomainSocketPath 2.2 windows 下unix服务测试 3. springboot外置tomcat 3.1 tomcat 配置unix socket 套接字 3.2 启动tomcat 服务 3.3 nginx 支持unix…

android13固定app方向 强制app方向

总纲 android13 rom 开发总纲说明 1.前言 经常遇到客户有固定或者设置应用方向,不让他们改成其他方向的需求。今天我们就来处理这种问题。 2.问题分析 在 Android 10 之前,显示旋转的处理逻辑分散在多个类和模块中,Android 10 统一了这些逻辑,集中在 DisplayRotation.ja…

MATLAB 沿任意方向分层点云(82)

MATLAB 沿任意方向分层点云(82) 一、算法介绍二、算法实现1.代码2.效果更多内容参考: MATLAB点云处理学习 一、算法介绍 沿着某个方向,将点云分割为多层,每层点云使用不同颜色进行可视化显示,具体代码和不同方向的分层效果如下: 二、算法实现 1.代码 % Load point c…

【文档合集】软件类常用文档整理大全,软件工程,软件项目管理,技术标书方案,模

目的&#xff1a;规范系统开发流程&#xff0c;提高系统开发效率。 立项申请需求分析方案设计方案评审开发调整测试阶段系统培训试运行测试验收投入使用 所有文档过去进主页获取。 获取方式&#xff1a;本文末个人名片直接获取。 软件资料清单列表部分文档清单&#xff1a;工作…

【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch12 随机森林(Random Forest)

系列文章目录 监督学习&#xff1a;参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归&#xff08;SAheart.csv&#xff09; 【学习笔记】 陈强-机器学习-Python-…

(三)、软硬件全开源手表,具有Wi-Fi和蓝牙LE连接,并可连接互联网,超多表盘。(Watchy)

Watchy 是一款具有开源硬件和软件的 E-Ink watch。它采用准系统设计&#xff0c;利用 PCB 作为表身&#xff0c;可以原样佩戴&#xff0c;或进一步定制不同的 3D 打印表壳和表带。这是一款独特的设计&#xff0c;也是一个可穿戴开发平台&#xff0c;让用户可以创造自己的应用。…

【功能自动化】使用HTMLTestRunner生成测试报告

配置环境&#xff1a; 1.部署webtours网站 2.user.txt 3.HTMLTestRunner.py """ A TestRunner for use with the Python unit testing framework. It generates a HTML report to show the result at a glance.The simplest way to use this is to invoke it…

【论文阅读|cryoET】本周粗读汇总

论文1&#xff1a;CryoDRGN-ET&#xff1a;深度重建生成网络以可视化细胞内动态生物分子 Abstract 虽然冷冻电子断层扫描可以以分子分辨率揭示结构&#xff0c;但图像处理算法仍然是解决原位生物分子结构异质性的瓶颈。本文介绍CryoDRGN-ET用于cryoET断层图的异质重建。CryoD…

Figma 替代品 Penpot 安装和使用教程

在设计领域&#xff0c;Figma 无疑是一个巨人。它彻底改变了设计流程&#xff0c;将协作带到了一个全新的高度。然而&#xff0c;随着 Adobe 收购 Figma 的消息传出&#xff0c;许多设计师和开发者开始担心&#xff1a;Figma 未来会如何演变&#xff1f;那些好用的特性会不会被…

灵办AI:解锁办公新境界,让工作更智能、更高效!

在这个信息爆炸的时代&#xff0c;我们每个人都在寻找能够提升效率、简化工作流程的工具。如果您正在寻找一个能够全方位提升工作效率的AI助手&#xff0c;那么灵办AI绝对值得您的关注。 为什么选择灵办AI&#xff1f; 在众多AI工具中&#xff0c;灵办AI凭借其卓越的性能和独…