ActiveMQ + MQTT 集群搭建(虚机版本) + Springboot使用配置

文章目录

  • 前言
  • 一、ActiveMQ、 MQTT是什么?
    • 1.ActiveMQ介绍
    • 2.MQTT介绍
  • 二、集群搭建步骤
    • 1.下载apache-activemq-5.15.12-bin.tar.gz
    • 2.上传apache-activemq-5.15.12-bin.tar.gz到服务器并解压文件到文件夹clusters、master、slave三个文件夹下面形成三个节点(平等节点)
    • 3.分别修改三个节点的配置文件:activemq.xml,修改连接协议端口号和配置网络代理,修改如下:
    • 4.分别修改三个节点的WEB配置文件:jetty.xml,修改端口号,修改如下:
    • 5.分别到bin目录下运行activemq文件启动activemq,命令如下:
    • 6.全部节点启动后,进入web端界面查询集群配置情况:
  • 三、项目连接配置步骤
    • 1.Activemq连接服务配置:
    • 2.MQTT生产者连接配置:
    • 3.MQTT消费者连接配置:
    • 4.项目使用:
  • 总结


前言

随着技术的不断迭代,在分布式系统中应用消息组件进行通信已经是非常常见的方式,而为了保障消息中间件的高可用性就需要对中间件进行集群化部署,这是应用程序发展的必经之路。


一、ActiveMQ、 MQTT是什么?

1.ActiveMQ介绍

ActiveMQ官网
ActiveMQ是一个开源的、基于Java的消息中间件(Message Oriented Middleware,MOM)实现。它提供了可靠的异步消息传递的功能,用于在分布式系统中进行应用程序之间的通信。

以下是ActiveMQ的一些主要特点和功能:

1.1、 异步消息传递:ActiveMQ支持发布-订阅和点对点模式的消息传递。应用程序可以通过发送和接收消息来进行异步通信。

1.2、持久化和持久订阅:ActiveMQ可以将消息持久化到磁盘,以确保即使在消息发送者和接收者之间的断开连接或重启后,消息也能被正确接收。

1.3、 多种消息传递模式:ActiveMQ支持多种消息传递模式,包括点对点队列、主题订阅和点对点回复等。

1.4、基于JMS标准:ActiveMQ完全支持Java消息服务(JMS)规范,是JMS的一种实现。JMS提供了一系列的API和协议,用于在Java应用程序之间进行消息传递。

1.5、高可用性和故障转移:ActiveMQ支持故障转移和高可用性,可以通过配置多个broker实现自动故障转移和消息备份。

1.6、多种协议支持:ActiveMQ支持多种协议,如AMQP、STOMP、OpenWire、MQTT等。这使得ActiveMQ可以与不同的客户端和应用程序进行集成和通信。

1.7、 插件体系结构:ActiveMQ具有可扩展的插件体系结构,允许开发人员根据需求添加自定义功能和扩展。

1.8、可视化管理工具:ActiveMQ提供了可视化的管理界面,用于监控和管理消息队列、主题、连接等。

作为一种成熟而强大的消息中间件解决方案,ActiveMQ被广泛用于构建可靠的分布式系统、实现异步通信、实现解耦和提高应用程序的可伸缩性等场景。

2.MQTT介绍

MQTT(Message Queuing Telemetry Transport)是一种轻量级、开放、简单的消息传输协议,专门针对物联网(IoT)领域设计。它具有低带宽和低功耗的特点,适用于在资源受限的设备上进行可靠的通信。

以下是MQTT协议的一些关键特点:

1.1、 轻量级:MQTT协议设计简单,通信报文开销小,传输数据量较小,适用于带宽有限的网络环境,能够满足物联网设备的资源限制。

1.2、发布/订阅模式:MQTT采用发布/订阅模式,包含发送消息的发布者和接收消息的订阅者。发布者将消息发布到特定的主题上,而订阅者通过订阅感兴趣的主题来接收消息。

1.3、QoS支持:MQTT支持三种不同的服务质量(QoS)级别:QoS 0(至多一次传输)、QoS 1(至少一次传输)和QoS 2(恰好一次传输)。这种级别的支持确保了消息的可靠性和传递保证。

1.4、消息保留:MQTT支持在特定主题上保留最新的消息。这意味着当订阅者订阅一个主题时,它将立即接收到最新的保留消息,而不仅仅是实时发送的消息。

1.5、心跳机制:MQTT协议定义了心跳机制,通过发送心跳报文,保持客户端和代理服务器之间的连接有效性。如果客户端长时间没有发送心跳,代理服务器将断开连接。

1.6、安全性支持:MQTT提供了基于TLS/SSL的加密和身份验证机制,以确保消息的机密性和安全性。

1.7、广泛的应用:MQTT广泛应用于物联网领域,例如传感器网络、远程监测、智能家居、工业自动化等。

MQTT协议的轻量级和简单性使得它成为连接大量设备和传输数据的理想选择,尤其是在资源受限的物联网环境中。它以其可靠性、灵活性和互通性在物联网行业得到了广泛应用。


二、集群搭建步骤

1.下载apache-activemq-5.15.12-bin.tar.gz

官网下载地址

2.上传apache-activemq-5.15.12-bin.tar.gz到服务器并解压文件到文件夹clusters、master、slave三个文件夹下面形成三个节点(平等节点)

注意:三个节点是公平节点。一开始我是想做集群节点clusters做数据分发,然后master和slave做主从的。后面发现存在问题,在资源较少情况下clusters、clusters、slave都为单节点的情况下,clusters一挂掉,集群关系就破裂了,没有节点给master和slave做数据分发了,这样的配置不友好。
于是我就把三个节点配置成了平等节点,任何节点宕机都能正常运行。

3.分别修改三个节点的配置文件:activemq.xml,修改连接协议端口号和配置网络代理,修改如下:

3.1、配置默认的传输协议OpenWire 和 支持硬件的传输协议MQTT;
3.2、配置网络代理networkConnectors,做节点间数据传输;
3.3、duplex设置为true,则一个连接上可以双向流动消息(双工连接),默认值为false,默认情况下,在两个提供者之间的连接上的消息流动方向是单向(单工连接);
3.4、修改三个节点的brokerName为localhost_clusters、localhost_master、localhost_slave;

<transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/></transportConnectors><!-- 配置网络代理,cluster 节点需要与 master 跟 slave 进行穿透 --><networkConnectors><networkConnector name="network-clusters" uri="static:(tcp://192.168.10.41:61617,tcp://192.168.10.41:61618)" duplex="true" /></networkConnectors><transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:2884?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/></transportConnectors><!-- 配置网络代理,master 节点需要与 cluster 跟 slave 进行穿透 --><networkConnectors><networkConnector name="network-master" uri="static:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61618)" duplex="true" /></networkConnectors><transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:2885?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/></transportConnectors><!-- 配置网络代理,slave 节点需要与 master 跟 cluster 进行穿透 --><networkConnectors><networkConnector name="network-slave" uri="static:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61617)" duplex="true" /></networkConnectors>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_clusters" dataDirectory="${activemq.data}"><broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_master" dataDirectory="${activemq.data}"><broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_slave" dataDirectory="${activemq.data}">

4.分别修改三个节点的WEB配置文件:jetty.xml,修改端口号,修改如下:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"><!-- the default port number for the web console --><property name="host" value="0.0.0.0"/><property name="port" value="8161"/>
</bean><bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"><!-- the default port number for the web console --><property name="host" value="0.0.0.0"/><property name="port" value="8171"/>
</bean><bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"><!-- the default port number for the web console --><property name="host" value="0.0.0.0"/><property name="port" value="8181"/>
</bean>

5.分别到bin目录下运行activemq文件启动activemq,命令如下:

sh activemq start

6.全部节点启动后,进入web端界面查询集群配置情况:

6.1、进入web端界面http://192.168.10.41:8161、http://192.168.10.41:8171、http://192.168.10.41:8181,登录账号密码admin/admin,到Network查看是否有另外两个节点的连接情况,若有另外两个节点的连接信息并且Remote Address为true,则集群建立完毕;

6.2、图片如下:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述


三、项目连接配置步骤

1.Activemq连接服务配置:

ActiveMQ连接配置开箱即用

failover是一种连接URL配置选项,用于指定多个ActiveMQ broker的连接地址。当一个broker发生故障或不可用时,客户端会自动尝试连接配置中的其他broker。以此机制来实现多节点的集群连接模式。

spring:activemq:broker-url: failover:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61617,tcp://192.168.10.41:61618)user: adminpassword: adminpool:enabled: truepackages:trust-all: true

2.MQTT生产者连接配置:

注意:MQTT的连接配置是我自定义的,在项目里有相关代码配合使用;

mqtt:brokers: tcp://192.168.10.41:2884,tcp://192.168.10.41:2885,tcp://192.168.10.41:1883clientIds: dig-producer1,dig-producer2qos: 1userName: adminpassword: admin

3.MQTT消费者连接配置:

注意:MQTT的连接配置是我自定义的,在项目里有相关代码配合使用;

mqtt:topics: V5008Upload/#,V6800Upload/#qoss: 1,2brokers: tcp://192.168.10.41:2884,tcp://192.168.10.41:2885,tcp://192.168.10.41:1883clientIds: dig-consumer1,dig-consumer2userNames: adminwords: admin

4.项目使用:

4.1、ActiveMQ配置使用
activeMQ配置使用比较简单,也不是本文的重点,简单贴一点代码


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;@Configuration
public class ActiveMqConfig {@Value("${spring.activemq.broker-url}")private String brokerUrl;/*** 队列模式(消息将按顺序一个一个地被消费,每个消息只能被一个消费者接收)*/@Beanpublic JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {// SimpleJmsListenerContainerFactory适用于JMS 1.1规范// 消息监听容器工厂SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();// 关闭事务factory.setSessionTransacted(false);// 手动确认消息factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);// 设置监听容器工厂的发布订阅域为队列模式,即采用点对点消息传递模式factory.setPubSubDomain(false);factory.setConnectionFactory(activeMQConnectionFactory);return factory;}/*** 配置名字为givenConnectionFactory的连接工厂** @return*/@Bean("givenConnectionFactory")public ActiveMQConnectionFactory connectionFactory() throws JMSException {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);// 自定义消费重试机制RedeliveryPolicy policy = new RedeliveryPolicy();// 消息处理失败重新处理次数,默认为5次policy.setMaximumRedeliveries(5);// 启用指数退避策略,以延长每次重试的间隔时间policy.setUseExponentialBackOff(Boolean.TRUE);// 设置初始重试延迟时间为0毫秒,意味着消息处理失败时立即进行重试policy.setInitialRedeliveryDelay(0);// 设置每次重试之间的延迟时间为3秒policy.setRedeliveryDelay(3000L);// 设置指数退避的增加倍数,每次重试的延迟时间将按比例增加policy.setBackOffMultiplier(2);// 设置最大重试延迟时间为20秒policy.setMaximumRedeliveryDelay(20000L);factory.setRedeliveryPolicy(policy);Connection connection = factory.createConnection();connection.start();return factory;}//    /**
//     *  发布-订阅模式(消息会被广播给所有订阅该主题的消费者)
//     */
//    @Bean("topicListener")
//    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory givenConnectionFactory) {
//        // 设置为发布订阅模式, 默认情况下使用生产消费者方式
//        // DefaultJmsListenerContainerFactory 2.0规范
//        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//        bean.setSessionTransacted(false);
//        bean.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
//        bean.setPubSubDomain(true);
//        bean.setConnectionFactory(givenConnectionFactory);
//        return bean;
//    }}
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;@Service
public class ActivimqProducer {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;/*** 发送队列模式** @param queueName* @param message*/public void sendMqQueue(String queueName, String message) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);}}
import com.test.common.enums.QueueType;
import com.test.local.mqtt.process.EquipmentAssetsProcess;
import com.test.local.mqtt.process.MqttDataProcessing;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;import javax.jms.Session;@Slf4j
@Component
public class ActivimqConsumer {@Autowiredprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;@Autowiredprivate EquipmentAssetsProcess equipmentAssetsProcess;@JmsListener(destination = QueueType.LABEL_STATE, containerFactory = "jmsListenerContainerQueue")public void consumerLabelState(ActiveMQMessage activeMQMessage, String message, Session session) {if (StringUtils.isNotEmpty(message)) {threadPoolTaskExecutor.execute(new MqttDataProcessing(equipmentAssetsProcess,message,QueueType.LABEL_STATE,activeMQMessage,session));}}
}
import com.test.fastjson.JSON;
import com.test.common.constants.Constants;
import com.test.common.enums.PatternStatusEnum;
import com.test.common.enums.QueueType;
import com.test.common.redis.RedisCache;
import com.test.entity.TagInfo;
import com.test.local.entity.*;
import com.test.service.RegionCheckRecordService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.beans.factory.annotation.Autowired;import javax.jms.Session;
import java.util.UUID;@Slf4j
public class MqttDataProcessing implements Runnable {@Autowiredprivate RedisCache redisCache;@Autowiredprivate RegionCheckRecordService regionCheckRecordService;private String topic;private String message;private EquipmentAssetsProcess equipmentAssetsProcess;private ActiveMQMessage activeMQMessage;private Session session;public MqttDataProcessing(EquipmentAssetsProcess equipmentAssetsProcess,String message,String topic,ActiveMQMessage activeMQMessage,Session session) {this.topic = topic;this.message = message;this.equipmentAssetsProcess = equipmentAssetsProcess;this.activeMQMessage = activeMQMessage;this.session = session;}@SneakyThrows@Overridepublic void run() {String logId = UUID.randomUUID().toString().replace("-", "");try {if (QueueType.LABEL_STATE.equals(topic)) {  LabelState labelState = JSON.parseObject(message, LabelState.class); if (labelState.getData() != null && labelState.getData().size() > 0) {equipmentAssetsProcess.processLabelState(labelState, logId);}activeMQMessage.acknowledge();}} catch (Exception e) {// 重发session.recover();log.error("异常,重新消费!logId={},topic={},message={}", logId, topic, message, e);}}}

4.2、MQTT配置使用
对于MQTT的分布式我是这么理解的:

在消费端,同时连接多个节点进行消费,硬件发送的消息定义一个唯一id,此时会有ABC三个消费者等待硬件发送过来的消息,于是使用redisson的分布式锁lock.tryLock来限制消息只被消费一次。

在生产端,同时连接多个节点进行消息发送,因为我们的硬件只能连接到一个节点上面(硬件不能支持多节点代理消费)在一个节点宕机后才会去尝试连接备选节点,所有我们对所有节点都发送消息,保证该消息能被硬件接收并消费到,另外两个节点多发送的消息也不会造成问题(直接无视了),因为硬件同一时刻只能连接一个节点进行消费。

消费者配置

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Repository;@RefreshScope
@Repository
@Data
public class ConfigMqtt {private String[] topics = new String[]{"test1Upload/#","test2Upload/#","test3Upload/#"};// @Value("${mqtt.qoss}")private int[] qoss = new int[]{2,2,2};@Value("${mqtt.brokers}")private String[] brokers;@Value("${mqtt.clientIds}")private String[] clientIds;@Value("${mqtt.userNames}")private String userNames;@Value("${mqtt.words}")private String words;}
import com.alibaba.fastjson.JSON;
import com.test.common.redis.RedisCache;
import com.test.config.ConfigMqtt;
import com.test.util.HexConvert;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.TimeUnit;/*** 订阅者:订阅硬件mqtt主题信息,消费硬件发送消息,转换硬件消息发送到ActiveMQ队列,最终到其他微服务处理ActiveMQ队列的消息*/
@Slf4j
@Service
public class MqttSubscription {@Autowiredprivate ConfigMqtt configMqtt;@Autowiredprivate SubscriptionJSON subscriptionJSON;@Autowiredprivate SubscriptionHEX subscriptionHEX;@Resourceprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;@Resourceprivate RedissonClient redissonClient;@Resourceprivate RedisCache redisCache;@Beanpublic void client() throws Exception {String[] hosts = configMqtt.getBrokers();String[] clientIds = configMqtt.getClientIds();// 多个for (int i = 0; i < hosts.length; i++) {String host = hosts[i];String clientId = clientIds[i];try {InetAddress ip4 = Inet4Address.getLocalHost();clientId = clientId + "-" + ip4.getHostAddress();} catch (UnknownHostException e) {log.error("MqttSubscription-client-configMqtt" + configMqtt);log.error("MqttSubscription-client-e" + e);}String finalClientId = clientId;threadPoolTaskExecutor.execute(() -> this.myClient(host, finalClientId));}}private void myClient(String host, String clientId) {try {String[] topics = configMqtt.getTopics();int[] qos = configMqtt.getQoss();String userName = configMqtt.getUserNames();String passWord = configMqtt.getWords();// host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存MqttClient client = new MqttClient(host, clientId, new MemoryPersistence());// MQTT的连接设置MqttConnectOptions options = new MqttConnectOptions();// todo:ch:设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,断线重连会消费断线期间的消息options.setCleanSession(true);// 设置连接的用户名options.setUserName(userName);// 设置连接的密码options.setPassword(passWord.toCharArray());// 设置超时时间 单位为秒options.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制options.setKeepAliveInterval(20);// 自动重连options.setAutomaticReconnect(true);// todo:ch:设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false);// 设置回调函数client.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {while (true) {try {client.connect(options);client.subscribe(topics, qos);break;} catch (Exception e) {e.printStackTrace();log.error("mqtt客户端id-clientId:" + clientId);log.error("mqtt连接异常-e", e);log.error("mqtt连接异常-cause" + cause);try {Thread.sleep(5000);} catch (InterruptedException ex) {ex.printStackTrace();}}}}public void messageArrived(String topic, MqttMessage message) throws Exception {String key = "MQTT-";String uuid = "";// 消息存在if (message.getPayload().length > 0) {byte[] req = message.getPayload();if (topic.contains("test1") && configMqtt.getDataFormat().equals("JSON")) {String data = new String(req);Map<String, Object> map = JSON.parseObject(data);if (map.containsKey("uuid") && map.get("uuid") != null) {uuid = map.get("uuid").toString();}else {uuid = clientId;}} else if (topic.contains("test1") && configMqtt.getDataFormat().equals("HEX")) {String data = HexConvert.convertStringToHex(req);uuid = data.subSequence(data.length() - 8, data.length()).toString();} else if (topic.contains("test2") || topic.contains("test3")) {String data = HexConvert.convertStringToHex(req);uuid = data.subSequence(data.length() - 8, data.length()).toString();}if (!Strings.isNullOrEmpty(uuid)) {key += uuid;}}// 分布式锁,防止多应用节点产生重复消息RLock lock = redissonClient.getLock(key);try {// 加锁,等待30秒锁自动释放, 不在finally手动释放了,给予30秒的缓冲时间boolean resultLock = lock.tryLock(0, 30, TimeUnit.SECONDS);if (resultLock) {String data = new String(message.getPayload());log.info("mqtt-clientId:" + clientId);log.info("mqtt-key:" + key);
//                            log.info("message-ID:" + message.getId());log.info("messageArrived-topic" + topic);log.info("messageArrived-message" + data);if (topic.contains("test1") && configMqtt.getDataFormat().equals("JSON")) {// 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费......} else if (topic.contains("test1") && configMqtt.getDataFormat().equals("HEX")) {// 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费......} else if (topic.contains("test2") || topic.contains("test3")) {// 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费......}}} catch (Exception e) {e.printStackTrace();log.error("mqtt客户端id-clientId:" + clientId);log.error("mqtt发布信息异常-e", e);log.error("mqtt发布信息异常-topic" + topic);log.error("mqtt发布信息异常-message" + message.toString());}
//                    finally {
//                        if (lock.isLocked()) {
//                            if (lock.isHeldByCurrentThread()) {
//                                lock.unlock();
//                            }
//                        }
//                    }}public void deliveryComplete(IMqttDeliveryToken token) {}});// todo:是否需要永久重新连接,能否设定固定重连次数    或者固定多少秒重连一次(类似心跳机制)int retryCount = 0;while (!client.isConnected()) {try {Thread.sleep(getBackoffTime(retryCount));client.connect(options);client.subscribe(topics, qos);} catch (Exception e) {log.error("Reconnect attempt failed", e);retryCount++;}}} catch (MqttException e) {log.error("mqtt客户端id-clientId:" + clientId);log.error("mqtt连接错误:", e);}}private long getBackoffTime(int retryCount) {// 使用指数退避算法计算重连时间long waitTime = Math.min(1000 * (1 << retryCount), 60000); // 最大等待时间为60秒return waitTime;}
}

生产者配置

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Repository;@RefreshScope
@Repository
@Data
public class ConfigMqtt {@Value("${mqtt.brokers}")private String[] brokers;@Value("${mqtt.clientIds}")private String[] clientIds;@Value("${mqtt.qos}")private int qos;@Value("${mqtt.userName}")private String userName;@Value("${mqtt.password}")private String password;}
import com.test.local.config.ConfigMqtt;
import com.test.local.utils.HexConvert;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;/***  MQTT生产者,生产消息发送到硬件*/@Slf4j
@Service
public class MqttConnect {private volatile static MqttClient mqttClientSingleton;private volatile static List<MqttClient> mqttClientSingletonList = new ArrayList<>();@Autowiredprivate ConfigMqtt configMqtt;@Resourceprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;private MqttConnect() {}/*** 创建 多个 mqtt 实例*/public static List<MqttClient> getMoreInstance(ConfigMqtt configMqtt) {String[] hosts = configMqtt.getBrokers();String[] clientIds = configMqtt.getClientIds();// 创建多个节点连接实例if (mqttClientSingletonList == null || mqttClientSingletonList.size() < hosts.length) {if (mqttClientSingletonList != null && !mqttClientSingletonList.isEmpty() && mqttClientSingletonList.size() < hosts.length) {mqttClientSingletonList.forEach(re -> {try {re.disconnect();re.close();} catch (Exception e) {}});// 清除原有实例mqttClientSingletonList.clear();}// 多个for (int i = 0; i < hosts.length; i++) {String clientIdRe = clientIds[i];String broker = hosts[i];String userName = configMqtt.getUserName();String password = configMqtt.getPassword();StringBuffer clientId = new StringBuffer();try {InetAddress ip4 = Inet4Address.getLocalHost();clientId.append(clientIdRe).append("-").append(ip4.getHostAddress()).append("-").append(HexConvert.getStringRandom(11));} catch (UnknownHostException e) {log.error("MqttClient-getInstance-e" + e);}MemoryPersistence persistence = new MemoryPersistence();synchronized (MqttConnect.class) {MqttClient mqttClient = null;try {// 创建客户端mqttClient = new MqttClient(broker, clientId.toString(), persistence);// 创建链接参数MqttConnectOptions connOpts = new MqttConnectOptions();// 在重新启动和重新连接时记住状态connOpts.setCleanSession(true);// 设置连接的用户名connOpts.setUserName(userName);connOpts.setPassword(password.toCharArray());// 建立连接mqttClient.connect(connOpts);mqttClientSingletonList.add(mqttClient);} catch (MqttException me) {log.error("reason " + me.getReasonCode());log.error("msg " + me.getMessage());log.error("loc " + me.getLocalizedMessage());log.error("cause " + me.getCause());log.error("excep " + me);log.error("发送连接mqtt异常" + me);try {mqttClient.disconnect();mqttClient.close();} catch (Exception e) {}//将 mqtt 置空mqttClient = null;me.printStackTrace();}}}}return mqttClientSingletonList;}/*** 发布消息给硬件*/public void publish(String version, String gateway, String content) {StringBuffer topic = new StringBuffer();topic.append(version).append("Download/").append(gateway);int qos = configMqtt.getQos();// mqtt多节点消息发送 -- 每个节点都发送一份消息让硬件消费List<MqttClient> clientList = MqttConnect.getMoreInstance(configMqtt);if (!clientList.isEmpty()) {clientList.forEach(client -> {threadPoolTaskExecutor.execute(() -> {try {// 创建消息MqttMessage message = new MqttMessage(content.getBytes());// 设置消息的服务质量message.setQos(qos);log.info("发送消息到MQTT供硬件消费");log.info("================client:"+client.getClientId());log.info("================topic:"+topic);log.info("================message:"+message);// 发布消息client.publish(topic.toString(), message);} catch (MqttException me) {log.error("reason " + me.getReasonCode());log.error("msg " + me.getMessage());log.error("loc " + me.getLocalizedMessage());log.error("cause " + me.getCause());log.error("excep " + me);log.error("发送连接mqtt异常" + me);}});});}}
}

总结

近期有时间总结了一下前段时间搭建ActiveMQ + MQTT集群并且在微服务中使用的流程。经此,牛马小陈同学巩固了中间件和分布式概念知识。MQTT的分布式使用是出于自己对分布式的理解然后手写的,目前能正常进行分布式消费,对于MQTT的理解还不是很深,很多处理非常的粗糙,欢迎各位新手同学一起学习、各路大佬批评指正,谢谢!

ActiveMQ + MQTT使用docker方式部署如下:
ActiveMQ + MQTT 集群搭建(docker版本)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/305612.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ stl容器string的底层模拟实现

目录 前言&#xff1a; 1.成员变量 2.构造函数与拷贝构造函数 3.析构函数 4.赋值重载 5.[]重载 6.比较关系重载 7.reserve 8.resize 9.push_back,append和重载 10.insert 11.erase 12.find 14.迭代器 15.流插入&#xff0c;流提取重载 16.swap 17.c_str 18.完…

信道延迟的计算方法

目录 1.信道延迟影响的因素 2.计算公式 3.实践理解 4.基础知识 1.信道延迟影响的因素 因素1&#xff1a;与源端和宿端距离&#xff1b; 因素2&#xff1a;具体信道中的信号传播速率有关&#xff1b; 2.计算公式 总延迟 T发送延迟T1&#xff08;信号传播速率&#xff09; 线路…

21 - 寄存器控制器

---- 整理自B站UP主 踌躇月光 的视频 1. 程序计数器 PC 改造 原来的 PC 由于后续的 ALU 会较频繁的变动&#xff0c;因而会经常修改 PC&#xff0c;所以将 PC 中的 ALU 换成 8 位加法器。改造后的 PC&#xff1a; 2. 寄存器控制器的实现 我们控制器主要通过 ROM 实现&#xf…

FreeBuf 全球网络安全产业投融资观察(3月)

综述 据不完全统计&#xff0c;2024年3月&#xff0c;全球网络安全市场共发生投融资事件53起&#xff0c;其中国内4起&#xff0c;国外49起。 3月全球络安全产业投融资统计表&#xff08;数据来源&#xff1a;航行资本、36氪&#xff09; 整体而言&#xff0c;国内4起投融资事…

(2022级)成都工业学院数据库原理及应用实验三:数据定义语言DDL

唉&#xff0c;用爱发电连赞都没几个&#xff0c;博主感觉没有动力了 想要完整版的sql文件的同学们&#xff0c;点赞评论截图&#xff0c;发送到2923612607qq,com&#xff0c;我就会把sql文件以及如何导入sql文件到navicat的使用教程发给你的 基本上是无脑教程了&#xff0c;…

Stable Diffusion之Ubuntu下部署

1、安装conda环境 conda create -n webui python3.10.6 2、激活环境 每次使用都要激活 conda activate webui 注意开始位置的变换 关闭环境 conda deactivate webui 3、离线下载SD 代码 https://github.com/AUTOMATIC1111/stable-diffusion-webui https://github.com/Stabilit…

华为OD技术面试-有序数组第K最小值

背景 2024-03-15华为od 二面&#xff0c;记录结题过程 有序矩阵中第 K 小的元素 - 力扣&#xff08;LeetCode&#xff09; https://leetcode.cn/problems/kth-smallest-element-in-a-sorted-matrix/submissions/512483717/ 题目 给你一个 n x n 矩阵 matrix &#xff0c;其…

从 Oracle 到 MySQL 数据库的迁移之旅

文章目录 引言一、前期准备工作1.搭建新的MySQL数据库2 .建立相应的数据表2.1 数据库兼容性分析2.1.1 字段类型兼容性分析2.1.2 函数兼容性分析2.1.3 是否使用存储过程&#xff1f;存储过程的个数&#xff1f;复杂度&#xff1f;2.1.4 是否使用触发器&#xff1f;个数&#xff…

Redis从入门到精通(十三)Redis分布式缓存(一)RDB和AOF持久化、Redis主从集群的搭建与原理分析

文章目录 第5章 Redis分布式缓存5.1 Redis持久化5.1.1 RDB持久化5.1.1.1 执行时机5.1.1.2 bgsave原理 5.1.2 AOF持久化5.1.2.1 AOF原理5.1.2.2 AOF配置5.1.2.3 AOF文件重写 5.1.3 RDB和AOF的对比 5.2 Redis主从5.2.1 搭建主从结构5.2.2 主从数据同步原理5.2.2.1 全量同步5.2.2.…

VMware导出虚拟机vmkd格式转换qcow2

VMware虚拟机导出qcow2格式可以上传至云服务 1、需要导出的虚拟机 2、克隆虚拟机 3、选择克隆源 4、创建完整克隆 5、完成 6、找到VMware安装路径 7、找到vmware-vdiskmanager所在路径使用cmd或Windows PowerShell进入目录 进入vmware-vdiskmanager目录 cd F:\软件\VMware Wo…

结合ArcGIS+SWAT模型+Century模型:流域生态系统水-碳-氮耦合过程模拟

原文链接&#xff1a;结合ArcGISSWAT模型Century模型&#xff1a;流域生态系统水-碳-氮耦合过程模拟https://mp.weixin.qq.com/s?__bizMzUzNTczMDMxMg&tempkeyMTI2NV9sMGRZNUJoVkNVc1ZzSzRuMl9XXzhqX0R3cXpESWFwM1E4cFY4ejNqWFh3VUl0dlZkNWk4b20ydFdFTy1xS2ZObGN0Z0ZXSjly…

sed 字符替换时目标内容包含 特殊字符怎么处理

背景 想写一个自动修改配置的脚本&#xff0c;输入一个 mysql jdbc 的连接路径&#xff0c;然后替换目标配置中的模版内容&#xff0c;明明很简单的一个内容&#xff0c;结果卡在了 & 这个符号上。 & 到底是什么特殊字符呢&#xff1f;结论&#xff1a;它代表要替换的…

Leetcode 406. 根据身高重建队列

心路历程&#xff1a; 看到二维数组的排序问题&#xff0c;第一反应想到了之前合并区间那道题先对数组按照第一维排序&#xff0c;后来在纸上模拟后发现&#xff0c;如果按照第一维度降维&#xff0c;第二维度升维的方式排序&#xff0c;那么后面插入的元素一定不会影响前面的…

[AIGC] Spring中的SPI机制详解

文章目录 1. 什么是SPI2. Spring中的SPI机制3. Spring SPI的工作流程4. Spring SPI的应用 1. 什么是SPI SPI &#xff08;Service Provider Interface&#xff09;是Java中服务提供者接口的缩写&#xff0c;它是Java提供的一种用于被第三方实现或扩展的接口&#xff0c;SPI的作…

微服务demo(四)nacosfeigngateway(2)gatewayspringsercurity

一、思路 1、整体思路 用户通过客户端访问项目时&#xff0c;前端项目会部署在nginx上&#xff0c;加载静态文件时直接从nginx上返回即可。当用户在客户端操作时&#xff0c;需要调用后端的一些服务接口。这些接口会通过Gateway网关&#xff0c;网关进行一定的处理&#xff0…

Redis入门到通过之Redis安装

文章目录 Redis安装说明1.单机安装Redis1.1.安装Redis依赖1.2.上传安装包并解压1.3.启动1.3.1.默认启动1.3.2.指定配置启动1.3.3.开机自启 2.Redis客户端2.1.Redis命令行客户端2.2.图形化桌面客户端2.2.1.安装2.2.2.建立连接 Redis安装说明 大多数企业都是基于Linux服务器来部…

学习大数据,所需要的java(Maven)基础(1)

文章目录 使用Maven的优势第三方jar包添加第三方jar包获取jar包之间的依赖关系jar包之间的冲突处理将项目拆分成多个工程模块 实现项目的分布式部署Maven是什么自动化构建工具构建的概念构建环节自动化构建 Maven如何使用安装Maven核心程序maven联网问题Maven中的settings配置在…

pytorch实现胶囊网络(capsulenet)

胶囊网络在hinton刚提出来的时候小热过一段时间&#xff0c;之后热度并没有维持多久。vision transformer之后基本少有人问津了。不过这个模型思路挺独特的&#xff0c;值得研究一下。 这个模型的提出是为了解决CNN模型学习到的特征之间没有空间上的关系&#xff0c;从而对于各…

开源监控zabbix对接可视化工具grafana教程

今天要给大家介绍的是开源监控工具zabbix对接可视化工具grafana问题。 有一定运维经验的小伙伴大抵都或多或少使用过、至少也听说过开源监控工具zabbix&#xff0c;更进一步的小伙伴可能知道zabbix在数据呈现方面有着明显的短板&#xff0c;因此需要搭配使用第三方的可视化工具…

背 单 词 (考研词汇闪过)

单词&#xff1a; 买考研词汇闪过 研究艾宾浩斯遗忘曲线 https://www.bilibili.com/video/BV18Y4y1h7YR/?spm_id_from333.337.search-card.all.click&vd_source5cbefe6dd70d6d84830a5891ceab2bf9 单词方法 闪记背两排&#xff08;5min&#xff09;重复一遍&#xff08;2mi…