Kafka(四)消费者消费消息

文章目录

  • 如何确保不重复消费消息?
  • 消费者业务逻辑重试
  • 消费者提交
  • 自定义反序列化类
  • 消费者参数配置及其说明
    • 重要的参数session.time.ms和heartbeat.interval.ms和group.instance.id
    • 增加消费者的吞吐量
    • 消费者消费的超时时间和poll()方法的关系
  • 消费者消费逻辑
  • 启动消费者
  • 关闭消费者
  • 配置listener
  • 结语
  • 示例源码仓库

在 上一篇文章里,对于生产者,发送时失败之后会由定时任务进行重新发送, 并且我们是根据消息的key进行分区的, 所以不管我们重新发送了多少次,对于同一个key,始终会被送到同一个分区

那么到消费者这里,最重要的问题是如何确保不会重复消费之前因为各种原因被重新发送到某个分区的消息。

如何确保不重复消费消息?

基本思路如下

  1. 我们在数据库中创建了一个已成功消费的消息表,里面只有一列,消息的key。当消费者消费逻辑成功之后,我们会把其key保存到这张表里 。
  2. 当消费者拉取新的一批消息时,我们会去数据库的消息表里查是否已经存在该消息的key,存在的话,就跳过实际的消费业务。
  3. 一批消息里也可能存在相同的key,所以我们处理完一次消费业务,就把该key放到一个set里,消费下一条消息时,则先去set里看一下,存在的话即跳过,不存在则正常执行消费业务。即使前面的消息消费业务失败了,后面相同key的消息也直接跳过,不会再次消费

消费者业务逻辑重试

对于消费者业务逻辑的重试,我们使用failsafe框架进行重试,该框架的使用可参考官方文档,这里不做过多赘述。

消费者提交

这里的方式采用的是Kafka权威指南中消费者一章中提出的方式。 异步+同步。平时使用异步提交,在关闭消费者时,使用同步提交,确保消费者退出之前将当前的offset提交上去。

自定义反序列化类

在生产者端,我们发送自定义的对象时,利用自定义序列化类将其序列化为JSON。在消费者端,我们同样需要自定义反序列类将JSON转为我们之前的对象

public class UserDTODeserializer implements Deserializer<UserDTO> {@Override@SneakyThrowspublic UserDTO deserialize(final String s, final byte[] bytes) {ObjectMapper objectMapper = new ObjectMapper();return objectMapper.readValue(bytes, UserDTO.class);}
}

消费者参数配置及其说明

    /*** 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景需求 自己调整* https://kafka.apache.org/26/documentation/#group.instance.id** 为什么需要group.instance.id?* 假设auto.offset.reset=latest* 1. 如果没有group.instance.id,那么kafka会认为此消费者是dynamic member,在重启期间如果有消息发送到topic,那么重启之后,消费者会【丢失这部分消息】* 假如auto.offset.reset=earliest* 1. 如果没有group.instance.id,那么kafka会认为此消费者是dynamic member,在重启期间如果有消息发送到topic,那么重启之后,消费者会重复消费【全部消息】** 光有group.instance.id还不够,还需要修改heartbeat.interval.ms和session.timeout.ms的值为合理的值* 如果程序部署,重启期间,重启时间超过了session.timeout.ms的值,那么kafka会认为此消费者已经挂了会触发rebalance,在一些大型消息场景,rebalance的过程可能会很慢, 更详细的解释请参考* https://kafka.apache.org/26/documentation/#static_membership* @param groupInstanceId* @return*/public static Properties loadConsumerConfig(int groupInstanceId, String valueDeserializer) {Properties result = new Properties();result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9093");result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);result.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 代表此消费者是消费者组的static memberresult.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "test-" + ++groupInstanceId);// 修改heartbeat.interval.ms和session.timeout.ms的值,和group.instance.id配合使用,避免重启或重启时间过长的时候,触发rebalanceresult.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 * 60);result.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000 * 60 * 5);// 关闭自动提交result.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);// 默认1MB,增加吞吐量,其设置对应的是每个分区,也就是说一个分区返回10MB的数据result.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576 * 10);result.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 返回全部数据的大小result.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1048576 * 100);// 默认5分钟result.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000 * 60 * 5);return result;}

重要的参数session.time.ms和heartbeat.interval.ms和group.instance.id

三者的使用方式见上面代码中的注释。

增加消费者的吞吐量

和上一篇文章一样,由于我们的邮件消息每个大概是20KB,使用默认的消费者参数,吞吐量是上不来的。 所以做了一些优化,除了消费者消费逻辑要尽可能简单之外,为了增加消费者的吞吐量,可以根据实际场景修改倒数第4、3、2个参数。

消费者消费的超时时间和poll()方法的关系

由max.poll.interval.ms参数控制,默认5分钟。如果消费者业务逻辑处理特别耗时,在5分钟之内没有再次调用poll()拉取消息,则Kafka认为消费者已死,根据具体配置会立刻触发rebalance还是等一段时间再触发rebalance。

这里特别强调一下,网上有一部分文章说是要确保消费逻辑在poll(timeUnit)时间内处理完,否则就会触发rebalance。这都是很早之前的Kafka版本了,是因为原来消费者的poll()线程和心跳线程使用的是同一个线程。现在的版本早就把这两个分开了。所以你只需要注意,自己的消费逻辑别超过max.poll.interval.ms即可,如果觉得不够用,也可自己调整。

poll()方法中的时间代表的是多长时间去拉取一次消息。假设你设置的是1分钟,你的消费逻辑处理的很快,可能用了10s。那么在你消费完了之后,消费者会在1分钟之后拉取新消息。

在消费者中使用手动提交。

消费者消费逻辑

这里要注意

  1. 如果消费逻辑可能抛出异常,则使用try-catch处理,防止因为抛出异常,导致我们错误的关闭了消费者
  2. 消费者消费逻辑失败时会重试,重试N次之后,我们会将其保存在数据库中,以便和生产者一样,定时处理失败的消息
  3. 消费逻辑没问题的话,则把该消息的key进行入库处理
@Log
public class MessageConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private MessageAckConsumesSuccessService messageAckConsumesSuccessService = new MessageAckConsumesSuccessService();private MessageFailedService messageFailedService = new MessageFailedService();private final KafkaConsumer<String, UserDTO> consumer;private final int consumerPollIntervalSecond;public MessageConsumerRunner(KafkaConsumer<String, UserDTO> consumer, int consumerPollIntervalSecond) {this.consumer = consumer;this.consumerPollIntervalSecond = consumerPollIntervalSecond;}/*** 1. 使用https://failsafe.dev/进行重试* 2. 每次消费消息前,判断消息ID是否存在于数据库中和当前Set集合中,避免重复消费,*    我们的消息时根据消息的key进行hash分区的,所以同一个消息即使生产多次,一定会到同一个partition中,partition动态增加引起的特殊情况不在考虑范围之内* 4. 在一次消费消息中重试两次,如果两次都失败,那么将失败原因、消息的JSON字符串插入到message_failed表中,以便后续再次生产或排查问题* 3. 平时异步提交,关闭消费者时使用同步提交*/@Overridepublic void run() {AtomicReference<String> errorMessage = new AtomicReference<>(StringUtils.EMPTY);RetryPolicy<Boolean> retryPolicy = RetryPolicy.<Boolean>builder().handle(Exception.class)// 如果业务逻辑返回false或者抛出异常,则重试.handleResultIf(Boolean.FALSE::equals)// 不包含首次.withMaxRetries(2).withDelay(Duration.ofMillis(200)).onRetry(e -> log.warning("consume message failed, start the {}th retry"+ e.getAttemptCount())).onRetriesExceeded(e -> {Optional.ofNullable(e.getException()).ifPresent(u -> errorMessage.set(u.getMessage()));log.severe("max retries exceeded" + e.getException());}).build();Fallback<Boolean> fallback = Fallback.<Boolean>builder(e -> {// do nothing, suppress exceptions}).build();try {consumer.subscribe(Collections.singletonList("email"));while (!closed.get()) {// get message from kafkaConsumerRecords<String, UserDTO> records = consumer.poll(Duration.ofSeconds(consumerPollIntervalSecond));if (records.isEmpty()) {return;}Set<UserDTO> successConsumed = new HashSet<>();Set<UserDTO> failedConsumed = new HashSet<>();Map<String, String> failedConsumedReason = new HashMap<>();// check message if exist in databaseSet<String> checkingMessageIds = new HashSet<>(records.count());records.iterator().forEachRemaining(item -> checkingMessageIds.add(item.value().getMessageId()));Set<String> hasBeenConsumedMessageIds = messageAckConsumesSuccessService.checkMessageIfExistInDatabase(checkingMessageIds);records.forEach(item -> {if (hasBeenConsumedMessageIds.contains(item.value().getMessageId())) {// if exist, continuereturn;}// 每一批消息中也可能存在同样的消息,所以需要再次判断hasBeenConsumedMessageIds.add(item.value().getMessageId());try {Failsafe.with(fallback, retryPolicy).onSuccess(e -> successConsumed.add(item.value())).onFailure(e -> {failedConsumed.add(item.value());failedConsumedReason.put(item.value().getMessageId(), StringUtils.isNotBlank(errorMessage.get()) ? errorMessage.get() : "no reason, may be check server log");errorMessage.set(StringUtils.EMPTY);}).get(() -> {// 这里是业务逻辑,可以返回true或false,为什么要这样?是因为上面RetryPolicy这里定义的boolean,根据自己实际业务设置相应的类型return true;});// 这里要catch住所有业务异常,防止由业务异常导致消费者线程退出}catch (Exception e) {log.severe("failed to consume email message" + e);failedConsumed.add(item.value());failedConsumedReason.put(item.value().getMessageId(), StringUtils.isNotBlank(e.getMessage()) ? e.getMessage() : e.getCause().toString());}});postConsumed(successConsumed, failedConsumed, failedConsumedReason);// 平时使用异步提交consumer.commitAsync();}}catch (WakeupException e) {if (!closed.get()) {throw e;}} finally {// 消费者退出时使用同步提交try {consumer.commitSync();} catch (Exception e) {log.info("commit sync occur exception: " + e);} finally{try {consumer.close();}catch (Exception e) {log.info("consumer close occur exception: " + e);}log.info( "shutdown kafka consumer complete");}}}/*** 处理成功、成功后的回调、失败* @param successConsumed* @param failedConsumed* @param failedConsumedReason*/private void postConsumed(Set<UserDTO> successConsumed, Set<UserDTO> failedConsumed, Map<String, String> failedConsumedReason) {// 后置处理开启异步线程处理,不阻塞消费者线程// 克隆传进来的集合,而不使用原集合的引用,因为原集合每次消费都会重置Set<UserDTO> cloneSuccessConsumed = new HashSet<>(successConsumed);Set<UserDTO> cloneFailedConsumed = new HashSet<>(failedConsumed);Map<String, String> cloneFailedConsumedReason = new HashMap<>(failedConsumedReason);new Thread( () -> {if (!cloneSuccessConsumed.isEmpty()) {messageAckConsumesSuccessService.insertMessageIds(cloneSuccessConsumed.stream().map(UserDTO::getMessageId).collect(Collectors.toSet()));cloneFailedConsumed.forEach(item -> {if (Objects.nonNull(item.getCallbackMetaData())) {// do callbackCallbackProducer callbackProducer = new CallbackProducer();callbackProducer.sendCallbackMessage(item.getCallbackMetaData(), MessageFailedPhrase.PRODUCER);}});}if (!cloneFailedConsumed.isEmpty()) {ObjectMapper objectMapper = new ObjectMapper();cloneFailedConsumed.forEach(item -> {MessageFailedEntity entity = new MessageFailedEntity();entity.setMessageId(item.getMessageId());entity.setMessageType(MessageType.EMAIL);entity.setMessageFailedPhrase(MessageFailedPhrase.CONSUMER);entity.setFailedReason(cloneFailedConsumedReason.get(item.getMessageId()));try {entity.setMessageContentJsonFormat(objectMapper.writeValueAsString(item));} catch (JsonProcessingException e) {log.info("failed to convert UserDTO message to json string");}messageFailedService.saveOrUpdateMessageFailed(entity);});}}).start();}public void shutdown() {log.info( Thread.currentThread().getName() + " shutdown kafka consumer");closed.set(true);consumer.wakeup();}
}

启动消费者

通过实现ServletContextListener接口对于方法使其在Tomcat启动之后,启动消费者

public class StartUpConsumerListener implements ServletContextListener {/*** 假设开启10个消费者.** 消费者的数量要和partition的数量一致,实际情况下,可以调用AdminClient的方法获取到topic的partition数量,然后根据partition数量来创建消费者.* @param sce*/@Overridepublic void contextInitialized(final ServletContextEvent sce) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 30L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100), new AbortPolicy());for (int i = 0; i < 10; i++) {KafkaConsumer<String, UserDTO> consumer = new KafkaConsumer<>(KafkaConfiguration.loadConsumerConfig(i, UserDTO.class.getName()));MessageConsumerRunner messageConsumerRunner = new MessageConsumerRunner(consumer, 10);// 使用另外一个线程来关闭消费者Thread shutdownHooks = new Thread(messageConsumerRunner::shutdown);KafkaListener.KAFKA_CONSUMERS.add(shutdownHooks);// 启动消费者线程threadPoolExecutor.execute(messageConsumerRunner);}}
}

关闭消费者

public class KafkaListener implements ServletContextListener {public static final Vector<Thread> KAFKA_CONSUMERS = new Vector<>();@Overridepublic void contextInitialized(ServletContextEvent sce) {// do noting}@Overridepublic void contextDestroyed(ServletContextEvent sce) {KAFKA_CONSUMERS.forEach(Thread::run);}
}

配置listener

<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaeehttps://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"version="6.0"><display-name>Kafka消息的消费者-消息系统</display-name><!--  listener的contextInitialized顺序按照声明顺序执行, contextDestroyed方法按照声明顺序反向执行--><listener><listener-class>com.message.server.listener.KafkaListener</listener-class></listener><listener><listener-class>com.message.server.listener.StartUpConsumerListener</listener-class></listener>
</web-app>

结语

  1. 在处理消费者相关逻辑时,我们重点关心如何确保消息不重复消费以及如何增加消费者的吞吐量
  2. 消费逻辑尽可能保证处理速度快,尽量减少耗时的逻辑

示例源码仓库

  1. Github地址
  2. 项目下message-server module代表生产者
  3. 运行时IDEA配置如下在这里插入图片描述

我们生产者和消费者的正常情况都以处理完了,下一篇文章我们将重点处理生产者失败和消费者失败之后重新生产消息和消费消息的逻辑,以及简单说一下Kafka中的rebalance。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/197480.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ExcelBDD PHP Guideline

在PHP里面支持利用Excel的BDD&#xff0c;也支持利用Excel进行参数化测试 ExcelBDD Use Excel file as BDD feature file, get example data from Excel files, support automation tests. Features The main features provided by this library are: Read test data acco…

【win32_001】win32命名规、缩写、窗口

整数类型 bool类型 使用注意&#xff1a; 一般bool 的false0&#xff1b;true1 | 2 | …|n false是为0&#xff0c;true是非零 不建议这样用&#xff1a; if (result TRUE) // Wrong! 因为result不一定只返回1&#xff08;true&#xff09;&#xff0c;当返回2时&#xff0c…

集合框架面试题

一、集合容器的概述 1. 什么是集合 集合框架&#xff1a;用于存储数据的容器。 集合框架是为表示和操作集合而规定的一种统一的标准的体系结构。 任何集合框架都包含三大块内容&#xff1a; 对外的接口、接口的实现和对集合运算的算 法。 接口&#xff1a;表示集合的抽象数据…

鸿蒙:实现两个Page页面跳转

效果展示 这篇博文在《鸿蒙&#xff1a;从0到“Hello Harmony”》基础上实现两个Page页面跳转 1.构建第一个页面 第一个页面就是“Hello Harmony”&#xff0c;把文件名和显示内容都改一下&#xff0c;改成“FirstPage”&#xff0c;再添加一个“Next”按钮。 Entry Compone…

Axure9 基本操作(二)

1. 文本框、文本域 文本框&#xff1a;快速实现提示文字与不同类型文字显示的效果。 2. 下拉列表、列表框 下拉列表&#xff1a;快速实现下拉框及默认显示项的效果。 3. 复选框、单选按钮 4. 利用动态面板实现同个按键的不同状态切换

Codewhisperer 使用评价

最近亚⻢逊推出了一款基于机器学习的 AI 编程助手 Amazon CodeWhisperer&#xff0c;可以实时提供代码建议。在编写代码时&#xff0c;它会自动根据现有的代码和注释给出建议。Amazon CodeWhisperer 与GitHub Copilot类似&#xff0c;主要的功能有: 代码补全注释和文档补全代码…

图像分类(四) 全面解读复现GoogleNet_InceptionV1-V4

论文解读 InceptionV1 前言 论文题目: Going Deeper with Convolutions Googlenet论文原文地址:https://arxiv.org/pdf/1409.4842.pdf 之前看过VGG的论文&#xff08;VGG精读直达&#xff09;。当时VGG获得了 2014 ILSVRC 图像分类的第二名&#xff0c;今天来看一下第一名…

Linux | 进程间通信

目录 前言 一、进程间通信的基本概念 二、管道 1、管道的基本概念 2、匿名管道 &#xff08;1&#xff09;原理 &#xff08;2&#xff09;测试代码 &#xff08;3&#xff09;读写控制相关问题 a、读端关闭 b、写端关闭 c、读快写慢 d、读慢些快 &#xff08;4&a…

Linux 系统编程,Binder 学习,文件访问相关的接口

文章目录 Linux 系统编程&#xff0c;Binder 学习&#xff0c;文件访问相关的接口1.概念2.linux文件结构3.文件描述符4.Linux文件系统的两类常用接口&#xff0c;linux系统内置库函数4.1 open4.2 close4.3 read4.4 write 5.标准I/O库函数5.1 fopen Linux 系统编程&#xff0c;B…

NewStarCTF2023 Reverse Week3 EzDLL WP

分析 这里调用了z3h.dll中的encrypt函数。 用ida64载入z3h.dll 直接搜索encrypt 找到了一个XTEA加密。接着回去找key和密文。 发现key 这里用了个调试状态来判断是否正确&#xff0c;v71&#xff0c;要v7&#xff1d;1才会输出Right&#xff0c;即程序要处于飞调试状态。 可…

一、MySQL-Replication(主从复制)

1.1、MySQL Replication 主从复制&#xff08;也称 AB 复制&#xff09;允许将来自一个MySQL数据库服务器&#xff08;主服务器&#xff09;的数据复制到一个或多个MySQL数据库服务器&#xff08;从服务器&#xff09;。 根据配置&#xff0c;您可以复制数据库中的所有数据库&a…

ESP32 Arduino实战协议篇-搭建独立的 Web 服务器

在此项目中,您将创建一个带有 ESP32 的独立 Web 服务器,该服务器使用 Arduino IDE 编程环境控制输出(两个 LED)。Web 服务器是移动响应的,可以使用本地网络上的任何浏览器设备进行访问。我们将向您展示如何创建 Web 服务器以及代码如何逐步工作。 项目概况 在直接进入项目…

基于机器学习的居民消费影响因子分析预测

项目视频讲解: 基于机器学习的居民消费影响因子分析预测_哔哩哔哩_bilibili 主要工作内容: 完整代码: import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns import missingno as msno import warnings warnings.filterwarnin…

驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接

参考&#xff1a;https://www.cnblogs.com/sam-snow-v/p/15917898.html eclipse链接SQL Server出现问题 笔者使用Open JDK 17&#xff0c;SQL Server 2016&#xff0c;项目中使用JPA操作数据库。测试环境没问题&#xff0c;生产环境出现如题所示“驱动程序无法通过使用安全套接…

qsort函数使用方法总结

目录 一、qsort函数原型 二、compar参数 三、各种类型的qsort排序 1. int 数组排序 2. 结构体排序 3. 字符串指针数组排序 4. 字符串二维数组排序 四、回调函数 1. 什么是回调函数 2. 为什么要用回调函数&#xff1f; 3. 怎么使用回调函数&#xff1f; 4.下面是…

C++多线程编程(2):四种线程管理方法

文章首发于我的个人博客&#xff1a;欢迎大佬们来逛逛 文章目录 线程管理get_idsleep_forsleep_untilyield 线程管理 有一个this_thread的名称空间中定义了许多的线程管理方法&#xff1a; get_id&#xff1a;获取当前线程idsleep_for&#xff1a;当前线程休眠一段时间sleep_…

Linux系统编程学习 NO.9——git、gdb

前言 本篇文章简单介绍了Linux操作系统中两个实用的开发工具git版本控制器和gdb调试器。 git 什么是git&#xff1f; git是一款开源的分布式版本控制软件。它不仅具有网络功能&#xff0c;还是服务端与客户端一体的软件。它可以高效的处理程序项目中的版本管理。它是Linux内…

实验五:Java多线程程序设计

一、线程接力 编写一个应用程序&#xff0c;除了主线程外&#xff0c;还有三个线程&#xff1a;first、second和third。first负责模拟一个红色的按钮从坐标&#xff08;10&#xff0c;60&#xff09;运动到&#xff08;100&#xff0c;60&#xff09;&#xff1b;second负责模…

【机器学习基础】正则化

&#x1f680;个人主页&#xff1a;为梦而生~ 关注我一起学习吧&#xff01; &#x1f4a1;专栏&#xff1a;机器学习 欢迎订阅&#xff01;后面的内容会越来越有意思~ ⭐特别提醒&#xff1a;针对机器学习&#xff0c;特别开始专栏&#xff1a;机器学习python实战 欢迎订阅&am…

OpenCV图像处理、计算机视觉实战应用

OpenCV图像处理、计算机视觉实战应用 专栏简介一、基于差异模型模板匹配缺陷检测二、基于NCC多角度多目标匹配三、基于zxing多二维码识别四、基于tesseract OCR字符识别 专栏简介 基于OpenCV C分享一些图像处理、计算机视觉实战项目。不定期持续更新&#xff0c;干货满满&…