目录
1. springboot配置
2. 开始写RabbitMq代码
3. 队列优化
4. 插件实现延迟队列
5. 总结
前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。
1. springboot配置
-
创建一个 Maven 工程或者 Spring Boot工程
-
添加依赖坐标,这里的 Spring Boot 是3.4.3 版本
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.51</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><scope>provided</scope></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Knife4j API文档生产工具 --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId><version>4.4.0</version></dependency><!-- swagger注解支持:Knife4j依赖本依赖 --><dependency><groupId>io.swagger</groupId><artifactId>swagger-annotations</artifactId><version>1.5.22</version></dependency></dependencies>
3.创建 application.yml
文件
server:port: 8888
spring:rabbitmq:host: 你的服务器ipport: 5672username: adminpassword: 123456
这里是 8808 端口,可根据需求决定端口
4. 配置swgger
@Configuration
public class Knife4jConfig {@Beanpublic OpenAPI springShopOpenAPI() {return new OpenAPI()// 接口文档标题.info(new Info().title("接口文档")// 接口文档简介.description("RabbitMq测试文档")// 接口文档版本.version("v1.0")// 开发者联系方式.contact(new Contact().name("luckily").email("3298244978@qq.com")));}}
5. 新建主启动类
@Log4j2
@SpringBootApplication
public class RabbitMqDemoApplication {public static void main(String[] args) {SpringApplication app = new SpringApplication(RabbitMqDemoApplication.class);Environment env = app.run(args).getEnvironment();app.setBannerMode(Banner.Mode.CONSOLE);logApplicationStartup(env);}private static void logApplicationStartup(Environment env) {String protocol = "http";if (env.getProperty("server.ssl.key-store") != null) {protocol = "https";}String serverPort = env.getProperty("server.port");String contextPath = env.getProperty("server.servlet.context-path");if (StringUtils.isBlank(contextPath)) {contextPath = "/doc.html";} else {contextPath = contextPath + "/doc.html";}String hostAddress = "localhost";try {hostAddress = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {log.warn("The host name could not be determined, using `localhost` as fallback");}log.info("""----------------------------------------------------------\t应用程序“{}”正在运行中......\t接口文档访问 URL:\t本地: \t\t{}://localhost:{}{}\t外部: \t{}://{}:{}{}\t配置文件: \t{}----------------------------------------------------------""",env.getProperty("spring.application.name"),protocol,serverPort,contextPath,protocol,hostAddress,serverPort,contextPath,env.getActiveProfiles());}}
2. 开始写RabbitMq代码
代码架构
创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后在创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:
代码实现
配置类代码
/*
* TTL队列 配置文件类代码
*
* */
@Configuration
public class TtlQueueConfig {//普通交换机的名称public static final String X_EXCHANGE = "X";//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列的名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列的名称public static final String DEAD_LATTER_QUEUE = "QD";//声明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明普通队列 TTL为40s@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//死信队列@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();}//绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定@Beanpublic Binding queueDBindingX(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
生产者代码
/** 发送延迟消息* */@Slf4j
@Tag(name = "Rabbitmq相关 API", description = "ttl API")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")@Operation(summary = "Rabbitmq发送消息")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:" + message);rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:" + message);}
}
消费者
/** 队列TTL 消费者* */
@Slf4j
@Component
public class DeadLetterQueueConsumer {//接收消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}
效果:
3. 队列优化
问题:
第一条消息在10S后变成了死信消息,然后被消费者消费掉,第二条消息在40S之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
代码架构图
增加一个队列QC实现动态延时
实现
配置文件类
/** TTL队列 配置文件类代码** */@Configurationpublic class TtlQueueConfig {//普通交换机的名称public static final String X_EXCHANGE = "X";//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列的名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String QUEUE_C = "QC";//死信队列的名称public static final String DEAD_LATTER_QUEUE = "QD";//声明QC队列@Bean("queueC")public Queue queueC(){Map<String, Object> arguments = new HashMap<>();//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable().withArguments(arguments).build();}@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}//声明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明普通队列 TTL为40s@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//死信队列@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();}//绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定@Beanpublic Binding queueDBindingX(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}
生产者
/** 发送延迟消息* */
@Slf4j
@Tag(name = "Rabbitmq相关 API", description = "ttl API")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")@Operation(summary = "Rabbitmq发送消息")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:" + message);rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:" + message);}//开始发消息@GetMapping("sendExpirationMsg/{message}/{ttlTime}")@Operation(summary = "Rabbitmq发送消息与延迟时间")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,msg->{//发送消息的时候 延迟时长msg.getMessageProperties().setExpiration(ttlTime);return msg;});}
}
消费者
消费者代码不改变
效果:本来消息2是延迟2秒,消息1延迟20秒,消息2似乎要比消息1更早接收,但因为RabbitMQ智慧检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行,这是队列特性
怎么弥补这个缺陷,需要用到Rabbitmq的插件实现延迟队列
4. 插件实现延迟队列
我们之前延迟消息是在队列进行延迟,安装插件之后是在交换机进行延迟
- 下载延迟插件https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
- 将延迟插件放到RabbitMQ的插件目录下:由于我通过docker容器安装的rabbitmq,所以我将安装包先通过xftp发送给主机,在通过docker命令给容器,进入容器安装
复制给容器
进入容器
启动插件
重启容器
docker restart 7af
网页端出现以下就表示成功安装插件
实战
配置文件类
@Configurationpublic class DelayedQueueConfig {//队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//routingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";//声明队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);};//声明交换机@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}//绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}
生产者
/** 发送延迟消息* */@Slf4j@RestController@RequestMapping("/ttl")public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息 基于插件的 消息 及 延迟的时间@GetMapping("/sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg -> {// 发送消息的时候 延迟时长 单位msmsg.getMessageProperties().setDelay(delayTime);return msg;});}}
消费者
// 消费者代码 基于插件的延迟消息@Slf4j@Componentpublic class DelayQueueConsumer {//监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void recieveDelayQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);}
}
效果
成功!
5. 总结
延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用
RabbitMQ.的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis.的zsset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景