SpringBoot日常:集成Kafka

文章目录

      • 1、pom.xml文件
      • 2、application.yml
      • 3、生产者配置类
      • 4、消费者配置类
      • 5、消息订阅
      • 6、生产者发送消息
      • 7、测试发送消息

本章内容主要介绍如何在springboot项目对kafka进行整合,最终能达到的效果就是能够在项目中通过配置相关的kafka配置,就能进行消息的生产和消费。

1、pom.xml文件

原本项目用 Spring Boot 的版本为2.6.X,所以这里用spring-cloud-starter-stream-kafka的版本用的是2.2.1.RELEASE,也可以用其他版本,但是注意兼容性,不然会编译运行报错

<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.2</version>  <!-- 确保与 Spring Boot 2.6.x 兼容 --><scope>import</scope><type>pom</type></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>2.2.1.RELEASE</version></dependency>
</dependencies>

2、application.yml

添加kafka的相关配置

spring:kafka:bootstrap-servers: 192.168.102.179:9092producer:acks: 1retries: 0batch-size: 30720000buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer#消费者配置consumer:group-id: test-kafka#是否开启手动提交 默认自动提交enable-auto-commit: true#如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔auto-commit-interval: 5000#earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费auto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#一次调用 poll() 操作时返回的最大记录数 默认为 500 条max-poll-records: 2#kafka session timeoutsession:timeout:ms: 300000listener:#kafka 没有创建指定的 topic 下  项目启动是否报错 true  falsemissing-topics-fatal: false#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息type: singleack-mode: manual_immediate

3、生产者配置类

添加一个生产者配置类KafkaProducerConfig ,主要设置消息的序列化方式等消息处理方式

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @Author 码至终章* @Date 2025/1/8 11:33* @Version 1.0*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Bean("myProducerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(4);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> newProducerFactory() {return new DefaultKafkaProducerFactory<>(getMyKafkaProps());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(newProducerFactory());}}

4、消费者配置类

创建一个消费者配置类KafkaConsumerConfig,主要设置一些消息的接收处理配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;
import java.util.Map;/*** @Author 码至终章* @Date 2025/1/8 12:09* @Version 1.0*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.auto-offset-reset}")private String offsetReset;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.auto-commit-interval}")private String autoCommitIntervalMs;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Bean("myConsumerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(12);//是否自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//kafak 服务器props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//消费组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//一次调用poll()操作时返回的最大记录数,默认值为500props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//自动提交时间间隔 默认 5秒props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);return props;}/*** 消费者工厂*/@Bean("myContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));// 并发创建的消费者数量factory.setConcurrency(3);// 开启批处理factory.setBatchListener(true);//拉取超时时间factory.getContainerProperties().setPollTimeout(1500);//是否自动提交 ACK kafka 默认是自动提交if (!enableAutoCommit) {//共有其中方式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);}return factory;}
}

5、消息订阅

创建一个消费者监听消息类,里面对主题消息监听,这里的测试主题为testone

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @Author 码至终章* @Date 2025/1/8 14:19* @Version 1.0*/
@Slf4j
@Component
public class MyKafkaConsumer {@KafkaListener(id = "my-kafka-consumer",idIsGroup = false, topics = "topicone",containerFactory = "myContainerFactory")public void listen(String message) {log.info("接收到主题消息,消息内容:{}", message);}
}

6、生产者发送消息

为了方便调用测试,这里在controller编写一个方法发送消息

@RestController
@Slf4j
public class TestController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping ("/sendMessage")public void sendMessage(@RequestParam String message) {this.kafkaTemplate.send("topicone", message);}
}

7、测试发送消息

这里简单用postman调用接口发送一条消息
在这里插入图片描述
从idea的程序控制台可以看到消费者监听可以正常接收到消息
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/444.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTTPS SSL/TLS 工作流程

目录 一、HTTP/HTTPS 简介1、HTTP协议相关内容2、HTTPS协议3、HTTP版本差异&#xff1a; 二、HTTPS 协议工作流程解析1. 客户端请求 SSL 握手2. 服务端接收 SSL 握手连接3. TLS 握手中的密钥协商4. HTTP 数据的加密与解密5. 安全性保障 三、HTTPS 协议的相关知识拓展1. TLS 与 …

Ubuntu中使用miniconda安装R和R包devtools

安装devtools环境包 sudo apt-get install gfortran -y sudo apt-get install build-essential -y sudo apt-get install libxt-dev -y sudo apt-get install libcurl4-openssl-dev -y sudo apt-get install libxml2.6-dev -y sudo apt-get install libssl-dev -y sudo apt-g…

解决SpringBoot无法使用JDK8问题

解决SpringBoot无法使用JDK8问题 现状解决方案 现状 使用idea创建springboot项目无法选择java8。原因是23年11月的spring更新后就明确了不在支持java8版本的项目创建&#xff0c;但是目前为止很多公司开发还在用java8&#xff0c;导致会有问题的产生。 解决方案 使用idea创…

八、系统托盘与配置面板

没有人会把你变得越来越好&#xff0c;时间和经历只是陪衬。 支撑你变得越来越好的&#xff0c;是你自己坚强的意志、修养、品行、以及不断的反思和经验。 人生最好的贵人&#xff0c;就是努力向上的自己。 一、系统托盘 1、资源文件夹 新建资源文件夹&#xff0c;我们需要把…

IntelliJ IDEA中Maven项目的配置、创建与导入全攻略

大家好&#xff0c;我是袁庭新。 IntelliJ IDEA是当前最流行的Java IDE&#xff08;集成开发环境&#xff09;之一&#xff0c;也是业界公认最好用的Java开发工具之一。IntelliJ IDEA支持Maven的全部功能&#xff0c;通过它我们可以很轻松地实现创建Maven项目、导入Maven项目、…

Element-plus、Element-ui之Tree 树形控件回显Bug问题。

需求&#xff1a;提交时&#xff0c;需要把选中状态和半选中状态 的数据id提交。如图所示&#xff1a; 数据回显时&#xff0c;会出现代码如下&#xff1a; <template><el-tree ref"treeRef" :data"tree" show-checkbox node-key"id" …

C语言#define定义宏

目录 一、什么是宏以及宏的声明方式 1.宏常量&#xff1a; 2.宏函数&#xff1a; 二、宏的替换原则 三、宏设计的易犯错误 ERROR1&#xff1a;尾部加分号&#xff08;当然有些特定需要加了分号&#xff0c;这里说明一般情况&#xff09; ERROR2&#xff1a;宏函数定义时&…

第33 章 - ES 实战篇 - MySQL 与 Elasticsearch 的一致性问题

思维导图 0. 前言 MySQL 与 Elasticsearch 一致性问题是老生常谈了。网上有太多关于这方面的文章了&#xff0c;但是千篇一律&#xff0c;看了跟没看没有太大区别。 在生产中&#xff0c;我们往往会通过 DTS 工具将 binlog 导入到 Kafka&#xff0c;再通过 Kafka 消费 binlog&…

Gitlab-Runner配置

原理 Gitlab-Runner是一个非常强大的CI/CD工具。它可以帮助我们自动化执行各种任务&#xff0c;如构建、测试和部署等。Gitlab-Runner和Gitlab通过API通信&#xff0c;接收作业并提交到执行队列&#xff0c;Gitlab-Runner从队列中获取作业&#xff0c;并允许在不同环境下进行作…

STM32第6章、WWDG

一、简介 WWDG&#xff1a;全称Window watchdog&#xff0c;即窗口看门狗&#xff0c;本质上是一个能产生系统复位信号和提前唤醒中断的计数器。 特性&#xff1a; 是一个递减计数器。 看门狗被激活后&#xff0c; 当递减计数器值从 0x40减到0x3F时会产生复位&#xff08;即T6位…

【Qt】事件、qt文件

目录 Qt事件 QEvent QMouseEvent QWheelEvent QKeyEvent QTimerEvent Qt文件 QFile QFileInfo Qt事件 在Qt中用一个对象表示一个事件&#xff0c;这些事件对象都继承自抽象类QEvent。事件和信号的目的是一样的&#xff0c;都是为了响应用户的操作。有两种产生事件的方…

Jenkins触发器--在其他项目执行后构建

前言&#xff1a; jenkins中有多种触发器可用&#xff0c;可以方便的控制构建的启动 这里简单介绍下项目后构建的配置方法 1. 解释&#xff1a; Build after other projects are built Set up a trigger so that when some other projects finish building, a new build is…

OpenStack 网络服务的插件架构

OpenStack 的网络服务具有灵活的插件架构&#xff0c;可支持多种不同类型的插件以满足不同的网络需求。以下是对 OpenStack 网络服务插件架构中一些常见插件类型的介绍&#xff1a; 一、SDN 插件 Neutron 与 SDN 的集成&#xff1a;在 OpenStack 网络服务里&#xff0c;SDN 插…

牛客网刷题 ——C语言初阶(6指针)——BC105 矩阵相等判定

1. 题目描述&#xff1a;BC105 矩阵相等判定 牛客网OJ题链接 描述&#xff1a; KiKi得到了两个n行m列的矩阵&#xff0c;他想知道两个矩阵是否相等&#xff0c;请你回答他。(当两个矩阵对应数组元素都相等时两个矩阵相等)。 示例1 输入&#xff1a; 2 2 1 2 3 4 1 2 3 4 输出…

SQLAlchemy

https://docs.sqlalchemy.org.cn/en/20/orm/quickstart.htmlhttps://docs.sqlalchemy.org.cn/en/20/orm/quickstart.html 声明模型 在这里&#xff0c;我们定义模块级构造&#xff0c;这些构造将构成我们从数据库中查询的结构。这种结构被称为 声明式映射&#xff0c;它同时定…

[SMARTFORMS] 导出SMARTFORMS表单数据

当我们配置好了Smartforms表单以后&#xff0c;如何在自开发的ALV程序报表中以PDF格式导出表单数据到电脑本地&#xff1f; 效果图 选择需要进行导出的采购凭证编号行数据&#xff0c;点击PDF格式导出按钮&#xff0c;弹出导出数据的信息窗口&#xff0c;点击"允许"…

seo泛目录(seo泛目录程序)

导言&#xff1a; 在搜索引擎优化&#xff08;SEO&#xff09;的领域中&#xff0c;泛目录程序被广泛应用于提升网站的可见性和排名。本文将深入探讨SEO泛目录程序的概念和作用&#xff0c;重点介绍它在网站优化中的重要性和优势&#xff0c;帮助读者了解SEO泛目录程序的工作原…

Trimble自动化激光监测支持历史遗产实现可持续发展【沪敖3D】

故事桥&#xff08;Story Bridge&#xff09;位于澳大利亚布里斯班&#xff0c;建造于1940年&#xff0c;全长777米&#xff0c;横跨布里斯班河&#xff0c;可载汽车、自行车和行人往返于布里斯班的北部和南部郊区。故事桥是澳大利亚最长的悬臂桥&#xff0c;是全世界两座手工建…

[人工智能自学] Python包学习-pandas

紧接上篇numpy的学习教程 本篇参考&#xff1a; Pandas 教程|菜鸟教程 官方教程 - 10分钟入门pandas joyful-pandas pandas中文教程 它建立在 NumPy 库的基础之上&#xff0c;提供了高效的数据结构和数据分析工具&#xff0c;使得在 Python 中进行数据操作变得更加容易和高效。…

【2024年华为OD机试】 (A卷,100分)- 二元组个数(Java JS PythonC/C++)

一、问题描述 以下是题目描述的 Markdown 格式&#xff1a; 题目描述 给定两个数组 a 和 b&#xff0c;若 a[i] b[j]&#xff0c;则称 [i, j] 为一个二元组。求在给定的两个数组中&#xff0c;二元组的个数。 输入描述 第一行输入 m&#xff0c;表示第一个数组的长度。第二…