源码角度看待线程池的执行流程

文章目录

  • 前言
  • 一、线程池的相关接口和实现类
    • 1.Executor接口
    • 2.ExecutorService接口
    • 3.AbstractExecutorService接口
    • 4.ThreadPoolExecutor 实现类
  • 二、ThreadPoolExecutor源码解析
    • 1.Worker内部类
    • 2.execute()方法
    • 3.addWorker()方法
  • 总结


前言

线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,线程池会创建一个新线程进行处理或者放入队列(工作队列)中等待。
线程池在多线程编程中扮演着重要角色,它能够管理和复用线程,提高并发执行效率。

在之前的学习中,我们知道了线程池的基本流程如下,而今天我们则用这个流程配合着源码的来重新分析线程池的执行流程。
在这里插入图片描述


一、线程池的相关接口和实现类

1.Executor接口

public interface Executor {/*** Executes the given command at some time in the future.  The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/void execute(Runnable command);
}

Executor接口作为线程池技术中的顶层接口,它的作用是用来定义线程池中,用于提交并执行线程任务的核心方法:exuecte()方法。未来线程池中所有的线程任务,都将由exuecte()方法来执行。

2.ExecutorService接口

public interface ExecutorService extends Executor {//.....
}

ExecutorService接口继承了Executor接口,扩展了awaitTermination()、submit()、shutdown()等专门用于管理线程任务的方法。

3.AbstractExecutorService接口

public abstract class AbstractExecutorService implements ExecutorService {//....
}

ExecutorService接口的抽象实现类AbstractExecutorService,为不同的线程池实现类,提供submit()、invokeAll()等部分方法的公共实现。但是由于在不同线程池中的核心方法exuecte()执行策略不同,所以在AbstractExecutorService并未提供该方法的具体实现。

4.ThreadPoolExecutor 实现类

public class ThreadPoolExecutor extends AbstractExecutorService {//...
}

ThreadPoolExecutor实现类是AbstractExecutorService接口的的两个重要实现类之一,(ForkJoinPool是另一个),也是要掌握的关于线程池的重点区域;
ThreadPoolExecutor线程池通过Woker工作线程、BlockingQueue阻塞工作队列 以及 拒绝策略实现了一个标准的线程池;


二、ThreadPoolExecutor源码解析

在对源码进行解析之前,我们先看看官方给我们关于ThreadPoolExecutor的解析是什么:

The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* …

这句话什么意思呢?
主池控制状态ctl是一个原子整数封装?

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

再结合ctl的实例化,我们这个发现这个ctl是一个具有原子性的整数,再往后就是告诉我们这个具有原子性的整数是由两个概念组成的:workerCount工作线程数和runState运行状态。
他是一个32位的整数,具体表示形式位:
在这里插入图片描述
所以说,它的作用就是通过位运算来存储线程池的状态和活动线程数信息

1.Worker内部类

每个Woker类的对象,都代表线程池中的一个工作线程。
Worker类是ThreadPoolExecutor类中定义的一个私有内部类,保存了每个Worker工作线程要执行的Runnable线程任务和Thread线程对象。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {	private static final long serialVersionUID = 6138294804551838833L;//用于存储当前工作线程的引用。final Thread thread;//用于存储该工作线程要执行的第一个任务。Runnable firstTask;//用于记录该工作线程已经完成的任务数量volatile long completedTasks;//初始化工作线程的状态,设置第一个任务并创建一个线程对象Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//Runnable 接口的 run 方法,调用了 runWorker(this),将工作线程自身作为参数传递给 runWorker方法public void run() {runWorker(this);}//因为继承了AbstractQueuedSynchronizer 类,一下方法都是基于线程安全的方法//.....       }

当ThreadPoolExecutor线程池,通过exeute()方法执行1个线程任务时,会调用addWorker()方法创建一个Woker工作线程对象。并且,创建好的Worker工作线程对象,会被添加到一个HashSet workders工作线程集合,统一由线程池进行管理。
当Worker工作线程,在第一次执行完成线程任务后,这个Worker工作线程并不会销毁,而是会以循环的方式,通过线程池的getTask()方法,获取阻塞工作队列中新的Runnable线程任务,并通过当前Worker工作线程中所绑定Thread线程,完成新线程任务的执行,从而实现了线程池的中Thread线程的重复使用

2.execute()方法

ThreadPoolExecutor线程池中,会通过execute(Runnable command)方法执行Runnable类型的线程任务。
在分析execute()方法之前,也来看看官方是怎么解释这个方法的:

    /** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/

简单来说就是将execute()方法分成三个步骤:
1.如果,工作线程的数量小于核心线程数,则通过addWorker()方法,创建新的Worker工作线程,并添加至workers工作线程集合;
2.如果一个任务可以成功排队(工作线程的数量大于核心线程数),并且线程池处于RUNNING状态,那么,线程池会将Runnable类型的线程任务,缓存至workQueue阻塞工作队列,等待某个空闲工作线程获取并执行该任务;
3.如果我们不能排队任务,那么我们尝试添加一个新的线程。如果它失败了,我们知道我们被关闭或饱和了,因此拒绝该任务。

源码:

/* *@param 要提交给线程池的任务
*/
public void execute(Runnable command) {//如果传入的任务为空,抛出空指针异常。if (command == null)throw new NullPointerException();//获取当前线程池的状态和活动线程数。int c = ctl.get();//如果当前线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {//如果可以通过addWorker方法创建一个新的工作线程来执行任务//因为当前线程数小于核心线程数,所以第二个参数穿入true代表创建的是核心线程if (addWorker(command, true))return;//创建成功,直接返回。//创建失败,获取更新后的线程池状态c = ctl.get();}//如果线程池状态是运行中且工作队列能够接受新任务if (isRunning(c) && workQueue.offer(command)) {//任务进入工作队列//重新获取线程池的状态和工作线程数int recheck = ctl.get();//如果线程池不是运行状态,则删除任务if (! isRunning(recheck) && remove(command))//执行拒绝策略reject(command);// 如果工作线程数等于零,通过addWorker()方法检查线程池状态和工作队列else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果无法将任务添加到队列中,也无法创建新的工作线程,那么拒绝任务的执行else if (!addWorker(command, false))reject(command);}

3.addWorker()方法

在分析execute()方法中,发现execute()方法多次调用了addWorker()创建一个工作线程,用于执行当前线程任务。
addWorker()可以分为两个执行部分,检查线程池的状态和工作线程数量和创建并执行工作线程。
第1部分:检查线程池的状态和工作线程数量

//参数:1.传入的任务,2.是否创建核心线程
private boolean addWorker(Runnable firstTask, boolean core) {// 循环检查线程池的状态,直到符合创建工作线程的条件,通过retry标签break退出retry:for (;;) {//获取线程池运行状态int c = ctl.get();int rs = runStateOf(c);//如果线程池处于开始关闭的状态(获取线程任务为空,同时工作队列不等于空)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;//检查工作线程数量for (;;) {//获取当前工作线程数int wc = workerCountOf(c);//如果工作线程数量如果超出线程池的最大容量或者核心线程数(最大线程数)//三元运算符表示的是当前要的是核心线程还是非核心线程if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//不再创建新的线程//通过ctl对象,将当前工作线程数量+1,并通过retry标签break退出外层循环if (compareAndIncrementWorkerCount(c))break retry;//再次获取线程池状态,检查是否发生变化c = ctl.get();  if (runStateOf(c) != rs)continue retry;//...}}}

第二部分:创建并执行线程工程

		//....//用于判断工作线程是否启动和保存boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//创建新工作线程,并通过线程工厂创建Thread线程w = new Worker(firstTask);//获取新工作线程的Thread线程对象,用于启动真正的线程final Thread t = w.thread;   if (t != null) {//获取线程池的reentrantLock主锁,保证线程安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//检查线程池运行状态int rs = runStateOf(ctl.get());//如果线程池状态小于关闭状态或者线程池状态为关闭且没有初始任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//如果工作线程已经在运行(存活),抛出非法线程状态异常。if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//保存工作线程workers.add(w);//记录线程池的最大工作线程数int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//正式启动线程if (workerAdded) {t.start();workerStarted = true;}}} finally {//如果工作线程没有成功启动,则调用添加失败的方法if (! workerStarted)addWorkerFailed(w);}//返回线程启动状态return workerStarted;

总结

execute()方法是ThreadPoolExecutor线程池执行的开始,它完整实现了Executor接口定义execute()方法,这个方法作用是执行一个Runnable类型的线程任务。整体的执行流程是:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/112799.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++笔记之静态成员函数可以在类外部访问私有构造函数吗?

C笔记之静态成员函数可以在类外部访问私有构造函数吗&#xff1f; 参考笔记&#xff1a; 1.C笔记之静态成员函数可以在类外部访问私有构造函数吗&#xff1f; 2.C笔记之设计模式&#xff1a;setter函数、依赖注入 3.C笔记之两个类的实例之间传递参数——通过构造函数传递类对象…

Springboot集成Docker并将镜像推送linux服务器

案例使用springboot项目&#xff0c;在IDEA 中集成Docker生成镜像&#xff0c;并将镜像发布到linux服务器 具体步骤如下&#xff1a; 1、Centos7安装Docker 更新系统的软件包列表 sudo yum update安装Docker所需的软件包和依赖项&#xff1a; sudo yum install docker完成…

说说FLINK细粒度滑动窗口如何处理

分析&回答 Flink的窗口机制是其底层核心之一&#xff0c;也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类&#xff0c;下面的类图示出了Flink能够提供的所有窗口类型。 Flink窗口分为滚动&#xff08;tumbling&#xff09;、滑动&#xff08;sliding&am…

postgresql-子查询

postgresql-子查询 简介派生表IN 操作符ALL 操作符ANY 操作符关联子查询横向子查询EXISTS 操作符 简介 子查询&#xff08;Subquery&#xff09;是指嵌套在其他 SELECT、INSERT、UPDATE 以及 DELETE 语句中的 查询语句。 子查询的作用与多表连接查询有点类似&#xff0c;也是为…

NTP时钟同步服务器

目录 一、什么是NTP&#xff1f; 二、计算机时间分类 三、NTP如何工作&#xff1f; 四、NTP时钟同步方式&#xff08;linux&#xff09; 五、时间同步实现软件&#xff08;既是客户端软件也是服务端软件&#xff09; 六、chrony时钟同步软件介绍 七、/etc/chrony.conf配置文件介…

JVM内存管理、内存分区:堆、方法区、虚拟机栈、本地方法栈、程序计数器

内存管理 内存分区 线程共享 堆 存放实例&#xff0c;字符串常量&#xff08;直接引用&#xff09;&#xff0c;静态变量&#xff0c;线程分配缓冲区&#xff08;TLAB线程私有&#xff09;。垃圾收集器管理的区域 方法区 非堆&#xff0c;和堆相对的概念。存储已被虚拟机加载的…

uniapp的 picker 日期时间选择器

效果图&#xff1a; dateTimePicker.js function withData(param){return param < 10 ? 0 param : param; } function getLoopArray(start,end){var start start || 0;var end end || 1;var array [];for (var i start; i < end; i) {array.push(withData(i))…

【ACM出版】第四届人工智能与计算工程国际学术会议(ICAICE 2023)

ACM出版|第四届人工智能与计算工程国际学术会议 The 4th International Conference on Artificial Intelligence and Computer Engineering 为了在人工智能技术应用与计算工程领域进一步的探索&#xff0c;与国内外学界和业界相关人员交流新问题、新发现、新成果、新应用&…

__call__函数

一、定义 在Python中&#xff0c;__call__函数是一个特殊的方法&#xff0c;用于使一个对象可以像函数一样被调用。当一个对象定义了__call__方法时&#xff0c;它就成为了一个可调用对象。 二、使用 class Counter:def __init__(self):self.count 0def __call__(self):sel…

查局域网所有占用IP

查局域网所有占用IP 按&#xff1a;winr 出现下面界面&#xff0c;在文本框中输入 cmd 按确定即可出现cmd命令界面 在cmd命令窗口输入你想要ping的网段&#xff0c;下面192.168.20.%i即为你想要ping的网段&#xff0c;%i代表0-255 for /L %i IN (1,1,254) DO ping -w 1 -n 1…

优化物料编码规则,提升物料管理效率

导 读 ( 文/ 2358 ) 物料是生产过程的必需品。对物料进行身份的唯一标识&#xff0c;可以更好的管理物料库存、库位&#xff0c;更方便的对物料进行追溯。通过编码规则的设计&#xff0c;可以对物料按照不同的属性、类别或特征进行分类&#xff0c;从而更好地进行库存分析、计划…

业务需要咨询?开发遇到 bug 想反馈?开发者在线提单功能上线!

大家是否遇到过下列问题—— 在开发的时候&#xff0c;遇到 bug 需要反馈… 有合作意向的时候&#xff0c;想更多了解业务和相关产品… 在接入的时候&#xff0c;需要得到专业技术支持… 别急&#xff0c;荣耀开发者服务平台在线提单功能上线了~ 处理问题分类说明&#xff1…

ElasticSearch(一)数据类型

ElasticSearch&#xff08;一&#xff09;数据类型 1.简述 Es数据类型分为基础数据类型和复杂类型数据&#xff0c;掌握ES数据类型才能进一步使用ES检索数据内容。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot…

阿里云申请免费SSL证书的两种验证方式及配置服务器Tomcat升级HTTPS协议

通用教程&#xff0c;其他服务商的免费 SSL 证书也差不多是这个流程。&#xff08;至少腾讯云的操作步骤和本文是一致&#xff0c;嘻嘻&#xff01;&#xff09; 申请 SSL 证书 首先在阿里云上创建并申请 SSL 证书&#xff0c;之后选择 DNS 验证的方式&#xff0c;一种是手动配…

【Go 基础篇】Go语言结构体详解:打开自定义类型的大门

嗨&#xff0c;Go语言学习者们&#xff01;在编程的世界里&#xff0c;数据是核心&#xff0c;而结构体&#xff08;Struct&#xff09;是一种能够帮助我们更有组织地存储和操作数据的重要工具。在本篇博客中&#xff0c;我们将深入探讨Go语言中结构体的概念、定义、初始化、嵌…

PYTHON链家租房数据分析:岭回归、LASSO、随机森林、XGBOOST、KERAS神经网络、KMEANS聚类、地理可视化...

全文下载链接:http://tecdat.cn/?p29480 作者&#xff1a;Xingsheng Yang 1 利用 python 爬取链家网公开的租房数据&#xff1b; 2 对租房信息进行分析&#xff0c;主要对房租相关特征进行分析&#xff0c;并搭建模型用于预测房租&#xff08;点击文末“阅读原文”获取完整代码…

自动化运维:Ansible脚本之playbook剧本

目录 一、理论 1.playbooks 2.YAML 3.使用ansible批量安装apache服务 4.定义、引用变量 5.指定远程主机sudo切换用户 6.when条件判断 7.迭代 8.Templates 模块 9.tags 模块 10.Roles 模块 二、实验 1.使用ansible批量安装apache服务 2.定义、引用变量…

k8s之存储篇---数据卷Volume

数据卷概述 Kubernetes Volume&#xff08;数据卷&#xff09;主要解决了如下两方面问题&#xff1a; 数据持久性&#xff1a;通常情况下&#xff0c;容器运行起来之后&#xff0c;写入到其文件系统的文件暂时性的。当容器崩溃后&#xff0c;kubelet 将会重启该容器&#xff…

深度学习4. 循环神经网络 – Recurrent Neural Network | RNN

目录 循环神经网络 – Recurrent Neural Network | RNN 为什么需要 RNN &#xff1f;独特价值是什么&#xff1f; RNN 的基本原理 RNN 的优化算法 RNN 到 LSTM – 长短期记忆网络 从 LSTM 到 GRU RNN 的应用和使用场景 总结 百度百科维基百科 循环神经网络 – Recurre…

无涯教程-Android - List View函数

Android ListView 是垂直滚动列表中显示的视图&#xff0c;使用 Adapter 从列表(如数组或数据库)中获取内容的列表项会自动插入列表中。 适配器(Adapter)实际上是UI组件和将数据填充到UI组件中的数据源之间的桥梁&#xff0c;适配器保存数据并将数据发送到适配器视图&#xff0…