Spring kafka源码分析——消息是如何消费的

文章目录

    • 概要
    • 端点注册
    • 创建监听容器
    • 启动监听容器
    • 消息拉取与消费
    • 小结

概要

本文主要从Spring Kafka的源码来分析,消费端消费流程;从spring容器启动到消息被拉取下来,再到执行客户端自定义的消费逻辑,大致概括为以下4个部分:

在这里插入图片描述

源码分析主要也是从以上4个部分进行分析;

环境准备

maven依赖如下:

 	<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.6.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

消费端代码:

 @KafkaListener(topics = KafkaController.TOPIC_TEST_ERROR, groupId = "${spring.application.name}")public void replicatedTopicConsumer2(ConsumerRecord<String, String> recordInfo) {int partition = recordInfo.partition();System.out.println("partition:" + partition);String value = recordInfo.value();System.out.println(value);}

参数配置使用默认配置

端点注册

KafkaAutoConfiguration

与其他组件相同,spring-kafka的入口加载入口类也是以AutoConfiguration结尾,即:KafkaAutoConfiguration,由于本文重点分析消费者流程,自动类这里主要关注以下几个地方:
在这里插入图片描述
在这里插入图片描述
kafka启动后,会自动将ConcurrentKafkaListenerContainerFactory加载到容器中。

一般来说,消费端会使用到@KafkaListener注解或者@KafkaListeners注解,所以,我们的重点就是只要是关注,这两个注解是如何被识别,并且起到监听作用的,以下是类的加载流程:

在这里插入图片描述
Bean在执行init方法后会调用,初始化后置处理方法,而KafkaListenerAnnotationBeanPostProcessor实现了BeanPostProcessor,KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization就会被触发执行,在该方法中,会读取该bean中标注了@KafkaListener@KafkaListeners的方法
在这里插入图片描述

protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {Method methodToUse = checkProxy(method, bean);MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();endpoint.setMethod(methodToUse);processListener(endpoint, kafkaListener, bean, methodToUse, beanName);}

从上面看出,每个标注了KafkaListener注解的方法都会创建一个MethodKafkaListenerEndpoint,接着调用KafkaListenerEndpointRegistrar#registerEndpoint(KafkaListenerEndpoint,KafkaListenerContainerFactory<?>)进行注册

在这里插入图片描述
MethodKafkaListenerEndpoint又得到KafkaListenerEndpointDescriptor,最后将有的KafkaListenerEndpointDescriptor放到endpointDescriptors集合中

这里需要注意的是,KafkaListenerAnnotationBeanPostProcessor中的KafkaListenerEndpointRegistrar registrar属性是new出来的,并没有在spring容器中,而后面的创建监听器时还会再用到。
在这里插入图片描述
以上就是kafka端点注册流程。

创建监听容器

spring kafka把每个标注了KafkaListener注解的方法称为Endpoint,为每个方法生成了一个MethodKafkaListenerEndpoint对象,同时又为每个端点生成了一个MessageListenerContainer;以下是具体的生成流程

在这里插入图片描述
KafkaListenerAnnotationBeanPostProcessor实现了SmartInitializingSingleton,其中的方法afterSingletonsInstantiated会在bean初始化后进行执行

@Override
public void afterSingletonsInstantiated() {// 这个registrar没有放入到spring 容器中this.registrar.setBeanFactory(this.beanFactory);if (this.beanFactory instanceof ListableBeanFactory) {Map<String, KafkaListenerConfigurer> instances =((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);for (KafkaListenerConfigurer configurer : instances.values()) {configurer.configureKafkaListeners(this.registrar);}}if (this.registrar.getEndpointRegistry() == null) {if (this.endpointRegistry == null) {Assert.state(this.beanFactory != null,"BeanFactory must be set to find endpoint registry by bean name");this.endpointRegistry = this.beanFactory.getBean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,KafkaListenerEndpointRegistry.class);}this.registrar.setEndpointRegistry(this.endpointRegistry);}if (this.defaultContainerFactoryBeanName != null) {this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);}// Set the custom handler method factory once resolved by the configurerMessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();if (handlerMethodFactory != null) {this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);}else {addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);}// 主要方法,注册端点并创建容器this.registrar.afterPropertiesSet();
}
···
**KafkaListenerEndpointRegistrar**```java
@Override
public void afterPropertiesSet() {registerAllEndpoints();
}protected void registerAllEndpoints() {synchronized (this.endpointDescriptors) {// 上一个阶段已经把所有的端点放入了endpointDescriptors集合中for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {this.endpointRegistry.registerListenerContainer(// 注意这个resolveContainerFactorydescriptor.endpoint, resolveContainerFactory(descriptor));}this.startImmediately = true;  // trigger immediate startup}
}// 如果在KafkaListener注解中的属性containerFactory没有配置容器工厂的名字,就会默认获取ConcurrentKafkaListenerContainerFactory实现类作为容器工厂
private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpointDescriptor descriptor) {if (descriptor.containerFactory != null) {return descriptor.containerFactory;}else if (this.containerFactory != null) {return this.containerFactory;}else if (this.containerFactoryBeanName != null) {Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");this.containerFactory = this.beanFactory.getBean(this.containerFactoryBeanName, KafkaListenerContainerFactory.class);return this.containerFactory;  // Consider changing this if live change of the factory is required}else {throw new IllegalStateException("Could not resolve the " +KafkaListenerContainerFactory.class.getSimpleName() + " to use for [" +descriptor.endpoint + "] no factory was given and no default is set.");}}

在这里插入图片描述
以上截图是真正创建容器的地方,并把创建好的容器添加到Map<String, MessageListenerContainer> listenerContainers,后面起动时会用到。
至此,kafka监听容器创建完成,整理下主要类之间的关系,如下

在这里插入图片描述

一个@KafkaListener注解标注的方法,就可以得到一个MethodKafkaListenerEndpoint,再使用默认的ConcurrentKafkaListenerContainerFactory就会创建出一个MessageListenerContainer监听容器,有几个方法标注了@KafkaListener 就可以得到几个ConcurrentMessageListenerContainer

启动监听容器

上面的流程知道,所有创建的容器放到了`Map<String, MessageListenerContainer> listenerContainers``

在这里插入图片描述
KafkaListenerEndpointRegistry实现了Lifecycle,其中的start()方法会在bean加载的最后一个阶段中被执行到

在这里插入图片描述
以下是执行流程
在这里插入图片描述
其中org.springframework.kafka.listener.KafkaMessageListenerContainer#doStart如下

@Override
protected void doStart() {if (isRunning()) {return;}if (this.clientIdSuffix == null) { // stand-alone containercheckTopics();}ContainerProperties containerProperties = getContainerProperties();checkAckMode(containerProperties);Object messageListener = containerProperties.getMessageListener();if (containerProperties.getConsumerTaskExecutor() == null) {SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = determineListenerType(listener);// ListenerConsumer的构造函数中创建了真正的Consumer<K, V> consumerthis.listenerConsumer = new ListenerConsumer(listener, listenerType);setRunning(true);this.startLatch = new CountDownLatch(1);// ListenerConsumer 实现了Runnable,调用submitListenable是就会开启新的线程执行其中的run方法this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);try {if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {this.logger.error("Consumer thread failed to start - does the configured task executor "+ "have enough threads to support all containers and concurrency?");publishConsumerFailedToStart();}}catch (@SuppressWarnings(UNUSED) InterruptedException e) {Thread.currentThread().interrupt();}
}

每个监听容器ConcurrentMessageListenerContainer中都会创建一个出一个ListenerConsumer或多个(跟concurrency参数配置有关)ListenerConsumer,真正从kafka服务端拉去消息的逻辑在ListenerConsumerrun方法中。
到这里,主要类跟参数之间的对应关系如下
在这里插入图片描述

消息拉取与消费

这一阶段只要关注,消息的拉取到触发用户自定义方法流程与自动位移提交

不断循环拉去消息,并反射调用用户自定义方法:
在这里插入图片描述

protected void pollAndInvoke() {if (!this.autoCommit && !this.isRecordAck) {processCommits();}idleBetweenPollIfNecessary();if (this.seeks.size() > 0) {processSeeks();}pauseConsumerIfNecessary();this.lastPoll = System.currentTimeMillis();this.polling.set(true);// 调用kafka原生api进行拉取ConsumerRecords<K, V> records = doPoll();if (!this.polling.compareAndSet(true, false) && records != null) {/** There is a small race condition where wakeIfNecessary was called between* exiting the poll and before we reset the boolean.*/if (records.count() > 0) {this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());}return;}resumeConsumerIfNeccessary();debugRecords(records);if (records != null && records.count() > 0) {if (this.containerProperties.getIdleEventInterval() != null) {this.lastReceive = System.currentTimeMillis();}// 获取消息后,触发@KafkaListener标注地方法invokeListener(records);}else {checkIdle();}
}

下面先关注消费者位移在dopoll方法中什么时候触发提交地
在这里插入图片描述

在这里插入图片描述

// 每个消费组都有一个消费者协调器coordinator,在coordinator.poll方法中会判断是否需要自动提交位移
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {if (coordinator != null && !coordinator.poll(timer)) {return false;}return updateFetchPositions(timer);}public boolean poll(Timer timer) {maybeUpdateSubscriptionMetadata();invokeCompletedOffsetCommitCallbacks();if (subscriptions.partitionsAutoAssigned()) {// Always update the heartbeat last poll time so that the heartbeat thread does not leave the// group proactively due to application inactivity even if (say) the coordinator cannot be found.// 唤醒心跳检测线程,触发一次心跳检测pollHeartbeat(timer.currentTimeMs());if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {return false;}if (rejoinNeededOrPending()) {// due to a race condition between the initial metadata fetch and the initial rebalance,// we need to ensure that the metadata is fresh before joining initially. This ensures// that we have matched the pattern against the cluster's topics at least once before joining.if (subscriptions.hasPatternSubscription()) {// For consumer group that uses pattern-based subscription, after a topic is created,// any consumer that discovers the topic after metadata refresh can trigger rebalance// across the entire consumer group. Multiple rebalances can be triggered after one topic// creation if consumers refresh metadata at vastly different times. We can significantly// reduce the number of rebalances caused by single topic creation by asking consumer to// refresh metadata before re-joining the group as long as the refresh backoff time has// passed.if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {this.metadata.requestUpdate();}if (!client.ensureFreshMetadata(timer)) {return false;}maybeUpdateSubscriptionMetadata();}if (!ensureActiveGroup(timer)) {return false;}}} else {// For manually assigned partitions, if there are no ready nodes, await metadata.// If connections to all nodes fail, wakeups triggered while attempting to send fetch// requests result in polls returning immediately, causing a tight loop of polls. Without// the wakeup, poll() with no channels would block for the timeout, delaying re-connection.// awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.// When group management is used, metadata wait is already performed for this scenario as// coordinator is unknown, hence this check is not required.if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {client.awaitMetadataUpdate(timer);}}// 判断是否需要自动提交位移maybeAutoCommitOffsetsAsync(timer.currentTimeMs());return true;
}

在这里插入图片描述
以下就是消息拉去义位移自动提交地处理流程
在这里插入图片描述
记录返回后,会调用用户自定义地处理逻辑
在这里插入图片描述
以下时具体地调用流程

在这里插入图片描述

小结

1、kafka spring消费者端点注册、创建监听容器、启动监听容器阶段,有两个重要的类KafkaListenerAnnotationBeanPostProcessorKafkaListenerEndpointRegistry,他们对应的方法postProcessAfterInitializationstart在spring容器启动时会被执行,从而实现了kafka的监听容器的创建与启动

在这里插入图片描述
2、kafka自动提交位移时在poll方法中进行的,也就是每次获取新消息时,会先提交上次消费完成的消息;
3、拉取消息跟用户标注了@KafkaListener注解方法的处理逻辑用的是同一个线程,自动提交时间auto.commit.interval.ms默认是5s,假如用户的方法逻辑处理时长是10s,那么位移自动提交是在10s后再次调用poll方法时才会提交,而不是5s后就准时提交。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/90291.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flutter-第三方组件

卡片折叠 stacked_card_carousel 扫一扫组件 qr_code_scanner 权限处理组件 permission_handler 生成二维码组件 pretty_qr_code 角标组件 badges 动画组件 animations app更新 app_installer 带缓存的图片组件 cached_network_image 密码输入框 collection 图片保存 image_g…

数据分析两件套ClickHouse+Metabase(一)

ClickHouse篇 安装ClickHouse ClickHouse有中文文档, 安装简单 -> 文档 官方提供了四种包的安装方式, deb/rpm/tgz/docker, 自行选择适合自己操作系统的安装方式 这里我们选deb的方式, 其他方式看文档 sudo apt-get install -y apt-transport-https ca-certificates dirm…

SpringBoot复习:(44)MyBatisAutoConfiguration

可以看到MyBatisAutoConfiguration引入了MyBatisProperties这个属性&#xff1a; MyBatisAutoConfiguration中配置了一个SqlSessionFactoryBean,代码如下&#xff1a; 可以配置mybatis-config.xml,需要配置文件里指定&#xff1a; mybatis.config-locationclasspath:/mybat…

2023年京东宠物食品行业数据分析(京东大数据)

宠物食品市场需求主要来自于养宠规模&#xff0c;近年来由于我国宠物数量及养宠人群的规模均在不断扩大&#xff0c;宠物相关产业和市场规模也在蓬勃发展&#xff0c;宠物食品市场也同样保持正向增长。 根据鲸参谋电商数据分析平台的相关数据显示&#xff0c;2023年1月-7月&am…

python单元测试框架(测试固件、批量执行)

python测试框架 在Python语言中应用最广泛的单元测试框架是unittest和pytest,unittest属于标准库&#xff0c;只要安装了Python解释器后就可以直接导入使用了,pytest是第三方的库&#xff0c;需要单独的安装。 1.白盒测试原理 在软件架构的层面来说&#xff0c;测试最核心的步…

红日ATT&CK VulnStack靶场(三)

网络拓扑 web阶段 1.扫描DMZ机器端口 2.进行ssh和3306爆破无果后访问web服务 3.已知目标是Joomla&#xff0c;扫描目录 4.有用的目录分别为1.php 5.configuration.php~中泄露了数据库密码 6.administrator为后台登录地址 7.直接连接mysql 8.找到管理员表&#xff0c;密码加密了…

“多测合一”生产软件-不动产测量(不动产权籍调查测绘软件RESS),房地一体化测量由请湖南来示范

湖南“多测合一”生产软件-不动产测量软件&#xff0c;提取码&#xff1a;RESShttps://pan.baidu.com/s/1OqakLJICIP6buNiZ6j9Npw?pwdRESS 2020年7 月&#xff0c;国务院办公厅印发《 国务院办公厅关于进一步优化营商环境 更好服务市场主体的实施意见》 &#xff08;国办发〔 …

【网络】高级IO

目录 一、五种IO模型 1、阻塞IO 2、非阻塞IO 3、信号驱动 4、IO多路转接 5、异步IO 6、总结 二、高级IO重要概念 1、同步通信与异步通信 2、阻塞 vs 非阻塞 三、非阻塞IO 1、fcntl 2、实现函数SetNoBlock 四、IO多路转接select 1、select 1.1、参数解释 1.2、…

Unity实现异步加载场景

一&#xff1a;创建UGUI 首先我们在LoginCanvas登入面板下面创建一个Panel,取名为LoadScreen,再在loadScreen下面创建一个Image组件&#xff0c;放置背景图片&#xff0c;然后我们再在lpadScreen下面继续创建一个Slider,这个是用来加载进度条的&#xff0c;我们改名为LoadSlid…

简单入门seleniumUI自动化测试

目录 一、selenium的介绍 二、selenium的原理 三、selenium的八种元素定位的方法 1、ID定位&#xff1a; 2 、name定位&#xff1a; 3、class定位&#xff1a; 4、tag定位&#xff1a; 5、link_text定位&#xff1a; 6、partial_link_text定位&#xff1a; 7、css定位…

航顺HK32F030M怎么样 航顺HK32F030M应用领域介绍

航顺HK32F030M是一款基于ARM Cortex-M0内核的32位微控制器&#xff0c;具有高性能、低功耗、经济适用等特点。以下是颖特新关于航顺HK32F030M的详细介绍&#xff1a; 一、性能表现 航顺HK32F030M采用ARM Cortex-M0内核&#xff0c;主频最高可达64MHz&#xff0c;具有出色的计算…

反编译微信小程序,可导出uniapp或taro项目

微信小程序反编译&#xff08;全网通用&#xff09; 微信小程序反编译 反编译主要分为四个阶段 操作流程 1. node.js安装 2. node安装模块 3. 开始反编译 4. 导入到微信开发者工具既可运行 微信小程序反编译 当碰到不会写的小程序功能时&#xff0c;正好看到隔壁小程序有类似…

FPGA应用学习笔记--时钟域的控制 亚稳态的解决

时钟域就是同一个时钟的区域&#xff0c;体现在laways语句边缘触发语句中&#xff0c;设计规模增大就会导致时钟不同步&#xff0c;有时差&#xff0c;就要设计多时钟域。 会经过与门的延时产生的新时钟域&#xff0c;这种其实不推荐使用&#xff0c;但在ascl里面很常见 在处理…

2023国赛数学建模C题思路分析

文章目录 0 赛题思路1 竞赛信息2 竞赛时间3 建模常见问题类型3.1 分类问题3.2 优化问题3.3 预测问题3.4 评价问题 4 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 竞赛信息 全国大学生数学建模…

SpringCloud实用篇5——elasticsearch基础

目录 1.初识elasticsearch1.1 了解ES1.1.1 elasticsearch的作用1.1.2 ELK技术栈1.1.3 elasticsearch和lucene1.1.4 总结 1.2.倒排索引1.2.1.正向索引1.2.2.倒排索引1.2.3.正向和倒排 1.3 es的一些概念1.3.1 文档和字段1.3.2 索引和映射1.3.3 mysql与elasticsearch 1.4 部署单点…

单模光纤模场强度分布以及高斯近似的MATLAB仿真

已知纤芯半径5um&#xff0c;数值孔径NA 0.1&#xff0c;波长 用波长和数值孔径计算归一化常数V 之前我们在单模光纤特征方程及其MATLAB数值求解中&#xff0c;用线性关系拟合过V和W&#xff0c;这里直接用拟合结果 U用V和W计算 clc clear close alla 5e-6;%纤芯半径 NA …

Stable Diffusion + AnimateDiff运用

1.安装AnimateDiff&#xff0c;重启webui 2.下载对应的模型&#xff0c;最好到c站下载&#xff0c;google colab的资源有可能会出现下载问题 https://civitai.com/models/108836 3.下载完成后&#xff0c;你可以随便抽卡了。 抽卡完成后固定seed&#xff0c;然后打开这个插件&…

基于深度信念网络的西储大学轴承故障分类识别,基于EMD+DBN的西储大学轴承故障识别,LCD+DBN,LMD+DBN

目录 背影 DBN神经网络的原理 DBN神经网络的定义 受限玻尔兹曼机(RBM) (EMD,LCD,LMD)+DBN的深度信念网络的西储大学轴承故障分类识别 基本结构 主要参数 数据 MATALB代码 结果图 展望 背影 DBN是一种深度学习神经网络,拥有提取特征,非监督学习的能力,是一种非常好的分类…

C数据结构与算法——常见排序算法时间复杂度比较 应用

实验任务 (1) 掌握常见比较排序算法的实现&#xff1b; (2) 掌握常用比较排序算法的性能及其适用场合。 实验内容 (1) 平均时间复杂度O(n2)和O(nlog2n)的算法至少各选两种实现&#xff1b; (2) 待排序的无重复关键字存放在一维整型数组中&#xff0c;数量为60000个&#xff…

springboot使用configtree读取树形文件目录中的配置

文章目录 一、介绍二、演示环境三、项目演示1. 配置文件2. 导入配置3. 检测配置属性 四、应用场景五、源码解析1. ConfigTreeConfigDataLocationResolver2. ConfigTreeConfigDataLoader 六、总结 一、介绍 相信绝大多数使用springboot开发项目的朋友们在添加配置时&#xff0c…