延迟队列设计
在开发过程中涉及到延迟队列的应用,例如订单生成后有30分钟的付款时间,注册是有60秒的邮件或者短信的发送读取时间等。
常规使用rabbitmq设计延迟队列有两种方式
- 使用创建一个延迟队列阻塞消息
- 使用延迟队列插件
Dead Letter Exchanges — RabbitMQ
配置
- To set the DLX for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host:
- You may also specify a routing key to use when the messages are being dead-lettered. If the routing key is not set, the message’s own routing keys are used. args.put("x-dead-letter-routing-key", “some-routing-key”);
package com.wnhz.mq.common.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DlxConfig {@Beanpublic Queue dlxQueue(){return new Queue("dlx_queue_test");}@Beanpublic DirectExchange dlxExchange(){return new DirectExchange("dlx_exchange_test");}@Beanpublic Binding dlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx_routing_key");}@Beanpublic Queue normalQueue(){Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange", "dlx_exchange_test");map.put("x-dead-letter-routing-key","dlx_routing_key");return new Queue("normal_queue_test",true,false,false,map);}@Beanpublic DirectExchange normalExchange(){return new DirectExchange("normal_exchange_test");}@Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal_routing_test");}}
server:port: 10005spring:application:name: book-consumerautoconfigure:exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure, org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfigurationrabbitmq:host: 192.168.198.130port: 5672username: adminpassword: 123publisher-confirm-type: correlatedpublisher-returns: truelistener:simple:prefetch: 1acknowledge-mode: auto
logging:level:com.wnhz.mq.consumer: debug
生产者发送信息
@Overridepublic void delaySendMessage() {String uuid = UUID.randomUUID().toString();CorrelationData data = new CorrelationData(uuid);String msg = "hello delay";int delayTime =5000;rabbitTemplate.convertAndSend("normal_exchange_test", "normal_routing_test", msg,p -> {p.getMessageProperties().setExpiration(String.valueOf(delayTime ));return p;});log.debug("发送一条消息{},当前时间:{},延迟{}秒", msg, new Date(), delayTime / 1000);}
}
消费者消费
@RabbitListener(queues = "dlx_queue_test")public void delayConsume(Message message){log.debug("消费者消费信息:{},当前时间:{}",message.getBody(),new Date());}
延迟队列插件安装
访问官网
Community Plugins — RabbitMQ
进入rabbitmq docker容器
[root@localhost ~]# docker exec -it rabbitmq bash
查询插件列表是否存在延迟插件
root@6d2342d51b11:/plugins# rabbitmq-plugins list
root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...Configured: E = explicitly enabled; e = implicitly enabled| Status: * = running on rabbit@6d2342d51b11|/
[ ] rabbitmq_amqp1_0 3.9.11
[ ] rabbitmq_auth_backend_cache 3.9.11
[ ] rabbitmq_auth_backend_http 3.9.11
[ ] rabbitmq_auth_backend_ldap 3.9.11
[ ] rabbitmq_auth_backend_oauth2 3.9.11
[ ] rabbitmq_auth_mechanism_ssl 3.9.11
[ ] rabbitmq_consistent_hash_exchange 3.9.11
[ ] rabbitmq_event_exchange 3.9.11
[ ] rabbitmq_federation 3.9.11
[ ] rabbitmq_federation_management 3.9.11
[ ] rabbitmq_jms_topic_exchange 3.9.11
[E*] rabbitmq_management 3.9.11
[e*] rabbitmq_management_agent 3.9.11
[ ] rabbitmq_mqtt 3.9.11
[ ] rabbitmq_peer_discovery_aws 3.9.11
[ ] rabbitmq_peer_discovery_common 3.9.11
[ ] rabbitmq_peer_discovery_consul 3.9.11
[ ] rabbitmq_peer_discovery_etcd 3.9.11
[ ] rabbitmq_peer_discovery_k8s 3.9.11
[E*] rabbitmq_prometheus 3.9.11
[ ] rabbitmq_random_exchange 3.9.11
[ ] rabbitmq_recent_history_exchange 3.9.11
[ ] rabbitmq_sharding 3.9.11
[ ] rabbitmq_shovel 3.9.11
[ ] rabbitmq_shovel_management 3.9.11
[ ] rabbitmq_stomp 3.9.11
[ ] rabbitmq_stream 3.9.11
[ ] rabbitmq_stream_management 3.9.11
[ ] rabbitmq_top 3.9.11
[ ] rabbitmq_tracing 3.9.11
[ ] rabbitmq_trust_store 3.9.11
[e*] rabbitmq_web_dispatch 3.9.11
[ ] rabbitmq_web_mqtt 3.9.11
[ ] rabbitmq_web_mqtt_examples 3.9.11
[ ] rabbitmq_web_stomp 3.9.11
[ ] rabbitmq_web_stomp_examples 3.9.11
下载支持3.9.x的插件
退出容器:
root@6d2342d51b11:/plugins# exit
exit
上传到linux服务器
在/usr/local/software/下创建文件夹rabbitmq/plugins
[root@localhost software]# mkdir -p rabbitmq/plugins
拷贝插件到容器中
[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
进入容器安装插件
[root@localhost plugins]# docker exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
打开管理页面
进入Exchange页面,下拉Type看是否已经安装成功。
代码实现
配置类
package com.wnhz.rabbitmq.mq.config;public interface RabbitmqConstants {String DELAYX_QUEUE = "mq_delayx__queue";String DELAYX_ROUTING_KEY = "mq_delayx_routing_key";String DELAYX_EXCHANGE = "mq_delayx__exchange";String DELAYX_EXCHANGE_TYPE = "x-delayed-message";
}
package com.wnhz.rabbitmq.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;import java.util.HashMap;@Configuration
@Slf4j
public class RabbitmqConfig {@Beanpublic Queue delayxQueue() {return new Queue(RabbitmqConstants.DELAYX_QUEUE);}@Beanpublic CustomExchange delayRoutingExchange() {return new CustomExchange(RabbitmqConstants.DELAYX_EXCHANGE,RabbitmqConstants.DELAYX_EXCHANGE_TYPE,true,false,new HashMap<String, Object>() {{put("x-delayed-type","direct");}});}@Beanpublic Binding delayxBinding() {return BindingBuilder.bind(delayxQueue()).to(delayRoutingExchange()).with(RabbitmqConstants.DELAYX_ROUTING_KEY).noargs();}@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());log.debug("rabbitmq配置:{}完成", rabbitTemplate);return rabbitTemplate;}
}
生产者
@Service
@Slf4j
public class ProduceServiceImpl implements IProduceService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendDelayxUser(User user) {int delayTime = 10000;rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYX_EXCHANGE,RabbitmqConstants.DELAYX_ROUTING_KEY,user, mpp -> {mpp.getMessageProperties().setDelay(delayTime);return mpp;});log.debug("发送消息:{},发送时间:{},延迟:{}秒", user,new Date(),delayTime/1000);}
}
消费者
@Slf4j
@Service
public class ConsumeServiceImpl implements IConsumeService {@RabbitListener(queues = RabbitmqConstants.DELAYX_QUEUE)@Overridepublic void receiveDelayxUser(User user) {log.debug("消费者:接收到消息-->{},接收时间:{}",user,new Date());}
}