【Kafka】Kafka源码解析之producer过程解读

从本篇开始 打算用三篇文章 分别介绍下Producer生产消费,Consumer消费消息 以及Spring是如何集成Kafka 三部分,致于对于Broker的源码解析,因为是scala语言写的,暂时不打算进行学习分享。

总体介绍

在这里插入图片描述

  • clients : 保存的是Kafka客户端代码,主要就是生产者和消费者代码
  • config:保存Kafka的配置文件,其中比较重要的配置文件是server.properties。
  • connect目录:保存Connect组件的源代码。我在开篇词里提到过,Kafka Connect组件是用来实现Kafka与外部系统之间的实时数据传输的。
  • core目录:保存Broker端代码。Kafka服务器端代码全部保存在该目录下。

而一条消息的整体流转过程其实就是经过三部分,也就是Producer\Broker\Consumer。
因为是对主要核心流程的分析,所以只会截核心代码。具体后面细节,在说。
在这里插入图片描述

producer整体流程

对于Producer来说,其实就是几部分。

  • 初始化、发送流程、缓冲区

初始化流程

设置分区器

// 设置分区器this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,Partitioner.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));

设置重试时间,默认100ms,如果配置Kafka可以重试,retries制定重试次数,retryBackoffMs指定重试的间隔

 long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

获取Key和Value的序列化器

      // 序列化器if (keySerializer == null) {this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,Serializer.class);this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);} else {config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);this.keySerializer = keySerializer;}if (valueSerializer == null) {this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Serializer.class);this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);} else {config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);this.valueSerializer = valueSerializer;}

拦截器

    // 设置拦截器List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptor.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));if (interceptors != null)this.interceptors = interceptors;elsethis.interceptors = new ProducerInterceptors<>(interceptorList);

其他参数

	   // 设置最大消息为多大,默认是1M 默认是16384this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);// 设置缓存大小 默认是32M 默认是33554432 RecordAccumulator=32MBthis.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);// 设置压缩类型 可以提升性能this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));this.accumulator = new RecordAccumulator(logContext,
       // 因为是通过缓冲区发送消息的,所以需要消息累计器RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(enableAdaptivePartitioning,config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG));

初始化元数据

 // 初始化集群元数据if (metadata != null) {this.metadata = metadata;} else {this.metadata = new ProducerMetadata(retryBackoffMs,config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),logContext,clusterResourceListeners,Time.SYSTEM);this.metadata.bootstrap(addresses);}

创建Sender线程,其中包含一个重要的网络组件NetWorkClient

	// 创建sender线程this.sender = newSender(logContext, kafkaClient, this.metadata);// 线程nameString ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;// 封装起来 设置为守护线程 并启动this.ioThread = new KafkaThread(ioThreadName, this.sender, true);// 线程启动this.ioThread.start();

发送消息流程

发送消息的过程

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// 执行拦截器逻辑ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}

先执行拦截器,可以发现就是遍历拦截器,然后执行对应的onSend()方法。当我们想增加一个拦截器,直接实现对应的接口,重写onSend()方法,然后Kafka就会调用我们的onSend方法。通过提供一个拓展点进行使用。

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {ProducerRecord<K, V> interceptRecord = record;for (ProducerInterceptor<K, V> interceptor : this.interceptors) {try {interceptRecord = interceptor.onSend(interceptRecord);} catch (Exception e) {}}return interceptRecord;}

从Kafka Broker集群获取元数据metadata

// 从broker获取元数据clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);

对key和value进行序列化,调用对应的serialize的方法。

	  byte[] serializedKey;try {// 选择对应的序列化进行操作serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {}
 	// 选择具体的分区int partition = partition(record, serializedKey, serializedValue, cluster);// 消息缓存到RecoredAccumulatorresult = accumulator.append(record.topic(), partition, timestamp, serializedKey,serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);// 消息发送的条件// 缓冲区数据大小达到batch.size 或者linnger.ms达到上限后 唤醒sneder线程。if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());this.sender.wakeup();}

Sender线程

	runOnce();long pollTimeout = sendProducerData(currentTimeMs);

在这里插入图片描述

在这里插入图片描述
缓冲区、

在这里插入图片描述

这篇讲解很详细 https://www.cnblogs.com/rwxwsblog/p/14754810.html

生产者核心参数配置

bootstrap.servers:连接Broker配置,一般就是xxxx:9092

key.serializer 和 value.serializer:对key和value进行序列化器,可以自定义,一般就是String方式

buffer.memory:RecordAccumulator 缓冲区总大小,默认32m。

batch.size: 消息会以batch的方式进行发送,这是一批数据的大小 默认是16K

linger.ms:发送消息的时机,如果没有达到batch.size or linger.ms的时间就会发送 默认是0ms 立即发送

acks: 0: 不落盘 1:只有leader落盘 -1(all) : leader和所有从节点持久化成功 默认是-1

max.in.flight.requests.per.connection:允许最多没有返回 ack 的次数,默认为 5

retries: 消息发送失败时,系统重发消息 默认值 2147483647

retry.backoff.ms:两次重试间隔 默认是100ms

enable.idempotence: 开启幂等性 默认true

compression.type: 压缩格式 默认是none

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/450531.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为携手竹云发布海外一网通办解决方案,助力海外政务数智化发展

10月14日&#xff0c;第44届GITEX GLOBAL展会&#xff08;GITEX GLOBAL 2024&#xff09;在迪拜世界贸易中心盛大开幕。作为全球最具影响力的科技和创业盛会之一&#xff0c;本届活动吸引180多个国家的6500余家全球知名企业集聚迪拜&#xff0c;展示涵盖人工智能、网络安全、移…

【Linux】解答:为什么创建目录文件,硬链接数是2;创建普通文件时,硬链接数是1?(超详细图文)

前言 大家好吖&#xff0c;欢迎来到 YY 滴Linux系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的《Lin…

spring boot热部署

使用热部署解决了每次都需要重新启动的问题&#xff0c;但不过热部署的在对于改动比较小时速度可能快一些&#xff0c;改动大的话尽量停止 1.使用热部署之前需要在pom.xml文件中导入依赖 <dependency><groupId>org.springframework.boot</groupId><artifa…

DS链式二叉树的遍历(11)

文章目录 前言一、链式二叉树的结构结构定义手动搭建 二、二叉树的遍历三种常见遍历(前序、中序、后序)层序遍历 总结 前言 堆是特殊的二叉树&#xff0c;可二叉树本身也很值得研究~   正文开始&#xff01; 一、链式二叉树的结构 前文也提到了二叉树一共有两种&#xff0c;空…

人工智能创造出大量新型蛋白质

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

【线性回归分析】:基于实验数据的模型构建与可视化

目录 线性回归分析&#xff1a;基于实验数据的模型构建与可视化 1. 数据准备 2. 构建线性回归模型 3. 可视化 数据分析的核心 构建预测模型 应用场景 预测模型中的挑战 结论 线性回归分析&#xff1a;基于实验数据的模型构建与可视化 在数据分析领域&#xff0c;线性…

《拿下奇怪的前端报错》:1比特丢失导致的音视频播放时长无限增长-浅析http分片传输核心和一个坑点

问题背景 在一个使用MongoDB GridFS实现文件存储和分片读取的项目中&#xff0c;同事遇到了一个令人困惑的问题&#xff1a;音频文件总是丢失最后几秒&#xff0c;视频文件也出现类似情况。更奇怪的是&#xff0c;播放器显示的总时长为无限大。这个问题困扰了团队成员几天&…

wps安装教程

WPS office完整版是一款由金山推出的免费办公软件&#xff0c;软件小巧安装快&#xff0c;占用内存极小&#xff0c;启动速度快。WPS office完整版包含WPS文字、WPS表格、WPS演示三大功能模块&#xff0c;让我们轻松办公。WPS的功能是依据OFFICE用户的使用习惯而设计&#xff0…

Java5.--继承-重写-多态

笔记暂未整理&#xff1a; 一、面向对象的第二大特征&#xff1a;继承 1.分类&#xff1a;业务封装 功能封装 2.作用 封装-->属性的安全&#xff01; 继承-->重用----重用代码&#xff08;属性方法&#xff09; 多态-->扩展 3.实现继承的步骤 ①从多个相似的类中…

OpenShift 4 - 云原生备份容灾 - Velero 和 OADP 基础篇

《OpenShift 4.x HOL教程汇总》 说明&#xff1a; 本文主要说明能够云原生备份容灾的开源项目 Velero 及其红帽扩展项目 OADP 的概念和架构篇。操作篇见《OpenShift 4 - 使用 OADP 对容器应用进行备份和恢复&#xff08;附视频&#xff09; 》 Velero 和 OADP 包含的功能和模…

精品!“缠论分笔预测”,缠论分笔波段空间预测指标!

精品&#xff01;“缠论分笔预测”&#xff0c;缠论分笔波段空间预测指标&#xff01; 使用技巧该指标属于缠论相关指标&#xff0c;可结合缠论使用。使用缠论分笔方法来确定波段的高低点&#xff0c;相比使用“ZIG”算法&#xff0c;似乎更为准确。它能有效减少某些股票高点和…

大模型生图安全疫苗注入赛题解析(DataWhale组队学习)

引言 大家好&#xff0c;我是GISer Liu&#x1f601;&#xff0c;一名热爱AI技术的GIS开发者。本系列文章是我跟随DataWhale 2024年10月实践赛的大模型生图安全疫苗注入赛道&#xff1b;本文主要整理本次赛事的基本流程和优化方法。&#x1f495;&#x1f495;&#x1f60a; 一…

Unity 山水树木

本章节内容 1. Unity对3D游戏物体的简单操作&#xff1b; 2. 构建山水树木的场景 1. Unity 简易操作 1.1 新建3D游戏场景 1. 打开Unity Hub&#xff0c;点击 New Project &#xff08;新建项目&#xff09;按键&#xff0c;选择第二项 3D(Built-In Render Pipeline)&#xf…

harmonyOS next之实现时间打卡定时器

需求&#xff1a;实现一个时间打卡签到按钮。 实现方法&#xff1a;每隔一秒钟获取一下当前时间。 实现代码如下&#xff1a; Column(){Text(this.curTime).fontColor(#FFFFFF).fontWeight(600).fontSize(32vp)Text(上班打卡).fontColor(#FFFFFF) } .width(170vp) .height(170…

【 香格里拉酒店-注册/登录安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞 …

[0633].第3-3节:@SentinueResource注解

我的后端学习大纲 SpringCloud学习大纲 是什么&#xff1a; SentinueResource是一个流量防卫防护组件注解 用于指定防护资源&#xff0c;&#xff0c;对配置的资源进行流量控制、熔断降级等功能 SentinueResource注解说明&#xff1a; Target({ElementType.METHOD, ElementTy…

选择合适的SSL证书

随着我们在线业务的增长&#xff0c;确保网站安全变得越来越重要。对于许多人来说&#xff0c;保护网站安全的想法似乎令人望而生畏&#xff0c;尤其是在有各种SSL证书可用的情况下。您可能想知道哪一个最适合您的业务需求或如何浏览这些选项。 除了SSL证书之外&#xff0c;使…

SQL Injection | SQL 注入 —— 时间盲注

关注这个漏洞的其他相关笔记&#xff1a;SQL 注入漏洞 - 学习手册-CSDN博客 0x01&#xff1a;时间盲注 —— 理论篇 时间盲注&#xff08;Time-Based Blind SQL Injection&#xff09;是一种常见的 SQL 注入技术&#xff0c;适用于那些页面不会返回错误信息&#xff0c;只会回…

appium启动hbuild打包的apk异常解决

目录 一、错误信息 二、问题解决 2.1 通过以下命令获取安装包名称&#xff1a; 2.2 这个launcher状态下的安装包名称和active&#xff0c;替换原先的安装包名称 一、错误信息 通过adb shell dumpsys activity | findstr "mResume" 命令获取的安装包信息&#xff…

第十四届单片机嵌入式蓝桥杯

一、CubeMx配置 &#xff08;1&#xff09;LED配置 &#xff08;1&#xff09;LED灯里面用到了SN74HC573ADWR锁存器&#xff0c;这个锁存器有一个LE引脚,这个是我们芯片的锁存引脚&#xff08;使能引脚&#xff09;&#xff0c;由PD2这个端口来控制的 &#xff08;2&#xff…