文章目录
- 前言
- 一、MQTT是什么?
- 二、继承步骤
- 1.安装MQTT
- 2.创建项目,引入依赖
- 3. 对应步骤2的代码
- 3 测试
- 总结
- mqtt 启动后访问地址
前言
随着物联网的火热,MQTT的应用逐渐增多
曾经也有幸使用过mqtt,今天正好总结下MQTT的使用;
一、MQTT是什么?
可以把他理解为,也是一种mq消息,设计简单且轻量级,通讯报文开销小,占用的网络带宽和资源较少,适用于低带宽、不稳定网络环境下的通讯。
MQTT采用发布/订阅模式,分为发布者和订阅者两个角色,需要一个中介来协调发布者和订阅者之间的消息传递,这个中介就是MQTT代理(Broker)。
MQTT协议在物联网领域应用广泛,包括智能家居、工业自动化、智能交通系统等。
个人简单总结:
- 每个客户端可以订阅一个或者多个主题(发消息,收消息)
- 每个客户端不订阅主题,也可以发送主题消息(只接受消息,不发送消息)
- 客户端A发送消息给客户端B流程为:
客户端A>>>Broker>>>客户端B
---
前置条件:
a: 客户端A 发送主题消息,且与客户端B的订阅主题一致
b: 客户端B 订阅主题
二、继承步骤
1.安装MQTT
这里直接采用windows版本,解压版,比较快
- 下载地址 MQTT-windows版本
- 解压后,在bin文件下执行运行命令
.\emqx console
- 访问MQTT管理页面
http://localhost:18083/#/
用户名密码admin/public
2.创建项目,引入依赖
大致分为如下步骤:
- yml配置 主题 用户名 密码
- 根据配置创建客户端实例,实例订阅主题
- 实现
MqttCallback
接口1. 重连处理 connectionLost 2. 消息接受处理 messageArrived 3. 消息发生成功处理 deliveryComplete
- 根据客户端信息发送某个主题的消息
3. 对应步骤2的代码
- yml配置
server:port: 8081
# 下面这里要看你自己的需求
customer:mqtt:broker: tcp://127.0.0.1:1883clientList:#发布客户端ID- clientId: nxys_service#监听主题 同时订阅多个主题使用 - 分割开subscribeTopic: mqtt/publish#用户名userName: admin#密码password: public#接受客户端ID- clientId: receive_service#监听主题 同时订阅多个主题使用 - 分割开subscribeTopic: mqtt/receive#用户名userName: admin#密码password: public
- 实例信息获取
/*** Mqtt配置类*/
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {/*** mqtt broker地址*/String broker;/*** 需要创建的MQTT客户端*/List<MqttClient> clientList;
}
/*** MQTT客户端*/
@Data
public class MqttClient {/*** 客户端ID*/private String clientId;/*** 监听主题*/private String subscribeTopic;/*** 用户名*/private String userName;/*** 密码*/private String password;
}
- 根据信息创建实例,订阅主题
/*** MQTT客户端创建*/
@Component
@Slf4j
public class MqttClientCreate {@Resourceprivate MqttClientManager mqttClientManager;@Autowiredprivate MqttConfig mqttConfig;/*** 创建MQTT客户端*/@PostConstructpublic void createMqttClient() {List<MqttClient> mqttClientList = mqttConfig.getClientList();for (MqttClient mqttClient : mqttClientList) {log.info("{}", mqttClient);//创建客户端,客户端ID:demo,回调类跟客户端ID一致mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());}}
}
/*** MQTT客户端管理类,如果客户端非常多后续可入redis缓存*/
@Slf4j
@Component
public class MqttClientManager {@Value("${customer.mqtt.broker}")private String mqttBroker;@Resourceprivate MqttCallBackContext mqttCallBackContext;/*** 存储MQTT客户端*/public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();public static MqttClient getMqttClientById(String clientId) {return MQTT_CLIENT_MAP.get(clientId);}/*** 创建mqtt客户端** @param clientId 客户端ID* @param subscribeTopic 订阅主题,可为空* @param userName 用户名,可为空* @param password 密码,可为空* @return mqtt客户端*/public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(mqttBroker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();if (null != userName && !"".equals(userName)) {connOpts.setUserName(userName);}if (null != password && !"".equals(password)) {connOpts.setPassword(password.toCharArray());}connOpts.setCleanSession(true);if (null != subscribeTopic && !"".equals(subscribeTopic)) {AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);if (null == callBack) {callBack = mqttCallBackContext.getCallBack("default");}callBack.setClientId(clientId);callBack.setConnectOptions(connOpts);client.setCallback(callBack);}//连接mqtt服务端brokerclient.connect(connOpts);// 订阅主题if (null != subscribeTopic && !"".equals(subscribeTopic)) {if (subscribeTopic.contains("-"))client.subscribe(subscribeTopic.split("-"));else
// if (!subscribeTopic.equals("mqtt/receive")){client.subscribe(subscribeTopic);}}MQTT_CLIENT_MAP.putIfAbsent(clientId, client);} catch (MqttException e) {log.error("Create mqttClient failed!", e);}}
}
- 实现
MqttCallback
接口
/*** MQTT回调抽象类*/
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {private String clientId;private MqttConnectOptions connectOptions;public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public MqttConnectOptions getConnectOptions() {return connectOptions;}public void setConnectOptions(MqttConnectOptions connectOptions) {this.connectOptions = connectOptions;}/*** 失去连接操作,进行重连** @param throwable 异常*/@Overridepublic void connectionLost(Throwable throwable) {try {if (null != clientId) {if (null != dconnectOptions) {MqttClientManager.getMqttClientById(clientId).connect(connectOptions);} else {MqttClientManager.getMqttClientById(clientId).connect();}}} catch (Exception e) {log.error("{} reconnect failed!", e);}}/*** 接收订阅消息* @param topic 主题* @param mqttMessage 接收消息* @throws Exception 异常*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {String content = new String(mqttMessage.getPayload());handleReceiveMessage(topic, content);}/*** 消息发送成功** @param iMqttDeliveryToken toke*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info("消息发送成功");}/*** 处理接收的消息* @param topic 主题* @param message 消息内容*/protected abstract void handleReceiveMessage(String topic, String message);
}
/*** 默认回调*/
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {/*** @param topic 主题* @param message 消息内容*/@Overrideprotected void handleReceiveMessage(String topic, String message) {log.info("接收到主题---{}", topic);log.info("接收到消息---{}", message);// 你自己的消息处理业务}
}
/*** MQTT订阅回调环境类*/
@Component
@Slf4j
public class MqttCallBackContext {private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();/*** 默认构造函数** @param callBackMap 回调集合*/public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {this.callBackMap.clear();this.callBackMap.putAll(callBackMap);}/*** 获取MQTT回调类** @param clientId 客户端ID* @return MQTT回调类*/public AbsMqttCallBack getCallBack(String clientId) {return this.callBackMap.get(clientId);}
}
- 发送消息
@RestController
public class SendController {@ResourceMqttClientManager mqttClientManager;@RequestMapping("/sendMessage")public String sendMessage(String topic){try {MqttMessage mqttMessage = new MqttMessage("你好".getBytes());mqttClientManager.getMqttClientById("nxys_service").publish(topic,mqttMessage);return "发送成功";} catch (Exception e) {e.printStackTrace();return "发送失败";}}
}
3 测试
- 启动订阅,查看MQTT 管理页面
- 测试发送消息,查看发送情况,接受情况
http://localhost:8081/sendMessage?topic=mqtt/receive
总结
文中涉及的所有代码: MQTT-Demo
mqtt 启动后访问地址
http://localhost:18083/#/
- 用户名/密码:
- admin/public
- 每个客户端可以订阅一个或者多个主题
- 每个客户端不订阅主题,也可以发送主题消息
- 客户端A发送消息给客户端B流程为:
客户端A>>>Broker>>>客户端B
---
前置条件:
a: 客户端A 发送主题消息,且与客户端B的订阅主题一致
b: 客户端B 订阅主题
mqtt启动命令
在bin目录下,cmd 执行
.\emqx console