使用 LF Edge eKuiper 将物联网流处理数据写入 Databend

作者:韩山杰

Databend Cloud 研发工程师

https://github.com/hantmac

LF Edge eKuiper

LF Edge eKuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper 的主要目标是在边缘端提供一个流媒体软件框架(类似于 Apache Flink (opens new window))。eKuiper 的规则引擎允许用户提供基于 SQL 或基于图形(类似于 Node-RED)的规则,在几分钟内创建物联网边缘分析应用。具体介绍可以参考 [LF Edge eKuiper - 超轻量物联网边缘流处理软件(https://ekuiper.org/docs/zh/latest/)。 

Databend Sql Sink

eKuiper 支持通过 Golang 或者 Python 在源 (Source)SQL 函数目标 (Sink) 三个方面的扩展,通过支持不同的 Sink,允许用户将分析结果发送到不同的扩展系统中。Databend 作为 Sink 也被集成到了 eKuiper plugin 当中,下面通过一个案例来展示如何使用 eKuiper 将物联网流处理数据写入 Databend。

编译 eKuiper 和 Databend Sql Plugin

eKuiper

git clone https://github.com/lf-edge/ekuiper & cd ekuiper
make

Databend Sql Plugin

go build -trimpath --buildmode=plugin -tags databend -o plugins/sinks/Sql.so extensions/sinks/sql/sql.go

编译后的 sink plugin 拷贝到 build 目录:

cp plugins/sinks/Sql.so _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64/plugins/sinks

Databend 建表

在 Databend 中先创建目标表 ekuiper_test:

create table ekuiper_test (name string,size bigint,id bigint);

启动 eKuiperd

cd _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64 
./bin/kuiperd

服务正常启动:

创建流(stream) 和 规则 (rule)

eKuiper 提供了两种管理各种流、规则,目标端的方式,一种是通过 ekuiper-manager 的 [docker image](https://hub.docker.com/r/lfedge/ekuiper) 启动可视化管理界面,一种是通过 CLI 工具来管理。这里我们使用 CLI。

创建 stream

流是 eKuiper 中数据源连接器的运行形式。它必须指定一个源类型来定义如何连接到外部资源。这里我们创建一个流,从 json 文件数据源中获取数据,并发送到 eKuiper 中。

首先配置文件数据源,连接器的配置文件位于 /etc/sources/file.yaml

default:# 文件的类型,支持 json, csv 和 linesfileType: json# 文件以 eKuiper 为根目录的目录或文件的绝对路径。# 请勿在此处包含文件名。文件名应在流数据源中定义path: data# 读取文件的时间间隔,单位为ms。如果只读取一次,则将其设置为 0interval: 0# 读取后,两条数据发送的间隔时间sendInterval: 0# 是否并行读取目录中的文件parallel: false# 文件读取后的操作# 0: 文件保持不变# 1: 删除文件# 2: 移动文件到 moveTo 定义的位置actionAfterRead: 0# 移动文件的位置, 仅用于 actionAfterRead 为 2 的情况moveTo: /tmp/kuiper/moved# 是否包含文件头,多用于 csv。若为 true,则第一行解析为文件头。hasHeader: false# 定义文件的列。如果定义了文件头,该选项将被覆盖。# columns: [id, name]# 忽略开头多少行的内容。ignoreStartLines: 0# 忽略结尾多少行的内容。最后的空行不计算在内。ignoreEndLines: 0# 使用指定的压缩方法解压缩文件。现在支持`gzip`、`zstd` 方法。decompression: ""

使用 CLI 创建 steam 名为 stream1:

./bin/kuiper create stream stream1 '(id BIGINT, name STRING,size BIGINT) WITH (DATASOURCE="test.json", FORMAT="json", TYPE="file");'

Json 文件的内容为:

[{"id": 1,"size":100, "name": "John Doe"},{"id": 2,"size":200, "name": "Jane Smith"},{"id": 3,"size":300, "name": "Kobe Brant"},{"id": 4,"size":400, "name": "Alen Iverson"}
]

创建 Databend Sink Rule

一个规则代表了一个流处理流程,定义了从将数据输入流的数据源到各种处理逻辑,再到将数据输入到外部系统的动作。eKuiper 有两种方法来定义规则的业务逻辑。要么使用 SQL / 动作组合,要么使用新增加的图 API。

这里我们通过指定 sql 和 actions 属性,以声明的方式定义规则的业务逻辑。其中,sql 定义了针对预定义流运行的 SQL 查询,这将转换数据。然后,输出的数据可以通过 action 路由到多个位置。

规则由 JSON 定义,下面是准备创建的规则 myRule.json:

{"id": "myRule","sql": "SELECT id, name from stream1","actions": [{"log": {},"sql": {"url": "databend://databend:databend@localhost:8000/default?sslmode=disable","table": "ekuiper_test","fields": ["id","name"]}}]
}

执行 CLI 创建规则:

./bin/kuiper create rule myRule -f myRule.json

可以查看所创建规则的运行状态:

./bin/kuiper getstatus rule myRule

规则创建后,会立即将符合规则条件的数据发送到目标端,此时我们查看 Databend 的 ekuiper_test 表,可以看到文件数据源中的数据已经被写入到 Databend:

可以看到由于我们的规则 SQL 中只指定了 idname 字段,所以这里只有这两个字段被写入。

结论

eKuiper 是 EMQ 旗下的一款流处理软件,其体积小、功能强大,在工业物联网、车辆网、公共数据分析等很多场景中得到广泛使用。本文介绍如何使用 eKuiper 将物联网流处理数据写入 Databend。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/165773.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大模型必备算力:CPUGPU天梯图(2023年最新版)

在当今计算机世界,CPU、GPU和显卡的性能成为了衡量计算机性能的重要指标。今天深入了解CPU、GPU和显卡天梯图。 首先,CPU作为计算机的大脑,负责处理各种任务。它的性能主要由核心数、主频和缓存大小决定。其中,核心数和主频决定了…

kubeadm初始化搭建cri-dockerd记录 containerd.io

07.尚硅谷_搭建K8s集群(kubeadm方式)-部署master节点_哔哩哔哩_bilibili 视频里的版本只有1.17而现在(2023.10.20)kubernetes最新版本是1.28,需要搭载cri-dockerd, 先去网站下载了对应的rpm包cri-dockerd…

计算机网络篇之TCP滑动窗口

文章目录 前言概述 前言 在网络数据传输时,若传输的原始数据包比较大,会将数据包分解成多个数据包进行发送。需要对数据包确认后,才能发送下一个数据包。在等待确认包的这个过程浪费了大量的时间,不过还好TCP引入了滑动窗口的概念…

2022最新版-李宏毅机器学习深度学习课程-P26 Recurrent Neural Network

RNN 应用场景:填满信息 把每个单词表示成一个向量的方法:独热向量 还有其他方法,比如:Word hashing 单词哈希 输入:单词输出:该单词属于哪一类的概率分布 由于输入是文字序列,这就产生了一个问…

【算法|动态规划No.27】leetcode516. 最长回文子序列

个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【LeetCode】 🍔本专栏旨在提高自己算法能力的同时,记录一下自己的学习过程,希望…

快速自动化处理JavaScript渲染页面的方法

目录 一、使用无头浏览器 二、使用JavaScript渲染引擎 三、使用前端框架工具 随着互联网技术的不断发展,JavaScript已经成为Web开发中不可或缺的一部分。然而,在自动化处理JavaScript渲染页面方面,却常常让开发者感到头疼。本文将介绍一些快…

02 开闭原则

官方定义: 开闭原则规定软件中的对象、类、模块和函数对扩展应该是开放的,但对于修 改是封闭的。这意味着应该用抽象定义结构,用具体实现扩展细节,以此确保 软件系统开发和维护过程的可靠性。 通俗解释: 对扩展开放…

基础MySQL的语法练习

基础MySQL的语法练习 create table DEPT(DEPTNO int(2) not null,DNAME VARCHAR(14),LOC VARCHAR(13) );alter table DEPTadd constraint PK_DEPT primary key (DEPTNO);create table EMP (EMPNO int(4) primary key,ENAME VARCHAR(10),JOB VARCHAR(9),MGR …

运营商大数据精准营销,击碎你的固化营销思维

大数据精准营销服务是大数据应用的典型场景之一,也是依托大数据和互联网提升企业效率的一种有效手段。但是,在选择大数据服务的很多时候,企业往往要考虑法律与合规的问题,其中比较重要的是数据获取渠道与数据是否脱敏。在所有大数…

Labview2023安装教程 (最新最详细保姆级教程)

目录 一 .简介 二.安装步骤 软件:Labview版本:2023语言:简体中文大小:2.73G安装环境:Win11/Win10/Win8/Win7硬件要求:CPU2.6GHz 内存8G(或更高)下载通道①百度网盘丨下载链接: htt…

英语——分享篇——每日200词——2401-2600

2401——moisture——[mɔɪstʃə(r)]——n.潮气,湿气,水分——moisture——moist潮湿的(熟词)ur你的(编码your)e鹅(编码)——潮湿的地方你的鹅一身潮气——Moisture in the atmosphere condensed into dew during the night.——大气中的水分在夜间凝结…

微信小程序设计之主体文件app-json-pages

一、新建一个项目 首先,下载微信小程序开发工具,具体下载方式可以参考文章《微信小程序开发者工具下载》。 然后,注册小程序账号,具体注册方法,可以参考文章《微信小程序个人账号申请和配置详细教程》。 在得到了测…

【论文解读】Prefix-Tuning: Optimizing Continuous Prompts for Generation

一.介绍 1.1 前置知识 1.1.1 in-context learning At the limit, GPT-3 (Brown et al, 2020) can be deployed using in-context learning, which is a form of prompting, without modifying any LM parameters. "部署" 指的是将 GPT-3 模型用于实际应用或特定任务…

遗传算法求解旅行商问题(含python源代码)

目录 前言 编码初始化种群 计算适应度 选择 交叉 变异 完整代码 总结 前言 这次的算法有一点不能确定是否正确,希望有大佬能够批评指正。 遗传算法的一般步骤 编码初始化种群 种群(population)指同一时间生活在一定自然区域内&…

C++ 继承

前言 本文将会为你带来关于继承的相关知识(概念、定义、基类的成员变量访问形式、隐藏、父子类之间的赋值、派生类的默认成员函数、菱形继承与虚继承) 继承的概念以及定义 继承(inheritance)机制是面向对象程序设计使代码可以复用的最重要的手段&…

ANR系列之八:疑难ANR问题处理记录

前言: 本文仅是记录作者自身处理过的ANR问题,以及帮助他人解决过的ANR问题。本文中所介绍的ANR处理记录仅供参考,并不适用所有场景。并且最终结论和分析并不一定就是绝对正确的。 案例1.页面切换时前台应用焦点未获得 案例编号:…

无代码的未来

随着无代码技术越来越成熟,很多web应用已经可以基于无代码平台进行开发。本文分析了4个最流行的无代码平台,并梳理了无代码行业今后可能的发展方向。原文: The future of NoCode 所有无代码编辑器都需要回答的问题 当需要选择无代码解决方案时&#xff0…

Vue基础语法核心指令过滤器计算属性监听属性

目录 1. 模板语法 1.1 插值 1.1.1 文本 1.1.2 html 1.1.3 属性 1.1.4 表达式 1.2 指令 1.2.1 核心指令 1.2.1.1 v-if |v-else-if|v-else 1.2.1.2 v-show 1.2.1.3 v-for 1.2.1.4 v-on|v-model|v-for 1.2.1.5 参数 v-bind:href,v-on:click 1.2.1.6 简写 2. 过滤器…

[AUTOSAR][网络管理] 什么是BusOff? 如何实现它?

文章目录 一、BusOff检测机制(1)简介(2)目录介绍(3)软件实现逻辑代码运行流程如下:二、测试方法(1)物理测试三、示例代码(1) busoff_recovery.c(2) busoff_recovery.h一、BusOff检测机制 (1)简介 CAN控制器可以判断出错误的类型是总线上暂时的数据错误(如外部干扰等…

Python中的内存管理:深入分析垃圾回收机制

python中有一个名为refchian的环状双向链表,python运行时创建的所有对象都会添加到refchain中。在refchain中的对象PyObject里都有一个ob_refcnt用来保存当前对象的引用计数器,就是该对象被引用的次数,当对象有新引用时ob_refcnt就会增加&…