Reactive StreamsReactor Core

Reactive Streams&Reactor Core

  • 一、概述
    • 1、问题
    • 2、优势
    • 3、发展
  • 二、Reactive Streams
    • 1、依赖
    • 2、API
  • 三、Project Reactor
    • 1、概述
    • 2、并发模型
    • 3、入门
      • 1)依赖
      • 2)Flux和Mono
      • 3)空流&错误流
    • 4、订阅响应式流
      • 1)常见订阅
      • 2)自定义订阅
    • 5、API
      • 1)index
      • 2)timestamp
      • 3)any
      • 4)map
      • 5)filter
      • 6)collect
      • 7)distinct
      • 8)flatMap
      • 9)scan
      • 10)thenMany
      • 11)组合响应流
      • 12)流元素批处理
      • 13)sample(采样)
      • 14)blockLast(响应流转化为阻塞流)
      • 15)materialize(物化和非物化)
      • 16)onErrorResume(错误处理)
      • 17)defer(冷热数据流)
      • 18)transform(组合&转换响应流)
    • 6、编程方式创建流
      • 1)push&create
      • 2)generate
      • 3)using-disposable
      • 4)usingWhen

一、概述

1、问题

传统的命令式编程在面对当前需求时会有一些限制,比如,在应用负载较高时,应用需要有更高的可用性,并提供低的延迟时间。

1)资源消耗大

使用Servlet开发的单体应用,是基于传统的Thread per Request模型。当服务部署到Tomcat后,Tomcat有线程池,每个请求交给线程池中的一个线程来执行,如果执行过程中包括访问数据库,或者包括读取文件,则在调用数据库时或读取文件时,请求线程是阻塞的,即使是阻塞线程也是占用资源的,典型的每个线程要使用1MB的内存。如果有并发请求,则会同时有多个线程处于阻塞状态,每个线程占据一份资源。

同时,Tomcat的线程池大小决定了可以同时处理多少个请求。如果应用基于微服务架构,我们可以横向扩展,但是也有内存高占用的问题。因此,当并发数很大的时候,Thread per Request模型很消耗资源。

2)压垮客户端

服务A请求服务的数据,如果数据量很大,超过了服务能处理的程度,则导致服务OOM


2、优势

使用响应式编程的优势:

  1. 不用Thread per request模型,使用少量线程即可处理大量的请求
  2. 在执行I/O操作时不让线程等待
  3. 简化并行调用
  4. 支持背压,让客户端告诉服务端它可以处理多少负载。

3、发展

Reactive Streams:

Reactive Streams是个规范,它规范了“有非阻塞背压机制的异步的流处理”。真正正确理解异步、非阻塞并不容易。实际上Reactive Streams规范或者说它的第三方代码实现包含的内容更加丰富:除了non-blocking,还有:Composable、Deferred、Flow Controll、Resilient、Interruptible。

其中Composable就是函数式编程思想的体现。 可体会下Java8里的Stream API各种算子的参数,所以Lamda表达式是进行Reactive Streams实现的基本前提,否则很难想象臃肿的面向对象的Composable。有了JDK8的铺垫,Reactive Streams接口被JDK9定义在Flow里才是可能的


ReactiveX Java(Rx Java):

2011年,微软发布了NET的响应式扩展(Reactive Extensions,即ReactiveX或Rx),以方便异步、事件驱动的程序。ReactiveX混合了迭代模式和观察者模式。不同之处在于一个是推模式,一个是基于迭代器的拉模式。除了对变化事件的观察,完成事件和异常事件也会发布给订阅者。

ReactiveX的基本思想是事件是数据,数据是事件。响应式扩展被移植到几种语言和平台上,当然包括JavaScript、Python、C++、Swift和Java。ReactiveX很快成为一种跨语言的标准,将反应式编程引入到行业中。

RxJava 1.0于2014年11月发布。RxJava是其他Reactivex JVM端口的主干,比如Rx Scala、Rx Kotin、RxGroovy。它已经成为Android开发的核心技术,并且已经进入Java后端开发。许多RxJavaAdapter库,例如RxAndroid、RxJava JDBC、Rx Netty和RxJavaF X调整了几个Java框架,使之成为响应式的,并且可以开箱即用地使用RxJava


Akka:

Akka是一个受欢迎的框架,具有大量功能和大型社区。然而,Akka最初是作为Scala生态系统的一部分构建的,在很长一段时间内,它仅在基于Scala编写的解决方案中展示了它的强大功能。尽管Scala是一种基于JVM的语言,但它与Java明显不同。几年前,Akka直接开始支持Java,但出于某些原因,它在Java世界中不像在Scala世界中那么受欢迎。

Vert.x:

Vert.x框架也是构建高效响应式系统的强大解决方案。Vert.x的设计初衷是作为Node.js在Java虚拟机上的替代方法,它支持非阻塞和事件驱动。然而,Vert.x仅在几年前才开始具备竞争力。


Project Reactor:

既然 Spring 都提供了对 Reactive Streams 的实现,感觉其实上面列出的几个库已经没有太多的意义。各家对Reactive Streams规范的实现在细节上都有很大不同,因为Spring 的生态太强大了,如果没有特殊的需求,比如JDK小于8,那么我们的项目使用Project Reactor应该是较好的选择。

Project Reactor 到目前为止经历了 1.0、2.0、 3.0。其中1.0这个阶段还没有Reactive Stream的规范。在2.0开始Follow 规范并基本定型。3.0 感觉是个重构版,形成 Reactive-Streams-commons库。

有了Project Reactor这样的基础库,整个Spring组件基本都有了Reactive Style的版本,在这个基础上用Netty(或 Servet 3.1 Containe)+ Reactive Streams 适配层 + Spring Security Reactive + WebFlux + Spring Data Reactive Repository,就可以构建出重头到尾的 Reactive 应用。

从Spring Cloud的组件角度讲,也衍生出Reactive Discovery Client、Reactive Load Balancer、Blockhound, Reactor Debug、Improved Reactor Micrometer Support、Reactor Netty Metric…




二、Reactive Streams

是一种规范
访问地址:https://www.reactive-streams.org/https://github.com/reactive-streams/reactive-streams-jvm/tree/v1.0.4

Reactive Streams是一种基于异步流处理的标准化规范,旨在使流处理更加可靠、高效和响应式。反应式流的特点简单来说就是:基本特性(变化传递 + 数据流 + 声明式) + 高级特性(非阻塞回压 + 异步边界)


Reactive Streams的核心思想是让发布者(Publisher)和订阅者(Subscriber)之间进行异步流处理,以实现非阻塞的响应式应用程序。发布者可以产生任意数量的元素并将其发送到订阅者,而订阅者则可以以异步方式处理这些元素。Reactive Streams还定义了一些接口和协议,以确保流处理的正确性和可靠性。


1、依赖

<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams</artifactId><version>1.0.4</version>
</dependency>
<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams-tck</artifactId><version>1.0.4</version><scope>test</scope>
</dependency>

2、API

Publisher:定义了生产元素并将其发送给订阅者的方法。

public interface Publisher<T> {public void subscribe(Subscriber<? super T> s);
}

Subscriber:定义了接收元素并进行处理的方法。

public interface Subscriber<T> {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete();
}

Subscription:定义了订阅者和发布者之间的协议,包括请求元素和取消订阅等。

public interface Subscription {public void request(long n);public void cancel();
}

Processor:定义了同时实现Publisher和Subscriber接口的中间件,它可以对元素进行转换或者过滤等操作。

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}




三、Project Reactor

1、概述

Project Reactor由Reactor文档中列出的一组模块组成。主要组件是Reactor Core,其中包含响应式类型Flux和Mono,它们实现了Reactive Streams的Publisher接口以及一组可应用于这些类型的运算符。

其他模块:

  1. Reactor Test:提供一些实用程序来测试响应流
  2. Reactor Extra:提供一些额外的Flux运算符
  3. Reactor Netty:无阻塞且支持背压的TCP、HTTP和UDP的客户端和服务器
  4. Reactor Adapter:用于与其他响应式库,如RxJava和Akka Streams的适配
  5. Reactor Kafka:用于Kafka的响应式API,作为Kafka的生产者和消费者

2、并发模型

有两种在响应式链中切换执行某些的方式:publishOn和subscribeOn

区别如下:

  • publishOn(Scheduler Scheduler):影响所有后续运算符的执行(只要未指定其他任何内容)
  • subscribeOn(Scheduler Scheduler):根据链中最早的subscribeOn调用,更改整个操作符链所订阅的线程。它不影影响随后对publishOn的调用的行为

Schedulers类包含用于提供执行上下文的静态方法:

  • parallel():为并行工作而调整的固定工作池,可创建与cpu内核数量一样多的工作线程池。
  • single:单个可重用线程。此方法为所有调用方重用同一线程,直到调度程序被释放为止。如果您希望使用按调用专用线程,则可以为每个调用使用schedulers.newSingle()
  • boundedElastic:动态创建一定数量的工作者,它限制了它可以创建的支持线程的数量,并且可以在线程可用时重新调度要排队的任务。这是包装同步阻塞调用的不错选择
  • immediate():立即在执行线程上运行,而不切换执行上下文
  • fromExecutorService(Executorservice):可用于从任何现有ExecutorService中创建调度程序

3、入门

1)依赖

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.3.5.RELEASE</version>
</dependency>
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><version>3.3.5.RELEASE</version>
</dependency>

2)Flux和Mono

Flux和Mono提供了许多工厂方法,可以根据已有的数据创建响应流。

Flux:

public static void main(String[] args) {Flux.just("d1", "d2").subscribe(System.out::print);Flux.fromArray(new String[]{"d1", "d2"}).subscribe(System.out::println);Flux.fromIterable(Arrays.asList("d1", "d2")).subscribe(System.out::println);Flux.range(1, 10).subscribe(System.out::println);Flux.defer(() -> Flux.range(1, 3)).subscribe(System.out::println);
}

Mono:

public static void main(String[] args) {Mono.just(1).subscribe(System.out::println);Mono.justOrEmpty(Optional.empty()).subscribe(System.out::println);Mono.defer(() -> Mono.just("hello")).subscribe(System.out::println);Mono.fromCallable(() -> "fromCallable").subscribe(System.out::println);Mono.fromSupplier(() -> "fromSupplier").subscribe(System.out::println);Mono.fromFuture(() -> CompletableFuture.completedFuture("fromFuture")).subscribe(System.out::println);Mono.fromCompletionStage(() -> CompletableFuture.completedFuture("fromCompletionStage")).subscribe(System.out::println);Mono.fromRunnable(() -> {System.out.println("fromRunnable");}).subscribe(System.out::println);
}

3)空流&错误流

  1. empty():工厂方法,它们分别生成Flux或Mono的空实例
  2. never():方法会创建一个永远不会发出完成、数据或错误等信号的流
  3. error(Throwable):工厂方法创建一个序列,该序列在订阅时始终通过每个订阅者的onError方法传播错误,由于错误是在Flux或Mono声明期间被创建的,因此,每个订阅者都会收到相同的Throwable实例
public static void main(String[] args) {Mono.never().subscribe(System.out::println);Flux.never().subscribe(System.out::println);Mono.empty().subscribe(System.out::println);Flux.empty().subscribe(System.out::println);// subscribe的第二个参数代表错误的捕获Mono.error(new RuntimeException("Mono error")).subscribe(System.out::println, System.out::println);Flux.error(new RuntimeException("Flux error")).subscribe(System.out::println, System.out::println);
}

4、订阅响应式流

Flux和Mono提供了对subscribe()方法的基于Lambda的重载,简化了订阅的开发。subscribe方法的所有重载都返回Disposable接口的实例,可以用于取消基础的订阅过程。

1)常见订阅

重载相关方法

public static void main(String[] args) {Flux.range(100, 10).subscribe(e -> {System.out.println("接收到:" + e);}, ex -> {System.out.println("发生异常:" + ex);}, () -> {System.out.println("执行完成");});
}

2)自定义订阅

自定义Subscriber类:

public static void main(String[] args) {Flux.range(100, 10).subscribe(new Subscriber<Integer>() {Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {subscription = s;subscription.request(1);}@Overridepublic void onNext(Integer integer) {System.out.println("接收到:" + integer);subscription.request(1);}@Overridepublic void onError(Throwable t) {System.out.println("发生异常:" + t);}@Overridepublic void onComplete() {System.out.println("执行完成");}});
}

该定义订阅的方法存在一定问题。它打破了线性代码流,也容易出错。最困难的部分是需要自己管理背压并正确实现订阅者的所有TCK要求。在该示例中,就打破了有关订阅验证和取消这几个TCK要求。


建议扩展Project Reactor提供的BaseSubscriber类。在这种情况下订阅示例:

public static void main(String[] args) {Flux.range(100, 10).subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("发生异常:" + throwable);}@Overrideprotected void hookOnComplete() {System.out.println("执行完成");}@Overrideprotected void hookOnSubscribe(Subscription subscription) {request(1);}@Overrideprotected void hookOnNext(Integer value) {System.out.println("接收到:" + value);request(1);}});
}

使用BaseSubscriber类,实现符合TCK的订阅者更为容易。订阅者在本身拥有生命周期管理的宝贵资源时,会需要这种方法,例如,订阅者可能包装文件处理程序或连接到第三方服务的WebSocket链接。


5、API

SDK内部还有具体的弹珠图示意
如:Flux.collect(Supplier containerSupplier, BiConsumer<E, ? super T> collector)

1)index

public static void main(String[] args) {Flux.range(100, 10).index().subscribe(System.out::println);
}

结果:

2)timestamp

public static void main(String[] args) {Flux.range(100, 10).timestamp().subscribe(System.out::println);
}

结果:

3)any

public static void main(String[] args) {Flux.just("456", "789", "123").all(e -> e.equalsIgnoreCase("123")).subscribe(System.out::println);Flux.just("456", "789", "123").any(e -> e.equalsIgnoreCase("123")).subscribe(System.out::println);Flux.just("456", "789", "123").hasElement("123").subscribe(System.out::println);Flux.just("456", "789", "123").hasElements().subscribe(System.out::println);Flux.just("456").hasElements().subscribe(System.out::println);Flux.empty().hasElements().subscribe(System.out::println);try {System.in.read();} catch (IOException e) {throw new RuntimeException(e);}
}

结果:

4)map

public static void main(String[] args) {Flux.range(100, 10).map(w -> "map:" + w).subscribe(System.out::println);
}

结果:

5)filter

  1. take(n)操作符限制所获取的元素,该方法忽略除前n个元素之外的所有
  2. takeLast仅返回流的最后一个元素。
  3. takeUntil(Predicate)传递一个元素直到满足某个条件。
  4. elementAt(n)只可用于获取序列的第n个元素。
  5. single操作符从数据源发出单个数据项,也为空数据源发出NoSuchElementException错误信号,或者为具有多个元素的数据源发出IndexOutOfBoundsException信号。它不仅可以基于一定数量来获取或跳过元素,还可以通过带有Duration的skip(Duration)或take(Duration)操作符。
  6. takeUntilOther(Publisher)或skipUntilOther(Publisher)操作符,可以跳过或获取一个元素,直到某些消息从另一个流到达。
public static void main(String[] args) {// filterFlux.range(100, 10).filter(e -> e.equals(105)).subscribe(e -> {System.out.printf(e + "\t");});System.out.println();// takeFlux.range(100, 10).take(5).subscribe(e -> {System.out.printf(e + "\t");});System.out.println();// takeLastFlux.range(100, 10).takeLast(3).subscribe(e -> {System.out.printf(e + "\t");});System.out.println();// takeUntilFlux.range(100, 10).takeUntil(e -> e.equals(108)).subscribe(e -> {System.out.printf(e + "\t");});System.out.println();// elementAtFlux.range(100, 10).elementAt(2).subscribe(e -> {System.out.printf(e + "\t");});System.out.println();// singleFlux.range(100, 1).single().subscribe(e -> {System.out.printf(e + "\t");}, ex -> {System.out.printf("ex:" + ex + "\t");});System.out.println();Flux.range(100, 10).single().subscribe(e -> {System.out.printf(e + "\t");}, ex -> {System.out.printf("ex:" + ex + "\t");});System.out.println();Flux.empty().single().subscribe(e -> {System.out.printf(e + "\t");}, ex -> {System.out.printf("ex:" + ex + "\t");});System.out.println();// skipUntilOther&takeUntilOtherMono<String> start = Mono.just("start").delayElement(Duration.ofSeconds(1));Mono<String> stop = Mono.just("stop").delayElement(Duration.ofSeconds(3));Flux.range(100, 10).delayElements(Duration.ofMillis(400)).map(item -> "map:" + item).skipUntilOther(start).takeUntilOther(stop).subscribe(e -> {System.out.printf(e + "\t");});try {// 阻塞线程,避免延时任务提前结束System.in.read();} catch (IOException e) {throw new RuntimeException(e);}
}

结果:

6)collect

收集列表中的所有元素,并使用Flux.collectList()和Flux.collectSortedList()将结果集合处理为Mono流是可能的。Flux.collectSortedList()不仅会收集元素,还会对它们进行排序。

请注意,收集集合中的序列元素可能耗费资源,当序列具有许多元素时这种现象尤为突出。此外,尝试在无限流上收集数据可能消耗掉所有可用的内存。

public static void main(String[] args) {Flux.just("123", "456", "789", "123").subscribe(System.out::println);Flux.just("456", "789", "123").collectList().subscribe(System.out::println);Flux.just("456", "789", "123").collectSortedList().subscribe(System.out::println);Flux.just("456", "789", "123").collectMap(e -> "key-" + e, e -> "value-" + e).subscribe(System.out::println);Flux.just("456", "789", "123").collectMap(e -> Integer.parseInt(e) < 500 ? "small" : "large").subscribe(System.out::println);Flux.just("456", "789", "123").collectMultimap(e -> Integer.parseInt(e) < 500 ? "small" : "large").subscribe(System.out::println);try {System.in.read();} catch (IOException e) {throw new RuntimeException(e);}
}

结果:

7)distinct

public static void main(String[] args) {AtomicInteger times = new AtomicInteger(0);Flux.just("456", "789", "123").repeat(2).subscribe(e -> {if (e.equalsIgnoreCase("456")) {times.addAndGet(1);System.out.print("\n" + "重复打印:" + times.get());}System.out.print("\t" + e);});times.set(0);Flux.just("456", "789", "123").repeat(2).distinct().subscribe(e -> {if (e.equalsIgnoreCase("456")) {times.addAndGet(1);System.out.print("\n" + "重复打印:" + times.get());}System.out.print("\t" + e);});System.out.print("\n" + "删除重复行:");Flux.just("5456", "5789", "5123", "5456", "5789", "5123", "5456").distinct().subscribe(e -> {System.out.printf(e + "\t");});System.out.print("\n" + "删除相邻行:");Flux.just("5456", "5456", "5123", "5456", "5789", "5123", "5456").distinctUntilChanged().subscribe(e -> {System.out.printf(e + "\t");});System.out.println();Flux.empty().defaultIfEmpty("Flux#empty").subscribe(System.out::println);try {System.in.read();} catch (IOException e) {throw new RuntimeException(e);}
}

结果:

8)flatMap

  1. flatMap:将传入的元素转化为响应流后,再合并为一个新的响应流。会立即订阅新的流,不一定保证原始顺序,且允许来自不同子流的元素进行交错
  2. concatMap:整体与flatMap类似,但不会立即订阅新的流,会在生成下一个子流并订阅它之前等待每个内部完成,天生保留与源元素相同的顺序,不允许来自不同子流的元素交错
  3. flatMapSequential:整体与flatMap类似,但是会通过对所接收的元素进行排序来进行保留顺序,同样不允许来自不同子流的元素交错
@SneakyThrows
public static void main(String[] args) {Random random = new Random();Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)).flatMap(e -> Flux.fromIterable(e).delayElements(Duration.ofMillis(random.nextInt(200)))).subscribe(System.out::println, null, () -> {System.out.println("complete");});Thread.sleep(2000);long l = System.currentTimeMillis();Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)).concatMap(e -> Flux.fromIterable(e).delayElements(Duration.ofMillis(500))).subscribe(System.out::println, null, () -> {System.out.println(System.currentTimeMillis() - l);});Thread.sleep(4000);long l2 = System.currentTimeMillis();Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)).flatMapSequential(e -> Flux.fromIterable(e).delayElements(Duration.ofMillis(500))).subscribe(System.out::println, null, () -> {System.out.println(System.currentTimeMillis() - l2);});System.in.read();
}

结果:

9)scan

public static void main(String[] args) {Flux.just(1, 2, 3, 4, 5).scan(0, (a, b) -> a + b).subscribe(System.out::println);
}

结果:

10)thenMany

public static void main(String[] args) {Flux.just(1, 2, 3, 4, 5).thenMany(Flux.just(6, 7, 8, 9, 10)).subscribe(System.out::println);
}

结果:

11)组合响应流

  1. concat操作符通过向下游转发接收的元素来连接所有数据源。当操作符连接两个流时,它首先消费并重新发送第一个流的所有元素,然后第二个流执行相同的操作
  2. merge操作符将来自上游序列的数据合并到一个下游序列中。与concat操作符不同,上游数据源是立即(同时)被订阅的
  3. zip操作符号订阅所有上游,等待所有数据源发出一个元素,然后将接收到的元素组合到一个输出元素中
  4. combineLatest操作符与zip操作符的工作方式类似。但是,只要至少一个上游数据源发出一个值,它就会生成一个新值
@SneakyThrows
public static void main(String[] args) {Flux.concat(Flux.range(1, 3),Flux.just(5, 6, 7)).subscribe(e -> {System.out.println("concat:" + e);});Flux.merge(Flux.range(1, 3).delayElements(Duration.ofMillis(500)),Flux.just(5, 6, 7).delayElements(Duration.ofMillis(1000))).subscribe(e -> {System.out.println("merge:" + e);});Flux.zip(Flux.range(1, 4).delayElements(Duration.ofMillis(500)),Flux.just(5, 6, 7).delayElements(Duration.ofMillis(1000))).subscribe(e -> {System.out.println("zip:" + e);});Flux.combineLatest(Flux.range(1, 4).delayElements(Duration.ofMillis(500)),Flux.just(5, 6, 7).delayElements(Duration.ofMillis(1000)),(a, b) -> "combineLatest==>" + a + b).subscribe(System.out::println);System.in.read();
}

结果:

12)流元素批处理

  1. 将元素缓冲(buffering)到容器(如list)中,结果流的类型为Flux<List>
  2. 通过开窗(windowing)方式将元素加入诸如Flux<Flux>等流中。请注意,现在的流信号不是值,而是可以处理的子流。
  3. 通过某些键将元素分组(grouping)到具有Flux<GroupedFlux<K,T>>类型的流中。每个新键都会触发一个新的GroupedFlux实例,并且具有该键的所有元素都将被推送到GroupedFlux类的该实例中。

可以基于以下场景进行缓冲和开窗操作:

  1. 处理元素的数量,比方说每10个元素;
  2. 一段时间,比方说每5分钟一次;
  3. 基于一些谓语,比方说在每个新的偶数之前切割;
  4. 基于来自其他Flux的一个事件,该事件控制着执行过程。
@SneakyThrows
public static void main(String[] args) {Flux.range(1, 7).buffer(2).doOnNext(e -> {System.out.println("buffer start");}).subscribe(System.out::println);Flux.range(1, 7).window(2).doOnNext(e -> {System.out.println("window start");}).subscribe(integerFlux -> {// 由于是子流,所以需要再次订阅integerFlux.subscribe(System.out::println);});Flux.range(1, 7).groupBy(e -> e < 4 ? "small" : "large").subscribe(e -> {ArrayList<Integer> integers = new ArrayList<>();e.scan(integers, (list, value) -> {list.add(value);return list;}).doOnComplete(() -> {System.out.println(e.key() + " ==>: " + integers);})// 由于是子流,所以需要再次订阅.subscribe();});System.in.read();
}

结果:

13)sample(采样)

可以让产生的流能够周期性的发出与时间窗口内最近看到的值相对应的数据项

@SneakyThrows
public static void main(String[] args) {Flux.range(1, 5).delayElements(Duration.ofMillis(400)).sample(Duration.ofMillis(1000)).subscribe(System.out::println);System.in.read();
}

结果:

14)blockLast(响应流转化为阻塞流)

  1. toIterable方法将响应式Flux转换为阻塞Iterable
  2. toStream方法将响应式Flux转换为阻塞Stream API。从Reactor3.2开始,在底层使用toIterable方法
  3. blockFirst方法阻塞了当前线程,直到上游发出第一个值或完成流为止。
  4. blockLast方法阻塞当前线程,直到上游发出最后一个值或完成流为止。在onError的情况下,它会在被阻塞的线程中抛出异常。

补充:

  1. blockFirst操作符和blockLast操作符具有方法重载,可用于设置线程阻塞的持续时间。这可以防止线程被无限阻塞。
  2. toIterable和toStream方法能够使用Queue来存储事件,这些事件可能比客户端代码阻塞Iterable或Stream更快到达。微批处理。
public static void main(String[] args) {Flux.range(1, 5).delayElements(Duration.ofMillis(200)).toIterable().forEach(System.out::println);List<String> collect = Flux.range(1, 5).delayElements(Duration.ofMillis(200)).toStream().map(e -> "stream:" + e).collect(Collectors.toList());System.out.println(JSON.toJSONString(collect));Flux.range(1, 5).delayElements(Duration.ofMillis(200)).doOnEach(System.out::println).blockFirst();Flux.range(10, 5).delayElements(Duration.ofMillis(200)).doOnEach(System.out::println).blockLast();
}

结果:

15)materialize(物化和非物化)

使用materialize方法将流中的元素封装为Signa对象进行处理,使用dematerialize方法对Signa对象进行解封处理

@SneakyThrows
public static void main(String[] args) {Flux.error(new IOException("error")).subscribe(e -> {System.out.println("consumer==>" + e);}, ex -> {System.out.println("error==>" + ex);});Flux.error(new IOException("error2")).materialize().subscribe(e -> {System.out.println("consumer==>" + e);}, ex -> {System.out.println("error==>" + ex);});Flux.error(new IOException("error2")).materialize().dematerialize().subscribe(e -> {System.out.println("consumer==>" + e);}, ex -> {System.out.println("error==>" + ex);});System.in.read();
}

结果:

16)onErrorResume(错误处理)

onError信号是响应式流规范的一个组成部分,一种将异常传播给可以处理它的用户,但是,如果最终订阅者没有为onError信号定义处理程序,将会直接抛出异常。捕获异常主要有以下几种方式:

  1. subscribe操作符中的onError信号定义处理程序
  2. onErrorReturn操作符号捕获一个错误,并且用一个静态值或者根据异常类型选择静态值进行替换
  3. onErrorResume操作符捕获异常并执行额外的逻辑
  4. onErrorMap操作符捕获异常后,可以转化为一个新的异常
  5. retry可以在该响应流发生错误时,重新订阅该流,即再次执行相关的逻辑
@SneakyThrows
public static void main(String[] args) {Flux.just(1, 2, 3, 5).flatMap(TestController::hand).subscribe(System.out::println, ex -> {System.out.println("subscribe catch error:" + ex);});Thread.sleep(200);System.out.println();Flux.just(1, 2, 3, 5).flatMap(TestController::hand).onErrorReturn(0).subscribe(System.out::println);Thread.sleep(200);System.out.println();Flux.just(1, 2, 3, 5).flatMap(TestController::hand).onErrorResume(e -> {System.out.println("onErrorResume:" + e);return Flux.just(0);}).subscribe(System.out::println);Thread.sleep(200);System.out.println();Flux.just(1, 2, 3, 5).flatMap(TestController::hand).onErrorMap(e -> {System.out.println("onErrorMap:" + e);return new RuntimeException("add onErrorMap" + e);}).onErrorResume(ex -> {System.out.println("onErrorResume:" + ex);return Flux.just(0);}).subscribe(System.out::println);Thread.sleep(200);System.out.println();Flux.just(1, 2, 3, 5).flatMap(TestController::hand).retry(3).subscribe(System.out::println, ex -> {System.out.println("retry catch error:" + ex);});System.in.read();
}private static Flux<Integer> hand(Integer i) {return Flux.defer(() -> {if (i < 2) {return Flux.just(i);} else {return Flux.error(new RuntimeException("error"));}});
}

结果:

17)defer(冷热数据流)

区别:

  1. 冷发布者行为方式:无论订阅者何时出现,都为该订阅者生成所有序列数据,没有订阅者就不会生成数据。每当订阅者出现时都会有一个新序列生成,而这些语义可以代表http请求。
  2. 热发布者行为方式:数据生成不依赖于订阅者而存在。因此,热发布者可能在第一个订阅者出现之前开始生成元素。

因此,当我们使用just工厂方法生成一个热发布者时,它的值只在构建发布者时计算一次,并且在新订阅者到达时不会重新计算。我们可以通过将just包装在defer中来将其转换为冷发行者。这样,即使just在初始化时生成值,这种初始化也只会在新订阅出现时发生。可以类比为一个lambda表达式,实例化时没有任何动作,只有当具体方法被调用时才会执行相关动作

@SneakyThrows
public static void main(String[] args) {Flux<Integer> defer = Flux.defer(() -> {System.out.println("defer hand");return Flux.range(1, 5);});System.out.println("start");defer.subscribe(System.out::println);defer.subscribe(System.out::println);System.in.read();
}

结果:

18)transform(组合&转换响应流)

当我们构建复杂的响应式工作流时,通常需要在几个不同的地方使用相同的操作符序列。transform操作符,可以将这些常见的部分提取到单独的对象中,并在需要时重用它们。transform操作符仅在流生命周期的组装阶段更新一次流行为,可以在响应式应用程序中实现代码重用。

区别于map,一个接收到的是T,一个是Flux

public static void main(String[] args) {Function<Flux<Integer>, Flux<String>> function = f -> {System.out.println("function");return f.index().map(t -> t.getT1() + "=" + t.getT2());};Flux.range(1, 5).transform(function).subscribe(System.out::println);System.in.read();
}

结果:

6、编程方式创建流

在实际的需求开发中,我们需要以一种更复杂的方法来在流中生成数据,或将对象的生命周期绑定到响应式流的生命周期中

1)push&create

push工厂方法能通过适配一个单线程生产者来编程创建Flux实例。create整体与push方法相同,都是起到桥接的作用,但是create能够支持不同线程发送的事件

@SneakyThrows
public static void main(String[] args) {Flux.push(sink -> {IntStream.range(10, 15).forEach(e -> {if (e != 13) {sink.next(e);} else {sink.onCancel(() -> System.out.println("cancel"));}});}).subscribe(e -> {System.out.print(e + "\t");});Thread.sleep(1000);ExecutorService executorService = Executors.newFixedThreadPool(3);System.out.print("\n" + "push   == > ");Flux.push(sink -> {Runnable task = () -> {for (int i = 10; i < 15; i++) {sink.next(i);try {TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {sink.error(e);}}};for (int i = 0; i < 3; i++) {executorService.submit(task);}}).subscribe(e -> {System.out.print(e + "\t");});Thread.sleep(2000);System.out.print("\n" + "create == > ");Flux.create(sink -> {Runnable task = () -> {for (int i = 10; i < 15; i++) {sink.next(i);try {TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {sink.error(e);}}};for (int i = 0; i < 3; i++) {executorService.submit(task);}}).subscribe(e -> {System.out.print(e + "\t");});System.in.read();
}

结果:

2)generate

generate工厂方法旨在基于生成器的内部处理状态创建复杂序列。

它可以使用一个初始值和一个函数,该函数可借助初始值的内部状态计算下一个状态,并将onNext信号发送给下游订订阅者

@SneakyThrows
public static void main(String[] args) {AtomicInteger num = new AtomicInteger(1);Flux.generate(sink -> {sink.next("Hello");if (num.get() == 3) {sink.complete();} else {num.addAndGet(1);}}).subscribe(System.out::println);// 创建一个斐波那契数列Flux.generate(() -> Tuples.of(1, 2),(state, sink) -> {sink.next(state.getT1() + state.getT2());if (state.getT1() + state.getT2() > 50) {sink.complete();}return Tuples.of(state.getT2(), state.getT1() + state.getT2());}).subscribe(System.out::println);System.in.read();
}

结果:

3)using-disposable

@SneakyThrows
public static void main(String[] args) {Flux.using(() -> {System.out.println("create");return Executors.newFixedThreadPool(3);},es -> Flux.range(1, 3),(executorService -> {System.out.println("shutdown");executorService.shutdown();})).subscribe(System.out::println);System.in.read();
}

结果:

4)usingWhen

基于usingWhen工厂包装响应式事务与using操作符类似,usingwhen操作符使我们能以响应式方式管理资源。区别在于using操作符会同步获取。usingWhen操作符响应式地获取受托管资源(通过订阅Pubisher的实例)。此外,usingWhen操作符接受不同的处理程序,以便应对主处理流终止的成功和失败。这些处理程序由发布者实现。

可以仅使用usingWhen一个操作符实现完全无阻塞的响应式事务。

@SneakyThrows
public static void main(String[] args) {Flux.usingWhen(Mono.fromSupplier(() -> {System.out.println("create");return Executors.newFixedThreadPool(1);}),es -> Flux.range(1, 3),(executorService -> {System.out.println("shutdown");executorService.shutdown();return Flux.empty();})).subscribe(System.out::println);Flux.usingWhen(Mono.fromSupplier(() -> {System.out.println("create");return Executors.newFixedThreadPool(1);}),resource -> {return Flux.concat(Flux.just(1, 2, 3),Flux.error(new RuntimeException("Error")));},resource -> {System.out.println("Completed successfully");return Mono.fromRunnable(resource::shutdown);},resource -> {System.out.println("Error occurred");return Mono.fromRunnable(resource::shutdown);},resource -> {System.out.println("Cancelled");return Mono.fromRunnable(resource::shutdown);}).subscribe(System.out::println, System.out::println);System.in.read();
}

结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/4239.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据分享】1929-2024年全球站点的逐日平均气温数据(Shp\Excel\免费获取)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、湿度等指标&#xff0c;其中又以气温指标最为常用&#xff01;说到气温数据&#xff0c;最详细的气温数据是具体到气象监测站点的气温数据&#xff01;本次我们为大家带来的就是具体到气象监…

简单介绍JSONStream的使用

地址 作用 这个模块是根据需要筛选出json数据中自己所需要的数据 使用 var JSONStream require("JSONStream"); var parse require("fast-json-parse"); var fs require("fs");fs.createReadStream("./time.json").pipe(JSONSt…

信息奥赛一本通 1168:大整数加法

这道题是一道大整数加法&#xff0c;涉及到高精度的算法&#xff0c;比如说有两个数要进行相加&#xff0c;1111111111111111111111111111111111111112222222222222222222222222222222&#xff0c;那么如果这两个数很大的话我们常用的数据类型是不能进行计算的&#xff0c;那么…

架构思考与实践:从通用到场景的转变

在当今复杂多变的商业环境中&#xff0c;企业架构的设计与优化成为了一个关键议题。本文通过一系列随笔&#xff0c;探讨了业务架构的价值、从通用架构到场景架构的转变、恰如其分的架构设计以及如何避免盲目低效等问题。通过对多个实际案例的分析&#xff0c;笔者揭示了架构设…

[JavaScript] 运算符详解

文章目录 算术运算符&#xff08;Arithmetic Operators&#xff09;注意事项&#xff1a; 比较运算符&#xff08;Comparison Operators&#xff09;注意事项&#xff1a; 逻辑运算符&#xff08;Logical Operators&#xff09;短路运算&#xff1a;逻辑运算符的返回值&#xf…

Java测试开发平台搭建(九)前端

1. 搭建前端vue环境 Vue3 安装 | 菜鸟教程 2. 创建项目 1.进入ui vue ui 2. create项目 3. 成功之后添加插件&#xff1a; cli-plugin-router vue-cli-plugin-vuetify 4. 添加依赖 axios 5. 点击任务开始运行 如果报错&#xff1a; 修改vue.config.jsconst { defineConfig }…

【Linux系统编程】—— 深度解析进程等待与终止:系统高效运行的关键

文章目录 进程创建再次认识fork()函数fork()函数返回值 写时拷贝fork常规⽤法以及调用失败的原因 进程终⽌进程终止对应的三种情况进程常⻅退出⽅法_exit函数exit函数return退出 进程等待进程等待的必要性进程等待的⽅法 进程创建 再次认识fork()函数 fork函数初识&#xff1…

最新版Edge浏览器加载ActiveX控件技术——allWebPlugin中间件之awp_CreateActiveXObject接口用法

背景 ActiveXObject‌是JavaScript中的一个特殊对象&#xff0c;用于在Internet Explorer&#xff08;IE&#xff09;浏览器中创建和操作COM&#xff08;Component Object Model&#xff09;对象。COM是一种面向对象的软件组件技术&#xff0c;允许不同应用程序之间的互操作性。…

使用 Java 和 FreeMarker 实现自动生成供货清单,动态生成 Word 文档,简化文档处理流程。

在上一篇博客中主要是使用SpringBootApache POI实现了BOM物料清单Excel表格导出&#xff0c;详见以下博客&#xff1a; Spring Boot Apache POI 实现 Exc&#xff08;&#xff09;el 导出&#xff1a;BOM物料清单生成器&#xff08;支持中文文件名、样式美化、数据合并&#…

JS基础(5):运算符和语句

一.运算符 1.赋值运算符 加减乘除都是一样的&#xff0c;&#xff0c;-&#xff0c;*&#xff0c;/ 2.一元运算符&#xff1a;经常用来计数 自增&#xff1a; 每次只能加一 自减&#xff1a;-- 前置自增 后置自增 结…

以租赁合同的例子讲清楚 开源协议原理和区别

开源协议通俗易懂的方式介绍清楚原理和区别 开源协议其实就是软件的“使用规则”&#xff0c;决定了别人可以如何使用、修改、分享你的代码。通俗一点说&#xff0c;如果你写了一段代码&#xff0c;开源协议就是告诉别人在什么条件下他们可以使用你的代码&#xff0c;以及他们可…

Flowable 管理各业务流程:流程设计器 (获取流程模型 XML)、流程部署、启动流程、流程审批、流程挂起和激活、任务分配

文章目录 引言I 表结构主要表前缀及其用途核心表II 流程设计器(Flowable BPMN模型编辑器插件)Flowable-UIvue插件III 流程部署部署步骤例子:根据流程模型ID部署IV 启动流程启动步骤ACT_RE_PROCDEF:流程定义相关信息例子:根据流程 ID 启动流程V 流程审批审批步骤Flowable 审…

【C++课程学习】:C++中的IO流(istream,iostream,fstream,sstream)

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;C课程学习 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 C学习笔记&#xff1a; https://blog.csdn.net/djdjiejsn/category_12682189.html 前言&#xff1a; 在C语…

四、华为交换机 STP

生成树协议&#xff08;STP&#xff09;的核心目的是在存在冗余链路的网络中&#xff0c;构建一个无环的拓扑结构&#xff0c;从而防止网络环路带来的广播风暴等问题 一、STP 原理 选举根桥&#xff1a;网络中的每台交换机都会有一个唯一的桥 ID&#xff08;BID&#xff09;&am…

数字图像处理:实验二

任务一&#xff1a; 将不同像素&#xff08;32、64和256&#xff09;的原图像放大为像素大 小为1024*1024的图像&#xff08;图像自选&#xff09; 要求&#xff1a;1&#xff09;输出一幅图&#xff0c;该图包含六幅子图&#xff0c;第一排是原图&#xff0c;第 二排是对应放大…

综述:大语言模型在机器人导航中的最新进展!

简介 机器人导航是指机器人能够在环境中自主移动和定位的能力。本文系统地回顾了基于大语言模型&#xff08;LLMs&#xff09;的机器人导航研究&#xff0c;将其分为感知、规划、控制、交互和协调等方面。具体来说&#xff0c;机器人导航通常被视为一个几何映射和规划问题&…

VIVADO FIFO (同步和异步) IP 核详细使用配置步骤

VIVADO FIFO (同步和异步) IP 核详细使用配置步骤 目录 前言 一、同步FIFO的使用 1、配置 2、仿真 二、异步FIFO的使用 1、配置 2、仿真 前言 在系统设计中&#xff0c;利用FIFO&#xff08;first in first out&#xff09;进行数据处理是再普遍不过的应用了&#xff0c…

嵌入式知识点总结 C/C++ 专题提升(一)-关键字

针对于嵌入式软件杂乱的知识点总结起来&#xff0c;提供给读者学习复习对下述内容的强化。 目录 1.C语言宏中"#“和"##"的用法 1.1.(#)字符串化操作符 1.2.(##)符号连接操作符 2.关键字volatile有什么含意?并举出三个不同的例子? 2.1.并行设备的硬件寄存…

【前端动效】HTML + CSS 实现打字机效果

目录 1. 效果展示 2. 思路分析 2.1 难点 2.2 实现思路 3. 代码实现 3.1 html部分 3.2 css部分 3.3 完整代码 4. 总结 1. 效果展示 如图所示&#xff0c;这次带来的是一个有趣的“擦除”效果&#xff0c;也可以叫做打字机效果&#xff0c;其中一段文本从左到右逐渐从…

AI守护煤矿安全生产:基于视频智能的煤矿管理系统架构全解析

前言 本文我将介绍我和我的团队自主研发设计的一款AI产品的成果展示——“基于视频AI识别技术的煤矿安全生产管理系统”。 这款产品是目前我在创业阶段和几位矿业大学的博士共同从架构设计、开发到交付的全过程中首次在博客频道发布, 我之前一直想写但没有机会来整理这套系统的…