【ThreadPoolExecutor】源码阅读

一、生命周期

1)项目中是创建线程池的代码如下:

private ExecutorService accountDeleteExecutor = new ThreadPoolExecutor(// corePoolSize2, // maximumPoolSize Runtime.getRuntime().availableProcessors()*2, // keepAliveTime0L, // unitTimeUnit.MILLISECONDS,// workQueuenew LinkedBlockingQueue<Runnable>(),// threadFactorynew NamedThreadFactory("account_delete")
);

创建线程池源码分析:

/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.* @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set保持在线程池中的核心线程数量,也就是即使是空闲,也会保留下来不销毁。当然了,除非设置了:allowCoreThreadTimeout参数。* @param maximumPoolSize the maximum number of threads to allow in the*        pool允许保留在线程池中的最大线程的数量。理解:随着任务的增多线程是会不断添加,但是总得有个上限吧,这个就是啦!!!* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.如果线程池中的线程大于核心数量,超过核心数的空闲线程在销毁前存活的最大时间* @param unit the time unit for the {@code keepAliveTime} argument空闲时间的单位* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.在任务被执行之前,保留任务的队列,当然了,必须是通过execute方法提交的任务。* @param threadFactory the factory to use when the executor*        creates a new threadexecutor创建出一个新线程使用的线程工厂。* @param handler the handler to use when execution is blocked*        because the thread bounds and queue capacities are reached任务队列满的时候,如何处理接收的任务* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}理解:
可以看出来,传递完参数后,其实并没有创建任何线程,也就是创建线程是一个懒惰的过程,只有提交任务时,才会创建线程。

2)提交任务到任务队列 // java.util.concurrent.ThreadPoolExecutor#execute

使用:

 public void accountDeleteExecute(Runnable command){accountDeleteExecutor.execute(command);}

源码分析:

   /*** Executes the given task sometime in the future.  The task* may execute in a new thread or in an existing pooled thread.  在未来某一时刻执行给定的任务,任务可能在一个新线程执行也可能在一个池子中已经存在的线程执行。** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@link RejectedExecutionHandler}.提交的任务不会被执行的情况:1.executor被关闭2.容量达到上限,此时就会使用拒绝策略进行处理。     * @param command the task to execute* @throws RejectedExecutionException at discretion of*         {@code RejectedExecutionHandler}, if the task*         cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.如果运行着的线程数小于核心数,会尝试new一个新的线程,拿command作为第一个任务。addWorker方法会自动检查运行状态和任务的数量,在不该添加线程时返回false** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.如果一个任务成功入队,我们依然需要去双重检测一下是否我们应该添加一个新的线程,因为有可能最后一次检查,可能线程死了,或者当我们进入方法执行时,线程池被关闭,所以我们重新检查是否需要出队如果线程池被关闭了,或者开启一个新的线程。** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.如果我们不能入队,则我们尝试添加一个新的线程,失败的话,我们就知道现在线程池被关闭了或者饱和了,所以我们拒绝这个任务。*/// 这个值由: wrkerCount和runState组成int c = ctl.get();// 看下worker的数量是否小于核心线程数量if (workerCountOf(c) < corePoolSize) {// 尝试添加一个worker,添加成功就直接返回return了if (addWorker(command, true))return;c = ctl.get();}// 走到这里说明没添加新的线程,那么就把任务扔到任务队列中// 这里使用低x位去取得线程池运行状态if (isRunning(c) && workQueue.offer(command)) {// 添加成功后,重新检查下状态int recheck = ctl.get();// 状态不成功,则移除掉这个任务,并走拒绝策略if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)// 添加一个非核心线程addWorker(null, false);}else if (!addWorker(command, false))  // 尝试添加一个非核心线程// 线程没添加成功,则走拒绝策略reject(command);}

3)任务队列中取任务执行

源码分析(新建Worker,并处理第一个任务)

 /*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked.  If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// new一个Worker,其实这就是一个Runnable,作为Worker的第一个任务w = new Worker(firstTask);// 这个是Worker里面使用线程工厂new的线程,但是还没启动final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();// 新的Worker记录下workers.add(w);workerAdded = true;int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;}} finally {mainLock.unlock();}if (workerAdded) { // Worker线程真正的启动t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

源码分析(Worker不断地取任务并执行)

/*** Main worker run loop.  Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don't need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters.  Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and then we* ensure that unless pool is stopping, this thread does not have* its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to afterExecute.* We separately handle RuntimeException, Error (both of which the* specs guarantee that we trap) and arbitrary Throwables.* Because we cannot rethrow Throwables within Runnable.run, we* wrap them within Errors on the way out (to the thread's* UncaughtExceptionHandler).  Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread's UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** @param w the worker*/这个方法是在Worker的run方法中执行,所以就是不断出去任务执行final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

4)关闭

使用:

    @Overridepublic void shutdown(){if(accountDeleteExecutor != null){accountDeleteExecutor.shutdown();}}

线程池关闭源码分析

    /*** Initiates an orderly shutdown in which previously submitted* tasks are executed, but no new tasks will be accepted.* Invocation has no additional effect if already shut down.** <p>This method does not wait for previously submitted tasks to* complete execution.  Use {@link #awaitTermination awaitTermination}* to do that.** @throws SecurityException {@inheritDoc}*/public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);// 调用线程的interruptinterruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}

二、拒绝策略 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/387876.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ElasticSearch学习篇15_《检索技术核心20讲》进阶篇之TopK检索

背景 学习极客实践课程《检索技术核心20讲》https://time.geekbang.org/column/article/215243&#xff0c;文档形式记录笔记。 相关问题&#xff1a; ES全文检索是如何进行相关性打分的&#xff1f;ES中计算相关性得分的时机?如何加速TopK检索&#xff1f;三种思路 精准To…

eclipse ui bug

eclipse ui bug界面缺陷&#xff0c;可能项目过多&#xff0c;特别maven项目过多&#xff0c;下载&#xff0c;自动编译&#xff0c;加载更新界面异常 所有窗口死活Restore不回去了 1&#xff09;尝试创建项目&#xff0c;还原界面&#xff0c;失败 2&#xff09;关闭所有窗口&…

Python写UI自动化--playwright(pytest.ini配置)

在 pytest.ini 文件中配置 playwright 的选项可以更好地控制测试执行的过程。 在终端输入pytest --help&#xff0c;可以找到playwright的配置参数 目录 1. --browser{chromium,firefox,webkit} 2. --headed 3. --browser-channelBROWSER_CHANNEL 4. --slowmoSLOWMO 5. …

Photos框架 - 自定义媒体选择器(UI列表)

​​​​​​​Photos框架 - 自定义媒体资源选择器&#xff08;数据部分&#xff09; Photos框架 - 自定义媒体选择器&#xff08;UI列表&#xff09;​​​​​​​ Photos框架 - 自定义媒体选择器&#xff08;UI预览&#xff09; Photos框架 - 自定义媒体选择器&#xff0…

规划决策算法(四)---Frenet坐标系

知乎&#xff1a;坐标系转换 1.Frenet 坐标系 什么是 Frenet 坐标系&#xff1a; 为什么使用 Frenet 坐标系&#xff1a; 通常情况&#xff0c;我们只会关注车辆当前距离左右车道线的距离&#xff0c;来判断是否偏离车道&#xff0c;是否需要打方向盘进行方向微调。而不是基于…

【YashanDB知识库】yasdb jdbc驱动集成BeetISQL中间件,业务(java)报autoAssignKey failure异常

问题现象 BeetISQL中间件版本&#xff1a;2.13.8.RELEASE 客户在调用BeetISQL提供的api向yashandb的表中执行batch insert并将返回sequence设置到传入的java bean时&#xff0c;报如下异常&#xff1a; 问题的风险及影响 影响业务流程正常执行&#xff0c;无法获得batch ins…

matlab仿真 数字信号载波传输(下)

&#xff08;内容源自详解MATLAB&#xff0f;SIMULINK 通信系统建模与仿真 刘学勇编著第七 章内容&#xff0c;有兴趣的读者请阅读原书&#xff09; clear all M8; msg[1 4 3 0 7 5 2 6]; ts0.01; T1; %t0:ts:T; t0:ts:T-ts; %x0:ts:length(msg); x0:ts:length(msg)-ts; f…

决策树基础

概述 决策树是一种树型结构&#xff0c;其中每个内部结点表示在一个属性上的测试&#xff0c;每个分支代表一 个测试输出&#xff0c;每个叶结点代表一种类别。决策树学习采用的是自顶向下的递归方法&#xff0c;其基本思想是以信息熵为度量构造一棵熵值下降最快的树&#xff…

一层5x1神经网络绘制训练100轮后权重变化的图像

要完成这个任务&#xff0c;我们可以使用Python中的PyTorch库来建立一个简单的神经网络&#xff0c;网络结构只有一个输入层和一个输出层&#xff0c;输入层有5个节点&#xff0c;输出层有1个节点。训练过程中&#xff0c;我们将记录权重的变化&#xff0c;并在训练100轮后绘制…

github简单地操作

1.调节字体大小 选择options 选择text 选择select 选择你需要的参数就可以了。 2.配置用户名和邮箱 桌面右键&#xff0c;选择git Bash Here git config --global user.name 用户名 git config --global user.email 邮箱名 3.用git实现代码管理的过程 下载别人的项目 git …

反爬虫限制:有哪些方法可以保护网络爬虫不被限制?

目前&#xff0c;爬虫已经成为互联网数据获取最主流的方式。但为了保证爬虫顺利采集数据&#xff0c;需要防范网站的反爬虫机制&#xff0c;降低IP被限制的风险&#xff0c;这样才能提高爬虫工作的效率。那么&#xff0c;如何防止网络爬虫被限制呢&#xff1f;下面介绍几种有效…

dpdk发送udp报文

dpdk接收到udp报文后&#xff0c;自己构造一个udp报文&#xff0c;将收到的报文中的源mac&#xff0c;目的mac&#xff0c;源ip&#xff0c;目的ip&#xff0c;源端口和目的端口交换下顺序填充到新的udp报文中&#xff0c;报文中的负载数据和收到的udp保持一致。 注&#xff1…

Yarn UI 时间问题,相差8小时

位置 $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-common-2.6.1.jar 查看 jar tf hadoop-yarn-common-2.6.1.jar |grep yarn.dt.plugins.js webapps/static/yarn.dt.plugins.js 解压 jar -xvf hadoop-yarn-common-2.6.1.jar webapps/static/yarn.dt.plugins.js inflated: we…

【文件解析漏洞】实战详解!

漏洞描述&#xff1a; 文件解析漏洞是由于中间件错误的将任意格式的文件解析成网页可执行文件&#xff0c;配合文件上传漏洞进行GetShell的漏洞! IIS解析漏洞&#xff1a; IIS6.X&#xff1a; 方式一:目录解析 在网站下建立文件夹的名字为.asp/.asa 的文件夹&#xff0c;其目…

传输层(port)UDP/TCP——解决怎么发,发多少,出错了怎么办

**传输层&#xff1a;**负责数据能够从发送端传输接收端. 传输层所封装的报头里一定有&#xff1a;源端口号和目的端口号的。 **端口号&#xff1a;**可以标识一台主机中的唯一一个进程&#xff08;运用程序&#xff09;&#xff0c;这样当数据传输到传输层的时候就可以通过端…

单向链表(常规和带哨兵)

1.定义 在计算机科学中&#xff0c;链表是数据元素的线性集合&#xff0c;每个元素都指向下一个元素&#xff0c;元素存储上并不连续 2.分类 链表中还有一种特殊的节点称为哨兵结点&#xff0c;也叫哑元结点、首元结点&#xff0c;它不存储数据&#xff0c;通常用作头尾&…

艾体宝干货 | 如何分析关键网络性能指标?持续接收样品试用申请!

网络性能是企业顺利运营的重要基础&#xff0c;而Allegro流量分析仪作为一款强大的网络性能分析工具&#xff0c;为企业提供了深入了解网络运行状况的途径。在本文中&#xff0c;我们将探讨如何利用Allegro 流量分析仪分析关键网络性能指标&#xff0c;以优化网络性能、提高安全…

视频监控国标GB28181平台EasyGBS如何更换默认的SQLite数据库?

视频流媒体安防监控国标GB28181平台EasyGBS视频能力丰富&#xff0c;部署灵活&#xff0c;既能作为业务平台使用&#xff0c;也能作为安防监控视频能力层被业务管理平台调用。国标GB28181视频EasyGBS平台可提供流媒体接入、处理、转发等服务&#xff0c;支持内网、公网的安防视…

Apache2之Ubuntu-XXE漏洞渗透

一、配置靶场 第一步&#xff1a;打开kali&#xff0c;作为攻击机&#xff0c;打开是黑屏不要蒙圈&#xff0c;是正常的 第二步&#xff1a;配置局域网主机 探测局域网内的所有主机-- 1、查看虚拟机的网络配置 2、查看到我的子网地址为192.168.189.0 第三步&#xff1a;使用…

文心智能体零代码开发实践,创建一个智能体:从理论到实践AI技术落地

文心智能体引领零代码智能体开发新风尚&#xff0c;诚邀您一同探索这前沿科技的魅力&#xff01;以下为实践创建一个叫”从理论到实践AI技术落地“智能体的步骤。 首先登录官网&#xff1a;文心智能体平台AgentBuilder | 想象即现实 登录后点击&#xff1a;创建智能体 输入“…