主线程将数据放入到本地累加器中record accumulator中进行存储,sender线程会异步的拉取数据到kafka集群中,这个数据拉取并且复制到kafka集群中以后,kafka需要返回给sender线程一个确认应答ack,这个确认应答用于在sender线程中进行判定sender线程是否复制拉取数据成功,如果我们在producer中设定了retries开关,那么失败以后sender线程还会多次重新复制尝试拉取数据
其中失败尝试和producer端没有任何关系,producer端只是将数据放入到本地累加器中而已,失败尝试是由sender线程重新尝试的
ack的级别:
ack = 0 ;sender线程认为拉取过去的数据kafka一定会收到
ack = 1 ; sender线程拉取过去的数据leader节点接收到,并且存储到自己的本地,然后在返回ack
ack = -1 ; sender线程拉取数据,leader节点收到存储到本地,所有follower节点全部都接收到并且存储到本地这个时候leader返回ack
综上所述ack = -1的级别是数据稳定性最高的,因为能够保证数据全部都同步完毕再返回给sender线程。
后面我们可以得知:
ack=0或1的时候,都可能会导致数据的丢失问题。
而ack=-1时,确保所有数据同步完毕才返回,但有可能导致数据的重复发送的问题。
带有确认应答的代码:
其中回调函数中的metadata对象可以知道发送数据到哪里了,exception用于区分是不是本条数据发送成功
但是这个回调函数不能做出任何的反馈操作,只能起到通知的作用
代码:
public class producerWithCallBack {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.ACKS_CONFIG, "all");//设定ack,在代码中ack的级别存在三种 0 1 allpro.put(ProducerConfig.RETRIES_CONFIG,3 );//设定重试次数pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "what can i say ? man");for(int i=0;i<5;i++){producer.send(record, new Callback() {//发送方法中增加回调代码@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//metadata中包含所有的发送数据的元数据信息//哪个topic的那个分区的第几个数据String topic = metadata.topic();int partition = metadata.partition();long offset = metadata.offset();if(exception == null ){System.out.println("success"+" "+topic+" "+partition+" "+offset);}else{System.out.println("fail"+" "+topic+" "+partition+" "+offset);}}});}producer.close();}
}
打印的元数据信息。