Spring Cloud — 消息驱动 Stream

Spring Cloud Stream 是让微服务更容易在应用中实现消息的发布和订阅处理的框架。Stream 支持与多种消息中间件整合,如Kafka、RibbitMQ等。

本文使用的是Kafka消息中间件,依赖文件为:

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>1.2.1.RELEASE</version>
</dependency>

1 Stream 应用模型

图 Spring Cloud Stream 的应用模式

通道接口与监听接口:用于定于消息的输入和输出通道的接口,通过注解(@Input或@Output)标记方法。该注解的参数为通道名称。而通道名称与消息中间件的主题一对一绑定。

消息通道:对消息队列的一种抽象,用来存放消息发布者发布的消息或消费者所要消费的信息及信息的发送与接收。

消息绑定器:实现应用程序与消息中间件之间交互的隔离。封装了交互细节。

1.1 消息发送和监听步骤

负责发送消息的微服务称为生产者,负责接收消息的微服务称为消费者。

1.1.1 生产者的消息发送

1)添加@EnableBinding注解。例如,@EnableBinding(Source.class)

该注解会触发Spring Cloud Stream进行基本配置,将应用升级为一个Spring Cloud Stream 应用。其可以声明一个或多个(发送或监听)通道接口参数。

2)访问消息通道及发送消息。

发送消息通道用于发送消息(MessageChannel的send方法)。通过上面声明的发送通道接口的方法获取通道。

1.1.2 消费者的消息监听

1)添加@EnableBinding注解。例如,@EnableBinding(Sink.class)

2)@StreamListener 注解方法来实现消息的监听。

@StreamListener(Sink.INPUT)
public void onUserMsgSink(UserMsg userMsg) {System.out.println("收到消息:" + userMsg);
}

1.1.3 相关配置

还需要配置Kafka的连接信息,及消息通道对应的主题。

spring.cloud.stream.kafka.binder.brekers=Kafka的Host

spring.cloud.stream.kafka.binder.zkNodes=zookeeper的Host

生产者的配置:

spring.cloud.bindings.[通道名称].destination= 发送通道对应的主题

消费者的配置:

spring.cloud.bindings.[通道名称].destination= 监听通道对应的主题

spring.cloud.bindings.[通道名称].group= 消费者组名

1.2 通道接口 Channel Interfaces

public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}public interface Sink {String INPUT = "input";@Input("input")SubscribableChannel input();
}

Source 及Sink 为Spring Cloud Stream 框架定义的发送消息通道接口及监听通道接口。@EnableBinding注解的参数就是通道接口的class。@Output来标注获取发送消息通道的方法。其中返回值要是MessageChannel及其子类。@Input来标注获取监听消息通道的方法,返回值要是SubscribableChannel及其子类。

1.2.1 通道接口的注入及使用

@Component
public class UserMsgSender {private final Source source;public UserMsgSender(Source source) {this.source = source;}
}

获取通道,调用Source接口的output()方法。也可以直接在Bean中注入消息通道。

@Component
public class UserMsgSender {private final MessageChannel channel;public UserMsgSender(@Qualifier("output") MessageChannel channel) {this.channel = channel;}
}

如果在服务中定义了多个消息通道,可以通过@Qualifier(“通道名称”)来明确具体的通道。

1.2.2 自定义消息通道

Source定义的发送通道名称为output,Sink定义的监听通道名称为input。我们可以自定义通道。步骤如下:

1)定义通道接口。

public interface ConsumerStreamChannel {@Input("consumerUserMsg")SubscribableChannel userMsg();
}

2)@EnableBinding中声明这个接口。

@EnableBinding(ConsumerStreamChannel.class)

3)在配置文件中为该通道配置相应的主题(或消费组名)。

spring.cloud.bindings.consumerUserMsg.destination= 监听通道对应的主题

spring.cloud.bindings.consumerUserMsg.group= 消费者组名

注意,在使用@StreamListener注解来标注监听方法时,该注解的参数为通道名称。例如,@StreamListener("consumerUserMsg")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/26081.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络安全应急响应中主机历史命令被删除 网络安全事件应急响应

17.1 网络安全应急响应概述 “居安思危&#xff0c;思则有备&#xff0c;有备无患。”网络安全应急响应是针对潜在发生的网络安全事件而采取的网络安全措施。本节主要阐述网络安全响应的概念、网络安全应急响应的发展、网络安全应急响应的相关要求。 17.1.1 网络安全应急响应概…

QT——c++界面编程库

非界面编程 QT编译的时候&#xff0c;依赖于 .pro 配置文件&#xff1a; SOURCES: 所有需要参与编译的 .cpp 源文件 HEADERS:所有需要参与编译的.h 头文件 QT&#xff1a;所有需要参与编译的 QT函数库 .pro文件一旦修改&#xff0c;注意需要键盘按 ctrls 才能加载最新的配置文…

Typora的Github主题美化

[!note] Typora的Github主题进行一些自己喜欢的修改&#xff0c;主要包括&#xff1a;字体、代码块、表格样式 美化前&#xff1a; 美化后&#xff1a; 一、字体更换 之前便看上了「中文网字计划」的「朱雀仿宋」字体&#xff0c;于是一直想更换字体&#xff0c;奈何自己拖延症…

达梦数据库系列之安装及Mysql数据迁移

达梦数据库系列之安装及Mysql数据迁移 1. 达梦数据库1.1 简介1.2 Docker安装达梦1.2.1 默认密码查询1.2.2 docker启动指定密码 1.3 达梦数据库连接工具1.3.1 快捷键 2 Mysql数据库迁移至达梦2.1 使用SQLark进行数据迁移 1. 达梦数据库 1.1 简介 DM8是达梦公司在总结DM系列产品…

使用 Kubeflow 和 Ray 构建机器学习平台

使用 Kubeflow 和 Ray 构建一个稳健的 ML 平台。我们将深入讨论 Kubeflow 和 Ray 的独特功能,以及它们如何互补,共同创建一个强大的 ML 生态系统 集中化 ML 平台的需求 随着企业在 ML 旅程中的成熟,初始 ML 项目的临时性质逐渐让位于对更结构化和可扩展方法的需求。集中化…

【MySQL】数据库-图书管理系统(CC++实现)

一.预期功能 该图书管理系统设计提供基本的设计模版&#xff0c;涉及数据库的增删查改等操作&#xff0c;包含登录功能&#xff0c;图书管理功能&#xff0c;图书借阅功能&#xff0c;用户管理功能等基础功能&#xff0c;详细功能查看以下菜单表&#xff0c;共包含三个菜单&am…

Pany-v2:LFI漏洞探测与敏感文件(私钥窃取/其他)自动探测工具

地址:https://github.com/MartinxMax/pany 关于Pany-v2 Pany-v2 是一款 LFI&#xff08;本地文件包含&#xff09;漏洞探测工具&#xff0c;具备自动识别敏感文件的能力。它能够利用 LFI 漏洞检测并提取 id_rsa 私钥、系统密码文件以及其他可能导致安全风险的敏感信息。该工具…

【Prometheus】prometheus服务发现与relabel原理解析与应用实战

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全…

HONOR荣耀MagicBook 15 2021款 独显(BOD-WXX9,BDR-WFH9HN)原厂Win10系统

适用型号&#xff1a;【BOD-WXX9】 MagicBook 15 2021款 i7 独显 MX450 16GB512GB (BDR-WFE9HN) MagicBook 15 2021款 i5 独显 MX450 16GB512GB (BDR-WFH9HN) MagicBook 15 2021款 i5 集显 16GB512GB (BDR-WFH9HN) 链接&#xff1a;https://pan.baidu.com/s/1S6L57ADS18fnJZ1…

【tplink】校园网接路由器如何单独登录自己的账号,wan-lan和lan-lan区别

老式路由器TPLINK&#xff0c;接入校园网后一人登录&#xff0c;所有人都能通过连接此路由器上网&#xff0c;无法解决遂上网搜索&#xff0c;无果&#xff0c;幸而偶然看到一个帖子说要把信号源网线接入路由器lan口&#xff0c;开启新世界。 一、wan-lan&#xff0c;lan-lan区…

Vue程序下载

Vue是一个基于JavaScript&#xff08;JS&#xff09;实现的框架&#xff0c;想要使用它&#xff0c;就得先拿到Vue的js文件 Vue官网 Vue2&#xff1a;Vue.js Vue3&#xff1a;Vue.js - 渐进式 JavaScript 框架 | Vue.js 下载并安装vue.js 第一步&#xff1a;打开Vue2官网&a…

ShenNiusModularity项目源码学习(14:ShenNius.Infrastructure项目分析)

ShenNius.Infrastructure项目用于定义ShenNius.Admin.Mvc项目和ShenNius.Admin.API项目共用的特性类、数据操作接口实现类、上下文类、通讯类&#xff0c;主要文件的用途如下&#xff1a;   Attributes文件夹保存特性类或过滤器类定义&#xff0c;主要包括&#xff1a;   …

音乐游戏Pump It Up(PIU)模拟器

文章目录 &#xff08;一&#xff09;Pump It Up&#xff08;1.1&#xff09;基本情况&#xff08;1.2&#xff09;机体 &#xff08;二&#xff09;模拟器&#xff08;2.1&#xff09;主程序&#xff08;2.2&#xff09;模拟器主题 &#xff08;三&#xff09;曲谱文件&#x…

Flink同步数据mysql到doris问题合集

Flink同步数据mysql到doris 官方同步流程Doris安装下载地址导入镜像启动配置 Flink-cdc安装&#xff08;自制&#xff09;下载地址导入镜像启动命令 启动问题修复Flink报错Could not acquire the minimum required resources.作业报错 Mysql8.0 Public Key Retrieval is not al…

STM32呼吸灯实验手册(TIM定时器)

一、实验目标 使用TIM定时器的PWM模式控制LED亮度实现LED渐亮渐灭的呼吸灯效果掌握HAL库的TIM配置方法 二、硬件准备 开发板&#xff1a;STM32F103C8T6LED模块&#xff1a;LED串联220Ω电阻两组USB-TTL调试器硬件连接 三、软件配置&#xff08;STM32CubeMX&#xff09; 打开…

vue框架后遗症∶被遗忘的dom操作

用多了vue、react等前端框架&#xff0c;不得不说用数据驱动视图来开发真的很香&#xff0c;但是也免不了会有不用这些框架的项目&#xff0c;dom操作还是很有必要的&#xff0c;一开始学习网页设计的时候就教过&#xff0c;后面一直开发项目基本上用框架。虽然有些想不起来了&…

Centos7源码编译安装Sqlite最新版本

下载源码 https://www.sqlite.org/download.html 复制下载链接&#xff0c;然后用 wget 下载 wget https://www.sqlite.org/2025/sqlite-autoconf-3490100.tar.gz 解压缩编译安装 tar -zxf sqlite-autoconf-3490100.tar.gz cd sqlite-autoconf-3490100 ./configure --prefi…

数据结构秘籍(二)图(含图的概念、存储以及图的两大搜索)

1 引言 线性数据结构的元素满足唯一的线性关系&#xff0c;每个元素&#xff08;初第一个和最后一个外&#xff09;只有一个直接前趋和一个直接后继。树形数据结构的元素之间有着明显的层次关系。但是图形结构的元素之间的关系是任意的。 什么是图&#xff1f; 简单来说&…

遗传算法详解及在matlab中的使用

遗传算法分析 一 遗传算法概述1 算法概念2 基本特点3 启发式算法 二 原理与方法1 实现步骤1.1 个体编码1.2 种群初始化1.3 适应度计算1.4 选择运算1.5 交叉运算1.6 变异运算 2 总结 三 应用实例1 GA工具使用教程2 设置目标函数3 搜索最小值4 搜索最大值 一 遗传算法概述 本章简…

pytorch基础-nn.linear

import torch import torch.nn as nn# 定义线性层 linear_layer nn.Linear(in_features10, out_features5, biasTrue)# 输入数据 input_data torch.randn(32, 10) # (batch_size32, in_features10)# 前向传播 output linear_layer(input_data) print(output.shape) # 输出…