【RabbitMQ】golang客户端教程3——发布订阅(使用fanout交换器)

发布订阅

在上一个教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务只传递给一个工人。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递一个消息。这就是所谓的“订阅/发布模式”

为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序将接收并打印它们。

在我们的日志系统中,每一个运行的接收器程序副本都会收到消息。这样,我们就可以运行一个接收器并将日志定向到磁盘;同时,我们还可以运行另一个接收器并在屏幕上查看日志。

本质上,已发布的日志消息将被广播到所有接收者。

Exchanges(交换器)

在本教程的前面部分中,我们向队列发送消息和接收消息。现在是时候在Rabbit中引入完整的消息传递模型了。

让我们快速回顾一下先前教程中介绍的内容:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。

相反,生产者只能将消息发送到交换器。交换器是非常简单的东西。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交换器必须确切知道如何处理接收到的消息。它应该被附加到特定的队列吗?还是应该将其附加到许多队列中?或者它应该被丢弃。这些规则由交换器的类型定义。

在这里插入图片描述
有几种交换器类型可用:direct, topic, headersfanout。我们将集中讨论最后一个——fanout。让我们创建一个这种类型的交换器,并给它起个名字叫logs:

err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments
)

fanout(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。而这正是我们记录器所需要的。

交换器清单

rabbitmqctl list_exchanges


在此列表中,将有一些`amq.*`交换器和一个默认的(未命名)交换器。这些是默认创建的,但是你现在不太可能需要使用它们。默认交换器在本教程的前面部分中,我们还不知道交换器的存在,但仍然能够将消息发送到队列。之所以能这样做,是因为我们使用的是默认交换器,该交换器由空字符串(`""`)标识。回想一下我们之前是怎么发布消息的:err = ch.Publish("",     // exchangeq.Name, // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),
})在这里,我们使用默认或无名称的交换器:消息将以`route_key`参数指定的名称路由到队列(如果存在)。
现在,我们可以改为发布到我们的命名交换器:
err = ch.ExchangeDeclare("logs",   // 使用命名的交换器"fanout", // 交换器类型true,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil)     // argumentsfailOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})

临时队列

你可能还记得,先前我们使用的是具有特定名称的队列(还记得hello和task_queue吗?)能够命名队列对我们来说至关重要——我们需要将工作人员指向同一个队列。当你想在生产者和消费者之间共享队列时,给队列一个名称非常重要。

但对于我们的记录器来说,情况并非如此。我们希望收到所有日志消息,而不仅仅是它们的一部分。我们也只对当前正在发送的消息感兴趣,而对旧消息不感兴趣。为了解决这个问题,我们需要两件事。

首先,当我们连接到Rabbit时,我们需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者更好的方法是让服务器为我们选择一个随机队列名称。

其次,一旦我们断开消费者的连接,队列就会自动删除。

在amqp客户端中,当我们传递一个空字符串作为队列名称时,我们将使用随机生成的名称创建一个非持久队列:

q, err := ch.QueueDeclare("",    // 空字符串作为队列名称false, // 非持久队列false, // delete when unusedtrue,  // 独占队列(当前声明队列的连接关闭后即被删除)false, // no-waitnil,   // arguments
)

上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

当声明它的连接关闭时,该队列将被删除,因为它被声明为独占。

你可以在队列指南中了解有关exclusive标志和其他队列属性的更多信息。

绑定

在这里插入图片描述
我们已经创建了一个扇出交换器和一个队列。现在我们需要告诉交换器将消息发送到我们的队列。交换器和队列之间的关系称为绑定

err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,
)

从现在开始,logs交换器将会把消息添加到我们的队列中。

列出绑定关系
你猜也猜到了,我们可以使用下面的命令列出绑定关系

rabbitmqctl list_bindings

完整示例

产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs交换器,而不是空的消息交换器。发送时,我们需要提供一个routingKey,但是对于fanout型交换器,它的值可以被忽略(传空字符串)。下面是emit_log.go脚本的代码:


import ("log""os""strings""github.com/streadway/amqp" )func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)} }func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body) }func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s }

如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。

如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。

receive_logs.go的代码:

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

如果要将日志保存到文件,只需打开控制台并输入:

 go run receive_logs.go > logs_from_rabbit.log

在这里插入图片描述
如果希望在屏幕上查看日志,请切换到一个新的终端并运行:

go run receive_logs.go

当然,要发出日志,请输入:

go run emit_log.go

在这里插入图片描述
在这里插入图片描述
使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后,你应该看到类似以下内容:

rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对结果的解释很简单:数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。

源自:https://www.rabbitmq.com/getstarted.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/81970.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

cloudstack management高可用

一、环境说明 CPU&#xff1a; kunpeng 920 操作系统&#xff1a;OpenEuler 22.03 IP角色192.168.157.20mysql192.168.157.21management-server 1192.168.157.22management-server 2192.168.157.30nginx 二、部署 基础环境准备参考【cloudstack测试环境搭建】 1、部署mysql&…

若依管理系统后端将 Mybatis 升级为 Mybatis-Plus

文章目录 说明流程增加依赖修改配置文件注释掉MybatisConfig里面的Bean 代码生成使用IDEA生成代码注意 Controller文件 说明 若依管理系统是一个非常完善的管理系统模板&#xff0c;里面含有代码生成的方法&#xff0c;可以帮助用户快速进行开发&#xff0c;但是项目使用的是m…

linux手动安装 golangci-lint-1.53.3-linux-386.rpm

首先还是 去下载对应的 rpm 包 https://github.com/golangci/golangci-lint/releases 然后上传到 服务器/usr/local 目录下 执行安装命令 sudo rpm -ivh golangci-lint-1.53.3-linux-386.rpm 查看版本 golangci-lint --version

一周 AIGC 丨苹果下架多款 AIGC 应用,阿里云开源通义千问 70 亿参数模型

多个 AIGC 应用在苹果应用商店下架&#xff0c;包含数据采集和使用不够规范等问题。阿里云开源通义千问 70 亿参数模型&#xff0c;包括通用模型 Qwen-7 B 和对话模型 Qwen-7 B-Chat。腾讯混元大模型开始应用内测&#xff0c;内部多个业务线接入测试。百度智能云“千帆大模型平…

Azure通过自动化账户实现对资源变更

Azure通过自动化账户实现对资源变更 创建一个自动化账户第一种方式 添加凭据&#xff08;有更改资源权限的账户&#xff0c;没有auth认证情况&#xff09;创建一个Runbook&#xff0c;测试修改 AnalysisServices 定价层设置定时任务&#xff1a;开始定时任务&#xff1a; 第二种…

Python爬虫(八)_Requests的使用

Requests&#xff1a;让HTTP服务人类 虽然Python的标准库中urllib2模块中已经包含了平常我们使用的大多数功能&#xff0c;但是它的API使用起来让人感觉不太好&#xff0c;而Requests自称"HTTP for Humans"&#xff0c;说明使用更简单方便。 Requests唯一的一个非转…

【移动机器人运动规划】03 —— 基于运动学、动力学约束的路径规划

文章目录 前言相关代码整理:相关文章&#xff1a; 介绍什么是kinodynamic&#xff1f;为什么需要kinodynamic&#xff1f;模型示例unicycle model&#xff08;独轮车模型&#xff09;differential model&#xff08;两轮差速模型&#xff09;Simplified car model (简化车辆模型…

【技巧】如何保护PowerPoint不被改动?

PPT&#xff0c;也就是PowerPoint&#xff0c;是很多小伙伴在工作生活中经常用到的图形演示文稿软件。 做好PPT后&#xff0c;担心自己不小心改动了或者不想他人随意更改&#xff0c;我们可以如何保护PPT呢&#xff1f;下面小编就来分享两个常用的方法&#xff1a; 1. 将PPT改…

win10笔记本显示器根据页面显示亮度自动调节亮度的问题

系统是win10企业版&#xff0c;针对这个问题查了很多种方法&#xff0c;比如&#xff1a; 1、控制面板->硬件和声音->电源选项->点击当前电源计划的更改计划设置->更改高级电源设置->显示->启用自适应亮度 但是我发现我的电源计划只有平衡这一种&#xff0c…

怎么把图片表格转换成word表格?几个步骤达成

在处理文档时&#xff0c;图片表格的转换是一个常见的需求。而手动输入表格是非常耗时的&#xff0c;因此&#xff0c;使用文本识别软件来自动转换图片表格可以大大提高工作效率。在本文中&#xff0c;我们将介绍如何使用OCR文字识别技术来将图片表格转换为Word表格。 OCR文字识…

【架构设计】高并发架构实战:从需求分析到系统设计

写在前面 很多软件工程师的职业规划是成为架构师&#xff0c;但是要成为架构师很多时候要求先有架构设计经验&#xff0c;而不做架构师又怎么会有架构设计经验呢&#xff1f;那么要如何获得架构设计经验呢&#xff1f; 1 高并发是什么 高并发是指系统在同一时间内处理的请求量…

tomcat上部署jpress

一.确保有jdk&#xff0c;tomcat和mysql环境 二.新建jpress数据库&#xff0c;新建jpress用户并赋予所有权限 三.将jpress的war上传到tomcat/apache-tomcat-8.5.70/webapps&#xff0c;具体根据你的实际tomcat安装路径为准&#xff0c;上传完成后他会自己解包 四.到浏览器完…

python优雅地爬虫

申明&#xff1a;仅用作学习用途&#xff0c;不提供任何的商业价值。 背景 我需要获得新闻&#xff0c;然后tts&#xff0c;在每天上班的路上可以听一下。具体的方案后期我也会做一次分享。先看我喜欢的万能的老路&#xff1a;获得html内容-> python的工具库解析&#xff0…

FreeRTOS(系统配置)

一、FreeRTOSConfig.h文件 FreeRTOS的系统配置文件为FreeRTOSConfig.h&#xff0c;在此配置文件中可完成FreeRTOS的裁剪与配置。 FreeRTOSConfig.h 根据正在构建的应用程序定制 FreeRTOS 内核。因此&#xff0c;它特定于应用程序&#xff0c;而不是 FreeRTOS&#xff0c;并且应…

容器——2.Collection 子接口之 List

文章目录 2.1. Arraylist 和 Vector 的区别?2.2. Arraylist 与 LinkedList 区别?2.2.1. 补充内容:双向链表和双向循环链表2.2.2. 补充内容:RandomAccess 接口 2.3 ArrayList 的扩容机制 2.1. Arraylist 和 Vector 的区别? ArrayList 是 List 的主要实现类&#xff0c;底层使…

并发编程2:如何进行对象共享?

目录 1、对象的可见性&#xff1a;Volatile 变量 2、发布和逸出 3、线程封闭&#xff1a;ThreadLocal 4、对象的不变性 5、安全发布 5.1 - 安全发布常用的模式 5.2 - 可变对象 5.3 - 安全地共享对象 同步在用于实现原子性或者确定 “临界区(Critical Section)” 的同时…

启动RocketMQ报错

说明&#xff1a;启动RocketMQ消费者时&#xff0c;报以下错误&#xff1a;java.lang.IllegalStateException&#xff1a;Failed to start RocketMQ push consumer. 解决&#xff1a;看下所有的监听器类&#xff0c;检查是不是有相同的消费者组名&#xff0c;注释掉其中一个即可…

踩坑 视觉SLAM 十四讲第二版 ch13 编译及运行问题

一、安装Geset 库 sudo apt-get install libgtest-dev cd /usr/src/gtest sudo mkdir build cd build sudo cmake .. //一定要以sudo的方式运行&#xff0c;否则没有写入权限 sudo make //这个也一样要以sudo的方式 sudo cp libgtest*.a /usr/local/lib //将生成…

使用 ESP32 Arduino 和机器学习实现WIFI室内定位

在这个 Arduino 机器学习项目中,我们将使用附近的 WiFi 接入点来定位我们所在的位置。为了使该项目正常运行,您需要一块配备 WiFi 的板,例如 ESP8266、ESP32 或 MKR WiFI 1010。 什么是室内定位? 我们都习惯了 GPS 定位,我们的设备将使用卫星来跟踪我们在地球上的位置。GP…

如何使用win10专业版系统自带远程桌面公司内网电脑,从而实现居家办公?

使用win10专业版自带远程桌面公司内网电脑 文章目录 使用win10专业版自带远程桌面公司内网电脑 在现代社会中&#xff0c;各类电子硬件已经遍布我们身边&#xff0c;除了应用在个人娱乐场景的消费类电子产品外&#xff0c;各项工作也离不开电脑的帮助&#xff0c;特别是涉及到数…