使用Apache Doris自动同步整个 MySQL/Oracle 数据库进行数据分析

Flink-Doris-Connector 1.4.0 允许用户一步将包含数千个表的整个数据库(MySQL或Oracle )摄取到Apache Doris(一种实时分析数据库)中。

通过内置的Flink CDC,连接器可以直接将上游源的表模式和数据同步到Apache Doris,这意味着用户不再需要编写DataStream程序或在Doris中预先创建映射表。

当 Flink 作业启动时,Connector 会自动检查源数据库和 Apache Doris 之间的数据等效性。如果数据源包含 Doris 中不存在的表,Connector 会自动在 Doris 中创建相同的表,并利用 Flink 的侧输出来方便一次摄取多个表;如果源中发生架构更改,它将自动获取 DDL 语句并在 Doris 中进行相同的架构更改。
 

一、快速开始

  • 对于MySQL:

下载 JAR 文件:https://github.com/apache/doris-flink-connector/releases/tag/1.4.0


行家:

<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.15</artifactId><!--artifactId>flink-doris-connector-1.16</artifactId--><!--artifactId>flink-doris-connector-1.17</artifactId--><version>1.4.0</version>
</dependency>
  • 对于Oracle:

下载 JAR 文件:
Flink 1.15:http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.15-1.5.0-SNAPSHOT.jar
Flink 1.16:http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar
Flink 1.17:http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.17-1.5.0-SNAPSHOT.jar


如何使用它

例如,要将整个 MySQL 数据库引入mysql_dbDoris(MySQL 表名以tbl或test开头),只需执行以下命令(无需提前在Doris 中创建表):

<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \lib/flink-doris-connector-1.16-1.4.0.jar \mysql-sync-database \--database test_db \--mysql-conf hostname=127.0.0.1 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=mysql_db \--including-tables "tbl|test.*" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=123456 \--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label1 \--table-conf replication_num=1

摄取Oracle数据库:请参考示例代码(https://github.com/apache/doris-flink-connector/pull/156)。


表现如何

当涉及到同步整个数据库(包含数百甚至数千个活动或不活动的表)时,大多数用户希望在几秒钟内完成。因此我们测试了连接器,看看它是否符合要求:

  • 1000 个 MySQL 表,每个表有 100 个字段。所有表都是活动的(这意味着它们不断更新,每次数据写入涉及一百多行)

  • Flink作业检查点:10s

经过压力测试,系统表现出较高的稳定性,主要指标如下:

根据早期采用者的反馈,该Connector在生产环境中的万表数据库同步中也提供了高性能和系统稳定性。这证明Apache Doris和Flink CDC的结合能够高效可靠地进行大规模数据同步。

二、它如何使数据工程师受益

工程师不再需要担心表创建或表模式维护,从而节省了数天繁琐且容易出错的工作。之前在Flink CDC中,需要为每个表创建一个Flink作业,并在源端建立日志解析链路,但现在通过全库摄取,源数据库的资源消耗大大减少。也是增量更新和全量更新的统一解决方案。

其他特性

1、连接维度表和事实表

常见的做法是将维度表放在Doris中,通过Flink的实时流进行Join查询。Flink-Doris-Connector 1.4.0基于Flink 的 Async I/O实现了异步 Lookup Join,因此 Flink 实时流不会因为查询而阻塞。此外,连接器还允许您将多个查询合并为一个大查询,并将其立即发送给 Doris 进行处理。这提高了此类连接查询的效率和吞吐量。

2、节俭 SDK

我们在 Connector 中引入了 Thrift-Service SDK,用户不再需要使用 Thrift 插件或在编译时配置 Thrift 环境。这使得编译过程变得更加简单。

3、按需流加载

数据同步过程中,当没有新的数据摄入时,不会发出Stream Load请求。这样可以避免不必要的集群资源消耗。

4、后端节点轮询

对于数据摄取,Doris 调用前端节点获取后端节点列表,并随机选择一个发起摄取请求。该后端节点将是协调器。Flink-Doris-Connector 1.4.0 允许用户启用轮询机制,即在每个Flink 检查点都有不同的后端节点作为 Coordinator,以避免单个后端节点长期承受过大的压力。

5、支持更多数据类型

除了常见的数据类型外,Flink-Doris-Connector 1.4.0 还支持 Doris 中的 DecimalV3/DateV2/DateTimev2/Array/JSON。


三、用法示例

可以通过DataStream或FlinkSQL(有界流)从Doris读取数据。支持谓词下推。

CREATE TABLE flink_doris_source (name STRING,age INT,score DECIMAL(5,2)) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = 'password','doris.filter.query' = 'age=18'
);
​
SELECT * FROM flink_doris_source;


连接维度表和事实表:

CREATE TABLE fact_table (`id` BIGINT,`name` STRING,`city` STRING,`process_time` as proctime()
) WITH ('connector' = 'kafka',
...
);
​
create table dim_city(`city` STRING,`level` INT ,`province` STRING,`country` STRING
) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','jdbc-url' = 'jdbc:mysql://127.0.0.1:9030','lookup.jdbc.async' = 'true','table.identifier' = 'dim.dim_city','username' = 'root','password' = ''
);
​
SELECT a.id, a.name, a.city, c.province, c.country,c.level 
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city


写入Apache Doris:

CREATE TABLE doris_sink (name STRING,age INT,score DECIMAL(5,2)) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.label-prefix' = 'doris_label',//json write in'sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true'
);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/123872.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue + Element UI 实现权限管理系统 前端篇(十四):菜单功能实现菜

Vue Element UI 实现权限管理系统 前端篇&#xff08;十四&#xff09;&#xff1a;菜单功能实现 菜单功能实现 菜单接口封装 菜单管理是一个对菜单树结构的增删改查操作。 提供一个菜单查询接口&#xff0c;查询整颗菜单树形结构。 http/modules/menu.js 添加 findMenu…

VsCode搭建Java开发环境 vscode搭建java开发环境 vscode springboot 搭建springboot

VsCode搭建Java开发环境 vscode搭建java开发环境 vscode springboot 搭建springboot VsCode java开发截图1、安装Java 环境相关插件2、安装 Spring 插件3、安装 Mybatis 插件第一个 vsc-mybatis第二个 mybatisX 4、安装Maven环境4.1、安装Maven环境4.2、VsCode配置Maven环境 5、…

Excel_VBA程序文件的加密及解密说明

VBA应用技巧及疑难解答 Excel_VBA程序文件的加密及解密 在您看到这个文档的时候&#xff0c;请和我一起念&#xff1a;“唵嘛呢叭咪吽”“唵嘛呢叭咪吽”“唵嘛呢叭咪吽”&#xff0c;为自己所得而感恩&#xff0c;为付出者赞叹功德。 本不想分享之一技术&#xff0c;但众多学…

智慧公厕是将数据、技术、业务深度融合的公共厕所敏捷化“操作系统”

文明社会的进步离不开公共设施的不断创新和提升。而在这些公共设施中&#xff0c;公共厕所一直是一个备受关注和改善的领域。近年来&#xff0c;随着智慧城市建设的推进&#xff0c;智慧公厕成为了城市管理的重要一环。智慧公厕不仅仅是为公众提供方便和舒适的便利设施&#xf…

TVC广告片存在的商业价值

TVC广告片是商业广告中最常见和重要的形式之一&#xff0c;具有广泛的覆盖面和影响力。宣传片是一种用于宣传推广产品、服务或活动的短片或视频。相比宣传片&#xff0c;TVC广告片可能存在一些弊端。接下来由深圳TVC广告片制作公司老友记小编从以下几个方面浅析一些可能的弊端&…

1998-2014年工业企业数据库和绿色专利匹配

1998-2014年工业企业数据库绿色专利匹配 1、时间&#xff1a;1998-2014年 2、样本量&#xff1a;470万 3、来源&#xff1a;工业企业数据库、国家知识产权局、WIPO 4、指标&#xff1a; 企业匹配唯一标识码、组织机构代码、企业名称、年份、法定代表人、法定代表人职务、行…

华为云云服务器评测|华为云耀云L搭建zerotier服务测试

0. 环境 - Win10 - 云耀云L服务器 1. 安装docker 检查yum源&#xff0c;本EulerOS的源在这里&#xff1a; cd /etc/yum.repos.d 更新源 yum makecache 安装 yum install -y docker-engine 运行测试 docker run hello-world 2. 运行docker镜像 默认配…

Android基础之Activity生命周期

Activity是Android四大组件之一、称为之首也恰如其分。 Activity直接翻译为中文叫活动。在Android系统中Activity就是我看到的一个完整的界面。 界面中看到的TextView(文字&#xff09;、Button(按钮)、ImageView&#xff08;图片&#xff09;都是需要Activity来承载的。 总…

文件包含漏洞学习小结

目录 一、介绍 二、常见文件包含函数 三、文件包含漏洞代码举例分析 四、文件包含漏洞利用方式 4.1 本地文件包含 1、读取敏感文件 2、文件包含可运行的php代码 ①包含图片码 ②包含日志文件 ③包含环境变量getshell ④临时文件包含 ⑤伪协议 4.2 远程文件包含 4.…

部署Django报错-requires SQLite 3.8.3 or higher

记一次CentOS7部署Django项目时的报错 问题出现 在部署测试环境时&#xff0c;有需要用到一个python的后端服务&#xff0c;要部署到测试环境中去 心想这不是so easy吗&#xff0c;把本地调试时使用的python版本及Django版本在服务器上对应下载好&#xff0c;然后直接执行命…

MyBatis基础操作

准备工作&#xff1a; 准备数据库表emp -- 部门管理 create table dept(id int unsigned primary key auto_increment comment 主键ID,name varchar(10) not null unique comment 部门名称,create_time datetime not null comment 创建时间,update_time datetime not null com…

axios封装/基础配置

步骤&#xff1a;装包 -> 封装axios实例 ->调用实例发送请求 1. 装包 npm install axios 2. 封装 axios基础配置 // axios实例封装 import axios from axios// 创建axios实例 const axiosInstance axios.create({baseURL:http://xxx.net, //基地址timeout:5000 //…

不用额外插件?RunnerGo内置压测模式怎么选

我们在做性能测试时需要根据性能需求配置不同的压测模式如&#xff1a;阶梯模式。使用jmeter时我们需要安装插件来配置测试模式&#xff0c;为了方便用户使用&#xff0c;RunnerGo内嵌了压测模式这一选项&#xff0c;今天给大家介绍一下RunnerGo的几种压测模式和怎么根据性能需…

Kafka核心原理第二弹——更新中

架构原理 一、高吞吐机制&#xff1a;Batch打包、缓冲区、acks 1. Kafka Producer怎么把消息发送给Broker集群的&#xff1f; 需要指定把消息发送到哪个topic去 首先需要选择一个topic的分区&#xff0c;默认是轮询来负载均衡&#xff0c;但是如果指定了一个分区key&#x…

2023全国大学生数学建模竞赛C题思路模型代码来啦

目录 一.选题建议先发布&#xff0c;思路模型代码论文第一时间更新&#xff0c;获取见文末名片 二.选题建议&#xff0c;后续思路代码论文 C 题 蔬菜类商品的自动定价与补货决策 各题分析 获取完整思路代码见此处名片 一.选题建议先发布&#xff0c;思路模型代码论文第一时…

深入了解苹果证书及其分类,提升iOS应用开发效率

目录 1. 企业证书 2. 开发者证书 开发证书&#xff1a; 发布证书&#xff1a; 3. 推送证书 4. 分发证书 5. MDM证书 摘要&#xff1a;本文将详细介绍苹果证书的作用及分类&#xff0c;包括企业证书、开发者证书、推送证书、分发证书和MDM证书&#xff0c;帮助开发者了解…

webrtc的FULL ICE和Lite ICE

1、ICE的模式 分为FULL ICE和Lite ICE&#xff1a; FULL ICE:是双方都要进行连通性检查&#xff0c;完成的走一遍流程。 Lite ICE: 在FULL ICE和Lite ICE互通时&#xff0c;只需要FULL ICE一方进行连通性检查&#xff0c; Lite一方只需回应response消息。这种模式对于部署在公网…

数学建模--二次规划型的求解的Python实现

目录 1.算法流程简介 2.算法核心代码 3.算法效果展示 1.算法流程简介 #二次规划模型 #二次规划我们需要用到函数:Cvxopt.solvers.qp(P,q,G,h,A,b) #首先解决二次规划问题和解决线性规划问题的流程差不多 """ 求解思路如下: 1.针对给定的代求式,转化成标准式…

简明SQL截断和偏移指南:掌握LIMIT实现数据筛选

以下是用到的表。 截断 LIMIT 用于限制查询结果返回的行数&#xff0c;即最多返回多少行数据。 例如&#xff0c;返回前两行数据。 例如&#xff0c;从第二个数据开始返回两条数据&#xff08;从0开始计算&#xff09;。 偏移 OFFSET 用于指定查询结果的起始位置&#xff0c…

PHP8中查询数组中指定元素-PHP8知识详解

php是使用最广泛的web编程语言&#xff0c;数组是一个数据集合&#xff0c;数组是一种非常常用的数据类型。在操作数组时&#xff0c;有时我们需要查询数组中是否有某个指定元素。在实际的程序开发中&#xff0c;我们用到了下列方法来查询数组中指定的元素&#xff1a;使用arra…