这个类会在后台自动更新缓存数据,你只需要调用方法来获取数据即可。
自动更新缓存类
以下是 AutoUpdatingCache
类的实现:
import threading
import timeclass AutoUpdatingCache:def __init__(self, update_function, expiry_time=60):"""初始化缓存类。:param update_function: 一个函数,用于生成或更新缓存数据。:param expiry_time: 缓存的更新周期(秒)。"""self.update_function = update_functionself.expiry_time = expiry_timeself.cache_data = Noneself.last_updated = 0self.lock = threading.Lock()self._start_background_update()def _start_background_update(self):# 启动后台线程更新缓存self.update_thread = threading.Thread(target=self._update_cache_periodically)self.update_thread.daemon = Trueself.update_thread.start()def _update_cache_periodically(self):while True:current_time = time.time()if current_time - self.last_updated >= self.expiry_time:self._update_cache()time.sleep(1) # 每秒检查一次def _update_cache(self):with self.lock:try:print("Updating cache...")new_data = self.update_function()self.cache_data = new_dataself.last_updated = time.time()print("Cache updated!")except Exception as e:print(f"Error updating cache: {e}")def get_data(self):with self.lock:if self.cache_data is not None:return self.cache_dataelse:return "Cache is initializing, please try again later."
使用说明
-
定义一个数据生成函数
首先,需要定义一个用于生成或更新缓存数据的函数。这个函数可以是任何耗时的操作,例如从数据库查询、计算复杂结果等。
import timedef generate_cache_data():# 模拟耗时操作time.sleep(5)return {"value": "fresh data", "timestamp": time.time()}
-
创建缓存类的实例
将数据生成函数传递给
AutoUpdatingCache
类,并设置缓存更新周期。cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)
-
获取缓存数据
在需要的地方调用
get_data()
方法即可获取缓存数据。data = cache.get_data() print(data)
完整示例
将以上步骤组合起来:
import threading
import timeclass AutoUpdatingCache:def __init__(self, update_function, expiry_time=60):self.update_function = update_functionself.expiry_time = expiry_timeself.cache_data = Noneself.last_updated = 0self.lock = threading.Lock()self._start_background_update()def _start_background_update(self):self.update_thread = threading.Thread(target=self._update_cache_periodically)self.update_thread.daemon = Trueself.update_thread.start()def _update_cache_periodically(self):while True:current_time = time.time()if current_time - self.last_updated >= self.expiry_time:self._update_cache()time.sleep(1)def _update_cache(self):with self.lock:try:print("Updating cache...")new_data = self.update_function()self.cache_data = new_dataself.last_updated = time.time()print("Cache updated!")except Exception as e:print(f"Error updating cache: {e}")def get_data(self):with self.lock:if self.cache_data is not None:return self.cache_dataelse:return "Cache is initializing, please try again later."# 数据生成函数
def generate_cache_data():time.sleep(5) # 模拟耗时操作return {"value": "fresh data", "timestamp": time.time()}# 创建缓存实例
cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)# 模拟获取数据
for _ in range(10):data = cache.get_data()print(data)time.sleep(10)
代码解释
-
AutoUpdatingCache 类
- init 方法:
- 初始化缓存,设置数据生成函数和缓存更新周期。
- 启动后台线程
_update_cache_periodically
。
- _update_cache_periodically 方法:
- 无限循环,每隔一秒检查缓存是否需要更新。
- 如果当前时间距离上次更新时间超过了
expiry_time
,则调用_update_cache
。
- _update_cache 方法:
- 使用
update_function
更新缓存数据。 - 使用锁机制
threading.Lock
确保线程安全。
- 使用
- get_data 方法:
- 获取缓存数据。
- 如果缓存数据为空(初始化中),返回提示信息。
- init 方法:
-
数据生成函数
generate_cache_data
函数模拟一个耗时操作,生成新的缓存数据。
-
使用示例
- 创建缓存实例并在循环中每隔 10 秒获取一次数据,观察缓存的更新情况。
注意事项
-
线程安全:
- 使用
threading.Lock
确保在多线程环境下数据访问的安全性。
- 使用
-
异常处理:
- 在更新缓存时,捕获可能的异常,防止线程崩溃。
-
后台线程:
- 将线程设置为守护线程(
daemon=True
),使得主程序退出时,线程自动结束。
- 将线程设置为守护线程(
应用场景
你可以将这个缓存类应用在 Web 应用程序中,例如在 Sanic 的路由中:
from sanic import Sanic
from sanic.response import jsonapp = Sanic("CacheApp")@app.route("/data")
async def get_cached_data(request):data = cache.get_data()return json({"data": data})if __name__ == "__main__":# 确保缓存在应用启动前初始化cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)app.run(host="0.0.0.0", port=8000)
这样,用户在访问 /data
路由时,总是能得到缓存中的数据,而缓存会在后台自动更新,不会因为更新缓存而导致请求超时。
😊