Java的Future机制详解

Java的Future机制详解

  • 一、为什么出现Future机制
  • 二、Future的相关类图
    • 2.1 Future 接口
    • 2.2 FutureTask 类
  • 三、FutureTask的使用方法
  • 四、FutureTask源码分析
    • 4.1 state字段
    • 4.2 其他变量
    • 4.4 构造函数
    • 4.5 run方法及其他

一、为什么出现Future机制

常见的两种创建线程的方式。一种是直接继承Thread,另外一种就是实现Runnable接口。

这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。

从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。(因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果)

在这里插入图片描述

上图简单描述了不使用Future和使用Future的区别,不使用Future模式,主线程在invoke完一些耗时逻辑之后需要等待,这个耗时逻辑在实际应用中可能是一次RPC调用,可能是一个本地IO操作等。B图表达的是使用Future模式之后,我们主线程在invoke之后可以立即返回,去做其他的事情,回头再来看看刚才提交的invoke有没有结果。

二、Future的相关类图

2.1 Future 接口

首先,我们需要清楚,Futrue是个接口。Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

在这里插入图片描述

接口定义行为,我们通过上图可以看到实现Future接口的子类会具有哪些行为:

  • 我们可以取消这个执行逻辑,如果这个逻辑已经正在执行,提供可选的参数来控制是否取消已经正在执行的逻辑。
  • 我们可以判断执行逻辑是否已经被取消。
  • 我们可以判断执行逻辑是否已经执行完成。
  • 我们可以获取执行逻辑的执行结果。
  • 我们可以允许在一定时间内去等待获取执行结果,如果超过这个时间,抛TimeoutException。

2.2 FutureTask 类

类图如下:

在这里插入图片描述

FutureTask是Future的具体实现。FutureTask实现了RunnableFuture接口。RunnableFuture接口又同时继承了Runnable 和 Future 接口。所以FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

三、FutureTask的使用方法

举个例子,假设我们要执行一个算法,算法需要两个输入 input1 和 input2, 但是input1和input2需要经过一个非常耗时的运算才能得出。由于算法必须要两个输入都存在,才能给出输出,所以我们必须等待两个输入的产生。接下来就模仿一下这个过程。

package src;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class FutureTaskTest {public static void main(String[] args) throws InterruptedException, ExecutionException {long starttime = System.currentTimeMillis();//input2生成, 需要耗费3秒FutureTask<Integer> input2_futuretask = new FutureTask<>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {Thread.sleep(3000);return 5;}});new Thread(input2_futuretask).start();//input1生成,需要耗费2秒FutureTask<Integer> input1_futuretask = new FutureTask<>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {Thread.sleep(2000);return 3;}});new Thread(input1_futuretask).start();Integer integer2 = input2_futuretask.get();Integer integer1 = input1_futuretask.get();System.out.println(algorithm(integer1, integer2));long endtime = System.currentTimeMillis();System.out.println("用时:" + String.valueOf(endtime - starttime));}//这是我们要执行的算法public static int algorithm(int input, int input2) {return input + input2;}
}

输出结果:

在这里插入图片描述

我们可以看到用时3001毫秒,与最费时的input2生成时间差不多。
注意,我们在程序中生成input1时,也让线程休眠了2秒,但是结果不是3+2。说明FutureTask是被异步执行了。

四、FutureTask源码分析

4.1 state字段

volatile修饰的state字段;表示FutureTask当前所处的状态。可能的转换过程见注释。

	/*** Possible state transitions:* NEW -> COMPLETING -> NORMAL(创建到正常运行结束的状态变化轨迹)* NEW -> COMPLETING -> EXCEPTIONAL(创建到异常运行结束的状态变化轨迹)* NEW -> CANCELLED  (创建到取消的状态变化轨迹)* NEW -> INTERRUPTING -> INTERRUPTED(创建到中断结束的状态变化轨迹)*/private volatile int state;// NEW 新建状态,表示这个 FutureTask还没有开始运行private static final int NEW          = 0;// COMPLETING 完成状态, 表示 FutureTask 任务已经计算完毕了// 但是还有一些后续操作,例如唤醒等待线程操作,还没有完成。private static final int COMPLETING   = 1;// FutureTask 任务完结,正常完成,没有发生异常private static final int NORMAL       = 2;// FutureTask 任务完结,因为发生异常。private static final int EXCEPTIONAL  = 3;// FutureTask 任务完结,因为取消任务private static final int CANCELLED    = 4;// FutureTask 任务完结,也是取消任务,不过发起了中断运行任务线程的中断请求private static final int INTERRUPTING = 5;// FutureTask 任务完结,也是取消任务,已经完成了中断运行任务线程的中断请求private static final int INTERRUPTED  = 6;

4.2 其他变量

    /** 任务 */private Callable<V> callable;/** 储存结果*/private Object outcome; // non-volatile, protected by state reads/writes/** 执行任务的线程*/private volatile Thread runner;/** get方法阻塞的线程队列 */private volatile WaitNode waiters;//FutureTask的内部类,get方法的等待队列static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}

4.3 CAS工具初始化

    // Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}}

这段代码是为了后面使用CAS而准备的。可以这么理解:

一个java对象可以看成是一段内存,各个字段都得按照一定的顺序放在这段内存里,同时考虑到对齐要求,可能这些字段不是连续放置的,用这个UNSAFE.objectFieldOffset()方法能准确地告诉你某个字段相对于对象的起始内存地址的字节偏移量,因为是相对偏移量,所以它其实跟某个具体对象又没什么太大关系,跟class的定义和虚拟机的内存模型的实现细节更相关。

4.4 构造函数

FutureTask有两个构造函数:

public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;       // ensure visibility of callable
}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable
}

这两个构造函数区别在于,如果使用第一个构造函数最后获取线程实行结果就是callable的执行的返回结果;而如果使用第二个构造函数那么最后获取线程实行结果就是参数中的result,接下来让我们看一下FutureTask的run方法。

同时两个构造函数都把当前状态设置为NEW。

4.5 run方法及其他

构造完FutureTask后,会把它当做线程的参数传进去,然后线程就会运行它的run方法。所以我们先来看一下run方法:

public void run() {//如果状态不是new,或者runner旧值不为null(已经启动过了),就结束if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable; // 这里的callable是从构造方法里面传人的if (c != null && state == NEW) {V result;boolean ran;try {result = c.call(); //执行任务,并将结果保存在result字段里。ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex); // 保存call方法抛出的异常}if (ran)set(result); // 保存call方法的执行结果}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

其中,catch语句中的setException(ex)如下:

//发生异常时设置state和outcome
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); finishCompletion();// 唤醒get()方法阻塞的线程}}

而正常完成时,set(result);方法如下:

//正常完成时,设置state和outcome
protected void set(V v) {
//正常完成时,NEW->COMPLETING->NORMALif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); // 唤醒get方法阻塞的线程}}

这两个set方法中,都是用到了finishCompletion()去唤醒get方法阻塞的线程。下面来看看这个方法:

//移除并唤醒所有等待的线程,调用done,并清空callable
private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t); //唤醒线程}//接下来的这几句代码是将当前节点剥离出队列,然后将q指向下一个等待节点。被剥离的节点由于thread和next都为null,所以会被GC回收。WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done(); //这个是空的方法,子类可以覆盖,实现回调的功能。callable = null;        // to reduce footprint}

好,到这里我们把运行以及设置结果的流程分析完了。那接下来看一下怎么获得执行结果把。也就是get方法。

get方法有两个,一个是有超时时间设置,另一个没有超时时间设置。

    public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {// get(timeout, unit) 也很简单, 主要还是在 awaitDone里面if(unit == null){throw new NullPointerException();}int s = state;// 判断state状态是否 <= Completing, 调用awaitDone进行自旋等待if(s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING){throw new TimeoutException();}// 根据state的值进行返回结果或抛出异常return report(s);}

两个get方法都用到了awaitDone()。这个方法的作用是: 等待任务执行完成、被中断或超时。看一下源码:

    //等待完成,可能是是中断、异常、正常完成,timed:true,考虑等待时长,false:不考虑等待时长private int awaitDone(boolean timed, long nanos) throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L; //如果设置了超时时间WaitNode q = null;boolean queued = false;for (;;) {/***  有优先级顺序*  1、如果线程已中断,则直接将当前节点q从waiters中移出*  2、如果state已经是最终状态了,则直接返回state*  3、如果state是中间状态(COMPLETING),意味很快将变更过成最终状态,让出cpu时间片即可*  4、如果发现尚未有节点,则创建节点*  5、如果当前节点尚未入队,则将当前节点放到waiters中的首节点,并替换旧的waiters*  6、线程被阻塞指定时间后再唤醒*  7、线程一直被阻塞直到被其他线程唤醒**/if (Thread.interrupted()) {// 1removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {// 2if (q != null)q.thread = null;return s; }else if (s == COMPLETING) // 3Thread.yield();else if (q == null) // 4q = new WaitNode();else if (!queued) // 5queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);else if (timed) {// 6nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q); //从waiters中移出节点qreturn state; }LockSupport.parkNanos(this, nanos); }else // 7LockSupport.park(this);}}

接下来看下removeWaiter()移除等待节点的源码:

    private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null; // 将移除的节点的thread=null, 为移除做标示retry:for (;;) {          // restart on removeWaiter racefor (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;//通过 thread 判断当前 q 是否是需要移除的 q节点,因为我们刚才标示过了if (q.thread != null) pred = q; //当不是我们要移除的节点,就往下走else if (pred != null) {//当p.thread==null时,到这里。下面这句话,相当于把q从队列移除。pred.next = s;//pred.thread == null 这种情况是在多线程进行并发 removeWaiter 时产生的//此时正好移除节点 node 和 pred, 所以loop跳到retry, 从新进行这个过程。想象一下,如果在并发的情况下,其他线程把pred的线程置为空了。那说明这个链表不应该包含pred了。所以我们需要跳到retry从新开始。if (pred.thread == null) // check for racecontinue retry;}//到这步说明p.thread==null 并且 pred==null。说明node是头结点。else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}}

最后在get方法中调用report(s),根据状态s的不同进行返回结果或抛出异常。

    private V report(int s) throws ExecutionException {Object x = outcome;  //之前我们set的时候,已经设置过这个值了。所以直接用。if (s == NORMAL)return (V)x;  //正常执行结束,返回结果if (s >= CANCELLED)throw new CancellationException(); //被取消或中断了,就抛异常。throw new ExecutionException((Throwable)x);}

以上就是FutureTask的源码分析。

最后总结一下:

FutureTask既可以当做Runnable也可以当做Future。线程通过执行FutureTask的run方法,将正常运行的结果放入FutureTask类的result变量中。使用get方法可以阻塞直到获得结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/310702.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开源模型应用落地-chatglm3-6b-gradio-入门篇(七)

一、前言 早前的文章&#xff0c;我们都是通过输入命令的方式来使用Chatglm3-6b模型。现在&#xff0c;我们可以通过使用gradio&#xff0c;通过一个界面与模型进行交互。这样做可以减少重复加载模型和修改代码的麻烦&#xff0c; 让我们更方便地体验模型的效果。 二、术语 2.…

《剑指 Offer》专项突破版 - 面试题 110 : 所有路径(C++ 实现)

题目链接&#xff1a;所有路径 题目&#xff1a; 一个有向无环图由 n 个节点&#xff08;标号从 0 到 n - 1&#xff0c;n > 2&#xff09;组成&#xff0c;请找出从节点 0 到节点 n - 1 的所有路径。图用一个数组 graph 表示&#xff0c;数组的 graph[i] 包含所有从节点 …

组件与组件之间的传递-事件总线

两个组件之间的数据传递&#xff08;属于非父子组件通讯&#xff09; 当项目中只是两个组件的少量数据传递时使用事件总线这种方法会比较方便&#xff0c;但当遇到大量数据传递时推荐使用vuex 思路 组件与组件之间不能直接传递&#xff0c;这是候可以创建一个EventBus.js文件…

ELK日志分析系统之Zookeeper

一、Zookeeper简介 ZooKeeper是一种为分布式应用所设计的高可用、高性能且一致的开源协调服务&#xff0c;它提供了一项基本服务&#xff1a;分布式锁服务。分布式应用可以基于它实现更高级的服务&#xff0c;实现诸如同步服务、配置维护和集群管理或者命名的服务。 Zookeepe…

力扣:49. 字母异位词分组

知识点&#xff1a; 散列函数 散列函数能使对一个数据序列的访问过程更加迅速有效&#xff0c;通过散列函数&#xff0c;数据元素将被更快地定位&#xff1a; 1. 直接寻址法&#xff1a;取关键字或关键字的某个线性函数值为散列地址。即H&#xff08;key&#xff09;key或H&a…

计算机网络 Cisco路由器基本配置

一、实验内容 1、按照下表配置好PC机IP地址和路由器端口IP地址 2、配置好路由器特权密文密码“abcd&#xff0b;两位班内序号”和远程登录密码“star” 3、验证测试 a.验证各个接口的IP地址是否正确配置和开启 b.PC1 和 PC2 互ping c.验证PC1通过远程登陆到路由器上&#…

C#医学实验室/检验信息管理系统(LIS系统)源码

目录 检验系统的总体目标 LIS主要包括以下功能&#xff1a; LIS是集&#xff1a;申请、采样、核收、计费、检验、审核、发布、质控、耗材控制等检验科工作为一体的信息管理系统。LIS系统不仅是自动接收检验数据&#xff0c;打印检验报告&#xff0c;系统保存检验信息的工具&a…

初级软件测试常见问题

1.JMeter &#xff08;1&#xff09;在http请求的时候&#xff0c;消息体数据中的数据需要用{}和“”标记起来&#xff0c;变量要用${}括起来。 &#xff08;2&#xff09;在响应断言的时候&#xff0c;要根据测试模式输出的内容来改变测试字段&#xff0c;假如输出错误可以把…

系统学c#:1、基础准备(软件下载与安装)

一、Vs软件下载与安装 访问Visual Studio官方网站&#xff1a; https://visualstudio.microsoft.com/zh-hans/downloads 下载Visual Studio 运行exe文件&#xff0c;点击“继续” 初始文件安装完成后选择我们需要安装的项&#xff0c;并勾选好必要的单个组件&#xff0c;设…

代码随想录阅读笔记-回溯【全排列】

题目 给定一个 没有重复 数字的序列&#xff0c;返回其所有可能的全排列。 示例 输入: [1,2,3]输出: [ [1,2,3], [1,3,2], [2,1,3], [2,3,1], [3,1,2], [3,2,1] ] 思路 以[1,2,3]为例&#xff0c;抽象成树形结构如下&#xff1a; 回溯三部曲 1、递归函数参数 首先排列是有…

Emacs之实现复制当前已打开文件buffer(一百三十五)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

Day55 动态规划 part15

Day55 动态规划 part15 392.判断子序列 我的思路&#xff1a; 自己还是只能想到双指针法 解答: class Solution {public boolean isSubsequence(String s, String t) {if(s.length() 0) {return true;}if(s.length() > t.length() || t.length() 0) {return false;}ch…

(九)C++自制植物大战僵尸游戏自定义对话框的实现

植物大战僵尸游戏开发教程专栏地址http://t.csdnimg.cn/m0EtD 对话框在游戏的交互中非常重要。在游戏中&#xff0c;对话框不仅可以提醒用户下达任务指令&#xff0c;而且还可以让用户进行操作&#xff0c;自定义游戏中的各种属性。对话框在游戏的交互中非常常见且大量使用。Co…

LigaAI x 极狐GitLab,共探 AI 时代研发提效新范式

近日&#xff0c;LigaAI 和极狐GitLab 宣布合作&#xff0c;双方将一起探索 AI 时代的研发效能新范式&#xff0c;提供 AI 赋能的一站式研发效能解决方案&#xff0c;让 AI 成为中国程序员和企业发展的新质生产力。 软件研发是一个涉及人员多、流程多、系统多的复杂工程&#…

[docker] 核心知识 - 概念和运行

[docker] 核心知识 - 概念和运行 之前 docker 学了个开头就去搞项目去了&#xff0c;不过项目也开展了好久了&#xff0c;前端差不多吃透了&#xff0c;有些新功能需要用 docker 和 k8s……是时候重新学习一下了。 这一部分简单的过一下概念和讲一下怎么运行 docker 镜像和启…

wps使用Latex编辑公式没有Latex formula

wps使用Latex编辑公式没有Latex formula 1. 下载CTEX2. 下载LaTeXEE3. 配置Miktex4. 配置latexee5. 用管理员权限运行latexeqedit.exe6. wps插入latex公式 1. 下载CTEX 下载CTEX网址&#xff0c;我下载的下图这个&#xff0c;下载完了之后运行exe文件安装ctex。 2. 下载LaTe…

深入理解k8s kube-proxy

1、概述 我觉得只要大家知道kube-proxy是用来配置网络规则的而不是转发流量的&#xff0c;真正的流量由iptables/ipvs来转发就可以了。 网络是k8s的一个关键部分。理解k8s中网络组件如何工作可以帮助更好的设计和配置我们的应用。 kube-proxy就是K8s网络的核心组件。它把我们…

janus部署

配置和运行janus 1. 配置nginx 安装nginx&#xff0c;主要用来提供web访问。 生成证书 mkdir -p ~/cert cd ~/cert # CA私钥 openssl genrsa -out key.pem 2048 # 自签名证书 openssl req -new -x509 -key key.pem -out cert.pem -days 1095安装nginx #下载nginx 1.15.8版…

OOCT WPF_D3D项目报错无法加载依赖项

运行示例项目报错缺少dll&#xff0c;发现运用了这个大老李&#xff0c;通过添加PATH路径也无法解决&#xff0c;看到debug文件夹下面没有其他的依赖项。 通过depneds工具可以看到 OCCTProxy_D3D.dll 缺少依赖项&#xff0c;图中的缺项都是OCCT生成的模块dll所以讲这些dll从..…

百度 千帆sdk 试用

主要是Java SDK的使用&#xff1a; <dependency> <groupId>com.baidubce</groupId> <artifactId>qianfan</artifactId> <version>0.0.4</version> </dependency> 参考文档&#xff1a;bce-qianfan-sdk/java at main baidub…