配置分区器：
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
/**
 * Builds the property map used to create Kafka producers.
 *
 * <p>Sets the bootstrap servers, the value serializer and the partitioner class
 * ({@code RoundRobinPartitioner}). {@code bootstrapServers} and {@code valueSerializer}
 * are declared outside this snippet — presumably fields of the enclosing configuration class.
 *
 * <p>NOTE(review): no {@code key.serializer} entry is put here; confirm it is supplied
 * elsewhere, otherwise producer creation may be rejected.
 *
 * @return a fresh, mutable map of producer properties
 */
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    // Select the partitioner implementation used by the producer.
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
    return props;
}

/**
 * Creates the producer factory backed by {@link #producerConfigs()}.
 *
 * <p>The return type is parameterized (instead of the raw {@code ProducerFactory}) so that
 * callers get compile-time checking of the key/value types.
 *
 * @return a {@code DefaultKafkaProducerFactory} configured with the properties above
 */
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}
// Override the configuration provided by spring-kafka
/**
 * Exposes a {@code KafkaTemplate} bean built on top of {@code producerFactory()}.
 *
 * @return a template with {@code String} keys and {@code Object} values
 */
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    // Built from the producer factory defined in this configuration.
    KafkaTemplate<String, Object> template =
            new KafkaTemplate<String, Object>(producerFactory());
    return template;
}
自定义消息拦截器
public class CustomerProducerInterceptor implements ProducerInterceptor<String,Object> {// 发送消息时,对消息拦截。@Overridepublic ProducerRecord<String,Object> onSend(ProducerRecord producerRecord) {System.out.println("拦截消息" + producerRecord.toString());return null;}// 服务器是否收到了当前这条消息@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(recordMetadata != null){System.out.println("服务器收到消息" + recordMetadata.offset());}else{// 没有收到消息发送失败System.out.println("消息发送失败!!!");}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}