- Swift Combine 学习(一):Combine 初印象
- Swift Combine 学习(二):发布者 Publisher
- Swift Combine 学习(三):Subscription和 Subscriber
- Swift Combine 学习(四):操作符 Operator
- Swift Combine 学习(五):Backpressure和 Scheduler
- Swift Combine 学习(六):自定义 Publisher 和 Subscriber
- Swift Combine 学习(七):实践应用场景举例
文章目录
- 引言
- 订阅的生命周期
- Backpressure
- Scheduler
- 结语
引言
在前面的文章中,已经介绍了 Combine 的基础概念、订阅机制和操作符的使用。本文将深入探讨 Combine 中的异步流程控制,包括 Backpressure 和 Scheduler。这些概念对于编写健壮的异步应用程序非常重要。
订阅的生命周期
Combine 中的订阅遵循以下生命周期:
- 创建订阅:当调用
Publisher
的subscribe(_:)
方法时,创建一个新的订阅。 - 请求值:订阅者通过
Subscription
的request(_:)
方法请求值。 - 接收值:发布者通过调用订阅者的
receive(_:)
方法发送值。 - 完成或取消:发布者通过两种方式终止:
- 调用
receive(completion:)
方法表示完成(成功或失败) - 订阅者调用
cancel()
方法取消订阅。
- 调用
Backpressure
Combine 的 Backpressure 技术用于控制发布者(Publisher)向订阅者(Subscriber)发送数据的速率,防止订阅者因处理能力不足而被过多的数据淹没掉。Backpressure 本质上就是一种流量控制机制,确保系统在高负载或高并发情况下仍然能正常工作。在 Combine 中,Backpressure 通过 Subscribers.Demand
来处理:
Subscribers.Demand
允许订阅者精确控制它希望接收的元素数量。- 订阅者可以请求有限数量的元素(如
.max(n)
)、无限元素(.unlimited
),或者不请求任何元素(.none
)。 - 发布者必须尊重这个需求,不发送超过请求数量的元素。
Demand 的源码:
@frozen public struct Demand : Equatable, Comparable, Hashable, Codable, CustomStringConvertible {/// A request for as many values as the publisher can produce.public static let unlimited: Subscribers.Demand/// A request for no elements from the publisher.////// This is equivalent to `Demand.max(0)`.public static let none: Subscribers.Demand/// Creates a demand for the given maximum number of elements.////// The publisher is free to send fewer than the requested maximum number of elements.////// - Parameter value: The maximum number of elements. Providing a negative value for this parameter results in a fatal error.@inlinable public static func max(_ value: Int) -> Subscribers.Demand...
}
一个简单的例子:在5秒快速数据生成器可以生成50个值,但由于 Backpressure,实际上只处理了5个值。用 Backpressure 防止数据消费者被过多的数据淹没。
import Combine
import Foundation
import PlaygroundSupportclass FastDataProducer: Publisher {typealias Output = Inttypealias Failure = Neverprivate var current = 0private var timer: Timer?private var subscriber: AnySubscriber<Int, Never>?private class FastDataSubscription: Subscription {private var producer: FastDataProducer?init(producer: FastDataProducer) {self.producer = producer}func request(_ demand: Subscribers.Demand) {// 写需求}func cancel() {producer?.timer?.invalidate()producer?.timer = nilproducer?.subscriber = nilproducer = nil}}func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {self.subscriber = AnySubscriber(subscriber)subscriber.receive(subscription: FastDataSubscription(producer: self))timer = Timer.scheduledTimer(withTimeInterval: 0.1, repeats: true) { [weak self] _ inguard let self = self else { return }_ = self.subscriber?.receive(self.current)self.current += 1}}func stop() {timer?.invalidate()timer = nilsubscriber?.receive(completion: .finished)subscriber = nil}
}class SlowDataConsumer: Subscriber {typealias Input = Inttypealias Failure = Neverprivate var subscription: Subscription?func receive(subscription: Subscription) {self.subscription = subscriptionsubscription.request(.max(1))}func receive(_ input: Int) -> Subscribers.Demand {print("接收到值: \(input)")Thread.sleep(forTimeInterval: 1)return .max(1)}func receive(completion: Subscribers.Completion<Never>) {print("完成")}
}let producer = FastDataProducer()
let consumer = SlowDataConsumer()
producer.subscribe(consumer)DispatchQueue.main.asyncAfter(deadline: .now() + 5) {print("停止")producer.stop()
}// 我是用 playground 运行,就使用 PlaygroundPage 防止过早退出
PlaygroundPage.current.needsIndefiniteExecution = trueDispatchQueue.main.asyncAfter(deadline: .now() + 6) {PlaygroundPage.current.finishExecution()
}/*输出:
接收到值: 0
接收到值: 1
接收到值: 2
接收到值: 3
接收到值: 4
停止
完成
*/
Scheduler
在 Swift Combine 框架中, Secheduler 是一个协议,它定义了如何调度工作。 Seheduler 可以被用来控制事件的执行时机和执行线程。
public protocol Scheduler<SchedulerTimeType> {/// Describes an instant in time for this scheduler.associatedtype SchedulerTimeType : Strideable where Self.SchedulerTimeType.Stride : SchedulerTimeIntervalConvertible/// A type that defines options accepted by the scheduler.////// This type is freely definable by each `Scheduler`. Typically, operations that take a `Scheduler` parameter will also take `SchedulerOptions`.associatedtype SchedulerOptions/// This scheduler’s definition of the current moment in time.var now: Self.SchedulerTimeType { get }/// The minimum tolerance allowed by the scheduler./// 调度器容许的最小容差var minimumTolerance: Self.SchedulerTimeType.Stride { get }/// Performs the action at the next possible opportunity./// 在下一个可能的时机执行func schedule(options: Self.SchedulerOptions?, _ action: @escaping () -> Void)/// Performs the action at some time after the specified date./// 在指定日期之后的某个时间执行操作func schedule(after date: Self.SchedulerTimeType, tolerance: Self.SchedulerTimeType.Stride, options: Self.SchedulerOptions?, _ action: @escaping () -> Void)/// Performs the action at some time after the specified date, at the specified frequency, optionally taking into account tolerance if possible. 在指定日期后,以给定频率执行。如果可能的话,可选择考虑公差。func schedule(after date: Self.SchedulerTimeType, interval: Self.SchedulerTimeType.Stride, tolerance: Self.SchedulerTimeType.Stride, options: Self.SchedulerOptions?, _ action: @escaping () -> Void) -> any Cancellable
}
三个主要方法:
schedule(options: _:)
:安排一个工作项在调度器上执行。schedule(after:options: _:)
:安排一个工作项在指定时间后执行。schedule(after:interval:options: _:)
:安排一个工作项在指定时间后,以指定的间隔重复执行。
常见的 Seheduler 类型
-
DispatchQueueScheduler:基于
DispatchQueue
的调度器,能够在指定的串行或并行队列上调度任务。适用于在主线程或后台线程上执行异步操作。 -
RunLoopScheduler:基于
RunLoop
的调度器,适用于需要与用户界面事件交互的场景。它可以在主线程的事件循环中调度任务,通常用于处理与用户交互相关的操作。 -
ImmediateScheduler:立即执行调度器,用于在当前调用堆栈中立即执行操作。这对于调试和测试非常有用,因为它允许在同步上下文中执行代码。
-
TimerScheduler:基于
Timer
的调度器,用于定时执行任务。通常用于周期性任务,比如定时更新或轮询。 -
CurrentValueSubjectScheduler:与
CurrentValueSubject
一起调度的调度器,允许在流中引入时间因素。
import Foundation
import Combinefinal class SchedulerExample {private var cancellables: Set<AnyCancellable> = []private let backgroundQueue = DispatchQueue(label: "com.example.background")// MARK: - 基础调度方法private func basicSchedulerExample() -> Future<Void, Never> {Future { promise inprint("\n🍎DispatchQueue Scheduler 示例")// 1. 主队列立即调度DispatchQueue.main.schedule {print("1️⃣ 主队列立即执行")print(" 线程: \(Thread.current)")}// 2. 后台队列延迟调度let time = DispatchQueue.SchedulerTimeType(.now() + 1.0)self.backgroundQueue.schedule(after: time) {print("\n2️⃣ 后台队列延迟执行")print(" 线程: \(Thread.current)")}// 3. 使用 Timer 进行周期性调度print("3️⃣ 开始周期性任务")Timer.publish(every: 1.0, on: .main, in: .common).autoconnect().prefix(2) // 限制发出的元素数量为 2.sink { date inprint(" 周期性任务触发: \(date)")print(" 线程: \(Thread.current)")}.store(in: &self.cancellables)// 3秒后完成这个示例DispatchQueue.main.schedule(after: .init(.now() + 3)) {promise(.success(()))}}}// MARK: - RunLoopprivate func runLoopExample() -> Future<Void, Never> {Future { promise inprint("\n🍎RunLoop Scheduler 示例")// 1. 立即执行RunLoop.main.schedule {print("1️⃣ RunLoop 立即执行")print(" 线程: \(Thread.current)")}// 2. 延迟执行let time = RunLoop.SchedulerTimeType(.now + 1.0)RunLoop.main.schedule(after: time) {print("\n2️⃣ RunLoop 延迟执行")print(" 线程: \(Thread.current)")}// 2秒后完成DispatchQueue.main.schedule(after: .init(.now() + 2)) {promise(.success(()))}}}// MARK: - ImmediateScheduler 示例private func immediateExample() {print("🍎ImmediateScheduler 示例")ImmediateScheduler.shared.schedule {print("➡️ 同步立即执行")print(" 线程: \(Thread.current)")}}// MARK: - CurrentValueSubjectprivate func currentValueSubjectExample() -> Future<Void, Never> {Future { promise inprint("\n🍎CurrentValueSubject 调度示例")let subject = CurrentValueSubject<Int, Never>(0)// 1. 在主队列上接收值subject.receive(on: DispatchQueue.main).sink { value inprint("1️⃣ 主队列接收到值: \(value)")print(" 线程: \(Thread.current)")}.store(in: &self.cancellables)// 2. 在后台队列上接收值subject.receive(on: self.backgroundQueue).sink { value inprint("2️⃣ 后台队列接收到值: \(value)")print(" 线程: \(Thread.current)")}.store(in: &self.cancellables)// 在不同时间发送值DispatchQueue.main.schedule(after: .init(.now() + 0.5)) {subject.send(1)}DispatchQueue.main.schedule(after: .init(.now() + 1.0)) {subject.send(2)}// 2秒后完成DispatchQueue.main.schedule(after: .init(.now() + 2)) {subject.send(completion: .finished)promise(.success(()))}}}func runAllExamples() {immediateExample()basicSchedulerExample().flatMap { self.runLoopExample() }.flatMap { self.currentValueSubjectExample() }.sink { _ inprint("\n✅ 所有示例执行完成")}.store(in: &cancellables)}
}let ep = SchedulerExample()
ep.runAllExamples()// 保持运行
RunLoop.main.run(until: Date(timeIntervalSinceNow: 6))/*输出:
🍎ImmediateScheduler 示例
➡️ 同步立即执行线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}🍎DispatchQueue Scheduler 示例
3️⃣ 开始周期性任务
1️⃣ 主队列立即执行线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}周期性任务触发: 2024-12-18 11:03:52 +0000线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}2️⃣ 后台队列延迟执行线程: <NSThread: 0x600001729a00>{number = 9, name = (null)}周期性任务触发: 2024-12-18 11:03:53 +0000线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}🍎RunLoop Scheduler 示例
1️⃣ RunLoop 立即执行线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}2️⃣ RunLoop 延迟执行线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}🍎CurrentValueSubject 调度示例
2️⃣ 后台队列接收到值: 0线程: <NSThread: 0x600001704580>{number = 5, name = (null)}
1️⃣ 主队列接收到值: 0线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}
1️⃣ 主队列接收到值: 1线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}
2️⃣ 后台队列接收到值: 1线程: <NSThread: 0x600001709b80>{number = 10, name = (null)}
1️⃣ 主队列接收到值: 2线程: <_NSMainThread: 0x600001708000>{number = 1, name = main}
2️⃣ 后台队列接收到值: 2线程: <NSThread: 0x600001729a00>{number = 9, name = (null)}✅ 所有示例执行完成
*/
结语
Backpressure 和 Scheduler 是 Combine 框架中用于控制异步数据流的关键机制。通过掌握这些概念,开发者可以更好地管理数据流的速率和调度,提高应用的稳定性和性能。在下一篇文章中,我们将探索如何自定义 Publisher 和 Subscriber,以满足特定的应用需求。
- Swift Combine 学习(六):自定义 Publisher 和 Subscriber