通过rabbitmq生成延时消息,并生成rabbitmq镜像

通过rabbitmq生成延时消息队列,并生成rabbitmq镜像

  • 整体描述
    • 1. 使用场景
    • 2. 目前问题
    • 3. 前期准备
  • 具体步骤
    • 1. 拉取镜像
    • 2. 运行镜像
    • 3. 安装插件
    • 4. 代码支持
      • 4.1 config文件
      • 4.2 消费监听
      • 4.2 消息生产
    • 5. 功能测试
  • 镜像操作
    • 1. 镜像制作
    • 2. 镜像导入
  • 总结

整体描述

1. 使用场景

在使用消息队列时,我们有时候需要生成一些延时消息,比如判断一个任务的开始时间,我在创建任务的时候计算出此时距离任务开始的时间,然后往消息队列里发送一个延时消息,我们希望等到任务开始的时候,再消费此消息,此时任务开始,可以进行一些业务上的操作。

2. 目前问题

之前写过一篇创建rabbitmq镜像的文章,链接: 在centos搭建rabbitmq并制作docker镜像,使用的rabbitmq的版本是3.6.8,只能通过过期时间expiration来设置消息的过期时间,在消息过期的时候,会进入死信队列中,也能达到上述要求。但是,但是,这个过期时间expiration,rabbitmq在处理的时候有个坑,前面消息如果没有过期,后面的消息就算过期了,也不会触发,就是先发的消息没有到期,之后再发的消息就算到期了,也不会触发回调。这显然不行。

3. 前期准备

需要准备的主要就是docker环境,这个可以自行搜一下怎么安装docker环境,由于和本文主要讲的内容关系不大,略…

具体步骤

为了解决此问题,我们可以用延时队列插件来实现,这个插件时一个开发者写的,在github上但是已经被rabbitmq官方接受了,所以可以放心用。

1. 拉取镜像

首先我们先拉取一个rabbitmq的官方镜像进行操作,这个需要注意一下拉取的版本,由于延时队列的插件支持的版本是3.7之后的rabbitmq,所以需要拉取3.7之后的,我这拉取的是3.8.17版本。在命令行输入:

docker pull rabbitmq:3.8.17-management

注:这个如果报错,看下自己的docker环境有没有问题。带management是带管理页面的镜像,我们选用的带management的镜像,后期使用的时候好操作和定位问题。

2. 运行镜像

拉取成功之后,使用命令:

docker images

查看镜像是否拉取成功,如下就是成功了:
rabbitmq镜像
之前用的3.6.8的,不支持延时消息队列的插件…
然后运行镜像,创建容器并启动:

docker run --name rabbitmq-server -p 5672:5672 -p 15672:15672 -d rabbitmq:3.8.17-management

此时用:

docker ps -a

查看容器:
rabbitmq容器
容器已经创建并启动,我们通过web页面可以访问rabbitmq的管理页面,在浏览器输入:http://localhost:15672/
默认账号:guest,密码:guest
rabbitmq登录页面

3. 安装插件

此时rabbitmq已经运行,我们需要安装插件来支持延时消息队列,插件下载地址
选择相应的rabbitmq版本进行下载,注意版本不要选错了。下载完是一个rabbitmq_delayed_message_exchange-3.8.0.ez的文件,我们需要把这个文件上传到docker的/opt/rabbitmq/plugins目录下。
上传之后,进入/opt/rabbitmq/sbin目录执行如下命令让插件生效:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

执行之后看到如下就成功了:
插件启动成功
成功之后刷新一下管理页面,在新建交换机那里,type能多一个x-delayed-message的选项:
添加延时交换机
此时,我们的rabbitmq就配置完成了。

4. 代码支持

rabbitmq目前已经可以接收延时消息了,在代码端我们也需要进行相应的修改,以达到发送延时消息的目的。

4.1 config文件

package com.thcb.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ的配置类** @author thcb* @date 2023-09-05*/
@Configuration
public class RabbitMqConfig {// 交换机private static final String DELAYED_EXCHANGE = "delayed.exchange";// 队列private static final String DELAYED_QUEUE = "delayed.queue";// 路由private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";/*** 队列*/@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE);}/*** 交换机*/@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, args);}/*** 绑定延迟队列和交换机*/@Beanpublic Binding delayQueueBindingDelayExchange() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();}}

4.2 消费监听

package com.thcb.rabbitmq.recevier;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 消费监听** @author thcb* @date 2023-09-05*/
@Slf4j
@Component
public class DelayQueueReceiver {@RabbitListener(queues = "delayed.queue")public void receiveDelayedQueue(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到DelayedQueue消息:{}", new Date().toString(), msg);}}

4.2 消息生产

这里创建一个controller来生产消息,里面有两个接口,一个生产消息的延时时间是5秒,另一个是30秒,用来测试延时时间。

package com.thcb.rabbitmq.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;/*** 消息生产controller** @author thcb* @date 2023-09-05*/
@RestController
@RequestMapping("/HelloController")
public class HelloController {private static final Logger log = LoggerFactory.getLogger(HelloController.class);@Autowiredprivate AmqpTemplate rabbitTemplate;@RequestMapping("/sendXDLMessage1")@ResponseBodypublic String sendXDLMessage1() {int time = 5000;String message = "{\"type\":\"sendXDLMessage1\"}";log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {msg.getMessageProperties().setDelay(time);return msg;});return "sendXDLMessage1 success";}@RequestMapping("/sendXDLMessage2")@ResponseBodypublic String sendXDLMessage2() {int time = 30000;String message = "{\"type\":\"sendXDLMessage2\"}";log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {msg.getMessageProperties().setDelay(time);return msg;});return "sendXDLMessage2 success";}
}

5. 功能测试

代码修改完,就可以测试了,启动工程之后,在rabbitmq管理页面能看到自动创建了如下交换机和队列:
创建的交换机
创建的队列
可以看到交换机的类型是x-delayed-message。
接下来就可以调用测试接口,生产2条消息看看了。先调用sendXDLMessage2接口,生产一个延时30秒的消息,过一会再调用sendXDLMessage1的接口,生产一个延时5秒的消息。log结果如下:
运行结果
结果符合我们的预期,先发的30秒延时消息消息2,之后发的5秒延时消息1,然后过了5秒消息1先回调,之后30秒消息2回调。

镜像操作

使用docker主要就是要制作镜像,之后直接就可以用了要不每次还得配置。提示制作之前,把现在的队列和交换机都删除,队列和交换机是通过代码创建的,账号可以换一个,默认的guest不太安全。
一切都准备就绪,就可以制作镜像了。

1. 镜像制作

将镜像打包成tar文件。

docker commit 【镜像id】 rabbitmq:3.8.17
docker save -o rabbitmq-3.8.17.tar rabbitmq:3.8.17

2. 镜像导入

制作完镜像进行导入

docker load <rabbitmq-3.8.17.tar
docker run -d -p 5672:5672 -p 15672:15672 --privileged --restart=always --name rabbitmq rabbitmq:3.8.17

总结

以上就是rabbitmq延时消息的相关内容,另外这个延时消息在消息很多的情况下可能会有一些性能问题,使用的时候需要注意一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/130811.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023年中国光伏行业研究报告

第一章 行业概况 1.1 定义 光伏行业&#xff0c;也称为太阳能光伏行业&#xff0c;是一个专注于利用光伏技术将太阳能转化为电能的领域。该行业涵盖了太阳能电池的制造、光伏系统的设计、安装和维护&#xff0c;以及电能的销售和供应。光伏技术的核心是光伏效应&#xff0c;通…

java设计模式之观察者模式

. 基本概念 观察者&#xff08;Observer&#xff09;模式中包含两种对象&#xff0c;分别是目标对象和观察者对象。在目标对象和观察者对象间存在着一种一对多的对应关系&#xff0c;当这个目标对象的状态发生变化时&#xff0c;所有依赖于它的观察者对象都会得到通知并执行它…

图片怎么压缩大小?这样压缩图片很简单

在日常生活中&#xff0c;我们常常需要处理各种各样的图片文件&#xff0c;但有时候图片的大小可能会成为问题。比如在上传图片到网站或者将图片发送给朋友时&#xff0c;过大的图片可能会导致上传速度变慢或者占用过多内存。这时&#xff0c;我们就需要用到图片压缩了&#xf…

python 语法入门

文章目录 前言python 语法入门1. 语句分隔符2. 注释3. pep8规范4. 变量5. 扩展5.1. 运行此行代码的过程 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会…

SegGPT: Segmenting Everything In Context论文笔记

论文https://arxiv.org/pdf/2304.03284.pdfCodehttps://github.com/baaivision/Painter 文章目录 1. 背景2. Motivation3. Method3.1 In-Context Coloring3.2 Context Ensemble3.3 In-Context Tuning 1. 背景 在Painter中&#xff0c;将各种密集预测任务视为一种着色问题。 在…

类和对象(1)

文章目录 1.面向过程和面向对象初步认识2.类的引入3.类的定义4.类的访问限定符和封装4.1访问限定符4.2封装 5.类的作用域6.类的实例化6.2结构体内存对齐规则 7.this指针7.2this指针的特性 封装&#xff08;补充&#xff09; 1.面向过程和面向对象初步认识 C面向对象但不纯面向…

【Python】爬虫基础

爬虫是一种模拟浏览器实现&#xff0c;用以抓取网站信息的程序或者脚本。常见的爬虫有三大类&#xff1a; 通用式爬虫&#xff1a;通用式爬虫用以爬取一整个网页的信息。 聚焦式爬虫&#xff1a;聚焦式爬虫可以在通用式爬虫爬取到的一整个网页的信息基础上只选取一部分所需的…

关于运行franka_ros包中的franka_gazebo报错VMware: vmw_ioctl_command error 无效的参数.

参考的博文&#xff0c;感谢&#xff0c;解决Vmware下虚拟机下打开gazebo报错 &#xff0c;VMware: vmw_ioctl_command error 无效的参数. 首先第一个VMware: vmw_ioctl_command error 无效的参数的问题。这应该是虚拟机的bug&#xff0c;毕竟使用虚拟机和真实的物理机上是有差…

Error from server (NotFound): pods “nginx-57d84f57dc-b866m“ not found

原因&#xff1a;机房断电&#xff0c;导致服务重启 [rootmaster1 logs]# kubectl get pod NAME READY STATUS RESTARTS AGE nginx-57d84f57dc-57fkf 1/1 Running 0 75s [rootmaster1 logs]# kubectl logs -f nginx-5…

CPU-主存储器-副存储器-RAM-ROM-内存-运存-外存-硬盘-闪存-GPU-显存——关于这一系列概念的理解

文章目录 概念梳理CPU主存储器/内存/RAM/运存ROM副存储器/外存硬盘&#xff08;电脑&#xff09;、闪存&#xff08;手机&#xff09;GPU显存 可参考的手机内部结构示意图 做计算机组成原理上的题的时候&#xff0c;发觉自己对RAM和ROM的概念理解有所缺失&#xff0c;在看完一些…

QT Pyside2 Designer 的基本使用

文章目录 前言PySide2PySide2 Designer 一、安装PySide2、PyQt5二、使用designer.exe2.1 工具的大致介绍2.2 创建一个新的UI2.3 UI文件另存为/保存(CtrlS)2.4 使用python操作UI文件 总结 前言 PySide2 QT PySide2 是一个用于 Python 编程语言的开源框架&#xff0c;它提供了与…

Faster Rcnn

一、公用特征Feature Maps的获取 二、Region Proposal Network Feature Maps[bs,1024,38,38]经过3*3卷积&#xff0c;然后分别经过两个1*1的卷积&#xff0c;通道数分别为18&#xff0c;36 18 9*2 代表每个位置9个先验框为背景和目标的概率 36 9*4 代表每个位置9个先验框的…

pinduoduo(商品详情)API接口

为了进行电商平台 的API开发&#xff0c;首先我们需要做下面几件事情。 1&#xff09;开发者注册一个账号 2&#xff09;然后为每个pinduoduo应用注册一个应用程序键&#xff08;App Key) 。 3&#xff09;下载pinduoduo API的SDK并掌握基本的API基础知识和调用 4&#xff…

【Terraform】Terraform自动创建云服务器脚本

Terraform 是由 HashiCorp 创建的开源“基础架构即代码”工具 &#xff08;IaC&#xff09; 使用HCL&#xff08;配置语言&#xff09;描述云平台基础设施&#xff08;这里教你使用低级基础设施&#xff1a;交换机、云服务器、VPC、带宽&#xff09; Terraform提供者&#xf…

数据结构和算法(5):二叉树

树 向量允许通过下标或秩&#xff0c;在常数的时间内找到目标对象&#xff1b;然而&#xff0c;一旦需要对这类结构进行修改&#xff0c;那么无论是插入还是删除&#xff0c;都需要耗费线性的时间。 列表允许借助引用或位置对象&#xff0c;在常数的时间内插入或删除元素&…

Springboot整合JWT完成验证登录

目录 一、引入依赖二、JwtUtil 代码解读三、LoginController 代码解读四、整体代码五、结果展示 一、引入依赖 <dependency><groupId>io.jsonwebtoken</groupId><artifactId>jjwt</artifactId><version>0.9.1</version></depende…

el-table 实现表、表格行、表格列合并

最近写vue开发项目的时候&#xff0c;很多地方用到了Element组件中的Table 表格。经过一周的边学边做&#xff0c;我总结了以下三种有关表格的合并方法。 一、合并表头 话不多说&#xff0c;先看效果图 代码如下&#xff1a; 表格结构如上&#xff0c;其中:header-cell-style对…

233062C++QTday5

实现一个图形类&#xff08;Shape&#xff09;&#xff0c;包含受保护成员属性&#xff1a;周长、面积&#xff0c; 公共成员函数&#xff1a;特殊成员函数书写 定义一个圆形类&#xff08;Circle&#xff09;&#xff0c;继承自图形类&#xff0c;包含私有属性&#xff1a;半…

自动化测试框架unittest与pytest的区别!

引言   前面文章已经介绍了python单元测试框架&#xff0c;大家平时经常使用的是unittest&#xff0c;因为它比较基础&#xff0c;并且可以进行二次开发&#xff0c;如果你的开发水平很高&#xff0c;集成开发自动化测试平台也是可以的。而这篇文章主要讲unittest与pytest的区…

QQ 逻辑漏洞可执行文件 漏洞复现

本文由掌控安全学院- wax 投稿 首先拿到QQ的版本&#xff0c;目前可测试版本包括QQ最新版本&#xff0c;TIM最新版本 新创建一个bat文件&#xff08;这个可以随意&#xff0c;上马的也可以&#xff0c;exe也可以&#xff09; &#xff0c;本次测试内容如下首先向你的手机端发一…