ForkJoin框架与工作窃取算法详解

文章目录

  • 一、ForkJoin框架概述
    • 1_核心概念
    • 2_主要类和方法
      • 1_ForkJoinPool
      • 2_ForkJoinTask
  • 二、启用异步模式与否的区别
  • 三、ForkJoinPool的三种任务提交方式
  • 四、执行逻辑及使用示例
    • 1_示例:并行计算数组元素和
    • 2_`forkJoinPool.submit`
    • 3_`ForkJoinTask<?>`中任务的执行逻辑
    • 4_fork\join中的逻辑
    • 5_窃取任务的实现
  • 五、线程池的大小对性能的影响
  • 六、ForkJoin与传统线程池的比较
  • 七、同步与异步线程池
    • 1_Executors.newWorkStealingPool
    • 2_ForkJoinPool.commonPool
    • 3_对比与总结
      • 1_共同点
      • 2_不同点
      • 3_窃取工作进行计算
  • 八、总结

在现代计算中,高效并行处理是提升应用程序性能的关键之一。Java 7 引入的 ForkJoinPool 框架是实现高并行度和递归任务处理的强大工具。本文将详细介绍 ForkJoinPool 框架及其核心的工作窃取算法。


一、ForkJoin框架概述

ForkJoinPool 是 Java 并发包中的并行计算框架,旨在有效执行递归任务和大规模并行计算。其核心思想是通过任务拆分(fork)和任务合并(join)来实现高并行度,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算。并利用工作窃取算法来平衡负载。

所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解。

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。

ForkJoinPool是ForkJoin框架执行任务的核心组件。它是工作窃取算法的执行地,负责管理工作线程和提供任务执行环境。

1_核心概念

  1. ForkJoinTask

    • ForkJoinTask 是一个抽象类,表示一个可以并行执行的任务,有两个子类:
      • RecursiveTask<V>:有返回值的任务。
      • RecursiveAction:没有返回值的任务。
    • 也就是说提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值)
  2. 工作窃取算法

    • 每个工作线程维护一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
    • 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
    • 当一个线程的任务队列为空时或当前任务(join),它可以从其他忙碌线程的队列“窃取”任务来执行,极大提高了并行计算的效率。
    • 默认情况下,从自己队列中获取任务执行和其他线程窃取自己队列中的任务执行是从队列相反的两端(头部、尾部)来进行的,极大的减少了竞争的情况出现。
      在这里插入图片描述

2_主要类和方法

1_ForkJoinPool

ForkJoinPool 是核心类,负责管理和调度 ForkJoinTask 任务。

  • 构造函数
    Fork/Join 默认会创建与 cpu 核心数大小相同的线程池,也就是第一个构造方法。

    ForkJoinPool()
    ForkJoinPool(int parallelism)
    ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode)
    
    • parallelism:并行度,通常设置为可用处理器的数量。
    • factory:用于创建工作线程的工厂。
    • handler:未捕获异常处理器。
    • asyncMode:是否启用异步模型。
  • 常用方法

    <T> T invoke(ForkJoinTask<T> task)
    void execute(ForkJoinTask<?> task)
    <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
    

2_ForkJoinTask

ForkJoinTask 是一个抽象类,表示一个可以并行执行的任务。

  • 常用方法
    final ForkJoinTask<V> fork() // 异步执行任务 开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
    final V join() // 等待任务完成并获取结果
    final V invoke() // 同步执行任务并返回结果
    final V get() // 获取任务结果(可能阻塞)
    

这里并不会每个 fork 都会创建新线程, 也不是每个 join 都会造成线程被阻塞, 而是采取了 work-stealing 原理


二、启用异步模式与否的区别

ForkJoinPool 构造函数中的 asyncMode 参数决定线程池是否启用异步模型。这一选项主要影响本地任务的调度和执行策略,窃取任务无论在哪种模式下都是 FIFO 是没有区别的,也就是从其他线程的工作队列头部窃取任务,先创建的任务会被优先处理。

  • 不启用异步模型 (asyncMode = false) 默认:

    • 工作线程从自己队列的尾部弹出(pop)任务(LIFO)执行,等同于”栈操作”,后续创建的任务会被优先处理,适合需要深度优先执行的任务。
    • 更快地触及递归的底部,使得子任务尽快完成和合并,适合小规模和短生命周期的任务。
      在这里插入图片描述
  • 启用异步模型 (asyncMode = true)

    • 工作线程从自己队列的头部弹出(poll)任务(FIFO)执行,等同于”队列操作”,先创建的任务会被优先处理,适合需要广度优先执行的任务。
    • 降低某些情况下的任务窃取成本,更适合较大批量和较长生命周期的任务。

根据 JDK 官方文档,将 asyncMode 设置为 true从未加入的分叉任务建立本地先进先出的调度模式。在工作线程仅处理事件样式异步任务的应用程序中,这个模式可能比默认的基于本地堆栈的模式更合适。

在这里插入图片描述

asyncMode ? FIFO_QUEUE : LIFO_QUEUE,

可以从图中看到 asyncMode 参数的值决定了使用的队列。

根据Doug Lea的说法,FIFO 方法比 LIFO 具有以下优势:

Doug Lea 是 JDK7 中 fork/join 框架的实现者,也是 JDK 中 JUC 的核心开发者,真 Java 大神。

  • 它通过让"窃贼"作为所有者在 deque 的另一侧操作来减少竞争。
  • 它利用了早期生成“大型”任务的递归分治算法的性质。

上面的第二点意味着可以通过一个被盗任务的线程可以进一步分解旧的被盗任务。

三、ForkJoinPool的三种任务提交方式

ForkJoinPool 提供了三种提交任务的方式:invokeexecutesubmit。这三种方式都利用了工作窃取算法来优化任务调度和执行。

  1. invoke 方法

    • 同步执行任务,并返回其结果。
    • 阻塞调用线程,直到任务完成。
    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(array, 0, array.length);
    Integer result = pool.invoke(task);
    
  2. execute 方法

    • 异步执行任务,不返回结果。
    • 常用于提交不需要返回结果的任务。
    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(array, 0, array.length);
    pool.execute(task);
    
  3. submit 方法

    • 异步执行任务,并返回一个 Future 对象,用于获取任务结果或检查任务状态。
    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(array, 0, array.length);
    Future<Integer> future = pool.submit(task);
    Integer result = future.get(); // 阻塞,直到任务完成
    

四、执行逻辑及使用示例

1_示例:并行计算数组元素和

下面是一个使用 ForkJoinPool 进行并行计算的示例,我们将实现一个计算数组元素和的任务,并展示如何使用 RecursiveTask 来实现任务拆分和合并。

import java.util.concurrent.*;public class ForkJoinPoolExample {public static void main(String[] args) throws InterruptedException, ExecutionException {ForkJoinPool forkJoinPool = new ForkJoinPool();int[] array = new int[100];for (int i = 0; i < array.length; i++) {array[i] = i + 1; // 初始化数组}SumTask task = new SumTask(array, 0, array.length);Future<Integer> result = forkJoinPool.submit(task);System.out.println("Sum: " + result.get());forkJoinPool.shutdown();}static class SumTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 10; // 阈值private int[] array;private int start;private int end;SumTask(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Integer compute() {if (end - start <= THRESHOLD) {int sum = 0;for (int i = start; i < end; i++) {sum += array[i];}return sum;} else {int mid = (start + end) / 2;SumTask leftTask = new SumTask(array, start, mid);SumTask rightTask = new SumTask(array, mid, end);leftTask.fork(); // 异步执行左任务int rightResult = rightTask.compute(); // 当前线程执行右任务int leftResult = leftTask.join(); // 等待左任务完成并获取结果return leftResult + rightResult;}}}
}

2_forkJoinPool.submit

提交任务后的执行流程分析:

1、可以看到submit方法进行校验后,会执行到externalPush中,真正处理的方法是externalPush

在这里插入图片描述

2、externalPush执行流程分析如下:

  1. 判断工作队列数组是否为空

  2. 通过按位与得到当前任务分配的工作队列

  3. 尝试通过CAS锁定该队列

  4. 判断工作队列中的任务数组是否为空

  5. 释放锁

  6. 提交任务(如果前面的判断满足会直接返回,不会执行到这里)

final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;int r = ThreadLocalRandom.getProbe();int rs = runState;if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&// 判断工作队列数组是否为空(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&// 通过按位与得到当前任务分配的工作队列U.compareAndSwapInt(q, QLOCK, 0, 1)) { // 尝试通过CAS锁定该队列ForkJoinTask<?>[] a; int am, n, s;if ((a = q.array) != null &&// 判断工作队列中的任务数组是否为空(am = a.length - 1) > (n = (s = q.top) - q.base)) {// 尝试添加任务int j = ((am & s) << ASHIFT) + ABASE;U.putOrderedObject(a, j, task);U.putOrderedInt(q, QTOP, s + 1);U.putIntVolatile(q, QLOCK, 0);if (n <= 1)// 如果就只有一个任务signalWork(ws, q);// 调用单个任务执行return;}U.compareAndSwapInt(q, QLOCK, 1, 0);// 释放锁}externalSubmit(task); // 提交任务
}

在这里插入图片描述

根据 debug 由于我们是第一次执行任务提交,workQueues参数为 null,所以直接进入到externalSubmit方法中。

3、externalSubmit(task);方法流程分析。主要流程是在for (;;) { 中分三次循环完成的。

  1. 创建工作队列数组。workQueues = new WorkQueue[n];
  2. 创建一个具体的队列 q = new WorkQueue(this, null); ws[k] = q;
  3. 大任务存放到队列中 U.putOrderedObject(a, j, task);
  4. signalWork(ws, q); 这个方法有两个参数,一个是工作队列数组,还有一个是工作队列。
private void externalSubmit(ForkJoinTask<?> task) {int r;                                    // initialize caller's probeif ((r = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();r = ThreadLocalRandom.getProbe();}for (;;) {//死循环中WorkQueue[] ws; WorkQueue q; int rs, m, k;boolean move = false;if ((rs = runState) < 0) {tryTerminate(false, false);     // help terminatethrow new RejectedExecutionException();}else if ((rs & STARTED) == 0 ||     // initialize((ws = workQueues) == null || (m = ws.length - 1) < 0)) {int ns = 0;rs = lockRunState();try {if ((rs & STARTED) == 0) {U.compareAndSwapObject(this, STEALCOUNTER, null,new AtomicLong());// create workQueues array with size a power of twoint p = config & SMASK; // ensure at least 2 slotsint n = (p > 1) ? p - 1 : 1;n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;workQueues = new WorkQueue[n];//1.创建了一个工作队列数组 此时还没有东西ns = STARTED;}} finally {unlockRunState(rs, (rs & ~RSLOCK) | ns);}}else if ((q = ws[k = r & m & SQMASK]) != null) {if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a = q.array;int s = q.top;boolean submitted = false; // initial submission or resizingtry {                      // locked version of pushif ((a != null && a.length > s + 1 - q.base) ||(a = q.growArray()) != null) {int j = (((a.length - 1) & s) << ASHIFT) + ABASE;U.putOrderedObject(a, j, task);//3.将任务存到队列中的位置U.putOrderedInt(q, QTOP, s + 1);submitted = true;}} finally {U.compareAndSwapInt(q, QLOCK, 1, 0);}if (submitted) {signalWork(ws, q);return;}}move = true;                   // move on failure}else if (((rs = runState) & RSLOCK) == 0) { // create new queueq = new WorkQueue(this, null);//2.构造出一个队列q.hint = r;q.config = k | SHARED_QUEUE;q.scanState = INACTIVE;rs = lockRunState();           // publish indexif (rs > 0 &&  (ws = workQueues) != null &&k < ws.length && ws[k] == null)ws[k] = q; 			//2.将队列存到某一个位置                // else terminatedunlockRunState(rs, rs & ~RSLOCK);}elsemove = true;                   // move if busyif (move)r = ThreadLocalRandom.advanceProbe(r);}
}

4、signalWork(ws, q);之后的流程分析

  1. signalWork中有一个主要的方法就是tryAddWorker

    在这里插入图片描述

  2. 而在tryAddWorker中又主要看createWorker

    在这里插入图片描述

  3. createWorker中利用 ForkJoinWorkerThreadFactory 的方法新建了一个线程并启动,然后执行任务。

    private boolean createWorker() {ForkJoinWorkerThreadFactory fac = factory;Throwable ex = null;ForkJoinWorkerThread wt = null;try {if (fac != null && (wt = fac.newThread(this)) != null) {//新建一个线程wt.start();//启动线程return true;}} catch (Throwable rex) {ex = rex;}deregisterWorker(wt, ex);return false;
    }
    
  4. 新建的线程是ForkJoinWorkerThread是 Thread 的子类。

    public static interface ForkJoinWorkerThreadFactory {/*** Returns a new worker thread operating in the given pool.** @param pool the pool this thread works in* @return the new worker thread* @throws NullPointerException if the pool is null*/public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }
    public class ForkJoinWorkerThread extends Thread//Thread 的子类
    

到这里forkJoinPool.submit提交任务后的流程就结束了,主要做的事情就是将task任务放入到创建的workQueue中,并创建一个线程执行任务,此时也会将队列和工作线程相关联,剩下执行任务的逻辑就属于ForkJoinTask<?>的了。

在这里插入图片描述

3_ForkJoinTask<?>中任务的执行逻辑

经过上面的分析我们可以知道,最后新建了一个ForkJoinWorkerThread线程,然后调用了start方法。那么我们看看这个线程类的run方法吧~

  • 注意:下面的 onStart() 方法是一个钩子方法,里面是空的,仅仅是一个声明,可以被子类重写以执行一些其他方法。

在这里插入图片描述

1、在run方法中我们可以看到主要的代码逻辑在pool.runWorker(workQueue);中,ForkJoinPool执行任务的过程如下:

  1. 判断工作队列是否需要扩容

  2. 阻塞调用对应WorkQueue的runTask方法

  3. 如果无法获取到执行资源,就等待任务调用

final void runWorker(WorkQueue w) {w.growArray();                   // allocate queueint seed = w.hint;               // initially holds randomization hintint r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShiftfor (ForkJoinTask<?> t;;) {if ((t = scan(w, r)) != null)w.runTask(t);else if (!awaitWork(w, r))break;r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}
}

2、WorkQueue的rukTask

  1. 标记当前状态为忙碌

  2. 调用当前任务执行

  3. 继续执行当前队列中等待执行的任务

  4. 尝试从其他WorkQueue中窃取任务执行

final void runTask(ForkJoinTask<?> task) {if (task != null) {scanState &= ~SCANNING; // mark as busy(currentSteal = task).doExec();//执行任务U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GCexecLocalTasks();// *** 重点ForkJoinWorkerThread thread = owner;if (++nsteals < 0)      // collect on overflowtransferStealCount(pool);scanState |= SCANNING;if (thread != null)thread.afterTopLevelExec();}}

注意:doExec()方法无论在runTask还是execLocalTasks中都是真正执行任务的方法,最终会调用ForkJoinTask<?>实现类的compute()方法。

final void execLocalTasks() {int b = base, m, s;ForkJoinTask<?>[] a = array;if (b - (s = top - 1) <= 0 && a != null &&(m = a.length - 1) >= 0) {if ((config & FIFO_QUEUE) == 0) {//由asyncMode决定for (ForkJoinTask<?> t;;) {if ((t = (ForkJoinTask<?>)U.getAndSetObject(a, ((m & s) << ASHIFT) + ABASE, null)) == null)break;U.putOrderedInt(this, QTOP, s);t.doExec();//执行任务if (base - (s = top - 1) > 0)break;}}elsepollAndExecAll();}}

4_fork\join中的逻辑

我们知道,如果在compute中使用fork会将大任务拆分成较小的任务来执行。那么流程是怎么样的呢?

1、将当前线程的工作队列取到,并将拆分的小任务放入到当前线程的工作队列中。

public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)//判断((ForkJoinWorkerThread)t).workQueue.push(this);//this是拆分的小任务elseForkJoinPool.common.externalPush(this);return this;
}

2、注意看放入队列的push方法,其中最主要的是p.signalWork(p.workQueues, this);这行代码。

final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;int b = base, s = top, n;if ((a = array) != null) {    // ignore if queue removedint m = a.length - 1;     // fenced write for task visibilityU.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);U.putOrderedInt(this, QTOP, s + 1);if ((n = s - b) <= 1) {if ((p = pool) != null)p.signalWork(p.workQueues, this);//这里的this是workQueue}else if (n >= m)growArray();}
}

3、根据之前的分析流程(2_forkJoinPool.submit4、signalWork(ws, q))可知,signalWork 方法会新创建一个ForkJoinWorkerThread去执行任务。

4、总结,fork会将当前拆分的任务放入当前线程的工作队列中,并且还会创建新的线程(在没达到设定的上限时)执行任务,由于新建的线程任务队列中为空,多半会去窃取当前线程工作队列中的任务。之后将循环这个流程。

大致流程如下所示:

在这里插入图片描述

5、当 fork执行完了之后到达图中箭头位置的点,我们发现 线程0、线程1 阻塞住,但是任务4、任务5、任务6…这些拆分的小任务都还没有执行,现在没有线程执行任务那么怎么办呢?这就要我们去了解join方法中的逻辑了。

在这里插入图片描述

注意:这里的join方法与线程中的是不一样的。这里的 join 方法主要是调用 doJoin 方法:

  • status表示任务的状态,如果小于 0 表示任务已经结束了。
private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;return (s = status) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?//判断当前线程是否是工作线程(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();
}

6、那么这个方法主要是干了啥呢?主要去看awaitJoin方法。

从下面源码发现,主要部分还是一个死循环。

  1. 首先还是检查当前任务是否执行完: if ((s = task.status) < 0)
  2. 然后可以发现如果当前任务没有执行完则可能会进入helpStealer(w, task)helpComplete(w, cc, 0)
    • helpStealer:先定位的偷取者的任务队列;从偷取者的base索引开始,每次偷取一个任务执行。
    • tryCompensate: tryCompensate主要用来补偿工作线程因为阻塞而导致的算力损失,当工作线程自身的队列不为空,且还有其它空闲工作线程时,如果自己阻塞了,则在此之前会唤醒一个工作线程。
  3. 根据这两个方法的方法名可以大致知晓:这两个方法是让当前线程帮助其他线程执行任务。
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {int s = 0;if (task != null && w != null) {ForkJoinTask<?> prevJoin = w.currentJoin;   // 获取给定Worker的join任务U.putOrderedObject(w, QCURRENTJOIN, task);  // 把currentJoin替换为给定任务// 判断是否为CountedCompleter类型的任务CountedCompleter<?> cc = (task instanceof CountedCompleter) ?(CountedCompleter<?>) task : null;for (; ; ) {if ((s = task.status) < 0)              // 已经完成|取消|异常 跳出循环break;if (cc != null)                         // CountedCompleter任务由helpComplete来完成joinhelpComplete(w, cc, 0);else if (w.base == w.top || w.tryRemoveAndExec(task))  //尝试执行helpStealer(w, task);               // 队列为空或执行失败,任务可能被偷,帮助偷取者执行该任务if ((s = task.status) < 0)              // 已经完成|取消|异常,跳出循环break;// 计算任务等待时间long ms, ns;if (deadline == 0L)ms = 0L;else if ((ns = deadline - System.nanoTime()) <= 0L)break;else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)ms = 1L;if (tryCompensate(w)) {                         // 执行补偿操作task.internalWait(ms);                      // 补偿执行成功,任务等待指定时间U.getAndAddLong(this, CTL, AC_UNIT);     // 更新活跃线程数}}U.putOrderedObject(w, QCURRENTJOIN, prevJoin);      // 循环结束,替换为原来的join任务}return s;
}

总结:假设当前线程是 线程0 ,如上图执行到 任务1.join。当前任务1没有执行完,但是线程会帮助其他线程执行拆分后的小任务,不断循环,直到任务1执行完成 也就是status<0。

在这里插入图片描述

5_窃取任务的实现

final class WorkQueue {// 省略其他代码...// 偷取任务的方法final ForkJoinTask<?> pollAt(int b) {ForkJoinTask<?>[] a; int i; ForkJoinTask<?> t;if ((a = array) != null && (i = a.length - 1) >= 0 &&(t = a[b & i]) != null && t != a[b & (i >>>= 1)]) {if (t instanceof CountedCompleter) // 偷取CountedCompleter任务((CountedCompleter<?>)t).propagateCompletion();return t;}return null;}// 从其他队列中偷取任务final ForkJoinTask<?> poll() {WorkQueue[] ws; int m;int r = ThreadLocalRandom.current().nextInt();if ((ws = pool.workQueues) != null && (m = ws.length - 1) >= 0 &&(ws = ws[m & r & SQMASK]) != null) {int j = r & SMASK; // 随机选择一个队列进行偷取WorkQueue w; ForkJoinTask<?>[] a; ForkJoinTask<?> t;if ((w = ws[j]) != null && w.base - w.top < 0 &&(a = w.array) != null) { // 如果队列不为空,且还有任务未执行for (int b = r;;) { // 循环偷取任务int n = a.length;if (n >= NCPU || (t = w.pollAt(b & (n - 1))) == null) // 如果当前队列中没有可偷取的任务break;if (w.base == (b + 1)) // 如果队列被其他线程修改return null;if (UNSAFE.compareAndSwapObject(a, ((n - 1) & b) << ASHIFT, t, null))return t;if (n <= 1 || w.currentSteal != b)break;}}}return null;}// 省略其他代码...
}

五、线程池的大小对性能的影响

线程池的大小直接影响到程序的性能。在ForkJoin框架中,理想的情况是线程数等于处理器核心数。

int processors = Runtime.getRuntime().availableProcessors();
ForkJoinPool pool = new ForkJoinPool(processors);

通常来讲,对于计算密集型任务,设置线程池大小为处理器核心数可获得最佳性能。对于包含IO操作或是响应中断的任务,可能需要更多的线程来维持CPU的利用率。

六、ForkJoin与传统线程池的比较

传统的线程池如Executors.newFixedThreadPool()对所有任务采用一个共享的工作队列,而ForkJoin采用工作窃取算法和每个线程一个工作队列的设计,这提高了处理并行任务的效率。

在这里插入图片描述

// 传统线程池使用示例
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.submit(new Task());// ForkJoin线程池使用示例
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.submit(new ForkJoinTask<Void>() { /* ... */ });

对于计算密集型任务,尤其是可以进行合理拆分的任务,ForkJoin往往比传统线程池表现更好。

七、同步与异步线程池

ForkJoinPool 除了直接使用 new关键字直接创建,还可以使用 ForkJoinPool.commonPoolExecutors.newWorkStealingPool 的方式获得 ForkJoinPool 对象。

1_Executors.newWorkStealingPool

newWorkStealingPool 简单翻译是任务窃取线程池。

newWorkStealingPool 是Java8添加的线程池。和别的Executors创建的其他4种不同,它用的是ForkJoinPool

使用ForkJoinPool的好处是,把1个任务拆分成多个“小任务”,把这些“小任务”分发到多个线程上执行。这些“小任务”都执行完成后,再将结果合并。

之前的线程池中,多个线程共有一个阻塞队列,而newWorkStealingPool 中每一个线程都有一个自己的队列。

当线程发现自己的队列没有任务了,就会到别的线程的队列里窃取任务执行。

一般是自己的本地队列采取LIFO(后进先出),窃取时采用FIFO(先进先出),一个从尾部开始执行,一个从头开始执行,由于偷取的动作十分快速,会大量降低这种冲突,也是一种优化方式。

它有2种实现,无参:

public static ExecutorService newWorkStealingPool() {return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
}

有参:

就一个参数parallelism,可以自定义并行度。

public static ExecutorService newWorkStealingPool(int parallelism) {return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
}

特点

  1. 独立实例:每次调用 newWorkStealingPool 都会创建一个新的 ForkJoinPool 实例,不是共享的单例。
  2. 默认线程数量:默认情况下,线程池的并行度是 CPU 内核数,可以通过传入一个整数参数来指定并行度。
  3. 灵活性高:适用于需要单独管理并行度或者需要隔离不同并行任务的应用场景。

根据上面的分析我们可以看到,无论是无参构造还是有参,asyncMode 的参数都是true。这证实了我们将使用 FIFO 的队列配置。

2_ForkJoinPool.commonPool

ForkJoinPool.commonPool 是一个全局共享的 ForkJoinPool 实例(内部 static final ForkJoinPool common的变量),Java 系统在启动时就会初始化这个线程池。

特点

  1. 全局共享:commonPool 是一个单例,适用于应用程序中的并行计算任务。
  2. 默认线程数量:默认情况下,线程池的并行度(parallelism level)是 CPU 内核数减一。可以通过设置系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 来修改默认并行度。
  3. 适用范围广:适用于相对轻量级的并行任务,比如并行流操作、递归任务等。

类初始化时会使用 makeCommonPool() 方法中完成,为 ForkJoinPool 中的 common 参数赋值。该方法根据可用的处理器数量计算并行度,并创建一个新的 ForkJoinPool 实例。

private static ForkJoinPool makeCommonPool() {final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =new CommonPoolForkJoinWorkerThreadFactory();int parallelism = -1;ForkJoinWorkerThreadFactory factory = null;UncaughtExceptionHandler handler = null;try {  // ignore exceptions in accessing/parsing propertiesString pp = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");String fp = System.getProperty("java.util.concurrent.ForkJoinPool.common.threadFactory");String hp = System.getProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler");if (pp != null)parallelism = Integer.parseInt(pp);if (fp != null)factory = ((ForkJoinWorkerThreadFactory)ClassLoader.getSystemClassLoader().loadClass(fp).newInstance());if (hp != null)handler = ((UncaughtExceptionHandler)ClassLoader.getSystemClassLoader().loadClass(hp).newInstance());} catch (Exception ignore) {}if (factory == null) {if (System.getSecurityManager() == null)factory = commonPoolForkJoinWorkerThreadFactory;else // use security-managed defaultfactory = new InnocuousForkJoinWorkerThreadFactory();}if (parallelism < 0 && // default 1 less than #cores(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)//这一步会将并行度设置为 CPU 内核数减一parallelism = 1;if (parallelism > MAX_CAP)parallelism = MAX_CAP;return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,//使用LIFO的队列模式"ForkJoinPool.commonPool-worker-");}

ForkJoinPool.commonPool使用后进先出(LIFO)队列配置,而Executors.newWorkStealingPool使用先进先出的方法(FIFO)配置。

3_对比与总结

1_共同点

  1. 工作窃取算法:二者都使用工作窃取算法来优化并行任务处理。
  2. 任务类型:都适用于 ForkJoinTask 及其子类,如 RecursiveTaskRecursiveAction

2_不同点

  1. 实例共享

    • ForkJoinPool.commonPool 是全局共享单例,适合对所有并行计算任务使用统一的线程池。
    • Executors.newWorkStealingPool 每次调用都会创建一个新的实例,适合需要单独管理不同任务群或不同并行度的场景。
  2. 并行度设置

    • ForkJoinPool.commonPool 的并行度可以通过系统属性来更改,但所有使用 commonPool 的任务共享同一个并行度配置。
    • Executors.newWorkStealingPool 可以灵活指定并行度,每个创建的线程池实例可以有不同的并行度。
  3. 使用场景

    • ForkJoinPool.commonPool 适合全局共享并行线程池的轻量并行任务,比如并行流操作。
    • Executors.newWorkStealingPool 适合需要隔离不同任务群的场景,例如不同模块有不同的并行处理需求,或者某些任务需要特定的并行度配置。

选择使用哪种线程池应根据具体的应用场景和需要,并考虑共享性、灵活性和并行度的需求。通过合理选择适合的线程池,可以有效提升应用程序的并行处理能力和整体性能。

3_窃取工作进行计算

有了同步线程池,ForkJoinPool.commonPool只要任务仍在进行中,就会将线程放入池中。

因此,工作窃取的程度并不取决于任务粒度水平。

异步Executors.newWorkStealingPool的管理更强,允许工作窃取级别取决于任务粒度级别。

我们使用ForkJoinPool类的 getStealCount获得工作窃取级别:

long steals = forkJoinPool.getStealCount();

确定Executors.newWorkStealingPoolForkJoinPool.commonPool的工作窃取计数给我们带来了不同的行为:

Executors.newWorkStealingPool ->
Granularity: [1], Steals: [6564]
Granularity: [10], Steals: [572]
Granularity: [100], Steals: [56]
Granularity: [1000], Steals: [60]
Granularity: [10000], Steals: [1]ForkJoinPool.commonPool ->
Granularity: [1], Steals: [6923]
Granularity: [10], Steals: [7540]
Granularity: [100], Steals: [7605]
Granularity: [1000], Steals: [7681]
Granularity: [10000], Steals: [7681]

Executors.newWorkStealingPool的粒度从精细变为粗(1到10,000)时,工作窃取水平就会降低。

因此,当任务没有分解时,窃取计数为1(粒度[granularity]为10000)。

ForkJoinPool.commonPool有不同的行为。

工作窃取水平总是很高,受到任务粒度变化的影响并不大。

从技术上讲,我们的素数示例支持异步处理事件式任务。

这是因为我们的实现不会强制结果的加入。

可以证明,Executors.newWorkStealingPool在解决问题时提供了最佳资源利用。

八、总结

ForkJoinPool 通过任务拆分和工作窃取机制,实现了高效的并行计算,非常适合需要递归计算和大规模并行处理的场景。通过合理选择是否启用异步模式,以及合适的任务提交方式,ForkJoinPool 能显著提升应用程序的并行处理能力和整体性能。

希望本文能帮助您更好地理解 ForkJoinPool 框架及其工作窃取算法,并能在实际项目中有效应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/367712.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Web3 前端攻击:原因、影响及经验教训

DeFi的崛起引领了一个创新和金融自由的新时代。然而&#xff0c;这种快速增长也吸引了恶意行为者的注意&#xff0c;他们试图利用漏洞进行攻击。尽管很多焦点都集中在智能合约安全上&#xff0c;但前端攻击也正在成为一个重要的威胁向量。 前端攻击的剖析 理解攻击者利用前端漏…

uniapp标题水平对齐微信小程序胶囊按钮及适配

uniapp标题水平对齐微信小程序胶囊按钮及适配 状态栏高度胶囊按钮的信息计算顶部边距模板样式 标签加样式加动态计算实现效果 t是胶囊按钮距离的top h是胶囊按钮的高度 s是状态栏高度 大概是这样 状态栏高度 获取系统信息里的状态栏高度 const statusBarHeight uni.getSy…

使用CubeIDE调试项目现stm32 no source available for “main() at 0x800337c:

使用CubeIDE调试项目现stm32 no source available for "main() at 0x800337c&#xff1a; 问题描述 使用CubeIDE编译工程代码和下载都没有任何问题&#xff0c;点击Debug调试工程时&#xff0c;出现stm32 no source available for "main() at 0x800337c 原因分析&a…

数据结构与算法笔记:实战篇 - 剖析微服务接口鉴权限流背后的数据结构和算法

概述 微服务是最近几年才兴起的概念。简单点将&#xff0c;就是把复杂的大应用&#xff0c;解耦成几个小的应用 。这样做的好处有很多。比如&#xff0c;这样有利于团队组织架构的拆分&#xff0c;比较团队越大协作的难度越大&#xff1b;再比如&#xff0c;每个应用都可以独立…

nginx优化和防盗链

1、隐藏版本号 [roottest1 conf]# vim nginx.conf ​ server_tokens off; ​ 2、防盗链 修改用户和所在组 [roottest1 conf]# vim nginx.conf ​ #user nginx nginx; #表示主进程master会有root创建&#xff0c;子进程会有nginx用户来创建。 3、设置页面的缓存时间 主要是…

2024-2025年本田维修电路图线路图接线图资料更新

此次更新了2024-2025年本田车系电路图资料&#xff0c;覆盖市面上99%车型&#xff0c;包括维修手册、电路图、新车特征、车身钣金维修数据、全车拆装、扭力、发动机大修、发动机正时、保养、电路图、针脚定义、模块传感器、保险丝盒图解对照表位置等等&#xff01; 汽修帮手汽…

15- 22题聚合函数 - 高频 SQL 50 题基础版

目录 1. 相关知识点2. 例子2.15 - 有趣的电影2.16 - 平均售价2.17 - 项目员工 I2.18 - 各赛事的用户注册率2.19 - 查询结果的质量和占比2.20 - 每月交易 I2.21 - 即时食物配送 II2.22 - 游戏玩法分析 IV 1. 相关知识点 函数 函数含义order by排序group by分组between 小值 an…

Sping源码(九)—— Bean的初始化(非懒加载)—mergeBeanDefinitionPostProcessor

序言 前几篇文章详细介绍了Spring中实例化Bean的各种方式&#xff0c;其中包括采用FactoryBean的方式创建对象、使用反射创建对象、自定义BeanFactoryPostProcessor以及构造器方式创建对象。 创建对象 这里再来简单回顾一下对象的创建&#xff0c;不知道大家有没有这样一个疑…

边缘混合计算智慧矿山视频智能综合管理方案:矿山安全生产智能转型升级之路

一、智慧矿山方案介绍 智慧矿山是以矿山数字化、信息化为前提和基础&#xff0c;通过物联网、人工智能等技术进行主动感知、自动分析、快速处理&#xff0c;实现安全矿山、高效矿山的矿山智能化建设。旭帆科技TSINGSEE青犀基于图像的前端计算、边缘计算技术&#xff0c;结合煤…

【原创实现 设计模式】Spring+策略+模版+工厂模式去掉if-else,实现开闭原则,优雅扩展

1 定义与优点 1.1 定义 策略模式&#xff08;Strategy Pattern&#xff09;属于对象的⾏为模式。他主要是用于针对同一个抽象行为&#xff0c;在程序运行时根据客户端不同的参数或者上下文&#xff0c;动态的选择不同的具体实现方式&#xff0c;即类的行为可以在运行时更改。…

WIN32核心编程 - 数据类型 错误处理 字符处理

公开视频 -> 链接点击跳转公开课程博客首页 -> 链接点击跳转博客主页 目录 数据类型 基本数据类型 Win32基本数据类型 错误处理 C语言中的错误处理 C中的错误处理 Win32中的错误处理 字符处理 C/C WIN32 字符处理 数据类型 基本数据类型 C/C语言定义了一系列…

双指针系列第 8 篇:盛水最多的容器。几句话讲明白!

Leetcode 题目链接 思路 取首尾双指针和水量如下所示&#xff0c;设高度函数为 h ( i ) h(i) h(i)&#xff0c;在下图中 h ( l ) < h ( r ) h(l) < h(r) h(l)<h(r)。 观察以 l l l 为左边界所能构成的其他水量&#xff0c;与矮的右边界搭配结果如下。 与高的…

Vue移动端地图App:van-uploader导致的卡顿问题

问题描述 基于Vue3+Vant IU 4开发的移动端地图App,在进行地图点位上报、上报记录查看过程中,出现App卡顿、甚至闪退的问题,进行问题定位之后,发现是van-uploader组件导致的问题。 van-uploader文件上传组件 van-uploader组件用于将本地的图片或文件上传至服务器,并在上传…

GOROOT GOPATH GOPROXY GO111MODULE

GOROOT GOROOT代表Go的安装目录。可执行程序go(或go.exe)和gofmt(或gofmt.exe)位于 GOROOT/bin目录中。 配置GOROOT环境变量&#xff0c;其值为Go的安装目录&#xff1b;然后在环境变量PATH中添加GOROOT/bin路径。 注意&#xff1a;GOROOT变量只是代表了安装目录&#xff0c;不…

Python基础小知识问答系列-可迭代型变量赋值

1. 问题&#xff1a; 怎样简洁的把列表中的元素赋值给单个变量&#xff1f; 当需要列表中指定几个值时&#xff0c;剩余的变量都收集在一起&#xff0c;该怎么进行变量赋值&#xff1f; 当只需要列表中指定某几个值&#xff0c;其他值都忽略时&#xff0c;该怎么…

红酒与建筑:品味历史与艺术的交汇

在时间的长河中&#xff0c;红酒与建筑都是人类智慧的结晶&#xff0c;它们各自承载着历史的厚重与艺术的韵味。当这两者交汇时&#xff0c;仿佛是一场穿越时空的对话&#xff0c;将我们带入一个既古老又现代、既深沉又温柔的世界。今天&#xff0c;就让我们一起走进这个奇妙的…

【前端VUE】VUE3第一节—vite创建vue3工程

什么是VUE Vue (发音为 /vjuː/&#xff0c;类似 view) 是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML、CSS 和 JavaScript 构建&#xff0c;并提供了一套声明式的、组件化的编程模型&#xff0c;帮助你高效地开发用户界面。无论是简单还是复杂的界面&#xff0…

网页计算器的实现

简介 该项目实现了一个功能完备、交互友好的网页计算器应用。只使用了 HTML、CSS 和 JavaScript &#xff0c;用于检验web前端基础水平。 开发环境&#xff1a;Visual Studio Code开发工具&#xff1a;HTML5、CSS3、JavaScript实现效果 功能设计和模块划分 显示模块&#…

[数据集][目标检测]围栏破损检测数据集VOC+YOLO格式1196张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;1196 标注数量(xml文件个数)&#xff1a;1196 标注数量(txt文件个数)&#xff1a;1196 标注…

1、音视频解封装流程---解复用

对于一个视频文件(mp4格式/flv格式)&#xff0c;audio_pkt或者video_pkt是其最基本的数据单元&#xff0c;即视频文件是由独立的视频编码包或者音频编码包组成的。 解复用就是从视频文件中把视频包/音频包单独读取出来保存成独立文件&#xff0c;那么如何得知packet是视频包还是…