并发编程之FutureTask.get()阻塞陷阱:深度解析线程池CPU飚高问题排查与解决方案

FutureTask.get方法阻塞陷阱:深度解析线程池CPU飚高问题排查与解决方法

  • FutureTask.get()方法阻塞陷阱:深度解析线程池CPU飚高问题排查与解决方法
    • 1、情景复现
      • 1.1 线程池工作原理
      • 1.2 业务场景模拟
      • 1.3 运行结果
      • 1.4 发现问题:线程池没有被关闭
      • 1.5 引发思考
    • 2、结合源码剖析 get 方法阻塞原因
      • 2.1 submit()方法提交任务
      • 2.2 FutureTask
        • 局部变量
        • 构造方法
        • FutureTask的run方法
        • FutureTask的get方法
        • FutureTask的get(timeout)方法
          • FutureTask的 awaitDone方法(核心)
        • FutureTask的report 方法
      • 2.3 解决方案

FutureTask.get()方法阻塞陷阱:深度解析线程池CPU飚高问题排查与解决方法

FutureTask的get()方法在多线程并发编程中应用场景还是蛮多的,作用是通过get方法阻塞直到获取到结果为止,而FutureTask一般是结合线程池来运行任务的,目的是由线程池统一管理和复用线程的资源。但是如果使用不当则会引发CPU飙升的问题? 接下来我们结合源码底层来剖析下到底会不会引发CPU飙升呢?

1、情景复现

1.1 线程池工作原理

在这里插入图片描述

1.2 业务场景模拟

结合上图线程池工作原理进行模拟场景:最大线程数为1,核心线程数为1,队列大小为1,也就是说当前线程池最多可以处理两个任务,如果大于两个任务,那么就会执行拒绝策略(注意此处是自定义拒绝策略,这里设置为打印日志,为FutureTask的get阻塞陷阱埋下伏笔)。

  • 自定义线程池配置
 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, // 核心线程数为11, // 最大线程数为12, // 非核心线程不工作时,存活的时间 2sTimeUnit.SECONDS,// 非核心线程不工作时,存活时间对应的时间单位new ArrayBlockingQueue<>(1), // 阻塞队列 容量大小为1new RejectedExecutionHandler() { // 自定义拒绝策略@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("====任务丢失啦啦啦===="); // 打印一行日志// throw new RejectedExecutionException("任务丢失啦啦啦"); // 或抛出异常提示}});
  • 提交任务执行
try {// 模拟任务执行List<Future<Integer>> futureList = Stream.of(2, 4, 6).map(num -> {System.out.println(Thread.currentThread().getName() + "<>>>>>> , 添加数字num(Begin):" + num);Future<Integer> future = poolExecutor.submit(() -> {System.out.println(Thread.currentThread().getName() + ":=====任务开始执行=====Start!");try {// 模拟任务执行逻辑TimeUnit.SECONDS.sleep(num);} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + ">>任务被中断了,中断原因:" + e.getMessage());}System.out.println(Thread.currentThread().getName() + "=====任务执行完毕=====End!");return num;});System.out.println(Thread.currentThread().getName() + "<>>>>>> , 添加数字num(End):" + num);return future;}).collect(Collectors.toList());// 获取任务执行结果for (Future<Integer> future : futureList) {try {System.out.println(">>:" + future.get());} catch (InterruptedException | ExecutionException e) {System.out.println("=====获取任务执行结果失败=====,原因:" + e.getMessage());}}} catch (Exception e) {e.printStackTrace();} finally {poolExecutor.shutdown();}

1.3 运行结果

根据线程池的配置,最多处理的任务数量=最大线程数+阻塞队列 = 2,所以任务2和任务4会被处理,而任务6会根据拒绝策略输出一行日志。

在这里插入图片描述

1.4 发现问题:线程池没有被关闭

根据 1.3 输出的日志和运行结果截图分析可得:都是按照预想结果执行的,但问题是:为什么主线程任务为什么没有停止运行呢?因为业务逻辑使用了try…finally包裹,其中finally会关闭线程池的,按照正常执行逻辑是一定会关闭线程池的(因为我们代码中没有任何地方使用System.exit() 强制终止JVM)

在这里插入图片描述

结合以上运行截图可以发现,是由于拒绝策略中仅仅是打印了一行日志,导致FutureTask一直以为任务6还存活着,所以在调用futureTask的get方法时一直处于阻塞中,这是导致线程池没有关闭的直接原因。

1.5 引发思考

试想下,如果是在多线程环境下出现这种情况,那么线程池的CPU岂不是会持续飚高运行,从而直接影响服务器的处理性能(此时让我想到工作中有个万能公式:没有什么问题是重启解决不了的呢)。

既然我们已经清楚是因为 futureTask的get方法导致线程阻塞,下面我们继续结合源码来进行验证为什么会被阻塞?

2、结合源码剖析 get 方法阻塞原因

2.1 submit()方法提交任务

public <T> Future<T> submit(Callable<T> task) {// 若任务为空时,抛出空指针异常if (task == null) throw new NullPointerException();// 创建一个FutureTask对象RunnableFuture<T> ftask = newTaskFor(task);// 将该任务添加线程池中execute(ftask);// 返回FutureTask对象return ftask;
}
  • submit执行原理图
    在这里插入图片描述

2.2 FutureTask

在这里插入图片描述

局部变量
/*** Possible state transitions(可能的状态转换):NEW -> COMPLETING -> NORMAL(业务逻辑执行正常时)NEW -> COMPLETING -> EXCEPTIONAL(业务逻辑执行异常时)NEW -> CANCELLEDNEW -> INTERRUPTING -> INTERRUPTED*/
private volatile int state; // 被volatile关键字修饰,确保线程可见
private static final int NEW          = 0; // 首次submit方法提交任务时,初始化值为NEW
private static final int COMPLETING   = 1; // 
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
构造方法
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;       // ensure visibility of callable
}
FutureTask的run方法

如果业务逻辑call执行分为两种:1、执行异常(NEW -> COMPLETING -> EXCEPTIONAL);2、正常执行(NEW -> COMPLETING -> NORMAL)

public void run() {// 若state不等于NEW 或 CAS 将期望值null设置为当前线程失败时,直接returnif (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {// 当前state等于NEW 或 CAS占用为当前线程成功Callable<V> c = callable;if (c != null && state == NEW) {// c代表当前Task,不等于空且state等于NEWV result;boolean ran;try {// 执行Task的业务逻辑result = c.call();// task执行成功,则将ran变量设置为trueran = true;} catch (Throwable ex) {// 如果Task执行异常,则将结果result置为空,ran变量设置为falseresult = null;ran = false;// 状态变更: NEW -> COMPLETING -> EXCEPTIONALsetException(ex);}// ran变量为true时, 状态变更为:NEW -> COMPLETING -> NORMALif (ran)set(result);}} finally {runner = null;int s = state;// INTERRUPTING值等于5// 若state 大于或等于 5 ,此时state状态为 INTERRUPTING 或 INTERRUPTEDif (s >= INTERRUPTING)// 如果 state等于INTERRUPTING(5)时,调用 Thread.yield() 方法,让出CPU的使用权// 当前线程状态由 运行状态(Running) 转化为 就绪状态(Runnable)。handlePossibleCancellationInterrupt(s);}}
FutureTask的get方法
public V get() throws InterruptedException, ExecutionException {int s = state; // 获取当前任务的 state 变量// 如果 state 变量的值 小于或等于 COMPLETING(1) 则进入 awaitDone (翻译为:等待完成)if (s <= COMPLETING)s = awaitDone(false, 0L);// 此处表示s 大于COMPLETING(1)return report(s);
}
FutureTask的get(timeout)方法
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();// 超时会抛出异常TimeoutExceptionreturn report(s);
}
FutureTask的 awaitDone方法(核心)
/*** <p>等待完成</p>* 根据 timed 参数分为两种情况:*     1、如果timed为true时,调用LockSupport.parkNanos(this, nanos);// 表示仅等待nanos时间*     2、如果timed为false时,则调用LockSupport.park(this); // 表示一直等待*/
private int awaitDone(boolean timed, long nanos) throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;// 默认为false// 进入死循环for (;;) {// 当前线程被中断if (Thread.interrupted()) {// q不为空时移除waiterremoveWaiter(q);// 抛出中断异常InterruptedExceptionthrow new InterruptedException();}// 表示当前线程未被中断int s = state;// 如果 state 大于 COMPLETING(1) 时,// 则代表此时的state值为NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTEDif (s > COMPLETING) {// waitNode不为空时,将thread置为nullif (q != null)q.thread = null;// 返回当前的state值return s;}// 如果 state 值为COMPLETING时,则让出CPU的使用权else if (s == COMPLETING)Thread.yield();// 让出CPU的使用权else if (q == null)q = new WaitNode(); // 如果q等于空时,创建一个WaitNode节点else if (!queued)// 通过CAS(Compare-And-Swap)操作将当前线程的等待节点q插入到waiters链表中queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {// timed 为 true 时,调用LockSupport.parkNanos(this, nanos);nanos = deadline - System.nanoTime();if (nanos <= 0L) {// 如果小于或等于0,则表示等待时间到了removeWaiter(q);// 此时返回 statereturn state;}LockSupport.parkNanos(this, nanos);}else// 表示当前线程一直等待完成, 一直阻塞LockSupport.park(this);}
}
  • 引发阻塞的核心原因(LockSupport.park(this))

如果使用的get(timeout)方法,则使用 LockSupport.parkNanos(this, nanos); 会阻塞 nanos 时间后会释放锁;反之使用 get()方法,则使用LockSupport.park(this); 会一直阻塞

FutureTask的report 方法
private V report(int s) throws ExecutionException {Object x = outcome;// 如果 state 变量 等于 NORMAL,则返回结果值 Valueif (s == NORMAL)return (V)x;// state 大于或等于 CANCELLED,则state可能的值为:CANCELLED、INTERRUPTING、INTERRUPTEDif (s >= CANCELLED) throw new CancellationException();// 抛出异常// 抛出异常ExecutionExceptionthrow new ExecutionException((Throwable)x);
}

2.3 解决方案

在这里插入图片描述

重要事情讲三遍,注意、注意、注意:在使用get方法时首先需要结合线程池的拒绝策略,避免直接 使用get方法(导致线程一直阻塞中,进而引发服务器CPU飚高)。

综上所述是对FutureTask的get方法阻塞陷阱问题结合源码底层进行深度剖析,是我自己在工作中遇到的坑,如果你有用到这块知识,希望可以帮你避坑,当然如果有理解不到的地方望指正哟。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/44360.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

记录vite引入sass预编译报错error during build: [vite:css] [sass] Undefined variable.问题

vite.config.ts resolve: {alias: {: path.resolve(__dirname, src),},},css: {// css预处理器preprocessorOptions: {scss: {additionalData: use "/assets/styles/block.scss" as *;,}}},block.scss $colorGreen: #00ff00;index.vue :v-deep .font-size-14{colo…

代码小练习

public class Test3 {public static void main(String[] args) throws ParseException {ArrayList<Integer> listnew ArrayList<>();Scanner scnew Scanner(System.in);while (true){System.out.println("请输入一个整数");String s sc.nextLine();int…

百人会上的蔚小理与「来的刚刚好」的雷军

这就是2025百人会上的蔚小理&#xff0c;努力的李斌、宣扬飞行汽车的何小鹏与大讲开源的李想。那么小米汽车的模式是什么呢&#xff1f;站在蔚小理的肩上。 这就是2025百人会上的蔚小理&#xff0c;努力的李斌、宣扬飞行汽车的何小鹏与大讲开源的李想。那么小米汽车的模式是什么…

日程公布| 第八届地球空间大数据与云计算前沿大会与集中学习(3号通知)

日程公布| 第八届地球空间大数据与云计算前沿大会与集中学习&#xff08;3号通知&#xff09; 日程公布| 第八届地球空间大数据与云计算前沿大会与集中学习&#xff08;3号通知&#xff09;

<em>赚</em><em>钱</em><em>彩</em><em>票</em><em>软</em><em>件</em>

&#xff1c;em&#xff1e;赚&#xff1c;/em&#xff1e;&#xff1c;em&#xff1e;钱&#xff1c;/em&#xff1e;&#xff1c;em&#xff1e;彩&#xff1c;/em&#xff1e;&#xff1c;em&#xff1e;票&#xff1c;/em&#xff1e;&#xff1c;em&#xff1e;软&#xf…

随机2级域名引导页HTML源码

源码介绍 随机2级域名引导页HTML源码,每次点进去都随机一个域名前缀。 修改跳转域名在 350 行代码&#xff0c;源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行 效果预览 源码免费获取 随机2级域名引导页…

入栈操作-出栈操作

入栈操作 其 入栈操作 汇编代码流程解析如下&#xff1a; 出栈操作 其 出栈操作 汇编代码流程解析如下&#xff1a;

B3637 最长上升子序列

题目链接&#xff1a; 代码如下&#xff1a; #include<bits/stdc.h> #define int long long using namespace std; const int N 5050;int n; int arr[N]; int dp[N]; //dp数组signed main(){cin >> n;for(int i 1; i < n; i) cin >> arr[i];for(int i…

vscode通过root远程连接wsl

参考&#xff1a;vscode远程wsl时默认用root登录_vscode wsl root-CSDN博客

硬件基础--14_电功率

电功率 电功率:指电流在单位时间内做的功(表示用电器消耗电能快慢的一个物理量)。 单位:瓦特(W)&#xff0c;简称瓦。 公式:PUI(U为电压&#xff0c;单位为V&#xff0c;i为电流&#xff0c;单位为A&#xff0c;P为电功率&#xff0c;单位为W)。 单位换算:进位为1000&#xff…

【云服务器 | 下载 FFmpeg】云服务器上下载 ffmpeg + 配置

文章目录 FFmpeg 下载报错&#xff1a;已加载插件&#xff1a;fastestmirror1. 压缩包上传至服务器2. 解压3. 配置4. 添加FFmpeg到环境变量5. FFmpeg的配置5.1 安装 NASM5.2 安装x264 总结 可以看该博客&#xff0c;跟着这个步骤来的&#xff1a;https://blog.csdn.net/Aarstg/…

逆向--ARM64汇编

一、查看寄存器值 bl指令&#xff08;函数调用 bl的时候ret这个才有效&#xff09; 二、 bl 和lr 配合使用才达到函数调用的作用

【wow-rag系列】 task05 Ollama+llamaIndex+流式部署页面

文章目录 1.构建问答引擎2.构建基于FastAPI的后台3.构建流式输出的前端 1.构建问答引擎 新建一个engine.py文件 import os from llama_index.core.node_parser import SentenceSplitter# --------------------- # step 1.设定key、模型url、推理模型名称以及embedding模型名称 …

瑞芯微RKRGA(librga)Buffer API 分析

一、Buffer API 简介 在瑞芯微官方的 librga 库的手册中&#xff0c;有两组配置 buffer 的API&#xff1a; importbuffer 方式&#xff1a; importbuffer_virtualaddr importbuffer_physicaladdr importbuffer_fd wrapbuffer 方式&#xff1a; wrapbuffer_virtualaddr wrapb…

pycharm虚拟环境项目转移后配置解释器

添加解析器提示&#xff1a;无效的 Python SDK 解决方法 在到电脑安装python解析器&#xff0c;复制&#xff1a;python.exe和pythonw.exe 项目虚拟环境venv/Scripts Python解释器添加 项目现有虚拟环境&#xff0c;就可以正常使用

【智能体系统AgentOS】核心九:MCP工具

MCP&#xff08;Master Control Program&#xff09;是计算机控制系统中的核心部分&#xff0c;负责协调和管理整个系统的功能模块。不同的MCP可能会根据具体的应用场景有所不同&#xff0c;但通常有以下几类功能模块&#xff1a; 1. 输入输出&#xff08;I/O&#xff09;模块…

强化学习课程:stanford_cs234 学习笔记(2)introduction to RL

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言5、强化学习课程大纲5.1 课程内容主&#xff1a;5.2 马尔可夫决策过程&#xff1a;5.2.1 马尔可夫性 markov propterty5.2.2 马尔可夫过程 markov process5.2.3…

[Linux]在vim中批量注释与批量取消注释

1.在vim中批量注释的步骤&#xff1a; 1.在normal模式下按Ctrl v &#xff0c;进入V-BLOCK模式 2.按 J 键 或 K 键选择要注释的内容&#xff0c;J向上K向下 我们给第5&#xff0c;6&#xff0c;7行进行注释 3.按住shift i进入插入模式&#xff0c;输入 // 4.点击ESC键&…

16-CSS3新增选择器

知识目标 掌握属性选择器的使用掌握关系选择器的使用掌握结构化伪类选择器的使用掌握伪元素选择器的使用 如何减少文档内class属性和id属性的定义&#xff0c;使文档变得更加简洁&#xff1f; 可以通过属性选择器、关系选择器、结构化伪类选择器、伪元素选择器。 1. 属性选择…

Spring Initializr搭建spring boot项目

介绍 Spring Initializr 是一个用于快速生成 Spring Boot 项目结构的工具。它为开发者提供了一种便捷的方式&#xff0c;可以从预先定义的模板中创建一个新的 Spring Boot 应用程序&#xff0c;从而节省了从头开始设置项目的大量时间。 使用 Spring Initializr&#xff0c;你…