目录
- 线程池的执行原理
- 线程执行
- 参考:
线程池的执行原理
假设最大核心数是2,非核心线程数为1,队列长度是3
来第一个任务的时候,没有工作线程在工作,需要创建一个
来第二个任务的时候,发现当前核心线程数小于最大核心线程数,所以继续创建线程来处理任务
当来第三个任务的时候,发现当前核心线程数已经等于最大核心线程数了,所以把新来的任务放到taskQueue中
后面来第四个、第五个任务也会放在taskQueue中
当来第六个任务的时候,发现taskQueue已经满了,所以会创建一个非核心线程来处理任务
当来第七个任务的时候,因为线程数量到最大限度了,taskQueue也满了,所以就会走拒绝策略,把其中一个任务给抛弃掉,具体抛弃哪个需要根据选择的拒绝策略来定。
创建线程这里需要考虑并发的问题,即多个任务同时过来了,需要串行创建线程,否则,可能会导致超卖的情况(即创建的线程超过了最大线程数),具体是通过CAS乐观锁实现,代码解释如下:
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();//获取当前线程池的控制状态。int rs = runStateOf(c);//获取当前线程池的运行状态。// Check if queue empty only if necessary.//下面这个条件判断用于判断线程池是否处于关闭状态,并且任务队列不为空。如果线程池的状态大于等于SHUTDOWN,并且不满足线程池状态为SHUTDOWN、首个任务为null且任务队列为空的条件,则返回false。这个判断是为了确保在线程池关闭时,不再添加新的工作线程。if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);//获取当前线程池中的工作线程数量。//这个条件判断用于判断工作线程数量是否达到上限。如果工作线程数量大于等于CAPACITY(工作线程数量的上限)或者大于等于核心线程数(如果core为true)或最大线程数(如果core为false),则返回false。这个判断是为了确保工作线程数量不超过线程池的限制。if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//尝试通过CAS(比较并交换)操作增加工作线程数量。如果成功增加工作线程数量,则跳出循环,继续执行后续逻辑。if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // 重新读取当前线程池的控制状态。//再次判断线程池的运行状态是否发生了变化。如果运行状态发生了变化,则继续重试内部循环。这个判断是为了处理在CAS操作过程中,线程池的状态发生了变化的情况。if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}...创建线程的逻辑忽略...
}
线程执行
花开两朵,各表一枝。
上面说了任务来了之后,是怎么创建线程的,又是怎么暂存任务的。这一节介绍一下线程是怎么执行任务的,以及在不用的时候,线程怎么被销毁或者保活。
线程池创建线程并调了thread的start方法之后,该线程会走到下面的runWorker方法。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
当一个线程执行完就会自动退出,但是我们知道线程池中的核心线程会一直存活着,想要一直存活,不退出程序就可以了,即循环,从上面的代码也可以看出来确实是这样的。但是还有一个疑问,核心线程是一直存活的,但是非核心线程在一定情况是会销毁的,他们用的是一套代码逻辑,该怎么实现呢?关键点就在getTask这个方法中。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//销毁线程需要满足这两个条件:1. (允许核心线程销毁 || 线程数大于核心线程数)&& 达到了销毁时间;2. 任务队列中没有任务了if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))// 这里返回了null,外层方法就跳出了while循环,从而结束该线程return null;continue;}try {// 超时时间就是在这里设置的,如果允许超时销毁,那么就用poll进行拉取任务,超过了keepAliveTime就返回null。take是阻塞性等待Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
从上面的解析可以看出,如果队列中没有任务时,小于核心数的线程(核心线程数不销毁的情况下)会一直阻塞在获取任务的方法,直到返回任务。(判断阻塞时并没有核心线程和非核心线程的概念,只要保证创建出来的线程销毁到符合预期数量就ok)。而且执行完后 会继续循环执行getTask的逻辑,不断的处理任务。
参考:
https://blog.csdn.net/lbh199466/article/details/102700780