RabbitMQ高级

文章目录

  • 一.消息可靠性
    • 1.生产者消息确认
    • 2.消息持久化
    • 3.消费者确认
    • 4.消费者失败重试


MQ的一些常见问题

1.消息可靠性问题:如何确保发送的消息至少被消费一次

2.延迟消息问题:如何实现消息的延迟投递

3.高可用问题:如何避免单点的MQ故障而导致的不可用问题

4.消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题

一.消息可靠性

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?

  • -发送时丢失:

    • 生产者发送的消息未送达exchange

    • 消息到达exchange后未到达queue

  • MQ宕机,queue将消息丢失

  • consumer接收到消息后未消费就宕机

1.生产者消息确认

生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认

消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack

  • publisher-return,发送者回执
    消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

在这里插入图片描述

简单来说:在publisher-confirm下的nack是消息投递到交换机失败返回的信息;在publisher-confirm下的ack是消息成功到达了消费者;在publisher-return下的ack是消息到达了交换机但是路由失败的返回信息


SpringAMQP实现生产者确认

一.想要实现生产者消息确认机制,需要在配置文件编写开启代码,即在微服务的application.yml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated publisher-returns: true template:mandatory: true

配置说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型

    • simple:同步等待confirm结果,直到超时

    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback(推荐使用)

  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

二.配置ReturnCallback

ReturnCallback是消息到达交换机但是没有成功进行路由的回调函数(作用于全局)

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString());});}
}

对于代码中的ApplicationContext是负责管理和组织Spring应用中的各个组件,如bean、配置文件等.

通过实现ApplicationContextAware这个接口,bean可以获取对ApplicationContext的引用,并因此获得访问应用上下文中的其他bean、资源和容器特性的能力,所以说实现了接口等同于获取到了bean容器,就可以获取到 rabbitTemplate并进行设置唯一的ReturnCallback

在回调函数中,消息路由失败会返回很多信息,其中使用路由键,消息的交换机的名称可以实现重发消息

三.在生产者类中发送消息并同时实现ConfirmCallback

ConfirmCallback同样是回调函数,与ReturnCallback不同的是ConfirmCallback可以创建多次

ConfirmCallback是对消息还没有进入到交换机就丢失的一种消息返回策略,当丢失后,执行回调函数并可以记录消息的失败原因和UUID,成功也是同理

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {// 消息体String message = "hello, spring amqp!";// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){ // ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 发送消息rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
}

需要注意的是:在手动添加交换机的过程中,想要使用通配符"#"的话,应该设置交换机为topic类型!

总结:

SpringAMQP中处理消息确认的几种情况:

  • publisher-comfirm:

    • 消息成功发送到exchange,返回ack

    • 消息发送失败,没有到达交换机,返回nack

    • 消息发送过程中出现异常,没有收到回执

  • 消息成功发送到exchange,但没有路由到queue,调用ReturnCallback


2.消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

1.交换机持久化:

@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct", true, false);
}

2.队列持久化:

@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}

3.消息持久化,SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定

Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化 .build();

但在springamqp中,其实已经在声明交换机和队列的时候将其持久化了,发送消息的方法convertAndSend()内部也将消息做了持久化,了解消息持久化的设置方法可以将以后不是很重要的交换机,队列,消息设置为非持久化


3.消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。

  • auto(推荐):自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack,抛出异常后,会不断重新发送消息即失败重试机制

  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

配置方式是修改application.yml文件,添加下面配置:

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack

4.消费者失败重试

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

在这里插入图片描述

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐使用)

第三种处理策略是将重试失败的消息重新投递到指定的交换机,然后在投递到指定的队列中,形成了一个交换机-队列的错误消息存放容器,在这个容器中存放不仅有错误消息,还有错误消息头的异常栈信息

在这里插入图片描述

实现方式:

1.定义接收失败消息的交换机、队列及其绑定关系:

@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true); 
}
@Bean
public Binding errorBinding(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}

2.定义RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); 
}

总结:如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列

  • 开启持久化功能,确保消息未消费前在队列中不会丢失

  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack

  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/234914.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

统一网关 Gateway【微服务】

文章目录 1. 前言2. 搭建网关服务3. 路由断言工厂4. 路由过滤器4.1 普通过滤器4.2 全局过滤器4.3 过滤器执行顺序 5. 跨域问题处理 1. 前言 通过前面的学习我们知道,通过 Feign 就可以向指定的微服务发起 http 请求,完成远程调用。但是这里有一个问题&am…

力扣最热一百题——只出现一次的数字

这个合集已经很久没有更新了,今天来更新更新~~~ 目录 力扣题号 题目 题目描述 示例 提示 题解 Java解法一:Map集合 Java解法二:位运算 C位运算代码 力扣题号 136. 只出现一次的数字 - 力扣(LeetCode) 下述题…

Linux 常用指令汇总

Linux 常用指令汇总 文章目录 Linux 常用指令汇总[toc]前言一、文件目录指令pwd 指令ls 指令cd 指令mkdir 指令rmdir 指令tree 指令cp 指令rm 指令mv 指令cat 指令more 指令less 指令head 指令tail 指令echo 指令> 指令>> 指令 二、时间日期指令date 指令cal 指令 三、…

MySQL语法及IDEA使用MySQL大全

在项目中我们时常需要写SQL语句,或简单的使用注解直接开发,或使用XML进行动态SQL之类的相对困难的SQL,并在IDEA中操控我们的SQL,但网上大都图方便或者觉得太简单了,完全没一个涵盖两个方面的讲解。 单表: …

[C#]winform部署PaddleDetection的yolo印章检测模型

【官方框架地址】 https://github.com/PaddlePaddle/PaddleDetection.git 【算法介绍】 PaddleDetection 是一个基于 PaddlePaddle(飞桨)深度学习框架的开源目标检测工具库。它提供了一系列先进的目标检测算法,包括但不限于 Faster R-CNN, …

音量控制软件sound control mac功能亮点

sound control mac可以帮助用户控制某个独立应用程序的音量,通过每应用音量,均衡器,平衡和音频路由独立控制每个应用的音频,还有整个系统的音量。 sound control mac功能亮点 每个应用程序的音量控制 独立控制应用的数量。 键盘音…

接口功能测试策略

由于平台服务器是通过接口来与客户端交互数据提供各种服务,因此服务器测试工作首先需要进行的是接口测试工作。测试人员需要通过服务器接口功能测试来确保接口功能实现正确,那么其他测试人员进行客户端与服务器结合的系统测试过程中,就能够排…

【现代密码学】笔记3.4-3.7--构造安全加密方案、CPA安全、CCA安全 《introduction to modern cryphtography》

【现代密码学】笔记3.4-3.7--构造安全加密方案、CPA安全、CCA安全 《introduction to modern cryphtography》 写在最前面私钥加密与伪随机性 第二部分流加密与CPA多重加密 CPA安全加密方案CPA安全实验、预言机访问(oracle access) 操作模式伪随机函数PR…

ChatGPT知名开源项目有哪些

ChatGPT-Next-Web:基于ChatGPT API的私有化部署网页聊天系统 主要功能: 只需在 1 分钟内即可在 Vercel 上一键免费部署,支持私有服务器快速部署,支持使用私有域名支持ChatGPT3.5、4等常见模型Linux/Windows/MacOS 上的紧凑型客户…

数据库开发工具Navicat Premium 15 mac软件特色

Navicat Premium 15 mac版是一款数据库开发工具,Navicat Premium 15 Mac版可以让你以单一程序同時连接到 MySQL、MariaDB、SQL Server、SQLite、Oracle 和 PostgreSQL 数据库。 Navicat Premium mac软件特色 无缝数据迁移 数据传输,数据同步和结构同步…

Mysql InnoDB行锁深入理解

Record Lock记录锁 Record Lock 称为记录锁,锁住的是一条记录。而且记录锁是有 S 锁和 X 锁之分的: 当一个事务对一条记录加了 S 型记录锁后,其他事务也可以继续对该记录加 S 型记录锁(S 型与 S 锁兼容),…

唠一唠Java线程池

第1章:引言 大家好,我是小黑,咱们今天来聊聊Java线程池,如果没有线程池,每个线程都需要手动创建和销毁线程,那将是多么低效和耗资源啊! 线程池的核心作用就是复用已创建的线程,减少…

固乔快递查询助手:批量、快速、全面的快递信息查询软件

在快递行业飞速发展的今天,如何高效、准确地掌握快递信息成为了很多人的需求。而固乔快递查询助手正是解决这一难题的利器。 固乔快递查询助手是一款专注于快递信息查询的软件,支持多家主流快递公司查询。用户只需输入单号,即可快速查询到实时…

对root用户的理解

1.什么是root用户? Windows、MacOS、Linux均采用多用户的管理模式进行权限管理。在Linux系统中,拥有最大权限的账户名为:root(超级管理员) root用户拥有最大的系统操作权限,而普通用户在许多地方的权限是受…

75应急响应-数据库漏洞口令检索应急取证箱

必要知识点 第三方应用是选择性的安装的,比如mysql,如何做好信息收集,有没有爆过它的漏洞,和漏洞探针也是获取攻击者思路的重要操作,除去本身漏洞外,提前预知或口令相关攻击也要进行筛选。 排除三方应用…

力扣刷题-二叉树-合并二叉树

617.合并二叉树(经典) 合并二叉树是操作两棵树的题目里面很经典的,如何对两棵树遍历以及处理? 给定两个二叉树,想象当你将它们中的一个覆盖到另一个上时,两个二叉树的一些节点便会重叠。 你需要将他们合并…

LabVIEW在微生物检测中的应用

随着对食品安全关注的增加,食品检测的准确性变得越来越重要。其中,微生物计数作为食品合格的关键指标,对其检测技术的准确性和实时性要求极高。传统的微生物检测面临着菌落识别困难、设备实时性差和自动化程度不高等问题,尤其在疫…

华清远见作业第二十五天——IO(第八天)

思维导图&#xff1a; 使用信号灯集完成三个进程的同步&#xff0c;A进程输出字符A&#xff0c;B进程输出字符B&#xff0c;C进程输出字符C&#xff0c;要求输出结果为ABCABCABCABCABC 代码&#xff1a; #include<stdio.h> #include<string.h> #include<stdli…

基于 IP 多播的网络会议程序(2024)

1.题目描述 局域网 IP 多播程序&#xff0c;设计一个图形界面的网络会议程序&#xff08;实现文本多播方式即可&#xff09;。 2.演示Demo 3.参考代码 广播发送代码 //服务端 #include <winsock2.h> #include <iostream> #include <list>#pragma comment(l…

test fuzz-05-模糊测试 kelinci AFL-based fuzzing for Java

拓展阅读 开源 Auto generate mock data for java test.(便于 Java 测试自动生成对象信息) 开源 Junit performance rely on junit5 and jdk8.(java 性能测试框架。性能测试。压测。测试报告生成。) test fuzz-01-模糊测试&#xff08;Fuzz Testing&#xff09; test fuzz-…