使用 RisingWave、NATS JetStream 和 Superset 进行实时物联网监控

在物联网(IoT)背景下,处理实时数据会遇到一些特定的障碍,如边缘计算资源不足、网络条件限制、扩展性存在问题、设备间有多样性差异。要克服这些挑战,需要高效的边缘计算技术、强大的安全措施、标准化协议、可扩展的管理系统和先进的数据处理能力。

通过综合利用 NATS JetStream、RisingWave 和 Superset,可以构建一个强大的解决方案,用于开发可靠且可扩展的实时物联网应用。

RisingWave 是什么?

RisingWave 是与 PostgreSQL 兼容的流数据库,具有成本效益、可扩展性和真正的云原生架构。它允许用户使用 SQL 从流数据中获取实时见解,易于设置、使用和操作。

NATS JetStream 是什么?

NATS 是一种安全连接技术,设计用于在分布式系统中发现和交换信息。它可以部署在任何环境中,用于微服务、数据流和物联网等不同用例,支持边缘设备,可使用多种语言和客户端进行交互。JetStream 构建在 NATS 之上,支持消息流的持久化。

Superset 是什么?

Apache Superset 是一个现代化数据探索和数据可视化平台。它是一款开源软件,可以取代或增强许多团队的专有商业智能工具。

概述

本文将深入探讨一个物联网场景,重点关注通过物联网传感器监控温度和湿度数据。我们将探讨 NATS JetStream 如何使边缘设备能够轻松将数据流传输到 RisingWave 并进行实时处理。通过窗口操作和聚合,RisingWave 可以高效地对数据进行高级分析。最后,我们将使用 Superset 创建表、图表和集成看板,对处理和分析的数据进行可视化。

实时物联网应用开发解决方案

1. 设置 NATS JetStream

NATS 服务器经过高度优化,其二进制文件不到 20 MB,使其可以轻松在各种机器上运行。无论是在 Raspberry Pi 还是规模宏大的服务器上,也无论是在云端、本地、边缘、裸机、虚拟机还是在容器中,均可轻松运行。

您可以使用 Docker 安装 NATS JetStream,如下所示:

docker pull nats:latest

要在 Docker 上运行 NATS JetStream,可以使用 -js Flag 启动 NATS 服务器。此 Flag 可启用 JetStream 功能,使您能够充分利用其各项功能。

docker run -p 4222:4222 -ti nats:latest -js

该 Docker 命令可启动 NATS JetStream。现在,您可以通过各种语言和客户端发布和订阅信息。

在 4222 端口运行的 NATS JetStream 服务器

2. 向 JetStream 发布数据

在此示例中,我们使用 iot_data 主题将物联网数据发布到 JetStream 的 Stream event_stream 中。下面是正在发布的数据示例:

'{"device_Id":"sensor1","temperature":25,"ts":"2023-01-05 05:50:00+00:00"},
'{"device_Id":"sensor1","temperature":26,"ts":"2023-01-05 05:50:01+00:00"}'
'{"device_Id":"sensor2","humidity":60,"ts":"2023-01-05 05:50:01+00:00"}'
'{"device_Id":"sensor1","temperature":27,"ts":"2023-01-05 05:50:02+00:00"}'
'{"device_Id":"sensor2","humidity":62,"ts":"2023-01-05 05:50:02+00:00"}'

3. 从 RisingWave 摄取 JetStream 的数据

我们可以使用开源 RisingWave 或托管服务(RisingWave Cloud)来摄取和处理流数据。本文将使用 RisingWave Cloud,它能够提供良好的用户体验,简化管理和使用 RisingWave 进行物联网监控的操作。

创建 RisingWave 集群

使用免费计划在 RisingWave Cloud 中创建 RisingWave 集群。有关说明,请参阅 RisingWave Cloud 文档。

RisingWave Cloud:账户注册和登录流程

在 RisingWave 中创建 Source 以摄取数据流

在 RisingWave 中创建 Source,以便从先前设置的 iot_data 主题的 Stream event_stream 中摄取数据。在此示例中,RisingWave 充当 NATS JetStream 的 Stream 和主题的订阅者。

请注意,RisingWave 中带有连接器设置的 Source 会与 Stream 建立连接,但不会持久化流数据。

CREATE SOURCE iot_source(device_Id VARCHAR,temperature VARCHAR,humidity VARCHAR,ts TIMESTAMPTZ
)
WITH (connector='nats',server_url='nats://8.210.9.253:4222',subject='iot_data',stream='event_stream',connect_mode='plain'
)FORMAT PLAIN ENCODE JSON;

4. 在 RisingWave 中进行分析

现在,我们根据名为 iot_source 的 Source 创建一个名为 iot_mv 的物化视图,用于存储传入的数据并进行分析。

CREATE MATERIALIZED VIEW iot_mv AS
SELECT device_Id, temperature,humidity,ts 
FROM iot_source;

可以使用以下 SQL 语句查询结果。

SELECT device_Id, temperature,ts 
from iot_mv
WHERE deviceId ='sensor1'
limit 5;

下面是一个结果示例。

device_id  | temperature |               ts               
----------+-------------+-------------------------------sensor1  |          25 | 2023-01-05 05:50:00+00:00sensor1  |          26 | 2023-01-05 05:50:01+00:00sensor1  |          27 | 2023-01-05 05:50:03+00:00sensor1  |          28 | 2023-01-05 05:50:05+00:00sensor1  |          29 | 2023-01-05 05:50:07+00:00

可以使用以下 SQL 语句查询结果。

SELECT device_Id, humidity,ts 
from iot_mv
WHERE deviceId ='sensor2'
limit 5;
| device_id | humidity |                    ts                    
|----------|----------|------------------------------------------
| sensor2  |    60    | 2023-01-05 05:50:02+00:00 
| sensor2  |    62    | 2023-01-05 05:50:04+00:00 
| sensor2  |    65    | 2023-01-05 05:50:06+00:00 
| sensor2  |    68    | 2023-01-05 05:50:08+00:00 
| sensor2  |    70    | 2023-01-05 05:50:10+00:00

下面的语句可创建一个名为 avg_temperature_mv 的物化视图,用于根据时间戳 ts 计算指定设备 sensor1 在 1 分钟 Tumbling 窗口内的平均温度。结果包括设备 ID、平均温度、窗口开始和窗口结束的列。

CREATE MATERIALIZED VIEW avg_temperature_mv AS
SELECT device_Id, AVG(temperature) AS avg_temperature
window_start, window_end
FROM TUMBLE (iot_mv, ts, INTERVAL '1 MINUTES')
WHERE device_Id ='sensor1'
GROUP BY device_Id,window_start, window_end;

可以使用以下 SQL 语句查询结果。

SELECT * FROM avg_temperature_mv LIMIT 5;

下面是一个结果示例。

| device_id | avg_temperature  |        window_start        |          window_end           
|----------|------------------|----------------------------|--------------------------
| sensor1  |        41        | 2023-01-05T05:56:00Z       | 2023-01-05T05:57:00Z 
| sensor1  |        40        | 2023-01-05T05:50:00Z       | 2023-01-05T05:51:00Z 
| sensor1  |        38        | 2023-01-05T05:55:00Z       | 2023-01-05T05:56:00Z 
| sensor1  |        35        | 2023-01-05T05:54:00Z       | 2023-01-05T05:55:00Z 
| sensor1  |        55        | 2023-01-05T06:01:00Z       | 2023-01-05T06:02:00Z

同样,下面的语句可创建一个名为 avg_humidity_mv 的物化视图,用于根据时间戳 ts 计算指定设备 sensor2 在 1 分钟 Tumbling 窗口内的平均湿度。结果包括设备 ID、平均湿度、窗口开始和窗口结束的列。

CREATE MATERIALIZED VIEW avg_humidity_mv AS
SELECT device_Id, AVG(humidity) AS avg_humidity
window_start, window_end
FROM TUMBLE (iot_mv, ts, INTERVAL '1 MINUTES')
WHERE device_Id ='sensor2'
GROUP BY device_Id,window_start, window_end;

可以使用以下 SQL 语句查询结果。

SELECT * FROM avg_humidity_mv LIMIT 5;

下面是一个结果示例。

| device_Id | avg_humidity |        window_start         |          window_end           
|----------|--------------|-----------------------------|-------------------------------
| sensor2  |   112.33     | 2023-01-05T05:58:00Z | 2023-01-05T05:59:00Z |
| sensor2  |      75      | 2023-01-05T05:53:00Z | 2023-01-05T05:54:00Z |
| sensor2  |      90      | 2023-01-05T05:55:00Z | 2023-01-05T05:56:00Z |
| sensor2  |      95      | 2023-01-05T05:50:00Z | 2023-01-05T05:51:00Z |
| sensor2  |     105      | 2023-01-05T05:57:00Z | 2023-01-05T05:58:00Z |

5. 在 Apache Superset 中可视化数据

我们将配置 Superset,以便从 RisingWave 读取数据并进行可视化。

将 RisingWave 连接到 Superset

可以在 Apache Superset 中将 RisingWave 作为数据源,使用 RisingWave 中的表和物化视图进行可视化和创建看板。要了解该过程,请按照 配置 Superset 从 RisingWave 读取数据 一文中的说明进行操作。

成功将 RisingWave 连接到 Apache Superset 后,我们可将 RisingWave 中的物化视图添加为数据集,以创建表、各种图表和综合看板。

使用 Apache Superset 可视化数据:表、图表和看板

此表由 iot_mv 数据集生成,显示温度传感器 ID、温度读数以及每个读数相应的时间戳等信息。

温度传感器表: 温度传感器 ID、温度读数和时间戳

此表也由 iot_mv 数据集生成,显示湿度传感器 ID、湿度读数以及每个读数相应的时间戳等详细信息。它全面展示了在 iot_mv 物化视图中捕获和存储的湿度数据。

湿度传感器表: 湿度传感器 ID、湿度读数和时间戳

此条形图由 avg_temperature_mv 数据集生成,显示了温度传感器在预定义的 1 分钟时间窗口内获取的平均温度。

平均温度传感器图表:显示温度传感器在 1 分钟窗口内获取的平均温度值

此折线图由 avg_humidity_mv 数据集生成,显示了湿度传感器在指定的 1 分钟时间窗口内获取的平均湿度。

平均湿度传感器图表:显示湿度传感器在 1 分钟窗口内获取的平均湿度值

此综合看板呈现了一系列图表,有助于全面实时监控物联网设备。通过对每个相应时间戳的温度和湿度传感器读数进行深入分析,获取有价值的见解,使用户能够做出明智的决策,并实现对工业物联网设备的有效监控。

物联网设备实时监控看板:基于温度和湿度传感器

总结

本文逐步介绍了如何利用 NATS JetStream、RisingWave 和 Superset 构建实时物联网监控解决方案。以上三个系统的设置过程简单省力,资源效率高且具有强大的可扩展性,是实时物联网应用的理想组合。通过三者的无缝集成,不到一小时即可创建一个实时物联网看板。简而言之,这展示了物联网设备背景下 NATS JetStream、RisingWave 和 Apache Superset 在工业流程中的无缝集成,并通过可视化和看板实现了实时分析和监控。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/298860.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring中各种bean加载顺序

具体加载顺序按照罗列的顺序 XXXAware ApplicationContextAware、EnvironmentAware、BeanFactoryAware、BeanClassLoaderAware 顾名思义,用于获取对应的对象,需要在实体类中声明对应的对象且当前类为普通类能被注入。 InitializingBean void afterProp…

【软件工程】测试规格

1. 引言 1.1简介 本次的测试用例是基于核心代码基本开发完毕,在第一代系统基本正常运行后编写的,主要目的是为了后续开发与维护的便利性。 该文档主要受众为该系统后续开发人员,并且在阅读此文档前最后先阅读本系统的需求文档、概要设计文…

日志、logback、logback.xml --java学习笔记

什么是日志? 好比生活中的日记,可以记录你生活中的点点滴滴程序中的日志,通常就是一个文件,里面记录的是程序运行过程中的各种信息 之前记录日志的方法都是使用输出语句: 这种方法其实并不适合用来记录日志&#xff…

【c++】初阶模版与STL简单介绍

🔥个人主页:Quitecoder 🔥专栏:c笔记仓 朋友们大家好,本篇文章介绍一下模版和对STL进行简单的介绍,后续我们进入对STL的学习! 目录 模版1.泛型编程2.函数模板2.1函数模板的原理2.2模版的实例化…

实验:基于Red Hat Enterprise Linux系统的创建磁盘和磁盘分区(二、三)

目录 一. 实验目的 二. 实验内容 三. 实验设计描述及实验结果 实验二: 1. 为nvme0n2p1设备建立配额属性和文件(EXT) 2. 要求自己名字的用户只能存储不超过200M的文件,总数量不能大于10个 quotacheck [选项] 文件系统 edquota quotaon [选项] 文件系…

某盾滑块拼图验证码增强版

介绍 提示:文章仅供交流学习,严禁用于非法用途,如有不当可联系本人删除 最近某盾新推出了,滑块拼图验证码,如下图所示,这篇文章介绍怎么识别滑块距离相关。 参数attrs 通过GET请求获取的参数attrs, 决…

背包问题---

一、背包模型 有一个体积为V的背包,商店有n个物品,每个物品有一个价值v和体积w,每个物品只能被拿一次,问能够装下物品的最大价值。 这里每一种物品只有两种状态即"拿"或"不拿". 设状态dp[i][j]表示到第i个物品为止,拿的物品总体积为j的情况下的最大价…

Docker:探索容器化技术,重塑云计算时代应用交付与管理

一,引言 在云计算时代,随着开发者逐步将应用迁移至云端以减轻硬件管理负担,软件配置与环境一致性问题日益凸显。Docker的横空出世,恰好为软件开发者带来了全新的解决方案,它革新了软件的打包、分发和管理方式&#xff…

【智能排班系统】基于SpringSecurity实现登录验证、权限验证

文章目录 SpringSecurity介绍sss-security实现依赖工具类Jwt工具JSON响应工具加密工具类 用户上下文用户信息实体类用户上下文 自定义重写自定义无权限的报错自定义密码加密自定义用户类 过滤器登录过滤器权限过滤器 Service登录Service 配置类说明登录验证权限验证IP流量限制 …

C语言第四十弹---预处理(下)

✨个人主页: 熬夜学编程的小林 💗系列专栏: 【C语言详解】 【数据结构详解】 预处理 1、#和## 1.1 #运算符 1.2、##运算符 2、命名约定 3、#undef 4、命令行定义 5、条件编译 6、头文件的包含 6.1、头文件被包含的方式 6.1.1、本地…

Spark 部署与应用程序交互简单使用说明

文章目录 前言步骤一:下载安装包Spark的目录和文件 步骤二:使用Scala或PySpark Shell本地 shell 运行 步骤3:理解Spark应用中的概念Spark Application and SparkSessionSpark JobsSpark StagesSpark Tasks 转换、立即执行操作和延迟求值窄变换和宽变换 S…

StreamingT2V文本生成视频多模态大模型,即将开源!

1、前言 Picsart人工智能研究所、德克萨斯大学和SHI实验室的研究人员联合推出了StreamingT2V视频模型。通过文本就能直接生成2分钟、1分钟等不同时间,动作一致、连贯、没有卡顿的高质量视频。 虽然StreamingT2V在视频质量、多元化等还无法与Sora媲美,但…

【鹅厂摸鱼日记(一)】(工作篇)认识八大技术架构

💓博主CSDN主页:杭电码农-NEO💓   ⏩专栏分类:重生之我在鹅厂摸鱼⏪   🚚代码仓库:NEO的学习日记🚚   🌹关注我🫵带你学习更多知识   🔝🔝 认识八大架构 1. 前言2. 架构简介&…

uniapp:小程序腾讯地图程序文件qqmap-wx-jssdk.js 文件一直找不到无法导入

先看问题: 在使用腾讯地图api时无法导入到qqmap-wx-jssdk.js文件 解决方法:1、打开qqmap-wx-jssdk.js最后一行 然后导入:这里是我的路径位置,可以根据自己的路径位置进行更改导入 最后在生命周期函数中输出: 运行效果…

159 Linux C++ 通讯架构实战14,epoll 函数代码实战

ngx_epoll_init函数的调用 //(3.2)ngx_epoll_init函数的调用(要在子进程中执行) //四章,四节 project1.cpp:nginx中创建worker子进程; //nginx中创建worker子进程 //官方nginx ,一个…

为“自研”的KV数据库编写JDBC驱动

一觉醒来,受到梦的启发,自研了一套K/V数据库系统,因为"客户"一直催促我提供数据库的JDBC驱动,无奈之下,只好花费一个上午的时间为用户编写一个。 我们知道,JDBC只定义一系列的接口, 具体的实现需…

python 利用xpath 爬取一周天气

需求: 爬取 中国天气网指定城市一周的天气,以天津为例 实现: 1,先找到一周的数据位置。 divs html.xpath("//div[classhanml]") 2,再遍历每天。 trs div.xpath("./div/div[2]/table//tr[position…

springboot实战---5.最简单最高效的后台管理系统开发

🎈个人主页:靓仔很忙i 💻B 站主页:👉B站👈 🎉欢迎 👍点赞✍评论⭐收藏 🤗收录专栏:SpringBoot 🤝希望本文对您有所裨益,如有不足之处&…

JS详解-设计模式

工厂模式: 单例模式: // 1、定义一个类class SingleTon{// 2、添加私有静态属性static #instance// 3、添加静态方法static getInstance(){// 4、判断实例是否存在if(!this.#instance){// 5、实例不存在,创建实例this.#instance new Single…

蓝桥备赛——前缀和

题干 我的 Code(50%样例) 对于上述题目的思路,我的想法是使用两个list存储对应的索引,一个存储头索引,一个存储结束索引。 然后使用全排列,计算所有列表元素之间的索引差,大于等于k的作为符合条件的,使用count计数器加一。 k=int(input()) s,c1,c2=map(str,input()…