Java模拟多个Mqtt客户端连接Mqtt Broker

上一次我们介绍了Java模拟单个Mqtt客户端的场景,但是在实际的业务场景中,可能需要我们模拟多个Mqtt客户端,比如:我们要对云平台的连接和设备上下行做压测。

Java模拟多个Mqtt客户端基本流程

引入Paho MQTT客户端库

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>

定义MqttService

package com.angel.ocean.service.mqtt;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class MqttService {// mqtt brokerprivate String broker  = "tcp://42.194.132.44:1883";// QoS 等级public static int qos = 2;private final Map<String, MqttClient> clients = new HashMap<>();private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);@PostConstructpublic void init() {// TODO 动态创建客户端的逻辑String clientIdPrefix = "2024120122480";int i = 0;while (i < 3) {String clientId = clientIdPrefix + i;connect(clientId, "test", "123456");i = i + 1;}// 定期检查客户端连接状态(可选)executorService.scheduleAtFixedRate(() -> {clients.values().forEach(client -> {if (!client.isConnected()) {log.info("{} is not connected, attempting to reconnect...", client.getClientId());// 这里应该实现重连逻辑}});}, 0, 10, TimeUnit.SECONDS);}@PreDestroypublic void destroy() {clients.values().forEach(client -> {try {client.disconnect();} catch (MqttException e) {log.error("destroy failed, clientId:{}", client.getClientId(), e);}});executorService.shutdown();}/*** 创建 mqtt 客户端* @param clientId* @param username* @param password*/public void connect(String clientId, String username, String password) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());// 保留会话options.setCleanSession(true);options.setAutomaticReconnect(true);// 设置回调client.setCallback(new OnMessageCallback());// 建立连接log.info("Connecting to broker: {}, clientId:{}", broker, clientId);client.connect(options);clients.put(clientId, client);log.info("Connected: {}, clientId:{}", broker, clientId);} catch (MqttException me) {log.error("reason:{}, msg:{}, loc:{}, cause:{}", me.getReasonCode(), me.getMessage(), me.getLocalizedMessage(), me.getCause(), me);}}/*** 获取 mqtt 客户端* @param clientId* @return*/public MqttClient getClientByClientId(String clientId) {return clients.get(clientId);}/*** 发送消息* @param clientId* @param topic* @param content*/public void publish(String clientId, String topic, String content) {MqttClient client = getClientByClientId(clientId);if(client.isConnected()) {MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);try {client.publish(topic, message);log.info("Message published:{}, topic:{}, content:{}", client.getClientId(), topic, content);} catch (MqttException e) {log.error("Message publish failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message publish failed, client:{} not online.", client.getClientId());}/*** 订阅* @param clientId* @param topic*/public void subscribe(String clientId, String topic) {MqttClient client = getClientByClientId(clientId);if(client.isConnected()) {try {client.subscribe(topic);log.info("Message subscribed:{}, topic:{}", client.getClientId(), topic);} catch (MqttException e) {log.error("Message subscribe failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message subscribe failed, client:{} not online.", client.getClientId());}/*** 断开连接* @param clientId*/public void disconnect(String clientId) {MqttClient client = getClientByClientId(clientId);try {client.disconnect();clients.remove(clientId);client.close();log.info("Disconnected:{}", client.getClientId());} catch (MqttException e) {log.error("Message disconnect failed:{}", client.getClientId(), e);}}
}

自定义MqttCallback

package com.angel.ocean.service.mqtt;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;@Slf4j
public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连log.info("连接断开,可以做重连");}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {log.info("deliveryComplete---------:{}", token.isComplete());}
}

MqttController

package com.angel.ocean.controller;import com.angel.ocean.common.ApiResult;
import com.angel.ocean.common.BaseController;
import com.angel.ocean.service.mqtt.MqttService;
import io.swagger.annotations.*;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;/***  前端控制器** @author Jaime.yu* @time 2024-12-01*/
@Api(value = "接口", tags = {"相关接口"})
@RestController
@RequestMapping("/mqtt/client")
public class MqttController extends BaseController {@Resourceprivate MqttService mqttService;@GetMapping("/connect")public ApiResult<String> connect(String clientId, String username, String password) {mqttService.connect(clientId, username, password);return ApiResult.success(clientId);}@GetMapping("/subscribe")public ApiResult<String> subscribe(String clientId, String topic) {mqttService.subscribe(clientId, topic);return ApiResult.success(topic);}@GetMapping("/publish")public ApiResult<String> publish(String clientId, String topic, String message) {mqttService.publish(clientId, topic, message);return ApiResult.success(message);}@GetMapping("/disconnect")public ApiResult<Void> disconnect(String clientId) {mqttService.disconnect(clientId);return ApiResult.success();}
}

代码验证

启动服务

启动服务后,可以看到初始化的3个Mqtt客户端成功连接到Mqtt Broker
在这里插入图片描述

动态创建Mqtt客户端

在这里插入图片描述如下图所示,动态创建了3个Mqtt客户端:
在这里插入图片描述

发送消息

在这里插入图片描述从下图日志可以看出,消息发送成功:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/493592.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《Qt Creator 4.11.1 教程》

《Qt Creator 4.11.1 教程》 一、Qt Creator 4.11.1 概述&#xff08;一&#xff09;简介&#xff08;二&#xff09;界面构成 二、常用设置指南&#xff08;一&#xff09;环境设置&#xff08;二&#xff09;文本编辑器设置&#xff08;三&#xff09;构建和运行设置 三、构建…

LeetCode 热题 100_K 个一组翻转链表(31_25_困难_C++)(四指针法)

LeetCode 热题 100_K 个一组翻转链表&#xff08;31_25&#xff09; 题目描述&#xff1a;输入输出样例&#xff1a;题解&#xff1a;解题思路&#xff1a;思路一&#xff08;四指针法&#xff09;&#xff1a; 代码实现代码实现&#xff08;思路一&#xff08;四指针法&#x…

探索 Python编程 调试案例:计算小程序中修复偶数的bug

在 学习Python 编程的过程里&#xff0c;会遇到各种各样的bug。而修复bug调试代码就像是一场充满挑战的侦探游戏。每一个隐藏的 bug 都是谜题&#xff0c;等待开发者去揭开真相&#xff0c;让程序可以顺利运行。今天&#xff0c;让我们通过一个实际案例&#xff0c;深入探索 Py…

harmony UI组件学习(1)

Image 图片组件 string格式&#xff0c;通常用来加载网络图片&#xff0c;需要申请网络访问权限:ohos.permission.INTERNET Image(https://xxx.png) PixelMap格式&#xff0c;可以加载像素图&#xff0c;常用在图片编辑中 Image(pixelMapobject) Resource格式&#xff0c;加…

TCL发布万象分区,再造Mini LED技术天花板

作者 |辰纹 来源 | 洞见新研社 现实世界中&#xff0c;光通过悬浮在大气中的冰晶折射&#xff0c;呈现出环形、弧形、柱形或亮点的扩散&#xff0c;从而产生光晕&#xff0c;雨后的彩虹是我们经常能看到的光晕现象。 然而&#xff0c;当光晕出现在电视中&#xff0c;那就不是…

(14)D-FINE网络,爆锤yolo系列

yolo过时了&#xff1f;传统的yolo算法在小目标检测方面总是不行&#xff0c;最新算法DEIM爆锤yolo&#xff0c;已经替yolo解决。 一、创新点 ​ 这个算法名为DEIM&#xff0c;全称是DETR with Improved Matching for Fast Convergence&#xff0c;其主要创新点在于提出了一…

日本充电桩标准--CHAdeMO介绍

一、日本充电桩标准 1、充电桩认证体系 日本是新能源汽车主要推动者之一&#xff0c;其实相比纯电动车来说&#xff0c;在日本混动或者插电混动更受到民众的欢迎&#xff0c;油耗低经济实用比纯电动车更方便&#xff0c;连服务类的出租车和警车也大多都采用混动车型。在日本充…

HDR视频技术之十:MPEG 及 VCEG 的 HDR 编码优化

与传统标准动态范围&#xff08; SDR&#xff09;视频相比&#xff0c;高动态范围&#xff08; HDR&#xff09;视频由于比特深度的增加提供了更加丰富的亮区细节和暗区细节。最新的显示技术通过清晰地再现 HDR 视频内容使得为用户提供身临其境的观看体验成为可能。面对目前日益…

web实验三

web实验三 三四个小时左右吧&#xff0c;做成功了学到新东西了&#xff0c;还是挺有趣的&#xff0c;好玩。还有些功能没做完&#xff0c;暂时这样了&#xff0c;要交了。 html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF…

VUE3+django接口自动化部署平台部署说明文档(使用说明,需要私信)

网址连接&#xff1a;http://118.25.110.213:5200/#/login 账号/密码&#xff1a;renxiaoyong 1、VUE3部署本地。 1.1本地安装部署node.js 1.2安装vue脚手架 npm install -g vue/cli # 或者 yarn global add vue/cli1.3创建本地项目 vue create my-vue-project1.4安装依赖和插…

C++ 智能指针(高频面试题)

本篇文章来介绍一下C高频面试题 智能指针。 1.智能指针高频问题&#xff1a; 接下来我会为大家一 一解读&#xff1a; 2.智能指针的由来&#xff1a; 在实际开发中 遇到的困境&#xff1a; 3.智能指针的核心是采用RAII思想来自动化管理指针指向的动态资源的释放&#xff08;…

Leetcode Hot 100 【二叉树】104. 二叉树的最大深度

104. 二叉树的最大深度 已解答 简单 相关标签 相关企业 给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3…

Connecting to Oracle 11g Database in Python

# encoding: utf-8 # 版权所有 2024 涂聚文有限公司 # 许可信息查看&#xff1a;言語成了邀功盡責的功臣&#xff0c;還需要行爲每日來值班嗎 # 描述&#xff1a;python -m pip install oracledb # python -m pip install cx_Oracle --upgrade # pip install cx_Oracle # Autho…

UE5喷涂功能

许多FPS/TPS 游戏都有喷涂、涂鸦功能 其实原理很简单&#xff0c;就是利用了延迟贴花实现的 我们从网上随便找一张图 创建一个材质&#xff0c;材质域选择延迟贴花 混合模式选择半透明&#xff0c;自发光强度可以看感觉调整 材质做好之后编译保存&#xff0c;新建一个Actor…

PCL点云库入门——PCL库中点云数据拓扑关系之K-D树(KDtree)

1、点云的拓扑邻域 在三维空间数据处理的领域中&#xff0c;点云的邻域概念显得尤为关键&#xff0c;它不仅链接了点云数据之间的拓扑结构&#xff0c;而且在构建点云间的拓扑关系时起到了桥梁的作用。这种关系的建立&#xff0c;使得我们能够以一种高效、迅速的方式管理庞大的…

【bodgeito】攻防实战记录

也许有一天我们再相逢&#xff0c;睁开眼睛看清楚&#xff0c;我才是英雄。 进入网站整体浏览网页 点击页面评分进入关卡 一般搭建之后这里都是红色的&#xff0c;黄色是代表接近&#xff0c;绿色代表过关 首先来到搜索处本着见框就插的原则 构造payload输入 <script>…

【1.排序】

排序 笔记记录 1.排序的基本概念1.1 排序的定义 2. 插入排序2.1 直接插入排序2.2 折半插入排序2.3 希尔排序 3. 交换排序3.1 冒泡排序3.2 快速排序 4. 选择排序4.1 简单选择排序4.2 堆排序 5. 归并排序、基数排序和计数排序5.1 归并排序4.2 基数排序4.3 计数排序 6. 各种内部排…

杂七杂八的网络安全知识

一、信息安全概述 1.信息与信息安全 信息与信息技术 信息奠基人&#xff1a;香农&#xff1a;信息是用来消除随机不确定性的东西 信息的定义&#xff1a;信息是有意义的数据&#xff0c;是一种要适当保护的资产。数据经过加工处理之后&#xff0c;就成为信息。而信息需要经…

Loki 微服务模式组件介绍

目录 一、简介 二、架构图 三、组件介绍 Distributor&#xff08;分发器&#xff09; Ingester&#xff08;存储器&#xff09; Querier&#xff08;查询器&#xff09; Query Frontend&#xff08;查询前端&#xff09; Index Gateway&#xff08;索引网关&#xff09…

基于LabVIEW的USRP信道测量开发

随着无线通信技术的不断发展&#xff0c;基于软件无线电的设备&#xff08;如USRP&#xff09;在信道测量、无线通信测试等领域扮演着重要角色。通过LabVIEW与USRP的结合&#xff0c;开发者可以实现信号生成、接收及信道估计等功能。尽管LabVIEW提供了丰富的信号处理工具和图形…