Rabbitmq延迟消息

目录

  • 一、延迟消息
    • 1.基于死信实现延迟消息
      • 1.1 消息的TTL(Time To Live)
      • 1.2 死信交换机 Dead Letter Exchanges
      • 1.3 代码实现
    • 2.基于延迟插件实现延迟消息
      • 2.1 插件安装
      • 2.2 代码实现
    • 3.基于延迟插件封装消息

一、延迟消息

延迟消息有两种实现方案:
1,基于死信队列
2,集成延迟插件

1.基于死信实现延迟消息

使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:
消息的TTL(存活时间)和死信交换机Exchange,通过这两者的组合来实现延迟队列

1.1 消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
如何设置TTL:
我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在压在这个队列的消息在5秒后会消失。

1.2 死信交换机 Dead Letter Exchanges

一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
(1) 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
(2)上面的消息的TTL到了,消息过期了。
(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
在这里插入图片描述
我们现在可以测试一下延迟队列。
(1)创建死信队列
(2)创建交换机
(3)建立交换器与队列之间的绑定
(4)创建队列

1.3 代码实现

在service-mq 中添加配置类

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadLetterMqConfig {// 声明一些变量public static final String exchange_dead = "exchange.dead";public static final String routing_dead_1 = "routing.dead.1";public static final String routing_dead_2 = "routing.dead.2";public static final String queue_dead_1 = "queue.dead.1";public static final String queue_dead_2 = "queue.dead.2";// 定义交换机@Beanpublic DirectExchange exchange(){return new DirectExchange(exchange_dead,true,false,null);}@Beanpublic Queue queue1(){// 设置如果队列一 出现问题,则通过参数转到exchange_dead,routing_dead_2 上!HashMap<String, Object> map = new HashMap<>();// 参数绑定 此处的key 固定值,不能随意写map.put("x-dead-letter-exchange",exchange_dead);map.put("x-dead-letter-routing-key",routing_dead_2);// 设置延迟时间map.put("x-message-ttl ", 10 * 1000);// 队列名称,是否持久化,是否独享、排外的【true:只可以在本次连接中访问】,是否自动删除,队列的其他属性参数return new Queue(queue_dead_1,true,false,false,map);}@Beanpublic Binding binding(){// 将队列一 通过routing_dead_1 key 绑定到exchange_dead 交换机上return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);}// 这个队列二就是一个普通队列@Beanpublic Queue queue2(){return new Queue(queue_dead_2,true,false,false,null);}// 设置队列二的绑定规则@Beanpublic Binding binding2(){// 将队列二通过routing_dead_2 key 绑定到exchange_dead交换机上!return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);}
}

配置发送消息

@RestController
@RequestMapping("/mq")
@Slf4j
public class MqController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitService rabbitService;@GetMapping("sendDeadLettle")public Result sendDeadLettle() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");this.rabbitTemplate.convertAndSend(DeadLetterMqConfig.exchange_dead, DeadLetterMqConfig.routing_dead_1, "ok");System.out.println(sdf.format(new Date()) + " Delay sent.");return Result.ok();}
}

消息接收方

@Component
public class DeadLetterReceiver {@RabbitListener(queues = DeadLetterMqConfig.queue_dead_2)public void getMessage(String msg, Message message, Channel channel) throws IOException {//时间格式化SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss");System.out.println("消息接收的时间:\t"+simpleDateFormat.format(new Date()));System.out.println("消息的内容"+msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}

在这里插入图片描述

2.基于延迟插件实现延迟消息

2.1 插件安装

Rabbitmq实现了一个插件x-delay-message来实现延时队列

  1. 首先我们将刚下载下来的rabbitmq_delayed_message_exchange-3.9.0.ez文件上传到RabbitMQ所在服务器,下载地址:https://www.rabbitmq.com/community-plugins.html
  2. 切换到插件所在目录,执行 docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins 命令,将刚插件拷贝到容器内plugins目录下
  3. 执行 docker exec -it rabbitmq /bin/bash 命令进入到容器内部,并 cd plugins 进入plugins目录
  4. 执行 ls -l|grep delay 命令查看插件是否copy成功
  5. 在容器内plugins目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令启用插件
  6. exit命令退出RabbitMQ容器内部,然后执行 docker restart rabbitmq 命令重启RabbitMQ容器

2.2 代码实现

配置队列

@Configuration
public class DelayedMqConfig {public static final String exchange_delay = "exchange.delay";public static final String routing_delay = "routing.delay";public static final String queue_delay_1 = "queue.delay.1";@Beanpublic Queue delayQeue1() {// 第一个参数是创建的queue的名字,第二个参数是是否支持持久化return new Queue(queue_delay_1, true);}@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args);}@Beanpublic Binding delayBbinding1() {return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();}
}

发送消息

@GetMapping("sendelay")
public Result sendDelay() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, sdf.format(new Date()), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10 * 1000);System.out.println(sdf.format(new Date()) + " Delay sent.");return message;}});return Result.ok();
}

接收消息

@Component
public class DelayReceiver {@RabbitListener(queues = DelayedMqConfig.queue_delay_1)public void get(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("Receive queue_delay_1: " + sdf.format(new Date()) + " Delay rece." + msg);}}

3.基于延迟插件封装消息

/*** 封装发送延迟消息方法* @param exchange* @param routingKey* @param msg* @param delayTime* @return*/
public Boolean sendDelayMsg(String exchange,String routingKey, Object msg, int delayTime){//  将发送的消息 赋值到 自定义的实体类GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();//  声明一个correlationId的变量String correlationId = UUID.randomUUID().toString().replaceAll("-","");gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);gmallCorrelationData.setDelayTime(delayTime);gmallCorrelationData.setDelay(true);//  将数据存到缓存this.redisTemplate.opsForValue().set(correlationId,JSON.toJSONString(gmallCorrelationData),10,TimeUnit.MINUTES);//  发送消息this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,message -> {//  设置延迟时间message.getMessageProperties().setDelay(delayTime*1000);return message;},gmallCorrelationData);//  默认返回return true;
}

修改retrySendMsg方法 – 添加判断是否属于延迟消息

//  判断是否属于延迟消息
if (gmallCorrelationData.isDelay()){//  属于延迟消息this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),message -> {//  设置延迟时间message.getMessageProperties().setDelay(gmallCorrelationData.getDelayTime()*1000);return message;},gmallCorrelationData);
}else {//  调用发送消息方法 表示发送普通消息  发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);
}

利用封装好的工具类 测试发送延迟消息

//  基于延迟插件的延迟消息
@GetMapping("sendDelay")
public Result sendDelay(){//  声明一个时间对象SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("发送时间:"+simpleDateFormat.format(new Date()));this.rabbitService.sendDelayMsg(DelayedMqConfig.exchange_delay,DelayedMqConfig.routing_delay,"iuok",3);return Result.ok();
}

重试了4次,所以我们需要保证幂等性
在这里插入图片描述
结果会 回发送三次,也被消费三次!
如何保证消息幂等性?
1.使用数据方式
2.使用redis setnx 命令解决 — 推荐

@SneakyThrows
@RabbitListener(queues = DelayedMqConfig.queue_delay_1)
public void getMsg2(String msg,Message message,Channel channel){//  使用setnx 命令来解决 msgKey = delay:iuokString msgKey = "delay:"+msg;Boolean result = this.redisTemplate.opsForValue().setIfAbsent(msgKey, "0", 10, TimeUnit.MINUTES);//  result = true : 说明执行成功,redis 里面没有这个key ,第一次创建, 第一次消费。//  result = false : 说明执行失败,redis 里面有这个key//  不能: 那么就表示这个消息只能被消费一次!  那么第一次消费成功或失败,我们确定不了!  --- 只能被消费一次!//        if (result){//            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//            System.out.println("接收时间:"+simpleDateFormat.format(new Date()));//            System.out.println("接收的消息:"+msg);//            //  手动确认消息//            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//        } else {//          //    不能消费!//        }//  能: 保证消息被消费成功    第二次消费,可以进来,但是要判断上一个消费者,是否将消息消费了。如果消费了,则直接返回,如果没有消费成功,我消费。//  在设置key 的时候给了一个默认值 0 ,如果消费成功,则将key的值 改为1if (!result){//  获取缓存key对应的数据String status = (String) this.redisTemplate.opsForValue().get(msgKey);if ("1".equals(status)){//  手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);return;} else {//  说明第一个消费者没有消费成功,所以消费并确认SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收时间:"+simpleDateFormat.format(new Date()));System.out.println("接收的消息:"+msg);//  修改redis 中的数据this.redisTemplate.opsForValue().set(msgKey,"1");channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);return;}}SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收时间:"+simpleDateFormat.format(new Date()));System.out.println("接收的消息:"+msg);//  修改redis 中的数据this.redisTemplate.opsForValue().set(msgKey,"1");//  手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/90960.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微软杀入Web3:打造基于区块链的AI产品

作者&#xff1a;秦晋 2023年1月&#xff0c;微软向 ChatGPT 创建者 OpenAI 投资 100 亿美元&#xff0c;在AI业界引发格外关注。此举也让微软在AI的战略探索上提前取得有利位置。 2023年3月&#xff0c;微软软件工程师 Albacore 披露微软正在为Edge 浏览器测试内置的非托管加密…

全新 – Amazon EC2 M1 Mac 实例

去年&#xff0c;在 re: Invent 2021 大会期间&#xff0c;我写了一篇博客文章&#xff0c;宣布推出 EC2 M1 Mac 实例的预览版。我知道你们当中许多人请求访问预览版&#xff0c;我们尽了最大努力&#xff0c;却无法让所有人满意。不过&#xff0c;大家现在已经无需等待了。我很…

LeNet中文翻译

Gradient-Based Learning Applied to Document Recognition 基于梯度的学习应用于文档识别 摘要 使用反向传播算法训练的多层神经网络构成了成功的基于梯度的学习技术的最佳示例。给定适当的网络架构&#xff0c;基于梯度的学习算法可用于合成复杂的决策表面&#xff0c;该决策…

C++ STL priority_queue

目录 一.认识priority_queue 二. priority_queue的使用 三.仿函数 1.什么是仿函数 2.控制大小堆 3.TopK问题 四.模拟实现priority_queue 1.priority_queue的主要接口框架 2.堆的向上调整算法 3.堆的向下调整算法 4.仿函数控制大小堆 五.priority_queue模拟实现整体代码和测…

用js快速生成一个简单的css原子库 例如: .mr-18 .pl-18

第三方css原子库的缺点 比如 tailwindcss&#xff0c;有学习成本最开始写的时候效率可能还没有我们自己手写效率高&#xff0c;需要配置&#xff0c;会有原始样式被覆盖的问题&#xff1b;总之就是一个字重 自己搓的优点 学习成本低灵活不会有副作用 <!DOCTYPE html>…

Redis详解

Redis 简介 Redis&#xff08;Remote Dictionary Server&#xff09;是一个开源的高性能键值对存储数据库&#xff0c;最初由 Salvatore Sanfilippo 开发&#xff0c;它在内存中存储数据&#xff0c;并提供了持久化功能&#xff0c;可以将数据保存到磁盘中&#xff0c;是一种N…

Stephen Wolfram:那么…ChatGPT 在做什么,为什么它有效呢?

So … What Is ChatGPT Doing, and Why Does It Work? 那么…ChatGPT在做什么&#xff0c;为什么它有效呢&#xff1f; The basic concept of ChatGPT is at some level rather simple. Start from a huge sample of human-created text from the web, books, etc. Then train…

阿里云内容审核服务使用(图片审核)

说明&#xff1a;在项目中&#xff0c;我们经常会对用户上传的内容&#xff08;如文字、图片&#xff09;等资源内容进行审核&#xff0c;审核包括两方面&#xff0c;一方面是内容与描述不符&#xff0c;一方面是违反法律法规。本文介绍使用阿里提供的内容审核服务&#xff0c;…

mqttfx连上OneNET生成token时的一大坑,报用户名或密码错误

整个流程如下连接&#xff1a; MQTT.fx和MQTTX 链接ONENET物联网开发平台避坑细节干货。 其中在生成token时&#xff0c;搞了半天在连接后都会报用户名密码错误 最后发现是格式问题&#xff0c;输入所有字符后一定要双击看是否可以全选中&#xff0c;可以全选中说明字符的格式…

第G1周:生成对抗网络(GAN)入门

&#x1f368; 本文为[&#x1f517;365天深度学习训练营]内部限免文章&#xff08;版权归 *K同学啊* 所有&#xff09; &#x1f356; 作者&#xff1a;[K同学啊] 一、理论基础 生成对抗网络&#xff08;Generative Adversarial Networks, GAN&#xff09;是近年来深度学习领域…

adb对安卓app进行抓包(ip连接设备)

adb对安卓app进行抓包&#xff08;ip连接设备&#xff09; 一&#xff0c;首先将安卓设备的开发者模式打开&#xff0c;提示允许adb调试 二&#xff0c;自己的笔记本要和安卓设备在同一个网段下&#xff08;同连一个WiFi就可以了&#xff09; 三&#xff0c;在笔记本上根据i…

【Git】安装以及基本操作

目录 一、初识Git二、 在Linux底下安装Git一&#xff09;centOS二&#xff09;Ubuntu 三、 Git基本操作一&#xff09; 创建本地仓库二&#xff09;配置本地仓库三&#xff09;认识工作区、暂存区、版本库四&#xff09;添加文件五&#xff09;查看.git文件六&#xff09;修改文…

【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

痛点背景 业务场景 假设有这么一个需求&#xff0c;用户下单后如果30分钟未支付&#xff0c;则该订单需要被关闭。你会怎么做&#xff1f; 之前方案 最简单的做法&#xff0c;可以服务端启动个定时器&#xff0c;隔个几秒扫描数据库中待支付的订单&#xff0c;如果(当前时间-订…

RocketMQ双主双从同步集群部署

&#x1f388; 作者&#xff1a;互联网-小啊宇 &#x1f388; 简介&#xff1a; CSDN 运维领域创作者、阿里云专家博主。目前从事 Kubernetes运维相关工作&#xff0c;擅长Linux系统运维、开源监控软件维护、Kubernetes容器技术、CI/CD持续集成、自动化运维、开源软件部署维护…

Java接口压力测试—如何应对并优化Java接口的压力测试

导言 在如今的互联网时代&#xff0c;Java接口压力测试是评估系统性能和可靠性的关键一环。一旦接口不能承受高并发量&#xff0c;用户体验将受到严重影响&#xff0c;甚至可能导致系统崩溃。因此&#xff0c;了解如何进行有效的Java接口压力测试以及如何优化接口性能至关重要…

Linux系统USB摄像头测试程序(二)_读取配置

1、收先安装gtk3&#xff0c;我的测试机器是ubutn16.04&#xff0c;只要执行下面的安装命令就可以了 apt-get install libgtk-3-dev 使用下列命令验证是否安装好gtk3&#xff1a; pkg-config --cflags --libs gtk-3.0 2、显示结果类似如下&#xff1a; -pthre…

这是一篇关于SQL 脚本表间连接join的可视化说明

使用SQL合并两个数据集可以通过JOINS来完成。JOIN是查询的FROM子句中的SQL指令&#xff0c;用于标识要查询的表以及它们应该如何组合。 主键和外键 通常&#xff0c;在关系数据库中&#xff0c;数据被组织到由属性&#xff08;列&#xff09;和记录&#xff08;行&#xff09…

MySQL运维

日志 错误日志 show VARIABLES like %log_error%;使用 tail -f 错误文件路径 可以查看具体错误二进制日志 show variables like %log_bin%;在my.ini文件下的mysqlID下添加 log_binmysql-bin binlog-formatROW重启就开启binlog了 show VARIABLES like %binlog_format%;mys…

i18n 配置vue项目中英文语言包(中英文转化)

一、实现效果 二、下载插件创建文件夹 2.1 下载cookie来存储 npm install --save js-cookienpm i vue-i18n -S 2.2 封装组件多页面应用 2.3 创建配置语言包字段 三、示例代码 3.1 main.js 引用 i18n.js import i18n from ./lang// 实现语言切换:i18n处理element&#xff0c…

屏蔽socket 实例化时,握手阶段报错信息WebSocket connection to ‘***‘ failed

事情起因是这样的&#xff1a; 我们网站是需要socket链接实行实时推送服务&#xff0c;有恶意竞争对手通过抓包或者断网&#xff0c;获取到了我们的socket链接地址&#xff0c;那么他就可以通过java写一个脚本无限链接这个socket地址。形成dos攻击。使socket服务器资源耗尽&…