Spring Cloud Stream如何屏蔽不同MQ带来的差异性?

引言

在当前的微服务架构下,使用消息队列(MQ)技术是实现服务解耦和削峰填谷的重要策略。为了保证系统的灵活性和可替换性,我们需要避免对单一开源技术的依赖

市面上有多种消息队列技术,如 Kafka、RocketMQ、RabbitMQ 等。关键在于如何在微服务体系中实现这些MQ组件的无缝切换,以减少代码修改需求。

Spring Cloud Stream 通过其与主流消息中间件的灵活集成,实现了通过仅修改配置文件的方式来切换不同的MQ实现,从而提高了系统的适应性和可维护性。

什么是 Spring Cloud Stream

Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架。

基于 Spring Boot 构建,用于创建独立的生产级 Spring 应用程序,并使用 Spring Integration 提供与消息代理的连接。它提供了来自多个供应商的中间件的固定配置,引入了持久发布-订阅语义、消费者组和分区的概念。

简单来说 Spring Cloud Stream 是对 Spring Integration 和 Spring Boot 的合并。

图一

图一

主要概念:

1. application model(应用模型)

图二.Spring Cloud Stream 应用程序

图二.Spring Cloud Stream 应用程序

由中间件提供的 Binder 来处理绑定。 应用程序通过绑定这个 Binder 与其建立联系,发送消息时应用程序通过 outputs 通道将消息传递给 BinderBinder 再把消息给消息中间件。接收消息时消息中间件将消息传递给 BinderBinder 再把消息通过 inputs 通道传递给应用程序。

比如 Kafka Binder 依赖如下图:

图三 spring cloud stream kafka依赖

图三 spring cloud stream kafka依赖

2. The Binder Abstraction(Binder抽象)

Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接中间件。

Spring Cloud Stream 为 Kafka 和 RabbitMQ 提供了 Binder 实现。 RocketMQ Binder 已由 Spring Cloud Alibaba 实现。

Binder 抽象也是该框架的扩展点之一,我们可以在 Spring Cloud Stream 之上实现自定义 Binder。

3. Programming Model(编程模型)

核心概念

  • Destination Binders(目标绑定器):负责提供与外部消息传递系统集成的组件。

  • Bindings(绑定):外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。

  • Message(消息):生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范数据结构。

图四

图四

环境搭建

本文环境:

  • Java:17

  • Spring Boot:3.0.2

  • Spring Cloud:2022.0.2

  • Spring Cloud Alibaba:2022.0.0.0

maven依赖配置

pom.xml依赖如下:

消息驱动jar,用哪个mq引入哪个即可。<dependencies><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency>
</dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

配置文件

application.yml RocketMq 配置信息:

spring:cloud:stream:stream:rocketmq:binder:name-server: 127.0.0.1:9876;127.0.0.1:9877function:# 组装和绑定definition: myTopicCbinders:default:type: rocketmqbindings:## 生产者 新版本固定格式  函数名-{out/in}-{index}demoChannel-out-0:destination: boot-mq-topic## 消费者 新版本固定格式  函数名字-{out/in}-{index}demoChannel-in-0:destination: boot-mq-topic

application.yml Kafka 配置信息:

spring:cloud:stream:stream:kafka:binder:brokers: 127.0.0.1:9092function:# 组装和绑定definition: myTopicCbinders:default:type: kafkabindings:## 生产者 新版本固定格式  函数名-{out/in}-{index}demoChannel-out-0:destination: boot-mq-topic## 消费者 新版本固定格式  函数名字-{out/in}-{index}demoChannel-in-0:destination: boot-mq-topic

消息生产者

创建一个简单的消息生产者:

@RestController
@Slf4j
public class ProducerStream {@Autowiredprivate StreamBridge streamBridge;@GetMapping("/test-stream")public String testStream() {streamBridge.send("demoChannel-out-0",MessageBuilder.withPayload("消息体").build());return "success";}
}

消息消费者

创建一个消息消费者来接收消息:

@Slf4j
@Configuration
public class TestStreamConsumer {@Beanpublic Consumer<String> demoChannel() {return message -> {log.info("demoChannel接到消息:{}", message);};}
}

假如需要从 Kafka 替换成 RocketMq ,只需要修改pom文件和配置文件即可。

在之前的 Spring Cloud Stream 版本中是采用注解的方式来实现绑定,在新版本中是通过函数式编程模型来绑定名称。采用约定大于配置的思想,简化了应用程序配置。

具体可见官方文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names

Spring Cloud Stream 发送消息流程

图五 spring cloud stream消息流程图

图五 spring cloud stream消息流程图

消息模型

通过图三可以看到 Sping Cloud Stream 的依赖关系。

Sping Cloud Stream -> Spring Integration -> Spring Messaging

可以看出来 Sping Cloud Stream 是基于 Spring Integration 做了一层封装,是依赖于 Spring Integration 这个组件的,而 Spring Integration 则依赖于 Spring Messaging 组件来实现消息处理机制的基础设施。

Spring Integration 是对 Spring Messaging 的扩展,设计目标是系统集成,因此内部提供了大量的集成化端点方便应用程序直接使用。

各个异构系统相互集成时,Spring Integration 通过通道之间的消息传递,让我们可以在消息的入口和出口使用通道适配器和消息网关这两种典型的端点对消息进行同构化处理。

Spring MessagingSpring 框架中的一个底层模块,用于提供统一的消息编程模型。

消息 Message 接口定义:

public interface Message<T> {//消息体T getPayload();//消息头MessageHeaders getHeaders();
}

消息通道 MessageChannel 接口定义:

@FunctionalInterface
public interface MessageChannel {long INDEFINITE_TIMEOUT = -1;//发送消息,无限期阻塞default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);}//发送消息,阻塞直到到达指定超时时间boolean send(Message<?> message, long timeout);
}

消息通道 MessageChannel 接收消息,调用send()方法将消息发送至该消息通道。

消息通道可简单理解为对队列的一种抽象。通道的名称对应队列的名称。

Spring message 把通道抽象成两种基本表现形式

  • 支持轮询的 PollableChannel

  • 实现发布-订阅模式的 SubscribableChannel

这两个通道都继承自具有消息发送功能的 MessageChannel

public interface SubscribableChannel extends MessageChannel {//通过注册回调函数MessageHandler来实现事件响应//注册消息处理器boolean subscribe(MessageHandler handler);//取消注册消息处理器boolean unsubscribe(MessageHandler handler);
}
public interface PollableChannel extends MessageChannel {//通过轮询操作主动获取消息//从通道中接收消息@NullableMessage<?> receive();//指定超时时间,从通道中接收消息@NullableMessage<?> receive(long timeout);
}

MessageHandler接口定义:

@FunctionalInterface
public interface MessageHandler {//处理消息方法void handleMessage(Message<?> message) throws MessagingException;
}

再回到图五流程图中,我们最终可以看到 KafkaRocketMQ 通过继承 AbstractMessageHandler 抽象类( AbstractMessageHandler 抽象类是实现了 MessageHandler 接口)来实现不同中间件的消息发送操作。而这些都是封装在各自中间件对应的 Binder 代码中来实现。

结论

回到我们的主题,Spring Cloud Stream 如何屏蔽不同 MQ 带来的差异性?

  • 统一的编程模型:发送和接收代码一致,开发者专注于业务逻辑即可。不用管底层消息中间件的实现。

  • Binder 抽象:封装与消息队列的交互逻辑,每种队列有自己的 Binder 实现。

  • 自动配置和约定优于配置:采用约定大于配置的思想,极少的改动配置文件实现消息队列的切换,而代码不用变动。

  • 高级特性的抽象:如分区、消息分组、持久性订阅等高级特性,Spring Cloud Stream 提供了抽象层,由不同的消息队列去实现。

参考资料

  • 官方文档:Spring Cloud Stream Reference Guide

  • 《Spring核心技术和案例实战》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/205688.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uniapp2023年微信小程序头像+昵称分别获取

1、DOM <view class"m-user"><view class"user-info"><!--头像 GO--><button class"avatar avatar-wrapper" open-type"chooseAvatar" chooseavatar"onChooseAvatar"slot"right"><im…

智能手表上的音频(四):语音通话

上篇讲了智能手表上音频文件播放。本篇开始讲语音通话。同音频播放一样有两种case&#xff1a;内置codec和BT。先看这两种case下audio data path&#xff0c;分别如下图&#xff1a; 内置codec下的语音通话audio data path 蓝牙下的语音通话audio data path 从上面两张图可以看…

数字化时代的政务服务:构建便捷高效的线上政务大厅

引言&#xff1a; 随着数字化时代的来临&#xff0c;如何通过线上政务大厅搭建一个便捷高效的服务平台&#xff0c;以更好地满足公众需求值得探究。线上政务大厅是政务服务的新方式&#xff0c;但搭建线上政务大厅并不是一件容易的事情&#xff0c;需要精心的规划和设计。 一…

linux 应用开发笔记---【I/O文件/基础篇 】

文章笔记来自于【速学Linux】手把手教你学嵌入式Linux C应用编程_哔哩哔哩_bilibili 一&#xff0c;什么是linux应用程序 1.运行在linux操作系统用户空间的程序 2.内核程序运行在内核空间&#xff0c;应用程序运行在用户空间 在终端执行的命令ls,ps。。。。。。都是运行在用…

vue3实现元素拖拽移动功能

效果图 实现拖拽移动 首先我们给需要实现功能的元素加一个draggable"true"让元素能够被拖拽 先来认识两个搭配draggable属性一起使用的事件——ondragstart和ondragend&#xff0c;它们的定义分别为&#xff1a; ①. ondragstart 事件在用户开始拖动元素或选择的文…

BUUCTF [GXYCTF2019]BabyUpload 1详解(.htaccess配置文件特性)

题目环境&#xff1a;查看题目源码 SetHandler application/x-httpd-php 通过源码可以看出这道文件上传题目主要还是考察.htaccess配置文件的特性 倘若不先上传.htaccess配置文件&#xff0c;那么后台服务器就无法解析php代码 这个是需要注意的 .htaccess配置文件特性 概述来说…

python爬虫AES案例:某招聘网站

声明&#xff1a; 该文章为学习使用&#xff0c;严禁用于商业用途和非法用途&#xff0c;违者后果自负&#xff0c;由此产生的一切后果均与作者无关 一、找出需要加密的参数 js运行 atob(‘aHR0cHM6Ly93d3cua2Fuemh1bi5jb20vc2VhcmNoLz9xdWVyeT1weXRob24mdHlwZT0w’) 拿到网址…

基于Java web的多功能游戏大厅系统的开发与实现

摘 要 目前&#xff0c;国内游戏市场上的网络游戏有许多种类&#xff0c;游戏在玩法上也越来越雷同&#xff0c;形式越来越单调。这种游戏性系统给玩家带来的成就感虽然是无穷的&#xff0c;但是也有随之而来的疲惫感&#xff0c;尤其是需要花费大量的时间和精力&#xff0c;这…

【Docker】从零开始:8.Docker命令:Commit提交命令

【Docker】从零开始&#xff1a;8.Docker命令:Commit命令 基本概念镜像镜像分层什么是镜像分层为什么 Docker 镜像要采用这种分层结构 本章要点commit 命令命令格式docker commit 操作参数实例演示1.下载一个新的ubuntu镜像2.运行容器3.查看并安装vim4.退出容器5提交自己的镜像…

机器学习笔记 - 3D数据的常见表示方式

一、简述 从单一角度而自动合成3D数据是人类视觉和大脑的基本功能,这对计算机视觉算法来说是比较难的。但随着LiDAR、RGB-D 相机(RealSense、Kinect)和3D扫描仪等3D传感器的普及和价格的降低,3D 采集技术的最新进展取得了巨大飞跃。与广泛使用的 2D 数据不同,3D 数据具有丰…

FDTD方法与其他数值方法有哪些区别?(案例分享)

FDTD方法是一种时域方法&#xff0c;它直接在时间域上模拟电磁波的传播和散射过程。相比之下&#xff0c;其他常见的数值方法&#xff0c;如有限元法&#xff08;FEM&#xff09;和有限积分法&#xff08;FIT&#xff09;&#xff0c;则是在空间域上进行离散化&#xff0c;将电…

运维笔记111

运维笔记 Navicat中查询指定字段名所在的表名tomcat设置JVM的初始堆内存修改catalina.sh文件修改完保存并关闭tomcat启动tomcat 查询数据库连接数查询是否存在死锁 Navicat中查询指定字段名所在的表名 SELECT * FROM information_schema.COLUMNS WHERE COLUMN_NAME‘替换成你要…

redis-cluster集群模式

Redis-cluster集群 1 Redis3.0引入的分布式存储方案 2集群由多个node节点组成,redis数据分布在节点之中,在集群之中分为主节点和从节点3集群模式当中,主从一一对应,数据写入和读取与主从模式一样&#xff0c;主负责写&#xff0c;从只能读4集群模式自带哨兵模式&#xff0c;可…

SourceInsight - Relation Windows

磨刀不误砍柴工&#xff0c;你使用的工具决定了你的下限。我平时使用较多的代码编辑工具就是SourceInsight&#xff0c;这个工具速度快&#xff0c;操作方便&#xff0c;但处理非常大的项目的性能不是很理想&#xff0c;比如你要是添加整个Linux Kernel的源代码的话。 在使用SI…

微信小程序本地和真机调试文件上传成功但体验版不成功

可能是微信小程序ip白名单的问题&#xff0c;去微信公众平台&#xff08;小程序&#xff09;上设置小程序的ip白名单 1、在本地中取消不校验 然后在本地去上传文件&#xff0c;就会发现控制台报错了&#xff0c;会提示一个https什么不在ip白名单&#xff0c;复制那个网址 2、…

rabbitMQ对优先级队列的使用

注意事项&#xff1a; 1.队列设置优先级 权制范围&#xff08;0-255&#xff09;推荐0-10 否则浪费CPU与内存 2.发消息时给消息设置优先级 3.消息需要完全事先在队列中&#xff0c;在被消费者消费 会被排序&#xff0c;否则边生产边消费不会达到预期的队列优先效果。 优先级队列…

yolov4、yolov5优化策略

一、yolov4优化策略 1.Mosaic data augmentation&#xff1a;四张图像拼接成一张进行训练&#xff0c;现在一个batch相当于以前4个batch。 2.Random Erase&#xff1a;用随机值或训练集的平均像素值替换图像的区域。 3.Self-adversarial-training(SAT)&#xff1a;引入噪音点…

PTA-6-45 工厂设计模式-运输工具

题目如下&#xff1a; 工厂类用于根据客户提交的需求生产产品&#xff08;火车、汽车或拖拉机&#xff09;。火车类有两个子类属性&#xff1a;车次和节数。拖拉机类有1个子类方法耕地&#xff0c;方法只需简单输出“拖拉机在耕地”。为了简化程序设计&#xff0c;所有…

python获取json所有节点和子节点

使用python获取json的所有父结点和子节点 并使用父节点加下划线命名子节点 先展示一段json代码 {"level1": {"level2": {"level3": [{"level4": "4value"},{"level4_2": "4_2value"}]},"level2_…

安卓底部导航栏BottomNavigationView

目录 1. BottomNavigationView (1) 准备BottomNavigationView使用的菜单资源文件 (2) 准备颜色选择器 (3) BottomNavigationView控件设置 (4) 在Java代码中设置OnItemSelectedListener监听器 (5) 与Fragment配合 (6) 标签显示模式 (7) 水波纹特效 (8) 文本外观 2. Bo…