architect/_archive/2025-11-26-cleanup/cifra/archive/2025-11-10-restructure-v2/FILE_UPLOAD.md

CIFRA File Upload System

Версия: 1.0.0
Дата: 2025-11-10


Architecture Overview

Client → [Upload API] → [Validation] → [Storage] → [Processing Queue]
                             ↓              ↓               ↓
                        Virus Scan      S3/Local       Image Resize
                        Size Check                     Video Transcode
                        Type Check                     PDF Preview

Upload Strategies

1. Direct Upload (Small Files < 10MB)

from fastapi import UploadFile
from cifra.storage import storage

@app.post("/api/files/upload")
async def upload_file(
    file: UploadFile,
    entity_type: str,
    entity_id: UUID
):
    """Direct upload for small files.

    Checks the reported size, stores the file under
    ``{entity_type}/{entity_id}/{filename}`` and creates the matching
    ``FileAttachment`` record.

    Returns:
        A dict with the new ``file_id`` and its storage ``url``.

    Raises:
        ValueError: if the size is unknown or above 10,000,000 bytes, or
            if the file name or entity type cannot be used as a single
            path segment.
    """
    from pathlib import PurePosixPath

    # Validate. The size is optional on UploadFile, so reject an unknown
    # size explicitly instead of failing on a None comparison.
    if file.size is None:
        raise ValueError("File size unknown")
    if file.size > 10_000_000:
        raise ValueError("File too large")

    # Both values are supplied by the client and end up in the storage
    # path: keep only the last component of the file name and refuse
    # anything that could climb out of the entity directory.
    safe_name = PurePosixPath((file.filename or "").replace("\\", "/")).name
    if safe_name in ("", ".", ".."):
        raise ValueError("Invalid filename")
    if (
        not entity_type
        or entity_type in (".", "..")
        or "/" in entity_type
        or "\\" in entity_type
    ):
        raise ValueError("Invalid entity type")

    # Upload
    file_id = await storage.save(
        file=file,
        path=f"{entity_type}/{entity_id}/{safe_name}"
    )

    # Create record
    await FileAttachment.create({
        'id': file_id,
        'filename': safe_name,
        'size': file.size,
        'mime_type': file.content_type,
        'entity_type': entity_type,
        'entity_id': entity_id
    })

    return {'file_id': file_id, 'url': storage.url(file_id)}

2. Chunked Upload (Large Files > 10MB)

from cifra.upload import ChunkedUploadManager

upload_manager = ChunkedUploadManager()

# Step 1: Initialize upload
@app.post("/api/files/upload/init")
async def init_upload(
    filename: str,
    size: int,
    mime_type: str,
    chunk_size: int = 5_000_000  # 5MB chunks
):
    """Initialize chunked upload.

    Registers the upload with the upload manager and tells the client how
    many chunks of ``chunk_size`` bytes it has to send.

    Returns:
        A dict with ``upload_id``, ``total_chunks`` and ``chunk_size``.

    Raises:
        ValueError: if ``size`` is negative or ``chunk_size`` is not
            positive.
    """

    # Reject values that would make the chunk arithmetic below meaningless
    # (division by zero, negative chunk counts) before any state is created.
    if size < 0:
        raise ValueError("File size must not be negative")
    if chunk_size <= 0:
        raise ValueError("Chunk size must be positive")

    upload_id = await upload_manager.init(
        filename=filename,
        total_size=size,
        mime_type=mime_type,
        chunk_size=chunk_size
    )

    # Ceiling division: a trailing partial chunk still counts as a chunk.
    total_chunks = (size + chunk_size - 1) // chunk_size

    return {
        'upload_id': upload_id,
        'total_chunks': total_chunks,
        'chunk_size': chunk_size
    }

# Step 2: Upload chunks
@app.post("/api/files/upload/chunk")
async def upload_chunk(
    upload_id: UUID,
    chunk_number: int,
    chunk: UploadFile
):
    """Store one chunk of an upload and report the overall progress."""
    payload = await chunk.read()
    await upload_manager.upload_chunk(
        upload_id=upload_id, chunk_number=chunk_number, data=payload
    )

    # Progress is read back only after the chunk has been handed over.
    status = await upload_manager.get_progress(upload_id)

    response = {'chunk_number': chunk_number}
    response['progress'] = status['percent']
    response['uploaded_chunks'] = status['uploaded']
    response['total_chunks'] = status['total']
    return response

# Step 3: Finalize upload
@app.post("/api/files/upload/finalize")
async def finalize_upload(upload_id: UUID):
    """Complete a chunked upload and schedule its post-processing."""
    record = await upload_manager.finalize(upload_id)
    new_file_id = record.id

    # Hand the finished file to the 'process_file' background task.
    await task_queue.enqueue('process_file', new_file_id)

    return dict(file_id=new_file_id, url=storage.url(new_file_id))

Client-side example (JavaScript):

// Chunked upload client
/**
 * Upload a large file in 5 MB chunks: initialize, send the chunks with a
 * bounded number of parallel requests, then finalize.
 *
 * @param {File} file - The file to upload.
 * @returns {Promise<Object>} The JSON body returned by the finalize call.
 * @throws {Error} If any request answers with a non-2xx status.
 */
async function uploadLargeFile(file) {
  const chunkSize = 5 * 1024 * 1024; // 5MB
  const maxConcurrent = 3;

  // fetch() only rejects on network failure, so HTTP errors must be
  // checked explicitly or a failed chunk would go unnoticed.
  const ensureOk = (response, step) => {
    if (!response.ok) {
      throw new Error(`${step} failed with HTTP ${response.status}`);
    }
    return response;
  };

  // 1. Initialize. The init endpoint declares plain scalar parameters, so
  // they are sent in the query string like the chunk/finalize calls below.
  const initParams = new URLSearchParams({
    filename: file.name,
    size: file.size,
    mime_type: file.type,
    chunk_size: chunkSize
  });
  const initResponse = await fetch(`/api/files/upload/init?${initParams}`, {
    method: 'POST'
  });
  const {upload_id, total_chunks} = await ensureOk(initResponse, 'Upload init').json();

  // 2. Upload chunks in parallel (max 3 concurrent)
  const uploadChunk = async (chunkNumber) => {
    const start = chunkNumber * chunkSize;
    const end = Math.min(start + chunkSize, file.size);
    const chunk = file.slice(start, end);

    const formData = new FormData();
    formData.append('chunk', chunk);

    const response = await fetch(`/api/files/upload/chunk?upload_id=${upload_id}&chunk_number=${chunkNumber}`, {
      method: 'POST',
      body: formData
    });
    ensureOk(response, `Chunk ${chunkNumber} upload`);
  };

  // Upload with concurrency limit. Requests are only started for chunk
  // numbers that exist; calling uploadChunk() first and filtering the
  // promises afterwards would still send the out-of-range chunks.
  for (let i = 0; i < total_chunks; i += maxConcurrent) {
    const batch = [];
    const batchEnd = Math.min(i + maxConcurrent, total_chunks);
    for (let n = i; n < batchEnd; n++) {
      batch.push(uploadChunk(n));
    }
    await Promise.all(batch);
  }

  // 3. Finalize
  const finalizeResponse = await fetch(`/api/files/upload/finalize?upload_id=${upload_id}`, {
    method: 'POST'
  });

  return ensureOk(finalizeResponse, 'Upload finalize').json();
}

3. Resumable Upload

@app.post("/api/files/upload/resume")
async def resume_upload(upload_id: UUID):
    """Get resumable upload status.

    Returns:
        ``{'status': 'completed', 'file_id': ...}`` when the upload is
        already finished, otherwise the chunk numbers already received
        (``uploaded_chunks``) and those still to send (``missing_chunks``).
    """

    progress = await upload_manager.get_progress(upload_id)

    if progress['status'] == 'completed':
        return {'status': 'completed', 'file_id': progress['file_id']}

    # Membership is tested once per chunk number; a set keeps that linear
    # instead of rescanning the uploaded list for every candidate.
    uploaded = progress['chunks']
    uploaded_lookup = set(uploaded)

    # Return list of uploaded chunks
    return {
        'status': 'in_progress',
        'uploaded_chunks': uploaded,
        'missing_chunks': [
            i for i in range(progress['total'])
            if i not in uploaded_lookup
        ]
    }

Storage Backends

Local Storage

from cifra.storage.backends import LocalStorage

# Local filesystem backend, configured with the directory to write to
# (base_path) and the URL prefix used to address files (base_url).
storage = LocalStorage(
    base_path='/var/www/uploads',
    base_url='https://example.com/uploads'
)

# Save file (the awaits in this example assume an async context)
file_id = await storage.save(file, path='contacts/123/photo.jpg')

# Get URL
url = storage.url(file_id)
# → https://example.com/uploads/contacts/123/photo.jpg

# Delete
await storage.delete(file_id)

S3 Storage

from cifra.storage.backends import S3Storage

# S3 backend; credentials are read from the environment.
# NOTE(review): this snippet uses os.getenv, so `import os` must be in scope.
storage = S3Storage(
    bucket='my-bucket',
    region='us-east-1',
    access_key=os.getenv('AWS_ACCESS_KEY'),
    secret_key=os.getenv('AWS_SECRET_KEY'),
    public=True  # Public read access
)

# Save file (the awaits in this example assume an async context)
file_id = await storage.save(file, path='contacts/123/photo.jpg')

# Get presigned URL (for private files); expires=3600 is presumably
# seconds, i.e. one hour — confirm against the backend
url = storage.presigned_url(file_id, expires=3600)

# Get public URL
url = storage.url(file_id)
# → https://my-bucket.s3.amazonaws.com/contacts/123/photo.jpg

Azure Blob Storage

from cifra.storage.backends import AzureStorage

# Azure Blob backend; the account key is read from the environment.
# NOTE(review): this snippet uses os.getenv, so `import os` must be in scope.
storage = AzureStorage(
    account_name='myaccount',
    account_key=os.getenv('AZURE_STORAGE_KEY'),
    container='uploads'
)

File Validation

from cifra.validation import FileValidator

validator = FileValidator(
    allowed_types=['image/jpeg', 'image/png', 'application/pdf'],
    max_size=10_000_000,  # 10MB
    virus_scan=True
)

@app.post("/api/files/upload")
async def upload_file(file: UploadFile):
    """Run the validator on the upload and store it only when it passes.

    Raises:
        ValueError: carrying the validator's error when validation fails.
    """
    outcome = await validator.validate(file)

    if outcome.valid:
        # Validation passed: persist the file and report its id.
        stored_id = await storage.save(file)
        return {'file_id': stored_id}

    raise ValueError(outcome.error)

Validation checks:

class FileValidator:
    async def validate(self, file: UploadFile) -> ValidationResult:
        """Validate uploaded file.

        Runs the checks in order (size, declared MIME type, magic bytes,
        optional virus scan) and stops at the first failure.

        Returns:
            ``ValidationResult(valid=True)`` when every check passes,
            otherwise ``ValidationResult(valid=False, error=...)``
            describing the first failed check.
        """

        # 1. Check size. The size is optional on UploadFile; an unknown
        # size is reported as a validation failure rather than letting the
        # comparison below raise TypeError.
        if file.size is None:
            return ValidationResult(
                valid=False,
                error="File size unknown"
            )
        if file.size > self.max_size:
            return ValidationResult(
                valid=False,
                error=f"File too large: {file.size} > {self.max_size}"
            )

        # 2. Check MIME type (as declared by the client)
        if file.content_type not in self.allowed_types:
            return ValidationResult(
                valid=False,
                error=f"Invalid file type: {file.content_type}"
            )

        # 3. Check magic bytes (verify real type): the type derived from
        # the content must agree with the declared one.
        magic_type = await self._check_magic_bytes(file)
        if magic_type != file.content_type:
            return ValidationResult(
                valid=False,
                error="File type mismatch"
            )

        # 4. Virus scan (only when enabled on this validator)
        if self.virus_scan:
            is_clean = await self._virus_scan(file)
            if not is_clean:
                return ValidationResult(
                    valid=False,
                    error="Virus detected"
                )

        return ValidationResult(valid=True)

Image Processing

from cifra.media import ImageProcessor

processor = ImageProcessor()

@app.post("/api/files/upload/image")
async def upload_image(file: UploadFile):
    """Upload and process image.

    Stores the original, creates and stores small/medium/large thumbnails,
    and creates an ``Image`` record holding the URLs and dimensions.

    Returns:
        The created ``Image`` record.
    """

    # Upload original
    original_id = await storage.save(file, path=f"originals/{uuid4()}.jpg")

    # The same upload stream is handed to several consumers. Rewind it
    # before each one so a consumer that reads to the end does not leave
    # the next one with an empty stream.
    await file.seek(0)

    # Create thumbnails
    thumbnails = await processor.create_thumbnails(
        file=file,
        sizes={
            'small': (150, 150),
            'medium': (300, 300),
            'large': (800, 800)
        }
    )

    # Save thumbnails
    thumbnail_urls = {}
    for size, image_data in thumbnails.items():
        thumb_id = await storage.save(
            image_data,
            path=f"thumbnails/{size}/{uuid4()}.jpg"
        )
        thumbnail_urls[size] = storage.url(thumb_id)

    # Read the dimensions, again from the start of the stream each time
    await file.seek(0)
    width = await processor.get_width(file)
    await file.seek(0)
    height = await processor.get_height(file)

    # Create record
    image = await Image.create({
        'original_url': storage.url(original_id),
        'thumbnails': thumbnail_urls,
        'width': width,
        'height': height
    })

    return image

Image optimization:

class ImageProcessor:
    async def optimize(self, file: UploadFile) -> bytes:
        """Optimize image (reduce size, strip metadata).

        Flattens transparency onto a white background, converts modes that
        JPEG cannot store, scales the image down so its longest side is at
        most 2048 pixels, and re-encodes it as JPEG at quality 85.

        Returns:
            The encoded JPEG bytes.
        """
        from io import BytesIO

        from PIL import Image

        img = Image.open(file.file)

        # Palette images can carry transparency too; expand them to RGBA so
        # the flattening step below handles them.
        if img.mode == 'P' and 'transparency' in img.info:
            img = img.convert('RGBA')

        # Convert to RGB (remove alpha)
        if img.mode in ('RGBA', 'LA'):
            background = Image.new('RGB', img.size, (255, 255, 255))
            background.paste(img, mask=img.split()[-1])
            img = background
        elif img.mode not in ('RGB', 'L', 'CMYK'):
            # JPEG cannot store the remaining modes (palette, 1-bit,
            # 16/32-bit, ...); saving them unchanged would fail.
            img = img.convert('RGB')

        # Resize if too large
        max_dimension = 2048
        if max(img.size) > max_dimension:
            ratio = max_dimension / max(img.size)
            # Clamp to 1 so an extreme aspect ratio never rounds a side to 0.
            new_size = tuple(max(1, int(dim * ratio)) for dim in img.size)
            img = img.resize(new_size, Image.Resampling.LANCZOS)

        # Save optimized
        output = BytesIO()
        img.save(output, format='JPEG', quality=85, optimize=True)

        return output.getvalue()

Video Processing

from cifra.media import VideoProcessor

processor = VideoProcessor()

@app.post("/api/files/upload/video")
async def upload_video(file: UploadFile):
    """Store the uploaded video and enqueue its transcoding job."""
    stored_id = await storage.save(file, path=f"videos/{uuid4()}.mp4")

    # The record starts out as 'processing'; the background job is the one
    # that later fills in the variants.
    record_fields = {
        'original_url': storage.url(stored_id),
        'status': 'processing'
    }
    video = await Video.create(record_fields)

    job_options = {
        'formats': ['720p', '480p', '360p'],
        'codec': 'h264',
        'generate_thumbnail': True
    }
    await task_queue.enqueue('transcode_video', video.id, job_options)

    return video

# Background job
@task('transcode_video')
async def transcode_video(video_id: UUID, options: dict):
    """Produce the requested renditions of a video and mark it ready."""
    video = await Video.get(video_id)

    # Fetch the source once; it feeds both transcoding and the thumbnail.
    source = await storage.download(video.original_url)

    renditions = await processor.transcode(
        source,
        formats=options['formats'],
        codec=options['codec']
    )

    # Store every rendition and remember its URL under the format name.
    urls = {
        name: storage.url(
            await storage.save(data, path=f"videos/{name}/{uuid4()}.mp4")
        )
        for name, data in renditions.items()
    }

    # Optional poster frame, taken at time=5.0.
    if options['generate_thumbnail']:
        frame = await processor.generate_thumbnail(source, time=5.0)
        frame_id = await storage.save(frame, path=f"thumbnails/{uuid4()}.jpg")
        urls['thumbnail'] = storage.url(frame_id)

    await video.update({'variants': urls, 'status': 'ready'})

PDF Processing

from cifra.media import PDFProcessor

processor = PDFProcessor()

@app.post("/api/files/upload/pdf")
async def upload_pdf(file: UploadFile):
    """Upload PDF and generate previews.

    Stores the original, extracts its metadata, renders preview images for
    the first three pages, and creates a ``Document`` record.

    Returns:
        The created ``Document`` record.
    """

    # Upload original
    pdf_id = await storage.save(file, path=f"documents/{uuid4()}.pdf")

    # The same upload stream is handed to several consumers. Rewind it
    # before each one so a consumer that reads to the end does not leave
    # the next one with an empty stream.
    await file.seek(0)

    # Extract metadata
    metadata = await processor.extract_metadata(file)

    await file.seek(0)

    # Generate preview images (first 3 pages)
    previews = await processor.generate_previews(
        file,
        pages=[1, 2, 3],
        dpi=150
    )

    # Upload previews (only the image data is needed, not the page keys)
    preview_urls = []
    for image_data in previews.values():
        preview_id = await storage.save(
            image_data,
            path=f"previews/{uuid4()}.jpg"
        )
        preview_urls.append(storage.url(preview_id))

    # Create record
    document = await Document.create({
        'file_url': storage.url(pdf_id),
        'filename': file.filename,
        'page_count': metadata['pages'],
        'preview_urls': preview_urls
    })

    return document

File Attachment Entity

# Entity describing one stored file and the record it is attached to.
entities:
  FileAttachment:
    table_name: file_attachments

    fields:
      id: {type: uuid, primary_key: true}
      filename: {type: string, required: true}
      original_filename: {type: string}
      size: {type: integer}  # Bytes
      mime_type: {type: string}
      storage_backend: {type: string}  # local, s3, azure
      storage_path: {type: string}
      url: {type: string}
      # Written by the presigned-upload flow, which creates the record
      # with status 'pending' before the client uploads the file.
      status: {type: string}

      # Image-specific
      width: {type: integer}
      height: {type: integer}
      thumbnails: {type: json}

      # Polymorphic relation
      entity_type: {type: string}  # Contact, Deal, etc.
      entity_id: {type: uuid}

      # Metadata
      uploaded_by: {type: uuid, foreign_key: User.id}
      uploaded_at: {type: datetime, auto_now_add: true}

    indexes:
      - [entity_type, entity_id]
      - [uploaded_by]

Usage in Entities

# File-typed fields on entities. The list type is quoted because '[' and
# ']' are flow indicators and cannot appear in an unquoted value inside a
# {...} mapping.
entities:
  Contact:
    fields:
      avatar: {type: file, allowed_types: [image/*]}

  Deal:
    fields:
      attachments: {type: 'file[]', max_count: 10}

  Product:
    fields:
      images: {type: 'file[]', allowed_types: [image/*], max_count: 5}
      brochure: {type: file, allowed_types: [application/pdf]}

Generated code:

# Get attachments (`uuid` stands for the id of the record to load; the
# awaits in this example assume an async context)
contact = await Contact.get(uuid)
avatar_url = contact.avatar.url
avatar_thumbnail = contact.avatar.thumbnails['small']

# Upload new attachment
await contact.update_avatar(uploaded_file)

# List attachments
deal = await Deal.get(uuid)
for attachment in deal.attachments:
    print(f"{attachment.filename} - {attachment.size} bytes")

Direct Upload to S3 (Client-side)

@app.post("/api/files/upload/presigned")
async def get_presigned_url(
    filename: str,
    mime_type: str,
    size: int
):
    """Get presigned URL for direct S3 upload.

    Generates a presigned POST for ``uploads/{file_id}/{filename}`` that
    is limited to the announced size and content type, and creates a
    pending ``FileAttachment`` record for it.

    Returns:
        The presigned POST data produced by the storage backend.

    Raises:
        ValueError: if ``size`` is negative or above 100,000,000 bytes,
            or if ``filename`` has no usable final component.
    """
    from pathlib import PurePosixPath

    # Validate
    if size < 0:
        raise ValueError("Invalid file size")
    if size > 100_000_000:
        raise ValueError("File too large")

    # The file name comes from the client and becomes part of the object
    # key: keep only its final component so it cannot add path segments.
    safe_name = PurePosixPath(filename.replace("\\", "/")).name
    if safe_name in ("", ".", ".."):
        raise ValueError("Invalid filename")

    # Generate presigned URL
    file_id = uuid4()
    path = f"uploads/{file_id}/{safe_name}"

    presigned = await storage.generate_presigned_post(
        path=path,
        conditions=[
            ['content-length-range', 0, size],
            ['starts-with', '$Content-Type', mime_type]
        ],
        expires=3600
    )

    # Create pending record
    await FileAttachment.create({
        'id': file_id,
        'filename': safe_name,
        'status': 'pending',
        'storage_path': path
    })

    return presigned

Client-side example (JavaScript):
/**
 * Upload a file straight to S3 with a presigned POST, then tell the
 * backend that the upload is complete.
 *
 * @param {File} file - The file to upload.
 * @returns {Promise<void>}
 * @throws {Error} If any request answers with a non-2xx status.
 */
async function uploadToS3(file) {
  // fetch() only rejects on network failure, so HTTP errors must be
  // checked explicitly.
  const ensureOk = (response, step) => {
    if (!response.ok) {
      throw new Error(`${step} failed with HTTP ${response.status}`);
    }
    return response;
  };

  // 1. Get presigned URL. The endpoint declares plain scalar parameters,
  // so they are sent in the query string.
  const params = new URLSearchParams({
    filename: file.name,
    mime_type: file.type,
    size: file.size
  });
  const presignResponse = await fetch(`/api/files/upload/presigned?${params}`, {
    method: 'POST'
  });
  const {url, fields} = await ensureOk(presignResponse, 'Presign request').json();

  // 2. Upload directly to S3 (the file must be the last form field)
  const formData = new FormData();
  Object.entries(fields).forEach(([key, value]) => {
    formData.append(key, value);
  });
  formData.append('file', file);

  const uploadResponse = await fetch(url, {
    method: 'POST',
    body: formData
  });
  ensureOk(uploadResponse, 'S3 upload');

  // 3. Notify backend. fields.key is the object key
  // "uploads/<file_id>/<filename>", not the file id itself, so the id is
  // taken from its second path segment.
  const fileId = fields.key.split('/')[1];
  const completeResponse = await fetch('/api/files/upload/complete', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({file_id: fileId})
  });
  ensureOk(completeResponse, 'Upload completion');
}

Security

Virus Scanning

from cifra.security import VirusScanner

scanner = VirusScanner(engine='clamav')

async def scan_file(file: UploadFile) -> bool:
    """Return True when the scanner reports the file as clean.

    An infected file is logged as a warning and reported as False.
    """
    report = await scanner.scan(file)

    if not report.infected:
        return True

    logger.warning(f"Virus detected: {report.virus_name}")
    return False

Access Control

from cifra.auth import require_permission

@app.get("/api/files/{file_id}/download")
@require_permission('file:download')
async def download_file(file_id: UUID, current_user: User):
    """Download file with access check.

    Access is denied unless the user may view the entity the file is
    attached to. A file that is not attached to any entity is only
    available to the user who uploaded it.

    Returns:
        A dict with a temporary download ``url`` (``expires=300``).

    Raises:
        PermissionError: if the user may not access the file, or the file
            belongs to an entity type this endpoint does not know.
    """

    file = await FileAttachment.get(file_id)

    # Entity types whose visibility rules are checked here. Anything not
    # listed is refused, so a new entity type is never served unchecked;
    # extend this map when further entities gain file fields.
    entity_models = {'Contact': Contact, 'Deal': Deal}

    # Check ownership
    if file.entity_type is None:
        # Unattached file: fall back to the uploader recorded on the
        # attachment (uploaded_by references User.id).
        if file.uploaded_by != current_user.id:
            raise PermissionError("Access denied")
    else:
        model = entity_models.get(file.entity_type)
        if model is None:
            raise PermissionError("Access denied")
        entity = await model.get(file.entity_id)
        if not await current_user.can_view(entity):
            raise PermissionError("Access denied")

    # Generate temporary URL
    url = await storage.presigned_url(file.storage_path, expires=300)

    return {'url': url}

Rate Limiting

from fastapi import Depends, UploadFile
from fastapi_limiter.depends import RateLimiter


# 10 uploads per minute. RateLimiter is attached as a route dependency;
# it is not a decorator, and no `rate_limit` name is defined in this
# document.
@app.post(
    "/api/files/upload",
    dependencies=[Depends(RateLimiter(times=10, seconds=60))],
)
async def upload_file(file: UploadFile):
    ...

Cleanup Jobs

from cifra.tasks import scheduled_task

@scheduled_task(cron='0 2 * * *')  # 2 AM daily
async def cleanup_orphaned_files():
    """Delete files not referenced by any entity.

    Removes attachments without an ``entity_id`` that were uploaded more
    than 24 hours ago, deleting the stored object first and then the
    record. A failure on one file is logged and does not stop the job.
    """

    # Find orphaned files (older than 24 hours)
    cutoff = datetime.now() - timedelta(hours=24)

    orphaned = await FileAttachment.filter(
        entity_id=None,
        uploaded_at__lt=cutoff
    ).all()

    deleted = 0
    for file in orphaned:
        # Isolate each file: one storage or database error must not leave
        # every remaining orphan behind until the next run.
        try:
            await storage.delete(file.storage_path)
            await file.delete()
        except Exception:
            logger.exception("Failed to clean up orphaned file %s", file.id)
        else:
            deleted += 1

    # Report what was actually removed, not how many candidates were found.
    logger.info(f"Cleaned up {deleted} orphaned files")

@scheduled_task(cron='0 3 * * 0')  # Sunday 3 AM
async def cleanup_old_versions():
    """Delete old file versions.

    Removes non-current file versions created more than 90 days ago,
    deleting the stored object first and then the record. A failure on
    one version is logged and does not stop the job.
    """

    cutoff = datetime.now() - timedelta(days=90)

    old_versions = await FileVersion.filter(
        created_at__lt=cutoff,
        is_current=False
    ).all()

    deleted = 0
    for version in old_versions:
        # Isolate each version: one storage or database error must not
        # abort the rest of the batch.
        try:
            await storage.delete(version.storage_path)
            await version.delete()
        except Exception:
            logger.exception(
                "Failed to clean up file version at %s", version.storage_path
            )
        else:
            deleted += 1

    logger.info(f"Cleaned up {deleted} old file versions")

Configuration

# config.yaml
# Sizes are written without digit separators: "100_000_000" is an integer
# only for YAML 1.1 parsers, while YAML 1.2 parsers read it as a string.
file_upload:
  # Storage backend
  storage:
    backend: s3  # local, s3, azure
    s3:
      bucket: my-bucket
      region: us-east-1
      public: true
    local:
      base_path: /var/www/uploads
      base_url: https://example.com/uploads

  # Validation
  validation:
    max_size: 100000000  # 100MB
    allowed_types:
      - image/jpeg
      - image/png
      - application/pdf
      - video/mp4
    virus_scan: true

  # Image processing
  images:
    optimize: true
    max_dimension: 2048
    thumbnails:
      small: [150, 150]
      medium: [300, 300]
      large: [800, 800]

  # Chunked upload
  chunked:
    enabled: true
    chunk_size: 5000000  # 5MB
    max_chunks: 1000
    temp_storage: /tmp/uploads

  # Cleanup
  cleanup:
    orphaned_files_after: 24  # hours
    old_versions_after: 90  # days

Документация: https://docs.cifra.io/file-upload