Kafka之消费者客户端

1、历史上的二个版本

与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本:

  • 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端
  • 新消费者客户端(New Consumer):从Kafka 0.9.0版本之后基于Java语言开发的版本,又称为Java消费者客户端

2、必要的参数配置

  • bootstrap.servers

    用来指定连接Kafka集群所需的broker地址清单,形式为:host1:port1,host2:port2,…,多个broker之间以“,”隔开。

    不用将所有broker列出来,消费者可以根据一个broker查询到其他broker。

    建议至少配置2个或2个以上的broker,防止只有一个broker的话,宕机的时候就无法连接到Kafka集群了。

  • group.id

    消费者隶属消费组的名称。

  • key.deserializer 和 value.deserializer

    与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。

    用来将字节数组中的key和value反序列化还原为原来的对象格式。

3、订阅主题与分区

一个消费者可以订阅一个或多个主题。

Kafka消费者客户端提供了三种订阅方式:集合订阅subscribe(Collection)、正则表达式订阅subscribe(Pattern)、指定分区订阅assign(Collection)。

这三种订阅方式分别代表了三种不同的订阅状态,依次为AUTO_TOPICS、 AUTO_PATTERN、USER_ASSIGNED。如果没有订阅,订阅状态为NONE。

其中的集合订阅subscribe(Collection)和正则表达式订阅subscribe(Pattern)这两种订阅方式有消费者自动再均衡的功能,可以根据分区分配策略自动的为消费者分配对应的分区。而指定分区订阅assign(Collection)方式则不具备消费者自动再均衡的功能。

综上所述梳理了一张关于订阅方式、订阅状态和再均衡功能的关系表:
在这里插入图片描述

4、消费消息

消息消费一般有两种方式:

  • 推模式:服务器主动将消息推送给消费者。
  • 拉模式:消费者主动向服务器发起请求来来取信息。

Kafka采用的消息消费模式是拉模式。

在拉取消息的时候有一个超时时间参数(timeout),如果消费者的缓存区中无可用数据(即没有要消费消息),我们可以通过这个timeout参数来设置等待的时长。如果timeout=0,则不管有无数据立刻返回结果。

5、位移提交

在Kafka的分区当中,每一个消息都有一个唯一的标识offset,我们可以用它来表示消息在分区中的位置。

对于消费者而言,也有一个offset的概念,我们可以用它来表示消费到分区中某消息的位置。

对于offset这个单词,我们既可以翻译为偏移量,也可以翻译为位移,并没有什么严格的区分。但是为了更好的区分不同的使用场景,我们可以将用来表示消息在分区中位置的offset称为偏移量。对于用来表示消费者消费到的消息所处位置的offset称为位移,更明确的话称为“消费位移”

通过下图希望能够帮助大家更清晰的理解:偏移量、消费位移、位移提交。
在这里插入图片描述
通过上图我们可以了解到如下信息:

  1. 正在消费的消息下标为3。
  2. 所以对于分区来说,它的偏移量为3;对于消费者来说,它的消费位移也为3。
  3. 对于分区来说,下标4则作为下一个消息要写入的位置。
  4. 对于消费者来说,将要提交的消费位移(即位移提交)是下标4。

Kafka默认情况下,消费位移的提交方式为自动提交,提交间隔时间默认为5秒。

根据位移提交的具体情况,可能会出现重复消费和消息丢失的现象。我们通过下面一个例子更详细介绍下重复消费和消息丢失是如何出现的。让我们先来看一张图:
在这里插入图片描述
根据上图,我们假设本次拉取的消息为x+2 ~ x+7,x+2为上一次的提交的消费位移,x+8为下一次要提交的消费位移,目前正在处理x+5。

  • 消息丢失

    假设我们在处理x+5之前(即在处理x+0或x+1或x+2…)就提交了本次的消费位移(即x+8),当到处理x+5的时候出现了异常,恢复后,就要从x+8开始拉取了,此时x+5、x+6、x+7实际上并没有被消费,这样便发生了消息丢失的现象。(在消费消息出现异常之前就执行了位移提交)。

  • 重复消费

    假设我们在处理x+5的时候出现了异常,此时还没有提交本次的消费位移(即x+8),恢复后,就还需要从x+2开始拉取消息,这样x+2 ~ x+4就又得再消费一次,这种现象就是重新消费。(在消费消息出现异常之前没有执行位移提交)。

通过以上的描述我们还可以发现:拉取线程和消息处理线程完全是两个独立的线程。

6、指定位移消息

首先提出一个问题:当消费者遇到无法获取所记录的消费位移的时候该怎么办?

为了要解决这个问题,消费者客户端提供了auto.offset.reset参数,用来在遇到这种情况的时候告诉消费者客户端从哪里开始拉取消息消费,该参数的值有几种选择:

  • latest:默认值,意为从分区末尾开始消费消息(即分区中下一条消息要写入的位置)。
  • earliest:意为消费者会从起始处也就是0开始消费。
  • none:直接抛出NoOffsetForPartitionException异常。

7、再均衡

所谓再均衡就是将一个分区的所属权从一个消费者转移到另外一个消费者。

再均衡的过程中,消费组内的消费者无法读取消息。

再均衡后,可能会出现重复消费的情况。因为再均衡的时候,消费者会丢掉当前的状态。如果在上一个消费者(即具有分区所属权的消费者)正在消费消息(已消费了一部分消息了)还没有来得及提交消费位移的时候就发生了再均衡,那么新的消费者(分区所属权转移后的消费者)会重新拉取曾经消费过的消息再消费一遍。

8、消费者拦截器

我们可以通过消费者拦截器在poll返回消息之前消费位移提交之后进行一些特定的处理。

9、多线程实现

为了提高整体的消费能力,我们对消费者客户端采取多线程来实现。

有三种多线程的实现方式:

  1. 线程封闭,即为每一个线程实现一个KafkaConsumer对象,如下图: 在这里插入图片描述
  2. 多个消费线程同时消费一个分区,通过assign()、seek()等方法实现,打破了原有的消费线程的个数不能超过分区个数的限制。但是这种实现方式会使位移提交和顺序控制变得非常负责,实际场景中很少会用到。
  3. 将处理消息的逻辑改为多线程实现,也就是在一个KafkaConsumer对象中有多个处理消息的handler线程,如下图: 在这里插入图片描述
    在这种实现方式中,为了能够正确的完成位移提交,引入了一个共享变量offsets来参与提交,如下图:
    在这里插入图片描述
    基于这种实现方式提供以下两种实现方案:
    • 通过消费者拉取一个批次的消息,然后再将这些消息交给多线程去处理。
    • 基于滑动窗口来实现,将拉取的消息以批次为单位暂存起来,多个消费线程拉取暂存的消息消费,如下图: 在这里插入图片描述
      窗口滑动过程描述:上一次滑动窗口的范围是2 ~ 5,startOffset为2,当2中的消息都被消费完成后,提交2中的消费位移,窗口向前滑动一格,范围变为3 ~ 6,startOffset变为3。

上一篇:Kafka之消费组与消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/454416.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

rpm 命令

rpm(Red Hat Package Manager)是 Red Hat Linux 及其衍生发行版(如 CentOS、Fedora)中用于管理软件包的系统。它允许用户安装、卸载、升级、查询和验证软件包。 一、安装软件包 (1)安装一个 RPM 软件包&a…

高并发下如何保证接口的幂等性?

前言 接口幂等性问题,对于开发人员来说,是一个跟语言无关的公共问题。本文分享了一些解决这类问题非常实用的办法,绝大部分内容我在项目中实践过的,给有需要的小伙伴一个参考。 不知道你有没有遇到过这些场景: 有时我们在填写某些form表单时,保存按钮不小心快速点了两次…

十二、【智能体】深入剖析:大模型节点的全面解读,举例说明,教你如何在扣子中嵌入代码

大模型节点 大模型节点主要分为5部分: 处理类型 单次批处理 模型类型:目前可以选择的模型有 豆包、通义千问、智谱、MinMax和Kimi输入:此时的参数可以被下面的提示词所用提示词:给大模型使用的提示词输出:经过此大模型处理后的输…

Vehicle Spy3.9如何新建工程—总览

1:写作目的 学习和精通SPY的使用,对于spy,目前主要是通用系用的比较多,本身spy的生产厂家英特佩斯也是美国的公司,除了软件自带教程。中文网上很少能找到相关的中文教程。 故写下这篇文章,帮助自己和大家…

Ubuntu(22.04)本地部署Appsmith

Ubuntu(22.04)安装Appsmith 简要介绍 Appsmith 是一个开源的低代码开发平台,旨在帮助开发者和非开发者快速构建定制化的内部应用程序和管理工具。通过直观的拖拽界面和丰富的预配置组件,Appsmith 让用户无需编写大量代码即可创建…

软件工程的学习之详细绪论

软件的定义 软件是程序和所有使程序正确运行所需要的相关文档和配置信息。 Software Program Data Document 一、软件危机: 软件开发和维护过程中遇到的一系列严重问题。 二、具体表现: 1、产品不符合用户的实际需要; 2、软件开发生产率…

Sigrity 共模电感的S-parameter仿真数据导入

下载S4P参数 https://ds.murata.co.jp/simsurfing/cmcc.html?partnumbers%5B%22DLW32MH101XT2%22%5D&oripartnumbers%5B%22DLW32MH101XT2L%22%5D&rgearjomoqke&rgearinfocom&md51729525489334# 下载S4P参数; DLW32MH101XT2.s4p Sigrity 使用-dif…

Mac电脑:资源库Library里找不到WebServer问题的解决

今天看到一本书里写到Windows电脑自带IIS Web服务器,好奇了一下下,mac电脑自带的又是什么服务器呢?经查询,原来是Apache服务器,这个名字我很熟悉。只是如何设置呢?我从来没用过,于是试验了一番。…

如何看待AI技术的应用前景?

人工智能:引领未来的变革力量 在当今快速变化的科技时代,人工智能(AI)作为一项前沿技术,已然成为推动全球各行各业变革的核心驱动力。随着人工智能技术的不断发展,其广泛的应用前景和深远的影响力&#xf…

Lua环境安装

软考鸭微信小程序 学软考,来软考鸭! 提供软考免费软考讲解视频、题库、软考试题、软考模考、软考查分、软考咨询等服务 Lua是一种轻量级、小巧且易于嵌入应用程序的脚本语言,广泛用于游戏开发、Web开发、自动化脚本等领域。本文将详细介绍如何在不同操作系统上安装L…

深度学习 基本函数01

np.dot 是 NumPy 库中的一个函数,用于计算两个数组的点积(也称为内积或数量积)。点积是两个向量的对应元素乘积之和。 np.random.normal 是 NumPy 库中的一个函数,用于生成符合正态分布(也称为高斯分布)的…

vue3-高德地图天气小组件

效果图 使用方法 <weather-view type"rect-solid" :borderColor"[#7ACAEC, #068BBD]"></weather-view>天气图标文件夹 本来想全弄成svg动态图片的,但找了很久都没找到对应的图(只找到了几个),于是就暂时搁置了 组件全代码如下 注意getWeat…

缓存框架JetCache源码解析-缓存定时刷新

作为一个缓存框架&#xff0c;JetCache支持多级缓存&#xff0c;也就是本地缓存和远程缓存&#xff0c;但是不管是使用着两者中的哪一个或者两者都进行使用&#xff0c;缓存的实时性一直都是我们需要考虑的问题&#xff0c;通常我们为了尽可能地保证缓存的实时性&#xff0c;都…

酒吧收银系统解决方案——未来之窗行业应用跨平台架构

一、酒吧管理数字化 1. 提高效率&#xff1a;能够快速处理订单&#xff0c;减少顾客等待时间&#xff0c;提高服务效率&#xff0c;从而提升顾客满意度。 2. 精确计费&#xff1a;准确计算酒水、小吃等各类消费项目的费用&#xff0c;避免人工计算错误导致的经济损失。 3. 库存…

vue后台管理系统从0到1(5)

文章目录 vue后台管理系统从0到1&#xff08;5&#xff09;完善侧边栏修改bug渲染header导航栏 vue后台管理系统从0到1&#xff08;5&#xff09; 接上一期&#xff0c;我们需要完善我们的侧边狼 完善侧边栏 我们在 element 组件中可以看见&#xff0c;这一个侧边栏是符合我们…

windows下Qt的安装方法

Qt Creator是个人非常喜欢的一款开发工具&#xff0c;喜欢用其来开发C和CPC平台项目&#xff0c;当然也可以用其来开发Android和Auto平台项目&#xff0c;但其现在采用离线安装&#xff0c;限于网络问题&#xff0c;安装速度非常慢。 现在介绍一种可以完成快速的安装方法。 下…

群晖通过 Docker 安装 MySQL

1. 打开 Docker 应用&#xff0c;并在注册表搜索 MySQL 2. 下载 MySQL 镜像&#xff0c;并选择版本 3. 在 Docker 文件夹中创建 MySQL&#xff0c;并创建子文件夹 4. 设置权限 5. 选择 MySQL 映像运行&#xff0c;创建容器 6. 配置 MySQL 容器 6.1 使用高权限执行容器 6.2 启…

圆周率的估算

圆周率的估算有多种方案&#xff1a; 方案一&#xff1a;无穷级数4/1 - 4/3 4/5 - 4/7 ……的和是圆周率π&#xff0c;这一无穷级数前n项的和即可估算圆周率值。 方案二&#xff1a;利用求单位正方形与内接圆面积的比例关系来求的π的近似值。单位圆的1/4面积是一个扇形&am…

Java调用大模型 - Spring AI 初体验

Spring AI&#xff1a;为Java开发者提供高效的大模型应用框架 当前Java调用大模型时面临缺乏高效AI应用框架的问题。Spring作为资深的Java应用框架提供商&#xff0c;通过推出Spring AI来解决这一挑战。它借鉴了LangChain的核心理念&#xff0c;并结合了Java面向对象编程的优势…