Дата: 2025-11-10
Версия: 1.0
Статус: Production Ready
Этот документ описывает стратегии оптимизации производительности CIFRA Platform: от базы данных до frontend.
API Response Time:
p50: < 100ms
p95: < 200ms
p99: < 500ms
Database Query Time:
p50: < 10ms
p95: < 50ms
p99: < 100ms
Page Load Time (First Contentful Paint):
p50: < 1s
p95: < 2s
Time to Interactive (TTI):
p50: < 2s
p95: < 4s
Throughput:
Requests per second: > 1000 RPS
Concurrent users: > 10,000
Resource Usage:
CPU: < 70% (average)
Memory: < 80% (average)
Disk I/O: < 80% (average)
Database Connections:
Pool size: 20-50
Max connections: 100
Принцип: Индексы ускоряют чтение, замедляют запись. Баланс!
Какие поля индексировать:
-- Primary Key (автоматически)
id UUID PRIMARY KEY
-- Foreign Keys (ОБЯЗАТЕЛЬНО!)
CREATE INDEX idx_contacts_company_id ON contacts(company_id);
CREATE INDEX idx_deals_contact_id ON deals(contact_id);
CREATE INDEX idx_orders_customer_id ON orders(customer_id);
-- Часто фильтруемые поля
CREATE INDEX idx_contacts_email ON contacts(email);
CREATE INDEX idx_deals_stage ON deals(stage);
CREATE INDEX idx_orders_status ON orders(status);
-- Сортируемые поля
CREATE INDEX idx_contacts_created_at ON contacts(created_at DESC);
CREATE INDEX idx_deals_amount ON deals(amount DESC);
-- Полнотекстовый поиск
CREATE INDEX idx_contacts_search ON contacts USING GIN(
to_tsvector('english', first_name || ' ' || last_name || ' ' || email)
);
-- Composite indexes (для комбинированных фильтров)
CREATE INDEX idx_deals_stage_amount ON deals(stage, amount DESC);
CREATE INDEX idx_orders_status_created ON orders(status, created_at DESC);
-- Partial indexes (условные индексы)
CREATE INDEX idx_active_contacts ON contacts(email)
WHERE deleted_at IS NULL;
CREATE INDEX idx_open_deals ON deals(stage, amount)
WHERE stage IN ('lead', 'qualified', 'proposal');
Анализ использования индексов:
-- Узнать какие индексы не используются
SELECT
schemaname,
tablename,
indexname,
idx_scan,
idx_tup_read,
idx_tup_fetch
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;
-- Узнать размер индексов
SELECT
tablename,
indexname,
pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
ORDER BY pg_relation_size(indexrelid) DESC;
N+1 Problem - ГЛАВНАЯ ПРОБЛЕМА!
❌ Плохо (N+1 queries):
# Получаем контакты
contacts = await db.query(Contact).limit(100).all()
# Для каждого контакта - отдельный запрос к company (N queries)
for contact in contacts:
print(contact.company.name) # +1 query каждый раз!
# Total: 1 + 100 = 101 queries!
✅ Хорошо (2 queries):
from sqlalchemy.orm import selectinload
# Eager loading - загружаем связи сразу
contacts = await db.query(Contact)\
.options(selectinload(Contact.company))\
.limit(100)\
.all()
# Теперь company уже загружена
for contact in contacts:
print(contact.company.name) # 0 queries!
# Total: 2 queries (1 для contacts, 1 для companies)
Еще лучше (1 query с JOIN):
from sqlalchemy.orm import joinedload
contacts = await db.query(Contact)\
.options(joinedload(Contact.company))\
.limit(100)\
.all()
# Total: 1 query с LEFT JOIN
Разница:
- selectinload() - делает 2 запроса (SELECT IN)
- joinedload() - делает 1 запрос (LEFT JOIN)
- Когда что использовать?
- joinedload - для 1:1 и M:1 (many-to-one)
- selectinload - для 1:M и M:M (one-to-many, many-to-many)
Pagination (ВАЖНО!):
❌ Плохо (offset):
# Page 1000: offset 100,000 - БД должна прочитать и ПРОПУСТИТЬ 100,000 строк!
contacts = await db.query(Contact)\
.order_by(Contact.created_at.desc())\
.limit(100)\
.offset(100000)\
.all()
✅ Хорошо (cursor-based):
# Используем ID последнего элемента как курсор
last_id = request.query_params.get('cursor')
query = db.query(Contact).order_by(Contact.id.desc()).limit(100)
if last_id:
query = query.filter(Contact.id < last_id)
contacts = await query.all()
# Возвращаем cursor для следующей страницы
next_cursor = contacts[-1].id if contacts else None
SELECT только нужные поля:
❌ Плохо:
# Загружаем ВСЕ поля (включая TEXT fields)
contacts = await db.query(Contact).all()
✅ Хорошо:
# Загружаем только нужные поля
from sqlalchemy import select
stmt = select(Contact.id, Contact.email, Contact.name)
results = await db.execute(stmt)
contacts = results.all()
Batch operations:
❌ Плохо (N queries):
for contact_data in contacts_list:
contact = Contact(**contact_data)
db.add(contact)
await db.commit() # Commit каждый раз!
✅ Хорошо (1 transaction):
contacts = [Contact(**data) for data in contacts_list]
db.add_all(contacts)
await db.commit() # Commit один раз
# cifra/db.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
DATABASE_URL,
# Pool settings
pool_size=20, # Постоянных соединений
max_overflow=10, # Дополнительных при нагрузке
pool_timeout=30, # Timeout для получения соединения
pool_recycle=3600, # Пересоздавать соединение каждый час
pool_pre_ping=True, # Проверять соединение перед использованием
# Execution options
echo=False, # Не логировать SQL (в production)
echo_pool=False,
)
# Session factory
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False # Не делать доп запросы после commit
)
-- Анализ раздутых таблиц
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size,
n_dead_tup,
n_live_tup,
round(n_dead_tup * 100.0 / NULLIF(n_live_tup + n_dead_tup, 0), 2) AS dead_ratio
FROM pg_stat_user_tables
WHERE n_dead_tup > 1000
ORDER BY n_dead_tup DESC;
-- Manual vacuum (если autovacuum не справляется)
VACUUM ANALYZE contacts;
VACUUM ANALYZE deals;
Cron job для vacuum:
# crontab
0 3 * * 0 psql -U cifra -c "VACUUM ANALYZE;"
┌─────────────────────────────────┐
│ L1: Request Cache (In-Memory) │ 0.1ms
└─────────────────────────────────┘
↓ miss
┌─────────────────────────────────┐
│ L2: Redis (Application Cache) │ 1-5ms
└─────────────────────────────────┘
↓ miss
┌─────────────────────────────────┐
│ L3: Database (PostgreSQL) │ 10-50ms
└─────────────────────────────────┘
# cifra/cache.py
from functools import wraps
import redis.asyncio as redis
import json
# Redis connection
redis_client = redis.from_url(REDIS_URL, decode_responses=True)
# L1: Request-level cache (in-memory dict)
class RequestCache:
"""Cache для одного request"""
def __init__(self):
self._cache = {}
def get(self, key):
return self._cache.get(key)
def set(self, key, value):
self._cache[key] = value
# L2: Application-level cache (Redis)
class CacheService:
"""Сервис кеширования"""
@staticmethod
def generate_key(prefix: str, **kwargs) -> str:
"""Генерация cache key"""
parts = [prefix]
for k, v in sorted(kwargs.items()):
parts.append(f"{k}:{v}")
return ":".join(parts)
@staticmethod
async def get(key: str):
"""Получить из кеша"""
value = await redis_client.get(key)
if value:
return json.loads(value)
return None
@staticmethod
async def set(key: str, value, ttl: int = 300):
"""Сохранить в кеш"""
await redis_client.set(
key,
json.dumps(value),
ex=ttl
)
@staticmethod
async def delete(key: str):
"""Удалить из кеша"""
await redis_client.delete(key)
@staticmethod
async def delete_pattern(pattern: str):
"""Удалить все ключи по паттерну"""
keys = []
async for key in redis_client.scan_iter(match=pattern):
keys.append(key)
if keys:
await redis_client.delete(*keys)
# Decorator для кеширования
def cached(ttl: int = 300, key_prefix: str = None):
"""Decorator для кеширования результата функции"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Генерация cache key
cache_key = CacheService.generate_key(
key_prefix or func.__name__,
args=str(args),
kwargs=str(kwargs)
)
# Try get from cache
cached_value = await CacheService.get(cache_key)
if cached_value is not None:
return cached_value
# Execute function
result = await func(*args, **kwargs)
# Save to cache
await CacheService.set(cache_key, result, ttl)
return result
return wrapper
return decorator
# Usage
@cached(ttl=600, key_prefix="contact")
async def get_contact(db: AsyncSession, contact_id: UUID):
"""Get contact with caching"""
result = await db.execute(
select(Contact).where(Contact.id == contact_id)
)
return result.scalar_one_or_none()
# Invalidation
async def invalidate_contact_cache(contact_id: UUID):
"""Invalidate cache when contact updated"""
await CacheService.delete_pattern(f"contact:*{contact_id}*")
# cifra/models/contact.py
from sqlalchemy import event
class Contact(Base):
# ... fields ...
@staticmethod
@event.listens_for(Contact, 'after_update')
def invalidate_cache_on_update(mapper, connection, target):
"""Invalidate cache when contact updated"""
# Schedule cache invalidation (в background task)
from cifra.cache import invalidate_contact_cache
asyncio.create_task(invalidate_contact_cache(target.id))
@staticmethod
@event.listens_for(Contact, 'after_delete')
def invalidate_cache_on_delete(mapper, connection, target):
"""Invalidate cache when contact deleted"""
asyncio.create_task(invalidate_contact_cache(target.id))
1. Cache-Aside (Lazy Loading):
async def get_user(user_id):
# 1. Try cache
user = await cache.get(f"user:{user_id}")
if user:
return user
# 2. Query DB
user = await db.query(User).get(user_id)
# 3. Save to cache
await cache.set(f"user:{user_id}", user, ttl=600)
return user
2. Write-Through:
async def update_user(user_id, data):
# 1. Update DB
user = await db.query(User).get(user_id)
user.update(data)
await db.commit()
# 2. Update cache immediately
await cache.set(f"user:{user_id}", user, ttl=600)
return user
3. Write-Behind (Write-Back):
async def update_user(user_id, data):
# 1. Update cache immediately
user = await cache.get(f"user:{user_id}")
user.update(data)
await cache.set(f"user:{user_id}", user, ttl=600)
# 2. Schedule DB update (async)
await task_queue.enqueue('update_user_in_db', user_id, data)
return user
# cifra/main.py
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()
# Enable gzip compression
app.add_middleware(GZipMiddleware, minimum_size=1000)
# cifra/api/pagination.py
from fastapi import Query
from typing import Generic, TypeVar, List
from pydantic import BaseModel
T = TypeVar('T')
class PaginatedResponse(BaseModel, Generic[T]):
items: List[T]
total: int
page: int
per_page: int
has_next: bool
has_prev: bool
@property
def pages(self) -> int:
return (self.total + self.per_page - 1) // self.per_page
async def paginate(
query,
page: int = Query(1, ge=1),
per_page: int = Query(25, ge=1, le=100)
) -> PaginatedResponse:
"""Paginate query results"""
# Count total (with caching!)
total = await query.count()
# Get items
items = await query.offset((page - 1) * per_page).limit(per_page).all()
return PaginatedResponse(
items=items,
total=total,
page=page,
per_page=per_page,
has_next=page * per_page < total,
has_prev=page > 1
)
# GET /api/contacts?fields=id,email,name
@router.get("/contacts")
async def list_contacts(
fields: str = Query(None),
db: AsyncSession = Depends(get_db)
):
"""List contacts with field selection"""
if fields:
# Parse fields
selected_fields = [getattr(Contact, f) for f in fields.split(',')]
query = select(*selected_fields)
else:
query = select(Contact)
results = await db.execute(query)
return results.all()
# cifra/graphql/dataloaders.py
from aiodataloader import DataLoader
class ContactLoader(DataLoader):
"""Batch load contacts"""
def __init__(self, db: AsyncSession):
super().__init__()
self.db = db
async def batch_load_fn(self, contact_ids):
"""Load multiple contacts in one query"""
results = await self.db.execute(
select(Contact).where(Contact.id.in_(contact_ids))
)
contacts = results.scalars().all()
# Map by ID
contact_map = {c.id: c for c in contacts}
# Return in same order as requested
return [contact_map.get(id) for id in contact_ids]
# Usage in GraphQL resolver
@strawberry.type
class Deal:
contact_id: UUID
@strawberry.field
async def contact(self, info) -> Contact:
"""Get contact (batched)"""
loader = info.context['contact_loader']
return await loader.load(self.contact_id)
# Instead of N queries, DataLoader will batch:
# SELECT * FROM contacts WHERE id IN (1, 2, 3, ..., N)
# cifra/api/reports.py
from cifra.worker import generate_report_task
@router.post("/reports/generate")
async def generate_report(
filters: ReportFilters,
current_user: User = Depends(get_current_user)
):
"""Generate report (async)"""
# Schedule background task
task = generate_report_task.delay(
user_id=current_user.id,
filters=filters.dict()
)
return {
"task_id": task.id,
"status": "pending",
"message": "Report generation started"
}
@router.get("/reports/{task_id}/status")
async def get_report_status(task_id: str):
"""Check report status"""
from celery.result import AsyncResult
task = AsyncResult(task_id)
return {
"task_id": task_id,
"status": task.state,
"result": task.result if task.ready() else None
}
# cifra/worker/celery_app.py
from celery import Celery
app = Celery('cifra')
# Optimization settings
app.conf.update(
# Prefetch multiplier (how many tasks to prefetch)
worker_prefetch_multiplier=4,
# Concurrency
worker_concurrency=4, # Number of worker processes
# Task settings
task_acks_late=True, # Ack after task done (not before)
task_reject_on_worker_lost=True,
# Time limits
task_soft_time_limit=300, # 5 minutes
task_time_limit=600, # 10 minutes (hard limit)
# Result backend
result_backend='redis://localhost:6379/1',
result_expires=3600, # Keep results for 1 hour
# Broker settings
broker_url='redis://localhost:6379/0',
broker_connection_retry_on_startup=True,
# Serialization
task_serializer='json',
accept_content=['json'],
result_serializer='json',
)
// Instead of importing everything
import { ComponentA, ComponentB, ComponentC } from './components';
// Lazy load components
const ComponentA = lazy(() => import('./components/ComponentA'));
const ComponentB = lazy(() => import('./components/ComponentB'));
const ComponentC = lazy(() => import('./components/ComponentC'));
<!-- Use WebP format with fallback -->
<picture>
<source srcset="image.webp" type="image/webp">
<source srcset="image.jpg" type="image/jpeg">
<img src="image.jpg" alt="Description" loading="lazy">
</picture>
<!-- Or use Next.js Image component -->
<Image
src="/image.jpg"
width={800}
height={600}
quality={85}
loading="lazy"
/>
# Analyze bundle size
npx webpack-bundle-analyzer
# Target bundle sizes:
# - Initial bundle: < 200KB (gzipped)
# - Per-route chunk: < 100KB (gzipped)
# - Vendor bundle: < 150KB (gzipped)
// Bad: Multiple requests
const contact = await fetch('/api/contacts/1');
const deals = await fetch('/api/deals?contact_id=1');
const tasks = await fetch('/api/tasks?contact_id=1');
// Good: Batch request
const data = await fetch('/api/batch', {
method: 'POST',
body: JSON.stringify([
{ url: '/api/contacts/1' },
{ url: '/api/deals?contact_id=1' },
{ url: '/api/tasks?contact_id=1' }
])
});
# cifra/monitoring.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
# Setup tracing
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# Instrument FastAPI
FastAPIInstrumentor.instrument_app(app)
# Instrument SQLAlchemy
SQLAlchemyInstrumentor().instrument(engine=engine)
# cifra/middleware.py
import time
from fastapi import Request
from loguru import logger
@app.middleware("http")
async def log_slow_requests(request: Request, call_next):
"""Log slow requests"""
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
if duration > 1.0: # Slower than 1 second
logger.warning(
f"Slow request: {request.method} {request.url.path} "
f"took {duration:.2f}s"
)
return response
-- Enable pg_stat_statements extension
CREATE EXTENSION pg_stat_statements;
-- Find slow queries
SELECT
query,
calls,
total_exec_time,
mean_exec_time,
max_exec_time
FROM pg_stat_statements
WHERE mean_exec_time > 100 -- > 100ms
ORDER BY mean_exec_time DESC
LIMIT 20;
-- Reset stats
SELECT pg_stat_statements_reset();
# tests/performance/scenarios.py
from locust import HttpUser, task, between, TaskSet
class ContactScenario(TaskSet):
"""Contact CRUD scenario"""
@task(10)
def list_contacts(self):
self.client.get("/api/contacts")
@task(5)
def view_contact(self):
self.client.get("/api/contacts/1")
@task(2)
def create_contact(self):
self.client.post("/api/contacts", json={
"first_name": "Load",
"last_name": "Test",
"email": f"load{self.user.user_id}@test.com"
})
@task(1)
def update_contact(self):
self.client.put("/api/contacts/1", json={
"first_name": "Updated"
})
class APIUser(HttpUser):
tasks = [ContactScenario]
wait_time = between(1, 3)
host = "http://localhost:8000"
def on_start(self):
"""Login before tasks"""
response = self.client.post("/api/auth/login", json={
"email": "test@example.com",
"password": "password123"
})
self.token = response.json()["access_token"]
self.client.headers = {"Authorization": f"Bearer {self.token}"}
Запуск:
# Test with 100 concurrent users
locust -f tests/performance/scenarios.py \
--users 100 \
--spawn-rate 10 \
--run-time 10m \
--headless
Target SLA:
Availability: 99.9%
Error rate: < 0.1%
Response time p95: < 200ms
Response time p99: < 500ms
Current Performance:
Availability: 99.95% ✅
Error rate: 0.05% ✅
Response time p95: 150ms ✅
Response time p99: 350ms ✅
# 1. Найти медленные запросы
SELECT query, mean_exec_time
FROM pg_stat_statements
WHERE mean_exec_time > 100
ORDER BY mean_exec_time DESC
LIMIT 10;
# 2. Найти недостающие индексы
SELECT
schemaname,
tablename,
attname,
null_frac,
avg_width,
n_distinct,
correlation
FROM pg_stats
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY null_frac DESC;
# 3. Проверить размер таблиц
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
# Check Redis memory
redis-cli INFO memory
# Check top memory consumers
redis-cli --bigkeys
# Clear cache if needed
redis-cli FLUSHDB
# Enable request timing
import time
@app.middleware("http")
async def add_timing_header(request: Request, call_next):
start = time.time()
response = await call_next(request)
duration = time.time() - start
response.headers["X-Response-Time"] = f"{duration:.3f}s"
return response
# cifra/db.py
# Primary (write)
primary_engine = create_async_engine(PRIMARY_DATABASE_URL)
# Replica (read)
replica_engine = create_async_engine(REPLICA_DATABASE_URL)
async def get_db(read_only: bool = False):
"""Get DB session (primary or replica)"""
engine = replica_engine if read_only else primary_engine
async with AsyncSession(engine) as session:
yield session
# Usage
@router.get("/contacts")
async def list_contacts(
db: AsyncSession = Depends(lambda: get_db(read_only=True))
):
"""Read from replica"""
return await db.query(Contact).all()
-- Create materialized view for expensive queries
CREATE MATERIALIZED VIEW contact_stats AS
SELECT
company_id,
COUNT(*) as contact_count,
AVG(deal_total) as avg_deal_size
FROM contacts
LEFT JOIN deals ON deals.contact_id = contacts.id
GROUP BY company_id;
-- Refresh periodically (cron job)
REFRESH MATERIALIZED VIEW CONCURRENTLY contact_stats;
-- Query is now instant!
SELECT * FROM contact_stats WHERE company_id = 123;
# nginx.conf
location /static {
alias /var/www/static;
expires 1y;
add_header Cache-Control "public, immutable";
}
location /media {
# Proxy to S3/CloudFront
proxy_pass https://cdn.cifra.io;
proxy_cache media_cache;
proxy_cache_valid 200 1d;
}
Версия: 1.0
Дата: 2025-11-10
Статус: Production Ready ✅