spring boot3 kafka集群搭建到使用

首先自行安装docker,通过docker容器安装kafka
CentOS 系统 docker安装地址

 1.pom.xml和application.properties或者application.yml文件配置

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
spring:kafka:bootstrap-servers: [fafka地址1,fafka地址2,....]
#    producer序列化设置producer:#key序列化设置,设置成json对象
#      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
#    val序列化设置,设置成json对象value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

2.博主安装了kafka ui插件,就直接创建主题了

当前一个集群,因为博主只搭建了一台服务器,也可以称为一个节点

创建主题

没有安装kafka ui,就再main那里启动项目时创建

package com.atguigu.boot3_08_kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;@EnableKafka //扫描kafka注解,开启基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {public static void main(String[] args) {SpringApplication.run(Boot308KafkaApplication.class, args);TopicBuilder.name("my-new-topic")//主题.partitions(3)//分区.replicas(2)//副本.build();}}

副本就是备份,有几节点就可以创建几个副本,副本数量一般采取分区数量-1,只有一个节点就N分区1副本


 3.在main 加上这个注解@EnableKafka

package com.atguigu.boot3_08_kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;@EnableKafka //扫描kafka注解,开启基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {public static void main(String[] args) {SpringApplication.run(Boot308KafkaApplication.class, args);}}

4.生产者发送消息

package com.atguigu.boot3_08_kafka.controller;import com.atguigu.boot3_08_kafka.entity.Person;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/jjj") public String hello() {kafkaTemplate.send("tach", 0,"hello","急急急132");//send("主题", 分区号,"key","val")return "ok";}@GetMapping("/odj")public String odj() {kafkaTemplate.send("tach", 0,"hello",new Person(1L,"odj",19));//对象json需要序列化,可用配置文件配置,也可以在对象中序列化对象return "OK";}
}

5.消费者监听消息

package com.atguigu.boot3_08_kafka.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;@Component
public class MykafkaListener {/*** 默认的监听是从最后一个消息开始拿,也就是只会拿新消息,不会拿历史的* @KafkaListener(topics = "主题",groupId = "用户组")* ConsumerRecord 消费者从 Kafka 获取消息的各种元数据和实际的消息* @param record*/@KafkaListener(topics = "tach",groupId = "teach")public void listen(ConsumerRecord<?, ?> record) {Object key = record.key();Object val = record.value();System.out.println("收到值key:"+key+"收到值val:"+val);}/***  想要到历史的消息或者全部消息,只能设置偏移量*  @KafkaListener(groupId = "用户组" ,topicPartitions = {设置分区,设置偏移量})*  @TopicPartition(topic = "主题" ,partitionOffsets 设置偏移量)*  @PartitionOffset(partition = "哪个分区", initialOffset = "从第几个偏移量开始")** @param record*/@KafkaListener(groupId = "teach" ,topicPartitions = {@TopicPartition(topic = "tach" ,partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "0")})})public void listens(ConsumerRecord<?, ?> record) {Object key = record.key();Object val = record.value();System.out.println("收到值key:"+key+"收到值val:"+val);}
}

最后查看结果


最后补充一个小知识

groupId = "用户组"

组里的成员是竞争模式

用户组和用户组之间是发布/订阅模式

由zookeeper分配管理

好了可以和面试官吹牛逼了


课外话

如果是传对象json需要序列化,创建对象时序列化,不推荐太原始重要是很占资源

因为开始我们都配置好了,有对象就会自动序列化

package com.atguigu.boot3_08_kafka.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person implements Serializable {//不推荐implements Serializable private Long id;private String name;private Integer age;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/35469.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker的anythingllm和open-webui压缩包分享(国内镜像拉取,百度云压缩包分享)

文章目录 前言第一部分&#xff1a;镜像获取&#x1f680; 方式一&#xff1a;切换国内下载镜像✅1. 下载anythingllm✅ 2. 下载open-webui &#x1f680;方式二&#xff1a;下载我分享的百度云✅ anythingllm压缩包百度云链接❎ open-webui压缩包 第二部分&#xff1a;下载之后…

【VBA】excel获取股票实时行情(历史数据,基金数据下载)

文章目录 0. 效果展示与获取其它相关内容&#xff1a; 1. Excel VBA 自动化与对象模型2. HTTP 请求与 API 数据获取3. JSON 数据解析与字符串处理4. 自动任务调度与实时刷新5. 错误处理与健壮性设计 0. 效果展示与获取 作品&#xff1a;https://mbd.pub/o/bread/aJaUmplq 需要…

docker的使用

时间&#xff1a;2025.3.17 一、当我们想要运行一个容器时&#xff0c;不是在containers处&#xff0c;而是需要在images处找对应容器的镜像 操作步骤&#xff1a; 1.找容器镜像 2.找到容器镜像&#xff0c;通过pull下载到当前主机中 3.下载成功后进行运行 4.运行时的容器镜像…

本地部署deepseek-r1建立向量知识库和知识库检索实践【代码】

目录 一、本地部署DS 二、建立本地知识库 1.安装python和必要的库 2.设置主目录工作区 3.编写文档解析脚本 4.构建向量数据库 三、基于DS,使用本地知识库检索 本地部署DS,其实非常简单,我写了一篇操作记录,我终于本地部署了DeepSeek-R1(图文全过程)-CSDN博客 安装…

Matlab 汽车传动系统的振动特性分析

1、内容简介 Matlab 186-汽车传动系统的振动特性分析 可以交流、咨询、答疑 2、内容说明 略 摘要&#xff1a;汽车动力传动系统是一个具有多自由度的、连续的、有阻尼系统。传动系统的振动主要有横向振动、扭转振动、纵向振动。并且汽车传动系统的扭转振动是一个非常重要的振…

【C++】树和二叉树的实现(上)

本篇博客给大家带来的是用C语言来实现数据结构树和二叉树的实现&#xff01; &#x1f41f;&#x1f41f;文章专栏&#xff1a;数据结构 &#x1f680;&#x1f680;若有问题评论区下讨论&#xff0c;我会及时回答 ❤❤欢迎大家点赞、收藏、分享&#xff01; 今日思想&#xff…

k8s环境部署

四台机器 分别是 k8s-master&#xff1a;172.25.254.100 k8s-node1&#xff1a;172.25.254.10 k8s-node2&#xff1a;172.25.254.20 docker-harbor&#xff1a;172.25.254.200 reg.timinglee.org 四台机器分别配置好网络和软件仓库 做好地址解析 scp -r /etc/hosts/ root17…

transformer bert 多头自注意力

输入的&#xff08;a1,a2,a3,a4&#xff09;是最终嵌入&#xff0c;是一个(512,768)的矩阵&#xff1b;而a1是一个token&#xff0c;尺寸是768 a1通过Wq权重矩阵&#xff0c;经过全连接变换得到查询向量q1&#xff1b;a2通过Wk权重矩阵得到键向量k2&#xff1b;q和k点乘就是值…

它,让机器人与HMI屏无缝对接

随着工业自动化向智能化发展&#xff0c;机器人与HMI屏的通信变得至关重要。本文将为您介绍一款创新的解决方案&#xff0c;它打破了通信协议的壁垒&#xff0c;实现机器人与HMI屏的无缝连接。 随着工业自动化向智能化的迈进&#xff0c;生产制造业正加速引入大量工业机器人以替…

MySQL 锁

MySQL中最常见的锁有全局锁、表锁、行锁。 全局锁 全局锁用于锁住当前库中的所有实例&#xff0c;也就是说会将所有的表都锁住。一般用于做数据库备份的时候就需要添加全局锁&#xff0c;数据库备份的时候是一个表一个表备份&#xff0c;如果没有加锁的话在备份的时候会有其他的…

win10 c++ VsCode 配置PCL open3d并显示

win10 c VsCode配置PCL open3d并显示 一、效果图二、配置步骤2.1 安装vscode2.2 pcl-open3d配置2.3 vscode中设置 三、测试代码四、注意事项及后续 一、效果图 二、配置步骤 2.1 安装vscode vscode下载链接 下载中文插件、c相关插件 2.2 pcl-open3d配置 1&#xff09;下载…

Python----计算机视觉处理(Opencv:图像颜色替换)

一、开运算 开运算就是对图像先进行腐蚀操作&#xff0c; 然后进行膨胀操作。开运算可以去除二值化图中的小的噪点&#xff0c;并分离相连的物体。 其主要目的就是消除那些小白点 在开运算组件中&#xff0c;有一个叫做kernel的参数&#xff0c;指的是核的大小&#xff0c;通常…

泰勒·斯威夫特(Taylor Swift)的音乐影响力与商业版图深度研究

泰勒斯威夫特的音乐影响力与商业版图深度研究 简介 泰勒斯威夫特&#xff08;Taylor Swift&#xff09;是当今流行音乐领域最具影响力的全球巨星之一。自少年时期出道以来&#xff0c;她在音乐风格、形象和商业战略上不断演变&#xff0c;从乡村音乐新人成长为引领流行文化的…

完全托管的DeepSeek-R1模型正式登陆Amazon Bedrock:安全部署与使用指南

文章目录 摘要一、核心优势&#xff1a;完全托管与企业级安全二、部署注意事项三、实践指南&#xff1a;从接入到调用四、支持区域与定价五、结语 摘要 DeepSeek-R1模型已在Amazon Bedrock平台正式上线&#xff0c;支持通过Bedrock Marketplace和自定义模型导入功能调用。 该模…

Matlab 汽车ABS实现模糊pid和pid控制

1、内容简介 Matlab 181-汽车ABS实现模糊pid和pid控制 可以交流、咨询、答疑 2、内容说明 略 实现汽车防抱死制动系统&#xff08;ABS&#xff09;的控制算法&#xff0c;通常涉及到传统的PID控制和模糊PID控制两种方法。下面将分别介绍这两种控制策略的基本概念以及如何在M…

Spring IOC(五个类注解)

controller、service、Repository、Component 、Configurationpackage com.java.ioc;import com.java.ioc.Controller.HelloController; import com.java.ioc.rep.UserRepository; import com.java.ioc.service.UserService; import org.springframework.boot.SpringApplicatio…

[Java实战]Spring Boot服务CPU 100%问题排查:从定位到解决

Spring Boot服务CPU 100%问题排查&#xff1a;从定位到解决 1. 引言 当Spring Boot服务出现CPU占用率100%时&#xff0c;系统性能会急剧下降&#xff0c;甚至导致服务不可用。本文将通过真实代码案例&#xff0c;详细讲解如何快速定位问题根源&#xff0c;并提供解决方案。无…

机器学习扫盲系列(2)- 深入浅出“反向传播”-1

系列文章目录 机器学习扫盲系列&#xff08;1&#xff09;- 序 机器学习扫盲系列&#xff08;2&#xff09;- 深入浅出“反向传播”-1 文章目录 前言一、神经网络的本质二、线性问题解析解的不可行性梯度下降与随机梯度下降链式法则 三、非线性问题激活函数 前言 反向传播(Ba…

LabVIEW 线性拟合

该 LabVIEW 程序实现了 线性拟合&#xff08;Linear Fit&#xff09;&#xff0c;用于计算给定一组数据点的斜率&#xff08;Slope&#xff09;和截距&#xff08;Intercept&#xff09;&#xff0c;并将结果可视化于 XY Graph 中。本案例适用于数据拟合、实验数据分析、传感器…

XSS漏洞靶场---(复现)

XSS漏洞靶场—&#xff08;复现&#xff09; 反射型 XSS 的特点是攻击者诱导用户点击包含恶意脚本的 URL&#xff0c;服务器接收到请求后将恶意脚本反射回响应页面&#xff0c;浏览器执行该脚本从而造成攻击&#xff0c;恶意脚本不会在服务器端存储。 Level 1(反射型XSS) 此漏…