1、ThreadPoolTaskExecutor 创建线程池
从它的创建和使用说起,创建和使用的代码如下:
创建:
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(keepAliveSeconds);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setThreadNamePrefix("ExecutorPool-");executor.initialize();return executor;
使用:
threadPoolTaskExecutor.execute(() -> {//do something...});
上面创建线程池使用的是ThreadPoolTaskExecutor ,点击进入executor.initialize()
方法:
ExecutorConfigurationSupport.java
/*** Set up the ExecutorService.*/public void initialize() {if (logger.isInfoEnabled()) {logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));}if (!this.threadNamePrefixSet && this.beanName != null) {setThreadNamePrefix(this.beanName + "-");}this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);}
再进入initializeExecutor方法:
ThreadPoolTaskExecutor.java
/*** Note: This method exposes an {@link ExecutorService} to its base class* but stores the actual {@link ThreadPoolExecutor} handle internally.* Do not override this method for replacing the executor, rather just for* decorating its {@code ExecutorService} handle or storing custom state.*/@Overrideprotected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);ThreadPoolExecutor executor;if (this.taskDecorator != null) {executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,queue, threadFactory, rejectedExecutionHandler) {@Overridepublic void execute(Runnable command) {Runnable decorated = taskDecorator.decorate(command);if (decorated != command) {decoratedTaskMap.put(decorated, command);}super.execute(decorated);}};}else {executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,queue, threadFactory, rejectedExecutionHandler);}if (this.allowCoreThreadTimeOut) {executor.allowCoreThreadTimeOut(true);}this.threadPoolExecutor = executor;return executor;}
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveSeconds,TimeUnit.SECONDS,queue, threadFactory,rejectedExecutionHandler);
可以看到new ThreadPoolExecutor()
,ThreadPoolTaskExecutor 本质是使用ThreadPoolExecutor,那为何要存在ThreadPoolTaskExecutor?
ThreadPoolTaskExecutor的方法基本都是围绕如何创建ThreadPoolExecutor,安全有效设置各种参数,添一些行为等。
如下面设置MaxPoolSize(最大线程池数),它考虑到并发同步、threadPoolExecutor 是否为null的问题。
ThreadPoolTaskExecutor.java
/*** Set the ThreadPoolExecutor's maximum pool size.* Default is {@code Integer.MAX_VALUE}.* <p><b>This setting can be modified at runtime, for example through JMX.</b>*/
public void setMaxPoolSize(int maxPoolSize) {synchronized (this.poolSizeMonitor) {this.maxPoolSize = maxPoolSize;if (this.threadPoolExecutor != null) {this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);}}
}
再如:
设置队列时,它会创建一个安全的队列,有合适大小的LinkedBlockingQueue或者没有大小的SynchronousQueue。从而避免使用newSingleThreadExecutor这类创建一个不安全的等待队列。
ThreadPoolTaskExecutor.java
/*** Create the BlockingQueue to use for the ThreadPoolExecutor.* <p>A LinkedBlockingQueue instance will be created for a positive* capacity value; a SynchronousQueue else.* @param queueCapacity the specified queue capacity* @return the BlockingQueue instance* @see java.util.concurrent.LinkedBlockingQueue* @see java.util.concurrent.SynchronousQueue*/
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {if (queueCapacity > 0) {return new LinkedBlockingQueue<>(queueCapacity);}else {return new SynchronousQueue<>();}
}
newSingleThreadExecutor方式创建的等待队列中的LinkedBlockingQueue容量是Integer.MAX_VALUE
Executors.java
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
LinkedBlockingQueue.java
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}
ThreadPoolTaskExecutor的父类ExecutorConfigurationSupport的方法可以设置BeanName、ThreadNamePrefix等。
总结来说在spring项目中使用ThreadPoolTaskExecutor创建线程池是首推使用的。
2、ThreadPoolExecutor解读
2.1 基本使用介绍
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveSeconds,TimeUnit.SECONDS,queue, threadFactory,rejectedExecutionHandler);
-
corePoolSize:线程池的核心线程数,定义了最小可以同时运行的线程数量。
-
maximumPoolSize:线程池的最大线程数。队列中存放的任务达到队列容量时,可以同时运行的线程数量变为最大线程数。
-
keepAliveTime:当线程池中的线程数量大于corePoolSize时,如果没有新任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了KeepAliveTime才会被回收销毁。
-
unit:keepAliveTime参数的时间单位,包括DAYS、HOURS、MINUTES、MILLISECONDS等。
-
workQueue:用于保存等待执行任务的阻塞队列。可以选择以下集个阻塞队列:
- ArrayBlockingQueue:是一个基于数组结构的阻塞队列,此队列按FIFO原则对元素进行排序;
- LinkedBlockingQueue:是一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列;
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量常高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列;
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列;
-
threadFactory:用于设置创建线程的工厂,可以通过工厂给每个创造出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字:new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();7)handler:饱和策略。若当前同时运行的线程数量达到最大线程数量并且队列已经被放满,ThreadPoolExecutor定义了一些饱和策略:
- ThreadPoolExecutor.AbortPolicy:直接抛出RejectedExecutionException异常来拒绝处理新任务;
- ThreadPoolExecutor.CallerRunsPolicy:只用调用者所在的线程来运行任务,会降低新任务的提交速度,影响程序的整体性能;
- ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉;
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中最近的一个任务,执行当前任务;
处理流程:
1.在创建了线程池之后,等待提交过来的任务请求
2.当调用execute()方法添加一个请求任务的时候,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个程序
2.2 如果正在运行的线程数量大于或者等于corePoolSize,那么将这个任务放入队列
2.3 如果这个时候队列满了并且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻执行这个任务
2.4 如果队列满了并且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
3. 当一个线程完成任务的时候,它会从队列中取下一个任务来执行
4. 当一个线程无事可做超过一定时间(keepAliveTime)时:线程会判断如果当前运行的线程数大于corePoolSize,那么这个线程就会被停掉。
最经典的比喻是银行办理业务,可以很形象的去理解线程池七个核心参数之间的关系:
银行刚开始上班,然后又一位需要办理业务的顾客1进来,柜员1就来一个帮这位顾客1办理,其他在吃早餐摸鱼。这位顾客1办理完了业务就走了,柜员1坐在柜台上静等。
这时又有一位顾客2过来,这时不是柜员1去接待,而是叫上柜员2,因为这天安排上班的两位柜员(corePoolSize=2)。
顾客3来办理业务,因为上班的柜员已经全部就位了,柜员1有空,柜员1就去接待顾客3。
顾客4来办理业务,因为柜员1和柜员2都在忙,所以顾客4在凳子上静等。柜员2帮顾客2办理完了,就去给顾客4办理。这时凳子又空出3张(Queue容量为3)。
顾客5、顾客6、顾客7同时过来,柜员1和柜员2都在忙,顾客5、顾客6、顾客7就坐满了凳子。
过了一会,顾客8来了,柜员1和柜员2在忙同时没有凳子坐了,本着顾客就是上帝的原则,打电话叫来一个在休息的柜员3帮顾客8办理业务。
生意兴隆,顾客9来了,打电话叫来一个在休息的柜员4帮顾客9办理业务。
这是顾客10来了,一进门,柜员没有了,连休息中的也没有了(maximumPoolSize=4),银行直接赶走他。
过了一段时间,银行的客户都办完业务走了,那两位本来应该休息的柜员说:我们再等一下(keepAliveTime),不忙了我们就撤了。
看上面的场景中,搬救兵请休息中的柜员回来干活是很麻烦的,要先坐满凳子再说。线程池中也是,要等等待队列满了以后才会去创建核心线程数之外的线程,因为创建线程的花销是很大的。