车联网架构设计(二)_消息缓存

在上一篇博客车联网架构设计(一)_消息平台的搭建-CSDN博客中,我介绍了车联网平台需要实现的一些功能,并介绍了如何用EMQX+HAPROXY来搭建一个MQTT消息平台。车联网平台的应用需要消费车辆发布的消息,同时也会下发消息给车辆,以实现车辆控制等功能。通常我们会在MQTT消息平台收到车辆消息后对消息进行缓存,以供上层应用使用。我们可以直接把消息保存到数据库,或者引入一个消息队列,这样可以方便对应用和车辆之间进行解耦合。

这里我将介绍一下如何引入一个Kafka消息队列,把车辆以及上层应用之间需要交互的消息缓存到这个消息队列之中。

在EMQX的企业版中,提供了丰富的数据桥接功能,可以支持把MQTT消息桥接到其他外部系统,例如Kafka或数据库中。但是在开源版,只提供了很有限的数据桥接,不支持Kafka。为此我们可以通过给EMQX开发Hook extension的方式,来加载我们的插件,实现把数据桥接到Kafak。

在EMQX官网的介绍中,Hook扩展是通过gRPC的方式来实现的,支持多种编程语言。如下图:

这里我以Python为例子,来定义一个扩展。

搭建Kafka

首先是在K8S上部署一个Kafka集群,这里我选择了Strimizi的Kafka operator来部署

先创建一个namespace

kubectl create namespace kafka

安装Operator, CRD以及定义RBAC等

kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

创建一个只包含一个节点的Kafka

kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka 

打开两个终端,分别运行以下的订阅和发布的指令,测试Kafka是否正常工作

kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning

开发ExHook

首先是获取当前EMQX版本定义的gPRC proto。在EMQX服务器的/opt/emqx/lib/emqx_exhook-5.0.14/priv/protos/目录下面有一个exhook.proto文件。

运行以下命令来基于这个proto生成python文件

python -m grpc_tools.protoc -I./ --python_out=. --pyi_out=. --grpc_python
_out=. ./exhook.proto

运行之后,在当前目录下会新生成三个文件,exhook_pb2_grpc.py,exhook_pb2.py,exhook_pb2.pyi

新建一个exhook_server.py文件,继承exhook_pb2_grpc里面的HookProviderServicer,注册对应事件的处理方法,如以下代码:

from concurrent import futures
import logging
from multiprocessing.sharedctypes import Valueimport grpcimport exhook_pb2
import exhook_pb2_grpcimport pickle
from kafka import KafkaProducerclass HookProvider(exhook_pb2_grpc.HookProviderServicer):def __init__(self):self.producer = KafkaProducer(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092')def OnProviderLoaded(self, request, context):print("OnProviderLoaded:", request)'''specs = [exhook_pb2.HookSpec(name="client.connect"),exhook_pb2.HookSpec(name="client.connack"),exhook_pb2.HookSpec(name="client.connected"),exhook_pb2.HookSpec(name="client.disconnected"),exhook_pb2.HookSpec(name="client.authenticate"),exhook_pb2.HookSpec(name="client.authorize"),exhook_pb2.HookSpec(name="client.subscribe"),exhook_pb2.HookSpec(name="client.unsubscribe"),exhook_pb2.HookSpec(name="session.created"),exhook_pb2.HookSpec(name="session.subscribed"),exhook_pb2.HookSpec(name="session.unsubscribed"),exhook_pb2.HookSpec(name="session.resumed"),exhook_pb2.HookSpec(name="session.discarded"),exhook_pb2.HookSpec(name="session.takenover"),exhook_pb2.HookSpec(name="session.terminated"),exhook_pb2.HookSpec(name="message.publish"),exhook_pb2.HookSpec(name="message.delivered"),exhook_pb2.HookSpec(name="message.acked"),exhook_pb2.HookSpec(name="message.dropped")]'''specs = [exhook_pb2.HookSpec(name="message.publish")]return exhook_pb2.LoadedResponse(hooks=specs)def OnProviderUnloaded(self, request, context):print("OnProviderUnloaded:", request)return exhook_pb2.EmptySuccess()def OnClientConnect(self, request, context):print("OnClientConnect:", request)return exhook_pb2.EmptySuccess()def OnClientConnack(self, request, context):print("OnClientConnack:", request)return exhook_pb2.EmptySuccess()def OnClientConnected(self, request, context):print("OnClientConnected:", request)return exhook_pb2.EmptySuccess()def OnClientDisconnected(self, request, context):print("OnClientDisconnected:", request)return exhook_pb2.EmptySuccess()def OnClientAuthenticate(self, request, context):print("OnClientAuthenticate:", request)reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)return replydef OnClientAuthorize(self, request, context):print("OnClientAuthorize:", request)reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)return replydef OnClientSubscribe(self, request, context):print("OnClientSubscribe:", request)return exhook_pb2.EmptySuccess()def OnClientUnsubscribe(self, request, context):print("OnClientUnsubscribe:", request)return exhook_pb2.EmptySuccess()def OnSessionCreated(self, request, context):print("OnSessionCreated:", request)return exhook_pb2.EmptySuccess()def OnSessionSubscribed(self, request, context):print("OnSessionSubscribed:", request)return exhook_pb2.EmptySuccess()def OnSessionUnsubscribed(self, request, context):print("OnSessionUnsubscribed:", request)return exhook_pb2.EmptySuccess()def OnSessionResumed(self, request, context):print("OnSessionResumed:", request)return exhook_pb2.EmptySuccess()def OnSessionDiscarded(self, request, context):print("OnSessionDiscarded:", request)return exhook_pb2.EmptySuccess()def OnSessionTakenover(self, request, context):print("OnSessionTakenover:", request)return exhook_pb2.EmptySuccess()def OnSessionTerminated(self, request, context):print("OnSessionTerminated:", request)return exhook_pb2.EmptySuccess()def OnMessagePublish(self, request, context):self.producer.send('testtopic', pickle.dumps(nmsg))reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)return reply## case2: stop publish the 't/d' messages#def OnMessagePublish(self, request, context):#    nmsg = request.message#    if nmsg.topic == 't/d':#        nmsg.payload = b""#        nmsg.headers['allow_publish'] = b"false"##    reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)#    return replydef OnMessageDelivered(self, request, context):print("OnMessageDelivered:", request)return exhook_pb2.EmptySuccess()def OnMessageDropped(self, request, context):print("OnMessageDropped:", request)return exhook_pb2.EmptySuccess()def OnMessageAcked(self, request, context):print("OnMessageAcked:", request)return exhook_pb2.EmptySuccess()def serve():server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)server.add_insecure_port('[::]:9000')server.start()print("Started gRPC server on [::]:9000")server.wait_for_termination()if __name__ == '__main__':logging.basicConfig()serve()

 解释一下代码,在OnProvidedLoader里面是加载各种事件的钩子,这里只加载message.publish事件。在OnMessagePublish是对应事件的处理函数,这里把收到的MQTT消息通过Pickle进行序列化,发送到Kafka的对应topic

部署ExHook

写一个Dockerfile,把代码打包为一个镜像

FROM python:3.7-slim
WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "./exhook_server.py"]

 requirements.txt文件内容为

grpcio==1.59.3
grpcio-tools==1.59.3
kafka-python==2.0.2

运行以下命令来构建镜像

docker build --network=host -t emqx_plugin_test:v1.0 .

创建一个部署这个镜像的deployment和service,然后部署到K8S

apiVersion: apps/v1
kind: Deployment
metadata:name: emqx-hookserver-deploymentlabels:app: hookservernamespace: emqx
spec:replicas: 1selector:matchLabels:app: hookservertemplate:metadata:labels:app: hookserverspec:containers:- name: hookserverimage: emqx_plugin_test:v1.0imagePullPolicy: Neverresources:requests:memory: "250Mi"cpu: "100m"limits:memory: "250Mi"cpu: "100m"ports:- name: rpccontainerPort: 9000
---
apiVersion: v1
kind: Service
metadata:name: hookserver-servicenamespace: emqx
spec:selector:app: hookserverports:- name: rpcport: 9000

回到EMQX的控制面板Dashboard,在ExHook里面添加,url填入http://hookserver-service.emqx.svc.cluster.local:9000,然后选择启用即可,可以看到状态为连接成功,并且显示注册了1个钩子。

在minikube上部署,一开始是显示连接中,等了很久仍然无法连接成功,最后查了资料,原来是coredns的问题,运行以下命令重启即可:

kubectl -n kube-system rollout restart deployment coredns

之后打开订阅Kafka的testtopic,然后通过MQTT连接到EMQX发送消息,可以看到Kafka能成功收到EMQX转发的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/211290.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UE4/UE5 材质实现带框环形进度条

UE4/UE5 材质实现带框环形进度条 此处使用版本:UE4.27 原理:大圆减小圆可以得到圆环,大圆环减小圆环,可以得到圆环外围线框 实现效果: 实现(为了给大家放进一张面前能看的图,我费劲了心思&…

6-55.汽车类的继承

根据给定的汽车类vehicle(包含的数据成员有车轮个数wheels和车重weight)声明,完成其中成员函数的定义,之后再定义其派生类并完成测试。 小车类car是它的派生类,其中包含载人数passenger_load。每个类都有相关数据的输出…

使用 Mybatis 的 TypeHandler 存取 Postgresql jsonb 类型

文章目录 使用 TypeHandler 存取 Postgresql jsonb 类型常见错误column "" is of type jsonb but expression is of type character varying 使用 TypeHandler 存取 Postgresql jsonb 类型 首先在数据库表中定义 jsonb 类型: create table tb_user_info…

MyCAT读写分离

Mycat 是一个开源的数据库系统,但是由于真正的数据库需要存储引擎,而 Mycat 并没有存 储引擎,所以并不是完全意义的数据库系统。 那么 Mycat 是什么?Mycat 是数据库中间件,就是介于数据库与应用之间,进行数…

打工人副业变现秘籍,某多/某手变现底层引擎-StableDiffusionUI引擎部署

Stable Diffusion Web UI是一个基于Stable Diffusion的交互式程序,使用gradio模块构建而成。除了基本的txt2img、img2img等功能外,该模块还包含许多模型融合改进、图片质量修复等附加升级。所有这些功能都可以通过易于使用的Web应用程序图形用户界面进行访问。 一、简介 St…

MQTT框架和使用

目录 MQTT框架 1. MQTT概述 1.1 形象地理解三个角色 1.2 消息的传递 2. 在Windows上体验MQTT 2.1 安装APP 2.2 启动服务器 2.3 使用MQTTX 2.3.1 建立连接 2.3.2 订阅主题 2.3.3 发布主题 2.4 使用mosquitto 2.4.1 发布消息 2.4.2 订阅消息 3. kawaii-mqtt源码分析…

git bash查看远程仓库地址

进入代码路径 git remote -vgit remote -v

LeetCode力扣每日一题(Java):13、罗马数字转整数

一、题目 二、解题思路 1、我的思路 整体思路是将字符串转成字符,再遍历每一个字符,找到各个罗马字母对应的数值大小,同时需要将当前罗马字母的数值大小(后文称为“前”)与后一个罗马字母的数值大小(后文…

计算机视觉GPT时刻!UC伯克利三巨头祭出首个纯CV大模型,推理惊现AGI火花

计算机视觉的GPT时刻,来了! 最近,来自UC伯克利的计算机视觉「三巨头」联手推出了第一个无自然语言的纯视觉大模型(Large Vision Models),并且第一次证明了纯视觉模型本身也是可扩展的(scalabil…

【GIT】.gitignore 在忽略目录中放开某目录

示例:忽略build下面的所有目录,只放开build/ast2500-default/workspace/recipes-phosphor/ 目录 .gitignore 实现文件代码 # 忽略 build 目录下的所有目录 # 并放开build/ast2500-default/workspace/recipes-phosphor/ build/* !build/ast2500-defaul…

微信小程序收款手续费怎么搞成0.2

今天,我将分享如何有效地降低日常中的收款手续费率。我们都知道,不管是微信支付还是支付宝,平台都会从中扣除一定的手续费。但你是否知道,其实手续费率是可以降低的呢?今天介绍如何申请最低手续费率为0.2%的方法&#…

梯度下降(批量梯度下降、随机梯度下降、小批量梯度下降)

在上一篇中我们推导了损失函数 J ( θ ) 1 2 m ∑ i 1 m ( y i − h θ ( x i ) ) 2 J(\theta) \frac{1}{2m} \sum_{i1}^{m} (y^{i} - h_{\theta}(x^{i}))^2 J(θ)2m1​∑i1m​(yi−hθ​(xi))2的由来,结尾讲到最小化这个损失函数来找到最优的参数 θ \theta θ&…

Mysq8l在Centos上安装后忘记root密码如何重新设置

场景 Mysql8在Windows上离线安装时忘记root密码: Mysql8在Windows上离线安装时忘记root密码-CSDN博客 如果是在Windows上忘记密码可以参考上面。 如果在Centos中安装mysql可以参考下面。 CentOS7中安装Mysql8并配置远程连接和修改密码等: CentOS7中…

element中el-table表头通过header-row-style设置样式

文章目录 一、知识点二、设置全部表头2.1、方式一2.2、方式二 三、设置某个表头四、最后 一、知识点 有些时候需要给element-ui表头设置不同样式,比如居中、背景色、字体大小等等,这时就可以用到本文要说的属性header-row-style。官网说明如下所示&…

前后端分离vue+Nodejs社区志愿者招募管理系统

1、首页 1)滑动的社区照片册 使用轮播图,对社区的活动纪念与实时事件宣传。 每个图片附有文字链接,点击跳转对应社区要闻具体页。 2)社区公告栏 日常的社区公告以及系统说明在此区域中进行说明与展示。 2、志愿活动 1)志愿活动发布 想发布需要登录 2)志愿…

Linux基础项目开发1:量产工具——输入系统(三)

前言: 前面我们已经实现了显示系统,现在我们来实现输入系统,与显示系统类似,下面让我们一起来对输入系统进行学习搭建吧 目录 一、数据结构抽象 1. 数据本身 2. 设备本身: 3. input_manager.h 二、触摸屏编程 t…

Presto基础学习--学习笔记

1,Presto背景 2011年,FaceBook的数据仓库存储在少量大型hadoop/hdfs集群,在这之前,FaceBook的科学家和分析师一直靠hive进行数据分析,但hive使用MR作为底层计算框架,是专为批处理设计的,但是随…

亿胜盈科ATR2037 无限射频前端低噪声放大器

亿胜盈科ATR2037 是一款应用于无线通信射频前端,工作频段为 0.7 到 6GHz 的超低噪声放大器。 ATR2037 低噪声放大器采用先进的 GaAs pHEMT 工艺设计和制作,ATR2037 低噪声放大器在整个工作频段内可以获得非常好的射频性能超低噪声系数。 亿胜盈科ATR203…

abapgit 安装及使用

abapgit 需求 SA[ BASIS 版本 702 及以上 版本查看路径如下: 安装步骤如下: 1. 下载abapgit 独立版本 程序 链接如下:raw.githubusercontent.com/abapGit/build/main/zabapgit_standalone.prog.abap 2.安装开发版本 2.1 在线安装 前置条…

揭秘:软件测试中Web请求的完整流程!

在软件开发的过程中,测试是一个至关重要的环节。而在现代互联网应用中,Web请求是很常见的一个测试需求。本文将介绍Web请求的完整测试流程,帮助读者更好地理解软件测试的关键步骤。 一、测试准备阶段 在进行Web请求测试之前,测试团…