Версия: 1.0.0
Дата: 2025-11-10
Client → [Upload API] → [Validation] → [Storage] → [Processing Queue]
                             ↓             ↓               ↓
                        Virus Scan      S3/Local      Image Resize
                        Size Check                    Video Transcode
                        Type Check                    PDF Preview
from fastapi import UploadFile
from cifra.storage import storage
@app.post("/api/files/upload")
async def upload_file(
    file: UploadFile,
    entity_type: str,
    entity_id: UUID
):
    """Direct upload for small files.

    Checks the file size, stores the file under
    ``{entity_type}/{entity_id}/{filename}`` and creates the matching
    ``FileAttachment`` record.

    Args:
        file: The uploaded file.
        entity_type: Type name of the entity the file is attached to.
        entity_id: Identifier of the entity the file is attached to.

    Returns:
        A dict with the new ``file_id`` and its ``url``.

    Raises:
        ValueError: If the size is unknown or above 10,000,000 bytes, or
            if the filename is empty once directory parts are removed.
    """
    import os

    # Validate. ``file.size`` can be None; comparing None with an int
    # would raise TypeError, so reject an unknown size explicitly.
    if file.size is None:
        raise ValueError("File size unknown")
    if file.size > 10_000_000:
        raise ValueError("File too large")

    # The filename is client-controlled. Keep only its last path
    # component so values such as "../../x" cannot escape the
    # entity's directory in the storage path.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise ValueError("Invalid filename")

    # Upload
    file_id = await storage.save(
        file=file,
        path=f"{entity_type}/{entity_id}/{safe_name}"
    )

    # Create record (the returned record object is not needed here)
    await FileAttachment.create({
        'id': file_id,
        'filename': file.filename,
        'size': file.size,
        'mime_type': file.content_type,
        'entity_type': entity_type,
        'entity_id': entity_id
    })
    return {'file_id': file_id, 'url': storage.url(file_id)}
from cifra.upload import ChunkedUploadManager
# Module-level manager instance shared by the chunked-upload endpoints.
upload_manager = ChunkedUploadManager()
# Step 1: Initialize upload
@app.post("/api/files/upload/init")
async def init_upload(
    filename: str,
    size: int,
    mime_type: str,
    chunk_size: int = 5_000_000  # 5MB chunks
):
    """Initialize chunked upload.

    Args:
        filename: Name of the file being uploaded.
        size: Total file size in bytes.
        mime_type: MIME type declared by the client.
        chunk_size: Size of each chunk in bytes.

    Returns:
        A dict with ``upload_id``, ``total_chunks`` and ``chunk_size``.

    Raises:
        ValueError: If ``size`` is negative or ``chunk_size`` is not
            positive.
    """
    # Validate before creating the upload session: a zero chunk size
    # would otherwise fail with ZeroDivisionError only after
    # ``upload_manager.init`` had already run.
    if chunk_size <= 0:
        raise ValueError("Invalid chunk size")
    if size < 0:
        raise ValueError("Invalid file size")

    upload_id = await upload_manager.init(
        filename=filename,
        total_size=size,
        mime_type=mime_type,
        chunk_size=chunk_size
    )
    # Ceiling division: a trailing partial chunk counts as one chunk.
    total_chunks = (size + chunk_size - 1) // chunk_size
    return {
        'upload_id': upload_id,
        'total_chunks': total_chunks,
        'chunk_size': chunk_size
    }
# Step 2: Upload chunks
@app.post("/api/files/upload/chunk")
async def upload_chunk(
    upload_id: UUID,
    chunk_number: int,
    chunk: UploadFile
):
    """Store one chunk of a chunked upload and report progress.

    Args:
        upload_id: Identifier of the upload session.
        chunk_number: Index of the chunk being sent.
        chunk: The chunk payload.

    Returns:
        A dict echoing ``chunk_number`` plus the ``percent``,
        ``uploaded`` and ``total`` values reported by the upload manager.
    """
    payload = await chunk.read()
    await upload_manager.upload_chunk(
        upload_id=upload_id,
        chunk_number=chunk_number,
        data=payload,
    )

    state = await upload_manager.get_progress(upload_id)
    response = {'chunk_number': chunk_number}
    response['progress'] = state['percent']
    response['uploaded_chunks'] = state['uploaded']
    response['total_chunks'] = state['total']
    return response
# Step 3: Finalize upload
@app.post("/api/files/upload/finalize")
async def finalize_upload(upload_id: UUID):
    """Complete a chunked upload and queue the file for post-processing.

    Args:
        upload_id: Identifier of the upload session to finalize.

    Returns:
        A dict with the ``file_id`` of the finalized record and its ``url``.
    """
    record = await upload_manager.finalize(upload_id)

    # Hand the new file over to the background 'process_file' task.
    await task_queue.enqueue('process_file', record.id)

    result = {'file_id': record.id}
    result['url'] = storage.url(record.id)
    return result
Client-side example (JavaScript):
// Chunked upload client
/**
 * Upload a file in chunks: initialize the session, send the chunks
 * (at most three at a time), then finalize.
 *
 * @param {File} file - File to upload.
 * @returns {Promise<Object>} Parsed JSON response of the finalize call.
 * @throws {Error} If any request answers with a non-2xx status.
 */
async function uploadLargeFile(file) {
  const chunkSize = 5 * 1024 * 1024; // 5MB
  const maxConcurrent = 3;

  // fetch() resolves on HTTP error statuses too, so failures must be
  // detected explicitly or a lost chunk would go unnoticed.
  const request = async (url, options) => {
    const response = await fetch(url, options);
    if (!response.ok) {
      throw new Error(`Upload request failed (${response.status}): ${url}`);
    }
    return response;
  };

  // 1. Initialize
  const {upload_id, total_chunks} = await request('/api/files/upload/init', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({
      filename: file.name,
      size: file.size,
      mime_type: file.type,
      chunk_size: chunkSize
    })
  }).then(r => r.json());

  // 2. Upload chunks in parallel (max 3 concurrent)
  const uploadChunk = async (chunkNumber) => {
    const start = chunkNumber * chunkSize;
    const end = Math.min(start + chunkSize, file.size);
    const chunk = file.slice(start, end);
    const formData = new FormData();
    formData.append('chunk', chunk);
    await request(
      `/api/files/upload/chunk?upload_id=${encodeURIComponent(upload_id)}&chunk_number=${chunkNumber}`,
      {method: 'POST', body: formData}
    );
  };

  // Upload with concurrency limit. Only in-range chunk numbers are
  // started: building [uploadChunk(i), uploadChunk(i + 1), ...] and
  // filtering afterwards would already have fired the out-of-range calls.
  for (let first = 0; first < total_chunks; first += maxConcurrent) {
    const last = Math.min(first + maxConcurrent, total_chunks);
    const batch = [];
    for (let n = first; n < last; n++) {
      batch.push(uploadChunk(n));
    }
    await Promise.all(batch);
  }

  // 3. Finalize
  const result = await request(
    `/api/files/upload/finalize?upload_id=${encodeURIComponent(upload_id)}`,
    {method: 'POST'}
  ).then(r => r.json());
  return result;
}
@app.post("/api/files/upload/resume")
async def resume_upload(upload_id: UUID):
    """Get resumable upload status.

    Args:
        upload_id: Identifier of the upload session.

    Returns:
        ``{'status': 'completed', 'file_id': ...}`` when the upload is
        finished; otherwise a dict with status ``'in_progress'``, the
        uploaded chunk numbers and the chunk numbers still missing.
    """
    progress = await upload_manager.get_progress(upload_id)
    if progress['status'] == 'completed':
        return {'status': 'completed', 'file_id': progress['file_id']}

    uploaded = progress['chunks']
    # Build the lookup once so each membership test is O(1) instead of
    # rescanning the uploaded-chunk collection for every chunk number.
    uploaded_lookup = set(uploaded)

    # Return list of uploaded chunks
    return {
        'status': 'in_progress',
        'uploaded_chunks': uploaded,
        'missing_chunks': [
            i for i in range(progress['total'])
            if i not in uploaded_lookup
        ]
    }
from cifra.storage.backends import LocalStorage
# Example: local-filesystem backend configured with a base directory and
# the base URL prefix for stored files.
# NOTE: illustrative snippet. The `await` calls must run inside a
# coroutine, and `file` is assumed to be an uploaded file object.
storage = LocalStorage(
    base_path='/var/www/uploads',
    base_url='https://example.com/uploads'
)
# Save file
file_id = await storage.save(file, path='contacts/123/photo.jpg')
# Get URL
url = storage.url(file_id)
# → https://example.com/uploads/contacts/123/photo.jpg
# Delete
await storage.delete(file_id)
from cifra.storage.backends import S3Storage
# Example: S3 backend, with credentials read from environment variables.
# NOTE(review): `os` is not imported in this snippet; it needs
# `import os` to run.
storage = S3Storage(
    bucket='my-bucket',
    region='us-east-1',
    access_key=os.getenv('AWS_ACCESS_KEY'),
    secret_key=os.getenv('AWS_SECRET_KEY'),
    public=True  # Public read access
)
# Save file
file_id = await storage.save(file, path='contacts/123/photo.jpg')
# Get presigned URL (for private files); `expires` is presumably in
# seconds. TODO confirm
url = storage.presigned_url(file_id, expires=3600)
# Get public URL
url = storage.url(file_id)
# → https://my-bucket.s3.amazonaws.com/contacts/123/photo.jpg
from cifra.storage.backends import AzureStorage
# Example: Azure backend, with the account key read from the environment.
# NOTE(review): `os` is not imported in this snippet; it needs
# `import os` to run.
storage = AzureStorage(
    account_name='myaccount',
    account_key=os.getenv('AZURE_STORAGE_KEY'),
    container='uploads'
)
from cifra.validation import FileValidator
# Validator accepting JPEG, PNG and PDF files of up to 10,000,000 bytes,
# with virus scanning switched on.
validator = FileValidator(
    allowed_types=['image/jpeg', 'image/png', 'application/pdf'],
    max_size=10_000_000,  # 10MB
    virus_scan=True
)
@app.post("/api/files/upload")
async def upload_file(file: UploadFile):
    """Validate an uploaded file and store it.

    Args:
        file: The uploaded file.

    Returns:
        A dict containing the ``file_id`` returned by the storage backend.

    Raises:
        ValueError: With the validator's error message when validation
            fails.
    """
    outcome = await validator.validate(file)
    if outcome.valid:
        # Validation passed: persist the file.
        stored_id = await storage.save(file)
        return {'file_id': stored_id}
    raise ValueError(outcome.error)
Validation checks:
class FileValidator:
    """Validates uploads by size, declared MIME type, detected type and
    (optionally) a virus scan.

    Reads ``self.max_size``, ``self.allowed_types`` and
    ``self.virus_scan``.
    """

    async def validate(self, file: UploadFile) -> ValidationResult:
        """Validate uploaded file.

        Checks run in order and stop at the first failure.

        Args:
            file: The uploaded file.

        Returns:
            ``ValidationResult(valid=True)`` when every check passes,
            otherwise a result with ``valid=False`` and an ``error``
            message.
        """
        # 1. Check size. ``file.size`` can be None; comparing None with
        # an int would raise TypeError, so report it as a failure.
        if file.size is None:
            return ValidationResult(
                valid=False,
                error="File size unknown"
            )
        if file.size > self.max_size:
            return ValidationResult(
                valid=False,
                error=f"File too large: {file.size} > {self.max_size}"
            )

        # 2. Check MIME type
        if file.content_type not in self.allowed_types:
            return ValidationResult(
                valid=False,
                error=f"Invalid file type: {file.content_type}"
            )

        # 3. Check magic bytes (verify real type)
        magic_type = await self._check_magic_bytes(file)
        # Rewind so later readers see the file from its first byte.
        await file.seek(0)
        if magic_type != file.content_type:
            return ValidationResult(
                valid=False,
                error="File type mismatch"
            )

        # 4. Virus scan
        if self.virus_scan:
            is_clean = await self._virus_scan(file)
            # Rewind again: the caller stores the file after validation.
            await file.seek(0)
            if not is_clean:
                return ValidationResult(
                    valid=False,
                    error="Virus detected"
                )

        return ValidationResult(valid=True)
from cifra.media import ImageProcessor
# Module-level ImageProcessor instance used by the image upload endpoint.
processor = ImageProcessor()
@app.post("/api/files/upload/image")
async def upload_image(file: UploadFile):
    """Upload and process image.

    Stores the original, creates and stores small/medium/large
    thumbnails, then creates an ``Image`` record with the URLs and the
    image dimensions.

    Args:
        file: The uploaded image.

    Returns:
        The created ``Image`` record.
    """
    # Upload original
    original_id = await storage.save(file, path=f"originals/{uuid4()}.jpg")

    # The same upload stream is read several times below; rewind before
    # each reader so none of them starts at the end of the data.
    await file.seek(0)

    # Create thumbnails
    thumbnails = await processor.create_thumbnails(
        file=file,
        sizes={
            'small': (150, 150),
            'medium': (300, 300),
            'large': (800, 800)
        }
    )

    # Save thumbnails
    thumbnail_urls = {}
    for size, image_data in thumbnails.items():
        thumb_id = await storage.save(
            image_data,
            path=f"thumbnails/{size}/{uuid4()}.jpg"
        )
        thumbnail_urls[size] = storage.url(thumb_id)

    await file.seek(0)
    width = await processor.get_width(file)
    await file.seek(0)
    height = await processor.get_height(file)

    # Create record
    image = await Image.create({
        'original_url': storage.url(original_id),
        'thumbnails': thumbnail_urls,
        'width': width,
        'height': height
    })
    return image
Image optimization:
class ImageProcessor:
    """Image processing helpers."""

    async def optimize(self, file: UploadFile) -> bytes:
        """Optimize image (reduce size, strip metadata).

        Flattens transparency onto a white background, scales the image
        down so its longest side is at most 2048 pixels, and re-encodes
        it as JPEG at quality 85.

        Args:
            file: Uploaded image; its underlying ``file.file`` object is
                read.

        Returns:
            The JPEG-encoded image bytes.
        """
        from io import BytesIO

        from PIL import Image

        img = Image.open(file.file)

        # Palette images may carry transparency and cannot be written as
        # JPEG directly; expand them so the alpha handling below applies.
        if img.mode == 'P':
            img = img.convert('RGBA')

        # Convert to RGB (remove alpha)
        if img.mode in ('RGBA', 'LA'):
            background = Image.new('RGB', img.size, (255, 255, 255))
            background.paste(img, mask=img.split()[-1])
            img = background
        elif img.mode not in ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr'):
            # Any other mode (e.g. 'I;16', 'F') is not writable as JPEG.
            img = img.convert('RGB')

        # Resize if too large
        max_dimension = 2048
        if max(img.size) > max_dimension:
            ratio = max_dimension / max(img.size)
            # Clamp to 1 so a very elongated image never gets a 0-pixel side.
            new_size = tuple(max(1, int(dim * ratio)) for dim in img.size)
            img = img.resize(new_size, Image.Resampling.LANCZOS)

        # Save optimized
        output = BytesIO()
        img.save(output, format='JPEG', quality=85, optimize=True)
        return output.getvalue()
from cifra.media import VideoProcessor
# Module-level VideoProcessor instance used by the transcoding job.
processor = VideoProcessor()
@app.post("/api/files/upload/video")
async def upload_video(file: UploadFile):
    """Store an uploaded video and schedule its transcoding.

    Args:
        file: The uploaded video.

    Returns:
        The created ``Video`` record, in status ``'processing'``.
    """
    # Persist the original under a fresh UUID-based name.
    stored_id = await storage.save(file, path=f"videos/{uuid4()}.mp4")

    # Register the video before any processing has happened.
    video = await Video.create({
        'original_url': storage.url(stored_id),
        'status': 'processing'
    })

    # Hand the heavy work to the background 'transcode_video' task.
    job_options = {
        'formats': ['720p', '480p', '360p'],
        'codec': 'h264',
        'generate_thumbnail': True
    }
    await task_queue.enqueue('transcode_video', video.id, job_options)
    return video
# Background job
@task('transcode_video')
async def transcode_video(video_id: UUID, options: dict):
    """Background job: produce transcoded variants of a stored video.

    Downloads the original, transcodes it to each requested format,
    stores the results (plus an optional thumbnail) and marks the video
    record ``'ready'``.

    Args:
        video_id: Identifier of the ``Video`` record.
        options: Job options; reads the ``'formats'``, ``'codec'`` and
            ``'generate_thumbnail'`` keys.
    """
    video = await Video.get(video_id)
    source = await storage.download(video.original_url)

    rendered = await processor.transcode(
        source,
        formats=options['formats'],
        codec=options['codec']
    )

    # Store every rendered variant and remember its public URL.
    urls = {}
    for name in rendered:
        stored_id = await storage.save(
            rendered[name],
            path=f"videos/{name}/{uuid4()}.mp4"
        )
        urls[name] = storage.url(stored_id)

    if options['generate_thumbnail']:
        # Thumbnail requested with time=5.0.
        frame = await processor.generate_thumbnail(source, time=5.0)
        frame_id = await storage.save(frame, path=f"thumbnails/{uuid4()}.jpg")
        urls['thumbnail'] = storage.url(frame_id)

    await video.update({'variants': urls, 'status': 'ready'})
from cifra.media import PDFProcessor
# Module-level PDFProcessor instance used by the PDF upload endpoint.
processor = PDFProcessor()
@app.post("/api/files/upload/pdf")
async def upload_pdf(file: UploadFile):
    """Upload PDF and generate previews.

    Stores the original, extracts metadata, renders preview images for
    pages 1-3 at 150 DPI, stores them, and creates a ``Document`` record.

    Args:
        file: The uploaded PDF.

    Returns:
        The created ``Document`` record.
    """
    # Upload original
    pdf_id = await storage.save(file, path=f"documents/{uuid4()}.pdf")

    # The same upload stream is read again below; rewind before each
    # reader so it does not start at the end of the data.
    await file.seek(0)

    # Extract metadata
    metadata = await processor.extract_metadata(file)

    await file.seek(0)

    # Generate preview images (first 3 pages)
    previews = await processor.generate_previews(
        file,
        pages=[1, 2, 3],
        dpi=150
    )

    # Upload previews (only the image data is needed, not the page key)
    preview_urls = []
    for image_data in previews.values():
        preview_id = await storage.save(
            image_data,
            path=f"previews/{uuid4()}.jpg"
        )
        preview_urls.append(storage.url(preview_id))

    # Create record
    document = await Document.create({
        'file_url': storage.url(pdf_id),
        'filename': file.filename,
        'page_count': metadata['pages'],
        'preview_urls': preview_urls
    })
    return document
# Schema of the FileAttachment entity (table `file_attachments`).
# NOTE(review): the presigned-upload example creates records with a
# `status` key, which is not declared below; confirm whether a `status`
# field is missing here.
entities:
  FileAttachment:
    table_name: file_attachments
    fields:
      id: {type: uuid, primary_key: true}
      filename: {type: string, required: true}
      original_filename: {type: string}
      size: {type: integer}  # Bytes
      mime_type: {type: string}
      storage_backend: {type: string}  # local, s3, azure
      storage_path: {type: string}
      url: {type: string}
      # Image-specific
      width: {type: integer}
      height: {type: integer}
      thumbnails: {type: json}
      # Polymorphic relation
      entity_type: {type: string}  # Contact, Deal, etc.
      entity_id: {type: uuid}
      # Metadata
      uploaded_by: {type: uuid, foreign_key: User.id}
      uploaded_at: {type: datetime, auto_now_add: true}
    indexes:
      # Composite index over the polymorphic relation columns
      - [entity_type, entity_id]
      - [uploaded_by]
# File-typed fields declared on entities: `file` is a single file,
# `file[]` a list capped by `max_count`.
entities:
  Contact:
    fields:
      # Single image of any image/* subtype
      avatar: {type: file, allowed_types: [image/*]}
  Deal:
    fields:
      # Up to 10 files; no `allowed_types` restriction is declared
      attachments: {type: file[], max_count: 10}
  Product:
    fields:
      images: {type: file[], allowed_types: [image/*], max_count: 5}
      brochure: {type: file, allowed_types: [application/pdf]}
Generated code:
# Usage of the generated accessors.
# NOTE: illustrative snippet. The `await` calls must run inside a
# coroutine; `uuid` and `uploaded_file` are placeholders that are not
# defined here.
# Get attachments
contact = await Contact.get(uuid)
avatar_url = contact.avatar.url
avatar_thumbnail = contact.avatar.thumbnails['small']
# Upload new attachment
await contact.update_avatar(uploaded_file)
# List attachments
deal = await Deal.get(uuid)
for attachment in deal.attachments:
    print(f"{attachment.filename} - {attachment.size} bytes")
@app.post("/api/files/upload/presigned")
async def get_presigned_url(
    filename: str,
    mime_type: str,
    size: int
):
    """Get presigned URL for direct S3 upload.

    Generates a presigned POST for ``uploads/{file_id}/{filename}`` that
    expires after 3600 and creates a pending ``FileAttachment`` record.

    Args:
        filename: Name of the file the client will upload.
        mime_type: Content type the upload must start with.
        size: Declared file size in bytes; also the upper bound of the
            allowed content length.

    Returns:
        The presigned POST data. When that data is a dict, the new
        record's ``file_id`` is added to it so the client can refer to
        the pending record.

    Raises:
        ValueError: If ``size`` is negative or above 100,000,000 bytes,
            or if the filename is empty once directory parts are removed.
    """
    import os

    # Validate
    if size < 0:
        raise ValueError("Invalid file size")
    if size > 100_000_000:
        raise ValueError("File too large")

    # The filename is client-controlled. Keep only its last path
    # component so it cannot add directory segments to the storage key.
    safe_name = os.path.basename((filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise ValueError("Invalid filename")

    # Generate presigned URL
    file_id = uuid4()
    path = f"uploads/{file_id}/{safe_name}"
    presigned = await storage.generate_presigned_post(
        path=path,
        conditions=[
            ['content-length-range', 0, size],
            ['starts-with', '$Content-Type', mime_type]
        ],
        expires=3600
    )

    # Create pending record
    await FileAttachment.create({
        'id': file_id,
        'filename': filename,
        'status': 'pending',
        'storage_path': path
    })

    # Expose the record id: without it the client only knows the
    # storage key, not the id of the pending record.
    if isinstance(presigned, dict):
        return {**presigned, 'file_id': str(file_id)}
    return presigned
# Client-side JavaScript
/**
 * Upload a file straight to S3 using a presigned POST, then notify the
 * backend that the upload is complete.
 *
 * @param {File} file - File to upload.
 * @returns {Promise<void>}
 * @throws {Error} If any request answers with a non-2xx status.
 */
async function uploadToS3(file) {
  // fetch() resolves on HTTP error statuses too, so failures must be
  // detected explicitly or a rejected upload would be reported as done.
  const request = async (target, options) => {
    const response = await fetch(target, options);
    if (!response.ok) {
      throw new Error(`Upload request failed (${response.status}): ${target}`);
    }
    return response;
  };

  // 1. Get presigned URL
  const {url, fields, file_id} = await request('/api/files/upload/presigned', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({
      filename: file.name,
      mime_type: file.type,
      size: file.size
    })
  }).then(r => r.json());

  // 2. Upload directly to S3
  const formData = new FormData();
  Object.entries(fields).forEach(([key, value]) => {
    formData.append(key, value);
  });
  // The file part is appended after all presigned form fields.
  formData.append('file', file);
  await request(url, {
    method: 'POST',
    body: formData
  });

  // 3. Notify backend. Use the record id when the backend returns one;
  // otherwise fall back to the storage key.
  await request('/api/files/upload/complete', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({file_id: file_id ?? fields.key})
  });
}
from cifra.security import VirusScanner
# Module-level scanner instance configured with the 'clamav' engine.
scanner = VirusScanner(engine='clamav')
async def scan_file(file: UploadFile) -> bool:
    """Run the virus scanner over an uploaded file.

    Args:
        file: The uploaded file to scan.

    Returns:
        True when the scan reports no infection. False when it does, in
        which case a warning naming the virus is logged.
    """
    report = await scanner.scan(file)
    if not report.infected:
        return True
    logger.warning(f"Virus detected: {report.virus_name}")
    return False
from cifra.auth import require_permission
@app.get("/api/files/{file_id}/download")
@require_permission('file:download')
async def download_file(file_id: UUID, current_user: User):
    """Return a temporary download URL for a file after an access check.

    Args:
        file_id: Identifier of the ``FileAttachment`` to download.
        current_user: The user making the request.

    Returns:
        A dict with a presigned ``url`` generated with ``expires=300``.

    Raises:
        PermissionError: If the file belongs to a Contact that the user
            may not view.
    """
    attachment = await FileAttachment.get(file_id)

    # NOTE(review): only attachments of 'Contact' entities are checked;
    # files attached to any other entity type skip this check entirely.
    # Confirm that is intended.
    if attachment.entity_type == 'Contact':
        owner = await Contact.get(attachment.entity_id)
        allowed = await current_user.can_view(owner)
        if not allowed:
            raise PermissionError("Access denied")

    # Short-lived link rather than a permanent URL.
    return {'url': await storage.presigned_url(attachment.storage_path, expires=300)}
from fastapi_limiter.depends import RateLimiter
@app.post("/api/files/upload")
@rate_limit(max_requests=10, window=60)  # 10 uploads per minute
async def upload_file(file: UploadFile):
    """Upload endpoint decorated with a limit of 10 requests per 60-unit
    window (handler body elided in this example)."""
    # NOTE(review): the import directly above this snippet brings in
    # `RateLimiter`, but the decorator used here is `rate_limit`, which
    # is not imported anywhere in view. Confirm which one is intended.
    ...
from cifra.tasks import scheduled_task
@scheduled_task(cron='0 2 * * *')  # 2 AM daily
async def cleanup_orphaned_files():
    """Remove file records, and their stored data, that have no owning entity.

    Selects ``FileAttachment`` rows whose ``entity_id`` is unset and that
    were uploaded more than 24 hours ago, deletes the stored object and
    the row for each, then logs how many were selected.
    """
    # Ignore recent uploads: only rows older than 24 hours qualify.
    threshold = datetime.now() - timedelta(hours=24)
    query = FileAttachment.filter(
        entity_id=None,
        uploaded_at__lt=threshold
    )
    stale = await query.all()

    for attachment in stale:
        # Remove the stored object first, then its database row.
        await storage.delete(attachment.storage_path)
        await attachment.delete()

    logger.info(f"Cleaned up {len(stale)} orphaned files")
@scheduled_task(cron='0 3 * * 0')  # Sunday 3 AM
async def cleanup_old_versions():
    """Delete old file versions.

    Selects ``FileVersion`` rows that are not current and were created
    more than 90 days ago, deletes the stored object and the row for
    each, then logs how many were removed.
    """
    cutoff = datetime.now() - timedelta(days=90)
    old_versions = await FileVersion.filter(
        created_at__lt=cutoff,
        is_current=False
    ).all()
    for version in old_versions:
        # Remove the stored object first, then its database row.
        await storage.delete(version.storage_path)
        await version.delete()
    # Report the outcome, as the orphaned-file cleanup job does, so a
    # run of this job is visible in the logs.
    logger.info(f"Cleaned up {len(old_versions)} old file versions")
# config.yaml
# File upload settings. Sizes are plain integers in bytes: digit-group
# underscores (e.g. 100_000_000) are only read as integers by YAML 1.1
# parsers and load as strings under the YAML 1.2 core schema.
file_upload:
  # Storage backend
  storage:
    backend: s3  # local, s3, azure
    s3:
      bucket: my-bucket
      region: us-east-1
      public: true
    local:
      base_path: /var/www/uploads
      base_url: https://example.com/uploads
  # Validation
  validation:
    max_size: 100000000  # 100MB
    allowed_types:
      - image/jpeg
      - image/png
      - application/pdf
      - video/mp4
    virus_scan: true
  # Image processing
  images:
    optimize: true
    max_dimension: 2048
    thumbnails:
      small: [150, 150]
      medium: [300, 300]
      large: [800, 800]
  # Chunked upload
  chunked:
    enabled: true
    chunk_size: 5000000  # 5MB
    max_chunks: 1000
    temp_storage: /tmp/uploads
  # Cleanup
  cleanup:
    orphaned_files_after: 24  # hours
    old_versions_after: 90  # days
Документация: https://docs.cifra.io/file-upload