Python异步Redis客户端与通用缓存装饰器

前言

这里我将通过 redis-py 简易封装一个异步的Redis客户端,然后主要讲解设计一个支持各种缓存代理(本地内存、Redis等)的缓存装饰器,用于在减少一些不必要的计算、存储层的查询、网络IO等。

具体代码都封装在 HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com) 中,以大家便捷使用。

异步redis客户端

首先安装 redis-py 库

pip install redis

Redis 之前是不支持异步的,后面为了统一异步redis操作与python常用的redis.py 的api接口一致,aioredis的作者已经将 aioredis 加入了redis中维护,安装的版本大于 4.2.0rc1 就行。

  • aioredis:https://github.com/aio-libs-abandoned/aioredis-py
  • redis:https://github.com/redis/redis-py

BaseRedisManager 封装

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { redis连接处理模块 }
# @Date: 2023/05/03 21:13
from datetime import timedelta
from typing import Optional, Unionfrom redis import Redis
from redis import asyncio as aioredisfrom py_tools import constants
from py_tools.decorators.cache import CacheMeta, cache_json, RedisCacheProxy, AsyncRedisCacheProxyclass BaseRedisManager:"""Redis客户端管理器"""client: Union[Redis, aioredis.Redis] = Nonecache_key_prefix = constants.CACHE_KEY_PREFIX@classmethoddef init_redis_client(cls,async_client: bool = False,host: str = "localhost",port: int = 6379,db: int = 0,password: Optional[str] = None,max_connections: Optional[int] = None,**kwargs):"""初始化 Redis 客户端。Args:async_client (bool): 是否使用异步客户端,默认为 False(同步客户端)host (str): Redis 服务器的主机名,默认为 'localhost'port (int): Redis 服务器的端口,默认为 6379db (int): 要连接的数据库编号,默认为 0password (Optional[str]): 密码可选max_connections (Optional[int]): 最大连接数。默认为 None(不限制连接数)**kwargs: 传递给 Redis 客户端的其他参数Returns:None"""if cls.client is None:redis_client_cls = Redisif async_client:redis_client_cls = aioredis.Rediscls.client = redis_client_cls(host=host, port=port, db=db, password=password, max_connections=max_connections, **kwargs)return cls.client@classmethoddef cache_json(cls,ttl: Union[int, timedelta] = 60,key_prefix: str = None,):"""缓存装饰器(仅支持缓存能够json序列化的数据)缓存函数整体结果Args:ttl: 过期时间 默认60skey_prefix: 默认的key前缀, 再未指定key时使用Returns:"""key_prefix = key_prefix or cls.cache_key_prefixif isinstance(ttl, timedelta):ttl = int(ttl.total_seconds())cache_proxy = RedisCacheProxy(cls.client)if isinstance(cls.client, aioredis.Redis):cache_proxy = AsyncRedisCacheProxy(cls.client)return cache_json(cache_proxy=cache_proxy, key_prefix=key_prefix, ttl=ttl)

还是跟之前封装客户端一样的简易封装,由类属性 client 维护真正操作的redis的客户端,通过 init_redis_client 方法进行初始化。这样封装的目的就是在系统中只初始化一份 redis 客户端,操作时可以直接使用类方法。BaseRedisManager 只实现一些通用的 redis 操作(有待挖掘),具体还是需要业务Manager来继承封装业务中操作redis的方法。目前只实现了一个redis的缓存装饰器,其实内部就是组织参数设置redis代理,然后调用另外一个通用的缓存装饰器,这样使用的时候不需要制定缓存代理了。

缓存装饰器

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { 缓存装饰器模块 }
# @Date: 2023/05/03 19:23
import asyncio
import functools
import hashlib
import json
from datetime import timedeltaimport cacheout
from redis import Redis
from redis import asyncio as aioredis
from pydantic import BaseModel, Field
from typing import Union
from py_tools import constantsMEMORY_PROXY = MemoryCacheProxy(cache_client=cacheout.Cache(maxsize=1024))def cache_json(cache_proxy: BaseCacheProxy = MEMORY_PROXY,key_prefix: str = constants.CACHE_KEY_PREFIX,ttl: Union[int, timedelta] = 60,
):"""缓存装饰器(仅支持缓存能够json序列化的数据)Args:cache_proxy: 缓存代理客户端, 默认系统内存ttl: 过期时间 默认60skey_prefix: 默认的key前缀Returns:"""key_prefix = f"{key_prefix}:cache_json"if isinstance(ttl, timedelta):ttl = int(ttl.total_seconds())def _cache(func):def _gen_key(*args, **kwargs):"""生成缓存的key"""# 根据函数信息与参数生成# key => 函数所在模块:函数名:函数位置参数:函数关键字参数 进行hashparam_args_str = ",".join([str(arg) for arg in args])param_kwargs_str = ",".join(sorted([f"{k}:{v}" for k, v in kwargs.items()]))hash_str = f"{func.__module__}:{func.__name__}:{param_args_str}:{param_kwargs_str}"hash_ret = hashlib.sha256(hash_str.encode()).hexdigest()# 根据哈希结果生成key 默认前缀:函数所在模块:函数名:hashhash_key = f"{key_prefix}:{func.__module__}:{func.__name__}:{hash_ret}"return hash_key@functools.wraps(func)def sync_wrapper(*args, **kwargs):"""同步处理"""# 生成缓存的keyhash_key = _gen_key(*args, **kwargs)# 先从缓存获取数据cache_data = cache_proxy.get(hash_key)if cache_data:# 有直接返回print(f"命中缓存: {hash_key}")return json.loads(cache_data)# 没有,执行函数获取结果ret = func(*args, **kwargs)# 缓存结果cache_proxy.set(key=hash_key, value=json.dumps(ret), ttl=ttl)return ret@functools.wraps(func)async def async_wrapper(*args, **kwargs):"""异步处理"""# 生成缓存的keyhash_key = _gen_key(*args, **kwargs)# 先从缓存获取数据cache_data = await cache_proxy.get(hash_key)if cache_data:# 有直接返回return json.loads(cache_data)# 没有,执行函数获取结果ret = await func(*args, **kwargs)# 缓存结果await cache_proxy.set(key=hash_key, value=json.dumps(ret), ttl=ttl)return retreturn async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapperreturn _cache

cache_json 是一个带单参数的缓存装饰器,可以指定一些缓存的代理、缓存key前缀、缓存ttl等。

内部实现了同步、异步函数的缓存处理,关键点其实就是如何构造唯一的缓存key,这里就是根据key前缀与函数的一些签名信息来构造的。

def _gen_key(*args, **kwargs):"""生成缓存的key"""# 没有传递key信息,根据函数信息与参数生成# key => 函数所在模块:函数名:函数位置参数:函数关键字参数 进行hashparam_args_str = ",".join([str(arg) for arg in args])param_kwargs_str = ",".join(sorted([f"{k}:{v}" for k, v in kwargs.items()]))hash_str = f"{func.__module__}:{func.__name__}:{param_args_str}:{param_kwargs_str}"hash_ret = hashlib.sha256(hash_str.encode()).hexdigest()# 根据哈希结果生成key 默认前缀:函数所在模块:函数名:hashhash_key = f"{key_prefix}:{func.__module__}:{func.__name__}:{hash_ret}"return hash_key

函数所在模块:函数名:函数位置参数:函数关键字参数 进行hash,在处理关键字参数的需要排个序,来保证相同的参数,顺序不同但缓存key一致。后面的逻辑就是常见的设置缓存操作。

image.png

缓存代理类

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { 缓存装饰器模块 }
# @Date: 2023/05/03 19:23
import asyncio
import functools
import hashlib
import json
from datetime import timedeltaimport cacheout
from redis import Redis
from redis import asyncio as aioredis
from pydantic import BaseModel, Field
from typing import Union
from py_tools import constantsclass CacheMeta(BaseModel):"""缓存元信息"""key: str = Field(description="缓存的key")ttl: Union[int, timedelta] = Field(description="缓存有效期")cache_client: str = Field(description="缓存的客户端(Redis、Memcached等)")data_type: str = Field(description="缓存的数据类型(str、list、hash、set)")class BaseCacheProxy(object):"""缓存代理基类"""def __init__(self, cache_client):self.cache_client = cache_client  # 具体的缓存客户端,例如Redis、Memcached等def set(self, key: str, value: str, ttl: int):raise NotImplementeddef get(self, key):cache_data = self.cache_client.get(key)return cache_dataclass RedisCacheProxy(BaseCacheProxy):"""同步redis缓存代理"""def __init__(self, cache_client: Redis):super().__init__(cache_client)def set(self, key, value, ttl):self.cache_client.setex(name=key, value=value, time=ttl)class AsyncRedisCacheProxy(BaseCacheProxy):"""异步Redis缓存代理"""def __init__(self, cache_client: aioredis.Redis):super().__init__(cache_client)async def set(self, key, value, ttl):await self.cache_client.setex(name=key, value=value, time=ttl)async def get(self, key):cache_data = await self.cache_client.get(key)return cache_dataclass MemoryCacheProxy(BaseCacheProxy):"""系统内存缓存代理"""def __init__(self, cache_client: cacheout.Cache):super().__init__(cache_client)def set(self, key, value, ttl):self.cache_client.set(key=key, value=value, ttl=ttl)MEMORY_PROXY = MemoryCacheProxy(cache_client=cacheout.Cache(maxsize=1024))

这里设置一个缓存代理抽象类是用于封装屏蔽不同缓存客户端的操作不一致性。统一成如下入口

def set(self, key: str, value: str, ttl: int):raise NotImplementeddef get(self, key):cache_data = self.cache_client.get(key)return cache_data

让具体的缓存客户端重写(实现)这两个方法,以达到缓存装饰器的通用性。目前只实现了同步、异步redis缓存代理以及通过 cacheout 库实现的本地内存缓存代理,后面接入其他的缓存代理(例如Memcached等)就不用动cache_json函数了,只要继承 BaseCacheProxy,实现具体的 set、get 操作即可。

pip install python-memcached
import memcacheclass MemcacheCacheProxy(BaseCacheProxy):def __init__(self, cache_client: memcache.Client):super().__init__(cache_client)def set(self, key, value, ttl):self.cache_client.set(key, value, time=ttl)

由于获取缓存的方法逻辑一致,故而直接复用就行。

测试Demo

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @File: cache.py
# @Desc: { cache demo 模块 }
# @Date: 2024/04/23 11:11
import asyncio
import time
from datetime import timedeltaimport cacheoutfrom py_tools.connections.db.redis_client import BaseRedisManager
from py_tools.decorators.cache import cache_json, MemoryCacheProxy, RedisCacheProxy, AsyncRedisCacheProxyclass RedisManager(BaseRedisManager):client = Noneclass AsyncRedisManager(BaseRedisManager):client = NoneRedisManager.init_redis_client(async_client=False)
AsyncRedisManager.init_redis_client(async_client=True)memory_proxy = MemoryCacheProxy(cache_client=cacheout.Cache())
redis_proxy = RedisCacheProxy(cache_client=RedisManager.client)
aredis_proxy = AsyncRedisCacheProxy(cache_client=AsyncRedisManager.client)@cache_json(key_prefix="demo", ttl=3)
def memory_cache_demo_func(name: str, age: int):return {"test_memory_cache": "hui-test", "name": name, "age": age}@cache_json(cache_proxy=redis_proxy, ttl=10)
def redis_cache_demo_func(name: str, age: int):return {"test_redis_cache": "hui-test", "name": name, "age": age}@cache_json(cache_proxy=aredis_proxy, ttl=timedelta(minutes=1))
async def aredis_cache_demo_func(name: str, age: int):return {"test_async_redis_cache": "hui-test", "name": name, "age": age}@AsyncRedisManager.cache_json(ttl=30)
async def aredis_manager_cache_demo_func(name: str, age: int):return {"test_async_redis_manager_cache": "hui-test", "name": name, "age": age}def memory_cache_demo():print("memory_cache_demo")ret1 = memory_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = memory_cache_demo_func(name="hui", age=18)print("ret2", ret2)print()time.sleep(3)ret3 = memory_cache_demo_func(age=18, name="hui")print("ret3", ret3)print()assert ret1 == ret2 == ret3def redis_cache_demo():print("redis_cache_demo")ret1 = redis_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = redis_cache_demo_func(name="hui", age=18)print("ret2", ret2)assert ret1 == ret2async def aredis_cache_demo():print("aredis_cache_demo")ret1 = await aredis_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = await aredis_cache_demo_func(name="hui", age=18)print("ret2", ret2)assert ret1 == ret2async def aredis_manager_cache_demo():print("aredis_manager_cache_demo")ret1 = await aredis_manager_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = await aredis_manager_cache_demo_func(name="hui", age=18)print("ret2", ret2)assert ret1 == ret2async def main():memory_cache_demo()redis_cache_demo()await aredis_cache_demo()await aredis_manager_cache_demo()if __name__ == '__main__':asyncio.run(main())

输出结果

memory_cache_demo
ret1 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}命中缓存: demo:cache_json:__main__:memory_cache_demo_func:46c6a618a88eb5067a00915c10c97c6c72d5073ecf9b04060433de75b2d21f51
ret2 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}ret3 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}redis_cache_demo
ret1 {'test_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}命中缓存: py-tools:cache_json:__main__:redis_cache_demo_func:a00b13aa2e1e56ad328d1956bc3c3fb8e89b7007453a780e866cc3ccafb51d73
ret2 {'test_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}
aredis_cache_demo
ret1 {'test_async_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}ret2 {'test_async_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}
aredis_manager_cache_demo
ret1 {'test_async_redis_manager_cache': 'hui-test', 'name': 'hui', 'age': 18}ret2 {'test_async_redis_manager_cache': 'hui-test', 'name': 'hui', 'age': 18}

Redis 缓存情况

缓存信息还是挺清晰的就是有点长。由于是从主入口调用的函数,所以 func.__module__ 是 __main__。

这缓存装饰器一般适用于一些参数相同, 结果经常不变的情况下,以及允许短时间内出现数据不一致的场景。如下是一些典型的应用场景

  1. API Token缓存:对于需要使用API Token进行身份验证的API请求,通常API Token具有一定的有效期。在这种情况下,你可以使用缓存装饰器来缓存API Token,以避免在每次请求时重新生成或从后端服务获取。这样可以降低对后端服务的负载,并提高系统的响应速度。
  2. OSS Sign URL缓存:当需要生成签名URL来访问对象存储服务(如AWS S3、阿里云OSS)中的资源时,通常需要对URL进行签名以确保安全性。在这种情况下,你可以使用缓存装饰器来缓存已签名的URL,在一定时间内重复使用相同的签名URL,而不必重新计算签名。这样可以降低对签名计算资源的消耗,并减少重复的签名请求。
  3. 频繁查询的数据缓存:对于一些数据不经常变化但是频繁被查询的情况,比如一些静态配置信息、全局参数等,可以使用缓存装饰器将查询结果缓存起来,减少数据库查询次数,提高系统的响应速度。
  4. 外部API响应结果缓存:当你调用外部API获取数据时,有时这些数据在一段时间内不会发生变化。在这种情况下,你可以使用缓存装饰器来缓存外部API的响应结果,以避免频繁地向外部API发出请求。这不仅可以提高系统的性能,还可以降低对外部服务的依赖性。

总的来说,缓存装饰器可以应用于许多场景,特别是在需要提高性能、减少资源消耗和避免重复请求数据的情况下。通过合理地设置缓存时间,可以权衡数据的新鲜度和系统性能,从而实现更好的用户体验。

源代码

源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。

HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/317887.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蓝桥杯练习系统(算法训练)ALGO-953 混合积

资源限制 内存限制:256.0MB C/C时间限制:1.0s Java时间限制:3.0s Python时间限制:5.0s 问题描述 众所周知,人人都在学习线性代数,既然都学过,那么解决本题应该很方便。   宇宙大战中&…

Python量化炒股的财务因子选股—质量因子选股

Python量化炒股的财务因子选股—质量因子选股 在Python财务因子量化选股中,质量类因子有2个,分别是净资产收益率和总资产净利率。需要注意的是,质量类因子在财务指标数据表indicator中。 净资产收益率(roe)选股 净资…

Linux基础——Linux开发工具(上)_vim

前言:在了解完Linux基本指令和Linux权限后,我们有了足够了能力来学习后面的内容,但是在真正进入Linux之前,我们还得要学会使用Linux中的几个开发工具。而我们主要介绍的是以下几个: yum, vim, gcc / g, gdb, make / ma…

Spark核心名词解释与编程

Spark核心概念 名词解释 1)ClusterManager:在Standalone(上述安装的模式,也就是依托于spark集群本身)模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器ResourceManager(国内…

YOLOv9/YOLOv8算法改进【NO.128】 使用ICCV2023超轻量级且高效的动态上采样器( DySample)改进yolov8中的上采样

前 言 YOLO算法改进系列出到这,很多朋友问改进如何选择是最佳的,下面我就根据个人多年的写作发文章以及指导发文章的经验来看,按照优先顺序进行排序讲解YOLO算法改进方法的顺序选择。具体有需求的同学可以私信我沟通: 首推…

如何远程访问连接管理器?

远程访问连接管理器是一种方便的工具,可以实现远程访问计算机和网络设备的功能。它使用户能够从任何地点连接到远程计算机,并进行文件传输、桌面共享和远程控制等操作。远程访问连接管理器不仅提供了便利性,还能提高工作效率,并为…

机器人正反向运动学(FK和IK)

绕第一个顶点可以沿Z轴转动,角度用alpha表示 绕第二个点沿X轴转动,角度为Beta 第三个点沿X轴转动,记作gama 这三个点构成姿态(pose) 我们记第一个点为P0,画出它的本地坐标系,和世界坐标系一样红…

AI智能名片商城小程序:引领企业迈向第三增长极

随着数字化浪潮的席卷,私域流量的重要性逐渐凸显,为企业增长提供了全新的动力。在这一背景下,AI智能名片商城系统崭露头角,以其独特的优势,引领企业迈向第三增长极。 私域流量的兴起,为企业打开了一扇新的销…

知乎广告开户流程,知乎广告的优势是什么?

社交媒体平台不仅是用户获取知识、分享见解的场所,更是品牌展示、产品推广的重要舞台。知乎作为国内知名的知识分享社区,以其高质量的内容生态和庞大的用户基础,成为了众多企业进行广告投放的优选之地。云衔科技通过其专业服务,助…

LabVIEW飞机机电系统综合测试平台

LabVIEW飞机机电系统综合测试平台 在现代航空领域,机电系统的准确性与可靠性对飞行安全至关重要。针对飞机机电管理计算机(UMC)复杂度增加、测试覆盖率低、效率不高等问题,开发了一套基于LabVIEW的机电系统综合测试平台。平台通过…

go设计模式之抽象工厂模式

抽象工厂模式 提供一个创建一系列相关或相互依赖对象的接口,而无需指定它们具体的类。 工厂方法模式通过引入工厂等级结构,解决了简单工厂模式中工厂类职责太重的问题,但由于工厂方法模式中的每个工厂只生产一类产品,可能会导致…

06_电子设计教程基础篇(学习视频推荐)

文章目录 前言一、基础视频1、电路原理3、模电4、高频电子线路5、电力电子技术6、数学物理方法7、电磁场与电磁波8、信号系统9、自动控制原理10、通信原理11、单片机原理 二、科普视频1、工科男孙老师2、达尔闻3、爱上半导体4、华秋商城5、JT硬件乐趣6、洋桃电子 三、教学视频1…

24.4.28(板刷dp,拓扑判环,区间dp+容斥算回文串总数)

星期一: 昨晚cf又掉分,小掉不算掉 补ABC350 D atc传送门 思路:对每个连通块,使其成为一个完全图,完全图的边数为 n*(n-1)/2 , 答案加上每个连通块成为完全图后的…

VS2022 配置OpenCV开发环境详细教程

OpenCV OpenCV(Open Source Computer Vision Library)是一个开源的计算机视觉和机器学习软件库,由Intel开发并首先发布于1999年。OpenCV被广泛用于实时图像处理、视频分析、物体检测、面部识别、机器人视觉以及许多其他领域。它支持C、Pytho…

远距离、高品质、低延迟、高保真——SA316无线音频模块带您探索新的音频体验

SA316系列产品分为发射端模块SA316S-TX,SA316F30和接收端模块SA316-RX,该系列方案采用了无线高品质的语音传输芯片来设计,它可以支持外部 PCM / IIS 双模数字音频接口,同时模块为客户提供了标准化的串行接口,使用者可通过串口指令…

Linux-进程调度器

1. 前言 在计算机中,进程的数量远多于cpu的数量,所以就存在,多个进程抢占一个cpu的情况,所以就需要一套规则,决定这些进程被处理的顺序,这就叫做进程调度。 在我的简单理解下,其实就是把进程放…

Docker 安装部署 postgres

Docker 安装部署 postgres 1、拉取 postgres 镜像文件 [rootiZbp19a67kznq0h0rgosuxZ ~]# docker pull postgres:latest latest: Pulling from library/postgres b0a0cf830b12: Pull complete dda3d8fbd5ed: Pull complete 283a477db7bb: Pull complete 91d2729fa4d5: Pul…

【docker】Docker开启远程访问

将构建的镜像自动上传到服务器。 需要开放 Docker 的端口,让我们在本地能连接上服务器的 Docker,这样,才能上传构建的镜像给 Docker。 开启远程访问 首先在服务器打开 Docker 的服务文件 vim /usr/lib/systemd/system/docker.service修改…

android studio项目实战——备忘录(附源码)

成果展示&#xff1a; 1.前期准备 &#xff08;1&#xff09;在配置文件中添加权限及启动页面顺序 ①展开工程&#xff0c;打开app下方的AndroidManifest.xml,添加权限&#xff0c;如下&#xff1a; <uses-permission android:name"android.permission.CAMERA"…

OpenHarmony开源软件供应链安全风险

慕冬亮&#xff0c;华中科技大学网络空间安全学院副教授&#xff0c;武汉英才&#xff0c;华中科技大学OpenHarmony技术俱乐部、开放原子开源社团指导教师。研究方向为软件与系统安全&#xff0c;在国际安全会议上发表十余篇论文&#xff0c;并获得ACM CCS 2018杰出论文奖。创立…