NIFI使用

1 从Kafka接收消息,存储到数据库中。

在这里插入图片描述
(1) ConsumerKafka processor
在这里插入图片描述
(2)Execute Scripts Processor
我这里是使用JS脚本进行处理。 还有很多其他语言的脚本。
在这里插入图片描述

var flowFile = session.get();
if (flowFile != null) {var IOUtils = Java.type("org.apache.commons.io.IOUtils");var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");var DateFormatUtils=Java.type("org.apache.commons.lang3.time.DateFormatUtils");// var dataType=flowFile.getAttribute('data_type')// var FLAG=flowFile.getAttribute('flag')var tm = null;try {flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {var inputText = IOUtils.toString(inputStream, StandardCharsets.UTF_8);var msg = JSON.parse(inputText);var stationId = msg['stationId'];var stationName = msg['stationName'];var deviceId = msg['deviceId'];var deviceName = msg['deviceName'];var deviceNo = msg['deviceNo'];var receiveType = msg['receiveType'];var createAt = msg['createAt'];var createAtString=DateFormatUtils.format(Number(createAt),'yyyy-MM-dd HH:mm:ss');var obTime = msg['obTime'];var obDate = msg['obDate'];var obDateString=DateFormatUtils.format(Number(obDate),'yyyy-MM-dd HH:mm:ss');var order = msg['order'];var distance = msg['distance'];var channel1SignalStrength = msg['channel1SignalStrength']var powerVoltage = msg['powerVoltage']var sql = 'insert into "SJZT_ODS"."water_data_distance"('+ '"station_id", "station_name", "device_id", "device_name", "device_no", "receive_type", "create_at", "ob_time", "ob_date", "order", "distance", "channel1_signal_strength", "power_voltage")'+ 'VALUES('+ stationId + ', \'' + stationName + '\', ' + deviceId + ', \'' + deviceName + '\', \'' + deviceNo + '\', ' + receiveType + ', \'' + createAtString + '\', \'' + obTime + '\', \'' + obDateString + '\', ' + order + ', ' + distance + ', ' + channel1SignalStrength + ', ' + powerVoltage + ')';outputStream.write(sql.getBytes(StandardCharsets.UTF_8));}));// flowFile = session.putAttribute(flowFile, "tm",tableName);session.transfer(flowFile, REL_SUCCESS);} catch (e) {flowFile = session.putAttribute(flowFile, "rsvr.transfer.error", e);session.transfer(flowFile, REL_FAILURE);}
}

注意: 这里只是生成了一个sql字符串,并没有执行sql,因此需要后面的processor来执行sql语句。
(3)PutSql processor
在这里插入图片描述
注意:autocommit要设置为true,否则看不到数据库里面的数据的。

2 将一堆Processors移动到一个Group里面界面操作

貌似没有直接的移动操作。
(1) Ctrl + A 全选要移动的processors
(2) 点击左边的group按钮
在这里插入图片描述
(3)为新的Group命名
(4)好了。选中的所有的processors都移动到了自己新创建的group中了。

参考材料

[1] https://blog.csdn.net/guijianchouxyz/article/details/120340154

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/487636.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【leetcode】替换后的最长重复字符、将字符串翻转到单调递增

1.替换后的最长重复字符 示例如下: 下面我们来分析一下一个例子,其中K 2 暴力枚举 这里的字符串s是仅由大写字母组成,首先我们尝试用暴力解法的思路来想一下这道题,通过从第一个字符开始进行枚举,如果出现了条件判断…

如何制作“优美”PPT

目录 1.免费PPT模板网站: 2.免费有较好质量的图片网站: 免费图片资源 免费透明PNG图片资源: 免费icon图片资源: 3.选择好的图片: 图片底色 4.要与不要 千万不要: 一定要: 6.一些建议…

类和对象一

目录 1.类的引入 2.类的定义 3.访问限定符 4.类的作用域 5.类对象模型 6.类的大小 1.类的引入 C语言结构体中只能定义变量,在C中,结构体不仅可以定义变量,也可以定义函数。 C兼容C语言,结构用法可以继续使用 同时sruct也升…

【计算机网络】实验13:运输层端口

实验13 运输层端口 一、实验目的 本次实验旨在验证TCP和IP运输层端口号的作用,深入理解它们在网络通信中的重要性。通过实验,我将探讨端口号如何帮助区分不同的应用程序和服务,使得在同一台主机上能够同时运行多个网络服务而不发生冲突。此…

关于睡懒觉

我们经常听到一个词:睡懒觉。 我认为,睡懒觉这个词,是错误的。 人,是需要睡眠的,睡不够,就不会醒。睡够了,自然会醒,也不想继续睡。不信你试试,睡够了,你…

【推荐算法】单目标精排模型——FiBiNET

key word: 学术论文 Motivation: 传统的Embedding&MLP算法是通过内积和Hadamard product实现特征交互的,这篇文章的作者提出了采用SENET实现动态学习特征的重要性;作者认为简单的内积和Hadamard product无法有效对稀疏特征进行特征交互&a…

C语言:define定义常量和定义宏(详解)

本篇博客给大家带来的是#define定义常量和#define定义宏的方法 🐟🐟文章专栏:C语言 🚀🚀若有问题评论区下讨论,我会及时回答 ❤❤欢迎大家点赞、收藏、分享 你们的支持就是我创造的动力 今日思想&#xff1…

Spring Boot如何实现防盗链

一、什么是盗链 盗链是个什么操作,看一下百度给出的解释:盗链是指服务提供商自己不提供服务的内容,通过技术手段绕过其它有利益的最终用户界面(如广告),直接在自己的网站上向最终用户提供其它服务提供商的…

Unity入门(了解生命周期)

目录 1.新建Script(给物体添加C#代码) 2.Unity常用的生命周期介绍 1.新建Script(给物体添加C#代码) 首先点击物体,选择Add Component 搜索 New Script,自命名添加这里命名为PlayerController 2.打开Pla…

【OpenCV】图像阈值

简单阈值法 此方法是直截了当的。如果像素值大于阈值,则会被赋为一个值(可能为白色),否则会赋为另一个值(可能为黑色)。使用的函数是 cv.threshold。第一个参数是源图像,它应该是灰度图像。第二…

神经网络中的过拟合问题及其解决方案

目录 ​编辑 过拟合的定义与影响 过拟合的成因 1. 模型复杂度过高 2. 训练数据不足 3. 训练时间过长 4. 数据特征过多 解决方案 1. 数据增强 2. 正则化 3. Dropout 4. 提前停止 5. 减少模型复杂度 6. 集成学习 7. 交叉验证 8. 增加数据量 9. 特征选择 10. 使…

Redis篇-4--原理篇3--Redis发布/订阅(Pub/Sub)

1、概述 Redis 发布/订阅(Publish/Subscribe,简称 Pub/Sub)是一种消息传递模式,允许客户端订阅一个或多个通道(channel),并接收其他客户端发布到这些通道的消息。 2、Redis 发布/订阅的主要概…

ubuntu 7z解压rar文件报错:unsupported method message

问题说明 最近项目需要支持线上上传rar格式,7z来解压缩入库。开发测试过程中发现使用以下命令解压报错, 7z x FileImportTest01.rar -p"123456" -o/home/download -y文件目录内容已列出,但无法解压文件!!! 仔细检查命令没有问题…

流网络等价性证明:边分解后的最大流保持不变

流网络等价性证明:边分解后的最大流保持不变 问题描述证明思路伪代码C 代码实现解释问题描述 在流网络中,证明将一条边分解为两条边所得到的是一个等价的网络。具体来说,假设流网络 $ G $ 包含边 $ (u, v) $,我们以如下方式创建一个新的流网络 $ G’ $: 创建一个新结点 $…

编译问题 fatal error: rpc/rpc.h: No such file or directory

在编译一些第三方软件的时候,会经常遇到一些文件识别不到的问题,这里整理下做个归总。 目前可能的原因有(排序分先后): 文件不存在;文件存在但路径识别不了;…… 这次以常见的编译lmbench测试…

小皮面板(PHPSTUDY)配置多个域名或IP

问题描述 小皮面板默认采用nginx的静态部署,按照使用nginx的习惯只需要额外添加一个server即可,但是会发现直接往配置文件里添加新的server是不生效的,小皮的官网论坛几乎已经停止维护,因此资料较少,原本也没有仔细使…

《AI行政管理:开启高效治理新时代》

一、引言 AI 行政管理能力的定义和重要性 AI 行政管理能力是指人工智能在行政管理领域的应用能力。它涵盖了多个方面,包括政府决策支持、公共服务优化、行政流程自动化、社会治理与公共安全以及政府内部管理等。在当今时代,AI 行政管理能力具有至关重要…

Golang使用etcd构建分布式锁案例

在本教程中,我们将学习如何使用Go和etcd构建分布式锁系统。分布式锁系统对于管理对分布式系统中共享资源的并发访问至关重要。它有助于维护一致性,防止竞争条件,并确保在任何给定时间只有一个进程独占访问资源。 我们将使用Go作为编程语言&am…

数字IC后端实现常见的physical only cell都有哪些?如何添加这些cell?

数字IC后端实现阶段常见功能cell有哪些?比如AND,AOI,NAND等。 physical cell有哪些?都是干什么用的? 数字后端零基础入门系列 | Innovus零基础LAB学习Day9 (1) well tap cells:防止…

【NVIDIA orin nx 安装ultralytics yolov11】

注意:不同用户安装的python可能会在不同的路径,因此不同的pip管理会导致安装的 torch和torchvision会在不同的路径下 记得区分用户来运行yolo 一、确认系统 JetPack 版本 此处使用5.1.1 1、查看JetPack 版本 jtop二、安装 ultralytics、pytorch、torchvision、onnxruntime…