SpringBoot:使用Spring Batch实现批处理任务

引言

在这里插入图片描述

在企业级应用中,批处理任务是不可或缺的一部分。它们通常用于处理大量数据,如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分,专为批处理任务设计,提供了简化的配置和强大的功能。本文将介绍如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。

项目初始化

首先,我们需要创建一个SpringBoot项目,并添加Spring Batch相关的依赖项。可以通过Spring Initializr快速生成项目。

添加依赖

pom.xml中添加以下依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency><groupId>org.hsqldb</groupId><artifactId>hsqldb</artifactId><scope>runtime</scope>
</dependency>

配置Spring Batch

基本配置

Spring Batch需要一个数据库来存储批处理的元数据。我们可以使用HSQLDB作为内存数据库。配置文件application.properties

spring.datasource.url=jdbc:hsqldb:mem:testdb
spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver
spring.datasource.username=sa
spring.datasource.password=
spring.batch.initialize-schema=always
创建批处理任务

一个典型的Spring Batch任务包括三个主要部分:ItemReader、ItemProcessor和ItemWriter。

  1. ItemReader:读取数据的接口。
  2. ItemProcessor:处理数据的接口。
  3. ItemWriter:写数据的接口。
创建示例实体类

创建一个示例实体类,用于演示批处理操作:

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;@Entity
public class Person {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String firstName;private String lastName;// getters and setters
}
创建ItemReader

我们将使用一个简单的FlatFileItemReader从CSV文件中读取数据:

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;@Configuration
public class BatchConfiguration {@Beanpublic FlatFileItemReader<Person> reader() {return new FlatFileItemReaderBuilder<Person>().name("personItemReader").resource(new ClassPathResource("sample-data.csv")).delimited().names(new String[]{"firstName", "lastName"}).fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{setTargetType(Person.class);}}).build();}
}
创建ItemProcessor

创建一个简单的ItemProcessor,将读取的数据进行处理:

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {@Overridepublic Person process(Person person) throws Exception {final String firstName = person.getFirstName().toUpperCase();final String lastName = person.getLastName().toUpperCase();final Person transformedPerson = new Person();transformedPerson.setFirstName(firstName);transformedPerson.setLastName(lastName);return transformedPerson;}
}
创建ItemWriter

我们将使用一个简单的JdbcBatchItemWriter将处理后的数据写入数据库:

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;@Configuration
public class BatchConfiguration {@Beanpublic JdbcBatchItemWriter<Person> writer(NamedParameterJdbcTemplate jdbcTemplate) {return new JdbcBatchItemWriterBuilder<Person>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()).sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(jdbcTemplate.getJdbcTemplate().getDataSource()).build();}
}

配置Job和Step

一个Job由多个Step组成,每个Step包含一个ItemReader、ItemProcessor和ItemWriter。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@EnableBatchProcessing
public class BatchConfiguration {@Autowiredpublic JobBuilderFactory jobBuilderFactory;@Autowiredpublic StepBuilderFactory stepBuilderFactory;@Beanpublic Job importUserJob(JobCompletionNotificationListener listener, Step step1) {return jobBuilderFactory.get("importUserJob").listener(listener).flow(step1).end().build();}@Beanpublic Step step1(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).build();}
}

监听Job完成事件

创建一个监听器,用于监听Job完成事件:

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;@Component
public class JobCompletionNotificationListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {System.out.println("Job Started");}@Overridepublic void afterJob(JobExecution jobExecution) {System.out.println("Job Ended");}
}

测试与运行

创建一个简单的CommandLineRunner,用于启动批处理任务:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class BatchApplication implements CommandLineRunner {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate Job job;public static void main(String[] args) {SpringApplication.run(BatchApplication.class, args);}@Overridepublic void run(String... args) throws Exception {jobLauncher.run(job, new JobParameters());}
}

在完成配置后,可以运行应用程序,并检查控制台输出和数据库中的数据,确保批处理任务正常运行。

扩展功能

在基本的批处理任务基础上,可以进一步扩展功能,使其更加完善和实用。例如:

  • 多步骤批处理:一个Job可以包含多个Step,每个Step可以有不同的ItemReader、ItemProcessor和ItemWriter。
  • 并行处理:通过配置多个线程或分布式处理,提升批处理任务的性能。
  • 错误处理和重试:配置错误处理和重试机制,提高批处理任务的可靠性。
  • 数据验证:在处理数据前进行数据验证,确保数据的正确性。
多步骤批处理
@Bean
public Job multiStepJob(JobCompletionNotificationListener listener, Step step1, Step step2) {return jobBuilderFactory.get("multiStepJob").listener(listener).start(step1).next(step2).end().build();
}@Bean
public Step step2(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step2").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).build();
}
并行处理

可以通过配置多个线程来实现并行处理:

@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).taskExecutor(taskExecutor()).build();
}@Bean
public TaskExecutor taskExecutor() {SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();taskExecutor.setConcurrencyLimit(10);return taskExecutor;
}

结论

通过本文的介绍,我们了解了如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。从项目初始化、配置Spring Batch、实现ItemReader、ItemProcessor和ItemWriter,到配置Job和Step,Spring Batch提供了一系列强大的工具和框架,帮助开发者高效地实现批处理任务。通过合理利用这些工具和框架

,开发者可以构建出高性能、可靠且易维护的批处理系统。希望这篇文章能够帮助开发者更好地理解和使用Spring Batch,在实际项目中实现批处理任务的目标。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/363718.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

排序(冒泡排序、选择排序、插入排序、希尔排序)-->深度剖析(一)

欢迎来到我的Blog&#xff0c;点击关注哦&#x1f495; 前言 排序是一种基本的数据处理操作&#xff0c;它涉及将一系列项目重新排列&#xff0c;以便按照指定的标准&#xff08;通常是数值大小&#xff09;进行排序。在C语言中&#xff0c;排序算法是用来对元素进行排序的一系…

【高性能服务器】服务器概述

&#x1f525;博客主页&#xff1a; 我要成为C领域大神&#x1f3a5;系列专栏&#xff1a;【C核心编程】 【计算机网络】 【Linux编程】 【操作系统】 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 ​ 服务器概述 服…

DDMA信号处理以及数据处理的流程---聚类

Hello,大家好,我是Xiaojie,好久不见,欢迎大家能够和Xiaojie一起学习毫米波雷达知识,Xiaojie准备连载一个系列的文章—DDMA信号处理以及数据处理的流程,本系列文章将从目标生成、信号仿真、测距、测速、cfar检测、测角、目标聚类、目标跟踪这几个模块逐步介绍,这个系列的…

静态链表详解(C语言版)

顺序表和链表的优缺点 顺序表和链表是两种基本的线性数据结构&#xff0c;它们各自有不同的优缺点&#xff0c;适用于不同的应用场景。 顺序表&#xff08;Sequential List&#xff0c;通常指数组&#xff09; 优点&#xff1a; 随机访问&#xff1a;可以通过索引快速访问任…

【技术追踪】UNest:一种用于非配对医学图像合成的新框架(MICCAI-2024)

前天看了一篇文章图像分割用diffusion&#xff0c;今天看了篇文章图像合成不用diffusion&#xff0c;你说说这~ 传送门&#xff1a;【技术追踪】SDSeg&#xff1a;医学图像的 Stable Diffusion 分割&#xff08;MICCAI-2024&#xff09; UNest&#xff1a;UNet结构的Transforme…

Java对象类辨识指南:Object与Objects类的区别详解

今天在写lambda表达式时&#xff0c;用filter来做过滤判断我的结果是否为null时使用到了Objects.nonNull&#xff0c;但是敲着敲着发现不对劲&#xff0c;怎么没有nonNull方法?? 其实时我少敲了一个s&#xff0c;当时自己并没有很清楚Object和Objects两者之前的区别&#xf…

Ansible-综合练习-生产案例

斌的招儿 网上教程大多都是官网模板化的教程和文档&#xff0c;这里小斌用自己实际生产环境使用的例子给大家做一个详解。涉及到一整套ansible的使用&#xff0c;对于roles的使用&#xff0c;也仅涉及到tasks和files目录&#xff0c;方便大家快速上手并规范化管理。 0.环境配置…

波音危机:星际客机飞船故障,宇航员被困太空!马斯克的SpaceX的“龙”飞船来救援?

本文首发于公众号“AntDream”&#xff0c;欢迎微信搜索“AntDream”或扫描文章底部二维码关注&#xff0c;和我一起每天进步一点点 在人类探索宇宙的漫漫征途中&#xff0c;波音公司的“星际客机”承载着无限的希望与梦想&#xff0c;却也面临着前所未有的挑战。从原计划的8天…

pdf已加密如何解除?解密密码的两个方法【可加密】

电脑文件加密的目的就是保护重要信息&#xff0c;防止数据泄露。如果需要解除密码&#xff0c;应该如何操作呢&#xff1f;pdf已加密如何解除&#xff1f;本文整理了以下两种解除文件方法&#xff0c;希望能够帮到有需要的朋友们&#xff01; 方法一、使用金舟文件夹加密大师解…

实验八 T_SQL编程

题目 以电子商务系统数据库ecommerce为例 1、在ecommerce数据库&#xff0c;针对会员表member首先创建一个“呼和浩特地区”会员的视图view_hohhot&#xff0c;然后通过该视图查询来自“呼和浩特”地区的会员信息&#xff0c;用批处理命令语句将问题进行分割&#xff0c;并分…

c语言--指针

前言 欢迎来到我的博客 个人主页:北岭敲键盘的荒漠猫-CSDN博客 本文整理c语言中指针的相关知识点。 指针概念 指针存储的就是数据的地址。 直观理解: 李华家是北洋路130号1单元101 用变量处理数据: 我们去李华家拿数据。 用指针处理数据: 我们去北洋路130号1单元101拿数据…

【嵌入式DIY实例】-Nokia 5110显示BME280传感器数据

Nokia 5110显示BME280传感器数据 文章目录 Nokia 5110显示BME280传感器数据1、硬件准备与接线2、代码实现本文将介绍如何使用 ESP8266 NodeMCU 板(ESP12-E 模块)和 BME280 气压、温度和湿度传感器构建一个简单的本地气象站。 NodeMCU 从 BME280 传感器读取温度、湿度和压力值…

Java学习 - 布隆过滤器

前置需求 需求 已经有50亿个电话号码&#xff0c;现在给出10万个电话号码&#xff0c;如何快速准确地判断这些电话号码是否已经存在&#xff1f; 参考方案 通过数据库查询&#xff1a;比如MySQL&#xff0c;性能不行&#xff0c;速度太慢将数据先放进内存&#xff1a;50亿*8字…

6.优化算法之模拟

1.替换所有的问号 . - 力扣&#xff08;LeetCode&#xff09; class Solution {public String modifyString(String s) {char[] sss.toCharArray();int nss.length;for(int i0;i<n;i){if(ss[i]?){for(char cha;ch<z;ch){if((i0||ss[i-1]!ch)&&(in-1||ss[i1]!c…

基于CNN卷积神经网络的步态识别matlab仿真,数据库采用CASIA库

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1步态识别系统框架 4.2 CNN原理及数学表述 4.3 CASIA步态数据库 5.算法完整程序工程 1.算法运行效果图预览 (完整程序运行后无水印) 1.训练过程 2.样本库 3.提取的步态能量图 4.步态识…

二进制方式部署consul单机版

1.consul的下载 mkdir -p /root/consul/data && cd /root/consul wget https://releases.hashicorp.com/consul/1.18.0/consul_1.18.0_linux_amd64.zip unzip consul_1.18.0_linux_amd64.zip mv consul /usr/local/bin/ 2.配置文件 // 配置文件路径&#xff1a; /roo…

抖音矩阵云混剪系统源码 短视频矩阵营销系统V2(全开源版)

>>>系统简述&#xff1a; 抖音阵营销系统多平台多账号一站式管理&#xff0c;一键发布作品。智能标题&#xff0c;关键词优化&#xff0c;排名查询&#xff0c;混剪生成原创视频&#xff0c;账号分组&#xff0c;意向客户自动采集&#xff0c;智能回复&#xff0c;多…

高效数据采集监控平台 一体化平台 数据可视化!

提高工作效率&#xff0c;一直是各种厂家在寻找的方法。任何一种有效且实用的方法都值得去尝试。数据采集监控平台是一种能高效处理数据的方式&#xff0c;其主要工作内容是从各个产生数据的仪器设备传感器中采集数据、对数据进行集中整理整合、分析、显示、绘制图表、存储、传…

2, 搭建springCloud 项目 测试demo

上篇文章 新建了父依赖服务&#xff0c;这篇文章就建两个demo测试服务。 因为后面需要做服务间的通讯测试&#xff0c;所以至少需要建两个服务 建个子模块 同样的方式建连个demo服务 给java 和resources目录添加属性 在resources目录下建一个applications.yml文件&#xff0c;…

基于香农编码的图像压缩算法实现,聊聊!

&#x1f3c6;本文收录于《CSDN问答解答》专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收藏&…