【线程池】springboot线程池的底层设计原理
- 【一】Java 线程池基础
- 【1】ThreadPoolExecutor 核心参数
- 【2】ThreadPoolExecutor 工作流程
- 【二】Spring Boot 中的线程池配置
- 【1】配置类方式
- 【2】配置文件方式
- 【三】Spring Boot 线程池的使用
- 【1】启用异步支持
- 【2】异步方法示例
- 【四】Spring Boot 线程池的底层调度机制
- 【五】线程池监控和调优
- 【六】常见底层原理问题
- 【1】线程池里的线程是怎么创建的?
- 【2】线程池里的线程是怎么阻塞的?
- 【3】线程池里的线程是怎么唤醒的?
- 【4】线程池里的线程是怎么回收的?
- 【5】线程池如何实现线程复用?
- 【6】线程池如何知道一个线程的任务已经执行完成
在 Spring Boot 中,线程池是一个重要的组件,用于管理和调度线程,提高应用程序的性能和资源利用率。下面将详细汇总 Spring Boot 线程池的底层原理。
【一】Java 线程池基础
Spring Boot 的线程池底层基于 Java 的 java.util.concurrent 包中的线程池实现,主要涉及到 ThreadPoolExecutor 类。理解 ThreadPoolExecutor 的工作原理是掌握 Spring Boot 线程池的基础。
【1】ThreadPoolExecutor 核心参数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
(1)corePoolSize:核心线程数,线程池初始化时创建的线程数量,当有新任务提交时,会优先使用核心线程来执行任务。
(2)maximumPoolSize:最大线程数,线程池允许创建的最大线程数量。当核心线程都在执行任务,且任务队列已满时,会创建新的线程直到达到最大线程数。
(3)keepAliveTime:线程空闲时间,当线程空闲时间超过该值时,非核心线程会被销毁。
(4)unit:空闲时间的时间单位,如 TimeUnit.SECONDS。
(5)workQueue:任务队列,用于存储等待执行的任务。常见的任务队列有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 等。
(6)threadFactory:线程工厂,用于创建线程。可以自定义线程工厂来设置线程的名称、优先级等属性。
(7)handler:拒绝策略,当任务队列已满且线程数达到最大线程数时,新提交的任务会触发拒绝策略。常见的拒绝策略有 AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy 等。
【2】ThreadPoolExecutor 工作流程
(1)当有新任务提交时,首先检查核心线程数是否已满。如果核心线程数未满,则创建新的核心线程来执行任务。
(2)如果核心线程数已满,将任务放入任务队列中等待执行。
如果任务队列已满,检查线程数是否达到最大线程数。如果未达到最大线程数,则创建新的(3)非核心线程来执行任务。
(4)如果线程数达到最大线程数,且任务队列已满,新提交的任务会触发拒绝策略。
【二】Spring Boot 中的线程池配置
在 Spring Boot 中,可以通过配置类或配置文件来创建和配置线程池。
【1】配置类方式
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;@Configuration
public class ThreadPoolConfig {@Bean("customThreadPool")public Executor customThreadPool() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5); // 核心线程数executor.setMaxPoolSize(10); // 最大线程数executor.setQueueCapacity(20); // 任务队列容量executor.setKeepAliveSeconds(60); // 线程空闲时间executor.setThreadNamePrefix("custom-thread-"); // 线程名称前缀executor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略executor.initialize();return executor;}
}
【2】配置文件方式
在 application.properties 或 application.yml 中配置线程池属性:
spring.task.execution.pool.core-size=5
spring.task.execution.pool.max-size=10
spring.task.execution.pool.queue-capacity=20
spring.task.execution.pool.keep-alive=60s
spring.task.execution.thread-name-prefix=custom-thread-
【三】Spring Boot 线程池的使用
在 Spring Boot 中,可以通过 @Async 注解来异步执行方法,使用线程池来管理这些异步任务。
【1】启用异步支持
在主应用类上添加 @EnableAsync 注解来启用异步支持:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;@SpringBootApplication
@EnableAsync
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
【2】异步方法示例
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;@Service
public class AsyncService {@Async("customThreadPool")public void asyncMethod() {// 异步执行的方法逻辑System.out.println("异步方法执行,线程名称:" + Thread.currentThread().getName());}
}
【四】Spring Boot 线程池的底层调度机制
Spring Boot 的线程池调度主要依赖于 ThreadPoolTaskExecutor 类,它是对 ThreadPoolExecutor 的封装。当调用 @Async 注解的方法时,Spring 会将任务提交给线程池进行处理。
(1)任务提交
ThreadPoolTaskExecutor 提供了 execute 和 submit 方法来提交任务:
execute 方法用于提交 Runnable 任务,没有返回值。
submit 方法可以提交 Runnable 或 Callable 任务,并且可以返回一个 Future 对象,用于获取任务的执行结果。
(2)任务调度
当任务提交到线程池后,线程池会根据 ThreadPoolExecutor 的工作流程进行任务调度。如果核心线程有空闲,会优先使用核心线程执行任务;如果核心线程已满,任务会被放入任务队列;如果任务队列已满,会创建新的非核心线程;如果线程数达到最大线程数,会触发拒绝策略。
【五】线程池监控和调优
在 Spring Boot 中,可以通过 Actuator 来监控线程池的状态,例如线程池的活跃线程数、任务队列大小等。同时,可以根据监控结果对线程池的参数进行调优,以提高应用程序的性能。
(1)线程池监控和调优
在 Spring Boot 中,可以通过 Actuator 来监控线程池的状态,例如线程池的活跃线程数、任务队列大小等。同时,可以根据监控结果对线程池的参数进行调优,以提高应用程序的性能。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
(2)监控线程池
通过访问 /actuator/metrics 端点可以查看线程池的相关指标,例如 executor.active 表示活跃线程数,executor.completed 表示已完成的任务数等。
综上所述,Spring Boot 线程池的底层原理基于 Java 的 ThreadPoolExecutor,通过配置和使用 ThreadPoolTaskExecutor 来实现任务的异步执行和调度。同时,可以通过 Actuator 对线程池进行监控和调优,以提高应用程序的性能和稳定性。
【六】常见底层原理问题
【1】线程池里的线程是怎么创建的?
当有新任务提交到线程池时,线程池会根据当前线程数量和配置参数决定是否创建新线程。若当前线程数小于核心线程数(corePoolSize),会直接创建新的核心线程;若核心线程已满,则将任务放入任务队列;若任务队列已满且线程数小于最大线程数(maximumPoolSize),会创建非核心线程。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 获取线程池的控制状态(包含线程数和运行状态)int c = ctl.get();// 如果当前线程数小于核心线程数if (workerCountOf(c) < corePoolSize) { // 尝试创建新线程执行任务if (addWorker(command, true)) return;c = ctl.get();}// 如果线程池在运行且任务能放入队列if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get();// 再次检查线程池状态,如果已停止则移除任务并拒绝if (! isRunning(recheck) && remove(command)) reject(command);// 如果线程数为 0,创建一个无初始任务的线程else if (workerCountOf(recheck) == 0) addWorker(null, false);}// 如果任务队列已满,尝试创建非核心线程else if (!addWorker(command, false)) // 创建失败则拒绝任务reject(command);
}
addWorker方法,这个方法负责创建新的工作线程。Worker类是继承AQS,同时实现了Runnable,每个Worker持有一个线程,在run方法里不断从队列里取任务执行。
// 线程创建(addWorker)
private boolean addWorker(Runnable firstTask, boolean core) {// 创建 Worker 对象(包含 Thread 和初始任务)Worker w = new Worker(firstTask);Thread t = w.thread;workers.add(w); // 加入线程集合t.start(); // 启动线程
}private final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread;Runnable firstTask;Worker(Runnable firstTask) {this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this); // 核心执行循环}
}
// 任务执行循环
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;while (task != null || (task = getTask()) != null) {try {task.run(); // 执行任务} finally {task = null; // 清空任务引用}}processWorkerExit(w, false); // 线程回收处理
}
【2】线程池里的线程是怎么阻塞的?
当线程执行完一个任务后,会通过getTask()方法从任务队列中获取下一个任务。若任务队列为空,阻塞队列的take()会阻塞线程,线程会进入阻塞状态等待新任务。而如果线程池正在关闭,可能就会返回null,导致Worker退出循环,线程结束。
ThreadPoolExecutor 中的线程通过 workQueue.take() 或 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法获取任务,这两个方法在队列为空时会使线程阻塞。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 允许线程被中断boolean completedAbruptly = true;try {// 循环获取任务并执行while (task != null || (task = getTask()) != null) { w.lock();// 检查线程池状态,确保线程在需要时被中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 执行任务前的钩子方法beforeExecute(wt, task); Throwable thrown = null;try {// 执行任务task.run(); } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 执行任务后的钩子方法afterExecute(task, thrown); }} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 处理线程退出逻辑processWorkerExit(w, completedAbruptly); }
}private Runnable getTask() {boolean timedOut = false; // 记录上次 poll 是否超时for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查线程池状态和任务队列情况if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// 判断线程是否需要超时控制boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 根据是否需要超时控制选择获取任务的方法Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {// 响应中断信号timedOut = false;}}
}
当队列为空时,线程通过workQueue.take()阻塞
【3】线程池里的线程是怎么唤醒的?
当有新任务提交到任务队列时,会唤醒处于阻塞状态的线程。任务队列(如 LinkedBlockingQueue)使用 ReentrantLock 和 Condition 来实现线程的阻塞和唤醒机制。新任务加入队列时,会调用 Condition 的 signal() 或 signalAll() 方法唤醒等待的线程。
以 LinkedBlockingQueue 的 offer 方法为例:
public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;// 如果队列已满,返回 falseif (count.get() == capacity) return false;int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {// 再次检查队列是否未满if (count.get() < capacity) { // 将任务加入队列enqueue(node); c = count.getAndIncrement();// 如果队列还有空间,唤醒等待放入任务的线程if (c + 1 < capacity) notFull.signal();}} finally {putLock.unlock();}// 如果之前队列为空,唤醒等待获取任务的线程if (c == 0) signalNotEmpty();return c >= 0;
}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {// 唤醒等待获取任务的线程notEmpty.signal(); } finally {takeLock.unlock();}
}
当队列为空时,线程通过workQueue.take()阻塞
【4】线程池里的线程是怎么回收的?
线程池根据线程的空闲时间和配置参数回收线程。当线程空闲时间超过 keepAliveTime,且线程数大于核心线程数时,非核心线程会被回收。在 getTask 方法中,若使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法获取任务超时,线程会退出 runWorker 循环,最终被回收。
在 getTask 方法中:
private Runnable getTask() {boolean timedOut = false; // 记录上次 poll 是否超时for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查线程池状态和任务队列情况if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// 判断线程是否需要超时控制boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 减少线程计数if (compareAndDecrementWorkerCount(c)) return null;continue;}try {// 根据是否需要超时控制选择获取任务的方法Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
【5】线程池如何实现线程复用?
线程池实现线程复用的核心在于避免频繁地创建和销毁线程,而是让线程在完成一个任务后继续去执行其他任务。下面详细介绍其底层原理:
(1)线程的创建和管理
线程池在初始化时会根据配置创建一定数量的核心线程。这些线程在创建后不会立即销毁,而是进入等待状态,随时准备执行新的任务。线程池使用一个内部的数据结构(通常是一个集合)来管理这些线程。
(2)任务队列
线程池中有一个任务队列,用于存储待执行的任务。当有新任务提交时,如果核心线程都在忙碌,任务会被放入任务队列中等待执行。
(3)线程的执行逻辑
每个线程在启动后会进入一个循环,不断从任务队列中获取任务并执行。当任务队列为空时,线程会进入阻塞状态,等待新任务的到来。一旦有新任务被添加到队列中,线程会被唤醒并继续执行任务。
(4)线程的复用
线程在完成一个任务后,不会终止,而是继续从任务队列中获取下一个任务。这样,同一个线程可以执行多个不同的任务,实现了线程的复用。
【6】线程池如何知道一个线程的任务已经执行完成
(1)基于任务执行逻辑
线程池中的线程在执行任务时,通常会遵循特定的执行逻辑。一般来说,线程会从任务队列中取出一个任务并调用其 run 方法(对于 Runnable 任务)或 call 方法(对于 Callable 任务),当这个方法正常返回或者抛出异常结束时,就意味着该任务执行完成,FutureTask会标记任务为完成,并调用done()方法。可以通过Future的isDone()方法来判断状态。
(2)线程池的状态管理
线程池内部会对线程和任务的状态进行管理。每个线程在执行任务时,线程池会记录该线程的状态,当任务执行完成后,线程池会更新相应的状态信息。
原理分析
1-任务队列操作:线程从任务队列中取出任务执行,当任务完成后,线程池会知道该任务已经从队列中移除并执行完毕。
2-线程状态标记:线程池可以通过内部的状态标记来记录线程是处于忙碌状态(正在执行任务)还是空闲状态(任务执行完成)。
(3)回调机制
有些线程池实现会使用回调机制来通知任务执行完成。例如,Future 接口和 CompletableFuture 类就提供了这样的功能。
bmit 方法返回一个 Future 对象,通过调用 isDone 方法可以检查任务是否执行完成。当 isDone 方法返回 true 时,就表示任务已经执行完成,可以通过 get 方法获取任务的执行结果。
public class FutureTask<V> implements RunnableFuture<V> {private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;protected void done() { // 任务完成时回调(可用于异步通知)}public boolean isDone() {return state != NEW; // 任何非NEW状态都表示完成}
}
通过FutureTask的状态机跟踪任务生命周期
isDone()方法检查state是否为非NEW状态