DataX-Oracle新增writeMode支持update

目录

前言

第一步下载源码

第二步修改源码

1、Oraclewriter

2、WriterUtil

 2.1、修改getWriteTemplate方法

 2.2、新增onMergeIntoDoString与getStrings方法

3、CommonRdbmsWriter

 3.1、修改startWriteWithConnection

 3.2、修改doBatchInsert

 3.3、修改fillPreparedStatement

第三步打包

第四步脚本修改



前言

目前 DataX更新到datax_v202309版本还不能支持Oracle写入的update,只通过DataX只能修改源码。

原理:oracle 不支持类似 MySQL的 REPLACE INTO 和 INSERT … ON DUPLICATE KEY UPDATE,所以只支持 insert 配置项。要实现此功能,需要利用 Oracle 的 merge 语句,先来看下 merge 语法。

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN[UPDATE sql] 
WHEN NOT MATCHED THEN [INSERT sql]

第一步下载源码

 地址:datax_v202309。

第二步修改源码

一共修改3个文件

1、Oraclewriter

 

找到该代码直接注释掉就行。 

2、WriterUtil
 2.1、修改getWriteTemplate方法
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {boolean update = writeMode.trim().toLowerCase().startsWith("update");boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")|| writeMode.trim().toLowerCase().startsWith("replace")|| update;if (!isWriteModeLegal) {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));}// && writeMode.trim().toLowerCase().startsWith("replace")String writeDataSqlTemplate;if (forceUseUpdate || update) {//update只在mysql下使用if (dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) {writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onDuplicateKeyUpdateString(columnHolders)).toString();}//update在Oracle下使用else if (dataBaseType == DataBaseType.Oracle) {writeDataSqlTemplate = onMergeIntoDoString(writeMode, columnHolders, valueHolders) + "INSERT (" +StringUtils.join(columnHolders, ",") +") VALUES(" + StringUtils.join(valueHolders, ",") +")";}else {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("当前数据库不支持 writeMode:%s 模式.", writeMode));}} else {//这里是保护,如果其他错误的使用了update,需要更换为replaceif (update) {writeMode = "replace";}writeDataSqlTemplate = new StringBuilder().append(writeMode).append(" INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").toString();}return writeDataSqlTemplate;}
 2.2、新增onMergeIntoDoString与getStrings方法

代码作用:对Oracle进行update的MERGE拼接

public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {String[] sArray = getStrings(merge);StringBuilder sb = new StringBuilder();sb.append("MERGE INTO %s A USING ( SELECT ");boolean first = true;boolean first1 = true;StringBuilder str = new StringBuilder();StringBuilder update = new StringBuilder();for (String columnHolder : columnHolders) {if (Arrays.asList(sArray).contains(columnHolder)) {if (!first) {sb.append(",");str.append(" AND ");} else {first = false;}str.append("TMP.").append(columnHolder);sb.append("?");str.append(" = ");sb.append(" AS ");str.append("A.").append(columnHolder);sb.append(columnHolder);}}for (String columnHolder : columnHolders) {if (!Arrays.asList(sArray).contains(columnHolder)) {if (!first1) {update.append(",");} else {first1 = false;}update.append(columnHolder);update.append(" = ");update.append("?");}}sb.append(" FROM DUAL ) TMP ON (");sb.append(str);sb.append(" ) WHEN MATCHED THEN UPDATE SET ");sb.append(update);sb.append(" WHEN NOT MATCHED THEN ");return sb.toString();}public static String[] getStrings(String merge) {merge = merge.replace("update", "");merge = merge.replace("(", "");merge = merge.replace(")", "");merge = merge.replace(" ", "");return merge.split(",");}
3、CommonRdbmsWriter
 3.1、修改startWriteWithConnection
        // 替换原先的代码块public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {this.taskPluginCollector = taskPluginCollector;List<String> columns = new LinkedList<>();if (this.dataBaseType == DataBaseType.Oracle && writeMode.trim().toLowerCase().startsWith("update") ) {String merge = this.writeMode;String[] sArray = WriterUtil.getStrings(merge);this.columns.forEach(column->{if (Arrays.asList(sArray).contains(column)) {columns.add(column);}});this.columns.forEach(column->{if (!Arrays.asList(sArray).contains(column)) {columns.add(column);}});}columns.addAll(this.columns);// 用于写入数据的时候的类型根据目的表字段类型转换this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(columns, ","));// 写数据库的SQL语句calcWriteRecordSql();List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);int bufferBytes = 0;try {Record record;while ((record = recordReceiver.getFromReader()) != null) {if (record.getColumnNumber() != this.columnNumber) {// 源头读取字段列数与目的表字段写入列数不相等,直接报错throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",record.getColumnNumber(),this.columnNumber));}writeBuffer.add(record);bufferBytes += record.getMemorySize();if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}}if (!writeBuffer.isEmpty()) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}} catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);} finally {writeBuffer.clear();bufferBytes = 0;DBUtil.closeDBResources(null, null, connection);}}
 3.2、修改doBatchInsert
 protected void doBatchInsert(Connection connection, List<Record> buffer)throws SQLException{PreparedStatement preparedStatement = null;try {connection.setAutoCommit(false);preparedStatement = connection.prepareStatement(this.writeRecordSql);if (this.dataBaseType == DataBaseType.Oracle && !"insert".equalsIgnoreCase(this.writeMode)) {String merge = this.writeMode;String[] sArray = WriterUtil.getStrings(merge);for (Record record : buffer) {List<Column> recordOne = new ArrayList<>();for (int j = 0; j < this.columns.size(); j++) {if (Arrays.asList(sArray).contains(this.columns.get(j))) {recordOne.add(record.getColumn(j));}}for (int j = 0; j < this.columns.size(); j++) {if (!Arrays.asList(sArray).contains(this.columns.get(j))) {recordOne.add(record.getColumn(j));}}for (int j = 0; j < this.columns.size(); j++) {recordOne.add(record.getColumn(j));}for (int j = 0; j < recordOne.size(); j++) {record.setColumn(j, recordOne.get(j));}preparedStatement = fillPreparedStatement(preparedStatement, record);preparedStatement.addBatch();}}else {for (Record record : buffer) {preparedStatement = fillPreparedStatement(preparedStatement, record);preparedStatement.addBatch();}}preparedStatement.executeBatch();connection.commit();}catch (SQLException e) {LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为: {}", e.getMessage());connection.rollback();doOneInsert(connection, buffer);}catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);}finally {DBUtil.closeDBResources(preparedStatement, null);}}
 3.3、修改fillPreparedStatement
  protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)throws SQLException{for (int i = 0; i < record.getColumnNumber(); i++) {int columnSqltype = this.resultSetMetaData.getMiddle().get(i);String typeName = this.resultSetMetaData.getRight().get(i);preparedStatement = fillPreparedStatementColumnType(preparedStatement, i,columnSqltype, typeName,record.getColumn(i));}return preparedStatement;}

第三步打包

1、只需要在idea里面打包修改的两个程序就可以

 2、打包成功后获取两个jar包

 3、将包替换到datax的插件里面

 将oraclewriter-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter

 将plugin-rdbms-util-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter\libs

第四步脚本修改

{"job": {"setting": {"speed": {"byte": 1048576},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "${r_username}","password": "${r_password}","connection": [{	   "querySql": ["SELECT f_year,f_code,f_name,f_order FROM tableName"],"jdbcUrl": ["${r_jdbcUrl}"]}]}},"writer": {"name": "oraclewriter","parameter": {"writeMode": "update(f_year,f_code)","username": "${w_username}","password": "${w_password}","column": ["f_year","f_code","f_name","f_order"],"session": [],"preSql": [],"connection": [{"jdbcUrl": "${w_jdbcUrl}","table": ["tableName"]}]}}		   }]}
}

参数 "writeMode": "update(f_year,f_code)" 里面f_year,f_code就是主键, 参数上不要加/"

update(\"f_year\",\"f_code\")这样是拼不上sql的,这个问题调试了好久才解决。

这时候运行就成功了

参考文章DataX 二次开发支持 Oracle 更新数据icon-default.png?t=N7T8https://blog.csdn.net/xch_yang/article/details/128250190?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-128250190-blog-106881907.235%5Ev43%5Epc_blog_bottom_relevance_base8&spm=1001.2101.3001.4242.1&utm_relevant_index=3Datax oracle 支持增量并且支持全量更新icon-default.png?t=N7T8https://blog.csdn.net/weixin_41250031/article/details/122615271?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&utm_relevant_index=7

两个包的地址: https://download.csdn.net/download/qq_36802726/89046154icon-default.png?t=N7T8https://download.csdn.net/download/qq_36802726/89046154

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/289986.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单例模式如何保证实例的唯一性

前言 什么是单例模式 指一个类只有一个实例&#xff0c;且该类能自行创建这个实例的一种创建型设计模式。使用目的&#xff1a;确保在整个系统中只能出现类的一个实例&#xff0c;即一个类只有一个对象。对于频繁使用的对象&#xff0c;“忽略”创建时的开销。特点&#xff1a…

zedboard+AD9361 运行 open WiFi

先到github上下载img&#xff0c;网页链接如下&#xff1a; https://github.com/open-sdr/openwifi?tabreadme-ov-file 打开网页后下载 openwifi img 用win32 Disk lmager 把文件写入到SD卡中&#xff0c;这一步操作会把SD卡重新清空&#xff0c;注意保存数据。这个软件我会…

基于SpringBoot和Vue的在线视频教育平台的设计与实现

今天要和大家聊的是一款基于SpringBoot和Vue的在线视频教育平台的设计与实现 &#xff01;&#xff01;&#xff01; 有需要的小伙伴可以通过文章末尾名片咨询我哦&#xff01;&#xff01;&#xff01; &#x1f495;&#x1f495;作者&#xff1a;李同学 &#x1f495;&…

mysql--事务四大特性与隔离级别

事务四大特性与隔离级别 mysql事务的概念事务的属性事务控制语句转账示例 并发事务引发的问题脏读脏读场景 不可重复读幻读幻读场景 事务的隔离级别读未提交读已提交可重复读&#xff08;MySQL默认&#xff09; 总结 mysql事务的概念 事务就是一组操作的集合&#xff0c;他是一…

STM32时钟简介

1、复位&#xff1a;使时钟恢复原始状态 就是将寄存器状态恢复到复位值 STM32E10xxx支持三种复位形式,分别为系统复位、上电复位和备份区域复位。 复位分类&#xff1a; 1.1系统复位 除了时钟控制器的RCC_CSR寄存器中的复位标志位和备份区域中的寄存器以外,系统 复位将复位…

leetcode热题100.柱状图中最大的矩形

Problem: 84. 柱状图中最大的矩形 文章目录 题目思路复杂度Code 题目 给定 n 个非负整数&#xff0c;用来表示柱状图中各个柱子的高度。每个柱子彼此相邻&#xff0c;且宽度为 1 。 求在该柱状图中&#xff0c;能够勾勒出来的矩形的最大面积。 示例 1: 输入&#xff1a;hei…

Qlib-Server:量化库数据服务器

Qlib-Server:量化库数据服务器 介绍 Qlib-Server 是 Qlib 的配套服务器系统,它利用 Qlib 进行基本计算,并提供广泛的服务器系统和缓存机制。通过 Qlib-Server,可以以集中的方式管理 Qlib 提供的数据。 框架 Qlib 的客户端/服务器框架基于 WebSocket 构建,这是因为 WebS…

OpenHarmony中的LLDB高性能调试器

概述 LLDB&#xff08;Low Lever Debugger&#xff09;是新一代高性能调试器。详细说明参考 LLDB官方文档 。 当前OpenHarmony中的LLDB工具是在 llvm15.0.4 基础上适配演进出来的工具&#xff0c;是HUAWEI DevEco Studio工具中默认的调试器&#xff0c;支持调试C和C应用。 工…

镜视界 | DevSecOps CI/CD 管道中数字供应链安全的集成策略

目录 前言 数字供应链&#xff08;DSC&#xff09;的定义 数字供应链安全的重点内容和风险因素 CI/CD管道的安全目标和可信实体 将数字供应链安全集成到CI/CD管道中 结语 本文字数&#xff1a;7715&#xff0c;阅读时长&#xff1a;19分钟 1.前言 在敏捷开发的模式下&…

SnapGene 5 for Mac 分子生物学软件

SnapGene 5 for Mac是一款专为Mac操作系统设计的分子生物学软件&#xff0c;以其强大的功能和用户友好的界面&#xff0c;为科研人员提供了高效、便捷的基因克隆和分子实验设计体验。 软件下载&#xff1a;SnapGene 5 for Mac v5.3.1中文激活版 这款软件支持DNA构建和克隆设计&…

StarRocks实战——多点大数据数仓构建

目录 前言 一、背景介绍 二、原有架构的痛点 2.1 技术成本 2.2 开发成本 2.2.1 离线 T1 更新的分析场景 2.2.2 实时更新分析场景 2.2.3 固定维度分析场景 2.2.4 运维成本 三、选择StarRocks的原因 3.1 引擎收敛 3.2 “大宽表”模型替换 3.3 简化Lambda架构 3.4 模…

目标检测的相关模型图:YOLO系列和RCNN系列

目标检测的相关模型图&#xff1a;YOLO系列和RCNN系列 前言YOLO系列的图展示YOLOpassthroughYOLO2YOLO3YOLO4YOLO5 RCNN系列的图展示有关目标检测发展的 前言 最近好像大家也都在写毕业论文&#xff0c;前段时间跟朋友聊天&#xff0c;突然想起自己之前写画了一些关于YOLO、Fa…

excel中批量插入分页符

excel中批量插入分页符&#xff0c;实现按班级打印学生名单。 1、把学生按照学号、班级排序好。 2、选择班级一列&#xff0c;点击数据-分类汇总。汇总方式选择计数&#xff0c;最后三个全部勾选。汇总结果一定要显示在数据的下发&#xff0c;如果显示在上方&#xff0c;后期…

实战|使用 Node.js 和 htmx 构建全栈应用程序

在本教程中&#xff0c;我将演示如何使用 Node 作为后端和 htmx 作为前端来构建功能齐全的 CRUD 应用程序。这将演示 htmx 如何集成到全栈应用程序中&#xff0c;使您能够评估其有效性并确定它是否是您未来项目的不错选择。 htmx 是一个现代 JavaScript 库&#xff0c;旨在通过…

系统架构图怎么画

画架构图是架构师的一门必修功课。 对于架构图是什么这个问题&#xff0c;我们可以按以下等式进行概括&#xff1a; 架构图 架构的表达 架构在不同抽象角度和不同抽象层次的表达&#xff0c;这是一个自然而然的过程。 不是先有图再有业务流程、系统设计和领域模型等&#…

项目四-图书管理系统

1.创建项目 流程与之前的项目一致&#xff0c;不再进行赘述。 2.需求定义 需求: 1. 登录: ⽤⼾输⼊账号,密码完成登录功能 2. 列表展⽰: 展⽰图书 3.前端界面测试 无法启动&#xff01;&#xff01;&#xff01;--->记得加入mysql相关操作记得在yml进行配置 配置后启动…

基于ZHW3548的红外额温枪解决方案

红外额温枪&#xff0c;非接触式测量最典型的方法是红外测温。自红外辐射原理被发现以来&#xff0c;红外技术被广泛应用在温度测量中。红外测温仪具有测温范围广&#xff0c;响应速度快&#xff0c;灵敏度高等特点。红外耳温枪、红外额温计和红外筛检仪都属于非接触式体温计。…

admin端

一、创建项目 1.1 技术栈 1.2 vite 项目初始化 npm init vitelatest vue3-element-admin --template vue-ts 1.3 src 路径别名配置 Vite 配置 配置 vite.config.ts // https://vitejs.dev/config/import { UserConfig, ConfigEnv, loadEnv, defineConfig } from vite im…

C语言 C6031:返回值被忽略:“scanf“ 问题解决

我们在代码中 直接使用 scanf 就会出现这个错误 在最上面 加上 #define _CRT_SECURE_NO_WARNINGS//禁用安全函数警告 #pragma warning(disable:6031)//禁用 6031 的安全警告即可正常运行

TitanIDE与传统 IDE 比较

与传统IDE的比较 TitanIDE 和传统 IDE 属于不同时代的产物&#xff0c;在手工作坊时代&#xff0c;一切都是那么的自然&#xff0c;开发者习惯 Windows 或 MacOS 原生 IDE。不过&#xff0c;随着时代的变迁&#xff0c;软件行业已经步入云原生时代&#xff0c;TitanIDE 是顺应…