使用 Redis Streams 实现高性能消息队列

1. 引言

在后端开发中,消息队列是一个常见的组件,主要用于解耦系统、提高吞吐量以及实现异步处理。常见的消息队列包括 Kafka、RabbitMQ 以及 ActiveMQ,但 Redis Streams 作为 Redis 5.0 引入的新特性,也提供了一种高效、轻量的消息队列解决方案。

本文将深入探讨 Redis Streams 的核心概念,并演示如何在后端服务中使用 Redis Streams 实现一个高性能的消息队列。


2. Redis Streams 基本概念

Redis Streams 是 Redis 提供的流数据结构,允许存储和消费有序的数据流。它的主要特点包括:

  • 持久化存储:不同于 Pub/Sub 仅支持瞬时消息,Streams 支持持久化存储。

  • 消费分组(Consumer Groups):支持多个消费者消费不同的消息,提高并行能力。

  • 自动消息确认(Acknowledgment):支持消费确认机制,保证消息可靠性。

  • 阻塞读取(Blocking Reads):可以使用 XREADXREADGROUP 进行阻塞式消费,提高实时性。

Redis Streams 的数据结构类似于日志系统,每条消息都带有唯一的 ID 及对应的数据字段,如:

XADD mystream * field1 value1 field2 value2

上面的命令将 field1:value1field2:value2 存入 mystream 流中,* 让 Redis 自动生成 ID。


3. Redis Streams 基本操作

3.1 生产者:添加消息到 Stream

在 Redis 中,使用 XADD 命令向 Stream 发送消息,例如:

XADD my_stream * user_id 12345 action "login"

其中,my_stream 是流名称,* 表示自动生成 ID,user_idaction 代表存储的数据。

3.2 消费者:读取 Stream 中的消息

使用 XRANGE 读取 Stream 消息:

XRANGE my_stream - +

-+ 表示从头到尾读取所有消息。

3.3 组消费模式(Consumer Groups)

创建消费组:

XGROUP CREATE my_stream my_group 0 MKSTREAM

添加消费者并读取数据:

XREADGROUP GROUP my_group consumer1 COUNT 10 STREAMS my_stream >

确认消息已被处理:

XACK my_stream my_group 1681956776310-0

删除已确认的消息(减少存储占用):

XDEL my_stream 1681956776310-0

4. Redis Streams 在后端开发中的应用

4.1 场景 1:用户行为日志存储

应用 Redis Streams 记录用户行为,如登录、点击、浏览等,后台可实时分析用户数据:

  • 生产者:前端或业务逻辑向 user_logs 追加用户行为数据。

  • 消费者:后端消费日志,存入数据库或进行实时分析。

4.2 场景 2:任务队列

Redis Streams 适合作为任务队列,将任务推送到 Stream,多个 Worker 并发消费,提高处理能力。

  • 生产者:任务生成器将任务推送到 task_queue

  • 消费者:多个 Worker 消费任务并处理。

  • 优势:相比传统队列,Redis Streams 可回溯未处理的任务,确保任务不会丢失。


5. Redis Streams vs 传统消息队列

特性Redis StreamsKafkaRabbitMQ
消息持久化
消息确认机制
并行消费
去重功能
性能超高

从表中可以看出,Redis Streams 适用于轻量级消息队列需求,如日志收集、任务队列等,而 Kafka 适用于高吞吐量场景。


6. 示例代码:基于 Python 的 Redis Streams 生产者 & 消费者

安装 Redis-Py 依赖

pip install redis

生产者(Producer)

import redisr = redis.Redis(host='localhost', port=6379, decode_responses=True)# 发送消息
def send_message():r.xadd('task_queue', {'task_id': '123', 'action': 'process_data'})print("Message sent!")send_message()

消费者(Consumer)

import redisdef consume_messages():r = redis.Redis(host='localhost', port=6379, decode_responses=True)while True:messages = r.xread({'task_queue': '$'}, count=1, block=1000)for stream, msgs in messages:for msg_id, data in msgs:print(f"Processing {data}")r.xack('task_queue', 'task_group', msg_id)consume_messages()

7. 总结

Redis Streams 作为 Redis 5.0 引入的新功能,在高性能消息队列场景下表现出色。相比 Kafka 和 RabbitMQ,Redis Streams 适用于中小型业务场景,如日志收集、任务队列等,同时具备持久化存储、消费分组及确认机制。

如果你的项目已经使用 Redis,并且有消息队列需求,Redis Streams 可能是一个非常合适的选择。


8. 参考资料

  • Redis 官方文档 - Streams

  • Redis Streams vs Kafka


希望这篇文章能帮你快速掌握 Redis Streams 并在实际项目中应用!🎯

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/13761.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通信易懂唠唠SOME/IP——SOME/IP-SD服务发现阶段和应答行为

一 SOME/IP-SD服务发现阶划分 服务发现应该包含3个阶段 1.1 Initial Wait Phase初始等待阶段 初始等待阶段的作用 初始等待阶段是服务发现过程中的一个阶段。在这个阶段,服务发现模块等待服务实例的相关条件满足,以便继续后续的发现和注册过程。 对…

1. Kubernetes组成及常用命令

Pods(k8s最小操作单元)ReplicaSet & Label(k8s副本集和标签)Deployments(声明式配置)Services(服务)k8s常用命令Kubernetes(简称K8s)是一个开源的容器编排系统,用于自动化应用程序的部署、扩展和管理。自2014年发布以来,K8s迅速成为容器编排领域的行业标准,被…

Vue全流程--Vue2组件的理解第二部分

组件命名规则 好的命名规则可以省去很多不必要的麻烦,这个好习惯还是要养成的 一个单词组成: 第一种写法(首字母小写):school 第二种写法(首字母大写):School 多个单词组成: 第一种写法(kebab-case命名)&#xf…

【OS】AUTOSAR架构下的Interrupt详解(上篇)

目录 前言 正文 1.中断概念分析 1.1 中断处理API 1.2 中断级别 1.3 中断向量表 1.4 二类中断的嵌套 1.4.1概述 1.4.2激活 1.5一类中断 1.5.1一类中断的实现 1.5.2一类中断的嵌套 1.5.3在StartOS之前的1类ISR 1.5.4使用1类中断时的注意事项 1.6中断源的初始化 1.…

红包雨项目前端部分

创建项目 pnpm i -g vue/cli vue create red_pakage pnpm i sass sass-locader -D pnpm i --save normalize.css pnpm i --save-dev postcss-px-to-viewportpnpm i vantlatest-v2 -S pnpm i babel-plugin-import -Dhttps://vant.pro/vant/v2/#/zh-CN/<van-button click&…

深入理解k8s中的容器存储接口(CSI)

CSI出现的原因 K8s原生支持一些存储类型的PV&#xff0c;像iSCSI、NFS等。但这种方式让K8s代码与三方存储厂商代码紧密相连&#xff0c;带来不少麻烦。比如更改存储代码就得更新K8s组件&#xff0c;成本高&#xff1b;存储代码的bug还会影响K8s稳定性&#xff1b;K8s社区维护和…

DeepSeek回答禅宗三重境界重构交易认知

人都是活在各自心境里&#xff0c;有些话通过语言去交流&#xff0c;还是要回归自己心境内在的&#xff0c;而不是靠外在映射到股票和技术方法&#xff1b;比如说明天市场阶段是不修复不接力节点&#xff0c;这就是最高视角看整个市场&#xff0c;还有哪一句话能概括&#xff1…

简单说一下CAP理论和Base理论

CAP理论 什么是CAP 一致性 可用性 分区容错性&#xff1a;系统如果不能再时限内达成数据一致性&#xff0c;就说明发生了分区的情况 然后当前操作在C和A之间做出选择 例如我的网络出现问题了&#xff0c;但是我们的系统不能因为网络问题就直接崩溃 只要我们的分布式系统没…

13.PPT:诺贝尔奖【28】

目录 NO1234 NO567 NO8/9/10 NO11/12 NO1234 设计→变体→字体→自定义字体 SmartArt超链接新增加节 NO567 版式删除图片中的白色背景&#xff1a;选中图片→格式→删除背景→拖拉整个图片→保留更改插入→图表→散点图 &#xff1a;图表图例、网格线、坐标轴和图表标题…

RabbitMQ的安装

1、官网地址 下载地址&#xff1a;Installing RabbitMQ | RabbitMQhttp://www.rabbitmq.com/download.htmlhttp://www.rabbitmq.com/download.html RabbitMQ Documentation | RabbitMQhttps://www.rabbitmq.com/docshttps://www.rabbitmq.com/docs 2、Windows上安装 2.1 安装…

【LeetCode】152、乘积最大子数组

【LeetCode】152、乘积最大子数组 文章目录 一、dp1.1 dp1.2 简化代码 二、多语言解法 一、dp 1.1 dp 从前向后遍历, 当遍历到 nums[i] 时, 有如下三种情况 能得到最大值: 只使用 nums[i], 例如 [0.1, 0.3, 0.2, 100] 则 [100] 是最大值使用 max(nums[0…i-1]) * nums[i], 例…

【分布式理论六】分布式调用(4):服务间的远程调用(RPC)

文章目录 一、RPC 调用过程二、RPC 动态代理&#xff1a;屏蔽远程通讯细节1. 动态代理示例2. 如何将动态代理应用于 RPC 三、RPC 序列化四、RPC 协议编码1. 协议编码的作用2. RPC 协议消息组成 五、RPC 网络传输1. 网络传输流程2. 关键优化点 一、RPC 调用过程 RPC&#xff08…

Spring Task之Cron表达式

&#x1f31f; Spring Task高能预警&#xff1a;你以为的Cron表达式可能都是错的&#xff01;【附实战避坑指南】 开篇暴击&#xff1a;为什么你的定时任务总在凌晨3点翻车&#xff1f; “明明设置了0 0 2 * * ?&#xff0c;为什么任务每天凌晨3点执行&#xff1f;” —— 来…

第16章 Single Thread Execution设计模式(Java高并发编程详解:多线程与系统设计)

简单来说&#xff0c; Single Thread Execution就是采用排他式的操作保证在同一时刻只能有一个线程访问共享资源。 1.机场过安检 1.1非线程安全 先模拟一个非线程安全的安检口类&#xff0c;旅客(线程)分别手持登机牌和身份证接受工作人员的检查&#xff0c;示例代码如所示。…

OSPF基础(2):数据包详解

OSPF数据包(可抓包) OSPF报文直接封装在IP报文中&#xff0c;协议号89 头部数据包内容&#xff1a; 版本(Version):对于OSPFv2&#xff0c;该字段值恒为2(使用在IPV4中)&#xff1b;对于OSPFv3&#xff0c;该字段值恒为3(使用在IPV6中)。类型(Message Type):该OSPF报文的类型。…

MAC 安装mysql全过程记录

4.然后等待下载吧&#xff0c;&#xff08;下载中。。。。&#xff09;&#xff0c;好了&#xff0c;网速的问题&#xff0c;半个小时终于下载好了&#xff0c;开始安装吧。 5.得到如下安装包&#xff0c;mac下也是双击直接下载&#xff0c;来&#xff0c;我们来看看下载的过程…

神经网络常见激活函数 1-sigmoid函数

sigmoid 1 函数求导 sigmoid函数 σ ( x ) 1 1 e ( − x ) \sigma(x) \frac{1}{1e^{(-x)}} σ(x)1e(−x)1​ sigmoid函数求导 d d x σ ( x ) d d x ( 1 1 e − x ) e − x ( 1 e − x ) 2 ( 1 e − x ) − 1 ( 1 e − x ) 2 1 1 e − x − 1 ( 1 e − x ) 2 …

微软发布基于PostgreSQL的开源文档数据库平台DocumentDB

我们很高兴地宣布正式发布DocumentDB——一个开源文档数据库平台&#xff0c;以及基于 vCore、基于 PostgreSQL 构建的 Azure Cosmos DB for MongoDB 的引擎。 过去&#xff0c;NoSQL 数据库提供云专用解决方案&#xff0c;而没有通用的互操作性标准。这导致对可互操作、可移植…

【苍穹外卖 Day1】前后端搭建 Swagger导入接口文档

项目技术选型 前端 直接使用打包好的nginx运行。 后端 1、导入初始代码结构如下&#xff1a; 2、将代码上传远程仓库。 3、创建数据库&#xff0c;并修改数据库配置。 4、断点调试&#xff0c;前后端联调。 5、使用Nginx代理&#xff0c;修改Nginx配置 好处&#xff1a;提…

零基础Vue入门6——Vue router

本节重点&#xff1a; 路由定义路由跳转 前面几节学习的都是单页面的功能&#xff08;都在专栏里面https://blog.csdn.net/zhanggongzichu/category_12883540.html&#xff09;&#xff0c;涉及到项目研发都是有很多页面的&#xff0c;这里就需要用到路由&#xff08;vue route…