springboot kafka在kafka server AUTH变动后consumer自动销毁

前言

笔者使用了kafka用来传输数据,笔者在今年10月写了文章,怎么使用配置化实现kafka的装载:springboot kafka多数据源,通过配置动态加载发送者和消费者-CSDN博客

不过在实际运行中,kafka broker是加密的,本来也没啥,但是突然的一天笔者在监控发现消费者掉线了,发送者居然还是正常的,见鬼的事情就是这么朴实的发生了,而且日志有Authentication/Authorization Exception and no authExceptionRetryInterval set

这样的错误,还有

Closing the Kafka consumer
Kafka consumer has been closed

这样的日志,非常诡异,后面查询kafka集群才知道,kafka集群在某个时间被改了AUTH配置,然后又改回来了,神奇的操作。

现象

实际上看到日志是懵的,毕竟kafka是对方提供的,AUTH用户名和密码也是对方给的,而且发送者也出现AUTH失败,但是发送者一直重试,然后因为kafka集群的AUTH改回来了,重试成功了;唯独消费者AUTH失败后,关闭了。

源码分析

从日志搜索Authentication/Authorization Exception and no authExceptionRetryInterval set

发现日志出现在

org.springframework.kafka.listener.KafkaMessageListenerContainer

刚好在消息监听器容器的内部类

ListenerConsumer

看源码定义,是一个定时线程池的任务定义

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback

分析怎么去消费的KafkaMessageListenerContainer的doStart方法

既然明晰了,那么分析问题的来源, run方法

		public void run() {ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent(); //事件通知,spring事件this.consumerThread = Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count = 0;this.last = System.currentTimeMillis();initAssignedPartitions(); // 初始分配partitionspublishConsumerStartedEvent(); // 消费者启动事件Throwable exitThrowable = null;this.lastReceive = System.currentTimeMillis();while (isRunning()) { //状态在上面的截图代码已经更新为运行状态try {pollAndInvoke(); // 队列拉取,拉取过程会出现各种异常}catch (NoOffsetForPartitionException nofpe) { //这个需要注意,但是这个是不可配置的this.fatalError = true;ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");exitThrowable = nofpe;break;}catch (AuthenticationException | AuthorizationException ae) { //这个是授权失败,就是来源于kafka broker的auth鉴权if (this.authExceptionRetryInterval == null) { //时间配置,默认居然是nullListenerConsumer.this.logger.error(ae,"Authentication/Authorization Exception and no authExceptionRetryInterval set");this.fatalError = true;exitThrowable = ae;break; //循环结束}else { // 一段时间后重试ListenerConsumer.this.logger.error(ae,"Authentication/Authorization Exception, retrying in "+ this.authExceptionRetryInterval.toMillis() + " ms");// We can't pause/resume here, as KafkaConsumer doesn't take pausing// into account when committing, hence risk of being flooded with// GroupAuthorizationExceptions.// see: https://github.com/spring-projects/spring-kafka/pull/1337sleepFor(this.authExceptionRetryInterval);}}catch (FencedInstanceIdException fie) {this.fatalError = true;ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG+ "' has been fenced");exitThrowable = fie;break;}catch (StopAfterFenceException e) {this.logger.error(e, "Stopping container due to fencing");stop(false);exitThrowable = e;}catch (Error e) { // NOSONAR - rethrownthis.logger.error(e, "Stopping container due to an Error");this.fatalError = true;wrapUp(e);throw e;}catch (Exception e) {handleConsumerException(e);}finally {clearThreadState();}}//上面异常后,这里的处理wrapUp(exitThrowable);}

注意NoOffsetForPartitionException和AuthenticationException | AuthorizationException,其中能配置重试的是AuthenticationException | AuthorizationException异常,就是授权失败可以通过配置过一段时间抢救一下,一般而言,kafka首次授权失败基本上就不太可能成功了,但是这个只能控制consumer销毁,producer还在重试,所以出现了消费者销毁了,发送者在kafka集群auth还原后成功恢复。看看wrapUp方法

既然知道了原理,那么解决办法是配置

authExceptionRetryInterval

解决方法

配置authExceptionRetryInterval也是不容易,分析取值

private final Duration authExceptionRetryInterval =this.containerProperties.getAuthExceptionRetryInterval();

从 containerProperties来的,实际上是继承org.springframework.kafka.listener.ConsumerProperties

在spring-kafka 2.8版本还对属性命名重构了,毕竟以前的命名字母太多了,😁

不过要修改这个值也不容易,kafkaproperties并没有提供这个参数,而且创建消费者容器工厂时

org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory

那么只能在

ConcurrentMessageListenerContainer

创建成功后,从这个里面读取配置,设置默认值。这就可以使用前面埋点的spring事件了,通过事件拿到Consumer,不就可以修改配置了

使用

publishConsumerStartedEvent();

的事件最合适,执行循环前最后一个事件,而且这里的this,就是消费者容器org.springframework.kafka.listener.KafkaMessageListenerContainer对象,就是我们需要的

编写代码如下:

@Component
public class KafkaConsumerStartedListener implements ApplicationListener<ConsumerStartedEvent> {@Overridepublic void onApplicationEvent(ConsumerStartedEvent event) {KafkaMessageListenerContainer<?, ?> container = event.getSource(KafkaMessageListenerContainer.class);container.getContainerProperties().setAuthExceptionRetryInterval(Duration.of(30, ChronoUnit.SECONDS));}
}

就这样就解决了问题,哈哈哈贼简单。 

笔者在使用API时发现了还有一个API是授权失败后重启,是布尔变量,默认值是false,即不重试。那么这个是哪里触发的呢,在我们看到的消费者销毁事件里面,实际上控制重试有多种办法,一个是循环去kafka broker拉取,一个是重启kafka消费者容器。

 

这个在上面已经说明了,简单截个图再说明一下

设置的地方还是上面的代码里面,同理也可以修改事件为

ConsumerStoppedEvent

可以更精准,当然就用刚刚的代码加一行设置也是可以的。

总结

kafka在发送者和消费者是区分开的,发送者如果连接kafka broker失败后可以一直重试直到成功,但是消费者确有各种各样的逻辑,可以精准控制,比如消费者重启的配置

restartAfterAuthExceptions

可以控制消费者在停止时重启,如果仅仅是授权失败,而且不需要反复重启(消耗资源),那么可以通过

authExceptionRetryInterval

配置时间周期的方式实现,但是kafka并没有给我们配置的入口,但是kafka在消费者启动消费的过程埋了很多spring事件钩子,通过这些钩子可以操作,估计spring-kafka也不希望我们去修改,毕竟消费者启动失败了或授权失败了,消费者自动销毁是符合正常逻辑的。如果不使用kafka自己提供的事件,可以在启动完成通过

org.springframework.kafka.config.KafkaListenerEndpointRegistry

拿到所有消费者容器,来批量设置属性,毕竟spring-kafka也是通过这个端点注册器注册MessageListenerContainer的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/484522.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jupyter Lab打印日志

有时候在 jupyter 中执行运行时间较长的程序&#xff0c;且需要一直信息&#xff0c;但是程序执行到某些时候就不再打印了。 可以开启 日志控制台&#xff0c;将日志信息记录在控制台中。 参考&#xff1a;https://www.autodl.com/docs/jupyterlab/

EtherCAT转ProfiNet网关实现西门子1200PLC与伺服电机连接的通讯案例

一. 案例背景 西门子1200PLC通过捷米特JM-ECTM-PN(EtherCAT转ProfiNet)网关将松下伺服电机(包括不限于型号MHMFO22D1U2M)或EtherCAT协议的其它设备或连接到ProfiNetPLC上&#xff0c;并在正常运行中支持EtherCAT协议。本产品可作为EtherCAT主站&#xff0c;做为西门子S7-1200系…

Redis 基础、Redis 应用

Redis 基础 什么是 Redis&#xff1f; Redis &#xff08;REmote DIctionary Server&#xff09;是一个基于 C 语言开发的开源 NoSQL 数据库&#xff08;BSD 许可&#xff09;。与传统数据库不同的是&#xff0c;Redis 的数据是保存在内存中的&#xff08;内存数据库&#xf…

Vue 组件通信全面解析

Vue 组件通信全面解析&#xff1a;方式、原理、优缺点及最佳实践 在 Vue 开发中&#xff0c;组件通信是一个重要的核心问题。随着应用复杂度的增加&#xff0c;如何在组件之间有效传递数据、触发事件&#xff0c;直接影响代码的可维护性和可扩展性。Vue 提供了多种组件通信方式…

Python-链表数据结构学习(1)

一、什么是链表数据&#xff1f; 链表是一种通过指针串联在一起的数据结构&#xff0c;每个节点由2部分组成&#xff0c;一个是数据域&#xff0c;一个是指针域&#xff08;存放下一个节点的指针&#xff09;。最后一个节点的指针域指向null&#xff08;空指针的意思&#xff0…

电脑插入耳机和音响,只显示一个播放设备

1. 控制面板-硬件和声音-Realtek高清音频-扬声器-设备高级设置-播放设备里选择使用前部和后部输出设备同时播放两种不同的音频流 在声音设置中就可以看到耳机播放选项

ISAAC SIM踩坑记录--添加第三方3D场景

ISAAC SIM仿真首先就是要有合适的3D场景&#xff0c;官方提供了一些场景&#xff0c;如果不能满足要求&#xff0c;那就只能自己建。 对于我这种不会3D建模的菜鸟&#xff0c;只能到网上下载了&#xff0c;sketchfab就是一个不错的平台&#xff0c;有不少免费资源可以下载。 …

CentOS 9 配置静态IP

文章目录 1_问题原因2_nmcli 配置静态IP3_使用配置文件固定IP4_重启后存在的问题5_nmcli 补充 1_问题原因 CentOS 7 于 2014年6月发布&#xff0c;基于 RHEL 7&#xff0c;并在 2024年6月30日 结束维护。 CentOS 9 作为目前的最新版本&#xff0c;今天闲来闲来无事下载下来后…

C++趣味编程玩转物联网:基于树莓派Pico控制无源蜂鸣器-实现音符与旋律的结合

无源蜂鸣器是一种多功能的声音输出设备,与有源蜂鸣器相比,它能够通过不同频率的方波生成丰富多样的音调。本项目使用树莓派Pico开发板,通过编程控制无源蜂鸣器播放经典旋律《归来有风》。本文将详细介绍项目实现中的硬件连接、C++代码解析,以及无源蜂鸣器的工作原理。 一、…

【AI模型对比】Kimi与ChatGPT的差距:真实对比它们在六大题型中的全面表现!

文章目录 Moss前沿AI语义理解文学知识数学计算天文学知识物理学知识英语阅读理解详细对比列表总结与建议 Moss前沿AI 【OpenAI】获取OpenAI API Key的多种方式全攻略&#xff1a;从入门到精通&#xff0c;再到详解教程&#xff01;&#xff01; 【VScode】VSCode中的智能AI-G…

Runway 技术浅析(六):文本到视频(Text-to-Video)

1. 核心组件与工作原理 1.1 自然语言处理&#xff08;NLP&#xff09; 1.1.1 文本解析与语义理解 文本到视频的第一步是将用户输入的自然语言文本解析为机器可理解的语义信息。Runway 使用预训练的 NLP 模型&#xff0c;如 GPT-3 和 BERT&#xff0c;这些模型通过大规模文本数…

【C++】双温度转换与并联电阻计算的编程题分析与优化

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;从华氏温度到摄氏温度的转换题目背景与华氏温度与摄氏温度的转换初始代码与验证通过的解法代码分析特点与优缺点 老师的代码&#xff1a;结合 C 与 C 的风格代码分析与对比…

【采样率、采样定理、同步和异步采样】

内容来源&#xff1a;【数据采集卡的【采样率】【采样定理】【同步采样】【异步采样】的相关说明】 此篇文章仅作笔记分享。 前言 模拟信号需要通过采样、储存、量化、编码这几个步骤转换成数字信号&#xff0c;本篇文章将会对采样进行一个更详细的说明。 采样 采样就是将一…

笔记本电脑usb接口没反应怎么办?原因及解决方法

笔记本电脑的USB接口是我们日常使用中非常频繁的一个功能&#xff0c;无论是数据传输、充电还是外接设备&#xff0c;都离不开它。然而&#xff0c;当USB接口突然没有反应时&#xff0c;这无疑会给我们的工作和学习带来不小的困扰。下面&#xff0c;我们就来探讨一下笔记本USB接…

linux运维之shell编程

Shell 编程在系统运维中及其重要 1. Shell 编程概述 Shell 是一种命令行解释器&#xff0c;能够执行操作系统的命令。Shell 脚本是一个包含一系列 Shell 命令的文件&#xff0c;它可以被执行&#xff0c;以自动化和批量处理任务。常用的 Shell 类型包括 bash、sh、zsh 等。Shel…

怎么自己创建一个网站? 开发语言首选 java,使用CMS网站内容管理系统是不错的选择

怎么自己创建一个网站 推荐使用 Java CMS 网站内容管理系统&#xff0c;根据网站规划的功能模块&#xff0c;创建不同的页面风格&#xff1b; 文章目录 怎么自己创建一个网站一、规划网站1.1确定网站主题和目的1.2规划网站结构和内容 二、注册域名2.1选择域名注册商2.2 查找并…

小米澎湃OS2跟蜂窝网络相关的设置和调试【功能设计】

界面功能 开发者模式下&#xff0c;支持数据和WLAN网络相关的设置&#xff0c;跟数据有关的主要如下&#xff1a; 蜂窝网络调试 > 5G-A 特性中心始终开启移动数据网络&#xff08;便于WiFi和数据快速切换&#xff0c;在国外北美运营商有些需求中明确定义要开着&#xff09…

八、Python —— 类、异常处理、模块、包的管理、虚拟环境

文章目录 一、类1.1、类的定义1.2、类变量和实例变量1.3、类的继承 二、异常处理三、模块 和 包3.1、模块和包的概念&#xff0c;以及项目的树形结构3.2、模块和包的使用 (from import)方法一&#xff1a;from 模块 import 变量/函数/类方式二&#xff1a;from 模块 import 变量…

【text2sql】低资源场景下Text2SQL方法

SFT使模型能够遵循输入指令并根据预定义模板进行思考和响应。如上图&#xff0c;、 和 是用于通知模型在推理过程中响应角色的角色标签。 后面的内容表示模型需要遵循的指令&#xff0c;而 后面的内容传达了当前用户对模型的需求。 后面的内容代表模型的预期输出&#xff0c;也…

自适应神经网络架构:原理解析与代码示例

个人主页&#xff1a;chian-ocean 文章专栏 自适应神经网络结构&#xff1a;深入探讨与代码实现 1. 引言 随着深度学习的不断发展&#xff0c;传统神经网络模型在处理复杂任务时的局限性逐渐显现。固定的网络结构和参数对于动态变化的环境和多样化的数据往往难以适应&#…