RabbitMQ的安装与使用

RabbitMQ的安装与使用

  • 介绍
  • 一、RabbitMQ的安装
    • 1 查找镜像
    • 2 拉取镜像
    • 3 查看镜像
    • 4 创建容器
    • 5 查看容器
    • 6 访问测试
  • 二、RabbitMQ的使用
    • 1 创建项目
    • 2 配置文件
    • 3 队列配置文件
    • 4 消费者
    • 5 生产者
    • 6 测试
  • 三、交换器
  • 四、普通队列Demo
  • 五、死信队列Demo
    • 1 介绍
    • 2 示例
      • 2.1 配置
      • 2.2 生产者
      • 2.3 消费者
      • 2.4 死信消费者
      • 2.5 结果
  • 六、延时队列Demo
    • 1 安装延迟插件
      • 1.1 下载插件
      • 1.2 将插件拷贝到RabbitMQ容器的插件目录
      • 1.3 进入到容器
      • 1.4 开启插件
      • 1.5 查看
    • 2 示例
      • 2.1 配置
      • 2.2 生产者
      • 2.3 消费产者
      • 2.4 结果

介绍

RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。开发语言为Erlang。
linux系统中安装RabbitMQ比较繁琐,这里使用的是Docker安装。

一、RabbitMQ的安装

1 查找镜像

docker search rabbitmq:management

在这里插入图片描述

2 拉取镜像

docker pull macintoshplus/rabbitmq-management

在这里插入图片描述

3 查看镜像

docker images

在这里插入图片描述

4 创建容器

docker run -d --hostname mzw-rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 c20

命令解读:

  • 运行一个镜像
  • -d 后台守护运行
  • –hostname mzw-rabbitmq 指定主机名称
  • –name 指定容器名称
  • -e RABBITMQ_DEFAULT_USER=admin 指定用户名
  • -e RABBITMQ_DEFAULT_PASS=admin 指定密码
  • -p 15672:15672 -p 5672:5672 端口映射
  • c20 镜像ID 简写
    在这里插入图片描述

5 查看容器

docker ps

在这里插入图片描述

6 访问测试

访问地址:http://192.168.2.xx:15672/
在这里插入图片描述
输入启动容器时设置的用户密码登录
在这里插入图片描述
这就表示RabbitMQ安装成功了

二、RabbitMQ的使用

1 创建项目

创建SpringBoot项目并引入相关依赖
在这里插入图片描述

2 配置文件

# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin# 自定义一个属性,设置队列的名称
mq.queue.name=hello-queue

3 队列配置文件

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;// 添加@Configuration 注解,表示一个注解类
@Configuration
public class QueueConfig {@Value("${mq.queue.name}")private String queueName;/*** 初始化短信队列* @return*/@Beanpublic Queue delayedSmsQueueInit() {return new Queue(queueName);}
}

4 消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 创建一个rabbitmq消费者*/
@Component
public class Receiver {// 接受MQ消息 并 处理消息@RabbitListener(queues = {"${mq.queue.name}"})public void process(String msg){// 处理消息System.out.println("我是MQ消费者,我接收到的消息是:" + msg );}
}

5 生产者

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** 消息提供者*/
@Component
public class Sender {@Autowiredprivate AmqpTemplate template;@Value("${mq.queue.name}")private String queueName;// 发送消息public void send(String msg){// 队列名,消息内容template.convertAndSend(queueName,msg);}}

6 测试

  • 发送消息
    @Autowired
    private Sender sender;
    @Test
    void contextLoads() {sender.send("你好啊......");
    }
    
  • 接收消息
    在这里插入图片描述

三、交换器

RabbitMQ中有五种主要的交互器分别如下

交换器说明
direct发布与订阅 完全匹配
fanout广播
topic主体,规则匹配
fanout转发
custom自定义

四、普通队列Demo

上边已经演示,这里不重复演示。

五、死信队列Demo

1 介绍

死信队列就是在某种情况下,导致消息无法被正常消费(异常,过期,队列已满等),存放这些未被消费的消息的队列即为死信队列

2 示例

2.1 配置

  • 配置文件
# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin###死信队列
mq.dlx.exchange=mq_dlx_exchange
mq.dlx.queue=mq_dlx_queue
mq.dlx.routingKey=mq_dlx_key
###备胎交换机
mq.exchange=mq_exchange
mq.queue=mq_queue
mq.routingKey=routing_key
  • 配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class MQConfig {/*** 普通交换机*/@Value("${mq.exchange}")private String mqExchange;/*** 普通队列*/@Value("${mq.queue}")private String mqQueue;/*** 普通路由key*/@Value("${mq.routingKey}")private String mqRoutingKey;/*** 死信交换机*/@Value("${mq.dlx.exchange}")private String dlxExchange;/*** 死信队列*/@Value("${mq.dlx.queue}")private String dlxQueue;/*** 死信路由*/@Value("${mq.dlx.routingKey}")private String dlxRoutingKey;/*** 声明死信交换机* @return DirectExchange*/@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(dlxExchange);}/*** 声明死信队列* @return Queue*/@Beanpublic Queue dlxQueue() {return new Queue(dlxQueue);}/*** 声明普通业务交换机* @return DirectExchange*/@Beanpublic DirectExchange mqExchange() {return new DirectExchange(mqExchange);}/*** 声明普通队列* @return Queue*/@Beanpublic Queue mqQueue() {// 普通队列绑定我们的死信交换机Map<String, Object> arguments = new HashMap<>(2);//死信交换机arguments.put("x-dead-letter-exchange", dlxExchange);//死信队列arguments.put("x-dead-letter-routing-key", dlxRoutingKey);return new Queue(mqQueue, true, false, false, arguments);}/*** 绑定死信队列到死信交换机* @return Binding*/@Beanpublic Binding binding(Queue dlxQueue,DirectExchange dlxExchange) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);}/*** 绑定普通队列到普通交换机* @return Binding*/@Beanpublic Binding mqBinding(Queue mqQueue,DirectExchange mqExchange) {return BindingBuilder.bind(mqQueue).to(mqExchange).with(mqRoutingKey);}
}

2.2 生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;/*** 生产者*/
@RestController
@Slf4j
public class MQProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 普通交换机*/@Value("${mq.exchange}")private String mqExchange;/*** 普通路由key*/@Value("${mq.routingKey}")private String mqRoutingKey;@RequestMapping("/sendMsg")public String sendMsg() {String msg = "Hello RabbitMQ ......";//发送消息  参数一:交换机 参数二:路由键(用来指定发送到哪个队列)rabbitTemplate.convertAndSend(mqExchange, mqRoutingKey, msg, message -> {// 设置消息过期时间 10秒过期    如果过期时间内还没有被消费 就会发送给死信队列message.getMessageProperties().setExpiration("10000");return message;});log.info("生产者发送消息:{}", msg);return "success";}
}

2.3 消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Component
@Slf4j
public class MQConsumer {/*** 监听队列回调的方法** @param msg*/@RabbitListener(queues = {"${mq.queue}"})public void mqConsumer(String msg) {log.info("正常普通消费者消息MSG:{}", msg);}
}

2.4 死信消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 死信消费者*/
@Component
@Slf4j
public class MQDlxConsumer {/*** 死信队列监听队列回调的方法** @param msg*/@RabbitListener(queues = {"${mq.dlx.queue}"})public void mqConsumer(String msg) {log.info("死信队列消费普通消息:msg{}", msg);}}

2.5 结果

访问:http://127.0.0.1:9023/sendMsg 会被 消费者 消费掉
消费者 代码注释掉,在访问http://127.0.0.1:9023/sendMsg,等待10秒钟后会被死信队列接收到。
在这里插入图片描述

六、延时队列Demo

  • 两种方式:
    • 第一种:使用死信队列,将消息放入一个没有被监听的队列上,设置TTL(一条消息的最大存活时间)为延迟的时间,时间到了没有被消费,直接成为死信。监听死信队列来进行操作。
    • 第二种:使用rabbitmq官方提供的delayed插件来真正实现延迟队列。本文对第二种进行详解

1 安装延迟插件

官网下载:https://www.rabbitmq.com/community-plugins.html
我的RabbitMQ是3.12 b版本的,下载此插件
在这里插入图片描述

1.1 下载插件

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez

在这里插入图片描述

1.2 将插件拷贝到RabbitMQ容器的插件目录

docker cp ./rabbitmq_delayed_message_exchange-3.12.0.ez de24369edeb4:/plugins

在这里插入图片描述

1.3 进入到容器

docker exec -it de24369edeb4 /bin/bash

1.4 开启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

1.5 查看

rabbitmq-plugins list

E* 或 e* 代表 插件已启用
在这里插入图片描述
在RabbitMQ控制台可以看到
在这里插入图片描述

2 示例

2.1 配置

  • 配置文件
# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin# 自定义一个属性,设置队列的名称
mq.queue.name=hello-queue
  • 配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 使用x-delayed-message 延时队列插件*/
@Configuration
public class QueueConfig {@Value("${mq.queue.name}")private String queueName;/*** 初始化短信队列* @return*/@Beanpublic Queue delayedSmsQueueInit() {return new Queue(queueName);}/*** 初始化延迟交换机* @return*/@Beanpublic CustomExchange delayedExchangeInit() {Map<String, Object> args = new HashMap<>();// 设置类型,可以为fanout、direct、topicargs.put("x-delayed-type", "direct");// 第一个参数是延迟交换机名字,第二个是交换机类型,第三个设置持久化,第四个设置自动删除,第五个放参数return new CustomExchange("delayed_exchange","x-delayed-message", true,false,args);}/*** 短信队列绑定到交换机* @param delayedSmsQueueInit* @param customExchange* @return*/@Beanpublic Binding delayedBindingSmsQueue(Queue delayedSmsQueueInit, CustomExchange customExchange) {// 延迟队列绑定延迟交换机并设置RoutingKey为smsreturn BindingBuilder.bind(delayedSmsQueueInit).to(customExchange).with("sms").noargs();}
}

2.2 生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** 生产者*/
@RestController
@Slf4j
public class Sender {@Autowiredprivate AmqpTemplate template;@Value("${mq.queue.name}")private String queueName;// 发送消息@RequestMapping("/sendMsg")public void send(){String msg = "Hello RabbitMQ ......";// 队列名,消息内容template.convertAndSend(queueName,msg);log.info("生产者发送消息:{}", msg);}@RequestMapping("/sendDelayedMsg")public void sendDelayedMsg(){String msg = "Hello RabbitMQ Delayed ......";// 第一个参数是延迟交换机名称,第二个是Routingkey,第三个是消息主题,第四个是X,并设置延迟时间,单位		是毫秒template.convertAndSend("delayed_exchange","sms",msg,a -> {a.getMessageProperties().setDelay(2000);return a;});log.info("生产者发送延时消息:{}", msg);}
}

2.3 消费产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Component
@Slf4j
public class Receiver {// 接受MQ消息 并 处理消息@RabbitListener(queues = {"${mq.queue.name}"})public void process(String msg){// 处理消息log.info("我是MQ消费者,我接收到的消息是:{}", msg);}
}

2.4 结果

访问:http://127.0.0.1:9022/sendMsg
访问:http://127.0.0.1:9022/sendDelayedMsg
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/262179.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【非常详细!】QT基础【二万字长文】

&#x1f308;个人主页&#xff1a;godspeed_lucip &#x1f525; 系列专栏&#xff1a;QT从基础到进阶 1 QMake2 Qt中三个窗口部件的区别2.1 QMainWindow2.2 QWidget2.3 QDialog 3 Visual Studio的QT项目与QtCreater项目相互转换3.1 QtCreater项目转VS项目3.2 VS项目转QtCreat…

百度地图海量点方案趟坑记录(百度地图GL版 + MapVGL + vue3 + ts)

核心需求描述 不同层级有不同的海量图标展示底层海量图标需要展示文字拖动、放大缩小都需要重新请求数据并展示固定地图中心点&#xff08;拖动、放大缩小&#xff0c;中心点始终在地图中心&#xff09; 示例图片&#xff1a;&#xff08;某些图片涉及公司数据&#xff0c;就未…

苍穹外卖Day02——总结2

前期文章 文章标题地址苍穹外卖Day01——总结1https://blog.csdn.net/qq_43751200/article/details/135466359?spm1001.2014.3001.5501苍穹外卖Day01——解决总结1中存在的问题https://lushimeng.blog.csdn.net/article/details/135473412 总结2 前期文章1. 新增员工模块1.1 …

初阶数据结构之---顺序表和链表(C语言)

引言-线性表 线性表&#xff1a; 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构。线性表在逻辑上是线性结构&#xff0c;也就是说是连续的一条直线。但在物理上并不一定是连续的。线性表在物理上…

Django学习笔记-创建第一个django项目

1.创建一个虚拟环境的python项目 2.点击解释器设置 3.安装django包 4.终端选择Command Prompt 5.创建django项目运行django-admin startproject demo01(自命名) 6.修改连接数据库为mysql 7.修改语言(中国汉语)和时区(亚洲上海)USE_TZ改为False,否则时区不生效 8.修改TEMPLA…

并发List、Set、ConcurrentHashMap底层原理

并发List、Set、ConcurrentHashMap底层原理 ArrayList: List特点&#xff1a;元素有放入顺序&#xff0c;元素可重复 存储结构&#xff1a;底层采用数组来实现 public class ArrayList<E> extends AbstractList<E>implements List<E>, RandomAccess, Clon…

C 语言基本语法及实用案例分享

一、什么是 C 语言&#xff1f; C语言是一种较早的程序设计语言&#xff0c;诞生于1972年的贝尔实验室。1972 年&#xff0c;Dennis Ritchie 设计了C语言&#xff0c;它继承了B语言的许多思想&#xff0c;并加入了数据类型的概念及其他特性。C语言是一门面向过程的计算机编程语…

Unity NavMesh 清除不可行走区域

通常场景中物体设置为static或Navigation Static后&#xff0c;打开Navigation使用默认设置烘焙NavMesh&#xff0c;模型顶部和底部会出现蓝色网格&#xff0c;但其中有部分属于不可能到达区域&#xff0c;如下图 本文介绍两种可去掉NavMesh中不需要网格的方法&#xff1a; 方…

OpenCV运行gstreamer管道获取相机数据,处理以后,再交给gstreamer显示(QT实现)

效果: 前言 无意中发现,OpenCV也可以运行gstreamer的命令管道,然后使用appsink来与OpenCV连接起来进行处理,在不断测试之下,先后实现了以下功能: 1. OpenCV运行gstreamer命令,通过appsink传递给OpenCV显示 2. OpenCV运行gstreamer命令,然后再把Mat图像数据通过appsrc传…

(3)(3.6) 用于OpenTX的Yaapu遥测脚本

文章目录 前言 1 安装和操作 2 参数说明 前言 这是一个开源 LUA 脚本&#xff0c;用于在使用 OpenTX 2.2.3 的 Horus X10、X12、Jumper T16、T18、Radiomaster TX16S、Taranis X9D、X9E、QX7 和 Jumper T12 无线电设备上显示 FrSky 的直通遥测数据(FrSky passthrough telem…

解决IntelliJ IDEA 2023版本创建Spring项目时Java只能选择17或21的问题

问题描述&#xff1a; 当使用IntelliJ IDEA2023版本中Spring Initializr新建Spring项目时&#xff0c;即使JDK配置项为1.8&#xff0c;Java配置项仍然只能选17或21. 在JDK为1.8版本情况下&#xff0c;Java选择17或21&#xff0c;点击NEXT按钮&#xff0c;则会弹窗提示SDK不支持…

航空航天5G智能工厂数字孪生可视化平台,推进航空航天数字化转型

航空航天5G智能工厂数字孪生可视化平台&#xff0c;推进航空航天数字化转型。随着科技的不断发展&#xff0c;数字化转型已经成为各行各业关注的焦点。航空航天业作为高端制造业的代表&#xff0c;也在积极探索数字化转型之路。为了更好地推进航空航天数字化转型&#xff0c;一…

安卓游戏开发之音频技术优劣分析

一、引言 在安卓游戏开发中&#xff0c;音频处理技术扮演着至关重要的角色&#xff0c;它不仅能够增强游戏的沉浸感和玩家体验&#xff0c;还能通过声音效果传达关键的游戏信息。以下将对几种常见的安卓游戏音频处理技术进行优劣分析&#xff0c;并结合应用场景来阐述其特点。 …

十九、图像的放缩和插值

项目功能实现&#xff1a;对一张图像进行放大和缩小操作 按照之前的博文结构来&#xff0c;这里就不在赘述了 一、头文件 resizing.h #pragma once#include<opencv2/opencv.hpp>using namespace cv;class RESIZING { public:void resizing(Mat& image); };#pragma…

【Linux】进程状态

进程状态 进程状态的简要介绍运行状态进程排队 阻塞状态挂起状态Linux中的进程状态 进程状态的简要介绍 进程状态指的是一个操作系统中正在运行的进程当前所处的状态。根据不同的操作系统&#xff0c;进程状态可能会有一些细微的差别&#xff0c;但最主要的是以下三种状态 运行…

电路设计(26)——速度表的multisim仿真

1.设计要求 设计一款电路&#xff0c;能够实时显示当前速度。 用输入信号模拟行驶的汽车&#xff0c;信号频率的1hz代表汽车速度的1m/s。最后速度显示&#xff0c;以km/h为单位。 2.电路设计 当输入信号频率为40HZ时&#xff0c;显示的速度应该为144KM/h&#xff0c;仿真结果为…

IP地址证书详解

IP地址证书是一种特殊的SSL/TLS证书&#xff0c;它直接绑定到服务器的公网IP地址而不是域名&#xff0c;这种类型的证书特别适合于那些没有域名只有公网IP或者不方便使用域名的企业或个人。证书允许通过特定的IP地址访问的Web服务提供HTTPS加密连接&#xff0c;确保在没有使用域…

深入理解flinksql执行流程,calcite与catalog相关概念,扩展解析器实现语法的扩展

深入理解Flink Sql执行流程 1 Flink SQL 解析引擎1.1SQL解析器1.2Calcite处理流程1.2.1 SQL 解析阶段&#xff08;SQL–>SqlNode&#xff09;1.2.2 SqlNode 验证&#xff08;SqlNode–>SqlNode&#xff09;1.2.3 语义分析&#xff08;SqlNode–>RelNode/RexNode&#…

EfficientNet环境搭建网络修改

引子 在深度学习CV领域&#xff0c;最初2012年突破的就是图像分类&#xff0c;发展这么多年&#xff0c;基本上已经没有什么进展了。此篇作为之前EfficientNet挽留过的总结&#xff0c;现在整理下&#xff0c;OK&#xff0c;让我们开始吧。 一、EfficientNet安装 1、pytorch…

Window部署Exceptionless

Exceptionless Elasticsearch 版本&#xff1a; Exceptionless&#xff1a;8.1.0 Elasticsearch&#xff1a;7.17.5 JDK&#xff1a;11.0.10 目录 一、Elasticsearch运行 二、 Exceptionless 一、Elasticsearch运行 bin目录下elasticsearch.bat 直接运行 访问 http://lo…