Linux系统上搭建高可用Kafka集群(使用自带的zookeeper)

本次在CentOS7.6上搭建Kafka集群

Apache Kafka 是一个高吞吐量的分布式消息系统,被广泛应用于大规模数据处理和实时数据管道中。本文将介绍在CentOS操作系统上搭建Kafka集群的过程,以便于构建可靠的消息处理平台。

文件分享(KafkaUI、kafka3.3.2、jdk1.8)

链接:https://pan.baidu.com/s/1dn_mQKc1FnlQvuSgGjBc1w?pwd=t2v9 提取码:t2v9

也可以从官网自己下

Kafka官网

步骤概览

本次使用三台机器 hostname别名分别为res01(10.41.7.41)、res02(10.41.7.42)、res03(10.41.7.43)

在这个教程中,我们将覆盖以下主要步骤:

  1. 准备环境:安装和配置Java、Zookeeper和Kafka所需的依赖。
  2. 配置Zookeeper集群:确保Kafka有可靠的分布式协调服务。(本次使用Kafka自带的zookeeper)
  3. 配置Kafka集群:在每个节点上安装和配置Kafka,设置Kafka集群以实现高性能和高可用性。

步骤详解

步骤1:环境准备 (先对第一台服务器操作 其他的使用scp传输之后进行小修改就好)

安装Java

确保Java安装正确,并设置JAVA_HOME环境变量。

如果对于jdk安装有问题的可以看一下这篇Linux安装MySQL、JDK(含环境变量配置)、Tomcat

步骤2:安装Kafka

下载和解压Kafka
[root@res01 module]# clear
[root@res01 module]# ll
总用量 104124
drwxr-xr-x. 4 root root        40 11月 10 10:07 data
-rw-r--r--. 1 root root 106619987 11月 10 10:33 kafka_2.13-3.3.2.tgz
[root@res01 module]# tar -zxvf kafka_2.13-3.3.2.tgz 
解压之后通过mv改名

步骤3:配置zookeeper

进入文件夹kafka3.3.2中找到config

接下来主要修改zookeeper.properties和server.properties这两个文件

zookeeper.properties如下

# 需要去新建/opt/module/data/zookeeper下面这两个文件夹
dataDir=/opt/module/data/zookeeper/data
dataLogDir=/opt/module/data/zookeeper/logs
clientPort=12181
maxClientCnxns=0
admin.enableServer=false
tickTime=2000
initLimit=10
syncLimit=5
# server.X=hostname:peerPort:leaderPort
# peerPort 是服务器之间通信的端口。
# leaderPort 是用于选举 leader 的端口。
server.1=res01:12182:12183
server.2=res02:12182:12183
server.3=res03:12182:12183#res01、res02、res03是我本地设置过的主机名 如果没设置使用ip地址即可
在每个节点上zookeeper的配置文件中dataDir目录下创建一个名为myid的文件,并分别填入相应节点的ID号:123

步骤4:配置Kafka

编辑Kafka配置文件config/server.properties,设置broker.idzookeeper.connect

# 设置 broker.id 这个是 Kafka 集群区分每个节点的唯一标志符。 对应那个myid即可
broker.id=1
# 将监听端口设置为19091
listeners=PLAINTEXT://res01:19091# 将广告给客户端的地址也设置为19091
advertised.listeners=PLAINTEXT://res01:19091num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## 设置 Kafka 的数据存储路径 这个目录下不能有其他非 Kafka 目录,不然会导致 Kafka 集群无法启动。
log.dirs=/opt/module/data/kafka-log
# 默认的 Partition 的个数。
num.partitions=3
# 设置默认的复制因子为3
default.replication.factor=3num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# Kafka 的数据保留的时间,默认是 7 天 168h。 这里使用24小时
log.retention.hours=24log.retention.check.interval.ms=300000
# Kafka 连接的 ZooKeeper 的地址和连接 Kafka 的超时时间。
zookeeper.connect=res01:12181,res02:12181,res03:12181
zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0
# 设置是否可以删除 Topic,默认 Kafka 的 Topic 是不允许删除的 这里打开了
delete.topic.enable=true# 这是用于启用或禁用日志清理的选项,默认值为 true,以确保 Kafka 持续进行日志清理。需要根据实际需求进行设置。
log.cleaner.enable=true
# 这个参数控制日志清理线程的数量。对于你的硬件配置,你可以考虑设置为 4 或 8 来充分利用服务器的性能。
log.cleaner.threads=4
# 这个参数用于控制日志清理线程的 IO 缓冲区大小。对于你的硬件配置,可以设置为 8192 或 16384。
log.cleaner.io.buffer.size=8192
# 这个参数是用来设置主题日志保留的最大字节数。对于控制磁盘空间的使用非常重要。例如,如果你希望限制每个主题的数据量不超过 100GB,可以设置为 107374182400
log.retention.bytes=107374182400
# 这个参数用于控制每个日志段文件的最大大小。对于你的硬件配置,你可以设置为 1073741824(即 1GB)。
log.segment.bytes=1073741824
# 这个参数用于设置 Zookeeper 会话的超时时间。对于较大的集群和连接较慢的网络,你可以考虑将其设置为 10000,即 10 秒。
zookeeper.session.timeout.ms=10000

重点是这个:

# 设置默认的复制因子为3
default.replication.factor=3

在Kafka集群的每个节点上,修改broker.id为对应的节点ID。

配置kafka环境变量

#java环境
export JAVA_HOME=/usr/local/java/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
#kafka环境
export KAFKA_HOME=/opt/module/kafka3.3.2
export PATH=$PATH:$KAFKA_HOME/bin

步骤5:复制虚拟机

(已有其他服务器的直接连网线scp就好 环境变量 配置小改一下就好 还有hosts、ip等等别忘了配置,我这里直接复制虚拟机了 )

配置ip、hostname以及hosts后尝试ping

res02修改kafka的config文件即可(zookeeper配置文件都一样 用res01配置的就好)

res03的kafka配置文件同理

以及各台机器的myid(对应上brokerId即可)

步骤6:配置Kafka集群

确保防火墙或安全组允许Kafka端口通过,通常是9092端口。(我这是修改过的为19091,我直接关防火墙了 方便。)

systemctl stop firewalld.service
#关闭运行的防火墙
systemctl disable firewalld.service
#永久关闭防火墙
本次使用的是绝对路径 各位可以到kafka目录下执行命令 去掉前面的绝对路径就好
zookeeper命令

在每个机器上,先启动zookeeper:

/opt/module/kafka3.3.2/bin/zookeeper-server-stop.sh
#停止命令
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh  /opt/module/kafka3.3.2/config/zookeeper.properties
#启动命令
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh -daemon /opt/module/kafka3.3.2/config/zookeeper.properties
#后台启动命令 常用~
统一启动后jps查看进程

Kafka命令
在每个Kafka节点上,启动Kafka服务器:
/opt/module/kafka3.3.2/bin/kafka-server-start.sh  /opt/module/kafka3.3.2/config/server.properties
#kafka启动命令
/opt/module/kafka3.3.2/bin/kafka-server-start.sh -daemon /opt/module/kafka3.3.2/config/server.properties
#kafka后台启动命令 常用~
/opt/module/kafka3.3.2/bin/kafka-server-stop.sh
#停止命令
统一启动后jps查看进程
创建主题

使用kafka-topics.sh命令创建一个主题:这里设置的复制因子为3 

bin/kafka-topics.sh --create --topic 你的topic--bootstrap-server res01:19091--replication-factor 3 --partitions 3

验证Kafka集群

使用生产者和消费者验证Kafka集群的功能:

# 启动生产者
bin/kafka-console-producer.sh --topic myTopic --bootstrap-server res01:19091# 启动消费者
bin/kafka-console-consumer.sh --topic myTopic --bootstrap-server res01:19091--from-beginning

停止 Zookeeper:
/opt/module/kafka3.3.2/bin/zookeeper-server-stop.sh
启动 Zookeeper:
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh  /opt/module/kafka3.3.2/config/zookeeper.properties
后台启动 Zookeeper:
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh -daemon /opt/module/kafka3.3.2/config/zookeeper.properties
清空 Kafka 日志:
rm -rf //opt/module/data/kafka-logs/*
启动 Kafka 服务:
/opt/module/kafka3.3.2/bin/kafka-server-start.sh  /opt/module/kafka3.3.2/config/server.properties
后台启动 Kafka 服务:
/opt/module/kafka3.3.2/bin/kafka-server-start.sh -daemon /opt/module/kafka3.3.2/config/server.properties
停止 Kafka 服务:
/opt/module/kafka3.3.2/bin/kafka-server-stop.sh
创建 Topic:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --create --topic [TOPIC_NAME] --bootstrap-server [SERVER_IP]:[PORT] --partitions [PARTITIONS_SIZE] --replication-factor [REPLICATION_FACTOR]
删除 Topic:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --delete --topic [TOPIC_NAME] --bootstrap-server [SERVER_IP]:[PORT]
查看 Topic 信息:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --describe --topic [TOPIC_NAME] --bootstrap-server [SERVER_IP]:[PORT]
列出所有的 Topic:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --list --bootstrap-server [SERVER_IP]:[PORT]
控制台生产消息:
/opt/module/kafka3.3.2/bin/kafka-console-producer.sh --bootstrap-server [SERVER_IP]:[PORT] --topic [TOPIC_NAME]
控制台消费信息:
/opt/module/kafka3.3.2/bin/kafka-console-consumer.sh --bootstrap-server [SERVER_IP]:[PORT] --topic [TOPIC_NAME] --from-beginning
查看副本:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --describe --bootstrap-server [SERVER_IP]:[PORT] | grep consumer_offsets
请记住替换 [TOPIC_NAME]、[SERVER_IP]:[PORT]、[PARTITIONS_SIZE]、[REPLICATION_FACTOR] 等位中的值为实际的值。

自己的做的总结如上:

运行Java代码生成topic 可以看到分区都是3符合集群要求

运行kafkaui查看详细情况(百度网盘链接里有,自己输入命令太累了 直接用别人封装好现成的看就好~)

结论

通过这个步骤,我们成功地搭建了一个基本的Kafka集群。在实际生产环境中,您可能需要进一步调整和优化配置,以满足特定需求和性能要求。

希望这个教程可以帮助您成功搭建Kafka集群,为您的数据处理和消息传递架构提供强大的基础设施。

最后温馨提示:如果你远程服务器起了别名,而自己电脑的hosts别名对应其他的服务器 也会发生报错 记得别名对应好ip即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/189503.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode421. 数组中两个数的最大异或值

Every day a Leetcode 题目来源:421. 数组中两个数的最大异或值 解法1:贪心 位运算 初始化答案 ans 0。从最高位 high_bit 开始枚举 i,也就是 max⁡(nums) 的二进制长度减一。设 newAns ans 2i,看能否从数组 nums 中选两个…

servlet 的XML Schema从哪边获取

servlet 6.0的规范定义: https://jakarta.ee/specifications/servlet/6.0/ 其中包含的三个XML Schema:web-app_6_0.xsd、web-common_6_0.xsd、web-fragment_6_0.xsd。但这个页面没有给出下载的链接地址。 正好我本机有Tomcat 10.1.15版本的源码&#…

头歌答案--爬虫实战

目录 urllib 爬虫 第1关:urllib基础 任务描述 第2关:urllib进阶 任务描述 requests 爬虫 第1关:requests 基础 任务描述 第2关:requests 进阶 任务描述 网页数据解析 第1关:XPath解析网页 任务描述 第…

基于公共业务提取的架构演进——外部依赖防腐篇

1.背景 有了前两篇的帐号权限提取和功能设置提取的架构演进后,有一个问题就紧接着诞生了,对于诸多业务方来说,关键数据源的迁移如何在各个产品落地? 要知道这些数据都很关键: 对于帐号,获取不到帐号信息是…

如何在后台执行 SwiftData 操作

文章目录 前言Core Data 私有队列上下文SwiftData 并发支持使用 ModelActor合并上下文更改的问题通过标识符访问模型总结 前言 SwiftData 是一个用于处理数据操作的框架,特别是在 Swift 语言中进行并发操作。本文介绍了如何在后台执行 SwiftData 操作以及与 Core D…

基于springboot的在线文档管理系统

基于springboot的在线文档管理系统 摘要 基于Spring Boot的在线文档管理系统是一种通过使用Spring Boot框架构建的现代化应用程序,旨在有效地组织、存储和分享文档内容。该系统通过利用Spring Boot的快速开发和简化配置的优势,提供了一个稳健的基础架构&…

某手游完整性校验分析

前言 只是普通的单机手游,广告比较多,所以分析处理了下,校验流程蛮有意思的,所以就分享出来了 1.重打包崩溃处理 样本进行了加固,对其dump出dex后重打包出现崩溃 ida分析地址发现为jni函数引起 利用Xposed直接替换…

Yolo自制detect训练

Install 把代码拉下来 GitHub - ultralytics/yolov5 at v5.0 然后 pip install -r requirements.txt 安装完了,运行一下detect.py即可 结果会保存在对应的目录下 Intro ├── data:主要是存放一些超参数的配置文件(这些文件(yaml文件)是用来配置训练集和测试集还有验…

基于讯飞星火大语言模型开发的智能插件:小策问答

星火大语言模型是一种基于深度学习的自然语言处理技术,它能够理解和生成人类语言。这种模型的训练过程涉及到大量的数据和复杂的算法,但最终的目标是让机器能够像人一样理解和使用语言。 小策问答是一款基于星火大语言模型的定制化GPT插件小工具。它的主…

ios安全加固 ios 加固方案

​ 目录 一、iOS加固保护原理 1.字符串混淆 2.类名、方法名混淆 3.程序结构混淆加密 4.反调试、反注入等一些主动保护策略 二 代码混淆步骤 1. 选择要混淆保护的ipa文件 2. 选择要混淆的类名称 3. 选择要混淆保护的函数,方法 4. 配置签名证书 5. 混淆和测…

ida81输入密码验证算法分析以及破解思路

本文分析了ida81对输入密码的验证流程,分别对输入密码到生成解密密钥、密码素材的生成过程以及文件数据的加密过程这三个流程进行分析,并尝试找一些可利用的破绽。很遗憾,由于水平有限,目前也只是有个思路未能完全实现&#xff0c…

腾讯域名优惠卷领取

腾讯域名到到期了,听说申请此计划,可获得优惠卷,看到网上5年域名只需要10元,姑且试试看。 我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?in…

“辛巴猫舍”内网渗透、提权、撞库学习笔记

前言: 在拿到靶机时,我们最先需要做的是信息收集,包括不限于:C段扫描,端口探测,指纹识别,版本探测等。其次就是 漏洞挖掘、漏洞利用、提权、维持权限、日志清理、留下后门。 以上就是渗透的基本…

Autosar UDS开发01(UDS诊断入门概念(UDSOnCan))

目录 回顾接触UDS的过程 UDS基本概念 UDS的作用 UDS的宏观认识 UDS的CAN通讯链路 UDS的报文种类 回顾接触UDS的过程 自21年毕业后,我一直干了2年的Autosar CAN通讯开发。 开发的主要内容简单概括就是:应用报文开发、网管报文开发、休眠唤醒开发&am…

【PostgreSql本地备份为dump文件与恢复】使用脚本一键备份为dump文件

环境:windows数据库:postgresql 1.准备脚本 backUpDb.bat 脚本为备份脚本,双击运行,右键可以选择编辑;restoreDb.bat 脚本为恢复脚本,双击运行,右键选择编辑; 1.1 脚本介绍 如上图…

docker更改存储目录原因及方案

为什么一定要将docker的存储目录挂载到其他目录 docker在安装时默认存储目录在/var/lib/docker,而该目录是在系统盘下的。docker安装后,会使用各种各样的镜像,动辄几个G,那么如此多的镜像文件,装着装着系统盘就撑爆了…

【mysql】将逗号分割的字段内容转换为多行并group by

先说需求: 公司想让我通过mysql导出一个报表,内容为公司每个人参加会议的次数,现在有一个会议表fusion_meeting,正常的逻辑是通过人员直接group by就可以得出结果,但是我们的参会人是通过逗号分割这种方式存在一个字段…

网络原理-UDP/TCP详解

一. UDP协议 UDP协议端格式 由上图可以看出,一个UDP报文最大长度就是65535. • 16位长度,表示整个数据报(UDP首部UDP数据)的最大长度(注意,这里的16位UDP长度只是一个标识这个数据报长度的字段&#xff0…

银河麒麟等 Linux系统 安装 .net 3.1,net 6及更高版本的方法

确定 系统的版本。华为鲲鹏处理器是 Arm64位的。 于是到windows 官网下载对应版本 .net sdk 下载地址 https://dotnet.microsoft.com/zh-cn/download/dotnet 2.下载完成后,再linux 服务器 上进入到文件所在目录,建议全英文路径。 然后依次输入以下命令 …

基于 Gin 的 HTTP 代理 demo

上次用 TCP 模拟了一个 HTTP 代理之后,感觉那样还是太简陋了,想着是不是可以用框架来做一个有点实际用处的东西。所以,就思索如何用 golang 的 Gin 框架来实现一个?嗯,对的你没有听错,是 gin 框架。你可能会…