一、配置线程池
1、不推荐的方式
ExecutorService executorService = Executors.newFixedThreadPool(); // 创建⼀个固定⼤⼩的线程池,可控制并发的线程数,超出的线程会在队列中等待;
ExecutorService executorService = Executors.newCachedThreadPool(); // 创建⼀个可缓存的线程池,若线程数超过处理所需,缓存⼀段时间后会回收,若线程数不够,则新建线程;
ExecutorService executorService = Executors.newSingleThreadExecutor(); // 创建单个线程数的线程池,它可以保证先进先出的执⾏顺序;
ExecutorService executorService = Executors.newScheduledThreadPool(); // 创建⼀个可以执⾏延迟任务的线程池;
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); // 创建⼀个单线程的可以执⾏延迟任务的线程池;
ExecutorService executorService = Executors.newWorkStealingPool(); // 创建⼀个抢占式执⾏的线程池(任务执⾏顺序不确定)【JDK1.8 添加】。
2、原始方式
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolUtil {// 核心线程数private static int corePoolSize =10;// 最大线程数private static int maxmumPoolSize =30;// 空闲存活时间private static long keepTime = 30;// 时间单位private static TimeUnit unit = TimeUnit.SECONDS;// 任务队列private static ArrayBlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(1000);// 创建线程工厂private static ThreadFactory threadFactory1 = Executors.defaultThreadFactory();private static ThreadPoolExecutor.AbortPolicy policy = new ThreadPoolExecutor.AbortPolicy();public static void main(String[] args) throws Exception{ExecutorService executorService1 = Executors.newFixedThreadPool(10);ExecutorService executorService = new ThreadPoolExecutor(corePoolSize,maxmumPoolSize,keepTime,unit,blockingQueue,threadFactory1,policy);executorService.execute(new Runnable(){public void run(){System.out.println("new Runnable!");};});/*//线程池拒接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行。executorService.shutdownNow();*//*//线程池拒接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。executorService.shutdown();*/// 这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,// 否则返回false,一般情况下会和shutdown方法组合使用。boolean boole = executorService.awaitTermination(3,TimeUnit.SECONDS);}}
3、Spring的方式
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Configuration
@EnableAsync // 同一个类的中调用无效
public class ThreadPoolConfig {// 获取服务器的cpu个数private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); // 获取cpu个数// 核心线程数private static final int COUR_SIZE = CPU_COUNT * 2;// 最大线程数private static final int MAX_COUR_SIZE = CPU_COUNT * 4;// 队列容量private static final int QUEUE_SIZE = CPU_COUNT * 4 * 4;// 空闲存活时间private static long keepTime = 30;// 时间单位private static TimeUnit unit = TimeUnit.SECONDS;// 任务队列// private static ArrayBlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(10*10000);@Bean(name = "asyncDownLoadExcelExecutor")public ThreadPoolTaskExecutor threadPoolTaskExecutor() {// ThreadPoolTaskSchedulerThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();// 设置核心线程数threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 配置最大线程数threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置队列容量(这里设置成最大线程数的四倍)threadPoolTaskExecutor.setQueueCapacity(QUEUE_SIZE);// 默认是 60s,这里设置 30sthreadPoolTaskExecutor.setKeepAliveSeconds(30);// 给线程池设置名称threadPoolTaskExecutor.setThreadNamePrefix("async-download-excel");threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略return threadPoolTaskExecutor;}@Bean(name = "asyncUploadExcelExecutor")public ThreadPoolTaskExecutor asyncUploadExcelExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();// 设置核心线程数threadPoolTaskExecutor.setCorePoolSize(5);// 设置最大线程数threadPoolTaskExecutor.setMaxPoolSize(10);// 设置阻塞队列大小threadPoolTaskExecutor.setQueueCapacity(999);// 默认是 60s,这里设置30sthreadPoolTaskExecutor.setKeepAliveSeconds(30);// 设置线程池中线程名前缀threadPoolTaskExecutor.setThreadNamePrefix("async-upload-excel");//当达到 MaxPoolSize 不再调用新线程,用调用者所在线程之星异步任务。threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return threadPoolTaskExecutor;}}
二、百万数据的导出(生成多个文件,统一压缩)
2.1、引入依赖
<dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>4.0.1</version></dependency>
2.2、实体类
public class PersonEntity {private Long id;private String name;private Integer age;private String address;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}
}
2.3、Controller
/*** 导出方法* 此处应注意 线程池拒绝策略 抛出的异常,若任务过大,则走降级方法。*/public void exportMillionData(HttpServletRequest request, HttpServletResponse response) throws IOException {// 表格表头String[] TITLE = new String[]{ "姓名", "年龄", "地址"};// 获取数据进行分割int count = 100*10000; // personService.count();int pageSize = 50000;// 获取批次数int tableNum = count % pageSize == 0 ? (count / pageSize) : (count / pageSize) + 1;// 将数据多线程方式导出到excelCountDownLatch latch = new CountDownLatch(tableNum);for (int i = 0; i < tableNum; i++) {exportDataToExcel(latch, TITLE, pageSize, i);}try {// 阻塞 —— 等待全部执行完latch.await();// 压缩响应// 处理中文名不显示的问题String fileName = URLEncoder.encode("人员信息.zip", "UTF-8");response.setContentType("application/octet-stream;charset=UTF-8");response.setContentType("application/x-zip-compressed;charset=UTF-8");response.setHeader("Content-Disposition", "attachment;filename=" + fileName);response.addHeader("Pargam", "no-cache");response.addHeader("Cache-Control", "no-cache");response.addHeader("Access-Contro1-A11ow-0rigin", "*");File zip = ZipUtil.zip(new File("D://file/sys/"));ServletOutputStream output = response.getOutputStream();FileInputStream input = new FileInputStream(zip);byte[] buff = new byte[1024 * 10];int len = 0;while ((len = input.read(buff)) > -1) {output.write(buff, 0, len);}output.flush();output.close();if (zip.exists()) {zip.delete();}} catch (InterruptedException e){e.printStackTrace();}finally {FileUtil.deleteDir(new File("D://file/sys/"));}}
2.4、Service
/*** 导出数据到 Excel* @param latch 锁* @param TITLE 表格头* @param pageSize 每个sheet的记录数* @param first 表格序号*/@Async("asyncDownLoadExcelExecutor")public void exportDataToExcel(CountDownLatch latch, String[] TITLE, int pageSize, int first) throws IOException {// IPage page = new Page();// page.setCurrent(i + 1);// page.setSize(pageSize);List<PersonEntity> records = new ArrayList<>();// personService.page(page).getRecords();int start = first * pageSize;int end = start + pageSize;String fileName = start + "-" + end + "人员信息" + ".xlsx";// 写出到本地的excel文件中SXSSFWorkbook wb = new SXSSFWorkbook();Sheet sheet = wb.createSheet(fileName);Row row = sheet.createRow(0);Cell cell = null;// 写标题for (int j = 0; j < TITLE.length; j++) {cell = row.createCell(j);cell.setCellValue(TITLE[j]);}// 写内容int rowNum = 1;for (PersonEntity entity : records) {row = sheet.createRow(rowNum++);row.createCell(0).setCellValue(entity.getName());row.createCell(1).setCellValue(entity.getAge());row.createCell(2).setCellValue(entity.getAddress());}fileName = new String(fileName.getBytes(), "UTF-8");File file = new File("D://file/sys/" + fileName);if (!file.exists()) {file.getParentFile().mkdirs();}FileOutputStream outputStream = new FileOutputStream(file);wb.write(outputStream);outputStream.flush();outputStream.close();latch.countDown();}
三、多线程插入数据 (类似分布式的TCC)
1、引入依赖
<dependency><groupId>org.springframework</groupId><artifactId>spring-jdbc</artifactId><version>5.2.9.RELEASE</version>
</dependency>
2、定义线程池
@Bean(name = "asyncInsertDataExecutor")
public ThreadPoolTaskExecutor asyncUploadExcelExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();// 设置核心线程数threadPoolTaskExecutor.setCorePoolSize(5);// 设置最大线程数threadPoolTaskExecutor.setMaxPoolSize(10);// 设置阻塞队列大小threadPoolTaskExecutor.setQueueCapacity(999);// 默认是 60s,这里设置30sthreadPoolTaskExecutor.setKeepAliveSeconds(30);// 设置线程池中线程名前缀threadPoolTaskExecutor.setThreadNamePrefix("async-upload-excel");//当达到 MaxPoolSize 不再调用新线程,用调用者所在线程之星异步任务。threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return threadPoolTaskExecutor;
}
3、Controller
@Autowired
private PlatformTransactionManager transactionManager;public String insertData() throws InterruptedException {CountDownLatch latch = new CountDownLatch(10);AtomicReference<Boolean> rollback = new AtomicReference<>(false);// 先在开启多线程外面,定义一个同步集合:List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<>());// 调用线程方法for(int i=0;i<10;i++){exportDataToExcel(latch,rollback,transactionStatuses,new ArrayList<>());}// 阻塞 —— 等待全部执行完latch.await();// 如果出错回滚事务if (rollback.get()) {transactionStatuses.forEach(status -> transactionManager.rollback(status));return " 插入失败 ";} else {transactionStatuses.forEach(status -> transactionManager.commit(status));return " 插入成功 ";}
}
4、Service
@Async("asyncInsertDataExecutor")
public void exportDataToExcel(CountDownLatch latch,AtomicReference<Boolean> rollback,List<TransactionStatus> transactionStatuses,List<Object> list) {try {// 开启事务(可封装成方法)DefaultTransactionDefinition def = new DefaultTransactionDefinition();def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);TransactionStatus status = transactionManager.getTransaction(def);transactionStatuses.add(status);// .... 业务代码list.clear();} catch (Exception e) {rollback.set(true);e.printStackTrace();}latch.countDown();
}