kafka消费数据太慢了,给优化下

原代码

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1024"); // 最小获取1KB的数据props.put("fetch.max.wait.ms", "500"); // 最大等待500ms// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息; 如果消息处理逻辑允许,可以批量处理消息,而不是逐条处理while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));if (!records.isEmpty()) {for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 批量提交偏移量consumer.commitSync();}}}}
}  

在 Kafka 消费者代码中,有几个地方可以进行优化以提高消费速度:

增加消费者线程:你已经增加了消费者的数量,这是一个很好的开始。确保每个消费者线程都在不同的分区上消费,以实现并行处理。

调整 fetch.min.bytes 和 fetch.max.wait.ms:这两个参数控制了消费者从 Kafka
服务器获取数据的最小字节数和最大等待时间。如果你的消息较小,增加 fetch.min.bytes
可能会导致消费者等待更长时间以收集足够的数据,这可能会降低消费速度。如果你的消息较大,可以适当增加这个值。

异步提交偏移量:你当前使用的是 consumer.commitSync() 进行同步提交偏移量,这会阻塞当前线程直到提交完成。可以考虑使用
consumer.commitAsync() 异步提交,这样可以不阻塞消费线程。

增加 max.poll.records:poll
方法有一个参数可以设置一次拉取的最大记录数。如果设置得太低,可能会导致频繁的网络请求,增加延迟。

合理设置 auto.commit.interval.ms:自动提交偏移量的间隔设置得太短可能会导致不必要的性能开销,可以适当增加这个值。

使用多线程处理消息:如果你的消息处理逻辑是 CPU 密集型的,可以考虑使用线程池来并行处理消息。

监控和调优:使用 Kafka 的监控工具来观察消费者的行为,找出瓶颈并进行调优。

下面是根据上述建议优化后的代码示例:

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1"); // 减少最小获取字节数props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {processRecords(records); // 异步处理消息consumer.commitAsync(); // 异步提交偏移量}}}private void processRecords(ConsumerRecords<String, String> records) {// 异步处理消息的逻辑for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 这里可以添加消息处理逻辑,例如使用线程池并行处理}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/470332.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP搭建开发环境(Windows系统)

要搭建一个完整的PHP动态网站&#xff0c;离不开操作系统、Web服务器、数据库、和PHP软件。 虽然有不错方便的方式&#xff0c;比如使用phpstudio等等等等许多面板都是非常快速不错的方式&#xff0c;但是这里是教会大家如何配置而不只是依赖别人整合好的面板软件&#xff0c;…

开源 2 + 1 链动模式、AI 智能名片、S2B2C 商城小程序在用户留存与品牌发展中的应用研究

摘要&#xff1a;本文以企业和个人品牌发展中至关重要的用户留存问题为切入点&#xff0c;结合管理大师彼得德鲁克对于企业兴旺发达的观点&#xff0c;阐述了用户留存对品牌营收的关键意义。在此基础上&#xff0c;深入分析开源 2 1 链动模式、AI 智能名片、S2B2C 商城小程序在…

SpringBoot后端解决跨域问题

1.全局方式 新建一个conifg配置类&#xff0c;内容如下&#xff1a; Configuration public class CorsConfig implements WebMvcConfigurer {Overridepublic void addCorsMappings(CorsRegistry registry) {registry.addMapping("/**")//是否发送Cookie.allowCrede…

「数据要素」行业简报|2024.11.上刊

纵观数据要素行业动态&#xff0c;洞察行业风向&#xff0c;把握行业脉搏&#xff01; 一、政策发布 1、《山东省公共数据资源登记管理工作规范(试行)》公开征求意见 11月7日&#xff0c;为认真贯彻落实《中共中央办公厅 国务院办公厅关于加快公共数据资源开发利用的意见》《…

什么是RAG? LangChain的RAG实践!

1. 什么是RAG RAG的概念最先在2020年由Facebook的研究人员在论文《Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks》中提出来。在这篇论文中他们提出了两种记忆类型&#xff1a; 基于预训练模型&#xff08;当时LLM的概念不像现在这么如日中天&#xff0…

Vite初始化Vue3+Typescrpt项目

初始化项目 安装 Vite 首先&#xff0c;确保你的 Node.js 版本 > 12.0.0。然后在命令行中运行以下命令来创建一个 Vite Vue 3 TypeScript 的项目模板&#xff1a; npm init vitelatest进入项目目录 创建完成后&#xff0c;进入项目目录&#xff1a; cd vue3-demo启动…

nginx部署H5端程序与PC端进行区分及代理多个项目及H5内页面刷新出现404问题。

在项目中会碰见需要在nginx代理多个项目&#xff0c;如果在加上uniapp开发的H5端的项目&#xff0c;你还要在nginx中区分PC端和手机H5端&#xff0c;这就会让人很头大&#xff01;网上大部分的资料都是采用在nginx的conf配置文件中添加区分pc和手机端的变量例如&#xff1a;set…

软件测试项目实战

软件测试是使用人工或者自动的手段来运行或者测定某个软件系统的过程&#xff0c;其目的在于检验它是否满足规定的需求或弄清预期结果与实际结果之间的差别。 在软件投入使用前&#xff0c;要经过一系列的严格测试&#xff0c;才能保证交付质量。 一、引言 1.编写目的 本文档…

2024开发者浏览器必备扩展,不允许还有人不知道~

在开发过程中&#xff0c;优秀的扩展工具能够极大提升我们的工作效率&#xff0c;简化工作流程&#xff0c;并使得在浏览器中的开发和调试变得更加便捷。 根据市场占比&#xff0c;Chrome、Safari、Edge、Firefox、Opera 是前五大浏览器&#xff0c;其中Chrome浏览器占据了领先…

分享一个傻瓜式一键启动的加速器

主要发现开通一个号能电脑手机互通&#xff0c;原来电脑手机各一个加速器钱包在滴血。。。一个月也很便宜差不多二十多 链接放这了&#xff0c;有需要自提&#xff1a;首页-小熊加速器http://xxjsq.co/ytfa

TDesign了解及使用

文章目录 1、概述2、快速开始2.1使用 npm 安装2.2通过 浏览器引入 安装2.3、使用 3、简单案例3.1 路由创建3.2、 页面创建3.3、 Table组件3.4、序号展示3.5、 图片展示及预览3.6、 性别字段处理 1、概述 TDesign 是腾讯推出的设计系统&#xff0c;旨在提供一致的设计语言和视觉…

11Java面向对象高级(篇2,Java程序的核心套路!!!!)

更多java知识请点击上面专栏&#xff01;&#xff01;&#xff01; 修道之始&#xff1a; 01Java基础入门(纯小白也能入门&#xff0c;速通Java&#xff0c;知识点归纳超级全面&#xff01;&#xff01;&#xff01;2024版后端成仙起始篇&#xff01;&#xff01;&#xff01;…

定时器(QTimer)与随机数生成器(QRandomGenerator)的应用实践——Qt(C++)

一、QTimer与QRandomGenerator &#xff08;一&#xff09;QTimer&#xff08;定时器&#xff09;[2] QTimer类为定时功能提供了一个高级编程接口。在使用QTimer时&#xff0c;实例化一个QTimer对象并将其timeout()发射信号与合适的信号槽相连接。通过调用QTimer的start()函数…

翼鸥教育:从OceanBase V3.1.4 到 V4.2.1,8套核心集群升级实践

引言&#xff1a;自2021年起&#xff0c;翼鸥教育便开始应用OceanBase社区版&#xff0c;两年间&#xff0c;先后部署了总计12套生产集群&#xff0c;其中核心集群占比超过四分之三&#xff0c;所承载的数据量已突破30TB。自2022年10月&#xff0c;OceanBase 社区发布了4.2.x 版…

AI绘画经验(stable-diffusion)

提示词理解 总的 AI绘画的优点是【想象力】&#xff0c;而不是自然语言的精确描述。 AI绘画只能控制【主体】和【风格】&#xff0c;姿势&#xff0c;表情&#xff0c;装饰&#xff0c;手指都太过于详细了。这也是【人类画师的魅力】 准确描述是徒劳的&#xff0c;只能通过【…

使用支付宝沙箱完成商品下单

使用支付宝沙箱完成商品下单 一&#xff1a;效果展示&#xff1a; 二&#xff1a;代码实现 1&#xff1a;准备工作&#xff1a; 申请支付宝沙箱账户&#xff1a; 登录 - 支付宝 然后要下载密钥密钥工具来生成密钥&#xff1b; 2&#xff1a;流程分析&#xff1a; 先是用户…

Linux设置socks代理

公司里绝大多数主机已经禁止外网访问&#xff0c;仅保留一台主机设置socks作为代理服务器。如下为对socks这一概念的学习整理 什么是socks 是OSI模型下会话层的协议&#xff0c;位于表示层与传输层之间&#xff0c;作用是&#xff1a; exchanges network packets between a c…

以往运维岗本人面试真题分享

以下是本人面试运维岗的一些面试经历&#xff0c;在此做个记录分享 目录 TCP/IP三次握手 IPtables IPtables四表五链都是什么&#xff1f; nat端口如何做&#xff1f; 开放本机的80端口该如何做&#xff1f; 如何在单用户模式下引导Centos&#xff1f; nginx轮询模式都有…

【Hadoop实训】Hive 数据操作①

目录 一、准备文件 1、创建表 2、 数据映射 二、HIVE的数据操作 1、基本查询 a、全表查询 b、选择特定字段查询 c、查询员工表总人数 d、查询员工表总工资额 e、查询5条员工表的信息 2、Where条件查询 a、查询工资等于5000的所有员工 b、查询工资在500到1000的员工信息 …

3.5【数据库系统】ER图

2、实体之间的关系 下面主要针对两个实体间的关系进行介绍 &#xff08;a&#xff09;一对一联系&#xff08;1:1&#xff09;如班级和班长&#xff0c;一个班级只有一个班长&#xff0c;一个班长只能在一个班级任职。 &#xff08;b&#xff09;一对多联系&#xff08;1&#…