RabbitMQ 客户端 连接、发送、接收处理消息

RabbitMQ 客户端 连接、发送、接收处理消息

一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样

RabbitMQ 服务,不是像其他服务器一样,负责逻辑处理,然后转发给客户端
而是所有客户端想要向 RabbitMQ服务发送消息,

第一步:创建一个链接 RabbitMQ 服务的连接

需要传入 RabbitMQ服务地址、用户名、密码,然后在连接代码中传入一个 queue 的字符串作为 标志
连接成功后,RabbitMQ服务上就可以看到这个链接了
如下图,可以看到有一个 Name = queueL1 的连接,后边有链接状态、消息数
Ready 和 Total 都是 0
在这里插入图片描述

向 RabbitMQ 发送消息的:

(1) 如果没有建立连接,执行第一步,建立一个链接
(2) 通过 发送消息接口向 RabbitMQ 服务 发消息
(3) RabbitMQ 服务接收到消息,只是按照连接的 queue 分别把消息放在自己名字的 queue 下, RabbitMQ 服务只是存着客户端发送的消息,服务什么都不处理

向 RabbitMQ 服务发送几条消息
下图可以看到 queueL1 的队列已经接收了 5 条消息,这五条消息如果没有客户端接收处理,就一直在这存着
在这里插入图片描述

接收 RabbitMQ 服务消息:

(1) 如果没有建立连接,执行第一步,建立一个链接
(2) 注册接收消息接口,在 RabbitMQ 中叫 消费消息,可以标记消费消息后是否将 RabbitMQ 的数据删除
(3) 如果 RabbitMQ 服务收到消息,就转发给 注册接收消息接口的 连接,如果接收的连接标记了 AutoDelete,那么发送给客户端后,RabbitMQ 就会将消息从消息队列中删除

注册接收消息,我的客户端就会收到 RabbitMQ 发送过来的消息,消息中包含发送上来的消息内容,还有发送消息的 queue 名字

此时再看,就会发现 Ready 和 Total 又变成 0 了
在这里插入图片描述

为什么上面讲解中将 接收 RabbitMQ 服务消息、向 RabbitMQ 发送消息的 分开说
是因为 RabbitMQ 发送消息就仅仅是发消息,发送完就不管了
而 RabbitMQ 的消费消息(接收消息) 也仅仅是接收消息,它不管是谁发的消息,只要是发送的 RabbitMQ 服务的消息,它都能接收,

(3.1) 比如我创建了 一个 连接,queue名为 xxxA,
它发送了消息 “Hello World”,
xxxA 连接自己又注册了 消费消息(接收消息),那么xxxA 自己就会接收到 xxxA 队列发送的 Hello World 信息

(3.2) 我又创建了 新的连接,queue 名还是 xxxA
那么新的连接也可以收到 (3.1) 发的 消息 HelloWorld

二. 客户端连接服务器

  1. 实例化一个 连接 RabbitMQ 服务的客户端连接
    实例化需要传入 服务地址、端口、用户名、密码
using RabbitMQ.Client;
using System;
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;
using UnityEngine;
using System.Text;
using RabbitMQ.Client.Events;namespace Network
{/// <summary>/// RabbitMQ 创建一个链接/// 供 RabbitMQReceive、RabbitMQSend 使用/// </summary>public class RabbitMQConnect{private RabbitMQConnectData connectData;private ConnectionFactory factory;private IChannel channel;private IConnection connection;private NetWorkState state;private Action<string, byte[]> receivedCallBack;private const int TimeOut = 10; //连接超时 10 秒private bool dispose = false;public RabbitMQConnect(RabbitMQConnectData connectData){this.connectData = connectData;State = NetWorkState.Disconnected;dispose = false;}public string Queue{get { return connectData.queue; }}public NetWorkState State{get { return state; }private set { state = value; }}public IChannel Channel{get { return channel; }}/// <summary>/// 网络是否连接中/// </summary>public bool IsConnect{get{if (null == channel || null == connection){return false;}return channel.IsOpen && connection.IsOpen;}}public async Task StartConnect(){if (State == NetWorkState.Connecting){await Task.Delay(TimeOut * 1000);}if (State == NetWorkState.Connected){return;}// 创建连接工厂// 如果初始化失败,不会启动恢复连接//factory = new ConnectionFactory()//{//    HostName = hostName, // 替换为你的 RabbitMQ 服务器地址//    UserName = userName, // 替换为用户名//    Password = password  // 替换为密码//};string url = $"amqp://{connectData.userName}:{connectData.password}@{connectData.hostName}:{connectData.port}"; //string.Format("amqp://unity:unity@139.9.137.14:5672");factory = new ConnectionFactory(){Uri = new Uri(url)};// 自动恢复连接factory.AutomaticRecoveryEnabled = true;// 如果由于异常导致恢复失败(例如RabbitMQ节点仍然不可达),它将在固定的时间间隔(默认为5秒)后重试。间隔时间可配置如下// Connection.CloseAsync 关闭的连接不会启动自动恢复连接factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);factory.TopologyRecoveryEnabled = true;while (State != NetWorkState.Connected){if (!dispose){await Connect();}}await Task.Delay(1);if (!string.IsNullOrEmpty(connectData.receiveQueeu)){await BasicConsumer();}}private async Task Connect(){try{State = NetWorkState.Connecting;// 异步创建连接connection = await factory.CreateConnectionAsync();channel = await connection.CreateChannelAsync();// 声明队列QueueDeclareOk queueDeclareOk = await channel.QueueDeclareAsync(queue: connectData.queue,durable: false,exclusive: false,autoDelete: false,arguments: null);/*autoDelete = true:没有消费者时队列自动删除,通常用于临时或一次性的队列。autoDelete = false:队列不会自动删除,通常用于需要长期存在的队列。选择是否设置 autoDelete = true 取决于你是否希望队列在没有消费者时自动删除。如果你的队列是临时的、一次性的,那么使用 autoDelete = true 会更适合;如果队列是长期需要使用的,则设置为 autoDelete = false 会更为合适 */State = NetWorkState.Connected;// 设置消费者的预取计数为10,允许同时处理10条消息await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false);Debug.Log("RabbitMQ Connect Success");GameNotifycation.GetInstance().Notify<NetWorkState>(ENUM_MSG_TYPE.MSG_NETWORK_STATE_CHANGE, State);}catch (BrokerUnreachableException e){await Task.Delay(5000);State = NetWorkState.ConnectFailed;Debug.LogError("ConnectError:" + e.ToString());// apply retry logic}await Task.Delay(1);}/// <summary>/// 发送消息/// exchange:   要发布消息的交换机名称。/// routingKey: 路由键,决定消息应该路由到哪个队列。/// mandatory:  如果设置为 true,RabbitMQ 会确保消息至少被投递到一个队列。如果没有队列接收该消息,RabbitMQ 会触发 basic.return。/// immediate:  如果设置为 true,RabbitMQ 会在消息无法立即被消费时丢弃消息。/// basicProperties: 消息的属性,类型为 IBasicProperties。这些属性可以设置消息的优先级、持久性等。/// body: 消息体的字节数组。/// /// BasicPublishAsync 方法 没有返回消息投递的结果。它仅仅表示“请求已经被成功发送到 RabbitMQ 的交换机”。如果发布操作成功,Task 会正常完成,不会抛出异常。你可以通过异常处理来捕获潜在的错误。/// </summary>/// <param name="msg"></param>public async Task SendAsync(string message){if (!IsConnect){UnityEngine.Debug.Log("Send not IsConnect");await StartConnect();}try{IChannel channel = Channel;var body = Encoding.UTF8.GetBytes(message);var props = new BasicProperties();props.ContentType = "text/plain";props.DeliveryMode = DeliveryModes.Transient;await channel.BasicPublishAsync(exchange: "",routingKey: Queue,mandatory: false,basicProperties: props,body: body).ConfigureAwait(false);//Debug.Log($"[x] Sent: Complete");}catch (Exception ex){UnityEngine.Debug.LogError($"Error publishing message: {ex.Message}");}}/// <summary>/// 设置接收消息回调/// </summary>/// <param name="receivedCallBack"></param>public void SetReceive(Action<string, byte[]> receivedCallBack){this.receivedCallBack = receivedCallBack;}/// <summary>/// 创建异步消费者/// </summary>/// <returns></returns>public async Task<string> BasicConsumer(){if (!IsConnect){await StartConnect();}var consumer = new AsyncEventingBasicConsumer(Channel);// 处理消息的异步回调逻辑consumer.ReceivedAsync += ReceivedAsync;// 开始消费string result = await Channel.BasicConsumeAsync(queue: connectData.receiveQueeu,  // 指定消费者要监听的队列名称autoAck: false,        // 决定是否自动确认消息。如果 true,消息在交付时会自动确认。如果 false,则需要手动调用 BasicAck 确认消息consumer: consumer);  // 指定消息的处理方式,通过实现 IBasicConsumer 接口来定义如何处理从队列中接收到的消息/*autoAck = true:消息一旦传递给消费者,RabbitMQ 就认为该消息被成功处理,无需再确认。autoAck = false:消费者需要显式地调用 channel.BasicAck 来确认消息的处理,通常用于消息处理失败时能够重试消息。*/return result;}/// <summary>/// 异步接收消息/// 如果 Channel.BasicConsumeAsync 方法中 autoAck 设置为 true,那么 channel.BasicAckAsync 调用是不允许的/// 想在  Channel.BasicConsumeAsync 消费消息收到消息时 调用 channel.BasicAckAsync,必须将 Channel.BasicConsumeAsync 方法中 autoAck 设置为 false/// </summary>/// <param name="sender"></param>/// <param name="eventArgs"></param>/// <returns></returns>private async Task ReceivedAsync(object sender, BasicDeliverEventArgs eventArgs){try{//Debug.Log("ReceivedAsync");AsyncEventingBasicConsumer consumer = sender as AsyncEventingBasicConsumer;string queue = consumer.Channel.CurrentQueue;var body = eventArgs.Body.ToArray();receivedCallBack?.Invoke(queue, body);// 模拟异步任务处理(比如访问数据库或调用其他服务)await channel.BasicAckAsync(eventArgs.DeliveryTag, false);}catch (Exception ex){Debug.LogError($"Error processing message: {ex.Message}");// 如果处理失败,可以拒绝并重新入队(可选)//await Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: true);}await Task.Delay(1);}/// <summary>/// 关闭连接/// </summary>public async void Dispose(){dispose = true;// 先关闭通道、再关闭连接if (channel != null)  // 通道关闭{await channel.CloseAsync();channel = null;}if (connection != null)  // 连接关闭{UnityEngine.Debug.Log("ConnectDispose");await connection.CloseAsync();connection = null;}await Task.Delay(1);}}
}

RibbitMQ 服务通过 queue 来区分每一个连接的客户端,代码部分如下

QueueDeclareOk queueDeclareOk = await channel.QueueDeclareAsync(queue: queue,durable: false,exclusive: false,autoDelete: false,arguments: null);
  1. 客户端实例

  2. 测试用例

using UnityEngine;
using Network;
using LitJson;
using System.Text;
using System.Collections;
using System.Collections.Generic;public class RabbitMQDemo : MonoBehaviour
{// 客户端private RabbitMQConnect rabbitMQConnect;private Queue<string> receiveQueue = new Queue<string>();void Start(){RabbitMQConnectData connectData = new RabbitMQConnectData();connectData.queue = "TestA";connectData.receiveQueeu = "TestA";connectData.hostName = "XXX.XXX.XXX.XXX";connectData.port = "5672";connectData.userName = "unity";connectData.password = "unity";// 实例化rabbitMQConnect = new RabbitMQConnect(connectData);rabbitMQConnect.SetReceive(Receive);StartConnect();}private async void StartConnect(){await rabbitMQConnect.StartConnect();}private async void Send(string meg){await rabbitMQConnect.SendAsync(meg);}private void Receive(string queue, byte[] byteData){var json = Encoding.UTF8.GetString(byteData);UnityEngine.Debug.Log($"[x] ReceivedAsync: {json}");receiveQueue.Enqueue(netWorkData);}private int number = 1000;// Update is called once per framevoid Update(){if (Input.GetKeyDown(KeyCode.A)){++number;Send("Hello RabbitMQ:" + number);}DispatchMessage();}private void DispatchMessage(){if (receiveQueue.Count <= 0){return;}string json = receiveQueue.Dequeue();}private void OnDestroy(){Debug.LogError("OnDestroy");rabbitMQConnect.Dispose();}
}
/// <summary>/// 网络连接状态/// </summary>public enum NetWorkState{// init/// <summary>/// 关闭/断开连接/// </summary>Closed,// client/// <summary>/// 已经建立连接/// </summary>Connected,/// <summary>/// 正在请求连接/// </summary>Connecting,/// <summary>/// 连接失败/// </summary>ConnectFailed,// both/// <summary>/// 连接超时/// </summary>Timeout,/// <summary>/// 断开连接/// </summary>Disconnected,}

扩展
可以在 网页上 Overview 页面,找到 Ports and contexts 部分
可以看到每种协议对应的端口是不一样的
每种协议都有一种独立的连接方式
需要根据自己选择的协议拼接路径

比如 我上面代码使用的 http 方式

    string localHost = "localhost"; // ip如 xxx.xxx.xxx.xxxstring userName = "用户名";string password = "密码";// 创建连接工厂// 如果初始化失败,不会启动恢复连接factory = new ConnectionFactory(){HostName = hostName, // 替换为你的 RabbitMQ 服务器地址UserName = userName, // 替换为用户名Password = password  // 替换为密码};

amqp 协议连接方式如下

	string url = $"amqp://{userName}:{password}@{hostName}:{port}"; factory = new ConnectionFactory(){Uri = new Uri(url)};

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/484436.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MSSQL2022的一个错误:未在本地计算机上注册“Microsoft.ACE.OLEDB.16.0”提供程序

MSSQL2022导入Excel的一个错误&#xff1a;未在本地计算机上注册“Microsoft.ACE.OLEDB.16.0”提供程序 一、导入情况二、问题发现三、问题解决 最近在安装新版SQLServer SSMS 2022后&#xff0c;每次导入Excel都会出现错误提示&#xff1a;未在本地计算机上注册“Microsoft.AC…

GPT 1到4代的演进笔记

1. GPT-1 标题是 Improving Language Understanding by Generative Pre-Training. 发表于 2018.02, 比 bert(发布于 2018.10) 早了半年. 1.1 动机 困难:NLU 任务是多样的, 有 {textual entailment, question answering, semantic similarity assessment, document classifica…

【06】 MySQL 数据表的约束都有哪些?理解与实践

文章目录 1. 主键约束&#xff08;Primary Key&#xff09;2. 外键约束&#xff08;Foreign Key&#xff09;3. 唯一约束&#xff08;Unique&#xff09;4. 非空约束&#xff08;Not Null&#xff09;5. 默认值约束&#xff08;Default&#xff09;6. 检查约束&#xff08;Chec…

element-ui的下拉框报错:Cannot read properties of null (reading ‘disabled‘)

在使用element下拉框时&#xff0c;下拉框option必须点击输入框才关闭&#xff0c;点击其他地方报错&#xff1a;Cannot read properties of null (reading disabled) 造成报错原因&#xff1a;项目中使用了el-dropdown组件&#xff0c;但是在el-dropdown里面没有定义el-dropdo…

工业—使用Flink处理Kafka中的数据_ChangeRecord1

使用 Flink 消费 Kafka 中 ChangeRecord 主题的数据,当某设备 30 秒状态连续为 “ 预警 ” ,输出预警 信息。当前预警信息输出后,最近30

丹摩征文活动 | AI创新之路,DAMODEL助你一臂之力GPU

目录 前言—— DAMODEL&#xff08;丹摩智算&#xff09; 算力服务 直观的感受算力提供商的强大​ 平台功能介绍​ 镜像选择 云磁盘创建 总结 前言—— 只需轻点鼠标,开发者便可拥有属于自己的AI计算王国 - 从丰富的GPU实例选择,到高性能的云磁盘,再到预配置的深度学习…

Java程序调kubernetes(k8s1.30.7)core API简单示例,并解决403权限验证问题,即何进行进行权限授权以及验证

简单记录问题 一、问题描述 希望通过Java程序使用Kubernetes提供的工具包实现对Kubernetes集群core API的调用&#xff0c;但是在高版本上遇见权限验证问题4xx。 <dependency><groupId>io.kubernetes</groupId><artifactId>client-java</artifact…

微信小程序wx.showShareMenu配置全局分享功能

在app.js文件中配置如下即可&#xff1a; onLaunch() {//开启分享功能this.overShare()},/*** 开启朋友圈分享功能* 监听路由切换/自动执行*/overShare() {wx.onAppRoute((res) > {// console.log(route, res)let pages getCurrentPages()let view pages[pages.length - …

生信软件开发1 - 设计一个简单的Windwos风格的GUI报告软件

1. 安装基础库 使用Windows 11标题样式和主题自定义UI窗口库pywinstyles&#xff08;github: https://github.com/Akascape/py-window-styles&#xff09;&#xff0c;结合python自带tkinter库设计一个报告GUI软件。 pip install pywinstyles2. 设计一个简单的Windwos风格的G…

【PlantUML系列】类图(一)

目录 一、类 二、接口 三、抽象类 四、泛型类 五、类之间的关系 六、添加注释 七、包图 八、皮肤参数 一、类 使用class关键字定义类&#xff0c;类名后跟大括号&#xff0c;声明类的属性和方法。 属性&#xff1a;格式为{visibility} attributeName : AttributeType…

复现SMPLify-X: Ubuntu22.04, Cuda-11.3, GPU=3090Ti

Env: 3090Ti CUDA 最低支持版本需要>cuda-11.1 Ubuntu 22.04 Installation: Installing CUDA11.3 wget https://developer.download.nvidia.com/compute/cuda/11.3.0/local_installers/cuda_11.3.0_465.19.01_linux.run sudo sh cuda_11.3.0_465.19.01_linux.run …

数据库实验7

数据库实验7 0 建立登录名用SSMS工具建立登录名用T-SQL语句建立登录名 1 删除登录名用SSMS工具实现用T-SQL语句实现 2 建立数据库用户用SSMS工具实现用T-SQL语句实现 3 删除数据库用户用SSMS工具实现用T-SQL语句实现 4 管理用户权限用SSMS工具实现用T-SQL语句实现 5 建立用户定…

4.5 TCP 报文段的首部格式

欢迎大家订阅【计算机网络】学习专栏&#xff0c;开启你的计算机网络学习之旅&#xff01; 文章目录 前言1 TCP 报文段的基本结构2 固定部分2.1 源端口与目的端口2.2 序号2.3 确认号2.4 数据偏移2.5 保留字段2.6 控制位2.7 窗口2.8 检验和2.9 紧急指针 3 可变部分3.1 选项3.2 填…

青听云音乐项目总结

前言 先来介绍一下我的项目&#xff1a;青听云音乐&#xff0c;几乎完全参考网易云音乐网页版实现&#xff0c;使用的技术栈有SpringBoot、SpringCloud、Mybatis、Mybatis-Plus、Redis、Elasticsearch、RabbitMQ、Docker&#xff0c;也包括一些小技术的使用&#xff0c;比如 j…

openssl的运用

一、概述 Opssl是一个用于TLS/SSL协议的工具包&#xff0c;也是一个通用密码库。 包含了国密sm2 sm3 sm4&#xff0c;包含了对称加密&#xff0c;非对称加密&#xff0c;单项散列&#xff0c;伪随机、签名&#xff0c;密码交换&#xff0c;证书等一些算法库。 为了深层次的学习…

房屋租赁系统源码 SpringBoot + Vue 实现全功能解析

这是一套使用 SpringBoot 与 Vue 开发的房屋租赁系统源码,站长分析过这套源码,推测其原始版本可能是一个员工管理系统,经过二次开发后,功能被拓展和调整,现已完全适用于房屋租赁业务。 该系统功能完善,涉及房屋销售、租赁管理等。站长在测试部署过程中,发现源码结构清晰…

工业齐套管理虚拟现实仿真模拟软件

工业齐套管理虚拟现实仿真模拟软件是与法国最大的汽车制造商合作开发的一款虚拟现实仿真模拟软件&#xff0c;借助身临其境的虚拟现实环境&#xff0c;无需停止生产线&#xff0c;即可模拟仓库和提货区域。 工业齐套管理虚拟现实仿真模拟软件不仅适用于汽车工业&#xff0c;安全…

基于python爬虫的智慧人才数据分析系统

文末获取源码和万字论文&#xff0c;制作不易&#xff0c;感谢点赞支持。 废话不多说&#xff0c;先看效果图 更多效果图可私信我获取 源码分享 import os import sysdef main():"""Run administrative tasks."""os.environ.setdefault(DJANGO…

Android 分词的两种方式

前言&#xff1a; 本文分别介绍了原生和三方(Jieba)两种分词方式的使用和注意事项 1、安卓原生BreakIterator分词 比较简单&#xff0c;但是效果不太行 /*** 功能&#xff1a;原生分词* 参数&#xff1a;text&#xff1a;需要分词的语句* 返回值&#xff1a;return&#xf…

TCP Analysis Flags 之 TCP Spurious Retransmission

前言 默认情况下&#xff0c;Wireshark 的 TCP 解析器会跟踪每个 TCP 会话的状态&#xff0c;并在检测到问题或潜在问题时提供额外的信息。在第一次打开捕获文件时&#xff0c;会对每个 TCP 数据包进行一次分析&#xff0c;数据包按照它们在数据包列表中出现的顺序进行处理。可…