1、引入依赖
<!--MQTT start--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.4</version></dependency><!--MQTT end--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency>
2、增加yml配置
spring:mqtt:username: testpassword: testurl: tcp://127.0.0.1:8080subClientId: singo_sub_client_id_888 #订阅 客户端idpubClientId: singo_pub_client_id_888 #发布 客户端idconnectionTimeout: 30keepAlive: 60
3、资源配置类
@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {private String username;private String password;private String url;private String subClientId;private String pubClientId;private int connectionTimeout;private int keepAlive;
}
注意启动类需要增加注解
@EnableConfigurationProperties(MqttConfigurationProperties.class)
4、MQTT配置类
@Configuration
public class MqttConfig {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties;/*** 连接参数** @return*/@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttConfigurationProperties.getUsername());options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});options.setConnectionTimeout(mqttConfigurationProperties.getConnectionTimeout());options.setKeepAliveInterval(mqttConfigurationProperties.getKeepAlive());options.setCleanSession(true); // 设置为false以便断线重连后恢复会话options.setAutomaticReconnect(true);return options;}/*** 连接工厂** @param options* @return*/@Beanpublic MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(options);return factory;}/*** 消息输入通道* 每次只有一个消息处理器可以消费消息。* 当前消息的处理完成之前,新消息需要排队等待,无法并行处理。* 默认是:单线程、顺序执行的* @return*/// @Bean// public DirectChannel mqttInputChannel() {// return new DirectChannel();// }/*** 支持多线程并发处理消息的输入通道** @return*/@Beanpublic ExecutorChannel mqttInputChannel() {return new ExecutorChannel(Executors.newFixedThreadPool(10)); // 线程池大小可以调整}/*** 配置入站适配器** @param mqttClientFactory* @return*/@Beanpublic MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter(MqttPahoClientFactory mqttClientFactory) {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getSubClientId(), mqttClientFactory);// adapter.addTopic("pub/300119110099"); 订阅主题,也可以放在初始化动态配置adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** 配置消息处理器** @return*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel") // 指定通道public MessageHandler messageHandler() {return new MqttReceiverMessageHandler();}}
5、消息处理器配置
@Slf4j
@Component
public class MqttReceiverMessageHandler implements MessageHandler {@Autowiredprivate MqttMessageProcessingService mqttMessageProcessingService;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {MessageHeaders headers = message.getHeaders();log.info("线程名称:{},收到消息,主题:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload());// log.info("收到消息主题:{}", headers.get("mqtt_receivedTopic").toString());// log.info("收到消息:{}", message.getPayload());// 消息保存到内存队列里面,定时批量入库,也可以在这里直接入库mqttMessageProcessingService.addMessage(message.getPayload().toString());}
}
6、消息主题缓存对象
@Component
public class MqttTopicStore {private final ConcurrentHashMap<String, String> topics = new ConcurrentHashMap<>();public ConcurrentHashMap<String, String> getTopics() {return topics;}
}
7、动态订阅数据库主题配置
@Slf4j
@Component
public class MqttInit {@Autowiredprivate MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;@Autowiredprivate MqttTopicStore mqttTopicStore;@PostConstructpublic void init() {subscribeAllTopics();}public void subscribeAllTopics() {// List<MqttTopicConfig> topics = topicConfigMapper.findAllEnabled();// for (MqttTopicConfig topic : topics) {// subscribeTopic(topic);// }log.info("===================>从数据库里获取并初始化订阅所有主题");List<String> topics = ListUtil.list(false, "pub/300119110099", "pub1/3010230209810018992", "pub1/30102302098100");topics.stream().forEach(t -> {messageDrivenChannelAdapter.addTopic(t);// 同时往MqttTopicStore.topics中增加一条记录用于缓存});}}
8、消息处理服务
@Service
public class MqttMessageProcessingService {@Autowiredprivate MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;@Autowiredprivate MqttTopicStore mqttTopicStore;// 内存队列,用于暂存消息private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();// 添加消息到队列public void addMessage(String message) {messageQueue.add(message);}/*** 可以放到定时任务里面去,注入后取队列方便维护* 定时任务,每5秒执行一次 ,建议2分钟一次 理想的触发间隔应略小于数据到达间隔,以确保及时处理和插入* 如果每 5 分钟收到一条数据,可以设置任务执行周期为4 分钟或更短,以便任务有足够的时间处理数据,同时减少积压的可能性。*/@Scheduled(fixedRate = 1 * 60 * 1000)public void batchInsertToDatabase() {System.out.println("定时任务执行中,当前队列大小:" + messageQueue.size());List<String> batch = new ArrayList<>();messageQueue.drainTo(batch, 500); // 一次性取最多500条消息if (!batch.isEmpty()) {// 批量插入数据库saveMessagesToDatabase(batch);}}private void saveMessagesToDatabase(List<String> messages) {// 假设这是批量插入逻辑System.out.println("批量插入数据库,条数:" + messages.size());for (String message : messages) {System.out.println("插入消息:" + message);}// 实际数据库操作代码}/*** 订阅与取消订阅定时任务*/public void subscribeAndUnsubscribeTask() {// 从数据库获取所有主题,正常状态、删除状态// 正常状态:判断mqttTopicStore.topics中是否存在,不存在则订阅,并在mqttTopicStore.topics中增加// 删除状态: 判断mqttTopicStore.topics中是否存在,存在则取消订阅,并在mqttTopicStore.topics中删除// messageDrivenChannelAdapter.addTopic(t);}
}
以上是简单的对接步骤,部分类、方法可以根据实际情况进行合并处理!!!!
9、定时任务
@Slf4j
@Configuration
@EnableScheduling
public class MqttJob {@Value("${schedule.enable}")private boolean enable;@Autowiredprivate MqttMessageProcessingService mqttMessageProcessingService;/*** 定时订阅与取消订阅主题,从共享主题对象MqttTopicStore里面取出主题列表,然后进行订阅或取消订阅* 每分钟一次*/public void subscribeAndUnsubscribe() {if (!enable) return;mqttMessageProcessingService.subscribeAndUnsubscribeTask();}/*** 定时处理队列里面的订阅消息,会有丢失风险,宕机时会丢失队列里面的消息* 每分钟一次 要考虑一次消息处理的时间;也可先不使用队列,每次收到消息直接实时入库,有性能问题时在启用*/public void batchSaveSubscribeMessage() {}}