【Flink源码分析】5. Flink1.19源码分析-异步编程(CompletableFuture)

5 CompletableFuture

  1. 实现异步编排;
  2. 获取异步任务执行的结果。

CompletableFuture提供了几十种方法,辅助我们的异步任务场景。这些方法包括创建异步任务、异步任务回调、多个任务组合处理等方面。

5.1 supplyAsync 方法

supplyAsync 执行 CompletableFuture 任务,有返回值。

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}
//自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);
}

5.1.1 supplyAsync方法示例

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SupplyAsyncDemo {public static void main(String[] args) throws Exception{/*** 创建一个线程池,该线程池使用提供的固定数量的线程来执行任务,最多5个线程。* 如果所有线程都在工作,那么新提交的任务将在队列中等待,直到有线程变得可用。*/ExecutorService executorService = Executors.newFixedThreadPool(5);/**supplyAsync 异步执行任务,任务有返回值*/CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() ->{System.out.println(Thread.currentThread().getName());return "你好";},executorService);/**supplyAsync 异步执行任务,任务有返回值*/CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() ->{System.out.println(Thread.currentThread().getName());return "你好1";});System.out.println(task1.get());System.out.println(task2.get());}}

5.1.2 执行结果

在这里插入图片描述

5.2 runAsync方法

runAsync 执行 CompletableFuture 任务,没有返回值。

    //使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}//自定义线程,根据supplier构建执行任务public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}

5.2.1 runAsync方法示例

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class RunAsyncDemo {public static void main(String[] args) throws Exception{/*** 创建一个线程池,该线程池使用提供的固定数量的线程来执行任务,最多5个线程。* 如果所有线程都在工作,那么新提交的任务将在队列中等待,直到有线程变得可用。*/ExecutorService executorService = Executors.newFixedThreadPool(5);/**runAsync 异步执行任务,任务没有返回值*/CompletableFuture<Void> task1 = CompletableFuture.runAsync(() ->{System.out.println(Thread.currentThread().getName());},executorService);CompletableFuture<Void> task2 = CompletableFuture.runAsync(() ->{System.out.println(Thread.currentThread().getName());});System.out.println(task1.get());System.out.println(task2.get());}}

5.2.2 执行结果

在这里插入图片描述

5.3 then* 方法说明

  • 调用then*方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池;
  • 调用then*Async执行第二个任务时,则第一个任务使用的是自己传入的线程池,第二个任务使用的是ForkJoin线程池;
  • ThenAcceptThenAcceptAsyncthenApplythenApplyAsyncthenRunthenRunAsync区别都一样;

thenApplythenApplyAsync为例:

  • 前一个任务执行完,thenApply执行的时候,则main函数线程来执行thenApply
  • 前一个任务没有执行完,则执行前一个任务的线程来执行thenApply这个任务;
  • 这样才能保证顺序。

5.3.1 then 方法示例

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;public class ThenDemoOrder {public static void main(String[] args) throws Exception{/*** 创建一个线程池,该线程池使用提供的固定数量的线程来执行任务,最多5个线程。* 如果所有线程都在工作,那么新提交的任务将在队列中等待,直到有线程变得可用。*/ExecutorService executorService = Executors.newFixedThreadPool(5);Supplier<String> task1 = () ->{System.out.println("1."+Thread.currentThread().getName());return "task1";};Function<String,String> task2 = s -> {System.out.println("2."+Thread.currentThread().getName());return "task2";};/** 创建异步任务*/CompletableFuture<String> future = CompletableFuture.supplyAsync(task1,executorService);//Thread.sleep(1000);/** then* 调用会使用前面任务的线程池 */future.thenApply(task2);//Thread.sleep(1000);System.out.println("haha");}
}

5.3.2 执行结果

在这里插入图片描述

5.4 thenApply

  public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);}

第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

有入参,有返回值

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class FutureThenApplyDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(()->{System.out.println("原始CompletableFuture方法任务");return "1.supplyAsync";});CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {return a+ ",2.thenApply";});System.out.println(thenApplyFuture.get());}
}

执行结果:

在这里插入图片描述

5.5 thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);}

表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

有入参,没有返回值

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class FutureThenAcceptDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(()->{return "1.supplyAsync";});CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {System.out.println(a);});System.out.println(thenAcceptFuture.get());}
}

执行结果:

在这里插入图片描述

5.6 thenRun

public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);}

执行完第一个任务后,执行第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值。

无入参,无返回值

package com.demo.annn;
import java.util.concurrent.CompletableFuture;
public class FutureThenRunDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(()->{System.out.println("1.supplyAsync");return "1.supplyAsync";});CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {System.out.println("2.thenRun");});System.out.println(thenRunFuture.get());}
}

执行结果:

在这里插入图片描述

5.7 thenCompose

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(null, fn);
}

thenCompose用于链接两个异步操作,其中第二个操作依赖于第一个操作的结果。thenCompose方法接受一个函数作为参数,该函数接受第一个CompletableFuture的结果,并返回一个新的CompletableFuture
thenCompose的主要优势在于,它允许你以流程的方式组合多个异步操作,并将他们链接在一起,从而创建一个异步操作链。这对于构建复杂的异步逻辑非常有用。
用途:组合多个异步操作,创建一个异步操作链。返回新的CompletableFuture可以继续调用下去。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class ThenComposeDemo {public static void main(String[] args) throws Exception {/** 创建一个异步任务 */CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模拟耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "First operation result";});/** 接收future1中异步任务的运行结果 ,内部再次创建异步任务返回新的CompletableFuture*/CompletableFuture<String> future2 = future1.thenCompose(result1 -> {// 这个lambda表达式将使用future1的结果,并返回一个新的CompletableFuturereturn CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 第二个异步操作,也模拟耗时} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Second operation result based on " + result1;});});// 获取最终的异步操作结果String finalResult = future2.join();System.out.println(finalResult); // 输出: Second operation result based on First operation result}
}

执行结果:

在这里插入图片描述

注意:thenComposethenApplythenApplyAsync方法有些相似,但它们之间存在关键差异。thenApplythenApplyAsync接收一个函数,该函数应用于前一个CompletableFuture的结果,并立即返回一个新值(而不是另一个CompletableFuture)。而thenCompose则允许你链接一个返回CompletableFuture的函数,从而可以构建更复杂的异步操作链。

thenCompose任务允许传进去一个CompletableFuture类型的函数。

5.8 thenCombine

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(null, other, fn);
}
  • 它用于组合两个CompletableFuture对象的结果,并应用一个函数来生成一个新的结果。当两个CompletableFuture对象都完成时,thenCombine方法将这两个结果作为参数传递给提供的函数,并返回一个新的CompletableFuture对象,该对象包含函数的结果。
  • thenCombine的主要用途是当你有两个异步操作,并且想要基于这两个操作的结果来执行某些操作时。他允许你以一种声明性的方式来表达这种依赖关系,并且以异步的方式处理结果。

总结:组合两个任务结果,两个任务结果作为参数传递给Combine函数,并返回一个新的CompletableFuture,用途基于两个任务操作结果执行某些操作的时候用。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class ThenCombineDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {// 这里的lambda表达式将接收两个异步操作的结果,并返回一个新的结果return "The sum is: " + (result1 + result2);});// 获取最终的组合结果String finalResult = combinedFuture.join();System.out.println(finalResult); // 输出: The sum is: 30}
}

在这里插入图片描述

注意:

  • thenCombinethenCompose方法的主要区别在于他们处理结果的方式。
  • thenCombine直接应用一个函数来组合两个结果,并返回一个新的CompletableFuture
  • thenCompose则允许你链接一个返回CompletableFuture的函数,以构建更复杂的异步操作链。

5.9 thenAcceptBoth

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {return biAcceptStage(null, other, action);
}

它用于当两个CompletableFuture对象都完成时,执行一个接受这两个结果的操作,但不返回任何结果(即返回void)。thenAcceptBoth允许你执行一个其他操作,例如记录日志、更新UI等,而不关心操作的返回值。

总结:两个CompletableFuture对应的任务都完成时,执行一个接收两个结果的操作,但是没有返回值。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class AcceptBothDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});CompletableFuture<Void> combinedFuture = future1.thenAcceptBoth(future2, (result1, result2) -> {// 当两个future都完成时,执行这里的操作System.out.println("Future 1 result: " + result1);System.out.println("Future 2 result: " + result2);System.out.println("add result: " + result2 + result1);});// 获取最终的组合结果// 主线程等待,防止程序立即退出Thread.sleep(2000);}
}

在这里插入图片描述

5.10 runAfterBoth

    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action) {return biRunStage(null, other, action);}

它用于两个CompletableFuture对象都完成(无论是正常完成还是异常完成)之后执行某个动作。与thenAcceptBoth不同,runAfterBoth不接受任何结果作为参数,并且也不返回任何值(返回void)。它纯粹用于执行一个需要在两个异步操作完成后执行其他操作。

总结:两个CompletableFuture对应的任务都完成时,执行一个一系列的操作,但不接受参数值,也没有返回值。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class RunAfterBothDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});future1.runAfterBoth(future2, () -> {// 当两个future都完成时,执行这里的操作System.out.println("Both futures have completed.");});// 主线程等待,防止程序立即退出Thread.sleep(2000);}
}

在这里插入图片描述

5.11 runAfterEither

    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) {return orRunStage(null, other, action);}

它用于两个CompletableFuture对象中的任意一个完成时执行某个动作。这个方法接受两个参数:另一个CompletableFuture对象和一个Runnable 。当这两个CompletableFuture对象中的任意一个完成时(无论正常完成还是异常完成),Runnable 中的代码将会被执行。

两个异步操作,任何一个完成了就会走runAfterEither定义的逻辑,没有参数,也没有返回值。

runAfterEither与runAfterBoth的主要区别在于,它不需要两个CompletableFuture都完成;只需要其中一个完成就会执行Runnable 。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class RunAfterEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);System.out.println("1000");} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);System.out.println("500");} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});future1.runAfterEither(future2, () -> {// 当两个future中的任意一个完成时,执行这里的操作System.out.println("One of the futures has completed.");});// 主线程等待,防止程序立即退出Thread.sleep(1500);}
}

在这里插入图片描述

5.12 applyToEither

    public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {return orApplyStage(null, other, fn);}

runAfterEither类似,CompletableFuture类也提供了一个applyToEither方法。applyToEither方法也是用于在两个CompletableFuture对象中的任意一个完成时执行一个操作,但与runAfterEither不同的是,applyToEither允许你提供一函数(Function),该函数将应用于已完成CompletableFuture的结果(如果有的话),并返回一个新的CompletableFuture

两个异步操作,任何一个完成了就会走applyToEither定义的逻辑,有返回值。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.function.Function;public class ApplyToEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 2";});// 当任意一个future完成时,应用一个函数并返回一个新的CompletableFutureCompletableFuture<String> resultFuture = future1.applyToEither(future2, Function.identity());// 处理结果resultFuture.thenAccept(result -> {System.out.println("Result: " + result);// 在这里进行后续操作});// 主线程等待,防止程序立即退出resultFuture.join();}
}

在这里插入图片描述

5.12 acceptEither

    public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {return orAcceptStage(null, other, action);}

CompletableFuture类中的acceptEither方法与applyToEither方法类似,但是他们的使用场景和目的有所不同。acceptEither用于在任意一个CompletableFuture完成时执行一个动作,但这个操作是一个Consumer函数,它接受已完成CompletableFuture的结果作为参数,但不返回任何值或新的CompletableFuture

总结: 类似,区别acceptEither接收ConsumerapplyToEither接收 Function

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class AcceptEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 2";});// 当任意一个future完成时,使用Consumer函数处理结果CompletableFuture<Void> resultFuture = future1.acceptEither(future2, (result) -> {System.out.println("Result: " + result);// 在这里进行基于结果的操作,但不返回新的CompletableFuture});// 主线程等待,防止程序立即退出CompletableFuture.anyOf(future1, future2).join();}
}

在这里插入图片描述

5.13 get()、get(long timeout, TimeUnit unit)、getNow

public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r = result) == null ? waitingGet(true) : r);
}public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {Object r;long nanos = unit.toNanos(timeout);return reportGet((r = result) == null ? timedGet(nanos) : r);
}public T getNow(T valueIfAbsent) {Object r;return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}
  • get 阻塞等待结果
  • get(long timeout, TimeUnit unit)超时等待结果
  • getNow立刻获取结果,如果任务没有执行完则返回 valueIfAbsent
package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class GetDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {//模拟一个耗时操作try {TimeUnit.SECONDS.sleep(2);}catch (InterruptedException e){Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "5";});//等待异步计算完成,并获取结果String result = future.get();System.out.println("异步计算的结果是:"+result);String result2 = future.get(3,TimeUnit.SECONDS);System.out.println("异步计算的结果2是:"+result2);Thread.sleep(4000);String result3 = future.getNow("valueIfAbsent");System.out.println("异步计算的结果3是:"+result3);}}

在这里插入图片描述

5.14 exceptionally

    public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {return uniExceptionallyStage(fn);}

用于处理异步任务执行过程中出现的异常。当CompletableFuture计算的异步任务抛出异常时,你可以使用exceptionally方法来提供一个函数,该函数会接收异常作为参数,并返回一个新的结果或抛出一个新的异常。
exceptionally方法非常有用,因为它允许你在异步任务失败时执行一些特定的逻辑,而不是让异常直接传播到调用者。这样,你可以控制异常的处理方式,比如记录日志、返回默认值或抛出更具体的异常。

总结:抛出异常的时候exceptionally定义一个函数接收异常,好处是异步任务失败的时候可以做一些特定逻辑,而不是直接返回给调用者。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class ExceptionallyDemo {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("异步任务失败");});CompletableFuture<String> exceptionalFuture = future.exceptionally(throwable -> {// 处理异常,返回一个新的结果或抛出新的异常System.err.println("捕获到异常: " + throwable.getMessage());return "回退结果";});try {String result = exceptionalFuture.get();System.out.println("最终结果是: " + result);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
}

在这里插入图片描述

5.15 completeExceptionally

    public boolean completeExceptionally(Throwable ex) {if (ex == null) throw new NullPointerException();boolean triggered = internalComplete(new AltResult(ex));postComplete();return triggered;}

CompletableFuturecompleteExceptionally方法用于在异步计算完成之前,以异常的方式标记这个CompletableFuture为已完成状态。这意味着任何等待这个CompletableFuture完成的操作(例如通过get()方法)将会立即抛出这个异常。

总结:程序执行结束的时候,通过completeExceptionally来标记任务为已完成的状态并且抛出异常。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CmpleteExceptionallyDemo {public static void main(String[] args) {CompletableFuture<String> future = new CompletableFuture<>();// 模拟异步操作失败,使用 completeExceptionally 来标记 future 为完成状态并附带异常future.completeExceptionally(new RuntimeException("异步操作失败"));try {// 尝试获取 future 的结果,将会抛出 ExecutionException,因为 future 是以异常完成的String result = future.get();System.out.println(result);} catch (InterruptedException | ExecutionException e) {// 打印出原始异常信息e.printStackTrace();}}
}

在这里插入图片描述

5.16 whenComplete

    public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action);}

用于在异步操作完成后执行一个回调函数。无论异步操作时正常完成还是抛出异常,whenComplete都会执行其提供的回调函数。这个方法非常有用,它允许你在异步任务结束时进行清理工作、记录日志、处理结果或异常,而不会阻塞主线程。

总结:无论任务是否抛出异常都会调用whenComplete回调函数,Flink内部经常用来关闭资源等操作。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class WhenCompleteDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result of the async operation";});future.whenComplete((result, throwable) -> {if (throwable != null) {// 处理异步操作抛出的异常System.err.println("Async operation failed: " + throwable.getMessage());} else {// 处理异步操作的结果System.out.println("Async operation completed successfully with result: " + result);}});// 主线程等待异步任务完成future.get();}
}

在这里插入图片描述

5.17 allOf

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);}

所有任务都执行后才执行下一个任务。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class AllOfFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {try {Thread.sleep(1000); // 模拟耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}System.out.println("Future 1 completed");});CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {try {Thread.sleep(2000); // 模拟另一个耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}System.out.println("Future 2 completed");});CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);// 阻塞直到所有future都完成allFutures.get();System.out.println("All futures completed");}
}

在这里插入图片描述

5.18 anyOf

    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {return orTree(cfs, 0, cfs.length - 1);}

任意一个任务执行后就可以执行下一个任务了。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class AnyOfFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000); // 模拟耗时操作System.out.println(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000); // 模拟另一个耗时操作System.out.println(2000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 2";});CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1.toCompletableFuture(), future2.toCompletableFuture());// 阻塞直到任意一个future完成Object result = anyFuture.get();System.out.println("A future completed with result: " + result);}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/15957.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LabVIEW在呼吸机测试气体容量计算

在呼吸机测试中&#xff0c;精确测量气体容量变化是评估设备性能的关键步骤。通过监测呼吸机气道内的压力变化&#xff0c;并结合流阻和肺顺应性等参数&#xff0c;可以计算出单位时间内的气体容量变化。本案例基于LabVIEW实现该计算过程&#xff0c;以确保测试数据的准确性和一…

Lombok使用指南

引言 lombok作为减少我们代码量的利器&#xff0c;本文将列举常用的几个注解&#xff0c;来帮助减少代码量 注解及其功能 Getter 和 Setter import lombok.Getter; import lombok.Setter;Getter Setter public class Person {private String name;private int age; } …

JAVA学习第一天

String的构造方法-118 String创建对象的特点——119 String字符串的比较——120 字符串的遍历——122 两个函数&#xff1a;length&#xff08;&#xff09;&#xff0c;charAt&#xff08;&#xff09; StringBuilder——127 String的内容是不可变的 StringBuilder的内容是可变…

sqli-lab靶场学习(六)——Less18-22(User-Agent、Referer、Cookie注入)

前言 前面的关卡&#xff0c;都是直接在输入框或者浏览器的地址栏上做文章即可。但本文这几关&#xff0c;需要用工具拦截请求修改请求头部才行。 Less18&#xff08;User-Agent注入&#xff09; 本关的注入点在User-Agent。我们在用户名和密码框中输入admin/admin后&#xf…

Spring依赖注入方式

写在前面&#xff1a;大家好&#xff01;我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正&#xff0c;感谢大家的不吝赐教。我的唯一博客更新地址是&#xff1a;https://ac-fun.blog.csdn.net/。非常感谢大家的支持。一起加油&#xff0c;冲鸭&#x…

arcgis界址点编号工具开发原理(西北角顺时针)

arcgis界址点编号工具开发原理&#xff08;西北角顺时针&#xff09; 1、工具实现思路。寻找离包络矩形左顶角最近的点作为起点。如下图&#xff1a;距离包络矩形左顶角最近的点&#xff0c;作为J1点没有任何问题。 问题在于并不是所有的地块&#xff0c;都这么中规中矩、合情…

分布式服务框架 如何设计一个更合理的协议

1、概述 前面我们聊了如何设计一款分布式服务框架的问题&#xff0c;并且编码实现了一个简单的分布式服务框架 cheese, 目前 cheese 基本具备分布式服务框架的基本功能。后面我们又引入了缓存机制&#xff0c;以及使用Socket替代了最开始的 RestTemplate。并且还学习了网络相关…

生信云服务器:让生物信息学分析更高效、更简单【附带西柚云优惠码】

随着生物信息学的快速发展&#xff0c;基因组测序、单细胞分析等复杂任务逐渐成为研究者们的日常工作。然而&#xff0c;个人电脑在处理这些任务时往往面临性能瓶颈&#xff0c;如内存不足、运算速度慢等问题&#xff0c;导致分析任务频繁失败或崩溃。为了解决这一难题&#xf…

[AUTOSAR通信] - PDUR模块解读

点击订阅专栏不迷路 文章目录 一、 PDUR模块概述二、功能描述2.1 发送路由功能2.2 接收路由功能2.3 网关路由功能2.4 路由控制功能 三、配置項介紹3.1. PduRBswModules3.2. PduRGeneral3.3. PduRRoutingTables3.4. PduRRoutingPath3.5. PduRSrcPdu3.6. PduRDestPdu 四、总结 &g…

分治下的快速排序(典型算法思想)—— OJ例题算法解析思路

目录 一、75. 颜色分类 - 力扣(LeetCode) 运行代码: 一、算法核心思想 二、指针语义与分区逻辑 三、操作流程详解 四、数学正确性证明 五、实例推演(数组[2,0,2,1,1,0]) 六、工程实践优势 七、对比传统实现 八、潜在问题与解决方案 九、性能测试数据 十、扩展…

分层耦合 - IOC详解

推荐使用下面三种, 第一种多用于其他类 声明bean的时候&#xff0c;可以通过value属性指定bean的名字&#xff0c;如果没有指定&#xff0c;默认为类名首字母小写。 使用以上四个注解都可以声明bean&#xff0c;但是在springboot集成web开发中&#xff0c;声明控制器bean只能用…

PDF Shaper:免费多功能 PDF 工具箱,一站式满足您的 PDF 需求!

​PDF Shaper 是一款功能强大且完全免费的 PDF 工具箱&#xff0c;它几乎涵盖了日常 PDF 操作的方方面面&#xff0c;无论是转换、编辑还是处理&#xff0c;都能轻松搞定。以下是这款软件的详细介绍&#xff1a; 功能丰富&#xff0c;一应俱全 PDF 转换功能强大 PDF 转 Word&am…

未来替代手机的产品,而非手机的本身

替代手机的产品包括以下几种&#xff1a; 可穿戴设备&#xff1a;智能手表、智能眼镜等可穿戴设备可以提供类似手机的功能&#xff0c;如通话、信息推送、浏览网页等。 虚拟现实&#xff08;VR&#xff09;技术&#xff1a;通过佩戴VR头显&#xff0c;用户可以进行语音通话、发…

deepseek+“D-id”或“即梦AI”快速生成短视频

1、deepseek生成视频脚本 1.1、第一步&#xff1a;使用通用模板提出需求&#xff0c;生成视频脚本 对话输入示例脚本1&#xff1a; 大年初五是迎财神的日志&#xff0c;帮我生成10秒左右的短视频&#xff0c; 体现一家3口在院子里欢庆新年&#xff0c; 孩子在院子里放鞭炮烟…

在CT107D单片机综合训练平台上实现外部中断控制LED闪烁

引言 在单片机开发中&#xff0c;外部中断是一个非常重要的功能&#xff0c;它可以让单片机在检测到外部信号变化时立即做出响应。本文将详细介绍如何在CT107D单片机综合训练平台上使用外部中断来控制LED灯的闪烁。我们将使用两种不同的方式来实现这一功能&#xff1a;一种是在…

为什么推荐使用 LabVIEW 开发

在仪器行业的软件开发中&#xff0c;LabVIEW 以其图形化编程、快速原型开发、高效硬件集成的优势&#xff0c;成为自动化测试和控制系统的理想选择。尽管一些工程师仍然坚持使用 C 语言&#xff0c;但这更多是出于习惯&#xff0c;而非技术上的必然。LabVIEW 不仅支持 NI 硬件&…

力扣1448. 统计二叉树中好节点的数目

Problem: 1448. 统计二叉树中好节点的数目 文章目录 题目描述思路复杂度Code 题目描述 思路 对二叉树进行先序遍历&#xff0c;边遍历边对比并更新当前路径上的最大值pathMax&#xff0c;若当pathMax小于等于当前节点值&#xff0c;则好节点的数目加一 复杂度 时间复杂度: O (…

DeepSeek帮助做【真】软件需求-而不是批量刷废话

尝试给DeepSeek一份系统用例规约&#xff0c;让它帮判断哪些地方还没有覆盖涉众利益。结果见以下 需求工作的重点可以放在建模精细的真实现状流程和精细的真实涉众利益上&#xff0c;AI帮助推演系统需求。

【JVM详解五】JVM性能调优

示例&#xff1a; 配置JVM参数运行 #前台运行 java -XX:MetaspaceSize-128m -XX:MaxMetaspaceSize-128m -Xms1024m -Xmx1024m -Xmn256m -Xss256k -XX:SurvivorRatio8 - XX:UseConcMarkSweepGC -jar /jar包路径 #后台运行 nohup java -XX:MetaspaceSize-128m -XX:MaxMetaspaceS…

Qt文本处理【正则表达式】示例详解:【QRegularExpression】

在 Qt 中&#xff0c;正则表达式是处理文本的强大工具&#xff0c;它能够帮助我们匹配、搜索和替换特定的字符串模式。自 Qt 5 起&#xff0c;QRegularExpression 类提供了对 ECMAScript 标准的正则表达式支持&#xff0c;这使得它在处理各种复杂的字符串任务时变得更加高效和灵…