Spring Cloud Data Flow快速入门Demo

1.什么是Spring Cloud Data Flow?

Spring Cloud Data Flow 是一个用于构建和编排数据处理流水线的云原生框架。它提供了一种简化的方式来定义、部署和管理数据处理任务和流应用程序。以下是一些关键特性和组件:

关键特性

  1. 流处理

    • 支持实时数据流处理,可以通过定义源、处理器和接收器来构建数据流。
  2. 批处理任务

    • 支持批处理任务的调度和执行,适用于需要定期运行的任务。
  3. 可扩展性

    • 支持多种数据处理引擎,如 Apache Kafka、RabbitMQ 和 Apache Spark,允许用户根据需求选择合适的技术栈。
  4. 可视化界面

    • 提供基于 Web 的用户界面,用户可以通过拖放组件来设计数据流和任务。
  5. 监控和管理

    • 提供监控和管理工具,可以查看应用程序的运行状态、日志和指标。

组件

  1. Spring Cloud Data Flow Server

    • 核心组件,负责管理和协调数据流和任务的部署。
  2. Skipper

    • 用于应用程序的版本管理和滚动更新,确保在更新过程中最小化停机时间。
  3. 数据流应用程序

    • 由 Spring Boot 构建的微服务应用程序,分为源、处理器和接收器三种类型。
  4. 任务应用程序

    • 用于执行一次性任务或批处理作业。

Spring Cloud Data Flow 适用于需要处理大规模数据流和批处理任务的场景,特别是在分布式系统和云环境中。它简化了数据管道的开发和管理,使开发者能够专注于业务逻辑的实现

2.环境搭建

docker-compose.yml

version: '3'services:mysql:image: mysql:5.7.25container_name: dataflow-mysqlenvironment:MYSQL_DATABASE: dataflowMYSQL_USER: rootMYSQL_ROOT_PASSWORD: rootpwexpose:- 3306ports:- "3306:3306"volumes:- ./my.cnf:/etc/mysql/my.cnfkafka-broker:image: confluentinc/cp-kafka:5.3.1container_name: dataflow-kafkaexpose:- "9092"environment:- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181- KAFKA_ADVERTISED_HOST_NAME=kafka-broker- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1depends_on:- zookeeperzookeeper:image: confluentinc/cp-zookeeper:5.3.1container_name: dataflow-kafka-zookeeperexpose:- "2181"environment:- ZOOKEEPER_CLIENT_PORT=2181dataflow-server:image: springcloud/spring-cloud-dataflow-server:2.6.3container_name: dataflow-serverports:- "9393:9393"environment:- spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=PLAINTEXT://kafka-broker:9092- spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.streams.binder.brokers=PLAINTEXT://kafka-broker:9092- spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=zookeeper:2181- spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.streams.binder.zkNodes=zookeeper:2181- spring.cloud.skipper.client.serverUri=http://skipper-server:7577/api- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow- SPRING_DATASOURCE_USERNAME=root- SPRING_DATASOURCE_PASSWORD=rootpw- SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driverdepends_on:- kafka-brokerentrypoint: "./wait-for-it.sh mysql:3306 -- java -jar /maven/spring-cloud-dataflow-server.jar"volumes:- ${HOST_MOUNT_PATH:-.}:${DOCKER_MOUNT_PATH:-/root/scdf}app-import:image: springcloud/openjdk:2.0.0.RELEASEcontainer_name: dataflow-app-importdepends_on:- dataflow-servercommand: >/bin/sh -c "./wait-for-it.sh -t 180 dataflow-server:9393;wget -qO- 'http://dataflow-server:9393/apps' --post-data='uri=${STREAM_APPS_URI:-https://dataflow.spring.io/kafka-maven-latest&force=true}';echo 'Stream apps imported'wget -qO- 'http://dataflow-server:9393/apps' --post-data='uri=${TASK_APPS_URI:-https://dataflow.spring.io/task-maven-latest&force=true}';echo 'Task apps imported'"skipper-server:image: springcloud/spring-cloud-skipper-server:2.5.2container_name: skipperports:- "7577:7577"- "20000-20105:20000-20105"environment:- SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_LOW=20000- SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_HIGH=20100- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow- SPRING_DATASOURCE_USERNAME=root- SPRING_DATASOURCE_PASSWORD=rootpw- SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driverentrypoint: "./wait-for-it.sh mysql:3306 -- java -jar /maven/spring-cloud-skipper-server.jar"volumes:- ${HOST_MOUNT_PATH:-.}:${DOCKER_MOUNT_PATH:-/root/scdf}

启动

docker-compose -f .\docker-compose.yml up -d

dashboard

http://localhost:9393/dashboard/

dashboard

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springcloud-demo(Spring Cloud Data Flow)

3.使用指南

Task

任务处理用于一次性或批量数据处理,适合处理需要在特定时间点或周期性执行的任务。任务应用程序通常具有以下特性:

  1. 一次性执行

    • 任务在被触发时执行一次,完成后即停止。
    • 适用于需要定期运行的批处理作业,如数据迁移、报告生成和数据清理。
  2. 批处理支持

    • 可以处理大量数据,通常与 Spring Batch 集成以支持复杂的批处理需求。
    • 支持事务管理、重试机制和并行处理。

任务应用程序适用于需要在后台执行的长时间运行作业,特别是在需要处理大量数据的情况下

任务列表

task1

创建任务

task2

streams

流处理主要用于实时数据处理,适合处理持续不断的数据流。流应用程序通常由以下三种组件组成:

  1. 源(Source)

    • 负责从外部系统(如消息队列、数据库、文件系统等)读取数据并将其发送到流中。
    • 例如,从 Kafka 主题中读取消息。
  2. 处理器(Processor)

    • 接收源发送的数据,对其进行处理或转换,然后将结果发送到下一个组件。
    • 例如,对数据进行过滤、聚合或格式转换。
  3. 接收器(Sink)

    • 负责将处理后的数据输出到目标系统(如数据库、文件系统、消息队列等)。
    • 例如,将处理后的数据写入到数据库表中。

流应用程序通常用于需要低延迟和高吞吐量的场景,如实时数据分析、事件驱动架构和物联网数据处理。

streams列表

 

stream创建

4.引用

  • Spring Cloud Data Flow
  • Spring Cloud Data Flow快速入门Demo | Harries Blog™

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/477017.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# .NET环境下调用ONNX格式YOLOV8模型问题总结

我的环境是: Visual Studio: 2019 显卡: 一、遇到问题 1、EntryPointNotFoundException:无法在DLL“onnxruntime”中找到名为“OrtGetApiBase”的入口点。差了下原因,入口点是启动项中的问题。 原因:之前用yolov7时安装的版本在C…

量子感知机

神经网络类似于人类大脑,是模拟生物神经网络进行信息处理的一种数学模型。它能解决分类、回归等问题,是机器学习的重要组成部分。量子神经网络是将量子理论与神经网络相结合而产生的一种新型计算模式。1995年美国路易斯安那州立大学KAK教授首次提出了量子…

AI Large Language Model

AI 的 Large Language model LLM , 大语言模型: 是AI的模型,专门设计用来处理自然语言相关任务。它们通过深度学习和庞大的训练数据集,在理解和生成自然语言文本方面表现出色。常见的 LLM 包括 OpenAI 的 GPT 系列、Google 的 PaLM 和 Meta…

运维团队3D可视化智能机房管理方案

随着信息技术的飞速发展,机房作为信息技术基础设施的核心部分,其管理效率与可视化程度对运维团队的工作质量有着直接影响。本文将介绍一种结合3D可视化技术的机房管理方案,为运维团队提供一种新的视角和工具,以提升机房管理的效率…

CKA认证 | Day2 K8s内部监控与日志

第三章 Kubernetes监控与日志 1、查看集群资源状态 在 Kubernetes 集群中,查看集群资源状态和组件状态是非常重要的操作。以下是一些常用的命令和解释,帮助你更好地管理和监控 Kubernetes 集群。 1.1 查看master组件状态 Kubernetes 的 Master 组件包…

111 - Lecture 10

File I/O and GUI with JavaFX 文件输入/输出与JavaFX图形用户界面 一、Overview 1. File I/O (1) learning Java File I/O mechanism (2) writing into and reading from a file 使用文件I/O进行数据读取和…

分享一下arr的意义(c基础)(必看)(牢记)

arr 即数组名 一般指数组首元素地址 在两种情况下不是 1:sizeof(arr) arr指整个数组简单讲解一下strlen与sizeof(c基础)_strzeof在c语言中什么意思-CSDN博客 2:printf("%p",&…

大数据基于Spring Boot的化妆品推荐系统的设计与实现

摘 要 随着大数据时代的到来,人们对于个性化服务的需求越来越高。化妆品推荐系统作为一个认知智能模型段,在为消费者提供更好的购物体验方面发挥了重要作用。本研究基于大数据技术设计了一个高效准确的化妆品推荐系统。通过对海量数据的分析和处理&…

NUXT3学习日记四(路由中间件、导航守卫)

前言 在 Nuxt 3 中,中间件(Middleware)是用于在页面渲染之前或导航发生之前执行的函数。它们允许你在路由切换时执行逻辑,像是身份验证、重定向、权限控制、数据预加载等任务。中间件可以被全局使用,也可以只在特定页…

在Unity环境中读取Excel配置文件(入门)

使用Excel作为配置的优势 使用Excel作为配置文件有相对普通的文本文档/json等类型的配置文件有一个更好的优点,更易于编辑,更易读.譬如上面的例子,我可以制作一个人员名单,可以记录它们的姓名,年龄等信息,每一行就是一个对象,该表就是一个List. 环境准备 GitHub - ExcelDataR…

Maven maven项目构建的生命周期 Maven安装配置 IDEA 配置 Maven

一,Maven的概述 Maven的作用:专门用于管理和构建Java项目的工具,它的主要功能有: 提供了一套标准化的项目结构提供了一套标准化的构建流程(编译,测试,打包,发布……)提…

VM虚拟机装MAC后无法联网,如何解决?

✨在vm虚拟机上,给虚拟机MacOS设置网络适配器。选择NAT模式用于共享主机的IP地址 ✨在MacOS设置中设置网络 以太网 使用DHCP ✨回到本地电脑上,打开 服务,找到VMware DHCP和VMware NAT,把这两个服务打开,专一般问题就…

day06(单片机高级)PCB设计

目录 PCB设计 PCB设计流程 元器件符号设计 原理图设计 元器件封装设计 元器件库使用 PCB设计 目的:学习从画原理图到PCB设计的整个流程 PCB设计流程 元器件符号设计 元器件符号:这是电子元器件的图形表示,用于在原理图中表示特定的元器件。例…

向量数据库FAISS之一:官方简单教程

1.安装 1.conda安装 # CPU-only version --> Linux (x86_64 and arm64), OSX (arm64 only), and Windows (x86_64) $ conda install -c pytorch faiss-cpu1.8.0# GPU(CPU) version --> Linux (x86_64 only) for CUDA 11.4 and 12.1 $ conda install -c pytorch -c nvid…

VMware Workstation 17.6.1

概述 目前 VMware Workstation Pro 发布了最新版 v17.6.1: 本月11号官宣:针对所有人免费提供,包括商业、教育和个人用户。 使用说明 软件安装 获取安装包后,双击默认安装即可: 一路单击下一步按钮: 等待…

java实现小程序接口返回Base64图片

文章目录 引言I java 接口返回Base64图片接口设计获取验证码图片-base64字符串获取验证码图片-二进制流arraybufferII 小程序端代码过期代码: 显示文件流图片(arraybuffer)知识扩展:微信小程序下载后端返回的文件流引言 场景: 图形验证码 背景: 接口返回arraybuffer的格式…

transformer.js(一):这个前端大模型运行框架的可运行环境、使用方式、代码示例以及适合与不适合的场景

随着大模型的广泛应用,越来越多的开发者希望在前端直接运行机器学习模型,从而减少对后端的依赖,并提升用户体验。Transformer.js 是一个专为前端环境设计的框架,它支持运行基于 Transformer 架构的深度学习模型,尤其是…

xiaolin coding 图解网络笔记——HTTP篇

1. HTTP 是什么? HTTP 是超文本传输协议(HyperText Transfer Protocol),一个用在计算机世界里专门在【两点】之间【传输】文字、图片、音频、视频等【超文本】数据的【约定和规范】。 2. HTTP 常见的状态码有哪些? …

23种设计模式-模板方法(Template Method)设计模式

文章目录 一.什么是模板方法模式?二.模板方法模式的特点三.模板方法模式的结构四.模板方法模式的应用场景五.模板方法模式的优缺点六.模板方法模式的C实现七.模板方法模式的JAVA实现八.代码解析九.总结 类图: 模板方法设计模式类图 一.什么是模板方法模…

力扣 LeetCode 235. 二叉搜索树的最近公共祖先(Day10:二叉树)

解题思路: 方法一:递归 递归法没有中的逻辑 class Solution {public TreeNode lowestCommonAncestor(TreeNode root, TreeNode p, TreeNode q) {return recur(root, p, q);}public TreeNode recur(TreeNode root, TreeNode p, TreeNode q) {if (root…