kafka-消费者-消费异常处理(SpringBoot整合Kafka)

文章目录

  • 1、消费异常处理
    • 1.1、application.yml配置
    • 1.2、注册异常处理器
    • 1.3、消费者使用异常处理器
    • 1.4、创建生产者发送消息
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:
      • 1.8.1、第一次启动SpringKafkaConsumerApplication
      • 1.8.n、第n次启动SpringKafkaConsumerApplication

1、消费异常处理

1.1、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量# 调用ack方法时才会提交ack给kafka
#      enable-auto-commit: false# 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始# earliest:从最早的消息开始消费# latest:第一次从LEO位置开始消费# none:如果主题分区没有偏移量,则抛出异常auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 手动ack:manual手动ack时 如果有异常会尝试一直消费
#      ack-mode: manual# 手动ack:消费有异常时停止ack-mode: manual_immediate

在这里插入图片描述

1.2、注册异常处理器

package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
@Configuration
public class MyKafkaConfig {@Beanpublic NewTopic springTestPartitionTopic() {return TopicBuilder.name("my_topic1") //主题名称.partitions(3) //分区数量.replicas(3) //副本数量.build();}//方法名就是注入到容器中对象的名称@Beanpublic ConsumerAwareListenerErrorHandler myErrorHandler() {//创建异常处理器:消费者异常时 且注册使用当前异常处理器 会生效return new ConsumerAwareListenerErrorHandler() {@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {System.out.println("出现异常,消息内容:value = " + message.getPayload());System.out.println("header = "+message.getHeaders());System.out.println("异常信息:" + e.getMessage());System.out.println("=================");return null;}};}
}

1.3、消费者使用异常处理器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListenerAck {/*** 自动ack可能会导致漏消息*      spring-kafka:*                     自动ack 如果有异常,会死循环获取消息重新消费*                        不能继续向后消费消息,会导致消息积压**                    手动ack 配置了手动ack,且ack-mode为manual_immediate时,*                        如果消息消费失败,会继续向后消费* @param record*/@KafkaListener(topicPartitions = {@TopicPartition(topic = "my_topic1",partitions = {"0"})}, groupId = "my_group1",errorHandler = "myErrorHandler")public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {System.out.println("my_group1消费者获取分区0的消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());int i = 1/0;// 手动ack:手动确认消息已经消费 broker 会提交消费者偏移量acknowledgment.acknowledge();}
}

在这里插入图片描述

1.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%3,"", "指定ack-mode: manual_immediate消费"+i);}}}

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

1.8.1、第一次启动SpringKafkaConsumerApplication

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 指定ack-mode: manual_immediate消费0
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1717672170845, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费0)
header = {id=91c96206-2381-5fa1-b391-52627056762f, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 指定ack-mode: manual_immediate消费3
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费3)
header = {id=8755d954-615c-37ff-67f0-85521e090b03, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定ack-mode: manual_immediate消费6
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费6)
header = {id=68082673-07d2-6f69-3935-c7034a7caf81, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定ack-mode: manual_immediate消费9
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费9)
header = {id=bcbda053-9536-df91-d937-2688b5d4c6ea, timestamp=1717672488754}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================

1.8.n、第n次启动SpringKafkaConsumerApplication

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 指定ack-mode: manual_immediate消费0
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1717672170845, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费0)
header = {id=6bc9edb9-ad1c-e00e-9b27-c0c540248091, timestamp=1717672854508}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 指定ack-mode: manual_immediate消费3
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费3)
header = {id=14e6951b-b25f-10ca-702c-a1699d25645b, timestamp=1717672854508}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定ack-mode: manual_immediate消费6
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费6)
header = {id=26e85ef8-2502-fd29-8d5b-091ec0362900, timestamp=1717672854509}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定ack-mode: manual_immediate消费9
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费9)
header = {id=25c6b6c8-d02d-2091-7c6f-ebaea6c52d1d, timestamp=1717672854509}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================

此时如果不关闭SpringKafkaConsumerApplication,生产者继续发送消息,消费者只会往后消费,不会从头再次消费

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/342739.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux环境---在线安装jdk

Linux环境—在线安装jdk 一、使用步骤 1.安装环境 JDK版本&#xff1a;1.8 1.1 建立存放软件的目录 注意&#xff1a;此处本人是将需要按照的软件存放在directory目录下&#xff0c;可根据实际情况调整接收路径。 命令如下&#xff1a; mkdir directory2.安装jdk 2.1 建…

【YOLOv8改进[CONV]】SPDConv助力YOLOv8目标检测效果 + 含全部代码和详细修改方式 + 手撕结构图

本文将使用SPDConv助力YOLOv8目标检测效果的实践,文中含全部代码、详细修改方式以及手撕结构图。助您轻松理解改进的方法。 改进前和改进后的参数对比: 目录 一 SPDConv 二 SPDConv助力YOLOv8目标检测效果 1 整体修改 ① 添加SPDConv.py文件 ② 修改ultralytics/nn/tas…

Vue-插槽 Slots

文章目录 前言什么叫插槽简单插槽指定默认值多个插槽根据父级别名称指定区域显示(具名插槽)作用域插槽 前言 本篇文章不做过多的讲解与说明&#xff0c;只记录个人实验测试案例。 详见&#xff1a;vue 官方文档 插槽 slots 什么叫插槽 之前的博客中&#xff0c;父级组件可以…

【简单讲解下TalkingData】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

vite+ts设置别名

准备工作 安装 types/node 避免代码爆红 npm install types/node一、根目录下 vite.config.ts 文件中配置 import { resolve } from path;resolve: {// 设置文件./src路径为 alias: [{find: ,replacement: resolve(__dirname, ./src)}] }二、根目录下 tsconfig.json 文件中配…

【MySQL】数据库入门基础

文章目录 一、数据库的概念1. 什么是数据库2. 主流数据库3. mysql和mysqld的区别 二、MySQL基本使用1. 安装MySQL服务器在 CentOS 上安装 MySQL 服务器在 Ubuntu 上安装 MySQL 服务器验证安装 2. 服务器管理启动服务器查看服务器连接服务器停止服务器重启服务器 3. 服务器&…

美团发布2024年一季度财报:营收733亿元,同比增长25%

6月6日&#xff0c;美团(股票代码:3690.HK)发布2024年第一季度业绩报告。受益于经济持续回暖和消费复苏&#xff0c;公司各项业务继续取得稳健增长&#xff0c;营收733亿元(人民币&#xff0c;下同)&#xff0c;同比增长25%。 财报显示&#xff0c;一季度&#xff0c;美团继续…

$MPC 登录MEXC,加速Partisia Blockchain 生态市场进程

Partisia Blockchain是一个以MPC技术方案为基础&#xff0c;具备可审计特性的隐私Layer1生态&#xff0c;与此同时&#xff0c;该链通过系列创新的系统架构&#xff0c;能够兼顾高迸发、安全、可拓展性以及可互操作特性。基于系列技术特性&#xff0c;Partisia Blockchain正在构…

Mybatis 查询TypeHandler使用,转译查询数据(逗号分隔转List)

创建自定义的Hanndler /*** Package: com.datalyg.common.core.handler* ClassName: CommaSeparatedStringTypeHandler* Author: dujiayu* Description: 用于mybatis 解析逗号拼接字符串* Date: 2024/5/29 10:03* Version: 1.0*/ public class CommaSeparatedStringTypeHandle…

【重学C语言】十八、SDL2 图形编程介绍和环境配置

【重学C语言】十八、SDL2 图形编程介绍和环境配置 **SDL2介绍**SDL 2用途SDL 在哪些平台上运行&#xff1f;下载和安装 SDL2安装 SDL2 clion 配置 SDL2 SDL2介绍 SDL2&#xff08;Simple DirectMedia Layer 2&#xff09;是一个开源的跨平台多媒体开发库&#xff0c;主要用于游…

LabVIEW冲击响应谱分析系统

LabVIEW冲击响应谱分析系统 开发了一种基于LabVIEW开发的冲击响应谱分析系统&#xff0c;该系统主要用于分析在短时间内高量级输入力作用下装备的响应。通过改进的递归数字滤波法和样条函数法进行冲击响应谱的计算&#xff0c;实现了冲击有效持续时间的自动提取和响应谱的精准…

JVM学习-Arthas

Arthas Alibaba开源的Java诊断工具&#xff0c;在线排查问题&#xff0c;无需重启&#xff0c;动态跟踪Java代码&#xff0c;实时监控JVM状态Arthas支持JDK6&#xff0c;支持Linux/Mac/Windows&#xff0c;采用命令行交互模式&#xff0c;同时提供丰富的Tab自动补全功能&#…

数据资产入表-数据治理-指标建设标准

前情提要&#xff1a;数据价值管理是指通过一系列管理策略和技术手段&#xff0c;帮助企业把庞大的、无序的、低价值的数据资源转变为高价值密度的数据资产的过程&#xff0c;即数据治理和价值变现。上一讲介绍了标签标准设计的基本逻辑和思路。数据资产入表-数据治理-标签设计…

stm32太阳能追光储能系统V2

大家好&#xff0c;我是 小杰学长 stm32太阳能追光储能系统V2. 增加了命令行交互和内置AT指令解析框架 &#xff08;就是可以用电脑串口发送at指令控制板子的所有功能&#xff09; 改动了spi 换成硬件 改动了硬件电源 增加了pcb原理图 附带上pcb源文件 增加了freertos 互斥锁…

node.js漏洞——

一.什么是node.js 简单的说 Node.js 就是运行在服务端的 JavaScript。 Node.js 是一个基于 Chrome JavaScript 运行时建立的一个平台。 Node.js 是一个事件驱动 I/O 服务端 JavaScript 环境&#xff0c;基于 Google 的 V8 引擎&#xff0c;V8 引擎执行 Javascript 的速度非常…

2024骨传导耳机品牌排行前五名汇总,揭晓年度最强王者骨传导机型!

骨传导耳机自问世以来&#xff0c;便迅速在蓝牙耳机市场中崭露头角&#xff0c;并且凭借特殊的传声方式和特健康的佩戴方式深得消费者的喜爱。然而&#xff0c;随着骨传导耳机逐渐热门&#xff0c;市场中品牌越来越多&#xff0c;也逐渐出现了一些劣质品牌&#xff0c;这些品牌…

Activity->Activity中动态添加Fragment->Fragment回退栈BackStack

Fragment回退栈 Fragment回退栈用于管理Fragment的导航历史(添加、删除、替换)。每个Activity都有一个包含其所有Fragment的FragmentManager&#xff0c;调用其addToBackStack方法时&#xff0c;这个事务就会被添加到FragmentManager的回退栈中当用户按下返回键时&#xff0c;…

关于计算机是如何工作的

计算机的发展历程 世界上的第一个计算机 冯诺依曼机构体系 1.存储器 (包括内存(存储空间小,访问速度快,成本高,掉电后数据丢失) 外存(硬盘,软盘,U盘,光盘)),存储空间小,访问速度慢,成本低,掉电后数据仍在 2.CPU(中央处理单元,计算机最核心的部分,用于算术运算和逻辑判断),…

【Python】教你彻底了解Python中的模块和包

​​​​ 文章目录 一、模块的概念1. 导入模块2. 导入特定对象3. 给模块或对象取别名 二、标准库模块1. 常用标准库模块2. 使用示例 三、自定义模块1. 创建模块2. 使用自定义模块 四、包的结构与使用1. 创建包2. 使用包中的模块 五、包的深入使用1. 相对导入2. 子包3. 使用子包…

【排序】冒泡排序

在我们的生活中&#xff0c;到处都离不开排序的作用&#xff0c;考试分数要排序&#xff0c;商场购物要排序&#xff0c;可以说排序对我们来说处处存在&#xff0c;那么从本章开始&#xff0c;我将要依次分享一些排序方法&#xff0c;从易到难&#xff0c;包括冒泡&#xff0c;…