Vert.x 源码解析(4.x)(一)——Future源码解析

目录

在这里插入图片描述

1. 简介

在现代的软件开发中,异步编程已经变得非常重要。它可以提高应用程序的并发性能,使应用程序能够更有效地处理大量的并行操作。Vert.x 是一个面向事件驱动、非阻塞的异步编程框架,它提供了丰富的工具来简化异步编程的复杂性。

如下图就是Vert.x实现异步设计到的类,主要关键在于FutureImpl以及PromiseImpl。下面会介绍他们分别负责什么。
在这里插入图片描述

2.关键类简介

2.1 AsyncResult

这是 Vert.x 中的一个通用接口,用于表示异步操作的结果。它可以包含成功的结果值或失败的异常,允许您在异步操作完成后检查结果,并相应地采取行动。

关键函数如下

public interface AsyncResult<T> {//执行完后的值T result();//异常值Throwable cause();//判断是否成功boolean succeeded();//判断是否失败boolean failed();.........}

2.2 Future

它扩展了AsyncResult接口,并且内部增加了很多组合操作符,比如join,any,all,map等

关键函数如下

public interface Future<T> extends AsyncResult<T> {static <T> CompositeFuture all(List<? extends Future<?>> futures) {return CompositeFutureImpl.all(futures.toArray(new Future[0]));}static CompositeFuture any(List<? extends Future<?>> futures) {return CompositeFutureImpl.any(futures.toArray(new Future[0]));}static CompositeFuture join(List<? extends Future<?>> futures) {return CompositeFutureImpl.join(futures.toArray(new Future[0]));}default <U> Future<U> flatMap(Function<T, Future<U>> mapper) {return compose(mapper);}//是否完成boolean isComplete();//完成后的回调监听Future<T> onComplete(Handler<AsyncResult<T>> handler);//成功监听default Future<T> onSuccess(Handler<T> handler) {return onComplete(ar -> {if (ar.succeeded()) {handler.handle(ar.result());}});}//失败监听default Future<T> onFailure(Handler<Throwable> handler) {return onComplete(ar -> {if (ar.failed()) {handler.handle(ar.cause());}});}
}

2.3 FutureInternal

这个接口主要定义了添加监听器的方法,后续所以监听完成的监听器都是调用这个方法,但是这个是内部调用的。

//上下文,Vert.x线程等都是由这个来执行的,具体后续会出context文章
ContextInternal context();
//添加监听器
void addListener(Listener<T> listener);

2.4 FutureImpl、FutureBase

FutureBase这里就emitSuccess和emitFailure方法主要是是执行listener的方法以及一些转换函数添加监听器的方法,这边就不单独列出了

FutureImpl

这里就主要介绍两个方法

onComplete以及tryComplete

onComplete:传入一个handler,后续任务完成后会调用handler

tryComplete:当你要操作的异步的有结果后会调用tryComplete,接着就会调用OnComplete传入的handler方法(其实实际上暴露出来给我们使用者调用的是PromiseImpl的complete方法,接着再调用tryComplete,这个后面会讲

 //这是结果值private Object value;
/*** 尝试完成,就是把结果传入进来,接着调用监听器,告知函所有监听者完成信号* @param result* @return*/
public boolean tryComplete(T result) {Listener<T> l;synchronized (this) {//如果value有值了直接返回,主要是把传进来的result赋值。if (value != null) {return false;}value = result == null ? NULL_VALUE : result;l = listener;listener = null;}if (l != null) {//这就是我们刚刚说的FutureBase定义的接口,里面就是调用监听器的方法emitSuccess(result, l);}return true;
}/*** 添加完成回调的方法* @param handler the handler that will be called with the result* @return*/@Overridepublic Future<T> onComplete(Handler<AsyncResult<T>> handler) {Objects.requireNonNull(handler, "No null handler accepted");Listener<T> listener;//判断是否属于Listener,如果不是则把handler包裹进来,通过listener成功和失败的回调进行执行handlerif (handler instanceof Listener) {listener = (Listener<T>) handler;} else {listener = new Listener<T>() {@Overridepublic void onSuccess(T value) {try {//成功后调用handlehandler.handle(FutureImpl.this);} catch (Throwable t) {if (context != null) {context.reportException(t);} else {throw t;}}}@Overridepublic void onFailure(Throwable failure) {try {//失败后调用handler.handle(FutureImpl.this);} catch (Throwable t) {if (context != null) {context.reportException(t);} else {throw t;}}}};}//就是调用FutureInternal的添加监听器的方法。addListener(listener);return this;}

2.5 Promise、PromiseInternal、PromiseImpl

Promise

public interface Promise<T> extends Handler<AsyncResult<T>> {//提供直接返回实现类实例static <T> Promise<T> promise() {return new PromiseImpl<>();}//这里页定义了tryCompleteboolean tryComplete(T result);   //看到了吧,这个我们使用者其实是调用这个complete方法,它内部会调用tryComplete方法来default void complete(T result) {if (!tryComplete(result)) {throw new IllegalStateException("Result is already complete");}}  
}

PromiseInternal

内部增加了一个context获取方法

ContextInternal context();

PromiseImpl

看以下代码PromiseImpl继承了FutureImpl

public final class PromiseImpl<T> extends FutureImpl<T> implements PromiseInternal<T>, Listener<T> {@Overridepublic Future<T> future() {return this;}}

2.6 疑问

是不是疑问来了

  1. 为什么还需要Promise来调用complete,不直接用FutureImpl呢?

职责分明问题FutureImpl是结果容器,代表未来会生成结果的容器,接着通过监听器来告知你这个结果。

PromiseImpl 是可以允许你手动设置异步操作结果所以Future像Get,Promise是Set

2.7 其他

CompositeFutureImpl:继承FutureImpl,用于实现 CompositeFuture 接口的默认实现。CompositeFuture 用于组合多个异步操作,等待它们全部完成或任意一个完成(Future做组合变换等操作都是继承FutureImpl类实现,包括PromiseImpl,Otherwise,Mapping等

3. 入门实例

3.1 案例1(独立使用Future)

    //创建PromisePromise promise=new PromiseImpl();//根据Promise获取FutureFuture future=promise.future();//模拟异步任务new Thread(() -> {try {Thread.sleep(5000);//执行完后调用complete方法,并传入结果promise.complete(123);} catch (InterruptedException e) {e.printStackTrace();}}).start();//回调future.onComplete(event -> {System.out.println(event+"成功了");});System.out.println("代码执行完成");Thread.sleep(15000);

结果:

代码执行完成
Future{result=123}成功了

3.2 案例2(Vert.x内部封装的文件调用)

这是Vert.x内部打开文件的方法,内部也是使用了Future和Promise。后面会分析源代码如何实现

FileSystem fs = vertx.fileSystem();Future<FileProps> future = fs.props("/my_file.txt");future.onComplete((AsyncResult<FileProps> ar) -> {if (ar.succeeded()) {FileProps props = ar.result();System.out.println("File size = " + props.size());} else {System.out.println("Failure: " + ar.cause().getMessage());}
});

4.源码分析

根据入门实例,进行源码分析

前面是直接创建PromiseImpl,接着调用promise.future直接获取Future类了。

我们重点分析的就是OnComplete类和complete类。

//创建Promise
Promise promise=new PromiseImpl();
//根据Promise获取Future
Future future=promise.future();//模拟异步任务
new Thread(() -> {try {Thread.sleep(5000);//执行完后调用complete方法promise.complete(123);} catch (InterruptedException e) {e.printStackTrace();}
}).start();
//回调
future.onComplete(event -> {System.out.println(event+"成功了");
});
System.out.println("代码执行完成");

4.1 OnComplete

public Future<T> onComplete(Handler<AsyncResult<T>> handler) {Objects.requireNonNull(handler, "No null handler accepted");Listener<T> listener;//判断是否属于Listener,如果不是则把handler包裹进来,通过listener成功和失败的回调进行执行handlerif (handler instanceof Listener) {listener = (Listener<T>) handler;} else {listener = new Listener<T>() {@Overridepublic void onSuccess(T value) {try {handler.handle(FutureImpl.this);} catch (Throwable t) {if (context != null) {context.reportException(t);} else {throw t;}}}@Overridepublic void onFailure(Throwable failure) {try {handler.handle(FutureImpl.this);} catch (Throwable t) {if (context != null) {context.reportException(t);} else {throw t;}}}};}//就是调用FutureInternal的添加监听器的方法。addListener(listener);return this;
}

addListener

@Override
public void addListener(Listener<T> listener) {Object v;synchronized (this) {//将value赋值,这就是判断当前Future是否已存在结果值v = value;//如果等于null说明现在还没有结果值//因为可能添加监听器的时候已经有值了,那么直接调用监听器方法if (v == null) {//如果等于空就赋值if (this.listener == null) {this.listener = listener;} else {ListenerArray<T> listeners;//根据类型是单一还是列表要么转换,要么new再添加if (this.listener instanceof FutureImpl.ListenerArray) {listeners = (ListenerArray<T>) this.listener;} else {listeners = new ListenerArray<>();listeners.add(this.listener);this.listener = listeners;}listeners.add(listener);}return;}}if (v instanceof CauseHolder) {emitFailure(((CauseHolder)v).cause, listener);} else {if (v == NULL_VALUE) {v = null;}emitSuccess((T) v, listener);}
}

emitSuccess 这个方法再complete会说

4.2 complete

promise.complete(123);

complete里面默认就是调用tryComplete并传入值

default void complete(T result) {if (!tryComplete(result)) {throw new IllegalStateException("Result is already complete");}
}
/*** 尝试完成,就是把结果传入进来,接着调用监听器,告知函所有监听者完成信号* @param result* @return*/
public boolean tryComplete(T result) {Listener<T> l;synchronized (this) {//如果value有值了直接返回,主要是把传进来的result赋值。if (value != null) {return false;}value = result == null ? NULL_VALUE : result;l = listener;listener = null;}if (l != null) {//这就是我们刚刚说的FutureBase定义的接口,里面就是调用监听器的方法emitSuccess(result, l);}return true;
}

emitSuccess

protected final void emitSuccess(T value, Listener<T> listener) {//context不等于空,则再context里面的线程进行执行,调用listener.onSuccessif (context != null && !context.isRunningOnContext()) {context.execute(() -> {ContextInternal prev = context.beginDispatch();try {listener.onSuccess(value);} finally {context.endDispatch(prev);}});} else {//这边直接执行listener.onSuccess(value);}
}

5 总结

创建PromiseImpl,并且获取Future类,通过Future.OnComplete来添加监听器,通过Promise的complete设置值并且通知监听器。是不是很简单。

后续我看情况再写一篇关于针对于Future的其他实现类,来解释all,any,map等原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/121060.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决windows下git操作提示用户名密码错误的问题

当代码从一个平台切换到另一个平台的时候&#xff0c;需要做两步操作&#xff0c;第一步就是更新git的仓库地址&#xff0c;在项目的.git/config文件里面修改&#xff0c;这一步做完之后&#xff0c;就可以推送代码到新的仓库了&#xff0c;这里就是重点来了。 一般第一次推动代…

JVM虚拟机对象探秘

对象的创建 Java是一门面向对象的编程语言&#xff0c;创建对象通常只是通过new关键字。 对象创建过程 当Java虚拟机遇到一条字节码new指令时&#xff0c;首先将去检查这个指令的参数是否能在常量池中定位到 一个类的符号引用&#xff0c;并且检查这个符号引用&#xff08;类…

如何设计微服务

一、序幕 最近在思考&#xff0c;自己哪些不足&#xff0c;需要学习点什么&#xff1f;看着Java基础知识&#xff0c;千遍一律&#xff0c;没有太大的动力需深挖&#xff0c;只能在写业务项目的时候边写边思考边夯实自己的基础。于是看了网上的一些资料&#xff0c;结合以前面试…

linux并发服务器 —— 多线程并发(六)

线程概述 同一个程序中的所有线程均会独立执行相同程序&#xff0c;且共享同一份全局内存区域&#xff1b; 进程是CPU分配资源的最小单位&#xff0c;线程是操作系统调度执行的最小单位&#xff1b; Linux环境下&#xff0c;线程的本质就是进程&#xff1b; ps -Lf pid&…

101序列检测器

本次所做设计&#xff0c;使用数字电路芯片实现的101序列检测器。电路图如下&#xff1a; 主要首先要根据需求画出状态转移方程&#xff0c;然后写出它的逻辑表达式。最后根据所选触发器种类确定电路图。序列由按键控制输入&#xff0c;按键按下&#xff0c;代表输入1 &#xf…

RT-Thread UART

UART 简介 UART&#xff08;Universal Asynchronous Receiver/Transmitter&#xff09;通用异步收发传输器&#xff0c;UART 作为异步串口通信协议的一种&#xff0c;工作原理是将传输数据的每个字符一位接一位地传输。是在应用程序开发过程中使用频率最高的数据总线。 UART …

Error: Cannot find module ‘timers/promises‘

这个错误很要命 他会导致你本机所有的npm 命令全部报错 首先 这个错误是因为 npm 与 node版本不匹配导致的 最简单的办法就是 查一下你安装的这个npm 的版本适配那个版本的 node 然后将本地的node删除 控制面板写在node 然后去官方文档现在与本地npm 匹配的node.js 这样 你执行…

【LeetCode-中等题】146. LRU 缓存

文章目录 题目方法一&#xff1a;直接继承LinkedHashMap调用api方法二&#xff1a;自定义LinkedHashMap HashMap ListNode LinkedHashMap 题目 LRU缓存是什么&#xff1a;LRU缓存机制&#xff0c;你想知道的这里都有 实现 LRU 缓存算法 方法一&#xff1a;直接继承Linked…

彻底学会Unity从网上加载资源到场景

使用类WWW 该类实例化的对象可以存储多种多媒体资源&#xff0c;只需要在构造函数中附上可访问的资源链接 Unity 中&#xff0c;WWW 类用于实例化互联网上的资源&#xff0c;如文本、图像、音频和视频等。WWW 实例化的对象可以存储多种多媒体素材。以下是一些常见的例子&…

固定资产卡片乱怎么管理

固定资产卡片是记录公司固定资产信息的重要工具&#xff0c;如果管理不善&#xff0c;容易造成卡片混乱、数据错误等问题。 为了避免这种情况的发生&#xff0c;可以采取以下措施&#xff1a;  建立完善的资产管理制度&#xff0c;明确固定资产的分类、标准和使用情况&#x…

C# 中什么是重写(子类改写父类方法)

方法重写是指在继承关系中&#xff0c;子类重新实现父类或基类的某个方法。这种方法允许子类根据需要修改或扩展父类或基类的方法功能。在面向对象编程中&#xff0c;方法重写是一种多态的表现形式&#xff0c;它使得子类可以根据不同的需求和场景提供不同的方法实现。 方法重…

热点探测技术架构设计与实践

1. 概述 说到热点问题&#xff0c;首先我们先理解一下什么是热点&#xff1f; 热点通常意义来说&#xff0c;是指在一段时间内&#xff0c;被广泛关注的物品或事件&#xff0c;例如微博热搜&#xff0c;热卖商品&#xff0c;热点新闻&#xff0c;明星直播等等&#xff0c;所以…

You must install at least one postgresql-client-<version> package

使用主机上的映射端口来连接到 PostgreSQL 数据库。例如&#xff0c;使用以下命令连接到数据库&#xff1a; psql -h localhost -p 5432 -U postgres出现下面的问题&#xff1a; 分析&#xff1a; 如果您在运行 psql 命令时遇到错误消息 You must install at least one pos…

应用案例 | 3D视觉引导解决方案汽车零部件上下料

Part.1 行业背景 三维视觉引导技术在国内外汽车零部件领域得到了广泛应用。随着汽车制造业的不断发展和创新&#xff0c;对于零部件的加工和装配要求越来越高&#xff0c;而三维视觉引导技术能够帮助企业实现更精确、更高效的零部件上下料过程。 纵览国外&#xff0c;部分汽车…

若依 MyBatis改为MyBatis-Plus

主要内容&#xff1a;升级成mybatis-plus&#xff0c;代码生成也是mybatis-plus版本 跟着我一步一步来&#xff0c;就可完成升级&#xff01; 检查&#xff1a;启动程序&#xff0c;先保证若依能启动 第一步&#xff1a;添加依赖 这里需要在两个地方添加&#xff0c;一个是最…

软件游戏丢失d3dcompiler_47.dll怎么办?这个几个解决方法可修复

当我们在玩软件游戏时&#xff0c;有时候会出现丢失 d3dcompiler_47.dll 的问题&#xff0c;这让我们感到非常困扰。d3dcompiler_47.dll 是 DirectX 中的一个重要组件&#xff0c;如果它丢失了&#xff0c;那么很多游戏就无法正常运行。我将和大家分享一下我在解决软件游戏丢失…

计算机竞赛 基于深度学习的中文情感分类 - 卷积神经网络 情感分类 情感分析 情感识别 评论情感分类

文章目录 1 前言2 情感文本分类2.1 参考论文2.2 输入层2.3 第一层卷积层&#xff1a;2.4 池化层&#xff1a;2.5 全连接softmax层&#xff1a;2.6 训练方案 3 实现3.1 sentence部分3.2 filters部分3.3 featuremaps部分3.4 1max部分3.5 concat1max部分3.6 关键代码 4 实现效果4.…

小白学go基础03-了解Go项目的项目结构

我们先来看看第一个Go项目——Go语言自身——的项目结构是什么样的。Go项目的项目结构自1.0版本发布以来一直十分稳定&#xff0c;直到现在Go项目的顶层结构基本没有大的改变。 截至Go项目commit 1e3ffb0c&#xff08;2019.5.14&#xff09;&#xff0c;Go1.0 项目结构如下&am…

Ansible学习笔记15

1、roles&#xff1a;&#xff08;难点&#xff09; roles介绍&#xff1a; roles&#xff08;角色&#xff09;&#xff1a;就是通过分别将variables&#xff0c;tasks及handlers等放置于单独的目录中&#xff0c;并可以便捷地调用他们的一种机制。 假设我们要写一个playbo…

KaTex用法

KaTeX是一个用于数学公式渲染的JavaScript库&#xff0c;可以在网页上方便地显示数学符号和公式。下面是KaTeX的使用方法&#xff1a; 在网页中引入KaTeX的CSS和JS文件&#xff1a; <link rel"stylesheet" href"https://cdnjs.cloudflare.com/ajax/libs/Ka…