浅析RabbitMQ死信队列

原文首发于公众号【CSJerry】
在这里插入图片描述

在现代分布式系统中,消息队列扮演着至关重要的角色。它们可以实现应用程序之间的异步通信,并确保数据的可靠传输和处理。而在这个领域中,RabbitMQ作为一种强大而受欢迎的消息队列解决方案,具备了高效、可靠和灵活的特性。

然而,即使使用了RabbitMQ,我们仍然会遇到一些不可预料的情况,例如消费者无法处理某些消息、消息过期或者队列溢出等。为了解决这些问题,RabbitMQ引入了死信队列(Dead Letter Queue)的概念,为开发人员提供了一种有效的错误处理机制。

那么,究竟什么是死信队列呢?

本文结合Spring Boot使用RabbitMQ的死信队列,着重从是什么、为什么、怎么用几个方面对死信队列进行简单介绍。

1. 是什么:

  • 死信队列(Dead Letter Queue)是一种特殊的消息队列,用于存储无法被消费的消息。
  • 当消息满足某些条件无法被正常消费时,将被发送到死信队列中进行处理。
  • 死信队列提供了一种延迟处理、异常消息处理等场景的解决方案。

2. 为什么

  • 用来处理消费者无法正确处理的消息,避免消息丢失或积压
  • 实现延迟消息处理,例如订单超时未支付,可以将该消息发送到死信队列,然后再进行后续处理。
  • 用于实现消息重试机制,当消费者处理失败时,将消息重新发送到死信队列进行重试。
  • 提高了系统的可伸缩性和容错性,能够应对高并发和异常情况。

3. 怎么用

  1. 在Spring Boot中配置和使用死信队列:
    • 首先,在pom.xml文件中添加RabbitMQ的依赖项。
    • 然后,在application.properties文件中配置RabbitMQ连接信息。
    • 接下来,创建生产者和消费者代码,并通过注解将队列和交换机进行绑定。
    • 在队列的声明中添加死信队列的相关参数,如x-dead-letter-exchangex-dead-letter-routing-key等。
    • 最后,在消费者中编写处理消息的逻辑,包括对异常消息进行处理,并设置是否重新发送到死信队列。

简而言之,死信队列可以认为是一个正常队列的备用队列(或者说是兜底队列),当正常队列的消息无法消费的时候mq会重新把该消息发送到死信交换机,由死信交换机根据路由键将消息投递到备用队列,启动服务备用方案。

消息从正常队列到死信队列的三种情况:

1、消息被否定确认使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false

2、消息在队列中的时间超过了设置的TTL())时间。

3、消息数量超过了队列的容量限制()。

当一个队列中的消息满足上述三种情况任一个时,改消息就会从原队列移至死信队列,若改队列没有绑定死信队列则消息被丢弃。

4. 实战

以下是一个简单的Spring Boot集成RabbitMQ的死信队列示例代码:

  • 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
# 开启消费者手动确认
spring.rabbitmq.listener.type=direct# 发送到队列失败时的手动处理
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.publisher-returns=true# 发送到交换机手动确认
spring.rabbitmq.publisher-confirm-type=simple
  • 配置类
@Configuration
@Slf4j
public class RabbitCof {@Resourceprivate MqKeys mqKeys;@Bean("normalQueue")public Queue normalQueue() {/*** 为普通队列绑定交换机*/Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", mqKeys.DIE_EXCHANGE);args.put("x-dead-letter-routing-key", mqKeys.DIE_ROUTING_KEY);args.put("x-message-ttl", 1000); // 队列中的消息未被消费则1秒后过期return new Queue(mqKeys.NORMAL_QUEUE, true, false, false, args);}@Bean("normalExchange")public Exchange normalExchange() {return new DirectExchange(mqKeys.NORMAL_EXCHANGE);}@Bean("normalBind")public Binding normalBinding(@Qualifier("normalQueue") Queue normalQueue, @Qualifier("normalExchange") Exchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(mqKeys.ROUTING_KEY).noargs();}/*** 死信队列* @return*/@Bean("dieQueue")public Queue dlQueue() {return new Queue(mqKeys.DIE_QUEUE, true, false, false);}/*** 死信交换机* @return*/@Bean("dieExchange")public Exchange dlExchange() {return new DirectExchange(mqKeys.DIE_EXCHANGE);}@Bean("dieBind")public Binding dlBinding(@Qualifier("dieQueue") Queue dlQueue, @Qualifier("dieExchange") Exchange dlExchange) {return BindingBuilder.bind(dlQueue).to(dlExchange).with(mqKeys.DIE_ROUTING_KEY).noargs();}@Resourceprivate ConnectionFactory connectionFactory;@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);/*** 消费者确认收到消息后,手动ack回调处理* spring.rabbitmq.publisher-confirm-type=simple*/rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause)->{if(!ack) {log.info("消息投递到交换机失败,correlationData={} ,ack={}, cause={}", correlationData == null ? "null" : correlationData.getId(), ack, cause);} else {log.info("消息成功投递到交换机,correlationData={} ,ack={}, cause={}", correlationData == null ? "null" : correlationData.getId(), ack, cause);}});/*** 消息投递到队列失败回调处理* spring.rabbitmq.listener.direct.acknowledge-mode=manual* spring.rabbitmq.publisher-returns=true*/rabbitTemplate.setReturnsCallback((returnedMessage)->{Message message = returnedMessage.getMessage();log.error("分发到到队列失败, body->{}", message.getBody());});return rabbitTemplate;}
}
  • 生产者类
@Component
public class Producer {@Resourceprivate  MqKeys mqKeys;@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend(mqKeys.NORMAL_EXCHANGE, mqKeys.ROUTING_KEY, message);}
}
  • 消费者类
@Component
@RabbitListener(queues = "normal.queue")
@Slf4j
public class Consumer {@RabbitHandlerpublic void handleMessage(String data, Message message, Channel channel) {boolean success = false;int retryCount = 3;System.out.println(message.toString());long deliveryTag = message.getMessageProperties().getDeliveryTag();while (!success && retryCount-- > 0){try {// 处理消息log.info("收到消息: {}, deliveryTag = {}", data, deliveryTag);// 正常处理完毕,手动确认,此处不确认让他进入死信队列
//                success = true;
//                channel.basicAck(deliveryTag, false);Thread.sleep(3 * 1000L);}catch (Exception e){log.error("程序异常:{}", e.getMessage());}}// 达到最大重试次数后仍然消费失败if(!success){try {log.info("move to die queue");// 手动拒绝,移至死信队列/***deliveryTag – the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Delivermultiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.requeue – true if the rejected message(s) should be requeued rather than discarded/dead-lettered*/channel.basicNack(deliveryTag, false, false);} catch (IOException e) {e.printStackTrace();}}}
}

以上代码演示了如何在Spring Boot中配置一个普通队列和一个死信队列,然后通过生产者发送消息到普通队列,在消费者中处理消息,并模拟了当发生异常时将消息重新发送到死信队列。

参考连接

  • [rabbit 官网]Dead Letter Exchanges — RabbitMQ

  • 具体代码仓库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/77745.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Jenkins+Python+Ubuntu+Docker的接口/UI自动化测试环境部署详细过程

基于JenkinsPythonUbuntuDocker的接口/UI自动化测试环境部署详细过程 1 Jenkins是什么&#xff1f;2 Jenkins目标是什么&#xff1f;3 什么是CI/CD?3.1 CI持续集成3.2 CD持续部署3.3 CD持续交付 4 Ubuntu环境4.1 环境需求4.2 实现思路 5 Ubuntu下安装Docker6 安装Jenkins6.1 拉…

idea如何加快创建Maven项目的速度

一、下载archetype-catalog.xml 下载archetype-catalog.xml的地址 二、配置 以下所说的配置都指全局配置。 配置Maven -DarchetypeCataloglocal -Dfile.encodinggbk

深度学习基础知识扫盲

深度学习 监督学习&#xff08;Supervised learning&#xff09;监督学习分类 无监督学习&#xff08;Non-supervised learning&#xff09;无监督学习的算法无监督学习使用场景 术语特征值特征向量特征工程&#xff08;Feature engineering&#xff09;特征缩放Sigmod functio…

使用Golang实现一套流程可配置,适用于广告、推荐系统的业务性框架——简单应用

在诸如广告、推荐等系统中&#xff0c;我们往往会涉及过滤、召回和排序等过程。随着系统业务变得复杂&#xff0c;代码的耦合和交错会让项目跌入难以维护的深渊。于是模块化设计是复杂系统的必备基础。这篇文章介绍的业务框架脱胎于线上多人协作开发、高并发的竞价广告系统&…

数据库操作系列-Mysql, Postgres常用sql语句总结

文章目录 1.如果我想要写一句sql语句&#xff0c;实现 如果存在则更新&#xff0c;否则就插入新数据&#xff0c;如何解决&#xff1f;MySQL数据库实现方案: ON DUPLICATE KEY UPDATE写法 Postgres数据库实现方案:方案1&#xff1a;方案2&#xff1a;关于更新&#xff1a;如何实…

前端js--扩展卡片

效果图 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><link rel"stylesheet" href"…

构建器/建造者/构建者模式(C++)

定义 将一个复杂对象的构建与其表示相分离,使得同样的构建过程(稳定)可以创建不同的表示(变化)。 应用场景 在软件系统中&#xff0c;有时候面临着“一个复杂对象”的创建工作&#xff0c;其通常由各个部分的子对象用一定的算法构成;由于需求的变化&#xff0c;这个复杂对象…

centos7 ‘xxx‘ is not in the sudoers file...

如题 执行命令输入密码后时报错&#xff1a; [sudo] password for admin &#xff08;我的账户&#xff09;原因&#xff0c;当前用户还没有加入到root的配置文件中。 解决 vim打开配置文件&#xff0c;如下&#xff1a; #切换到root用户 su #编辑配置文件 vim /etc/sudoe…

AMASS database

AMASS是一个由不同的光学标记运动捕捉数据集统一表示在一个公共框架和参数化下的大型人体运动数据库。它包含了超过40小时的运动数据&#xff0c;涵盖了300多个主体和11000多个运动。它使用了SMPL人体模型&#xff0c;它是一种基于混合形状和姿态空间的生成式人体模型&#xff…

【python】绘图代码模板

【python】绘图代码模板 pandas.DataFrame.plot( )画图函数Seaborn绘图 -数据可视化必备主题样式导入数据集可视化统计关系散点图抖动图箱线图小提琴图Pointplot群图 可视化数据集的分布绘制单变量分布柱状图直方图 绘制双变量分布Hex图KDE 图可视化数据集中的成对关系 好看的图…

机器学习中的人生启示:“没有免费的午餐”定理(NFL)的个人发展之道→探讨感觉和身边其他人有差距怎么办?

文章目录 1 引言2 探究NFL定理的含义3 将NFL定理应用于个人发展4 探索个人兴趣和天赋5 结论 1 引言 机器学习中的“没有免费的午餐”定理&#xff08;NFL&#xff09;是一条深具启示意义的原则。该定理表明&#xff0c;没有一种算法可以在所有问题上都表现最好。在机器学习领域…

FPGA开发:音乐播放器

FPGA开发板上的蜂鸣器可以用来播放音乐&#xff0c;只需要控制蜂鸣器信号的方波频率、占空比和持续时间即可。 1、简谱原理 简谱上的4/4表示该简谱以4分音符为一拍&#xff0c;每小节4拍&#xff0c;简谱上应该也会标注每分钟多少拍。音符时值对照表如下图所示&#xff0c;这表…

软件工程专业应该学什么?

昨天&#xff0c;我朋友的孩子报考了软件工程专业&#xff0c;问我软件工程到底学啥&#xff1f;所以我给他开列了一个书单。 现在高校开了一堆花名头的专业&#xff1a; 偏技术类&#xff1a;云计算、大数据、人工智能、物联网 偏应用类&#xff1a;电子商务、信息管理 但我个…

MySQL|查看事务加锁情况

文章目录 使用information_schema数据库中的表获取锁信息INNODB_TRXINNODB_LOCKSINNODB_LOCK_WAITS 使用SHOW ENGINE INNODB STATUS获取信息补充 使用information_schema数据库中的表获取锁信息 在information_schema数据库中&#xff0c;有几个与事务和锁紧密相关的表 INNOD…

3个命令定位CPU飙高

top 指令找出消耗CPU最厉害的那个进程的pid top -H -p 进程pid 找出耗用CPU资源最多的线程pid printf ‘0x%x\n’ 线程pid 将线程pid转换为16进制 结合jstack 找出哪个代码有问题 jstack 进程pid | grep 16进制的线程pid -A 多少行日志 jstack 进程pid | grep 16进制的线程…

Source Insight_突出显示对选定字符的引用

1、突出显示对选定字符的引用 在Source Insight中&#xff0c;当我们选中一个函数或者变量的时候&#xff0c;关于它的所有引用地方都高亮显示&#xff0c;想要实现效果如下。 2、配置方法 (1)点击"Options"→“File Type options...” (2)勾选“Highlight referenc…

Python - 【socket】 客户端client重连处理简单示例Demo(一)

一. 前言 在Python中&#xff0c;使用socket进行网络通信时&#xff0c;如果连接断开&#xff0c;可以通过以下步骤实现重连处理 二. 示例代码 1. 定义一个函数&#xff0c;用于建立socket连接 import socketdef connect_socket(host, port):while True:try:# 建立socket连…

CMake:检测python解释器和python库

CMake:检测python解释器和python库 导言检测python解释器CMakeLists.txt输出附录 检测python库项目结构CMakeLists.txt相关源码附录 导言 python是一种非常流行的语言。许多项目用python编写的工具&#xff0c;从而将主程序和库打包在一起&#xff0c;或者在配置或构建过程中使…

当服务器域名出现解析错误的问题该怎么办?

​  域名解析是互联网用户接收他们正在寻找的域的地址的过程。更准确地说&#xff0c;域名解析是人们在浏览器中输入时使用的域名与网站IP地址之间的转换过程。您需要站点的 IP 地址才能知道它所在的位置并加载它。但&#xff0c;在这个过程中&#xff0c;可能会出现多种因素…

小白解密ChatGPT大模型训练;Meta开源生成式AI工具AudioCraft

&#x1f989; AI新闻 &#x1f680; Meta开源生成式AI工具AudioCraft&#xff0c;帮助用户创作音乐和音频 摘要&#xff1a;美国公司Meta开源了一款名为AudioCraft的生成式AI工具&#xff0c;可以通过文本提示生成音乐和音频。该工具包含三个核心组件&#xff1a;MusicGen用…