异步任务——CompletabelFuture

本专栏学习内容又是来自尚硅谷周阳老师的视频

有兴趣的小伙伴可以点击视频地址观看

在学习CompletableFuture之前,必须要先了解一下Future

Future

概念

Future接口(FutureTask实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

比如主线程在执行多个任务时,其中有一个任务耗时特别久,可以交给子线程执行,主线程继续做自己的事情,过一会再去获取子任务的执行结果。

FutureTask

FutureTaskFuture的一个实现类

引出FutureTask

Future提供了一种异步并行计算的功能,目的是:异步多线程任务执行且返回有结果,三个特点

  • 多线程
  • 有返回
  • 异步任务

说句题外话,小黄一直不了解多线程和异步有什么区别,刚好问了ChatGpt

多线程和异步是两个相关但又不完全相同的概念。

多线程指的是在一个程序中同时运行多个线程,每个线程都是独立执行的。多线程可以使程序具有并发性,提高系统的吞吐量和响应速度。通过多线程,可以将耗时的操作和任务分配给不同的线程来执行,从而提高整体的工作效率。

异步指的是一种编程模型,其中任务的执行不需要等待前一个任务的完成。在异步编程中,可以执行一个任务,而不需要阻塞等待其完成,而是继续执行后续的任务。异步操作通常适用于涉及网络请求、文件读写、数据库查询等可能造成阻塞的操作。通过异步操作,可以充分利用等待时间来执行其他任务,提高程序的效率。

多线程和异步之间存在一定的关系。在实现异步操作时,通常会使用多线程来实现异步任务的执行。通过创建新的线程,可以在后台执行耗时的操作,而不会阻塞主线程的执行。

例如,对于一个网络请求,使用多线程可以在主线程发送请求后,继续执行其他任务,而不需要等待网络请求返回的数据。当网络请求完成后,在新的线程中处理响应数据,从而实现异步操作。

总结起来,多线程是一种实现并发性的机制,而异步是一种编程模型,在实现异步操作时通常会使用多线程来达到异步执行的效果。多线程可以提供资源的并行使用,而异步可以提高程序的运行效率和响应性。

来看一下创建线程的方式

实现Runnable接口可以开启多线程

实现Callable接口可以开启多线程并且有返回值

class MyThread1 implements Runnable {@Overridepublic void run() {}
}class MyThread2 implements Callable<String> {@Overridepublic String call() throws Exception {return null;}
}

但是Thread的构造方法中只有参数为Runnable的方法,无法满足我们的需求

image-20230712161344204

继续往下看,发现RunnableFuture接口继承了Runnable以及Future,那也就是说他具有多线程、异步两个特点

image-20230712161533902

FutureTask实现了RunnableFuture接口,并且他的构造方法中可以传入Callable,那么他就同时具有了多线程、异步、有返回三大特点。

image-20230712161709601

优点

Future配合线程池异步多线程,能显著提高程序的效率

需求

主线程需要执行三个任务,且三个任务耗时分别是500毫秒、300毫秒、300毫秒,如果主线程自己执行的话,那程序至少需要花费11秒的时间,现在使用Future + 线程池来优化

实现

public static void main(String[] args) throws InterruptedException {//创建线程池ExecutorService threadPool = Executors.newFixedThreadPool(3);long startTime = new Date().getTime();//创建任务FutureTask<String> task1 = new FutureTask<>(() -> {Thread.sleep(500);return "task1 over";});threadPool.submit(task1,"t1");FutureTask<String> task2 = new FutureTask<>(() -> {Thread.sleep(300);return "task2 over";});threadPool.submit(task2,"t2");Thread.sleep(300);System.out.println("task3 over");long endTime = new Date().getTime();System.out.println("花费" + (endTime - startTime) + "毫秒");  //花费338毫秒
}

缺点

get方法会阻塞

调用task1.get()会使主线程阻塞,因为get()他会一直等待子线程返回结果在继续运行。

public static void main(String[] args) throws ExecutionException, InterruptedException {//创建任务FutureTask<String> task1 = new FutureTask<>(() -> {Thread.sleep(5000);return "task1 over";});new Thread(task1,"t1").start();System.out.println("t1线程结果:" + task1.get());System.out.println("主线程执行完毕");
}

isDone方法轮询

对于上述代码,没有一个友好的提示,导致我们不知道程序为何阻塞,FutureTask提供了isDone(),调用该方法,结果为true表示线程执行完毕。

但是这种方法的结果就是需要不停的轮询,大量的消耗了CPU

public static void main(String[] args) throws ExecutionException, InterruptedException {//创建任务FutureTask<String> task1 = new FutureTask<>(() -> {Thread.sleep(5000);return "task1 over";});new Thread(task1,"t1").start();while (true) {if (task1.isDone()) {System.out.println("t1线程结果:" + task1.get());break;}else {Thread.sleep(500);System.out.println("请等待");}}System.out.println("主线程执行完毕");
}

更复杂的任务

对于简单的任务使用Future完全可以解决,下面有几个更为复杂的需求Future不好解决了

  • 多个任务前后可以组合处理

    例如:子线程A计算返回的结果,在子线程B中需要用到

  • 对计算速度选最快

    例如:联机游戏,谁先到终点谁就赢了,那么当A到达终点时,B的线程也需要中断

对此,就引出了CompletableFuture,这就有点像一个名场面东厂管不了的,我西厂来管,东厂管得了的,我西厂更要管,也就是说Future有的功能CompletableFuture都有,Future没有的功能CompletableFuture也有,有点像plus版本。

CompletableFuture

之前介绍了Future,发现他只能解决一些简单的逻辑,并且阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无谓的CPU资源,所有CompletableFuture应运而生。

概念

CompletableFuture提供了一种类似于观察者模式的机制,可以让任务执行完后通知监听的一方。

他是JDK1.8新增的类,实现了FutureCompletionStage接口

Future不用在介绍了,CompletionStage提供了一种处理异步操作结果的机制,可以与回调函数一起使用,来处理 CompletableFuture 的计算结果。

image-20230713151615088

创建方式

我们学习一个新的类的方式,第一步就是看他的构造函数,创建这个类,CompletableFuture虽然有一个空参构造函数,但是官方并不推荐我们使用,一般我们通过4个静态方法来创建。

调用静态方法创建返回值
CompletableFuture runAsync(Runnable runnable)
CompletableFuture runAsync(Runnable runnable,Executor executor)
CompletableFuture supplyAsync(Supplier supplier)
supplyAsync(Supplier supplier, Executor executor)

代码实现

runAsync()

不带有线程池,默认使用ForkJoinPool的线程

public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println("进入子线程:" + Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}});System.out.println(completableFuture.get()); //nullSystem.out.println("主线程结束");
}

supplyAsync

public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("进入子线程:" + Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}int result = ThreadLocalRandom.current().nextInt(10);System.out.println("子线程执行结果:" + result);return result;});System.out.println(completableFuture.get());System.out.println("主线程结束");
}

带有线程池的创建就不举例了

通用异步编程

上面还是在演示Future原有的功能,接下来学一下新的功能

通过whenComplete来监听子进程执行完毕,来做一系列操作

通过exceptionally来解决子进程出现异常的情况

public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("进入子线程:" + Thread.currentThread().getName());System.out.println(Thread.currentThread().isDaemon()); //守护线程try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}int result = ThreadLocalRandom.current().nextInt(10);System.out.println("子线程执行结果:" + result);return result;}).whenComplete((v,e) -> {if (e == null) {System.out.println("计算结果为:" + v);}}).exceptionally(e -> {//处理异常e.printStackTrace();System.out.println("计算过程出现异常:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println("主线程结束");
}//输出
进入子线程:ForkJoinPool.commonPool-worker-9
主线程结束

但是发现控制台输出没有等待结果,主线程就直接结束了,这是因为默认情况下ForkJoinPool里面是守护线程,解决方法有两种

  1. 在主线程结束前等待
  2. 使用自定义的线程池

修改代码

public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(10);CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("进入子线程:" + Thread.currentThread().getName());System.out.println(Thread.currentThread().isDaemon()); //守护线程try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}int result = ThreadLocalRandom.current().nextInt(10);System.out.println("子线程执行结果:" + result);return result;},threadPool).whenComplete((v,e) -> {if (e == null) {System.out.println("计算结果为:" + v);}}).exceptionally(e -> {//处理异常e.printStackTrace();System.out.println("计算过程出现异常:" + e.getCause() + "\t" + e.getMessage());return null;});System.out.println("主线程结束");threadPool.shutdown();
}//输出
进入子线程:pool-1-thread-1
false
主线程结束
子线程执行结果:7
计算结果为:7

通过控制台输出可以看到,自定义线程池创建的是用户线程,所以即使是主线程执行完毕,程序还是要等待所有用户线程执行完毕才会结束。

链式语法

接下来会用到很多链式语法,这个在Java8很常见,其实就是在写法上更加简洁了

public class CompletableFutureDemo5 {public static void main(String[] args) throws ExecutionException, InterruptedException {Student student = new Student();/*以前的写法student.setId(1);student.setName("张三"));*/student.setId(1).setName("张三");System.out.println(student);}
}@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
class Student{private Integer id;private String name;
}

join方法

在介绍join()之前,必须先介绍一下get()

get()是获取异步计算结果,但是在编译期会需要抛出异常

image-20230718150444583

join()也是获取异步计算结果,但是不需要抛出异常

image-20230718150548558

电商比价案例

需求:

需要查询《深入理解JVM虚拟机》这本书在各大电商平台销售的价格,显示结果如下

《深入理解JVM虚拟机》in jd price is 100

普通解决方案

使用同步方式,一步一步来,查一个保存一个价格

此方法的优点是简洁粗暴,缺点是非常的耗时

public class CompletableFutureDemo7 {static List<NetMall> malls = Arrays.asList(new NetMall("jd"),new NetMall("tb"),new NetMall("pdd"));public static List<String> step(List<NetMall> list ,String productName){return list.stream().map(netMall -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calcPrice(productName))).collect(Collectors.toList());}public static void main(String[] args) {long startTime = new Date().getTime();List<String> list = step(malls, "MySQL");long endTime = new Date().getTime();System.out.println("耗时:" + (endTime - startTime));for (String item : list) {System.out.println(item);}}
}@Data
@AllArgsConstructor
@NoArgsConstructor
class NetMall{private String netMallName;public double calcPrice(String productName) {//模拟请求过程耗时try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);}
}//结果
耗时:3067
MySQL in jd price is 78.86
MySQL in tb price is 78.95
MySQL in pdd price is 78.98

异步解决方案

核心计算方法,使用异步的方式进行,这样大大的节约了时间

public static List<String> byCompletableFuture(List<NetMall> list ,String productName){return list.stream().map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calcPrice(productName)))).collect(Collectors.toList()).stream().map(s -> s.join()).collect(Collectors.toList());
}//结果
耗时:1050
MySQL in jd price is 77.77
MySQL in tb price is 77.18
MySQL in pdd price is 77.32

计算方法

CompletableFuture提供了非常多的计算方法

获取结果和触发计算

方法名作用
public T get()获取结果,会造成当前线程阻塞
public T get(long timeout, TimeUnit unit)获取结果,在指定的时间内获取不到,抛出异常
public T join()获取结果,跟get()用法一致,区别是编译器不需要抛异常
public T getNow(T valueIfAbsent)立刻获取结果,如果结果没出来,使用指定值代替结果
public boolean complete(T value)中断计算,计算过程被中断返回true,并且用指定值代替计算结果
public static void main(String[] args) throws InterruptedException {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}return "123";});TimeUnit.SECONDS.sleep(2);System.out.println(completableFuture.complete("completeValue") + "\t " + completableFuture.join());//true	 completeValue
}

对计算结果进行处理

方法名作用
public CompletableFuture thenApply()获取计算结果,对其进行处理
public CompletableFuture handle()作用同thenApply,区别在于遇到异常不会组织下一步运行

thenApply()

public class CompletableFutureDemo9 {public static void main(String[] args) throws InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(10);CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;},threadPool).thenApply(f -> {System.out.println("222");return f + 1;}).thenApply(f -> {System.out.println("333");return f + 2;}).whenComplete((v,e) -> {if (e == null) {System.out.println("计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("主线程去忙其他的了 : " + Thread.currentThread().getName());threadPool.shutdown();}
}

可以看到程序在抛异常时,就停止了,不会继续往下执行

image-20230719135405676

hanlde()

public static void main(String[] args) throws InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(10);CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("111");return 1;}, threadPool).handle((f, e) -> {int i = 1 / 0;System.out.println("222");return f + 1;}).handle((f, e) -> {System.out.println("333");return f + 2;}).whenComplete((v, e) -> {if (e == null) {System.out.println("计算结果:" + v);}}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("主线程去忙其他的了 : " + Thread.currentThread().getName());threadPool.shutdown();
}

通过输出可以看出在第一个handle中出现异常,不继续往下执行该handle的方法,但是不影响后续的hanlde方法

image-20230719135510783

对计算结果进行消费

消费,顾名思义就是把这条消息消费掉,后面的人就获取不到这条消息了。

方法作用
public CompletableFuture thenRun(Runnable action)任务A执行完执行B,并且B不需要A的结果
public CompletableFuture thenAccept(Consumer<? super T> action)任务A执行完执行B,B需要A的结果,但是任务B没有返回值
public CompletableFuture thenApply任务A执行完执行B,B需要A的结果,同事任务B有返回值

如下所示,thenAccept()方法,在获取结果时为null

public static void main(String[] args) throws InterruptedException {System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join()); //nullSystem.out.println(CompletableFuture.supplyAsync(() -> "resultB").thenAccept(y -> {System.out.println(y);}).join()); //resultB  nullSystem.out.println(CompletableFuture.supplyAsync(() -> "resultC").thenApply(y -> y + "resultD").join()); //resultCresultD
}

对运行线程池进行选择

不使用线程池,默认走的是ForkJoinPool

image-20230719155843600

使用线程池,走的全都是自定义线程池

image-20230719160752242

使用线程池,中间调用了thenRunAsync方法,那么之后的方法都会使用ForkJoinPool

image-20230719160832347

源码

thenRun()thenRunAsync()区别在于一个传参使用了默认的线程池

image-20230719161125302

image-20230719161136157

对计算速度进行选用

调用applyToEither()方法,他会将两个异步任务先完成的值返回

public class CompletableFutureDemo12 {public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}return "future1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {throw new RuntimeException(e);}return "future2";});CompletableFuture<String> result = future1.applyToEither(future2, s ->  s + " is win");System.out.println(result.join()); //future1 is win}
}

对计算结果进行合并

thenCombine()可以将两个计算结果合并

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {throw new RuntimeException(e);}return 20;});CompletableFuture<Integer> result = future1.thenCombine(future2, (x,y) -> {System.out.println("计算结果合并");return x + y;});System.out.println(result.join()); //30
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/9.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android应用启动全流程分析(源码深度剖析)

作者&#xff1a;努比亚技术团队 源码来源&#xff1a;努比亚技术团队 1.前言 从用户手指点击桌面上的应用图标到屏幕上显示出应用主Activity界面而完成应用启动&#xff0c;快的话往往都不需要一秒钟&#xff0c;但是这整个过程却是十分复杂的&#xff0c;其中涉及了Android系…

【java】【基础2】程序流程控制

目录 一、最经典的三种执行顺序 二、分支结构 2.1 if 2.2 switch 2.3 if与switch区别 三、循环结构 3.1 for循环 3.2 while循环 3.3 do-while循环 3.4 三种循环区别 3.5 补充知识&#xff1a;死循环 3.6 补充知识&#xff1a;循环嵌套 四、跳转关键字&#xff1a;br…

自建DNSlog服务器

DNSlog简介 在某些情况下&#xff0c;无法利用漏洞获得回显。但是&#xff0c;如果目标可以发送DNS请求&#xff0c;则可以通过DNS log方式将想获得的数据外带出来。 DNS log常用于以下情况&#xff1a; SQL盲注无回显的命令执行无回显的SSRF 网上公开提供dnslog服务有很多…

MySQL 主从复制与读写分离

概念 主从复制与读写分离的意义 企业中的业务通常数据量都比较大&#xff0c;而单台数据库在数据存储、安全性和高并发方面都无法满足实际的需求&#xff0c;所以需要配置多台主从数据服务器&#xff0c;以实现主从复制&#xff0c;增加数据可靠性&#xff0c;读写分离&#x…