Kafka-Windows搭建全流程(环境,安装包,编译,消费案例,远程连接,服务自启,可视化工具)

目录

一. Kafka安装包获取

  1. 官网地址 

  2. 百度网盘链接

二. 环境要求

1. Java 运行环境

(1) 对 java 环境变量进行配置

(2) 下载完毕之后进行解压

三. 启动Zookeeper

四. 启动Kafka

(1)  修改Conf下的server.properties文件,修改kafka的日志文件路径

(2)  Kafka开启远程连接

五. kafka-windwos 自启动脚本

六. python-demo生产者消费者案例

       (1)生产者

       (2)消费者

七. Docker搭建kafka-map (数据可视化)

八. 备注


Kafka是一款流行分布式消息分布订阅系统,除Kafka之外还有MQ、Redis等。我们可以把消息队列视为一个管道,管道的两端分别是消息生产者(producer)和消息消费者(consumer),消息生产者产生日志等消息后可以发送到管道中,这时消息队列可以驻留在内存或者磁盘上,直到消费者来把它读走为止。

上述就是Kafka的一个概括,我们只需要了解一下Kafka的架构和一些专业术语即可,下面就来介绍一下Kafka 中一些专业术语。

Producer:消息生产者,负责把产生的消息发送到Kafka服务器上。

Consumer:消息消费者,从Kafka服务器读取消息。

Consumer Group:消费者群组,每个消息消费者可以划分为一个特定的群组。

Topic:这是Kafka使用中非常重要的一个术语,它相当于消息的"身份标识",消息生产者产生消息时会给它贴上一个Topic标签,当消息消费者需要读取消息时,可以根据这个Topic读取特定的数据。

Broker:Kafka集群中包含的服务器。


一. Kafka安装包获取

  1. 官网地址 

Apache Kafkaicon-default.png?t=O83Ahttps://kafka.apache.org/downloads

     下载版本为kafka_2.13-3.5.0

  2. 百度网盘链接

百度网盘 请输入提取码 (baidu.com)icon-default.png?t=O83Ahttps://pan.baidu.com/share/init?surl=qD06L8_OLbe7NFmQI3Ja6g&pwd=2024

二. 环境要求


在安装 Kafka 之前,需要满足以下环境要求:

1. Java 运行环境

Kafka 是使用 Java 语言编写的,因此需要在安装 Kafka 之前先安装 Java 运行环境。Kafka 支持 Java 8 及以上版本。可以通过以下命令检查 Java 运行环境的版本:

java -version
(1) 对 java 环境变量进行配置

(2) 下载完毕之后进行解压

     因为Kafka的运行依赖于 Zookeeper,所以还需要下并安装Zookeeper,ZooKeeper和Kafka版本之间有一定的对应关系,不同版本的ZooKeeper和Kafka可以相互兼容,但需要满足一定的条件。
     Kafka 2.2.0 开始支持使用内置的ZooKeeper替代外部ZooKeeper。 所以3.5.0是不需要安装Zookeeper的,直接解压即可。

三. 启动Zookeeper


     因为Kafka中的Broker注册,Topic注册,以及负载均衡都是在Zookeeper中管理,所以需要先启动内置的Zookeeper

     打开Conf文件下的zookeeper.properties文件,修改dataDir目录路径

dataDir=D:\Kafka\kafka_2.13-3.5.0\zookeeperData

执行启动Zookeeper命令

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

当看到绑定到IP地址为0.0.0.0、端口号为2181的地址,表示ZooKeeper服务器监听在该地址,启动成功

四. 启动Kafka


(1)  修改Conf下的server.properties文件,修改kafka的日志文件路径

 (2)  Kafka开启远程连接

新开一个命令行窗口,在之前的目录中输入启动命令

.\bin\windows\kafka-server-start.bat .\config\server.properties

 

五. kafka-windwos 自启动脚本

@echo offREM 设置绝对路径
set KAFKA_HOME=D:\Kafka\kafka_2.13-3.5.0
set ZOOKEEPER_CONFIG=%KAFKA_HOME%\config\zookeeper.properties
set KAFKA_CONFIG=%KAFKA_HOME%\config\server.propertiesREM 删除 kafka_log 目录及其内容
if exist %KAFKA_HOME%\kafka_log (echo 删除 kafka_log 目录...rmdir /s /q %KAFKA_HOME%\kafka_log
)REM 删除 zookeeperData 目录
if exist %KAFKA_HOME%\bin\windows\Kafkakafka_2.13-3.5.0zookeeperData (echo 删除 Kafkakafka_2.13-3.5.0zookeeperData 目录...rmdir /s /q %KAFKA_HOME%\bin\windows\Kafkakafka_2.13-3.5.0zookeeperData
)REM 启动Zookeeper
cd /d %KAFKA_HOME%\bin\windows
start /b zookeeper-server-start.bat %ZOOKEEPER_CONFIG%REM 等待Zookeeper启动
timeout /t 10 /nobreakREM 检查 meta.properties 文件
if exist %KAFKA_HOME%\kafka_log\meta.properties (echo "Meta properties already exist. Starting Kafka server..."
) else (echo "Meta properties not found. Starting Kafka server for the first time..."
)REM 启动Kafka服务器
start /b kafka-server-start.bat %KAFKA_CONFIG%exit

 将脚本后缀改为bat,放到windwos自启中

六. python-demo生产者消费者案例

(运行如下代码可以对kafka数据进行推送与拉取)

   (1)生产者

from kafka import KafkaProducer
import jsonproducer = KafkaProducer(bootstrap_servers='192.168.14.93:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# kafka 队列生产者
def producer_demo():# 配置Kafka生产者producer = KafkaProducer(bootstrap_servers='192.168.14.93:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))# 发送消息到指定的topicproducer.send('my-topic', {'data_time':'2024-09-10 17:06:12','data_image':'http://192.168.7.51:9001/var/model_algorithm_package/output/17/2024-09-10-17-06-11.jpg','data_result':"['[323, 356, 437, 665, 0, [226, 255, 0], 0.5512826]', '[1514, 238, 1619, 447, 0, [226, 255, 0], 0.5094569]']"})# 关闭生产者连接producer.close()if __name__ == '__main__':while 1:producer_demo()

  (2)消费者

import json
from kafka import KafkaConsumerlisr = []# kafka 队列消费者
def consumer_demo():# 配置Kafka消费者consumer = KafkaConsumer('my-topic',bootstrap_servers='192.168.14.93:9092',# auto_offset_reset='latest',  # 从最新消息开始消费auto_offset_reset='earliest',  # 从最新消息开始消费enable_auto_commit=True,      # 自动提交offsetgroup_id='my-consumer-group')  # 指定消费者组# 消费消息for message in consumer:res = message.value.decode('utf-8')try:# 解析 JSON 字符串为 Python 字典parsed_dict = json.loads(res)# 打印解析后的字典print(parsed_dict)except Exception as e:print(f"Error: {e}, 消息内容: {res}")if __name__ == '__main__':consumer_demo()

七. Docker搭建kafka-map (数据可视化)

docker run -d --name kafka-map \-p 9001:8080 \-v /opt/kafka-map/data:/usr/local/kafka-map/data \-e DEFAULT_USERNAME=admin \-e DEFAULT_PASSWORD=admin \--restart always dushixiang/kafka-map:latest

登录端口地址为9001

可以对相应分区数据进行拉取

八. 备注

Kafka定位为分布式消息发布-订阅系统,提及分布式就可以想象,只有当在多节点环境下才能最大的发挥它的价值,前面所介绍的Kafka配置方式都是基于单节点的配置,由于本文主要是为了系统的梳理一下Kafka的配置及使用,因此对于配置这一块不再花费大的篇幅去详细介绍,如果需要到多节点配置Kafka可以自行查阅其他资料。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/455317.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

海外云手机实现高效的海外社交媒体营销

随着全球化的深入发展,越来越多的中国企业走向国际市场,尤其是B2B外贸企业,海外社交媒体营销已成为其扩大市场的重要手段。在复杂多变的海外市场环境中,如何有效提高营销效率并降低运营风险,成为了众多企业的首要任务。…

无人机悬停精度算法!

一、主要算法类型 PID控制算法: PID控制算法是一种常用的闭环控制算法,通过计算目标值与当前值的误差,并根据比例(P)、积分(I)、微分(D)三个参数来调整控制输出&#x…

SQL高级查询03

SQL查询语句的下载脚本链接!!! 【免费】SQL练习资源-具体练习操作可以查看我发布的文章资源-CSDN文库​编辑https://download.csdn.net/download/Z0412_J0103/89908378https://download.csdn.net/download/Z0412_J0103/89908378 1 查询employ…

聚链成网,趣链科技参与 “跨链创新联合体”建设

近日,2024全球数商大会在上海举办。大会由上海数据集团和上海市数商协会联合主办,上海市数据局和浦东新区人民政府支持,以“数联全球,商通未来——‘链’接数字经济新未来”为主题,聚焦区块链技术和应用场景展开。 会上…

PostGis空间(下):空间连接与空间索引

目录 1、简介2、空间连接3、空间索引3.1 索引操作3.2 空间索引的工作原理3.2.1 R-Tree 3.3 空间索引函数3.4 仅索引查询3.5 ANALYZE3.6 VACUUMing3.7 函数列表 PS 1024到啦!!! 先祝各位程序员或者想成为程序员正在奋斗中的伙伴1024程序员节快…

JavaScript进阶:手写代码挑战(二)

​🌈个人主页:前端青山 🔥系列专栏:JavaScript篇 🔖人终将被年少不可得之物困其一生 依旧青山,本期给大家带来JavaScript篇专栏内容:JavaScript手写代码篇 在现代Web开发中,JavaScript 是不可或缺的编程语言…

Linux系统基础-进程间通信(5)_模拟实现命名管道和共享内存

个人主页:C忠实粉丝 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C忠实粉丝 原创 Linux系统基础-进程间通信(5)_模拟实现命名管道和共享内存 收录于专栏[Linux学习] 本专栏旨在分享学习Linux的一点学习笔记,欢迎大家在评论区交流讨…

Mac 使用 zsh 终端提示 zsh: killed 的问题

我的脚本的内容为: #!/bin/bashset -epids$(ps -ef | grep consul | grep -v grep | awk {print $2})for pid in $pids; doecho "kill process: $pid"kill -9 $pid donecd $(dirname $0)nohup ./consul agent -dev > nohup.log &可以看到这是一个…

阿里云项目启动OOM问题解决

问题描述 随着项目业务的增长,系统启动时内存紧张,每次第一次启动的时候就会出现oom第二次或者第n的时候,就启动成功了。 带着这个疑问,我就在阿里云上提交了工单,咨询为什么第一次提交失败但是后面却能提交成功尼&a…

Matlab学习01-矩阵

目录 一,矩阵的创建 1,直接输入法创建矩阵 2,利用M文件创建矩阵 3,利用其它文本编辑器创建矩阵 二,矩阵的拼接 1,基本拼接 1) 水平方向的拼接 2)垂直方向的拼接 3&#xf…

shell脚本-函数

文章目录 一、函数介绍什么是函数、为什么使用函数、如何使用函数 二、shell脚本中如何定义函数Way1Way2Way3 三、shell脚本中如何调用函数四、shell脚本中使用内置变量(1、#、?、2等等)五、函数的返回值、shell脚本中函数的返回值函数的返回值概念shell脚本中函数的返回值ret…

梦金园三闯港交所上市:年营收200亿元,靠加盟模式取胜

近日,梦金园黄金珠宝集团股份有限公司(以下简称“梦金园”)向港交所递交IPO申请,中信证券为其独家保荐人。贝多财经了解到,这已经是梦金园第三次向港股发起冲击,此前曾于2023年9月、2024年4月两度递表。 继…

刷题 - 图论

1 | bfs/dfs | 网格染色 200. 岛屿数量 访问到马上就染色(将visited标为 true)auto [cur_x, cur_y] que.front(); 结构化绑定(C17)也可以不使用 visited数组,直接修改原始数组时间复杂度: O(n * m),最多将 visited 数…

Deepinteraction 深度交互:通过模态交互的3D对象检测

一.前提 为什么要采用跨模态的信息融合? 点云在低分辨率下提供必要的定位和几何信息,而图像在高分辨率下提供丰富的外观信息。 -->因此必须采用跨模态的信息融合 提出的原因? 传统的融合办法可能会由于信息融合到统一表示中的不太完美而丢失很大一部分特定…

磁珠的工作原理:【图文讲解】

1:什么是磁珠 磁珠是一种被动组件,用来抑制电路中的高频噪声。磁珠是一种特别的扼流圈,其成分多半为铁氧体,利用其高频电流产生的热耗散来抑制高频噪声。磁珠有时也称为磁环、EMI滤波器、铁芯等。 磁珠是滤波常用的器件&#xf…

SpringMVC常用注解

RequestMapping接口的映射,可以将HTTP请求映射到控制器方法上,通过这个注解使用不同的映射,就可以区分不同的控制器,其中RequestMapping中还有不同的属性,比如method,params,produces等在这里我…

快速搭建SpringBoot3+Prometheus+Grafana

快速搭建SpringBoot3PrometheusGrafana 一、搭建SpringBoot项目 1.1 创建SpringBoot项目 1.2 修改pom文件配置 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://…

25年山东高考报名时间为10月23日-29日

今日&#xff0c;山东省招生考试院发布关于《山东省2025年普通高等学校招生考试报名工作的通知》 其中高考报名时间定为&#xff1a;2024年10月23日29日&#xff08;每天9&#xff1a;0018&#xff1a;00&#xff09; 资格审查时间为&#xff1a;10月30日~11月11日 网上缴费时间…

Android问题记录 - 适配Android Studio Ladybug/Java 21/AGP 8.0

文章目录 前言开发环境问题描述问题分析1. 适配Java 212. 适配AGP 8.0 解决方案补充内容最后 前言 Android Studio版本从Koala Feature Drop升级到Ladybug&#xff0c;出现了一系列报错。 开发环境 Flutter: 3.24.3Android Studio: 2024.2.1 Patch 1Java: 21.0.3Gradle: 7.4…

FPGA实现PCIE采集电脑端视频转SFP光口万兆UDP输出,基于XDMA+GTX架构,提供2套工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我已有的PCIE方案10G Ethernet Subsystem实现万兆以太网物理层方案 3、PCIE基础知识扫描4、工程详细设计方案工程设计原理框图电脑端视频PCIE视频采集QT上位机XDMA配置及使用XDMA中断模块FDMA图像缓存UDP视频组包发送UDP协议栈MAC…