kafka发送消息-生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略)

生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略)

  • 1、默认策略,程序自动计算并指定分区
    • 1.1、指定key,不指定分区
    • 1.2、不指定key,不指定分区
  • 2、轮询分配策略RoundRobinPartitioner
    • 2.1、创建配置类
    • 2.2、application.yml文件
    • 2.3、生产者
    • 2.4、测试类
    • 2.5、执行结果
  • 3、自定义分区分配策略
    • 3.1、创建自定义分配策略类
    • 3.2、修改kafka配置类
    • 3.3、application.yml文件
    • 3.4、生产者
    • 3.5、测试类
    • 3.6、测试结果
    • 3.7、总结

在这里插入图片描述

1、默认策略,程序自动计算并指定分区

1.1、指定key,不指定分区

生产者:在编写代码发送消息时我们先不指定分区,即分区设为null,看看程序最终会把消息发送到哪个分区。

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send9(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",null,System.currentTimeMillis(),"k9",user);}
}

测试类:

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid send9(){eventProducer.send9();}
}

程序最终是通过以下代码进行目标分区计算的:

Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

通过调试发现,程序是通过以下代码进行目标分区计算的:
程序自动读取生产者发送消息时的key(本次发送时值为“key9”),将key生成一个32位的HASH值,将该HASH值与默认分区数(这个topic中有9个分区)取余数(余数结果一定在0-8之间),进而计算得出消息默认发送到的分区值

在这里插入图片描述

在这里插入图片描述

1.2、不指定key,不指定分区

生产者:
在这里插入图片描述

测试类:

在这里插入图片描述
此时时通过随机数与默认分区取余数计算默认分区的

使用随机数 % numPartitions

2、轮询分配策略RoundRobinPartitioner

通过查看kafka源码发现,分区接口有一个轮询分配策略相关实现类。
在这里插入图片描述

在application.yml配置文件中生产者配置项,我发现并生产者并没有相关轮询分配策略的配置,那么该如何试下轮询指定分区的配置呢?
在这里插入图片描述

需要编写代码试下轮询指定分区策略:

在这里插入图片描述

2.1、创建配置类

package com.power.config;import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);return props;}public ProducerFactory<String, ?> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, ?> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}//第二次创建@Beanpublic NewTopic newTopic9() {return new NewTopic("heTopic", 9, (short) 1);}
}

2.2、application.yml文件

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置生产者(24个配置)producer:#key默认是StringSerializer序列化key-serializer: org.apache.kafka.common.serialization.StringSerializer#value默认是ToStringSerializer序列化value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer#配置消费者(24个配置)consumer:auto-offset-reset: earliesttemplate:default-topic: default-topic

在这里插入图片描述

2.3、生产者

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send10(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",user);}
}

在这里插入图片描述

2.4、测试类

在这里插入图片描述

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid send10(){for (int i = 0; i <5 ; i++) {eventProducer.send10();}}}

2.5、执行结果

执行完测试类,发现5次请求分别发送到了kafka的heTopic主题的5个不同分区中:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3、自定义分区分配策略

在这里插入图片描述

3.1、创建自定义分配策略类

package com.power.config;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;public class CustomerPartitioner implements Partitioner {private AtomicInteger nextPartition = new AtomicInteger(0);@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if(key==null){//使用轮询方式选择分区int next = nextPartition.getAndIncrement();if(next>=numPartitions){nextPartition.compareAndSet(next,0);}if(next>0){next--;}System.out.println("分区值:"+next);return next;}else {//如果key不为inull,则使用默认的分区策略return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

3.2、修改kafka配置类

指定使用自定义的分区分配类
在这里插入图片描述

3.3、application.yml文件

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置生产者(24个配置)producer:#key默认是StringSerializer序列化key-serializer: org.apache.kafka.common.serialization.StringSerializer#value默认是ToStringSerializer序列化value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer#配置消费者(24个配置)consumer:auto-offset-reset: earliesttemplate:default-topic: default-topic

在这里插入图片描述

3.4、生产者

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send10(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",user);}
}

在这里插入图片描述

3.5、测试类

在这里插入图片描述

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid send10(){for (int i = 0; i <5 ; i++) {eventProducer.send10();}}}

3.6、测试结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.7、总结

使用自定义分区策略类尝试发送消息,发现发送的5次消息,并没有连续发送到5个挨着的分区中,查看kafka源码的org.apache.kafka.clients.producer.KafkaProducer类的doSend方法发现,每一次发送前,调用了两次计算分区的方法,导致第一个得到的分区并不会正在的发送消息。

doSend方法;

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {throwIfProducerClosed();// first make sure the metadata for the topic is availablelong nowMs = time.milliseconds();ClusterAndWaitTime clusterAndWaitTime;try {clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);} catch (KafkaException e) {if (metadata.isClosed())throw new KafkaException("Producer closed while send in progress", e);throw e;}nowMs += clusterAndWaitTime.waitedOnMetadataMs;long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);Cluster cluster = clusterAndWaitTime.cluster;byte[] serializedKey;try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}int partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);setReadOnly(record.headers());Header[] headers = record.headers().toArray();int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),compressionType, serializedKey, serializedValue, headers);ensureValidRecordSize(serializedSize);long timestamp = record.timestamp() == null ? nowMs : record.timestamp();if (log.isTraceEnabled()) {log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);}// producer callback will make sure to call both 'callback' and interceptor callbackCallback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);if (transactionManager != null && transactionManager.isTransactional()) {transactionManager.failIfNotReadyForSend();}RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);if (result.abortForNewBatch) {int prevPartition = partition;partitioner.onNewBatch(record.topic(), cluster, prevPartition);partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);if (log.isTraceEnabled()) {log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);}// producer callback will make sure to call both 'callback' and interceptor callbackinterceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);}if (transactionManager != null && transactionManager.isTransactional())transactionManager.maybeAddPartitionToTransaction(tp);if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;// handling exceptions and record the errors;// for API exceptions return them in the future,// for other exceptions throw directly} catch (ApiException e) {log.debug("Exception occurred during message send:", e);if (callback != null)callback.onCompletion(null, e);this.errors.record();this.interceptors.onSendError(record, tp, e);return new FutureFailure(e);} catch (InterruptedException e) {this.errors.record();this.interceptors.onSendError(record, tp, e);throw new InterruptException(e);} catch (KafkaException e) {this.errors.record();this.interceptors.onSendError(record, tp, e);throw e;} catch (Exception e) {// we notify interceptor about all exceptions, since onSend is called before anything else in this methodthis.interceptors.onSendError(record, tp, e);throw e;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/410622.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用idea快速创建springbootWeb项目(springboot+springWeb+mybatis-Plus)

idea快速创建springbootWeb项目 详细步骤如下 1&#xff09;创建项目 2&#xff09;选择springboot版本 3&#xff09;添加web依赖 4&#xff09;添加Thymeleaf 5&#xff09;添加lombok依赖 然后点击create进入下一步 双击pom.xml文件 6&#xff09;添加mybatis-plus依赖 …

【系统分析师】-案例篇-数据库

1、分布式数据库 1&#xff09;请用300字以内的文字简述分布式数据库跟集中式数据库相比的优点。 &#xff08;1&#xff09;坚固性好。由于分布式数据库系统在个别结点或个别通信链路发生故障的情况下&#xff0c;它仍然可以降低级别继续工作&#xff0c;系统的坚固性好&…

Ubuntu搭建FTP服务器

目录 1.ftp简介 2.vsftpd 2.1.介绍 2.2.安装与卸载 2.3.综合案例 - 本地用户模式 2.4.1.创建FTP用户 2.4.2.配置vsftpd 2.4.3.配置防火墙 1.ftp简介 一般来讲&#xff0c;人们将计算机联网的首要目的就是获取资料&#xff0c;而文件传输是一种非常重要的获取资料的方…

Docker 修改镜像源

由于docker hub 被禁&#xff0c;导致 docker 拉取镜像失败&#xff0c;解决办法就是使用国内的镜像源&#xff0c;目前国内的镜像源还是很多的&#xff0c;例如阿里云、腾讯云、华为云等等&#xff0c;下面演示一个更换成阿里云的步骤。 1. 阿里云获取加速地址 1.1 首先登录阿…

Git —— 1、Windows下安装配置git

Git简介 Git 是一个免费的开源分布式版本控制系统&#xff0c;旨在处理从小型到 快速高效的超大型项目。 Git 易于学习&#xff0c;占用空间小&#xff0c;性能快如闪电。 它超越了 Subversion、CVS、Perforce 和 ClearCase 等 SCM 工具 具有 cheap local branching、 方便的暂…

HIVE 数据仓库工具之第一部分(讲解部署)

HIVE 数据仓库工具 一、Hive 概述1.1 Hive 是什么1.2 Hive 产生的背景1.3 Hive 优缺点1.3.1 Hive的优点1.3.2 Hive 的缺点 1.4 Hive在Hadoop生态系统中的位置1.5 Hive 和 Hadoop的关心 二、Hive 原理及架构2.1 Hive 的设计原理2.2 Hive 特点2.3 Hive的体现结构2.4 Hive的运行机…

Linux 配置wireshark 分析thread 使用nRF-Sniffer dongle

Linux 配置wireshark nRF-Sniffer-for-802.15.4 1.下载固件和配置文件 https://github.com/NordicSemiconductor/nRF-Sniffer-for-802.15.4 2.烧写固件 使用nRF Connect for Desktop 中的 programmer 4.3烧写 https://www.nordicsemi.com/Products/Development-tools/nrf-conne…

【layUI】点击导出按钮,导出excel文件

要实现的功能如下&#xff1a;根据执行状态判断是否可以导出。如果可以导出&#xff0c;点击导出&#xff0c;在浏览器里下载对应的文件。 代码实现 html里&#xff1a; <table class"layui-hide" id"studentTable" lay-filter"studentTable&…

Dubbo3框架概述

1 什么是分布式系统? 《分布式系统原理与范型》定义: “分布式系统是若干独立计算机的集合,这些计算机对于用户来说就像单个相关系统” 分布式系统(distributed system)是建立在网络之上的软件系统。 简单来说:多个(不同职责)人共同来完成一件事! 任何一台服务器都无法…

open62541 使用账号密码认证示例

一、官方源码示例 源码参考 服务端官方示例&#xff1a; /* This work is licensed under a Creative Commons CCZero 1.0 Universal License.* See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */#include <open62541/plugin/accesscont…

QtWebEngineView加载本地网页

直接加载放在exe同级目录下的资源是不行的&#xff0c;需要把资源通过qrc放到exe里面&#xff0c;然后通过类似qrc:/robotHtml/index.html这样的路径加载才行。 mWebView new QWebEngineView(parent);// mWebView->load(QUrl::fromLocalFile("./robotHtml/index.html&…

【网络安全】XML-RPC PHP WordPress漏洞

未经许可,不得转载。 文章目录 前言WordPressWordPress中的Xmlrpc.php利用前提:Xmlrpc可访问深度利用1、用户名枚举2、跨站点端口攻击(XSPA)或端口扫描3、使用xmlrpc.php进行暴力攻击前言 本文将解释xmlrpc.php WordPress 漏洞及利用方式,并以三种攻击方法进行阐发: 1、…

代码随想录算法训练营第四十一天 | 121. 买卖股票的最佳时机 , 122.买卖股票的最佳时机II , 123.买卖股票的最佳时机III

目录 121. 买卖股票的最佳时机 思路 暴力 贪心 动态规划 1.确定dp数组&#xff08;dp table&#xff09;以及下标的含义 2.确定递推公式 3.dp数组如何初始化 4.确定遍历顺序 5.举例推导dp数组 方法一&#xff1a; 贪心 方法二&#xff1a;动态规划1 方法三&#xf…

国内外大模型汇总:Open AI大模型、Google大模型、Microsoft大模型、文心一言大模型、通义千问大模型、字节豆包大模型、智普清言大模型

Open AI大模型 特点&#xff1a; 多模态能力&#xff1a;如GPT-4o&#xff0c;能接受文本、音频、图像作为组合输入&#xff0c;并生成任意形式的输出。 情感识别与回应&#xff1a;具备情感识别能力&#xff0c;能根据对话者的情绪做出有感情的回应。 几乎无延迟&#xff…

XSS LABS - Level 14 过关思路

关注这个靶场的其他相关笔记&#xff1a;XSS - LABS —— 靶场笔记合集-CSDN博客 0x01&#xff1a;关卡配置 这一关有些特殊&#xff0c;需要链接到外部站点&#xff0c;但是这个站点已经挂了&#xff0c;无法访问&#xff1a; 所以笔者就根据网上的资料&#xff0c;对这一关进…

k8s的安装

概念 全写&#xff1a;Kubernets k8s作用&#xff1a;用于自动部署、拓展、管理容器化部署的应用程序。它是半开源的&#xff0c;核心是在谷歌里面&#xff0c;它的底层是由go语言开发的。可以理解成负责自动化运维管理多个容器化的应用的集群。也可以理解为容器编排框架的工…

2k1000LA 调试4G

问题&#xff1a; 其实算不上 调试&#xff0c; 之前本来4G是好的&#xff0c;但是 我调试了触摸之后&#xff0c;发现4G用不了了。 其实主要是 pppd 这个命令找不到。 首先来看 为什么 找不到 pppd 这个命令。 再跟目录使用 find 命令&#xff0c;能够找到这个命令&#…

PyCharm中python语法要求——消去提示波浪线

PyCharm中python语法要求——消去提示波浪线 关闭代码规范检查 在Setting里边搜索pep&#xff0c;取消勾选pep8 coding style violation 问题产生 解决问题 按照下图操作&#xff0c;也可直接CtrlAlts弹出设置页面 在 Settings 中 &#xff1a; Editor > Color Sheame >…

百度搜索的RLHF性能优化实践

作者 | 搜索架构部 导读 本文大语言模型在未经标注的大量文本上进行预训练后&#xff0c;可能产生包含偏见、泄露隐私甚至对人类构成威胁的内容。OpenAI 最先提出了基于人类反馈的强化学习算法(Reinforcement Learning fromHuman Feedback, RLHF)&#xff0c;将人类偏好引入到…

关于ssrf的实现

目录 ssrf漏洞形成 ssrf实现 ssrf(curl) ssrf漏洞形成 SSRF(Server-Side Request Forgery:服务器端请求伪造)漏洞形成的原因主要是服务器端所提供的接口中包含了所要请求的内容的URL参数&#xff0c;并且未对客户端所传输过来的URL参数进行过滤 ssrf实现 本次ssrf于Pikac…