【大数据学习 | kafka】简述kafka的消费者consumer

1. 消费者的结构

能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。

这里面要涉及到一个动作叫做拉取。

首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的就是消息的push方式,这个方式不能够保证推送的数据消费者端一定会消费完毕,会出现数据的反压问题,这个问题很难解决,所以才出现了消息队列kafka,它可以起到一个缓冲的作用,生产者部分将数据直接全部推送到kafka,然后消费者从其中拉取数据,这边如果也采用推送的方式,那么也就在计算端会出现反压问题,所以kafka的消费者一般都是采用拉的方式pull,并不是push

1.1 消费者组

在一个topic中存在多个分区,可以分摊压力实现负载均衡,那么整体topic中的数据会很多,如果消费者只有一个的话很难全部消费其中的数据压力也会集中在一个消费者中,并且在大数据行业中几乎所有的计算架构都是分布式的集群模式,那么这个集群模式中,计算的节点也会存在多个,这些节点都是可以从kafka中拉取数据的,所有消费者不可能只有一个,一般情况下都会有多个消费者。

正因为topic存在多个分区,每个分区中的数据是独立的,那么消费者最好也是一个一个和分区进行一一对应的,所以有几个分区应该对应存在几个消费者是最好的。

这个和分蛋糕是一样的,一个蛋糕分成几块,那么有几个人吃,应该是对应关系的

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

2. 消费者实现

在实现消费者的时候我们需要知道几个消费者的配置重要参数

参数解释
bootstrap.servers集群地址
key.deserializerkey反序列化器
value.deserializervalue反序列化器
group.id消费者组id

首先创建消费者对象

消费者对象订阅相应的topic然后拉取其中的数据进行消费

整体代码如下

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");//设定组idpro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定key的反序列化器pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定value的反序列化器KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_a","topic_b");//一个消费者可以消费多个分区的数据consumer.subscribe(topics);//订阅这个topicwhile (true){//死循环要一直消费数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));//间隔一秒钟消费一次数据,拉取一批数据过来Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}
[hexuan@hadoop106 datas]$ kafka-console-producer.sh --bootstrap-server hadoop106:9092 --topic topic_b>>1
>2
>3
>4
>5
>

3. 消费者与分区之间的对应关系

一个消费者组中的消费者和分区是一一对应的关系,一个分区应该对应一个消费者,但是如果消费者多了,那么有的消费者就没有分区消费,如果消费者少了那么会出现一个消费者消费多个分区的情况。

# 首先创建topic_c 用于测试分区和消费者的对应关系
kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_c --partitions 3 --replication-factor 2
# 启动两个消费者 刚才我们写的消费者main方法运行两次
# 然后分别在不同的分区使用生产者发送数据,看数据在消费者中的打印情况

首先选择任务可以并行执行

选择任务修改配置

我们可以看到允许多实例并行执行

启动两次,这个时候我们就有了两个消费者实例

生产者线程:分别向三个分区中发送1 2 3元素

package com.hainiu.kafka.consumer;/*** ClassName : test3_producer* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/3 23:40* Version 1.0*/import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class test3_producer {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_d", 0,null,"1");ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_d", 1,null,"2");ProducerRecord<String, String> record3 = new ProducerRecord<>("topic_d", 2,null,"3");producer.send(record1);producer.send(record2);
//        producer.send(record3);producer.close();}
}

可以看到有的消费者消费了两个分区的数据

如果启动三个消费者会发现每个人消费一个分区的数据

如果启动四个消费者

我们发现有一个消费者没有数据

3. 1 消费多topic的数据

不同组消费不同的topic或者一个组可以消费多个topic都是可以的

3.2 多个组消费一个topic

同一个topic可以由多个消费者组进行消费数据,并且相互之间是没有任何影响的

修改同一份代码的组标识不同。启动两个实例查看里面的消费信息

   pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");//分别修改消费者组的id不同
package com.hainiu.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_c");//订阅多个topic的数据变化consumer.subscribe(topics);while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/467448.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uniapp发布到微信小程序,提示接口未配置在app.json文件中

使用uniapp打包上传微信小程序发布&#xff0c;在提交审核时提示 “接口未配置在app.json文件中” 如下图所示 解决方法&#xff1a;在manifest.json文件中打开源码视图&#xff0c;添加 requiredPrivateInfos 字段键入所需要的接口&#xff08;数组&#xff09;

重新下载Window11系统中的mfc100.dll文件

环境 Xshell6Xftp6Window11 前言 最近下载了一款绿色版本的Xshell远程客户端软件&#xff0c;用来登录Linux服务器&#xff0c;在Window11使用&#xff0c;点击时候提示很多dll文件缺失&#xff0c;所以比较纠结&#xff0c;因为是绿色版本软件&#xff0c;所以不能重装&…

js基础篇笔记 (万字速通)

此笔记来自于黑马程序员,仅供笔者复习 JavaScript 基础 - 第1天 了解变量、数据类型、运算符等基础概念&#xff0c;能够实现数据类型的转换&#xff0c;结合四则运算体会如何编程。 体会现实世界中的事物与计算机的关系理解什么是数据并知道数据的分类理解变量存储数据的“容…

vue3+ts+element-ui实现的可编辑table表格组件 插入单行多行 组件代码可直接使用

最近需求越来越离谱&#xff0c;加班越来越严重&#xff0c;干活的牛马也越来越卑微。写了一个可编辑表格&#xff0c;并已封装好组件&#xff0c;可直接使用。 基于这位大佬的 动态表格自由编辑 方法和思路&#xff0c;于是参考和重写了表格&#xff0c;在基础上增加和删除了…

决策树(部分)

目录 信息熵 总结&#xff1a; 特征选择 信息增益&#xff1a;ID3算法 增益率&#xff1a;C4.5 基尼指数 剪枝处理 预剪枝 后剪枝 信息熵 信息熵 (entropy)是 用于度量样本集合“ 纯度 ” 最常用的一种指标&#xff0c;其中 “ 熵 ” 是事物的不确定性&#xff0c;假定…

webpack 执行流程 — 实现 myWebpack

前言 实现 myWebpack 主要是为了更好的理解&#xff0c;webpack 中的工作流程&#xff0c;一切都是最简单的实现&#xff0c;不包含细节内容和边界处理&#xff0c;涉及到 ast 抽象语法树和编译代码部分&#xff0c;最好可以打印出来观察一下&#xff0c;方便后续的理解。 re…

【python】Flask

文章目录 1、Flask 介绍2、Flask 实现网页版美颜效果3、参考 1、Flask 介绍 Flask 是一个用 Python 编写的轻量级 Web 应用框架。它设计简单且易于扩展&#xff0c;非常适合小型项目到大型应用的开发。 以下是一些 Flask 库中常用的函数和组件&#xff1a; 一、Flask 应用对…

AI大模型如何重塑软件开发流程?

《AI大模型对软件开发流程的重塑&#xff1a;变革、优势、挑战与展望》 一、传统软件开发流程与模式&#xff08;一&#xff09;传统软件开发流程&#xff08;二&#xff09;传统软件开发模式面临的问题&#xff08;一&#xff09;AI在软件开发中的应用场景&#xff08;二&…

OceanBase 应用实践:如何处理数据空洞,降低存储空间

问题描述 某保险行业客户的核心系统&#xff0c;从Oracle 迁移到OceanBase之后&#xff0c;发现数据存储空间出现膨胀问题&#xff0c;数据空间 datasize9857715.48M&#xff0c;实际存储占用空间17790702.00M。根据 required_mb - data_mb 值判断&#xff0c;数据空洞较为严重…

Zookeeper运维秘籍:四字命令基础、详解及业务应用全解析

文章目录 一、四字命令基础二、四字命令详解三、四字命令的开启与配置四、结合业务解读四字命令confconsenvi命令Stat命令MNTR命令ruok命令dump命令wchswchp ZooKeeper&#xff0c;作为一款分布式协调服务&#xff0c;提供了丰富的四字命令&#xff08;也称为四字短语&#xff…

MATLAB大数计算工具箱及其用法

1. MATLAB大数工具箱Variable Precision Integer Arithmetic介绍 Variable Precision Integer Arithmetic是John DErrico 开发的大数运算工具箱&#xff0c;可以用完全任意大小的整数进行算术运算。支持vpi定义的数组和向量。 2.MATLAB代码 完整代码见: https://download.cs…

【野生动物识别系统】Python+深度学习+人工智能+卷积神经网络算法+TensorFlow+ResNet+图像识别

一、介绍 动物识别系统&#xff0c;使用Python作为主要开发语言&#xff0c;基于深度学习TensorFlow框架&#xff0c;搭建卷积神经网络算法。并通过对18种动物数据集进行训练&#xff0c;最后得到一个识别精度较高的模型。并基于Django框架&#xff0c;开发网页端操作平台&…

数据库_SQLite3

下载 1、更新软件源&#xff1a; sudo apt-get update 2、下载SQLite3&#xff1a; sudo apt-get install sqlite3 3、验证&#xff1a; sqlite3启动数据库&#xff0c;出现以下界面代表运行正常。输入 .exit 可以退出数据库 4、安装sqlite3的库 sudo apt-get install l…

PyTorch核心概念:从梯度、计算图到连续性的全面解析(三)

文章目录 Contiguous vs Non-Contiguous TensorTensor and ViewStrides非连续数据结构&#xff1a;Transpose( )在 PyTorch 中检查Contiguous and Non-Contiguous将不连续张量&#xff08;或视图&#xff09;转换为连续张量view() 和 reshape() 之间的区别总结 参考文献 Contig…

如何解决导入aioredis报错TypeError: duplicate base class TimeoutError的问题(轻松解决,亲测有效)

下面是根据你的要求撰写的文章: 文章目录 📖 介绍 📖🏡 演示环境 🏡📒 aioredis导包报错 📒📝 解决方案📝 小贴士⚓️ 相关链接 ⚓️📖 介绍 📖 最近在使用Python异步redis模块aioredis的时候遇到了一个错误,导包报错提示 TypeError: duplicate base cla…

基于Springboot+Android的智慧社区互助平台 (含源码数据库)

1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 这个系…

讨论一个mysql事务问题

最近在阅读一篇关于隔离级别的文章&#xff0c;文章中提到了一种场景&#xff0c;我们下面来分析一下。 文章目录 1、实验环境2、两个实验的语句执行顺序3、关于start transaction和start transaction with consistent snapshot4、实验结果解释4.1、实验14.2、实验24.3、调整实…

Kubernetes-编排工具篇-01-Kustomize与Helm对比

Kustomize与Helm对比 0、前言 K8s 是一个开源容器编排平台&#xff0c;可自动执行容器化应用程序的部署、扩展和管理。近年来&#xff0c;K8s 已成为采用云原生架构和容器化技术的组织的标准。 但是由于K8s的复杂性&#xff0c;所以很多公司以及开源组织都在开发相关的工具来…

确定图像的熵和各向异性 Halcon entropy_gray 解析

1、图像的熵 1.1 介绍 图像熵&#xff08;image entropy&#xff09;是图像“繁忙”程度的估计值&#xff0c;它表示为图像灰度级集合的比特平均数&#xff0c;单位比特/像素&#xff0c;也描述了图像信源的平均信息量。熵指的是体系的混乱程度&#xff0c;对于图像而言&#…

数字IC后端设计实现之Innovus自动修复Min Step DRC Violation方案

在实际IC后端项目中我们经常会遇到min step的DRC Violation&#xff0c;如下图所示。 在咱们IC后端训练营项目中也会遇到这类DRC Violation。这类DRC Violation的本质是出现Metal的Notch&#xff0c;即metal有凹槽。 如果是pg net的 Min Step问题&#xff0c;我们可以使用下面的…