RabbitMQ (4)

RabbitMQ (4)

文章目录

  • 1. 死信的概念
  • 2. 死信的来源
  • 3. 死信代码案例
    • 3.1 TTL 过期时间
    • 3.2 超过队列最大长度
    • 3.3 拒绝消息

前言

上文我们已经学习完 交换机 ,知道了几个交换机的使用 ,下面我们来学习一下 死信队列

1. 死信的概念


先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

2. 死信的来源

  1. 消息 TTL 过期 : TTL 是 Time To Live 的缩写, TTL 就是 生存时间
  2. 队列达到最长长度 : 队列满了 , 无法添加数据到 MQ 中
  3. 消息被拒绝 (basic.reject 或 basic.nack) 并且 requeue = false

3. 死信代码案例

这里 创建一个 direct 交换机 ,两个消费者 , 一个生产者 , 两个 队列 (一个为 消息队列 , 一个为死信队列)


图:

在这里插入图片描述


代码 :

3.1 TTL 过期时间

生产者:

package org.example.seven;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 设置消息的过期时间 (TTL) 单位是 ms --> 设置消息的过期时间为 10sAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());}}
}

消费者 c1 (启动之后关闭该消费者, 模拟其接受不到消息)

package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
//        arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C1 接收到的消息为: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});}
}

先启动消费者 C1,创建出队列,然后停止该 C1 的运行,则 C1 将无法收到队列的消息,无法收到的消息 10 秒后进入死信队列。启动生产者 producer 生产消息

在这里插入图片描述


c1 看完,我们在来写 c2 消费者 ,将进入到死信队列的消息 进行消费.


消费者c2

package org.example.seven;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer02 {// 死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("等待接受死信消息......");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C2 接收到的消息: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback,(tag)->{});}
}


图:

在这里插入图片描述


看完 消息过期后 ,消息转发到 死信队列 被 c2 消费,下面我们来 尝试使用 死信最大长度 (队列满了,将多的消息转发到死信队列中)

3.2 超过队列最大长度

消息生产者代码 去掉 TTL 属性 , 将 basicPublish 的第三个参数改为 null


生产者:

package org.example.seven;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 设置消息的过期时间 (TTL) 单位是 ms --> 设置消息的过期时间为 10s
//        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;
//            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());}}
}


c1 消费者 (启动之后关闭该消费者 模拟其接收不到消息)

package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
//        arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 设置队列的限制 , 例如 发送 10 个消息 , 6 个为正常 , 4 个为死信arguments.put("x-max-length", 6);// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C1 接收到的消息为: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});}
}


注意:

这参数改变了(没有设置 ttl 时间,新增了 队列的 最大长度限制 为 6) ,所以 需要把原来队列删除


消费者c2 代码不变

package org.example.seven;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer02 {// 死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("等待接受死信消息......");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C2 接收到的消息: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback,(tag)->{});}
}


效果:

在这里插入图片描述


这里 之所以要启动 c1 后在关闭,是为了展示 6个消息放到 普通队列 ,4个消息放到死信队列, 如果不这么做,发送的 10个消息 都会被 c1 消费 ,(消息发送到 队列后 , 立马 转发给 c1 导致 队列就不会达到 6 个 ,队列不会满 ,也就不会将消息转化给 死信队列).

在这里插入图片描述

3.3 拒绝消息


消息生产者 和 消费者 c2 与上面的代码一样

这里我们 拒绝 info7 消息 ,想要 拒绝 info7 消息,我们可以采用手动应答.


消费者c1

package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
//        arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 设置队列的限制 , 例如 发送 10 个消息 , 6 个为正常 , 4 个为死信
//        arguments.put("x-max-length", 6);// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {String msg = new String(message.getBody(), "UTF-8");if (msg.equals("info7")) {System.out.println("C1 接收到消息为: " + msg + " 此消息被 C1 拒绝");//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(message.getEnvelope().getDeliveryTag(), false);} else {System.out.println("C1 接收到的消息为: " + msg);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};
//        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});// 开启 手动应答channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (tag) -> {});}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/173167.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Wpf 使用 Prism 实战开发Day01

一.开发环境准备 1. VisualStudio 2022 2. .NET SDK 7.0 3. Prism 版本 8.1.97 以上环境&#xff0c;如有新的版本&#xff0c;可自行选择安装新的版本为主 二.创建Wpf项目 1.项目的名称:MyToDo 项目名称:这里只是记录学习&#xff0c;所以随便命名都无所谓,只要觉得合理就…

数据清洗与规范化详解

数据处理流程&#xff0c;也称数据处理管道&#xff0c;是将原始数据转化为有意义的信息和知识的一系列操作步骤。它包括数据采集、清洗、转换、分析和可视化等环节&#xff0c;旨在提供有用的见解和决策支持。在数据可视化中数据处理是可视化展示前非常重要的一步&#xff0c;…

CN考研真题知识点二轮归纳(1)

本轮开始更新真题中涉及过的知识点&#xff0c;总共不到20年的真题&#xff0c;大致会出5-10期&#xff0c;尽可能详细的讲解并罗列不重复的知识点~ 目录 1.三类IP地址网络号的取值范围 2.Socket的内容 3.邮件系统中向服务器获取邮件所用到的协议 4.RIP 5.DNS 6.CSMA/CD…

python随手小练8(南农作业题)

题目1: 输入3 门课程 a,b,c 的成绩,求 3 门成绩的总和平均值(整数,四舍五人)以及最高和最低值。如果3门课程考试成绩分别以权重 0.50.3 和0.2计人总评成绩(整数先求总和再四舍五入),则最终总评成绩是多少? 具体操作&#xff1a; a float(input("a:")) b float(in…

Element 多个Form表单 同时验证

一、背景 在一个页面中需要实现两个Form表单&#xff0c;并在页面提交时需要对两个Form表单进行校验&#xff0c;两个表单都校验成功时才能提交 所用技术栈&#xff1a;Vue2Element UI 二、实现效果 三、多个表单验证 注意项&#xff1a; 两个form表单&#xff0c;每个表单上…

2017年亚太杯APMCM数学建模大赛B题喷雾轨迹规划问题求解全过程文档及程序

2017年亚太杯APMCM数学建模大赛 B题 喷雾轨迹规划问题 原题再现 喷釉工艺用喷釉枪或喷釉机在压缩空气下将釉喷入雾中&#xff0c;使釉附着在泥体上。这是陶瓷生产过程中一个容易实现自动化的过程。由于不均匀的釉料在烧制过程中会产生裂纹&#xff0c;导致工件报废&#xff0…

https下载图片

OpenSSL用法示例 OpenSSL源码安装 对于ubuntu&#xff0c;懒得编译源码可以直接安装 sudo apt-get install libssl–dev /usr/include/openssl/ssl.h CMakeLists中添加 link_libraries(ssl crypto) apt-get安装不需要再制定libssl.a, libcrypto.a的路径了, 就像用libc标…

【算法|动态规划 | 01背包问题No.1】AcWing 426. 开心的金明

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【AcWing算法提高学习专栏】 &#x1f354;本专栏旨在提高自己算法能力的同时&#xff0c;记录一下自己的学习过程&a…

node模块导出引入两种方式和npm包管理

模块化的好处 在 Node.js 中每个文件都被当做是一个独立的模块&#xff0c;模块内定义的变量和函数都是独立作用域的&#xff0c;因为 Node.js 在执行模块代码时&#xff0c;将使用如下所示的函数封装器对其进行封装 (function(exports,require,module,__filename,_dirname){//…

C# | Chaikin算法 —— 计算折线对应的平滑曲线坐标点

Chaikin算法——计算折线对应的平滑曲线坐标点 本文将介绍一种计算折线对应的平滑曲线坐标点的算法。该算法使用Chaikin曲线平滑处理的方法&#xff0c;通过控制张力因子和迭代次数来调整曲线的平滑程度和精度。通过对原始点集合进行切割和插值操作&#xff0c;得到平滑的曲线坐…

CNN 网络结构简介

本文通过整理李宏毅老师的机器学习教程的内容&#xff0c;介绍 CNN&#xff08;卷积神经网络&#xff09;的网络结构。 CNN 网络结构, 李宏毅 CNN 主要应用在图像识别&#xff08;image classification, 图像分类&#xff09;领域。 通常&#xff0c;输入的图片大小相同&am…

【开源】基于SpringBoot的计算机机房作业管理系统的设计和实现

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 登录注册模块2.2 课程管理模块2.3 课时管理模块2.4 学生作业模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 课程表3.2.2 课时表3.2.3 学生作业表 四、系统展示五、核心代码5.1 查询课程数据5.2 新增课时5.3 提交作…

Jupyter Notebook还有魔术命令?太好使了

在Jupyter Notebooks中&#xff0c;Magic commands&#xff08;以下简称魔术命令&#xff09;是一组便捷的功能&#xff0c;旨在解决数据分析中的一些常见问题&#xff0c;可以使用%lsmagic 命令查看所有可用的魔术命令 插播&#xff0c;更多文字总结指南实用工具科技前沿动态…

视频上的水印文字如何去掉?

嘿&#xff0c;大家好&#xff01;作为一个自媒体从业者&#xff0c;我相信大家都想知道如何去掉视频上的水印文字&#xff0c;想必大家和我一样每天都会在互联网寻找素材&#xff0c;而大部分图片或者视频都带有各种各样的水印&#xff0c;这给我的创作带来了不小的麻烦&#…

Vue 3.3.6 ,得益于WeakMap,比之前更快了

追忆往昔&#xff0c;穿越前朝&#xff0c;CSS也是当年前端三剑客之一&#xff0c;风光的很&#xff0c;随着前端跳跃式的变革&#xff0c;CSS在现代前端开发中似乎有点默默无闻起来。 不得不说当看到UnoCss之前&#xff0c;我甚至都还没听过原子化CSS[1]这个概念&#xff08;…

[开源]一个低代码引擎,支持在线实时构建低码平台,支持二次开发

一、开源项目简介 TinyEngine低代码引擎使能开发者定制低代码平台&#xff0c;支持在线实时构建低码平台&#xff0c;支持二次开发或集成低码平台能力。 二、开源协议 使用MIT开源协议 三、界面展示 四、功能概述 TinyEngine是一个低代码引擎&#xff0c;基于这个引擎可以构…

【Java每日一题】——第四十三题:USB接口程序设计。(2023.10.29)

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

中颖单片机SH367309全套量产PCM,专用动力电池保护板开发资料

方案总体介绍 整套方案硬件部分共2块板子&#xff0c;包括MCU主板&#xff0c;采用SH79F6441-32作为主处理器。MCU主板包括2个版本。PCM动力电池保护板采用SH367309。 软件方案采用Keil51建立的工程&#xff0c;带蓝牙的版本&#xff0c;支持5~16S电池。 硬件方案--MCU主板 MC…

【Matlab2016】Matlab中文版的下载、安装、激活(不建议安装过高版本!!)

这里写目录标题 首先双击R2016_win64.iso加载镜像文件双击setup.exe开始安装选择使用文件密钥安装填入密钥修改安装路径并记住此路径建议全部勾选等待安装完成 激活复制补丁到matlab路径下 创建快捷方式进入bin目录&#xff0c;找到matlab.exe 安装包 首先双击R2016_win64.iso加…

java基础巩固

JDK11和JDK8是oracle重点维护的 常用的包 单例 多例 枚举 jar包打包 测试