SpringBoot整合Kafka

一、首先下载windows版本的Kafka

官网:Apache Kafka

二、启动Kafka

cmd进入到kafka安装目录:

1:cmd启动zookeeer

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

2:cmd启动kafka server

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

3:使用cmd窗口启动一个生产者命令:

.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic Topic1

4:cmd启动zookeeer

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 -topic Topic1

 三、引入kafka依赖

       <!--kafka依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

四、配置文件

server:port: 8080spring:application:name: kafka-demokafka:bootstrap-servers: localhost:9092producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer#      value-serializer: com.itheima.demo.config.MySerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # consumer消费者group-id: javagroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer#      value-deserializer: com.itheima.demo.config.MyDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 五、编写生产者发送消息

1:异步发送

@RestController
@Api(tags = "异步接口")
@RequestMapping("/kafka")
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("Topic3", JSON.toJSONString(message));}
}

1:同步发送

//测试同步发送与监听
@RestController
@Api(tags = "同步接口")
@RequestMapping("/kafka")
public class AsyncProducer {private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class);@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;//同步发送@GetMapping("/kafka/sync/{msg}")public void sync(@PathVariable("msg") String msg) throws Exception {Message message = new Message();message.setMessage(msg);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("Topic3", JSON.toJSONString(message));//注意,可以设置等待时间,超出后,不再等候结果SendResult<String, Object> result = future.get(3, TimeUnit.SECONDS);logger.info("send result:{}",result.getProducerRecord().value());}}

六、消费者编写

@Component
public class KafkaConsumer {private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);//不指定group,默认取yml里配置的@KafkaListener(topics = {"Topic3"})public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("message:{}", msg);}}
}

 通过swagger,进行生产者发送消息,观察控制台结果

 至此,一个简单的整合就完成了。

后续会持续更新kafka相关内容(多多关注哦!)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/238907.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图解拒付平台:如何应对用户的拒付

这是《百图解码支付系统设计与实现》专栏系列文章中的第&#xff08;5&#xff09;篇。 本章主要讲清楚支付系统中拒付涉及的基本概念&#xff0c;产品架构、系统架构&#xff0c;以及一些核心的流程和相关领域模型、状态机设计等。 1. 前言 拒付在中国比较少见&#xff0c;但…

MPP架构和分布式架构的区别

前言&#xff1a;对大数据的数据处理需求&#xff0c;当前技术方向上存在两个不同的发展路线&#xff0c;MPP和分布式处理。两者数据处理的基本思路都是一样的&#xff0c;分布式并行处理再合并结果&#xff1b;但由于二者在处理架构上的差异&#xff0c;最终产品在应用需求性能…

LabVIEW在金属铜大气腐蚀预测评价系统中的应用

为了应对电子设备和仪器中金属铜因大气腐蚀带来的挑战&#xff0c;开发一种基于LabVIEW平台的先进预测评价系统。这个系统的设计宗旨是准确预测并评估在不同室内外环境中金属铜的腐蚀状况。我们团队在LabVIEW的强大数据处理和图形化编程支持下&#xff0c;结合实际的大气腐蚀数…

2024Flutter岗位面试题总结

StatelessWidget和StatefulWidget的区别是什么&#xff1f; StatelessWidget是一个不可变的类&#xff0c;充当UI布局中某些部分的蓝图&#xff0c;当某个组件在显示期间不需要改变&#xff0c;或者说没有状态&#xff08;State&#xff09;&#xff0c;你可以使用它。 Statef…

免费的域名要不要?

前言 eu.org的免费域名相比于其他免费域名注册服务&#xff0c;eu.org的域名后缀更加独特。同时&#xff0c;eu.org的域名注册也比较简单&#xff0c;只需要填写一些基本信息&#xff0c;就可以获得自己的免费域名。 博客地址 免费的域名要不要&#xff1f;-雪饼前言 eu.org…

AtCoder Beginner Contest 336 G. 16 Integers(图计数 欧拉路径转欧拉回路 矩阵树定理 best定理)

题目 给16个非负整数&#xff0c;x[i∈(0,1)][j∈(0,1)][k∈(0,1)][l∈(0,1)] 求长为n3的01串的方案数&#xff0c;满足长度为4的ijkl&#xff08;2*2*2*2&#xff0c;16种情况&#xff09;串恰为x[i][j][k][l]个 答案对998244353取模 思路来源 https://www.cnblogs.com/tz…

深度系统QT 环境搭建

1.QT安装 不折腾最新版直接去商店搜索QT安装。 2.修改su密码&#xff0c;安装需要权限 打开一个终端&#xff0c;然后输入下面的命令&#xff1a;按照提示输入密码按回车就行。 sudo passwd 回车后会出现让你输入现在这个账户的密码&#xff1a; 3.编译环境安装。 安…

生活自来水厂污水处理设备需要哪些

生活自来水厂是确保我们日常用水质量安全的重要设施。在自来水的生产过程中&#xff0c;污水处理设备是不可或缺的环节。那么&#xff0c;生活自来水厂的污水处理设备都有哪些呢&#xff1f;本文将为您详细介绍。 首先&#xff0c;生活自来水厂的污水处理设备主要包括预处理设备…

大语言模型系列-总述

大语言模型发展史 研究人员发现&#xff0c;扩展预训练模型&#xff08;Pre-training Language Model&#xff0c;PLM&#xff09;&#xff0c;例如扩展模型大小或数据大小&#xff0c;通常会提高下游任务的模型性能&#xff0c;模型大小从几十亿&#xff08;1 B 10亿&#x…

Linux常用命令大全(三)

系统权限 用户组 1. 创建组groupadd 组名 2. 删除组groupdel 组名 3. 查找系统中的组cat /etc/group | grep -n “组名”说明&#xff1a;系统每个组信息都会被存放在/etc/group的文件中1. 创建用户useradd -g 组名 用户名 2. 设置密码passwd 用户名 3. 查找系统账户说明&am…

MySQL的多表数据记录查询笔记

关系数据操作 合并查询数据记录 在MySQL中通过关键字UNION来实现并操作&#xff0c;即可以通过其将多个SELECT语句的查询结果合并在一起组成新的关系。 两张表&#xff0c;表1 和表2 带有关键字UNION的合并操作 关键字UNION会把查询结果集直接合并在一起&#xff0c;同时将…

力扣每日一题--2088. 统计农场中肥沃金字塔的数目

看到这道题有些人很容易放弃&#xff0c;其实这道题不是很难&#xff0c;主要是题目长&#xff0c;读的容易让人放弃&#xff0c;但是 只要抓住一些性质就可以解决该问题。 本题中的定义放到图像里其实就是个金字塔&#xff0c;下层的那部分比上一层的那部分&#xff0c;长度加…

What does `ResponseEntity` do?

ResponseEntity 作为 Spring MVC controller层 的 HTTP response&#xff0c;包含 status code, headers, body 这三部分。 正常场景 RestController Slf4j public class SearchController {AutowiredUserService userService;RequestMapping(value "/getAllStudents4&…

Androidmanifest文件加固和对抗

前言 恶意软件为了不让我们很容易反编译一个apk&#xff0c;会对androidmanifest文件进行魔改加固&#xff0c;本文探索androidmanifest加固的常见手法以及对抗方法。这里提供一个恶意样本的androidmanifest.xml文件&#xff0c;我们学完之后可以动手实践。 1、Androidmanife…

文心一言 vs. ChatGPT:哪个更胜一筹?

文心一言 vs. ChatGPT&#xff1a;从简洁美到深度思考的文本生成之旅 近年来&#xff0c;文本生成工具的崛起使得人们在表达和沟通方面拥有了更多的选择。在这个领域中&#xff0c;文心一言和ChatGPT作为两个备受瞩目的工具&#xff0c;各自以独特的优势展现在用户面前。本文将…

国际版WPS Office 18.6.1

【应用名称】&#xff1a;WPS Office 【适用平台】&#xff1a;#Android 【软件标签】&#xff1a;#WPS 【应用版本】&#xff1a;18.6.1 【应用大小】&#xff1a;160MB 【软件说明】&#xff1a;软件日常更新。WPS Office是使用人数最多的移动办公软件。独有手机阅读模式…

从零开始的神经网络

了解如何在没有框架帮助的情况下构建神经网络的基础知识&#xff0c;这些框架可能使其更易于使用 在 Python 中创建具有不同架构的复杂神经网络应该是任何机器学习工程师或数据科学家的标准做法。但是&#xff0c;真正了解神经网络的工作原理同样有价值。在本文中&#xff0c;…

代码随想录刷题笔记(DAY11)

今日总结&#xff1a;继续准备期末&#xff0c;今天的算法题目比较简单&#xff0c;晚上看看能不能再整理一篇前端的笔记。 Day 11 01. 有效的括号&#xff08;No. 20&#xff09; 题目链接 代码随想录题解 1.1 题目 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff…

【C++】__declspec含义

目录 一、__declspec(dllexport)如果这篇文章对你有所帮助&#xff0c;渴望获得你的一个点赞&#xff01; 一、__declspec(dllexport) __declspec(dllexport) 是 Microsoft Visual C 编译器提供的一个扩展&#xff0c;用于指示一个函数或变量在 DLL&#xff08;动态链接库&…

非常好用的个人工作学习记事本Obsidian

现在记事本有两大流派&#xff1a;Obsidian 和Notion&#xff0c;同时据说logseq也很不错 由于在FreeBSD下后两种都没有相关ports&#xff0c;所以优先尝试使用Obsidian Obsidian简介 Obsidian是基于Markdown文件的本地知识管理软件&#xff0c;并且开发者承诺Obsidian对于个…