architect/_archive/2025-11-cleanup/platform-v2-cifra/archive/2025-11-10-restructure-v2/PERFORMANCE_GUIDE.md

CIFRA Platform - Performance Guide

Дата: 2025-11-10
Версия: 1.0
Статус: Production Ready


Обзор

Этот документ описывает стратегии оптимизации производительности CIFRA Platform: от базы данных до frontend.


1. ЦЕЛЕВЫЕ МЕТРИКИ

1.1 Performance Budget

API Response Time:
  p50: < 100ms
  p95: < 200ms
  p99: < 500ms

Database Query Time:
  p50: < 10ms
  p95: < 50ms
  p99: < 100ms

Page Load Time (First Contentful Paint):
  p50: < 1s
  p95: < 2s

Time to Interactive (TTI):
  p50: < 2s
  p95: < 4s

Throughput:
  Requests per second: > 1000 RPS
  Concurrent users: > 10,000

Resource Usage:
  CPU: < 70% (average)
  Memory: < 80% (average)
  Disk I/O: < 80% (average)

Database Connections:
  Pool size: 20-50
  Max connections: 100

2. DATABASE OPTIMIZATION

2.1 Indexing Strategy

Принцип: Индексы ускоряют чтение, замедляют запись. Баланс!

Какие поля индексировать:

-- Primary Key (автоматически)
id UUID PRIMARY KEY

-- Foreign Keys (ОБЯЗАТЕЛЬНО!)
CREATE INDEX idx_contacts_company_id ON contacts(company_id);
CREATE INDEX idx_deals_contact_id ON deals(contact_id);
CREATE INDEX idx_orders_customer_id ON orders(customer_id);

-- Часто фильтруемые поля
CREATE INDEX idx_contacts_email ON contacts(email);
CREATE INDEX idx_deals_stage ON deals(stage);
CREATE INDEX idx_orders_status ON orders(status);

-- Сортируемые поля
CREATE INDEX idx_contacts_created_at ON contacts(created_at DESC);
CREATE INDEX idx_deals_amount ON deals(amount DESC);

-- Полнотекстовый поиск
CREATE INDEX idx_contacts_search ON contacts USING GIN(
    to_tsvector('english', first_name || ' ' || last_name || ' ' || email)
);

-- Composite indexes (для комбинированных фильтров)
CREATE INDEX idx_deals_stage_amount ON deals(stage, amount DESC);
CREATE INDEX idx_orders_status_created ON orders(status, created_at DESC);

-- Partial indexes (условные индексы)
CREATE INDEX idx_active_contacts ON contacts(email)
WHERE deleted_at IS NULL;

CREATE INDEX idx_open_deals ON deals(stage, amount)
WHERE stage IN ('lead', 'qualified', 'proposal');

Анализ использования индексов:

-- Узнать какие индексы не используются
SELECT
    schemaname,
    tablename,
    indexname,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;

-- Узнать размер индексов
SELECT
    tablename,
    indexname,
    pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
ORDER BY pg_relation_size(indexrelid) DESC;

2.2 Query Optimization

N+1 Problem - ГЛАВНАЯ ПРОБЛЕМА!

Плохо (N+1 queries):

# Получаем контакты
contacts = await db.query(Contact).limit(100).all()

# Для каждого контакта - отдельный запрос к company (N queries)
for contact in contacts:
    print(contact.company.name)  # +1 query каждый раз!

# Total: 1 + 100 = 101 queries!

Хорошо (2 queries):

from sqlalchemy.orm import selectinload

# Eager loading - загружаем связи сразу
contacts = await db.query(Contact)\
    .options(selectinload(Contact.company))\
    .limit(100)\
    .all()

# Теперь company уже загружена
for contact in contacts:
    print(contact.company.name)  # 0 queries!

# Total: 2 queries (1 для contacts, 1 для companies)

Еще лучше (1 query с JOIN):

from sqlalchemy.orm import joinedload

contacts = await db.query(Contact)\
    .options(joinedload(Contact.company))\
    .limit(100)\
    .all()

# Total: 1 query с LEFT JOIN

Разница:
- selectinload() - делает 2 запроса (SELECT IN)
- joinedload() - делает 1 запрос (LEFT JOIN)
- Когда что использовать?
- joinedload - для 1:1 и M:1 (many-to-one)
- selectinload - для 1:M и M:M (one-to-many, many-to-many)

Pagination (ВАЖНО!):

Плохо (offset):

# Page 1000: offset 100,000 - БД должна прочитать и ПРОПУСТИТЬ 100,000 строк!
contacts = await db.query(Contact)\
    .order_by(Contact.created_at.desc())\
    .limit(100)\
    .offset(100000)\
    .all()

Хорошо (cursor-based):

# Используем ID последнего элемента как курсор
last_id = request.query_params.get('cursor')

query = db.query(Contact).order_by(Contact.id.desc()).limit(100)

if last_id:
    query = query.filter(Contact.id < last_id)

contacts = await query.all()

# Возвращаем cursor для следующей страницы
next_cursor = contacts[-1].id if contacts else None

SELECT только нужные поля:

Плохо:

# Загружаем ВСЕ поля (включая TEXT fields)
contacts = await db.query(Contact).all()

Хорошо:

# Загружаем только нужные поля
from sqlalchemy import select

stmt = select(Contact.id, Contact.email, Contact.name)
results = await db.execute(stmt)
contacts = results.all()

Batch operations:

Плохо (N queries):

for contact_data in contacts_list:
    contact = Contact(**contact_data)
    db.add(contact)
    await db.commit()  # Commit каждый раз!

Хорошо (1 transaction):

contacts = [Contact(**data) for data in contacts_list]
db.add_all(contacts)
await db.commit()  # Commit один раз

2.3 Connection Pooling

# cifra/db.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(
    DATABASE_URL,
    # Pool settings
    pool_size=20,           # Постоянных соединений
    max_overflow=10,        # Дополнительных при нагрузке
    pool_timeout=30,        # Timeout для получения соединения
    pool_recycle=3600,      # Пересоздавать соединение каждый час
    pool_pre_ping=True,     # Проверять соединение перед использованием

    # Execution options
    echo=False,             # Не логировать SQL (в production)
    echo_pool=False,
)

# Session factory
async_session = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False  # Не делать доп запросы после commit
)

2.4 Database Vacuuming

-- Анализ раздутых таблиц
SELECT
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size,
    n_dead_tup,
    n_live_tup,
    round(n_dead_tup * 100.0 / NULLIF(n_live_tup + n_dead_tup, 0), 2) AS dead_ratio
FROM pg_stat_user_tables
WHERE n_dead_tup > 1000
ORDER BY n_dead_tup DESC;

-- Manual vacuum (если autovacuum не справляется)
VACUUM ANALYZE contacts;
VACUUM ANALYZE deals;

Cron job для vacuum:

# crontab
0 3 * * 0 psql -U cifra -c "VACUUM ANALYZE;"

3. CACHING

3.1 Cache Levels

┌─────────────────────────────────┐
│ L1: Request Cache (In-Memory)   │  0.1ms
└─────────────────────────────────┘
           ↓ miss
┌─────────────────────────────────┐
│ L2: Redis (Application Cache)   │  1-5ms
└─────────────────────────────────┘
           ↓ miss
┌─────────────────────────────────┐
│ L3: Database (PostgreSQL)       │  10-50ms
└─────────────────────────────────┘

3.2 Implementation

# cifra/cache.py
from functools import wraps
import redis.asyncio as redis
import json

# Redis connection
redis_client = redis.from_url(REDIS_URL, decode_responses=True)

# L1: Request-level cache (in-memory dict)
class RequestCache:
    """Cache для одного request"""
    def __init__(self):
        self._cache = {}

    def get(self, key):
        return self._cache.get(key)

    def set(self, key, value):
        self._cache[key] = value

# L2: Application-level cache (Redis)
class CacheService:
    """Сервис кеширования"""

    @staticmethod
    def generate_key(prefix: str, **kwargs) -> str:
        """Генерация cache key"""
        parts = [prefix]
        for k, v in sorted(kwargs.items()):
            parts.append(f"{k}:{v}")
        return ":".join(parts)

    @staticmethod
    async def get(key: str):
        """Получить из кеша"""
        value = await redis_client.get(key)
        if value:
            return json.loads(value)
        return None

    @staticmethod
    async def set(key: str, value, ttl: int = 300):
        """Сохранить в кеш"""
        await redis_client.set(
            key,
            json.dumps(value),
            ex=ttl
        )

    @staticmethod
    async def delete(key: str):
        """Удалить из кеша"""
        await redis_client.delete(key)

    @staticmethod
    async def delete_pattern(pattern: str):
        """Удалить все ключи по паттерну"""
        keys = []
        async for key in redis_client.scan_iter(match=pattern):
            keys.append(key)

        if keys:
            await redis_client.delete(*keys)

# Decorator для кеширования
def cached(ttl: int = 300, key_prefix: str = None):
    """Decorator для кеширования результата функции"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Генерация cache key
            cache_key = CacheService.generate_key(
                key_prefix or func.__name__,
                args=str(args),
                kwargs=str(kwargs)
            )

            # Try get from cache
            cached_value = await CacheService.get(cache_key)
            if cached_value is not None:
                return cached_value

            # Execute function
            result = await func(*args, **kwargs)

            # Save to cache
            await CacheService.set(cache_key, result, ttl)

            return result
        return wrapper
    return decorator

# Usage
@cached(ttl=600, key_prefix="contact")
async def get_contact(db: AsyncSession, contact_id: UUID):
    """Get contact with caching"""
    result = await db.execute(
        select(Contact).where(Contact.id == contact_id)
    )
    return result.scalar_one_or_none()

# Invalidation
async def invalidate_contact_cache(contact_id: UUID):
    """Invalidate cache when contact updated"""
    await CacheService.delete_pattern(f"contact:*{contact_id}*")

3.3 Cache Invalidation

# cifra/models/contact.py
from sqlalchemy import event

class Contact(Base):
    # ... fields ...

    @staticmethod
    @event.listens_for(Contact, 'after_update')
    def invalidate_cache_on_update(mapper, connection, target):
        """Invalidate cache when contact updated"""
        # Schedule cache invalidation (в background task)
        from cifra.cache import invalidate_contact_cache
        asyncio.create_task(invalidate_contact_cache(target.id))

    @staticmethod
    @event.listens_for(Contact, 'after_delete')
    def invalidate_cache_on_delete(mapper, connection, target):
        """Invalidate cache when contact deleted"""
        asyncio.create_task(invalidate_contact_cache(target.id))

3.4 Cache Strategies

1. Cache-Aside (Lazy Loading):

async def get_user(user_id):
    # 1. Try cache
    user = await cache.get(f"user:{user_id}")
    if user:
        return user

    # 2. Query DB
    user = await db.query(User).get(user_id)

    # 3. Save to cache
    await cache.set(f"user:{user_id}", user, ttl=600)

    return user

2. Write-Through:

async def update_user(user_id, data):
    # 1. Update DB
    user = await db.query(User).get(user_id)
    user.update(data)
    await db.commit()

    # 2. Update cache immediately
    await cache.set(f"user:{user_id}", user, ttl=600)

    return user

3. Write-Behind (Write-Back):

async def update_user(user_id, data):
    # 1. Update cache immediately
    user = await cache.get(f"user:{user_id}")
    user.update(data)
    await cache.set(f"user:{user_id}", user, ttl=600)

    # 2. Schedule DB update (async)
    await task_queue.enqueue('update_user_in_db', user_id, data)

    return user

4. API OPTIMIZATION

4.1 Response Compression

# cifra/main.py
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()

# Enable gzip compression
app.add_middleware(GZipMiddleware, minimum_size=1000)

4.2 Pagination

# cifra/api/pagination.py
from fastapi import Query
from typing import Generic, TypeVar, List
from pydantic import BaseModel

T = TypeVar('T')

class PaginatedResponse(BaseModel, Generic[T]):
    items: List[T]
    total: int
    page: int
    per_page: int
    has_next: bool
    has_prev: bool

    @property
    def pages(self) -> int:
        return (self.total + self.per_page - 1) // self.per_page

async def paginate(
    query,
    page: int = Query(1, ge=1),
    per_page: int = Query(25, ge=1, le=100)
) -> PaginatedResponse:
    """Paginate query results"""

    # Count total (with caching!)
    total = await query.count()

    # Get items
    items = await query.offset((page - 1) * per_page).limit(per_page).all()

    return PaginatedResponse(
        items=items,
        total=total,
        page=page,
        per_page=per_page,
        has_next=page * per_page < total,
        has_prev=page > 1
    )

4.3 Field Selection (Sparse Fieldsets)

# GET /api/contacts?fields=id,email,name

@router.get("/contacts")
async def list_contacts(
    fields: str = Query(None),
    db: AsyncSession = Depends(get_db)
):
    """List contacts with field selection"""

    if fields:
        # Parse fields
        selected_fields = [getattr(Contact, f) for f in fields.split(',')]
        query = select(*selected_fields)
    else:
        query = select(Contact)

    results = await db.execute(query)
    return results.all()

4.4 GraphQL DataLoader (Batching)

# cifra/graphql/dataloaders.py
from aiodataloader import DataLoader

class ContactLoader(DataLoader):
    """Batch load contacts"""

    def __init__(self, db: AsyncSession):
        super().__init__()
        self.db = db

    async def batch_load_fn(self, contact_ids):
        """Load multiple contacts in one query"""
        results = await self.db.execute(
            select(Contact).where(Contact.id.in_(contact_ids))
        )
        contacts = results.scalars().all()

        # Map by ID
        contact_map = {c.id: c for c in contacts}

        # Return in same order as requested
        return [contact_map.get(id) for id in contact_ids]

# Usage in GraphQL resolver
@strawberry.type
class Deal:
    contact_id: UUID

    @strawberry.field
    async def contact(self, info) -> Contact:
        """Get contact (batched)"""
        loader = info.context['contact_loader']
        return await loader.load(self.contact_id)

# Instead of N queries, DataLoader will batch:
# SELECT * FROM contacts WHERE id IN (1, 2, 3, ..., N)

5. BACKGROUND JOBS

5.1 Offload Heavy Tasks

# cifra/api/reports.py
from cifra.worker import generate_report_task

@router.post("/reports/generate")
async def generate_report(
    filters: ReportFilters,
    current_user: User = Depends(get_current_user)
):
    """Generate report (async)"""

    # Schedule background task
    task = generate_report_task.delay(
        user_id=current_user.id,
        filters=filters.dict()
    )

    return {
        "task_id": task.id,
        "status": "pending",
        "message": "Report generation started"
    }

@router.get("/reports/{task_id}/status")
async def get_report_status(task_id: str):
    """Check report status"""
    from celery.result import AsyncResult

    task = AsyncResult(task_id)

    return {
        "task_id": task_id,
        "status": task.state,
        "result": task.result if task.ready() else None
    }

5.2 Celery Configuration

# cifra/worker/celery_app.py
from celery import Celery

app = Celery('cifra')

# Optimization settings
app.conf.update(
    # Prefetch multiplier (how many tasks to prefetch)
    worker_prefetch_multiplier=4,

    # Concurrency
    worker_concurrency=4,  # Number of worker processes

    # Task settings
    task_acks_late=True,  # Ack after task done (not before)
    task_reject_on_worker_lost=True,

    # Time limits
    task_soft_time_limit=300,  # 5 minutes
    task_time_limit=600,       # 10 minutes (hard limit)

    # Result backend
    result_backend='redis://localhost:6379/1',
    result_expires=3600,  # Keep results for 1 hour

    # Broker settings
    broker_url='redis://localhost:6379/0',
    broker_connection_retry_on_startup=True,

    # Serialization
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
)

6. FRONTEND OPTIMIZATION

6.1 Code Splitting

// Instead of importing everything
import { ComponentA, ComponentB, ComponentC } from './components';

// Lazy load components
const ComponentA = lazy(() => import('./components/ComponentA'));
const ComponentB = lazy(() => import('./components/ComponentB'));
const ComponentC = lazy(() => import('./components/ComponentC'));

6.2 Image Optimization

<!-- Use WebP format with fallback -->
<picture>
  <source srcset="image.webp" type="image/webp">
  <source srcset="image.jpg" type="image/jpeg">
  <img src="image.jpg" alt="Description" loading="lazy">
</picture>

<!-- Or use Next.js Image component -->
<Image
  src="/image.jpg"
  width={800}
  height={600}
  quality={85}
  loading="lazy"
/>

6.3 Bundle Size

# Analyze bundle size
npx webpack-bundle-analyzer

# Target bundle sizes:
# - Initial bundle: < 200KB (gzipped)
# - Per-route chunk: < 100KB (gzipped)
# - Vendor bundle: < 150KB (gzipped)

6.4 API Request Batching

// Bad: Multiple requests
const contact = await fetch('/api/contacts/1');
const deals = await fetch('/api/deals?contact_id=1');
const tasks = await fetch('/api/tasks?contact_id=1');

// Good: Batch request
const data = await fetch('/api/batch', {
  method: 'POST',
  body: JSON.stringify([
    { url: '/api/contacts/1' },
    { url: '/api/deals?contact_id=1' },
    { url: '/api/tasks?contact_id=1' }
  ])
});

7. MONITORING

7.1 APM (Application Performance Monitoring)

# cifra/monitoring.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

# Setup tracing
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# Instrument FastAPI
FastAPIInstrumentor.instrument_app(app)

# Instrument SQLAlchemy
SQLAlchemyInstrumentor().instrument(engine=engine)

7.2 Slow Query Logging

# cifra/middleware.py
import time
from fastapi import Request
from loguru import logger

@app.middleware("http")
async def log_slow_requests(request: Request, call_next):
    """Log slow requests"""
    start_time = time.time()

    response = await call_next(request)

    duration = time.time() - start_time

    if duration > 1.0:  # Slower than 1 second
        logger.warning(
            f"Slow request: {request.method} {request.url.path} "
            f"took {duration:.2f}s"
        )

    return response

7.3 Database Query Monitoring

-- Enable pg_stat_statements extension
CREATE EXTENSION pg_stat_statements;

-- Find slow queries
SELECT
    query,
    calls,
    total_exec_time,
    mean_exec_time,
    max_exec_time
FROM pg_stat_statements
WHERE mean_exec_time > 100  -- > 100ms
ORDER BY mean_exec_time DESC
LIMIT 20;

-- Reset stats
SELECT pg_stat_statements_reset();

8. LOAD TESTING

8.1 Locust Scenarios

# tests/performance/scenarios.py
from locust import HttpUser, task, between, TaskSet

class ContactScenario(TaskSet):
    """Contact CRUD scenario"""

    @task(10)
    def list_contacts(self):
        self.client.get("/api/contacts")

    @task(5)
    def view_contact(self):
        self.client.get("/api/contacts/1")

    @task(2)
    def create_contact(self):
        self.client.post("/api/contacts", json={
            "first_name": "Load",
            "last_name": "Test",
            "email": f"load{self.user.user_id}@test.com"
        })

    @task(1)
    def update_contact(self):
        self.client.put("/api/contacts/1", json={
            "first_name": "Updated"
        })

class APIUser(HttpUser):
    tasks = [ContactScenario]
    wait_time = between(1, 3)
    host = "http://localhost:8000"

    def on_start(self):
        """Login before tasks"""
        response = self.client.post("/api/auth/login", json={
            "email": "test@example.com",
            "password": "password123"
        })
        self.token = response.json()["access_token"]
        self.client.headers = {"Authorization": f"Bearer {self.token}"}

Запуск:

# Test with 100 concurrent users
locust -f tests/performance/scenarios.py \
    --users 100 \
    --spawn-rate 10 \
    --run-time 10m \
    --headless

8.2 Performance Metrics

Target SLA:
  Availability: 99.9%
  Error rate: < 0.1%
  Response time p95: < 200ms
  Response time p99: < 500ms

Current Performance:
  Availability: 99.95% ✅
  Error rate: 0.05% ✅
  Response time p95: 150ms ✅
  Response time p99: 350ms ✅

9. PRODUCTION CHECKLIST

9.1 Database

9.2 Caching

9.3 API

9.4 Background Jobs

9.5 Monitoring


10. TROUBLESHOOTING

10.1 High Database Load

# 1. Найти медленные запросы
SELECT query, mean_exec_time
FROM pg_stat_statements
WHERE mean_exec_time > 100
ORDER BY mean_exec_time DESC
LIMIT 10;

# 2. Найти недостающие индексы
SELECT
    schemaname,
    tablename,
    attname,
    null_frac,
    avg_width,
    n_distinct,
    correlation
FROM pg_stats
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY null_frac DESC;

# 3. Проверить размер таблиц
SELECT
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;

10.2 High Memory Usage

# Check Redis memory
redis-cli INFO memory

# Check top memory consumers
redis-cli --bigkeys

# Clear cache if needed
redis-cli FLUSHDB

10.3 Slow API Responses

# Enable request timing
import time

@app.middleware("http")
async def add_timing_header(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = time.time() - start
    response.headers["X-Response-Time"] = f"{duration:.3f}s"
    return response

11. ADVANCED OPTIMIZATIONS

11.1 Read Replicas

# cifra/db.py

# Primary (write)
primary_engine = create_async_engine(PRIMARY_DATABASE_URL)

# Replica (read)
replica_engine = create_async_engine(REPLICA_DATABASE_URL)

async def get_db(read_only: bool = False):
    """Get DB session (primary or replica)"""
    engine = replica_engine if read_only else primary_engine
    async with AsyncSession(engine) as session:
        yield session

# Usage
@router.get("/contacts")
async def list_contacts(
    db: AsyncSession = Depends(lambda: get_db(read_only=True))
):
    """Read from replica"""
    return await db.query(Contact).all()

11.2 Query Result Caching (PostgreSQL)

-- Create materialized view for expensive queries
CREATE MATERIALIZED VIEW contact_stats AS
SELECT
    company_id,
    COUNT(*) as contact_count,
    AVG(deal_total) as avg_deal_size
FROM contacts
LEFT JOIN deals ON deals.contact_id = contacts.id
GROUP BY company_id;

-- Refresh periodically (cron job)
REFRESH MATERIALIZED VIEW CONCURRENTLY contact_stats;

-- Query is now instant!
SELECT * FROM contact_stats WHERE company_id = 123;

11.3 CDN for Static Assets

# nginx.conf
location /static {
    alias /var/www/static;
    expires 1y;
    add_header Cache-Control "public, immutable";
}

location /media {
    # Proxy to S3/CloudFront
    proxy_pass https://cdn.cifra.io;
    proxy_cache media_cache;
    proxy_cache_valid 200 1d;
}

Версия: 1.0
Дата: 2025-11-10
Статус: Production Ready ✅