第十二章 RabbitMQ之失败消息处理策略

目录

一、引言

二、RepublishMessageRecoverer 实现

2.1. 实现步骤

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

2.2.2. 常规交换机队列配置类 

2.2.3. 消费者代码

2.2.4. 消费者yml配置 

2.2.5. 生产者代码 

2.2.6. 生产者yml配置 

 2.2.7. 运行效果


一、引言

Spring AMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

二、RepublishMessageRecoverer 实现

在实际项目的生产环境中,通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机,来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。

2.1. 实现步骤

1. 将失败处理策略改为RepublishMessageRecoverer

2. 定义接收失败消息的交换机、队列及其绑定关系

3. 定义RepublishMessageRecoverer

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

package com.example.consumer;import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 异常交换机/队列/消息回收器配置类* ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {@Resourceprivate RabbitTemplate rabbitTemplate;@BeanQueue errorQueue() {return new Queue("error.queue");}@BeanDirectExchange errorExchange() {return new DirectExchange("error.direct");}@BeanBinding errorBind(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

2.2.2. 常规交换机队列配置类 

package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
@Configuration
public class RabbitMQConfig {@BeanQueue simpleQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("simple.queue").build();}
}

2.2.3. 消费者代码

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "simple.queue")public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");throw new Exception();}
}

2.2.4. 消费者yml配置 

# 消费者application.yml配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

2.2.5. 生产者代码 

package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");}
}

2.2.6. 生产者yml配置 

# 生产者application.yml配置
spring:rabbitmq:# MQ连接配置host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhou

 2.2.7. 运行效果

最终效果是,我们在消费者的代码逻辑中会抛出异常,消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/450961.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

爬虫逆向-js进阶(续写,搭建网站)

1.搭建简单网站1 from flask import Flask,render_template import requests import json app Flask(name)# **location**的温度是**temp**度,天气状况:**desc**app.route(/) # 绑定处理函数 def index_url():location 101010100data get_weather(lo…

Python画笔案例-086 turtle 多线程绘画

1、turtle 多线程绘画 通过 python 的turtle 库 多线程绘画,如下图: 2、实现代码 turtle 库 多线程绘画,以下为实现代码: """多线程绘画.py """ from random import random,randint from turtle import Turtle,Screen from threading

智慧光储充一体化能源管理策略

0引言 我国电动汽车的数量正在持续增长,然而,充电设施的发展却相对滞后,车与充电桩的比例远未达到规划目标。充电桩的建设面临着电网增容困难和盈利模式单一的问题。"光-储-充"一体化设备能够有效解决这些问题,通过夜间…

2024盐城大洋湾遇见迷鹿音乐节 首阵容公布开票!

生活不止一个方向,它是一场没有终点的旅行,是无尽的探索,是一次次的挑战与征服。你可以是自由的风,穿梭在城市的钢筋森林,或是旷野的无垠边际,无拘无束,只听从内心的呼唤。你可以是静止的树&…

Oracle云主机申请和使用教程:从注册到SSH连接的全过程

今天我要和大家分享如何成功申请Oracle云主机,并进行基本的配置和使用。我知道很多同行的朋友在申请Oracle云主机时都遇到了困难(疑惑abc错误),可能试了很多次都没有成功。现总结一下这些年来的一些注册流程经验,或许你们也能成功申请到自己的…

内嵌服务器Netty Http Server

内嵌式服务器不需要我们单独部署,列如SpringBoot默认内嵌服务器Tomcat,它运行在服务内部。使用Netty 编写一个 Http 服务器的程序,类似SpringMvc处理http请求那样。举例:xxl-job项目的核心包没有SpringMvc的Controller层,客户端却…

为什么Autosar钟情arxml而非json?大揭秘!

目录 往期推荐 JSON 的优缺点 优点: 缺点: XML 的优缺点 优点: 缺点: JSON与XML适用场景 Autosar选中arxml的原因 1. 复杂数据结构表示能力 2. 严格的数据验证和约束 3. AUTOSAR 历史与工具链的成熟度 4. 灵活的扩展性…

5555字的程序员脱单攻略,看了后悔一天,不看后悔一辈子!

目录 一、序言 二、破圈 三、打造社交魅力之形象改造 四、你知道怎么线下邀约女生吗 五、如何判断对方对你是否有后续 六、90%的人止步于心态上 七、内在力量的根源 一、序言 《对象说》 间歇性想找对象,持续性不想行动。 看着别人撒狗粮,躺在家…

回溯算法【组合 子集 全排列 N皇后】

大家好,最近一直在写算法,刷了力扣中部分回溯,总结了大致题型和思路,在这里分享给大家,希望大家可以有所收获!!! 目录 回溯算法的基本思想: 回溯的典型结构&#xff1a…

今日股市集体狂飙,下周一呢?

今日,中国人民银行与中国证监会联合印发《关于做好证券、基金、保险公司互换便利(SFISF)相关工作的通知》,向参与互换便利操作各方明确业务流程、操作要素、交易双方权利义务等内容。目前获准参与互换便利操作的证券、基金公司有2…

链上的羁绊,数据与节点的暗涌心跳

公主请阅 1. 合并两个有序链表1.1 题目说明示例 1示例 2示例 3 1.2 题目分析1.3 代码部分1.4 代码分析 2. 链表的中间节点2.1 题目说明示例 1示例 2 2.2 题目分析2.3 代码部分2.4 代码分析 1. 合并两个有序链表 题目传送门 1.1 题目说明 这个问题要求将两个升序链表合并成一个…

Chinese Fineweb Edu v2即将开源

Chinese Fineweb Edu🔗:https://opencsg.com/datasets/OpenCSG/chinese-fineweb-edu huggingface🔗:https://huggingface.co/opencsg

LeetCode 3319. 第 K 大的完美二叉子树的大小

LeetCode 3319. 第 K 大的完美二叉子树的大小 给你一棵 二叉树 的根节点 root 和一个整数k。 返回第 k 大的 完美二叉子树的大小,如果不存在则返回 -1。 完美二叉树 是指所有叶子节点都在同一层级的树,且每个父节点恰有两个子节点。 子树 是指树中的某一…

【4.10】图搜索算法-BFS和DFS解电话号码的字母组合

一、题目 给定一个仅包含数字 2-9 的字符串,返回所有它能表示的字母组合。答案可以按 任意顺序 返回。给出数字到字母的映射如下(与电话按键相同)。注意 1 不对应任何字母。 示例 1: 输入:digits "23" 输出…

CEEMDAN +组合预测模型(BiLSTM-Attention + ARIMA)

往期精彩内容: 时序预测:LSTM、ARIMA、Holt-Winters、SARIMA模型的分析与比较 全是干货 | 数据集、学习资料、建模资源分享! EMD、EEMD、FEEMD、CEEMD、CEEMDAN的区别、原理和Python实现(一)EMD-CSDN博客 EMD、EEM…

【高阶数据结构】揭开红黑树‘恶魔’的面具:深度解析底层逻辑

高阶数据结构相关知识点可以通过点击以下链接进行学习一起加油!二叉搜索树AVL树 大家好,我是店小二,欢迎来到本篇内容!今天我们将一起探索红黑树的工作原理及部分功能实现。红黑树的概念相对抽象,但只要我们一步步深入…

flutter assets配置加载本地图片报错

首选列出我在照着网上说的设置assets怎么搞都报错,错误如下,搞的我想骂娘。 flutter: uses-material-design: true assets: - assets/images 后来找到了下面这个教程,才终于解决,就是要在后面加一个"/" 。 flutter这个…

【分布式知识】MapReduce详细介绍

文章目录 MapReduce概述1. MapReduce编程模型Map阶段Reduce阶段 2. Shuffle和Sort阶段3. MapReduce作业的执行流程4. MapReduce的优化和特性5. MapReduce的配置和调优 MapReduce局限性相关文献 MapReduce概述 MapReduce是一个分布式计算框架,它允许用户编写可以在大…

【热门】智慧果园管理系统解决方案

随着科技的进步,原有农业种植方式已经不能满足社会发展的需要,必须对传统的农业进行技术更新和改造。经过多年的实践,人们总结出一种新的种植方法——温室农业,即“用人工设施控制环境因素,使作物获得最适宜的生长条件,从而延长生产季节,获得最佳的产出”。这种农业生产方式…

scala 类的继承

继承的定义 idea实例 语法 重写 重写:在子类中重新定义父类的同名方法 idea实例 多态 多态:传入的对象不同,调用的方法的效果就不同! 原理:参数是父类类型 idea实例 构造器