NIFI实现数据库数据增量同步

说明

nifi版本:1.23.2(docker镜像)

需求背景

将数据库中的数据同步到另一个数据库中,要求对于新增的数据和历史有修改的数据进行增量同步

模拟数据

建表语句

源数据库和目标数据库结构要保持一致,这样可以避免后面单独转换

-- 创建测试表
CREATE TABLE `sys_user` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户ID',`name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名',`age`  int NOT NULL DEFAULT 0 COMMENT '年龄',`gender` tinyint NOT NULL COMMENT '性别,1:男,0:女',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否已删除',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT  CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='用户表';

测试数据

-- 模拟数据
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据1', 20, 1);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据2', 21, 1);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据3', 21, 0);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据4', 18, 0);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据5', 22, 1);

完整测试数据

配置数据库连接池

在画布空白位置鼠标右键,选择Configure

新增配置 

在弹出的界面点击+号,添加新的数据库连接池配置,如果已经有了配置该步骤可以跳过

 在弹出的界面筛选对应类型的连接池,我这里选择DBCPConnectionPool,然后点击ADD

点击刚才新添加的那一条数据右侧的小齿轮,进行连接池相关的配置

配置连接池相关属性 

主要配置以下几个内容,其他的根据情况决定是否需要修改,密码输入后是不会显示的

校验属性

校验配置是否正确,点击右上角的对钩,然后在弹出的界面点击VERIFY进行验证

 验证通过会全部显示绿色,如果某一条不通过会有提示,最后点击APPLY

(可选操作)给配置起个名字

为了方便后续使用,给连接池起个名字,要不然以后配置多了会分不清

激活连接池的配置

点击右侧的闪电标志激活配置,在新的页面中点击ENABLE激活,最后点击CLOSE关闭

已激活的配置

同理增加目标数据库的连接池配置,步骤和上面是一样的这里不再重复了,最终配置好后会有两个连接池的配置。如下:

获取数据库表数据

添加处理器:QueryDatabaseTable

点击工具栏的Processor,拖拽到画布中,筛选QueryDatabaseTable处理器,然后点击ADD添加到画布中

配置处理器:QueryDatabaseTable

双击处理器,切换到PROPERTIES选项卡,配置以下内容

Maximum-value Columns(最大值列):官方文档是这么解释的:以逗号分隔的列名列表。处理器将跟踪自处理器开始运行以来返回的每一列的最大值。使用多个列意味着列列表的顺序,并且每列的值预计比前几列的值增加得更慢。因此,使用多个列意味着列的分层结构,通常用于对表进行分区。此处理器可用于仅检索自上次检索以来添加/更新的那些行。请注意,某些 JDBC 类型(如 bit/boolean)不利于保持最大值,因此这些类型的列不应列在此属性中,并且将导致处理过程中的错误。如果未提供列,则将考虑表中的所有行,这可能会对性能产生影响。注意:为给定表使用一致的最大值列名非常重要,这样增量提取才能正常工作。
支持表达式语言:true

校验属性

给处理器起个名字,表示当前整个工作流的作用

拆分数据

添加处理器:SplitAvro

配置处理器:SplitAvro

双击处理器,切换到PROPERTIES选项卡,所有内容默认即可

数据入库

添加处理器:PutDatabaseRecord

配置处理器:PutDatabaseRecord

双击处理器,切换到PROPERTIES选项卡

新增Record Reader

配置AvroReader

点击右侧的箭头,在弹出的界面选择刚才配置的Reader,然后点击右侧的小齿轮

 在弹出的界面根据自己的需要自行配置,这里按照默认的配置即可

 激活Reader

点击右侧的闪电标志进行激活

 激活后的状态变为Enabled

其他配置

校验属性

连接所有处理器

连接处理器

连接QueryDatabaseTable和SplitAvro两个处理器,勾选For Relationships下的success

连接SplitAvro和PutDatabaseRecord两个处理器,勾选For Relationships下的split

处理SplitAvro处理器的告警

双击SplitAvro处理器,切换到RELATIONSHIPS,勾选下面的两个选项,然后点击APPLY

 处理PutDatabaseRecord处理器的告警

双击PutDatabaseRecord处理器,切换到RELATIONSHIPS,勾选下面的选项,然后点击APPLY

 完整配置

 启动所有处理器

QueryDatabaseTable处理器默认是一分钟执行一次的,可以在SCHEDULING选项卡下面进行配置,这里按照默认的时间来执行

 在画布空白位置鼠标右键选择Start启动所有的处理器

 

查看目标数据库数据

等待一分钟后查看目标数据库数据,发现源数据库的5条数据被同步到了目标数据库

 修改源数据库的数据

UPDATE sys_user SET is_deleted = 1 WHERE id = 1;
UPDATE sys_user SET is_deleted = 1 WHERE id = 4;
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据6', 22, 1);

再次查看目标数据库数据

等待处理器执行后,查看目标数据库数据发现新的数据已经被同步过去

 可以看到最后一个处理器最终由8条记录流入

结束语

以上便是使用NIFI增量同步数据库数据的全过程,如果有什么疑问欢迎评论区进行评论。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/125399.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【美团3.18校招真题1】

大厂笔试真题网址:https://codefun2000.com/ 塔子哥刷题网站博客:https://blog.codefun2000.com/ 小美剪彩带 提交网址:https://codefun2000.com/p/P1088 题意:找出区间内不超过k种数字子数组的最大长度 使用双指针的方式&…

基于SSM的学校运动会信息管理系统

末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:采用JSP技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目&#x…

Paimon+StarRocks 湖仓一体数据分析方案

本文整理自阿里云高级开发工程师曾庆栋(曦乐)在 Streaming Lakehouse Meetup 分享的内容,深入探讨了传统数据仓库分析、PaimonStarRocks湖仓一体数据分析、StarRocks 与 Paimon 的协同使用方法与实现原理,以及StarRocks 社区湖仓分…

Android高通 8.1 老化apk打开摄像头花屏问题

1、最近由于公司VR 3D系统要做双Camera老化测试apk,同时老化4小时需要轮询切换二个摄像头,保证后面camera标定精度数据更准确。 2、一开始我尝试用之前方案移植过去然后同时打开双摄像头 突然发现花屏 如下图所示 3、于是一第一时间想到是不是分辨率不兼…

揭秘iPhone 15 Pro Max:苹果如何战胜三星

三星Galaxy S23 Ultra在我们的最佳拍照手机排行榜上名列前茅有几个原因,但iPhone 15 Pro Max正在努力夺回榜首——假设它有一个特定的功能。别误会我的意思,苹果一直在追赶三星,因为它的iPhone 14 Pro和14 Pro Max都表现强劲。尽管如此&#…

如何把Android Framework学彻底?一条龙学习

Framework通俗易懂 平时学习 Android 开发的第一步就是去学习各种各样的 API,如 Activity,Service,Notification 等。其实这些都是 Framework 提供给我们的。Framework 层为开发应用程序提供了非常多的API,我们通过调用这些 API …

Java虚拟机反射机制

1 什么是Java虚拟机反射机制? 虚拟机在运行期间,对于任何一个类,我们都能知道其内部信息,包括属性,方法,构造函数,实现接口;对于任何一个对象,我们都能获取其字段值、调…

【Redis】Redis 的学习教程(七)之 SpringBoot 集成 Redis

在前几篇文章中,我们详细介绍了 Redis 的一些功能特性以及主流的 java 客户端 api 使用方法。 在当前流行的微服务以及分布式集群环境下,Redis 的使用场景可以说非常的广泛,能解决集群环境下系统中遇到的不少技术问题,在此列举几…

软件测试面试:app闪退的原因(超详细~)

APP闪退的原因是软件测试面试中常见的问题,遇到这个问题时我们应该如何回答呢?实际的测试过程遇到APP闪退的问题应该排查呢? 今天这篇文章就来告诉你答案。 同时,我也为大家准备了一份软件测试视频教程(含面试、接口…

Vue2进阶篇学习笔记

文章目录 Vue2进阶学习笔记前言1、Vue脚手架学习1.1 Vue脚手架概述1.2 Vue脚手架安装1.3 常用属性1.4 插件 2、组件基本概述3、非单文件组件3.1 非单文件组件的基本使用3.2 组件的嵌套 4、单文件组件4.1 快速体验4.2 Todo案例 5、浏览器本地存储6、组件的自定义事件6.1 使用自定…

计算机毕业设计 基于SSM的问卷调查管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍:✌从事软件开发10年之余,专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ 🍅文末获取源码联系🍅 👇🏻 精…

OpenCV(三十二):轮廓检测

1.轮廓概念介绍 在计算机视觉和图像处理领域中,轮廓是指在图像中表示对象边界的连续曲线。它是由一系列相邻的点构成的,这些点在边界上连接起来形成一个封闭的路径。 轮廓层级: 轮廓层级(Contour Hierarchy)是指在包含…

826. 安排工作以达到最大收益;2257. 统计网格图中没有被保卫的格子数;816. 模糊坐标

826. 安排工作以达到最大收益 核心思想:排序维护最大利润。首先我们需要对工人按照能力排序,前面工人满足的最大利润后面的工人肯定是满足的,所以我们只需要用一个tmp来维护小于等于当前工人的最大利润,然后如何得到tmp&#xff…

2023国赛A题保姆级思路代码:定日镜场的优化设计

A题是一套传统的机理分析加规划求解题,首先我们要根据每个月21号的特定时间点建立一个太阳角度框架,根据题目所给出的公式计算效率,还有输出的热功率,然后根据月份求解各种效率,再把年份进行汇总,二三题都是…

功能定义-紧急制动系统

功能简介 紧急制动系统的触发过程如上图所示: 安全距离报警:当两车距离较近时,会给予驾驶员相应提示 预报警:当两车存在碰撞风险但风险较低【Danger Level1】时,会给予驾驶员提示【提示相比之前更为明显】 制动预填充&…

vue前后端端口不一致解决方案

在config index.js文件中 引入如下代码即可 const path require(path) const devEnv require(./dev.env) module.exports {dev: {// PathsassetsSubDirectory: static,assetsPublicPath: /,proxyTable: devEnv.OPEN_PROXY false ? {} : {/api: {target: http://localhos…

MySQL大数据量高速迁移,500GB只需1个小时

在上篇「快、准、稳的实现亿级别MySQL大表迁移」的文章中,介绍了NineData在单张大表场景下的迁移性能和优势。但在大部分场景中,可能遇到的是多张表构成的大数据量场景下的数据搬迁问题。因为搬迁数据量较大,迁移的时长、稳定性及准确性都受到…

spring-secrity的Filter顺序+自定义过滤器

Filter顺序 Spring Security的官方文档向我们提供了filter的顺序,实际应用中无论用到了哪些,整体的顺序是保持不变的: ChannelProcessingFilter,重定向到其他协议的过滤器。也就是说如果你访问的channel错了,那首先就会在channel…

校园二手物品交易系统微信小程序设计

系统简介 本网最大的特点就功能全面,结构简单,角色功能明确。其不同角色实现以下基本功能。 服务端 后台首页:可以直接跳转到后台首页。 用户信息管理:管理所有申请通过的用户。 商品信息管理:管理校园二手物品中…

数字信封技术概论

数字信封技术是一种通过加密手段实现信息保密性和验证的技术,它在保护敏感信息传输过程中得到了广泛应用。本文将详细介绍数字信封技术的原理、实现和应用场景。 一、数字信封技术的原理 数字信封技术是一种将对称密钥通过非对称加密手段分发的方法。在数字信封中…