Kafka的Offset(偏移量)详解

Kafka的Offset详解

  • 1、生产者Offset
  • 2、消费者Offset
    • 2.1、消费者
    • 2.2、生产者
    • 2.3、实体类对象
    • 2.4、JSON工具类
    • 2.5、项目配置文件
    • 2.6、测试类
    • 2.7、测试
    • 2.8、总结

1、生产者Offset

在这里插入图片描述
在这里插入图片描述

2、消费者Offset

在这里插入图片描述

2.1、消费者

package com.power.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {/*** topics 用于指定从哪个主题中消费消息* concurrency 用于指定有多少个消费者* @param record*/@KafkaListener(topics = {"offSetTopic"}, groupId = "offSetGroup")public void onEventA(ConsumerRecord<String, String> record) {System.out.println(Thread.currentThread().getId()+"---> 消费消息 record = " + record);}
}

2.2、生产者

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent(){for (int i = 0; i < 2; i++) {User user = User.builder().id(i).phone("1567676767"+i).birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("offSetTopic","k"+i, userJson);}}}

2.3、实体类对象

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

2.4、JSON工具类

package com.power.util;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;public class JSONUtils {private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();public static String toJSON(Object object){try {return OBJECTMAPPER.writeValueAsString(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public static <T> T toBean(String json,Class<T> clazz){try {return OBJECTMAPPER.readValue(json,clazz);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}
}

2.5、项目配置文件

spring:application:#应用名称name: spring-boot-06-kafka-offset#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置消费者的反序列化consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2.6、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot07KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendInterceptor(){eventProducer.sendEvent();}}

2.7、测试

  • 先启动生产者,会发送两条消息到kafka服务器

  • 再启动消费者监听,此时我们发现,启动后的消费者并不会监听到生产者已发送的两条消息

  • 在kafka安装目录的bin文件夹下执行命令:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group offSetGroup --describe
  • 根据命令结果:查看kafka消费者的偏移量offset,我们发现当前消费者偏移量CURRENT-OFFSET值为2 ,当前日志记录的生产者消息偏移量LOG-END-OFFSET值为2,消费者偏移量和日志记录的生产者消息偏移量差值LAG值为0 ,所以消费者查询不到生产者发送的消息。

在这里插入图片描述

  • 关闭消费者,再次使用生产者发送消息,再次执行命令查看消费者偏移量

在这里插入图片描述

  • 此时我们发现消费者偏移量为4,日志记录的偏移量为6,两者差值为2,此时启动消费者,读取到了差值为2的数据

2.8、总结

在这里插入图片描述

  • 消费者从什么地方开始消费,就看消费者的offset是多少,消费者启动后他的offset是多少。
  • 消费者offset是多少,可以通过命令查看
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group offSetGroup --describe

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/411790.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nexus 清理 docker 镜像

下载配置 nexus-cli 看网上文档都用如下地址&#xff0c;但现在已经不能下载&#xff1a; wget https://s3.eu-west-2.amazonaws.com/nexus-cli/1.0.0-beta/linux/nexus-cli chmod x nexus-cli 在 github 上下载&#xff1a; wget https://github.com/heyonggs/nexus-cli/r…

跟李沐学AI:转置卷积

定义 卷积不会增大输入的高宽&#xff0c;通常卷积层后高宽不变或减半。转置卷积则可以用来增大输入的宽高。 转置卷积是一种卷积&#xff0c;它将输入和核进行了重新排列&#xff0c;通常用作上采用。 如果卷积将输入从变为&#xff0c;同样超参数的情况下&#xff0c;转置…

Java中三大容器类(List、Set、Map)详解

三大容器介绍 名称结构特点常见实现类List&#xff08;列表&#xff09;由有序的元素序列组成&#xff0c;可以包含重复元素可以通过索引访问元素&#xff0c;插入的顺序与遍历顺序一致ArrayList、LinkedList、VectorMap&#xff08;映射&#xff09;由键值对(Key-Value)组成的…

SpringBoot项目中mybatis执行sql很慢的排查改造过程(Interceptor插件、fetchSize、隐式转换等)

刚入职公司&#xff0c;就发现公司项目跑sql特别慢&#xff0c;差不多一万条数据插入到数据库要5秒以上&#xff08;没有听错&#xff0c;就是这个速度&#xff09;&#xff0c;查询修改删除也是特别慢。直到22年年底实在是受不了了&#xff0c;我就去排查了一下。 用的是Oracl…

OpenCV小练习:身份证号码识别

目标&#xff1a;针对一张身份证照片&#xff0c;把身份证号码识别出来&#xff08;转成数字或字符串&#xff09;。 实现思路&#xff1a;需要将目标拆分成两个子任务&#xff1a;(1) 把身份证号码区域从整张图片中检测/裁剪出来&#xff1b;(2) 将图片中的数字转化成文字。第…

Ubuntu 22.04上稳定安装与配置搜狗输入法详细教程

摘要&#xff1a;本教程详细介绍了如何在Ubuntu 22.04上安装和配置搜狗输入法&#xff0c;每个步骤详细配图。由于在Ubuntu 24.04上存在兼容性问题&#xff0c;建议用户继续使用稳定的22.04版本。教程涵盖了从更新系统源、安装fcitx输入法框架&#xff0c;到下载和配置搜狗输入…

存储实验:基于华为存储实现存储双活(HyperMetro特性)

目录 什么是存储双活仲裁机制 实验需求实验拓扑实验环境实验步骤1. 双活存储存储初始化&#xff08;OceanStor v3 模拟器&#xff09;1.1开机&#xff0c;设置密码1.2登录DM&#xff0c;修改设备名、系统时间和导入License1.3 设置接口IP 2. 仲裁服务器配置&#xff08;Centos7…

全局点云配准的新思考:没有良好初值时如何配准?

更多优质内容&#xff0c;请关注公众号&#xff1a;智驾机器人技术前线 1.论文信息 论文标题&#xff1a;BiEquiFormer: Bi-Equivariant Representations for Global Point Cloud Registration 作者&#xff1a;Stefanos Pertigkiozoglou*, Evangelos Chatzipantazis∗ and K…

【循环顺序队的实现】

1.队列的逻辑结构 与 抽象数据类型定义 先进先出的线性表 在顺序队列中&#xff0c;我们使用头指针front指向队首元素&#xff1b;用尾指针rear指向队尾元素的下一个位置&#xff08;当然这里的指针是用下标模拟出来的&#xff09; 同时顺序队列中的元素当然是用数组来存储的 …

解决STM32使用J-Link可以擦除和读取但是无法烧录问题

现象 使用J-Link烧录模组固件&#xff0c;出现可以读取和擦除&#xff0c;但是无法烧录问题&#xff0c;提示错误如下&#xff1a; ERROR: Programming failed address 0x08000080 (program error)End of flash programmingERROR: Program failed 读出来的时候这个地址数据…

Linux 软件包管理器yum 自动化构建工具-make/makefile

Linux 工具 linux 软件包管理器 yum 把一些常用的软件提前编译好&#xff0c;做成软件包放在一个服务器上&#xff0c;通过包管理器可以很方便的获取到在这个编译好的软件包。直接进行安装。 软件包和软件包管理器就相当于 App 和应用商店这样的关系。 Linux 安装软件 源代码…

poe供电原理以及应用

1,根据IEEE802.3af标准,一个完整的PoE系统包括供电端设备PSE和受电端设备PD两部分; 供电设备PSE是整个系统的电源提供者,为PD设备提供直流电源,其可分为M

如何理解进程

一、进程的概念 进程&#xff1a;顾名思义&#xff0c;就是一个完整执行程序的过程。没错&#xff0c;就是这么简单&#xff0c;但是在程序执行的过程之中&#xff0c;系统会为这个执行的程序分配内存资源&#xff0c;这些过程也包含在进程当中。 进程是动态的&#xff0c;是程…

【网络编程通关之路】 Tcp 基础回显服务器(Java实现)及保姆式知识原理详解 ! ! !

本篇会加入个人的所谓鱼式疯言 ❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言 而是理解过并总结出来通俗易懂的大白话, 小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的. &#x1f92d;&#x1f92d;&#x1f92d;可能说的不是那么严谨.但小编初心是能让更多人…

Linux下IO多路复用—select,poll,epoll

一.概述 1.IO多路复用介绍 IO多路复用是一种操作系统的技术&#xff0c;用于在单个线程或进程中管理多个输入输出操作。它的主要目的是通过将多个IO操作合并到一个系统调用中来提高系统的性能和资源利用率&#xff0c;避免了传统的多线程或多进程模型中因为阻塞IO而导致的资源…

在Linux下搭建go环境

下载go go官网&#xff1a;All releases - The Go Programming Language 我们可以吧压缩包下载到Windows上再传到Linux上&#xff0c;也可以直接web下载&#xff1a; wget https://golang.google.cn/dl/go1.23.0.linux-amd64.tar.gz 解压 使用命令解压&#xff1a; tar -x…

解决有向图中节点出度和入度计算问题

解决有向图中节点出度和入度计算问题 引言邻接链表表示法邻接链表的数据结构创建图添加边计算节点的出度伪代码C代码计算节点的入度伪代码C代码时间复杂度示例结论引言 在图论中,有向图是一种重要的数据结构,用于表示元素之间的方向性关系。有向图中的节点(顶点)通过边连接…

VBA之正则表达式(47)-- 快速将公式转换为静态值计算

实例需求&#xff1a;工作表I列包含多种计算公式&#xff0c;为了便于演示&#xff0c;将I列公式显示在J列单元格中&#xff0c;现在需要将公式的单元格引用转换为静态值&#xff0c;如K列所示。 示例代码如下。 Sub RegExpDemoReplace()Dim Res()Dim objRegEx As ObjectDim o…

[解决]Invalid configuration `aarch64-openwrt-linux‘: machine `aarch64-openwrt

背景 交叉编译libev-4.19 问题 checking host system type… Invalid configuration aarch64-openwrt-linux: machine aarch64-openwrt’ not recognized 解决 打开config.sub&#xff0c;在244行后添加"| aarch64-openwrt \ "

Git学习(001 git介绍以及安装)

尚硅谷2024最新Git企业实战教程&#xff0c;全方位学习git与gitlab 总时长 5:42:00 共40P 此文章包含第1p-第p4的内容 文章目录 介绍Git介绍GitLab介绍 概述Git安装版本控制工具介绍 介绍 Git介绍 GitLab介绍 相当于中央仓库 概述 Git安装 进入官网(下载当前版本 2.43.0) …