详解CompletableFuture

最近一直畅游在RocketMQ的源码中,发现在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,并且最后会结合RocketMQ源码分析一下CompletableFuture的使用。

Future接口以及它的局限性

我们都知道,Java中创建线程的方式主要有两种方式,继承Thread或者实现Runnable接口。但是这两种都是有一个共同的缺点,那就是都无法获取到线程执行的结果,也就是没有返回值。于是在JDK1.5 以后为了解决这种没有返回值的问题,提供了Callable和Future接口以及Future对应的实现类FutureTask,通过FutureTask的就可以获取到异步执行的结果。

于是乎,我们想要开启异步线程,执行任务,获取结果,就可以这么实现。

FutureTask<String> futureTask = new FutureTask<>(() -> "三友");new Thread(futureTask).start();System.out.println(futureTask.get());

或者使用线程池的方式

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();

线程池底层也是将提交的Callable的实现先封装成FutureTask,然后通过execute方法来提交任务,执行异步逻辑。

Future接口的局限性

虽然通过Future接口的get方法可以获取任务异步执行的结果,但是get方法会阻塞主线程,也就是异步任务没有完成,主线程会一直阻塞,直到任务结束。

Future也提供了isDone方法来查看异步线程任务执行是否完成,如果完成,就可以获取任务的执行结果,代码如下。

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
while (!future.isDone()) {//任务有没有完成,没有就继续循环判断
}
System.out.println(future.get());
executorService.shutdown();

但是这种轮询查看异步线程任务执行状态,也是非常消耗cpu资源。

同时对于一些复杂的异步操作任务的处理,可能需要各种同步组件来一起完成。

所以,通过上面的介绍可以看出,Future在使用的过程中还是有很强的局限性,所以为了解决这种局限性,在JDK1.8的时候,Doug Lea 大神为我们提供了一种更为强大的类CompletableFuture。

什么是CompletableFuture?

CompletableFuture在JDK1.8提供了一种更加强大的异步编程的api。它实现了Future接口,也就是Future的功能特性CompletableFuture也有;除此之外,它也实现了CompletionStage接口,CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。

CompletableFuture相比于Future最大的改进就是提供了类似观察者模式的回调监听的功能,也就是当上一阶段任务执行结束之后,可以回调你指定的下一阶段任务,而不需要阻塞获取结果之后来处理结果。

CompletableFuture常见api详解

CompletableFuture的方法api多,但主要可以分为以下几类。

1、实例化CompletableFuture

构造方法创建
CompletableFuture<String> completableFuture = new CompletableFuture<>();
System.out.println(completableFuture.get());

此时如果有其它线程执行如下代码,就能执行打印出 三友

completableFuture.complete("三友")
静态方法创建

除了使用构造方法构造,CompletableFuture还提供了静态方法来创建

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

supply 和 run 的主要区别就是 supply 可以有返回值,run 没有返回值。至于另一个参数Executor 就是用来执行异步任务的线程池,如果不传Executor 的话,默认是ForkJoinPool这个线程池的实现。

一旦通过静态方法来构造,会立马开启异步线程执行Supplier或者Runnable提交的任务。

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "三友");
System.out.println(completableFuture.get());

一旦任务执行完成,就可以打印返回值,这里的使用方法跟Future是一样的。

所以对比两个两种实例化的方法,使用静态方法的和使用构造方法主要区别就是,使用构造方法需要其它线程主动调用complete来表示任务执行完成,因为很简单,因为在构造的时候没有执行异步的任务,所以需要其它线程主动调用complete来表示任务执行完成。

2、获取任务执行结果

public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();

get()和get(long timeout, TimeUnit unit)是实现了Future接口的功能,两者主要区别就是get()会一直阻塞直到获取到结果,get(long timeout, TimeUnit unit)值可以指定超时时间,当到了指定的时间还未获取到任务,就会抛出TimeoutException异常。

getNow(T valueIfAbsent):就是获取任务的执行结果,但不会产生阻塞。如果任务还没执行完成,那么就会返回你传入的 valueIfAbsent 参数值,如果执行完成了,就会返回任务执行的结果。

join():跟get()的主要区别就是,get()会抛出检查时异常,join()不会。

3、主动触发任务完成

public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);

complete:主动触发当前异步任务的完成。调用此方法时如果你的任务已经完成,那么方法就会返回false;如果任务没完成,就会返回true,并且其它线程获取到的任务的结果就是complete的参数值。

completeExceptionally:跟complete的作用差不多,complete是正常结束任务,返回结果,而completeExceptionally就是触发任务执行的异常。

4、对任务执行结果进行下一步处理

只能接收任务正常执行后的回调
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

这类回调的特点就是,当任务正常执行完成,没有异常的时候就会回调。

thenApply:可以拿到上一步任务执行的结果进行处理,并且返回处理的结果 thenRun:拿不到上一步任务执行的结果,但会执行Runnable接口的实现 thenAccept:可以拿到上一步任务执行的结果进行处理,但不需要返回处理的结果

thenApply示例:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());

执行结果:

上一步的执行的结果为:10

thenRun示例:

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenRun(() -> System.out.println("上一步执行完成"));

执行结果:

上一步执行完成

thenAccept示例:

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenAccept(v -> System.out.println("上一步执行完成,结果为:" + v));

执行结果:

上一步执行完成,结果为:10

thenApply有异常示例:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {//模拟异常int i = 1 / 0;return 10;
}).thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());

执行结果:

Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

当有异常时是不会回调的

只能接收任务处理异常后的回调
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

当上面的任务执行过程中出现异常的时候,会回调exceptionally方法指定的回调,但是如果没有出现异常,是不会回调的。

exceptionally能够将异常给吞了,并且fn的返回值会返回回去。

其实这个exceptionally方法有点像降级的味道。当出现异常的时候,走到这个回调,可以返回一个默认值回去。

没有异常情况下:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {return 100;
}).exceptionally(e -> {System.out.println("出现异常了,返回默认值");return 110;
});
System.out.println(completableFuture.join());

执行结果:

100

有异常情况下:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 100;
}).exceptionally(e -> {System.out.println("出现异常了,返回默认值");return 110;
});
System.out.println(completableFuture.join());

执行结果:

出现异常了,返回默认值
110
能同时接收任务执行正常和异常的回调
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> actin);

不论前面的任务执行成功还是失败都会回调的这类方法指定的回调方法。

handle : 跟exceptionally有点像,但是exceptionally是出现异常才会回调,两者都有返回值,都能吞了异常,但是handle正常情况下也能回调。

whenComplete:能接受正常或者异常的回调,并且不影响上个阶段的返回值,也就是主线程能获取到上个阶段的返回值;当出现异常时,whenComplete并不能吞了这个异常,也就是说主线程在获取执行异常任务的结果时,会抛出异常。

这里演示一下whenComplete处理异常示例情况,handle跟exceptionally对异常的处理差不多。

whenComplete处理异常示例:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 10;
}).whenComplete((r, e) -> {System.out.println("whenComplete被调用了");
});
System.out.println(completableFuture.join());

执行结果:

whenComplete被调用了
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

5、对任务结果进行合并

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);

这个方法的意思是,当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调 BiFunction ,然后返回新的结果。

thenCombine的例子请往下继续看。

6、以Async结尾的方法

上面说的一些方法,比如说thenAccept方法,他有两个对应的Async结尾的方法,如下:

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

thenAcceptAsync跟thenAccept的主要区别就是thenAcceptAsync会重新开一个线程来执行下一阶段的任务,而thenAccept还是用上一阶段任务执行的线程执行。

两个thenAcceptAsync主要区别就是一个使用默认的线程池来执行任务,也就是ForkJoinPool,一个是使用方法参数传入的线程池来执行任务。

当然除了thenAccept方法之外,上述提到的方法还有很多带有Async结尾的对应的方法,他们的主要区别就是执行任务是否开启异步线程来执行的区别。

当然,还有一些其它的api,可以自行查看

CompletableFuture在RocketMQ中的使用

CompletableFuture在RocketMQ中的使用场景比较多,这里我举一个消息存储的场景。

在RocketMQ中,Broker接收到生产者产生的消息的时候,会将消息持久化到磁盘和同步到从节点中。持久化到磁盘和消息同步到从节点是两个独立的任务,互不干扰,可以相互独立执行。当消息持久化到磁盘和同步到从节点中任务完成之后,需要统计整个存储消息消耗的时间,所以统计整个存储消息消耗的时间是依赖前面两个任务的完成。

图片

实现代码如下

消息存储刷盘任务和主从复制任务:

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 提交刷盘的请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//提交主从复制的请求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);//刷盘 和 主从复制 两个异步任务通过thenCombine联合
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {// 当两个刷盘和主从复制任务都完成的时候,就会回调// 如果刷盘没有成功,那么就将消息存储的状态设置为失败if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}// 如果主从复制没有成功,那么就将消息存储的状态设置为失败if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}// 最终返回消息存储的结果return putMessageResult;
});

对上面两个合并的任务执行结果通过thenAccept方法进行监听,统计消息存储的耗时:

//消息存储的开始时间
long beginTime = this.getSystemClock().now();
// 存储消息,然后返回 CompletableFuture,也就是上面一段代码得返回值‍
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);//监听消息存储的结果
putResultFuture.thenAccept((result) -> {// 消息存储完成之后会回调long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().add(1);}
});

CompletableFuture的优点

1、异步函数式编程,实现优雅,易于维护;

2、它提供了异常管理的机制,让你有机会抛出、管理异步任务执行中发生的异常,监听这些异常的发生;

3、拥有对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/491460.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS 非线性容器LightWeightMap 常用的几个方法

LightWeightMap可用于存储具有关联关系的key-value键值对集合&#xff0c;存储元素中key值唯一&#xff0c;每个key对应一个value。 LightWeightMap依据泛型定义&#xff0c;采用轻量级结构&#xff0c;初始默认容量大小为8&#xff0c;每次扩容大小为原始容量的两倍。 集合中k…

三极管功能

1 三极管的结构 2 三极管开关电路设计注意事项 1 三极管进入饱和状态 电机&#xff1a;500毫安 2 判断三级什么状态&#xff1a;电压法 3 判断三级什么状态&#xff1a;电流法 4 求IB的电阻 5 当三极管用作开关时&#xff0c;通常N型三极管控制负载的gnd端&#xff0c;P型…

P6打卡—Pytorch实现人脸识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 1.检查GPU import torch import torch.nn as nn import matplotlib.pyplot as plt import torchvisiondevicetorch.device("cuda" if torch.cuda.is_…

R square 的计算方法和一点思考

模型的性能评价指标有几种方案&#xff1a;RMSE&#xff08;平方根误差&#xff09;、MAE&#xff08;平均绝对误差&#xff09;、MSE(平均平方误差)、R2_score 其中&#xff0c;当量纲不同时&#xff0c;RMSE、MAE、MSE难以衡量模型效果好坏。这就需要用到R2_score&#xff1…

解决并发情况下调用 Instruct-pix2pix 模型推理错误:index out of bounds 问题

解决并发情况下调用 Instruct-pix2pix 模型推理错误&#xff1a;index out of bounds 问题 背景介绍 在对 golang 开发的 图像生成网站 进行并发测试时&#xff0c;调用基于 Instruct-pix2pix 模型和 FastAPI 的图像生成 API 遇到了以下错误&#xff1a; Model inference er…

利用DFT画有限长序列的DTFT

MATLAB中没有DTFT函数&#xff0c;计算机不可能给出连续结果&#xff0c;可以只能利用DFT的fft函数来实现。 %% L 7; x ones(1, L) figure; tiledlayout(2,3,"TileSpacing","tight") nexttile; stem([0:L-1],x) box off title([num2str(L), points rect…

【进程篇】03.进程的概念与基本操作

一、进程的概念与理解 1.1 概念 进程是程序的一个执行实例&#xff0c;即正在执行的程序。 1.2 理解 我们编写代码运行后会在磁盘中会形成一个可执行程序&#xff0c;当我们运行这个可执行程序时&#xff0c;这个程序此时就会被操作系统的调度器加载到内存中&#xff1b;操…

基于MATLAB 的数字图像处理技术总结

大家好&#xff01;欢迎来到本次的总结性的一篇文章&#xff0c;因为咸鱼哥这几个月是真的有点小忙&#xff08;参加了点小比赛&#xff0c;准备考试等等&#xff09;所以&#xff0c;在数字图像学习后&#xff0c;我来写一个总结性的文章&#xff0c;同时帮助大家学习&#xf…

llama2——微调lora,第一次参考教程实践完成包括训练和模型

前言&#xff1a;磕磕绊绊&#xff0c;不过收获很多&#xff0c;最大的收获就是解决报错error的分析方法和解决思路 1、首先&#xff0c;我参考的是这篇博客&#xff1a;怎样训练一个自己的大语言模型&#xff1f;全网最简单易懂的教程&#xff01;_开源模型训练出一个语言模型…

类OCSP靶场-Kioptrix系列-Kioptrix Level 3

一、前情提要 二、实战打靶 1. 信息收集 1.1. 主机发现 1.2. 端口扫描 1.3.目录遍历 1.4. 敏感信息 2.漏洞发现 2.1.登录功能账号密码爆破 2.2.CMS历史漏洞 2.2.1.exp利用 2.2.2.提权 2.3. sql注入getshell 2.3.1.发现注入点 2.3.2. 测试字段和类型 2.3.3.查询字…

WPF实现曲线数据展示【案例:震动数据分析】

wpf实现曲线数据展示&#xff0c;函数曲线展示&#xff0c;实例&#xff1a;震动数据分析为例。 如上图所示&#xff0c;如果你想实现上图中的效果&#xff0c;请详细参考我的内容&#xff0c;创作不易&#xff0c;给个赞吧。 一共有两种方式来实现&#xff0c;一种是使用第三…

PHP代码审计学习(一)--命令注入

1、漏洞原理 参数用户可控&#xff0c;程序将用户可控的恶意参数通过php可执行命令的函数中运行导致。 2、示例代码 <?php echorec-test; $command ping -c 1 .$_GET[ip]; system($command); //system函数特性 执行结果会自动打印 ?> 通过示例代码可知通过system函…

Vivado安装System Generator不支持新版Matlab解决方法

目录 前言&#xff1a; Vivado安装System Generator不支持新版Matlab解决方法 前言&#xff1a; 本文介绍一下Vivado不支持新版Matlab的解决办法&#xff0c;Vivado只支持最近两年3个版本的Matlab&#xff0c;当前最新版vivado 2018.3只支持2017a,2017b,2018a。 Vivado安装Sy…

半导体数据分析(二):徒手玩转STDF格式文件 -- 码农切入半导体系列

一、概述 在上一篇文章中&#xff0c;我们一起学习了STDF格式的文件&#xff0c;知道了这是半导体测试数据的标准格式文件。也解释了为什么码农掌握了STDF文件之后&#xff0c;好比掌握了切入半导体行业的金钥匙。 从今天开始&#xff0c;我们一起来一步步地学习如何解构、熟…

#渗透测试#漏洞挖掘#红蓝攻防#SRC漏洞挖掘02之权限漏洞挖掘技巧

免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停止本文章读。 权限相关漏洞 越权、未授权访问、oss、后台暴露、…

IS-IS协议

IS-IS协议介绍 IS-IS&#xff08;Intermediate System to Intermediate System&#xff09;协议是一种链路状态的内部网关协议&#xff08;IGP&#xff09;&#xff0c;用于在同一个自治系统&#xff08;Autonomous System, AS&#xff09;内部的路由器之间交换路由信息。IS-I…

4.7 TCP 的流量控制

欢迎大家订阅【计算机网络】学习专栏&#xff0c;开启你的计算机网络学习之旅&#xff01; 文章目录 前言1 滑动窗口与流量控制2 持续计时器与零窗口探测3 控制TCP发送报文段的时机3.1 控制发送时机的三种机制3.2 糊涂窗口综合症 前言 在网络通信中&#xff0c;流量控制是确保…

不良人系列-复兴数据结构(栈和队列)

个人主页&#xff1a;爱编程的小新☆ 不良人经典语录&#xff1a;“相呴相济 玉汝于成 勿念 心安” 目录 一. 栈(stack) 1. 栈的概念 2. 栈的常见方法 3.栈的模拟实现 ​编辑 二. 队列 1. 队列的概念 2. 队列的使用 2.1 队列的常见方法 2.2 队列的模拟实现 2.3 队列…

在clion中使用MySQL的教程

首先就是配置好东西&#xff0c;也是非常简单的&#xff1a; 1.把mysql安装目录&#xff08;其中的lib好像&#xff09;中的2个文件复制到下面就行 2.然后配置&#xff0c;这个文件 cmake_minimum_required(VERSION 3.24) project(2024_12project)include_directories(D:\\mys…

某名校考研自命题C++程序设计——近10年真题汇总(下)

第二期&#xff0c;相比上一贴本帖的题目难度更高一些&#xff0c;我当然不会告诉你我先挑简单的写~ 某名校考研自命题C程序设计——近10年真题汇总&#xff08;上&#xff09;-CSDN博客文章浏览阅读651次&#xff0c;点赞9次&#xff0c;收藏13次。本帖更新一些某校的编程真题…