Kafka-Connect

一、概述

Kafka Connect是一个在Apache Kafka和其他系统之间可扩展且可靠地流式传输数据的工具。细心的你会发现,我们编写的producer、consumer都有很多重复的代码,KafkaConnect就是将这些通用的api进行了封装。让我们可以只关心业务部分(数据的处理逻辑)。它可以以配置定义的方式实现数据的生产和消费。Kafka Connect可以将整个数据库或所有应用服务器的指标收集到Kafka主题中,使数据能够以低延迟进行流处理。导出作业可以将数据从Kafka主题传递到辅助存储和查询系统,或传递到批处理系统进行离线分析。

我们写大数据程序的时候,只写数据的处理逻辑就能实现分布式的并行计算,是因为MapReduce、Spark这些计算框架的优秀设计。把输入格式化、节点间Shuffle性能优化、任务拆解等都抽离出来,我们只需要继承Mapper、Reducer接口或者使用算子就可以了。Kafka Connect也是这样的设计思想,你就把我当作一个消息队列工具,我给你封装好,你按照我们的规范处理数据就可以了。

Kafka Connect功能如下:

1、通用框架

        Kafka Connect标准化了其他数据系统与Kafka的集成,简化了连接器的开发、部署和管理

2、Distributed 和 standalone 模式

        扩展到支持整个组织的大型集中管理服务,或扩展到开发、测试和小型生产部署

3、REST 接口

        通过易于使用的REST API向Kafka Connect集群提交和管理连接器

4、offset 自动管理

        只需来自连接器的一点信息,Kafka Connect就可以自动管理偏移提交过程,因此连接器开发人员无需担心连接器开发中这个容易出错的部分

5、可扩展

        Kafka Connect建立在现有的组管理协议之上。可以添加更多工作人员来扩展Kafka Connect集群。

6、流/批处理集成

        利用Kafka的现有功能,Kafka Connect是桥接流和批处理数据系统的理想解决方案

二、使用方法

1、运行Kafka Connect

Kafka Connect目前支持两种执行模式:standalone (单进程)和distributed

standalone 模式

在独立模式下,所有工作都在单个进程中执行。该模式容易配置和上手,在有些场景下也很适合(例如收集日志文件),但它不能受益于Kafka Connect的某些功能,例如容错。您可以使用以下命令启动独立进程:

bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.json …]
参数配置:

第一个参数(也就是connector1.properties)是worker的配置。这包括诸如Kafka连接参数、序列化格式以及提交偏移的频率等设置。config/server.properties提供了默认配置运行的本地集群。它需要调整以用于不同的配置或生产部署。所有worker(独立和分布式)都需要一些配置:

  • bootstrap.servers-用于引导连接Kafka的服务器列表
  • key.converter-序列化key的转换器类。常见格式的示例包括JSON和Avro。
  • value.converter-序列化value的转换器类。常见格式的示例包括JSON和Avro。
  • plugin.path(默认为empty)-包含Connect插件(连接器、转换器、转换)的路径列表。在运行快速启动之前,用户必须添加包含示例FileStreamSourceConnector的绝对路径,

以下是standalone 模式特有的重要配置:

  • offset.storage.file.filename-存储源连接器偏移量的文件

此处配置的参数旨在供Kafka Connect用于访问配置、偏移量和状态主题的生产者和消费者使用。对于Kafka源任务使用的生产者和Kafka接收器任务使用的消费者的配置,可以使用相同的参数,但需要分别以producer.consumer.为前缀。从工作配置中继承的唯一没有前缀的Kafka客户端参数是bootstrap.servers

从2.3.0开始,客户端配置覆盖可以通过使用前缀producer.override.consumer.override.分别为Kafka源或Kafka接收器单独配置。这些覆盖包含在连接器的其余配置属性中。

distributed 模式

分布式模式处理work的自动平衡,允许动态扩展(或缩减),并在活动任务以及配置和偏移提交数据中提供容错。执行与独立模式非常相似:

bin/connect-distributed.sh config/connect-distributed.properties

区别在于启动的类和配置参数,这些参数改变了Kafka Connect进程决定在哪里存储配置、如何分配工作以及在哪里存储偏移量和任务状态的方式。在分布式模式下,Kafka Connect将偏移量、配置和任务状态存储在Kafka主题中。建议手动创建偏移量、配置和状态的主题,以实现所需的分区数和复制因子。如果启动Kafka Connect时尚未创建主题,则将使用默认的分区数和复制因子自动创建主题(一般不怎么做)。

参数配置:

在启动集群之前需要设置的重要参数:

  • group.id(默认connect-cluster)-集群的唯一名称,用于形成Connect集群组;请注意,这不得与消费者组ID相冲突
  • config.storage.topic(默认connect-configs)-用于存储连接器和任务配置的topic;请注意,这应该是单个分区、高度复制、压缩的topic。可能需要手动创建topic以确保正确的配置,因为自动创建的topic可能有多个分区或自动配置为删除而不是压缩
  • offset.storage.topic(默认connect-offsets)-用于存储偏移量的topic;此topic应该有许多分区,可以复制,并配置为压缩
  • status.storage.topic(默认connect-status)-用于存储状态的topic;这个topic可以有多个分区,应该复制和配置以进行压缩

请注意,在分布式模式下,连接器配置不会在命令行上传递。相反,使用下面描述的REST API来创建、修改和销毁连接器

2、配置Connectors

连接器配置是简单的键值映射。在独立模式和分布式模式下,它们都包含在创建(或修改)连接器的REST请求的JSON有效负载中。在独立模式下,这些也可以在属性文件中定义并通过命令行传递给Connect进程。

以下是一些常见选项:

  • name-连接器的唯一名称。尝试使用相同的名称再次注册将失败。
  • connector.class-连接器的Java类
  • tasks.max-应为此连接器创建的最大任务数。如果连接器无法实现此并行级别,它可能会创建更少的任务。
  • key.converter-(可选)覆盖worker设置的默认key转换器。
  • value.converter-(可选)覆盖worker设置的默认value转换器。

connector.class配置支持多种格式:此连接器的类的全名或别名。如果连接器是org. apache.kafka.connect.file.FileStreamSinkConnector,则可以指定此全名或使用FileStreamSink或FileStreamSinkConnector使配置更短。

接收器连接器还有一些额外的选项来控制它们的输入。每个接收器连接器必须设置以下选项之一:

  • topics-用逗号分隔的主题列表,用作此连接器的输入
  • topics.regex-主题的Java正则表达式,用作此连接器的输入

3、转换

连接器可以配置转换以进行轻量级的Message-at-time修改。

可以在连接器配置中指定转换链:

  • transforms-转换的别名列表,指定应用转换的顺序。
  • transforms.$alias.type-转换的完全限定类名。
  • transforms.$alias.$transformationSpecificConfig转换的配置属性

4、REST API

由于Kafka Connect旨在作为服务运行,因此它还提供了用于管理连接器的REST API。此REST API在独立模式和分布式模式下都可用。可以使用listeners配置选项来配置REST API服务器。此字段应包含以下格式的侦听器列表:protocol://host:port,protocol2://host2:port2。目前支持的协议是httphttps。例如:

listeners=http://localhost:8080,https://localhost:8443

默认情况下,如果未指定listeners,则REST服务器使用HTTP协议在端口8083上运行。

REST API不仅被用户用于监控/管理Kafka Connect。在分布式模式下,它也用于Kafka Connect跨集群通信。在follower 节点REST API上接收到的一些请求将被转发到leader 节点REST API。如果给定主机可达的URI与其侦听的URI不同,配置选项rest.advertised.host.namerest.advertised.portrest.advertised.listener可用于更改follower 节点将用于与leader 连接的URI。当同时使用HTTP和HTTPS侦听器时,rest.advertised.listenerssl.* or listeners.https 选项将用于配置HTTPS客户端。

以下是当前支持的REST API端点:

  • GET /connectors-返回活动连接器列表
  • POST /connectors-创建一个新的连接器;请求正文应该是一个JSON对象,其中包含一个字符串name字段和一个带有连接器配置参数的对象config字段。JSON对象还可以选择包含一个字符串initial_state字段,该字段可以采用以下值-STOPPEDPAUSEDRUNNING(默认值)
  • GET /connectors/{name}-获取有关特定连接器的信息
  • GET /connectors/{name}/config-获取特定连接器的配置参数
  • PUT /connectors/{name}/config-更新特定连接器的配置参数
  • PATCH /connectors/{name}/config-修补特定连接器的配置参数,其中JSON正文中的null表示从最终配置中删除键
  • GET /connectors/{name}/status-获取连接器的当前状态,包括是否正在运行、失败、暂停等,分配给哪个工作人员,失败时的错误信息,以及所有任务的状态
  • GET /connectors/{name}/tasks-获取当前为连接器运行的任务列表及其配置
  • GET /connectors/{name}/tasks-config-获取特定连接器的所有任务的配置。此端点已弃用,将在下一个主要版本中删除。请改用GET /connectors/{name}/tasks端点。注意两个端点的响应结构略有不同,
  • GET /connectors/{name}/tasks/{taskid}/status-获取任务的当前状态,包括它是否正在运行、失败、暂停等,它被分配给哪个工作人员,以及失败时的错误信息
  • PUT /connectors/{name}/pause-暂停连接器及其任务,这将停止消息处理,直到连接器恢复。其任务声明的任何资源都被分配,这允许连接器在恢复后快速开始处理数据。
  • PUT /connectors/{name}/stop-停止连接器并关闭其任务,释放其任务声明的任何资源。从资源使用的角度来看,这比暂停连接器更有效,但可能会导致连接器在恢复后需要更长时间才能开始处理数据。请注意,如果连接器处于停止状态,则只能通过偏移管理端点修改连接器的偏移量
  • PUT /connectors/{name}/resume-恢复暂停或停止的连接器(如果连接器没有暂停或停止,则不执行任何操作)
  • POST /connectors/{name}/tasks/{taskId}/restart-重新启动单个任务(通常是因为它失败了)
  • DELETE /connectors/{name}-删除连接器,停止所有任务并删除其配置
  • GET /connectors/{name}/topics-获取特定连接器自创建连接器或发出重置其活动主题集的请求以来正在使用的主题集
  • PUT /connectors/{name}/topics/reset-发送清空连接器活动主题集的请求

Kafka Connect还提供了一个REST API来获取有关连接器插件的信息:

  • GET /connector-plugins-返回安装在Kafka Connect集群中的连接器插件列表。请注意,API仅检查处理请求的工作人员上的连接器,这意味着您可能会看到不一致的结果,尤其是在滚动升级期间,如果您添加新的连接器jar
  • GET /connector-plugins/{plugin-type}/config-获取指定插件的配置定义。
  • PUT /connector-plugins/{connector-type}/config/validate-根据配置定义验证提供的配置值。此API执行每个配置验证,在验证期间返回建议值和错误消息。

以下是顶级(根)端点支持的REST请求:

  • GET /-返回有关Kafka Connect集群的基本信息,例如服务于REST请求的Connect工作程序版本(包括源代码的git提交ID)和连接到的Kafka集群ID。

可以使用admin.listeners配置在Kafka Connect的REST API服务器上配置管理员REST API

admin.listeners=http://localhost:8080,https://localhost:8443

默认情况下,如果未配置admin.listeners,则管理员REST API将在常规侦听器上可用。

以下是当前支持的管理员REST API端点:

  • GET /admin/loggers-列出明确设置了级别的当前记录器及其日志级别
  • GET /admin/loggers/{name}-获取指定记录器的日志级别
  • PUT /admin/loggers/{name}-设置指定记录器的日志级别

5、错误报告

Kafka Connect提供错误报告来处理在处理的各个阶段遇到的错误。默认情况下,在转换期间或转换中遇到的任何错误都将导致连接器失败。每个连接器配置还可以通过跳过它们来允许容忍此类错误,可选地将每个错误以及失败操作的详细信息和有问题的记录(具有不同的详细级别)写入Connect应用程序日志。当接收器连接器处理从其Kafka主题消耗的消息时,这些机制还会捕获错误,并且所有错误都可以写入可配置的“死信队列”(DLQ)Kafka主题。

要向日志报告连接器转换器、转换或接收器连接器本身中的错误,请在连接器配置中设置errors.log.enable=true以记录每个错误和问题记录的主题、分区和偏移量的详细信息。出于其他调试目的,请设置errors.log.include.messages=true以将问题记录键、值和标头记录到日志中(请注意,这可能会记录敏感信息)。

要在连接器的转换器内报告错误,请将接收器连接器本身转换为死信队列主题,设置errors.deadletterqueue.topic.name,并可选地errors.deadletterqueue.context.headers.enable=true

默认情况下,连接器在出现错误或异常时立即表现出“快速故障”行为。这相当于将以下配置属性及其默认值添加到连接器配置中:

# 禁用失败重试
errors.retry.timeout=0

# 不记录错误及其上下文
errors.log.enable=false

# 不要在死信队列主题中记录错误
errors.deadletterqueue.topic.name=

# 第一次出错失败
errors.tolerance=none

可以更改这些和其他相关的连接器配置属性以提供不同的行为。例如,可以将以下配置属性添加到连接器配置中,以通过多次重试来设置错误处理、记录到应用程序日志和my-connector-errorsKafka主题,并通过报告错误而不是连接器任务失败来容忍所有错误:

# 重试最多10分钟,连续失败之间最多等待30秒
errors.retry.timeout=600000
errors.retry.delay.max.ms=30000

# 将错误上下文与应用程序日志一起记录,但不包括配置和消息
errors.log.enable=true
errors.log.include.messages=false

# 在Kafka主题中生成错误上下文
errors.deadletterqueue.topic.name=my-connector-errors

# 容忍所有错误。
errors.tolerance=all

6、支持精确一次语义

Kafka Connect能够为接收器连接器(从版本0.11.0开始)和源连接器(从版本3.3.0开始)提供精确一次语义学。请注意,对精确一次语义学的支持高度依赖于您运行的连接器类型。即使您在配置中为集群中的每个节点设置了所有正确的工作属性,如果连接器不是为Kafka Connect框架设计的,或者不能利用Kafka Connect框架的功能,精确一次可能是不可能的。

Sink connectors

如果接收器连接器支持精确一次语义学,要在Connect工作级别启用精确一次,您必须确保其消费者组配置为忽略中止事务中的记录。您可以通过将工作属性consumer.isolation.level设置为read_committed来做到这一点,或者,如果运行支持它的Kafka Connect版本,则使用连接器客户端配置覆盖策略,该策略允许在单个连接器配置中将consumer.override.isolation.level属性设置为read_committed。没有额外的ACL要求。

Source connectors

如果源连接器支持精确一次语义学,则必须配置Connect群集以启用对精确一次源连接器的框架级支持。如果针对安全的Kafka群集运行,可能需要额外的ACL。请注意,对Source connectors的精确一次支持目前仅在分布式模式下可用;独立的Connect工作人员不能提供精确一次语义学。

Worker 配置

对于新的Connect群集,在群集中每个节点的工作配置中将exactly.once.source.support属性设置为enabled。对于现有群集,需要两次滚动升级。在第一次升级期间,exactly.once.source.support属性应设置为preparing,在第二次升级期间,应设置为enabled

ACL要求

启用了精确一次源支持,或者exactly.once.source.support设置为preparing,每个Connect工作人员的主体将需要以下ACL:

操作资源类型资源名称注意
WriteTransactionalIdconnect-cluster-${groupId},其中${groupId}是集群的group.id
DescribeTransactionalIdconnect-cluster-${groupId},其中${groupId}是集群的group.id
IdempotentWriteCluster托管工作人员配置主题的Kafka集群的ID仅适用于在Kafka2.8之前

启用了精确一次源(但如果exactly.once.source.support设置为preparing),每个单独连接器的主体将需要以下ACL:Write、Describe、Write、Read、Describe、Create、IdempotentWrite。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/481840.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开源项目:纯Python构建的中后台管理系统

来源:Python大数据分析 费弗里 大家好我是费老师,目前市面上有很多开源的「中后台管理系统」解决方案,复杂如「若依」那种前端基于Vue,后端基于Java的框架,虽然其提供了较为完善的一整套前后端分离权限管理系统解决方…

Power BI - Connect to SharePoint online list with Image column

1.简单介绍 当前SharePoint online list有modern和classic两种模式,现在使用modern模式的比较多。list中有Image类型的列,Power BI如何连接到SharePoint list并显示image呢 note, SharePoint list中的Image列,Lookup列,People列…

DroneCAN 最新开发进展,Andrew在Ardupilot开发者大会2024的演讲

本文是Andrew演讲的中文翻译,你可以直接观看视频了解演讲的全部内容,此演讲视频的中文版本已经发布在Ardupilot社区的Blog板块,你可以在 Arudpilot官网(https://ardupilot.org) 获取该视频: 你也可以直接通过Bilibili链…

DataWhale—PumpkinBook(TASK07支持向量机)

课程开源地址及相关视频链接:(当然这里也希望大家支持一下正版西瓜书和南瓜书图书,支持文睿、秦州等等致力于开源生态建设的大佬✿✿ヽ(▽)ノ✿) Datawhale-学用 AI,从此开始 【吃瓜教程】《机器学习公式详解》(南瓜…

基于Python制作一个简易UI界面

基于Python制作一个简易UI界面 目录 基于Python制作一个简易UI界面1 原理简介2 编写程序3 程序测试 1 原理简介 这里用到了Python自带的UI库tkinter。 tkinter 是 Python 的标准 GUI(图形用户界面)库,用于创建和管理图形界面。它提供了一个简…

【electron-vite】搭建electron+vue3框架基础

一、拉取项目 electron-vite 中文文档地址: https://cn-evite.netlify.app/guide/ 官网网址:https://evite.netlify.app/ 版本 vue版本:vue3 构建工具:vite 框架类型:Electron JS语法:TypeScript &…

操作无法完成,因为其中的文件夹或文件已在另一程序中打开 请关闭该文件夹或文件,然后重试。>>怎么删除被打开的文件

出现这种弹窗是不是很烦人, 也很烦我, 今天就了结了它 我们可以使用一款命令行工具来查看哪些软件正在占用这个文件, 把这些使用文件的软件进程都关闭就可以了 解决办法: 1.下载命令行工具handle 打开浏览器,访问 Sysinternals 官方网站的 Handle 页面, 在页面上…

修改IDEA配置导致Spring Boot项目读取application.properties中文乱码问题

之前很多配置都是放在nacos里面,然后这次同事有个配置写在application.properties中,这个配置含有中文,启动之后发现拿到的中文值会乱码,然后就帮忙看了一下问题。 排查问题 经过不停的百度、排查发现,spring读取app…

常用端口与Udp协议

目录 1.再谈端口 1.1 五元组 1.2 端口号范围划分 1.3 两个指令 1.3.1 netstat 1.3.2 pidof 2.UDP协议 2.1 协议整体格式 2.2 udp特点 2.3 udo缓冲区 1.再谈端口 1.1 五元组 端口号表示了一个主机上进行通信的不同的应用程序;在Tcp/IP协议中,用…

webpack(react)基本构建

文章目录 概要整体架构流程技术名词解释技术细节小结 概要 Webpack 是一个现代 JavaScript 应用程序的静态模块打包工具。它的主要功能是将各种资源(如 JavaScript、CSS、图片等)视为模块,并将它们打包成一个或多个输出文件,以便…

MATLAB期末复习笔记(中)

三、MATLAB函数和程序结构 1.MATLAB文件 两种类型的M文件: • 脚本 ,不接受输入参数或返回输出参数。它们处理工作区中的数据。 • 函数 ,可接受输入参数,并返回输出参数。内部变量是函数的局部变量。 ① 函数文件是另一类 m 文…

Mouser EDI 需求分析

为了提高供应链的自动化水平,贸泽电子(Mouser Electronics)使用EDI技术更好地管理与其全球合作伙伴之间的业务数据往来。对接Mouser EDI,对于企业而言,需要在本地部署EDI软件,建立与Mouser之间的EDI连接通道…

[免费]SpringBoot+Vue景区订票(购票)系统【论文+源码+SQL脚本】

大家好,我是java1234_小锋老师,看到一个不错的SpringBootVue大景区订票(购票)系统,分享下哈。 项目视频演示 【免费】SpringBootVue景区订票(购票)系统 Java毕业设计_哔哩哔哩_bilibili 项目介绍 现代经济快节奏发展以及不断完善升级的信息…

GitLab的使用

文章目录 一、什么是GitLab、有什么用、与Jenkins的区别什么是GitLab及其用途GitLab与Jenkins的区别GitLab的CI/CD功能介绍 二、GitLab的安装与配置Linux下GitLab的安装*Linux下GitLab的简单使用 /etc/gitlab/gitlab.rb 的配置GitLab服务器的域名邮箱配置功能优化关闭一些暂时不…

通信与网络基础

1.网络通信基本概念 通信:人、物通过某种介质和行为进行信息传递与交流 网络通信:终端设备之间通过计算机网络进行通信 两个终端通过网线传递文件 多个终端通过路由器传递文件 终端通过Internet下载文件 2.信息传递过程 图1-1 假定A计算机访问B的web…

RAT:融合RAG和CoT的高效多步推理任务策略

今天分享的是由北京大学、加州大学洛杉矶分校和北京通用人工智能研究院合作发表的一篇文章 论文题目:RAT: Retrieval Augmented Thoughts Elicit Context-Aware Reasoning in Long-Horizon Generation 论文链接:https://arxiv.org/pdf/2403.05313 代码地址:https://githu…

应急响应靶机——Windows挖矿事件

载入虚拟机,开启虚拟机: (账户密码:administrator/zgsf123) 发现登录进去就弹出终端界面,自动运行powshell命令,看来存在计划任务,自动下载了一些文件,之后就主动结束退…

构网型与跟网型混合直驱风电场并网稳定域研究

传统的风机变流器控制采用跟网型(grid-following,GFL)控制,需依赖于锁相环跟踪电网电压的频率/相位信息,以实现与电网的同步。随着能源电力系统的转型,电网逐渐转变为呈现低短路比(short-circuitratio,SCR&…

带外配置IP

要想了解带内,私下我 管理IP:9.101.8.20 掩码:255.0.0.0 网关:9.101.0.254 1 首先自己电脑要修改ip 192.168.70.x 段 2 在cmd 去ping 192.168.70.125 必须通 3 去浏览器 登录192.168.70.125 4 更改ip 5 再次修改电脑IP 网关 掩码 7 检测…

设计模式---建造者模式

建造者模式 一种创建型设计模式,它允许你一步一步地构建复杂对象。通过使用建造者模式,你可以将对象的构建过程与其表示分离,使得同样的构建过程可以创建不同的表示。说白点就是,解决了构造函数创建对象的问题。 适用于那种构造函…