目录
- 一、概述
- 1.1 动机
- 1.2 核心思想
- 1.3 别名
- 二、角色与实现原理
- 2.1 角色
- 2.2 实现原理
- 2.3 类图
- 三、经典接口实现
- 3.1 示例
- 3.1.1 观察者接口
- 3.1.2 目标接口
- 3.1.3 具体被观察者
- 3.1.4 具体观察者
- 3.1.5 Client
- 3.1.6 UML时序图
- 3.2 特点
- 四、其他实现方式
- 4.1 委托与事件(.NET 原生实现)
- 4.1.1 示例
- 4.1.2 UML类图
- 4.1.3 特点
- 4.1.4 适用场景
- 4.2 IObservable<T> 和 IObserver<T> 接口
- 4.2.1 接口概述
- 4.2.1.1 被观察者接口 : IObservable<out T>
- 4.2.1.2 观察者接口 : IObserver<in T>
- 4.2.2 示例
- 4.2.2.1 具体被观察者Subject:实现 IObservable<T>
- 4.2.2.1.1 订阅管理 (`Subscribe` 方法)
- 4.2.2.1.2 取消订阅 (`Unsubscriber`类)
- 4.2.2.1.3 状态通知 (`NotifyObservers` 方法)
- 4.2.2.1.4 完成与错误通知 (`OnCompleted` 和 `OnError`)
- 4.2.2.1.5 线程安全设计
- 4.2.2.1.6 全部代码
- 4.2.3 UML类图
- 4.2.4 扩展内容
- 4.2.4.1 异步通知
- 4.2.4.2 事件过滤
- 4.2.5 特点
- 4.2.6 适用场景
- 4.3 System.Reactive
- 4.3.1 安装
- 4.3.2 示例
- 4.3.3 特点
- 4.3.4 适用场景
- 五、使用场景
- 六、扩展
- 6.1 初始化和基础架构搭建
- 6.1.1 初始化和基础架构搭建
- 6.1.1.1 一个观察者观察多个目标
- 6.1.1.2 **目标多而观察者少**
- 6.1.1.3 **目标对象之间存在复杂依赖关系**
- 6.2 注册机制
- 6.2.1 问题
- 6.2.2 解决方案
- 6.2.2.1 实现思路
- 6.2.2.2 示例
- 6.2.2.3 优点
- 6.2.2.4 适用场景
- 6.3 触发机制
- 6.3.1 触发之前
- 6.3.2 更新的触发者
- 6.3.3 示例
- 6.3.3.1 抽象目标对象
- 6.3.3.2 观察者接口
- 6.3.3.3 具体的的观察者
- 6.3.3.4 自动触发的目标对象
- 6.3.3.5 手动触发的目标对象
- 6.3.3.6 使用示例
- 6.4 **信息传递机制**
- 6.4.1 解决方案与实现方式
- 6.4.1.1 推模型(Push Model)
- 6.4.1.2 拉模型(Pull Model)
- 6.4.1.3 设计对比与权衡
- 6.4.1.4 最佳实践建议
- 6.5 资源管理和错误处理
-
参考
设计模式:可复用面向对象软件的基础(典藏版) - 埃里克·伽玛 - 微信读书
第七章:C#响应式编程System.Reactive - 平元兄 - 博客园
ReactiveX · ReactiveX文档中文翻译
-
项目地址
NitasDemo/10DesignPatterns/DesignPatterns/ObserverPattern at main · Nita121388/NitasDemo
一、概述
观察者模式是一种行为设计模式
,用于定义对象之间的一对多依赖关系。当一个对象(被观察者)的状态发生改变时,所有依赖于它的对象(观察者)都会自动得到通知并更新。
1.1 动机
将一个系统分割成一系列相互协作的类有一个常见的副作用:需要维护相关对象间的一致性。我们不希望为了维持一致性而使各类紧密耦合,因为这样降低了其可复用性。
1.2 核心思想
- 解耦:将被观察者与观察者解耦,使它们之间通过接口或事件机制交互。
- 自动通知:当被观察者状态改变时,自动通知所有观察者。
1.3 别名
-
依赖模式:强调对象间的依赖关系。
-
发布–订阅模式:发布者将消息发送给多个订阅者。
-
模型-视图模式:模型变化时,视图自动更新。
-
源-监听器模式:事件源触发事件后通知所有监听器。
-
从属者模式:从属者依赖于其他对象的状态变化。
这些别名都体现了观察者模式的核心思想:定义对象间的一对多依赖关系,实现状态变化的自动通知。
二、角色与实现原理
2.1 角色
- Subject(目标/主题/被观察者)
- 维护一个观察者列表,允许观察者订阅或取消订阅。
- 当状态改变时,通知所有观察者。
- Observer(观察者)
- 观察者将对观察目标的改变做出反应。观察者一般定义为接口,该接口声明了更新数据的方法update(),因此又称为抽象观察者。
- ConcreteSubject(具体被观察者)
- 实现Subject接口,维护自身状态,并在状态改变时通知观察者。
- ConcreteObserver(具体观察者)
- 实现Observer接口,根据被观察者的状态改变做出相应反应。
2.2 实现原理
- 被观察者维护观察者列表
- 被观察者类中包含一个观察者列表,用于存储所有订阅的观察者对象。
- 被观察者类可以注册与注销观察者,提供方法允许观察者对象注册到被观察者列表中,或从列表中注销。
- 通知机制
- 当被观察者状态改变时,遍历观察者列表,调用每个观察者的更新方法。
2.3 类图
三、经典接口实现
Gang of Four(GoF)是指四位著名软件设计模式专家(Erich Gamma、Richard Helm、Ralph Johnson 和 John Vlissides)在1994年出版的《设计模式》。
GoF模式本质*:通过接口规范化观察者模式中的角色职责,强调设计契约优先,适用于需要长期维护、高可扩展性的复杂系统架构设计。*
核心思想:通过显式接口定义观察者和主题的关系。
3.1 示例
3.1.1 观察者接口
// 观察者接口
public interface IObserver
{void Update(string message);
}
3.1.2 目标接口
// 目标接口
public interface ISubject
{void Attach(IObserver observer);void Detach(IObserver observer);void Notify();
}
3.1.3 具体被观察者
// 具体目标
public class ConcreteSubject : ISubject
{// 观察者列表private List<IObserver> _observers = new();// 目标状态private string _state;// 注册观察者: 将观察者对象注册到目标对象中public void Attach(IObserver observer) => _observers.Add(observer);// 注销观察者: 移除一个观察者public void Detach(IObserver observer) => _observers.Remove(observer);// 通知观察者: 改变目标对象的状态,触发通知public void Notify(){foreach (var observer in _observers)observer.Update(_state);}// 设置目标状态: 改变目标对象的状态public void SetState(string state){_state = state;Notify();}
}
3.1.4 具体观察者
// 具体观察者
public class ConcreteObserver : IObserver
{// 接收通知并处理public void Update(string message) => Console.WriteLine($"Received: {message}");
}
3.1.5 Client
using System;class Program
{static void Main(string[] args){// 创建具体目标对象ConcreteSubject subject = new ConcreteSubject();// 创建多个具体观察者对象ConcreteObserver observer1 = new ConcreteObserver();ConcreteObserver observer2 = new ConcreteObserver();ConcreteObserver observer3 = new ConcreteObserver();// 将观察者对象注册到目标对象中subject.Attach(observer1);subject.Attach(observer2);subject.Attach(observer3);// 改变目标对象的状态,触发通知Console.WriteLine("第一次状态更新:");subject.SetState("Hello, Observers!");// 移除一个观察者subject.Detach(observer2);// 再次改变目标对象的状态,触发通知Console.WriteLine("\n第二次状态更新:");subject.SetState("State has changed!");}
}
结果:
第一次状态更新:
Received: Hello, Observers!
Received: Hello, Observers!
Received: Hello, Observers!第二次状态更新:
Received: State has changed!
Received: State has changed!
3.1.6 UML时序图
3.2 特点
- 符合设计模式原生定义,代码结构清晰。
- 强类型约束,编译时检查接口实现。
- 对语言无特殊要求,通用性强。
- 显式依赖关系,逻辑透明。
- 适用于简单场景,无需框架支持。
四、其他实现方式
4.1 委托与事件(.NET 原生实现)
- 机制:利用语言或框架提供的事件监听机制,被观察者触发事件,观察者通过监听器接收事件。
4.1.1 示例
public class EventSubject
{public event EventHandler<string> StateChanged;private string _state;public void SetState(string state){_state = state;StateChanged?.Invoke(this, _state);}
}public class EventObserver
{public void Subscribe(EventSubject subject){subject.StateChanged += HandleStateChange;}private void HandleStateChange(object sender, string message){Console.WriteLine($"Event received: {message}");}public void Unsubscribe(EventSubject subject){subject.StateChanged -= HandleStateChange;}
}
class Program{static void Main(string[] args){// 创建 EventSubject 和 EventObserver 对象EventSubject subject = new EventSubject();EventObserver observer = new EventObserver();// 订阅事件observer.Subscribe(subject);Console.WriteLine("Observer has subscribed to the subject.");// 改变状态,触发事件subject.SetState("State 1");subject.SetState("State 2");// 取消订阅observer.Unsubscribe(subject);Console.WriteLine("Observer has unsubscribed from the subject.");// 再次改变状态,观察是否还会触发事件subject.SetState("State 3"); // 不会触发事件,因为已取消订阅}}
结果
Observer has subscribed to the subject.
Event received: State 1
Event received: State 2
Observer has unsubscribed from the subject.
4.1.2 UML类图
4.1.3 特点
- 代码更加简洁,轻量级,利用语言的内置特性,减少了手动管理观察者列表的复杂性。
- 内置线程安全的事件触发机制(?.Invoke)
- 支持多播(多个观察者订阅同一事件)
- 对于一些复杂的业务逻辑,可能无法完全满足需求,因为事件机制通常是基于固定的事件类型和参数进行设计的,不够灵活。
- 而且如果事件的定义不合理,可能会导致系统的可扩展性和维护性变差。
- 无法跨模块解耦(需直接访问事件)。
4.1.4 适用场景
- GUI 事件处理(如按钮点击)。
- 单模块内的局部解耦。
- 适合简单通知逻辑且不涉及复杂数据流的场景。
4.2 IObservable 和 IObserver 接口
核心思想:使用.NET框架内置的观察者模式标准化接口。
4.2.1 接口概述
接口 | 角色 | 职责 |
---|---|---|
`IObservable<T>` | 被观察对象 | 数据/事件的生产者 |
`IObserver<T>` | 观察者 | 数据/事件的消费者 |
4.2.1.1 被观察者接口 : IObservable
namespace System
{/// <summary>/// 定义了一个基于推送的事件通知提供者/ 被观察者/// </summary>/// <typeparam name="T">提供通知信息的对象类型。</typeparam>public interface IObservable<out T>{/// <summary>/// 通知提供者有一个观察者将要接收通知。/// </summary>/// observer">将要接收通知的对象。/// <returns>一个接口引用,允许观察者在提供者完成发送通知之前停止接收通知。</returns>IDisposable Subscribe(IObserver<T> observer);}
}
IObservable<T>
是一个接口,属于 C# 中的事件驱动编程模型,是响应式编程(Reactive Programming)的核心接口之一。
它定义了一个基于推送的事件通知机制,允许观察者(IObserver<T>
)订阅
通知源(IObservable<T>
),并在通知源产生数据或事件时接收通知。
- 泛型参数
T
:表示通知中携带的数据类型。 - Subscribe方法:是
IObservable<T>
的核心方法。- 它接收一个实现了
IObserver<T>
接口的对象作为参数,表示观察者。 - 当调用
Subscribe
方法时,观察者会注册到通知源,从而能够接收通知。 - 方法返回一个
IDisposable
对象,观察者可以通过调用其Dispose
方法来取消订阅,停止接收通知。
- 它接收一个实现了
4.2.1.2 观察者接口 : IObserver
namespace System
{/// <summary>/// 提供一种接收基于推送的通知的机制。//观察者/// </summary>/// <typeparam name="T">提供通知信息的对象类型。</typeparam>public interface IObserver<in T>{/// <summary>/// 向观察者提供新的数据。/// </summary>/// <param name="value">当前的通知信息。</param>void OnNext(T value);/// <summary>/// 通知观察者提供者遇到了错误条件。/// </summary>/// <param name="error">一个提供有关错误的额外信息的对象。</param>void OnError(Exception error);/// <summary>/// 通知观察者提供者已经完成发送基于推送的通知。/// </summary>void OnCompleted();}
}
IObserver<T>
它定义了一个观察者的角色,用于接收来自通知源(IObservable<T>
)的推送通知。
- 泛型参数
T
:表示通知中携带的数据类型。 OnNext
方法:当通知源有新的数据可用时,调用此方法向观察者传递数据。参数value
是当前的通知信息。OnError
方法:当通知源在发送通知过程中遇到错误时,调用此方法通知观察者。参数error
是一个Exception
对象,提供有关错误的详细信息。OnCompleted
方法:当通知源完成所有通知的发送后,调用此方法通知观察者。这表示通知源不会再发送任何新的通知。
4.2.2 示例
4.2.2.1 具体被观察者Subject:实现 IObservable
4.2.2.1.1 订阅管理 (Subscribe
方法)
public IDisposable Subscribe(IObserver<string> observer) {lock (_lock) {if (!_observers.Contains(observer)) {_observers.Add(observer);}}return new Unsubscriber(_observers, observer, _lock);
}
-
功能:允许观察者订阅主题。
-
线程安全:通过
lock
确保多线程下订阅操作的原子性。 -
防止重复订阅:检查观察者是否已存在。
-
返回
Unsubscriber
:通过IDisposable
实现优雅的取消订阅机制。关于
IDisposable
,可以查看我的另一篇文章C#中的非托管资源释放机制详解|Finalizer与Dispose模式-CSDN博客。
4.2.2.1.2 取消订阅 (Unsubscriber
类)
private class Unsubscriber : IDisposable {// ... 略去字段和构造函数 ...public void Dispose() {lock (_lock) {if (_observer != null && _observers.Contains(_observer)) {_observers.Remove(_observer);_observer = null;}}}
}
- 功能:调用
Dispose()
时从观察者列表中移除目标观察者。 - 资源释放:移除后置空引用,避免内存泄漏。
- 线程安全:通过
lock
确保取消订阅的原子性。
4.2.2.1.3 状态通知 (NotifyObservers
方法)
public void NotifyObservers(string state) {lock (_lock) {foreach (var observer in _observers) {observer.OnNext(state);}}
}
- 功能:遍历所有观察者,调用其
OnNext
方法推送新状态。 - 线程安全:遍历期间锁定列表,防止并发修改。
4.2.2.1.4 完成与错误通知 (OnCompleted
和 OnError
)
public void OnCompleted() {lock (_lock) {foreach (var observer in _observers) {observer.OnCompleted();}_observers.Clear();}
}public void OnError(Exception error) {lock (_lock) {foreach (var observer in _observers) {observer.OnError(error);}_observers.Clear();}
}
- 完成通知:调用所有观察者的
OnCompleted()
,清空列表(终止后续通知)。 - 错误通知:调用所有观察者的
OnError()
,清空列表。 - 线程安全:全程加锁。
4.2.2.1.5 线程安全设计
- 锁对象
_lock
:所有对观察者列表的操作(增、删、遍历)均通过lock (_lock)
确保原子性。 - 场景覆盖:
- 多线程同时订阅/取消订阅。
- 通知过程中触发新的订阅/取消订阅。
4.2.2.1.6 全部代码
-
具体目标
using System; using System.Collections.Generic; using System.Threading;// Subject 类实现了 IObservable<string> 接口,用于管理观察者并通知状态变化 public class Subject : IObservable<string> {// 用于存储所有订阅的观察者private List> _observers = new();// 用于线程安全的锁对象private readonly object _lock = new();// 订阅方法,允许观察者订阅状态变化public IDisposable Subscribe(IObserver observer){lock (_lock) // 确保线程安全{if (!_observers.Contains(observer)) // 防止重复订阅{_observers.Add(observer);}}// 返回一个 Unsubscriber 对象,用于取消订阅return new Unsubscriber(_observers, observer, _lock);}// Unsubscriber 类实现了 IDisposable 接口,用于取消观察者的订阅private class Unsubscriber : IDisposable{private List> _observers;private IObserver _observer;private readonly object _lock;// 构造函数,初始化观察者列表、当前观察者和锁对象public Unsubscriber(List> observers, IObserver observer, object lockObj){_observers = observers;_observer = observer;_lock = lockObj;}// Dispose 方法用于取消订阅public void Dispose(){lock (_lock) // 确保线程安全{if (_observer != null && _observers.Contains(_observer)){_observers.Remove(_observer); // 从观察者列表中移除当前观察者_observer = null; // 清空当前观察者引用}}}}// SetState 方法用于设置状态并通知所有观察者public void NotifyObservers(string state){lock (_lock) // 确保线程安全{foreach (var observer in _observers){observer.OnNext(state); // 调用观察者的 OnNext 方法通知状态变化}}}// OnCompleted 方法用于通知所有观察者完成事件public void OnCompleted(){lock (_lock) // 确保线程安全{foreach (var observer in _observers){observer.OnCompleted(); // 调用观察者的 OnCompleted 方法通知完成事件}_observers.Clear(); // 清空观察者列表}}// OnError 方法用于通知所有观察者发生错误public void OnError(Exception error){lock (_lock) // 确保线程安全{foreach (var observer in _observers){observer.OnError(error); // 调用观察者的 OnError 方法通知错误事件}_observers.Clear(); // 清空观察者列表}} }
-
具体观察者
ConcreteObserver
类实现了IObserver<string>
接口,用于接收被观察者的状态变化通知。OnNext
方法:接收状态变化通知,并输出状态信息。OnError
方法:接收错误通知,并输出错误信息。OnCompleted
方法:接收完成通知,并输出完成信息。
// ConcreteObserver 类实现了 IObserver<string> 接口,用于接收状态变化通知 public class ConcreteObserver : IObserver<string> {// 观察者的名称,用于区分不同的观察者private readonly string _name;// 构造函数,初始化观察者名称public ConcreteObserver(string name){_name = name;}// OnNext 方法用于接收状态变化通知public void OnNext(string value){Console.WriteLine($"{_name} received: {value}"); // 输出接收到的状态信息}// OnError 方法用于接收错误通知public void OnError(Exception error){Console.WriteLine($"{_name} received an error: {error.Message}"); // 输出错误信息}// OnCompleted 方法用于接收完成通知public void OnCompleted(){Console.WriteLine($"{_name} received completion notification."); // 输出完成通知} }
-
客户端使用实例(Client)
using System;namespace IObservableTDemo {class Program{static void Main(string[] args){// 1. 创建被观察者和观察者Subject subject = new Subject();ConcreteObserver observer1 = new ConcreteObserver("observer 1");ConcreteObserver observer2 = new ConcreteObserver("observer 2");// 2. 订阅观察者IDisposable subscription1 = subject.Subscribe(observer1);IDisposable subscription2 = subject.Subscribe(observer2);// 状态通知subject.NotifyObservers("Hello, World!");// 取消订阅 observer2subscription2.Dispose();// 再次设置状态,观察者1会收到通知,观察者2不会收到subject.NotifyObservers("Hello again!");// 模拟错误 此时会清空观察者列表subject.OnError(new Exception("Something went wrong!"));// 再次设置状态,观察者1和观察者2都不会收到通知subject.NotifyObservers("Hello again!");// 再次订阅观察者IDisposable subscription3 = subject.Subscribe(observer1);// 再次设置状态,观察者1收到通知subject.NotifyObservers("Hello again!");// 完成通知subject.OnCompleted();//再次设置状态,都不会收到通知subject.NotifyObservers("Hello again!");// 等待用户输入后退出Console.WriteLine("Press any key to exit...");Console.ReadKey();}} }
-
结果
Observer 1 received: Hello, World! Observer 2 received: Hello, World! Observer 1 received: Hello again! Observer 1 received an error: Something went wrong! Observer 3 received: Hello again! Observer 3 received completion notification. Press any key to exit...
4.2.3 UML类图
4.2.4 扩展内容
4.2.4.1 异步通知
可以通过Task
或async/await
来实现异步通知。
-
异步接口
- 每个方法都返回Task以支持异步操作
- 完全异步的观察者接口
public interface IAsyncObserver<in T> {Task OnNextAsync(T value);Task OnErrorAsync(Exception exception);Task OnCompletedAsync(); }
-
顺序异步通知机制
- 严格按顺序通知观察者
- 每个观察者处理完成后再通知下一个
- 保留通知顺序性
foreach (var observer in observersCopy) {try{await observer.OnNextAsync(value);}// ... }
-
完整代码
using System; using System.Collections.Generic; using System.Threading.Tasks; using System.Threading;#region Client Code var subject = new AsyncSubject<string>();var observer1 = new AsyncObserver("observer 1"); var observer2 = new AsyncObserver("observer 2");// 订阅观察者 using var subscription1 = subject.Subscribe(observer1); using var subscription2 = subject.Subscribe(observer2);// 异步通知 await subject.NotifyAsync("First Message");// 取消订阅 observer2 subscription2.Dispose();// 再次通知 await subject.NotifyAsync("Second Message");// 错误通知 await subject.NotifyErrorAsync(new Exception("Test Error"));// 完成通知 await subject.OnCompletedAsync();Console.WriteLine("Press any key to exit..."); Console.ReadKey(); #endregion#region Interfaces public interface IAsyncObserver<in T> {Task OnNextAsync(T value);Task OnErrorAsync(Exception exception);Task OnCompletedAsync(); }public interface IAsyncObservable<out T> {IDisposable Subscribe(IAsyncObserver observer); } #endregion#region Async Subject public class AsyncSubject<T> : IAsyncObservable<T> {private readonly List> _observers = new();private readonly object _lock = new();public IDisposable Subscribe(IAsyncObserver observer){lock (_lock){if (!_observers.Contains(observer)){_observers.Add(observer);}}return new Unsubscriber(() =>{lock (_lock){_observers.Remove(observer);}});}public async Task NotifyAsync(T value){IAsyncObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();}foreach (var observer in observersCopy){try{await observer.OnNextAsync(value);}catch (Exception ex){Console.WriteLine($"Notification failed: {ex.Message}");}}}public async Task NotifyErrorAsync(Exception error){IAsyncObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();_observers.Clear();}foreach (var observer in observersCopy){try{await observer.OnErrorAsync(error);}catch (Exception ex){Console.WriteLine($"Error notification failed: {ex.Message}");}}}public async Task OnCompletedAsync(){IAsyncObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();_observers.Clear();}foreach (var observer in observersCopy){try{await observer.OnCompletedAsync();}catch (Exception ex){Console.WriteLine($"Completion notification failed: {ex.Message}");}}}private class Unsubscriber : IDisposable{private readonly Action _unsubscribeAction;public Unsubscriber(Action unsubscribeAction){_unsubscribeAction = unsubscribeAction;}public void Dispose() => _unsubscribeAction?.Invoke();} } #endregion#region Async Observer public class AsyncObserver : IAsyncObserver<string> {private readonly string _name;public AsyncObserver(string name) => _name = name;public async Task OnNextAsync(string value){await Task.Delay(100); // 模拟异步处理Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {_name} received: {value}");}public async Task OnErrorAsync(Exception exception){await Task.Delay(100); // 模拟异步处理Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {_name} error: {exception.Message}");}public async Task OnCompletedAsync(){await Task.Delay(100); // 模拟异步处理Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {_name} completed");} } #endregion
结果
[22:14:48.269] Observer 1 received: First Message [22:14:48.449] Observer 2 received: First Message [22:14:48.554] Observer 1 received: Second Message [22:14:48.662] Observer 1 error: Test Error Press any key to exit...
4.2.4.2 事件过滤
可以通过在通知方法中添加过滤逻辑来实现事件过滤。
FilteredObservable
的构造函数接收一个过滤函数(Func<T, bool>
),用于决定哪些消息需要通知给观察者。- 在
Notify
方法中,只有满足过滤条件的消息才会被发送。
using System;
using System.Collections.Generic;
using System.Threading.Tasks;#region Client Codevar subject = new FilteredObservable<string>(s => s.StartsWith("[IMPORTANT]"));var observer1 = new Observer("Observer 1");
var observer2 = new Observer("Observer 2");using var subscription1 = subject.Subscribe(observer1);
using var subscription2 = subject.Subscribe(observer2);// 这些消息将被过滤
subject.Notify("Normal Message 1");
subject.Notify("Normal Message 2");// 这些消息将被传递
subject.Notify("[IMPORTANT] Message 1");
subject.Notify("[IMPORTANT] Message 2");// 取消订阅 observer2
subscription2.Dispose();// 再次通知
subject.Notify("[IMPORTANT] Message 3"); // 只有 observer1 收到// 错误通知(不过滤)
subject.NotifyError(new Exception("Critical Error"));// 完成通知(不过滤)
subject.OnCompleted();#endregion#region Subjectpublic class FilteredObservable<T> : IObservable<T>
{private readonly List<IObserver<T>> _observers = new();private readonly Func<T, bool> _filter;private readonly object _lock = new();public FilteredObservable(Func<T, bool> filter){_filter = filter ?? throw new ArgumentNullException(nameof(filter));}public IDisposable Subscribe(IObserver<T> observer){lock (_lock){if (!_observers.Contains(observer))_observers.Add(observer);}return new Unsubscriber(() =>{lock (_lock){_observers.Remove(observer);}});}public async Task Notify(T value){if (!_filter(value)) return;IObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();}foreach (var observer in observersCopy){try{observer.OnNext(value);}catch (Exception ex){Console.WriteLine($"Notification failed: {ex.Message}");}}}public async Task NotifyError(Exception error){IObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();_observers.Clear();}foreach (var observer in observersCopy){try{ observer.OnError(error);}catch (Exception ex){Console.WriteLine($"Error notification failed: {ex.Message}");}}}public async Task OnCompleted(){IObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();_observers.Clear();}foreach (var observer in observersCopy){try{observer.OnCompleted();}catch (Exception ex){Console.WriteLine($"Completion notification failed: {ex.Message}");}}}private class Unsubscriber : IDisposable{private readonly Action _unsubscribeAction;public Unsubscriber(Action unsubscribeAction) => _unsubscribeAction = unsubscribeAction;public void Dispose() => _unsubscribeAction?.Invoke();}
}#endregion#region Observerpublic class Observer : IObserver<string>
{private readonly string _name;public Observer(string name){_name = name;}public void OnCompleted(){Console.WriteLine($"{_name} completed.");}public void OnError(Exception error){Console.WriteLine($"{_name} error: {error.Message}");}public void OnNext(string value){Console.WriteLine($"{_name} received: {value}");}
}
#endregion
Observer 1 received: [IMPORTANT] Message 1
Observer 2 received: [IMPORTANT] Message 1
Observer 1 received: [IMPORTANT] Message 2
Observer 2 received: [IMPORTANT] Message 2
Observer 1 received: [IMPORTANT] Message 3
Observer 1 error: Critical Error
4.2.5 特点
- 依赖框架:依赖于.NET框架,不适合跨平台或非.NET环境。
- 学习曲线:需要一定的.NET框架知识才能熟练使用。
- 与LINQ集成:可以使用LINQ查询语法对事件流进行操作,简化代码。
- 性能优化:通过高效的订阅机制和事件分发,提升了性能。
- 扩展性强:支持事件过滤、组合、转换等高级功能。
- 线程安全:框架提供了线程安全的机制,减少了线程冲突的风险。
- 标准化:基于.NET框架的标准接口,具有统一的规范。
4.2.6 适用场景
- 复杂事件处理:适用于事件流复杂、需要高级操作(如过滤、组合、转换)的场景。
- 多线程环境:在多线程或异步编程中,可以有效避免线程安全问题。
- 数据流处理:适合处理数据流,如传感器数据、实时消息等。
- 与.NET生态系统集成:与.NET的其他功能(如LINQ、Task并行库)无缝集成。
4.3 System.Reactive
System.Reactive 是基于IObservable<T>
和IObserver<T>
的扩展库,用于处理事件流和异步数据流。
它将事件和数据流抽象为可观察序列,并通过 LINQ 风格的操作符实现订阅、过滤、转换和合并。
核心思想:使用响应式扩展库处理复杂事件流。Rx可以这样定义:Rx = Observables + LINQ + Schedulers。
4.3.1 安装
通过NuGet包管理器安装System.Reactive包
4.3.2 示例
using System.Reactive.Linq;
using System.Reactive.Subjects;// 创建可观察序列
var subject = new Subject<string>();
var observable = subject.AsObservable();// 订阅观察者
var subscription = observable.Where(msg => msg.StartsWith("IMPORTANT")).Subscribe(msg => Console.WriteLine($"Rx received: {msg}"));// 推送消息
subject.OnNext("IMPORTANT: System update");
subject.OnNext("Normal message"); // 被过滤// 取消订阅
subscription.Dispose();
输出:
Rx received: IMPORTANT: System update
4.3.3 特点
- 强大的事件流处理(过滤、映射、合并等)
- 支持LINQ查询操作
- 异步事件处理支持
- 自动管理资源释放(通过IDisposable)
- 需引入第三方库(System.Reactive)
- 学习曲线较陡
4.3.4 适用场景
- 适合复杂事件流处理
- 实时数据更新
- 多线程和异步编程场景
- 功能强大且与.NET生态系统无缝集成
- 优先选择在需要复杂事件流处理的场景
五、使用场景
以下是观察者模式不同实现方式的对比总结:
实现方式 | 适合场景 | 选择建议 |
---|---|---|
经典实现 | 适用于简单的事件通知场景,如 GUI 编程中组件间的交互。 | 当事件逻辑简单、不需要复杂的数据流处理时,适合使用经典实现。 |
委托与事件实现 | 适用于.NET中的事件处理,尤其是需要在多个组件或类之间传递事件的场景。 | 如果使用的是.NET框架,并且需要在类之间传递事件,委托与事件是首选。 |
`IObservable<T>`/`IObserver<T>`实现 | 适用于需要灵活处理数据流的场景,如异步数据处理、多线程环境下的事件推送。 | 当需要更灵活地控制数据流,或者需要支持多个观察者订阅时,`IObservable<T>`/`IObserver<T>`是一个不错的选择。 |
`System.Reactive`实现 | 适用于复杂的数据流处理,尤其是需要对事件进行转换、过滤、组合等操作的场景。 | 如果涉及复杂的数据流处理,或者需要响应式编程的支持,`System.Reactive`是最佳选择。 |
六、扩展
6.1 初始化和基础架构搭建
6.1.1 初始化和基础架构搭建
6.1.1.1 一个观察者观察多个目标
-
场景:有时观察者需要依赖多个目标对象。
-
问题:如果观察者无法区分通知的来源,导致无法针对不同目标对象做出准确响应。
示例:假设一个用户界面中有一个观察者用于监控多个数据源(如温度、湿度和空气质量传感器)。
当任何一个数据源更新时,观察者都会收到通知,但无法区分是哪个数据源发生了变化,从而无法针对性地更新界面元素。
-
改动:对目标对象的
Update接口
进行扩展,确保观察者能够准确识别通知的来源
using System;
using System.Collections.Generic;public class Subject
{private List<IObserver> observers = new List<IObserver>();// 注册观察者public void Attach(IObserver observer){observers.Add(observer);}// 注销观察者public void Detach(IObserver observer){observers.Remove(observer);}// 通知所有观察者public void NotifyObservers(string message){foreach (var observer in observers){observer.Update(this, message); // 将自己作为参数传递给观察者}}
}
6.1.1.2 目标多而观察者少
-
问题
在传统观察者模式中,目标对象(
Subject
)直接保存观察者的引用。这种方式简单直观,但当目标对象数量多而观察者数量少时,会产生明显弊端:- 存储开销问题
每个目标对象都要分配存储空间保存观察者引用,即使某些目标对象没有观察者。
这会导致大量不必要的存储开销,尤其在目标对象数量庞大时,开销更加显著。
-
生命周期管理问题
如果目标对象的生命周期较短,而观察者集合被强引用保留,可能会导致内存泄漏。
因为即使目标对象被垃圾回收,观察者集合仍然占用内存,无法被清理。
-
解决方案
- 使用
HashSet<IObserver>
为了解决存储开销问题,可以考虑使用HashSet<IObserver>
来存储观察者。HashSet
提供了高效的动态添加和移除操作,能够更好地支持观察者在运行时的动态变化。
- 优点:
- 动态管理观察者:
HashSet
提供高效的动态添加和移除操作,支持观察者在运行时的动态变化。 - 避免重复存储:
HashSet
自动去重,避免了重复存储相同的观察者。 - 提高存储效率:通过哈希表实现快速查找和插入操作,减少了存储和检索观察者的开销。
- 动态管理观察者:
- 缺点:
- 内存泄漏风险:如果目标对象被垃圾回收,但观察者集合(
HashSet<IObserver>
)仍然存在引用,那么这些观察者可能不会被正确清理,从而导致内存泄漏。 - 生命周期管理复杂:需要手动管理目标对象和观察者集合的生命周期,确保在目标对象被销毁时,观察者集合也被正确清理。
- 内存泄漏风险:如果目标对象被垃圾回收,但观察者集合(
- 使用
ConditionalWeakTable
ConditionalWeakTable
是一种特殊的哈希表,它允许键(目标对象)在没有其他强引用时被垃圾回收,而不会影响值(观察者集合)的存在。
通过ConditionalWeakTable
,只有真正有观察者的目标对象才会占用存储空间,同时避免了内存泄漏问题。
以下是使用ConditionalWeakTable
实现观察者模式的示例代码:
using System.Runtime.CompilerServices;
using System.Collections.Generic;// 目标和观察者接口省略...public class Subject : ISubject
{private static readonly object _syncLock = new object();private static readonly ConditionalWeakTable<Subject, HashSet<IObserver>> observerMap = new ConditionalWeakTable<Subject, HashSet<IObserver>>();public void Attach(IObserver observer){lock (_syncLock){if (!observerMap.TryGetValue(this, out var observers)){observers = new HashSet();observerMap.Add(this, observers);}observers.Add(observer);}}public void Detach(IObserver observer){lock (_syncLock){if (observerMap.TryGetValue(this, out var observers)){observers.Remove(observer);if (observers.Count == 0){observerMap.Remove(this);}}}}public void NotifyObservers(string message){HashSet<IObserver> observersCopy;lock (_syncLock){if (!observerMap.TryGetValue(this, out var observers)){return;}observersCopy = new HashSet(observers);}foreach (var observer in observersCopy){observer.Update(this, message);}}
}
-
优点
- 避免内存泄漏:
ConditionalWeakTable
允许目标对象在没有其他强引用时被垃圾回收,从而避免了内存泄漏问题。 - 动态管理:目标对象和观察者之间的关系是动态的,
ConditionalWeakTable
提供了一种灵活的方式来管理这种关系。 - 优化存储效率:只有真正有观察者的目标对象才会占用存储空间,减少了不必要的存储开销。
- 避免内存泄漏:
-
缺点
- 性能开销:
ConditionalWeakTable
的查找和管理操作比直接使用HashSet<IObserver>
更复杂,可能会引入额外的性能开销。 - 复杂性增加:代码的复杂性增加,需要理解
ConditionalWeakTable
的工作机制。
- 性能开销:
-
实际应用中的权衡
观察者模式在实际应用中需要根据具体需求选择合适的实现方式。
如果目标对象数量较多且生命周期较短,推荐使用
ConditionalWeakTable
,以避免内存泄漏并优化存储效率。如果目标对象生命周期较长且观察者管理较为简单,则可以直接使用
HashSet<IObserver>
,以简化实现和提高性能。
6.1.1.3 目标对象之间存在复杂依赖关系
-
问题
当目标对象存在复杂依赖关系时,直接通知观察者可能引发以下问题:
- 多次更新:观察者可能因多个目标对象的改变收到重复通知,导致冗余操作。
- 更新顺序问题:目标对象的改变顺序可能导致观察者在状态未完全更新时收到通知,获取不一致的状态。
- 维护成本高:复杂的依赖关系增加了代码复杂性和维护难度。
-
解决方案
引入一个独立的更改管理器(ChangeManager) 来封装和管理复杂的更新逻辑。
-
更改管理器的作用
- 维护映射关系:管理目标对象与观察者的映射,降低耦合度。
- 定义更新策略:在所有相关目标对象状态更新完毕后统一通知观察者,避免冗余和不一致问题。
- 优化更新逻辑:根据依赖关系优化更新流程,确保观察者只接收一次更新。
-
更改管理器的两种实现
是一个典型的中介者模式实例,通常以单例模式全局可见,从而确保整个系统中只有一个协调中心。两种特殊的更改管理器实现:
-
SimpleChangeManager
-
实现方案
- 使用字典维护目标对象与观察者的映射
- 使用脏标记集合跟踪需要更新的目标对象
- Commit时统一通知观察者并去重
-
示例
using System;using System.Collections.Generic;using System.Linq;#region Client Code// 使用简单更改管理器ChangeManager.Instance = SimpleChangeManager.Instance;var subject = new ConcreteSubject();var observer = new ConcreteObserver();ChangeManager.Instance.Register(subject, observer);subject.State = 42;subject.Notify();ChangeManager.Instance.Commit();// 使用DAG更改管理器ChangeManager.Instance = DAGChangeManager.Instance;var subjectA = new ConcreteSubject();var subjectB = new ConcreteSubject();var dagObserver = new ConcreteObserver();ChangeManager.Instance.Register(subjectA, dagObserver);ChangeManager.Instance.Register(subjectB, dagObserver);((DAGChangeManager)ChangeManager.Instance).AddDependency(subjectA, subjectB);subjectA.State = 10;subjectB.State = 20;subjectA.Notify();subjectB.Notify();ChangeManager.Instance.Commit();#endregion#region IObserver// 观察者接口public interface IObserver{void Update(ISubject subject);}#endregion#region IObservable// 目标对象接口public interface ISubject{void Notify();}#endregion#region ChangeManager// 更改管理器抽象类public abstract class ChangeManager{public static ChangeManager Instance { get; set; }public abstract void Register(ISubject subject, IObserver observer);public abstract void Unregister(ISubject subject, IObserver observer);public abstract void Notify(ISubject subject);public abstract void Commit();}#endregion#region SimpleChangeManager// 简单更改管理器实现public sealed class SimpleChangeManager : ChangeManager{private readonly Dictionary> _observers = new();private readonly HashSet<ISubject> _dirtySubjects = new();static SimpleChangeManager(){}private static SimpleChangeManager _instance;public static SimpleChangeManager Instance{get{if (_instance == null){_instance = new SimpleChangeManager();}return _instance;}}public override void Register(ISubject subject, IObserver observer){if (!_observers.ContainsKey(subject)){_observers[subject] = new HashSet();}_observers[subject].Add(observer);}public override void Unregister(ISubject subject, IObserver observer){if (_observers.TryGetValue(subject, out var observers)){observers.Remove(observer);}}public override void Notify(ISubject subject){lock (_dirtySubjects){_dirtySubjects.Add(subject);}}public override void Commit(){HashSet<IObserver> notified = new();List<ISubject> toProcess;lock (_dirtySubjects){toProcess = _dirtySubjects.ToList();_dirtySubjects.Clear();}foreach (var subject in toProcess){if (_observers.TryGetValue(subject, out var observers)){foreach (var observer in observers.Where(observer => notified.Add(observer))){observer.Update(subject);}}}}}#endregion#region ConcreteSubject// 示例使用public class ConcreteSubject : ISubject{public int State { get; set; }public void Notify(){ChangeManager.Instance.Notify(this);}}#endregion#region ConcreteObserverpublic class ConcreteObserver : IObserver{public void Update(ISubject subject){if (subject is ConcreteSubject concreteSubject){Console.WriteLine($"Received update: {concreteSubject.State}");}}}#endregion``````c#Received update: 42
-
特点:总是更新每个目标对象的所有观察者。实现简单,易于理解。可能会导致冗余更新,效率较低。
-
适用场景:当目标对象之间没有复杂的依赖关系,或者更新逻辑简单时,这种实现方式比较合适。
-
DAGChangeManager
-
实现方案
-
继承简单管理器基础功能
-
添加依赖关系管理
-
使用拓扑排序确保更新顺序
1. 使用深度优先搜索(DFS)。
2. 先处理依赖项,再处理当前主题。
3. 最终得到的排序结果是一个线性顺序,满足所有依赖关系 -
示例
#region DAGChangeManager// 基于DAG的复杂更改管理器public sealed class DAGChangeManager : ChangeManager{private readonly Dictionary> _observers = new();private readonly Dictionary<ISubject, HashSet<ISubject>> _dependencies = new();private readonly HashSet<ISubject> _dirtySubjects = new();static DAGChangeManager(){}private static DAGChangeManager _instance;public static DAGChangeManager Instance{get{if (_instance == null){_instance = new DAGChangeManager();}return _instance;}}// 添加依赖关系(dependent 依赖于 dependency)public void AddDependency(ISubject dependent, ISubject dependency){if (!_dependencies.ContainsKey(dependent)){_dependencies[dependent] = new HashSet();}_dependencies[dependent].Add(dependency);}public override void Register(ISubject subject, IObserver observer){if (!_observers.ContainsKey(subject)){_observers[subject] = new HashSet();}_observers[subject].Add(observer);}public override void Unregister(ISubject subject, IObserver observer) {if (_observers.TryGetValue(subject, out var observers)){observers.Remove(observer);}}public override void Notify(ISubject subject){lock (_dirtySubjects){_dirtySubjects.Add(subject);}}public override void Commit(){List<ISubject> processingOrder;lock (_dirtySubjects){processingOrder = TopologicalSort(_dirtySubjects);_dirtySubjects.Clear();}HashSet<IObserver> notified = new();foreach (var subject in processingOrder){if (_observers.TryGetValue(subject, out var observers)){foreach (var observer in observers.Where(observer => notified.Add(observer))){observer.Update(subject);}}}}private List<ISubject> TopologicalSort(HashSet<ISubject> subjects){var sorted = new List<ISubject>();var visited = new HashSet<ISubject>();foreach (var subject in subjects.OrderBy(s => s.GetHashCode())){Visit(subject, visited, sorted);}return sorted;}private void Visit(ISubject subject, HashSet<ISubject> visited, List<ISubject> sorted){if (!visited.Add(subject)) return;if (_dependencies.TryGetValue(subject, out var dependencies)){foreach (var dependency in dependencies){Visit(dependency, visited, sorted);}}sorted.Add(subject);}}#endregion``````c#Received update: 20
- **特点**:处理目标对象及其观察者之间依赖关系构成的**无环有向图(DAG,Directed Acyclic Graph)**。
-
优点:可以避免冗余更新,确保观察者只接收一次更新。
-
缺点:实现复杂度较高,需要维护依赖关系图。
-
适用场景:当观察者可能依赖多个目标对象,且目标对象之间存在复杂的依赖关系时,这种实现方式更好。
-
总结
更改管理器(ChangeManager)是一种优化机制,用于封装复杂更新逻辑,简化目标对象与观察者之间的依赖关系。它通过以下方式实现优化:
- 职责分离:将映射关系和更新逻辑封装到独立对象中。
- 统一通知:在所有目标对象状态更新完毕后,一次性通知观察者。
- 优化策略:避免冗余更新。
更改管理器可以是简单的SimpleChangeManager或复杂的DAGChangeManager,具体取决于系统需求。它通常以单例模式全局可见,作为系统的协调中心。
6.2 注册机制
6.2.1 问题
在观察者模式中,传统的事件通知机制可能存在以下问题:
- 通知效率低:目标对象可能向所有观察者发送通知,即使某些观察者并不关心某些事件。
- 耦合度高:观察者与目标对象之间的依赖关系较强,难以灵活调整。
- 缺乏灵活性:观察者无法动态选择关注的事件类型,难以适应复杂的应用场景。
这些问题可能导致系统性能下降,代码难以维护和扩展。
6.2.2 解决方案
通过显式注册机制,观察者可以明确指定其感兴趣的事件类型,目标对象仅向已注册的观察者发送相关通知。
6.2.2.1 实现思路
- 引入“方面(Aspect)”概念:将目标对象的状态变化分解为多个独立的方面,每个方面代表一种特定类型的变更。
- 观察者选择性注册:观察者可以根据需要注册对特定方面的兴趣,从而只接收关注的事件通知。
- 目标对象优化通知:目标对象仅向已注册特定方面的观察者发送通知,避免不必要的消息传递。
6.2.2.2 示例
-
定义方面(Aspect)枚举
方面(Aspect)枚举:定义了目标对象可能的状态变化类型,例如状态变化、数据更新和错误发生。
// 定义方面(Aspect)枚举,表示目标对象可能的状态变化类型 public enum Aspect {StateChange,DataUpdate,ErrorOccurred }
-
目标对象类 Subject
- 使用字典存储每个方面对应的观察者列表。
- 提供注册和取消注册的方法,允许观察者显式指定感兴趣的方面。
- 提供通知方法,仅向注册了特定方面的观察者发送通知。
// 目标对象类 public class Subject {// 用于存储观察者订阅的方面private Dictionary>> observers = new Dictionary>>();public Subject(){// 初始化方面列表foreach (Aspect aspect in Enum.GetValues(typeof(Aspect))){observers[aspect] = new List>();}}// 注册观察者public void RegisterObserver(Aspect aspect, Action observer){observers[aspect].Add(observer);Console.WriteLine($"Observer registered for aspect: {aspect}");}// 取消注册观察者public void UnregisterObserver(Aspect aspect, Action observer){observers[aspect].Remove(observer);Console.WriteLine($"Observer unregistered from aspect: {aspect}");}// 通知观察者public void NotifyObservers(Aspect aspect, string message){if (observers[aspect].Count > 0){Console.WriteLine($"Notifying observers for aspect: {aspect}");foreach (var observer in observers[aspect]){observer(message);}}else{Console.WriteLine($"No observers registered for aspect: {aspect}");}}// 模拟目标对象状态变化public void ChangeState(){Console.WriteLine("Subject state changed.");NotifyObservers(Aspect.StateChange, "State has changed.");}public void UpdateData(){Console.WriteLine("Subject data updated.");NotifyObservers(Aspect.DataUpdate, "Data has been updated.");}public void ErrorOccurred(){Console.WriteLine("Error occurred in the subject.");NotifyObservers(Aspect.ErrorOccurred, "An error has occurred.");} }
-
观察者类 Observer
- 包含多个回调方法,分别对应不同的方面。
- 观察者可以根据需要注册对特定方面的兴趣。
// 观察者类 public class Observer {private string name;public Observer(string name){this.name = name;}public void OnStateChange(string message){Console.WriteLine($"{name} received state change notification: {message}");}public void OnDataUpdate(string message){Console.WriteLine($"{name} received data update notification: {message}");}public void OnError(string message){Console.WriteLine($"{name} received error notification: {message}");} }
-
Client
- 创建目标对象和观察者。
- 观察者显式注册对特定方面的兴趣。
- 模拟目标对象的状态变化,观察通知机制的运行。
// 测试程序 public class Program {public static void Main(){// 创建目标对象Subject subject = new Subject();// 创建观察者Observer observer1 = new Observer("Observer1");Observer observer2 = new Observer("Observer2");// 观察者1注册对所有方面的兴趣subject.RegisterObserver(Aspect.StateChange, observer1.OnStateChange);subject.RegisterObserver(Aspect.DataUpdate, observer1.OnDataUpdate);subject.RegisterObserver(Aspect.ErrorOccurred, observer1.OnError);// 观察者2仅注册对错误方面的兴趣subject.RegisterObserver(Aspect.ErrorOccurred, observer2.OnError);// 模拟目标对象状态变化subject.ChangeState();subject.UpdateData();subject.ErrorOccurred();// 观察者2取消对错误方面的兴趣subject.UnregisterObserver(Aspect.ErrorOccurred, observer2.OnError);// 再次触发错误事件,观察者2不再接收通知subject.ErrorOccurred();} }
-
结果
Observer registered for aspect: StateChange Observer registered for aspect: DataUpdate Observer registered for aspect: ErrorOccurred Observer registered for aspect: ErrorOccurred Subject state changed. Notifying observers for aspect: StateChange Observer1 received state change notification: State has changed. Subject data updated. Notifying observers for aspect: DataUpdate Observer1 received data update notification: Data has been updated. Error occurred in the subject. Notifying observers for aspect: ErrorOccurred Observer1 received error notification: An error has occurred. Observer2 received error notification: An error has occurred. Observer unregistered from aspect: ErrorOccurred Error occurred in the subject. Notifying observers for aspect: ErrorOccurred Observer1 received error notification: An error has occurred.
6.2.2.3 优点
- 提高通知效率:目标对象仅发送观察者关心的事件。
- 降低耦合度:观察者与目标对象之间保持松散耦合。
- 灵活性强:观察者可根据需求动态调整关注的事件类型。
6.2.2.4 适用场景
这种方法通过事件驱动机制实现了高效的通信,适用于需要灵活配置和优化性能的场景,例如:
- 多用户系统中对不同事件的关注。
- 复杂系统的状态监控和事件响应。
- 需要动态调整事件监听的应用。
6.3 触发机制
6.3.1 触发之前
在通知观察者之前,目标对象的状态必须是完整且正确的,否则观察者可能会基于错误的状态进行操作,从而引发问题。
6.3.2 更新的触发者
目标对象与观察者通过通知机制保持同步,存在以下两种触发方式:
- 自动触发:目标对象状态变化后自动执行
Notify
。优势在于无需客户手动操作,但可能因连续操作频繁触发更新,影响效率。 - 手动触发:客户在状态变化完成后适时调用
Notify
。优点在于可避免不必要的中间更新,但增加了客户操作负担,且存在因遗漏调用而导致错误的风险。
6.3.3 示例
6.3.3.1 抽象目标对象
observers
:存储观察者的集合。mainState
和secondaryState
:主题的状态信息。Attach
和Detach
:用于添加和移除观察者。Notify
:抽象方法,由具体主题类实现,用于通知观察者。PrepareUpdate
:用于准备状态更新,但不触发通知。ShowState
:用于打印当前状态。
#region abstract Subject// 抽象目标对象
abstract class Subject
{protected HashSet<IObserver> observers = new HashSet<IObserver>();protected int mainState;protected int secondaryState;public void Attach(IObserver observer) => observers.Add(observer);public void Detach(IObserver observer) => observers.Remove(observer);public abstract void Notify();protected void PrepareUpdate(int main, int secondary){mainState = main;secondaryState = secondary;}public void ShowState(){Console.WriteLine($"State: [{mainState}, {secondaryState}]");}
}#endregion
6.3.3.2 观察者接口
#region IObserver// 观察者接口
interface IObserver
{void Update();
}#endregion
6.3.3.3 具体的的观察者
#region ConcreteObservers// 具体观察者
class StateObserver : IObserver
{private readonly Subject subject;public StateObserver(Subject subject){this.subject = subject;}public void Update(){Console.Write("Observer received update: ");subject.ShowState();}
}#endregion
6.3.3.4 自动触发的目标对象
状态发生改变时,自动触发通知
#region AutoTriggerSubject// 自动触发实现
class AutoTriggerSubject : Subject
{public void SetMainState(int value){mainState = value;Notify(); // 自动触发}public override void Notify(){Console.WriteLine("[AutoTrigger] Notifying observers...");foreach (var observer in observers){observer.Update();}}
}#endregion
6.3.3.5 手动触发的目标对象
#region ManualTriggerSubject// 手动触发实现
class ManualTriggerSubject : Subject
{public void CompleteUpdate(int main, int secondary){PrepareUpdate(main, secondary);// 不自动触发}public override void Notify(){Console.WriteLine("[ManualTrigger] Notifying observers...");foreach (var observer in observers){observer.Update();}}
}
#endregion
6.3.3.6 使用示例
#region Client Code// 自动触发演示
Console.WriteLine("=== Automatic Trigger Demo ===");
var autoSubject = new AutoTriggerSubject();
var obs1 = new StateObserver(autoSubject);
autoSubject.Attach(obs1);autoSubject.SetMainState(10); // 触发通知
autoSubject.SetMainState(20); // 再次触发// 手动触发演示
Console.WriteLine("\n=== Manual Trigger Demo ===");
var manualSubject = new ManualTriggerSubject();
var obs2 = new StateObserver(manualSubject);
manualSubject.Attach(obs2);manualSubject.CompleteUpdate(1, 100);
manualSubject.CompleteUpdate(2, 200);
manualSubject.CompleteUpdate(3, 300);
manualSubject.Notify(); // 单次触发#endregion
结果
=== Automatic Trigger Demo ===
[AutoTrigger] Notifying observers...
Observer received update: State: [10, 0]
[AutoTrigger] Notifying observers...
Observer received update: State: [20, 0]=== Manual Trigger Demo ===
[ManualTrigger] Notifying observers...
Observer received update: State: [3, 300]
6.4 信息传递机制
观察者模式中目标将这些信息作为Update操作的一个参数传递出去。这些信息的量可能很小,也可能很大。其信息量传递大小的两个极端便是:推模型(Push Model)和拉模型(Pull Model)。
推模型:一种主动推送信息的机制,主题对象在状态发生变化时,主动将包含具体变更信息的参数推送给所有观察者,观察者通过更新方法来接收这些信息。这种方式传递的是结构化数据,适用于需要接收完整、结构化数据且数据格式相对稳定的场景。
拉模型:一种按需获取信息的机制,主题对象在状态发生变化时,仅发送一个简单的通知给观察者,观察者需要主动调用主题对象的查询接口来获取所需的信息。这种方式更加灵活,适用于观察者需要不同数据子集或数据格式可能频繁变化的场景。
6.4.1 解决方案与实现方式
6.4.1.1 推模型(Push Model)
-
实现原理
目标对象主动推送包含具体变更信息的参数至观察者的更新方法,采用事件驱动机制传递结构化数据。
-
典型实现示例(气象监测系统)
-
IObserver 接口
#region IObserver 接口 // 观察者接口 public interface IObserver {void Update(WeatherData data); } #endregion
-
具体观察者 :Display 类
#region 具体观察者 :Display 类 // 具体观察者 public class Display : IObserver {public void Update(WeatherData data){Console.WriteLine($"当前温度:{data.Temperature}℃");} } #endregion
-
Subject 类: WeatherStation 类
SetMeasurements(float temp)
:设置新的温度数据,并触发通知。NotifyObservers(WeatherData data)
:遍历观察者列表,调用每个观察者的Update
方法。
#region Subject 类: WeatherStation 类// 目标对象 public class WeatherStation {private List<IObserver> _observers = new List<IObserver>();private float _temperature;public void SetMeasurements(float temp){_temperature = temp;NotifyObservers(new WeatherData(temp));}private void NotifyObservers(WeatherData data){foreach (var observer in _observers){observer.Update(data);}}public void Attach(IObserver observer) => _observers.Add(observer);public void Detach(IObserver observer) => _observers.Remove(observer); }// 数据传输对象 public class WeatherData {public float Temperature { get; }public WeatherData(float temperature){Temperature = temperature;} }#endregion
-
Client
#region Client Code// 创建一个天气站对象 WeatherStation weatherStation = new WeatherStation();// 创建两个观察者对象 Display display1 = new Display(); Display display2 = new Display();// 将观察者添加到天气站的观察者列表 weatherStation.Attach(display1); weatherStation.Attach(display2);// 模拟天气站更新数据 Console.WriteLine("天气站更新温度为 25.5℃:"); weatherStation.SetMeasurements(25.5f);// 移除一个观察者 weatherStation.Detach(display1);// 再次更新数据 Console.WriteLine("\n天气站更新温度为 28.0℃:"); weatherStation.SetMeasurements(28.0f);Console.ReadLine();#endregion
结果
天气站更新温度为 25.5℃: 当前温度:25.5℃ 当前温度:25.5℃天气站更新温度为 28.0℃: 当前温度:28℃
-
-
适用场景
- 观察者需要接收完整、结构化的数据
- 数据格式相对稳定且预先可知
- 需要最小化观察者的查询操作
- 实时性要求高于通信成本
6.4.1.2 拉模型(Pull Model)
-
实现原理
目标对象仅发送简单通知,观察者主动调用目标对象的查询接口获取所需信息,实现按需获取
-
典型实现示例
假设一个股票市场监控系统,目标对象是股票市场,观察者是不同的投资者。当股票价格发生变化时,股票市场仅通知投资者价格发生了变化,而投资者需要主动查询当前的股票价格。
-
目标和观察者接口
#region Interface Code// 观察者接口 public interface IObserver {void Update(); }// 目标接口 public interface ISubject {void Attach(IObserver observer);void Detach(IObserver observer);void Notify(); }#endregion
-
具体目标
#region Concrete Subject : StockMarket// 具体目标类 public class StockMarket : ISubject {private List<IObserver> observers = new List<IObserver>();private decimal stockPrice;public decimal StockPrice{get { return stockPrice; }set{stockPrice = value;Notify();}}public void Attach(IObserver observer){observers.Add(observer);}public void Detach(IObserver observer){observers.Remove(observer);}public void Notify(){foreach (var observer in observers){observer.Update();}} }#endregion
-
具体观察者
通过
Update
方法接收通知,并从目标对象中获取当前股票价格。#region Concrete Observer : Investor // 具体观察者类 public class Investor : IObserver {private string name;private ISubject stockMarket;public Investor(string name, ISubject stockMarket){this.name = name;this.stockMarket = stockMarket;}public void Update(){decimal currentPrice = ((StockMarket)stockMarket).StockPrice;Console.WriteLine($"{name} received notification. Current stock price: {currentPrice:C}");} }#endregion
-
使用示例
#region Client Code// 创建股票市场对象 StockMarket stockMarket = new StockMarket();// 创建投资者 Investor investor1 = new Investor("Alice", stockMarket); Investor investor2 = new Investor("Bob", stockMarket);// 将投资者添加到股票市场的观察者列表 stockMarket.Attach(investor1); stockMarket.Attach(investor2);// 模拟股票价格变化 Console.WriteLine("Stock price changes to $100."); stockMarket.StockPrice = 100;Console.WriteLine("Stock price changes to $120."); stockMarket.StockPrice = 120;// 移除一个投资者 stockMarket.Detach(investor1);Console.WriteLine("Stock price changes to $150."); stockMarket.StockPrice = 150;Console.ReadLine(); #endregion
结果
Stock price changes to $100. Alice received notification. Current stock price: ¥100.00 Bob received notification. Current stock price: ¥100.00 Stock price changes to $120. Alice received notification. Current stock price: ¥120.00 Bob received notification. Current stock price: ¥120.00 Stock price changes to $150. Bob received notification. Current stock price: ¥150.00
-
-
适用场景
- 适用于观察者需要不同数据子集的情况。
- 数据格式可能频繁变化,观察者可以根据需要动态获取数据。
- 通信成本不是主要瓶颈,因为观察者主动查询数据。
6.4.1.3 设计对比与权衡
维度 | 推模型 | 拉模型 |
---|---|---|
耦合度 | 高(需预知观察者需求) | 低(观察者自主决定获取内容) |
通信效率 | 高(单次传输完整数据) | 低(需多次请求-响应) |
接口稳定性 | 要求高(参数结构需固定) | 要求低(仅需保持查询接口) |
可扩展性 | 较差(新增观察者类型需修改接口) | 较好(新观察者可自主获取数据) |
通过理解两种模型的本质特征,可以根据具体业务需求、系统约束和演进方向,制定出最适合当前上下文的信息传递策略。
在分布式系统和微服务架构中,这种设计权衡往往直接影响系统的可维护性和扩展能力。
6.4.1.4 最佳实践建议
-
混合模式设计
结合两种模型的优势,推送基础变更通知,允许观察者拉取补充信息。
-
观察者接口
#region Interface Codepublic interface IObserver {void Update(int state); // 接收基础状态 }#endregion
-
具体目标
#region Subject public class Subject {private List<IObserver> observers = new List<IObserver>();private int state; // 被观察者的状态public int State{get { return state; }set{state = value;NotifyObservers(); // 状态改变时通知观察者}}// 注册观察者public void Attach(IObserver observer){observers.Add(observer);}// 移除观察者public void Detach(IObserver observer){observers.Remove(observer);}// 通知观察者public void NotifyObservers(){foreach (IObserver observer in observers){observer.Update(state); // 推送基础状态}} }#endregion
-
具体观察者:ConcreteObserver
#region ConcreteObserverpublic class ConcreteObserver : IObserver {private Subject subject; // 持有被观察者的引用public ConcreteObserver(Subject subject){this.subject = subject;subject.Attach(this); // 注册到被观察者}public void Update(int state){Console.WriteLine($"Received base state: {state}");// 根据需要拉取补充信息if (state > 10){int additionalInfo = subject.State; // 拉取补充信息Console.WriteLine($"Additional info: {additionalInfo}");}} }#endregion
-
使用示例:Client
#region Client CodeSubject subject = new Subject();// 创建观察者并注册到被观察者 ConcreteObserver observer1 = new ConcreteObserver(subject); ConcreteObserver observer2 = new ConcreteObserver(subject);// 改变被观察者的状态 Console.WriteLine("Setting state to 5:"); subject.State = 5; // 输出基础状态Console.WriteLine("\nSetting state to 15:"); subject.State = 15; // 输出基础状态和补充信息#endregion
结果
Setting state to 5: Received base state: 5 Received base state: 5Setting state to 15: Received base state: 15 Additional info: 15 Received base state: 15 Additional info: 15
-
-
通信优化策略
实现思路
- 批量处理拉取请求:通过在
Subject
中维护一个队列,将观察者的拉取请求批量处理。 - 建立数据缓存机制:在
Subject
中缓存状态信息,避免重复计算或重复拉取。 - 使用差异更新(delta update):仅推送状态变化的部分,而不是完整状态。
-
观察者接口 IObserver
#region Interface Codepublic interface IObserver {void Update(int state); // 接收基础状态 }#endregion
-
目标对象 Subject
#region Subjectpublic class Subject {private List<IObserver> observers = new List<IObserver>();private int state; // 被观察者的状态private int previousState; // 上一次的状态,用于差异更新public int State{get { return state; }set{if (state != value) // 检查状态是否变化{previousState = state; // 保存上一次状态state = value;NotifyObservers(); // 状态改变时通知观察者}}}// 注册观察者public void Attach(IObserver observer){observers.Add(observer);}// 移除观察者public void Detach(IObserver observer){observers.Remove(observer);}// 通知观察者public void NotifyObservers(){foreach (IObserver observer in observers){observer.Update(state); // 推送基础状态}}// 提供拉取补充信息的接口public int GetAdditionalInfo(){// 模拟缓存机制:如果状态未变,直接返回缓存值if (state == previousState){Console.WriteLine("Using cached additional info.");return previousState;}// 模拟拉取补充信息Console.WriteLine("Fetching additional info...");return state;} }#endregion
-
具体观察者:ConcreteObserver
#region ConcreteObserverpublic class ConcreteObserver : IObserver {private Subject subject; // 持有被观察者的引用public ConcreteObserver(Subject subject){this.subject = subject;subject.Attach(this); // 注册到被观察者}public void Update(int state){Console.WriteLine($"Received base state: {state}");// 根据需要拉取补充信息if (state > 10){int additionalInfo = subject.GetAdditionalInfo(); // 拉取补充信息Console.WriteLine($"Additional info: {additionalInfo}");}} }#endregion
-
使用示例 Client
#region Client CodeSubject subject = new Subject();// 创建观察者并注册到被观察者 ConcreteObserver observer1 = new ConcreteObserver(subject); ConcreteObserver observer2 = new ConcreteObserver(subject);// 改变被观察者的状态 Console.WriteLine("Setting state to 5:"); subject.State = 5; // 输出基础状态Console.WriteLine("\nSetting state to 15:"); subject.State = 15; // 输出基础状态和补充信息Console.WriteLine("\nSetting state to 15 again (cached info will be used):"); subject.State = 15; // 使用缓存信息 #endregion
结果
Setting state to 5: Received base state: 5 Received base state: 5Setting state to 15: Received base state: 15 Fetching additional info... Additional info: 15 Received base state: 15 Fetching additional info... Additional info: 15Setting state to 15 again (cached info will be used):
- 批量处理拉取请求:通过在
6.5 资源管理和错误处理
避免已删除目标的悬挂引用
删除目标时,需确保其观察者中不遗留对该目标的无效引用。
否则,当观察者尝试访问已销毁的目标时,可能会引发错误或异常,导致程序崩溃或行为不可预测。
为了避免这种情况,可以在目标对象被销毁之前,主动通知所有观察者解除对其的订阅。
示例
-
目标接口和观察者接口
#region Interface// 定义观察者接口 public interface IObserver {void Update(string message);void Unsubscribe(); }// 定义目标接口 public interface ISubject {void Attach(IObserver observer);void Detach(IObserver observer);void Notify(string message); }#endregion
-
具体目标类 Subject
在目标对象被销毁之前,通知所有观察者解除订阅
#region 定义具体目标类 : Subject// 定义具体目标(主题)类 public class Subject : ISubject {private List _observers = new List();public void Attach(IObserver observer){_observers.Add(observer);}public void Detach(IObserver observer){_observers.Remove(observer);}public void Notify(string message){foreach (var observer in _observers){observer.Update(message);}}// 在目标对象被销毁之前,通知所有观察者解除订阅public void Dispose(){foreach (var observer in _observers){observer.Unsubscribe();}_observers.Clear();} }#endregion
-
具体观察者类 ConcreteObserver
#region 具体观察者类 : ConcreteObserver // 定义具体观察者类 public class ConcreteObserver : IObserver {private string _name;private Subject _subject;public ConcreteObserver(string name, Subject subject){_name = name;_subject = subject;}public void Update(string message){Console.WriteLine($"{_name} received message: {message}");}public void Unsubscribe(){if (_subject != null){_subject.Detach(this);_subject = null;}} } #endregion
-
使用实例 Client Code
在销毁目标对象之前,通知所有观察者解除订阅.
#region Client CodeSubject subject = new Subject();IObserver observer1 = new ConcreteObserver("observer 1", subject); IObserver observer2 = new ConcreteObserver("observer 2", subject);subject.Attach(observer1); subject.Attach(observer2);subject.Notify("Hello Observers!");// 在销毁目标对象之前,通知所有观察者解除订阅 subject.Dispose();// 尝试再次通知(应该不会有任何效果,因为观察者已被移除) subject.Notify("This should not be received.");#endregion
结果
Observer 1 received message: Hello Observers! Observer 2 received message: Hello Observers!