Kafka的安装及接入SpringBoot

环境:windows、jdk1.8、springboot2

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/

1.概述

        Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目。它设计用于处理大规模的实时数据流,具有高吞吐量、低延迟、持久性等特点,被广泛应用于构建实时数据管道、日志收集、事件驱动架构等场景。

        详细概述见Kafka概述:

1.1 Kafka的作用

  • 发布和订阅记录流
  • 持久存储记录流,Kafka中的数据即使消费后也不会消失
  • 在系统或应用之间构建可靠获取数据的实时流数据管道
  • 构建转换或响应数据流的实时流应用程序
  • Kafka可以处理源源不断产生的数据

1.2 Kafka的一些概念

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic 就是Rabbitmq中的queue)

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

2.Kafka下载安装

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/downloads        选择最新版就可以

2.1 配置kafka

        解压下载的文件,修改 config 文件夹下的 zookeeper.properties

        修改 config 文件夹下的 server.properties

        当需要外网访问时要配置advertised.listeners(比如连云服务器的kafka)

advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

 

2.2 启动 zookeeper

        Zookeeper 在 Kafka 中充当了分布式协调服务的角色,帮助 Kafka 实现了集群管理、元数据存储、故障恢复、领导者选举等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撑。

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

        可以本地访问看一下:http://localhost:2181/ 

2.3 启动Kafka 

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

kafka-server-start.sh ../../config/server.properties

        访问路径: http://localhost:9092/ 

2.4 便捷启动脚本

        两个脚本放到Kafka的目录(kafka_2.13-3.7.0)中

cd bin\windows

zookeeper-server-start.bat ../../config/zookeeper.properties

cd bin\windows

kafka-server-start.bat ../../config/server.properties

3.springboot集成Kafka

3.1 环境搭建

(1)添加pom依赖

<!-- 继承Spring boot工程 -->
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.8.RELEASE</version>
</parent>
<properties><fastjson.version>1.2.58</fastjson.version>
</properties>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>

(2)配置类application.yml

        生产者:

spring:kafka:bootstrap-servers: xxx.xxx.xxx.xxx:9092producer:retries: 0key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

        消费者:

spring:kafka:bootstrap-servers: xxx.xxx.xxx.xxx:9092consumer:group-id: kafka-demo-kafka-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaApp {public static void main(String[] args) {SpringApplication.run(KafkaApp.class, args);}
}

3.2 消息生产者

        junit测试,新建消息发送方

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
​
​
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSendTest {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate; //如果这里有红色波浪线,那是假错误
​@Testpublic void sendMsg(){String topic = "spring_test";kafkaTemplate.send(topic,"hello spring boot kafka!");System.out.println("发送成功.");while (true){ //保存加载ioc容器
​}}
}

3.3 消息消费者

        新建监听类:

​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
​
@Component
public class MyKafkaListener {
​//    以下两种方法都行// 指定监听的主题
//    @KafkaListener(topics = "spring_test")
//    public void receiveMsg(String message){
//        System.out.println("接收到的消息:"+message);
//    }
​@KafkaListener(topics = "spring_test")public void handleMessage(ConsumerRecord<String, String> record) {System.out.println("接收到消息,偏移量为: " + record.offset() + " 消息为: " + record.value());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/326029.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis继续(黑马)

Redis持久化 RDB与AOF RDB记录是二进制数据&#xff0c;Redis停机时会触发保存&#xff0c;名称&#xff1a; dump.rdb 缺点&#xff1a;间歇式复制可能存在宕机数据更新丢失 AOF 记录的写操作命令&#xff0c;每秒记录一下&#xff0c;也存在数据更新丢失的可能&#xff0c;相…

java学习之zip炸弹攻击

一、概述 Zip炸弹是一种特殊类型的Zip文件&#xff0c;它包含了大量的无用数据。Zip文件格式允许使用压缩算法来减小文件的大小&#xff0c;但是如果Zip文件中的某些内容被重复压缩&#xff0c;就会导致文件大小急剧增加。Zip炸弹利用这个特性&#xff0c;将一些无用的数据多次…

差分约束 C++ 算法例题

差分约束 差分约束 是一种特殊的 n 元一次不等式组&#xff0c;m 个约束条件&#xff0c;可以组成形如下的格式&#xff1a; { x 1 − x 1 ′ ≤ y 1 x 2 − x 2 ′ ≤ y 2 ⋯ x m − x m ′ ≤ y m \begin{cases} x_1-x_1^{} \le y_1 \\ x_2-x_2^{} \le y_2 \\ \cdots \\ x_…

Pygame简单入门教程(绘制Rect、控制移动、碰撞检测、Github项目源代码)

Pygame简明教程 引言&#xff1a;本教程中的源码已上传个人Github: GItHub链接 视频教程推荐&#xff1a;YouTube教程–有点过于简单了 官方文档推荐&#xff1a;虽然写的一般&#xff0c;但还是推荐&#xff01; Navigator~ Pygame简明教程安装pygame一、代码框架二、案件输入…

java项目之车辆管理系统(springboot+vue+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的车辆管理系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 车辆管理系统的主要使用者分…

day-35 被围绕的区域

思路 很明显&#xff0c;只有与边界上的O连接的O才不会X覆盖 解题方法 检测边界上的字符&#xff0c;如果是O则向周围探测&#xff0c;访问与之连接的不会被覆盖的X。边界探测结束后&#xff0c;没有访问过的O皆会被X覆盖 Code class Solution {public int dir[][]{{0,1},{0…

用webui.sh安装报错No module named ‘importlib.metadata‘

安装sdweb报错&#xff0c;出现No module named importlib.metadata&#xff1a; glibc version is 2.35 Cannot locate TCMalloc. Do you have tcmalloc or google-perftool installed on your system? (improves CPU memory usage) Traceback (most recent call last):File…

【Qt 学习笔记】Qt常用控件 | 布局管理器 | 水平布局Horizontal Layout

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Qt 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Qt常用控件 | 布局管理器 | 水平布局Horizontal Layout 文章编号&…

力扣32. 最长有效括号

Problem: 32. 最长有效括号 文章目录 题目描述思路及解法复杂度Code 题目描述 思路及解法 1.创建一个栈&#xff0c;并将-1先添加到栈中&#xff08;添加-1到栈中只是为了方便接下来的操作&#xff09;&#xff0c;定义int变量len用于记录每一个子有效括号的长度&#xff0c;ma…

ctfshow SSRF 351-358

做题前,需要先学习关于ssrf漏洞的相关知识 小注意: 当使用 file_get_contents() 函数访问远程 URL 时&#xff0c;它会尝试获取该 URL 指向的资源的内容&#xff0c;并将内容以字符串的形式返回。 如果 b.php 文件是一个 PHP 文件&#xff0c;它包含的内容取决于该 PHP 文件…

vue3+ant design实现表格数据导出Excel

提示:实现表格数据导出Excel 文章目录 前言 一、安装ant design? 二、引用ant design 1.搭建框架 2.获取表格数据 三、封装导出表格的代码 四、导出 1.获取导出地址 2.在下载导出事件中添加导出代码 五、全部代码 前言 今天终于有时间来更新文章了,最近公司项目比较紧…

遨游 JavaScript 对象星际:探索面向对象编程的深邃世界

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;JavaScript 精粹 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 &#x1f4af;面向对象编程&#x1f517;1 什么是对象&#x1f517;2 什么是…

数智结合,智慧合同让法务管理发挥内在价值

在当今这个信息化、数字化飞速发展的时代&#xff0c;数据已成为企业重要的战略资源。法务管理作为企业内部控制和风险防范的重要环节&#xff0c;其重要性不言而喻。然而&#xff0c;传统的法务管理模式往往存在效率低下、信息孤岛、反应迟缓等问题。在这样的背景下&#xff0…

神卓互联内网穿透之快速创建https类型通道【最新】

神卓互联最近上线了V9.0内网穿透通信传输模式&#xff0c;相比与之前的V8.0在速度和延迟方面确实提升了很多&#xff0c;控制台也进行了改版升级&#xff0c;这里是对升级后的控制台创建https通道方法进行记录&#xff1a; 登录神卓互联控制台 选择【内网穿透】-【映射管理】…

ICode国际青少年编程竞赛- Python-2级训练场-坐标与列表遍历

ICode国际青少年编程竞赛- Python-2级训练场-坐标与列表遍历 1、 for i in range(5):Flyer[i].step(Dev.x -Flyer[i].x) Dev.step(Item.y - Dev.y)2、 for i in range(7):Flyer[i].step(Dev.y - Flyer[i].y) Dev.step(Item[2].x - Dev.x)3、 for i in range(5):Flyer[i].…

软件技术主要学什么课程

软件技术专业主要学习的课程和内容有编程语言、数据结构与算法、数据库技术等&#xff0c;以下是上大学网( www.sdaxue.com)整理的软件技术主要学什么课程&#xff0c;供大家参考&#xff01; 编程语言&#xff1a;掌握一种或多种编程语言&#xff0c;如C#、Java、Python、C等&…

AI绘画神级Stable Diffusion入门教程|快速入门SD绘画原理与安装

什么是Stable Diffusion&#xff0c;什么是炼丹师&#xff1f;根据市场研究机构预测&#xff0c;到2025年全球AI绘画市场规模将达到100亿美元&#xff0c;其中Stable Diffusion&#xff08;简称SD&#xff09;作为一种先进的图像生成技术之一&#xff0c;市场份额也在不断增长&…

QT+MYSQL数据库处理

1、打印Qt支持的数据库驱动&#xff0c;看是否有MYSQL数据库驱动 qDebug() << QSqlDatabase::drivers(); 有打印结果可知&#xff0c;没有MYSQL数据库的驱动 2、下载MYSQL数据库驱动&#xff0c;查看下面的文章配置&#xff0c;亲测&#xff0c;可以成功 Qt6 配置MySQL…

【Linux】centos7安装软件(rpm、yum、编译安装),补充:查找命令的相关文件路径,yum安装mysql

【Linux】技术上&#xff0c;Linux是内核。而术语上&#xff0c;我们通常说的Linux是完整的操作系统&#xff0c;其实称为"Linux发行版"&#xff0c;是将Linux内核和应用系统打包&#xff0c;由不同的发行家族发行了不同版本。Linux发行版众多&#xff0c;主要有RedH…

VScode通过ssh远程连接服务器被拒绝:permission denied, please try again

使用场景&#xff1a; 使用windows系统下的vscode远程连接服务器的linux系统&#xff0c;终端提示permission denied, please try again,但是使用cmd是可以远程登录的。 解决办法&#xff1a; 前提条件windows端的vscode安装了ssh远程连接的相关插件Remote - SSH&#xff0c;…