目录标题
- 一、学习目标
- 学习目标
- 前置知识
- 二、Spring Batch简介
- 2.1 何为批处理?
- 2.2 Spring Batch了解
- 2.3 Spring Batch 优势
- 2.4 Spring Batch 架构
- 三、入门案例
- 3.1 批量处理流程
- 3.2 入门案例-H2版(内存)
- 3.3 入门案例-MySQL版
- 四、入门案例解析
一、学习目标
学习目标
-
系统了解Spring Batch批处理
-
项目中能熟练使用Spring Batch批处理
前置知识
-
Java基础
-
Maven
-
Spring SpringMVC SpringBoot
-
MyBatis
二、Spring Batch简介
2.1 何为批处理?
何为批处理,大白话:就是将数据分批次进行处理的过程。比如:银行对账逻辑,跨系统数据同步等。
常规的批处理操作步骤:系统A从数据库中导出数据到文件,系统B读取文件数据并写入到数据库
典型批处理特点:
-
自动执行,根据系统设定的工作步骤自动完成
-
数据量大,少则百万,多则上千万甚至上亿。(如果是10亿,100亿那只能上大数据了)
-
定时执行,比如:每天,每周,每月执行。
2.2 Spring Batch了解
官网介绍:https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-intro.html#spring-batch-intro
这里挑重点讲下:
-
Sping Batch 是一个轻量级的、完善的的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。
-
Spring Batch 是Spring的一个子项目,基于Spring框架为基础的开发的框架
-
Spring Batch 提供大量可重用的组件,比如:日志,追踪,事务,任务作业统计,任务重启,跳过,重复,资源管理等
-
Spring Batch 是一个批处理应用框架,不提供调度框架,如果需要定时处理需要额外引入-调度框架,比如: Quartz
2.3 Spring Batch 优势
Spring Batch 框架通过提供丰富的开箱即用的组件和高可靠性、高扩展性的能力,使得开发批处理应用的人员专注于业务处理,提高处理应用的开发能力。下面就是使用Spring Batch后能获取到优势:
-
丰富的开箱即用组件
-
面向Chunk的处理
-
事务管理能力
-
元数据管理
-
易监控的批处理应用
-
丰富的流程定义
-
健壮的批处理应用
-
易扩展的批处理应用
-
复用企业现有的IT代码
2.4 Spring Batch 架构
Spring Batch 核心架构分三层:应用层,核心层,基础架构层。
Application:应用层,包含所有的批处理作业,程序员自定义代码实现逻辑。
Batch Core:核心层,包含Spring Batch启动和控制所需要的核心类,比如:JobLauncher, Job,Step等。
Batch Infrastructure:基础架构层,提供通用的读,写与服务处理。
三层体系使得Spring Batch 架构可以在不同层面进行扩展,避免影响,实现高内聚低耦合设计。
三、入门案例
3.1 批量处理流程
前面对Spring Batch 有大体了解之后,那么开始写个案例玩一下。
开始前,先了解一下Spring Batch程序运行大纲:
**
JobLauncher:作业调度器,作业启动主要入口。
Job:作业,需要执行的任务逻辑,
Step:作业步骤,一个Job作业由1个或者多个Step组成,完成所有Step操作,一个完整Job才算执行结束。
ItemReader:Step步骤执行过程中数据输入。可以从数据源(文件系统,数据库,队列等)中读取Item(数据记录)。
ItemWriter:Step步骤执行过程中数据输出,将Item(数据记录)写入数据源(文件系统,数据库,队列等)。
ItemProcessor:Item数据加工逻辑(输入),比如:数据清洗,数据转换,数据过滤,数据校验等
JobRepository: 保存Job或者检索Job的信息。SpringBatch需要持久化Job(可以选择数据库/内存),JobRepository就是持久化的接口
3.2 入门案例-H2版(内存)
需求:打印一个hello spring batch!不带读/写/处理
步骤1:导入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.3</version><relativePath/>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--内存版--><dependency><groupId>com.h2database</groupId><artifactId>h2</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
其中的h2是一个嵌入式内存数据库,后续可以使用MySQL替换
步骤2:创建测试方法
package com.langfeiyes.batch._01_hello;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
@EnableBatchProcessing
public class HelloJob {//job调度器@Autowiredprivate JobLauncher jobLauncher;//job构造器工厂@Autowiredprivate JobBuilderFactory jobBuilderFactory;//step构造器工厂@Autowiredprivate StepBuilderFactory stepBuilderFactory;//任务-step执行逻辑由tasklet完成@Beanpublic Tasklet tasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("Hello SpringBatch....");return RepeatStatus.FINISHED;}};}//作业步骤-不带读/写/处理@Beanpublic Step step1(){return stepBuilderFactory.get("step1").tasklet(tasklet()).build();}//定义作业@Beanpublic Job job(){return jobBuilderFactory.get("hello-job").start(step1()).build();}public static void main(String[] args) {SpringApplication.run(HelloJob.class, args);}}
步骤3:分析
例子是一个简单的SpringBatch 入门案例,使用了最简单的一种步骤处理模型:Tasklet模型,step1中没有带上读/写/处理逻辑,只有简单打印操作,后续随学习深入,我们再讲解更复杂化模型。
package com.zyy.batch._01hello;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;/*** Spring Batch 入门示例* * Spring Batch 核心组件:* - JobLauncher: 作业调度器,作业启动的主要入口* - Job: 作业,需要执行的任务逻辑* - Step: 作业步骤,一个Job由1个或多个Step组成,完成所有Step后,Job才算执行结束* - ItemReader: 数据读取器,从数据源(文件系统,数据库,队列等)中读取数据记录* - ItemWriter: 数据写入器,将数据记录写入目标数据源* - ItemProcessor: 数据处理器,进行数据清洗、转换、过滤、校验等操作* - JobRepository: 作业仓库,保存和检索Job信息,实现Job的持久化(数据库或内存)*/
@EnableBatchProcessing // 开启Spring Batch功能,让Spring容器创建相关组件
@SpringBootApplication // 标记为SpringBoot应用启动类
public class HelloJob {// 自动注入Spring Batch核心组件@Autowiredprivate JobLauncher jobLauncher; // 作业启动器@Autowiredprivate JobBuilderFactory jobBuilderFactory; // Job构建工厂@Autowiredprivate StepBuilderFactory stepBuilderFactory; // Step构建工厂/*** 创建Tasklet,定义Step的执行逻辑* Tasklet是最简单的Step实现方式,适合简单的处理逻辑*/@Beanpublic Tasklet tasklet() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {// 业务逻辑实现System.out.println("hello 瓦塔西在学习springbatch");// 返回执行状态:FINISHED表示执行完成return RepeatStatus.FINISHED;}};}/*** 创建Step,一个Job可以包含多个Step*/@Beanpublic Step step1() {// 使用构建器模式创建Stepreturn stepBuilderFactory.get("step1") // 指定Step名称.tasklet(tasklet()) // 绑定Tasklet.build(); // 构建Step对象}/*** 创建Job,整个批处理作业的入口*/@Beanpublic Job job() {// 使用构建器模式创建Jobreturn jobBuilderFactory.get("hello-job") // 指定Job名称.start(step1()) // 指定起始Step.build(); // 构建Job对象}/*** 应用程序入口*/public static void main(String[] args) {SpringApplication.run(HelloJob.class, args);}
}
详细到逆天的分析
package com.zyy.batch._01hello;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;/*** Spring Batch 入门示例 - Hello World Job* * Spring Batch 是一个批处理框架,用于处理大量数据操作。它提供了可重用的功能,如日志/跟踪,* 事务管理,作业处理统计,作业重启,跳过和资源管理等。* * 核心概念详解:* * 1. JobLauncher - 作业调度器* - 负责启动Job的执行* - 可以同步或异步方式运行Job* - 处理Job参数的传递* * 2. Job - 批处理作业* - 表示一个完整的批处理过程* - 由一个或多个Step组成* - 具有自己的生命周期和状态管理* * 3. Step - 作业步骤* - Job的组成部分,表示Job中的一个独立工作阶段* - 可以是简单的Tasklet模式,也可以是复杂的Chunk处理模式* - 每个Step可以有自己的ItemReader, ItemProcessor和ItemWriter* * 4. Tasklet - 任务组件* - 最简单的Step实现方式* - 用于简单的处理逻辑,例如执行存储过程、文件操作等* - 一次性执行的任务单元* * 5. ItemReader - 数据读取器* - 负责从各种数据源(文件、数据库、消息队列等)读取数据* - 一次读取一条数据记录* - 支持多种数据源的读取策略* * 6. ItemProcessor - 数据处理器* - 对ItemReader读取的数据进行处理* - 可进行数据转换、验证、过滤、清洗等操作* - 是可选组件,可以没有* * 7. ItemWriter - 数据写入器* - 负责将处理后的数据写入目标位置* - 批量写入多条数据记录* - 支持多种目标位置(文件、数据库、消息队列等)* * 8. JobRepository - 作业仓库* - 持久化和检索Job相关的元数据* - 跟踪Job的执行状态* - 存储Job的执行历史信息* * 9. JobInstance - 作业实例* - 特定Job的逻辑运行单元* - 由Job和JobParameters组合确定* - 每次使用相同参数运行同一Job时会重用JobInstance* * 10. JobExecution - 作业执行* - 表示一次Job运行的尝试* - 包含运行状态、开始时间、结束时间等信息* - 一个JobInstance可以有多个JobExecution(如失败后重试)* * 11. StepExecution - 步骤执行* - 表示一次Step运行的尝试* - 记录Step执行的详细信息* - 包含读取、处理、写入和跳过的记录数等统计信息* * 12. ExecutionContext - 执行上下文* - 保存执行过程中的状态信息* - 用于作业重启时恢复状态* - 存在于JobExecution和StepExecution级别*/
@EnableBatchProcessing // 启用Spring Batch功能,自动配置JobRepository, JobLauncher, JobRegistry等组件
@SpringBootApplication // 标识为SpringBoot应用的启动类,启用自动配置和组件扫描
public class HelloJob {/*** JobLauncher - 作业启动器* * 职责:* 1. 接收并验证Job和JobParameters* 2. 根据Job和JobParameters解析对应的JobInstance* 3. 创建JobExecution并执行Job* 4. 处理作业执行中的异常* 5. 返回JobExecution给调用者* * 注:通过@EnableBatchProcessing注解,Spring Boot会自动创建并注册JobLauncher Bean*/@Autowired // 自动注入由Spring容器管理的JobLauncher实例private JobLauncher jobLauncher;/*** JobBuilderFactory - Job构建工厂* * 作用:* 1. 提供流式API创建和配置Job* 2. 简化Job的创建过程* 3. 设置Job的各种属性(名称、监听器、验证器等)* 4. 配置Job的流程控制(顺序执行、条件执行等)* * 常用方法:* - get(String name): 创建指定名称的JobBuilder* - start(Step step): 设置Job的第一个Step* - next(Step step): 添加下一个要执行的Step* - flow(Step step): 创建基于流程的Job* - validator(JobParametersValidator validator): 设置参数验证器* - listener(JobExecutionListener listener): 添加Job执行监听器*/@Autowired // 自动注入由Spring容器管理的JobBuilderFactory实例private JobBuilderFactory jobBuilderFactory;/*** StepBuilderFactory - Step构建工厂* * 作用:* 1. 提供流式API创建和配置Step* 2. 简化Step的创建过程* 3. 支持创建不同类型的Step(Tasklet步骤、Chunk步骤)* 4. 配置Step的各种属性(名称、监听器、事务等)* * 常用方法:* - get(String name): 创建指定名称的StepBuilder* - tasklet(Tasklet tasklet): 创建Tasklet类型的Step* - chunk(int commitInterval): 创建Chunk类型的Step,指定提交间隔* - reader(ItemReader reader): 设置数据读取器* - processor(ItemProcessor processor): 设置数据处理器* - writer(ItemWriter writer): 设置数据写入器* - listener(StepExecutionListener listener): 添加Step执行监听器* - faultTolerant(): 配置容错处理* - transactionManager(PlatformTransactionManager tm): 设置事务管理器*/@Autowired // 自动注入由Spring容器管理的StepBuilderFactory实例private StepBuilderFactory stepBuilderFactory;/*** 创建Tasklet - 定义Step的执行逻辑* * Tasklet是Step的最简单实现方式,适合简单的处理逻辑。* 它的execute方法会被反复调用,直到返回RepeatStatus.FINISHED或抛出异常。* * 参数说明:* - StepContribution: 包含更新当前StepExecution所需的信息* - ChunkContext: 包含当前Step执行的相关上下文信息* * 返回值说明:* - RepeatStatus.FINISHED: 表示Tasklet执行完成,不再重复执行* - RepeatStatus.CONTINUABLE: 表示Tasklet需要继续执行* * @return 配置好的Tasklet实例*/@Bean // 标记为Spring Bean,由Spring容器管理生命周期public Tasklet tasklet() {// 创建匿名内部类实现Tasklet接口return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {// 实现业务逻辑 - 这里只是简单打印一条消息System.out.println("hello 瓦塔西在学习springbatch");// 返回FINISHED状态,表示任务执行完成,不再重复执行return RepeatStatus.FINISHED;// 注:如果返回RepeatStatus.CONTINUABLE,则会重复执行此Tasklet}};// 也可以使用Lambda表达式简化代码:// return (contribution, chunkContext) -> {// System.out.println("hello 瓦塔西在学习springbatch");// return RepeatStatus.FINISHED;// };}/*** 创建Step - 定义批处理作业的一个步骤* * Step表示批处理作业中的一个独立阶段,可以配置自己的处理逻辑、事务控制、重启策略等。* 这里使用最简单的Tasklet类型Step,适合一次性执行的简单任务。* * 构建过程:* 1. 使用stepBuilderFactory.get()获取StepBuilder* 2. 通过StepBuilder配置Step属性* 3. 调用build()方法创建Step实例* * @return 配置好的Step实例*/@Bean // 标记为Spring Bean,由Spring容器管理生命周期public Step step1() {return stepBuilderFactory.get("step1") // 指定Step名称,用于在Job执行过程中标识该Step.tasklet(tasklet()) // 使用上面定义的Tasklet实现Step的处理逻辑.build(); // 构建Step对象并返回// 更复杂的Step配置示例(仅供参考):// return stepBuilderFactory.get("step1")// .tasklet(tasklet())// .listener(new StepExecutionListener() { ... }) // 添加Step执行监听器// .allowStartIfComplete(true) // 允许重新执行已完成的Step// .startLimit(3) // 设置重启次数上限// .build();}/*** 创建Job - 定义整个批处理作业* * Job是Spring Batch中的顶级概念,表示一个完整的批处理过程。* 它由一个或多个Step组成,定义了Steps的执行顺序和条件。* * 构建过程:* 1. 使用jobBuilderFactory.get()获取JobBuilder* 2. 通过JobBuilder配置Job属性和流程* 3. 调用build()方法创建Job实例* * @return 配置好的Job实例*/@Bean // 标记为Spring Bean,由Spring容器管理生命周期public Job job() {return jobBuilderFactory.get("hello-job") // 指定Job名称,用于在执行和监控中标识该Job.start(step1()) // 设置Job的第一个Step.build(); // 构建Job对象并返回// 多步骤Job配置示例(仅供参考):// return jobBuilderFactory.get("hello-job")// .start(step1())// .next(step2())// .next(step3())// .listener(new JobExecutionListener() { ... }) // 添加Job执行监听器// .validator(new JobParametersValidator() { ... }) // 添加参数验证器// .preventRestart() // 禁止重启该Job// .build();// 条件分支Job配置示例(仅供参考):// return jobBuilderFactory.get("hello-job")// .start(step1())// .on("COMPLETED").to(step2()) // 如果step1完成,则执行step2// .from(step1()).on("FAILED").to(errorStep()) // 如果step1失败,则执行errorStep// .end()// .build();}/*** 应用程序入口方法* * 作用:* 1. 启动SpringBoot应用* 2. 初始化Spring上下文* 3. 注册和配置所有Bean* 4. 触发批处理作业的执行* * 注:* 在默认配置下,当应用启动时,Spring Batch会自动执行所有已配置的Job。* 这一行为可通过application.properties中的spring.batch.job.enabled=false禁用。* * @param args 命令行参数*/public static void main(String[] args) {// 启动SpringBoot应用,传入主类和命令行参数SpringApplication.run(HelloJob.class, args);// 注:如果需要手动控制Job的启动,可以注入JobLauncher和Job,然后使用以下代码启动Job:// JobParameters parameters = new JobParametersBuilder()// .addLong("time", System.currentTimeMillis())// .toJobParameters();// jobLauncher.run(job, parameters);}
}
3.3 入门案例-MySQL版
MySQL跟上面的h2一样,区别在连接数据库不一致。
步骤1:在H2版本基础上导入MySQL依赖
爆红记得多刷新maven
<!-- <dependency><groupId>com.h2database</groupId><artifactId>h2</artifactId><scope>runtime</scope>
</dependency> --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.12</version>
</dependency>
步骤2:配置数据库四要素与初始化SQL脚本
spring:datasource:
# 数据库四要素 账号 密码 链接地址 驱动名称username: rootpassword: 123456url: jdbc:mysql://127.0.0.1:3306/springbatch?serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=truedriver-class-name: com.mysql.cj.jdbc.Driver# 初始化数据库,文件在依赖jar包中sql:init:schema-locations: classpath:org/springframework/batch/core/schema-mysql.sql
# mode: alwaysmode: never
# 全局搜索快捷鍵 ctrl n 或者 Shift按两次
# 繁体简体转换 “Ctrl + Shift + F” 快捷键
这里要注意, sql.init.model 第一次启动为always, 后面启动需要改为never,否则每次执行SQL都会异常。
第一次启动会自动执行指定的脚本,后续不需要再初始化
步骤3:测试
跟H2版一样。
如果是想再执行,需要换一下job名字,不然springboot不会执行:Hello-job1
四、入门案例解析
idea 查看 类 所有方法的快捷键
Idea:ctrl+F12
Eclipse:Ctrl+O
1>@EnableBatchProcessing
批处理启动注解,要求贴配置类或者启动类上
@SpringBootApplication
@EnableBatchProcessing
public class HelloJob {...
}
贴上@EnableBatchProcessing注解后,SpringBoot会自动加载JobLauncher JobBuilderFactory StepBuilderFactory 类并创建对象交给容器管理,要使用时,直接@Autowired即可
//job调度器
@Autowired
private JobLauncher jobLauncher;
//job构造器工厂
@Autowired
private JobBuilderFactory jobBuilderFactory;
//step构造器工厂
@Autowired
private StepBuilderFactory stepBuilderFactory;
2>配置数据库四要素
批处理允许重复执行,异常重试,此时需要保存批处理状态与数据,Spring Batch 将数据缓存在H2内存中或者缓存在指定数据库中。入门案例如果要保存在MySQL中,所以需要配置数据库四要素。
3>创建Tasklet对象
//任务-step执行逻辑由tasklet完成
@Bean
public Tasklet tasklet(){return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {System.out.println("Hello SpringBatch....");return RepeatStatus.FINISHED;}};
}
Tasklet负责批处理step步骤中具体业务执行,它是一个接口,有且只有一个execute方法,用于定制step执行逻辑。
public interface Tasklet {RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}
execute方法返回值是一个状态枚举类:RepeatStatus,里面有可继续执行态与已经完成态
public enum RepeatStatus {/*** 可继续执行的-tasklet返回这个状态会进入死循环*/CONTINUABLE(true), /*** 已经完成态*/FINISHED(false);....
}
4>创建Step对象
//作业步骤-不带读/写/处理
@Bean
public Step step1(){return stepBuilderFactory.get("step1").tasklet(tasklet()).build();
}
Job作业执行靠Step步骤执行,入门案例选用最简单的Tasklet模式,后续再讲Chunk块处理模式。
5>创建Job并执行Job
//定义作业
@Bean
public Job job(){return jobBuilderFactory.get("hello-job").start(step1()).build();
}
创建Job对象交给容器管理,当springboot启动之后,会自动去从容器中加载Job对象,并将Job对象交给JobLauncherApplicationRunner类,再借助JobLauncher类实现job执行。
验证过程;