玩转SpringCloud Stream

背景及痛点

现如今消息中间件(MQ)在互联网项目中被广泛的应用,特别是大数据行业应用的特别的多,现在市面上也流行这多个消息中间件框架,比如ActiveMQRabbitMQRocketMQKafka等,这些消息中间件各有各的优劣,但是想要解决的问题都基本相同。由于每个框架都有它自己的使用方式,这无疑是增加了开发者的学习成本以及添加相同的业务复杂度。框架的变更或者多个中间件的混合使用使得业务逻辑代码中中间件的切换、项目的维护和开发都会变得更加繁琐。

有没有一种技术让我们不再需要关注具体MQ的使用细节,我们只需要专注业务逻辑的开发,让程序根据实际项目的使用自己去适配绑定,自动在各种MQ内切换呢?springcloud stream便为此而生。

关于stream

我们用一句话来描述stream就是:屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型

官方定义SpringCloud Stream是一个构建消息驱动微服务的框架,应用通过inputs或者outputs来与SpringCloud Stream中的binder对象交互,我们通过配置来绑定消息中间件,而SpringCloud Streambinder对象负责与消息中间件交互,所以我们只需要搞清楚如何与SpringCloud Stream交互即可方便的使用消息中间件。

SpringCloud Stream通过Spring Integration来连接消息代理中间件以实现消息事件驱动,它提供了个性化的自动化配置,引用了发布订阅消费组分区的三个核心概念,但是目前仅支持RabbitMQKafka

设计思想

在此之前

以前的架构

生产者和消费者通过消息媒介(queue等)传递信息内容(Message),消息必须通过特定的通道(MessageChannel),通过消息的发布与订阅来决定消息的发送和消费(publish/subscrib)。

引入中间件

现在假如我们用到了RabbitMQKafka,由于这两个消息中间件的架构上的不同,像RabbitMQExchange,而KafkatopichePartitions分区

引入中间件之后

(binder中,input对于消费者,output对应生产者。)

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,但是后面因为业务需求,需要改用另外一种消息队列进行迁移,这时候无疑就是一 个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

屏蔽底层差异

在没有绑定器(Builder)这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间件,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

处理架构

Stream对消息中间件的进一步封装可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

处理架构

其遵循了发布-订阅模式,主要使用的就是Topic主题进行广播,RabbitMQ就是Exchange,在Kafka中就是Topic

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

stream流程

stream流程
  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入

常用api和注解

常用api和注解

使用示例

基本环境

  • 注册中心:Eureka,可以是其他。

  • 消息中间件:RabbitMQ

    rabbitmq:host: localhostport: 5672username: guestpassword: guest
    

生产端

依赖
 <!--stream rabbit -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
server:port: 8801spring:application:name: cloud-stream-providercloud:# stream 配置stream:binders: # 配置绑定的消息中间件的服务信息defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定type: rabbit  # 消息组件的类型environment:  #相关环境配置,设置rabbitmq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output:  # 通道名称destination: testExchange # 定义要使用的Exchange的名称content-type: application/json  # 设置消息类型,对象为json,文本是text/plainbinder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbiteureka:client:#表示是否将自己注册进EurekaServer默认为trueregister-with-eureka: true#是否从EurekaServer抓取已有的注册消息,默认为true,单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡fetch-registry: trueservice-url:#单机版defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: trueinstance-id: sender01
定义接口

这里需要定义一个接口并实现它,方便其他业务调用。

public interface IMessageProvider {/*** 发送接口* @param msg* @return*/public String send(String msg);
}
接口实现

接口实现中需要添加@EnableBinding注解,并引入Source.class,为什么引入Source.class呢?因为它是生产者,我们参考stream流程图就可以知道

import com.martain.study.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder; 
import javax.annotation.Resource;@EnableBinding(Source.class)
public class MessageProvider implements IMessageProvider {/*** 注入消息发送管道*/@Resourceprivate MessageChannel output;@Overridepublic String send(String msg) {output.send(MessageBuilder.withPayload(msg).build());System.out.println("******send message:"+msg);return msg;}
}
定义测试controller

@RestController
public class TestController {@AutowiredIMessageProvider messageProvider;@GetMapping("/sendMsg")public String sendMsg(){String msg = UUID.randomUUID().toString();return messageProvider.send(msg);}}
启动类
@SpringBootApplication
public class StreamProviderApplication8801 {public static void main(String[] args) {SpringApplication.run(StreamProviderApplication8801.class,args);}
} 

服务启动之后,多次请求/sendMsg,发送了多条消息。

生产服务生产消息

消费端

依赖
   <!--stream rabbit --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><!--eureka client--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
配置文件

与生产者类似,只是bindings中的output改成了input

server:port: 8802
spring:application:name: cloud-stream-consumecloud:# stream 配置stream:binders: # 配置绑定的消息中间件的服务信息defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定type: rabbit  # 消息组件的类型environment:  #相关环境配置,设置rabbitmq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input:  # 通道名称destination: testExchange # 定义要使用的Exchange的名称content-type: application/json  # 设置消息类型,对象为json,文本是text/plainbinder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbiteureka:client:#表示是否将自己注册进EurekaServer默认为trueregister-with-eureka: true#是否从EurekaServer抓取已有的注册消息,默认为true,单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡fetch-registry: trueservice-url:#单机版defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: trueinstance-id: recover01
接收服务

接收服务只需要再类名前添加@EnableBinding()注解,并引入Sink.class类,而实际接收的方法中需要添加@StreamListener(Sink.INPUT)注解。

package com.martain.study.controller; 
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component; @Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {/*** 获取本服务的端口*/@Value("${server.port}")private String serverPort;/*** 这里表示监听sink的input* @param message*/@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("**** recv msg :"+message.getPayload()+"   in port "+serverPort);}
}
启动类
@SpringBootApplication
public class StreamConsumerApplication8802 {public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication8802.class,args);}
}

启动生产服务后,在启动消费服务,多次请求生产服务发送消息,我们可以发现消费者能很快的消费这些消息。

消费者消费消息

消息分组

当我们有多个消费者时,这个时候生产者生产一条消息,会发现所有的消费者都会消费这个消息。比如在一些订单系统的场景中,如果一个订单被多个处理服务一起获取到,就容易造成数据错误,那我们如何避免这种情况呢?这时我们就可以使用Stream的消息分组来解决重复消费问题。

如何实现Stream的消息分组呢?我们只要简单的在yml文件中配置spring.cloud.stream.bindings.input.group即可。示例如下:

...
spring:application:name: cloud-stream-consumecloud:# stream 配置stream:binders: # 配置绑定的消息中间件的服务信息defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定type: rabbit  # 消息组件的类型environment:  #相关环境配置,设置rabbitmq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input:  # 通道名称destination: testExchange # 定义要使用的Exchange的名称content-type: application/json  # 设置消息类型,对象为json,文本是text/plainbinder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbitgroup: groupA # 配置分组...

如果没有设置该属性,当消费服务启动时,会有个随机的组名

如果我们将所有的消费服务的group熟悉都设置成一致的话,这些服务就会在同一个组里面,从而能够保证消息只被应用消费一次。

同一组的消费者是竞争关系,不可以重复消费。

消息持久化

当生产者在持续生产消息,消费服务突然挂了,使得拥有许多消息并没有被消费,如果消费没有配置分组的话,消费服务重启是无法消费未消费的消息的,如果配置了分组的话,当消费服务重启之后可以自动去消费未消费的数据。



喜欢的朋友记得点赞、收藏、关注哦!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/20803.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决 Mac 只显示文件大小,不显示目录大小

前言 在使用 mac 的时候总是只显示文件的大小&#xff0c;不显示文件夹的大小&#xff0c;为了解决问题可以开启“计算文件夹”。 步骤 1.进入访达 2.工具栏点击“显示”选项&#xff0c;点击 “查看显示选项” 3.勾选 显示“资源库"文件夹 和 计算所有大小 或者点击…

STM32 定时器产生定周期方法

目录 背景 程序 第一步、使能PCLK1外设时钟​编辑 第二步、时基单元配置 第三步、配置NVIC&#xff08;设置定时中断优先级&#xff09;​编辑 第四步、使能溢出中断 第五步、使能定时器 第六步、填写中断处理函数&#xff08;ISR&#xff09; 背景 在单片机开发当中&…

【DeepSeek系列】04 DeepSeek-R1:带有冷启动的强化学习

文章目录 1、简介2、主要改进点3、两个重要观点4、四阶段后训练详细步骤4.1 冷启动4.2 推理导向的强化学习4.3 拒绝采样和有监督微调4.4 针对所有场景的强化学习 5、蒸馏与强化学习对比6、评估6.1 DeepSeek-R1 评估6.2 蒸馏模型评估 7、结论8、局限性与未来方向 1、简介 DeepS…

Compose常用UI组件

Compose常用UI组件 概述Modifier 修饰符常用Modifier修饰符作用域限定Modifier Modifier 实现原理Modifier.Element链的构建链的解析 常用基础组件文字组件图片组件按钮组件选择器对话框进度条 常用布局组件线性布局帧布局 列表组件 概述 Compose 预置了很多基础组件&#xff…

Ansys EMC Plus:HIRF 与飞机耦合演示

在本篇博文中&#xff0c;我们将深入探讨 EMC Plus 高强度辐射场 (HIRF) 与软件示例中提供的飞机演示的耦合。本概述将指导您完成整个工作流程&#xff0c;从设置问题空间到基本后处理&#xff0c;包括材料属性分配和创建探针。 概述 在本演示中&#xff0c;下图所示的预先简化…

DeepSeek + Mermaid编辑器——常规绘图

下面这张图出自&#xff1a;由清华大学出品的 《DeepSeek&#xff1a;从入门到精通》。 作为纯文本生成模型&#xff0c;DeepSeek虽不具备多媒体内容生成接口&#xff0c;但其开放式架构允许通过API接口与图像合成引擎、数据可视化工具等第三方系统进行协同工作&#xff0c;最终…

红蓝对抗之常见网络安全事件研判、了解网络安全设备、Webshell入侵检测

文章目录 ​​研判&#xff08;入侵检测&#xff09;​​ ​​设备​​ ​​经典网络​​​​云网络​​ ​​异常HTTP请求​​​​Webshell分析​​ ​​Webshell 的分类​​​​Webshell 的检测​​ ​​主机层面​​​​流量层面​​ ​​附录​​ ​​常见端口漏洞…

基于levmar(Levenberg-Marquardt 非线性最小二乘优化库)的椭圆拟合

1. 包含必要的头文件 #include <opencv2/core.hpp> #include <opencv2/imgproc.hpp> #include <opencv2/highgui.hpp> #include <vector> #include <cmath>2. 定义生成椭圆点的函数 编写一个函数&#xff0c;接受椭圆的中心坐标、长轴半径、短…

Fastgpt学习(5)- FastGPT 私有化部署问题解决

1.☺ 问题描述&#xff1a; Windows系统&#xff0c;本地私有化部署&#xff0c;postgresql数据库镜像日志持续报错" data directory “/var/lib/postgresql/data” has invalid permissions "&#xff0c;“ DETAIL: Permissions should be urwx (0700) or urwx,gr…

基于SpringBoot+vue粮油商城小程序系统

粮油商城小程序为用户提供方便快捷的在线购物体验&#xff0c;包括大米、面粉、食用油、调味品等各种粮油产品的选购&#xff0c;用户可以浏览商品详情、对比价格、下单支付等操作。同时&#xff0c;商城还提供优惠活动、积分兑换等福利&#xff0c;让用户享受到更多实惠和便利…

Python编程之数据分组

有哪些方式可以进行数据分组利用Pandas库进行分组使用itertools库的groupby分组操作构建Python字典方式实现(小规模数据,不适用数量特别大的情况,不需要依赖其它python库)利用NumPy的groupby函数分组操作利用Python的Dask库提供的函数进行分组下面看一个如何去实现坐标数据…

【Linux】认识协议、Mac/IP地址和端口号、网络字节序、socket套接字

⭐️个人主页&#xff1a;小羊 ⭐️所属专栏&#xff1a;Linux 很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~ 目录 1、初识协议2、Mac、IP地址3、端口号4、网络字节序5、socket 1、初识协议 协议就是一种约定。如何让不同厂商生产的计算机之间能…

ubuntu 安装docker

ubuntu 安装docker 官网地址 https://docs.docker.com/engine/install/ubuntu/ 尽量根据官网的来&#xff0c;网上找的很多都是一大堆各种报错 卸载旧版本 新机器不需要操作 卸载的非官方包是&#xff1a; docker.iodocker-composedocker-compose-v2docker-docpodman-docker…

环境变量2

目录 环境变量PATH 如何改变PATH 我们今天继续来学习环境变量2&#xff01;&#xff01;&#xff01; 环境变量PATH PATH的作用是知道命令的搜索路径&#xff0c;我们都知道Linux上的命令行指令&#xff0c;ll&#xff0c;pwd什么的为什么我们写出来系统就知道是什么并且运…

网络安全中的机器学习

当涉及到网络安全时&#xff0c;技术一直是保护系统免受攻击和数据泄露的关键。在这篇论文中&#xff0c;我将介绍一些当前在网络安全领域使用的关键技术&#xff0c;包括加密&#xff0c;身份验证和防火墙。 首先&#xff0c;加密是网络安全中最常见的技术之一。加密是指使用算…

sass报错:[sass] Undefined variable. @import升级@use语法注意事项

今天创建vue3项目&#xff0c;迁移老项目代码&#xff0c;使用sass的时候发现import语法已经废弃&#xff0c;官方推荐使用use替换。 这里我踩了一个坑找半天的问题&#xff0c;原因是sass升级到1.85之后 定义变量前加上 - 就是表示变量私有&#xff0c;即使使用use导出 在新的…

嵌入式经常用到串口,如何判断串口数据接收完成?

说起通信&#xff0c;首先想到的肯定是串口&#xff0c;日常中232和485的使用比比皆是&#xff0c;数据的发送、接收是串口通信最基础的内容。这篇文章主要讨论串口接收数据的断帧操作。 空闲中断断帧 一些mcu&#xff08;如&#xff1a;stm32f103&#xff09;在出厂时就已经在…

激光雷达YDLIDAR X2 SDK安装

激光雷达YDLIDAR X2 SDK安装 陈拓 2024/12/15-2024/12/19 1. 简介 YDLIDAR X2官方网址https://ydlidar.cn/index.html‌YDLIDAR X2 YDLIDAR X2是一款高性能的激光雷达传感器&#xff0c;具有以下主要特点和规格参数‌&#xff1a; ‌测距频率‌&#xff1a;3000Hz ‌扫描频…

deepseek本地调用

目录 1.介绍 2.开始调用 2.1模型检验 2.2 通过url调用 3.总结 1.介绍 这篇博客用来教你如何从本地调用ollama中deepseek的模型接口&#xff0c;直接和deepseek进行对话。 2.开始调用 2.1模型检验 首先要保证ollama已经安装到本地&#xff0c;并且已经下载了deepseek模型…

word$deepseep

1、进入官网地址。 DeepSeek 2、进入DeepSeek的API文档 3、点击DeepSeek开放平台左侧的“API Keys”, 再点击“创建API Key” 4、在弹出的对话框中&#xff0c;输入自己的API Key名称&#xff0c;点击创建。 sk-0385cad5e19346a0a4ac8b7f0d7be428 5、打开Word文档。 6、Word找…