使用Spring Boot和Kafka实现消息发送和订阅

文章目录

  • 一,新建Spring Boot
    • 1,Maven配置
    • 2,无法识别为SpringBoot项目
    • 3,无效的源发行版
    • 4,无法访问SpringApplication
    • 5,运行直接Finish
    • 6,服务运行成功
  • 二,安装启动Kafka
    • 1,下载
    • 2,配置
    • 3,启动
    • 4,其他命令
  • 三,生产消费消息
    • 1,加入依赖
    • 2,yam配置文件
    • 3,报错enabled mechanisms are []
    • 4,生产者生产消息
    • 5,订阅和消费消息
    • 6,接口
    • 7,测试结果
  • 四,参考博文

一,新建Spring Boot

最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目
在这里插入图片描述
注意Type选Maven,java选8,其他默认
在这里插入图片描述

1,Maven配置

点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来
在这里插入图片描述
在这里插入图片描述

2,无法识别为SpringBoot项目

在maven配置没问题的前提下,IDEA无法识别这是一个Spring Boot项目,倒腾半天,终于发现问题原因所在=======>是Maven版本太高的原因
在这里插入图片描述
把.mvn/wrapper目录下的maven-wrapper.properties文件第一行的版本号降低,比如说降为3.5.4,然后重新点下Maven的同步按钮
在这里插入图片描述

3,无效的源发行版

接下来运行项目报错:java: 无效的源发行版: 14
在这里插入图片描述
修改pom.xml中java.version值为8,原来是17

	<properties><java.version>17</java.version></properties>

4,无法访问SpringApplication

继续运行,继续报错在这里插入图片描述
降低spring-boot-starter-parent版本,原来是3.1.3,改为2.7.2

5,运行直接Finish

继续运行,没报错,服务直接Finished
在这里插入图片描述
需要添加web依赖

 		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

6,服务运行成功

终于,一个空的spring boot项目成功跑起来了,喜极而泣
在这里插入图片描述

二,安装启动Kafka

1,下载

官网=>https://kafka.apache.org/downloads,下载最新版的kafka,目前是3.5.1
在这里插入图片描述

2,配置

解压到D盘Config目录下即完成安装,目录为D:\Config\kafka_2.13-3.5.1
修改配置文件
(1) server.properties

broker.id=1
log.dirs=/Config/kafka_2.13-3.5.1/logs-kafka

(2) zookeeper.properties

dataDir=/Config/kafka_2.13-3.5.1/logs-zookeeper

3,启动

先启动zookeeper

bin\windows\zookeeper-server-start.bat config\zookeeper.properties	

再启动kafka

bin\windows\kafka-server-start.bat config\server.properties

停止的时候,先停止kafka,再停止zookeeper,直接ctrl+c停止

4,其他命令

1,查看topic列表

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

2,查看topic具体信息

bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test

3,创建topic

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

三,生产消费消息

1,加入依赖

 		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2,yam配置文件

application.yaml

spring:profiles:active: dev

application-dev.yaml

server:port: 8082servlet:context-path: /test-kafkaspring:cache:type: ehcacheconfig: classpath:ehcache.xmljpa:database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialectkafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: kafka-demo-kafka-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 10

3,报错enabled mechanisms are []

Connection to node -1 (activate.navicat.com/127.0.0.1:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

在这里插入图片描述
这个错误我本地测试下来是因为没把账号密码配置这块注释掉
在这里插入图片描述

4,生产者生产消息

@Slf4j
@Component
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public String sendMessage(String content) {String topic = "test_topic";kafkaTemplate.send(topic, content).addCallback(success -> {String topic = success.getRecordMetadata().topic();int partition = success.getRecordMetadata().partition();long offset = success.getRecordMetadata().offset();log.info("发送成功:主题:{},分区:{},偏移量:{}",topic,partition,offset);}, failure -> {log.info("发送失败:{}",failure.getMessage());});return "发送成功";}
}

5,订阅和消费消息

一,订阅主题
1,获取消费者

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.util.Properties;/*** kafka消费者配置* @author liuxunming*/
@Configuration
@Component
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;public KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);return consumer;}}

2,订阅topic

 		KafkaConsumer<String, String> consumer = kafkaConfig.createConsumer();consumer.subscribe(Collections.singleton("traffic"));

3,拉取消息

 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {String key = record.key();String value = record.value();log.info("\n收到消息key=>{}\n收到消息value=>{}",key,value);
}

4,消费位移,释放资源

// 提交消费位移
consumer.commitSync();
// 关闭消费者以释放资源
consumer.close();

二,点对点模式

@Slf4j
@Component
public class KafkaConsumer {@KafkaListener(topics = {"test_topic"})public void handlerMsg(String content) {log.info("接收到消息:消息值:{} ",content);}
}

6,接口

@Slf4j
@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@PostMapping("/sendMessage")public String sendMessage(@RequestParam String content) {kafkaProducer.sendMessage(content);return "ok";}
}

7,测试结果

在这里插入图片描述
接收到消息
在这里插入图片描述

四,参考博文

  1. 解决IDEA无法识别SpringBoot项目
  2. SpringBoot从入门到精通(十二)SpringBoot集成Kafka
  3. Kafka的下载安装以及使用
  4. Kafka消息消费流程详解
  5. Kafka之Consumer使用与基本原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/115875.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ROS 2官方文档(基于humble版本)学习笔记(二)

ROS 2官方文档&#xff08;基于humble版本&#xff09;学习笔记&#xff08;二&#xff09; 理解节点&#xff08;node&#xff09;ros2 runros2 node list重映射&#xff08;remap&#xff09;ros2 node info 理解话题&#xff08;topic&#xff09;rqt_graphros2 topic listr…

visual studio编写DLL,python调用

选择第一个c DLL&#xff0c; 然后项目源文件下右击新建项&#xff0c;这里名字随便取&#xff0c;在代码中输入一下内容&#xff1a; #include <iostream>#define EXPORT extern "C" __declspec(dllexport)EXPORT int sub(int a, int b) {return a - b; } 在…

【JS案例】JS实现积分抽奖(内附源码)

JS案例实现积分抽奖 &#x1f31f;效果展示 &#x1f31f;HTML结构 &#x1f31f;CSS样式 &#x1f31f;实现思路 &#x1f31f;具体实现 1.定义抽奖次数渲染 2.点击抽奖按钮,实现滚动抽奖效果 3.弹窗处理 &#x1f31f;完整代码 &#x1f31f;写在最后 &#x1f3…

docker安装jenkins

1.查看Jenkins LTS 版本 Jenkins官网&#xff1a;https://www.jenkins.io/zh/download/ 2.拉取jenkins镜像 docker pull jenkins/jenkins:2.346.13.创建挂载数据卷 创建/home/software/jenkins/jenkins_home文件夹 mkdir /home/software/jenkins/jenkins_home赋予/home/so…

Ubuntu20以上高版本如何安装低版本GCC

安装了Ubuntu 20.04之后&#xff0c;通过命令行 sudo apt-get install build-essential安装gcc&#xff0c;再通过命令行 gcc -v可查看gcc版本为gcc13 如果想用低版本的gcc&#xff0c;比如gcc4.8&#xff0c;尝试输入命令 sudo apt-get install gcc-4.8会提示找不到gcc4.8的…

0基础学习VR全景平台篇 第93篇:智慧景区教程

一、上传素材 1.上传全景素材 第一步&#xff1a;进入【素材管理】 第二步&#xff1a;选择【全景图智慧景区】分类 第三步&#xff1a;选择相对景区作品分组&#xff0c;上传全景素材 2.素材标注 第一步&#xff1a;选择上传成功后素材&#xff0c;点击【未标注】 第二步&…

HTTP与SOCKS5的区别对比

在互联网世界中&#xff0c;服务器是一种重要的工具&#xff0c;可以帮助我们提高网络安全性等。今天&#xff0c;我们将重点关注两种常见的技术&#xff1a;HTTP和SOCKS5。让我们深入了解它们的工作原理、用途和优缺点&#xff0c;并通过Python代码示例学习如何使用它们。 HT…

微信小程序 - 2023年最新版手机号快捷登录详细教程

前言 最近开发公司手机快捷登录的功能&#xff0c;花费了不少时间&#xff0c;这里附上详细教程。 这里以海底捞小程序的图片为例&#xff0c;如有侵权请联系小编删除。 代码如下 <button open-type"getPhoneNumber" getphonenumber"getPhoneNumber"…

java+springboot+mysql校园跑腿管理系统

项目介绍&#xff1a; 使用javaspringbootmysql开发的校园跑腿管理系统&#xff0c;系统包含超级管理员&#xff0c;系统管理员、用户角色&#xff0c;功能如下&#xff1a; 超级管理员&#xff1a;管理员管理&#xff1b;用户管理&#xff08;充值&#xff09;&#xff1b;任…

全新UI站长在线工具箱系统源码带后台开源版

该系统的全开源版本可供下载&#xff0c;并且支持暗黑模式。 系统内置高达72种站长工具、开发工具、娱乐工具等功能。此系统支持本地调用API&#xff0c;同时还自带免费API接口&#xff0c; 是一个多功能性工具程序&#xff0c;支持后台管理、上传插件、添加增减删功能。 环…

【Python】Web学习笔记_flask(7)——Jinja2模板(1)

Jinja2是基于python的模板引擎&#xff0c;功能类似于PHP的amarty、J2ee的Freemarker和velocity&#xff0c;完全支持Unicode&#xff0c;并具有集成的沙箱执行环境&#xff0c;Jinja2使用的事BSD协议&#xff0c;允许使用者修改和重新发布代码&#xff0c;也允许使用或在BSD代…

kafka 命令脚本说明以及在java中使用

一、命令行使用 1.1、topic 命令 1、关于topic,这里用window 来示例 bin\windows\kafka-topics.bat2、创建 first topic,五个分区&#xff0c;1个副本 bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 -…

Qt各个版本下载及安装教程(离线和非离线安装)

Qt各个版本下载链接&#xff1a; Index of /archive/qthttps://download.qt.io/archive/qt/ 离线安装 &#xff0c;离线安装很无脑&#xff0c;下一步下一步就可以。 我离线下载 半个小时把2G的exe下载下来了

frida动态调试入门02——hook加密函数

说明 frida是一款Python工具可以方便对内存进行hook修改代码逻辑在移动端安全和逆向过程中常用到。 前置知识 frida动态调试入门01——定位关键代码 https://blog.csdn.net/qq_41690468/article/details/132607065 定位函数 关键函数 String code RequestUtil.paraMap(ad…

php开发环境搭建_宝塔、composer

宝塔面板下载&#xff0c;免费全能的服务器运维软件 一 下载宝塔面板 解压安装 登录之后修改安全入口 1 进入软件商店下载nginx,mysql5.6,php7.2 2 将php的安装路径配置到环境变量中 此电脑--右键--点击属性---高级系统设置---环境变量---系统变量path---添加确定 输入php -v…

网络渗透day6-面试01

&#x1f609; 和渗透测试相关的面试问题。 介绍 如果您想自学网络渗透&#xff0c;有许多在线平台和资源可以帮助您获得相关的知识和技能。以下是一些受欢迎的自学网络渗透的平台和资源&#xff1a; Hack The Box: Hack The Box&#xff08;HTB&#xff09;是一个受欢迎的平…

ffmpeg 配合Fiddler抓包获取视频操作

一&#xff0e;获取普通网站视频 1.安装Fiddler软件&#xff0c;直接点击绿色软件中Fiddler.exe&#xff0c;打开即可 2.打开后需要设置一下https解码 3.打开普通视频&#xff0c;获取视频链接在网页打开即可 二&#xff0e;获取一级反爬网站视频 1.随便找一个video/mp…

专访远航汽车远勤山:踏踏实实做好产品 直面挑战乘风远航

8月25日&#xff0c;第二十六届成都国际汽车展览会在中国西部国际博览城隆重开幕。车展举办期间&#xff0c;远航汽车董事长远勤山先生、产品研发总监王震先生向媒体分享了远航汽车品牌发展、产品研发、技术创新以及市场布局等内容。 “通过我们的付出和努力&#xff0c;让我们…

使用 SQL 的方式查询消息队列数据以及踩坑指南

Pulsar-sql.png 背景 为了让业务团队可以更好的跟踪自己消息的生产和消费状态&#xff0c;需要一个类似于表格视图的消息列表&#xff0c;用户可以直观的看到发送的消息&#xff1b;同时点击详情后也能查到消息的整个轨迹。 消息列表 点击详情后查看轨迹 原理介绍 由于 Pulsar …

51单片机智能电风扇控制系统proteus仿真设计( 仿真+程序+原理图+报告+讲解视频)

51单片机智能电风扇控制系统仿真设计( proteus仿真程序原理图报告讲解视频&#xff09; 讲解视频1.主要功能&#xff1a;2.仿真3. 原理图4. 程序代码5.设计报告6. 设计资料内容清单 51单片机智能电风扇控制系统仿真设计( proteus仿真程序原理图报告讲解视频&#xff09; 仿真图…