Kotlin Flow响应式编程,操作符函数进阶

本文同步发表于我的微信公众号,扫一扫文章底部的二维码或在微信搜索 郭霖 即可关注,每个工作日都有文章更新。

大家好,今天原创。

在上一篇原创文章当中,我跟大家说了会开启一个新的系列,讲一讲Kotlin Flow响应式编程从入门到进阶的内容。

总共计划是用三篇文章讲完,而本篇则是这个系列的第二篇文章。如果你还没有看过前面的基础知识入门的话,可以先去参考这里 Kotlin Flow响应式编程,基础知识入门 。

本篇文章我打算着重讲解一下操作符函数的相关内容。什么是操作符函数?如果你熟悉RxJava,那么对于操作符函数一定不会陌生。如果你不熟悉RxJava,那么操作符函数就是那个让RxJava如此难学的元凶。

准确来说,RxJava的操作符函数不是难,而是多。之前Google在刚推出自家LiveData时也曾调侃过,我们的操作符函数只有两个,可不像某些库有上百万个那么多。

我相信应该没有任何一个人能够熟练掌握RxJava的所有操作符函数,这一点越是RxJava的老手应该越是深有体会。正确的做法是,只去熟记那些最常用的操作符函数即可,剩下绝大多数的操作符函数都是不太能用得到的,或者需要用到时再去查阅文档学习即可。

那么Kotlin Flow的操作符函数也是类似的,虽然它没有RxJava那么多,但是着实也不少。本篇文章我会尽可能将最常用的操作符函数全部覆盖到,那么学完本篇文章之后,在绝大部分Kotlin Flow的操作符函数使用场景中,你应该都可以比较得心应手了。

另外,我对接下来即将介绍的所有操作符函数按照功能相似度以及难易程度进行了分组,会按照先易后难的原则一一进行介绍。

话不多说,开始走起。


0. setup

我们会以全程实践的方式来学习文中所有的操作符函数。因此,将实践环境提前搭建好是必不可少的。

首先创建一个Android项目,并在项目中添加如下依赖库:

dependencies {...implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.1"implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.1"
}

由于Flow以及操作符函数并不依赖于Android项目,因此我们并不必非得在Android项目中来进行实践。简单起见,我们只需要在一个Kotlin类当中来实践本篇文章的内容即可。

创建一个Flow.kt类,并在其中定义一个main()函数,如下所示:

fun main() {runBlocking {}
}

这里我们在main()函数中添加了一个runBlocking代码块,它的作用是提供协程作用域给稍后的Flow使用,并且在代码块中的所有代码执行完之前阻塞当前线程。

因此,我们把后续的所有例子都放到runBlocking代码块中去执行就可以了。

另外当你定义好了main()函数之后,你会发现它的左边会出现一个运行箭头:

在这里插入图片描述

只要点击这个箭头就可以执行main()函数中的代码了。

好了,实践环境到这里已经搭建完成,接下来正式开始学习操作符函数吧。


1. map

刚才我们有说会按照先易后难的原则进行学习,那么毫无疑问,map一定是最容易的操作符函数了。

在很多编程语言里面都有内置的map函数,甚至Kotlin自己就有。RxJava中也有map这个操作符函数,所以我们在Flow中第一个介绍它简直就是理所应当的事情。

那么顾名思义,map就是用于将一个值映射成另一个值,具体映射的规则我们则可以在map函数中自行定义。

这里我通过一个简单的例子就能带大家快速理解map函数,这种简单的操作符函数没必要花费太多时间。

fun main() {runBlocking {val flow = flowOf(1, 2, 3, 4, 5)flow.map {it * it}.collect {println(it)}}
}

首先,我们通过flowOf函数构造了一个flow对象,里面依次发送了1, 2, 3, 4, 5这几个值。

那么如果直接对这个flow去collect,我们理所应当打印出来的也是1, 2, 3, 4, 5这几个值。

但是这里在collect之前,我们调用了map操作符函数,并在里面做了一下平方运算。因此collect之后的结果就会变成1, 4, 9, 16, 25。

验证一下,打印结果如下图所示:

在这里插入图片描述

这就是map操作符函数,非常简单,相信你一下子就已经理解了。


2. filter

filter也是一个非常简单的操作符函数。顾名思义,它是用来过滤掉一些数据的。

Flow当中的操作符函数既可以单独使用,也可以结合其他操作符函数一起使用。

这里我们通过结合filter和map这两个操作符函数,来快速演示一下用法,你就立刻能掌握了。

fun main() {runBlocking {val flow = flowOf(1, 2, 3, 4, 5)flow.filter { it % 2 == 0}.map {it * it}.collect {println(it)}}
}

这里在filter函数中指定了一个条件,判断数据是否是偶数。

因而,flow中的数据只有是偶数的情况下才会继续向下传递,奇数则会被filter函数过滤掉。

验证一下结果,如下图所示:

在这里插入图片描述

非常好理解,现在filter操作符函数你也已经掌握了。


3. onEach

onEach又是一个非常简单的操作符函数,它甚至比map和filter还要简单直白,因为它就是用来遍历每一条数据的。

比如说我们想把flow中的每一条数据打印出来,借助onEach函数就可以这样写:

fun main() {runBlocking {val flow = flowOf(1, 2, 3, 4, 5)flow.onEach {println(it)}.collect {}}
}

可能有的朋友会说,这不是脱了裤子放屁嘛,我在collect函数中就可以把数据打印出来了,还需要借助onEach函数干什么。

确实,但是collect函数中打印出的是最终的结果。如果你想要查看某个中间状态时flow的数据状态,借助onEach就非常有用了。

fun main() {runBlocking {val flow = flowOf(1, 2, 3, 4, 5)flow.filter {it % 2 == 0}.onEach {println(it)}.map {it * it}.collect {}}
}

可以看到,这里我们将onEach函数插入到了filter函数和map函数之间。那么现在onEach所处的就是一个经过了偶数过滤,但是还没有经过乘积运算的一个状态。

验证一下结果,如下图所示:

在这里插入图片描述

没有问题,结果正如我们想象的那样,所有偶数都被打印出来了。


4. debounce

最简单的操作符函数差不多就这些,接下来要学习稍微有些难度的操作符函数了。

debounce函数可以用来确保flow的各项数据之间存在一定的时间间隔,如果是时间点过于临近的数据只会保留最后一条。

这个函数在某些特定场景下特别有用。

想象一下我们正在Edge浏览器的地址栏中输入搜索关键字,浏览器的地址栏下方通常都会给出一些搜索建议。

在这里插入图片描述

这些搜索建议是根据用户当前输入的内容通过发送网络请求去服务器端实时获取的。

但如果用户每输入一个字符就立刻发起一条网络请求,这是非常不合理的设计。

原因很简单,网络请求是不可能做到无延时响应的,而用户的打字速度通常都比较快。如果用户每输入一个字符都立刻发起一条网络请求,那么很有可能用户输完了第3个字符之后,对应第1个字符的网络请求结果才刚刚返回回来,而此时的数据早已是无效数据了。

正确的做法是,我们不应该在用户每输入一个字符时就立刻发起一条网络请求,而是应该在一定时间的停顿之后再发起网络请求。如果用户停顿了一段时间,很有可能说明用户已经结束了快速输入,开始期待一些搜索建议了。这样就能有效地避免发起无效网络请求的情况。

而要实现这种功能,使用debounce操作符函数就会非常简单,它就是为了这种场景而设计的。

还是通过代码来举例演示:

fun main() {runBlocking {flow {emit(1)emit(2)delay(600)emit(3)delay(100)emit(4)delay(100)emit(5)}.debounce(500).collect {println(it)}}
}

可以看到,我们调用了debounce函数,并且传入了500作为参数,意义就是说只有两条数据之间的间隔超过500毫秒才能发送成功。

这里使用emit()函数依次发送了1、2、3、4、5这几条数据。其中,1和2是连续发送的,2和3之间存在600毫秒的间隔,因此2可以发送成功。3和4之间、4和5之间间隔只有100毫秒,因此都无法发送成功。5由于是最后一条数据,因此可以发送成功。

那么打印结果应该是2和5:

在这里插入图片描述

这就是debounce操作符函数的用法了。


5. sample

sample操作符函数和debounce稍微有点类似,它们的用法也比较接近,同样都是接收一个时间参数。

sample是采样的意思,也就是说,它可以从flow的数据流当中按照一定的时间间隔来采样某一条数据。

这个函数在某些源数据量很大,但我们又只需展示少量数据的时候比较有用。

比如说视频网站的弹幕功能,理论上来说每个时间点用户发送的弹幕数量可以是无限多的,但是视频网站又不可能把每条弹幕都展示出来,不然的话弹幕和视频都没法看了。

因此,这个时候就需要对数据进行采样。

我们来模拟一下这种场景,假设某个视频的播放量很大,每时每刻都有无数人在发送弹幕,但是我们每秒钟最多只允许显示1条弹幕,代码就可以这样写:

fun main() {runBlocking {flow {while (true) {emit("发送一条弹幕")}}.sample(1000).flowOn(Dispatchers.IO).collect {println(it)}}
}

这里我们在flow构建函数中写了一个死循环,不断地在发送弹幕,那么这个弹幕的发送量无疑是巨大的。

而接下来我们借助sample函数进行数据采集,每秒钟只取一条弹幕,这样就轻松满足了前面说的弹幕采样率的要求。

另外还有一点需要注意,由于flow是通过死循环不断发送的,我们必须调用flowOn函数,让它在IO线程里去执行,否则我们的主线程就一直被卡死了。

接下来运行一下程序来看看结果吧:

在这里插入图片描述

可以看到,虽然弹幕的发送量无限大,但是我们每秒钟只会打印出一条弹幕,这就是sample操作符函数的用法了。


6. reduce

刚才学习的几个操作符函数最终都还是要通过collect函数来收集结果的。

接下来我们学习两个不需要借助collect函数,自己就能终结整个flow流程的操作符函数,这种操作符函数也被称为终端操作符函数。

首先来看reduce函数。我个人认为reduce函数还是比较好理解的,它的基本公式如下:

flow.reduce { acc, value -> acc + value }

其中acc是累积值的意思,value则是当前值的意思。

也就是说,reduce函数会通过参数给我们一个Flow的累积值和一个Flow的当前值,我们可以在函数体中对它们进行一定的运算,运算的结果会作为下一个累积值继续传递到reduce函数当中。

举一个更加具体点的例子,我们上学时学等差数列都会讲这个故事,高斯的老师让全班同学计算从1加到100的结果。

今天我们不需要借助等差数列,只需要借助reduce函数就可以立刻算出结果了:

fun main() {runBlocking {val result = flow {for (i in (1..100)) {emit(i)}}.reduce { acc, value -> acc + value}println(result)}
}

这里需要注意的是,reduce函数是一个终端操作符函数,它的后面不可以再接其他操作符函数了,而是只能获取最终的运行结果。

那么运算结果毫无疑问是5050:

在这里插入图片描述

7. fold

fold函数和reduce函数基本上是完全类似的,它也是一个终端操作符函数。

主要的区别在于,fold函数需要传入一个初始值,这个初始值会作为首个累积值被传递到fold的函数体当中,它的基本公式如下:

flow.fold(initial) { acc, value -> acc + value }

总体区别就这么多,所以我感觉fold函数并没有什么好讲的,它和reduce函数具体用谁只取决于你编程时的业务逻辑需求。

但是其实reduce函数和fold函数并不是只能用作数值计算,相反它们可以作用于任何类型的数据。因此,这里我就用fold函数来演示一个字符串拼接的功能吧:

fun main() {runBlocking {val result = flow {for (i in ('A'..'Z')) {emit(i.toString())}}.fold("Alphabet: ") { acc, value -> acc + value}println(result)}
}

这里我们将字母A-Z进行了拼接,另外fold函数要求传入一个初始值,那么我们就再添加一个Alphabet的头部,打印结果如下所示:

在这里插入图片描述


8. flatMapConcat

操作符函数到了flatMap系列,难度就开始骤升了。

其实为了搞明白这几个flatMap操作符函数的用法,我也去参考了不少资料,但是基本上没有哪个能够讲得非常简单易懂的,因此我自己也学得很吃力。

但是没办法,本来它们就是有一定难度的操作符函数,再好的文笔也不可能将这些难度全部抹平了。

那么这里我就按照我的方式,用尽可能简单的讲解和代码演示来向大家介绍flatMap系列操作符函数的用法。

这里我一直在说系列,是因为目前一共有3种以flatMap开头的操作符函数,分别是flatMapConcat、flatMapMerge和flatMapLatest。

这3种操作符函数我们都会介绍,先从flatMapConcat开始。

前面我们所学的所有内容都是在一个flow上进行操作,而从flatMap开始,要上升到对两个flow进行操作了。

flatMap的核心,就是将两个flow中的数据进行映射、合并、压平成一个flow,最后再进行输出。

举例讲解可能会更加容易理解一些,观察如下代码:

fun main() {runBlocking {flowOf(1, 2, 3).flatMapConcat {flowOf("a$it", "b$it")}.collect {println(it)}}
}

这里的第一个flow会依次发送1、2、3这几个数据。

然后在flatMapConcat函数中,我们传入了第二个flow。

第二个flow会依次发送a、b这两个数据,但是在a、b的后面又各自拼接了一个it。

这个it就是来自第一个flow中的数据。所以,flow1中的1、2、3会依次与flow2中的a、b进行组合,这样就能组合出a1、b1、a2、b2、a3、b3这样几条数据。

而collect函数最终收集到的就是这些组合后的数据。

验证一下,打印结果如下所示:

在这里插入图片描述

这样我们就弄明白示例中flatMapConcat函数的用法了,但是在实际的业务中,这个函数又有什么具体的应用场景呢?

不知道你有没有遇到过这样的情况,请求一个网络资源时需要依赖于先去请求另外一个网络资源。

比如说我们想要获取用户的数据,但是获取用户数据必须要有token授权信息才行,因此我们得先发起一个请求去获取token信息,然后再发起另一个请求去获取用户数据。

这种两个网络请求之间存在依赖关系的代码其实挺不好写的,稍微一不注意就可能会陷入嵌套地狱:

public void getUserInfo() {sendGetTokenRequest(new Callback() {@Overridepublic void result(String token) {sendGetUserInfoRequest(token, new Callback() {@Overridepublic void result(String userInfo) {// handle with userInfo}});}});
}

可以看出来,网终请求代码由于需要开线程执行,然后在回调中获取结果,通常会嵌套得比较深。

而这个问题我们就可以借助flatMapConcat函数来解决。

假设我们将sendGetTokenRequest()函数和sendGetUserInfoRequest()函数都使用flow的写法进行改造:

fun sendGetTokenRequest(): Flow<String> = flow {// send request to get tokenemit(token)
}fun sendGetUserInfoRequest(token: String): Flow<String> = flow {// send request with token to get user infoemit(userInfo)
}

那么接下来就可以用flatMapConcat函数将它们串连成一条链式执行的任务了:

fun main() {runBlocking {sendGetTokenRequest().flatMapConcat { token ->sendGetUserInfoRequest(token)}.flowOn(Dispatchers.IO).collect { userInfo ->println(userInfo)}}
}

当然,这个用法并不仅限于只能将两个flow串连成一条链式任务,如果你有更多的任务需要串到这同一条链上,只需要不断连缀flatMapConcat即可:

fun main() {runBlocking {flow1.flatMapConcat { flow2 }.flatMapConcat { flow3 }.flatMapConcat { flow4 }.collect { userInfo ->println(userInfo)}}
}

可以看到,这种写法,不管串连多少任务,都可以用完全平级的写法搞定,完全不会遇到之前嵌套地狱的困扰。

不知道我这样讲解flatMapConcat函数,是不是已经比较清楚了?


9. flatMapMerge

理解了flatMapConcat函数,再来看flatMapMerge函数会比较容易一些。

很多人觉得flatMap这几个操作符函数难以理解,其中一个原因就是,不管代码怎么写,flatMapConcat和flatMapMerge的效果好像都是一样的。

没错,如果只是用我们上面学习的代码示例,你会发现不管是用flatMapConcat还是flatMapMerge,最终的结果都是相同的。

这两个函数最主要的区别其实就在字面上。concat是连接的意思,merge是合并的意思。连接一定会保证数据是按照原有的顺序连接起来的,而合并则只保证将数据合并到一起,并不会保证顺序。

因此,flatMapMerge函数的内部是启用并发来进行数据处理的,它不会保证最终结果的顺序。

当然,刚才我们所使用的示例并不能演示出这种场景,下面我来对代码稍微进行一下改造:

fun main() {runBlocking {flowOf(300, 200, 100).flatMapConcat {flow {delay(it.toLong())emit("a$it")emit("b$it")}}.collect {println(it)}}
}

变化主要在于,我将第一个flow发送的数据改成了300、200、100。然后第二个flow中,在发送数据之前,我们要先去delay相对应的毫秒数。

现在运行的结果你觉得会是什么样子呢?

我们来看看吧:

在这里插入图片描述

可以看到,最终的结果仍然是按照flow1中数据发送的顺序输出的,即使第一个数据被delay了300毫秒,后面的数据也没有优先执行权。这就是flatMapConcat函数所代表的涵义。

而到了这里,flatMapMerge函数的区别也就呼之欲出了,它是可以并发着去处理数据的,而并不保证顺序。那么哪条数据被delay的时间更短,它就可以更优先地得到处理。

将flatMapConcat函数替换成flatMapMerge函数,如下所示:

fun main() {runBlocking {flowOf(300, 200, 100).flatMapMerge {flow {delay(it.toLong())emit("a$it")emit("b$it")}}.collect {println(it)}}
}

现在重新运行一下程序:

在这里插入图片描述

这两个操作符函数的区别,相信你已经掌握了吧?


10. flatMapLatest

终于到了flatMap系列的最后一个操作符函数。

掌握了前面两个flatMap函数,再来看flatMapLatest函数就不会觉得很难了。

它的作用和其他两个flatMap函数都是类似的,也是把两个flow合并、压平成一个flow。它的行为,和我们在 Kotlin Flow响应式编程,基础知识入门 这篇文章中学到的collectLatest函数是比较接近的。

因此我们把这几个知识点稍微融合一下就能理解flatMapLatest函数了。

先来回顾一下collectLatest函数的特性,它只接收处理最新的数据。如果有新数据到来了而前一个数据还没有处理完,则会将前一个数据剩余的处理逻辑全部取消。

flatMapLatest函数也是类似的,flow1中的数据传递到flow2中会立刻进行处理,但如果flow1中的下一个数据要发送了,而flow2中上一个数据还没处理完,则会直接将剩余逻辑取消掉,开始处理最新的数据。

我们还是通过一段代码来演示一下吧:

fun main() {runBlocking {flow {emit(1)delay(150)emit(2)delay(50)emit(3)}.flatMapLatest {flow {delay(100)emit("$it")}}.collect {println(it)}}
}

这里我们在flow1中依次发送了1、2、3这几条数据。其中,1和2之间间隔了150毫秒,2和3之间间隔了50毫秒。

而在flow2中,每次处理数据需要消耗100毫秒。

那么由此我们可以分析出,当flow1中的第2条数据发送过来时,flow2中的第1条数据肯定已经处理完了。但是当flow1中的第3条数据发送过来时,flow2中的第2条数据并没有处理完。那么根据collectLatest函数的规则,这条数据的剩余处理逻辑会被取消掉。因此,2不会被打印出来。

最终我们看到的打印结果应该是1和3:

在这里插入图片描述

flatMap操作符函数系列到此结束。


11. zip

和flatMap函数有点类似,zip函数也是作用在两个flow上的。不过,它们的适用场景完全不同。

使用zip连接的两个flow,它们之间是并行的运行关系。这点和flatMap差别很大,因为flatMap的运行方式是一个flow中的数据流向另外一个flow,是串行的关系。

我们先来看一下zip函数的基本用法:

fun main() {runBlocking {val flow1 = flowOf("a", "b", "c")val flow2 = flowOf(1, 2, 3, 4, 5)flow1.zip(flow2) { a, b ->a + b}.collect {println(it)}}
}

这里使用zip函数连接了两个flow,并且在zip的函数体中将两个flow中的数据进行了拼接。

但是需要注意的是,这两个flow中的数据量并不相同,第一个flow中有3个数据,第二个flow中有5个数据。

那么zip函数的规则是,只要其中一个flow中的数据全部处理结束就会终止运行,剩余未处理的数据将不会得到处理。因此,flow2中的4和5这两个数据会被舍弃掉。

运行一下程序,结果如下图所示:

在这里插入图片描述

但是我们又如何证明flow1和flow2之间是并行运行的呢?

这里我稍微对代码进行一下改造,把运行时间打印出来就知道了:

fun main() {runBlocking {val start = System.currentTimeMillis()val flow1 = flow {delay(3000)emit("a")}val flow2 = flow {delay(2000)emit(1)}flow1.zip(flow2) { a, b ->a + b}.collect {val end = System.currentTimeMillis()println("Time cost: ${end - start}ms")}}
}

可以看到,我们在flow1中delay了3秒钟,flow2中delay了2秒钟。

如果它们之间是串行关系的话,那么最终的总耗时一定是5秒以上。

现在重新运行一下程序,看一看结果如何:

在这里插入图片描述

结果是3036毫秒。由此可以证明flow1和flow2之间是并行的关系,最终的总耗时取决于运行耗时更久的那个flow。

那么zip函数具体又有什么应用场景呢?

想象一下如下需求,我们正在开发一个天气预报应用,需要去一个接口请求当前实时的天气信息,还需要去另一个接口请求未来7天的天气信息。

这两个接口之间并没有先后依赖关系,但是却需要两个接口同时返回数据之后再将天气信息展示给用户。

如果我们先去请求当前实时的天气信息,等得到数据响应之后再去请求未来7天的天气信息,那效率就会比较低了,因为这两件事情很明显可以同时去做。

而zip函数就刚好完美贴合这种应用场景,使用zip函数模拟上述场景的代码示例如下:

fun sendRealtimeWeatherRequest(): Flow<String> = flow {// send request to realtime weatheremit(realtimeWeather)
}fun sendSevenDaysWeatherRequest(): Flow<String> = flow {// send request to get 7 days weatheremit(sevenDaysWeather)
}fun main() {runBlocking {sendRealtimeWeatherRequest().zip(sendSevenDaysWeatherRequest()) { realtimeWeather, sevenDaysWeather ->weather.realtimeWeather = realtimeWeatherweather.sevenDaysWeather = sevenDaysWeatherweather}.collect { weather ->}}
}

有没有感觉用这种扁平式的代码去写多并发请求实在是太优雅了。

另外,假如你要同时去请求的接口数并不是两个,而是更多的话,zip函数也可以做到。

比方说,这里我们还需要再去另外一个接口请求天气信息的背景图,3个请求同时并发处理,代码就可以这样写:

fun sendRealtimeWeatherRequest(): Flow<String> = flow {// send request to realtime weatheremit(realtimeWeather)
}fun sendSevenDaysWeatherRequest(): Flow<String> = flow {// send request to get 7 days weatheremit(sevenDaysWeather)
}fun sendWeatherBackgroundImageRequest(): Flow<String> = flow {// send request to get weather background imageemit(weatherBackgroundImage)
}fun main() {runBlocking {sendRealtimeWeatherRequest().zip(sendSevenDaysWeatherRequest()) { realtimeWeather, sevenDaysWeather ->weather.realtimeWeather = realtimeWeatherweather.sevenDaysWeather = sevenDaysWeatherweather}.zip(sendWeatherBackgroundImageRequest()) { weather, weatherBackgroundImage ->weather.weatherBackgroundImage = weatherBackgroundImageweather}.collect { weather ->}}
}

以此类推,不管你有多少个请求需要并发着去处理,zip函数都是可以搞定的。并且还能做到完全扁平式地处理,没有任何层级嵌套。

关于zip操作符函数就介绍到这里。


12. buffer

buffer又是一个不太容易理解的操作符函数。

我个人觉得buffer函数和我们上篇文章学到的collectLatest函数,以及接下来要学习的conflate函数,可以并称为背压三剑客(我瞎起的名字)。

我也不知道该叫什么名字更好一些,但是它们三个处理的问题都是类似的,那就是解决Flow流速不均匀的问题。

所谓流速不均匀问题,指的就是Flow上游发送数据的速度和Flow下游处理数据的速度不匹配,从而可能引发的一系列问题。

而这3个函数处理这类问题的方式以及适用的场景都各不相同,所以我们必须将每种函数的功能和差异都搞清楚,再根据对应的场景选择合适的处理方式。

首先我们来看一下如下代码:

fun main() {runBlocking {flow {emit(1);delay(1000);emit(2);delay(1000);emit(3);}.onEach {println("$it is ready")}.collect {delay(1000)println("$it is handled")}}
}

这里在flow当中依次发送了1、2、3这几条数据,每条数据的发送间隔是1秒,然后通过onEach函数将它们都打印出来。

在collect函数中,我们会逐个对这些数据进行处理,每条数据的处理耗时也是1秒。

运行一下程序,效果如下图所示:

在这里插入图片描述

每条数据都依次被打印出来了,效果看上去非常不错。

但是,不知道你有没有发现一个细节,这里每条数据都是要耗费2秒时长才能处理完。没有发现这个细节的请再仔细观察一下上图的输出。

发现这个细节之后你就发现了一个惊天秘密,collect函数中的数据处理是会对flow函数中的数据发送产生影响的。

因为默认情况下,collect函数和flow函数会运行在同一个协程当中,因此collect函数中的代码没有执行完,flow函数中的代码也会被挂起等待。

也就是说,我们在collect函数中处理数据需要花费1秒,flow函数同样就要等待1秒。collect函数处理完成数据之后,flow函数恢复运行,发现又要等待1秒,这样2秒钟就过去了才能发送下一条数据。

这种行为是不是你想要的呢?或许是,或许不是。

如果不是的话,没关系,Kotlin Flow还提供了另外一种你想要的行为,借助buffer函数就能实现。

我们略微改造一下上述代码:

fun main() {runBlocking {flow {emit(1);delay(1000);emit(2);delay(1000);emit(3);}.onEach {println("$it is ready")}.buffer().collect {delay(1000)println("$it is handled")}}
}

可以看到,这里只是在Flow链式调用上串接了一个buffer函数。

buffer函数会让flow函数和collect函数运行在不同的协程当中,这样flow中的数据发送就不会受collect函数的影响了。

现在重新运行一下程序,效果如下图所示:

在这里插入图片描述

现在打印的顺序有点乱,但是没关系,乱才是正常的。

因为有了buffer的存在,数据发送和数据处理之间变得互不干扰了。

buffer函数其实就是一种背压的处理策略,它提供了一份缓存区,当Flow数据流速不均匀的时候,使用这份缓存区来保证程序运行效率。

flow函数只管发送自己的数据,它不需要关心数据有没有被处理,反正都缓存在buffer当中。

而collect函数只需要一直从buffer中获取数据来进行处理就可以了。

但是,如果流速不均匀问题持续放大,缓存区的内容越来越多时又该怎么办呢?

这个时候,我们又需要引入一种新的策略了,来适当地丢弃一些数据。

那么进入到本篇文章的最后一个操作符函数:conflate。


13. conflate

buffer函数最大的问题在于,不管怎样调整它缓冲区的大小(buffer函数可以通过传入参数来指定缓冲区大小),都无法完全地保障程度的运行效果。究其原因,主要还是因为buffer函数不会丢弃数据。

而在某些场景下,我们可能并不需要保留所有的数据。

比如拿股票软件举例,服务器端会将股票价格的实时数据源源不断地发送到客户端这边,而客户端这边只需要永远显示最新的股票价格即可,将过时的数据展示给用户是没有意义的。

因此,这种场景下使用buffer函数来提升运行效率就完全不合理,它会缓存太多完全没有必要保留的数据。

那么针对这种场景,其中一个可选的方案就是借助我们在上篇文章中学习的collectLatest函数。

它的特性是,只接收处理最新的数据,如果有新数据到来了而前一个数据还没有处理完,则会将前一个数据剩余的处理逻辑全部取消。

我们通过以下例子来验证这个函数的特性:

fun main() {runBlocking {flow {var count = 0while (true) {emit(count)delay(1000)count++}}.collectLatest {println("start handle $it")delay(2000)println("finish handle $it")}}
}

可以看到,这里我们在flow函数体当中编写了一个while循环,每隔1秒钟就会发送一条数据。

接下来通过collectLatest函数来接收和处理数据。但是注意,collectLatest函数中每处理一条数据都需要耗费2秒钟时间。

流速不均匀问题就此产生。

那么collectLatest函数会彼处理呢?我们来观察一下运行结果吧:

在这里插入图片描述

通过日志打印我们发现,每条数据都是有输出的,但是每条数据都只输出了start部分,而finish部分则都没有输出。

这就充分说明了collectLatest函数的特性,当有新数据到来时而前一个数据还没有处理完,则会将前一个数据剩余的处理逻辑全部取消。

所以,finish部分的日志是永远得不到输出的。

对于这种行为结果,我个人认为是有点反直觉的。我的第一直觉是,当前正在处理的数据无论如何都应该处理完,然后准备去处理下一条数据时,直接处理最新的数据即可,中间的数据就都可以丢弃掉了。

如果这也正是你所期望的行为的话,那么恭喜你,conflate函数就是用来做这件事的。

我们稍微对以上代码进行一些修改:

fun main() {runBlocking {flow {var count = 0while (true) {emit(count)delay(1000)count++}}.conflate().collect {println("start handle $it")delay(2000)println("finish handle $it")}}
}

改动并不大,主要就是将collectLatest函数改回了collect函数,然后中间添加了一个conflate函数。

现在重新运行一下程序,效果如下图所示:

在这里插入图片描述

可以看到,现在start日志和finish日志就会结对输出了。

但是,有些数据则被完全丢弃掉了,比如说0、2、4都没有输出。因为当上一条数据处理完时,又有更新的数据发送过来了,那么这些过期的数据就会被直接舍弃。

到这里,conflate操作符函数也就全部讲完了。

好长的一篇文章,而且可能也是很难的一篇文章。主要是因为我在学习这些知识点的时候觉得很难,不知道大家看完之后感受如何。

大家如果觉得不难的话,那我就当做是一种褒奖吧,毕竟一直都有人说我写的文章就是好理解。

好了,这样我们Flow三部曲的第二篇也讲完了,下一篇将会是终结篇,讲一讲StateFlow和SharedFlow的用法。

敬请期待。


如果想要学习Kotlin和最新的Android知识,可以参考我的新书 《第一行代码 第3版》,点击此处查看详情。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/60765.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如果再来一次,你还会选择互联网么?

现在互联网的就业环境&#xff0c;大家都在感受着一股寒意。也不知道从什么时候开始&#xff0c;身边悲观的声音越来越多了。 如果再给你一次机会&#xff0c;你还会选择互联网吗&#xff1f; 回答这个问题之前&#xff0c;我想跟大家聊聊一个我朋友的故事。 他从学渣到大厂程…

奉劝各位学弟学妹们,该打造你的技术影响力了!

CSDN 的学弟学妹们&#xff0c;大家好呀&#xff0c;我是沉默王二。放在一年前&#xff0c;打死我也不相信&#xff0c;这四个平淡无奇的字组合在一起竟然充满了魔力&#xff01; 2019 年的时候&#xff0c;我看过一本书&#xff0c;名叫《影响力》&#xff0c;应该有不少学弟…

文笔如何迅速提升?国企老员工和你聊聊

很多人说在我们国企混&#xff0c;人际交往能力很重要&#xff0c;然而还有一项能力和人际交往能力一样重要&#xff0c;那就是——文笔。 文笔好有哪些好处&#xff1f; 1、工作总结汇报的时候&#xff0c;能让自己的工作看起来更有价值 2、在公司的一些活动比赛中&#xff…

刘宇凡:浅谈文笔、文采、文案的区别

声明:刘宇凡不是这方面的专家,我只是作为一个喜爱文字的搬运工谈谈自己的体会罢了。 此文的缘起,是一位微信好友的问题。本来当时就该回答的,但是一直拖到现在才以文章的形式作答,实在是对不住,也许你已经通过其它渠道了解到其本质与区别。但还是希望我的见解能够给你不…

5G与AI“点燃”数字应用,云原生重塑数据库未来

进入2023年&#xff0c;5G与AI技术逼近爆发的临界点&#xff0c;即将点燃数字应用的宇宙。以ChatGPT为代表的AI大模型&#xff0c;引发了机器智能的涌现&#xff0c;不仅能够与人类进行创造性对话&#xff0c;还能代替程序员编写代码&#xff01;而5G全面加速了物理世界的数字化…

下一代数据中心怎么建?数据中心能源十大趋势揭秘答案

国家发改委的信息显示&#xff0c;当前&#xff0c;“东数西算”工程的8个国家算力枢纽节点建设已全部开工&#xff0c;2023年新开工的数据中心项目近70个&#xff0c;“东数西算”工程正从系统布局进入全面建设阶段。可以预见&#xff0c;新一轮的数据中心建设大潮将蓬勃兴起。…

全网最详细中英文ChatGPT-GPT-4示例文档-从0到1快速入门自然语言指令创建支付API代码——官网推荐的48种最佳应用场景(附python/node.js/curl命令源代码,小白也能学)

从0到1快速入门自然语言指令创建支付API代码 Introduce 简介setting 设置Prompt 提示Sample response 回复样本API request 接口请求python接口请求示例node.js接口请求示例curl命令示例json格式示例 其它资料下载 ChatGPT是目前最先进的AI聊天机器人&#xff0c;它能够理解图片…

大模型与端到端会成为城市自动驾驶新范式吗?

摘要&#xff1a; 最近可以明显看到或者感受到第一梯队的城市自动驾驶量产已经进入快车道&#xff0c;他们背后所依靠的正是当下最热的大模型和端到端的技术。 近期&#xff0c;城市自动驾驶量产在产品和技术上都出现了新的变化。 在产品层面&#xff0c;出现了记性行车或者称…

跳妹儿读绘本:我家孩子爱不释手的经典绘本之套装书

上篇分享了我给跳妹儿买书的经验&#xff0c;这里我就来说说这些跳妹儿非常喜欢并点读率高的绘本。 今天分享的主要是套装书&#xff0c;套装书的优点我在之前的文章中有提到过&#xff0c;而且迄今为止我买回来的套装书&#xff0c;每一套我家跳妹儿都非常喜欢&#xff0c;读了…

儿童绘本

亲情类绘本&#xff1a; 1、猜猜我有多爱你 适合年龄段&#xff1a;2-4岁 2、爱心树 适合年龄段&#xff1a;3-6岁 3、巴巴爸爸 适合年龄段&#xff1a;3-6岁 情商教育类绘本&#xff1a; 随着社会的发展&#xff0c;对宝宝 的教育&#xff0c;已经不仅仅关注与脑力开…

合格的CTO应该是什么样?雷军王海峰王小川等共谈「技术创新」| CNCC2020

金磊 发自 CNCC现场 量子位 报道 | 公众号 QbitAI 企业在社会中的分量有多重&#xff1f; 从17世纪到20世纪70年代&#xff0c;改变人类生活的160种主流创新工业&#xff0c;80%以上是由公司来完成。 今天&#xff0c;全世界70%的专业和三分之二的研究的开发经费&#xff0c;都…

倔强的王小川,倔强的技术人

从上市到退市&#xff0c;搜狗经历了近4年时间&#xff0c;搜狗也从搜狐旗下投入到腾讯怀抱。 搜狗CEO王小川今日发布内部邮件&#xff0c;称从今天开始&#xff0c;搜狗融入腾讯大家庭&#xff0c;向新而行&#xff0c;共赴山海。 王小川指出&#xff0c;根据383天前的约定&am…

中年王小川:成就了搜狗,但终究错过了新时代

站在敦煌千年壁画前&#xff0c;中年王小川不由得发出“是非成败转头空”的叹息。曾承载过半生夙愿的搜狗&#xff0c;最终陨落成企鹅帝国中的几片瓦砾。 如今43岁的王小川&#xff0c;开始相信命运&#xff0c;偶尔也会伤感。 在办公室的书架上&#xff0c;王小川开始摆放《…

搜狗员工吐槽“统计加班时长裁人”,CEO 王小川破口大骂:“赶快滚”

作者 | 伍杏玲 出品 | 程序人生&#xff08;ID&#xff1a;coder_life&#xff09; 之前在报道996.ICU时&#xff0c;有粉丝担心道&#xff0c;如果被老板看到自己的发声&#xff0c;会不会有掉饭碗的危险&#xff1f; 最近搜狗员工用真实经历验证了这个问题&#xff1a;是会…

天才少年王小川,18年青春“喂了狗”

文丨惊蛰研究所&#xff0c;作者丨路涵 2003年&#xff0c;随着张朝阳的一声令下&#xff0c;刚刚从清华大学毕业的王小川挂帅组建搜狐研发中心&#xff0c;开发搜索引擎&#xff0c;开始了他和搜狗的前半生。18年后&#xff0c;随着搜狗公司发布公告宣布与腾讯完成合并&#…

视频:搜狗CEO王小川终于把区块链讲通透了

看完视频终于可以畅快地和小伙伴们聊聊区块链了 目前从区块链的最大应用来看&#xff0c;各种虚拟货币发行和闹剧&#xff0c;让人逐渐意识到这只是一场大佬割韭菜的游戏&#xff1f;对于币圈的现状&#xff0c;各位有什么看法&#xff1f;欢迎留言交流~ 程序员的一天 小视频 1…

巴比特 | 元宇宙每日必读:“中国需要自己的OpenAI”,王小川官宣新公司百川智能,争取年内发布国内最好的大模型和颠覆性的产品...

摘要&#xff1a;据新智元报道&#xff0c;4 月 10 日&#xff0c;搜狗创始人王小川正式对外宣布开启人工智能大模型领域创业。王小川与前搜狗 COO 茹立云联合成立了人工智能公司百川智能&#xff0c;旨在打造中国版的 OpenAI 基础大模型及颠覆性上层应用。目前已初步组建了50人…

搜狗王小川“吐槽”李彦宏,称其活在平行宇宙

&#x1f447;&#x1f447;关注后回复 “进群” &#xff0c;拉你进程序员交流群&#x1f447;&#x1f447; 来源丨 扩展迷EXTFANS https://mp.weixin.qq.com/s/YtYDD93qw6c3gwH8h6bi_w 2023年&#xff0c;AI“狂飙”&#xff0c;ChatGPT一经问世&#xff0c;就掀起了新一轮A…

王小川想走张一鸣和黄峥的老路,但终点不同

互联网大佬的终极追求是生命科学&#xff1f; 生命科学&#xff0c;这扇未打开的生命本源大门&#xff0c;为何有如此大的魔力吸引着一众互联网大佬&#xff1f;我们统计分析后发现&#xff0c;这并非是一次突然的决策&#xff0c;也不是一个人的选择&#xff0c;背后与他们的生…

王小川官宣大模型创业!5000万美元启动资金,年中发布首个产品,目前在训500亿参数版本...

梦晨 发自 凹非寺量子位 | 公众号 QbitAI 搜狗创始人王小川&#xff0c;正式官宣入场大模型创业&#xff1a; 有信心在年底做出中国最好的大语言模型。 新公司百川智能&#xff0c;前搜狗COO茹立云是已经对外公布的合伙人。 团队方面&#xff0c;以前搜狗团队为基础&#xff0c…