OpenMetadata 1.3.1 Custom Connector Development Tutorial


Part 1: Developing a Generic Custom Connector

Official documentation links:

1.https://docs.open-metadata.org/v1.3.x/connectors/custom-connectors

2.https://github.com/open-metadata/openmetadata-demo/tree/main/custom-connector

(1) Create the custom connector class for your service type

Reference: https://docs.open-metadata.org/v1.3.x/sdk/python/build-connector/source#for-consumers-of-openmetadata-ingestion-to-define-custom-connectors-in-their-own-package-with-same-namespace

1. Create the custom connector

Example: my_csv_connector.py

"""
自定义Database Service 从 CSV 文件中提取元数据
"""
import csv
import tracebackfrom pydantic import BaseModel, ValidationError, validator
from pathlib import Path
from typing import Iterable, Optional, List, Dict, Anyfrom metadata.ingestion.api.common import Entity
from metadata.ingestion.api.models import Either
from metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from metadata.ingestion.api.steps import Source, InvalidSourceException
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.database.customDatabaseConnection import (CustomDatabaseConnection,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.api.data.createDatabaseSchema import (CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.entity.services.databaseService import (DatabaseService,
)
from metadata.generated.schema.entity.data.table import (Column,
)
from metadata.generated.schema.metadataIngestion.workflow import (Source as WorkflowSource,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_loggerlogger = ingestion_logger()class InvalidCsvConnectorException(Exception):"""Sample data is not valid to be ingested"""class CsvModel(BaseModel):name: strcolumn_names: List[str]column_types: List[str]@validator("column_names", "column_types", pre=True)def str_to_list(cls, value):"""Suppose that the internal split is in ;"""return value.split(";")class CsvConnector(Source):"""Custom connector to ingest Database metadata.We'll suppose that we can read metadata from a CSVwith a custom database name from a business_unit connection option."""# 内置方法def __init__(self, config: WorkflowSource, metadata: OpenMetadata):self.config = configself.metadata = metadata# 获取配置信息self.service_connection = config.serviceConnection.__root__.configself.source_directory: str = (# 获取CSV文件路径self.service_connection.connectionOptions.__root__.get("source_directory"))if not self.source_directory:raise InvalidCsvConnectorException("未获取到source_directory配置信息")self.business_unit: str = (# 获取自定义的数据库名称self.service_connection.connectionOptions.__root__.get("business_unit"))if not self.business_unit:raise InvalidCsvConnectorException("未获取到business_unit配置信息")self.data: Optional[List[CsvModel]] = Nonesuper().__init__()# 内置函数@classmethoddef create(cls, config_dict: dict, metadata_config: OpenMetadataConnection) -> "CsvConnector":config: WorkflowSource = WorkflowSource.parse_obj(config_dict)connection: CustomDatabaseConnection = config.serviceConnection.__root__.configif not isinstance(connection, CustomDatabaseConnection):raise InvalidSourceException(f"Expected CustomDatabaseConnection, but got {connection}")return cls(config, metadata_config)# 静态方法:按行读取@staticmethoddef read_row_safe(row: Dict[str, Any]):try:return CsvModel.parse_obj(row)except ValidationError:logger.warning(f"Error parsing row {row}. 
Skipping it.")# 预处理:读取文件及数据def prepare(self):# Validate that the file existssource_data = Path(self.source_directory)if not source_data.exists():raise InvalidCsvConnectorException("Source Data path does not exist")try:with open(source_data, "r", encoding="utf-8") as file:reader = csv.DictReader(file)# 读取数据self.data = [self.read_row_safe(row) for row in reader]except Exception as exc:logger.error("Unknown error reading the source file")raise excdef yield_create_request_database_service(self):yield Either(# 串讲元数据读取服务right=self.metadata.get_create_service_from_source(entity=DatabaseService, config=self.config))# 业务原数据库名处理方法def yield_business_unit_db(self):# 选择我们刚刚创建的服务(如果不是UI)# 获取提取服务对象service_entity: DatabaseService = self.metadata.get_by_name(entity=DatabaseService, fqn=self.config.serviceName)yield Either(right=CreateDatabaseRequest(name=self.business_unit,service=service_entity.fullyQualifiedName,))# chems处理方法def yield_default_schema(self):# Pick up the service we just created (if not UI)database_entity: Database = self.metadata.get_by_name(entity=Database, fqn=f"{self.config.serviceName}.{self.business_unit}")yield Either(right=CreateDatabaseSchemaRequest(name="default",database=database_entity.fullyQualifiedName,))# 业务元数据处理方法def yield_data(self):"""Iterate over the data list to create tables"""database_schema: DatabaseSchema = self.metadata.get_by_name(entity=DatabaseSchema,fqn=f"{self.config.serviceName}.{self.business_unit}.default",)# 异常处理# 假设我们有一个要跟踪的故障# try:#     1/0# except Exception:#     yield Either(#         left=StackTraceError(#             name="My Error",#             error="Demoing one error",#             stackTrace=traceback.format_exc(),#         )#     )# 解析csv元数据信息(获取列名和类型)for row in self.data:yield Either(right=CreateTableRequest(name=row.name,databaseSchema=database_schema.fullyQualifiedName,columns=[Column(name=model_col[0],dataType=model_col[1],)for model_col in zip(row.column_names, row.column_types)],))# 迭代器:元数据迭代返回def _iter(self) -> Iterable[Entity]:# 数据库元数据信息存储yield from self.yield_create_request_database_service()# 业务源数据库yield from self.yield_business_unit_db()# 业务schemayield from self.yield_default_schema()# 业务数据yield from self.yield_data()# 测试数据库连接def test_connection(self) -> None:pass# 连接关闭def close(self):pass

(2) Package the custom connector into the ingestion image

Project layout:

(screenshot: custom connector project directory layout)

Dockerfile:

FROM openmetadata/ingestion:1.3.1

# Let's use the same workdir as the ingestion image
WORKDIR ingestion
USER airflow

# Install our custom connector
COPY connector connector
COPY setup.py .
COPY sample.csv .
#COPY person_info.proto .
RUN pip install --no-deps .
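The Dockerfile copies a setup.py that is not shown in the article. A minimal sketch follows; the distribution name and the "connector" package directory are assumptions and should match your actual project layout.

# setup.py -- minimal packaging sketch; distribution name and package dir are assumptions.
from setuptools import find_packages, setup

setup(
    name="custom-csv-connector",
    version="0.0.1",
    packages=find_packages(include=["connector", "connector.*"]),
    # The openmetadata/ingestion base image already ships the runtime dependencies,
    # which is why the Dockerfile installs with --no-deps.
    install_requires=["openmetadata-ingestion~=1.3.1"],
)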

Build the image:

docker build -t om-ingestion:build -f Dockerfile .

(3) Deploy the new ingestion service

docker-compose -f docker-compose-ingestion.yml up -d

docker-compose-ingestion.yml:

version: "3.9"
volumes:
  ingestion-volume-dag-airflow:
  ingestion-volume-dags:
  ingestion-volume-tmp:
  es-data:
services:
  ingestion:
    container_name: om_ingestion
    image: om-ingestion:build
    environment:
      AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__OPENMETADATA_AIRFLOW_APIS__DAG_GENERATED_CONFIGS: "/opt/airflow/dag_generated_configs"
      DB_SCHEME: ${AIRFLOW_DB_SCHEME:-postgresql+psycopg2}
      DB_HOST: ${AIRFLOW_DB_HOST:-host.docker.internal}
      DB_PORT: ${AIRFLOW_DB_PORT:-5432}
      AIRFLOW_DB: ${AIRFLOW_DB:-airflow_db}
      DB_USER: ${AIRFLOW_DB_USER:-airflow_user}
      DB_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow_pass}
      # extra connection-string properties for the database
      # EXAMPLE
      # require SSL (only for Postgres)
      # properties: "?sslmode=require"
      DB_PROPERTIES: ${AIRFLOW_DB_PROPERTIES:-}
      # To test the lineage backend
      # AIRFLOW__LINEAGE__BACKEND: airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
      # AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME: local_airflow
      AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT: http://host.docker.internal:8585/api
      AIRFLOW__LINEAGE__JWT_TOKEN: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJvcGVuLW1ldGFkYXRhLm9yZyIsInN1YiI6ImluZ2VzdGlvbi1ib3QiLCJlbWFpbCI6ImluZ2VzdGlvbi1ib3RAb3Blbm1ldGFkYXRhLm9yZyIsImlzQm90Ijp0cnVlLCJ0b2tlblR5cGUiOiJCT1QiLCJpYXQiOjE3MDk3MDkyNDMsImV4cCI6bnVsbH0.U7XIYZjJAmJ-p3WTy4rTGGSzUxZeNpjOsHzrWRz7n-zAl-GZvznZWMKX5nSX_KwRHAK3UYuO1UX2-ZbeZxdpzhyumycNFyWzwMs8G6iEGoaM6doGhqCgHileco8wcAoaTXKHTnwa80ddWHt4dqZmikP7cIhLg9etKAepQNQibefewHbaLOoCrFyo9BqFeZzNaVBo1rogNtslWaDO6Wnk_rx0jxRLTy57Thq7R7YS_nZd-JVfYf72BEFHJ_WDZym4k-dusV0PWGzMPYIXq3s1KbpPBt_tUSz4cUrXbLuI5-ZsOWIvUhsLeHJDU-35-RymylhMrQ92kZjsy7v2nl6apQ
    entrypoint: /bin/bash
    command:
      - "/opt/airflow/ingestion_dependency.sh"
    expose:
      - 8080
    ports:
      - "8080:8080"
    networks:
      - app_net_ingestion
    volumes:
      - ingestion-volume-dag-airflow:/opt/airflow/dag_generated_configs
      - ingestion-volume-dags:/opt/airflow/dags
      - ingestion-volume-tmp:/tmp
networks:
  app_net_ingestion:
    ipam:
      driver: default
      config:
        - subnet: "172.16.240.0/24"

(4) Create a custom service of the matching service type and test the ingestion

(screenshot: creating a custom database service in the UI)

Click save, then add a metadata ingestion pipeline:

(screenshots: configuring and running the metadata ingestion pipeline)
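The same ingestion can also be triggered outside the UI. The sketch below is hedged: the service name, file path, hostPort, and JWT token are placeholders, sourcePythonClass must point at the module path under which the connector was installed into the image, and the workflow API shown is the one documented for the 1.3.x Python SDK (it may differ slightly in other versions).

# run_csv_ingestion.py -- sketch of running the custom connector programmatically.
# Service name, paths, hostPort, and jwtToken are placeholders.
from metadata.workflow.metadata import MetadataWorkflow

config = {
    "source": {
        "type": "customdatabase",
        "serviceName": "custom_csv_demo",
        "serviceConnection": {
            "config": {
                "type": "CustomDatabase",
                "sourcePythonClass": "connector.my_csv_connector.CsvConnector",
                "connectionOptions": {
                    "source_directory": "/opt/airflow/sample.csv",
                    "business_unit": "demo_business_unit",
                },
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "<ingestion-bot-jwt>"},
        }
    },
}

workflow = MetadataWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.stop()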

Part 2: Developing a Built-in Connector (StreamSets)

Official documentation link: https://docs.open-metadata.org/v1.3.x/developers/contribute/developing-a-new-connector

(1) Define the connection JSON schema template (streamSetsConnection.json)

Path: openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/streamSetsConnection.json

{"$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/streamSetsConnection.json","$schema": "http://json-schema.org/draft-07/schema#","title": "StreamSetsConnection","description": "StreamSets Metadata Pipeline Connection Config","type": "object","javaType": "org.openmetadata.schema.services.connections.pipeline.StreamSetsConnection","definitions": {"StreamSetsType": {"description": "Service type.","type": "string","enum": ["StreamSets"],"default": "StreamSets"},"basicAuthentication": {"title": "Username Authentication","description": "Login username","type":"object","properties": {"username": {"title": "Username","description": "StreamSets user to authenticate to the API.","type": "string"}},"additionalProperties": false}},"properties": {"type": {"title": "Service Type","description": "Service Type","$ref": "#/definitions/StreamSetsType","default": "StreamSets"},"hostPort": {"expose": true,"title": "Host And Port","description": "Pipeline Service Management/UI URI.","type": "string","format": "uri"},"streamSetsConfig": {"title": "StreamSets Credentials Configuration","description": "We support username authentication","oneOf": [{"$ref": "#/definitions/basicAuthentication"}]},"supportsMetadataExtraction": {"title": "Supports Metadata Extraction","$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"}},"additionalProperties": false,"required": ["hostPort", "streamSetsConfig"]
}

(2) Develop the ingestion source code

Path: ingestion/src/metadata/ingestion/source/pipeline/streamsets/*

(screenshot: files in the streamsets source module)

1. StreamSets client (client.py)

import logging
import traceback
from typing import Any, Iterable, Optional

import requests
from requests import HTTPError
from requests.auth import HTTPBasicAuth

# Configure the logger
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

REQUESTS_TIMEOUT = 60 * 5


def clean_uri(uri: str) -> str:
    """Clean the URI, making sure it starts with http or https"""
    if not uri.startswith(("http://", "https://")):
        return "http://" + uri
    return uri


class StreamSetsClient:
    """Wrapper on top of the StreamSets Data Collector REST API"""

    def __init__(
        self,
        host_port: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        verify: bool = False,
    ):
        self.api_endpoint = clean_uri(host_port) + "/rest"
        self.username = username
        self.password = password
        self.verify = verify
        self.headers = {"Content-Type": "application/json"}

    def get(self, path: str) -> Optional[Any]:
        """GET method wrapper"""
        try:
            res = requests.get(
                f"{self.api_endpoint}/{path}",
                verify=self.verify,
                headers=self.headers,
                timeout=REQUESTS_TIMEOUT,
                auth=HTTPBasicAuth(self.username, self.password),
            )
            res.raise_for_status()
            return res.json()
        except HTTPError as err:
            logger.warning(f"Connection error calling the StreamSets API - {err}")
            raise err
        except ValueError as err:
            logger.warning(f"Cannot pick up the JSON from API response - {err}")
            raise err
        except Exception as err:
            logger.warning(f"Unknown error calling StreamSets API - {err}")
            raise err

    def list_pipelines(self) -> Iterable[dict]:
        """List all pipelines"""
        try:
            return self.get("v1/pipelines")
        except Exception as err:
            logger.error(traceback.format_exc())
            raise err

    def get_pipeline_details(self, pipeline_id: str) -> dict:
        """Get a specific pipeline by ID"""
        return self.get(f"v1/pipeline/{pipeline_id}?rev=0&get=pipeline")

    def test_list_pipeline_detail(self) -> Iterable[dict]:
        """Test API access for listing pipelines"""
        return self.list_pipelines()
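A quick way to smoke-test the client above from a Python shell; the host, username, and password below are placeholders for a local StreamSets Data Collector.

# Hedged usage sketch for StreamSetsClient; connection values are placeholders.
from metadata.ingestion.source.pipeline.streamsets.client import StreamSetsClient

client = StreamSetsClient(
    host_port="http://localhost:18630",
    username="admin",
    password="admin",
    verify=False,
)
for pipeline in client.list_pipelines():
    print(pipeline.get("pipelineId"), pipeline.get("title"))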

2. Connection handler and connection test (connection.py)

"""
源连接处理程序
"""
from typing import Optionalfrom metadata.generated.schema.entity.automations.workflow import (Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (BasicAuthentication,StreamSetsConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.streamsets.client import StreamSetsClientdef get_connection(connection: StreamSetsConnection) -> StreamSetsClient:"""Create connection"""if isinstance(connection.streamSetsConfig, BasicAuthentication):return StreamSetsClient(host_port=connection.hostPort,username=connection.streamSetsConfig.username,password="95bd7977208bc935cac3656f4a9eea3a",verify=False,)def test_connection(metadata: OpenMetadata,client: StreamSetsClient,service_connection: StreamSetsConnection,automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:"""元数据工作流或自动化工作流期间测试连接。这可以作为一部分执行"""def custom_executor():list(client.list_pipelines())test_fn = {"GetPipelines": custom_executor}test_connection_steps(metadata=metadata,test_fn=test_fn,service_type=service_connection.type.value,automation_workflow=automation_workflow,)

3. Metadata extractor (metadata.py)

"""
提取StreamSets 源的元数据 
"""
import traceback
from typing import Iterable, List, Optional, Anyfrom metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from pydantic import BaseModel, ValidationErrorfrom metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Task
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (StreamSetsConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (Source as WorkflowSource,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_loggerlogger = ingestion_logger()class StagesDetails(BaseModel):instanceName: strlabel:strstageType: strstageName: strdescription: strinputLanes: List[str]outputLanes: List[str]downstream_task_names: set[str] = set()class StreamSetsPipelineDetails(BaseModel):"""Defines the necessary StreamSets information"""uuid: strpipelineId: strtitle: strname: strcreated: intcreator: strdescription: strclass StreamsetsSource(PipelineServiceSource):"""执行必要的方法,从 Airflow 的元数据数据库中提取管道元数据"""@classmethoddef create(cls, config_dict: dict, metadata: OpenMetadata):logger.info("create..........")config: WorkflowSource = WorkflowSource.parse_obj(config_dict)logger.info(f"WorkflowSource: {config}")connection: StreamSetsConnection = config.serviceConnection.__root__.configlogger.info(f"StreamSetsConnection: {connection}")if not isinstance(connection, StreamSetsConnection):raise InvalidSourceException(f"Expected StreamSetsConnection, but got {connection}")return cls(config, metadata)def yield_pipeline(self, pipeline_details: StreamSetsPipelineDetails) -> Iterable[Either[CreatePipelineRequest]]:logger.info("yield_pipeline.......")try:connection_url = Noneif self.service_connection.hostPort:connection_url = (f"{clean_uri(self.service_connection.hostPort)}/rest/v1/pipelines")logger.info(f"pipeline_details:{pipeline_details}")logger.info(f"connection_url:{connection_url}")pipeline_request = CreatePipelineRequest(name=pipeline_details.name,displayName=pipeline_details.title,sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",tasks=self._get_tasks_from_details(pipeline_details),service=self.context.pipeline_service,)yield Either(right=pipeline_request)self.register_record(pipeline_request=pipeline_request)except TypeError as err:self.context.task_names = set()yield Either(left=StackTraceError(name=pipeline_details.dag_id,error=(f"Error building DAG information from {pipeline_details}. 
There might be Airflow version"f" incompatibilities - {err}"),stackTrace=traceback.format_exc(),))except ValidationError as err:self.context.task_names = set()yield Either(left=StackTraceError(name=pipeline_details.dag_id,error=f"Error building pydantic model for {pipeline_details} - {err}",stackTrace=traceback.format_exc(),))except Exception as err:self.context.task_names = set()yield Either(left=StackTraceError(name=pipeline_details.dag_id,error=f"Wild error ingesting pipeline {pipeline_details} - {err}",stackTrace=traceback.format_exc(),))# 获取解析管道详情def _get_tasks_from_details(self, pipeline_details: StreamSetsPipelineDetails) -> Optional[List[Task]]:logger.info("_get_tasks_from_details.......")logger.info(f"StreamSetsPipelineDetails:{pipeline_details}")try:stages = self.get_stages_by_pipline(pipeline_details)return [Task(name=stage.instanceName,displayName=stage.label,sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",taskType=stage.stageType,description=stage.description,downstreamTasks=list(stage.downstream_task_names)if stage.downstream_task_nameselse [],)for stage in stages]except Exception as err:logger.debug(traceback.format_exc())logger.warning(f"Wild error encountered when trying to get tasks from Pipeline Details {pipeline_details} - {err}.")return Nonedef yield_pipeline_lineage_details(self, pipeline_details: StreamSetsPipelineDetails) -> Iterable[Either[AddLineageRequest]]:logger.info("yield_pipeline_lineage_details..........")"""将连接转换为管道实体:param pipeline_details: 来自  StreamSets的pipeline_details对象return:使用任务创建管道请求"""passdef get_pipelines_list(self) -> Optional[List[StreamSetsPipelineDetails]]:logger.info("get_pipelines_list..........")"""Get List of all pipelines"""if self.connection.list_pipelines() is not None:for list_pipeline in self.connection.list_pipelines():logger.info(f"pipeline:{list_pipeline}")try:yield StreamSetsPipelineDetails(uuid=list_pipeline.get("uuid"),pipelineId=list_pipeline.get("pipelineId"),title=list_pipeline.get("title"),name=list_pipeline.get("name"),created=list_pipeline.get("created"),creator=list_pipeline.get("creator"),description=list_pipeline.get("description"),)except (ValueError, KeyError, ValidationError) as err:logger.debug(traceback.format_exc())logger.warning(f"Cannot create StreamSetsPipelineDetails from {list_pipeline} - {err}")except Exception as err:logger.debug(traceback.format_exc())logger.warning(f"Wild error encountered when trying to get pipelines from Process Group {list_pipeline} - {err}.")else:return None# 获取上下关联关系def get_stages_lane(self, stages: Optional[List[StagesDetails]]) -> {}:logger.info("get_stages_lane......")input_lane_to_stage_map = {}for stage in stages:logger.info(f"stage_info:{stage}")for input_lane in stage.get("inputLanes", []):try:if input_lane_to_stage_map.get(input_lane) is None:input_lane_to_stage_map[input_lane] = set()input_lane_to_stage_map[input_lane].add(stage.get("instanceName"))else:input_lane_to_stage_map[input_lane].add(stage.get("instanceName"))except Exception as err:logger.debug(traceback.format_exc())logger.warning(f"Wild error encountered when trying to get stages from Pipeline Details {stages} - {err}.")logger.info(f"input_lane_to_stage_map:{input_lane_to_stage_map}")return input_lane_to_stage_mapdef get_stages_by_pipline(self, pipeline_details: StreamSetsPipelineDetails) -> Optional[List[StagesDetails]]:logger.info("get_stages_by_pipline")pipeline_detail = self.connection.get_pipeline_details(pipeline_details.pipelineId)stages = []if 
pipeline_detail.get("stages"):stages = pipeline_detail.get("stages")input_lane_to_stage_map = self.get_stages_lane(stages)for stage in stages:logger.info(f"stage:{stage}")try:input_lanes = stage.get("inputLanes", [])output_lanes = stage.get("outputLanes", [])downstream_stage_names = set()for output_lane in stage.get("outputLanes", []):if output_lane in input_lane_to_stage_map.keys():for down_stage in input_lane_to_stage_map.get(output_lane, []):downstream_stage_names.add(down_stage)yield StagesDetails(instanceName=stage.get("instanceName"),label=stage["uiInfo"].get("label"),stageType=stage["uiInfo"].get("stageType"),stageName=stage.get("stageName"),description=stage["uiInfo"].get("description"),inputLanes=input_lanes,outputLanes=output_lanes,downstream_task_names=downstream_stage_names)except (ValueError, KeyError, ValidationError) as err:logger.debug(traceback.format_exc())logger.warning(f"Cannot create StagesDetails from {stage} - {err}")except Exception as err:logger.debug(traceback.format_exc())logger.warning(f"Wild error encountered when trying to get pipelines from Process Group {stage} - {err}.")def get_pipeline_name(self, pipeline_details: StreamSetsPipelineDetails) -> str:return pipeline_details.namedef yield_pipeline_status(self, pipeline_details: StreamSetsPipelineDetails) -> Iterable[Either[OMetaPipelineStatus]]:pass

(3) Modify the frontend UI source to register the connector

Path: openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts

/*
 *  Copyright 2022 Collate.
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *  http://www.apache.org/licenses/LICENSE-2.0
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

import { cloneDeep } from 'lodash';
import { COMMON_UI_SCHEMA } from '../constants/Services.constant';
import { PipelineServiceType } from '../generated/entity/services/pipelineService';
import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/airbyteConnection.json';
import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json';
import customPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/customPipelineConnection.json';
import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
import databricksPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/databricksPipelineConnection.json';
import domoPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/domoPipelineConnection.json';
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
import splineConnection from '../jsons/connectionSchemas/connections/pipeline/splineConnection.json';
import streamSetsConnection from '../jsons/connectionSchemas/connections/pipeline/streamSetsConnection.json';

export const getPipelineConfig = (type: PipelineServiceType) => {
  let schema = {};
  const uiSchema = { ...COMMON_UI_SCHEMA };
  switch (type) {
    case PipelineServiceType.Airbyte: {
      schema = airbyteConnection;
      break;
    }
    case PipelineServiceType.Airflow: {
      schema = airflowConnection;
      break;
    }
    case PipelineServiceType.GluePipeline: {
      schema = gluePipelineConnection;
      break;
    }
    case PipelineServiceType.Fivetran: {
      schema = fivetranConnection;
      break;
    }
    case PipelineServiceType.Dagster: {
      schema = dagsterConnection;
      break;
    }
    case PipelineServiceType.Nifi: {
      schema = nifiConnection;
      break;
    }
    case PipelineServiceType.StreamSets: {
      schema = streamSetsConnection;
      break;
    }
    case PipelineServiceType.DomoPipeline: {
      schema = domoPipelineConnection;
      break;
    }
    case PipelineServiceType.CustomPipeline: {
      schema = customPipelineConnection;
      break;
    }
    case PipelineServiceType.DatabricksPipeline: {
      schema = databricksPipelineConnection;
      break;
    }
    case PipelineServiceType.Spline: {
      schema = splineConnection;
      break;
    }
    default:
      break;
  }

  return cloneDeep({ schema, uiSchema });
};

(4) Add the connector's Markdown documentation to the frontend UI

Path: openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/StreamSets.md

# StreamSets

In this section, we provide guides and references to use the StreamSets connector.

## Requirements

The StreamSets connector supports one connection type:

- **Basic Authentication**: log in to StreamSets with a username.

You can find further information on the StreamSets connector in the [docs](https://docs.open-metadata.org/connectors/pipeline/StreamSets).

## Connection Details

$$section
### Host and Port $(id="hostPort")
Pipeline Service Management/UI URI. This should be specified as a URI string in the format `scheme://hostname:port`, for example `http://localhost:8443` or `http://host.docker.internal:8443`.
$$

$$section
### StreamSets Config $(id="StreamSetsConfig")
OpenMetadata supports basic authentication (username/password). See the Requirements section for details.
$$

$$section
### Username $(id="username")
Username to connect to StreamSets. This user should be able to send requests to the StreamSets API and access the `Resource` endpoint.
$$

(5) Create a Java ClassConverter (optional)

(6) Rebuild the images from the Dockerfiles

Server Dockerfile:

# Build stage
FROM alpine:3.19 AS build

COPY openmetadata-dist/target/openmetadata-*.tar.gz /
#COPY docker/openmetadata-start.sh /

RUN mkdir -p /opt/openmetadata && \
    tar zxvf openmetadata-*.tar.gz -C /opt/openmetadata --strip-components 1 && \
    rm openmetadata-*.tar.gz

# Final stage
FROM alpine:3.19

EXPOSE 8585

RUN adduser -D openmetadata && \
    apk update && \
    apk upgrade && \
    apk add --update --no-cache bash openjdk17-jre tzdata
ENV TZ=Asia/Shanghai

COPY --chown=openmetadata:openmetadata --from=build /opt/openmetadata /opt/openmetadata
COPY --chmod=755 docker/openmetadata-start.sh /

USER openmetadata
WORKDIR /opt/openmetadata

ENTRYPOINT [ "/bin/bash" ]
CMD ["/openmetadata-start.sh"]

Ingestion Dockerfile:

Path: ingestion/Dockerfile

FROM apache/airflow:2.7.3-python3.10
#FROM docker-compose-ingestion-ingestion:latest
USER root
RUN curl -sS https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl -sS https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
# Install Dependencies (listed in alphabetical order)
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get -qq update \
    && apt-get -qq install -y \
    tzdata \
    alien \
    build-essential \
    default-libmysqlclient-dev \
    freetds-bin \
    freetds-dev \
    gcc \
    gnupg \
    libaio1 \
    libevent-dev \
    libffi-dev \
    libpq-dev \
    librdkafka-dev \
    libsasl2-dev \
    libsasl2-2 \
    libsasl2-modules \
    libsasl2-modules-gssapi-mit \
    libssl-dev \
    libxml2 \
    libkrb5-dev \
    openjdk-11-jre \
    openssl \
    postgresql \
    postgresql-contrib \
    tdsodbc \
    unixodbc \
    unixodbc-dev \
    unzip \
    vim \
    git \
    wget --no-install-recommends \
    # Accept MSSQL ODBC License
    && ACCEPT_EULA=Y apt-get install -y msodbcsql18 \
    && rm -rf /var/lib/apt/lists/*

RUN if [[ $(uname -m) == "arm64" || $(uname -m) == "aarch64" ]]; \
    then \
    wget -q https://download.oracle.com/otn_software/linux/instantclient/191000/instantclient-basic-linux.arm64-19.10.0.0.0dbru.zip -O /oracle-instantclient.zip && \
    unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \
    else \
    wget -q https://download.oracle.com/otn_software/linux/instantclient/1917000/instantclient-basic-linux.x64-19.17.0.0.0dbru.zip -O /oracle-instantclient.zip && \
    unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \
    fi

ENV LD_LIBRARY_PATH=/instantclient

# Security patches for base image
# monitor no fixed version for
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-LIBTASN16-3061097
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-MARIADB105-2940589
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-BIND9-3027852
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-EXPAT-3023031 (the latest version is already installed)
RUN echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/backports.list
RUN apt-get -qq update \
    && apt-get -qq install -t bullseye-backports -y \
    curl \
    libpcre2-8-0 \
    postgresql-common \
    expat \
    bind9

# Required for Starting Ingestion Container in Docker Compose
# Provide Execute Permissions to shell script
COPY --chown=airflow:0 --chmod=775 ingestion/ingestion_dependency.sh /opt/airflow
# Required for Ingesting Sample Data
COPY --chown=airflow:0 --chmod=775 ingestion /home/airflow/ingestion

COPY --chown=airflow:0 --chmod=775 openmetadata-airflow-apis /home/airflow/openmetadata-airflow-apis

# Required for Airflow DAGs of Sample Data
#COPY --chown=airflow:0 ingestion/examples/airflow/dags /opt/airflow/dags

USER airflow
ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
ENV TZ=Asia/Shanghai

# Disable pip cache dir
# https://pip.pypa.io/en/stable/topics/caching/#avoiding-caching
ENV PIP_NO_CACHE_DIR=1
# Make pip silent
ENV PIP_QUIET=1

RUN pip install --upgrade pip

WORKDIR /home/airflow/openmetadata-airflow-apis
RUN pip install "."

WORKDIR /home/airflow/ingestion

# Argument to provide the ingestion dependencies to install. Defaults to all.
ARG INGESTION_DEPENDENCY="all"
RUN pip install ".[${INGESTION_DEPENDENCY}]"

# Temporary workaround for https://github.com/open-metadata/OpenMetadata/issues/9593
RUN echo "Image built for $(uname -m)"
RUN if [[ $(uname -m) != "aarch64" ]]; \
    then \
    pip install "ibm-db-sa~=0.4"; \
    fi

# bump python-daemon for https://github.com/apache/airflow/pull/29916
RUN pip install "python-daemon>=3.0.0"
# remove all airflow providers except for docker and cncf kubernetes
RUN pip freeze | grep "apache-airflow-providers" | grep --invert-match -E "docker|http|cncf" | xargs pip uninstall -y
# Uninstalling psycopg2-binary and installing psycopg2 instead
# because psycopg2-binary generates an architecture-specific error
# while authenticating the connection with Airflow; psycopg2 solves this error
RUN pip uninstall psycopg2-binary -y
RUN pip install psycopg2 mysqlclient==2.1.1
# Make required folders for openmetadata-airflow-apis
RUN mkdir -p /opt/airflow/dag_generated_configs

EXPOSE 8080
# This is required as it's responsible for creating the airflow.cfg file
RUN airflow db init && rm -f /opt/airflow/airflow.db

(7) Build the service images

Build the server image from the repository root:

docker build -t om-server:build -f docker/development/Dockerfile .

Build the ingestion image from the repository root:

docker build -t om-ingestion:build -f ingestion/Dockerfile .

(8) Deploy the new services

docker-compose -f docker/development/docker-compose-postgres.yml up -d

(9) Access the service and create a StreamSets metadata ingestion pipeline

(screenshots: creating the StreamSets service and its ingestion pipeline in the UI)
