异步回调
所谓异步回调,本质上就是多线程中线程的通信,如今很多业务系统中,某个业务或者功能调用多个外部接口,通常这种调用就是异步的调用。如何得到这些异步调用的结果自然也就很重要了。
Callable、Future、FutureTask
public class test implements Callable<Boolean>{public static void main(String[] args) {test a=new test();FutureTask futureTask=new FutureTask<>(a);new Thread(futureTask).start();Object su=null;try {su=futureTask.get();}catch (Exception e){e.printStackTrace();}System.out.println(su);}@Overridepublic Boolean call() throws Exception {return null;}
}
FutureTask和Callable都是泛型类,泛型参数表示返回结果的类型。通过FutureTask获取异步线程的执行结果,但是其调用get()方法获取异步结果时,主线程也会被阻塞。属于异步阻塞模式。异步阻塞模式属于主动模式的异步调用,异步回调属于被动模式的异步调用。Java中回调模式的标准实现类为CompletableFuture。由于此类出现时间比较晚,期间Guava和Netty等都提出了自己的异步回调模式API来使用。这里主要介绍CompletableFuture,其他的有时间后面再学习。
CompletableFuture
CompletableFuture实现Future和CompletionStage两个接口。此类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。主要方法如下所示:
runAsync和supplyAsync创建子任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);
}
可以看出runAsync没有返回值,supplyAsync有返回值,此处用supplyAsync举例:
ExecutorService executor= Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{return "你好,周先生";
},executor);
System.out.println(completableFuture.get());//输出你好,周先生
executor.shutdown();
上例中的线程池可以自己构造,如若不指定使用CompletableFuture中默认的线程池ForkJoinPool。
handle()方法统一处理异常和结果
//在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(null, fn);
}
//可能不在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(asyncPool, fn);
}
//在指定线程池executor中处理异常和结果
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {return uniHandleStage(screenExecutor(executor), fn);
}
案例:
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{throw new RuntimeException("你好");
});
completableFuture.handle(new BiFunction<String,Throwable,String>(){@Overridepublic String apply(String s, Throwable throwable) {if(throwable==null){System.out.println("mei");;}else {System.out.println("出错了");}return "ok";}
});
异步任务的串行执行
主要方法为以下几种:thenApply()、thenAccept()、thenRun()和 thenCompose()。
thenApply()
此方法实现异步任务的串行化执行,前一个任务结果作为下一个任务的入参。
后一个任务与前一个任务在同一个线程中执行public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);}//后一个任务与前一个任务不在同一个线程中执行public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {return uniApplyStage(asyncPool, fn);}//后一个任务在指定的executor线程池中执行public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {return uniApplyStage(screenExecutor(executor), fn);}
其中泛型参数T:上一个任务所返回结果的类型。泛型参数U:当前任务的返回类型。
案例:
ExecutorService executor= Executors.newFixedThreadPool(10);CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,周先生";},executor).thenApplyAsync(new Function<String,String>() {@Overridepublic String apply(String s) {System.out.println(Thread.currentThread().getId());//13return "你好,毛先生";}});System.out.println(completableFuture.get());//输出你好,毛先生executor.shutdown();
thenRun()
此方法不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。而且没有返回值。
//后一个任务与前一个任务在同一个线程中执行public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);}//后一个任务与前一个任务可以不在同一个线程中执行public CompletableFuture<Void> thenRunAsync(Runnable action) {return uniRunStage(asyncPool, action);}//后一个任务在executor线程池中执行public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) {return uniRunStage(screenExecutor(executor), action);}
thenAccept()
使用此方法时一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出。
//后一个任务与前一个任务在同一个线程中执行public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {return biAcceptStage(null, other, action);}//后一个任务与前一个任务不在同一个线程中执行public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {return biAcceptStage(asyncPool, other, action);}//后一个任务在指定的executor线程池中执行public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) {return biAcceptStage(screenExecutor(executor), other, action);}
thenCompose()
对两个任务进行串行的调度操作,第一个任务操作完成时,将其结果作为参数传递给第二个任务。
//后一个任务与前一个任务在同一个线程中执行public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(null, fn);}//后一个任务与前一个任务不在同一个线程中执行public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(asyncPool, fn);}//后一个任务在指定的executor线程池中执行public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) {return uniComposeStage(screenExecutor(executor), fn);}
thenCompose()方法第二个任务的返回值是一个CompletionStage异步实例。
ExecutorService executor= Executors.newFixedThreadPool(10);CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,周先生";},executor).thenComposeAsync(new Function<String,CompletableFuture<String>>(){@Overridepublic CompletableFuture<String> apply(String s) {return CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,毛先生";});}});System.out.println(completableFuture.get());//输出你好,毛先生executor.shutdown();
异步任务的合并执行
主要实现为以下几个方法:thenCombine()、runAfterBoth()、
thenAcceptBoth()。
thenCombine()
thenCombine()会在两个CompletionStage任务都执行完成后,一块来处理两个任务的执行结果。如果要合并多个任务,可以使用allOf()。
//合并第二步任务的CompletionStage实例,返回第三步任务的CompletionStagepublic <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(null, other, fn);}//不一定在同一个线程中执行第三步任务的CompletionStage实例public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(asyncPool, other, fn);}//第三步任务的CompletionStage实例在指定的executor线程池中执行public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor) {return biApplyStage(screenExecutor(executor), other, fn);}
其中泛型参数T:表示第一个任务所返回结果的类型。泛型参数U:表示第二个任务所返回结果的类型。泛型参数V:表示第三个任务所返回结果的类型。
案例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,周先生";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,毛先生";});CompletableFuture<String> future3 = future1.thenCombine(future2, new BiFunction<String, String, String>(){@Overridepublic String apply(String s, String s2) {return s+"-----"+s2;}});String s = future3.get();System.out.println(s);//你好,周先生-----你好,毛先生
而runAfterBoth()方法不关注每一步任务的输入参数和输出参数,thenAcceptBoth()中第三个任务接收第一个和第二个任务的结果,但是不返回结果。
异步任务的选择执行
若异步任务的选择执行不是按照某种条件进行选择的,而按照执行速度进行选择的:前面两并行任务,谁的结果返回速度快,其结果将作为第三步任务的输入。对两个异步任务的选择可以通过CompletionStage接口的applyToEither()、acceptEither()等方法来实现。
applyToEither()
//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {return orApplyStage(null, other, fn);}//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数,不一定在同一个线程中执行fn回调函数public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {return orApplyStage(asyncPool, other, fn);}//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数,在指定线程池执行fn回调函数public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor) {return orApplyStage(screenExecutor(executor), other, fn);}
案例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return "你好,周先生";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,毛先生";});CompletableFuture<String> future3 = future1.applyToEither(future2, new Function<String, String>(){@Overridepublic String apply(String s) {return s;}});String s = future3.get();System.out.println(s);//你好,毛先生