kafka详解(三)

2.2 Kafka命令行操作

在这里插入图片描述

2.2.1 主题命令行操作

1)查看操作主题命令参数

[aa kafka]$ bin/kafka-topics.sh

在这里插入图片描述
2)查看当前服务器中的所有topic (配置了环境变量不需要写bin/)

[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
3)创建first topic
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 3 --replication-factor 3
选项说明:
--topic 定义topic名
--replication-factor  定义副本数
--partitions  定义分区数

4)查看first主题的详情

[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list
first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first	TopicId: 3pIfoppvRmq84FjACWzAgw	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824Topic: first	Partition: 0	Leader: 104	Replicas: 104,103,102	Isr: 104,103,102Topic: first	Partition: 1	Leader: 103	Replicas: 103,102,104	Isr: 103,102,104Topic: first	Partition: 2	Leader: 102	Replicas: 102,104,103	Isr: 102,104,103
[aa ~]$

5)修改分区数( 注意:分区数只能增加,不能减少,如果减少会报错!

[a kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 4
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first	TopicId: 3pIfoppvRmq84FjACWzAgw	PartitionCount: 4	ReplicationFactor: 3	Configs: segment.bytes=1073741824Topic: first	Partition: 0	Leader: 104	Replicas: 104,103,102	Isr: 104,103,102Topic: first	Partition: 1	Leader: 103	Replicas: 103,102,104	Isr: 103,102,104Topic: first	Partition: 2	Leader: 102	Replicas: 102,104,103	Isr: 102,104,103Topic: first	Partition: 3	Leader: 104	Replicas: 104,103,102	Isr: 104,103,102 
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 2
Error while executing topic command : Topic currently has 4 partitions, which is higher than the requested 2.
[2023-09-13 19:22:16,891] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.(kafka.admin.TopicCommand$)
[aa ~]$

6)再次查看first主题的详情

[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first

7)删除topic

[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --delete
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list[aa ~]$

2.2.2 生产者命令行操作

1)查看操作生产者命令参数
[aa kafka]$ bin/kafka-console-producer.sh

在这里插入图片描述
2)发送消息

[aa kafka]$ kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>111
>222
>333
>

2.2.3 消费者命令行操作

[aa kafka]$ bin/kafka-console-consumer.sh

在这里插入图片描述

2)消费消息
[aa kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group test --from-beginning
111
222
333
还可以动态的生产和消费,比如102机器上输入
>444
103机器就会自动在结尾弹出
111
222
333
444

Kafka生产者

生产者消息发送流程

3.1.1 发送原理

Kafka的producer发送消息采用的是异步发送的方式
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程(两个线程是异步!),以及一个线程共享变量:RecordAccumulator。

  1. 在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator。
  2. Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
    在这里插入图片描述
    main线程将外部数据包装成kafka要求的格式ProducerRecord,类似于Flume中的Event.网络中进行数据传输都会序列化(kryo框架)。
    分区策略:涉及到生产者和消费者。生产者分区针对的是数据,消费者分区针对是分区怎么消费数据
    。RecordAccumulator(一种堆内存缓冲区)达到两种标准之后就唤醒Sender进行发送!bachsize(同一个队列中,两个时间非常紧密的数据可以形成一个bachsize)一般就是数据洪峰的时候;linger.ms就是在数据量非常小的时候;默认值0代表来一条发一条;
    sender发送也是异步发送,sender将RecordAccumulator中的数据包装成Request(一个批次包装成一个Request),sender发送Request1之后不等待响应就发送Request2,然后不等待响应就发送Request2,…Request5,Request6必须排队了。
    sender发送过去的数据在Leader中应该是先存在线程对应的内存中,还没等到磁盘中存储数据落盘的一个时间点决定是不是回复ack为0,此时就是不安全,时延最低!。为1的时候就是数据落盘之后再发送ack,此时数据安全性有所提高,稍慢!注意此时的fllower还没有数据!完全保证数据安全,Leader和follwer都罗盘,回复-1
    发送成功:清理网络客户端请求Request
    线程共享变量中RecordAccumulator清理数据,因为只有32M。
    发送失败:重试次数----int的最大值
    Selector是负责决定将数据发送到集群的哪个分区!
    注意:
    中间涉及到数据的发送和拉取都是异步的!main线程放数据和sender拉取数据并发送两个过程异步!
    一个队列只能发送到最右边的集群中的一个分区,假如有两个toptic,5个分区,就需要创建5个双端队列,队列内部才能形成批次(bachsize),所以只能发到一个分区!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/159076.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux gcc和make学习

文章目录 GCCgcc的安装gcc的工作流程 makefilemakefile的规则工作原理自动生成makefile的变量自定义变量预定义变量自动变量 模式匹配函数wildcard函数patsubst函数 伪声明 GCC gcc全程是(GNU compiler collection CNU编译器套件),是由GNU开发…

想要精通算法和SQL的成长之路 - 分割数组的最大值

想要精通算法和SQL的成长之路 - 分割数组的最大值 前言一. 分割数组的最大值1.1 二分法 前言 想要精通算法和SQL的成长之路 - 系列导航 一. 分割数组的最大值 原题链接 首先面对这个题目,我们可以捕获几个关键词: 非负整数。非空连续子数组。 那么我…

线性排序:如何根据年龄给100万用户数据排序?

文章来源于极客时间前google工程师−王争专栏。 桶排序、计数排序、基数排序时间复杂度是O(n),所以这类排序算法叫作线性排序。 线性的原因:三个算法是非基于比较的排序算法,都不涉及元素之间的比较操作。 三种排序对排序的数据要求苛刻&am…

CCF CSP认证 历年题目自练Day30

题目一 试题编号: 202203-1 试题名称: 未初始化警告 时间限制: 1.0s 内存限制: 512.0MB 问题描述: 题目背景 一个未经初始化的变量,里面存储的值可能是任意的。因此直接使用未初始化的变量,比…

太强了,真的太强了!

国庆之后gpt4上线了很多强大的功能,有超级强大的数据分析和挖掘的功能,有可以比肩AI绘图神器Midjourney的绘图功能(前面写了一篇泰酷辣!目前最强的AI绘画神器!文生图模型 DALLE 3来啦!)&#xf…

Python正则表达式

正则表达式 当处理文本数据时,正则表达式是一种强大的工具,它允许我们根据特定的模式来匹配、搜索和处理字符串。 正则表达式由一系列字符和特殊字符组成,用于描述文本模式。这些模式可以包含普通字符(如字母、数字和标点符号&a…

【TensorFlow2 之012】TF2.0 中的 TF 迁移学习

#012 TensorFlow 2.0 中的 TF 迁移学习 一、说明 在这篇文章中,我们将展示如何在不从头开始构建计算机视觉模型的情况下构建它。迁移学习背后的想法是,在大型数据集上训练的神经网络可以将其知识应用于以前从未见过的数据集。也就是说,为什么…

蓝桥杯 第 1 场算法双周赛 第1题 三带一 c++ map 巧解 加注释

题目 三带一【算法赛】https://www.lanqiao.cn/problems/5127/learning/?contest_id144 问题描述 小蓝和小桥玩斗地主,小蓝只剩四张牌了,他想知道是否是“三带一”牌型。 所谓“三带一”牌型,即四张手牌中,有三张牌一样&#…

CSS学习基础知识

CSS学习笔记 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width,…

独立式三相无源逆变电源设计

摘要 面对全球日趋严重的能源危机问题&#xff0c;可再生能源的开发和利用得到了人们的高度重视。其中辐射到地球太阳能资源是十分富饶的&#xff0c;绿色清洁的太阳能不会危害我们的生存环境&#xff0c;因而受到了人们的广泛利用。光伏发电作为可再生能源被广泛的应用&#x…

RabbitMq启用TLS

Windows环境 查看配置文件的位置 选择使用的节点 查看当前节点配置文件的配置 配置TLS 将证书放到同配置相同目录中 编辑配置文件添加TLS相关配置 [{ssl, [{versions, [tlsv1.2]}]},{rabbit, [{ssl_listeners, [5671]},{ssl_options, [{cacertfile,"C:/Users/17126…

如何定制化跑腿小程序源码

跑腿小程序源码为您提供了一个强大的起点&#xff0c;但要创建一个成功的本地服务平台&#xff0c;您通常需要对源码进行定制化。这篇文章将介绍如何定制化跑腿小程序源码&#xff0c;包括添加新功能、修改界面和优化用户体验。 选择合适的跑腿小程序源码 首先&#xff0c;您…

Linux查看端口号及进程信息

Linux查看端口号及进程 Linux查看端口号 netstat netstat -tuln显示当前正在监听的端口号以及相关的进程信息 ss ss -tuln与netstat类似&#xff0c;ss也可以用于显示当前监听的端口以及相关信息 isof isof -i :端口号端口号替换为具体要查找的端口号&#xff0c;显示该端…

Leetcode 75——1768.交替合并字符串 解题思路与具体代码【C++】

一、题目描述与要求 1768. 交替合并字符串 - 力扣&#xff08;LeetCode&#xff09; 题目描述 给你两个字符串 word1 和 word2 。请你从 word1 开始&#xff0c;通过交替添加字母来合并字符串。如果一个字符串比另一个字符串长&#xff0c;就将多出来的字母追加到合并后字符…

DOSBox和MASM汇编开发环境搭建

DOSBox和MASM汇编开发环境搭建 1 安装DOSBox2 安装MASM3 编译测试代码4 运行测试代码5 调试测试代码 本文属于《 X86指令基础系列教程》之一&#xff0c;欢迎查看其它文章。 1 安装DOSBox 下载DOSBox和MASM&#xff1a;https://download.csdn.net/download/u011832525/884180…

MySQL操作合集

数据库的操作 创建数据库 create database [if not exists] db_name [character set utf8] [collate utf8_general_ci];查看所有数据库 show databases;查看数据库的创建语句 show create database db_name;修改数据库 alter database db_name character set utf8 colla…

Linux之open/close/read/write/lseek记录

一、文件权限 这里不做过多描述&#xff0c;只是简单的记录&#xff0c;因为下面的命令会涉及到。linux下一切皆是文件包括文本、硬件设备、管道、数据库、socket等。通过ls -l 命令可以查看到以下信息 drwxrwxrwx 1 root root 0 Oct 10 17:06 open -rwxrwxrwx 1 root roo…

内网渗透——隧道代理

文章目录 代理代理使用场景VPS建立隧道frpMSF木马生成监听开启frp服务端和客户端执行exe木马文件 代理 实验环境&#xff1a; 攻击机kali&#xff1a;192.168.188.133&#xff08;NAT模式&#xff09; 模拟的公网服务器&#xff08;本机&#xff09;&#xff1a;10.9.75.239 …

【数据库——MySQL(实战项目1)】(4)图书借阅系统——触发器

目录 1. 简述2. 功能代码2.1 创建两个触发器&#xff0c;分别在借出或归还图书时&#xff0c;修改借阅人表中的已借数目(附加&#xff1a;借阅人表的总借书数、图书表的借阅次数以及更新图书表的图书状态为(已借出/在架上))字段&#xff1b;2.2 创建触发器&#xff0c;当借阅者…

相似性搜索:第 3 部分--混合倒排文件索引和产品量化

接续前文&#xff1a;相似性搜索&#xff1a;第 2 部分&#xff1a;产品量化 SImilarity 搜索是一个问题&#xff0c;给定一个查询的目标是在所有数据库文档中找到与其最相似的文档。 一、介绍 在数据科学中&#xff0c;相似性搜索经常出现在NLP领域&#xff0c;搜索引擎或推…