RabbitMQ高级特性 - 消费者消息确认机制

文章目录

  • RabbitMQ 消息确认机制
    • 背景
    • 消费者消息确认机制
      • 概述
      • 手动确认(RabbitMQ 原生 SDK)
      • 手动确认(Spring-AMQP 封装 RabbitMQ SDK)
        • AcknowledgeMode.NONE
        • AcknowledgeMode.AUTO(默认)
        • AcknowledgeMode.MANUAL
        • MANUAL 可能会引发的问题

RabbitMQ 消息确认机制


背景

在这里插入图片描述

上图中可以看出,从生产者发送消息消费者接收到消息并正确处理,这些里路线都可能会出现问题,那么为了保证这些消息最后能被正确处理,RabbitMQ 就提供了消息确认机制.

消费者消息确认机制

概述

在这里插入图片描述
为了保证消息从 队列 到 消费者正确消费,那么就引入了消费者消息确认机制.

a)消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种(以下讲到的方法和参数来自于 RabbitMQ 原生的 SDK,非 Spring 提供).

  • 自动确认:当 autoAck = true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后不管消费者是否真正的消费这些消息,都会从内存中删除.(适合对消息可靠性要求不高的场景).
  • 手动确认:当 autoAck = false 时,RabbitMQ 会等待消费者显示的调用 Basic.Ack 命令(波安排时间哦且确认消息),然后才会从 内存或磁盘 中删除消息.(适合对消息可靠性要求高的场景).

Ps:可靠性高了,性能也就下降了,所以请综合考虑.

b)对于 MQ队列 中的消息,在 MQ管理平台上可以看到以下两种类别:
在这里插入图片描述
Ready:队列已经准备好消息,随时准备发送给消费者 的消息数量(只要消费者来要,就立刻发送).
Unacked:消息已经发送给消费者,但是消费者没有返回消息确认 的消息数量(消息确认包括 ack肯定确认nack否定确认

手动确认(RabbitMQ 原生 SDK)

消费者在收到消息之后,可以选择确认,也可以选择拒绝或者跳过,RabbitMQ因此提供了不同的确认应答方式,消费者客户端可通过调用 channel 的相关方法实现.

a)肯定确认:消费者已经接收到消息,并且成功处理消息,可以将其丢弃了.

Channel.basicAck(long deliveryTag, boolean multiple)
  • deliveryTag:消息的唯一标识(单调递增的 long),特点如下:
    • deliveryTag 是每个 Channel 通道独立维护,所以每个通道上的都是唯一的(生产者 和 Broker 建立一个 channel 会生成一个 deliverTag,消费者 和 Broker 建立一个 channel 会生成一个 deliverTag,这俩 deliverTag 是不同的).
    • 当消费者 ack确认 一条消息时,必须使用对应的通道上 deliveryTag 进行确认.
  • multiple:是否批量确认. 如果值为 true,那么就会一次性 ack确认 所有小于或等于指定的 deliveryTag 的消息,大大减少了网络开销
    • 假设 deliveryTag = 8,multiple = true:那么 deliveryTag <= 8 的消息都会被确认.
    • 假设 deliveryTag = 8,multiple = false:只确认 8.

Ps:deliveryTag 确保了消息传递的可靠性和顺序性.

b)否定确认(单个):用来拒绝这个消息. 被拒绝的消息如何处理,具体要看 requeue 参数.

Channel.basicReject(long deliveryTag, boolean requeue)
  • requeue:标识拒绝后,这条消息如何处理.
    • requeue = true:消息会重新存入队列,将来会发送给下一个订阅的消费者.
    • requeue = false:消息会从队列中移除,因此不会发送给消费者.

c)否定确认(批量):Channel.basicReject 只能拒绝一条消息,如果要批量拒绝消息,就可以使用 Channel.basicNack.

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

multiple:参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息.

d)MQ 的管理平台上也提供了几种确认方式.
在这里插入图片描述

手动确认(Spring-AMQP 封装 RabbitMQ SDK)

Spring-AMQP 对消息确认提供了三种策略:

public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}

这里根 RabbitMQ 原生 SDK 是有些不同的.

AcknowledgeMode.NONE

不管消费者是否成功处理了消息,RabbitMQ 都会自动确认消息,然后从 队列 中移除消息.

a)配置手动确认

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: none

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate
) {@RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}}

c)消费者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")//业务处理...println("业务逻辑处理完成")}}

在这里插入图片描述

d)效果演示
触发接口之后,回到 MQ 管理平台,可以看到队列中消息已经被删除.
在这里插入图片描述

AcknowledgeMode.AUTO(默认)

分为以下情况:

  • 消费者处理消息过程中没有抛出异常,则自动确认消息,然后从 队列 中移除消息.
  • 消费者处理消息过程中抛出异常,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).

a)配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: auto

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate
) {@RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}}

c)消费者(正常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")//业务处理...println("业务逻辑处理完成")}}

效果如下:
在这里插入图片描述
在这里插入图片描述
d)消费者(异常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")//业务处理...val a = 1 / 0println("业务逻辑处理完成")}}

效果如下:
消息未被确认,会不断重返队列,进行重试,因此 IDEA 中会循环报错输出.
在这里插入图片描述

AcknowledgeMode.MANUAL

分为以下情况:

  • 消费者在处理完消息后显示调用 basicAck 方法 来确认消息,然后从 队列 中移除消息.
  • 消费者在处理完消息后显示调用 basicNack 方法 来否定确认消息,是否从队列中移除消息需要看 requeue 参数的值
    • requeue = true:重返队列,不断重试.
    • requeue = false:丢弃消息.
  • 消费者在处理完消息后什么都不做,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).

a)配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111listener:simple:acknowledge-mode: manual

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val rabbitTemplate: RabbitTemplate
) {@RequestMapping("/ack")fun ack(): String {rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")return "ok"}}

c)消费者(异常处理消息,requeue = true)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")//业务处理...val a = 1 / 0println("业务逻辑处理完成")channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, true) //requeue: true}}}

由于消息处理异常,发送 nack,并且 requeue = true,因此消息会重返队列,不断重试.
在这里插入图片描述
在这里插入图片描述

d)消费者(异常处理消息,requeue = false)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")//业务处理...val a = 1 / 0println("业务逻辑处理完成")channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, false) //requeue: false}}}

由于消息处理异常,发送 nack,并且 requeue = false,因此消息不会重返队列,消息被丢弃.
在这里插入图片描述
d)消费者(正常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset@Component
class AckListener {@RabbitListener(queues = [MQConst.ACK_QUEUE])fun handMessage(message: Message,channel: Channel,) {val deliveryTag = message.messageProperties.deliveryTagtry {println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")//业务处理...println("业务逻辑处理完成")channel.basicAck(deliveryTag, false)} catch (e: Exception) {channel.basicNack(deliveryTag, false, false) //requeue: false}}}

消息被正常处理,返回 ack.
在这里插入图片描述

MANUAL 可能会引发的问题

在这里插入图片描述
如果这里捕获的不是 Exception 异常,那么消费者处理消息的时候,可能会引发一些不会被捕获的异常,就会导致没有返回 nack.
也就意味着,没有进行确认应答,那么 mq管理平台 上就会显示 Unacked 数值 +1.

Ps:具体还是需要根据业务场景而定

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/391985.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVA通过debezium实时采集mysql数据

前期准备 需要提前安装mysql并且开启binlog,需要准备kafka和zookeeper环境 示例采用debezium1.9.0版本 Maven配置 <version.debezium>1.9.0.Final</version.debezium> <dependency> <groupId>io.debezium</groupId> <artifactId>debe…

Java获取exe文件详细信息:产品名称,产品版本等

使用Maven项目&#xff0c;在pom.xml文件中注入&#xff1a; <dependency><groupId>com.kichik.pecoff4j</groupId><artifactId>pecoff4j</artifactId><version>0.4.1</version></dependency> 程序代码&#xff1a; import …

Sun Frame:基于 SpringBoot 的轻量级开发框架(个人开源项目)

文章目录 &#x1f31e; Sun Frame&#xff1a;基于 SpringBoot 的轻量级开发框架&#xff08;个人开源项目&#xff09;&#x1f680; 欢迎使用 Sun Frame&#x1f31f; 项目亮点&#x1f4e6; 模块结构&#x1f310; Sun-Cloud&#x1f4e6; Sun-Common &#x1f4a1; 示例与…

微力同步如何安装使用并使用内网穿透配置公网地址远程访问

文章目录 1.前言2. 微力同步网站搭建2.1 微力同步下载和安装2.2 微力同步网页测试2.3 内网穿透工具安装 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语 1.前言 私有云盘作为云存储概念的延伸&#xff0c;虽然谈不上多么新颖&#xff0c;但是其广…

主成分分析和线性判别分析

主成分分析 (PCA) PCA 是一种线性降维方法&#xff0c;通过投影到主成分空间&#xff0c;尽可能保留数据的方差。 原理 PCA 通过寻找数据投影后方差最大的方向&#xff0c;主成分是这些方向上的正交向量。 公式推理 对数据中心化&#xff1a; 其中&#xff0c;μ 是数据的…

“微软蓝屏”事件敲响网络安全的警钟

文章目录 前言一、对网络安全的警醒二、我们如何应对&#xff1f;总结 前言 “微软蓝屏”事件是一次由微软合作伙伴CrowdStrike的终端安全产品更新与操作系统内核冲突导致的全球性技术故障。这一事件不仅影响了多个国家的航空、银行、金融、零售、餐饮等多个行业&#xff0c;还…

吴恩达老师机器学习-ex8

data1 导入库&#xff0c;读取数据并进行可视化 因为这次的数据是mat文件&#xff0c;需要使用scipy库中的loadmat进行读取数据。 通过对数据类型的分析&#xff0c;发现是字典类型&#xff0c;查看该字典的键&#xff0c;可以发现又X等关键字。 import numpy as np import…

十七、Intellij IDEA2022.1.1下载、安装、激活

目录 &#x1f33b;&#x1f33b; 一、下载二、 安装三、激活 一、下载 官网下载地址 本地直接下载 目前Intellij IDEA的最新版本已经更新到了 2024.1.4&#xff0c;由于最新版本可能存在不稳定的问题&#xff0c;此处选择其他版本进行下载&#xff0c;此处以2022.1.1为例进行下…

【Spring】Bean详细解析

1.Spring Bean的生命周期 整体上可以简单分为四步&#xff1a;实例化 —> 属性赋值 —> 初始化 —> 销毁。初始化这一步涉及到的步骤比较多&#xff0c;包含 Aware 接口的依赖注入、BeanPostProcessor 在初始化前后的处理以及 InitializingBean 和 init-method 的初始…

基于STM32的智能家居安防系统教程

目录 引言环境准备智能家居安防系统基础代码实现&#xff1a;实现智能家居安防系统 门窗传感器模块视频监控模块报警与通知模块用户界面与远程控制应用场景&#xff1a;家庭安防与监控常见问题与解决方案收尾与总结 引言 随着智能家居的普及&#xff0c;家庭安防系统成为保护…

艾瑞白皮书解读(一)丨为什么说数据工程是中国企业数据治理的最佳实践?

2024年7月 艾瑞咨询公司对国内数据治理行业进行了研究&#xff0c;访问了国内多位大中型企业数据治理相关负责人&#xff0c;深度剖析中国企业在数字化转型过程中面临到的核心数据问题后&#xff0c;重磅发布《2024中国企业数据治理白皮书》&#xff08;以下简称“白皮书”&…

算法通关:017_1:二叉树及三种顺序的递归遍历

文章目录 题目思路代码运行结果 题目 二叉树及三种顺序的递归遍历 思路 代码 /*** Author: ggdpzhk* CreateTime: 2024-08-04** 二叉树及三种顺序的递归遍历* LeetCode 144. 二叉树的前序遍历* LeetCode 94. 二叉树的中序遍历* LeetCode 145. 二叉树的后序遍历* LeetCode 10…

龙迅LT8713SX 高性能TYPE-C/DP转三端口DP1.4/HDMI 2.0转换器,带音频

龙迅LT8713SX描述&#xff1a; LT8713SX是一个高性能类型-C/DP1.4到Type-C/DP1.4/HDMI2.0转换器&#xff0c;具有三个可配置的DP1.4/HDMI2.0/DP输出接口和音频输出接口。LT8713SX同时支持显示端口™单流传输&#xff08;SST&#xff09;模式和多流传输&#xff08;MST&#xf…

Adobe Acrobat不支持图片格式转换PDF文件

我在将图片格式&#xff08;PNG,JPEG&#xff09;转换为PDF的过程中遇到了如下问题&#xff1a; 单文件的解决办法——在软件外实现转换&#xff1a; 使用照片打开图片 选择打印 打印机选择Adobe PDF&#xff0c;执行打印 选择PDF文件的保存位置&#xff0c;过一会儿可以正…

基本K8s搭建Jekins+gitee项目自动部署

这里写目录标题 1.基本K8s部署安装Jekins2.设置Jenkins国内镜像源2.安装Gitee插件1.安装Gitee Plugin2.验证安装Gitee Plugin 3.新建任务1.输入任务名称2.输入你gitee上的项目链接3.测试构建 4.查看项目在k8s集群master节点的位置1.确认 Jenkins Pod 名称2.使用kubectl exec到 …

视频如何生成二维码(自动生成二维码)完整教程

在企业中&#xff0c;产品视频二维码怎么制作&#xff0c;产品二维码怎么实现微信扫码便捷观看&#xff1f;上图文教程&#xff1a;视频二维码生成器/上传视频自动生成二维码完整教程。 目前市面上有很多工具&#xff0c;可以实现&#xff0c;比如草料二维码、酷播云二维码等等…

【Web开发手礼】探索Web开发的秘密(十四)-Vue2(1)Node.js的安装、Vue入门

主要介绍了Node.js的安装教程、Vue2常用的一些指令、声明周期&#xff01;&#xff01;&#xff01; 文章目录 前言 Node.js安装 选择安装目录 验证NodeJS环境变量 配置npm的全局安装路径 切换npm的淘宝镜像 安装Vue-cli ​编辑 Vue2入门 引入vue.js文件 入门代码 常用指令 生…

前端(vue3)和后端(django)的交互

vue3中&#xff1a; <template><div><h2>注册页面</h2><form submit.prevent"submitForm"><label for"username">用户名&#xff1a;</label><input type"text" id"username" v-model…

AWS S3怎么收费的?一文带你搞懂!

Amazon Simple Storage Service&#xff08;S3&#xff09;是亚马逊网络服务&#xff08;AWS&#xff09;提供的一个高度可扩展的对象存储服务&#xff0c;广泛应用于数据存储、备份、归档和大数据分析等领域。S3的计费模式相对灵活&#xff0c;旨在满足不同用户的需求。本文中…

小试牛刀-walletconnect二维码及交互

目录 1.编写目的 2.实现功能 3.功能详解 依赖组件 3.1 二维码生成 3.1.1 初始化SignClient 3.1.2 创建会话空间获取WC协议uri 3.1.3 生成二维码供用户扫描 3.1.4 等待扫描 3.2 发送交易事务 3.2.1 创建交易事务 3.2.2 向用户发送交易事务 3.3 签名事务 3.3.1 接收…