SpringBoot 集成MQTT实现消息订阅

1、引入依赖

  <!--MQTT start--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.4</version></dependency><!--MQTT end--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency>

2、增加yml配置

  spring:mqtt:username: testpassword: testurl: tcp://127.0.0.1:8080subClientId: singo_sub_client_id_888 #订阅 客户端idpubClientId: singo_pub_client_id_888 #发布 客户端idconnectionTimeout: 30keepAlive: 60

3、资源配置类

@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {private String username;private String password;private String url;private String subClientId;private String pubClientId;private int connectionTimeout;private int keepAlive;
}

注意启动类需要增加注解

@EnableConfigurationProperties(MqttConfigurationProperties.class)

4、MQTT配置类


@Configuration
public class MqttConfig {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties;/*** 连接参数** @return*/@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttConfigurationProperties.getUsername());options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});options.setConnectionTimeout(mqttConfigurationProperties.getConnectionTimeout());options.setKeepAliveInterval(mqttConfigurationProperties.getKeepAlive());options.setCleanSession(true); // 设置为false以便断线重连后恢复会话options.setAutomaticReconnect(true);return options;}/*** 连接工厂** @param options* @return*/@Beanpublic MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(options);return factory;}/*** 消息输入通道* 每次只有一个消息处理器可以消费消息。* 当前消息的处理完成之前,新消息需要排队等待,无法并行处理。* 默认是:单线程、顺序执行的* @return*/// @Bean// public DirectChannel mqttInputChannel() {//     return new DirectChannel();// }/*** 支持多线程并发处理消息的输入通道** @return*/@Beanpublic ExecutorChannel mqttInputChannel() {return new ExecutorChannel(Executors.newFixedThreadPool(10)); // 线程池大小可以调整}/*** 配置入站适配器** @param mqttClientFactory* @return*/@Beanpublic MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter(MqttPahoClientFactory mqttClientFactory) {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getSubClientId(), mqttClientFactory);// adapter.addTopic("pub/300119110099"); 订阅主题,也可以放在初始化动态配置adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** 配置消息处理器** @return*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel") // 指定通道public MessageHandler messageHandler() {return new MqttReceiverMessageHandler();}}

5、消息处理器配置

@Slf4j
@Component
public class MqttReceiverMessageHandler implements MessageHandler {@Autowiredprivate MqttMessageProcessingService mqttMessageProcessingService;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {MessageHeaders headers = message.getHeaders();log.info("线程名称:{},收到消息,主题:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload());// log.info("收到消息主题:{}", headers.get("mqtt_receivedTopic").toString());// log.info("收到消息:{}", message.getPayload());// 消息保存到内存队列里面,定时批量入库,也可以在这里直接入库mqttMessageProcessingService.addMessage(message.getPayload().toString());}
}

6、消息主题缓存对象

@Component
public class MqttTopicStore {private final ConcurrentHashMap<String, String> topics = new ConcurrentHashMap<>();public ConcurrentHashMap<String, String> getTopics() {return topics;}
}

7、动态订阅数据库主题配置

@Slf4j
@Component
public class MqttInit {@Autowiredprivate MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;@Autowiredprivate MqttTopicStore mqttTopicStore;@PostConstructpublic void init() {subscribeAllTopics();}public void subscribeAllTopics() {// List<MqttTopicConfig> topics = topicConfigMapper.findAllEnabled();// for (MqttTopicConfig topic : topics) {//     subscribeTopic(topic);// }log.info("===================>从数据库里获取并初始化订阅所有主题");List<String> topics = ListUtil.list(false, "pub/300119110099", "pub1/3010230209810018992", "pub1/30102302098100");topics.stream().forEach(t -> {messageDrivenChannelAdapter.addTopic(t);// 同时往MqttTopicStore.topics中增加一条记录用于缓存});}}

8、消息处理服务

@Service
public class MqttMessageProcessingService {@Autowiredprivate MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;@Autowiredprivate MqttTopicStore mqttTopicStore;// 内存队列,用于暂存消息private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();// 添加消息到队列public void addMessage(String message) {messageQueue.add(message);}/*** 可以放到定时任务里面去,注入后取队列方便维护* 定时任务,每5秒执行一次 ,建议2分钟一次 理想的触发间隔应略小于数据到达间隔,以确保及时处理和插入* 如果每 5 分钟收到一条数据,可以设置任务执行周期为4 分钟或更短,以便任务有足够的时间处理数据,同时减少积压的可能性。*/@Scheduled(fixedRate = 1 * 60 * 1000)public void batchInsertToDatabase() {System.out.println("定时任务执行中,当前队列大小:" + messageQueue.size());List<String> batch = new ArrayList<>();messageQueue.drainTo(batch, 500); // 一次性取最多500条消息if (!batch.isEmpty()) {// 批量插入数据库saveMessagesToDatabase(batch);}}private void saveMessagesToDatabase(List<String> messages) {// 假设这是批量插入逻辑System.out.println("批量插入数据库,条数:" + messages.size());for (String message : messages) {System.out.println("插入消息:" + message);}// 实际数据库操作代码}/*** 订阅与取消订阅定时任务*/public void subscribeAndUnsubscribeTask() {// 从数据库获取所有主题,正常状态、删除状态// 正常状态:判断mqttTopicStore.topics中是否存在,不存在则订阅,并在mqttTopicStore.topics中增加// 删除状态: 判断mqttTopicStore.topics中是否存在,存在则取消订阅,并在mqttTopicStore.topics中删除// messageDrivenChannelAdapter.addTopic(t);}
}

以上是简单的对接步骤,部分类、方法可以根据实际情况进行合并处理!!!!

9、定时任务

@Slf4j
@Configuration
@EnableScheduling
public class MqttJob {@Value("${schedule.enable}")private boolean enable;@Autowiredprivate MqttMessageProcessingService mqttMessageProcessingService;/*** 定时订阅与取消订阅主题,从共享主题对象MqttTopicStore里面取出主题列表,然后进行订阅或取消订阅* 每分钟一次*/public void subscribeAndUnsubscribe() {if (!enable) return;mqttMessageProcessingService.subscribeAndUnsubscribeTask();}/*** 定时处理队列里面的订阅消息,会有丢失风险,宕机时会丢失队列里面的消息* 每分钟一次 要考虑一次消息处理的时间;也可先不使用队列,每次收到消息直接实时入库,有性能问题时在启用*/public void batchSaveSubscribeMessage() {}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/480416.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序蓝牙writeBLECharacteristicValue写入数据返回成功后,实际硬件内信息查询未存储?

问题&#xff1a;连接蓝牙后&#xff0c;调用小程序writeBLECharacteristicValue&#xff0c;返回传输数据成功&#xff0c;查询硬件响应发现没有存储进去&#xff1f; 解决&#xff1a;一直以为是这个write方法的问题&#xff0c;找了很多相关贴&#xff0c;后续进行硬件日志…

Zero to JupyterHub with Kubernetes中篇 - Kubernetes 常规使用记录

前言&#xff1a;纯个人记录使用。 搭建 Zero to JupyterHub with Kubernetes 上篇 - Kubernetes 离线二进制部署。搭建 Zero to JupyterHub with Kubernetes 中篇 - Kubernetes 常规使用记录。搭建 Zero to JupyterHub with Kubernetes 下篇 - Jupyterhub on k8s。 参考&…

电脑无互联网连接怎么解决?分享5种解决方案

无互联网连接是指设备无法与互联网进行通信或连接失败。这可能会导致我们无法正常上网&#xff0c;给我们的日常生活和工作带来很大的不便。但请不要担心&#xff0c;下面将为您介绍一些解决无互联网连接问题的方法。 一、检查网络是否正常连接 首先&#xff0c;确保您的路由器…

Web前端学习_CSS盒子模型

content padding border margin <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>CSS盒子模型</title><style></style> </head> <body> <div class"demo&quo…

HTML CSS JS基础考试题与答案

一、选择题&#xff08;2分/题&#xff09; 1&#xff0e;下面标签中&#xff0c;用来显示段落的标签是&#xff08; d &#xff09;。 A、<h1> B、<br /> C、<img /> D、<p> 2. 网页中的图片文件位于html文件的下一级文件夹img中&#xff0c;…

华为开源操作系统openEuler安装部署

本文主要描述华为开源操作系统openEuler的安装部署。openEuler是面向数字基础设施的开源操作系统&#xff0c;是由开放原子开源基金会&#xff08;OpenAtom Foundation&#xff09;孵化及运营的开源项目&#xff0c;其愿景是为世界提供数字基础设施的开源操作系统&#xff0c;其…

分布式搜索引擎之elasticsearch单机部署与测试

分布式搜索引擎之elasticsearch单机部署与测试 1.部署单点es 1.1.创建网络 因为我们还需要部署kibana容器&#xff0c;因此需要让es和kibana容器互联。这里先创建一个网络&#xff1a; docker network create es-net1.2.加载镜像 这里我们采用elasticsearch的7.12.1版本的…

渣土车治理新方案:智能化引领安全与环保新时代

一、渣土车问题现状 1. 盲区众多隐患大&#xff0c;事故频发令人忧。 渣土车盲区多&#xff0c;易引发交通事故&#xff0c;给行人和其他车辆带来严重安全威胁。由于渣土车体积庞大&#xff0c;实际的视觉盲区范围包括半盲区为左车门 1.2 米、右前方 1.5 米、正前方 1.2 米&am…

Vue3+node.js实现登录

文章目录 前端代码实现后端代码实现跨域处理 前端代码实现 效果图 前端代码实现 <template><div class"login-container"><el-card class"login-card"><template #header><div class"card-header"><span>…

jenkins 2.346.1最后一个支持java8的版本搭建

1.jenkins下载 下载地址&#xff1a;Index of /war-stable/2.346.1 2.部署 创建目标文件夹&#xff0c;移动到指定位置 创建一个启动脚本&#xff0c;deploy.sh #!/bin/bash set -eDATE$(date %Y%m%d%H%M) # 基础路径 BASE_PATH/opt/projects/jenkins # 服务名称。同时约定部…

Windows10+VirtualBox+Ubuntu:安装虚拟机VirtualBox,虚拟机中安装Ubuntu

一、需求 在Windows10系统中&#xff0c;安装虚拟机VirtualBox&#xff0c;VirtualBox中安装Ubuntu桌面版。 二、环境准备 系统环境 Windows10 内存&#xff1a;8G 虚拟化 虚拟机的运行&#xff0c;如果需要Windows系统开启虚拟化&#xff0c;可以通过BIOS设置。 “虚拟…

pcb元器件选型与焊接测试时的一些个人经验

元件选型 在嘉立创生成bom表&#xff0c;对照bom表买 1、买电容时有50V或者100V是它的耐压值&#xff0c;注意耐压值 2、在买1117等降压芯片时注意它降压后的固定输出&#xff0c;有那种可调降压比如如下&#xff0c;别买错了 贴片元件焊接 我建议先薄薄的在引脚上涂上锡膏…

【漏洞复现】|百易云资产管理运营系统/mobilefront/c/2.php前台文件上传

漏洞描述 湖南众合百易信息技术有限公司&#xff08;简称&#xff1a;百易云&#xff09;成立于2017年是一家专注于不动产领域数字化研发及服务的国家高新技术企业&#xff0c;公司拥有不动产领域的数字化全面解决方案、覆盖住宅、写字楼、商业中心、专业市场、产业园区、公建、…

重学 Android 自定义 View 系列(八):星星评分控件(RatingBar)

前言 本节实现一个常见的星星评分控件&#xff0c;广泛应用于各种评价类应用中&#xff0c;比如电影评分、商品评价等。难度不大&#xff0c;直接开搂&#xff01; 最终效果如下&#xff1a; 1. 效果分析 显示若干颗星星&#xff08;默认为5颗&#xff0c;可根据属性配置&a…

【力扣热题100】—— Day3.相交链表

被你改变的那部分我&#xff0c;代替你&#xff0c;永远与我站在一起 —— 24.11.28 160. 相交链表 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 。 图示两个链表在节点 c1 …

SpringBoot实战(三十二)集成 ofdrw,实现 PDF 和 OFD 的转换、SM2 签署OFD

目录 一、OFD 简介1.1 什么是 OFD&#xff1f;1.2 什么是 版式文档&#xff1f;1.3 为什么要用 OFD 而不是PDF&#xff1f; 二、ofdrw 简介2.1 定义2.2 Maven 依赖2.3 ofdrw 的 13 个模块 三、PDF/文本/图片 转 OFD&#xff08;ofdrw-conterver&#xff09;3.1 介绍&#xff1a…

cesium 3Dtiles变量

原本有一个变亮的属性luminanceAtZenith&#xff0c;但是新版本的cesium没有这个属性了。于是 let lightColor 3.0result._customShader new this.ffCesium.Cesium.CustomShader({fragmentShaderText:void fragmentMain(FragmentInput fsInput, inout czm_modelMaterial mate…

Java 语言的起源发展与基本概念(JDK,JRE,JVM)

Java语言的起源 源起 Java语言最初是由Sun Microsystems公司&#xff08;该公司于2009年被Oracle公司收购&#xff09;开发的一种编程语言。其创造者是詹姆斯高斯林&#xff08;James Gosling&#xff09;&#xff0c;他是一位加拿大计算机科学家。其前身名为Oak&#xff08;橡…

Mac安装及合规无限使用Beyond Compare

文章目录 Beyond CompareBeyond Compare简介Beyond Compare安装Beyond Compare到期后继续免费使用 Beyond Compare Beyond Compare简介 Beyond Compare 是一款由 Scooter Software 开发的文件和文件夹比较工具。它主要用于对比两个文件或文件夹之间的差异&#xff0c;并支持文…

使用 Spring AI + Elasticsearch 让 RAG 变得简单

作者&#xff1a;来自 Elastic Laura Trotta 使用私人数据定制你的人工智能聊天机器人体验。 Spring AI 最近将 Elasticsearch 添加为向量存储&#xff0c;Elastic 团队为其提供了优化。我们很高兴展示使用 Spring AI 和 Elasticsearch 向量数据库&#xff08;vector database&…