Golang使用消息队列(RabbitMQ)

最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除,在之前的版本中,可以使用定时任务来检查数据库记录中删除时间来判断是否删除,但是这不是最佳的,因此考虑如何基于RabbitMQ来实现这个功能。

使用RabbitMQ的架构

在这里插入图片描述

代码

因为前端有点麻烦,这里全部使用Golang后端来模拟实现整个架构,包括生产端和消费端。这里有一些细节

  • 注意交换机和队列的绑定,一定要细心
  • 交换机一旦声明了就不能更改,如果要发生一些属性的更改,就要删除原来的内容,重新生成
  • 下列的内容不包含RabbitMQ持久化的内容
package mainimport ("fmt""github.com/streadway/amqp""log""strings"
)func InitRabbitMQ() *amqp.Connection {mq := "amqp"host := "127.0.0.1"port := "5672"user := "root"pwd := "root"dns := strings.Join([]string{mq, "://", user, ":", pwd,"@", host, ":", port, "/"}, "")conn, err := amqp.Dial(dns)if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}return conn
}func InitMainExchangeAndQueue(ch *amqp.Channel, userID string) *amqp.Channel {// 队列信息exchangeName := "main_exchange"queueName := fmt.Sprintf("user_queue_%s", userID)messageTTL := int32(300000)// 声明主交换机err := ch.ExchangeDeclare(exchangeName, // 交换机名"direct",     // Exchange typefalse,        // Durablefalse,        // Auto-deletedfalse,        // Internalfalse,        // No-waitnil,          // Arguments)if err != nil {log.Fatalf("Failed to declare an main exchange: %v", err)}// 声明用户队列_, err = ch.QueueDeclare(queueName, // 队列名false,     // Durablefalse,     // Delete when unusedfalse,     // Exclusivefalse,     // No-waitamqp.Table{"x-dead-letter-routing-key": "dead",          // routing-key"x-dead-letter-exchange":    "dead_exchange", // 死信交换机"x-message-ttl":             messageTTL,      // TTL},)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind(queueName, userID, "main_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}return ch
}func InitDeadExchangeAndQueue(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange",amqp.ExchangeDirect,true,false,false,false,nil,)if err != nil {log.Fatalf("Failed to declare an dead exchange: %v", err)}// 声明一个死信队列_, err = ch.QueueDeclare("dead_queue",true,false,false,false,nil)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind("dead_queue", "dead", "dead_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}
}func PublishMessage(ch *amqp.Channel, userID, fileID string) {// 用户信息message := fmt.Sprintf("%s|%s", userID, fileID)exchangeName := "main_exchange"// 发布用户消息err := ch.Publish(exchangeName, // ExchangeuserID,       // Routing keyfalse,        // Mandatoryfalse,        // Immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(message),})if err != nil {log.Fatalf("Failed to publish a message: %v", err)}log.Printf("Message sent to user %s: %s", userID, message)
}func ConsumeTTL(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange", // 交换机名"direct",        // Exchange typetrue,            // Durablefalse,           // Auto-deletedfalse,           // Internalfalse,           // No-waitnil,             // Arguments)if err != nil {log.Fatalf("Failed to declare a dead letter exchange: %v", err)}// 创建消费者并阻塞等待消费死信队列中的消息megs, err := ch.Consume("dead_queue", // Queue"",           // Consumerfalse,        // Auto-acknowledgefalse,        // Exclusivefalse,        // No-localfalse,        // No-waitnil,          // Args)if err != nil {log.Fatalf("Failed to register a consumer for dead letter queue: %v", err)}// 使用无限循环一直监听fmt.Println("Waiting for message from dead_queue......")for d := range megs {// 实际中,处理消息的逻辑,例如删除文件或其他操作fmt.Println(string(d.Body))// 消费完成后手动确认消息err = d.Ack(false)if err != nil {log.Fatalf("Failed to ack message: %v", err)}}
}func Consume(ch *amqp.Channel, userID string) {// 下面的信息可以通过前后端进行传递queueName := fmt.Sprintf("user_queue_%s", userID)// 消费消息megs, err := ch.Consume(queueName, // Queue"",        // Consumertrue,      // Auto-acknowledgefalse,     // Exclusivefalse,     // No-localfalse,     // No-waitnil,       // Args)if err != nil {log.Fatalf("Failed to register a consumer: %v", err)}// 这里直接是由前端发送过来的API进行触发,所以不用一直阻塞监听d, ok := <-megsif !ok {log.Fatalf("Failed to get message: %v", err)}fmt.Println(string(d.Body))// 消息完成后确认消息err = d.Ack(true)if err != nil {log.Fatalf("Failed to ack message: %v", err)}
}func main() {// 获取客户端client := InitRabbitMQ()defer client.Close()ch, err := client.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()//ConsumeTTL(ch)// 构造dead_exchange及dead_queue// InitDeadExchangeAndQueue(ch)// 假设这是web请求信息//var userID1 = "test-id1"//var fileID1 = "file1"// 构造main_exchange及user_queue//ch = InitMainExchangeAndQueue(ch, userID1)// 针对用户1:假设还消息没有过期时候就被recovery,即在user_queue中就被消费,实际中发布消息的这部分逻辑应当放在前端中//PublishMessage(ch, userID1, fileID1)//time.Sleep(20 * time.Second)// 模拟后端消费消息//Consume(ch, userID1)// 针对用户2:模拟其不被后端消费,过期到死信队列中var userID2 = "test-id2"var fileID2 = "file2"ch = InitMainExchangeAndQueue(ch, userID2)PublishMessage(ch, userID2, fileID2)// 注意这个消息没有被消费,理论上应当被死信队列消费
}

从dead_exchange中消费:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/103577.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用端口映射实现Spring Boot服务端接口的公网远程调试:详细配置与步骤解析

文章目录 前言1. 本地环境搭建1.1 环境参数1.2 搭建springboot服务项目 2. 内网穿透2.1 安装配置cpolar内网穿透2.1.1 windows系统2.1.2 linux系统 2.2 创建隧道映射本地端口2.3 测试公网地址 3. 固定公网地址3.1 保留一个二级子域名3.2 配置二级子域名3.2 测试使用固定公网地址…

使用 Feature Flags 与可观测工具实现数据库灰度迁移

场景描述 很多企业会遇到数据库升级、或数据库迁移的情况&#xff0c;尤其是在自建数据库服务向云数据库服务、自建机房向云机房、旧数据库向新数据库迁移等场景。 然而&#xff0c;我们需要在整个移植过程中保证其稳定性、避免数据遗失、服务宕机等情况&#xff0c;最常见的移…

【Spring】一次性打包学透 Spring | 阿Q送书第五期

文章目录 如何竭尽可能确保大家学透Spring1. 内容全面且细致2. 主题实用且本土化3. 案例系统且完善4. 知识有趣且深刻 关于作者丁雪丰业内专家推图书热卖留言提前获赠书 不知从何时开始&#xff0c;Spring 这个词开始频繁地出现在 Java 服务端开发者的日常工作中&#xff0c;很…

BTP Integration Suite学习笔记 - (Unit4) Developing with SAP Integration Suite

详细指导还是要看官方文档 4. 云集成管理 4.1 云集成介绍 什么是云集成&#xff1f; 前三章讲了很多内容&#xff0c;但都不是最核心的&#xff0c;通常我们用CPI是让他实现原来PI/PO的功能的&#xff0c;是用来做集成的。这章才刚开始。 云集成有以下几个特性&#xff1a;…

Django进阶:DRF(Django REST framework)

什么是DRF&#xff1f; DRF即Django REST framework的缩写&#xff0c;官网上说&#xff1a;Django REST framework是一个强大而灵活的工具包&#xff0c;用于构建Web API。 简单来说&#xff1a;通过DRF创建API后&#xff0c;就可以通过HTTP请求来获取、创建、更新或删除数据(…

【SVN内网穿透】远程访问Linux SVN服务

文章目录 前言1. Ubuntu安装SVN服务2. 修改配置文件2.1 修改svnserve.conf文件2.2 修改passwd文件2.3 修改authz文件 3. 启动svn服务4. 内网穿透4.1 安装cpolar内网穿透4.2 创建隧道映射本地端口 5. 测试公网访问6. 配置固定公网TCP端口地址6.1 保留一个固定的公网TCP端口地址6…

SpringBoot整合阿里云OSS,实现图片上传

在项目中&#xff0c;将图片等文件资源上传到阿里云的OSS&#xff0c;减少服务器压力。 项目中导入阿里云的SDK <dependency><groupId>com.aliyun.oss</groupId><artifactId>aliyun-sdk-oss</artifactId><version>3.10.2</version>…

在ubuntu+cpolar+rabbitMQ环境下,实现mq服务端远程访问

文章目录 前言1.安装erlang 语言2.安装rabbitMQ3. 内网穿透3.1 安装cpolar内网穿透(支持一键自动安装脚本)3.2 创建HTTP隧道 4. 公网远程连接5.固定公网TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址 前言 RabbitMQ是一个在 AMQP(高级消息队列协议)基…

开源的密码学工具库:openssl安装在docker容器环境Linux(ubuntu18.04)

OpenSSL&#xff08;Open Secure Socket Layer&#xff09;是一个开源的密码学工具库&#xff0c;它提供了一系列的加密、解密、认证和通信安全相关的功能。OpenSSL 最初是为了支持安全的网络通信而设计的&#xff0c;但后来它的功能逐渐扩展到了许多不同的领域&#xff0c;包括…

多核异构核间通信Mailbox vs rpmsg

目录 一、关键术语解释 二、Mailbox与rpmsg对比 三、rpmsg传输流程 异构核间数据通过共享内存实现数据传递&#xff0c;通过中断来触发发送、接收。 一、关键术语解释 IPC Inter-Processor Communication MailBox IP which provides queued interrupt mechanism for comm…

javaWeb第一课

前言&#xff1a; 先来说说本来写完了的聊天室的项目&#xff0c;然后后面凡尔赛不小心改错了的然后一个上午才发现的问题&#xff0c;真的很有必要总结一下&#xff0c;气死我了&#xff1a;关于要将这个ArrayList的某个类型要转换为数组&#xff0c;我刚开始这么写的&#x…

leetcode 118.杨辉三角

⭐️ 题目描述 &#x1f31f; leetcode链接&#xff1a;https://leetcode.cn/problems/pascals-triangle/description/ 代码&#xff1a; class Solution { public:vector<vector<int>> generate(int numRows) {// 先开空间vector<vector<int>> v;v.…

【数据结构】C语言实现栈(详细解读)

前言: &#x1f4a5;&#x1f388;个人主页:​​​​​​Dream_Chaser&#xff5e; &#x1f388;&#x1f4a5; ✨✨专栏:http://t.csdn.cn/oXkBa ⛳⛳本篇内容:c语言数据结构--C语言实现栈 目录 什么是栈 栈的概念及结构 实现栈的方式 链表的优缺点: 顺序表的优缺点: 栈…

AI创作助手:介绍 TensorFlow 的基本概念和使用场景

目录 背景 环境测试 入门示例 背景 TensorFlow 是一个强大的开源框架&#xff0c;用于实现深度学习和人工智能模型。它最初由 Google 开发&#xff0c;现在已经成为广泛使用的机器学习框架之一。 TensorFlow 简单来说就是一个用于创建和运行机器学习模型的库。它的核心概念…

2023年骨传导耳机推荐,一文读懂骨传导运动耳机哪个牌子好!

这几年&#xff0c;耳机圈开始流行起骨传导耳机&#xff0c;这种耳机通过贴合耳道附近的颌骨通过振动传递声音到听觉神经&#xff0c;相比较入耳式耳机来说&#xff0c;更有利于耳道卫生&#xff0c;而且在听歌同时可保持对环境声的感知&#xff0c;深受不少运动达人的喜爱。我…

LVS之keepalived

1、keepalived 概述 总结&#xff1a;Keepalived 软件就是通过VRRP协议来实现高可用功能。 应用场景&#xff1a;企业应用中&#xff0c;单台服务器承担应用存在单点故障的危险 单点故障一旦发生&#xff0c;企业服务将发生中断&#xff0c;造成极大的危害 VRRP通信原理&…

【Rust】Rust学习 第十六章无畏并发

安全且高效的处理并发编程是 Rust 的另一个主要目标。并发编程&#xff08;Concurrent programming&#xff09;&#xff0c;代表程序的不同部分相互独立的执行&#xff0c;而 并行编程&#xff08;parallel programming&#xff09;代表程序不同部分于同时执行&#xff0c;这两…

iOS逆向初探:揭开iOS App的神秘面纱

逆向是一种分析和还原应用程序的过程&#xff0c;它能够揭示应用程序内部的工作原理和代码结构。接下来我们将全面介绍iOS上的逆向&#xff0c;包括其概念、常用工具和具体实例。 1. 什么是iOS逆向&#xff1f; iOS平台逆向是将应用程序的二进制代码&#xff08;通常是经过编…

使用PyMuPDF添加PDF水印

使用Python添加PDF水印的博客文章。 C:\pythoncode\new\pdfwatermark.py 使用Python在PDF中添加水印 在日常工作中&#xff0c;我们经常需要对PDF文件进行处理。其中一项常见的需求是向PDF文件添加水印&#xff0c;以保护文件的版权或标识文件的来源。本文将介绍如何使用Py…

反向传播求变量导数

反向传播求变量导数 1. 相关习题2. 推导流程2.1 相关公式2.3 变量导数求解 3. 代码实现3.1 参数对应3.2 代码实现 以前只知道反向传播通过链式法则实现今天看书发现图片上求出来的值自己算不出来所以自己算了一下&#xff0c;记录一下&#xff0c;并运行了书中的代码相关书籍&a…