95 lines
3.1 KiB
Python
95 lines
3.1 KiB
Python
"""Redis 缓存服务层。
|
||
|
||
提供统一的缓存读写接口,供各 API 端点使用:
|
||
- 品牌列表(TTL: 5 分钟)
|
||
- 仪表盘统计数据(TTL: 2 分钟)
|
||
- 用户配置信息(TTL: 10 分钟)
|
||
"""
|
||
import json
|
||
import logging
|
||
|
||
from app.core.redis import get_redis
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# TTL 常量(秒)
|
||
TTL_BRANDS = 300 # 5 分钟
|
||
TTL_DASHBOARD = 120 # 2 分钟
|
||
TTL_USER_PROFILE = 600 # 10 分钟
|
||
|
||
|
||
class CacheService:
|
||
"""异步 Redis 缓存服务。使用全局统一 Redis 连接池。"""
|
||
|
||
@property
|
||
def redis(self):
|
||
"""获取全局 Redis 连接(懒加载)。"""
|
||
from app.core.redis import _redis
|
||
return _redis
|
||
|
||
async def get(self, key: str) -> str | None:
|
||
"""从缓存读取字符串值,不存在或出错时返回 None。"""
|
||
try:
|
||
redis = await get_redis()
|
||
return await redis.get(key)
|
||
except Exception as exc:
|
||
logger.warning("Cache GET failed for key=%s: %s", key, exc)
|
||
return None
|
||
|
||
async def get_json(self, key: str) -> dict | list | str | int | float | bool | None:
|
||
"""从缓存读取并反序列化 JSON 值。"""
|
||
raw = await self.get(key)
|
||
if raw is None:
|
||
return None
|
||
try:
|
||
return json.loads(raw)
|
||
except json.JSONDecodeError:
|
||
return None
|
||
|
||
async def set(self, key: str, value: str, expire: int = 300) -> None:
|
||
"""写入缓存字符串值,expire 单位为秒。"""
|
||
try:
|
||
redis = await get_redis()
|
||
await redis.set(key, value, ex=expire)
|
||
except Exception as exc:
|
||
logger.warning("Cache SET failed for key=%s: %s", key, exc)
|
||
|
||
async def set_json(self, key: str, value: dict | list | str | int | float | bool, expire: int = 300) -> None:
|
||
"""序列化为 JSON 后写入缓存。"""
|
||
try:
|
||
await self.set(key, json.dumps(value, default=str), expire=expire)
|
||
except Exception as exc:
|
||
logger.warning("Cache SET_JSON failed for key=%s: %s", key, exc)
|
||
|
||
async def delete(self, key: str) -> None:
|
||
"""删除指定缓存键。"""
|
||
try:
|
||
redis = await get_redis()
|
||
await redis.delete(key)
|
||
except Exception as exc:
|
||
logger.warning("Cache DELETE failed for key=%s: %s", key, exc)
|
||
|
||
async def invalidate_pattern(self, pattern: str) -> int:
|
||
"""批量删除匹配 pattern 的所有缓存键,返回删除数量。"""
|
||
try:
|
||
redis = await get_redis()
|
||
keys = await redis.keys(pattern)
|
||
if keys:
|
||
return await redis.delete(*keys)
|
||
return 0
|
||
except Exception as exc:
|
||
logger.warning("Cache INVALIDATE_PATTERN failed for pattern=%s: %s", pattern, exc)
|
||
return 0
|
||
|
||
|
||
# 模块级单例
|
||
_cache_service: CacheService | None = None
|
||
|
||
|
||
def get_cache_service() -> CacheService:
|
||
"""获取全局缓存服务单例。"""
|
||
global _cache_service
|
||
if _cache_service is None:
|
||
_cache_service = CacheService()
|
||
return _cache_service
|