Spring Boot整合RabbitMQ之路由模式(Direct)

RabbitMQ中的路由模式(Direct模式)应该是在实际工作中运用的比较多的一种模式了,这个模式和发布与订阅模式的区别在于路由模式需要有一个routingKey,在配置上,交换机类型需要注入DirectExchange类型的交换机bean对象。在交换机和队列的绑定过程中,绑定关系需要在绑定一个路由key。由于在实际的工作中不大可能会用自动确认的模式,所以我们在整合路由模式的过程中,依然采用发送消息双确认机制和消费端手动确认的机制来保证消息的准确送达与消息防丢失。

1. 添加配置

在配置文件中,配置rabbitmq的相关账号信息,开启消息发送回调机制,配置文件其实和发布订阅模式是一样的。配置详情如下:

server:port: 10001spring:application:name: springboot-rabbitmq-s1rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: adminpassword: admin# 发送者开启 return 确认机制publisher-returns: true# 发送者开启 confirm 确认机制publisher-confirm-type: correlated

2. 创建配置类

    创建配置类RabbitMQConfig,用于声明交换机、队列,建立队列和交换机的绑定关系,注入RabbitTemplate的bean对象。配置类详情如下:
package com.study.rabbitmq.config;import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author alen* @DATE 2022/6/7 23:50*/
@Slf4j
@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_NAME = "direct-order-exchange";public static final String SMS_QUEUE = "sms-direct-queue";public static final String EMAIL_QUEUE = "email-direct-queue";public static final String WECHAT_QUEUE = "wechat-direct-queue";/*** 1.* 声明交换机* @return*/@Beanpublic DirectExchange directExchange() {/*** directExchange的参数说明:* 1. 交换机名称* 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除* 3. 是否自动删除 false:不自动删除 true:自动删除*/return new DirectExchange(EXCHANGE_NAME, true, false);}/*** 2.* 声明队列* @return*/@Beanpublic Queue smsQueue() {/*** Queue构造函数参数说明* 1. 队列名* 2. 是否持久化 true:持久化 false:不持久化*/return new Queue(SMS_QUEUE, true);}@Beanpublic Queue emailQueue() {return new Queue(EMAIL_QUEUE, true);}@Beanpublic Queue wechatQueue() {return new Queue(WECHAT_QUEUE, true);}/*** 3.* 队列与交换机绑定*/@Beanpublic Binding smsBinding() {return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");}@Beanpublic Binding emailBinding() {return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");}@Beanpublic Binding wechatBinding() {return BindingBuilder.bind(wechatQueue()).to(directExchange()).with("wechat");}/*** 将自定义的RabbitTemplate对象注入bean容器** @param connectionFactory* @return*/@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启消息推送结果回调rabbitTemplate.setMandatory(true);//设置ConfirmCallback回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("==============ConfirmCallback start ===============");log.info("回调数据:{}", correlationData);log.info("确认结果:{}", ack);log.info("返回原因:{}", cause);log.info("==============ConfirmCallback end =================");}});//设置ReturnCallback回调rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("==============ReturnCallback start ===============");log.info("发送消息:{}", JSONUtil.toJsonStr(message));log.info("结果状态码:{}", replyCode);log.info("结果状态信息:{}", replyText);log.info("交换机:{}", exchange);log.info("路由key:{}", routingKey);log.info("==============ReturnCallback end =================");}});return rabbitTemplate;}
}

3. 消费者配置

    在消费者项目的配置文件中开启手动确认,配置详情如下:
server:port: 10002spring:application:name: springboot-rabbitmq-s2rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: adminpassword: adminlistener:simple:# 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 autoacknowledge-mode: manual

4. 创建消费者

分别创建三个消费者,DirectEmailConsumer、DirectSmsConsumer、DirectWechatConsumer来监听对应的队列,有消息后进行消费,三个消费者大同小异,分别如下

4.1 DirectEmailConsumer

package com.study.rabbitmq.service.direct;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;/*** @Author alen* @DATE 2022/6/10 22:54*/
@Slf4j
@Service
@RabbitListener(queues = {"email-direct-queue"}) //监听队列
public class DirectEmailConsumer {//标记消费者逻辑执行方法@RabbitHandlerpublic void emailMessage(String msg, Channel channel, Message message) throws IOException {try {log.info("Email direct --接收到消息:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");//basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");// basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}
}

4.2 DirectSmsConsumer

package com.study.rabbitmq.service.direct;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;/*** @Author alen* @DATE 2022/6/10 22:55*/
@Slf4j
@Service
@RabbitListener(queues = {"sms-direct-queue"}) //监听队列
public class DirectSmsConsumer {@RabbitHandlerpublic void smsMessage(String msg, Channel channel, Message message) throws IOException {try {log.info("sms direct --接收到消息:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");//basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");// basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}
}

4.3 DirectWechatConsumer

package com.study.rabbitmq.service.direct;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;/*** @Author chaoxian.wu* @DATE 2022/6/10 22:55*/
@Slf4j
@Service
@RabbitListener(queues = {"wechat-direct-queue"}) //监听队列
public class DirectWechatConsumer {@RabbitHandlerpublic void wechatlMessage(String msg, Channel channel, Message message) throws IOException {try {log.info("wechat direct --接收到消息:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");//basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");// basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}
}

以上就是全部的代码部分,接下来我们在进入测试,看看实际效果如何,先发布一个routingKey=sms的消息,查看是不是只有对应的一个队列中接收到消息,消息发送详情:

package com.study.rabbitmq;import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.UUID;@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {@Autowiredprivate OrderService orderService;@Testvoid contextLoads() {for (long i = 1; i < 2; i++) {//交换机名称String exchangeName = "direct-order-exchange";//路由keyString routingKey = "sms";Order order = buildOrder(i);orderService.createOrder(order, routingKey, exchangeName);}}private Order buildOrder(long id) {Order order = new Order();order.setRequestId(id);order.setUserId(id);order.setOrderNo(UUID.randomUUID().toString());order.setAmount(10L);order.setGoodsNum(1);order.setTotalAmount(10L);return order;}
}

我们登录rabbitmq管理后台查看下,只有sms-direct-queue这个队列有一条消息,效果如下:

我们启动消费者,看下是不是只有监听了sms-direct-queue这个队列的消费者有消费日志,效果如下:

再发一条routingKey=email的消息,消费的日志,效果图示如下

到此其实已经springboot整合rabbitmq的路由模式结束了,这种模式在工作中还是比较常见的,我们演示的是单点的效果,实际工作中,不大可能会使用服务单点部署,现在都讲究服务的高可用,就得服务集群部署,又会涉及到消息重复消费的问题需要处理,我个人觉得,遇到重复消费问题,我第一时间想到的就是分布式锁,哈哈~。但是锁什么呢?肯定是消息中的具备唯一性的属性。来达到防止消息的重复消费。

整个过程中,其实还存在一个小问题没有验证,就是ReturnCallback回调机制没有触发,因为这个得发生在交换机将消息发送到队列的时候失败才会触发,那么我们就发送一个不存在的routingKey就可以触发了,我们发送一个routingKey=duanxin的消息,这个肯定不会发送成功,我们通过断点来看看效果,效果如下:

然后我们常见的就全部整合完成了,当然,开启了双确认机制,虽然我们可以检测到消息投送的结果,然后可以针对投送失败的结果进行预警。但是开启了这个操作,就必然会对消息的处理效率产生影响。所以还得根据实际业务场景而定是否需要使用这个确认机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/107200.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JMeter性能测试(上)

一、基础简介 界面 打开方式 双击 jmeter.bat双击 ApacheJMeter.jsr命令行输入 java -jar ApacheJMeter.jar 目录 BIN 目录&#xff1a;存放可执行文件和配置文件 docs目录&#xff1a;api文档&#xff0c;用于开发扩展组件 printable-docs目录&#xff1a;用户帮助手册 li…

docker harbor私有库

目录 一.Harbor介绍 二.Harbor的特性 三.Harbor的构成 四.Harbor构建Docker私有仓库 4.2在Server主机上部署Harbor服务&#xff08;192.168.158.25&#xff09; 4.2.1 这时候这边就可以去查看192.168.158.25网页 4.3此时可真机访问serverIP 4.4通过127.0.0.1来登陆和推送镜…

vue 中 axios 的安装及使用

vue 中 axios 的安装及使用 1. axios 安装2. axios使用 1. axios 安装 首先&#xff0c;打开当前的项目终端&#xff0c;输入 npm install axios --save-dev验证是否安装成功&#xff0c;检查项目根目录下的 package.json,其中的 devDependencies 里面会多出一个axios及其版本…

Linux(基础篇二)

Linux基础篇 Linux基础篇二5. 系统管理5.1 Linux中的进程和服务5.3 systemctl5.4 运行级别CentOS 6CentOS 7 5.5 关机重启命令 Linux基础篇二 5. 系统管理 5.1 Linux中的进程和服务 计算机中&#xff0c;一个正在执行的程序或命令&#xff0c;被叫做“进程”(process) 启动之…

OpenCV实战(基础知识三)

简介 OpenCV是一个流行的开源计算机视觉库&#xff0c;由英特尔公司发起发展。它提供了超过2500个优化算法和许多工具包&#xff0c;可用于灰度、彩色、深度、基于特征和运动跟踪等的图像处理和计算机视觉应用。OpenCV主要使用C语言编写&#xff0c;同时也支持Python、Java、C…

网络安全(红队)自学学习路线

想自学网络安全&#xff08;黑客技术&#xff09;首先你得了解什么是网络安全&#xff01;什么是黑客&#xff01; 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全…

UE4 地形编辑基础知识 学习笔记

之前自己写过这样的功能&#xff0c;今天看到一个UE现成的 点击地形&#xff0c;选择样条 按住CTRL键点击屏幕中某一个点会在场景内生成一个这样的图标 再点两次&#xff0c;会生成B样条的绿线条 点击号再选择一个模型&#xff0c;会生成对应的链条状的mesh 拉高最远处的一个图…

python网络爬虫指南二:多线程网络爬虫、动态内容爬取(待续)

文章目录 一、多线程网络爬虫1.1 线程的基础内容、GIL1.2 创建线程的两种方式1.3 threading.Thread类1.4 线程常用方法和锁机制1.5 生产者-消费者模式1.5.1 生产者-消费者模式简介1.5.2 Condition 类协调线程 1.6 线程中的安全队列1.6 多线程爬取王者荣耀壁纸1.6.1 网页分析1.6…

开源双语对话语言模型 ChatGLM-6B 本地私有化部署

本文首发于&#xff1a;https://www.licorne.ink/2023/08/llm-chatglm-6b-local-deploy/ ChatGLM-6B 是一个开源的、支持中英双语的对话语言模型&#xff0c;基于 General Language Model (GLM) 架构&#xff0c;具有 62 亿参数。结合模型量化技术&#xff0c;用户可以在消费级…

H36M VS 3DPW datasets

1采集设备方面 H36M使用了高精度的多视角摄像机动态捕捉系统获得了非常准确和连贯的3D关节坐标标注。 3DPW使用了单目摄像机与IMU的复合传感系统进行采集,存在一定程度的标注噪声。 2场景环境方面 H36M主要针对室内定向动作,背景单一简洁。 3DPW重点是室外复杂环境中人的自…

3d max插件CG MAGIC中的蜂窝材质功能可提升效率吗?

工作中能提升效率也都是大家所想的&#xff0c;对于设计师的一个设计过程中&#xff0c;可能想怎么样可以更快呀&#xff0c;是哪个步骤慢了呢&#xff1f; 这样的结果只能说会很多&#xff0c;但是建模这个步骤&#xff0c;肯定是有多无少的。 为了让模型更加逼真&#xff0c…

html实现元素拖动替换

效果 实现 复制粘贴.html即可使用 <!DOCTYPE html> <html><head><meta charset"utf-8" /><title>拖动替换</title></head><style>.box {width: 500px;height: 500px;background: gainsboro;border-radius: 10px;}…

开源的经济影响:商业与社区的平衡

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

Docker搭建私有仓库并迁移

目录 方案 A、B机器安装docker 设置阿里云镜像源 安装 Docker-CE并设置为开机自动启动 A机器准备数据 拷贝数据 B机器运行redis、mysql镜像 重启docker服务 方案 准备两台机器&#xff1a;A机器&#xff08;可以连接外网&#xff09;&#xff0c;B机器&#xff08;内网机器…

单片机学习-蜂鸣器电子元件

蜂鸣器是有什么作用的&#xff1f; 蜂鸣器 是 一种 一体化结构 的电子训响器&#xff0c;可以发出声音的电子元器件 蜂鸣器分类&#xff1f; ①压电式蜂鸣器&#xff08;图左&#xff09; 称&#xff1a; 无源蜂鸣器 ②电磁式蜂鸣器&#xff08;图右&#xff09; 称&#xf…

电商PC端设计之店招

设计总资料&#xff1a; 1、裂帛 2、小狗电器 3、店招120PX&#xff0c;导航栏30px 4、毕业那些年的店招 5、操作实例&#xff0c;建立120PX的画布 6、产品多的时候要加搜索框 7、产品店招在左边&#xff0c;主推产品在右边 8、贝蒂佩琪 实战案例 1、利用画布调整图像大小 2、…

前端需要理解的Vue知识

1 模板语法 Vue使用基于 HTML 的模板语法&#xff0c;能声明式地将其组件实例的数据绑定到DOM。所有Vue 模板可以被符合规范的浏览器和 HTML 解析器解析。Vue 会将模板编译成高度优化的 JavaScript 代码。结合响应式系统&#xff0c;当应用状态变更时&#xff0c;Vue 能够智能…

AUTOSAR规范与ECU软件开发(实践篇)6.6 BSW模块代码生成

在BCT界面中配置完所需要的BSW模块后&#xff0c; 可以进行BSW模块相关代码与描述文件的生成&#xff0c; 点击ISOLAR-A主菜单中“ ”右边箭头&#xff0c; 选择Run Configuraions&#xff0c; 如图6.57所示。 将弹出如图6.58所示的界面。 图6.57 Run Configuraions配置&#x…

Cpp学习——编译链接

目录 ​编辑 一&#xff0c;两种环境 二&#xff0c;编译环境下四个部分的 1.预处理 2.编译 3.汇编 4.链接 三&#xff0c;执行环境 一&#xff0c;两种环境 在程序运行时会有两种环境。第一种便是编译环境&#xff0c;第二种则是执行环境。如下图&#xff1a; 在程序运…