网页端HTML使用MQTTJs订阅RabbitMQ数据

  最近在做一个公司的日志组件时有一个问题难住了我。今天问题终于解决了。由于在解决问题中,在网上也查了很多资料都没有一个完整的实例可以参考。所以本着无私分享的目的记录一下完整的解决过程和实例。

  需求:做一个统一日志系统可以查看日志列表和一个可以订阅最新日志的页面。通过提供一个封装好日志记录方法的sdk文件将日志统一收集。

  通过上面的需求进行我们使用RabbitMQ+Mongodb来实现系统。

  使用C#封装一个SDK大家都会这里就不说了。C#连接RabbitMQ示例代码也是一堆堆的也没什么好说的。下面重点说一下网页端如何使用JS去订阅RabbitMQ收到的最新日志信息。

  后端都是使用RabbitMQ的AMQP协议,而前端要求在网页HTML上显示数据。我们选择了使用MQTT协议从RabbitMQ中订阅数据。

  具体步骤:

1、先准备好相关JS库。MQTT有一个叫browserMqtt.js看名字就知道是为浏览器提供的JS库。还有一个封装了操作MQ的JS库 mqfactory.js。最后还要一个jquery.js文件。这样工具就准备好了。JS文件下载

2、HTML端代码。

<script type="text/javascript" src="~/js/MqJs/jquery.js"></script>
<script type="text/javascript" src="~/js/MqJs/browserMqtt.min.js"></script>
<script type="text/javascript" src="~/js/MqJs/mqfactory.js"></script>
<body><div><lable>Host: </lable><input id="txtHost" placeholder="192.168.1.88" value="10.1.0.7" /><br /><lable>Port: </lable><input id="txtPort" placeholder="15675" value="15675" /><br /><label>UserName: </label><input id="txtUserName" placeholder="username" value="admin" /><br /><label>Password: </label><input id="txtPassword" placeholder="password" value="admin" /><br /><label>Protocol: </label><input id="txtProtocol" placeholder="ws" value="ws" /><br /><input id="btnConnect" type="button" value="Connect RabbitMQ" /></div><div><input id="btnSubscribe" type="button" value="Subscribe" /><input id="btnPublish" type="button" value="Publish" /><br /><input id="btnSSHuanjing" type="button" value="Subscribe Huanjing" /><input id="hdnIsSubscribed" type="hidden" value="" /><input id="btnPubHuanjing" type="button" value="Publish Huanjing"><br />路由:<input id="btnRoutingKey" type="text" value="Dcon/Logs/Client"><br /><input id="txtMessage" type="text" placeholder="Please enter message" /></div><div><label>log:</label><br /><ul id="lstLog"></ul><input id="btnClearLog" type="button" value="Clear Log" /></div>
</body>
<script type="text/javascript">$(function () {var mqclient;//var routingKey = 'Dcon.Logs.ServerWebShow';var message;$('#btnSubscribe').attr('disabled', 'disabled');$('#btnPublish').attr('disabled', 'disabled');$('#btnSSHuanjing').attr('disabled', 'disabled');$('#btnPubHuanjing').attr('disabled', 'disabled');$('#btnConnect').click(function () {var mqttOpts = {host: (() => $('#txtHost').val())(),port: (() => $('#txtPort').val())(),username: (() => $('#txtUserName').val())(),password: (() => $('#txtPassword').val())(),//transformWsUrl方法用于在浏览器中使用MQTT的场景,默认情况下,MQTT自动生成的url为ws://ip:port形式,//然而服务器要求的格式是ws://ip:port/ws,所以MQTT提供了此接口用于在生成url时自定义url格式transformWsUrl: (url, opts, client) => { return opts.protocol && opts.protocol == 'ws' ? url + 'ws' : url; },clientId: (() => { return 'mqttjs_' + Math.random().toString(16).substr(2, 8); })()};var biz = {huanjing: function (handler, isOn) {if (isOn !== false) {this.ss(this.topics.huanjing, handler);} else {this.sus(this.topics.huanjing, handler);}},topics: {huanjing: '/hyj/huanjing/monitor'}};//系统初始化时注入连接选项mqfactory.inject(mqttOpts, biz);//创建mqclient单例 mqclient = mqfactory.create();//注册mqclient的连接成功事件mqclient.on('connect', mqconnected);});$('#btnSubscribe').click(function () {if ($(this).val() == 'Subscribe') {//订阅成功后,仅注册一次事件(要考虑每次注册事件时,事件处理器调用的次数,如果仅用一次,就用once方法)//routingKey = $("#btnRoutingKey").val();mqclient.once('onss', mqSubscribeSuccess);//简单订阅mqclient.ss($("#btnRoutingKey").val());} else {mqclient.once('onsus', mqUnsubscribeSuccess)mqclient.sus($("#btnRoutingKey").val());}});$('#btnPublish').click(function () {var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();if (message === msg) {msg = guid();}message = msg;$('#txtMessage').val(message);//发送消息mqclient.pub($("#btnRoutingKey").val(), message);$('#lstLog').append('<li>Send Message: ' + message + '</li>');});$('#btnSSHuanjing').click(function () {if ($(this).val() == 'Subscribe Huanjing') {mqclient.once('onss', mqHJSubscribeSuccess);mqclient.huanjing(onHuanjingMessageArrived);} else {mqclient.once('onsus', mqHJUnsubscribeSuccess);mqclient.huanjing(onHuanjingMessageArrived, false);}});$('#btnPubHuanjing').click(function () {var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();if (message === msg) {msg = guid();}message = msg;$('#txtMessage').val(message);//发送消息mqclient.pub(mqclient.topics.huanjing, message);$('#lstLog').append('<li>Send Huanjing Message: ' + message + '</li>');});$('#btnClearLog').click(function () {$('#lstLog').empty();});function mqconnected() {//alert("mqconnected");$('#btnSubscribe').removeAttr('disabled');$('#btnPublish').removeAttr('disabled');$('#btnSSHuanjing').removeAttr('disabled');$('#btnPubHuanjing').removeAttr('disabled');$('#lstLog').append('<li>mqclient connected</li>');}function mqSubscribeSuccess() {//订阅成功,就注册接受消息的方法,此处要接收多次,因此使用了onmqclient.on($("#btnRoutingKey").val(), onMessageArrived);$('#btnSubscribe').val('Unsubscribe');$('#lstLog').append('<li>Subscribe successful.' + $("#btnRoutingKey").val()+'</li>');}function mqUnsubscribeSuccess() {//注销订阅,所以将事件处理器解除绑定mqclient.off($("#btnRoutingKey").val(), onMessageArrived);$('#btnSubscribe').val('Subscribe');$('#lstLog').append('<li>Unsubscribe successful</li>');}function mqHJSubscribeSuccess() {$('#btnSSHuanjing').val('Unsubscribe Huanjing');$('#lstLog').append('<li>Hanjing Subscribe successful</li>');}function mqHJUnsubscribeSuccess() {$('#btnSSHuanjing').val('Subscribe Huanjing');$('#lstLog').append('<li>Huanjing Unsubscribe successful</li>');}function onMessageArrived(message) {$('#lstLog').append('<li>Receive message: ' + new Date().toString() + '    ' + message.toString() + '</li>');}function onHuanjingMessageArrived(message) {$('#lstLog').append('<li>Receive Huanjing message: ' + new Date().toString() + '    ' + message.toString() + '</li>');}function guid() {function s4() {return Math.floor((1 + Math.random()) * 0x10000).toString(16).substring(1);}return s4() + s4() + '-' + s4() + '-' + s4() + '-' +s4() + '-' + s4() + s4() + s4();}});
</script>

3.后端代码:

3.1客户端sdk代码

/// <summary>/// 写日志/// </summary>/// <param name="model"></param>public static void Write(LogModel model){//判断写入的日志级别if (model != null && model.LogLevel >= LogLevel){try{var mqMsg = new MqMessage(){MessageBody = JSON.Serialize(model),MessageRouter = SystemConst.RoutingKeyTopic.LogTopic_Producer};//MQHelper.Instance.ProducerMessage_Fanout(mqMsg);MQHelper.Instance.ProducerMessage_Topic(mqMsg);}catch (Exception ex){var errorLog = string.Format("Ip:{0},LogHelper.Write方法异常,{1}", IpHelper.LocalHostIp, ex.Message);//MQHelper.Instance.ProducerMessage_Fanout(new MqMessage() { MessageBody = errorLog });MQHelper.Instance.ProducerMessage_Topic(new MqMessage() { MessageBody = errorLog });}}}

3.2后端MQ代码:

#region 主题 交换机/// <summary>/// 生产者 客户端调用/// </summary>/// <param name="msg"></param>public void ProducerMessage_Topic(MqMessage msg){try{using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){var body = Encoding.UTF8.GetBytes(msg.MessageBody);channel.BasicPublish(exchange: SystemConst.MqName_LogMq_TopicDefault,routingKey: msg.MessageRouter,basicProperties: null,body: body);Console.WriteLine(" [x] Sent {0}", msg.MessageBody);}}}catch (Exception ex){var exMsg = ex.Message;}}/// <summary>/// 消费者 服务器接收并写入数据库/// 消费方法无法通过参数传入/// EventHandler<BasicDeliverEventArgs> received/// </summary>public void ConsumeMessage_Topic(params string[] routingKeys){if (routingKeys == null || routingKeys.Length == 0){throw new Exception("请指定接收路由");}using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){var queueName = channel.QueueDeclare().QueueName;//获得已经生成的随机队列名//对列与交换机绑定foreach (var rKey in routingKeys){channel.QueueBind(queue: queueName,exchange: SystemConst.MqName_LogMq_TopicDefault,routingKey: rKey);}var consumer = new EventingBasicConsumer(channel);//绑定消费方法consumer.Received += consomer_Received_Topic;//绑定消费者channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine("日志订阅服务启动成功.");Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}}/// <summary>/// 接收通知服务异步的推送/// </summary>/// <param name="sender"></param>/// <param name="e"></param>void consomer_Received_Topic(object sender, BasicDeliverEventArgs e){var body = e.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] {0}", message);
//这里可以增加写入数据库的代码}#endregion

3.3路由

/// <summary>/// 主题路由/// </summary>public class RoutingKeyTopic{/// <summary>/// 生产者/// </summary>public const string LogTopic_Producer = "Dcon.Logs.Client";/// <summary>/// 消息者_日志服务_保存日志/// </summary>public const string LogTopic_Consume_Server_SaveDB = "Dcon.Logs.*";/// <summary>/// 消息者_日志服务_Web显示日志/// </summary>public const string LogTopic_Consume_Server_WebShow = "Dcon.Logs#";//".Logs.Client";/// <summary>/// 消息者_日志服务_Web显示日志/// </summary>public const string LogTopic_Consume_Server_WebShow_T = "*.Logs.Client";//".Logs.Client";/// <summary>/// 消息者_日志服务_ # 接收所有/// </summary>public const string LogTopic_Consume_Server_All = "#";//".Logs.Client";}}

注意点:

1、MQTT的路由是以 / 来分割的。在RabbitMQ中会被转义成 . 如示例中的路由Dcon/Logs/Client会被转换成 Dcon.Logs.Client

2、网页端接收时的路由要和发送端的路由一至。也就是说 后端用 Dcon.Logs.Client 来推数据前端就要使用 Dcon/Logs/Client来接收数据。

3、MQTT路由不支持通配符.

4、由于MQTT的JS库没有提供Topic交换机与路由绑定功能。所以前端接收时 不能设置订阅主题交换机名称。如果要和amqp交互只能使用amqp的默认主题交换机名称 amq.topic

运行效果图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/303379.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BUUCTF:BUU UPLOAD COURSE 1[WriteUP]

构造一句话PHP木马 <?php eval(system($_POST[shell])); ?> 利用eval函数解析$shell的值使得服务器执行system命令 eval函数是无法直接执行命令的&#xff0c;只能把字符串当作php代码解析 这里我们构造的木马是POST的方式上传&#xff0c;那就用MaxHacKBar来执行 …

Lua热更新(AssetBundle)

AssetBundle 新版本导入ab包报错,则删除其中的Tests文件夹。 给资源分组 打包设置:平台、路径、重复打包清空文件夹、复制到streaming文件夹 建议勾选 建议使用LZ4压缩方式 用来观察文件中的包大小,不常用 参数总结: 这六个只做了解,重要的是上面的

基于Dell 3930 RACK服务器的RAID1配置

**背景&#xff1a;**项目上使用的Dell 3930 RACK服务器需要配置RAID1冗余备份功能&#xff0c;设置比较简单&#xff0c;此处也做个记录&#xff0c;以免忘记。 步骤&#xff1a; 1、重启服务器&#xff0c;启动过程中按F12&#xff0c;进入设置界面 2、先选中进入BIOS Setup…

MT3021 拦截罪犯

思路&#xff1a;用二分&#xff0c;每次二分间距&#xff0c;判断需要的组数是否>k。 #include <bits/stdc.h> using namespace std; const int N 1e5 10; int L, n, k; int a[N];bool check(int p) { // 看此时的间距所用的警力数满不满足<kint cnt 0;for (in…

苹果商店审核指南:确保Flutter应用顺利通过审核的关键步骤

引言 Flutter是一款由Google推出的跨平台移动应用开发框架&#xff0c;其强大的性能和流畅的用户体验使其备受开发者青睐。然而&#xff0c;开发一款应用只是第一步&#xff0c;将其成功上架到苹果商店才是实现商业目标的关键一步。本文将详细介绍如何使用Flutter将应用程序上…

【随笔】Git 高级篇 -- 项目里程碑 git tag(二十)

&#x1f48c; 所属专栏&#xff1a;【Git】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &#x1f496; 欢迎大…

ElasticSearch分词检索

1. 倒排索引&#xff1a;表示一种数据结构&#xff0c;分词词条与文档id集合的隐射关系 2. 它跟关系型数据库是一种互补的关系&#xff0c;因为关系型数据库支持事务操作&#xff0c;满足ACID原则 3. 索引库的文档字段只允许新增不允许修改 1.创建索引库 put /索引库名称2.1 …

xss.pwnfunction-Ah That‘s Hawt

<svg/onloadalert%26%2340%3B1%26%2341%3B> <svg/>是一个自闭合形式 &#xff0c;当页面或元素加载完成时&#xff0c;onload 事件会被触发&#xff0c;从而可以执行相应的 JavaScript 函数

视频插针调研

视频插针 1、评估指标2、准确度3、实时4、视频流处理3、实时RIFE视频插帧测试 1、评估指标 参考&#xff1a;https://blog.csdn.net/weixin_43478836/article/details/104159648 https://blog.csdn.net/weixin_43605641/article/details/118088814 PSNR和SSIM PSNR数值越大表…

【springboot开发】Gradle VS Maven

前言&#xff1a; java构建工具的主要作用是依赖管理和构建的生命周期管理。gradle和maven是目前java中最流行的两个构建工具&#xff0c;springboot常用maven&#xff0c;Android studio使用gradle。 目录 1. 简介2. Maven2.1 安装2.2 依赖管理2.3 构建生命周期管理 3. Gradle…

Utilize webcam to capture photo with camera

1. Official Guide& my github Official course my github 2. Overcome Webcam js Error in Chrome: Could not access webcam link 直接把代码拷贝到本机的下述目录下 To ignore Chrome’s secure origin policy, follow these steps. Navigate to chrome://flags/#un…

大语言模型如何工作?

此为观看视频How Large Language Model works的笔记。 GPT&#xff08;Generative Pre-trained Transformer&#xff09;是一个大语言模型&#xff08;LLM&#xff09;&#xff0c;可以生成类似人类的文本。本文阐述&#xff1a; 什么是LLMLLM如何工作LLM的应用场景 什么是…

基于JSP的网上订餐系统

第一章 绪论 1.1课题背景与意义 自新世纪以来&#xff0c;我国经济发生翻天覆地的变化。中国经济发展迎来空前巨大的机遇与挑战&#xff0c;世界性的发展交流在这三十年较近四十年的时间中整体性上升发展&#xff0c;东西文化的碰撞&#xff0c;不断为国民经济的发展注入新鲜…

通信分类3G,4G,5G,通信专用名词

Generation: 2G: GSM全名为&#xff1a;Global System for Mobile Communications&#xff0c;中文为全球移动通信系统&#xff0c;俗称"全球通"&#xff0c;是一种起源于欧洲的移动通信技术标准&#xff0c;是第二代移动通信技术 3G&#xff1a;WCDMA 4G&#xff1a…

计算机视觉——基于傅里叶幅度谱文档倾斜度检测与校正

概述 在计算机视觉领域&#xff0c;处理文档数据时&#xff0c;OCR算法的性能往往会受到文档的倾斜度影响。如果文档在输入到模型之前没有经过恰当的校正&#xff0c;模型就无法期待模型能够提供准确的预测结果&#xff0c;或者模型预测的精度会降低。例如&#xff0c;在信息提…

Day106:代码审计-PHP原生开发篇文件安全上传监控功能定位关键搜索1day挖掘

目录 emlog-文件上传&文件删除 emlog-模板文件上传 emlog-插件文件上传 emlog-任意文件删除 通达OA-文件上传&文件包含 知识点&#xff1a; PHP审计-原生开发-文件上传&文件删除-Emlog PHP审计-原生开发-文件上传&文件包含-通达OA emlog-文件上传&文件…

基于YOLOv8的摄像头下铁路工人安全作业检测系统

&#x1f4a1;&#x1f4a1;&#x1f4a1;本文摘要&#xff1a;基于YOLOv8的铁路工人安全作业检测系统&#xff0c;属于小目标检测范畴&#xff0c;并阐述了整个数据制作和训练可视化过程&#xff0c; 博主简介 AI小怪兽&#xff0c;YOLO骨灰级玩家&#xff0c;1&#xff0…

【计算机考研】408网课汇总+资源分享

王道的四件套无疑是大多数同学的首选。相比其他课程来说&#xff0c;也是属于市面上最好的408课程了。 从今年的难度来看选择题部分和计网&#xff0c;比起往年来看是有很多偏题&#xff0c;大题除了计网的冷门外&#xff0c;其他倒是中规中矩。总体来看24考研的408难度是非常…

Vue - 4( 8000 字 Vue 入门级教程)

一&#xff1a; Vue 初阶 1.1 关于不同版本的 Vue Vue.js 有不同版本&#xff0c;如 vue.js 与 vue.runtime.xxx.js&#xff0c;这些版本主要针对不同的使用场景和需求进行了优化&#xff0c;区别主要体现在以下几个方面&#xff1a; 完整版 vs 运行时版&#xff1a; vue.js&…

软件可靠性基本概念_1.定义和定量描述

1.软件可靠性定义 软件可靠性&#xff08;Software Reliability&#xff09;是软件产品在规定的条件下和规定的时间区间完成规定功能的能力。规定的条件是指直接与软件运行相关的使用该软件的计算机系统的状态和软件的输入条件&#xff0c;或统称为软件运行时的外部输入条件&am…