springboot 实现kafka多源配置

文章目录

  • 背景
  • 核心配置
    • 自动化配置类
    • 注册生产者、消费者核心bean到spring
    • 配置spring.factories
    • yml配置
    • 使用
  • 源码仓库

背景

实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置。

核心配置

自动化配置类

import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister;
import com.example.kafka.autoconfig.kafkaConsumerConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;@EnableKafka
@Configuration(proxyBeanMethods = false
)
@ConditionalOnWebApplication
@EnableConfigurationProperties({kafkaConsumerConfig.class})
@Import({CustomKafkaDataSourceRegister.class})
public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor {public MyKafkaAutoConfiguration() {}public void setBeanFactory(BeanFactory beanFactory) throws BeansException {beanFactory.getBean(CustomKafkaDataSourceRegister.class);}
}

注册生产者、消费者核心bean到spring

public void afterPropertiesSet() {Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories();if (factories != null && !factories.isEmpty()) {factories.forEach((factoryName, consumerConfig) -> {KafkaProperties.Listener listener = consumerConfig.getListener();Integer concurrency = consumerConfig.getConcurrency();// 创建监听容器工厂ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);// 注册到容器if (!beanFactory.containsBean(factoryName)) {beanFactory.registerSingleton(factoryName, containerFactory);}});}Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates();if (!ObjectUtils.isEmpty(templates)) {templates.forEach((templateName, producerConfig) -> {//registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues);//注册spring bean的两种方式registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class, producerFactoryValues(producerConfig.buildProperties()));});}}

配置spring.factories


org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.kafka.MyKafkaAutoConfiguration

yml配置

spring:kafka:multiple:consumer:factories:test-factory:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerbootstrap-servers: 192.168.56.112:9092group-id: group_aconcurrency: 25fetch-min-size: 1048576fetch-max-wait: 3000listener:type: batchproperties:spring-json-trusted-packages: '*'key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: latestproducer:templates:test-template:bootstrap-servers: 192.168.56.112:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerkey-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

使用

在这里插入图片描述

在这里插入图片描述

源码仓库

https://github.com/fafeidou/shield

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/339413.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Centos 7下的VulFocus靶场搭建详细教程

一、靶场介绍 自带 Flag 功能&#xff1a;每次启动 flag 都会自动更新&#xff0c;明确漏洞是否利用成功。带有计分功能。兼容 Vulhub、Vulapps 中所有漏洞镜像。 二、下载安装 下载 VMware 软件下载 centos镜像 三、Docker知识 学习链接&#xff1a;https://www.runoob.c…

如何在路由器上安装代理服务:详细教程

如何在路由器上安装代理服务&#xff1a;详细教程 步骤一&#xff1a;通过漏洞进入路由器系统开启Telnet服务使用Telnet登录路由器系统查看系统信息和CPU信息步骤二&#xff1a;交叉编译MIPS程序 Go对MIPS的支持 安装TFTP Server使用BusyBox tftp传输文件在路由器系统中下载编译…

linux进程加载和启动过程分析

我们的源代码通过预处理,编译,汇编,链接后形成可执行文件,那么当我们在终端敲下指令$ ./a.out argv1 argv2 后,操作系统是怎么将我们的可执行文件加载并运行的呢? 首先知道,计算机的操作系统的启动程序是写死在硬件上的,每次计算机上电时,都将自动加载启动程序,之后…

四、.Net8对接Ollama实现文字翻译(.Net8+SemanticKernel+Ollama)本地运行自己的大模型

.Net8SemanticKernelOllama 一、Semantic Kernel官方定义SK能做什么&#xff1f; 二、基本使用1、普通对话2、使用插件实现文本翻译功能 三、IChatCompletionService、ITextGenerationService、ITextEmbeddingGenerationService 很多情况都有这样的需求&#xff0c;使用自有系统…

运筹学_5.动态规划

文章目录 引言5.1 动态规划的基本概念首先明确什么是多阶段决策问题 5.2 动态规划的最短路径问题贝尔曼最优化原理建立动态规划模型的步骤 5.3 动态规划的投资分配问题投资分配问题定义投资分配问题的数学模型 5.4 动态规划的背包问题背包问题定义背包问题数学模型 引言 动态规…

Java——二进制原码、反码和补码

一、简要介绍 原码、反码和补码只是三种二进制不同的表示形式&#xff0c;每个二进制数都有这三个形式。 1、原码 原码是将一个数的符号位和数值位分别表示的方法。 最高位为符号位&#xff0c;0表示正&#xff0c;1表示负&#xff0c;其余位表示数值的绝对值。 例如&…

前端JS必用工具【js-tool-big-box】学习,检测密码强度

js-tool-big-box 前端工具库&#xff0c;实用的公共方法越来越多了&#xff0c;这一小节&#xff0c;我们带来的是检测密码强度。 我们在日常开发中&#xff0c;为了便于测试&#xff0c;自己总是想一个简单的密码&#xff0c;赶紧输入。但到了正式环境&#xff0c;我们都应该…

智能售货机的小投入大回报创业机遇

智能售货机的小投入大回报创业机遇 在当今这个快速进化的数字时代&#xff0c;智能售货机作为零售领域的新秀&#xff0c;正以其独特的便捷性和创新性逐步重塑传统零售格局。24小时不间断服务与自动化管理的结合&#xff0c;大幅度削减人力成本&#xff0c;使得智能售货机成为…

电脑突然提示:“failed to load steamui.dll”是什么情况?分享几种解决steamui.dll丢失的方法

相信有一些用户正在面临一个叫做“failed to load steamui.dll”的问题&#xff0c;这种情况多半发生在试图运行某个程序时&#xff0c;系统会提示一条错误消息&#xff1a;“failed to load steamui.dll”。那么&#xff0c;为何steamui.dll文件会丢失&#xff0c;又应该如何解…

Linux 使用 yum安装 ELK服务,yum 安装elasticsearch和Kibana(未写完)

文章目录 环境准备ELK组件介绍安装Elasticsearch安装Kibana 丢弃下载ELK 服务安装包Elasticsearch安装 Tips:关闭elasticsearch https修改 es 启动内存 环境准备 ELK组件介绍 ElasticSearch &#xff1a; 是一个近实时&#xff08;NRT&#xff09;的分布式搜索和分析引擎&…

Unity + 雷达 粒子互动(待更新)

效果预览: 花海(带移动方向) VFX 实例 脚本示例 使用TouchScript,计算玩家是否移动,且计算移动方向 using System.Collections; using System.Collections.Generic; using TouchScript; using TouchScript.Pointers; using UnityEngine; using UnityEngine.VFX;public …

python中的while循环

没有循环时&#xff0c;想打印0-100之间的数字&#xff0c;则需要循环多次&#xff0c;例&#xff1a; print(0) print(1) print(2) print(3) ... print(99) 但是使用循环的话&#xff0c;就不会有那么麻烦 while 循环 while 这个单词有“在……时”的含义&#xff0c;whil…

AI智能分析技术与安防视频融合当前面临的困难与挑战

人工智能与安防视频的融合为现代安全领域带来了革命性的变化&#xff0c;提高了安全管理水平、降低了管理成本并为用户提供了更加便捷和高效的服务。随着技术的不断进步和应用场景的不断拓展&#xff0c;未来人工智能与安防的融合将展现出更加广阔的发展前景。然而&#xff0c;…

Linux 服务查询命令(包括 服务器、cpu、数据库、中间件)

Linux 服务查询命令&#xff08;包括 服务器、cpu、数据库、中间件&#xff09; Linux获取当前服务器ipLinux使用的是麒麟版本还是cenos版本Linux获取系统信息Linux查询nignx版本 Linux获取当前服务器ip hostname -ILinux使用的是麒麟版本还是cenos版本 这个文件通常包含有关L…

Vue进阶之Vue无代码可视化项目(三)

Vue无代码可视化项目 项目初始化store的使用DataSourceView.vuestores/counter.ts开发模式按钮store/editor.tsLayoutView.vue导航条安装图标iconpackage.jsonstore/debug.tssrc/components/AppNavigator.vueAppNavigator.ts:AppNavigator.vue:theme样式theme/reset.csstheme/v…

CRM系统主要是干什么?CRM系统主要功能和作用

什么是CRM 系统&#xff1f;CRM系统到底是干什么的&#xff1f;不同的企业人员该如何利用CRM去解决他们的问题等等&#xff0c;问题太多了&#xff0c;今天来为大家详细介绍。 干货满满&#xff0c;建议收藏&#xff01;&#xff01; 首先第一个问题&#xff0c;什么是CRM系统…

智能监控技术助力山林生态养鸡:打造智慧安全的养殖新模式

随着现代科技的不断发展&#xff0c;智能化、自动化的养殖方式逐渐受到广大养殖户的青睐。特别是在山林生态养鸡领域&#xff0c;智能化监控方案的引入不仅提高了养殖效率&#xff0c;更有助于保障鸡只的健康与安全。视频监控系统EasyCVR视频汇聚/安防监控视频管理平台在山林生…

国产操作系统上Vim的详解01--vim基础篇 _ 统信 _ 麒麟 _ 中科方德

原文链接&#xff1a;国产操作系统上Vim的详解01–vim基础篇 | 统信 | 麒麟 | 中科方德 Hello&#xff0c;大家好啊&#xff01;今天给大家带来一篇在国产操作系统上使用Vim的详解文章。Vim是一款功能强大且高度可定制的文本编辑器&#xff0c;广泛应用于编程和日常文本编辑中。…

FreeRTOS基础(九):FreeRTOS的列表和列表项

今天我们将探讨FreeRTOS中的一个核心概念——列表&#xff08;List&#xff09;和列表项&#xff08;List Item&#xff09;。在实时操作系统&#xff08;RTOS&#xff09;中&#xff0c;任务的管理和调度是至关重要的&#xff0c;而FreeRTOS使用列表来实现这一功能。列表可以说…

oracle数据回显时候递归实战

太简单的两篇递归循环 orcale 在项目里递归循环实战 先看资产表T_ATOM_ASSET结构 看业务类别表T_ATOM_BUSI_CATEGORY结构 问题出现 页面显示 实际对应的归属业务分类 涉及到oracle递归实战(这里不会如何直接在atomAsset的seelct里面处理递归回显) 直接在实现层看atomAs…