RabbitMQ教程:路由(Routing)(四)

文章目录

  • RabbitMQ教程:路由(Routing)(四)
    • 一、引言
    • 二、基本概念
      • 2.1 路由与绑定
      • 2.2 Direct交换机
      • 2.3 多绑定
      • 2.4 发送日志
      • 2.5 订阅
    • 三、整合代码
      • 3.1 EmitLogDirectApp.cs
      • 3.2 ReceiveLogsDirectApp.cs
      • 3.3 推送所有和接收error、warning级别日志
      • 3.4 推送和接收error级别日志
    • 四、结论

RabbitMQ教程:路由(Routing)(四)

一、引言

在之前的教程中,我们构建了一个简单的日志系统,该系统能够将日志消息广播给多个接收者。在本教程中,我们将扩展这个系统,增加一个功能:只订阅消息的一个子集。例如,我们可能只想将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

二、基本概念

2.1 路由与绑定

在之前的示例中,我们已经创建了绑定。绑定是交换机和队列之间的关系,可以简单地理解为:队列对来自这个交换机的消息感兴趣。

channel.QueueBind(queue: queueName,exchange: "logs",routingKey: string.Empty);

绑定可以带一个额外的routingKey参数,为了避免与BasicPublish参数混淆,我们将其称为binding key。下面是如何创建带有键的绑定:

channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: "black");

绑定键的含义取决于交换机类型。我们之前使用的fanout交换机,简单地忽略了它的值。

2.2 Direct交换机

我们的日志系统从上一个教程中广播所有消息给所有消费者。我们希望扩展这一点,允许根据消息的严重性过滤消息。例如,我们可能希望写入磁盘日志消息的脚本只接收关键错误,而不是浪费磁盘空间在警告或信息日志消息上。

我们之前使用的fanout交换机没有给我们提供太多灵活性——它只能进行无脑广播。

我们将改用direct交换机。direct交换机背后的路由算法很简单——消息会被路由到binding key与消息的routing key完全匹配的队列。
在这里插入图片描述

2.3 多绑定

将多个队列绑定到同一个绑定键是完全合法的。在我们的示例中,我们可以在XQ1之间添加一个绑定键为black的绑定。在这种情况下,direct交换机会表现得像fanout一样,将消息广播给所有匹配的队列。带有路由键black的消息将被传递给Q1Q2
在这里插入图片描述

2.4 发送日志

我们将为我们的日志系统使用这个模型。我们将消息发送到一个direct交换机,并提供日志严重性作为routing key。这样,接收脚本将能够选择它想要接收的严重性。

首先,我们需要创建一个交换机:

channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);

然后我们可以发送消息:

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",routingKey: severity,basicProperties: null,body: body);

为了简化,我们假设’severity’可以是infowarningerror中的一个。

2.5 订阅

接收消息将与上一个教程中的工作方式相同,唯一的区别是我们将为每个我们感兴趣的严重性创建一个新的绑定。

var queueName = channel.QueueDeclare().QueueName;foreach(var severity in args)
{channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: severity);
}

三、整合代码

3.1 EmitLogDirectApp.cs

using RabbitMQ.Client;
using System.Text;// 从外部传递循环次数,例如10或20
int loopCount = 10; // 可以根据需要修改循环次数
await SendLogsAsync(loopCount);// 等待用户按下回车键退出程序
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();//发送日志消息到RabbitMQ的direct类型的交换机
async Task SendLogsAsync(int loopCount)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();// 声明名为"direct_logs"的direct类型的交换机string ExchangeName = "direct_logs";await channel.ExchangeDeclareAsync(exchange: ExchangeName, type: ExchangeType.Direct);// 定义可能的严重性级别string[] severities = { "info", "warning", "error" };// 创建Random实例用于生成随机数Random random = new Random();// 循环发送指定次数的消息for (int i = 0; i < loopCount; i++){// 随机选择一个严重性级别string severity = severities[random.Next(severities.Length)];// 构建消息内容,包含循环次数string message = $"Iteration {i + 1} - Hello World!";// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 异步发布消息到交换机,使用严重性级别作为路由键await channel.BasicPublishAsync(exchange: ExchangeName, routingKey: severity, body: body);// 打印消息发送成功的信息Console.WriteLine($" [x] Sent '{severity}':'{message}'");}
}

3.2 ReceiveLogsDirectApp.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();// 声明名为"direct_logs"的direct类型的交换机
string ExchangeName = "direct_logs";
await channel.ExchangeDeclareAsync(exchange: ExchangeName, type: ExchangeType.Direct);// 声明一个由服务器命名的队列
var queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;// 设定感兴趣的日志级别
//string[] severities = { "info","warning", "error" };
//string[] severities = { "warning", "error" };
string[] severities = {  "error" };// 为每个感兴趣的日志级别创建绑定
foreach (string severity in severities)
{await channel.QueueBindAsync(queue: queueName, exchange: ExchangeName, routingKey: severity);
}Console.WriteLine(" [*] Waiting for messages."); // 提示信息,表示消费者正在等待消息// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{// 从接收到的消息中提取消息体并转换为字符串var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey; // 获取路由键,即消息的严重性级别Console.WriteLine($" [x] Received '{routingKey}':'{message}'"); // 打印接收到的消息return Task.CompletedTask; // 返回Task.CompletedTask以满足异步事件处理的签名要求
};// 开始消费指定队列的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit."); // 提示信息,等待用户按下回车键退出程序
Console.ReadLine();

在这里插入图片描述

3.3 推送所有和接收error、warning级别日志

配置EmitLogDirectApp.cs推送所有日志级别数据。
配置ReceiveLogsDirectApp接收error和warning级别数据。
在这里插入图片描述

3.4 推送和接收error级别日志

配置EmitLogDirectApp.cs推送所有日志级别数据。
配置ReceiveLogsDirectApp接收error级别数据。
在这里插入图片描述

四、结论

在本教程中,我们深入探讨了RabbitMQ路由模式的概念和实现。通过构建一个可以根据消息属性(如严重性级别)路由消息的日志系统,我们学习了如何创建direct类型的交换机,以及如何根据路由键将消息发送到特定的队列。以下是我们从本教程中获得的关键要点:

  1. 路由灵活性:通过使用direct交换机,我们实现了基于路由键的消息路由,这允许我们灵活地控制消息的流向,而不是简单地广播给所有订阅者。

  2. 消息过滤:我们可以根据消息的属性(如严重性级别)来过滤消息,确保只有相关的消费者接收到消息,从而节省资源并提高效率。

  3. 绑定键与路由键:我们学习了如何使用绑定键和路由键来控制消息的路由,使得消息可以根据特定的键值被路由到对应的队列。

  4. 多绑定:我们了解了如何将多个队列绑定到同一个绑定键,从而实现消息的多播分发。

  5. 实际应用:通过这个教程,我们掌握了如何在实际应用中实现消息的精细化控制,这对于构建复杂的事件驱动架构和微服务架构至关重要。

通过这些机制,我们能够建立一个既高效又灵活的路由系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/473121.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AntFlow:一款高效灵活的开源工作流引擎

AntFlow 是一款功能强大、设计优雅的开源工作流引擎&#xff0c;其灵感来源于钉钉的工作流设计理念&#xff0c;旨在为企业和开发者提供灵活、高效的工作流解决方案。AntFlow 支持复杂的业务流程管理&#xff0c;具有高度可定制性&#xff0c;且拥有现代化的前端设计&#xff0…

游戏引擎学习第13天

视频参考:https://www.bilibili.com/video/BV1QQUaYMEEz/ 改代码的地方尽量一张图说清楚吧,懒得浪费时间 game.h #pragma once #include <cmath> #include <cstdint> #include <malloc.h>#define internal static // 用于定义内翻译单元内部函数 #…

中仕公考怎么样?事业编面试不去有影响吗?

事业编考试笔试已经通过&#xff0c;但是面试不去参加会有影响吗&#xff1f; 1. 自动放弃面试资格&#xff1a;未能按时出席事业单位的面试将被视为主动放弃该岗位的竞争机会。 2. 个人信誉问题&#xff1a;面试作为招聘流程的关键步骤&#xff0c;无故缺席可能被解释为诚信…

ElasticSearch学习笔记二:使用Java客户端

一、前言 在上一篇文章中&#xff0c;我们对ES有了最基本的认识&#xff0c;本着实用为主的原则&#xff0c;我们先不学很深的东西&#xff0c;今天打算先学习一下ES的Java客户端如何使用。 二、创建项目 1、普通Maven项目 1、创建一个Maven项目 2、Pom文件 <dependenc…

使用 Grafana api 查询 Datasource 数据

一、使用grafana 的api 接口 官方API 二、生成Api key 点击 Administration -》Users and accss -》Service accounts 进入页面 点击Add service account 创建 service account 点击Add service account token 点击 Generate token , 就可以生成 api key 了 三、进入grafana…

机器学习-36-对ML的思考之机器学习研究的初衷及科学研究的期望

文章目录 1 机器学习最初的样子1.1 知识工程诞生(专家系统)1.2 知识工程高潮期1.3 专家系统的瓶颈(知识获取)1.4 机器学习研究的初衷2 科学研究对机器学习的期望2.1 面向科学研究的机器学习轮廓2.2 机器学习及其应用研讨会2.3 智能信息处理系列研讨会2.4 机器学习对科学研究的重…

深入List集合:ArrayList与LinkedList的底层逻辑与区别

目录 一、前言 二、基本概念 三、相同之处 四、不同之处 五、ArrayList 底层 六、LinkedList 底层 七、ArrayList 应用场景 八、LinkedList 应用场景 九、ArrayList和LinkedList高级话题 十、总结 一、前言 在Java集合的广阔舞台上&#xff0c;ArrayList与LinkedLis…

python实现十进制转换二进制,tkinter界面

目录 需求 效果 代码实现 代码解释 需求 python实现十进制转换二进制 效果 代码实现 import tkinter as tk from tkinter import messageboxdef convert_to_binary():try:# 获取输入框中的十进制数decimal_number int(entry.get())# 转换为二进制binary_number bin(de…

关于强化学习的一份介绍

在这篇文章中&#xff0c;我将介绍与强化学习有关的一些东西&#xff0c;具体包括相关概念、k-摇臂机、强化学习的种类等。 一、基本概念 所谓强化学习就是去学习&#xff1a;做什么才能使得数值化的收益信号最大化。学习者不会被告知应该采取什么动作&#xff0c;而是必须通…

js导入导出

前言: 后面将学习: Vue3ElementPlus 前置知识:前端三件套 HTML,CSS,JS 使用Vscode 本篇学习 这里先补充一个JavaScript的模块化的知识点 - 导入导出 JS提供的导入导出机制,可以实现按需导入. 我们之前是这样导入的 showMessage.js //简单的展示信息 function simpleMessage…

Web导出Excel表格

背景&#xff1a; 1. 后端主导实现 流程&#xff1a;前端调用到导出excel接口 -> 后端返回excel文件流 -> 浏览器会识别并自动下载 场景&#xff1a;大部分场景都有后端来做 2. 前端主导实现 流程&#xff1a;前端获取要导出的数据 -> 常规数据用插件处理成一个e…

【Linux】Ubuntu中muduo库的编译环境安装

Muduo is a multithreaded C network library based on the reactor pattern. muduo库的介绍就是&#xff1a;一个基于reactor反应堆模型的多线程C网络库。 muduo网络库是C语言开发的一个非常优秀的网络库&#xff0c;作者陈硕&#xff0c;muduo网络库在多线程环境下性能非常高…

IDEA leetcode插件代码模板配置,登录闪退解决

前言 最近换电脑&#xff0c;配置idea时和原来的模板格式不一样有点难受&#xff0c;记录一下自己用的模板&#xff0c;后期换电脑使用&#xff0c;大家也可以使用&#xff0c;有更好的地方可以分享给我~ IDEA leetcode插件代码模板配置,登录闪退解决 前言1 下载IDEA leetcode…

网络安全SQL初步注入2

六.报错注入 mysql函数 updatexml(1,xpath语法,0) xpath语法常用concat拼接 例如: concat(07e,(查询语句),07e) select table_name from information_schema.tables limit 0,1 七.宽字节注入(如果后台数据库的编码为GBK) url编码:为了防止提交的数据和url中的一些有特殊意…

【GeekBand】C++设计模式笔记11_Builder_构建器

1. “对象创建” 模式 通过 “对象创建” 模式绕开new&#xff0c;来避免对象创建&#xff08;new&#xff09;过程中所导致的紧耦合&#xff08;依赖具体类&#xff09;&#xff0c;从而支持对象创建的稳定。它是接口抽象之后的第一步工作。典型模式 Factory MethodAbstract …

JS学习日记(jQuery库)

前言 今天先更新jQuery库的介绍&#xff0c;它是一个用来帮助快速开发的工具 介绍 jQuery是一个快速&#xff0c;小型且功能丰富的JavaScript库&#xff0c;jQuery设计宗旨是“write less&#xff0c;do more”&#xff0c;即倡导写更少的代码&#xff0c;做更多的事&#xf…

排序算法(基础)大全

一、排序算法的作用&#xff1a; 排序算法的主要作用是将一组数据按照特定的顺序进行排列&#xff0c;使得数据更加有序和有组织。 1. 查找效率&#xff1a;通过将数据进行排序&#xff0c;可以提高查找算法的效率。在有序的数据中&#xff0c;可以使用更加高效的查找算法&…

动手学深度学习73 课程总结和进阶学习

1. 课程总结和进阶学习 https://c.d2l.ai/stanford-cs329p/ https://paperswithcode.com https://www.bilibili.com/video/BV1nA41157y4/?vd_sourceeb04c9a33e87ceba9c9a2e5f09752ef8 怎么建立知识库 2. QA 20 算法提取的特征和人的不一样&#xff0c;互补 21 很难预测未…

WebRTC视频 04 - 视频采集类 VideoCaptureDS 中篇

WebRTC视频 01 - 视频采集整体架构 WebRTC视频 02 - 视频采集类 VideoCaptureModule WebRTC视频 03 - 视频采集类 VideoCaptureDS 上篇 WebRTC视频 04 - 视频采集类 VideoCaptureDS 中篇&#xff08;本文&#xff09; WebRTC视频 05 - 视频采集类 VideoCaptureDS 下篇 一、前言…

【弱监督视频异常检测】2024-ESWA-基于扩散的弱监督视频异常检测常态预训练

2024-ESWA-Diffusion-based normality pre-training for weakly supervised video anomaly detection 基于扩散的弱监督视频异常检测常态预训练摘要1. 引言2. 相关工作3. 方法论3.1. 使用扩散自动编码器进行常态学习3.2. 全局-局部特征编码器3.2.1 局部块3.2.2 全局块3.2.3 协同…