13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

目录

  • kafka 消费者API用法
    • 消费者API
    • 使用消费者API消费消息
    • 消费者消费消息的代码演示
      • 1、官方API示例
      • 2、创建消费者类
      • 3、演示消费结果
        • 1、演示消费者属于同一个消费者组
        • 2、演示消费者不属于同一个消费者组
        • 3、停止线程不适用
        • 4、一些参数解释
    • 代码
      • 生产者:MessageProducer
      • 消费者 Consumer01
      • 消费者 Consumer02
      • pom.xml

kafka 消费者API用法

消费者API

消费者API的核心类是 KafkaConsumer,它提供了如下常用方法:

- subscribe(Collection<String> topics):订阅主题。- subscribe(Pattern pattern):订阅符合给定正则表达式的所有主题。- subscription():返回该消费者所订阅的主题集合。- unsubscribe():取消订阅。- close():关闭消费者。- poll(Duration timeout):拉取消息。- assign(Collection<TopicPartition> partitions):手动为该消费者分配分区。- assignment():返回分配该消费者的分区集合。- commitAsync():异步提交offset。- commitSync():同步提交offset。提示:如果开启了自动提交offset,则无需调用上面commitAsync()或commitSync()方法进行手动提交;自动提交offset比较方便,但手动提交offset则更精确,消费者程序可以等到消息真正被处理后再手动提交offset。——该选项有点类似于JMS、RabbitMQ的消息消费者的,消息确认机制。- enforceRebalance():强制执行重平衡。

下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。

- seek(TopicPartition partition, long offset):跳到指定的offset处,即下一条消息从offset处开始拉取。- seekToBeginning(Collection<TopicPartition> partitions):跳到指定分区的开始处。- seekToEnd(Collection<TopicPartition> partitions):跳到指定分区的结尾处。- position(TopicPartition partition):返回指定分区当前的offset。


使用消费者API消费消息

根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步:

1、创建KafkaConsumer对象,创建该对象时要传入Properties对象,用于对该消费者进行配置。

2、调用KafkaConsumer对象的poll()方法拉取消息,该方法返回ConsumerRecords。

3、对ConsumerRecords执行迭代,即可获取到抓取的每条消息。

4、程序结束时,取消订阅,关闭KafkaConsumer。



消费者消费消息的代码演示

1、官方API示例

KafkaConsumer

在这里插入图片描述

2、创建消费者类

在上一篇的生产者项目中,再写2个消费者来消费消息

Kafka 生产者API 用法

如图,创建2个消费者类,这个是消费者01,消费者02和01都是一模一样的。

在这里插入图片描述

在这里插入图片描述

注意: 消费者02多加了这 个reset 配置,就是消费者02在启动监听的时候,只会读取最新的消息,并不会读取之前的存量消息(就是消费者02还没启动起来之前,生产者发送的消息)

而消费者01没有这个设置,就是当消费者01启动后,只要该主题有消息还没有被消费,这个消费者01就会去读取并消费所有消息。
在这里插入图片描述



3、演示消费结果

在这里插入图片描述



1、演示消费者属于同一个消费者组

如上图,可以看出,两个消费者属于同一个消费者组 ConsumerGroupTest_01 ,所以两个消费者消费到的消息是不重复的。因为每个消费者消费的分区都是不同的。

演示前预期结果:因为两个消费者属于同一个消费者组,所以每个消费者消费的分区都是不同的,也就是不会重复消费消息

在这里插入图片描述

演示结果:

演示步骤:启动两个消费者实例,然后启动生产者,往test2主题中发送20条消息,10条消息带key,10条消息不带key,大概率这各10条的消息就会被分配在不同的2个分区中。
根据kafka默认的分区消费规则,应该是一个消费者消费一个分区的消息

生产者发送消息:
生产者代码在这篇:
Kafka 生产者API 用法

在这里插入图片描述

在这里插入图片描述

消费者消费:

如图:消费者01 获取到了带key的消息并消费,消费者02 获取到了不带key的消息并消费,这里的消费消息先弄成打印就可以了。

类似集群模式,就是消息只能被一个消费者消费

在这里插入图片描述



2、演示消费者不属于同一个消费者组

因为两个消费者不属于同一个消费者组,所以两个消费者都能消费到test2主题下的所有分区的消息。

演示步骤:其他代码没变,只是修改了他们所属的消费者组
在这里插入图片描述

在这里插入图片描述

演示结果如图:两个消费者不属于同一个消费者组,每个消费者都能消费到所有消息,

类似于广播模式、或者是发布/订阅模式,
发布/订阅模型可以让一条消息能被多个消费者消费

在这里插入图片描述



3、停止线程不适用

这个停止消费者的线程好像没有用,如图,我生产者再发送消息后,这个消费者还是能消费到消息,并没有想象中的被停止。
现阶段要关闭消费者的话,直接关闭项目就可以了

在这里插入图片描述



4、一些参数解释

在这里插入图片描述

auto.offset.reset

设置从哪里读取消息

在这里插入图片描述



代码

生产者:MessageProducer

package cn.ljh;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** Properties: Kafka 设计了 Properties 来封装所有的配置属性* <p>* KafkaProducer:用来创建消息生产者,是 生产者API 的核心类,* 它提供了一个 send()方法 来发送消息,该方法需要传入一个 ProducerRecord<K,V>对象* <p>* ProducerRecord:代表了一条消息,Kafka 的消息是包含了key、value、timestamp*/
public class MessageProducer
{//主题常量public static final String TEST_TOPIC = "test2";public static void main(String[] args){//Properties 中所设置的key,有效的value,可以通过Kafka官方文档来查询生产者API支持哪些配置属性Properties props = new Properties();//指定连接Kafka的地址,多个地址之间用逗号隔开props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");//指定Kafka的消息确认机制//0:不等待消息确认;1:只等待领导者分区的消息写入之后确认;all:等待所有分区的消息都写入之后才确认props.put("acks", "all");//指定消息发送失败的重试多少次props.put("retries", 0);//控制生产者在发送消息之前等待的时间//props.put("linger.ms", 3);//设置序列化器props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//1、创建 KafkaProducer 时,需要传入 Properties 对象来配置消息生产者Producer<String, String> producer = new KafkaProducer<>(props);//2、发送消息for (int i = 0; i < 20; i++){var msg = "这是第【 " + (i + 1) + " 】条消息!";if (i < 10){//发送带 key 的消息producer.send(new ProducerRecord<String, String>(TEST_TOPIC, "ljh", msg));} else{//发送不带 key 的消息producer.send(new ProducerRecord<String, String>(TEST_TOPIC, msg));}}System.out.println("消息发送成功!");//3、关闭资源producer.close();}
}

消费者 Consumer01

package cn.ljh;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;public class Consumer01
{//组id:设置这个消费者实例是属于 ConsumerGroupTest_01 这个消费者组的public static final String GROUP_ID = "ConsumerGroupTest_01";//1、创建 KafkaConsumer 消费者对象 ,把这个消费者定义成成员变量public static KafkaConsumer<String, String> consumer = null;public static void main(String[] args){//Properties 中所设置的key,有效的value,可以通过Kafka官方文档来查询生产者API支持哪些配置属性Properties props = new Properties();//指定连接Kafka的地址,多个地址之间用逗号隔开props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");//设置这个消费者实例属于哪个消费者组props.setProperty("group.id", GROUP_ID);//自动提交offset,就是类似之前的自动消息确认props.setProperty("enable.auto.commit", "true");//多个消息之间,自动提交消息的时间间隔props.setProperty("auto.commit.interval.ms", "1000");//设置session的超时时长,默认是10秒,这里设置15秒props.setProperty("session.timeout.ms", "15000");//设置每次都从最新的消息开始读取props.setProperty("auto.offset.reset","latest");//设置序列化器props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//1、创建 KafkaConsumer 消费者对象consumer = new KafkaConsumer<>(props);//2、订阅主题,订阅kafka集群中的test2主题consumer.subscribe(Arrays.asList(MessageProducer.TEST_TOPIC));//因为获取消息的循环是一个死循环,没法退出,所以我在这里再加一个线程来关闭这个消费者//启动一个线程来关闭这个 KafkaConsumernew Thread(() ->{//创建一个Scanner 类来读取控制台数据Scanner sc = new Scanner(System.in);//如果有下一行,就读取下一行while (sc.hasNextLine()){//获取控制台下一行的内容var str = sc.nextLine();//就是这个线程一直监听控制台,如果我们在控制台输出” :exit “,则关闭这个这个 KafkaConsumerif (str.equals(":exit")){//取消订阅consumer.unsubscribe();//关闭消费者对象consumer.close();}}}).start();//这是一个死循环,一直在获取主题中的消息while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records)System.out.printf("收到消息: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}
}

消费者 Consumer02

package cn.ljh;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;public class Consumer02
{//组id:设置这个消费者实例是属于 ConsumerGroupTest_02 这个消费者组的public static final String GROUP_ID = "ConsumerGroupTest_02";//1、创建 KafkaConsumer 消费者对象 ,把这个消费者定义成成员变量public static KafkaConsumer<String, String> consumer = null;public static void main(String[] args){//Properties 中所设置的key,有效的value,可以通过Kafka官方文档来查询生产者API支持哪些配置属性Properties props = new Properties();//指定连接Kafka的地址,多个地址之间用逗号隔开props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");//设置这个消费者实例属于哪个消费者组props.setProperty("group.id", GROUP_ID);//自动提交offset,就是类似之前的自动消息确认props.setProperty("enable.auto.commit", "true");//多个消息之间,自动提交消息的时间间隔props.setProperty("auto.commit.interval.ms", "1000");//设置session的超时时长,默认是10秒,这里设置15秒props.setProperty("session.timeout.ms", "15000");//设置每次都从最新的消息开始读取props.setProperty("auto.offset.reset","latest");//设置序列化器props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//1、创建 KafkaConsumer 消费者对象consumer = new KafkaConsumer<>(props);//2、订阅主题,订阅kafka集群中的test2主题consumer.subscribe(Arrays.asList(MessageProducer.TEST_TOPIC));//因为获取消息的循环是一个死循环,没法退出,所以我在这里再加一个线程来关闭这个消费者//启动一个线程来关闭这个 KafkaConsumernew Thread(() ->{//创建一个Scanner 类来读取控制台数据Scanner sc = new Scanner(System.in);//如果有下一行,就读取下一行while (sc.hasNextLine()){//获取控制台下一行的内容var str = sc.nextLine();//就是这个线程一直监听控制台,如果我们在控制台输出” :exit “,则关闭这个这个 KafkaConsumerif (str.equals(":exit")){//取消订阅consumer.unsubscribe();//关闭消费者对象consumer.close();}}}).start();//这是一个死循环,一直在获取主题中的消息while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records)System.out.printf("收到消息: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());}}
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>kafkaproducertest</artifactId><version>1.0.0</version><!-- 项目名,和 artifactId 保持一致 --><name>kafkaproducertest</name><properties><!-- 在这里指定编译器的版本 --><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><java.version>11</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- 导入 Kafka 客户端APIJAR--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version></dependency></dependencies></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/246023.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flutter设置windows是否显示标题栏和状态栏和全屏显示

想要让桌面软件实现全屏和不显示状态栏或者自定义状态栏&#xff0c;就可以使用window_manager这个依赖库&#xff0c;使用起来还是非常方便的&#xff0c;可以自定义显示窗口大小和位置&#xff0c;还有设置标题栏是否展示等内容&#xff0c;也可以设置可拖动区域。官方仓库地…

万界星空科技可视化数据大屏的作用

随着科技的不断发展和进步&#xff0c;当前各种数据化的设备也是如同雨后春笋般冒了出来&#xff0c;并且其可以说是给我们带来了极大的便利的。在这其中&#xff0c;数据大屏就是非常具有代表性的一个例子。 数据大屏的主要作用包括&#xff1a; 数据分析&#xff1a;数据大屏…

【mongoDB】文档 CRUD

目录 1.插入文档 批量插入&#xff1a; 2.查询文档 3.更新文档 4.删除文档 deleteOne() deleteMany() findOneAndDelete() 1.插入文档 可以使用 insert () 方法或者 save() 方法向集合中插入文档 语法如下&#xff1a; db.collection_name.insert(document) collectio…

软考培训机构哪家比较好?各软考培训机构排名如何?

先放上机构测评图 一、机构情况 &#xff08;1&#xff09;主营业务 大多数软考培训机构主要致力于IT培训或者软件行业。这些机构的课程更加专业&#xff0c;因为他们起源于该行业。我相信报考软考的同学大部分也是从事这个行业的。个人认为选择这类机构进行培训会有更多好处…

[C#]winform部署yolov5实例分割模型onnx

【官方框架地址】 https://github.com/ultralytics/yolov5 【算法介绍】 YOLOv5实例分割是目标检测算法的一个变种&#xff0c;主要用于识别和分割图像中的多个物体。它是在YOLOv5的基础上&#xff0c;通过添加一个实例分割模块来实现的。 在实例分割中&#xff0c;算法不仅…

C++输入输出流

输入/输出流类&#xff1a;iostream---------i input&#xff08;输入&#xff09; o output&#xff08;输出&#xff09; stream&#xff1a;流 iostream&#xff1a; istream类&#xff1a;输入流类-------------cin&#xff1a;输入流类的对象 ostre…

Cesium加载地图-高德影像

废话不多说&#xff0c;直接上代码 整体代码 <template><div id"cesiumContainer" style"height: 100vh;"></div><div id"toolbar" style"position: fixed;top:20px;left:220px;"><el-breadcrumb><…

爬虫(一)

1. HTTP协议与WEB开发 1. 什么是请求头请求体&#xff0c;响应头响应体 2. URL地址包括什么 3. get请求和post请求到底是什么 4. Content-Type是什么1.1 简介 HTTP协议是Hyper Text Transfer Protocol&#xff08;超文本传输协议&#xff09;的缩写,是用于万维网&#xff08;…

【Python】02快速上手爬虫案例二:搞定验证码

文章目录 前言1、不要相信什么验证码的库2、以古诗文网为例&#xff0c;获取验证码1&#xff09;code_result.py2&#xff09;gsw.py 前言 提示&#xff1a;以古诗文网为例&#xff0c;获取验证码&#xff1a; 登录&#xff1a;https://so.gushiwen.cn/user/login.aspx 1、不…

牛刀小试 - C++ 推箱子小游戏

参考文档 C笔记&#xff1a;推箱子小游戏 copy函数 memcpy()函数用法&#xff08;可复制数组&#xff09; 使用memcpy踩出来的坑&#xff0c;值得注意 完整代码 /********************************************************************* 程序名:推箱子小游戏 说明&#x…

无状态应用管理Deployment

无状态应用管理Deployment 1、Deployment介绍 Deployment一般用于部署公司的无状态服务。 格式&#xff1a; apiVersion: apps/v1 kind: Deployment metadata: name: nginx-deployment labels: app: nginx spec: replicas: 3 selector: matchLabels: app: nginx …

2024亚马逊开店教程:开店准备与注册流程指南

随着新一年的到来&#xff0c;亚马逊开启了新一轮的卖家入驻&#xff0c;并且针对新卖家优化了入驻流程&#xff0c;下面为大家简单整理一下最新亚马逊入驻教程&#xff0c;有想要入驻开店的小伙伴速速看过来&#xff01; 一、开店前准备 1、账号环境准备 为了防止账号由于网…

k8s实例

k8s实例举例 &#xff08;1&#xff09;Kubernetes 区域可采用 Kubeadm 方式进行安装。 &#xff08;2&#xff09;要求在 Kubernetes 环境中&#xff0c;通过yaml文件的方式&#xff0c;创建2个Nginx Pod分别放置在两个不同的节点上&#xff0c;Pod使用动态PV类型的存储卷挂载…

Python 数据分析实战——为什么销售额减少?酒卷隆治_案例1

# 为什么黑猫游戏的销售额会减少&#xff1f; # 数据集 DAU : 每天至少来访问一次的用户数据 数据内容 数据类型 字段名 访问时间 string&#xff08;字符串&#xff09; log_data 应用名称 string&#xff08;字符串&#xff09; app_name 用户 ID int&#xff08;数值&…

使用代码取大量2*2像素图片各通道均值,存于Excel文件中。

任务是取下图RGB各个通道的均值及标签&#xff08;R, G&#xff0c;B&#xff0c;Label&#xff09;,其中标签由图片存放的文件夹标识。由于2*2像素图片较多&#xff0c;所以将结果放置于Excel表格中&#xff0c;之后使用SVM对他们进行分类。 from PIL import Image import os …

C++(搜索二叉树)

目录 前言&#xff1a; 1.二叉搜索树 1.1二叉搜索树的定义 1.2二叉搜索树的特点 2.二叉搜索树的实现 2.1框架 2.2查找 2.3插入 2.4删除 1.右子树为空 2.左子树为空 3.左右都不为空 3.递归版本 3.1前序遍历 3.2中序遍历 3.3后续遍历 3.4查找&#xff08;递…

智能分析网关V4智慧机房:视频AI智能安全监管方案

一、背景分析 随着互联网的迅猛发展&#xff0c;机房及其配套设施的数量持续攀升&#xff0c;它们的运行状况对于企业运营效率和服务质量的影响日益显著。作为企业信息化的基石&#xff0c;机房的安全监测与管理的重要性不容忽视。它不仅关乎企业的稳定运营&#xff0c;同时也直…

Linux编译器-gcc/g++

文章目录 前言预处理头文件展开条件编译 编译汇编链接 函数库静态库动态库 gcc选项 前言 gcc/g是Linux中的编译器&#xff0c;vim是Linux中的编辑器。要想将代码运行起来还需要编译才可实现。 本篇文章&#xff0c;主要通过预处理、编译、汇编、链接来介绍gcc/g。 预处理 预…

8.6 代理设计模式

文章目录 一、代理模式&#xff08;Proxy Pattern&#xff09;概述二、代理模式和观察者设计模式三、模式结构四、协作角色五、实现策略六、相关模式七、示例八、应用 一、代理模式&#xff08;Proxy Pattern&#xff09;概述 代理模式是一种设计模式&#xff0c;它通过引入一个…

【Unity学习笔记】第十一 · 动画基础(Animation、状态机、root motion、bake into pose、blendTree、大量案例)

转载引用请注明出处&#xff1a;&#x1f517;https://blog.csdn.net/weixin_44013533/article/details/132081959 作者&#xff1a;CSDN|Ringleader| 如果本文对你有帮助&#xff0c;不妨点赞收藏关注一下&#xff0c;你的鼓励是我前进最大的动力&#xff01;ヾ(≧▽≦*)o 主…