9、异常相关操作符
9.1 try-catch 和 Flow 的异常可见性
Kotlin 官方似乎并不鼓励在 Flow 中通过 try-catch 来捕获异常,而是推荐使用 catch 操作符。
那么这里我们有必要再次对异常管理的观点进行阐述。
我们所说的“异常管理”,在大部分情况下,管理的都是已知异常。对未知异常的管理通常是交给 UncaughtExceptionHandler 来做一个“兜底”的处理。
所谓的已知异常,不是“我知道这里一定会发生异常”,一定会发生的那就不是异常了。异常本来就是正常流程之外的例外流程。对于一个业务而言,异常是除了完美情况下会走的那条通路之外,还可能会走的别的通路。比如网络请求连接超时会收到 TimeoutException,针对这些异常可以去做专门的处理,让整个大流程在发生异常之后,依然可以“正常地”往下走。“正常地”是指,在发生异常时按照设定好的路线继续执行下去。比如连接超时了那就重试或者给用户报错,或者在重试失败后再给用户报错。正确地处理已知的异常,也可以视为“正常”。
为了进一步说明 try-catch 的情况,我们先建立一个在 Flow 中进行网络请求,但是发生超时异常的伪代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)// 模拟上游发送数据val flow = flow {emit(1)emit(2)emit(3)}val job = scope.launch {flow.collect {// 如果网络连接超时,有可能抛出 TimeoutExceptiongitHub.contributors("square", "retrofit")}}job.join()
}
如果只针对某一条数据的异常进行处理,可以在 collect() 内添加 try-catch:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)// 模拟上游发送数据val flow = flow {emit(1)emit(2)emit(3)}val job = scope.launch {flow.collect {// 对可能发生异常的代码 try-catch,这属于是对 Flow 中的每一条数据进行检查val contributors = try {gitHub.contributors("square", "retrofit")} catch (e: TimeoutException) {// 我们用返回字符串模拟异常处理过程"Handle Network error"}println("Contributors: $contributors")}}job.join()
}
也可以对整个 collect() 加 try-catch:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)// 模拟上游发送数据val flow = flow {emit(1)emit(2)emit(3)}val job = scope.launch {// 对整个 collect() try-catch,进行整体检查try {flow.collect {val contributors = gitHub.contributors("square", "retrofit")println("Contributors: $contributors")}} catch (e: TimeoutException) {// 模拟异常处理,实际中可以进行重试或者通知用户println("Handle Network error")}}job.join()
}
上述两种情况,在发生 TimeoutException 时都会进入 catch 对异常的处理流程中。
现在,我在使用 flow 发送数据时加 try-catch,注意本意是要包住获取数据可能用到的数据库或网络操作,“顺便”包上了 emit():
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)// 模拟上游发送数据val flow = flow {try {for (i in 1..5) {// 生产数据的过程可能包含读数据库、网络请求等操作,因此需要加 try-catch,// 只有正常获取数据之后,才能通过 emit() 在最后发送这些数据emit(i)}} catch (e: TimeoutException) {println("Error in flow(): $e")}}val job = scope.launch {try {flow.collect {// 对可能发生异常的代码 try-catch,这属于是对 Flow 中的每一条数据进行检查val contributors = gitHub.contributors("square", "retrofit")throw TimeoutException()}} catch (e: TimeoutException) {// 我们用返回字符串模拟异常处理过程println("Handle Network error")}}job.join()
}
运行结果:
Error in flow(): java.util.concurrent.TimeoutException
可以看到,下游抛出的异常,在上游被捕获了,这不是一个合理的结果。
之所以会造成这种现象,要追溯一下源码。collect() 的大括号的内容并不是 collect() 本身,而是指定 collect() 的参数 FlowCollector 接口函数 emit() 的内容:
public interface Flow<out T> {public suspend fun collect(collector: FlowCollector<T>)
}public fun interface FlowCollector<in T> {public suspend fun emit(value: T)
}
完整形式相当于:
flow.collect(object : FlowCollector<Int> {override suspend fun emit(value: Int) {// collect 后 lambda 表达式的内容}
})
而这个内容也就是在上游发送数据调用 emit() 时被执行的。所以当上游在 flow 内添加了 try-catch 进行异常捕获时,如果发生了异常,上游的 try-catch 会先于下游在 collect() 之外的 try-catch 捕获到异常。
这样就会产生一个问题:上游的 try-catch 虽然本意是要捕获数据获取过程中的异常,但是由于它包住了 emit(),所以顺便也把下游的数据消费过程的异常也给拦截了。本该由下游捕获的异常被上游拦截了,假如上游和下游的代码是两个程序员写的,那么下游发生异常的信息没有在预期的位置抛出,不仅给调试带来了困难,同时也会因为上游并不关注下游的异常而忽视这个问题,埋下隐患。
协程里有一个 Exception Transparency 的概念,译为异常可见性,说的是上游的 Flow 不应该吞掉下游的异常。因此,上游在使用 try-catch 时,不应该包住 emit():
val flow = flow {for (i in 1..5) {// 让 try-catch 只包含获取数据操作,而不包住 emit()try {// 生产数据的过程可能包含读数据库、网络请求等操作...} catch (e: TimeoutException) {println("Error in flow(): $e")throw e}emit(i)}
}
或者使用原来的方式,但是在 catch 中将捕获到的异常原封不动的抛出,以便让下游可以捕获到这个异常:
val flow = flow {try {for (i in 1..5) {emit(i)}} catch (e: TimeoutException) {println("Error in flow(): $e")// 将原异常继续抛出,让下游捕获throw e}
}
两种做法都是为了保证异常的可见性,即为了保证开发者可以拿到他认为可以拿到的异常。
保证异常的可见性只是 Kotlin 对开发者提出的建议,因为保证异常可见性对开发者是有利的。但是遵守这个建议要靠开发者自己,虽然不遵守程序也能运行,但是开发者自己会不方便。
现在进行一下延伸,假如在 Flow 创建后,调用 collect() 之前,使用一些中间操作符,比如 map,考虑两个问题:
- 下游抛出的异常会经过 map 的代码块吗?
- map 代码块里抛出的异常最终会走向哪里?
捋一下代码流程:collect() 会触发它的直接上游 map() 进行数据创建,而 map() 只是一个中间商,它所生产的 Flow 会继续触发它的直接上游的 Flow 开始生产数据,也就是在头部的 Flow 中调用 emit() 发射数据。发射数据时,头部 Flow 发射的数据会经过 map() 进行转换处理,再调用其下游的 emit() 发射数据,也就是 collect 代码块的内容:
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->return@transform emit(transform(value))
}
这个 emit() 函数的内容,就是下游 collect 代码块的内容。因此 collect 代码块内抛出异常,会先到达 map() 内的 emit(),然后再向上传递到 map 的上游 Flow 的内。因此,两个问题也有了答案,下游抛出的异常会经过 map 函数(但注意不是 map 后面接的代码块,代码块指定的是参数上的 transform 如果对数据进行转换,而异常是从 map 内部调用的 emit() 抛出的),map 抛出的异常会向上游的 Flow 继续抛出。
假如在中间加一个 transform 操作符,由于 transform 内部也是通过显示调用 emit() 将数据发送到下游,假如对 transform 的 emit() 也加了 try-catch,那么与前面例子中的头部 Flow 一样,它也会拦截下游抛出的异常。因此,我们不能只关注起始的 Flow 中的 emit(),对于其他通过显式调用 emit() 发送数据的操作符,也要注意不要对 emit() 加 try-catch。
结论就一句话,别用 try-catch 包住 emit()。
9.2 catch 操作符
catch() 能捕获上游 Flow 的异常,不会捕获下游(代码块与各种操作符)异常。
catch() 的作用,相当于在 flow {…} 范围内加了 try-catch,但是捕获 flow {…} 内除了 emit() 以外代码抛出的异常。即 catch() 只会捕获上游异常,而不会捕获下游异常。
catch() 也不会捕获 CancellationException,因为这个异常就是用来取消协程的。
先看示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {for (i in 1..5) {emit(i)}}.catch { println("catch(): $it") }val job = scope.launch {try {flow.collect {val contributors = gitHub.contributors("square", "retrofit")throw TimeoutException()}} catch (e: TimeoutException) {println("Handle Network error")}}job.join()
}
运行结果:
Handle Network error
通过结果能看出这个异常是被下游捕获的,而不是 catch() 捕获的。假如,我在上游获取数据时发生了异常:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {for (i in 1..5) {// 模拟获取数据时发生的异常throw RuntimeException("flow() error")emit(i)}}.catch { println("catch(): $it") }val job = scope.launch {try {flow.collect {val contributors = gitHub.contributors("square", "retrofit")throw TimeoutException()}} catch (e: TimeoutException) {println("Handle Network error")}}job.join()
}
那么就只会由 catch() 捕获到这个上游异常:
catch(): java.lang.RuntimeException: flow() error
因为还没到 emit() 发送数据到下游这一步呢,就发生异常并被 catch() 捕获了。
如果有多个 catch(),那么每个 catch() 拿到的就是它上游的异常:第一个 catch() 拿到的是它到起始 Flow 之间发生的异常;第二个 catch() 拿到的是与第一个 catch() 之间发生的异常,以此类推……
什么时候用 catch()?如何在 catch() 与 try-catch 之间做选择?
二者有一个关键区别就是 try-catch 是在 Flow 里面工作的,而 catch() 是在 Flow 之后工作的。这就导致了,如果 Flow 内部发生异常,可以通过 try-catch 直接进行修复,让 Flow “活着”继续进行生产工作;但 catch() 就只能在 Flow 之后接管数据生产操作,因为只有 Flow 内部的异常没有被处理,而是被抛出了,才能流到 catch():
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)// 模拟上游发送数据val flow = flow {for (i in 1..5) {// 假设在 i = 3 时生产数据过程发生了异常,在 Flow 内部可以通过 try-catch 修复if (i == 3) {try {throw RuntimeException("flow() error")} catch (e: Exception) {emit(i)}} else {emit(i)}}}val job = scope.launch {try {flow.collect {println("Data: $it")}} catch (e: RuntimeException) {// 我们用返回字符串模拟异常处理过程println("Error")}}job.join()
}
假如在 Flow 内部通过 try-catch 修复异常,仍有可能会收到完整的数据:
Data: 1
Data: 2
Data: 3
Data: 4
Data: 5
但假如由于权限问题,我们不能修改 Flow 内部的代码,那么就只能通过 Flow 外接 catch() 来尝试接管数据生产流程:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)// 模拟上游发送数据val flow = flow {for (i in 1..5) {if (i == 3) {throw RuntimeException("flow() error")} else {emit(i)}}}.catch {println("flow() error")emit(100)emit(200)emit(300)}val job = scope.launch {try {flow.collect {println("Data: $it")}} catch (e: RuntimeException) {// 我们用返回字符串模拟异常处理过程println("Error")}}job.join()
}
这样下游能得到一部分原有数据,以及 catch() 接管生产的数据:
Data: 1
Data: 2
flow() error
Data: 100
Data: 200
Data: 300
因此,在二者选择的问题上,优先选择 try-catch,如果因为 Flow 的代码结构问题而无法选择 try-catch,才使用 catch()。结构问题具体是指没有权限修改 Flow 内部代码时,只能在 Flow 外接 catch() 来做一个无奈之下的接管。
但实际情况往往是,catch() 没有办法完美接管上游生产数据的逻辑,所以在 catch() 中通常只能做一些收尾工作。无缝、完美的接管通常是做不到的。
9.3 retry() 与 retryWhen()
retry() 与 retryWhen() 也是针对上游 Flow 的异常的,核心原理与 catch() 相同,区别在于,retry() 与 retryWhen() 在异常时重启上游 Flow。
示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {for (i in 1..5) {if (i == 3) {throw RuntimeException("flow() error")} else {emit(i)}}}.map { it * 2 }.retry(2) { // it:Throwableit is RuntimeException}val job = scope.launch {try {flow.collect {println("Data: $it")}} catch (e: RuntimeException) {// 我们用返回字符串模拟异常处理过程println("Caught RuntimeException!")}}job.join()
}
运行结果:
Data: 2
Data: 4
Data: 2
Data: 4
Data: 2
Data: 4
Caught RuntimeException!
retry() 括号内的参数指的是重试的次数,由于上游在发出 1、2 两条数据后才抛出异常,再加上重试的两次,因此最终会有 3 组数据输出。
此外,retry() {…} 的大括号内指定的是一个 predicate 谓词条件,只有该条件为 true 时 retry() 才会进行重试。如果返回了 false,即使重试次数尚未完成,也不会继续重试,而是将异常向下游抛出。
retryWhen() 相当于是把 retry() 的两个参数合并到一起的版本:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {for (i in 1..5) {if (i == 3) {throw RuntimeException("flow() error")} else {emit(i)}}}.map {it * 2}.retryWhen { cause, attempt ->println("Exception cause: ${cause.message}, attempted time: $attempt")// 返回 Boolean 类型的重试条件,为 true 时才重试cause is RuntimeException && attempt <= 2}val job = scope.launch {try {flow.collect {println("Data: $it")}} catch (e: RuntimeException) {// 我们用返回字符串模拟异常处理过程println("Caught RuntimeException!")}}job.join()
}
运行结果:
Data: 2
Data: 4
Exception cause: flow() error, attempted time: 0
Data: 2
Data: 4
Exception cause: flow() error, attempted time: 1
Data: 2
Data: 4
Exception cause: flow() error, attempted time: 2
Data: 2
Data: 4
Exception cause: flow() error, attempted time: 3
Caught RuntimeException!
retryWhen() 的两个参数,cause 就是导致异常的 Throwable 对象,而 attempt 是已经尝试过的次数,异常第一次到达 retryWhen() 时,attempt 是 0。由于设置了 attempt <= 2 都可进行重试,因此一共重试三次。
10、全流程监听操作符
其实前面讲的 catch 与 retry 操作符也属于流程监听操作符,只不过异常的处理比较特殊,所以单列出来。接下来几个操作符是对 Flow 的启动和结束的监听。
10.1 onStart 操作符
onStart() 负责监听 Flow 的收集流程的开始事件,执行时机是在 collect() 被调用之后,在正式开始生产数据之前(调用上游的 collect() 之前)。
示例代码:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {for (i in 1..5) {emit(i)}}.onStart { println("onStart 1") }.onStart { println("onStart 2") }val job = scope.launch {flow.collect {println("Data: $it")}}job.join()
}
运行结果:
onStart 2
onStart 1
Data: 1
Data: 2
Data: 3
Data: 4
Data: 5
可以看到,onStart() 是在 collect() 之前运行的,并且下面的 onStart() 先于上面的 onStart() 运行。
由于 onStart() 是在 collect() 之前运行的,因此假如在 onStart() 内抛出了异常,在上游 try-catch 是捕获不到的,只能通过 catch() 捕获:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {// onStart() 先于这里的代码执行,因此即便对 emit() 使用 try-catch 也捕获不到。try {for (i in 1..5) {emit(i)}} catch (e: Exception) {e.printStackTrace()}}.onStart {println("onStart 1")throw RuntimeException("onStart error")}.onStart {println("onStart 2")}.catch {// 使用 catch() 可以捕获到 onStart() 内抛出的异常println("catch: $it")}val job = scope.launch {flow.collect {println("Data: $it")}}job.join()
}
运行结果:
onStart 2
onStart 1
catch: java.lang.RuntimeException: onStart error
由于生产过程还没开始就抛出了异常,因此结果中不会打印任何数据。但是这个由 onStart() 抛出的异常确实被 catch() 捕获到了。
10.2 onCompletion 操作符
onCompletion() 监听的是 Flow 的结束,在所有数据发送完毕后触发。除了正常结束,异常结束也会触发 onCompletion():
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {try {for (i in 1..5) {emit(i)}} catch (e: Exception) {e.printStackTrace()}}.onStart {println("onStart 1")throw RuntimeException("onStart error")}.onStart {println("onStart 2")}.onCompletion { // it:Throwable? 如果是正常结束,it 就是 nullprintln("onCompletion $it")}.catch {println("catch: $it")}val job = scope.launch {flow.collect {println("Data: $it")}}job.join()
}
运行结果:
onStart 2
onStart 1
onCompletion java.lang.RuntimeException: onStart error
catch: java.lang.RuntimeException: onStart error
可以看到,异常结束确实触发了 onCompletion() 的执行,并且这个 onCompletion() 没有拦截异常,catch() 中的内容也打印了。
10.3 onEmpty 操作符
onEmpty() 监听一条数据都没有的情况,其代码块会在 Flow 正常结束且没有发送过一条数据的时候被触发。一定是正常结束才触发,异常结束不会触发。
11、flowOn 操作符
flowOn() 用来定制其上游 Flow 运行的 CoroutineContext 的,大多数使用用来切线程,但是也可以切换其他的 CoroutineContext,如 CoroutineName 等。
先关注如何获取 Flow 的 CoroutineContext:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {println("CoroutineContext in flow(): $coroutineContext")println("CoroutineContext in flow(): ${currentCoroutineContext()}")for (i in 1..5) {emit(i)}}val job = scope.launch {flow.collect {println("Data: $it")}}job.join()
}
运行结果:
CoroutineContext in flow(): [BlockingCoroutine{Active}@6108c233, BlockingEventLoop@74ab1442]
CoroutineContext in flow(): [StandaloneCoroutine{Active}@19406503, Dispatchers.Default]
Data: 1
Data: 2
Data: 3
Data: 4
Data: 5
flow.collect() 是在 scope 内调用的,而 scope 用的是 EmptyCoroutineContext,因此其协程上下文应该是 Default。这是因为 Flow 在哪个协程调用的 collect(),它的生产流程就在该协程中启动,也就处于该协程的 CoroutineContext 的上下文环境中。因此,在 Flow 中需要使用 currentCoroutineContext() 才能获取到正确的 CoroutineContext。
再看 flowOn() 的效果,只会对其上游进行切换,不会影响到下游:
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {println("CoroutineContext in flow(): ${currentCoroutineContext()}")for (i in 1..5) {emit(i)}}.map {println("CoroutineContext in map() 1: ${currentCoroutineContext()}")it * 2}.flowOn(Dispatchers.IO).map {println("CoroutineContext in map() 2: ${currentCoroutineContext()}")it * 2}val job = scope.launch {flow.collect {println("Data: $it - ${currentCoroutineContext()}")}}job.join()
}
运行结果:
CoroutineContext in flow(): [ProducerCoroutine{Active}@a9dc865, Dispatchers.IO]
CoroutineContext in map() 1: [ProducerCoroutine{Active}@a9dc865, Dispatchers.IO]
CoroutineContext in map() 1: [ProducerCoroutine{Active}@a9dc865, Dispatchers.IO]
CoroutineContext in map() 1: [ProducerCoroutine{Active}@a9dc865, Dispatchers.IO]
CoroutineContext in map() 1: [ProducerCoroutine{Active}@a9dc865, Dispatchers.IO]
CoroutineContext in map() 1: [ProducerCoroutine{Active}@a9dc865, Dispatchers.IO]
CoroutineContext in map() 2: [ScopeCoroutine{Active}@5553375d, Dispatchers.Default]
Data: 4 - [ScopeCoroutine{Active}@5553375d, Dispatchers.Default]
CoroutineContext in map() 2: [ScopeCoroutine{Active}@5553375d, Dispatchers.Default]
Data: 8 - [ScopeCoroutine{Active}@5553375d, Dispatchers.Default]
CoroutineContext in map() 2: [ScopeCoroutine{Active}@5553375d, Dispatchers.Default]
Data: 12 - [ScopeCoroutine{Active}@5553375d, Dispatchers.Default]
CoroutineContext in map() 2: [ScopeCoroutine{Active}@5553375d, Dispatchers.Default]
Data: 16 - [ScopeCoroutine{Active}@5553375d, Dispatchers.Default]
CoroutineContext in map() 2: [ScopeCoroutine{Active}@5553375d, Dispatchers.Default]
Data: 20 - [ScopeCoroutine{Active}@5553375d, Dispatchers.Default]
可以看到 flowOn() 的上游都被切换为 Dispatchers.IO,而下游还都在 Dispatchers.Default 下。
flowOn() 只切换上游的 CoroutineContext,是因为上下游的代码有可能是两个程序员写的。下游通常不会关注上游写了什么,因此如果 flowOn() 连下游的 CoroutineContext 都能切换,会让下游的代码行为变得难以(只写被下游代码的程序员)预期。