分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

文章目录

    • 1. 自动提交消费位移
    • 2. 自动提交消费位移存在的问题?
    • 3. 手动提交消费位移
      • 1. 同步提交消费位移
      • 2. 异步提交消费位移
      • 3. 同步和异步组合提交消费位移
      • 4. 提交特定的消费位移
      • 5. 按分区提交消费位移
    • 4. 消费者查找不到消费位移时怎么办?
    • 5. 如何从特定分区位移处读取消息?
    • 6. 如何优雅地退出轮询循环消费?

1. 自动提交消费位移

最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数:

  • enable.auto.commit:是否开启自动提交 offset 功能,默认为 true;
  • auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒;

如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返回的最大偏移量,即将拉取到的每个分区中最大的消息位移进行提交。提交时间间隔通过 auto.commit.interval.ms 来设定,默认是5秒。与消费者中的其他处理过程一样,自动提交也是在轮询循环中进行的。消费者会在每次轮询时检查是否该提交偏移量了,如果是,就会提交最后一次轮询返回的偏移量。

① 启动消费者消费程序,并设置为自动提交消费者位移的方式:

public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-ni");// 显式配置消费者自动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 显式配置消费者自动提交位移的事件间隔properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,4);// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 订阅主题consumer.subscribe(Arrays.asList("ni"));// 消费数据while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : consumerRecords) {System.out.printf("主题 = %s, 分区 = %d, 位移 = %d, " + "消息键 = %s, 消息值 = %s\n",record.topic(), record.partition(), record.offset(), record.key(), record.value());}}}
}

② 启动生产者程序发送3条消息,消息的内容都为 hello,kafka

③ 查看消费者消费的消息记录:

主题 = ni, 分区 = 0, 位移 = 0, 消息键 = null, 消息值 = hello,kafka
主题 = ni, 分区 = 0, 位移 = 1, 消息键 = null, 消息值 = hello,kafka
主题 = ni, 分区 = 0, 位移 = 2, 消息键 = null, 消息值 = hello,kafka

可以看到,消费者消费分区的最新消息的位移为 offset= 2,即消费者的消息位移为 offset =2;

④ 查看消费者提交的位移:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning[group-ni,ni,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1692168114999, expireTimestamp=None)

可以看到,消费者的消息位移为 offset =2,但是消费者的提交位移为 offset =3;

2. 自动提交消费位移存在的问题?

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用,再均衡完成之后,接管分区的消费者将从最后一次提交的偏移量的位置开始读取消息)。可以通过修改提交时间间隔来更频繁地提交偏移量,缩小可能导致重复消息的时间窗口,但无法完全避免。

在使用自动提交时,到了该提交偏移量的时候,轮询方法将提交上一次轮询返回的偏移量,但它并不知道具体哪些消息已经被处理过了。所以,在再次调用poll()之前,要确保上一次poll()返回的所有消息都已经处理完毕(调用close()方法也会自动提交偏移量)。通常情况下这不会有什么问题,但在处理异常或提前退出轮询循环时需要特别小心。

虽然自动提交很方便,但是没有为避免开发者重复处理消息留有余地。

3. 手动提交消费位移

在Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。

开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false,让应用程序自己决定何时提交偏移量。手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync() 和 commitAsync() 两种类型的方法。

① 同步提交位移是指消费者在提交位移时会阻塞,直到提交完成并收到确认。它会提交 poll() 返回的最新偏移量,提交成功后马上返回,如果由于某些原因提交失败就抛出异常。 commitAsync() 方法有四个不同的重载方法,具体定义如下:

public void commitSync()
public void commitSync(Duration timeout)
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) 
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) 

② 异步提交位移在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。commitAsync方法有三个不同的重载方法,具体定义如下:

public void commitAsync() 
public void commitAsync(OffsetCommitCallback callback) 
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) 

1. 同步提交消费位移

在消费消息的循环中,处理完当前批次的消息后,在轮询更多的消息之前,调用 commitSync() 方法提交当前批次最新的偏移量,这会阻塞当前线程,直到位移提交完成并收到确认。 只要没有发生不可恢复的错误,commitSync() 方法就会一直尝试直至提交成功。如果提交失败,就把异常记录到错误日志里。

public void commitSync()
@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 订阅主题consumer.subscribe(Arrays.asList("topic-01"));// 消费数据while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : consumerRecords) {// 业务处理拉取的消息}try{// 消费者手动提交消费位移:同步提交方式consumer.commitSync();}catch (CommitFailedException exception){log.error("commit failed....");}}}
}

还可以将消费者程序修改为批量处理+批量提交的方式:

@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 订阅主题consumer.subscribe(Arrays.asList("topic-01"));// 消费数据while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));int minSize = 200;List<ConsumerRecord<String, String>> buffer = new ArrayList<>();for (ConsumerRecord<String, String> record : consumerRecords) {buffer.add(record);}try{// 消费者手动提交消费位移:同步提交方式if(buffer.size()>minSize){// 批量处理消息// ...}// 手动提交位移:同步方式consumer.commitSync();}catch (CommitFailedException exception){log.error("commit failed....");}}}
}

上面的示例中将拉取到的消息存入缓存 buffer,等到积累到足够多的时候,也就是大于等于200个的时候,再做相应的批量处理,之后再做批量提交。

commitSync() 方法会根据 poll() 方法拉取的最新位移来进行提交,只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如 CommitFailedException、WakeupException、InterruptException、AuthenticationException、AuthorizationException 等,我们可以将其捕获并做针对性的处理。

需要注意的是,同步提交位移时需要确保在处理完消息后再进行提交,因为 commitSync() 将会提交 poll() 返回的最新偏移量,如果你在处理完所有记录之前就调用了 commitSync(),那么一旦应用程序发生崩溃,就会有丢失消息的风险(消息已被提交但未被处理)。如果应用程序在处理记录时发生崩溃,但 commitSync() 还没有被调用,那么从最近批次的开始位置到发生再均衡时的所有消息都将被再次处理——这或许比丢失消息更好,或许更坏。

2. 异步提交消费位移

同步提交有一个缺点,在broker对请求做出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,则会增加潜在的消息重复。这个时候可以使用异步提交API。只管发送请求,无须等待broker做出响应。

public void commitAsync() 
@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 订阅主题consumer.subscribe(Arrays.asList("topic-01"));// 消费数据while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {// 业务逻辑处理}// 异步提交消费位移consumer.commitAsync();}}
}

在提交成功或碰到无法恢复的错误之前,commitSync() 会一直重试,但commitAsync()不会,这是commitAsync() 的一个缺点。之所以不进行重试,是因为 commitAsync() 在收到服务器端的响应时,可能已经有一个更大的位移提交成功。假设我们发出一个提交位移2000的请求,这个时候出现了短暂的通信问题,服务器收不到请求,自然也不会做出响应。与此同时,我们处理了另外一批消息,并成功提交了位移3000。如果此时 commitAsync() 重新尝试提交位移2000,则有可能在位移3000之后提交成功。这个时候如果发生再均衡,就会导致消息重复。

之所以提到这个问题并强调提交顺序的重要性,是因为 commitAsync() 也支持回调,回调会在broker返回响应时执行。回调经常被用于记录位移提交错误或生成指标,如果要用它来重试提交位移,那么一定要注意提交顺序。

public void commitAsync(OffsetCommitCallback callback)
@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 订阅主题consumer.subscribe(Arrays.asList("topic-01"));// 消费数据while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {// 业务逻辑处理}// 异步提交消费位移consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap, Exception exception) {if(exception!=null){log.info("fail to commit offsets:{}",offsetAndMetadataMap,exception);}}});}}
}

异步提交中如何实现重试:我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的值。在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号的值的大小,如果前者小于后者,则说明有更大的位移已经提交了,不需要再进行本次重试;如果两者相同,则说明可以进行重试提交。

3. 同步和异步组合提交消费位移

一般情况下,偶尔提交失败但不进行重试不会有太大问题,因为如果提交失败是由于临时问题导致的,后续的提交总会成功。如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移;但如果这是发生在消费者被关闭或再均衡前的最后一次提交,则要确保提交是成功的,可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。

@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Arrays.asList("topic-01"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll( Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 业务逻辑处理}// 异步提交位移consumer.commitAsync();}} catch (Exception e) {log.error("Unexpected error", e);} finally {try {// 同步提交位移consumer.commitSync();}finally{consumer.close();}}}
}

4. 提交特定的消费位移

对于采用 commitSync() 的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的。但如果想要更频繁地提交位移该怎么办?如果 poll() 返回了一大批数据,那么为了避免可能因再均衡引起的消息重复,想要在批次处理过程中提交位移该怎么办?这个时候不能只是调用 commitSync() 或commitAsync(),因为它们只会提交消息批次里的最后一个位移。

幸运的是,消费者API允许在调用 commitSync() 和 commitAsync() 时传给它们想要提交的分区和位移:

public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

在这里插入图片描述

如图:消费者的提交位移=当前一次poll拉取的分区消息的最大位移offset + 1,这个提交位移就是下次

@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Arrays.asList("topic-01"));ConcurrentHashMap<TopicPartition,OffsetAndMetadata> offsets = new ConcurrentHashMap<>();int count = 0;while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 消息所属的主题和分区TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());// 消费者提交的消费位移=当前消费消息的位移+1OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);offsets.put(topicPartition, offsetAndMetadata);if(count % 1000 == 0){consumer.commitAsync(offsets,null);}count++;}}}
}

5. 按分区提交消费位移

@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Arrays.asList("topic-01"));while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 获取拉取的消息包含的所有分区列表Set<TopicPartition> partitions = consumerRecords.partitions();for (TopicPartition partition : partitions) {// 获取当前分区要消费的消息List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);// 获取当前分区消息的最大位移long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();// 当前分区的消费位移提交 = 当前分区消息的最大位移 + 1Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumerOffset + 1));consumer.commitSync(topicPartitionOffsetAndMetadataMap);}}}
}

4. 消费者查找不到消费位移时怎么办?

当一个新的消费组建立的时候,它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。当__consumer_offsets 主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。当 Kafka 中没有初始位移或服务器上不再存在当前位移时,该怎么办?

此时会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,auto.offset.reset 参数的取值如下:

  • latest(默认值):表示从分区末尾开始消费消息。
  • earliest: 表示消费者会从起始处,也就是0开始消费。
  • none:查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常。如果能够找到消费位移,那么配置为“none”不会出现任何异常。

如果配置的不是“latest”、“earliest”和“none”,则会报出ConfigException异常。

auto.offset.reset 参数用于指定消费者在启动时,如果找不到消费位移应该从哪里开始消费消息。 如果能够找到消费位移,那么消费者会从该位移处开始消费消息,那么 auto.offset.reset 参数并不会奏效,只有在找不到消费位移时才会生效。如果发生位移越界,即消费位移超出了消息队列中消息的数量或位置范围,那么 auto.offset.reset 参数也会生效。

5. 如何从特定分区位移处读取消息?

如果消费者能够找到消费位移,使用 poll() 可以从各个分区的最新位移处读取消息, 而且提供的 auto.offset.reset 参数也可以在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。但是有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek() 方法正好提供了这个功能,让我们得以追前消费或回溯消费。

public void seek(TopicPartition partition, long offset)
public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)

① seek() 方法中的参数 partition 表示分区,而 offset 参数用来指定从分区的哪个位置开始消费。seek() 方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll() 方法的调用过程中实现的。也就是说,在执行 seek() 方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置:

@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Arrays.asList("topic-01"));// 执行一次poll() 方法完成分区分配的逻辑//  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));Set<TopicPartition> topicPartitions = consumer.assignment();for (TopicPartition topicPartition : topicPartitions) {consumer.seek(topicPartition,10);}while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));// ...}}
}

② 如果 poll() 方法中的参数为0,此方法立刻返回,那么 poll() 方法内部进行分区分配的逻辑就会来不及实施,也就是说,消费者此时并未分配到任何分区,那么 topicPartitions 便是一个空列表。那么这里的 timeout 参数设置为多少合适呢?太短会使分配分区的动作失败,太长又有可能造成一些不必要的等待。我们可以通过 KafkaConsumer的 assignment()方法来判定是否分配到了相应的分区:

@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Arrays.asList("topic-01"));Set<TopicPartition> topicPartitions = consumer.assignment();// 此时说明还未完成分区分配while (topicPartitions.size()==0){consumer.poll(Duration.ofMillis(100));topicPartitions = consumer.assignment();}for (TopicPartition topicPartition : topicPartitions) {// 重置每个分区的消费位置为10consumer.seek(topicPartition,10);}while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));// 消费消息}}
}

③ 如果对未分配到的分区执行seek() 方法,那么会报出 IllegalStateException 的异常。类似在调用subscribe() 方法之后直接调用seek() 方法:

@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Arrays.asList("topic-01"));// 未完成分区分配,直接调用seek方法,重置分区1的消费位置为10consumer.seek(new TopicPartition("topic-01",1),10);while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));// 消费消息}}
}

报错:

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition topic-01-1

④ 如果消费组内的消费者在启动的时候能够找到消费位移,那么消费者就会从该位移处开始消费消息。除非发生位移越界,即消费位移超出了消息队列中消息的数量或位置范围,否则 auto.offset.reset 参数并不会奏效,此时如果想指定从开头或末尾开始消费,就需要seek() 方法的帮助了,指定从分区末尾开始消费:

endOffsets() 方法用来获取指定分区的末尾的消息位置, endOffsets 的具体方法定义如下:

public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout)

其中 partitions 参数表示分区集合,而 timeout 参数用来设置等待获取的超时时间。如果没有指定 timeout 参数的值,那么 endOffsets() 方法的等待时间由客户端参数 request.timeout.ms 来设置,默认值为 30000。与 endOffsets 对应的是 beginningOffset() 方法,一个分区的起始位置起初是0,但并不代表每时每刻都为0,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加,beginningOffsets() 方法的具体定义如下:

public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) 
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)

beginningOffsets() 方法中的参数内容和含义都与 endOffsets() 方法中的一样,配合这两个方法我们就可以从分区的开头或末尾开始消费。其实KafkaConsumer中直接提供了seekToBeginning() 方法和seekToEnd() 方法来实现这两个功能,这两个方法的具体定义如下:

public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)

⑤ 有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如我们想要消费昨天8点之后的消息,这个需求更符合正常的思维逻辑。此时我们无法直接使用seek() 方法来追溯到相应的位置。KafkaConsumer同样考虑到了这种情况,它提供了一个offsetsForTimes() 方法,通过timestamp来查询与此对应的分区位置:

public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)

offsetsForTimes() 方法的参数 timestampsToSearch 是一个Map类型,key为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于 OffsetAndTimestamp 中的 offset 和 timestamp字段。下面的示例演示了 offsetsForTimes() 和 seek() 之间的使用方法,首先通过 offsetsForTimes() 方法获取一天之前的消息位置,然后使用 seek() 方法追溯到相应位置开始消费:

@Slf4j
public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");// 显式配置消费者手动提交位移properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Arrays.asList("topic-01"));Map<TopicPartition,Long> timestampToSearch = new HashMap<>();Set<TopicPartition> topicPartitionSet = consumer.assignment();// 查询的分区以及查询的时间戳for (TopicPartition topicPartition : topicPartitionSet) {timestampToSearch.put(topicPartition,System.currentTimeMillis()-1*24*3600*1000);}// 获取时间戳大于等于待查询时间的第一条消息对应的位置和时间戳Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(timestampToSearch);for (TopicPartition topicPartition : topicPartitionSet) {OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);// seek 方法重置消费的位移if(offsetAndTimestamp != null){consumer.seek(topicPartition,offsetAndTimestamp.offset());}}}
}

⑥ 位移越界也会触发 auto.offset.reset 参数的执行,位移越界是指知道消费位置却无法在实际的分区中查找到,比如原本拉取位置为101(fetch offset 101),但已经越界了(out of range),所以此时会根据 auto.offset.reset 参数的默认值来将拉取位置重置(resetting offset)为100,我们也能知道此时分区中最大的消息 offset 为99。

6. 如何优雅地退出轮询循环消费?

如何优雅地退出轮询循环,如果你确定马上要关闭消费者(即使消费者还在等待一个poll()返回),那么可以在另一个线程中调用consumer.wakeup()。如果轮询循环运行在主线程中,那么可以在ShutdownHook里调用这个方法。需要注意的是,consumer.wakeup() 是消费者唯一一个可以在其他线程中安全调用的方法。调用 consumer.wakeup() 会导致poll()抛出WakeupException,如果调用 consumer.wakeup() 时线程没有在轮询,那么异常将在下一次调用 poll() 时抛出。不一定要处理WakeupException,但在退出线程之前必须调用consumer.close() 。消费者在被关闭时会提交还没有提交的偏移量,并向消费者协调器发送消息,告知自己正在离开群组。协调器会立即触发再均衡,被关闭的消费者所拥有的分区将被重新分配给群组里其他的消费者,不需要等待会话超时。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/95626.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

不花一分钱,利用免费电脑软件将视频MV变成歌曲音频MP3

教程 1.点击下载电脑软件下载地址&#xff0c;点击下载&#xff0c;安装。&#xff08;没有利益关系&#xff0c;没有打广告&#xff0c;只是单纯教学&#xff09; 2.安装完成后&#xff0c;点击格式工厂 3.然后如图所示依次&#xff0c;点击【音频】->【-MP3】 3.然后点击…

C#反编译工具ILSPY

ILSPY ILSpy 是一个开源的.Net程序集浏览器和反编译工具。 Visual Studio 2022附带了默认情况下启用的F12反编译支持&#xff08;使用我们的引擎v7.1&#xff09;。 在Visual Studio 2019中&#xff0c;您必须手动启用F12支持。转到“工具”/“选项”/“文本编辑器”/C#/Adva…

Kotlin协程runBlocking并发launch,Semaphore同步1个launch任务运行

Kotlin协程runBlocking并发launch&#xff0c;Semaphore同步1个launch任务运行 <dependency><groupId>org.jetbrains.kotlinx</groupId><artifactId>kotlinx-coroutines-core</artifactId><version>1.7.3</version><type>pom&…

2023国赛数学建模思路 - 复盘:光照强度计算的优化模型

文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米&#xff0c;宽为12米&…

LVS+keepalived群集

keepalived概述 keepalived软件就是通过vrrp协议来实现高可用功能 vrrp通信原理 vrrp就是虚拟路由冗余协议&#xff0c;它的出现就是为了解决静态路由的单点故障 vrrp是通过一种竞选的一种协议机制&#xff0c;来将路由交给某台vrrp路由器 vrrp用的IP是多播的方式&#xff…

设计模式之原型模式

文章目录 一、介绍二、实现步骤三、案例四、应用五、细胞分裂六、改造细胞分裂逻辑七、总结 一、介绍 原型模式属于创建型设计模式&#xff0c;用于创建重复的对象&#xff0c;且同时又保证了性能。 该设计模式的好处是将对象的创建与调用方分离。 其目的就是**根据一个对象…

【UE】Web Browser内嵌网页在场景中的褪色问题

使用WebBrowser放置在场景中时&#xff0c;网页颜色会出现异常的褪色。 这是因为 Web 浏览器插件以 sRGB 格式输出其颜色数据&#xff0c;而 Widget/3D Widget 需要线性 RGB 格式的数据。 可以通过创建在 3D Widget 中使用的新材质&#xff08;而不是默认的 Widget3DPassthr…

数字化施工:解决传统施工难题,提高施工效率和质量的行业革命

建筑行业是我国国民经济的重要组成部分&#xff0c;也是支柱性产业之一。然而&#xff0c;建筑业同时也是一个安全事故多发的高风险行业。如何加强施工现场的安全管理&#xff0c;降低事故发生的频率&#xff0c;避免各种违规操作和不文明施工&#xff0c;提高建筑工程的质量&a…

chatglm2-6b模型在9n-triton中部署并集成至langchain实践 | 京东云技术团队

一.前言 近期&#xff0c; ChatGLM-6B 的第二代版本ChatGLM2-6B已经正式发布&#xff0c;引入了如下新特性&#xff1a; ①. 基座模型升级&#xff0c;性能更强大&#xff0c;在中文C-Eval榜单中&#xff0c;以51.7分位列第6&#xff1b; ②. 支持8K-32k的上下文&#xff1b…

955 神仙公司名单

你是否想过&#xff0c;有一种公司&#xff0c;每天上班不打卡&#xff0c;没有绩效考核&#xff0c;员工可以带着宠物上班&#xff0c;还有公司专门的健身房和游戏室&#xff1f;这样的公司&#xff0c;真的存在&#xff01;今天我们就来探秘这个传说中的955神仙公司&#xff…

java并发:synchronized锁详解

背景&#xff1a; 在java多线程当中&#xff0c;我们总有遇到过多个线程操作一个共享数据时&#xff0c;而这个最后的代码执行结果并没有按照我们的预期一样得到正确的结果。此时我们就需要让代码执行在操作共享变量时&#xff0c;要等一个线程操作完毕时&#xff0c;另一个线程…

【云原生,k8s】Helm应用包管理器介绍

目录 一、为什么需要Helm&#xff1f; &#xff08;一&#xff09;Helm介绍 &#xff08;二&#xff09;Helm有3个重要概念&#xff1a; &#xff08;三&#xff09;Helm特点 二、Helm V3变化 &#xff08;一&#xff09;架构变化 &#xff08;二&#xff09;自动创建名…

从其他地方复制的内容无法粘贴到idea中

问题描述 提示&#xff1a;这里描述项目中遇到的问题&#xff1a; 使用 idea 开发的时候&#xff0c;从其他地方复制的内容无法粘贴到 idea中&#xff0c;idea的版本是 2023.2 解决方案&#xff1a; 提示&#xff1a;这里填写该问题的具体解决方案&#xff1a; 网上查找资料…

redis — 基于Spring Boot实现redis延迟队列

1. 业务场景 延时队列场景在我们日常业务开发中经常遇到&#xff0c;它是一种特殊类型的消息队列&#xff0c;它允许把消息发送到队列中&#xff0c;但不立即投递给消费者&#xff0c;而是在一定时间后再将消息投递给消费者。延迟队列的常见使用场景有以下几种&#xff1a; 在…

SpringBoot3集成Kafka

标签&#xff1a;Kafka3.Kafka-eagle3&#xff1b; 一、简介 Kafka是一个开源的分布式事件流平台&#xff0c;常被用于高性能数据管道、流分析、数据集成和关键任务应用&#xff0c;基于Zookeeper协调的处理平台&#xff0c;也是一种消息系统&#xff0c;具有更好的吞吐量、内…

【云原生】3分钟快速在Kubernetes部署Prometheus2.42+Grafana9.5.1+Alertmanager0.25

文章目录 1、简介2、GitHub地址3、环境信息4、安装5、访问Grafana1、简介 Prometheus-operator帮助我们快速创建Prometheus+Grafana+Alertmanager等服务,而kube-prometheus更加完整的帮助我们搭建全套监控体系,这包括部署多个 Prometheus 和 Alertmanager 实例, 指标导出器…

axios / fetch 实现 stream 流式请求

axios 是一个支持node端和浏览器端的易用、简洁且高效的http库。本文主要介绍 axios 如何实现 stream 流式请求&#xff0c;注意这里需要区分 node 环境和浏览器环境。 一、node端 代码演示&#xff1a; const axios require(axios);axios({method: get,url: http://tiven.c…

系统架构设计师-信息安全技术(2)

目录 一、安全架构概述 1、信息安全所面临的威胁 二、安全模型 1、安全模型的分类 2、BLP模型 3、Biba 模型 4、Chinese Wall模型 三、信息安全整体架构设计 1、WPDRRC模型 2、各模型的安全防范功能 四、网络安全体系架构设计 1、开放系统互联安全体系结构 2、安全服务与安…

[保研/考研机试] KY103 2的幂次方 上海交通大学复试上机题 C++实现

题目链接&#xff1a; KY103 2的幂次方 https://www.nowcoder.com/share/jump/437195121691999575955 描述 Every positive number can be presented by the exponential form.For example, 137 2^7 2^3 2^0。 Lets present a^b by the form a(b).Then 137 is present…

0基础入门C++之类和对象上篇

目录 1.面向过程和面向对象初步认识2.类的引入3.类的定义3.1类的两种定义方式:3.2成员变量命名规则的建议 4.类的访问限定符及封装4.1访问限定符4.2封装 5.类的作用域6.类的实例化7.类对象模型7.1如何计算类对象的大小7.2 类对象的存储方式猜测 8.this指针8.1this指针的引出8.2…