目录
0 任务概述
1 不同的与服务器进行持续通讯的方式对比
2 C++参考链接:
3 C#实现
服务端代码:
客户端代码:
4 代码集成
5 试验过程
1> 开启mqtt服务器
2> 启动WebSocket_Server
3> 启动WebSocket_Client
4> 服务端信息
0 任务概述
架构形式:
1> Hololens*2 PC 三者之间用mqtt进行通讯;
2> 在PC上做开发,接收来自2个hololens的mqtt的消息,将其转发到websocket服务器(手持终端);
3> PC上MQTT+WEBSOCKET(暂定直接转发)。
目标:假定有一条来自mqtt的信息,将这条信息转发至WebSocket服务器端。
1 不同的与服务器进行持续通讯的方式对比
>>>>>>>>>>>> | 传统轮询 | 长轮询 | 服务器发送事件 | WebSocket |
---|---|---|---|---|
浏览器支持 | 几乎所有现代浏览器 | 几乎所有现代浏览器 | Firefox 6+ Chrome 6+ Safari 5+ Opera 10.1+ | IE 10+ Edge Firefox 4+ Chrome 4+ Safari 5+ Opera 11.5+ |
服务器负载 | 较少的CPU资源,较多的内存资源和带宽资源 | 与传统轮询相似,但是占用带宽较少 | 与长轮询相似,除非每次发送请求后服务器不需要断开连接 | 无需循环等待(长轮询),CPU和内存资源不以客户端数量衡量,而是以客户端事件数衡量。四种方式里性能最佳。 |
客户端负载 | 占用较多的内存资源与请求数。 | 与传统轮询相似。 | 浏览器中原生实现,占用资源很小。 | 同Server-Sent Event。 |
延迟 | 非实时,延迟取决于请求间隔。 | 同传统轮询。 | 非实时,默认3秒延迟,延迟可自定义。 | 实时。 |
实现复杂度 | 非常简单。 | 需要服务器配合,客户端实现非常简单。 | 需要服务器配合,而客户端实现甚至比前两种更简单。 | 需要Socket程序实现和额外端口,客户端实现简单。 |
websocket详解
http - WebSocket 详解 - 不挑食的程序员 - SegmentFault 思否
2 C++参考链接:
zaphoyd/websocketpp: C++ websocket client/server library (github.com)
在电脑上试跑examples/echo_client和echo_server两个cpp文件;其他的文件都尝试了,但是跑不成功。
结果:成功连接,但是无法将自定义的消息发送出来。
服务器截图:
客户端截图:
在另外一篇博文中已用C#实现mqtt信息接收,因此考虑改为C#进行开发。
链接:https://mp.csdn.net/mp_blog/creation/editor/new/129476972
3 C#实现
服务端代码:
using System;
using System.Net;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace WebSocketServer
{class WebSocket_Server{#region 避免重复显示static string lastMsg;static int lastCount;static int dupMsgCount = 0;static bool isFirst = true;#endregionstatic async Task Main(string[] args){string localIp = "127.0.0.1"; // 默认本地IP地址string port = ":8080";foreach (IPAddress ip in Dns.GetHostAddresses(Dns.GetHostName())){if (ip.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork){localIp = ip.ToString();break;}}Console.WriteLine("Server started at: " + localIp + port);HttpListener listener = new HttpListener();listener.Prefixes.Add("http://localhost:8080/");listener.Start();while (true){HttpListenerContext context = await listener.GetContextAsync();if (context.Request.IsWebSocketRequest){WebSocketContext webSocketContext = await context.AcceptWebSocketAsync(null);WebSocket webSocket = webSocketContext.WebSocket;Console.WriteLine("Client connected: " + context.Request.RemoteEndPoint.ToString());await Echo(webSocket, context);}else{context.Response.StatusCode = 400;context.Response.Close();}}}private static async Task Echo(WebSocket webSocket, HttpListenerContext context){byte[] buffer = new byte[1024];while (webSocket.State == WebSocketState.Open)try{WebSocketReceiveResult result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);if (result.MessageType == WebSocketMessageType.Text){string message = Encoding.UTF8.GetString(buffer, 0, result.Count);if (isFirst){lastMsg = message; //将第一次进入函数体的消息初始化为message的内容lastCount = 1;isFirst = false;}//Console.WriteLine("Received: message - " + message);//如果收到的信息重复,则不显示if (string.Equals(lastMsg, message)){dupMsgCount++;}else if ((dupMsgCount != 1) && (dupMsgCount != 0)){lastCount = dupMsgCount;//Console.WriteLine("Received: response - " + response + " " + dupMsgCount + " times. ");Console.WriteLine("Received: message - " + lastMsg + " " + lastCount + " times. ");dupMsgCount = 1;}lastMsg = message;byte[] responseBuffer = Encoding.UTF8.GetBytes("Echo: " + message);//将收到的数据回响给客户端await webSocket.SendAsync(new ArraySegment<byte>(responseBuffer), WebSocketMessageType.Text, true, CancellationToken.None);}else if (result.MessageType == WebSocketMessageType.Close){Console.WriteLine("Client disconnected: " + context.Request.RemoteEndPoint.ToString());await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);}}catch (Exception ex){Console.WriteLine("Error: " + ex.Message);}}}
}
客户端代码:
using System;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace WebSocketClient
{class WebSocket_Client{static async Task Main(string[] args){ClientWebSocket webSocket = new ClientWebSocket();string targetURI = "ws://localhost:8080";await webSocket.ConnectAsync(new Uri(targetURI), CancellationToken.None);Console.WriteLine("Connected to server at: " + targetURI);byte[] buffer;string message;while (true){Console.Write("Enter a message (type 'exit' to quit): ");message = Console.ReadLine();if (message.ToLower() == "exit"){break;}buffer = Encoding.UTF8.GetBytes(message);await webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);buffer = new byte[1024];WebSocketReceiveResult result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);string response = Encoding.UTF8.GetString(buffer, 0, result.Count);Console.WriteLine("Received: " + response);}// Send a close message to the server before closing the connectionawait webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);}}
}
4 代码集成
MQTT+WebSocket客户端
using System;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
using System.IO;namespace csharpMQTT
{class CsharpMQTT_Com{static string _msg;#region 避免重复显示static string lastMsg;static int lastCount;static int dupMsgCount = 0;static bool isFirst = true;#endregion//MQTT部分static MqttClient ConnectMQTT(string broker, int port, string clientId, string username, string password){MqttClient client = new MqttClient(broker, port, false, MqttSslProtocols.None, null, null);client.Connect(clientId, username, password);if (client.IsConnected){Console.WriteLine("Connected to MQTT Broker. Client Id: " + clientId.ToString());}else{Console.WriteLine("Failed to connect");}return client;}static async Task Publish(MqttClient client, string topic){int msg_count = 0;//自定义消息的内容while (true){//System.Threading.Thread.Sleep(1 * 1000);await Task.Delay(TimeSpan.FromSeconds(1));string msg = "messages: " + msg_count.ToString();client.Publish(topic, System.Text.Encoding.UTF8.GetBytes(msg));//将msg通过话题topic发布Console.WriteLine("Send `{0}` to topic `{1}`", msg, topic);msg_count++;_msg = msg;_ = WebSocketClient_Main();}}static void Subscribe(MqttClient client, string topic){client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;//用于将clientMqttMsgPublishReceived方法添加到对象MqttMsgPublishReceived的事件中。当客户端接收到来自订阅主题的消息时,将调用clientMqttMsgPublishReceived方法。client.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });}static void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e){//sender是事件的发送者,即MqttClient对象,e是事件参数,即MqttMsgPublishEventArgs对象。//MqttMsgPublishEventArgs对象包含有关消息的信息:主题、内容、是否重复、质量等级、是否应被保留。string payload = System.Text.Encoding.Default.GetString(e.Message);Console.WriteLine("Received `{0}` from `{1}` topic", payload, e.Topic.ToString());}static async Task Main(){try{string broker = "broker.emqx.io";//默认的云端服务器地址//string broker = "115.156.168.35"; //本地服务器ipint port = 1883;string topic = "Csharp/mqtt";string clientId = Guid.NewGuid().ToString();//生成全局唯一标识符,用于标识客户端ID地址string username = "emqx";string password = "public";MqttClient client = ConnectMQTT(broker, port, clientId, username, password);Subscribe(client, topic);//异步方法,它不会阻塞当前线程。相反,它会返回一个Task对象,表示指定的时间过去后,操作将完成。在等待这个Task对象完成的过程中,线程可以执行其他操作。这使得程序更加响应,因为用户可以与程序进行交互,而不必等待Task.Delay返回。var cancellationTokenSource = new CancellationTokenSource();var publishTask = Task.Run(() => Publish(client, topic), cancellationTokenSource.Token);await publishTask;//Publish(client, topic);}catch (Exception ex){Console.WriteLine("An error occurred: " + ex.Message);}}//WebSocket_Client部分static async Task WebSocketClient_Main(){//连接WebSocket服务器ClientWebSocket webSocket = new ClientWebSocket();string targetURI = "ws://localhost:8080";await webSocket.ConnectAsync(new Uri(targetURI), CancellationToken.None);Console.WriteLine("Connected to server at: " + targetURI);//发送和接收消息byte[] buffer;string message;while (true){#region 注释的是原先手动输入信息内容的代码//Console.Write("Enter a message (type 'exit' to quit): ");//message = Console.ReadLine();//if (message.ToLower() == "exit")//{// break;//}#endregionmessage = _msg;//将消息替换为mqtt处的消息if (isFirst) {lastMsg = message; //将第一次进入函数体的消息初始化为message的内容lastCount = 1;isFirst = false; }buffer = Encoding.UTF8.GetBytes(message);await webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);buffer = new byte[1024];WebSocketReceiveResult result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);string response = Encoding.UTF8.GetString(buffer, 0, result.Count);//将从服务器端收到的数据存储//如果收到的信息重复,则不显示if (string.Equals(lastMsg, message)){dupMsgCount++;}else if ((dupMsgCount != 1) && (dupMsgCount != 0)){lastCount = dupMsgCount;//Console.WriteLine("Received: response - " + response + " " + dupMsgCount + " times. ");Console.WriteLine("Received: response - " + lastMsg + " " + lastCount + " times. ");//记录数据改变时,实际上已经是下一组数据了;所以要记录改变前的数据和次数,对外输出。dupMsgCount = 1;}lastMsg = message;}//关闭WebSocket连接await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);}}
}
5 试验过程
1> 开启mqtt服务器
20230523下载emqx-5.0.25-windows-amd64.zip按照上述方式启动会提示路径有问题。如下图
没有找到解决方法,已改用emqx-4.4.18-otp24.3.4.6-windows-amd64。
进入bin目录下运行DOS窗口,输入emqx start,4.x版本运行成功无提示(5.x版本会有两行代码),任务管理器中进程多一项Erlang,如下。即为成功。
2> 启动WebSocket_Server
等待客户端连接界面:
3> 启动WebSocket_Client
客户端启动并连接成功:
4> 服务端信息
连接成功后服务端界面
客户端意外退出,服务器也不会断开。
6> 后期又修改了一些内容,使控制台输出信息统计次数,如图。
6 补充知识
1.mqtt消息的质量等级
SpringBoot 开发之 MQTT 协议消息质量等级分析_知道的越多,不知道的越多的博客-CSDN博客
qos 是发送方和接收方之间达成的协议,不是发布者和订阅者之间达成的协议。
也就是说发布者 Publish 一条 qos = 1 的消息,只能保证 Broker 能至少收到一次这个消息,至于对应的订阅者能否至少收到一次这个消息,还要取决于订阅者在 Subscribe 的时候和 Broker 协商的 qos 等级。
qos 为1 或者2时,消息会携带 messageId,qos 为 0 时没有。
qos = 0,最多一次送达。也就是消息发出去就直接删除掉,没有后面的事情了。
qos = 1,至少一次送达。发送方消息发出去之后会将消息保存,等待接收方带 messageId 的回应,如果没有回应,就会重新发送,直到回应到达后再把消息删除。
qos = 2,准确一次送达。1 发送方消息发送后保存发送的消息,等待接收方回应,如果没有回应将会再次发送。
2 接收方会记录收到的 messageId,之后再收到相同 messageId 的消息都会被丢弃掉。
3 接收方发送带 messageId 的确认消息,等待发送方的回应,如果没有等到发送方回应,接收方会重复发送确认消息。
4 发送方收到确认消息之后,删除发送的消息,并发送带 messageId 的已确认消息。
5 接收方接收已确认消息后删除确认消息。
7 其他可行链接
用C#和html js文件实现了WebSocket通讯,后续还要考虑如何将变量从网页中取出来https://blog.csdn.net/Shuai_Sir/article/details/127364565?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522168493215516800226547797%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=168493215516800226547797&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~hot_rank-1-127364565-null-null.142%5Ev87%5Econtrol_2,239%5Ev2%5Einsert_chatgpt&utm_term=c%23%20websocket%E9%80%9A%E8%AE%AF&spm=1018.2226.3001.4187