SpringBoot(Java)实现MQTT连接(本地Mosquitto)通讯调试

1.工作及使用背景

        工作中需要跟收集各种硬件或传感器数据用于Web展示及统计计算分析,如电表、流量计、泵、控制器等物联网设备。

        目前的思路及解决策略是使用力控或者杰控等组态软件实现数据的转储(也会涉及收费问题),通过组态软件自带的转储工具将数据转储到关系型数据库,如MySQL、sqlLite、Postgresql等。然后在BS架构后台程序中通过定时刷数据或者查询时计算的方式进行统计分析计算。

        但上述解决方案实际上是实现简单,但是数据统计时机有潜在的偏差风险,且逻辑设计非常别扭,数据库压力大等问题,理论上应该通过消息队列来接收实时数据参与计算的方式,Web系统只负责展示计算统计之后的结果,这样无论是时效还是数据准确性更容易保证,实时数据存储的数据库压力也不存在(可做数据校验用,也可不用),逻辑也不显别扭。

2.开发环境及工具

JDK1.8、maven、Mosquitto、IDEA、postman

3.框架结构及文件声明

因为我用的现成的框架,所以启动模块和业务模块分开了。实际开发调试中完全可以放一起也没关系。

MqttClientConnectorPool对外提供一个初始化的Mqtt客户端,在服务启动时初始化
MqttMsgSender对外提供一个可以执行消息发送的方法
MqttMsgSubscriber初始化一个Mqtt客户端,并根据配置订阅topic
TestController接收web请求的调用消息发送,用于测试
BusinessApplicationStartup服务启动时执行,调用MqttClientConnectorPool初始化一个客户端并调起MqttMsgSubscriber的监听等待
BusinessApplicationShutdown服务正常终止时调用,关闭服务启动默认创建的Mqtt客户端
MqttBrokerServerSpringBoot服务启动类

4.具体实现逻辑及代码

4.1 maven依赖

<properties><MQTTv3.version>1.2.5</MQTTv3.version>
</properties><dependencyManagement><dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.MQTTv3</artifactId><version>${MQTTv3.version}</version></dependency></dependencies>
</dependencyManagement>或者直接使用
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.MQTTv3</artifactId><version>1.2.5</version>
</dependency>

4.2 MqttClientConnectorPool

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class MqttClientConnectorPool {public static MqttClient mqttClient;/*** 连接MQTT客户端* @return 获取MQTT连队对象*/public static MqttClient connectMQTT() {if (mqttClient != null){log.info("已存在,我深深的脑海!");return mqttClient;}try {// broker及连接信息String broker = "tcp://127.0.0.1:1883";String username = "admin";String password = "123456";String clientId = System.currentTimeMillis() + "";//创建MQTT客户端(指定broker、客户端id、消息持久策略)mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());//创建连接参数配置MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);//连接超时时间options.setKeepAliveInterval(20);//是否自动重连options.setAutomaticReconnect(true);mqttClient.connect(options);log.info("MqttClient 服务启动broker初始化!");} catch (MqttException e){log.error("MqttClient connect Error:{}", e.getMessage());e.printStackTrace();}return mqttClient;}/*** 关闭MQTT客户端* @param client client*/public static void closeClient(MqttClient client){try {// 断开连接client.disconnect();// 关闭客户端client.close();} catch (MqttException e){log.error("MqttClient disconnect or close Error:{}", e.getMessage());e.printStackTrace();}}/*** 关闭MQTT客户端*/public static void closeStaticClient(){try {if (mqttClient != null){// 断开连接mqttClient.disconnect();// 关闭客户端mqttClient.close();}} catch (MqttException e){log.error("MqttClient disconnect or close Error:{}", e.getMessage());e.printStackTrace();}}
}

4.3 MqttMsgSender

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;@Slf4j
public class MqttMsgSender {public void sendMessage(MqttClient client,String topic,String content,int qos){MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);try{client.publish(topic,message);} catch (MqttException e){log.error("MqttClient publish text info Error:{}!", e.getMessage());e.printStackTrace();}}
}

4.4 MqttMsgSubscriber

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class MqttMsgSubscriber {String broker = "tcp://127.0.0.1:1883";String topic = "/deviceUp";String username = "admin";String password = "123456";String clientId = System.currentTimeMillis() + "";int qos = 1;public void readSubscribeTopicMessage(){try {MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());// 连接参数MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);options.setConnectionTimeout(60);options.setKeepAliveInterval(60);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {log.error("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {log.info("topic为: " + topic);log.info("qos为: " + mqttMessage.getQos());log.info("消息内容为: " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {// 当消息被完全传送出去后调用log.info("交付完成 ---Delivery complete!");// 可以在这里处理一些发送完成后的清理工作}});client.connect(options);client.subscribe(topic, qos);} catch (MqttException e){log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage());} catch (Exception e){log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage());}}}

4.5 TestController

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.web.bind.annotation.*;import java.util.List;@Slf4j
@RestController
@RequestMapping()
public class TestController {@GetMapping("/test/mqtt/{msg}")public String testSendMqttMsg(@PathVariable("msg") String msg){log.info("消息内容:{}.", msg);MqttClient mqttClient = MqttClientConnectorPool.connectMQTT();MqttMsgSender sender = new MqttMsgSender();String content = "{" + " \"deviceNo\": \"" + msg + "\"," + " \"val\": 232.5" + "}";String topic = "/deviceUp";int qos = 1;if (null != mqttClient){sender.sendMessage(mqttClient, topic, content, qos);} else {log.info("MqttClient为空,无法发送!");return "失败!";}return "成功!";}}

4.6 BusinessApplicationStartup

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import 包路径(可以删掉这一行手动导入).MqttMsgSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;@Slf4j
@Order(10)
@Component
public class BusinessApplicationStartup implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("MqttClientConnectorPool ===================== Startup");MqttClientConnectorPool.connectMQTT();log.info("MqttClientConnectorPool ===================== recoveryAllJob Over !");log.info("MqttMsgSubscriber ===================== Startup");// 先订阅等待MqttMsgSubscriber subscriber = new MqttMsgSubscriber();subscriber.readSubscribeTopicMessage();}
}

4.7 BusinessApplicationShutdown

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class BusinessApplicationShutdown implements ApplicationListener<ContextClosedEvent> {@Overridepublic void onApplicationEvent(ContextClosedEvent contextClosedEvent) {log.info("服务终止! shutdown hook, ContextClosedEvent");MqttClientConnectorPool.closeStaticClient();}}

4.8 MqttBrokerServer

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;@EnableScheduling
@SpringBootApplication
public class MqttBrokerServer {public static void main(String[] args) {SpringApplication.run(MqttBrokerServer.class, args);}}

5.其他备注

5.1 需要Mqtt(Broker)服务器

        如果是直接使用示例代码的Mqtt服务器(Broker)配置,需要在自己电脑上安装Mqtt服务器,如mosquitto、EMQX等,具体自行搜索,或者使用公用的Mqtt服务器(我没测试试过

// 📢注意,当前Broker本人未测试
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";

5.2 调试地址

如果配置文件没配置[server.servlet.context-path],就不需要我自己/backend这一段

6.参考文章

MQTT协议介绍及Java教程

https://baijiahao.baidu.com/s?id=1801542244354727565&wfr=spider&for=pc

7.喜欢作者

暂无

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/435072.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ -- 异常

C中的异常是用于处理程序执行过程中出现的错误情况。通过异常处理&#xff0c;程序可以在遇到错误时优雅地处理这些问题&#xff0c;而不是直接崩溃。 C语言处理错误的方式 C语言传统的处理错误的方式主要有两种&#xff1a; 终止程序&#xff1a;使用如assert这样的宏来检查…

RTX 5090、5080规格完整曝光,来看来看

近日&#xff0c;科技圈内再掀波澜&#xff0c;有知名博主独家揭秘了英伟达即将推出的RTX 5090与RTX 5080两款高端显卡的详尽规格&#xff0c;预示着显卡市场即将迎来新一轮的性能飞跃与定位分化。 据最新披露的信息&#xff0c;这两款显卡均采用了先进的PG144/145-SKU30 PCB设…

如何借助Java批量操作Excel文件?

最新技术资源&#xff08;建议收藏&#xff09; https://www.grapecity.com.cn/resources/ 前言 | 问题背景 在操作Excel的场景中&#xff0c;通常会有一些针对Excel的批量操作&#xff0c;批量的意思一般有两种&#xff1a; 对批量的Excel文件进行操作。如导入多个Excel文件…

若依--Request.js

编写一个request.js的基本类&#xff0c;封装一些信息&#xff0c;比如请求地址、响应时间、携带的token参数等等。 //创建一个axios实列这里的 import.meta.env.VITE_APP_BASE_API 表示这个基础 URL 的值来自于环境变量。通常&#xff0c;这种做法用于将不同环境&#xff08;…

vue3结合 vue-router和keepalive实现路由跳转保持滚动位置不改变(超级简易清晰)

1.首先我们在路由跳转页面设置keepalive(Seeall是我想实现结果的页面) 2. 想实现结果的页面中如果不是全屏实现滚动而是有单独的标签实现滚动效果

docker - 迁移和备份

文章目录 1、docker commit1.1、查询 容器 docker ps1.2、docker commit zookeeper zookeeper:3.4.13 2、docker save -o2.1、宿主机 切换到 /opt 目录下2.2、将镜像保存到 宿主机/opt目录下 3、docker load -i 对某一个容器修改完毕以后&#xff0c;我们可以把最新的容器部署到…

HTML5实现好看的唐朝服饰网站模板源码2

文章目录 1.设计来源1.1 网站首页1.2 唐装演变1.3 唐装配色1.4 唐装花纹1.5 唐装文化 2.效果和源码2.1 动态效果2.2 源代码 源码下载万套模板&#xff0c;程序开发&#xff0c;在线开发&#xff0c;在线沟通 作者&#xff1a;xcLeigh 文章地址&#xff1a;https://blog.csdn.ne…

Spring Boot实战:构建在线商城系统

1 绪论 1.1 研究背景 当前社会各行业领域竞争压力非常大&#xff0c;随着当前时代的信息化&#xff0c;科学化发展&#xff0c;让社会各行业领域都争相使用新的信息技术&#xff0c;对行业内的各种相关数据进行科学化&#xff0c;规范化管理。这样的大环境让那些止步不前&#…

iLogtail 进化论:重塑可观测采集的技术边界

作者&#xff1a;余韬(迅飞) 采集代理发展回顾 iLogtail 作为一款开创性的轻量级日志采集器&#xff0c;历经 13 载风雨&#xff0c;始终致力于高效地从多元化的数据源中萃取、处理可观测信息&#xff0c;并无缝传输至阿里云日志服务或各类日志分析平台。今年&#xff0c;适逢…

矩阵奇异值

一、ATA 任给一个矩阵A&#xff0c;都有&#xff1a; ATA 为一个对称矩阵 例子&#xff1a;A为一个mn的矩阵&#xff0c;A的转置为一个nm的矩阵 对称矩阵的重要性质如下&#xff1a; ① 对称矩阵的特征值全为实数&#xff08;实数特征根&#xff09; ② 任意一个n阶对称矩阵…

《黑神话:悟空》天命人速通法宝 | 北通鲲鹏20智控游戏手柄评测

《黑神话:悟空》天命人速通法宝 | 北通鲲鹏20智控游戏手柄评测 哈喽小伙伴们好&#xff0c;我是Stark-C~ 截止目前&#xff0c;《黑神话:悟空》已经面世一个多月&#xff0c;不知道还有多少天命人没有通关呢&#xff1f; 作为国内首款真正意义上的3A大作&#xff0c;《黑神话…

实验一 网络基础及仿真模拟软件Packet Tracer 入门

实验一 网络基础及仿真模拟软件Packet Tracer 入门 【实验目的】 一、认识 Packet Tracer 。 二、学习使用 Packet Tracer 进行拓扑的搭建。 三、学习使用 Packet Tracer 对设备进行配置&#xff0c;并进行简单的测试。 【实验内容和结果】 一、拖放设备和布置线缆 二、用…

Redis系列补充:聊聊布隆过滤器(go语言实践篇)

1 介绍 布隆过滤器&#xff08;Bloom Filter&#xff09;是 Redis 4.0 版本之后提供的新功能&#xff0c;我们一般将它当做插件加载到 Redis Service服务器中&#xff0c;给 Redis 提供强大的滤重功能。 它是一种概率性数据结构&#xff0c;可用于判断一个元素是否存在于一个集…

vscode 顶部 Command Center,minimap

目录 vscode 顶部 Command Center 设置显示步骤: minimap设置 方法一:使用设置界面 方法二:使用命令面板 方法三:编辑 settings.json 文件 左侧目录树和编辑器字体不一致: OPEN EDITORS vscode 顶部 Command Center Visual Studio Code (VSCode) 中的 Command Ce…

高胜率TPS交易策略:轻松应对市场波动

原本基于美国经济数据&#xff0c;市场预期美联储不会那么迅速放宽货币政策&#xff0c;然而&#xff0c;最新美联储官员的表态却显著提升了市场对于加速降息的预期。只能说市场果然没有那么好预测呀&#xff0c;作为交易者&#xff0c;咱们只能不断提升自己的技术&#xff0c;…

掌握流程图设计:5款高效流程图软件推荐

在现代办公环境中&#xff0c;流程图制作软件是提高工作效率和组织能力的重要工具。无论是用于项目管理、业务流程优化&#xff0c;还是技术文档编写&#xff0c;流程图都能帮助我们更清晰地理解和传达复杂的信息。然而&#xff0c;面对市面上琳琅满目的流程图制作软件&#xf…

Java零工市场小程序如何改变自由职业者生活

如今&#xff0c;自由职业者越来越多&#xff0c;他们需要找到合适的工作机会&#xff0c;Java零工市场小程序&#xff0c;为自由职业者提供了一个方便、快捷的寻找工作机会的方式&#xff0c;这样一来&#xff0c;改变了自由职业者找寻工作的方式&#xff0c;也提高了他们的收…

【WPF】桌面程序开发之窗口的用户控件详解

使用Visual Studio开发工具&#xff0c;我们可以编写在Windows系统上运行的桌面应用程序。其中&#xff0c;WPF&#xff08;Windows Presentation Foundation&#xff09;项目是一种常见的选择。然而&#xff0c;对于初学者来说&#xff0c;WPF项目中xaml页面的布局设计可能是一…

Type-C接口桌面显示器的优势

随着科技的飞速发展&#xff0c;电子设备的连接性、便捷性和高效性成为了消费者关注的重点。在这个背景下&#xff0c;Type-C接口桌面显示器以其卓越的性能和广泛的兼容性&#xff0c;正逐步成为市场上的主流选择。本文将深入探讨Type-C接口桌面显示器的优势、应用场景、市场现…

【期刊】论文索引库-SCI\SSCI\IE\南大核心\北大核心\CSCD等

外文期刊检索 SCI SCI即《科学引文索引》(Science Citation Index),是由美国科学信息研究所(Institute for Scientific Information)创建于1961年,收录文献的作者、题目、源期刊、摘要、关键词,不仅可以从文献引证的角度评估文章的学术价值,还可以迅速方便地组建研究课…