Java实践-物联网loT入门-MQTT传输协议

前言

MQTT是一个极其轻量级发布/订阅消息传输协议,适用于网络带宽较低的场合.

通过一个代理服务器(broker),任何一个客户端(client)都可以订阅或者发布某个主题的消息,然后订阅了该主题的客户端则会收到该消息

业务场景

硬件采集的数据传入EMQX平台(采用MQTT协议),java通过代码连接MQTT服务器,进行采集数据接收、解析、业务处理、存储入库、数据展示。

MQTT 是基于 发布(Publish)/订阅(Subscribe) 模式来进行通信及数据交换的。

什么是MQTT

MQTT是基于二进制消息的发布/订阅编程模式的消息协议,最早由IBM提出的,如今已经成为OASIS规范。由于规范很简单,非常适合需要低功耗和网络带宽有限的IoT场景,比如:

  • 遥感数据
  • 汽车
  • 智能家居
  • 智慧城市
  • 医疗医护

由于物联网的环境是非常特别的,所以MQTT遵循以下设计原则:

  1. 精简,不添加可有可无的功能。
  2. 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递。
  3. 允许用户动态创建主题,零运维成本。
  4. 把传输量降到最低以提高传输效率。
  5. 把低带宽、高延迟、不稳定的网络等因素考虑在内。
  6. 支持连续的会话控制。
  7. 理解客户端计算能力可能很低。
  8. 提供服务质量管理。
  9. 假设数据不可知,不强求传输数据的类型与格式,保持灵活性。

MQTT与HTTP的区别

首先看一下HTTP请求:

  • HTTP 是一种同步协议。客户端需要等待服务器响应。Web 浏览器具有这样的要求,但它的代价是牺牲了可伸缩性。
  • HTTP 是单向的。客户端必须发起连接。
  • HTTP 是一种 1-1 协议。客户端发出请求,服务器进行响应。将消息传送到网络上的所有设备上,不但很困难,而且成本很高。
  • HTTP 是一种有许多标头和规则的重量级协议。它不适合受限的网络。

再来看一下MQTT: 

        在 IoT 领域,大量设备以及很可能不可靠或高延迟的网络使得同步通信成为问题。异步消息协议更适合 IoT 应用程序。传感器发送读数,让网络确定将其传送到目标设备和服务的最佳路线和时间。在 IoT 应用程序中,设备或传感器通常是客户端,这意味着它们无法被动地接收来自网络的命令。

MQTT的核心: 发布和订阅模型 

MQTT 协议在网络中定义了两种实体类型:一个消息代理和一些客户端。

代理是一个服务器,它从客户端接收所有消息,然后将这些消息路由到相关的目标客户端。

客户端是能够与代理交互来发送和接收消息的任何事物。客户端可以是现场的 IoT 传感器,或者是数据中心内处理 IoT 数据的应用程序。

  1. 客户端连接到代理。它可以订阅代理中的任何消息“主题”。此连接可以是简单的 TCP/IP 连接,也可以是用于发送敏感消息的加密 TLS 连接。
  2. 客户端通过将消息和主题发送给代理,发布某个主题范围内的消息。
  3. 代理然后将消息转发给所有订阅该主题的客户端。

        同时,MQTT 是轻量级的。它有一个用来指定消息类型的简单标头,有一个基于文本的主题,还有一个任意的二进制有效负载。应用程序可对有效负载采用任何数据格式,比如 JSON、XML、加密二进制或 Base64,只要目标客户端能够解析该有效负载。

Java实例

1.通过包管理工具 Maven导入依赖

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</dependency>

 2.编写订阅方的代码,并启动。

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;/*** 订阅方*/
public class SubscribeSample {public static void main(String[] args) {//EMQ X 默认端口 1883String broker = "tcp://broker.emqx.io:1883";String TOPIC = "test";int qos = 1;String clientid = "subClient";String userName = "admin";String passWord = "password";try {// host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());// MQTT的连接设置MqttConnectOptions options = new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接options.setCleanSession(true);// 设置连接的用户名options.setUserName(userName);// 设置连接的密码options.setPassword(passWord.toCharArray());// 设置超时时间 单位为秒options.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制options.setKeepAliveInterval(20);// 设置回调函数client.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("connectionLost");}public void messageArrived(String topic, MqttMessage message) {System.out.println("======监听到来自[" + topic + "]的消息======");System.out.println("message content:"+new String(message.getPayload()));System.out.println("============");}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------"+ token.isComplete());}});// 建立连接System.out.println("连接到 broker: " + broker);client.connect(options);System.out.println("连接成功.");//订阅消息client.subscribe(TOPIC, qos);System.out.println("开始监听" + TOPIC);} catch (Exception e) {e.printStackTrace();}}
}

启动订阅方运行结果如下:

 

3.编写发布方的代码并启动

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;/*** 发布方*/
public class PublishSample {public static void main(String[] args) {String topic = "test";String content = "你好,我给你发了条消息呀!!!!!!!!!!!";int qos = 1;String broker = "tcp://broker.emqx.io:1883";String userName = "admin";String password = "password";String clientId = "pubClient";// 内存存储MemoryPersistence persistence = new MemoryPersistence();try {// 创建客户端MqttClient sampleClient = new MqttClient(broker, clientId, persistence);// 创建链接参数MqttConnectOptions connOpts = new MqttConnectOptions();// 在重新启动和重新连接时记住状态connOpts.setCleanSession(false);// 设置连接的用户名connOpts.setUserName(userName);connOpts.setPassword(password.toCharArray());// 建立连接System.out.println("连接到 broker: " + broker);sampleClient.connect(connOpts);System.out.println("连接成功.");// 创建消息MqttMessage message = new MqttMessage(content.getBytes());// 设置消息的服务质量message.setQos(qos);// 发布消息System.out.println("向" + topic + "发送消息:" + message);sampleClient.publish(topic, message);// 断开连接sampleClient.disconnect();// 关闭客户端sampleClient.close();} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}

 

启动发布方运行结果如下:

4.最后查看订阅方的控制台,订阅方收到消息

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/124682.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华硕ROG2/ROG5/ROG6/ROG7Pro强解锁L锁-快速实现root权限-支持Zenfone9/8/7

2023年9月新增解锁BL适配&#xff08;需要联系技术远程操作&#xff09;&#xff1a; 新增支持华硕ROG5/5S/5Pro机型强制解锁BL&#xff0c;并且支持OTA在线更新功能 新增支持华硕ROG6/6Pro机型强制解锁BL&#xff0c;并且支持OTA在线更新功能 新增支持华硕ROG7/7Pro机型强制解…

kuiper安装

1:使用docker方式安装 docker pull lfedge/ekuiper:latest docker run -p 9081:9081 -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVERtcp://127.0.0.1:1883 lfedge/ekuiper:latest这样就安装好了&#xff0c;但是操作只能通过命令完成&#xff0c;如果想要通过页面来操作&…

1065 A+B and C (64bit)

题&#xff1a;点我 题目大意&#xff1a; 这题虽然看着像签到&#xff0c;然鹅签不过去。 因为我最初写的沙雕代码是&#xff1a; #include<iostream> #include<cstdio> using namespace std; int main(void) {int t;scanf("%d", &t);for (int i …

Java后端开发面试题——JVM虚拟机篇

目录 什么是程序计数器&#xff1f; 你能给我详细的介绍Java堆吗? 什么是虚拟机栈 1. 垃圾回收是否涉及栈内存&#xff1f; 2. 栈内存分配越大越好吗&#xff1f; 3. 方法内的局部变量是否线程安全&#xff1f; 4.什么情况下会导致栈内存溢出&#xff1f; 5.堆栈的区别…

React Hook之useContext

1. 什么是useContext React官方解释&#xff1a;useContext 是一个 React Hook&#xff0c;可以让你读取和订阅组件中的 context&#xff08;React官方文档地址&#xff09;。 通俗的讲&#xff0c;useContext的作用就是&#xff1a;实现组件间的状态共享&#xff0c;主要应用场…

RFID溯源驱动汽车座椅制造的智能时代

在今天的快速发展的制造业中&#xff0c;信息化和智能化已经成为不可或缺的部分。信息化和智能化能够极大地提高生产效率、减少浪费&#xff0c;降低成本&#xff0c;提升产品的质量。汽车座椅产线信息化和智能化是汽车座椅产线升级的重要方向&#xff0c;RFID技术方案在汽车座…

【Flask】from flask_sqlalchemy import SQLAlchemy报错

【可能出现的情况】 1、未安装 Flask-SQLAlchemy&#xff1a; 在使用 flask_sqlalchemy 之前&#xff0c;你需要确保已经通过 pip 安装了 Flask-SQLAlchemy。可以通过以下命令安装它&#xff1a; pip install Flask-SQLAlchemy 2、包名大小写问题&#xff1a; Python 是区分大…

VGG 07

一、发展 1989年&#xff0c;Yann LeCun提出了一种用反向传导进行更新的卷积神经网络&#xff0c;称为LeNet。 1998年&#xff0c;Yann LeCun提出了一种用反向传导进行更新的卷积神经网络&#xff0c;称为LeNet-5 AlexNet是2012年ISLVRC 2012&#xff08;ImageNet Large Sca…

l8-d8 TCP并发实现

一、TCP多进程并发 1.地址快速重用 先退出服务端&#xff0c;后退出客户端&#xff0c;则服务端会出现以下错误&#xff1a; 地址仍在使用中 解决方法&#xff1a; /*地址快速重用*/ int flag1,len sizeof (int); if ( setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &a…

yum安装mysql5.7散记

## 数据源安装 $ yum -y install wget $ wget http://dev.mysql.com/get/mysql57-community-release-el7-8.noarch.rpm $ yum localinstall mysql57-community-release-el7-8.noarch.rpm $ yum repolist enabled | grep "mysql.*-community.*" $ yum install mysql-…

宇凡微YE09合封芯片,集成高性能32位mcu和2.4G芯片

合封芯片是指将主控芯片和外部器件合并封装的芯片&#xff0c;能大幅降低开发成本、采购成本、减少pcb面积等等。宇凡微YE09合封芯片&#xff0c;将技术领域推向新的高度。这款高度创新性的芯片融合了32位MCU和2.4G芯片&#xff0c;为各种应用场景提供卓越的功能和性能。 32位M…

谈一谈冷门的C语言爬虫

目录 C语言写爬虫是可行的 C语言爬虫不受待见 C语言爬虫有哪些可用的库和工具 C语言爬虫示例 总结 在当今的编程世界中&#xff0c;C语言相比于一些主流编程语言如Python、JavaScript等&#xff0c;使用范围相对较窄。然而&#xff0c;尽管C语言在爬虫领域的应用并不常见&…

Redis持久化、主从与哨兵架构详解

Redis持久化 RDB快照&#xff08;snapshot&#xff09; 在默认情况下&#xff0c; Redis 将内存数据库快照保存在名字为 dump.rdb 的二进制文件中。 你可以对 Redis 进行设置&#xff0c; 让它在“ N 秒内数据集至少有 M 个改动”这一条件被满足时&#xff0c; 自动保存一次数…

MIT6.824 Spring2021 Lab 1: MapReduce

文章目录 0x00 准备0x01 MapReduce简介0x02 RPC0x03 调试0x04 代码coordinator.gorpc.goworker.go 0x00 准备 阅读MapReduce论文配置GO环境 因为之前没用过GO,所以 先在网上学了一下语法A Tour of Go 感觉Go的接口和方法的语法和C挺不一样, 并发编程也挺有意思 0x01 MapRed…

OCR多语言识别模型构建资料收集

OCR多语言识别模型构建 构建多语言识别模型方案 合合&#xff0c;百度&#xff0c;腾讯&#xff0c;阿里这四家的不错 调研多家&#xff0c;发现有两种方案&#xff0c;但是大多数厂商都是将多语言放在一个字典里&#xff0c;构建1w~2W的字典&#xff0c;训练一个可识别多种语…

解决报错之org.aspectj.lang不存在

一、IDEA在使用时&#xff0c;可能会遇到maven依赖包明明存在&#xff0c;但是build或者启动时&#xff0c;报找不存在。 解决办法&#xff1a;第一时间检查Setting->Maven-Runner红圈中的√有没有选上。 二、有时候&#xff0c;明明依赖包存在&#xff0c;但是Maven页签中…

监听Helm release资源

监听Helm release资源 基于helm做部署管理工具时&#xff0c;可能想要管理用户已有环境&#xff0c;这时需要将已有环境中的release信息上报到业务系统中。当用户在环境中部署新的release时&#xff0c;也需要实时监听并上报回来。下面将讲解如何去监听release资源 helm rele…

湖南省副省长秦国文一行调研考察亚信科技

9月5日&#xff0c;湖南省人民政府党组成员、副省长秦国文一行到亚信科技调研考察&#xff0c;亚信科技高级副总裁陈武主持接待。 图&#xff1a;双方合影 在亚信科技创新展示中心&#xff0c;秦国文了解了亚信科技在5G、算力网络、人工智能、大数据等前沿领域的创新探索&…

Git 基本原理和常用操作

Git Git 是一个开源的分布式版本控制系统&#xff0c;可以有效、高速地处理从很小到非常大的项目版本管理。由 Linus Torvalds 为了帮助管理 Linux 内核开发而开发的一个开源的版本控制软件。 Git 常用操作 git 提交流程&#xff1a;工作区 -> git add 到暂存区 -> gi…

Pytorch中如何加载数据、Tensorboard、Transforms的使用

一、Pytorch中如何加载数据 在Pytorch中涉及到如何读取数据&#xff0c;主要是两个类一个类是Dataset、Dataloader Dataset 提供一种方式获取数据&#xff0c;及其对应的label。主要包含以下两个功能&#xff1a; 如何获取每一个数据以及label 告诉我们总共有多少的数据 Datal…