@KafkaListener和KafkaTemplate自动装配原理分析

依赖项和配置信息参见另一篇博文@KafkaListener的配置使用,这里主要借助源码分析@KafkaListener和KafkaTemplate自动装配原理。

1、KafkaAutoConfiguration 源码分析

KafkaAutoConfiguration类自动装配生成了生产者客户端KafkaTemplate的bean和消费者基础ConsumerFactory的bean,KafkaAutoConfiguration导入KafkaAnnotationDrivenConfiguration,KafkaAnnotationDrivenConfiguration最终生成了ConcurrentKafkaListenerContainerFactory的bean, 该bean是@KafkaListener默认使用的容器工厂,即指定了消费的kafka集群。

package org.springframework.boot.autoconfigure.kafka;import java.io.IOException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;@Configuration(proxyBeanMethods = false
)
/// KafkaTemplate在类路径下存在时,加载该配置类
@ConditionalOnClass({KafkaTemplate.class})  
@EnableConfigurationProperties({KafkaProperties.class})
/// 将KafkaAnnotationDrivenConfiguration、KafkaStreamsAnnotationDrivenConfiguration配置类合并过来
@Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class})  
public class KafkaAutoConfiguration {private final KafkaProperties properties;public KafkaAutoConfiguration(KafkaProperties properties) {this.properties = properties;}@Bean  /// KafkaTemplate的bean未定义时,自动生成该bean@ConditionalOnMissingBean({KafkaTemplate.class})public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean@ConditionalOnMissingBean({ProducerFactory.class})public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(this.properties.buildProducerProperties());String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> {customizer.customize(factory);});return factory;}@Bean@ConditionalOnMissingBean({ProducerListener.class})public ProducerListener<Object, Object> kafkaProducerListener() {return new LoggingProducerListener();}@Bean/// ConsumerFactory的bean未定义时,自动生成该bean@ConditionalOnMissingBean({ConsumerFactory.class})public ConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> {customizer.customize(factory);});return factory;}@Bean@ConditionalOnProperty(name = {"spring.kafka.producer.transaction-id-prefix"})@ConditionalOnMissingBeanpublic KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {return new KafkaTransactionManager(producerFactory);}@Bean@ConditionalOnProperty(name = {"spring.kafka.jaas.enabled"})@ConditionalOnMissingBeanpublic KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();KafkaProperties.Jaas jaasProperties = this.properties.getJaas();if (jaasProperties.getControlFlag() != null) {jaas.setControlFlag(jaasProperties.getControlFlag());}if (jaasProperties.getLoginModule() != null) {jaas.setLoginModule(jaasProperties.getLoginModule());}jaas.setOptions(jaasProperties.getOptions());return jaas;}@Bean@ConditionalOnMissingBeanpublic KafkaAdmin kafkaAdmin() {KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());return kafkaAdmin;}
}

2、KafkaAnnotationDrivenConfiguration 源码分析

package org.springframework.boot.autoconfigure.kafka;import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;/*** Configuration for Kafka annotation-driven support.** @author Gary Russell* @author Eddú Meléndez*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {private final KafkaProperties properties;private final RecordMessageConverter messageConverter;private final BatchMessageConverter batchMessageConverter;private final KafkaTemplate<Object, Object> kafkaTemplate;private final KafkaAwareTransactionManager<Object, Object> transactionManager;private final ConsumerAwareRebalanceListener rebalanceListener;private final ErrorHandler errorHandler;private final BatchErrorHandler batchErrorHandler;private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;private final RecordInterceptor<Object, Object> recordInterceptor;KafkaAnnotationDrivenConfiguration(KafkaProperties properties,ObjectProvider<RecordMessageConverter> messageConverter,ObjectProvider<BatchMessageConverter> batchMessageConverter,ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,ObjectProvider<BatchErrorHandler> batchErrorHandler,ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {this.properties = properties;this.messageConverter = messageConverter.getIfUnique();this.batchMessageConverter = batchMessageConverter.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));this.kafkaTemplate = kafkaTemplate.getIfUnique();this.transactionManager = kafkaTransactionManager.getIfUnique();this.rebalanceListener = rebalanceListener.getIfUnique();this.errorHandler = errorHandler.getIfUnique();this.batchErrorHandler = batchErrorHandler.getIfUnique();this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();this.recordInterceptor = recordInterceptor.getIfUnique();}@Bean@ConditionalOnMissingBeanConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();configurer.setKafkaProperties(this.properties);MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))? this.batchMessageConverter : this.messageConverter;configurer.setMessageConverter(messageConverterToUse);configurer.setReplyTemplate(this.kafkaTemplate);configurer.setTransactionManager(this.transactionManager);configurer.setRebalanceListener(this.rebalanceListener);configurer.setErrorHandler(this.errorHandler);configurer.setBatchErrorHandler(this.batchErrorHandler);configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);configurer.setRecordInterceptor(this.recordInterceptor);return configurer;}@Bean@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")  /// ConcurrentKafkaListenerContainerFactory的bean未定义时,自动生成该bean(这是@KafkaListener默认使用的容器工厂)ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);return factory;}@Configuration(proxyBeanMethods = false)@EnableKafka@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)static class EnableKafkaConfiguration {}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/23935.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pycharm管理虚拟环境

不借用Anoconda 1.检查pip所在位置&#xff0c; 因为pip的默认安装路径是python的安装目录下的依赖库路径D:\Program Files\Python397\Lib\site-packages。项目如果用之前pycharm创建的环境是无法加载这个路径的库的。 2.安装时指定安装路径 千万要注意指定安装路径为项目的…

DeepSeek 助力 Vue 开发:打造丝滑的 复选框(Checkbox)

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 Deep…

FMT源码 - module

module 功能模块 1、uMCN uMCN 是 类似于 PX4里面的 uORB 模块。 mcn listmcn echo sensor_imu0mcn echo <topic> [options]options:-n, --number Set topic echo number, e.g, -n 10 will echo 10 times. (朝终端打印的次数)-p, --period Set topic echo peri…

城电科技|会追日的智能花,光伏太阳花开启绿色能源新篇章

当艺术与科技相遇&#xff0c;会碰撞出怎样的火花&#xff1f;城电科技推出的光伏太阳花&#xff0c;以其独特的设计与智能化的功能&#xff0c;给出了答案。这款产品不仅具备太阳能发电的实用功能&#xff0c;更是一件充满科技属性的艺术性光伏产品&#xff0c;吸引了广泛关注…

湖北中医药大学谱度众合(武汉)生命科技有限公司研究生工作站揭牌

2025年2月11日&#xff0c;湖北中医药大学&谱度众合&#xff08;武汉&#xff09;生命科技有限公司研究生工作站揭牌仪式在武汉生物技术研究院一楼101会议室举行&#xff0c;湖北中医药大学研究生院院长刘娅教授、基础医学院院长孔明望教授、基础医学院赵敏教授、基础医学院…

计算机网络————(一)HTTP讲解

基础内容分类 从TCP/IP协议栈为依托&#xff0c;由上至下、从应用层到基础设施介绍协议。 1.应用层&#xff1a; HTTP/1.1 Websocket HTTP/2.0 2.应用层的安全基础设施 LTS/SSL 3.传输层 TCP 4.网络层及数据链路层 IP层和以太网 HTTP协议 网络页面形成基本 流程&#xff1a…

货车一键启动无钥匙进入手机远程启动的正确使用方法

一、移动管家货车无钥匙进入系统的使用方法 基本原理&#xff1a;无钥匙进入系统通常采用RFID无线射频技术和车辆身份识别码识别系统。车钥匙需要随身携带&#xff0c;当车钥匙靠近货车时&#xff0c;它会自动与货车的解码器匹配。开门操作&#xff1a;当靠近货车后&#xff0…

2.2logstash规则配置

工作流程 Logstash工作的三个阶段&#xff1a; input数据输入端&#xff0c;以接收来自任何地方的源数据 * file&#xff1a;从文件中读取 * syslog&#xff1a;监听在514端口的系统日志信息, 并解析成RFC3164格式 * redis&#xff1a;从redis-server list中获取 * beat&a…

Java进阶:Zookeeper相关笔记

概要总结&#xff1a; ●Zookeeper是一个开源的分布式协调服务&#xff0c;需要下载并部署在服务器上(使用cmd启动&#xff0c;windows与linux都可用)。 ●zookeeper一般用来实现诸如数据订阅/发布、负载均衡、命名服务、集群管理、分布式锁和分布式队列等功能。 ●有多台服…

GB 44497-2024《智能网联汽车 自动驾驶数据记录系统》标准解读

GB 44497-2024《智能网联汽车 自动驾驶数据记录系统》是由工业和信息化部提出并归口的强制性国家标准&#xff0c;由国家市场监督管理总局、国家标准化管理委员会于2024年8月23日批准发布(国家标准公告2024年第18号文)&#xff0c;将于2026年1月1日起实施。标准规定了智能网联汽…

在低功耗MCU上实现人工智能和机器学习

作者&#xff1a;Silicon Labs 人工智能&#xff08;AI&#xff09;和机器学习&#xff08;ML&#xff09;技术不仅正在快速发展&#xff0c;还逐渐被创新性地应用于低功耗的微控制器&#xff08;MCU&#xff09;中&#xff0c;从而实现边缘AI/ML解决方案。这些MCU是许多嵌入式…

[数据结构笔记]数据结构必要的C语言基础

数据结构必要的C语言基础 使用C语言学习数据结构之前有一些必要了解的基础&#xff0c;许多同学在初学数据结构时因为对这些知识不熟&#xff0c;导致了对数据结构的畏惧心理。实际上很大一部分来自C语言的基础 C语言 结构体与指针 ​ 在一些场景中&#xff0c;如果传递给函…

Java进阶(一)

文章目录 前言一、常用类 1.Object类常用方法 toString方法equals方法fianlize()方法 2. String类 String字符串的储存原理内存图分析String常用的构造方法String常用方法3. StringBuilder/StringBuffer类 4. 基本类型包装类 简介包装类类的常用方法&#xff08;以Integer为例…

蓝桥杯单片机组第十二届省赛第二批次

前言 第十二届省赛涉及知识点&#xff1a;NE555频率数据读取&#xff0c;NE555频率转换周期&#xff0c;PCF8591同时测量光敏电阻和电位器的电压、按键长短按判断。 本试题涉及模块较少&#xff0c;题目不难&#xff0c;基本上准备充分的都能完整的实现每一个功能&#xff0c;并…

微信小程序调用火山方舟(字节跳动火山引擎)中的DeepSeek大模型

一、注册火山引擎账号&#xff0c;创建API Key和model&#xff08;接入点ID&#xff09; 1.注册并登陆火山引擎账号&#xff0c;网址为&#xff1a;https://console.volcengine.com/ 2.根据登陆后的页面提示进行实名认证&#xff0c;实名认证后才能创建API Keyt和创建接入点。…

全星FMEA软件系统是一款高效、智能的失效模式及影响分析工具,广泛应用于汽车、电子、机械等行业

全星FMEA软件系统是一款高效、智能的失效模式及影响分析工具&#xff0c;广泛应用于汽车、电子、机械等行业。该系统基于2019版FMEA手册开发&#xff0c;严格遵循七步方法&#xff0c;能够全面识别潜在风险并提前制定应对措施。 全星FMEA软件系统功能特点 自动化分析&#xff…

ROS的action通信——实现阶乘运算(一)

在ROS中除了常见的话题(topic&#xff09;通信、服务(server)通信等方式&#xff0c;还有action通信这一方式&#xff0c;由于可以实时反馈任务完成情况&#xff0c;该通信方式被广泛运用于机器人导航等任务中。本文将通过三个小节的分享&#xff0c;实现基于action通信的阶乘运…

一文读懂什么是K8s Admission Controller

#作者&#xff1a;曹付江 文章目录 1、什么是 Admission Controllers&#xff1f;2、如何创建 Admission Controllers&#xff1f;3、Admission 控制器的最佳实践 K8s 中的操作与安全标准执行机制&#xff1a; 1、什么是 Admission Controllers&#xff1f; Admission contro…

vue3:vue3项目安装并引入Element-plus

一、安装Element-plus 1、安装语句位置 安装 | Element Plushttps://element-plus.org/zh-CN/guide/installation.html根据所需进行安装&#xff0c;这里使用npm包 2、找到项目位置 找到项目位置&#xff0c;在路径上输入cmd回车打开“运行”窗口 输入安装语句回车完成安装 …

【C/C++】理解C++内存与Linux虚拟地址空间的关系---带你通透C++中所有数据

每日激励&#xff1a;“不设限和自我肯定的心态&#xff1a;I can do all things。 — Stephen Curry” 绪论&#xff1a; 本质编写的原因是我在复习过程中突然发现虚拟地址空间和C内存划分我好想有点分不清时&#xff0c;进行查询各类资料和整理各类文章后得出的文章&#xff…