springCould中的Stream-从小白开始【12】

🥚今日鸡汤🥚

        见过一些人,他们朝九晚五😭,有时也要加班,却能把生活过得很😎有趣。他们有自己的爱好,不怕独处。他们有自己的坚持,哪怕没人在乎。🤦‍♂️

                                                               开心一点😁

                                                               认真一点🤔

                                                               努力一点🫡

目录

😶‍🌫️1.为什么引入Stream

🥚2.什么是Stream 

🧇3.Steam设计思想 

🥓4.案例说明 

🧂5.重复消费 


1.为什么引入Stream🥚🥚🥚

  • 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

1.1无感知的使用消息中间件

Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知。

1.2中间件和服务的高度解耦

Spring Cloud Stream进行了配置隔离,只需要调整配置,开发中可以动态的切换中间件(如rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

2.什么是Stream 🥚🥚🥚

  • 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架
  • 应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。
  • 通过我们配置binding(绑定),而Spring Cloud Stream的 binder对象负责与消息中间件交互

所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

3.Steam设计思想🥚🥚🥚 

  • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
  • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离

4.案例说明 🥚🥚🥚

  • cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
  • cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • cloud-stream-rabbitmq-consumer8803,作为消息接收模块

4.1消息驱动-生产者

1.加pom

   <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot </groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--基础依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--eureka客户端--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--消息驱动--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>

2.改yml

  • 注意:小张的Rabbitmq是在Linux上的,所以配置如下:
server:port: 8801spring:application:name: cloud-stream-providerrabbitmq:host: 192.168.20.129port: 5672username: rootpassword: 123456cloud:stream:binders:defaultRabbit:type: rabbitbindings:output:destination: studyExchangecontent-type: application/jsonbinder: defaultRabbiteureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

3.主启动类

@SpringBootApplication
public class StreamMqMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMqMain8801.class);}
}

4.业务类

  • 1.创建接口
  • 2.创建接口实现类
  • @EnableBinding:Spring Cloud Stream中用来启用消息传递功能的注释。
  • 它用于将应用程序绑定到消息传递系统(例如,Apache Kafka, RabbitMQ),并声明用于发送和接收消息的输入和输出通道。
  • 通过使用@EnableBinding,您可以定义应用程序所需的通道和消息处理程序
@EnableBinding(Source.class)//定义消息的推送管道
public class IMessageProviderImpl implements IMessageProvider {@Autowiredprivate MessageChannel output; //消息发送管道@Overridepublic String send() {String serial= UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("======serial:"+serial);return null;}
}

5.测试

  • 1.浏览器192.168.20.129:15672访问RabbitMQ
  • 2.localhost:8801/sendMessage访问

4.2消息驱动-消费者

1.建模块

  • 1.在父工程下创建模块cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • 2.注意jdk和maven版本号

2.加pom

  • 1.springboot依赖
  • 2.通用依赖
  • 3.eureka客户端依赖
  • 4.消息驱动rabbitmq
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot </groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--基础依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--eureka客户端--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--消息驱动--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>

3.添yml

server:port: 8802spring:application:name: cloud-stream-consumerrabbitmq:host: 192.168.20.129port: 5672username: rootpassword: 123456cloud:stream:binders:defaultRabbit:type: rabbitbindings:input:destination: studyExchangecontent-type: application/jsonbinder: defaultRabbiteureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

4.主启动类

@SpringBootApplication
public class StreamMqMain8802 {public static void main(String[] args) {SpringApplication.run(StreamMqMain8802.class);}
}

5.业务类

  • 1.@StreamListener注解是Spring Cloud Stream框架提供的一个注解,用于定义一个消息监听器
  • 2.通过使用@StreamListener注解,可以将一个方法标记为消息的消费者并指定该方法要监听的消息通道
  • 3.当有消息到达指定的通道时,该方法会被自动触发执行,从而处理这个消息。
  • 4.@StreamListener注解通常与@EnableBinding注解一起使用,用于指定所要绑定的消息通道。
  • 5.@EnableBinding注解用于绑定消息通道与应用程序中的输入输出接口,@StreamListener注解则用于标记一个方法作为消息的消费者。
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1---接受消息:"+message.getPayload()+",port:"+serverPort);}}

6.测试

  • 1.使用8801生产者发送消息
  • 2.使用8802消费者接受消息

5.重复消费 🥚🥚🥚

问题描述:

  • 1.根据8802,重新创建cloud-stream-rabbitmq-consumer8803,作为消息接收模块
  • 2.8801生产者发送消息
  • 3.8802,8803都可以接收到

 如果一个订单同时被两个服务获取到,就会造成数据错误

注意:在Stream中处于同一个group中的多个消费者竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

5.1自定义分组

在消费者端添加group配置:分为xzA,xzB

5.2轮询分组 

8802/8803实现了轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

8802,8803的group配置相同名称,重新启动 ,使用8801发送两条消息,8802接受一条,8803接收一条

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/238296.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RISC-V Bytes: Caller and Callee Saved Registers

原文链接1&#xff1a;https://danielmangum.com/posts/risc-v-bytes-caller-callee-registers/ 原文链接2&#xff1a;https://zhuanlan.zhihu.com/p/77663680 //主要讲栈帧 原文链接3&#xff1a;https://www.jianshu.com/p/b666213cdd8a //主要讲栈帧 This is part of a new…

MySQL面试题 | 08.精选MySQL面试题

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

AArch64 memory management学习(二)

提示 该博客主要为个人学习&#xff0c;通过阅读官网手册整理而来&#xff08;个人觉得阅读官网的英文文档非常有助于理解各个IP特性&#xff09;。若有不对之处请参考参考文档&#xff0c;以官网文档为准。AArch64 memory management学习一共分为两章&#xff0c;这是第二章。…

实战 | 某电商平台类目SKU数获取与可视化展示

一、项目背景 最近又及年底&#xff0c;各类分析与规划报告纷至沓来&#xff0c;于是接到了公司平台类目商品增长方向的分析需求&#xff0c;其中需要结合外部电商平台做对比。我选择了国内某电商平台作为比较对象&#xff0c;通过获取最细层级前台类目下的SKU数以及结构占比&…

Java网络爬虫--HttpClient

目录标题 技术介绍有什么优点&#xff1f;怎么在项目中引入&#xff1f; 请求URLEntityUtils 类GET请求带参数的GET请求POST请求 总结 技术介绍 HttpClient 是 Apache Jakarta Common 下的子项目&#xff0c;用来提供高效的、功能丰富的、支持 HTTP 协议的客户端编程工具包。相…

八分钟了解一致性算法 -- Raft算法

前言 #### 分布式一致性在分布式环境中,一致性是指数据在多个副本之间是否能够保持一致的特性。 #### 分布式一致性算法比较常见的一致性算法包括Paxos算法,Raft算法,ZAB算法等• Paxos是Leslie Lamport提出的一种基于消息传递的分布式一致性算法。很多分布式一致性算法都由…

机器学习根据金标准标记数据-九五小庞

根据金标准标记数据是一种在机器学习和数据科学中常见的操作&#xff0c;主要用于评估分类模型的性能。其基本步骤如下&#xff1a; 收集数据&#xff1a;首先需要收集相关领域的原始数据&#xff0c;这些数据通常来自不同的来源和渠道。数据清洗和预处理&#xff1a;在这一步…

什么是Modbus协议?

Modbus协议是一种在工业自动化领域广泛应用的通信协议&#xff0c;它允许不同设备之间进行可靠的数据交换和控制。该协议最初由Modicon公司于1979年创建&#xff0c;旨在提供一种简单而有效的方法&#xff0c;使PLC&#xff08;可编程逻辑控制器&#xff09;和其他自动化设备能…

四搭建dockerhub私有仓库

搭建dockerhub私有仓库 很多场景下&#xff0c;我们需使用私有仓库管理Docker镜像。相比Docker Hub&#xff0c;私有仓库有以下优势&#xff1a; 节省带宽&#xff0c;对于私有仓库中已有的镜像&#xff0c;无需从Docker Hub下载&#xff0c;只需从私有仓库中下载即可&#x…

Vue学习笔记五--路由

1、什么是路由 2、VueRouter 2、1VueRouter介绍 2、2使用步骤 2、3路由封装 3、router-link 3.1两个类名 3.2声明式导航传参 4、路由重定向、404 当找不到路由时&#xff0c;跳转配置到404页面 5、路由模式 6、通过代码跳转路由---编程式导航&传参 路由跳转时传参 跳转方式…

【漏洞复现】大华 DSS 数字监控系统 itcBulletin SQL 注入

漏洞描述 大华 DSS存在SQL注入漏洞,攻击者 pota/services/itcBuletin 路由发送特殊构造的数据包,利用报错注入获取数据库敏感信息。攻击者除了可以利用 SQL注入漏词获取数据库中的信息例如,管理员后台密码、站点的用户人人信息)之外,甚至在高权限的情况可向服务器中写入木…

【Python数据可视化】matplotlib之绘制高级图形:散点图、热力图、等值线图、极坐标图

文章传送门 Python 数据可视化matplotlib之绘制常用图形&#xff1a;折线图、柱状图&#xff08;条形图&#xff09;、饼图和直方图matplotlib之设置坐标&#xff1a;添加坐标轴名字、设置坐标范围、设置主次刻度、坐标轴文字旋转并标出坐标值matplotlib之增加图形内容&#x…

嘴尚绝卤味:健康卤味风潮来袭,引领卤味市场新变革!

随着生活水平的提高&#xff0c;人们对食品的需求已不再满足于基本的口感和饱腹&#xff0c;健康、营养成为越来越多人关注的焦点。在这种背景下&#xff0c;健康卤味理念应运而生&#xff0c;并迅速在卤味市场引发了一场深刻的变革。 健康卤味理念强调选用优质、健康的食材&am…

SpringBoot3 WebFlux 可观测最佳实践

前言 链路追踪是可观测性软件系统的一个非常好的工具。它使开发人员能够了解应用程序中和应用程序之间不同交互发生的时间、地点和方式。同时让观测复杂的软件系统变得更加容易。 从Spring Boot 3开始&#xff0c;Spring Boot 中用于链路追踪的旧 Spring Cloud Sleuth 解决方…

反射助你无痛使用Semantic Kernel接入离线大模型

本文主要介绍如何使用 llama 的 server 部署离线大模型&#xff0c;并通过反射技术修改 Semantic Kernel 的 OpenAIClient 类&#xff0c;从而实现指定端点的功能。最后也推荐了一些学习 Semantic Kernel 的资料&#xff0c;希望能对你有所帮助。 封面图片&#xff1a; Dalle3 …

docker/华为云cce 部署nacos 2.3.0 集群模式

镜像地址 https://hub.docker.com/r/nacos/nacos-server 版本 nacos/nacos-server:v2.3.0-slim 关键环境变量 使用mysql数据源 变量值备注MODEcluster启用集群模式MYSQL_SERVICE_DB_NAME数据库名MYSQL_SERVICE_USER数据库用户名MYSQL_SERVICE_PASSWORD数据库密码SPRING_D…

WPF 布局

了解 WPF中所有布局如下&#xff0c;我们一一尝试实现&#xff0c;本文档主要以图形化的形式展示每个布局的功能。 布局&#xff1a; Border、 BulletDecorator、 Canvas、 DockPanel、 Expander、 Grid、 GridView、 GridSplitter、 GroupBox、 Panel、 ResizeGrip、 Separat…

API设计:从基础到最佳实践

1*vWvkkgG6uvgmJT8GkId98A.png 在这次深入探讨中&#xff0c;我们将深入了解API设计&#xff0c;从基础知识开始&#xff0c;逐步进阶到定义出色API的最佳实践。 作为开发者&#xff0c;你可能对许多这些概念很熟悉&#xff0c;但我将提供详细的解释&#xff0c;以加深你的理解…

13、Redis高频面试题

1、项目中为什么用Redis 我们项目中之所以选择Redis&#xff0c;主要是因为Redis有下面这些优点&#xff1a; 操作速度快&#xff1a;Redis的数据都保存在内存中&#xff0c;相比于其它硬盘类的存储&#xff0c;速度要快很多数据类型丰富&#xff1a;Redis支持 string&#x…

【题解】—— 每日一道题目栏

2024.1 【题解】—— LeetCode一周小结1 1. 1599. 经营摩天轮的最大利润 2. 466. 统计重复个数 3. 2487. 从链表中移除节点 4. 2397. 被列覆盖的最多行数 5. 1944. 队列中可以看到的人数 6. 2807. 在链表中插入最大公约数 7. 383. 赎金信 【题解】—— LeetCode一周小…