C# redis通过stream实现消息队列以及ack机制

redis实现

查看redis版本

redis需要>5.0
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。

它实现了大部分消息队列的功能:

  • 消息 ID 系列化生成;
  • 消息遍历;
  • 消息的阻塞和非阻塞读;
  • Consumer Groups 消费组;
  • ACK 确认机制。
  • 支持多播。

本次主要实现基本的消息发送接受确认,消费组有需要的可以看参考的文章

info

在这里插入图片描述

插入消息

XADD streamName id field value [field value ...]
# 消息队列名称后面的 「*」 ,表示让 Redis 为插入的消息自动生成唯一 ID,当然也可以自己定义。
# 消息 ID 由两部分组成:当前毫秒内的时间戳; 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令
XADD queue01 * name wjl age 25 gender male

在这里插入图片描述

读取消息

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREAD COUNT 1 BLOCK 0 STREAMS queue01  0-0
# 指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
# 如果想使用 XREAD 进行顺序消费,每次读取后要记住返回的消息 ID,下次调用 XREAD 就将上一次返回的消息 ID 作为参数传递到下一次调用就可以继续消费后续的消息了。

在这里插入图片描述
这里只是开胃菜,通过 XREAD 读取的数据其实并没有被删除,当重新执行 XREAD COUNT 1 BLOCK 0 STREAMS queue01 0-0指令的时候又会重新读取到。

创建消费组

# Stream 通过 XGROUP CREATE 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。
# 随便再插入一些数据
XADD queue01 * name zhangsan age 52 gender male
XADD queue01 * name lisi age 34 gender male
XADD queue01 * name xiaomei age 24 gender famale
# 创建消费组的指令
# 格式
XGROUP CREATE stream group start_id
# stream:指定队列的名字;
# group:指定消费组名字;
# start_id:指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
# MKSTREAM:默认情况下,XGROUP CREATE命令在目标流不存在时返回错误。可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流。# 新建group01消费组
XGROUP CREATE queue01 group01 0-0 MKSTREAM

读取群组消息

XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >
# >:命令的最后参数 >,表示从尚未被消费的消息开始读取;
# BLOCK 0:表示阻塞读取,要是大于0就是等待多少毫秒

在这里插入图片描述

如果消息队列中的消息被消费组的一个消费者消费了,这条消息就不会再被这个消费组的其他消费者读取到。

在这里插入图片描述

查看已读未确认消息

XREADGROUP GROUP groupName consumerName
XPENDING queue01 group01 

在这里插入图片描述

1 # 未读消息条数
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最小
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最大
consumer01
1

查看消费者读取了哪些数据

XPENDING queue01 group01 - + 10 consumer01

在这里插入图片描述

确认消息

XACK key group-key ID [ID ...]XACK queue01 group01 1696822787364-0

在这里插入图片描述
再次查询未读消息

XPENDING queue01 group01 
XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >

在这里插入图片描述
在这里插入图片描述

C#操作redis实现

使用FreeRedis类库,熟悉了上面的流程,直接上代码

using FreeRedis;namespace RedisMQStu01
{internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var groupName = "group01";//读取队列的群组的名字var consumerName = "consumer01";//消费者的名字//添加数据await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");//创建群组,如果数据存在则不需要执行了,第一次需要执行await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);//读取群组消息var ids = new Dictionary<string, string>();ids.Add("queue01", ">");var result = await cli.XReadGroupAsync(groupName, consumerName,1, 0, noack: false, ids);//查看已读未确认的消息var unReadResults = await cli.XPendingAsync(queueName, groupName);await Console.Out.WriteLineAsync($"未读消息条数为:{unReadResults.count}");foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列idawait Console.Out.WriteAsync($"\t");foreach (var field in entry.fieldValues){await Console.Out.WriteAsync($"\t{field.ToString()}");}await Console.Out.WriteLineAsync();//确认消息await cli.XAckAsync(queueName,groupName, entry.id);}}await Console.Out.WriteLineAsync("完成");}}
}

上面的代码是生产者和消费者在一块,不满足生产环境要求,因为生产环境大多需要分开,生产者只负责生产,消费者只负责消费

生产者

using FreeRedis;namespace RedisMQProductor01
{internal class Program{/// <summary>/// redis消息队列的生产者/// </summary>/// <param name="args"></param>/// <returns></returns>async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字//添加数据await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");await Console.Out.WriteLineAsync("生产者添加数据完成");}}
}

消费者

using FreeRedis;namespace RedisMQConsumer01
{/// <summary>/// redis消息队列的消费者/// </summary>internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var groupName = "group01";//读取队列的群组的名字var consumerName = "consumer01";//消费者的名字//如果数据存在则不需要执行了,第一次需要执行var info = await cli.XInfoGroupsAsync(queueName);if (info == null || info.Length < 1){//创建群组await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);}//读取群组消息var ids = new Dictionary<string, string>();ids.Add("queue01", ">");//block的值是0表示无限等待var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);while (true){if (result != null && result.Length > 0){foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列idawait Console.Out.WriteAsync($"\t");foreach (var field in entry.fieldValues){await Console.Out.WriteAsync($"\t{field.ToString()}");}await Console.Out.WriteLineAsync();//确认消息await cli.XAckAsync(queueName, groupName, entry.id);}}await Console.Out.WriteLineAsync("===============本次处理完毕===============");}//继续等待result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);}}}
}

先启动生产者在启动消费者查看效果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

方法改善

改善之后可以先启动消费者然后等待生产者投递数据即可

消费者

using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;namespace CelueStu02
{/// <summary>/// 备份策略消费者/// </summary>internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var groupName = "group01";//读取队列的群组的名字var consumerName = "consumer01";//消费者的名字try{var streamInfo = cli.XInfoStream(queueName);}catch{await cli.XAddAsync(queueName, "student", "");}//如果数据存在则不需要执行了,第一次需要执行var info = await cli.XInfoGroupsAsync(queueName);if (info == null || info.Length < 1){//创建群组await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);}//读取群组消息var ids = new Dictionary<string, string>();ids.Add("queue01", ">");//block的值是0表示无限等待var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);ConnectionConfig connectionConfig = new ConnectionConfig(){ConnectionString = "",//自己写数据库链接字符串IsAutoCloseConnection = true,DbType = DbType.SqlServer};using SqlSugarClient db = new SqlSugarClient(connectionConfig);//初始化表格db.CodeFirst.InitTables(typeof(Student));while (true){if (result != null && result.Length > 0){foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列idfor (int i = 0; i < entry.fieldValues.Length; i++){var field = entry.fieldValues[i];if (field.ToString() == "student"){var studentListJson = entry.fieldValues[i + 1]?.ToString() ?? "";if (string.IsNullOrWhiteSpace(studentListJson)){continue;}var students = JsonConvert.DeserializeObject<List<Student>>(studentListJson);await db.Storageable(students).ExecuteCommandAsync();}}//确认消息await cli.XAckAsync(queueName, groupName, entry.id);}}await Console.Out.WriteLineAsync("===============本次处理完毕===============");}//继续等待result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);}}}
}

生产者

using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;namespace CelueStu01
{/// <summary>/// 备份策略生产者/// </summary>internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var perProcessNumber = 1000;//每次处理的数据条数int totalPage = 0;//总页码数ConnectionConfig connectionConfig = new ConnectionConfig(){ConnectionString = "",IsAutoCloseConnection = true,DbType = DbType.SqlServer};using (SqlSugarClient db = new SqlSugarClient(connectionConfig)){//初始化表格db.CodeFirst.InitTables(typeof(Student));do{int count = await db.Queryable<Student>().CountAsync();totalPage = count % perProcessNumber == 0 ? count / perProcessNumber : (count / perProcessNumber) + 1;var students = await db.Queryable<Student>().ToPageListAsync(totalPage, perProcessNumber);//批量发送,redis频繁写入会报rdb错误,限制一下写入频率await cli.XAddAsync(queueName, "student", JsonConvert.SerializeObject(students));List<int> deleteStudents = students.Select(p => p.Id).ToList();if (deleteStudents.Any()){//批量删除await db.Deleteable<Student>().Where(p => deleteStudents.Contains(p.Id)).ExecuteCommandAsync();}totalPage -= 1;//Thread.Sleep(2000);} while (totalPage > 0);}await Console.Out.WriteLineAsync("生产者添加数据完成");}}
}

参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/155251.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TensorFlow入门(二十、损失函数)

损失函数 损失函数用真实值与预测值的距离指导模型的收敛方向,是网络学习质量的关键。不管是什么样的网络结构,如果使用的损失函数不正确,最终训练出的模型一定是不正确的。常见的两类损失函数为:①均值平方差②交叉熵 均值平方差 均值平方差(Mean Squared Error,MSE),也称&qu…

Vue思考题_01v-for与v-if的优先级谁更高

目录 vue2vue3 官方文档上说不推荐将v-for与v-if在同一个标签上使用&#xff0c;因为两者优先级并不明显。 那么到底是那个指令的优先级比较高呢&#xff1f; 在vue2与vue3中答案是相反的。 vue2 在vue2中将2个指令放在同一个标签上 <template><ul><li v-fo…

Vue3中reactive, onMounted, ref,toRaw,conmpted 使用方法

import { reactive, onMounted, ref,toRaw,conmpted } from vue; vue3中 reactive &#xff0c;ref &#xff0c; toRaw&#xff0c;watch&#xff0c;conmpted 用法 toRaw 返回原响应式对象 用法&#xff1a; const rowList toRaw(row) reactive:ref: ref和reactive都是V…

关于链表指针的深刻理解

以下列代码为例 //终于给我搞清楚指针的指向究竟是怎么看的了// 按编号对职工记录进行递增排序 void sortById(List* list) {Employee* p, * q, * tail NULL;// tail 变量则是一个边界指针&#xff0c;初始值为 NULL。while (list->head->next ! tail) // tail 变量则是…

【通信系列 1 -- GSM 和 LTE】

文章目录 1. LTE(Long Term Evolution)1.1 FDD&TDD简介1.1.1 3G与4G差异1.1.2 频点与band关系1.1.3 band 与运营商的关系 1.2 TDD&FDD区别1.2.1 FDD帧结构1.2.2 TDD帧结构1.2.3 TDD&FDD优势对比1.2.4 TDD缺点 1.3 VoLTE1.3.1 VoLTE 优点11.3.2 VoLTE 优点21.3.3 Vo…

【ARM Coresight 系列文章19 -- Performance Monitoring Unit(性能监测单元)

文章目录 1.1 PMU 介绍1.2 PMU 寄存器1.2.1 PMU 管理寄存器1.2.2 PMU 外设识别寄存器1.2.3 PMU 组件识别寄存器1.3 性能监控事件1.3.1 Cortex-A9 特定事件1.1 PMU 介绍 许多体系结构都包含 PMU(Performance Monitoring Unit)硬件,用于跟踪、计数系统内部的一些底层硬件事件…

MySql运维篇---009:分库分表:垂直拆分、水平拆分、通过MyCat进行分片,读写分离:一主一从、 双主双从

3.分库分表 3.1 介绍 3.1.1 问题分析 使用单个数据库存储所有的数据&#xff0c;如果磁盘和内存和内存不足了可以增大磁盘和内存&#xff0c;但是对于一台服务器的磁盘和内存不可能无限制的扩张下去&#xff0c;它是受我们服务器的硬件影响的&#xff0c;如果说数据库所存储…

mp4音视频分离技术

文章目录 问题描述一、分离MP3二、分离无声音的MP4三、结果 问题描述 MP4视频想拆分成一个MP3音频和一个无声音的MP4文件 一、分离MP3 ffmpeg -i C:\Users\Administrator\Desktop\一个文件夹\我在财神殿里长跪不起_完整版MV.mp4 -vn C:\Users\Administrator\Desktop\一个文件…

机器学习之自训练协同训练

前言 监督学习往往需要大量的标注数据&#xff0c; 而标注数据的成本比较高 &#xff0e; 因此 &#xff0c; 利用大量的无标注数据来提高监督学习的效果有着十分重要的意义&#xff0e; 这种利用少量标注数据和大量无标注数据进行学习的方式称为 半监督学习 &#xff08; Semi…

Java进击框架:Spring-Bean初始化过程(五)

Java进击框架&#xff1a;Spring-Bean初始化过程&#xff08;五&#xff09; 前言源码初始化配置加载Bean加载Bean对象加载Bean切面 Bean工厂后置处理器注册Bean后置处理器初始化消息源初始化应用程序事件多播器注册监听器完成Bean工厂初始化Bean初始化完成刷新应用上下文创建完…

VSCODE+PHP8.2配置踩坑记录

VSCODEPHP8.2配置踩坑记录 – WhiteNights Site 我配置过的最恶心的环境之一&#xff1a;windows上的php。另一个是我centos服务器上的php。 进不了断点 端口配置和xdebug的安装 这个应该是最常见的问题了。从网上下载完php并解压到本地&#xff0c;打开vscode&#xff0c;安装…

【网路安全 --- Linux,window常用命令】网络安全领域,Linux和Windows常用命令,记住这些就够了,收藏起来学习吧!!

一&#xff0c;Linux 1-1 重要文件目录 1-1-1 系统运行级别 /etc/inittab 1-1-2 开机启动配置文件 /etc/rc.local /etc/rc.d/rc[0~6].d## 当我们需要开机启动自己的脚本时&#xff0c;只需要将可执行脚本丢在 /etc/init.d 目录下&#xff0c;然后在 /etc/rc.d/rc*.d 中建…

XLSX.utils.sheet_to_json() 数字格式转为字符串格式

raw 默认为true&#xff0c;设置为false就可以了 XLSX.utils.sheet_to_json(workbook.Sheets[sheet], {raw:false})

Maven NetBeans

目录 在 NetBeans 里打开一个 Maven 项目 在 NetBeans 里构建一个 Maven 项目 在 NetBeans 里运行应用程序 NetBeans 6.7 及更新的版本已经内置了 Maven。对于以前的版本&#xff0c;可在插件管理中心获取 Maven 插件。此例中我们使用的是 NetBeans 6.9。 关于 NetBeans 的一…

竹云筑基,量子加密| 竹云携手国盾量子构建量子身份安全防护体系

9月23日-24日&#xff0c;2023量子产业大会在安徽合肥举行。作为量子科技领域行业盛会&#xff0c;2023年量子产业大会以“协同创新 量点未来”为主题&#xff0c;展示了前沿的量子信息技术、产业创新成果&#xff0c;并举办主旨论坛、量子科普讲座等系列专项活动。量子信息作为…

PDMS二次开发(二十一)——关于Pipeline工具生成螺栓材料表的计算思路

目录 1.简述2.螺栓元件的数据2.1 A1A等级中螺栓元件库2.2 A1A等级中法兰元件库2.3 A1A要指定螺栓等级2.4 螺栓等级数据解析 3.问题解释 1.简述 因为有好几个网友问到螺栓材料表生成报错的问题&#xff0c;我初步分析可能还是因为螺栓元件库的问题&#xff0c;我这里对Pipeline…

Spark入门

Spark 1.Spark概述 2.Spark特点 3.RDD概述 1. Spark概述 什么是Spark 回顾&#xff1a;Hadoop主要解决&#xff0c;海量数据的存储和海量数据的分析计算。 Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。 Hadoop与Spark历史 Hadoop与Spark框架对比 Dr…

内网渗透面试问题

文章目录 1、熟悉哪些域渗透的手段2、详细说明哈希传递的攻击原理NTLM认证流程哈希传递 3、聊一下黄金票据和白银票据4、shiro反序列化漏洞的形成原因&#xff0c;尝试使用burp抓包查看返回包内容安装环境漏洞验证 5、log4j组件的命令执行漏洞是如何造成的6、画图描述Kerberos协…

9-AJAX-上-原理详解

一、定义 1、什么是Ajax Ajax&#xff1a;即异步 JavaScript 和XML。Ajax是一种用于创建快速动态网页的技术。通过在后台与进行少量数据交换&#xff0c;Ajax可以使网页实现异步更新。这意味着可以在不重新加载整个网页的情况下&#xff0c;对网页的某部分进行更新。而传统的…

时代风口中的Web3.0基建平台,重新定义Web3.0!

近年来&#xff0c;Web3.0概念的广泛兴起&#xff0c;给加密行业带来了崭新的叙事方式&#xff0c;同时也为加密行业提供了更加具有想象力的应用场景与商业空间&#xff0c;并让越来越多的行业从业者们意识到只有更大众化的市场共性需求才能推动加密市场的持续繁荣。当前围绕这…