NIFI实现JSON转SQL并插入到数据库表中

说明

本文中的NIFI是使用docker进行安装的,所有的配置参考:docker安装Apache NIFI

需求背景

现在有一个文件,里面存储的是一些json格式的数据,要求将文件中的数据存入数据库表中,以下是一些模拟的数据和对应的数据库建表语句。

json数据

[{"name": "张三","age": 23,"gender": 1},{"name": "李四","age": 24,"gender": 1},{"name": "小红","age": 18,"gender": 0}
]

建表语句

CREATE TABLE `sys_user` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户ID',`name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名',`age`  int NOT NULL DEFAULT 0 COMMENT '年龄',`gender` tinyint NOT NULL COMMENT '性别,1:男,0:女',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否已删除',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT  CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='用户表';

json数据中的属性名和数据库字段的名要一一对应,要不然后期还得做转换,比较麻烦

创建文件流

添加处理器:GetFile

点击工具栏的Processor,拖拽到画布中

筛选GetFile,点击ADD添加到画布中

配置GetFile处理器

双击添加的处理器,弹出对应的配置界面

可选操作)点击SETTINGS选项,在Name中输入处理器的名称:获取文件内容

点击SCHEDULING,在Run Schedule中输入定时器的时间,这里设置每10秒运行一次,如果不设置后面运行处理器的时候会无限循环运行

 点击PROPERTIES选项

 配置PROPERTIES,分别填写Input DirectoryFile FilterKeep Source File,其他选项默认即可。

说明:博主的NIFI是使用docker安装的,容器的数据全部挂载到了宿主机中,NIFI的HOME默认是在/opt/nifi/nifi-current,挂载到宿主机的路径为:/root/data/nifi/nifi-current。所以Input Directory中填写的路径/opt/nifi/nifi-current/mydata/file 实际对应宿主机路径为:/root/data/nifi/nifi-current/mydata/file,到时候把测试文件放到宿主机的/root/data/nifi/nifi-current/mydata/file下面即可

将文件放到对应的目录下

说明:mydata/file中的所有文件需要有读写的权限,否则后面读取文件会报错

修改权限:

chmod +777 /root/data/nifi/nifi-current/mydata/file

(可选操作)测试处理器配置是否成功

 添加LogAttribute处理器

连接处理器

将鼠标放到第一个处理器上,然后点击出现的箭头,将其拖拽到第二个处理器中,等待线条由红色变为绿色后,松开鼠标即可。

在弹出的界面中勾选success,然后点击ADD

 第一个显示红色方框的代表当前处理器可以正常使用;第二个出现黄色三角感叹号的代表当前处理器有问题,双击第二个处理器。

在弹出的界面选择RELATIONSHIPS选项卡,在success下勾选terminate,最后点击APPLY

说明:success下面的两个选项:terminate和retry分别代表着当前处理器执行成功的操作

terminate代表成功后终止,retry代表成功后继续尝试

可以看到黄色的三角变成了红色的方框,表示当前处理器没问题了。

运行处理器

运行处理器有两种方式,第一种是一个一个单独运行另一种是直接运行全部

第一种

鼠标放到第一个处理器中然后右键,可以看到有一堆选项,这里运行处理器可以选择Sart或者Run Once,为了方便调试,这里选择Run Once即只运行一次

点击Run Once之后可以看到,在处理器的右上角多了一个标志,这个代表当前有几个线程在运行中

 当处理器的任务执行结束后可以看到两个处理器的连接处会显示当前有几个队列,以及队列数据总的大小

将鼠标放到两个处理器的连接处,鼠标右键,选择List queue

 在弹出的界面中可以看到等待中的队列列表

选择其中一个队列,点击左上角的提示,可以看到上一个处理器(GetFile)的一些信息,包括一些属性啊什么的,这个可以自己去看,这里不再仔细说明。点击OK可以关闭当前的弹框

 点击某一个队列的右上角,第一个可以下载当前的内容,中间的小眼睛可以查看队列中的数据

 点击小眼睛,可以看到文件中的内容显示在了页面中,默认是original,也可以选择formatted和hex

 运行第二个处理器(LogAttribute),同样的鼠标放到处理器上,然后选择Run Once即可

然后可以在nifi的日志中看到打印了一些日志,主要包括了处理器的属性和内容

说明:如果要想打印出文件的内容,LogAttribute处理器需要选择以下内容

正常打印数据说明GetFile处理器配置的没问题

Json数组分隔

添加处理器:SplitJson

 配置SplitJson处理器

双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容

JsonPath Expression(JSON 路径表达式):指定要提取的 JSON 对象的路径。例如,如果要提取根级别的 JSON 对象,可以将路径设置为 $

连接处理器

将GetFile处理器和SplitJson处理器连接起来,勾选For Relationships,然后选择ADD

可选操作)测试处理器配置是否成功

将SplitJson处理器和LogAttribute处理器连接,连接处理器中的For Relationships选择split

 此时发现SplitJson处理器还在告警,双击SplitJson处理器,选择RELATIONSHIPS,按照如图勾选

此时所有的处理器已正常显示

开启所有的处理器(在画布空白处鼠标右键,点击Start),查看nifi容器的日志,可以看到此时日志打印出来的不再是整个文件的内容,而是单独一条一条json数据

停止所有处理器(画布空白处鼠标右键,选择Stop),清空队列中的数据,在连接处鼠标右键,选择Empty queue

Json转为SQL

添加处理器:ConvertJSONToSQL

 配置ConvertJSONToSQL处理器

双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容

配置JDBC Connection Pool

Value下面点击,选择Create new service

根据自己的情况选择对应的services,我这里选择的是默认的 

点击最后面的右箭头

点击右侧的小齿轮

切换到SETTINGS选项卡,给驱动起个名字,方便以后识别

切换到PROPERTIES选项卡 ,配置数据库相关参数,其他按照默认的即可

校验参数配置是否正确,点击右上角的对号

校验通过会出现绿色对钩,如果配置不对会有对应提示,最后点击APPLY

开启JDBC的配置,点击闪电符号,在弹出的界面点击ENABLE,最后点击CLOSE

 最后可以看到state已经变为Enabled,点击右上角的X关闭

到此JDBC的配置结束

配置Statement Type

再次双击处理器,配置Statement Type,选择INSERT,代表生成的是INSERT语句

配置Table Name

校验配置是否正确

最后点击APPLY

连接处理器

将SplitJson处理器和ConvertJSONToSQL处理器进行连接,Relationships选择split

可选操作)测试处理器配置是否成功

这里跳过测试,如果需要测试自己的配置是否正确的,可以自行将处理器和LogAttribute处理器进行连接进行测试,以下是博主自己的测试结果,做个参考,最后面会打印生成的SQL语句

执行生成的SQL

添加处理器:PutSQL

配置PutSQL处理器

双击处理器,在PROPERTIES选项卡中配置以下内容,其他内容默认即可

 

 校验配置是否正确

最后点击APPLY

连接处理器

将ConvertJSONToSQL处理器和PutSQL处理器进行连接,Relationships选择sql

 处理PutSQL处理器的告警

双击处理器,在RELATIONSHIPS选项卡配置勾选以下内容

完整的配置结果

包含四个处理器,依次为GetFile=>SplitJson=>ConvertJSONToSQL=>PutSQL

 开启所有的处理器

数据库是否有数据

可以看到现在的数据库里面还是没有数据的

 开启处理器

在画布的空白位置,鼠标右键选择Start

开启后可以看到所有的处理器左上角都显示为绿色三角,表示处理器已经启动了,过十几秒再看处理器,发现已经有数据流入

 查看数据库数据

此时数据库已经有数据插入,重复数据是因为每隔10秒执行一次任务,就会读取一次文件,然后重复往数据库插入数据,如果不想让数据不停插入数据库,可以将GetFile中的PROPERTIES下的Keep Source File设置为false即可(此操作需要停止处理器才能够设置)

结束语

NIFI学习需要花费一定的时间去仔细研究,它里面内置了大概300多个处理器,每个处理器实现的功能都不一样,配置也都不同。博主也正在不断地学习中,后续也会不断分享关于NIFI的内容,如果有什么疑问欢迎评论区进行评论。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/125008.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

初试小程序轮播组件

文章目录 一、轮播组件(一)swiper组件1、功能描述2、属性说明 (二)swiper-item组件1、功能描述2、属性说明 二、案例演示(一)运行效果(二)实现步骤1、创建小程序项目2、准备图片素材…

工程管理系统简介 工程管理系统源码 java工程管理系统 工程管理系统功能设计

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展,企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性,公司对内部工程管…

微信小程序 房贷计算器 js代码终极版

这里写目录标题 展示图1.在utils 中创建文件calculateMortgage.ts2. 在需要使用的地方引入并传参 展示图 1.在utils 中创建文件calculateMortgage.ts /** 假设房贷本金是60万,贷款年利率为4.8%即月利率为0.4%,分20年即240期偿还,等额本金还款…

AWS-数据库迁移工具DMS-场景:单账号跨区域迁移RDS for Mysql

参考文档: 分为几个环节: 要使用 AWS DMS 迁移至 Amazon RDS 数据库实例: 1.创建复制实例 有坑内存必须8g或者以上,我测试空库 都提示内存不足 2.创建目标和源终端节点 目标空库也得自己创建哈 3.刷新源终端节点架构 4.创建迁…

echarts环图配置

echarts环图配置 1、安装echarts npm install echarts4.9.02、页面引入echarts import echarts from echarts;3、应用 template片段 <div class"chart-wrap"><div id "treeChart" style "width: 180px; height:180px;" ><…

vite介绍

vite vite是一种新的前端构建工具&#xff0c;vite借助了浏览器对ESM的支持&#xff0c;采用和传统webpack打包完全不一致的unbundle打包机制&#xff1b; vite的快主要体现在两个方面&#xff0c;快速的冷启动和快速的热更新 快速的冷启动&#xff1a;vite只需启动一台静态页…

LeetCode刷题笔记【24】:贪心算法专题-2(买卖股票的最佳时机II、跳跃游戏、跳跃游戏II)

文章目录 前置知识122.买卖股票的最佳时机II题目描述贪心-直观写法贪心-优化代码更简洁 55. 跳跃游戏题目描述贪心-借助ability数组贪心-只用int far记录最远距离 45.跳跃游戏II题目描述回溯算法贪心算法 总结 前置知识 参考前文 参考文章&#xff1a; LeetCode刷题笔记【23】…

SpringBoot隐藏文件

1.设置 2.输入file Types 3.点击忽略文件或者文件夹 4.成功

Linux下go环境安装、环境配置并执行第一个go程序

一、安装 1.Golang对Linux的内核版本要求 GO对Linux内核版本最低要求是 2.6.23&#xff0c;对应要求操作系统版本是&#xff1a; RHEL 6.0CentOS 6.0即&#xff0c;不支持 (RHEL 和 CentOS) 的 (4.x or 5.x)。2.下载golang的代码版本 Golang的官网下载地址&#xff1a;https:…

Qt 简单闹钟

//wiget.h#ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTime> //时间类 #include <QTimer> //定时器类 #include <QTextToSpeech> #include <QDebug> QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPA…

文件上传与下载

文章目录 1. 前端要求2. 后端要求 1. 前端要求 //采用post方法提交文件 method"post" //采用enctype属性 enctype"" //type属性要求 type"file"2. 后端要求 package com.itheima.reggie.controller;import com.itheima.reggie.common.R; impo…

(数字图像处理MATLAB+Python)第十二章图像编码-第三、四节:有损编码和JPEG

文章目录 一&#xff1a;有损编码&#xff08;1&#xff09;预测编码A&#xff1a;概述B&#xff1a;DM编码C&#xff1a;最优预测器 &#xff08;2&#xff09;变换编码A&#xff1a;概述B&#xff1a;实现变换编码的主要问题 二&#xff1a;JPEG 一&#xff1a;有损编码 &am…

README

一、Markdown 简介 Markdown 是一种轻量级标记语言&#xff0c;它允许人们使用易读易写的纯文本格式编写文档。 应用 当前许多网站都广泛使用 Markdown 来撰写帮助文档或是用于论坛上发表消息。例如&#xff1a;GitHub、简书、知乎等 编辑器 推荐使用Typora&#xff0c;官…

使用Akka的Actor模拟Spark的Master和Worker工作机制

使用Akka的Actor模拟Spark的Master和Worker工作机制 Spark的Master和Worker协调工作原理 在 Apache Spark 中&#xff0c;Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程&#xff1a; Worker 启动后&#xff0c…

Redis 7 第九讲 微服务集成Redis 应用篇

Jedis 理论 Jedis是redis的java版本的客户端实现&#xff0c;使用Jedis提供的Java API对Redis进行操作&#xff0c;是Redis官方推崇的方式&#xff1b;并且&#xff0c;使用Jedis提供的对Redis的支持也最为灵活、全面&#xff1b;不足之处&#xff0c;就是编码复杂度较高。 …

如何选择合适的HTTP代理服务器

HTTP代理服务器是一种常见的网络代理方式&#xff0c;它可以帮助用户隐藏自己的IP地址&#xff0c;保护个人隐私和安全。然而&#xff0c;选择合适的HTTP代理服务器并不容易&#xff0c;需要考虑多个因素。本文将介绍如何选择合适的HTTP代理服务器。 了解代理服务器的类型 HTT…

中使用pack局管理器:管理器布置小部件

一、说明 在本教程中&#xff0c;我们将了解如何制作登录 UI。今天的教程将重点介绍如何使用 Tkinter 的pack布局管理器。 二、设计用户界面 什么是布局管理器&#xff1f;创建图形界面时&#xff0c;窗口中的小部件必须具有相对于彼此排列的方式。例如&#xff0c;可以使用微件…

Vue + Element UI 前端篇(十一):第三方图标库

Vue Element UI 实现权限管理系统 前端篇&#xff08;十一&#xff09;&#xff1a;第三方图标库 使用第三方图标库 用过Elment的同鞋都知道&#xff0c;Element UI提供的字体图符少之又少&#xff0c;实在是不够用啊&#xff0c;幸好现在有不少丰富的第三方图标库可用&…

Python 网页爬虫原理及代理 IP 使用

目录 前言 一、Python 网页爬虫原理 二、Python 网页爬虫案例 步骤1&#xff1a;分析网页 步骤2&#xff1a;提取数据 步骤3&#xff1a;存储数据 三、使用代理 IP 四、总结 前言 随着互联网的发展&#xff0c;网络上的信息量变得越来越庞大。对于数据分析人员和研究人…

【多思路附源码】2023高教社杯 国赛数学建模C题思路 - 蔬菜类商品的自动定价与补货决策

赛题介绍 在生鲜商超中&#xff0c;一般蔬菜类商品的保鲜期都比较短&#xff0c;且品相随销售时间的增加而变差&#xff0c; 大部分品种如当日未售出&#xff0c;隔日就无法再售。因此&#xff0c; 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬…