步入响应式编程篇(二)之Reactor API

步入响应式编程篇(二)之Reactor API

  • 前言
  • 回顾响应式编程
  • Reactor API的使用
    • Stream
    • 引入依赖
    • Reactor API的使用
      • 流源头的创建
    • reactor api的背压模式
    • 发布者与订阅者使用的线程
      • 查看弹珠图
      • 查看形成新流的日志

前言

对于响应式编程的基于概念,以及JDK自带的落地实现,可以查看步入响应式编程篇(一)
本篇将介绍Reactor API的使用:

reactor官网介绍,响应式编程是一种与数据流和变化传播相关的异步编程范式。这意味着可以通过所采用的编程语言轻松表达静态(例如数组)或动态(例如时间发射器)数据流;

对比Flow api以及completableFuture,前者编写代码比较麻烦,编写处理器还要自定义一个类,后者还不能满足响应式编程,两者都有其局限性,而Reactor API是基于Stream流操作的,无论是编写还是响应式编程都能满足;

回顾响应式编程

①在面向对象语言中,反应式编程范式通常作为 观察者设计模式的扩展。还可以比较主要的反应流模式与熟悉的迭代器设计模式,因为 Iterable- 所有这些库中的迭代器对。一个主要的区别是,虽然Iterator是基于拉的,但反应流是基于推的。

②使用迭代器是一种命令式编程模式,即使访问值的方法完全由Iterable负责。实际上,由开发人员来选择何时访问序列中的下一个()项。在反应式流中,上述对的等价物是发布者-订阅者。但它是 发布者在新的可用值到来时通知订阅者,这种推送方面是响应的关键。此外,应用于推送值的操作是以声明方式而不是命令方式表达的:程序员表达计算的逻辑,而不是描述其确切的控制流。

③除了推送值之外,还以定义良好的方式涵盖了错误处理和完成方面。发布者可以将新值推送到其订阅者(通过调用onNext),但也可以发出错误信号(通过调用onError)或完成信号(通过调用onComplete)。错误和完成都会终止序列。这可以归纳如下:

onNext x 0…N [onError | onComplete]

这种方法非常灵活。该模式支持没有值、只有一个值或有n个值的用例(包括无限序列的值,例如时钟的连续滴答声)。

Reactor API的使用

Stream

用的就是Java 8引用的Stream API,使用Stream api时也会结合lambda表达式和函数式接口,所以Stream api是包括它们的;
虽然是众所周知,但笔者在此还是提一嘴,Stream API的链式调用,是每个元素完成整个流所有步骤的处理后,再遍历下一个元素的。而不是所有元素完成后,再让流中的下一个步骤处理全部元素,这是一不小心就会陷入的理解误区;

还有诸如flatMap()、map()等参数为函数式接口的种类,总共有四类
//Predicate 有入参且返回值固定为boolean
//Consumer 有入参无返回值
//Function 有一个入参且一个返回值
//Supplier 无入参且有返回值

引入依赖

需引入Reactor API的依赖

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version>
</dependency>

Reactor API的使用

Reactor API的使用是定义流源头,然后往下流,使用链式处理多个步骤,最后流向最后,就是异常或响应信号。

流源头的创建

创建流的方式有Flux和Mono两种,前者是用于创建多个元素的流,后者是创建只有一个元素的流;
在这里插入图片描述

这就分别创建了Flux流和Mono流,很轻易实现了一个全异步的系统。相比Flow流的一顿操作,是不是简便了很多;

而创建流,需指定流的源头,有四种方式
①just(),如上例,穷举出所有元素;
②rang();

//表示流有1到100个元素
Flux.range(1,100);

③generate();
这适用于同步和逐个发射,这意味着接收器是一个SynchronousSink,并且它的next()方法在每次回调调用中最多只能被调用一次。

//入参是泛型为SynchronousSink的Consumer接口
Flux.generate(sink->{//查库存sink.next(queryStock("1"));sink.complete();
}).subscribe(System.out::println);//但同一次回调中循环调用next()会报错,所以先初始化state值为0
Flux.generate(()->0,(state,sink)->{if (state<=10) {//小于10,则继续调用,里面的逻辑是每次将state+1sink.next(state);}else{sink.complete();}return state+1;}).subscribe(System.out::println);

④create方式,create是一种更高级的Flux程序化创建形式,它适合于每轮多个发射,甚至来自多个线程。create无需指定初始值而且可以多次执行sink#next():

Flux.create(sink->{for (int i=0;i<2;i++) {sink.next(queryStock("1"));}sink.complete();
}).subscribe(System.out::println);

在业务上通常使用后两种,例如可以从缓存或DB查到数据,然后类似于观察者模式,往后推送处理;更多使用create,因为generate需设置初始值,而且每次回调只能调用一次sink#next()

reactor api的背压模式

使用背压模式的原因——响应式编程的思想,主要在于推,推送到每个处理器操作,如果不管当前处理能力就很容易处理出问题,类似于MQ消费者控制从队列中获取数据量,要控制消费能力,于是引出背压模式,接收时通过request告诉上游,所以在Reactor中实现背压时,消费者压力传播回源的方式是向上游操作员发送请求。当前请求的总和有时被称为当前“需求”或“待决请求”。需求的上限为Long.MAX_VALUE,表示一个无限的请求(意思是“尽可能快地生产”-基本上禁用反压)。
如下,与Flow API类似,也是在自定义订阅者中指定背压请求request()

Flux.range(1, 10).doOnRequest(r -> System.out.println("request of " + r)).subscribe(new BaseSubscriber<Integer>() {@Overridepublic void hookOnSubscribe(Subscription subscription) {request(1);}@Overridepublic void hookOnNext(Integer integer) {System.out.println("Cancelling after having received " + integer);cancel();}});

发布者与订阅者使用的线程

查看弹珠图

Flux.create(sink->{for (int i=0;i<2;i++) {sink.next(i*queryStock("1"));}//代表推送给下一个操作形成新流sink.complete();//过滤不等于0的数
}).filter(Predicate.not(Predicate.isEqual(0))).subscribe(System.out::println);

鼠标点到filter操作,就可以看到元素(这些不同形状的方块就是了)经过filter后,变少了,点到每个操作map、flatmap等都可以看到对应的弹珠图,可根据弹珠图快速理解形成新流的操作
在这里插入图片描述

查看形成新流的日志

在这里插入图片描述
在每个流后面使用log(),查看该流形成每个元素的日志,如都是request无限元素,然后create后,就调onNext(0),形成元素0,后面filter就过滤了0,就没有再调onNext(0),继续create元素1时,就调 onNext(1),形成元素1,后面filter后,形成的新流中仍然有元素1,所以还会调用onNext(1),以此类推。

默认情况下,发布者使用的线程就是订阅者的线程,那就证明一下:
如上用log()打印出,订阅者和发布者使用的线程都是main,那如果订阅者开启新的线程,下图也能看到发布者回调onNext操作也是使用订阅者的线程:

在这里插入图片描述

下图,使用publishOn()里指定新线程Schedulers.single(),代表过滤后,发布者使用新线程会回调onNext()
在这里插入图片描述
图中①是filter之前,发布者还是用的订阅者的线程回调onNext,②是filter之后,发布者使用新的线程回调onNext()。

总之,上面只是介绍reactor api操作的冰山一角,至于更多操作可以查看官网哈

如有需要收藏的看官,顺便也用发财的小手点点赞哈,如有错漏,也欢迎各位在评论区评论!

参考官网:https://projectreactor.io/docs/core/release/reference/aboutDoc.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/5533.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

66,【6】buuctf web [HarekazeCTF2019]Avatar Uploader 1

进入靶场 习惯性输入admin 还想用桌面上的123.png 发现不行 看看给的源码 <?php // 关闭错误报告&#xff0c;可能会隐藏一些错误信息&#xff0c;在开发阶段可考虑开启&#xff08;例如 error_reporting(E_ALL)&#xff09; error_reporting(0); // 引入配置文件&#x…

FortiGate配置远程拨号VPN

我们前面介绍了FortiGate如何配置IPsec VPN的两种类型&#xff1a;站到站&#xff08;卷土重来&#xff01;这次终于把FortiGate的IPsec VPN配置成功了&#xff01;&#xff09;和Hub-and-Spoke&#xff08;漂亮&#xff01;FortiGate配置Hub-Spoke类型的IPsec VPN竟然是Full-M…

linux下springboot项目nohup日志或tomcat日志切割处理方案

目录 1. 配置流程 2. 配置说明 其他配置选项&#xff1a; 3. 测试执行 4. 手动执行 https://juejin.cn/post/7081890486453010469 通常情况下&#xff0c;我们的springboot项目部署到linux服务器中&#xff0c;通过nohup java -jar xxx.jar &指令来进行后台运行我们…

CSS中相对定位和绝对定位详解

文章目录 CSS中相对定位和绝对定位详解一、引言二、相对定位1、相对定位的概念1.1、代码示例 三、绝对定位1、绝对定位的概念1.1、代码示例 四、相对定位与绝对定位的区别五、使用示例六、总结 CSS中相对定位和绝对定位详解 一、引言 在CSS布局中&#xff0c;定位是一种强大的…

XCode-Color-Fixer 常见问题解决方案

XCode-Color-Fixer 常见问题解决方案 XCode-Color-Fixer StoryBoard / XIB 颜色偏差很严重&#xff0c;怎么破&#xff1f;XCode-Color-Fixer帮你忙&#xff01; 项目地址: https://gitcode.com/gh_mirrors/xc/XCode-Color-Fixer 项目基础介绍 XCode-Color-Fixer 是一个…

Visual Studio2019调试DLL

1、编写好DLL代码之后&#xff0c;对DLL项目的属性进行设置&#xff0c;选择待注入的DLL&#xff0c;如下图所示 2、生成DLL文件 3、将DLL设置为启动项目之后&#xff0c;按F5启动调试。弹出选择注入的exe的界面之后&#xff0c;使用代码注入器注入步骤2中生成的dll&#xff0…

C++入门基础篇:域、C++的输入输出、缺省参数、函数重载、引用、inline、nullptr

本篇文章是对C学习前期的一些基础部分的学习分享&#xff0c;希望也能够对你有所帮助。 那咱们废话不多说&#xff0c;直接开始吧&#xff01; 目录 1.第一个C程序 2. 域 3. namespace 3.1 namespace的作用 3.2 namespace的定义 3.3 namespace使用说明 4.C的输入和输出…

在Ubuntu上安装RabbitMQ教程

1、安装erlang 因为rabbitmq是基于erlang开发的&#xff0c;所以要安装rabbitmq&#xff0c;首先需要安装erlang运行环境 apt-get install erlang执行命令查是否安装成功&#xff1a;erl&#xff0c;疯狂 Ctrlc 就能退出命令行 2、安装rabbitmq 1、查看erlang与rabbitmq版本…

latin1_swedish_ci(latin1 不支持存储中文、日文、韩文等多字节字符)

文章目录 1、SHOW TABLE STATUS WHERE Name batch_version;2、latin1_swedish_ci使用场景注意事项修改字符集和排序规则修改表的字符集和排序规则修改列的字符集和排序规则修改数据库的默认字符集和排序规则 3、ALTER TABLE batch_version CONVERT TO CHARACTER SET utf8mb4 C…

使用vue-next-admin框架后台修改动态路由

vue-next-admin框架是一个基于 Vue 3 和 Vite 构建的后台管理系统框架。它采用了最新的前端技术栈&#xff0c;旨在提供一个高效、灵活、现代化的管理后台解决方案。该框架主要用于构建功能丰富且易于定制的管理后台应用&#xff0c;适合各种中大型项目。 其主要特点包括&am…

qiankun+vite+vue3

基座与子应用代码示例 本示例中,基座为Vue3,子应用也是Vue3,由于qiankun不支持Vite构建的项目,这里还要引入 vite-plugin-qiankun 插件 基座(主应用) 加载qiankun依赖 npm i qiankun -S qiankun配置(src/qiankun) src/qiankun/config.ts export default {subApp…

深度学习中Batch Normalization(BN)原理、作用浅析

最近做剪枝学习&#xff0c;其中一种是基于BN层的γ作为缩放因子进行剪枝的&#xff0c;那么我想搞懂BN的工作原理更好的理解网络、剪枝等&#xff0c;所以有了该文。 首先先说BN的作用在详细拆解&#xff0c;理解。以知乎一条高赞评论说明BN层到底在干什么。 Batch Norm 为什…

Python----Python高级(正则表达式:语法规则,re库)

一、正则表达式 1.1、概念 正则表达式&#xff0c;又称规则表达式,&#xff08;Regular Expression&#xff0c;在代码中常简写为regex、 regexp或RE&#xff09;&#xff0c;是一种文本模式&#xff0c;包括普通字符&#xff08;例如&#xff0c;a 到 z 之间的字母&#xff0…

【16届蓝桥杯寒假刷题营】第1期DAY5

5.依依的询问最小值 - 蓝桥云课 问题描述 依依有个长度为 n 的序列 a&#xff0c;下标从 1 开始。 她有 m 次查询操作&#xff0c;每次她会查询下标区间在 [li​,ri​] 的 a 中元素和。她想知道你可以重新排序序列 a&#xff0c;使得这 m 次查询的总和最小。 求你求出 m 次…

机器学习10-解读CNN代码Pytorch版

机器学习10-解读CNN代码Pytorch版 我个人是Java程序员&#xff0c;关于Python代码的使用过程中的相关代码事项&#xff0c;在此进行记录 文章目录 机器学习10-解读CNN代码Pytorch版1-核心逻辑脉络2-参考网址3-解读CNN代码Pytorch版本1-MNIST数据集读取2-CNN网络的定义1-无注释版…

【机器学习实战中阶】使用SARIMAX,ARIMA预测比特币价格,时间序列预测

数据集说明 比特币价格预测&#xff08;轻量级CSV&#xff09;关于数据集 致谢 这些数据来自CoinMarketCap&#xff0c;并且可以免费使用该数据。 https://coinmarketcap.com/ 数据集:链接: 价格预测器 源代码与数据集 算法说明 SARIMAX&#xff08;Seasonal AutoRegressive …

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)

问题 项目里使用了 AzureBlob 存储了用户上传的各种资源文件&#xff0c;近期 AzureBlob 的流量费用增长很快&#xff0c;想通过分析Blob的日志&#xff0c;获取一些可用的信息&#xff0c;所以有了这个需求&#xff1a;将存储账户的日志&#xff08;读写&#xff0c;审计&…

【Red Hat8】:搭建FTP服务器

目录 一、匿名FTP访问 1、新建挂载文件 2、挂载 3、关闭防火墙 4、搭建yum源 5、安装VSFTPD 6、 打开配置文件 7、设置配置文件如下几个参数 8、重启vsftpd服务 9、进入图形化界面配置网络 10、查看IP地址 11、安装ftp服务 12、遇到拒绝连接 13、测试 二、本地…

Redis的Windows版本安装以及可视化工具

文章目录 redis安装redis安装包下载解压文件夹启动redis服务Redis路径配置环境变量打开redis客户端进行连接基础操作测试 redis可视化工具下载Redis Desktop Manager redis安装 redis安装包下载 windows版本readis下载&#xff1a;Releases tporadowski/redis 解压文件夹 我…

汽车钥匙发展史

介绍 最近在研究UWB数字钥匙的过程中&#xff0c;了解到汽车钥匙在短短的100多年以来的发展历程&#xff0c;让我不禁感慨科技的发展速度&#xff0c;本文主要介绍汽车发展过程中&#xff0c;车钥匙形态变化的历程及技术原理。 总体概述&#xff0c;汽车钥匙的发展&#xff0…