RabbitMQ消息可靠性等机制详解(精细版三)

目录

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

7.1.2 Return机制(保证发送可靠)

7.1.3 编写配置文件

7.1.4 开启Confirm和Return

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件

7.2.2 手动ack

7.3 避免消息重复消费

7.3.1 导入依赖

7.3.2 编写配置文件

7.3.3 修改生产者

7.3.4 修改消费者


 官方文档  RabbitMQ Documentation | RabbitMQ

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

RabbitMQ是一个Erlang开发的AMQP(高级消息排队 协议)(英文全称:Advanced Message Queuing Protocol )的开源实现。-------------接上章 

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。

RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

消息传递可靠性

7.1.2 Return机制(保证发送可靠)

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

消息传递可靠性

在消息发送方项目上加入下面内容:

7.1.3 编写配置文件
spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testpublisher-confirms: truepublisher-returns: true

7.1.4 开启Confirm和Return
package com.tingyi.rabbitmq.config;
​
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
/*** @author 听忆*/
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
​@Autowiredprivate RabbitTemplate rabbitTemplate;
​@PostConstruct  // init-methodpublic void initMethod(){//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);
​//指定 ReturnCallbackrabbitTemplate.setReturnCallback(this);}
​@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息已经送达到Exchange");}else{System.out.println("消息没有送达到Exchange");}}
​@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息没有送达到Queue");}
}

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件
  • 在消费方application.yml文件添加下面配置, 改为手动应答机制.

spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testlistener:simple:acknowledge-mode: manual

7.2.2 手动ack
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
​
/*** @author 听忆*/
@Component
public class Consumer {
​@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到消息:" + msg);try {int i = 1 / 0;/*** 消费者发起成功通知* 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号* 第二个参数:是否开启批量处理 false:不开启批量* 举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,*          当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {e.printStackTrace();/*** 返回失败通知* 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号* 第二个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝* 第三个boolean true消息接收失败重新回到原有队列中*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}
​}
}

7.3 避免消息重复消费

重复消费消息,会对非幂等行操作造成问题

重复消费消息的原因是,消费者没有给RabbitMQ一个ack

重复消费

  1. 为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,

  2. id-0(正在执行业务)

  3. id-1(执行业务成功)

  4. 然后使用ack给RabbitMQ返回消息

  5. 如果RabbitMQack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

  6. 极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

备注: java中的方法叫做setIfAbsent, redis中的命令叫做setnx

       作用:如果为空就set值,并返回1, true

​ 如果存在(不为空)不进行操作,并返回0, false​

7.3.1 导入依赖

生产者和消费者都加入下面依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.4.5</version>
</dependency>

7.3.2 编写配置文件
spring:redis:host: 你的地址port: 6379

7.3.3 修改生产者
@Test
public void contextLoads() throws IOException {CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//第四个参数: 设置消息唯一idrabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","你看听忆哇",messageId);System.in.read();
}

7.3.4 修改消费者
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.concurrent.TimeUnit;
​
/*** @author 听忆*/
/*** java中的方法叫做setIfAbsent, redis中的命令叫做setnx* 作用:*      如果为空就set值,并返回1, true*      如果存在(不为空)不进行操作,并返回0, false*/
@Component
public class Consumer {
​@Autowiredprivate StringRedisTemplate redisTemplate;
​@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {//0. 获取MessageId, 消息唯一idString messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//1. 设置key到Redisif(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
​//2. 消费消息System.out.println("接收到消息:" + msg);
​//3. 设置key的value为1redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
​//4.  手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
​}else {
​//5. 获取Redis中的value即可 如果是1,手动ackif("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
​}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/368911.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVA:文件防重设计指南

1、简述 在现代应用程序中&#xff0c;处理文件上传是一个常见的需求。为了保证文件存储的高效性和一致性&#xff0c;避免重复存储相同的文件是一个重要的优化点。本文将介绍一种基于哈希值的文件防重设计&#xff0c;并详细列出实现步骤。 2、设计原理 文件防重的基本思路…

如何使用 3D 建模库在 C# 中将 3DS 转换为 USDZ?

USDZ/USD是一种 3D 文件格式&#xff0c;被广泛用于跨平台共享 3D 资产。另一方面&#xff0c;3DS是另一种以块形式存储数据的 3D 文件格式。在某些情况下&#xff0c;您需要将3DS 文件转换为 USDZ/USD文件格式。因此&#xff0c;本篇博文介绍了一个功能丰富的3D 建模库&#x…

6月30日功能测试Day10

3.4.4拼团购测试点 功能位置&#xff1a;营销-----拼团购 后台优惠促销列表管理可以添加拼团&#xff0c;查看拼团活动&#xff0c;启动活动&#xff0c;编辑活动&#xff0c;删除活动。 可以查看拼团活动中已下单的订单以状态 需求分析 功能和添加拼团 商品拼团活动页 3…

【Sping Boot2】笔记

Spring Boot 2入门 如何创建一个Spring Boot的Web例子&#xff1f;1.如何创建一个Spring Boot项目1.1 使用Maven构建一个Spring Boot 2项目1.1.1创建Maven工程注&#xff1a;Maven项目结构&#xff1a; 1.1.2引入SpingBoot相关依赖依赖注意事项&#xff1a; 1.1.3创建主类1.1.4…

传统数据处理系统存在的问题

传统应用的数据系统架构设计时&#xff0c;应用直接访问数据库系统。当用户访问量增加时&#xff0c;数据库无法支撑日益增长的用户请求的负载&#xff0c;从而导致数据库服务器无法及时响应用户请求&#xff0c;出现超时的错误。 出现这种情况以后&#xff0c;在系统架构上就采…

【python】OpenCV—Nighttime Low Illumination Image Enhancement

文章目录 1 背景介绍2 代码实现3 原理分析4 效果展示5 附录np.ndindexnumpy.ravelnumpy.argsortcv2.detailEnhancecv2.edgePreservingFilter 1 背景介绍 学习参考来自&#xff1a;OpenCV基础&#xff08;24&#xff09;改善夜间图像的照明 源码&#xff1a; 链接&#xff1a…

Word “当前页“ 与 “前一页“ (含部分内容)间有大半页空白,删除空白方法

鼠标光标选中需要向上移的句子&#xff0c;右键点击“段落”&#xff0c;然后在跳出的窗口中按照“换行和分页”中的红色方框内取消勾选后&#xff0c;点击确定即可。

Python | Leetcode Python题解之第216题组合总和III

题目&#xff1a; 题解&#xff1a; class Solution:def combinationSum3(self, k: int, n: int) -> List[List[int]]:"""回溯法&#xff0c;对于当前k和n, 枚举元素"""def backtracking(k: int, n: int, ans: List[int]):if k 0 or n <…

《米小圈日记魔法》边看边学,轻松掌握写日记的魔法!

在当今充满数字化娱乐和信息快速变迁的时代&#xff0c;如何创新引导孩子们学习&#xff0c;特别是如何培养他们的写作能力&#xff0c;一直是家长和教育者们关注的焦点。今天就向大家推荐一部寓教于乐的动画片《米小圈日记魔法》&#xff0c;该系列动画通过其独特的故事情节和…

【Unity配置数据文件】ScriptableObject核心应用

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 专栏交流&#x1f9e7;&…

【Linux进程通信】共享内存

目录 共享内存函数 头文件 shmget ftok函数​ shmat shmdt shmctl 共享内存区是最快的IPC 形式。一旦这样的内存映射到共享它的进程的地址空间&#xff0c;这些进程间数据传递不再涉及到操作系统内核&#xff0c;换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据…

探索如何赋予对象迭代魔法,轻松实现非传统解构赋值的艺术

前言 今天下午在网上冲浪过程中看到这样一个问题 面试题&#xff1a;如何让 var [a, b] {a: 1, b: 2} 解构赋值成功&#xff1f; 据说是某大厂面试题&#xff0c;于是我学习了一下这个问题&#xff0c;写下这篇文章记录一下。 学习过程 要想解决这个问题首先要知道什么是解…

Qt:7.QWidget属性介绍(cursor属性-光标形状、font属性-控件文本样式、tooltip属性-控件提示信息)

目录 一、cursor属性-光标形状&#xff1a; 1.1cursor属性介绍&#xff1a; 1.2获取当前光标形状——cursor()&#xff1a; 1.3 设置光标的形状——setCursor()&#xff1a; 1.4 设置自定义图片为光标&#xff1a; 二、font属性-控件文本样式&#xff1a; 2.1font属性介绍…

excel PivotTable 透视表

开发数据导出excel功能&#xff0c;设置导出透视表 数据源&#xff1a; 透视表&#xff1a; 使用插件EPPlus 数据源&#xff1a; IF OBJECT_ID(tempdb..#temptable) IS NOT NULLDROP TABLE #temptable; CREATE TABLE #temptable ( [PROJECT] varchar(50), [PRODUCT_CODE] var…

VSCode 自动调整格式失效了 ESLint

ESLint【最新注意2.4.4版本有问题&#xff0c;需退回2.4.2版本就恢复正常了】 参考&#xff1a;vscode自动格式化失效_vscode保存自动格式化失效-CSDN博客

AI PC(智能电脑)技术分析

一文看懂AI PC&#xff08;智能电脑&#xff09; 2024年&#xff0c;英特尔、英伟达等芯片巨头革新CPU技术&#xff0c;融入AI算力&#xff0c;为传统PC带来质的飞跃&#xff0c;引领智能计算新时代。 2024年&#xff0c;因此被叫作人工智能电脑&#xff08;AI PC&#xff09;…

一文带你初探FreeRTOS信号量

本文记录我初步学习FreeRTOS的信号量的知识&#xff0c;在此记录分享&#xff0c;希望我的分享对你有所帮助&#xff01; 什么是信号量 在FreeRTOS中&#xff0c;信号量&#xff08;Semaphore&#xff09;是一种用于任务间同步和资源共享的机制。信号量主要用于管理对共享资源的…

汽车电子行业知识:什么是电子后视镜

文章目录 1.什么是电子后视镜2.有哪些汽车用到了电子后视镜3.电子后视镜的原理及算法4.电子后视镜的优点5.电子后视镜的未来市场将继续增长 1.什么是电子后视镜 电子后视镜是一种集成了电子元件和显示屏的汽车后视镜&#xff0c;用于替代传统的机械后视镜。它通过内置的摄像头捕…

九浅一深Jemalloc5.3.0 -- ⑨浅*gc

目前市面上有不少分析Jemalloc老版本的博文&#xff0c;但5.3.0却少之又少。而且5.3.0的架构与之前的版本也有较大不同&#xff0c;本着“与时俱进”、“由浅入深”的宗旨&#xff0c;我将逐步分析Jemalloc5.3.0的实现。 另外&#xff0c;单讲实现代码是极其枯燥的&#xff0c;…

使用React复刻ThreeJS官网示例——keyframes动画

最近在看three.js相关的东西&#xff0c;想着学习一下threejs给的examples。源码是用html结合js写的&#xff0c;恰好最近也在学习react&#xff0c;就用react框架学习一下。 本文参考的是threeJs给的第一个示例 three.js examples (threejs.org) 一、下载threeJS源码 通常我们…