kafka复习:(22)一个分区只能被消费者组中的一个消费者消费吗?

默认情况下,一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。
一、自定义PartitionAssignor.

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class BroadcastAssignor extends AbstractPartitionAssignor {@Overridepublic String name() {return "broadcast";}private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {Map<String, List<String>> res = new HashMap<>();for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {String consumerId = subscriptionEntry.getKey();for (String topic : subscriptionEntry.getValue().topics())put(res, topic, consumerId);}return res;}@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<String>> consumersPerTopic =consumersPerTopic(subscriptions);Map<String, List<TopicPartition>> assignment = new HashMap<>();subscriptions.keySet().forEach(memberId ->assignment.put(memberId, new ArrayList<>()));consumersPerTopic.entrySet().forEach(topicEntry->{String topic = topicEntry.getKey();List<String> members = topicEntry.getValue();Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null || members.isEmpty())return;List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);if (!partitions.isEmpty()) {members.forEach(memberId ->assignment.get(memberId).addAll(partitions));}});return assignment;}
}

二、定义两个消费者,给其配置上述PartitionAssignor.

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;public class KafkaTest19 {private static Properties getProperties(){Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup2023");properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,BroadcastAssignor.class.getName());return properties;}public static void main(String[] args) {KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());String topic="study2023";myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println("record offset is: "+record.offset());}}}
}
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;public class KafkaTest20 {private static Properties getProperties(){Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup2023");properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,BroadcastAssignor.class.getName());return properties;}public static void main(String[] args) {KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());String topic="study2023";myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println("record offset is: "+record.offset());}}}
}

在kafka创建只有一个分区的topic : study2023

创建一个生产者往study2023这个 topic发送消息:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class KafkaTest01 {public static void main(String[] args) {Properties properties= new Properties();properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);ProducerRecord<String,String> producerRecord=new ProducerRecord<>("study2023",0,"fff","hello sister,now is: "+ new Date());Future<RecordMetadata> future = kafkaProducer.send(producerRecord);long offset = 0;try {offset = future.get().offset();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(offset);kafkaProducer.close();}
}

分别运行生产者和消费者,可以看到相同消费者组里两个消费者可以消费study2023这个topic的同一个分区的数据
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/110412.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JDK配置环境变量(超详细)

先安装JDK再配置环境变量&#xff01; JDK可以简单理解为就是java&#xff0c;JDK包含了java项目运行所需要的运行环境JRE&#xff0c;编译运行java程序的java虚拟机JVM。 jdk-8u201-windows-x64安装包&#xff08;jdk1.8&#xff09;&#xff1a; 提取码&#xff1a;19xv …

外部库/lib/maven依赖项 三者关系

外部库(存放项目初始配置的jar包)(它的文件夹里并没有包含lib文件夹的引的外部的依赖的jar包) lib(存放外部导入到项目的依赖的jar包) maven依赖项(管理项目所有的jar包依赖) 三者存放jar包的关系 项目所依赖的全部的jar包 maven依赖项的jar包 外部库中的jar包 lib中的…

文件夹无法删除?简单3招,轻松解决问题!

“我电脑里有一个文件夹占用了很大的内存&#xff0c;我想将它删除来释放一些内存&#xff0c;但是根本没法删除&#xff0c;为什么会这样呢&#xff1f;文件夹无法删除应该怎么办呢&#xff1f;” 在日常电脑使用中&#xff0c;有时候会遇到文件夹无法删除的情况&#xff0c;这…

【Terraform学习】使用 Terraform 创建应用程序负载均衡器(Terraform-AWS最佳实战学习)

使用 Terraform 创建应用程序负载均衡器 实验步骤 前提条件 安装 Terraform&#xff1a; 地址 下载仓库代码模版 本实验代码位于 task_elb 文件夹中。 变量文件 variables.tf 在上面的代码中&#xff0c;您将声明&#xff0c;aws_access_key&#xff0c;aws_secret_key…

深入理解ArrayList的动态扩容机制及应用

在java编程中&#xff0c;数据结构起着至关重要的作用&#xff0c;而ArrayList作为一种常用的动态数组&#xff0c;为我们在处理数据时提供了便利。其中&#xff0c;其独特的动态扩容机制更是为其赢得了广泛的应用。我们不管在工作还是面试中&#xff0c;都会遇到ArrayList&…

Nginx详解 二:配置文件部分

文章目录 1. Nginx 配置文件1.1 主配置文件1.2 子配置文件1.3 全局配置1.3.1 修改启动的进程数1.3.2 cpu和work进程绑定&#xff08;nginx调优&#xff09;1.3.3 修改PID路径1.3.4 nginx进程的优先级&#xff08;work进程的优先级&#xff09;1.3.5 调试work进程打开的文件的个…

聚观早报|2023戴尔科技峰会助力创新;小米汽车电池供应商敲定

【聚观365】8月23日消息 2023戴尔科技峰会助力企业创新 小米汽车电池供应商敲定中创新航和宁德时代 iPhone15预计有6种配色 王小川卸任自动驾驶企业禾多科技董事 特斯拉动力总成副总裁宣布离职 2023戴尔科技峰会助力企业创新 近日“新生万物 数实新格局 —— 2023戴尔科技…

周鸿祎为360智脑招贤纳士;LLM时代的选择指南;Kaggle大语言模型实战;一文带你逛遍LLM全世界 | ShowMeAI日报

&#x1f440;日报&周刊合集 | &#x1f3a1;生产力工具与行业应用大全 | &#x1f9e1; 点赞关注评论拜托啦&#xff01; &#x1f916; 思否「齐聚码力」黑客马拉松&#xff0c;用技术代码让生活变得更美好 主页&#xff1a;https://pages.segmentfault.com/google-hacka…

系统架构设计高级技能 · 安全架构设计理论与实践

点击进入系列文章目录 现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞。 Now everything is for the future of dream weaving wings, let the dream fly in reality. 系统架构设计高级技能 安全架构设计理论与实践 一、信息安全面临的威胁1.1 信息…

相机SD卡数据丢失如何恢复?

出门在外&#xff0c;相机是人们记录生活点滴的重要工具&#xff0c;是旅游的最佳玩伴。人们每到一个地方&#xff0c;都喜欢用相机来见证自己来过的痕迹&#xff0c;拍好的照片都会被放到相机卡里&#xff0c;但在使用相机时&#xff0c;有时我们会意外删除了重要的照片或视频…

【SpringCloudAlibaba】Sentinel使用

文章目录 概述官网解决的问题主要特性 配置下载可视化控制台POMYML 流控规则直接(默认)关联链路 降级规则降级策略实战RT异常比例异常数 热点key限流示例&#xff1a;高级选项&#xff1a;参数例外项其他 系统规则SentinelResource按资源名称限流后续处理按照Url地址限流后续处…

hdfs操作

hadoop fs [generic options] [-appendToFile … ] [-cat [-ignoreCrc] …] [-checksum …] [-chgrp [-R] GROUP PATH…] [-chmod [-R] <MODE[,MODE]… | OCTALMODE> PATH…] [-chown [-R] [OWNER][:[GROUP]] PATH…] [-copyFromLocal [-f] [-p] [-l] [-d] … ] [-copyTo…

深度学习卷积神经网络识别光学字符验证码,及captcha使用简单案例

深度学习卷积神经网络识别验证码 文章目录 深度学习卷积神经网络识别验证码一、引言二、导入必要的库三、防止 tensorflow 占用所有显存四、定义数据生成器并测试五、定义网络结构六、训练模型七、测试模型 一、引言 验证码识别&#xff0c;本身使用来判断访问网站的用户是不是…

【JSDocvscode】使用JSDoc、在vscode中开启node调试、使用vscode编写运行Python程序

JSDoc JSDoc是JavaScript的一种注释语法&#xff0c;同时通过JSDoc注释也可以规避js弱类型中不进行代码提示的问题 图形展示JSDoc的效果&#xff1a; 上述没有进行JSDoc&#xff0c;然后我们a点什么 是没有任何提示的 上述就是加上 JSDoc的效果 常用的 vscode 其实内置了 js…

SpringBoot整合JUnit、MyBatis、SSM

&#x1f40c;个人主页&#xff1a; &#x1f40c; 叶落闲庭 &#x1f4a8;我的专栏&#xff1a;&#x1f4a8; c语言 数据结构 javaEE 操作系统 石可破也&#xff0c;而不可夺坚&#xff1b;丹可磨也&#xff0c;而不可夺赤。 SpringBoot整合 一、SpringBoot整合JUnit二、Spri…

Android获取手机已安装应用列表JAVA实现

最终效果: 设计 实现java代码: //获取包列表private List<String> getPkgList() {List<String> packages new ArrayList<String>();try {//使用命令行方式获取包列表Process p Runtime.getRuntime().exec("pm list packages");//取得命令行输出…

11.2.1-通货膨胀CPI

文章目录 1. 什么是CPI2. 在哪里获取CPI数据3. CPI的同比、环比到底是什么意思&#xff1f;4. 计算购买力侵蚀5. 复利计算 微不足道的小事也会引发惊人的结果&#xff0c; 每念及此&#xff0c; 我就认为世上无小事。——布鲁斯巴登&#xff08;Bruce Barton&#xff09; 核心内…

大数据Flink实时计算技术

1、架构 2、应用场景 Flink 功能强大&#xff0c;支持开发和运行多种不同种类的应用程序。它的主要特性包括&#xff1a;批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。在启用高可用选项的情况下&#xff0c;它不存在单点失效问题。事实证明&#…

Scrum敏捷研发迭代式开发

Scrum是一个迭代式增量软件开发过程&#xff0c;是敏捷方法论中的重要框架之一。它通常用于敏捷软件开发&#xff0c;包括了一系列实践和预定义角色的过程骨架。Scrum中的主要角色包括Scrum主管&#xff08;Scrum Master&#xff09;、产品负责人&#xff08;Product Owner&…

Dockerfile文件详细

Dockerfile 是一个文本文件&#xff0c;里面包含组装新镜像时用到的基础镜像和各种指令&#xff0c;使用dockerfile 文件来定义镜像&#xff0c;然后运行镜像&#xff0c;启动容器。 构建镜像步骤 ① 编写一个 dockerfile 文件 ② 使用 ​​​docker build​​​构建镜像 ③ …