Kafka是当前分布式系统中最流行的消息中间件之一,凭借着其高吞吐量的设计,在日志收集系统和消息系统的应用场景中深得开发者喜爱。本篇就聊聊Kafka相关的一些知识点。主要包括以下内容:
- Kafka简介
- Kafka特点
- Kafka基本概念
- Kafka架构
- Kafka的几个核心概念
- 分区Partition
- 复制Replication
- 消息发送
- 消费者组
- 消费偏移量
- Kafka的工程应用
Kafka简介
Kafka特点
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。相比于其他的消息队列中间件,Kafka的主要设计目标,也即其特点如下:
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展
Kafka基本概念
Broker
- Kaka集群中的一台或多台服务器称为Broker。Broker存储Topic的数据。
- 如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
- 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
- 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
Topic
- 发布到Kafka的每条消息都有一个类别,是个逻辑概念。
- 物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处
Partition
- 物理上的Topic分区,一个Topic可以分为多个Partition,至少有一个Partition。
- 每个Partition中的数据使用多个segment文件存储,每个Partition都是一个有序的队列,不同Partition间的数据是无序的。
- Partition中的每条消息都会被分配一个有序的ID(即offset)。
Producer
- 消息和数据的生产者。Producer将消息发布到Kafka的topic中。
- Broker接收到Producer发布的消息后,Broker将该消息追加到当前用于追加数据的segment文件中。
- Producer发送的消息,存储到一个Partition中,Producer也可以指定数据存储的Partition。
Consumer
- 消息和数据的消费者。Consumer从Broker中读取数据。
- Consumer可以消费多个topic中的数据。
Consumer Group
- 每个消费者都属于一个特定的消费者组。
- 可为每个Consumer指定group name,若不指定group name则属于默认的group。
- 一个Topic可以有多个消费者组,Topic的消息会被复制到所有的消费者组中,但每个消费者组只会把消息发送给该组中的一个消费者。
- 消费者组是Kafka用来实现一个Topic消息的广播和单播的手段。
Leader
- 每个Partition有多个副本,其中有且仅有一个作为leader。
- Leader是当前负责数据的读写的Partition。
Follower
- Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。
- 如果Leader失效,则从Follower中选举出一个新的Leader。
- 如果Follower与Leader挂掉、卡住或同步太慢,Leader会把这个Follower从"in sync replicas"列表中删除,重新创建一个Follower。
Kafka架构
Kafka一般以集群方式来部署,一个典型的Kafka集群架构如下图所示:
Kafka的几个核心概念
分区Partition
分区的几个特点
- 分区是Kafka的基本存储单元,在一个Topic中会有一个或多个Partition,不同的Partition可位于不同的服务器节点上,物理上一个Partition对应于一个文件夹。
- Partition内包含一个或多个Segment,每个Segment又包含一个数据文件和一个与之对应的索引文件。
- 对于写操作,每次只会写Partition内的一个Segment;对于读操作,也只会顺序读取同一个Partition内的不同Segment。
- 逻辑上,可以把Partition当做一个非常长的数组,使用时通过这个数组的索引(offset)访问数据。
高吞吐量设计
分区正是Kafka高吞吐量设计的方法之一,具体体现在这样几点:
- 由于不同的Partition可位于不同的机器上,因此可以实现机器间的并行处理。
- 由于一个Partition对应一个文件夹,多个Partition也可位于同一台服务器上,这样就可以在同一台服务器上使不同的Partition对应不同的磁盘,实现磁盘间的并行处理。
- 故一般通过增加Partition的数量来提高系统的并行吞吐量,但也会增加轻微的延迟。
但以下这几种情况需要注意:
- 当一个Topic有多个消费者时,一个消息只会被一个消费者组里的一个消费者消费;
- 由于消息是以Partition为单位分配的,在不考虑Rebalance时,同一个Partition的数据只会被一个消费者消费,所以如果消费者的数量多于Partition的数量,就会存在部分消费者不能消费该Topic的情况,此时再增加消费者并不能提高系统的吞吐量;
- 在生产者和Broker的角度,对不同Partition的写操作是完全并行的,可是对于消费者其并发数则取决于Partition的数量。实际中配置的Partition数量需要根据所设计的系统吞吐量来推算。
复制
复制原理
Kafka利用zookeeper来维护集群成员的信息,每个Broker实例都会被设置一个唯一的标识符,Broker在启动时会通过创建临时节点的方式把自己的唯一标识注册到zookeeper中,Kafka中的其他组件会监视Zookeeper里的/broker/ids路径,所以当集群中有Broker加入或退出时,其他组件就会收到通知。集群间数据的复制机制,在Kafka中是通过Zookeeper提供的leader选举方式实现数据复制方案。基本原理是:首先选举出一个leader,其他副本作为Follower,所有的写操作都先发给leader,然后再由leader把消息发给Follower。复制功能是Kafka架构的核心之一,因为它可以在个别节点不可用时还能保证Kafka整体的可用性。Kafka中的复制操作也是针对分区的。一个分区有多个副本,副本被保存在Broker上,每个Broker都可以保存上千个属于不同Topic和分区的副本。副本有两种类型:
- leader副本:每个分区都会有,所有生产者和消费者的请求都会经过leader;
- follower副本:不处理客户端的请求,它的职责是从leader处复制消息数据,使自己和leader的状态保持一致;
- 如果leader节点宕机,那么某个follower就会被选为leader继续对外提供服务;
- 复制因子:一个分区有几个副本。
消息发送方式
从生产者的角度来看,消息发送到Broker有三种方式:
- 立即发送:只发送消息,不关心消息发送的结果。本质上也是一种异步发送的方式,消息先存储在缓冲区中,达到设定条件后批量发送。当然这是kafka吞吐量最高的一种方式,并配合参数acks=0,这样生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息。但是也是消息最不可靠的一种方式,因为对于发送失败的消息没有做任何处理。
- 同步发送:生产者发送消息后获取返回的Future对象,根据该对象的结果查看发送是否成功。如果业务要求消息必须是按顺序发送的,那么可以使用同步的方式,并且只能在一个partation上,结合参数设置retries的值让发送失败时重试,设置max_in_flight_requests_per_connection=1,可以控制生产者在收到服务器晌应之前只能发送1个消息,在消息发送成功后立刻flush,从而控制消息顺序发送。
- 异步发送:生产者发送消息时将注册的回调函数作为入参传入,生产者接收到Kafka服务器的响应时会触发执行回调函数。如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息,配合参数retries=0,并将发送失败的消息记录到日志文件中。
消息发送确认
消息发送到Broker后怎么算投递成功呢,Kafka有三种确认模式:
- 不等Broker确认就认为投递成功;
- 由leader来确认投递成功;
- 由所有的leader和follower都确认才认为是成功的。
三种模式对比的话,性能依次降低,但可靠性依次提高。
消息重发机制
当从Broker接收到的是临时可恢复的异常时,生产者会向Broker重发消息,重发次数的限制值由初始化生产者对象的retries属性决定,在默认情况下生产者会在重试后等待100ms,可以通过retry.backoff.ms属性进行修改。
批次发送
当有多条消息要被发送到同一个分区时,生产者会把它们放到同一个批次里,Kafka通过批次的概念来提高吞吐量,但同时也会增加延迟。对批次的控制主要通过构建生产者对象时的两个属性来实现:
- batch.size:当发往每个分区的缓存消息数量达到这个数值时,就会触发一次网络请求,批次里的所有消息都会被发送出去;
- linger.ms:每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即把消息发送出去。
消费者组
消费者组是Kafka提供的可扩展且具有容错性的消费机制,在一个消费者组内可以有多个消费者,它们共享一个唯一标识,即分组ID。组内的所有消费者协调消费它们订阅的主题下的所有分区的消息,但一个分区只能由同一个消费者组里的一个消费者来消费。
广播和单播
一个Topic可以有多个消费者组,Topic的消息会被复制到所有的消费者组中,但每个消费者组只会把消息发送给一个消费者组里的某一个消费者。如果要实现广播,只需为每个消费者都分配一个单独的消费者组接口。如果要实现单播,则需要把所有的消费者都设置在同一个消费者组里
再均衡
消费者组里有新消费者加入或者有消费者离开,分区所有权会从一个消费者转移到另一个消费者。再均衡协议规定了一个消费者组下的所有消费者如何达成一致来分配主题下的每个分区。触发再均衡的场景有三种:
- 一是消费者组内成员发生变更
- 二是订阅的主题数量发生变更
- 三是订阅主题的分区数量发生变更
消费偏移量
Kafka中有一个叫作_consumer_offset特殊主题用来保存消息在每个分区的偏移量,消费者每次消费时都会往这个主题中发送消息,消息包含每个分区的偏移量。如果消费者一直处于运行状态,偏移量没什么作用;如果消费者崩溃或者有新的消费者加入消费者组从而触发再均衡操作,再均衡之后该分区的消费者若不是之前的那个,提交偏移量就有用了。维护消息偏移量对于避免消息被重复消费和遗漏消费,确保消息的ExactlyOnce至关重要,以下是不同的提交偏移量的方式:
- 自动提交:Kafka默认会定期自动提交偏移量,提交的时间间隔默认是5秒。此方式会产生重复处理消息的问题;
- 手动提交:在进行手动提交之前需要先关闭消费者的自动提交配置,然后用commitSync方法来提交偏移量。处理完记录后由开发者确保调用了commitSync方法,来减少重复处理消息的数量,但可能降低消费者的吞吐量;
- 异步提交:使用commitASync方法来提交最后一个偏移量。消费者只管发送提交请求,而不需要等待Broker的立即回应。
Kafka的工程应用
Kafka主要用于三种场景:
- 基于Kafka的用户行为数据采集
- 基于Kafka的日志收集
- 基于Kafka的流量削峰
基于Kafka的用户行为数据采集
要获取必要的数据进行用户行为等的分析,需要这样几个步骤:
- 前端数据(埋点)上报
- 接收前端数据请求
- 后端通过Kafka消费消息,必要时落库
- 分析用户行为
基于Kafka的日志收集
各个应用系统在输出日志时利用高吞吐量的Kafka作为数据缓冲平台,将日志统一输出到Kafka,再通过Kafka以统一接口服务的方式开放给各种消费者。做统一日志平台的方案,收集重要系统的日志集中到Kafka中,然后再导入ElasticSearch、HDFS、Storm等具体日志数据的消费者中,用于进行实时搜索分析、离线统计、数据备份、大数据分析等。
基于Kafka的流量削峰
为了让系统在大流量场景下仍然可用,可以在系统中的重点业务环节加入消息队列作为消息流的缓冲,从而避免短时间内产生的高流量带来的压垮整个应用的问题。
Kafka的生产者,包括如下内容:
- 生产者是如何生产消息
- 如何创建生产者
- 发送消息到Kafka
- 生产者配置
- 分区
生产者是如何生产消息的
首先来看一下Kafka生产者组件图
(生产者组件图。图片来源:《Kafka权威指南》)
第一步,Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
第二步,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。
如何创建生产者
属性设置
在创建生产者对象的时候,要设置一些属性,有三个属性是必选的:
- bootstrap.servers:指定Broker的地址清单,地址格式为host:port。清单里不需要包含所有的Broker地址,生产者会从给定的Broker里查找到其他Broker的信息;不过建议至少要提供两个Broker的信息保证容错。
- key.serializer:指定键的序列化器。Broker希望接收到的消息的键和值都是字节数组。这个属性必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer,因此一般不需要实现自定义的序列化器。需要注意的是,key.serializer属性是必须设置的,即使只发送值内容。
- value.serializer:指定值的序列化器。如果键和值都是字符串,可以使用与key.serializer一样的序列化器,否则需要使用不同的序列化器。
项目依赖
以maven项目为例,要使用Kafka客户端,需要引入kafka-clients依赖:
代码语言:j
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.0</version>
</dependency>
样例
一个简单的创建Kafka生产者的代码样例如下:
代码语言:
Properties props = new Properties();props.put("bootstrap.servers", "producer1:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");/*创建生产者*/Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "v" + i);/* 发送消息*/producer.send(record);}/*关闭生产者*/producer.close();
这个样例中只配置了必须的这三个属性,其他都使用了默认的配置。
发送消息Kafka
实例化生产者对象后,接下来就可以开始发送消息了。发送消息主要有三种方式:
- 发送并忘记(fire-and-forget):把消息发送给服务器,但并不关心消息是否正常到达,也就是上面样例中的方式。大多数情况下,消息会正常到达,这可以由Kafka的高可用性和自动重发机制来保证。不过有时候也会丢失消息。
- 同步发送:使用send()方法发送消息,它会返回一个Future对象,调用get()方法进行等待,我们就可以知道消息是否发送成功。
- 异步发送:调用send()方法时,同时指定一个回调函数,服务器在返回响应时调用该函数。
发送并忘记
这是最简单的消息发送方式,只发送不管发送结果,代码样例如下:
代码语言:
ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v"); // 1
try {producer.send(record); // 2
} catch (Exception e) {e.printStackTrace(); // 3
}
这段代码要注意几点:
- 生产者的send()方法将ProducerRecord对象作为参数,样例里用到的ProducerRecord构造函数需要目标主题的名字和要发送的键和值对象,它们都是字符串。键和值对象的类型都必须与序列化器和生产者对象相匹配。
- 使用生产者的send()方法发送ProducerRecord对象。消息会先被放进缓冲区,然后使用单独的线程发送到服务器端。send()方法会返回一个包含RecordMetadata的Future对象,不过此处不关注返回了什么。
- 发送消息时,生产者可能会出现一些执行异常,序列化消息失败异常、缓冲区超出异常、超时异常,或者发送线程被中断异常。
同步发送
在上一种发送方式中已经解释过同步发送和只发送的区别,以下是最简单的同步发送方式的代码样例,对比可以看到区别:
代码语言:
ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v");
try {producer.send(record).get;
} catch (Exception e) {e.printStackTrace();
}
可以看到,二者的区别就在于是否接收发送结果。同步发送会接收send()方法的返回值,即一个Future对象,通过调用Future对象的get()方法来等待Kafka响应。如果服务器返回错误,则get()方法就会抛出异常。如果没有发生错误,我们会得到一个RecordMetadata对象,可以用它来获取消息的偏移量。
异步发送消息
对于吞吐量要求比较高的应用来说,又要同时保证服务的可靠性,发送并忘记方式可靠性较低,但同步发送方式又会降低吞吐量,这就需要异步发送消息的方式了。大多数时候,生产者并不需要等待响应,只需要在遇到消息发送失败时,抛出异常、记录错误日志,或者把消息写入“错误日志”文件便于以后分析。代码样例如下:
代码语言:
ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v");
// 异步发送消息,并监听回调
producer.send(record, new Callback() { // 1@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) { // 2if (exception != null) {// 进行异常处理} else {System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset());}}
});
- 从上面代码可以看到,为了使用回调,只需要实现一个org.apache.kafka.clients.producer.Callback接口即可,这个接口只有一个onComplete方法。
- 如果Kafka返回错误,onComplete方法会抛出一个非空异常。在调用send()方法的时候会传入这个callback对象,根据发送的结果决定调用异常处理方法还是发送结果处理方法。
生产者配置
在创建生产者的时候,介绍了三个必须的属性,再介绍下其他的生产者属性:
acks
acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:
- acks=0 : 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
- acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
- acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
buffer.memory
该参数用来设置生产者内存缓冲区的大小生产者用它缓冲要发送到服务器的消息。如果程序发送消息的速度超过了发送到服务器的速度,会导致生产者缓冲区空间不足,这时候调用send()方法要么被阻塞,要么抛出异常。
compression.type
默认情况下,发送的消息不会被压缩。它指定了消息被发送给broker之前使用哪一种压缩算法进行压缩,可选值有 snappy(占用CPU少,关注性能和网络带宽时选用),gzip(占用CPU多,更高压缩比,网络带宽有限时选用),lz4。
retries
指定了生产者放消息发生错误后,消息重发的次数。如果达到设定值,生产者就会放弃重试并返回错误。
batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
linger.ms
该参数制定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。
client.id
客户端 id,服务器用来识别消息的来源。
max.in.flight.requests.per.connection
指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量,把它设置为 1 可以保证消息是按照发送的顺序写入服务器,即使发生了重试。
timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms
- timeout.ms 指定了 borker 等待同步副本返回消息的确认时间;
- request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间;
- metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如分区首领是谁)时等待服务器返回响应的时间。
max.block.ms
该参数指定了在调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。
max.request.size
该参数用于控制生产者发送的请求大小。它可以指发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1000K ,那么可以发送的单个最大消息为 1000K ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1K。
receive.buffer.bytes和send.buffer.byte
这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。
分区
分区器
上面在说明生产者发送消息方式的时候有如下一行代码:
代码语言:
ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v");
这里指定了Kafka消息的目标主题、键和值。ProducerRecord对象包含了主题、键和值。键的作用是:
- 作为消息的附加信息;
- 用来决定消息被写到主题的哪个分区,拥有相同键的消息将被写到同一个分区。
键可以设置为默认的null,是不是null的区别在于:
- 如果键为null,那么分区器使用轮询算法将消息均衡地分布到各个分区上;
- 如果键不为null,那么 分区器 会使用内置的散列算法对键进行散列,然后分布到各个分区上。
要注意的是,只有在不改变分区主题分区数量的情况下,键与分区之间的映射才能保持不变。
顺序保证
Kafka可以保证同一个分区里的消息是有序的。考虑一种情况,如果retries为非零整数,同时max.in.flight.requests.per.connection为比1大的数如果某些场景要求消息是有序的,也即生产者在收到服务器响应之前可以发送多个消息,且失败会重试。那么如果第一个批次消息写入失败,而第二个成功,Broker会重试写入第一个批次,如果此时第一个批次写入成功,那么两个批次的顺序就反过来了。也即,要保证消息是有序的,消息是否写入成功也是很关键的。那么如何做呢?在对消息的顺序要严格要求的情况下,可以将retries设置为大于0,max.in.flight.requests.per.connection设为1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给Broker。当然这会严重影响生产者的吞吐量。
Kafka的消费者,包括如下内容:
- 消费者和消费者组
- 如何创建消费者
- 如何消费消息
- 消费者配置
- 提交和偏移量
- 再均衡
- 结束消费
消费者和消费者组
概念
Kafka消费者对象订阅主题并接收Kafka的消息,然后验证消息并保存结果。Kafka消费者是消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。消费者组的设计是对消费者进行的一个横向伸缩,用于解决消费者消费数据的速度跟不上生产者生产数据的速度的问题,通过增加消费者,让它们分担负载,分别处理部分分区的消息。
消费者数目与分区数目
在一个消费者组中的消费者消费的是一个主题的部分分区的消息,而一个主题中包含若干个分区,一个消费者组中也包含着若干个消费者。当二者的数量关系处于不同的大小关系时,Kafka消费者的工作状态也是不同的。看以下三种情况:
- 消费者数目<分区数目:此时不同分区的消息会被均衡地分配到这些消费者;
- 消费者数目=分区数目:每个消费者会负责一个分区的消息进行消费;
- 消费者数目>分区数目:此时会有多余的消费者处于空闲状态,其他的消费者与分区一对一地进行消费。
分区再均衡
当消费者数目与分区数目在以上三种关系间变化时,比如有新的消费者加入、或者有一个消费者发生崩溃时,会发生分区再均衡。
分区再均衡是指分区的所有权从一个消费者转移到另一个消费者。再均衡为消费者组带来了高可用性和伸缩性。但是同时,也会发生如下问题:
- 在再均衡发生的时候,消费者无法读取消息,会造成整个消费者组有一小段时间的不可用;
- 当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能需要去刷新缓存,在它重新恢复状态之前会拖慢应用。
因此也要尽量避免不必要的再均衡。
那么消费者组是怎么知道一个消费者可不可用呢?
消费者通过向被指派为群组协调器的Broker发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。还有一点需要注意的是,当发生再均衡时,需要做一些清理工作,具体的操作方法可以通过在调用subscribe()方法时传入一个ConsumerRebalanceListener实例即可。
如何创建消费者
创建Kafka的消费者对象的过程与创建生产者的过程是类似的,需要传入必要的属性。在创建消费者的时候以下三个选项是必选的:
- bootstrap.servers :指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;
- key.deserializer :指定键的反序列化器;
- value.deserializer :指定值的反序列化器。
后两个序列化器的说明与生产者的是一样的。一个简单的创建消费者的代码样例如下:
代码语言:
String topic = "Hello";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "server:9091");
/*指定分组 ID*/
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
如何消费消息
订阅主题
创建了Kafka消费者之后,接着就可以订阅主题了。订阅主题可以使用如下两个 API :
- consumer.subscribe(Collection topics) :指明需要订阅的主题的集合;
- consumer.subscribe(Pattern pattern) :使用正则来匹配需要订阅的集合。
代码样例:
代码语言:
consumer.subscribe(Collections.singletonList(topic));
轮询消费
消息轮询是消费者API的核心,消费者通过轮询 API(poll) 向服务器定时请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,这使得开发者只需要关注从分区返回的数据,然后进行业务处理。一个简单的消费者消费的代码样例如下:
代码语言:
try {while (true) {// 轮询获取数据ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));for (ConsumerRecord<String, String> record : records) {System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",record.topic(), record.partition(), record.key(), record.value(), record.offset());}}
} finally {consumer.close();
}
消费者配置
与生产者类似,消费者也有完整的配置列表。接下来一一介绍这些重要的属性。
fetch.min.byte
消费者从服务器获取记录的最小字节数。如果可用的数据量小于设置值,broker 会等待有足够的可用数据时才会把它返回给消费者。主要是为了降低消费者和Broker的工作负载。
fetch.max.wait.ms
broker 返回给消费者数据的等待时间,默认是 500ms。如果消费者获取最小数据量的要求得不到满足,就会在等待最多该属性所设置的时间后获取到数据。实际要看二者哪个条件先满足。
max.partition.fetch.bytes
该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。
session.timeout.ms
消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。
auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
- latest (默认值) :在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的最新记录);
- earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
enable.auto.commit
是否自动提交偏移量,默认值是 true。为了避免出现重复消费和数据丢失,可以把它设置为 false。
client.id
客户端 id,服务器用来识别消息的来源。
max.poll.records
单次调用 poll() 方法能够返回的记录数量。
receive.buffer.bytes & send.buffer.byte
这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。
提交和偏移量
提交是指更新分区当前位置的操作,分区当前的位置,也就是所谓的偏移量。
什么是偏移量
Kafka 的每一条消息都有一个偏移量属性,记录了其在分区中的位置,偏移量是一个单调递增的整数。消费者通过往一个叫作 _consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有 什么用处。不过,如果有消费者退出或者新分区加入,此时就会触发再均衡。完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。因为这个原因,所以如果不能正确提交偏移量,就可能会导致数据丢失或者重复出现消费,比如下面情况:
- 如果提交的偏移量小于客户端处理的最后一个消息的偏移量 ,那么处于两个偏移量之间的消息就会被重复消费;
- 如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
偏移量提交
那么消费者如何提交偏移量呢?Kafka 支持自动提交和手动提交偏移量两种方式。
自动提交:
只需要将消费者的 enable.auto.commit 属性配置为 true 即可完成自动提交的配置。此时每隔固定的时间,消费者就会把 poll() 方法接收到的最大偏移量进行提交,提交间隔由 auto.commit.interval.ms 属性进行配置,默认值是 5s。使用自动提交是存在隐患的,假设我们使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s ,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。基于这个原因,Kafka 也提供了手动提交偏移量的 API,使得用户可以更为灵活的提交偏移量。
手动提交:
用户可以通过将 enable.auto.commit 设为 false,然后手动提交偏移量。基于用户需求手动提交偏移量可以分为两大类:手动提交当前偏移量:即手动提交当前轮询的最大偏移量;手动提交固定偏移量:即按照业务需求,提交某一个固定的偏移量。
而按照 Kafka API,手动提交偏移量又可以分为同步提交和异步提交。
同步提交:通过调用 consumer.commitSync() 来进行同步提交,不传递任何参数时提交的是当前轮询的最大偏移量。
代码语言:
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));for (ConsumerRecord<String, String> record : records) {System.out.println(record);}// 同步提交consumer.commitSync();
}
如果某个提交失败,同步提交还会进行重试,这可以保证数据能够最大限度提交成功,但是同时也会降低程序的吞吐量。
异步提交
为了解决同步提交降低程序吞吐量的问题,又有了异步提交的方案。异步提交可以提高程序的吞吐量,因为此时你可以尽管请求数据,而不用等待 Broker 的响应。代码样例如下:
代码语言:
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));// 异步提交并定义回调consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset()));}}});
}
异步提交如果失败,错误信息和偏移量都会被记录下来。尽管如此,异步提交存在的问题是,如果提交失败不能重试,因为重试可能会出现小偏移量覆盖大偏移量的问题。虽然程序不能在失败时候进行自动重试,但是我们是可以手动进行重试。可以通过一个 Map offsets 来维护你提交的每个分区的偏移量,也就是异步提交的顺序,在每次提交偏移量之后或在回调里提交偏移量时递增序列号。然后当失败时候,你可以判断失败的偏移量是否小于你维护的同主题同分区的最后提交的偏移量,如果小于则代表你已经提交了更大的偏移量请求,此时不需要重试,否则就可以进行手动重试。
同步和异步组合提交:当发生关闭消费者或者再均衡时,一定要确保能够提交成功,为了保证性能和可靠性,又有了同步和异步组合提交的方式。也就是在消费者关闭前组合使用commitAsync()方法和commitSync()方法。代码样例如下:
代码语言:
try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));for (ConsumerRecord<String, String> record : records) {System.out.println(record);}// 异步提交consumer.commitAsync();}
} catch (Exception e) {e.printStackTrace();
} finally {try {// 因为即将要关闭消费者,所以要用同步提交保证提交成功consumer.commitSync();} finally {consumer.close();}
}
提交特定的偏移量
上面的提交方式都是提交当前最大的偏移量,但如果需要提交的是特定的一个偏移量呢?只需要在重载的提交方法中传入偏移量参数即可。代码样例如下:
代码语言:
// 同步提交特定偏移量
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
// 异步提交特定偏移量
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
结束消费
上面的消费过程都是以无限循环的方式来演示的,那么如何来优雅地停止消费者的轮询呢。Kafka 提供了 consumer.wakeup() 方法用于退出轮询。如果确定要退出循环,需要通过另一个线程调用consumer.wakeup()方法;如果循环运行在主线程里,可以在ShutdownHook里调用该方法。它通过抛出 WakeupException 异常来跳出循环。需要注意的是,在退出线程时最好显式的调用 consumer.close() , 此时消费者会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡 ,而不需要等待会话超时。下面的示例代码为监听控制台输出,当输入 exit 时结束轮询,关闭消费者并退出程序:
代码语言:
// 调用wakeup优雅的退出轮询
final Thread mainThread = Thread.currentThread();
new Thread(() -> {Scanner sc = new Scanner(System.in);while (sc.hasNext()) {if ("exit".equals(sc.next())) {consumer.wakeup();try {// 等待主线程完成提交偏移量、关闭消费者等操作mainThread.join();break;} catch (InterruptedException e) {e.printStackTrace();}}}
}).start();try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));for (ConsumerRecord<String, String> rd : records) {System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset());}}
} catch (WakeupException e) {// 无需处理此异常
} finally {consumer.close();
}