CompletableFuture
- 1、概述
- 1.1、Future接口
- 1.2、CompletionStage接口
- 2、CompletableFuture结构
- 2.1、字段和常量
- 2.2、内部类
- 3、CompletableFuture原理
- 3.1、设计思想
- 3.2、获取任务结果的实现
- 不带参数的Get方法实现
- 带超时参数的Get方法实现
- 立即返回结果的GetNow方法实现
- 3.3、开始任务的实现
- 4、CompletableFuture使用
- 4.1、创建
- runAsync()——没有返回值的异步任务
- supplyAsync()——没有返回值的异步任务
- 自定义线程池执行
- 4.2、串行化处理
- thenApply()
- thenAccept()
- ThenRun()
- thenCompose()
- 4.3、组合处理
- thenCombine()
- thenAcceptBoth()
- runAfterBoth
- applyToEither()
- acceptEither()
- runAfterEither()
- allOf()/anyOf()
- 4.4、异常与超时
- handle()
- whenComplete()
- exceptionally()
- completeOnTimeout()
- orTimeout()
- 4.5、获取结果
1、概述
在JAVA8开始引入了全新的CompletableFuture类,它是Future接口的一个实现类。也就是在Future接口的基础上,额外封装提供了一些执行方法,用来解决Future使用场景中的一些不足,对流水线处理能力提供了支持。
1.1、Future接口
CompletableFuture实现自JDK 5出现的Future
接口,该接口属于java.util.concurrent
包,这个包提供了用于并发编程的一些基础设施,其中就包括 Future 接口。Future接口的目的是表示异步计算的结果,它允许你提交一个任务给一个 Executor(执行器),并在稍后获取任务的结果。尽管 Future 提供了一种机制来检查任务是否完成、等待任务完成,并获取其结果,但它的设计也有一些局限性,比如无法取消任务、无法组合多个任务的结果等。
public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
Future接口为CompletableFuture提供了以下功能:
- 异步任务的提交:通过Future的接口,可以提交异步任务,并在稍后获取任务的结果,这是 Future 接口最基本的功能之一。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
- 检查任务完成状态: 使用 isDone 方法可以检查任务是否已经完成。
boolean isDone = future.isDone();
- 等待任务完成: 通过get方法,阻塞当前线程,直到异步任务完成并获取其结果。
System.out.println("main Thread");
//开启异步线程
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
//阻塞异步线程执行完成
String result = future.get();
- 取消任务: 通过 cancel 方法,你可以尝试取消异步任务的执行。这是 Future 接口的一项功能,但在实际使用中,由于限制和不确定性,这个方法并不总是能够成功取消任务。
boolean canceled = future.cancel(true);
1.2、CompletionStage接口
CompletableFuture同时也实现自CompletionStage
接口,CompletionStage 接口是 Java 8 中引入的,在CompletableFuture中用于表示一个步骤,这个步骤可能是由另外一个CompletionStage触发的,随当前步骤的完成,可以触发其他CompletionStage的执行。CompletableFuture 类实现了 CompletionStage 接口,因此继承了这些功能。
以下是 CompletionStage 为 CompletableFuture 提供的一些关键功能:
- 链式操作:CompletionStage 定义了一系列方法,如
thenApply
,thenAccept
,thenRun
,允许你在一个异步操作完成后,基于其结果进行进一步的操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Integer> lengthFuture = future.thenApply(String::length);
- 组合多个阶段:CompletionStage 提供了
thenCombine
,thenCompose
,thenAcceptBoth
等方法,用于组合多个阶段的结果,形成新的 CompletionStage。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
try {//Hello WorldSystem.out.println(combinedFuture.get());
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}
- 异常处理:CompletionStage 提供了一系列处理异常的方法,如
exceptionally
,handle
,用于在异步计算过程中处理异常情况。
CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {//抛出异常throw new RuntimeException("error");
});
CompletableFuture<Object> exceptionally = future.exceptionally(ex -> "Handled Exception: " + ex.getMessage());
try {//Handled Exception: java.lang.RuntimeException: errorSystem.out.println(exceptionally.get());
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}
- 顺序执行:
thenApply
,thenAccept
,thenRun
等方法可以用于在上一个阶段完成后执行下一个阶段,形成顺序执行的链式操作。
2、CompletableFuture结构
2.1、字段和常量
字段:
//存储异步计算的结果
volatile Object result; //存储观察者链
volatile Completion stack; //空结果
static final AltResult NIL = new AltResult(null);//默认线程池是否支持并行
private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);//默认线程池
private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();//Unsafe类及操作所需偏移量
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long STACK;
private static final long NEXT;
常量:
//这三个变量用于Completion类中tryFire方法的标志,表示不同的触发模式。
static final int SYNC = 0;
static final int ASYNC = 1;
static final int NESTED = -1;
- SYNC:表示同步触发(默认触发方式),即当前计算完成后直接执行后续的操作。适用于当前计算的结果已经准备好并且可以直接进行下一步操作的情况。
- AYSNC:表示异步触发,当前计算完成后将后续的操作提交到异步线程池中执行。即当前计算完成后将后续的操作提交到异步线程池中执行。适用于需要在不同线程上执行后续操作的情况。
- NESTED:嵌套触发,通常表示当前阶段的触发是由另一个阶段触发的,因此无需再次触发后续操作。在某些情况下,可能会避免重复触发。
2.2、内部类
CompletableFuture 类包含多个内部类,这些内部类用于为CompletableFuture提供不同的API而设计的,用于异步编程中的不同阶段和操作。
常用内部类列举:
- UniCompletion、BiCompletion:UniCompletion 和 BiCompletion 是用于表示异步操作链中的单一阶段和二元阶段的基础抽象类。它们提供了一些通用的方法和字段,用于处理阶段之间的关系,尤其是观察者链的构建和触发。
- UniApply、UniAccept、UniRun:UniApply、UniAccept 和 UniRun 是 UniCompletion 的具体子类,分别用于表示异步操作链中的 thenApply、thenAccept 和 thenRun 阶段。它们实现了具体的 tryFire 方法,用于触发阶段的执行。
- BiApply、BiAccept、BiRun::BiApply、BiAccept 和 BiRun 是 BiCompletion 的具体子类,分别用于表示异步操作链中的 thenCombine、thenAcceptBoth 和 runAfterBoth 阶段。它们同样实现了具体的 tryFire 方法。
- OrApply、OrAccept、OrRun:OrApply、OrAccept 和 OrRun 是 BiCompletion 的另一组具体子类,用于表示异步操作链中的 applyToEither、acceptEither 和 runAfterEither 阶段。同样,它们实现了具体的 tryFire 方法。
abstract CompletableFuture<?> tryFire(int mode);
- .触发方式( mode ):tryFire 方法接收一个 mode 参数,表示触发的方式。常见的触发方式包括同步触发(
SYNC
)、异步触发(ASYNC
)以及嵌套触发(NESTED
)。 - .触发下一个阶段:在 tryFire 方法中,通过 next 字段获取下一个阶段的引用,然后调用下一个阶段的 tryFire 方法,将当前阶段的计算结果传递给下一个阶段。
- 递归触发:tryFire 方法可能会递归调用下一个阶段的 tryFire 方法,以确保整个异步操作链中的阶段能够依次触发。这个递归调用保证了异步操作链的串联执行。
- 触发逻辑的条件判断:tryFire 方法中通常还包含一些条件判断,用于确定是否应该触发后续的操作。例如,可能会检查当前阶段的状态,如果满足触发条件,则继续触发。
总体而言,tryFire 方法是 CompletableFuture 异步操作链中触发后续阶段的核心方法。通过递归调用,它实现了异步操作链的顺序执行,确保了各个阶段按照期望的顺序执行,并将计算结果传递给下一个阶段。
3、CompletableFuture原理
3.1、设计思想
CompletableFuture 按照类似“观察者模式”的设计思想,原理分析可以从“观察者”和“被观察者”两个方面着手。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {//fn1函数return "hello world";
});
CompletableFuture<String> cf2 = cf1.thenApply(e -> {//fn2函数return e + "!!!";
});
try {//hello world!!!System.out.println(cf2.get());
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}
由于回调种类多,但结构差异不大,所以这里单以一元依赖中的thenApply为例,不再枚举全部回调类型。
被观察者:
- 每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。上面例子中步骤fn2就是作为观察者被封装在UniApply中。
- 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在上面的例子中对应步骤fn1的执行结果。
观察者:
CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,然后检查当前CF是否已处于完成状态(即result!=null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。
- 观察者中的dep属性:指向其对应的CompletableFuture,在上面的例子中dep指向CF2。
- 观察者中的src属性:指向其依赖的CompletableFuture,在上面的例子中src指向CF1。
- 观察者Completion中的fn属性:用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法(thenAccept、thenApply、exceptionally等)接收的函数类型也不同,即fn的类型有很多种,在上面的例子中fn指向fn2。
3.2、获取任务结果的实现
先来分析一下CompletableFuture的get方法的实现细节,CompletableFuture实现了Future的所有接口,包括两个get方法,一个是不带参数的get方法,一个是可以设置等待时间的get方法。
不带参数的Get方法实现
首先来看一下CompletableFuture中不带参数的get方法的具体实现:
public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r = result) == null ? waitingGet(true) : r);
}
result字段代表任务的执行结果,所以首先判断是否为null,为null则表示任务还没有执行结束,那么就会调用waitingGet方法来等待任务执行完成,如果result不为null,那么说明任务已经成功执行结束了,那么就调用reportGet来返回结果,下面先来看一下waitingGet方法的具体实现细节:
private Object waitingGet(boolean interruptible) {Signaller q = null;boolean queued = false;int spins = -1;Object r;while ((r = result) == null) {if (spins < 0)spins = (Runtime.getRuntime().availableProcessors() > 1) ?1 << 8 : 0; // Use brief spin-wait on multiprocessorselse if (spins > 0) {if (ThreadLocalRandom.nextSecondarySeed() >= 0)--spins;}else if (q == null)q = new Signaller(interruptible, 0L, 0L);else if (!queued)queued = tryPushStack(q);else if (interruptible && q.interruptControl < 0) {q.thread = null;cleanStack();return null;}else if (q.thread != null && result == null) {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interruptControl = -1;}}}if (q != null) {q.thread = null;if (q.interruptControl < 0) {if (interruptible)r = null; // report interruptionelseThread.currentThread().interrupt();}}postComplete();return r;
}
这个方法的实现时比较复杂的,方法中有几个地方需要特别注意,下面先来看一下spins是做什么的,根据注释,可以知道spins是用来在多核心环境下的自旋操作的,所谓自旋就是不断循环等待判断,从代码可以看出在多核心环境下,spins会被初始化为1 << 8,然后在自旋的过程中如果发现spins大于0,那么就通过一个关键方法ThreadLocalRandom.nextSecondarySeed()来进行spins的更新操作,如果ThreadLocalRandom.nextSecondarySeed()返回的结果大于0,那么spins就减1,否则不更新spins。ThreadLocalRandom.nextSecondarySeed()方法其实是一个类似于并发环境下的random,是线程安全的。
接下来还需要注意的一个点是Signaller,从Signaller的实现上可以发现,Signaller实现了ForkJoinPool.ManagedBlocker,下面是ForkJoinPool.ManagedBlocker的接口定义:
public static interface ManagedBlocker {boolean block() throws InterruptedException;boolean isReleasable();}
ForkJoinPool.ManagedBlocker的目的是为了保证ForkJoinPool的并行性,具体分析还需要更为深入的学习Fork/Join框架。
继续回到waitingGet方法中,在自旋过程中会调用ForkJoinPool.managedBlock(ForkJoinPool.ManagedBlocker)来进行阻塞工作,实际的效果就是让线程等任务执行完成,CompletableFuture中与Fork/Join的交叉部分内容不再本文的描述范围,日后再进行分析总结。总得看起来,waitingGet实现的功能就是等待任务执行完成,执行完成返回结果并做一些收尾工作。
现在来看reportGet方法的实现细节,在判断任务执行完成之后,get方法会调用reportGet方法来获取结果:
private static <T> T reportGet(Object r)throws InterruptedException, ExecutionException {if (r == null) // by convention below, null means interruptedthrow new InterruptedException();if (r instanceof AltResult) {Throwable x, cause;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw (CancellationException)x;if ((x instanceof CompletionException) &&(cause = x.getCause()) != null)x = cause;throw new ExecutionException(x);}@SuppressWarnings("unchecked") T t = (T) r;return t;
}
带超时参数的Get方法实现
public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {Object r;long nanos = unit.toNanos(timeout);return reportGet((r = result) == null ? timedGet(nanos) : r);
}
和不带参数的get方法一样,还是会判断任务是否已经执行完成了,如果完成了会调用reportGet方法来返回最终的执行结果(或者抛出异常),否则,会调用timedGet来进行超时等待,timedGet会等待一段时间,然后抛出超时异常(或者执行结束返回正常结果),下面是timedGet方法的具体细节:
private Object timedGet(long nanos) throws TimeoutException {if (Thread.interrupted())return null;if (nanos <= 0L)throw new TimeoutException();long d = System.nanoTime() + nanos;Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0boolean queued = false;Object r;// We intentionally don't spin here (as waitingGet does) because// the call to nanoTime() above acts much like a spin.while ((r = result) == null) {if (!queued)queued = tryPushStack(q);else if (q.interruptControl < 0 || q.nanos <= 0L) {q.thread = null;cleanStack();if (q.interruptControl < 0)return null;throw new TimeoutException();}else if (q.thread != null && result == null) {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interruptControl = -1;}}}if (q.interruptControl < 0)r = null;q.thread = null;postComplete();return r;
}
在timedGet中不再使用spins来进行自旋,因为现在可以确定需要等待多少时间了。timedGet的逻辑和waitingGet的逻辑类似,毕竟都是在等待任务的执行结果。
立即返回结果的GetNow方法实现
除了两个get方法之前,CompletableFuture还提供了一个方法getNow,代表需要立刻返回不进行阻塞等待,下面是getNow的实现细节:
public T getNow(T valueIfAbsent) {Object r;return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}
getNow很简单,判断result是否为null,如果不为null则直接返回,否则返回参数中传递的默认值。
3.3、开始任务的实现
分析完了get部分的内容,下面开始分析CompletableFuture最为重要的一个部分,就是如何开始一个任务的执行。下文中将分析supplyAsync的具体执行流程,supplyAsync有两个版本,一个是不带Executor的,还有一个是指定Executor的,下面首先分析一下不指定Executor的supplyAsync版本的具体实现流程:
supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier); // asyncPool为ForkJoinPool.commonPool()或者ThreadPerTaskExecutor(实现了Executor接口,里面的内容是{new Thread(r).start();})
}
asyncSupplyStage(Executor e, Supplier<U> f)
:
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {if (f == null)throw new NullPointerException();CompletableFuture<U> d = new CompletableFuture<U>(); // 构建一个新的CompletableFuture, 以此构建AsyncSupply作为Executor的执行参数e.execute(new AsyncSupply<U>(d, f)); // AsyncSupply继承了ForkJoinTask, 实现了Runnable, AsynchronousCompletionTask接口return d; // 返回d,立返
}
AsyncSupply
// CompletableFuture的静态内部类,作为一个ForkJoinTaskstatic final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep; // AsyncSupply作为一个依赖Task,dep作为这个Task的FutureSupplier<T> fn; // fn作为这个Task的具体执行逻辑,函数式编程AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {this.dep = dep;this.fn = fn;}public final Void getRawResult() {return null;}public final void setRawResult(Void v) {}public final boolean exec() {run();return true;}public void run() {CompletableFuture<T> d;Supplier<T> f;if ((d = dep) != null && (f = fn) != null) { // 非空判断dep = null;fn = null;if (d.result == null) { // 查看任务是否结束,如果已经结束(result != null),直接调用postComplete()方法try {d.completeValue(f.get()); // 等待任务结束,并设置结果} catch (Throwable ex) {d.completeThrowable(ex); // 异常}}d.postComplete(); // 任务结束后,会执行所有依赖此任务的其他任务,这些任务以一个无锁并发栈的形式存在}}}
final void postComplete() {CompletableFuture<?> f = this; // 当前CompletableFutureCompletion h; // 无锁并发栈,(Completion next), 保存的是依靠当前的CompletableFuture一串任务,完成即触发(回调)while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { // 当f的stack为空时,使f重新指向当前的CompletableFuture,继续后面的结点CompletableFuture<?> d;Completion t;if (f.casStack(h, t = h.next)) { // 从头遍历stack,并更新头元素if (t != null) {if (f != this) { // 如果f不是当前CompletableFuture,则将它的头结点压入到当前CompletableFuture的stack中,使树形结构变成链表结构,避免递归层次过深pushStack(h);continue; // 继续下一个结点,批量压入到当前栈中}h.next = null; // 如果是当前CompletableFuture, 解除头节点与栈的联系}f = (d = h.tryFire(NESTED)) == null ? this : d; // 调用头节点的tryFire()方法,该方法可看作Completion的钩子方法,执行完逻辑后,会向后传播的}}}
每个CompletableFuture持有一个Completion栈stack, 每个Completion持有一个CompletableFuture -> dep, 如此递归循环下去,是层次很深的树形结构,所以想办法将其变成链表结构。
首先取出头结点,下图中灰色Completion结点,它会返回一个CompletableFuture, 同样也拥有一个stack,策略是遍历这个CompletableFuture的stack的每个结点,依次压入到当前CompletableFuture的stack中,关系如下箭头所示,灰色结点指的是处理过的结点。
第一个Completion结点返回的CompletableFuture, 将拥有的stack里面的所有结点都压入了当前CompletableFuture的stack里面.
后续的Completion结点返回的CompletableFuture, 将拥有的stack里面的所有结点都压入了当前CompletableFuture的stack里面,重新构成了一个链表结构,后续也按照前面的逻辑操作,如此反复,便会遍历完所有的CompletableFuture, 这些CompletableFuture(叶子结点)的stack为空,也是结束条件。
abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {volatile Completion next; // 无锁并发栈/*** 钩子方法,有三种模式,postComplete()方法里面使用的是NESTED模式,避免过深的递归调用 SYNC, ASYNC, or NESTED*/abstract CompletableFuture<?> tryFire(int mode); // run()和exec()都调用了这个钩子方法/** cleanStack()方法里有用到 */abstract boolean isLive();public final void run() {tryFire(ASYNC);}public final boolean exec() {tryFire(ASYNC);return true;}public final Void getRawResult() {return null;}public final void setRawResult(Void v) {}}
继承了ForkJoinTask, 实现了Runnable, AsynchronousCompletionTask接口,它有诸多子类上面已经介绍过。
先看一个子类UniCompletion:
abstract static class UniCompletion<T,V> extends Completion {Executor executor; // 执行器CompletableFuture<V> dep; // 依赖的任务CompletableFuture<T> src; // 被依赖的任务UniCompletion(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src) {this.executor = executor; this.dep = dep; this.src = src;}final boolean claim() { // 如果当前任务可以被执行,返回true,否则,返回false; 保证任务只被执行一次Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {if (e == null)return true;executor = null; // 设置为不可用e.execute(this);}return false;}final boolean isLive() { return dep != null; }}
claim()方法保证任务只被执行一次。
whenComplete()/whenCompleteAsync()
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action);}public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(asyncPool, action);}
xxx和xxxAsync方法的区别是,有没有asyncPool作为入参,有的话,任务直接入参,不检查任务是否完成。uniWhenCompleteStage方法有说明。
uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f)
private CompletableFuture<T> uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f) {if (f == null)throw new NullPointerException();CompletableFuture<T> d = new CompletableFuture<T>(); // 构建futureif (e != null || !d.uniWhenComplete(this, f, null)) { // 如果线程池不为空,直接构建任务入栈,并调用tryFire()方法;否则,调用uniWhenComplete()方法,检查依赖的那个任务是否完成,没有完成返回false,// 完成了返回true, 以及后续一些操作。UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); // UniWhenComplete继承了UniCompletionpush(c);c.tryFire(SYNC); // 先调一下钩子方法,检查一下任务是否结束}return d;}
uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c)
final boolean uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c) {Object r;T t;Throwable x = null;if (a == null || (r = a.result) == null || f == null) // 被依赖的任务还未完成return false;if (result == null) { // 被依赖的任务完成了try {if (c != null && !c.claim()) // 判断任务是否能被执行return false;if (r instanceof AltResult) { // 判断异常,AltResult类型很简单,里面只有一个属性Throwable ex; x = ((AltResult) r).ex;t = null;} else {@SuppressWarnings("unchecked")T tr = (T) r; // 正常的结果t = tr;}f.accept(t, x); // 执行任务if (x == null) {internalComplete(r); // 任务的结果设置为被依赖任务的结果return true;}} catch (Throwable ex) {if (x == null)x = ex; // 记录异常}completeThrowable(x, r); // 设置异常和结果}return true;}
push()
final void push(UniCompletion<?, ?> c) {if (c != null) {while (result == null && !tryPushStack(c))lazySetNext(c, null); // 失败重置c的next域}}final boolean tryPushStack(Completion c) {Completion h = stack;lazySetNext(c, h);return UNSAFE.compareAndSwapObject(this, STACK, h, c);}static void lazySetNext(Completion c, Completion next) {UNSAFE.putOrderedObject(c, NEXT, next);}
再回顾下supplyAsync方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));return d;}
可以看到supplyAsync会调用asyncSupplyStage,并且指定一个默认的asyncPool来执行任务,CompletableFuture是管理执行任务的线程池的,这一点是和FutureTask的区别,FutureTask只是一个可以被执行的task,而CompletableFuture本身就管理者线程池,可以由CompletableFuture本身来管理任务的执行。
private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);/*** Default executor -- ForkJoinPool.commonPool() unless it cannot* support parallelism.*/private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
首先会做一个判断,如果条件满足就使用ForkJoinPool的commonPool作为默认的Executor,否则会使用一个ThreadPerTaskExecutor来作为CompletableFuture来做默认的Executor。
接着看asyncSupplyStage,我们提交的任务会被包装成一个AsyncSupply对象,然后交给CompletableFuture发现的Executor来执行,那AsyncSupply是什么呢?
static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep; Supplier<T> fn;AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {this.dep = dep; this.fn = fn;}public final Void getRawResult() { return null; }public final void setRawResult(Void v) {}public final boolean exec() { run(); return true; }public void run() {CompletableFuture<T> d; Supplier<T> f;if ((d = dep) != null && (f = fn) != null) {dep = null; fn = null;if (d.result == null) {try {d.completeValue(f.get());} catch (Throwable ex) {d.completeThrowable(ex);}}d.postComplete();}}}
观察到AsyncSupply实现了Runnable,而Executor会执行Runnable的run方法来获得结构,所以主要看AsyncSupply的run方法的具体细节,可以看到,run方法中会试图去获取任务的结果,如果不抛出异常,那么会调用CompletableFuture的completeValue方法,否则会调用CompletableFuture的completeThrowable方法,最后会调用CompletableFuture的postComplete方法来做一些收尾工作,主要来看前两个方法的细节,首先是completeValue方法:
/** Completes with a non-exceptional result, unless already completed. */final boolean completeValue(T t) {return UNSAFE.compareAndSwapObject(this, RESULT, null,(t == null) ? NIL : t);}
completeValue方法会调用UNSAFE.compareAndSwapObject来讲任务的结果设置到CompletableFuture的result字段中去。如果在执行任务的时候抛出异常,会调用completeThrowable方法,下面是completeThrowable方法的细节:
/** Completes with an exceptional result, unless already completed. */final boolean completeThrowable(Throwable x) {return UNSAFE.compareAndSwapObject(this, RESULT, null,encodeThrowable(x));}
指定Executor的supplyAsync方法和没有指定Executor参数的supplyAsync方法的唯一区别就是执行任务的Executor.这里不多讲。
到这里,可以知道Executor实际执行的代码到底是什么了,回到asyncSupplyStage方法,接着就会执行Executor.execute方法来执行任务,需要注意的是,asyncSupplyStage方法返回的是一个CompletableFuture,并且立刻返回的,具体的任务处理逻辑是有Executor来执行的,当任务处理完成的时候,Executor中负责处理的线程会将任务的执行结果设置到CompletableFuture的result字段中去。
4、CompletableFuture使用
4.1、创建
创建CompletableFuture对象,提供了四个静态方法用来创建CompletableFuture对象:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Asynsc 表示异步,而 supplyAsync 与 runAsync 不同在与前者异步返回一个结果,后者是 void。第二个函数第二个参数表示是用我们自己创建的线程池,否则采用默认的 ForkJoinPool.commonPool() 作为它的线程池.其中Supplier是一个函数式接口,代表是一个生成者的意思,传入0个参数,返回一个结果。
CompletableFuture.runAsync(() -> {System.out.println("hello");
});
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {return "hello world";
});
try {System.out.println(future.get());
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}
//hello
//hello world
runAsync()——没有返回值的异步任务
如果你想异步的运行一个后台任务并且不需要任务返回结果,就可以使用 runAsync()。
runAsync() 返回一个新的 CompletableFuture,它在运行给定操作后由在 ForkJoinPool.commonPool() 运行的任务异步完成。
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());//模拟任务try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}});//主线程阻塞future.get();System.out.println("主线程结束");
}
//ForkJoinPool.commonPool-worker-9
//主线程结束
supplyAsync()——没有返回值的异步任务
当运行一个异步任务并且需要有返回结果时,就可以使用 supplyAsync()。
CompletableFuture.supplyAsync() 它持有 supplier 并且返回 CompletableFuture,T 是通过调用传入的 supplier 取得的值的类型。
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "Success";});System.out.println(future.get());
}
//Success
自定义线程池执行
CompletableFuture 可以从全局的 ForkJoinPool.commonPool() 获得一个线程中执行这些任务。但也可以创建一个线程池并传给 runAsync() 和 supplyAsync() 来让他们从线程池中获取一个线程执行它们的任务。
CompletableFuture API 的所有方法都有两个变体:
- 一个接受Executor作为参数
- 另一个不接受
我们只需要传入一个自定义线程池即可:
public static void main(String[] args) throws ExecutionException, InterruptedException {Executor executor = Executors.newFixedThreadPool(10);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());return "Success";}, executor);System.out.println(future.get());
}
//pool-1-thread-1
//Success
4.2、串行化处理
在流水线处理场景中,往往都是一个任务环节处理完成后,下一个任务环节接着上一环节处理结果继续处理。CompletableFuture用于这种流水线环节驱动类的方法有很多,相互之间主要是在返回值或者给到下一环节的入参上有些许差异,使用时需要注意区分:
方法 | 参数 | 描述 |
---|---|---|
thenApply | T -> U | 对结果应用一个函数 |
thenAccept | T -> void | 类似于thenApply,不过结果为void |
thenCompose | T -> CompletableFuture<U> | 对结果调用函数并执行返回的future |
thenRun | Runnable | 执行Runnalbe,结果为void |
因为上述thenApply、thenCompose方法的输出仍然都是一个CompletableFuture对象,所以各个方法是可以一环接一环的进行调用,形成流水线式的处理逻辑:
thenApply()
当一个线程依赖另一个线程时,可以使用 thenApply() 来把这两个线程串行化,可以使用 thenApply() 处理和改变 CompletableFuture 的结果。
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {return "world";}).thenApply(o -> {return "hello " + o;});System.out.println(future.get());
}
//hello world
thenAccept()
如果你不想从你的回调函数中返回任何东西,仅仅想在 Future 完成后运行一些代码片段,你可以使用 thenAccept() 和 thenRun(),这些方法经常在调用链的最末端的最后一个回调函数中使用。
thenAccept:消费处理结果,接收任务的处理结果,并消费处理,无返回结果。
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "hello world";}).thenAccept(o -> {System.out.println(o.length());});System.out.println(future.get());
}
//11
//null
ThenRun()
thenRun() 不能访 Future 的结果,它持有一个 Runnable 返回 CompletableFuture:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "hello world";}).thenRun(() -> {System.out.println("作业完成后执行一些代码");});System.out.println(future.get());
}
//作业完成后执行一些代码
//null
thenCompose()
henApply和thenCompose都是对一个CompletableFuture返回的结果进行后续操作,返回一个新的CompletableFuture。
- thenApply:fn函数是一个对一个已完成的stage或者说CompletableFuture的的返回值进行计算、操作,也就是说转换的是泛型中的类型,相当于将CompletableFuture 转换生成新的CompletableFuture
- thenCompose:fn函数是对另一个CompletableFuture进行计算、操作,也就是说用来连接两个CompletableFuture,是生成一个新的CompletableFuture。
大白话说就是thenApply从 CompletableFuture 生成新的CompletableFuture<U>,只是将CompletableFuture的T类型转换为了U类型而已,CompletableFuture还是同一个CompletableFuture。而thenCompose是生成了一个新的CompletableFuture。
public static void main(String[] args) throws ExecutionException, InterruptedException {final CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {// 先执行supplyAsync方法,得到返回值return "hello";}).thenCompose(o -> {// thenCompose方法返回一个新的CompletableFuturereturn CompletableFuture.supplyAsync(() -> {if ("hello".equals(o)) {return 111;}else {return 000;}});});final Integer integer = completableFuture.get();System.out.println("结果:" + integer);
}
//结果:111
4.3、组合处理
前面一直在介绍流水线式的处理场景,但是很多时候,流水线处理场景也不会是一个链路顺序往下走的情况,很多时候为了提升并行效率,一些没有依赖的环节我们会让他们同时去执行,然后在某些环节需要依赖的时候,进行结果的依赖合并处理,类似如下图的效果。
方法 | 参数 | 描述 |
---|---|---|
thenCombine | CompletableFuture<U>, (T, U) -> V | 执行两个动作并用给定函数组合结果 |
thenAcceptBoth | CompletableFuture<U>, (T, U) -> void | 与thenCombine类似,不过结果为void |
runAfterBoth | CompletableFuture<?>, Runnable | 两都都完成后执行runable |
applyToEither | CompletableFuture<T>, T -> V | 得到其中一个的结果时,传入给定的函数 |
acceptEither | CompletableFuture<T>, T -> void | 与applyToEither类似,不过结果为void |
runAfterEither | CompletableFuture<?>, Runnable | 其中一个完成后执行runable |
static allOf | CompletableFuture<?>... | 所有给定的future都完成后完成,结果为void |
static anyOf | CompletableFuture<?>... | 任意给定的future完成后则完成,结果为void |
thenCombine()
使用 thenCombine() 组合两个独立的 future。当两个独立的 Future 都完成的时候使用 thenCombine() 用来做一些事情。
public static void main(String[] args) throws ExecutionException, InterruptedException {// 长方形:S=长*宽CompletableFuture<Integer> lengthFuture = CompletableFuture.supplyAsync(() -> {return 50;});CompletableFuture<Integer> widthFuture = CompletableFuture.supplyAsync(() -> {return 30;});CompletableFuture<String> combinedFuture = lengthFuture.thenCombine(widthFuture, (t1, t2) -> {System.out.println(t1);System.out.println(t2);return "长方形面积为:" + t1 * t2;});System.out.println(combinedFuture.get());
}
//50
//30
//长方形面积为:1500
thenAcceptBoth()
与thenCombine类似,不过结果为void。
public static void main(String[] args) throws ExecutionException, InterruptedException {// 长方形:S=长*宽CompletableFuture<Integer> lengthFuture = CompletableFuture.supplyAsync(() -> {return 50;});CompletableFuture<Integer> widthFuture = CompletableFuture.supplyAsync(() -> {return 30;});CompletableFuture<Void> combinedFuture = lengthFuture.thenAcceptBoth(widthFuture, (t1, t2) -> {System.out.println(t1);System.out.println(t2);System.out.println("长方形面积为:" + t1 * t2);});System.out.println(combinedFuture.get());
}
//50
//30
//长方形面积为:1500
//null
runAfterBoth
两者都完成后执行runable。
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {System.out.println("task1...");return "hello";});CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {System.out.println("task2...");return "world";});CompletableFuture<Void> future = f1.runAfterBoth(f2, () -> {System.out.println("两个任务都完成了");});System.out.println(future.get());
}
//task1...
//task2...
//两个任务都完成了
//null
applyToEither()
当任意一个CompletionStage完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture的计算结果。
下面这个例子有时会输出100,有时候会输出200,哪个Future先完成就会根据它的结果计算。
public static void main(String[] args) {Random rand = new Random();CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return 100;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return 200;});CompletableFuture<String> f = future.applyToEither(future2, i -> i.toString());System.out.println(f.join()); //有时候输出100 有时候输出200
}
acceptEither()
与applyToEither类似,不过结果为void。
public static void main(String[] args) {Random rand = new Random();CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return 100;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return 200;});CompletableFuture<Void> f = future.acceptEither(future2, i -> {System.out.println(i.toString());});System.out.println(f.join()); //null
}
runAfterEither()
其中一个完成后执行runable。
public static void main(String[] args) {Random rand = new Random();CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return 100;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return 200;});CompletableFuture<Void> f = future.runAfterEither(future2, () -> {System.out.println("两者完成了任意一个");});System.out.println(f.join());
}
//两者完成了任意一个
//null
allOf()/anyOf()
- allOf:所有给定的future都完成后完成,结果为void。
- anyOf:任意给定的future完成后则完成,结果为void
public static void main(String[] args) {Random rand = new Random();CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return 100;});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000 + rand.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}return "abc";});//CompletableFuture<Void> f = CompletableFuture.allOf(future1,future2);CompletableFuture<Object> f = CompletableFuture.anyOf(future1, future2);System.out.println(f.join());
}
4.4、异常与超时
方法 | 参数 | 描述 |
---|---|---|
handle | (T, Throwable) -> U | 处理结果或错误,生成一个新结果 |
whenComplete | (T, Throwable) -> void | 类似于handle,不过结果为void |
exceptionally | Throwable -> T | 从错误计算一个结果 |
completeOnTimeout | T, long, TimeUnit | 如果超时,生成给定值作为结果 |
orTimeout | long, TimeUnit | 如果超时,生成一个TimeoutException异常 |
handle()
从异常恢复,无论一个异常是否发生它都会被调用。与thenApply类似,区别点在于handle执行函数的入参有两个,一个是CompletableFuture执行的实际结果,一个是是Throwable对象,这样如果前面执行出现异常的时候,可以通过handle获取到异常并进行处理。
public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 0;return 10 / i;//异常}).handle((obj, e) -> {if (e != null) {System.out.println("thenApply executed, exception occurred...");System.out.println(e.getMessage());}return obj;});System.out.println(future.join());
}
//thenApply executed, exception occurred...
//java.lang.ArithmeticException: / by zero
//null
whenComplete()
与handle类似,区别点在于whenComplete执行后无返回值
public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 0;return 10 / i;//异常}).whenComplete((obj, e) -> {if (e != null) {System.out.println("thenApply executed, exception occurred...");System.out.println(e.getMessage());}});
}
exceptionally()
回调处理异常,从原始Future中生成的错误恢复的机会。
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 0;return 10 / i;//异常}).exceptionally(ex -> {System.out.println(ex.getMessage());return Integer.MIN_VALUE;});System.out.println(future.get());
}
//java.lang.ArithmeticException: / by zero
//-2147483648
completeOnTimeout()
如果超时,生成给定值作为结果。(jdk9)
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {//模拟任务try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}return "success";}).completeOnTimeout("default value", 2, TimeUnit.SECONDS);System.out.println(future.get());//default value
}
orTimeout()
如果超时,生成一个TimeoutException异常。(jdk9)
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {//模拟任务try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}return "success";}).orTimeout(3, TimeUnit.SECONDS);System.out.println(future.get());//ava.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
}
4.5、获取结果
在执行线程中将任务放到工作线程中进行处理的时候,执行线程与工作线程之间是异步执行的模式,如果执行线程需要获取到共工作线程的执行结果,则可以通过get或者join方法,阻塞等待并从CompletableFuture中获取对应的值。
- get():等待CompletableFuture执行完成并获取其具体执行结果,可能会抛出异常,需要代码调用的地方手动try…catch进行处理。
- get(long, TimeUnit):与get()相同,只是允许设定阻塞等待超时时间,如果等待超过设定时间,则会抛出异常终止阻塞等待。
- join():等待CompletableFuture执行完成并获取其具体执行结果,可能会抛出运行时异常,无需代码调用的地方手动try…catch进行处理。
从介绍上可以看出,两者的区别就在于是否需要调用方显式的进行try…catch处理逻辑,使用代码示例如下:
public void testGetAndJoin(String product) {// join无需显式try...catch...PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)).join();try {// get显式try...catch...PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)).get(5L, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();}
}