newWorkStealingPool是什么?
newWorkStealingPool简单翻译是任务窃取线程池。
newWorkStealingPool 是Java8添加的线程池。和别的4种不同,它用的是ForkJoinPool。
使用ForkJoinPool的好处是,把1个任务拆分成多个“小任务”,把这些“小任务”分发到多个线程上执行。这些“小任务”都执行完成后,再将结果合并。
之前的线程池中,多个线程共有一个阻塞队列,而newWorkStealingPool 中每一个线程都有一个自己的队列。
当线程发现自己的队列没有任务了,就会到别的线程的队列里获取任务执行。可以简单理解为”窃取“。
一般是自己的本地队列采取LIFO(后进先出),窃取时采用FIFO(先进先出),一个从头开始执行,一个从尾部开始执行,由于偷取的动作十分快速,会大量降低这种冲突,也是一种优化方式。
它有2种实现,如下:
无参
public static ExecutorService newWorkStealingPool() {return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
}
Runtime.getRuntime().availableProcessors()是获取当前系统可以的CPU核心数。
有参
就一个参数parallelism,可以自定义并行度。
public static ExecutorService newWorkStealingPool(int parallelism) {return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
}
newWorkStealingPool测试案例
public class Thread08_WorkStealing {public static void main(String[] args) {ExecutorService executorService = Executors.newWorkStealingPool(3);for (int i=1; i<= 100; i++){executorService.submit(new MyWorker(i));}while (true){}}
}
运行结果:
ForkJoinPool-1-worker-2正在执行,数值:2
ForkJoinPool-1-worker-1正在执行,数值:1
ForkJoinPool-1-worker-3正在执行,数值:3
ForkJoinPool-1-worker-2正在执行,数值:5
ForkJoinPool-1-worker-1正在执行,数值:4
ForkJoinPool-1-worker-3正在执行,数值:6
ForkJoinPool-1-worker-2正在执行,数值:8
ForkJoinPool-1-worker-3正在执行,数值:9
ForkJoinPool-1-worker-1正在执行,数值:7
。。。。。。
发现确实创建了3个线程来执行任务。
把newWorkStealingPool(3)中参数去掉改成newWorkStealingPool(),结果如下:
ForkJoinPool-1-worker-1正在执行,数值:1
ForkJoinPool-1-worker-3正在执行,数值:3
ForkJoinPool-1-worker-2正在执行,数值:2
ForkJoinPool-1-worker-4正在执行,数值:4
ForkJoinPool-1-worker-5正在执行,数值:5
ForkJoinPool-1-worker-6正在执行,数值:6
ForkJoinPool-1-worker-7正在执行,数值:7
ForkJoinPool-1-worker-0正在执行,数值:8
ForkJoinPool-1-worker-6正在执行,数值:10
ForkJoinPool-1-worker-2正在执行,数值:13
ForkJoinPool-1-worker-0正在执行,数值:15
。。。。。。
发现确实创建了8个线程共同完成任务,因为我CPU有8个核。
ThreadPoolExecutor的核心点:
在ThreadPoolExecutor中只有一个阻塞队列存放当前任务
ForkJoinPool从名字上就能看出一些东西。当有一个特别大的任务时,如果采用上述方式,这个大任务只能会某一个线程去执行。ForkJoin第一个特点是可以将一个大任务拆分成多个小任务,放到当前线程的阻塞队列中。其他的空闲线程就可以去处理有任务的线程的阻塞队列中的任务
来一个比较大的数组,里面存满值,计算总和
单线程处理一个任务:
/** 非常大的数组 */
static int[] nums = new int[1_000_000_000];
// 填充值
static{for (int i = 0; i < nums.length; i++) {nums[i] = (int) ((Math.random()) * 1000);}
}
public static void main(String[] args) {// ===================单线程累加10亿数据================================System.out.println("单线程计算数组总和!");long start = System.nanoTime();int sum = 0;for (int num : nums) {sum += num;}long end = System.nanoTime();System.out.println("单线程运算结果为:" + sum + ",计算时间为:" + (end - start));
}
多线程分而治之的方式处理:
/** 非常大的数组 */
static int[] nums = new int[1_000_000_000];
// 填充值
static{for (int i = 0; i < nums.length; i++) {nums[i] = (int) ((Math.random()) * 1000);}
}
public static void main(String[] args) {// ===================单线程累加10亿数据================================System.out.println("单线程计算数组总和!");long start = System.nanoTime();int sum = 0;for (int num : nums) {sum += num;}long end = System.nanoTime();System.out.println("单线程运算结果为:" + sum + ",计算时间为:" + (end - start));// ===================多线程分而治之累加10亿数据================================// 在使用forkJoinPool时,不推荐使用Runnable和Callable// 可以使用提供的另外两种任务的描述方式// Runnable(没有返回结果) -> RecursiveAction// Callable(有返回结果) -> RecursiveTaskForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool();System.out.println("分而治之计算数组总和!");long forkJoinStart = System.nanoTime();ForkJoinTask<Integer> task = forkJoinPool.submit(new SumRecursiveTask(0, nums.length - 1));Integer result = task.join();long forkJoinEnd = System.nanoTime();System.out.println("分而治之运算结果为:" + result + ",计算时间为:" + (forkJoinEnd - forkJoinStart));
}private static class SumRecursiveTask extends RecursiveTask<Integer>{/** 指定一个线程处理哪个位置的数据 */private int start,end;private final int MAX_STRIDE = 100_000_000;// 200_000_000: 147964900// 100_000_000: 145942100public SumRecursiveTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {// 在这个方法中,需要设置好任务拆分的逻辑以及聚合的逻辑int sum = 0;int stride = end - start;if(stride <= MAX_STRIDE){// 可以处理任务for (int i = start; i <= end; i++) {sum += nums[i];}}else{// 将任务拆分,分而治之。int middle = (start + end) / 2;// 声明为2个任务SumRecursiveTask left = new SumRecursiveTask(start, middle);SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);// 分别执行两个任务left.fork();right.fork();// 等待结果,并且获取sumsum = left.join() + right.join();}return sum;}
}
最终可以发现,这种累加的操作中,采用分而治之的方式效率提升了2倍多。
但是也不是所有任务都能拆分提升效率,首先任务得大,耗时要长。
知识来源:
Java多线程(十四) Java8 newWorkStealingPool 线程池_瑟王的博客-CSDN博客