Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门

参考文章

【Docker安装部署Kafka+Zookeeper详细教程】_linux arm docker安装kafka-CSDN博客

Docker搭建kafka+zookeeper

打开我们的docker的镜像源配置

vim /etc/docker/daemon.json

配置

 {
  "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}

 下面的那个insecure是我自己虚拟机的,不用理会

拉取镜像

然后开始拉取我们的zookeeper镜像和我们的kafka镜像

这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本

docker pull zookeeper

kafka镜像 

docker pull wurstmeister/kafka

因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络

让不同容器都加入这个网络

docker network create创建我们的网络

然后那个zookeeper_network是我们自定义的网络名称

docker network create --driver bridge zookeeper_network

kafka是依赖于zookeeper的所以我们要先安装zookeeper

我们先用run来创建一个zookeeper容器

 docker run -d --name zookeeper1  --network zookeeper_network -p 2181:2181   zookeeper

-d 是后台运行

--name 是我们自定义容器的名字  我定义的名字是zookeeper1

--network 

是指定我们的网络环境,我们刚刚创建的网络环境名字叫zookeeper_network,所以我们要让容器加入这个网络

-p 是指定我们的容器暴露给外部的端口  2181:2181是指虚拟机(或服务器)的2181端口与容器内部的2181端口做映射

最后面的那个zookeeper 是我们的使用的镜像源的名称

一般是zookeeper:xxx来执行使用镜像源的版本,如果不指定版本默认用的就是最新版本

查看我们创建的网络环境的地址

docker inspect zookeeper_network

那个IPv4就是我们的网络环境的地址,这是我的网络环境的地址

我的是12.21.0.2,这个ip地址是要记住方便后面使用的
 

创建一个kafka容器

这段代码有点长,根子自己改吧

 # 启动kafka
docker run -d --name kafka1  --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  wurstmeister/kafka

解释 

KAFKA_ZOOKEEPER_CONNECT 后面写的是我们的之前的网络的地址

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我们的虚拟机(服务器)的本机的地址

不知道本机地址可以输入 ip addr来查看本机地址

这样子就搭建完成了


SpringBoot集成kafka

首先就是springboot和kafka的版本兼容了

Spring for Apache Kafka

然后我们引入两个kafka的依赖 

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version>
</dependency>

自己对着自己的版本来

自己看b站视频,9分钟就搞定了

63-kafka-集成-Java场景-SpringBoot_哔哩哔哩_bilibili

然后开始写我们的application.yml配置文件

下面是配置文件的全部+解析

其实和普通mq差不多

也就是配置生产者和消费者和一些过期时间超时时间

重点在于那个missing-topics-fatal

主题不存在的话,我们是否还要成功启动

我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题

所以我启动的时候,报错然后失败了 

spring:kafka:bootstrap-servers: 192.168.88.130:9092  #Kafka 集群的地址和端口号producer:acks: all #生产者发送消息时, Kafka 集群需要确认的确认级别。all 表示需要所有 broker 确认消息已经写入batch-size: 16384  #生产者在发送消息时, 会先缓存一些消息, 达到 batch-size 后再批量发送。这个参数设置了批量发送的大小。buffer-memory: 33554432  #生产者用于缓存消息的内存大小key-serializer: org.apache.kafka.common.serialization.StringSerializer  #定义了消息 key 和 value 的序列化方式。value-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。retries: 0consumer:group-id: test #消费者组ID#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费# earliest:无提交记录,表示从最早的消息开始消费#latest:无提交记录,从最新的消息的下一条开始消费auto-offset-reset: earliest  #当消费者没有提交过 offset 时, 从何处开始消费消息enable-auto-commit: true #是否自动提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交 offset 的间隔时间key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  #定义了消息 key 和 value 的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式max-poll-records: 2  #一次 poll 操作最多返回的消息数量properties:#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}#消费者与 Kafka 服务端的会话超时时间session.timeout.ms: 120000#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡#消费者调用 poll 方法的最大间隔时间max.poll.interval.ms: 300000#消费者发送请求到 Kafka 服务端的超时时间#配置控制客户端等待请求响应的最长时间。#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,#或者如果重试次数用尽,则请求失败。request.timeout.ms: 60000#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置falseallow.auto.create.topics: true#消费者向协调器发送心跳的间隔时间。#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制#仍然会返回该消息,以确保消费者可以进行#每个分区最多拉取的消息字节数。#max.partition.fetch.bytes=1048576  #1Mlistener:#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediate   手动 ACK 的方式。#如果监听的主题不存在, 是否启动失败。missing-topics-fatal: false #如果至少有一个topic不存在,true启动失败。false忽略#消费方式, single 表示单条消费, batch 表示批量消费#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-recordstype: batch#并发消费的线程数concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲#默认的主题名称template:default-topic: "test"#springboot启动的端口号
server:port: 9999 #这个是java项目启动的端口

基本案例

这是常量类

指定了一个topic和group

主题和分组id

groupid是消费者组的唯一标识

这个视频9分钟看懂kafka

小朋友也可以懂的Kafka入门教程,还不快来学_哔哩哔哩_bilibili

生产者

我们这个Autowired自动注入,会根据我们的配置文件的配置来自动注入

@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;

produces里面指定我们前端传的是json格式

 我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"

消费者

 @KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {//因为这个String是Json,所以我们可以转回Object对象,其实是转成JsonObject对象final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("name"));//ack.acknowledge();}}

我们用List<String>来接收,因为可能一个消费者接收多条消息

指定消费者监听的主题topic

以及指定消费者的唯一标识GROUP_ID

这些其实都是自己在常量类里面自己写好的



@Header(KafkaHeaders.RECEIVED_TOPIC) String topic

 这个是得到我们的主题topic的名字

我用apifox调试之后,成功执行了


kafka的图形化工具

这里介绍一个免费的开源项目KafkaKing

Releases · Bronya0/Kafka-King (github.com)

里面还能指定中文

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/376835.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全面升级的对象创建——抽象工厂模式(Python实现和JAVA实现)

1. 引言 大家好&#xff01;在之前的文章中&#xff0c;我们探讨了简单工厂和工厂方法模式&#xff1a; 轻松创建对象——简单工厂模式&#xff08;Python实现&#xff09; 轻松创建对象——简单工厂模式&#xff08;Java实现&#xff09; 灵活多变的对象创建——工厂方法模式…

20240711每日消息队列-------------MQ消息的积压的折磨

目标 解决MQ消息的积压 背景 菜馆系统----------- 系统读取消息&#xff0c;处理业务逻辑&#xff0c;持久化订单和菜品数据&#xff0c;然后将其显示在菜品管理客户端上。 最初我们的用户基数很小&#xff0c;上线后的一段时间内&#xff0c;MQ消息通信还算顺利。 随着用户…

k8s(五)---名称空间

五、名称空间 名称空间是k8s划分不同工作空间的逻辑单位,是k8s资源逻辑隔离的机&#xff0c;。可以给不同的租户&#xff0c;不同的环境、不同的项目创建对应的命名空间。 1、查看名称空间 kubectl get ns kubectl get namespaces 此处展示了四个命名空间 2、管理名称空间 1…

Matlab-Simulink模型保存为图片的方法

有好多种办法将模型保存为图片&#xff0c;这里直接说经常用的 而且贴到Word文档中清晰、操作简单。 simulink自带有截图功能&#xff0c;这两种方法都可以保存模型图片。选择后直接就复制到截切板上了。直接去文档中粘贴就完事了。 这两个格式效果不太一样&#xff0c;第一种清…

[Java IO] 文件的概念与相关操作

一 概念 什么是文件&#xff1f; 文件就是保存数据的地方。 二 文件流 文件在程序中是以流的形式来操作的。 流——数据在数据源&#xff08;文件&#xff09;和程序&#xff08;内存&#xff09;之间经历的路径。 输入流&#xff1a;数据从数据源&#xff08;文件&#…

算法学习day12(动态规划)

一、不同的二叉搜索树 二叉搜索树的性质&#xff1a;父节点比左边的孩子节点都大&#xff1b;比右边的孩子节点都小&#xff1b; 由图片可知&#xff0c;dp[3]是可以由dp[2]和dp[1]得出来的。(二叉搜索树的种类和根节点的val有关) 当val为1时&#xff0c;左边是一定没有节点的…

React18+Redux+antd 项目实战 JS

React18Reduxantd 项目实战 js Ant Design插件官网 Axios官网 (可配置请求拦截器和响应拦截器) JavaScript官网 Echarts官网 一、项目前期准备 1.创建新项目 hotel-manager npx create-react-app hotel-manager2.安装依赖 //安装路由 npm i react-router-domnpm i aixos /…

系统架构师考点--系统安全

大家好。今天我来总结一下系统安全相关的考点&#xff0c;这类考点每年都会考到&#xff0c;一般是在上午场客观题&#xff0c;占2-4分。 一、信息安全基础知识 信息安全包括5个基本要素&#xff1a;机密性、完整性、可用性、可控性与可审查性 (1)机密性&#xff1a;确保信息…

【机器学习理论基础】回归模型定义和分类

定义 回归分析是研究自变量与因变量之间数量变化关系的一种分析方法&#xff0c;它主要是通过因变量 Y Y Y与影响它的自变量 X i X_i Xi​ 之间的回归模型&#xff0c;衡量自变量 X i X_i Xi​ 对因变量 Y Y Y 的影响能力的&#xff0c;进而可以用来预测因变量Y的发展趋势。…

基于Python/MATLAB长时间序列遥感数据处理及在全球变化、植被物候提取、植被变绿与生态系统固碳分析、生物量估算与趋势分析应用

植被是陆地生态系统中最重要的组分之一&#xff0c;也是对气候变化最敏感的组分&#xff0c;其在全球变化过程中起着重要作用&#xff0c;能够指示自然环境中的大气、水、土壤等成分的变化&#xff0c;其年际和季节性变化可以作为地球气候变化的重要指标。此外&#xff0c;由于…

Python3极简教程(一小时学完)下

目录 PEP8 代码风格指南 知识点 介绍 愚蠢的一致性就像没脑子的妖怪 代码排版 缩进 制表符还是空格 每行最大长度 空行 源文件编码 导入包 字符串引号 表达式和语句中的空格 不能忍受的情况 其他建议 注释 块注释 行内注释 文档字符串 版本注记 命名约定 …

WPF学习(4) -- 数据模板

一、DataTemplate 在WPF&#xff08;Windows Presentation Foundation&#xff09;中&#xff0c;DataTemplate 用于定义数据的可视化呈现方式。它允许你自定义如何展示数据对象&#xff0c;从而实现更灵活和丰富的用户界面。DataTemplate 通常用于控件&#xff08;如ListBox、…

鸿蒙语言基础类库:【@ohos.util (util工具函数)】

util工具函数 说明&#xff1a; 本模块首批接口从API version 7开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。开发前请熟悉鸿蒙开发指导文档&#xff1a;gitee.com/li-shizhen-skin/harmony-os/blob/master/README.md点击或者复制转到。 该模块…

Wikijs 部署教程

以下是一个 Wikijs 部署的简单教程&#xff0c;涵盖了使用 Docker 和直接安装两种方式&#xff1a; 方法一&#xff1a; 使用 Docker (推荐) Docker 是一个方便快捷的方式来部署 Wikijs&#xff0c;它可以避免许多手动配置步骤。 安装 Docker: 按照 https://docs.docker.com/…

jvm 07 GC算法,内存池

01 垃圾判断算法 1.1引用计数算法 最简单的垃圾判断算法。在对象中添加一个属性用于标记对象被引用的次数&#xff0c;每多一个其他对象引用&#xff0c;计数1&#xff0c; 当引用失效时&#xff0c;计数-1&#xff0c;如果计数0&#xff0c;表示没有其他对象引用&#xff0c;…

TensorBoard ,PIL 和 OpenCV 在深度学习中的应用

重要工具介绍 TensorBoard&#xff1a; 是一个TensorFlow提供的强大工具&#xff0c;用于可视化和理解深度学习模型的训练过程和结果。下面我将介绍TensorBoard的相关知识和使用方法。 TensorBoard 简介 TensorBoard是TensorFlow提供的一个可视化工具&#xff0c;用于&#x…

代理模式(大话设计模式)C/C++版本

代理模式 C #include <iostream> using namespace std;class Subject // Subject 定义了RealSubject和Proxy的共用接口..这样就在任何使用RealSubject的地方都可以使用Proxy { public:virtual void func(){cout << "Subject" << endl;} };class R…

Leetcode3200. 三角形的最大高度

Every day a Leetcode 题目来源&#xff1a;3200. 三角形的最大高度 解法1&#xff1a;模拟 枚举第一行是红色还是蓝色&#xff0c;再按题意模拟即可。 代码&#xff1a; /** lc appleetcode.cn id3200 langcpp** [3200] 三角形的最大高度*/// lc codestart class Solutio…

《Linux系统编程篇》认识在linux上的文件 ——基础篇

前言 Linux系统编程的文件操作如同掌握了一把魔法钥匙&#xff0c;打开了无尽可能性的大门。在这个世界中&#xff0c;你需要了解文件描述符、文件权限、文件路径等基础知识&#xff0c;就像探险家需要了解地图和指南针一样。而了解这些基础知识&#xff0c;就像学会了魔法咒语…

【Python】数据分析-Matplotlib绘图

数据分析 Jupyter Notebook Jupyter Notebook: 一款用于编程、文档、笔记和展示的软件。 启动命令&#xff1a; jupyter notebookMatplotlib 设置中文格式&#xff1a;plt.rcParams[font.sans-serif] [KaiTi] # 查看本地所有字体 import matplotlib.font_manager a sorted…