Java——基于CompletableFuture的流水线并行处理

CompletableFuture在JDK1.8提供了一种更加强大的异步编程的api。它实现了Future接口,也就是Future的功能特性CompletableFuture也有;除此之外,它也实现了CompletionStage接口,CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。

CompletableFuture相比于Future最大的改进就是提供了类似观察者模式的回调监听的功能,也就是当上一阶段任务执行结束之后,可以回调你指定的下一阶段任务,而不需要阻塞获取结果之后来处理结果。

CompletableFuture常见api详解:

1、实例化CompletableFuture

// 构造方法创建
CompletableFuture<String> completableFuture = new CompletableFuture<>();
completableFuture.complete("hello");
System.out.println(completableFuture.get());
// 静态方法创建
/**
supply 和 run 的主要区别就是 supply 可以有返回值,run 没有返回值。
Executor 就是用来执行异步任务的线程池,
如果不传Executor 的话,默认是ForkJoinPool这个线程池的实现。
一旦通过静态方法来构造,会立马开启异步线程执行Supplier或者Runnable提交的任务。
**/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
// 一旦任务执行完成,就可以打印返回值,这里的使用方法跟Future是一样的。
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "hello");
System.out.println(completableFuture.get());

使用构造方法需要其它线程主动调用complete来表示任务执行完成,因为很简单,因为在构造的时候没有执行异步的任务,所以需要其它线程主动调用complete来表示任务执行完成。

2、获取任务执行结果

public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();
  • get()和get(long timeout, TimeUnit
    unit)是实现了Future接口的功能,两者主要区别就是get()会一直阻塞直到获取到结果,get(long timeout,
    TimeUnit unit)值可以指定超时时间,当到了指定的时间还未获取到任务,就会抛出TimeoutException异常。
  • getNow(T valueIfAbsent):就是获取任务的执行结果,但不会产生阻塞。如果任务还没执行完成,那么就会返回你传入的
    valueIfAbsent 参数值,如果执行完成了,就会返回任务执行的结果。
  • join():跟get()的主要区别就是,get()会抛出检查时异常,join()不会。

3、主动触发任务完成

public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);
  • complete:主动触发当前异步任务的完成。调用此方法时如果你的任务已经完成,那么方法就会返回false;如果任务没完成,就会返回true,并且其它线程获取到的任务的结果就是complete的参数值。
  • completeExceptionally:跟complete的作用差不多,complete是正常结束任务,返回结果,而completeExceptionally就是触发任务执行的异常。

4、对任务执行结果进行下一步处理

// 只能接收任务正常执行后的回调
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

这类回调的特点就是,当任务正常执行完成,没有异常的时候就会回调。

  • thenApply:可以拿到上一步任务执行的结果进行处理,并且返回处理的结果
  • thenRun:拿不到上一步任务执行的结果,但会执行Runnable接口的实现
  • thenAccept:可以拿到上一步任务执行的结果进行处理,但不需要返回处理的结果
// 只能接收任务处理异常后的回调
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

当上面的任务执行过程中出现异常的时候,会回调exceptionally方法指定的回调,但是如果没有出现异常,是不会回调的。

exceptionally能够将异常给吞了,并且fn的返回值会返回回去。

其实这个exceptionally方法有点像降级的味道。当出现异常的时候,走到这个回调,可以返回一个默认值回去。

// 能同时接收任务执行正常和异常的回调
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> actin);

不论前面的任务执行成功还是失败都会回调的这类方法指定的回调方法。

  • handle :
    跟exceptionally有点像,但是exceptionally是出现异常才会回调,两者都有返回值,都能吞了异常,但是handle正常情况下也能回调。
  • whenComplete:能接受正常或者异常的回调,并且不影响上个阶段的返回值,也就是主线程能获取到上个阶段的返回值;当出现异常时,whenComplete并不能吞了这个异常,也就是说主线程在获取执行异常任务的结果时,会抛出异常。

示例:

    @Testpublic void then(){CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> 10).thenApply(v -> ("上一步的执行的结果为:" + v));System.out.println(completableFuture1.join());CompletableFuture<Void> completableFuture2 = CompletableFuture.supplyAsync(() -> 10).thenRun(() -> System.out.println("上一步执行完成"));CompletableFuture<Void> completableFuture3 = CompletableFuture.supplyAsync(() -> 10).thenAccept(v -> System.out.println("上一步执行完成,结果为:" + v));CompletableFuture<Integer> completableFuture5 = CompletableFuture.supplyAsync(() -> {return 100;}).exceptionally(e -> {System.out.println("出现异常了,返回默认值");return 110;});System.out.println(completableFuture5.join());CompletableFuture<Integer> completableFuture6 = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 100;}).exceptionally(e -> {System.out.println("出现异常了,返回默认值");return 110;});System.out.println(completableFuture6.join());CompletableFuture<String> completableFuture4 = CompletableFuture.supplyAsync(() -> {//模拟异常int i = 1 / 0;return 10;}).thenApply(v -> ("上一步的执行的结果为:" + v));System.out.println(completableFuture4.join());CompletableFuture<Integer> completableFuture7 = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 10;}).whenComplete((r, e) -> {System.out.println("whenComplete被调用了");});System.out.println(completableFuture7.join());}
上一步的执行的结果为:10
上一步执行完成
上一步执行完成,结果为:10
100
出现异常了,返回默认值
110java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zerowhenComplete被调用了
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

5、对任务结果进行合并

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);

这个方法的意思是,当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调 BiFunction ,然后返回新的结果。

6、以Async结尾的方法

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

thenAcceptAsync跟thenAccept的主要区别就是thenAcceptAsync会重新开一个线程来执行下一阶段的任务,而thenAccept还是用上一阶段任务执行的线程执行。

两个thenAcceptAsync主要区别就是一个使用默认的线程池来执行任务,也就是ForkJoinPool,一个是使用方法参数传入的线程池来执行任务。

当然除了thenAccept方法之外,上述提到的方法还有很多带有Async结尾的对应的方法,他们的主要区别就是执行任务是否开启异步线程来执行的区别。

CompletableFuture实战测试:

场景:

  • 从某宝、某东、某多多去获取某个商品的价格、折扣
  • 并计算出实际付款金额
  • 最终返回最优的平台与价格信息

多线程形式
多线程流程图

@Slf4j
public class CompleteFutureTest1 {static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(50,100,10,TimeUnit.SECONDS,new ArrayBlockingQueue<>(200),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {userFutureTask();}private static void userFutureTask(){long start = System.currentTimeMillis();Future<BigDecimal> moubaoFuture = threadPool.submit(() -> {// 1、查询某宝价格log.info("查询 某宝价格");TimeUnit.SECONDS.sleep(2);BigDecimal bigDecimal = new BigDecimal("100");// 2、查询某宝折扣log.info("查询 某宝折扣");TimeUnit.SECONDS.sleep(2);BigDecimal discount = new BigDecimal("0.6");// 3、计算某宝实时价格log.info("计算 某宝实时价格");TimeUnit.SECONDS.sleep(2);return bigDecimal.multiply(discount);});Future<BigDecimal> moudongFuture = threadPool.submit(() -> {// 1、查询某宝价格log.info("查询 某东价格");TimeUnit.SECONDS.sleep(2);BigDecimal bigDecimal = new BigDecimal("100");// 2、查询某宝折扣log.info("查询 某东折扣");TimeUnit.SECONDS.sleep(2);BigDecimal discount = new BigDecimal("0.6");// 3、计算某宝实时价格log.info("计算 某东实时价格");TimeUnit.SECONDS.sleep(2);return bigDecimal.multiply(discount);});Future<BigDecimal> mouduoFuture = threadPool.submit(() -> {// 1、查询某宝价格log.info("查询 某多价格");TimeUnit.SECONDS.sleep(2);BigDecimal bigDecimal = new BigDecimal("100");// 2、查询某宝折扣log.info("查询 某多折扣");TimeUnit.SECONDS.sleep(2);BigDecimal discount = new BigDecimal("0.6");// 3、计算某宝实时价格log.info("计算 某多实时价格");TimeUnit.SECONDS.sleep(2);return bigDecimal.multiply(discount);});BigDecimal minPrice = Stream.of(moubaoFuture, moudongFuture, mouduoFuture).map(priceResultFuture -> {try {return priceResultFuture.get(10, TimeUnit.SECONDS);} catch (Exception e) {log.error("exception:{}", e.getMessage());return null;}}).filter(Objects::nonNull).sorted(BigDecimal::compareTo).findFirst().get();log.info("minPrice: {}", minPrice);long end = System.currentTimeMillis();log.info("耗时: {}" + (end - start));}}
17:54:09.417 [pool-1-thread-1] INFO com.ooamo.example.E13.CompleteFutureTest1 - 查询 某宝价格
17:54:09.417 [pool-1-thread-2] INFO com.ooamo.example.E13.CompleteFutureTest1 - 查询 某东价格
17:54:09.417 [pool-1-thread-3] INFO com.ooamo.example.E13.CompleteFutureTest1 - 查询 某多价格
17:54:11.421 [pool-1-thread-3] INFO com.ooamo.example.E13.CompleteFutureTest1 - 查询 某多折扣
17:54:11.421 [pool-1-thread-2] INFO com.ooamo.example.E13.CompleteFutureTest1 - 查询 某东折扣
17:54:11.421 [pool-1-thread-1] INFO com.ooamo.example.E13.CompleteFutureTest1 - 查询 某宝折扣
17:54:13.421 [pool-1-thread-1] INFO com.ooamo.example.E13.CompleteFutureTest1 - 计算 某宝实时价格
17:54:13.421 [pool-1-thread-2] INFO com.ooamo.example.E13.CompleteFutureTest1 - 计算 某东实时价格
17:54:13.421 [pool-1-thread-3] INFO com.ooamo.example.E13.CompleteFutureTest1 - 计算 某多实时价格
17:54:15.422 [main] INFO com.ooamo.example.E13.CompleteFutureTest1 - minPrice: 60.0
17:54:15.423 [main] INFO com.ooamo.example.E13.CompleteFutureTest1 - 耗时: {}6050

使用多线程的方法去实现,耗时为6s

CompletableFuture形式:
completableFuture流程图

@Slf4j
public class CompleteFutureTest2 {public static void main(String[] args) {userFutureTask();}private static void userFutureTask(){long start = System.currentTimeMillis();CompletableFuture<BigDecimal> moubao = CompletableFuture.supplyAsync(() -> {// 1、查询某宝价格log.info("查询 某宝价格");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return new BigDecimal("100");}).thenCombine(CompletableFuture.supplyAsync(() -> {// 2、查询某宝折扣log.info("查询 某宝折扣");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return new BigDecimal("0.6");}), CompleteFutureTest2::computeRealPrice);CompletableFuture<BigDecimal> moudong = CompletableFuture.supplyAsync(() -> {// 1、查询某宝价格log.info("查询 某东价格");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return new BigDecimal("100");}).thenCombine(CompletableFuture.supplyAsync(() -> {// 2、查询某宝折扣log.info("查询 某东折扣");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return new BigDecimal("0.6");}), CompleteFutureTest2::computeRealPrice);CompletableFuture<BigDecimal> mouduo = CompletableFuture.supplyAsync(() -> {// 1、查询某宝价格log.info("查询 某多价格");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return new BigDecimal("100");}).thenCombine(CompletableFuture.supplyAsync(() -> {// 2、查询某宝折扣log.info("查询 某多折扣");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return new BigDecimal("0.6");}), CompleteFutureTest2::computeRealPrice);BigDecimal minPrice = Stream.of(moubao, mouduo, moudong).map(future -> {try {return future.get();} catch (Exception e) {throw new RuntimeException(e);}}).filter(Objects::nonNull).sorted(BigDecimal::compareTo).findFirst().get();log.info("minPrice: {}", minPrice);long end = System.currentTimeMillis();log.info("耗时: {}" + (end - start));}private static BigDecimal computeRealPrice(BigDecimal price, BigDecimal discount){log.info("计算价格");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e){e.printStackTrace();}return price.multiply(discount);}}
18:10:35.199 [ForkJoinPool.commonPool-worker-6] INFO com.ooamo.example.E14.CompleteFutureTest2 - 查询 某多折扣
18:10:35.199 [ForkJoinPool.commonPool-worker-2] INFO com.ooamo.example.E14.CompleteFutureTest2 - 查询 某宝折扣
18:10:35.199 [ForkJoinPool.commonPool-worker-9] INFO com.ooamo.example.E14.CompleteFutureTest2 - 查询 某宝价格
18:10:35.199 [ForkJoinPool.commonPool-worker-4] INFO com.ooamo.example.E14.CompleteFutureTest2 - 查询 某东折扣
18:10:35.199 [ForkJoinPool.commonPool-worker-13] INFO com.ooamo.example.E14.CompleteFutureTest2 - 查询 某多价格
18:10:35.199 [ForkJoinPool.commonPool-worker-11] INFO com.ooamo.example.E14.CompleteFutureTest2 - 查询 某东价格
18:10:37.203 [ForkJoinPool.commonPool-worker-9] INFO com.ooamo.example.E14.CompleteFutureTest2 - 计算价格
18:10:37.203 [ForkJoinPool.commonPool-worker-4] INFO com.ooamo.example.E14.CompleteFutureTest2 - 计算价格
18:10:37.203 [ForkJoinPool.commonPool-worker-13] INFO com.ooamo.example.E14.CompleteFutureTest2 - 计算价格
18:10:39.203 [main] INFO com.ooamo.example.E14.CompleteFutureTest2 - minPrice: 60.0
18:10:39.205 [main] INFO com.ooamo.example.E14.CompleteFutureTest2 - 耗时: {}4038

使用CompletableFuture的方法耗时为4s

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/286890.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言结构体之位段

位段&#xff08;节约内存&#xff09;&#xff0c;和王者段位联想记忆 位段是为了节约内存的。刚好和结构体相反。 那么什么是位段呢&#xff1f;我们现引入情景&#xff1a;我么如果要记录一个人是男是女&#xff0c;用数字0 1表示。我们发现只要一个bit内存就可以完成我们想…

Linux离线安装mysql,node,forever

PS:本文是基于centos7实现的,要求系统能够查看ifconfig和unzip解压命令, 实现无网络可安装运行 首先现在百度网盘的离线文件包****安装Xftp 和 Xshell 把机房压缩包传到 home目录下****解压unzip 包名.zip 获取IP先获取到 linux 主机的ip ifconfig Xftp 连接输入IP,然后按照…

FPGA----ZCU106的petalinux 2019.1使用USB传输数据

1、实际项目中需要用到开发板的串口进行数据交互&#xff0c;之前讲的几节只是启动了网口&#xff08;如下链接&#xff09;。因此&#xff0c;本次给大家带来的官方自带串口例程的使用方法&#xff0c;本文的vivado工程和下述连接一样&#xff0c;PL端什么配置都没有。 FPGA-…

ElasticSearch8 - 基本操作

前言 本文记录 ES 的一些基本操作&#xff0c;就是对官方文档的一些整理&#xff0c;按自己的习惯重新排版&#xff0c;凑合着看。官方的更详细&#xff0c;建议看官方的。 下文以 books 为索引名举例。 新增 添加单个文档 (没有索引会自动创建) POST books/_doc {"n…

flutter 弹窗之系列三

Overlay 部分源码 class Overlay extends StatefulWidget{...static OverlayState of(BuildContext context, {bool rootOverlay false,Widget debugRequiredFor,})... } // rootOverlay: // 值为false, 就近查找&#xff0c;找到树中最近的节点; // 如果为true, 则去找最顶…

数据库专题(oracle基础和进阶)

前言 本专题主要记录自己最近学的数据库&#xff0c;有兴趣一起补习的可以一起看看&#xff0c;有补充和不足之处请多多指出。希望专题可以给自己还有读者带去一点点提高。 数据库基本概念 本模块有参考&#xff1a;数据库基本概念-CSDN博客 数据库管理系统是一个由互相关联的…

[Java基础揉碎]抽象类

目录 通过问题引出 介绍 关键点 细节 ​编辑 抽象类的最佳设计模式--模版设计模式 1.先用最容易想到的方法 2.分析问题&#xff0c;提出使用模板设计模式 通过问题引出 假如我们有个动物类, 动物都有eat吃的方法, 但是具体吃什么, 我们不知道, 因为是什么动物我们不知道…

Git版本控制

这是两个学习Git推荐必看的文档&#xff0c;第一个链接是Git的官方权威文档&#xff0c;第二个链接是国内程序员在开发中&#xff0c;总结的Git快速入门教程&#xff0c;掌握这个&#xff0c;也足够应付在工作中的场景。 Git权威书籍《ProGit》中文版https://gitee.com/progit…

jdk11中自定义java类在jvm是如何被查找、加载

yym带你了解jvm源码&#xff0c;openjdk11源码&#xff0c;java类jvm加载原理 jdk11中java类在jvm是如何被1查找、2加载 以下说明的是MiDept类是如何被java classloader 和 jvm加载步骤 上源代码 public static void main(String[] args) {Thread.currentThread().setName…

Python机器学习赋能GIS:地质灾害风险评价的新方法论

地质灾害是指全球地壳自然地质演化过程中&#xff0c;由于地球内动力、外动力或者人为地质动力作用下导致的自然地质和人类的自然灾害突发事件。由于降水、地震等自然作用下&#xff0c;地质灾害在世界范围内频繁发生。我国除滑坡灾害外&#xff0c;还包括崩塌、泥石流、地面沉…

k8s记录-基础配置1

1、基础yaml文件格式 1.1、namespace apiVersion: v1 kind: Namespace metadata:name: namelabels:name: namekubectl apply -f namespace.yaml 1.2、Service service示例 apiVersion: v1 kind: Service metadata:name: ilanni-httpd-svcnamespace: ilanni-namespace spec:…

【代驾+顺风车+货运】全开源双端APP代驾+顺风车+货运代驾小程序源码

内容目录 一、详细介绍二、效果展示1.部分代码2.效果图展示 一、详细介绍 系统是基于Thinkphpuniapp开发的&#xff0c;全开源未加密&#xff0c;这套源码可以拿回去自己做二开 后台用户端司机端 功能详情介绍&#xff1a; 车主实名认证&#xff0c;驾驶证认证&#xff0c;车…

大学生实习被企业坑了,教训比较深刻

帮实习生解决一些疑惑&#xff0c;所以出了一个视频&#xff0c;大家多多支持 实习途径 1 靠自己&#xff08;招聘平台投简历&#xff0c;大专及普通大学的选择&#xff09; 2 靠关系&#xff08;亲人、老师、朋友帮推荐&#xff0c;有关系就是好&#xff09; 3 靠校招&#xf…

ROS机器人入门第四课:话题通信

文章目录 ROS机器人入门第四课&#xff1a;话题通信一、话题通信概述&#xff08;一&#xff09;概念&#xff08;二&#xff09;作用 二、话题通信基本操作需求:分析:流程:&#xff08;一&#xff09;发布方解释一些关键的ROS函数和概念&#xff1a; &#xff08;二&#xff0…

力扣刷题44-46(力扣0062/0152/0198)

62. 不同路径 题目描述&#xff1a; 一个机器人位于一个 m x n 网格的左上角 &#xff0c;机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角。问总共有多少条不同的路径&#xff1f; 思路&#xff1a; 其实就是问(0,0)->(m-1,n-1)一共有几条路。 第一个…

office办公技能|word中的常见使用问题解决方案2.0

一、设置多级列表将表注从0开始&#xff0c;设置为从1开始 问题描述&#xff1a;word中插入题注&#xff0c;出来的是表0-1&#xff0c;不是1-1&#xff0c;怎么办&#xff1f; 写论文时&#xff0c;虽然我设置了“第一章”为一级标题&#xff0c;但是这三个字并不是自动插入的…

QMT量化交易上手

文章目录 QMT介绍基本使用代码初始化股票和行情交易量化策略示例相关链接QMT介绍 QMT是迅投公司出品量化交易客户端软件,目前只能运行在windows机器上,分为QMT 和 miniQMT两种模式,后者可以采用python API做程序化交易,极大方便了广大散户。这点上比同花顺/通信达好很多。…

软件推荐 篇三十七:安卓软件推荐IP Tools「IP工具」:全面解析网络状态与管理的必备神器

引言&#xff1a; 随着互联网的普及&#xff0c;网络已经成为我们日常生活中不可或缺的一部分。无论是工作、学习还是娱乐&#xff0c;我们都需要通过网络来进行各种操作。然而&#xff0c;网络问题的出现往往会给我们带来诸多困扰。为了更好地管理和优化网络&#xff0c;我们…

tomcat配置静态资源后无法正常访问

目录 一、场景二、配置三、访问异常四、排查五、原因六、解决 一、场景 1、将前端文件存在到指定目录 2、在tomcat配置静态资源 3、配置后无法正常访问到前端文件 二、配置 1、tomcat配置 2、静态资源 三、访问异常 四、排查 可以ping通&#xff0c;但是访问不了3080端口 …