将 OneLake 数据索引到 Elasticsearch - 第二部分

作者:来自 Elastic Gustavo Llermaly 及 Jeffrey Rengifo

本文分为两部分,第二部分介绍如何使用自定义连接器将 OneLake 数据索引并搜索到 Elastic 中。

在本文中,我们将利用第 1 部分中学到的知识来创建 OneLake 自定义 Elasticsearch 连接器。

我们已经上传了一些 OneLake 文档并将其索引到 Elasticsearch 中以供搜索。但是,这仅适用于一次性上传。如果我们想要同步数据,那么我们需要开发一个更复杂的系统。

幸运的是,Elastic 有一个连接器框架可用于开发满足我们需求的自定义连接器:

我们现在将根据本文制作一个 OneLake 连接器:如何为 Elasticsearch 创建自定义连接器。

步骤

  • 连接器引导
  • 实现 BaseDataSource 类
  • 身份验证
  • 运行连接器
  • 配置计划

连接器引导

背景信息:Elastic 连接器分为两种类型:

  1. Elastic 托管连接器:完全由 Elastic Cloud 托管和运行。
  2. 自托管连接器:由用户自行托管,必须部署在你的基础设施中。

自定义连接器属于 “连接器客户端” 类别,因此我们需要下载并部署连接器框架。

首先,克隆连接器的代码库:

git clone https://github.com/elastic/connectors

现在在 requirements/framework.txt 文件末尾添加你将使用的依赖项。在本例中:

azure-identity==1.19.0
azure-storage-file-datalake==12.17.0

这样,存储库就完成了,我们可以开始编码了。

实现 BaseDataSource 类

你可以在此存储库中找到完整的工作代码。

我们将介绍 onelake.py 文件中的核心部分。

在导入和类声明之后,我们必须定义将捕获配置参数的 __init__ 方法。

"""OneLake connector to retrieve data from datalakes"""from functools import partialfrom azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClientfrom connectors.source import BaseDataSourceACCOUNT_NAME = "onelake"class OneLakeDataSource(BaseDataSource):"""OneLake"""name = "OneLake"service_type = "onelake"incremental_sync_enabled = True# Here we can enter the data that we'll later need to connect our connector to OneLake.def __init__(self, configuration):"""Set up the connection to the azure base clientArgs:configuration (DataSourceConfiguration): Object of DataSourceConfiguration class."""super().__init__(configuration=configuration)self.tenant_id = self.configuration["tenant_id"]self.client_id = self.configuration["client_id"]self.client_secret = self.configuration["client_secret"]self.workspace_name = self.configuration["workspace_name"]self.data_path = self.configuration["data_path"]

然后,你可以配置 UI 将显示的表单,使用返回配置字典的 get_default_configuration 方法填充这些参数。

    # Method to generate the Enterprise Search UI fields for the variables we need to connect to OneLake.@classmethoddef get_default_configuration(cls):"""Get the default configuration for OneLakeReturns:dictionary: Default configuration"""return {"tenant_id": {"label": "OneLake tenant id","order": 1,"type": "str",},"client_id": {"label": "OneLake client id","order": 2,"type": "str",},"client_secret": {"label": "OneLake client secret","order": 3,"type": "str","sensitive": True, # To hide sensitive data like passwords or secrets},"workspace_name": {"label": "OneLake workspace name","order": 4,"type": "str",},"data_path": {"label": "OneLake data path","tooltip": "Path in format <DataLake>.Lakehouse/files/<Folder path>","order": 5,"type": "str",},"account_name": {"tooltip": "In the most cases is 'onelake'","default_value": ACCOUNT_NAME,"label": "Account name","order": 6,"type": "str",},}

然后我们配置下载方法,并从 OneLake 文档中提取内容。

async def download_file(self, file_client):"""Download file from OneLakeArgs:file_client (obj): File clientReturns:generator: File stream"""try:download = file_client.download_file()stream = download.chunks()for chunk in stream:yield chunkexcept Exception as e:self._logger.error(f"Error while downloading file: {e}")raiseasync def get_content(self, file_name, doit=None, timestamp=None):"""Obtains the file content for the specified file in `file_name`.Args:file_name (obj): The file name to process to obtain the content.timestamp (timestamp, optional): Timestamp of blob last modified. Defaults to None.doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.Returns:str: Content of the file or None if not applicable."""if not doit:returnfile_client = await self._get_file_client(file_name)file_properties = file_client.get_file_properties()file_extension = self.get_file_extension(file_name)doc = {"_id": f"{file_client.file_system_name}_{file_properties.name}",  # workspacename_data_path"name": file_properties.name.split("/")[-1],"_timestamp": file_properties.last_modified,"created_at": file_properties.creation_time,}can_be_downloaded = self.can_file_be_downloaded(file_extension=file_extension,filename=file_properties.name,file_size=file_properties.size,)if not can_be_downloaded:return docextracted_doc = await self.download_and_extract_file(doc=doc,source_filename=file_properties.name.split("/")[-1],file_extension=file_extension,download_func=partial(self.download_file, file_client),)return extracted_doc if extracted_doc is not None else doc

为了让我们的连接器对框架可见,我们需要在 connectors/config.py 文件中声明它。为此,我们将以下代码添加到源中:

 "sources": {..."onelake": "connectors.sources.onelake:OneLakeDataSource",...}

身份验证

在测试连接器之前,我们需要获取 client_id, tenant_id 和 client_secret,我们将使用它们从连接器访问工作区。

我们将使用 service principals 作为身份验证方法。

Azure service principal 是为与应用程序、托管服务和自动化工具一起使用以访问 Azure 资源而创建的身份。

步骤如下:

  1. 创建应用程序并收集 client_id、tenant_id 和 client_secret
  2. 在工作区中启用 service principal
  3. 将 service principal 添加到工作区

你可以逐步遵循本教程。

准备好了吗?现在是测试连接器的时候了!

运行连接器

连接器准备好后,我们现在可以连接到我们的 Elasticsearch 实例。

转到: Search > Content > Connectors > New connector 并选择 Customized Connector

选择要创建的名称,然后选择 “Create and attach an index” 以创建与连接器同名的新索引。

你现在可以使用 Docker 运行它或从源代码运行它。在此示例中,我们将使用 “Run from source”。

单击 “Generate Configuration”,然后将框中的内容粘贴到项目根目录中的 config.yml 文件中。在字段 service_type 上,你必须匹配 Connectors/config.py 中的连接器名称。在本例中,将 changeme 替换为 onelake。

现在,你可以使用以下命令运行连接器:

make install
make run

如果连接器正确初始化,你应该在控制台中看到如下消息:

注意:如果出现兼容性错误,请检查你的连接器/版本文件并与你的 Elasticsearch 集群版本进行比较:与 Elasticsearch 的版本兼容性。我们建议保持连接器版本和 Elasticsearch 版本同步。在本文中,我们使用 Elasticsearch 和连接器版本 8.15。

如果一切顺利,我们的本地连接器将与我们的 Elasticsearch 集群通信,我们将能够使用我们的 OneLake 凭据对其进行配置:

我们现在将索引来自 OneLake 的文档。为此,请单击 Sync > Full Content,运行完整内容同步:

同步完成后,你应该在控制台中看到以下内容:

在企业搜索 UI 中,你可以单击 “Documents” 来查看已索引的文档:

配置计划

你可以根据需要使用 UI 安排定期内容同步,以使索引保持更新并与 OneLake 同步。

要配置计划同步,请转到 “Search > Content > Connectors,然后选择你的连接器。然后单击 “scheduling”:

或者,你可以使用允许 CRON 表达式的更新连接器调度 API。

结论

在第二部分中,我们通过使用 Elastic 连接器框架并开发我们自己的 OneLake 连接器来轻松与我们的 Elastic Cloud 实例通信,将我们的配置更进一步。

想要获得 Elastic 认证?了解下一次 Elasticsearch 工程师培训何时开始!

Elasticsearch 包含新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在你的本地机器上试用 Elastic。

原文:Indexing OneLake data into Elasticsearch - Part II - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/7371.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

“AI教学实训系统:打造未来教育的超级引擎

嘿&#xff0c;各位教育界的伙伴们&#xff0c;今天我要跟你们聊聊一个绝对能让你们眼前一亮的教学神器——AI教学实训系统。作为资深产品经理&#xff0c;我可是亲眼见证了这款系统如何颠覆传统教学&#xff0c;成为未来教育的超级引擎。 一、什么是AI教学实训系统&#xff1f…

Linux下php8安装phpredis扩展的方法

Linux下php8安装phpredis扩展的方法 下载redis扩展执行安装编辑php.ini文件重启php-fpmphpinfo 查看 下载redis扩展 前提是已经安装好redis服务了 php-redis下载地址 https://github.com/phpredis/phpredis 执行命令 git clone https://github.com/phpredis/phpredis.git执行…

基于SMPL的三维人体重建-深度学习经典方法之VIBE

本文以开源项目VIBE[1-2]为例&#xff0c;介绍下采用深度学习和SMPL模板的从图片进行三维人体重建算法的整体流程。如有错误&#xff0c;欢迎评论指正。 一.算法流程 包含生成器模块和判别器模块&#xff0c;核心贡献就在于引入了GRU模块&#xff0c;使得当前帧包含了先前帧的先…

2.1.3 第一个工程,点灯!

新建工程 点击菜单栏左上角&#xff0c;新建工程或者选择“文件”-“新建工程”&#xff0c;选择工程类型“标准工程”选择设备类型和编程语言&#xff0c;并指定工程文件名及保存路径&#xff0c;如下图所示&#xff1a; 选择工程类型为“标准工程” 选择主模块机型&#x…

CVE-2025-0411 7-zip 漏洞复现

文章目录 免责申明漏洞描述影响版本漏洞poc漏洞复现修复建议 免责申明 本文章仅供学习与交流&#xff0c;请勿用于非法用途&#xff0c;均由使用者本人负责&#xff0c;文章作者不为此承担任何责任 漏洞描述 此漏洞 &#xff08;CVSS SCORE 7.0&#xff09; 允许远程攻击者绕…

mysql 学习6 DML语句,对数据库中的表进行 增 删 改 操作

添加数据 我们对 testdatabase 数据中 的 qqemp 这张表进行 增加数据&#xff0c;在这张表 下 打开 命令行 query console 在 软件中就是打开命令行的意思 可以先执行 desc qqemp; 查看一下当前表的结构。 插入一条数据 到qqemp 表&#xff0c;插入时要每个字段都有值 insert…

[特殊字符]【计算机视觉】r=2 采样滤波器全解析 ✨

Hey小伙伴们&#xff01;今天来给大家分享一个在 计算机视觉 领域中非常有趣但又超级重要的概念——r2 采样滤波器&#xff08;Sampling Filter with r2&#xff09;。通过这种滤波器&#xff0c;我们可以在图像降采样的过程中有效地减少混叠效应&#xff0c;提升图像质量。 如…

数据库SQLite和SCADA DIAView应用教程

课程简介 此系列课程大纲主要包含七个课时。主要使用到的开发工具有&#xff1a;SQLite studio 和 SCADA DIAView。详细的可成内容大概如下&#xff1a; 1、SQLite 可视化管理工具SQLite Studio &#xff1a;打开数据库和查询数据&#xff1b;查看视频 2、创建6个变量&#x…

【开源免费】基于Vue和SpringBoot的景区民宿预约系统(附论文)

本文项目编号 T 162 &#xff0c;文末自助获取源码 \color{red}{T162&#xff0c;文末自助获取源码} T162&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

[Spring] Gateway详解

&#x1f338;个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 &#x1f3f5;️热门专栏: &#x1f9ca; Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm1001.2014.3001.5482 &#x1f355; Collection与…

Pandas基础02(DataFrame创建/索引/切片/属性/方法/层次化索引)

DataFrame数据结构 DataFrame 是一个二维表格的数据结构&#xff0c;类似于数据库中的表格或 Excel 工作表。它由多个 Series 组成&#xff0c;每个 Series 共享相同的索引。DataFrame 可以看作是具有列名和行索引的二维数组。设计初衷是将Series的使用场景从一维拓展到多维。…

矩阵快速幂

矩阵快速幂&#xff1a; 高效计算矩阵的幂次&#xff08;如A^n&#xff09;的一种算法&#xff0c;只适用于计算某一项&#xff0c;而不是全部项。 递推公式 如果 n为偶数&#xff0c;则&#xff1a; A^nA^(n/2)A^(n/2) 如果 nnn 为奇数&#xff0c;则&#xff1a; A^nA^(n-1…

复位信号的同步与释放(同步复位、异步复位、异步复位同步释放)

文章目录 背景前言一、复位信号的同步与释放1.1 同步复位1.1.1 综述1.1.2 优缺点 1.2 recovery time和removal time1.3 异步复位1.3.1 综述1.3.2 优缺点 1.4 同步复位 与 异步复位1.5 异步复位、同步释放1.5.1 总述1.5.2 机理1.5.3 复位网络 二、思考与补充2.1 复…

【Git版本控制器--3】Git的远程操作

目录 理解分布式版本控制系统 创建远程仓库 仓库被创建后的配置信息 克隆远程仓库 https克隆仓库 ssh克隆仓库 向远程仓库推送 拉取远程仓库 忽略特殊文件 为什么要忽略特殊文件&#xff1f; 如何配置忽略特殊文件&#xff1f; 配置命令别名 标签管理 理…

ios打包:uuid与udid

ios的uuid与udid混乱的网上信息 新人开发ios&#xff0c;发现uuid和udid在网上有很多帖子里是混淆的&#xff0c;比如百度下&#xff0c;就会说&#xff1a; 在iOS中使用UUID&#xff08;通用唯一识别码&#xff09;作为永久签名&#xff0c;通常是指生成一个唯一标识&#xf…

中国认知作战研究中心:从认知战角度分析2007年iPhone发布

中国认知作战研究中心&#xff1a;从认知战角度分析2007年iPhone发布 中国认知作战研究中心&#xff1a;从认知战角度分析2007年iPhone发布 关键词 认知作战,新质生产力,人类命运共同体,认知战,认知域,认知战研究中心,认知战争,认知战战术,认知战战略,认知域作战研究,认知作…

Chameleon(变色龙) 跨平台编译C文件,并一次性生成多个平台的可执行文件

地址:https://github.com/MartinxMax/Chameleon Chameleon 跨平台编译C文件&#xff0c;并一次性生成多个平台的可执行文件。可以通过编译Chameleon自带的.C文件反向Shell生成不同平台攻击载荷。 登录 & 代理设置 按照以下步骤设置 Docker 的代理&#xff1a; 创建配置目…

【机器学习】穷理至极,观微知著:微积分的哲思之旅与算法之道

文章目录 微积分基础&#xff1a;理解变化与累积的数学前言一、多重积分的高级应用1.1 高维概率分布的期望值计算1.1.1 多维期望值的定义1.1.2 Python代码实现1.1.3 运行结果1.1.4 结果解读 1.2 特征空间的体积计算1.2.1 单位球体的体积计算1.2.2 Python代码实现1.2.3 运行结果…

Kubernetes可视化界面

DashBoard Kubernetes Dashboard 是 Kubernetes 集群的一个开箱即用的 Web UI&#xff0c;提供了一种图形化的方式来管理和监视 Kubernetes 集群中的资源。它允许用户直接在浏览器中执行许多常见的 Kubernetes 管理任务&#xff0c;如部署应用、监控应用状态、执行故障排查以及…

【转帖】eclipse-24-09版本后,怎么还原原来版本的搜索功能

【1】原贴地址&#xff1a;eclipse - 怎么还原原来版本的搜索功能_eclipse打开类型搜索类功能失效-CSDN博客 https://blog.csdn.net/sinat_32238399/article/details/145113105 【2】原文如下&#xff1a; 更新eclipse-24-09版本后之后&#xff0c;新的搜索功能&#xff08;CT…