Message Processing With Spring Integration高级应用:自定义消息通道与端点

一、Spring Integration 简介

Spring Integration 是 Spring 框架的扩展,支持企业集成模式(EIP),提供轻量级的消息处理功能,帮助开发者构建可维护、可测试的企业集成解决方案。

核心目标:
  1. 提供简单的模型来实现复杂的企业集成。
  2. 支持与外部系统的集成。
  3. 提供模块化、松耦合的消息处理架构。

二、Spring Integration 核心组件

1. 消息(Message)
  • 定义:消息是 Spring Integration 的核心,包含 payload(负载)和 header(头部)。
  • 创建消息:通过 MessageBuilder 创建消息。

代码示例

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;Message<String> message = MessageBuilder.withPayload("Message Payload").setHeader("Message_Header1", "Header1_Value").setHeader("Message_Header2", "Header2_Value").build();

2. 消息通道(Message Channel)
  • 定义:消息通道是消息传递的管道,连接消息的生产者和消费者。
  • 类型
    • 点对点(Point-to-Point):每条消息最多被一个消费者接收。
    • 发布/订阅(Publish/Subscribe):每条消息可以被多个订阅者接收。
  • 常见实现
    • DirectChannel:默认点对点通道。
    • NullChannel:虚拟通道,用于测试和调试。
    • 其他:PublishSubscribeChannelQueueChannelPriorityChannel 等。

3. 消息端点(Message Endpoint)

消息端点是应用程序代码与消息基础设施之间的桥梁,主要类型包括:

  • Transformer:转换消息内容或结构。
  • Filter:过滤不符合条件的消息。
  • Router:根据条件将消息路由到不同的通道。
  • Splitter:将消息拆分为多个子消息。
  • Aggregator:将多个消息聚合为一个消息。
  • Service Activator:连接服务实例到消息系统。
  • Channel Adapter:连接消息通道与外部系统。

三、货物处理系统示例

1. 需求

实现一个货物处理系统,功能包括:

  1. 接收货物消息。
  2. 拆分货物列表为单个货物消息。
  3. 基于重量过滤货物。
  4. 根据运输类型(国内/国际)路由货物。
  5. 转换货物消息。
  6. 最终处理并记录货物信息。

2. 项目环境
  • JDK:1.8
  • Spring:4.1.2
  • Spring Integration:4.1.0
  • Maven:3.2.2
  • 操作系统:Ubuntu 14.04

3. 完整代码实现
Step 1:添加依赖

pom.xml 中添加 Spring 和 Spring Integration 的依赖:

<properties><spring.version>4.1.2.RELEASE</spring.version><spring.integration.version>4.1.0.RELEASE</spring.integration.version>
</properties><dependencies><!-- Spring 核心依赖 --><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>${spring.version}</version></dependency><!-- Spring Integration 核心依赖 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId><version>${spring.integration.version}</version></dependency>
</dependencies>

Step 2:配置类

创建 AppConfiguration 类,配置消息通道和启用 Spring Integration:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;@Configuration
@ComponentScan("com.onlinetechvision.integration")
@EnableIntegration
@IntegrationComponentScan("com.onlinetechvision.integration")
public class AppConfiguration {@Beanpublic MessageChannel cargoGWDefaultRequestChannel() {return new DirectChannel();}@Beanpublic MessageChannel cargoSplitterOutputChannel() {return new DirectChannel();}@Beanpublic MessageChannel cargoFilterOutputChannel() {return new DirectChannel();}@Beanpublic MessageChannel cargoTransformerOutputChannel() {return new DirectChannel();}
}

Step 3:消息网关

定义 CargoGateway 接口,作为消息系统的入口:

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;import java.util.List;@MessagingGateway
public interface CargoGateway {@Gateway(requestChannel = "cargoGWDefaultRequestChannel")void processCargoRequest(Message<List<Cargo>> message);
}

Step 4:消息拆分器

实现 CargoSplitter,将货物列表拆分为单个货物消息:

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
import org.springframework.messaging.Message;import java.util.List;@MessageEndpoint
public class CargoSplitter {@Splitter(inputChannel = "cargoGWDefaultRequestChannel", outputChannel = "cargoSplitterOutputChannel")public List<Cargo> splitCargoList(Message<List<Cargo>> message) {return message.getPayload();}
}

Step 5:消息过滤器

实现 CargoFilter,过滤重量超过限制的货物:

import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.MessageEndpoint;@MessageEndpoint
public class CargoFilter {private static final double CARGO_WEIGHT_LIMIT = 1000.0;@Filter(inputChannel = "cargoSplitterOutputChannel", outputChannel = "cargoFilterOutputChannel", discardChannel = "cargoFilterDiscardChannel")public boolean filterCargo(Cargo cargo) {return cargo.getWeight() <= CARGO_WEIGHT_LIMIT;}
}

Step 6:服务激活器

实现 CargoServiceActivator,处理最终的货物消息:

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;@MessageEndpoint
public class CargoServiceActivator {@ServiceActivator(inputChannel = "cargoTransformerOutputChannel")public void processCargo(Cargo cargo, @Header("CARGO_BATCH_ID") long batchId) {System.out.println("Processed Cargo: " + cargo + " in Batch: " + batchId);}
}

Step 7:运行主程序

创建 Application 类,初始化 Spring 容器并发送货物请求:

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.messaging.support.MessageBuilder;import java.util.Arrays;
import java.util.List;public class Application {public static void main(String[] args) {ApplicationContext context = new AnnotationConfigApplicationContext(AppConfiguration.class);CargoGateway gateway = context.getBean(CargoGateway.class);List<Cargo> cargos = Arrays.asList(new Cargo(1, "Receiver1", "Address1", 500, "Domestic"),new Cargo(2, "Receiver2", "Address2", 1500, "International"));gateway.processCargoRequest(MessageBuilder.withPayload(cargos).build());}
}

四、运行过程

  1. 启动 Application 类。
  2. 系统会根据配置:
    • 拆分货物列表。
    • 过滤重量超过限制的货物。
    • 路由货物到不同的通道。
    • 最终处理并记录货物信息。
  3. 控制台输出处理结果。

五、适用场景

Spring Integration 非常适合以下场景:

  1. 企业系统集成:如 ERP、CRM、供应链系统之间的数据交换。
  2. 消息驱动架构:如基于事件的微服务通信。
  3. 复杂消息处理:如批量处理、过滤、路由、转换等。
  4. 与外部系统交互:如文件系统、消息队列(RabbitMQ、Kafka)、数据库等。

通过 Spring Integration,可以轻松实现复杂的企业集成需求,同时保持代码的可维护性和扩展性。
参考链接:https://dzone.com/articles/message-processing-spring

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/492676.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

EE308FZ_Sixth Assignment_Beta Sprint_Sprint Essay 3

Assignment 6Beta SprintCourseEE308FZ[A] — Software EngineeringClass Link2401_MU_SE_FZURequirementsTeamwork—Beta SprintTeam NameFZUGOObjectiveSprint Essay 3_Day5-Day6 (12.15-12.16)Other Reference1. WeChat Mini Program Design Guide 2. Javascript Style Guid…

凯酷全科技抖音电商服务的卓越践行者

在数字经济蓬勃发展的今天&#xff0c;电子商务已成为企业增长的新引擎。随着短视频平台的崛起&#xff0c;抖音作为全球领先的短视频社交平台&#xff0c;不仅改变了人们的娱乐方式&#xff0c;也为品牌和商家提供了全新的营销渠道。厦门凯酷全科技有限公司&#xff08;以下简…

架构信息收集(小迪网络安全笔记~

附&#xff1a;完整笔记目录~ ps&#xff1a;本人小白&#xff0c;笔记均在个人理解基础上整理&#xff0c;若有错误欢迎指正&#xff01; 2.2 架构信息收集 引子&#xff1a;一个Web应用的构成&#xff0c;由诸多组件&服务相结合&#xff0c;而域名仅是处于Web架构中最表…

一.photoshop导入到spine

这里使用的是 photoshoptospine脚本 下载地址:https://download.csdn.net/download/boyxgb/90156744 脚本的使用,可以通过文件的脚本的浏览,浏览该脚本使用该脚本,也可以将该脚本放在photoshop安装文件夹里的script文件夹下,具体路径:Photoshop\Presets\Scripts,重启photosho…

Mapbox-GL 的源码解读的一般步骤

Mapbox-GL 是一个非常优秀的二三维地理引擎&#xff0c;随着智能驾驶时代的到来&#xff0c;应用也会越来越广泛&#xff0c;关于mapbox-gl和其他地理引擎的详细对比&#xff08;比如CesiumJS&#xff09;&#xff0c;后续有时间会加更。地理首先理解 Mapbox-GL 的源码是一项复…

SparkSQL运行架构及原理

文章目录 SparkSQL运行架构及原理1.1. Catalyst优化器简介1.2. SparkSQL运行架构1.3. SparkSQL解析Core底层原理1.4. 执行计划查看 SparkSQL运行架构及原理 1.1. Catalyst优化器简介 SparkSQL使得我们开发人员可以使用DSL风格的数据来处理数据&#xff0c;甚至可以直接使用SQ…

大数据-254 离线数仓 - Airflow 任务调度 核心交易调度任务集成

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇开始了&#xff01; 目前开始更新 MyBatis&#xff0c;一起深入浅出&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff0…

昇思25天学习打卡营第33天|共赴算力时代

文章目录 一、平台简介二、深度学习模型2.1 处理数据集2.2 模型训练2.3 加载模型 三、共赴算力时代 一、平台简介 昇思大模型平台&#xff0c;就像是AI学习者和开发者的超级基地&#xff0c;这里不仅提供丰富的项目、模型和大模型体验&#xff0c;还有一大堆经典数据集任你挑。…

Docker 镜像加速和配置的分享 云服务器搭建beef-xss

前言 最近很多的docker镜像加速都鸡鸡了 找点资源是越来越不容易了 什么事docker 因为我是个业余的人 我简单的说 docker就是比如我们的软件商店的 下载 docker镜像&#xff08;之前就是我们在服务器上搭建网站 和环境的很费力费时 之后就有了这个 镜像 &#xff1a;这…

浅谈怎样系统的准备前端面试

前言 创业梦碎&#xff0c;回归现实&#xff0c;7 月底毅然裸辞&#xff0c;苦战两个月&#xff0c;拿到了美团和字节跳动的 offer&#xff0c;这算是从业以来第一次真正意义的面试&#xff0c;遇到蛮多问题&#xff0c;比如一开始具体的面试过程我都不懂&#xff0c;基本一直是…

告别机器人味:如何让ChatGPT写出有灵魂的内容

目录 ChatGPT的一些AI味道小问题 1.提供编辑指南 2.提供样本 3.思维链大纲 4.融入自己的想法 5.去除重复增加多样性 6.删除废话 ChatGPT的一些AI味道小问题 大多数宝子们再使用ChatGPT进行写作时&#xff0c;发现我们的老朋友ChatGPT在各类写作上还有点“机器人味”太重…

【长城杯】Web题 hello_web 解题思路

查看源代码发现路径提示 访问…/tips.php显示无用页面&#xff0c;怀疑…/被过滤&#xff0c;采用…/./形式&#xff0c;看到phpinfo()页面 注意到disable_functions&#xff0c;禁用了很多函数 访问hackme.php,看到页面源码 发现eval函数&#xff0c;包含base64 解密获得php代…

Windows部署Docker及PostgreSQL数据库相关操作

一、Windows安装Docker 1.wsl安装 以管理员身份启动命令行&#xff0c;运行&#xff1a;wsl --install&#xff1b; 安装结束后&#xff0c;重启电脑&#xff0c;以管理员身份启动命令行&#xff0c;运行&#xff1a;wsl --install -d Ubuntu&#xff1b; 中间需要输入用户名…

HTML零基础入门教学

目录 一. HTML语言 二. HTML结构 三. HTML文件基本结构 四. 准备开发环境 五. 快速生成代码框架 六. HTML常见标签 6.1 注释标签 6.2 标题标签&#xff1a;h1-h6 6.3 段落标签&#xff1a;p 6.4 换行标签&#xff1a;br 6.5 格式化标签 6.6 图片标签&a…

Springboot应用开发:工具类整理

目录 一、编写目的 二、映射工具类 2.1 依赖 2.2 代码 三、日期格式 3.1 依赖 3.2 代码 四、加密 4.1 代码 五、Http请求 5.1 依赖 5.2 代码 六、金额 6.1 代码 七、二维码 7.1 依赖 7.2 代码 八、坐标转换 8.1 代码 九、树结构 9.1 代码 9.1.1 节点 9.1…

libaom 源码分析:熵编码模块介绍

AV1 熵编码原理介绍 关于AV1 熵编码原理介绍可以参考:AV1 编码标准熵编码技术概述libaom 熵编码相关源码介绍 函数流程图 核心函数介绍 av1_pack_bitstream 函数:该函数负责将编码后的数据打包成符合 AV1 标准的比特流格式;包括写入序列头 OBU 的函数 av1_write_obu_header…

一个开源的自托管虚拟浏览器项目,支持在安全、私密的环境中使用浏览器

大家好&#xff0c;今天给大家分享一个开源的自托管虚拟浏览器项目Neko&#xff0c;旨在利用 WebRTC 技术在 Docker 容器中运行虚拟浏览器&#xff0c;为用户提供安全、私密且多功能的浏览体验。 项目介绍 Neko利用 WebRTC 技术在 Docker 容器中运行虚拟浏览器&#xff0c;提供…

【已解决】启动此实时调试器时未使用必需的安全权限。要调试该进程,必须以管理员身份运行此实时调试器。是否调试该进程?

【已解决】启动此实时调试器时未使用必需的安全权限。要调试该进程&#xff0c;必须以管理员身份运行此实时调试器。是否调试该进程? 目录一、前言二、具体原因三、解决方法 目录 报错截图 一、前言 进行应用程序开发时&#xff0c;需要对w3wp进行附加调试等场景&#xff…

Docker--Docker Registry(镜像仓库)

什么是Docker Registry&#xff1f; 镜像仓库&#xff08;Docker Registry&#xff09;是Docker生态系统中用于存储、管理和分发Docker镜像的关键组件。 镜像仓库主要负责存储Docker镜像&#xff0c;这些镜像包含了应用程序及其相关的依赖项和配置&#xff0c;是构建和运行Doc…

如何用细节提升用户体验?

前端给用户反馈是提升用户体验的重要部分&#xff0c;根据场景选择不同的方式可以有效地提升产品的易用性和用户满意度。以下是常见的方法&#xff1a; 1. 视觉反馈 用户执行了某些操作后&#xff0c;需要即时确认操作结果。例如&#xff1a;按钮点击、数据提交、页面加载等。…