Unity中使用Best MQTT v3插件实现MQTT通信功能,进行模块拆分

概述

本文将详细介绍如何在Unity中使用Best MQTT v3插件实现MQTT通信功能。将通过模块化设计实现配置加载、连接管理、订阅/发布等功能,并提供完整的代码实现。
重连说明:当意外断开连接的时候,会进行重新连接,重连上之后会再次订阅MqttSubscriptionService 中callbacks 中的主题,并添加回调。重连的次数保存再:MqttConnectionService的MaxReconnectAttempts
使用的MQTT版本是:3.1.1 在配置文件中是 SupportedProtocolVersions =0,SupportedProtocolVersions =1 则使用5.0版本,版本的判断保存在:MqttConnectionService的GetProtocolVersion。

环境准备

  1. 插件安装
插件名称功能描述下载地址备注
Best MQTT v3MQTT通信核心插件Unity Asset Store提供MQTT协议的完整实现
LitJsonJSON解析库LitJson官网 或使用BestHTTP自带的LitJson轻量高效的JSON解析工具
DOTween动画与延迟工具Unity Asset Store用于实现重连延迟效果
UniTask异步编程工具GitHub提供高性能的异步任务支持
  1. 配置文件
    StreamingAssets/SystemConfig路径下创建MQTTConfig.json
{"MqttClientHost": "broker.emqx.io","MqttClientPort_TCP": 1883,"MqttClientPort_WebSocket": 8083,"MqttClientUserName": "","MqttClientPassword": "","SupportedProtocolVersions": 0
}

核心模块介绍

MQTT管理器(MqttManager)

public class MqttManager : SingletonMono<MqttManager>
{private async void Start(){var config = await _configLoader.LoadConfigAsync();if (config != null){_connectionService = new MqttConnectionService();_connectionService.Initialize(config);_connectionService.Connect();_connectionService._client.OnConnected += HandleOnConnected;}}private void HandleOnConnected(MQTTClient mqttclient){_subscriptionService = new MqttSubscriptionService(_connectionService);_mqttPublishService = new MqttPublishService(_connectionService);}}

功能说明: 在InitializeServices 初始化各个模块,然后再start中启动连接。

配置加载器(MqttConfigLoader)

         public async UniTask<MQTTClientConfig> LoadConfigAsync(){var path = new Uri(Path.Combine(Application.streamingAssetsPath, "SystemConfig","MQTTConfig.json"));using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10))){try{var json = await ReadFileAsyncByWebRequest(path.ToString(), cts.Token);var jsonData = JsonMapper.ToObject<MQTTClientConfig>(json);Debug.Log("MQTT 配置加载成功");return jsonData;}catch (Exception e){Debug.LogError($"读取配置文件失败: {e.Message}");return null; // 或者根据需求抛出异常}}}

功能说明:
跨平台路径处理
异步加载避免卡顿,处理了webgl以及windows下的文件加载
LitJson 对配置文件进行解析

连接服务(MqttConnectionService)

        public void Connect(){ var options = new ConnectionOptionsBuilder()
#if UNITY_WEBGL && !UNITY_EDITOR.WithWebSocket(_config.MqttClientHost, _config.MqttClientPort_WebSocket)
#else.WithTCP(_config.MqttClientHost, _config.MqttClientPort_TCP)
#endif.WithProtocolVersion(GetProtocolVersion()).Build();_client = new MQTTClient(options);SetupEventHandlers();_client.BeginConnect(ConfigureConnection);}

功能说明:
自动识别平台选择协议(WebGL使用WebSocket)
随机ClientID避免冲突
重连机制(5次指数退避重试)

订阅服务(MqttSubscriptionService)

        /// <summary>/// 订阅主题/// </summary>/// <param name="_topic">主题</param>/// <param name="_callback">回调</param>public void Subscribe(string _topic, Action<string> _callback){if (string.IsNullOrEmpty(_topic)){Debug.Log("Topic cannot be null or empty." + nameof(_topic));}if (!connection.IsConnected){Debug.Log($"Cannot subscribe to topic '{_topic}' because the _connection is not established.");return;}if (callbacks.ContainsKey(_topic)){callbacks.Remove(_topic);}Unsubscribe(_topic);callbacks.Add(_topic, _callback);connection._client.CreateSubscriptionBuilder(_topic).WithMessageCallback(HandleMessageReceived).BeginSubscribe();Debug.Log($"Subscribed to topic :{_topic}.");}

功能说明:
自动重订阅机制
线程安全的消息处理

消息发布服务(MqttPublishService)

         /// <summary>/// 发布消息/// </summary>/// <param name="_data"></param>/// <param name="_topic"></param>private void Publish(string _topic, string _data){if (mqttPublishClient.State != ClientStates.Connected){Debug.Log("连接已经断开,不发送消息");return;}mqttPublishClient.CreateApplicationMessageBuilder(_topic).WithPayload(_data).WithQoS(Best.MQTT.Packets.QoSLevels.AtMostOnceDelivery).WithContentType("text/plain; charset=UTF-8").BeginPublish();}

全部代码

MqttManager

 
using Best.MQTT;
using Manager;
using System;
namespace MQTT
{/// <summary>///     MQTT管理器/// </summary>public class MqttManager : SingletonMono<MqttManager>{// 模块化组件private MqttConfigLoader _configLoader;private MqttConnectionService _connectionService;public MqttSubscriptionService _subscriptionService;public MqttPublishService _mqttPublishService;//mqtt发布服务protected override void Init(){base.Init();}private async void Start(){var config = await _configLoader.LoadConfigAsync();if (config != null){_connectionService = new MqttConnectionService();_connectionService.Initialize(config);_connectionService.Connect();_connectionService._client.OnConnected += HandleOnConnected;}}private void HandleOnConnected(MQTTClient mqttclient){_subscriptionService = new MqttSubscriptionService(_connectionService);_mqttPublishService = new MqttPublishService(_connectionService);}void OnDestroy(){_connectionService.isRightDisConnection = true;_connectionService?.Disconnect();}internal void InitCommpoent(){}}
}

MqttConfigLoader

using Best.HTTP.JSON.LitJson;
using Cysharp.Threading.Tasks;
using System;
using System.IO;
using System.Threading;
using UnityEngine;
using UnityEngine.Networking;namespace MQTT
{/// <summary>/// MQTT 配置加载/// </summary>public class MqttConfigLoader{/// <summary>///     读取mqtt的配置文件/// </summary>/// <returns>读取到的mqtt配置文件</returns>public async UniTask<MQTTClientConfig> LoadConfigAsync(){var path = new Uri(Path.Combine(Application.streamingAssetsPath, "SystemConfig","MQTTConfig.json"));using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10))){try{var json = await ReadFileAsyncByWebRequest(path.ToString(), cts.Token);var jsonData = JsonMapper.ToObject<MQTTClientConfig>(json);Debug.Log("MQTT 配置加载成功");return jsonData;}catch (Exception e){Debug.LogError($"读取配置文件失败: {e.Message}");return null; // 或者根据需求抛出异常}}}/// <summary>///     通过webrequest 读取文件/// </summary>/// <param name="path">路径</param>/// <param name="cancellationToken"></param>/// <returns></returns>/// <exception cref="Exception"></exception>private async UniTask<string> ReadFileAsyncByWebRequest(string path, CancellationToken cancellationToken){using (var request = UnityWebRequest.Get(path)){var operation = request.SendWebRequest();while (!operation.isDone){await UniTask.Yield(PlayerLoopTiming.Update, cancellationToken);}if (request.result != UnityWebRequest.Result.Success){throw new Exception(request.error);}return request.downloadHandler.text;}}}
}

MqttConnectionService

using Best.MQTT;
using Best.MQTT.Packets.Builders;
using DG.Tweening;
using System;
using UnityEngine;namespace MQTT
{/// <summary>/// MQTT 连接服务(处理连接/断开/重连)/// </summary>public class MqttConnectionService{/// <summary>/// 是正常的断开链接吗?/// </summary>public bool isRightDisConnection = false;public MQTTClient _client;private MQTTClientConfig _config;private const int MaxReconnectAttempts = 5;private int _reconnectAttempts;public bool IsConnected => _client?.State == ClientStates.Connected;public void Initialize(MQTTClientConfig config){_config = config;}public MqttConnectionService() { }/// <summary>/// 链接/// </summary>public void Connect(){ var options = new ConnectionOptionsBuilder()
#if UNITY_WEBGL && !UNITY_EDITOR.WithWebSocket(_config.MqttClientHost, _config.MqttClientPort_WebSocket)
#else.WithTCP(_config.MqttClientHost, _config.MqttClientPort_TCP)
#endif.WithProtocolVersion(GetProtocolVersion()).Build();_client = new MQTTClient(options);SetupEventHandlers();_client.BeginConnect(ConfigureConnection);}/// <summary>/// 断开链接/// </summary>public void Disconnect(){isRightDisConnection = true;_client?.CreateDisconnectPacketBuilder().BeginDisconnect();_client = null;}private ConnectPacketBuilder ConfigureConnection(MQTTClient client, ConnectPacketBuilder builder){return builder.WithClientID(GenerateClientId()).WithCleanStart();}private void SetupEventHandlers(){_client.OnConnected += HandleConnected;_client.OnDisconnect += HandleDisconnected;_client.OnStateChanged += HandleStateChanged;}private void HandleStateChanged(MQTTClient client, ClientStates oldState, ClientStates newState){Debug.Log($"链接状态改变oldState:{oldState},newState:{newState}");}private void HandleConnected(MQTTClient client){Debug.Log("链接到mqtt服务器");isRightDisConnection = false;}private void HandleDisconnected(MQTTClient client, DisconnectReasonCodes code, string reason){Debug.Log("断开mqtt的链接"+ reason);if (isRightDisConnection == true){}else{TryReconnect();}}private void TryReconnect(){if (_reconnectAttempts >= MaxReconnectAttempts) return;DOTween.Sequence().AppendInterval(1f).AppendCallback(() =>{_reconnectAttempts++;Connect();});}private SupportedProtocolVersions GetProtocolVersion() =>_config.SupportedProtocolVersions == 0 ?SupportedProtocolVersions.MQTT_3_1_1 :SupportedProtocolVersions.MQTT_5_0;private string GenerateClientId() => Guid.NewGuid().ToString();}
}

MqttSubscriptionService

using Best.MQTT;
using System;
using System.Collections.Generic;
using System.Text;
using UnityEngine;namespace MQTT
{/// <summary>/// 订阅管理服务(处理订阅/取消订阅)/// </summary>public class MqttSubscriptionService{private readonly MqttConnectionService connection;private readonly Dictionary<string, Action<string>> callbacks = new();public MqttSubscriptionService(MqttConnectionService _connection){this.connection = _connection;this.connection._client.OnConnected += ReSubscribeWhenReconnecting;}/// <summary>/// 订阅主题/// </summary>/// <param name="_topic">主题</param>/// <param name="_callback">回调</param>public void Subscribe(string _topic, Action<string> _callback){if (string.IsNullOrEmpty(_topic)){Debug.Log("Topic cannot be null or empty." + nameof(_topic));}if (!connection.IsConnected){Debug.Log($"Cannot subscribe to topic '{_topic}' because the _connection is not established.");return;}if (callbacks.ContainsKey(_topic)){callbacks.Remove(_topic);}Unsubscribe(_topic);callbacks.Add(_topic, _callback);connection._client.CreateSubscriptionBuilder(_topic).WithMessageCallback(HandleMessageReceived).BeginSubscribe();Debug.Log($"Subscribed to topic :{_topic}.");}/// <summary>/// 取消订阅/// </summary>/// <param name="_topic">主题</param>public void Unsubscribe(string _topic){if (!connection.IsConnected || !callbacks.ContainsKey(_topic)) return;connection._client.CreateUnsubscribePacketBuilder(_topic).BeginUnsubscribe();callbacks.Remove(_topic);if (string.IsNullOrEmpty(_topic)){Debug.Log("Topic cannot be null or empty." + nameof(_topic));}if (!connection.IsConnected || !callbacks.ContainsKey(_topic)){Debug.Log($"Cannot unsubscribe from topic '{_topic}' because the _connection is not established or the topic is not subscribed.");return;}connection._client.CreateUnsubscribePacketBuilder(_topic).BeginUnsubscribe();callbacks.Remove(_topic);Debug.Log($"Unsubscribed from topic '{_topic}'.");}/// <summary>/// 收到消息之后的处理/// </summary>/// <param name="_"></param>/// <param name="__"></param>/// <param name="_topic"></param>/// <param name="_message"></param>private void HandleMessageReceived(MQTTClient _, SubscriptionTopic __, string _topic, ApplicationMessage _message){var payload = Encoding.UTF8.GetString(_message.Payload.Data, _message.Payload.Offset, _message.Payload.Count);if (callbacks.TryGetValue(_topic, out var callback)){callback?.Invoke(payload);}}/// <summary>/// 在连接的时候重新订阅之前的主题/// </summary>private void ReSubscribeWhenReconnecting(MQTTClient mqttclient){if (callbacks.Count == 0){return;}foreach (var topic in callbacks.Keys){Subscribe(topic, callbacks[topic]);}}}
}

MqttPublishService


using Best.MQTT;
using UnityEngine;namespace MQTT
{/// <summary>/// MQTT发布服务(发布消息)/// </summary>public class MqttPublishService{private MQTTClient mqttPublishClient;public MqttPublishService(MqttConnectionService mqttConnectionService){mqttPublishClient = mqttConnectionService._client;}/// <summary>/// 发布消息/// </summary>/// <param name="_data"></param>/// <param name="_topic"></param>public void Publish(string _topic, string _data){if (mqttPublishClient.State != ClientStates.Connected){Debug.Log("连接已经断开,不发送消息");return;}mqttPublishClient.CreateApplicationMessageBuilder(_topic).WithPayload(_data).WithQoS(Best.MQTT.Packets.QoSLevels.AtMostOnceDelivery).BeginPublish();}}
}

SingletonMono

单例类

using UnityEngine;namespace Manager
{/// <summary>/// 单例模式/// </summary>/// <typeparam name="T">需要变成单例的class</typeparam>public abstract class SingletonMono<T> : MonoBehaviour where T : SingletonMono<T>{protected static T _Instance = null;public static T Instance{get{if (null == _Instance){//寻找是否存在当前单例类对象_Instance = FindObjectOfType<T>();//不存在的话if (_Instance == null){//new一个并添加一个单例脚本_Instance = new GameObject(typeof(T).Name).AddComponent<T>();}else{_Instance.Init();}//在切换场景的时候不要释放这个对象DontDestroyOnLoad(_Instance.gameObject);}return _Instance;}}private void Awake(){if (_Instance == null) //如果未实例化{_Instance = this as T; //as T 指强转 为 派生类 传递进来的 类型Init(); //为实现 派生类继承 单例模式基类时 无法使用 Awake() 方法采用 Init() 虚方法 代替}}/// <summary>/// MonoSingleton 基类 留给派生类做Awake()初始化的方法[虚方法]/// </summary>protected virtual void Init(){}}
}

MQTTClientConfig

using System;namespace MQTT
{[Serializable]public class MQTTClientConfig{public string MqttClientClientID;public string MqttClientHost;public int MqttClientPort_TCP;public int MqttClientPort_WebSocket;public string MqttClientUserName;public string MqttClientPassword;public int SupportedProtocolVersions;}
}

MqttStart 使用示例

using MQTT;
using UnityEngine;
using UnityEngine.UI;namespace MQTT
{public class MqttStart : MonoBehaviour{public Button btn;private void Start(){MqttManager.Instance.InitCommpoent(); //创建并初始化mqttManager对象btn.onClick.AddListener(SendData2Server);//按钮绑定一个事件用于测试}//发送消息,并订阅一个主题private void SendData2Server(){MqttManager.Instance._mqttPublishService.Publish("test/data/wyw", "1");MqttManager.Instance._subscriptionService.Subscribe("test/data/wyw/get", (str) =>{Debug.Log("收到消息"+str);});}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/18340.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】--- 基础开发工具之yum/apt、vim、gcc/g++的使用

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏&#xff1a; Linux网络编程 本篇博客我们来认识一下Linux中的一些基础开发工具 --- yum,vim,gcc/g。 &#x1f3e0; yum &#x1f3b8; 什么是yum 当用户想下载软…

DeepSeek教unity------MessagePack-02

内置支持类型&#xff1a; 对象序列化 MessagePack for C# 可以序列化你自己定义的公共类或结构体类型。默认情况下&#xff0c;可序列化的类型必须用 [MessagePackObject] 属性进行注解&#xff0c;成员需要用 [Key] 属性进行注解。键可以是索引&#xff08;整数&#xff09;…

deepseek部署在本地详细教程

最近&#xff0c;DeepSeek爆火&#xff0c;先进的算法、卓越的能力&#xff0c;表现出众&#xff0c;其凭一己之力推动国内Ai大模型跨越式发展。作为一款现象级的Ai产品&#xff0c;用户量暴增&#xff0c;最近服务器又被攻击&#xff0c;使用DeepSeek&#xff0c;经常出现服务…

修改OnlyOffice编辑器默认字体

通过Docker修改OnlyOffice编辑器默认字体 问题描述详细方案1. 删除原生字体文件2. 创建字体目录3. 复制字体文件到容器中4. 执行字体更新脚本5. 重新启动容器 注意事项 问题描述 在OnlyOffice中&#xff0c;编辑器的默认字体可能不符合公司或个人的需求&#xff0c;通常会使用…

Vue学习笔记4

Vue学习笔记 一、自定义创建项目 基于VueCli自定义创建项目架子 二、vuex基本认知 1、vuex概述 是什么&#xff1a;是vue的状态管理工具&#xff08;插件&#xff09;&#xff0c;状态就是数据 大白话&#xff1a;vuex是一个插件&#xff0c;可以帮助我们管理vue通用的数…

文心一言4月起全面免费,6月底开源新模型:AI竞争进入新阶段?

名人说&#xff1a;莫听穿林打叶声&#xff0c;何妨吟啸且徐行。—— 苏轼 Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 一、文心一言免费化的背后&#xff1a;AI成本与应用的双重驱动1️⃣成本下降&#xff0c;推动文心一言普及2…

SSM课设-学生选课系统

【课设者】SSM课设-学生选课系统 分为 管理员 和 老师 和 学生端 技术栈 前端: HtmlCssJavaScriptAjax 后端: Spring、Spring MVC、MyBatis、MySQL、JSP 学生端 --选课 选课 搜索 --查看选课结果 --退选 --查看已修课程 --管理个人信息 老师端 --添加教学课程 添加 …

LabVIEW外腔二极管激光器稳频实验

本项目利用LabVIEW软件开发了一个用于外腔二极管激光器稳频实验的系统。系统能够实现激光器频率的稳定控制和实时监测&#xff0c;为激光实验提供了重要支持。 项目背景&#xff1a; 系统解决了外腔二极管激光器频率不稳定的问题&#xff0c;以满足对激光器频率稳定性要求较高…

windows,docker停止所有容器

CMD命令窗口 你可以尝试使用以下命令来停止所有正在运行的Docker容器&#xff1a; FOR /f "tokens*" %i IN (docker ps -q) DO docker stop %i这条命令的工作原理是&#xff1a; docker ps -q 列出所有正在运行的容器的ID。 FOR /f "tokens*" %i IN (c…

RedHat8安装postgresql15和 postgis3.4.4记录及遇到的问题总结

安装包对照版本参考 UsersWikiPostgreSQLPostGIS – PostGIS 如果Red Hat系统上有旧版本的PostgreSQL需要卸载 在较新的Red Hat版本&#xff0c;使用dnf包管理器卸载&#xff1a;sudo dnf remove postgresql-server postgresql 旧版本&#xff0c;使用yum包管理器卸载 sudo y…

DBeaver clickhouse 时区不对 时间少了8小时

选择DataBase选择Driver Manager选择clickhouse数据库点中之后&#xff0c;选择编辑添加两个全局属性 use_server_time_zone use_time_zone 鼠标移动到User Properties上&#xff0c;右键即可添加一列空白 然后断开重连

【vscode】VScode Remote SSH配置

VScode使用remote ssh 到服务器上的Docker容器中 1. 配置远程服务器docker容器的端口映射&#xff0c;例如将服务器的2222端口映射到container的22端口(默认) 1.1 在容器系统的sshd_config文件中配置参数 #配置文件 vim /etc/ssh/sshd_config #打开端口号 Port 221.2 建立容…

光谱相机在天文学领域的应用

天体成分分析 恒星成分研究&#xff1a;恒星的光谱包含了其大气中各种元素的吸收和发射线特征。通过光谱相机精确测量这些谱线&#xff0c;天文学家能确定恒星大气中氢、氦、碳、氮、氧等元素的含量。如对太阳的光谱分析发现&#xff0c;太阳大气中氢元素占比约 71%&#xff0…

MySQL错误-this is incompatible with sql_mode=only_full_group_by完美解决方案

项目场景 有时候&#xff0c;遇到数据库重复数据&#xff0c;需要将数据进行分组&#xff0c;并取出其中一条来展示&#xff0c;这时就需要用到group by语句。 但是&#xff0c;如果mysql是高版本&#xff0c;当执行group by时&#xff0c;select的字段不属于group by的字段的…

Unity 全部版本下载存档【需要梯子】

首先 挂一个非cn的梯子&#xff08;因为实测港澳不行&#xff0c;会跳转到cn官网&#xff09; 然后打开这个网址&#xff1a; Download Archive 最后打开你的Unity Hub 点击下载就可以自动跳转到Hub里下载了

同为科技智能PDU助力Deepseek人工智能和数据交互的快速发展

1 2025开年&#xff0c;人工智能领域迎来了一场前所未有的变革。Deepseek成为代表“东方力量”的开年王炸&#xff0c;不仅在国内掀起了技术热潮&#xff0c;并且在全球范围内引起了高度关注。Deepseek以颠覆性技术突破和现象级应用场景席卷全球&#xff0c;这不仅重塑了产业格…

Linux-C/C++《七、字符串处理》(字符串输入/输出、C 库中提供的字符串处理函数、正则表达式等)

字符串处理在几乎所有的编程语言中都是一个绕不开的话题&#xff0c;在一些高级语言当中&#xff0c;对字符串的处理支 持度更是完善&#xff0c;譬如 C、 C# 、 Python 等。若在 C 语言中想要对字符串进行相关的处理&#xff0c;譬如将两个字符串进行拼接、字符串查找、两个…

Golang GORM系列:GORM事务及错误处理

在数据库管理领域&#xff0c;确保数据完整性至关重要。GORM是健壮的Go对象关系映射库&#xff0c;它为开发人员提供了维护数据一致性和优雅地处理错误的基本工具。本文是掌握GORM事务和错误处理的全面指南。我们将深入研究如何使用事务来保证原子性&#xff0c;并探索有效处理…

「软件设计模式」工厂方法模式(Factory Method) vs 抽象工厂模式(Abstract Factory)

前言 在软件工程领域&#xff0c;设计模式是解决常见问题的经典方案。本文将深入探讨两种创建型模式&#xff1a;工厂方法模式和抽象工厂模式&#xff0c;通过理论解析与实战代码示例&#xff0c;帮助开发者掌握这两种模式的精髓。 一、工厂方法模式&#xff08;Factory Metho…