Workflow проекта — Что и когда использовать
Версия: 1.0.0
Для: Проектор, Кодер
Стек: FastAPI + Python
КАСКАД ЗАДАЧ ПРОЕКТА
ФАЗА 0: ИНИЦИАЛИЗАЦИЯ
↓
ФАЗА 1: ДАННЫЕ
↓
ФАЗА 2: БИЗНЕС-ЛОГИКА
↓
ФАЗА 3: API
↓
ФАЗА 4: БЕЗОПАСНОСТЬ
↓
ФАЗА 5: ФОНОВЫЕ ЗАДАЧИ
↓
ФАЗА 6: ИНТЕГРАЦИИ
↓
ФАЗА 7: ТЕСТИРОВАНИЕ
↓
ФАЗА 8: ДЕПЛОЙ
↓
ФАЗА 9: МОНИТОРИНГ
ФАЗА 0: ИНИЦИАЛИЗАЦИЯ ПРОЕКТА
0.1 Структура проекта
project/
├── app/
│ ├── __init__.py
│ ├── main.py ← FastAPI app
│ ├── config.py ← Settings
│ └── dependencies.py ← DI
├── tests/
│ └── conftest.py
├── alembic/ ← Миграции
├── pyproject.toml
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
└── .env.example
0.2 Конфигурация
| Задача |
Решение |
Файл |
| Настройки |
pydantic-settings |
app/config.py |
| Секреты |
.env + python-dotenv |
.env |
| Docker |
docker-compose |
docker-compose.yml |
# app/config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# Database
database_url: str
# Auth
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# External
redis_url: str = "redis://localhost:6379"
class Config:
env_file = ".env"
settings = Settings()
0.3 Зависимости (requirements.txt)
# Core
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
pydantic>=2.0
pydantic-settings>=2.0
# Database
sqlalchemy[asyncio]>=2.0
asyncpg
alembic
# Auth
python-jose[cryptography]
passlib[bcrypt]
# Utils
httpx
orjson
python-multipart
aiofiles
# Dev
pytest
pytest-asyncio
httpx
ruff
ФАЗА 1: ДАННЫЕ (Database Layer)
1.1 Последовательность
1.1 Подключение к БД
↓
1.2 Базовые модели (Base)
↓
1.3 Модели сущностей (User, Product, Order...)
↓
1.4 Миграции (Alembic)
↓
1.5 Сессии и DI
1.2 Решения
| Задача |
Решение |
Файл |
| ORM |
SQLAlchemy 2.0 async |
app/models/ |
| Миграции |
Alembic |
alembic/ |
| PostgreSQL |
asyncpg |
— |
| MongoDB |
Beanie |
app/models/ |
| Redis |
redis-py async |
app/core/redis.py |
1.3 Код
# app/core/database.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from app.config import settings
engine = create_async_engine(settings.database_url)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
pass
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session() as session:
yield session
# app/models/user.py
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import Base
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(unique=True, index=True)
hashed_password: Mapped[str]
is_active: Mapped[bool] = mapped_column(default=True)
1.4 Alembic
# Инициализация
alembic init alembic
# Создание миграции
alembic revision --autogenerate -m "add users table"
# Применение
alembic upgrade head
ФАЗА 2: БИЗНЕС-ЛОГИКА (Service Layer)
2.1 Последовательность
2.1 CRUD операции (базовые)
↓
2.2 Сервисы (бизнес-логика)
↓
2.3 Валидаторы
↓
2.4 Исключения
2.2 Решения
| Задача |
Решение |
Файл |
| CRUD |
Базовый класс |
app/crud/base.py |
| Бизнес-логика |
Services |
app/services/ |
| Исключения |
HTTPException |
app/core/exceptions.py |
2.3 Код
# app/crud/base.py
from typing import Generic, TypeVar
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
ModelType = TypeVar("ModelType")
class CRUDBase(Generic[ModelType]):
def __init__(self, model: type[ModelType]):
self.model = model
async def get(self, db: AsyncSession, id: int) -> ModelType | None:
result = await db.execute(select(self.model).where(self.model.id == id))
return result.scalar_one_or_none()
async def get_all(self, db: AsyncSession, skip: int = 0, limit: int = 100):
result = await db.execute(select(self.model).offset(skip).limit(limit))
return result.scalars().all()
async def create(self, db: AsyncSession, obj_in: dict) -> ModelType:
db_obj = self.model(**obj_in)
db.add(db_obj)
await db.commit()
await db.refresh(db_obj)
return db_obj
# app/services/user_service.py
from app.crud.user import user_crud
from app.core.security import get_password_hash, verify_password
class UserService:
async def create_user(self, db, email: str, password: str):
hashed = get_password_hash(password)
return await user_crud.create(db, {"email": email, "hashed_password": hashed})
async def authenticate(self, db, email: str, password: str):
user = await user_crud.get_by_email(db, email)
if not user or not verify_password(password, user.hashed_password):
return None
return user
user_service = UserService()
ФАЗА 3: API (Presentation Layer)
3.1 Последовательность
3.1 Schemas (Pydantic)
↓
3.2 Routers (endpoints)
↓
3.3 Dependencies (DI)
↓
3.4 Подключение к main.py
3.2 Решения
| Задача |
Решение |
Файл |
| DTO |
Pydantic v2 |
app/schemas/ |
| Endpoints |
APIRouter |
app/routers/ |
| Валидация |
Pydantic |
автоматически |
| Документация |
OpenAPI |
автоматически |
3.3 Код
# app/schemas/user.py
from pydantic import BaseModel, EmailStr, ConfigDict
class UserBase(BaseModel):
email: EmailStr
class UserCreate(UserBase):
password: str
class UserRead(UserBase):
model_config = ConfigDict(from_attributes=True)
id: int
is_active: bool
# app/routers/users.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.schemas.user import UserCreate, UserRead
from app.services.user_service import user_service
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=UserRead)
async def create_user(user_in: UserCreate, db: AsyncSession = Depends(get_db)):
return await user_service.create_user(db, user_in.email, user_in.password)
@router.get("/{user_id}", response_model=UserRead)
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
user = await user_service.get_user(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# app/main.py
from fastapi import FastAPI
from app.routers import users, products, orders
app = FastAPI(title="My API")
app.include_router(users.router)
app.include_router(products.router)
app.include_router(orders.router)
ФАЗА 4: БЕЗОПАСНОСТЬ (Security Layer)
4.1 Последовательность
4.1 Хеширование паролей
↓
4.2 JWT токены
↓
4.3 OAuth2 схема
↓
4.4 Dependencies (get_current_user)
↓
4.5 Защита endpoints
4.2 Решения
| Задача |
Решение |
Файл |
| Хеширование |
passlib + bcrypt |
app/core/security.py |
| JWT |
python-jose |
app/core/security.py |
| OAuth2 |
FastAPI Security |
app/core/auth.py |
| RBAC |
роли в модели User |
app/dependencies.py |
4.3 Код
# app/core/security.py
from datetime import datetime, timedelta
from jose import jwt
from passlib.context import CryptContext
from app.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
# app/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.core.database import get_db
from app.crud.user import user_crud
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
user_id: int = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = await user_crud.get(db, user_id)
if user is None:
raise credentials_exception
return user
ФАЗА 5: ФОНОВЫЕ ЗАДАЧИ (Background Tasks)
5.1 Дерево выбора
Задача занимает < 5 сек?
├── ДА → BackgroundTasks (встроенный FastAPI)
│
└── НЕТ → Нужен retry/статус/результат?
│
├── НЕТ → BackgroundTasks
│
└── ДА → Тип задачи?
│
├── I/O (HTTP, файлы, email) → ARQ + Redis
│
└── CPU (расчёты, обработка) → Celery + Redis
5.2 Решения
| Задача |
Решение |
Когда |
| Email после регистрации |
BackgroundTasks |
< 5 сек |
| Отправка 1000 email |
ARQ |
I/O, нужен retry |
| Генерация отчёта |
Celery |
CPU, долго |
| Синхронизация с API |
ARQ |
I/O, retry |
| Обработка изображений |
Celery |
CPU |
5.3 Код: BackgroundTasks
# app/routers/users.py
from fastapi import BackgroundTasks
async def send_welcome_email(email: str):
# async email sending
pass
@router.post("/register")
async def register(
user_in: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
user = await user_service.create_user(db, user_in)
background_tasks.add_task(send_welcome_email, user.email)
return user
5.4 Код: ARQ
# app/tasks/worker.py
from arq import create_pool
from arq.connections import RedisSettings
async def send_bulk_emails(ctx, emails: list[str], template: str):
for email in emails:
await send_email(email, template)
class WorkerSettings:
functions = [send_bulk_emails]
redis_settings = RedisSettings(host='redis')
# Использование в роутере
@router.post("/notify-all")
async def notify_all(db: AsyncSession = Depends(get_db)):
redis = await create_pool(RedisSettings(host='redis'))
emails = await get_all_user_emails(db)
await redis.enqueue_job('send_bulk_emails', emails, "notification")
ФАЗА 6: ИНТЕГРАЦИИ (External Services)
6.1 Типы интеграций
ИСХОДЯЩИЕ (мы вызываем) ВХОДЯЩИЕ (нас вызывают)
├── REST API ├── Webhooks
├── GraphQL └── Callbacks
├── SOAP
└── gRPC
6.2 Решения
| Задача |
Решение |
Файл |
| HTTP клиент |
httpx async |
app/integrations/ |
| Retry |
tenacity |
decorator |
| Rate limit |
asyncio.Semaphore |
— |
| Webhook приём |
FastAPI endpoint |
app/routers/webhooks.py |
6.3 Код
# app/integrations/base.py
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
class BaseClient:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.headers = {"Authorization": f"Bearer {api_key}"}
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
async def _request(self, method: str, path: str, **kwargs):
async with httpx.AsyncClient(timeout=30) as client:
response = await client.request(
method,
f"{self.base_url}{path}",
headers=self.headers,
**kwargs
)
response.raise_for_status()
return response.json()
# app/integrations/ozon.py
class OzonClient(BaseClient):
async def get_products(self):
return await self._request("POST", "/v2/product/list", json={})
ФАЗА 7: ТЕСТИРОВАНИЕ
7.1 Последовательность
7.1 Фикстуры (conftest.py)
↓
7.2 Unit тесты (services, utils)
↓
7.3 Integration тесты (API endpoints)
↓
7.4 E2E тесты (полный flow)
7.2 Решения
| Задача |
Решение |
| Фреймворк |
pytest |
| Async тесты |
pytest-asyncio |
| HTTP клиент |
httpx.AsyncClient |
| Моки HTTP |
respx |
| Моки общие |
pytest-mock |
| Покрытие |
pytest-cov |
| БД тесты |
transaction rollback |
7.3 Код
# tests/conftest.py
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.core.database import Base, get_db
TEST_DATABASE_URL = "postgresql+asyncpg://test:test@localhost/test_db"
@pytest.fixture
async def db_session():
engine = create_async_engine(TEST_DATABASE_URL)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with async_session() as session:
yield session
await session.rollback()
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
@pytest.fixture
async def client(db_session):
def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test"
) as ac:
yield ac
app.dependency_overrides.clear()
# tests/test_users.py
@pytest.mark.asyncio
async def test_create_user(client):
response = await client.post("/users/", json={
"email": "test@example.com",
"password": "secret123"
})
assert response.status_code == 200
assert response.json()["email"] == "test@example.com"
ФАЗА 8: ДЕПЛОЙ
8.1 Последовательность
8.1 Dockerfile
↓
8.2 docker-compose.yml
↓
8.3 CI/CD (GitHub Actions)
↓
8.4 Production config
8.2 Решения
| Задача |
Решение |
| Контейнер |
Docker |
| Оркестрация |
docker-compose |
| ASGI сервер |
gunicorn + uvicorn |
| Прокси |
Traefik / Nginx |
| HTTPS |
Let's Encrypt |
| CI/CD |
GitHub Actions |
8.3 Код
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
# Dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# App
COPY . .
# Run
CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"]
# docker-compose.yml
version: "3.9"
services:
app:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/app
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
db:
image: postgres:16
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: app
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
volumes:
postgres_data:
ФАЗА 9: МОНИТОРИНГ
9.1 Решения
| Задача |
Решение |
| Метрики |
prometheus-fastapi-instrumentator |
| Логи |
structlog + JSON |
| Трейсинг |
OpenTelemetry |
| Алерты |
Prometheus + Alertmanager |
9.2 Код
# app/main.py
from prometheus_fastapi_instrumentator import Instrumentator
app = FastAPI()
# Prometheus metrics
Instrumentator().instrument(app).expose(app)
# app/core/logging.py
import structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
]
)
log = structlog.get_logger()
СВОДНАЯ ТАБЛИЦА
| Фаза |
Задача |
Стандарт |
| 0 |
Конфиг |
pydantic-settings |
| 1 |
ORM |
SQLAlchemy 2.0 async |
| 1 |
Миграции |
Alembic |
| 2 |
CRUD |
Базовый класс |
| 2 |
Бизнес-логика |
Services |
| 3 |
DTO |
Pydantic v2 |
| 3 |
Endpoints |
APIRouter |
| 4 |
Пароли |
passlib + bcrypt |
| 4 |
JWT |
python-jose |
| 5 |
Простые задачи |
BackgroundTasks |
| 5 |
I/O задачи |
ARQ |
| 5 |
CPU задачи |
Celery |
| 6 |
HTTP клиент |
httpx + tenacity |
| 7 |
Тесты |
pytest + httpx |
| 8 |
Контейнер |
Docker |
| 8 |
Сервер |
gunicorn + uvicorn |
| 9 |
Метрики |
Prometheus |
| 9 |
Логи |
structlog |
ЧЕКЛИСТ НОВОГО ПРОЕКТА
- [ ] Структура папок создана
- [ ] pyproject.toml / requirements.txt
- [ ] .env.example
- [ ] config.py (pydantic-settings)
- [ ] database.py (SQLAlchemy async)
- [ ] Alembic инициализирован
- [ ] Модели созданы
- [ ] Миграции применены
- [ ] CRUD базовый
- [ ] Services
- [ ] Schemas (Pydantic)
- [ ] Routers
- [ ] Auth (JWT)
- [ ] Dependencies
- [ ] Tests (conftest.py)
- [ ] Dockerfile
- [ ] docker-compose.yml
- [ ] README.md
Версия: 1.0.0