Springboot整合RabbitMq,详细使用步骤

Springboot整合RabbitMq,详细使用步骤

    • 1 添加springboot-starter依赖
    • 2 添加连接配置
    • 3 在启动类上添加开启注解`@EnableRabbit`
    • 4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。
    • 5 生产者推送消息
    • 6 消费者接收消息
    • 7 生产者的消息回调机制
    • 8 消费者的确认机制

消息队列(Message Queue)是一种应用间的通信方式。顾名思义,将消息放到队列中,排队发出。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

而且消息队列一般有完整的接收确认,发布消息回调等一系列机制,可以确保接收方一定能接受。

用到的场景如:异步处理,应用解耦,流量削锋和消息通讯等。

以下先详细介绍下springboot项目怎么使用RabbitMq

1 添加springboot-starter依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2 添加连接配置

以下几项是最基础的配置,其他配置下面用到时额外添加

spring:rabbitmq:host: 127.0.0.1port: 5672username: guest #默认用户名和密码password: guestvirtual-host: /  # 虚拟主机

3 在启动类上添加开启注解@EnableRabbit

4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。

可以直接在java代码中通过注入实体类的方式创建交换机及队列等设备。但此方式添加的’设备‘是懒加载的形式,只要当使用到识别到监听注解或调用发送消息的方法时,才会真在rabbitmq中创建。

可以定位到amqp依赖的源码看到在程序启动的时候并不创建连接,只有在添加了监听注解启动程序或要发送消息时,才会走创建连接的方法。

配置类的示例代码如下:

@Configuration
public class RabbitConfig {/*** 队列*/@BeanQueue createDirectQueue(){/*** durable:是否持久化,默认是false。true为持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在;false为暂存队列:当前连接有效。* exclusive:默认也是false。true是只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable。* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。* 一般设置一下队列的持久化就好,其余两个就是默认false*///两种创建方式//QueueBuilder.durable("queue.test1").build();return new Queue("queue.test1",true,false,false);}/***  交换机*/@BeanDirectExchange createDirectExchange(){/*** durable、autoDelete参数性质和上面队列的一致*/return new DirectExchange("direct.test1",true,false);}/*** 将队列和交换机绑定, 并设置用于匹配键*/@BeanBinding binding(){return BindingBuilder.bind(createDirectQueue()).to(createDirectExchange()).with("testRoute");}}

以上是以直连交换机为例,创建其他交换机写法一样,具体对应哪个实体类可以在Exchange接口 —>AbstractExchange实现类下看到。

在这里插入图片描述

可以通过客户端看到队列、交换机、路由关系已经创建成功

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5 生产者推送消息

@Autowired
RabbitTemplate rabbitTemplate;@PostMapping("/sendMessage")
public AjaxResult sendMessage(@RequestBody Map params) {String id = UUID.randomUUID().toString();String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));params.put("messageId",id);params.put("createTime",createTime);/*** 发给交换机,在发给路由绑定的队列*/rabbitTemplate.convertAndSend("direct.test1","testRoute",params);return AjaxResult.success("成功");
}

可以看到,rabbitmq成功接收到消息。

在这里插入图片描述
在这里插入图片描述

6 消费者接收消息

@Component
@RabbitListener(queues = "queue.test1")
public class Receiver {@RabbitHandlerpublic void process(Map message){System.out.printf("消费者接收到消息:" + message.toString());}
}

可以看到消息成功被消费,监听处理方法也成功被执行

在这里插入图片描述

​ 如果多个监听器监听同一个队列,是轮询的方式进行消费,不会出现重复消费的情况;如果多个队列同时以相同的路由绑定同一个交换机,消息会以复制的形式发送至每个队列。

7 生产者的消息回调机制

在实际运用中,作为消息的生产者,很多时候我们需要确认消息是否成功发送到了mq中。同时我们还需要知道,假如消息出现异常时的异常情况。为了满足这个业务场景,我们就需要配置消息回调。

  • 增加配置项

    spring:rabbitmq:publisher-confirm-type: correlated #消息发送成功交互publisher-returns: true
    

    可能之前老的版本是publisher-confirm:true,但现在写的话会发现变红了,说明过时了。因为在springboot的自动配置依赖里该配置级别已经为error

在这里插入图片描述

  • 目前回调包含发送成功回调ConfirmCallback和失败回调ReturnsCallback。一些老版本的可能有ReturnCallback。下面先自定义两个回调的回调方法

    ConfirmCallback的回调

    /*** 消息发送成功回调*/
    public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback {/*** 消息成功到达exchange,ack=true* @param correlationData* @param ack* @param s*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {System.out.println("相关数据:" + correlationData);System.out.println("确认状态:" + ack);System.out.println("造成原因:" + s);}
    }
    

    ReturnsCallback的回调

    /*** 发生异常时的消息返回提醒*/
    public class RabbitReturnsCallback implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("失败回调:" + returnedMessage);}
    }
    

    将自定义回调配置到模板中

    在Rabbit配置类中添加RabbitTemplate并配置两个回调

    @Configuration
    public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());rabbitTemplate.setReturnsCallback(new RabbitReturnsCallback());return rabbitTemplate;}
    }
    

    那以上两种回调函数什么时候回执行呢?

    1. 消息发送到exchange,且传播到队列,则只有ConfirmCallback回调,ack=true
    2. 消息发送不到exchange,则只有ConfirmCallback回调,ack=false
    3. 消息发送到exchange,没传播到队列(或找不到路由),则ConfirmCallback回调,ack=true、ReturnsCallback回调

由此可见ConfirmCallback回调是exchange的一种反馈,是发生在生产者和交换机之间的,无论能不能发到都会回调。消息发送出去如果收到交换机的确认反馈则回调为成功,如果没有收到确认反馈,则回调为失败。

ReturnsCallback回调是队列的一种反馈,是发生在交换机和队列之间的。只有消息先到达交换机,且发送到队列失败才会执行此回调。

下面是对以上三种情况的测试

  • 消息完全成功发送到队列

    模拟:交换机和路由都存在

    @PostMapping("/sendMessage")
    public AjaxResult sendMessage(@RequestBody Map params) {String id = UUID.randomUUID().toString();String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));params.put("messageId",id);params.put("createTime",createTime);//direct.test1和testRoute都存在rabbitTemplate.convertAndSend("direct.test1","testRoute",params);return AjaxResult.success("成功");
    }
    

    消费者监听且ConfirmCallback回调为true
    在这里插入图片描述

  • 消息没有发送到exchange

    模拟:交换机不存在

    @PostMapping("/sendMessageFailByNoExchange")public AjaxResult sendMessageFailByNoExchange(@RequestBody Map params) {String id = UUID.randomUUID().toString();String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));params.put("messageId",id);params.put("createTime",createTime);//该交换机不存在rabbitTemplate.convertAndSend("direct.exchange不存在","testRoute",params);return AjaxResult.success("成功");}
    

    ConfirmCallback回调为false
    在这里插入图片描述

  • 消息发送到exchange,但没发送到队列

    模拟:该交换机存在但该路由不存在

    @PostMapping("/sendMessageFailByNoRoute")
    public AjaxResult sendMessageFailByNoRoute(@RequestBody Map params) {String id = UUID.randomUUID().toString();String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));params.put("messageId",id);params.put("createTime",createTime);//交换机存在但该路由不存在rabbitTemplate.convertAndSend("direct.test1","failRoute",params);return AjaxResult.success("成功");
    }
    

    ConfirmCallback回调为true,ReturnsCallback失败回调执行
    在这里插入图片描述

可以通过两个回调确定哪些消息没有成功发送到队列,记录下来再次发送,保证消息不丢失。

8 消费者的确认机制

消费者和生产者不同,消费者本身就是凭自己喜好,符合条件才会消费。

所有消费者的确认机制有三种模式:

  1. 自动确认

    是默认的消息确认模式,即mq成功将消息发出,消费者成功接收到,就反馈确认。不管消费者是不是已经成功处理。

    所以如果处理逻辑抛出异常,就相当于丢失了消息。

    一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

  2. 手动确认

  • 自动确认

    自动确认没什么好说的,消费者确认机制的默认模式就是auto,自动反馈确认,所以可以看到只要消息被消费了队列中就不存在了。

  • 手动确认

    消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。

    • basic.ack:确认正确
    • basic.nack:拒绝确认,可以选择是否重新发回队列、是否批处理
    • basic.reject:拒绝确认,可以选择是否重新发回队列

    后两者对应的方法为channel.basicNackchannel.basicReject两者都表示消息没有被正常处理。其中有个参数requeue,选择是否重新入队,开启此项可以避免消息丢失。

    但开启要慎重,如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,导致消息积压。

    两者有略微的区别channel.basicNack可以拒绝多个消息,channel.basicReject只能拒绝一个

    下面看下代码怎么实现

    如果使用的是RabbitListener注解,需要将ackMode设置为手动模式ackMode="MANUAL"

    三个种情况分别对应下面 【1、2、3】三个方法

    @RabbitHandler@RabbitListener(queues = "queue.test1",ackMode = "MANUAL")public void processQueueTest1(Map param, Message message, Channel channel) throws IOException {/*** 【1 确认】* deliveryTag:消息的标识符* multiple:*     false:仅确认当前消息*     true:确认所有消息*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);/***  【2 拒绝】*  第一个参数是消息的唯一ID*  第二个参数表示是否批量处理*  第三个参数表示是否将消息重发回队列*///channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);/*** 【3 拒绝】* 第一个参数deliveryTag表示消息ID* 第二个参数为true表示是否重新入列,如果是true则重新丢回队列里等待再次消费,否则数据只是被消费,不会丢回队列里*///channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);System.out.println("queue.test1消费者接收到消息:" + param.toString());System.out.println("message:" + message);System.out.println("channel:" + channel);}
    

    channel.basicAck确认

    以上代码中channel.basicAck是消费者向rabbitmq发送确认消息。向queue.test1队列发送消息,此时开启了手动确认,如果不写此行,队列中会一直存在一条Unacked(未确认)的消息。
    在这里插入图片描述
    执行了channel.basicAck消息才会被消费,如下图已经无滞留消息。
    在这里插入图片描述
    channel.basicNack、channel.basicReject否认
    可以看到拒绝消息之后,因为requeue参数为true,消息会被重新入队,入队后再次等待被消费者消费。如果requeue设为false的话则队列中该消息就是已经被消费。一般情况可以单独记录下,在轮询发送到队列。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/92165.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nodejs+vue+elementui招聘求职网站系统的设计与实现-173lo

&#xff08;1&#xff09;管理员的功能是最高的&#xff0c;可以对系统所在功能进行查看&#xff0c;修改和删除&#xff0c;包括企业和用户功能。管理员用例如下&#xff1a; 图3-1管理员用例图 &#xff08;2&#xff09;企业关键功能包含个人中心、岗位类型管理、招聘信息…

分布式 - 消息队列Kafka:Kafka消费者和消费者组

文章目录 1. Kafka 消费者是什么&#xff1f;2. Kafka 消费者组的概念&#xff1f;3. Kafka 消费者和消费者组有什么关系&#xff1f;4. Kafka 多个消费者如何同时消费一个分区&#xff1f; 1. Kafka 消费者是什么&#xff1f; 消费者负责订阅Kafka中的主题&#xff0c;并且从…

7.5.tensorRT高级(2)-RAII接口模式下的生产者消费者多batch实现

目录 前言1. RAII接口模式封装生产者消费者2. 问答环节总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程&#xff0c;之前有看过一遍&#xff0c;但是没有做笔记&#xff0c;很多东西也忘了。这次重新撸一遍&#xff0c;顺便记记笔记。 本次课程学习 tensorRT 高级-RAI…

【福建事业单位-公共基础-】01哲学基本概述和唯物论

【福建事业单位-公共基础-】01哲学基本概述和唯物论 一、哲学基本概述二、辩证唯物论&#xff08;1题&#xff09; 相关考点 一、哲学基本概述 向导、导向、指导&#xff0c;都是中性词&#xff0c;都可以&#xff1b;但是先导是褒义词&#xff0c;要跟上真正的哲学&#xff1…

gitee上传一个本地项目到一个空仓库

gitee上传一个本地项目到一个空仓库 引入 比如&#xff0c;你现在本地下载了一个半成品的框架&#xff0c;现在想要把这个本地项目放到gitee的仓库上&#xff0c;这时就需要我们来做到把这个本地项目上传到gitee上了。 具体步骤 1. 登录码云 地址&#xff1a;https://gite…

tkinter自定义控件:通过继承Frame实现Expander

文章目录 继承Frame点击事件Add函数 tkinter系列&#xff1a; GUI初步&#x1f48e;布局&#x1f48e;绑定变量&#x1f48e;绑定事件&#x1f48e;消息框&#x1f48e;文件对话框Frame控件&#x1f48e;PanedWindow和notebook控件扫雷小游戏&#x1f48e;强行表白神器 和其他…

Keil开发STM32单片机项目的三种方式

STM32单片机相比51单片机&#xff0c;内部结构复杂很多&#xff0c;因此直接对底层寄存器编码&#xff0c;相对复杂&#xff0c;这个需要我们了解芯片手册&#xff0c;对于复杂项目&#xff0c;这些操作可能需要反复编写&#xff0c;因此出现了标准库的方式&#xff0c;对寄存器…

嵌入式编译FFmpeg6.0版本并且组合x264

下载直通车:我用的是6.0版本的 1.准备编译: 2.进入ffmpeg源码目录&#xff0c;修改Makefile&#xff0c;添加编译选项&#xff1a; CFLAGS -fPIC 不加会报错 3.使用命令直接编译 ./configure --cross-prefix/home/xxx/bin/arm-linux-gnueabihf- --enable-cross-compile --targ…

【Redis】Redis三种集群模式-主从、哨兵、集群各自架构的优点和缺点对比

文章目录 前言1. 单机模式2. 主从架构3. 哨兵4. 集群模式总结 前言 如果Redis的读写请求量很大&#xff0c;那么单个实例很有可能承担不了这么大的请求量&#xff0c;如何提高Redis的性能呢&#xff1f;你也许已经想到了&#xff0c;可以部署多个副本节点&#xff0c;业务采用…

上传excel文件

文件上传&#xff0c;其实就是用el-upload组件来实现上传&#xff0c;只是换了样式&#xff0c;和图片上传一样 <el-form-item label"选择文件"><el-input placeholder"请选择文件" v-model"form.file" disabled style"width: 45…

Hlang社区项目说明

文章目录 前言Hlang社区技术前端后端 前言 Hello,欢迎来到本专栏&#xff0c;那么这也是第一次做这种类型的专栏&#xff0c;如有不做多多指教。那么在这里我要隆重介绍的就是这个Hlang这个项目。 首先&#xff0c;这里我要说明的是&#xff0c;我们的这个项目其实是分为两个…

大疆秋招指南,网申测评和面试攻略

大疆秋招内容简介 这是一个非常卷的时代&#xff0c;一到毕业季&#xff0c;各种各样规模不一的公司&#xff0c;纷纷向社会招聘&#xff0c;竞争实力强&#xff0c;知名度越高的企业&#xff0c;往往越能得到能力出众的人才的青睐&#xff0c;也正是在一批批新血液的注入下&a…

考研算法45天:首字母大写 【字符串:简单】

题目前置知识 如何使用scanf输入一个有空格的字符串 如何输入带空格的字符串_我码了的博客-CSDN博客 scanf("%[^\n]",str); 如何用ascll码将字符串的小写换为大写 char a; a a - 32; 题目概况 AC代码 #include <iostream> using namespace std;int main()…

Prometheus的搭建与使用

一、安装Prometheus 官网下载地址&#xff1a;Download | Prometheus 解压&#xff1a;tar -zxvf prometheus-2.19.2.linux-amd64.tar.gz重命名&#xff1a; mv prometheus-2.19.2.linux-amd64 /home/prometheus进入对应目录&#xff1a; cd /home/prometheus查看配置文件&am…

vim键盘图

国外&#xff1a;http://www.viemu.com/a_vi_vim_graphical_cheat_sheet_tutorial.html&#xff0c;原创&#xff0c;有SVG图&#xff0c;有分步骤的图。 国内翻译&#xff1a;[https://blog.csdn.net/qq_41052753/article/details/101031847 有几个配色&#xff0c;很高清&…

使用 flatMap 进行扁平化映像处理数据

实战背景 &#xff1a; 小伙伴遇到了数据处理方面的问题如下 &#xff1a; 只能说看到这里我也一头雾水&#xff0c;毕竟我也是菜&#x1f436;&#xff0c;那就请教大佬吧 &#xff1a; Map.flat 循环 二维 变 一维 就是 flatMap 了 啊这&#xff0c;&#xff0c;但是 flatM…

Mysql中使用存储过程插入decimal和时间数据递增的模拟数据

场景 Mysql插入数据从指定选项中随机选择、插入时间从指定范围随机生成、Navicat使用存储过程模拟插入测试数据&#xff1a; Mysql插入数据从指定选项中随机选择、插入时间从指定范围随机生成、Navicat使用存储过程模拟插入测试数据_mysql循环插入随机数据_霸道流氓气质的博客…

企业权限管理(十五)-方法级别权限控制

方法级别权限控制 jsr-250 3.Secured注解使用 开启表达式的使用 页面控制 显示xxx在线 <div class"pull-left info"><p><security:authentication property"principal.username"></security:authentication></p><a h…

【管理运筹学】第 5 章 | 整数规划 (1,问题提出与分支定界法)

文章目录 引言一、整数规划问题的提出1.1 整数规划的数学模型1.2 整数规划问题的求解 二、分支定界法2.1 分支与定界2.2 基本求解步骤&#xff08;一&#xff09;初始化&#xff08;二&#xff09;分支与分支树&#xff08;三&#xff09;定界与剪枝&#xff08;四&#xff09;…

el-tree通过default-expand-all动态控制展开/折叠

1、如下图通过勾选框动态控制展开/折叠&#xff0c;全选/清空 2、实现方式如下&#xff1a;定义key&#xff0c;监听checked2修改treeKey&#xff0c;重新渲染tere&#xff1b;附加全选和清空。 <div class"tree"><el-checkbox v-model"checked1"…