消息应答
消息应答机制:消费者接收信息并处理完之后,告诉rabbitmq该信息已经处理,rabbitmq可以把该信息删除了.
消息自动重新入队:如果处理某个消息的消费者异常关闭了,没有发送ACK确认,rabbitmq会将其重新入队,分发给其他消费者处理。
自动应答
消息发送后即被认为传送成功。该模式弊端,消费者不断的接收消息,如果处理不及时,导致内存耗尽,系统杀死消费者进程。所以适用于消费者可以高效处理信息的情况下。
手动应答:
Channel.basicAck(long deliveryTag, boolean multiple):
确认一条或多条收到的消息。对于经过确认的消息,RabbitMQ 认为该消息成功的处理,可以将其丢弃了。
在手动应答情况下,如果消费者没有关闭,但就是不发送ACK确认,这条消息就会一直在MQ中显示unacked,不会重新分发给其他消费者。
deliveryTag:对应消息的标识。
multiple:如果为True,则应用多消息。
multiple该参数用于开启批量应答:比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的还未应答的消息都会被确认收到消息应答。
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):
拒绝一条或多条收到的消息。
deliveryTag:对应消息的标识。
multiple:如果为True,则应用多消息。
requeue:如果为True,拒绝的消息将会重新加入队列
重新入队的消息会分发给其他消费者,不重新入队的消息将会丢失/死信。
Channel.basicReject(long deliveryTag, boolean requeue):
拒绝一条消息。与 Channel.basicNack 相比少一个参数Multiple。
发布确认
生产者把消息发送到MQ后,需要MQ返回一个ACK给生产者,这样生产者才知道自己的消息成功发出去了。
实现过程:生产者将信道设置成confirm模式,所有在该信道的消息都会被指派一个唯一的id,消息到达匹配的队列后,broker发送一个确认给生产者,这样生产者就知道消息已经发布到MQ了
RabbitMQ默认没有开启发布确认模式的,需要使用Channel.confirmSelect()
开启发布确认。
Channel.waitForCinfirms()
:等待Broker确认或拒绝自上次调用以来发布的所有消息。
单个发布确认
单个确认发布:发布一个确认一个,才能接着发第二个。
优缺点:可以确认哪个消息失败,适用于吞吐量不大的程序。
public class ConfirmMessage {//消息的个数public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception {publishMessageIndividually();//发布1000个单独确认消息,耗时:546ms}public static void publishMessageIndividually() throws Exception {Channel channel = RabbitMQUtil.getChannel();//队列的声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,true,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量发消息for (int i = 1; i <= MESSAGE_COUNT; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//单个确认boolean flag = channel.waitForConfirms();if(flag){// TODO:消息发送成功的处理逻辑} else {// TODO: 消息发送失败的处理逻辑}}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");}
}
批量发布确认
批量发布确认:以批次发布,一批次可以有很多消息,待批次发布完,才确认。
优缺点:速度比单个快,但是不能确认哪个消息失败。
public class ConfirmMessage {//消息的个数public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception {publishMessageBatch(); //发布1000个批量确认消息,耗时:117ms}public static void publishMessageBatch() throws Exception {Channel channel = RabbitMQUtil.getChannel();//队列的声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,true,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();int batchSize = 10;//批量发消息for (int i = 1; i <= MESSAGE_COUNT; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//批量确认if(i % batchSize == 0) {boolean flag = channel.waitForConfirms();if(flag) {// TODO:消息发送成功的处理逻辑} else {// TODO: 消息发送失败的处理逻辑}}}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");}
}
异步发布确认
异步确认发布:每个信息都有自己的标识,一直发布即可,发布成功与否,都会有回调信息。
优缺点:最快,又能确认哪个消息发布失败。
public class ConfirmMessage {//消息的个数public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {publishMessageAsync(); //发布1000个异步确认消息,耗时:61ms}public static void publishMessageAsync() throws Exception {Channel channel = RabbitMQUtil.getChannel();//队列的声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,true,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//异步确认ConfirmCallback confirmAckCallback = (deliveryTag, multiple) -> {// TODO:消息发送成功的处理逻辑};ConfirmCallback confirmNackCallback = (deliveryTag, multiple) -> {// TODO: 消息发送失败的处理逻辑};channel.addConfirmListener(confirmAckCallback, confirmNackCallback);//批量发消息for (int i = 1; i <= MESSAGE_COUNT; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");}
}
消息应答和发布确认的区别
消息应答属于消费者,消费完消息告诉MQ已经消费成功。
发布确认属于生产者,生产消息发送到MQ,MQ告诉生产者已经收到消息。