Kafka原生API使用Java代码-生产者-分区策略-默认分区策略轮询分区策略

文章目录

  • 1、代码演示
  • 1.1、pom.xml
  • 1.2、KafkaProducerPartitioningStrategy.java
    • 1.2.1、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,不轮询
    • 1.2.2、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,轮询
    • 1.2.3、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,轮询
    • 1.2.4、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,不轮询
  • 2、分区策略
    • 2.1、linger.ms参数的含义
    • 2.2、linger milliseconds
    • 2.3、linger.ms配置参数的理解

1、代码演示

1.1、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.2、KafkaProducerPartitioningStrategy.java

1.2.1、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,不轮询

不等待,不轮询,默认分区策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,等待1秒后,发送//props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

1.2.2、ProducerConfig.LINGER_MS_CONFIG取 0 值得情况,轮询

不等待,立即发送,轮询策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,设置为0表示不等待,立即发送props.put(ProducerConfig.LINGER_MS_CONFIG, 0);   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

1.2.3、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,轮询

等待1秒后发送,轮询策略

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,等待1秒后,发送props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

1.2.4、ProducerConfig.LINGER_MS_CONFIG取 1000 值得情况,不轮询

等待1秒后发送,不轮询

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.UUID;
public class KafkaProducerPartitioningStrategy {public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口// 确认消息写入策略,"acks"设置为all表示所有副本都确认消息接收后才会响应props.put("acks", "all");// 消息发送失败时的重试次数,设置为0表示不重试props.put("retries", 0);// 发送缓冲区等待时间,等待1秒后,发送props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 分区器选择RoundRobinPartitioner,实现消息在主题分区间的轮询分配//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"  );// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) {producer.send(new ProducerRecord<String, String>("my_topic3", "88"), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("recordMetadata.partition() = " + recordMetadata.partition());}}});}producer.close();}
}

在这里插入图片描述

2、分区策略

kafka的生产者分区策略

  1. 默认分区策略:减少重新建立分区连接的性能损耗 开发使用最多的分区方式,采用黏性分区,默认向第一次连接上的主题分区发送消息,直到消息累积到 batch.size大小(16kb)
  2. 轮询分区策略:每个分区接收一次消息(linger.ms决定生产者一次批量发送多少条消息 到一个分区中),开发中一定不会用轮询分区策略,顶多自定义,因为轮询性能太差,频繁跟不同的分区建立连接,大数据会用轮询策略

2.1、linger.ms参数的含义

在Kafka的生产者(Producer)配置中,props.put("linger.ms", 1); 这行代码是用于设置生产者的linger.ms参数的。

linger.ms参数的含义是:生产者会在发送消息之前等待更多消息被发送到同一个分区(partition)的额外时间(以毫秒为单位)。这样做的目的是为了提高吞吐量,因为将多个消息批量发送到同一个分区可以减少网络传输的开销和服务器端的I/O开销。

具体来说,当你设置了linger.ms参数(比如设置为1毫秒),Kafka生产者会尝试在发送消息之前等待1毫秒,看看是否还有其他的消息要发送到同一个分区。如果有,这些消息将会被合并成一个批次(batch)一起发送。

注意,设置linger.ms参数可能会增加消息的延迟,因为生产者会等待指定的时间以合并更多的消息。所以,这个参数需要在吞吐量和延迟之间进行权衡。

这里是一个简化的示例,展示如何在使用Java Kafka生产者时设置linger.ms参数:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 设置linger.msprops.put("linger.ms", 1);KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息...producer.close();}
}

在这个示例中,我们创建了一个KafkaProducer对象,并设置了包括linger.ms在内的多个配置参数。然后,你可以使用这个生产者对象来发送消息到Kafka集群。

2.2、linger milliseconds

linger.ms 的英文全称就是 “linger milliseconds”,其中 “linger” 是指延迟或等待,“milliseconds” 是毫秒的意思。

在 Kafka 的 Producer 配置中,linger.ms 参数用于控制 Producer 在发送消息之前等待更多消息到达相同分区(partition)的时间,以便可以将这些消息一起发送,从而提高吞吐量。默认情况下,linger.ms 的值为 0,这意味着 Producer 收到消息后会立即发送,不进行任何延迟。

linger.ms 参数与 batch.size 参数一起使用时,可以实现更复杂的消息发送策略。batch.size 参数定义了单个批次(batch)中允许的最大消息字节数。当 Producer 收到消息时,它会尝试将消息添加到当前批次中。如果linger.ms 大于 0,并且当前批次中的消息数量尚未达到 batch.size 的限制,那么 Producer 会等待 linger.ms 指定的时间,看看是否还有更多的消息要发送到相同的分区。如果有,这些消息将被添加到当前批次中;如果没有,那么在当前时间到达后,Producer 将发送当前批次中的所有消息。

需要注意的是,linger.ms 参数的值应该根据具体的业务场景和性能需求进行调整。较小的值可以提高消息的实时性,但可能会降低吞吐量;较大的值可以提高吞吐量,但可能会增加消息的延迟。因此,在实际应用中需要根据实际情况进行权衡和选择。

2.3、linger.ms配置参数的理解

在Kafka中,linger.ms是一个配置参数,用于控制生产者(producer)在发送消息到broker之前的等待时间,以便将更多的消息累积到同一批次中,从而提高吞吐量。linger.ms的取值可以是任何非负整数,表示毫秒数。

以下是关于linger.ms的一些关键点:

  • 如果linger.ms设置为0,生产者会立即发送消息到broker,不会等待其他消息来累积到同一批次。
  • 如果linger.ms设置为大于0的值,生产者会等待该指定的毫秒数,或者直到达到batch.size(批次大小)的限制,然后将累积的消息作为一个批次发送到broker。
  • 增大linger.ms的值可能会提高吞吐量,因为可以累积更多的消息到同一批次中,减少网络传输的次数。但是,这也会增加消息的延迟。
  • linger.ms的取值可以根据具体的应用场景和需求进行调整。在需要低延迟的场景中,可以将linger.ms设置为较小的值;在可以容忍一定延迟的场景中,可以尝试增大linger.ms的值以提高吞吐量。

综上所述,linger.ms的取值并没有固定的几个选项,而是可以根据实际需求设置为任何非负整数。在配置Kafka生产者时,需要根据具体的业务场景和需求来选择合适的linger.ms值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/335472.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端应用开发实验:表单控件绑定

目录 实验目的相关知识点实验内容代码实现效果 实验目的 &#xff08;1&#xff09;熟练掌握应用v-model指令实现双向数据绑定的方法&#xff0c;学会使用 v-model指令绑定文本框、复选框、单选按钮、下拉菜单&#xff1b; &#xff08;2&#xff09;学会值绑定&#xff08;将…

Java枚举

引入&#xff1a; 当有一些类&#xff0c;希望它的成员的值是具体的有限的值&#xff0c;且只读不需要修改&#xff0c;不希望用户去自定义其他的值。 比如季节类&#xff0c;它的成员只能是春夏秋冬&#xff0c;不希望用户构造其他的值。 枚举enum&#xff1a; 枚举是一组的特…

SQL数据库多层嵌套 json转sql建表语句,SQL数据库里数组里对象数据怎么创建

1. uniapp sqlite 一个数组包含对象嵌套对象通过主外键方式插入数据库&#xff1a; // 假设有一个对象数组&#xff0c;对象中包含嵌套对象 const objectsArray [{parentObject: {id: 1,name: Parent 1,// 其他父对象属性},childObject: {id: 11,parentId: 1,name: Child 1 o…

字符串操作:写一个方法,实现字符串的反转,如:输入abc,输出cba

import java.util.Scanner; public class Test_A15 {public static void main(String[] args){String strA"";System.out.println("请输入一串字符串:");Scanner scannernew Scanner(System.in);strAscanner.next();Test_A15 T15new Test_A15();String re…

使用 LangFuse 意外被挂马!我是怎么恢复系统稳定的?

在使用 LangFuse 过程中,被意外挂马!通过一番折腾服务恢复正常~ 本文将详细介绍应对恶意脚本和进程的完整方案,包括识别、清理、恢复和预防步骤。 阿里云扫到的信息 被执行的 Base64 SUlaQnRTCmV4ZWMgJj4vZGV2L251bGwKSUhDa0hQbmQ9Li8uJChkYXRlfG1kNXN1bXxoZWFkIC1jMjApCl…

AI Agent智能体概述及原理

AI Agent概述 AI Agent旨在理解、分析和响应人类输入&#xff0c;像人类一样执行任务、做出决策并与环境互动。它们可以是遵循预定义规则的简单系统&#xff0c;也可以是根据经验学习和适应的复杂、自主的实体&#xff1b;可以是基于软件的实体&#xff0c;也可以是物理实体。…

行为型设计模式之模板模式

文章目录 概述原理结构图实现 小结 概述 模板方法模式(template method pattern)原始定义是&#xff1a;在操作中定义算法的框架&#xff0c;将一些步骤推迟到子类中。模板方法让子类在不改变算法结构的情况下重新定义算法的某些步骤。 模板方法中的算法可以理解为广义上的业…

【YOLOv5/v7改进系列】引入AKConv——即插即用的卷积块

一、导言 介绍了一种名为AKConv&#xff08;Alterable Kernel Convolution&#xff09;的新型卷积操作&#xff0c;旨在解决标准卷积操作存在的两个根本性问题。首先&#xff0c;标准卷积操作受限于局部窗口&#xff0c;无法捕获来自其他位置的信息&#xff0c;且其采样形状固…

Facebook隐私保护:数据安全的前沿挑战

在数字化时代&#xff0c;随着社交媒体的普及和应用&#xff0c;个人数据的隐私保护问题日益受到关注。作为全球最大的社交平台之一&#xff0c;Facebook承载了数十亿用户的社交活动和信息交流&#xff0c;但与此同时&#xff0c;也面临着来自内外部的数据安全挑战。本文将深入…

玄机平台应急响应—Linux入侵排查

1、前言 这篇文章主要说一下linux的入侵排查&#xff0c;也就是说当你的服务器已经被入侵的时候&#xff0c;该如何去排查使其恢复正常。下面是排查的步骤&#xff0c;但是实际情况往往更为复杂&#xff0c;需要进一步来分析&#xff0c;而不是无脑的按照步骤来敲就完事了。 …

【FPGA】Verilog语言从零到精通

接触fpga一段时间&#xff0c;也能写点跑点吧……试试系统地康康呢~这个需要耐心但是回报巨大的工作。正原子&&小梅哥 15_语法篇&#xff1a;Verilog高级知识点_哔哩哔哩_bilibili 1Verilog基础 Verilog程序框架&#xff1a;模块的结构 类比&#xff1a;c语言的基础…

07 FreeRTOS 事件组(event group)

1、事件组概念 1.1 基本概念 使用事件组可以等待某个事件、若干事件中的任意一个事件、若干事件中的所有事件&#xff0c;但是不能指定若干事件中的某些事件。 事件组可以简单地认为就是一个整数&#xff1a;这个整数的每一位表示一个事件&#xff1b;每一位事件的含义由程序员…

常用的优化器汇总及keras实现

1.SGD&#xff08;Stochastic Gradient Descent&#xff09; 2.RMSprop&#xff08;Root Mean Square Propagation&#xff09; 3.Adadelta 4.Adam&#xff08;Adaptive Moment Estimation&#xff09; 5.Nadam 6.代码实现 from sklearn.compose import make_column_transforme…

外企如何有效面对日益严格的跨境数据传输法律?

在当今这个数据驱动的时代&#xff0c;随着全球化步伐的加快&#xff0c;企业跨国界的数据交流已成为常态。但随之而来的&#xff0c;是各国政府对跨境数据传输日益严格的规定和监管&#xff0c;这让众多外资企业&#xff08;简称“外企”&#xff09;在享受全球市场红利的同时…

区块链技术和应用

文章目录 前言 一、区块链是什么&#xff1f; 二、区块链核心数据结构 2.1 交易 2.2 区块 三、交易 3.1 交易的生命周期 3.2 节点类型 3.3 分布式系统 3.4 节点数据库 3.5 智能合约 3.6 多个记账节点-去中心化 3.7 双花问题 3.8 共识算法 3.8.1 POW工作量证明 总结 前言 学习长…

大模型智力升级:AI的未来之路

大模型的发展引领了人工智能的新时代&#xff0c;其强大的数据处理和学习能力在医疗、金融、教育等众多领域取得了令人瞩目的成就。然而&#xff0c;随之而来的挑战也不容忽视。尽管大模型在特定任务上展现出了卓越的性能&#xff0c;但它们在理解复杂语境、处理未见情况的能力…

项目日记(1): boost搜索引擎

目录 1. 项目相关背景 2. 搜索引擎的相关宏原理 3. 搜索引擎的技术栈和项目环境 4. 正排索引, 倒排索引, 搜索引擎具体原理 5. 编写数据去标签化和数据清洗的模块parser(解析器). 1.项目相关背景 百度, 搜狗, 360等都有搜索引擎, 但是都是全网的搜索; boost是进行站内搜索…

yq—2024/5/29—零钱兑换

代码实现&#xff1a; #define min(a, b) ((a) > (b) ? (b) : (a))int coinChange(int *coins, int coinsSize, int amount) {int dp[amount 1];// 初始化for (int i 0; i < amount 1; i) {dp[i] INT32_MAX;}dp[0] 0;// 01背包 -----先遍历物品&#xff0c;再遍历背…

社区供稿丨GPT-4o 对实时互动与 RTC 的影响

以下文章来源于共识粉碎机 &#xff0c;作者AI芋圆子 前面的话&#xff1a; GPT-4o 发布当周&#xff0c;我们的社区伙伴「共识粉碎机」就主办了一场主题为「GPT-4o 对实时互动与 RTC 的影响」讨论会。涉及的话题包括&#xff1a; GPT-4o 如何降低延迟&#xff08;VAD 模块可…

安卓开发板_开发评估套件_4G/5G联发科MTK安卓主板定制开发

安卓开发板采用了联发科八核A53 CPU&#xff0c;主频2.0GHz&#xff0c;采用12nm制程工艺&#xff0c;拥有强大的通用计算性能。配备GE8300 GPU&#xff0c;支持1080P视频编码和H.264硬解码&#xff0c;能够解析目前流行的视频和图片格式&#xff0c;非常适合各种功能APP的测试…