大数据课程I2——Kafka的架构

文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州

 ▲ 本章节目的

⚪ 掌握Kafka的架构;

⚪ 掌握Kafka的Topic与Partition;

一、Kafka核心概念及操作

 

1. producer生产者,可以是一个测试线程,也可以是某种技术框架(比如flume)。

2. producer向kafka生产数据,必须指定向哪个主题去生产数据。

3. 主题topic,主题是由用户(程序员)自己来创建的。

4. 创建主题的指令:

sh kafka-topics.sh --create ---zookeeper hadoop01:2181

--replication-factor 1 --partitions 1 --topic enbook

5. 查看kafka集群的所有主题:

sh kafka-topics.sh --list --zookeeper hadoop01:2181

6. 创建一个主题,需要指定:

①主题名

②主题的分区数量

③分区的副本数量

7. 主题的分区:本质上就是一个分区文件目录。

分区目录的命名规则:主题名 - 分区编号(分区编号从0开始)。

思考:kafka主题引入分区机制的作用?

回答:可以分布式的对一个主题的数据进行存储和管理。

补充:主题的分区数量可以远大于kafka broker 服务器数量。kafka底层尽可能确保分区目录的负载均衡。比如:一个主题有10个分区,有3个broker服务器,则分区目录的数量分配:3-3-4。

8. 启动一个生产者线程。

sh kafka-console-producer.sh --broker-list

hadoop01:9092, hadoop02:9092, hadoop03:9092 --topic enbook

9. producer向kafka指定的主题生产数据,数据最终是存到了分区目录下的log文件中。此外kafka底层会确保每个分区目录的数据达到负载均衡的效果(轮询发送给每个分区目录)。

10. Kafka支持数据的容错机制,即分区数据丢失后,可以恢复。通过副本冗余机制来实现的。即我们在创建主题时,可以指定每个分区有多个副本。

补充:如果出现kafka创建主题分区异常。主要检查zookeeper状态。

sh /home/software/zookeeper-3.4.8/bin/zkServer.sh status

如果报错:Error Start,查看zookeeper.out文件。

11. 分区的副本机制,是为了数据容错。

sh kafka-topics.sh --create --zookeeper hadoop01:2181

--replication-factor 2 --partitions 1 --topic frbook

补充:

①分区副本的数量不能broker服务器数量。

②分区的副本数不宜过多。副本数量越多,集群磁盘的利用率越低。比如3副本,集群利用率就只有33%。

在实际生产环境下,一般3个副本足够了,2个副本也可以。如果是1个副本则没有容错机制,所有一般需要2个或3个副本即可。

综上,Kafka最基础和核心的概念就是topic主题。因为无论是向kafka生产数据,还是从kafka消费数据,都需要指定主题。

主题topic的属性:

①主题名 topic name

②分区数量 partition

③分区的副本数量 replication

12. 从kafka消费数据,消费者可以是一个测试线程,也可以是某种技术框架(Spark,Flink)。

sh kafka-console-consumer.sh --zookeeper hadoop01:2181

--topic enbook --from-beginning

13. kafka的特点:kafka的数据无论消费与否,会一直存在,不会删除。

14. 在调用kafka相关指令时,如果涉及到zookeeper的,写一台即可。如果涉及kafka的,有几个写几个。

二、Kafka主题分区的副本相关补充

 1. kafka是有Leader和Follower概念的,注意:是针对分区的副本而言的,不是针对broker来说的。kafka集群,broker没有主从之分。

2. 分区的副本Leader的作用:无论是生产数据还是消费数据,都是和分区副本的Leader交互的。

3. 如果分区副本的Leader挂掉了,Kafka会从剩余的Follower中选出新Leader。

三、Kafka架构

1. Kafka拓扑结构

1.producer:

消息生产者,发布消息到 kafka 集群的终端或服务。

2.broker:

kafka 集群中包含的服务器。broker (经纪人,消费转发服务)

3.topic:

每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。

4.partition:

partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。

5.consumer:

从 kafka 集群中消费消息的终端或服务。以一个测试线程也可以是某种技术框架(Spark和Flink)。

6.Consumer group:

high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。

即组间数据是共享的,组内数据是竞争的。

7.replica:

partition 的副本,保障 partition 的高可用。

副本数量不宜过多,因为降低进群磁盘的利用率。

比如3副本,磁盘利用率1/3.

8.leader:

replica 中的一个角色, producer 和 consumer 只跟 leader 交互。

9.follower:

replica 中的一个角色,从 leader 中复制数据。

10.controller:

kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。

11.zookeeper:

kafka 通过 zookeeper 来存储集群的 meta 信息。负责管理和监控Kafka集群运行(临时节点+监听机制),包括存储一些元数据信息(比如主题名,主题的分区数,分区的副本数,副本Leader的位置信息,Controller位置等)。

2. 核心APIs

Producer API:也叫做生产者API,应用程序通过这些API可以发布记录的数据流到一个或多个topic上。

Consumer API:消费者API,应用程序可以通过消费者API订阅一个或多个topic上的记录,并对这些记录进行处理。

Streams API:通过调用Streams API,应用程序可以对一个或多个Topic上的记录进行转换处理,同时将处理后的记录发布到一个或多个Topic上。

Connector API:通过Connector API可以构建和运行可重用的生产者和消费者,能够把topic连接到现有的应用程序或数据库。比如,一个数据库连接器可以捕获每个表的变化。

kafka客户端与服务端的通信是基于一个简单的、高性能的、语言无关的TCP协议。这个协议是向下兼容的。我们提供了一个java语言的客户端,并且还有很多语言版本的客户端。

3. Kafka集群结构

分布式(distribution)

Partition分布在kafka集群的服务器上,每个分区配置一定数量的副本在集群服务器上提供容错能力,每个服务器会共享分区进行数据请求的处理。

每个分区都有一个对应的“leader”服务器,同时具有0个或多个“followers”服务器。leader服务器处理分区上的所有读写请求,follower服务器被动的复制leader服务器上的数据。如贵leader失败了,其中一个follower将自动变成leader。每个服务器都会从当分布在其上的分区的某些分区的领导则,而其他分区会在follower服务器上均匀分布。

生产者(producers)

生产者将数据发布到他选择的topic中,生产者负责指定将数据发送到topic中的那个partition上。可以通过简单的循环方式来实现负载均衡,也可以通过一些复杂的语义算法来实现(例如:根据记录中的一些Key)。

消费者(consumers)

每个消费者都隶属于某个消费群,每个发布到topic的记录都会被发送给一个订阅消费者组中的一个消费者实例上。消费则实例可以是一个单独的进程也可以是单独的一台机器。

如果所有的消费者实例都属于同一个消费者组,那么记录将被均匀的发送给消费者实例。

如果消费者实例在不同的消费群中,那么记录将被广播给所有的消费者进程。

如上图,两台服务器的kafka集群上分布四个partition(P0-P3),两个消费群,消费群A有两个消费者实例,消费群B有四个

四、Topic与Partition

1. 基础概念:

 

如上图所示,像一个主题生产数据。数据最终是存储到哥哥分区中。

分区从逻辑上来看,实际上是一个队列。

分区从物理上来看,实际上是一个分区目录。

向分区中存储数据,最终是存到的分区目录下的log文件中。

 

底层实际上是数据磁盘的顺序写操作(往文件末尾追加),所以Kafka的写入性能较高。

如上图所示,从分区中消费数据。Kafka底层有一个offset机制。

Kafka会记录消费者的offset(消费的位置偏移量),便于下一次从正确的位置进行消费。

Kafka记录offset分为两个版本:

旧版本:Kafka是将offset存到zookeeper上的。存在的问题是:会频繁的和zookeeper进行通信交互,即可能会为Zookeeper带来较高的访问负载。

新版本:Kafka自己来管理offset,以消费者组为单位进行管理。

2. 验证思路:

1. 启动一个消费者线程(需要带有消费者属性)

sh kafka-console-consumer.sh --bootstrap-server

hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic enbook

--from-beginning --new-consumer

2. 会发现多出一些目录

__consumer_offsets-11

__consumer_offsets-14

__consumer_offsets-17

3. 即Kafka底层是通过一个主题来进行管理的。

主题名是__consumer_offsets,分区是50个。

__consumer_offsets-0

__consumer_offsets-49

4. 某个消费者组的offset就存在这个50个分区目录中的其中一个中。

存储位置规则:groupId.hashcode%50= 取余结果就是对应的分区目录。

sh kafka-consumer-groups.sh --bootstrap-server

hadoop01:9092,hadoop02:9092,hadoop03:9092 --list --new-consumer

3. Topic和Partition的关系

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic(主题)。

Partition

Parition是物理上的概念,每个Topic包含一个或多个Partition.。

Topic在逻辑上可以被认为是一个queue,每条消息都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。

为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,如下图所示。

因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/87026.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

web-xss

目录 一、简介 二、xss的攻击方式 三、xss 常见标签语句 a标签 img标签 iframe标签 audio标签 video标签 svg标签 button标签 div标签 object标签 script标签 p标签 input标签 details标签 select标签 form标签 body标签 四、xss 常见绕过 编码绕过 1.htm…

(5)所有角色数据分析页面的构建-5

所有角色数据分析页面,包括一个时间轴柱状图、六个散点图、六个柱状图(每个属性角色的生命值/防御力/攻击力的max与min的对比)。 """绘图""" from pyecharts.charts import Timeline from find_type import FindType import pandas …

模仿火星科技 基于cesium+角度测量+高度测量+可编辑

1. 创建提示窗: 启动Cesium应用,地图场景将打开,欢迎您进入编辑模式。 在屏幕的一角,一个友好的提示窗将呈现,随着您的操作,它会为您提供有用的信息和指导。 2. 绘制面积: 轻轻点击鼠标左键&a…

iOS- git对单个或者多个文件权限设置,使用pre-commit hook 和shell脚本,拦截校验

前提:最近,由于团队代码规范和安全问题,有一些文件只能是指定用户才能修改。 对比:调查了一下资料,发现好多人都在使用pre-commit技术。于是,就朝着这个方向去研究。于是抽空写了脚本,在提交的…

【golang】数组和切片底层原理

数组类型的值(以下简称数组)的长度是固定的,而切片类型的值(以下简称切片)是可变长的。 数组的长度在声明它的时候就必须给定,并且之后不会再改变。可以说,数组的长度是其类型的一部分。比如&a…

【C语言】扫雷 小游戏

文章目录 一、游戏规则二、 代码逻辑三、游戏实现1. 游戏菜单设计2.设计雷区并随机布置雷(1) 设置雷区(2) 布置雷 3.排查雷 四、源码 一、游戏规则 1. 在9*9的小格子中,任意选取一个坐标(格子),选择后发现,如果没点中雷…

Substack 如何在去中心化内容创作领域掀起波澜

面对数字内容广告化的困境,Substack回归做内容的初心,通过产品和平台双轮驱动,重塑一个去中心化的多元文化内容聚集地,实现了增长突破。其核心策略在于先使用简洁的创作工具赋能内容生产,进而通过平台的互动机制促进用…

图像处理技巧形态学滤波之膨胀操作

1. 引言 欢迎回来,我的图像处理爱好者们!今天,让我们继续研究图像处理领域中的形态学计算。在本篇中,我们将重点介绍腐蚀操作的反向效果膨胀操作。 闲话少说,我们直接开始吧! 2. 膨胀操作原理 膨胀操作…

【JVM】JVM中的分代回收

文章目录 分代收集算法什么是分代分代收集算法-工作机制MinorGC、 Mixed GC 、 FullGC的区别是什么 分代收集算法 什么是分代 在java8时,堆被分为了两份: 新生代和老年代【1:2】 其中: 对于新生代,内部又被分为了三…

Object.assign详解

一、Object.assign是什么? Object.assign( )方法用于将所有可枚举属性的值从一个或多个源对象复制到目标对象。它将返回目标对象。 二、用法 Object.assign(target, ...sources) 参数:target ——>目标对象 source ——>源对象 返回值:…

【springboot项目】在idea中启动报错合集

一、IDEA中报错 “Error running ‘Application‘: Command line is too long.“ 的解决办法 报错详情: Error running Application: Command line is too long.Shorten command line for Application or also for Spring Boot default configuration.报错原因&am…

Mybatis查询

返回实体类,必须指定返回类型, resultType不能省略,并且数据库字段名与实体类不一致会填充NULL,实体类我们一般都是驼峰,数据库字段一般都是下划线,所以在查询的时候可以起别名解决,属性填充本质上调用的是…

adb 命令行执行单元测试

文章目录 1、配置 adb 环境变量2、adb 执行测试3、官方文档解读 adb 使用(1)第一条执行测试的adb命令(2)am instrument 参数(3)-e 参数 的 key-value键值对(4)用法用例 4、存在问题 …

伪类和伪元素有何区别?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 伪类(Pseudo-class)⭐ 伪元素(Pseudo-element)⭐ 区别总结⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前…

2023年测试工程师,从0到1学习自动化测试,落地实施...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 如何实施自动化测…

湘大 XTU OJ 1291 Buying Gifts 题解(非常详细):枚举 维护最小值 排序

一、链接 1291 Buying Gifts 二、题目 题目描述 快到年末了,Boss Liu准备在年会上发些礼物,由于不想礼物的价格区别太大,Boss Liu希望最好的礼物与最差的礼物价格相差越小越好。 当然,如果存在相同的选择,Boss Liu…

HBase API

我们之后的实际开发中不可能在服务器那边直接使用shell命令一直敲的&#xff0c;一般都是通过API进行操作的。 环境准备 新建Maven项目&#xff0c;导入Maven依赖 <dependencies><dependency><groupId>org.apache.hbase</groupId><artifactId>…

​运行paddlehub报错,提示:UnicodeDecodeError: ‘gbk‘ codec can‘t decode byte…**​

我在windows11环境下运行paddlehub报错&#xff0c;提示&#xff1a;UnicodeDecodeError: ‘gbk‘ codec can‘t decode byte…** 参考篇文字的解决方案&#xff1a;window10下运行项目报错&#xff1a;UnicodeDecodeError: ‘gbk‘ codec can‘t decode byte...的解决办法_uni…

NIDS网络威胁检测系统-Golang

使用技术&#xff1a; Golang Gin框架 前端三件套 演示画面&#xff1a; 可以部署在linux和window上 目前已在Kali2021和Window10上进行测试成功

【百度翻译api】中文自动翻译为英文

欸&#xff0c;最近想做一些nlp的项目&#xff0c;做完了中文的想做做英文的&#xff0c;但是呢&#xff0c;国内爬虫爬取的肯定都是中文 &#xff0c;爬取外网的技术我没有尝试过&#xff0c;没有把握。所以我决定启用翻译&#xff0c;在这期间chatGPT给了我非常多的方法&…