kafka原理简介

Kafka是由LinkedIn开发的一个分布式发布/订阅的消息系统和一个强大的队列,使用Scala编写,它以可扩展和高吞吐率而被广泛使用。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内以master-flower方式实现数据同步,从而防止数据丢失。

1、组件和角色

Producer:消息生产者,发布消息到 kafka 集群的终端或服务。

Consumer:从 kafka 集群中消费消息的终端或服务。

Consumer grouphigh-level consumer API 中,每个 consumer 都属于一个 consumer group,一个partition只能被同一个 consumer group 中的一个 Consumer 消费,但可以被多个不同consumer group 中的consumer消费。

Broker: 集群中的每一个kafka进程都是一个Broker,通常一台服务器上部署一个broker

Topic :每条发布到 kafka 集群的消息属于的类别,即kafka是面向topic的,topic是逻辑概念。

Partition:每个topic包含一个或多个partitionkafka分配的单位是partitionpartition是物理概念,生产者发送的消息就是保存在partition中的。

Segmentpartition物理上由多个segment组成。

offset : 每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续递增的序列号叫做offset,偏移量offset在每个分区中是唯一的。

replicapartition的副本,保障 partition 的高可用。leaderfollower统称为Replica。在kafka集群中,为了防止数据丢失,每个partition都会有主分区和从分区,当然,也可以没有从分区。每个partition有且只有一个主分区,可以没有从分区,也可以有一个或者多个从分区。

leaderreplica中的一个角色,主分区所在的节点称为leader。在kafka集群中,每个partition都有一个leaderproducerconsumer只跟leader交互,leader负责数据的读写。

followerreplica中的一个角色,从分区所在的节点称为follower,从leader中复制(fentch)数据。为了防止leaderfollower节点上数据不一致性的问题,kafka没有使用读写分离,而是只在leader节点上读写数据,follower节点只是从leader节点上定期复制数据。如果leader节点异常,随机选择一个follower节点成为leader节点,从而防止数据丢失。

controllerkafka 集群中的一个broker,用来进行 leader 选举以及各种故障转移。

zookeeperkafka 通过 zookeeper 来存储集群的 meta 信息,meta信息主要包括kafkabroker列表(ip:port)topicpartition等信息。

ARAssigned Replica 已分配的副本):表示某个分区的所有副本。

ISRIn-Sync-Replica 在同步中的副本):表示所有与leader副本保持一定程度同步的副本(包括leader副本在内)。

OSROut-of-Sync-Replica 不在同步中的副本):表示所有与leader副本同步滞后过多的follower副本(不包括leader副本)。

AR = ISR + OSR

正常情况下,所有的follower副本都应该与leader副本保持同步,即:AR = ISROSR集合为空。

(1)controller的选举

Kafka启动时,会在所有的broker(集群节点)中选择一个controller,leader和follower是针对分区而言的,而controller是针对broker而言的。创建topic、添加分区、修改副本数量等管理任务都是由controller完成的,以及Kafka分区leader的选举,也是有controller决定的。

在Kafka集群启动时,每个broker都会将自己注册到zookeeper上,并尝试在zookeeper上抢锁,抢占成功的broker就注册成为Controller(ZK临时节点)。只会有一个broker节点竞争成功,其他的broker会注册该节点的监视器,一旦该临时节点状态发生变化,就可以进行相应的处理。Controller是高可用的,一旦某个broker崩溃,其他的broker会重新注册成为Controller。

(2)leader的选举

所有分区的leader选举都是由controller决定的,controller会将leader的改变直接通过RPC的方式通知需为此做出响应的broker,controller读取到当前分区的ISR,只有一个replica存活时,就选择这个replica作为leader,否则任意选择一个replica作为leader,如果该分区的所有replica都已经宕机,则新的leader为-1。

为什么不通过ZK的方式选举分区的leader?

Kafka集群如果业务很多的情况下,会存在很多的分区,假设某个broker宕机,就会出现很多的分区都需要重新选举leader,如果使用zookeeper选举leader,会给zk带来巨大的压力。因此,Kafka中leader的选举不能使用zk来实现。

2、原理简介

1.一个Topic分为多个Partition来进行数据管理,一个Partition中的数据是有序、不可变的,使用偏移量(offset)唯一标识一条数据,是一个long类型的数据。Partition接收到producer发送过来的数据后,会产生一个递增的offset偏移量数据,同时将数据保存到本地的磁盘文件中(文件内容以追加的方式写入数据)Partition中的数据存活时间超过参数值(log.retention.{ms,minutes,hours},默认7)的时候进行删除(默认)Consumer根据offset消费对应TopicPartition中的数据(也就是每个Consumer消费的每个TopicPartition都拥有自己的offset偏移量)。注意:Kafka的数据消费是顺序读写的,磁盘的顺序读写速度(600MB/sec)比随机读写速度(100k/sec)快。

2.Kafka集群中,producer生产数据并发送到对应的TopicProducer通过push的方式将数据发送到对应Topic的分区,Producer发送到Topic的数据是由key/value键值对组成的,Kafka根据不同的key将数据发送到不同的Partition,默认采用Hash的机制发送数据到对应Topic的不同Partition中,配置参数为{partitioner.class}。也可以配置自定义分配机制,自定义类实现Partitioner接口,重写partition方法的方式。Producer发送数据的方式分为sync(同步)async(异步)两种,默认为同步方式, 由参数{producer.type}决定;当发送模式为异步发送的时候,Producer提供重试机制,默认失败重试发送3次。

3.如果生产者同步发消息,在收到kafkaack告知发送成功之前一直处于阻塞状态。如果生产者异步发消息,发送完之后不用等待broker给回复,直接执行后面的业务逻辑。可以提供回调方法,让broker异步的调用callback,告知生产者,消息发送的结果。如果告知的结果异常,再进行相应的处理操作。

4.Kafka有两种模式消费数据:队列和发布订阅;在队列模式下,一条数据只会发送给consumer group中的一个consumer进行消费;在发布订阅模式下,一条数据会发送给多个consumer进行消费。Kafka中通过控制consumer的参数{group.id}来决定kafka是什么数据消费模式,如果所有消费者的该参数值是相同的,那么此时的kafka就是类似于队列模式,数据只会发送到一个consumer,此时类似于负载均衡;否则就是发布订阅模式。Kafkaconsumer基于offsetkafka中的数据进行消费。

5.Kafka的数据是按照分区进行排序的(插入的顺序),也就是每个分区中的数据是有序的。在Consumer进行数据消费的时候,也是对分区的数据进行有序消费的, 但是不保证所有数据的有序性(多个分区之间),同一个分区数据先进先出。

6.Consumer Rebalance:当一个consumer group组中的消费者数量和对应Topic的分区数量一致的时候,此时一个Consumer消费一个Partition的数据; 如果不一致,那么可能出现一个Consumer消费多个Partition的数据或者不消费数据的情况,这个机制是根据ConsumerPartition的数量动态变化的。Consumer通过poll的方式主动从Kafka集群中获取数据。

7.KafkaReplication指的是Partition的复制,一个Partition的所有分区中只有一个分区是leader节点,其它分区是follower节点。ReplicationKafka的吞吐率有一定的影响,但是极大的增强了可靠性。Follower节点会定时的从leader节点上获取增量数据,一个活跃的follower节点必须满足以下两个条件: (1)所有节点必须维护与zookeeper的连接(通过zkheartbeat实现) (2)follower必须能够及时的将leader上的writing复制过来,不能落后太多,由参数{replica.lag.time.max.ms}{replica.lag.max.messages}决定。

8.MessageDeliverySemantics是消息系统中数据传输的可靠性保证的一个定义,主要分为三种类型: At most once(最多一次):消息可能会丢失,但是不可能重复发送。At least once(最少一次):消息不可能丢失,但是可能重复发送。Exactly once(仅仅一次):消息只发送一次,但不存在消息的丢失。Kafka的Producer通过参数{request.required.acks}来确定Producer和Broker之间是哪种消息传递类型。Ack=0,相当于异步发送,意味着producer不等待broker同步完成,消息发送完毕继续发送下一批消息。提供了最低延迟,但持久性最弱,当broker发生故障时很可能发生数据丢失。如果leader死亡,producer继续发送消息,broker接收不到数据就会造成数据丢失。 Ack=1,producer要等待leader成功收到消息并确认,才发送下一条message。提供较低的延迟性以及较好的持久性。但是如果partition下的leader死亡,而follower尚未复制数据,数据就会丢失。 Ack=-1,leader收到所有消息,且follower同步完数据,才发送下一条数据。延迟性最差,持久性最好(即可靠性最好)。 三种参数设置性能递减,可靠性递增。 同时,Ack默认值为1,此时吞吐量与可靠性折中。实际生产中可以根据实际需求进行调整。

3、常用参数介绍

(1)kafkaserver.properties配置文件中参数:

broker.id=0  #当前机器在集群中的唯一标识,和zookeepermyid性质一样
port=9092 #当前kafka对外提供服务的端口,默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bugDNS解析问题,失败率的问题。改成自己centosip地址。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后再发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:2181,192.168.7.101:2181,192.168.7.107:2181/kafka #设置zookeeper的连接端口,在集群配置时,要把所有机器的ip地址都要写上,这里以三个机器为例。如果是单机部署,只需要写一个ip地址就行了。

注意:在zookeeper.connect的最后加上/kafka是因为kafka需要依赖zookeeper,在kafka启动之后默认会在zookeeper服务所在节点的根目录下创建很多与kafka有关的目录,这样就会导致zookeeper服务所在节点的根目录下的文件很多很乱。另外,如果多个kafka共用一个zookeeper,就会导致zookeeper服务的根目录下各个kafka文件更加混乱。所以在zookeeper.connect的最后加上/kafka是为了在kafka启动时将创建的文件都放到zookeeper节点根目录下的/kafka子目录下。多个kafka共用一个zookeeper时可以分别配置自己的子目录以示区分。

启动zookeeper和kafka之后,会自动在zookeeper节点上创建/kafka目录。

(2)生产者producer.properties配置文件中的参数

1.bootstrap.servers=host1:port1,host2:port2  // 用于生产者与kafka集群建立连接

2. acks:表示Producer需要Leader确认的模式。

1acks = 0: 表示Producer请求立即返回,不需要等待Leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。

2acks = -1: 表示分区Leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为Producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。

3acks = 1: 表示Leader副本必须应答此Producer请求并写入消息到本地日志,之后Producer请求被认为成功。如果此时Leader副本应答请求之后挂掉了,消息会丢失。这个方案,提供了不错的持久性保证和吞吐。

3. compression.type=none  // 压缩类型,目前支持none(不压缩), gzip, snappy, lz4, zstd

4. partitioner.class= kafka.producer.DefaultPartitioner  // 分区的策略,默认是取模

5. request.timeout.ms=10000  // 消息发送的最长等待时间

6. linger.ms=0  //这个值是为了延迟发送来收集更多的消息一批发送,Producer是按照batch进行发送的,但是还要看linger.ms的值,默认是0,表示不延迟。为了减少网络IO,提升整体的性能,建议设置5-100ms

7. batch.size=16384   // Producer按照batch进行发送,通过这个参数来设置批量提交的数据大小,默认是16KB,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息)。

8. buffer.memory=33554432  //该参数用于指定Producer端用于缓存消息的缓冲区大小,单位为字节,默认值为:3355443232MB。发送的消息会先进入到本地缓冲区(32MB),生产者会跑一个线程,该线程去缓冲区中取16KB的数据,发送到kafka,如果到10毫秒数据没取满16KB,也会发送一次。异步的时候假如设置了缓存消息数量为200,但是一直没有200条数据,那么不可能一直等下去,就会取16KB大小的数据,直接发,不够16KB也会发。

(3)消费者consumer.properties配置文件中的参数

1.bootstrap.servers= host1:port1,host2:port2 ...  // 用于消费者与kafka集群建立连接

2. group.id=test-consumer-group  // 标记消费者所属的消费者组

3. key.deserializer和value.deserializer:指定接收消息的key和value的反序列化类型。一定要写全类名。

4. enable.auto.commit:默认值为true,消费者会自动周期性地向服务器提交偏移量。

5. auto.commit.interval.ms:如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。

6. auto.offset.reset:当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),读取数据偏移量的处理方式:

(1)earliest:自动重置偏移量到最早的偏移量。

(2)latest:默认,自动重置偏移量为最新的偏移量。

(3)none:如果消费组原来的偏移量不存在,则向消费者抛异常。

7. max.poll.records:一次poll拉取数据返回消息的最大条数,默认500条。

8.offsets.topic.num.partitions:__consumer_offsets的分区数,默认是50个分区。

9.heartbeat.interval.ms:Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。

10.session.timeout.ms:Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。

11.max.poll.interval.ms:消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。

12.fetch.min.bytes:默认1个字节。消费者获取服务器端一批消息最小的字节数。

13.fetch.max.wait.ms:默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。

14.fetch.max.bytes:默认值: 52428800字节,即50MB。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/349661.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AutoCAD 2025 ObjectARX(C++)二次开发环境搭建

(原文:https://blog.iyatt.com/?p16480) 基本环境 AutoCAD 机械版 2025 Visual Studio 2022(需要安装“C 桌面开发”) 开发环境 下载 (1)ObjectARX SDK 下载(提供开发使用的 …

C#、C++、Java、Python 选择哪个好?

选择哪种编程语言取决于你的需求和偏好,以及你打算做什么类型的项目。我这里有一套编程入门教程,不仅包含了详细的视频 讲解,项目实战。如果你渴望学习编程,不妨点个关注,给个评论222,私信22,我…

springcloud gateway转发websocket请求的404问题定位

一、问题 前端小程序通过springcloud gateway接入并访问后端的诸多微服务,几十个微服务相关功能均正常,只有小程序到后端推送服务的websocket连接建立不起来,使用whireshark抓包,发现在小程序通过 GET ws://192.168.6.100:8888/w…

【Linux】基础IO——系统文件IO

我之前是讲过c语言的文件操作的,但是说实话我压根就不知道它在干什么,后面c语言/c,数据结构的学习过程中也没用过文件操作,今天我们就来会会这个文件操作 1.回顾c语言文件接口 1.1.fopen r :只读模式打开,文件流指针…

开源大模型的新星:ChatGPT-Next-Web 项目解析与推荐

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

oracle 删除当前用户下所有表

荆轲刺秦王 通常呢 我们将正式环境的 oracle 数据库 导出成 dmp 文件,然后导入到测试环境或者本地环境,期间可能会出现各种问题。那么如何使错误的导入数据全部删除呢。可以这样做: 1. 本地虚拟机启动 oracle 服务 2. sqldeveloper 连接 o…

Rust : windows下protobuf和压缩传输方案

此前dbpystream库是用python开发 web api。今天在rust中试用一下protobuf。 本文关键词:编译器、protobuf、proto文件、序列化、zstd压缩,build。 一、 protobuf编译器下载 具体见相关文章。没有编译器,protobuf无法运行。 windows参见&am…

工作组局域网-ARP欺骗-攻击防御单双向

免责声明:本文仅做技术交流与学习... 目录 断网限制-单向 环境: 演示: win10: 欺骗前 欺骗后 kali: kali执行命令: win10结果: 劫持数据-双向 欺骗: 网络分析: 防御--动态解析改静态 中间人攻击 断网限制-单向 环境: 靶机:win10 攻击机:kali…

【JavaEE】Spring Boot MyBatis详解(一)

一.MyBatis的基本概念与相关配置. 1.基本概念 MyBatis是一款优秀的持久层框架,用于简化JDBC的开发。MyBatis本是Apache的一个开源项目iBatis,2010年这个项目由apache迁移到了google code,并且改名为MyBatis. 2013年11月迁移到Github.持久层…

Spring Boot:Java 应用开发高效之道

Spring Boot 是一种革命性的框架,旨在简化 Java 应用的创建和部署过程。通过自动化配置和简化项目搭建流程,Spring Boot 大大加速了开发周期,让 Java 应用开发变得更加高效和便捷。 核心优势: 快速启动和简化配置:Spr…

智慧公安指挥中心大数据信息化两中心两基地系统方案

1.1 系统建设目标 本系统是一个汇接全市的报警求助的大型通信指挥系统,技术难度较高、可靠性要求高,技术路线的选择至关重要。 在充分考虑XX市公安局的业务需要,利用现代通信及计算机网络技术的基础上,最大程度地实现资源整合、…

数据结构和矩阵细节用法:double、cell和complex #matlab

矩阵建立 建立矩阵用[]; 矩阵的同一行内的元素用逗号或者空格隔开; 矩阵的不同行的元素用分号隔开 eg. 矩阵 A 1 2 3 4 5 6 7 8 9 在matlab中矩阵A表示为: clc;clear; A[1,2,3;4,5,6;7,8,9]; %或者A[1 2 3;4 5 …

苹果AI来了,ios18史诗级发布

今天凌晨1点,苹果举行了WWDC开发者大会,正式发布了 全新iOS 18、iPadOS 18、watchOS 11、tvOS 18、macOS 等以及Apple Intelligence的个人化智能系统 苏音给大家汇总下,ios18的更新内容以及苹果的AI。 本次更新,官方带来的title…

关于怎么用Cubemx生成的USBHID设备实现读取一体的鼠标键盘设备(改进版)

主要最近做了一个要用STM32实现读取鼠标键盘一体的那种USB设备,STM32的界面上要和电脑一样的能通过这个USB接口实现鼠标移动,键盘的按键。然后我就很自然的去参考了正点原子的例程,可是找了一圈,发现正点原子好像用的库函数&#…

46.Python-web框架-Django - 多语言配置

目录 1.Django 多语言基础知识 1.1什么是Django国际化和本地化? 1.2Django LANGUAGE_CODE 1.3关于languages 1.4RequestContext对象针对翻译的变量 2.windows系统下的依赖 3.django多语言配置 3.1settings.py配置 引用gettext_lazy 配置多语言中间件&#x…

【机器学习300问】110、什么是Lasso回归模型?

LASSO回归的全称是Least Absolute Shrinkage and Selection Operator,中文叫“最小绝对收缩和选择算子”,用一个比喻来初步感受一下它的作用: 想象你在整理一个杂乱无章的房间,里面堆满了各种物品(代表众多的预测变量&…

快捷回复话术分享:如何应对顾客愤怒骂人?

在客服的日常工作中,面对情绪激动、甚至愤怒发泄骂人的顾客是常见的挑战。初入此行业的小伙伴们往往在遭遇顾客的激烈情绪时感到手足无措,不知道如何妥善回应。为此,本文将分享一些实用的快捷回复话术和技巧,帮助新手客服更好地处…

flink源码系列:RPC通信

这里写目录标题 1. 本节课目的2.开始本节内容2.1.RPC概念3.2.大数据组件常见的RPC实现技术3.3.Pekko(Akka)3.3.1. Akka、Pekko基本概念3.3.2.Pekko Demo事例3.3.2.1.PekkoData 类3.3.2.2.PekkoRpcReceiverActor类3.3.2.3.PekkoRpcSenderActor 类3.3.2.4.…

使用QT制作QQ登录界面

mywidget.cpp #include "mywidget.h"Mywidget::Mywidget(QWidget *parent): QWidget(parent) {/********制作一个QQ登录界面*********************/this->resize(535,415);//设置登录窗口大小this->setFixedSize(535,415);//固定窗口大小this->setWindowTi…

图片转Base64

在Python中, 可以使用内置的base64模块以及图像处理库(如PIL, 也称为Pillow)来将图片转换为Base64编码的字符串. 以下是一个简单的示例, 说明如何实现这一过程:首先, 需要安装Pillow库(如果尚未安装), 可以使用pip来安装: pip install pillow然后, 可以使用以下Python代码将图片…