文章目录
- 一、什么是RxJava
- 二、使用前的准备
- 1、导入相关依赖
- 2、字段含意
- 3、Upstream/Downstream——上/下游
- 4、BackPressure
- 5、BackPressure策略
- 6、“热” and “冷” Observables
- 7、 基类
- 8、事件调度器
- 9、操作符是什么?
- 三、RxJava的简单用法
- 1、Observable——Observer
- 2、Flowable——Subscriber
- 3、Completable——CompletableObserver
- 4、Maybe——MaybeObserver
- 5、Single——SingleObserver
- **Single的操作符**
- 5.1、just: 创建一个发射单一数据项的 `Single`
- 5.2、error:创建一个发射错误通知的 `Single`
- 5.3 map: 对 `Single` 中的数据进行转换
- 5.4 flatMap: 将一个 `Single` 转换为另一个 `Single`
- 5.5 zip:将多个 `Single` 组合成一个新的 `Single`,并在它们都成功时触发。
- 5.6 Single的转化模式
- 5.6.1 将 Single 转换为 Observable——single.toObservable
- 5.6.2 将 Observable 转换为 Single
- 5.6.3 将 Single转换为 Completable——single.ignoreElement
- 5.6.4 将 Single转换为 Maybe——single.toMaybe
- 四、事件调度器释放事件
- 五、Scheduler——调度者
一、什么是RxJava
ReactiveX 是一个使用可观察序列编写异步和基于事件的程序的库。它扩展了观察者模式以支持数据和/或事件序列,并添加了运算符,允许以声明方式将序列组合在一起,同时抽象出对低级线程、同步、线程安全、并发数据结构和非线程等问题的关注和阻塞 I/O。
官网链接:ReactiveX
什么是响应式编程?
响应式编程是一种编程范式,旨在处理异步数据流和事件驱动的编程。它着重于数据流和变化的处理,使得在异步和事件驱动环境中更容易构建可维护
、可伸缩
和高响应
的应用程序。
- 数据流: 响应式编程关注数据流,将数据视为一系列事件或变化。
- 响应性: 响应式编程强调应用程序对事件和数据的即时响应能力。它允许应用程序根据数据流中的事件来触发操作,而不是等待数据的拉取或轮询。
- 观察者模式: 响应式编程经常使用观察者模式,其中存在一个可观察对象(Observable)和一个或多个观察者(Observer)。可观察对象发出事件,观察者订阅并对这些事件作出反应。
- 流式操作: 响应式编程提供了一组丰富的操作符,用于处理、过滤、转换和合并数据流。这些操作符允许开发人员以声明性方式构建数据流处理管道。
- 背压处理: 响应式编程处理异步数据流时,考虑了背压问题,即生产者产生数据的速度大于消费者处理数据的速度。它提供了一些机制来处理背压,如缓冲、丢弃、错误处理等。
- 异步性: 在响应式编程中,大部分操作都是异步执行的,这有助于避免应用程序的阻塞,提高性能和响应能力。
RxJava的观察者模式
RxJava有四个基本概念:Observer
(观察者),Observable
(被观察者),subscribe
(订阅),事件
。
Observer
和Observable
通过subscribe()
实现订阅关系,从而Observable
可以在需要的时候发出事件通知Observer。
- Observer: 观察者,它决定事件发生时有怎么样的行为;
- Observable: 被观察者,它决定什么时候出发事件以及触发什么样的事件;
- subscribe:订阅,将Observer和Observable关联起来
二、使用前的准备
1、导入相关依赖
最新依赖地址:Github-RxJava/RxAndroid
implementation "io.reactivex.rxjava3:rxjava:3.1.8"
2、字段含意
Reactive
:根据上下文一般翻译为反应式、响应式。
Iterable
:可迭代对象,支持以迭代器的形式遍历。
Observable
: 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者。
Observer
:观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现。
emit
: 含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,翻译为发射。
items
: 在Rx里是指Observable发射的数据项。
3、Upstream/Downstream——上/下游
在响应式编程中,"上游"和"下游"通常用于描述数据流的生产者和消费者之间的关系。
- 上游(Upstream):上游是数据流的生产者,它生成和发出数据项。上游通常是源(例如,传感器、数据库查询、文件读取等),它们生成数据并将其传递给下游。
- 下游(Downstream):下游是数据流的消费者,它接收和处理来自上游的数据项。下游可以执行各种操作,如过滤、映射、转换、订阅等。它们通常是应用程序中的组件,用于处理和响应来自上游的数据。
在响应式编程中,数据通过流动的方式从上游传递到下游,这是一种异步的、非阻塞的方式。
上游和下游之间的通信通常是通过观察者模式或发布-订阅模式进行的,以实现数据的异步传递和处理。这种方式使得可以构建高效的、响应式的应用程序,能够处理异步数据流。
4、BackPressure
BackPressure直译为:背压,也叫做反压。
背压(Backpressure)是指在异步编程中,当生产者(Producer)生成数据的速度快于消费者(Consumer)处理数据的速度时,数据压力会在系统中积累,可能导致一些问题,如内存溢出或性能下降。
在RxJava中也就是被观察者(Observable)发送事件的速度快于观察者(Observer)的速度。
背压问题通常出现在处理数据流的情况下,其中数据生产速度不受消费速度的限制。
5、BackPressure策略
MISSING
:缺省设置,不做任何操作,而不进行任何缓冲或丢弃。ERROR
: 当订阅者无法处理来自发布者的数据时,会引发MissingBackpressureException
异常,表示出现了背压问题。BUFFER
:当订阅者无法处理来自发布者的数据时,数据会被缓冲在内存中,直到订阅者可以处理它们。DROP
: 把存不下的事件丢弃。LATEST
:只保留最新的数据项,丢弃之前的数据。
6、“热” and “冷” Observables
Observable 何时开始发出其items
?这取决于Observable
。一个“热”Observable 可能会在创建后立即开始发射items
,因此任何后续订阅该 Observable 的观察者都可能会开始观察中间某个位置的序列。另一方面,“冷”Observable 会等到观察者订阅它之后才开始发射items
,因此这个观察者可以确保会收到整个数据序列。
7、 基类
RxJava 3 中的基类相比RxJava 2 没啥改变,主要有以下几个基类:
-
io.reactivex.Observable:发送0个/N个的数据,不支持BackPressure,有
onNext
和onComplete
-
io.reactivex.Flowable:发送0个/N个的数据,支持Reactive-Streams和支持BackPressure,有
onNext
和onComplete
-
io.reactivex.Single:只能发送单个数据或者一个错误,有
onSuccess
。 -
io.reactivex.Completable:没有发送任何数据,但只处理 onComplete 和 onError 事件。有
onComplete
-
io.reactivex.Maybe:能够发射0或者1个数据,要么成功,要么失败。有
onSuccess
和onComplete
8、事件调度器
RxJava事件发出去并不是置之不顾,要有合理的管理者来管理它们,在合适的时机要进行释放事件,这样才不会导致内存泄漏,这里的管理者我们称为事件调度器CompositeDisposable
。
9、操作符是什么?
RxJava 提供了各种操作符,用于对观察序列进行转换、过滤、组合和处理。这些操作符可帮助你更灵活地处理异步数据流。
常见的操作符有:create、just、error、map、flatMap。在后面介绍Single的时候会简单的介绍。更多关于操作符的使用在下一篇博客这里简单了解概念就行了。
三、RxJava的简单用法
**RxJava以观察者模式为骨架,**有两种常见的观察者模式:
- Observable(被观察者)/Observer(观察者)
- Flowable(被观察者)/Subscriber(观察者)
使用流程:
- 创建被观察者
- 创建观察者
- 订阅被观察者
- 取消订阅(这一步可以省略)
1、Observable——Observer
一般用法:
//创建被观察者/事件源
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {emitter.onNext("a");emitter.onNext("b");emitter.onNext("c");emitter.onComplete();}
});//创建观察者
Observer observer = new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.e("TAG", "onSubscribe == 订阅");}@Overridepublic void onNext(@NonNull String s) {Log.e("TAG", "onNext == " + s);}@Overridepublic void onError(@NonNull Throwable e) {Log.e("TAG", "onError == " + e.toString());}@Overridepublic void onComplete() {Log.e("TAG", "onComplete");}
};//订阅(观察者监视被观察着)
observable.subscribe(observer);//取消订阅
observable.distinct();
这种观察者模型不支持背压:当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException
。
消耗内存过大只会OOM
。所以,当我们使用Observable——Observer的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线作为参考)。
并且观察者具有多个重载方法:
//观察者不对被观察者发送的事件做出响应(但是被观察者还可以继续发送事件)public final Disposable subscribe()//观察者对被观察者发送的任何事件都做出响应public final void subscribe(Observer<? super T> observer)//表示观察者只对被观察者发送的Next事件做出响应public final Disposable subscribe(Consumer<? super T> onNext)//表示观察者只对被观察者发送的Next & Error事件做出响应public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)//表示观察者只对被观察者发送的Next & Error & Complete事件做出响应public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)//表示观察者只对被观察者发送的Next & Error & Complete & onSubscribe事件做出响应public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)
2、Flowable——Subscriber
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Throwable {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onNext(4);emitter.onComplete();}
}, BackpressureStrategy.BUFFER);Subscriber<Integer> subscriber = new Subscriber<Integer>() {Subscription sub;@Overridepublic void onSubscribe(Subscription s) {Log.w("TAG", "onsubscribe start");sub = s;s.request(1);Log.w("TAG", "onsubscribe end");}@Overridepublic void onNext(Integer integer) {Log.e("TAG", "onNext == " + integer);sub.request(1);}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {Log.e("TAG", "onComplete");}
};flowable.subscribe(subscriber);
Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)
来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。
Flowable使用create()创建时,必须指定BackPressure策略。
注意
尽可能确保在request()之前已经完成了所有的初始化工作,否则就有空指针的风险。
3、Completable——CompletableObserver
它只有onComplete和onError两个事件
//被观察者
Completable completable = Completable.create(new CompletableOnSubscribe() {@Overridepublic void subscribe(@NonNull CompletableEmitter emitter) throws Throwable {emitter.onComplete();}
});//订阅观察者
completable.subscribe(new CompletableObserver() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onComplete() {Log.e("TAG","onComplete");}@Overridepublic void onError(@NonNull Throwable e) {}
});
要转换成其他类型的被观察者,也是可以使用toFlowable()
、toObservable()
等方法去转换。
4、Maybe——MaybeObserver
如果你的需求是可能发送一个数据或者不会发送任何数据,这时候你就需要Maybe,它类似于Single
和Completable
的混合体。
//被观察者Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull MaybeEmitter<String> emitter) throws Throwable {emitter.onSuccess("have Data"); //发送一个数据的情况
// emitter.onComplete(); //不发送数据的情况}});//订阅观察者maybe.subscribe(new MaybeObserver<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onSuccess(@NonNull String s) {Log.e("TAG",s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {Log.e("TAG","无数据");}});
5、Single——SingleObserver
Single类似于Observable,不同的是,它总是只发射一个值,而不是发射一系列的值(并不存在MissingBackpressureException
问题),所以当你使用一个单一连续事件流,这样可以使用Single。
Single观察者只包含两个事件,一个是正常处理成功的onSuccess,另一个是处理失败的onError。
Single<String> stringSingle = Single.create(new SingleOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull SingleEmitter<String> emitter) throws Throwable {emitter.onSuccess("success1");emitter.onSuccess("success2");}
});stringSingle.subscribe(new SingleObserver<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.e("TAG", "onSubscribe: "+d);}@Overridepublic void onSuccess(@NonNull String s) {Log.e("TAG", "onSuccess: "+s);}@Overridepublic void onError(@NonNull Throwable e) {e.printStackTrace();}
});
可以看见数据只会发送一次,Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。
Single
类型的操作符用于处理这些单一的数据项或错误。
Single的操作符
更多关于操作符的使用在下一篇博客这里简单了解概念就行了。
操作符 | 返回值 | 说明 |
---|---|---|
compose | Single | 创建一个自定义的操作符 |
concat and concatWith | Observable | 连接多个 Single 和 Observable 发射的数据 |
create | Single | 调用观察者的 create 方法创建一个 Single |
error | Single | 返回一个立即给订阅者发射错误通知的 Single |
flatMap | Single | 返回一个 Single,它发射对原 Single 的数据执行 flatMap 操作后的结果 |
flatMapObservable | Observable | 返回一个 Observable,它发射对原 Single 的数据执行 flatMap 操作后的结果 |
from | Single | 将 Future 转换成 Single |
just | Single | 返回一个发射一个指定值的 Single |
map | Single | 返回一个 Single,它发射对原 Single 的数据执行 map 操作后的结果 |
merge | Single | 将一个 Single(它发射的数据是另一个 Single,假设为 B)转换成另一个 Single(它发射来自另一个 Single(B) 的数据) |
merge and mergeWith | Observable | 合并发射来自多个 Single 的数据 |
observeOn | Single | 指示 Single 在指定的调度程序上调用订阅者的方法 |
onErrorReturn | Single | 将一个发射错误通知的 Single 转换成一个发射指定数据项的 Single |
subscribeOn | Single | 指示 Single 在指定的调度程序上执行操作 |
timeout | Single | 它给原有的 Single 添加超时控制,如果超时了就发射一个错误通知 |
toSingle | Single | 将一个发射单个值的 Observable 转换为一个 Single |
zip and zipWith | Single | 将多个 Single 转换为一个,后者发射的数据是对前者应用一个函数后的结果 |
用法示例:
5.1、just: 创建一个发射单一数据项的 Single
Single<Integer> single = Single.just(42);
5.2、error:创建一个发射错误通知的 Single
Single<String> single = Single.error(new RuntimeException("Something went wrong"));
5.3 map: 对 Single
中的数据进行转换
Single<Integer> source = Single.just(5);Single<String> mapped = source.map(new Function<Integer, String>() {@Overridepublic String apply(Integer integer) throws Throwable {return "integer : "+integer;}});
这意味着原始整数数据 5
经过映射操作转变为了字符串数据,带有特定的前缀。
这种操作在响应式编程中非常有用,因为它允许你对数据进行转换和处理,而不改变数据流的类型。你可以将原始数据映射为需要的格式,以满足应用程序的需求。
5.4 flatMap: 将一个 Single
转换为另一个 Single
Single<Integer> source = Single.just(5);Single<String> mapped = source.flatMap(new Function<Integer, SingleSource<? extends String>>() {@Overridepublic SingleSource<? extends String> apply(Integer integer) throws Throwable {return Single.just("Return : "+integer);}});
5.5 zip:将多个 Single
组合成一个新的 Single
,并在它们都成功时触发。
Single<Integer> source = Single.just(5);
Single<String> mapped = source.flatMap(new Function<Integer, SingleSource<? extends String>>() {@Overridepublic SingleSource<? extends String> apply(Integer integer) throws Throwable {return Single.just("Return : " + integer);}
});Single single = Single.zip(source, mapped, new BiFunction<Integer, String, Object>() {@Overridepublic Object apply(Integer integer, String s) throws Throwable {return "Return : " + integer + s;}
});
5.6 Single的转化模式
5.6.1 将 Single 转换为 Observable——single.toObservable
Single<Integer> source = Single.just(5);// 将 Single 转换为 Observable
Observable<Integer> observable = source.toObservable();// 现在你可以将 Single 的结果集成到 Observable 中
observable.subscribe(value -> Log.e("TAG","Received value: " + value),error -> Log.e("TAG","Error: " + error),() -> Log.e("TAG","Completed")
);
5.6.2 将 Observable 转换为 Single
Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);// 将 Observable 转换为 Single,只发射第一个数据项或错误
Single<Integer> single = observable.first(0);// 现在你可以将 Observable 的结果集成到 Single 中
single.subscribe(value -> System.out.println("Received value: " + value),error -> System.err.println("Error: " + error)
);
5.6.3 将 Single转换为 Completable——single.ignoreElement
Single<Integer> single = Single.just(42);// 将 Single 转换为 Completable,忽略结果,只关注完成或错误
Completable completable = single.ignoreElement();// 现在你可以使用 Completable 来执行某些操作
completable.subscribe(() -> System.out.println("Completed"),error -> System.err.println("Error: " + error)
);
5.6.4 将 Single转换为 Maybe——single.toMaybe
Single<Integer> single = Single.just(42);// 将 Single 转换为 Maybe,考虑成功结果、错误或没有结果
Maybe<Integer> maybe = single.toMaybe();// 现在你可以使用 Maybe 来处理这三种情况
maybe.subscribe(value -> System.out.println("Received value: " + value),error -> System.err.println("Error: " + error),() -> System.out.println("No result")
);
四、事件调度器释放事件
-
Disposable:
Disposable
是 RxJava 的通用接口,用于表示订阅关系。- 它与所有的 RxJava 数据类型都相关,包括
Observable
、Flowable
、Single
、Completable
和Maybe
。 - 当你订阅一个数据流时,RxJava 会返回一个
Disposable
对象,你可以使用它来取消订阅或检查订阅状态。
// 创建一个简单的 Observable,发射一些数据Observable stringObservable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {emitter.onNext("Xiyou");emitter.onNext("3G");emitter.onNext("Android");emitter.onComplete();}});// 订阅 Observable 并获取 Disposable 对象Disposable disposable = stringObservable.subscribe(value -> Log.e("TAG", value.toString()),error -> Log.e("TAG", "ERROR" + error),() -> Log.e("TAG", "Completed"));disposable.dispose(); //在需要的时候取消订阅
-
CompositeDisposable:
CompositeDisposable
是Disposable
接口的实现。- 它特别用于管理多个订阅关系,以便一次性取消多个订阅。
CompositeDisposable
可以添加多个Disposable
对象,并在需要时一次性取消它们。- 这在管理多个订阅关系时非常有用,例如在 Android 中管理多个异步任务的订阅。
//创建一个简单的 Observable,发射一些数据Observable stringObservable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {emitter.onNext("Xiyou");emitter.onNext("3G");emitter.onNext("Android");emitter.onComplete();}});// 创建一个 CompositeDisposable 来管理订阅关系CompositeDisposable compositeDisposable = new CompositeDisposable();// 订阅 Observable 并获取 Disposable 对象Disposable disposable = stringObservable.subscribe(value -> Log.e("TAG", value.toString()),error -> Log.e("TAG", "ERROR" + error),() -> Log.e("TAG", "Completed"));// 将 Disposable 对象添加到 CompositeDisposable 中compositeDisposable.add(disposable);// 在不再需要订阅关系时,可以取消它们// compositeDisposable.clear(); // 取消所有订阅// 或者单独取消某个订阅// disposable.dispose();// 在不再需要 CompositeDisposable 时,清理它compositeDisposable.dispose();
CompositeDisposable提供的方法中,都是对事件的管理
- dispose():释放所有事件
- clear():释放所有事件,实现同dispose()
- add():增加某个事件
- addAll():增加所有事件
- remove():移除某个事件并释放
- delete():移除某个事件
五、Scheduler——调度者
在RxJava默认规则中,事件的发出和消费都是在同一个线程中发生的,那么上面的这些例子来说,就是一个同步的观察者模式。
在RxJava中Scheduler(调度器)相当于线程控制器,RxJava通过Scheduler来指定那一部分代码执行在哪一个线程。我们来看看简单的例子:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("RxJava:e.onNext== 第一次");emitter.onComplete();Log.d("TAG", "subscribe()线程==" + Thread.currentThread().getId());}}).subscribeOn(Schedulers.io())//指定被观察者subscribe()(发射事件的线程)在IO线程().observeOn(AndroidSchedulers.mainThread());//指定观察者接收响应事件的线程在主线程observable.subscribe(new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull String s) {// 接收到数据时的回调,s 是传递的数据Log.d("TAG", "Received data: " + s);Log.d("TAG", "onNext()线程==" + Thread.currentThread().getId());}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});
- subscribeOn():用于指定
Observable
被观察者subscribe()
时所发生的线程,即指定发生事件的线程 - observeOn():指定
Observer
观察者接收&响应事件的线程,即观察者接收事件的线程
注意:多次指定发射事件的线程只有第一次指定有效,也就是说多次调用subscribeOn()只有第一次有效,其余的会被忽略;但是多次指定订阅者接收事件的线程是可以的,也就是说每observeOn()一次,接收事件的线程就会切换一次。
- Schedulers.io():代表IO操作的线程,通常用于网络、读写文件等IO密集型的操作。行为模式和new Thread()差不多,只是IO的内部是一个无上限的线程池,可重用空闲的线程,更高效(不要把计算工作放在IO内,可以避免创建不必要的线程)
- AndroidSchedulers.mainThread():Android的主线程;用于更新UI
- Schedulers.newThread():总是启用新线程,并在新线程中执行操作;多用于耗时操作
- Schedulers.computation(): 代表CPU计算密集型的操作,即不会被IO等操作限制性能的操作。