使用Flink-JDBC将数据同步到Doris

在现代数据分析和处理环境中,数据同步是一个至关重要的环节。Apache Flink和Doris是两个强大的工具,分别用于实时数据处理和大规模并行处理(MPP)SQL数据库。本文将介绍如何使用Flink-JDBC连接器将数据同步到Doris。

一、背景介绍

1、Apache Flink:Flink是一个开源流处理框架,用于处理无界和有界数据流。它提供了高吞吐量、低延迟的数据处理能力,并支持复杂的状态管理和容错机制。

2、Doris:Doris(原百度Palo)是一个基于MPP架构的高性能、实时分析型数据库。它支持高并发查询和高吞吐量的复杂分析场景,具有亚秒级响应时间,并兼容MySQL协议。

3、JDBC:Java数据库连接(JDBC)是一种Java API,用于连接和操作数据库。Flink提供了JDBC连接器,允许从各种关系型数据库中读取和写入数据。

二、准备工作

1、安装Flink:确保你的环境中已经安装了Apache Flink。本文示例使用的是Flink 1.20.0版本。

2、安装Doris:确保你的环境中已经安装并配置了Doris。

3、准备相关测试数据集(excel)。

4、依赖库:使用flink-connector-jdbc和Doris的JDBC驱动。如果你使用的是Maven,可以在项目的pom.xml文件中添加以下依赖:

<dependencies><!--    flink--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink-kafka.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink-jdbc.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!-- flink-doris-connector --><dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>${flink-doris.version}</version></dependency><!--    json处理--><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.53</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>4.0.3</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.36</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.24.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency>
</dependencies>

三、实现步骤

1.读取并解析excel中的测试数据集

package org.example.day20250105;import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import com.alibaba.excel.support.ExcelTypeEnum;
import org.example.day20250105.domain.*;
import org.example.day20250105.flink.jdbc.OdsXwDetailDataWriteByFlink;import java.util.ArrayList;
import java.util.List;public class ExcelDataHandle {private static final String filePath = "src/main/resources/20250105/WW2216069019-循环-90-3-1-20230104110446.xlsx";/*** 每隔5条存储数据库,实际使用中可以100条,然后清理list ,方便内存回收*/private static final int BATCH_COUNT = 1000;/*** 缓存的数据*/private static List<OdsXwCycleData> odsXwCycleDataList = new ArrayList<>(BATCH_COUNT);private static List<OdsXwDetailData> odsXwDetailDataList = new ArrayList<>(BATCH_COUNT);private static List<OdsXwStepData> odsXwStepDataList = new ArrayList<>(BATCH_COUNT);private static List<OdsTestFlow> odsTestFlowList = new ArrayList<>(BATCH_COUNT);private static final OdsTestFlow finalFlow = new OdsTestFlow();public static void main(String[] args) {EasyExcel.read(filePath, OdsTestFlowDataFormat.class, new OdsTestFlowDataHandle()).sheet(1).doRead();System.out.println(odsTestFlowList.add(finalFlow));System.out.println(odsTestFlowList);}public static List<OdsXwCycleData> readCycleData() {EasyExcel.read(filePath, OdsXwCycleData.class, new OdsXwCycleDataHandle()).sheet(2).doRead();return odsXwCycleDataList;}public static List<OdsTestFlow> readTestFlowData(){EasyExcel.read(filePath, OdsTestFlowDataFormat.class, new OdsTestFlowDataHandle()).sheet(1).doRead();return odsTestFlowList;}public static List<OdsXwStepData> readStepData() {EasyExcel.read(filePath, OdsXwStepData.class, new OdsXwStepDataHandle()).excelType(ExcelTypeEnum.XLSX).sheet(3).doRead();return odsXwStepDataList;}public static List<OdsXwDetailData> readDetailData() {EasyExcel.read(filePath, OdsXwDetailData.class, new OdsXwDetailDataHandle()).sheet(4).doRead();return odsXwDetailDataList;}/*** 新威原始测试流程数据   sheet 1*/public static class OdsTestFlowDataHandle implements ReadListener<OdsTestFlowDataFormat> {private static int rowCount = 0;@Overridepublic void invoke(OdsTestFlowDataFormat format, AnalysisContext analysisContext) {if (rowCount > 5) {return;}if (rowCount == 0) {finalFlow.setStartStepNo(format.getV1());finalFlow.setVoltageUpperLimit(format.getV2());finalFlow.setBatteryBatchNumber(format.getV3());} else if (rowCount == 1) {finalFlow.setCycleTimes(format.getV1());finalFlow.setVoltageLowerLimit(format.getV2());finalFlow.setCreator(format.getV3());} else if (rowCount == 2) {finalFlow.setRecordCondition(format.getV1());finalFlow.setCurrentUpperLimit(format.getV2());finalFlow.setRemark(format.getV3());} else if (rowCount == 3) {finalFlow.setVoltageRange(format.getV1());finalFlow.setCurrentLowerLimit(format.getV2());} else if (rowCount == 4) {finalFlow.setCurrentRange(format.getV1());finalFlow.setStartDatetime(format.getV2());finalFlow.setTestBarcode(format.getV3());} else if (rowCount == 5) {finalFlow.setActiveSubstances(format.getV1());finalFlow.setNominalCapacity(format.getV2());}rowCount++;}@Overridepublic void doAfterAllAnalysed(AnalysisContext analysisContext) {odsTestFlowList.add(finalFlow);}}/*** 循环数据处理   sheet 2*/public static class OdsXwCycleDataHandle implements ReadListener<OdsXwCycleData> {@Overridepublic void invoke(OdsXwCycleData odsXwCycleData, AnalysisContext analysisContext) {odsXwCycleDataList.add(odsXwCycleData);
//            if (odsXwCycleDataList.size() >= BATCH_COUNT) {
//                // 存储完成清理 list
//                odsXwCycleDataList.clear();
//            }}@Overridepublic void doAfterAllAnalysed(AnalysisContext analysisContext) {}}/*** 新威原始详情数据 sheet 4*/public static class OdsXwDetailDataHandle implements ReadListener<OdsXwDetailData> {@Overridepublic void invoke(OdsXwDetailData odsXwDetailData, AnalysisContext analysisContext) {odsXwDetailDataList.add(odsXwDetailData);if (odsXwDetailDataList.size() >= BATCH_COUNT) {//每1000条去调用存储方法一次OdsXwDetailDataWriteByFlink.save(odsXwDetailDataList);// 存储完成清理 listodsXwDetailDataList.clear();}}@Overridepublic void doAfterAllAnalysed(AnalysisContext analysisContext) {}}/*** 新威原始工步数据 sheet 3*/public static class OdsXwStepDataHandle implements ReadListener<OdsXwStepData> {@Overridepublic void invoke(OdsXwStepData odsXwStepData, AnalysisContext analysisContext) {odsXwStepDataList.add(odsXwStepData);}@Overridepublic void doAfterAllAnalysed(AnalysisContext analysisContext) {}}
}

2.通过filnk-jdbc写入到doris中

package org.example.day20250105.flink;public interface DorisConstant {public static final String FE_NODE_URL = "192.168.12.244:18030";String BE_NODE_URL = "192.168.12.244:18030";String DORIS_JDBC_URL = "jdbc:mysql://192.168.12.244:19030/test_db_czq";String USERNAME = "root";String PASSWORD = "*******";String DB_NAMME = "test_db_czq";String DRIVER_CLASS_NAME = "com.mysql.cj.jdbc.Driver";
}package org.example.day20250105.flink.jdbc;import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.day20250105.ExcelDataHandle;
import org.example.day20250105.flink.DorisConstant;/*** INSERT INTO `test_db_czq`.`ods_xw_step_data` (`cycle_id`, `step_id`, `step_no`, `step_type`, `step_time`, `start_absolute_time`, `end_absolute_time`, `capacity`, `specific_capacity`, `charge_capacity`, `charge_specific_capacity`, `discharge_capacity`, `discharge_specific_capacity`, `net_discharge_capacity`, `energy`, `specific_energy`, `charge_energy`, `charge_specific_energy`, `discharge_energy`, `discharge_specific_energy`, `net_discharge_energy`, `super_capacitor`, `initial_voltage`, `charge_initial_voltage`, `discharge_initial_voltage`, `end_voltage`, `charge_end_voltage`, `discharge_end_voltage`, `charge_median_voltage`, `discharge_median_voltage`, `initial_current`, `end_current`, `dcir`, `t1_start_temperature`, `t1_end_temperature`, `t1_max_temperature`, `t1_min_temperature`, `v1_start_voltage`, `v1_end_voltage`, `v1_max_voltage`, `v1_min_voltage`, `file_name`) VALUES ('5', '30', '45', '恒流放电', '02:00:19', '2023-01-03 03:47:37', '2023-01-03 05:47:56', '4.0266', '4026648', '0', '0', '4.0266', '4026648', '4.0266', '12.858', '12857977.78', '0', '0', '12.858', '12857977.78', '12.858', '10407.4004', '3.4573', '0', '3.4573', '1.9993', '0', '1.9993', '0', '3.2285', '0', '-2.0089', '0', '25.3', '25.5', '25.6', '25.2', '-0.0003', '-0.0003', '-0.0001', '-0.0003', NULL);*/
public class OdsXwStepDataWriteByFlink {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.fromData(ExcelDataHandle.readStepData()).addSink(JdbcSink.sink("INSERT INTO `test_db_czq`.`ods_xw_step_data` " +"(`cycle_id`, `step_id`, `step_no`, `step_type`, `step_time`, `start_absolute_time`, " +"`end_absolute_time`, `capacity`, `specific_capacity`, `charge_capacity`, `charge_specific_capacity`," +" `discharge_capacity`, `discharge_specific_capacity`, `net_discharge_capacity`, `energy`, `specific_energy`, " +"`charge_energy`, `charge_specific_energy`, `discharge_energy`, `discharge_specific_energy`, `net_discharge_energy`, " +"`super_capacitor`, `initial_voltage`, `charge_initial_voltage`, `discharge_initial_voltage`, `end_voltage`, `charge_end_voltage`," +" `discharge_end_voltage`, `charge_median_voltage`, `discharge_median_voltage`, `initial_current`, `end_current`, `dcir`, " +"`t1_start_temperature`, `t1_end_temperature`, `t1_max_temperature`, `t1_min_temperature`, `v1_start_voltage`, `v1_end_voltage`, " +"`v1_max_voltage`, `v1_min_voltage`, `file_name`) " +"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",(ps, data) -> {ps.setString(1, data.getCycle_id());ps.setString(2, data.getStep_id());ps.setString(3, data.getStep_no());ps.setString(4, data.getStep_type());ps.setString(5, data.getStep_time());ps.setString(6, data.getStart_absolute_time());ps.setString(7, data.getEnd_absolute_time());ps.setString(8, data.getCapacity());ps.setString(9, data.getSpecific_capacity());ps.setString(10, data.getCharge_capacity());ps.setString(11, data.getCharge_specific_capacity());ps.setString(12, data.getDischarge_capacity());ps.setString(13, data.getDischarge_specific_capacity());ps.setString(14, data.getNet_discharge_capacity());ps.setString(15, data.getEnergy());ps.setString(16, data.getSpecific_energy());ps.setString(17, data.getCharge_energy());ps.setString(18, data.getCharge_specific_energy());ps.setString(19, data.getDischarge_energy());ps.setString(20, data.getDischarge_specific_energy());ps.setString(21, data.getNet_discharge_energy());ps.setString(22, data.getSuper_capacitor());ps.setString(23, data.getInitial_voltage());ps.setString(24, data.getCharge_initial_voltage());ps.setString(25, data.getDischarge_initial_voltage());ps.setString(26, data.getEnd_voltage());ps.setString(27, data.getCharge_end_voltage());ps.setString(28, data.getDischarge_end_voltage());ps.setString(29, data.getCharge_median_voltage());ps.setString(30, data.getDischarge_median_voltage());ps.setString(31, data.getInitial_current());ps.setString(32, data.getEnd_current());ps.setString(33, data.getDcir());ps.setString(34, data.getT1_start_temperature());ps.setString(35, data.getT1_end_temperature());ps.setString(36, data.getT1_max_temperature());ps.setString(37, data.getT1_min_temperature());ps.setString(38, data.getV1_start_voltage());ps.setString(39, data.getV1_end_voltage());ps.setString(40, data.getV1_max_voltage());ps.setString(41, data.getV1_min_voltage());ps.setString(42, data.getFile_name());},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUsername(DorisConstant.USERNAME).withPassword(DorisConstant.PASSWORD).withDriverName(DorisConstant.DRIVER_CLASS_NAME).withUrl(DorisConstant.DORIS_JDBC_URL).build()));env.execute();}
}
 

四、实现结果

登录Doris后台可以看到数据已经同步到了ods_xw_step_data表中

图片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/2357.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【python】OpenCV—Local Translation Warps

文章目录 1、功能描述2、原理分析3、代码实现4、效果展示5、完整代码6、参考 1、功能描述 利用液化效果实现瘦脸美颜 交互式的液化效果原理来自 Gustafsson A. Interactive image warping[D]. , 1993. 2、原理分析 上面描述很清晰了&#xff0c;鼠标初始在 C&#xff0c;也即…

灵活妙想学数学

灵活妙想学数学 题1&#xff1a;海星有几只&#xff1f; 一共有12只海洋生物&#xff0c;分别是5只脚的海星&#xff0c;8只脚的章鱼和10只脚的鱿鱼&#xff0c;这些海洋动物的脚一共有87只&#xff0c;每种生物至少有1只&#xff0c;问海星有几只&#xff1f; 解&#xff1a…

STM32-笔记40-BKP(备份寄存器)

一、什么是BKP&#xff08;备份寄存器&#xff09;&#xff1f; 备份寄存器是42个16位的寄存器&#xff0c;可用来存储84个字节的用户应用程序数据。他们处在备份域里&#xff0c;当VDD电源被切断&#xff0c;他们仍然由VBAT维持供电。当系统在待机模式下被唤醒&#xff0c;或…

Sprint Boot教程之五十八:动态启动/停止 Kafka 监听器

Spring Boot – 动态启动/停止 Kafka 监听器 当 Spring Boot 应用程序启动时&#xff0c;Kafka Listener 的默认行为是开始监听某个主题。但是&#xff0c;有些情况下我们不想在应用程序启动后立即启动它。 要动态启动或停止 Kafka Listener&#xff0c;我们需要三种主要方法…

编译pytorch——cuda-toolkit-nvcc

链接 https://blog.csdn.net/wjinjie/article/details/108997692https://docs.nvidia.com/cuda/cuda-installation-guide-linux/#switching-between-driver-module-flavorshttps://forums.developer.nvidia.com/t/can-not-load-nvidia-drivers-on-ubuntu-22-10/239750https://…

如何发布自己的第一个Chrome扩展程序

如何发布自己的Chrome扩展程序 只需要六步即可完成Chrome扩展程序的发布 &#xff08;1&#xff09;首先打开google chrome 应用商城注册开发者账号的页面 &#xff08;2&#xff09;现在进行一个绑卡支付5美元的一次性注册费用即可。【不知道如何绑卡的支付的&#xff0c;文…

SpringBoot入门实现简单增删改查

本例子的依赖 要实现的内容 通过get、post、put和delete接口,对数据库中的trade.categories表进行增删改查操作。 目录结构 com.test/ │ ├── controller/ │ ├── CateController.java │ ├── pojo/ │ ├── dto/ │ │ └── CategoryDto.java │ ├─…

electron 如何申请 Mac 系统权限

对于一些使用 Electron开发的app, 需要获取一些系统权限,比如录屏权限, 获取摄像头权限,麦克风等等,类似于以下界面: 那么Electron App 应该如何申请呢? 首先我们明确一下macOS中基础权限的分类,可以分为以下几种: 隐私权限(Private Permissions) : <!-- entitlements.ma…

浅谈云计算02 | 云计算模式的演进

云计算计算模式的演进 一、云计算计算模式的起源追溯1.2 个人计算机与桌面计算 二、云计算计算模式的发展阶段2.1 效用计算的出现2.2 客户机/服务器模式2.3 集群计算2.4 服务计算2.5 分布式计算2.6 网格计算 三、云计算计算模式的成熟与多元化3.1 主流云计算服务模式的确立3.1.…

An FPGA-based SoC System——RISC-V On PYNQ项目复现

本文参考&#xff1a; &#x1f449; 1️⃣ 原始工程 &#x1f449; 2️⃣ 原始工程复现教程 &#x1f449; 3️⃣ RISCV工具链安装教程 1.准备工作 &#x1f447;下面以LOCATION代表本地源存储库的安装目录&#xff0c;以home/xilinx代表在PYNQ-Z2开发板上的目录 ❗ 下载Vivad…

AI智能体实战|使用扣子Coze搭建AI智能体,看这一篇就够了(新手必读)

有朋友看到我使用Coze搭建的AI智能体蛮实用的&#xff0c;也想自己尝试一下。那今天我就分享一下如何使用Coze&#xff08;扣子&#xff09;搭建AI智能体&#xff0c;手把手教学&#xff0c;流程超级详细&#xff0c;学会了的话&#xff0c;欢迎分享转发&#xff01; 一、搭建A…

.NET8.0多线程编码结合异步编码示例

1、创建一个.NET8.0控制台项目来演示多线程的应用 2、快速创建一个线程 3、多次运行程序&#xff0c;可以得到输出结果 这就是多线程的特点 - 当多个线程并行执行时&#xff0c;它们的具体执行顺序是不确定的&#xff0c;除非我们使用同步机制&#xff08;如 lock、信号量等&am…

nginx 实现 正向代理、反向代理 、SSL(证书配置)、负载均衡 、虚拟域名 ,使用其他中间件监控

我们可以详细地配置 Nginx 来实现正向代理、反向代理、SSL、负载均衡和虚拟域名。同时&#xff0c;我会介绍如何使用一些中间件来监控 Nginx 的状态和性能。 1. 安装 Nginx 如果你还没有安装 Nginx&#xff0c;可以通过以下命令进行安装&#xff08;以 Ubuntu 为例&#xff0…

《数据思维》之数据可视化_读书笔记

文章目录 系列文章目录前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 前言 数据之道&#xff0c;路漫漫其修远兮&#xff0c;吾将上下而求索。 一、数据可视化 最基础的数据可视化方法就是统计图。一个好的统计图应该满足四个标准&#xff1a;准确、有…

Linux之进程

Linux之进程 一.进程进程之形ps命令进程状态特殊进程孤儿进程守护进程 进程创建之创建子进程进程特性优先级进程切换&#xff08;分时操作系统&#xff09; 二.环境变量三.进程地址空间四.进程终止&进程等待五.进程替换六.自定义shell 本篇博客希望简略的介绍进程&#xff…

漫话架构师|什么是系统架构设计师(开篇)

~犬&#x1f4f0;余~ “我欲贱而贵&#xff0c;愚而智&#xff0c;贫而富&#xff0c;可乎&#xff1f; 曰&#xff1a;其唯学乎” 关注犬余&#xff0c;共同进步 技术从此不孤单

在AI智能中有几种重要的神经网络类型?6种重要的神经网络类型分享!

神经网络今天已经变得非常流行&#xff0c;但仍然缺乏对它们的了解。一方面&#xff0c;我们已经看到很多人无法识别各种类型的神经网络及其解决的问题&#xff0c;更不用说区分它们中的每一个了。其次&#xff0c;在某种程度上更糟糕的是&#xff0c;当人们在谈论任何神经网络…

业务幂等性技术架构体系之消息幂等深入剖析

在系统中当使用消息队列时&#xff0c;无论做哪种技术选型&#xff0c;有很多问题是无论如何也不能忽视的&#xff0c;如&#xff1a;消息必达、消息幂等等。本文以典型的RabbitMQ为例&#xff0c;讲解如何保证消息幂等的可实施解决方案&#xff0c;其他MQ选型均可参考。 一、…

【C语言】线程----同步、互斥、条件变量

目录 3. 同步 3.1 概念 3.2 同步机制 3.3 函数接口 1. 同步 1.1 概念 同步(synchronization)指的是多个任务(线程)按照约定的顺序相互配合完成一件事情 1.2 同步机制 通过信号量实现线程间的同步 信号量&#xff1a;通过信号量实现同步操作&#xff1b;由信号量来决定…

Linux内核的启动

一、需求 Linux系统中内核处于硬件和应用层之间。整个系统启动和初始化的过程&#xff0c;Linux内核是在主处理器启动之后才会执行。不同的处理器启动流程并不相同&#xff0c;这就要求内核能支持各种处理器的初始化操作。Liux内核各个模块&#xff0c;大部分设计时做到了体系…