Swift Combine 学习(五):Backpressure和 Scheduler

  • Swift Combine 学习(一):Combine 初印象
  • Swift Combine 学习(二):发布者 Publisher
  • Swift Combine 学习(三):Subscription和 Subscriber
  • Swift Combine 学习(四):操作符 Operator
  • Swift Combine 学习(五):Backpressure和 Scheduler
  • Swift Combine 学习(六):自定义 Publisher 和 Subscriber
  • Swift Combine 学习(七):实践应用场景举例

    文章目录

      • 引言
      • 订阅的生命周期
      • Backpressure
      • Scheduler
      • 结语

引言

在前面的文章中,已经介绍了 Combine 的基础概念、订阅机制和操作符的使用。本文将深入探讨 Combine 中的异步流程控制,包括 Backpressure 和 Scheduler。这些概念对于编写健壮的异步应用程序非常重要。

订阅的生命周期

Combine 中的订阅遵循以下生命周期:

  1. 创建订阅:当调用 Publishersubscribe(_:) 方法时,创建一个新的订阅。
  2. 请求值:订阅者通过 Subscriptionrequest(_:) 方法请求值。
  3. 接收值:发布者通过调用订阅者的 receive(_:) 方法发送值。
  4. 完成或取消:发布者通过两种方式终止:
    1. 调用 receive(completion:) 方法表示完成(成功或失败)
    2. 订阅者调用 cancel() 方法取消订阅。

Backpressure

Combine 的 Backpressure 技术用于控制发布者(Publisher)向订阅者(Subscriber)发送数据的速率,防止订阅者因处理能力不足而被过多的数据淹没掉。Backpressure 本质上就是一种流量控制机制,确保系统在高负载或高并发情况下仍然能正常工作。在 Combine 中,Backpressure 通过 Subscribers.Demand 来处理:

  • Subscribers.Demand 允许订阅者精确控制它希望接收的元素数量。
  • 订阅者可以请求有限数量的元素(如.max(n))、无限元素(.unlimited),或者不请求任何元素(.none)。
  • 发布者必须尊重这个需求,不发送超过请求数量的元素。

Demand 的源码:

@frozen public struct Demand : Equatable, Comparable, Hashable, Codable, CustomStringConvertible {/// A request for as many values as the publisher can produce.public static let unlimited: Subscribers.Demand/// A request for no elements from the publisher.////// This is equivalent to `Demand.max(0)`.public static let none: Subscribers.Demand/// Creates a demand for the given maximum number of elements.////// The publisher is free to send fewer than the requested maximum number of elements.////// - Parameter value: The maximum number of elements. Providing a negative value for this parameter results in a fatal error.@inlinable public static func max(_ value: Int) -> Subscribers.Demand...
}

一个简单的例子:在5秒快速数据生成器可以生成50个值,但由于 Backpressure,实际上只处理了5个值。用 Backpressure 防止数据消费者被过多的数据淹没。

import Combine
import Foundation
import PlaygroundSupportclass FastDataProducer: Publisher {typealias Output = Inttypealias Failure = Neverprivate var current = 0private var timer: Timer?private var subscriber: AnySubscriber<Int, Never>?private class FastDataSubscription: Subscription {private var producer: FastDataProducer?init(producer: FastDataProducer) {self.producer = producer}func request(_ demand: Subscribers.Demand) {// 写需求}func cancel() {producer?.timer?.invalidate()producer?.timer = nilproducer?.subscriber = nilproducer = nil}}func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {self.subscriber = AnySubscriber(subscriber)subscriber.receive(subscription: FastDataSubscription(producer: self))timer = Timer.scheduledTimer(withTimeInterval: 0.1, repeats: true) { [weak self] _ inguard let self = self else { return }_ = self.subscriber?.receive(self.current)self.current += 1}}func stop() {timer?.invalidate()timer = nilsubscriber?.receive(completion: .finished)subscriber = nil}
}class SlowDataConsumer: Subscriber {typealias Input = Inttypealias Failure = Neverprivate var subscription: Subscription?func receive(subscription: Subscription) {self.subscription = subscriptionsubscription.request(.max(1))}func receive(_ input: Int) -> Subscribers.Demand {print("接收到值: \(input)")Thread.sleep(forTimeInterval: 1)return .max(1)}func receive(completion: Subscribers.Completion<Never>) {print("完成")}
}let producer = FastDataProducer()
let consumer = SlowDataConsumer()
producer.subscribe(consumer)DispatchQueue.main.asyncAfter(deadline: .now() + 5) {print("停止")producer.stop()
}// 我是用 playground 运行,就使用 PlaygroundPage 防止过早退出
PlaygroundPage.current.needsIndefiniteExecution = trueDispatchQueue.main.asyncAfter(deadline: .now() + 6) {PlaygroundPage.current.finishExecution()
}/*输出:
接收到值: 0
接收到值: 1
接收到值: 2
接收到值: 3
接收到值: 4
停止
完成
*/

Scheduler

在 Swift Combine 框架中, Secheduler 是一个协议,它定义了如何调度工作。 Seheduler 可以被用来控制事件的执行时机和执行线程。

public protocol Scheduler<SchedulerTimeType> {/// Describes an instant in time for this scheduler.associatedtype SchedulerTimeType : Strideable where Self.SchedulerTimeType.Stride : SchedulerTimeIntervalConvertible/// A type that defines options accepted by the scheduler.////// This type is freely definable by each `Scheduler`. Typically, operations that take a `Scheduler` parameter will also take `SchedulerOptions`.associatedtype SchedulerOptions/// This scheduler’s definition of the current moment in time.var now: Self.SchedulerTimeType { get }/// The minimum tolerance allowed by the scheduler./// 调度器容许的最小容差var minimumTolerance: Self.SchedulerTimeType.Stride { get }/// Performs the action at the next possible opportunity./// 在下一个可能的时机执行func schedule(options: Self.SchedulerOptions?, _ action: @escaping () -> Void)/// Performs the action at some time after the specified date./// 在指定日期之后的某个时间执行操作func schedule(after date: Self.SchedulerTimeType, tolerance: Self.SchedulerTimeType.Stride, options: Self.SchedulerOptions?, _ action: @escaping () -> Void)/// Performs the action at some time after the specified date, at the specified frequency, optionally taking into account tolerance if possible.  在指定日期后,以给定频率执行。如果可能的话,可选择考虑公差。func schedule(after date: Self.SchedulerTimeType, interval: Self.SchedulerTimeType.Stride, tolerance: Self.SchedulerTimeType.Stride, options: Self.SchedulerOptions?, _ action: @escaping () -> Void) -> any Cancellable
}

三个主要方法:

  1. schedule(options: _:):安排一个工作项在调度器上执行。
  2. schedule(after:options: _:):安排一个工作项在指定时间后执行。
  3. schedule(after:interval:options: _:):安排一个工作项在指定时间后,以指定的间隔重复执行。

常见的 Seheduler 类型

  • DispatchQueueScheduler:基于 DispatchQueue 的调度器,能够在指定的串行或并行队列上调度任务。适用于在主线程或后台线程上执行异步操作。

  • RunLoopScheduler:基于 RunLoop 的调度器,适用于需要与用户界面事件交互的场景。它可以在主线程的事件循环中调度任务,通常用于处理与用户交互相关的操作。

  • ImmediateScheduler:立即执行调度器,用于在当前调用堆栈中立即执行操作。这对于调试和测试非常有用,因为它允许在同步上下文中执行代码。

  • TimerScheduler:基于 Timer 的调度器,用于定时执行任务。通常用于周期性任务,比如定时更新或轮询。

  • CurrentValueSubjectScheduler:与 CurrentValueSubject 一起调度的调度器,允许在流中引入时间因素。

import Foundation
import Combinefinal class SchedulerExample {private var cancellables: Set<AnyCancellable> = []private let backgroundQueue = DispatchQueue(label: "com.example.background")// MARK: - 基础调度方法private func basicSchedulerExample() -> Future<Void, Never> {Future { promise inprint("\n🍎DispatchQueue Scheduler 示例")// 1. 主队列立即调度DispatchQueue.main.schedule {print("1️⃣ 主队列立即执行")print("   线程: \(Thread.current)")}// 2. 后台队列延迟调度let time = DispatchQueue.SchedulerTimeType(.now() + 1.0)self.backgroundQueue.schedule(after: time) {print("\n2️⃣ 后台队列延迟执行")print("   线程: \(Thread.current)")}// 3. 使用 Timer 进行周期性调度print("3️⃣ 开始周期性任务")Timer.publish(every: 1.0, on: .main, in: .common).autoconnect().prefix(2)  // 限制发出的元素数量为 2.sink { date inprint("   周期性任务触发: \(date)")print("   线程: \(Thread.current)")}.store(in: &self.cancellables)// 3秒后完成这个示例DispatchQueue.main.schedule(after: .init(.now() + 3)) {promise(.success(()))}}}// MARK: - RunLoopprivate func runLoopExample() -> Future<Void, Never> {Future { promise inprint("\n🍎RunLoop Scheduler 示例")// 1. 立即执行RunLoop.main.schedule {print("1️⃣ RunLoop 立即执行")print("   线程: \(Thread.current)")}// 2. 延迟执行let time = RunLoop.SchedulerTimeType(.now + 1.0)RunLoop.main.schedule(after: time) {print("\n2️⃣ RunLoop 延迟执行")print("   线程: \(Thread.current)")}// 2秒后完成DispatchQueue.main.schedule(after: .init(.now() + 2)) {promise(.success(()))}}}// MARK: - ImmediateScheduler 示例private func immediateExample() {print("🍎ImmediateScheduler 示例")ImmediateScheduler.shared.schedule {print("➡️ 同步立即执行")print("   线程: \(Thread.current)")}}// MARK: - CurrentValueSubjectprivate func currentValueSubjectExample() -> Future<Void, Never> {Future { promise inprint("\n🍎CurrentValueSubject 调度示例")let subject = CurrentValueSubject<Int, Never>(0)// 1. 在主队列上接收值subject.receive(on: DispatchQueue.main).sink { value inprint("1️⃣ 主队列接收到值: \(value)")print("   线程: \(Thread.current)")}.store(in: &self.cancellables)// 2. 在后台队列上接收值subject.receive(on: self.backgroundQueue).sink { value inprint("2️⃣ 后台队列接收到值: \(value)")print("   线程: \(Thread.current)")}.store(in: &self.cancellables)// 在不同时间发送值DispatchQueue.main.schedule(after: .init(.now() + 0.5)) {subject.send(1)}DispatchQueue.main.schedule(after: .init(.now() + 1.0)) {subject.send(2)}// 2秒后完成DispatchQueue.main.schedule(after: .init(.now() + 2)) {subject.send(completion: .finished)promise(.success(()))}}}func runAllExamples() {immediateExample()basicSchedulerExample().flatMap { self.runLoopExample() }.flatMap { self.currentValueSubjectExample() }.sink { _ inprint("\n✅ 所有示例执行完成")}.store(in: &cancellables)}
}let ep = SchedulerExample()
ep.runAllExamples()// 保持运行
RunLoop.main.run(until: Date(timeIntervalSinceNow: 6))/*输出:
🍎ImmediateScheduler 示例
➡️ 同步立即执行线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}🍎DispatchQueue Scheduler 示例
3️⃣ 开始周期性任务
1️⃣ 主队列立即执行线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}周期性任务触发: 2024-12-18 11:03:52 +0000线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}2️⃣ 后台队列延迟执行线程: <NSThread: 0x600001729a00>{number = 9, name = (null)}周期性任务触发: 2024-12-18 11:03:53 +0000线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}🍎RunLoop Scheduler 示例
1️⃣ RunLoop 立即执行线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}2️⃣ RunLoop 延迟执行线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}🍎CurrentValueSubject 调度示例
2️⃣ 后台队列接收到值: 0线程: <NSThread: 0x600001704580>{number = 5, name = (null)}
1️⃣ 主队列接收到值: 0线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}
1️⃣ 主队列接收到值: 1线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}
2️⃣ 后台队列接收到值: 1线程: <NSThread: 0x600001709b80>{number = 10, name = (null)}
1️⃣ 主队列接收到值: 2线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}
2️⃣ 后台队列接收到值: 2线程: <NSThread: 0x600001729a00>{number = 9, name = (null)}✅ 所有示例执行完成
*/

结语

Backpressure 和 Scheduler 是 Combine 框架中用于控制异步数据流的关键机制。通过掌握这些概念,开发者可以更好地管理数据流的速率和调度,提高应用的稳定性和性能。在下一篇文章中,我们将探索如何自定义 Publisher 和 Subscriber,以满足特定的应用需求。

  • Swift Combine 学习(六):自定义 Publisher 和 Subscriber

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/501697.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【paddle】初次尝试

张量 张量是 paddlepaddle&#xff0c; torch&#xff0c; tensorflow 等 python 主流机器学习包中唯一通货变量&#xff0c;因此应当了解其基本的功能。 张量 paddle.Tensor 与 numpy.array 的转化 import paddle as paddle import matplotlib.pyplot as plt apaddle.to_t…

VBA 64位API声明语句第005讲

跟我学VBA&#xff0c;我这里专注VBA, 授人以渔。我98年开始&#xff0c;从源码接触VBA已经20余年了&#xff0c;随着年龄的增长&#xff0c;越来越觉得有必要把这项技能传递给需要这项技术的职场人员。希望职场和数据打交道的朋友&#xff0c;都来学习VBA,利用VBA,起码可以提高…

Redis(二)value 的五种常见数据类型简述

目录 一、string&#xff08;字符串&#xff09; 1、raw 2、int 3、embstr 二、hash&#xff08;哈希表&#xff09; 1、hashtable 2、ziplist 三、list&#xff08;列表&#xff09; ​编辑 1、linkedlist 2、ziplist 3、quicklist&#xff08;redis 3.2后的列表内…

Linux硬盘分区 --- 挂载分区mount、卸载分区umount、永久挂载

四、挂载分区 1.查看分区信息 在挂载分区之前&#xff0c;需要先确定要挂载的分区设备名称。可以使用命令lsblk来查看系统中的所有块设备及分区情况。例如&#xff0c;可能会看到类似/dev/sda1、/dev/sdb2等的设备名称&#xff0c;它们分别代表不同的硬盘分区。 2.创建挂载点…

基于51单片机和16X16LED点阵屏(74HC138和74HC595驱动)的小游戏《贪吃蛇》

目录 系列文章目录前言一、效果展示二、原理分析三、各模块代码1、定时器02、自制八位独立按键3、点阵屏模块 四、主函数总结 系列文章目录 前言 《贪吃蛇》&#xff0c;一款经典的、怀旧的小游戏&#xff0c;单片机入门必写程序。 以《贪吃蛇》为载体&#xff0c;熟悉各种屏…

[Qt] Qt介绍 | 搭建SDK

目录 1. Qt 简介 什么是 Qt&#xff1f; 1.1 引入 1.2 GUI 1.3 Qt 介绍 2. Qt 发展史 3. Qt 支持的平台 4. Qt 版本信息 5. Qt 的优点 6. Qt 应用场景 7. Qt 成功案例 8. Qt 发展前景及就业分析 二. Qt 开发环境搭建 1. 开发工具概述 2.Qt SDK 安装 3.使用 1. …

mysql连接时报错1130-Host ‘hostname‘ is not allowed to connect to this MySQL server

不在mysql服务器上通过ip连接服务提示1130错误怎么回事呢。这个错误是因为在数据库服务器中的mysql数据库中的user的表中没有权限。 解决方案 查询mysql库的user表指定账户的连接方式 SELECT user, host FROM mysql.user;修改指定账户的host连接方式 update mysql.user se…

Elasticsearch: 高级搜索

这里写目录标题 一、match_all匹配所有文档1、介绍&#xff1a; 二、精确匹配1、term单字段精确匹配查询2、terms多字段精确匹配3、range范围查询4、exists是否存在查询5、ids根据一组id查询6、prefix前缀匹配7、wildcard通配符匹配8、fuzzy支持编辑距离的模糊查询9、regexp正则…

把vue项目或者vue组件发布成npm包或者打包成lib库文件本地使用

将vue项目发布成npm库文件&#xff0c;第三方通过npm依赖安装使用&#xff1b;使用最近公司接了一个项目&#xff0c;这个项目需要集成到第三方页面&#xff0c;在第三方页面点击项目名称&#xff0c;页面变成我们的项目页面&#xff1b;要求以npm库文件提供给他们&#xff1b;…

实现一个通用的树形结构构建工具

文章目录 1. 前言2. 树结构3. 具体实现逻辑3.1 TreeNode3.2 TreeUtils3.3 例子 4. 小结 1. 前言 树结构的生成在项目中应该都比较常见&#xff0c;比如部门结构树的生成&#xff0c;目录结构树的生成&#xff0c;但是大家有没有想过&#xff0c;如果在一个项目中有多个树结构&…

【新教程】华为昇腾NPU的pytorch环境搭建

1 硬件配置 使用学校的集群&#xff0c;相关配置如下&#xff1a; CPU&#xff1a;鲲鹏920 NPU&#xff1a;昇腾910B 操作系统&#xff1a;openEuler 22.03 2 安装版本 根据昇腾官方gitee上的信息&#xff0c;Pytoch 2.1.0是长期支持版本&#xff0c;因此选择安装这一版本&a…

在Ubuntu 18.04.6 LTS安装OpenFace流程

一、修改配置:将gcc8&#xff0c;g8作为默认选项 sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-8 100 sudo update-alternatives --config gcc 选择版本&#xff0c;再查看gcc --version sudo update-alternatives --install /usr/bin/g g /usr/bin/g-…

typescript安装后仍然不能使用tsc,如何解决

1.全局安装 npm i typescript -g 2.发现仍然不行 解决方法&#xff1a; C:\Users\你的用户名\AppData\Roaming\npm解决办法&#xff1a; 1.确定对应的文件下载了 我们发现typescript是下载了的 2.设置环境变量的path 路径为typescript下的npm 3.cmd运行

硬件-射频-PCB-常见天线分类-ESP32实例

文章目录 一&#xff1a;常见天线1.1 PCB天线①蓝牙模块的蛇形走线-天线②倒F天线-IFA&#xff1a;③蛇形倒F天线-MIFA④立体的倒F天线-PIFA 1.2 实例示意图1.21 对数周期天线(LPDA):1.22 2.4GHZ的八木天线&#xff1a;1.23 陶瓷天线&#xff1a;1.24 外接天线&#xff1a; 二&…

PCA降维算法详细推导

关于一个小小的PCA的推导 文章目录 关于一个小小的PCA的推导1 谱分解 (spectral decomposition)2 奇异矩阵(singular matrix)3 酉相似(unitary similarity)4 酉矩阵5 共轭变换6 酉等价7 矩阵的迹的计算以及PCA算法推导8 幂等矩阵(idempotent matrix)9 Von Neumanns 迹不等式 [w…

25.1.3

java数组&#xff1a; dataType[] arrayRefVar //推荐写法 //int[] mylist //或 dataType arrayRefVar[] //int mylist[]创建数组对象&#xff1a; arrayRefVar new dataType[arraySize]; dataType[] arrayRefVar new dataType[arraySize];for-each循环&#xff1a; jav…

音频进阶学习九——离散时间傅里叶变换DTFT

文章目录 前言一、DTFT的解释1.DTFT公式2.DTFT右边释义1&#xff09; 复指数 e − j ω n e^{-j\omega n} e−jωn2&#xff09;序列与复指数相乘 x [ n ] ∗ e − j ω n x[n]*e^{-j\omega n} x[n]∗e−jωn复指数序列复数的共轭正交正交集 3&#xff09;复指数序列求和 3.DTF…

【保姆级】sql注入之堆叠注入

一、堆叠注入的原理 mysql数据库sql语句的默认结束符是以";"号结尾&#xff0c;在执行多条sql语句时就要使用结束符隔 开,而堆叠注入其实就是通过结束符来执行多条sql语句 比如我们在mysql的命令行界面执行一条查询语句,这时语句的结尾必须加上分号结束 select * fr…

我的桌面 1.9.75 | 个性化定制手机桌面,丰富的小组件和主题

我的桌面iScreen是一款万能桌面小组件APP&#xff0c;提供各种高颜值桌面主题与创意小组件自由组合。支持X面板、照片、待办清单、时钟、日历等实用有趣的小组件。拥有超过500种小组件供选择&#xff0c;包括灵动面板、滚动相册等&#xff0c;搭配300多种精美主题和高清壁纸&am…

汽车燃油软件标定测试

油箱测试 确定油箱的参数&#xff1a; 总容积&#xff0c;额定容积&#xff0c;不可用容积等。油泵测试&#xff08;静态&#xff09; 分为加油测试&#xff0c;减油测试&#xff0c;1L或者500ml增减&#xff1b; 分别测试油泵的阻值输出&#xff0c;类似&#xff1a; 油量 阻…