文章目录
- 拦截器
- 序列化器
- 分区器
拦截器
拦截器(Interceptor)是早在Kafka0.10.0.0中就已经引入的一个功能,Kafka一共有两种拦截器:生产者拦截器和消费者拦截器。本节主要讲述生产者拦截器的相关内容,有关消费者拦截器的具体细节先不讲。
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
生产者拦截器的使用也很方便,主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3个方法:
``
public ProducerRecord<K,V> onSend(ProducerRecord<K,V> record);
public void onAcknowledgement (RecordMetadata metadata,Exception exception);
public void close();
``
KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend0方法来对消息进行相应的定制化操作。一般来说最好不要修改消息ProducerRecord的topic、key和partition等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩(LogCompaction)的功能。
KafkaProducer会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgementO方法,优先于用户设定的Caliback之前执行。这个方法运行在Producer 的IO线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
close0方法主要用于在关闭拦截器时执行一些资源的清理工作。在这3个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。
自定义拦截器时需要实现ProducerInterceptor接口,并在配置类或者配置中设置需要添加的拦截器,多个拦截器存在时会根据拦截器的添加顺序(配置类或者配置中)按顺序执行。
序列化器
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了orgapache.kafkka.common.serialization.Serializer接口。
生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了某种序列化器,比如StringSerializer,而消费者使用了另一种序列化器,比如IntegerSerializer那么是无法解析出想要的数据的。
如果Kafka客户端提供的几种序列化器都无法满足应用需求,则可以选择使用如Avro、JSON、Thrift、ProtoBuf和Protostuff等通用的序列化工具来实现,或者使用自定义类型的序列化器来实现。自定义序列化器需要实现Serializer接口,并在配置类或者配置中设置该自定义序列化器为生产端序列化器。
分区器
消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。
如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
Kafka中提供的默认分区器是org.apache.kafka.clients.producer.intemals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口。
接口中partition()方法用来计算分区号,返回值为int类型。partition()方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。close()方法在关闭分区器的时候用来回收一些资源。
在默认分区器DefaultPartitioner的实现中,closeO是空方法,而在partition0方法中定义了主要的分区分配逻辑。如果key不为null,那么默认的分区器会对key进行哈希,最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。
在不改变主题分区数量的情况下,key与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。
除了使用Kafka提供的默认分区器进行分区分配,还可以使用自定义的分区器,只需同DefaultPartitioner一样实现Partitioner接口即可。默认的分区器在key为null时不会选择非可用的分区,我们可以通过自定义的分区器DemoPartitioner来打破这一限制。如果自定义分区器还是需要在配置类或者配置中设置分区器为自定义的分区器。