1、官网下载 DataX
https://github.com/alibaba/DataX
2、将依赖添加到本地(DataX没有maven坐标,需要自己安装)
mvn install:install-file -Dfile="datax-common-0.0.1.jar" "-DgroupId=com.datax" "-DartifactId=datax-common" "-Dversion=0.0.1" "-Dpackaging=jar"
mvn install:install-file -Dfile="datax-core-0.0.1.jar" "-DgroupId=com.datax" "-DartifactId=datax-core" "-Dversion=0.0.1" "-Dpackaging=jar"
3、引入依赖
<!-- datax --><dependency><groupId>com.datax</groupId><artifactId>datax-core</artifactId><version>0.0.1</version></dependency><dependency><groupId>com.datax</groupId><artifactId>datax-common</artifactId><version>0.0.1</version></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.2</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.13</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-io</artifactId><version>1.3.2</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.60</version></dependency>
3、测试类
import com.alibaba.datax.core.Engine;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class DataxDemo {public static void main(String[] args) throws Throwable {// datax.home中存放的 是datax工具的路径。// System.setProperty("datax.home", "E:\\datax"); 设置datax.home环境变量,在系统启动时会通过datax.home环境变量获取配置文件和read、writer数据量插件。System.setProperty("datax.home", "E:\\Develop\\datax\\datax");String jsonPath = "E:\\Gotion\\code2\\manager_platform_develop\\platform-admin\\src\\main\\resources\\datax";String[] datxArgs = {"-job", jsonPath + "/test.json", "-mode", "standalone", "-jobid", "-1"};try {// 启动DataXEngine.entry(datxArgs);} catch (Exception e) {log.error("DataX启动失败", e);}}//获取resource的路径public static String getCurrentClasspath() {ClassLoader classLoader = Thread.currentThread().getContextClassLoader();String currentClasspath = classLoader.getResource("").getPath();// 当前操作系统String osName = System.getProperty("os.name");if (osName.startsWith("Win")) {// 删除path中最前面的/currentClasspath = currentClasspath.substring(1, currentClasspath.length() - 1);}return currentClasspath;}
}
4、数据同步配置Json说明
{"job": {"setting": {"speed": {"channel": 1 // 根据实际情况调整并发通道数}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "", // 源数据库用户名"password": "", // 源数据库密码"column": ["*"], // 要同步的列,使用*表示同步所有列"splitPk": "", // 分片键"connection": [{"table": ["table_name"], // 源表名"jdbcUrl": [""] // 源数据库连接URL}]}},"writer": {"name": "mysqlwriter","parameter": {"username": "", // 目标数据库用户名"password": "", // 目标数据库密码"column": ["*"], // 要写入的列,使用*表示写入所有列"preSql": ["delete from table"], // 预处理SQL,用于清空目标表"session": ["set session sql_mode='ANSI'"], // 数据库会话配置"connection": [{"table": ["table_name"], // 目标表名"jdbcUrl": "" // 目标数据库连接URL}],"writeMode": "insert" // 写入模式,可以是insert、replace、update等}}}]}
}
Json接收参数:
select t.id,t.name,t.status from users t where t.id=${id}
{"job": {"setting": {"speed": {"channel": 4}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"],"querySql": ["select t.id,t.name,t.status from users t where t.id=${id}"]}]}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","writeMode": "insert","column": ["id","name","status"],"connection": [{"table": ["temp_users"],"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test"}]}}}]}
}
启动之前设置参数:
System.setProperty("id", "1");