图解系列 图解Kafka之Producer

开局一张图,其他全靠吹

发送消息流程如下

1.初始化流程

  • 指定bootstrap.servers,地址的格式为 host:port。它会连接bootstrap.servers参数指定的所有Broker,Producer启动时会发起与这些Broker的连接。因此,如果你为这个参数指定了1000个Broker连接信息,那么很遗憾,你的Producer启动时会首先创建与这1000个Broker的TCP连接。

    • 在实际使用过程中,我并不建议把集群中所有的Broker信息都配置到bootstrap.servers中,通常你指定3~4台就足以了。因为Producer一旦连接到集群中的任一台Broker,就能拿到整个集群的Broker信息,故没必要为bootstrap.servers指定所有的Broker。
    • props.put("bootstrap.servers", "localhost:9092");
  • 指定Key和Value的序列化方式。

    •  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
  • 指定acks配置,默认值是all(版本3.x)

    • props.put("acks", "all");
    • 设置为0,表示生产端发送消息后立即返回,不等待broker端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。
    • 设置为1,表示leader副本成功写入PageCache就会响应Producer,而无需等待ISR(同步副本)集合中的其他副本写入成功。这种方案提供了适当的持久性,保证了一定的吞吐量。
    • 设置成all或-1,表示不仅要等leader副本成功写入,还要求ISR中的其他副本成功写入,才会响应Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。
  • producer = new KafkaProducer<>(props);

    • 从配置中获取必要的参数,如transactionalIdclientId
    • 根据clientId创建日志记录上下文(LogContext),用于日志记录。
    • 配置度量(Metrics)相关信息,包括度量标签、度量配置、度量报告器等。
    • 创建度量上下文(MetricsContext)和度量实例(Metrics)。
    • 初始化分区器(Partitioner)。
    • 配置并初始化键(key)和值(value)的序列化器(Serializer)。
    • 配置并初始化拦截器(Interceptors)。
    • 配置集群资源监听器(ClusterResourceListeners)。
    • 设置最大请求大小(maxRequestSize)、内存大小(totalMemorySize)和压缩类型(compressionType)等参数。
    • 配置最大阻塞时间(maxBlockTimeMs)和交付超时时间(deliveryTimeoutMs)。
    • 初始化API版本(apiVersions)和事务管理器(transactionManager)。
    • 创建记录累加器(RecordAccumulator),用于累积记录以进行批量发送。
    • 解析并验证引导服务器地址(addresses)。
    • 如果提供了元数据(metadata),则使用提供的元数据,否则创建新的元数据实例,并通过引导服务器地址进行引导。
    • 初始化错误度量传感器(errors)。
    • 创建并启动IO线程(ioThread)来处理消息发送。
    • 注册应用程序信息,用于JMX度量和监控。
    • 如果在初始化过程中发生任何错误,将调用关闭方法以避免资源泄漏,并向上抛出Kafka异常。

2.发送消息流程

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulatormain 线程将消息发送给 RecordAccumulatorSender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker

  • 构造消息记录ProducerRecord 对象,对象包含了四个属性:Topic,partition,key,value;topic 和 value 是必须的,key 和 partition 是可选的。
  • 同步获取Kafka集群信息(Cluster)。
    • 如果缓存有,并且分区没有超过指定分区范围则返回缓存
    • 否则触发更新,等待从broker获取新的元数据信息
    • 默认强制拉取时间是metadata.max.age.ms: 5分钟
  • 使用键序列化器(keySerializer)将消息的键序列化为字节数组,使用值序列化器(valueSerializer)将消息的值序列化为字节数组。
  • 计算数据发送到那个分区,如果指定了 key,那么相同 key 的消息会发往同一个分区,如果实现了自定义分区器,那么就会走自定义分区器进行分区路由。
    • 如果有Key值,则使用Key值的Hash值来分配分区 murmurhash(key) % 主题分区总数
    • 老版本:如果没有key值,则以Round-Robin的方式分配分区。
    • 新版本:如果没有key值,则以粘性分区的方式分配分区
  • 创建一个TopicPartition对象,表示要发送消息的主题和分区。
  • 判断消息的大小是否超过了我们设置的阈值。
  • 异步发送时,给每一条消息都绑定他的回调函数
  • 把消息放入记录累加器(accumulator)(32M的一个内存)*,*然后有accumulator把消息封装成为一个批次一个批次的去发送。
  • 如果批次满了或者新创建出来一个批次, 唤醒sender线程,他才是真正发送数据的线程,发送的时候并不是来一个消息就发送一个消息,这样的话吞吐量比较低,并且频繁的进行网络请求。消息是按照批次来发送的或者等待时间来发的的.

参考

  • https://www.clairvoyant.ai/blog/unleash-kafka-producers-architecture-and-internal-workings
  • 尚硅谷 Kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/128280.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCP的滑动窗口协议有什么用?

分析&回答 滑动窗口协议&#xff1a; TCP协议的使用维持发送方/接收方缓冲区 缓冲区是 用来解决网络之间数据不可靠的问题&#xff0c;例如丢包&#xff0c;重复包&#xff0c;出错&#xff0c;乱序 在TCP协议中&#xff0c;发送方和接受方通过各自维护自己的缓冲区。通…

批量采集的时间管理与优化

在进行大规模数据采集时&#xff0c;如何合理安排和管理爬取任务的时间成为了每个专业程序员需要面对的挑战。本文将分享一些关于批量采集中时间管理和优化方面的实用技巧&#xff0c;帮助你提升爬虫工作效率。 1. 制定明确目标并设置合适频率 首先要明确自己所需获取数据的范…

记录在windows下安装MySQL所遇到的各种坑

1.下载 从官网下载installer 然后开始选择要安装的组件 安装了很久进度都是0&#xff0c;无奈点击show detail以后发现&#xff0c;webclient异常&#xff0c;最后是将链接地址复制到迅雷才成功下载的 等迅雷下载完成以后&#xff0c;会看到有如下2个新msi文件 msi都是windows…

Python:安装Flask web框架hello world

安装easy_install pip install distribute 安装pip easy_install pip 安装 virtualenv pip install virtualenv 激活Flask pip install Flask 创建web页面demo.py from flask import Flask app Flask(__name__)app.route(/) def hello_world():return Hello World! 2023if _…

【Go基础】编译、变量、常量、基本数据类型、字符串

面试题文档下链接点击这里免积分下载 go语言入门到精通点击这里免积分下载 编译 使用 go build 1.在项目目录下执行 2.在其他路径下编译 go build &#xff0c;需要再后面加上项目的路径&#xff08;项目路径从GOPATH/src后开始写起&#xff0c;编译之后的可执行文件就保存再…

【Mybatis】Mybatis的工作原理

目录 工作原理图 使用 MyBatis 操作数据库通常需要以下几个步骤&#xff1a; 1、配置数据库连接信息&#xff1a; 2、定义数据表对应的实体类&#xff1a; 3、编写 SQL 映射文件&#xff1a; 4、配置 MyBatis 映射文件&#xff1a; 5、创建 MyBatis 的 SqlSessionFactory&…

基于SSM的宿舍管理系统【附源码文档】

基于SSM的宿舍管理系统【附源码文档】 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringSpringMVCMyBatis工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 【主要功能】 角色&#xff1a;管理员、宿舍管理员、学生 管理员&#xff1a;院系信息、班级信…

leetcode872. 叶子相似的树(java)

叶子相似的树 题目描述递归 题目描述 难度 - 简单 leetcode - 872. 叶子相似的树 请考虑一棵二叉树上所有的叶子&#xff0c;这些叶子的值按从左到右的顺序排列形成一个 叶值序列 。 举个例子&#xff0c;如上图所示&#xff0c;给定一棵叶值序列为 (6, 7, 4, 9, 8) 的树。 如果…

Undefined symbols for architecture arm64

解决问题之前&#xff0c;先了解清晰涉及到的知识点&#xff1a; iOS支持的指令集包含&#xff1a;armv6、armv7、armv7s、arm64&#xff0c;在项目TARGETS---->Build Settings--->Architecturs 可以修改对应的指令集&#xff0c;目前Standard Architectures(arm64, arm…

SSM - Springboot - MyBatis-Plus 全栈体系(五)

第二章 SpringFramework 四、SpringIoC 实践和应用 2. 基于 XML 配置方式组件管理 2.5 实验五&#xff1a;高级特性&#xff1a;FactoryBean 特性和使用 2.5.1 FactoryBean 简介 FactoryBean 接口是Spring IoC容器实例化逻辑的可插拔性点。 用于配置复杂的Bean对象&#x…

精益制造、质量管控,盛虹百世慧共同启动MOM(制造运营管理)

百世慧科技依托在电池智能制造行业中的丰富经验&#xff0c;与盛虹动能达成合作&#xff0c;为其提供MOM制造运营管理平台&#xff0c;并以此为起点&#xff0c;全面提升盛虹动能的制造管理水平与运营体系。 行业困境 中国动力电池已然发展为全球最大的电池产业&#xff0c;但…

Android 系统源码目录frameworks/base/packages和packages/apps下的APP区别

概要 在 Android Open Source Project (AOSP) 源代码中&#xff0c;frameworks/base/packages 和 packages/apps 目录都包含 Android 系统中的应用程序&#xff0c;但它们在性质和用途上有一些区别&#xff1a; 1&#xff0c;frameworks/base/packages frameworks/base 目录…

【初阶C语言】操作符1--对二进制的操作

前言&#xff1a;本节内容介绍的操作符&#xff0c;操作的对象是二进制位。所以前面先介绍整数的二进制位 一、二进制位介绍 1.二进制介绍 &#xff08;1&#xff09;整数的二进制表示形式有三种&#xff1a;原码、反码和补码。 &#xff08;2&#xff09;原码、反码和补码的…

【ARM CoreLink 系列 3 -- CCI-550 控制器介绍 】

文章目录 CCI FamilyCCI-550 简介CCI-550 功能CCI-550 Interfaces Snoop filter 使用背景CCI-550 Snoop filter 上篇文章&#xff1a;ARM CoreLink 系列 2 – CCI-400 控制器简介 CCI Family CCI-550 简介 Arm CoreLink CCI-550 Cache Coherent Interconnect 扩展了 CoreLink…

Linux指令二【进程,权限,文件】

进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程&#xff0c;是操作系统进行 资源分配和调度的一个独立单位&#xff0c;是应用程序运行的载体。 一、进程基本指令 1.ps&#xff1a;当前的用户进程 ps 只显示隶属于自己的进程状态ps -aux 显示所有进程…

解决Spring Boot启动错误的技术指南

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

二叉搜索树

目录 二叉搜索树 操作-查找 操作-插入 操作-删除 性能分析 二叉搜索树 二叉搜索树又称二叉排序树,它要么是一棵空树,要么是具有以下性质的二叉树: 若它的左子树不为空,则左子树上所有结点的值都小于根节点的值若它的右子树不为空,则右子树上所有结点的值都大于根节点的值…

自动化运维—ansible

一、 Ansible 介绍 Ansible 是一种 IT 自动化工具。它可以配置管理&#xff0c;部署软件以及协调更高级的 IT 任务&#xff0c; 例如持续部署&#xff0c;滚动更新。 Ansible 适用于管理企业 IT 基础设施&#xff0c;从 几十台到上百台的服务器环境。Ansible 也是一种简单的自…

上海控安SmartRocket系列产品推介(六):SmartRocket PeneX汽车网络安全测试系统

产品概述 上海控安汽车网络安全测试系统PeneX&#xff08;Penetrator X&#xff09;是一款支持对整车及车辆零部件及子系统实施网络安全测试的系统&#xff0c;其包含硬件安全、软件系统安全、车内通信及车外通信四大安全测试系统&#xff1b;支持合规性测试&#xff0c;包含国…

LLMs之Falcon 180B:Falcon 180B的简介、安装、使用方法之详细攻略

LLMs之Falcon 180B&#xff1a;Falcon 180B的简介、安装、使用方法之详细攻略 导读&#xff1a;2023年9月7日(北京时间)&#xff0c;TII重磅发布Falcon-180B模型&#xff0c;它是Falcon系列的升级版本&#xff0c;是一个参数规模庞大、性能优越的开放语言模型&#xff0c;适用于…