SpringBoot基础Kafka示例

这里将生产者和消费者放在一个应用中

使用的Boot3.4.3

引入Kafka依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml配置


spring:application:name: kafka-1#kafka连接地址kafka:bootstrap-servers: 127.0.0.1:9092#配置生产者producer:#消息发送失败重试次数retries: 0#一个批次可以使用内存的大小batch-size: 16384#一个批次消息数量buffer-memory: 33554432#键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer#值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allconsumer:#是否自动提交enable-auto-commit: false#自动提交的频率auto-commit-interval: 1000#earliest	从分区的最早偏移量开始消费	需要消费所有历史消息  latest	从分区的最新偏移量开始消费,忽略历史消息	只关心新消息#none	如果没有有效的偏移量,抛出异常	严格要求偏移量必须存在#exception spring-kafka不支持auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:#用于配置消费者如何处理消息的确认  ack配置方式  这里指定由消费者手动提交偏移量#Acknowledgment.acknowledge() 方法来提交偏移量ack-mode: MANUAL_IMMEDIATEconcurrency: 4
test-1: group-1
test-2: group-2
test-3: group-3server:port: 8099

生产者示例,一般可能是一个MQTT接收消息入口

package com.hrui.kafka1.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** @author hrui* @date 2025/3/10 14:56*/
@RestController
public class EventProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/sendMessage")public String sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);return "Message sent to topic '" + topic + "': " + message;}@RequestMapping("/sendMessage2")public String sendMessage2() {//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();kafkaTemplate.send(message);return "Message sent to topic";}}

消费者示例

注意:如果配置了手动提交ack,那么

主要目的不仅仅是避免重复消费,而是为了确保消息的可靠处理和偏移量(offset)的正确提交。它可以避免重复消费,但更重要的是保证消息不会丢失,并且在消息处理失败时能够重新消费。

package com.hrui.kafka1.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.jms.AcknowledgeMode;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @author hrui* @date 2025/3/10 15:57*/
@Component
public class EventConsumer {@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-1}'}")public void onMessage(ConsumerRecord<String,String> message){System.out.println("接收到消息1:"+message.value());}@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-2}'}")public void onMessage(String message){System.out.println("接收到消息2:"+message);}@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.GROUP_ID) String groupId) {try {System.out.println("接收到消息3:" + message + ", ack:" + ack + ", topic:" + topic + ", groupId:" + groupId);// 处理消息逻辑// ...} catch (Exception e) {// 处理异常,记录日志System.err.println("处理消息失败: " + e.getMessage());// 可以根据业务需求决定是否重新抛出异常}finally {// 手动提交偏移量ack.acknowledge();}}
}

生产者可选择异步或者同步发送消息

生产者发送消息有同步异步之说 那么消费者在消费消息时候 有没有同步异步之说呢???

在 Kafka 消费者中,消费消息的方式本质上是由 Kafka 的设计决定的,而不是由消费者代码显式控制的。Kafka 消费者在消费消息时,通常是以拉取(poll)的方式从 Kafka 服务器获取消息,然后处理这些消息。从这个角度来看,消费者的消费行为是同步的,因为消费者需要主动调用 poll 方法来获取消息。

然而,消费者的消息处理逻辑可以是同步异步的,具体取决于业务实现。以下是对消费者消费消息的同步和异步行为的详细分析:

 消费者的同步消费

在默认情况下,Kafka 消费者的消费行为是同步的,即:

  • 消费者通过 poll 方法从 Kafka 拉取一批消息。

  • 消费者逐条处理这些消息。

  • 每条消息处理完成后,消费者提交偏移量(offset)。

  • 消费者继续调用 poll 方法获取下一批消息。

特点:
  • 消息处理是顺序的,即一条消息处理完成后才会处理下一条消息。

  • 如果某条消息处理时间较长,会影响后续消息的处理速度。

  • 适合消息处理逻辑简单、处理时间较短的场景。

@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {try {System.out.println("接收到消息:" + message.value());// 同步处理消息逻辑processMessage(message);} catch (Exception e) {System.err.println("处理消息失败: " + e.getMessage());} finally {ack.acknowledge(); // 手动提交偏移量}
}private void processMessage(ConsumerRecord<String, String> message) {// 模拟消息处理逻辑try {Thread.sleep(1000); // 假设处理一条消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

2. 消费者的异步消费

在某些场景下,消费者可能需要以异步的方式处理消息,即:

  • 消费者通过 poll 方法拉取一批消息。

  • 将每条消息提交到一个线程池或异步任务中处理。

  • 消费者继续调用 poll 方法获取下一批消息,而不等待上一条消息处理完成。

特点:
  • 消息处理是并发的,可以提高消息处理的吞吐量。

  • 需要额外的线程池或异步任务管理机制。

  • 适合消息处理逻辑复杂、处理时间较长的场景。

示例代码:

@Autowired
private ExecutorService executorService; // 注入线程池@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {if (!StringUtils.hasText(message.value())) {ack.acknowledge();return;}// 提交异步任务处理消息executorService.submit(() -> {try {System.out.println("接收到消息:" + message.value());processMessage(message); // 异步处理消息} catch (Exception e) {System.err.println("处理消息失败: " + e.getMessage());} finally {ack.acknowledge(); // 手动提交偏移量}});
}private void processMessage(ConsumerRecord<String, String> message) {// 模拟消息处理逻辑try {Thread.sleep(1000); // 假设处理一条消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

同步代码示例

@RequestMapping("/sendMessage2")public String sendMessage2(){//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);try {//阻塞等待拿结果SendResult<String, String> sendResult = send.get();System.out.println("说明消息发送成功,如果不成功会抛出异常");} catch (Exception e) {throw new RuntimeException(e);}return "Message sent to topic";}

异步注册回调的方式

 @RequestMapping("/sendMessage2")public String sendMessage2(){//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);//非阻塞  异步 注册回调异步通知send.thenAccept(result -> {System.out.println("消息发送成功");}).exceptionally(e->{System.out.println("发送失败");e.printStackTrace();return null;});return "Message sent to topic";}

如果需要发送的不是String类型 

那么要发送的不是String类型

KafkaTemplate<String,Object> kafkaTemplate;

一般来说可以专成JSON字符串发送

在引入spring-kafka的时候     KafkaAutoConfiguration中  配置了KafkaTemplate

Kafka<Object,Object>

如果需要用KafkaTemplate发送对象的时候

默认用的String序列化   会报错   除非将对象转为JSON字符串(一般可以这么做)

如果用对象的话   改成JsonSerializer  这样自动转JSON字符串

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/32199.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

µCOS-III从入门到精通 第十三章(事件标志组)

参考教程&#xff1a;【正点原子】手把手教你学UCOS-III实时操作系统_哔哩哔哩_bilibili 一、事件标志组简介 1、概述 &#xff08;1&#xff09;事件标志位是一个“位”&#xff0c;用来表示事件是否发生。 &#xff08;2&#xff09;事件标志组是一组事件标志位的集合&am…

LiveGBS流媒体平台GB/T28181常见问题-视频流安全控制HTTP接口鉴权勾选流地址鉴权后401Unauthorized如何播放调用接口流地址校验

LiveGBS流媒体平台GB/T28181常见问题频流安全控制HTTP接口鉴权勾选流地址鉴权后401Unauthorized如何播放调用接口流地址校验&#xff1f; 1、安全控制1.1、HTTP接口鉴权1.2、流地址鉴权 2、401 Unauthorized2.1、携带token调用接口2.1.1、获取鉴权token2.1.2、调用其它接口2.1.…

短视频下载去水印,用什么工具好?

去除视频和图片水印是许多用户的需求&#xff0c;尤其是在分享或保存内容时。以下是6款超好用的工具&#xff0c;帮助你轻松去除水印&#xff0c;享受纯净的视觉体验&#xff1a; 1. 易下载去水印小程序 特点&#xff1a; 操作简单&#xff0c;支持抖音、快手、小红书、哔哩哔哩…

Java Collection(3)——BinaryTree(二叉树)

前言 1.掌握树的基本概念 2.掌握二叉树概念及特性 3.掌握二叉树的基本操作 后面的优先级队列(大根堆&#xff0c;小根堆)也是基于二叉树实现的&#xff0c;所以理解好二叉树至关重要 1.树形结构 1.1描述 树是一种非线性的数据结构&#xff0c;它是由不为零个有限结点组成一…

[Echarts]图例换行时icon对齐标题

当前效果 目标效果 让图例中的“点”和标题同一行 代码 const data [{value: 100,name: 未标注},{value: 100,name: 已标注},{value: 100,name: 标注中} ];option {tooltip: {backgroundColor: #fff,extraCssText: box-shadow: 0px 0px 10px 0px rgba(0,0,0,0.15);,backgro…

Scala编程_实现Rational的基本操作

在Scala中实现一个简单的有理数&#xff08;Rational&#xff09;类&#xff0c;并对其进行加法、比较等基本操作. 有理数的定义 有理数是可以表示为两个整数的比值的数&#xff0c;通常形式为 n / d&#xff0c;其中 n 是分子&#xff0c;d 是分母。为了确保我们的有理数始终…

责任链模式如何减少模块之间的耦合

责任链模式如何减少模块之间的耦合 在复杂的软件系统中&#xff0c;模块之间的耦合是一个常见的问题。高耦合的代码不仅增加了维护成本&#xff0c;还会导致系统的扩展性和灵活性受限。当我们需要为不同的请求设计灵活的处理逻辑时&#xff0c;传统的硬编码方式会将请求的发送…

【最新】DeepSeek 实用集成工具有那些?

deepseek 系列github仓库地址 【主页】deepseek-aiDeepSeek-R1DeepSeek-V3DeepSeek-VL2【本文重点介绍】awesome-deepseek-integration 注意&#xff1a;以下内容来自awesome-deepseek-integration DeepSeek 实用集成&#xff08;awesome-deepseek-integration&#xff09; 将…

如何在Python下实现摄像头|屏幕|AI视觉算法数据的RTMP直播推送

技术背景 在直播应用开发中&#xff0c;RTMP推流是核心功能之一。本文将结合大牛直播SDK的Python接口实现&#xff0c;详细讲解如何在Python环境下进行RTMP推流开发。好多开发者都知道&#xff0c;在发布Python的RTMP推流demo示例之前&#xff0c;我们十年前已经发布了非常稳定…

不用 Tomcat?SpringBoot 项目用啥代替?

在SpringBoot框架中&#xff0c;我们使用最多的是Tomcat&#xff0c;这是SpringBoot默认的容器技术&#xff0c;而且是内嵌式的Tomcat。 同时&#xff0c;SpringBoot也支持Undertow容器&#xff0c;我们可以很方便的用Undertow替换Tomcat&#xff0c;而Undertow的性能和内存使…

LLM训练中常用的Benchmarks

在当今人工智能领域,大语言模型(LLM)凭借其在理解和生成人类自然语言文本方面的卓越表现,成为了备受瞩目的焦点。然而,随着LLM的广泛应用,如何对其性能进行准确、全面的评估成为了一个关键问题。在这样的背景下,大语言模型基准测试应运而生,它是评估LLM不可或缺的重要工…

基于深度学习的医学CT图像肺结节智能检测与语音提示系统【python源码+Pyqt5界面+数据集+训练代码】

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…

Selenium | 无法正常打开Google Chrome浏览器 转 Edge Chrome

目录 背景案例 换成 Edge Chrome 驱动下载 配置环境 代码案例 测试结果 背景案例 Python正常&#xff0c;环境正常&#xff0c;驱动正常&#xff0c;但是就是打不开浏览器&#xff0c;就是一直报错&#xff0c;导致很烦躁 换成 Edge Chrome 与 Google Chrome浏览器一样…

【JavaEE】文件操作和IO

【JavaEE】文件操作和IO 一、认识文件1.1 狭义和广义的文件概念1.2 文件路径1.3 文件的分类 二、Java 中操作⽂件2.1 File类2.2 代码演示 三、文件内容的读写 —— 数据流3.1 字节流和字符流字节流字符流 3.2 特别注意 四、实战演示4.1 查找删除文件4.2 普通文件的复制4.3 文件…

【数据挖掘】通过心脏病数据案例熟悉数据挖掘的完整过程

心脏病数据挖掘过程 一、加载数据源 # 如果没有安装数据源所依赖的库&#xff0c;则先安装数据源所在的python库: pip install ucimlrepo # 引入pandas和ucimlrepo import pandas as pd from ucimlrepo import fetch_ucirepo# fetch dataset Heart Disease dataset的Id为45 h…

【Golang】第二弹-----变量、基本数据类型、标识符

笔上得来终觉浅,绝知此事要躬行 &#x1f525; 个人主页&#xff1a;星云爱编程 &#x1f525; 所属专栏&#xff1a;Golang &#x1f337;追光的人&#xff0c;终会万丈光芒 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 一、变量 1.1基本介绍…

go个人论坛项目

搭建个人论坛 项目地址&#xff1a;MyForum: goginvue搭建论坛 - Gitee.com PS&#xff1a;有些地方没有写好&#xff0c;请选择性查看 初始化项目 创建目录结构 利用ini配置初始化框架 [server] AppMode debug HttpPort :3000 JwtKey "dhjasdkajh321"[databa…

日志系统项目——准备工作了解类的设计模式如单例模式、工厂模式、代理模式

1.六大原则 1.1 单一职责原则 类的职责应该单⼀&#xff0c;⼀个⽅法只做⼀件事。职责划分清晰了&#xff0c;每次改动到最⼩单位的⽅法或 类。 使⽤建议&#xff1a;两个完全不⼀样的功能不应该放⼀个类中&#xff0c;⼀个类中应该是⼀组相关性很⾼的函 数、数据的封装 ⽤例…

股指期货基差怎么计算?公式介绍

先说说啥是基差。简单来说&#xff0c;基差就是股指期货价格和现货指数价格之间的差值。就好比你手里有一张股票指数的“未来提货券”&#xff08;股指期货&#xff09;&#xff0c;但你现在就能买到股票指数&#xff08;现货指数&#xff09;&#xff0c;这两者之间的价格差&a…

Comfyui 与 SDwebui

ComfyUI和SD WebUI是基于Stable Diffusion模型的两种不同用户界面工具&#xff0c;它们在功能、用户体验和适用场景上各有优劣。 1. 功能与灵活性 ComfyUI&#xff1a;ComfyUI以其节点式工作流设计为核心&#xff0c;强调用户自定义和灵活性。用户可以通过连接不同的模块&…