SpingBoot集成kafka发送读取消息

SpingBoot集成kafka开发

    • kafka的几个常见概念
  • 1、springboot和kafka对应版本(重要)
  • 2、创建springboot项目,引入kafka依赖
    • 2.1、生产者EventProducer
    • 2.2、消费者EventConsumer
    • 2.3、启动生产者的方法SpringBoot01KafkaBaseApplication
    • 2.4、application.yml
    • 2.5、pom.xml
    • 2.6、启动springboot项目的启动类(Application)报错
  • 3、springboot集成kafka读取最早的消息
    • 3.1、如何设置消费者auto-offset-reset: earliest
    • 3.2、设置消费者auto-offset-reset: earliest后存在的问题
      • 3.2.1、修改消费组ID
      • 3.2.2、手动重置偏移量
          • 3.2.2.1、手动将偏移量设置为最早
          • 3.2.2.2、手动将偏移量设置为最新

kafka的几个常见概念

在这里插入图片描述

1、springboot和kafka对应版本(重要)

https://spring.io/projects/spring-kafka

在这里插入图片描述

2、创建springboot项目,引入kafka依赖

在这里插入图片描述

2.1、生产者EventProducer

package com.power.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent(){kafkaTemplate.send("hello-topic","hello kafka");}
}

2.2、消费者EventConsumer

package com.power.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"hello-topic"},groupId="hello-group")public void onEvent(String event){System.out.println("读取到的事件:"+event);}
}

2.3、启动生产者的方法SpringBoot01KafkaBaseApplication

执行一次该方法,会调用一次生产者发送一次消息。
即每执行一次,会调用EventProducer类下的sendEvent方法一次。

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid test01(){eventProducer.sendEvent();}
}

2.4、application.yml

spring:application:#应用名称name: spring-boot-01-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的服务器ip>:9092#配置生产者(有24个配置)#producer:#配置消费者(有24个配置)#consumer:

启动服务后发现报错:
在这里插入图片描述

2.5、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.2</version><relativePath /></parent><groupId>org.powernode</groupId><artifactId>spring-boot-01-kafka-base</artifactId><version>0.0.1-SNAPSHOT</version><name>kafkaSpringBootProject</name><description>kafka project for Spring Boot</description><properties><java.version>8</java.version></properties><repositories><repository><id>central</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><layout>default</layout><!-- 是否开启发布版构件下载 --><releases><enabled>true</enabled></releases><!-- 是否开启快照版构件下载 --><snapshots><enabled>false</enabled></snapshots></repository></repositories><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><version>2.8.0</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

2.6、启动springboot项目的启动类(Application)报错

项目启动类

package com.power;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);System.out.println("启动成功--------------------------");}
}

修改server.properties配置文件:
在这里插入图片描述修改前:

在这里插入图片描述
修改后:
在这里插入图片描述

3、springboot集成kafka读取最早的消息

已经被消费者读取/消费的消息,无法被新启动的消费组消息的,那么新启动的消费组该如何读取最早的消息呢,可以通过设置消费者auto-offset-reset: earliest去实现。
在这里插入图片描述

3.1、如何设置消费者auto-offset-reset: earliest

在这里插入图片描述

1、修改application.yml
在这里插入图片描述

3.2、设置消费者auto-offset-reset: earliest后存在的问题

在这里插入图片描述

3.2.1、修改消费组ID

原消费组ID
在这里插入图片描述
修改后的消费组ID
在这里插入图片描述4、新的消费组ID成功读取到之前的消息
在这里插入图片描述

3.2.2、手动重置偏移量

3.2.2.1、手动将偏移量设置为最早
#示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-earliest --execute

来到kafka安装目录下:

在这里插入图片描述执行如下命令:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-earliest --execute

执行后报错

在这里插入图片描述
需要先停掉服务,在去手动重置偏移量,此时重置偏移量成功,偏移量为0

3.2.2.2、手动将偏移量设置为最新
#示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-latest --execute
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-latest --execute

设置成功,此时偏移量已为最新:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/405833.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

map与set容器初识:初步运用map与set

前言&#xff1a; 本文主要讲解的时对于map与set容器的初步使用&#xff0c;希望大家对map与set容器不熟悉的看了之后可以快速运用set与map到日常中来。&#xff08;本文适合对vector等基础容器有一定基础的同学&#xff09; 一、set与map容器常见接口 迭代器接口与以往的所…

12 程序控制语句:循环控制(while、do-while、for、多重嵌套循环、死循环)

目录 1 while 循环 1.1 基本语法 1.2 流程图 1.3 计数循环 1.3.1 实现原则 1.3.2 案例&#xff1a;循环输出语句 1.3.3 案例&#xff1a;循环输出数字 7~15 1.3.4 案例&#xff1a;倒序输出数字 56 ~ 43 1.3.5 案例&#xff1a;输出 10&#xff08;包括 10&…

使用 Go 语言将 Base64 编码转换为 PDF 文件

使用Go语言将PDF文件转换为Base64编码-CSDN博客文章浏览阅读104次&#xff0c;点赞2次&#xff0c;收藏5次。本文介绍了如何使用 Go 语言将 PDF 文件转换为 Base64 编码&#xff0c;并保存到文件中。https://blog.csdn.net/qq_45519030/article/details/141224319 在现代编程中…

【WebSocket】websocket学习【二】

1.需求&#xff1a;通过websocket实现在线聊天室 2.流程分析 3.消息格式 客户端 --> 服务端 {"toName":"张三","message":"你好"}服务端 --> 客户端 系统消息格式&#xff1a;{"system":true,"fromName"…

SQL注入(head、报错、盲注)

目录 【学习目标、重难点知识】 【学习目标】 【重难点知识】 1. 报错注入 1.1 那么什么是报错注入呢&#xff1f; 1.2 报错注入原理 extractvalue函数 updatexml函数 1.3 靶场解析 靶场练习 2. HEAD注入 2.1 相关全局变量 2.2 靶场解析 burp暴力破解 靶场练习 3…

Spring核心思想讲解之控制反转(IOC)

控制反转概述 控制反转实现方式 XML方式 方式一 方式二 方式三 注解方式 第一步 第二步 第三步 依赖注入&#xff08;DI&#xff09;实现方式 XML方式 手动注入 set注入 构造器注入 自动注入 set注入 构造方法注入 注解方式 方式一&#xff1a; 方式二&…

Transformer模型中的Position Embedding实现

引言 在自然语言处理&#xff08;NLP&#xff09;中&#xff0c;Transformer模型自2017年提出以来&#xff0c;已成为许多任务的基础架构&#xff0c;包括机器翻译、文本摘要和问答系统等。Transformer模型的核心之一是其处理序列数据的能力&#xff0c;而Position Embedding在…

python之matplotlib (1 介绍及基本用法)

介绍 matplotlib是Python中的一个绘图库&#xff0c;它提供了一个类似于 MATLAB 的绘图系统。使用matplotlib你可以生成图表、直方图、功率谱、条形图、错误图、散点图等。matplotlib广泛用于数据可视化领域&#xff0c;是 Python 中最著名的绘图库之一。 同样matplotlib的安…

Java数组怎么转List,Stream的基本方法使用教程

Stream流 Java 的 Stream 流操作是一种简洁而强大的处理集合数据的方式,允许对数据进行高效的操作,如过滤、映射、排序和聚合。Stream API 于 Java 8 引入,极大地简化了对集合(如 List、Set)等数据的处理。 一、创建 Stream 从集合创建: List<String> list = Ar…

NGINX 之 location 匹配优先级

章节 1 NGINX 的源码安装 2 NGINX 核心配置详解 3 NGINX 之 location 匹配优先级 4 NGINX 基础参数与功能 目录 1 location 基础语法 1.1 location 语法说明表 1.2 URI部分简单介绍 2 location 匹配优先级 2.1 URI匹配的规则与顺序 2.2 精确匹配(location /1.txt) 2.3 区…

Python个人收入影响因素模型构建:回归、决策树、梯度提升、岭回归

全文链接&#xff1a;https://tecdat.cn/?p37423 原文出处&#xff1a;拓端数据部落公众号 “你的命运早在出生那一刻起便被决定了。”这样无力的话语&#xff0c;无数次在年轻人的脑海中回响&#xff0c;尤其是在那些因地域差异而面临教育资源匮乏的年轻人中更为普遍。在中国…

企业级WEB应用服务器——TOMCAT

一、WEB技术 1.1、HTTP协议和B/S 结构 最早出现了CGI&#xff08;Common Gateway Interface&#xff09;通用网关接口&#xff0c;通过浏览器中输入URL直接映射到一个 服务器端的脚本程序执行&#xff0c;这个脚本可以查询数据库并返回结果给浏览器端。这种将用户请求使用程…

AWS不同类型的EC2实例分别适合哪些场景?

Amazon Web Services&#xff08;AWS&#xff09;的弹性计算云&#xff08;EC2&#xff09;提供了多种实例类型&#xff0c;以满足不同的应用需求和工作负载。了解不同类型的 EC2 实例及其适用场景&#xff0c;可以帮助用户更好地优化性能和控制成本。九河云和大家一起了解一下…

安恒信息总裁宋端智,辞职了!活捉一枚新鲜出炉的餐饮人!

吉祥知识星球http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247485367&idx1&sn837891059c360ad60db7e9ac980a3321&chksmc0e47eebf793f7fdb8fcd7eed8ce29160cf79ba303b59858ba3a6660c6dac536774afb2a6330#rd 《网安面试指南》http://mp.weixin.qq.com/s?…

I/O模型

文章目录 I/O模型相关概念网络I/O模型阻塞型I/O模型非阻塞型I/O模型多路复用I/O型信号驱动式I/O型异步I/O模型 apache和nginx的区别&#xff0c;什么时候选择apache&#xff0c;什么时候选择nginx 文章相关连接如下&#xff1a; 如果想更多了解nginx&#xff0c;请点击&#x…

为什么要使用TikTok云手机

随着TikTok平台的日益繁荣&#xff0c;TikTok云手机作为一种新兴的运营工具&#xff0c;正以其独特的云端技术和用户体验&#xff0c;赢得广大用户的青睐。相较于传统手机&#xff0c;TikTok云手机通过云端技术为用户带来了一系列新的优势&#xff0c;让TikTok运营变得更加灵活…

涂料耐久性氙灯老化试验箱

涂料氙灯老化试验箱是现代检测手段中常用的一种设备&#xff0c;它能够模拟自然光照、光照老化等环境条件&#xff0c;对涂料、染料、塑料、橡胶、纺织品、涂层等材料进行老化试验&#xff0c;以评估其耐久性和使用寿命。本文将详细介绍涂料氙灯老化试验箱的工作原理、使用注意…

正则表达式——详解

正则表达式是什么&#xff1f; 正则表达式&#xff08;Regular Expression&#xff0c;通常简写为 regex、regexp 或 RE&#xff09;是一种强大的文本处理工具&#xff0c;用于描述一组字符串的模式。它可以用来匹配、查找、替换等操作&#xff0c;几乎所有现代编程语言都支持…

【流媒体】RTMPDump—RTMP_Connect函数(握手、网络连接)

目录 1. RTMP_Connect函数1.1 网络层连接&#xff08;RTMP_Connect0&#xff09;1.2 RTMP连接&#xff08;RTMP_Connect1&#xff09;1.2.1 握手&#xff08;HandShake&#xff09;1.2.2 RTMP的NetConnection&#xff08;SendConnectPacket&#xff09; 2.小结 RTMP协议相关&am…

2024计算机软考报名流程(电脑报名)

1.24年下半年软考报名时间&#xff0c;各省报名时间不一样&#xff0c; 报名时间大概集中在&#xff1a;24年8月19日&#xff5e;24年9月15日&#xff1b; 报名网站&#xff1a;中国计算机技术职业资格网&#xff1b; 广东&#xff1a;2024年8月21日9:00至29日17:00 安徽&#…