RxJava学习记录

文章目录

  • 1. 总览
    • 1.1 基本原理
    • 1.2 导入包和依赖
  • 2. 操作符
    • 2.1 创建操作符
    • 2.2 转换操作符
    • 2.3 组合操作符
    • 2.4 功能操作符

1. 总览

1.1 基本原理

在这里插入图片描述
参考文献
在这里插入图片描述
构建流:每一步操作都会生成一个新的Observable节点(没错,包括ObserveOn和SubscribeOn线程变换操作),并将新生成的Observable返回,直到最后一步执行subscribe方法。编写Rxjava代码的过程其实就是构建一个一个Observable节点的过程
订阅流:从最后一个N5节点的订阅行为开始,依次执行前面各个节点真正的订阅方法。在每个节点的订阅方法中,都会生成一个新的Observer**,这个Observer会包含“下游”的Observer,这样当每个节点都执行完订阅(subscribeActual)后,也就生成了一串Observer,它们通过downstream,upstream引用连接
回调流: 当订阅流执行到最后,也就是第一个节点N0时,用onNext方法,两个作用,一个是把上个节点返回的数据进行一次map变换,另一个就是将map后的结果传递给下游。
小结:先从上到下把各个变换的Observable连成链(拼装流水线),然后在最后subscribe的时候,又从下到上通过每个Observable的OnSubscribe从最下的Subscriber对象开始连成链(流水线开始工作包装Subscriber),直到顶端,当顶端的Subscriber对象调用了onNext方法的时候,又从上往下调用Subscriber链的onNext(用户一层层拆开包装盒),里面执行了每个操作的变换逻辑。

1.2 导入包和依赖

implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

2. 操作符

添加链接描述

2.1 创建操作符

  • Create
    private void test1() {//被观察者Observable;观察者Observer/消费者consumer;通过subsribe订阅Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {emitter.onNext("1");
//                emitter.onError(new Throwable("异常模拟"));emitter.onComplete();}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("subscribe");}@Overridepublic void onNext(Object o) {System.out.println("onNext Observer " + o);}@Overridepublic void onError(Throwable e) {System.out.println("erro");}@Overridepublic void onComplete() {System.out.println("Complete Observer....");}});}private void test2() {Disposable d = Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {emitter.onNext("2");emitter.onError(new Throwable("模拟异常"));emitter.onComplete();}}).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("Accept " + o);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("Accept " + throwable);}});}

Observer:
适合需要完整事件处理的场景,包括处理数据、错误和完成信号。
提供了更灵活的事件处理能力,可以根据需求实现对错误和完成事件的响应。
Consumer:
适合简单的场景,只需处理每个发出的数据项,而不需要关心错误或完成事件。
简化了代码结构,特别是在处理简单流时,使用起来更为便捷和直观。

  • 其他
    just 10个发射源
    from 将一个Iterable、一个Future、 或者一个数组,内部通过代理的方式转换成一个Observable
    interval操作符 创建一个按固定时间间隔发射整数序列的Observable,这个序列为一个无限递增的整数序列
    range操作符 发射一个范围内的有序整数序列,并且我们可以指定范围的起始和长度
    repeat操作符 重复发射原始Observable的数据序列,这个序列或者是无限的,或者通过repeat(n)指定重复次数

2.2 转换操作符

map
将源Observable发送的数据转换为一个新的Observable对象

    private void test3(){Observable.just("111").map(new Function<String, Object>() {@Overridepublic Object apply(String s) throws Exception {return "my name is " + s;}}).subscribe(ob);}//subscribe
//onNext Observer my name is 111
//Complete Observer....

flatmap
添加链接描述
在这里插入图片描述
将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里(但是是无序的)

    private void test4(){Disposable ob = Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);}}).flatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(Integer o) throws Exception {final List<String> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add("I am value " + o);}return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//为了无序 加了延迟}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String o) throws Exception {System.out.println(o);}});}//出现的 1 2 3会随机出现

concatMap
concatMap操作符类似于flatMap操作符,不同的一点是它按次序连接。

2.3 组合操作符

concat
concatArray
合并多个对象,按照一定的顺序
在这里插入图片描述
merge
在这里插入图片描述

2.4 功能操作符

SubscribeOn 改变调用它之前代码的线程,只有第一次有效
ObserveOn 改变调用它之后代码的线程, 可以多次调用

        Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {Log.d(TAG,"加了subscribeOn和observeOn: " + Thread.currentThread().getName());emitter.onNext("1111");emitter.onNext("22222");emitter.onComplete();}}).subscribeOn(Schedulers.newThread()) //1 进行创建和发射在子线程.observeOn(AndroidSchedulers.mainThread())// 2 在主线程消费;由于程序是test里面执行,所以不是main线程;后续改成了main是一样的道理.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG,"onSubscribe " + Thread.currentThread().getName());}@Overridepublic void onNext(Object o) {Log.d(TAG,"onNext " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {Log.d(TAG,"onError " + Thread.currentThread().getName());}@Overridepublic void onComplete() {Log.d(TAG,"onComplete " + Thread.currentThread().getName());}});}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
这一个onSubsribe 一直是在测试线程里


1. **Observable 的创建和订阅**:- 在 `subscribe()` 方法中,你创建了一个 `Observer` 对象,并将其订阅到了 `Observable` 对象上。2. **onSubscribe 方法执行**:- 当 `subscribe()` 方法被调用后,`Observer` 对象的 `onSubscribe` 方法会立即执行。这是因为 `onSubscribe` 是 `Observer` 接口的一部分,它负责接收 `Disposable` 对象,表示订阅关系,而不是响应数据流本身。3. **异步操作执行**:- 然后,`Observable` 中的异步操作开始执行。在你的例子中,通过 `Observable.create()` 创建了一个新的数据流,该数据流会在新线程(通过 `subscribeOn(Schedulers.newThread())` 指定的线程)中执行。这意味着 `Observable.create()` 中的代码块会在新线程中运行,而不会阻塞主线程。4. **数据流发射和消费**:- 在新线程中,`ObservableEmitter` 会发射数据项(通过 `emitter.onNext()` 发送数据)并在合适的时机调用 `onComplete()` 或者 `onError()`,表示数据流的结束。5. **observeOn 切换到主线程**:- 通过 `observeOn(AndroidSchedulers.mainThread())`,确保在数据流中的消费者部分(即 `Observer` 的 `onNext()`, `onError()`, `onComplete()` 方法)在主线程中执行。这个切换保证了在主线程更新UI或处理数据,从而避免了在主线程中执行耗时操作而导致的UI阻塞问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/372308.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【零基础】学JS之APIS(基于黑马)

喝下这碗鸡汤 披盔戴甲,一路勇往直前! 1. 什么是事件 事件是在编程时系统内发生的动作或者发生的事情 比如用户在网页上单击一个按钮 2. 什么是事件监听? 就是让程序检测是否有事件产生&#xff0c;一旦有事件触发&#xff0c;就立即调用一个函数做出响应&#xff0c;也称为 注…

详解Java垃圾回收(GC)机制

一、为什么需要垃圾回收 如果不进行垃圾回收&#xff0c;内存迟早都会被消耗空&#xff0c;因为我们在不断的分配内存空间而不进行回收。除非内存无限大&#xff0c;我们可以任性的分配而不回收&#xff0c;但是事实并非如此。所以&#xff0c;垃圾回收是必须的。 二、哪些内…

【Selenium配置】WebDriver安装浏览器驱动(ChromeEdge)

【Selenium配置】WebDriver安装浏览器驱动&#xff08;Chrome&Edge&#xff09; 文章目录 【Selenium配置】WebDriver安装浏览器驱动&#xff08;Chrome&Edge&#xff09;Chrome确认Chrome版本下载对应driver把解压后的chromedriver文件放在chrome安装目录下&#xff0…

深入讲解C++基础知识(三)

目录 一、命名空间1. 创建命名空间2. 访问命名空间2.1 using 编译指令2.2 using 声明2.3 直接使用全名 3. 嵌套命名空间4. 匿名命名空间5. 命名空间的注意事项5.1 头文件中不应该包含 using 声明和 using 编译指令。5.2 最好使用 using 声明而不是 using 编译指令 二、标准库类…

极客天成RDMA分布式存储加速自动驾驶仿真建模

01 自动驾驶汽车行业发展现状 随着全球5G技术的应用与发展&#xff0c;人工智能产业的逐步推进&#xff0c;无人驾驶汽车行业市场规模显著增长。中商产业研究院发布的《2024-2029全球与中国无人驾驶列车系统市场现状及未来发展趋势》显示&#xff0c;2023年全球无人驾驶汽车行…

Redis 7.x 系列【16】持久化机制之 AOF

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 概述2. 执行原理2.1 Redis 6.x2.1.1 直接写2.1.2 重写 2.2 Redis 7.x2.2.1 直接写2…

JAVA基础-----128陷阱

一、何为128陷阱 Java中Integer类型在使用比较时的特殊行为------128陷阱&#xff0c;解释了当数值在-128到127范围内&#xff0c;由于valueOf方法的缓存机制导致地址相同&#xff0c;比较为真&#xff1b;超出这个范围则新分配内存&#xff0c;地址不同&#xff0c;比较为假。…

抖音矩阵云剪系统saas源码 短视频矩阵获客管理系统

2024抖音矩阵云混剪系统是一款专业的短视频营销管理工具。该系统支持多平台多账号的集中式管理&#xff0c;并实现一键式作品发布功能。它配备了智能标题生成和关键词优化工具&#xff0c;以及排名查询机制&#xff0c;帮助用户提升内容在平台上更好的矩阵管理. 智能剪辑 托管发…

Vue框架引入

vue简介 1.1.vue是什么?Vue官网 英文官网: https://vuejs.org/中文官网: https://cn.vuejs.org/ vue是一套构建用户界面的渐进式javascript框架 构建用户界面:将我们手里拿到的数据通过某种办法变成用户可以看见的界面前端工程师的职责:就是在合适的时候发出合适的请求,然后…

树莓派_Pytorch学习笔记20:初步认识深度学习框架

今日继续学习树莓派4B 4G&#xff1a;&#xff08;Raspberry Pi&#xff0c;简称RPi或RasPi&#xff09; 本人所用树莓派4B 装载的系统与版本如下: 版本可用命令 (lsb_release -a) 查询: ​ Python 版本3.7.3&#xff1a; ​ 本文很水&#xff0c;就介绍一下我以后的学习使用P…

R包:ggsci期刊配色

介绍 不同期刊配色大多数时候不一样&#xff0c;为了更好符合期刊图片颜色的配色&#xff0c;有人开发了ggsci这个R包。它提供以下函数&#xff1a; scale_color_palname() scale_fill_palname() 对应不同期刊的color和fill函数。 导入数据R包 library("ggsci")…

vue + element ui 实现侧边栏导航栏折叠收起

首页布局如下 要求点击按钮,将侧边栏收缩, 通过 row 和 col 组件&#xff0c;并通过 col 组件的 span 属性我们就可以自由地组合布局。 折叠前 折叠后 <template><div class"app-layout" :class"{ collapse: app.isFold }"><div class&…

CANoe的capl调用Qt制作的dll

闲谈 因为Qt封装了很多个人感觉很好用的库&#xff0c;所以一直想通过CAPL去调用Qt实现一些功能&#xff0c;但是一直没机会&#xff08;网络上也没看到这方面的教程&#xff09;&#xff0c;这次自己用了两天&#xff0c;踩了很多坑&#xff0c;终于是做成了一个初步的调用方…

Zabbix分布式监控

目录 分布式监控架构 实现分布式监控的步骤 优点和应用场景 安装Zabbix_Proxy Server端Web页面配置 测试 Zabbix 的分布式监控架构允许在大规模和地理上分散的环境中进行高效的监控。通过分布式监控&#xff0c;Zabbix 可以扩展其监控能力&#xff0c;支持大量主机和设备…

汇川CodeSysPLC教程 Modbus变量编址

线圈&#xff1a;位变量&#xff0c;只有两种状态0和1。汇川PLC中包含Q区及SM区等变量。 寄存器&#xff1a;16位&#xff08;字&#xff09;变量&#xff0c;本PLC中包含M区及SD区等变量 说明&#xff1a; 汇川HMI的专用协议使用不同功能码&#xff1a;在访问SM时&#xff0c…

Linux搭建hive手册

一、将hive安装包上传到NameNode节点并解压 1、删除安装MySQL时的.rpm文件 cd /opt/install_packages/ rm -rf *.rpm 2、将安装包拖进/install_packages目录 3、解压安装包 tar -zxvf apache-hive-3.1.2-bin.tar.gz -C /opt/softs/ 4、修改包名 cd /opt/softs mv apache-…

如何在应用运行时定期监控内存使用情况

如何在应用运行时定期监控内存使用情况 在 iOS 应用开发中&#xff0c;实时监控内存使用情况对于优化性能和排查内存泄漏等问题非常重要。本文将介绍如何在应用运行时定期监控内存使用情况&#xff0c;使用 Swift 编写代码并结合必要的工具和库。 1. 创建桥接头文件 首先&…

PyCharm如何安装requirements.txt中的依赖包

问题&#xff1a;下载别人的源码&#xff0c;如何安装代码中requirement.txt中的依赖包。 解决方案&#xff1a; &#xff08;1&#xff09;打开PyCharm下面的Terminal&#xff0c;先为代码创建单独的虚拟环境并进入到虚拟环境中&#xff08;每个项目单独的环境&#xff0c;这…

LLM- 注意力机制

一&#xff1a;什么是注意力机制&#xff0c;以及产生背景&#xff1f; &#xff08;1&#xff09;&#xff1a;RNN模型[RNN模型]的缺点&#xff1a;下图是例如RNN模型解决机器翻译的例子&#xff0c;从这个例子可以看到Encoder最后一个向量&#xff08;eos&#xff09;送给了…

bdeaver mysql忘记localhost密码修改密码添加用户

描述 bdeaver可以连接当前的localhost数据库&#xff0c;但不知道数据库密码是什么。用这个再建一个用户&#xff0c;用来连接数据库 解决 1、在当前的数据库localhost右键&#xff0c;创建-用户 设置这个用户&#xff0c;密码 加权限 2、连接 用新的账号密码去连接&#x…