MQ介绍和基本使用在75章介绍过了,不再重复
MQ高级用法-延时消息
什么是延时消息?
Producer 将消息发送到 MQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息
插件安装
RabbitMQ延迟队列插件下载
下载地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
安装
把下载好的文件拖到你的rabbitMQ下面的plugins目录里面
#举例
D:\Applaaction\rabbitmq_server-3.13.0\plugins
启用插件
执行下面的命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
检查是否成功
打开可视化面板(可视化面板如何安装75章有讲)
访问 http://localhost:15672/#/ 账号密码都是 guest
发现新增了一个延迟队列类型 x-delayed-message
代码编写
应用场景
现在是2024-06-06 半夜1.08分,我选择外卖预约中午的11.00 - 11.20
左右的外卖,我如果选择下单,那么这个单不会立马推送到商家的客户端里面,而是存放到消息队列,使用延时消息
,在差不多的时间段例如10.30左右才会把这个单推送到商家的客户端,这样商家出餐10分钟,骑手送20-30分钟左右,送过来就差不多11点左右
生产者
发布订阅模式在上一章已经讲过了不懂去看上一章
- 我们使用新增的延时类型切换一下type类型
x-delayed-message
- 连接交换机的时候增加
arguments对象
添加x-delayed-type
目标交换机类型 这儿使用direct
- 发布消息的时候增加头部信息
x-delay:延时的时间(毫秒)
import amqplib from 'amqplib'
//1.连接MQ
const connection = await amqplib.connect('amqp://localhost:5672')
//2.创建一个通道
const channel = await connection.createChannel()
//3.创建交换机
/*** @param {string} exchange 交换机名称 随便写* @param {string} type 交换机类型 direct fanout topic headers x-delayed-message* @param {options} options 可选配置项*/
//这个方法就是说如果你创建过这个交换机就不会再创建了 如果没有创建过这个交换机就会创建
await channel.assertExchange('delayed-1', 'x-delayed-message',{arguments:{'x-delayed-type': 'direct' //目标交换机类型}
})//4.发送消息
/*** @param {string} exchange 要发送到交换机的名称* @param {string} routingKey 匹配路由的key* @param {Buffer} buffer 要发送的消息* @param {options} options 可选配置项*/
channel.publish('delayed-1', 'time', Buffer.from('延时消息'),{headers:{'x-delay': 10000 //延时 10秒}
})
//断开连接
await channel.close()
await connection.close()
process.exit(0)
消费者
import amqplib from 'amqplib'
//1.连接MQ
const connection = await amqplib.connect('amqp://localhost:5672')
//2.创建一个通道
const channel = await connection.createChannel()
//3.创建交换机
await channel.assertExchange('delayed-1', 'x-delayed-message',{arguments:{'x-delayed-type': 'direct' //目标交换机类型}
})
//4.创建一个队列
const { queue } = await channel.assertQueue('queue-1')
//5.交换机跟队列要绑定
/*** @param {string} queue 队列名称* @param {string} exchange 交换机名称* @param {string} routingKey 匹配路由的key*/
channel.bindQueue(queue, 'delayed-1', 'time')
//6.消费消息
channel.consume(queue, (msg) => {console.log(msg.content.toString())
}, {noAck: true
})