python实现MQTT协议(发布者,订阅者,topic)

python实现MQTT协议

一、简介

1.1 概述

本文章针对物联网MQTT协议完成python实现

1.2 环境

  • Apache-apollo创建broker
  • Python实现发布者和订阅者

1.3 内容

  • MQTT协议架构说明 :

  • 利用仿真服务体会 MQTT协议

  • 针对MQTT协议进行测试

任务1:MQTT协议应用场景

说明: MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、 易于实现。这些特点使它适用于受限环境。该协议的特点有:

1 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2 对负载内容屏蔽的消息传输。
3 使用 TCP/IP 提供网络连接。

物联网应用场景:

image-20230901160722558

协议角色分工:

客户端分为2种角色:发布者(Publisher)和订阅者(Subscriber)。
每一个发布者(Publisher)可以发送不同类型的消息,我们把消息的类型叫做主题(topic),
MQTT通信中的消息都属于某一个主题 ,而只有订阅了这个主题的订阅者(Subscriber)才能收到属于这个主题的消息。
发布者和订阅者不需要在意和知道对方的存在(不需要知道对方的IP和端口),也不需要直接与对方建立连接。因为通信中存在着一个叫代理 (MQTT broker)的第三种角色,也可以叫MQTT服务器(MQTT server)。 
发布者、订阅者只需要知道MQTT服务器的IP和端口即可,并和它直接建立连接通信。MQTT代理作为消息的 中转,它过滤所有接受到的消息,并按照一定的机制(MQTT标准规定是基于主题的消息过滤派发方式,而具 体的MQTT服务器软件也提供了其他的派发方式)分发它们,使得所有注册到MQTT代理的订阅者只接收到他 们订阅了的消息,而不会收到他不关心的消息。
当发布者发布一条消息的时候,他必须同时指定消息的主题和消息的负载。MQTT代理在收到发布者发过来的 消息时,无需访问消息负载,他只是访问消息的主题信息,然后根据这主题派发给订阅者。需要注意的是,一个客户端可以同时既当发布者又当订阅者。比如一个开发板连接了一盏LED灯,它可以发布灯的暗/亮状态 信息,也可以从其他节点订阅对灯的控制消息。

3.生产者(发布消息)和消费者(消耗消息-订阅者)模式理解

生产者:wifi设备采集各种物联网传感器比如温度重力传感器
消费者:客户端比如手机
原理如下:

image-20230901095257410

任务2:搭建Mqtt协议服务 (broker)

前提:安装JDK和JAVA_HOME环境变量

1 下载Apollo服务器 地址 http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/

2 进入bin目录命令行 输入:

D:\softwares\apache-apollo-1.7.1\bin\apollo.cmd create jwbroker

3:broker\etc\apollo.xml文件下是配置服务器信息的文件
初始默认帐号是admin,密码password;

4:启动命令行: (以一个实例为单位进行创建的)

进入... jwbroker创建实例的\bin\ 目录,
在CMD输入命令apollo-broker.cmd run,可以使用TAB键自动补全,运行后输出信息如下:验证:
MQTT服务器TCP连接端口: tcp://0.0.0.0:61613
后台web管理页面:https://127.0.0.1:61681/或http://127.0.0.1:61680/

出现如下图表示启动成功

image-20230901154500857

安装mqtt需要的包:

pip install paho-mqtt

发布者publish创建:

import time
from paho.mqtt import publish
#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))client.subscribe("jw-temperature") # 发布主题def on_message(client,userdata,msg):print(msg.topc +  "消息发送!" + msg.payload.decode("utf-8"))if __name__ == '__main__':print("消息发布!----我是一个发布者:正在给设备和传感器发布主题-----")client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))for i in range(20):time.sleep(2)publish.single("lightChange","现在天黑了", qos = 1, hostname = HOST, port = PORT, client_id = client_id,auth = {'username': "admin", 'password': "password"})print("已发送"+str(i+1)+"条消息")

订阅者light_subcribe创建:

import paho.mqtt.client as mqtt
import time#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].
'''
def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))'''Subscribing in on_connect() means that if we lose the connection and  reconnect then subscriptions wil be renewed(恢复、续订).'''client.subscribe("lightChange") # 订阅主题'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):print(msg.topic+ msg.payload.decode("utf-8") +  ",回调消息:收到收到!我已经接收到发布者的消息,并且打开了光传感器" )def client_loop():'''注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误[WinError 10054] 远程主机强迫关闭了一个现有的连接'''client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/client.on_connect = on_connectclient.on_message = on_message'''拥塞回调:处理网络流量,调度回调和重连接。Blocking call that processes network traffic, dispatches callbacks and handles reconnecting.Other loop*() functions are available that give a threaded interface and amanual- interface...I'''try:client.connect(HOST,PORT,60)client.loop_forever()except KeyboardInterrupt:client.disconnect()if __name__ == '__main__':print("手电筒打开----我是一个订阅者:需要消费主题-----")client_loop()

订阅者phone_subcribe创建:

import paho.mqtt.client as mqtt
import time#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613
'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].'''
def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))'''Subscribing in on_connect() means that if we lose the connection and  reconnect then subscriptions wil be renewed(恢复、续订).'''client.subscribe("lightChange") # 订阅主题'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):print(msg.topic  + msg.payload.decode("utf-8")+  ",回调消息:收到收到!我已经接收到发布者的消息并给用户反馈手电筒已经打开")def client_loop():'''注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误[WinError 10054] 远程主机强迫关闭了一个现有的连接'''client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/client.on_connect = on_connectclient.on_message = on_message'''拥塞回调:处理网络流量,调度回调和重连接。Blocking call that processes network traffic, dispatches callbacks and handles reconnecting.Other loop*() functions are available that give a threaded interface and amanual- interface...I'''try:client.connect(HOST,PORT,60)client.loop_forever()except KeyboardInterrupt:client.disconnect()if __name__ == '__main__':print("手机启动----我是一个订阅者:需要消费主题-----")client_loop()

演示:分别运行publish和light_subcribe和phone_subcribe

publish:

image-20230901160129432

light_subcribe:

image-20230901160148825

phone_subcribe:

image-20230901160158514

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/119084.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浅谈安防视频监控平台EasyCVR视频汇聚平台对于夏季可视化智能溺水安全告警平台的重要性

每年夏天都是溺水事故高发的时期,许多未成年人喜欢在有水源的地方嬉戏,这导致了悲剧的发生。常见的溺水事故发生地包括水库、水坑、池塘、河流、溪边和海边等场所。 为了加强溺水风险的提示和预警,完善各类安全防护设施,并及时发现…

如何制作一个百货小程序

在这个数字化时代,小程序已成为各行各业的必备工具。其中,百货小程序因其便捷性和多功能性,越来越受到人们的青睐。那么,如何制作一个百货小程序呢?下面,我们就详细介绍一下无需编写代码的步骤。 一、进入后…

【GAMES202】Real-Time Global Illumination(screen space)1—实时全局光照(屏幕空间)1

一、Real-Time Global Illumination(in 3D cont.) 上篇只介绍了RSM,这里我们还会简要介绍另外两种在3D空间中做全局光照的方法,分别是LPV和VXGI。 1.Light Propagation Volumes (LPV) 首先我们知道Radiance在传播过程中是不会被改变的,这点…

Redis事务为什么不支持回滚

Redis事务中过程中的错误分类两类: 在exec执行之前的错误,这种错误通常是指令错误,比如指令语法错误、内存不足等... --> 在开始事务后,传输指令时,遇到这种错误,Redis会给出Error错误提示,…

lv3 嵌入式开发-4 linux shell命令(进程管理、用户管理)

目录 1 进程处理相关命令 1.1 进程的概念 1.2 查看进程的命令 1.3 发送信号命令 2 用户管理相关命令 2.1 用户管理相关文件介绍 2.2 用户管理相关命令介绍 1 进程处理相关命令 1.1 进程的概念 进程的概念主要有两点: 进程是一个实体。每一个进程都有它自己…

基于java Swing 和 mysql实现的飞机订票系统(源码+数据库+ppt+ER图+流程图+架构说明+论文+运行视频指导)

一、项目简介 本项目是一套基于java Swing 和 mysql实现的飞机订票系统,主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含:项目源码、项目文档、数据库脚本等,该项目附带全部源码可作为毕设使用。 项目都经过…

PostgreSQL命令行工具psql常用命令

1. 概述 通常情况下操作数据库使用图形化客户端工具,在实际工作中,生产环境是不允许直接连接数据库主机,只能在跳板机上登录到Linux服务器才能连接数据库服务器,此时就需要使用到命令行工具。psql是PostgreSQL中的一个命令行交互…

【Linux】线程安全-信号量

文章目录 信号量原理信号量保证同步和互斥的原理探究信号量相关函数初始化信号量函数等待信号量函数释放信号量函数销毁信号量函数 信号量实现生产者消费者模型 信号量原理 信号量的原理:资源计数器 PCB等待队列 函数接口 资源计数器:对共享资源的计…

【Linux】进程的优先级

我们都知道进程等待需要cpu处理的,那就需要一个数据结构来记录要被cpu处理的进程,那这些进程是按一个什么样的方式在这个结构中进行等待呢?下面就要谈到进程的优先级了: 目录 一、进程的优先级的概念 二、查看进程的优先级 2.1…

【javaweb】学习日记Day8 - Mybatis入门 Mysql 多表查询 事务 索引

之前学习过的SQL语句笔记总结戳这里→【数据库原理与应用 - 第六章】T-SQL 在SQL Server的使用_Roye_ack的博客-CSDN博客 【数据库原理与应用 - 第八章】数据库的事务管理与并发控制_一级封锁协议_Roye_ack的博客-CSDN博客 目录 一、多表查询 1、概述 (1&#…

Spring-Kafka生产者源码分析

文章目录 概要初始化消息发送小结 概要 本文主要概括Spring Kafka生产者发送消息的主流程 代码准备&#xff1a; SpringBoot项目中maven填加以下依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent&…

大数据技术原理与应用学习笔记第1章

黄金组合访问地址&#xff1a;http://dblab.xmu.edu.cn/post/7553/ 1.《大数据技术原理与应用》教材 官网&#xff1a;http://dblab.xmu.edu.cn/post/bigdata/ 2.大数据软件安装和编程实践指南 官网林子雨编著《大数据技术原理与应用》教材配套大数据软件安装和编程实践指…

Liunx系统编程:信号量

一. 信号量概述 1.1 信号量的概念 在多线程场景下&#xff0c;我们经常会提到临界区和临界资源的概念&#xff0c;如果临界区资源同时有多个执行流进入&#xff0c;那么在多线程下就容易引发线程安全问题。 为了保证线程安全&#xff0c;互斥被引入&#xff0c;互斥可以保证…

Java-泛型

文章目录 Java泛型什么是泛型&#xff1f;在哪里使用泛型&#xff1f;设计出泛型的好处是什么&#xff1f;动手设计一个泛型泛型的限定符泛型擦除泛型的通配符 结论 Java泛型 什么是泛型&#xff1f; Java泛型是一种编程技术&#xff0c;它允许在编译期间指定使用的数据类型。…

操作视频的开始与暂停

调用 ref.current.play() 方法来播放视频&#xff1b; 如果视频需要暂停&#xff0c;我们调用 ref.current.pause() 方法来暂停视频。 通过 useRef 创建的 ref 操作视频的开始与暂停 当用户点击按钮时&#xff0c;根据当前视频的状态&#xff0c;我们会开始或暂停视频&…

ip地址、LINUX、与虚拟机

子网掩码&#xff0c;是用来固定网络号的&#xff0c;例如255&#xff0c;255,255,0&#xff0c;表明前面三段必须为网络号&#xff0c;后面必须是主机号&#xff0c;那么怎么实现网络复用呢&#xff0c;例如使用c类地址&#xff0c;但是正常子网掩码是255&#xff0c;255,255,…

GB28181学习(二)——注册与注销

概念 使用REGISTER方法进行注册和注销&#xff1b;注册和注销应进行认证&#xff0c;认证方式应支持数字摘要认证方式&#xff0c;高安全级别的宜支持数字证书认证&#xff1b;注册成后&#xff0c;SIP代理在注册过期时间到来之前&#xff0c;应向注册服务器进行刷新注册&…

vue从零开始学习

npm install慢解决方法:删掉nodel_modules。 5.0.3:表示安装指定的5.0.3版本 ~5.0.3:表示安装5.0X中最新的版本 ^5.0.3: 表示安装5.x.x中最新的版本。 yarn的优点: 1.速度快,可以并行安装 2.安装版本统一 项目搭建: 安装nodejs查看node版本:node -v安装vue clie : np…

C语言练习8(巩固提升)

C语言练习8 编程题 前言 奋斗是曲折的&#xff0c;“为有牺牲多壮志&#xff0c;敢教日月换新天”&#xff0c;要奋斗就会有牺牲&#xff0c;我们要始终发扬大无畏精神和无私奉献精神。奋斗者是精神最为富足的人&#xff0c;也是最懂得幸福、最享受幸福的人。正如马克思所讲&am…

明厨亮灶监控实施方案 opencv

明厨亮灶监控实施方案通过pythonopencv网络模型图像识别算法&#xff0c;一旦发现现场人员没有正确佩戴厨师帽或厨师服&#xff0c;及时发现明火离岗、不戴口罩、厨房抽烟、老鼠出没以及陌生人进入后厨等问题生成告警信息并进行提示。OpenCV是一个基于Apache2.0许可&#xff08…