Kafka 之批量消息发送消费

前言:

前面我们分享了 Kafka 的一些基础知识,以及 Spring Boot 集成 Kafka 完成消息发送消费,本篇我们来分享一下 Kafka 的批量消息发送消费。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 消息批量发送

Kafka 没有提供批量发送消息的 API,Kafka 的方式是提供一个 RecordAccumulator 消息收集器,将发送给同一个 Topic 同一个 Partition 的消息先缓存起来,当其达到某些条件后,才会一次性的将消息提交给 Kafka Broker。

Kafka 消息的批量发送主要跟以下三个参数有关:

  • batch.size:批量发送消息的大小,默认 16KB,产生的消息达到这个数量后,即刻触发消息批量提交到 Kafka Broker。
  • buffer.memory:生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数,模式是 32 MB,如果超过这个数量,即刻触发消息批量提交到 Kafka Broker。
  • linger.ms:批量发送的的最大时间间隔,单位是毫秒,当达到配置的时间之后,会立刻触发消息批量提交大 Kafka Broker。

以上三个条件满足一个就会触发消息的批量提交。

官方文档传送门

Kafka 批量消息 参数配置

上面我们分析了 Kafka 没有提供批量发送的 API,而是使用了三个参数来控制批量发送的,换句话说,其实我们每次使用 Kafka 发送消息的时候都是批量发送,Kafka 批量发送消息的代码没有什么特殊之处,只需要对上面解释的三个参数进行按需配置即可,本案例的配置如下:

#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000

Kafka 批量消息 Producer 代码演示

Kafka 批量发送消息代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @ClassName: MyKafkaBatchProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("开始消息发送,当前时间:{}", dateStr);for (int a = 0; a < 1000; a++) {this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");}log.info("完成消息发送,当前时间:{}", dateStr);}}

在 Kafka 发送完成消息后,我们记录了当前时间,这个时间是用来证明消息是被批量发送的。

Kafka 批量消息 Consumer 代码演示

Kafka 批量消息的代码也没有什么特殊之处,还是使用 @KafkaListener注解来监听消息,只不过参数变成了 List<ConsumerRecord<String, String>> 类型,然后我们在配置中配置了批量消费的模式,批量消费的配置如下:

#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
spring.kafka.listener.type = batch

Consumer 代码如下:

package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;/*** @ClassName: MyKafkaBatchConsumer * @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchConsumer {@KafkaListener(id = "my-kafka-consumer-01",groupId = "my-kafka-consumer-groupId-01",topics = "my-topic",containerFactory = "myContainerFactory",properties = {"max.poll.records:10"})public void listen(List<ConsumerRecord<String, String>> consumerRecords) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("my-kafka-consumer-groupId-01 消息消费成功,当前时间:{},消息size:{}", dateStr, consumerRecords.size());for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {String value = consumerRecord.value();log.info("消息内容:{}",value);}}}

这里我们使用了 properties 这个属性配置,后面详细讲解。

** Kafka 批量消息验证**

触发消息发送消费结果如下:

2024-10-27 15:27:17.563  INFO 18320 --- [nio-8086-exec-2] c.o.s.k.producer.MyKafkaBatchProducer    : 完成消息发送,当前时间:2024-10-27 15:27:17
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:27:22,消息size:10
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第0条 kafka 消息
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第1条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第2条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第3条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第4条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第5条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第6条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第7条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第8条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第9条 kafka 消息

2024-10-27 15:27:17 完成消息发送,:2024-10-27 15:27:22 完成消息消费,时间间隔是 5秒,消息是 10 条,符合预期。

我们修改配置再次演示,将批量发送消息的时间间隔改为 10 秒,同时一次性发送 1000 条消息,是消息的总大小大于 1KB。

#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000

调整消息发送端代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/*** @ClassName: KafkaProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("开始消息发送,当前时间:{}", dateStr);for (int a = 0; a < 1000; a++) {this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");}log.info("完成消息发送,当前时间:{}", dateStr);}}

触发消息发送消费结果如下:

2024-10-27 15:41:39.530  INFO 17440 --- [nsumer-01-2-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10
2024-10-27 15:41:39.530  INFO 17440 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10

可以看到消息发送和消息消费几乎是同时进行的,因为这里我们打印的是时间只有秒,是看不出差异的,但是也可以根据这个结果看出,消费者并没有等到 10秒后才开始消费,是因为批量发送消息的大小大于了1KB 就触发了批量消息的提交,符合上面我们说的三个条件满足其中一个就触发批量消息提交到 Kafka Broker,结果符合预期。

关于 buffer-memory 这个参数这里不做验证了,有兴趣的朋友可以自己去验证哈。

spring.kafka.consumer.max-poll-records 参数讨论

spring.kafka.consumer.max-poll-records 表示一次调用 poll() 操作时返回的最大记录数,默认为 500 条,上面的案例中我们使用了 properties = {“max.poll.records:10”} 这个配置,其实这个配置也是配置批量拉去消息的最大数量,我们配置的是 10,日志记录每次最多拉去的数量就是 10,使用 properties 的配置方式可以覆盖掉项目配置文件中的配置,也就是局部配置覆盖全局配置,这样做的好处是显而易见的,我们可以针对每个消费端按需做出灵活配置。

总结:本篇简单分享了 Kafka 批量发送消息消费的一些案例,希望可以帮助到有需要的朋友,分享有错误的地方也欢迎大家提出纠正。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/464608.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0.STM32F1移植到F0的各种经验总结

1.结构体的声明需放在函数的最前面 源代码&#xff1a; /*开启时钟*/RCC_APB2PeriphClockCmd(RCC_APB2Periph_USART1, ENABLE); //开启USART1的时钟RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENABLE); //开启GPIOA的时钟/*GPIO初始化*/GPIO_InitTypeDef GPIO_InitStructu…

在Microsoft Outlook日历中添加多个时区

在Microsoft Outlook日历中添加多个时区 1.单击Outlook中的文件选项卡&#xff0c;单击选项 2.左侧菜单中选择日历 3.向下滚动到时区部分&#xff0c;并标记当前时区&#xff0c;比如China 4.选中“显示第二个时区”框 5.选择第二个时区并给它一个标签&#xff0c;比如Germa…

为啥学习数据结构和算法

基础知识就像是一座大楼的地基&#xff0c;它决定了我们的技术高度。而要想快速做出点事情&#xff0c;前提条件一定是基础能力过硬&#xff0c;“内功”要到位。 想要通关大厂面试&#xff0c;千万别让数据结构和算法拖了后腿 我们学任何知识都是为了“用”的&#xff0c;是为…

爬虫学习4

from threading import Thread#创建任务 def func(name):for i in range(100):print(name,i)if __name__ __main__:#创建线程t1 Thread(targetfunc,args("1"))t2 Thread(targetfunc, args("2"))t1.start()t2.start()print("我是诛仙剑")from …

【Maven】——基础入门,插件安装、配置和简单使用,Maven如何设置国内源

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 引入&#xff1a; 一&#xff1a;Maven插件的安装 1&#xff1a;环境准备 2&#xff1a;创建项目 二…

Vue中使用echarts生成地图步骤详解

1.创建容器元素 <div class"map" id"map" style"width:1000px;height:1000px;"></div> 2.Vue项目引入world.js(我这里的演示是世界地图&#xff0c;不同地图对应js文件不一样) world.js文件包含&#xff1a; 地理坐标数据&#xff…

docker安装低版本的jenkins-2.346.3,在线安装对应版本插件失败的解决方法

提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、网上最多的默认解决方法1、jenkins界面配置清华源2、替换default.json文件 二、解决低版本Jenkins在线安装插件问题1.手动下载插件并导入2.低版本jenkins在…

算法专题:栈

目录 1. 删除字符串中的所有相邻重复项 1.1 算法原理 1.2 算法代码 2. 844. 比较含退格的字符串 2.1 算法原理 2.2 算法原理 3. 基本计算器 II 3.1 算法原理 3.2 算法代码 4. 字符串解码 4.1 算法原理 4.2 算法代码 5. 验证栈序列 5.1 算法原理 5.2 算法代码 1.…

ZDH权限-扩展支持数据权限

目录 项目源码 预览地址 安装包下载地址 ZDH权限模块 ZDH权限扩展更细粒度方案 第一种方案&#xff1a; 第二种方案&#xff1a; ZDH权限扩展支持数据权限-新增属性 总结 感谢支持 项目源码 zdh_web: GitHub - zhaoyachao/zdh_web: 大数据采集,抽取平台 预览地址 后…

交换机的基本配置

交换机的基本配置 实验题目实验目的实验任务实验设备实验环境实验步骤VLAN 的简单配置跨交换机 vlan 的配置主机配置信息表解释&#xff1a; vlan 间路由 实验题目 交换机的基本配置。 实验目的 1) 理解交换机的原理和应用场景&#xff1b; 2) 交换机的基本指令系统&#xf…

借助 Aspose.Words,使用 C# 从 Word 文档中删除页面

如果您正在寻找一种快速删除 Word 文档中不相关、过时或空白页的方法&#xff0c;那么您来对地方了。在这篇博文中&#xff0c;我们将学习如何使用 C# 从 Word 文档中删除页面。我们将逐步引导您完成该过程&#xff0c;提供清晰的示例&#xff0c;以帮助您以编程方式高效地从 W…

华为 HarmonyOS NEXT 原生应用开发: 动画的基础使用(属性、显示、专场)动画

2024年11月5日 LiuJinTao 文章目录 鸿蒙中动画的使用一、属性动画 - animation属性动画代码示例 二、显示动画 - AnimateTo三、专场动画 鸿蒙中动画的使用 一、属性动画 - animation 属性动画代码示例 /*** 属性动画的演示*/ Entry Component struct Index {State selfWidth:…

基于STM32的手式电视机遥控器设计

引言 本项目基于STM32微控制器设计了一个手式电视机遥控器系统&#xff0c;通过集成加速度传感器和陀螺仪&#xff0c;实现手势识别和遥控功能。该遥控器系统可以通过简单的手势操作实现对电视机的音量调节、频道切换和开关机控制等功能。项目涉及到硬件设计、手势识别算法和红…

基于SpringBoot+微信小程序+协同过滤算法+二维码订单位置跟踪的农产品销售平台-新

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背景介绍&#xff1a; “农产品商城”小程序…

论文阅读-用于点云分析的自组织网络

目前存在的问题&#xff1a; 原始的SOM&#xff08;1&#xff09;训练结果与初始节点高度相关&#xff08;2&#xff09;样本更新规则取决于输入点的顺序3D 卷积神经网络&#xff08;需要将数据转换为体素&#xff0c;存在分辨率损失和计算成本上涨的问题&#xff09;、PointN…

ComfyUI和Photoshop相结合,PS内实现:文生图,图生图,高清放大,局部重绘,面部修复,设计师福音

本文主要介绍&#xff1a;ComfyUI和Photoshop相结合&#xff0c;一个平台实现&#xff1a;图像生成&#xff0c;放大&#xff0c;局部重绘&#xff0c;面部修复&#xff0c;实时绘画 简直是设计师的福音。 主要包括&#xff1a; Photoshop 的安装以及插件的安装 Creative Cl…

新一代跟踪器StrongSORT: Make DeepSORT Great Again论文解析—让 DeepSORT 再次伟大

新一代跟踪器StrongSORT: Make DeepSORT Great Again论文解析—让 DeepSORT 再次伟大 时间&#xff1a;2023年 机构:北京邮电大学 发表在&#xff1a;IEEE TRANSACTIONS ON MULTIMEDIA, VOL. 25, 2023 代码源码地址&#xff1a; pytorch版本&#xff1a;https://github.com/dyh…

【力扣专题栏】重排链表,如何实现链表里面节点之间的交换?

题解目录 1、题目描述解释2、算法原理解析3、代码编写 1、题目描述解释 主要就是实现&#xff1a;第一个节点和最后一个节点交换&#xff0c;第二节点和倒数第二个节点交换&#xff0c;依次交换下去。 2、算法原理解析 3、代码编写 class Solution { public:void reorderList(…

TP-LINK TL-XDN7000H免驱版 ubuntu 20.04驱动安装

最近装机购买的主板没有无线网卡&#xff0c;PCIE网卡因为显卡占位无法安装&#xff0c;无奈只能购买USB无限网卡。 1 网卡型号 TP-LINK WiFi6免驱900M usb无线网卡 外置高增益 台式机笔记本电脑wifi接收器发射器 TL-XDN7000H 2 驱动下载链接 TL-XDN7000H免驱版 V1.1 Linu…

【初阶数据结构与算法】复杂度分析练习之轮转数组(多种方法)

文章目录 复杂度练习之轮转数组方法1方法2方法3 总结 复杂度练习之轮转数组 题目链接&#xff1a;https://leetcode.cn/problems/rotate-array/description/    为什么我们把这道题作为复杂度的练习题呢&#xff1f;是因为我们以后做题都会涉及到复杂度的计算&#xff0c;我…