Reactor介绍
- 1、异步执行技术
- 2、实现方式
响应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。Java中的Reactor是一个用于响应式编程的库,它建立在Reactive Streams规范之上,旨在帮助开发者构建非阻塞的、高效的、具有弹性的应用程序。
1、异步执行技术
在传统开发过程中,通常是同步阻塞式的开发模式。举个例子:
假设消费者为A,服务者为B。A向B发送数据处理请求,在B处理的整个过程中,A一直处于阻塞状态,直到B处理完成返回数据。在整个过程中, CPU 利用效率低下,很多时间线程被浪费在了 I/O 阻塞上,无法执行其他的处理过程。
而响应式编程引入异步式调用的技术来实现异步非阻塞的能力。具体机制包括:回调和Future
-
回调:回调体现的是一种双向调用方式。
即A的methodA调用B的methodB,待B的methodB执行完成后再主动调用A的callback方法,在回调callback之前,A可以执行其他任务,这样就实现了异步功能,使得调用链路不发生任何的阻塞。
但是,回调的最大问题是复杂性,一旦在执行流程中包含了多层的异步执行和回调,那么就会形成一种嵌套结构,给代码的开发和调试带来很大的挑战。所以回调很难大规模地组合起来使用,因为很快就会导致代码难以理解和维护,从而造成所谓的“回调地狱”问题。
-
Future:Future相当于一个代理,有任务需要处理直接提交给Future,Future 就会在一定时间内完成这个任务,而在这段时间内我们可以去做其他事情。
回调的示例:
interface Callback {void onComplete(String result);
}class AsyncProcessor {void doAsyncTask(Callback callback) {new Thread(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 操作完成,调用回调函数callback.onComplete("Task Completed");}).start();}
}public class CallbackExample {public static void main(String[] args) {AsyncProcessor processor = new AsyncProcessor();processor.doAsyncTask(result -> {// 处理回调结果System.out.println(result);});System.out.println("Async task started");}
}
在AsyncProcessor类定义了一个异步任务方法doAsyncTask,该方法接受一个回调接口Callback。当异步任务完成时,回调函数onComplete被调用并传递结果。
“回调地狱”示例:
doAsyncTask1(result1 -> {doAsyncTask2(result2 -> {doAsyncTask3(result3 -> {doAsyncTask4(result4 -> {// 处理最终结果System.out.println(result4);});});});
});
2、实现方式
异步式编程的实现采用发布-订阅模式。
在此模式下,发布者和订阅者之间没有直接的交互,而是通过发送事件到事件处理平台的方式来完成整合。当发布者有新消息需要发布,就将该事件发布到处理平台,处理平台再将消息通知给所有的订阅者;订阅者也可以主动请求订阅,查看是否有新事件发布。如下图:
这样即使业务场景多种多样,事件处理方式也不同,所有事件都可以在处理平台进行处理,不同的场景只需要发送不同的事件即可。而对于这些事件,异步式编程称之为数据流。
数据流的概念类似为商品,生产者生产放到商店,消费者从商店获取。
相较传统开发所普遍采用的“拉”模式,在响应式编程下,基于事件的触发和订阅机制,这就形成了一种类似“推”的工作方式。这种工作方式的优势就在于,生成事件和消费事件的过程是异步执行的,所以线程的生命周期都很短,也就意味着资源之间的竞争关系较少,服务器的响应能力也就越高。
其他相关内容参考:
Spring响应式编程之Reactor背压机制
Spring响应式编程之Reactor核心接口
Spring响应式编程之Reactor核心组件
Spring响应式编程之Reactor操作符