响应式编程-Project Reactor Mono 介绍

响应式编程-Project Reactor Mono 介绍

本文以Mono的角度来介绍Reactor编程,Flux的使用同理。

初体验

Web应用 controller 方法在Spring webmvc 和 Spring webFlux下Controller方法实现示例如下:

Spring webmvc:

    @GetMapping("/test1")

    @ResponseBody

    public String test1(){

        String result =  geterateTest();

        return result;

    }

Spring webFlux

    @GetMapping("/test2")

    @ResponseBody

    public Mono<String> test2(){

        Mono<String> result = Mono.fromSupplier(this:: geterateTest);

        return result;

    }

一个的响应是String对象, 另一个是Mono<String>对象。Reactor Mono表示一个产生0-1元素的异步序列,异步指Mono创建的时候并不会执行任何操作,当Mono发生订阅时才触发Mono序列的运行。非阻塞表示test2方法不会产生任何阻塞,即使genereateTest里面是一个阻塞的操作,因为此时不会执行实际的逻辑,所以不会发生任何阻塞。

NettyHttpServer.onStateChange方法中构建Mono并进行订阅。

HttpServerOperations ops = (HttpServerOperations)connection;

//Web Flux将按照Spring Web中的约定构建一个Publisher(执行过滤器、Controller方//法)

Publisher<Void> publisher = (Publisher)this.handler.apply(ops, ops);

Mono<Void> mono = Mono.deferContextual((ctx) -> {

      ops.currentContext = Context.of(ctx);

      return Mono.fromDirect(publisher);

});

……

//subscribe将触发前面Spring web中封装在Mono构建过程中的业务逻辑的真正执行。

//如果我们按照命令是编程去编写代码,业务逻辑在构建Mono的过程中就执行了。

mono.subscribe(ops.disposeSubscriber());

注: Spring web flux框架下也可以按照传统的命令式编程。

Mono的构建

Reactor编程可以分为 异步序列Mono/Flux的构建和和使用两部分。

Mono的基本构建

Mono类 提供了大量静态方法帮助构建Mono。

  • just(T):返回T类型对象的Mono序列
  • fromFuture(future):Mono序列的元素对象由future产生,订阅时Future产生T并推送至订阅者。其他from方法类似。
  • empty():返回一个订阅时直接完成的异步序列
  • error():返回一个订阅时直接推送错误信号的序列

其他方法详见Mono类API:

如:Mono<String> mono = Mono.just("TEST");

Mono装配

假设我们按照上面示例,将整个程序都以响应式编程的模式进行开发,方法都返回一个异步序列Mono/Flux。当调用者调用某一个方法时,面对返回的Mono/Flux对象有两种选择:1. 订阅(触发执行), 2.装配(Assembly):继续将获取到的异步序列封装到一个新的异步序列中,继续返回给外部调用者。如:Spring Web Flux 则是将Spring web 定义的包括WebFilter、Controller等逻辑组装成一个复合的Mono,最终进行订阅。

图1 Mono装配示例

OptimizableOperator 接口

       OptimizableOperator <IN, OUT>接口提供了指向下一个OptimizableOperator的指针,并且提供了从IN型订阅者获取OUT订阅者的方法,提供了一个Mono串行的组装方法。

图2 OptimizableOperator接口串行组装示意图

要实现一个串行化的Mono组装类通常实现抽象类InternalMonoOperator<I, O>,构造函数传入一个Mono<I>,得到一个新的O型序列。实现subscribeOrReturn方法将O型订阅转化为原I型订阅者,新的I型订阅者实现了基于O性订阅者之上的强化操作。Mono提供了大量InternalMonoOperator<I,O>的实现类。下面对MonoFilter进行分析,解释了如果创建基于InternalMonoOperator实现的装配类和使用方法。

MonoFilter

将原Mono上增加一个过滤Predicate函数,当原Mono产生元素时,只有Predicate测试通过的元素才会传递给最终的订阅者,测试失败将进行过滤,Mono元素直接完成。

final class MonoFilter<T> extends InternalMonoOperator<T, T> {

         final Predicate<? super T> predicate;

         //构造函数必须包含源Mono,和其他附加增加元素,这里是一个Predicate函数

         MonoFilter(Mono<? extends T> source, Predicate<? super T> predicate) {

                  super(source);

                  this.predicate = Objects.requireNonNull(predicate, "predicate");

         }

         /**

         * 实现subscribeOrReturn,接收新Mono类型的订阅者,返回原Mono类型的订阅者。

         * 新的订阅者实现订阅时装配的目的,这里只有通过Predicate函数测试的元素,才会

         * 调用actual.onNext(T)方法推送给最终的订阅者

         **/

         @Override

         @SuppressWarnings("unchecked")

         public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {

                  if (actual instanceof ConditionalSubscriber) {

                          return new FluxFilter.FilterConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);

                  }

                  return new FluxFilter.FilterSubscriber<>(actual, predicate);

         }

    ......

}

Mono内置了大量的InternalMonoOperator实现类,如MonoFilter,但Reactor框架并不对外暴露这些类,(这些实现类都是包内可见的),而是通过Mono方法的形式去方便获取各个可实现类的对象,并且统一以Mono类型的对外暴露。抽象统一的Mono使用范式比起暴露各种各样的实现细节显得简洁清晰。

我们可以使用Mono内置的InternalMonoOperator实现类,也可以实现自己的InternalMonoOperator类,但应和Reactor框架保持统一的用法, 在Mono的使用上统一以Mono类型和协议进行操作,不对外暴露具体的实现细节。

Mono 提供的装配方法

       Reactor框架并不暴露具体的装配类细节,而是提供了大量静态或实例方法来对Mono进行装配,返回装配后的新Mono。如上节所述的MonoFilter使用方法如下:

Mono.just(2).filter( (v -> v % 2 != 0)).subscribe(i -> System.out.println(i),

                error -> System.err.println("Error: " + error),

                ()-> System.out.println("complete"));

Mono filter方法返回了一个可以对原序列元素进行检测的增强Mono,上述例子因Mono.just(2) 中的元素值2 无法通过(v -> v % 2 != 0)的测试,将被过滤掉,无法传给最终的订阅者,而只能接受到原序列的结束信号, 因此只会打印“complete“。

Filter方法显示实际是返回的MonoFilter对象。

public final Mono<T> filter(final Predicate<? super T> tester) {

         ……

         return onAssembly(new MonoFilter<>(this, tester));

}

其他Mono装配方法:

  • Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)

:将一个T1类型元素的Mono和一个T2类型元素的Mono中的元素组合成一个Tuple2<T1,T2>元素的Mono. Mono还提供了zip的多种版本,满足各种情况的Mono组合模式。

  • public final Mono<T> timeout(Duration timeout): 当原序列产生一个T类型元素后,如果没有在指定的时间内完成,则将触发一个错误。如果在限期内完成则没有任何影响,该实现使用了MonoTimeout<T, U, V> extends InternalMonoOperator<T, T>。
  • doOnXXXX系列方法,如doOnCancel,  doOnNext, doOnError等, 返回在特定事件上加入行为的增强Mono。

更多Mono的装配方法详见Mono API。

Mono的使用

Mono的使用其实只有一种就是对Mono进行订阅, 但是Mono类也提供了其他传统的接口来进行Mono的使用。

Mono的订阅

订阅Mono很简单,调用Mono对象的subscribe方法,传入一个CoreSubscriber的实现对象即可。

Mono.subscribe.源码中展示了对Mono装配后的复合Mono进行订阅的处理逻辑。

public final void subscribe(Subscriber<? super T> actual) {

    //获取最后一个装配的Mono corePublisher

         CorePublisher publisher = Operators.onLastAssembly(this);

         CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

         ......

             //如果最后一个装配的publisher 实现了OptmizableOperator接口,一路组装

             //增强的Subscriber,按照循序后去下一个OptmizableOperator

                  if (publisher instanceof OptimizableOperator) {

                          OptimizableOperator operator = (OptimizableOperator) publisher;

                          while (true) {

                                   subscriber = operator.subscribeOrReturn(subscriber);

                                   if (subscriber == null) {

                                            return;

                                   }

                                   OptimizableOperator newSource = operator.nextOptimizableSource();

                                   if (newSource == null) {

                                            publisher = operator.source();

                                            break;

                                   }

                                   operator = newSource;

                          }

                  }

             //直到最底层的CorePublisher,使用最终转换所得的subscriber进行订阅,

             //原始序列产生的序号,将在一些列增强subscriber的增强下,或丢弃、或加工后传给

             //实际的订阅者

                  publisher.subscribe(subscriber);

}

Mono的简化使用

       Mono 提供了一些方法简化Mono的订阅操作,如block() 阻塞当前线程知道Mono序列返回元素或完成/异常信号

PublishOn和SubscribeOn

       publishOn 和 SubscribeOn 传入Scheduler对象,将Mono的行为交由Scheduler的现成执行。其中publishOn调用之后的序列行为在新的执行线程执行,而SubscribeOn则是整个序列的执行都在新的现成中执行。

final Flux<String> flux = Flux

    .range(1, 2)

    .map(i -> 10 + i) 

    .publishOn(s) 

.map(i -> "value " + i);

flux.subscribe(System.out::println)

final Flux<String> flux = Flux

    .range(1, 2)

    .map(i -> 10 + i) 

    .subscribeOn(s) 

    .map(i -> "value " + i);

flux.subscribe(System.out::println)

总结

       本文对Reactor的Mono编程进行了初步的介绍,体现了响应式编程的核心在于异步序列的构建(Mono/Flux)和订阅使用。 其中构建时对Mono/Flux的装配(Assembly)是整个编程模型的核心。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/184851.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最新Cocos Creator 3.x 如何动态修改3D物体的透明度

Cocos Creator 3.x 的2D UI有个组件UIOpacity组件可以动态修改UI的透明度,非常方便。很多同学想3D物体上也有一个这样的组件来动态的控制与修改3D物体的透明度。今天基于Cocos Creator 3.8 来实现一个可以动态修改3D物体透明度的组件Opacity3D。 对啦&#xff01;这里有个游戏…

【深度神经网络(DNN)】实现车牌识别

文章目录 前言一、数据集介绍二、步骤1.导包2.参数配置3.数据处理4.模型定义5.模型训练6.模型预测 总结 前言 课内实践作业 车牌识别 一、数据集介绍 1.车牌识别数据集&#xff1a;VehicleLicense车牌识别数据集包含16151张单字符数据&#xff0c;所有的单字符均为严格切割且…

PTL仓储亮灯拣选系统优化仓库作业流程实现物料快速定位

随着现代企业的发展和生产模式的不断演进&#xff0c;仓库管理作为生产供应链中的重要环节&#xff0c;也在不断追求效率和精益化。为了实现企业的现代化仓库管理&#xff0c;实现仓库条码化、自动化、无纸化&#xff0c;做到物料和成品从入库、出库、退库、移库、盘点整个过程…

【UE4】UE编辑器乱码问题

环境&#xff1a;UE4.27、vs2019 如何解决 问题原因&#xff0c;UE的编码默认是UTF-8&#xff0c;VS的默认编码是GBK 通过"高级保存选项" 直接修改VS的 .h头文件 的 编码 为 UTF-8 步骤1. 步骤2. 修改编码后&#xff0c;从新编译&#xff0c;然后就可以解决编辑器…

Docker 介绍

Docker 介绍 1 介绍1.1 概述1.2 资源高效利用1.3 发展历程1.4 组件1.5 工具1.6 对环境部署和虚拟化的影响1.7 优点1.8 容器技术核心CgroupNamespaceUnionFS 2 命令信息、状态、配置info命令用于显示当前系统信息、docker容器、镜像个数、设置等信息 镜像容器资源 3 安装3.1 版本…

2022年电工杯数学建模B题5G网络环境下应急物资配送问题求解全过程论文及程序

2022年电工杯数学建模 B题 5G网络环境下应急物资配送问题 原题再现&#xff1a; 一些重特大突发事件往往会造成道路阻断、损坏、封闭等意想不到的情况&#xff0c;对人们的日常生活会造成一定的影响。为了保证人们的正常生活&#xff0c;将应急物资及时准确地配送到位尤为重要…

jacoco和sonar

目录 jacoco 引入依赖 构建配置修改 单元测试 生成报告 查看报告 报告说明 1. Instructions 2. Branches 3. Cyclomatic Complexity 4. Lines 5. Methods 6. Classes sonar7.7 基础环境 需要下载软件 解压文件并配置 运行启动 jacoco 引入依赖 <dep…

【论文阅读】PSDF Fusion:用于动态 3D 数据融合和场景重建的概率符号距离函数

【论文阅读】PSDF Fusion&#xff1a;用于动态 3D 数据融合和场景重建的概率符号距离函数 Abstract1 Introduction3 Overview3.1 Hybrid Data Structure3.2 3D Representations3.3 Pipeline 4 PSDF Fusion and Surface Reconstruction4.1 PSDF Fusion4.2 Inlier Ratio Evaluati…

Spring boot集成sentinel限流服务

Sentinel集成文档 Sentinel控制台 Sentinel本身不支持持久化&#xff0c;项目通过下载源码改造后&#xff0c;将规则配置持久化进nacos中&#xff0c;sentinel重启后&#xff0c;配置不会丢失。 架构图&#xff1a; 改造步骤&#xff1a; 接着我们就要改造Sentinel的源码。…

Hikyuu 1.3.0 发布,高性能量化交易研究框架

Hikyuu 是一款基于 C/Python 的高性能开源量化交易研究框架&#xff0c;用于快速策略分析及回测。与其他量化平台或回测软件相比&#xff0c;具备&#xff1a; 超快的回测速度&#xff1b;对完整的系统交易理念进行抽象&#xff0c;并分解为不同的组件&#xff0c;通过重用不同…

Windows下MSYS2下载与安装

下载地址&#xff1a; 官网下载地址 https://www.msys2.org/阿里云镜像下载 https://mirrors.aliyun.com/msys2/distrib/x86_64/https://mirrors.aliyun.com/msys2/distrib/x86_64/msys2-x86_64-20231026.exe?spma2c6h.25603864.0.0.12b92551XW5OSM官网下载 ![官网下载](htt…

使用vscode开发uniapp项目常用的辅助插件,提升开发效率

为什么不使用hbuilder开发呢&#xff1f;因为hbuilder对ts和vue3语法支持并不友好&#xff0c;而且代码提示不智能&#xff0c;也不能使用最近很流行的coplit和CodeGeex智能提示&#xff0c;所以就换掉hbulider&#xff0c;使用我们熟悉的vscode开发吧。 第一个&#xff1a;un…

高德地图设置电子围栏

高德地图设置电子围栏 需求效果图代码实现 需求 给地图上人员锚点设置围栏区域&#xff0c;如果在此区域内则是在线状态&#xff0c;不在此区域内则是离线状态 效果图 双击可编辑或清除当前围栏 代码实现 前端实现区域框选&#xff1a; //引入高德地图sdk"amap/amap…

将全连接层替换为卷积层的意义(CNN和FCN)

全连接层&#xff1a;CNN 将特征整合&#xff0c;用于分类&#xff0c;在图像中具体化为知道图像中有猫&#xff0c;但是不知道猫在哪儿。 在传统CNN中&#xff0c;输入是唯一确定大小的。因为全连接层要求输入是固定的。 全卷积网络&#xff1a;FCN 不仅可以用来分类&…

项目文件下载器,基于Thread多线程

目录 1、Http 工具类 2、关于下载的关系类 2.1 展示下载信息 #下载信息展现线程类 #在主下载类中&#xff0c;进行调用上述线程类 2.2 文件的分块下载 #文件分块下载类 #文件按分块进行分别切分的方法 # 使用 LongAdder 类型&#xff0c;更改 DownLoadInfoThread 展现…

计算机中丢失mfc140u.dll怎么解决

mfc140u.dll是一个Microsoft Visual C库文件&#xff0c;主要用于MFC&#xff08;Microsoft Foundation Class&#xff09;应用程序的开发。它包含了MFC应用程序所需的一些常用功能&#xff0c;如对话框、窗口、菜单等。当mfc140u.dll丢失时&#xff0c;可能会导致MFC应用程序无…

margin-bottom、margin-top设置不起效

问题&#xff1a; 如下&#xff0c;在uniapp项目中使用了tag标签&#xff0c;设置margin时&#xff0c;只有margin-left、margin-right生效&#xff0c;margin-bottom、margin-top设置不起效。f12直接在元素中修改也无效。 原因&#xff1a; uni-ui中的tag标签是行内元素&…

idea 模板参数注释 {@link}

1. 新增组 2. 设置方法注释及变量 增加模板文本 ** * $param$ * return {link $return$} */3. 设置变量表达式 勾选跳过param 参数表达式 groovyScript("def result ;def params \"${_1}\".replaceAll([\\\\[|\\\\]|\\\\s], ).split(,).toList();def param…

Vscode禁止插件自动更新

由于电脑的vscode版本不是很新。2022.10月份的版本1.7.2&#xff0c;电脑vscode的python插件装的也是2022年4月份的某个版本&#xff0c;但插件经常自动更新&#xff0c;导致python代码无法Debug,解决办法&#xff1a; 点设置&#xff0c;搜autoUpdate, 把红色框选成无

Linux中固定ip端口和修改ip地址

一&#xff0c;更改虚拟网络编辑器 1&#xff0c;首先启动VMware&#xff0c;选择自己要更改ip或固定ip的虚拟机&#xff0c;并找到虚拟网络配编辑器&#xff0c;点击进入 2&#xff0c;进入之后需要点击右下角获取管理员权限后才能修改&#xff0c;有管理员权限之后图片如下 …