Redis之stream类型解读

目录

基本介绍

数据结构

消息

消费组

消费者

基本使用命令

概述 

xadd 命令

xtrim 命令

 xdel 命令

xlen 命令

xrange 命令

xread 命令 

xgroup 命令 

xreadgroup 命令

xack 命令


基本介绍

Redis stream(流)是一种数据结构,其作用类似于仅追加日志,但也实现了多个操作来克服典型仅追加日志的一些限制。其中包括O(1)时间的随机访问和复杂的消费策略,如消费者群体。 您可以使用流实时记录和同时联合事件。 

Redis 为每个stream(流)条目生成一个唯一的 ID。可以在以后使用这些 ID 检索其关联的条目,或读取和处理流中的所有后续条目。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。stream 有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。

数据结构

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

消息

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。XADD key ID field value[field value ...]

  • 消息 ID:消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第5条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。
  • 消息内容:消息内容就是键值对,形如 hash 结构的键值对。

消费组

每个 Stream 都可以挂多个消费组(Consumer Group),每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息ID开始消费,这个 ID 用来初始化 last_delivered_id 变量。每个消费组的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。

消费者

消费者内部会有一个状态变量pending_ids,维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符)。如果客户端没有 ack,这个变量里面的消息 ID 就会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称为 PEL,也就是 Pending Entries List,这是一个核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没被处理。

基本使用命令

概述 

消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息

xadd 命令

XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在,将使用流的条目自动创建 key。

一个条目是由一组键值对组成的,它基本上是一个小的字典。键值对以用户给定的顺序存储,并且读取流的命令(如 XRANGE 或者 XREAD)可以保证按照通过 XADD 添加的顺序返回。

XADD key ID field value [field value ...]
  • key :队列名称,如果不存在就创建
  • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
  • field value : 记录。
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"2) 1) "name"2) "Sara"3) "surname"4) "OConnor"
2) 1) "1601372323627-1"2) 1) "field1"2) "value1"3) "field2"4) "value2"5) "field3"6) "value3"

 返回值:该命令返回添加的条目的 ID。如果 ID 参数传的是*,那么 ID 是自动生成的,否则,命令仅返回用户在插入期间指定的相同的 ID。

xtrim 命令

XTRIM 将流裁剪为指定数量的项目,如有需要,将驱逐旧的项目(ID较小的项目)。

XTRIM key MAXLEN [~] count
  • key :队列名称
  • MAXLEN :长度
  • count :数量
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"2) 1) "field1"2) "A"3) "field2"4) "B"5) "field3"6) "C"7) "field4"8) "D"

返回值:返回从流中删除的条目数。

 xdel 命令

从指定流中移除指定的条目,并返回成功删除的条目的数量。在传递的ID不存在的情况下,返回的数量可能与传递的ID数量不同。

XDEL key ID[ID ...]
  • key:队列名称。
  • ID:消息 ID 
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-02) 1) "a"2) "1"
2) 1) 1538561701744-02) 1) "c"2) "3"

返回值:删除的条目数量。

xlen 命令

返回流中的条目数。如果指定的key不存在,则此命令返回0,就好像该流为空。

与其他的Redis类型不同,零长度流是可能的,所以你应该调用TYPE或者EXISTS来检查一个key是否存在。一旦内部没有任何的条目(例如调用XDEL后),流不会被自动删除,因为可能还存在与其相关联的消费者组。

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3

返回值:流中包含的条目数量

xrange 命令

xrange命令返回流中满足给定ID范围的条目。范围由最小和最大ID指定。所有ID在指定的两个ID之间或与其中一个ID相等(闭合区间)的条目将会被返回。

XRANGE key start end [COUNT count]
  • key :队列名
  • start :开始值, - 表示最小值
  • end :结束值, + 表示最大值
  • count :数量
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"2) 1) "name"2) "Virginia"3) "surname"4) "Woolf"
2) 1) "1601372577811-1"2) 1) "name"2) "Jane"3) "surname"4) "Austen"

返回值:该命令返回ID与指定范围匹配的条目。返回的条目是完整的,这意味着ID和所有组成条目的字段都将返回。此外,返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。

xread 命令 

从一个或者多个流中读取数据,仅返回ID大于调用者报告的最后接收ID的条目。此命令有一个阻塞选项,用于等待可用的项目,类似于BRPOP或者BZPOPMIN等等。

XREAD[COUNT count][BLOCK milliseconds] STREAMS key[key ...]id[id ...]
  • count:数量。
  • milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式。
  • key :队列名。
  • id:消息 ID。
redis> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"2) 1) 1) 1526984818136-02) 1) "duration"2) "1532"3) "event-id"4) "5"5) "user-id"6) "7782813"2) 1) 1526999352406-02) 1) "duration"2) "812"3) "event-id"4) "9"5) "user-id"6) "388234"
2) 1) "writers"2) 1) 1) 1526985676425-02) 1) "name"2) "Virginia"3) "surname"4) "Woolf"2) 1) 1526985685298-02) 1) "name"2) "Jane"3) "surname"4) "Austen"

返回值:该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组(键名和为该键报告的条目)。报告的条目是完整的流条目,具有ID以及所有字段和值的列表。返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。

当使用BLOCK时,超时时将返回一个空回复(nil)。

xgroup 命令 

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0  

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

xreadgroup 命令

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :消费者名。
  • count : 读取数量。
  • milliseconds : 阻塞毫秒数。
  • key : 队列名。
  • ID : 消息 ID。

xack 命令

XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。当一条消息交付到某个消费者时,它将被存储在PEL中等待处理,这通常出现在作为调用XREADGROUP命令的副作用,或者一个消费者通过调用XCLAIM命令接管消息的时候。

一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理,且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存。

XACK key group ID[ID ...]

返回值:该命令返回成功确认的消息数。某些消息ID可能不再是PEL的一部分(例如因为它们已经被确认),而且XACK不会把他们算到成功确认的数量中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/108005.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【TA 挖坑03】雾效 | 透光材质 | Impostor | 厚度转球谐

仍旧是记录下半年想要做的东西,很有趣,实现“一团雾效” “面片也有立体感” 等等效果的一些技术上的方法。 仅粗浅记录,保证之后自己填坑的时候看得懂就行! 透光 -> 透光材质ShadingModel 《永劫无间》透光材质的渲染&…

PHP自己的框架cookie()使用(完善篇七)

1、PHP自己的框架cookie() 2、cookie类&#xff08;CookieBase.php&#xff09; <?php class CookieBase {/*** 设置cookie*/public static function set($name, $value, $expire 3600, $path , $domain , $secure false, $httponly false) {setcookie($name, $valu…

几个nlp的小任务(抽取式问答)

几个nlp的小任务(抽取式问答) 安装库抽取式问答介绍、SQuAD数据集初始化参数加载、导入数据集查看数据集示例加载tokenizer对长文本处理的演示对答案的位置进行验证整合刚才的步骤对数据集中的数据进行预处理加载微调模型设置args 参数使用数据清洗设置训练函数,开始训练安装…

人员着装识别算法 yolo

人员着装识别系统通过yolo网络模型识别算法&#xff0c;人员着装识别系统算法通过现场安装的摄像头识别工厂人员及工地人员是否按要求穿戴着装&#xff0c;实时监测人员的着装情况&#xff0c;并进行相关预警。目标检测架构分为两种&#xff0c;一种是two-stage&#xff0c;一种…

一段简单的汇编语言源程序【2】

此文章主要记录代码的编写&#xff0c;编译&#xff0c;连接&#xff0c;调试过程&#xff0c;相关工具的安装和使用介绍在前面的文章中已提供。 主要功能通过栈实现两个数的交换 源代码如下&#xff1a; assume cs:codesg codesg segmentmov ax,2000Hmov ss,axmov sp,0add s…

Qt 自定义菜单 托盘菜单

托盘菜单实现&#xff1a;通过QSystemTrayIconQMenuQAction即可完美实现&#xff01; 实现方式&#xff1a;createActions用于创建菜单、菜单项,translateActions用于设置文本、实现多语化&#xff0c;translateAccount用于设置用户空间配额。 void TrayMenu::createActions(…

ppt如何转pdf文档?用这个方法可将ppt转pdf

在现代社会中&#xff0c;PPT(幻灯片)已成为一种常见的演示工具&#xff0c;被广泛应用于学术、商务、培训等领域。然而&#xff0c;PPT文件的使用和分享存在一些问题&#xff0c;例如文件格式不兼容、内容修改易被篡改等。为了解决这些问题&#xff0c;将PPT转换为PDF格式已成…

一篇掌握BFD技术(三):单臂回声配置

1. 实验目的 熟悉单臂回声的应用场景掌握单臂回声的配置方法 2. 实验拓扑 想要华为数通配套实验拓扑和配置笔记的朋友们点赞关注&#xff0c;评论区留下邮箱发给你 3. 实验步骤 1&#xff09;配置IP地址 AR1的配置 <Huawei>system-v…

React Hooks 全解:零基础入门

Hooks 的由来 你还在为该使用无状态组件&#xff08;Function&#xff09;还是有状态组件&#xff08;Class&#xff09;而烦恼吗&#xff1f; ——拥有了hooks&#xff0c;你再也不需要写Class了&#xff0c;你的所有组件都将是Function。 你还在为搞不清使用哪个生命周期钩…

Redis(缓存预热,缓存雪崩,缓存击穿,缓存穿透)

目录 一、缓存预热 二、缓存雪崩 三、缓存击穿 四、缓存穿透 一、缓存预热 开过车的都知道&#xff0c;冬天的时候启动我们的小汽车之后不要直接驾驶&#xff0c;先让车子发动机预热一段时间再启动。缓存预热是一样的道理。 缓存预热就是系统启动前&#xff0c;提前将相关的…

GitRedisNginx合集

目录 文件传下载 Git常用命令 Git工作区中文件的状态 远程仓库操作 分支操作 标签操作 idea中使用git 设置git.exe路径 操作步骤 linux配置jdk 安装tomcat 查看是否启动成功 查看tomcat进程 防火墙操作 开放指定端口并立即生效 安装mysql 修改mysql密码 安装lrzsz软…

【AI模型】gym强化学习仿真平台配置与使用

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍gym强化学习仿真平台配置与使用。 无专精则不能成&#xff0c;无涉猎则不能通。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&…

202325读书笔记|《花间集评注》——金盏不辞须满酌,海棠花下思朦胧,醉香风。满身香雾簇朝霞。世间屏障,彩笔画娇娆。

202325读书笔记|《花间集评注》——金盏不辞须满酌&#xff0c;海棠花下思朦胧&#xff0c;醉香风。满身香雾簇朝霞。世间屏障&#xff0c;彩笔画娇娆。 花间集评注卷一花间集评注卷二花间集评注卷三花间集评注卷四花间集评注卷五花间集评注卷六花间集评注卷七花间集评注卷八花…

代码随想录第32天|122.买卖股票的最佳时机 II,55. 跳跃游戏 ,45. 跳跃游戏 II

122.买卖股票的最佳时机 II 122. 买卖股票的最佳时机 II 思路比较简单 class Solution {public int maxProfit(int[] prices) {int res0,sum0;for(int i0;i<prices.length-1;i){if(prices[i1]-prices[i]>0){sumprices[i1]-prices[i];}ressum>res?sum:res;}return …

Spring 容器启动耗时统计

为了了解 Spring 为什么会启动那么久&#xff0c;于是看了看怎么统计一下加载 Bean 的耗时。 极简版 几行代码搞定。 import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor;import java.util.HashMap; imp…

语言基础篇1——Python概述,Python是什么?Python能干什么?

概述 简介 Python&#xff0c;计算机高级语言&#xff0c;读作/ˈpaɪθən/&#xff08;英音&#xff09;、/ˈpaɪθɑːn/&#xff08;美音&#xff09;&#xff0c;意为蟒蛇&#xff0c;Python的logo为两条缠绕的蟒蛇 特点 Python以开发效率高而运行效率低著称 应用领域…

一键实现 Oracle 数据整库同步至 Apache Doris

在实时数据仓库建设或迁移的过程中&#xff0c;用户必须考虑如何高效便捷将关系数据库数据同步到实时数仓中来&#xff0c;Apache Doris 用户也面临这样的挑战。而对于从 Oracle 到 Doris 的数据同步&#xff0c;通常会用到以下两种常见的同步方式&#xff1a; OGG/XStream/Lo…

加油站抽烟烟火智能识别算法

加油站抽烟烟火智能识别系统通过yoloopencv网络模型图像识别分析技术&#xff0c;加油站抽烟烟火智能识别算法识别出抽烟和燃放烟火的情况&#xff0c;并发出预警信号以提醒相关人员&#xff0c;减少火灾风险。OpenCV基于C实现&#xff0c;同时提供python, Ruby, Matlab等语言的…

Linux(多进程与多线程)

目录 1、进程与线程概念 1.1 进程 1.2 线程 1.3 进程与线程区别 2、多进程 2.1多进程概念 2.2 进程相关API 2.3 多进程编程 3、多线程 3.1 多线程概念 3.2 多线程相关API 3.3 多线程编程 1、进程与线程概念 1.1 进程 在计算机科学中&#xff0c;进程是正在执行中…

不同版本.net引用同一个项目

项目文件.csproj文件内容如下&#xff1a; 重点是&#xff1a;不能有其他的 netstandard2;net40;net45;net46;net6 <Project Sdk"Microsoft.NET.Sdk"><PropertyGroup><TargetFrameworks>netstandard2;net40;net45;net46;net6</TargetFrame…