Redis 7.x 系列【14】数据类型之流(Stream)

有道无术,术尚可求,有术无道,止于术。

本系列Redis 版本 7.2.5

源码地址:https://gitee.com/pearl-organization/study-redis-demo

文章目录

    • 1. 概述
    • 2. 常用命令
      • 2.1 XADD
      • 2.2 XRANGE
      • 2.3 XREVRANGE
      • 2.4 XDEL
      • 2.5 XLEN
      • 2.6 XREAD
      • 2.7 XGROUP CREATE
      • 2.8 XACK
      • 2.9 XPENDING
    • 3. 应用场景

1. 概述

消息队列:是指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递,生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由服务端给其推送消息。

Redis 也支持消息队列功能,在 5.0 版本之前,基于以下两种方式实现:

  • Pub/Sub
  • List

Pub/Sub 发布订阅模式,消息的发送者不会将消息直接发送给特定的接收者,而是通过消息通道广播出去,让订阅该消息主题的订阅者消费到:

在这里插入图片描述
Pub/Sub 中的消息无法持久化,如果出现网络断开、宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。

Redis List 也可以实现消息队列,按照插入顺序排序,可以添加一个元素到列表的头部(左边)或者尾部(右边)。 将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理:
在这里插入图片描述
Redis List 同样存在诸多问题,比如,不支持多消费者模式,不支持延时消息,不支持优先级,不支持消息确认机制等等。

Redis Stream5.0 版本中引入的一种新的数据结构,用于实现简单但功能强大的消息传递模式。以时间序列的方式存储消息,每个消息都有一个唯一的 ID ,并且可以被多个消费者订阅和消费。是 Redis 实现消息队列的另外一种模式,支持消息的持久化、支持自动生成全局唯一 1D、支持 Ack 确认消息模式、支持消费组模式等,旨在让消息队列更加的稳定和可靠。

其结构图如下:
在这里插入图片描述
各部分解释:

  • Message Content:消息内容
  • Consumer group:消费组,通过 XGROUP CREATE 命令创建,同一个消费组可以有多个消费者
  • Last_delivered_id:游标,每个消费组会有个游标 Last_delivered_id,任意一个消费者读取了消息都会使游标往前移动。
  • Consumer:消费者,消费组中的消费者
  • Pending_ ids:消费者会有一个状态变量,用于记录被当前消费已读取但未 ack 的消息 Id ,如果客户端没有 ack ,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack 它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

2. 常用命令

Stream 相关所有命令:

命名描述
XACK确认消费者已经成功处理从 Stream 中获取的消息
XADD添加消息到队列末尾
XAUTOCLAIM转移符合指定条件的待处理流条目的所有权
XCLAIM改变待处理消息的所有权
XDEL删除消息
XGROUP CREATE为存储在 key 的流创建一个新的消费者组
XGROUP CREATECONSUMER要在存储在key的流的消费者组中创建一个消费者
XGROUP DELCONSUMER消费者组中删除一个消费者
XGROUP DESTROY删除一个已存在的消费者组
XGROUP SETID为消费者组设置最后传递的ID
XINFO CONSUMERS返回消费者组中的消费者列表
XINFO GROUPS返回消费者组列表
XINFO STREAM存储在的key流的相关信息
XLEN获取 Stream 中的消息长度
XPENDING通过消费者组从流中获取数据但不确认这些数据,会产生待处理条目
XRANGE获取消息列表(可以指定范围)
XREAD获取消息(阻塞/非阻塞),返回大于指定 ID 的消息
XREADGROUPXREAD命令的一个特殊版本,支持消费者组
XREVRANGEXRANGE 相比区别在于反向获取,ID从大到小
XSETID内部命令。它用于主节点来复制流的最后传递的ID
XTRIM限制 Stream 的长度,如果已经超长会进行截取

2.1 XADD

XADD 命令用于向 Stream(流)数据结构末尾添加消息。

语法格式:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

参数说明:

  • key:指定要添加消息的 Stream 的名称。
  • [NOMKSTREAM]:可选参数,用于指定当流不存在时是否报错。默认情况下,如果指定的流不存在,XADD命令会创建。如果使用NOMKSTREAM选项,则流不存在时命令会失败。
  • [MAXLEN|MINID [=|~] threshold [LIMIT count]]:这组选项用于控制流的最大长度或最小消息 ID
    • MAXLEN maxlen:限制 Stream 的最大长度。当长度达到maxlen时,旧的消息会被自动删除。
    • MINID minid:指定最旧的消息ID。当插入新消息时,如果已经存在比minid更旧的消息,则会将这些消息删除。
    • [=|~]:操作符,=表示精确匹配,~表示小于等于(对于MINID而言)或大于等于(对于MAXLEN而言)。
    • [LIMIT count]:当使用MAXLEN~时,指定需要保留的消息数量的最小值。
  • *|ID:消息的ID。使用*表示自动生成一个唯一的ID。如果不使用*,则需要提供一个有效的消息ID,它必须大于流中所有已存在的消息的ID
  • field value [field value ...]:消息的字段和值。可以指定一个或多个字段及其对应的值。

示例,插入消息:

localhost:0>XADD mystream * msg_1 100 msg_2 38
"1719279960591-0"

示例, 插入消息,并限制长度不超过 1000 条:

localhost:0>XADD mystream MAXLEN 1000 * msg_3 100 msg_4 38
"1719279971749-0"

查看控制台:

在这里插入图片描述

2.2 XRANGE

XRANGE 命令用于获取指定范围内的消息。

命令格式:

XRANGE key start end [COUNT count]

参数说明:

  • key:指定 Streamkey
  • start:指定要检索的消息范围的起始 ID 。可以使用特殊值-来表示最小值。
  • end:指定要检索的消息范围的结束 ID 。可以使用特殊值+来表示最大值。
  • [COUNT count]:可选参数,用于限制返回的消息数量。

注意事项:

  • Stream 的消息 ID 由两部分组成:一个时间戳和一个序列号。时间戳表示消息被添加到 Stream 的时间,而序列号则用于在同一时间戳内区分不同的消息。
  • XRANGE 命令返回的消息是按照它们在 Stream 中的顺序排列的,即按照消息 ID 的顺序。
  • 如果在检索消息时使用了 COUNT 参数,但指定的范围内的消息数量少于 COUNT 指定的数量,那么只会返回范围内的所有消息。

示例,检索所有消息:

localhost:0>XRANGE mystream - +1)    1)   "1719279960591-0"2)      1)    "msg_1"2)    "100"3)    "msg_2"4)    "38"2)    1)   "1719279971749-0"2)      1)    "msg_3"2)    "100"3)    "msg_4"4)    "38"

示例,检索特定范围内的消息:

localhost:0>XRANGE mystream  1719279960591-0 1719279960600-01)    1)   "1719279960591-0"2)      1)    "msg_1"2)    "100"3)    "msg_2"4)    "38"

示例,限制返回的消息数量:

localhost:0>XRANGE mystream - + COUNT 11)    1)   "1719279960591-0"2)      1)    "msg_1"2)    "100"3)    "msg_2"4) 

2.3 XREVRANGE

XREVRANGE 命令与 XRANGE 命令类似,但它是按照消息 ID 的递减顺序(用于反向)获取指定范围内的消息。

命令格式:

XREVRANGE key end start [COUNT count]

示例,检索最后两个时间序列的消息:


localhost:0>XREVRANGE mystream + - COUNT 21)    1)   "1719279971749-0"2)      1)    "msg_3"2)    "100"3)    "msg_4"4)    "38"2)    1)   "1719279960591-0"2)      1)    "msg_1"2)    "100"3)    "msg_2"4)    "38"

2.4 XDEL

XDEL 命令用于从 Stream 中删除指定的消息。返回一个整数,表示被成功删除的消息数量。

命令格式:

XDEL key ID [ID ...]

参数说明:

  • key:指定 Streamkey
  • ID:一个或多个要删除的消息的 ID

注意事项:

  • 在使用 XDEL 命令时,你需要确保提供的消息 ID 是存在的,否则命令将不会删除任何消息,并返回0。
  • 可以通过一次 XDEL 命令删除多个消息,只需在命令中提供多个消息 ID 即可。
  • XDEL 命令不会改变 Stream 的其他消息的顺序或 ID

示例,删除消息:

localhost:0>XDEL mystream 1719280747405-0
"1"

2.5 XLEN

XLEN 命令用于获取指定 Stream 中包含的消息数量,即流的长度。如果 Stream 不存在或为空,则返回 0

命令格式:

XLEN key

示例:

localhost:0>XLEN mystream
"1"

2.6 XREAD

XREAD 命令是用于从 Stream 独立消费消息,支持阻塞等待新消息的到来。返回一个数组,其中每个元素都是一个包含 Stream key 和消息列表的数组。消息列表是一个包含消息 ID 和消息数据的数组。

命令格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

参数说明:

  • COUNT count:指定一次读取的最大消息数量。如果未指定,则默认为 1
  • BLOCK milliseconds:用于指定阻塞的时间(以毫秒为单位)。如果指定了此参数,并且 Stream 中没有可消费的消息,客户端将在指定的时间内阻塞等待。如果设置为 0 ,则表示非阻塞模式,即如果没有消息可消费,则立即返回。
  • STREAMS key [key ...]:指定要从中读取消息的 Streamkey 。可以指定一个或多个。
  • ID [ID ...]:对于每个指定的 key ,可以提供一个或多个消息 ID 。这些 ID 用于指定从哪个位置开始读取消息。如果某个 key 后面没有指定 ID ,则默认为从该 Stream 的最新消息开始读取。

示例,非阻塞模式读取最新消息:

XREAD COUNT 1 STREAMS mystream $

示例,阻塞模式,读取最新消息并等待新消息:

XREAD COUNT 1 BLOCK 10000 STREAMS mystream $

2.7 XGROUP CREATE

XGROUP CREATE 命令用于在已存在的流(stream)上创建一个新的消费者组(consumer group)。消费者组允许多个消费者(consumer)协作消费同一个流中的数据,并且每个消费者都可以从自己的位置开始读取流。

命令格式:

XGROUP CREATE <key> <groupname> <id> [MKSTREAM] [MKTABLE] [CREATECONSUMER <consumername>]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • <id>:消费者组初始的最后一个条目 ID ,即消费者组开始读取的起始点。可以使用$表示流的最新条目,或者使用0表示流的起始点,或者使用任何其他有效的 ID
  • [MKSTREAM]:可选参数,如果流不存在,则创建它。
  • [MKTABLE]:在 Redis 6.2 及更高版本中引入的可选参数,用于创建与流关联的二级索引表(secondary index table)。这通常用于支持基于特定字段的查询。
  • [CREATECONSUMER <consumername>]:在 Redis 6.2 及更高版本中引入的可选参数,用于在创建消费者组时同时创建一个消费者。

示例,创建一个新的消费者组,从流的最新条目开始读取:

localhost:0>XGROUP CREATE mystream mygroup $ MKSTREAM
"OK"

2.8 XACK

XACK 命令用于确消费者已经成功处理了一个或多个消息。这些消息通常是从流(Stream)中读取的,并存储在消费者组的待处理条目列表(Pending Entry ListPEL)中。通过发送 XACK 命令,消费者通知 Redis 服务器它已经完成了一个或多个消息的处理,从而将这些消息从 PEL 中移除。

命令格式:

XACK <key> <groupname> <consumername> <ID> [<ID> ...]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • <consumername>:消费者的名称。
  • <ID>:要确认的消息的ID,可以指定一个或多个。

示例,假设消费者已经读取了一些消息,并决定它们已经被成功处理。现在,消费者想要确认这些消息:

XACK mystream mygroup myconsumer 1526569900000-0 1526569900002-0

在这个例子中,消费者确认了两个消息,它们的 ID 分别是 1526569900000-01526569900002-0

一旦消息被确认,它们将从该消费者组的 PEL 中移除,表示这些消息已经被成功处理。注意,即使消息被确认并从 PEL 中移除,它们仍然保留在流中,并且可以被其他消费者组或消费者读取。

如果消费者在处理消息时失败,或者需要稍后重试,它可以选择不发送 XACK 命令,这样消息将保持在 PEL 中,直到消费者准备好确认它们或它们因超时而被自动从 PEL 中移除(取决于消费者组的配置)。

2.9 XPENDING

XPENDING 命令用于查询消费者组中未确认消息的详细信息。允许你查看哪些消息正在等待被处理,以及哪些消费者拥有这些消息。

命令格式:

XPENDING <key> <groupname> [start end count] [consumername]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • [start end count]:这三个参数是可选的,用于限制查询结果的范围。
  • start:查询的开始消息ID
  • end:查询的结束消息ID
  • count:要返回的消息数量。
  • [consumername]:可选参数,指定要查询的消费者的名称。如果不提供此参数,将返回消费者组中的所有未确认消息。

XPENDING 命令返回一个数组,其中包含以下信息:

  • 总未确认消息数:整数,表示在指定范围内未确认的消息总数。
  • 最小消息ID:字符串,表示在指定范围内未确认消息的最小ID
  • 最大消息ID:字符串,表示在指定范围内未确认消息的最大ID
  • 每个消费者的未确认消息:一个数组,其中每个元素都是一个包含消费者名称和该消费者拥有的未确认消息数的数组。

注意事项:

  • XPENDING 是一个只读命令,它不会修改任何数据。
  • 如果提供了 consumername 参数,则只返回该消费者的未确认消息信息。
  • 如果提供了 [start end count] 参数,则只返回指定范围内的未确认消息信息。
  • 通过 XPENDING 命令,你可以轻松地监控消费者组中的未确认消息,从而确保消息得到及时处理,并在必要时进行故障排除。

示例:

XPENDING mystream mygroup
2) "1526569900000-0"  # 最小消息ID  
3) "1526569900002-0"  # 最大消息ID  
4) 1) 1) "myconsumer" # 消费者名称  2) (integer) 2   # 该消费者拥有的未确认消息数

3. 应用场景

Redis Stream 主要用于消息队列,所以可以用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。但是更推荐使用专业的消息队列,比如RabbitMQRocketMQ等,对于简单的应用场景,如果能满足需求,也可以使用Redis Stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/366504.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【.Net】Web项目部署腾讯云

文章目录 总述前置准备docker-compose部署普通部署 参考 总述 前置准备 云服务添加端口 另有linux本身防火墙请参考&#xff1a; 【Linux】防火墙命令 需安装.Net SDK和Asp .Net Runtime 注意&#xff1a; 1、sdk也要不只是runtime 2、是Asp .Net Runtime不是.Net Runtime …

国产音频放大器工作原理以及应用领域

音频放大器是在产生声音的输出元件上重建输入的音频信号的设备&#xff0c;其重建的信号音量和功率级都要理想&#xff1a;如实、有效且失真低。音频范围为约20Hz&#xff5e;20000Hz&#xff0c;因此放大器在此范围内必须有良好的频率响应&#xff08;驱动频带受限的扬声器时要…

BIOS设置与系统分区

&#x1f4d1;打牌 &#xff1a; da pai ge的个人主页 &#x1f324;️个人专栏 &#xff1a; da pai ge的博客专栏 ☁️宝剑锋从磨砺出&#xff0c;梅花香自苦寒来 目录 一BIOS 1破解密码的前提 2B…

Spring Cloud Gateway3.x自定义Spring Cloud Loadbalancer负载均衡策略以及实现动态负载均衡策略的方案

目录 前言 1.原理分析 1.1 ReactiveLoadBalancerClientFilter源码分析 1.2 LoadBalancerClientFactory源码分析 2.代码实现 2.1 扩展原生RoundRobinLoadBalancer轮询策略 2.1.1 自定义实现RoundRobinLoadBalancer 2.1.2 配置自定义的RoundRobinLoadBalan…

【web3】分享一个web入门学习平台-HackQuest

前言 一直想进入web3行业&#xff0c;但是没有什么途径&#xff0c;偶然在电鸭平台看到HackQuest的共学营&#xff0c;发现真的不错&#xff0c;并且还接触到了黑客松这种形式。 链接地址&#xff1a;HackQuest 平台功能 学习路径&#xff1a;平台有完整的学习路径&#xff…

金蝶云星空字段之间连续触发值更新

文章目录 金蝶云星空字段之间连续触发值更新场景说明具体需求&#xff1a;解决方案 金蝶云星空字段之间连续触发值更新 场景说明 字段A配置了字段B的计算公式&#xff0c;字段B配置了自动C的计算公式&#xff0c;修改A的时候&#xff0c;触发了B的重算&#xff0c;但是C触发不…

ABeam×StartUp | ABeam德硕中国新创部门拜访通用机器人初创公司 :逐际动力,就具身智能机器人的发展展开交流

近日&#xff0c;ABeam中国新创部门有幸拜访了深圳逐际动力科技有限公司&#xff08;以下简称&#xff1a;逐际动力&#xff09;。作为一家通用机器人公司&#xff0c;其在人形机器人、四轮足机器人等领域具有深厚的学术与技术储备。 现场合影 左&#xff1a;ABeam中国新创部门…

最快33天录用!一投就中的医学4区SCI,几乎不退稿~

【SciencePub学术】今天小编给大家推荐2本生物医学领域的SCI&#xff0c;此期刊为我处目前合作的重点期刊&#xff01;影响因子0-3.0之间&#xff0c;最重要的是审稿周期较短&#xff0c;对急投的学者较为友好&#xff01; 医学医药类SCI 01 / 期刊概况 【期刊简介】IF&…

多模态融合 + 慢病精准预测

多模态融合 慢病精准预测 慢病预测算法拆解子解法1&#xff1a;多模态数据集成子解法2&#xff1a;实时数据处理与更新子解法3&#xff1a;采用大型语言多模态模型&#xff08;LLMMs&#xff09;进行深度学习分析 慢病预测更多模态 论文&#xff1a;https://arxiv.org/pdf/2406…

高通骁龙(Qualcomm Snapdragon)CDSP HVX HTP 芯片简介与开发入门

1. Hexagon DSP/HVX/HTP 硬件演进 说到高通骁龙芯片大家应该不会陌生&#xff0c;其作为最为广泛的移动处理器之一&#xff0c;几乎每一个品牌的智能手机都会使用高通骁龙的处理器。 高通提供了一系列骁龙芯片解决方案。根据性能强弱分为了5个产品系列&#xff1a;从最高端的…

数据结构_1.0

一、数据结构概述 1.1 概念 在计算机科学中&#xff0c;数据结构是一种数据组织、管理和存储的格式 。它是相互之间存在一种或多种特定关系的数据元素的集合。通常情况下&#xff0c;精心选择的数据结构可以带来更高的运行或者存储效率。数据结构往往同高效的检索算法和索引技…

【C语言】typedef 关键字

在C语言中&#xff0c;typedef关键字用于给现有的数据类型起一个新的名字。它在提高代码可读性、简化复杂类型声明、增强可维护性方面非常有用。typedef通常用于定义结构体、指针、函数指针以及其他复杂类型。 基本用法 typedef int MyInt; MyInt x 10;在这个例子中&#xf…

wps linux node.js 加载项开发,和离线部署方案

环境准备 windwos 安装node.js 安装VSCode 安装wps linux 安装node.js 安装VSCode 安装wps 通过npm 安装wpsjs SDK 使用npm安装wpsjs npm install -g wpsjs 创建一个项目 wpsjs create WPS-Addin-PPT 创建项目会让你选择2个东西&#xff1a; 1&#xff1a;选择你的文…

SpringBoot实现图片添加水印(完整)

提示&#xff1a;昨天不是写了一个类似与图片添加水印的版本吗,今天来写一个带数据库&#xff0c;并且可以完整访问的版本 文章目录 目录 文章目录 引入库 配置文件 数据库配置 字段配置 索引配置 数据库表语句 启动文件 前端代码 整体代码目录 配置类AppConfig Contro…

五千元软考补贴,这个地区的人别错过!

软考合格者可享受一些补贴&#xff0c;最高可达25万&#xff1f;&#xff01;持有软考证书可领取哪些补贴&#xff1f;软考补贴详细信息 由于不同地区政策有时间限制&#xff0c;符合条件的人员应尽快领取哦~ 今天继续分享常德地区的补贴信息。 这里给大家总结一下和软考有关…

Java的日期类常用方法

Java_Date 第一代日期类 获取当前时间 Date date new Date(); System.out.printf("当前时间" date); 格式化时间信息 SimpleDateFormat simpleDateFormat new SimpleDateFormat("yyyy-mm-dd hh:mm:ss E); System.out.printf("格式化后时间" si…

ROS2 RQT

1. RQT是什么 RQT是一个GUI框架&#xff0c;通过插件的方式实现了各种各样的界面工具。 强行解读下&#xff1a;RQT就像插座&#xff0c;任何电器只要符合插座的型号就可以插上去工作。 2.选择插件 这里我们可以选择现有的几个RQT插件来试一试&#xff0c;可以看到和话题、参…

计算机公共课面试常见问题:线性代数篇

目录 1. 特征向量和特征值代表什么含义&#xff1f; 2. 矩阵的秩是什么&#xff1f;满秩代表什么&#xff1f;不满秩呢&#xff1f; 3. 奇异值分解是什么&#xff1f; …

【图像处理实战】去除光照不均(Python)

这篇文章主要是对参考文章里面实现一种小拓展&#xff1a; 可处理彩色图片&#xff08;通过对 HSV 的 V 通道进行处理&#xff09;本来想将嵌套循环改成矩阵运算的&#xff0c;但是太麻烦了&#xff0c;而且代码也不好理解&#xff0c;所以放弃了。 代码 import cv2 import …