supplyAsync
supplyAsync是创建带有返回值的异步任务。有两个方法:
1.带返回值异步请求,默认线程池:supplyAsync(Supplier supplier)
2.带返回值异步请求,自定义线程池:supplyAsync(Supplier supplier, Executor executor)
@SneakyThrowspublic static void main(String[] args) {
// supplyAsync1();supplyAsync2();}/*** 带返回值异步请求,默认线程池,ForkJoinPool.commonPool()*/public static void supplyAsync1() throws Exception {Map<String, Object> map = new HashMap<>();CompletableFuture<Map<String, Object>> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("do something...");map.put("name", "helele");return map;});//等待任务完成获取返回值,会阻塞线程System.out.println("result==>" + completableFuture.get().get("name"));}/*** 带返回值异步请求,自定义线程池*/public static void supplyAsync2() throws ExecutionException, InterruptedException, TimeoutException {//实际开发中不要这么创建线程ExecutorService executorService = Executors.newSingleThreadExecutor();CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("do something...");ThreadUtil.sleep(5000);return "result";}, executorService);/*** get():获取异步线程执行完成的结果,会阻塞线程,需要抛出异常* get(long timeout, TimeUnit unit):设定一个等待时间,在设定的时间内没有获取到就报错,需要抛出异常* join():同get()功能一样,不需要主动抛出异常* getNow(T valueIfAbsent):获取异步线程的返回结果,在主程序走到getNow方法的时候,异步如果执行完成了就返回异步线程的结果,如果异步线程没有执行完也不会报错,会返回一个托底的值* complete(T value):先判断get()是否能获取到异步线程里面返回的值,如果能就返回true,不能就返回false并将线程返回值设置为complete的方法值* completeExceptionally:如果任务没有完成就抛出指定的异常*/System.out.println("join result==>" + completableFuture.join());System.out.println("get result==>" + completableFuture.get());System.out.println("complete result==>" + completableFuture.complete("complete"));System.out.println("getT result1==>" + completableFuture.get(3, TimeUnit.SECONDS));System.out.println("getNow result==>" + completableFuture.getNow("helele"));System.out.println("completeExceptionally result==>" + completableFuture.completeExceptionally(new RuntimeException("异常...")));//和supplyAsync测试无关,只是停止ExecutorService/*** shutdown:停止接收新任务,原来的任务继续执行,但不保证任务一定会执行完毕* shutdownNow:尝试停止所有正在执行的任务, 停止对正在等待执行的任务的处理, 并且返回正在等待执行的任务* awaitTermination(long timeOut, TimeUnit unit):方法是阻塞的, 阻塞到所有任务都完成(必须在shutdown调用之后)或者超时. 如果executor在超时之前终止了, 那么返回true, 否则返回false* 注意, 如果不在awaitTermination前调用shutdown, 则即使在超时之前所有任务都已经完成, awaitTermination仍然会等待着, 而且最后一定返回false, 因为没有shutdown的调用不会使executor的状态变为terminated*/executorService.shutdown();boolean b = executorService.awaitTermination(10, TimeUnit.SECONDS);System.out.println(b);}
runAsync
runAsync是创建没有返回值的异步任务。有两个方法:
1.不带返回值的异步请求,默认线程池: runAsync(Runnable runnable)
2.不带返回值的异步请求,自定义线程池:runAsync(Runnable runnable, Executor executor)
/*** runAsync是创建没有返回值的异步任务。有两个方法:* 1.不带返回值的异步请求,默认线程池: runAsync(Runnable runnable)* 2.不带返回值的异步请求,自定义线程池:runAsync(Runnable runnable, Executor executor)*/@SneakyThrowspublic static void main(String[] args) {runAsync1();runAsync2();}/*** 不带返回值的异步请求,默认线程池,ForkJoinPool.commonPool()*/public static void runAsync1() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println("do something...");});/*** completableFuture.get() getNow()...* 具体说明看t1_SupplyAsync类解释*///打印nullSystem.out.println("get result=>" + completableFuture.get());}/*** 不带返回值的异步请求,自定义线程池*/public static void runAsync2() throws ExecutionException, InterruptedException {//自定义线程池,实际项目不建议这样创建ExecutorService executorService = Executors.newSingleThreadExecutor();CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println("do something...");}, executorService);//打印nullSystem.out.println("get result=>" + completableFuture.get());}
thenApply
thenApply表示某个任务执行完成后执行的动作,即回调方法,会将该任务的的执行结果作为入参传递到回调方法中,带有返回值 thenApply是第一个任务,前面没有其他任务,会使用主线程,可能影响性能,且后续的thenApply也会使用主线程,其他情况会使用上一个任务的线程池
@SneakyThrowspublic static void main(String[] args) {thenApply();thenApplyAsync();}/*** [回调函数]* thenApply表示某个任务执行完成后执行的动作,即回调方法,会将该任务的的执行结果作为入参传递到回调方法中,带有返回值* thenApply是第一个任务,前面没有其他任务,会使用主线程,可能影响性能,且后续的thenApply也会使用主线程,其他情况会使用上一个任务的线程池*/public static void thenApply() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " future1 do something");return 1;});CompletableFuture<Integer> future2 = future1.thenApply((future1Res) -> {System.out.println(Thread.currentThread().getName() + " future2 do something");/*** 这个future1Res会拿到future1的返回值,然后+1,最后future2返回2*/return future1Res + 1;});//任务1执行结果System.out.println("future1 get=>" + future1.get());//任务2执行结果System.out.println("future2 get=>" + future2.get());}/*** thenApplyAsync是thenApply的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void thenApplyAsync() throws ExecutionException, InterruptedException {ExecutorService executor = Executors.newFixedThreadPool(2);CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");return 1;}, executor);CompletableFuture<Integer> future2 = future1.thenApplyAsync((result) -> {System.out.println(Thread.currentThread() + " future1 do something");return result + 1;}, executor);//任务1执行结果System.out.println("future1 get=>" + future1.get());//任务2执行结果System.out.println("future2 get=>" + future2.get());executor.shutdown();}
thenAccept
thenAccept表示某个任务执行完成后执行的动作,即在某个任务完成后执行的回调方法,之后会将该任务的执行结果作为入参传递到回调方法中,无返回值
thenAccept是第一个任务,前面没有其他任务,会使用主线程,可能影响性能,且后续的thenAccept也会使用主线程,其他情况会使用上一个任务的线程池
@SneakyThrowspublic static void main(String[] args) {thenAccept();thenAcceptAsync();}/*** [回调函数]* thenAccept表示某个任务执行完成后执行的动作,即在某个任务完成后执行的回调方法,之后会将该任务的执行结果作为入参传递到回调方法中,无返回值* thenAccept是第一个任务,前面没有其他任务,会使用主线程,可能影响性能,且后续的thenAccept也会使用主线程,其他情况会使用上一个任务的线程池*/public static void thenAccept() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " future1 do something");return 1;});//thenAccept无返回值CompletableFuture<Void> future2 = future1.thenAccept((future1Res) -> {System.out.println(Thread.currentThread().getName() + " future2 do something");System.out.println("future2中获取future1执行结果:" + future1Res);//无返回值});//等待任务1执行完成System.out.println("cf1结果->" + future1.get());//等待任务2执行完成System.out.println("cf2结果->" + future2.get());}/*** thenAcceptAsync是thenAccept的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void thenAcceptAsync() throws ExecutionException, InterruptedException {ExecutorService executor = Executors.newFixedThreadPool(2);CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " future1 do something");return 1;}, executor);CompletableFuture<Void> future2 = future1.thenAcceptAsync((future1Res) -> {System.out.println(Thread.currentThread().getName() + " future2 do something");System.out.println("future2中获取future1执行结果:" + future1Res);}, executor);//等待任务1执行完成System.out.println("cf1结果->" + future1.get());//等待任务2执行完成System.out.println("cf2结果->" + future2.get());executor.shutdown();}
thenRun
thenRun表示某个任务完成后执行的动作,即回调方法,无入参,无返回值 thenRun是第一个任务,前面没有其他任务,会使用主线程,可能影响性能,且后续的thenRun也会使用主线程,其他情况会使用上一个任务的线程池
@SneakyThrowspublic static void main(String[] args) {thenRun();thenRunAsync();}/*** [回调函数]* thenRun表示某个任务完成后执行的动作,即回调方法,无入参,无返回值* thenRun是第一个任务,前面没有其他任务,会使用主线程,可能影响性能,且后续的thenRun也会使用主线程,其他情况会使用上一个任务的线程池*/public static void thenRun() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do complete");return 1;});//没有入参CompletableFuture future2 = future1.thenRun(() -> {System.out.println(Thread.currentThread() + " future2 do complete");//无返回值});//等待任务1执行完成System.out.println("cf1结果->" + future1.get());//等待任务2执行完成System.out.println("cf2结果->" + future2.get());}/*** thenRunAsync是thenRun的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void thenRunAsync() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do complete");return 1;});//没有入参CompletableFuture future2 = future1.thenRunAsync(() -> {System.out.println(Thread.currentThread() + " future2 do complete");//无返回值});//等待任务1执行完成System.out.println("cf1结果->" + future1.get());//等待任务2执行完成System.out.println("cf2结果->" + future2.get());}
whenComplete
whenComplete是当某个任务执行完成后执行的回调方法,然后将执行结果或者执行期间抛出的异常传递给回调方法 不论任务正常完成还是出现异常,都会调用whenComplete的回调方法 正常完成:whenComplete返回结果和上级任务结果一致
出现异常:whenComplete返回结果为null,异常为上级任务异常
即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要处理该异常。
注意:get()获取到结果只是上级任务的结果,并不是该任务的返回值,whenComplete是没有返回值的。
@SneakyThrowspublic static void main(String[] args) {
// whenComplete();whenCompleteAsync();}/*** whenComplete是当某个任务执行完成后执行的回调方法,然后将执行结果或者执行期间抛出的异常传递给回调方法* 不论任务正常完成还是出现异常,都会调用whenComplete的回调方法 <br>* <span color="orange">正常完成:whenComplete返回结果和上级任务结果一致</span>* <br>* <span color="orange">出现异常:whenComplete返回结果为null,异常为上级任务异常</span>* 即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要处理该异常。* 注意:get()获取到结果只是上级任务的结果,并不是该任务的返回值,whenComplete是没有返回值的。*/public static void whenComplete() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 completed");//在任务1中制造一个异常
// int a = 1/0;return 1;});/*** future2的泛型只能是Integer,需要和future1一样*/CompletableFuture<Integer> future2 = future1.whenComplete((future1Res, exception) -> {System.out.println(Thread.currentThread() + " future2 completed");System.out.println("future1任务结果:" + future1Res);System.out.println("future1任务异常:" + exception);});//等待任务1执行完成System.out.println("cf1结果->" + future1.get());//等待任务2执行完成System.out.println("cf2结果->" + future2.get());}/*** whenCompleteAsync是whenComplete的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool* 注意:get()获取到结果只是上级任务的结果,并不是该任务的返回值,whenCompleteAsync是没有返回值的。*/public static void whenCompleteAsync() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 completed");//在任务1中制造一个异常int a = 1/0;return 1;});/*** future2的泛型只能是Integer,需要和future1一样*/CompletableFuture<Integer> future2 = future1.whenCompleteAsync((future1Res, exception) -> {System.out.println(Thread.currentThread() + " future2 completed");System.out.println("future1任务结果:" + future1Res);System.out.println("future1任务异常:" + exception);});//等待任务1执行完成System.out.println("cf1结果->" + future1.get());//等待任务2执行完成System.out.println("cf2结果->" + future2.get());}
handle
handle可以理解为带有返回值的whenComplete的,最终get()到的结果是handle自己return的而不是上级任务的
@SneakyThrowspublic static void main(String[] args) {
// handle();handleAsync();}/*** handle可以理解为带有返回值的whenComplete的,最终get()到的结果是handle自己return的而不是上级任务的*/public static void handle() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");//制造异常int a = 1/0;return 1;});CompletableFuture<Integer> future2 = future1.handle((future1Res, exception) -> {System.out.println(Thread.currentThread() + " future2 do something");System.out.println("上个任务结果:" + future1Res);System.out.println("上个任务异常:" + exception);return future1Res + 1;});System.out.println("future1结果:" + future1.get());System.out.println("future2结果:" + future2.get());}/*** handleAsync是handle的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void handleAsync() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");//制造异常int a = 1/0;return 1;});CompletableFuture<Integer> future2 = future1.handleAsync((future1Res, exception) -> {System.out.println(Thread.currentThread() + " future2 do something");System.out.println("上个任务结果:" + future1Res);System.out.println("上个任务异常:" + exception);return future1Res + 1;});// System.out.println("future1结果:" + future1.get());System.out.println("future2结果:" + future2.get());}
thenCombine
thenCombine会在两个CompletableFuture任务都执行完成后执行下阶段任务,可获取到两个任务的结果作为入参,且有返回值 可以将两个任务的结果合并然后返回合并的结果 注意:
1.两个任务只要有一个异常,则将该异常信息作为执行任务的执行结果
2.两个任务是并行执行,之间没有依赖顺序
@SneakyThrowspublic static void main(String[] args) {
// thenCombine();thenCombineAsync();}/*** thenCombine会在两个CompletableFuture任务都执行完成后执行下阶段任务,可获取到两个任务的结果作为入参,且有返回值* 可以将两个任务的结果合并然后返回合并的结果* 注意:* 1.两个任务只要有一个异常,则将该异常信息作为执行任务的执行结果* 2.两个任务是并行执行,之间没有依赖顺序*/public static void thenCombine() throws ExecutionException, InterruptedException {//任务1CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");int a = 1/0;return 1;});//任务2CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future2 do something");return 2;});//通过thenCombine处理任务1和任务2的结果CompletableFuture<Integer> future3 = future1.thenCombine(future2, (future1Result, future2Result) -> {System.out.println(Thread.currentThread() + " future3 do something");System.out.println("future1结果:" + future1Result);System.out.println("future2结果:" + future2Result);return future1Result + future2Result;});System.out.println("future3结果:" + future3.get());}/*** thenCombineAsync是thenCombine的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void thenCombineAsync() throws ExecutionException, InterruptedException {//任务1CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");
// int a = 1/0;return 1;});//任务2CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future2 do something");return 2;});//通过thenCombine处理任务1和任务2的结果CompletableFuture<Integer> future3 = future1.thenCombineAsync(future2, (future1Result, future2Result) -> {System.out.println(Thread.currentThread() + " future3 do something");System.out.println("future1结果:" + future1Result);System.out.println("future2结果:" + future2Result);return future1Result + future2Result;});System.out.println("future3结果:" + future3.get());}
thenCompose
thenCompose 可以用于组合多个CompletableFuture,将前一个任务的返回结果作为下一个任务的参数,它们之间存在着业务逻辑上的先后顺序。
thenApply和thenCompose区别:
thenApply()转换的是泛型中的类型,相当于将CompletableFuture
转换生成新的CompletableFuture
thenCompose()用来连接两个CompletableFuture,是生成一个新的CompletableFuture。
@SneakyThrowspublic static void main(String[] args) {
// thenCompose();thenComposeAsync();}/*** thenCompose 可以用于组合多个CompletableFuture,将前一个任务的返回结果作为下一个任务的参数,它们之间存在着业务逻辑上的先后顺序。** thenApply和thenCompose区别:* thenApply()转换的是泛型中的类型,相当于将CompletableFuture<T> 转换生成新的CompletableFuture<U>* thenCompose()用来连接两个CompletableFuture,是生成一个新的CompletableFuture。*/public static void thenCompose() throws ExecutionException, InterruptedException {CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");return 1;}).thenCompose(future1Res -> CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future2 do something");return future1Res + 2;}));System.out.println("future1.get() = " + future1.get());}/*** thenComposeAsync是thenCompose的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void thenComposeAsync() throws ExecutionException, InterruptedException {CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");return 1;}).thenComposeAsync(future1Res -> CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future2 do something");return future1Res + 2;}));System.out.println("future1.get() = " + future1.get());}
thenAcceptBoth
thenAcceptBoth会在两个CompletableFuture任务都执行完成后执行下阶段任务,可获取到两个任务的结果作为入参,无返回值
注意:
1.两个任务只要有一个异常,则将该异常信息作为执行任务的执行结果
2.两个任务是并行执行,之间没有依赖顺序
@SneakyThrowspublic static void main(String[] args) {thenAcceptBoth();thenAcceptBothAsync();}/*** thenAcceptBoth会在两个CompletableFuture任务都执行完成后执行下阶段任务,可获取到两个任务的结果作为入参,无返回值* 注意:* 1.两个任务只要有一个异常,则将该异常信息作为执行任务的执行结果* 2.两个任务是并行执行,之间没有依赖顺序*/public static void thenAcceptBoth() throws ExecutionException, InterruptedException {CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "future1 do something");return 1;});CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "future2 do something");return 2;});//thenAcceptBoth无返回值CompletableFuture future3 = future1.thenAcceptBoth(future2, (future1Result, future2Result) -> {//可以内部消费future1、future2的结果System.out.println("future1结果:" + future1Result);System.out.println("future2结果:" + future2Result);});System.out.println("future1结果:" + future1.get());System.out.println("future2结果:" + future2.get());//会打印nullSystem.out.println("future3结果:" + future3.get());}/*** thenAcceptBothAsync是thenAcceptBoth的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void thenAcceptBothAsync() throws ExecutionException, InterruptedException {CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "future1 do something");return 1;});CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "future2 do something");return 2;});//thenAcceptBoth无返回值CompletableFuture future3 = future1.thenAcceptBothAsync(future2, (future1Result, future2Result) -> {//可以内部消费future1、future2的结果System.out.println("future1结果:" + future1Result);System.out.println("future2结果:" + future2Result);});//会打印nullSystem.out.println("future3结果:" + future3.get());}
runAfterBoth
runAfterBoth会在两个CompletableFuture任务都执行完成后执行下阶段任务,无入参,无返回值 注意:
1.两个任务只要有一个异常,则将该异常信息作为执行任务的执行结果
2.两个任务是并行执行,之间没有依赖顺序
@SneakyThrowspublic static void main(String[] args) {runAfterBoth();runAfterBothAsync();}/*** runAfterBoth会在两个CompletableFuture任务都执行完成后执行下阶段任务,无入参,无返回值* 注意:* * 1.两个任务只要有一个异常,则将该异常信息作为执行任务的执行结果* * 2.两个任务是并行执行,之间没有依赖顺序*/public static void runAfterBoth() throws ExecutionException, InterruptedException {CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "future1 do something");return 1;});CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "future2 do something");return 2;});//runAfterBoth无入参无返回值CompletableFuture future3 = future1.runAfterBoth(future2, () -> {System.out.println(Thread.currentThread() + "future3 do something");});System.out.println("future1结果:" + future1.get());System.out.println("future2结果:" + future2.get());//会打印nullSystem.out.println("future3结果:" + future3.get());}/*** runAfterBothAsync是runAfterBoth的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void runAfterBothAsync() throws ExecutionException, InterruptedException {CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "future1 do something");return 1;});CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "future2 do something");return 2;});//runAfterBothAsync无入参无返回值CompletableFuture future3 = future1.runAfterBothAsync(future2, () -> {System.out.println(Thread.currentThread() + "future3 do something");});System.out.println("future1结果:" + future1.get());System.out.println("future2结果:" + future2.get());//会打印nullSystem.out.println("future3结果:" + future3.get());}
applyToEither
将两个CompletableFuture任务组合起来处理,但和thenCombine thenAcceptBoth runAfterBoth不一样,applyToEither中只要有一个任务正常完成,就会进入下阶段任务
applyToEither会将先完成任务的执行结果作为所提供函数的参数,且该方法有返回值
@SneakyThrowspublic static void main(String[] args) {applyToEither();
// applyToEitherAsync();}/*** 将两个CompletableFuture任务组合起来处理,但和thenCombine thenAcceptBoth runAfterBoth不一样,applyToEither中只要有一个任务正常完成,就会进入下阶段任务* applyToEither会将先完成任务的执行结果作为所提供函数的参数,且该方法有返回值*/public static void applyToEither() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
// int a = 1/0;try {//模拟任务1耗时Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread() + "future1 do something");return 1;}).exceptionally(e -> 3);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {int a = 1/0;System.out.println(Thread.currentThread() + "future2 do something");return 2;});CompletableFuture<Integer> future3 = future1.applyToEither(future2, (res) -> {System.out.println(Thread.currentThread() + "future3 do something");/*** 因异常等因素导致的任务未正常执行完成,会获取不到任务的返回值,如果两个任务都异常了,两个任务的返回值就都会获取不到* 可以用exceptionally来处理,做一个托底的响应*///由于任务1会耗时,所以这里会先得到任务2的结果,然后返回System.out.println("接受到:" + res);return res;});System.out.println("future1结果:" + future1.get());System.out.println("future2结果:" + future2.get());System.out.println("future3结果:" + future3.get());}/*** applyToEitherAsync是applyToEither的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void applyToEitherAsync() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {try {//模拟任务1耗时Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread() + "future1 do something");return 1;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "future2 do something");return 2;});CompletableFuture<Integer> future3 = future1.applyToEitherAsync(future2, (res) -> {System.out.println(Thread.currentThread() + "future3 do something");//由于任务1会耗时,所以这里会先得到任务2的结果,然后返回System.out.println("接受到:" + res);return res;});System.out.println("future1结果:" + future1.get());System.out.println("future2结果:" + future2.get());System.out.println("future3结果:" + future3.get());}
acceptEither
将两个CompletableFuture任务组合起来处理,但和thenCombine thenAcceptBoth runAfterBoth不一样,acceptEither中只要有一个任务正常完成,就会进入下阶段任务
acceptEither会将先完成任务的执行结果作为所提供函数的参数,该方法无返回值
@SneakyThrowspublic static void main(String[] args) {
// acceptEither();acceptEitherAsync();}/*** 将两个CompletableFuture任务组合起来处理,但和thenCombine thenAcceptBoth runAfterBoth不一样,acceptEither中只要有一个任务正常完成,就会进入下阶段任务* acceptEither会将先完成任务的执行结果作为所提供函数的参数,该方法无返回值*/public static void acceptEither() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");ThreadUtil.sleep(1, TimeUnit.SECONDS);int a = 1/0;return 1;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future2 do something");return 2;});CompletableFuture future3 = future1.acceptEither(future2, (res) -> {System.out.println(Thread.currentThread() + " future3 do something");//future1 future2哪个先完成就获取到哪个结果//异常会影响最终结果的获取,需要exceptionally进行托底System.out.println("res = " + res);});//无返回值。future3.get()获取到nullSystem.out.println("future3.get() = " + future3.get());}/*** acceptEitherAsync是acceptEither的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void acceptEitherAsync() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");ThreadUtil.sleep(1, TimeUnit.SECONDS);int a = 1/0;return 1;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future2 do something");return 2;});CompletableFuture future3 = future1.acceptEitherAsync(future2, (res) -> {System.out.println(Thread.currentThread() + " future3 do something");//future1 future2哪个先完成就获取到哪个结果//异常会影响最终结果的获取,需要exceptionally进行托底System.out.println("res = " + res);});//无返回值。future3.get()获取到nullSystem.out.println("future3.get() = " + future3.get());}
runAfterEither
将两个CompletableFuture任务组合起来处理,但和thenCombine thenAcceptBoth runAfterBoth不一样,runAfterEither中只要有一个任务正常完成,就会进入下阶段任务
runAfterEither没有入参,也没有返回值
@SneakyThrowspublic static void main(String[] args) {
// runAfterEither();runAfterEitherAsync();}/*** 将两个CompletableFuture任务组合起来处理,但和thenCombine thenAcceptBoth runAfterBoth不一样,runAfterEither中只要有一个任务正常完成,就会进入下阶段任务* runAfterEither没有入参,也没有返回值*/public static void runAfterEither() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");return 1;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future2 do something");return 2;});//无返回值,无入参CompletableFuture<Void> future3 = future1.runAfterEither(future2, () -> {System.out.println(Thread.currentThread() + " future3 do something");});//获取为nullSystem.out.println("future3.get() = " + future3.get());}/*** runAfterEitherAsync是runAfterEither的异步方法,可以指定自定义的线程池,如果未指定,默认使用ForkJoinPool.commonPool*/public static void runAfterEitherAsync() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");return 1;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future2 do something");return 2;});//无返回值,无入参CompletableFuture<Void> future3 = future1.runAfterEitherAsync(future2, () -> {System.out.println(Thread.currentThread() + " future3 do something");});//获取为nullSystem.out.println("future3.get() = " + future3.get());}
allOf
allOf是多个任务都完成后才会执行,只要有一个任务执行出现异常,则最终allOf返回的CompletableFuture执行get就会抛出异常,只有全部都正常执行完成,get会返回null
如果想要拿到多个任务里面的值,可以在外部定一个变量处理,比如:list
@SneakyThrowspublic static void main(String[] args) {allOf();}/*** allOf是多个任务都完成后才会执行,只要有一个任务执行出现异常,则最终allOf返回的CompletableFuture执行get就会抛出异常,只有全部都正常执行完成,get会返回null* 如果想要拿到多个任务里面的值,可以在外部定一个变量处理,比如:list*/public static void allOf() throws ExecutionException, InterruptedException {CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList();CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");
// int a = 1 / 0;list.add(1);//这里的return返回值对于allOf没用return 1;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future2 do something");//这里的return返回值对于allOf没用list.add(2);return 2;});CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future3 do something");//模拟任务耗时,需要等待这个任务完成后,allOf才能正常执行ThreadUtil.sleep(2, TimeUnit.SECONDS);list.add(3);//这里的return返回值对于allOf没用return 3;});CompletableFuture<Void> futureAll = CompletableFuture.allOf(future1, future2, future3);/*** 如果任务全都正常执行获取为null* 如果有任务异常,则抛出异常*/System.out.println("futureAll.get() = " + futureAll.get());//也可以这样//List<CompletableFuture<?>> futureList = new ArrayList<>();//futureList.add(future1);//futureList.add(future2);//futureList.add(future3);//CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();list.stream().forEach(System.out::println);}
anyOf
anyOf是多个任务中只要有一个任务完成后就会执行,且会拿到最先完成任务的返回值 只要有一个任务执行出现异常,则最终anyOf返回的CompletableFuture执行get就会抛出异常,这里抛出异常不表示某个任务出现异常就代表anyOf一定会出现异常
取决于某个任务抛出异常之前有没有别的任务已经正常执行完成了,如果正常任务完成比异常任务先完成,那么anyOf就会get到正常任务的值
@SneakyThrowspublic static void main(String[] args) {anyOf();}/*** anyOf是多个任务中只要有一个任务完成后就会执行,且会拿到最先完成任务的返回值* 只要有一个任务执行出现异常,则最终anyOf返回的CompletableFuture执行get就会抛出异常,这里抛出异常不表示某个任务出现异常就代表anyOf一定会出现异常* 取决于某个任务抛出异常之前有没有别的任务已经正常执行完成了,如果正常任务完成比异常任务先完成,那么anyOf就会get到正常任务的值*/public static void anyOf() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");int a = 1 / 0;ThreadUtil.sleep(2, TimeUnit.SECONDS);//模拟任务耗时return 1;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future2 do something");return 2;});CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future3 do something");//模拟任务耗时ThreadUtil.sleep(2, TimeUnit.SECONDS);return 3;});CompletableFuture<Object> futureAny = CompletableFuture.anyOf(future1, future2, future3);/*** 由于future1 future3任务都有耗时操作,所以最先完成的任务是future2,则最终anyOf的结果是future2的结果*/System.out.println("futureAll.get() = " + futureAny.get());}
exceptionally
在用CompletableFuture编写多线程时,如果需要处理异常,可以用exceptionally,它的作用相当于catch。 当出现异常时,会触发回调方法exceptionally
exceptionally中可指定默认返回结果,如果出现异常,则返回默认的返回结果,可对任务结果进行一个兜底
@SneakyThrowspublic static void main(String[] args) {exceptionally();}/*** 在用CompletableFuture编写多线程时,如果需要处理异常,可以用exceptionally,它的作用相当于catch。* 当出现异常时,会触发回调方法exceptionally* exceptionally中可指定默认返回结果,如果出现异常,则返回默认的返回结果,可对任务结果进行一个兜底*/public static void exceptionally() throws ExecutionException, InterruptedException {CompletableFuture future = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " future1 do something");int a = 1/0;return 1;}).exceptionally(exception -> {System.out.println("任务异常了");System.out.println(exception);return 2;});//get到exceptionally返回的值2System.out.println("future.get() = " + future.get());}