Spring Cloud Stream实现数据流处理

1.什么是Spring Cloud Stream?

我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套API,无需关心具体的消息队列实现”。 这样理解是有些不全面的,Spring Cloud Stream的核心是Stream,准确来讲Spring Cloud Stream提供了一整套数据流走向(流向)的API, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理 我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。

module

我们很容易总结出每个模块的流程:

  1. 从上一个模块拉取数据
  2. 处理数据
  3. 将处理完成的数据发给下一个模块

其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同,需要使用不同的API。 为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。

2.环境准备

采用docker-compose搭建kafaka环境

version: '3'networks:kafka:ipam:driver: defaultconfig:- subnet: "172.22.6.0/24"services:zookepper:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latestcontainer_name: zookeeper-serverrestart: unless-stoppedvolumes:- "/etc/localtime:/etc/localtime"environment:ALLOW_ANONYMOUS_LOGIN: yesports:- "2181:2181"networks:kafka:ipv4_address: 172.22.6.11kafka:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1container_name: kafkarestart: unless-stoppedvolumes:- "/etc/localtime:/etc/localtime"environment:ALLOW_PLAINTEXT_LISTENER: yesKAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092ports:- "9092:9092"depends_on:- zookeppernetworks:kafka:ipv4_address: 172.22.6.12kafka-map:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-mapcontainer_name: kafka-maprestart: unless-stoppedvolumes:- "./kafka/kafka-map/data:/usr/local/kafka-map/data"environment:DEFAULT_USERNAME: adminDEFAULT_PASSWORD: 123456ports:- "9080:8080"depends_on:                         - kafkanetworks:kafka:ipv4_address: 172.22.6.13

run

docker-compose -f docker-compose-kafka.yml -p kafka up -d

kafka-map

https://github.com/dushixiang/kafka-map

  • 访问:http://127.0.0.1:9080
  • 账号密码:admin/123456

3.代码工程

stream

 实验目标

  1. 生成UUID并将其发送到Kafka主题batch-in
  2. batch-in主题接收UUID的批量消息,移除其中的数字,并将结果发送到batch-out主题。
  3. 监听batch-out主题并打印接收到的消息。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springcloud-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>spring-cloud-stream-kafaka</artifactId><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target></properties><dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies></project>

处理流

/** Copyright 2023 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**      https://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.et;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;/*** @author Steven Gantz*/
@SpringBootApplication
public class CloudStreamsFunctionBatch {public static void main(String[] args) {SpringApplication.run(CloudStreamsFunctionBatch.class, args);}@Beanpublic Supplier<UUID> stringSupplier() {return () -> {var uuid = UUID.randomUUID();System.out.println(uuid + " -> batch-in");return uuid;};}@Beanpublic Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {return idBatch -> {System.out.println("Removed digits from batch of " + idBatch.size());return idBatch.stream().map(UUID::toString)// Remove all digits from the UUID.map(uuid -> uuid.replaceAll("\\d","")).map(noDigitString -> MessageBuilder.withPayload(noDigitString).build()).toList();};}@KafkaListener(id = "batch-out", topics = "batch-out")public void listen(String in) {System.out.println("batch-out -> " + in);}}
  • 定义一个名为stringSupplier的Bean,它实现了Supplier<UUID>接口。这个方法生成一个随机的UUID,并打印到控制台,表示这个UUID将被发送到batch-in主题。
  • 定义一个名为digitRemovingConsumer的Bean,它实现了Function<List<UUID>, List<Message<String>>>接口。这个方法接受一个UUID的列表,打印出处理的UUID数量,然后将每个UUID转换为字符串,移除其中的所有数字,最后将结果封装为消息并返回。
  • 使用@KafkaListener注解定义一个Kafka监听器,监听batch-out主题。当接收到消息时,调用listen方法并打印接收到的消息内容。

配置文件

spring:cloud:function:definition: stringSupplier;digitRemovingConsumerstream:bindings:stringSupplier-out-0:destination: batch-indigitRemovingConsumer-in-0:destination: batch-ingroup: batch-inconsumer:batch-mode: truedigitRemovingConsumer-out-0:destination: batch-outkafka:binder:brokers: localhost:9092bindings:digitRemovingConsumer-in-0:consumer:configuration:# Forces consumer to wait 5 seconds before polling for messagesfetch.max.wait.ms: 5000fetch.min.bytes: 1000000000max.poll.records: 10000000

参数解释

spring:cloud:function:definition: stringSupplier;digitRemovingConsumer
  • spring.cloud.function.definition:定义了两个函数,stringSupplierdigitRemovingConsumer。这两个函数将在应用程序中被使用。
    stream:bindings:stringSupplier-out-0:destination: batch-in
  • stream.bindings.stringSupplier-out-0.destination:将stringSupplier函数的输出绑定到Kafka主题batch-in
        digitRemovingConsumer-in-0:destination: batch-ingroup: batch-inconsumer:batch-mode: true
  • stream.bindings.digitRemovingConsumer-in-0.destination:将digitRemovingConsumer函数的输入绑定到Kafka主题batch-in
  • group: batch-in:指定消费者组为batch-in,这意味着多个实例可以共享这个组来处理消息。
  • consumer.batch-mode: true:启用批处理模式,允许消费者一次处理多条消息。
        digitRemovingConsumer-out-0:destination: batch-out
  • stream.bindings.digitRemovingConsumer-out-0.destination:将digitRemovingConsumer函数的输出绑定到Kafka主题batch-out

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springcloud-demo(Spring Cloud Stream)

4.测试

启动弄Spring Boot应用,可以看到控制台输出日志如下:

291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
Removed digits from batch of 5
batch-out -> eacc-ee-dfb-b-dead
batch-out -> cbae-e-f-c-acfb
batch-out -> ab-dd---adade
batch-out -> db-fb-f-bbb-bfdec
batch-out -> bdb--d-ad-bbbb

5.引用

  • https://github.com/spring-cloud/spring-cloud-stream-samples
  • Spring Cloud Stream 3.X - coderZoe的博客
  • Spring Cloud Stream实现数据流处理 | Harries Blog™

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/475988.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenMMlab导出Mask R-CNN模型并用onnxruntime和tensorrt推理

onnxruntime推理 使用mmdeploy导出onnx模型&#xff1a; from mmdeploy.apis import torch2onnx from mmdeploy.backend.sdk.export_info import export2SDKimg demo.JPEG work_dir ./work_dir/onnx/mask_rcnn save_file ./end2end.onnx deploy_cfg mmdeploy/configs/mmd…

【大语言模型】ACL2024论文-19 SportsMetrics: 融合文本和数值数据以理解大型语言模型中的信息融合

【大语言模型】ACL2024论文-19 SportsMetrics: 融合文本和数值数据以理解大型语言模型中的信息融合 https://arxiv.org/pdf/2402.10979 目录 文章目录 【大语言模型】ACL2024论文-19 SportsMetrics: 融合文本和数值数据以理解大型语言模型中的信息融合目录摘要研究背景问题与挑…

39页PDF | 毕马威_数据资产运营白皮书(限免下载)

一、前言 《毕马威数据资产运营白皮书》探讨了数据作为新型生产要素在企业数智化转型中的重要性&#xff0c;提出了数据资产运营的“三要素”&#xff08;组织与意识、流程与规范、平台与工具&#xff09;和“四重奏”&#xff08;数据资产盘点、评估、治理、共享&#xff09;…

【UE5】使用基元数据对材质传参,从而避免新建材质实例

在项目中&#xff0c;经常会遇到这样的需求&#xff1a;多个模型&#xff08;例如 100 个&#xff09;使用相同的材质&#xff0c;但每个模型需要不同的参数设置&#xff0c;比如不同的颜色或随机种子等。 在这种情况下&#xff0c;创建 100 个实例材质不是最佳选择。正确的做…

[STBC]

空时分组编码STBC&#xff08;Space Time Block Coding&#xff09;: //一个数据流通过多个天线发射发送&#xff0c;硬件编码器 STBC概念是从MIMO技术衍生出来的&#xff0c;目的是在多天线系统中提高数据传输的可靠性和传输距离。在rx&#xff08;接收天线&#xff09;和tx&…

241120学习日志——[CSDIY] [InternStudio] 大模型训练营 [09]

CSDIY&#xff1a;这是一个非科班学生的努力之路&#xff0c;从今天开始这个系列会长期更新&#xff0c;&#xff08;最好做到日更&#xff09;&#xff0c;我会慢慢把自己目前对CS的努力逐一上传&#xff0c;帮助那些和我一样有着梦想的玩家取得胜利&#xff01;&#xff01;&…

PCB 间接雷击模拟

雷击是一种危险的静电放电事件&#xff0c;其中两个带电区域会瞬间释放高达 1 千兆焦耳的能量。雷击就像一个短暂而巨大的电流脉冲&#xff0c;会对建筑物和电子设备造成严重损坏。雷击可分为直接和间接两类&#xff0c;其中间接影响是由于感应能量耦合到靠近雷击位置的物体。间…

IDEA2019搭建Springboot项目基于java1.8 解决Spring Initializr无法创建jdk1.8项目 注释乱码

后端界面搭建 将 https://start.spring.io/ 替换https://start.aliyun.com/ 报错 打开设置 修改如下在这里插入代码片 按此方法无果 翻阅治疗后得知 IDEA2019无法按照网上教程修改此问题因此更新最新idea2024或利用插件Alibaba Clouod Toolkit 换用IDEA2024创建项目 下一步…

单向C to DP视频传输解决方案 | LDR6500

LDR6500D如何通过Type-C接口实现手机到DP接口的单向视频传输 在当今数字化浪潮中&#xff0c;投屏技术作为连接设备、共享视觉内容的桥梁&#xff0c;其重要性日益凸显。PD&#xff08;Power Delivery&#xff09;芯片&#xff0c;特别是集成了Type-C接口与DisplayPort&#xf…

Leetcode 第 143 场双周赛题解

Leetcode 第 143 场双周赛题解 Leetcode 第 143 场双周赛题解题目1&#xff1a;3345. 最小可整除数位乘积 I思路代码复杂度分析 题目2&#xff1a;3346. 执行操作后元素的最高频率 I思路代码复杂度分析 题目3&#xff1a;3347. 执行操作后元素的最高频率 II题目4&#xff1a;33…

Spark 之 Aggregate

Aggregate 参考链接&#xff1a; https://github.com/PZXWHU/SparkSQL-Kernel-Profiling 完整的聚合查询的关键字包括 group by、 cube、 grouping sets 和 rollup 4 种 。 分组语句 group by 后面可以是一个或多个分组表达式&#xff08; groupingExpressions &#xff09;…

【IDEA】解决总是自动导入全部类(.*)问题

文章目录 问题描述解决方法 我是一名立志把细节说清楚的博主&#xff0c;欢迎【关注】&#x1f389; ~ 原创不易&#xff0c; 如果有帮助 &#xff0c;记得【点赞】【收藏】 哦~ ❥(^_-)~ 如有错误、疑惑&#xff0c;欢迎【评论】指正探讨&#xff0c;我会尽可能第一时间回复…

如何快速将Excel数据导入到SQL Server数据库

工作中&#xff0c;我们经常需要将Excel数据导入到数据库&#xff0c;但是对于数据库小白来说&#xff0c;这可能并非易事&#xff1b;对于数据库专家来说&#xff0c;这又可能非常繁琐。 这篇文章将介绍如何帮助您快速的将Excel数据导入到sql server数据库。 准备工作 这里&…

在centos7中安装SqlDeveloper的Oracle可视化工具

1.下载安装包 &#xff08;1&#xff09;在SqlDeveloper官网下载&#xff08;Oracle SQL Developer Release 19.2 - Get Started&#xff09;对应版本的安装包即可&#xff08;安装包和安装命令如下&#xff09;&#xff1a; &#xff08;2&#xff09;执行完上述命令后&#x…

【动手学深度学习Pytorch】4. 神经网络基础

模型构造 回顾一下感知机。 nn.Sequential()&#xff1a;定义了一种特殊的module。 torch.rand()&#xff1a;用于生成具有均匀分布的随机数&#xff0c;这些随机数的范围在[0, 1)之间。它接受一个形状参数&#xff08;shape&#xff09;&#xff0c;返回一个指定形状的张量&am…

Spring Boot + Vue 基于 RSA 的用户身份认证加密机制实现

Spring Boot Vue 基于 RSA 的用户身份认证加密机制实现 什么是RSA&#xff1f;安全需求介绍前后端交互流程前端使用 RSA 加密密码安装 jsencrypt库实现敏感信息加密 服务器端生成RSA的公私钥文件Windows环境 生成rsa的公私钥文件Linux环境 生成rsa的公私钥文件 后端代码实现返…

一键部署 200+ 开源软件的 Websoft9 面板,Github 2k+ 星星

Websoft9面板是一款基于Web的PaaS/Linux面板&#xff0c;可用于在自己的服务器上一键部署200多种热门开源应用&#xff0c;在Github上获得了2k星星。 特点与优势 丰富的开源软件集成&#xff1a;涵盖数据库、Web服务器、企业建站、电商系统、教育系统、中间件、大数据工具等多…

NLP论文速读(MPO)|通过混合偏好优化提高多模态大型语言模型的推理能力

论文速读|Dynamic Rewarding with Prompt Optimization Enables Tuning-free Self-Alignment of Language Models 论文信息&#xff1a; 简介&#xff1a; 本文探讨的背景是多模态大型语言模型&#xff08;MLLMs&#xff09;在多模态推理能力上的局限性&#xff0c;尤其是在链式…

动态规划子数组系列一>等差数列划分

题目&#xff1a; 解析&#xff1a; 代码&#xff1a; public int numberOfArithmeticSlices(int[] nums) {int n nums.length;int[] dp new int[n];int ret 0;for(int i 2; i < n; i){dp[i] nums[i] - nums[i-1] nums[i-1] - nums[i-2] ? dp[i-1]1 : 0;ret dp[i…

用 React18 构建Tic-Tac-Toe(井字棋)游戏

下面是一个完整的 Tic-Tac-Toe&#xff08;井字棋&#xff09;游戏的实现&#xff0c;用 React 构建。包括核心逻辑和组件分离&#xff0c;支持两人对战。 1. 初始化 React 项目&#xff1a; npx create-react-app tic-tac-toe cd tic-tac-toe2.文件结构 src/ ├── App.js…