# library/storages/base.py
"""
Базовые классы хранилищ.
"""
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, List, Optional, Dict, Any
from pydantic import BaseModel
import logging
from library.primitives.exceptions import StorageError, NotFoundError
T = TypeVar("T", bound=BaseModel)
class BaseRepository(ABC, Generic[T]):
"""
Базовый репозиторий (Repository Pattern).
CRUD операции для сущности.
Использование:
class ProductRepo(BaseRepository[Product]):
async def get(self, id: str) -> Optional[Product]:
...
repo = ProductRepo(connection)
product = await repo.get("123")
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
self.config = config or {}
self.logger = logging.getLogger(self.__class__.__name__)
@abstractmethod
async def get(self, id: str) -> Optional[T]:
"""
Получить по ID.
Args:
id: Идентификатор
Returns:
Сущность или None
"""
pass
async def get_or_raise(self, id: str) -> T:
"""
Получить по ID или исключение.
Args:
id: Идентификатор
Returns:
Сущность
Raises:
NotFoundError: Если не найдено
"""
result = await self.get(id)
if result is None:
raise NotFoundError(
f"{self.__class__.__name__}: not found",
details={"id": id}
)
return result
@abstractmethod
async def get_many(self, ids: List[str]) -> List[T]:
"""
Получить несколько по ID.
Args:
ids: Список идентификаторов
Returns:
Список сущностей (может быть меньше чем ids)
"""
pass
@abstractmethod
async def find(
self,
filters: Dict[str, Any],
limit: int = 100,
offset: int = 0,
) -> List[T]:
"""
Найти по фильтрам.
Args:
filters: Фильтры {field: value}
limit: Лимит результатов
offset: Смещение
Returns:
Список сущностей
"""
pass
@abstractmethod
async def save(self, entity: T) -> T:
"""
Сохранить (insert или update).
Args:
entity: Сущность
Returns:
Сохранённая сущность (с ID)
"""
pass
@abstractmethod
async def save_many(self, entities: List[T]) -> List[T]:
"""
Сохранить несколько.
Args:
entities: Список сущностей
Returns:
Сохранённые сущности
"""
pass
@abstractmethod
async def delete(self, id: str) -> bool:
"""
Удалить по ID.
Args:
id: Идентификатор
Returns:
True если удалено
"""
pass
async def exists(self, id: str) -> bool:
"""
Проверить существование.
Args:
id: Идентификатор
Returns:
True если существует
"""
return await self.get(id) is not None
async def count(self, filters: Optional[Dict[str, Any]] = None) -> int:
"""
Подсчёт записей.
Args:
filters: Фильтры (опционально)
Returns:
Количество записей
"""
# Default: получить все и посчитать
# Переопределить для эффективности
results = await self.find(filters or {}, limit=100000)
return len(results)
async def upsert(self, entity: T) -> T:
"""
Insert или Update по ключу.
Args:
entity: Сущность
Returns:
Сохранённая сущность
"""
return await self.save(entity)
class BaseCache(ABC):
"""
Базовый класс кэша.
Простой key-value storage с TTL.
"""
@abstractmethod
async def get(self, key: str) -> Optional[Any]:
"""Получить значение."""
pass
@abstractmethod
async def set(
self,
key: str,
value: Any,
ttl: Optional[int] = None,
) -> bool:
"""
Установить значение.
Args:
key: Ключ
value: Значение
ttl: Время жизни в секундах
Returns:
True если успешно
"""
pass
@abstractmethod
async def delete(self, key: str) -> bool:
"""Удалить значение."""
pass
@abstractmethod
async def exists(self, key: str) -> bool:
"""Проверить существование."""
pass
async def get_or_set(
self,
key: str,
factory,
ttl: Optional[int] = None,
) -> Any:
"""
Получить или создать значение.
Args:
key: Ключ
factory: Функция создания значения (sync или async)
ttl: TTL
Returns:
Значение
"""
value = await self.get(key)
if value is not None:
return value
# Создаём значение
import asyncio
if asyncio.iscoroutinefunction(factory):
value = await factory()
else:
value = factory()
await self.set(key, value, ttl)
return value