kafka个人笔记

大部分内容源于https://segmentfault.com/a/1190000038173886, 本人手敲一边加强印象方便复习

消息系统的作用

解耦
冗余
扩展性
灵活性(峰值处理
可恢复
顺序保证
缓冲
异步

  • 解耦:扩展两边处理过程,只需要让他们遵守约束即可
  • 冗余:持久化数据:规避丢失风险。采用 插入-获取-删除范式明确指出消息被处理完毕
  • 扩展性:解耦处理过程,容易扩展处理过程增大消息处理频率
  • 灵活性(峰值处理:访问激增情况不常见,无需投入过多标准资源。使用消息队列顶住访问压力
  • 可恢复:系统失效时仍可保证队列消息在系统恢复后处理
  • 顺序保证:kafka保证partition内消息有序
  • 缓冲:控制和优化 数据经过系统的速度,解决生产、消费速度不一致的问题
  • 异步:允许用户把一个或若干个消息放入队列,且不立即被处理

架构

在这里插入图片描述

  1. producer,消息生产者
  2. broker:kafka集群的服务器
  3. topic:消息的类别
  4. partition:kafka分配单位,一个topic包含一个或多个partition
  5. consumer:消息消费者,终端或服务
  6. comsumer group:
    high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
  7. replica:partition副本
  8. leader:特殊的replica,producer和consumer只和leader交互
  9. follower:除了leader的replica都为follwer,复制数据
  10. controller:服务器:用于leader选举和failover
  11. zookepper,存储集群meta信息

发布消息

producer用push发布到broker,消息被append到partition,顺序写磁盘

消息路由

//构造函数
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {if (topic == null)throw new IllegalArgumentException("Topic cannot be null");if (timestamp != null && timestamp < 0)throw new IllegalArgumentException("Invalid timestamp " + timestamp);this.topic = topic;this.partition = partition;this.key = key;this.value = value;this.timestamp = timestamp;
}private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();if (partition != null) {//指定了 partition 则直接使用List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());int lastPartition = partitions.size() - 1;if (partition < 0 || partition > lastPartition) {throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));}return partition;}//否则使用 key 计算return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {//轮询int nextValue = counter.getAndIncrement();List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {return DefaultPartitioner.toPositive(nextValue) % numPartitions;}} else {//对 keyBytes 进行 hash 选出一个 patitionreturn DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}
}
  1. 指定partition直接用
  2. 未指定partition但指定了key,对key进行hash得到partition
  3. 都未指定,使用轮询

写入流程

在这里插入图片描述

  1. producer从zk的/brokers/…/stateleader
  2. producer发消息给leader
  3. leader把消息写入log
  4. follower从leader拉取消息写入log后发送ACK给leader
  5. leader收到所有replica的ACK后,增加high watermark(位置信息,即位移(offset))给producer发送ack

投递保证

    ① At most once 消息可能会丢,但绝不会重复传递② At least one  消息绝不会丢,但可能会重复传递③ Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户想要的

默认 at least one

接收消息的行为

  1. comsumer从broker读取消息后,可以选择commit或处理消息
    1. 如果commit
      1. zookeeper存在comsumer在partition下读取消息的offset
      2. comsumer下次读取partition从下一条开始读取
    2. 未commit
      1. 下次读取位置和上次commit后开始位置相同

at most once

读完消息先commit再处理消息。
若commit后未处理消息系统崩坏,下次重新开始工作无法读到已提交但未处理的消息

At least once

读完消息先处理再commit消费状态(保存offset)
若处理消息后未commit系统崩坏,重新工作的时候会处理未commit的消息(处理两次)

Exactly once 两阶段提交

协调offset和实际操作的输出。但由于许多输出系统不支持两阶段提交,更为通用的方式是将offset和操作输入存在同一个地方

  1. consumer拿到数据后可能把数据放到HDFS
  2. 最新的offset和数据一起写到HDFS
  3. 保证offset更新和数据输出同时完成

(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)。

消息保存

topic分为多个partition,每个partition对应一个文件夹

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据

  • 基于时间:log.retention.hours=168
  • 基于大小:log.retention.bytes=1073741824
log.cleanup.policy=delete启用删除策略
直接删除,删除后的消息不可恢复。可配置以下两个策略:清理超过指定时间清理: 
log.retention.hours=16超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824

请添加图片描述

topic的创造

  1. controller在ZK的/brokers/topics 节点上注册 watcher
    ,topic被创建的时候,controller 会通过 watch 得到该 topic 的 partition/replica 分配
  2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
    1. 分配给partition的所有replica(称为AR)任选一个可用的broker作为leader并将AR设置为ISR
    2. 新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
  3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

请添加图片描述
删除 topic 的序列

  1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher
  2. topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配
  3. 若 delete.topic.enable=false,结束;反之controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest

kafka HA 高可用性

replica

同一个 partition 可能会有多个 replica —— erver.properties 配置中的 default.replication.factor=N

若没有replica,broker死机

  • patition 的数据都不可被消费
  • producer 也不能再将数据存于其上的 patition

引入replica,需要选取leader,leader与producer和consumer交互,其他replica与leader复制数据

分配规则

  1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
  2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
  3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

leader failover

partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader

新的 leader 必须拥有旧 leader commit 过的所有消息

zookeeper 中(/brokers/…/state)动态维护了一个 ISR(in-sync replicas)。只有 ISR 里面的成员才能选为 leader。若有f个replica,partition可以保证f-1个replica失效情况下消息不丢失

failover方案

  • 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
  • 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短
    多用第二种方式

broker failover

在这里插入图片描述

  1. controller在zookeeper的/brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
  2. controller从/brokers/ids 节点读取可用broker
  3. controller决定set_p,集合包含死机broker上所有partition
  4. 对set_p所有partition进行: 1. 读取/brokers/ids 节点读取可用broker的ISR 2. 决定新leader, 新leader ISR controller_epoch和leader_epoch信息写入state结点
  5. 通过RPC给broker发送 leaderAndISRRequest 命令

controller failover

controller 宕机时会触发 controller failover

  1. broker在zookeeper的controller节点注册watcher
  2. controller宕机时,zookeeper临时节点消失
  3. 所有存活broker收到fire通知
  4. 每个broker尝试创建新的controller path,其中一个竞选成功为controller
  5. 当选成功触发KafkaController.onControllerFailover
1. 读取并增加 Controller Epoch。
2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
8. 启动 replicaStateMachine 和 partitionStateMachine。
9. 将 brokerState 状态设置为 RunningAsController。
10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

消费

kafka 提供了两套 consumer API:

The high-level Consumer API
The SimpleConsumer API

consumer API

high-level提供kafka消费数据的抽象

  1. 提供了 consumer group 的语义
  2. 消息只能被group内一个consumer消费
  3. 消费的时候不关注offset
  4. 最后一个offset由zookeeper保存

使用high-level consumer API可以是多线程应用

if(消费线程 > partition){部分线程收不到消息
}
if(消费线程 < partition){有些线程收到多个partition消息
}if(一个线程消费多个 patition){无法保证收到消息的顺序
}

** SimpleConsumer API**

适用以下情况

  • 多次读取一个消息
  • 只消费一个 patition 中的部分消息
  • 使用事务来保证一个消息仅被消费一次

partition, offset, broker, leader不透明,需要自己管理

  • 追踪offset确定下一条消费的信息
  • 找出每个partition的follower
  • 处理leader变更

流程如下

  1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader
  2. 找到partition的follower
  3. 定义好请求,该请求应该能描述应用程序需要哪些数据
  4. fetch数据
  5. 识别leader变化并做出响应

consumer group
kafka分配单位是partition,consumer属于一个group
一个partition被一个group内的一个consumer消费(但是多个group可以同时消费这个partition)

实现离线处理与实时处理

  • spark 实时处理
  • hadoop 离线处理

消费方法

consumer用pull模式从broker读数据

push 模式很难适应消费速率不同的消费者

  • 消息发送速率是由 broker 决定的
  • 尽可能以最快速度传递消息
  • 容易造成 consumer 来不及处理消息(拒绝服务、网络拥塞

pull模式,consumer根据自己的能力消费信息

pull的优点

  • 简化broker设计
  • consumer自主控制消费速率
  • consumer自主控制消费方式 —— 批量/逐条
  • 选择不同提交方式

消费者递送保证

consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit(Exactly once

实际使用过程中,并不是consumer读完消息就结束了,还需要进一步处理。
处理和commit顺序决定了 consumer delivery guarantee

  • 先commit,后处理消息(At most once
    • consumer 在 commit 后还没来得及处理消息就 crash
    • 重新开始工作后就无法读到刚刚已提交而未处理的消息
  • 先处理再commit( At least once
    • 处理完消息之后 commit 之前 consumer crash
    • 恢复工作:处理刚刚未 commit 的消息
  • 两阶段提交
    (offset 和操作输入存在同一个地方,会更简洁和通用)
    (若不支持,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once) —— high-level API里面offset存于zookeeper中,无法存于HDFS,simple可以存于HDFS

consumer rebalance

触发机制

  • consumer加入退出
  • partition改变(broker 加入退出

算法如下

  1. 目标topic的partition排序,存于PT
  2. 选择consumer group下所有consumer排序, 存于CG
  3. N = ⌈ s i z e ( P T ) / s i z e ( C G ) ⌉ N = \lceil size(PT)/size(CG)\rceil N=size(PT)/size(CG)⌉
  4. 对group内原本的分配partition解除关系
  5. 然后每N个partition分配给一个consumer

consumer调整了单个partition后,为了保证一致性,group内其他consumer也应触发balance

导致以下问题

herd effect

  • broker,comsumer增减触发rebalance

split brain

  • 每个consumer单独通过zk判断broker和consumer宕机,不同的consumer同时从zookeeper看到的view可能不一致 —— 导致不正确的rebalance
  • 所有consumer不知道其他consumer的rebalance是否成功,导致kafka工作状态不正确
  • 因此0.9开始使用中心coordinator空值rebalance,计划在consumer客户端分配方案

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/195532.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 等后端应用如何获取客户端真实IP —— 筑梦之路

需求说明 现有一套Java开发的应用&#xff0c;需要能获取到用户访问的真实IP地址&#xff0c;以此来过滤到一些不安全的因素。而实际部署的场景中Java服务提供给用户访问需要经过多次代理&#xff0c;默认情况下是无法获取到客户端真实IP地址的&#xff0c;因此要实现该需求&a…

JavaWeb[总结]

文章目录 一、Tomcat1. BS 与 CS 开发介绍1.1 BS 开发1.2 CS 开发 2. 浏览器访问 web 服务过程详解(面试题)2.1 回到前面的 JavaWeb 开发技术栈图2.2 浏览器访问 web 服务器文件的 UML时序图(过程) &#xff01; 二、动态 WEB 开发核心-Servlet1. 为什么会出现 Servlet2. 什么是…

FPGA电平标准的介绍

对FPGA的管脚进行约束的时候&#xff0c;常常看到这样的电平标准&#xff0c;例如LVCOM18&#xff0c;LVCOS25&#xff0c;LVDS&#xff0c;LVDS25等等&#xff0c;其实这些都是一系列的电平标准。 针对数字电路而言&#xff0c;数字电路表示电平的只有1和0两个状态&#xff0c…

云汇优想:抖音矩阵系统有哪些类型?

抖音作为中国最热门的短视频分享平台之一&#xff0c;不断推陈出新&#xff0c;在内容管理和展示方面也进行了创新。其中&#xff0c;抖音矩阵系统是一项重要的功能&#xff0c;它提供了多种类型的矩阵&#xff0c;帮助用户更好地管理和展示自己的内容。那么&#xff0c;抖音矩…

叮!您收到了一封来自达坦科技的Hackthon邀请函

DatenLord Hackathon 2023正式启动&#xff01;达坦科技基于其跨云分布式文件系统DatenLord项目&#xff0c;结合AI大模型时代背景&#xff0c;搭建了擂台&#xff0c;在此正式向您发出邀约&#xff01; 本次大赛赛题深刻有趣&#xff0c;奖品丰厚多样&#xff0c;借此机会您不…

AVL树的底层实现

文章目录 什么是AVL树&#xff1f;平衡因子Node节点插入新节点插入较高左子树的左侧新节点插入较高左子树的右侧新节点插入较高右子树的左侧新节点插入较高右子树的右侧 验证是否为平衡树二叉树的高度AVL的性能 什么是AVL树&#xff1f; AVL树又称平衡二叉搜索树&#xff0c;相…

VSCode 运行java程序中文乱码

现象描述 java文件中包含中文&#xff0c;运行java程序后&#xff0c;乱码报错。 解决方法 原本运行指令为 cd "d:\programProjects\Java_proj\" ; if ($?) { javac Solution.java } ; if ($?) { java Solution } 需要添加 编码格式 -encoding utf8 cd &quo…

AI中文版怎么用,版本分享,GPT官网入口

网页版上线啦&#xff0c;在线助力大学生、上班族的高效生活&#xff01; GPT4.0是OpenAI最新推出的聊天模型&#xff0c;它的语言理解和生成能力比以前的版本更强大。对于忙碌的上班族来说&#xff0c;GPT4.0能帮助你高效处理工作中的大部分写作任务&#xff0c;比如撰写报告…

ROS服务(Service)通信:通信模型、Hello World与拓展

服务通讯是基于请求响应模式的&#xff0c;是一种应答机制。 用于偶然的、对时时性有要求、有一定逻辑处理需求的数据传输场景。 一、服务通讯模型 服务是一种双向通讯方式&#xff0c;它通过请求和应答的方式传递消息&#xff0c;该模型涉及到三个角色&#xff1a; Master…

ubuntu云服务器配置SFTP服务

目录 一、安装并运行SSH服务 1&#xff0c;安装ssh服务 2&#xff0c;运行ssh 3&#xff0c;查看ssh运行状态 二、创建SFTP用户并进行用户相关的配置 1&#xff0c;创建SFTP用户 2&#xff0c;限制用户只能使用 SFTP&#xff0c;并禁止 SSH 登录。打开/ect/ssh/sshd_conf…

【源码系列】短剧系统开发国际版短剧系统软件平台介绍

系统介绍 短剧是一种快节奏、紧凑、有趣的戏剧形式&#xff0c;通过短时间的精彩表演&#xff0c;向观众传递故事的情感和思考。它以其独特的形式和魅力&#xff0c;吸引着观众的关注&#xff0c;成为了当代戏剧娱乐中不可或缺的一部分。短剧每一集都是一个小故事&#xff0c;…

ArcGIS创建格网

目录 1、创建网格 2、裁剪边界外的网格 3、只保留边界内完整的网格 1、创建网格 首先&#xff0c;我们在创建渔网前&#xff0c;需要指定渔网覆盖的范围。这里我们就以四子王为例 在ArcMap软件中&#xff0c;我们依次选择“Toolboxes”→“Data Management Tools&#xff0…

Centos上删除文件及目录的命令积累

01-如果我想删除Centos上当前目录下的文件 test06-2023-11-14-01.sql 该怎么操作&#xff1f; 答&#xff1a;如果你想删除CentOS上当前目录下的文件 test06-2023-11-14-01.sql&#xff0c;可以使用 rm 命令。以下是删除文件的基本语法&#xff1a; rm test06-2023-11-14-01.s…

结构体数组保存进二进制文件的简单做法

作者&#xff1a;朱金灿 来源&#xff1a;clever101的专栏 为什么大多数人学不会人工智能编程&#xff1f;>>> 最近面临这样一个需求&#xff1a;以比较节省存储空间的存储一组坐标点到文件&#xff0c;要求程序能够跨平台读写这种文件。思考了一下&#xff0c;比较…

为开发GPT-5,OpenAI向微软寻求新融资

11月14日&#xff0c;金融时报消息&#xff0c;OpenAI正在向微软寻求新一轮融资&#xff0c;用于开发超级智能向AGI&#xff08;通用人工智能&#xff09;迈进&#xff0c;包括最新模型GPT-5。 最近&#xff0c;OpenAI召开了首届开发者大会&#xff0c;推出了GPT-4 Turbo、自定…

视频一键转码:批量转换MP4视频的技巧

随着数字媒体设备的普及&#xff0c;视频文件在生活中扮演着越来越重要的角色。而在处理视频文件时&#xff0c;有时需要将其转换为不同的格式以适应不同的需求。其中&#xff0c;MP4格式因其通用性和高质量而备受青睐。本文详解云炫AI智剪如何一键转码的技巧&#xff0c;帮助批…

基于SpringBoot的SSMP整合案例(在Linux中发布项目的注意事项与具体步骤步骤)

前言与注意 这几天在Linux中上线之前的小项目时&#xff0c;遇到了很多的问题&#xff0c;Linux镜像的选择&#xff0c;jdk&#xff0c; mysql在linux中的下载&#xff0c;使用finallshell连接linux&#xff0c;使用tomcat连接linux中的数据库........ 在下面的注意事项中我会将…

wpf devexpress 自定义统计

总计统计和分组统计包含预定义总计函数。这些函数允许你计算如下&#xff1a; 数据列的数量&#xff08;Count&#xff09; 最大和最小值(Max和Min) 总计和平均值&#xff08;Sum和Average&#xff09; 处理GridControl.CustomSummary 事件或者使用 GridControl.CustomSumm…

Android Glide加载transform CenterCrop, CircleCrop ShapeableImageView圆形图并描边,Kotlin

Android Glide加载transform CenterCrop, CircleCrop ShapeableImageView圆形图并描边&#xff0c;Kotlin import android.os.Bundle import androidx.appcompat.app.AppCompatActivity import com.bumptech.glide.load.resource.bitmap.CenterCrop import com.bumptech.glide.…

7、使用真机调试鸿蒙项目

此处以华为手机为例&#xff0c;版本为鸿蒙4.0. 一、打开手机调试功能 1、打开开发者模式 打开“设置”—“关于手机”&#xff0c;连续点击“软件版本”可打开开发者模式 2、开启USB调试功能 打开“设置”—“系统更新”—“开发者选项”&#xff0c;下拉找到“USB调试”…