互联网全景消息(2)之RabbitMq高阶使用

一、RabbitMQ消息可靠性保障

        消息的可靠性投递是使用消息中间件不可避免的问题,不管是Kafka、rocketMQ或者是rabbitMQ,那么在RabbitMQ中如何保障消息的可靠性呢?

        首先来看一下rabbitMQ的 架构图:

        首先从图里我们可以看到,消息投递的保障性主要从三个方面来解决:

  • 生产者;
  • Broker;
  • 消费者; 

1.1 生产者保障 

        生产者发送消息到broker时,要保障消息的可靠性,主要方案有以下两种:

  1. 生产者确认;
  2. 失败通知; 

         首先RabbitMQ生产者通过制定一个Exchange和routingkey把消息送达到某个队列中,然后消费者监听队列进行消费处理。但是在某些情况下,如果我们在发送消息,当前的exchange不存在或者指定的routingkey找不到对应的队列,这个时候如果要监听这种不可达的消息,就需要失败通知了。

1.1.1 交换器、队列、路由健的关系

        队列通过路由健(routingkey,某种规则)绑定到交换器中,生产者将消息发布到交换器中,交换器根据绑定的routingkey将消息路由到指定队列中,然后由订阅这个队列的消费者进行监听消费。

 

        此时就会存在一个问题,消息路由到了不存在的队列怎么办?一般情况下RabbitMQ会直接忽略,当这个消息不存在,也就是消息丢弃了。

        所以在不做任何配置的情况下,生产者是不知道消息是否真正达到rabbitMQ,也就是说消息发布不会返回任何消息给生产者。

1.1.2 失败通知 

        那如何保证我们消息发布的可靠性,这里我们就可以启动失败通知,在原生的编码中可以在发送消息的时候设置Mandatory,即可开启故障检测模式。

        注意:他只会让RabbitMQ向你通知失败,而不会通知成功,如果消息正确的路由到队列,则发布者不会收到任何通知。带来的问题就是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失。

1.1.2.1 实现方式

        spring配置方式:

spring:

        rabbitmq:

                # 消息在未被队列收到的情况下返回

                publisher-returns: true

         关键代码,注意需要发送者实现 ReturnCallback 接口方可实现失败通知

 

1.1.2.2 存在的问题 

        如果消息正确路由到队列,则发布者不会收到任何通知。带来的问题就是无法确保消息一定是成功的,因为通知失败的消息可能会丢失。

        这样子我们可以使用RabbitMQ的发送方确认来实现,它不仅仅在路由失败的时候给我们发送消息,并且能够在路由成功的时候也给我们发送消息。

 1.1.3 发送方确认

        发送方确认是指生产者在投递消息后,如果Broker接收到消息,则会给生产者一个应答。生产者进行接收应答,用来确认这条消息是否正常的发送到Broker,这种方式也是可靠消息投递的核心保证。 

        rabbitMQ消息发送分为两个阶段:

  • 将消息发送到broker,即发送到exchange交换机;
  • 消息通过exchange被路由到队列; 

        一旦消息投递到队列,队列则会向生产者发送一个通知,如果队列设置了消息持久到磁盘,则会等待消息持久化到磁盘之后再发送通知。

        注意:发送者确认只有出现RabbitMQ内部错误才会出现发送者确认失败。 

        在发送者确认这种模式也可以分为具体两种情况来看待:

  1. 队列不可路由;
  2. 队列可路由; 
1.1.3.1 队列不可路由 

        当前的消息达到交换器之后,对于发送者确认是成功的。因为此时的消息已经到达了broker,此时只是不可路由队列他认为是成功的。

 

        首先RabbitMQ交换器不可路由时,消息也根本 不会投递到队列中,所以这里他只管到交换器这里,当消息成功到达交换器后,就会进行确认操作。 

        另外在这过程中,生产者收到了确认之后,那么因为消息不可路由,所以该消息也是无效的相当于被抛弃了,无法到达队列,所以一般这里会结合失败通知来一同使用,这里一般会进行mandatory模式,失败则会调用addReturnListener监听器来处理。

1.1.3.2 队列可以路由

        只要消息能够到达队列即可进行确认,一般是RabbitMQ发生内部错误才会出现确认失败的情况; 

         

        可以路由的消息,要等到被投递到所有匹配的队列之后,broker会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达队列了。

        如果消息和队列是可持久化的,那么消息会在将消息写入磁盘之后发出,broker回传给生产者的确认小学中delivery-tag包含了确认消息的序列号。

1.1.3.3 使用方式

        Spring配置:

spring:rabbitmq:publisher-confirm-type: correlated

         关键代码,注意需要发送者实现 ConfirmCallback 接口方可实现失败通知:

 1.1.4 broker丢失消息

        前面我们从生产者的角度分析了消息可靠性传输的原理和实现,接下来就要看下broker是如何保障消息的可靠性传输的。

        假设生产者已经成功将消息发送到了交换机,并且交换机也成功的将消息路由到队列中,但是此时消费者还没有进行消费的时候,mq挂掉了,那么重启之后消息就会不存在,那样子就不能保障消息的可靠性 传输了。

        所以此时就要开启RabbitMQ的持久化,也就是将消息持久化到磁盘,此时即使MQ挂掉了,重启之后也会自动读取之前存储的数据。

1.1.4.1 持久化队列 

         在spring开启一个持久化队列。

  @Configurationpublic class RabbitConfig {public static final String DURABLE_QUEUE_NAME = "durable_queue";@Beanpublic Queue durableQueue() {// 创建一个持久化的队列return new Queue(DURABLE_QUEUE_NAME, true); // 第二个参数为true表示队列持久化}}
1.1.4.2 持久化交换器

@Configuration
public class RabbitConfig {public static final String DURABLE_EXCHANGE_NAME = "durable_exchange";public static final String DURABLE_QUEUE_NAME = "durable_queue";public static final String ROUTING_KEY = "durable_routing_key";@Beanpublic DirectExchange durableExchange() {// 创建一个持久化的Direct Exchangereturn new DirectExchange(DURABLE_EXCHANGE_NAME, true, false);}@Beanpublic Queue durableQueue() {// 创建一个持久化的队列return new Queue(DURABLE_QUEUE_NAME, true); // 第二个参数为true表示队列持久化}@Beanpublic Binding binding(Queue durableQueue, DirectExchange durableExchange) {// 绑定队列到交换器return BindingBuilder.bind(durableQueue).to(durableExchange).with(ROUTING_KEY);}
}
 1.1.4.3 发送持久化消息

         在发送消息的时候,需要设置属性deliveryMode=2,表示发送的是一个持久化消息,需要注意的是在springboot中,发送消息时已经自动设置了deliveryMode为2,不需要人工再去设置一遍。

@Component
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendPersistentMessage(String messageContent) {// 创建消息属性,并设置为持久化MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 创建消息Message message = new Message(messageContent.getBytes(), messageProperties);// 发送消息到指定的交换器rabbitTemplate.convertAndSend(RabbitConfig.DURABLE_EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message);System.out.println("Sent message: " + messageContent);}
}

 1.1.5 总结

        生产者以及Broker要保障消息传递的可靠性如果结合失败通知以及发送方确认和持久化消息来实现。

1.发送方确认:保障消息能够到达broker;

2.失败通知:保障的是消息能够成功路由到队列;

3.持久化队列:保障消息的持久化;

1.2 消费者消息可靠性 

        消费者接收到消息,但是还未处理或者还未处理完成,此时消费者进程挂了,比如重启或者异常中断,此时mq会认为消费者已经完成消息消费,就会从队列中删除消息,从而导致消息丢失。 

        那该如何避免这种情况呢?这就要使用到RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也就是自己的程序确定消息是否已经处理完成。如果此时出现消息未处理完成进程挂掉的情况,由于没有提交ack,rabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息不回丢失。 

spring:rabbitmq:listener:simple:acknowledge-mode: manual

        acknowledge-mode: manual代表开启手动ack,该配置项的其他两个参数值为none和auto;

  • auto:消费者根据程序执行的正常或者抛出异常来决定是抛出ack或者nack;
  • munual:手动ack,用户必须手动提交ack或者nack;
  • none:没有ack机制; 

        默认值是none,如果将ack的模式设置auto,此时如果消费者执行异常的话,就相当于执行了nack方法,消息会被放置到队列的头部,消息会被无限期的执行,从而导致后续消息无法执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/415446.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

脑机接口定义及相关概念

1 什么是脑机接口 脑机接口(Brain-Computer Interface,简称,BCI)是指一种系统或设备,它通过解码大脑的电生理信号来与外部计算机或设备进行直接的通讯。BCI的目的是在不依赖身体运动的情况下实现大脑与计算机之间的信息交换。 2 相关概念 2.1 脑电图(EEG) 最常用的脑机接口技…

C# 窗口页面布局

1.Groupbox 单机鼠标右键,置于底层 2.Label 在右方属性中修改名称 3.ComboBox 点击属性中的集合,可以添加选择项 4.CheckBox 在属性中修改名称 5.RichTextBox 富文本 在属性中修改名称与区域 6.StatusStrip 状态栏 将AutoSize改成false就可以修改…

深入探索MySQL数据库结构设计:实战案例解析,打造高效、可扩展的数据存储方案

作者简介:我是团团儿,是一名专注于云计算领域的专业创作者,感谢大家的关注 座右铭: 云端筑梦,数据为翼,探索无限可能,引领云计算新纪元 个人主页:团儿.-CSDN博客 前言:…

vue-echarts :知识图谱可视化,动态更新 动态赋值series,更新options

<template><div style="display: flex;align-items: center;justify-content: space-between;"><

【大模型llms本质,并分析未来发展反向】

大模型的本质: 是有损压缩后的概率模型 2024年2月28日&#xff0c;OpenAI 的核心研发人员 Jack Rae 在参加 Stanford MLSys Seminar 的访谈时进行了一个名为 Compression for AGI 的主题分享&#xff0c;其核心观点为&#xff1a;AGI的一个关键目标是通过最小描述长度&#xf…

苹果11月推出新款M4 Mac:Mac mini设计焕新 MacBook Pro仅例行更新

据外媒 MacRumors 报道&#xff0c;苹果公司计划在 11 月推出首批 M4 Mac&#xff0c;这一时间表与去年相似&#xff0c;当时苹果公司在同样的时间点中宣布推出搭载 M3 芯片的 MacBook Pro。 ▲ 苹果公司在 2023 年 10 月 31 日推出的 M3 MacBook Pro 同时根据古尔曼爆料称苹果…

安宝特科技 | AR眼镜在安保与安防领域的创新应用及前景

随着科技的不断进步&#xff0c;增强现实&#xff08;AR&#xff09;技术逐渐在多个领域展现出其独特的优势&#xff0c;尤其是在安保和安防方面。AR眼镜凭借其先进的功能&#xff0c;在机场、车站、海关、港口、工厂、园区、消防局和警察局等行业中为安保人员提供了更为高效、…

Tableau 社区项目 | 参与 Data+TV 挑战,洞悉全球电视剧集数据的精彩故事!

如果你钟爱某部电视剧集&#xff0c;正苦于没有数据练手&#xff0c;就快来参与 DataTV 挑战吧~ 去年&#xff0c;Tableau 和 IMDb 携手发起 DataMovies 挑战&#xff0c;吸引了全球各地的数据爱好者与影迷参与。今年&#xff0c;TC24 Viz 竞赛也以此为主题&#xff0c;让我们领…

SprinBoot+Vue问卷调查微信小程序的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 Controller3.2 Service3.3 Dao3.4 application.yml3.5 SpringbootApplication3.5 Vue3.6 uniapp代码 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平…

vsCode多文件标签栏换行显示

1.文件——首选项——点‘设置’ 2.输入 wrap tabs 并勾选Workbench › Editor: Wrap Tabs

Spring Boot源码阅读——spring.factories的加载机制

Spring Boot源码阅读——spring.factories的加载 提到 SpringBoot 的自动装配&#xff0c;不管是文章还是视频&#xff0c;都会提到 spring.factories 这个文件&#xff0c;这篇文章就来简单讲讲 spring.factories 的作用&#xff0c;以及它是怎么被加载的 简介 位置 以 Sprin…

Opencv中的直方图(3)直方图比较函数compareHist()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 比较两个直方图。 函数 cv::compareHist 使用指定的方法比较两个密集或两个稀疏直方图。 该函数返回 d ( H 1 , H 2 ) d(H_1, H_2) d(H1​,H2​…

学习笔记--Docker

安装 1.卸载旧版 首先如果系统中已经存在旧的Docker&#xff0c;则先卸载&#xff1a; yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine 2.配置Docker的yum库 首先要安…

Socket编程---UDP篇

目录 一. UDP协议 二. Socket编程 2.1 sockaddr家族 2.2 接口介绍 三. 服务端实现 四. 服务端调用实现 五. 客户端实现 六. 效果展示 一. UDP协议 何为UDP协议的含义&#xff0c;上篇粗略提及了一下TCP与UDP的区别&#xff1a; TCP&#xff1a; •…

2024最新盘点:主流的仓库管理软件有哪些?

本文将盘点10款主流的仓库管理软件&#xff0c;为企业选型仓库管理软件提供参考。 库存数据不准确、订单处理效率低下、仓库作业流程不规范&#xff1f;在数据管理与分析上遇到困难&#xff0c;企业发展和竞争力受阻&#xff1f;物流行业高速发展&#xff0c;而仓储管理却遇到重…

【Python入门】第1节 基础语法

&#x1f4d6;第1节 基础语法 ✅字面量✅注释✅变量✅数据类型&#x1f9ca;数据类型转换 ✅标识符✅运算符✅字符串扩展&#x1f9ca;字符串的三种定义方式&#x1f9ca;字符串拼接&#x1f9ca;字符串格式化&#x1f9ca;格式化的精度控制&#x1f9ca;字符串格式化方式2&…

Windows bat脚本学习六(十六进制与十进制互转)

一、十六进制转十进制 十六进制数转十进制数相对比较简单&#xff0c;可以直接通过0x来实现。 见如下代码&#xff1a; echo off chcp 65001set taaset /a hex0x%t% echo data%hex%pause 结果&#xff1a; 二、十进制转十六进制 这个转化比较麻烦&#xff0c;没有简便的方式转…

通过 GitHub Actions 执行数据库 Schema 变更工作流

原文地址 https://www.bytebase.com/docs/tutorials/github-ci/ 教程库&#xff1a;https://github.com/bytebase/github-action-example 开发者们喜欢将 Schema 变更脚本与应用程序代码一起保存在 Git 中&#xff0c;这样变更脚本就能像应用程序代码一样接受审核和版本控制&…

哪款直流电能表可在电信基站、直流充电桩、太阳能光伏用

安科瑞徐赟杰18706165067 在电信基站、直流充电桩、太阳能光伏等应用场合中&#xff0c;可使用DJSF1352-RN导轨式直流电能表&#xff0c;此表带有双路直流输入&#xff0c;该系列仪表可测量直流系统中的电压、电流、功率以及正反向电能等。可计量总电能&#xff0c;又可计量规…

ev录屏损坏修复

ev录屏应该不正常关闭&#xff0c;录屏损坏 淘宝买了一个软件&#xff0c;修复成功&#xff0c;需找一个当时时间段的正常录屏学习&#xff0c;然后高级修复。整体花费5毛钱