ThreadPoolExecutor线程池详解

ThreadPoolExecutor线程池详解

1. 背景

项目最近的迭代中使用到了ThreadPoolExecutor线程池,之前都只是知道怎么用,没有了解过线程池的底层原理,项目刚上线,有时间整理一下线程池的用法,学习一下线程池的底层实现与工作原理。

2. ThreadPoolExecutor工作原理

2.1 构造方法

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

2.2 线程池的使用

  • worker
/*** @author itender* @date 2023/8/7 14:41* @desc*/
public class Worker implements Runnable {private String command;public Worker(String s) {this.command = s;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + command + " startTie = " + DateUtil.now());processCommand();System.out.println(Thread.currentThread().getName() + command +  " endTime = " + DateUtil.now());}private void processCommand() {try {Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + command +" 处理任务逻辑。。。。。。。。");} catch (InterruptedException e) {Thread.currentThread().interrupt();e.printStackTrace();}}@Overridepublic String toString() {return this.command;}
}
  • 线程池
/*** @author itender* @date 2023/8/7 14:37* @desc*/
public class ThreadPoolExecutorDemo {private static final int CORE_POOL_SIZE = 5;private static final int MAX_POOL_SIZE = 10;private static final int QUEUE_CAPACITY = 100;private static final Long KEEP_ALIVE_TIME = 1L;public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(// 核心线程数 5CORE_POOL_SIZE,// 最大线程数 10MAX_POOL_SIZE,// 超过核心线程数,线程最大存活时间KEEP_ALIVE_TIME,// 时间单位TimeUnit.MINUTES,// 工作队列最大值new ArrayBlockingQueue<>(QUEUE_CAPACITY),// 线程工厂,创建线程的时候使用r -> {Thread thread = new Thread(r);thread.setName("pool-");return thread;},new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 0; i < 10; i++) {// 创建任务Worker myRunnable = new Worker("" + i);// 执行任务threadPoolExecutor.execute(myRunnable);}// 种植线程池,不接受新任务,但是有工作线程处理队列中的任务threadPoolExecutor.shutdown();while (!threadPoolExecutor.isTerminated()) {}System.out.println("Finished All Threads!");}
}

2.3 核心参数

2.3.1 核心参数详解

  • corePoolSize:核心线程数,任务队列没有达到队列最大容量时,最大可以同时运行的线程数。
  • maximumPoolSize:最大线程数。当任务队列中存储的任务达到队列的容量时,当前可以同时运行的线程数量变为最大线程数。
  • keepAliveTime:线程池中的线程数量超过corePoolSize时,如果没有新任务提交,核心线程外的线程不会立即销毁,而是等待,直到等待的时间超过了keepAliveTime才会被销毁回收。
  • unitkeepAliveTime参数的时间单位。
  • workQueue:工作队列。当有新的任务提交的时候,会先判断当前运行的线程数是否达到核心线程数,如果达到核心线程数,则会把新提交的任务放到工作队列中。
  • threadFactory:线程工厂,创建新的线程时会使用。
  • handler:拒绝策略。

2.3.2 拒绝策略

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:

  • AbortPolicy:默认拒绝策略。抛出RejectExecutionException来拒绝新任务的处理。
  • CallerRunsPolicy:调用当前提交任务的线程来执行任务。一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化为同步调用。
  • DiscardPolicy:不处理新任务,直接丢弃。
  • DiscardOldestPolicy:丢弃最早的未处理的任务。

2.4 执行流程

在这里插入图片描述

2.5 线程池状态

2.5.1 线程池核心属性ctl

	// ctl本质是 Integer 型变量,进行了原子性的封装// ctl表示两种状态:// 高3位:线程池当前的状态// 低29位:线程池当前工作线程的数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// COUNT_BITS 的值为 29(整型Integer.SIZE = 32 位);private static final int COUNT_BITS = Integer.SIZE - 3;// CAPACITY = (1 << 29) - 1; 1左移29位,减去1;即1*2^29-1;// 0001 1111 1111 1111 1111 1111 1111 1111// 低29位用来表示线程池的最大线程容量private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 高3位用来表示线程池5种状态// 111 运行状态private static final int RUNNING    = -1 << COUNT_BITS;// 000 shutdown状态private static final int SHUTDOWN   =  0 << COUNT_BITS;// 001 停止状态private static final int STOP       =  1 << COUNT_BITS;// 010 过渡状态private static final int TIDYING    =  2 << COUNT_BITS;// 011 中介状态private static final int TERMINATED =  3 << COUNT_BITS;// 根据ctl的值,计算当前线程池的状态// 计算方式:c 与 非capacityprivate static int runStateOf(int c)     { return c & ~CAPACITY; }// 根据ctl的值,计算线程池当前运行的线程的容量private static int workerCountOf(int c)  { return c & CAPACITY; }// 通过运行状态和工作线程数计算ctl的值,或运算private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) {return c < s;private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}/*** Attempts to CAS-increment the workerCount field of ctl.*/private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}/*** Attempts to CAS-decrement the workerCount field of ctl.*/private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}/*** Decrements the workerCount field of ctl. This is called only on* abrupt termination of a thread (see processWorkerExit). Other* decrements are performed within getTask.*/private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}

2.5.2 状态切换

在这里插入图片描述

  • RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务。
  • SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown() 方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用 shutdown() 方法进入该状态)。
  • STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。
  • TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入 TERMINATED 状态。
  • TERMINATED:在terminated()方法执行完后进入该状态,默认 terminated() 方法中什么也没有做。

3. 源码分析

3.1 execute方法

  • 源码
public void execute(Runnable command) {// 判断任务是否为空,如果任务为空,抛出空指针异常if (command == null)throw new NullPointerException();// 获取ctl属性int c = ctl.get();// 判断当前工作线程数量是否小于核心线程的数量if (workerCountOf(c) < corePoolSize) {// 工作线程数小于核心线程数,创建一个核心线程执行command任务if (addWorker(command, true))// 创建核心线程成功,直接返回return;// 并发情况下添加核心线程失败,需要重新获取ctl属性c = ctl.get();}// 创建核心线程失败,当前工作线程数量大于或等于核心线程数量corePoolSize// 判断线程池的状态是否为running,如果是添加任务到工作队列中(放入任务失败返回false)if (isRunning(c) && workQueue.offer(command)) {// 任务添加到队列成功,再次获取ctl属性int recheck = ctl.get();// 二次检查,判断线程池的状态是否为running,如果不是队列中移除刚刚添加的任务if (!isRunning(recheck) && remove(command))// 执行拒绝策略reject(command);// 1.任务添加到队列// 2.线程池可能是running状态// 3.传入的任务可能从任务队列中移除失败(移除失败的唯一可能就是任务已经被执行了)// 判断工作线程数量是否为0else if (workerCountOf(recheck) == 0)// 工作线程数量为0// 工作队列中有任务在排队,添加一个空任务,创建非核心线程执行队列中等待的任务addWorker(null, false);}// 创建核心线程失败,// 线程池状态不是running状态// 线程池可能是running状态,但是任务队列已经满了// 添加任务到工作队列失败,创建非核心线程执行任务else if (!addWorker(command, false))// 创建非核心线程失败,执行拒绝策略reject(command);}

第一点核心:通过execute方法源码可以看出线程池具体的执行流程,以及一些避免并发情况的判断。

第二点核心:线程池为什么会添加空任务非核心线程到线程池。

这里是一个疑惑点:为什么需要二次检查线程池的运行状态,当前工作线程数量为0,尝试创建一个非核心线程并且传入的任务对象为null?这个可以看API注释:

如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结)或者执行当前方法的时候线程池是否已经shutdown了。所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除或者在没有可用的工作线程的前提下新建一个工作线程。

3.2 addWorker方法

  • 源码
private boolean addWorker(Runnable firstTask, boolean core) {// for循环标识// 对线程池当前状态和当前工作线程数量的判断retry:for (;;) {// 获取线程池的状态int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 获取线程池工作线程的数量int wc = workerCountOf(c);// 1. 如果传入的core为true,表示将要创建核心线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false表示创建核心线程失败// 2. 如果传入的core为false,表示将要创非建核心线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false表示创建非核心线程失败// core参数为false说明工作队列已经满了,线程池大小变为maximumPoolSize最大线程数if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// CAS更新工作线程数wc,原子操作将workCount的数量加1,更新成功则直接跳出最外层循环if (compareAndIncrementWorkerCount(c))break retry;// CAS更新工作线程数失败,判断线程池的状态是否从running编程shutdown,如果线程池的状态改变了在执行上面的操作c = ctl.get();  // Re-read ctl// 如果线程池状态已经变成shutdown,跳过最外层本次循环,执行下一次循环if (runStateOf(c) != rs)continue retry;// 如果线程池状态依然是RUNNING,CAS更新工作线程数wc失败说明有可能是并发更新导致的失败,则在内层循环重试即可 // else CAS failed due to workerCount change; retry inner loop}}// 工作线程是否启动成功boolean workerStarted = false;// 工作线程是否创建成功boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 加锁,因为会改变一些指标值和非线程安全的集合final ReentrantLock mainLock = this.mainLock;// 加锁mainLock.lock();try {// 获取线程池状态int rs = runStateOf(ctl.get());//rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中//(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker// 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的任务实例firstTask不为null,则不会添加到工作线程集合和启动新的Worker// 这一步其实有可能创建了新的Worker实例但是并不启动(临时对象,没有任何强引用),这种Worker有可能成功下一轮GC被收集的垃圾对象// firstTask == null证明只新建线程而不执行任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 将新建的工作线程添加到工作线程的集合workers.add(w);// 更新当前工作线程的最大容量int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;// 工作线程是否添加成功workerAdded = true;}} finally {// 释放锁mainLock.unlock();}// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例if (workerAdded) {// 启动线程,标识线程启动成功t.start();workerStarted = true;}}} finally {// 线程启动失败,需要从工作线程中移除对应的Workerif (!workerStarted)addWorkerFailed(w);}return workerStarted;
}

4. 线程池常见问题

4.1 execute()submit()的区别

  • execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  • submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Futureget()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法的话,如果在 timeout 时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException

4.3 阻塞队列的作用

  • 一般的队列只能是有限长度的缓冲区,一旦超出缓冲长度,就无法保留了。阻塞队列通过阻塞可以保留住当前想要继续入队的任务。

  • 阻塞队列可以在队列中没有任务时,阻塞想要获取任务的线程,使其进入wait状态,释放cpu资源。

  • 阻塞队列带有阻塞和唤醒的功能,不需要额外处理,无任务执行时,线程池利用阻塞队列的take方法挂起,从而维持核心线程的存活,不至于一直占用cpu资源。

4.2 为什么先添加队列而不是先创建最大线程

  • 在创建新线程的时候,是要获取全局锁的,这时候其他线程会被阻塞,影响整体效率。

  • 在核心线程已满时,如果任务继续增加那么放在队列中,等队列满了而任务还在增加那么就要创建临时线程了,这样代价低。

5. 参考文章

https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/

https://javaguide.cn/java/concurrent/java-thread-pool-summary.html#%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/81682.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

运行 Jmeter 文件生成 HTML 测试报告,我选择 ANT 工具

概述 ant 是一个将软件编译、测试、部署等步骤联系在一起加以自动化的一个工具&#xff0c;大多用于 Java 环境中的软件开发。 在与 Jmeter 生成的 jmx 文件配合使用中&#xff0c;ant 会完成jmx计划的执行和生成jtl文件&#xff0c;并将jtl文件转化为html页面进行查看。 还可…

Node.js |(三)Node.js API:path模块及Node.js 模块化 | 尚硅谷2023版Node.js零基础视频教程

学习视频&#xff1a;尚硅谷2023版Node.js零基础视频教程&#xff0c;nodejs新手到高手 文章目录 &#x1f4da;path模块&#x1f4da;Node.js模块化&#x1f407;介绍&#x1f407;模块暴露数据⭐️模块初体验⭐️暴露数据 &#x1f407;导入文件模块&#x1f407;导入文件夹的…

CAPL - XML和TestModule结合实现测试项可选

目录 目的:是否想实现如下面的功能呢? 一、.can和.cin文件中函数开发

C/C++面试总结

一、关键字static、const、extern、volatile作用 1、const 1.修饰常量 用const修饰的变量是不可变的&#xff0c;修饰后的变量只能使用&#xff0c;不能修改。 2.修饰指针 如果const位于*的左侧&#xff0c;eg&#xff1a;const int* a&#xff0c;则const就是用来修饰指针…

研发工程师玩转Kubernetes——hostPath

有别于《研发工程师玩转Kubernetes——emptyDir》一文中介绍的emptyDir&#xff0c;hostPath可以在同一个Node的不同Pod间共享卷。 下面的清单文件利用了Pod亲和性&#xff0c;让Pod集中到一个Node上。 apiVersion: apps/v1 kind: Deployment metadata:name: hostpath-deploy…

Adobe ColdFusion 反序列化漏洞复现(CVE-2023-29300)

0x01 产品简介 Adobe ColdFusion是美国奥多比&#xff08;Adobe&#xff09;公司的一套快速应用程序开发平台。该平台包括集成开发环境和脚本语言。 0x02 漏洞概述 Adobe ColdFusion存在代码问题漏洞&#xff0c;该漏洞源于受到不受信任数据反序列化漏洞的影响&#xff0c;攻击…

FinClip 支持小程序维度域名配置;桌面端体验活动进行中

FinClip 的使命是使您&#xff08;业务专家和开发人员&#xff09;能够通过小程序解决关键业务流程挑战&#xff0c;并完成数字化转型的相关操作。不妨让我们看看在本月的产品与市场发布亮点&#xff0c;看看是否有助于您实现目标。 产品方面的相关动向&#x1f447;&#x1f…

Kafka概论

前言 任何消息中间件&#xff0c;除了基础组件架构外&#xff0c;核心特性无非三个&#xff0c;消息可靠性、消息模型、吞吐量&#xff0c;本文要聊的正是这些东西&#xff0c;其余诸如API、下载安装、集群搭建等都是死的&#xff0c;而且会随着版本的变动而改变&#xff0c;这…

uni-app 封装api请求

前端封装api请求 前端封装 API 请求可以提高代码的可维护性和重用性&#xff0c;同时使得 API 调用更加简洁和易用。 下面是一种常见的前端封装 API 请求的方式&#xff1a; 创建一个 API 封装模块或类&#xff1a;可以使用 JavaScript 或 TypeScript 创建一个独立的模块或类来…

VoxWeekly|The Sandbox 生态周报|20230731

欢迎来到由 The Sandbox 发布的《VoxWeekly》。我们会在每周发布&#xff0c;对上一周 The Sandbox 生态系统所发生的事情进行总结。 如果你喜欢我们内容&#xff0c;欢迎与朋友和家人分享。请订阅我们的 Medium 、关注我们的 Twitter&#xff0c;并加入 Discord 社区&#xf…

架构实践方法

一、识别复杂度 将主要的复杂度问题列出来&#xff0c;然后根据业务、技术、团队等综合情况进行排序&#xff0c;优先解决当前面临的最主要的复杂度问题。对于按照复杂度优先级解决的方式&#xff0c;存在一个普遍的担忧&#xff1a;如果按照优先级来解决复杂度&#xff0c;可…

【MATLAB第68期】基于MATLAB的LSTM长短期记忆网络多变量时间序列数据多步预测含预测未来(非单步预测)

【MATLAB第68期】基于MATLAB的LSTM长短期记忆网络多变量时间序列数据多步预测含预测未来&#xff08;非单步预测&#xff09; 输入前25个时间&#xff0c;输出后5个时间 一、数据转换 1、原始数据 5列时间序列数据&#xff0c;70行样本 705 数据矩阵结构 2、数据转换 将…

利用awk筛选给定时间范围内的日志

时间戳 什么是时间戳&#xff1f; ​ 时间戳是指格林威治时间自1970年1月1日&#xff08;00:00:00 GMT&#xff09;至当前时间的总秒数。它也被称为Unix时间戳&#xff08;Unix Timestamp&#xff09;。通俗的讲&#xff0c;时间戳是一份能够表示一份数据在一个特定时间点已经存…

【数据分析专栏之Python篇】五、pandas数据结构之Series

前言 大家好&#xff01;本期跟大家分享的知识是 Pandas 数据结构—Series。 一、Series的创建 Series 是一种类似于一维数组的对象&#xff0c;由下面两部分组成&#xff1a; values&#xff1a;一组数据&#xff0c;ndarray 类型index&#xff1a;数据索引 顾名思义&…

STM32——STM32F401x系列标准库的下载+环境搭建+建工程步骤(更完整)

文章目录 标准库的下载环境搭建建工程最后的话 标准库的下载 1.STM32标准库的官网下载网站https://www.st.com/content/st_com/en.html 2. 3. 4. 5. 6. 7.点击之后下滑 8.选择自己需要的版本下载 环境搭建建工程 大致步骤同之前我写的一篇STM32——建工程差不多&#xff0…

Vue 本地应用 记事本 v-on v-model v-for使用

vue当中如何生成列表结构&#xff1f;使用的指令是v-for&#xff0c;同时要有一个可以生成列表的数据&#xff0c;常用的是数组。记事本里面的内容并不复杂&#xff0c;所以这里使用字符串数组就行了。 获取用户输入的内容使用绑定v-model&#xff0c;双向数据绑定&#xff08…

【IMX6ULL驱动开发学习】02.hello驱动程序之cdev注册字符设备驱动程序和设置次设备号

目录 ​编辑 一、register_chrdev 二、解决方法 2.1 alloc_chrdev_region函数&#xff1a;注册一系列字符设备编号 2.2 cdev_init函数&#xff1a;初始化cdev结构体 2.3 cdev_add函数&#xff1a;将字符设备添加到系统中 三、驱动程序 一、register_chrdev major reg…

NAND Flash 失效之 Data Rentention | 闪存数据保持力 | 数据放几年就坏掉了?

依公知及经验整理,原创保护,禁止转载。专栏 《深入理解Flash:闪存特性与实践》 图1: Data Retention 对 Vt 电压分布影响 图片来源: 知乎 [2] 全文 1900 字, 内容摘要 Data Retention 产生 Data Retention 的影响因素  如何规避 Data Rention 问题 发生Data Retent…

MyCat分片规则——应用指定分片规则、日期分片、固定分片hash算法

1.应用指定分片规则 2.固定分片hash算法 3.字符串hash解析 4.按天&#xff08;日期&#xff09;分片 5.按自然月进行分片

Django入门 - Http协议前后端交互

Http协议前后端交互 在前后端交互当中&#xff0c;前端后端用的协议是http协议 先请求后响应&#xff0c;响应完之后连接就会断开我们可以认为它其实是一个短连接 或者 无连接。在它内部其实是基于TCP协议的&#xff0c;它也会有三次握手&#xff0c;但是这是内部的&#xff…