介绍
死信就是无法被消费的消息。生产者将消息投递给broker或者直接到队列里,消费者从队列中取出消息进行消费。但是某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续处理,就变成了死信。有死信自然就有死信队列。
将不能消费的消息也就是死信放入到死信队列,后续再去处理死信队列中的消息。
应用场景:为了保证订单业务的消息不丢失,需要用到死信队列机制。用户在商城下单成功并点击去支付后,在指定时间未支付时自动失效。
死信的来源
1.消息的TTL过期,消息的存活时间。比如消息存活时间为10s,如果不进行消费,就会变成死信。
2.队列达到最大长度,比如队列满了,无法再添加数据了。
3.消息被拒绝,消息方进行应答时进行了否定应答和拒绝应答basic.reject, basic.nack 并且requeue=false不放入队列中。
死信架构图
当消费者C1启动后,创建了普通队列和普通交换机,死信队列和死信交换机,以及它们之间的关系后,然后手动停掉。启动消息生产者(Producer)并发送十条消息,每条消息的过期时间都是10ms。可以看出,消息首先进入到普通队列,然后因为普通队列的消息过期时间到了又无法被消费成为了死信,进入到死信队列中。
代码
C1消费者
package com.xkj.org.mq.dead;import com.rabbitmq.client.*;
import com.xkj.org.utils.RabbitMQUtil;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;public class Consumer01 {// 普通交换机public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列public static final String NORMAL_QUEUE = "normal_queue";// 死信队列public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMQUtil.getChannel();//声明普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);Map<String, Object> arguments = new HashMap<>();//设置消息过期时间10s(消息过期时间一般更多的是在消息的生产者中去设置更加灵活,而不是在这里设置比较固定不能修改)
// arguments.put("x-message-ttl", 10000);//普通队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);//设置死信routingKeyarguments.put("x-dead-letter-routing-key", "lisi");//声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);//声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//绑定普通队列与普通交换机channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");//绑定死信队列与死信交换机channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("Consumer01等待接收消息...");//消费接收消息DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("Consumer01普通队列接收到消息:"+ new String(message.getBody(), "UTF-8"));};CancelCallback cancelCallback = consumerTag -> {System.out.println("Consumer01 取消消息的消费");};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);}}
Producer生产者
package com.xkj.org.mq.dead;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.xkj.org.utils.RabbitMQUtil;import java.io.IOException;public class Producer {// 普通交换机public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException {Channel channel = RabbitMQUtil.getChannel();//设置消息过期时间10sAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 10; i++) {String message = "消息"+i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,message.getBytes("UTF-8"));}}
}
C2消费者
package com.xkj.org.mq.dead;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xkj.org.utils.RabbitMQUtil;import java.io.IOException;public class Consumer02 {// 死信队列public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMQUtil.getChannel();System.out.println("Consumer02等待接收消息...");//消费接收消息DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("Consumer02普通队列接收到消息:"+ new String(message.getBody(), "UTF-8"));};CancelCallback cancelCallback = consumerTag -> {System.out.println("Consumer02 取消消息的消费");};channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);}
}
C2消费者
可以看出10条消息因为ttl过期,变成死信后进入死信队列,全部被消费者C2所消费。