1、Project Reactor 高级
1.1、响应式流的生命周期
要理解多线程的工作原理以及 Reactor 中实现的各种内部优化,首先必须了解 Reactor 中响应式类型的生命周期。
1.1.1、组装时
流生命周期的第一部分是组装时(assembly-time)。
Reactor 提供了一个流式 API,用于构建复杂的元素处理流程。Reactor 的 API 看起来像是一个组合流程中所选择的操作符的建造器。建造器模式不仅是可变的,还假设像 build 这样的终端操作会执行另一个对象的构建。Reactor API 提供了不变性,每个被使用的操作符都会生成一个新对象。
在响应式库中,构建执行流程的过程被称为组装(assembling)。如果从头开发,不考虑Reactor API,流程组装的可能表现形式:
FLux<Integer> sourceFlux = new FluxArray(1, 20, 300, 4000);
Flux<String> mapFlux = new MapFlux(sourceFlux, String::valueOf);
Flux<String> filterFlux = new FluxFilter(mapFlux, s -> s.length() > 1);
在底层,Flux 对象是相互组合的。在组装过程之后,就获得了一个 Publishers 链,每个新的 Publisher 包装了前一个。以下伪代码演示了这一点:
FluxFilter(FluxMap(FluxArray(1, 2, 3, 40, 500, 6000))
)
在流生命周期中,该阶段起着重要作用,因为在流组装期间,可以通过检查流的类型来一个接一个地替换操作符。例如, concatWith -> concatWith -> concatWith 操作符序列可以被很容易地被压缩到一个串联结构中。以下代码展示了它在 Reactor 中的实现过程:
public final Flux < T > concatWith(Publisher < ? extends T > other) {if (this instanceof FluxConcatArray) {@SuppressWarnings({"unchecked"})FLuxConcatArray < T > fluxConcatArray = (FluxConcatArray < T > ) this;return fluxConcatArray.concatAdditionalSourceLast(other);}return concat(this, other);
}
如果当前 Flux 是 FluxConcatArray 实例,则不创建 FluxConcatArray(FluxConcatArray(FluxA, FluxB), FluxC) ,而是创建一个 FluxConcatArray(FluxA, FluxB, FluxC) ,并以这种方式改善整体流性能。
此外,在组装时,可以在组装过程中为流提供一些 Hooks,并启用一些额外的日志记录、跟踪、度量收集,以及其他在调试或流监控期间可能有用的重要补充。在该阶段,可以操作流的构造过程并应用不同的技术来优化、监控或更好地进行流调试,这是构建响应式流必不可少的部分。
1.1.2、订阅时
流执行生命周期的第二个重要阶段是订阅时(subscription-time)。
当调用指定的Publisher的subscribe方法时,就会发生订阅。如下代码所示:
filteredFlux.subscribe(...);
为了构建执行流程,对 Publishers 进行相互传递,因而产生了 Publishers 链。一旦调用了顶层包装器的subscribe方法,就开始了该链的订阅过程。以下伪代码展示了一个 Subscriber 在订阅时如何通过 Subscriber 链进行传播:
filterFlux.subscribe(Subscriber) {mapFlux.subscribe(new FilterSubscriber(Subscriber)) {arrayFlux.subscribe(new MapSubscriber(FilterSubscriber(Subscriber))) {// 在这里开始推送真正的元素}}
}
最后,将获取如下相互包装的订阅者序列:
ArraySubscriber(MapSubscriber(FilterSubscriber(Subscriber))
)
订阅时阶段的重要性在于:
- 在该阶段中,可以执行与组装时阶段相同的优化。
- 其次,在 Reactor 中启用多线程的一些操作符能够更改订阅所发生的工作单元。
1.1.3、运行时
流执行的最后一步是运行时(runtime)阶段。
在该阶段,在 Publisher 和 Subscriber之间进行实际信号交换。响应式流规范规定,Publisher 和 Subscriber 交换的前两个信号是 onSubscribe 信号和 request 信号。
onSubscribe 方法由位于顶端的数据源调用,在本例中即 ArrayPublisher。这会将它的 Subscription 传递给给定的 Subscriber。即 array -> map -> filter 描述通过Subscribers 传递 Subscription 过程的伪代码如下所示:
// 首先是Array调用MapSubscriber的onSubscribe方法,传参ArraySubscription对象
MapSubscriber(FilterSubscriber(Subscriber)).onSubscribe(new ArraySubscription()) {// Map调用FilterSubscriber的onSubscribe方法,传参MapSubscription对象FilterSubscriber(Subscriber).onSubscribe(MapSubscription(ArraySubscription(...))) {// Filter调用真正的Subscriber的onSubscribe方法,传参FilterSubscription对象Subscriber.onSubscribe(FilterSubscription(MapSubscription(ArraySubscription(...)))) {// 真正的Subscriber对onSubscribe的处理逻辑。。。}}
}
一旦 Subscription 完全通过 Subscriber 链,链中的每个 Subscriber 会将 Subscription 包装为特定表示。如下面的代码所示:
FilterSubscription(MapSubscription(ArraySubscription())
)
最终,最后一个 Subscriber 接收 Subscription 链。并且,为了开始接收元素,应该调用 Subscription #request 方法。该方法会启动元素的发送。如下代码所示:
// 真正的Subscriber调用FilterSubscription的request方法,参数为请求的元素个数
FilterSubscription(MapSubscription(ArraySubscription(...))).request(10) {// Filter调用MapSubscription的reqeust方法,传参为请求的元素个数MapSubscription(ArraySubscription(...)).request(10) {// Map调用ArraySubscription的reqeust方法,传参请求的元素个数ArraySubscription(...).request(10) {// Array开始发送数据...}}
}
一旦所有订阅者传递了请求内容,并且 ArraySubscription 也接收到了这些请求,ArrayFlux 就可以开始向 MapSubscriber(FilterSubscriber(Subscriber))链发送元素。下面的伪代码,描述了通过所有 Subscriber 发送元素的过程:
// Array接到请求之后
ArraySubscription.request(10) {// 调用MapSubscriber的onNext方法,挨个儿传递元素MapSubscriber(FilterSubscriber(Subscriber)).onNext(1) {// Map接收到元素后,调用FilterSubscriber的onNext方法,传递处理完的元素FilterSubscriber(Subscriber).onNext(1) {// 最终Subscriber对传递来的元素进行处理// Subscriber再次请求一个元素MapSubscription(ArraySubscription(...)).request(1) {...}}}// ...// 调用MapSubscriber的onNext方法,传递第10个元素MapSubscriber(FilterSubscriber(Subscriber)).onNext(10) {// Map调用FilterSubscriber的onNext方法,传递第10个元素FilterSubscriber(Subscriber).onNext(10) {// Filter回调Subscriber的onNext方法,传递第10个元素Subscriber.onNext(10) {// 最终Subscriber的onNext方法中执行元素的处理逻辑}}}
}
在运行时,数据源中的元素通过 Subscriber 链,并在每个阶段执行不同的功能。在运行时我们可以应用优化,减少信号交换量。如,可以减少 Subscription#request 调用的次数,从而提高流的性能。下图总结了流的生命周期以及每个阶段的执行情况。
1.2、线程调度模型(🍖)
1.2.1、publishOn 操作符
publishOn 操作符能将部分运行时操作的执行移动到指定的工作单元。为了指定应该在运行时处理元素的工作单元,Reactor 为此引入了一个特定的抽象,叫作 Scheduler。Scheduler 是一个接口,代表 Project Reactor 中的一个工作单元或工作单元池。如以下代码:
Scheduler scheduler = ...;
Flux.range(0, 100).map(String::valueOf).filter(s -> s.length() > 1).publishOn(scheduler).map(this::calculateHash).map(this::doBusinessLogic).subscribe();
publishOn 操作符之后的执行位于不同的 Scheduler 工作单元上。这意味着对散列的计算发生在 Thread A 上,因此 calculateHash 和 doBusinessLogic 在与 Thread Main 不同的工作单元上执行。如果从执行模型角度来看 publishOn 操作符,可以得到下图所示流程。
publishOn 操作符的重点是运行时执行。在底层,publishOn 操作符会保留一个队列,并为该队列提供新元素,以便专用工作单元消费消息并逐个处理它们。
该示例表明工作正在单独的 Thread 上运行,因此其执行被一个异步边界所分割。所以,现在有两部分独立处理的流程。
注意:响应式流中的所有元素都是逐个处理的(而不是同时处理的),因此可以始终为所有事件定义严格的顺序。此属性也被称为串行化(serializability)。即,元素一旦进入 publishOn,就将被放入队列,并且一旦轮到它,它就将被移出队列进行处理。注意,由于只有一个工作单元专门负责处理队列,因而元素的顺序始终是可预测的。
1.2.2、publishOn 实现并行化
Project Reactor 提供的响应式编程范例可以使用 publishOn 操作符对处理流进行细粒度伸缩和并行化等处理。考虑下图的过程:
如上图,有一个处理流程,其中包含 3 个元素。由于流中元素的同步处理特性,必须在所有转换阶段中逐个移动元素。但是,为了开始处理下一个元素,必须完全处理完前一个元素。如上图,有一个处理流程,其中包含 3 个元素。由于流中元素的同步处理特性,必须在所有转换阶段中逐个移动元素。但是,为了开始处理下一个元素,必须完全处理完前一个元素。
如上图,只要保持元素的处理时间相同,并在处理阶段之间提供异步边界(由publishOn 操作符表示),就可以实现并行处理。现在,处理流程的左侧不需要等待右侧处理完成。相反,它们可以独立工作,以正确地实现并行处理。
1.2.3、subscribeOn 操作符
Reactor 中多线程的另一个要点是名为 subscribeOn 的操作符。与 publishOn 相比,subscribeOn 使你能更改正在运行的订阅链的工作单元。
当从函数的执行过程中创建流的数据源时,此操作符很有用。通常,此类执行在订阅时进行,它会调用一个函数,该函数会提供执行.subscribe 方法的数据源。如下代码:
ObjectMapper objectMapper = ...
String json = "{ \"color\" : \"Black\", \"type\" : \"BMW\" }";
Mono.fromCallable(() -> objectMapper.readValue(json, Car.class))
这里, Mono.fromCallable 从 Callable<T> 创建 Mono ,并将其评估结果提供给每个 Subscriber 。 Callable 实例在调用 subscribe 方法时执行。因此 Mono.fromCallable 在底层执行以下操作:
public void subscribe(Subscriber actual) {// 准备Subscription对象Subscription subscription = ...try {// 调用call方法,获取数据元素T t = callable.call();if (t == null) {// 如果没有数据,直接调用onComplete方法,结束响应式流subscription.onComplete();} else {// 如果有数据,则调用订阅票据的onNext方法传递元素。subscription.onNext(t);// 由于是Mono,传递一个元素之后,调用onComplete方法,结束响应式流subscription.onComplete();}} catch (Throwable e) {actual.onError(Operators.onOperatorError(e, actual.currentContext()));}
}
Callable 的执行发生在 subscribe 方法中。这意味着可以使用publishOn 来更改执行 Callable 的工作单元。可以使用subscribeOn 指定进行订阅的工作单元。以下示例展示了具体方法:
Scheduler scheduler = ...;
Mono.fromCallable(...).subscribeOn(scheduler).subscribe();
前面的示例展示了在单独的工作单元上执行给定的 Mono.fromCallable 的方法。在底层,subscribeOn 将父 Publisher 的订阅放在 Runnable 中执行(Runnable 是指定 Scheduler的调度程序)。
如果比较 subscribeOn 和 publishOn 的执行模型,如下图:左侧表示订阅,右侧表示消费。
由上图可知,subscribeOn 可以部分地指定运行时工作单元以及订阅时工作单元。发生这种情况是因为除了对 subscribe 方法执行的调度,subscribeOn 还会把每次调用调度到Subscription.request()方法,以使调用发生在 Scheduler 实例指定的工作单元上。
根据响应式流规范,Publisher 可以开始在调用者 Thread 上发送数据,因此后续的 Subscriber.onNext() 将在与初始的 Subscription.request() 相同的 Thread 上被调用。publishOn 只能为下游指定执行行为,而不能影响上游执行。
1.2.4、并行操作符
除了一些重要操作符(用于管理想要处理的执行流某些部分的线程),Reactor 还提供了一种熟悉的并行工作技术。为此,Reactor 有一个名为 parallel 的操作符,它能将流分割为并行子流并均衡它们之间的元素。以下是此操作符的使用示例:
@SneakyThrows
@Test
public void test1() {Random random = new Random();CountDownLatch latch = new CountDownLatch(1);Flux.range(1, 10000)//轮询方式将元素交给各个处理器核心来处理,这里只是准备阶段,需要调用runOn真正调度执行。.parallel().doOnNext(item -> {System.out.println("parallel:" + Thread.currentThread().getName());})//每个处理器核心一个执行单元。4c8t,线程名称: runOn: parallel - {1 - 8}.runOn(Schedulers.parallel()).doOnNext(item -> {System.out.println("runOn:" + Thread.currentThread().getName());})//4c8t,线程名称:runOn: parallel - {1 - 8}.map(num -> num + random.nextInt(10000)).doOnNext(item -> {System.out.println("map:" + Thread.currentThread().getName());})//某些核心执行单元的数字被过滤掉了.filter(num -> num % 2 == 0).doOnNext(item -> {System.out.println("filter:" + Thread.currentThread().getName());}).subscribe(//执行在上述各自线程中item -> System.out.println(Thread.currentThread().getName() + "_:_" + item),ex -> System.err.println(ex),() -> latch.countDown());latch.await();
}
parallel() 是 Flux API 的一部分。通过应用 parallel 操作符,开始在不同类型的 Flux 上执行操作,该 Flux 被称为 ParallelFlux 。
ParallelFlux 是一组 Flux 的抽象,其中源 Flux 中的元素是均衡的。然后,通过应用 runOn 操作符,可以将 publishOn 应用于内部 Flux,并分配与元素(正在不同工作单元之间进行处理)相关的工作。
1.2.5、调度器
调度器是一个接口,具有两个核心方法,即 Scheduler.schedule 和Scheduler.createWorker。
第一个方法可以调度 Runnable 任务;第二个方法不仅为我们提供了 Worker 接口的专用实例,还可以以相同的方式调度 Runnable 任务。Scheduler 接口和 Worker 接口之间的核心区别在于 Scheduler 接口表示工作单元池,而 Worker 是 Thread 或资源的专用抽象。
默认情况下,Reactor 提供 3 个核心调度程序接口实现。
- SingleScheduler 能为一个专用工作单元安排所有可能的任务。它具有时间性,因此可以延迟安排定期事件。此调度程序可以使用 Scheduler.single() 调用进行引用。
- ParallelScheduler 适用于固定大小的工作单元池(默认情况下,其大小受 CPU 内核数限制)。适合 CPU密集型任务。此外,默认情况下,它也处理与时间相关的调度事件,例如 Flux.interval(Duration.ofSeconds(1))。此调度程序可以使用 Scheduler.parallel() 调用进行引用。
- ElasticScheduler 可以动态创建工作单元并缓存线程池。由于其所创建的线程池没有最大数量限制,因此此调度程序非常适用于 I/O 密集型操作的调度。此调度程序可以使用 Scheduler.elastic() 调用进行引用。
1.2.6、响应式上下文(🍕重点)
Reactor 附带的另一个关键功能是 Context。Context 是沿数据流传递的接口。Context接口的核心思想是提供对某些上下文信息的访问,因为这些信息可能在稍后的运行时阶段有用。
既然已经有了可以做同样工作的 ThreadLocal,为什么还需要这个功能?例如,许多框架使用 ThreadLocal 来沿用户请求执行传递 SecurityContext,以在任何处理点访问授权用户。
只是,这种概念只有在进行单线程处理时才能正常工作,因为执行是依附于同一个 Thread。如果开始在异步处理中使用该概念,那么 ThreadLocal 将会非常快速地释放。例如,如果执行如下操作,将丢失可用的 ThreadLocal:
@Test
public void test2() {ThreadLocal < Map < Object, Object >> threadLocal = new ThreadLocal < > ();threadLocal.set(new HashMap < > ());Flux.range(0, 10)//将数据放到ThreadLocal中。.doOnNext(k -> threadLocal.get().put(k, new Random(k).nextGaussian()))//调度线程执行.publishOn(Schedulers.parallel())//线程已经改变,无法访问到先前的数据.map(k -> threadLocal.get().get(k)).blockLast();
}
在多线程环境中使用 ThreadLocal 是非常危险的,并且可能导致意外行为。尽管 Java API 能将 ThreadLocal 数据从一个 Thread 传输到另一个 Thread,但它并不保证传输的完全一致性。Reactor Context 通过以下方式解决了这个问题:
@Test
public void test3() {Flux.range(0, 10).flatMap(k -> Mono.subscriberContext().doOnNext(context -> {Map < Object,Object > map = context.get("randoms");map.put(k, new Random(k).nextGaussian());}).thenReturn(k)).publishOn(Schedulers.parallel()).flatMap(k -> Mono.subscriberContext().map(context -> {Map < Object,Object > map = context.get("randoms");return map.get(k);})).subscriberContext(context -> context.put("randoms", new HashMap())).blockLast();
}
- Reactor 使用静态操作符 subscriberContext 提供对当前流中 Context 实例的访问。一旦获取了 Context,就可以访问 Map 并将生成的值放在那里。最后,返回 flatMap 的初始参数。
- 在切换 Thread 后再次访问 Reactor 的 Context。尽管此示例与使用ThreadLocal 的前一个示例相同,但将成功获取存储的映射并获得生成的随机高斯双精度数。
- 最后,在这里,为了生成 randoms 键(该键返回一个 Map),我们在上游填充一个新的 Context 实例,该实例包含所需键对应的 Map。
Context 可以通过无参数的 Mono.subscriberContext 操作符进行访问,并且可以通过单参数 subscriberContext(Context) 操作符提供给流。
既然 Context 接口具有与 Map 接口类似的方法,那为什么需要使用 Map 来传输数据?
Context 是不可变对象,一旦向它添加新元素,就实现了 Context 的新实例。这样的设计决策有利于多线程访问模型。
这意味着,这是向流提供 Context 并动态提供某些数据的唯一方法,这些数据将在组装时或订阅时的整个运行执行期间可用。如果在组装时提供了 Context,那么所有订阅者将共享相同的静态上下文,但这在每个 Subscriber(可能代表用户连接)具有其自身的 Context 的情况下可能没有用。因此,可以向每个 Subscriber 提供其自身上下文的唯一生命周期时段是订阅时阶段。
在订阅时,Subscriber 通过 Publisher 链从流的底部上升到顶部,并在每个阶段中形成包装到本地Subscriber 的表现形式,从而引入额外的运行时逻辑。为了保持该流程不变并通过流传递额外的 Context 对象,Reactor 使用名为 CoreSubscriber 的接口,该接口是 Subscriber 接口的特定扩展。CoreSubscriber 将 Context 作为其字段进行传递。CoreSubscriber 接口形式如下所示:
interface CoreSubscriber < T > extends Subscriber < T > {default Context currentContext() {return Context.empty();}
}
CoreSubscriber 引入了一个名为 currentContext 的附加方法,该方法提供了对当前 Context 对象的访问。Project Reactor 中的大多数操作符提供了对CoreSubscriber 接口的实现,并引用了下游 Context。
唯一能修改当前 Context 的操作符是 subscriberContext,它是 CoreSubscriber 的实现,持有被合并的下游 Context 并将其作为参数进行传递。此外,这种行为意味着可访问的 Context 对象可能随流中的位置不同而不同。例如,以下代码展示了上述行为:
package com.blnp.net.reactor.stream;import cn.hutool.json.JSONUtil;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;/*** <p></p>** @author lyb 2045165565@qq.com* @version 1.0* @since 2024/11/6 17:49*/
public class StreamContextTest {public void run() {printCurrentContext("top").subscriberContext(Context.of("top", "context")).flatMap(__ -> printCurrentContext("middle")).subscriberContext(Context.of("middle", "context")).flatMap(__ -> printCurrentContext("bottom")).subscriberContext(Context.of("bottom", "context")).flatMap(__ -> printCurrentContext("initial")).block();}void print(String id, Context context) {String format = String.format("%s = %s", id, context.toString());System.out.println(format);}public Mono< Context > printCurrentContext(String id) {return Mono.subscriberContext().doOnNext(context -> print(id, context));}@Testpublic void test() {run();}
}
上述代码展示了我们如何在流构造过程中使用 Context。如果我们运行上述代码,控制台将显示以下结果:
如上述代码所示,流顶部的可用 Context 包含此流中可用的整个 Context,其中流的中间部分只能访问在下游中定义的 Context,而位于非常底部(具有 id 初始值)的上下文消费者上下文为空。
一般而言,Context 是一个杀手锏,它推动 Project Reactor 成为建立响应式系统的更高级别的工具。此外,这种特性对于我们需要访问上下文数据的许多场景很有用,例如,在流程中间处理用户请求的场景。此特性在 Spring 框架中得到了广泛使用,在响应式Spring Security 中尤其如此。
1.3、底层内幕
Reactor 拥有丰富的有用操作符。整个 API 具有与 RxJava 类似的操作符。Project Ractor 3 最显著的改进是:响应式流生命周期(Reactive Stream life-cycle)和操作符融合(operator fusion)。
1.3.1、宏融合
宏融合(macro-fusion)主要发生在组装时,其目的是用一个操作符替换另一个操作符。
同时,Flux 内部操作符的某些部分也应该处理一个或零个元素(例如,操作符 just(T)、empty() 和 error(Throwable))。
在大多数情况下,这些简单的操作符会与其他转换流一起使用。因此,减少这种开销至关重要。为此,Reactor 在组装时提供优化,如果它检测到上游 Publisher 实现了 Callable 或 ScalarCallable 等接口,那么上游 Publisher 将被经过优化的操作符所替换。应用此类优化的示例代码如下所示:
Flux.just(1).publishOn(...).map(...)
上面代码中,元素的执行应该在元素创建后立即移动到不同的工作单元。如果没有应用优化,这样的执行会分配一个队列来保存来自不同工作单元的元素,而从这样一个队列中入队和出队的元素会导致一些不稳定的读写,因此这种普通 Flux 的执行开销过大。
由于执行过程具体发生在哪个工作单元并不重要,并且提供一个元素可以被表示为 ScalarCallable#call,因而可以将 publishOn 操作符替换为不需要创建额外队列的 subscribeOn。此外,由于应用了优化,下游的执行不会改变,因此执行经过优化的流,将获得相同的结果。
前面的示例是隐藏在 Project Reactor 中的宏融合优化中的一种。一般而言,在 Project Reactor 中应用宏融合的目的是优化组装流程,这样一来,就可以使用更原始、成本更低的解决方案,而不会把强大工具的宝贵资源浪费在简单任务上。
1.3.2、微融合
微融合(micro-fusion)是一种更复杂的优化,与运行时优化以及重用共享资源有关。微融合的一个很好的例子是条件操作符。见下图:
商店订购了 n 件商品。过了一段时间,工厂用卡车将物品送到商店。但是,为了最终到达商店,卡车必须通过检验部门,以确保所有商品质量合格。由于有些物品没有仔细包装,因而只有部分订单到达了商店。在那之后,工厂准备了另一辆卡车,再次往商店送货。这种情况反复发生,直到所有订购的商品到达商店。幸好,工厂意识到他们在使商品通过单独的检验部门上花了太多的时间和金钱,并决定从检验部门雇用检验员到本地。
所有物品现在都可以在工厂进行检验后送到商店,而无须前往检验部门。
Flux.from(factory).filter(inspectionDepartment).subscribe(store);
下游订阅者已从数据源请求了一定数量的元素。在通过操作符链发出元素时,元素正在通过条件操作符,而这可能拒绝某些元素。为了满足下游的需求,每个被拒绝数据项的过滤器操作符必须执行附加的 request(1)上游调用。根据当前响应式库(例如RxJava 或 Reactor 3)的设计,request 操作有自己的额外 CPU 开销。
根据 David Karnok 的研究,每个“对 request()的调用通常最终都在一个原子 CAS 循环中,而每 21~45 个循环会掉落一个元素”。
这意味着条件操作符(如 filter 操作符)可能对整体性能产生重大影响!出于这个原因,出现了一种被称为 ConditionalSubscriber 的微融合类型。这种类型的优化使我们能在数据源端验证条件,并发送所需数量的元素而无须额外的 request 调用。
第二种微融合是最复杂的一种。这种融合与操作符之间的异步边界有关。为了理解这个问题,假设一个具有一些异步边界的操作符链,如下例所示:
Flux.just(1, 2, 3).publishOn(Schedulers.parallel()).concatMap(i -> Flux.range(0, i).publishOn(Schedulers.parallel())).subscribe();
前面的例子展示了 Reactor 的操作符链。此链包含两个异步边界,这意味着这里会出现队列。如,因为 concatMap 操作符的本质是它可能在来自上游的每个传入元素上产生 n 个元素,所以内部 Flux 将产生多少元素是无法预测的。
为了处理背压以避免压垮消费者,需要将结果放入队列中。而为了将响应式流中的元素从一个工作线程传输到另一个工作线程,publishOn 操作符也需要内部队列。除了队列开销,还有更危险的跨越异步边界的 request()调用。这些可能导致更大的内存开销。
上图展示了前面代码段的内部行为。在这里,concatMap 的内部有一个巨大的开销,这种情况下,需要为每个内部流发送一个 request 直到满足下游需求。
每个具有队列的操作符都有自己的 CAS 循环,这在不合理模型的请求事件中可能导致高额性能开销。例如,请求 1个或任何(相比整个数据量)少得不合理的数量的元素,都可以被认为是不合理的请求模型。
CAS(比较和交换)是一个单独的操作,它会根据操作是否成功返回值 1 或值 0。由于希望操作成功,我们会重复 CAS 操作直到成功为止。这些重复的 CAS 操作被称为 CAS 循环。
为了避免内存开销和性能开销,应该遵循响应式流规范的建议,切换通信协议。假设一个或多个边界内的元素链具有共享队列,那么可以切换整个操作符链以使用上游操作符作为无须额外 request 调用的队列,这样可以显著提高整体性能。
因此,下游可以从上游排出值,如果该值不可用于指示流的结束,则返回 null。为了通知下游元素可用,上游调用下游的 onNext,并使用null作为该协议的特例。此外,错误情况或流的完成将照常通过onError或onComplete进行通知。因此,先前的示例可以通过以下方式优化,如图:
在该示例中,publishOn 和 concatMap 操作符可以被显著优化。在第一种情况下,因为没有必须在主线程中执行的中间操作符,所以我们可以直接使用 just 操作符作为队列,并在单独的线程上从该队列中执行 pull 操作。在 concatMap 的情况下,所有内部流也可以被视为队列,以在没有附加任何request 调用的情况下排出每个流。
总之,Reactor 库的内部结构比看起来更复杂。通过强大的优化,Reactor 远远领先于 RxJava 1.x,从而提供了更好的性能。