Kafka java 配置

前言:
        大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。

介绍:

        我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。

给大家解释配置含义。

1.Kafka配置代码

public KafkaConsumer<String, String> getCustomer() {// 1. 配置属性参数Properties properties = new Properties();// 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");// 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);// 设置消费者是否自动提交offset,true表示自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 设置自动提交offset的时间间隔(单位:毫秒)properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);// 设置每次poll操作返回的最大记录数properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);// 根据配置属性创建Kafka消费者实例return new KafkaConsumer<>(properties);
}

2.Kafka消费者代码

@Test
void KafkaConsumerTest() {// 创建Kafka消费者实例,通过getCustomer()方法获取KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();// 订阅要消费的主题,这里是 "test-topic"consumer.subscribe(Collections.singletonList("test-topic"));// 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));for (ConsumerRecord<String, String> record : records) {// 处理消息的逻辑// 打印消息的offset、key和valueSystem.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());//以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。boolean flag = true;if (flag){// 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息// 如果需要手动提交offset,可以取消注释下面的代码// consumer.commitAsync();// 由于flag为true,这里会跳出循环,不再处理后续的消息break;}}// 关闭消费者,释放资源consumer.close();// 打印结束消费的日志System.out.println("结束消费");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/466449.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习与AI|如何利用数据科学优化库存周转率?

对于所有零售商来说&#xff0c;良好的库存管理都是非常重要的。众所周知&#xff0c;商品如果不放在货架上就无法出售&#xff0c;而如果库存过多则意味着严重的财务负担。 但是做好库存管理绝非易事&#xff0c;它依赖于对未来需求的准确预测和确保始终有合适库存的敏捷供应链…

Proteus中数码管动态扫描显示不全(已解决)

文章目录 前言解决方法后记 前言 我是直接把以前写的 51 数码管程序复制过来的&#xff0c;当时看的郭天祥的视频&#xff0c;先送段选&#xff0c;消隐后送位选&#xff0c;最后来个 1ms 的延时。 代码在 Proteus 中数码管静态是可以的&#xff0c;动态显示出了问题——显示…

如何快速搭建一个spring boot项目

一、准备工作 1.1 安装JDK&#xff1a;确保计算机上已安装Java Development Kit (JDK) 8或更高版本、并配置了环境变量 1.2 安装Maven&#xff1a;下载并安装Maven构建工具&#xff0c;这是Spring Boot官方推荐的构建工具。 1.3 安装代码编辑器&#xff1a;这里推荐使用Inte…

基于ViT的无监督工业异常检测模型汇总

基于ViT的无监督工业异常检测模型汇总 论文1&#xff1a;RealNet: A Feature Selection Network with Realistic Synthetic Anomaly for Anomaly Detection&#xff08;2024&#xff09;1.1 主要思想1.2 系统框架 论文2&#xff1a;Inpainting Transformer for Anomaly Detecti…

数据结构C语言描述2(图文结合)--有头单链表,无头单链表(两种方法),链表反转、有序链表构建、排序等操作,考研可看

前言 这个专栏将会用纯C实现常用的数据结构和简单的算法&#xff1b;用C基础即可跟着学习&#xff0c;代码均可运行&#xff1b;准备考研的也可跟着写&#xff0c;个人感觉&#xff0c;如果时间充裕&#xff0c;手写一遍比看书、刷题管用很多&#xff0c;这也是本人采用纯C语言…

Python | Leetcode Python题解之第542题01矩阵

题目&#xff1a; 题解&#xff1a; class Solution:def updateMatrix(self, matrix: List[List[int]]) -> List[List[int]]:m, n len(matrix), len(matrix[0])# 初始化动态规划的数组&#xff0c;所有的距离值都设置为一个很大的数dist [[10**9] * n for _ in range(m)]…

ENSP作业——园区网

题目 根据上图&#xff0c;可得需求为&#xff1a; 1.配置交换机上的VLAN及IP地址。 2.设置SW1为VLAN 2/3的主根桥&#xff0c;设置SW2为VLAN 20/30的主根桥&#xff0c;且两台交换机互为主备。 3.可以使用super vlan。 4.上层通过静态路由协议完成数据通信过程。 5.AR1作为企…

【1个月速成Java】基于Android平台开发个人记账app学习日记——第7天,申请阿里云SMS短信服务SDK

系列专栏链接如下&#xff0c;方便跟进&#xff1a; https://blog.csdn.net/weixin_62588253/category_12821860.html?fromshareblogcolumn&sharetypeblogcolumn&sharerId12821860&sharereferPC&sharesourceweixin_62588253&sharefromfrom_link 同时篇幅…

让Apache正确处理不同编码的文件避免中文乱码

安装了apache2.4.39以后&#xff0c;默认编码是UTF-8&#xff0c;不管你文件是什么编码&#xff0c;统统按这个来解析&#xff0c;因此 GB2312编码文件内的中文将显示为乱码。 <!doctype html> <html> <head><meta http-equiv"Content-Type" c…

『Django』初识前后端分离

点赞 + 关注 + 收藏 = 学会了 本文简介 在前面的「Django」系列的文章 中使用的是“前后端不分离”的方式去学习 Django,但现在企业比较流行的开发方式是前后端分离。 简单来说,前后端分离就是把前端和后端的工作分配给2个人做,前端主要负责用户界面的开发,后端主要负责…

探索开放资源上指令微调语言模型的现状

人工智能咨询培训老师叶梓 转载标明出处 开放模型在经过适当的指令调整后&#xff0c;性能可以与最先进的专有模型相媲美。但目前缺乏全面的评估&#xff0c;使得跨模型比较变得困难。来自Allen Institute for AI和华盛顿大学的研究人员们进行了一项全面的研究&#xff0c;探索…

搜维尔科技:【应用】Xsens在荷兰车辆管理局人体工程学评估中的应用

荷兰车辆管理局&#xff08;RDW&#xff09;通过数据驱动的人体工程学评估&#xff0c;将职业健康和安全放在首位。 关键信息 01 改进人体工程学评估&#xff1a;RDW使用Xsens动作捕捉和Scalefit Industrial Athlete进行精确、实时的人体工程学评估&#xff0c;识别并降低与…

文件系统和日志管理 附实验:远程访问第一台虚拟机日志

文件系统和日志管理 文件系统&#xff1a;文件系统提供了一个接口&#xff0c;用户用来访问硬件设备&#xff08;硬盘&#xff09;。 硬件设备上对文件的管理 文件存储在硬盘上&#xff0c;硬盘最小的存储单位是512字节&#xff0c;扇区。 文件在硬盘上的最小存储单位&…

大众汽车合肥社招入职笔试测评SHL题库:综合能力、性格问卷、英语口语真题考什么?

大众汽车合肥社招入职笔试测评包括综合能力测试、性格问卷和英语口语测试。以下是各部分的具体内容&#xff1a; 1. **综合能力测试**&#xff1a; - 这部分测试需要46分钟完成&#xff0c;建议准备计算器和纸笔。 - 测试内容涉及问题解决能力、数值计算能力和逻辑推理能力。 -…

Python进阶之IO操作

文章目录 一、文件的读取二、文件内容的写入三、之操作文件夹四、StringIO与BytesIO 一、文件的读取 在python里面&#xff0c;可以使用open函数来打开文件&#xff0c;具体语法如下&#xff1a; open(filename, mode)filename&#xff1a;文件名&#xff0c;一般包括该文件所…

UE5.4 PCG 自定义PCG蓝图节点

ExecuteWithContext&#xff1a; PointLoopBody&#xff1a; 效果&#xff1a;点密度值与缩放成正比

Transformer和BERT的区别

Transformer和BERT的区别比较表&#xff1a; 两者的位置编码&#xff1a; 为什么要对位置进行编码&#xff1f; Attention提取特征的时候&#xff0c;可以获取全局每个词对之间的关系&#xff0c;但是并没有显式保留时序信息&#xff0c;或者说位置信息。就算打乱序列中token…

Apache Commons Collections 反序列化漏洞

文章目录 前言一、漏洞爆出二、复现环境java集合框架问题JVM反射 三、Apache Commons Collections漏洞原理≤3.2.1CC关键类调用链路POC构造思路POC 前言 Apache Commons Collections是一个扩展了Java标准库里的Collection结构的第三方基础库&#xff0c;它提供了很多强大的数据…

正则表达式1 re.match惰性匹配详解案例

点个关注 re.match() re.match() 函数尝试从字符串的开头开始匹配一个模式&#xff0c;如果匹配成功&#xff0c;返回一个匹配成功的对象&#xff0c;否则返回None。大小写区分&#xff0c;内容匹配不到后面的,只能匹配一个&#xff0c;不能有空格&#xff08;开头匹配&#…

gov企业征信系统瑞数6vmp算法还原

URL aHR0cHM6Ly9zZC5nc3h0Lmdvdi5jbi8今天再来逆向下国家企业征信系统&#xff0c;这个站很卡&#xff0c;兄弟们你们轻点爬&#xff0c;我刷以下页面就转好久的圈圈&#xff0c;这个站两层防护&#xff0c;一层加速乐&#xff0c;一层瑞数&#xff0c;貌似还有极验验证码防护…