Mqtt消费端实现的几种方式

此处测试的mqtt的Broker是使用的EMQX 5.7.1,可移步至https://blog.csdn.net/tiantang_1986/article/details/140443513查看详细介绍

一、方式1

添加必要的依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置

# mqtt 服务端配置
spring:# mqtt 配置mqtt:url: tcp://127.0.0.1:1883,tcp://127.0.0.2:1883clientId: "00000001"       # 客户端Id(不可重复)username: <访问用户名>      # 认证的用户名password: <访问密码>        # 认证的密码qos: 1topic: test/#              # 监听的topic

读取配置文件

import org.apache.commons.lang3.StringUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;@Data
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfig {private String username;private String password;private String url;private String clientId;private String topic;private Integer qos;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());if (StringUtils.isNotBlank(url) && url.contains(",")) {options.setServerURIs(url.split(","));} else {options.setServerURIs(new String[]{url});}        options.setCleanSession(true);//自动重连options.setAutomaticReconnect(true);//设置超时时间,单位为秒options.setConnectionTimeout(0);//设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(90);//设置遗嘱消息options.setWill("will_topic", (this.clientId + "与服务器断开连接").getBytes(), qos, false);factory.setConnectionOptions(options);factory.setPersistence(new MemoryPersistence());return factory;}
}

MQTT消息入站配置

@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {@Resourceprivate MqttConfig mqttConfig;@Resourceprivate MqttPahoClientFactory mqttClientFactory;@Resourceprivate MqttMessageReceiver mqttMessageReceiver;@Beanpublic MessageChannel mqttInBoundChannel() {return new PublishSubscribeChannel();}@Beanpublic MessageProducerSupport mqttInbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory, mqttConfig.getTopic());DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();//传输Hex数据,如果是String则可使用默认值falseconverter.setPayloadAsBytes(true);adapter.setConverter(converter);adapter.setRecoveryInterval(10000);adapter.setQos(mqttConfig.getQos());adapter.setOutputChannel(mqttInBoundChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInBoundChannel")public MessageHandler mqttMessageHandler() {return this.mqttMessageReceiver;}
}

消费者

@Slf4j
@Component
public class MqttMessageReceiver implements MessageHandler {@Resourceprivate DataConvertStrategyFactory convertStrategyContext;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {MessageHeaders headers = message.getHeaders();String topic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC);if (StringUtils.isNotBlank(topic)) {return;}byte[] payload = (byte[]) message.getPayload();log.info("topic: {}, message: {}", topic, HexUtils.bytesToHex(payload));//从topic中获取clientId,topic的格式:{业务}/{clientId}/{事件标识}Map<String, String> map = MqttDataConverter.covertTopic(topic);String clientId = map.get("clientId");log.info("clientId: {}", clientId);//topic中的事件标识String eventUrl = map.get("event");//自定义的enum,主要用来消息处理消息分组,相同组可以使用相同的数据转换服务Event[] events = Event.values();String deviceId = clientId;Arrays.stream(events).filter(item -> item.getEvent().equals(eventUrl)).findFirst().ifPresent(item -> {//使用策略模式实现DataConvertService convertService = convertStrategyContext.getStrategy(item.getGroup());convertService.convert(deviceId, eventUrl, payload);});}
}

数据转换服务接口,具体的数据解析只要实现这个接口就行

public interface DataConvertService {/*** 转换数据** @param clientId 设备SN* @param topic  topic* @param data  数据* @return*/Boolean convert(String clientId, String topic, byte[] data);/*** 获取转换器** @return*/String getConverter();
}

MQTT数据转换策略工厂

@Component
public class DataConvertStrategyFactory implements InitializingBean {@Resourceprivate List<DataConvertService> handlers;private Map<String, DataConvertService> dataConvertServiceMap = new ConcurrentHashMap<>();/*** 初始化*/@Overridepublic void afterPropertiesSet() {//进行初始化if (CollectionUtils.isNotEmpty(handlers)) {handlers.forEach(item -> {dataConvertServiceMap.put(item.getConverter(), item);});}}/*** 返回实际处理对象** @param strategy 处理策略* @return 实际处理对象*/public DataConvertService getStrategy(String strategy) {return dataConvertServiceMap.get(strategy);}
}

二、方式2

使用EMQXWebhook钩子
首先创建钩子函数,把需要监听的事件加上处理逻辑,示例:

@Slf4j
@RequestMapping("/mqtt/client")
@RestController
public class ClientController {@PostMapping("/webhook")public Result webhook(@RequestBody Map<String, Object> message) {log.info("webhook map:{}", message);String action = (String) message.get("action");String clientid = (String) message.get("clientid");if ("client_connected".equals(action)) {log.info("client:{} 上线", clientid);}if ("client_disconnected".equals(action)) {log.info("client:{} 下线", clientid);}if ("message.publish".equals(action)) {log.info("已接收到 client:{} 的消息:{}", clientid, message.get("payload"));}return Result.success("OK");}
}

然后在EMQX的Dashboard中创建Webhook,可以选择多个触发器
在这里插入图片描述
填好URL后可以进行测试,之后使用MQTTX进行消息发送测试
在这里插入图片描述
控制台输出日志
在这里插入图片描述

三、方式3

package com.iinplus.mqtt.handler;import com.iinplus.mqtt.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Slf4j
@Component
public class MqttSubscriber implements InitializingBean {@Resourceprivate MqttConfig config;@Overridepublic void afterPropertiesSet() {try {MqttClient client = new MqttClient(config.getUrl(), config.getClientId(), new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(config.getUsername());options.setPassword(config.getPassword().toCharArray());options.setCleanSession(true);options.setAutomaticReconnect(true);options.setConnectionTimeout(0);client.connect(options);client.subscribe(config.getTopic());//设置消息回调client.setCallback(new MqttMsgHandler());} catch (MqttException e) {log.error("MqttException:", e);}}
}

消息回调处理

@Slf4j
public class MqttMsgHandler implements MqttCallback {@Overridepublic void connectionLost(Throwable t) {// 连接丢失log.info("Connection lost:", t);}@Overridepublic void messageArrived(String topic, MqttMessage message) {// 接收到消息log.info("Message arrived:" + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 消息发送成功log.info("Delivery complete");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/415628.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

直播相关概念

文章目录 1、腾讯云直播2、直播&#xff1a;视频直播3、常用的直播组合&#xff1a;4、推流&#xff1a;主播通过推流地址进行视频的推送5、拉流&#xff1a;观众通过拉流地址进行视频的播放6、准备工作6.1、进入腾讯云直播 1、腾讯云直播 直播即时聊天&#xff1a;打赏 文字 …

Linux运维--iptables防火墙命令以及端口号等详解(全)

Linux之iptable防火墙命令以及端口号等详解&#xff08;全&#xff09; 在Linux系统中&#xff0c;你可以使用firewalld和iptables来管理和设置防火墙规则。Firewalld是一个动态管理防火墙的工具&#xff0c;而iptables是一个更底层的工具&#xff0c;可以直接配置Linux内核的…

【重学 MySQL】一、数据库概述

【重学 MySQL】一、数据库概述 为什么要使用数据库数据库与数据库管理系统数据库&#xff08;Database&#xff09;数据库管理系统&#xff08;DBMS&#xff09;数据库与数据库管理系统的关系数据库是数据存储的容器数据库管理系统是数据库的管理者相互依存的关系数据库系统的组…

【网络安全】服务基础第一阶段——第六节:Windows系统管理基础---- DNS部署与安全

计算机智能识别并用IP地址定位&#xff0c;例如我们想要访问一个网页&#xff0c;其实是只能使用这个网页的IP地址&#xff0c;即四位的0&#xff5e;255来访问&#xff0c;但这一串数字难以记忆&#xff0c;于是就有了DNS&#xff0c;将难以记忆的数字转化为容易记忆的域名&am…

Elasticsearch 介绍

1、课程介绍 1.1 ES 8.x 演化进程 版本号发布日期多少个次要版本迭代历时8.02022年2月11日&#xff1f;至今7.02019年4月11日17个次要版本34个月6.02017年11月15日8个次要版本17个月5.02016年10月27日6个次要版本13个月 2、Elasticsearch 是什么 2.1 概念 2.1.1 标准定义 …

文件上传的学习

文件上传漏洞 文件上传漏洞是指由于程序员在对用户文件上传部分的控制不足或者处理缺陷&#xff0c;而导致的用户可以越过其本身权限向服务器上上传可执行的动态脚本文件。这里上传的文件可以是木马&#xff0c;病毒&#xff0c;恶意脚本或者WebShell等。“文件上传”本身没有…

计算机二级 C程序设计(2020B场)全解

A选项&#xff1a;C语言中&#xff0c;一共有3种结构。分别是顺序结构、选择结构&#xff08;else-if语句&#xff09;、循环结构&#xff08;for、while语句&#xff09;。因此&#xff0c;C语言具有结构化特征。 B选项&#xff1a;不仅能解决简单问题&#xff0c;3种基本结构…

WPF MVVM如何在ViewModel直接操作控件对象

早些年在WPF中使用COM组件时&#xff0c;需要在ViewModel中操作COM组件中的控件对象&#xff0c;但是这个控件对象又不支持绑定&#xff0c; 后面的解决办法是在窗口加载时&#xff0c;将控件对象以参数传递到Loaded事件的处理命令中&#xff0c;然后将这个对象记录下来&#…

Ubuntu 18.04升级gclibc为2.28版本

一、查看系统支持的 GLIBC 版本号 ​strings /lib/x86_64-linux-gnu/libc.so.6 | grep GLIBC_出现以下&#xff0c;说明到2.27版本&#xff0c;没有2.28版本&#xff0c;所以我们需要手动安装 GLIBC_2.2.5 GLIBC_2.2.6 GLIBC_2.3 GLIBC_2.3.2 GLIBC_2.3.3 GLIBC_2.3.4 GLIBC_…

Docker入门笔记

Docker 文章目录 Docker1. 下载 &#xff08;centos&#xff09;2. 部署 MySQL3. 常用命令4. 数据卷5. 自定义镜像6. Java 项目部署 1. 下载 &#xff08;centos&#xff09; 卸载旧版 yum remove docker \docker-client \docker-client-latest \docker-common \docker-lates…

84、 k8s的pod基础+https-harbor

一、pod基础&#xff1a; pod进阶&#xff1a;探针&#xff08;面试必问—扩缩容&#xff0c;挂载&#xff09; 1.1、pod的定义 pod是k8s里面的最小单位&#xff0c;pod也是最小运行容器的资源对象。 容器时基于pod在k8s集群当中工作。 在k8s集群当中&#xff0c;一个pod就…

第二阶段:机器学习经典算法-02决策树与随机森林-1.决策树概述

该视频主要讲述了决策树与随机森林算法的基本概念和构造过程。决策树是一个树形结构&#xff0c;用于进行一系列的决策&#xff0c;可以用于分类和回归问题。随机森林算法是基于决策树的集成学习算法&#xff0c;通过构建多棵决策树并结合它们的预测结果来提高分类准确率。视频…

asp.net core web api项目添加自定义中间件

前言 在asp.net core web api项目中&#xff0c;默认提供了很多的中间件&#xff0c;比如访问静态文件中间件UseStaticFiles&#xff0c;跨域配置中间件UseCors&#xff0c;路由中间件UseRouting,身份验证中间件UseAuthentication。 那么如何添加一些自定义的中间件呢。 需求…

java SpringBoot 使用ijpay对接微信支付-商家转账到零钱

使用的maven版本&#xff1a;2.9.11 由于ijpay中提供的实体类没有设置回调参数的属性&#xff0c; 这里是自定义一个实体类:InitiateBatchTransferRequest代码如下&#xff1a; package com.foo.web.controller.pay.wxpay;import com.ijpay.wxpay.model.v3.TransferDetailInput…

【办公软件】Excel如何开n次方根

在文章&#xff1a;【分立元件】电阻的基础知识中我们学习电阻值、电阻值容差标注相关标准。知道了标准将电阻值标准数列化。因此电阻值并非1Ω、2Ω、3Ω那样的整数&#xff0c;而是2.2Ω、4.7Ω那样的小数。 这是因为电阻值以标准数(E系列)为准。系列的“E”是Exponent(指数)…

react vant 在使用dialog.confirm取消报错 Uncaught (in promise) undefined

项目场景&#xff1a; 在使用react做移动端开发时&#xff0c;需要使用Dialog.confirm确认框来做弹框选项&#xff0c;这是在操作中非常常用的一种场景。 问题描述 在列表中&#xff0c;使用弹框时&#xff0c;点击取消时&#xff0c;语法报错&#xff1b;导致后面再触发弹框…

【RabbitMQ之一:windows环境下安装RabbitMQ】

目录 一、下载并安装Erlang1、下载Erlang2、安装Erlang3、配置环境变量4、验证erlang是否安装成功 二、下载并安装RabbitMQ1、下载RabbitMQ2、安装RabbitMQ3、配置环境变量4、验证RabbitMQ是否安装成功5、启动RabbitMQ服务&#xff08;安装后服务默认自启动&#xff09; 三、安…

作业0903

1.封装栈 #include <iostream>using namespace std;class myStack { private:int size; // 大小int capacity;int *ptr;int top; // 栈顶下标 public:// 无参构造函数myStack():size(0), top(-1), capacity(10) {ptr new int[capacity];}// 有参构造函数myStack(in…

Linux Debian12使用flameshot或gnome-screenshot和ImageMagick垂直合并多张图片后组成一张滚动长图

在发布博客&#xff0c;有时需要滚动截长图&#xff0c;虽然在windows系统有滚动截长图的工具&#xff0c;例如&#xff1a;FastStone Capture等&#xff0c;但是Linux Debian系统&#xff0c;这种滚动截长图的工具没有找到合适的。经过自己筛选验证&#xff0c;发现Linux Debi…

基于Bert-base-chinese训练多分类文本模型(代码详解)

目录 一、简介 二、模型训练 三、模型推理 一、简介 BERT&#xff08;Bidirectional Encoder Representations from Transformers&#xff09;是基于深度学习在自然语言处理&#xff08;NLP&#xff09;领域近几年出现的、影响深远的创新模型之一。在BERT之前&#xff0c;已…