Kafka
- 1、为什么要使用 kafka
- 2、Kafka 的架构是怎么样的
- 3、什么是 Kafka 的重平衡机制
- 4、Kafka 几种选举过程
- 5、Kafka 高水位了解过吗
- 6、Kafka 如何保证消息不丢失
- 7、Kafka 如何保证消息不重复消费
- 8、Kafka 为什么这么快
1、为什么要使用 kafka
1. 解耦:在一个复杂的系统中,不同的模块或服务之间可能需要相互依赖,如果直接使用函数调用或者API调用的方式,会造成模块之间的耦合,当其中一个模块发生改变时,需要同时修改调用方和被调用方的代码。而使用消息队列作为中间件,不同的模块可以将消息发送到消息队列中,不需要知道具体的接收方是谁,接收方可以独立地消费消息,实现了模块之间的解耦。
2. 异步:有些操作比较耗时,例如发送邮件、生成报表等,如果使用同步的方式处理,会阻塞主线程或者进程,导致系统的性能下降。而使用消息队列,可以将这些操作封装成消息,放入消息队列中,异步地处理这些操作,不影响主流程的执行,提高了系统的性能和响应速度。
3. 削峰:削峰是一种在高并发场景下平衡系统压力的技术,在削峰的过程中,通常使用消息队列作为缓冲区,将请求放入消息队列中,然后在系统负载低的时候进行处理。这种方式可以将系统的峰值压力分散到较长的时间段内,减少瞬时压力对系统的影响,从而提高系统的稳定性和可靠性。
2、Kafka 的架构是怎么样的
1、Producer 生产者
生产者负责将消息发布到 kafka 中的一个或多个主题,每个主题包含一个或多个分区,消息保存在各个分区上,每一个分区都是一个顺序的,分区中的消息都被分了一个序列号,称之为偏移量,就是指消息在分区中的位置,所有分区的消息加在一起就是一个主题的所有消息。
分区策略
分区策略 | 说明 |
---|---|
轮询策略 | 按顺序轮流将每条数据分配到每个分区中 |
随机策略 | 每次都随机地将消息分配到每个分区 |
按键保存策略 | 生产者发送数据的时候,可以指定一个key,计算这个key的hashCodet值,按照hashCodel的值对不同消息进行存储 |
如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下,需要将分区数目设为1 或者指定消息的 key。
消息发送
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
生产者架构图
消息在通过 send 方法发往 broker 的过程中,有可能需要经过拦截器、序列化器、分区器一系列之后才能被真正地发往 broker。整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender发送线程。
① 主线程
拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。通过自定义实现 ProducerInterceptor 接口来使用。
序列化
生产者需要用序列化器把对象转换成字节数组才能通过网络发送给 Kafka。消费者需要用反序列化把从 Kafka 中收到的字节数组转换成相应的对象。自带的有StringSerializer,ByteArray、ByteBuffer、Bytes、Double、Integer、Long等,还可以自定义序列化器。
分区器
如果消息中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。也可以自定义分区器。
消息累加器
消息累加器主要用来缓存消息以便 Sender线程可以批量发送进而减少网络传输的资源消耗以提升性能。消息累加器的缓存大小可以通过buffer.memory
配置。在消息累加器的内部为每个分区都维护了一个双端队列,主线程发送过来的消息都会被追加到某个双端队列中,队列中的内容就是 ProducerBatch,即Dqueue< ProducerBatch >。
当一条消息流入消息累加器,如果这条消息小于batch.size
参数大小则以batch.size
参数大小创建 ProducerBatch,否则以消息的实际大小创建 ProducerBatch。
② Sender发送线
程负责从消息累加器中获取消息并将其发送到 Kafka 中。后续 Sender 从缓存中获取消息,进行转换,发送到broker。在发送前还会保存到InFlightRequests中,作用是缓存已经发送出去但还没有收到响应的请求,缓存数量由max.in.flight.requests.per.connection
参数确定,默认是5,表示每个连接最多缓存5个未响应的请求。
2、Consumer 消费者
消费者,消息的订阅者,可以订阅一个或多个主题,并且依据消息生产的顺序读取他们,消费者通过检查消息的偏移量来区分已经读取过的消息。消费者一定属于某一个特定的消费组。消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 kafkal 的消息,我们某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。消息最终还是会被删除的,默认生命周期为1周(7*24小时)。
订阅主题和分区
通过 subscribe 方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配政策来自动分配各个消费者与分区的关系,以实现消费者负载均衡和故障自动转移。而通过 assign 方法则没有。
消息消费
Kafka 中的消息是基于推拉模式的。Kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll 方法,而 poll 方法返回的是所订阅的主题(分区)上的一组消息。如果没有消息则返回空。
public ConsumerRecords<K, V> (final Duration timeout)
timeout 用于控制 poll 方法的阻塞时间,没有消息时会阻塞。
位移提交
Kafka 中的每条消息都有唯一的 offset,用来标识消息在分区中对应的位置。Kafka 默认的消费唯一的提交方式是自动提交,由enable.auto.commit
配置,默认为true。自动提交不是每一条消息提交一次,而是定期提交,周期由auto.commit.interval.ms
配置,默认为5秒。
自动提交可能发生消息重复或者丢失的情况,Kafka 还提供了手动提交的方式。enable.auto.commit
配置为false开启手动提交。
指定位移消费
在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset
的配置来决定从何处开始进行消费。默认值为 lastest,表示从分区末尾开始消费消息;earliest 表示从起始开始消费;none为不进行消费,而是抛出异常。
seek 可以从特定的位移处开始拉去消息,得以追前消费或回溯消费。
public void seek(TopicPartition partition, long offset)
再均衡
再均衡是指分区的所属权从一个消费者转移到另一个消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。再均衡后也可能出现重复消费的情况。所以应尽量避免不必要的再均衡发生。
3、Consumer Group 消费者群组
同一个消费者组中保证每个分区只能被一个消费者使用 ,不会出现多个消费者读取同一个分区的情况,通过这种方式,消费者可以消费包含大量消息的主题。而且如果某个消费者失效,群组里的其他消费者可以接管失效悄费者的工作。
4、Broker 服务器
一个独立的 Kafka 服务器被称为 broker, broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
在集群中,每个分区都有一个Leader Broker和多个Follower Broker,只有Leader Broker才能处理生产者和消费者的请求,而Follower Broker只是Leader Broker的备份,用于提供数据的冗余备份和容错能力。如果Leader Broker发生故障,Kafka集群会自动将Follower Broker提升为新的Leader Broker,从而实现高可用性和容错能力。
AR、ISR、OSR
- 分区中的所有副本统称为AR。
- 所有与leader副本保持一定同步程度的副本组成ISR。
- 与leader副本同步滞后过多的副本组成OSR。
- AR = ISR +OSR。正常情况 应该AR=ISR,OSR集合为空。
5、 Log 日志存储
一个分区对应一个日志文件(Log),为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log 切分为多个 LogSegment,便于消息的维护和清理。Log在物理上只以(命名为topic-partitiom)文件夹的形式存储,而每个LogSegment对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。
LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment的索引文件和数据文件
- partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值
- 数值大小为64位,20位数据字符长度,没有数字用0填充
消息压缩
一条消息通常不会太大,Kafka 是批量消息压缩,通过compression.type
配置,默认为 producer,还可以配置为gzip、snappy、lz4,uncompressed表示不压缩。
日志索引
Kafka中的索引文件以稀疏索引的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(log.index.interval.bytes
指定,默认4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引文件项和时间戳索引文件项。稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。
日志清理
Kafka提供两种日志清理策略:
- 日志删除:按照一定的保留策略(基于时间、日志大小或日志起始偏移量)直接删除不符合条件的日志分段。
- 日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。
页缓存
页缓存是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问,减少对磁盘IO的操作。
零拷贝
所谓的零拷贝是将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。减少了数据拷贝的次数和内核和用户模式之间的上下文切换。对于Linux操作系统而言,底层依赖于sendfile()方法实现。
一般的数据流程:磁盘 -> 内核 -> 应用 -> Socket -> 网卡,数据复制4次,上下文切换4次。
流程步骤:
- 操作系统将数据从磁盘文件中读取到内核空间的页面缓存。
- 应用程序将数据从内核空间读入用户空间缓冲区。
- 应用程序将读到数据写回内核空间并放入socket缓冲区。
- 操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送。
通过网卡直接去访问系统的内存,就可以实现现绝对的零拷贝了。这样就可以最大程度提高传输性能。通过“零拷贝”技术,我们可以去掉那些没必要的数据复制操作, 同时也会减少上下文切换次数。
通过上图可以看到,零拷贝技术只用将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的订阅者时,都可以使用同一个页面缓存),避免了重复复制操作。
6、ZooKeeper
ZooKeeper是Kafka集群中使用的分布式协调服务,用于维护Kafka集群的状态和元数据信息,例如主题和分区的分配信息、消费者组和消费者偏移量等。
3、什么是 Kafka 的重平衡机制
Kafka 的重平衡(Rebalance)机制是指在消费者组(Consumer Group)中,当消费者数量发生变化(如新增、减少或崩溃)时,Kafka 重新分配分区(Partition)给消费者的过程。重平衡的目的是确保每个分区只被组内的一个消费者消费,从而实现负载均衡和高可用性。
重平衡的个触发条件:
- 消费者加入组:新的消费者加入消费者组。
- 消费者离开组:消费者主动离开(如关闭)或崩溃。
- 订阅主题变化:消费者组订阅的主题或分区数量发生变化。
- 分区数量变化:主题的分区数量发生变化。
重平衡的过程
- 选举协调者:消费者组中的某个 Broker 被选为协调者,负责管理重平衡过程。
- 发送加入组请求:所有消费者向协调者发送加入组请求(JoinGroup Request)。
- 选举领导者:协调者从消费者中选出一个领导者(Leader),其他消费者成为跟随者(Follower)。
- 分配分区:领导者根据分区分配策略(如 RangeAssignor、RoundRobinAssignor 等)为每个消费者分配分区,并将分配结果发送给协调者。
- 同步分配结果:协调者将分配结果同步给所有消费者,消费者根据分配结果开始消费。
重平衡的影响
- 消费暂停:在重平衡期间,消费者会暂停消费,直到分配完成。
- 性能开销:频繁的重平衡会增加集群的负载,影响整体性能。
- 重复消费:重平衡可能导致消费者重新读取已处理的消息,造成重复消费。
减少重平衡的策略
- 优化会话超时:合理设置 session.timeout.ms 和 heartbeat.interval.ms,避免因网络延迟导致的误判。
- 减少消费者变动:尽量避免频繁地启动或关闭消费者。
- 使用静态成员资格:Kafka 2.3+ 支持静态成员资格(Static Membership),减少因消费者短暂离线触发的重平衡。
- 优化分区分配策略:根据业务需求选择合适的分配策略,减少不必要的重平衡。
Kafka 的重平衡机制确保消费者组在变化时能重新分配分区,实现负载均衡和高可用性。尽管重平衡会带来一定的性能开销,但通过合理配置和优化,可以显著减少其影响。
4、Kafka 几种选举过程
- 控制器(Controller)选举
集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器,其他broker启动时
会去尝试读取/controller节点的brokerid的值,读取到的brokerid的值不为-1知道已经有其他broker节点成功竞选为控制器,就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。
- 分区领导者(Partition Leader)选举
controller感知到分区leader所在的broker挂了,controller会从replicas副本列表(同时在ISR列表里)中取出第一个broker作为leader。
分区领导者负责处理读写请求,选举过程由控制器管理。触发选举的情况包括:
- 分区领导者崩溃
- 分区副本不再同步
- 手动触发选举
- 消费者组领导者(Consumer Group Leader)选举
GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法很简单,当消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果当前leader退出消费组,则会挑选以HashMap结构保存的消费者节点数据中,第一个键值对来作为leader。
在消费者组中,协调者负责管理消费者和分区分配。选举过程如下:
- 消费者加入组时,向协调者发送加入请求。
- 协调者从消费者中选出一个领导者,负责分区分配。
- 领导者根据分配策略分配分区,并将结果发送给协调者。
- 协调者将分配结果同步给所有消费者。
- ZooKeeper 领导者选举(如果使用 ZooKeeper)
在依赖 ZooKeeper 的 Kafka 集群中,ZooKeeper 通过 Zab 协议选举领导者,确保其自身的高可用性。Kafka Broker 利用 ZooKeeper 进行元数据管理和选举。
5、Kafka 高水位了解过吗
Kafka 的高水位(High Watermark,HW)是一个关键概念,用于确保数据的一致性和可靠性。高水位机制在 Kafka 的分区副本管理中起着重要作用,尤其是在保证消息的持久化和消费者的可见性方面。
什么是高水位(High Watermark)
高水位是分区中已成功复制到所有同步副本(ISR,In-Sync Replicas)的消息的偏移量(Offset)。它表示消费者可以安全读取的消息范围,即消费者只能读取到高水位之前的消息。
- 高水位之前的消息:这些消息已经被所有 ISR 副本确认,是已提交(Committed)的消息,消费者可以安全消费。
- 高水位之后的消息:这些消息尚未被所有 ISR 副本确认,可能丢失,因此对消费者不可见。
高水位标识了一个特定的消息偏移量(offset),即一个分区中已提交消息的最高偏移量(offset),消费者只能拉取到这个offset之前的消息。消费者可以通过跟踪高水位来确定自己消费的位置。
高水位的作用
- 消费进度管理:消费者可以通过记录上一次消费的偏移量,然后将其与分区的高水位进行比较,来确定自己的消费进度。消费者可以在和高水位对北比之后继续消费新的消息,确保不会错过任何已提交的消息。这样,消费者可以按照自己的节奏进行消费,不受其他消费者的影响。
- 数据的可靠性:高水位还用于确保数据的可靠性。在Kafka中,只有消息被写入主副本并被所有的同步副本ISR确认后,才被认为是已提交的消息。高水位表示已经被提交的消息的边界。只有高水位之前的消息才能被认为是已经被确认的,其他的消息可能会因为副本故障或其他原因而丢失。
高水位的工作原理
- 生产者写入消息:生产者将消息发送到分区的领导者(Leader),领导者将消息写入本地日志。领导者将消息复制到所有 ISR 副本。
- 更新高水位:当所有 ISR 副本都成功复制了某条消息后,领导者会更新高水位。高水位是 ISR 副本中最小日志结束偏移量(LEO,Log End Offset)的最小值。
- 消费者读取消息:消费者只能读取到高水位之前的消息。如果消费者尝试读取高水位之后的消息,会被阻塞,直到这些消息被提交。
高水位更新示例
假设一个分区有 3 个副本(Leader 和 2 个 Follower),它们的 LEO 分别为:
Leader LEO = 10
Follower1 LEO = 9
Follower2 LEO = 8
此时高水位为 8,因为它是 ISR 副本中 LEO 的最小值。消费者只能读取偏移量 0 到 7 的消息。
高水位与 LEO(Log End Offset)的关系
-
LEO(Log End Offset):LEO 是分区副本中下一条待写入消息的偏移量。每个副本(包括领导者和跟随者)都有自己的 LEO。
-
高水位(HW):高水位是 ISR 副本中 LEO 的最小值。它表示已提交消息的边界。
当消费者消费消息时,它可以使用高水位作为参考点,只消费高水位之前的消息,以确保消费的是已经被确认的消
息,从而保证数据的可靠性。如上图,只消费offet为6之前的消息。
我们都知道,在Kafka中,每个分区都有一个Leader副本和多个Follower副本。当Leader副本发生故障时,Kafka会选择一个新的Leader副本。这个切换过程中,需要保证数据的一致性,即新的Leader副本必须具有和旧Leader副本一样的消息顺序。为了实现这个目标,Kafka引入了Leader Epoch
的概念。
Leader Epoch的过程
- 每个分区都有一个初始的Leader Epoch,通常为0。
- 当Leader副本发生故障或需要进行切换时,Kafka会触发副本切换过程。
- 副本切换过程中,Kafka会从ISR同步副本)中选择一个新的Follower副本作为新的Leader副本。
- 新的Leader副本会增加自己的Leader Epoch,使其大于之前的Leader Epoch。这表示进入了一个新的任期。
- 新的Leader副本会验证旧Leader副本的状态以确保数据的一致性。它会检查旧Leader副本的Leader Epoch和高水位。
- 如果旧Leader副本的Leader Epoch小于等于新Leader副本的Leader Epoch,并且旧Leader副本的高水位小于等于新Leader副本的高水位,则验证通过。
- 验证通过后,新的Leader副本开始从旧Leader副本复制数据。它只会接受旧Leader副本的Leader Epoch和高水位之前的消息。
- 一旦新的Leader副本复制了旧Leader副本的所有数据,并达到了与旧Leader副本相同的高水位,副本切换过程就完成了。
6、Kafka 如何保证消息不丢失
- 消息确认机制
/*** (1)acks=0:生生产者不等待确认,消息可能丢失,其实就是保证消息不会重复发送或者重复消费,但是速度最快。同时重试配置不会发生作用。* (2)acks=1:默认值,领导者确认后即认为消息已发送,但若领导者在同步前崩溃,消息可能丢失。* (3)acks=all或acks=-1:所有同步副本确认后,消息才被认为已提交,确保消息不丢失。*/
props.put(ProducerConfig.ACKS_CONFIG, "all");/*** 如果请求失败,生产者会自动重试,如果启用重试*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);/*** 消息发送超时或失败后,间隔的重试时间*/
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
并且生产者调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。所以,我们不能默认在调用send方法发送消息之后消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。可以采用为其添加回调函数的形式,如果消息发送失败的话,可以对失败消息做记录,我们检查失败的原因之后重新发送即可!
// 异步发送消息
producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("发送成功");} else {System.out.println("发送失败");}if (metadata != null) {System.out.println("异步方式发送消息结果:" + "topic‐" + metadata.topic() + "|partition‐"+ metadata.partition() + "|offset‐" + metadata.offset());}}
});
- 副本机制和持久化存储
复制机制:Kafka使用复制机制来保证数据的可靠性。每个分区都有多个副本,副本可以分布在不同的节点上,当一个节点宕机时,其他节点上的副本仍然可以提供服务,保证消息不丢失。
ISR机制:Kafka使用ISR机制来确保消息不会丢失。ISR是指已经复制了数据并与主节点保持同步的节点集合,只有SR中的节点才会被认为是“可用”的节点,只有在ISR中的节点上的副本才会被认为是“可用”。
持久化存储:Kafka 将消息持久化存储在磁盘上,即使 Broker 重启,消息也不会丢失。Kafka 的日志结构设计支持高效的消息写入和读取。
在服务端,也有一些参数配置可以调节来避免消息丢失:
replication.factor //表示分区副本的个数,replication.factor>1 当1eader副本挂了,follower副本会被选举为leader继续提供服务。
min.insync.rep1icas //表示ISR最少的副本数量,通常设置min.insync.replicas>1,这样才有可用的fol1ower副本执行替换,保证消息不丢
unclean.leader.election.enable=false //是否可以把非ISR集合中的副本选举为leader副本。
- 消费者偏移量管理
消费者通过提交偏移量来记录消费进度。Kafka 提供自动和手动提交偏移量的方式,确保消费者在崩溃后能从正确的位置继续消费,避免消息丢失。
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。
while (true) {/*** poll() API 是拉取消息的长轮询 比如设置了1000毫秒 并不是在这1秒钟内只拉取一次 而是当没有拉取到数据时 会多次拉取数据 直到拉取到数据 然后继续循环*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());}if (records.count() > 0) {// 手动同步提交offset,当前线程会阻塞直到offset提交成功// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了// consumer.commitSync();// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for " + offsets);System.err.println("Commit failed exception: " + exception.getStackTrace());}}});}
}
这种情况的解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后之后再自己手动提交 offset 。但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
- 高水位机制
Kafka 使用高水位(High Watermark)机制确保消费者只能读取已提交的消息,避免读取未完全复制的消息。
- 数据保留策略
Kafka 允许配置消息的保留时间和大小,确保在指定时间内或达到大小限制前,消息不会被删除。
7、Kafka 如何保证消息不重复消费
- 消费者偏移量管理
Kafka 消费者通过提交偏移量(Offset)来记录消费进度。如果偏移量提交不当,可能导致重复消费。为了避免重复消费,可以采取以下策略:
-
手动提交偏移量:
- 消费者在处理完消息后,手动提交偏移量(enable.auto.commit=false)。
- 确保消息处理和偏移量提交在同一个事务中,避免消息处理成功但偏移量未提交的情况。
-
同步提交:
- 使用同步提交(commitSync())而不是异步提交(commitAsync()),确保偏移量提交成功后再继续消费。
-
幂等性处理:
- 在消费者端实现幂等性逻辑,即使消息重复消费,也不会对系统产生影响。
- 事务机制
Kafka 从 0.11 版本开始支持事务机制,可以实现精确一次语义(Exactly-Once Semantics)。通过事务机制,生产者和消费者可以确保消息的精确一次处理。
-
生产者事务:
- 生产者开启事务,将消息发送和偏移量提交放在同一个事务中。
- 如果事务失败,消息和偏移量都不会提交。
-
消费者事务:
- 消费者在消费消息时,可以将消息处理和偏移量提交放在同一个事务中。
- 如果事务失败,消息处理和偏移量提交都会回滚。
-
配置:
- 生产者配置:enable.idempotence=true 和 transactional.id。
- 消费者配置:isolation.level=read_committed,确保只读取已提交的消息。
- 幂等性生产者
Kafka 生产者支持幂等性(Idempotence),可以避免消息重复发送。
- 配置:
- 设置 enable.idempotence=true,生产者会为每条消息分配唯一的序列号(Sequence Number),Broker 会根据序列号去重。
- 作用:
- 即使在网络重试的情况下,Broker 也不会重复存储相同的消息。
- 消费者组的重平衡
在消费者组发生重平衡(Rebalance)时,可能会导致部分消息重复消费。为了减少重复消费,可以采取以下措施:
-
减少重平衡频率:
- 优化消费者组的配置,如 session.timeout.ms 和 heartbeat.interval.ms,避免不必要的重平衡。
-
静态成员资格:
- 使用 Kafka 2.3+ 的静态成员资格(Static Membership)功能,减少因消费者短暂离线触发的重平衡。
- 外部存储去重
如果 Kafka 本身无法完全避免重复消费,可以在消费者端使用外部存储(如数据库、Redis)实现去重:
-
记录已处理消息:
- 在外部存储中记录已处理消息的唯一标识(如消息 ID 或偏移量)。
- 在处理消息前,先检查该消息是否已处理。
-
实现幂等性:
- 在消费者端实现幂等性逻辑,确保即使消息重复消费,也不会对系统产生影响。
- 消息唯一标识
为每条消息分配唯一标识(如消息 ID),在消费者端根据唯一标识去重:
- 生产者生成唯一 ID:
- 生产者在发送消息时,为每条消息生成唯一 ID。
- 消费者去重:
- 消费者在处理消息时,检查唯一 ID 是否已处理。
8、Kafka 为什么这么快
-
批处理(Batching):生产者将多条消息打包成一个批次(Batch)发送,减少了网络请求的次数。通过配置 linger.ms 和 batch.size 参数,可以优化批处理的大小和延迟。消费者一次拉取多个消息,减少了网络往返时间(RTT)和系统调用次数。
-
分区和并行化:Kafka 将主题(Topic)划分为多个分区(Partition),每个分区可以独立读写。分区机制允许生产者和消费者并行操作,提高了吞吐量。每个分区可以有多个副本(Replica),副本之间并行同步数据。
-
高效的存储格式:Kafka 将日志文件划分为多个固定大小的段(Segment),便于管理和清理。个段文件以偏移量命名,方便快速定位和读取。Kafka 为每个日志段维护一个索引文件,支持快速查找消息。
-
压缩机制:消息压缩,Kafka 支持多种压缩算法(如 Snappy、Gzip、LZ4),减少网络传输和磁盘存储的开销。Kafka 支持多种压缩算法(如 Snappy、Gzip、LZ4),减少网络传输和磁盘存储的开销。
-
零拷贝技术(Zero-Copy):传统的数据传输需要多次拷贝:磁盘 -> 内核缓冲区 -> 用户缓冲区 -> 网络缓冲区。Kafka 使用零拷贝技术,通过 sendfile 系统调用直接将数据从磁盘文件传输到网络通道,避免了用户空间和内核空间之间的数据拷贝,大幅减少了 CPU 开销和上下文切换。