Reactive响应式编程系列:解密Lettuce如何实现响应式

        响应式编程历史悠久,早在2005年,Microsoft Cloud Programmability Team 开始探索一种简单的编程模型,以构建大规模的异步和数据密集型互联网服务架构,响应式编程的理念逐步诞生。这本是为了解决服务端系统而提出的理念,最后也逐步也应用到客户端等其他领域,而在Java服务端领域,最著名的两大响应式库就是 RxJava (GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.) 和 Project Reactor (GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM)。本文的重点不是探究响应式编程的内部实现,而是剖析 Lettuce 如何实现响应式范式。
        我们知道,作为三个最流行Redis Java客户端 Jedis、Lettuce 和 Redisson,只有 Jedis 没有实现了响应式接口(Reactive)。我们今天的主角是:Lettuce (GitHub - lettuce-io/lettuce-core: Advanced Java Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.)。我们先来看看Lettuce的常规使用方式(或者说命令式编程的使用方式),为了简单起见,我们直接使用Redis单实例来演示(生产上当然都是Redis Cluster或者Redis Sentinel部署模式):

RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisStringCommands command = connection.sync();
System.out.println(command.get("key")); // 输出key的值

很显然第4行的 “command.get("key")” 是阻塞的,意味着当前线程必须等着拿到结果才能进行下一步,期间什么事情都做不了。虽然通常来说Redis操作都足够快,但我们需要考虑特殊情况,比如大Key或者网络不佳等场景,一旦慢查出现,当前系统的业务处理线程都将阻塞在这一步,整个系统无法响应新的请求,Reactive响应式编程就可以改善这一局面,更具体的说,响应式编程是为了解决非CPU密集型(例如数据或IO密集型)系统资源消耗问题,它带来的一个好处在于系统能根据业务实际情况来调整资源消耗,并且让系统应对故障时能更具弹性,而它的核心就是“异步事件驱动” + “背压背压一改往常订阅者只能被迫“投喂”的尴尬局面,让订阅者也能控制发布者发布元素的顺序。

        像其他事物一样,响应式编程以及库类从历经坎坷到蓬勃发展好几年之后,Reactive Streams组织(包括Pivotal、Netflix、Typesafe和Lightbend于2013年成立,旨在为异步流处理提供一个通用的API规范)在2014年发布了第一个API规范(Java的可以看GitHub - reactive-streams/reactive-streams-jvm: Reactive Streams Specification for the JVM),当你把这个reactive-streams-jvm clone 下来后关注下api模块,因为里面就4接口加1个类:

之前对响应式模式有过了解的同学肯定觉得它和 观察者模式 很像,但看了上面的接口,会觉得它更像 发布订阅模式,让我们从ChatGPT的描述中再来回顾下这两种模式吧:

观察者模式(Observer Pattern)是一种常见的设计模式,其核心思想是:当一个对象的状态发生改变时,其他依赖于它的对象都会收到通知并自动更新。在观察者模式中,存在一个被观察的对象(Subject),它维护一个列表,用来保存所有观察它的对象(Observer)。当Subject的状态发生改变时,它会自动通知所有的Observer对象,让它们可以及时更新自己的状态。

发布订阅模式(Publish-Subscribe Pattern)也是一种消息传递模式,其核心思想是:在多个对象之间,有一个消息中心(Message Broker)来协调对象之间的通信。在发布订阅模式中,发布者(Publisher)不直接发送消息给订阅者(Subscriber),而是通过消息中心来传递消息。订阅者可以向消息中心注册自己感兴趣的消息类型,并在消息中心有消息发布时,自动收到通知。

目前,我们可以把响应式模式看做本地化的发布订阅模式,但目前主流的响应式库类实现的功能远多于传统的发布订阅框架,例如,其中 “背压” 就是核心中的核心。在响应流API规范中,我们看到了四个接口,即 发布者(Publisher)订阅者(Subscriber)订阅关系(Subscription)处理者(Processor),处理者就是就是发布者和订阅者的结合体。我们来认真看下这四个接口,便于我们进一步了解响应式的秘密:

public interface Publisher<T> {public void subscribe(Subscriber<? super T> s);
}public interface Subscriber<T> {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete();
}public interface Subscription {public void request(long n);public void cancel();
}public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
  • 发布者接口Publisher 很简单,就一个订阅方法,传入一个订阅者即可,用于建立订阅关系。
  • 订阅者接口Subscriber 稍微复杂点,有四个方法,第一个onSubscribe方法在订阅成功后会被调用,入参是一个订阅关系对象,意味着订阅者能通过订阅关系来主动和发布者通信,记住,这是“背压”机制的关键。第二个方法是onNext,即发布者通过该方法给订阅者“发射”一个元素或事件,而第三个onError方法,是指发布者发射元素有错误时,会通过该方法让订阅者感知。最后的onComplete方法在所有元素都发射完成后被调用。
  • 订阅关系接口Subscription 中只有两个方法,即reqeust方法,用于订阅者像发布者请求一定数量的元素,另一个是cancel方法,用于取消订阅关系。而这个request就是“背压”的关键,毕竟订阅者的消费能力只有自己知道。
  • 处理者接口Processor 就不多说了,所见即所得。

当然,我们肯定也记得2016年发布的Java9中添加了Flow类来支持响应式流,可以看出Java官方也坐不住了,所以上图 reactive-streams-jvm 中后面加了个java9的目录,其中的 FlowAdapters 类就是用于 响应流API和Java9的Flow类互转。

        有人肯定会说,有了 Project Reactor (前面提到的主流的响应式库之一),想把 lettuce-core 从阻塞的命令式编程变成异步的响应式编程还不简单,通过对之前命令式编程代码进行如下改造即可:

RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisStringCommands command = connection.sync();
// System.out.println(command.get("key")); // 输出key的值
myReactiveRedisGet(command, "key").subscribe(value -> System.out.println("查询Redis key的结果:" + value)); // 采用Reactive方式输出key的值private static Mono<String> myReactiveRedisGet(RedisStringCommands<String, String> command, String key) {return Mono.just(command.get(key));
}

这里我们并没有直接使用阻塞的command.get("key")来获取结果,而是新建了一个myReactiveRedisGet方法,它的返回值是Mono<String>,Mono是project reactor库(后面简称reactor)中的核心类之一,用于发射0个或1个元素的响应式流类,是属于发布者角色,即它是一种Publisher(另一种发射1个或多个元素的是Flux,它也是一种Publisher)。Mono.just方法用于直接发射入参传进来的元素(即command.get(key)),仔细看这行代码,我们会发现Mong.just调用前传入的入参就已经是阻塞的方式获取的结果了,这当然不符合Reactive的预期,所以我们改成了如下代码:

RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisStringCommands command = connection.sync();
// System.out.println(command.get("key")); // 输出key的值
myReactiveRedisGet(command, "key").subscribe(value -> System.out.println("查询Redis key的结果:" + value)); // 采用Reactive方式输出key的值private static Mono<String> myReactiveRedisGet(RedisStringCommands<String, String> command, String key) {return Mono.defer(() -> Mono.just(command.get(key)));
}

可以看到,我们在Mono.just外用Mono.defer包了一层,defer的入参是一个返回Mono的Supplier,defer用来延迟创建Mono,这也意味着我们会延迟获取结果(command.get(key)),这是响应式编程的另一个重要特征,即有订阅时才生产和发送数据,没订阅的时候啥事也不做,避免消耗,这很环保,千万别小看这一点。那难道用Mono.defer后就相当于Redis操作就变成Reactive了吗?运行上述代码,你会发现所有操作其实都是由一个线程来执行,这意味着一个线程(在有订阅者订阅时)完成了所有事情,这肯定是阻塞的,因为command.get就是妥妥的阻塞方法!!该线程只有等它返回,否则它做不了其他任何事情。幸运的是reactor库给我们提供了异步处理 “发布者发射元素” 和 “订阅者订阅和请求元素个数” 操作的方法,即 publishOn 和 subscribeOn 方法,它们的入参都是一个 Scheduler 类型的对象,而 Scheduler 在 reactor 中是专为运算符提供抽象异步边界的(简单的说就是放到另一个线程去执行)。由于我们的订阅只是一个简单的结果打印,是元素发射过程中才去阻塞访问的Redis,所以我们希望在元素发射过程中异步,于是我们用publishOn方法,代码修改成如下:

RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisStringCommands command = connection.sync();
// System.out.println(command.get("key")); // 输出key的值
myReactiveRedisGet(command, "key").subscribe(value -> System.out.println("查询Redis key的结果:" + value)); // 采用Reactive方式输出key的值private static Mono<String> myReactiveRedisGet(RedisStringCommands<String, String> command, Supplier<String> keySupplier) {return Mono.defer(() -> Mono.just(command.get(key))).publishOn(Schedulers.parallel()); // 元素发射的异步执行
}

可以看到,我们在myReactiveRedisGet方法中在Mono.defer方法后加了publishOn方法,入参是Schedulers.parallel(),它返回的是个计划线程池(ScheduledThreadPoolExecutor),这样执行后,你会发现command.get确实在另一个线程中执行了,而不是在当前线程。这样就大功告成啦?难道我们就这样通过几行代码完美的实现了Lettuce库访问Redis的响应式化?如果真是这样,那么我们就可以直接给Jedis提交一个PR,宣告Jedis也支持响应式编程了!然而现实是残酷的,虽然我们针对Lettuce的命令式编程的代码通过响应式库reactor进行了多次调整,但仍然掩盖不了一个事实,就是我们调用的始终是阻塞方法command.get,虽然我们掩耳盗铃的将其放在另一个线程异步来执行了,但这个执行它的一部线程始终也需要阻塞的等待应答结果,我们只是没让当前业务线程来等待而已(突然想起之前网上有人说的一句话,复杂度它不会消失,只会转移),一旦请求多起来,Schedulers.parallel()中的线程也会被耗尽成为新的瓶颈点!!我们只考虑了异步,但忽略了最重要一点,那就是事件驱动,你得让一个非阻塞的第三方来通知你操作已经完成了,而不是换一个线程去阻塞等待!!这里,我们可以得出一个结论:仅仅只靠reactor之类的响应式库加持,我们无法真正把一个命令式阻塞操作变成Reactive

        让我们先冷静分析一下,回顾下Lettuce的命令式编程的API,发现有4个关键动作:

  1. 创建RedisClient对象。
  2. 创建StatefulRedisConnection连接对象。
  3. 从连接对象获取Command命令对象。
  4. 从Command命令对象进行真正的Redis操作。

第1、2两个步骤不会每个Redis操作都执行,一般是初始化好后管理起来(例如Spring容器)重复使用,所以要想Lettuce支持Reactive,只能需要在第3、4步进行扩展。铺垫了这么多,我们来看看 lettuce-core 中使用Reactive响应式编程的demo:

RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisStringReactiveCommands<String, String> command = connection.reactive();
Mono<String> get = command.get("key");
get.subscribe(System.out::println); // 控制台输出value

可以看出,相比之前的命令式编程从connection获取的command就开始不同了,这是一个ReactiveCommands对象,然后用该对象来进行Redis操作,同时command.get也不是直接返回结果了,而是一个Mono,最后我们订阅这个发布者(get),将结果值输出到控制台。

我们接下来将解密这背后的原理,Lettuce的阻塞调用方式是从 StatefulRedisConnection 类型的连接中调用sync方法拿到一个Command对象,那么为了支持响应式编程,所以在接口StatefulRedisConnection中新增了reactive方法:

/*** Returns the {@link RedisCommands} API for the current connection. Does not create a new connection.** @return the synchronous API for the underlying connection.*/RedisCommands<K, V> sync();   /*** Returns the {@link RedisReactiveCommands} API for the current connection. Does not create a new connection.** @return the reactive API for the underlying connection.*/RedisReactiveCommands<K, V> reactive();

我们可以看到,阻塞Command是 RedisCommands 类型,而Reactive的Commands是 RedisReactiveComands 类型,不看 RedisReactiveComands 类基本都能猜到它和 RedisCommands 之间最大的区别就是它的返回值是一个结果的发布者类型(例如Mono或者Flux)而不直接是结果类型,例如(由于Redis不同的数据结构操作不同,所以RedisCommands和RedisReactiveComands都会有对应每种数据结构更具体的接口类,例如针对字符串数据结构的 RedisStringCommands 和 RedisStringReactiveCommands):

public interface RedisStringCommands<K, V> {/*** Get the value of a key.** @param key the key.* @return V bulk-string-reply the value of {@code key}, or {@code null} when {@code key} does not exist.*/V get(K key);// 其他方法略
}public interface RedisStringReactiveCommands<K, V> {/*** Get the value of a key.** @param key the key.* @return V bulk-string-reply the value of {@code key}, or {@code null} when {@code key} does not exist.*/Mono<V> get(K key);// 其他方法略
}

由于字符串的get操作只返回一个对象,所以Reactive操作用的是Mono<V>而不是Flux<V>。对于Reactive操作而言,抽象类 AbstractRedisReactiveCommands 负责对直接对各个具体数据类型的Command来进行实现(需要注意的是它并不是直接实现RedisReactiveCommands接口),我们还是拿get操作举例,AbstractRedisReactiveCommands的实现如下:

public abstract class AbstractRedisReactiveCommands<K, V> implements RedisAclReactiveCommands<K, V>,RedisHashReactiveCommands<K, V>, RedisKeyReactiveCommands<K, V>, RedisStringReactiveCommands<K, V>,RedisListReactiveCommands<K, V>, RedisSetReactiveCommands<K, V>, RedisSortedSetReactiveCommands<K, V>,RedisScriptingReactiveCommands<K, V>, RedisServerReactiveCommands<K, V>, RedisHLLReactiveCommands<K, V>,BaseRedisReactiveCommands<K, V>, RedisTransactionalReactiveCommands<K, V>, RedisGeoReactiveCommands<K, V>,RedisClusterReactiveCommands<K, V> {@Overridepublic Mono<V> get(K key) {return createMono(() -> commandBuilder.get(key));}public <T> Mono<T> createMono(Supplier<RedisCommand<K, V, T>> commandSupplier) {if (tracingEnabled) {return withTraceContext().flatMap(it -> Mono.from(new RedisPublisher<>(decorate(commandSupplier, it), connection, false, getScheduler().next())));}return Mono.from(new RedisPublisher<>(commandSupplier, connection, false, getScheduler().next()));}// 其他方法略
}

我们可以看到,它直接调用了createMono方法,而且createMono方法的入参是一个 RedisCommand 的Supplier对象,意味着它支持延迟创建RedisCommand,遵循了我们之前说的响应式的特征之一:延迟创建(只有被真正订阅时才产生数据和开销)。需要注意的是,这里的 RedisCommand 和前面说的 RedisCommands、RedisReactiveCommands 是有区别的,RedisCommands、RedisReactiveCommands 用来封装各种数据类型的命令操作,而 RedisCommand 用于封装输出、参数和状态,用于命令执行结果的处理。我们接着看createMono的具体实现,抛开if语句的部分,它通过 RedisPublisher 来构建了一个Mono,采用的是Mono.from,该方法会将一个响应式规范API中的发布者(Publisher接口的实现类)转换成reactor库的Mono来返回,而且你可以发现,createMono方法几乎用所有资源都用来构建 RedisPublisher 对象,包括Command的Supplier、连接、异步执行的线程等。

        既然 RedisPublisher 是一个标准的 Publisher,那么它肯定会实现 subscribe 方法,我们来一探究竟:

class RedisPublisher<K, V, T> implements Publisher<T> {private final Supplier<? extends RedisCommand<K, V, T>> commandSupplier;private final AtomicReference<RedisCommand<K, V, T>> ref;private final StatefulConnection<K, V> connection;private final boolean dissolve;private final Executor executor;public RedisPublisher(RedisCommand<K, V, T> staticCommand, StatefulConnection<K, V> connection, boolean dissolve,Executor publishOn) {this(() -> staticCommand, connection, dissolve, publishOn);}public RedisPublisher(Supplier<RedisCommand<K, V, T>> commandSupplier, StatefulConnection<K, V> connection,boolean dissolve, Executor publishOn) {LettuceAssert.notNull(commandSupplier, "CommandSupplier must not be null");LettuceAssert.notNull(connection, "StatefulConnection must not be null");LettuceAssert.notNull(publishOn, "Executor must not be null");this.commandSupplier = commandSupplier;this.connection = connection;this.dissolve = dissolve;this.executor = publishOn;this.ref = new AtomicReference<>(commandSupplier.get());}@Overridepublic void subscribe(Subscriber<? super T> subscriber) {if (this.traceEnabled) {LOG.trace("subscribe: {}@{}", subscriber.getClass().getName(), Objects.hashCode(subscriber));}// Reuse the first command but then discard it.RedisCommand<K, V, T> command = ref.get();if (command != null) {if (!ref.compareAndSet(command, null)) {command = commandSupplier.get();}} else {command = commandSupplier.get();}RedisSubscription<T> redisSubscription = new RedisSubscription<>(connection, command, dissolve, executor);redisSubscription.subscribe(subscriber);}// 其他方法略
}

我们知道标准的 Publisher.subscribe 主要是完成订阅工作,ref 是一个 RedisCommand Supplier 的原子引用,上述代码有段奇怪的逻辑,即获取command的部分,其实主要目的是为了防止单个 command 被重复使用,一旦发现重复使用(即ref.compareAndSet(command, null)为false),则重新使用 RedisCommand Supplier 获得 command。当然,最关键的还是下面的构建 RedisSubscription 对象(即前面Reactive中的订阅关系角色)并调用其subscribe方法,需要注意一点的是,我们是通过传入订阅者subscriber来构建 RedisSubscription,这当然合情合理。

        接下来,我们剖析下 RedisSubscription,有人肯定会好奇,订阅关系Subscription理论上有用于请求n个元素的request和取消订阅的cancel方法,为啥还会有发布者Publisher中的subscribe方法?其实这并不是说RedisSubscription也是一个Publisher角色,而是这个方法发是被Publisher的subscribe方法来调用,所以干脆也叫做subscribe。RedisSubscription.subscribe方法实现如下:

        /*** Subscription procedure called by a {@link Publisher}** @param subscriber the subscriber, must not be {@code null}.*/void subscribe(Subscriber<? super T> subscriber) {if (subscriber == null) {throw new NullPointerException("Subscriber must not be null");}// UNSUBSCRIBED 状态State state = state();if (traceEnabled) {LOG.trace("{} subscribe: {}@{}", state, subscriber.getClass().getName(), subscriber.hashCode());}// 调用后进入 NO_DEMAND 状态state.subscribe(this, subscriber);}

可以看到该方法很简单,就是调用state的subscribe方法。State 是什么?State 是 RedisPublisher 中的一个枚举类型,用来表示 RedisSubscription(其实也是在RedisPublisher中定义)所处的状态,你可以发现,RedisSubscription 作为订阅关系用来表示订阅者Subscriber的订阅状态,而 State 用来表示订阅关系的状态。按照该枚举的定义,State有如下状态:

  • UNSUBSCRIBED 初始的未订阅状态,调用其subscribe方法可以进入NO_DEMAND状态。
  • NO_DEMAND 没有需求时进入的状态
  • DEMAND 有需求时进入的状态
  • READING 有数据可以读取时进入的状态
  • COMPLETED 完成状态,不会再接受任何事件

部分状态能双向转换(例如 NO_DEMAND 和 DEMAND),最初是UNSUBSCRIBED状态,如代码所示,调用state.subscribe就会由UNSUBSCRIBED变为NO_DEMAND状态,我们看下state.subscribe的代码:

        UNSUBSCRIBED {@SuppressWarnings("unchecked")@Overridevoid subscribe(RedisSubscription<?> subscription, Subscriber<?> subscriber) {LettuceAssert.notNull(subscriber, "Subscriber must not be null");if (subscription.changeState(this, NO_DEMAND)) {// 实际创建的是 PublishOnSubscriber 类型subscription.subscriber = RedisSubscriber.create(subscriber, subscription.executor);subscriber.onSubscribe(subscription);} else {throw new IllegalStateException(toString());}}}

可以看到,一旦修改成 NO_DEMAND 成功,就会构建订阅者 RedisSubscriber 并绑定到订阅关系RedisSubscription上(后面会用到),最后调用了subscriber的onSubscribe方法(还记得之前聊到的响应式API规范吗?订阅成功后需要调用订阅者的onSubscribe方法),很显然到这里为止我们提到过的两个类 RedisPublisher 和 RedisSubscriber 就是发布者和订阅者的关系。

        然后呢?后面貌似也没代码了,我们为啥最后我们能神奇的获取到redis get命令的返回结果呢?其实内部是reactor这个响应式框架在驱动,我们回过头再来看下之前的响应式demo:

RedisStringReactiveCommands<String, String> command = connection.reactive();
Mono<String> get = command.get("key");
get.subscribe(System.out::println); // 控制台输出value

我们知道最后一行我们通过Lambda表达式创建了一个订阅者,对于reactor这个框架而言,实际创建的是 LambdaMonoSubscriber 对象,而这个get当然也是被reactor包装过的Mono,但前面说过创建它的createMono方法实际上内部是从 RedisPublisher 来创建Mono的,所以我们调用的get.subscribe实际上最后会调用RedisPublisher的subscribe方法,这个方法我们前面已经讲解过逻辑了,所以上面的state.subscribe方法很重要的一点是调用了实际订阅者的onSubscribe方法(理论上这个方法应该是reactor框架来驱动调用的,但由于lettuce-core在实现Reactive时是自己构建的发布者-RedisPublisher,所以这个方法得lettuce自己来调用)。而这个订阅者subscriber(这里是LambdaMonoSubscriber对象)的onSubscribe方法中会去调用订阅关系(即RedisSubscription)的request方法:

        @Overridepublic final void request(long n) {// 这时的 state 是 NO_DEMAND 状态State state = state();if (traceEnabled) {LOG.trace("{} request: {}", state, n);}state.request(this, n);}

这时的state是 NO_DEMAND 状态request方法实际上是去调用state.request方法,我们看下 NO_DEMAND 的request方法做了什么:

        NO_DEMAND {@Overridevoid request(RedisSubscription<?> subscription, long n) {if (Operators.request(RedisSubscription.DEMAND, subscription, n)) {if (subscription.changeState(this, DEMAND)) {try {// 实际上调用的是它的dispatchCommand方法,向Redis发送要执行的命令subscription.checkCommandDispatch();} catch (Exception ex) {subscription.onError(ex);}subscription.checkOnDataAvailable();}subscription.potentiallyReadMore();subscription.state().onDataAvailable(subscription);} else {onError(subscription, Exceptions.nullOrNegativeRequestException(n));}}}

可以看到,该方法会将state由 NO_DEMAND 更新成 DEMAND,一旦更新成功,就会调用订阅关系 RedisSubscription 的checkOnDataAvailable方法,该方法最终会调用 RedisChannelHandler 对象的dispatch方法往其 RedisChannelWriter 类型的属性对象channelWriter中写数据,代码如下:

    protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {return channelWriter.write(cmd);}

说明这时候已经开始向Redis发送命令了,接着我们还是回过头来看NO_DEMAND 的request方法,接着调用的是订阅关系 RedisSubscription 的checkOnDataAvailable方法,该方法代码如下:

        final Queue<T> data = Operators.newQueue();// 其他代码略void checkOnDataAvailable() {// 如果数据为空,就去请求数据if (data.isEmpty()) {potentiallyReadMore();}// 如果数据不为空,那么通知数据可用if (!data.isEmpty()) {onDataAvailable();}}void potentiallyReadMore() {// getDemand获取的是订阅关系中request中请求的元素数量值,reactor中默认传Long.MAX_VALUEif ((getDemand() + 1) > data.size()) {// 当前state是DEMAND状态,调用readData方法后变成READING状态state().readData(this);}}/*** Called via a listener interface to indicate that reading is possible.*/final void onDataAvailable() {State state = state();state.onDataAvailable(this);}

这块代码我们先忽略(笔者发现这段代码逻辑有点问题,详见:Fix the RedisPublisher.RedisSubscription#potentiallyReadMore demand long overflow bug by manzhizhen · Pull Request #2383 · lettuce-io/lettuce-core · GitHub),我们先看NO_DEMAND 的request方法的 “subscription.state().onDataAvailable(subscription);” 这一行,该行才是关键,这时候subscription.state()是DEMAND,所以我们看下DEMAND的onDataAvailable方法:

        DEMAND {@Overridevoid onDataAvailable(RedisSubscription<?> subscription) {try {do {if (!read(subscription)) {return;}} while (subscription.hasDemand());} catch (Exception e) {subscription.onError(e);}}private boolean read(RedisSubscription<?> subscription) {State state = subscription.state();// concurrency/entry guardif (state == NO_DEMAND || state == DEMAND) {// 尝试将state变成READING状态,表明我们在读取数据if (!subscription.changeState(state, READING)) {return false;}} else {return false;}// 尝试读取并发射数据subscription.readAndPublish();if (subscription.allDataRead && subscription.data.isEmpty()) {state.onAllDataRead(subscription);return false;}// concurrency/leave guardsubscription.afterRead();if (subscription.allDataRead || !subscription.data.isEmpty()) {return true;}return false;}// 其他代码略}

可以看出onDataAvailable方法只要发现订阅关系RedisSubscription一直有需求(subscription.hasDemand())则会一直调用read方法,而read方法中最关键的就是subscription.readAndPublish();方法的调用,我们看看其实现:

        /*** Reads and publishes data from the input. Continues until either there is no more demand, or until there is no more* data to be read.*/void readAndPublish() {while (hasDemand()) {T data = read();if (data == null) {return;}DEMAND.decrementAndGet(this);this.subscriber.onNext(data);}}/*** Reads data from the input, if possible.** @return the data that was read or {@code null}*/protected T read() {return data.poll();}

可以看出其还是通过data(final Queue<T> data = Operators.newQueue();)来交互的,调用一次readAndPublish方法只尝试从data里面poll一次,如果为null,则直接返回,如果能poll到元素,那么将调用订阅者的onNext方法(这也符合响应流API规范)来发射该数据。

        总得有地方往data这个队列中放数据吧?否则上面的代码从data中获取到的永远是null,也不会有机会调用订阅者的onNext方法。Lettuce底层用的是Netty来实现Redis协议的,而且将 CommandHandler 注册到Netty的Pipeline中,CommandHandler监听了读事件,后面会调用 CommandWrapper(前面提到的RedisCommand的实现类,负责处理Redis应答)的complete方法,而complete方法后续又会调用订阅关系RedisSubscription的onNext方法,我们这时候来看看该onNext方法:

        @Overridepublic void onNext(T t) {State state = state();if (state == State.COMPLETED) {return;}// 如果data为空,而且当前state处于DEMAND状态(说明还没进入读数据状态),那么直接调用订阅者的onNext来发射数据if (data.isEmpty() && state() == State.DEMAND) {long initial = getDemand();if (initial > 0) {try {DEMAND.decrementAndGet(this);// 直接调用订阅者的onNext来发射数据,此例子中就是我们的System.out.println的Lambda表达式this.subscriber.onNext(t);} catch (Exception e) {onError(e);}return;}}// 如果有data队列中有数据,那么就将数据放进队列,如果放进队列失败,则发射一个错误if (!data.offer(t)) {Subscriber<?> subscriber = this.subscriber;Context context = Context.empty();if (subscriber instanceof CoreSubscriber) {context = ((CoreSubscriber) subscriber).currentContext();}Throwable e = Operators.onOperatorError(this, Exceptions.failWithOverflow(), t, context);onError(e);return;}// 通知数据可用,后续会从data中读取数据onDataAvailable();}

这样一切就圆满了,订阅关系RedisSubscription的onNext方法最终被Redis的读事件驱动(CommandHandler),而最终通过data队列或者直接调用订阅者的onNext方法,让订阅者来消费该元素。

        让我们简单总结下,Lettuce的Reactive的实现方式就是将Redis数据发送数据应答事件和Reactive库类(之前用的是RxJava,后面用的是reactor)绑定,从而让Lettuce支持响应式API编程。那这种方式和我们之前直接用 myReactiveRedisGet 方法来包装Lettuce的阻塞执行方法有什么区别?最大的一个区别是我们不用阻塞当前的业务线程,而是让其他线程(这里是Netty的工作线程)来发射元素并执行订阅者的方法(即本例子中的System.out.println的Lambda表达式),这样业务线程可以做其他事情(例如继续处理其他请求),而且重要的是另一点,即使Redis响应慢,也不会占用到Netty的工作线程,有应答时才会需要该工作线程处理。注意,订阅者的方法真的应该由Netty的工作线程来执行吗?这也许并不一定是个好注意,我们期望可以通过publishOn来修改此行为,我们来修改下之前给的Reactive的demo:

RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisStringReactiveCommands<String, String> command = connection.reactive();
Mono<String> get = command.get("key");
// get.subscribe(System.out::println); // 控制台输出value
get.publishOn(Schedulers.parallel()).subscribe(System.out::println); // 使用Schedulers.parallel()的线程池来进行控制台输出value

这里我们直接使用的是Schedulers.parallel()线程池。


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/25287.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习入门学习小记录5——【Pytorch】,模型推理间隔较大时,推理耗时增加且存在波动

目录 前言 ChatGPT的回答&#xff1a; 实际测试 解决方法&#xff1a; 来自chatGPT的解答&#xff1a; 前言 最近在进行模型部署的工作&#xff0c;比较关注模型的推理耗时&#xff0c;发现相同的模型在推理时存在耗时波动。所以做了一些小测试&#xff0c;并做一下经验记…

【chatgpt提效工具——cursor】

一、cursor&#xff1a; 地址&#xff1a;https://www.cursor.so/ 使用方法 ctrlk:需求 ctrlL:对话 二、vscode插件 cursorcode&#xff1a; 界面 快捷键一&#xff1a;在代码框中按下CtrlAltY弹出代码生成/优化命令框 快捷键二&#xff1a;在代码框中按下CtrlAltU弹出…

在Apple Watch上使用Siri发送消息的方法

1.抬起手腕唤醒Apple Watch后&#xff0c;说Hey Siri。或者&#xff0c;按住“ Digital Crown ”&#xff08;数字表冠&#xff09;或简单地举起您的手腕&#xff0c;然后在“ 提高说话能力”已启用的情况下开始讲话。说向[联系人/电话号码]发送消息。也可以说类似“告诉[联系]…

为什么每次和 Siri 聊天我都一肚子火

硅谷Live / 实地探访 / 热点探秘 / 深度探讨 全世界最痛苦的事&#xff0c;就是和一个跟你不在同一频道的人尬聊。 相反&#xff0c;和自己的亲朋好友另一半说话就轻松多了&#xff0c;比如你说 “今天上班好累啊”&#xff0c;你的家人、朋友就会说&#xff0c;“那今晚别做饭…

苹果“Enhanced Siri”,你知道多少?

小编发现从2015年2月&#xff0c;中国大陆开发CarPlay认证窗口后&#xff0c; CarPlay在市场上的普及度越来越高&#xff0c; 说到CarPlay认证&#xff0c;圈子内的人都能说出几个专业词语&#xff0c;而其中"Siri"就是最耳熟能详的。专业词语说大家都会能提两个词&a…

Logoist - 适用于设计师以及初次使用者,快速制作精美 logo

Logoist - 适用于设计师以及初次使用者的快速制作精美 logo 工具 从简单的标识到设计开发。它只需要一点时间来创建令人印象深刻的图像和矢量图形与Logoist。 我们的一体化应用程序为您提供了您需要的一切&#xff0c;将您的创意付诸实践或寻找新的灵感!它适合专业设计师和插画…

chatgpt赋能Python-python照片处理

介绍 Python是一种高级编程语言&#xff0c;它被广泛用于图像和照片处理。Python的大量第三方库和框架使得它成为处理照片的理想工具。本文将介绍如何用Python处理照片&#xff0c;并展示Python在照片处理领域的强大功能。 用Python处理照片 Python中最常用的照片处理库是Pi…

Midjourney词典秘籍,你真的不来看看吗?

引言&#xff1a; 今日有幸得到Midjourney秘籍&#xff0c;在此分享给各位伙伴&#xff0c;因篇幅限制&#xff0c;可以私我免费分享完整版&#xff0c;快来看看吧。

解密亚洲一号巨型仓储管理

解密亚洲一号巨型仓储管理 由京东商城运营研发部仓储研发部分享的有关京东一号巨型仓储管理经验... 详细解读 和小伙伴们一起来吐槽

【麒麟操作系统】查看和关闭139、445端口的方法

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、查看139、445端口的方法二、关闭139、445端口的方法 三、139、445端口的作用 前言 作为国产操作系统&#xff0c;无论是银河麒麟&#xff0c;还是中标麒麟…

Apache 首次亚洲在线峰会: Workflow 数据治理专场

背景 大数据发展到今天已有 10 年时间&#xff0c;早已渗透到各个行业&#xff0c;数据需 求越来越多&#xff0c;这使得大数据 业务间的依赖关系也越来越复杂&#xff0c;另外也相信做数据的伙伴肯定对如何治理数据也是痛苦之至&#xff0c;再加上现今云原生时代的要求&#x…

2021十大亚洲现货黄金交易APP平台排名榜单

对于国内投资者而言&#xff0c;贵金属投资有内、外盘之分&#xff0c;因此在选择平台时&#xff0c;投资者应该从市场成熟度和产品优势出发&#xff0c;先明确自己在哪一个市场进行投资&#xff0c;再挑选那些信誉度极高、以客户利益为大前提、在行业内排名较前平台。 至于外…

“走进名企”之参观微软亚洲研究院

2021-11-24&#xff0c;感谢 CSDN 组织的活动“走进名企”——探访北京微软亚洲研究院活动。 微软亚研院简介 首先&#xff0c;我们来了解一下微软亚研院一些基本资料。 微软亚洲研究院是微软公司在亚太地区设立的研究机构&#xff0c;也是微软在美国本土以外规模最大的一个。…

人脸数据集——亚洲人脸数据集

大规模亚洲人脸数据的制作 在这次大规模亚洲人脸数据制作主要是亚洲明星人脸数据集&#xff0c;此次我爬取了大概20万张亚洲人脸图像&#xff0c;可以修改爬取每位明星图片的数量来获取更多的图片&#xff0c;过程中主要分以下几步&#xff1a; 获取明星名字列表 &#xff…

激活工银亚洲账号(收到开通成功通知短信以后,汇款到“港元储蓄”子账号)

前言 【工银亚洲】您的账户及银行服务已开通&#xff0c;请存入同名存款激活相关账户及银行服务。账户激活前一概拒纳收取任何非同名存款。 现在需要汇款到“港元储蓄”子账号以激活工银亚洲的账号。 三部曲 首次办理&#xff1a;2018-06-14 [查看详情] 账号已开通&#x…

亚洲名人人脸数据库制作

大规模亚洲人脸数据的制作 原文&#xff1a;https://blog.csdn.net/Alvin_FZW/article/details/82146800 在这次大规模亚洲人脸数据制作主要是亚洲明星人脸数据集&#xff0c;此次我爬取了大概20万张亚洲人脸图像&#xff0c;可以修改爬取每位明星图片的数量来获取更多的图片…

【北京】亚洲微软研究院-微软游记

文章目录 微软游记黑科技交流会办公区晚餐 结束 微软游记 11月24日&#xff0c;身为一名初出茅庐得技术博主&#xff0c;有幸来到微软亚洲研究院&#xff0c;与CSDN一些博主们共同学习&#xff01;通过这一天对微软的认识&#xff0c;让我有了更多的想法。今天我看到了微软在物…

CASIA-FaceV5亚洲人脸数据集以及对应的测试pairs文件

目录 一、前言 二、生成Pairs文件 三、下载资源 1、官网下载地址 2、百度网盘下载资源 一、前言 含有CASIA-FaceV5亚洲人脸数据集&#xff0c;以该数据集作为测试集所生成的同一人和不同人对应的测试文件。 CASIA-FaceV5亚洲人脸数据集有500人、每个人5张图片&#xff0…

【Windows系统】查看和关闭139、445端口的方法

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、 Windows查看139、445端口的方法二、关闭445端口的方法三、关闭139端口的方法 前言 “航天派”公众号上一期文章介绍了“麒麟操作系统查看和关闭139、445端…

【无标题】chatgpt

演示站&#xff1a;https://ai.sybkxx.com/ 测试账号&#xff1a;demo 测试密码&#xff1a;12345678 源码下载&#xff1a;http://yp.1379.cloud:5212/s/WOco 安装方法&#xff1a; 上传程序到服务器 安装php扩展sg11 php支持7.3-7.4 解压程序安装 授权可以联系 你的域名…