如何保证消息不丢失?——使用rabbitmq的死信队列!

如何保证消息不丢失?——使用rabbitmq的死信队列!

1、什么是死信

在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。
死信就是消息在特定场景下的一种表现形式,这些场景包括:

    1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject
    1. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack
    1. 消息的Expiration 过期时长或队列TTL过期时间。
    1. 消息队列达到最大容量

上述场景经常产生死信,即消息在这些场景中时,被称为死信。

2、什么是死信队列

死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。
死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的业务消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预
在这里插入图片描述

3、那么,我们到底如何来使用死信队列呢?

死信队列基本使用,只需要在声明业务队列的时候,绑定指定的死信交换机和RoutingKey即可。

生产者

/** Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.**/
package com.fpl.provider;import com.fpl.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** <p>Project: spring-rabbitmq - DeadProvider</p>* <p>Powered by fpl1116 On 2024-04-09 11:35:12</p>* <p>描述:<p>** @author penglei* @version 1.0* @since 1.8*/
@Service
public class DeadProvider {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OrderingOk orderingOk) {rabbitTemplate.convertAndSend("Direct_E01", "RK01", orderingOk,new MessagePostProcessor(){@Overridepublic Message postProcessMessage(Message message) throws AmqpException {int id  = orderingOk.getId();int expiration = 0;if(id == 1){expiration = 50*1000;}else if(id == 2){expiration = 40*1000;}else if(id ==3){expiration = 30*1000;}else if(id ==4){expiration = 20*1000;}else if(id ==5){expiration = 10*1000;}//为每个消息设置过期时长,但是有可能造成最前面的一个消息未过期一直阻塞后面的消息不能被消费message.getMessageProperties().setExpiration(String.valueOf(expiration));return message;}});}
}

消费者

/** Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.**/
package com.fpl.consumers;import com.fpl.model.OrderingOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;/*** <p>Project: spring-rabbitmq - DeadConsumer</p>* <p>Powered by fpl1116 On 2024-04-09 11:32:59</p>* <p>描述:<p>** @author penglei* @version 1.0* @since 1.8*/
//@Configuration
@Slf4j
public class DeadConsumer {//死信交换机@Beanpublic DirectExchange deadExchange(){return  ExchangeBuilder.directExchange("Dead_E01").build();}//死信队列@Beanpublic Queue deadQueue1(){return   QueueBuilder.durable("Dead_Q01").build();}//死信交换机与死信队列的绑定@Beanpublic Binding deadBinding1(Queue deadQueue1,DirectExchange deadExchange){return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");}//业务队列@Beanpublic Queue queue1(){return   QueueBuilder.durable("Direct_Q01").deadLetterExchange("Dead_E01").deadLetterRoutingKey("RK_DEAD")//.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息//.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列.build();}//业务交换机@Beanpublic DirectExchange exchange(){return  ExchangeBuilder.directExchange("Direct_E01").build();}//业务交换机与队列的绑定@Beanpublic Binding binding1(Queue queue1,DirectExchange exchange){return BindingBuilder.bind(queue1).to(exchange).with("RK01");}//    @RabbitListener(queues = "Direct_Q01")
//    public void receiveMessage(OrderingOk msg,Message message, Channel channel) throws IOException {
//
//        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//
//        System.out.println("消费者1 收到消息:"+ msg +" tag:"+deliveryTag);
//
//        channel.basicReject(deliveryTag, false);
//        try {
//            // 处理消息...
//            int  i= 5/0;
//            // 如果处理成功,手动发送ack确认 ,Yes
//            channel.basicAck(deliveryTag, false);
//        } catch (Exception e) {
//            // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject)  NO
//            channel.basicNack(deliveryTag, false, false); // 并重新入队
//
//        }
}//}

测试

@Testvoid test4() throws IOException {for (int i = 1; i <=5;i++){OrderingOk orderingOk = OrderingOk.builder().id(i).name("fpl " + i).build();deadProvider.send(orderingOk);System.out.println("发送成功:"+i);}System.in.read();}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/309704.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

畅游网络:构建C++网络爬虫的指南

概述 随着信息时代的来临&#xff0c;网络爬虫技术成为数据采集和网络分析的重要工具。本文旨在探讨如何运用C语言及其强大的cpprestsdk库构建一个高效的网络爬虫&#xff0c;以便捕捉知乎等热点信息。为了应对IP限制的挑战&#xff0c;我们将引入亿牛云爬虫代理服务&#xff…

使用 Python 的 LSTM 进行股市预测

目录 一、说明 二、为什么需要时间序列模型&#xff1f; 三、下载数据 3.1 从 Alphavantage 获取数据 3.1 从 Kaggle 获取数据 3.3 数据探索 3.4 数据可视化 四、将数据拆分为训练集和测试集 五、数据标准化 六、通过平均进行一步预测 6.1 标准平均值 6.2 指数移动平均线 6.3 如…

【how2j练习题】HTML DOM部分阶段练习

练习1 <!-- 验证账号是否已经存在 那么就在js使用简单的验证规则&#xff1a; 如果账号是以a或者A开头的&#xff0c;那么就提示已经存在了。 --> <!-- 1.需要一个输入框和一个按钮 2.按钮上绑上一个事件。 3.编写事件&#xff0c;并输出答案 --><html><…

锂电池寿命预测 | Matlab基于BiLSTM双向长短期记忆神经网络的锂电池寿命预测

目录 预测效果基本介绍程序设计参考资料 预测效果 基本介绍 锂电池寿命预测 | Matlab基于BiLSTM双向长短期记忆神经网络的锂电池寿命预测 程序设计 完整程序和数据获取方式&#xff1a;私信博主回复Matlab基于BiLSTM双向长短期记忆神经网络的锂电池寿命预测。 参考资料 [1] h…

SinoDB创建、更改、删除索引

SinoDB数据库的索引组织方式有两种&#xff1a;B树索引与R树索引。B 树是大多数数据库所采用的索引组织方式。R 树索引作为表的辅助访问方法&#xff0c;主要用于查找多维空间数据。本文主要讨论B 树索引。 1. B 树索引 B 树索引按级别进行组织。最高级别包含指向真实的数据的…

SSM项目转Springboot项目

SSM项目转Springboot项目 由于几年前写的一个ssm项目想转成springboot项目&#xff0c;所以今天倒腾了一下。 最近有人需要毕业设计转换一下&#xff0c;所以我有时间的话可以有偿帮忙转换&#xff0c;需要的私信我或&#xff0b;v&#xff1a;Arousala_ 首先创建一个新的spr…

面试官:MySQL的自增 ID 用完了,怎么办?

如果你用过或了解过MySQL&#xff0c;那你一定知道自增主键了。每个自增id都是定义了初始值&#xff0c;然后按照指定步长增长&#xff08;默认步长是1&#xff09;。虽然&#xff0c;自然数是没有上限的&#xff0c;但是我们在设计表结构的时候&#xff0c;通常都会指定字段长…

判断IQ水平-第12届蓝桥杯选拔赛Python真题精选

[导读]&#xff1a;超平老师的Scratch蓝桥杯真题解读系列在推出之后&#xff0c;受到了广大老师和家长的好评&#xff0c;非常感谢各位的认可和厚爱。作为回馈&#xff0c;超平老师计划推出《Python蓝桥杯真题解析100讲》&#xff0c;这是解读系列的第50讲。 判断IQ水平&#…

MGRE-OSPF接口网络类型实验

OSPF接口网络类型实验 一&#xff0c;实验拓扑 初始拓扑&#xff1a; 最终拓扑&#xff1a; 二&#xff0c;实验要求及分析 要求&#xff1a; 1&#xff0c;R6为ISP只能配置IP地址&#xff0c;R1-R5的环回为私有网段 2&#xff0c;R1/R4/R5为全连的MGRE结构&#xff0c;R…

HCIP的学习(8)

OSPF数据报文 OSPF头部信息&#xff08;公共固定&#xff09; 版本&#xff1a;OSPF版本&#xff0c;在IPv4网络中版本字段恒定为数值2&#xff08;v1属于实验室版本&#xff0c;v3属于IPv6&#xff09;类型&#xff1a;代表具体是哪一种报文&#xff0c;按照1~5排序&#xff…

qt5-入门-QByteArray

参考&#xff1a; Using QByteArray - Qt Wiki https://wiki.qt.io/Using_QByteArray Qt总结之十五&#xff1a;QByteArray详解-CSDN博客 https://blog.csdn.net/Aidam_Bo/article/details/85778012 QT学习&#xff1a;09 QByteArray - schips - 博客园 https://www.cnblogs.…

数据结构—顺序表实现通讯录

在上一节我们基本了解了顺序表的基本知识&#xff0c;接下来我们就用顺序表来实现一下通讯录。 一、基于动态顺序表实现通讯录 1.1 功能介绍 1. 能够保存用户信息&#xff1a;姓名&#xff0c;性别&#xff0c;年龄&#xff0c;电话&#xff0c;地址等 2. 添加联系人信息 3. …

创新营销利器:淘宝扭蛋机小程序开发全解析

在数字化浪潮的推动下&#xff0c;淘宝扭蛋机小程序的开发成为了一种全新的购物体验。它巧妙地将传统扭蛋机的乐趣与移动技术的便捷相结合&#xff0c;为用户带来了前所未有的惊喜与互动。 淘宝扭蛋机小程序的开发&#xff0c;不仅是一次技术的革新&#xff0c;更是一次购物方…

使用Python写接口压测2简单递归

递归其实在压测场景用的不多&#xff0c;但是批量造数据或批量导出&#xff0c;用的比较多&#xff0c;常见的压测登陆&#xff0c;首先你要有登陆账号的csv&#xff0c;这个时候自己可以实现一个批量获取账号的py就很惬意。 编辑器 VScode VSCode 全称 Visual Studio Code&…

基于数据库现有表导出为设计文档

1.查询 SELECTCOLUMN_NAME 字段名,COLUMN_COMMENT 字段描述,COLUMN_TYPE 字段类型,false as 是否为主键 FROMINFORMATION_SCHEMA.COLUMNS wheretable_NAME region -- 表名2.查询结果 3.导出为excel

Python学习笔记20 - 模块

什么叫模块 自定义模块 Python中的包 Python中常用的内置模块 第三方模块的安装与使用

如何提升软件发布管理过程?

大家都说&#xff0c;实践出真知。在成为一位首席技术官&#xff08;CTO&#xff09;之前&#xff0c;我也是做着跑腿活&#xff0c;我的职业生涯是从软件工程师开始的&#xff0c;后来开始管理开发和 QA 团队&#xff0c;并负责发布管理管道。我做过上百个项目&#xff0c;其中…

【第十五届】蓝桥杯省赛C++b组

今年的蓝桥杯省赛已经结束了&#xff0c;与以往不同&#xff0c;今年又回到了8道题&#xff0c;而22&#xff0c;23年出现了10道题 大家觉得难度怎么样&#xff0c;欢迎进来讨论&#xff0c;博主今年没参加哈&#xff0c;大家聊聊&#xff0c;我听听大家的意见和看法哈 试题A:…

九州金榜|孩子青春期应该如何家庭教育?

青春期&#xff0c;是一个人从儿童走向成年的重要过渡阶段&#xff0c;也是心理、生理发生巨大变化的时期。面对这一特殊时期的孩子&#xff0c;家庭教育显得尤为重要。那么&#xff0c;作为家长&#xff0c;我们该如何进行青春期孩子的家庭教育呢&#xff1f;九州金榜家庭教育…

Langchain入门到实战-第一弹

Langchain入门到实战 Langchain简介官网地址Langchain概述Langchain作用Langchain开源包使用案例更新计划 Langchain简介 官网地址 声明: 由于操作系统, 版本更新等原因, 文章所列内容不一定100%复现, 还要以官方信息为准 https://python.langchain.com/Langchain概述 LangC…