kafka晋升之路
- 一:故事背景
- 二:核心概念
- 2.1 系统架构
- 2.2 生产者(Producer)
- 2.2.1 生产者分区
- 2.2.2 生产者分区策略
- 2.3 经纪人(Broker)
- 2.3.1 主题(Topic)
- 2.3.2 分区(Partition)
- 2.3.2 消息
- 2.4 消费者(Consumer)
- 2.4.1 消费者组(ConsumerGroup)
- 2.4.2 协调者(Coordinator)
- 2.4.3 消费者策略
- 2.4.4 重平衡 Rebalance
- 三:进阶概念
- 3.1 消息幂等
- 3.2 事务
- 3.3 拦截器
- 3.4 控制器(Controller)
- 3.4 日志存储
- 3.5 常用参数配置
- 3.5.1 broker.id
- 3.5.2 port
- 3.5.3 zookeeper.connect
- 3.5.4 log.dirs
- 3.5.5 auto.create.topics.enable
- 3.5.6 num.partitions
- 3.5.7 default.replication.factor
- 3.5.8 log.retention.ms
- 3.5.9 message.max.bytes
- 四:常见问题
- 4.1 消息丢失问题
- 4.1.1 生产者消息丢失
- 4.1.2 消费者消息丢失
- 4.2 重复消费问题
- 4.3 消息顺序问题
- 五:总结提升
一:故事背景
一直在使用消息队列中间件,今天进行梳理其对应的知识。本篇文章将会带你系统梳理常用消息中间件kafka。主要侧重于核心知识、应用场景、常见问题。希望读者能够通过本篇文章系统了解,应用kafka。
二:核心概念
Kafka是一种分布式的,基于发布/订阅的消息系统。消息系统的作用,我们就不在这里讲了,大家可以网上自行查阅。接下来我们主要讲一讲kafka的系统架构,整个架构的各个角色。
2.1 系统架构
通常情况下,一个kafka体系架构包括 「多个Producer」、「多个Consumer」、「多个broker」以及「一个Zookeeper集群」。
一张图胜过千言万语,首先让我们开看一下,kafka的基本的架构。
这里我们列出了kafka的基本架构,整张图宏观看下来分为了四大部分,Zookeeper是用来做集群管理、元数据以及控制器选择的,抛出它以外剩下的三部分分别是:
- 生产者
- 队列
- 消费者
这三部分其实就是kafka的核心逻辑,生产、暂存、消费。
接下来我会围绕这张基本架构的图,详细展开讲解kafka架构的各个部分。
2.2 生产者(Producer)
生产者,负责将消息发送到kafka中。生产者是整个流程的起始位置,如果没有生产者,就没有接下来的流程。针对生产者我们只需要考虑以下两个问题。
- 谁来生产
- 生产出来放在那
谁来进行消息的生产呢?在kafka中没有对谁来进行生产进行限制,也就是说我不用关注这条消息是谁生产的,我们只需要关注,生产出来的数据应该放在哪里,这就引入了生产者分区的概念
2.2.1 生产者分区
Kafka的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
在逻辑上一个Topic就是一个队列、但是实际上一个Topic内分了多个区,每个区都是一个队列。生产者生产的数据也都是放到分区内的。
实际上的消息是放到分区的某个块上的,读写操作是针对分区的粒度上进行的。这样每个节点下每个分区都能独立的处理各自的读写请求、增加了系统的效率。
2.2.2 生产者分区策略
分区策略指的就是生产者生产出来的消息要放到具体的topic的哪一个分区下的策略,主要有以下三种:
- 轮询策略(Round-robin)
顺序分配,例如一个topic下有3个分区,第一条消息发送到分区0,第二条发送到1,第三条发送到2。第四条又会重新开始。是kafka默认的分区策略。 - 随机策略
将消息放到任意一个分区上,一般是先算出topic的总分区数,返回一个小于它的随机数即可,但是可能会造成数据分布不均匀的情况。 - 按消息键保存策略
Kafka允许为每条消息定义消息键,简称为Key。一旦消息被定义了Key,那么你就可以保证同一个Key的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
如果没有指定key的话,会走默认的轮询策略。
2.3 经纪人(Broker)
Kafka服务节点,一个或多个Broker组成了一个Kafka集群。每一个broker都是一个单独提供服务的节点,每个broker内可以放置多个topic。
2.3.1 主题(Topic)
Topic是Producer和Consumer订阅的对象,可以为每个业务、每个应用、每类数据创造相应的Topic。我们可以把一个topic看成一个逻辑上的队列,Produce生产消息放入队列,Consumer从队列取出消息进行消费。
2.3.2 分区(Partition)
kafka支持分区机制,所谓分区指的就是一个Topic内可以有多个分区,虽然这些消息同属一个Topic,但是却有不同的分区,并且只会存在一份。通过分区设置,等于一个Topic内拥有了多个队列,这些Partition可以进行单独支持接收消息,取消息。
2.3.2 消息
在 Kafka 中,“消息”(Message)是指一段数据,它可以是任何形式的信息,例如文本、图像、日志记录等。消息一般包含以下几个关键部分:
-
主题(Topic): 主题是消息的分类标签。消息会被发布到一个特定的主题中,而消费者可以订阅这些主题来接收消息。主题可以看作是消息的逻辑容器,帮助对消息进行组织和分区。
-
消息键(Message Key): 每条消息可以有一个可选的键,它用于在发布消息时指定分区。如果消息键被提供,Kafka 会使用哈希算法将所有具有相同键的消息分配到同一个分区中,以确保具有相同键的消息始终位于同一个分区内。
-
消息值(Message Value): 消息值是实际的数据内容,它可以是任何字节序列,通常是文本或二进制数据。
-
时间戳(Timestamp): 每条消息可以有一个时间戳,表示消息的创建时间。时间戳可以用于数据处理和分析,以及确保消息的顺序性。
-
分区(Partition): 主题可以被分成多个分区,每个分区是一个有序、不可变的消息序列。分区可以帮助实现消息的水平扩展,使 Kafka 集群能够处理大量消息。
-
偏移量(Offset): 每条消息在其所属分区内都有一个唯一的偏移量,用于标识消息在分区中的位置。消费者可以根据偏移量来跟踪已经处理过的消息。
2.4 消费者(Consumer)
消费者Consumer是一种应用程序,它订阅一个或多个 Kafka 主题并从这些主题中拉取消息进行处理。
消费者通常的工作流程如下:
我们可以根据需求,创建多个消费者实例进行并行消费,以实现并行处理。
偏移量(offset)指的是队列中的一个标记变量,其记录了某个消费者组消费到了什么位置,每个消费者组都在其订阅的主题的分区内有对应的偏移量。
2.4.1 消费者组(ConsumerGroup)
- Consumer Group是Kafka提供的可扩展且具有容错性的消费者机制。一个组内可以有多个消费者实例,他们共享同一个公共的GroupID。
- 组内的所有消费者一起消费组订阅的主题内的所有分区。每个分区只能由消费者组内一个Consumer实例进行消费。
- Consumer Group之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。这就是上文所讲每个消费者组都在其订阅的主题的分区内有对应的偏移量。
2.4.2 协调者(Coordinator)
- 协调者,它专门为Consumer Group服务,负责为Group执行Rebalance以及提供位移管理和组成员管理等。
- Consumer端应用程序在提交位移时,其实是向Coordinator所在的Broker提交位移,同样地,当Consumer应用启动时,也是向Coordinator所在的Broker发送各种请求,然后由Coordinator负责执行消费者组的注册、成员管理记录等元数据管理操作。
- 所有Broker都有各自的Coordinator组件。
2.4.3 消费者策略
上文我们说到,一个消费者组内有多个消费者实例,这些实例一起消费订阅的主题内的所有分区。那么这些分区应该怎么分呢?一个实例具体应该去消费那个或那几个分区呢?针对这个问题,kafka有3种对应的消费者策略。
-
Round
默认的轮循方式、决定消费者分区。假设有7个分区,分区将会如下分配:
-
Range
对一个消费者组来说决定消费方式是以分区总数除以消费者总数来决定,一般如果不能整除,往往是从头开始将剩余的分区分配开。
一个分区放两个,剩下的轮循。 -
Sticky
它是在Range上的一种升华,且前面两个当同组内有新的消费者加入或者旧的消费者退出的时候,会从新开始决定消费者消费方式,但是Sticky,在同组中有新的新的消费者加入或者旧的消费者退出时,不会直接开始新的Range分配,而是保留现有消费者原来的消费策略,将退出的消费者所消费的分区平均分配给现有消费者,新增消费者同理,同其他现存消费者的消费策略中分离。
2.4.4 重平衡 Rebalance
上文说了消费者的分配策略,但是我们的消费者往往都不是静态的,会有消费者上线、下线的情况。这时候就引入了Rebalance机制,Rebalance触发的条件有3个分别是:
- 组成员数发生变更。比如有新的Consumer实例加入组或者离开组,或是有Consumer实例崩溃被踢出组。
- 订阅主题数发生变更。Consumer Group可以使用正则表达式的方式订阅主题,比如consumer.subscribe(Pattern.compile(“t.*c”))就表明该Group订阅所有以字母t开头、字母c结尾的主题,在Consumer Group的运行过程中,你新创建了一个满足这样条件的主题,那么该Group就会发生Rebalance。
- 订阅主题的分区数发生变更。Kafka当前只能允许增加一个主题的分区数,当分区数增加时,就会触发订阅该主题的所有Group开启Rebalance。
三:进阶概念
3.1 消息幂等
在Kafka中,Producer默认不是幂等性的,但我们可以创建幂等性Producer。
设置方法:
props.put(“enable.idempotence”, ture);
或
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
通过设置幂等,kafka会自动对你的消息进行去重,确保重复的消息只会有一条。
一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。它只能实现单会话上的幂等性,不能实现跨会话的幂等性
3.2 事务
事务型Producer能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。
设置方式:
和幂等性Producer一样,开启enable.idempotence = true。
设置Producer端参数transactional. id,最好为其设置一个有意义的名字。
producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}
3.3 拦截器
拦截器一共有两类四种,分别是:
- onSend:该方法会在消息发送之前被调用。
- onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。
- onConsume:该方法在消息返回给Consumer程序之前调用。
- onCommit:Consumer在提交位移之后调用该方法。
3.4 控制器(Controller)
在kafka中,控制器会由某个Broker节点担任,每个Broker启动时,都会尝试成为控制器,但是只有第一个会成为控制器,当运行中的控制器宕机或中止的时候,kafka会进行Failover(故障转移),选择新的控制器。
控制器主要有以下几个职责:
- 主题管理(创建、删除、增加分区)
- 分区重分配
- Preferred领导者选举
- 集群成员管理(新增Broker、Broker主动关闭、Broker宕机)
- 数据服务
3.4 日志存储
Kafka中的消息是以主题为基本单位进行归类的,每个主题在逻辑上相互独立。
每个主题又可以分为一个或多个分区,在不考虑副本的情况下,一个分区会对应一个日志。
在kafka中引入的日志分段的概念,默认最大是1G,超过1G,日志文件就会分出一个新的段。
3.5 常用参数配置
3.5.1 broker.id
每个 kafka broker 都有一个唯一的标识来表示
这个唯一的标识符即是 broker.id,它的默认值是 0
这个值在 kafka 集群中必须是唯一的,可以任意设定
3.5.2 port
默认为9092端口,可以修改
3.5.3 zookeeper.connect
用来指定zookeeper的连接,其中有如下几个指定参数
- hostname 是 Zookeeper 服务器的机器名或者 ip 地址。
- port 是 Zookeeper 客户端的端口号
- /path 是可选择的 Zookeeper 路径,Kafka 路径是使用了 chroot 环境,如果不指定默认使用跟路径。
3.5.4 log.dirs
Kafka 把所有的消息都保存到磁盘上,存放这些日志片段的目录是通过 log.dirs 来指定的
3.5.5 auto.create.topics.enable
自动创建主题、默认情况下,kafka自动创建主题
3.5.6 num.partitions
num.partitions 参数指定了新创建的主题需要包含多少个分区,该参数的默认值是 1。
3.5.7 default.replication.factor
kafka保存消息的副本数。
3.5.8 log.retention.ms
决定数据可以保留多久,默认是 168 个小时,也就是一周
3.5.9 message.max.bytes
限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。
四:常见问题
4.1 消息丢失问题
4.1.1 生产者消息丢失
问题描述:
Kafka Producer是异步发送消息的,也就是说如果你调用producer.send(msg)这个API,那么它通常会立即返回。实际上消息并没有发送成功,消息有可能因为网络抖动、消息过大、等原因没有实际发送成功。
解决方式:
**不要使用producer.send(msg),而要使用producer.send(msg, callback)。**通过回调确认消息是否真正发送成功
4.1.2 消费者消息丢失
问题描述:
Consumer端丢失数据主要体现在Consumer端要消费的消息不见了。Consumer程序从Kafka获取到消息后开启了多个线程异步处理消息,而Consumer程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于Consumer而言实际上是丢失了。
解决方案:
这里问题的关键存在于kafka的自动提交。如果是多线程异步处理消费消息,Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移线程执行失败了,不提交位移,将这组消息视为失败。
但这里有另外一个问题,**可能一组50条的数据,已经成功处理了20条,但是我们又无法去提交成功了20条,位移20条的信息。**我们可以在每个消费者的处理逻辑中,根据业务去验证拿到的消费是否已经消费过,避免重复消费。
4.2 重复消费问题
问题描述:
上文已经提到一种重复消费问题,还有另外一种场景即:
消费者消费时间过长
max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起离开组的请求,Coordinator 也会开启新一轮 Rebalance。
解决方案:
- 提高消费能力,提高单条消息的处理速度;根据实际场景可讲max.poll.interval.ms值设置大一点,避免不必要的rebalance;可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。
- 生成消息时,可加入唯一标识符如消息id,在消费端,保存最近的1000条消息id存入到redis或mysql中,消费的消息时通过前置去重。
4.3 消息顺序问题
问题描述
由于kafka的设计中一个Topic可以包含多个Partition,每个parttition内部是有序的,kafka只能保证partition内部有序,所以业务设计到需要有序的情况时,需要在parttition层面上进行控制,已达到有序。
解决方案
- 可以设置topic,有且只有一个partition
- 根据业务需要,需要顺序的 指定为同一个partition
- 根据业务需要,比如同一个订单,使用同一个key,可以保证分配到同一个partition上
上述只能从生产的过程中保证生产消息是有序的,但是如果消费者是通过多线程进行消息消费的,任然可能出现顺序不符问题。如果这种情况我们可以在消费者内部,使用队列维护从kafka获得的消息,然后通过锁的方式,控制消息的取出。
五:总结提升
本文讲解了kafka的基本概念、常见问题、通过此篇文章,相信你对kafka已经有了一定的了解,赶紧实验起来吧。