SpringCloud之Stream框架集成RocketMQ消息中间件

        Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。

     目前 Spring Cloud Stream只支持 RabbitMQ 和 Kafka 的自动化配置。

     Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口.

在 SpringCloudStream 3.x 版本前是通过 @StreamListener 和 @EnableBinding 进行消息的发送和消费的,springCloudStream 3.x 版本后 @StreamListener 和 @EnableBinding 都打上了@Deprecated 注解,不建议使用了;后续的版本更新中替换成函数式的方式实现。

既然通过四大函数式接口的方式替换了注解的方式 那么该如何进行绑定呢?

通过 spring.cloud.stream.function.definition:名称的方式进行绑定 公开 topic。

不管是创建 Consumer 还是 Supplier 或者是 Function Stream都会将其的 方法名称 进行 一个 topic拆封 和 绑定 假设 创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 In 和 out 两个通道:

  • 输入 - + -in- + < index >

     例如:myTopic-in-0

  • 输出 - + -out- + < index >

       例如:myTopic-out-0

注意:这里的 functionName需要和代码声明的函数名称还有spring.cloud.stream.function.definition下的名称保持一致(后面还会在项目实战中展示一遍)

代码示例:

----------------------------------项目实战--------------------------------------

看下我们项目中的配置,配置文件是放在nacos上面的:

消息发送:

/*** @ClassName MessageParamParentDto* @Author zxd* @Version 1.0.0* @Description TODO* @CreateTime 2023/6/13 11:27 - 星期二*/
@Data
public class MessageParamParentDto implements Serializable {private static final long serialVersionUID = 7963819193258646924L;private  String routeUrl;}

--------------------------------------------------------------------------------------------------------------

/*** @ClassName MessageParamDto* @Author kch* @Version 1.0.0* @Description 消息队列接收系统消息实体对象* @CreateTime 2022/9/18 15:16 - 星期日*/
@Data
public class MessageParamDto  extends MessageParamParentDto implements Serializable {private static final long serialVersionUID = 7111819193258646924L;/*** 消息模板code*/@NotNull(message = "消息模板不能为空")private String templateCode;/*** 可变参数,必传字段* 该参数匹配模板字符串中的变量和URL中的变量,所以模板和URL中的变量名不能重复*/@NotNull(message = "参数不能为空")private Map<String, String> params;/*** 消息详情跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)* 例如:userId=1&userCode=test*/
//    private String routerParams;/*** 消息操作跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)* 例如:userId=1&userCode=test*/
//    private String contentPathParams;/*** 接收者租户*/@NotNull(message = "接收者租户ID不能为空")private Long tenantId;/*** 接收人*/@NotNull(message = "接收者用户ID不能为空")@Size(min = 1, message = "接收者用户ID不能为空")private List<RecipientUser> recipientUsers;@Valid@Data@AllArgsConstructor@NoArgsConstructorpublic static class RecipientUser implements Serializable {/*** 接收人id*/@NotNull(message = "接收者用户ID不能为空")private Long recipientId;/*** 接收人手机号*/@Pattern(regexp = RegexPool.MOBILE, message = "手机格式错误")private String phone;}}

-----------------------------------------------------------------------------------------------------------

/*** @ClassName MessageMqBinding* @Author zpp* @Version 1.0.0* @Description TODO* @CreateTime 2023/2/10 15:37 - 星期五*/
public interface MessageMqBinding {/*** 系统消息生产者交换机*/String MESSAGE_MQ_OUTPUT = "dyzsMessageProvider-out-0";
}

----------------------------------------------------------------------------------------

@Slf4j
@RestController
@RequestMapping("/mq")
public class MessageMqController {@Resourceprivate StreamBridge streamBridge;/*** @param :* @Author zpp* @Description 发送系统消息* @Date 2023/2/10 15:27* @Return com.zysy.common.api.entity.Result<java.lang.Boolean>*/@PostMappingpublic Result<Boolean> sendMessage(@RequestBody @Validated MessageParamDto dto) {log.info("接收到系统消息发送请求:{}", JSONObject.toJSONString(dto));MessageMQParamDto paramDto = new MessageMQParamDto(dto);paramDto.setCreateBy(UserUtil.getUserId());paramDto.setCreateDept(UserUtil.getDeptId());List<MessageMQParamDto> paramDtoList = new ArrayList<>();paramDtoList.add(paramDto);MessageBuilder builder = MessageBuilder.withPayload(paramDtoList).setHeader("Content-Type", "application/json");return Result.success(streamBridge.send(MessageMqBinding.MESSAGE_MQ_OUTPUT, builder.build()));}

------------------------------------------------------------------------------------------------------

消息消费:

          下图是在代码中配置的消息消费者,这里的函数名称要和上图中的function.definition配置的名称一样;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/156020.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】文件的操作与文件函数的使用(详细讲解)

前言&#xff1a;我们在学习C语言的时候会发现在编写一个程序的时候&#xff0c;数据是存在内存当中的&#xff0c;而当我们退出这个程序的时候会发现这个数据不复存在了&#xff0c;因此我们可以通过文件把数据记录下来&#xff0c;使用文件我们可以将数据直接存放在电脑的硬盘…

进来了解实现官网搜索引擎的三种方法

做网站的目的是对自己的品牌进行推广&#xff0c;让越来越多的人知道自己的产品&#xff0c;但是如果只是做了一个网站放着&#xff0c;然后等着生意找上门来那是不可能的。在当今数字时代&#xff0c;实现官网搜索引擎对于提升用户体验和推动整体性能至关重要。搜索引擎可以帮…

【C#】什么是并发,C#常规解决高并发的基本方法

给自己一个目标&#xff0c;然后坚持一段时间&#xff0c;总会有收获和感悟&#xff01; 在实际项目开发中&#xff0c;多少都会遇到高并发的情况&#xff0c;有可能是网络问题&#xff0c;连续点击鼠标无反应快速发起了N多次调用接口&#xff0c; 导致极短时间内重复调用了多次…

互联网性能和可用性优化CDN和DNS

当涉及到互联网性能和可用性优化时&#xff0c;DNS&#xff08;Domain Name System&#xff09;和CDN&#xff08;Content Delivery Network&#xff09;是两个至关重要的元素。它们各自发挥着关键作用&#xff0c;以确保用户能够快速、可靠地访问网站和应用程序。在本文中&…

第一个 Python 程序

三、第一个 Python 程序 好了&#xff0c;说了那么多&#xff0c;现在我们可以来写一下第一个 Python 程序了。 一开始写 Python 程序&#xff0c;个人不太建议用专门的工具来写&#xff0c;不方便熟悉语法&#xff0c;所以这里我先用 Sublime Text 来写&#xff0c;后期可以…

如何在会计面试中展现自己的优势?

在会计面试中展现自己的优势是非常重要的&#xff0c;因为这将决定你是否能够脱颖而出并获得这个职位。下面是一些可以帮助你展示自己优势的方法&#xff1a; 1. 准备充分&#xff1a;在面试前&#xff0c;确保你对公司的背景和业务有所了解。研究公司的财务报告和新闻&#xf…

智能井盖传感器:提升城市安全与便利的利器

在智能化城市建设的浪潮中&#xff0c;WITBEE万宾智能井盖传感器&#xff0c;正以其卓越的性能和创新的科技&#xff0c;吸引着越来越多的关注。本文小编将为大家详细介绍这款产品的独特优势和广阔应用前景。 在我们生活的城市中&#xff0c;井盖可能是一个最不起眼的存在。然而…

ctfshow萌新计划web9-14(正则匹配绕过)

目录 web9 web10 web11 web12 web13 web14 web9 审一下代码&#xff0c;需要匹配到system|exec|highlight才会执行eval函数 先看一下当前目录下有什么 payload&#xff1a;?csystem(ls); index.php是首页&#xff0c;我们看看config.php payload&#xff1a;?csystem…

Android笔记(二):JetPack Compose定义移动界面概述

一、JetPack Compose组件概述 JetPack Compose是Google公司在2021年正式推出的声明式UI工具包。Compose库用于开发原生Android应用界面。它取代传统XML文件配置界面&#xff0c;不需要界面编辑工具&#xff0c;而是采用强大Kotlin API以及函数搭建移动应用界面&#xff0c;代码…

nextjs构建服务端渲染,同时使用Material UI进行项目配置

一、创建一个next项目 使用create-next-app来启动一个新的Next.js应用&#xff0c;它会自动为你设置好一切 运行命令: npx create-next-applatest 执行结果如下&#xff1a; 启动项目&#xff1a; pnpm dev 执行结果&#xff1a; 启动成功&#xff01; 二、安装Mater…

课题学习(六)----安装误差校准、实验方法

一、 安装误差校准 1.1 数学模型 在实际情况下&#xff0c;即使努力尝试使三轴加速度计和三轴磁通门正交&#xff0c;也不可能保证坐标轴的正交和安装的准确居中。无论采用何种解法&#xff0c;都会导致最终解的误差。因此&#xff0c;要想提高测量精度&#xff0c;就必须开发…

验收测试的关键步骤您知道吗

验收测试是软件开发生命周期中的重要环节&#xff0c;用于验证项目交付是否符合用户需求和质量标准。本文将介绍验收测试的定义及实施验收测试的关键步骤。 一、验收测试的定义和目标 确保项目交付质量&#xff1a;通过主动验证和评估软件系统的功能、性能和质量&#xff0c;确…

轻量级虚拟化技术草稿

Support Tech ST.1 virtiofs ST.1.1 fuse framework 引用wiki中关于fuse的定义&#xff1a; Filesystem in Userspace (FUSE) is a software interface for Unix and Unix-like computer operating systems that lets non-privileged users create their own file systems w…

Java中在循环体内拼接字符串时为什么使用StringBuilder而不是String

在循环体内拼接字符串时为什么使用StringBuilder而不是String 在《阿里巴巴Java开发手册》一书中提到了&#xff1a; 循环体内&#xff0c;字符串的连接方式&#xff0c;请使用 StringBuilder 的 append 方法进行扩展。&#xff08;而不要用String的方式&#xff09; 说明&…

31 数据分析(中)numpy介绍

文章目录 工具excelTableauPower Queryjupytermatplotlibnumpy安装导入包快速掌握&#xff08;bushi&#xff09;array和list的相互转化 np的range多维数组的属性array的改变形状array升降维度array内元素的类型数和array的运算array之间的加减法认识轴切片条件与逻辑修改值app…

系统韧性研究(1)| 何谓「系统韧性」?

过去十年&#xff0c;系统韧性作为一个关键问题被广泛讨论&#xff0c;在数据中心和云计算方面尤甚&#xff0c;同时它对赛博物理系统也至关重要&#xff0c;尽管该术语在该领域不太常用。大伙都希望自己的系统具有韧性&#xff0c;但这到底意味着什么&#xff1f;韧性与其他质…

气象台卫星监测vr交互教学增强学生的学习兴趣和动力

对地观测是以地球为研究对象&#xff0c;依托卫星、飞船等光电仪器&#xff0c;进行各种探测活动&#xff0c;其核心是遥感技术&#xff0c;因此为了让遥感专业学员能提前熟悉对地观测规则、流程、方法及注意事项&#xff0c;借助VR虚拟现实制作的三维仿真场景&#xff0c;能让…

【PX4】解决Resource not found: px4问题【踩坑实录】

【PX4】解决Resource not found: px4问题【踩坑实录】 文章目录 【PX4】解决Resource not found: px4问题【踩坑实录】1. 问题描述2. 错误排查 1. 问题描述 笔者在配置好px4的所有环境后&#xff0c;使用自己写的launch文件时&#xff0c;出现了报错 sjhsjhR9000X:~$ roslaunc…

spring 注入 当有两个参数的时候 接上面

新加一个int 型的 age 记得写getset方法和构造方法 &#xff08;&#xff08;&#xff08;&#xff08;&#xff08;&#xff08;&#xff08; 构造方法的作用——无论是有参构造还是无参构造&#xff0c;他的作用都是为了方便为对象的属性初始化值 构造方法是一种特殊的方…

【C++14算法】make_unique

文章目录 前言一、make_unique函数1.1 什么是make_unique?1.2 如何使用make_unique?1.3 make_unique的函数原型如下&#xff1a;1.4 示例代码示例1: 创建一个动态分配的整数对象示例2: 创建一个动态分配的自定义类型对象示例3: 创建一个动态分配的数组对象示例4: 创建一个动态…