Linux 搭建 Kafka 环境 - 详细教程

目录

一. Kafka介绍

1. 应用场景

2. 版本对比

二. Kafka安装

1. 前置环境

(1)安装JDK

2. 软件安装

(3)环境变量配置

(3)服务启动

三. Console测试

基础命令

(1)列出Kafka集群中所有存在的主题

(3)创建一个新的主题

(3)删除主题

(4)描述主题

(5)启动生产者

(6)启动消费者

四. 注册系统服务

1. Systemd服务配置

2. Kafka服务控制


一. Kafka介绍

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。

1. 应用场景

Kafka可以看作是一个能够处理消息队列的中间件,适用于实时的流数据处理,主要用于平衡好生产者和消费者之间的关系。

  • 生产者

生产者可以看作是数据源,可以来自于日志采集框架,如Flume,也可以来自于其它的流数据服务。当接收到数据后,将根据预设的Topic暂存在Kafka中等待消费。对于接收到的数据将会有额外的标记,用于记录数据的被消费【使用】情况。

  • 消费者

消费者即数据的使用端,可以是一个持久化的存储结构,如Hadoop,也可以直接接入支持流数据计算的各种框架,如Spark - Streaming。消费者可以有多个,通过订阅不同的Topic来获取数据。

2. 版本对比

Kafka的0.x和1.x可以看作是上古版本了,最近的更新也是几年以前,从目前的场景需求来看,也没有什么特别的理由需要使用到这两个版本了。

  • 2.x

在进行版本选择时,通常需要综合考虑整个数据流所设计到的计算框架和存储结构,来确定开发成本以及兼容性。目前2.x版本同样是一个可以用于生产环境的版本,并且保持着对Scala最新版本的编译更新。

  • 3.x

3.x是目前最新的稳定版,需要注意的是,Kafka的每个大版本之间的差异较大,包括命令参数以及API调用,所以在更换版本前需要做好详细的调查与准备,本文以3.x的安装为例。

二. Kafka安装

解压安装的操作方式可以适用于各种主流Linux操作系统,只需要解决好前置环境问题。

1. 前置环境

此前,运行Kafka需要预先安装Zookeeper。在Kafka 2.8.0版本以后,引入了Kraft(Kafka Raft)模式,可以使Kafka在不依赖外部Zookeeper的前提下运行。除此之外Kafka由Scala语言编写,需要JVM的运行环境。

(1)安装JDK

 Ubuntu/Debian:

sudo apt install openjdk-8-jdk

  CentOS/RedHat:

sudo yum install java-1.8.0-openjdk

安装完成后可以使用java-version命令验证【可省去环境变量配置】。

2. 软件安装

  • 下载Kafka ,链接如下:
# 离线下载安装包
https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz# 在线利用wget远程下载​
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
  • 解压安装  
tar -zvxf kafka_2.12-3.5.2.tgz

(3)环境变量配置

需要在环境变量中指定Kafka的安装目录以及命令文件所在目录,系统环境变量与用户环境变量配置其中之一即可。

/etc/profile 文件最下方添加如下两行命令--配置全局环境。

export KAFKA_HOME=/home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2
export PATH=$PATH:$KAFKA_HOME/bin

在文件结尾添加以上内容后执行source命令,使其立即生效。

source /etc/profile[Ubuntu/Debian] source ~/.bashrc[CentOS/RedHat] source ~/.bash_profile

执行后可以输入kafka,然后按Tab尝试补全【需要按多次】,如果出现命令列表则证明配置成功。

(3)服务启动

使用Kraft模式,则需要先进行集群初始化【即使是单个节点】,以下为操作步骤:

  • 目录下创建 kafka-logs文件夹

  • 修改配置文件

修改Kafka的/config/kraft/server.properties文件,更换其中的log.dirs目录指向创建目录,防止默认的/tmp被清空:

log.dirs=/home/ygsj/Config_files/kafka_server/kafka-logs

  • 创建Kafka的集群ID 
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

调用 kafka-storage.sh 生成一个UUID

将获得的 UUID 放到 kafka_2.12-3.5.2/config/kraft/server.properties 文件中 如下:

相同文件内修改:远程连接开启 (红框内写服务器ip)---自己测试0.0.0.0无效

进入到Kafka的家目录后,执行以下命令 

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties# bin/kafka-server-start.sh config/kraft/server.properties

 这种方式并不是后台运行,需要保证终端开启,等测试稳定后可以在后台执行或者注册为系统服务。 

三. Console测试

基础命令

(1)列出Kafka集群中所有存在的主题

kafka-topics.sh --list --bootstrap-server localhost:9092

--bootstrap-server localhost:9092 指定了Kafka集群的连接地址(在这里是本地的Kafka服务器)
如果集群中没有主题,命令不会返回任何内容
当你创建主题后,这条命令会返回集群中存在的主题列表

(3)创建一个新的主题

kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092

这条命令用于创建一个名为 my-topic 的新主题。
--create 指定了创建操作。
--topic my-topic 指定了要创建的主题名称。
--bootstrap-server localhost:9092 指定了Kafka集群的连接地址。
Created topic my-topic. 表示主题 my-topic 已成功创建。

(3)删除主题

kafka-topics.sh --delete  --topic my-topic --bootstrap-server localhost:9092

--delete: 指定要删除一个主题。
--topic my-topic: 指定要删除的主题名称是 my-topic。
--bootstrap-server localhost:9092: 指定Kafka集群的连接地址(在此是本地的Kafka服务器)。

(4)描述主题

 kafka-topics.sh --describe  --topic my-topic --bootstrap-server localhost:9092

获取指定主题 my-topic 的详细信息。
--describe 指定了描述操作。
--topic my-topic 指定了要描述的主题名称。
--bootstrap-server localhost:9092 指定了Kafka集群的连接地址。

(5)启动生产者

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

启动一个基于console的生产者脚本,可以方便的进行数据输入的测试,直接进行数据输入即可。

(6)启动消费者

 kafka-console-consumer.sh --help  打印所有参数

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

添加from-beginning参数来从头消费数据。

四. 注册系统服务

 为了方便的控制Kafka服务的启动和停止,可以将其注册为系统服务。

1. Systemd服务配置

创建Systemd服务文件

sudo vim /etc/systemd/system/kafka.service

在文件中添加以下内容,需要手动替换ExecStartExecStop中关于路径的部分:

[Unit]
Description=Apache Kafka
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2/bin/kafka-server-start.sh /home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2/config/kraft/server.properties
ExecStop=/home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

 重新加载Systemd配置 

sudo systemctl daemon-reload

2. Kafka服务控制

  • 开机自动启动
sudo systemctl enable kafka.service
  • 启动Kafka服务
sudo systemctl start kafka.service
  • 检查Kafka状态 
sudo systemctl status kafka.service

  • 停止Kafka服务
sudo systemctl stop kafka.service
  • 重启Kafka服务
sudo systemctl restart kafka.service

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/368069.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最近,被“AI”狠狠刷屏了......

最近,被“AI”狠狠刷屏了。 作为时下最热门的话题,AI画图、AI配音、AI写文案、AI做视频......AI在最近两年可谓是火遍全球。ChatGPT、Midjourney和SORA等技术不断涌现,不仅深刻改变着我们的生活方式,也推动了AI技术的飞速发展。 …

白杨SEO:打粉是啥?打粉引流怎么做?打粉引流犯法吗?小红书代发效果好吗?

文章大纲: 1、打粉是什么意思? 2、打粉有哪些方法? 3、打粉一般怎么变现? 4、打粉引流是违法犯罪吗? 5、小红书代发是啥? 6、小红书批量代发效果好吗? 打粉是什么意思? 打粉这…

【期末复习】数据库系统概论(附带考点汇总)

第1章.绪论 目录 第1章.绪论1.1. 数据库系统概述1.1.1.基本概念1.1.2.产生和发展 1.2.概念模型1.2.1.三种模型1.2.2.概念模型1.2.3.关系模型 1.3.数据库系统结构1.3.1三级模式结构1.3.2.两级映像与数据独立性 第2章.关系型数据库2.1.关系2.2.关系操作2.2.1.基本关系操作2.2.2.关…

计算机网络 | 期末复习

物理层: 奈氏准则:带宽(w Hz),在不考虑噪音的情况下,最大速率(2W)码元/秒 信噪比S/N:以分贝(dB)为度量单位。信噪比(dB)…

docker安装nacos:v2.3.0

拉取镜像 sudo docker pull nacos/nacos-server:v2.3.0 查看镜像 sudo docker images 宿主机创建挂载文件 sudo mkdir -p /home/docker/nacos/logs sudo mkdir -p /home/docker/nacos/data sudo mkdir -p /home/docker/nacos/conf sudo touch /home/docker/nacos/conf/appli…

Xilinx FPGA:vivado实现乒乓缓存

一、项目要求 1、用两个伪双端口的RAM实现缓存 2、先写buffer1,再写buffer2 ,在读buffer1的同时写buffer2,在读buffer2的同时写buffer1。 3、写端口50M时钟,写入16个8bit 的数据,读出时钟25M,读出8个16…

解决:使用MySQL Command Line Client时光标不显示

问题描述: 使用MySQL Command Line Client时,命令行输入字符光标不显示, 如下图: 解决办法: 1.按Shift键将输入法切换至中文,打出中文: 2.再按一次Shift键,光标就会显示:

uniapp/Android App上架三星市场需要下载所需要的SDK

只需添加以下一个权限在AndroidManifest.xml <uses-permission android:name"com.samsung.android.providers.context.permission.WRITE_USE_APP_FEATURE_SURVEY"/>uniapp开发的&#xff0c;需要在App权限配置中加入以上的额外权限&#xff1a;

UB9A0全系统全频高精度板卡性能指标

UB9A0 板卡是基于和芯星通自主研发的新一代射频基带及高精度算法一体化 GNSS SoC 芯片—Nebulas Ⅳ开发的全系统全频点高精 OEM 板卡 &#xff0c;支持 BDS&#xff0c;GPS&#xff0c; GLONASS&#xff0c;Galileo&#xff0c;QZSS&#xff0c;NavIC&#xff0c;SBAS&#xff…

计算机网络之局域网

目录 1.局域网的基本概念 2.LAN的特性 3.局域网特点 4.拓扑结构 5.传输媒体的选择 6.传输媒体 7.传输技术 8.传输技术距离问题 9.LAN的逻辑结构 10.局域网工作原理 上篇文章内容&#xff1a;OSI七层体系结构 1.局域网的基本概念 局域网 是将分散在有限地 理范围内&…

十大护眼落地灯品牌怎么选比较好?落地台灯十大排名

十大护眼落地灯品牌怎么选比较好&#xff1f;现在小孩子户外活动不足&#xff0c;课间那点时间少的可怜&#xff0c;放学后还有作业要写&#xff0c;长时间在舒适环境中用眼才是保护视力的上上策&#xff01;大路灯能照亮整个书桌&#xff0c;全光谱、防蓝光等技术加持下比教室…

回溯算法-以单位人事管理系统为例

1.回溯算法介绍 1.来源 回溯算法也叫试探法&#xff0c;它是一种系统地搜索问题的解的方法。 用回溯算法解决问题的一般步骤&#xff1a; 1、 针对所给问题&#xff0c;定义问题的解空间&#xff0c;它至少包含问题的一个&#xff08;最优&#xff09;解。 2 、确定易于搜…

K8S学习教程(二):在 PetaExpress KubeSphere容器平台部署高可用 Redis 集群

前言 Redis 是在开发过程中经常用到的缓存中间件&#xff0c;为了考虑在生产环境中稳定性和高可用&#xff0c;Redis通常采用集群模式的部署方式。 在制定Redis集群的部署策略时&#xff0c;常规部署在虚拟机上的方式配置繁琐并且需要手动重启节点&#xff0c;相较之下&#…

工业路由器与家用路由器的区别

在现代网络环境中&#xff0c;路由器扮演着至关重要的角色。无论是在家庭网络还是在工业网络&#xff0c;选择合适的路由器都至关重要。本文将从多个角度&#xff0c;对工业路由器与家用路由器进行详细比较&#xff0c;帮助您更好地理解二者的区别。 1、安全性 工业路由器&…

软考《信息系统运行管理员》-2.4信息系统运维管理标准

2.4信息系统运维管理标准 信息系统运维的相关标准 ITIL信息技术基础设施库 基于服务生命周期主要包含五个方面&#xff1a;服务战略&#xff08;轴心&#xff09;、服务设计、服务转换、服务运营及服务改进 COBIT信息系统和技术控制目标 考法1&#xff1a;概念 在ITILv3基于…

【python】Data Augmentation

参考学习来自&#xff1a;使用PythonOpenCV进行数据增广方法综述&#xff08;附代码演练&#xff09; 文章目录 Random CutCutoutColor JitterAdd Noisy Random Cut 随机裁剪图片中的长款缩放为 scale 倍的区域&#xff0c;resize 到原图大小&#xff0c;bounding box 的坐标也…

8.12 矢量图层面要素单一符号使用十四(标记符号渲染边界)

前言 本章介绍矢量图层线要素单一符号中标记符号渲染边界&#xff08;Outline: Marker line&#xff09;的使用说明&#xff1a;文章中的示例代码均来自开源项目qgis_cpp_api_apps 标记符号渲染边界&#xff08;Outline: Marker line&#xff09; Outline系列只画边界&#…

嵌入式学习——硬件(UART)——day55

1. UART 1.1 定义 UART&#xff08;Universal Asynchronous Receiver/Transmitter&#xff0c;通用异步收发器&#xff09;是一种用于串行通信的硬件设备或模块。它的主要功能是将数据在串行和并行格式之间进行转换。UART通常用于计算机与外围设备或嵌入式系统之间的数据传输。…

013、MongoDB常用操作命令与高级特性深度解析

目录 MongoDB常用操作命令与高级特性深度解析 1. 数据库操作的深入探讨 1.1 数据库管理 1.1.1 数据库统计信息 1.1.2 数据库修复 1.1.3 数据库用户管理 1.2 数据库事务 2. 集合操作的高级特性 2.1 固定集合(Capped Collections) 2.2 集合验证(Schema Validation) 2.…

防止跨站脚本攻击XSS之Antisamy

目录 一、什么是跨站脚本攻击&#xff08;XSS&#xff09; 二、通常有哪些解决方案 三、常见的XSS攻击例子有哪些 3.1 存储型XSS攻击&#xff08;黑产恶意截流&#xff0c;跳转不法网站&#xff09; 3.2反射型XSS攻击&#xff1a; 四、什么是跨站请求伪造&#xff1f; 五…