目录
1 线程池各参数
1.1 corePoolSize
1.2 maximunPoolSize
1.3 keepAliveTime
1.4 workQueue
1.5 RejectedExecutionHandler
2 线程池工作机制
2.1 流程
2.2 提交任务
3 相关问题
3.1 线程池核心线程数、最大线程数设置
3.2 ApiPost压测
3.3 为什么要用阻塞队列
4 源码分析
4.1 execute
4.2 addWorker
4.3 runWorker
4.4 getTask
4.5 异常处理
为什么要使用线程池?
1 重复利用已创建的线程,减少线程创建和销毁带来的开销
2 提高响应速度:任务可以不用等待线程创建就能立即执行(T1 创建线程 T2执行任务 T3销毁线程),若T1+T3>T2,可以通过线程池提高响应速度
3 提高线程可管理性:线程是稀缺资源,会降低系统稳定性,通过线程池可以对线程进行统一分配、调优和监控
1 线程池各参数
1.1 corePoolSize
核心线程数(CPU核数),任务提交后
线程数若小于corePoolSize,会一直创建(之前空闲的线程也不会使用)
线程数=corePoolSize,提交的任务会提交到队列中,等待被被执行(非核心线程or核心线程)
执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程
1.2 maximunPoolSize
线程中允许的最大线程数;当前阻塞队列满且提交新任务,则会开辟非核心线程去执行任务(前提是当前线程<maximunPoolSize)
1.3 keepAliveTime
线程空闲的闲置时间(当没其它任务时,允许存活的时间)
1.4 workQueue
当线程池的线程数超过corePoolSize时,任务会进入BlockingQueue进行阻塞
一般来说,选择有界队列,使用无界队列会带来如下影响:
1 线程数达到corePoolSize时,新任务会被放到无界队列中等待,线程池中的线程数不会超过corePoolSize(maximunPoolSize会无效)
2 由于1,maximunPoolSize会无效
3 由于1,keepAliveTime会无效
4 无界队列会占用系统资源,有界队列有利于预防资源耗尽
一般使用:ArrayBlockingQueue、LinkedBlockingQueue、SychronousQueue、PriorityBlockingQueue
1.5 RejectedExecutionHandler
当阻塞队列满,且没有空闲线程工作时,如果继续提交任务,会采取一种策略处理该任务:
AbortPolicy:直接抛异常(默认策略)
CallerRunsPolicy:用调用者所在的线程执行
DiscardOldestPolicy:丢弃阻塞队列靠前的任务,并执行当前任务
DiscardPolicy:直接丢弃任务
2 线程池工作机制
2.1 流程
注意:
1 提交任务时,不管核心线程数是否空闲,只要线程数<corePoolSize,线程池都会优先创建线程
2 ThreadPoolExecutor是非公平的,队列满之后提交的Runnable可能比队列中靠前的先执行
2.2 提交任务
1 execute
2 submit:本质是1,只不过操作的是Future对象
3 相关问题
3.1 线程池核心线程数、最大线程数设置
corePoolSize:核心线程数,常驻线程数
maximumPoolSize:线程池能开辟的最大线程数
任务的性质:CPU密集型、IO密集型、混合型
CPU密集型:
线程在执行任务时,会一直利用CPU,要避免发生线程的上下文切换,通常设置为Ncpu+1
IO密集型:
线程执行任务时使用CPU的时间<<接口响应时间;可以考虑设置为 2*Ncpu
获取系统CPU个数(Ncpu):Runtime.getRuntime().availableProcessors() //逻辑核
计算核心线程数核心理论:Nthreads = CPU核数*(1+线程总等待时间/线程总运行时间)
总结:
1 CPU密集型:CPU核数+1,充分利用CPU,不至于有太多上下文切换
2 IO密集型:建议压测,或用公式计算一个理论值
3 对于核心业务(访问频率高),可以把核心线程数设置为压测的结果,最大线程数可以和核心线程差不多或更大一些;譬如:压测后发现500线程最佳,600线程也许还行,则最大线程数可以设置为600
4 对于非核心业务(访问率不高),核心线程可以设置比较小,避免操作系统维护不必要的线程,最大线程可以设置为计算、压测出的结果
3.2 ApiPost压测
ApiPost压测结果:
1 尝试不同的线程数,然后进行压测,或把可以接收的线程数设置为核心线程数
2 若请求一直不断,则核心线程数可以设置为最大线程数,一开始就创建很多线程,高效处理
3 如果平时压力小,偶尔高并发,则可设置10核心线程数,500最大线程数
4 核心业务,总有500请求过来,核心线程数可以设置为500;最大线程数可以设置为500,也可设置为800
3.3 为什么要用阻塞队列
线程池中的线程,从创建线程并领取任务执行后,执行完时,后续会一直从阻塞队列中获取任务,线程为了不消亡,会在获取队列时阻塞(核心线程不消亡;非核心线程根据keepAliveTime决定存活时间)
4 源码分析
4.1 execute
4.2 addWorker
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//1 尝试增加工作线程计数;未成功重新尝试if (compareAndIncrementWorkerCount(c)) break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//2 创建Work对象,封装要执行的任务firstTaskw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());//线程池未处于关闭状态if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive())throw new IllegalThreadStateException();//3 在锁状态下将新的worker对象添加到工作线程列表workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//4 若任务被添加到线程列表,则执行该任务if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)//5 若出现异常则调用addWorkerFailed方法处理异常addWorkerFailed(w);}return workerStarted;}
4.3 runWorker
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;//获取到任务后置空w.firstTask = null;//在任何时候可以响应中断???w.unlock();boolean completedAbruptly = true;try {//不断尝试获取任务并执行while (task != null || (task = getTask()) != null) {w.lock();//线程池终止时,所有线程都中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//记录日志等beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {//执行完成,释放锁task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
4.4 getTask
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?//自旋,获取任务for (;;) {int c = ctl.get();int rs = runStateOf(c);//线程池即将关闭if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//工作线程数>最大线程数,清理工作线程if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//返回队列任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
4.5 异常处理
private void processWorkerExit(Worker w, boolean completedAbruptly) {//如果是意外退出则workerCount--if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();//加锁统计执行的任务数 并移除WorkerSet中的Workerfinal ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//记录执行的任务次数 然后重worker set中移除completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}//判断线程池状态 根据状态是否需要终止线程tryTerminate();int c = ctl.get();//如果线程池状态是小于stop的 非停止状态if (runStateLessThan(c, STOP)) {//如果没有意外退出 需要根据是否运行核心线程数存活标识来是否销毁线程if (!completedAbruptly) {//如果设置了KeepAliveTime之后,核心线程也会被销毁int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果核心线程最少保留0且队列不为空的情况下 //线程池并没停止 则保留一个工作线程if (min == 0 && ! workQueue.isEmpty())min = 1;//如果当前的工作线程总数大于需要的Worker数 那就直接返回if (workerCountOf(c) >= min)return; // replacement not needed}//新建一个worker 核心线程addWorker(null, false);}
}