05、Kafka 操作命令

05、Kafka 操作命令

1、主题命令

(1)创建主题

kafka-topics.sh --create --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --partitions 4 --replication-factor 3

–bootstrap-server:设置kafka执行节点

–topic:主题名称

–partitions:设置分区数,可以用于并发消费。

–replication-factor:设置副本因子,数量不能大于kafka节点数。

(2)查看主题

kafka-topics.sh --list --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092

image-20240509122153192

(3)在我们配置的文件夹下,

/usr/local/kafka_2.12-3.7.0/data

就可以看到topic对应的文件夹,其中 0,1,2,3是因为我们指定的partitions为4,创建了4个分区。

image-20240509160712720

在其他两台机器上,也同样有这三个文件夹,是因为我们的replication-factor 为3,表示会有三个副本,刚好对应三台机器。

(3)查询主题描述信息

kafka-topics.sh --describe --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1

image-20240509161337423

ReplicationFactor 表示副本数为3

Partition:表示当前的分区号

Replicas:表示副本存放的机器

Leader:表示三个副本中的leader

Isr:表示当前可用的机器id

(4)修改主题

分区数partitions可以修改,只能比原来的大。

replication-factor 一旦确定就不能修改了。

kafka-topics.sh --alter --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --partitions 6

image-20240509162034581

(5)删除主题

kafka-topics.sh --delete --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test

image-20240509165305344

2、生产消息

给test1主题,发送消息

[root@localhost bin]# kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1
>hello world

如果给一个不存在的主题,发送消息:

kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test2
>message1

image-20240509171549191

第一个消息发送后会有警告,后续发送消息没有警告,因为会自动创建topic,并且指定

partitions 和 replication-factor 都为1。

kafka-topics.sh --describe --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test2

image-20240509171756305

但是,建议 topic 还是手动创建,应该 partitions 是可以修改的,但是 replication-factor

是不允许修改的。

3、消费消息

(1)消费最新数据

消费 test1 主题下的消息:

kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1

我们再启动一个生产者来发送消息:

kafka-console-producer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1
>hello world

image-20240509172354876

此时查看我们的消费者,就会打印接受到的消息:

image-20240509172430840

上面消费者,只能消费最新的数据,无法消费历史数据。

(2)消费历史数据

消费 test1 主题下的历史消息:

  • –from-beginning 表示从头开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --from-beginning

image-20240509173256632

  • –offset earliest --partition 1 表示从1号分区的头部开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset earliest --partition 1

image-20240509173418365

  • –offset latest --partition 1 表示从1号分区的尾部开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset latest --partition 1

image-20240509173610825

  • –offset 2 --partition 1 从1号分区的offset 为2的位置开始消费:
kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --offset 2 --partition 1

image-20240509173747666

4、消费者组

消费者组其实就是一个容器,可以容纳若干个消费者,每一个消费者必须被包含在一个消费者组里面。

(1)通过 --group 来指定消费者组。

kafka-console-consumer.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --group my-group1

如果分组存在,则会将消费者添加到对应分组下,如果不存在,则会创建消费者组。

(2)查看已有的消费者组

kafka-consumer-groups.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --list

image-20240509174505505

(3)删除消费者组

删除消费者组,必须要先保证消费者组下没有消费者:

kafka-consumer-groups.sh --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --delete --group my-group1

image-20240509174700845

(4)查看消费的位置

kafka-consumer-groups.sh --describe  --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --group my-group1

image-20240509175128763

PARTITION:表示分区号

CURRENT-OFFSET:当前消费到的offset下标

LOG-END-OFFSET:当前分区最新的offset下标

LAG:是偏移量,未消费的数量。

消费详情:

kafka消费者在消费数据的时候,都是分组消费的,不同的消费者组之间没有影响,都会去消费。在同一个组内消费,一条消息只能被一个消费者消费,同时一个分区数据只能被一个消费者消费,如果消费者数量大于分区数,则多余出来的消费者永远不会消费消息。如果分区数大于消费者,则会均匀的将分区分配给多个消费者。

image-20240510105221038

因此,kafka 的 topic 的 partition 个数代表是 kafka 的 topic 的并行度,同一时间最多可以有多个线程来消费 topic 的数据,所以如果要提高 kafka 的 topic 的消费能力,应该增大 partition 的个数。

5、手动平衡 leader:

# 使用的脚本是:kafka-leader-election.sh
# 必要的参数:
# --bootstrap-server:指定服务器列表
# --election-type:选举的类型,默认选择preferred即可
# 下面的三个参数三选一:
# --all-topic-partitions:平衡所有的主题、所有的分区
# --topic:平衡指定的主题,如果选择这个参数,则必须使用--partition指定分区
# --path-to-json-file:将需要平衡的主题、分区信息写入一个json文件,指定这个文件
#     json的格式: {"partitions": [{"topic": "test", "partition": 1}, {"topic": "test", "partition": 2}]}[root@qianfeng01 data]$ kafka-leader-election.sh \
--bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 \
--election-type preferred \
--all-topic-partitions

6、kafka 自带的压力测试工具

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。

一般都是网络IO达到瓶颈。

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

(1)生产者Producer压测

kafka-producer-perf-test.sh \
--topic test \
--record-size 100 \
--num-records 100000 \
--throughput -1 \
--producer-props bootstrap.servers=192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092

record-size:一条信息有多大,单位字节

num-records:总共发送多少条信息

throughput:每秒多少条信息,设置成-1,表示不限流,可测生产者最大吞吐量

producer-props:发送端端消息配置

结果:

100000 records sent, 27495.188342 records/sec (2.62 MB/sec), 1461.75 ms avg latency, 2183.00 ms max latency, 1696 ms 50th, 2103 ms 95th, 2177 ms 99th, 2181 ms 99.9th.

解析:

一共写入10万条消息

吞吐量为2.62 MB/sec

每次写入的平均延迟为1461.75ms

最大延迟2183.00 ms

(2)消费者 Consumer 压力测试

consumer测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能

kafka-consumer-perf-test.sh \
--broker-list hadoop100:9092 \
--topic test \
--fetch-size 10000 \
--messages 10000000 \
--threads 1

broker-list:节点地址

topic:指定topic名称

fetch-size:指定每个fetch的数据大小

messages:总共要消费的消息个数

结果:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec2020-06-27 13:17:57:490, 2020-06-27 13:18:11:751, 20.0272, 1.4043, 210000, 14725.4751, 1593235077858, -1593235063597, -0.0000, -0.0001

解释:

开始时间

结束时间

共消费数据:20.0272M

吞吐量:1.4043MB/s

共消费数据:210000条

平均每秒消费:14725.4751条

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/324307.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

互动科技如何强化法治教育基地体验?

近年来,多媒体互动技术正日益融入我们生活的各个角落,法治教育领域亦不例外。步入法治教育基地,我们不难发现,众多创新的多媒体互动装置如雨后春笋般涌现,这些装置凭借前沿的科技手段,不仅极大地丰富了法制…

Capl中的运算符

Capl中的运算符类似于C语言。由于capl中没有指针的概念,所以没有指针取值,取地址等运算符。 Capl中的运算符优先级同C语言一样,同样小括号可以 提升优先级。 1.算数运算符 整数类型之间的数据进行除法运算,结果一定是整数。如果…

双目相机标定流程(MATLAB)

一:经典标定方法 1.1OPENCV 1.2ROS ROS进行双目视觉标定可以得到左右两个相机的相机矩阵和畸变系数,如果是单目标定,用ROS会非常方便。 3.MATLAB标定(双目标定) MATLAB用来双目标定会非常方便,主要是为…

SparkSQL编程入口和模型与SparkSQL基本编程

SparkSQL编程入口和模型 SparkSQL编程模型 主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。 1)SQL:SQL不用多说,就和Hive操作一样,但是需要清楚一点的是,SQL操作的是表&#xf…

【北京迅为】《iTOP-3588开发板从零搭建ubuntu环境手册》-第2章 获取并安装Ubuntu操作系统

RK3588是一款低功耗、高性能的处理器,适用于基于arm的PC和Edge计算设备、个人移动互联网设备等数字多媒体应用,RK3588支持8K视频编解码,内置GPU可以完全兼容OpenGLES 1.1、2.0和3.2。RK3588引入了新一代完全基于硬件的最大4800万像素ISP&…

网络编程套接字 (二)---udosocket

本专栏内容为:Linux学习专栏,分为系统和网络两部分。 通过本专栏的深入学习,你可以了解并掌握Linux。 💓博主csdn个人主页:小小unicorn ⏩专栏分类:网络 🚚代码仓库:小小unicorn的代…

力扣每日一题-统计已测试设备-2024.5.10

力扣题目:统计已测试设备 题目链接: 2960.统计已测试设备 题目描述 代码思路 根据题目内容,第一感是根据题目模拟整个过程,在每一步中修改所有设备的电量百分比。但稍加思索,发现可以利用已测试设备的数量作为需要减少的设备电…

前端组件库图片上传时候做自定义裁剪操作

不论是vue还是react项目,我们在使用antd组件库做上传图片的时候,有一个上传图片裁剪的功能,但是这个功能默认是只支持1:1的裁剪操作,如何做到自定义的裁剪操作?比如显示宽高比?是否可以缩放和旋转操作&…

Leetcode—239. 滑动窗口最大值【困难】

2024每日刷题&#xff08;132&#xff09; Leetcode—239. 滑动窗口最大值 算法思想 用vector会超时的&#xff0c;用deque最好&#xff01; 实现代码 class Solution { public:vector<int> maxSlidingWindow(vector<int>& nums, int k) {deque<int> …

学习笔记:【QC】Android Q qmi扩展nvReadItem/nvWriteItem

一、qmi初始化 流程图 初始化流程: 1、主入口&#xff1a; vendor/qcom/proprietary/qcril-hal/qcrild/qcrild/rild.c int main(int argc, char **argv) { const RIL_RadioFunctions *(*rilInit)(const struct RIL_Env *, int, char **); rilInit RIL_Init; funcs rilInit…

react18【系列实用教程】JSX (2024最新版)

为什么要用 JSX&#xff1f; JSX 给 HTML 赋予了 JS 的编程能力 JSX 的本质 JSX 是 JavaScript 的语法扩展&#xff0c;浏览器本身不能识别&#xff0c;需要通过解析工具&#xff08;如babel&#xff09;解析之后才能在浏览器中运行。 bable 官网可以查看解析过程 JSX 的语法 …

【Java orm 框架比较】十 新增hammer_sql_db 框架对比

迁移到&#xff08;https://gitee.com/wujiawei1207537021/spring-orm-integration-compare&#xff09; orm框架使用性能比较 比较mybatis-plus、lazy、sqltoy、mybatis-flex、easy-query、mybatis-mp、jpa、dbvisitor、beetlsql、dream_orm、wood、hammer_sql_db 操作数据 …

【MySQL】SQL基本知识点DDL(1)

目录 1.SQL分类&#xff1a; 2.DDL-数据库操作 3.DDL-表操作-创建 4.DDL-表操作-查询 5.DDL-表操作-数据类型 6.DDL-表操作-修改 1.SQL分类&#xff1a; 2.DDL-数据库操作 3.DDL-表操作-创建 注意&#xff1a;里面的符号全部要切换为英文状态 4.DDL-表操作-查询 5.DDL…

Django开发实战之定制管理后台界面及知识梳理(中)

上一篇文章末尾讲到如何能够展示更多的字段在界面上&#xff0c;那么针对整个界面数据&#xff0c;如果我想按照某一个条件进行筛选&#xff0c;我该怎么做呢&#xff0c;只需要加上下面一行代码 注意&#xff1a;中途只有代码片段&#xff0c;文末有今天涉及的所有代码 1、增…

【Python可视化】pyecharts

Echarts 是一个由百度开源的数据可视化&#xff0c;凭借着良好的交互性&#xff0c;精巧的图表设计&#xff0c;得到了众多开发者的认可。而 Python 是一门富有表达力的语言&#xff0c;很适合用于数据处理。当数据分析遇上数据可视化时&#xff0c;pyecharts 诞生了。 需要安…

Lazada、Shopee测评自养号,快速出单技巧全解析!

每个人都憧憬着自己的店铺能够拥有一款或多款引人注目的热销商品&#xff0c;这些商品不仅能为店铺带来可观的收益&#xff0c;更重要的是它们能够成为吸引顾客的强大磁石&#xff0c;显著提升店铺的整体流量。一旦这样的爆款商品成功吸引顾客&#xff0c;其他产品也将随之受到…

PHP 框架安全:ThinkPHP 序列 漏洞测试.

什么是 ThinkPHP 框架. ThinkPHP 是一个流行的国内 PHP 框架&#xff0c;它提供了一套完整的安全措施来帮助开发者构建安全可靠的 web 应用程序。ThinkPHP 本身不断更新和改进&#xff0c;以应对新的安全威胁和漏洞。 目录&#xff1a; 什么是 ThinkPHP 框架. ThinkPHP 框架…

ASP.NET学生成绩管理系统

摘要 本系统依据开发要求主要应用于教育系统&#xff0c;完成对日常的教育工作中学生成绩档案的数字化管理。开发本系统可使学院教职员工减轻工作压力&#xff0c;比较系统地对教务、教学上的各项服务和信息进行管理&#xff0c;同时&#xff0c;可以减少劳动力的使用&#xf…

uniapp、web网页跨站数据交互及通讯

来来来&#xff0c;说说你的创作灵感&#xff01;这就跟吃饭睡觉一样&#xff0c;饿了就找吃的&#xff0c;渴了就倒水张口灌。 最近一个多月实在是忙的没再更新日志&#xff0c;好多粉丝私信说之前的创作于他们而言非常有用&#xff01;受益菲浅&#xff0c;这里非常感谢粉丝…

阿里云ECS服务器实例挂载数据盘步骤(磁盘自动挂载.、访问挂载点)

阿里云ECS服务器实例挂载数据盘步骤 1.磁盘自动挂载 首先登录阿里云ECS服务器&#xff0c;通过 df -h 命令查看当前磁盘挂载情况 通过 fdisk -l 命令查看磁盘情况&#xff0c;可以发现有两个盘&#xff1a; 系统盘 /dev/vda: 60GB&#xff0c; 数据盘 /dev/vdb: 500GB 使用…