导入JDBC元数据到Apache Atlas

前言

前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美

本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas

数据库schema与catalog

按照SQL标准的解释,在SQL环境下CatalogSchema都属于抽象概念,可以把它们理解为一个容器或者数据库对象命名空间中的一个层次,主要用来解决命名冲突问题。从概念上说,一个数据库系统包含多个Catalog,每个Catalog又包含多个Schema,而每个Schema又包含多个数据库对象(表、视图、字段等),反过来讲一个数据库对象必然属于一个Schema,而该Schema又必然属于一个Catalog,这样我们就可以得到该数据库对象的完全限定名称,从而解决命名冲突的问题了;例如数据库对象表的完全限定名称就可以表示为:Catalog名称.Schema名称.表名称。这里还有一点需要注意的是,SQL标准并不要求每个数据库对象的完全限定名称是唯一的。

从实现的角度来看,各种数据库系统对CatalogSchema的支持和实现方式千差万别,针对具体问题需要参考具体的产品说明书,比较简单而常用的实现方式是使用数据库名作为Catalog名,使用用户名作为Schema名,具体可参见下表:

表1 常用数据库

供应商Catalog支持Schema支持
Oracle不支持Oracle User ID
MySQL不支持数据库名
MS SQL Server数据库名对象属主名,2005版开始有变
DB2指定数据库对象时,Catalog部分省略Catalog属主名
Sybase数据库名数据库属主名
Informix不支持不需要
PointBase不支持数据库名

原文:https://www.cnblogs.com/ECNB/p/4611309.html

元数据模型层级抽象

不同的关系型数据库,其数据库模式有所区别,对应与下面的层级关系

在这里插入图片描述

  • Datasource -> Catalog -> Schema -> Table -> Column
  • Datasource -> Catalog -> Table -> Column
  • Datasource -> Schema -> Table -> Column

元数据转换设计

在这里插入图片描述

提供元数据

借鉴Apache DolphinScheduler中获取Connection的方式,不多赘述。

public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);logger.info("Get connection from datasource {}", datasourceUniqueId);DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());if (null == dataSourceChannel) {throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));}return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);});return dataSourceClient.getConnection();}

转换元数据

  1. 元数据模型

创建数据库的元数据模型

private AtlasEntityDef createJdbcDatabaseDef() {AtlasEntityDef typeDef = createClassTypeDef(DatabaseProperties.JDBC_TYPE_DATABASE,Collections.singleton(DatabaseProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(DatabaseProperties.ATTR_URL, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_DRIVER_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_VERSION, "string"));typeDef.setServiceType(DatabaseProperties.ENTITY_SERVICE_TYPE);return typeDef;
}

创建数据库模式的元数据模型

private AtlasEntityDef createJdbcSchemaDef() {AtlasEntityDef typeDef = AtlasTypeUtil.createClassTypeDef(SchemaProperties.JDBC_TYPE_SCHEMA,Collections.singleton(SchemaProperties.ENTITY_TYPE_DATASET));typeDef.setServiceType(SchemaProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "tables");}});return typeDef;
}

创建数据库表的元数据模型

private AtlasEntityDef createJdbcTableDef() {AtlasEntityDef typeDef = createClassTypeDef(TableProperties.JDBC_TYPE_TABLE,Collections.singleton(TableProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(TableProperties.ATTR_TABLE_TYPE, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "columns");}});return typeDef;
}

创建数据库列的元数据模型

private AtlasEntityDef createJdbcColumnDef() {AtlasEntityDef typeDef = createClassTypeDef(ColumnProperties.JDBC_TYPE_COLUMN,Collections.singleton(ColumnProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_TYPE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_IS_PRIMARY_KEY, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_IS_NULLABLE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_DEFAULT_VALUE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_AUTO_INCREMENT, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);HashMap<String, String> options = new HashMap<>() {{put("schemaAttributes", "[\"name\", \"isPrimaryKey\", \"columnType\", \"isNullable\" , \"isAutoIncrement\", \"description\"]");}};typeDef.setOptions(options);return typeDef;
}

创建实体之间的关系模型

private List<AtlasRelationshipDef> createAtlasRelationshipDef() {String version = "1.0";// 数据库和模式的关系AtlasRelationshipDef databaseSchemasDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "schemas", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "database", SINGLE, false));databaseSchemasDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);AtlasRelationshipDef databaseTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_TABLES,BaseProperties.RELATIONSHIP_DATABASE_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "database", SINGLE, false));databaseTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 模式和数据表的关系// 注意 schema 已经被使用, 需要更换否则会冲突, 例如改为 Jschema(jdbc_schema)AtlasRelationshipDef schemaTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_SCHEMA_TABLES,BaseProperties.RELATIONSHIP_SCHEMA_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "Jschema", SINGLE, false));schemaTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 表和数据列的关系AtlasRelationshipDef tableColumnsDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_TABLE_COLUMNS,BaseProperties.RELATIONSHIP_TABLE_COLUMNS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "columns", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_COLUMN, "table", SINGLE, false));tableColumnsDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);return Arrays.asList(databaseSchemasDef, databaseTablesDef, schemaTablesDef, tableColumnsDef);
}
  1. 提取元数据

    不再赘述

  2. 转换元数据

使用工厂模式,提供不同类型的元数据转换方式

public interface JdbcTransferFactory {JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client);boolean supportType(String type);String getName();
}

List ignorePatterns 用来过滤不想导入的数据库元数据,例如mysqlinformation_schema

public interface JdbcTransfer {void transfer();JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns);
}

举例:JdbcMysqlTransfer 和 MysqlTransferFactory

@AutoService(JdbcTransferFactory.class)
public class MysqlTransferFactory implements JdbcTransferFactory {public static final String MYSQL = "mysql";@Overridepublic JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {return new JdbcMysqlTransfer(metaData, client);}@Overridepublic boolean supportType(String type) {return MYSQL.equalsIgnoreCase(type);}@Overridepublic String getName() {return MYSQL;}
}
public class JdbcMysqlTransfer implements JdbcTransfer {private final Jdbc jdbc;private final AtlasService atlasService;private List<Pattern> ignorePatterns;public JdbcMysqlTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {this.jdbc = new Jdbc(new JdbcMetadata(metaData));this.atlasService = new AtlasService(client);this.ignorePatterns = Collections.emptyList();}@Overridepublic JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns) {this.ignorePatterns = ignorePatterns;return this;}private boolean tableIsNotIgnored(String tableName) {return ignorePatterns.stream().noneMatch(regex -> regex.matcher(tableName).matches());}@Overridepublic void transfer() {// 1.数据库实体转换DatabaseTransfer databaseTransfer = new DatabaseTransfer(atlasService);AtlasEntity databaseEntity = databaseTransfer.apply(jdbc);// 2.表实体转换String catalog = (String) databaseEntity.getAttribute(BaseProperties.ATTR_NAME);List<AtlasEntity> tableEntities = jdbc.getTables(catalog, catalog).parallelStream().filter(jdbcTable -> tableIsNotIgnored(jdbcTable.getTableName())).map(new TableTransfer(atlasService, databaseEntity)).toList();// 3.列转换for (AtlasEntity tableEntity : tableEntities) {String tableName = (String) tableEntity.getAttribute(BaseProperties.ATTR_NAME);List<JdbcPrimaryKey> primaryKeys = jdbc.getPrimaryKeys(catalog, tableName);jdbc.getColumns(catalog, catalog, tableName).parallelStream().forEach(new ColumnTransfer(atlasService, tableEntity, primaryKeys));}}}
  1. 元数据存入Atlas
public class DatabaseTransfer implements Function<Jdbc, AtlasEntity> {private final AtlasService atlasService;public DatabaseTransfer(AtlasService atlasService) {this.atlasService = atlasService;}@Overridepublic AtlasEntity apply(Jdbc jdbc) {String userName = jdbc.getUserName();String driverName = jdbc.getDriverName();String productName = jdbc.getDatabaseProductName();String productVersion = jdbc.getDatabaseProductVersion();String url = jdbc.getUrl();String urlWithNoParams = url.contains("?") ? url.substring(0, url.indexOf("?")) : url;String catalogName = urlWithNoParams.substring(urlWithNoParams.lastIndexOf("/") + 1);// 特殊处理 Oracleif (productName.equalsIgnoreCase("oracle")){catalogName = userName.toUpperCase();urlWithNoParams = urlWithNoParams + "/" + catalogName;}DatabaseProperties properties = new DatabaseProperties();properties.setQualifiedName(urlWithNoParams);properties.setDisplayName(catalogName);properties.setOwner(userName);properties.setUrl(url);properties.setDriverName(driverName);properties.setProductName(productName);properties.setProductVersion(productVersion);// 1.创建Atlas EntityAtlasEntity atlasEntity = new AtlasEntity(DatabaseProperties.JDBC_TYPE_DATABASE, properties.getAttributes());// 2.判断是否存在实体, 存在则填充GUIDMap<String, String> searchParam = Collections.singletonMap(DatabaseProperties.ATTR_QUALIFIED_NAME, urlWithNoParams);Optional<AtlasEntityHeader> entityHeader = atlasService.checkAtlasEntityExists(DatabaseProperties.JDBC_TYPE_DATABASE, searchParam);entityHeader.ifPresent(header -> atlasEntity.setGuid(header.getGuid()));// 3,存储或者更新到Atlas中if (entityHeader.isPresent()){atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));}else {AtlasEntityHeader header = atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));atlasEntity.setGuid(header.getGuid());}return atlasEntity;}
}

效果展示

  1. 元数据类型定义

在这里插入图片描述

在这里插入图片描述

  1. 测试导入元数据

由于mysql没有采用schema,因此jdbc_schema为空

在这里插入图片描述

如图所示,可以清晰的了解mysql数据库中demo数据库的数据表内容

在这里插入图片描述

数据表元数据,qualifiedName使用数据库连接url.表名
在这里插入图片描述

如同所示,数据表内各个列的元数据;可以清晰的了解该数据表的各个字段信息

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/210109.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全文检索[ES系列] - 第495篇

历史文章&#xff08;文章累计490&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 M…

java设计模式学习之【建造者模式】

文章目录 引言建造者模式简介定义与用途实现方式&#xff1a; 使用场景优势与劣势建造者模式在spring中的应用CD&#xff08;光盘&#xff09;的模拟示例UML 订单系统的模拟示例UML 代码地址 引言 建造者模式在创建复杂对象时展现出其强大的能力&#xff0c;特别是当这些对象需…

linux 应用开发笔记---【标准I/O库/文件属性及目录】

一&#xff0c;什么是标准I/O库 标准c库当中用于文件I/O操作相关的一套库函数&#xff0c;实用标准I/O需要包含头文件 二&#xff0c;文件I/O和标准I/O之间的区别 1.标准I/O是库函数&#xff0c;而文件I/O是系统调用 2.标准I/O是对文件I/O的封装 3.标准I/O相对于文件I/O具有更…

为什么 SQL 不适合图数据库

背景 “为什么你们的图形产品不支持 SQL 或类似 SQL 的查询语言&#xff1f;” 过去&#xff0c;我们的一些客户经常问这个问题&#xff0c;但随着时间的推移&#xff0c;这个问题变得越来越少。 尽管一度被忽视&#xff0c;但图数据库拥有无缝设计并适应其底层数据结构的查询…

SpringBoot启动流程

SpringBoot启动流程 文章目录 SpringBoot启动流程SpringBoot启动流程 SpringBoot启动流程 视频链接&#xff1a; https://www.bilibili.com/video/BV15b4y1a7yG/?p174&spm_id_frompageDriver&vd_sourcef6debc5a79e3f424f9dde2f13891b158 看李老师讲的吧&#xff0c;特…

WPF实战项目十九(客户端):修改RestSharp的引用

修改HttpRestClient&#xff0c;更新RestSharp到110.2.0&#xff0c;因为106版本和110版本的代码不一样&#xff0c;所以需要修改下代码 using Newtonsoft.Json; using RestSharp; using System; using System.Threading.Tasks; using WPFProjectShared;namespace WPFProject.S…

【C++】STL简介(了解)【STL的概念,STL的历史缘由,STL六大组件、STL的重要性、以及如何学习STL、STL的缺陷的讲解】

这里写自定义目录标题 一、什么是STL二、STL的版本1. 原始版本2. P. J. 版本3. RW版本★ 4. SGI版本 三、STL的六大组件四、STL的重要性五、如何学习STL六、STL的缺陷 一、什么是STL STL ( standard template libaray - 标准模板库 )&#xff1a;是C标准库 的重要组成部分&…

使用栈解决括号匹配问题(详解)

项目结构 项目头文件的代码或截图 头文件代码 #ifndef LINKSTACK_H #define LINKSTACK_H #include <stdio.h> #include <stdlib.h> // 链式栈的节点 typedef struct LINKNODE {struct LINKNODE* next; }LinkNode; // 链式栈 typedef struct LINKSTACK {LinkNode h…

VUE项目启动报错: ERROR Error: error:0308010C:digital envelope routines::unsupported

1.vue项目启动报错&#xff1a; 是因为新版本node造成的 方法&#xff1a;修改 packge.json文件 修改为下图&#xff1a; 添加了如下代码 SET NODE_OPTIONS--openssl-legacy-provider && 就可以正常启动了

element-ui upload组件中将file文件数据转成二进制流数据格式

方法一 handleBeforeUpload (file)const reader new FileReader()reader.readAsArrayBuffer(file)reader.onload async function (theFile) {const binary new Blob([theFile.target.result]) // 转成二进制流数据 即binary数据格式}}方法二 const aBlob new Blob([file],…

2024 年甘肃省职业院校技能大赛中职组 电子与信息类“网络安全”赛项竞赛样题-C

2024 年甘肃省职业院校技能大赛中职组 电子与信息类“网络安全”赛项竞赛样题-C 目录 2024 年甘肃省职业院校技能大赛中职组 电子与信息类“网络安全”赛项竞赛样题-C 需要环境或者解析可以私信 &#xff08;二&#xff09;A 模块基础设施设置/安全加固&#xff08;200 分&…

使用Java语言判断一个数据类型是奇数还是偶数

判断一个数字类型是奇数&#xff0c;还是偶数&#xff0c;只需要引入Scanner类&#xff0c;然后按照数据类型的定义方式进行定义&#xff0c;比较是按照与2进行整除后的结果&#xff1b;如果余数为零&#xff0c;则代表为偶数&#xff0c;否则为奇数。 import java.util.Scann…

ES6 import

这里 import 的文件是项目内自己 export 的对象&#xff0c;并非 package.json 里引用的包。 后者的打包策略和配置有关。 原理&#xff1a;彻底理解JavaScript ES6中的import和export - 知乎

Ruby和HTTParty库下载代码示例

ruby require httparty require nokogiri # 设置服务器 proxy_host "" proxy_port "" # 定义URL url "" # 创建HTTParty对象&#xff0c;并设置服务器 httparty HTTParty.new( :proxy > "#{proxy_host}:#{proxy_port}" ) …

CrapApi 本地部署 windows+Linux部署( maven+tomcat+idea)

目录 一、本章节所用到的资源共享&#xff0c;嫌麻烦的可以直接下载本地配置好运行使用二、idea maven tomcat启动&#xff0c;我的maven和tomcat的配置三、遇到的问题四、项目运行后效果图转载请标明出处&#xff0c;写作不易如果有用请给个赞~~~~~~~~~~~~~~~~~~~~~~~~~~~~~…

聚焦数据库Serverless创新,就在2023亚马逊云科技re:Invent

11月28日&#xff0c;亚马逊云科技在其最新的re:Invent 2023大会上宣布了三项重要的serverless创新&#xff0c;这些创新将极大地简化客户在任何规模上分析和管理数据的能力。以下是这些发布的主要要点总结和分析。 Amazon Aurora Limitless Database的新功能&#xff1a; 功能…

键入网址到网页显示,期间发生了什么?(计算机网络)

一、浏览器首先会对URL进行解析 下面以http://www.server.com/dir1/file1.html为例 当没有路径名时&#xff0c;就代表访问根目录下事先设置的默认文件&#xff0c;也就是 /index.html 或者 /default.html 对URL进行解析之后&#xff0c;浏览器确定了 Web 服务器和文件名&am…

Idea 导入Mysql8.0驱动jar包

库是模块可以依赖的已编译代码的集合。在IntelliJ IDEA中&#xff0c;可以在三个级别上定义库&#xff1a; 全局 &#xff08;可用于许多项目&#xff09;&#xff0c; 项目&#xff08;可用于项目中的所有模块&#xff09;和模块 &#xff08;可用于一个模块&#xff09; 简单…

密码学学习笔记(二十三):哈希函数的安全性质:抗碰撞性,抗第一原象性和抗第二原象性

在密码学中&#xff0c;哈希函数是一种将任意长度的数据映射到固定长度输出的函数&#xff0c;这个输出通常称为哈希值。理想的哈希函数需要具备几个重要的安全性质&#xff0c;以确保数据的完整性和验证数据的来源。这些性质包括抗碰撞性、抗第一原象性和抗第二原象性。 抗碰…

LeedCode刷题---双指针问题

顾得泉&#xff1a;个人主页 个人专栏&#xff1a;《Linux操作系统》 《C/C》 《LeedCode刷题》 键盘敲烂&#xff0c;年薪百万&#xff01; 双指针简介 常见的双指针有两种形式&#xff0c;一种是对撞指针&#xff0c;一种是左右指针。 对撞指针:一般用于顺序结构中&…