SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

前言

在现代分布式系统中,消息队列是实现服务解耦和异步处理的关键组件。Spring框架提供了强大的支持,使得与消息队列(如RabbitMQ、Kafka等)的集成变得更加便捷和灵活。本文将深入探讨如何利用Spring的注解驱动方式来配置和管理队列、交换机、消息转换器等组件,从而实现一个高效且可扩展的消息处理架构。

在本博客中,我们将重点介绍:

如何使用Spring的注解方式配置RabbitMQ的队列和交换机。
如何配置消息转换器(如Jackson2JsonMessageConverter)来处理不同格式的消息。
如何根据业务需求对现有代码进行改造,将消息队列引入到系统中,从而实现消息的异步处理与解耦。
通过这篇文章,您将了解如何使用Spring框架的注解配置简化消息队列的管理,同时提升系统的可扩展性和维护性。


基于注解的声明队列交换机

利用SpringAMQP声明DirectExchange并与队列绑定
需求如下:

  1. 在consumer服务中,声明队列direct.queue1和direct.queue2
  2. 在consumer服务中,声明交换机hmall.direct,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

在这里插入图片描述
基于Bean声明队列和交换机代码如下:

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange(){return new DirectExchange("hmall.direct")}@Beanpublic Queue directQueue1(){return new Queue("direct.queuue1");}@Beanpublic Binding directQueue1bindingRed( Queue directQueue1, DirectExchange directExchange ){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding directQueue1bindingBlue( Queue directQueue1, DirectExchange directExchange ){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return new Queue("direct.queuue2");}@Beanpublic Binding directQueue2bindingRed( Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2bindingYellow( Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

SpringAMOP还提供了基于@RabbitListener注解来声明队列和交换机的方式

@RabbitListener(bindings =@QueueBinding(value = @Queue(name =direct.queue1),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueuel(string msg){System.out.println("消费者1接收到Direct消息:【+msg+"】");
}

接收者代码如下:

	@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue1(String message)throws Exception {log.info("消费者1监听到direct.queue2的消息,["+message+"]");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))

消息转换器

消息转换器
需求:测试利用SpringAMQP发送对象类型的消息

  • 声明一个队列,名为object.queue
  • 编写单元测试,向队列中直接发送一条消息,消息类型为Map
  • 在控制台查看消息,总结你能发现的问题
// 准备消息
Map<String,0bject>msg = new HashMap<>();
msg.put("name","Jack");
msg.put("age"21);

创建队列object.queue
在这里插入图片描述
测试代码如下:

	@Testpublic void TestSendObject(){Map<String, Object> msg = new HashMap<>();msg.put("name", "Jack");msg.put("age", 18);//3.发送消息 参数分别是:交换机名称、RoutingKey(暂时为空)、消息rabbitTemplate.convertAndSend("object.queue",msg);}

在控制台上找到object.queue中得到消息
在这里插入图片描述
Spring的对消息对象的处理是由org.springframework.amgp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于IDK的ObjectOutputStream完成序列化。存在下列问题:

  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

建议采用JSON序列化代替默认的JDK序列化,要做两件事情:
在publisher和consumer中都要引入jackson依赖:

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

在publisher和consumer中都要配置Messageconverter:

@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}

在这里插入图片描述
在这里插入图片描述
消费者代码:

	@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> msg)throws Exception {log.info("消费者监听到pbject.queue的消息,["+msg+"]");}

在这里插入图片描述
运行结果如下:
在这里插入图片描述

业务改造

需求:改造余额支付功能,不再同步调用交易服务的0penFeign接口,而是采用异步MO通知交易服务更新订单状态。
在这里插入图片描述
在trade-service微服务消费者配置和pay-service微服务发送者都配置MQ依赖

	<!--消息发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在trade-service微服务和pay-service微服务添加上RabbitMQ配置信息

spring:rabbitmq:host: 192.168.244.136port: 5672virtual-host: /hmallusername: hmallpassword: 1234

因为消费者和发送者都需要消息转换器,故直接将代码写到hm-common服务中,在config包中创建MqConfig类

package com.hmall.common.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}

同时trade-service微服务和pay-service微服务是无法自动扫描到该类,采用SpringBoot自动装配的原理,在resource文件夹下的META-INF文件夹下的spring.factories文件中添加类路径:
在这里插入图片描述

在接收者trade-service微服务中创建PayStatusListener

package com.hmall.trade.listener;import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue("trade.pay.success.queue"),exchange = @Exchange(value = "pay.direct"),key = "pay.success"))public void ListenPaySuccess(Long orderId) {orderService.markOrderPaySuccess(orderId);}}

修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:

@Service
@RequiredArgsConstructor
@Slf4j
public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> implements IPayOrderService {private final RabbitTemplate rabbitTemplate;...@Override@Transactionalpublic void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {// 1.查询支付单PayOrder po = getById(payOrderDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常throw new BizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或关闭!");}// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}}
}

在这里插入图片描述
在这里插入图片描述


总结

本文介绍了基于Spring框架的注解方式来配置消息队列、交换机以及消息转换器的实现方法。通过注解配置,开发者可以更轻松地创建和管理RabbitMQ等消息队列的组件,而无需过多的 XML 配置或繁琐的手动配置。具体来说,我们探讨了如何:

使用 @RabbitListener 和 @EnableRabbit 注解配置消息监听器和消息队列。
配置消息转换器,特别是如何通过 Jackson2JsonMessageConverter 将消息转换为JSON格式,从而实现数据的序列化与反序列化。
结合业务需求,讲解如何对现有系统进行改造,集成消息队列,实现异步处理和服务解耦。
通过这些配置和改造,系统的消息处理能力得到了增强,性能和可扩展性也得到了显著提升。消息队列的使用不仅能够减少服务之间的紧耦合,还能够通过异步方式提高系统的响应速度和吞吐量。

希望本博客能够帮助您理解Spring在消息队列方面的强大功能,并为您的业务应用提供参考。随着系统复杂度的增加,合理的使用消息队列将成为构建高可用、高性能系统的关键之一。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/22677.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习经验分享【39】YOLOv12——2025 年 2 月 19 日发布的以注意力为核心的实时目标检测器

YOLO算法更新速度很快&#xff0c;已经出到V12版本&#xff0c;后续大家有想发论文或者搞项目可更新自己的baseline了。 代码&#xff1a;GitHub - sunsmarterjie/yolov12: YOLOv12: Attention-Centric Real-Time Object Detectors 摘要&#xff1a;长期以来&#xff0c;增强 …

Pytorch实现之特征损失与残差结构稳定GAN训练,并训练自己的数据集

简介 简介:生成器和鉴别器分别采用了4个新颖设计的残差结构实现,同时在损失中结合了鉴别器层的特征损失来提高模型性能。 论文题目:Image Generation by Residual Block Based Generative Adversarial Networks(基于残留块的生成对抗网络产生图像) 会议:2022 IEEE Int…

后“智驾平权”时代,谁为安全冗余和体验升级“买单”

线控底盘&#xff0c;正在成为新势力争夺下一个技术普及红利的新赛点。 尤其是进入2025年&#xff0c;比亚迪、长安等一线传统自主品牌率先开启高阶智驾的普及战&#xff0c;加上此前已经普及的智能座舱&#xff0c;舱驾智能的「科技平权」进一步加速行业启动「线控底盘」上车窗…

【Node.js】express框架

目录 1初识express框架 2 初步使用 2.1 安装 2.2 创建基本的Web服务器 2.3 监听方法 2.3.1 监听get请求 2.3.2 监听post请求 2.4 响应客户端 2.5 获取url中的参数(get) 2.5.1 获取查询参数 2.5.2 获取动态参数 2.6 托管静态资源 2.6.1 挂载路径前缀 2.6.2 托管多…

树形DP(树形背包+换根DP)

树形DP 没有上司的舞会 家常便饭了&#xff0c;写了好几遍&#xff0c;没啥好说的&#xff0c;正常独立集问题。 int head[B]; int cnt; struct node {int v,nxt; }e[B<<1]; void modify(int u,int v) {e[cnt].nxthead[u];e[cnt].vv;head[u]cnt; } int a[B]; int f[B]…

REACT--组件通信

组件之间如何进行通信&#xff1f; 组件通信 组件的通信主要借助props传递值 分为整体接收、解构接收 整体接收 import PropTypes from prop-types;//子组件 function Welcome(props){return (<div>hello Welcome,{props.count},{props.msg}</div>) }// 对 We…

【排序算法】六大比较类排序算法——插入排序、选择排序、冒泡排序、希尔排序、快速排序、归并排序【详解】

文章目录 六大比较类排序算法&#xff08;插入排序、选择排序、冒泡排序、希尔排序、快速排序、归并排序&#xff09;前言1. 插入排序算法描述代码示例算法分析 2. 选择排序算法描述优化代码示例算法分析 3. 冒泡排序算法描述代码示例算法分析与插入排序对比 4. 希尔排序算法描…

纠错检索增广生成论文

一、摘要 动机&#xff1a;RAG严重依赖于检索文档的相关性&#xff0c;如果检索出错&#xff0c;那么LLM的输出结果也会出现问题 解决方案&#xff1a;提出纠正性检索增强生成&#xff08;CRAG&#xff09;即设计一个轻量级的检索评估器&#xff0c;用来评估针对某个查询检索…

Java NIO与传统IO性能对比分析

Java NIO与传统IO性能对比分析 在Java中&#xff0c;I/O&#xff08;输入输出&#xff09;操作是开发中最常见的任务之一。传统的I/O方式基于阻塞模型&#xff0c;而Java NIO&#xff08;New I/O&#xff09;引入了非阻塞和基于通道&#xff08;Channel&#xff09;和缓冲区&a…

easelog(1)基础C++日志功能实现

EaseLog(1)基础C日志功能实现 Author: Once Day Date: 2025年2月22日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 注&#xff1a;本简易日志组件代码实现参考了Google …

Vue面试2

1.跨域问题以及如何解决跨域 跨域问题&#xff08;Cross-Origin Resource Sharing, CORS&#xff09;是指在浏览器中&#xff0c;当一个资源试图从一个不同的源请求另一个资源时所遇到的限制。这种限制是浏览器为了保护用户安全而实施的一种同源策略&#xff08;Same-origin p…

MongoDB应用设计调优

应用范式设计 什么是范式 数据库范式概念是数据库技术的基本理论&#xff0c;几乎是伴随着数据库软件产品的推出而产生的。在传统关系型数据库领域&#xff0c;应用开发中遵循范式是最基本的要求。但随着互联网行业的发展&#xff0c;NoSQL开始变得非常流行&#xff0c;在许多…

Mac安装配置Tomcat 8

一、官网下载 Index of /disthttps://archive.apache.org/dist/ 1、进入界面如下&#xff1a; 2、我们找到Tomcat文件夹并进入 3、找到Tomcat 8并打开 4、找到对应的版本打开 5、打开bin 6、找到“apache-tomcat-8.5.99.tar.gz”并下载 二、配置Tomcat 1、解压已经下载好的…

【论文精读】VLM-AD:通过视觉-语言模型监督实现端到端自动驾驶

论文地址&#xff1a; VLM-AD: End-to-End Autonomous Driving through Vision-Language Model Supervision 摘要 人类驾驶员依赖常识推理来应对复杂多变的真实世界驾驶场景。现有的端到端&#xff08;E2E&#xff09;自动驾驶&#xff08;AD&#xff09;模型通常被优化以模仿…

百度搜索,能否将DeepSeek变成“内功”?

最近&#xff0c;所有的云平台和主流APP都在努力接入DeepSeek。其中&#xff0c;搜索类APP与搜索引擎更是“战况激烈”。那么问题来了&#xff0c;接入DeepSeek已经变成了标准配置&#xff0c;到底应该如何做出差异化&#xff1f;接入DeepSeek这件事能不能实现11大于2的效果&am…

Flask实现高效日志记录模块

目录 一. 简介&#xff1a; 1. 为什么需要请求日志 二. 日志模块组成 1. 对应日志表创建&#xff08;包含日志记录的关键字段&#xff09; 2. 编写日志记录静态方法 3. 在Flask中捕获请求日志 4. 捕获异常并记录错误日志 5. 编写日志接口数据展示 6. 写入数据展…

25轻化工程研究生复试面试问题汇总 轻化工程专业知识问题很全! 轻化工程复试全流程攻略 轻化工程考研复试真题汇总

轻化工程复试心里没谱&#xff1f;学姐带你玩转面试准备&#xff01; 是不是总觉得老师会问些刁钻问题&#xff1f;别焦虑&#xff01;其实轻化工程复试套路就那些&#xff0c;看完这篇攻略直接掌握复试通关密码&#xff01;文中有重点面试题可直接背~ 目录 一、这些行为赶紧避…

查看已经安装的Python库,高效管理自己的Python开发环境

在日常的Python开发中&#xff0c;掌握如何管理和查看已经安装的库是非常重要的。这不仅能帮助你了解当前项目的依赖关系&#xff0c;还能避免出现版本冲突等问题。在这篇文章中&#xff0c;我们将详细介绍查看已安装Python库的方法&#xff0c;并提供一些实用的工具和技巧。 …

Selenium实战案例1:论文pdf自动下载

在上一篇文章中&#xff0c;我们介绍了Selenium的基础用法和一些常见技巧。今天&#xff0c;我们将通过中国科学&#xff1a;信息科学网站内当前目录论文下载这一实战案例来进一步展示Selenium的web自动化流程。 目录 中国科学&#xff1a;信息科学当期目录论文下载 1.网页内…

Visual Studio Code 2025 安装与高效配置教程

一、软件简介与下载 1. Visual Studio Code 是什么&#xff1f; Visual Studio Code&#xff08;简称VS Code&#xff09;是微软推出的免费开源代码编辑器&#xff0c;支持 智能代码补全、Git集成、插件扩展 等功能&#xff0c;适用于前端开发、Python、Java等多种编程场景。…