@KafkaListener指定kafka集群

基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载KafkaConsumer时的配置信息格式不同。详情如下:

依赖项(其实spring-kafka包含了kafka-clients)

<!-- spring-kafka --> 
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.0</version>
</dependency>

配置文件
配置参数的格式和含义,参见《spring-kafka的配置使用》

生产代码

@Component
@Slf4j
public class KafKaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于* kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于* ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型*/ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> sendResult){// log.info("发送消息成功:" + sendResult.toString());}});}
}

消费者配置类,其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例

@Data
@Slf4j
@Configuration
public class KafkaConfig {@ResourceEnvironment environment;@Beanpublic KafkaListenerContainerFactory<?> containerFactory() {Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消费并发数量containerFactory.setBatchListener(true);      // 批量监听消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限return containerFactory;}@Beanpublic Map<String, Object> consumerConfigs() {String servers          = environment.getProperty("kafka.servers", "127.0.0.1:9092");String groupId          = environment.getProperty("kafka.groupId", "consumer-group");String sessionTimeout   = environment.getProperty("kafka.session.timeout.ms", "60000");String maxPollRecords   = environment.getProperty("kafka.max.poll.records", "100");String maxPollInterval  = environment.getProperty("kafka.max.poll.interval", "600000");String jaasConfig       = environment.getProperty("kafka.sasl.jaas.config");Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-256");props.put("sasl.jaas.config", jaasConfig);return props;}
}

消费代码 @KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例,也就指定了kafka集群

@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {@Autowiredprivate Environment environment;@Autowiredprivate KafkaMsgHandleService msgHandleService;@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/*************************      接收消息************************/@Override@KafkaListener( containerFactory = "containerFactory", groupId = "${kafka.groupId}", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "${kafka.concurrency}")public void onMessage(List<ConsumerRecord<String, String>> records) {try {final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));/// 处理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error("KafkaListener_kafka_consume_error.", e);}}/*************************      处理消息************************/private void processRecord(String msg) {taskExecutor.submit(() -> {if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {log.warn("KafkaListener_turn_off_drop_message.");return;}msgHandleService.handle(msg);});}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/236746.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker与微服务实战(高级篇)- 【下】

Docker与微服务实战&#xff08;高级篇&#xff09;- 【下】 八、Docker轻量级可视化工具Portainer8.1.可视化工具Portainer简介8.2.安装Portainer8.2.1.官网8.2.2.docker命令安装8.2.2.1.搜索portainer镜像8.2.2.2.拉取portainer镜像8.2.2.3.启动portainer容器 8.2.3.第一次登…

高通平台开发系列讲解(USB篇)adb function代码分析

文章目录 一、FFS相关动态打印二、代码入口三、ffs_alloc_inst四、ep0、ep1&ep2的注册五、读写过程沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本文主要介绍高通平台USB adb function代码f_fs.c。 一、FFS相关动态打印 目录:msm-4.14/drivers/usb/gadget/fun…

Git新手?这篇文章带你飞!基础操作一网打尽!

推荐阅读 智能化校园&#xff1a;深入探讨云端管理系统设计与实现&#xff08;一&#xff09; 智能化校园&#xff1a;深入探讨云端管理系统设计与实现&#xff08;二&#xff09; 文章目录 推荐阅读Git初识Git啥是版本控制系统&#xff1f;&#xff1f;集中式VS分布式 git使用…

88.乐理基础-记号篇-反复记号(二)D.C.、D.S.、Fine、Coda

内容参考于&#xff1a;三分钟音乐社 上一个内容&#xff1a;87.乐理基础-记号篇-反复记号&#xff08;一&#xff09;反复、跳房子-CSDN博客 下图红色左括号框起来的东西&#xff0c;它们都相对比较抽象一点&#xff0c;这几个词都是意大利语 首先D.C.这个标记&#xff0c;然…

基于DNA的密码学和隐写术综述

摘要 本文全面调研了不同的脱氧核糖核酸(DNA)-基于密码学和隐写术技术。基于DNA的密码学是一个新兴领域,利用DNA分子的大规模并行性和巨大的存储容量来编码和解码信息。近年来,由于其相对传统密码学方法的潜在优势,如高存储容量、低错误率和对环境因素的抗性,该领域引起…

.NET core 中的Kestrel 服务器

什么是Kestrel&#xff1f; Kestrel 是一个跨平台的Web服务器&#xff0c;会默认在ASP.NET Core 项目模板中对其进行配置。未使用 IIS 托管时&#xff0c;ASP.NET Core 项目模板默认使用 Kestrel。 Kestrel 的功能包括&#xff1a; 跨平台&#xff1a;Kestrel 是可在 Window…

蓝桥杯基础知识3 memset()

蓝桥杯基础知识3 memset() #include <bits/stdc.h> using namespace std;int main(){int a[5]; //随机数for(int i 0;i < 5; i)cout << a[i] << \n;cout << \n;memset(a, 0, sizeof a); //0for(int i 0;i < 5; i)cout << a[i] << …

最新ThinkPHP版本实现证书查询系统,实现批量数据导入,自动生成电子证书

前提&#xff1a;朋友弄了一个培训机构&#xff0c;培训考试合格后&#xff0c;给发证书&#xff0c;需要一个证书查询系统。委托我给弄一个&#xff0c;花了几个晚上给写的证书查询系统。 实现功能&#xff1a; 前端按照姓名手机号码进行证书查询证书信息展示证书展示&#x…

kotlin运行

1.使用android studio 由于我本身是做android的&#xff0c;android studio本身有内置kotlin的插件。但若只是想跑kotlin的程序&#xff0c;并不像和android程序绑在一起&#xff0c;可以创建一个kt文件&#xff0c;在里面写一个main函数&#xff0c;就可以直接运行kotlin程序…

2024.1.13每日一题

LeetCode 2182.构造限制重复的字符串 2182. 构造限制重复的字符串 - 力扣&#xff08;LeetCode&#xff09; 题目描述 给你一个字符串 s 和一个整数 repeatLimit &#xff0c;用 s 中的字符构造一个新字符串 repeatLimitedString &#xff0c;使任何字母 连续 出现的次数都…

[足式机器人]Part2 Dr. CAN学习笔记 - Ch02动态系统建模与分析

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记 - Ch02动态系统建模与分析 1. 课程介绍2. 电路系统建模、基尔霍夫定律3. 流体系统建模4. 拉普拉斯变换&#xff08;Laplace&#xff09;传递函数、微分方程4.1 Laplace Transform 拉式变换4.2 收…

【python】07.字符串和常用数据结构

字符串和常用数据结构 使用字符串 第二次世界大战促使了现代电子计算机的诞生&#xff0c;最初计算机被应用于导弹弹道的计算&#xff0c;而在计算机诞生后的很多年时间里&#xff0c;计算机处理的信息基本上都是数值型的信息。世界上的第一台电子计算机叫ENIAC&#xff08;电…

Vue+ElementUI+Axios实现携带参数的文件上传(数据校验+进度条)

VueElementUIAxios实现携带参数的文件上传&#xff08;数据校验进度条&#xff09; 可以实现对上传文件的类型&#xff0c;大小进行数据校验&#xff0c;以及对上传文件所要携带的数据也进行的校验&#xff0c;也有文件上传进度的进度条。 一、Vue 结构部分 弹窗显示&#xff0…

Java 面试题 - 多线程并发篇

线程基础 创建线程有几种方式 继承Thread类 可以创建一个继承自Thread类的子类&#xff0c;并重写其run()方法来定义线程的行为。然后可以通过创建该子类的实例来启动线程。 示例代码&#xff1a; class MyThread extends Thread {public void run() {// 定义线程的行为} …

ubuntu20.04网络问题以及解决方案

1.网络图标消失&#xff0c;wired消失&#xff0c;ens33消失 参考&#xff1a;https://blog.51cto.com/u_204222/2465609 https://blog.csdn.net/qq_42265170/article/details/123640669 原始是在虚拟机中切换网络连接方式&#xff08;桥接和NAT&#xff09;&#xff0c; 解决…

K8S--安装MySQL8(单机)

原文网址&#xff1a;K8S--安装MySQL8&#xff08;单机&#xff09;-CSDN博客 简介 本文介绍K8S部署MySQL8&#xff08;单机&#xff09;的方法。 ----------------------------------------------------------------------------------------------- 分享Java真实高频面试题…

解决虚拟机的网络图标不见之问题

在WIN11中&#xff0c;启动虚拟机后&#xff0c;发现网络图标不见了&#xff0c;见下图&#xff1a; 1、打开虚拟机终端 输入“sudo server network-manager stop”&#xff0c;停止网络管理器 输入“cd /回车” &#xff0c; 切换到根目录 输入“cd var回车” &#xff0c;…

探索Shadowsocks-Android:保护你的网络隐私

探索Shadowsocks-Android&#xff1a;保护你的网络隐私 I. 引言 在数字时代&#xff0c;网络隐私和安全变得愈发重要。我们越来越依赖互联网&#xff0c;但同时也面临着各种网络限制和监控。在这个背景下&#xff0c;Shadowsocks-Android应用程序应运而生&#xff0c;为用户提…

java Servlet体育馆运营管理系统myeclipse开发mysql数据库网页mvc模式java编程计算机网页设计

一、源码特点 JSP 体育馆运营管理系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统采用serlvetdaobean&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用 B/S模式开发。 java Servlet体育馆运营管理系…

自定义数据实现SA3D

SA3D&#xff1a;Segment Anything in 3D with NeRFs 实现了3D目标分割 原理是利用SAM(segment anything) 模型和Nerf分割渲染3D目标&#xff0c; SAM只能分块&#xff0c;是没有语义标签的&#xff0c;如何做到语义连续&#xff1f; SA3D中用了self-prompt, 根据前一帧的mask…