Kafka 快速入门(一)

1.1安装部署

1.1.1 集群规划

bigdata01bigdata02bigdata03
zookeeperzookeeperzookeeper
kafkakafkakafka

1.1.2 集群部署

官方下载地址:http://kafka.apache.org/downloads.html

检查三台虚拟机的zk是否启动:zkServer.sh start 默认启动方式

1)解压安装包

kafka中 2.12 是scala语言的版本,3.0.0是kafka版本

将第一台虚拟机bigdata01作为主机,在bigdata01上完成一系列操作:

tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/installs/ 
#在压缩包路径下解压缩kafka, -C 后面路径为解压路径

2)修改解压后的文件名称

 mv kafka_2.12-3.0.0/ kafka3#版本号解读:2.12 是scala版本,3.0.0是kafka版本

3)进入到/opt/installs/kafka3 目录,修改配置文件

cd config/ # 切换到kafkaa的config文件夹下
vi server.properties #进入server.properties 修改配置

修改红色部分:

#broker 的全局唯一编号,不能重复,只能是数字。

broker.id=0

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘 IO 的线程数量

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以

配置多个磁盘路径,路径与路径之间可以用","分隔

log.dirs=/opt/installs/kafka3/datas

#topic 在当前 broker 上的分区个数

num.partitions=1

#用来恢复和清理 data 下数据的线程数量

num.recovery.threads.per.data.dir=1

# 每个 topic 创建时的副本数,默认时 1 个副本

offsets.topic.replication.factor=1

#segment 文件保留的最长时间,超时将被删除

log.retention.hours=168

#每个 segment 文件的大小,默认最大 1G

log.segment.bytes=1073741824

# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期

log.retention.check.interval.ms=300000

#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)

zookeeper.connect=bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka

# /kafka的意思是:在zk中创建一个文件夹叫做kafka

4)分发安装包

可以使用scp

scp /opt/installs/kafka3 root@hadoop12:/opt/installs

或者使用一下脚本命令:xsync.sh kafka3/

脚本:xsync.sh(可选择存放在/usr/local/bin或者sbin中)

脚本代码:

#!/bin/bash
#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if [ $pcount -lt 1 ]
thenecho No Enough Arguement!exit;
fi#2. 遍历集群所有机器
for host in bigdata02 bigdata03
doecho ====================    $host    ====================#3. 递归遍历所有目录for file in $@do#4 判断文件是否存在if [ -e $file ]then#5. 获取全路径pdir=$(cd -P $(dirname $file); pwd)echo pdir=$pdir#6. 获取当前文件的名称fname=$(basename $file)echo fname=$fname#7. 通过ssh执行命令:在$host主机上递归创建文件夹(如果存在该文件夹)ssh $host "source /etc/profile;mkdir -p $pdir"#8. 远程同步文件至$host主机的$USER用户的$pdir文件夹下rsync -av $pdir/$fname $USER@$host:$pdirelseecho $file Does Not Exists!fidone
done

5)修改bigdata02和bigdata03的broker.id

分别在 bigdata02 和 bigdata03上修改配置文件/opt/installs/kafka/config/server.properties 中的 broker.id=1、broker.id=2

注:broker.id 不得重复,整个集群中唯一。

6)配置环境变量

(1)修改bigdata01的环境变量,增加如下内容:

#KAFKA_HOME export KAFKA_HOME=/opt/installs/kafka3
export PATH=$PATH:$KAFKA_HOME/bin分发一下:
xsync.sh /etc/profile

(2)刷新一下环境变量

 xcall.sh source /etc/profile 

7)启动集群

(1)先启动 Zookeeper 集群,然后启动 Kafka。 【如果启动过就跳过】

xcall.sh zkServer.sh start

(2)依次在 bigdata01、bigdata02、bigdata03 节点上启动 Kafka。

先进入到kafka3 这个文件夹中,在三台服务器上分别运行启动命令:
bin/kafka-server-start.sh -daemon config/server.properties

关闭集群 :

bin/kafka-server-stop.sh #每台服务器都要运行

1.1.3 集群kafka启停脚本

1)在/usr/local/sbin 目录下创建文件 kf.sh 脚本文件

vim kf.sh

2) 编写脚本

#! /bin/bash
case $1 in
"start"){for i in bigdata01 bigdata02 bigdata03doecho " --------启动 $i Kafka-------"ssh $i "source /etc/profile; /opt/installs/kafka3/bin/kafka-server-start.sh -daemon /opt/installs/kafka3/config/server.properties"done
};;
"stop"){for i in bigdata01 bigdata02 bigdata03doecho " --------停止 $i Kafka-------"ssh $i "source /etc/profile; /opt/installs/kafka3/bin/kafka-server-stop.sh"done
};;
esac

3)添加权限

chmod u+x kf.sh

4)使用脚本

kf.sh start #启动集群kafka
kf.sh stop #停止集群kafka

注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止, Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

1.2 Kafka 命令行操作

1.2.1 主题命令行操作

1)查看操作主题命令参数:  bin/kafka-topics.sh

2)查看当前服务器中的所有 topic

(如果提示文件不存在可能是环境变量未配置,详细见1.1.2中的  6))

kafka-topics.sh --bootstrap-server bigdata01:9092 --list 

3)创建 first topic

kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic first

选项说明:

--topic 定义 topic 名

--replication-factor 定义副本数

--partitions 定义分区数

4)查看 first 主题的详情

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic first 

5)修改分区数

注意:分区数只能增加,不能减少

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 3

6)再次查看 first 主题的详情

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic first 

7)删除 topic(慎行)

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --delete --topic first

1.2.2 生产者命令行操作

1)查看操作生产者命令参数

bin/kafka-console-producer.sh

2)发送消息

bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic first>hello world>bigdata bigdata

1.2.3 消费者命令行操作

1)查看操作消费者命令参数

bin/kafka-console-consumer.sh

2)消费消息

(1)消费 first 主题中的数据。

kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic first 

(2)把主题中所有的数据都读取出来(包括历史数据)

kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --from-beginning --topic first

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/469469.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

wordpress实用功能A5资源网同款 隐藏下载框 支付框 需要登录才能查看隐藏的内容

实用功能 隐藏下载框 支付框 需要登录才能查看隐藏的内容, 个人网站防天朝申查实测有效 。 登录前,未登录: 登录后,已登录: 功能说明 该代码段的主要功能是隐藏支付框并为未 登录用户显示一条提示信息,告知他们需要…

SQL HAVING子句

SQL 是一种基于“面向集合”思想设计的语言。HAVING 子句是一个聚合函数,用于过滤分组结果。 1 实践 1.1 缺失的编号 图 连续编号记录表t_seq_record 需求:判断seq 列编号是否有缺失。 SELECT 存在缺失的编号 AS res FROM t_seq_record HAVING COUN…

TCP可靠连接的建立和释放,TCP报文段的格式,UDP简单介绍

TCP连接的建立(三次握手) 建立连接使用的三报文 SYN 报文仅用于 TCP 三次握手中的第一个和第二个报文(SYN 和 SYN-ACK),用于初始化连接的序列号。数据传输阶段不再使用 SYN 标志。 SYN 报文通常只携带连接请求信息&a…

【量化交易笔记】14.模拟盘效果

说明 距离上一篇的量化文章有一段时间,应小伙伴要求,继续写下去,我思考了一下,内容有很多,绝大多数是研究的过程,并且走的是弯路,分享了怕影响大伙,之前因为行情不好,研…

FPGA实现以太网(二)、初始化和配置PHY芯片

系列文章目录 FPGA实现以太网(一)、以太网基础知识 文章目录 系列文章目录一、MDIO协议介绍二、PHY芯片管脚以及结构框图三、MDIO帧时序介绍3.1 MDIO帧格式3.2 MDIO写时序3.3 MDIO读时序 四、PHY芯片常用寄存器描述4.1 基本模式控制寄存器(0…

【韩老师零基础30天学会Java 】06章 数组、排序和查找

第六章 数组、排序和查找 1. 数组🚩🚩 数组介绍: 数组可以存放多个同一类型的数据。数组也是一种数据类型,是引用类型。即:数组就是一组数据。 示例: double [] hens{3,5,1,3,4,2,50,7.8,88.8,1.1,5}; double totalWe…

基于Zynq FPGA对雷龙SD NAND的测试

文章目录 SD NAND特征SD卡简介1.2 SD卡块图 SD卡样片Zynq测试平台搭建测试流程SOC搭建软件搭建 测试结果总结 SD NAND特征 SD卡简介 雷龙的SD NAND有很多型号,在测试中使用的是CSNP4GCR01-AMW与CSNP32GCR01-AOW。芯片是基于 NAND FLASH 和 SD控制器实现的SD卡。具…

在Linux上部署(MySQL Redis Elasticsearch等)各类软件

实战章节:在Linux上部署各类软件 前言 为什么学习各类软件在Linux上的部署 在前面,我们学习了许多的Linux命令和高级技巧,这些知识点比较零散,同学们跟随着课程的内容进行练习虽然可以基础掌握这些命令和技巧的使用&#xff0c…

电脑不显示wifi列表怎么办?电脑不显示WiF列表的解决办法

有用户会遇到电脑总是不显示wifi列表的问题,但是不知道要怎么解决。随着无线网络的普及和使用,电脑无法显示WiFi列表的问题有时会让人感到困扰。电脑不显示WiFi列表是很常见的问题,但这并不意味着你无法连接到网络。不用担心,这个…

Android中Activity启动的模式

在 Android 开发中,Activity 的启动模式(Launch Mode)定义了当启动一个 Activity 时,系统会如何处理它的实例。不同的启动模式可以影响 Activity 在任务栈中的管理方式,对用户的使用体验产生直接影响。下面详细介绍四种…

Xshell 7 偏好设置

1 Xshell7 工具——更改用户数据文件夹 就是此电脑目录下的文档 该目录下的7 Xshell下的 applog ColorScheme Files 配色方案文件目录 HighlightSet Files 突出显示集目录 Logs 日志 QuickButton Files 快速命令集 Scripts 脚本文件 Sessions 会话文件 会话文件目录就…

丹摩征文活动 | 丹摩智算:大数据治理的智慧引擎与实践探索

丹摩DAMODEL|让AI开发更简单!算力租赁上丹摩! 目录 一、引言 二、大数据治理的挑战与重要性 (一)数据质量问题 (二)数据安全威胁 (三)数据管理复杂性 三、丹摩智算…

企业级容器技术docker之一键生成 Docker Compose

案例: 一键生成 Docker Compose 利用网站将docker 命令自动生成 Docker Compse Composerizehttps://www.composerize.com/ 基于docker-compose编译多服务镜像并启动容器案例 输入docker命令就可以自动转换为 docker-compose的格式

C++《stack与queue》

在之前的章节我们学习了C当中string、vector和list三种容器并且试着模拟实现这三种容器,那么接下来在本篇当中我们将STL当中的stack和queue,并且在学习stack和queue的使用之后和之前一样还会试着模拟实现stck和queue。由于stck和queue的模拟实现较为简单…

【Linux】常用命令(2.6万字汇总)

文章目录 Linux常用命令汇总1. 基础知识1.1. Linux系统命令行的含义1.2. 命令的组成 2. 基础知识2.1. 关闭系统2.2. 关闭重启2.3. 帮助命令(help)2.4. 命令说明书(man)2.5. 切换用户(su)2.6.历史指令 3.目录…

Selenium+Pytest自动化测试框架 ------ 禅道实战

前言 有人问我登录携带登录的测试框架该怎么处理,今天就对框架做一点小升级吧,加入登录的测试功能。 选用的测试网址为我电脑本地搭建的禅道 更改了以下的一些文件,框架为原文章框架主体 conftest.py更改 conftest.py #!/usr/bin/env python3 # -*…

java---认识异常(详解)

还有大家来到权权的博客~欢迎大家对我的博客提出意见哦,有错误会及时改进的~点击进入我的博客主页 目录 一、异常的概念及体系结构1.1 异常的概念1.2 异常的体系结构1.3异常的分类 二、异常的处理2.1防御式编程2.2 异常的抛出2.3 异常的捕获2.3.1异常声明throws2.3.…

鸿蒙多线程开发——并发模型对比(Actor与内存共享)

1、概 述 并发是指在同一时间段内,能够处理多个任务的能力。为了提升应用的响应速度与帧率,以及防止耗时任务对主线程的干扰,HarmonyOS系统提供了异步并发和多线程并发两种处理策略。 异步并发:指异步代码在执行到一定程度后会被…

Axure是什么软件?全方位解读助力设计入门

在产品设计和开发领域,Axure是一款大名鼎鼎且功能强大的软件,它为专业人士和团队提供了卓越的设计支持,帮助他们将创意转化为实际可操作的产品原型。 一、Axure 的基本介绍 Axure是一款专业的原型设计工具,主要用于创建交互式的…

客户手机号收集小程序有什么用

客户手机号收集小程序具有多方面的重要作用,主要体现在以下几个领域: 商业营销与客户关系管理 精准营销:通过收集客户手机号,企业能够依据客户的消费行为、偏好等信息,进行精准的个性化营销。例如,电商企业…