Redis实战(5)——Redis实现消息队列

消息队列,顾名思义,就是一个存放消息的队列。最简单的消息队列包含3个角色

  • 生产者:将消息存入队列中
  • 队列:存放和管理消息
  • 消费者: 将消息从队列中取出来并做业务处理
    在这里插入图片描述
    R e d i s 提供了三种实现消息队列的方式,基于 L i s t 结构、 P u b S u b 、 S t r e a m 结构 \textcolor{red}{Redis 提供了三种实现消息队列的方式,基于List结构、PubSub、Stream结构} Redis提供了三种实现消息队列的方式,基于List结构、PubSubStream结构

1 基于List 结构实现消息队列

Redis的List是一个双向列表。可以从两端存入数据或者取出数据。

  • LPUSH/RPUSH key element 【elements】
  • BLPOP/RPOP key timeout

利用list 结构实现的消息队列主要是依据阻塞取指令 BLPOP/RPOP 来模拟消费者监听队列,直到队列中有消失时获得该数据
优点: 实现简单,且可以持久化
缺点: 只能有一个消费者来消费数据,且只能消费一次,无法避免消息的丢失

2 基于PubSub(发布/订阅)

PubSub 是一个基于点对点的消息模型,消费者可以订阅一个或者多个chanel,当生产者向队列发送了消息时,消费者只要订阅了频道就可以收到并处理消息
在这里插入图片描述

  • PUBLISH channel message 将信息 message 发送到指定的频道 channel
  • SUBSCRIBE channel [channel …] 订阅一个或多个频道
  • PSUBSCRIBE pattren 订阅与通配符匹配的chanel

在使用PSUBSCRIBE pattren 时,支持多种通配符
1 \textcolor{blue}{1} 1 ?:匹配一个字符
2 \textcolor{blue}{2} 2 * :匹配零个字符或多个字符
3 \textcolor{blue}{3} 3 [] :选择匹配,匹配[]中定义的字符 如hell[ae]o 可以匹配 hello 和 hellao

使用PubSub 实现的消费队列时,支持 多生产、多消费 \textcolor{red}{多生产、多消费} 多生产、多消费的模式,不过PubSub不支持数据的持久化,相较于List,它本身就不是一个数据结构无法利用Redis持久化数据。并且无法避免消息的丢失,如生产者向无人订阅的频道发消息时,数据会丢失。另外还会出现由于消费者的缓存空间有效,超时缓存上限时,将会出现消息的丢失。由于这些缺点,redis的PUBSUB模式,无法满足对可靠性要求较高的服务。

3 基于Stream 数据结构

Stream 是redis5.0 及之后针对消息队列场景设计的 数据结构 \textcolor{red}{数据结构} 数据结构,因此数据的安全性得到了保障,因为可以持久化。相较于List 数据结构实现的消息队列的方式,有更多针对消息队列的单独命令,可以实现一个功能更加完善的消息队列

发送消息

  • XADD k e y \textcolor{red}{key} key [ N O M K S T R E A M ] \textcolor{blue}{[NOMKSTREAM] } [NOMKSTREAM] [ < M A X L E N ∣ M I N I D > [ = ∣ ] t h r e s h o l d [ L I M I T c o u n t ] ] \textcolor{green}{[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]} [<MAXLENMINID>[= ]threshold[LIMITcount]] < ∗ ∣ i d > \textcolor{orange}{<* | id>} <id> F i e l d v a l u e [ F i e l d v a l u e . . . ] \textcolor{purple}{Field value [Field value ...]} Fieldvalue[Fieldvalue...]

参数说明
K e y \textcolor{red}{Key} Key : 存储消息的队列的名字
[ N O M K S T R E A M ] \textcolor{blue}{[NOMKSTREAM] } [NOMKSTREAM] :可选参数,是否在队列不存在时,创建队列。默认是创建的
[ < M A X L E N ∣ M I N I D > [ = ∣ ] t h r e s h o l d [ L I M I T c o u n t ] ] \textcolor{green}{[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]} [<MAXLENMINID>[= ]threshold[LIMITcount]] :可选参数,设置消息队列的最大消息数,默认是设上限的
< ∗ ∣ i d > \textcolor{orange}{<* | id>} <id> :消息的唯一id,* 表示有redis自动生成,格式是时间戳_递增值 如 1526919030474-0。Id值也可以自定义。
F i e l d v a l u e [ F i e l d v a l u e . . . ] \textcolor{purple}{Field value [Field value ...]} Fieldvalue[Fieldvalue...] 消息体

读取消息

  • XREAD [ C O U N T c o u n t ] \textcolor{red}{[COUNT count] } [COUNTcount] [ B L O C K m i l l i s e c o n d s ] \textcolor{blue}{[BLOCK milliseconds]} [BLOCKmilliseconds] S T R E A M S k e y [ k e y . . . ] \textcolor{green}{STREAMS key [key ...] } STREAMSkey[key...] i d [ i d . . . ] \textcolor{orange}{id [id ...]} id[id...]

参数说明
[ C O U N T c o u n t ] \textcolor{red}{[COUNT count] } [COUNTcount] 可选参数, 指定读取消息的条数
[ B L O C K m i l l i s e c o n d s ] \textcolor{blue}{[BLOCK milliseconds]} [BLOCKmilliseconds] 当没有消息时,读取队列消息的阻塞时长,当设置为0时,永久等待,直到读取到队列中消息
S T R E A M S k e y [ k e y . . . ] \textcolor{green}{STREAMS key [key ...] } STREAMSkey[key...] 需要读取的队列的key名字,可以从多个队列中读取数据
i d [ i d . . . ] \textcolor{orange}{id [id ...]} id[id...] 读取消息的起始Id 。有两个特殊的id,0 表示从第一个消息读起,$ 表示读取最新的一条消息

在读取消息时,可以通过while(true) 循环 调用xread block 0 streams key $ 去永久的监听队列去获得消息。不过这种模式下会出现一个问题,在获得消息并处理消息这个时间间隙中,可能生产者又往队列中增加了好几条消息,由于Id 为$ 只会读取最新的一条消息,那么可能会出现消息的漏读。这里可以采用基于消费者组去读取消息

3.1 基于消费者组去消费消息

可以将多个消费者划分到一个组中,其中每个组消费消息时都会维护一个最后消费消息的标识 L a s t d e l i v e r e d i d \textcolor{red}{Last delivered id} Lastdeliveredid,当宕机重启后,直接从该标识id之后的消息消费。意味者不会重复消费消息。
在消费者组中还维护了一个 Pending_ids集合,该集合中存放了未确认【ACK】消费数据的消息Id,
机器出现宕机后重启,可继续确认未处理的消息。可以通过 X A C K \textcolor{red}{XACK} XACK来确认客户端确认已经消费了消息,之后从Pending_ids集合中移除。

基于消费者组消费消息时,最大程度的保证了消息的安全消费、不重复消费。
在这里插入图片描述

创建消费者组
XGROUP C R E A T E \textcolor{red}{CREATE} CREATE K E Y \textcolor{green}{KEY } KEY G R O U P N A M E \textcolor{blue}{GROUPNAME } GROUPNAME I D \textcolor{orange}{ID} ID [ M K S T R E A M ] \textcolor{purple}{ [MKSTREAM]} [MKSTREAM]

C R E A T E \textcolor{red}{CREATE} CREATE :创建组
K E Y \textcolor{green}{KEY } KEY :基于哪个队列去创建组
G R O U P N A M E \textcolor{blue}{GROUPNAME } GROUPNAME :创建的消费者组名称
I D \textcolor{orange}{ID} ID 消息的标识id。0 从头消费 $ 消费最新的消息
[ M K S T R E A M ] \textcolor{purple}{ [MKSTREAM]} [MKSTREAM] : 可选参数,当队列不存在时,是否创建队列,默认是创建

从消费者组中消费消息

XGROUPREAD GROUP g r o u p \textcolor{red}{group } group c o n s u m e r \textcolor{green}{consumer } consumer [ C O U N T c o u n t ] \textcolor{blue}{[COUNT count] } [COUNTcount] [ B l o c k m i l l i s e c o n d s ] \textcolor{orange}{[Block milliseconds] } [Blockmilliseconds] [ N O A C K ] \textcolor{purple}{ [NOACK] } [NOACK] S T R E A M S K E Y [ k e y . . . ] \textcolor{red}{STREAMS KEY [key ...]} STREAMSKEY[key...] I D [ I D . . . . . ] \textcolor{green}{ ID [ID.....] } ID[ID.....]

g r o u p \textcolor{red}{group } group : 组的名字,定义从哪个消费者组消费消息
c o n s u m e r \textcolor{green}{consumer } consumer :消费者名字,如果不存在,自动创建
[ C O U N T c o u n t ] \textcolor{blue}{[COUNT count] } [COUNTcount] :消费数量
[ B l o c k m i l l i s e c o n d s ] \textcolor{orange}{[Block milliseconds] } [Blockmilliseconds] :可选参数,阻塞时长【单位ms】,不设置时为非阻塞消费。
[ N O A C K ] \textcolor{purple}{ [NOACK] } [NOACK] :可选参数,是否自动确认。true时消息不会进入pending_ids[] 集合中,可能会有未消费的消息。所以为了安全性,无需设置。
S T R E A M S K E Y [ k e y . . . ] \textcolor{red}{STREAMS KEY [key ...]} STREAMSKEY[key...] : 监听的队列的名字
I D [ I D . . . . . ] \textcolor{green}{ ID [ID.....] } ID[ID.....] :获得消息的起始ID 。
设置成 ">" :从下一个未消费的消息开始消费。
设置成其他:均是从pending-list中获得已消费但是未确认的消息,如0 ,从pending-list中第一个消息开始。
根据实际情况可设置不同的ID 去消费消息。正常读取设置> 异常读取未确认的消息

确认消息
XACK k e y \textcolor{red}{key } key g r o u p \textcolor{green}{group } group I D [ I D . . . ] \textcolor{blue}{ ID [ID...] } ID[ID...]

k e y \textcolor{red}{key } key :队列名称
g r o u p \textcolor{green}{group } group :组名称
I D [ I D . . . ] \textcolor{blue}{ ID [ID...] } ID[ID...] :待确认的消息Id

3.2 数据测试

(1) 向order队列中添加4条消息

xadd order * voucherId 9 userId 150 orderId 79297921056506055
xadd order * voucherId 9 userId 129 orderId 79297921056506083
xadd order * voucherId 9 userId 111 orderId 79297921056506108
xadd order * voucherId 9 userId 111 orderId 79297921056506101

在这里插入图片描述
(2) 向order队列创建消费者组group_1

## 消费者组从头开始消费数据
XGROUP CREATE order  group_1 0 

(3) 从消费者组中消费消息

## 消费最新的未消费的消息,采用阻塞式获取,最长等待2000ms
XREADGROUP GROUP group_1 consumer_1 COUNT 1 BLOCK 2000 STRAEAMS order >

在这里插入图片描述
第五次消费时,阻塞等待后返回空。队列中的消息全部消费,此时都处于为确认状态,全部存入了penging-list中。
此时需要手动确认这些消息确实已经被成功的消费了,需要手动确认将其从pending-list 集合中移除

(4) 手动确认已经消费的消息

 XACK order group_1 1691146911471-0  1691148054821-0 1691148657217-0  1691202770386-0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/79981.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Day10-NodeJS和NPM配置

Day10-NodeJS和NPM 一 Nodejs 1 简介 Nodejs学习中文网:https://www.nodeapp.cn/synopsis.html Nodejs的官网:https://nodejs.org/ 概念:Nodejs是JavaScript的服务端运行环境.Nodejs不是框架,也不是编程语言,就是一个运行环境. Nodejs是基于chrome V8引擎开发的一套js代码…

【Redis】——RDB快照

Redis 是内存数据库&#xff0c;但是它为数据的持久化提供了两个技术&#xff0c;一个是AOF日志&#xff0c;另一个是RDB快照&#xff1a; AOF 文件的内容是操作命令&#xff1b;RDB 文件的内容是二进制数据。 RDB 快照就是记录某一个瞬间的内存数据&#xff0c;记录的是实际…

【云原生】kubectl命令的详解

目录 一、陈述式资源管理方式1.1基本查看命令查看版本信息查看资源对象简写查看集群信息配置kubectl自动补全node节点查看日志 1.3基本信息查看查看 master 节点状态查看命名空间查看default命名空间的所有资源创建命名空间app删除命名空间app在命名空间kube-public 创建副本控…

Spring 事务失效的八种场景

1. 抛出检查异常导致事务不能正确回滚 Service public class Service1 {Autowiredprivate AccountMapper accountMapper;Transactionalpublic void transfer(int from, int to, int amount) throws FileNotFoundException {int fromBalance accountMapper.findBalanceBy(from…

LeetCode每日一题——1331.数组序号转换

题目传送门 题目描述 给你一个整数数组 arr &#xff0c;请你将数组中的每个元素替换为它们排序后的序号。 序号代表了一个元素有多大。序号编号的规则如下&#xff1a; 序号从 1 开始编号。一个元素越大&#xff0c;那么序号越大。如果两个元素相等&#xff0c;那么它们的…

【Python】模块学习之locust性能测试

目录 背景 安装 测试代码 运行命令 资料获取方法 背景 locust是一个python的第三方库&#xff0c;用于做性能测试&#xff0c;可使用多台机器同时对一台服务器进行压测&#xff0c;使用其中一台机器作为主节点&#xff0c;进行分布式管理 博主测试接口的时候一直是使用p…

【使用 DSP 滤波器加速速度和位移】使用信号处理算法过滤加速度数据并将其转换为速度和位移研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

C++入门之stl六大组件--List源码深度剖析及模拟实现

文章目录 前言 一、List源码阅读 二、List常用接口模拟实现 1.定义一个list节点 2.实现一个迭代器 2.2const迭代器 3.定义一个链表&#xff0c;以及实现链表的常用接口 三、List和Vector 总结 前言 本文中出现的模拟实现经过本地vs测试无误&#xff0c;文件已上传gite…

FPGA优质开源模块 - SRIO

本文介绍一个FPGA常用模块&#xff1a;SRIO&#xff08;Serial RapidIO&#xff09;。SRIO协议是一种高速串行通信协议&#xff0c;在我参与的项目中主要是用于FPGA和DSP之间的高速通信。有关SRIO协议的详细介绍网上有很多&#xff0c;本文主要简单介绍一下SRIO IP核的使用和本…

【Shell】基础语法(二)

文章目录 一、Shell基本语法文件名代换命令代换算术代换转义字符引号 二、Shell脚本语法条件测试分支结构循环 三、总结 一、Shell基本语法 文件名代换 用于匹配的字符称为通配符&#xff08;Wildcard&#xff09;&#xff0c;如&#xff1a;* ? [ ] 具体如下&#xff1a; *…

mysql 数据库引擎介绍

一、数据库引擎 数据库引擎是用于存储、处理和保护数据的核心服务。利用数据库引擎可控制访问权限并快速处理事务&#xff0c;从而满足企业内大多数需要处理大量数据的应用程序的要求。 使用数据库引擎创建用于联机事务处理或联机分析处理数据的关系数据库。这包括创建用于存储…

分布式事务

事务是用户定义的一系列的数据库操作&#xff0c;这些操作可以视为一个完整的逻辑处理工作单元&#xff0c;要么全部成功&#xff08;全部执行&#xff09;&#xff0c;要么全部失败&#xff08;全都不执行&#xff09;&#xff0c;是不可分割的工作单元 分布式事务是指会涉及…

[NOIP2007 普及组] 纪念品分组

[NOIP2007 普及组] 纪念品分组 题目描述 元旦快到了&#xff0c;校学生会让乐乐负责新年晚会的纪念品发放工作。为使得参加晚会的同学所获得 的纪念品价值相对均衡&#xff0c;他要把购来的纪念品根据价格进行分组&#xff0c;但每组最多只能包括两件纪念品&#xff0c; 并且…

17、YML配置文件及让springboot启动时加载我们自定义的yml配置文件的几种方式

YML配置文件及加载自定义配置文件的几种方式 ★ YAML配置文件 其实本质和.properties文件的是一样的。 Spring Boot默认使用SnakeYml工具来处理YAML配置文件&#xff0c;SnakeYml工具默认就会被spring-boot-starter导入&#xff0c;因此无需开发者做任何额外配置。 YAML本质…

ip网络广播系统网络音频解码终端公共广播SV-7101

SV-7101V网络音频终端产品简介 网络广播终端SV-7101V&#xff0c;接收网络音频流&#xff0c;实时解码播放。本设备只有网络广播功能&#xff0c;是一款简单的网络广播终端。提供一路线路输出接功放或有源音箱。 产品特点 ■ 提供固件网络远程升级■ 标准RJ45网络接口&…

安装Win10操作系统时找不到任何驱动器的解决方法

安装Win10操作系统时找不到任何驱动器的解决方法 有时候在一台新电脑上使用U盘安装系统时提示&#xff1a;我们找不到任何驱动器。 如下图所示&#xff1a; 解决方法&#xff1a; 一、按F12&#xff08;不同电脑进入Bios的按键可能不同&#xff09;将电脑进入Bios画面&#x…

MySQL 的事件调度器

MySQL 的事件调度器可以通过以下方式进行管理&#xff1a; 1】查看事件调度器的状态 SHOW VARIABLES LIKE event_scheduler;2】启用/禁用事件调度器 SET GLOBAL event_scheduler ON;SET GLOBAL event_scheduler OFF; 注意&#xff1a;启用/禁用事件调度器需要具有 SUPE…

处理nacos、tomcat、nginx日志增长过快问题

1.nacos日志清理 修改nacos-logback.xml 将日志级别改为error级&#xff0c;减少info级日志产生量 将<maxHistory>调整为2以下&#xff0c;将 <totalSizeCap>调整为2GB左右 比如&#xff1a; [rootiZ0jlapur4hqjezy8waee0Z logs]# ll -h total 2.1G -rw-r--r-…

【Spring Boot】Thymeleaf模板引擎 — 表达式的语法

表达式的语法 模板的主要作用是将后台返回的数据渲染到HTML中。那么Thymeleaf是如何解析后台数据的呢&#xff1f;接下来从变量、方法、条件判断、循环、运算&#xff08;逻辑运算、布尔运算、比较运算、条件运算&#xff09;方面学习Thymeleaf表达式支持的语法。 1.赋值和拼…

RPC框架引入zookeeper服务注册与服务发现

Zookeeper概念及其作用 ZooKeeper是一个分布式的&#xff0c;开放源码的分布式应用程序协调服务&#xff0c;是Google的Chubby一个开源的实现&#xff0c;是大数据生态中的重要组件。它是集群的管理者&#xff0c;监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理…