五、Kafka消费者

目录

    • 5.1 Kafka的消费方式
    • 5.2 Kafka 消费者工作流程
      • 1、总体流程
      • 2、消费者组原理
      • 3、==消费者组初始化流程==
      • 4、==消费者组详细消费流程==
    • 5.3 消费者API
      • 1 独立消费者案例(订阅主题)
      • 2 独立消费者案例(订阅分区)
      • 3 消费者组案例
    • 5.4 生产经验——分区的分配以及再平衡

5.1 Kafka的消费方式

pull(拉)模 式:consumer采用从broker中主动拉取数据。Kafka采用这种方式。

缺点: pull模式不足之处是,如 果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据

push(推)模式:Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有消费者的消费速率



5.2 Kafka 消费者工作流程

1、总体流程

【注意】

  • 消费者只能从主分区上拉取数据,从节点起到同步和冗余数据的作用
  • 每个分区的数据只能由消费者组中一个消费者消费
  • 一个消费者可以消费多个分区数据
  • 每个消费者的offset由消费者提交到系统主题保存
    在这里插入图片描述

2、消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
    在这里插入图片描述
    在这里插入图片描述

3、消费者组初始化流程

4、消费者组详细消费流程

在这里插入图片描述



5.3 消费者API

1 独立消费者案例(订阅主题)

public class CustomConsumer {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");// 设置分区分配策略// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}kafkaConsumer.commitAsync();}}
}

在这里插入图片描述

2 独立消费者案例(订阅分区)

public class CustomConsumerPartition {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题对应的分区ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("first",0));kafkaConsumer.assign(topicPartitions);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

3 消费者组案例

1)需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费
在这里插入图片描述



5.4 生产经验——分区的分配以及再平衡

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/109089.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

4.22 TCP 四次挥手,可以变成三次吗?

目录 为什么 TCP 挥手需要四次呢&#xff1f; 粗暴关闭 vs 优雅关闭 close函数 shotdown函数 什么情况会出现三次挥手&#xff1f; 什么是 TCP 延迟确认机制&#xff1f; TCP 序列号和确认号是如何变化的&#xff1f; 在一些情况下&#xff0c; TCP 四次挥手是可以变成 T…

冲破时代鸿沟,Linus和Eversheet,杰出程序员的创新成果

在80年代末&#xff0c;电脑技术的普及程度与今日相较&#xff0c;犹如鸿沟天堑。那时&#xff0c;计算机对大多数人来说还是稀罕物&#xff0c;尤其在像中国这样的发展中国家。 与如今充满信息的网络环境相比&#xff0c;那个时代没有Web&#xff0c;没有Google等搜索引擎&am…

Docker安装并配置Pushgateway

Linux下安装Docker请参考&#xff1a;Linux安装Docker 简介 Pushgateway是Prometheus的一个组件&#xff0c;prometheus server默认是通过Exporter主动获取数据&#xff08;默认采取pull拉取数据&#xff09;&#xff0c;Pushgateway则是通过exporter主动方式推送数据到Pushg…

SQLmap使用

文章目录 利用sqlmap 注入得到cms网站后台管理员账密获取数据库名称获取cms数据库的表名获取users表中的字段&#xff08;内容&#xff09;获取username字段和password字段的内容 salmap破解psot请求数据包salmap获取getshell 利用sqlmap 注入得到cms网站后台管理员账密 获取数…

机器人制作开源方案 | 桌面级机械臂--本体说明+驱动及控制

一、本体说明 1. 机械臂整体描述 该桌面级机械臂为模块化设计&#xff0c;包含主机模块1个、转台模块1个、二级摆动模块1个、可编程示教盒1个、2种末端执行器、高清摄像头&#xff0c;以及适配器、组装工具、备用零件等。可将模块快速组合为一个带被动关节的串联3自由度机械臂…

Maven详解

文章目录 一、引言1.1 为什么需要 Maven&#xff1f;1.2 Maven 解决了哪些问题&#xff1f;1.2.1 添加第三方jar包1.2.2 jar包之间的依赖关系1.2.3 处理jar包之间的冲突1.2.4 获取第三方jar包1.2.5 将项目拆分成多个工程模块1.2.6 实现项目的分布式部署 二、介绍三、Maven 的特…

python数组基本使用

使用Numpy进行数组运算 相比 List&#xff0c;NumPy 数组的优势 NumPy 全称为 Numerical Python&#xff0c;是 Python 的一个以矩阵为主的用于科学计算的基础软件包。NumPy 和 Pandas、Matpotlib 经常结合一起使用&#xff0c;所以被人们合称为数据分析三剑客。Numpy 中有功能…

winpe还原windows系统备份

准备工作 用大白菜制作一个启动u盘&#xff0c;里面可以镜系统备份文件 插入电脑&#xff0c;启动&#xff0c;按f11&#xff08;这个快捷键因电脑而异&#xff09;&#xff0c;选择启动u盘&#xff0c;进入winpe 硬盘格式化 选择分区助手软件 选择硬盘&#xff0c;右键选择【…

【Java 中级】一文精通 Spring MVC - 数据验证(七)

&#x1f449;博主介绍&#xff1a; 博主从事应用安全和大数据领域&#xff0c;有8年研发经验&#xff0c;5年面试官经验&#xff0c;Java技术专家&#xff0c;WEB架构师&#xff0c;阿里云专家博主&#xff0c;华为云云享专家&#xff0c;51CTO 专家博主 ⛪️ 个人社区&#x…

算法通关村第8关【黄金】| 寻找祖先问题

思路&#xff1a;递归三部曲 第一步&#xff1a;确定参数和返回值 题目要求找到指定的结点&#xff0c;就需要返回结点。 题目又涉及到p,q就需要传入p,q&#xff0c;需要遍历传入root 第二步&#xff1a;确定终止条件 当遍历到结点为空说明到底没找到返回空 或者遍历到p,…

飞腾平台芯片测试固件(SFW)和开机启动log

一、说两句 最近公司飞腾产品越来越多了&#xff0c;FT-2000/4的D2000的X100的&#xff0c;最近又新出了E2000。越来越多新来的小孩儿开始加入到飞腾的调测试中&#xff0c;那么在他们实际的调试中会遇到很多的问题。在固件启动阶段有的板卡会有一些异常&#xff0c;有时我们需…

ChatGPT 与 Python进行动态可视化分析

Python数据分析目前最为热门的岗位操作。 想使用Python进行可视化分析&#xff0c;但是又不想写代码&#xff0c;测试&#xff0c;验证。可以交给ChatGPT&#xff0c;open AI 来进行操作。 这样的动态图显示&#xff0c;我们只需要给ChatGPT发送一个指令&#xff0c;人工智能就…

用AI + Milvus Cloud搭建着装搭配推荐系统教程

以下函数定义了如何将图像转换为向量并插入到 Milvus Cloud 向量数据库中。代码会循环遍历所有图像。(注意:如果需要开启 Milvus Cloud 全新特性动态 Schema,需要修改代码。) 查询向量数据库 以下代码演示了如何使用输入图像查询 Milvus Cloud 向量数据库,以检索和上传…

新方案unity配表工具

工具下载&#xff1a;网盘链接 工具结构&#xff1a;针对每张表格生成一个表格类&#xff0c;其中默认包含一个list和字典类型参数记录表格数据&#xff0c;初始化项目时将list中的数据转为按id索引的dictionary&#xff0c;用于访问数据。额外包含一个同名Temp后缀的类&#…

docker使用harbor进行镜像仓库管理演示以及部分报错解决

目录 一.安装harbor和docker-compose 1.下载 2.将该文件修改为这样&#xff0c;修改好自己的hostname和port&#xff0c;后文的用户和密码可以不改也可以改&#xff0c;用于登录 3.安装 二.修改daemon.json文件和/etc/hosts文件 三.使用powershell作windows端域名映射 四…

Unity 之transform.LookAt() 调整一个物体的旋转,使其朝向指定的位置

文章目录 总的介绍补充&#xff08;用于摄像机跟随的场景&#xff09; 总的介绍 transform.LookAt 是 Unity 引擎中 Transform 组件的一个方法&#xff0c;用于调整一个物体的旋转&#xff0c;使其朝向指定的位置。通常情况下&#xff0c;它被用来使一个物体&#xff08;如摄像…

无涯教程-PHP - preg_match_all()函数

preg_match_all() - 语法 int preg_match_all (string pattern, string string, array pattern_array [, int order]); preg_match_all()函数匹配字符串中所有出现的模式。 它将按照您使用可选输入参数order指定的顺序将这些匹配项放置在pattern_array数组中。有两种可能的类…

韶音的骨传导耳机怎么样,韶音骨传导耳机值得入手吗

韶音在骨传导耳机中&#xff0c;也存在着一定的影响力&#xff0c;并且目前就款式而言&#xff0c;关于骨传导耳机的产品韶音最新发布的还是去年的OpenRun Pro&#xff0c;从外观上具备了七种颜色可以挑选&#xff0c;颜值上可以说是比较耐打的&#xff0c;配置方面对于现在需求…

java八股文面试[JVM]——类加载器

一、类加载器的概念 类加载器是Java虚拟机用于加载类文件的一种机制。在Java中&#xff0c;每个类都由类加载器加载&#xff0c;并在运行时被创建为一个Class对象。类加载器负责从文件系统、网络或其他来源中加载类的字节码&#xff0c;并将其转换为可执行的Java对象。类加载器…

window如何实时刷新日志文件

1 安装windows git 下载地址&#xff1a;Git - Downloading Package (git-scm.com) 2 打开git bash 输入tail.exe -f 日志文件路径