概述
本文将详细介绍如何在Unity中使用Best MQTT v3插件实现MQTT通信功能。将通过模块化设计实现配置加载、连接管理、订阅/发布等功能,并提供完整的代码实现。
重连说明:当连接意外断开时,会自动尝试重新连接;重连成功后会再次订阅 MqttSubscriptionService 的 callbacks 中保存的主题,并重新添加回调。最大重连次数保存在 MqttConnectionService 的 MaxReconnectAttempts 中。
使用的MQTT版本:配置文件中 SupportedProtocolVersions = 0 时使用 3.1.1 版本,SupportedProtocolVersions = 1 时使用 5.0 版本(代码中任何非 0 的值都会选择 5.0)。版本的判断逻辑在 MqttConnectionService 的 GetProtocolVersion 中。
环境准备
- 插件安装
插件名称 | 功能描述 | 下载地址 | 备注 |
---|---|---|---|
Best MQTT v3 | MQTT通信核心插件 | Unity Asset Store | 提供MQTT协议的完整实现 |
LitJson | JSON解析库 | LitJson官网 或使用BestHTTP自带的LitJson | 轻量高效的JSON解析工具 |
DOTween | 动画与延迟工具 | Unity Asset Store | 用于实现重连延迟效果 |
UniTask | 异步编程工具 | GitHub | 提供高性能的异步任务支持 |
- 配置文件
在 StreamingAssets/SystemConfig 路径下创建 MQTTConfig.json:
{"MqttClientHost": "broker.emqx.io","MqttClientPort_TCP": 1883,"MqttClientPort_WebSocket": 8083,"MqttClientUserName": "","MqttClientPassword": "","SupportedProtocolVersions": 0
}
核心模块介绍
MQTT管理器(MqttManager)
public class MqttManager : SingletonMono<MqttManager>
{
    // Excerpt: the field declarations of this class are omitted here.

    /// <summary>
    /// Loads the MQTT configuration and, when it is available, creates the
    /// connection service and starts connecting.
    /// </summary>
    private async void Start()
    {
        // The loader is not assigned anywhere else in this class, so create it on first use.
        _configLoader ??= new MqttConfigLoader();

        var config = await _configLoader.LoadConfigAsync();

        // LoadConfigAsync returns null on failure; `this == null` is Unity's check for
        // an object that was destroyed while the await was pending.
        if (config == null || this == null)
        {
            return;
        }

        _connectionService = new MqttConnectionService();
        _connectionService.Initialize(config);
        _connectionService.Connect();

        // Connect() creates the client, so the handler can only be attached afterwards.
        _connectionService._client.OnConnected += HandleOnConnected;
    }

    /// <summary>
    /// Creates the subscription and publish services once the client reports it is connected.
    /// </summary>
    /// <param name="mqttclient">The client that raised the event (unused).</param>
    private void HandleOnConnected(MQTTClient mqttclient)
    {
        // Keep existing services so callbacks that are already registered are not thrown away.
        _subscriptionService ??= new MqttSubscriptionService(_connectionService);
        _mqttPublishService ??= new MqttPublishService(_connectionService);
    }
}
功能说明: 在 Start 中异步加载配置,加载成功后初始化连接服务并发起连接;连接成功后在 HandleOnConnected 中创建订阅服务和发布服务。
配置加载器(MqttConfigLoader)
/// <summary>
/// Reads StreamingAssets/SystemConfig/MQTTConfig.json and parses it with LitJson.
/// </summary>
/// <returns>
/// The parsed configuration, or null when the file cannot be read within
/// 10 seconds, is empty, or cannot be parsed.
/// </returns>
public async UniTask<MQTTClientConfig> LoadConfigAsync()
{
    var path = new Uri(Path.Combine(Application.streamingAssetsPath, "SystemConfig", "MQTTConfig.json"));

    // The token source cancels the read after 10 seconds.
    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
    {
        try
        {
            var json = await ReadFileAsyncByWebRequest(path.ToString(), cts.Token);
            if (string.IsNullOrWhiteSpace(json))
            {
                Debug.LogError("读取配置文件失败: 配置文件内容为空");
                return null;
            }

            var jsonData = JsonMapper.ToObject<MQTTClientConfig>(json);
            if (jsonData == null)
            {
                // Do not report success for a configuration that could not be materialised.
                Debug.LogError("读取配置文件失败: 配置内容无法解析");
                return null;
            }

            Debug.Log("MQTT 配置加载成功");
            return jsonData;
        }
        catch (OperationCanceledException)
        {
            // The only token in play is the timeout token created above.
            Debug.LogError("读取配置文件失败: 读取超时");
            return null;
        }
        catch (Exception e)
        {
            Debug.LogError($"读取配置文件失败: {e.Message}");
            return null;
        }
    }
}
功能说明:
跨平台路径处理
异步加载避免卡顿,处理了webgl以及windows下的文件加载
LitJson 对配置文件进行解析
连接服务(MqttConnectionService)
/// <summary>
/// Builds the connection options for the current platform, creates a new
/// client, wires its event handlers and starts connecting.
/// </summary>
public void Connect()
{
    var optionsBuilder = new ConnectionOptionsBuilder();

#if UNITY_WEBGL && !UNITY_EDITOR
    // WebGL player builds use the WebSocket transport and port.
    optionsBuilder = optionsBuilder.WithWebSocket(_config.MqttClientHost, _config.MqttClientPort_WebSocket);
#else
    // Every other target, including the editor, uses plain TCP.
    optionsBuilder = optionsBuilder.WithTCP(_config.MqttClientHost, _config.MqttClientPort_TCP);
#endif

    var options = optionsBuilder
        .WithProtocolVersion(GetProtocolVersion())
        .Build();

    _client = new MQTTClient(options);
    SetupEventHandlers();
    _client.BeginConnect(ConfigureConnection);
}
功能说明:
自动识别平台选择协议(WebGL使用WebSocket)
随机ClientID避免冲突
重连机制(最多重试5次,每次延迟1秒后重连)
订阅服务(MqttSubscriptionService)
/// <summary>
/// Subscribes to a topic and registers the callback for its messages.
/// </summary>
/// <param name="_topic">Topic to subscribe to; must not be null or empty.</param>
/// <param name="_callback">Callback stored for the topic; it replaces any callback already stored for it.</param>
public void Subscribe(string _topic, Action<string> _callback)
{
    if (string.IsNullOrEmpty(_topic))
    {
        Debug.Log("Topic cannot be null or empty." + nameof(_topic));
        return; // a null key would make the dictionary access below throw
    }

    if (!connection.IsConnected)
    {
        Debug.Log($"Cannot subscribe to topic '{_topic}' because the _connection is not established.");
        return;
    }

    // Add the callback, or overwrite the one already registered for this topic.
    callbacks[_topic] = _callback;

    connection._client.CreateSubscriptionBuilder(_topic)
        .WithMessageCallback(HandleMessageReceived)
        .BeginSubscribe();

    Debug.Log($"Subscribed to topic :{_topic}.");
}
功能说明:
自动重订阅机制
线程安全的消息处理
消息发布服务(MqttPublishService)
/// <summary>
/// Publishes a text message to a topic with QoS "at most once".
/// Nothing is sent when the topic is empty or the client is not connected.
/// </summary>
/// <param name="_topic">Topic to publish to; must not be null or empty.</param>
/// <param name="_data">Message payload.</param>
public void Publish(string _topic, string _data)
{
    if (string.IsNullOrEmpty(_topic))
    {
        Debug.Log("Topic cannot be null or empty." + nameof(_topic));
        return;
    }

    // Read the client once so the state check and the publish use the same instance.
    var client = mqttPublishClient;
    if (client == null || client.State != ClientStates.Connected)
    {
        Debug.Log("连接已经断开,不发送消息");
        return;
    }

    client.CreateApplicationMessageBuilder(_topic)
        .WithPayload(_data)
        .WithQoS(Best.MQTT.Packets.QoSLevels.AtMostOnceDelivery)
        .WithContentType("text/plain; charset=UTF-8")
        .BeginPublish();
}
全部代码
MqttManager
using Best.MQTT;
using Manager;
using System;
namespace MQTT
{
    /// <summary>
    /// MQTT manager: loads the configuration, owns the connection service and
    /// exposes the subscription and publish services once the client is connected.
    /// </summary>
    public class MqttManager : SingletonMono<MqttManager>
    {
        // Modular components
        private MqttConfigLoader _configLoader;
        private MqttConnectionService _connectionService;

        /// <summary>Subscription service; null until the client has connected.</summary>
        public MqttSubscriptionService _subscriptionService;

        /// <summary>Publish service; null until the client has connected.</summary>
        public MqttPublishService _mqttPublishService;

        protected override void Init()
        {
            base.Init();
        }

        /// <summary>
        /// Loads the MQTT configuration and, when it is available, creates the
        /// connection service and starts connecting.
        /// </summary>
        private async void Start()
        {
            // The loader is not assigned anywhere else in this class, so create it on first use.
            _configLoader ??= new MqttConfigLoader();

            var config = await _configLoader.LoadConfigAsync();

            // LoadConfigAsync returns null on failure; `this == null` is Unity's check for
            // an object that was destroyed while the await was pending.
            if (config == null || this == null)
            {
                return;
            }

            _connectionService = new MqttConnectionService();
            _connectionService.Initialize(config);
            _connectionService.Connect();

            // Connect() creates the client, so the handler can only be attached afterwards.
            _connectionService._client.OnConnected += HandleOnConnected;
        }

        /// <summary>
        /// Creates the subscription and publish services once the client reports it is connected.
        /// </summary>
        /// <param name="mqttclient">The client that raised the event (unused).</param>
        private void HandleOnConnected(MQTTClient mqttclient)
        {
            // Keep existing services so callbacks that are already registered are not thrown away.
            _subscriptionService ??= new MqttSubscriptionService(_connectionService);
            _mqttPublishService ??= new MqttPublishService(_connectionService);
        }

        /// <summary>
        /// Marks the disconnect as intentional and closes the connection.
        /// </summary>
        void OnDestroy()
        {
            // The connection service only exists when the configuration was loaded successfully.
            if (_connectionService == null)
            {
                return;
            }

            if (_connectionService._client != null)
            {
                _connectionService._client.OnConnected -= HandleOnConnected;
            }

            _connectionService.isRightDisConnection = true;
            _connectionService.Disconnect();
        }

        /// <summary>
        /// Intentionally empty. Calling it through <c>Instance</c> is enough to make
        /// the singleton base class find or create the manager object.
        /// </summary>
        internal void InitCommpoent()
        {
        }
    }
}
MqttConfigLoader
using Best.HTTP.JSON.LitJson;
using Cysharp.Threading.Tasks;
using System;
using System.IO;
using System.Threading;
using UnityEngine;
using UnityEngine.Networking;

namespace MQTT
{
    /// <summary>
    /// Loads the MQTT configuration file.
    /// </summary>
    public class MqttConfigLoader
    {
        /// <summary>
        /// Reads StreamingAssets/SystemConfig/MQTTConfig.json and parses it with LitJson.
        /// </summary>
        /// <returns>
        /// The parsed configuration, or null when the file cannot be read within
        /// 10 seconds, is empty, or cannot be parsed.
        /// </returns>
        public async UniTask<MQTTClientConfig> LoadConfigAsync()
        {
            var path = new Uri(Path.Combine(Application.streamingAssetsPath, "SystemConfig", "MQTTConfig.json"));

            // The token source cancels the read after 10 seconds.
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
            {
                try
                {
                    var json = await ReadFileAsyncByWebRequest(path.ToString(), cts.Token);
                    if (string.IsNullOrWhiteSpace(json))
                    {
                        Debug.LogError("读取配置文件失败: 配置文件内容为空");
                        return null;
                    }

                    var jsonData = JsonMapper.ToObject<MQTTClientConfig>(json);
                    if (jsonData == null)
                    {
                        // Do not report success for a configuration that could not be materialised.
                        Debug.LogError("读取配置文件失败: 配置内容无法解析");
                        return null;
                    }

                    Debug.Log("MQTT 配置加载成功");
                    return jsonData;
                }
                catch (OperationCanceledException)
                {
                    // The only token in play is the timeout token created above.
                    Debug.LogError("读取配置文件失败: 读取超时");
                    return null;
                }
                catch (Exception e)
                {
                    Debug.LogError($"读取配置文件失败: {e.Message}");
                    return null;
                }
            }
        }

        /// <summary>
        /// Downloads a file as text through UnityWebRequest, polling once per
        /// Update until the request has finished.
        /// </summary>
        /// <param name="path">Path or URL handed to <c>UnityWebRequest.Get</c>.</param>
        /// <param name="cancellationToken">Cancels the wait for the request.</param>
        /// <returns>The downloaded text.</returns>
        /// <exception cref="IOException">The request did not finish with a success result.</exception>
        /// <exception cref="OperationCanceledException">The token was cancelled while waiting.</exception>
        private async UniTask<string> ReadFileAsyncByWebRequest(string path, CancellationToken cancellationToken)
        {
            // Disposing the request on every exit path also covers cancellation.
            using (var request = UnityWebRequest.Get(path))
            {
                var operation = request.SendWebRequest();
                while (!operation.isDone)
                {
                    await UniTask.Yield(PlayerLoopTiming.Update, cancellationToken);
                }

                if (request.result != UnityWebRequest.Result.Success)
                {
                    throw new IOException($"{request.error} ({path})");
                }

                return request.downloadHandler.text;
            }
        }
    }
}
MqttConnectionService
using Best.MQTT;
using Best.MQTT.Packets.Builders;
using DG.Tweening;
using System;
using UnityEngine;

namespace MQTT
{
    /// <summary>
    /// MQTT connection service: connects, disconnects and retries after an
    /// unexpected disconnect.
    /// </summary>
    /// <remarks>
    /// <see cref="Connect"/> creates a new <see cref="MQTTClient"/> every time it runs,
    /// including on reconnect, so event handlers that outside code attached to a
    /// previous <see cref="_client"/> instance are not carried over to the new one.
    /// </remarks>
    public class MqttConnectionService
    {
        /// <summary>
        /// True when the disconnect was requested on purpose; suppresses reconnecting.
        /// </summary>
        public bool isRightDisConnection = false;

        /// <summary>
        /// The current client. Replaced by every <see cref="Connect"/> call and
        /// set to null by <see cref="Disconnect"/>.
        /// </summary>
        public MQTTClient _client;

        private MQTTClientConfig _config;
        private const int MaxReconnectAttempts = 5;
        private int _reconnectAttempts;

        /// <summary>True while a client exists and its state is Connected.</summary>
        public bool IsConnected => _client?.State == ClientStates.Connected;

        /// <summary>
        /// Stores the configuration used by later <see cref="Connect"/> calls.
        /// </summary>
        /// <param name="config">Host, ports, credentials and protocol version selector.</param>
        public void Initialize(MQTTClientConfig config)
        {
            _config = config;
        }

        public MqttConnectionService() { }

        /// <summary>
        /// Builds the connection options for the current platform, creates a new
        /// client, wires its event handlers and starts connecting.
        /// </summary>
        /// <exception cref="InvalidOperationException">No configuration has been set with Initialize.</exception>
        public void Connect()
        {
            if (_config == null)
            {
                throw new InvalidOperationException("Initialize must be called with a configuration before Connect.");
            }

            var optionsBuilder = new ConnectionOptionsBuilder();

#if UNITY_WEBGL && !UNITY_EDITOR
            // WebGL player builds use the WebSocket transport and port.
            optionsBuilder = optionsBuilder.WithWebSocket(_config.MqttClientHost, _config.MqttClientPort_WebSocket);
#else
            // Every other target, including the editor, uses plain TCP.
            optionsBuilder = optionsBuilder.WithTCP(_config.MqttClientHost, _config.MqttClientPort_TCP);
#endif

            var options = optionsBuilder
                .WithProtocolVersion(GetProtocolVersion())
                .Build();

            _client = new MQTTClient(options);
            SetupEventHandlers();
            _client.BeginConnect(ConfigureConnection);
        }

        /// <summary>
        /// Marks the disconnect as intentional, sends a disconnect packet when a
        /// client exists, and drops the client reference.
        /// </summary>
        public void Disconnect()
        {
            isRightDisConnection = true;
            _client?.CreateDisconnectPacketBuilder().BeginDisconnect();
            _client = null;
        }

        /// <summary>
        /// Fills in the connect packet: a random client id, clean start, and the
        /// configured credentials when a user name is set.
        /// </summary>
        private ConnectPacketBuilder ConfigureConnection(MQTTClient client, ConnectPacketBuilder builder)
        {
            builder = builder.WithClientID(GenerateClientId()).WithCleanStart();

            // An empty user name (as in the sample configuration) means an anonymous connection.
            if (!string.IsNullOrEmpty(_config.MqttClientUserName))
            {
                builder = builder.WithUserNameAndPassword(_config.MqttClientUserName, _config.MqttClientPassword);
            }

            return builder;
        }

        private void SetupEventHandlers()
        {
            _client.OnConnected += HandleConnected;
            _client.OnDisconnect += HandleDisconnected;
            _client.OnStateChanged += HandleStateChanged;
        }

        private void HandleStateChanged(MQTTClient client, ClientStates oldState, ClientStates newState)
        {
            Debug.Log($"链接状态改变oldState:{oldState},newState:{newState}");
        }

        private void HandleConnected(MQTTClient client)
        {
            Debug.Log("链接到mqtt服务器");
            isRightDisConnection = false;

            // A successful connection starts a fresh retry budget for the next outage.
            _reconnectAttempts = 0;
        }

        private void HandleDisconnected(MQTTClient client, DisconnectReasonCodes code, string reason)
        {
            Debug.Log("断开mqtt的链接"+ reason);

            // Only an unexpected disconnect triggers a reconnect.
            if (!isRightDisConnection)
            {
                TryReconnect();
            }
        }

        /// <summary>
        /// Schedules one reconnect attempt after a 1 second delay, up to
        /// MaxReconnectAttempts attempts since the last successful connection.
        /// </summary>
        private void TryReconnect()
        {
            if (_reconnectAttempts >= MaxReconnectAttempts)
            {
                Debug.LogWarning($"已达到最大重连次数({MaxReconnectAttempts}),停止重连");
                return;
            }

            DOTween.Sequence().AppendInterval(1f).AppendCallback(() =>
            {
                // Disconnect() may have been called on purpose while the delay was running.
                if (isRightDisConnection)
                {
                    return;
                }

                _reconnectAttempts++;
                Connect();
            });
        }

        // 0 selects MQTT 3.1.1; any other value selects MQTT 5.0.
        private SupportedProtocolVersions GetProtocolVersion() =>
            _config.SupportedProtocolVersions == 0
                ? SupportedProtocolVersions.MQTT_3_1_1
                : SupportedProtocolVersions.MQTT_5_0;

        private string GenerateClientId() => Guid.NewGuid().ToString();
    }
}
MqttSubscriptionService
using Best.MQTT;
using System;
using System.Collections.Generic;
using System.Text;
using UnityEngine;

namespace MQTT
{
    /// <summary>
    /// Subscription service: subscribes and unsubscribes topics and dispatches
    /// received messages to the callback registered for the topic.
    /// </summary>
    public class MqttSubscriptionService
    {
        private readonly MqttConnectionService connection;

        // Topic -> callback that receives the message payload as text.
        private readonly Dictionary<string, Action<string>> callbacks = new();

        /// <summary>
        /// Stores the connection service and hooks re-subscription onto the
        /// OnConnected event of the client that exists at construction time.
        /// </summary>
        /// <param name="_connection">Connection service whose client is used for all requests.</param>
        public MqttSubscriptionService(MqttConnectionService _connection)
        {
            this.connection = _connection ?? throw new ArgumentNullException(nameof(_connection));

            // NOTE(review): MqttConnectionService.Connect() assigns a new MQTTClient on every
            // reconnect, so this handler stays on the old instance - confirm it is ever raised
            // for the replacement client.
            if (this.connection._client != null)
            {
                this.connection._client.OnConnected += ReSubscribeWhenReconnecting;
            }
        }

        /// <summary>
        /// Subscribes to a topic and registers the callback for its messages.
        /// </summary>
        /// <param name="_topic">Topic to subscribe to; must not be null or empty.</param>
        /// <param name="_callback">Callback stored for the topic; it replaces any callback already stored for it.</param>
        public void Subscribe(string _topic, Action<string> _callback)
        {
            if (string.IsNullOrEmpty(_topic))
            {
                Debug.Log("Topic cannot be null or empty." + nameof(_topic));
                return; // a null key would make the dictionary access below throw
            }

            if (!connection.IsConnected)
            {
                Debug.Log($"Cannot subscribe to topic '{_topic}' because the _connection is not established.");
                return;
            }

            // Add the callback, or overwrite the one already registered for this topic.
            callbacks[_topic] = _callback;

            connection._client.CreateSubscriptionBuilder(_topic)
                .WithMessageCallback(HandleMessageReceived)
                .BeginSubscribe();

            Debug.Log($"Subscribed to topic :{_topic}.");
        }

        /// <summary>
        /// Unsubscribes from a topic and forgets its callback.
        /// Nothing happens when the client is not connected or the topic is not registered.
        /// </summary>
        /// <param name="_topic">Topic to unsubscribe from; must not be null or empty.</param>
        public void Unsubscribe(string _topic)
        {
            // Validate first: a null key would make ContainsKey throw.
            if (string.IsNullOrEmpty(_topic))
            {
                Debug.Log("Topic cannot be null or empty." + nameof(_topic));
                return;
            }

            if (!connection.IsConnected || !callbacks.ContainsKey(_topic))
            {
                Debug.Log($"Cannot unsubscribe from topic '{_topic}' because the _connection is not established or the topic is not subscribed.");
                return;
            }

            connection._client.CreateUnsubscribePacketBuilder(_topic).BeginUnsubscribe();
            callbacks.Remove(_topic);
            Debug.Log($"Unsubscribed from topic '{_topic}'.");
        }

        /// <summary>
        /// Decodes the payload as UTF-8 and passes it to the callback registered
        /// under the received topic string, if there is one.
        /// </summary>
        /// <param name="_">Client that received the message (unused).</param>
        /// <param name="__">Subscription the message was matched against (unused).</param>
        /// <param name="_topic">Key used to look up the callback.</param>
        /// <param name="_message">The received message.</param>
        private void HandleMessageReceived(MQTTClient _, SubscriptionTopic __, string _topic, ApplicationMessage _message)
        {
            var segment = _message.Payload;

            // An empty payload may come without a backing array.
            var payload = segment.Data == null || segment.Count == 0
                ? string.Empty
                : Encoding.UTF8.GetString(segment.Data, segment.Offset, segment.Count);

            // NOTE(review): _topic is presumably the concrete topic of the message, in which
            // case a callback registered under a wildcard filter is never found - confirm.
            if (callbacks.TryGetValue(_topic, out var callback))
            {
                callback?.Invoke(payload);
            }
        }

        /// <summary>
        /// Subscribes again to every topic that has a registered callback.
        /// </summary>
        /// <param name="mqttclient">The client that raised the event (unused).</param>
        private void ReSubscribeWhenReconnecting(MQTTClient mqttclient)
        {
            if (callbacks.Count == 0)
            {
                return;
            }

            // Subscribe writes to the dictionary, so iterate over a snapshot instead of the live collection.
            var registered = new List<KeyValuePair<string, Action<string>>>(callbacks);
            foreach (var entry in registered)
            {
                Subscribe(entry.Key, entry.Value);
            }
        }
    }
}
MqttPublishService
using Best.MQTT;
using UnityEngine;

namespace MQTT
{
    /// <summary>
    /// MQTT publish service: sends messages through the connection service's current client.
    /// </summary>
    public class MqttPublishService
    {
        private readonly MqttConnectionService connectionService;

        // Looked up on every use: the connection service replaces its client on each
        // Connect() call, so a reference cached at construction would go stale.
        private MQTTClient mqttPublishClient => connectionService._client;

        /// <summary>
        /// Creates the service on top of an existing connection service.
        /// </summary>
        /// <param name="mqttConnectionService">Connection service that owns the client.</param>
        public MqttPublishService(MqttConnectionService mqttConnectionService)
        {
            connectionService = mqttConnectionService
                ?? throw new System.ArgumentNullException(nameof(mqttConnectionService));
        }

        /// <summary>
        /// Publishes a text message to a topic with QoS "at most once".
        /// Nothing is sent when the topic is empty or the client is not connected.
        /// </summary>
        /// <param name="_topic">Topic to publish to; must not be null or empty.</param>
        /// <param name="_data">Message payload.</param>
        public void Publish(string _topic, string _data)
        {
            if (string.IsNullOrEmpty(_topic))
            {
                Debug.Log("Topic cannot be null or empty." + nameof(_topic));
                return;
            }

            // Read the client once so the state check and the publish use the same instance.
            var client = mqttPublishClient;
            if (client == null || client.State != ClientStates.Connected)
            {
                Debug.Log("连接已经断开,不发送消息");
                return;
            }

            client.CreateApplicationMessageBuilder(_topic)
                .WithPayload(_data)
                .WithQoS(Best.MQTT.Packets.QoSLevels.AtMostOnceDelivery)
                .BeginPublish();
        }
    }
}
SingletonMono
单例类
using UnityEngine;

namespace Manager
{
    /// <summary>
    /// MonoBehaviour singleton base class.
    /// </summary>
    /// <typeparam name="T">The class that should become a singleton.</typeparam>
    public abstract class SingletonMono<T> : MonoBehaviour where T : SingletonMono<T>
    {
        protected static T _Instance = null;

        /// <summary>
        /// The singleton instance. On first access it is looked up in the scene
        /// and, when none exists, created on a new GameObject named after the type.
        /// </summary>
        public static T Instance
        {
            get
            {
                if (null == _Instance)
                {
                    // Look for an existing object of the singleton type.
                    _Instance = FindObjectOfType<T>();

                    if (_Instance == null)
                    {
                        // None exists: create a GameObject and attach the singleton script.
                        _Instance = new GameObject(typeof(T).Name).AddComponent<T>();
                    }
                    else
                    {
                        // Found one that had not registered itself yet; initialise it here.
                        _Instance.Init();
                    }

                    // Keep the object alive across scene changes.
                    DontDestroyOnLoad(_Instance.gameObject);
                }

                return _Instance;
            }
        }

        private void Awake()
        {
            // Register only when no instance has been registered yet.
            if (_Instance == null)
            {
                _Instance = this as T;

                // Derived classes cannot use Awake() themselves, so Init() stands in for it.
                Init();

                // Same lifetime as an instance obtained through the Instance getter.
                DontDestroyOnLoad(gameObject);
            }
        }

        /// <summary>
        /// Initialisation hook for derived classes, called in place of Awake().
        /// </summary>
        protected virtual void Init()
        {
        }
    }
}
MQTTClientConfig
using System;

namespace MQTT
{
    /// <summary>
    /// Settings read from MQTTConfig.json. Field names match the JSON keys.
    /// </summary>
    [Serializable]
    public class MQTTClientConfig
    {
        /// <summary>Value of the "MqttClientClientID" key.</summary>
        public string MqttClientClientID;

        /// <summary>Broker host name or address.</summary>
        public string MqttClientHost;

        /// <summary>Broker port used with the TCP transport.</summary>
        public int MqttClientPort_TCP;

        /// <summary>Broker port used with the WebSocket transport.</summary>
        public int MqttClientPort_WebSocket;

        /// <summary>Value of the "MqttClientUserName" key.</summary>
        public string MqttClientUserName;

        /// <summary>Value of the "MqttClientPassword" key.</summary>
        public string MqttClientPassword;

        /// <summary>Protocol selector: 0 means MQTT 3.1.1, any other value MQTT 5.0.</summary>
        public int SupportedProtocolVersions;
    }
}
MqttStart 使用示例
using MQTT;
using UnityEngine;
using UnityEngine.UI;

namespace MQTT
{
    /// <summary>
    /// Usage example: creates the MQTT manager and wires a button that publishes
    /// a test message and subscribes to a reply topic.
    /// </summary>
    public class MqttStart : MonoBehaviour
    {
        public Button btn;

        private void Start()
        {
            // Accessing Instance creates and initialises the MqttManager object.
            MqttManager.Instance.InitCommpoent();

            if (btn == null)
            {
                Debug.LogError("MqttStart: btn is not assigned.");
                return;
            }

            // Bind the test action to the button.
            btn.onClick.AddListener(SendData2Server);
        }

        private void OnDestroy()
        {
            if (btn != null)
            {
                btn.onClick.RemoveListener(SendData2Server);
            }
        }

        /// <summary>
        /// Publishes a test message and subscribes to the reply topic.
        /// </summary>
        private void SendData2Server()
        {
            var manager = MqttManager.Instance;
            var publisher = manager._mqttPublishService;
            var subscriber = manager._subscriptionService;

            // Both services are only created after the client has connected.
            if (publisher == null || subscriber == null)
            {
                Debug.Log("MQTT 尚未连接,无法发送消息");
                return;
            }

            publisher.Publish("test/data/wyw", "1");
            subscriber.Subscribe("test/data/wyw/get", (str) =>
            {
                Debug.Log("收到消息"+str);
            });
        }
    }
}