点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(正在更新…)
章节内容
上节我们完成了如下内容:
- 事务相关配置
- 事务幂等性
- 事务操作
- 案例1:单Producer 保证仅发送一次
- 案例2:消费-转换-生产 事务仅保证一次发送
控制器
基本介绍
- Kafka集群包含若干个Broker,broker.id 指定broker的编号,编号不要重复。
- Kafka集群上创建的主题,包含若干个分区。
- 每个分区包含若干个副本,副本因子包括了Follower副本和Leader副本。
- 副本又分为ISR(同步副本分区)和OSR(非同步副本分区)
- 控制器就是一个Broker
- 控制器除了一般Broker的功能,还负责Leader分区的选举
Broker选举
控制器信息
集群里第一个启动的Broker在ZooKeeper中创建了临时的节点
<KafkaZkChroot>/controller
其他Broker在该控制器节点创建ZooKeeperWatch对象,使用ZooKeeper的监听机制接收该节点的变更。
即:Kafka通过ZooKeeper的分布式特性选举集群控制器。
下图中,节点 /controller 是一个Zookeeper临时节点,其中Brokerid:0,表示当前控制器是broker.id为0的Broker。
每个新选出的控制器通过ZooKeeper的条件递增操作获得一个全新的、数值更大的controller epoch。其他Broker在知道当前controller epoch后,如果收到由控制器发出的包含较旧epoch的消息,就会忽略它们,以防止脑裂。
比如当一个Leader副本分区所在的Broker宕机,需要选举新的Leader副本分区,有可能两个具有不同纪元数字的控制器都选举了新的Leader副本分区,如果选举出来的Leader副本分区不一样,听谁的?脑裂的时候,有纪元数字,直接使用纪元数字最新的控制器结果。
当控制器发现一个Broker离开集群,那些失去Leader副本分区的Follower分区需要一个新的Leader。
控制器宕机
此时有些问题需要考虑:
- 控制器需要知道哪个Broker宕机了?
- 控制器需要知道宕机的Broker上负责的时候哪些分区的Leader副本分区?
下图中,/brokers/ids/0 保存着Broker的信息,此节点为临时节点,如果Broker节点宕机,该节点丢失。
集群控制器负责监听ids节点,一旦节点子节点发生变化,集群控制器就会得到通知。
- 控制器遍历这些Follower副本分区,并确定谁应该成为新的Leader分区,然后向所有包含新Leader分区和现有Follower的Broker发送请求。
- 该请求消息包含了谁是新的Leader副本分区以及谁是Follower副本分区的信息,随后,新Leader分区开始处理来自生产者和消费者的请求,而跟随者开始从新Leader副本分区消费消息。
- 当控制器发现一个Broker加入集群时,它会使用BrokerId来检查新加入的Broker是否包含现有分区的副本。如果有,控制器就把变更通知发送给新加入的Broker和其他Broker,新Broker上的副本分区开始从Leader分区那里消费消息,与Leader分区保持同步。
最后结论
- Kafka使用ZooKeeper的分布式锁选举控制器,并在节点加入集群或退出集群时通知控制器
- 控制器负责在节点加入或离开集群时进行分区Leader选举
- 控制器使用epoch来避免脑裂(脑裂是指两个节点同时认为自己是当前的控制器)
可靠性保证
基本概念
- 创建Topic的时候可以指定 --replication-factor 3,表示分区的副本数,不要超过Broker的数量。
- Leader是负责读写的节点,而其他副本则是Follower。Producer只把消息发送到Leader,Follower定期到Leader上PULL数据
- ISR是Leader负责维护的与其保持同步的Replica列表,即当前活跃的副本列表。如果一个Follower落后太多,Leader会把它从ISR中移除。
- 落后太多是指该Follower复制的消息Follower长时间没有向Leader发送fetch请求(replica.lag.time.max.ms 默认值 10000)
- 为了保证可靠性,可以设置 acks=all,Follower收到消息后,会向leader发送ACK,一旦Leader收到ISR中所有Replica的ACK,Leader就Commit,那么Leader就向Producer发送ACK。
副本的分配
当某个Topic的–replication-factor为N(N>1)时,每个Partition都有N个副本,称作Replica,原则上是将Replica均匀地分配到整个集群上,不仅如此,Partition的分配也同样需要均匀分配,为了更好的负载均衡。
副本分配的三个目标:
- 均衡的将副本分撒于各个Broker上
- 对于某个Broker上分配的分区,尽量将分区的各个副本分配到不同的机架上的Broker。
- 如果所有的Broker都有机架信息,尽量将分区的各个副本分配到不同的机架上的Broker
在不考虑机架信息的情况下:
- 第一个副本分区通过轮询的方式挑选一个Broker,进行分配。该轮询从Broker列表的随机位置进行轮询
- 其余副本通过增加偏移进行分配
失效的副本
失效副本判定
replica.lag.time.max.ms 默认大小为 10000
当ISR中的一个Follower副本滞后Leader的时间超过参数设置之后,则判断副本失效,需要将此Follower副本踢出ISR。
实现原理
具体的实现原理:当Follower副本将Leader副本的LEO之前的日志全部同步时,则认为该Follower副本已经追赶上了Leader副本,此时更新该副本的lastCaughtUpTimeMs标识。
Kafka的副本管理器(ReplicaManager)启动时会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的lastCaughtUpTimeMs差值是否大于参数replica.lag.time.max.ms指定的值。
Kafka的源码注释中也说了一般有两种情况会导致副本失效:
- Follower副本进程卡住,在一段时间内没有向Leader副本发起同步请求,比如频繁的FULL GC。
- Follower副本进程同步过慢,在一段时间内都无法追赶上Leader副本,比如IO开销过大。
如果通过工具增加副本因子,那么新增加的副本在赶上Leader副本之前也都是失效状态的。
如果一个Follower副本由于某些原因(宕机)而下线,之后又上线,在追赶上Leader副本之前也是处于失效状态。
如何查看
失效副本的分区个数是用于衡量Kafka性能指标的重要部分。Kafka本身提供了一个相关的指标,即UnderReplicatedPartitions,可以通过JMX访问:
- kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions 取值范围是大于等于0的整数,需要注意,如果Kafka集群正在做分区迁移(kafka-reassign-partition.sh)的时候,这个值也会大于0。
副本复制
日志复制算法(lgo replication algorithm)必须提供的基本保证是,如果它告诉客户端消息已经被提交,而当前Leader出现故障,新选出的Leader也必须具有该消息,在出现故障时,Kafka会从挂掉Leader的ISR里面选择一个Follower作为这个分区新的Leader。
每个分区的Leader会维护一个 in-sync replica(同步副本列表,又称ISR)。当Producer向Broker发送消息,消息先写入到对应的Leader分区,然后复制到这个分区的所有副本中。ACKS=ALL时,只有将消息成功复制到所有同步副本(ISR)后,这条消息才算被提交。
什么情况会失去同步
一个副本与Leader失去同步的原因有很多,主要包括:
- 慢副本(Slow Replica):Follower replica 在一段时间内一直无法赶上Leader的写进度,造成这种情况的最常见原因之一是Follower replica上的 IO瓶颈,导致它持久化日志的时间比它从Leader消费时间要长
- 卡住副本(Stuck Replica):Follower replica 在很长一段时间内停止从Leader获取消息,这可能是因为GC停顿,或者副本故障
- 刚启动副本(Bootstrapping replica):当用户给某个主题增加副本因子时,新的Follower replicas是不同步的,直到它跟上
Leader日志
当副本落后于Leader分区时,这个副本被认为是不同步或者滞后的,在Kafka中,副本的滞后于Leader是根据replica.lag.tiime.max.ms来衡量。
如何确认某个副本滞后
通过replica.lag.time.max.ms来检测卡住的副本(Stuck replica)在所有情况下都能很好的工作,它跟踪Follower副本没有向Leader发送获取请求的时间,通过这可以推断Follower是否正常。另一方面,使用消息数量检测不同步慢副本(Slow replica)的模型只有在为单个主题或者具有同类流量模式的多个主题设置这些参数时才能很好的工作,但我们发现它不能扩展到生产集群中所有主题。