21.发布确认模式-高级

问题

生产环境中由于一些不明原因,导致rabbitmq重启,在重启的期间生产者消息投递失败,导致消息丢失,需要手动处理恢复。那么如何才能进行rabbitmq的消息可靠性投递?特别是在极端的情况,rabbitmq集群不可用的时候,无法投递的消息该如何处理?

例如这样的异常信息:

方案

生产者将发送的消息发给rabbitmq的同时,将消息备份到缓存中。如果rabbitmq宕机了。会有一个定时任务会对未成功发送的消息进行重新投递。如果交换机成功收到消息会从缓存中清除已收到的消息。

分析

造成消息丢失会有两种情况,一种是交换机故障,另一个中是队列故障。

交换机确认消息是否收到的解决办法

 启用发布确认的配置

spring:rabbitmq:host: 192.168.171.128username: adminpassword: 123port: 5672publisher-confirm-type: correlated

 默认是none值,是不开启的,禁用发布确认模式。

correlated,发布消息到交换机后会触发回调方法。

simple, 单个确认消息。

代码

配置类

package com.xkj.org.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {//交换机public static final String EXCHANGE_NAME = "confirm.exchange";//队列public static final String QUEUE_NAME = "confirm.queue";//Routing Keypublic static final String ROUTING_KEY = "key1";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic Binding bindingQueueToExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,@Qualifier("confirmQueue") Queue confirmQueue) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);}}

回调接口

package com.xkj.org.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 回调接口*/
@Slf4j
@Component // 1.第一步实例化MyCallback这个bean
public class MyCallback implements RabbitTemplate.ConfirmCallback {@Autowired // 2.第二步将rabbitTemplate实例依赖注入进来private RabbitTemplate rabbitTemplate;@PostConstructpublic void init() { //3.第三步执行此方法//将本类对象(ConfirmCallback的实现类对象)注入到RabbitTemplate中rabbitTemplate.setConfirmCallback(this);}/*** 交换机确认回调方法* @param correlationData* @param ack* @param cause* 1.发消息 交换机收到 调用*  1.1 correlationData 回调消息的id及相关信息*  1.2 交换机收到消息 ack = true*  1.3 cause null* 2.发消息 交换机接收失败 回调*  2.1 correlationData 回调消息的id及相关信息*  2.2 交换机收到消息 ack = false*  2.3 cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId(): "";if(ack) {log.info("交换机已经接收到id为:{}的消息", id);}else {log.info("交换机还未收到id为:{}的消息,原因:{}", id, cause);}}
}

消费者

package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import com.xkj.org.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ConfirmQueueConsumer {@RabbitListener(queues = ConfirmConfig.QUEUE_NAME)public void receiveMsg(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.info("收到队列的消息:{}",  msg);}
}

生产者

@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange", "key1", message, correlation);log.info("发送消息内容{}", message);}

结果

小技巧:如果要是测试交换机接收失败的回调,可以通过修改生产者发消息的交换机的名字为一个不存在的名字即可。

@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange"+"123", "key1", message, correlation);log.info("发送消息内容{}", message);}

 问题:上面只能保证交换机收到消息的确认回调,不能保证队列收到消息的确认回调?

队列确认消息是否收到的解决办法

比如routingKey错了,或者队列出了问题,队列也将无法收到消息。

在仅开启生产者确认机制情况下,接换机接收到消息后,会直接给消息生产者发送确认消息。如果发现该消息不可路由,那么消息会直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

解决办法

通过设置mandatory参数可以在当消息传递过程中不可达目的时将消息返回给生产者。

添加配置

spring:rabbitmq:host: 192.168.171.128username: adminpassword: 123port: 5672publisher-confirm-type: correlatedpublisher-returns: true

publiser-returns发布退回消息。

说明:这里为了测试故意把routingkey写错

代码

生产者

估计把routingKey改成错误的 key1123

@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange", "key1"+"123", message, correlation);log.info("发送消息内容{}", message);}

消费者

package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import com.xkj.org.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ConfirmQueueConsumer {@RabbitListener(queues = ConfirmConfig.QUEUE_NAME)public void receiveMsg(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.info("收到队列的消息:{}",  msg);}
}

配置

package com.xkj.org.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {//交换机public static final String EXCHANGE_NAME = "confirm.exchange";//队列public static final String QUEUE_NAME = "confirm.queue";//Routing Keypublic static final String ROUTING_KEY = "key1";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic Binding bindingQueueToExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,@Qualifier("confirmQueue") Queue confirmQueue) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);}}

回调接口

package com.xkj.org.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;/*** 回调接口*/
@Slf4j
@Component // 1.第一步实例化MyCallback这个bean
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowired // 2.第二步将rabbitTemplate实例依赖注入进来private RabbitTemplate rabbitTemplate;@PostConstructpublic void init() { //3.第三步执行此方法//将本类对象(ConfirmCallback的实现类对象)注入到RabbitTemplate中rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* @param correlationData* @param ack* @param cause* 1.发消息 交换机收到 调用*  1.1 correlationData 回调消息的id及相关信息*  1.2 交换机收到消息 ack = true*  1.3 cause null* 2.发消息 交换机接收失败 回调*  2.1 correlationData 回调消息的id及相关信息*  2.2 交换机收到消息 ack = false*  2.3 cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId(): "";if(ack) {log.info("交换机已经接收到id为:{}的消息", id);}else {log.info("交换机还未收到id为:{}的消息,原因:{}", id, cause);}}/*** 可以在当消息传递过程中不可达目的时将消息返回给生产者* 注意此方法是消息传递失败才会调用,成功就不会执行* @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {try {log.error("消息:{},被交换机:{},给退回了,原因:{},RoutingKey={}",new String(message.getBody(), "UTF-8"),exchange,replyText,routingKey);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}

测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/386168.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

文件操作相关的精讲

目录: 思维导图 一. 文件定义 二. 文件的打开和关闭 三. 文件的顺序读写操作 四. 文件的随机读写操作 五. 文本文件和二进制文件 六. 文件读取结束的判断 七.文件缓冲区 思维导图: 一. 文件定义 1.文件定义 C语言中,文件是指一组相…

Vue3可媲美Element Plus Tree组件实战之移除节点

Element Plus Tree自定义节点内容示例中介绍了移除节点的用法,个人觉得作为提供给用户API,应该遵循迪米特法则,把功能实现的细节封装在组件内部,而提供给用户最简单的操作方式,同时在此基础上支持用户的扩展。 因此&a…

接口测试支持IDEA插件一键同步API、新增思维导图快速评审测试用例,MeterSphere开源持续测试工具v3.1.0版本发布

2024年7月29日,MeterSphere开源持续测试工具正式发布v3.1.0版本。 在这一版本中,接口测试方面,支持通过IDEA插件一键同步API至MeterSphere;测试管理方面,“测试用例”模块新增通过思维导图模式快捷评审测试用例。在“…

挑战房市预测领头羊:KNN vs. 决策树 vs. 线性回归

挑战房市预测领头羊(KNN,决策树,线性回归) 1. 介绍1.1 K最近邻(KNN):与邻居的友谊1.1.1 KNN的基础1.1.2 KNN的运作机制1.1.3 KNN的优缺点 1.2 决策树:解码房价的逻辑树1.2.1 决策树的…

CSS实现文本溢出处理

1.单行文本溢出 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-wid…

总结——TI_音频信号分析仪

一、简介 设备&#xff1a;MSPM0G3507 库&#xff1a;CMSIS-DSP TI 数据分析&#xff1a;FFT 软件&#xff1a;CCS CLion MATLAB 目的&#xff1a;对音频信号进行采样&#xff08;滤波偏置处理&#xff09;&#xff0c;通过FFT获取信号的频率成分&am…

【Vue3】watchEffect

【Vue3】watchEffect 背景简介开发环境开发步骤及源码 背景 随着年龄的增长&#xff0c;很多曾经烂熟于心的技术原理已被岁月摩擦得愈发模糊起来&#xff0c;技术出身的人总是很难放下一些执念&#xff0c;遂将这些知识整理成文&#xff0c;以纪念曾经努力学习奋斗的日子。本文…

了解Selenium中的WebElement

Selenium中到处都使用WebElement来执行各种操作。什么是WebElement&#xff1f;这篇文章将详细讨论WebElement。 Selenium中的WebElement是一个表示网站HTML元素的Java接口。HTML元素包含一个开始标记和一个结束标记&#xff0c;内容位于这两个标记之间。 HTML元素的重命名 …

C#插件 调用存储过程(输出参数类型)

存储过程 CREATE PROCEDURE [dbo].[GetSum]num1 INT,num2 INT,result INT OUTPUT AS BEGINselect result num1 num2 END C#代码 using Kingdee.BOS; using Kingdee.BOS.App.Data; using Kingdee.BOS.Core.Bill.PlugIn; using Kingdee.BOS.Util; using System; using System.…

放大电路总结

补充: 只有直流移动时才有Rbe动态等效电阻 从RsUs看进去,实际上不管接了什么东西都能够看成是一个Ri(输入电阻) Ri Ui/Ii Rb//Rbe Ui/Us Ri/(RiRs) Aus (Uo/Ui)*(Ui/Us) Au *Ri/(RiRs) 当前面是一个电压源的信号 我们就需要输入电阻更大 Ro--->输出电阻--->将…

基于FFmpeg和SDL的音视频解码播放的实现过程与相关细节

目录 1、视频播放器原理 2、FFMPEG解码 2.1 FFMPEG库 2.2、数据类型 2.3、解码 2.3.1、接口函数 2.3.2、解码流程 3、SDL播放 3.1、接口函数 3.2、视频播放 3.3、音频播放 4、音视频的同步 4.1、获取音频的播放时间戳 4.2、获取当前视频帧时间戳 4.3、获取视…

MongoDB教程(二十三):关于MongoDB自增机制

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; 文章目录 引言一、MongoD…

加密货币赋能跨境电商:PayPal供应链金融服务如何引领行业新趋势

跨境电商行业近年来呈现出爆发式增长&#xff0c;随着全球化贸易壁垒的降低和数字经济的快速发展&#xff0c;越来越多的商家和消费者跨越国界进行交易。根据eMarketer的数据&#xff0c;全球跨境电商交易额在2023年已超过4万亿美元&#xff0c;并预计在未来几年内仍将保持两位…

《Java初阶数据结构》----6.<优先级队列之PriorityQueue底层:堆>

前言 大家好&#xff0c;我目前在学习java。之前也学了一段时间&#xff0c;但是没有发布博客。时间过的真的很快。我会利用好这个暑假&#xff0c;来复习之前学过的内容&#xff0c;并整理好之前写过的博客进行发布。如果博客中有错误或者没有读懂的地方。热烈欢迎大家在评论区…

ProxmoxPVE虚拟化平台--安装PVE虚拟机

Proxmox 虚拟机 Proxmox是一个基于Debian Linux和KVM的虚拟化平台&#xff0c;‌它提供了虚拟化的环境&#xff0c;‌允许用户在同一台物理机上运行多个虚拟机。‌Proxmox虚拟环境&#xff08;‌PVE&#xff09;‌是一个开源项目&#xff0c;‌由Proxmox Server Solutions Gmb…

重生之我当程序猿外包

第一章 个人介绍与收入历程 我出生于1999年&#xff0c;在大四下学期进入了一家互联网公司实习。当时的实习工资是3500元&#xff0c;公司还提供住宿。作为一名实习生&#xff0c;这个工资足够支付生活开销&#xff0c;每个月还能给父母转1000元&#xff0c;自己留2500元用来吃…

科普文:万字详解Kafka基本原理和应用

一、Kafka 简介 1. 消息引擎系统ABC Apache Kafka是一款开源的消息引擎系统&#xff0c;也是一个分布式流处理平台。除此之外&#xff0c;Kafka还能够被用作分布式存储系统&#xff08;极少&#xff09;。 A. 常见的两种消息引擎系统传输协议&#xff08;即用什么方式把消息…

探索 Milvus 存储系统:如何评估和优化 Milvus 存储性能

欢迎来到探索 Milvus 系列。Milvus 是一款支持水平扩展和具备出色性能的开源向量数据库。Milvus 的核心是其强大的存储系统&#xff0c;是数据持久化和存储的关键基础。该系统包括几个关键组成部分&#xff1a;元数据存储&#xff08;meta storage&#xff09;、消息存储&#…

LexLIP——图片搜索中的多模态稀疏化召回方法

LexLIP——图片搜索中的多模态稀疏化召回方法 FesianXu 20240728 at WeChat Search Team 前言 最近笔者在回顾&笔记一些老论文&#xff0c;准备整理下之前看的一篇论文LexLIP&#xff0c;其很适合在真实的图片搜索业务场景中落地&#xff0c;希望笔记能给读者带来启发。如…

深度学习趋同性的量化探索:以多模态学习与联合嵌入为例

深度学习趋同性的量化探索&#xff1a;以多模态学习与联合嵌入为例 参考文献 据说是2024年最好的人工智能论文&#xff0c;是否有划时代的意义&#xff1f; [2405.07987] The Platonic Representation Hypothesis (arxiv.org) ​arxiv.org/abs/2405.07987 趋同性的量化表达 …