Kafka第四篇——生产数据总体概括,源码解析分区策略,数据收集器,Sender发送线程,key值

目录

流程图以及总体概述

拦截器

分区器以及分区计算策略

为啥进行分区计算?

producer生产者怎么知道有哪些分区?

分区计算

如何自定义实现分区器?

想说的在图里啦!宝宝!💡 ​编辑

如果key值忘记传递了呢!?

数据校验

数据收集器

注意

Sender发送线程


流程图以及总体概述

producer进行发送record,record对象包含topic,key,value,partition,时间戳,通过拦截器,将数据信息发送给broker,但是咱们也不知道把数据信息发送给哪个broker,而我们的Metadata就可以获取出来这个,如下面代码就是获取到9092.获取到缓存,放在底层。然后经过key对象的序列化,value对象的序列化,对应在代码中就是,configMap.put()那两行,并且这个是必须写的。然后经过分区器,partition,每个数据需要发送到broker中,每个消息发送到特定的主题,主题分为多个分区。kafka在发送数据时候,可以将数据发送到指定主题的指定分区,kafka会自动决定将消息发送到那个分区。分区器有那种判断发送给那个broker。然后进行数据校验。在数据收集器当中,相当于一个缓冲池,将同一个主题的数据可以存放在一个队列中,按“批”为单位进行发送,提高效率,并且指定了每批的大小是16K,

数据已经缓存到数据收集器后,就可以进行发送数据喽!此时就不会按topic为单位进行发送了,就可以重新整合,以节点为主!(why??因为不同的topic可以发送给同一个节点呀傻瓜!也就是说,在缓冲区以topic为单位,在发送线程中以节点为单位)封装请求,然后放在缓冲区中。再由网络通信从缓冲区中取出,发送给socket。在缓冲区,需要注意概念,在途请求缓冲区为5,表示同一个节点同一时间处理的请求数量。

拦截器

数据的规范化处理。可以有多个,可以按顺序执行数据的被拦截。和框架那块的一样。

onsend方法就是主要进行执行拦截规则的,for(ProducerInterceptor<K,V> interceptor:this.intercept)就可以循环执行多个拦截器,并且,看try,catch内容,无论当前拦截器发生什么异常,都不会影响到下一个拦截器的执行,更不会影响整个数据的发送。

自定义实现拦截器,帮助自己更好地了解拦截器。

java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class ValueInterceptorTest implements ProducerInterceptor<String, String> {/*** 实现拦截器规则**/@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {}/*** 当记录被Broker确认接收时调用** */@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 这个方法在记录被Broker确认接收时被调用// 根据确认情况实现自定义的处理逻辑}/*** 关闭拦截器时调用*/@Overridepublic void close() {}/*** 配置拦截器时调用**configs 配置信息*/@Overridepublic void configure(Map<String, ?> configs) {}
}

分区器以及分区计算策略

为啥进行分区计算?

 数据发送给某个主题,主题会有很多分区,会在不同的broker当中,所以要算分区编号,不然连数据要发送给主题哪个节点都不知道。但是分区标号也得有范围呀!

producer生产者怎么知道有哪些分区?

从元数据缓存中获取到producer需要的主题相关信息

意味着只要元数据信息缓存了,主题的相关信息我们就可以拿到。

 分区器通过Matadata获取到分区,副本id,leadid之类的,

分区计算

¹²³⁴ 如果参数中指定了分区编号就直接返回

如何自定义实现分区器?

1.实现partitioner接口, 重写相关方法。感觉主要就是实现partition方法。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;public class CustomPartitioner implements Partitioner {/*** 配置分区器** @param configs 配置信息*/@Overridepublic void configure(Map<String, ?> configs) {}/*** 计算分区** @param topic       主题名称* @param key         消息键,可以为null* @param keyBytes    消息键的字节数组表示,可以为null* @param value       消息值* @param valueBytes  消息值的字节数组表示* @param cluster     Kafka集群信息* @return 分配的分区ID*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 如果键为null,则使用轮询分区策略if (keyBytes == null) {return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;}// 使用键的hashCode来计算分区return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}/*** 关闭分区器*/@Overridepublic void close() {// 可以在这里进行资源的清理操作,通常分区器不需要进行额外的关闭操作}
}

想说的在图里啦!宝宝!💡 

嘿嘿,这里解决了之前的问题,key并不像之前学到的hashmap中消费者用来消费的key,它的核心作用就是用来进行分区计算

这个点就可以从:没有指定特定的分区标号,并且分区标号没有超过范围!序列化key以及分区器不忽略key的情况下看出来。partitionForKey()方法中就用不加密的hash算法并且对分区数量进行取余处理计算。

如果key值忘记传递了呢!?

return RecordMetadata.UNKNOWN_PARTITION;(这是一个表示未知分区的常量)。表明当前生产者无法确定消息发送到哪个分区,可能需要进一步处理或记录错误信息。那感觉也不太对啊,不知道把key发送到哪一个分区!

其实他是在数据收集器那一步追加了,看这个accumulator.append方法!

点进去哦!分区标号计算:粘性分区策略

如果没有进行传递key参数,也就是当前分区是未知分区,就会根据当前主题的分区负载情况来动态获取分区标号。这就是一种优化后的粘性分区策略!如图1.1

🤔图1.1  当前分区是未知分区,就会根据当前主题的分区负载因子来动态获取分区标号。

会根据当前分区负载情况判断去那个分区!如图1.2

✅当分区负载情况为空,就动态去随机选择分区,然后就尽可能的给这个分区追加数据(粘性分区策略),并且也不能超过数值batch.size=16K。如果超过这个阈值就会切换到下一个分区。并且更新分区负载情况。

✅当前主题分区负载情况不为空,那就不用随机生成了。会根据分区负载使用频率随机生成一个随机权重,然后利用二分查找算法找与权重相近的值,根据这个值获取到相应的分区,就可以得到我们的分区标号啦!

图1.2

数据校验

当数据校验成功,数据就到达了数据收集器当中。数据收集器,生产的数据作为一个临时的存储。

数据收集器

 

如果直接生产一条数据就通过网络通信来发送,这样做效率很低哦!像javaio流读取文件一样,读一个字节写一个字节,性能很低呀!

所以就有了ProducerBatch双端队列,从很减少频繁的网络交互,提高传输效率!

在神魔时候真正进行网络交互呢??

嘿嘿,看最大范围,batch.size=16K。在前面分区计算中,有一个粘性分区策略(一旦确定了一个分区,就尽可能往这个分区中追加数据,追加数据就是往producebatch中追加数据,当到达16K,就会被sender检测到),里面就有“没有传递key,如果没有分区负载情况,就会随机生成分区,不能超过最大

注意

🤔而且这里的16k意思是超过16k就不再接收数据了,不意味着数据不能超过16k!比如数据是20k,kafka要保证数据的完整性,发现这个数据值大于16k,就立马关闭,不再接收!

Sender发送线程

 kafka底层就采用了很多生产者消费者模型,一个放一个取。数据收集器是按照主题分区来放数据,而Sender发送线程会按照broker重新整合。(主题的不同分区会放在不同的节点当中,所以有可能存在不同主题的分区在同一个节点当中)。

当整合好之后,就会封装成produceRequest,进而发送给网络客户端。

默认发送时间0,也就是消息取过来就可以直接发送了!

注意这个在途请求缓冲区数量:5

  • Broker 和 Topic:每个 broker 可以存储一个或多个 topic 的数据分片,Kafka 集群的每个 broker 都可以服务于多个 topic
  • Topic 和 分区:每个 topic 可以被分为多个分区,分区内的消息顺序是有序的,而不同分区之间的消息顺序则不保证,分区允许 Kafka 横向扩展和提高并行处理能力。
  • Broker 和 分区:每个 broker 可能会存储多个 topic 的多个分区数据,这样在整个 Kafka 集群中就形成了数据的分布式存储和处理能力。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/374655.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python+selenium-UI自动框架之[优化]元素查找和BasePage页面

痛点&#xff1a;在页面查找元素的时候会遇到找不到或者其他无法处理某个字段的情况&#xff0c;又或者想要在输出的log或者report里面显示这个字段名称&#xff0c;这时候加上字段名称就很重要&#xff01; [3]pythonselenium - UI自动框架之封装查找元素https://mp.csdn.net…

电脑的D盘E盘F盘突然消失了 电脑只剩下C盘了其他盘怎么恢复

现如今随着时代的发展&#xff0c;无纸化办公成为主流&#xff0c;这主要归功于电脑&#xff0c;能够通过电脑完成的工作绝不使用纸质文件&#xff0c;这不仅提高了工作效率&#xff0c;也让一些繁杂的工作变的更加简单。不过电脑毕竟是电子产品&#xff0c;不可避免的会出现一…

通信协议_Modbus协议简介

概念介绍 Modbus协议&#xff1a;一种串行通信协议&#xff0c;是Modicon公司&#xff08;现在的施耐德电气Schneider Electric&#xff09;于1979年为使用可编程逻辑控制器&#xff08;PLC&#xff09;通信而发表。Modbus已经成为工业领域通信协议的业界标准&#xff08;De f…

Navicat导入sql文件

文章目录 Navicat导入SQL文件&#xff0c;使用默认导入&#xff0c;不做任何修改报错尝试一修改运行时的选择 尝试二修改my.ini的配置文件 Navicat导入SQL文件&#xff0c;使用默认导入&#xff0c;不做任何修改报错 尝试一 修改运行时的选择 取消勾选 ‘每个运行中运行多重查…

一键掌握天气动态 - 基于Vue和高德API的实时天气查询

前言 本文将学习如何使用Vue.js快速搭建天气预报界面,了解如何调用高德地图API获取所需的天气数据,并掌握如何将两者有机结合,实现一个功能丰富、体验出色的天气预报应用 无论您是前端新手还是有一定经验,相信这篇教程都能为您带来收获。让我们一起开始这段精彩的Vue.js 高德…

Mac的系统数据怎么删除 cleanmymac会乱删东西吗 cleanmymac有用吗

作为一款专业级的苹果电脑清理软件&#xff0c;CleanMyMac可以精准识别系统垃圾&#xff0c;有效防止Mac系统数据被误删。软件可以深入系统底层&#xff0c;清理无用的系统数据&#xff0c;优化苹果电脑设置&#xff0c;提升Mac系统性能。有关Mac的系统数据可以删吗&#xff0c…

拥抱 AGI:PieDataCS 引领云原生数据计算系统新范式

自2023年后&#xff0c;人工智能技术进入了一个更为成熟和广泛应用的阶段&#xff0c;人工通用智能&#xff08;AGI&#xff09;这一概念也成为了科技界和产业界热议的焦点。本文将结合 AGI 时代背景&#xff0c;从架构设计到落地实践&#xff0c;详细介绍拓数派云原生数据计算…

HTAP 数据库在国有大行反洗钱场景的应用

导读 在金融领域&#xff0c;随着数字化服务的深入和监管要求的提高&#xff0c;反洗钱工作变得尤为关键。洗钱活动不仅威胁金融安全&#xff0c;也对社会秩序构成挑战。本文深入探讨了国产 HTAP 分布式数据库 TiDB 在某国有大行反洗钱系统中的应用实践。 依托 TiDB 构建的新…

springboot大学校园二手书交易APP-计算机毕业设计源码25753

摘 要 在数字化与移动互联网迅猛发展的今天&#xff0c;人们对于图书的需求与消费方式也在悄然改变。为了满足广大读者对图书的热爱与追求&#xff0c;我们倾力打造了一款基于Android平台的图书交易APP。这款APP不仅汇聚了海量的图书资源&#xff0c;提供了便捷的交易平台&…

usbserver工程师手记(三)手工开通 OTP功能

1、设定密钥&#xff0c;用户自行选择一个密钥&#xff0c;以下以密钥为 EAZAYOKNGETBOPC5 为例说明 2、usb server 配置otp 密钥&#xff0c;目前还没有UI 界面开通&#xff0c;后续版本会支持从管理界面开通 curl -X POST -H Content-Type: application/json -H Accept: app…

【深度学习入门篇 ②】Pytorch完成线性回归!

&#x1f34a;嗨&#xff0c;大家好&#xff0c;我是小森( &#xfe61;ˆoˆ&#xfe61; )&#xff01; 易编橙终身成长社群创始团队嘉宾&#xff0c;橙似锦计划领衔成员、阿里云专家博主、腾讯云内容共创官、CSDN人工智能领域优质创作者 。 易编橙&#xff1a;一个帮助编程小…

数据结构复习计划之复杂度分析(时间、空间)

第二节&#xff1a;算法 时间复杂度和空间复杂度 算法(Algorithm)&#xff1a;是对特定问题求解方法(步骤)的一种描述&#xff0c;是指令的有限序列&#xff0c;其中每一条指令表示一个或多个操作。 算法可以有三种表示形式&#xff1a; 伪代码 自然语言 流程图 算法的五…

FFmpeg 实现从麦克风获取流并通过RTMP推流

使用FFmpeg库&#xff08;版本号为&#xff1a;4.4.2-0ubuntu0.22.04.1&#xff09;实现从麦克风获取流并通过RTMP推流。 RTMP服务器使用的是SRS&#xff0c;我这边是跑在Ubuntu上的&#xff0c;最好是关闭掉系统防火墙&#xff0c;不然连接服务器好像会出问题&#xff0c;拉流…

SpringBoot开发实用篇(三)

一&#xff1a;任务 1&#xff1a;SpringBoot整合Quartz 导入SpringBoot整合quartz的坐标定义具体要执行的任务&#xff0c;继承QuartzJobBean定义工作明细和触发器&#xff0c;并绑定对应关系 2&#xff1a;SpringBoot整合task 开启定时任务功能设置定时执行的任务&#x…

怎么样的主食冻干算好冻干?品质卓越、安全可靠的主食冻干分享

当前主食冻干市场产品质量参差不齐。一些品牌过于追求营养数据的堆砌和利润的增长&#xff0c;却忽视了猫咪健康饮食的基本原则&#xff0c;导致市场上出现了以肉粉冒充鲜肉、修改产品日期等不诚信行为。更令人担忧的是&#xff0c;部分产品未经过严格的第三方质量检测便上市销…

记录文字视差背景学习

效果图 文字背景会随鼠标上下移动变成红色或透明 html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><titl…

【linux】服务器卸载cuda

【linux】服务器卸载cuda 文章目录 【linux】服务器卸载cuda1、查找已安装的 CUDA 包&#xff1a;2、卸载 CUDA&#xff1a;3、删除残留文件4、更新系统的包索引&#xff1a;5、检查是否卸载干净&#xff1a; 1、查找已安装的 CUDA 包&#xff1a; dpkg -l | grep cuda2、卸载…

CSS3实现彩色变形爱心动画【附源码】

随着前端技术的发展&#xff0c;CSS3 为我们提供了丰富的动画效果&#xff0c;使得网页设计更加生动和有趣。今天&#xff0c;我们将探讨如何使用 CSS3 实现一个彩色变形爱心加载动画特效。这种动画不仅美观&#xff0c;而且可以应用于各种网页元素&#xff0c;比如加载指示器或…

基于深度学习LightWeight的人体姿态之行为识别系统源码

一. LightWeight概述 light weight openpose是openpose的简化版本&#xff0c;使用了openpose的大体流程。 Light weight openpose和openpose的区别是&#xff1a; a 前者使用的是Mobilenet V1&#xff08;到conv5_5&#xff09;&#xff0c;后者使用的是Vgg19&#xff08;前10…

Django QuerySet对象,exclude()方法

模型参考上一章内容&#xff1a; Django QuerySet对象&#xff0c;filter()方法-CSDN博客 exclude()方法&#xff0c;用于排除符合条件的数据。 1&#xff0c;添加视图函数 Test/app11/views.py from django.shortcuts import render from .models import Postdef index(re…