SpringCloudStream原理和深入使用

简单概述

Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。

应用程序通过inputs或outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交互。也就是说:Spring Cloud Stream能够屏蔽底层消息中间件【RabbitMQ,kafka等】的差异,降低切换成本,统一消息的编程模型

相关概念

Channel(通道):Channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到Destination,消费者通过输入通道从Destination接收消息。

在Spring Cloud Stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。

  • Input(输入):Input通道用于消费者从消息代理接收消息。消费者可以通过监听Input通道来实时接收传入的消息

  • Output(输出):Output通道用于生产者向消息代理发送消息。生产者可以通过向Output通道发送消息来发布新的消息

Destination(目标):Destination是消息的目的地,通常对应于消息代理中的Topic或Queue。生产者将消息发送到特定的Destination,消费者从其中接收消息。

Binder(绑定器):Binder是Spring Cloud Stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。Binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。Spring Cloud Stream提供了多个可用的Binder实现,包括RabbitMQ、Kafka等。

**消费者组:**在Spring Cloud Stream中,消费组(Consumer Group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。(对于一个消息来说,每个消费者组只会有一个消费者消费消息)

分区:Spring Cloud Stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理

Spring Message

Spring Message是Spring Framework的一个模块,其作用就是统一消息的编程模型。

package org.springframework.messaging;public interface Message<T> {T getPayload();MessageHeaders getHeaders();
}

消息通道 MessageChannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:

@FunctionalInterface
public interface MessageChannel {long INDEFINITE_TIMEOUT = -1;default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);}boolean send(Message<?> message, long timeout);}

消息通道里的消息由消息通道的子接口可订阅的消息通道SubscribableChannel实现,被MessageHandler消息处理器所订阅

public interface SubscribableChannel extends MessageChannel {boolean subscribe(MessageHandler handler);boolean unsubscribe(MessageHandler handler);}

MessageHandler真正地消费/处理消息

@FunctionalInterface
public interface MessageHandler {void handleMessage(Message<?> message) throws MessagingException;
}

Spring Integration

Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

它提出了不少新的概念,包括消息路由MessageRoute、消息分发MessageDispatcher、消息过滤Filter、消息转换Transformer、消息聚合Aggregator、消息分割Splitter等等。同时还提供了MessageChannel和MessageHandler的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。

Spring-Cloud-Stream的架构

img

快速入门

引入依赖

        <!--stream--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

增加配置文件

spring:cloud:stream:# 定义消息中间件binders:MyRabbit:type: rabbitenvironment:spring:rabbitmq:host: localhostport: 5672username: rootpassword: rootvhost: /bindings:# 生产者中定义,定义发布对象myInput:destination: myStreamExchangegroup: myStreamGroupbinder: MyRabbit# 消费者中定义,定义订阅的对象myOutput-in-0:destination: myStreamExchangegroup: myStreamGroupbinder: MyRabbit# 消费者中定义,定义输出的函数function:definition: myOutput

生产者

	@Resourceprivate StreamBridge streamBridge;public void sendNormal() {streamBridge.send("myInput", "hello world");}

消费者

	@Bean("myOutput")public Consumer<Message<String>> myOutput() {return (message) -> {MessageHeaders headers = message.getHeaders();System.out.println("myOutput head is : " + headers);String payload = message.getPayload();System.out.println("myOutput payload is : " + payload);};}

如何自定义Binder

  1. 添加spring-cloud-stream依赖
  2. 提供ProvisioningProvider的实现
  3. 提供MessageProducer的实现
  4. 提供MessageHandler的实现
  5. 提供Binder的实现
  6. 创建Binder的配置
  7. META-INF/spring.binders中定义绑定器

添加spring-cloud-stream依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId><version>${spring.cloud.stream.version}</version>
</dependency>

提供ProvisioningProvider的实现

ProvisioningProvider负责提供消费者和生产者目的地,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。

public class FileProvisioningProvider implements ProvisioningProvider<ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>> {public FileProvisioningProvider() {super();}@Overridepublic ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<FileProducerProperties> properties) throws ProvisioningException {return new FileMessageDestination(name);}@Overridepublic ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws ProvisioningException {return new FileMessageDestination(name);}private static class FileMessageDestination implements ProducerDestination, ConsumerDestination {private final String destination;private FileMessageDestination(final String destination) {this.destination = destination;}@Overridepublic String getName() {return destination.trim();}@Overridepublic String getNameForPartition(int partition) {throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");}}}

提供MessageProducer的实现

MessageProducer负责使用事件并将其作为消息处理,发送给配置为使用此类事件的客户端应用程序。

		super.onInit();executorService = Executors.newScheduledThreadPool(1);}@Overridepublic void doStart() {executorService.scheduleWithFixedDelay(() -> {String payload = getPayload();if (payload != null) {Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();sendMessage(receivedMessage);}}, 0, 50, TimeUnit.MILLISECONDS);}@Overrideprotected void doStop() {executorService.shutdownNow();}private String getPayload() {try {List<String> allLines = Files.readAllLines(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));String currentPayload = allLines.get(allLines.size() - 1);if (!currentPayload.equals(previousPayload)) {previousPayload = currentPayload;return currentPayload;}} catch (IOException e) {FileUtil.touch(new File(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));}return null;}
}

提供MessageHandler的实现

MessageHandler提供产生事件所需的逻辑。

public class FileMessageHandler extends AbstractMessageHandler {FileExtendedBindingProperties fileExtendedBindingProperties;ProducerDestination destination;public FileMessageHandler(ProducerDestination destination, FileExtendedBindingProperties fileExtendedBindingProperties) {this.destination = destination;this.fileExtendedBindingProperties = fileExtendedBindingProperties;}@Overrideprotected void handleMessageInternal(Message<?> message) {try {if (message.getPayload() instanceof byte[]) {Files.write(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"), (byte[]) message.getPayload());} else {throw new RuntimeException("处理消息失败");}} catch (IOException e) {throw new RuntimeException(e);}}
}

提供Binder的实现

提供自己的Binder抽象实现:

  • 扩展AbstractMessageChannelBinder
  • 将自定义的 ProvisioningProvider 指定为 AbstractMessageChannelBinder 的通用参数
  • 重写createProducerMessageHandlercreateConsumerEndpoint方法
public class FileMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>, FileProvisioningProvider>implements ExtendedPropertiesBinder<MessageChannel, FileConsumerProperties, FileProducerProperties> {FileExtendedBindingProperties fileExtendedBindingProperties;public FileMessageChannelBinder(String[] headersToEmbed, FileProvisioningProvider provisioningProvider, FileExtendedBindingProperties fileExtendedBindingProperties) {super(headersToEmbed, provisioningProvider);this.fileExtendedBindingProperties = fileExtendedBindingProperties;}@Overrideprotected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<FileProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {FileMessageHandler fileMessageHandler = new FileMessageHandler(destination, fileExtendedBindingProperties);return fileMessageHandler;}@Overrideprotected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws Exception {FileMessageProducerAdapter fileMessageProducerAdapter = new FileMessageProducerAdapter(destination, fileExtendedBindingProperties);return fileMessageProducerAdapter;}@Overridepublic FileConsumerProperties getExtendedConsumerProperties(String channelName) {return fileExtendedBindingProperties.getExtendedConsumerProperties(channelName);}@Overridepublic FileProducerProperties getExtendedProducerProperties(String channelName) {return fileExtendedBindingProperties.getExtendedProducerProperties(channelName);}@Overridepublic String getDefaultsPrefix() {return fileExtendedBindingProperties.getDefaultsPrefix();}@Overridepublic Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {return fileExtendedBindingProperties.getExtendedPropertiesEntryClass();}
}

创建Binder的配置

严格要求创建一个 Spring 配置来初始化你的绑定器实现的 bean

@EnableConfigurationProperties(FileExtendedBindingProperties.class)
@Configuration
public class FileMessageBinderConfiguration {@Bean@ConditionalOnMissingBeanpublic FileProvisioningProvider fileMessageBinderProvisioner() {return new FileProvisioningProvider();}@Bean@ConditionalOnMissingBeanpublic FileMessageChannelBinder fileMessageBinder(FileProvisioningProvider fileMessageBinderProvisioner, FileExtendedBindingProperties fileExtendedBindingProperties) {return new FileMessageChannelBinder(null, fileMessageBinderProvisioner, fileExtendedBindingProperties);}@Beanpublic FileProducerProperties fileConsumerProperties() {return new FileProducerProperties();}
}

详细的代码见https://gitee.com/xiaovcloud/spring-cloud-stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/352645.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探索Web Components

title: 探索Web Components date: 2024/6/16 updated: 2024/6/16 author: cmdragon excerpt: 这篇文章介绍了Web Components技术&#xff0c;它允许开发者创建可复用、封装良好的自定义HTML元素&#xff0c;并直接在浏览器中运行&#xff0c;无需依赖外部库。通过组合HTML模…

多尺度特征提取:原理、应用与挑战

多尺度 多尺度特征提取&#xff1a;原理、应用与挑战**原理****应用****挑战****总结** 多尺度特征提取&#xff1a;原理、应用与挑战 在计算机视觉、自然语言处理和信号处理等领域&#xff0c;有效地捕捉和解析数据的多种尺度特性是至关重要的。多尺度特征提取是一种技术&…

【机器学习】智能创意工厂:机器学习驱动的AIGC,打造未来内容新生态

&#x1f680;时空传送门 &#x1f50d;机器学习在AIGC中的核心技术&#x1f4d5;深度学习&#x1f388;生成对抗网络&#xff08;GANs&#xff09; &#x1f680;机器学习在AIGC中的具体应用&#x1f340;图像生成与编辑⭐文本生成与对话系统&#x1f320;音频生成与语音合成 …

SpringMVC01-初始SpringMVC

SpringMVC 回顾MVC 什么是MVC MVC是模型(Model)、视图(View)、控制器(Controller)的简写&#xff0c;是一种软件设计规范。是将业务逻辑、数据、显示分离的方法来组织代码。MVC主要作用是降低了视图与业务逻辑间的双向偶合。MVC不是一种设计模式&#xff0c;MVC是一种架构模…

高通Android 12 右边导航栏改成底部显示

最近同事说需要修改右边导航栏到底部&#xff0c;问怎么搞&#xff1f;然后看下源码尝试下。 1、Android 12修改代码路径 frameworks/base/services/core/java/com/android/server/wm/DisplayPolicy.java a/frameworks/base/services/core/java/com/android/server/wm/Display…

【LeetCode:2786. 访问数组中的位置使分数最大 + 递归 + 记忆化缓存 + dp】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

电感的本质是什么

什么是电感&#xff1f; 电感器件一般是指螺线圈&#xff0c;由导线圈一圈靠一圈地绕在绝缘管上&#xff0c;绝缘管可以是空心的&#xff0c;也可以包含铁芯或磁粉芯。 为什么把’线’绕成’圈’就是电感&#xff1f; 电感的工作原理非常抽象&#xff0c;为了解释什么是电感…

IntelliJ IDEA 使用 Maven 时不加载本地私服的最新版本快照(snapshot)JAR 包

IntelliJ IDEA 使用 Maven 时不加载本地私服的最新版本快照&#xff08;snapshot&#xff09;JAR 包 目录 IntelliJ IDEA 使用 Maven 时不加载本地私服的最新版本快照&#xff08;snapshot&#xff09;JAR 包1. 检查 settings.xml2. IDEA Maven 配置3. 强制更新 Snapshot4. 使用…

使用 C# 学习面向对象编程:第 8 部分

抽象方法 亲爱的读者&#xff0c;本文是 OOP 的第四大支柱&#xff0c;也是最后一大支柱。对于 OOP 初学者来说&#xff0c;这很容易让人困惑。因此&#xff0c;我们用非常简单的语言提供了一个示例。 “抽象用于管理复杂性。无法创建抽象类的对象。抽象类用于继承。” 例如…

端口映射工具下载?

天联是一款强大的端口映射工具&#xff0c;它能够帮助用户实现远程数据采集管理、异地统一管理、随时随地协同办公等多种场景的应用。无论您是医药、餐饮、商超等零售行业的企业&#xff0c;还是需要使用OA、CRM、ERP、财务进销存等系统的企业&#xff0c;甚至是使用视频监控设…

Python自动化测试面试题精选(一)

今天大家介绍一些Python自动化测试中常见的面试题&#xff0c;涵盖了Python基础、测试框架、测试工具、测试方法等方面的内容&#xff0c;希望能够帮助你提升自己的水平和信心。 项目相关 什么项目适合做自动化测试&#xff1f; 答&#xff1a;一般来说&#xff0c;适合做自…

前端菜鸡流水账日记 -- git管理工具(多版本)

哈喽哇&#xff0c;我又又又来了&#xff0c;其实之前就挺想进行一篇关于git管理工具的分享的&#xff0c;但是一直都没有来的及&#xff0c;直到今天&#xff0c;在学习的时候&#xff0c;&#xff0c;一个朋友新发现了一个vscode中的小插件&#xff0c;所以我就决定一起来分享…

Github入门教程,适合新手学习(非常详细)

前言&#xff1a;本篇博客为手把手教学的 Github 代码管理教程&#xff0c;属于新手入门级别的难度。教程简单易操作&#xff0c;能够基本满足读者朋友日常项目寄托于 Github 平台上进行代码管理的需求。Git 与 Github 是一名合格程序员 coder 必定会接触到的工具与平台&#x…

React+TS前台项目实战(十)-- 全局常用组件CopyText封装

文章目录 前言CopyText组件1. 功能分析2. 代码详细注释3. 使用方式4. 效果展示 总结 前言 今天这篇主要讲项目常用复制文本组件封装&#xff0c;这个组件是一个用于拷贝文本的 React 组件&#xff0c;它提供了拷贝&#xff0c;国际化和消息提示的功能 CopyText组件 1. 功能分…

linux远程访问及控制

补充&#xff1a; 终端&#xff1a;接收用户的指令 TTY终端 虚拟终端 ssh:22端口号&#xff0c;加密。 telnet&#xff1a;23端口号&#xff0c;不加密。 解释器&#xff1a;shell SSH 远程管理 SSH&#xff08;Secure Shell&#xff09;是一种安全通道协议&#xff0c…

012.指纹浏览器编译-修改canvas指纹(高级)

指纹浏览器编译-修改canvas指纹(高级) 一、canvas指纹是什么 之前介绍过canvas指纹和常见网站绕过canvas指纹&#xff0c;插眼&#xff1a; https://blog.csdn.net/w1101662433/article/details/137959179 二、为啥有更高级的canvas指纹 众所周知&#xff0c;creepjs和brow…

利用CUDA加速卷积计算:原理、实践与示例代码

利用CUDA加速卷积计算:原理、实践与示例代码 在深度学习领域,卷积神经网络(Convolutional Neural Networks,CNN)是目前最流行和有效的模型之一。然而,随着模型复杂度的增加,卷积计算的计算量也随之增加,这使得在CPU上进行卷积计算变得非常耗时。因此,利用CUDA加速卷积…

我在高职教STM32——GPIO入门之蜂鸣器

大家好&#xff0c;我是老耿&#xff0c;高职青椒一枚&#xff0c;一直从事单片机、嵌入式、物联网等课程的教学。对于高职的学生层次&#xff0c;同行应该都懂的&#xff0c;老师在课堂上教学几乎是没什么成就感的。正因如此&#xff0c;才有了借助 CSDN 平台寻求认同感和成就…

6.17 作业

使用qt实现优化自己的登录界面 要求&#xff1a; 1. qss实现 2. 需要有图层的叠加 &#xff08;QFrame&#xff09; 3. 设置纯净窗口后&#xff0c;有关闭等窗口功能。 4. 如果账号密码正确&#xff0c;则实现登录界面关闭&#xff0c;另一个应用界面显示。 第一个源文件 …

做材料科学领域研究热点:高通量多尺度材料计算和机器学习

研究背景 材料科学是一个重要领域&#xff0c;涉及物质的研究和利用。随着科技进步&#xff0c;材料学已成为多学科交叉的前沿领域之一&#xff0c;融合物理、化学、数学、信息、力学和计算科学等知识。寻找更坚固的新材料已成为当今急需解决的问题。 材料基因工程作为一项颠覆…