基于MQTT协议实现微服务架构事件总线

一、场景描述

昨天在博客《客户端订阅服务端事件的实现方法》中提出了利用websocket、服务端EventEmitter和客户端mitt实现客户端订阅服务端事件,大大简化了客户端对服务端数据实时响应的逻辑。上述方案适用于单服务节点的情形。

对于由服务集群支撑的微服务架构,websocket提供的点对点通信已无法满足前端订阅后端集群事件的需求,升级方案是使用基于消息总线的通信方式。

在这里插入图片描述

二、几种消息总线适用性比较

常用的消息总线包括Kafka、Redis和基于MQTT协议实现的EMQX。

Kafka和MQTT都是从发布/订阅系统演化而来,但发展侧重点不同。Kafka通过分布式架构提供了海量数据流的存储,并保证数据流顺序,它的设计目标是支持数据发布、订阅和存储。而MQTT用于网络中传输小型数据包,其设计目的是实现简单、可靠的设备间通信。

而Redis是从内存数据库系统演化而来,发布/订阅功能是把消息保存在内存中。与Kafka相比,其只能提供半持久化;与MQTT相比,其通信效率较低。

由于应用场景没有对消息持久化的需求,且考虑到产业大脑平台未来会接入工业互联网,使用MQTT协议来搭建事件总线更利于平台在工业互联网环境下的扩展。

三、MQTT简介

几年前,本人曾写过MQTT简介。本文摘抄其中重要概念。

(一)MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是基于“订阅/发布”模式的轻量级通信协议,该协议基于TCP/IP,能以极低的带宽为海量(百万级)跨域设备提供可靠的消息服务,因此在物联网、小型移动终端、边缘计算方面有广泛应用。
所谓可靠的消息传输,体现为可配置消息的服务质量(QoS),有三种服务质量可选:

  • 至多一次:
    消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。应用场景如环境传感器的数据采集,丢失一次记录无所谓,因为不久后还会有第二次发送。
  • 至少一次:
    确保消息送达订阅者,但消息可能重复,适用于幂等性操作。
  • 只有一次:
    最严格的消息服务质量,确保消息到达且仅到达一次订阅者。应用场景如计费系统等。

MQTT协议中存在三种身份:消息总线(Broker)、发布者(Publish)和订阅者(Subscribe),其中消息总线属于服务器,后两者都属于客户端。发布者和订阅者可以是各种物联网设备和小型终端,消息发布者可以同时也是消息订阅者,如下图所示。

MQTT.png

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
  • payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

订阅消息时,可以在订阅表达式中使用通配符筛选器对主题进行筛选,可同时订阅所匹配的多个主题。
MQTT协议中主要有以下5个方法:

  • connect:客户端建立与服务器的连接
  • disconnect:等待客户端完成工作后,端口与总线的会话
  • subscribe:客户端向消息总线注册订阅主题
  • unsubscribe:客户端等待消息总线取消所注册的订阅
  • publish:客户端向消息总线发送某主题的消息

(二)开源消息总线EMQX

EMQX(Erlang/Enterprise/Elastic MQTT Broker),是基于Erlang语言开发的开源物联网MQTT消息总线。其是一款由前华为员工开发的开源软件,软件主页为https://www.emqx.io/。可根据操作系统类别选择不同版本下载安装,或通过docker部署。

软件安装后,通过 emqx start以后台方式启动。启动后将会开放两个端口:

  • 18083端口为控制台端口,可通过浏览器访问该端口,首次登录的用户名和密码为admin和public。控制台提供了总线监控、用户权限管理、在线客户端订阅/发布等功能。
  • 8083端口为通信端口,MQTT客户端可通过该端口与EMQX消息总线通信。

(三)MQTT.js客户端

MQTT.js是MQTT客户端Nodejs SDK,可在浏览器(ES模块)和Node.js环境(CommonJS模块)下使用,前者可通过MQTT over WebSocket使用,后者既可以通过MQTT over WebSocket使用,也可以直接使用MQTT。区别仅仅是连接参数的协议头不同。

1. 安装和帮助文件

$ pnpm i mqtt -S #安装
$ npx mqtt help  #帮助
MQTT.js command line interface, available commands are:* publish     publish a message to the broker* subscribe   subscribe for updates from the broker* version     the current MQTT.js version* help        help about commandsLaunch 'mqtt help [command]' to know more about the commands.

2. 使用方法

// const mqtt = require('mqtt') //ES模块
import mqtt from 'mqtt'  //CommonJS模块// 连接选项
const options = {clean: true, // true: 清除会话, false: 保留会话connectTimeout: 4000, // 超时时间// 认证信息clientId: 'user_id', // 要保证唯一性// 若在控制台配置了用户名和密码:// username: 'xxx',// password: 'xxx',
}// 连接字符串, 通过协议指定使用的连接方式
// ws 未加密 WebSocket 连接
// wss 加密 WebSocket 连接
// mqtt 未加密 TCP 连接
// mqtts 加密 TCP 连接
// wxs 微信小程序连接
// alis 支付宝小程序连接
const connectUrl = 'ws://localhost:8084/mqtt'
const client = mqtt.connect(connectUrl, options)client.on('reconnect', error => {console.error('正在重连:', error)
})client.on('error', error => {console.error('连接失败:', error)
})//收到消息
client.on('message', (topic, message) => {console.log('收到消息:', topic, message.toString()) //message是二进制流,需要转换成字符串
})//订阅主题
const topic='/user_id/#'
const qos=0  //0:最多交付1次;1:至少交付1次;2:只交付1次
client.subscribe(topic, qos, error=>{  //订阅user_id主题下所有消息if(error){console.error('订阅主题失败:', error)return}console.log('订阅成功')
})//发布消息
client.publish('user_id/a',JSON.stringify({a:123}),qos,error=>{if(error){console.error('发布消息失败:', error)}
})//取消订阅
client.unsubscrib(topic, qos, error=>{if(error){console.error('取消订阅失败', error)return}
})//断开连接
if(client.connected){try{client.end(false,()=>{console.log('成功断开连接')})catch(error){console.error('断开连接失败:', error)}}
}

(四)安全性

1. 排它订阅

排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。

2. JWT认证

系统整体采用JWT认证方式,通过一台认证服务器颁发JWT Token。MQTT客户端访问EMQX总线时,携带由认证服务器颁发的JWT Token。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/265598.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文讲清DTO、BO、PO、VO

DTO、BO、PO、VO是什么? 在后端开发中,比如传统的MVC架构和现在流行的DDD架构,经常会使用到下列几种对象的概念 DTO (Data Transfer Object) 数据传输对象: DTO设计模式用于将数据从服务端传输到客户端,或者在不同的…

代码随想录训练营第31天 | 理论基础、LeetCode 455.分发饼干、

目录 理论基础 视频讲解:手把手带你学会操作链表 | 贪心算法理论基础!_哔哩哔哩_bilibili LeetCode 455.分发饼干 文章讲解:代码随想录(programmercarl.com) 视频讲解:贪心算法,你想先喂哪个小孩?| Le…

物业智能水电抄表管理系统

物业智能水电抄表管理系统是物业管理行业的关键技术之一,其结合了智能化、远程监控和数据分析等功能,为物业管理公司和业主提供了高效、精准的水电抄表管理解决方案。该系统具有多项优势,能够提升物业管理效率,降低成本&#xff0…

深入理解计算机系统学习笔记

1.算术和逻辑操作 下图是一些整数和逻辑操作 这些操作被分为四组:加载有效地址、一元操作、二元操作和移位。二元操作有两个操作数,而一元操作有一个操作数。 1.1加载有效地址 加载有效地址(load effective address)指令 leaq 实际上是 mo…

Pulsar3.2 Function的介绍与使用

概念 Function 步骤 Pulsar Functions是运行在Pulsar上面的计算框架,输入和输出都是基于Pulsar的Topic。通过使用Function可以对进入Pulsar集群的消息进行简单的清洗、计算,这样不仅避免额外部署单独的流处理引擎(SPE),最大限度的提高开发/…

【力扣hot100】刷题笔记Day14

前言 又是新的一周,快乐的周一,快乐地刷题,今天把链表搞完再干活! 114. 二叉树展开为链表 - 力扣(LeetCode) 前序遍历 class Solution:def flatten(self, root: Optional[TreeNode]) -> None:if not r…

Bicycles(变形dijkstra,动态规划思想)

Codeforces Round 918 (Div. 4) G. Bicycles G. Bicycles 题意: 斯拉夫的所有朋友都打算骑自行车从他们住的地方去参加一个聚会。除了斯拉维奇,他们都有一辆自行车。他们可以经过 n n n 个城市。他们都住在城市 1 1 1 ,想去参加位于城市…

【Java程序员面试专栏 算法思维】四 高频面试算法题:回溯算法

一轮的算法训练完成后,对相关的题目有了一个初步理解了,接下来进行专题训练,以下这些题目就是汇总的高频题目,本篇主要聊聊回溯算法,主要就是排列组合问题,所以放到一篇Blog中集中练习 题目关键字解题思路时间空间全排列回溯算法【元素无重不可复选】构造全排列树,用使…

kafka三节点集群平滑升级过程指导

一、前言 Apache Kafka作为常用的开源分布式流媒体平台,可以实时发布、订阅、存储和处理数据流,多用于作为消息队列获取实时数据,构建对数据流的变化进行实时反应的应用程序,已被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型…

Redis String 类型底层揭秘

目录 前言 String 类型低层数据结构 节省内存的数据结构 前言 Redis 的 string 是个 “万金油” ,这么评价它不为过. 它可以保存Long 类型整数,字符串, 甚至二进制也可以保存。对于key,value 这样的单值,查询以及插…

详解Kotlin中run、with、let、also与apply的使用和区别

Kotlin作为一种现代、静态类型的编程语言,不仅提供了丰富的特性,还提供了极具表现力的函数:run, with, let, also, 和 apply。理解这些函数的不同之处对于编写高效、易于维护的代码至关重要。 函数对比表 函数对象引用返回值使用场景runthi…

猜数游戏(个人学习笔记黑马学习)

案例需求 定义一个数字(1~10,随机产生),通过3次判断来猜出来数字 案例要求: 1.数字随机产生,范围1-10 2.有3次机会猜测数字,通过 3.层嵌套判断实现每次猜不中,会提示大了或小了 提示,通过如下代…

【海贼王的数据航海:利用数据结构成为数据海洋的霸主】链表—单链表

目录 1 -> 链表 1.1 -> 链表的概念及结构 1.2 -> 链表的分类 2 -> 无头单向非循环链表(单链表) 2.1 -> 接口声明 2.2 -> 接口实现 2.2.1 -> 动态申请一个结点 2.2.2 -> 单链表的打印 2.2.3 -> 单链表的尾插 2.2.4 -> 单链表的头插 2.…

消息中间件篇之RabbitMQ-消息不丢失

一、生产者确认机制 RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。 当消息没有到交换机就失败了,就会返回publish-confirm。当消息没有到达MQ时&…

2.27数据结构

1.链队 //link_que.c #include "link_que.h"//创建链队 Q_p create_que() {Q_p q (Q_p)malloc(sizeof(Q));if(qNULL){printf("空间申请失败\n");return NULL;}node_p L(node_p)malloc(sizeof(node));if(LNULL){printf("申请空间失败\n");return…

DETR(1):论文详解

文章目录 1. DETR 模型结构2.损失函数2.1 预测结果和GT 的匹配2.2 训练的loss计算3.实验3.1 大物体表现效果好3.2 Transformer Encoder 和Decoder的作用3.3 object query4. 伪代码5. 结论

【《高性能 MySQL》摘录】第 2 章 MySQL 基准测试

文章目录 2.1 为什么需要基准测试2.2 基准测试的策略2.2.1 测试何种指标 2.3 基准测试方法2.3.1 设计和规划基准测试2.3.2 基准测试应该运行多长时间2.3.3 获取系统性能和状态2.3.4 获得准确的测试结果2.3.5 运行基准测试并分析结果2.3.6 绘图的重要性 2.4 基准测试工具…

IntelliJ IDEA 2023:创新不止步,开发更自由 mac/win版

IntelliJ IDEA 2023激活版是一款强大而智能的集成开发环境(IDE),为开发者提供了一系列先进的功能和工具,帮助他们更高效地编写、调试和测试代码。 IntelliJ IDEA 2023 软件获取 IntelliJ IDEA 2023继承了其前代版本的优秀基因,并在此基础上进…

2月28日代码随想录二叉搜索树中的众数

摸了一个寒假了,得赶一赶了 251.二叉搜索树中的众数 给你一个含重复值的二叉搜索树(BST)的根节点 root ,找出并返回 BST 中的所有 众数(即,出现频率最高的元素)。 如果树中有不止一个众数&am…

虚拟机JVM

虚拟机 1、定义jvm 假想计算机 运行在操作系统之上 和硬件之间没有直接交互 包括 一套字节码指令、寄存器、栈、垃圾回收、堆 一个存储方法域 jvm:承担一个翻译工作,动态的将java代码编译成操作系统可以识别的机器码。 从软件层面屏蔽了不同操作系统在底层硬件与指…