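While working on a project at my company, a client asked for Excel import that can handle large data volumes, so I decided to write up the approach and the optimization process.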
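I. Preparation
- We use Alibaba's EasyExcel, which uses less memory than POI and JXL.
  Link: EasyExcel official website
- Prepare the test database and Excel file. Both have been uploaded to the Gitee repository together with the code.
  Link: project code
- Raise MySQL's max_allowed_packet.
  See: "Resolving MySQL's PacketTooBigException"
- Raise Tomcat's default upload size limit, because a large file would otherwise be rejected with an error.

  server:
    port: 8888
    maxHttpHeaderSize: 102400
    servlet:
      context-path: /api
    error:
      include-exception: false
      include-message: always
  spring:
    servlet:
      multipart:
        max-file-size: 100MB
        max-request-size: 100MB

- Enable the MyBatis-Plus batch insert feature. Skip this step if you do not need it.
  See: "Implementing custom batch insert in MyBatis-Plus"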
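As an illustration of the max_allowed_packet step, the limit can be raised in the MySQL server configuration file. This is only a sketch: the value 64M is an assumption chosen for the example, not the value used in this project, and should be sized to the largest batch INSERT you expect to send.

```ini
# my.cnf (Linux) or my.ini (Windows), in the server section
# 64M is an illustrative value, not taken from the original project
[mysqld]
max_allowed_packet=64M
```

The server has to be restarted for the file change to take effect. Running SHOW VARIABLES LIKE 'max_allowed_packet'; afterwards shows the value currently in use.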
II. Dependencies

<!-- EasyExcel -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>easyexcel</artifactId>
    <version>3.3.4</version>
</dependency>
<!-- Hutool -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.16</version>
</dependency>
<!-- MySQL driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
    <scope>runtime</scope>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.4.2</version>
</dependency>
<!-- Lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
</dependency>
<!-- Spring Web -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
III. Utility class
- MultipartFileUtil

import org.springframework.web.multipart.MultipartFile;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

/**
 * File utility: converts the uploaded MultipartFile into a File.
 * The controller receives a MultipartFile, while the EasyExcel.read overload
 * used in this article takes a File.
 */
public class MultipartFileUtil {

    private static final String STATIC_PATH = "d:/upload/file/";

    public static File multipartFileToFile(MultipartFile file) throws IOException {
        // Nothing to convert for an empty upload
        if (file == null || file.getSize() <= 0) {
            return null;
        }
        // getName() returns the form field name, so use the original file name instead
        String original = file.getOriginalFilename();
        String fileName = original == null ? file.getName() : new File(original).getName();
        File toFile = new File(STATIC_PATH, fileName);
        // Make sure the target directory exists before writing
        Files.createDirectories(toFile.getParentFile().toPath());
        // try-with-resources closes the stream exactly once, even on failure
        try (InputStream ins = file.getInputStream()) {
            Files.copy(ins, toFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        return toFile;
    }
}
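The utility above writes every upload into a fixed directory and leaves the copy there. As a possible alternative, the stream could be copied into a temporary file that is deleted once the import has finished. The sketch below uses only the JDK; the class TempFileCopy and its method name are hypothetical and not part of the original project.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper for illustration only. */
final class TempFileCopy {

    /** Copies the stream into a new temp file and returns its path. The caller closes the stream. */
    static Path copyToTempFile(InputStream in, String suffix) {
        try {
            Path target = Files.createTempFile("upload-", suffix);
            // createTempFile already created an empty file, so overwrite it
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "id,name\n1,test\n".getBytes(StandardCharsets.UTF_8);
        Path tmp;
        try (InputStream in = new ByteArrayInputStream(payload)) {
            tmp = copyToTempFile(in, ".csv");
        }
        try {
            System.out.println("bytes written: " + Files.size(tmp));
        } finally {
            // Remove the temp file once it is no longer needed
            Files.deleteIfExists(tmp);
        }
    }
}
```

In the import service, the returned path could be handed to the reader via toFile() and deleted in a finally block after doRead() returns.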
IV. Business-layer code
1. Entity class

import com.alibaba.excel.annotation.ExcelProperty;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("sys_user")
public class User {

    @ExcelProperty("id")
    private Long id;

    // The @ExcelProperty values must match the header text in the Excel file
    @ExcelProperty("姓名")
    private String name;

    @ExcelProperty("身份证号码")
    private String idCard;

    @ExcelProperty("年龄")
    private Integer age;

    @ExcelProperty("性别")
    private String sex;

    @ExcelProperty("备注")
    private String remark;
}
2. Mapper layer
If you are not using the MyBatis-Plus batch insert feature, simply extend BaseMapper as usual.

import com.demo.config.GemBaseMapper;
import com.demo.eneity.User;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface UserMapper extends GemBaseMapper<User> {
}
3. Service layer

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import com.alibaba.excel.EasyExcel;
import com.demo.eneity.User;
import com.demo.excel.SimpleThreadListener;
import com.demo.utils.MultipartFileUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

@Service
@AllArgsConstructor
@Slf4j
public class UserService {

    public String importUserList(MultipartFile file) throws Exception {
        TimeInterval timer = DateUtil.timer();
        // Single thread, no batching
        EasyExcel.read(MultipartFileUtil.multipartFileToFile(file), User.class, new SimpleThreadListener()).sheet().doRead();
        // Single thread, batched
        // EasyExcel.read(MultipartFileUtil.multipartFileToFile(file), User.class, new CutDataListener()).sheet().doRead();
        // Multi-threaded
        // EasyExcel.read(MultipartFileUtil.multipartFileToFile(file), User.class, new MultiThreadListener()).sheet().doRead();
        log.info("Import succeeded, elapsed time: {} ms", timer.interval());
        return "Import succeeded";
    }
}
4. Controller layer

import com.demo.service.UserService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import javax.annotation.Resource;

@RestController
@RequestMapping("/user")
public class UserController {

    @Resource
    private UserService userService;

    @PostMapping("/import")
    public String importUserList(@RequestParam("file") MultipartFile file) throws Exception {
        return userService.importUserList(file);
    }
}
V. Event listeners
There are three versions of the event listener: a single-threaded listener, a batched listener, and a multi-threaded listener.
1. Single-threaded listener

import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.demo.eneity.User;
import com.demo.mapper.UserMapper;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Getter
@Component
@Slf4j
public class SimpleThreadListener extends AnalysisEventListener<User> {

    private List<User> list = Collections.synchronizedList(new ArrayList<>());

    public SimpleThreadListener() {
    }

    // Called once per parsed row: collect every row in memory
    @Override
    public void invoke(User user, AnalysisContext analysisContext) {
        if (user != null) {
            list.add(user);
        }
    }

    // Called after the whole sheet has been parsed: insert everything in one batch
    @Override
    public void doAfterAllAnalysed(AnalysisContext analysisContext) {
        log.info("Parsing finished, starting insert");
        UserMapper userMapper = SpringUtil.getBean(UserMapper.class);
        userMapper.insertBatchSomeColumn(list);
    }
}
2. Batched listener

import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.demo.eneity.User;
import com.demo.mapper.UserMapper;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Getter
@Setter
@Component
@Slf4j
public class CutDataListener extends AnalysisEventListener<User> {

    private List<User> list = new ArrayList<>();

    public CutDataListener() {
    }

    @Override
    public void invoke(User user, AnalysisContext analysisContext) {
        if (user != null) {
            list.add(user);
        }
        // Insert in batches: flush once 100,000 rows have accumulated
        if (list.size() >= 100000) {
            saveData();
        }
    }

    /**
     * Saves the buffered rows to the database, then empties the buffer.
     */
    private void saveData() {
        // Skip the insert when nothing is buffered, e.g. when the row count is an exact multiple of the batch size
        if (list.isEmpty()) {
            return;
        }
        UserMapper userMapper = SpringUtil.getBean(UserMapper.class);
        userMapper.insertBatchSomeColumn(list);
        list.clear();
    }

    // Flush whatever is left after the last full batch
    @Override
    public void doAfterAllAnalysed(AnalysisContext analysisContext) {
        saveData();
    }
}
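The flush-every-N pattern used by the batched listener can be looked at in isolation. The following is a minimal sketch using only the JDK; BatchBuffer and its sink callback are hypothetical names introduced for illustration, and the sink stands in for the batch insert into the database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical buffer illustrating flush-every-N batching; not part of the original project. */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one item and flushes as soon as the buffer is full. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands a copy of the buffered items to the sink; does nothing when the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchBuffer<Integer> rows = new BatchBuffer<>(1000, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 2500; i++) {
            rows.add(i);
        }
        rows.flush(); // final partial batch
        System.out.println(batchSizes); // prints [1000, 1000, 500]
    }
}
```

Because memory use is bounded by the batch size rather than by the file size, this variant holds at most one batch of rows at a time, whereas the single-threaded listener keeps every row until parsing ends.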
3. Multi-threaded listener

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.demo.eneity.User;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

@Getter
@Component
@Slf4j
public class MultiThreadListener extends AnalysisEventListener<User> {

    private List<User> list = Collections.synchronizedList(new ArrayList<>());

    private final static int CORE_POOL_SIZE = 5;    // core pool size
    private final static int MAX_POOL_SIZE = 10;    // maximum pool size
    private final static int QUEUE_CAPACITY = 100;  // queue capacity
    private final static long KEEP_ALIVE_TIME = 1L; // keep-alive time, in minutes

    public MultiThreadListener() {
    }

    @Override
    public void invoke(User user, AnalysisContext analysisContext) {
        if (user != null) {
            list.add(user);
        }
    }

    @Override
    public void doAfterAllAnalysed(AnalysisContext analysisContext) {
        log.info("Parsing finished, starting to insert new data");
        // Create a new thread pool
        ExecutorService executorService = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());
        // Number of rows each task handles
        int singleThreadDealCount = 1000;
        // Number of tasks to submit, rounded up so that no empty trailing task is created
        int threadSize = (list.size() + singleThreadDealCount - 1) / singleThreadDealCount;
        // Initialize the latch with the number of tasks
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        for (int i = 0; i < threadSize; i++) {
            int startIndex = i * singleThreadDealCount;
            // The last task ends at the end of the list
            int endIndex = Math.min(startIndex + singleThreadDealCount, list.size());
            // Create the custom task and hand it to the pool, which calls its run method
            UserThread thread = new UserThread(startIndex, endIndex, list, countDownLatch);
            executorService.execute(thread);
        }
        try {
            // The current thread waits here until every task has counted down
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for the insert tasks", e);
        }
        // The latch guarantees all tasks have finished before the pool is shut down
        executorService.shutdown();
        list.clear();
    }
}
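The listener above combines a bounded ArrayBlockingQueue with ThreadPoolExecutor.CallerRunsPolicy. With that policy, a task that can neither start a new worker nor fit into the queue is run by the submitting thread instead of being dropped, which also slows down submission while the pool is saturated. The sketch below shows this with a deliberately tiny pool (one worker, one queue slot); CallerRunsDemo is a hypothetical class and the sizes are chosen for the demonstration, not taken from the project.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of CallerRunsPolicy on a saturated pool. */
final class CallerRunsDemo {

    /** Submits three tasks and records whether each ran on a pool thread or on the caller. */
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1L, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        List<String> ranOn = new CopyOnWriteArrayList<>();

        Runnable task = () -> {
            boolean onCaller = Thread.currentThread() == caller;
            ranOn.add(onCaller ? "caller" : "pool");
            if (!onCaller) {
                try {
                    release.await(); // keep the worker busy until all tasks are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        try {
            pool.execute(task); // occupies the single worker
            pool.execute(task); // waits in the one-slot queue
            pool.execute(task); // rejected, so CallerRunsPolicy runs it on the caller
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        // Two entries are "pool" and one is "caller"; the order may vary between runs
        System.out.println(run());
    }
}
```

With the listener's settings (queue of 100, up to 10 threads), the parsing thread would start executing insert tasks itself once more than 110 tasks are pending, so no batch is lost when the pool is full.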
4. Task class used by the multi-threaded listener

import cn.hutool.extra.spring.SpringUtil;
import com.demo.eneity.User;
import com.demo.mapper.UserMapper;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class UserThread implements Runnable {

    private final int startIndex;
    private final int endIndex;
    private final List<User> list;
    private final CountDownLatch count;

    public UserThread(int startIndex, int endIndex, List<User> list, CountDownLatch count) {
        this.startIndex = startIndex;
        this.endIndex = endIndex;
        this.list = list;
        this.count = count;
    }

    @Override
    public void run() {
        try {
            // Take this task's slice of the full list
            List<User> newList = list.subList(startIndex, endIndex);
            // Guard against inserting an empty batch
            if (newList.size() > 0) {
                UserMapper userMapper = SpringUtil.getBean(UserMapper.class);
                userMapper.insertBatchSomeColumn(newList);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Decrement the latch
            count.countDown();
        }
    }
}
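Implementing Callable, implementing Runnable, or extending Thread would all work. Here the class implements Runnable and overrides the run method. It takes the index range passed in, cuts out that slice with subList, and calls the batch insert method to write the rows to the database. countDown is called in the finally block so that the latch is still decremented when an insert throws an exception. Otherwise the await call in the listener would never return.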
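The slicing and latch handling described above can be sketched with the JDK alone. In the example below the database insert is replaced by a counter, the pool is a plain fixed-size pool, and ChunkedLatchDemo with its methods is a hypothetical class written for this illustration. The ranges are computed so that no empty trailing chunk is produced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: split a list into index ranges and wait for all workers with a latch. */
final class ChunkedLatchDemo {

    /** Splits [0, total) into half-open [start, end) ranges of at most chunkSize elements. */
    static List<int[]> ranges(int total, int chunkSize) {
        List<int[]> result = new ArrayList<>();
        for (int start = 0; start < total; start += chunkSize) {
            result.add(new int[] {start, Math.min(start + chunkSize, total)});
        }
        return result;
    }

    /** Processes every chunk on a pool and returns how many items were handled. */
    static int processAll(List<Integer> data, int chunkSize) {
        List<int[]> ranges = ranges(data.size(), chunkSize);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(ranges.size());
        AtomicInteger handled = new AtomicInteger();
        try {
            for (int[] r : ranges) {
                pool.execute(() -> {
                    try {
                        // Stand-in for the batch insert of this slice
                        handled.addAndGet(data.subList(r[0], r[1]).size());
                    } finally {
                        latch.countDown(); // always release the latch, even on failure
                    }
                });
            }
            latch.await(); // block until every chunk has been processed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            data.add(i);
        }
        System.out.println(ranges(2000, 1000).size()); // prints 2
        System.out.println(processAll(data, 1000));    // prints 2500
    }
}
```

The list is only read while the workers run, which is why handing out subList views of the same list to several threads is safe here. Clearing or modifying the list before all workers have finished would invalidate those views.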
Note: for a detailed explanation of CountDownLatch, see my article "A Brief Look at CountDownLatch and CyclicBarrier".
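VI. Comparing the three approaches
1. Single thread
With no optimization at all, the import takes 88 seconds.
2. Single thread, batched
Enable the batched single-thread call in UserService, then clear the database.
After restarting and calling the endpoint again, the total time is 79 seconds.
3. Multi-threaded
Likewise, enable the multi-threaded call in UserService, clear the database, restart, and call the endpoint. The total time is 39 seconds, and the data is inserted correctly.
Compared with the single-threaded version, that is a little more than twice as fast (88 seconds down to 39 seconds).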