Redis Stream Redisson Stream

目录

    • 一、Redis Stream
      • 1.1 场景1:多个客户端可以同时接收到消息
        • 1.1.1 XADD - 向stream添加Entry(发消息 )
        • 1.1.2 XREAD - 从stream中读取Entry(收消息)
        • 1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)
      • 1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)
        • 1.2.1 XGROUP CREATE - 创建消费组
        • 1.2.2 XREADGROUP - 从消费组中读取消息
        • 1.2.3 XACK - 确认消息
        • 1.2.4 XPENDING - 读取PEL消息
        • 1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者
        • 1.2.6 统计命令
      • 1.3 其他
    • 二、Redisson Stream

一、Redis Stream

之前介绍过Redis Pub/Sub相关内容,通过Redis Pub/Sub可以实现发布/订阅消息传递范式,但是存在丢消息的可能,而本文介绍的Redis Stream是一种可用来实现 可靠消息队列、支持消息分组(类似Kafka Group) 的数据结构。

关于Redis Stream的使用存在如下2个场景

  • 场景1: 多个客户端可以同时接收到消息
  • 场景2: 多个客户端仅收到一部分消息(分片sharded),例如发送消息A,B,C,客户端1收到A,C,客户端2收到B(参考Kafka group概念)。

关于场景1,则可参考XADD、XREAD、XRANGE等相关命令的使用,
关于场景2,则需要了解XGROUP CREATE、XREADGROUP、XACK等相关命令的使用。

1.1 场景1:多个客户端可以同时接收到消息

场景1中相关命令XADD、XREAD、XRANGE的使用汇总如下图:
在这里插入图片描述

1.1.1 XADD - 向stream添加Entry(发消息 )

向stream添加Entry(多个key/value对),XADD命令格式:

XADD stream名称 id key1 value1 key2 value2 …

其中id为此次entry的唯一ID,而key1 value1 key2 value2 …即为entry的具体内容,
id为*则表示由Redis自动生成ID:<millisecondsTime>-<sequenceNumber>
亦可明确指定id。

示例:

XADD mystream * name 罗 age 18
XADD mystream 1692632086370-0 name 刘 age 18
1.1.2 XREAD - 从stream中读取Entry(收消息)

从stream中读取entry,XREAD命令格式:

XREAD COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 STREAMS stream名称 上次接收的id

通过XADD添加一条消息,多个执行XREAD的客户端都会读取到该消息,
XREAD会从参数中指定的 上次接收的id 之后开始读取后续的消息,
上次接受的id 可设置为$,需配合BLOCK使用,表示仅读取从阻塞开始后新添加的消息(即不关心历史消息),
上次接受的id 可设置为+,需要Redis版本>=7.4 RC1,表示仅读取最后一条消息。
阻塞等待的毫秒数 如果为0,则表示一直阻塞,直到读取到一条消息。

示例:

# 从头开始读取1条消息
XREAD STREAMS mystream 0# 从头开始读取2条消息
XREAD COUNT 2 STREAMS mystream 0-0
# 从指定消息ID之后开始读取2条消息
XREAD COUNT 2 STREAMS mystream 1692632086370-0# 最长阻塞5秒,最多读取100条消息,仅读取从阻塞开始后新添加的消息
XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
# 继续从上次接受的id之后继续读取
XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3# 读取最后一条消息(需要Redis版本>=7.4 RC1)
XREAD STREAM mystream +
1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)

从stream指定区间(起始ID范围)正向读取Entry,XRANGE命令格式:

XRANGE stream名称 起始id 结束id COUNT 最多读取数量

按起始到结束正向返回消息,
-表示最小ID,+表示最大ID

示例:

# 返回全部消息(从前到后依次返回)
XRANGE mystream - + 
# 返回5条消息(从前到后依次返回)
XRANGE mystream - + COUNT 5# 返回指定id(包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream 1718951980910-0 + COUNT 5# 返回指定id(不包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream (1718951980910-0 + COUNT 5

从stream指定区间(起始ID范围)逆向读取Entry,XREVRANGE命令格式:

XREVRANGE stream名称 结束id 起始id COUNT 最多读取数量

按结束到起始逆向返回消息。

示例:

返回全部消息(从后到前逆向依次返回)
XREVRANGE mystream + -
# 返回2条消息(从后到前逆向依次返回)
XREVRANGE mystream + - COUNT 2

1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)

场景2中相关命令XGROUP CREATE、XREADGROUP、XACK、XPENDING、XCLAIM等使用汇总如下图:

在这里插入图片描述

1.2.1 XGROUP CREATE - 创建消费组

给stream创建消费分组,分组间彼此隔离,分组内多个consumer会轮流消费消息(分片),XGROUP CREATE命令格式:

XGROUP CREATE stream名称 group名称 起始读取id [MKSTREAM]

起始读取id0,表示从头开始读取,
起始读取id$,表示从最后一条消息之后开始读取,
MKSTREAM子命令是可选的,表示自动创建stream。

示例:

# 为mystream创建分组mygroup1,且从最新消息开始消费XGROUP CREATE mystream mygroup1 $
1.2.2 XREADGROUP - 从消费组中读取消息

以分组group读取stream中的消息,group中每个客户端需要指定consumer名称,多个consumer分摊group中的消息,而多个group间彼此隔离,XREADGROUP命令格式:

XREADGROUP GROUP group名称 consumer名称 COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 [NOACK] STREAMS stream名称 上次接收的id

PEL(Pending Entries List): 当使用XREADGROUP读取分组下消息时,服务器会记住哪条消息发给了分组下的哪个消费者,该记录存储在消费者组中,称为PEL,即已发送但尚未确认的消息ID列表。后续在消费者处理完消息后,消费者必须手动调用XACK命令对消息ID进行确认,以便从PEL中删除挂起的消息,关于PEL的结构可参见下图(截取自RedisInsight工具):
在这里插入图片描述

上次接收的id>,表示消费者只希望接收从未传递给任何其他消费者的消息,即给我新的信息>号表示从当前消费组的last_delivered_id后面开始读。
上次接收的id 设为0或其他有效的id,则表示仅读取 PEL(当前consumer没有确认的消息) 中指定id之后的消息。

NOACK子命令式可选的,表示无需确认消息,NOACK子命令适用于对可靠性要求不高、偶尔的消息丢失是可以接受的情况,使用NOACK子命令可以避免将消息添加到PEL( Pending Entries List),相当于在读取消息后自动确认消息,后续无需再调用XACK命令进行确认,

示例:

# 消费者c1阻塞读取mystream下分组mygroup1的最新消息(直到读取到1条消息后解除阻塞)
XREADGROUP GROUP mygroup1 c1 BLOCK 0 STREAMS mystream ># 消费者c1读取mystream下分组mygroup1的PEL消息(即已投递给c1但c1未进行确认的消息列表)
XREADGROUP GROUP mygroup1 c1 STREAMS mystream 0
1.2.3 XACK - 确认消息

确认stream下指定分组group的某条消息已被成功消费,XACK命令格式:

XACK stream名称 group名称 消息id

示例:

# 确认1条消息 
XACK mystream mygroup1 1719206857966-0 # 同时确认3条消息
XACK mystream mygroup1 1719206857966-0 1719206909894-0 1719207195666-0
1.2.4 XPENDING - 读取PEL消息

读取stream中指定分组group的PEL挂起消息列表,XPENDING命令格式:

XPENDING stream名称 group名称 IDEL 空闲毫秒数 起始消息id 结束消息id 查询数量 consumer名称

示例:

# 查询mystream下mygroup1分组的PEL列表
XPENDING mystream mygroup1# 查询mystream下mygroup1分组下的消费者c1的空闲9秒的最多10条PEL消息
XPENDING mystream mygroup1 IDLE 9000 - + 10 c1
1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者

通过XPENDING查询出PEL消息(已投递未确认)后,若原先消息对应的consumer已经挂掉,没有能力继续处理消息,则可通过XCLIAM将对应的消息转移给同分组下的其他consumer进行处理,XCLAIM命令格式如下:

XCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 消息id1 消息id2

转移后消息上次投递时间会重置为当前时间(即消息空闲idle时间为0),
默认会返回已经转移成功的消息内容,且消息投递计数会加1,
也可添加JUSTID子命令,则只返回消息ID不返回消息内容,且消息投递计数不变,
若多个客户端同时通过XCLAIM转移同一条消息的所有权,则只会有一个客户端转移成功。
Redis官方原文如下:

Note that the message is claimed only if its idle time is greater than the minimum idle time we specify when calling XCLAIM. Because as a side effect XCLAIM will also

  • reset the idle time (since this is a new attempt at processing the message),
  • two consumers trying to claim a message at the same time will never both succeed: only one will successfully claim the message. This avoids that we process a given message multiple times in a trivial way (yet multiple processing is possible and unavoidable in the general case).

示例:

# mystream下mygroup1分组下的PEL消息1526569498055-0且空闲时长超过1小时,则将其转移给消费者c2
XCLAIM mystream mygroup1 c2 3600000 1526569498055-0

亦可通过XAUTOCLAIM将PEL中指定起始消息ID后的消息批量进行转移,XAUTOCLIAM命令格式如下:

XAUTOCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 起始消息id COUNT 消息数量

示例:

# 扫描mystream下mygroup1分组下的所有PEL消息,空闲时长超过1小时,则最多转移25条消息给消费者c2
XAUTOCLAIM mystream mygroup1 c2 3600000 0-0 COUNT 25
1.2.6 统计命令
# 查询stream下的分组信息
XINFO GROUPS stream名称# 查询stream信息
XINFO STREAM stream名称# 查询stream下指定分组的消费者信息
XINFO CONSUMERS stream名称 group名称

1.3 其他

删除stream中的消息:

XDEL stream名称 id1 id2 …

查询stream中的消息(entry)数量:

XLEN stream名称

压缩stream中的消息数据量:

XTRIM stream名称 MAXLEN 保留的最近消息数量
XTRIM stream名称 MINID 消息ID(小于此ID的消息均会被删除)

二、Redisson Stream

在Redisson中可通过Stream实现Redis Stream,

场景1 相关示例代码如下:

@Test
void testStream() throws InterruptedException {String streamName = "mystream";MyMessage2 myMessage = this.buildMyMessageWithTimestampId();//获取StreamRStream<String, Object> stream = this.redisson.getStream(streamName);//发消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info("stream[{}] add success, id: {}", streamName, entryId);//读消息 - XREAD COUNT 5 BLOCK 5000 STREAMS mystream 0Map<StreamMessageId, Map<String, Object>> entries = stream.read(StreamReadArgs.greaterThan(StreamMessageId.ALL).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) -> {log.info("stream[{}] read message: id={}, entry: {}", streamName, id, entryMap);});//读取区间内消息 - XRANGE mystream 0 entryId COUNT 10entries = stream.range(10, StreamMessageId.ALL, entryId);entries.forEach((id, entryMap) -> {log.info("stream[{}] range message: id={}, entry: {}", streamName, id, entryMap);});
}

场景2 相关示例代码如下:

@Resource
private RedissonClient redisson;@Test
void testStreamGroup() throws InterruptedException {String streamName = "mystream";String groupName = "mygroup1";String consumerName = "c1";MyMessage2 myMessage = this.buildMyMessageWithTimestampId();//获取StreamRStream<String, Object> stream = this.redisson.getStream(streamName);//发消息 - XADD mystream * name 我的消息 age 18StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));log.info("stream[{}] add success, id: {}", streamName, entryId);//查询已存在的分组 - XINFO GROUPS mystreamList<StreamGroup> streamGroups = stream.listGroups();streamGroups.forEach(streamGroup -> {log.info("stream[{}] listGroups groupName: {}", streamName, streamGroup.getName());});Boolean existGroup = streamGroups.stream().anyMatch(group -> groupName.equals(group.getName()));if (!existGroup) {//创建分组 - XGROUP CREATE mygroup1 $stream.createGroup(StreamCreateGroupArgs.name(groupName)//此处id支持:NEWEST即$,ALL即0.id(StreamMessageId.ALL));log.info("stream[{}] createGroup success, groupName: {}", streamName, groupName);}//读分组消息 - XREADGROUP GROUP mygroup1 c1 COUNT 5 BLOCK 5000 STREAMS mystream >Map<StreamMessageId, Map<String, Object>> entries = stream.readGroup(groupName, consumerName,//greaterThan即设置从哪个消息ID之后开始读取,支持:NEVER_DELIVERED即>、ALL即0StreamReadGroupArgs.greaterThan(StreamMessageId.NEVER_DELIVERED).count(5).timeout(Duration.ofSeconds(5)));entries.forEach((id, entryMap) -> {log.info("stream[{}] readGroup groupName: {}, consumerName: {}, message: id={}, entry: {}",streamName, groupName, consumerName, id, entryMap);});//读取PEL中未确认的消息 - XPENDING mystream mygroup1 - + 100 c1Map<StreamMessageId, Map<String, Object>> streamMessageIdMapMap = stream.pendingRange(groupName, consumerName, StreamMessageId.MIN, StreamMessageId.MAX, 100);streamMessageIdMapMap.forEach((id, entryMap) -> {log.info("stream[{}] pendingRange groupName: {}, consumerName: {}, message: id={}, entry: {}",streamName, groupName, consumerName, id, entryMap);//确认消息(从PEL中移除) - XACK mystream mygroup1 1600000000000-0stream.ack(groupName, id);log.info("stream[{}] ack groupName: {}, consumerName: {}, message: id={}",streamName, groupName, consumerName, id);});}

参考:

Redis Stream
https://redis.io/docs/latest/develop/data-types/streams/
https://redis.io/docs/latest/commands/xreadgroup/

Redisson Stream
https://github.com/redisson/redisson/wiki/7.-Distributed-collections#720-stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/364237.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【王佩丰 Excel 基础教程】第一讲:认识Excel

文章目录 前言一、Excel软件简介1.1、历史上的其他数据处理软件与 Microsoft Excel1.2、Microsoft Excel 能做些什么1.3、Excel 界面介绍 二、Microsoft Excel 的一些重要概念2.1、Microsoft Excel 的几种常见文件类型2.2、工作簿、工作表、单元格. 三、使用小工具&#xff1a;…

SpringDataJPA系列(1)JPA概述

SpringDataJPA系列(1)JPA概述 SpringDataJPA似乎越来越流行了&#xff0c;我厂的mysql数据库和MongoDB数据库持久层都依赖了SpringDataJPA。为了更好的使用它&#xff0c;我们内部还对MongoDB的做了进一步的抽象和封装。为了查漏补缺&#xff0c;温故而知新&#xff0c;整理下…

基于自组织长短期记忆神经网络的时间序列预测(MATLAB)

LSTM是为了解决RNN 的梯度消失问题而诞生的特殊循环神经网络。该网络开发了一种异于普通神经元的节点结构&#xff0c;引入了3 个控制门的概念。该节点称为LSTM 单元。LSTM 神经网络避免了梯度消失的情况&#xff0c;能够记忆更长久的历史信息&#xff0c;更能有效地拟合长期时…

【面试系列】数据科学家 高频面试题及详细解答

欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;欢迎订阅相关专栏&#xff1a; ⭐️ 全网最全IT互联网公司面试宝典&#xff1a;收集整理全网各大IT互联网公司技术、项目、HR面试真题. ⭐️ AIGC时代的创新与未来&#xff1a;详细讲解AIGC的概念、核心技术、…

[OtterCTF 2018]Name Game

Name Game 题目描述&#xff1a;我们知道这个帐号登录到了一个名为Lunar-3的频道。账户名是什么&#xff1f;猜想&#xff1a;既然登陆了游戏&#xff0c;我们尝试直接搜索镜像中的字符串 Lunar-3 。 直接搜索 Lunar-3 先把字符串 重定向到 txt文件里面去然后里面查找 Lunar-3…

使用 Spring Boot 3.x 与图形学技术,添加电子印章防伪特征

使用 Spring Boot 3.x 与图形学技术,添加电子印章防伪特征 在电子办公和无纸化办公日益普及的今天,电子印章的使用越来越广泛。然而,如何确保电子印章的安全性和防伪能力成为了一个亟待解决的问题。本文将通过 Spring Boot 3.x 和图形学技术,深入探讨如何为电子印章添加防…

Numpy array和Pytorch tensor的区别

1.Numpy array和Pytorch tensor的区别 笔记来源&#xff1a; 1.Comparison between Pytorch Tensor and Numpy Array 2.numpy.array 4.Tensors for Neural Networks, Clearly Explained!!! 5.What is a Tensor in Machine Learning? 1.1 Numpy Array Numpy array can only h…

IDEA中导入Maven项目

IDEA中导入Maven项目 方式1&#xff1a;使用Maven面板&#xff0c;快速导入项目 打开IDEA&#xff0c;选择右侧Maven面板&#xff0c;点击 号&#xff0c;选中对应项目的pom.xml文件&#xff0c;双击即可 说明&#xff1a;如果没有Maven面板&#xff0c;选择 View > Appe…

C#——SortedList 排序列表详情

SortedList 排序列表 SortedList 类用来表示键/值对的集合&#xff0c;这些键/值对按照键值进行排序&#xff0c;并且可以通过键或索引访问集合中的各个项。 我们可以将排序列表看作是数组和哈希表的组合&#xff0c;其中包含了可以使用键或索引访问各项的列表。如果您使用索…

为什么word生成的PDF内容显示不全?

在现代办公环境中&#xff0c;将文档从一个格式转换为另一个格式是一个常见的任务。然而&#xff0c;有时候我们可能会遇到意想不到的问题&#xff0c;比如使用Word转换成PDF时&#xff0c;生成的PDF文件只显示了整个界面的四分之一内容。这种问题不仅令人困扰&#xff0c;也可…

如何自己录制教学视频?零基础也能上手

随着在线教育的蓬勃发展&#xff0c;录制教学视频成为了教师和教育工作者们不可或缺的一项技能。无论是为了远程教学、课程分享还是知识普及&#xff0c;教学视频的录制都变得愈发重要。可是如何自己录制教学视频呢&#xff1f;本文将介绍两种录制教学视频的方法&#xff0c;这…

pg_rman:备份和恢复管理工具#postgresql培训

pg_rman 是 PostgreSQL 的在线备份和恢复工具。 pg_rman 项目的目标是提供一种与 pg_dump 一样简单的在线备份和 PITR 方法。此外&#xff0c;它还为每个数据库集群维护一个备份目录。用户只需一个命令即可维护包括存档日志在内的旧备份。 #PG培训#PG考试#postgresql考试#pos…

[OtterCTF 2018]Bit 4 Bit

我们已经发现这个恶意软件是一个勒索软件。查找攻击者的比特币地址。** 勒索软件总喜欢把勒索标志丢在显眼的地方&#xff0c;所以搜索桌面的记录 volatility.exe -f .\OtterCTF.vmem --profileWin7SP1x64 filescan | Select-String “Desktop” 0x000000007d660500 2 0 -W-r-…

填报高考志愿时,学校、专业和城市怎么选择呢?

我的观点是&#xff1a; 专业>城市>学校 专业是兴趣导向&#xff0c;符合自己的价值观&#xff0c;失去了这种驱动力的专业学习&#xff0c;会变得非常艰难的&#xff0c;而且没有竞争力&#xff0c;所以我的排序第一位是专业。 其次是城市&#xff0c;最好是一线城市&…

mysql_config 命令, 可以查看mysqlclient库的位置在/usr/lib64/mysql下

好吧&#xff0c;其实我是从这里知道了 -l 后面加的库名和so文件这种名不一样&#xff0c;因为库文件实际叫下面这个名&#xff08;前面有lib)。

基于SSM+Jsp的疫情居家办公OA系统

开发语言&#xff1a;Java框架&#xff1a;ssm技术&#xff1a;JSPJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包…

入门Java爬虫:认识其基本概念和应用方法

Java爬虫初探&#xff1a;了解它的基本概念与用途&#xff0c;需要具体代码示例 随着互联网的快速发展&#xff0c;获取并处理大量的数据成为企业和个人不可或缺的一项任务。而爬虫&#xff08;Web Scraping&#xff09;作为一种自动化的数据获取方法&#xff0c;不仅能够快速…

全网唯一免费无水印AI视频工具!

最近Morph Studio开始免费公测&#xff01;支持高清画质&#xff0c;可以上传语音&#xff0c;同步口型&#xff0c;最重要的是生成的视频没有水印&#xff01; Morph Studio国内就可以访问&#xff0c;可以使用国内邮箱注册&#xff08;我用的163邮箱&#xff09;&#xff0c;…

51单片机STC89C52RC——12.1 数据存储芯片AT24C02

目的/效果 利用存储芯片AT24C02存储数据&#xff0c;LCD1602显示存储的数据。 一&#xff0c;STC单片机模块 二&#xff0c;AT24C02存储芯片 2.1 介绍 AT24C02是一个2K位串行CMOS E2PROM&#xff0c;内部含有256个8位字节&#xff0c;采用先进CMOS技术实质上减少了器件的功…

vue封装原生table表格方法

适用场景&#xff1a;有若干个表格&#xff0c;前面几列格式不一致&#xff0c;但是后面几列格式皆为占一个单元格&#xff0c;所以需要封装表格&#xff0c;表格元素自动根据数据结构生成即可&#xff1b;并且用户可新增列数据。 分类&#xff1a; 固定数据部分 就是根据数据…