分页多线程处理大批量数据

1.业务场景

因为需要从一个返利明细表中获取大量的数据,生成返利报告,耗时相对较久,作为后台任务执行。但是后台任务如果不用多线程处理,也会要很长时间才能处理完。

另外考虑到数据量大,不能一次查询所有数据在内存中处理,为了防止内存溢出,分页查询数据,然后分批次多线程处理。

主要思想是采取分治的思想,首先分页查询数据,然后每页数据分成均匀的不同片段,多个线程处理这些片段,一个线程处理一个片段,可以加上等待的同步计数器,让这一页数据全部处理完后再去查询下一页的数据。

2.关键代码

//线程池配置
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10,10,10L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(200), new ThreadPoolExecutor.CallerRunsPolicy());public String generateReport(String periodType, String monthWid, String quarterWid) {int totalNum = 0;//计时器StopWatch stopWatch = new StopWatch();stopWatch.start();try {//这里省略了一些其他的逻辑,只关注分页查询然后多线程任务处理的逻辑......//查询总数量totalNum = getReportTotalNum(periodType, monthWid, quarterWid, totalNum);int pageIndex = 0;int pageSize = 500;int pageNum = 1;StoreRebateDetailForReportQueryReq req = null;while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {//分页查询,每页500条数据pageIndex = pageSize * (pageNum - 1);List<StoreRebateDetail> list = storeRebateDetailService.selectListForRebateReport(pageIndex, pageSize);int batchNum = list.size();//每个线程处理100条                                                                                       int perThreadCount = 100;LOGGER.info("开始处理第{}页(共{}条)数据", pageNum, batchNum);final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器for (int j = 0; j < batchNum; j++) {//每100条一个线程处理if (j % perThreadCount == 0) {int start = j;int end = (batchNum - j) >= perThreadCount ? (j + perThreadCount) : batchNum;int pageNums = pageNum;poolExecutor.submit(()->{LOGGER.info("第{}页的第{}-{}条数据处理开始", pageNums, start+1, end);//处理比较复杂的业务逻辑(耗时较久)processInsert(list, start, end);LOGGER.info("第{}页的第{}-{}条数据处理结束", pageNums, start+1, end);cdl.countDown();});}}cdl.await();pageNum++;}stopWatch.stop();double totalTimeSeconds = stopWatch.getTotalTimeSeconds();result.put("syncStatus", "success");result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");return SToolUtils.convertResultJSONObj(CommonAbstractService.SUCCESS_STATUS, "处理成功", totalNum, new JSONArray().fluentAdd(result)).toString();} catch (Exception e) {stopWatch.stop();double totalTimeSeconds = stopWatch.getTotalTimeSeconds();LOGGER.error("调度处理异常:{}--{}", e.getMessage(), e);result.put("syncStatus", "fail");result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");return SToolUtils.convertResultJSONObj(CommonAbstractService.ERROR_STATUS, "处理异常", 0, new JSONArray().fluentAdd(result)).toString();} finally {//做业务需要处理的,可以没有}}

后面改了个通用版,采用接口中的默认方法实现主要公共逻辑,其他几个需要不同实现的方法让子类去实现。

batchProcess方法为主要处理逻辑入口方法,供其子类继承,子类需要传递线程池、每页大小、每个线程处理的条数、查询数据的参数等参数。

processLongTimeLogic方法为处理时间比较长,需要多线程去执行的逻辑,子类直接覆写这个方法,将复杂的耗时比较长的业务逻辑放在里面就可以了。

queryTotalNum方法为查询总记录数的方法,子类去具体实现查询逻辑,查询数量是为了后续分页处理。

queryDataListByPage方法为分页查询数据的方法,也是子类去实现具体的逻辑,这里的第一个参数list加了泛型处理,<T>为查询数据返回的实体对象类,这样在后续处理的时候就不要去强转类型了。

这样子类只需要关注查询大表的查询逻辑,以及需要处理的具体业务逻辑,而不需要去处理分页和多线程处理的逻辑,这样增加了代码的可读性以及减少了出错的可能性。

public interface BatchProcessService<T> {/*** 批量处理,分页+多线程处理* @param poolExecutor       线程池* @param pageSize           每页查询的大小* @param perThreadCount     每个线程处理的记录数* @param queryTotalNumParam 查询记录总数的参数,必须继承PageReq* @param queryDataParam     查询分页列表的参数,必须继承PageReq* @param logger             子类的日志对象* @param otherParam         其他参数,需要给processLongTimeLogic方法传递的参数* @throws InterruptedException*/default int batchProcess(ThreadPoolExecutor poolExecutor, int pageSize, int perThreadCount, Object queryTotalNumParam, PageReq queryDataParam, Logger logger, Map<String, Object> otherParam) throws InterruptedException {int pageIndex = 0;int pageNum = 1;int totalNum = queryTotalNum(queryTotalNumParam);if (totalNum == 0) {logger.info("需要处理的数据数量为0");return 0;}try {while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {pageIndex = pageSize * (pageNum - 1);queryDataParam.setPageIndex(pageIndex);queryDataParam.setPageRows(pageSize);List<T> list = queryDataListByPage(queryDataParam);int batchNum = list.size();final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器for (int j = 1; j <= (batchNum % perThreadCount == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); j++) {//每100条一个线程处理int start = perThreadCount * (j - 1);int end = (batchNum - start) >= perThreadCount ? (start + perThreadCount) : batchNum;int pageNums = pageNum;poolExecutor.submit(() -> {logger.info("第{}页的第{}-{}条数据处理开始", pageNums, start + 1, end);//处理其他长时间的逻辑processLongTimeLogic(list.subList(start, end), otherParam);logger.info("第{}页的第{}-{}条数据处理结束", pageNums, start + 1, end);cdl.countDown();});}cdl.await();pageNum++;}} catch (Exception e) {logger.error("批量处理数据异常", e);throw e;}return totalNum;}/*** 查询记录总数** @param queryParam* @return*/int queryTotalNum(Object queryParam);/*** 分页查询数据** @param queryDataParam* @return*/List<T> queryDataListByPage(PageReq queryDataParam);/*** 处理长时间业务逻辑** @param list  处理的数据列表* @param otherParam 其他参数*/void processLongTimeLogic(List<T> list, Map<String, Object> otherParam);
}

PageReq类为分页查询参数的父类,里面包含了分页的一些属性,查询参数的实体继承该类就可以了,其他是自己的业务相关的参数。

import lombok.Getter;
import lombok.Setter;import java.io.Serializable;@Getter
@Setter
public class PageReq implements Serializable {/*** 当前页码*/private Integer pageIndex = 1;/*** 页大小*/private Integer pageRows = 10;public PageReq() {}public PageReq(Integer pageIndex, Integer pageRows) {this.pageIndex = pageIndex;this.pageRows = pageRows;}}

3.测试效果

原来跑一个月的数据需要40多分钟,后面通过这样处理后,采用5个线程跑,时间缩短至8分钟左右,相当于差不多时间缩短到原来的1/5。

image-20240320124945462

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/283400.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分布式Raft原理详解,从不同角色视角分析相关状态

分布式Raft原理详解&#xff0c;从不同角色视角分析相关状态 1. CAP定理2.Raft 要解决的问题3. Raft的核心逻辑3.1. Raft的核心逻辑2.1. 复制状态机2.2. 任期 Term2.3. 任期的意义&#xff1a;逻辑时钟2.4 选举定时器 3. Leader选举逻辑4. 从节点视角查看Leader选举4.1. Follow…

qt+ffmpeg 实现音视频播放(三)之视频播放

一、视频播放流程 &#xff08;PS&#xff1a;视频的播放流程跟音频的及其相似&#xff01;&#xff01;&#xff09; 1、打开视频文件 通过 avformat_open_input() 打开媒体文件并分配和初始化 AVFormatContext 结构体。 函数原型如下&#xff1a; int avformat_open_inpu…

ideaSSM 工程车辆人员管理系统bootstrap开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 idea 开发 SSM 工程车辆人员管理系统是一套完善的信息管理系统&#xff0c;结合SSM框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具 有完整的源代码和数据库&…

[AIGC] 在Spring Boot中指定请求体格式

在使用Spring Boot开发Web应用的时候&#xff0c;我们经常会遇到需要接收并处理HTTP请求的情况。一个HTTP请求通常包括一个请求行、若干请求头和一个请求体。请求体在POST和PUT请求中特别重要&#xff0c;因为它通常用于向服务器传递数据。 文章目录 创建并使用一个Java Bean指…

计算机二级Python基础操作题

题目来源&#xff1a;计算机二级Python半个月抱佛脚大法&#xff08;内呈上真题版&#xff09; - 知乎 第4&#xff0c;5&#xff0c;6&#xff0c;7&#xff0c;9&#xff0c;10&#xff0c;11套 1. 基础题1 sinput() print("{:\"^30x}".format(eval(s))) b …

【S5PV210】 | GPIO编程

【S5PV210】 | GPIO编程 时间:2024年3月17日22:02:32 目录 文章目录 【`S5PV210`】 | `GPIO`编程目录1.参考2.`DataSheet`2.1.概述2.1.1.特色2.1.2 输入/输出配置2.1.3 `S5PV210` 输入/输出类型2.1.4 IO驱动强度**2.1.4.1 类型A IO驱动强度****2.1.4.2 类型A IO驱动强度****2…

Selenium不同版本配置自动下载驱动及打包细节

Selenium配置浏览器驱动 自动下载浏览器驱动的方法 selenium4.7.0自动下载浏览器驱动的方法 selenium4.11.0 或4.11.1手动设置浏览器驱动路径的方法pyinstaller打包程序时同时打包ChromeDriverchromedriver路径需要sys._MEIPASS的路径进行引用方法一&#xff1a;通过–add-data…

LiveGBS流媒体平台GB/T28181功能-HTTPS 服务支持配置开启什么时候需要开启HTTPS测试SSL证书配置HTTPS测试证书

LiveGBS功能支持HTTPS 服务支持配置开启什么时候需要开启HTTPS测试SSL证书配置HTTPS测试证书 1、配置开启HTTPS1.1、准备https证书1.1.1、选择Nginx类型证书下载 1.2、配置 LiveCMS 开启 HTTPS1.2.1 web页面配置1.2.2 配置文件配置 2、HTTPS测试证书3、验证HTTPS服务4、为什么要…

安防监控视频汇聚平台EasyCVR接入海康Ehome设备,设备在线但视频无法播放是什么原因?

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…

python5:基于多进程的并发编程、基于协程的并发编程的学习笔记

进程 为什么要使用多进程&#xff1f;——GIL的存在&#xff0c;多线程实际不是并发执行 将任务分为两类&#xff1a;IO密集型&#xff08;多线程&#xff09;CPU密集型&#xff08;多进程&#xff09; 多进程的基本用法 concurrent.futures.process.ProcessPoolExecutor#进…

Airgorah:一款功能强大的WiFi安全审计工具

关于Airgorah Airgorah是一款功能强大的WiFi安全审计工具&#xff0c;该工具可以轻松发现和识别连接到无线接入点的客户端&#xff0c;并对特定的客户端执行身份验证攻击测试&#xff0c;捕捉WPA握手包&#xff0c;并尝试破解接入点的密码。在该工具的帮助下&#xff0c;广大研…

C语言联合体和枚举

前言 这篇博客就把剩下的两个自定义类型联合体和枚举好好总结一下&#xff0c;让我们好好看看联合体和枚举到底是什么 个人主页&#xff1a;小张同学zkf 若有问题 评论区见 感兴趣就关注一下吧 目录 1. 联合体 1.1 联合体类型的声明 1.2 联合体的特点 1.3 相同成员的结构体和联…

分类预测 | Matlab实现PSO-KELM粒子群优化算法优化核极限学习机分类预测

分类预测 | Matlab实现PSO-KELM粒子群优化算法优化核极限学习机分类预测 目录 分类预测 | Matlab实现PSO-KELM粒子群优化算法优化核极限学习机分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.MATLAB实现PSO-KELM粒子群优化算法优化核极限学习机分类预测(完整源…

ideaSSM 高校公寓交流员管理系统bootstrap开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 idea 开发 SSM 高校公寓交流管理系统是一套完善的信息管理系统&#xff0c;结合SSM框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代码和数据库&…

C语言:自定义类型(结构体)

目录 一、结构的特殊声明二、结构的自引用三、结构体内存对齐1.对齐规则2.为什么存在内存对齐(1)平台原因 (移植原因)&#xff1a;(2)性能原因&#xff1a; 3.修改默认对齐数 四、结构体传参五、结构体实现位段1.什么是位段2.位段的内存分配3.位段的跨平台问题4.位段使用的注意…

硬件工程师入门基础知识(四)多层陶瓷电容应用(三)

多层陶瓷电容应用(三) 1.开关电源输出滤波电容器如何选择和计算?2.线性电源和开关电源对于滤波电容的选择有何区别?3.哪些电路适合三端多层陶瓷电容?4.哪些电路适合金属支架多层陶瓷电容?1.开关电源输出滤波电容器如何选择和计算? 无论任何开关电源拓扑,其输出滤波的目…

乐得瑞科技PD协议芯片:OTG与充电并行,引领数据交互

在科技日新月异的今天&#xff0c;数据交互的方式对于我们的日常生活和工作都起到了至关重要的作用。但在OTG技术诞生之前&#xff0c;这一过程却显得相当繁琐和耗时。想象一下&#xff0c;你需要将数码相机的照片导入到笔记本电脑中&#xff0c;却不得不频繁地拔出内存卡&…

YOLOv5目标检测学习(6):源码解析之:训练部分train.py

文章目录 前言一、导入相关包与配置二、主函数main2.1 checks&#xff1a;检查rank值来判断是否打印参数、检查git仓库、检查包的安装2.2 判断是否恢复上一次模型训练提问&#xff1a;opt.data, opt.cfg, opt.hyp, opt.weights, opt.project各是什么&#xff1f; 2.3 DDP mode&…

基于JavaWeb+BS架构+SpringBoot+Vue+O2O生鲜食品订购小程序系统的设计和实现

基于JavaWebBS架构SpringBootVueO2O生鲜食品订购小程序系统的设计和实现 文末获取源码Lun文目录前言主要技术系统设计功能截图 文末获取源码 Lun文目录 目 录 摘 要 I Abstract II 1 绪 论 1 1.1课题研究背景及意义 1 1.2研究现状 1 1.3本论文的主要论文结构 3 2系统相关技术…

Qt实现TFTP Server和 TFTP Client(一)

1 概述 TFTP协议是基于UDP的简单文件传输协议&#xff0c;协议双方为Client和Server.Client和Server之间通过5种消息来传输文件,消息前两个字节Code是消息类型&#xff0c;消息内容随消息类型不同而不同。传输模式有三种&#xff1a;octet,netascii和mail&#xff0c;octet为二…