RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

文章目录

  • RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
    • 一、引言
    • 二、简介
    • 三、准备工作
      • 3.1 说明
      • 3.2 生成项目
    • 四、实战
      • 4.1 交换机(Exchanges)
      • 4.2 临时队列(Temporary Queues)
      • 4.3 绑定(Bindings)
      • 4.4 整合代码
        • 发布程序
        • 订阅程序
      • 4.5 验证一:先广播后订阅
      • 4.6 验证二:先订阅后广播
    • 五、结论

RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

一、引言

在快节奏的软件开发世界中,我们经常面临需要将消息发送给多个接收者的场景。例如,在构建日志监控系统、实时通知系统等场景时,我们希望一个事件的发生能够被多个服务同时感知和处理。这时,发布/订阅模式(Publish/Subscribe)就显得尤为重要。在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现发布/订阅模式。

二、简介

在上一篇教程中,我们学习了如何使用RabbitMQ实现工作队列(Work Queues)。今天,我们将探索工作队列的进阶应用——发布/订阅模式,这是一种允许多个接收者(Subscribers)监听同一个消息通道,并在消息发布时接收通知的机制。发布/订阅模式的核心在于解耦消息的发送者(Publisher)和接收者(Subscribers),发送者不需要知道有哪些接收者,只需要将消息发送到一个交换机(Exchange),而接收者则订阅这个交换机来接收消息。

三、准备工作

3.1 说明

在本教程中,我们将使用RabbitMQ的.NET客户端来创建一个简单的发布/订阅系统。我们将创建一个名为logs的fanout类型的交换机,并将所有日志消息广播给所有订阅了该交换机的队列。

3.2 生成项目

首先,我们需要生成两个项目:

EmitLogApp:用于模拟日志消息的发布者。
ReceiveLogsApp:用于接收并打印日志消息的订阅者。
我们可以使用以下命令来创建这两个项目:

dotnet new console --name EmitLog
cd EmitLog
dotnet add package RabbitMQ.Client
cd ..
dotnet new console --name ReceiveLogs
cd ReceiveLogs
dotnet add package RabbitMQ.Client

这些命令创建了两个新的控制台应用程序,一个用于发送日志消息,另一个用于接收并打印日志消息。

四、实战

4.1 交换机(Exchanges)

在之前的教程中,我们直接将消息发送到队列。现在,我们需要引入交换机(Exchange)的概念。在RabbitMQ中,生产者从不直接向队列发送消息,而是发送到交换机,然后由交换机将消息推送到一个或多个队列。交换机的行为由交换机类型定义。

我们将创建一个名为logsfanout类型的交换机,它将广播所有接收到的消息给所有绑定到它的队列。
在这里插入图片描述

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

4.2 临时队列(Temporary Queues)

在我们的日志系统中,我们希望每个运行的接收者程序都能接收到所有日志消息。因此,我们不需要为队列指定名称,而是让服务器为我们生成一个随机名称。同时,我们希望在消费者断开连接后队列能自动删除。

在.NET客户端中,我们可以创建一个非持久性、独占的、自动删除的队列,并让服务器为我们生成一个名称:

var queueName = channel.QueueDeclare().QueueName;

4.3 绑定(Bindings)

我们已经创建了一个fanout交换机和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定(Binding)。

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: string.Empty);

在这里插入图片描述

4.4 整合代码

发布程序
using RabbitMQ.Client;
using System.Text;await PublishMessagesAsync(10);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{// 循环发送指定次数的消息for (int i = 1; i <= loopCount; i++){// 调用SendMessageToQueue方法发送消息,并包含当前迭代次数await SendMessageToQueue($"Iteration {i} - Hello World");// 这里可以添加延迟,如果需要的话// await Task.Delay(1000);}Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();//声明名为"logs"的fanout类型的交换机await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 异步发布消息到队列await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);Console.WriteLine($" [x] Sent {message}");
}
订阅程序
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhos
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();// 异步声明一个名为"logs"的fanout类型的交换机
// 交换机会将所有接收到的消息广播给所有绑定到它的队列
await channel.ExchangeDeclareAsync(exchange: "logs",type: ExchangeType.Fanout);// 声明一个由服务器命名的队列,这样每个消费者都会有一个唯一的队列
// 这使得我们可以有多个消费者同时接收消息,而不会相互干扰
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;// 将由服务器创建的队列绑定到"logs"交换机
// 这样,交换机就会将消息发送到这个队列
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
// 输出提示信息,表示消费者正在等待日志消息
Console.WriteLine(" [*] Waiting for logs.");// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);// 设置当消费者接收到消息时的事件处理程序
consumer.ReceivedAsync += (model, ea) =>
{// 从接收到的消息中提取消息体并转换为字符串byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 打印接收到的消息Console.WriteLine($" [x] {message}");// 返回Task.CompletedTask以满足异步事件处理的签名要求return Task.CompletedTask;
};
// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"logs"交换机绑定的队列中的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

4.5 验证一:先广播后订阅

运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe,即先发布广播;再运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe进行订阅广播。可以发现,没有接收到任何内容。原因是需要先启动订阅者(消费者),再启动广播(发布者/生产者)才可以接收到消息
在这里插入图片描述

4.6 验证二:先订阅后广播

先运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe
再运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe;可以发现每个消费者(订阅者)都收到了相同信息。
在这里插入图片描述

五、结论

在本教程中,我们深入探讨了RabbitMQ发布/订阅模式的概念和实现。通过构建一个简单的日志系统,我们学习了如何创建fanout类型的交换机,以及如何发送和接收消息。以下是我们从本教程中获得的关键要点:

  1. 解耦发送者和接收者:发布/订阅模式允许发送者和接收者之间没有直接的联系,发送者只需要将消息发送到交换机,而接收者则订阅交换机来接收消息。

  2. 消息广播fanout类型的交换机会将所有接收到的消息广播给所有绑定到它的队列,这对于日志系统、事件通知等场景非常有用。

  3. 临时队列:我们使用了临时队列来接收消息,这样每个订阅者都会有自己的队列,并且在订阅者断开连接后,队列会自动删除。

  4. 动态订阅:订阅者可以随时订阅或取消订阅交换机,这使得系统具有很高的灵活性和动态性。

通过这些机制,我们能够建立一个高效的发布/订阅系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。这些知识为我们在实际开发中实现复杂的事件驱动架构提供了坚实的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/474091.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

对PolyMarket的突袭

一天清晨六点&#xff0c;美国联邦调查局的探员冲进了纽约市的一间公寓。这间公寓的主人是26岁的Shane Copeland&#xff0c;一个有着凌乱头发的年轻人&#xff0c;也是一个加密货币狂热者。他运营着一个名为PolyMarket的网站——一个允许用户YZ全球事件结果的平台&#xff0c;…

DataStream编程模型之数据源、数据转换、数据输出

Flink之DataStream数据源、数据转换、数据输出&#xff08;scala&#xff09; 0.前言–数据源 在进行数据转换之前&#xff0c;需要进行数据读取。 数据读取分为4大部分&#xff1a; &#xff08;1&#xff09;内置数据源&#xff1b; 又分为文件数据源&#xff1b; socket…

Django5 2024全栈开发指南(三):数据库模型与ORM操作

目录 一、模型的定义二、数据迁移三、数据表关系四、数据表操作4.1 Shell工具4.2 数据新增4.3 数据修改4.4 数据删除4.5 数据查询4.6 多表查询4.7 执行SQL语句4.8 数据库事务 Django 对各种数据库提供了很好的支持&#xff0c;包括 PostgreSQL、MySQL、SQLite 和 Oracle&#x…

ASP.NET Core Webapi 返回数据的三种方式

ASP.NET Core为Web API控制器方法返回类型提供了如下几个选择&#xff1a; Specific type IActionResult ActionResult<T> 1. 返回指定类型&#xff08;Specific type&#xff09; 最简单的API会返回原生的或者复杂的数据类型&#xff08;比如&#xff0c;string 或者…

网关在能源物联网中扮演了什么角色?

随着通信、物联网、云平台等技术的飞速发展&#xff0c;越来越多能源用户希望借助先进的管理手段&#xff0c;对能源进行分布式监测、集中管理&#xff0c;构建能源物联网。准确的分布式监测和集中管理有助于制定更科学合理的节能减排计划。企业或能源使用单位可以依据能源物联…

【快速入门】前端御三家:HTML、CSS和JS

HTML HTML&#xff0c;超文本标记语言&#xff0c;可以理解成骨架&#xff0c;是一个基础的东西。 一.基础结构 如图所示&#xff1a; 二.常见标签 1.标题标签 在页面上定义标题性的内容 <h1>一级标题</h1> <h2>二级标题</h2> <h3>三级标…

WebSocket实战,后台修改订单状态,前台实现数据变更,提供前端和后端多种语言

案例场景&#xff1a; 在实际的后台中需要变更某个订单的状态&#xff0c;在官网中不刷新页面&#xff0c;可以自动更新状态 在前端页面实现订单状态的实时更新&#xff08;不刷新页面&#xff09;&#xff0c;可以通过 WebSocket 的方式与后台保持通信&#xff0c;监听订单状态…

Django5 2024全栈开发指南(二):Django项目配置详解

目录 一、基本配置信息二、资源文件配置2.1 资源路由——STATIC_URL2.2 资源集合——STATICFILES_DIRS2.3 资源部署——STATIC_ROOT2.2.4 媒体资源——MEDIA 三、模板配置四、数据库配置4.1 mysqlclient连接MySQL4.2 pymysql连接MySQL4.3 多个数据库的连接方式4.4 使用配置文件…

近几年新笔记本重装系统方法及一些注意事项

新笔记本怎么重装系统&#xff1f; 近几年的新笔记本默认开启了raid on模式或vmd选项&#xff0c;安装过程中会遇到问题&#xff0c;新笔记本电脑重装自带的系统建议采用u盘方式安装&#xff0c;默认新笔记本有bitlocker加密机制&#xff0c;如果采用一键重装系统或硬盘方式安装…

黑马智数Day10

项目背景说明 后台管理部分使用的技术栈是Vue2&#xff0c;前台可视化部分使用的技术栈是Vue3 前台可视化项目不是独立存在&#xff0c;而是和后台管理项目共享同一个登录页面 微前端的好处 微前端是一种前端架构模式&#xff0c;它将大型单体应用程序分解为小的、松散耦合的…

Visual Studio 圈复杂度评估

VisualStudio自带的有工具 之后就可以看到分析结果

prop校验,prop和data区别

prop:组件上注册的一些自定义属性 prop作用&#xff1a;向子组件传递数据 特点&#xff1a; 可以传递任意数量&#xff0c;任意类型的prop 父组件 &#xff08;一个个地传递比较麻烦&#xff0c;可以直接打包成一个对象传过去&#xff0c;然后通过点属性接收&#xff09; <t…

ubuntu显示管理器_显示导航栏

ubuntu文件管理器_显示导航栏 一、原始状态&#xff1a; 二、显示导航栏状态&#xff1a; 三、原始状态--->导航栏状态: 1、打开dconf编辑器&#xff0c;直接在搜索栏搜索 dconf-editor ------如果没有安装&#xff0c;直接按流程安装即可。 2、进入目录&#xff1a;org …

跨平台WPF框架Avalonia教程 一

安装 安装 Avalonia UI 模板​ 开始使用 Avalonia 的最佳方式是使用模板创建一个应用程序。 要安装 Avalonia 模板&#xff0c;请运行以下命令&#xff1a; dotnet new install Avalonia.Templates 备注 对于 .NET 6.0 及更早版本&#xff0c;请将 install 替换为 --inst…

UE5 材质里面画圆锯齿严重的问题

直接这么画圆会带来锯齿&#xff0c;我们对锯齿位置进行模糊 可以用smoothstep&#xff0c;做值的平滑过渡&#xff08;虽然不是模糊&#xff0c;但是类似&#xff09;

【MySql】实验十六 综合练习:图书管理系统数据库结构

文章目录 创建图书管理系统数据库结构一、创建数据表1.1 book表1.2 reader表1.3 borrow表 二、插入示例数据2.1 向book表插入数据2.2 向reader表插入数据2.3 向borrow表插入数据 三、查询操作3.1 根据语义为借书表borrow的bno列和 rno列建立外键3.2 查询张小海编写的“数据库原…

QT QLabel双击事件

新建类&#xff1a; DoubleClickLabel .h #pragma once#include <QLabel>class DoubleClickLabel : public QLabel {Q_OBJECTpublic:DoubleClickLabel(QWidget *parent);~DoubleClickLabel(); signals:void doubleClicked();protected: //这里重写双击事件virtual v…

Vue3中实现插槽使用

目录 一、前言 二、插槽类型 三、示例 四、插槽的分类实现 1. 基本插槽 2. 命名插槽 3. 默认插槽内容 4. 作用域插槽&#xff08;Scoped Slots&#xff09; 5. 多插槽与具名插槽组合 一、前言 在 Vue 3 中&#xff0c;插槽&#xff08;Slot&#xff09;用于实现组件的内…

【学习笔记】科学计算

[pytorch 加速] CPU传输 & GPU计算的并行&#xff08;pin_memory&#xff0c;non_blocking&#xff09; https://www.bilibili.com/video/BV15Xxve1EtZ from IPython.display import Image import os os.environ[http_proxy] http://127.0.0.1:7890 os.environ[https_pr…

2、计算机网络七层封包和解包的过程

计算机网络osi七层模型 1、网络模型总体预览2、数据链路层4、传输层5.应用层 1、网络模型总体预览 图片均来源B站&#xff1a;网络安全收藏家&#xff0c;没有本人作图 2、数据链路层 案例描述&#xff1a;主机A发出一条信息&#xff0c;到路由器A&#xff0c;这里封装目标MAC…