1. 延迟消息
2. 死信交换机
正常队列不需要接受消息。
@Configuration
public class NormalQueueConfig {@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal.direct");}@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();}@Beanpublic Binding normalBings(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi");}
}
死信交换机需要key和normalQueue一样。
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlx.queue", durable = "true"),exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),key = {"hi"}))public void listenDlxQueue(String msg) {log.info("消费者监听 dlx.message:【{}】", msg);}
生产者
@Testvoid testSendDelayMessage() {rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", message -> {message.getMessageProperties().setExpiration("10000");return message;});}
3. DelayExchange插件
插件
docker volume inspect mq-plugins
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
生产者
@Testvoid testDelayMessageByPlugin() {rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", message -> {message.getMessageProperties().setDelay(10000);return message;});}
消费者
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = {"hi"}))public void listenDelayQueue(String msg) {log.info("消费者监听到 delay.queue的消息:【{}】", msg);}