springboot整合kafka多数据源

整合kafka多数据源

  • 项目背景
  • 依赖
  • 配置
  • 生产者
  • 消费者
  • 消息体

项目背景

在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafka存在多数据源的情况,就与单机的情况有所不同。

依赖

    implementation 'org.springframework.kafka:spring-kafka:2.8.2'

配置

单机的情况
如果是单机的kafka我们直接通过springboot自动配置的就可以使用,例如在yml里面直接引用

spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerbootstrap-servers: server001.bbd:9092

在使用的时候直接注入,然后就可以使用里面的方法了

    @Resourceprivate KafkaTemplate<String, String> kafkaTemplate;

在这里插入图片描述

多数据源情况下

本篇文章主要讲的是在多数据源下的使用,和单机的有所不同,我也看了网上的一些博客,但是当我去按照网上的配置的时候,总是会报错 kafakTemplate这个bean找不到,所以没办法只有按照springboot自动配置里面的来改
在这里插入图片描述

package com.ddb.zggz.config;import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.io.IOException;@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {private final KafkaProperties properties;private final KafkaSecondProperties kafkaSecondProperties;public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {this.properties = properties;this.kafkaSecondProperties = kafkaSecondProperties;}@Bean("kafkaTemplate")@Primarypublic KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,ProducerListener<Object, Object> kafkaProducerListener,ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean("kafkaSecondTemplate")public KafkaTemplate<?, ?> kafkaSecondTemplate(@Qualifier("kafkaSecondProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,@Qualifier("kafkaSecondProducerListener") ProducerListener<Object, Object> kafkaProducerListener,ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean("kafkaProducerListener")@Primarypublic ProducerListener<Object, Object> kafkaProducerListener() {return new LoggingProducerListener<>();}@Bean("kafkaSecondProducerListener")public ProducerListener<Object, Object> kafkaSecondProducerListener() {return new LoggingProducerListener<>();}@Bean("kafkaConsumerFactory")@Primarypublic ConsumerFactory<Object, Object> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("kafkaSecondConsumerFactory")public ConsumerFactory<Object, Object> kafkaSecondConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(this.kafkaSecondProperties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("zwKafkaContainerFactory")KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> zwKafkaContainerFactory(@Qualifier(value = "kafkaSecondConsumerFactory") ConsumerFactory<Object, Object> kafkaSecondConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(kafkaSecondConsumerFactory);factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Bean("kafkaProducerFactory")@Primarypublic ProducerFactory<Object, Object> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(this.properties.buildProducerProperties());String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("kafkaSecondProducerFactory")public ProducerFactory<Object, Object> kafkaSecondProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(this.kafkaSecondProperties.buildProducerProperties());String transactionIdPrefix = this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Bean@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();KafkaProperties.Jaas jaasProperties = this.properties.getJaas();if (jaasProperties.getControlFlag() != null) {jaas.setControlFlag(jaasProperties.getControlFlag());}if (jaasProperties.getLoginModule() != null) {jaas.setLoginModule(jaasProperties.getLoginModule());}jaas.setOptions(jaasProperties.getOptions());return jaas;}@Bean("kafkaAdmin")@Primarypublic KafkaAdmin kafkaAdmin() {KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());return kafkaAdmin;}}

生产者


package com.ddb.zggz.event;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import javax.annotation.Resource;@Component
@Slf4j
public class KafkaPushEvent {@Resourceprivate KafkaTemplate<String, String> kafkaSecondTemplate;@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@Autowiredprivate ApplicationConfiguration configuration;public void pushEvent(PushParam param) {ListenableFuture<SendResult<String, String>> sendResultListenableFuture = null;if ("zw".equals(configuration.getEnvironment())){sendResultListenableFuture = kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if ("net".equals(configuration.getEnvironment())){sendResultListenableFuture = kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if (sendResultListenableFuture == null){throw new IllegalArgumentException("kakfa发送消息失败");}sendResultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {log.error("kafka发送的message报错,发送数据:{}", param);}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("kafka发送的message成功,发送数据:{}", param);}});}}

消费者

package com.ddb.zggz.event;import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.service.GzApprovalService;
import com.ddb.zggz.service.GzServiceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;@Component
@Slf4j
public class SendMessageListener {@Autowiredprivate GzApprovalService gzApprovalService;@Autowiredprivate GzServiceService gzServiceService;@KafkaListener(topics = "${application.config.push-topic}", groupId = "zggz",containerFactory = "zwKafkaContainerFactory")@RetryableTopic(include = {Exception.class},backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000))public void listen(ConsumerRecord<?, ?> consumerRecord) {String value = (String) consumerRecord.value();PushParam pushParam = JSONObject.parseObject(value, PushParam.class);//版本提审if ("version-approval".equals(pushParam.getEvent())) {ApprovalDTO approvalDTO = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);gzApprovalService.approval(approvalDTO);}//服务下架if (pushParam.getEvent().equals("server-OffShelf-gzt")) {OffShelfParam offShelfParam = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), "ZGGZ", offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());}}@DltHandlerpublic void processMessage(String message) {}
}

消息体

package com.ddb.zggz.event;import com.alibaba.fastjson.annotation.JSONField;
import com.ddb.zggz.model.GzH5VersionManage;
import com.ddb.zggz.model.GzService;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.param.PublishParam;
import com.ddb.zggz.param.ReviewAndRollback;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;import java.io.Serializable;
import java.time.LocalDateTime;/*** @author bbd*/
@Data
public class PushParam implements Serializable {/*** 发送的消息数据*/private Object data;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@JsonSerialize(using = LocalDateTimeSerializer.class)@JsonDeserialize(using = LocalDateTimeDeserializer.class)@JSONField(format = "yyyy-MM-dd HH:mm:ss")private LocalDateTime createTime = LocalDateTime.now();/*** 事件名称,用于消费者处理相关业务*/private String event;/*** 保存版本参数*/public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {PushParam pushParam = new PushParam();pushParam.setData(gzH5VersionManage);pushParam.setEvent("save-version");return pushParam;}/*** 保存服务参数*/public static PushParam toKafkaServer(GzService gzService) {PushParam pushParam = new PushParam();pushParam.setData(gzService);pushParam.setEvent("save-server");return pushParam;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/90134.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

读书笔记 |【项目思维与管理】➾ 项目成为一种生存方式

读书笔记 |【项目思维与管理】➾ 项目成为一种生存方式 一、理解项目固有的挑战二、项目对企业的价值三、知识型企业的经营逻辑四、做项目管理的推进者 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 项目无处不在&#xff0c;项目已经成为…

考研408 | 【计算机网络】 网络层

导图 网络层&#xff1a; 路由器功能&#xff1a;转发&路由选择 数据平面 数据平面执行的主要功能是根据转发表进行转发&#xff0c;这是路由器的本地动作。 控制平面 1.传统方法/每路由器法&#xff1a; 2.SDN方法&#xff08;Software-Defined Networking) 控制平面中的…

【变形金刚01】attention和transformer所有信息

图1.来源&#xff1a;Arseny Togulev在Unsplash上的照片 一、说明 这是一篇 长文 &#xff0c;几乎讨论了人们需要了解的有关注意力机制的所有信息&#xff0c;包括自我注意、查询、键、值、多头注意力、屏蔽多头注意力和转换器&#xff0c;包括有关 BERT 和 GPT 的一些细节。因…

Vue3 引用第三方Swiper内容触摸滑动简单应用

去官网查看更多教程→&#xff1a;Swiper官网 → 点击教程在vue中使用Swiper→ 在Vue中使用Swiper cd 到项目 安装Swiper&#xff1a; cnpm install --save swiper 安装指定版本 cnpm install --save swiper8.1.6 9.4.1 10.1.0…

excel 下载方法封装

1.首先需要拿到后端返回的URL下载地址 2.写个下载方法 // url 接口返回的下载地址。例如&#xff1a;https://cancer-research.oss-cn-beijing.aliyuncs.com/yuance-platform-permission/校内共享数据导入模板.xlsx // name 文件名称 例如&#xff1a; 校内共享数据导入模板 /…

若依vue -【 100 ~ 更 ~ 110 】

100 主子表代码生成详解 1 新建数据库表结构&#xff08;主子表&#xff09; -- ---------------------------- -- 客户表 -- ---------------------------- drop table if exists sys_customer; create table sys_customer (customer_id bigint(20) not null…

设计模式——建造者(Builder)模式

建造者模式&#xff08;Builder Pattern&#xff09;&#xff0c;又叫生成器模式&#xff0c;是一种对象构建模式 它可以将复杂对象的建造过程抽象出来&#xff0c;使这个抽象过程的不同实现方法可以构造出不同表现的对象。建造者模式是一步一步创建一个复杂的对象&#xff0c;…

Elasticsearch的一些基本概念

文章目录 基本概念&#xff1a;文档和索引JSON文档元数据索引REST API 节点和集群节点Master eligible节点和Master节点Data Node 和 Coordinating Node其它节点 分片(Primary Shard & Replica Shard)分片的设定操作命令 基本概念&#xff1a;文档和索引 Elasticsearch是面…

【Unity造轮子】制作一个简单的2d抓勾效果(类似蜘蛛侠的技能)

前言 欢迎阅读本文&#xff0c;本文将向您介绍如何使用Unity游戏引擎来实现一个简单而有趣的2D抓勾效果&#xff0c;类似于蜘蛛侠的独特能力。抓勾效果是许多动作游戏和平台游戏中的常见元素&#xff0c;给玩家带来了无限的想象和挑战。 不需要担心&#xff0c;即使您是一…

车载智能座舱开发核心技术——SystemServer

SystemServer在车载开发中扮演着重要角色&#xff0c;它是Android系统的核心组件之一&#xff0c;负责管理和调度其他系统服务。我们这篇内容将对SystemServer技术进行深入解析&#xff0c;并以实战代码示例加以分析&#xff0c;帮助读者更好地理解和应用该技术。 一、SystemS…

leetcode 力扣刷题 旋转矩阵(循环过程边界控制)

力扣刷题 旋转矩阵 二维矩阵按圈遍历&#xff08;顺时针 or 逆时针&#xff09;遍历59. 旋转矩阵Ⅱ54. 旋转矩阵剑指 Offer 29. 顺时针打印矩阵 二维矩阵按圈遍历&#xff08;顺时针 or 逆时针&#xff09;遍历 下面的题目的主要考察点都是&#xff0c;二维数组从左上角开始顺…

Camx--概述

该部分代码主要位于 vendor/qcom/proprietary/ 目录下&#xff1a; 其中 camx 代表了通用功能性接口的代码实现集合&#xff08;CamX&#xff09;&#xff0c;chi-cdk代表了可定制化需求的代码实现集合&#xff08;CHI&#xff09;&#xff0c;从图中可以看出Camx部分对上作为H…

Typora常用手册

常用快捷键 加粗&#xff1a; Ctrl B 标题&#xff1a; Ctrl H 插入链接&#xff1a; Ctrl K 插入代码&#xff1a; Ctrl Shift C – 无法执行 行内代码&#xff1a; Ctrl Shift K 插入图片&#xff1a; Ctrl Shift I 无序列表&#xff1a;Ctrl Shift L – 无法执行…

Spring事务

Spring事务 一、什么是事务、事务的特性、事务的隔离级别二、Spring中事务实现编程式事务&#xff1a;手动写代码操作事务声明式事务&#xff1a;使用注解自动开启和提交事务 三、Spring事务隔离级别及设置方法四、Spring的事务传播机制Spring事务失效的情况Transactional注解参…

玩赚音视频开发高阶技术——FFmpeg

随着移动互联网的普及&#xff0c;人们对音视频内容的需求也不断增加。无论是社交媒体平台、电商平台还是在线教育&#xff0c;都离不开音视频的应用。这就为音视频开发人员提供了广阔的就业机会。根据这些年来网站上的音视频开发招聘需求来看&#xff0c;音视频开发人员的需求…

Redis缓存读写策略(三种)数据结构(5+3)

Redis缓存读写策略&#xff08;三种&#xff09; Cache Aside Pattern&#xff08;旁路缓存模式&#xff09; Cache Aside Pattern 是我们平时使用比较多的一个缓存读写模式&#xff0c;比较适合读请求比较多的场景。 写&#xff1a; 先更新 db然后直接删除 cache 。 读 : …

Leetcode链表篇 Day3

.24. 两两交换链表中的节点 - 力扣&#xff08;LeetCode&#xff09; 1.构建虚拟结点 2.两两一组&#xff0c;前继结点一定在两两的前面 3.保存结点1和结点3 19. 删除链表的倒数第 N 个结点 - 力扣&#xff08;LeetCode&#xff09; 1.双指针&#xff1a;快慢指针 两个指针的差…

【BASH】回顾与知识点梳理(二十三)

【BASH】回顾与知识点梳理 二十三 二十三. Linux 账号管理&#xff08;二&#xff09;23.1 账号管理新增与移除使用者&#xff1a; useradd, 相关配置文件, passwd, usermod, userdelusermoduserdel 23.2 用户功能&#xff08;普通用户可使用&#xff09;idfingerchfnchsh 23.3…

Linux知识点 -- 进程信号(二)

Linux知识点 – 进程信号&#xff08;二&#xff09; 文章目录 Linux知识点 -- 进程信号&#xff08;二&#xff09;一、信号保存1.相关概念2.信号保存的相关接口3.对所有的信号都进行自定义捕捉4.将2号信号block&#xff0c;并打印pending信号集5.将所有信号都block 二、处理信…

.NET6使用SqlSugar操作数据库

1.//首先引入SqlSugarCore包 2.//新建SqlsugarSetup类 public static class SqlsugarSetup{public static void AddSqlsugarSetup(this IServiceCollection services, IConfiguration configuration,string dbName "ConnectString"){SqlSugarScope sqlSugar new Sq…