Kafka3.0.0版本——消费者(消费者组案例)

目录

    • 一、消费者组案例
      • 1.1、案例需求
      • 1.2、案例代码
        • 1.2.1、消费者1代码
        • 1.2.2、消费者2代码
        • 1.2.3、消费者3代码
        • 1.2.4、生产者代码
      • 1.3、测试

一、消费者组案例

1.1、案例需求

  • 测试同一个主题的分区数据,只能由一个消费者组中的一个消费。如下图所示:
    在这里插入图片描述

1.2、案例代码

复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者。

1.2.1、消费者1代码

  • 代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");// 设置分区分配策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("firstTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){//每一秒拉取一次数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//输出数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}kafkaConsumer.commitAsync();}}
    }
    

1.2.2、消费者2代码

  • 代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumer2 {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");// 设置分区分配策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("firstTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
    }

1.2.3、消费者3代码

  • 代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumer3 {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");// 设置分区分配策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("firstTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){//每一秒拉取一次数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//输出数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}kafkaConsumer.commitAsync();}}
    }
    

1.2.4、生产者代码

  • 代码

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 200; i++) {kafkaProducer.send(new ProducerRecord<>("firstTopic", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
    }
    

1.3、测试

  • 在 Kafka 集群控制台,创建firstTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 3 --replication-factor 1 --topic firstTopic
    

    在这里插入图片描述

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    在这里插入图片描述

  • 然后,在 IDEA中分别启动生产者代码
    在这里插入图片描述

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:
    由下图可知:3个消费者在消费不同分区的数据
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/124171.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式(1) - UML类图

1、前言 最近在阅读 Android 源码&#xff0c;时常碰到代码中有一些巧妙的写法&#xff0c;简单的如 MediaPlayerService 中的 IFactory&#xff0c;我知道它是工厂模式&#xff0c;但是却不十分清楚它为什么这么用&#xff1b;复杂点的像 NuPlayer 中的 DeferredActions 机制…

vulhub-tomcat弱口令

1.启动靶场 进入文件 进入目录 进入到靶场 启动靶场 docker-compose up -d 2.查看 ip地址 3.使用nmap对ip进行 扫描 发现存在8080的端口&#xff0c;并且端口是开放的状态&#xff0c;apache&#xff0c;tomcat搭建的 4.访问ip地址的端口 点击Manager app 6.开启BP进行抓包 随…

vue-cli3项目本地启用https,并用mkcert生成证书

在项目根目录下的vue.config.js文件中&#xff1a; // vue.config.js module.exports {devServer: {host:dev.nm.cngc// 此处开启 https,并加载本地证书&#xff08;否则浏览器左上角会提示不安全&#xff09;https: {cert: fs.readFileSync(path.join(_dirname,./cert.crt)…

UI自动化之混合框架

什么是混合框架&#xff0c;混合框架就是将数据驱动与关键字驱动结合在一起&#xff0c;主要用来回归业务主流程&#xff0c;将核心流程串联起来。 上一篇我们写到了关键字驱动框架&#xff0c;关键字驱动框架是针对一个业务场景的单条测试用例的。 我们以163邮箱的登录到创建…

云计算中的负载均衡技术,确保资源的平衡分配

文章目录 1. 硬件负载均衡器2. 软件负载均衡器3. DNS负载均衡4. 内容分发网络&#xff08;CDN&#xff09; &#x1f388;个人主页&#xff1a;程序员 小侯 &#x1f390;CSDN新晋作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 ✨收录专栏&#xff1a;云计算 ✨文章内…

【负载均衡】常见的负载均衡策略有哪些?

文章目录 前言负载均衡分类常见负载均衡策略小结 前言 负载均衡策略是实现负载均衡器的关键&#xff0c;而负载均衡器又是分布式系统中不可或缺的重要组件。使用它有助于提高系统的整体性能、可用性、可靠性和安全性&#xff0c;同时支持系统的扩展和故障容忍性。对于处理大量…

el-form表单动态校验(场景: 输入框根据单选项来动态校验表单 没有选中的选项就不用校验)

el-form表单动态校验 el-form常规校验方式: // 结构部分 <el-form ref"form" :model"form" :rules"rules"><el-form-item label"活动名称: " prop"name" required><el-input v-model"form.name" /…

面试中的时间管理:如何在有限时间内展示最大价值

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

客户端发现pod并与之通信

客户端发现pod并与之通信 pod需要一种寻找其他pod的方法来使用其他pod提供的服务&#xff0c;不像在没有Kubernetes的世界&#xff0c;系统管理员要在用户端配置文件中明确指出服务的精确IP地址 或者主机名来配置每个客户端应用&#xff0c;但同样的方法在Kubernetes中不适用 …

IIS WebDAV配置,https绑定及asp设置

IIS支持标准CGI&#xff0c;因此可以用程序语言针对STDIN和STDOUT开发。 IIS CGI配置和CGI程序FreeBasic, VB6, VC 简单样例_Mongnewer的博客-CSDN博客 IIS支持脚本解释CGI&#xff0c;因此可以用脚本语言针对STDIN和STDOUT开发。 IIS perl python cbrother php脚本语言配置…

(二十三)大数据实战——Flume数据采集之采集数据聚合案例实战

前言 本节内容我们主要介绍一下Flume数据采集过程中&#xff0c;如何把多个数据采集点的数据聚合到一个地方供分析使用。我们使用hadoop101服务器采集nc数据&#xff0c;hadoop102采集文件数据&#xff0c;将hadoop101和hadoop102服务器采集的数据聚合到hadoop103服务器输出到…

3dMax全球学习资源、资源文件和教程 !

此样例教育教程和学习资源旨在提供使用Autodesk 3ds Max时的计划知识和培训、正确的工作流、流程管理和最佳实践。 您在Autodesk三维设计领域的职业生涯 有关使用3ds Max和Maya在计算机图形领域开始职业生涯的提示&#xff08;包括新的3ds Max和Maya介绍教程&#xff0c;以复…

vmware fusion12共享文件夹到虚拟机window10

文章目录 一、window10虚拟机安装VMware Tools二、MAC配置共享文件夹三、使用 一、window10虚拟机安装VMware Tools vmware fusion—虚拟机----安装VMware Tools–一路下一步 确认安装 双击进行安装 一路下一步&#xff0c;傻瓜式安装 二、MAC配置共享文件夹 设置—系…

浅谈Spring

Spring是一个轻量级的控制反转(IoC)和面向切面(AOP)的容器&#xff08;框架&#xff09;。 一、什么是IOC&#xff1f; IoC Inversion of Control 翻译成中⽂是“控制反转”的意思&#xff0c;也就是说 Spring 是⼀个“控制反转”的容器。 1.1控制反转推导 这个控制反转怎…

类的加载过程

清楚每个环节的操作过程就可以了 1.加载Loading 加载过程&#xff1a; 通过一个类的全限定名获取定义此类的二进制字节流将这个字节流所代表的静态存储结构转化为方法区的运行时数据结构在内存中生成一个代表这个类的java.lang.class对象&#xff0c;作为方法区这个类的各种数…

Gitea--私有git服务器搭建详细教程

一.官方文档 https://docs.gitea.com/zh-cn/说明 gitea 是一个自己托管的Git服务程序。他和GitHub, Gitlab等比较类似。他是从 Gogs 发展而来&#xff0c;gitea的创作团队重新fork了代码&#xff0c;并命名为giteagitea 功能特性多&#xff0c;能够满足我们所有的的代码管理需…

nowcoder NC10 大数乘法

题目链接&#xff1a; https://www.nowcoder.com/practice/c4c488d4d40d4c4e9824c3650f7d5571?tpId196&tqId37177&rp1&ru/exam/company&qru/exam/company&sourceUrl%2Fexam%2Fcompany&difficultyundefined&judgeStatusundefined&tags&tit…

《异常检测——从经典算法到深度学习》22 Kontrast: 通过自监督对比学习识别软件变更中的错误

《异常检测——从经典算法到深度学习》 0 概论1 基于隔离森林的异常检测算法 2 基于LOF的异常检测算法3 基于One-Class SVM的异常检测算法4 基于高斯概率密度异常检测算法5 Opprentice——异常检测经典算法最终篇6 基于重构概率的 VAE 异常检测7 基于条件VAE异常检测8 Donut: …

Unity中Shader的屏幕坐标

文章目录 前言一、屏幕坐标1、屏幕像素的坐标2、屏幕坐标归一化 二、在Unity中获取 当前屏幕像素 和 总像素1、获取屏幕总像素,使用_ScreenParams参数2、获取当前片段上的像素怎么使用:在片元着色器传入参数时使用 前言 Unity中Shader的屏幕坐标 一、屏幕坐标 1、屏幕像素的坐…

非科班菜鸡算法学习记录 | 代码随想录算法训练营第58天|| 单调栈! 739. 每日温度 496.下一个更大元素 I

739. 每日温度 输入一个数组&#xff0c;找比i天温度高的第一天 知识点&#xff1a;单调栈 状态&#xff1a;看思路自己写 思路&#xff1a; 看自己写的注释&#xff0c;维护一个单调栈 // 版本一 class Solution { public:vector<int> dailyTemperatures(vector<…