Java模拟Mqtt客户端连接Mqtt Broker

Java模拟Mqtt客户端基本流程

引入Paho MQTT客户端库

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version>
</dependency>

设置mqtt配置数据

在application.yml中添加如下配置

mqtt:broker-url: tcp://42.194.132.44:1883client-id: mqtt_receive_serverusername: mqtt_serverpassword: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0

MqttClient配置

将MqttClient加入到IoC容器,并连接客户端

package com.angel.ocean.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.connect(options);return client;}
}

MqttService

mqtt客户端,一些基本操作:连接、订阅、发消息,断开连接

package com.angel.ocean.mqtt;import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;@Slf4j
@Service
public class MqttService {@Resourceprivate MqttClient client;@Resourceprivate KafkaService kafkaService;@PostConstructpublic void init() throws MqttException {client.setCallback(new MqttCallbackHandler(kafkaService));subscribe(MqttTopicConstant.ACTIVATE);subscribe(MqttTopicConstant.RESET);subscribe(MqttTopicConstant.ONLINE);subscribe(MqttTopicConstant.OFFLINE);subscribe(MqttTopicConstant.REPORT);}/*** 连接*/public void connect(String username, String password) throws MqttException {if(!client.isConnected()) {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.connect(options);}}/*** 发送消息*/public void publish(String topic, String data) {if(client.isConnected()) {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(0);try {client.publish(topic, message);log.info("Message published:{}, topic:{}, content:{}", client.getClientId(), topic, data);} catch (MqttException e) {log.error("Message publish failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message publish failed, client:{} not online.", client.getClientId());}/*** 订阅*/public void subscribe(String topic) {if(client.isConnected()) {try {client.subscribe(topic);log.info("Message subscribed:{}, topic:{}", client.getClientId(), topic);} catch (MqttException e) {log.error("Message subscribe failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message subscribe failed, client:{} not online.", client.getClientId());}/*** 断开连接*/public void disconnect() {try {client.disconnect();client.close();log.info("Disconnected:{}", client.getClientId());} catch (MqttException e) {log.error("Message disconnect failed:{}", client.getClientId(), e);}}
}

自定义MqttCallback

对客户端连接丢失,收到消息做一些模拟处理

package com.angel.ocean.mqtt;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.domain.UpData;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;@Slf4j
public class MqttCallbackHandler implements MqttCallback {private KafkaService kafkaService;public MqttCallbackHandler(KafkaService kafkaService) {this.kafkaService = kafkaService;}@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连log.info("连接断开...", cause);}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String data = new String(message.getPayload());log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);UpData upData = JSONObject.parseObject(data, UpData.class);UpKafKaData upKafKaData = new UpKafKaData(topic, data);log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {log.info("deliveryComplete---------:{}", token.isComplete());}
}

MqttController

用于模拟客户端行为

package com.angel.ocean.controller;import com.angel.ocean.common.ApiResult;
import com.angel.ocean.common.BaseController;
import com.angel.ocean.mqtt.MqttService;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;/***  前端控制器** @author Jaime.yu* @time 2024-12-01*/
@Api(value = "接口", tags = {"相关接口"})
@RestController
@RequestMapping("/mqtt/client")
public class MqttController extends BaseController {@Resourceprivate MqttService mqttService;@GetMapping("/subscribe")public ApiResult<?> subscribe(String topic) {mqttService.subscribe(topic);return ApiResult.success();}@GetMapping("/publish")public ApiResult<?> publish(String topic, String message) {mqttService.publish(topic, message);return ApiResult.success();}@GetMapping("/disconnect")public ApiResult<?> disconnect() {mqttService.disconnect();return ApiResult.success();}
}

代码验证

启动mqtt客户端

如下图客户端已上线:
在这里插入图片描述

发送消息

在这里插入图片描述如下图mqtt broker该客户端的日志,接收到了我们发送的数据:hello world
在这里插入图片描述

接收数据

首先我们先订阅个主题:mqtt/0/0

在这里插入图片描述

使用MQTTX客户端向该主题发消息

在这里插入图片描述

Java mqtt客户端接收数据

查询本地Java mqtt客户收到的消息,如下图收到该消息
在这里插入图片描述mqtt broker 也可以看到该日志:
在这里插入图片描述

断开连接

在这里插入图片描述如下图本地客户端862024121819020已断开连接:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/493508.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

圣诞快乐(h5 css js(圣诞树))

一&#xff0c;整体设计思路 圣诞树h5&#xff08;简易&#xff09; 1.页面布局与样式&#xff1a; 页面使用了全屏的黑色背景&#xff0c;中央显示圣诞树&#xff0c;树形由三层绿色的三角形组成&#xff0c;每一层的大小逐渐变小。树干是一个棕色的矩形&#xff0c;位于三角…

多音轨视频使用FFmpeg删除不要音轨方法

近期给孩子找宫崎骏动画&#xff0c;但是有很多是多音轨视频但是默认的都是日语&#xff0c;电视上看没办法所以只能下载后删除音轨文件只保留中文。 方法分两步&#xff0c;先安装FFmpeg在转文件即可。 第一步FFmpeg安装 FFmpeg是一个开源项目&#xff0c;包含了处理视频的…

时空信息平台架构搭建:基于netty封装TCP通讯模块(IdleStateHandler网络连接监测,处理假死)

文章目录 引言I 异步TCP连接操作II 心跳机制:空闲检测(读空闲和写空闲)基于Netty的IdleStateHandler类实现心跳机制(网络连接监测)常规的处理假死健壮性的处理假死方案引言 基于netty实现TCP客户端:封装断线重连、连接保持 https://blog.csdn.net/z929118967/article/de…

中国新能源汽车公共充电桩数据合集(2002-2023年)

数据来源&#xff1a;全国各省市统计年鉴、统计公报、国家能源署、中国汽车行业协会&#xff0c;各类汽车统计年鉴、中国电动汽车充电基础设施促进联盟等 时间跨度&#xff1a;新能源汽车数据集&#xff1a;2002-2023年&#xff08;不同数据时间跨度有差异&#xff0c;详见数据…

设计模式12:状态模式

系列总链接&#xff1a;《大话设计模式》学习记录_net 大话设计-CSDN博客 参考&#xff1a;设计模式之状态模式 (C 实现)_设计模式的状态模式实现-CSDN博客 1.概述 状态模式允许一个对象在其内部状态改变时改变其行为。对象看起来像是改变了其类。使用状态模式可以将状态的相…

国内网络在Ubuntu 22.04中在线安装Ollama并配置Open-WebuiDify

配置docker科技网络 登录后复制 创建或编辑 Docker 配置文件 让docker使用代理&#xff1a; sudo mkdir /etc/systemd/system/docker.service.d -p sudo vim /etc/systemd/system/docker.service.d/http-proxy.conf 文件&#xff0c;并添加以下内容&#xff1a; [Service] En…

【线性代数】理解矩阵乘法的意义(点乘)

刚接触线性代数时&#xff0c;很不理解矩阵乘法的计算规则&#xff0c;为什么规则定义的看起来那么有规律却又莫名其妙&#xff0c;现在参考了一些资料&#xff0c;回过头重新总结下个人对矩阵乘法的理解&#xff08;严格来说是点乘&#xff09;。 理解矩阵和矩阵的乘法&#x…

国标GB28181协议平台Liveweb:搭建建筑工地无线视频联网监控系统方案

随着科技高速发展&#xff0c;视频信号经过数字压缩&#xff0c;通过互联网宽带或者移动4G网络传递&#xff0c;可实现远程视频监控功能。将这一功能运用于施工现场安全管理&#xff0c;势必会大大提高管理效率&#xff0c;提升监管层次。而这些&#xff0c;通过Liveweb监控系统…

SQL语句练习

阅读《SQL必知必会》&#xff08;第五版&#xff09;然后结合往常表做的练习记录 这里使用的数据库时sqlite3,使用的工具时navicat 表资源链接https://wenku.baidu.com/view/349fb3639b6648d7c1c74652.html 表录入后如上图所示。后面如果有多张表之间的操作&#xff0c;在引入…

SAP RESTful架构和OData协议

一、RESTful架构 RESTful 架构&#xff08;Representational State Transfer&#xff09;是一种软件架构风格&#xff0c;专门用于构建基于网络的分布式系统&#xff0c;尤其是在 Web 服务中。它通过利用 HTTP 协议和一组简单的操作&#xff08;如 GET、POST、PUT、DELETE&…

基于MATLAB的图像增强

&#x1f351;个人主页&#xff1a;Jupiter. &#x1f680; 所属专栏&#xff1a;传知代码 欢迎大家点赞收藏评论&#x1f60a; 目录 一、背景及意义介绍背景图像采集过程中的局限性 意义 二、概述三、代码结构及说明&#xff08;一&#xff09;整体结构&#xff08;二&#xf…

通过阿里云 Milvus 与 PAI 搭建高效的检索增强对话系统

背景介绍 阿里云向量检索服务Milvus版&#xff08;简称阿里云Milvus&#xff09;是一款云上全托管服务&#xff0c;确保了了与开源Milvus的100%兼容性&#xff0c;并支持无缝迁移。在开源版本的基础上增强了可扩展性&#xff0c;能提供大规模 AI 向量数据的相似性检索服务。相…

滚珠花键的保养与维护方法

滚珠花键作为关键的线性运动引导装置&#xff0c;以其高精度和高刚性在众多领域发挥着举足轻重的作用。然而&#xff0c;为了保持其卓越的性能&#xff0c;保养与维护措施不可或缺。 滚珠花键的保养与维护其实就是润滑与清洁&#xff0c;以下是一些具体的保养与维护方法&#x…

Layui table不使用url属性结合laypage组件实现动态分页

从后台一次性获取所有数据赋值给 Layui table 组件的 data 属性&#xff0c;若数据量大时&#xff0c;很可能会超出浏览器字符串最大长度&#xff0c;导致渲染数据失败。Layui table 结合 laypage 组件实现动态分页可解决此问题。 HTML增加分页组件标签 在table后增加一个用于…

fastdds:idl

1使用网络收发数据的最简单方式 在学习idl之前&#xff0c;先来看一下我们在开发中&#xff0c;通过网络收发数据时&#xff0c;常常怎么实现。 struct Student {char name[32];int age;char sex;// f 男&#xff0c;m 女 };//发送侧 struct Student s1 {"xiaoming&q…

计算机网络之多路转接epoll

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 计算机网络之多路转接epoll 收录于专栏【计算机网络】 本专栏旨在分享学习计算机网络的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目…

多个Echart遍历生成 / 词图云

echart官网 安装 如果版本报错推荐安装以下版本 npm install echarts4.8.0 --savenpm uninstall echarts//这个是卸载命令以下安装成功后是局部引入:多个Echart遍历生成 vue3echart单个页面多个图表循环渲染展示:<template><div class"main"><div …

Windows server 服务器网络安全管理之防火墙出站规则设置

Windows server 服务器网络安全管理之防火墙出站规则设置 创建一条出站规则 这条出站规则针对IE浏览器设置&#xff0c;指定路径 TCP协议和指定端口&#xff08;多个端口的写法要注意&#xff09; 所有IP&#xff0c;所有应用&#xff0c;都采用阻止 给这条规则进行命名…

jmeter 接口性能测试 学习笔记

目录 说明工具准备工具配置jmeter 界面汉化配置汉化步骤汉化结果图 案例1&#xff1a;测试接口接口准备线程组添加线程组配置线程组值线程数&#xff08;Number of Threads&#xff09;Ramp-Up 时间&#xff08;Ramp-Up Period&#xff09;循环次数&#xff08;Loop Count&…

Pytorch | 从零构建ResNet对CIFAR10进行分类

Pytorch | 从零构建ResNet对CIFAR10进行分类 CIFAR10数据集ResNet核心思想网络结构创新点优点应用 ResNet结构代码详解结构代码代码详解BasicBlock 类ResNet 类ResNet18、ResNet34、ResNet50、ResNet101、ResNet152函数 训练过程和测试结果代码汇总resnet.pytrain.pytest.py 前…