企业级基于SpringBoot的MQTT的构建和使用

基于SpringBoot的MQTT配置及使用

首先要使用EMQX搭建一个MQTT服务器,参考文档:EMQX快速开始

本着开源分享的观点,闲话不多说,直接上代码

导入Maven

        <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.13</version></dependency>

配置文件

spring:mqtt:#MQTT服务端地址,如果是集群,用逗号隔开url: tcp://localhost:1883#用户名username: root#密码password: 123#clientId代表该服务挂起时的名字,在MQTT服务端clientId不可重复clientId: MqttTest#MQTT默认的消息订阅主题,可配置多个,‘#’为通配符,详情可自己看一下MQTT文档的通配符说明defaultTopic:- server1/#- server2/#
# 默认Qos,关于消息等级可以查看文档关于Qos的不同等级对信息的约束力度,2是最高,有且只接受一次,由于我的项目中对数据要求很高,所以不考虑资源消耗的情况下,我一般采用2,此处有几个默认主题,就要设置几个Qos,按照顺序对应每个主题的等级defaultQos:- 2- 2

配置类

该类为一个Config类,用于接收上一步在application.yml配置文件中配置的配置信息

@Component
@Slf4j
@Getter
@Setter
@ToString
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {private String username;private String password;private String url;private String clientId;private List<String> defaultTopic;private List<Integer> defaultQos;
}

MqttBO类

该类用于构建发送信息的对象

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MqttBO {
//    int qos,boolean retained,String topic,String message
// 发送信息的消息等级,要求高的就是2private Integer qos;// 是否信息保留private Boolean retained;// 发送信息的主题private String topic;// 发送信息的主体private byte[] message;
}

MQTTClient类

由于在springboot项目中,我们只想创建一个单例的Mqttclient进行连接,所以我们创建一个类似于工厂的类,在工厂Bean加载后,创建单例的client,并做连接,订阅,断开等方法的支持,该client加载完成后,在调用时,Spring会自动注入工厂创建连接的client。

/*** @Description MQTT客户端实现工厂,该类主要做工厂生成client,在Bean加载后,创建单例的client。在所有MQTT的Bean中第一位加载* 同时做一些连接,订阅,断开等方法的支持*/
@Slf4j
@Component
public class MyMqttClient {private MqttClient client;@Autowiredprivate MqttProperties mqttProperties;/*** 在Bean加载后,创建单例的client* PostConstruct会在该Bean加载后执行,初始化client*/@PostConstructpublic void init() {try {this.client = new MqttClient(mqttProperties.getUrl(), mqttProperties.getClientId(), new MemoryPersistence());log.info("MQTT客户端初始化成功");} catch (MqttException e) {log.error("MQTT客户端初始化失败: {}", e.getMessage(), e);throw new RuntimeException(e);}}// 连接public synchronized void connect(MqttCallBack mqttCallBack) {try {if (client != null && client.isConnected()) {log.info("发现旧连接,正在断开...");client.disconnectForcibly(); // 强制断开旧连接}MqttConnectOptions options = createConnectOptions();client = new MqttClient(mqttProperties.getUrl(), mqttProperties.getClientId(), new MemoryPersistence());client.setCallback(mqttCallBack);client.connect(options);log.info("MQTT连接成功");} catch (MqttSecurityException e) {log.error("MQTT安全异常: {}", e.getMessage(), e);} catch (MqttPersistenceException e) {log.error("MQTT持久化异常: {}", e.getMessage(), e);} catch (MqttException e) {log.error("MQTT连接失败: {}", e.getMessage(), e);}}// 创建连接选项private MqttConnectOptions createConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());// 保留会话options.setCleanSession(false);// 自动重连,无需在连接断联回调方法中处理重连options.setAutomaticReconnect(true);// 连接超时时间(秒)options.setConnectionTimeout(10);// 心跳间隔options.setKeepAliveInterval(30);// 遗嘱消息options.setWill("willTopic", "客户端已断开".getBytes(), 2, false);return options;}// 订阅public synchronized void subscribe() {try {String[] topics = mqttProperties.getDefaultTopic().toArray(new String[0]);int[] qoses = mqttProperties.getDefaultQos().stream().mapToInt(Integer::valueOf).toArray();client.subscribe(topics, qoses);log.info("订阅主题成功: {}", String.join(", ", topics));} catch (MqttException e) {log.error("订阅主题失败: {}", e.getMessage(), e);}}/*** 发布消息** @param mqttBO 消息对象*/public synchronized void publish(MqttBO mqttBO) {if (mqttBO == null || mqttBO.getTopic() == null || mqttBO.getMessage() == null) {log.warn("发布消息失败: 参数不完整");return;}MqttMessage mqttMessage = new MqttMessage();if (mqttBO.getQos()==null){mqttBO.setQos(2); //默认2}mqttMessage.setQos(mqttBO.getQos());mqttMessage.setRetained(mqttBO.getRetained());mqttMessage.setPayload(mqttBO.getMessage());MqttTopic mqttTopic = client.getTopic(mqttBO.getTopic());try {MqttDeliveryToken token = mqttTopic.publish(mqttMessage);token.waitForCompletion();log.info("消息发布成功: Topic={}, Payload={}", mqttBO.getTopic(), new String(mqttBO.getMessage()));} catch (MqttException e) {log.error("发布消息失败: {}", e.getMessage(), e);}}// 断开连接public synchronized void disConnect() {try {if (client != null && client.isConnected()) {client.disconnect();client.close(); // 确保释放资源log.info("成功断开连接并释放资源");}} catch (MqttException e) {log.error("断开连接失败: {}", e.getMessage(), e);}}// 重新连接public synchronized void reconnect() {try {if (!client.isConnected()) {log.info("尝试重新连接...");client.connect(createConnectOptions());log.info("重新连接成功");}} catch (MqttException e) {log.error("重新连接失败: {}", e.getMessage(), e);}}
}

MQTT回调实现类

要想实现接收到发送到mqtt中的信息,我们需要实现回调接口。

/*** @Description MQTT的回调函数,此处在接收回调里仅作判断,具体逻辑放在mqttService里面*/
@Slf4j
@Component
@DependsOn("myMqttClient")
public class MqttCallBack implements MqttCallbackExtended {// 在回调中,我们的业务逻辑都放在mqttService里面@Autowiredprivate MqttService mqttService;// 注入已有的单例Bean@Autowiredprivate MyMqttClient client;@Autowiredprivate MqttProperties mqttProperties;/*** 客户端断开连接的回调,断开后,mqtt开启了断联自动重连机制,由于在创建连接时,我们开启了自动重连机制,此处无需处理重连,如果有其他需求可以改写*/@Overridepublic void connectionLost(Throwable throwable) {log.info("与服务器断开连接,尝试重新连接...");
//        断开后,mqtt开启了断联自动重连机制,此处无需处理}/*** 接受到信息回调* @param s* @param mqttMessage* @throws Exception*/@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {//    此处我们将接收到的信息传递给mqttService处理,其中s为发送到我们这里的具体地址,MqttMessage为Mqtt返回的对象,但是返回的是字节数组,需要自己转,如果传回的是Json,我们需要转化String message = new String(MqttMessage.getPayload());log.info("接收到信息: Topic={}, Payload={}", s, message);// 然后下一步我们可以用FastJson或者其他的Json工具对接收到的json进行处理}/*** 通知客户端某条消息已经成功发送到 MQTT 服务器并完成交付。此处没有特殊需求无需处理* @param iMqttDeliveryToken*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}/*** 连接成功后回调,由于我们开启了会话保留机制,在断线后会保留会话的信息,但是首次连接需要订阅主题。* @param reconnect* @param serverURI*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {// 首次连接成功时订阅if (!reconnect) {client.subscribe();log.info("首次连接,订阅主题成功");}else{log.info("重新连接成功");}}
}

MQTTConfig类

MQTTConfig类是为了解耦设立,异步连接mqtt,在项目启动时,会调用mqttInitRunner方法,进行连接。

/*** @Description mqttConfig是为了解耦设立,异步连接mqtt*/
@Configuration
@Slf4j
public class MqttConfig {@Autowiredprivate MqttCallBack mqttCallBack;@Autowiredprivate MyMqttClient client;@Beanpublic ApplicationRunner mqttInitRunner() {return args -> {try {client.connect(mqttCallBack);
//                连接后,此处会回调connectComplete方法去进行订阅主题} catch (Exception e) {log.error("MQTT 初始化失败", e);}};}}

如何使用

导入Maven后,将上述代码复制进项目后,直接启动即可,该配置不唯一,根据项目需求可以更改。
在有的需求中,需要对设备去请求信息,这时就要发送到mqtt中请求信息,然后设备订阅mqtt主题,在MqttService中,注入client,使用client.publish(mqttBO)即可发送到mqtt请求信息,设备接收后,返回信息到Mqtt中,接收信息需要在MqttCallBack中的messageArrived方法中处理。

一切的返回都会在MqttCallBack的messageArrived方法中返回,具体逻辑在MqttService中处理,如果返回的是json,需要自己解析。

ps:
关于技术方案的构建,不同业务场景往往存在多种实现路径,本文所述仅为其中一种实践方案。若读者在具体实施过程中遇到技术选型或架构设计方面的疑问,欢迎在评论区留言探讨,笔者将结合过往经验给予针对性建议。

需要特别说明的是,文中代码源自笔者过往工作实践中的项目积累,应用于真实的企业级开发中,现经脱敏和简化处理后开源,以供读者借鉴。
鉴于当前技术生态的演进态势,传统Java技术栈的市场空间逐渐收窄,该篇文章可能是笔者的封笔作。技术浪潮奔涌不息,青山不改,绿水长流,期待与诸位在更广阔的数字化领域相逢,共同见证科技赋能未来的无限可能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/41419.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

K8S学习之基础五十:k8s中pod时区问题并通过kibana查看日志

k8s中pod默认时区不是中国的&#xff0c;挂载一个时区可以解决 vi pod.yaml apiVersion: v1 kind: Pod metadata:name: counter spec:containers:- name: countimage: 172.16.80.140/busybox/busybox:latestimagePullPolicy: IfNotPresentargs: [/bin/sh,-c,i0;while true;do …

创新前沿 | 接管主机即刻增量CDP备份,高效保障接管期间业务安全!

科力锐创新前沿系列 接管主机增量CDP备份 高效保障接管业务安全 当核心系统遭遇系统故障或误操作导致数据逻辑损毁等&#xff0c;往往需要将生产业务主机接管起来&#xff0c;继续对外提供服务&#xff0c;保障业务连续性。 然而&#xff0c;你的接管主机真的安全吗?一旦接…

Android平台毫秒级低延迟HTTP-FLV直播播放器技术探究与实现

一、前言 在移动互联网蓬勃发展的今天&#xff0c;视频播放功能已成为众多Android应用的核心特性之一。面对多样化的视频格式和传输协议&#xff0c;开发一款高效、稳定的视频播放器是许多开发者追求的目标。FLV&#xff08;Flash Video&#xff09;格式&#xff0c;尽管随着H…

STL之list

1. list的介绍和使用 1.1 list的介绍 list是可以在常数范围内在任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。list的底层是带头双向循环链表结构&#xff0c;双向链表中每个元素存储在互不相关的独立节点中&#xff0c;在节点中通过指针指向 其…

26考研——查找_树形查找_二叉排序树(BST)(7)

408答疑 文章目录 三、树形查找二叉排序树&#xff08;BST&#xff09;二叉排序树中结点值之间的关系二叉树形查找二叉排序树的查找过程示例 向二叉排序树中插入结点插入过程示例 构造二叉排序树的过程构造示例 二叉排序树中删除结点的操作情况一&#xff1a;被删除结点是叶结点…

C++异常处理完全指南:从原理到实战

文章目录 异常的基本概念基本异常抛出与捕获多类型异常捕获异常重新抛出异常安全异常规范&#xff08;noexcept&#xff09;栈展开与析构标准库异常总结 异常的基本概念 异常是程序运行时发生的非预期事件&#xff08;如除零、内存不足&#xff09;。C通过try、catch和throw提…

汽车方向盘开关功能测试的技术解析

随着汽车智能化与电动化的发展&#xff0c;方向盘开关的功能日益复杂化&#xff0c;从传统的灯光、雨刷控制到智能语音、自动驾驶辅助等功能的集成&#xff0c;对开关的可靠性、耐久性及安全性提出了更高要求。本文结合北京沃华慧通测控技术有限公司&#xff08;以下简称“慧通…

matplotlib——南丁格尔玫瑰

南丁格尔玫瑰图&#xff08;Nightingale Rose Chart&#xff09;&#xff0c;是一种特殊形式的柱状图&#xff0c;它以南丁格尔&#xff08;Florence Nightingale&#xff09;命名&#xff0c;她在1858年首次使用这种图表来展示战争期间士兵死亡原因的数据。 它将数据绘制在极坐…

【大模型基础_毛玉仁】4.4 低秩适配方法

目录 4.4 低秩适配方法4.4.1 LoRA1&#xff09;方法实现2&#xff09;参数效率 4.4.2 LoRA 变体1&#xff09;打破低秩瓶颈&#xff08;例ReLoRA&#xff09;2&#xff09;动态秩分配&#xff08;例AdaLoRA&#xff09;3&#xff09;训练过程优化&#xff08;例DoRA&#xff09…

融合YOLO11与行为树的人机协作智能框架:动态工效学优化与自适应安全决策

人工智能技术要真正发挥其价值&#xff0c;必须与生产生活深度融合&#xff0c;为产业发展和人类生活带来实际效益。近年来&#xff0c;基于深度学习的机器视觉技术在工业自动化领域取得了显著进展&#xff0c;其中YOLO&#xff08;You Only Look Once&#xff09;算法作为一种…

Java为什么要使用线程池?

前言1.对线程的管理更加的规范化2.降低创建线程和销毁线程的开销 前言 之前对于Java线程池的理解&#xff0c;一直停留在&#xff1a;对于Java中的多线程机制来说&#xff0c;如果不使用线程池的话&#xff0c;线程的使用就会变得杂乱无章。这一步。一直没有深入去理解为什么其…

告别分库分表,时序数据库 TDengine 解锁燃气监控新可能

达成效果&#xff1a; 从 MySQL 迁移至 TDengine 后&#xff0c;设备数据自动分片&#xff0c;运维更简单。 列式存储可减少 50% 的存储占用&#xff0c;单服务器即可支撑全量业务。 毫秒级漏气报警响应时间控制在 500ms 以内&#xff0c;提升应急管理效率。 新架构支持未来…

TDengine 3.3.2.0 集群报错 Post “http://buildkitsandbox:6041/rest/sql“

原因&#xff1a; 初始化时处于内网环境下&#xff0c;Post “http://buildkitsandbox:6041/rest/sql“ 无法访问 修复&#xff1a; vi /etc/hosts将buildkitsandbox映射为本机节点 外网环境下初始化时没有该问题

【Linux】POSIX信号量与基于环形队列的生产消费者模型

目录 一、POSIX信号量&#xff1a; 接口&#xff1a; 二、基于环形队列的生产消费者模型 环形队列&#xff1a; 单生产单消费实现代码&#xff1a; RingQueue.hpp&#xff1a; main.cc&#xff1a; 多生产多消费实现代码&#xff1a; RingQueue.hpp&#xff1a; main.…

【13】Ajax爬取案例实战

目录 一、准备工作 二、爬取目标 三、初步探索&#xff1a;如何判断网页是经js渲染过的&#xff1f; 四、爬取列表页 4.1 分析Ajax接口逻辑 4.2 观察响应的数据 4.3 代码实现 &#xff08;1&#xff09;导入库 &#xff08;2&#xff09;定义一个通用的爬取方法…

嵌入式八股RTOS与Linux---网络系统篇

前言 关于计网的什么TCP三次握手 几层模型啊TCP报文啥的不在这里讲,会单独分成一个计算机网络模块   这里主要介绍介绍lwip和socket FreeRTOS下的网络接口–移植LWIP 实际上FreeRTOS并不自带网络接口,我们一般会通过移植lwip协议栈让FreeRTOS可以通过网络接口收发数据,具体可…

全分辨率免ROOT懒人精灵-自动化编程思维-设计思路-实战训练

全分辨率免ROOT懒人精灵-自动化编程思维-设计思路-实战训练 1.2025新版懒人精灵-实战红果搜索关键词刷视频&#xff1a;https://www.bilibili.com/video/BV1eK9kY7EWV 2.懒人精灵-全分辨率节点识别&#xff08;红果看广告领金币小实战&#xff09;&#xff1a;https://www.bili…

【更新中】【React】基础版React + Redux实现教程(Vite + React + Redux + TypeScript)

本项目是一个在react中&#xff0c;使用 redux 管理状态的基础版实现教程&#xff0c;用简单的案例练习redux的使用&#xff0c;旨在帮助学习 redux 的状态管理机制&#xff0c;包括 store、action、reducer、dispatch 等核心概念。 项目地址&#xff1a;https://github.com/Yv…

【MySQL】从零开始:掌握MySQL数据库的核心概念(四)

人们之所以不愿改变&#xff0c;是因为害怕未知。但历史唯一不变的事实&#xff0c;就是一切都会改变。 前言 这是我自己学习mysql数据库的第四篇博客总结。后期我会继续把mysql数据库学习笔记开源至博客上。 上一期笔记是关于mysql数据库的表格约束&#xff0c;没看的同学可以…

AP 场景架构设计(一) :OceanBase 读写分离策略解析

说明&#xff1a;本文内容对应的是 OceanBase 社区版&#xff0c;架构部分不涉及企业版的仲裁副本功能。OceanBase社区版和企业版的能力区别详见&#xff1a; 官网链接。 概述​ 当两种类型的业务共同运行在同一个数据库集群上时&#xff0c;这对数据库的配置等条件提出了较高…