spring-cloud-stream

系列文章目录

第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用
第十五章 RabbitMQ 延迟队列
第十六章 spring-cloud-stream

在这里插入图片描述


文章目录

  • 系列文章目录
    • @[TOC](文章目录)
  • 前言
  • 1、stream设计思想
  • 2、编码常用的注解
  • 3、编码步骤
    • 3.1、添加依赖
    • 3.2、修改配置文件
    • 3.3、生产
    • 3.4、消费
    • 3.5、延迟队列
      • 3.5.1、修改配置文件
      • 3.5.2、生产端
      • 3.5.2、消息确认机制 消费端
  • 总结

前言

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。

SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。

1、stream设计思想

在这里插入图片描述
在这里插入图片描述

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2、编码常用的注解

在这里插入图片描述

组成说明
Middleware中间件,目前只支持RabbitMQ和Kafka
BinderBinder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinding指信道channel和exchange绑定在一起

3、编码步骤

3.1、添加依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2、修改配置文件

server:port: 8088spring:cloud:stream:binders: #需要绑定的rabbitmq的服务信息defaultRabbit:  #定义的名称,用于bidding整合type: rabbit  #消息组件类型environment:  #配置rabbimq连接环境spring:rabbitmq:host: localhost   #rabbitmq 服务器的地址port: 5672           #rabbitmq 服务器端口username: tiger       #rabbitmq 用户名password: tiger       #rabbitmq 密码virtual-host: tiger_vh  #虚拟路径bindings:        #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组

3.3、生产

/*** 订单消息输出通道处理器*/
@Component
public interface OrderOutputChannelProcesor {@Output("saveOrderOutput")MessageChannel saveOrderOutput();
}
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {@Autowired@Output("saveOrderOutput")private MessageChannel messageChannel;public void sentMsg(UserInfo userInfo){messageChannel.send(MessageBuilder.withPayload(userInfo).build());log.info("消息发送成功:" + userInfo);}
}

3.4、消费

/*** 订单消息输入通道处理器*/
@Component
public interface OrderInputChannelProcesor {@Input("saveOrderInput")SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {@StreamListener("saveOrderInput")public void receiveMsg(Message<UserInfo> userInfoMessage){log.info("接收消息成功:" + userInfoMessage.getPayload());}
}

3.5、延迟队列

安装延迟队列插件:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1、修改配置文件

server:port: 8088spring:cloud:stream:binders: #需要绑定的rabbitmq的服务信息defaultRabbit:  #定义的名称,用于bidding整合type: rabbit  #消息组件类型environment:  #配置rabbimq连接环境spring:rabbitmq:host: localhost   #rabbitmq 服务器的地址port: 5672           #rabbitmq 服务器端口username: tiger       #rabbitmq 用户名password: tiger       #rabbitmq 密码virtual-host: tiger_vh  #虚拟路径bindings:        #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组rabbit:bindings: #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道producer:delayed-exchange: truesaveOrderInput:consumer:delayed-exchange: true

3.5.2、生产端

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {@Autowired@Output("saveOrderOutput")private MessageChannel messageChannel;public void sentMsg(UserInfo userInfo){messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());log.info("消息发送成功:" + userInfo);}
}

3.5.2、消息确认机制 消费端

rabbit:bindings: #服务的整合处理saveOrderInput:consumer:acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){log.info("接收消息成功:" + userInfoMessage.getPayload());Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);/** deliveryTag:Channel的消息投递的唯一标识符。* multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;* 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。* requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;* 如果设置为false,则消息被丢弃或发送到死信Exchange。*/try {channel.basicAck(delieverTag,true);} catch (IOException e) {e.printStackTrace();}
}

定义交换机类型为direct

rabbit:bindings: #服务的整合处理saveOrderInput:consumer:bindingRoutingKey: orderRoutingKeybindQueue: trueexchangeType: directsaveOrderOutput:producer:routingKeyExpression: orderRoutingKeyexchangeType: direct

总结

spring-cloud-stream目前支持RabbitMQ和Kafka,与spring-cloud无缝集成,非常方便。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/189004.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IIS前端服务和代理

前端服务可以用nginx和IIS开启&#xff0c;windows自带IIS方便管理一点。其实用docker的nginx更方便管理。 记录一下IIS的安装和开启服务过程 1、打开控制面板点击程序&#xff0c;再点击启用或关闭windows功能。 2、 点击左侧启用或关闭Windows功能。 3、把框框中全选上之后点…

便捷Benchmark.sh 自动匹配workload(自用)

​ 因为db_bench选项太多&#xff0c;而测试纬度很难做到统一&#xff08;可能一个memtable大小的配置都会导致测试出来的写性能相关的的数据差异很大&#xff09;&#xff0c;所以官方给出了一个benchmark.sh脚本用来对各个workload进行测试。 该脚本能够将db_bench测试结果中…

CMOS介绍

1 二极管 2 CMOS 2.1 栅极、源极、漏极 2.2 内部结构 2.2 导电原理 - 原理&#xff1a;1.通过门级和衬底加一个垂直电场Ev&#xff0c;从而在两口井之间形成反形层2.如果加的电场足够强&#xff0c;反形层就可以把source&#xff08;源极&#xff09;和drain&#xff08;漏极…

Doris学习--1、Doris简介、操作Doris、Doris架构(数据模型)

星光下的赶路人star的个人主页 心之所向&#xff0c;剑之所往 文章目录 1、Doris简介1.1 快速开始1.2 安装配置1.2.1 应知前提1.2.2 配置Doris1.2.2.0 配置前提1.2.2.1 配置FE&#xff08;Frontend&#xff09;1.2.2.2 启动FE1.2.2.3 连接FE1.2.2.4 停止FE1.2.2.5 配置BE&#…

物联网水表电子阀工作原理是怎样的?

随着科技的不断发展&#xff0c;物联网技术逐渐深入到我们的生活之中。作为智能家居的重要组成部分&#xff0c;物联网水表电子阀凭借其智能化、节能环保等优势&#xff0c;受到了越来越多用户的青睐。接下来&#xff0c;合众小编将来为大家介绍下物联网水表电子阀工作原理。 一…

Git 进阶使用

一. Git图形化操作 1.1.什么是图形化管理工具 图形化管理工具是一种通过可视化界面来操作计算机系统或应用程序的软件工具。在软件开发中&#xff0c;它通常用于管理和操作版本控制系统&#xff08;如Git、SVN等&#xff09;以及代码开发环境&#xff08;如IDE&#xff09;。与…

SSM图书管理系统开发mysql数据库web结构java编程计算机网页源码eclipse项目

一、源码特点 SSM 图书管理系统是一套完善的信息系统&#xff0c;结合springboot框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代码和 数据库&#xff0c;系统主要…

(二)正点原子I.MX6ULL u-boot移植

一、概述 这里使用的是NXP官方2022.04发布的uboot&#xff0c;移植到正点原子阿尔法开发板&#xff08;v2.1&#xff09; u-boot下载&#xff1a;gitgithub.com:nxp-imx/uboot-imx.git 移植是基于NXP的mx6ull_14x14_evk 二、编译NXP官方uboot 进入NXP的u-boot目录 先在Makefile…

Word 插入的 Visio 图片显示为{EMBED Visio.Drawing.11} 解决方案

World中&#xff0c;如果我们插入了Visio图还用了Endnote&#xff0c; 就可能出现&#xff1a;{EMBED Visio.Drawing.11}问题 解决方案&#xff1a; 1.在相应的文字上右击&#xff0c;在出现的快捷菜单中单击“切换域代码”&#xff0c;一个一个的修复。 2.在菜单工具–>…

亚马逊云AI应用科技创新下的Amazon SageMaker使用教程

目录 Amazon SageMaker简介 Amazon SageMaker在控制台的使用 模型的各项参数 pytorch训练绘图部分代码 Amazon SageMaker简介 亚马逊SageMaker是一种完全托管的机器学习服务。借助 SageMaker&#xff0c;数据科学家和开发人员可以快速、轻松地构建和训练机器学习模型&#…

Apache APISIX Dashboard 未经认证访问导致 RCE(CVE-2021-45232)漏洞复现

漏洞描述 Apache APISIX 是一个动态、实时、高性能的 API 网关&#xff0c;而 Apache APISIX Dashboard 是一个简单易用的前端界面&#xff0c;用于管理 Apache APISIX。 在 2.10.1 之前的 Apache APISIX Dashboard 中&#xff0c;Manager API 使用了两个框架&#xff0c;并在…

《视觉SLAM十四讲》-- 相机与图像

04 相机与图像 4.1 相机模型 4.1.1 针孔相机模型 针孔模型描述了一束光线通过针孔后&#xff0c;在针孔背面投影成像的关系&#xff08;类似小孔成像原理&#xff09;。 根据相似三角关系 Z f − X X ′ − Y Y ′ (3-1) \frac{Z}{f}-\frac{X}{X^{\prime}}-\frac{Y}{Y^{\p…

自然语言处理(一):RNN

「循环神经网络」&#xff08;Recurrent Neural Network&#xff0c;RNN&#xff09;是一个非常经典的面向序列的模型&#xff0c;可以对自然语言句子或是其他时序信号进行建模。进一步讲&#xff0c;它只有一个物理RNN单元&#xff0c;但是这个RNN单元可以按照时间步骤进行展开…

HarmonyOS开发:回调实现网络的拦截

前言 上一篇文章&#xff0c;分享了一个基于http封装的一个网络库&#xff0c;里面有一个知识点&#xff0c;在初始化的时候&#xff0c;可以设置请求头拦截和请求错误后的信息的拦截&#xff0c;具体案例如下&#xff1a; Net.getInstance().init({netErrorInterceptor: new M…

C++套接字库sockpp介绍

sockpp是一个开源、简单、现代的C套接字库&#xff0c;地址为&#xff1a;https://github.com/fpagliughi/sockpp&#xff0c;最新发布版本为0.8.1&#xff0c;license为BSD-3-Clause。目前支持Linux、Windows、Mac上的IPv4、IPv6和Unix域套接字。其它*nix和POSIX系统只需很少的…

获取请求IP以及IP解析成省份

某些业务需要获取请求IP以及将IP解析成省份之类的&#xff0c;于是我写了一个工具类&#xff0c;可以直接COPY /*** IP工具类* author xxl* since 2023/11/9*/ Slf4j public class IPUtils {/*** 过滤本地地址*/public static final String LOCAL_ADDRESS "127.0.0.1&quo…

【数据结构初阶】算法的时间复杂度和空间复杂度

各位读者老爷好&#xff01;现在鼠鼠我呀来浅谈一下数据结构初阶中的一个知识点&#xff1a;算法的时间复杂度和空间复杂度&#xff0c;希望对你有所帮助。 在浅谈时间复杂度和空间复杂度之前&#xff0c;咱们可以来了解一下一下几个概念&#xff1a; 1.什么是数据结构 数据结…

Http状态码502常见原因及排错思路(实战)

Http状态码502常见原因及排错思路 502表示Bad Gateway。当Nginx返回502错误时&#xff0c;通常表示Nginx作为代理服务器无法从上游服务器&#xff08;如&#xff1a;我们的后端服务器地址&#xff09;获取有效的响应。导致这种情况的原因有很多&#xff1a; 后端服务器故障ngin…

3 分钟看完 NVIDIA GPU 架构及演进

近期随着 AI 市场的爆发式增长&#xff0c;作为 AI 背后技术的核心之一 GPU&#xff08;图形处理器&#xff09;的价格也水涨船高。GPU 在人工智能中发挥着巨大的重要&#xff0c;特别是在计算和数据处理方面。目前生产 GPU 主流厂商其实并不多&#xff0c;主要就是 NVIDIA、AM…

React 递归手写流程图展示树形数据

需求 根据树的数据结构画出流程图展示&#xff0c;支持新增前一级、后一级、同级以及删除功能&#xff08;便于标记节点&#xff0c;把节点数据当作label展示出来了&#xff0c;实际业务中跟据情况处理&#xff09; 文件结构 初始数据 [{"ticketTemplateCode": &…