Kafka基础 (上)

前言

各位清明 快乐呀,近期博主也是学习了一下kafka,以下是博主的一些学习笔记,希望对你有所帮助

前置知识

线程中的数据交互以及进程中的数据交互

我们知道线程之间可以使用堆空间进行数据交互的

但是如果发送方和接收方处理数据的效率差距过大,这里就会造成消息积压的问题,怎么处理呢?存入文件显然是不可取的,因为这里文件的大小也是有上限的,所以我们加上一个中间件,也就是kafka

这里进程呢?

进程之间肯定是不可以使用共用内存进行交互的,这里就采用网络传输的方式进行交互,因为他们的内存都是独立存在的,使用socket网络传输即可

我们知道一个一般处理消息的不止一个消费者,这样直接让消费者和生产者进行交互耦合度也就太高了,我们也引入了消息中间件来降低消息的耦合度吧

JMS Java Message Service

JMS包含了p2p和消息订阅发布模型,基本上很多mq都是遵循这个模型的

我们kafka没有加上mq的的后缀,他其实不是完全遵循这个模型

下面我们介绍一下这个模型

p2p 点对点模型

这里指的是一条消息只能被消费者消费一次,然后消费者会给生产者一个反馈

sub/pub订阅发布模型

生产者生产的消息会发送到对应的topic,订阅了这个topic的消费者都可以消费数据,同样的数据可以被不同的消费者进行消费

注:本文基于的Windows的kafka进行演示学习,kafka一般部署在linux操作系统上

kafka的生产者消费者模型

kafka在底层大量的使用生产者消费者模型

并且为了保证数据的安全性,其还使用了日志文件进行了数据的保存

下面我们通过一个简单的helloworld程序来感受一下

我们先启动zookeeper 再启动kafka即可

注意,先进行修改 两个配置文件,存放对应的data

注:记得进入对应的文件夹,使用对应的bat脚本文件

先演示一下单机

开启zookeeper脚本

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

开启kafka脚本

call bin/windows/kafka-server-start.bat config/server.properties

创建主题

查看主题

执行经典helloworld

注:启动完一定要先创建主题,主题是kafka一个基本的逻辑分类单位,先开启zookeeper再开启kafka

如果这里kafka客户端一闪而过启动失败的情况,直接删除data文件即可

maven项目简单搭建

引入依赖

 <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version></dependency></dependencies>   

注:在kafka中提供服务的节点就称之为broker

producer代码

package kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;public class testProducer {public static void main(String[] args) {//TODO 创建配置对象Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//TODO 对生产的KV操作进序列化configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO   创建生产者对象//生产者对象需要确定泛型,是kv类型的KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);//TODO 创建数据//构建数据时,需要传入三个参数,主题,key,valuefor (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("test", "key"+i,"hello kafka"+i);//TODO 通过生产者对象将数据发送给kafkaproducer.send(record);}//TODO 关闭生产者对象producer.close();}
}

consumer代码

package kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;public class testConsumer {public static void main(String[] args) {//TODO  创建消费者对象//消费者也需要相应的配置Map<String,Object> consumerConfig = new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置groupIDconsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);//TODO  订阅主题consumer.subscribe(Collections.singletonList("test"));//TODO  从kafka的主题中获取数据//消费者从kafka拉取数据       不是推送的概念while(true) {ConsumerRecords<String, String> dates = consumer.poll(100);for (ConsumerRecord<String, String> date : dates) {System.out.println(date);}}//TODO  关闭消费者对象//consumer.close();}
}

先启动consumer再启动producer,让我们在可视化工具上查看一下信息是否存在了

这里使用的是kafkatool

执行完成就可以发现数据已经存在了

 Kafka系统架构以及核心组件

我们都知道kafka肯定不是只有一个生产者和一个消费者呀

当这里的数据频繁生产消费就可能造成IO热点问题

最后这个节点可能就成为分布式系统的性能瓶颈,一旦节点挂了,就可能造成数据丢失

tips:这里的挂了可能只是网络不稳定,资源耗尽等问题导致的长时间连接不上

解决方案

横向扩展和纵向扩展

横向扩展:使用更快的网络,更大的磁盘...无法根本解决问题

纵向扩展:使用集群的方式,也是kafka的解决方案

这还没有结束,光增加节点是没用的,生产和消费请求还是指向同一个节点,我们需要将这里的数据分散到各个节点,实现同一个主题在不同的broker中

区分不同的数据,加上编号就称之为分区,这也是kafka物理上的存储单位 

例如 partition-0

一个主题可以有多个分区,分散在不同的broker中

但是每个消费者也不能只消费一部分数据呀,每个消费者向每个分区发送请求,这里效率也是很低的,所以又提出了消费组的概念

并且为了数据的安全考虑,也提出了将数据进行备份的方案,但是并不在自己的节点进行备份,因为在自己的节点进行备份的话,自己挂了,备份也没了,所以这里是在其他节点保存备份文件,称之为foller副本,kafka中备份统称为副本,主文件叫做leader副本,只有leader副本可以读写,foller副本只负责备份.

基础组件

每一个kafka节点中都包含很多个组件,下面我们来介绍一下经典的几个组件

首先就是我们的Controller了,在多个节点中我们得选举出一个管理者

这里管理者选举的操作就交给我们的Zookeeper了

这里的选举也很简单粗暴,哪个节点先和他建立连接,他就是Controller

Controller的备份

1.采用备份的方式

2.升级,每个节点都能做备份

这里假设一个节点挂了,可以通过Zookeeper的一个选举功能在选举出新的Controller 

broker架构

为啥生产者和消费者指向同一个broker呢

因为数据是有主题的,主题是有分区的,分区是有副本的,一个叫leader,一个叫foller

指向同一个broker是因为他对应的分区是leader副本,分区管理器会将其同步到文件

集群部署

我们知道kafka一般是以集群方式出现

为了模拟,我们也部署一下集群

这时候解压三个kafka到不同文件夹,修改data配置以及端口即可

可以设置为9091 9092 9093  

注意Zookeeper也得配置

可以写批处理脚本,这样运行起来更加方便

出现以下问题就将其放在根目录下或者将文件夹名改短

可视化工具创建主题

注:这里副本数量超过节点数量不会创建成功,因为一个节点放多个副本是无意义的

Zookeeper的作用

我们简述一下Zookeeper的作用

1.Controller的选举

选举规则就是比较随意,第一个建立和zookeeper建立连接的broker节点就是 controller 然后其他的节点来建立连接的时候也想创建,但是controller已经有了,之后的节点就是放一个监听器,假设现在的Controller挂了,这个监听器就起作用了,从其余的broker中选举出新的broker

2.对节点的监听

Znode节点有个监听功能 可以使用kafka对节点进行监听到节点的变化 数据的变化 连接超时... 监听到以后马上通知kafka进行对应的处理

Controller和Broker之间的通信

第一个broker启动的流程 1.注册broker节点 监听controller节点 2.注册controller节点 选举成为controller,监听/broker/ids节点 因为broker启动就会创建ids,所以这里的监听主要就是看看ids的变化,是否有新的节点创建了 第二个节点加入之后监听器就知道了,会通知broker1集群的变化 然后在第二个broker进来之后还会和第一个节点连接 传输一些集群的信息等等 但是第三个节点连接上来之后,controller会给两个broker都发送相关的集群信息

这也就是说,每当有节点连上了之后,controller就会向各个节点发送对应的集群信息

Broker组件

主要是包含日志组件  网络客户端  副本管理器 controller信息  kafka apis(负责处理数据)         Zookeeper的客户端等等

手动创建主题

我们之前的主题使用的都是默认参数自动创建的,我们如果想修改其中的参数就得手动创建对应的admin管理员对象 从而对他的副本信息分区信息进行设置

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;public class AdminTopicTest {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//TODO 创建管理员Admin admin = Admin.create(configs);//TODO 创建主题//第一个参数是主题名//第二个参数是分区的数量 int//第三个参数是副本的因子(本质是数量)  shortString topicName = "test1";int partitionCount = 1;short replicationCount = 1;NewTopic topic = new NewTopic(topicName,partitionCount,replicationCount);String topicName2 = "test2";int partitionCount2 = 2;short replicationCount2 = 2;NewTopic topic2 = new NewTopic(topicName2,partitionCount2,replicationCount2);CreateTopicsResult result = admin.createTopics(Arrays.asList(topic, topic2));//TODO  关闭管理者对象admin.close();}
}

 副本分配策略

主题只是逻辑上的分类,只有分区才能在物理文件上以及存储中有所体现

我们知道我们是使用多个副本冗余来提高数据的可靠性的

那么副本在节点中又是如何分配的呢,咱们接下来慢慢说

先说理想的情况

我们知道副本也分为leader和follower

我们这里的均衡指的是leader的分布应该是均匀的

我们先说理想情况下

我们希望每个节点的leader数量都是相近的

实际上kafka并不是这样的

因为副本的创建是有顺序的,我们无法再一开始就预测浩好这里的副本分配

kafka是采用一个简单的分配算法来进行的副本分配

例子

注:我们也可以自己手动分配

一个重要的名词  ISR  in-sync-Replication   就是同步副本列表的意思

主题创建流程

大概就是先问一下controller在哪,通过controller来创建topic

但是底层有很多生产者消费者模型

这里的具体操作由apis接口来实现

生产数据

一般主题都是提前创建好的,如果使用自动创建的话很可能导致IO热点问题

因为副本的leader都在同一个节点上

具体流程如下图

生产者数据先通过拦截器的拦截,然后去元数据区获取controller的信息,然后进行序列化(因为是通过网络传输的数据),在通过分区器确定分区,最后加入缓冲区等待发送

注:拦截器是对数据进行了一些规范化的处理,但是出现错误之后不会导致程序的停止,不影响数据的发送,捕捉到异常也不会进行处理

然后通过发送线程继续发送数据,这里的在途请求缓冲区的大小表示一个节点在同一时间最多处理的请求数量,默认是5,这是经过压力测试的,这样性能最优

分区器

我们刚刚看到数据会经过分区器处理来知道放到哪个分区,下面我们介绍一下分区器是怎么工作的,分区器是从元数据区获取到主题信息再开始计算分区的,注意这里根据的主题信息直接指定分区的话是不会做校验而是直接使用的

算法

分区算法,将key使用散列算法后和分区数进行一次取模运算,在写入数据收集器的时候,就需要进行处理了,如果当前主题分区是未知分区,就会根据当前主题分区的负载情况动态进行分区(粘性分区策略)如果当前发送的时候没有分区负载情况,这时候就是随机选择的,选择以后就尽可能向这里添加,超过阈值就会切换另外一个分区,阈值默认是16k,后面不为空之后就会根据每个分区的负载情况生成一个随机的权重,然后通过一个二分查找找到一个和这个值相近的,然后算出来分区编号

缓冲区

缓冲区中对数据的追加是只要批次大小足够,没到达阈值,直接向后追加即可

批次对象空间不足 将满了的批次对象锁定并关闭,等着sender线程来拿,然后重新开一个批次对象来追加这里的数据 数据是可以超过16k的,比如60k的数据,直接装,然后关闭准备发车,是不可以拆开的

sender发送线程

会将符合发送条件的数据重新进行整合,前面是因为相同主题的不同分区可能在不同的broker中,但是不同主题的分区可能在相同的broker中,用topic进行区分效率更高一点

批次对象到达大小或者是时间阈值之后就会被发送

应答机制 ACKS

本质上是使用异步的方式

发送数据无需等待应答以后再继续发送

这样数据的发送效率高了,但是安全性无法保证

Kafka就面对不同的场景给出了三个ACKS处理等级

分别是 0 1  all

0就是优先考虑效率

all就是优先考虑数据的安全性

1就是两者之间的折中考虑

ACKS = 0

表示只是将数据放到网络中了,根本不关心其是否发送完成,直到放到网络中就给main线程发送一个应答

ACKS等级为1的时候是需要数据在leader中进行保存到文件中之后才能应答

all等级是等待数据进行备份之后才进行对应的应答

retry重试机制

我们知道数据既然是在网络中传输的,那么数据丢包是很正常的,假设网络不稳定等等情况就很容易导致数据的丢包等等

我们这时候和tcp协议一样定义了数据的重传机制

只要主线程没有收到acks,到达一定的超时时间,这时候就会将数据再次放回缓冲区重新进行一次发送

但是这也会导致一定的问题,比如数据重复多次或者是数据乱序问题

好处是让数据更安全,但是也有坏处

数据重复:

假设这里leader写入了磁盘了,但是传ack的时候网络不稳定,没发成,这里就会再传一次 这里数据就在文件中放了两次

数据乱序:

还有一个问题就是数据的顺序问题,发生顺序是 a b c 但是可能 a发送失败重发了 结果就是b c a 的顺序了

这在某种情况下不是我们想看到的,于是我们又引入了幂等性操作和事务的概念

幂等性

幂等性要求数据的ACKS等级一定是all或者-1  (这俩等级一个意思),并且必须开启retry ,并且要求在途请求缓冲区的数量必须小于等于5

实现?

就是给数据标上生产者编号,标上数据序号,但是注意这里的幂等性不可以跨越客户端或者是跨越分区来起作用

这里的幂等性只是在同一个分区内的幂等

数据在发送给Kafka的时候,kafka会记录生产者的状态

重复是靠在加入在途数据缓冲区的时候判断一下contains

有序是按照序号来的,下一个加入的数据必须大于当前的最后一个数据

缺陷:

只能保证一个分区的数据是有序且不重复的

但是如果这时候生产者重启了,此时仍然会导致数据的重复

这就要通过下面的事务来完成对这个缺点的补充了

事务

这里的事务是基于幂等性的,和数据库中的事务完全不是一个意思

基本原理是保证生产者id重启前后不会改变

执行顺序如下

注:事务这里的发送数据不是通过send方法进行发送,而是commit才会发送,send只是将数据放到缓冲区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/299057.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UE小:UE5.3无法创建C++工程

当您在使用Unreal Engine (UE) 构建项目时&#xff0c;如果遇到以下问题&#xff1a; Running C:/Program Files/Epic Games/UE\_5.3/Engine/Build/BatchFiles/Build.bat -projectfiles -project"C:/UEProject/Shp\_1/Shp\_1.uproject" -game -rocket -progress Usi…

Hippo4j线程池实现技术

文章目录 &#x1f50a;博主介绍&#x1f964;本文内容部署运行模式集成线程池监控配置参数默认配置 &#x1f4e2;文章总结&#x1f4e5;博主目标 &#x1f50a;博主介绍 &#x1f31f;我是廖志伟&#xff0c;一名Java开发工程师、Java领域优质创作者、CSDN博客专家、51CTO专家…

了解 Solidity 语言:构建智能合约的首选编程语言

了解 Solidity 语言&#xff1a;构建智能合约的首选编程语言 Solidity 是一种用于编写智能合约的高级编程语言&#xff0c;广泛应用于以太坊和其他以太坊虚拟机&#xff08;EVM&#xff09;兼容的区块链平台。它是以太坊智能合约的首选语言之一&#xff0c;具有丰富的功能和灵活…

【HTML】CSS样式(二)

上一篇我们学习了CSS基本样式和选择器&#xff0c;相信大家对于样式的使用有了初步认知。 本篇我们继续来学习CSS中的扩展选择器及CSS继承性&#xff0c;如何使用这些扩展选择器更好的帮助我们美化页面。 下一篇我们将会学习CSS中常用的属性。 喜欢的 【点赞】【关注】【收藏】…

最新在线工具箱网站系统源码

内容目录 一、详细介绍二、效果展示1.部分代码2.效果图展示 三、学习资料下载 一、详细介绍 系统内置高达72种站长工具、开发工具、娱乐工具等功能。此系统支持本地调用API&#xff0c;同时还自带免费API接口&#xff0c; 是一个多功能性工具程序&#xff0c;支持后台管理、上…

FPGA常用IP核之FIFO学习

IP核是FPGA芯片公司提供的逻辑功能块&#xff0c;在FPGA芯片中可以进行优化和预先配置&#xff0c;可以直接在用户设计的程序中使用&#xff0c;应用范围很广。在FPGA设计开发过程中使用IP核&#xff0c;可以大大的缩短开发周期&#xff0c;高度优化的IP核可以使FPG开发工程师专…

CANoe自带的TCP/IP协议栈中TCP的keep alive机制是如何工作的

TCP keep alive机制我们已经讲过太多次,车内很多控制器的TCP keep alive机制相信很多开发和测试的人也配置或者测试过。我们今天想知道CANoe软件自带的TCP/IP协议栈中TCP keep alive机制是如何工作的。 首先大家需要知道TCP keep alive的参数有哪些?其实就三个参数:CP_KEEP…

腾讯云容器与Serverless的融合:探索《2023技术实践精选集》中的创新实践

腾讯云容器与Serverless的融合&#xff1a;探索《2023技术实践精选集》中的创新实践 文章目录 腾讯云容器与Serverless的融合&#xff1a;探索《2023技术实践精选集》中的创新实践引言《2023腾讯云容器和函数计算技术实践精选集》整体评价特色亮点分析Serverless与Kubernetes的…

【C++航海王:追寻罗杰的编程之路】C++的类型转换

目录 1 -> C语言中的类型转换 2 -> 为什么C需要四种类型转换 3 -> C强制类型转换 3.1 -> static_cast 3.2 -> reinterpret_cast 3.3 -> const_cast 3.4 -> dynamic_cast 4 -> RTTI 1 -> C语言中的类型转换 在C语言中&#xff0c;如果赋值运…

PyQt ui2py 使用PowerShell将ui文件转为py文件并且将导入模块PyQt或PySide转换为qtpy模块开箱即用

前言 由于需要使用不同的qt环境&#xff08;PySide&#xff0c;PyQt&#xff09;所以写了这个脚本&#xff0c;使用找到的随便一个uic命令去转换ui文件&#xff0c;然后将导入模块换成qtpy这个通用库(支持pyside2-6&#xff0c;pyqt5-6)&#xff0c;老版本的是Qt.py(支持pysid…

糟糕,Oracle归档满RMAN进不去,CPU98%了!

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

C语言 | Leetcode C语言题解之第12题整数转罗马数字

题目&#xff1a; 题解&#xff1a; const char* thousands[] {"", "M", "MM", "MMM"}; const char* hundreds[] {"", "C", "CC", "CCC", "CD", "D", "DC"…

根据mysql的执行顺序来写select

过滤顺序指的是mysql的逻辑执行顺序&#xff0c;个人觉得我们可以按照执行顺序来写select查询语句。 目录 一、执行顺序二、小tips三、案例第一轮查询&#xff1a;统计每个num的出现次数第二轮查询&#xff1a;计算**最多次数**第三轮查询&#xff1a;找到所有出现次数为最多次…

【数据库】主流数据库及其常用工具简单科普

主流数据库及其常用工具 数据库分类关系型数据库&#xff08;RDBMS&#xff09;非关系型数据库&#xff08;NoSQL&#xff09;混合型数据库&#xff08;Hybrid Databases&#xff09;对象关系数据库&#xff08;ORDBMS&#xff09;多维数据库&#xff08;Multidimensional Data…

InterliJ IDEA基本设置

安装好idea后&#xff0c;将软件打开&#xff0c;可以进行基础设置 1.打开软件&#xff0c;先安装插件-汉化包&#xff08;不推荐&#xff0c;最好使用英文版&#xff09;&#xff0c;本次我们使用汉化版本完成基本设置&#xff0c;后期希望大家适应英文版的开发环境。&#x…

05-延迟任务精准发布文章

延迟任务精准发布文章 1)文章定时发布 2)延迟任务概述 2.1)什么是延迟任务 定时任务&#xff1a;有固定周期的&#xff0c;有明确的触发时间延迟队列&#xff1a;没有固定的开始时间&#xff0c;它常常是由一个事件触发的&#xff0c;而在这个事件触发之后的一段时间内触发…

docker + miniconda + python 环境安装与迁移(详细版)

本文主要列出从安装dockerpython环境到迁移环境的整体步骤。windows与linux之间进行测试。 简化版可以参考&#xff1a;docker miniconda python 环境安装与迁移&#xff08;简化版&#xff09;-CSDN博客 目录 一、docker 安装和测试 二、docker中拉取miniconda&#xff…

Java数据结构队列

队列(Queue) 概念 队列的使用 注意&#xff1a;Queue是个接口&#xff0c;在实例化时必须实例化LinkedList的对象&#xff0c;因为LinkedList实现了Queue接口。 import java.util.LinkedList; import java.util.Queue;public class Test {public static void main(String[]…

数据挖掘中的PCA和KMeans:Airbnb房源案例研究

目录 一、PCA简介 二、数据集概览 三、数据预处理步骤 四、PCA申请 五、KMeans 聚类 六、PCA成分分析 七、逆变换 八、质心分析 九、结论 十、深入探究 10.1 第 1 步&#xff1a;确定 PCA 组件的最佳数量 10.2 第 2 步&#xff1a;使用 9 个组件重做 PCA 10.3 解释 PCA 加载和特…

路由器拨号失败解决方法

目录 一、遇到问题 二、测试 三、解决方法 &#xff08;一&#xff09;路由器先单插wan口设置 &#xff08;二&#xff09;mac地址替换 &#xff08;三&#xff09;更改路由器DNS 一、遇到问题 1 .在光猫使用桥接模式&#xff0c;由路由器进行拨号的时候&#xff0c;出现…