Spring Kafka—— KafkaListenerEndpointRegistry 隐式注册分析

由于我想在项目中实现基于 Spring kafka 动态连接 Kafka 服务,指定监听 Topic 并控制消费程序的启动和停止这样一个功能,所以就大概的了解了一下 Spring Kafka 的几个重要的类的概念,内容如下:

  1. ConsumerFactory
    • 作用:负责创建 Kafka 消费者的实例。ConsumerFactory 是一个简单的工厂类,用于封装消费者的配置(如bootstrap servers, key deserializer, value deserializer等)并生成Consumer实例。
    • 用法:通常在Spring配置类中定义,并通过依赖注入提供给KafkaListenerContainerFactory
  2. ConcurrentKafkaListenerContainerFactory
    • 作用:这个工厂类用于创建 ConcurrentMessageListenerContainer 实例,该容器管理多个Kafka MessageListenerContainer来提供并发消息消费。
    • 特点:可以设置并发消费的数量,即同时运行的MessageListenerContainer的数量。
      支持消息过滤、错误处理和事务管理。
    • 用法:在Spring配置类中定义,并设置其ConsumerFactory和其他相关配置。然后,可以通过@KafkaListener注解直接使用,Spring会自动使用这个工厂来创建监听器。
  3. KafkaListenerEndpointRegistry
    • 作用:这是一个管理类,用于管理应用中所有由@KafkaListener注解创建的消息监听器容器。
    • 特点:提供了启动和停止监听器的方法,可以在运行时控制监听器。
      可以用来查询当前所有注册的监听器的状态。
    • 用法:通常自动配置,可以通过自动注入到任何Spring管理的Bean中,用于运行时管理监听器。
  4. KafkaTemplate
    • 作用:这是一个高级抽象,用于生产消息到Kafka主题。
    • 特点:提供同步和异步发送消息的方法。
      支持事务消息发送。
    • 用法:定义在Spring配置类中,注入生产者工厂ProducerFactory,并用于应用中的消息发送。
  5. @KafkaListener
    作用:注解用于标记方法以作为Kafka消息的监听器,这些方法会自动被Spring容器管理,并在有新消息时触发。
    特点:
    可以指定主题、分区和消费组。
    支持并发消费。
    用法:放在组件的方法上,方法参数可以灵活地映射消息的key、value、headers等。

从上面的内容可以看到,KafkaListenerEndpointRegistry 这个类是管理消息监听容器的,并提供了启动和停止监听器的方法,于是我就想创建这个类来完成我的需求功能。当我直接写如下内容时:

@Component
public class KafkaConfig {@Autowiredprivate KafkaListenerEndpointRegistry registry;@PostConstructpublic void init() {System.out.println(registry);}
}

IDEA提示了 Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found. 但是我启动 SpringBoot 项目却没有报错 :
在这里插入图片描述
我在我的项目中是没有加 @EnableKafka 这样的注解的,代码如下:

@SpringBootApplication
public class SpringKafkaExampleApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaExampleApplication.class, args);}
}

引入的依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies>

于是我就比较好奇,项目启动的时候是在什么地方声明了 KafkaListenerEndpointRegistry 这个 bean 的。

KafkaListenerEndpointRegistry 隐式注册分析

SpringBoot 对于 kafka 有如下的自动配置:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {private final KafkaProperties properties;private final RecordMessageConverter recordMessageConverter;private final RecordFilterStrategy<Object, Object> recordFilterStrategy;private final BatchMessageConverter batchMessageConverter;private final KafkaTemplate<Object, Object> kafkaTemplate;private final KafkaAwareTransactionManager<Object, Object> transactionManager;private final ConsumerAwareRebalanceListener rebalanceListener;private final CommonErrorHandler commonErrorHandler;private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;private final RecordInterceptor<Object, Object> recordInterceptor;KafkaAnnotationDrivenConfiguration(KafkaProperties properties,ObjectProvider<RecordMessageConverter> recordMessageConverter,ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,ObjectProvider<BatchMessageConverter> batchMessageConverter,ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,ObjectProvider<CommonErrorHandler> commonErrorHandler,ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {this.properties = properties;this.recordMessageConverter = recordMessageConverter.getIfUnique();this.recordFilterStrategy = recordFilterStrategy.getIfUnique();this.batchMessageConverter = batchMessageConverter.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));this.kafkaTemplate = kafkaTemplate.getIfUnique();this.transactionManager = kafkaTransactionManager.getIfUnique();this.rebalanceListener = rebalanceListener.getIfUnique();this.commonErrorHandler = commonErrorHandler.getIfUnique();this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();this.recordInterceptor = recordInterceptor.getIfUnique();}@Bean@ConditionalOnMissingBeanConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();configurer.setKafkaProperties(this.properties);configurer.setBatchMessageConverter(this.batchMessageConverter);configurer.setRecordMessageConverter(this.recordMessageConverter);configurer.setRecordFilterStrategy(this.recordFilterStrategy);configurer.setReplyTemplate(this.kafkaTemplate);configurer.setTransactionManager(this.transactionManager);configurer.setRebalanceListener(this.rebalanceListener);configurer.setCommonErrorHandler(this.commonErrorHandler);configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);configurer.setRecordInterceptor(this.recordInterceptor);return configurer;}@Bean@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));return factory;}@Configuration(proxyBeanMethods = false)@EnableKafka@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)static class EnableKafkaConfiguration {}
}

可以看到这个配置类里面有一个静态的内部类 EnableKafkaConfiguration 该类上声明了 @EnableKafka 注解,也就是说内部静态类EnableKafkaConfiguration使用了@EnableKafka注解,并且通过@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)确保如果Spring上下文中缺少相应的Bean,则自动激活@EnableKafka功能。这意味着,即便你没有在你的应用配置中显式添加@EnableKafka,这个内部类也可以根据条件自动注册所需的Bean,从而启用Kafka的支持。

@EnableKafka 定义如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}

这个注解的使用导致了KafkaListenerConfigurationSelector的激活,其源码如下:

@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {@Overridepublic String[] selectImports(AnnotationMetadata importingClassMetadata) {return new String[] { KafkaBootstrapConfiguration.class.getName() };}}

上面的代码中 DeferredImportSelector是Spring框架中一个特殊的接口,它继承自ImportSelector。它主要用于处理配置类的导入,允许更细致地控制配置类的加载顺序。这个接口特别适用于那些依赖于由Spring容器中其他Bean或配置动态决定的配置。
KafkaListenerConfigurationSelector 这个类实现了DeferredImportSelector并通过selectImports方法返回了一个配置类名称的数组。这个方法指定了当Spring处理到这个选择器时,它应该导入KafkaBootstrapConfiguration类。

KafkaBootstrapConfiguration 内容如下:

public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));}if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,new RootBeanDefinition(KafkaListenerEndpointRegistry.class));}}}

KafkaBootstrapConfiguration 是一个实现了ImportBeanDefinitionRegistrar接口的类,主要用于程序化地注册Bean定义到Spring的ApplicationContext中。通过实现ImportBeanDefinitionRegistrar接口,这个类可以在Spring的配置阶段动态地添加Bean定义。

在这个特定的实现中,KafkaBootstrapConfiguration检查特定的Kafka相关Bean(如KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME和KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)是否已经注册。如果这些Bean尚未注册,它会使用RootBeanDefinition手动注册这些Bean到Spring容器中。

RootBeanDefinition 的功能

RootBeanDefinition是Spring框架中用于定义Bean的一个核心类。它是BeanDefinition接口的一个直接实现,提供了一种配置Spring管理的Bean的方式,包括Bean的类类型、生命周期回调、依赖信息等。

  • Bean配置的详细定义:RootBeanDefinition允许开发者详细定义Bean的创建细节,如构造函数参数、属性值、初始化方法、销毁方法等。
  • 高级功能:它还支持更复杂的配置,如懒加载、自动装配模式、作用域和其他高级特性。
  • 程序化Bean注册:通过使用RootBeanDefinition,开发者可以在运行时动态地注册Bean,这对于条件配置或需要响应不同配置环境的高级用途尤为重要。

KafkaBootstrapConfiguration类中,使用RootBeanDefinition来创建和注册KafkaListenerAnnotationBeanPostProcessorKafkaListenerEndpointRegistry类的实例,这些是设置和管理Kafka消息监听器所必需的。

之后在AbstractBeanFactory会根据 beanName 获取到了 RootBeanDefinition 如下图所示:
在这里插入图片描述
然后在如下所示的位置:
在这里插入图片描述
程序创建了 beanName 为 org.springframework.kafka.config.internalKafkaListenerEndpointRegistry 的实例,具体创建实例的位置如下:
在这里插入图片描述
从调试中可以看到此处实例化了 KafkaListenerEndpointRegistry
所以当我们 springboot 项目引入了

 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

依赖后,即使我们不显示的声明 @EnableKafka 程序也会进行初始化相应的配置。

总结

当Spring Boot项目中引入Spring Kafka依赖后,即使我们没有显式声明@EnableKafka,系统仍会自动进行相应的配置。因此,在项目中尝试注入KafkaListenerEndpointRegistry时,尽管IDE可能会提示“Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found.”,项目依然能够正常启动。这是因为KafkaListenerEndpointRegistry在Spring Kafka的自动配置过程中已被隐式注册。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/315935.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】信号的产生

目录 一. 信号的概念signal() 函数 二. 信号的产生1. 键盘发送2. 系统调用kill()raise()abort() 3. 软件条件alarm() 4. 硬件异常除零错误:野指针: 三. 核心转储 一. 信号的概念 信号是消息的载体, 标志着不同的行为; 是进程间发送异步信息的一种方式, 属于软中断. 信号随时都…

PNPM - nodejs 包管理

文章目录 一、关于 PNPM开发动机1、节省磁盘空间2、提升安装速度3、创建一个 non-flat node_modules 文件夹 二、安装通过 npm 安装 pnpm通过 Homebrew 安装 pnpm 三、pnpm CLI1、与 npm 的差异2、参数-C <path>, --dir <path>-w, --workspace-root 3、命令4、环境…

免费语音转文字:自建Whisper,贝锐花生壳3步远程访问

Whisper是OpenAI开发的自动语音识别系统&#xff08;语音转文字&#xff09;。 OpenAI称其英文语音辨识能力已达到人类水准&#xff0c;且支持其它98中语言的自动语音辨识&#xff0c;Whisper神经网络模型被训练来运行语音辨识与翻译任务。 此外&#xff0c;与其他需要联网运行…

Blender曲线操作

1.几种常见建模方式 -多边形建模&#xff1a;Blender&#xff0c;C4D&#xff0c;3DsMax&#xff0c;MaYa -曲线&#xff1a; -曲面&#xff1a;Rhino&#xff08;Nurbs&#xff09; -雕刻&#xff1a;Blender&#xff0c;ZBrush -蜡笔&#xff1a;Blender 1&#xff09;新…

windows11家庭版开启Hyper-v

前提&#xff1a;如果在控制面板中-->程序和功能-->启用和关闭windows功能-->没有Hyper-v 1.什么是Hyper-v&#xff1f; Hyper-v分为两个部分&#xff1a;底层的虚拟机平台、上层的虚拟机管理软件 2.Hyper-v安装 2.1新建hyper.cmd文件&#xff0c;写入下面的内容&…

物联网:从电信物联开发平台AIoT获取物联设备上报数据示例

设备接入到电信AIoT物联平台后&#xff0c;可以在平台上查询到设备上报的数据。 下面就以接入的NBIOT物联远传水表为例。 在产品中选择指定设备&#xff0c;在数据查看中可以看到此设备上报的数据。 示例中这组数据是base64位加密的&#xff0c;获取后还需要转换解密。 而我…

Linux软件包管理器——yum

文章目录 1.什么是软件包1.1安装与删除命令1.2注意事项1.3查看软件包1.3.1注意事项&#xff1a; 2.关于rzsz3.有趣的Linux下的指令 -sl 1.什么是软件包 在Linux下安装软件, 一个通常的办法是下载到程序的源代码, 并进行编译, 得到可执行程序. 但是这样太麻烦了, 于是有些人把一…

操作系统安全:Windows与Linux的安全标识符,身份鉴别和访问控制

「作者简介」:2022年北京冬奥会网络安全中国代表队,CSDN Top100,就职奇安信多年,以实战工作为基础对安全知识体系进行总结与归纳,著作适用于快速入门的 《网络安全自学教程》,内容涵盖系统安全、信息收集等12个知识域的一百多个知识点,持续更新。 操作系统有4个安全目标…

Unity Meta Quest MR 开发(七):使用 Stencil Test 模板测试制作可以在虚拟与现实之间穿梭的 MR 传送门

文章目录 &#x1f4d5;教程说明&#x1f4d5;Stencil Test 模板测试&#x1f4d5;Stencil Shader&#x1f4d5;使用 Unity URP 渲染管线设置模板测试⭐Render Pipeline Asset 与 Universal Renderer Data⭐删除场景中的天空盒⭐设置虚拟世界的层级 Layer⭐设置模板测试 &#…

大数据运维之数据质量管理

第1章 数据质量管理概述 1.1 数据质量管理定义 数据质量管理&#xff08;Data Quality Management&#xff09;&#xff0c;是指对数据从计划、获取、存储、共享、维护、应用、消亡生命周期的每个阶段里可能引发的各类数据质量问题&#xff0c;进行识别、度量、监控、预警等一…

使用工具速记

文章目录 一、sqlyoy登录账号信息迁移二、idea导入之前的已配置的idea信息三、设置windows UI大小四、其他 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、sqlyoy登录账号信息迁移 工具(sqlyog上面菜单栏)->导入导出详情->选择要导出的账号…

Graph Neural Networks(GNN)学习笔记

本学习笔记的组织结构是&#xff0c;先跟李沐老师学一下&#xff0c;再去kaggle上寻摸一下有没有类似的练习&#xff0c;浅做一下&#xff0c;作为一个了解。 ———————————0428更新—————————————— 课程和博客看到后面准备主要看两个&#xff1a;GCN和…

jvm知识点总结(二)

Java8默认使用的垃圾收集器是什么? Java8版本的Hotspot JVM,默认情况下使用的是并行垃圾收集器&#xff08;Parallel GC&#xff09; 如果CPU使用率飙升&#xff0c;如何排查? 1.先通过top定位到消耗最高的进程id 2.执行top -h pid单独监控该进程 3.在2中输入H&#xff…

Gin的中间件执行流程与用法

一、背景 我们在使用Gin框架进行Web开发的时候&#xff0c;基本上都会遇到登录拦截的场景。 例如某些接口必须在登录以后才能访问&#xff0c;根据登录用户的信息以及权限&#xff0c;拿到属于自己的数据, 反之&#xff0c;没登录过则直接拒绝访问。 那么我们怎么做到这些登录…

Unreal Engine添加UGameInstanceSubsystem子类

点击C类文件夹&#xff0c;在右边的区域点击鼠标右键&#xff0c;在弹出的菜单中选择“新建C类”在弹出的菜单中选中“显示所有类”&#xff0c;选择GameInstanceSubsystem作为父类, 点击“下一步”按钮输入子类名称“UVRVIUOnlineGameSubsystem”&#xff0c;选择插件作为新类…

9种单片机常用的软件架构

长文预警&#xff0c;加代码5000多字&#xff0c;写了4个多小时&#xff0c;盘软件架构&#xff0c;这篇文章就够了! 可能很多工程师&#xff0c;工作了很多年&#xff0c;都不会有软件架构的概念。 因为我在做研发工程师的第6年&#xff0c;才开始意识到这个东西&#xff0c;在…

IDEA主题美化【保姆级】

前言 一款好的 IDEA 主题虽然不能提高我们的开发效率&#xff0c;但一个舒适简单的主题可以使开发人员更舒适的开发&#xff0c;时常换一换主题可以带来不一样的体验&#xff0c;程序员的快乐就这么简单。话不多说&#xff0c;先上我自己认为好看的主题设置。 最终效果图: 原…

qt学习篇---C++基础学习

本学习笔记学习下面视频总结&#xff0c;感兴趣可以去学习。讲的很详细 【北京迅为】嵌入式学习之QT学习篇_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1tp4y1i7EJ/?spm_id_from333.337.search-card.all.click&vd_source8827cc0da16223b9f2ad8ae7111de9e2 目录 C…

使用逆滤波算法deconvwnr恢复图像回复图像时,产生了很多横竖条纹。解决办法

使用逆滤波算法deconvwnr恢复图像回复图像时&#xff0c;产生了很多横竖条纹。解决办法 原来的代码 % 清除工作空间并关闭所有图形窗口 clear; clc; close all;% 读取原始图像 original_image imread(pic3.jpg);% 显示原始图像 subplot(131); imshow(original_image); title…

区块链技术:NFG元宇宙电商模式

大家好&#xff0c;我是微三云周丽 随着互联网技术的迅猛发展&#xff0c;电子商务行业逐渐崛起为现代经济的重要支柱。而在这一浪潮中&#xff0c;元宇宙电商以其独特的商业模式和巨大的发展潜力&#xff0c;成为行业的新宠。其中&#xff0c;NFG作为元宇宙电商模式的代表&am…