system/agents/stacks/PROJECT_WORKFLOW.md

Workflow проекта — Что и когда использовать

Версия: 1.0.0
Для: Проектор, Кодер
Стек: FastAPI + Python


КАСКАД ЗАДАЧ ПРОЕКТА

ФАЗА 0: ИНИЦИАЛИЗАЦИЯ
       ↓
ФАЗА 1: ДАННЫЕ
       ↓
ФАЗА 2: БИЗНЕС-ЛОГИКА
       ↓
ФАЗА 3: API
       ↓
ФАЗА 4: БЕЗОПАСНОСТЬ
       ↓
ФАЗА 5: ФОНОВЫЕ ЗАДАЧИ
       ↓
ФАЗА 6: ИНТЕГРАЦИИ
       ↓
ФАЗА 7: ТЕСТИРОВАНИЕ
       ↓
ФАЗА 8: ДЕПЛОЙ
       ↓
ФАЗА 9: МОНИТОРИНГ

ФАЗА 0: ИНИЦИАЛИЗАЦИЯ ПРОЕКТА

0.1 Структура проекта

project/
├── app/
│   ├── __init__.py
│   ├── main.py              ← FastAPI app
│   ├── config.py            ← Settings
│   └── dependencies.py      ← DI
├── tests/
│   └── conftest.py
├── alembic/                  ← Миграции
├── pyproject.toml
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
└── .env.example

0.2 Конфигурация

Задача Решение Файл
Настройки pydantic-settings app/config.py
Секреты .env + python-dotenv .env
Docker docker-compose docker-compose.yml
# app/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Database
    database_url: str

    # Auth
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # External
    redis_url: str = "redis://localhost:6379"

    class Config:
        env_file = ".env"

settings = Settings()

0.3 Зависимости (requirements.txt)

# Core
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
pydantic>=2.0
pydantic-settings>=2.0

# Database
sqlalchemy[asyncio]>=2.0
asyncpg
alembic

# Auth
python-jose[cryptography]
passlib[bcrypt]

# Utils
httpx
orjson
python-multipart
aiofiles

# Dev
pytest
pytest-asyncio
httpx
ruff

ФАЗА 1: ДАННЫЕ (Database Layer)

1.1 Последовательность

1.1 Подключение к БД
     
1.2 Базовые модели (Base)
     
1.3 Модели сущностей (User, Product, Order...)
     
1.4 Миграции (Alembic)
     
1.5 Сессии и DI

1.2 Решения

Задача Решение Файл
ORM SQLAlchemy 2.0 async app/models/
Миграции Alembic alembic/
PostgreSQL asyncpg
MongoDB Beanie app/models/
Redis redis-py async app/core/redis.py

1.3 Код

# app/core/database.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

from app.config import settings

engine = create_async_engine(settings.database_url)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session
# app/models/user.py
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import Base

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(unique=True, index=True)
    hashed_password: Mapped[str]
    is_active: Mapped[bool] = mapped_column(default=True)

1.4 Alembic

# Инициализация
alembic init alembic

# Создание миграции
alembic revision --autogenerate -m "add users table"

# Применение
alembic upgrade head

ФАЗА 2: БИЗНЕС-ЛОГИКА (Service Layer)

2.1 Последовательность

2.1 CRUD операции (базовые)
     
2.2 Сервисы (бизнес-логика)
     
2.3 Валидаторы
     
2.4 Исключения

2.2 Решения

Задача Решение Файл
CRUD Базовый класс app/crud/base.py
Бизнес-логика Services app/services/
Исключения HTTPException app/core/exceptions.py

2.3 Код

# app/crud/base.py
from typing import Generic, TypeVar
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

ModelType = TypeVar("ModelType")

class CRUDBase(Generic[ModelType]):
    def __init__(self, model: type[ModelType]):
        self.model = model

    async def get(self, db: AsyncSession, id: int) -> ModelType | None:
        result = await db.execute(select(self.model).where(self.model.id == id))
        return result.scalar_one_or_none()

    async def get_all(self, db: AsyncSession, skip: int = 0, limit: int = 100):
        result = await db.execute(select(self.model).offset(skip).limit(limit))
        return result.scalars().all()

    async def create(self, db: AsyncSession, obj_in: dict) -> ModelType:
        db_obj = self.model(**obj_in)
        db.add(db_obj)
        await db.commit()
        await db.refresh(db_obj)
        return db_obj
# app/services/user_service.py
from app.crud.user import user_crud
from app.core.security import get_password_hash, verify_password

class UserService:
    async def create_user(self, db, email: str, password: str):
        hashed = get_password_hash(password)
        return await user_crud.create(db, {"email": email, "hashed_password": hashed})

    async def authenticate(self, db, email: str, password: str):
        user = await user_crud.get_by_email(db, email)
        if not user or not verify_password(password, user.hashed_password):
            return None
        return user

user_service = UserService()

ФАЗА 3: API (Presentation Layer)

3.1 Последовательность

3.1 Schemas (Pydantic)
     
3.2 Routers (endpoints)
     
3.3 Dependencies (DI)
     
3.4 Подключение к main.py

3.2 Решения

Задача Решение Файл
DTO Pydantic v2 app/schemas/
Endpoints APIRouter app/routers/
Валидация Pydantic автоматически
Документация OpenAPI автоматически

3.3 Код

# app/schemas/user.py
from pydantic import BaseModel, EmailStr, ConfigDict

class UserBase(BaseModel):
    email: EmailStr

class UserCreate(UserBase):
    password: str

class UserRead(UserBase):
    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
# app/routers/users.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.schemas.user import UserCreate, UserRead
from app.services.user_service import user_service

router = APIRouter(prefix="/users", tags=["users"])

@router.post("/", response_model=UserRead)
async def create_user(user_in: UserCreate, db: AsyncSession = Depends(get_db)):
    return await user_service.create_user(db, user_in.email, user_in.password)

@router.get("/{user_id}", response_model=UserRead)
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    user = await user_service.get_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
# app/main.py
from fastapi import FastAPI
from app.routers import users, products, orders

app = FastAPI(title="My API")

app.include_router(users.router)
app.include_router(products.router)
app.include_router(orders.router)

ФАЗА 4: БЕЗОПАСНОСТЬ (Security Layer)

4.1 Последовательность

4.1 Хеширование паролей
     
4.2 JWT токены
     
4.3 OAuth2 схема
     
4.4 Dependencies (get_current_user)
     
4.5 Защита endpoints

4.2 Решения

Задача Решение Файл
Хеширование passlib + bcrypt app/core/security.py
JWT python-jose app/core/security.py
OAuth2 FastAPI Security app/core/auth.py
RBAC роли в модели User app/dependencies.py

4.3 Код

# app/core/security.py
from datetime import datetime, timedelta
from jose import jwt
from passlib.context import CryptContext

from app.config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
# app/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.core.database import get_db
from app.crud.user import user_crud

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
        user_id: int = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user = await user_crud.get(db, user_id)
    if user is None:
        raise credentials_exception
    return user

ФАЗА 5: ФОНОВЫЕ ЗАДАЧИ (Background Tasks)

5.1 Дерево выбора

Задача занимает < 5 сек?
├── ДА → BackgroundTasks (встроенный FastAPI)
│
└── НЕТ → Нужен retry/статус/результат?
          │
          ├── НЕТ → BackgroundTasks
          │
          └── ДА → Тип задачи?
                   │
                   ├── I/O (HTTP, файлы, email) → ARQ + Redis
                   │
                   └── CPU (расчёты, обработка) → Celery + Redis

5.2 Решения

Задача Решение Когда
Email после регистрации BackgroundTasks < 5 сек
Отправка 1000 email ARQ I/O, нужен retry
Генерация отчёта Celery CPU, долго
Синхронизация с API ARQ I/O, retry
Обработка изображений Celery CPU

5.3 Код: BackgroundTasks

# app/routers/users.py
from fastapi import BackgroundTasks

async def send_welcome_email(email: str):
    # async email sending
    pass

@router.post("/register")
async def register(
    user_in: UserCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    user = await user_service.create_user(db, user_in)
    background_tasks.add_task(send_welcome_email, user.email)
    return user

5.4 Код: ARQ

# app/tasks/worker.py
from arq import create_pool
from arq.connections import RedisSettings

async def send_bulk_emails(ctx, emails: list[str], template: str):
    for email in emails:
        await send_email(email, template)

class WorkerSettings:
    functions = [send_bulk_emails]
    redis_settings = RedisSettings(host='redis')

# Использование в роутере
@router.post("/notify-all")
async def notify_all(db: AsyncSession = Depends(get_db)):
    redis = await create_pool(RedisSettings(host='redis'))
    emails = await get_all_user_emails(db)
    await redis.enqueue_job('send_bulk_emails', emails, "notification")

ФАЗА 6: ИНТЕГРАЦИИ (External Services)

6.1 Типы интеграций

ИСХОДЯЩИЕ (мы вызываем)          ВХОДЯЩИЕ (нас вызывают)
├── REST API                     ├── Webhooks
├── GraphQL                      └── Callbacks
├── SOAP
└── gRPC

6.2 Решения

Задача Решение Файл
HTTP клиент httpx async app/integrations/
Retry tenacity decorator
Rate limit asyncio.Semaphore
Webhook приём FastAPI endpoint app/routers/webhooks.py

6.3 Код

# app/integrations/base.py
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

class BaseClient:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def _request(self, method: str, path: str, **kwargs):
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.request(
                method,
                f"{self.base_url}{path}",
                headers=self.headers,
                **kwargs
            )
            response.raise_for_status()
            return response.json()

# app/integrations/ozon.py
class OzonClient(BaseClient):
    async def get_products(self):
        return await self._request("POST", "/v2/product/list", json={})

ФАЗА 7: ТЕСТИРОВАНИЕ

7.1 Последовательность

7.1 Фикстуры (conftest.py)
     
7.2 Unit тесты (services, utils)
     
7.3 Integration тесты (API endpoints)
     
7.4 E2E тесты (полный flow)

7.2 Решения

Задача Решение
Фреймворк pytest
Async тесты pytest-asyncio
HTTP клиент httpx.AsyncClient
Моки HTTP respx
Моки общие pytest-mock
Покрытие pytest-cov
БД тесты transaction rollback

7.3 Код

# tests/conftest.py
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

from app.main import app
from app.core.database import Base, get_db

TEST_DATABASE_URL = "postgresql+asyncpg://test:test@localhost/test_db"

@pytest.fixture
async def db_session():
    engine = create_async_engine(TEST_DATABASE_URL)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        yield session
        await session.rollback()

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)

@pytest.fixture
async def client(db_session):
    def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test"
    ) as ac:
        yield ac

    app.dependency_overrides.clear()

# tests/test_users.py
@pytest.mark.asyncio
async def test_create_user(client):
    response = await client.post("/users/", json={
        "email": "test@example.com",
        "password": "secret123"
    })
    assert response.status_code == 200
    assert response.json()["email"] == "test@example.com"

ФАЗА 8: ДЕПЛОЙ

8.1 Последовательность

8.1 Dockerfile
     
8.2 docker-compose.yml
     
8.3 CI/CD (GitHub Actions)
     
8.4 Production config

8.2 Решения

Задача Решение
Контейнер Docker
Оркестрация docker-compose
ASGI сервер gunicorn + uvicorn
Прокси Traefik / Nginx
HTTPS Let's Encrypt
CI/CD GitHub Actions

8.3 Код

# Dockerfile
FROM python:3.12-slim

WORKDIR /app

# Dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# App
COPY . .

# Run
CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"]
# docker-compose.yml
version: "3.9"

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/app
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

  db:
    image: postgres:16
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: app
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

volumes:
  postgres_data:

ФАЗА 9: МОНИТОРИНГ

9.1 Решения

Задача Решение
Метрики prometheus-fastapi-instrumentator
Логи structlog + JSON
Трейсинг OpenTelemetry
Алерты Prometheus + Alertmanager

9.2 Код

# app/main.py
from prometheus_fastapi_instrumentator import Instrumentator

app = FastAPI()

# Prometheus metrics
Instrumentator().instrument(app).expose(app)
# app/core/logging.py
import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)

log = structlog.get_logger()

СВОДНАЯ ТАБЛИЦА

Фаза Задача Стандарт
0 Конфиг pydantic-settings
1 ORM SQLAlchemy 2.0 async
1 Миграции Alembic
2 CRUD Базовый класс
2 Бизнес-логика Services
3 DTO Pydantic v2
3 Endpoints APIRouter
4 Пароли passlib + bcrypt
4 JWT python-jose
5 Простые задачи BackgroundTasks
5 I/O задачи ARQ
5 CPU задачи Celery
6 HTTP клиент httpx + tenacity
7 Тесты pytest + httpx
8 Контейнер Docker
8 Сервер gunicorn + uvicorn
9 Метрики Prometheus
9 Логи structlog

ЧЕКЛИСТ НОВОГО ПРОЕКТА


Версия: 1.0.0