先下载并安装 MQTTX 客户端(官网标题:MQTTX: Your All-in-one MQTT Client Toolbox)。
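使用线上免费的公共 MQTT Broker(例如 EMQ 提供的 The Free Global Public MQTT Broker)。注意:MQTTX 中连接的 Broker 必须与下面 Java 代码中的 brokerUrl 保持一致(示例代码使用的是 broker.hivemq.com:1883),否则 MQTTX 与 Java 程序之间收不到彼此的消息。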
打开 MQTTX,创建连接,点击 NEW SUBSCRIPTION 订阅一个主题,这里使用 test/topic。然后在下方的消息输入区填写同一个主题和 JSON 格式的消息内容,点击发送;如果订阅列表中能收到该消息,说明测试 OK。
订阅者:连接 Broker 并订阅该主题:
package com.jasonhong.application.media.mq.mqtt;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MqttSubscriber implements MqttCallback { public static void main(String[] args) { String brokerUrl = "tcp://broker.hivemq.com:1883"; // 使用公共MQTT代理或你的MQTT代理地址
// String brokerUrl = "tcp://localhost:1883"; // Mosquitto代理地址和端口String clientId = "JavaSubscriber"; String topic = "test/topic"; int qos = 2; try (MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence())) { client.setCallback(new MqttSubscriber()); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); client.connect(connOpts); client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } @Override public void connectionLost(Throwable cause) { System.out.println("Connection lost"); cause.printStackTrace(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Message arrived: " + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("Delivery complete"); }
}
发布者:向该主题发送一条消息:
package com.jasonhong.application.media.mq.mqtt;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MqttPublisher { public static void main(String[] args) { String brokerUrl = "tcp://broker.hivemq.com:1883"; // 使用公共MQTT代理或你的MQTT代理地址
// String brokerUrl = "tcp://localhost:1883"; // Mosquitto代理地址和端口String clientId = "JavaPublisher"; String topic = "test/topic"; int qos = 2; String content = "Hello, MQTT!"; try (MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence())) { MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); client.connect(connOpts); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); client.publish(topic, message); System.out.println("Message published: " + content); } catch (MqttException e) { e.printStackTrace(); } }
}