CompletableFuture详解
学习链接:https://juejin.cn/post/7124124854747398175?searchId=20240806151438B643DF2AAD2FC5E6F11E
一、CompletableFuture简介
在JAVA8开始引入了全新的CompletableFuture
类,它是Future
接口的一个实现类。也就是在Future
接口的基础上,额外封装提供了一些执行方法,用来解决Future使用场景中的一些不足,对流水线处理能力提供了支持。
当我们需要进行异步处理的时候,我们可以通过CompletableFuture.supplyAsync
方法,传入一个具体的要执行的处理逻辑函数,这样就轻松的完成了CompletableFuture的创建与触发执行。
方法名称 | 作用描述 |
---|---|
supplyAsync | 静态方法,用于构建一个CompletableFuture<T> 对象,并异步执行传入的参数,允许执行函数有返回值 |
runAsync | 静态方法,用于构建一个CompletableFuture<Void> 对象,并异步执行传入函数,与supplyAsync的区别在于此方法传入的是Callable类型,仅执行,没有返回值 |
使用示例:
public void testCreateFuture(String product) {// supplyAsync, 执行逻辑有返回值PriceResultCompletableFuture<PriceResult> supplyAsyncResult =CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product));// runAsync, 执行逻辑没有返回值CompletableFuture<Void> runAsyncResult =CompletableFuture.runAsync(() -> System.out.println(product));
}
特别补充:
supplyAsync
或者runAsync
创建后便会立即执行,无需手动调用触发。
二、环环相扣处理
在流水线处理场景中,往往都是一个任务环节处理完成后,下一个任务环节接着上一环节处理结果继续处理。CompletableFuture
用于这种流水线环节驱动类的方法有很多,相互之间主要是在返回值或者给到下一环节的入参上有些许差异,使用时需要注意区分:
具体的方法的描述归纳如下:
方法名称 | 作用描述 |
---|---|
thenApply | 对CompletableFuture 的执行后的结果进行追加处理,并将当前的CompletableFuture 泛型对象更改为处理后新的对象类型,返回当前CompletableFuture 对象 |
thenCompose | 与thenApply 类似,区别点在于:此方法的入参函数返回一个CompletableFuture 类型对象 |
thenAccept | 在所有异步任务完成后执行一系列操作,与thenApply 类似,区别点在于thenApply 返回void类型,没有具体结果输出,适合无需返回值的场景 |
thenRun | 与thenAccept 类似,区别点在于thenAccept 可以将前面CompletableFuture 执行的实际结果作为参数进行传入并使用,但是thenRun 方法没有任何入参,只能执行一个Runnable 函数,并且返回void 类型 |
因为上述thenApply
、thenCompose
方法的输出仍然都是一个CompletableFuture
对象,所以各个方法是可以一环接一环的进行调用,形成流水线式的处理逻辑:
期望总是美好的,但是实际情况却总不尽如人意。在我们编排流水线的时候,如果某一个环节执行抛出异常了,会导致整个流水线后续的环节就没法再继续下去了,比如下面的例子:
public void testExceptionHandle() {CompletableFuture.supplyAsync(() -> {throw new RuntimeException("supplyAsync excetion occurred...");}).thenApply(obj -> {System.out.println("thenApply executed...");return obj;}).join();
}
执行之后会发现,supplyAsync
抛出异常后,后面的thenApply
并没有被执行。
那如果我们想要让流水线的每个环节处理失败之后都能让流水线继续往下面环节处理,让后续环节可以拿到前面环节的结果或者是抛出的异常并进行对应的应对处理,就需要用到handle
和whenCompletable
方法了。
先看下两个方法的作用描述:
方法名称 | 方法描述 |
---|---|
handle | 与thenApply 类似,区别点在于handle执行函数的入参有两个,一个是CompletableFuture 执行的实际结果,一个是是Throwable 对象,这样如果前面执行出现异常的时候,可以通过handle获取到异常并进行处理。 |
whenComplete | 与handle 类似,区别点在于whenComplete 执行后无返回值。 |
我们对上面一段代码示例修改使用handle方法来处理:
public void testExceptionHandle() {CompletableFuture.supplyAsync(() -> {throw new RuntimeException("supplyAsync excetion occurred...");}).handle((obj, e) -> {if (e != null) {System.out.println("thenApply executed, exception occurred...");}return obj;}).join();
}
再执行可以发现,即使前面环节出现异常,后面环节也可以继续处理,且可以拿到前一环节抛出的异常信息:
thenApply executed, exception occurred...
三、多个CompletableFuture组合操作
前面一直在介绍流水线式的处理场景,但是很多时候,流水线处理场景也不会是一个链路顺序往下走的情况,很多时候为了提升并行效率,一些没有依赖的环节我们会让他们同时去执行,然后在某些环节需要依赖的时候,进行结果的依赖合并处理,类似如下图的效果。
CompletableFuture
相比于Future
的一大优势,就是可以方便的实现多个并行环节的合并处理。相关涉及方法介绍归纳如下:
方法名称 | 方法描述 |
---|---|
thenCombine | 将两个CompletableFuture 对象组合起来进行下一步处理,可以拿到两个执行结果,并传给自己的执行函数进行下一步处理,最后返回一个新的CompletableFuture 对象。 |
thenAcceptBoth | 与thenCombine 类似,区别点在于thenAcceptBoth 传入的执行函数没有返回值,即thenAcceptBoth 返回值为CompletableFuture<Void> 。 |
runAfterBoth | 等待两个CompletableFuture 都执行完成后再执行某个Runnable 对象,再执行下一个的逻辑,类似thenRun。 |
applyToEither | 两个CompletableFuture 中任意一个完成的时候,继续执行后面给定的新的函数处理。再执行后面给定函数的逻辑,类似thenApply 。 |
acceptEither | 两个CompletableFuture 中任意一个完成的时候,继续执行后面给定的新的函数处理。再执行后面给定函数的逻辑,类似thenAccept 。 |
runAfterEither | 等待两个CompletableFuture 中任意一个执行完成后再执行某个Runnable 对象,可以理解为thenRun 的升级版,注意与runAfterBoth 对比理解。 |
allOf | 静态方法,阻塞等待所有给定的CompletableFuture 执行结束后,返回一个CompletableFuture<Void> 结果。 |
anyOf | 静态方法,阻塞等待任意一个给定的CompletableFuture 对象执行结束后,返回一个CompletableFuture<Void> 结果。 |
四、结果等待与获取
在执行线程中将任务放到工作线程中进行处理的时候,执行线程与工作线程之间是异步执行的模式,如果执行线程需要获取到共工作线程的执行结果,则可以通过get
或者join
方法,阻塞等待并从CompletableFuture
中获取对应的值。
对get
和join
的方法功能含义说明归纳如下:
方法名称 | 作用描述 |
---|---|
get() | 等待CompletableFuture 执行完成并获取其具体执行结果,可能会抛出异常,需要代码调用的地方手动try...catch 进行处理。 |
get(long, TimeUnit) | 与get() 相同,只是允许设定阻塞等待超时时间,如果等待超过设定时间,则会抛出异常终止阻塞等待。 |
join() | 等待CompletableFuture 执行完成并获取其具体执行结果,可能会抛出运行时异常,无需代码调用的地方手动try...catch 进行处理。 |
从介绍上可以看出,两者的区别就在于是否需要调用方显式的进行try…catch处理逻辑,使用代码示例如下:
public void testGetAndJoin(String product) {// join无需显式try...catch...PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)).join();try {// get显式try...catch...PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)).get(5L, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();}
}
五、CompletableFuture方法及其Async版本
我们在使用CompletableFuture
的时候会发现,有很多的方法,都会同时有两个以Async
命名结尾的方法版本。以前面我们用的比较多的thenCombine
方法为例:
thenCombine(CompletionStage, BiFunction)
thenCombineAsync(CompletionStage, BiFunction)
thenCombineAsync(CompletionStage, BiFunction, Executor)
从参数上看,区别并不大,仅第三个方法入参中多了线程池Executor
对象。看下三个方法的源码实现,会发现其整体实现逻辑都是一致的,仅仅是使用线程池这个地方的逻辑有一点点的差异:
有兴趣的可以去翻一下此部分的源码实现,这里概括下三者的区别:
thenCombine
方法,沿用上一个执行任务所使用的线程池进行处理thenCombineAsync
两个入参的方法,使用默认的ForkJoinPool线程池中的工作线程进行处理themCombineAsync
三个入参的方法,支持自定义线程池并指定使用自定义线程池中的线程作为工作线程去处理待执行任务。
为了更好的理解下上述的三个差异点,我们通过下面的代码来演示下:
用法1:其中一个supplyAsync
方法以及thenCombineAsync
指定使用自定义线程池,另一个supplyAsync
方法不指定线程池(使用默认线程池)
public PriceResult getCheapestPlatAndPrice4(String product) {// 构造自定义线程池ExecutorService executor = Executors.newFixedThreadPool(5);returnCompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product), executor).thenCombineAsync(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)),this::computeRealPrice,executor).join();
}
对上述代码实现策略的解读,以及与执行结果的关系展示如下图所示,可以看出,没有指定自定义线程池的supplyAsync
方法,其使用了默认的ForkJoinPool
工作线程来运行,而另外两个指定了自定义线程池的方法,则使用了自定义线程池来执行。
用法2: 不指定自定义线程池,使用默认线程池策略,使用thenCombine
方法
public PriceResult getCheapestPlatAndPrice5(String product) {returnCompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)),this::computeRealPrice).join();
}
执行结果如下,可以看到执行线程名称与用法1示例相比发生了变化。因为没有指定线程池,所以两个supplyAsync
方法都是用的默认的ForkJoinPool
线程池,而thenCombine
使用的是上一个任务所使用的线程池,所以也是用的ForkJoinPool
。
14:34:27.815[ForkJoinPool.commonPool-worker-1|12]获取某夕夕上 Iphone13的价格
14:34:27.815[ForkJoinPool.commonPool-worker-2|13]获取某夕夕上 Iphone13的优惠
14:34:28.831[ForkJoinPool.commonPool-worker-2|13]获取某夕夕上 Iphone13的优惠完成: -5300
14:34:28.831[ForkJoinPool.commonPool-worker-1|12]获取某夕夕上 Iphone13的价格完成: 5399
14:34:28.831[ForkJoinPool.commonPool-worker-2|13]某夕夕最终价格计算完成:99
获取最优价格信息:【平台:某夕夕, 原价:5399, 折扣:0, 实付价:99】
-----执行耗时: 1083ms ------