004 Kafka异常处理

6.异常处理

文章目录

  • 6.异常处理
      • 1.异常分类与处理原则
      • 2.生产者异常处理
        • 1. 同步发送捕获异常
        • 2. 异步发送回调处理
      • 3.消费者异常处理
        • 1.全局异常处理器
        • 2.方法级处理
        • 3.重试yml配置
      • 4.死信队列(DLQ)配置
        • 1. 启用死信队列
        • 2. 手动发送到DLQ
      • 5.事务场景异常处理
        • 1. 声明式事务
        • 2. 事务异常回滚
      • 6.监控与告警
        • 1. Actuator 健康检查
        • 2. Prometheus 指标
      • 7.完整异常处理流程
      • 8.最佳实践总结

来源参考的deepseek,如有侵权联系立删

1.异常分类与处理原则

异常类型典型场景处理建议
可恢复异常网络抖动、数据库锁冲突重试机制(有限次数 + 退避策略)
不可恢复异常消息格式错误、权限不足直接记录日志并进入死信队列
事务异常事务超时、生产者ID冲突终止事务并回滚操作

2.生产者异常处理

1. 同步发送捕获异常
public void sendSync(String topic, String message) {try {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.get(5, TimeUnit.SECONDS); // 阻塞等待结果} catch (InterruptedException | ExecutionException | TimeoutException e) {// 记录日志并触发补偿逻辑log.error("消息发送失败: {}", e.getMessage());throw new BusinessException("消息发送失败", e);}
}
2. 异步发送回调处理
public void sendAsync(String topic, String message) {kafkaTemplate.send(topic, message).addCallback(result -> {// 发送成功处理log.info("消息发送成功: topic={}", result.getRecordMetadata().topic());},ex -> {// 发送失败处理log.error("消息发送失败", ex);if (ex instanceof RetriableException) {// 可重试异常(如网络问题)retrySend(topic, message);} else {// 不可重试异常(如消息过大)deadLetterService.saveToDlq(topic, message);}});
}

3.消费者异常处理

1.全局异常处理器
@Configuration
public class KafkaGlobalErrorConfig {// 定义全局错误处理器(支持批量/单消息模式)@Beanpublic CommonErrorHandler globalErrorHandler(KafkaTemplate<String, Object> template) {// 重试策略:3次重试,间隔5秒DefaultErrorHandler handler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template), // 死信队列恢复器new FixedBackOff(5000L, 3));// 指定可重试异常类型handler.addRetryableExceptions(NetworkException.class);handler.addNotRetryableExceptions(SerializationException.class);// 偏移量提交策略handler.setCommitRecovered(true);return handler;}// 容器工厂绑定全局处理器@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory,CommonErrorHandler globalErrorHandler) {ConcurrentKafkaListenerContainerFactory<String, Object> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setCommonErrorHandler(globalErrorHandler);return factory;}
}
2.方法级处理
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DeserializationException;@Slf4j
@Configuration
public class KafkaExceptionConfig {/*** 自定义异常处理器*/@Beanpublic ConsumerAwareListenerErrorHandler orderErrorHandler() {return (message, exception, consumer) -> {// 业务相关错误处理(如库存不足)/*   if (exception instanceof InventoryException) {retryService.scheduleRetry(message.getPayload());}*/System.out.println("异常执行:"+exception);return null;};}/*** 注册全局异常处理器*/@Beanpublic ConsumerAwareListenerErrorHandler globalExceptionHandler() {return (message, exception, consumer) -> {log.error("捕获消费异常: topic={}, message={}",message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),message.getPayload(),exception);// 反序列化异常特殊处理if (exception.getCause() instanceof DeserializationException) {// 跳过错消息并提交偏移量return null;}throw exception; // 其他异常继续抛出};}}
    @KafkaListener(topics = "test", groupId = "spring-group",errorHandler = "globalExceptionHandler")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));//异常测试int i = 1/0;ack.acknowledge();}
3.重试yml配置
spring:kafka:listener:retry:max-attempts: 3               # 最大重试次数backoff:initial-interval: 1000     # 初始间隔(毫秒)multiplier: 2.0            # 间隔倍数exclude-exceptions:          # 不重试的异常- javax.validation.ValidationException

4.死信队列(DLQ)配置

1. 启用死信队列
spring:kafka:listener:dead-letter-publish:enable: true                  # 自动发布到死信队列dead-letter-topic: dlq-${topic} # 死信队列命名规则
2. 手动发送到DLQ
@KafkaListener(topics = "payments")
public void handlePayment(PaymentEvent event, Acknowledgment ack) {try {paymentService.process(event);ack.acknowledge();} catch (InvalidPaymentException ex) {// 手动发送到DLQkafkaTemplate.send("dlq-payments", event);ack.acknowledge(); // 避免重复消费}
}

5.事务场景异常处理

1. 声明式事务
@Transactional
public void processWithTransaction(Order order) {// 数据库操作orderRepository.save(order);// Kafka事务消息kafkaTemplate.send("orders", order.toEvent());// 其他业务...
}
2. 事务异常回滚
@Bean
public KafkaTransactionManager<String, Object> transactionManager(ProducerFactory<String, Object> pf) {return new KafkaTransactionManager<>(pf);
}@Transactional(rollbackFor = {KafkaException.class, SQLException.class})
public void transactionalProcess() {// 数据库与Kafka操作
}

6.监控与告警

1. Actuator 健康检查
management:endpoints:web:exposure:include: health,kafkahealth:kafka:enabled: true
2. Prometheus 指标
@Bean
public MicrometerConsumerListener<K, V> consumerMetrics() {return new MicrometerConsumerListener<>("kafka.consumer");
}@Bean
public MicrometerProducerListener<K, V> producerMetrics() {return new MicrometerProducerListener<>("kafka.producer");
}

7.完整异常处理流程

  1. 捕获异常 → 2. 分类判断 → 3. 重试/记录/DLQ → 4. 提交Offset → 5. 监控告警

8.最佳实践总结

  • 分层处理:全局处理器兜底 + 方法级精细控制
  • 幂等消费:确保消息重复消费时的数据安全性
  • 监控覆盖:跟踪重试次数、DLQ堆积等关键指标
  • 事务隔离@Transactional + read_committed 保证数据一致性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/25025.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一键部署DeepSeek

腾讯Cloud Studio提供DeepSeek一键部署功能&#xff0c;0行代码&#xff0c;秒级部署使用&#xff01; 重点是每月免费提供10000分钟&#xff01; 不用等待模型下载&#xff0c;创建即可使用。 内置 Ollama、DeepSeek-R1 1.5B、7B、8B、14B 及 32B 模型。 热门模板 AI模板 前…

【计算机网络】IP协议

目录 1. 协议头格式 2. 网段划分 3. 特殊的IP 4. 公网IP && 内网IP 总结 网络层的IP协议主要解决的是什么问题&#xff1f;——将数据包从B主机发送给C主机&#xff1b;传输层协议tcp提供可靠的策略&#xff1b;网络层IP协议提供数据数据传输的能力&#xff1b; 发…

YOLOv12 ——基于卷积神经网络的快速推理速度与注意力机制带来的增强性能结合

概述 实时目标检测对于许多实际应用来说已经变得至关重要&#xff0c;而Ultralytics公司开发的YOLO&#xff08;You Only Look Once&#xff0c;只看一次&#xff09;系列一直是最先进的模型系列&#xff0c;在速度和准确性之间提供了稳健的平衡。注意力机制的低效阻碍了它们在…

2022年全国职业院校技能大赛网络系统管理赛项模块A:网络构建(样题6)-网络部分解析-附详细代码

目录 附录1:拓扑图 附录2:地址规划表 1.SW1 2.SW2 3.SW3 4.SW4 5.VSU 6.SW7 7.R1 8.R2 9.R3 10.AC1 11.AC2 12.EG1 13.EG2 附录1:拓扑图 附录2:地址规划表

C#实现本地Deepseek模型及其他模型的对话

前言 1、C#实现本地AI聊天功能 WPFOllamaSharpe实现本地聊天功能,可以选择使用Deepseek 及其他模型。 2、此程序默认你已经安装好了Ollama。 在运行前需要线安装好Ollama,如何安装请自行搜索 Ollama下载地址&#xff1a; https://ollama.org.cn Ollama模型下载地址&#xf…

突破“第一崇拜“:五维心理重构之路

一、视频介绍 在这个崇尚"第一"的时代&#xff0c;我们如何找到自己的独特价值&#xff1f;本视频将带您踏上五维心理重构之旅&#xff0c;从诗意人生的角度探讨如何突破"圣人之下皆蝼蚁"的局限。我们将穿越人生的不同阶段&#xff0c;从青春的意气风发到…

SpringWeb

目录 一.SpringWeb 1.SpringWeb 概述 2.SpringWEB 特点 3.SpringWeb 运行流程 4.SpringWEB 组件 二.搭建SpringWeb 1.在pom.xml中导包 2.配置DispatcherServlet 3.开启SpringWEB注解 4.测试 三.接收请求 1.定义地址、请求方式 2.获取请求数据 1&#xff09;使用r…

性能测试的方案编写与执行步骤

性能测试计划书 在测试过程中我们如果编写一份性能测试计划书&#xff0c;需要一下几个背景板块及要点 性能测试的流程&#xff1a; 确认需求&#xff08;确认正确的需求) —>编写测试方案&#xff08;准备怎么动手&#xff09;测试环节—>&#xff08;尽量与生成配置一…

[AI]从零开始的树莓派运行DeepSeek模型教程

一、前言 在前面的教程中&#xff0c;教了大家如何在windows中使用llama.cpp来运行DeepSeek模型。根据前面的教程中&#xff0c;我们也了解到了&#xff0c;我们只需要编译好llama.cpp就可以运行DeepSeek以及类似的LLM模型。那么本次教程就来教大家如何使用树莓派来运行大模型。…

03_pyqt5 + vlc 实现视频播放器

1.功能需求如图 按钮: 播放/暂停, 前进/后退, 视频上一个/下一个, 打开视频进度条: 视频进度条显示, 进度条拖拽, 音量控制按键控制: 1,2,3,4缩放画面大小, 2.方案选择 开发语言: python UI界面: pyqt5 qt_designed 设计ui布局 视频编码: python-vlc 方案说明: 视频解码可…

Linux——高级IO(select后续poll,epoll)

目录 一、poll函数 1.函数原型 2.参数说明 3.struct pollfd 结构体 4.返回值 5.使用步骤 6.与 select 的对比 7.适用场景 8.缺点 9.总结 二、epoll函数 1.核心思想 2.核心函数 1. epoll_create - 创建 epoll 实例 2. epoll_ctl - 管理 epoll 事件表 3. epoll_w…

基于 ‌MySQL 数据库‌对三级视图(用户视图、DBA视图、内部视图)的详细解释

基于 ‌MySQL 数据库‌对三级视图&#xff08;用户视图、DBA视图、内部视图&#xff09;的详细解释&#xff0c;结合理论与实际操作说明&#xff1a; 一、三级视图核心概念 数据库的三级视图是 ANSI/SPARC 体系结构的核心思想&#xff0c;MySQL 的实现逻辑如下&#xff1a; …

突破性能极限:DeepSeek开源FlashMLA解码内核技术解析

引言&#xff1a;大模型时代的推理加速革命 在生成式AI大行其道的今天&#xff0c;如何提升大语言模型的推理效率已成为行业焦点。DeepSeek团队最新开源的FlashMLA项目凭借其惊人的性能表现引发关注——在H800 GPU上实现580 TFLOPS计算性能&#xff0c;这正是大模型推理优化的…

ROS ur10机械臂添加140夹爪全流程记录

ROS ur10机械臂添加140夹爪 系统版本&#xff1a;Ubuntu20.04 Ros版本&#xff1a;noetic Moveit版本&#xff1a;moveit-noetic 参考博客&#xff1a; ur3robotiq ft sensorrobotiq 2f 140配置rviz仿真环境_有末端力传感器的仿真环境-CSDN博客 UR5机械臂仿真实例&#xf…

Redis速成(1)VMware虚拟机安装Redis+Session验证登录注册+MybatisPlus

课程&#xff1a;黑马程序员Redis入门到实战教程&#xff0c;深度透析redis底层原理redis分布式锁企业解决方案黑马点评实战项目_哔哩哔哩_bilibili Mybatis与MybatisPlus: 参考springboot&#xff0c;需要额外写mapper.class&#xff0c;在方法上Select等 在ssm中&#xff0c;…

thinkphp下的Job队列处理

需要think-queue扩展&#xff0c;没有的请composer安装一下 "require": {"php": ">7.2.5","topthink/framework": "^6.1","topthink/think-orm": "^2.0","topthink/think-multi-app": &qu…

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷(五)

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷&#xff08;五&#xff09; 第一部分&#xff1a;网络平台搭建与设备安全防护任务书第二部分&#xff1a;网络安全事件响应、数字取证调查、应用程序安全任务书任务 1&#xff1a;应急响应&…

JSON Schema 入门指南:如何定义和验证 JSON 数据结构

文章目录 一、引言二、什么是 JSON Schema&#xff1f;三、JSON Schema 的基本结构3.1 基本关键字3.2 对象属性3.3 数组元素3.4 字符串约束3.5 数值约束 四、示例&#xff1a;定义一个简单的 JSON Schema五、使用 JSON Schema 进行验证六、实战效果6.1 如何使用 七、总结 一、引…

VMware虚拟机Mac版安装Win10系统

介绍 Windows 10是由美国微软公司开发的应用于计算机和平板电脑的操作系统&#xff0c;于2015年7月29日发布正式版。系统有生物识别技术、Cortana搜索功能、平板模式、桌面应用、多桌面、开始菜单进化、任务切换器、任务栏的微调、贴靠辅助、通知中心、命令提示符窗口升级、文…

计算机网络:ICMP协议(Internet控制消息协议)介绍

目录 一、简介 二、为什么要有ICMP协议? 三、ICMP协议报文格式 四、ICMP报文的类型 4.1 差错报文 4.2 查询报文 五、ICMP报文的实际案例 5.1 Ping命令 5.2 Traceroute命令 总结 今天和大家聊聊ICMP协议相关的知识,感兴趣的可以一起了解一下! 一、简介 ICMP(Inte…