【Java异步编程】CompletableFuture基础(1):创建不同线程的子任务、子任务链式调用与异常处理

文章目录

    • 1. 三种实现接口
    • 2. 链式调用:保证链的顺序性与异步性
    • 3. CompletableFuture创建CompletionStage子任务
    • 4. 处理异常
      • a. 创建回调钩子
      • b. 调用handle()方法统一处理异常和结果
    • 5. 如何选择线程池:不同的业务选择不同的线程池

CompletableFuture是JDK 1.8引入的实现类,该类实现了Future和CompletionStage两个接口。该类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。

CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会进入另一个阶段。一个阶段可以理解为一个子任务,每一个子任务会包装一个Java函数式接口实例,表示该子任务所要执行的操作。

 

1. 三种实现接口

每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。

这三个常用的函数式接口的特点如下:

被包装接口功能描述
FunctionFunction接口的特点是:有输入、有输出。包装了Function实例的CompletionStage子任务需要一个输入参数,并会产生一个输出结果到下一步
RunnableRunnable接口的特点是:无输入、无输出。包装了Runnable实例的CompletionStage子任务既不需要任何输入参数,又不会产生任何输出。
ConsumerConsumer接口的特点是:有输入、无输出。包装了Consumer实例的CompletionStage子任务需要一个输入参数,但不会产生任何输出。

 

2. 链式调用:保证链的顺序性与异步性

多个CompletionStage构成了一条任务流水线,一个环节执行完成了可以将结果移交给下一个环节(子任务)​。多个CompletionStage子任务之间可以使用链式调用。

下面是一个顺序调用的例子:

使用 CompletionStage 及其方法构建了一个异步任务链,thenApply 用于对前一个阶段的结果进行计算并传递结果,thenAccept 用于消费前一个阶段的结果并执行操作,thenRun 用于执行无输入输出的操作。

oneStage//被thenApply包装CompletionStage子任务,由输入输出.thenApply(x -> square(x))  //消耗上游输出,但是没有输出.thenAccept(y -> System.out.println(y)) //不消耗上一个子任务的输出又不产生结果.thenRun(() -> System.out.println()) 

这种链式操作可以方便地将多个异步操作连接起来,同时保证了操作的顺序性和异步性,提高了代码的可维护性和并发性能。

 

接下来是一个异步调用的例子:

在这个例子中,task2 和 task3 都依赖于 task1 完成后执行,但它们可能并行执行,也就是说,task2 和 task3 的执行顺序是不确定的,它们不一定会按照 thenRunAsync 的顺序执行。

CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {System.out.println("Task 1");
});CompletableFuture<Void> task2 = task1.thenRunAsync(() -> {System.out.println("Task 2");
});CompletableFuture<Void> task3 = task1.thenRunAsync(() -> {System.out.println("Task 3");
});

 

3. CompletableFuture创建CompletionStage子任务

CompletableFuture定义了一组方法用于创建CompletionStage子任务(或者阶段性任务)​,基础的方法如下:

//子任务包装一个Runnable实例,并调用ForkJoinPool.commonPool()线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable)//子任务包装一个Runnable实例,并调用指定的executor线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)//子任务包装一个Supplier实例,并调用ForkJoinPool.commonPool()线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//子任务包装一个Supplier实例,并使用指定的executor线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

其中主要注意的信息是:

  1. Supplier 表示一个无参数但有返回值的函数,Runnable表示无惨无返回值的函数
  2. 在使用CompletableFuture创建CompletionStage子任务时,如果没有指定Executor线程池,在默认情况下CompletionStage会使用公共的ForkJoinPool线程池。
  3. 它们都会交给线程池执行,get方法会堵塞主线程等待执行结果。

给一个例子:

//无返回值异步调用  
@Test  
public void runAsyncDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("run end ...");  });  //等待异步任务执行完成,最多等待2秒  future.get(2, TimeUnit.SECONDS);  
}  //有返回值异步调用  
@Test  
public void supplyAsyncDemo() throws Exception {  CompletableFuture<Long> future = CompletableFuture.supplyAsync(() ->  {  long start = System.currentTimeMillis();  sleepSeconds(1);//模拟执行1秒  Print.tco("run end ...");  return System.currentTimeMillis() - start;  });  //等待异步任务执行完成,现时等待2秒  long time = future.get(2, TimeUnit.SECONDS);  Print.tco("异步执行耗时(秒) = " + time / 1000);  
}

 

4. 处理异常

a. 创建回调钩子

可以为CompletionStage子任务设置特定的回调钩子,当计算结果完成或者抛出异常的时候,执行这些特定的回调钩子。

//设置子任务完成时的回调钩子
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)//设置子任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)//设置子任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor)//设置异常处理的回调钩子
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
@Test  
public void whenCompleteDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("抛出异常!");  throw new RuntimeException("发生异常");  //Print.tco("run end ...");  });  //设置执行完成后的回调钩子  future.whenComplete(new BiConsumer<Void, Throwable>() {  @Override  public void accept(Void t, Throwable action) {  Print.tco("执行完成!");  }  });  //设置发生异常后的回调钩子  future.exceptionally(new Function<Throwable, Void>() {  @Override  public Void apply(Throwable t) {  Print.tco("执行失败!" + t.getMessage());  return null;  }  });  future.get();  
}[ForkJoinPool.commonPool-worker-9]:抛出异常!
[main]:执行完成!
[ForkJoinPool.commonPool-worker-9]:执行失败!java.lang.RuntimeException: 发生异常
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 发生异常

有如下几个注意点:

  1. 调用cancel()方法取消CompletableFuture时,任务被视为异常完成,completeExceptionally()方法所设置的异常回调钩子也会被执行。
  2. 如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:
    • 在调用get()时,如果遇到内部异常,get()方法就会抛出ExecutionException(执行异常)​。
    • 在调用join()和getNow(T)启动任务时,如果遇到内部异常,join()和getNow(T)方法就会抛出CompletionException。

 

b. 调用handle()方法统一处理异常和结果

//在执行任务的同一个线程中处理异常和结果
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);//可能不在执行任务的**同一个线程**中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);//在指定线程池executor中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
@Test  
public void handleDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("抛出异常!");  throw new RuntimeException("发生异常");  //Print.tco("run end ...");  });  //设置执行完成后的回调钩子  future.handle(new BiFunction<Void, Throwable, Void>() {  @Override  public Void apply(Void input, Throwable throwable) {  if (throwable == null) {  Print.tcfo("没有发生异常!");  } else {  Print.tcfo("sorry,发生了异常!");  }  return null;  }  });  future.get();  
}//
//[ForkJoinPool.commonPool-worker-1]:抛出异常! 
//[ForkJoinPool.commonPool-worker-1|CompletableFutureDemo$3.apply]: sorry,发生了异常!

 

5. 如何选择线程池:不同的业务选择不同的线程池

默认情况下,通过静态方法runAsync()、supplyAsync()创建的CompletableFuture任务会使用公共的ForkJoinPool线程池,默认的线程数是CPU的核数。当然,它的线程数可以通过以下JVM参数设置

     option:-Djava.util.concurrent.ForkJoinPool.common.parallelism

 

如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中的所有线程都阻塞在IO操作上,造成线程饥饿,进而影响整个系统的性能。所以,强烈建议大家根据不同的业务类型创建不同的线程池,以避免互相干扰。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/10624.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Rust自学】15.5. Rc<T>:引用计数智能指针与共享所有权

喜欢的话别忘了点赞、收藏加关注哦&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 15.5.1. 什么是Rc<T> 所有权在大部分情况下都是清晰的。对于一个给定的值&#xff0c;程序员可以准确地推断出哪个变量拥有它。 …

UE5制作视差图

双目深度估计开源数据集很多都是用UE制作的&#xff0c;那么我们自己能否通过UE制作自己想要的场景的数据集呢。最近花了点时间研究了一下&#xff0c;分享给需要的小伙伴。 主要使用的是UnrealCV插件&#xff0c;UnrealCV是一个开源项目&#xff0c;旨在帮助计算机视觉研究人…

基于遗传优化GRNN和Hog特征提取的交通标志识别算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1 HOG 4.2 GRNN&#xff08;General Regression Neural Network&#xff09;模型原理 4.3 遗传算法&#xff08;GA&#xff09;优化GRNN平滑因子 5.算法完整程序工程 1.算法运行效果图预…

C语言【基础篇】之流程控制——掌握三大结构的奥秘

流程控制 &#x1f680;前言&#x1f99c;顺序结构&#x1f4af; 定义&#x1f4af;执行规则 &#x1f31f;选择结构&#x1f4af;if语句&#x1f4af;switch语句&#x1f4af;case穿透规则 &#x1f914;循环结构&#x1f4af;for循环&#x1f4af;while循环&#x1f4af;do -…

C++实现状态模式

首先上代码&#xff1a; #include <iostream> #include <memory>class Context;class State { public:virtual void Handle(Context * context) 0; //纯虚函数virtual ~State() default; //虚析构函数 };//创建状态A class ConcreateStateA : public State{…

【React】PureComponent 和 Component 的区别

前言 在 React 中&#xff0c;PureComponent 和 Component 都是用于创建组件的基类&#xff0c;但它们有一个主要的区别&#xff1a;PureComponent 会给类组件默认加一个shouldComponentUpdate周期函数。在此周期函数中&#xff0c;它对props 和 state (新老的属性/状态)会做一…

二级C语言:二维数组每行最大值与首元素交换、删除结构体的重复项、取出单词首字母

目录 一、程序填空 --- 二维数组每行最大值与首元素交换 题目 分析 知识点 --- 交换语句 二、程序修改 --- 删除结构体的重复项 题目 分析 三、程序设计 --- 取出单词首字母 题目 分析 前言 本章讲解&#xff1a;二维数组每行最大值与首元素交换、删除结构体的重复项…

CUDA学习-内存访问

一 访存合并 1.1 说明 本部分内容主要参考: 搞懂 CUDA Shared Memory 上的 bank conflicts 和向量化指令(LDS.128 / float4)的访存特点 - 知乎 1.2 share memory结构 图1.1 share memory结构 放在 shared memory 中的数据是以 4 bytes(即 32 bits)作为 1 个 word,依…

【python】python基于机器学习与数据分析的手机特性关联与分类预测(源码+数据集)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;专__注&#x1f448;&#xff1a;专注主流机器人、人工智能等相关领域的开发、测试技术。 python基于机器学习与数据分析的手机特性关联与分类…

【leetcode详解】T3175(一点反思)

解题心得 要写出一个好的程序&#xff0c;有效解决问题&#xff0c;思路上就不能“太乖” —— 不能被题目的叙述过程所束缚&#xff0c;而是力求细思问题&#xff0c;抽象化问题&#xff0c;并找到背后的逻辑&#xff1b;最后抓住核心对象&#xff0c;去除多余项&#xff0c;…

图论——最小生成树

最小生成树 给定一个无向图&#xff0c;在图中选择若干条边把图的所有节点连起来。要求边长之和最小。在图论中&#xff0c;叫做求最小生成树。 prim算法 prim 算法采用的是一种贪心的策略。 每次将离连通部分的最近的点和点对应的边加入的连通部分&#xff0c;连通部分逐渐扩大…

jvisualvm工具使用

jvisualvm 是JDK自带的具有图形界面操作功能的JVM性能监控和诊断工具&#xff0c;它不仅能分析和诊断堆转储文件&#xff0c;在线实时监控本地JVM进程&#xff0c;还能监控远程服务器上的JVM进程。 1 分析服务器下载dump文件 1&#xff09;在我们在安装JDK的bin目录双击jvisa…

C++ list

list需知&#xff1a; list不会出现insert迭代器失效问题 链表插入不会影响原有数据相对位置&#xff0c;且不用扩容 但是erase会导致相对数据位置移动&#xff0c;所有其erase会导致迭代器失效 list排序效率很低 不建议使用 小规模数据量可以使用&#xff0c;比较方便 此外…

DeepSeek-R1 论文解读 —— 强化学习大语言模型新时代来临?

近年来&#xff0c;人工智能&#xff08;AI&#xff09;领域发展迅猛&#xff0c;大语言模型&#xff08;LLMs&#xff09;为通用人工智能&#xff08;AGI&#xff09;的发展开辟了道路。OpenAI 的 o1 模型表现非凡&#xff0c;它引入的创新性推理时缩放技术显著提升了推理能力…

【基于SprintBoot+Mybatis+Mysql】电脑商城项目之用户注册

&#x1f9f8;安清h&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;【计算机网络】【Mybatis篇】 &#x1f6a6;作者简介&#xff1a;一个有趣爱睡觉的intp&#xff0c;期待和更多人分享自己所学知识的真诚大学生。 目录 &#x1f3af;项目基本介绍 &#x1f6a6;项…

蓝桥杯思维训练营(一)

文章目录 题目总览题目详解翻之一起做很甜的梦 蓝桥杯的前几题用到的算法较少&#xff0c;大部分考察的都是思维能力&#xff0c;方法比较巧妙&#xff0c;所以我们要积累对应的题目&#xff0c;多训练 题目总览 翻之 一起做很甜的梦 题目详解 翻之 思维分析&#xff1a;一开…

【AI】DeepSeek 概念/影响/使用/部署

在大年三十那天&#xff0c;不知道你是否留意到&#xff0c;“deepseek”这个词出现在了各大热搜榜单上。这引起了我的关注&#xff0c;出于学习的兴趣&#xff0c;我深入研究了一番&#xff0c;才有了这篇文章的诞生。 概念 那么&#xff0c;什么是DeepSeek&#xff1f;首先百…

minimind - 从零开始训练小型语言模型

大语言模型&#xff08;LLM&#xff09;领域&#xff0c;如 GPT、LLaMA、GLM 等&#xff0c;虽然它们效果惊艳&#xff0c; 但动辄10 Bilion庞大的模型参数个人设备显存远不够训练&#xff0c;甚至推理困难。 几乎所有人都不会只满足于用Lora等方案fine-tuing大模型学会一些新的…

【机器学习】自定义数据集 使用pytorch框架实现逻辑回归并保存模型,然后保存模型后再加载模型进行预测,对预测结果计算精确度和召回率及F1分数

一、使用pytorch框架实现逻辑回归 1. 数据部分&#xff1a; 首先自定义了一个简单的数据集&#xff0c;特征 X 是 100 个随机样本&#xff0c;每个样本一个特征&#xff0c;目标值 y 基于线性关系并添加了噪声。将 numpy 数组转换为 PyTorch 张量&#xff0c;方便后续在模型中…

数据分析系列--④RapidMiner进行关联分析(案例)

一、核心概念 1.项集&#xff08;Itemset&#xff09; 2.规则&#xff08;Rule&#xff09; 3.支持度&#xff08;Support&#xff09; 3.1 支持度的定义 3.2 支持度的意义 3.3 支持度的应用 3.4 支持度的示例 3.5 支持度的调整 3.6 支持度与其他指标的关系 4.置信度&#xff0…