如何为 Elasticsearch 创建自定义连接器

了解如何为 Elasticsearch 创建自定义连接器以简化数据摄取过程。

作者:JEDR BLASZYK

Elasticsearch 拥有一个摄取工具库,可以从多个来源获取数据。 但是,有时你的数据源可能与 Elastic 现有的提取工具不兼容。 在这种情况下,你可能需要创建自定义连接器以将数据与 Elasticsearch 连接。

在你的应用程序中使用 Elastic 连接器有多种原因。 例如,你可能想要:

  • 将数据从自定义或遗留应用程序引入 Elasticsearch
  • 为你的组织数据引入语义搜索
  • 从 PDF、MS Office 文档等文件中提取文本内容
  • 使用 Kibana UI 管理你的数据源(包括配置、过滤规则、设置定期同步计划规则)
  • 你想要在自己的基础设施上部署 Elastic 连接器(一些 Elastic 支持的连接器可作为 Elastic Cloud 中的本机连接器使用)

用于创建定制连接器的开放代码框架

如果创建你自己的连接器是满足你需求的解决方案,那么连接器框架将帮助你创建一个。 我们创建的框架是为了支持创建自定义连接器并帮助用户将独特的数据源连接到 Elasticsearch。 连接器的代码可在 GitHub 上找到,并且我们有可以帮助你入门的文档。

该框架设计简单且高性能。 它旨在对开发人员友好,因此它是开放代码且高度可定制的。 你创建的连接器可以在你自己的基础设施上进行自我管理。 目标是让开发人员能够轻松地将自己的数据源与 Elasticsearch 集成。

使用连接器框架之前你需要了解什么

该框架是用 async-python 编写的

有几门课程可以学习 async-python。 如果你需要推荐,我们认为这个 LinkedIn 学习课程非常好,但需要订阅。 我们喜欢的一个免费替代方案是这个。

为什么我们选择异步 Python?

摄取受 IO 限制(而非 CPU 限制),因此从资源利用的角度构建连接器时,异步编程是最佳方法。 在 I/O 密集型应用程序中,大部分时间都花在等待外部资源上,例如读取文件、发出网络请求或查询数据库。 在这些等待期间,传统的同步代码会阻塞整个程序,导致资源利用效率低下。

还有其他先决条件吗?

这不是先决条件。 在开始之前,绝对值得阅读《连接器开发人员指南》! 希望你觉得这个有用。

使用连接器框架构建定制连接器

入门很容易。 在与框架相关的术语中,我们将自定义连接器称为源。 我们通过创建一个新类来实现一个新的源,该类的职责是将文档从自定义数据源发送到Elasticsearch。

作为一种可选的入门方式,用户还可以查看此目录源 (directory source) 示例。 这是一个很好但基本的示例,可以帮助你了解如何编写自定义连接器。

步骤概要

一旦你知道要为其创建连接器的自定义数据源,以下是编写新源的步骤概述:

  • 在 connectors/sources 中添加模块或目录
  • 在 requirements.txt 中声明你的依赖项。 确保固定这些依赖项
  • 实现一个类,该类实现 connectors.source.BaseDataSource 中描述的方法
  • (可选,在为 repo 做出贡献时)在 connectors/sources/tests 中添加单元测试,覆盖率 +90%
  • 在源部分声明你的连接器 connectors/config.py
  • 就是这样。 我们完成了! 现在你应该能够运行连接器

在编写定制连接器之前你需要了解什么

为了使 Elasticsearch 用户能够获取数据并在该数据的基础上构建搜索体验,我们提供了一个轻量级的连接器协议。 该协议允许用户轻松获取数据、使用企业搜索功能来操作该数据并创建搜索体验,同时在 Kibana 中为他们提供无缝的用户体验。 为了与企业搜索兼容并充分利用 Kibana 中提供的连接器功能,连接器必须遵守该协议。

关于连接器协议你需要了解的内容

该文档页面提供了该协议的良好概述。 以下是你需要了解的内容:

  • 连接器和系统其他部分之间的所有通信都通过 Elasticsearch 索引异步进行
  • 连接器将其状态传达给 Elasticsearch 和 Kibana,以便用户可以为其提供配置并诊断任何问题
  • 这允许简单、开发人员友好的连接器部署。 connectors 服务是无状态的,并且不关心你的 Elastic 部署在哪里运行,只要它可以通过网络连接到它就可以正常工作。 该服务还具有容错能力,可以在重新启动或发生故障后在不同的主机上恢复操作。 一旦与 Elasticsearch 重新建立连接,它将继续正常运行。
  • 在底层,该协议使用 Elasticsearch 索引来跟踪连接器状态
    • .elastic-connectors 和 .elastic-connectors-sync-jobs (在上面链接的文档中描述)

托管自定义连接器的位置

连接器本身不依赖于 Elasticsearch,它可以托管在你自己的环境中

如果你有 Elasticsearch 部署,无论它是自我管理还是位于 Elastic Cloud 中:

  • 作为开发人员/公司,你可以为你的数据源编写自定义连接器
  • 在你自己的基础设施上管理连接器并根据你的需求配置连接器服务
  • 只要连接器可以通过网络发现 Elasticsearch,它就能够对数据建立索引
  • 作为管理员,你可以通过 Kibana 控制连接器

示例:使用连接器框架的 Google Drive 连接器

我们使用连接器框架为 Google Drive 编写了一个简单的连接器。 我们通过创建一个新类来实现新的源,该类的职责是将文档从目标源发送到 Elasticsearch。

注意:本教程与 Elastic stack 版本 8.10 兼容。 对于更高版本,请务必检查连接器发行说明以获取更新并参考 Github 存储库。

我们从具有 BaseDataSource 预期方法签名的 GoogleDriveDataSource 类开始,以配置数据源、检查其可用性(ping)并检索文档。 为了使这个连接器发挥作用,我们需要实现这些方法。

class GoogleDriveDataSource(BaseDataSource):"""Google Drive"""name = "Google Drive"service_type = "google_drive"@classmethoddef get_default_configuration(cls):"""Returns a dict with a default configuration"""raise NotImplementedErrorasync def ping(self):"""When called, pings the backendIf the backend has an issue, raises an exception"""raise NotImplementedErrorasync def get_docs(self, filtering=None):"""Returns an iterator on all documents present in the backendEach document is a tuple with:- a mapping with the data to index- a coroutine to download extra data (attachments)The mapping should have least an `id` fieldand optionally a `timestamp` field in ISO 8601 UTCThe coroutine is called if the document needs to be syncedand has attachments. It needs to return a mapping to index.It has two arguments: doit and timestampIf doit is False, it should return None immediately.If timestamp is provided, it should be used in the mapping.Example:async def get_file(doit=True, timestamp=None):if not doit:returnreturn {'TEXT': 'DATA', 'timestamp': timestamp,'id': 'doc-id'}"""raise NotImplementedError

这个 GoogleDriveDataSource 类是编写 Google Drive 源代码的起点。 通过执行以下步骤,你将实现与 Google Drive 同步数据所需的逻辑:

  • 我们需要将此文件添加到 connectors/sources 中
  • 设置新的连接器名称和 service_type,例如 Google Drive 作为名称,google_drive 作为服务类型 (service type)
  • 要从源获取连接器同步数据,你需要实现:
    • get_default_configuration - 此函数应返回 RichConfigurableFields 的集合。 这些字段允许你从 Kibana UI 配置连接器。 这包括传递身份验证详细信息、凭据和其他特定于源的设置。 Kibana 巧妙地呈现这些配置。 例如,如果你将某个字段标记为 "sensitive": true, Kibana 会出于安全原因屏蔽它。
    • ping - 对数据源的简单调用,验证其状态,将其视为健康检查。
    • get_docs - 此方法需要实现实际从源获取数据的逻辑。 此函数应返回一个异步迭代器,该迭代器返回一个包含以下内容的元组:(document, lazy_download),其中:
      • document - 是远程源中项目的 JSON 表示形式。 (如 name, location, table, author, size 等)
      • lazy_download - 是一个协程,用于下载框架处理的内容提取的对象/附件(例如从 PDF 文档中提取文本)

BaseDataSource 类中还有其他抽象方法。 请注意,如果你只想支持内容同步(例如从谷歌驱动器获取所有数据),则不需要实现这些方法。 它们指的是其他连接器功能,例如:

  • 文档级安全性(get_access_control、access_control_query)
  • 高级过滤规则(advanced_rules_validators)
  • 增量同步 (get_docs_incrementally)
  • 将来可能会添加其他功能

我们如何编写官方 Elasticsearch Google Drive 连接器

首先实现 BaseDataSource 类所需的方法

我们需要实现方法 get_default_configuration、ping 和 get_docs 以使连接器同步数据。 因此,让我们更深入地了解实现。

首先要考虑的是:如何与Google Drive “对话” 来获取数据?

Google 提供了官方的 python 客户端,但它是同步的,因此同步内容可能会很慢。 我们认为更好的选择是 aiogoogle 库,它提供了用异步 python 编写的完整客户端功能。 一开始这可能并不直观,但使用异步操作来提高性能非常重要。 因此,在此示例中,我们选择不使用官方谷歌库,因为它不支持异步模式。

如果你在异步框架中使用同步或阻塞代码,可能会对性能产生重大影响。 任何异步框架的核心都是事件循环。 事件循环允许通过连续轮询已完成的任务并调度新任务来并发执行异步任务。 如果引入阻塞操作,它将停止循环的执行,从而阻止它管理其他任务。 这本质上否定了异步架构提供的并发优势。

下一个问题是连接器身份验证

我们将 Google Drive 连接器验证为服务帐户。 有关身份验证的更多信息可以在这些连接器文档页面中找到。

  • 服务帐户可以使用密钥进行身份验证
  • 我们通过 Elasticsearch 中的 Kibana UI 将身份验证密钥传递给服务帐户

让我们看一下 get_default_configuration 实现,它允许最终用户传递凭证密钥,该凭证密钥将存储在索引中以在同步期间进行身份验证:

class GoogleDriveDataSource(BaseDataSource):"""Google Drive"""name = "Google Drive"service_type = "google_drive"{...}@classmethoddef get_default_configuration(cls):"""Get the default configuration for Google Drive.Returns:dict: Default configuration."""return {"service_account_credentials": {"display": "textarea","label": "Google Drive service account JSON","sensitive": True,"order": 1,"tooltip": "This connectors authenticates as a service account to synchronize content from Google Drive.","type": "str","value": "",},}

接下来我们来实现一个简单的 ping 方法

我们将对 google Drive api 进行简单的调用,例如 /about 端点。

对于此步骤,我们考虑 GoogleDriveClient 的简化表示。 我们的主要目标是指导你完成连接器创建,因此我们不关注 Google Drive 客户端的实现细节。 然而,最少的客户端代码对于连接器的操作至关重要,因此我们将依赖 GoogleDriveClient 类表示的伪代码。

class GoogleDriveClient(GoogleAPIClient):"""A google drive client to handle api calls made to Google Drive API."""{... google drive client implementation}async def ping(self):return await self.api_call(resource="about", method="get", fields="kind")class GoogleDriveDataSource(BaseDataSource):"""Google Drive"""name = "Google Drive"service_type = "google_drive"{...}@cached_propertydef google_drive_client(self):"""Initialize and return the GoogleDriveClientReturns:GoogleDriveClient: An instance of the GoogleDriveClient."""self._validate_service_account_json()json_credentials = json.loads(self.configuration["service_account_credentials"])return GoogleDriveClient(json_credentials=json_credentials)async def ping(self):"""Verify the connection with Google Drive"""try:await self.google_drive_client.ping()self._logger.info("Successfully connected to the Google Drive.")except Exception:self._logger.exception("Error while connecting to the Google Drive.")raise

异步 iterator 从 google drive 返回文件以进行内容提取

下一步是编写 get_docs 异步迭代器,该迭代器将从 Google drive 和协程返回文件以下载它们以进行内容提取。 根据个人经验,开始将 get_docs 作为一个简单的独立 python 脚本来实现并获取一些数据通常更简单。 一旦 get_docs 代码正常工作,我们就可以将其移动到数据源类。

我们看一下 api 文档,我们可以:

  • 使用文 files/list 端点通过分页迭代 drive 中的文档
  • 使用 files/get 和 files/export 下载文件(或将 google 文档导出为特定文件格式)
class GoogleDriveDataSource(BaseDataSource):"""Google Drive"""name = "Google Drive"service_type = "google_drive"{...}async def get_content(self, file, timestamp=None, doit=None):"""Extracts the content from a file file.Args:file (dict): Formatted file document.timestamp (timestamp, optional): Timestamp of file last_modified. Defaults to None.doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.Returns:dict: Content document with id, timestamp & text"""# Code details have been omitted here for brevity. For a complete implementation,# please refer to the connector code on GitHub.async def get_docs(self, filtering=None):"""Executes the logic to fetch Google Drive objects in an async manner.Args:filtering (optional): Advenced filtering rules. Defaults to None.Yields:dict, partial: dict containing meta-data of the Google Drive objects,partial download content function"""async for files_page in self.google_drive_client.list_files():async for file in self.prepare_files(files_page=files_page):yield file, partial(self.get_content, file)

那么这段代码中发生了什么?

  • list_files 对驱动器中的文件进行分页。
  • prepare_files 将文件元数据格式化为预期模式
  • get_content 是一个下载文件并对其内容进行 Base64 编码的协程(内容提取的兼容格式)

为了简洁起见,省略了一些代码细节。 有关完整的实现,请参阅 GitHub 上的当前连接器实现。

让我们运行连接器!

要将自定义连接器集成到框架中,你需要注册其实现。 通过在 connectors/config.py 的源部分中添加自定义连接器的条目来执行此操作。 对于 Google Drive 示例,添加内容将如下所示:

"sources": {...,"google_drive": "connectors.sources.google_drive:GoogleDriveDataSource",...
}

现在在 Kibana 界面中:

  • 转到 Search -> Indices -> Create a new index -> Use a Connector
  • 选择 Customized connector(使用自定义连接器时)
  • 配置你的连接器。 生成 Elasticsearch API 密钥和连接器 ID,并按照说明将这些详细信息放入 config.yml 中,然后启动连接器。

此时,Kibana 应该检测到您的连接器! 安排定期数据同步或只需单击 “Sync” 即可开始完全同步。

连接器可以配置为使用 Elasticsearch 的摄取管道在将数据存储到索引之前对数据执行转换。 一个常见的用例是通过机器学习丰富文档。 例如,你可以:

  • 使用文本嵌入模型分析文本字段,该模型将生成数据的密集向量表示
  • 运行文本分类以进行情感分析
  • 使用命名实体识别 (NER) 从文本中提取关键信息

同步完成后,你的数据将在搜索优化的 Elasticsearch 索引中可用。 此时,你可以深入构建搜索体验或深入分析。

你想创建并贡献一个新的连接器吗?

如果你为可能对 Elasticsearch 社区有所帮助的源创建自定义连接器,请考虑贡献它。 以下是使定制连接器成为 Elastic 支持的连接器的升级路径指南。

贡献连接器的验收标准

此外,在开始花一些时间开发连接器之前,你应该创建一个问题并寻求有关连接器及其将使用哪些库的一些初步反馈。 一旦你的连接器想法得到一些初步反馈,请确保您的项目满足一些验收标准:

  • 在 connectors/sources 中添加模块或目录
  • 实现一个类,该类实现 connectors.source.BaseDataSource 中描述的所有方法
  • 在 connectors/sources/tests 中添加覆盖率 +90% 的单元测试
  • 在源部分的 connectors/config.py 中声明你的连接器
  • 在 requirements.txt 中声明你的依赖项。 确保固定这些依赖项
  • 对于你要添加的每个依赖项(包括间接依赖项),列出所有许可证并在补丁中提供该列表。
  • 确保你的源使用异步库。 如果不可能,请确保没有阻塞循环
  • 如果可能,请提供运行后端服务的 docker 映像,以便我们测试连接器。 如果您无法提供 Docker 映像,请提供针对在线服务运行所需的凭据。
  • 由于 Elasticsearch 分页的默认大小限制为 10k,测试后端需要返回超过 10k 的文档。 从测试后端返回超过 10k 文档将有助于测试连接器

用于测试连接器的支持工具

我们还有一些支持工具来分析连接器代码并运行性能测试。 你可以在这里找到这些资源:

  • Perf8 - 性能库和仪表板,用于分析 python 代码的质量,以评估资源利用率并检测阻塞调用
  • E-2-E 功能测试,利用 perf8 库来分析每个连接器

总结

我们希望这个博客和示例对你有用。

以下是 Elasticsearch 可用的 native connectors 和 connector clients 的完整列表。 如果你没有找到列出的数据源,是否可以创建一个自定义连接器?

以下是与本文相关的一些有用资源:

  • 连接器 GitHub 存储库和文档页面
  • 异步 Python 学习课程
  • 新的自定义连接器社区指南
  • Elastic 连接器框架的许可详细信息(在此链接中搜索 'Connector Framework')

如果你没有 Elastic 帐户,你可以随时启动试用帐户来开始!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/166353.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

文件列表创建工具 Nifty File Lists mac中文版功能特色

Nifty File Lists mac是一款文件列表创建工具,全面的元数据支持,涵盖了从基本文件信息,如文件名、路径、大小、创建和修改日期等等内容。 Nifty File Lists mac功能特色 全面的 元数据支持强大的多线程元数据提取系统涵盖了从基本文件信息&a…

elasticsearch的docker安装与使用

安装 docker network create elasticdocker pull docker.elastic.co/elasticsearch/elasticsearch:8.10.4# 增加虚拟内存, 此处适用于linux vim /etc/sysctl.conf # 添加 vm.max_map_count262144 # 重新启动 sysctl vm.max_map_countdocker run --name es01 --net …

Spring定时任务@Scheduled

在 Spring 框架中,可以使用定时任务来执行周期性或延迟执行的任务。Spring 提供了多种方式来配置和管理定时任务。有Java自带的java.util.Timer类,也有强大的调度器Quartz,还有SpringBoot自带的Scheduled。 在实际应用中,如果没有…

聊聊分布式架构09——分布式中的一致性协议

目录 01从集中式到分布式 系统特点 集中式特点 分布式特点 事务处理差异 02一致性协议与Paxos算法 2PC(Two-Phase Commit) 阶段一:提交事务请求 阶段二:执行事务提交 优缺点 3PC(Three-Phase Commit&#x…

实际项目中最常用的设计模式

在软件开发领域,设计模式是一种经过验证的通用解决方案,用于解决各种常见问题。它们有助于提高代码的可维护性、可扩展性和可重用性。虽然有许多不同的设计模式,但以下是实际项目中最常用的一些: 1. 单例模式 (Singleton Pattern) 单例模式确保一个类只有一个实例,并提供…

蓝桥杯每日一题2023.10.21

后缀表达式 - 蓝桥云课 (lanqiao.cn) 题目描述 题目分析 30分解法&#xff1a;要求出最大的结果就需要加的数越大&#xff0c;减的数越小&#xff0c;以此为思路简单列举即可 #include<bits/stdc.h> using namespace std; typedef long long ll; const int N 2e5 10…

AI智能分析视频监控系统如何助力智慧民宿规范化、安全最大化?

民宿智能监控系统是一种便捷而有效的安全解决方案&#xff0c;它可以提供全面的监控和保护民宿的功能。以下为具体方案&#xff1a; 1、视频监控 安装高清摄像头覆盖民宿的关键区域&#xff0c;如大门、入口、走廊和共用区域等。这些摄像头可以实时监控&#xff0c;记录入住和…

线上Timeout waiting for connection from pool问题分析和解决方案

目录 现象 理论分析 代码分析 解决方案 方案一:直接修改pollingConnectionManager 方案二:修改HttpClient 参考 现象 线上共有5个类似服务,但是只有流量较大的服务会出现成功率的问题。 问题的表现主要是在GetFile(fileId=AgACAgUAAxkDAAEbP1JlJPxyJM82phEKhYYZYfY9…

工业RFID厂家与您分享工业生产制造的应用案例

随着科技的不断进步&#xff0c;RFID技术在工业生产制造领域的应用越来越广泛。AGV/RGV小车运输、立体仓库、生产线、物料跟踪与管理等各行业工业自动化的使用上都有着RFID的身影。为工业生产制造智能化自动化提供了助力。下面&#xff0c;为大家分享RFID技术在工业生产制造上的…

使用C#和Flurl.Http库的下载器程序

根据您的要求&#xff0c;我为您编写了一个使用C#和Flurl.Http库的下载器程序&#xff0c;用于下载凤凰网的图片。以下是一个简单的示例代码&#xff1a; using System; using Flurl.Http;namespace DownloadImage {class Program{static void Main(string[] args){string url…

Unity之ShaderGraph如何实现触电电流效果

前言 之前使用ASE做过一个电流效果的shader&#xff0c;今天我们通过ShaderGraph来实现一个电流效果。 效果如下&#xff1a; 关键节点 Simple Noise&#xff1a;根据输入UV生成简单噪声或Value噪声。生成的噪声的大小由输入Scale控制。 Power&#xff1a;返回输入A的结果…

docker运行redis镜像

很多项目会用到redis作为缓存用到项目中&#xff0c;鉴于刚了解过docker&#xff0c;今天这里用docker运行redis镜像&#xff0c;这样下载&#xff0c;安装运行&#xff0c;或者是使用后的删除都会干净&#xff0c;简单。 好了&#xff0c;第一步是先拉取镜像&#xff0c;使用d…

2023年浙大MEM考前80天上岸经验分享

时间过得真快&#xff0c;转眼间已经是十月份了。回想起去年这个时候&#xff0c;我还在为考研而感到焦虑不安。然而&#xff0c;如今我已经在浙大MEM项目学习了一个多月的时间了。在这一个月的学习过程中&#xff0c;我不仅学到了许多专业知识&#xff0c;还结识了很多志同道合…

STM32cubemx对FreeRTOS的适配(工程模板配置)

文章目录 前言一、工程的创建二、什么是CMSIS三、STM32cubemx生成的FreeRTOS工程分析总结 前言 本篇文章将带大家使用STM32cubemx对FreeRTOS进行工程模板的配置。 一、工程的创建 1.开始工程的创建&#xff1a; 2.芯片型号选择&#xff1a; 3.修改时钟为TIM8&#xff1a; …

JS学习-CryptoJS加密库

CryptoJS加密库 安装库 npm install crypto-js如下例子 对称加密 const CryptoJS require(crypto-js); //引入加密库 var str"123456" //md5加密 console.log(CryptoJS.MD5(str).toString()) var str2 CryptoJS.enc.Utf8.parse(str); //可以把字符串转成UTF-…

大数据Flink(一百):SQL自定义函数(UDF)和标量函数(Scalar Function)

文章目录 SQL自定义函数(UDF)和标量函数(Scalar Function)

只需五步,在Linux安装chrome及chromedriver(CentOS)

一、安装Chrome 1&#xff09;先执行命令下载chrome&#xff1a; wget https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm2&#xff09;安装chrome yum localinstall google-chrome-stable_current_x86_64.rpm看到下图中的Complete出现则代表安装…

微信小程序之会议OA系统首页布局搭建与Mock数据交互

目录 前言 一、Flex 布局&#xff08; 分类 编程技术&#xff09; 1、Flex布局是什么&#xff1f; 2、基本概念 3、容器的属性 3.1 flex-direction属性 3.2 flex-wrap属性 3.3 flex-flow 3.4 justify-content属性 3.5 align-items属性 3.6 align-content属性 4、项目…

CentOS | 添加普通用户并授权sudo

sudo -i adduser peter passwd peter whereis sudoers nano /etc/sudoers添加一行新用户到root组 ## Allow root to run any commands anywhere root ALL(ALL) ALL peter ALL(ALL) ALL如果提升权限后无法cd到其他目录等&#xff0c;修改 /etc/passwd 文件&…

2023CANN训练营第二季——Ascend C算子开发(入门)——基础概念

第一章 Ascend C 算子开发入门 一、基础概念 1.Ascend C 概念&#xff1a;Ascend C是CANN针对算子开发场景推出的编程语言&#xff0c;原生支持C和C标准规范&#xff0c;最大化匹配用户开发习惯;通过多层接口抽象、自动并行计算、孪生调试等关键技术&#xff0c;极大提高算子…