大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下内容:

  • 消费组测试,消费者变动对消费的影响
  • 消费者的心跳机制
  • 消费者的相关配置参数

在这里插入图片描述

主题和分区

  • Topic:Kafka用于分类管理消息的逻辑单元,类似于MySQL的数据库
  • Partition:是Kafka下数据存储的基本单元,这个是物理上的概念,同一个Topic的数据,会被分散的存储到多个Partition中,这些Partition可以在同一台机器上,也可以在多台机器上。优势在于可以进行水平扩展,通常Partition的数量是BrokerServer数量的整数倍
  • ConsumerGroup,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部消息。在消息组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。
  • Consumer 采用 PULL 模式从 Broker 中读取数据,采用PULL模式 Consumer可以自行控制消费的速度。
    在这里插入图片描述

反序列化

  • Kafka的Broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交由给用户程序消费。
  • 消费者的反序列化器包括Key和Value。

自定义反序列化

如果要实现自定义的反序列化器,需要实现 Deserializer 接口:

public class UserDeserializer implements Deserializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {Deserializer.super.configure(configs, isKey);}@Overridepublic User deserialize(String topic, byte[] data) {ByteBuffer buffer = ByteBuffer.allocate(data.length);buffer.put(data);buffer.flip();int userId = buffer.getInt();int usernameLen = buffer.getInt();String username = new String(data, 8, usernameLen);int passwordLen = buffer.getInt();String password = new String(data, 8 + usernameLen, passwordLen);int age = buffer.getInt();User user = new User();user.setUserId(userId);user.setUsername(username);user.setPassword(password);user.setAge(age);return user;}@Overridepublic User deserialize(String topic, Headers headers, byte[] data) {return Deserializer.super.deserialize(topic, headers, data);}@Overridepublic void close() {Deserializer.super.close();}
}

消费者拦截器

消费者在拉取了分区消息之后,要首先经过反序列化器对Key和Value进行反序列化操作。
消费端定义消息拦截器,要实现 ConsumerInterceptor接口:

  • 一个可插拔的接口,允许拦截、更改消费者接收到的消息,首要的用例在于将第三方组件引入消费者应用程序,用于定制监控、日志处理等
  • 该接口的实现类通过configure方法获取消费者配置的属性,如果消费者配置中没有指定ClientID,还可以获取KafkaConsumer生成的ClientID,获取这个配置跟其他拦截器是共享的,需要保证不会在各个拦截器之间产生冲突。
  • ConsumerInterceptor方法抛出异常会被捕获,但不会向下传播,如果配置了错误的参数类型,消费者不会抛出异常而是记录下来。
  • ConsumerInterceptor回调发生在KafkaConsumer.poll()方法的同一个线程
public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {System.out.println("=== 消费者拦截器 01 onConsume ===");return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {System.out.println("=== 消费者拦截器 01 onCommit ===");}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {System.out.println("消费者设置的参数");configs.forEach((k, v) -> {System.out.println(k + ", " + v);});}
}

位移提交

相关概念

  • Consumer 需要向Kafka记录自己的位移数据,这个汇报过程称为:提交位移(Committing Offsets)
  • Consumer 需要为分配给它的每个分区提交各自的位移数据
  • 位移提交的由Consumer端负责的,Kafka只负责保管,存到 __consumer_offsets 中
  • 位移提交:自动提交和手动提交
  • 位移提交:同步提交和异步提交

自动提交

Kafka Consumer后台提交

  • 开启自动提交 enable.auto.commit=true
  • 配置启动提交间隔:auto.commit.interval.ms,默认是5秒

位移顺序

自动提交位移的顺序:

  • 配置 enable.auto.commit=true
  • Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息的
  • 因此自动提交不会出现消息丢失,但是会重复消费

重复消费

重复消费的场景:

  • Consumer设置5秒提交offset
  • 假设提交offset后3秒发生了Rebalance
  • Rebalance之后所有的Consumer从上一次提交的Offset的地方继续消费
  • 因为Rebalance发生前3秒的内的提交就丢失了

异步提交

  • 使用 KafkaConsumer#commitSync,会提交所有poll返回的最新Offset
  • 该方法为同步操作 等待直到 offset 被成功提交才返回
  • 手动同步提交可以控制offset提交的时机和频率

位移管理

Kafka中,消费者根据消息的位移顺序消费消息,消费者的位移由消费者者管理,Kafka提供了消费者的API,让消费者自行管理位移。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

重平衡

重平衡可以说是Kafka中诟病最厉害的一部分。
重平衡是一个协议,它规定了如何让消费者组下的所有消费者来分配Topic中每一个分区。
比如一个Topic中有100个分区,一个消费组内有20个消费者,在协调者的控制下可以让每一个消费者能分配到5个分区,这个分配过程就是重平衡。

重平衡的出发条件主要有三个:

  • 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
  • 主题的分区数发生变化,Kafka目前只能增加分区数,当增加的时候就会触发重平衡
  • 订阅的主题发生变化,当消费组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会重平衡

为什么说重平衡让人诟病呢?因为重平衡过程中,消费者无法从Kafka消费消息,对Kafka的TPS影响极大,而如果Kafka集群内节点较多,比如数百个,重平衡耗时会很久。

避免重平衡

要完全避免重平衡做不到,但是要尽量避免重平衡。
在分布式系统中,由于网络问题没有接收到心跳,此时不确认是挂了还是负载没过来还是网络阻塞。

  • session.timeout.ms 规定超时时间是多久
  • heartbeat.interval.ms 规定心跳的频率 越高越不容易误判 但是会消耗更多资源
  • max.poll.interval.ms 消费者poll数据后,需要处理在进行拉取,如果两次拉取时间超过间隔就会被剔除,默认是5分钟。

这里给出一些推荐参数的配置:

  • session.timeout.ms 设置为6秒
  • heaertbeat.interval.ms 设置2秒
  • max.poll.interval.ms 推荐消费者处理消息最长耗时再加1分钟

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/390108.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python酷库之旅-第三方库Pandas(062)

目录 一、用法精讲 241、pandas.Series.view方法 241-1、语法 241-2、参数 241-3、功能 241-4、返回值 241-5、说明 241-6、用法 241-6-1、数据准备 241-6-2、代码示例 241-6-3、结果输出 242、pandas.Series.compare方法 242-1、语法 242-2、参数 242-3、功能 …

最新小猫咪PHP加密系统源码V1.4_本地API接口_带后台

简介&#xff1a; 最新小猫咪PHP加密系统源码V1.4_完全本地化加密API接口_带后台 小猫咪PHP加密系统历时半年&#xff0c;它再一次迎来更新&#xff0c;更新加密算法&#xff08;这应该是最后一次更新加密算法了&#xff0c;以后主要更新都在框架功能上面了&#xff09;&…

Python 爬虫项目实战(一):破解网易云 VIP 免费下载付费歌曲

前言 网络爬虫&#xff08;Web Crawler&#xff09;&#xff0c;也称为网页蜘蛛&#xff08;Web Spider&#xff09;或网页机器人&#xff08;Web Bot&#xff09;&#xff0c;是一种按照既定规则自动浏览网络并提取信息的程序。爬虫的主要用途包括数据采集、网络索引、内容抓…

代码随想录27天|贪心

455.分发饼干 代码随想录 第一想法 将孩子胃口值g[i] 按从小到达的顺序排列&#xff0c;饼干尺寸也按照从小到大的顺序去排列。 优先将大尺寸喂给大胃口孩子。如果满足不了胃口那么久试着分给下一个孩子。 要尽量满足更多的孩子&#xff0c;那么大尺寸的饼干就不能喂给小胃口…

【多线程】线程状态与并发三大特性的细节剖析

这篇文章主要用于对于多线程的一些查缺补漏。 一、 线程的状态 1&#xff0c;操作系统层面&#xff0c;线程的5种状态 关于线程有几种状态&#xff0c;有多种说法&#xff0c;5、6、7都有。 首先对于操作系统来说&#xff0c;只有5种状态&#xff0c;状态如下新建&#xff…

浅谈KMP算法(c++)

目录 前缀函数应用【模板】KMP题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示样例 1 解释数据规模与约定 思路AC代码 本质不同子串数 例题讲解[NOI2014] 动物园题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示思路AC代码 [POI2006] OKR-Periods of …

智慧水务项目(一)django(drf)+angular 18 通过pycharm建立项目

一、环境准备 windows 10 pycharm python3.11 二、pycharm 创建django项目 三、建立requirements.txt 在根目录创建requirements.txt,也就是与manage.py同一目录下&#xff0c;先放下面几个依赖 Django djangorestframeworkpip install -r .\requirements.txt 更新下pip python…

ShardingSphere实战(1)- 分库分表基础知识

一、为什么要分库分表 分库分表是一种数据库优化策略&#xff0c;主要用于解决大型应用或高并发场景下数据库性能瓶颈的问题。具体来说&#xff0c;分库分表可以带来以下好处&#xff1a; 提高性能&#xff1a; 减少单个数据库实例的负载&#xff0c;避免单点性能瓶颈。当数据…

【香橙派系列教程】(五)Linux的热拔插UDEV机制

【五】Linux的热拔插UDEV机制 在上一篇中我们发现&#xff0c;当手机接入开发板时&#xff0c;系统并不认识&#xff0c;当我们在/etc/udev目录下创建一个规则后&#xff0c;就可以通过adb访问到手机了&#xff0c;这里到底是怎么回事&#xff1f; 文章目录 【五】Linux的热拔插…

武汉流星汇聚:亚马逊平台消费者众多,助力中国卖家销售额大幅增长

在全球电商的浩瀚星空中&#xff0c;亚马逊凭借其庞大的消费者规模和强大的市场影响力&#xff0c;为无数商家特别是中国卖家提供了前所未有的发展机遇。近年来&#xff0c;越来越多的中国卖家选择通过亚马逊平台&#xff0c;将优质产品直接送达全球消费者的手中&#xff0c;并…

精选3款国内wordpress 主题,建站首选

WordPress作为一款功能强大且易于使用的建站平台&#xff0c;已经成为了许多企业和个人搭建网站的首选。为了帮助大家更好地选择适合自己的WordPress主题&#xff0c;小编将为大家推荐三款国内优秀的WordPress主题&#xff1a;子比主题、OneNav主题和RiTheme主题。 1.子比主题…

JavaScript ES6语法详解(下)

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;我是码喽的自我修养&#xff01;今天给大家分享JavaScript ES6语法详解(下)&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到带大家&#xff0c;欢迎收藏关注…

Qt Creator 与 ESP-IDF QEMU 模拟器使用指南

标题: Qt Creator 与 ESP-IDF QEMU 模拟器使用指南 概要: 本文为开发者提供了使用 Qt Creator 和 ESP-IDF QEMU 模拟器进行 ESP32 开发的详细指南&#xff0c;包括环境准备、项目创建和编译、模拟器设置、编程和调试等方面的内容。通过本指南&#xff0c;可以快速上手 Qt Crea…

Learning vtkjs之Calculator

过滤器 公式计算器 Calculator 介绍 The Calculator filter is a fast way to add derived data arrays to a dataset. These arrays can be defined over points, cells, or just field data that is “uniform” across the dataset (i.e., constant over all of space). Va…

手把手教你实现日期类

目录 前言 1.头文件的实现 2.日期类函数各项功能实现 2.1 初始化和打印&#xff08;比较简单&#xff09; 2.2日期大小判断 2.3日期的加减运算 3.日期类的输入输出 4.测试代码参考 结束语 前言 前面我们讲解了类的对象的大部分知识&#xff0c;例如拷贝构造&#xff0c…

优化数据处理效率,解读 EasyMR 大数据组件升级

EasyMR 作为袋鼠云基于云原生技术和 Hadoop、Hive、Spark、Flink、Hbase、Presto 等开源大数据组件构建的弹性计算引擎。此前&#xff0c;我们已就其展开了多方位、多角度的详尽介绍。而此次&#xff0c;我们成功接入了大数据组件的升级和回滚功能&#xff0c;能够借助 EasyMR …

乐乐音乐Kotlin版

简介 乐乐音乐Kotlin版&#xff0c;主要是基于ExoPlayer框架开发的Android音乐播放器&#xff0c;它支持lrc歌词和动感歌词(ksc歌词、krc歌词、trc歌词、zrce歌词和hrc歌词等)、多种格式歌词转换器及制作动感歌词、翻译歌词和音译歌词。 编译环境 Android Studio Jellyfish | …

canvas-视频绘制

通过Canvas元素来实时绘制一个视频帧&#xff0c;并在视频帧上叠加一个图片的功能可以当作水印。 获取Canvas元素&#xff1a; let canvas document.getElementById(canvas) 通过getElementById函数获取页面中ID为canvas的Canvas元素&#xff0c;并将其存储在变量canvas中。 …

【C++】C++11(可变参数模板、lambda表达式、包装器)

文章目录 1. 可变参数模板1.1 介绍1.2 emplace系列接口实现 2. lambda表达式2.1 语法介绍2.2 原理 3. 包装器4. bind 1. 可变参数模板 1.1 介绍 可变参数我们在C语言阶段已经了解过了&#xff0c;C语言中叫做可变参数列表&#xff0c;其中使用 ... 代表可变参数。 C语言中的可…

【给嵌入式新人的几条建议(共勉):三-C语言基础怎么补?】

给嵌入式新人的几条建议&#xff08;共勉&#xff09;&#xff1a;三-C语言基础怎么补&#xff1f; 前言1、先回答一个问题&#xff0c;对C语言的害怕到底在哪&#xff1f;&#xff08;纠正认知&#xff09;2、C语言基础&#xff0c;要补全部吗&#xff1f;No2.1 先看下自己属于…