Redis队列Stream

1 缘起

项目中处理文件的场景:
将文件处理请求放入队列,
一方面,缓解服务器文件处理压力;
另一方面,可以根据文件大小拆分到不同的队列,提高文件处理效率。
这是Java开发组Leader佳汇提出的文件处理方案,非常实用。
从他那学习到之后,开始搜集Redis Stream相关的知识,整理成文,帮助开发者轻松应对知识交流和考核。

2 Redis Stream

Redis Stream是Redis 5.0.0版本新增的数据结构,想使用Stream需要Redis的最低版本是5.0
Stream是一个高性能、高可靠的消息队列,用于异步消息处理,就是传统的队列功能,完成流量削峰。Redis 5.0之前的版本就有提供队列功能,如列表、有序集合和Pub/Sub均可实现队列功能。既然Redis已经有了队列功能,为什么还要Stream这个数据结构呢?
按照正常的思考过程,新事物的出现,一般是为了解决旧事物的问题,或者,为了防止垄断,当然, 技术圈也遵循这个理论。

2.1 解决的问题

Stream的出现是为了解决原先队列存在的问题:
(1)Pub/Sub模式无法持久化消息,如果Redis网络异常或者宕机,消息会丢失;
(2)列表和有序集合的方式支持消息持久化,但是,不支持消息多播和分组消费。
Redis Stream一一解决了上面的问题,实现了消息持久化、消息多播和分组消费
Redis Stream既然是队列应用场景和其他队列一样:
(1)异步解耦;
(2)流量削峰;
不过Redis是基于内存的,如果是大量的消息数据建议选择其他消息队列,如RabbitMQ、Kafka、RocketMQ等。

2.2 架构

先来看一下Stream的总体架构:
在这里插入图片描述
Redis Stream有生产者、消费者和消费组,其中,
(1)消费组:有多个消费者,消费者之间是竞争关系,消费组中有一个last_delivered_id,消费组中的任意一消费者消费了消息,都会使last_delivered_id移动;
(2)消费者:消费者消费消息后,会产生pending_id,即消费者的状态变量,当消费者消费消息后,使用pending_ids记录被消费的消息,当客户端没有进行消费确认(ACK)时,pending_ids中的数据会一直增加,当客户端进行消息确认(ACK)后, 会移除pending_id。Redis官方称pending_ids为PEL(Pending Entries List),用于确保客户端至少消费一次消息,而不会在网络传输中丢失了处理。

2.3 数据结构

先从源码简单看下Stream相关的数据结构:

/* Stream item ID: a 128 bit number composed of a milliseconds time and* a sequence counter. IDs generated in the same millisecond (or in a past* millisecond if the clock jumped backward) will use the millisecond time* of the latest generated ID and an incremented sequence. */
typedef struct streamID {uint64_t ms;        /* Unix time in milliseconds. */uint64_t seq;       /* Sequence number. */
} streamID;typedef struct stream {rax *rax;               /* The radix tree holding the stream. */uint64_t length;        /* Current number of elements inside this stream. */streamID last_id;       /* Zero if there are yet no items. */streamID first_id;      /* The first non-tombstone entry, zero if empty. */streamID max_deleted_entry_id;  /* The maximal ID that was deleted. */uint64_t entries_added; /* All time count of elements added. */rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
} stream;

由源码知,stream由Radix树和streamID类型的数据构成,
其中,streamID有两部分组成,ms和seq,ms即毫秒(10位),seq即序列号,
Stream中的每一条消息使用:{毫秒}-{序列号}唯一标识。

3 基础操作

3.1 新建数据:XADD

格式:

XADD key ID field value [field value ...]

参数:

XADD mystream-test * name xiaoyi age 10
XADD mystream-test * name xiaoer age 11

在这里插入图片描述

3.2 查询数据:XRANGE

格式:

XRANGE key start end [COUNT count]

参数:

参数描述
key队列名称
start起始ID标识
end结束ID标识
COUNT查询的条数

3.2.1 查询所有数据

XRANGE mystream-test - +

参数:
-:第一条数据
+:最后一条数据
使用- + 表示拆寻所有数据。
在这里插入图片描述

3.2.2 查询指定条数

XRANGE mystream-test - + COUNT 1

在这里插入图片描述

3.2.3 查询指定范围数据

XRANGE mystream-test 1697335440000-0 1697359922197-0

在这里插入图片描述

3.3 读取数据

格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

参数:

参数描述
COUNT返回的条数
BLOCK用于设置XREAD为阻塞模式,单位毫秒,默认为非阻塞模式。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。如果在这个时间内没有新的数据流入,那么输出(nil) (1.05s)

注:使用Block模式,配合 作为 I D ,表示读取最新的消息(在非阻塞模式 作为ID,表示读取最新的消息(在非阻塞模式 作为ID,表示读取最新的消息(在非阻塞模式无意义),若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。

3.3.1 直接读取

XREAD STREAMS mystream-test 0

在这里插入图片描述

3.3.2 阻塞读取

XREAD BLOCK 4000 STRRAMS mystream-test $

在这里插入图片描述

3.3.3 非阻塞读取

XREAD STREAMS mystream-test 0

在这里插入图片描述

3.4 删除数据

格式:

XDEL key ID [ID ...]

参数:

参数描述
key队列名称
ID数据ID
XDEL mystream-test 1697376922916-0

在这里插入图片描述

3.5 消费组

3.5.1 创建消费组:XGROUP

格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

参数:

参数描述
CREATE创建消费组
key队列名称
groupname消费组名称
id接收指定ID之后的消息
$接收所有的消息
参数描述
DESTROY删除消费组
key队列名称
groupname消费组名称
参数描述
DELCONSUMER删除消费组中的消费者
key队列名称
groupname消费组组名称
consumername消费者名称
# 创建接收最新消息的消费组
XGROUP CREATE mystream-test mygroup-1 $
# 创建接收所有消息的消费组
XGROUP CREATE mystream-test mygroup-2 0

在这里插入图片描述

3.5.2 删除消费组

# 删除消费组
XGROUP DESTROY mystream-test mygroup-1
XGROUP DELCONSUMER mystream-test mygroup-2

3.5.3 消费组消费消息:XREADGROUP

格式:

 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
参数描述你
group消费组名称。
consumer消费者名称。
count要读取的数量。
milliseconds阻塞时间,以毫秒为单位。
key键指定的队列名称。
ID表示消息 ID。
XREADGROUP GROUP mygroup-1 myconsumer-1 COUNT 1 BLOCK 100000 STREAMS mystream-test >

在这里插入图片描述

3.6 查看等待确认状态:XPENDING

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

在这里插入图片描述

3.7 消费信息确认:XACK

格式:

XACK key group ID [ID ...]

参数:

参数描述
key队列名称
group消费组名称
ID消息ID
XACK mystream-test mygroup-1 1698558137966-0

在这里插入图片描述

3.8 查询信息:XINFO

格式:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

参数:
查询消费者信息

参数名称
CONSUMERS查询消费者名称
key消费者名称
groupname
查询消费组信息
参数名称
GROUPS查询消费组信息
key消费组名称
查询队列信息
参数名称
STREAM查询队列信息
key队列名称

3.8.1 查询队列信息

XINFO STREAM mystream-test

在这里插入图片描述

3.8.3 查询队列中的消费组

XINFO GROUPS mystream-test

在这里插入图片描述

3.8.4 查询队列消费组中的消费者

XINFO CONSUMERS mystream-test mygroup-1

在这里插入图片描述

4 小结

Stream的出现是为了解决原先队列存在的问题:
(1)Pub/Sub模式无法持久化消息,如果Redis网络异常或者宕机,消息会丢失;
(2)列表和有序集合的方式支持消息持久化,但是,不支持消息多播和分组消费。
Redis Stream一一解决了上面的问题,实现了消息持久化、消息多播和分组消费
Redis Stream既然是队列应用场景和其他队列一样:
(1)异步解耦;
(2)流量削峰;
不过Redis是基于内存的,如果是大量的消息数据建议选择其他消息队列,如RabbitMQ、Kafka、RocketMQ等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/174636.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

hdlbits系列verilog解答(8位宽移位寄存器)-24

文章目录 一、问题描述二、verilog源码三、仿真结果一、问题描述 这项练习是module_shift移位寄存器的延伸。模块端口不是只有单个引脚,我们现在有以向量作为端口的模块,您将在其上附加线向量而不是普通线网数据。与 Verilog 中的其他位置一样,端口的向量长度不必与连接到它…

Go语言用Resty库编写的音频爬虫代码

目录 一、Go语言与Resty库简介 二、音频爬虫的实现 1、确定抓取目标 2、使用Resty发送HTTP请求 3、解析响应数据 4、下载音频文件 5、并发下载音频文件 三、注意事项 总结 随着互联网的飞速发展,网络爬虫逐渐成为数据获取和分析的重要工具。在音频领域&…

[NSSCTF 2nd] web刷题记录

文章目录 php签到MyBox非预期解预期解 php签到 源代码 <?phpfunction waf($filename){$black_list array("ph", "htaccess", "ini");$ext pathinfo($filename, PATHINFO_EXTENSION);foreach ($black_list as $value) {if (stristr($ext, …

【计算机网络】什么是HTTPS?HTTPS为什么是安全的?

【面试经典题】 前言&#xff1a; HTTP最初的设计就是用于数据的共享和传输&#xff0c;并没有考虑到数据的安全性&#xff0c;如窃听风险&#xff0c;篡改风险和冒充风险。HTTPS是在 HTTP 的基础上引入了一个加密层。HTTPS通过数据加密&#xff0c;数据完整性检验和身份认证…

BUUCTF_练[CISCN2019 华北赛区 Day1 Web5]CyberPunk

[CISCN2019 华北赛区 Day1 Web5]CyberPunk 文章目录 [CISCN2019 华北赛区 Day1 Web5]CyberPunk掌握知识解题思路代码分析paylaod的构建正式解题 关键paylaod 掌握知识 ​ php伪协议读取文件&#xff1b;源码泄露hint &#xff1b;代码审计 发现二次注入点&#xff1b;SQL语句的…

【Unity小技巧】可靠的相机抖动及如何同时处理多个震动(附项目源码)

文章目录 每篇一句前言安装虚拟相机虚拟相机震动测试代码控制震动清除震动控制震动的幅度和时间 两个不同的强弱震动同时发生源码完结 每篇一句 围在城里的人想逃出来&#xff0c;站在城外的人想冲进去&#xff0c;婚姻也罢&#xff0c;事业也罢&#xff0c;人生的欲望大都如此…

从 malloc 分配大块内存失败 来简看 linux 内存管理

文章目录 背景Glibc MallocMalloc 分配大块内存失败原因Overcommit_memory 实现OOM (Out Of Memory) 的实现 背景 应用进程 malloc 返回了null&#xff0c;但是观察到的os 的free内存还有较大的余量 &#xff0c;很奇怪为什么会这样&#xff1f; 不可能是oom导致的&#xff0…

【2023Mathorcup大数据】B题 电商零售商家需求预测及库存优化问题 python代码解析

【2023Mathorcup大数据】B题 电商零售商家需求预测及库存优化问题 python代码解析 1 题目 2023 年MathorCup 高校数学建模挑战赛——大数据竞赛赛道B&#xff1a;电商零售商家需求预测及库存优化问题电商平台存在着上千个商家&#xff0c;他们会将商品货物放在电商配套的仓库…

企业金蝶KIS软件服务器中了locked勒索病毒怎么办,勒索病毒解密

最近一段时间&#xff0c;网络上的locked勒索病毒又开始了新一波的攻击&#xff0c;给企业的正常生产生活带来了严重影响。经过最近一段时间云天数据恢复中心对locked勒索病毒的解密&#xff0c;为大家整理了以下有关locked勒索病毒的相关信息。近期locked勒索病毒主要攻击金蝶…

小白如何在一个月写一篇论文(中文核心,SCI)

小白如何半年发3篇sci的我教你如何快速“水”一篇sci论文_哔哩哔哩_bilibili 计算机视觉&#xff0c;cv领域 半年发3篇sci的我教你如何快速“水”一篇sci论文 计算机视觉(辅导 SCI EI 核心) 微信&#xff1a;whbwqq123或主页加up 小白如何快速写出一篇论文并成功发表&…

83.每日一练:搜索插入位置(力扣第35题)

问题描述 代码解决以及思想 class Solution { public:int searchInsert(vector<int>& nums, int target) {int left 0; // 定义左边界int right nums.size() - 1; // 定义右边界while (left < right) { // 当左边界小于…

什么是鉴权?一篇文章带你了解postman的多种方式

一、什么是鉴权&#xff1f; 鉴权也就是身份认证&#xff0c;就是验证您是否有权限从服务器访问或操作相关数据。发送请求时&#xff0c;通常必须包含相应的检验参数以确保请求具有访问权限并返回所需数据。通俗的讲就是一个门禁&#xff0c;您想要进入室内&#xff0c;必须通…

用友 GRP-U8 存在sql注入漏洞复现

0x01 漏洞介绍 用友 GRP-U8 license_check.jsp 存在sql注入&#xff0c;攻击者可利用该漏洞执行任意SQL语句&#xff0c;如查询数据、下载数据、写入webshell、执行系统命令以及绕过登录限制等。 fofa&#xff1a;app”用友-GRP-U8” 0x02 POC: /u8qx/license_check.jsp?kj…

基于【逻辑回归】的评分卡模型金融借贷风控项目实战

背景知识&#xff1a; 在银行借贷过程中&#xff0c;评分卡是一种以分数形式来衡量一个客户的信用风险大小的手段。今天我们来复现一个评分A卡的模型。完整的模型开发所需流程包括&#xff1a;获取数据&#xff0c;数据清洗和特征工程&#xff0c;模型开发&#xff0c…

【微服务开篇-RestTemplate服务调用、Eureka注册中心、Nacos注册中心】

本篇用到的资料&#xff1a;https://gitee.com/Allengan/cloud-demo.githttps://gitee.com/Allengan/cloud-demo.git 目录 1.认识微服务 1.1.单体架构 1.2.分布式架构 1.3.微服务 1.4.SpringCloud 1.5.总结 2.服务拆分和远程调用 2.1.服务拆分原则 2.2.服务拆分示例 …

Composition API的引入

目录 全局API的移除和替代 插件的改进 TypeScript支持的增强 优势 劣势 总结 Vue.js 3.x版本引入了Composition API&#xff0c;这是一个全新的API风格&#xff0c;旨在提高代码的可读性和重用性。Composition API使我们可以根据逻辑相关性组织代码&#xff0c;而不是按照…

Typora(morkdown编辑器)的安装包和安装教程

Typora&#xff08;morkdown编辑器&#xff09;的安装包和安装教程 下载安装1、覆盖文件2、输入序列号①打开 typora &#xff0c;点击“输入序列号”&#xff1a;②邮箱一栏中任意填写&#xff08;但须保证邮箱地址格式正确&#xff09;&#xff0c;输入序列号&#xff0c;点击…

从0到1之微信小程序快速入门(03)

目录 什么是生命周期函数 WXS脚本 ​编辑 与 JavaScript 不同 纯数据字段 组件生命周期 定义生命周期方法 代码示例 组件所在页面的生命周期 代码示例 插槽 什么是插槽 启用多插槽 ​编辑 定义多插槽 组件通信 组件间通信 监听事件 触发事件 获取组件实例 自…

实现接口自动化测试

最近接到一个接口自动化测试的case&#xff0c;并展开了一些调研工作&#xff0c;最后发现&#xff0c;使用pytest测试框架并以数据驱动的方式执行测试用例&#xff0c;可以很好的实现自动化测试。这种方式最大的优点在于后续进行用例维护的时候对已有的测试脚本影响很小。当然…

【MySQL】C语言连接数据库

文章目录 一、安装 MySQL 库二、MySQL C API 相关接口1、C API 官方文档2、初始化 MYSQL3、连接 MySQL4、下发 mysql 指令5、获取 mysql 查询结果6、释放 MYSQL_RES 对象7、关闭 MySQL 连接8、MySQL 其他操作9、总结 三、使用图形化工具连接 MySQL 一、安装 MySQL 库 我们之前…