SpringBoot日常:集成Kafka

文章目录

      • 1、pom.xml文件
      • 2、application.yml
      • 3、生产者配置类
      • 4、消费者配置类
      • 5、消息订阅
      • 6、生产者发送消息
      • 7、测试发送消息

本章内容主要介绍如何在springboot项目对kafka进行整合,最终能达到的效果就是能够在项目中通过配置相关的kafka配置,就能进行消息的生产和消费。

1、pom.xml文件

原本项目用 Spring Boot 的版本为2.6.X,所以这里用spring-cloud-starter-stream-kafka的版本用的是2.2.1.RELEASE,也可以用其他版本,但是注意兼容性,不然会编译运行报错

<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.2</version>  <!-- 确保与 Spring Boot 2.6.x 兼容 --><scope>import</scope><type>pom</type></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>2.2.1.RELEASE</version></dependency>
</dependencies>

2、application.yml

添加kafka的相关配置

spring:kafka:bootstrap-servers: 192.168.102.179:9092producer:acks: 1retries: 0batch-size: 30720000buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer#消费者配置consumer:group-id: test-kafka#是否开启手动提交 默认自动提交enable-auto-commit: true#如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔auto-commit-interval: 5000#earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费auto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#一次调用 poll() 操作时返回的最大记录数 默认为 500 条max-poll-records: 2#kafka session timeoutsession:timeout:ms: 300000listener:#kafka 没有创建指定的 topic 下  项目启动是否报错 true  falsemissing-topics-fatal: false#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息type: singleack-mode: manual_immediate

3、生产者配置类

添加一个生产者配置类KafkaProducerConfig ,主要设置消息的序列化方式等消息处理方式

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @Author 码至终章* @Date 2025/1/8 11:33* @Version 1.0*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Bean("myProducerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(4);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> newProducerFactory() {return new DefaultKafkaProducerFactory<>(getMyKafkaProps());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(newProducerFactory());}}

4、消费者配置类

创建一个消费者配置类KafkaConsumerConfig,主要设置一些消息的接收处理配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;
import java.util.Map;/*** @Author 码至终章* @Date 2025/1/8 12:09* @Version 1.0*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.auto-offset-reset}")private String offsetReset;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.auto-commit-interval}")private String autoCommitIntervalMs;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Bean("myConsumerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(12);//是否自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//kafak 服务器props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//消费组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//一次调用poll()操作时返回的最大记录数,默认值为500props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//自动提交时间间隔 默认 5秒props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);return props;}/*** 消费者工厂*/@Bean("myContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));// 并发创建的消费者数量factory.setConcurrency(3);// 开启批处理factory.setBatchListener(true);//拉取超时时间factory.getContainerProperties().setPollTimeout(1500);//是否自动提交 ACK kafka 默认是自动提交if (!enableAutoCommit) {//共有其中方式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);}return factory;}
}

5、消息订阅

创建一个消费者监听消息类,里面对主题消息监听,这里的测试主题为testone

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @Author 码至终章* @Date 2025/1/8 14:19* @Version 1.0*/
@Slf4j
@Component
public class MyKafkaConsumer {@KafkaListener(id = "my-kafka-consumer",idIsGroup = false, topics = "topicone",containerFactory = "myContainerFactory")public void listen(String message) {log.info("接收到主题消息,消息内容:{}", message);}
}

6、生产者发送消息

为了方便调用测试,这里在controller编写一个方法发送消息

@RestController
@Slf4j
public class TestController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping ("/sendMessage")public void sendMessage(@RequestParam String message) {this.kafkaTemplate.send("topicone", message);}
}

7、测试发送消息

这里简单用postman调用接口发送一条消息
在这里插入图片描述
从idea的程序控制台可以看到消费者监听可以正常接收到消息
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/504236.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

加速科技荣获“浙江省企业研究院”认定

近日&#xff0c;浙江省经济和信息化厅公布“2024年认定&#xff08;备案&#xff09;省级企业研发机构名单”。经过多轮严格评审和公示&#xff0c;加速科技荣获“省企业研究院”认定。这是加速科技继获国家级专精特新“小巨人”企业认定荣誉后的又一里程碑。 “浙江省企业研究…

mysql中查询json的技巧

前置工作 CREATE TABLE mk_task_record (task_id int NOT NULL AUTO_INCREMENT,task_name varchar(50) DEFAULT NULL,result_json json DEFAULT NULL,result_str longtext,create_time datetime DEFAULT NULL,update_time datetime DEFAULT NULL,PRIMARY KEY (task_id),KEY ta…

arcgis的合并、相交、融合、裁剪、联合、标识操作的区别和使用

1、相交 需要输入两个面要素&#xff0c;最终得到的是两个输入面要素相交部分的结果面要素。 2、合并 合并能将两个单独存放的两个要素类的内容&#xff0c;汇集到一个要素类里面。 3、融合 融合能将一个要素类内的所有元素融合成一个整体。 4、裁剪 裁剪需要输入两个面要…

【网络协议】静态路由详解

网络中的路由器通过以下两种方式之一发现远程网络&#xff1a; 静态配置路由动态路由协议 在本文&#xff0c;我们将学习关于静态路由的各种概念&#xff0c;例如如何配置静态路由、路由表如何进行决策、路由接口等相关知识。 文章目录 引言直连网络静态路由路由表原则原则1原…

C++ 复习总结记录六

C 复习总结记录六 模板初阶主要内容 1、泛型编程 2、函数模板 3、类模板 4、STL 简介 一 泛型编程 如何实现一个通用的交换函数 void Swap(int& left, int& right) {int temp left;left right;right temp; } void Swap(double& left, double& right…

Leecode刷题C语言之字符串中最大的3位相同数字

执行结果:通过 执行用时和内存消耗如下&#xff1a; char* largestGoodInteger(char* num) {int n strlen(num);char* res NULL;for (int i 0; i < n - 2; i) {if (num[i] num[i 1] && num[i 1] num[i 2]) {if (res NULL || strncmp(&num[i], res, 3)…

《繁星路》V1.8.3(Build16632266)官方中文学习版

《繁星路》官方中文版https://pan.xunlei.com/s/VODae2_2Z3QyMF02I5y321uHA1?pwdqgsh# 作为一款星际模拟游戏&#xff0c;完美融合了硬科幻元素与基地建设玩法&#xff0c;体验改造行星的恢弘与壮阔。化身人工意识AMI&#xff0c;遵照基本指示推进火星改造的各项工作&#xf…

《Spring Framework实战》9:4.1.4.依赖注入

欢迎观看《Spring Framework实战》视频教程 典型的企业应用程序不是由单个对象&#xff08;或Spring术语中的bean&#xff09;组成。即使是最简单的应用程序也有几个对象协同工作&#xff0c;以呈现最终用户所认为的连贯应用程序。下一节将解释如何从定义多个独立的bean定义到一…

STM32-笔记37-吸烟室管控系统项目

一、项目需求 1. 使用 mq-2 获取环境烟雾值&#xff0c;并显示在 LCD1602 上&#xff1b; 2. 按键修改阈值&#xff0c;并显示在 LCD1602 上&#xff1b; 3. 烟雾值超过阈值时&#xff0c;蜂鸣器长响&#xff0c;风扇打开&#xff1b;烟雾值小于阈值时&#xff0c;蜂鸣器不响…

云安全博客阅读(三)

WAF强固之盾&#xff1a;机器学习赋能下的语义分析 WAF 中&#xff0c;传统的基于正则的检测方法依赖正则的运营更新&#xff0c;以不断防护新的攻击方法&#xff1b; 主要流程为&#xff1a;HTTP包 -> payload解码 -> 正则匹配 但是&#xff0c;攻击者可以通过修改攻…

个人博客搭建(二)—Typora+PicGo+OSS

个人博客站—运维鹿: http://www.kervin24.top CSDN博客—做个超努力的小奚&#xff1a; 做个超努力的小奚-CSDN博客 一、前言 博客搭建完一直没有更新&#xff0c;因为WordPress自带的文档编辑器不方便&#xff0c;以前用CSDN写作的时候&#xff0c;习惯了Typora。最近对比了…

spring boot 集成 knife4j

1、knife4j介绍以及环境介绍 knife4j是为Java MVC框架集成Swagger生成Api文档的增强解决方案,前身是swagger-bootstrap-ui,取名knife4j是希望它能像一把匕首一样小巧,轻量,并且功能强悍!其底层是对Springfox的封装&#xff0c;使用方式也和Springfox一致&#xff0c;只是对接口…

案例解读 | 香港某多元化综合金融企业基础监控+网管平台建设实践

PART01 项目背景 01客户简介案例客户是一家创立20多年的香港某多元化综合金融企业&#xff0c;其业务范围涵盖证券、期货、资产管理、财富管理等&#xff0c;凭借广泛的业务网络和多元化的金融服务产品&#xff0c;在市场中拥有显著的影响力。02痛点分析随着业务版图的持续拓展…

KCP解读:C#库类图

本文是系列文章中的一篇&#xff0c;内容由浅到深进行剖析&#xff0c;为了方便理解建议按顺序一一阅读。 KCP技术原理 KCP解读&#xff1a;基础消息收发 KCP解读&#xff1a;重传机制 KCP解读&#xff1a;滑动窗口 KCP解读&#xff1a;拥塞控制 本系列的源码基于https://gith…

Nginx:Stream模块

什么是 Stream 模块? Stream 模块 是 Nginx 的一个核心模块,专为处理非 HTTP 协议的流量(TCP 和 UDP 流量)而设计。它可以用来负载均衡和代理 TCP 或 UDP 连接,适用于多种应用场景,如: 数据库集群(MySQL、PostgreSQL 等)邮件服务器(SMTP、IMAP、POP3)游戏服务器VoI…

Profinet转EtherNet/IP网关连接AB PLC的应用案例

某大型制造企业的生产车间同时采用了西门子 S7 - 1500 PLC 作为核心控制系统的一部分&#xff0c;负责主要生产流程的控制与数据处理&#xff1b;而在特定生产环节&#xff0c;由于历史设备遗留或工艺配套需求&#xff0c;存在使用 AB PLC 的情况。这就导致了在整个自动化生产系…

arcgisPro加载CGCS2000天地图后,如何转成米单位

1、导入加载的天地图影像服务&#xff0c;一开始是经纬度显示的。 2、右键地图&#xff0c;选择需要调整的投影坐标&#xff0c;这里选择坐标如下&#xff1a; 3、点击确定后&#xff0c;就可以调整成米单位的了。 4、切换后结果如下&#xff1a; 如有需要&#xff0c;可调整成…

2025新春烟花代码(二)HTML5实现孔明灯和烟花效果

效果展示 源代码 <!DOCTYPE html> <html lang"en"> <script>var _hmt _hmt || [];(function () {var hm document.createElement("script");hm.src "https://hm.baidu.com/hm.js?45f95f1bfde85c7777c3d1157e8c2d34";var …

机器人技术:ModbusTCP转CCLINKIE网关应用

在当今自动化生产与智能制造领域&#xff0c;ModbusTCP转CC-LinkIE网关KJ-MTCPZ-CCIES的应用正日益成为提升生产效率、实现设备间高效通信的重要技术手段。这一转换技术不仅打破了不同通信协议间的壁垒&#xff0c;还为机器人产品的应用提供了更为广阔的舞台。ModbusTCP作为一种…

Openwrt @ rk3568平台 固件编译实践(二)- ledeWRT版本

目录 ledeWRT介绍固件编译下载代码修改feed源更新并安装编译第三方软件包制作用于eMMC烧写的rootfs基于lede发行版验证烧写rk3568.img, LEDE wrt启动成功refhttps://blog.csdn.net/zc21463071/article/details/106751361介绍rk3568平台下, lede 大神版 openwrt固件的下载、编译…