4.5-Channel 和 Flow:SharedFlow 和 StateFlow

文章目录

  • SharedFlow
    • 数据流的收集和事件订阅的区别
    • launchIn() 和 shareIn() 的区别
    • SharedFlow 与 Flow、Channel 的区别
    • shareIn() 适用场景
  • shareIn() 的具体参数说明
    • shareIn() 的 replay 参数
    • shareIn() 的 started 参数
    • WhileSubscribed() 的参数及适用场景
  • MutableSharedFlow、asSharedFlow()
    • MutableSharedFlow
    • asSharedFlow()
  • StateFlow、MutableStateFlow、asStateFlow()
  • 总结

SharedFlow 和 StateFlow 都是一种特殊的 Flow 的变种,SharedFlow 是把 Flow 的功能从数据流的收集改成了事件流;StateFlow 是 SharedFlow 的细化细分,StateFlow 将事件流改成了状态订阅,经过这两个 Flow 的变种一下就把 Flow 的适用场景切换到了一个非常实用的范围。

但是需要了解 SharedFlow 和 StateFlow 就必须得掌握 Flow,所以在开始这篇文章前建议回看 Flow 的功能定位、工作原理及应用场景 和 Flow 的创建、收集和操作符,在掌握 Flow 的基础上学习 SharedFlow 和 StateFlow 才能更好的了解透彻。

SharedFlow

数据流的收集和事件订阅的区别

在前面的章节我们有讲过一个用 Flow 持续获取实时天气信息的例子:

var count = 0// 只负责数据的生产获取
val weatherFlow = flow {while (count < 10) {emit(getWeather())count++delay(1000)}
}suspend fun getWeather() = withContext(Dispatchers.IO) {"Sunny"
}fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)scope.launch {weatherFlow.collect {println("count = ${count}, Weather: $it")}println("done")}delay(15000)
}

通过调用 collect() [订阅] 接收数据。说是 [订阅] 更确切的说其实是 [收集],Flow 虽然是个数据流但是它只是设定好了数据流的规则,而并不是直接开始启动数据流开始生产,生产过程是在调用 collect() 时才启动的,并且是每次调用 collect() 都分别启动一个新的数据流。这也是为什么 Flow 提供数据收集的函数名为 collect() 而不是 subscribe()。

按上面这么说你或许会有疑惑:既然数据收集这个视角它这么没用,为什么不一开始就把 Flow 设计成事件订阅的视角?

因为 数据收集并不是没用,而是更加通用而已。事件订阅的场景比普通的数据收集要多得多,但并不能简单的说它更有用,而是它更专、更垂直

实际上事件订阅就是一种特殊类型的数据收集,用数据收集的功能是能实现事件订阅的功能,这种事件订阅的 API 在 Flow 也有提供就是 SharedFlow

launchIn() 和 shareIn() 的区别

在讲解 Flow 操作符时有提到一个函数 launchIn(),它会立即用你指定的 CoroutineScope 启动一个协程,然后调用 collect() 在这个协程启动收集流程:

Collect.ktpublic fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {collect() // tail-call
}

还有一个函数 shareIn(),shareIn() 可以将一个已存在的 Flow 转换成 SharedFlow,同样的也是调用 collect() 收集:

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// 通过 shareIn() 将 Flow 转换成 SharedFlowval sharedFlow = flow.shareIn(scope)scope.launch {sharedFlow.collect { println("SharedFlow in Coroutine: $it")}}delay(10000)
}输出结果:
SharedFlow in Coroutine: 1
SharedFlow in Coroutine: 2
SharedFlow in Coroutine: 3

shareIn() 还可以定制指定数据收集的时间,比如修改为 SharingStarted.Eagerly 立即启动:

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// 指定启动收集时间为 Eagerly,创建 SharedFlow 的同时立即启动生产val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)scope.launch {sharedFlow.collect { println("SharedFlow in Coroutine: $it")}}delay(10000)
}

shareIn() 其实是会创建一个新的 Flow,把上游 Flow 发送的每条数据转发到下游每个调用 collect() 的 FlowCollector:

Share.ktpublic fun <T> Flow<T>.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int = 0
): SharedFlow<T> {val config = configureSharing(replay)// 创建了一个 SharedFlow,转发上游发送的数据val shared = MutableSharedFlow<T>(replay = replay,extraBufferCapacity = config.extraBufferCapacity,onBufferOverflow = config.onBufferOverflow)@Suppress("UNCHECKED_CAST")val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)return ReadonlySharedFlow(shared, job)
}

launchIn() 和 shareIn() 都是启动一个协程并在协程里调用 Flow 的 collect(),但它们有两点区别

  • shareIn() 并不是第一时间就启动 Flow 的收集,可以通过参数定制启动收集的时间

  • shareIn() 会创建一个新的 Flow 并返回,返回的 Flow 类型就是 SharedFlow;SharedFlow 实际上只是把上游 Flow 发送的每条数据做转发

SharedFlow 与 Flow、Channel 的区别

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)// 上游 Flow 数据val flow = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// shareIn() 创建了一个新的 Flow 为 SharedFlowval sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)scope.launch {// 上游 Flow 已经准备了数据,所以这里的 SharedFlow 其实是转发上游的数据下来收集sharedFlow.collect { println("SharedFlow in Coroutine: $it")}}delay(10000)
}

从这个角度分析,shareIn() 创建了 SharedFlow 把 [数据生产和数据收集流程分拆开],这个特点让 SharedFlow 相比传统的 Flow,倒不如说 SharedFlow 更像是 Channel。

SharedFlow 和 Channel 不一样的是,SharedFlow 不是瓜分式的,而是每条数据都会发送到每一个进行中的 collect()

比如下面我们调用多个 collect(),正常是会完整打印所有数据,和 Flow 一样:

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)scope.launch {sharedFlow.collect { println("SharedFlow in Coroutine 1: $it")}}scope.launch {sharedFlow.collect { println("SharedFlow in Coroutine 2: $it")}}	delay(10000)
}输出结果:
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 2: 1
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 3

普通的 Flow 多次调用 collect() 都独立完整跑一次流程,SharedFlow 是多次调用 collect() 只跑一次流程,即用 SharedFlow 事件订阅调用 collect() 发生在数据发送之后,调用 collect() 前发送的数据将丢失

为了验证上面的说法,我们给调用 collect() 前分别加延迟:

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)scope.launch {delay(500)// SharedFlow 设置了调用 sharedIn() 就立即启动// 这里延迟 500ms 后才调用的 collect(),错过了第一条数据sharedFlow.collect { println("SharedFlow in Coroutine 1: $it")}}scope.launch {delay(1500)// 这里延迟 1500ms 后才调用的 collect(),错过了两条数据sharedFlow.collect { println("SharedFlow in Coroutine 2: $it")}}	delay(10000)
}Flow 的输出结果:
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 2: 1
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 3SharedFlow 的输出结果:
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 2: 3
SharedFlow in Coroutine 1: 3

Flow 还是会正常打印完整,这里 SharedFlow 分别加了延迟后都错过了数据接收没有打印出来。这就是 SharedFlow。

接下来我们再聊聊 [冷] 和 [热] 的话题。

在官方说法,Channel 是 [热] 的,Flow 是 [冷] 的,Channel 的 [热] 其实就是不读取数据它也可以发送,Flow 的 [冷] 是只在每次 collect() 被调用的时候才会启动数据发送流程。

SharedFlow 虽然是 Flow,但它是 [热] 的,因为 SharedFlow 的活跃状态跟它是否正在被调用 collect() 函数来收集数据是无关的,所以它的活跃状态是独立的,这就跟 Channel 一样了,所以它是 [热] 的

SharedFlow 的 [热] 和 Channel 的 [热] 不太一样:Channel 的 [热] 是真的数据的发送和读取两个流程完全独立的;SharedFlow 的 [热] 其实并不是技术角度的描述,而是业务逻辑角度的,它的本质依然是在 collect() 被调用时才开始生产,本质上 SharedFlow 依然是 [冷] 的,但是由于它背靠着一个独立运作的 Flow,所以它生产出来的数据跟 collect() 的调用并没有绑定,而是独立生产的

所以我们说 SharedFlow 是 [冷] 的那就是从技术角度分析,说它是 [热] 的那就是从业务逻辑角度分析,两个说法都对

对 SharedFlow 调用多次 collect() 虽然它被收集了多次,但它们的数据源是同一套而不是各自一套,这就是共享

shareIn() 适用场景

shareIn() 适用场景:

  • 数据来源共享:如果想要一个 Flow 它被收集多次的时候都可以共享相同的数据生产流程,就可以用 shareIn() 将 Flow 转成 SharedFlow,再让下游去收集 SharedFlow,多次的收集之间是依赖的同一个数据流

  • 生产提前启动:SharedFlow 能做到数据生产的提前启动,如果有一个 Flow 有耗时的初始化的操作,但不希望在调用 collect() 的时候等待这个初始化,也可以将 Flow 转成 SharedFlow,因为在这里的目的并不是共享,而是为了提前启动生产

  • 事件订阅:因为 SharedFlow 是 [热] 的,生产流程是独立的,那么在开始生产之后才开始收集,那就会漏掉之前生产的数据,所以 SharedFlow 也适合对从头开始收集数据没有需求的场景

SharedFlow 的效果是把 [数据生产和数据收集流程分拆开],这个效果让 SharedFlow 可以满足各种需求场景,比如事件订阅、提前启动生产、数据来源共享等,通常来讲我们也会把它用在事件流订阅的场景

shareIn() 的适用场景本质上就是 [数据生产和数据收集流程分拆开] 的需求,都可以将 Flow 转成 SharedFlow 来解决。SharedFlow 的 [热] 就是我们使用 SharedFlow 的根本原因

SharedFlow 并不会因为生产流程的结束而结束订阅,即数据生产都发送完了,SharedFlow 的 collect() 会一直运行,直到外部协程的取消而抛异常结束

shareIn() 的具体参数说明

Share.ktpublic fun <T> Flow<T>.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int = 0
): SharedFlow<T> {val config = configureSharing(replay)val shared = MutableSharedFlow<T>(replay = replay,extraBufferCapacity = config.extraBufferCapacity,onBufferOverflow = config.onBufferOverflow)@Suppress("UNCHECKED_CAST")val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)return ReadonlySharedFlow(shared, job)
}

shareIn() 有三个参数,第一个参数在前面小节也有讲到是指定启动事件订阅时所在的协程,主要是讲后面的两个参数。

shareIn() 的 replay 参数

replay 参数第一个功能是缓冲,可以做到根据设置的数值暂存数据

比如在前面小节用 SharedFlow 收集数据,延迟一段时间在调用 collect(),最终结果就是会丢失数据,因为 SharedFlow 创建的同时也已经启动了生产流程。

现在我们设置 replay 参数暂存一条数据:

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// 设置 replay 参数为 1,表示暂存缓冲一条数据val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly, 1)scope.launch {delay(500)sharedFlow.collect { println("SharedFlow in Coroutine 1: $it")}}scope.launch {delay(1500)sharedFlow.collect {println("SharedFlow in Coroutine 2: $it")}}delay(10000)
}输出结果:
// SharedFlow in Coroutine 1: 1,这个来不及消费的数据打印了出来
// SharedFlow in Coroutine 2: 1,没有打印,因为缓冲暂存 1 条数据,所以第一条数据被丢弃,但打印了第二条数据 SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 1: 1 
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 2: 3
SharedFlow in Coroutine 1: 3

将 replay 参数设置为 1 暂存一条数据,第一个 collect() 来不及消费的第一条数据也正常打印了出来,第二个 collect() 因为错过了两条数据,但暂存数据只有一条,所以第一条数据被丢弃了,第二条数据正常打印。

replay 参数第二个功能是缓存,对于已经消费过的数据,也依然缓冲下来,用来给后面新订阅的 collect() 使用。简单说就是除了缓冲来不及消费的数据,还会把已经消费过的数据也缓冲下来

用个例子说明下 replay 缓存的功能:

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// replay 设置缓存两条数据val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly, 2)scope.launch {delay(500)sharedFlow.collect { println("SharedFlow in Coroutine 1: $it")}}scope.launch {delay(5000) // 都错过了数据,订阅时会立即被发送出来sharedFlow.collect {println("SharedFlow in Coroutine 2: $it")}}delay(10000)
}输出结果:
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3 
SharedFlow in Coroutine 2: 2 // 立即收到缓存发送的数据
SharedFlow in Coroutine 2: 3 // 立即收到缓存发送的数据

上面的例子可以看到,第二个 collect() 已经都错过了数据的发送,但在调用 collect() 时还是能立即接收到数据,接收到的数据数量是 replay 设置的大小。

总结下 replay 参数的功能

  • replay 参数是即有缓冲功能又有缓存功能的缓冲,提前生产数据但收集靠后时要缓冲暂存多少漏接收的数据

  • 在来不及消费的时候可以先把数据缓冲下来,缓冲的尺寸就是 replay 的大小

  • 对于已经使用完的数据它也会继续缓存下来,等到有新订阅的时候直接发送出来,缓存的大小也是 replay 的大小

shareIn() 的 started 参数

started 是用于设置数据生产的启动时间。它有三个数值

  • SharingStarted.Eagerly:调用 shareIn() 创建 SharedFlow 的同时立即启动数据的生产

  • SharingStarted.Lazily:调用第一次 collect() 时才会启动数据的生产

  • SharingStarted.WhileSubscribed():可以把上游的数据流给结束和重启的规则,它是一种复杂化的 Lazily,不仅是在第一次订阅的时候启动上游的数据流,而且在下游所有订阅全都结束之后,它会把上游 Flow 的生产流程也结束掉,这时候如果再有订阅,它就会重新启动上游的数据流

在讲解 WhileSubscribed() 前插入一个话题,SharedFlow 的 collect() 订阅并不会因为上游 Flow 数据发送完成而结束。SharedFlow 的 collect() 返回值是 Nothing:

SharedFlow.ktoverride suspend fun collect(collector: FlowCollector<T>): Nothing

一个返回值为 Nothing,说明它永远不会返回一直运行下去,除非抛出异常。比如 SharedFlow 这里就可以通过外部协程的取消或抛出其他异常取消 SharedFlow 的订阅

继续回到 WhileSubscribed(),模拟调用第一个 collect() 后结束了 Flow 的生产流程,调用第二个 collect() 时会重新启动生产流程,但这时候使用的是上一个 collect() 生产过的数据缓存:

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}val sharedFlow = flow.shareIn(scope, SharingStarted.WhileSubscribed(), 2)scope.launch {val parent = thislaunch {// 第二个 collect() 订阅前取消了第一个 SharedFlow 的订阅,触发上游重启生产delay(4000) // 通过取消协程的方式结束 SharedFlow 的订阅// 模拟 SharedFlow 所有 collect() 结束了,才让第二个 collect() 能触发上游的 Flow 的重启parent.cancel()}delay(1500)sharedFlow.collect { println("SharedFlow in Coroutine 1: $it")}}scope.launch {delay(5000)sharedFlow.collect {println("SharedFlow in Coroutine 2: $it")}}delay(10000)
}输出结果:
SharedFlow in Coroutine 1: 1 
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 2 // 立即打印出来,使用了上一个 collect() 的数据缓存
SharedFlow in Coroutine 2: 3 // 立即打印出来,使用了上一个 collect() 的数据缓存
SharedFlow in Coroutine 2: 1 // 重启上游 Flow 生产发送的数据
SharedFlow in Coroutine 2: 2 // 重启上游 Flow 生产发送的数据
SharedFlow in Coroutine 2: 3 // 重启上游 Flow 生产发送的数据

上面的例子通过第一个 collect() 手动取消外部协程的方式,模拟第二个 collect() 触发重启上游 Flow 生产发送。从打印可以看到第二个 collect() 在重启前会先收到上一个 collect() 的缓存数据,然后重新接收到了上游 Flow 重启发送的数据。

WhileSubscribed() 的参数及适用场景

SharingStarted.ktpublic fun WhileSubscribed(stopTimeoutMillis: Long = 0,replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted = StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)

WhileSubscribed() 还可以定制参数

  • stopTimeoutMillis:所有 collect() 结束之后依然不判定为 [所有 collect() 都结束],等待该参数设置的时间后还没有新的 collect(),才认为是都结束将上游 Flow 的生产流程结束

  • replayExpirationMillis:设置订阅流程结束之后、清空缓存的让缓存失效的时间,最后一个 collect() 结束且 stopTimeoutMillis 也超时之后,再过 replayExpirationMillis 时间后还是没有新的 collect() 被调用,缓存下来的数据就会被丢弃

WhileSubscribed() 的适用场景:假设软件里有一个可以被订阅的事件流,这个事件流会在多个地方被订阅,而同时这个事件流还非常的重即生产流程非常消耗资源,所以想要在所有订阅都结束的时候及时的结束生产;这种场景就很适合用 WhileSubscribed() 配置自动结束、自动重启的 SharedFlow

MutableSharedFlow、asSharedFlow()

MutableSharedFlow

一直以来我们讲解 Flow 都是在内部调用 emit() 生产数据,然后在一个地方调用 collect() 收集发送过来的数据;但我们的业务需求可能需要能支持在外部调用 emit() 发送数据,比如 UI 交互点击事件,在用户点击按钮的时候,可以从它的点击监听回调能调用一下 emit(),而不是只能从上游的 Flow 把事件发送出来,这是一种很正常的需求。

需要能在外部调用 emit() 发送数据要用 MutableSharedFlow

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val mutableSharedFlow = MutableSharedFlow<String>()scope.launch {mutableSharedFlow.emit("Hello") // 可以外部发送数据mutableSharedFlow.collect {println("mutableSharedFlow: $it")}}delay(10000)
}

或许你会有疑问:为什么 Flow 不直接提供可以外部发送数据的 Flow?

Flow 不提供外部发送数据的原因也很简单,Flow 数据流本就是一个需要指定规则然后按规则一条条把数据发送出来,本来就不需要从外部发送数据,如果还提供外部发送数据,内部和外部的数据混乱数据源不统一,反而更容易让开发者不小心写出错误代码

SharedFlow 是事件流,它天然就是需要从各个地方发送数据的,Flow 数据流的限制就不需要了,允许让我们在任何协程发送数据

MutableSharedFlow 相比 shareIn() 并不是从内部发送数据变成了外部发送数据,而是从只能从上游 Flow 发送数据变成可以从任何协程发送数据。

MutableSharedFlow 和 shareIn() 的选择

  • 如果要创建一个事件流,在外部生产数据发送数据源,就用 MutableSharedFlow

  • 如果已经有了一个生产事件流的 Flow,不需要自己写生产数据的代码,直接将 Flow 用 shareIn() 转成 SharedFlow 即可

asSharedFlow()

在 Android 经常会在 ViewModel 定义 MutableSharedFlow,在数据请求后通过 MutableSharedFlow 调用 emit() 将结果通知到页面 Activity 或 Fragment,也就是页面需要订阅事件,希望把 MutableSharedFlow 暴露出来给外部去订阅,但又不希望让外部也来发送数据的时候,可以通过 asSharedFlow() 将 MutableSharedFlow 转换成 SharedFlow

class MyViewModel : ViewModel() {private val repository = Repository()private val flow = MutableSharedFlow<String>()val sharedFlow = flow.asSharedFlow() // 提供给外部订阅fun request() {viewModelScope.launch {repository.request {flow.emit(it)}}	}
}class MyActivity : ComponentActivity() {private val viewModel by viewModels<MyViewModel>()override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)lifecycleScope.launch {viewModel.sharedFlow.collect {// ...}}}
}

StateFlow、MutableStateFlow、asStateFlow()

StateFlow 是一种特殊的 SharedFlow,SharedFlow 是把数据流的收集收窄到了事件流的订阅,StateFlow 则是进一步的收窄,从事件流的订阅收窄到了状态的订阅。

StateFlow.ktpublic interface StateFlow<out T> : SharedFlow<T> {// 最新的一条数据public val value: T
}

可以看到 StateFlow 其实就是 SharedFlow 的子接口,并且增加了一个 value 属性。value 就是 SharedFlow 里最新的一条数据,所以 StateFlow 实际上就是一个仅仅保存最新的一个事件的 SharedFlow 事件流

我们用 StateFlow 实现状态订阅:

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val name = MutableStateFlow("test") // 提供初始值scope.launch { name.collect {println("State: $it")}}scope.launch {delay(2000)name.emit("Hello world")}delay(10000)
}输出结果:
State: test
State: Hello world

如果想 将 Flow 转换成 StateFlow 也可以使用 stateIn()

fun main() = runBlocking {val scope = CoroutineScope(EmptyCoroutineContext)val flow = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// 通过 stateIn() 将 Flow 转换成 StateFlowval stateFlow = flow.stateIn(scope)scope.launch { stateFlow.collect {println("State: $it")}}delay(10000)
}

asStateFlow() 可以把可读写的 MutableStateFlow 转换为只读的 StateFlow,用来对外暴露的时候把写数据的功能给隐藏

总结

1、数据流的收集和事件订阅的区别

Flow 数据流的数据收集相比事件订阅场景更加通用,事件订阅的场景比普通的数据收集要多得多,但并不能简单的说它更有用,而是它更专、更垂直

实际上事件订阅就是一种特殊类型的数据收集,用数据收集的功能是能实现事件订阅的功能,这种事件订阅的 API 在 Flow 也有提供就是 SharedFlow

2、launchIn() 和 shareIn() 的区别

launchIn() 和 shareIn() 都是启动一个协程并在协程里调用 Flow 的 collect(),但它们有两点区别

  • shareIn() 并不是第一时间就启动 Flow 的收集,可以通过参数定制启动收集的时间

  • shareIn() 会创建一个新的 Flow 并返回,返回的 Flow 类型就是 SharedFlow;SharedFlow 实际上只是把上游 Flow 发送的每条数据做转发

3、SharedFlow 与 Flow、Channel 的区别

(1)SharedFlow 和 Flow 的区别

普通的 Flow 多次调用 collect() 都独立完整跑一次流程,SharedFlow 是多次调用 collect() 只跑一次流程,即用 SharedFlow 事件订阅调用 collect() 发生在数据发送之后,调用 collect() 前发送的数据将丢失

(2)SharedFlow 和 Channel 的区别

在官方说法,Channel 是 [热] 的,Flow 是 [冷] 的,Channel 的 [热] 其实就是不读取数据它也可以发送,Flow 的 [冷] 是只在每次 collect() 被调用的时候才会启动数据发送流程。

SharedFlow 虽然是 Flow,但它是 [热] 的,因为 SharedFlow 的活跃状态跟它是否正在被调用 collect() 函数来收集数据是无关的,所以它的活跃状态是独立的,这就跟 Channel 一样了,所以它是 [热] 的

SharedFlow 的 [热] 和 Channel 的 [热] 不太一样:Channel 的 [热] 是真的数据的发送和读取两个流程完全独立的;SharedFlow 的 [热] 其实并不是技术角度的描述,而是业务逻辑角度的,它的本质依然是在 collect() 被调用时才开始生产,本质上 SharedFlow 依然是 [冷] 的,但是由于它背靠着一个独立运作的 Flow,所以它生产出来的数据跟 collect() 的调用并没有绑定,而是独立生产的

所以我们说 SharedFlow 是 [冷] 的那就是从技术角度分析,说它是 [热] 的那就是从业务逻辑角度分析,两个说法都对

对 SharedFlow 调用多次 collect() 虽然它被收集了多次,但它们的数据源是同一套而不是各自一套,这就是共享

4、shareIn() 的适用场景

  • 数据来源共享:如果想要一个 Flow 它被收集多次的时候都可以共享相同的数据生产流程,就可以用 shareIn() 将 Flow 转成 SharedFlow,再让下游去收集 SharedFlow,多次的收集之间是依赖的同一个数据流

  • 生产提前启动:SharedFlow 能做到数据生产的提前启动,如果有一个 Flow 有耗时的初始化的操作,但不希望在调用 collect() 的时候等待这个初始化,也可以将 Flow 转成 SharedFlow,因为在这里的目的并不是共享,而是为了提前启动生产

  • 事件订阅:因为 SharedFlow 是 [热] 的,生产流程是独立的,那么在开始生产之后才开始收集,那就会漏掉之前生产的数据,所以 SharedFlow 也适合对从头开始收集数据没有需求的场景

SharedFlow 的效果是把 [数据生产和数据收集流程分拆开],这个效果让 SharedFlow 可以满足各种需求场景,比如事件订阅、提前启动生产、数据来源共享等,通常来讲我们也会把它用在事件流订阅的场景

shareIn() 的适用场景本质上就是 [数据生产和数据收集流程分拆开] 的需求,都可以将 Flow 转成 SharedFlow 来解决。SharedFlow 的 [热] 就是我们使用 SharedFlow 的根本原因

SharedFlow 并不会因为生产流程的结束而结束订阅,即数据生产都发送完了,SharedFlow 的 collect() 会一直运行,直到外部协程的取消而抛异常结束

5、shareIn() 的具体参数

(1)shareIn() 的 replay 参数

replay 参数是即有缓冲功能又有缓存功能的缓冲,提前生产数据但收集靠后时要缓冲暂存多少漏接收的数据

  • 在来不及消费的时候可以先把数据缓冲下来,缓冲的尺寸就是 replay 的大小

  • 对于已经使用完的数据它也会继续缓存下来,等到有新订阅的时候直接发送出来,缓存的大小也是 replay 的大小

(2)shareIn() 的 started 参数

started 是用于设置数据生产的启动时间。它有三个数值

  • SharingStarted.Eagerly:调用 shareIn() 创建 SharedFlow 的同时立即启动数据的生产

  • SharingStarted.Lazily:调用第一次 collect() 时才会启动数据的生产

  • SharingStarted.WhileSubscribed():可以把上游的数据流给结束和重启的规则,它是一种复杂化的 Lazily,不仅是在第一次订阅的时候启动上游的数据流,而且在下游所有订阅全都结束之后,它会把上游 Flow 的生产流程也结束掉,这时候如果再有订阅,它就会重新启动上游的数据流

除此之外还插入说明了 SharedFlow 的 collect() 订阅并不会因为上游 Flow 数据发送完成而结束

SharedFlow 的 collect() 返回值为 Nothing,说明它永远不会返回一直运行下去,除非抛出异常。比如可以通过外部协程的取消间接取消 SharedFlow 的订阅

(3)WhileSubscribed() 的适用场景

假设软件里有一个可以被订阅的事件流,这个事件流会在多个地方被订阅,而同时这个事件流还非常的重即生产流程非常消耗资源,所以想要在所有订阅都结束的时候及时的结束生产;这种场景就很适合用 WhileSubscribed() 配置自动结束、自动重启的 SharedFlow

6、MutableSharedFlow、asSharedFlow()

(1)MutableSharedFlow

需要能在外部调用 emit() 发送数据要用 MutableSharedFlow

Flow 不提供外部发送数据的原因也很简单,Flow 数据流本就是一个需要指定规则然后按规则一条条把数据发送出来,本来就不需要从外部发送数据,如果还提供外部发送数据,内部和外部的数据混乱数据源不统一,反而更容易让开发者不小心写出错误代码

SharedFlow 是事件流,它天然就是需要从各个地方发送数据的,Flow 数据流的限制就不需要了,允许让我们在任何协程发送数据

MutableSharedFlow 和 shareIn() 的选择

  • 如果要创建一个事件流,在外部生产数据发送数据源,就用 MutableSharedFlow

  • 如果已经有了一个生产事件流的 Flow,不需要自己写生产数据的代码,直接将 Flow 用 shareIn() 转成 SharedFlow 即可

(2)asSharedFlow()

希望把 MutableSharedFlow 暴露出来给外部去订阅,但又不希望让外部也来发送数据的时候,可以通过 asSharedFlow() 将 MutableSharedFlow 转换成 SharedFlow

7、StateFlow、MutableStateFlow、asStateFlow()

StateFlow 其实就是 SharedFlow 的子接口,StateFlow 实际上就是一个仅仅保存最新的一个事件的 SharedFlow 事件流

将 Flow 转换成 StateFlow 也可以使用 stateIn()

asStateFlow() 可以把可读写的 MutableStateFlow 转换为只读的 StateFlow,用来对外暴露的时候把写数据的功能给隐藏

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/482134.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARM CCA机密计算安全模型之硬件强制安全

安全之安全(security)博客目录导读 [要求 R0004] Arm 强烈建议所有 CCA 实现都使用硬件强制的安全(CCA HES)。本文件其余部分假设系统启用了 CCA HES。 CCA HES 是一个可信子系统的租户——一个 CCA HES 主机(Host),见下图所示。它将以下监控安全域服务从应用处理元件(P…

【电子通识】失效分析的流程和方法

在文章:【电子通识】失效分析的基本概念-CSDN博客 中我们讲到失效分析是是指产品失效后,根据失效的现象/模式,通过分析和验证,模拟重现失效的现象,找出失效的原因,挖掘出失效的机理的活动。 同时还讲到失效模式和失效机理,并且以LED和贴片电阻做为举例。 失效模式是失效…

Flutter:页面滚动

1、单一页面&#xff0c;没有列表没分页的&#xff0c;推荐使用&#xff1a;SingleChildScrollView() return Scaffold(backgroundColor: Color(0xffF6F6F6),body: SingleChildScrollView(child: _buildView()) );2、列表没分页&#xff0c;如购物车页&#xff0c;每个item之间…

facebook欧洲户开户条件有哪些又有何优势?

在当今数字营销时代&#xff0c;Facebook广告已成为企业推广产品和服务的重要渠道。而为了更好地利用这一平台&#xff0c;广告主们需要理解不同类型的Facebook广告账户。Facebook广告账户根据其属性可分为多种类型&#xff0c;包括个人广告账户、企业管理&#xff08;BM&#…

WEB攻防-通用漏洞CSRFSSRF协议玩法内网探针漏洞利用

CSRF构造工具&#xff0c;也可以用bp构造 选中要保存的请求&#xff0c;点击Generate HTML,生成带有添加用户请求的html文件&#xff0c;然后将构造的html放在网站上&#xff0c;生成访问地址&#xff0c;诱导管理员点击链接&#xff0c;就会添加用户 start Recording之后就会…

C#面向对象之访问限制,类基础,继承

文章目录 1 访问限制1.1 简介 2 类基础讲解2.1 类定义2.2 构造函数2.2.1 构造函数2.2.2 静态构造函数2.2.3 初始化顺序2.2.4 对象初始化器 2.3 析构函数2.4 类的静态成员2.5 匿名对象2.5.1 定义2.5.2 匿名对象的创建 3 继承3.1 基类和派生类3.2 基类初始化3.3 Partial类3.3.1 定…

代码之丑第一期-缩进

各位小伙伴们&#xff0c;大家好&#xff01;咱今天就算是正式开张了。实不相瞒&#xff0c;第一期的内容早已写好&#xff0c;但唯独这开篇方式&#xff0c;笔者想了好些时间&#xff0c;包括但不限于如下风格&#xff1a; 斗破苍穹式&#xff08;已经三刷&#xff09;&#…

JVM 性能调优 -- JVM常用调优工具【jps、jstack、jmap、jstats 命令】

前言&#xff1a; 前面我们分析怎么去预估系统资源&#xff0c;怎么去设置 JVM 参数以及怎么去看 GC 日志&#xff0c;本篇我们分享一些常用的 JVM 调优工具&#xff0c;我们在进行 JVM 调优的时候&#xff0c;通常需要借助一些工具来对系统的进行相关分析&#xff0c;从而确定…

linux上离线部署Mysql5.7.22

官网下载地址: https://downloads.mysql.com/archives/community/ Mysql安装步骤&#xff1a; 1.上传mysql安装包 上传 mysql-5.7.22-linux-glibc2.12-x86_64.tar.gz 到服务器指定目录 2.解压缩 tar -zxvf mysql-5.7.22-linux-glibc2.12-x86_64.tar.gz 3.修改名称 mv mysq…

日志与线程池

这里写自定义目录标题 日志Log.hpp测试main.cpp结果 线程池线程池的种类ThreadPool.hpp测试 Task.hpp 和 main.cppTask.hppmain.cpp结果 线程池的单例模式实现方式SignalThreadPool.hpp测试main.cpp 线程安全与重入问题死锁死锁的四个必要条件 日志 日志需要包含的信息 • 时间…

1.1 数据结构的基本概念

1.1.1 基本概念和术语 一、数据、数据对象、数据元素和数据项的概念和关系 数据&#xff1a;是客观事物的符号表示&#xff0c;是所有能输入到计算机中并被计算机程序处理的符号的总称。 数据是计算机程序加工的原料。 数据对象&#xff1a;是具有相同性质的数据元素的集合&…

泷羽sec学习打卡-shell命令5

声明 学习视频来自B站UP主 泷羽sec,如涉及侵权马上删除文章 笔记的只是方便各位师傅学习知识,以下网站只涉及学习内容,其他的都 与本人无关,切莫逾越法律红线,否则后果自负 关于shell的那些事儿-shell5 字符串运算符逻辑运算符之布尔运算符实践是检验真理的唯一标准 字符串运算…

Elasticearch索引mapping写入、查看、修改

作者&#xff1a;京东物流 陈晓娟 一、ES Elasticsearch是一个流行的开源搜索引擎&#xff0c;它可以将大量数据快速存储和检索。Elasticsearch还提供了强大的实时分析和聚合查询功能&#xff0c;数据模式更加灵活。它不需要预先定义固定的数据结构&#xff0c;可以随时添加或修…

Mybatis Plus 增删改查方法(一、增)

先定义一个简单的测试表&#xff0c;执行脚本如下&#xff1a; create table user(id bigint primary key auto_increment,name varchar(255) not null,age int not null default 0 check (age > 0) ); 根据Spingbootmybatisplus的结构根据表自行构建结构&#xff0c;大致…

本地部署 WireGuard 无需公网 IP 实现异地组网

WireGuard 是一个高性能、极简且易于配置的开源虚拟组网协议。使用路由侠内网穿透使其相互通讯。 第一步&#xff0c;服务端&#xff08;假设为公司电脑&#xff09;和客户端&#xff08;假设为公司外的电脑&#xff09;安装部署 WireGuard 1&#xff0c;点此下载&#xff08;…

unity中添加预制体及其基本设置

unity中添加预制体及其基本设置 Unity 中使用预制体的好处使用示例代码解释 Unity 中使用预制体的好处 1. 提高代码复用性 预制体可将一个游戏对象及其所有组件、子对象和设置存储在一个资源文件中&#xff0c;然后在项目中多次使用这个资源。这大大提高了代码的复用性&#x…

给定一个整数可能为正,0,负数,统计这个数据的位数.

题目描述 给定一个整数可能为正,0,负数,统计这个数据的位数. 例如1234567输出7位; -12345678输出8位;0输出1位 代码实现 int main() { long long m; long long n; scanf("%lld",&n); mn; int count0;//位数 do { count; n/10;//舍弃个位 }while(n!0); printf(&…

Linux:文件系统inode

早期&#xff0c;存储文件的设备是磁盘&#xff08;当下的市场几乎都是SSD&#xff09;&#xff0c;但大家习惯的把它们都称为磁盘&#xff0c;磁盘是用来表示区分内存的存储设备。而在操作系统看来&#xff0c;这个存储设备的结构就是一个线性结构&#xff0c;这一点很重要。 …

C++STL之vector(超详细)

CSTL之vector 1.vector基本介绍2.vector重要接口2.1.构造函数2.2.迭代器2.3.空间2.3.1.resize2.3.2.capacity 2.4.增删查找 3.迭代器失效4.迭代器分类 &#x1f31f;&#x1f31f;hello&#xff0c;各位读者大大们你们好呀&#x1f31f;&#x1f31f; &#x1f680;&#x1f68…

深入浅出机器学习中的梯度下降算法

大家好&#xff0c;在机器学习中&#xff0c;梯度下降算法&#xff08;Gradient Descent&#xff09;是一个重要的概念。它是一种优化算法&#xff0c;用于最小化目标函数&#xff0c;通常是损失函数。梯度下降可以帮助找到一个模型最优的参数&#xff0c;使得模型的预测更加准…