RabbitMQ——死信队列和延迟队列

文章目录

  • RabbitMQ——死信队列和延迟队列
    • 1、死信队列
    • 2、基于插件的延迟队列
      • 2.1、安装延迟队列插件
      • 2.2、代码实例

RabbitMQ——死信队列和延迟队列

1、死信队列

死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。

死信的来源

在消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于:

  • 消息被拒绝(basic.rejectbasic.nack)且不重新入队(requeue 参数为 false)。
  • 消息过期(TTL,Time-To-Live)。
  • 队列长度超过限制,无法再添加数据到mq中。

生产者

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import com.weipch.util.RabbitMqUtils;/*** @Author 方唐镜* @Create 2024-03-03 14:08* @Description*/
public class Produce {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//模拟消息过期 10s//AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 10; i++) {String message = "hello world" + i;channel.basicPublish(NORMAL_EXCHANGE, "normal-routing-key", null, message.getBytes());}}
}

消费者

正常队列:

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.weipch.util.RabbitMqUtils;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;/*** @Author 方唐镜* @Create 2024-03-03 13:50* @Description*/
public class Consumer01 {private static final String NORMAL_EXCHANGE = "normal_exchange";private static final String DEAD_EXCHANGE = "dead_exchange";private static final String NORMAL_QUEUE = "normal_queue";private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信交换机和队列channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key");//声明普通交换机和队列channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//死信配制 指定死信交换机和死信路由键Map<String, Object> map = new HashMap<>();map.put("x-dead-letter-exchange", DEAD_EXCHANGE);map.put("x-dead-letter-routing-key", "dead-routing-key");//最大长度//map.put("x-max-length", 6);channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);if (message.contains("5")){System.out.println("Consumer01接收消息:" + message + ",此消息被拒绝");//拒绝消息并把消息丢入死信队列channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer01接收消息:" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {});}
}

死信队列:

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.weipch.util.RabbitMqUtils;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;/*** @Author 方唐镜* @Create 2024-03-03 13:50* @Description*/
public class Consumer02 {private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.basicConsume(DEAD_QUEUE, true,(consumerTag, delivery) -> System.out.println("Consumer02:" + new String(delivery.getBody(), StandardCharsets.UTF_8)),(consumerTag, e) -> {});}
}

生产者发送消息到正常队列,而消费者负责消费正常队列的消息。当消息被消费者拒绝并不再重新投递时,消息会被发送到死信队列。

2、基于插件的延迟队列

延迟队列是一种消息队列中的一种特殊类型,它允许消息在一定的延迟时间后再被消费。延迟队列的元素是希望在指定时间到了以后或之前取出处理。在实际应用中,延迟队列通常用于处理需要延时执行的任务或事件。

使用场景

  1. 定时任务执行: 在需要定时执行任务的应用中,可以使用延迟队列来实现。将任务消息发送到延迟队列,设置消息的过期时间为任务执行的时间,当消息过期时,消费者即可执行相应的任务。
  2. 消息重试机制: 当某个操作失败时,可以将操作消息发送到延迟队列,并设置合适的重试时间。在消息重试的过程中,如果操作成功,消息将正常被消费;如果一直失败,可以选择在一定时间后放弃重试,将消息发送到死信队列或进行其他处理。
  3. 订单超时处理: 在电商等场景中,对于长时间未支付的订单,可以将订单消息发送到延迟队列,并设置订单的过期时间。当订单过期时,系统可以取消订单、释放库存等操作。
  4. 限流与流控: 通过使用延迟队列,可以实现消息的有序处理和限流,确保系统在高峰期不会因为瞬时大量请求而过载。
  5. 系统通知与提醒: 在需要发送系统通知或提醒的场景中,可以使用延迟队列来实现消息的定时推送。
  6. 缓解数据库压力: 对于一些需要定期清理的数据,可以使用延迟队列来触发数据清理操作,减轻数据库压力。

2.1、安装延迟队列插件

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

以docker方式安装

1、把下载好的插件从服务器拷贝到 RabbitMQ 容器内plugins目录

docker cp rabbitmq_delayed_message_exchange-3.13.0.ez 7c8726620871:/plugins

插件版本和rabbitmq版本一致

2、进入容器查看插件

在这里插入图片描述

3、启动插件

root@my-rabbit:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、重启容器

docker restart 7c8726620871

5、安装成功

在这里插入图片描述

2.2、代码实例

配置类

package springbootrabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;@Configuration
public class DelayedQueueConfig {//    队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//    交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//    routingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingKey";//    声明队列@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}//    声明交换机 基于插件的交换机@Beanpublic CustomExchange delayedExchange() {HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type", "direct");/** 1.交换机名称* 2.交换机类型* 3.是否需要持久化* 4.是否需要自动删除* 5.其他参数* */return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);}//    绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者

@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {log.info("当前时间:{},发送一条时长{}毫秒消息给延迟队列delayed.queue:{}", new Date(), delayTime, message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {//            发送消息的时候 延迟时长msg.getMessageProperties().setDelay(delayTime);return msg;});
}

消费者

@Slf4j
@Component
public class DelayedQueueConsumer {//监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/277983.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

npm包、全局数据共享、分包

使用 npm 包 小程序对 npm 的支持与限制 目前&#xff0c;小程序中已经支持使用 npm 安装第三方包&#xff0c;从而来提高小程序的开发效率。但是&#xff0c;在小程序中使用npm 包有如下 3 个限制&#xff1a; ① 不支持依赖于 Node.js 内置库的包 ② 不支持依赖于浏览器内置…

C#配置连接数据库字段

在Web.config文件中 添加如下配置 <!--连接数据库字段--><connectionStrings><add name"sql" connectionString"server.;uidsa;pwd8888;databaseArticleWebSite" /></connectionStrings>

Day67:WEB攻防-Java安全JNDIRMILDAP五大不安全组件RCE执行不出网

知识点&#xff1a; 1、Java安全-RCE执行-5大类函数调用 2、Java安全-JNDI注入-RMI&LDAP&高版本 3、Java安全-不安全组件-Shiro&FastJson&JackJson&XStream&Log4j Java安全-RCE执行-5大类函数调用 Java中代码执行的类&#xff1a; GroovyRuntimeExecPr…

【基于HTML5的网页设计及应用】——改变文字和背景颜色

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

JavaWeb:vue、AJax、ELement、maven、SpringBoot、、Http、Tomcat、请求响应、分层解耦

1 Vue 1.1 Vue介绍 VUE是前端框架&#xff0c;基于MVVM&#xff0c;实现数据双向绑定 框架是半基础软件&#xff0c;可重用的代码模型 1.2 Vue指令 <script src"js/vue.js"></script></head> <body><div id"id"><!--…

机器人路径规划:基于深度优先搜索(Depth-First-Search,DFS)算法的机器人路径规划(提供Python代码)

一、深度优先搜索算法介绍 深度优先搜索算法&#xff08;Depth-First-Search&#xff09;的基本思想是沿着树的深度遍历树的节点&#xff0c;尽可能深的搜索树的分支。当节点v的所有边都己被探寻过&#xff0c;搜索将回溯到发现节点v的那条边的起始节点。这一过程一直进行到已…

分布式之Nacos配置中心

Nacos作为配置中心源码分析 1、什么是Naocs配置中心 官方文档&#xff1a; https://github.com/alibaba/spring-cloud-alibaba/wiki/Nacos-config Nacos 提供用于存储配置和其他元数据的 key/value 存储&#xff0c;为分布式系统中的外部化配置提供服务器端和客户端支持。使…

基于springboot+vue的乡政府管理系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

Spring MVC文件上传配置

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 文件上传 Spring MVC文件上传基于Servlet 3.0实现&#xff1b;示例代码如下&#xff1a; Overrideprotected void customizeRegistration(ServletRegistration.Dynamic reg…

win10 配置 oh-my-posh

win10 配置 oh-my-posh 0. 前置1. 安装1.1. 软件1.2. 字体1.3. 激活1.3.1. Git Bash1.3.2. PowerShell 2. 配置2.1. 效果2.2. 说明2.3. 其他2.3.1. 新版PowerShell2.3.2 conda问题 0. 前置 这个东西毕竟是个&#xff0c;命令行美化工具&#xff0c;所以需要先有一个命令行&…

最后的挣扎 - Qt For Android on HuaWei Mate 60Pro (v4.0.0)

简介 为什么叫最后的挣扎, 其实都知道即将到来的 HarmonyOS NEXT 将抛弃Android支持&#xff0c;纯血HarmonyOS 将上线&#xff0c; 此时再说Qt for android支持Huawei HarmonyOS的设备其实并没有多少意思&#xff0c; 但恐怕在大多数基础软件完成兼容前&#xff0c; 很多人还是…

量子计算+HPC!ORNL与Riverlane、Rigetti合作研发

内容来源&#xff1a;量子前哨&#xff08;ID&#xff1a;Qforepost&#xff09; 编辑丨慕一 编译/排版丨沛贤 1000字丨8分钟阅读 近日&#xff0c;英国量子计算初创公司Riverlane和美国量子计算公司Rigetti Computing宣布将参与由美国能源部橡树岭国家实验室&#xff08;OR…

【javaWeb】在webapp中手动发布一个应用

标题 &#x1f432;一、为什么要在webapp中手动发布一个应用&#x1f389;二、手动发布步骤1.下载Tomcat2.解压并安装3.在webapps中创建文档 ✨三、总结 &#x1f432;一、为什么要在webapp中手动发布一个应用 好处解释灵活性手动发布应用程序可以根据自己的需求进行自定义配置…

【C++】了解一下编码

个人主页 &#xff1a; zxctscl 如有转载请先通知 文章目录 1. 前言2. ASCII编码3. unicode4. GBK5. 类型转换 1. 前言 看到string里面还有Template instantiations&#xff1a; string其实是basic_string<char>&#xff0c;它还是一个模板。 再看看wstring&#xff1…

195基于matlab的凸轮机构GUI界面

基于matlab的凸轮机构GUI界面 &#xff0c; 凸轮设计与仿真 绘制不同的凸轮轮廓曲线 &#xff0c;凸轮机构运动参数包括推程运动角&#xff0c;回程运动角&#xff0c;远休止角&#xff0c;近休止角。运动方式&#xff0c;运动规律。运动仿真过程可视化。内容齐全详尽。用GUI打…

el-select使用filterable下拉无法关闭得问题

这里推荐一个前端框架 sakuya / SCUI&#xff0c;他里面有个formTable&#xff0c;可以解决很多订单明细保存得问题。基本沿用element-plus的前端使用模式&#xff0c;让表单表格变的非常容易。 这个的供应商插件&#xff0c;当使用filterable后&#xff0c;点击表格重的选项&…

HarmonyOS NEXT应用开发—视频全屏切换案例

介绍 本示例介绍了Video组件和ohos.window接口实现媒体全屏的功能。 该场景多用于首页瀑布流媒体播放等。 效果图预览 使用说明&#xff1a; 点击全屏按钮&#xff0c;横屏媒体窗口。点击恢复窗口按钮&#xff0c;恢复媒体窗口。 实现步骤 在Video组件内调用 onFullscreen…

Gin 框架中前端向后端传值的几种方式介绍

我将为您详细讲解 Gin 框架中前端向后端传值的几种方式&#xff0c;并给出相应的简单例子。Gin 是一个高性能的 Web 框架&#xff0c;用于构建后端服务。在 Web 应用程序中&#xff0c;前端通常需要向后端发送数据&#xff0c;以便后端能够进行处理。以下是几种常见的前端向后端…

Vue2(六):生命周期、组件、组件的嵌套、VueComponent构造函数、单文件组件

一、生命周期 1.什么是生命周期&#xff1f; 生命周期 1.又名&#xff1a;生命周期回调函数、生命周期函数、生命周期钩子。 2.是什么&#xff1a;Vue在关键时刻帮我们调用的一些特殊名称的函数。 3.生命周期函数的名字不可更改&#xff0c;但函数的具体内容是程序员根据需求…

mybatis源码阅读系列(二)

前言 上一篇文章mybatis源码阅读系列&#xff08;一&#xff09;介绍了mybatis和原生jdbc的区别&#xff0c;并通过代码展示了两者的运行过程和结果&#xff0c;下面让我们继续详细了解下mybatis的执行过程&#xff1b; package com.wyl.mybatis.service;import com.wyl.mybat…