Spring Boot集成Spring Batch快速入门Demo

1.什么是Spring Batch?

Spring Batch 是一个轻量级的开源框架,它提供了一种简单的方式来处理大量的数据。它基于Spring框架,提供了一套批处理框架,可以处理各种类型的批处理任务,如ETL、数据导入/导出、报表生成等。Spring Batch提供了一些重要的概念,如Job、Step、ItemReader、ItemProcessor、ItemWriter等,这些概念可以帮助我们构建可重用的批处理应用程序。通过Spring Batch,我们可以轻松地实现批处理的并发、容错、重试等功能,同时也可以方便地与其他Spring组件集成,如Spring Boot、Spring Data等。总之,Spring Batch是一个非常强大、灵活、易于使用的批处理框架,可以帮助我们快速构建高效、可靠的批处理应用程序。

分层架构

spring-batch-layers

可以看到它分为三层,分别是:

  • Application应用层:包含了所有任务batch jobs和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。
  • Batch Core核心层:包含启动和管理任务的运行环境类,如JobLauncher等。
  • Batch Infrastructure基础层:上面两层是建立在基础层之上的,包含基础的读入reader写出writer、重试框架等。

主要概念

spring-batch-reference-model

2.2.1 JobRepository

专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以Spring Batch是需要依赖数据库来管理的。

2.2.2 任务启动器JobLauncher

负责启动任务Job

2.2.3 任务Job

Job是封装整个批处理过程的单位,跑一个批处理任务,就是跑一个Job所定义的内容。

111

  上图介绍了Job的一些相关概念:

  • Job:封装处理实体,定义过程逻辑。
  • JobInstanceJob的运行实例,不同的实例,参数不同,所以定义好一个Job后可以通过不同参数运行多次。
  • JobParameters:与JobInstance相关联的参数。
  • JobExecution:代表Job的一次实际执行,可能成功、可能失败。

所以,开发人员要做的事情,就是定义Job

2.2.4 步骤Step

Step是对Job某个过程的封装,一个Job可以包含一个或多个Step,一步步的Step按特定逻辑执行,才代表Job执行完成。

222

  通过定义Step来组装Job可以更灵活地实现复杂的业务逻辑。

2.2.5 输入——处理——输出

所以,定义一个Job关键是定义好一个或多个Step,然后把它们组装好即可。而定义Step有多种方法,但有一种常用的模型就是输入——处理——输出,即Item ReaderItem ProcessorItem Writer。比如通过Item Reader从文件输入数据,然后通过Item Processor进行业务处理和数据转换,最后通过Item Writer写到数据库中去。 Spring Batch为我们提供了许多开箱即用的ReaderWriter,非常方便。

 

2.环境搭建

参照代码仓库mysql模块里面docker目录搭建

3.代码工程

实验目标

如何使用 Spring Boot 创建各种不同类型 Spring Batch Job

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>SpringBatch</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

job

第一个简单的任务

package com.et.batch.job;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class FirstJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job firstJob() {return jobBuilderFactory.get("firstJob").start(step()).build();}private Step step() {return stepBuilderFactory.get("step").tasklet((contribution, chunkContext) -> {System.out.println("execute  step....");return RepeatStatus.FINISHED;}).build();}
}

多步骤的job

package com.et.batch.job;import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class MultiStepJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job multiStepJob() {/*return jobBuilderFactory.get("multiStepJob").start(step1()).next(step2()).next(step3()).build();*/// control the next step by last Statusreturn jobBuilderFactory.get("multiStepJob2").start(step1()).on(ExitStatus.COMPLETED.getExitCode()).to(step2()).from(step2()).on(ExitStatus.COMPLETED.getExitCode()).to(step3()).from(step3()).end().build();}private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step1。。。");return RepeatStatus.FINISHED;}).build();}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step2。。。");return RepeatStatus.FINISHED;}).build();}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step3。。。");return RepeatStatus.FINISHED;}).build();}
}

多flow控制的job, 创建一个flow对象,包含若干个step

package com.et.batch.job;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class FlowJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job flowJob() {return jobBuilderFactory.get("flowJob").start(flow()).next(step3()).end().build();}private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step1。。。");return RepeatStatus.FINISHED;}).build();}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step2。。。");return RepeatStatus.FINISHED;}).build();}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step3。。。");return RepeatStatus.FINISHED;}).build();}private Flow flow() {return new FlowBuilder<Flow>("flow").start(step1()).next(step2()).build();}
}

并发执行的jobs

package com.et.batch.job;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.stereotype.Component;@Component
public class SplitJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job splitJob() {return jobBuilderFactory.get("splitJob").start(flow1()).split(new SimpleAsyncTaskExecutor()).add(flow2()).end().build();}private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step1。。。");return RepeatStatus.FINISHED;}).build();}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step2。。。");return RepeatStatus.FINISHED;}).build();}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step3。。。");return RepeatStatus.FINISHED;}).build();}private Flow flow1() {return new FlowBuilder<Flow>("flow1").start(step1()).next(step2()).build();}private Flow flow2() {return new FlowBuilder<Flow>("flow2").start(step3()).build();}
}

根据上次运行结果判断是否执行下一步

package com.et.batch.job;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class DeciderJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate MyDecider myDecider;@Beanpublic Job deciderJob() {return jobBuilderFactory.get("deciderJob").start(step1()).next(myDecider).from(myDecider).on("weekend").to(step2()).from(myDecider).on("workingDay").to(step3()).from(step3()).on("*").to(step4()).end().build();}private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step1。。。");return RepeatStatus.FINISHED;}).build();}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step2。。。");return RepeatStatus.FINISHED;}).build();}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step3。。。");return RepeatStatus.FINISHED;}).build();}private Step step4() {return stepBuilderFactory.get("step4").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step4。。。");return RepeatStatus.FINISHED;}).build();}
}

父子嵌套job

package com.et.batch.job;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;@Component
public class NestedJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate JobRepository jobRepository;@Autowiredprivate PlatformTransactionManager platformTransactionManager;@Beanpublic Job parentJob() {return jobBuilderFactory.get("parentJob").start(childJobOneStep()).next(childJobTwoStep()).build();}private Step childJobOneStep() {return new JobStepBuilder(new StepBuilder("childJobOneStep")).job(childJobOne()).launcher(jobLauncher).repository(jobRepository).transactionManager(platformTransactionManager).build();}private Step childJobTwoStep() {return new JobStepBuilder(new StepBuilder("childJobTwoStep")).job(childJobTwo()).launcher(jobLauncher).repository(jobRepository).transactionManager(platformTransactionManager).build();}private Job childJobOne() {return jobBuilderFactory.get("childJobOne").start(stepBuilderFactory.get("childJobOneStep").tasklet((stepContribution, chunkContext) -> {System.out.println("subtask1。。。");return RepeatStatus.FINISHED;}).build()).build();}private Job childJobTwo() {return jobBuilderFactory.get("childJobTwo").start(stepBuilderFactory.get("childJobTwoStep").tasklet((stepContribution, chunkContext) -> {System.out.println("subtask2。。。");return RepeatStatus.FINISHED;}).build()).build();}
}

application.yaml

自动会初始化脚本,只需要建立以恶搞空库就行

spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/springbatchusername: rootpassword: 123456batch:jdbc:schema: classpath:org/springframework/batch/core/schema-mysql.sqlinitialize-schema: always #Since Spring Boot 2.5.0 use spring.batch.jdbc.initialize-schema=neverjob:enabled: true

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.

4.测试

  • 启动Spring Boot应用程序,系统会自动运行job,跑过一次,下次启动不会继续执行
  • 如果要执行定时任务,可以利用spring提供的scheduledTaskRegistrar注册一个定时任务,扫描最新的定时任务,将这些定时任务注册到scheduleFuture中从而实现动态定时任务。

5.引用

  • Batch Applications :: Spring Boot
  • Spring Boot集成Spring Batch快速入门Demo | Harries Blog™

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/383162.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【通信协议-RTCM】MSM语句(2) - RINEXMSM7语句总结(重要!自动化开发计算卫星状态常用)

注释&#xff1a; 在工作中主要负责的是RTCM-MSM7语句相关开发工作&#xff0c;所以主要介绍的就是MSM7语句相关内容 1. 相位校准参考信号 2. MSM1、MSM2、MSM3、MSM4、MSM5、MSM6和MSM7的消息头内容 DATA FIELDDF NUMBERDATA TYPENO. OF BITSNOTES Message Number - 消息编…

基于STM32的农业大棚温湿度采集控制系统的设计

目录 1、设计要求 2、系统功能 3、演示视频和实物 4、系统设计框图 5、软件设计流程图 6、原理图 7、主程序 8、总结 &#x1f91e;大家好&#xff0c;这里是5132单片机毕设设计项目分享&#xff0c;今天给大家分享的是智能教室。 设备的详细功能见网盘中的文章《8、基…

pycharm git 新建备忘

git 提交时出现如下错误&#xff1a; Committer identity unknown *** Please tell me who you are. Run git config --global user.email "youexample.com" git config --global user.name "Your Name" to set your accounts default identity. Omit…

在STM32嵌入式中C/C++语言对栈空间的使用

像STM32这样的微控制器在进入main函数之前需要对栈进行初始化。可以说栈是C语言运行时的必要条件。我们知道栈实际上是一块内存空间&#xff0c;那么这块空间都用来存储什么呢&#xff1f;有什么办法能够优化栈空间的使用&#xff1f; 栈空间保存的内容 栈是一个先入后出的数据…

华杉研发九学习日记17 正则表达式 异常

华杉研发九学习日记17 一&#xff0c;正则表达式 ^ $ 作用&#xff1a; 测试字符串内的模式(匹配) 例如&#xff0c;可以测试输入字符串&#xff0c;以查看字符串内是否出现电话号码模式或信用卡号码模式。这称为数据验证. 替换文本&#xff08;替换》 可以使用正则表达式来…

ubuntu安装mysql8.0

文章目录 ubuntu版本安装修改密码取消root跳过密码验证 ubuntu版本 22.04 安装 更新软件包列表 sudo apt update安装 MySQL 8.0 服务器 sudo apt install mysql-server在安装过程中&#xff0c;系统可能会提示您设置 root 用户的密码&#xff0c;请务必牢记您设置的密码。…

微信小程序实现聊天界面,发送功能

.wxml <scroll-view scroll-y"true" style"height: {{windowHeight}}px;"><view wx:for"{{chatList}}" wx:for-index"index" wx:for-item"item" style"padding-top:{{index0?30:0}}rpx"><!-- 左…

MySQL数据库安装使用

我们都知道数据库又分为关系型数据库和非关系型数据库&#xff1b; 关系型数据库指采用了关系模型来组织数据的数据库&#xff0c;指的就是二维表格模型。可以先初步理解为Excel表格。非关系型数据库又被称为NoSQL&#xff0c;对NoSQL 最普遍的定义是“非关联型的”&#xff0…

C#测试控制台程序调用Quartz.NET的基本用法

Quartz.Net是常用的任务调用框架之一&#xff0c;既能在客户端程序中使用&#xff0c;也支持在网页程序后台调用。本文结合参考文献4中的示例代码学习其在控制台程序中的基本用法。   VS2022新建控制台项目&#xff0c;在Nuget包管理器中搜索并安装Quartz包&#xff0c;如下所…

SvelteKit - 1. 初始化项目

官方 doc - create a project 1、基本环境 &#xff08;下面是我这里的环境&#xff0c;亲测用 node 14 和 16 install 会报错&#xff09; node&#xff1a;20.9.0 npm&#xff1a;10.1.0 2、初始化项目 npm create sveltelatest my-app cd my-app npm install npm run de…

【C语言报错已解决】Use of Uninitialized Variable

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 引言&#xff1a; 在编程中&#xff0c;未初始化的变量是一个常见的问题&#xff0c;它可能导致程序的行为变得不可预测。未初…

Java基本数据类型与String类型的转换

目录 基本数据类型和Strng类型的转换 第一种方法 第二种方法 将字符串转成字符 注意事项 本章练习题 题1 题2 基本数据类型和Strng类型的转换 第一种方法 使用号和" "即可完成转换 第二种方法 第二种方法是通过基本类型的包装类调用parsexx方法 将字符…

理解进程status的二进制位表示及进程等待(是什么,为什么,怎么办)

信号编号&#xff1a;低7位 状态编号&#xff1a;次低8位 1.子进程退出后会变为僵尸进程&#xff0c;将退出结果写入自身的task_struct结构体中 2.wait/waitpid是一个系统调用->OS可以读取子进程的task_struct 1.为什么要进行进程等待&#xff1f; 1.将子进程&#xff…

达梦数据库系列—30. DTS迁移Mysql到DM

目录 1.MySQL 源端信息 2.DM 目的端信息 3.迁移评估 4.数据库迁移 4.1源端 MySQL 准备 4.2目的端达梦准备 初始化参数设置 兼容性参数设置 创建迁移用户和表空间 4.3迁移步骤 创建迁移 配置迁移对象及策略 开始迁移 对象补迁 5.数据校验 统计 MySQL 端对象及数…

Eclipse 搭建 C/C++ 开发环境以及eclipse的使用

一、下载、安装 MinGW 1、下载: 下载地址&#xff1a;MinGW - Minimalist GNU for Windows - Browse Files at SourceForge.net 点击“Download Latest Version”即可 下载完成后&#xff0c;得到一个名为 mingw-get-setup.exe 的安装文件。双击运行&#xff0c;安装即可。 …

Docker容器限制内存与CPU使用

文章目录 Docker 容器限制内存与 CPU 使用内存限额内存限制命令举例使用 `nginx` 镜像学习内存分配只指定 `-m` 参数的情况CPU 限制命令举例验证资源使用Docker 容器限制内存与 CPU 使用 在生产环境中,为了保证服务器不因某一个软件导致服务器资源耗尽,我们会限制软件的资源…

WPF启动失败报System.Windows.Automation.Peers.AutomationPeer.Initialize()错误解决

问题描述 win10系统上WPF程序启动后就崩溃&#xff0c;通过查看崩溃日志如下&#xff1a; 应用程序: xxx.exe Framework 版本: v4.0.30319 说明: 由于未经处理的异常&#xff0c;进程终止。 异常信息: System.TypeLoadException 在 System.Windows.Automation.Peers.Automatio…

META 备受期待的 Llama 3 405B 即将发布

本心、输入输出、结果 文章目录 META 备受期待的 Llama 3 405B 即将发布前言Llama 3 405B或许会彻底改变专用模型的数据质量Llama 3 405B将形成新的模型生态系统:从基础模型到专家组合Llama 3 405B有最高效 API 的竞争Llama 3 405B 基准测试META 备受期待的 Llama 3 405B 即将…

VMware三种网络模式---巨细

文章目录 目录 ‘一.网络模式概述 二.桥接模式 二.NAT模式 三.仅主机模式 四.案例演示 防火墙配置&#xff1a; 虚拟电脑配置 前言 本文主要介绍VMware的三种网络模式 ‘一.网络模式概述 VMware中分为三种网络模式&#xff1a; 桥接模式&#xff1a;默认与宿主机VMnet0绑…

pytest常用命令行参数解析

简介&#xff1a;pytest作为一个成熟的测试框架&#xff0c;它提供了许多命令行参数来控制测试的运行方式&#xff0c;以配合适用于不同的测试场景。例如 -x 可以用于希望出现错误就停止&#xff0c;以便定位和分析问题。–rerunsnum适用于希望进行失败重跑等个性化测试策略。 …