概述
每当我们调用Kafka的poll()方法或者使用Spring的@KafkaListener(其实底层也是poll()方法)注解消费Kafka消息时,它都会返回之前被写入Kafka的记录,即我们组中的消费者还没有读过的记录。
这意味着我们有一种方法可以跟踪该组消费者读取过的记录。 如前所述,Kafka的一个独特特征是它不会像许多JMS队列那样跟踪消费过的记录。相反,它允许消费者使用Kafka跟踪每个分区中的位置(位移,也称偏移量)。
我们将更新分区中当前位置的操作称为提交位移(commits offset)。
offset的作用和意义
offset 是 partition 中每条消息的唯一标识,是一个单调递增且不变的值,由 kafka 自动维护,offset 用于定位和记录消息在 partition 中的位置和消费进度,保证 partition 内的消息有序。
offset 是 Kafka 为每条消息分配的一个唯一的编号,它表示消息在分区中的顺序位置。offset 是从 0 开始的,每当有新的消息写入分区时,offset 就会加 1。offset 是不可变的,即使消息被删除或过期,offset 也不会改变或重用。
offset 的作用主要有两个:
- 一是用来定位消息。通过指定 offset,消费者可以准确地找到分区中的某条消息,或者从某个位置开始消费消息。
- 二是用来记录消费进度。消费者在消费完一条消息后,需要提交 offset 来告诉 Kafka broker 自己消费到哪里了。这样,如果消费者发生故障或重启,它可以根据保存的 offset 来恢复消费状态。
offset在语义上拥有两种:Consumer Offset和Committed Offset。
Consumer & Committed
Consumer Offset
- 每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset),它是消费者消费进度的指示器。Consumer Offset保存在消费者客户端中,表示当前消费序号,仅在poll()方法中使用。例如:
消费者第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。
这样消费者下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。
这样就能够保证每次poll消息时,都能够收到不重复的消息。
- 看上去Offset 就是一个数值而已,其实对于 Consumer Group 而言,它是一组 KV 对,Key 是分区,V 对应 Consumer 消费该分区的最新位移 TopicPartition->long
- 不过切记的是消费者位移是下一条消息的位移,而不是目前最新消费消息的位移。
Committed Offset
Committed Offset是保存在kafka客户端上,主要通过commitSync
和commitAsync
API操作,SpringBoot集成Kafka中是使用ack.acknowledge()
。若消费者poll了消息但是不调用API,Committed Offset依旧为0。
Committed Offset主要用于Consumer Rebalance。在Consumer Rebalance的过程中,一个分区分配给了一个消费者,消费者将从Committed Offset记录的序号后开始消费。又或者消费者调用了poll消费了5条消息并调用API更新了Committed Offset,然后宕机了过了一会儿又重启了,消费者也可以通过Committed Offset得知从第6条消息消费。
Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的。Group Coordinator是运行在Kafka集群中每一个Broker内的一个进程。它主要负责Consumer Group的管理,Offset位移管理以及Consumer Rebalance。
offset 的存储和管理
offset 的存储和管理主要涉及到两个方面:生产者端和消费者端。
生产者端
生产者在向 Kafka 发送消息时,可以指定一个分区键(Partition Key),Kafka 会根据这个键和分区算法来决定消息应该发送到哪个分区。如果没有指定分区键,Kafka 会采用轮询或随机的方式来选择分区。生产者也可以自定义分区算法。
当消息被写入到分区后,Kafka broker 会为消息分配一个 offset,并返回给生产者。生产者可以根据返回的 offset 来确认消息是否成功写入,并进行重试或其他处理。
消费者端
消费者在消费 Kafka 消息时,需要维护一个当前消费的 offset 值,以及一个已提交的 offset 值。当前消费的 offset 值表示消费者正在消费的消息的位置,已提交的 offset 值表示消费者已经确认消费过的消息的位置。
消费者在消费完一条消息后,需要提交 offset 来更新已提交的 offset 值。提交 offset 的方式有两种:自动提交和手动提交。
无论是自动提交还是手动提交,offset 的实际存储位置都是在 Kafka 的一个内置主题中:__consumer_offsets。这个主题有 50 个分区(可配置),每个分区存储一部分消费组(Consumer Group)的 offset 信息。Kafka broker 会根据消费组 ID 和主题名来计算出一个哈希值,并将其映射到 __consumer_offsets 主题的某个分区上。__consumer_offsets主题包含每个分区需要提交的偏移量。 但是,如果消费者组的消费者崩溃或新的消费者加入消费者组,这将触发重新平衡(rebalance),即消费者组内的消费者负责的分区会发生变化。 在重新平衡之后,可以为每个消费者分配一组新的分区而不是之前处理的分区。 然后消费者将读取每个分区的已提交偏移量并从那里继续。
__consumer_offsets 主题是 Kafka 0.9.0 版本引入的新特性,之前的版本是将 offset 存储在 Zookeeper 中。但是 Zookeeper 不适合大量写入,因此后来改为存储在 Kafka 自身中,提高了性能和可靠性。
自动提交(Automatic Commit)
Kafka 提供了一个配置参数 enable.auto.commit,默认为 true,表示开启自动提交功能。自动提交功能会在后台定期(由 auto.commit.interval.ms 参数控制)将当前消费的 offset 值提交给 Kafka broker。
提交偏移量的最简单方法是允许消费者来完成。 如果配置 enable.auto.commit=true
,则消费者每五秒钟将提交客户端从poll()收到的最大偏移量。 五秒间隔是默认值,可通过设置auto.commit.interval.ms
来控制。 就像消费者中的其他机制一样,自动提交由poll loop驱动。 无论您何时轮询,消费者都会检查是否需要提交,如果是,它将提交它在上次轮询中返回的偏移量。
它实际保证的是位移至少要隔一段时间才会提交,如果你是单线程处理消息,那么只有处理完消息后才会提交位移,可能远比你设置的间隔长,因为你的处理逻辑可能需要一定的时间。
提交的时机
- Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。
- 从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。
- 但自动提交位移的一个问题在于,它可能会出现重复消费,如果处理失败了下次开始的时候就会从上一次提交的offset 开始处理
存在的问题
数据重复写入
假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交位移,位移主题中会不停地写入位移 =100 的消息。显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。
位移提交和rebalance
虽然自动提交很方便,但是它也有一定的不足。
请注意,默认情况下,自动提交每五秒钟发生一次。 假设我们在最近的提交之后三秒钟并且触发了重新平衡。 在重新平衡之后,所有消费者将从最后提交的偏移开始消费。 在这种情况下,偏移量是三秒钟之前的偏移量,因此在这三秒内到达的所有事件将被处理两次。 可以将提交间隔配置为更频繁地提交并减少记录将被复制的窗口,但是不可能完全消除它们。这是自动提交机制的一个缺陷(其实就是重复消费的问题)。
启用自动提交后,对poll的调用将始终提交上一轮询返回的最后一个偏移量。 它不知道实际处理了哪些事件,因此在再次调用poll()之前,始终处理完poll()返回的所有事件至关重要, 因为和poll()一样,close()方法也会自动提交偏移量。
其实仔细思考,手动提交也存在这个问题,因为rebalance会先让所以的消费者停止消费,因为在kafka的角度来看,消息消费的那一刻,消费已经完成,所以停止消费的时候,你的逻辑很可能没有完成,那么你的offset 也很可能没有提交。在rebalance后分区重新分配的消费者会重新从服务端获取分区的offset值,此时可能是消费端提交前的offset,也会产生重复消费问题。
自动提交很方便,但它们不能给开发人员足够的控制以避免重复的消息。
手动提交
如果 enable.auto.commit 设置为 false,则表示关闭自动提交功能,此时消费者需要手动调用 commitSync 或 commitAsync 方法来提交 offset。手动提交功能可以让消费者更灵活地控制何时以及如何提交 offset。
- 很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。这就引出了另一种位移提交方式:手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false,Kafka Consumer API 为你提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。
同步手动提交
实现方案
- 设置
enable.auto.commit
为 false - 代码中手动提交
public static void main(String[] args) {while (true) {// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));records.forEach((ConsumerRecord<String, String> record) -> {// 模拟消息的处理逻辑System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());});try {//处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息consumer.commitSync();} catch (CommitFailedException e) {e.printStackTrace();}}
}
SrpingBoot中是通过ack.acknowledge()
达到手动提交的目的。
@KafkaListener(topics = "order", groupId = "order_group")
public void consume(ConsumerRecord<?, ?> record, Acknowledgment ack) {System.out.println("Received: " + record);ack.acknowledge();
}
存在的问题
从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。
commitSync()的问题在于,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束,需要注意的是同步提交会在提交失败之后进行重试。
在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS
异步手动提交
实现方案
- 设置
enable.auto.commit
为 false - 代码中异步提交
下面都是三个测试用例都是异步提交,不同之处在于有没有去实现回调函数。建议生产环境中一定要实现,至少记录下日志。
@Test
public void asynCommit1(){while (true) {// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());});consumer.commitAsync();}
}@Test
public void asynCommit2(){while (true) {// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());});// 异步回调机制consumer.commitAsync(new OffsetCommitCallback(){@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception!=null){System.out.println(String.format("提交失败:%s", offsets.toString()));}}});}
}@Test
public void asynCommit3(){while (true) {// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());});consumer.commitAsync((offsets, exception) ->{if (exception!=null){System.out.println(String.format("提交失败:%s", offsets.toString()));}});}
}
从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。
存在的问题
commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的,所以只要在程序停止前最后一次提交成功即可。
这里提供一个解决方案,那就是不论成功还是失败我们都将offsets信息记录下来,如果最后一次提交成功那就忽略,如果最后一次没有提交成功,我们可以在下次重启的时候手动指定offset。
综合异步和同步来提交
try {while (true) {// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());});consumer.commitAsync();}
} catch (CommitFailedException e) {System.out.println(String.format("提交失败:%s", e.toString()));
} finally {consumer.commitSync();
}
同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。
精细化提交(分批提交)
设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。
对于一次要处理很多消息的 Consumer 而言,它会关心社区有没有方法允许它在消费的中间进行位移提交。比如前面这个 5000 条消息的例子,你可能希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费。
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
while (true) {// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));for (ConsumerRecord<String, String> record : records) {// 数据的处理逻辑System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());// 记录下offset 信息offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));if (count % 100 == 0) {// 回调处理逻辑是nullconsumer.commitAsync(offsets, null);}count++;}try {//处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息consumer.commitSync(offsets);} catch (CommitFailedException e) {e.printStackTrace();}
}
实际应用场景提交方式选择
在实际应用中,我们通常会根据业务需求的不同,选择不同的offset提交方式。
- 如果我们的业务对数据的一致性要求较高,不允许数据的丢失或重复,那么我们应该选择手动提交offset。在手动提交offset的模式下,我们可以在处理完消息后,再提交offset,从而避免数据的重复消费。同时,我们还可以在处理消息和提交offset之间,增加一些容错机制,例如将消息持久化到数据库等,从而避免数据的丢失。
- 如果我们的业务对数据的一致性要求较低,更注重系统的吞吐量,那么我们可以选择自动提交offset。在自动提交offset的模式下,消费者会在消费消息后的一段时间内,自动提交offset,无需手动管理,从而简化了编程模型。
offset 的提交和重置
提交 offset 是消费者在消费完一条消息后,将当前消费的 offset 值更新到 Kafka broker 中的操作。提交 offset 的目的是为了记录消费进度,以便在消费者发生故障或重启时,能够从上次消费的位置继续消费。
重置 offset 是消费者在启动或运行过程中,将当前消费的 offset 值修改为其他值的操作。重置 offset 的目的是为了调整消费位置,以便在需要重新消费或跳过某些消息时,能够实现这个需求。
提交 offset
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
- 位移提交的方式
- 从用户的角度来说,位移提交分为自动提交和手动提交;
- 从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
- 大数据组件都关闭了自动提交,采取了手动提交。
前面已经介绍过自动提交和手动提交这两种方式的区别和用法,这里不再赘述。需要注意的是,无论是自动提交还是手动提交,都不保证提交成功。因为 Kafka broker 可能发生故障或网络延迟,导致提交失败或延迟。因此,消费者需要处理提交失败或延迟的情况。
提交失败:如果提交失败,消费者可以选择重试或放弃。重试的话,可能会导致多次提交同一个 offset 值,但是不会影响正确性,因为 Kafka broker 会忽略重复的 offset 值。放弃的话,可能会导致下次启动时重新消费已经消费过的消息,但是不会影响完整性,因为 Kafka 消息是幂等的。
提交延迟:如果提交延迟,消费者可以选择等待或继续。等待的话,可能会导致消费速度变慢,或者超过 session.timeout.ms
参数设置的时间而被认为已经死亡。继续的话,可能会导致下次启动时漏掉一些没有提交成功的消息。
重置 offset
重置 offset 的方式有两种:手动重置和自动重置。
-
手动重置是指消费者主动调用
seek
或seekToBeginning
或seekToEnd
方法来修改当前消费的 offset 值。手动重置可以让消费者精确地控制从哪个位置开始消费。例如,如果想要重新消费某个分区的所有消息,可以调用seekToBeginning
方法将 offset 设置为 0;如果想要跳过某个分区的所有消息,可以调用seekToEnd
方法将 offset 设置为最大值;如果想要从某个具体的位置开始消费,可以调用seek
方法将 offset 设置为任意值。 -
自动重置是指消费者在启动时根据
auto.offset.reset
参数来决定从哪个位置开始消费。消费者配置auto.offset.reset
表示Kafka中没有存储对应的offset信息的(有可能offset信息被删除),亦或者offset所处位置信息过期了的情况,消费者从何处开始消费消息。auto.offset.reset
参数有三个可选值:earliest, latest 和 none。
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
offset 的消费和保证
offset 的消费和保证主要涉及到两个方面:顺序性和一致性。
顺序性
顺序性是指 Kafka 消息是否按照发送和接收的顺序进行处理。Kafka 只保证分区内的顺序性,即同一个分区内的消息按照 offset 的顺序进行发送和接收。但是不保证主题内或跨主题的顺序性,即不同分区内的消息可能会乱序发送和接收。因此,如果需要保证主题内或跨主题的顺序性,需要在生产者和消费者端进行额外的处理,例如使用同一个分区键或同一个消费组。
一致性
一致性是指 Kafka 消息是否能够被正确地发送和接收,不会出现丢失或重复的情况。Kafka 提供了三种不同级别的一致性保证:最多一次(At most once),最少一次(At least once)和精确一次(Exactly once)。
- 最多一次:最多一次是指 Kafka 消息只会被发送或接收一次或零次,不会出现重复的情况,但是可能会出现丢失的情况。这种保证的实现方式是在生产者端关闭重试功能,在消费者端在消费消息之前提交 offset。这种保证适用于对消息丢失不敏感的场景,例如日志收集或监控。
- 最少一次:最少一次是指 Kafka 消息只会被发送或接收一次或多次,不会出现丢失的情况,但是可能会出现重复的情况。这种保证的实现方式是在生产者端开启重试功能,在消费者端在消费消息之后提交 offset。这种保证适用于对消息重复不敏感的场景,例如计数或累加。
精确一次:精确一次是指 Kafka 消息只会被发送或接收一次,不会出现丢失或重复的情况。这种保证的实现方式是在生产者端和消费者端使用事务功能,在消费者端使用幂等功能。这种保证适用于对消息丢失和重复都敏感的场景,例如转账或支付。
重复消费、消息丢失
重复消费
如果提交的偏移量小于客户端处理的最后一条消息的偏移量,那么最后处理的偏移量与提交的偏移量之间的消息将被处理两次。
如下图:
消息丢失
如果提交的偏移量大于客户端实际处理的最后一条消息的偏移量,那么消费者组将忽略上次处理的偏移量与提交的偏移量之间的所有消息。如下图:
CommitFailedException 异常处理
产生原因
从源代码方面来说,CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时。因为KafkaConsumer.commitSync()有重试机制,所以一般的网络原因可以排除,发生这个异常的原因主要就是超时了,但是这个超时不是说提交本身超时了,而是消息的处理时间超长,导致发生了Rebalance,已经将要提交位移的分区分配给了另一个消费者实例。
熟悉的错误:
Exception in thread “main” org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
解决方案
缩短单条消息处理的时间
比如之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
增加 Consumer 端允许下游系统消费一批消息的最大时长
这取决于 Consumer 端参数 max.poll.interval.ms
的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。
值得一提的是,Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API,那么你需要增加 session.timeout.ms
参数的值。不幸的是,session.timeout.ms
参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,这也是社区在 0.10.1.0 版本引入 max.poll.interval.ms
参数,将这部分含义从 session.timeout.ms
中剥离出来的原因之一。
减少下游系统一次性消费的消息总数
这取决于 Consumer 端参数 max.poll.records
的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。
可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。
如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。
下游系统使用多线程来加速消费
这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。
之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。
如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。
事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsu
消费端消费后不提交offset情况的分析总结
故最近在使用kafka的过程中遇到了一个疑问,在查阅了一些资料和相关blog之后,做一下总结和记录。
问题:消费者在消费消息的过程中,配置参数
spring.kafka.listener .ackMode
设置为不自动提交offset,在消费完数据之后如果不手动提交offset,那么在程序中和kafak中的数据会如何被处理呢?
spring.kafka.listener.ackMode
:指定消息确认模式,包括 RECORD、BATCH 和 MANUAL_IMMEDIATE等。可根据需求选择不同的确认模式,用于控制消息的确认方式。
ackMode是个枚举类型:
- RECORD
每处理一条commit一次 - BATCH(默认)
每次poll的时候批量提交一次,频率取决于每次poll的调用频率 - TIME
每次间隔ackTime的时间去commit - COUNT
累积达到ackCount次的ack去commit - COUNT_TIME
ackTime或ackCount哪个条件先满足,就commit - MANUAL
处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。最终也是批量提交。 - MANUAL_IMMEDIATE
每次处理完业务,手动调用Acknowledgment.acknowledge()后立即提交
参考Kafka系列之SpringBoot集成Kafka
————————————————————————————————————————————————————————————
首先简单的介绍一下消费者对topic的订阅。
- 客户端的消费者订阅了topic后,如果是单个消费者,那么消费者会顺序消费这些topic分区中的数据;
- 如果是创建了消费组有多个消费者,那么kafak的服务端将这些topic平均分配给每个消费者。比如有2个topic,每个topic有2个分区,总共有4个分区,如果一个消费组开了四个消费者线程,那么每个消费者将被分配一个分区进行消费。一般建议是一个消费组里的消费者的个数与订阅的topic的总分区数相等,这样可以达到最高的吞吐量。如果消费者的个数大于订阅的topic的总分区,那么多出的消费者将分配不到topic的分区,等于是白白创建了一个消费者线程,浪费资源。
我们进入正题,对开头提出的问题的总结如些:
注意:以下情况均基于kafka的消费者关闭自动提交offset的条件下。亦是基于同一个消费者组的情况,因为不同的消费者组之间,他们彼此的offset偏移量是完全独立的。
-
如果消费端在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。
-
如果在消费的过程中有几条或者一批数据数据没有提交offset(比如异常情况程序没有走到手动提交的代码),后面其他的消息消费后正常提交offset至服务端,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序或者rebalance也不会重新消费。
-
消费端如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你没有提交offset的消息时你新增或者减少消费端,此时会发生rebalance现象,即可再次消费到这个未提交offset的数据,产生重复消费问题。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在发生rebalance现象之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。
总结就是如果消费端不提交或者抛异常,相当于一直没有提交offset,在此程序运行过程中不会重复消费。除非是重启,或者有新的消费者退出或者加入导致重新平衡的时候才会再次触发消费;而且有新的消费端正常消费且提交offset以后,服务端就会更新最新的offset,这样就算程序重启或者重新平衡也不会重新消费。