Spring Cloud Stream实践

概述

不同中间件,有各自的使用方法,代码也不一样。

可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间件时,绑定层使用不同的绑定器即可,也就是把切换中间件需要做相应的修改工作交给绑定层来做。

本文的操作是在 微服务调用链路追踪 的基础上进行。

环境说明

jdk1.8

maven3.6.3

mysql8

spring cloud2021.0.8

spring boot2.7.12

idea2022

rabbitmq3.12.4

步骤

消息生产者

创建子模块stream_producer

添加依赖

    <dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>

刷新依赖

配置application.yml

server:port: 7001
spring:application:name: stream_producerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output:destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit

查看Source.class源码

编写生产者代码,发送一条消息("hello world")到rabbitmq的my-default exchange中

package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;@EnableBinding(Source.class)
@SpringBootApplication
public class StreamProductApplication implements CommandLineRunner {@Autowiredprivate MessageChannel output;@Overridepublic void run(String... args) throws Exception {//发送消息// messageBuilder 工具类,创建消息output.send(MessageBuilder.withPayload("hello world").build());}public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}}

查看rabbitmq web UI

http://localhost:15672/

看到Exchanges中还没有my-default

运行StreamProductApplication

刷新rabbitmq Web UI,看到了my-dafault的exchange

消息消费者

创建子模块stream_consumer

添加依赖

    <dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>

配置application.yml

server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbit

查看内置通道名称为input

编写消息消费者启动类,在启动类监听接收消息

package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;@SpringBootApplication
@EnableBinding(Sink.class)
public class StreamConsumerApplication {@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("监听收到:" + message.getPayload());}public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}
}

运行stream_consumer消费者服务,监听消息

运行stream_producer生产者服务,发送消息

查看消费者服务控制台日志,接收到了消息

优化代码

之前把生产和消费的消息都写在启动类中了,代码耦合高。

优化思路是把不同功能的代码分开放。

消息生产者

stream_producer 代码结构如下

package org.example.stream.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
@Component
@EnableBinding(Source.class)
public class MessageSender {@Autowiredprivate MessageChannel output;//通道//发送消息public void send(Object obj){output.send(MessageBuilder.withPayload(obj).build());}
}

修改启动类

package org.example.stream;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;@SpringBootApplication
public class StreamProductApplication {public static void main(String[] args) {SpringApplication.run(StreamProductApplication.class, args);}}

pom.xml添加junit依赖

<dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope>
</dependency>

刷新依赖

编写测试类

在stream_producerm模块的src/test目录下,新建org.example.stream包,再建出ProducerTest类,代码如下

package org.example.stream;import org.example.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {@Autowiredprivate MessageSender messageSender;//注入发送消息工具类@Testpublic void testSend(){messageSender.send("hello world");}
}

 

消息消费者

stream_consumer代码结构如下

添加MessageListener类获取消息

package org.example.stream.consumer;import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(Sink.class)
public class MessageListener {// 监听binding中的信息@StreamListener(Sink.INPUT)public void input(String message){System.out.println("获取信息:" + message);}
}

修改启动类

package org.example.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;@SpringBootApplication
public class StreamConsumerApplication {public static void main(String[] args) {SpringApplication.run(StreamConsumerApplication.class, args);}}

启动consumer接收消息

执行producer单元测试类ProducerTest的testSend()方法,发送消息

查看consumer控制台输出,接收到信息了

代码解耦后,同样能成功生产消息和消费消息。

自定义消息通道

此前使用默认的消息通道outputinput。

也可以自己定义消息通道,例如:myoutputmyinput

消息生产者

org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类

package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产这的配置*/String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}

修改MessageSender

package org.example.stream.producer;import org.example.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 向中间件发送数据*/
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {@Autowiredprivate MessageChannel myoutput;//通道//发送消息public void send(Object obj){myoutput.send(MessageBuilder.withPayload(obj).build());}
}

修改application.yml

cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地myoutput:destination: custom-output

消息消费者

在stream_consumer服务的org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类

package org.example.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;/*** 自定义的消息通道*/
public interface MyProcessor {/*** 消息生产者的配置*/String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();/*** 消息消费者的配置*/String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}

修改MessageListener

package org.example.stream.stream;import org.example.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {// 监听binding中的信息@StreamListener(MyProcessor.MYINPUT)public void input(String message){System.out.println("获取信息:" + message);}
}

修改application.yml配置

cloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地myinput:destination: custom-output

测试

启动stream_consumer

运行单元测试的testSend()方法生产消息

查看stream_consumer控制台,能看到生产的消息,如下

获取信息:hello world

消息分组

采用复制配置方式运行两个消费者

启动第一个消费者(端口为7002)

修改端口为7003,copy configuration,再启动另一个消费者

执行生产者单元测试生产消息,看到两个消费者都接收到了信息

说明:如果有两个消费者,生产一条消息后,两个消费者均能收到信息。

但当我们发送一条消息只需要其中一个消费者消费消息时,这时候就需要用到消息分组,发送一条消息消费者组内只有一个消费者消费到。

我们只需要在服务消费者端设置spring.cloud.stream.bindings.input.group 属性即可

重启两个消费者

修改端口号为7002,重新启动第一个消费者

修改端口号为7003,重新启动第二个消费者

生产者生产一条消息

查看消费者接收消息情况,只有一个消费者接收到信息。

消息分区

消息分区就是实现特定消息只往特定机器发送。

修改生产者配置

  cloud:stream:bindings:output:destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myoutput:destination: custom-outputproducer:partition-key-expression: payload #分区关键字 可以是对象中的id,或对象partition-count: 2 #分区数量

修改消费者1的application.yml配置

server:port: 7002
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 0 #当前消费者的索引

启动消费者1

修改消费者2的配置

server:port: 7003
spring:application:name: stream_consumerrabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #内置获取消息的通道,从destination配置值的exchange中获取信息destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称myinput:destination: custom-outputgroup: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息consumer:partitioned: true #开启分区支持binders:defaultRabbit:type: rabbit #配置默认的绑定器为rabbitinstance-count: 2 #消费者总数instance-index: 1 #当前消费者的索引

修改端口号为7003,当前消费者的索引instance-index的值修改为1

启动消费者2

生产者发送消息,看到只有Application(2)接收到消息

再用生产者发送一次消息,也是Application(2)接收到消息

说明实现了消息分区

也可以更改发送的数据,看是否能发送到不同消费者

修改生产者,发送数据由hello world变为hello world1,同时发送5次

	public void testSend(){for (int i = 0; i < 5; i++) {messageSender.send("hello world1");}}

看到hello world1全部被Application消费

所以消息分区是根据发送的消息不同,发送到不同消费者中。

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/199231.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AIGC ChatGPT 4 将数据接口文件使用Python进行入库Mysql

数据分析,数据处理的过程,往往将采集到的数据,或者从生产库过来的接口文件,我们都需要进行入库操作。 如下图数据: 将这样的数据接口文件,进行入库,插入到Mysql数据库中。 用Python代码来完成。 ChatGPT4来完成代码输入。 ChatGPT4完整内容如下: 这个任务可以使用`…

持续集成交付CICD:Jenkins通过API触发流水线

目录 一、理论 1.HTTP请求 2.调用接口的方法 3.HTTP常见错误码 二、实验 1.Jenkins通过API触发流水线 三、问题 1.如何拿到上一次jenkinsfile文件进行自动触发流水线 一、理论 1.HTTP请求 &#xff08;1&#xff09;概念 HTTP超文本传输协议&#xff0c;是确保服务器…

Flink(七)【输出算子(Sink)】

前言 今天是我写博客的第 200 篇&#xff0c;恍惚间两年过去了&#xff0c;现在已经是大三的学长了。仍然记得两年前第一次写博客的时候&#xff0c;当时学的应该是 Java 语言&#xff0c;菜的一批&#xff0c;写了就删&#xff0c;怕被人看到丢脸。当时就想着自己一年之后&…

OpenCV快速入门:直方图、掩膜、模板匹配和霍夫检测

文章目录 前言一、直方图基础1.1 直方图的概念和作用1.2 使用OpenCV生成直方图1.3 直方图归一化1.3.1 直方图归一化原理1.3.2 直方图归一化公式1.3.3 直方图归一化代码示例1.3.4 OpenCV内置方法&#xff1a;normalize()1.3.4.1 normalize()方法介绍1.3.4.2 normalize()方法参数…

SSH协议简介与使用

Secure Shell(SSH) 是由 IETF(The Internet Engineering Task Force) 制定的建立在应用层基础上的安全网络协议。它是专为远程登录会话(甚至可以用Windows远程登录Linux服务器进行文件互传)和其他网络服务提供安全性的协议&#xff0c;可有效弥补网络中的漏洞。通过SSH&#xf…

【封装UI组件库系列】搭建项目及准备工作

封装UI组件库系列第一篇搭建项目 前言 &#x1f31f;搭建项目 创建工程 基本结构 1.创建8个组件展示页面 ​ 2.配置路由文件router/index.js 3.页面布局 &#x1f31f;总结 前言 在前端开发中&#xff0c;大家可能已经用过各种各样的UI组件库了&#xff0c;现在市面上热…

单链表相关面试题--7.链表的回文结构

/* 解题思路&#xff1a; 此题可以先找到中间节点&#xff0c;然后把后半部分逆置&#xff0c;最近前后两部分一一比对&#xff0c;如果节点的值全部相同&#xff0c;则即为回文。 */ class PalindromeList { public:bool chkPalindrome(ListNode* A) {if (A NULL || A->ne…

Python---函数的嵌套(一个函数里面又调用了另外一个函数)

函数嵌套调用------就是一个函数里面又调用了另外一个函数。 基本语法&#xff1a; # 定义 函数B def funcB():print(这是funcB函数的函数体部分...)# 定义 函数A def funcA():print(- * 80) # 这一行为了更好区分print(这是funcA函数的函数体部分...)# 假设我们在调用funcA…

车载通信架构 —— 传统车内通信网络发展回顾

车载通信架构 —— 传统车内通信网络发展回顾 我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 屏蔽力是信息过载时代一个人的特殊竞争力&#xff0c;任何…

CentOS 7搭建Gitlab流程

目录 1、查询docker镜像gitlab-ce 2、拉取镜像 3、查询已下载的镜像 4、新建gitlab文件夹 5、在gitlab文件夹下新建相关文件夹 6、创建运行gitlab的容器 7、查看docker容器 8、根据Linux地址访问gitlab 9、进入docker容器&#xff0c;设置用户名的和密码 10、登录git…

2023.11.14 关于 Spring Boot 创建和使用

目录 Spring Boot Spring Boot 项目的创建 网页版创建 Spring Boot 项目 Spring Boot 目录说明 项目运行 Spring Boot Spring Boot 是基于 Spring 设计的一个全新的框架&#xff0c;其目的是用来简化 Spring 的应用、初始搭建、开发的整个过程Spring Boot 就是一个整合了…

前缀和(c++,超详细,含二维)

前缀和与差分 当给定一段整数序列a1,a2,a3,a4,a5…an; 每次让我们求一段区间的和&#xff0c;正常做法是for循环遍历区间起始点到结束点&#xff0c;进行求和计算&#xff0c;但是当询问次数很多并且区间很长的时候 比如&#xff0c;10^5 个询问和10^6区间长度&#xff0c;相…

Springboot框架中使用 Redis + Lua 脚本进行限流功能

Springboot框架中使用 Redis Lua 脚本进行限流功能 限流是一种用于控制系统资源利用率或确保服务质量的策略。在Web应用中&#xff0c;限流通常用于控制接口请求的频率&#xff0c;防止过多的请求导致系统负载过大或者防止恶意攻击。 什么是限流&#xff1f; 限流是一种通过…

深度学习乳腺癌分类 计算机竞赛

文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度&#xff0c;召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…

[算法学习笔记](超全)概率与期望

引子 先来讲个故事 话说在神奇的OI大陆上&#xff0c;有一只paper mouse 有一天&#xff0c;它去商场购物&#xff0c;正好是11.11&#xff0c;商店有活动 它很荣幸被选上给1832抽奖 在抽奖箱里&#xff0c;有3个篮蓝球&#xff0c;12个红球 paper mouse能抽3次 蒟蒻的p…

Java —— 抽象类和接口

目录 1. 抽象类 1.1 抽象类概念 1.2 抽象类语法与特性 1.3 抽象类的作用 2. 接口 2.1 接口的概念 2.2 接口的语法规则与特性 2.3 实现多个接口(解决多继承的问题) 2.4 接口间的继承 2.5 抽象类和接口的区别 2.6 接口的使用实例 2.7 Clonable 接口和深拷贝 2.7.1 Cloneable接口 …

手把手从零开始训练YOLOv8改进项目(官方ultralytics版本)教程

手把手从零开始训练 YOLOv8 改进项目 (Ultralytics版本) 教程,改进 YOLOv8 算法 本文以Windows服务器为例:从零开始使用Windows训练 YOLOv8 算法项目 《芒果 YOLOv8 目标检测算法 改进》 适用于芒果专栏改进 YOLOv8 算法 文章目录 官方 YOLOv8 算法介绍改进网络代码汇总第…

Apache阿帕奇安装配置

目录 一、下载程序 1. 点击Download 2. 点击Files for Microsoft Windows 3. 点击Apache Lounge 4. 点击httpd-2.4.54-win64-VSI6.zip ​5. 下载压缩包 6.解压到文件夹里 二、配置环境变量 1. 右键我的电脑 - 属性 2. 高级系统设置 3. 点击环境变量 4. 点击系统变…

IDEA调用接口超时,但Postman可成功调用接口

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;欢迎加入我们一起学习&#xff01;&#x1f4e2;资源分享&#xff1a;耗时200小时精选的「软件测试」资…

运行软件报错mfc140.dll丢失?分享mfc140.dll丢失的解决方法

小伙伴们&#xff0c;你是否也有过这样的经历&#xff1a;每当碰到诸如" mfc140.dll 丢失 "之类的烦人错误时&#xff0c;你是不是会一头雾水&#xff0c;完全不知道从何下手去解决&#xff1f;不要担心&#xff0c;接下来咱就给你提供这样一篇实用教程&#xff0c;教…