Kafka 消费者专题

目录

  • 消费者
    • 消费者组
    • 消费方式
    • 消费规则
    • 独立消费主题
      • 代码示例(极简)
      • 代码示例(独立消费分区)
    • offset
      • 自动提交
      • 代码示例(自动提交)
      • 手动提交
      • 代码示例(同步)
      • 代码示例(异步)
    • 其他说明

消费者

消费者组

  1. 由多个消费者组成
  2. 消费者组之间互不影响。
  3. 其他消费规则如下

消费方式

  1. push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。

  2. consumer采用pull(拉)模式从broker中读取数据。pull模式则可以根据consumer的消费能力以适当的速率消费消息。
    pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。

消费规则

在这里插入图片描述

  1. 一个消费者(单独消费者或消费者组中的一个)可以消费一个分区中的数据也可以消费两个或以上的分区数据
  2. 消费者组中的消费者必须访问不同的数据分区,不能访问同一个
  3. 同一个分区中的数据允许被不同的消费者访问(消费者不属于同一个组)

独立消费主题

代码示例(极简)

以下代码创建模拟一个消费者组(testCg)中的消费者,订阅来自topicA的消息
CustomTopicConsumer.java

package com.wunaiieq.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;//创建一个独立消费者,消费topicA主题下的数据
public class CustomTopicConsumer {public static void main(String[] args) {//1.创建消费者属性文件对象Properties prop = new Properties();//2.为属性对象设置相关参数//设置kafka服务器prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.16.100:9092");//设置key和value的序列化类prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//设置消费者的消费者组的名称prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testCg");//3.创建消费者对象KafkaConsumer<String, String> kafkaConsumer =new KafkaConsumer<String, String>(prop);//4.注册要消费的主题ArrayList<String> topics = new ArrayList<>();topics.add("topicA");//订阅主题kafkaConsumer.subscribe(topics);//5.拉取数据并打印输出while (true) {//6.设置1s消费一批数据ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));//7.打印输出消费到的数据for (ConsumerRecord consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

代码示例(独立消费分区)

这个消费者需要消费topicA分区下的0号分区数据

package com.wunaiieq.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;//创建一个独立消费者,消费topicA主题0号分区中的数据
public class ConsumTopicPartitionConsumer {public static void main(String[] args) {//1.创建属性对象Properties prop = new Properties();//2.设置相关参数prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());prop.put(ConsumerConfig.GROUP_ID_CONFIG,"testCg2");//3.创建消费者对象KafkaConsumer<String,String> kafkaConsumer =new KafkaConsumer<String, String>(prop);//4.为消费者注册主题和分区号List<TopicPartition> topicPartitions =new ArrayList<>();topicPartitions.add(new TopicPartition("topicA",0));kafkaConsumer.assign(topicPartitions);//5.消费数据while(true){ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for(ConsumerRecord consumerRecord:consumerRecords){System.out.println(consumerRecord);}}}
}

offset

表示消费者在特定主题分区中的消费进度。
一般而言,这个offset不会主动去用,除非宕机重启等等
可以手动查看offset值和状况

修改系统配置
[root@node4 ~]# cd /opt/kafka/config/ [root@node4 config]# vim
consumer.properties exclude.internal.topics=false
查询offset
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node2:9092 --consumer.config config/consumer.properties --formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter” --from-beginning

描述和作用:

  • Offset是Kafka中标识消息在分区内位置的一个唯一标识符。每个消息都有一个对应的Offset值,用于表示消息在分区中的相对位置。Offset是从0开始递增的,每当有新的消息写入分区时,Offset就会加1。Offset是不可变的,即使消息被删除或过期,Offset也不会改变或重用。
  • 定位消息:通过指定Offset,消费者可以准确地找到分区中的某条消息,或者从某个位置开始消费消息。
  • 记录消费进度:消费者在消费完一条消息后,需要提交Offset来告诉Kafka Broker自己消费到哪里了。这样,如果消费者发生故障或重启,它可以根据保存的Offset来恢复消费状态。
  • __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic名称+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic名称+分区号就保留最新数据。

自动提交

自动提交主要是根据时间设置,每隔一段时间提交

代码示例(自动提交)

设置offset自动提交,每xxx秒提交一次(默认是5秒)

package com.wunaiieq.offset;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class ConsumerAutoOffset {public static void main(String[] args) {//1.创建属性对象Properties prop = new Properties();//2.设置属性参数prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG,"cgauto");//是否自动提交offset: true表示自动提交,false表示非自动提交prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//提交offset的时间周期1000ms,默认是5000msprop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);//3.创建消费者对象KafkaConsumer<String,String> kafkaConsumer =new KafkaConsumer<String, String>(prop);//4.设置消费主题kafkaConsumer.subscribe(Arrays.asList("topicA"));//5.消费消息while(true){//6.读取消息ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));//7.循环输出消息for(ConsumerRecord cr:consumerRecords){System.out.println(cr.value());}}}
}

手动提交

手动提交offset的方法有两种方式:

  1. commitSync同步提交:必须等待offset提交完毕,再去消费下一批数据。

  2. commitAsync异步提交:发送完提交offset请求后,就开始消费下一批数据了。
    两者的区别:

相同点是,都会将本次消费的一批数据的最高的偏移量提交;
不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

代码示例(同步)

等待offset提交完毕,再去消费下一批数据。

package com.wunaiieq.offset;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class ConsumerHandSyncCommit {public static void main(String[] args) {//1.创建属性对象Properties prop = new Properties();//2.设置相关参数prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG,"cghandSyncCommit");//设置为非自动提交prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//3.创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(prop);//4.注册消费主题consumer.subscribe(Arrays.asList("topicA"));//5.消费数据while(true){ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord record:records){System.out.println(record.value());}//6.同步提交offsetconsumer.commitSync();}}
}

代码示例(异步)

代码上的区别很小,提交方式由consumer.commitSync();改为consumer.commitAsync();

package com.wunaiieq.offset;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class ConsumerHandASyncCommit {public static void main(String[] args) {//1.创建属性对象Properties prop = new Properties();//2.设置相关参数prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG,"cghandAsyncCommit");//设置为非自动提交prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//3.创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(prop);//4.注册消费主题consumer.subscribe(Arrays.asList("topicA"));//5.消费数据while(true){ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord record:records){System.out.println(record.value());}//6.异步提交offsetconsumer.commitAsync();}}
}

其他说明

  1. 一个消费者允许消费多个主题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/502773.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【踩坑指南2.0 2025最新】Scala中如何在命令行传入参数以运行主函数

这个地方基本没有任何文档记录&#xff0c;在学习的过程中屡屡碰壁&#xff0c;因此记录一下这部分的内容&#xff0c;懒得看可以直接跳到总结看结论。 踩坑步骤 首先来看看书上让我们怎么写&#xff1a; //main.scala object Start {def main(args:Array[String]) {try {v…

数据分析思维(七):分析方法——群组分析方法

数据分析并非只是简单的数据分析工具三板斧——Excel、SQL、Python&#xff0c;更重要的是数据分析思维。没有数据分析思维和业务知识&#xff0c;就算拿到一堆数据&#xff0c;也不知道如何下手。 推荐书本《数据分析思维——分析方法和业务知识》&#xff0c;本文内容就是提取…

CSS 之 position 定位属性详解

CSS系列文章目录 CSS 之 display 布局属性详解 CSS 之 position 定位属性详解一文搞懂flex布局 【弹性盒布局】 文章目录 CSS系列文章目录一、前言二、静态定位&#xff1a;position:static&#xff1b;二、相对定位&#xff1a;position:relative三、绝对定位&#xff1a;pos…

麒麟信安云在长沙某银行的应用入选“云建设与应用领航计划(2024)”,打造湖湘金融云化升级优质范本

12月26日&#xff0c;2024云计算产业和标准应用大会在北京成功召开。大会汇集政产学研用各方专家学者&#xff0c;共同探讨云计算产业发展方向和未来机遇&#xff0c;展示云计算标准化工作重要成果。 会上&#xff0c;云建设与应用领航计划&#xff08;2024&#xff09;建云用…

微信小程序Uniapp

使用命令行创建项目&#xff08;vuets&#xff09; npx degit dcloudio/uni-preset-vue#vite-ts my-vue3-project然后用HBX打开项目 再安装依赖 npm i 再运行开发版本&#xff0c;生成dist目录 pnpm dev:mp-weixin 注意要设置APPid 再用微信小程序打开

汇编环境搭建

学习视频 将MASM所在目录 指定为C盘

计算机网络--路由表的更新

一、方法 【计算机网络习题-RIP路由表更新-哔哩哔哩】 二、举个例子 例1 例2

热备份路由HSRP及配置案例

✍作者&#xff1a;柒烨带你飞 &#x1f4aa;格言&#xff1a;生活的情况越艰难&#xff0c;我越感到自己更坚强&#xff1b;我这个人走得很慢&#xff0c;但我从不后退。 &#x1f4dc;系列专栏&#xff1a;网路安全入门系列 目录 一&#xff0c;HSRP的相关概念二&#xff0c;…

今日头条ip属地根据什么显示?不准确怎么办

在今日头条这样的社交媒体平台上&#xff0c;用户的IP属地信息对于维护网络环境的健康与秩序至关重要。然而&#xff0c;不少用户发现自己的IP属地显示与实际位置不符&#xff0c;这引发了广泛的关注和讨论。本文将深入探讨今日头条IP属地的显示依据&#xff0c;并提供解决IP属…

倍思氮化镓充电器分享:Super GaN伸缩线快充35W

快节奏的时代,在旅游、办公等场景下,一款高效、便捷的充电器可以让我们的生活更便捷、高效。今天就给大家推荐一款倍思氮化镓充电器——Super GaN伸缩线快充35W。它具备多重亮点,可以满足我们在许多场景下的充电需求,成为我们的得力助手。 倍思氮化镓Super GaN伸缩线快充35W的亮…

云架构Web端的工业MES系统设计之区分工业过程

云架构Web端的工业MES系统设计之区分工业过程 在当今数字化浪潮席卷全球的背景下,制造业作为国家经济发展的重要支柱产业,正面临着前所未有的机遇与挑战。市场需求的快速变化、客户个性化定制要求的日益提高以及全球竞争的愈发激烈,都促使制造企业必须寻求更加高效、智能的生产…

嵌入式linux中socket控制与实现

一、概述 1、首先网络,一看到这个词,我们就会想到IP地址和端口号,那IP地址和端口各有什么作用呢? (1)IP地址如身份证一样,是标识的电脑的,一台电脑只有一个IP地址。 (2)端口提供了一种访问通道,服务器一般都是通过知名端口号来识别某个服务。例如,对于每个TCP/IP实…

VScode SSH 错误:Got bad result from install script 解決

之前vscode好好的&#xff0c;某天突然连接报错如下 尝试1. 服务器没有断开,ssh可以正常连接 2. 用管理员权限运行vscode&#xff0c;无效 3. 删除服务器上的~/.vscode-server 文件夹&#xff0c;无效 试过很多后&#xff0c;原来很可能是前一天anaconda卸载导致注册表项 步…

GPT分区 使用parted标准分区划分,以及相邻分区扩容

parted 是一个功能强大的命令行工具&#xff0c;用于创建和管理磁盘分区表和分区。它支持多种分区表类型&#xff0c;如 MBR&#xff08;msdos&#xff09;、GPT&#xff08;GUID Partition Table&#xff09;等&#xff0c;并且可以处理大容量磁盘。parted 提供了一个交互式界…

关系分类(RC)模型和关系抽取(RE)模型的区别

目标不同 关系分类模型&#xff1a;对给定的实体对在给定句子中预测其关系类型。两阶段&#xff08;RC&#xff09; 关系抽取模型&#xff1a;从句子中识别出所有潜在实体对&#xff0c;并为其预测关系类型。一阶段&#xff08;NERRE&#xff09; 训练/预测阶段输入输出数据不…

VSCode编辑+GCC for ARM交叉编译工具链+CMake构建+OpenOCD调试(基于STM32的标准库/HAL库)

一、CMake安装 进入CMake官网的下载地址Get the Software&#xff0c;根据系统安装对应的Binary distributions。 或者在CMake——国内镜像获取二进制镜像安装包。 或者访问GitHub的xPack项目xPack CMake v3.28.6-1&#xff0c;下载即可。 记得添加用户/系统的环境变量&#…

【数据结构】链表(2):双向链表和双向循环链表

双向链表&#xff08;Doubly Linked List&#xff09; 定义&#xff1a; 每个节点包含三个部分&#xff1a; 数据域。前驱指针域&#xff08;指向前一个节点&#xff09;。后继指针域&#xff08;指向下一个节点&#xff09;。 支持从任意节点向前或向后遍历。 #define dat…

RK3588+麒麟国产系统+FPGA+AI在电力和轨道交通视觉与采集系统的应用

工业视觉识别系统厂家提供的功能主要包括&#xff1a; 这些厂家通过先进的视觉识别技术&#xff0c;实现图像的采集、处理与分析。系统能够自动化地完成质量检测、物料分拣、设备监控等任务&#xff0c;显著提升生产效率和产品质量。同时&#xff0c;系统具备高度的灵活性和可扩…

3 抢红包系统

我们还是按照我们分析问题的方法论开展 一 场景分析 我们分析的是集体活动的抢红包&#xff0c;比如春晚&#xff0c;大型活动红包&#xff0c;需要在网页操作的抢红包 抢红包的问题也是多个人抢资源的问题&#xff0c;可以和秒杀进行比对。但是也有很多不同的地方。 用户打…

数据库中的并发控制

并发操作带来的数据不一致性 1、并发控制:为了保证事务的隔离性和一致性&#xff0c;数据库管理系统需要对并发操作进行正确调度 并发控制的主要技术有:封锁、时间戳、乐观控制法、多版本并发控制等 并发操作带来的数据不一致性: ① 丟失修改:两个事务 T1 和 T2 读入同一数据…