003 SpringBoot集成Kafka操作

4.SpringBoot集成Kafka

文章目录

  • 4.SpringBoot集成Kafka
      • 1.入门示例
      • 2.yml完整配置
      • 3.关键配置注释说明
        • 1. 生产者优化参数
        • 2. 消费者可靠性配置
        • 3. 监听器高级特性
        • 4. 安全认证配置
      • 4.配置验证方法
      • 5.不同场景配置模板
        • 场景1:高吞吐日志收集
        • 场景2:金融级事务消息
        • 场景3:跨数据中心同步
    • 5.高级配置
      • 1.事务支持
      • 2.消息重试与死信队列

来源参考的deepseek,如有侵权联系立删

1.入门示例

1.pom依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2.KafkaProducer消息生产者配置

@Component
@Slf4j
public class KafkaProducer {private HashMap map=new HashMap<>();@Autowiredprivate KafkaTemplate<Integer,String> kafkaTemplate;public void send(String topic,String msg){log.info("开始发送消息,topic:{};message:{}",topic,msg);ListenableFuture<SendResult<Integer,String>> send=kafkaTemplate.send(topic, msg);//消息确认机制send.addCallback(new ListenableFutureCallback<SendResult<Integer,String>>(){@Overridepublic void onSuccess(SendResult<Integer, String> result) {log.info("消息发送成功,topic:{};message:{}",topic,msg);}@Overridepublic void onFailure(Throwable ex) {//落库操作map.put(topic,msg);}});}
}

springboot3.x写法

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;@Service
@RequiredArgsConstructor
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;// 同步发送(阻塞等待确认)public void sendMessageSync(String topic, String key, String value) {kafkaTemplate.send(topic, key, value).whenComplete((result, ex) -> {if (ex == null) {System.out.printf("发送成功:topic=%s, partition=%d, offset=%d%n",result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());} else {System.err.println("发送失败:" + ex.getMessage());}});}// 异步发送(默认方式)public void sendMessageAsync(String topic, String message) {kafkaTemplate.send(topic, message);}
}
  • Spring Boot 2.xsend() 返回 ListenableFuture<SendResult>,支持 addCallback() 回调。
  • Spring Boot 3.xsend() 返回 CompletableFuture<SendResult>,弃用 ListenableFuture,因此需要使用 CompletableFuture 的 API(如 whenComplete)。

3.KafkaConsumer消息消费

@Component
@Slf4j
public class KafkaConsumer {private List<String> exist=new ArrayList<>();@KafkaListener(topics = {"lx"},groupId = "lx")public void consumer(ConsumerRecord<Integer,String> record){if (exist.contains(record.value())){log.error("不满足幂等校验!!!");}log.info("消息消费成功,topic:{},message:{}", record.topic(), record.value());exist.add(record.value());}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumerService {// 单个消息消费(手动提交偏移量)@KafkaListener(topics = "test-topic", groupId = "spring-group")public void listenMessage(String message, Acknowledgment ack) {System.out.println("收到消息:" + message);ack.acknowledge(); // 手动提交}// 批量消费(需配置 listener.type=batch)@KafkaListener(topics = "batch-topic", groupId = "spring-group")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));ack.acknowledge();}
}

4.yml配置文件

生产者配置

#kafka配置
spring:kafka:#kafka集群地址# bootstraps-server: 192.168.25.100:9092bootstrap-servers: 47.122.26.22:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后,会发送properties:linger.ms: 1# 禁止生产者触发 Topic 创建请求allow.auto.create.topics: false#代表集群中从节点都持久化后才认为发送成功acks: -1

消费者配置

spring:kafka:#kafka集群地址bootstraps-server: 192.168.25.100:9092consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000# 禁用生产者触发 Topic 元数据请求时自动创建allow.auto.create.topics: falsegroup-id: testauto-offset-reset: earliestlistener:ack-mode: manual_immediate # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch

5.实体类

@Data
public class KafkaRequest {/*** 主题*/private String topic;/*** 消息*/private String message;
}

6.消息发送

@RestController
@Slf4j
public class KafkaController {private final String topic="lx";private int temp=1;@Autowiredprivate KafkaProducer producer;/*** 下单** @param kafkaRequest* @return null*/@RequestMapping("/test01")public void test01(KafkaRequest kafkaRequest){log.info("test01测试成功!topic:{};message:{}",kafkaRequest.getTopic(), kafkaRequest.getMessage());producer.send(kafkaRequest.getTopic(), kafkaRequest.getMessage());}@RequestMapping("/test02")public void test02(KafkaRequest kafkaRequest){log.info("test02测试成功!topic:{};message:{}",topic, temp);producer.send(topic, String.valueOf(temp));temp++;}
}

kafka启动方式

./kafka-server-start.sh  ../config/server.properties

2.yml完整配置

spring:kafka:# 基础配置(必填项)bootstrap-servers: localhost:9092  # Kafka集群地址,多节点用逗号分隔 client-id: spring-boot-app         # 客户端标识(日志追踪用)# 生产者配置 producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer   # 键序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer  # 值序列化器acks: all                         # 消息确认机制:all表示所有副本确认(最高可靠性)retries: 5                        # 发送失败重试次数(需配合幂等性使用)batch-size: 16384                 # 批量发送缓冲区大小(单位:字节)linger-ms: 50                     # 发送延迟等待时间(毫秒,提高吞吐量)buffer-memory: 33554432           # 生产者内存缓冲区大小(默认32MB)compression-type: snappy          # 消息压缩算法(可选gzip/lz4/zstd)transaction-id-prefix: tx-        # 开启事务时需配置前缀(需配合@Transactional)# 消费者配置 consumer:group-id: app-consumer-group      # 消费者组ID(同一组共享分区)auto-offset-reset: earliest       # 无Offset时策略:earliest(从头)/latest(最新)enable-auto-commit: false         # 关闭自动提交Offset(推荐手动提交)key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500             # 单次poll最大消息数(避免OOM)fetch-max-wait-ms: 500            # 消费者等待broker返回数据的最长时间isolation-level: read_committed   # 事务消息隔离级别(read_committed/read_uncommitted)# 监听器配置(高级优化)listener:type: single                      # 监听器类型:single(单条)/batch(批量)ack-mode: manual                  # Offset提交模式:manual(手动)/batch(批量提交)concurrency: 3                    # 消费者线程数(建议等于分区数)poll-timeout: 3000                # poll方法超时时间(毫秒)# 消息重试与死信队列(容错机制)retry:topic:attempts: 3                     # 最大重试次数initial-interval: 1000          # 初始重试间隔(毫秒)multiplier: 2.0                 # 重试间隔倍数(指数退避)dead-letter-topic: dlq-${topic}   # 死信队列命名规则(自动创建)# 安全协议(企业级场景)properties:security.protocol: SASL_PLAINTEXT  # 安全协议(如PLAINTEXT/SASL_SSL)sasl.mechanism: PLAIN             # SASL认证机制ssl.truststore.location: /path/to/truststore.jks# 自定义业务配置(非Kafka标准参数)app:kafka:topics:input-topic: user-events        # 业务输入Topicoutput-topic: processed-events  # 业务输出Topic

3.关键配置注释说明

1. 生产者优化参数
参数说明推荐值
acks=all确保所有ISR副本写入成功,防止数据丢失高可靠性场景必选
compression-type=snappy减少网络带宽占用,提升吞吐量消息体>1KB时启用
transaction-id-prefix支持跨分区原子性写入(需配合@Transactional注解)金融交易类业务必配
2. 消费者可靠性配置
参数说明注意事项
enable-auto-commit=false避免消息处理失败但Offset已提交导致数据丢失需手动调用ack.acknowledge()
isolation-level=read_committed只消费已提交的事务消息需与生产者事务配置联动
3. 监听器高级特性
参数使用场景示例
type=batch批量消费(提升吞吐量)适用于日志处理等实时性要求低的场景
concurrency=3并发消费者数需与Topic分区数一致,避免资源浪费
4. 安全认证配置
spring:kafka:properties:security.protocol: SASL_SSLsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";
  • 企业级必配:生产环境需启用SSL加密+SASL认证

4.配置验证方法

  1. 启动检查:添加@ConfigurationProperties(prefix = "spring.kafka")绑定配置到Bean,通过单元测试验证注入值
  2. 日志监控:开启DEBUG日志观察生产者/消费者连接状态
   logging:level:org.springframework.kafka: DEBUG
  1. AdminClient 工具:通过编程方式检查Topic元数据
@Autowired
private KafkaAdminClient adminClient;public void checkTopic() {Map<String, TopicDescription> topics = adminClient.describeTopics("user-events");topics.values().forEach(topic -> System.out.println(topic));
}

5.不同场景配置模板

场景1:高吞吐日志收集
producer:compression-type: lz4batch-size: 65536linger-ms: 100
consumer:auto-offset-reset: latestenable-auto-commit: true  # 允许少量数据丢失以换取性能
场景2:金融级事务消息
producer:acks: allretries: 10transaction-id-prefix: fin-tx-
consumer:isolation-level: read_committedenable-auto-commit: false
场景3:跨数据中心同步
spring:kafka:bootstrap-servers: kafka-dc1:9092,kafka-dc2:9092properties:client.dns.lookup: use_all_dns_ips  # 支持多IP解析reconnect.backoff.ms: 1000          # 断线重连策略

5.高级配置

1.事务支持

// 配置事务管理器
@Bean
public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);
}// 使用事务发送
@Transactional
public void sendWithTransaction() {kafkaTemplate.send("topic1", "msg1");kafkaTemplate.send("topic2", "msg2");
}

2.消息重试与死信队列

spring:kafka:listener:retry:max-attempts: 3backoff:initial-interval: 1000multiplier: 2.0dead-letter-topic: my-dlt-topic # 死信队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/25002.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ROS ur10机械臂添加140夹爪全流程记录

ROS ur10机械臂添加140夹爪 系统版本&#xff1a;Ubuntu20.04 Ros版本&#xff1a;noetic Moveit版本&#xff1a;moveit-noetic 参考博客&#xff1a; ur3robotiq ft sensorrobotiq 2f 140配置rviz仿真环境_有末端力传感器的仿真环境-CSDN博客 UR5机械臂仿真实例&#xf…

Redis速成(1)VMware虚拟机安装Redis+Session验证登录注册+MybatisPlus

课程&#xff1a;黑马程序员Redis入门到实战教程&#xff0c;深度透析redis底层原理redis分布式锁企业解决方案黑马点评实战项目_哔哩哔哩_bilibili Mybatis与MybatisPlus: 参考springboot&#xff0c;需要额外写mapper.class&#xff0c;在方法上Select等 在ssm中&#xff0c;…

thinkphp下的Job队列处理

需要think-queue扩展&#xff0c;没有的请composer安装一下 "require": {"php": ">7.2.5","topthink/framework": "^6.1","topthink/think-orm": "^2.0","topthink/think-multi-app": &qu…

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷(五)

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷&#xff08;五&#xff09; 第一部分&#xff1a;网络平台搭建与设备安全防护任务书第二部分&#xff1a;网络安全事件响应、数字取证调查、应用程序安全任务书任务 1&#xff1a;应急响应&…

JSON Schema 入门指南:如何定义和验证 JSON 数据结构

文章目录 一、引言二、什么是 JSON Schema&#xff1f;三、JSON Schema 的基本结构3.1 基本关键字3.2 对象属性3.3 数组元素3.4 字符串约束3.5 数值约束 四、示例&#xff1a;定义一个简单的 JSON Schema五、使用 JSON Schema 进行验证六、实战效果6.1 如何使用 七、总结 一、引…

VMware虚拟机Mac版安装Win10系统

介绍 Windows 10是由美国微软公司开发的应用于计算机和平板电脑的操作系统&#xff0c;于2015年7月29日发布正式版。系统有生物识别技术、Cortana搜索功能、平板模式、桌面应用、多桌面、开始菜单进化、任务切换器、任务栏的微调、贴靠辅助、通知中心、命令提示符窗口升级、文…

计算机网络:ICMP协议(Internet控制消息协议)介绍

目录 一、简介 二、为什么要有ICMP协议? 三、ICMP协议报文格式 四、ICMP报文的类型 4.1 差错报文 4.2 查询报文 五、ICMP报文的实际案例 5.1 Ping命令 5.2 Traceroute命令 总结 今天和大家聊聊ICMP协议相关的知识,感兴趣的可以一起了解一下! 一、简介 ICMP(Inte…

python读取sqlite温度数据,并画出折线图

需求&#xff1a; 在Windows下请用python画出折线图&#xff0c;x轴是时间&#xff0c;y轴是温度temperature 和体感温度feels_like_temperature 。可以选择县市近1小时&#xff0c;近1天&#xff0c;近1个月的。sqlite文件weather_data.db当前目录下&#xff0c;建表结构如下…

window下kafka安装

kafka下载 下载好,直接解压即可 默认是带有zookeeper(注册中心) 需要先启动zookeeper zookeeper配置 先配置下zookeeper 这个data文件夹是自定建的 随意建在哪里 注意 这里斜杠用和linux一样 启动zookeeper 进入bin/windows目录 启动zookeeper zookeeper-server-start.ba…

开发HarmonyOS NEXT版五子棋游戏实战

大家好&#xff0c;我是 V 哥。首先要公布一个好消息&#xff0c;V 哥原创的《鸿蒙HarmonyOS NEXT 开发之路 卷1&#xff1a;ArkTS 语言篇》图书终于出版了&#xff0c;有正在学习鸿蒙的兄弟可以关注一下&#xff0c;写书真是磨人&#xff0c;耗时半年之久&#xff0c;感概一下…

2月26(信息差)

&#x1f30d;思科和英伟达新旧双王联手 目标重塑网络架构抢占下沉市场 &#x1f384;全球AI大混战升温&#xff01;超越Sora的阿里万相大模型开源 家用显卡都能跑 ✨小米15 Ultra、小米SU7 Ultra定档2月27日 雷军宣布&#xff1a;向超高端进发 1.全球首个&#xff01;人形机器…

物联网通信应用案例之《智慧农业》

案例概述 在智慧农业方面&#xff0c;一般的应用场景为可以自动检测温度湿度等一系列环境情况并且可以自动做出相应的处理措施如简单的浇水和温度控制等&#xff0c;且数据情况可远程查看&#xff0c;以及用户可以实现远程控制。 基本实现原理 传感器通过串口将数据传递到Wi…

C# Unity 唐老狮 No.1 模拟面试题

本文章不作任何商业用途 仅作学习与交流 安利唐老狮与其他老师合作的网站,内有大量免费资源和优质付费资源,我入门就是看唐老师的课程 打好坚实的基础非常非常重要: Unity课程 - 游习堂 - 唐老狮创立的游戏开发在线学习平台 - Powered By EduSoho 目录 C# 1.其他类型转object类…

网络安全扫描--基础篇

前言 1、了解互联网安全领域中日趋重要的扫描技术 2、了解在不同网络场景下扫描技术手段 3、熟悉linux下系统内核防护策略并能大件一个有效的系统防护体系 4、增强工作安全意识&#xff0c;并能有效的实践于工作场景中 目录 1、熟悉主机扫描工具&#xff08;fping&#xff0c;…

P8697 [蓝桥杯 2019 国 C] 最长子序列

P8697 [蓝桥杯 2019 国 C] 最长子序列 题目 分析代码 题目 分析 先分析一波xdm 题意呢就是在s中找有多少个能和t匹配的字符&#xff0c;注意&#xff1a;连续匹配&#xff0c;输出连续的次数 欧克&#xff0c;开始分析&#xff0c;首先&#xff0c;哎~字母&#xff01;还强调…

一篇docker从入门到精通

Docker Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中&#xff0c;然后发布到任何流行的 Linux 机器上&#xff0c;也可以实现虚拟化。容器是完全使用沙盒机制&#xff0c;相互之间不会有任何接口&#xff08;类似 iP…

TCP/IP 5层协议簇:物理层

目录 1. 物理层&#xff08;physical layer&#xff09; 2. 网线/双绞线 1. 物理层&#xff08;physical layer&#xff09; 工作设备&#xff1a;网线、光纤、空气 传输的东西是比特bit 基本单位如下&#xff1a;数字信号 信号&#xff1a;【模拟信号&#xff08;放大器&a…

生成对抗网络(GAN)

生成对抗网络&#xff08;GAN&#xff09;:生成对抗网络是一种深度学习模型&#xff0c;由 Ian Goodfellow 等人在 2014 年提出。GAN由生成器和判别器组成&#xff0c;生成器生成假数据&#xff0c;判别器区分真假数据。两者通过对抗训练不断提升&#xff0c;最终生成器能够生成…

FastJSON 默认行为:JSON.toJSONString 忽略 null 字段

完整的 FakeRegistrationController 代码&#xff0c;这让我可以全面分析后端逻辑&#xff0c;特别是为什么空的字段&#xff08;如 compareDate&#xff09;不返回给前端。我将详细分析代码的每个接口&#xff0c;尤其是与 list 请求和字段返回相关的部分&#xff0c;并解释原…

网络通信/IP网络划分/子网掩码的概念和使用

文章目录 概述子网的考题子网掩码的历史有/无类地址子网划分!子网掩码超网技术/CIDR子网掩码和路由IP子网掩码定义 网络规划网络规划-拆子网网络规划-组超网子网划分案例 区分于其他特殊IP地址IP地址和网络地址子网掩码和网络地址子网掩码和广播地址 子网间的通信其他 概述 本…