kafka基本概念及操作

kafka介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的 (replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、 Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

1.使用场景

  • 日志收集:可以用Kafka收集各种服务的log,通过kafka以统⼀接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

  • 消息系统:解耦和生产者和消费者、缓存消息等。

  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、 搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

2.kafka基本概念

image-20230811105243277

整个流程应该是:producer通过网络发送消息到Kafka集群,然后consumer 来进行消费,如下图:

image-20230811105555825

服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。

kafka基本使用

1.安装&关闭

以下所有操作全部基于kafka_2.13-3.0.1.tgz (3.0.1版本) 这个版本

配置文件server.properties(主要修改以下配置)

#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
listeners=PLAINTEXT://xx.xx.xx.xx(服务器内网IP地址):9092
advertised.listeners=PLAINTEXT://xx.xx.xx.xx(服务器对外IP地址):9092
#kafka的消息存储⽂件
log.dir=/usr/local/kafka/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181

启动

./kafka-server-start.sh -daemon ../config/server.properties

验证

# 查看端口是否占用
netstat -ntlp 

或者

进入到zk内查看是否有kafka的节点:/brokers/ids/0

./zkCli.sh

image-20230814134630858

关闭kafka

./kafka-server-stop.sh stop ../config/server.properties

2.创建topic

执行以下命令创建名为“test”的topic,这个topic只有⼀个partition,并且备份因子也设置为 1

./kafka-topics.sh --bootstrap-server kafkahost:9092 --create --topic test --partitions 1 --replication-factor 1-- 新版本的kafka,已经不需要依赖zookeeper来创建topic,新版的kafka创建topic指令如下:
./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic test --partitions 1 --replication-factor 1

3.查看kafka中所有的主题

./kafka-topics.sh --bootstrap-server kafkahost:9092 --list./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --list

4.发送消息

kafka自带了⼀个producer命令客户端,可以从本地文件中读取内容或者以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每⼀个行会被当做成⼀个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic

把消息发送给broker中的某个topic,打开⼀个kafka发送消息的客户端,然后开始用客户端向kafka服务器发送消息

./kafka-console-producer.sh --bootstrap-server 124.222.253.33:9092 --topic test

5.消费消息

消费消息两种方式

对于consumer,kafka同样也携带了⼀个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。 使用kafka的消费者消息的客户端,从指定kafka服务器的指定 topic中消费消息

  1. 从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费

    ./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --topic test
    
  2. 从当前主题中的第⼀条消息开始消费

    ./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --from-beginning --topic test
    

6.消息的细节

image-20230811094410650

  • 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中
/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时可以通过offset来描述当前要消费的那条消息的位置

7.单播&多播消息

单播还是多播消息取决于topic有多少消费组

1)单播

如果多个消费者在同⼀个消费组,那么只有⼀个消费者可以收到订阅的topic中的消息。(同⼀个消费组中只能有⼀个消费者收到订阅topic中的消息。)

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.id=testGroup --topic test

2)多播

不同的消费组订阅同⼀个topic,那么不同的消费组中只有⼀个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同⼀个消息。

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.id=testGroup2 --topic test

3)区别

image-20230811111854111

8.查看消费组详细信息

# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 124.222.253.33:9092 --list# 查看消费组中的具体信息:⽐如当前偏移量、最后⼀条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 124.222.253.33:9092 --describe --group testGroup

image-20230811113410786

  • Currennt-offset:当前消费组的已消费偏移量(最后被消费的消息的偏移量)
  • Log-end-offset:主题对应分区消息的结束偏移量(HW) 【消息总量,最后一条消息偏移量】
  • Lag:当前消费组未消费的消息数(积压消息量)

Kafka中主题和分区的概念

1.主题

主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被订阅该topic的消费者消费。

但是有⼀个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被保存到log日志文件中的。为了解决单个文件过大的问题,kafka提出了Partition分区的概念。

2.分区

1)分区的概念

通过partition将⼀个topic中的消息分区来存储。

这样的好处有多个:

  • 分区存储,可以解决统⼀存储文件过大的问题
  • 提高了读写的吞吐量:读和写可以同时在多个分区中进行

image-20230813215613707

2)创建多分区的主题

./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic test1 --partitions 2 --replication-factor 1

3.kafka中消息日志文件中保存的内容

  • 00000.log: 这个文件中保存的就是消息

  • __consumer_offsets-49:

    kafka内部自己创建了__consumer_offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题:consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区(可以通过offsets.topic.num.partitions设置)。

    • 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets 主题的分区数
    • 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前offset的值
  • 文件中保存的消息,默认保存7天。七天到后消息会被删除,最后就保留最新的那条数据。

Kafka集群操作

1.搭建kafka集群(三个broker)

  • 创建三个server.properties文件
# 0 1 2
broker.id=2
# 9092 9093 9094
listeners=PLAINTEXT://xx.xx.xx.xx(服务器内网IP地址):9094
advertised.listeners=PLAINTEXT://xx.xx.xx.xx(服务器对外IP地址):9094
# kafka-logs kafka-logs-1 kafka-logs-2
log.dir=/usr/local/data/kafka-logs-2
  • 通过命令来启动三台broker
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
  • 校验是否启动成功

进入到zk中查看/brokers/ids中过是否有三个znode(0,1,2)

2.副本的概念

在创建主题时,除了指明了主题的分区数以外,还指明了副本数 replication-factor参数

如下主题,创建了两分区、三副本(副本对应集群中broker数量)

./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic my-replicated-topic --partitions 2 --replication-factor 3

副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有⼀个副本作为leader,其他是follower。

查看topic情况:

# 查看topic情况
./kafka-topics.sh --describe --bootstrap-server 124.222.253.33:9092 --topic my-replicated-topic

image-20230814105758165

image-20230814105921066

  • leader:

kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产⽣⼀个新的leader(follower通过poll的方式来同步数据)

  • follower:

接收leader的同步的数据,leader挂了,参与leader选举

  • replicas:

当前副本存在的broker节点

  • isr:

可以同步和已同步的broker节点会被存入到isr集合中。如果isr中的broker节点性能较差,会被踢出isr集合。

3.broker、主题、分区、副本

综上broker、主题、分区、副本概念已全部展示:

集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker⾥。

4.kafka集群消息的发送

./kafka-console-producer.sh --broker-list 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --topic my-replicated-topic

5.kafka集群消息的消费

1)普通消费

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --from-beginning --topic my-replicated-topic

2)指定消费组消费

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic

6.分区分消费组的集群消费中的细节

image-20230814115045080

  • ⼀个partition只能被⼀个消费组中的⼀个消费者消费,目的是为了保证消费的顺序性,但是多个partion的多个消费者消费的总的顺序性是得不到保证的,那怎么做到消费的总顺序性呢?(Kafka只在partition的范围内保证消息消费的局部顺序性不能在同⼀个topic中的多个partition中保证总的消费顺序性。 ⼀个消费者可以消费多个partition。)

  • partition的数量决定了消费组中消费者的数量,建议同⼀个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息

  • 如果消费者挂了,那么会触发rebalance机制,会让其他消费者来消费该分区

kafka集群中的controller、 rebalance、HW

1.controller

  • 集群中谁来充当controller 【Kafka集群中始终只有一个Controller Broker

每个broker启动时会向zk创建⼀个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller

负责这么几件事:

  • 当集群中有⼀个副本的leader挂掉,需要在集群中选举出⼀个新的leader,选举的规则是 从isr集合中最左边获得。
  • 当集群中有broker新增或减少,controller会同步信息给其他broker
  • 当集群中有分区新增或减少,controller会同步信息给其他broker

脑裂
如果controller Broker 挂掉了,Kafka集群必须找到可以替代的controller,集群将不能正常运转。这里面存在一个问题,很难确定Broker是挂掉了,还是仅仅只是短暂性的故障。但是,集群为了正常运转,必须选出新的controller。如果之前被取代的controller又正常了,他并不知道自己已经被取代了,那么此时集群中会出现两台controller
其实这种情况是很容易发生。比如,某个controller由于GC而被认为已经挂掉,并选择了一个新的controller。在GC的情况下,在最初的controller眼中,并没有改变任何东西,该Broker甚至不知道它已经暂停了。因此,它将继续充当当前controller,这是分布式系统中的常见情况,称为脑裂。

2.rebalance机制

  • 前提:消费组中的消费者没有指明分区来消费
  • 触发的条件:当消费组中的消费者和分区的关系发生变化的时候
  • 分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略
    • range:根据公式计算得到每个消费消费哪几个分区:前面的消费者是分区总数/消费者数量+1,之后的消费者是分区总数/消费者数量(7个分区,三个消费者,则是:第一个消费者前三个分区,后面两个消费者各两个分区;8,3->3,3,2;9,3->3,3,3)
    • 轮询:大家轮着来
    • sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配(全部重新分配效率较差)。建议开启。

3.HW和LEO

LEO是某个副本最后消息的消息位置(log-end-offset)

HW是已完成同步的位置。消息在写入broker时,且每个broker完成这条消息的同步后,hw才会变化。在这之前消费者是消费不到这条消息的。在同步完成之后,HW更新之后,消费者才能消费到这条消息,这样的目的是防止消息的丢失。(保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。)
HW <= LEO
image-20230815233558072

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/91291.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【脚踢数据结构】内核链表

(꒪ꇴ꒪ )&#xff0c;Hello我是祐言QAQ我的博客主页&#xff1a;C/C语言,Linux基础,ARM开发板&#xff0c;软件配置等领域博主&#x1f30d;快上&#x1f698;&#xff0c;一起学习&#xff0c;让我们成为一个强大的攻城狮&#xff01;送给自己和读者的一句鸡汤&#x1f914;&…

ide internal errors【bug】

ide internal errors【bug】 前言版权ide internal errors错误产生相关资源解决1解决2 设置虚拟内存最后 前言 2023-8-15 12:36:59 以下内容源自《【bug】》 仅供学习交流使用 版权 禁止其他平台发布时删除以下此话 本文首次发布于CSDN平台 作者是CSDN日星月云 博客主页是h…

8.15号经典模型复习笔记

文章目录 Deep Residual Learning for Image Recognition(CVPR2016)方法 Densely Connected Convolutional Networks&#xff08;CVPR2017&#xff09;方法 EfficientNet: Rethinking Model Scaling for Convolutional Neural Networks&#xff08;ICML2019&#xff09;方法 Re…

深入了解 Rancher Desktop 设置

Rancher Desktop 设置的全面概述 Rancher Desktop 拥有方便、强大的功能&#xff0c;是最佳的开发者工具之一&#xff0c;也是在本地构建和部署 Kubernetes 的最快捷方式。 本文将介绍 Rancher Desktop 的功能和特性&#xff0c;以及 Rancher Desktop 作为容器管理平台和本地…

KU Leuven TU Berlin 推出“RobBERT”,一款荷兰索塔 BERT

荷兰语是大约24万人的第一语言&#xff0c;也是近5万人的第二语言&#xff0c;是继英语和德语之后第三大日耳曼语言。来自比利时鲁汶大学和柏林工业大学的一组研究人员最近推出了基于荷兰RoBERTa的语言模型RobBERT。 谷歌的BERT&#xff08;来自Transformers的B idirectional …

vant van-tabs van-pull-refresh van-list 标签栏+上拉加载+下拉刷新

<template><div class"huibj"><div class"listtab"><!--顶部导航--><div class"topdh"><topnav topname"余额明细"></topnav></div><!--Tab 标签--><van-tabs v-model"…

linux 查看文件被那个进程所调用

使用lsof 命令 显示文件被哪个进程所占用 lsof /var/log/messagesCOMMAND&#xff1a;进程的名称PID&#xff1a;进程标识符USER&#xff1a;进程所有者FD&#xff1a;文件描述符&#xff0c;应用程序通过文件描述符识别该文件。如cwd、txt等TYPE&#xff1a;文件类型&#…

设计HTML5文档结构

定义清晰、一致的文档结构不仅方便后期维护和拓展&#xff0c;同时也大大降低了CSS和JavaScript的应用难度。为了提高搜索引擎的检索率&#xff0c;适应智能化处理&#xff0c;设计符合语义的结构显得很重要。 1、头部结构 在HTML文档的头部区域&#xff0c;存储着各种网页元…

力扣:63. 不同路径 II(Python3)

题目&#xff1a; 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish”&#xff09;。 现在考虑网格中有障碍物。那么从…

淘宝搜索店铺列表API:关键字搜索店铺信息 获取店铺主页 店铺所在地 服务评级

接口名称&#xff1a;item_search_seller 基本功能介绍 该API可以通过传入关键字&#xff0c;获取到淘宝商城的店铺列表&#xff0c;支持翻页显示。指定参数page获取到指定页的数据。返回的店铺信息包括&#xff1a;店铺名、店铺ID、店铺主页、宝贝图片、掌柜名字、店铺所在地…

Python爬虫的应用场景与技术难点:如何提高数据抓取的效率与准确性

作为专业爬虫程序员&#xff0c;我们在数据抓取过程中常常面临效率低下和准确性不高的问题。但不用担心&#xff01;本文将与大家分享Python爬虫的应用场景与技术难点&#xff0c;并提供一些实际操作价值的解决方案。让我们一起来探索如何提高数据抓取的效率与准确性吧&#xf…

【广州华锐互动】物联网工程VR虚拟课件有哪些特色?

物联网工程VR虚拟课件由广州华锐互动制作&#xff0c;是一种利用虚拟现实技术&#xff0c;将物联网的概念和应用场景通过模拟的方式呈现给学生的教学工具。相比传统的教学方式&#xff0c;物联网工程VR虚拟课件具有以下特色&#xff1a; 1.交互性强 物联网工程VR虚拟课件可以让…

选择最适合自己的NIO, 一探流技术

目录 一、Channel1、FileChannel代码示例2、DatagramChannel代码示例3、SocketChannel 和 ServerSocketChannel代码示例 二、Buffer1、ByteBuffer示例代码2、CharBuffer示例代码3、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer 等示例代码 三、Selector1、S…

转行软件测试四个月学习,第一次面试经过分享

我是去年上半年从销售行业转行到测试的&#xff0c;从销售公司辞职之后选择去培训班培训软件测试&#xff0c;经历了四个月左右的培训&#xff0c;在培训班结课前两周就开始投简历了&#xff0c;在结课的时候顺利拿到了offer。在新的公司从事软件测试工作已经将近半年有余&…

贝锐蒲公英:助力企业打造稳定高效的智能安防监控网络

随着技术的快速发展和物联网的普及&#xff0c;企业面临着许多安全威胁和风险&#xff0c;如盗窃、入侵、信息泄露等&#xff0c;企业需要建立安防监控系统来保护其资产、员工和业务运营的安全。 然而&#xff0c;企业在搭建安防监控系统的过程中&#xff0c;可能会面临一些难…

手机的发展历史

目录 一.人类的通信方式变化 二.手机对人类通信的影响 三.手机的发展过程 四.手机对现代人的影响 一.人类的通信方式变化 人类通信方式的变化是一个非常广泛和复杂的话题&#xff0c;随着技术的进步和社会的发展&#xff0c;人类通信方式发生了许多重大的变化。下面是一些主…

python技术栈 之 单元测试中mock的使用

一、什么是mock&#xff1f; mock测试就是在测试过程中&#xff0c;对于某些不容易构造或者不容易获取的对象&#xff0c;用一个虚拟的对象来创建以便测试的测试方法。 二、mock的作用 特别是开发过程中上下游未完成的工序导致当前无法测试&#xff0c;需要虚拟某些特定对象…

fiddler抓包工具的用法以及抓取手机报文定位bug

前言&#xff1a; fiddler抓包工具是日常测试中常用的一种bug定位工具 一 抓取https报文步骤 使用方法&#xff1a; 1 首先打开fiddler工具将证书导出 点击TOOLS------Options------Https-----Actions---选中第二个选项 2 把证书导出到桌面后 打开谷歌浏览器 设置---高级…

数据结构算法--2 冒泡排序,选择排序,插入排序

基础排序算法 冒泡排序 思想就是将相邻元素两两比较&#xff0c;当一个元素大于右侧相邻元素时&#xff0c;交换他们的位置&#xff0c;小于右侧元素时&#xff0c;位置不变&#xff0c;最终序列中的最大元素&#xff0c;像气泡一样&#xff0c;到了最右侧。 这时冒泡排序第一…

分类预测 | MATLAB实现CNN-BiGRU-Attention多输入分类预测

分类预测 | MATLAB实现CNN-BiGRU-Attention多输入单输出分类预测 目录 分类预测 | MATLAB实现CNN-BiGRU-Attention多输入单输出分类预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 Matlab实现CNN-BiGRU-Attention多特征分类预测&#xff0c;卷积双向门控循环…