【RabbitMQ】golang客户端教程2——工作队列

任务队列/工作队列

在这里插入图片描述
在上一个教程中,我们编写程序从命名的队列发送和接收消息。在这一节中,我们将创建一个工作队列,该队列将用于在多个工人之间分配耗时的任务。

工作队列(又称任务队列)的主要思想是避免立即执行某些资源密集型任务并且不得不等待这些任务完成。相反,我们安排任务异步地同时或在当前任务之后完成。我们将任务封装为消息并将其发送到队列,在后台运行的工作进程将取出消息并最终执行任务。当你运行多个工作进程时,任务将在他们之间共享。

这个概念在Web应用中特别有用,因为在Web应用中不可能在较短的HTTP请求窗口内处理复杂的任务,(译注:例如注册时发送邮件或短信验证码等场景)。

准备工作

在本教程的上一部分,我们发送了一条包含“ Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有实际的任务,例如调整图像大小或渲染pdf文件,所以我们通过借助time.Sleep函数模拟一些比较耗时的任务。我们会将一些包含.的字符串封装为消息发送到队列中,其中每有一个.就表示需要耗费1秒钟的工作,例如,hello...表示一个将花费三秒钟的假任务。

我们将稍微修改上一个示例中的send.go代码,以允许从命令行发送任意消息。该程序会将任务安排到我们的工作队列中,因此我们将其命名为new_task.go

body := bodyFrom(os.Args)  // 从参数中获取要发送的消息正文
err = ch.Publish("",           // exchangeq.Name,       // routing keyfalse,        // mandatoryfalse,amqp.Publishing {DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte(body),})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

下面是bodyFrom函数:

func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}

我们以前的receive.go程序也需要进行一些更改:它需要为消息正文中出现的每个.伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们将其称为worker.go

msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args
)
failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dot_count := bytes.Count(d.Body, []byte("."))  // 数一下有几个.t := time.Duration(dot_count)time.Sleep(t * time.Second)  // 模拟耗时的任务log.Printf("Done")}
}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

请注意,我们的假任务模拟执行时间。

然后,我们就可以打开两个终端,分别执行new_task.goworker.go了。

# shell 1
go run worker.go
# shell 2
go run new_task.go

循环调度

使用任务队列的优点之一是能够轻松并行化工作。如果我们的工作正在积压,我们可以增加更多的工人,这样就可以轻松扩展。

首先,让我们尝试同时运行两个worker.go脚本。它们都将从队列中获取消息,但是究竟是怎样呢?让我们来看看。

你需要打开三个控制台。其中两个将运行worker.go脚本。这些控制台将成为我们的两个消费者——C1和C2。

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布新任务。启动消费者之后,你可以发布一些消息:

# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....

然后我们在shell1shell2 两个窗口看到如下输出结果了:

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg1.
# => [x] Received a message: msg3...
# => [x] Received a message: msg5.....
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg2..
# => [x] Received a message: msg4....

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。使用三个或者更多worker试一下。

消息确认

work 完成任务可能需要耗费几秒钟,如果一个worker在任务执行过程中宕机了该怎么办呢?我们当前的代码中,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。在这种情况下,如果你终止一个worker那么你就可能会丢失这个任务,我们还将丢失所有已经交付给这个worker的尚未处理的消息。

我们不想丢失任何任务,如果一个worker意外宕机了,那么我们希望将任务交付给其他worker来处理。

为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(acknowledgement),以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。

如果使用者在不发送确认的情况下死亡(其通道已关闭,连接已关闭或TCP连接丢失),RabbitMQ将了解消息未完全处理,并将对其重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。这样,您可以确保即使工人偶尔死亡也不会丢失任何消息。

没有任何消息超时;RabbitMQ将在消费者死亡时重新传递消息。即使处理一条消息需要很长时间也没关系。

在本教程中,我们将使用手动消息确认,方法是为“auto-ack”参数传递一个false,然后在完成任务后,使用d.Ack(false)worker发送一个正确的确认(这将确认一次传递)。

msgs, err := ch.Consume(q.Name, // queue"",     // consumerfalse,  // 注意这里传false,关闭自动消息确认false,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args
)
if err != nil {fmt.Printf("ch.Consume failed, err:%v\n", err)return
}// 开启循环不断地消费消息
forever := make(chan bool)
go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dotCount := bytes.Count(d.Body, []byte("."))t := time.Duration(dotCount)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) // 手动传递消息确认}
}()

使用这段代码,我们可以确保即使你在处理消息时使用CTRL+C杀死一个worker,也不会丢失任何内容。在worker死后不久,所有未确认的消息都将被重新发送。

消息确认必须在接收消息的同一通道(Channel)上发送。尝试使用不同的通道(Channel)进行消息确认将导致通道级协议异常。

忘记确认

忘记确认是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。

为了调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:

> sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
> ```
>
> 在Windows平台,去掉sudo
>
> ```bash
> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
> ```### 消息持久化我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止运行,我们的任务仍然会丢失。当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。首先,我们需要确保队列能够在RabbitMQ节点重新启动后继续运行。为此,我们需要声明它是持久的:```go
q, err := ch.QueueDeclare("hello", // nametrue,    // 声明为持久队列false,   // delete when unusedfalse,   // exclusivefalse,   // no-waitnil,     // arguments
)

虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许你使用不同的参数重新定义现有队列,并将向任何尝试重新定义的程序返回错误。但是有一个快速的解决方法——让我们声明一个具有不同名称的队列,例如task_queue

q, err := ch.QueueDeclare("task_queue", // nametrue,         // 声明为持久队列false,        // delete when unusedfalse,        // exclusivefalse,        // no-waitnil,          // arguments
)

这种持久的选项更改需要同时应用于生产者代码和消费者代码。

在这一点上,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。现在我们需要将消息标记为持久的——通过使用amqp.Publishing中的持久性选项amqp.Persistent

err = ch.Publish("",     // exchangeq.Name, // routing keyfalse,  // 立即false,  // 强制amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬态/持久)ContentType:  "text/plain",Body:         []byte(body),})

有关消息持久性的说明

将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是RabbitMQ接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口。而且,RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但是对于我们的简单任务队列来说已经足够了。如果您需要更强有力的担保,那么您可以使用publisher confirms

公平分发

你可能已经注意到调度仍然不能完全按照我们的要求工作。例如,在一个有两个worker的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个worker将持续忙碌,而另一个worker几乎不做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地发送消息。

这是因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。
在这里插入图片描述
为了避免这种情况,我们可以将预取计数设置为1。这告诉RabbitMQ不要一次向一个worker发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向worker发送新消息。相反,它将把它发送给下一个不忙的worker

err = ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global
)

关于队列大小的说明

如果所有的worker都很忙,你的queue随时可能会满。你会想继续关注这一点,也许需要增加更多的worker,或者有一些其他的策略。

完整的代码实例

我们的new_task.go的最终代码代入如下:

package mainimport ("github.com/streadway/amqp""log""os""strings"
)func failOnErrorNew(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}
func bodyFrom(args []string) string {var s stringif len(args) < 2 || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], "")}return s
}func main() {//建立连接conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")failOnErrorNew(err, "Failed to connect to RabbitMQ")defer conn.Close()//获取channelch, err := conn.Channel()failOnErrorNew(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("task_queue", //nametrue,         //durablefalse,        //delete when unusedfalse,        //exclusivefalse,        //no-waitnil,          //arguments)failOnErrorNew(err, "Failed to declare a queue ")body := bodyFrom(os.Args)err = ch.Publish("",     // exchangeq.Name, // routing keyfalse,  // mandatoryfalse,amqp.Publishing{DeliveryMode: amqp.Persistent, //表示将消息设置为持久模式,确保在 RabbitMQ 重启后消息能够被恢复。ContentType:  "text/plain",Body:         []byte(body),})failOnErrorNew(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)}

work.go内容如下:

package mainimport ("bytes""github.com/streadway/amqp""log""time"
)func failOnErrorWork(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}
func main() {//建立连接conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")failOnErrorWork(err, "Failed to connect to RabbitMQ")defer conn.Close()//获取channelch, err := conn.Channel()failOnErrorWork(err, "Failed to open a channel")defer ch.Close()//声明队列q, err := ch.QueueDeclare("task_queue", //nametrue,         //durablefalse,        //delete when unusedfalse,        //exclusivefalse,        //no-waitnil,          //argument)failOnErrorWork(err, "Failed to declare a queue")err = ch.Qos(1,     //prefetch count 消费者一次能获取的最大未确认消息数量0,     //prefetch size 消费者一次能获取的最大未确认消息的总大小(以字节为单位)false, //global  是否将预取配置应用于所有通道。如果为 true,则应用于所有通道;如果为 false,则仅应用于当前通道。)//获取接收消息的Delivery通道msgs, err := ch.Consume(q.Name, //queue"",     //consumerfalse,  //注意这里传false,关闭自动消息确认false,  //exclusivefalse,  //no-localfalse,  //no-waitnil,    //args)failOnErrorWork(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Reveived a message:%s", d.Body)dot_count := bytes.Count(d.Body, []byte(".")) //数一下有几个t := time.Duration(dot_count)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) //手动传递消息确认}}()log.Printf("[*] Waiting for messages. To exit press CTRL+C")<-forever
}

源自:https://www.rabbitmq.com/getstarted.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/75189.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

opencv-38 形态学操作-闭运算(先膨胀,后腐蚀)cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)

闭运算是先膨胀、后腐蚀的运算&#xff0c;它有助于关闭前景物体内部的小孔&#xff0c;或去除物体上的小黑点&#xff0c;还可以将不同的前景图像进行连接。 例如&#xff0c;在图 8-17 中&#xff0c;通过先膨胀后腐蚀的闭运算去除了原始图像内部的小孔&#xff08;内部闭合的…

案例实践:小红书APP出现闪退问题,接口测试怎么做?(二)

Postman实现接口功能测试 新增货品接口实战 1、填写接口请求4要素&#xff1a; 由于货品新增接口文档找不到接口请求4要素中的&#xff1a;请求方法、请求地址和请求头&#xff0c;故&#xff0c;使用Fiddler抓包获取&#xff0c;获取结果如下&#xff1a; 1&#xff09;请求…

Zebec APP:构建全面、广泛的流支付应用体系

目前&#xff0c;流支付协议 Zebec Protocol 基本明确了生态的整体轮廓&#xff0c;它包括由其社区推动的模块化 Layer3 构架的公链 Nautilus Chain、流支付应用 Zebec APP 以及 流支付薪酬工具 Zebec payroll 。其中&#xff0c;Zebec APP 是原有 Zebec Protocol 的主要部分&a…

【逗老师的PMP学习笔记】4、项目整合管理

目录 一、制定项目章程1、制定项目章程的整体输入、输出和工具技术2、输入2.1、输入-商业文件2.2、输入-协议2.3、输入-事业环境因素组织过程资产 3、工具与技术3.1、专家判断3.2、数据收集3.3、人际关系与团队技能3.4、会议 4、输出4.1、输出-项目章程4.2、输出-假设日志 二、…

61 # http 数据处理

node 中的核心模块 http 可以快速的创建一个 web 服务 const http require("http"); const url require("url");// req > request 客户端的所有信息 // res > respone 可以给客户端写入数据 const server http.createServer();server.on("r…

【前端知识】React 基础巩固(四十二)——React Hooks的介绍

React 基础巩固(四十二)——React Hooks的介绍 一、为什么需要Hook? Hook 是 React 16.8 的新增特性&#xff0c;它可以让我们在不编写class的情况下使用state以及其他的React特性&#xff08;比如生命周期&#xff09;。 class组件 VS 函数式组件&#xff1a; class的优势…

让Python点亮你的世界:打造专业级编程环境的必备步骤

文章目录 初识pythonpython的安装win系统Linux系统&#xff08;centos7&#xff09; 第一个Python程序常见问题 Python解释器Python开发环境PyCharm的基础使用创建项目修改主题修改默认字体和大小汉化插件翻译软件常用快捷键 初识python Python语言的起源可以追溯到1989年&…

基于ARM+FPGA的驱控一体机器人控制器设计

目前市场上工业机器人&#xff0c;数控机床等多轴运动控制系统普遍采用运动控制器加 伺服驱动器的分布式控制方式。在这种控制方式中&#xff0c;控制器一方面完成人机交互&#xff0c;另 一方面进行 NC 代码的解释执行&#xff0c;插补运算&#xff0c;继而将计算出来的位…

jmeter之接口测试(http接口测试)

基础知识储备 一、了解jmeter接口测试请求接口的原理 客户端--发送一个请求动作--服务器响应--返回客户端 客户端--发送一个请求动作--jmeter代理服务器---服务器--jmeter代理服务器--服务器 二、了解基础接口知识&#xff1a; 1、什么是接口&#xff1a;前端与后台之间的…

云计算——常见集群策略

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​ 目录 前言 一.什么是集群 二.集群策略 1.虚拟机HA 实现虚拟机高可用性通常涉及以下关键…

【Java基础教程】(五十)JDBC篇:JDBC概念及操作步骤、主要类与接口解析、批处理与事务处理~

Java基础教程之JDBC &#x1f539;本章学习目标1️⃣ JDBC概念2️⃣ 连接数据库3️⃣ Statement 接口3.1 数据更新操作3.2 数据查询 4️⃣ PreparedStatement 接口4.1 Statement 接口问题4.2 PreparedStatement操作 5️⃣ 批处理与事务处理&#x1f33e; 总结 &#x1f539;本…

高性能网络框架笔记

目录 TCP粘包、分包惊群断开连接&#xff0c;TCP怎么检测的&#xff1f;大量的close wait&#xff0c;如何解 ?双方同时调用close水平触发和边沿触发的区别 TCP粘包、分包 解决&#xff1a;1.应用层协议头前面pktlen&#xff1b;2.为每一个包加上分隔符&#xff1b;(\r\n&…

Java 版 spring cloud + spring boot 工程系统管理 工程项目管理系统源码 工程项目各模块及其功能点清单

工程项目各模块及其功能点清单 一、系统管理 1、数据字典&#xff1a;实现对数据字典标签的增删改查操作 2、编码管理&#xff1a;实现对系统编码的增删改查操作 3、用户管理&#xff1a;管理和查看用户角色 4、菜单管理&#xff1a;实现对系统菜单的增删改查操…

经典CNN(三):DenseNet算法实战与解析

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊|接辅导、项目定制 1 前言 在计算机视觉领域&#xff0c;卷积神经网络&#xff08;CNN&#xff09;已经成为最主流的方法&#xff0c;比如GoogleNet&#xff0c;…

【多模态】ALBEF-融合前对齐

目录 &#x1f341;&#x1f341;背景 &#x1f337;&#x1f337;网络结构 &#x1f385;&#x1f385;损失函数 &#x1f33c;&#x1f33c;动量蒸馏 &#x1f33a;&#x1f33a;下游任务结果 &#x1f4d2;&#x1f4d2;Grad-CAM 特征可视化 &#x1f6a6;&#x1f6a…

欧拉函数与筛法求欧拉函数

目录 欧拉函数欧拉函数的定义欧拉函数的公式欧拉函数的公式推导欧拉定理典型例题代码实现 筛法求欧拉函数思路分析经典例题代码实现 欧拉函数 欧拉函数的定义 对于任意正整数 n n n,欧拉函数 φ ( n ) φ(n) φ(n) 表示小于或等于 n n n 的正整数中&#xff0c;与 n n n …

【视觉SLAM入门】5.1. 特征提取和匹配--FAST,ORB(关键点描述子),2D-2D对极几何,本质矩阵,单应矩阵,三角测量,三角化矛盾

"不言而善应" 0. 基础知识1. 特征提取和匹配1.1 FAST关键点1.2 ORB的关键点--改进FAST1.3 ORB的描述子--BRIEF1.4 总结 2. 对极几何&#xff0c;对极约束2.1 本质矩阵(对极约束)2.1.1 求解本质矩阵2.1.2 恢复相机运动 R &#xff0c; t R&#xff0c;t R&#xff0c;…

修改状态栏The application could not be installed: INSTALL_FAILED_ABORTEDList

打开theme修改状态栏为可见。 <resources xmlns:tools"http://schemas.android.com/tools"><!-- Base application theme. --><style name"Base.Theme.MyApplication" parent"Theme.AppCompat.DayNight"><!-- Customize yo…

[JavaScript游戏开发] 绘制冰宫宝藏地图、人物鼠标点击移动、障碍检测

系列文章目录 第一章 2D二维地图绘制、人物移动、障碍检测 第二章 跟随人物二维动态地图绘制、自动寻径、小地图显示(人物红点显示) 第三章 绘制冰宫宝藏地图、人物鼠标点击移动、障碍检测 第四章 绘制Q版地图、键盘上下左右地图场景切换 文章目录 系列文章目录前言一、本章节…

呼吸灯——FPGA

文章目录 前言一、呼吸灯是什么&#xff1f;1、介绍2、占空比调节示意图 二、系统设计1、系统框图2、RTL视图 三、源码四、效果五、总结六、参考资料 前言 环境&#xff1a; 1、Quartus18.0 2、vscode 3、板子型号&#xff1a;EP4CE6F17C8 要求&#xff1a; 将四个LED灯实现循环…