摘要
本文主要介绍了Java.util.concurrent包所提供的 Executor 异步执行器框架,涵盖了相关的接口和类。
并发执行器类图
图1 java 并发执行器相关类图
Executor 接口
Executor
接口提供了一种将任务的提交与任务的实际执行机制分离开来的方法。它只有一个方法 execute(Runnable command)
,用于执行给定的 Runnable
任务。
主要功能
-
任务提交与执行分离:
Executor
允许客户端将任务的提交与任务的实际执行细节分离,使得任务的创建和执行解耦。 -
线程管理:使用
Executor
可以避免显式地创建和管理线程,而是由Executor
管理线程的生命周期。 -
灵活性和可扩展性:
Executor
接口可以有不同的实现,比如ThreadPoolExecutor
或ScheduledThreadPoolExecutor
,这些实现提供了不同的任务执行策略。
使用场景
-
当需要异步执行任务时。
-
当需要重用线程而不是为每个任务创建新线程时。
-
当需要控制并发执行的任务数量时。
-
当需要执行定时任务时(使用
ScheduledExecutorService
)。
ExecutorService 接口
java.util.concurrent.ExecutorService
扩展了 Executor
接口并提供了管理终止和生成 Future 的方法。ExecutorService
允许你提交可调用的任务(Callable<V>
)或运行任务(Runnable
),并且可以管理线程池的生命周期。通过使用 ExecutorService
,你可以更方便地执行、调度、管理和控制并发任务,而无需直接处理线程创建和销毁等细节。
主要功能
-
任务提交:允许提交
Runnable
和Callable
任务,并返回相应的Future
对象来跟踪任务的状态和结果。 -
批量提交:支持一次性提交多个任务,并且可以通过
invokeAll()
方法等待所有任务完成,或者通过invokeAny()
方法等待任意一个任务完成。 -
线程池管理:提供了一系列的方法来控制线程池的生命周期,包括启动、关闭以及等待所有任务完成。
-
调度能力:虽然
ScheduledExecutorService
是ExecutorService
的子接口,专门用于定时和周期性任务的调度,但ExecutorService
本身也具备基本的调度能力。
AbstractExecutorService
AbstractExecutorService
是 Java 中java.util.concurrent
包下的一个抽象类,它实现了ExecutorService
接口。ExecutorService
接口提供了管理异步任务执行的方法,包括任务提交、任务执行控制和任务结果获取等功能。AbstractExecutorService
对ExecutorService
接口中的方法提供了默认实现,这使得创建自定义的线程池或者执行服务变得更加方便。
ThreadPoolExecutor
ThreadPoolExecutor
是一个可扩展的线程池实现,允许你更细粒度地控制线程池的行为。它提供了丰富的构造函数参数,使得你可以根据应用程序的需求来定制线程池的工作方式。
主要特性
-
核心线程数 (
corePoolSize
):线程池中保持的最小线程数量,即使这些线程处于空闲状态也不会被终止。 -
最大线程数 (
maximumPoolSize
):线程池中允许的最大线程数量。当有新任务提交且当前运行的线程数小于最大值时,线程池可以创建新的线程来处理任务。 -
保持时间 (
keepAliveTime
):当线程数超过核心线程数时,多余的空闲线程在终止前等待新任务的最长时间。 -
工作队列 (
BlockingQueue<Runnable>
):用于保存等待执行的任务的队列。常见的实现包括LinkedBlockingQueue
、SynchronousQueue
等。 -
线程工厂 (
ThreadFactory
):用于创建新线程的对象。默认情况下会创建具有默认属性的新线程,但你可以通过提供自定义的ThreadFactory
来改变这一行为。 -
拒绝策略 (
RejectedExecutionHandler
):当线程池无法处理新提交的任务时(例如因为队列已满),所采取的策略。Java 提供了四种内置的拒绝策略,分别是抛出异常、调用者运行、丢弃任务和丢弃最老的任务。
示例代码
import java.util.concurrent.*;public class ThreadPoolExecutorExample {public static void main(String[] args) throws InterruptedException {// 创建一个 ThreadPoolExecutor 实例ThreadPoolExecutor executor = new ThreadPoolExecutor(2, // 核心线程数4, // 最大线程数60L, TimeUnit.SECONDS, // 空闲线程存活时间new LinkedBlockingQueue<>(10), // 工作队列Executors.defaultThreadFactory(), // 线程工厂new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略);// 提交一些任务给线程池for (int i = 0; i < 5; i++) {final int taskNumber = i;executor.submit(() -> {System.out.println("Executing Task " + taskNumber);try {Thread.sleep(1000); // 模拟耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}// 关闭线程池executor.shutdown();if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow();}}
}
ScheduledExecutorService 接口
ScheduledExecutorService
提供了更灵活、强大的调度功能,可以替代 Timer
和 TimerTask
。它支持并发执行多个任务,并且提供了更多定制化选项,如固定速率和固定延迟执行等。
使用步骤
-
使用
Executors
工厂方法创建一个ScheduledExecutorService
实例。 -
使用
ScheduledExecutorService
的schedule()
,scheduleAtFixedRate()
, 或scheduleWithFixedDelay()
方法安排任务执行。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
继承自 ThreadPoolExecutor
,并进一步实现了 ScheduledExecutorService
接口。这意味着除了具备普通线程池的功能外,它还支持定时和周期性任务的调度。这使得它可以用来安排在未来某个时间点执行的任务,或者以固定速率或固定延迟重复执行的任务。
主要特性
-
调度能力:可以通过
schedule()
方法安排任务在未来某个时间点执行;通过scheduleAtFixedRate()
或scheduleWithFixedDelay()
方法安排周期性任务。 -
继承特性:由于它是
ThreadPoolExecutor
的子类,所以也继承了所有与线程管理相关的配置选项,如核心线程数、最大线程数、工作队列等。 -
灵活性:虽然
Executors
类提供了简便的方法来创建调度线程池(如newScheduledThreadPool()
),但在某些情况下,直接使用ScheduledThreadPoolExecutor
可能更加灵活,因为它允许你自定义更多参数。
示例代码
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ScheduledThreadPoolExecutorExample {public static void main(String[] args) throws InterruptedException {// 创建一个 ScheduledThreadPoolExecutor 实例ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(2);// 安排一个任务,在2秒后执行一次scheduler.schedule(() -> System.out.println("Scheduled Task executed"), 2, TimeUnit.SECONDS);// 安排一个周期性任务,首次执行延迟1秒,之后每隔3秒执行一次scheduler.scheduleAtFixedRate(() -> System.out.println("Periodic Task executed"), 1, 3, TimeUnit.SECONDS);// 让主线程稍微等待一下,以便观察调度任务的输出Thread.sleep(10000);// 关闭调度线程池scheduler.shutdown();if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {scheduler.shutdownNow();}}
}
ForkJoinPool
ForkJoinPool 是 Java 7 引入的一个用于并行执行任务的线程池。它基于工作窃取(Work - Stealing)算法,这使得它在处理可以被递归分解为子任务的计算密集型任务时非常高效。这种任务模型非常适合分治法(Divide - and - Conquer)策略,例如对大型数组进行排序、矩阵运算、树形数据结构的遍历和处理等场景。
工作原理
-
工作窃取算法
-
ForkJoinPool 中的每个工作线程都有自己的双端队列(Deque)来存放任务。当一个线程完成自己队列中的任务后,它会尝试从其他线程的队列末尾 “窃取” 任务来执行。这种机制充分利用了线程资源,避免了线程空闲,提高了整体的并行效率。
-
例如,假设有线程 A 和线程 B,线程 A 的任务队列已经为空,而线程 B 的任务队列还有任务。此时线程 A 会尝试从线程 B 的任务队列末尾窃取任务来执行,因为队列末尾的任务通常是新添加的,窃取这些任务对线程 B 正在执行的任务影响较小。
-
-
任务分解与合并(Fork 和 Join)
-
Fork 操作是指将一个大任务分解为多个较小的子任务。这些子任务可以被不同的线程并行处理。例如,对于一个计算大型数组部分和的任务,可以将数组分成多个子数组,每个子数组的求和任务作为一个子任务。
-
Join 操作是指等待所有子任务完成,并将子任务的结果合并为最终结果。在上述数组求和的例子中,当各个子数组求和的子任务完成后,需要将这些子和相加得到整个数组的和,这就是 Join 操作。
-
核心组件和数据结构
-
线程池(Worker Threads):ForkJoinPool 包含一组工作线程,这些线程用于执行任务。线程数量可以通过构造函数指定,也可以使用默认配置。
-
任务队列(Work - Queues):每个线程都有自己的任务队列,用于存放分配给该线程的任务以及通过工作窃取获取到的任务。这些队列采用双端队列的形式,方便线程从两端操作任务。
应用场景
-
大规模数据处理:如对大型数据集进行排序(例如,对一个包含数百万个元素的数组进行快速排序)、数据筛选、数据转换等操作。
-
递归算法实现:在处理树形结构(如二叉树的遍历、计算树的深度等)或者图结构(如深度优先搜索、广度优先搜索的并行化)的问题时,ForkJoinPool 可以很好地发挥作用。
-
复杂的数学计算:例如矩阵乘法、大型数值计算等任务,这些任务可以分解为多个子任务进行并行处理。
优势
-
高效的并行计算:通过工作窃取算法,充分利用多核处理器的性能,提高计算效率,减少任务执行时间。
-
易于使用:对于可以分解为子任务的问题,使用 ForkJoinPool 和 ForkJoinTask 可以方便地实现并行计算,不需要复杂的线程管理和同步操作。
-
自动负载均衡:工作窃取机制使得线程之间的工作负载能够自动均衡,避免了某些线程过度忙碌而其他线程空闲的情况。
ExecutorCompletionService
ExecutorCompletionService
是 Java 并发包中的一个辅助类,它结合了 Executor
和 CompletionService
接口的功能。ExecutorCompletionService
提供了一种机制来提交任务并获取它们的结果,同时允许你以非阻塞的方式检查哪些任务已经完成。这使得你可以更灵活地处理并发任务的执行结果,特别是当你需要根据任务完成的顺序来处理结果时。
主要功能
- 任务提交:你可以通过
submit()
方法向ExecutorCompletionService
提交可调用的任务(Callable<V>
)或运行任务(Runnable
)。每个提交的任务都会被包装成一个Future
对象,并由内部的Executor
来执行。 - 结果获取:使用
take()
或poll()
方法可以从CompletionService
中取出已完成的任务的结果。take()
会阻塞当前线程直到有任务完成,而poll()
则是非阻塞的,如果没有任何任务完成则立即返回null
。还有带超时参数的poll(long timeout, TimeUnit unit)
可以等待指定的时间。 - 任务状态跟踪:无论任务是以何种顺序完成的,
ExecutorCompletionService
都能保证你能够按照完成的顺序来获取任务的结果,而不是按照提交的顺序。
使用场景
-
当你需要处理大量并发任务,并且希望按照任务完成的顺序来处理结果,而不是按照提交的顺序。
-
当你需要在所有任务完成之前就开始处理某些已经完成的任务。
-
当你需要实现一种工作窃取模式,即让空闲的工作线程从其他忙碌的工作线程那里窃取任务来执行。
示例代码
import java.util.concurrent.*;public class ExecutorCompletionServiceExample {public static void main(String[] args) throws InterruptedException, ExecutionException {// 创建一个固定大小为2的线程池ExecutorService executor = Executors.newFixedThreadPool(2);// 创建一个 ExecutorCompletionService 实例ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(executor);// 提交一些任务给 ECSfor (int i = 0; i < 5; i++) {final int taskNumber = i;ecs.submit(() -> {System.out.println("Executing Task " + taskNumber);Thread.sleep((long)(Math.random() * 1000)); // 模拟不同耗时的操作return taskNumber * 2;});}// 获取并处理任务的结果,按照任务完成的顺序for (int i = 0; i < 5; i++) {Future<Integer> future = ecs.take(); // 阻塞直到有任务完成Integer result = future.get(); // 获取任务的结果System.out.println("Task completed with result: " + result);}// 关闭线程池executor.shutdown();if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow();}}
}
在这个例子中,我们创建了一个 ExecutorCompletionService
实例,并通过它提交了五个不同的任务。然后,我们使用 ecs.take()
方法按照任务完成的顺序来获取每个任务的结果。请注意,即使任务是按顺序提交的,但因为每个任务的执行时间不同,它们可能会以任意顺序完成。
注意事项
-
资源管理:务必确保在线程池不再需要时调用
shutdown()
或shutdownNow()
来释放资源。未关闭的线程池可能会导致程序无法正常退出。 -
异常处理:当使用
submit()
提交Callable
任务时,任何抛出的异常都会被封装成ExecutionException
抛出。因此,在调用get()
方法获取结果时应当做好异常处理。 -
任务依赖:如果你的任务之间存在依赖关系,考虑使用
CompletableFuture
或者其他高级并发工具来更好地表达这些依赖。 -
性能优化:选择合适的线程池类型和大小对于性能至关重要。例如,对于 I/O 密集型任务,线程池大小可以设置得相对较大;而对于 CPU 密集型任务,则应保持较小的线程数以避免过多上下文切换带来的开销。
Executors
Executors
提供了一系列工厂方法,用于创建不同类型的 ExecutorService
、ScheduledExecutorService
、ThreadFactory
和 Callable
对象。这些工厂方法简化了并发编程中常见任务的处理,无需手动实现这些接口。
创建 ExecutorService 实例
通常,你会使用 Executors
工厂类来创建不同类型的 ExecutorService
实例。以下是几种常见的类型:
-
FixedThreadPool:创建一个固定大小的线程池,其中包含一定数量的线程。当有新任务提交时,如果线程池中有空闲线程,则会立即执行任务;否则,任务将被放入队列中等待。
-
CachedThreadPool:创建一个根据需要创建新线程的线程池,但在可能的情况下会重用已有的空闲线程。适合执行大量短期异步任务。
-
SingleThreadExecutor:创建一个只有一个线程的线程池,确保所有任务都在同一个线程上按顺序执行。
-
ScheduledThreadPool:创建一个支持定时及周期性任务执行的线程池。
-
WorkStealingPool:创建一个基于Fork-Join框架的工作窃取线程池。
示例代码
package person.wend.javalearnexample.util.executor;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class ExecutorsExample {public static void main(String[] args) {// 创建一个可缓存线程池,如果线程空闲60秒后将会被回收ExecutorService cachedThreadPool = Executors.newCachedThreadPool();// 创建一个固定大小的线程池ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);// 创建一个单线程化的线程池ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();// 创建一个支持定时及周期性任务执行的线程池ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);// 创建一个json-task 工作窃取线程池ExecutorService workStealingPool = Executors.newWorkStealingPool();// 使用cachedThreadPool执行任务for (int i = 0; i < 10; i++) {cachedThreadPool.execute(new RunnableTask("CachedThreadPool Task " + i));}// 使用fixedThreadPool执行任务for (int i = 0; i < 5; i++) {fixedThreadPool.execute(new RunnableTask("FixedThreadPool Task " + i));}// 使用singleThreadExecutor执行任务for (int i = 0; i < 3; i++) {singleThreadExecutor.execute(new RunnableTask("SingleThreadExecutor Task " + i));}// 使用scheduledThreadPool执行定时任务for (int i = 0; i < 3; i++) {// 延迟1秒后执行任务scheduledThreadPool.execute(new RunnableTask("ScheduledThreadPool Task " + i));}// 使用 workStealingPool 执行任务for (int i = 0; i < 5; i++) {workStealingPool.execute(new RunnableTask("WorkStealingPool Task " + i));}// 关闭所有线程池shutdownAndAwaitTermination(cachedThreadPool);shutdownAndAwaitTermination(fixedThreadPool);shutdownAndAwaitTermination(singleThreadExecutor);shutdownAndAwaitTermination(scheduledThreadPool);shutdownAndAwaitTermination(workStealingPool);}// 一个简单的Runnable任务static class RunnableTask implements Runnable {private final String taskName;RunnableTask(String taskName) {this.taskName = taskName;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " is executing " + taskName);try {// 模拟任务执行时间TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}// 安全关闭线程池的方法static void shutdownAndAwaitTermination(ExecutorService pool) {pool.shutdown(); // 禁止提交新任务try {// 等待现有任务终止if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {pool.shutdownNow(); // 取消当前执行的任务// 等待任务响应被取消if (!pool.awaitTermination(60, TimeUnit.SECONDS))System.err.println("线程池未正常终止");}} catch (InterruptedException ie) {// 重新取消当前线程进行中断pool.shutdownNow();// 保留中断状态Thread.currentThread().interrupt();}}
}
创建 ScheduledExecutorService
newSingleThreadScheduledExecutor:
该方法用于创建一个单线程的ScheduledExecutorService
。这个单线程可以用于调度任务,使其在给定的延迟之后执行,或者周期性地执行。如果这个单线程在执行任务过程中由于某种故障而终止(在关闭之前),那么在需要执行后续任务时,会有一个新的线程来替代它。并且,任务是保证按顺序执行的,在任何给定时间最多只有一个任务处于活动状态。newScheduledThreadPool:
此方法用于创建一个可以调度命令在给定延迟后运行,或者周期性运行的线程池。你可以指定线程池的核心线程数(corePoolSize
),这个线程池会根据需要保持一定数量的线程(即使它们处于空闲状态)来处理任务。
示例代码
public static void createScheduledExecutorService(){ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();// 提交一个延迟2秒后执行的任务executor.schedule(() -> System.out.println("任务执行了"), 2, TimeUnit.SECONDS);// 关闭执行器executor.shutdown();ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(3);// 提交一个延迟2秒后执行的任务executor2.schedule(() -> System.out.println("任务执行了"), 2, TimeUnit.SECONDS);executor2.shutdown();}
创建 ThreadFactory
我们可以通过Executors.defaultThreadFactory()方法创建一个默认的线程工厂。
public static void usedDefaultThreadFactory() {// 创建一个Executors.defaultThreadFactoryThreadFactory customThreadFactory = Executors.defaultThreadFactory();// 使用自定义的 ThreadFactory 创建一个 ExecutorServiceExecutorService executorService = Executors.newCachedThreadPool(customThreadFactory);// 提交任务到线程池执行for (int i = 0; i < 10; i++) {executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " is running.");});}// 关闭线程池executorService.shutdown();}
Callable
Executors
中包含几个重载的静态方法,用于将 Runnable
、PrivilegedAction
和 PrivilegedExceptionAction
转换为 Callable
对象。这种转换在需要一个返回结果的任务时非常有用,特别是当你有一个不返回任何结果的 Runnable
任务但希望将其适配到需要 Callable
的上下文中时。
注意事项
虽然 Executors
类提供了方便的工厂方法,但是在某些情况下,直接使用这些方法可能不是最佳实践。例如,newCachedThreadPool()
和 newFixedThreadPool(int nThreads)
在默认情况下使用的是无界队列,这可能导致内存耗尽问题。因此,在生产环境中,通常建议直接使用 ThreadPoolExecutor
构造函数来创建线程池,以便更精确地控制线程池的行为和性能。
参考文献/AIGC
通义tongyi.ai_你的全能AI助手-通义千问
豆包
相关文章推荐
JavaUsefulMode: 基于Java 语言的自定义实用工具集
JavaUsefulMode是小编编写Java方向学习专栏时的代码示例汇总总结,其中内容包含了该篇文章所涉及的Java代码示例。感兴趣的小伙伴可以直接下载学习。
Wend看源码-Java.util 工具类学习(上)-CSDN博客
Wend看源码-Java.util 工具类学习(下)-CSDN博客
Wend看源码-Java-Collections 工具集学习-CSDN博客
Wend看源码-Java-Arrays 工具集学习-CSDN博客
Wend看源码-Java-fork/Join并行执行任务框架学习-CSDN博客