响应式编程初探-自定义实现Reactive Streams规范

最近在学响应式编程,这里先记录下,响应式编程的一些基础内容

1.名词解释

Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。

  1. Reactive Streams:

    • Reactive Streams 是一个规范,定义了一组接口和协议,用于处理异步数据流的背压。它包括发布者(Publisher)、订阅者(Subscriber)、订阅(Subscription)和处理器(Processor)等接口。
    • Reactive Streams 规范的目标是提供一种标准的方式来处理异步数据流,解决背压问题。Java标准库从Java 9开始提供了 java.util.concurrent.Flow 类,定义了Reactive Streams规范。
  2. Reactor:

    • Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono(表示一个包含零或一个元素的异步序列)。
    • Reactor 通过提供响应式的操作符,如mapfilterflatMap等,使得开发者能够方便地进行数据流的转换和处理。
  3. WebFlux:

    • WebFlux 是Spring Framework 5引入的响应式编程支持。它构建在 Reactor 之上,提供了一套用于构建异步、非阻塞、响应式的Web应用程序的API。WebFlux支持使用Reactive Streams处理HTTP请求和响应。
    • Spring WebFlux 可以用于构建反应式的RESTful服务,支持使用注解的方式定义路由和处理器函数。
  4. 响应式编程:

    • 响应式编程是一种编程范式,强调数据流和变化的传播。在这个范式中,数据源产生数据并通知观察者,观察者相应地处理这些数据。这种方式更容易处理异步操作和事件。
    • 在Java中,响应式编程通常涉及到使用类似于Reactor或RxJava的库,这些库提供了响应式的操作符和工具。

综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。

2.Reactive Streams 规范

2.1.Reactive Streams规范定义

java.util.concurrent.Flow 类中,定义了Reactive Streams规范
在这里插入图片描述

  • Publisher(发布者):负责生成数据流,并向订阅者发送数据。
  • Subscriber(订阅者):表示数据流的消费者,它订阅一个或多个发布者,并接收数据。
  • Subscription(订阅):表示订阅关系的接口,用于控制数据流的请求和取消。
  • Processor(处理器):充当发布者和订阅者的中间组件,可以对数据进行转换和处理。

2.2.API方法

1. Publisher(发布者):
interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
}
  • subscribe(Subscriber<? super T> subscriber) 用于订阅数据流。当订阅者调用这个方法时,发布者将建立与订阅者的订阅关系,并开始推送数据。
2. Subscriber(订阅者):
interface Subscriber<T> {void onSubscribe(Subscription subscription);void onNext(T item);void onError(Throwable throwable);void onComplete();
}
  • onSubscribe(Subscription subscription) 在订阅关系建立时调用。通过这个方法,订阅者可以持有 Subscription 对象,以便后续请求数据和取消订阅。

  • onNext(T item) 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。

  • onError(Throwable throwable) 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。

  • onComplete() 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。

3. Subscription(订阅):
interface Subscription {void request(long n);void cancel();
}
  • request(long n) 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。

  • cancel() 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。

4. Processor(处理器):
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Processor 接口是 SubscriberPublisher 的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。

  • Subscriber 部分的方法:onSubscribe(Subscription subscription), onNext(T item), onError(Throwable throwable), onComplete()

  • Publisher 部分的方法:subscribe(Subscriber<? super R> subscriber)。表示 Processor 可以被其他订阅者订阅。

5.泛型T

泛型T即为数据流

这些方法共同构成 Reactive Streams 协议,定义了发布者和订阅者之间的协作方式,以及订阅者如何处理数据流。在实际的使用中,这些方法的实现通常需要考虑异步处理、背压机制等方面,以确保响应式编程的目标得以实现。

2.3.工作流程

在 Reactive Streams 中,PublisherSubscriberSubscriptionProcessor 之间的协作流程如下:

有时间再补流程图
在这里插入图片描述

  1. Publisher(发布者):

    • Publisher 是异步产生数据流的组件,它通过 subscribe 方法允许订阅者订阅。subscribe 方法会接收一个 Subscriber 对象作为参数。
    • Publisher 有新数据准备好时,通过调用订阅者的 onNext 方法将数据推送给订阅者。
    interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
    }
    
  2. Subscriber(订阅者):

    • Subscriber 是数据流的消费者,通过实现 Subscriber 接口来接收来自发布者的数据。订阅者通过调用 subscription.request(n) 请求一定数量的数据,处理数据时通过 onNext 方法接收元素。
    • 当订阅者无法处理更多的元素时,可以调用 subscription.cancel() 来取消订阅。
    interface Subscriber<T> {void onSubscribe(Subscription subscription);void onNext(T item);void onError(Throwable throwable);void onComplete();
    }
    
  3. Subscription(订阅):

    • Subscription 表示订阅关系,它在 onSubscribe 方法中被传递给订阅者。通过 Subscription,订阅者可以请求数据和取消订阅。
    • 订阅者通过 request(long n) 方法请求处理 n 个元素,通过 cancel() 方法取消订阅。
    interface Subscription {void request(long n);void cancel();
    }
    
  4. Processor(处理器):

    • Processor 是一个同时实现了 PublisherSubscriber 接口的中间组件,可以作为数据流的处理器,对数据进行转换和处理。
    • Processor 既能接收数据,也能发布数据。它将 onNextonErroronComplete 方法委托给下游的订阅者,并将数据推送给上游的发布者。
    interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过 onNext 方法接收元素,订阅者通过 request 方法请求处理一定数量的元素,同时可以通过 cancel 方法取消订阅。Processor 则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。

3.自定义实现Reactive Streams规范

自己实现了一个,参考了SubmissionPublisher

  • 同步实现的
  • 功能不完善
  • 有bug
class MyPublisher implements Flow.Publisher<String>{MySubscription<String> subscription;public int request ;public void publish(String item){subscription.items.add(item);while (true) {if (request > 0) {for (int i = 0; i < request; i++) {if (!subscription.items.isEmpty()) {try {Object o = subscription.items.get(subscription.items.size() - 1);subscription.subscriber.onNext(o.toString());subscription.items.remove(o);}catch (Exception e){subscription.subscriber.onError(e);return;}}}}if (subscription.items.isEmpty()) {break;}}}@Overridepublic void subscribe(Flow.Subscriber<? super String> subscriber) {System.out.println("第一步:绑定订阅者" );MySubscription<String> subscription = new MySubscription<>(subscriber,this);this.subscription = subscription;subscriber.onSubscribe(subscription);}}class MySubscriber implements Flow.Subscriber<String>{private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("第二步:接收Subscription" );this.subscription = subscription;// 请求订阅者处理的元素数量subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("第四步:推送数据" );System.out.println("MySubscriber 消费了item = " + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("出异常了 = " + throwable);}@Overridepublic void onComplete() {}}class MySubscription<T> implements Flow.Subscription{final Flow.Subscriber<? super T> subscriber;final MyPublisher publisher;List items = new ArrayList();public MySubscription(Flow.Subscriber<? super T> subscriber, MyPublisher publisher) {this.subscriber = subscriber;this.publisher = publisher;}@Overridepublic void request(long n) {this.publisher.request++;System.out.println("第三步:拉取请求" );}@Overridepublic void cancel() {}
}
public class FlowDemo {public static void main(String[] args) {MyPublisher myPublisher = new MyPublisher();MySubscriber mySubscriber = new MySubscriber();myPublisher.subscribe(mySubscriber);myPublisher.publish("111");myPublisher.publish("222");myPublisher.publish(null);}
}

4.Jdk实现Reactive Streams使用示例

class SimplePublisher implements Flow.Publisher<Integer> {private final SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();public void publishItems() {for (int i = 1; i <= 5; i++) {publisher.submit(i);}// 发布者完成发布publisher.close();}@Overridepublic void subscribe(Flow.Subscriber<? super Integer> subscriber) {publisher.subscribe(subscriber);}
}class SimpleSubscriber implements Flow.Subscriber<Integer> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;// 请求订阅者处理的元素数量subscription.request(1);}@Overridepublic void onNext(Integer item) {System.out.println("Received item: " + item);// 处理完一个元素后请求下一个subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.err.println("Error occurred: " + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("Processing completed.");}
}public class ReactiveStreamsExample {public static void main(String[] args) throws InterruptedException {// 创建发布者和订阅者SimplePublisher simplePublisher = new SimplePublisher();SimpleSubscriber simpleSubscriber = new SimpleSubscriber();// 订阅者订阅发布者simplePublisher.subscribe(simpleSubscriber);// 发布者发布数据simplePublisher.publishItems();// 睡一觉,确保数据处理完成Thread.sleep(3000);}
}

学习打卡:Java学习笔记-day05-响应式编程初探-自定义实现Reactive Streams规范

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/239313.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

查看Linux系统内存、CPU、磁盘使用率和详细信息

一、查看内存占用 1、free # free -m 以MB为单位显示内存使用情况 [rootlocalhost ~]# free -mtotal used free shared buff/cache available Mem: 11852 1250 8668 410 1934 9873 Swap: 601…

【Kotlin】协程的字节码原理

前言 协程是Koltin语言最重要的特性之一&#xff0c;也是最难理解的特性。网上关于kotlin协程的描述也是五花八门&#xff0c;有人说它是轻量级线程&#xff0c;有人说它是无阻塞式挂起&#xff0c;有人说它是一个异步框架等等&#xff0c;众说纷芸。甚至还有人出了书籍专门介…

Linux用户提权

新建用户 用root账户修改文件&#xff0c;添加信任用户 使用sudo提权&#xff0c;可以使用 **root删除新建账户**

各种版本对应关系:SpringCloudAlibaba——SpringCloud——SpringBoot——SpringFramework——JDK

SpringCloudAlibaba——SpringCloud——SpringBoot——SpringFramework——JDK 一般情况&#xff0c;在https://github.com/项目/wiki目录下有发布信息及对应的要求其他依赖的版本信息SpringCloudAlibaba——SpringCloud——SpringBootSpringBoot和SpringFramework的版本对应关…

Web3的应用发展及其影响

Web3&#xff0c;又被称为去中心化Web&#xff0c;是互联网发展的一个阶段&#xff0c;其核心特点是数据的去中心化和用户自主权。近年来&#xff0c;随着区块链技术的不断成熟&#xff0c;Web3的应用也得到了广泛的关注和发展。在这篇文章中&#xff0c;我们将深入探讨Web3目前…

vuex的初步使用-1

1. 介绍 Vuex 是一个 Vue 的 状态管理工具&#xff0c;状态就是数据。 简单讲&#xff1a;Vuex 就是一个插件&#xff0c;可以帮我们管理 Vue 通用的数据 (多组件共享的数据)。相对于一个仓库&#xff1a;存放组件共享的数据。 2. 安装 vuex 安装vuex与vue-router类似&…

LabVIEW模拟荧光显微管滑动实验

LabVIEW模拟荧光显微管滑动实验 在现代生物医学研究中&#xff0c;对微观生物过程的精准模拟和观察至关重要。本案例展示了如何利用LabVIEW软件和专业硬件平台&#xff0c;创新地模拟荧光显微管在滑动实验中的动态行为&#xff0c;这一过程不仅提升了实验效率&#xff0c;还为…

Jenkins基础篇--添加用户和用户权限设置

添加用户 点击系统管理&#xff0c;点击管理用户&#xff0c;然后点击创建用户&#xff08;Create User&#xff09; 用户权限管理 点击系统管理&#xff0c;点击全局安全配置&#xff0c;找到授权策略&#xff0c;选择安全矩阵&#xff0c;配置好用户权限后&#xff0c;点击…

C++ 开发 + VSCode 调试

C 开发 VSCode 调试 MSYS2 安装 gcc、make下载安装MSMYS2pacman 添加镜像源 GCC1. 安装2. 查看结果3. 环境变量 GDB VSCode 调试所需插件创建项目调试代码1. tasks.json 配置任务2. launch.json 配置调试3. 运行 更进一步的 C/C 设置 参考资料 MSYS2 安装 gcc、make 下载 官…

1.16云支教(集合的概念、定义)

1.集合 1.集合的概念&#xff08;意思&#xff09; 把集合理解成一个塑料袋 要往塑料袋装什么&#xff0c; 装的元素就是里面装的各个东西&#xff0c; 表示方法 1。直接穷举 A{装的到底是什么&#xff08;竹子&#xff09;} 2.描述法 描述装的东西的性质&#xff0c;…

MySQL 从零开始:03 基本入门语句

文章目录 1、连接数据库1.1 命令提示符登陆1.2 MySQL 8.0 Command Line Client 登陆1.3 MySQL Workbench 登陆 2、基本语句2.1 查看所有库2.2 创建库2.3 删除库2.4 选择数据库2.5 查看表2.6 创建表2.7 删除表2.8 改表名2.9 清空表 在上一小节中介绍了 MySQL 数据库的安装&#…

统计学-R语言-4.1

文章目录 前言编写R函数图形的控制和布局par函数layout函数 练习 前言 安装完R软件之后就可以对其进行代码的编写了。 编写R函数 如果对数据分析有些特殊需要&#xff0c;已有的R包或函数不能满足&#xff0c;可以在R中编写自己的函数。函数的定义格式如下所示&#xff1a; …

找不到mfc100.dll的解决方法,怎么修复mfc100.dll文件

当我们在使用电脑时&#xff0c;时常可能会遇到各类系统提示的错误信息。"找不到mfc100.dll" 就是这些错误之一&#xff0c;该错误提示会妨碍我们执行一些应用程序或特定代码。为了帮助读者克服这个技术障碍&#xff0c;本篇文章将详尽阐明导致该问题的根本原因&…

白码CRM快速实现报价转订单功能

某crm项目已经做到销售模块了&#xff0c;销售模块实现了从报价到销售单&#xff0c;再到财务模块的应收流程。但使用过程中发现不好用的地方&#xff1a;报价通过后客户下单&#xff0c;销售相关人员又要重新录入数据一样的销售单&#xff0c;觉得这样的操作比较繁琐&#xff…

vue3-响应式基础之reactive

reactive() 还有另一种声明响应式状态的方式&#xff0c;即使用 reactive() API。与将内部值包装在特殊对象中的 ref 不同&#xff0c;reactive() 将使对象本身具有响应性&#xff1a; 「点击按钮1」 <script lang"ts" setup> import { reactive } from vuec…

一文了解Spring线程池(超详细+干货满满)

Spring默认线程池 simpleAsyncTaskExecutor Spring异步线程池的接口类是TaskExecutor &#xff0c;本质还是 java.util.concurrent.Executor&#xff0c;没有配置的情况下&#xff0c;默认使用的是 simpleAsyncTaskExecutor Component EnableAsync public class ScheduleTask…

如何理解产品经理和项目经理的区别?

最近很多人咨询“项目经理跟产品经理该怎么选&#xff0c;我更适合哪个&#xff1f;”“项目经理跟产品经理哪个更有钱途 ”等等&#xff0c;今天就一次性说清楚项目经理跟产品经理有什么区别&#xff0c;应该怎么选择。 不想看长篇大论的&#xff0c;来找我&#xff0c;直接把…

2023年上半年软件设计师下午真题及答案解析

试题一(15分) 随着农业领域科学种植的发展&#xff0c;需要对农业基地及农事进行信息化管理&#xff0c;为租户和农户等人员提供种植相关服务&#xff0c;现欲开发农事管理服务平台&#xff0c;其主要功能是&#xff1a; (1)人员管理&#xff1a;平台管理员管理租户&#xff…

在linux环境下安装lnmp

lnmp官网&#xff1a;https://lnmp.org 一&#xff1a;lnmp安装 参考&#xff1a;https://lnmp.org/install.html 1&#xff1a;下载lnmp安装包 wget https://soft.lnmp.com/lnmp/lnmp2.0.tar.gz -O lnmp2.0.tar.gz 2&#xff1a;解压lnmp安装包 tar zxf lnmp2.0.tar.gz …

Python-高阶函数

在Python中&#xff0c;高阶函数是指能够接收函数作为参数&#xff0c;或者能够返回函数的函数。这种特性使得函数在Python中可以被灵活地传递和使用。以下是一些关于Python高阶函数的详细解释&#xff1a; 函数作为参数&#xff1a; 高阶函数可以接收其他函数作为参数。这样的…