RabbitMQ的消息可靠性保证

文章目录

    • 1.环境搭建
        • 1.common-rabbitmq-starter 配置防止消费者抢消息(基础配置)
        • 2.common-rabbitmq-starter-demo下创建一个生产者一个消费者
    • 2.生产者可靠性
        • 1.开启消息超时重试机制
        • 2.生产者开启ConfirmCallback消息确认机制
          • 1.application.yml
          • 2.TestConfigPublisher.java
          • 3.测试交换机名字写错的情况
    • 3.MQ可靠性
        • 1.使用LazyQueue和持久化队列结合的方式来做
    • 4.消费者可靠性
        • 1.消费者失败重试机制
          • 1.application.yml
          • 2.解释
        • 2.消费者消息失败处理策略
          • 1.ErrorConfiguration.java 指定错误消息发送到异常交换机
          • 2.ErrorListener.java 异常队列监听器
          • 3.ErrorMessageHandler.java 异常消息处理器
          • 4.TestConfig.java配置
          • 5.TestConfigPublisher.java 生产者
          • 6.TestConfigConsumer.java 消费者故意消费失败
          • 7.测试,消费失败则重试三次后到异常处理逻辑

1.环境搭建

1.common-rabbitmq-starter 配置防止消费者抢消息(基础配置)
spring:rabbitmq:# 消费者配置listener:simple:prefetch: 1 # 每次获取一条消息,处理完再获取下一条
2.common-rabbitmq-starter-demo下创建一个生产者一个消费者

CleanShot 2024-12-31 at 21.59.36@2x

2.生产者可靠性

1.开启消息超时重试机制
    # 生产者消息重试配置template:retry:# 启用消息重试机制,默认为 falseenabled: true# 初始重试间隔时间为一秒initial-interval: 1000ms# 重试最大次数,默认为 3 次max-attempts: 2# 重试的间隔倍数# 配置 2 的话,第一次等initial-interval也就是1s,第二次等 2s,第三次等 4smultiplier: 2connection-timeout: 500ms # 连接超时时间500ms
2.生产者开启ConfirmCallback消息确认机制
1.application.yml
    # 生产者配置publisher-confirm-type: correlated # 发布确认类型为异步回调(一旦配置了,就必须要有回调方法)
2.TestConfigPublisher.java
package com.sunxiansheng.publisher.pub;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.UUID;/*** Description: 测试发布者** @Author sun* @Create 2024/12/31 19:05* @Version 1.0*/
@RestController
@Slf4j
public class TestConfigPublisher {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public void send() {log.info("发送消息");// 1.创建CorrelationData对象CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.设置回调cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable throwable) {// 基本不可能发生,因为这里的异常不是MQ问题导致的log.error("ConfirmCallback:消息发送失败(非MQ问题):{}", throwable.getMessage());}@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {// 判断是否发送成功if (confirm.isAck()) {log.info("ConfirmCallback:消息发送成功:{}", confirm);} else {log.error("ConfirmCallback:消息发送失败:{}", confirm.getReason());}}});rabbitTemplate.convertAndSend("fanout.exchange.tesst", "", "hello rabbitmq", cd);}
}
3.测试交换机名字写错的情况

CleanShot 2024-12-31 at 19.57.56@2x

3.MQ可靠性

1.使用LazyQueue和持久化队列结合的方式来做
    /*** 创建一个队列** @return*/@Beanpublic Queue fanoutQueueTest() {return QueueBuilder.durable("lazyQueue") // 持久化队列.lazy()               // 惰性队列.build();}

持久化队列可以保存队列的元数据,重启后自动恢复,惰性队列可以将所有的消息都持久化到磁盘,内存只保留最近的2048条消息

4.消费者可靠性

1.消费者失败重试机制
1.application.yml
    # 消费者配置listener:simple:acknowledge-mode: auto # 自动确认模式(消费者确认机制)retry:enabled: true # 开启重试机制max-attempts: 3 # 最大重试次数initial-interval: 1000ms # 重试间隔时间multiplier: 1.0 # 重试时间间隔倍数stateless: false # false:有状态,true:无状态,如果是有状态的,每次重试都会发送到同一个队列
2.解释

首先开启了消费者自动确认机制,如果消息消费失败,就进行重试

2.消费者消息失败处理策略
1.ErrorConfiguration.java 指定错误消息发送到异常交换机
package com.sunxiansheng.rabbitmq.error;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Description: 处理失败消息的交换机和队列** @Author sun* @Create 2024/12/31 19:07* @Version 1.0*/
@Configuration
// 当配置文件中spring.rabbitmq.listener.simple.retry.enabled=true时,才会生效
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple", name = "retry.enabled", havingValue = "true")
public class ErrorConfiguration {/*** 一个error交换机*/@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.exchange");}/*** 一个error队列*/@Beanpublic Queue errorQueue() {return new Queue("error.queue");}/*** 绑定error队列到error交换机*/@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}/*** MessageRecoverer*/@Beanpublic MessageRecoverer myMessageRecoverer(RabbitTemplate rabbitTemplate) {// 指定错误消息发送到error.exchange交换机,routingKey为errorreturn new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");}
}
2.ErrorListener.java 异常队列监听器
package com.sunxiansheng.consumer.error;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** Description: 错误消息监听器** @Author sun* @Create 2024/12/31 20:32* @Version 1.0*/
@Component
@Slf4j
public class ErrorListener {@RabbitListener(queues = "error.queue")public void errorListener(Message message) {// 解析错误信息ErrorMessageHandler.handleErrorMessage("error.queue", message);}
}
3.ErrorMessageHandler.java 异常消息处理器
package com.sunxiansheng.consumer.error;import com.rabbitmq.client.LongString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;import java.nio.charset.StandardCharsets;
import java.util.Map;/*** Description: 错误消息处理器** @Author sun* @Create 2024/12/31 20:32* @Version 1.0*/
@Slf4j
public class ErrorMessageHandler {public static void handleErrorMessage(String listenerName, Message message) {// 获取消息属性MessageProperties messageProperties = message.getMessageProperties();String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);Map<String, Object> headers = messageProperties.getHeaders();// 从消息头部获取异常信息String exceptionMessage = (String) headers.get("x-exception-message");String originalExchange = (String) headers.get("x-original-exchange");String originalRoutingKey = (String) headers.get("x-original-routingKey");// 处理LongString类型的异常堆栈跟踪信息String exceptionStackTrace = null;if (headers.containsKey("x-exception-stacktrace")) {Object stacktraceObject = headers.get("x-exception-stacktrace");if (stacktraceObject instanceof LongString) {exceptionStackTrace = stacktraceObject.toString();}}// 格式化输出所有信息,并在前后添加分割线log.error("\n-------------------------------\n" +"MQ错误监听队列: {}\n" +"原始交换机: {}\n" +"原始路由键: {}\n" +"原始信息: {}\n" +"异常信息: {}\n" +"异常堆栈: {}\n" +"-------------------------------",listenerName, originalExchange, originalRoutingKey, messageBody, exceptionMessage, exceptionStackTrace);}
}
4.TestConfig.java配置
package com.sunxiansheng.publisher.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Description: 测试配置类** @Author sun* @Create 2024/12/31 19:00* @Version 1.0*/
@Configuration
public class TestConfig {/*** 创建一个fanout类型的交换机** @return*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout.exchange.test");}/*** 创建一个队列** @return*/@Beanpublic Queue fanoutQueueTest() {return QueueBuilder.durable("lazyQueue") // 持久化队列.lazy()               // 惰性队列.build();}/*** 交换机和队列绑定*/@Beanpublic Binding binding() {return BindingBuilder.bind(fanoutQueueTest()).to(fanoutExchange());}
}
5.TestConfigPublisher.java 生产者
package com.sunxiansheng.publisher.pub;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.UUID;/*** Description: 测试发布者** @Author sun* @Create 2024/12/31 19:05* @Version 1.0*/
@RestController
@Slf4j
public class TestConfigPublisher {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public void send() {log.info("发送消息");// 1.创建CorrelationData对象CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.设置回调cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable throwable) {// 基本不可能发生,因为这里的异常不是MQ问题导致的log.error("ConfirmCallback:消息发送失败(非MQ问题):{}", throwable.getMessage());}@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {// 判断是否发送成功if (confirm.isAck()) {log.info("ConfirmCallback:消息发送成功:{}", confirm);} else {log.error("ConfirmCallback:消息发送失败:{}", confirm.getReason());}}});rabbitTemplate.convertAndSend("fanout.exchange.test", "", "hello rabbitmq", cd);}
}
6.TestConfigConsumer.java 消费者故意消费失败
package com.sunxiansheng.consumer.con;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** Description: 测试消费者** @Author sun* @Create 2024/12/31 19:03* @Version 1.0*/
@Component
@Slf4j
public class TestConfigConsumer {@RabbitListener(queues = "fanout.queue.test")public void receive(String message) {log.info("接收到的消息:{}", message);int i = 1 / 0;}
}
7.测试,消费失败则重试三次后到异常处理逻辑

CleanShot 2024-12-31 at 22.07.15@2x

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/5602.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

transformers使用过程问题

transfomers新旧版本冲突&#xff0c;和accelerate、datasets、evaluate这些库直接也经常会发生冲突 我使用了下面的版本&#xff0c;暂时没有冲突&#xff0c;如果有冲突再更新 transformers4.41.2 datasets2.20.0 accelerate0.31.0 evaluate0.4.2pip install transformers安…

Text2SQL 智能报表方案介绍

0 背景 Text2SQL智能报表方案旨在通过自然语言处理&#xff08;NLP&#xff09;技术&#xff0c;使用户能够以自然语言的形式提出问题&#xff0c;并自动生成相应的SQL查询&#xff0c;从而获取所需的数据报表&#xff0c;用户可根据得到结果展示分析从而为结论提供支撑&#…

Idea调试的时候字符串路径乱码 poi解析时表单中文名字正确,但是找不到

目录 原因 解决措施 POI表单中文名字正确但是找不到 原因 1.编码格式冲突 2.文件编码多次转换&#xff0c;已经凌乱 解决措施 1.找到工程目录下的文件夹【.idea】 2.进入【encodings.xml】文件 3.将【encodings.xml】中&#xff0c;除了<file url"PROJECT"&g…

LAYA3.0 组件装饰器说明

原文 在LayaAirIDE中&#xff0c;如果想在IDE内展示组件脚本的属性&#xff0c;需要通过装饰器的规则来实现。下面分别介绍四种装饰器。 文章目录 一、regClass()二、property()2.1 组件属性的常规使用2.2 属性访问器的装饰器使用2.3 是否序列化保存2.4 组件属性是否在IDE中显…

精选100+套HTML可视化大屏模板源码素材

大屏数据可视化以大屏为主要展示载体的数据可视化设计。 “大面积、炫酷动效、丰富色彩”&#xff0c;大屏易在观感上给人留下震撼印象&#xff0c;便于营造某些独特氛围、打造仪式感。 原本看不见的数据可视化后&#xff0c;便能调动人的情绪、引发人的共鸣。 使用方法&…

Unity中实现伤害跳字效果(简单好抄)

第一步骤安装并导入Dotween插件&#xff08;也可以不用导入之后直接下载我的安装包&#xff09; 官网DOTween - 下载 第二步&#xff1a; 制作跳字预制体 建议把最佳适应打开&#xff0c;这样就不怕数字太大显示不全了。 第三步&#xff1a;创建一个空对象并编写脚本JumpNumbe…

Java复习第四天

一、代码题 1.相同的树 (1)题目 给你两棵二叉树的根节点p和q&#xff0c;编写一个函数来检验这两棵树是否相同。 如果两个树在结构上相同&#xff0c;并且节点具有相同的值&#xff0c;则认为它们是相同的。 示例 1: 输入:p[1,2,3]&#xff0c;q[1,2,3] 输出:true示例 2: 输…

【2024年华为OD机试】(C/D卷,200分)- 5G网络建设 (JavaScriptJava PythonC/C++)

一、问题描述 题目描述 现需要在某城市进行5G网络建设&#xff0c;已经选取N个地点设置5G基站&#xff0c;编号固定为1到N。接下来需要各个基站之间使用光纤进行连接以确保基站能互联互通。不同基站之间假设光纤的成本各不相同&#xff0c;且有些节点之间已经存在光纤相连。 …

Kubernetes 集群中安装和配置 Kubernetes Dashboard

前言 上篇成功部署Kubernetes集群后&#xff0c;为了方便管理和监控集群资源&#xff0c;安装Kubernetes Dashboard显得尤为重要。Kubernetes Dashboard 是一个通用的、基于 Web 的 UI&#xff0c;旨在让用户轻松地部署容器化应用到 Kubernetes 集群&#xff0c;并对这些应用进…

前端【7】javascript-dom操作

目录 DOM 加载与脚本执行的时序问题 1. 将 <script> 标签放到 HTML 末尾 2.使用 defer 属性 3. 使用 window.onload 一、获取元素 1、getElementById 2、getElementsByClassName 3、getElementsByTagName 4、querySelector和querySelectorALL 5、对象的属性关…

python学opencv|读取图像(四十)掩模:三通道图像的局部覆盖

【1】引言 前序学习了使用numpy创建单通道的灰色图像&#xff0c;并对灰色图像的局部进行了颜色更改&#xff0c;相关链接为&#xff1a; python学opencv|读取图像&#xff08;九&#xff09;用numpy创建黑白相间灰度图_numpy生成全黑图片-CSDN博客 之后又学习了使用numpy创…

【2024年终总结】我与CSDN的一年

&#x1f449;作者主页&#xff1a;心疼你的一切 &#x1f449;作者简介&#xff1a;大家好,我是心疼你的一切。Unity3D领域新星创作者&#x1f3c6;&#xff0c;华为云享专家&#x1f3c6; &#x1f449;记得点赞 &#x1f44d; 收藏 ⭐爱你们&#xff0c;么么哒 文章目录 …

MySQL主从配置

一、 主从原理 MySQL 主从同步是一种数据库复制技术&#xff0c;它通过将主服务器上的数据更改复制到一个或多个从服务器&#xff0c;实现数据的自动同步。主从同步的核心原理是将主服务器上的二进制日志复制到从服务器&#xff0c;并在从服务器上执行这些日志中的操作。 二、主…

【SpringCloud】黑马微服务学习笔记

目录 1. 关于微服务 ?1.1 微服务与单体架构的区别 ?1.2 SpringCloud 技术 2. 学习前准备 ?2.1 环境搭建 ?2.2 熟悉项目 3. 正式拆分 ?3.1 拆分商品功能模块 ?3.2 拆分购物车功能模块 4. 服务调用 ?4.1 介绍 ?4.2 RustTemplate?的使用 4.3 服务治理-注册中…

windows git bash 使用zsh 并集成 oh my zsh

参考了 这篇文章 进行配置&#xff0c;记录了自己的踩坑过程&#xff0c;并增加了 zsh-autosuggestions 插件的集成。 主要步骤&#xff1a; 1. git bash 这个就不说了&#xff0c;自己去网上下&#xff0c;windows 使用git时候 命令行基本都有它。 主要也是用它不方便&…

解决leetcode第3418题机器人可以获得的最大金币数

3418.机器人可以获得的最大金币数 难度&#xff1a;中等 问题描述&#xff1a; 给你一个mxn的网格。一个机器人从网格的左上角(0,0)出发&#xff0c;目标是到达网格的右下角(m-1,n-1)。在任意时刻&#xff0c;机器人只能向右或向下移动。 网格中的每个单元格包含一个值coin…

opengrok_windows_环境搭建

目录 软件列表 软件安装 工程索引 ​编辑 工程部署 问题列表 软件列表 软件名下载地址用途JDKhttps://download.java.net/openjdk/jdk16/ri/openjdk-1636_windows-x64_bin.zipindex 使用java工具tomcathttps://dlcdn.apache.org/tomcat/tomcat-9/v9.0.98/bin/apache-tom…

c++ 与 Matlab 程序的数据比对

文章目录 背景环境数据保存数据加载 背景 ***避免数据精度误差&#xff0c;快速对比变量 *** 环境 c下载 https://github.com/BlueBrain/HighFive 以及hdf5库 在vs 中配置库 数据保存 #include <highfive/highfive.hpp> using namespace HighFive;std::string fil…

【2024 博客之星评选】请继续保持Passion

我尝试复盘自己2024年走的路&#xff0c;希望能给诸君一些借鉴。 文章目录 回头望感想与收获成长与教训今年计划感恩一些体己话 回头望 回望我的2024年&#xff0c;年初拿高绩效&#xff0c;但感觉逐渐被公司一点点剥离出中心&#xff1b;年中一直在学习防患于未然&#xff1b…

unity插件Excel转换Proto插件-ExcelToProtobufferTool

unity插件Excel转换Proto插件-ExcelToProtobufferTool **ExcelToProtobufTool 插件文档****1. 插件概述****2. 默认配置类&#xff1a;DefaultIProtoPathConfig****属性说明** **3. 自定义配置类****定义规则****示例代码** **4. 使用方式****4.1 默认路径****4.2 自定义路径**…