使用RabbitMQ实现延迟消息的完整指南

在分布式系统中,消息队列通常用于解耦服务,RabbitMQ是一个广泛使用的消息队列服务。延迟消息(也称为延时队列或TTL消息)是一种常见的场景应用,特别适合处理某些任务在一段时间后执行的需求,如订单超时处理、延时通知等。

本文将以具体代码为例,展示如何使用RabbitMQ来实现延迟消息处理,涵盖队列和交换机的配置、消息的发送与接收以及死信队列的处理。

什么是延迟消息?

延迟消息是指消息在发送到队列后,经过设定的时间延迟再被消费。RabbitMQ 本身没有直接支持延迟队列的功能,但可以通过 TTL(Time To Live)+ 死信队列(Dead Letter Queue, DLQ) 的组合来实现。当消息超过TTL(消息存活时间)后,不会被立即消费,而是会被转发到绑定的死信队列,从而实现延迟处理。

RabbitMQ中的延迟消息原理

在RabbitMQ中,我们可以通过以下几个概念来实现延迟消息:

  1. TTL(Time To Live):可以为队列设置TTL,消息超过该时间后会被标记为“死信”。
  2. 死信队列(Dead Letter Queue):当消息在正常队列中过期或处理失败时,RabbitMQ可以将它们路由到一个死信队列,死信队列可以用来处理这些过期或未处理的消息。
  3. x-dead-letter-exchangex-dead-letter-routing-key:可以通过配置队列的参数,将过期消息发送到一个专门的死信交换器,并根据指定的路由键转发到死信队列。

 

 消息来到ttl.queue消息队列,过期时间内无人消费,消息来到死信交换机hmall.direct,在direct.queue消息队列无需等待。

1. RabbitMQ的配置

首先,我们需要配置两个队列和两个交换机:一个用于存放延时消息,另一个用于处理超时的死信消息。

package com.heima.stroke.configuration;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {// 延迟时间 单位:毫秒 (这里设为30秒)private static final long DELAY_TIME = 1000 * 30;// 行程超时队列public static final String STROKE_OVER_QUEUE = "STROKE_OVER_QUEUE";// 行程死信队列public static final String STROKE_DEAD_QUEUE = "STROKE_DEAD_QUEUE";// 行程超时队列交换机public static final String STROKE_OVER_QUEUE_EXCHANGE = "STROKE_OVER_QUEUE_EXCHANGE";// 行程死信队列交换机public static final String STROKE_DEAD_QUEUE_EXCHANGE = "STROKE_DEAD_QUEUE_EXCHANGE";// 行程超时交换机 Routing Keypublic static final String STROKE_OVER_KEY = "STROKE_OVER_KEY";// 行程死信交换机 Routing Keypublic static final String STROKE_DEAD_KEY = "STROKE_DEAD_KEY";/*** 声明行程超时队列,并设置其参数* x-dead-letter-exchange:绑定的死信交换机* x-dead-letter-routing-key:死信路由Key* x-message-ttl:消息的过期时间*/@Beanpublic Queue strokeOverQueue() {Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", STROKE_DEAD_QUEUE_EXCHANGE);args.put("x-dead-letter-routing-key", STROKE_DEAD_KEY);args.put("x-message-ttl", DELAY_TIME); // 设置TTL为30秒return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build();}@Beanpublic DirectExchange strokeOverQueueExchange() {return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE);}@Beanpublic Binding bindingStrokeOverDirect() {return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY);}
}

解释:

TTL设置:我们通过x-message-ttl设置消息的过期时间为30秒。

死信队列绑定:通过x-dead-letter-exchangex-dead-letter-routing-key设置,当消息过期时,它会被转发到死信交换机,再路由到死信队列。

2. 生产者发送延迟消息

接下来,我们通过生产者向超时队列发送消息,这些消息将在TTL过期后转发到死信队列。

package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MQProducer {private final static Logger logger = LoggerFactory.getLogger(MQProducer.class);@AutowiredRabbitTemplate rabbitTemplate;/*** 发送延时消息到行程超时队列** @param strokeVO 消息体*/public void sendOver(StrokeVO strokeVO) {String mqMessage = JSON.toJSONString(strokeVO);logger.info("send timeout msg:{}", mqMessage);rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage);}
}

解释:

sendOver 方法将消息发送到超时队列,消息将在超时后进入死信队列。生产者不需要额外处理TTL或死信的配置,只需发送消息即可。

3. 消费者监听死信队列

当消息超过TTL后,将会被转发到死信队列。消费者需要监听死信队列并处理这些消息。

j

package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import com.heima.stroke.handler.StrokeHandler;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;@Component
public class MQConsumer {private final static Logger logger = LoggerFactory.getLogger(MQConsumer.class);@Autowiredprivate StrokeHandler strokeHandler;/*** 监听死信队列** @param message 消息体* @param channel RabbitMQ的Channel* @param tag 消息的Delivery Tag*/@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = RabbitConfig.STROKE_DEAD_QUEUE, durable = "true"),exchange = @Exchange(value = RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE),key = RabbitConfig.STROKE_DEAD_KEY)})@RabbitHandlerpublic void processStroke(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {StrokeVO strokeVO = JSON.parseObject(message.getBody(), StrokeVO.class);logger.info("get dead msg:{}", message.getBody());if (strokeVO == null) {return;}try {// 处理超时的行程消息strokeHandler.timeoutHandel(strokeVO);// 手动确认消息channel.basicAck(tag, false);} catch (Exception e) {e.printStackTrace();}}
}

解释:

@RabbitListener 注解绑定了死信队列的监听器。当消息被转发到死信队列时,该消费者会接收到消息。

使用 channel.basicAck(tag, false) 手动确认消息处理成功,确保消息不会重复消费。

4. 处理超时业务逻辑

在我们的业务中,当消息超时未处理时,将其状态设置为超时。

public void timeoutHandel(StrokeVO strokeVO) {// 获取司机行程ID和乘客行程IDString inviterTripId = strokeVO.getInviterTripId();String inviteeTripId = strokeVO.getInviteeTripId();// 检查邀请状态是否为未确认String inviteeStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId);String inviterStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId);if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) &&String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) {// 更新为超时状态redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode()));redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode()));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/457018.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

零基础Java第十期:类和对象(一)

目录 一、拜访对象村 1.1. 什么是面向对象 1.2. 面向对象与面向过程 二、类定义和使用 2.1. 类的定义格式 2.2. 类的定义练习 三、类的实例化 3.1. 什么是实例化 3.2. 类和对象的说明 四、this引用 4.1. 什么是this引用 4.2. this引用的特性 一、拜访对象村 在…

使用python代码绘制好看的统计图

代码功能 上述代码使用 matplotlib 和 seaborn 生成四种不同的统计图&#xff0c;具体如下&#xff1a; 玫瑰图&#xff1a;在极坐标上绘制柱状图&#xff0c;展示不同角度的数值分布。雷达图&#xff1a;绘制多维数据的雷达图&#xff0c;用于对比不同维度的数值。热力图&am…

<项目代码>YOLOv8煤矿输送带异物识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…

java项目之基于web的智慧社区设计与实现(springboot)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的基于web的智慧社区设计与实现。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 基于web的智…

【优先算法】双指针 --(结合例题讲解解题思路)(C++)

今日鸡汤&#xff1a; “无人负我青云志&#xff0c;我自踏雪至山巅。” -徐霞客《青云志》 释义&#xff1a;没有人能够帮助我实现我的理想&#xff0c;即使面对再大的困难&#xff0c;我也要踏着积雪&#xff0c;一步步&#xff0c;到达山巅。 目录 1.快乐数 2.盛最多的…

【Unity 安装教程】

Unity 中国官网地址链接 Unity - 实时内容开发平台 | 3D、2D、VR & AR可视化https://unity.cn/首先我们想要安装Unity之前&#xff0c;需要安装Unity Hub&#xff1a; Unity Hub 是 Unity Technologies 开发的一个集成软件&#xff0c;它为使用 Unity 引擎的开发者提供了一…

大一物联网要不要转专业,转不了该怎么办?

有幸在2014年&#xff0c;踩中了物联网的风口&#xff0c;坏消息&#xff0c;牛马的我&#xff0c;一口汤都没喝上。 依稀记得&#xff0c;当时市场部老大&#xff0c;带我去上海参加电子展会&#xff0c;印象最深的&#xff0c;一些物联网云平台&#xff0c;靠着一份精美PPT&a…

从汇编角度看C/C++函数指针与函数的调用差异

函数指针本质上是一个指针变量&#xff0c;只不过这个变量保存的地址是一个函数的地址&#xff0c;那么直接调用函数和通过函数指针调用有没有区别呢&#xff1f;答案是有的&#xff0c;下面的代码是一个直接调用函数和通过指针调用函数的例子&#xff0c;使用gdb反汇编main函数…

蓝桥杯题目理解

1. 一维差分 1.1. 小蓝的操作 1.1.1. 题目解析&#xff1a; 这道题提到了对于“区间”进行操作&#xff0c;而差分数列就是对于区间进行操作的好方法。 观察差分数列&#xff1a; 给定数列&#xff1a;1 3 5 2 7 1 差分数列&#xff1a;1 2 2 -3 5 6 题目要求把原数组全部…

【2024|滑坡数据集论文解读1】CAS滑坡数据集:用于深度学习滑坡检测的大规模多传感器数据集

【2024|滑坡数据集论文解读1】CAS滑坡数据集&#xff1a;用于深度学习滑坡检测的大规模多传感器数据集 【2024|滑坡数据集论文解读1】CAS滑坡数据集&#xff1a;用于深度学习滑坡检测的大规模多传感器数据集 文章目录 【2024|滑坡数据集论文解读1】CAS滑坡数据集&#xff1a;用…

【TFR-Net】基于transformer的鲁棒多模态情感分析特征重构网络

代码地址&#xff1a;TFR-Net/models at main thuiar/TFR-Net GitHub abstract&#xff1a; 提高对数据缺失的鲁棒性已经成为多模态情感分析&#xff08;MSA&#xff09;的核心挑战之一&#xff0c;MSA旨在从语言、视觉和声学信号中判断说话者的情感。在目前的研究中&#…

HBuilder X 中Vue.js基础使用->计算属性的应用(三)

一、通过简单的计算属性&#xff1a;对两数进行加法&#xff0c;减法&#xff0c;乘法&#xff0c;除法运算 <template><div><h1>computed 计算属性</h1><el-input type"text" v-model"numOne" /> <el-input type"t…

D49【python 接口自动化学习】- python基础之类

day49 类的常见错误 学习日期&#xff1a;20241026 学习目标&#xff1a;类 -- 63 避坑指南&#xff1a;类的常见错误 学习笔记&#xff1a; 语法错误 设计错误 总结 self 是刚开始学习面向对象编程时&#xff0c;最容易忽略的语法编写多个类时&#xff0c;解决依赖关系是…

51单片机快速入门之 AD(模数) DA(数模) 转换 2024/10/25

51单片机快速入门之 AD(模数) DA(数模) 转换 2024/10/25 声明:本文图片来源于网络 A模拟信号特点: 电压或者电流 缓慢上升 随着时间连续缓慢上升或下降 D数字信号特点:电压或者电流 保持一段时间的高/低电平 状态 / 突变 (高电压瞬间低电压) 数字电路中 通常将0-1v电压称…

Segugio:一款针对恶意软件的进程执行跟踪与安全分析工具

关于Segugio Segugio是一款功能强大的恶意软件安全分析工具&#xff0c;该工具允许我们轻松分析恶意软件执行的关键步骤&#xff0c;并对其进行跟踪分析和安全审计。 Segugio允许执行和跟踪恶意软件感染过程中的关键步骤&#xff0c;其中包括从点击第一阶段到提取恶意软件的最…

STM32Lx GXHT3x SHT3x iic 驱动开发应用详解

简介 项目开发过程中&#xff0c;采用STM32L151 为主控芯片进行设计&#xff0c;并外接GXHT3x进行温湿度数据采集。这里MCU采用片上IIC与GXHT3x进行数据交互&#xff0c;本文详细记录了开发过程&#xff0c;为今后的项目提供参考&#xff0c;加速项目开发进度。 硬件设计 相…

【WebGis开发 - Cesium】三维可视化项目教程---图层管理拓展图层顺序调整功能

目录 引言一、为什么要开发图层顺序调整功能二、开发思路整理1. 拖拽库方案选择2. cesium图层api查询 三、代码编写1. 编写拖拽组件代码2. 修改原有图层管理代码2.1 图层加载移除的调整2.2 图层顺序与拖拽列表的矛盾 3. 编写图层移动代码 四、总结 引言 本教程主要是围绕Cesium…

Linux-Centos操作系统备份及还原(整机镜像制作与还原)--再生龙

适用场景 Linux系统设备需要备份整机数据&#xff0c;或者需要还原到多台设备上。适用再生龙工具进行整机备用和还原。 镜像制作 下载再生龙镜像&#xff1a;clonezilla-live-2.6.4-10-amd64.iso&#xff0c;制作启动盘-设置U盘启动 启动后界面如下选择第四项other modes of…

使用Python来下一场深夜雪

效果图&#xff1a;&#xff08;真实情况是动态的&#xff09; 完整代码&#xff1a; import turtle import random# 初始化画布 turtle.bgcolor("#001f3f") # 偏深蓝色的背景 turtle.title("下雪的画面") turtle.speed(0) turtle.hideturtle() turtle.t…

steam新品节!GameViewr远程随时随地手机平板玩主机游戏教程

Steam平台在10月14日迎来了新品节&#xff0c;你可以尝试即将推出的游戏的免费试用版&#xff0c;将他们加入愿望单&#xff0c;像是《迷失之径》《贪婪大地》《疯狂手机大亨》等等。不知道大家是否已经选择好自己心怡的游戏呢&#xff1f;要是你想随时随地体验steam新品节的游…