基于SpringBoot的MQTT配置及使用
首先要使用EMQX搭建一个MQTT服务器,参考文档:EMQX快速开始
本着开源分享的观点,闲话不多说,直接上代码
导入Maven
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.13</version></dependency>
配置文件
spring:mqtt:#MQTT服务端地址,如果是集群,用逗号隔开url: tcp://localhost:1883#用户名username: root#密码password: 123#clientId代表该服务挂起时的名字,在MQTT服务端clientId不可重复clientId: MqttTest#MQTT默认的消息订阅主题,可配置多个,‘#’为通配符,详情可自己看一下MQTT文档的通配符说明defaultTopic:- server1/#- server2/#
# 默认Qos,关于消息等级可以查看文档关于Qos的不同等级对信息的约束力度,2是最高,有且只接受一次,由于我的项目中对数据要求很高,所以不考虑资源消耗的情况下,我一般采用2,此处有几个默认主题,就要设置几个Qos,按照顺序对应每个主题的等级defaultQos:- 2- 2
配置类
该类为一个Config类,用于接收上一步在application.yml配置文件中配置的配置信息
@Component
@Slf4j
@Getter
@Setter
@ToString
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {private String username;private String password;private String url;private String clientId;private List<String> defaultTopic;private List<Integer> defaultQos;
}
MqttBO类
该类用于构建发送信息的对象
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MqttBO {
// int qos,boolean retained,String topic,String message
// 发送信息的消息等级,要求高的就是2private Integer qos;// 是否信息保留private Boolean retained;// 发送信息的主题private String topic;// 发送信息的主体private byte[] message;
}
MQTTClient类
由于在springboot项目中,我们只想创建一个单例的Mqttclient进行连接,所以我们创建一个类似于工厂的类,在工厂Bean加载后,创建单例的client,并做连接,订阅,断开等方法的支持,该client加载完成后,在调用时,Spring会自动注入工厂创建连接的client。
/*** @Description MQTT客户端实现工厂,该类主要做工厂生成client,在Bean加载后,创建单例的client。在所有MQTT的Bean中第一位加载* 同时做一些连接,订阅,断开等方法的支持*/
@Slf4j
@Component
public class MyMqttClient {private MqttClient client;@Autowiredprivate MqttProperties mqttProperties;/*** 在Bean加载后,创建单例的client* PostConstruct会在该Bean加载后执行,初始化client*/@PostConstructpublic void init() {try {this.client = new MqttClient(mqttProperties.getUrl(), mqttProperties.getClientId(), new MemoryPersistence());log.info("MQTT客户端初始化成功");} catch (MqttException e) {log.error("MQTT客户端初始化失败: {}", e.getMessage(), e);throw new RuntimeException(e);}}// 连接public synchronized void connect(MqttCallBack mqttCallBack) {try {if (client != null && client.isConnected()) {log.info("发现旧连接,正在断开...");client.disconnectForcibly(); // 强制断开旧连接}MqttConnectOptions options = createConnectOptions();client = new MqttClient(mqttProperties.getUrl(), mqttProperties.getClientId(), new MemoryPersistence());client.setCallback(mqttCallBack);client.connect(options);log.info("MQTT连接成功");} catch (MqttSecurityException e) {log.error("MQTT安全异常: {}", e.getMessage(), e);} catch (MqttPersistenceException e) {log.error("MQTT持久化异常: {}", e.getMessage(), e);} catch (MqttException e) {log.error("MQTT连接失败: {}", e.getMessage(), e);}}// 创建连接选项private MqttConnectOptions createConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());// 保留会话options.setCleanSession(false);// 自动重连,无需在连接断联回调方法中处理重连options.setAutomaticReconnect(true);// 连接超时时间(秒)options.setConnectionTimeout(10);// 心跳间隔options.setKeepAliveInterval(30);// 遗嘱消息options.setWill("willTopic", "客户端已断开".getBytes(), 2, false);return options;}// 订阅public synchronized void subscribe() {try {String[] topics = mqttProperties.getDefaultTopic().toArray(new String[0]);int[] qoses = mqttProperties.getDefaultQos().stream().mapToInt(Integer::valueOf).toArray();client.subscribe(topics, qoses);log.info("订阅主题成功: {}", String.join(", ", topics));} catch (MqttException e) {log.error("订阅主题失败: {}", e.getMessage(), e);}}/*** 发布消息** @param mqttBO 消息对象*/public synchronized void publish(MqttBO mqttBO) {if (mqttBO == null || mqttBO.getTopic() == null || mqttBO.getMessage() == null) {log.warn("发布消息失败: 参数不完整");return;}MqttMessage mqttMessage = new MqttMessage();if (mqttBO.getQos()==null){mqttBO.setQos(2); //默认2}mqttMessage.setQos(mqttBO.getQos());mqttMessage.setRetained(mqttBO.getRetained());mqttMessage.setPayload(mqttBO.getMessage());MqttTopic mqttTopic = client.getTopic(mqttBO.getTopic());try {MqttDeliveryToken token = mqttTopic.publish(mqttMessage);token.waitForCompletion();log.info("消息发布成功: Topic={}, Payload={}", mqttBO.getTopic(), new String(mqttBO.getMessage()));} catch (MqttException e) {log.error("发布消息失败: {}", e.getMessage(), e);}}// 断开连接public synchronized void disConnect() {try {if (client != null && client.isConnected()) {client.disconnect();client.close(); // 确保释放资源log.info("成功断开连接并释放资源");}} catch (MqttException e) {log.error("断开连接失败: {}", e.getMessage(), e);}}// 重新连接public synchronized void reconnect() {try {if (!client.isConnected()) {log.info("尝试重新连接...");client.connect(createConnectOptions());log.info("重新连接成功");}} catch (MqttException e) {log.error("重新连接失败: {}", e.getMessage(), e);}}
}
MQTT回调实现类
要想实现接收到发送到mqtt中的信息,我们需要实现回调接口。
/*** @Description MQTT的回调函数,此处在接收回调里仅作判断,具体逻辑放在mqttService里面*/
@Slf4j
@Component
@DependsOn("myMqttClient")
public class MqttCallBack implements MqttCallbackExtended {// 在回调中,我们的业务逻辑都放在mqttService里面@Autowiredprivate MqttService mqttService;// 注入已有的单例Bean@Autowiredprivate MyMqttClient client;@Autowiredprivate MqttProperties mqttProperties;/*** 客户端断开连接的回调,断开后,mqtt开启了断联自动重连机制,由于在创建连接时,我们开启了自动重连机制,此处无需处理重连,如果有其他需求可以改写*/@Overridepublic void connectionLost(Throwable throwable) {log.info("与服务器断开连接,尝试重新连接...");
// 断开后,mqtt开启了断联自动重连机制,此处无需处理}/*** 接受到信息回调* @param s* @param mqttMessage* @throws Exception*/@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {// 此处我们将接收到的信息传递给mqttService处理,其中s为发送到我们这里的具体地址,MqttMessage为Mqtt返回的对象,但是返回的是字节数组,需要自己转,如果传回的是Json,我们需要转化String message = new String(MqttMessage.getPayload());log.info("接收到信息: Topic={}, Payload={}", s, message);// 然后下一步我们可以用FastJson或者其他的Json工具对接收到的json进行处理}/*** 通知客户端某条消息已经成功发送到 MQTT 服务器并完成交付。此处没有特殊需求无需处理* @param iMqttDeliveryToken*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}/*** 连接成功后回调,由于我们开启了会话保留机制,在断线后会保留会话的信息,但是首次连接需要订阅主题。* @param reconnect* @param serverURI*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {// 首次连接成功时订阅if (!reconnect) {client.subscribe();log.info("首次连接,订阅主题成功");}else{log.info("重新连接成功");}}
}
MQTTConfig类
MQTTConfig类是为了解耦设立,异步连接mqtt,在项目启动时,会调用mqttInitRunner方法,进行连接。
/*** @Description mqttConfig是为了解耦设立,异步连接mqtt*/
@Configuration
@Slf4j
public class MqttConfig {@Autowiredprivate MqttCallBack mqttCallBack;@Autowiredprivate MyMqttClient client;@Beanpublic ApplicationRunner mqttInitRunner() {return args -> {try {client.connect(mqttCallBack);
// 连接后,此处会回调connectComplete方法去进行订阅主题} catch (Exception e) {log.error("MQTT 初始化失败", e);}};}}
如何使用
导入Maven后,将上述代码复制进项目后,直接启动即可,该配置不唯一,根据项目需求可以更改。
在有的需求中,需要对设备去请求信息,这时就要发送到mqtt中请求信息,然后设备订阅mqtt主题,在MqttService中,注入client,使用client.publish(mqttBO)即可发送到mqtt请求信息,设备接收后,返回信息到Mqtt中,接收信息需要在MqttCallBack中的messageArrived方法中处理。
注
一切的返回都会在MqttCallBack的messageArrived方法中返回,具体逻辑在MqttService中处理,如果返回的是json,需要自己解析。
ps:
关于技术方案的构建,不同业务场景往往存在多种实现路径,本文所述仅为其中一种实践方案。若读者在具体实施过程中遇到技术选型或架构设计方面的疑问,欢迎在评论区留言探讨,笔者将结合过往经验给予针对性建议。
需要特别说明的是,文中代码源自笔者过往工作实践中的项目积累,应用于真实的企业级开发中,现经脱敏和简化处理后开源,以供读者借鉴。
鉴于当前技术生态的演进态势,传统Java技术栈的市场空间逐渐收窄,该篇文章可能是笔者的封笔作。技术浪潮奔涌不息,青山不改,绿水长流,期待与诸位在更广阔的数字化领域相逢,共同见证科技赋能未来的无限可能。