架构图
配置
package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadLetterConfig {

    // All names are declared static final so that they are compile-time
    // constants: they are referenced statically from other classes, including
    // inside annotation attributes such as @RabbitListener(queues = ...),
    // which only accept constant expressions.

    /** Name of the topic exchange for regular traffic (identifier spelling kept for existing references). */
    public static final String Nomarl_Exchange = "normal_exchange";

    /** Name of the durable queue bound to the normal exchange. */
    public static final String Normal_Queue = "normal_queue";

    /** Topic binding pattern between the normal exchange and the normal queue. */
    public static final String Normal_RoutingKey = "normal.#";

    /** Name of the topic exchange that receives dead-lettered messages. */
    public static final String Dead_Exchange = "dead_exchange";

    /** Name of the durable queue bound to the dead-letter exchange. */
    public static final String Dead_Queue = "dead_queue";

    /** Topic binding pattern between the dead-letter exchange and the dead-letter queue. */
    public static final String Dead_RoutingKey = "dead.#";

    /**
     * Declares the normal topic exchange.
     *
     * @return the exchange named {@link #Nomarl_Exchange}
     */
    @Bean
    public Exchange normalExchange() {
        return ExchangeBuilder.topicExchange(Nomarl_Exchange).build();
    }

    /**
     * Declares the durable normal queue and attaches the dead-letter exchange to it.
     *
     * @return the queue named {@link #Normal_Queue}
     */
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable(Normal_Queue)
                .deadLetterExchange(Dead_Exchange)
                // "dead.sss.a" matches the "dead.#" pattern used by bindingDeadKey.
                .deadLetterRoutingKey("dead.sss.a")
                .build();
    }

    /**
     * Binds the normal queue to the normal exchange using {@link #Normal_RoutingKey}.
     *
     * @param normalExchange the exchange bean produced by {@link #normalExchange()}
     * @param normalQueue    the queue bean produced by {@link #normalQueue()}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingNormalKey(Exchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(Normal_RoutingKey).noargs();
    }

    /**
     * Declares the dead-letter topic exchange.
     *
     * @return the exchange named {@link #Dead_Exchange}
     */
    @Bean
    public Exchange deadExchange() {
        return ExchangeBuilder.topicExchange(Dead_Exchange).build();
    }

    /**
     * Declares the durable dead-letter queue.
     *
     * @return the queue named {@link #Dead_Queue}
     */
    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(Dead_Queue).build();
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange using {@link #Dead_RoutingKey}.
     *
     * @param deadExchange the exchange bean produced by {@link #deadExchange()}
     * @param deadQueue    the queue bean produced by {@link #deadQueue()}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingDeadKey(Exchange deadExchange, Queue deadQueue) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(Dead_RoutingKey).noargs();
    }
}
1.被消费者拒绝,并且requeue值设置为false
package com.example.demo.consumer;import com.example.demo.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class RejectConsumer {@RabbitListener(queues = DeadLetterConfig.Normal_Queue)public void rejectOrBicNack(String str, Channel channel, Message message) throws IOException {System.out.println("接收到消息"+str);//1.进行channel进行basicNack,记得将requeue设置为false
// channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//记得在配置文件配置 acknowledge-mode: manual #开启手动ACK
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);//以上方式二选一即可}
}
2.消息过期或者队列存储消息过期
public void publishExpire(){String msg = "dead dlx test expire";rabbitTemplate.convertAndSend(DeadLetterConfig.Nomarl_Exchange, "normal.211313",msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");//设置过期时间return message;}});}
消息过期
/**
 * Declares the durable normal queue with a dead-letter exchange, a
 * dead-letter routing key, and a queue-level TTL of 10000.
 *
 * @return the configured queue
 */
@Bean
public Queue normalQueue() {
    final int queueTtl = 10000;
    final QueueBuilder builder = QueueBuilder.durable(Normal_Queue)
            .deadLetterExchange(Dead_Exchange)
            .deadLetterRoutingKey("dead.sss.a")
            .ttl(queueTtl);
    return builder.build();
}
给队列存储消息设置最大时间,超过这个时间,消息将会通过设置的这个routingkey从死信交换机转发给对应的死信队列。
3.队列消息达到最大长度
@Beanpublic Queue normalQueue(){return QueueBuilder.durable(Normal_Queue).deadLetterExchange(Dead_Exchange).deadLetterRoutingKey("dead.sss.a")//.ttl(10000).maxLength(1).build();}
通过maxLength属性设置最大数量,这里设置属性最大为1
4.设置延迟交换机
延迟交换机下载地址
package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DelayedConfig {

    /** Name of the delayed-message exchange. */
    public static final String Delayed_Exchange = "delayed_exchange";

    /** Name of the durable queue bound to the delayed exchange. */
    public static final String Delayed_Queue = "delayed_queue";

    /** Routing key used to bind the delayed queue to the delayed exchange. */
    public static final String Delayed_RoutingKey = "delayed_routingLey";

    /**
     * Declares the delayed-message exchange as a durable, non-auto-delete
     * custom exchange.
     *
     * <p>The exchange type must be {@code "x-delayed-message"} (the type
     * registered by the RabbitMQ delayed-message-exchange plugin). The
     * {@code "x-delayed-type"} entry in the arguments map is a separate
     * setting that selects the routing behaviour of the exchange, here
     * {@code "topic"}.
     *
     * @return the delayed exchange
     */
    @Bean
    public Exchange buildDealayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "topic");
        return new CustomExchange(Delayed_Exchange, "x-delayed-message", true, false, arguments);
    }

    /**
     * Declares the durable queue that receives messages from the delayed exchange.
     *
     * @return the queue named {@link #Delayed_Queue}
     */
    @Bean
    public Queue buildDealyedQueue() {
        return QueueBuilder.durable(Delayed_Queue).build();
    }

    /**
     * Binds the delayed queue to the delayed exchange using {@link #Delayed_RoutingKey}.
     *
     * @param buildDealayedExchange the exchange bean produced by {@link #buildDealayedExchange()}
     * @param buildDealyedQueue     the queue bean produced by {@link #buildDealyedQueue()}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingDelayed(Exchange buildDealayedExchange, Queue buildDealyedQueue) {
        return BindingBuilder.bind(buildDealyedQueue)
                .to(buildDealayedExchange)
                .with(Delayed_RoutingKey)
                .noargs();
    }
}
这样设置之后,发送的消息会先在交换机中停留,待够设置的延迟时间(通过消息头 x-delay 指定,单位为毫秒)后才会被路由到相应的队列。
如果消息过期时间一致,可以不设置延迟交换机;当过期时间类型过多的时候,就可以通过设置延迟交换机来满足不同过期时间的类型。
注意,这里有个参数,arguments 类型为Map<String,Object> 注意要在这个参数里面设置交换机类型,并且放入CustomExchange的构造函数中,不然交换机会创建失败。