
CIFRA Background Jobs System

Version: 1.0.0
Date: 2025-11-10


Architecture

API Request -> [Task Queue] -> Worker Pool -> Execute Task -> Update Status
                    |
              Redis/RabbitMQ
                    |
         [Scheduler] -> Cron Tasks

Components:
- Task Queue: Redis (Celery) or RabbitMQ
- Worker Pool: Multiple worker processes
- Scheduler: Celery Beat for cron tasks
- Monitoring: Flower dashboard
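To make the data flow above concrete, here is a minimal in-process sketch. It uses `asyncio.Queue` in place of Redis/RabbitMQ and coroutines in place of separate worker processes. The names (`worker`, `send_email`, the status strings) are illustrative assumptions, not how cifra is implemented. The sketch shows only the shape of the system: the API side enqueues and returns immediately, a pool of consumers executes jobs, and each job's status moves from pending to running to success or failed.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple

# A queued job: (task id, coroutine function, keyword arguments)
Job = Tuple[str, Callable[..., Awaitable[Any]], Dict[str, Any]]


async def worker(queue: "asyncio.Queue[Job]", statuses: Dict[str, str]) -> None:
    """Consume jobs forever, recording running -> success/failed."""
    while True:
        task_id, func, kwargs = await queue.get()
        statuses[task_id] = "running"
        try:
            await func(**kwargs)
            statuses[task_id] = "success"
        except Exception:
            statuses[task_id] = "failed"
        finally:
            queue.task_done()


async def send_email(to: str) -> None:
    """Stand-in task body that simulates I/O."""
    await asyncio.sleep(0.01)


async def main() -> Dict[str, str]:
    queue: "asyncio.Queue[Job]" = asyncio.Queue()  # plays the role of the broker
    statuses: Dict[str, str] = {}

    # API side: enqueue and return immediately
    for i in range(5):
        task_id = f"task-{i}"
        statuses[task_id] = "pending"
        queue.put_nowait((task_id, send_email, {"to": f"user{i}@example.com"}))

    # Worker pool: three concurrent consumers
    workers = [asyncio.create_task(worker(queue, statuses)) for _ in range(3)]
    await queue.join()  # returns once every job has called task_done()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return statuses


statuses = asyncio.run(main())
print(statuses)
```

In the real system the queue is durable and lives outside the process, so jobs survive restarts and workers can run on other machines.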


Task Definition

from cifra.tasks import task

@task(name='send_email')
async def send_email(to: str, subject: str, body: str):
    """Send email task"""

    await email_service.send(
        to=to,
        subject=subject,
        body=body
    )

    return {'status': 'sent', 'to': to}

# Enqueue task
await send_email.delay(
    to='user@example.com',
    subject='Welcome',
    body='Hello!'
)

Task Options

from uuid import UUID
import logging

from cifra.tasks import task, TaskFailed, TaskRetry

# Order, payment_service, inventory_service, PaymentError and InventoryError
# come from the application code
logger = logging.getLogger(__name__)

@task(
    name='process_order',
    retry=3,                    # Retry up to 3 times
    retry_delay=60,             # Wait 60 seconds between retries
    timeout=300,                # Time out after 5 minutes
    priority=5,                 # Priority 0-9 (9 = highest)
    max_concurrency=10          # At most 10 concurrent executions
)
async def process_order(order_id: UUID):
    """Process order with retry logic"""

    try:
        order = await Order.get(order_id)

        # Reserve inventory before charging: InventoryError triggers a retry,
        # and a retry that happens before the charge cannot bill the customer twice
        await inventory_service.reserve(order.items)

        # Process payment
        await payment_service.charge(order)

        # Send confirmation
        await send_email.delay(
            to=order.customer.email,
            subject='Order Confirmation',
            body=f'Your order {order.id} is confirmed'
        )

        await order.update({'status': 'processed'})

        return {'status': 'success', 'order_id': order_id}

    except PaymentError as e:
        # Don't retry payment errors (release the inventory reservation here if needed)
        raise TaskFailed(f"Payment failed: {e}", retry=False)

    except InventoryError as e:
        # Retry inventory errors
        logger.warning(f"Inventory error: {e}, will retry")
        raise TaskRetry(delay=120)  # Retry after 2 minutes
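The retry options above can be illustrated with a small stand-alone sketch. `TaskRetry` and `TaskFailed` here are local stand-ins that only mirror the names used in this document. They are not the cifra classes, and this `TaskFailed` takes no `retry` flag. `run_with_retries` is a hypothetical helper. Under those assumptions, `retry=3` means up to three additional attempts, a delay carried by `TaskRetry` overrides `retry_delay`, and any other exception (including `TaskFailed`) propagates immediately without a retry.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class TaskRetry(Exception):
    """Ask for another attempt, optionally overriding the delay (seconds)."""

    def __init__(self, delay: Optional[float] = None) -> None:
        super().__init__(delay)
        self.delay = delay


class TaskFailed(Exception):
    """Permanent failure: propagated immediately, never retried."""


async def run_with_retries(func: Callable[[], Awaitable[Any]],
                           retry: int = 3, retry_delay: float = 60) -> Any:
    """Call func; on TaskRetry, wait and try again, up to `retry` extra times."""
    retries_done = 0
    while True:
        try:
            return await func()
        except TaskRetry as exc:
            if retries_done >= retry:
                raise TaskFailed(f"gave up after {retry} retries") from exc
            retries_done += 1
            # A delay carried by the exception wins over the default retry_delay
            await asyncio.sleep(exc.delay if exc.delay is not None else retry_delay)


attempts = 0


async def flaky_inventory_call() -> str:
    """Fails twice with a retryable error, then succeeds."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise TaskRetry(delay=0)  # zero delay keeps the demo fast
    return "reserved"


result = asyncio.run(run_with_retries(flaky_inventory_call, retry=3, retry_delay=0))
print(result, attempts)  # reserved 3
```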

Task Execution Modes

1. Async (Default)

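# Fire and forget: delay() returns a result handle immediately
result_handle = await send_email.delay(
    to='user@example.com', subject='Welcome', body='Hello!'
)

# Get result later
result = await result_handle.get()

2. Sync (Wait for result)

# Wait until the task completes
result = await send_email.apply(
    to='user@example.com', subject='Welcome', body='Hello!'
)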

3. Scheduled (Execute later)

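from datetime import datetime, timedelta, timezone

# Execute in 1 hour (timezone-aware datetimes avoid local-time ambiguity)
await send_email.apply_at(
    datetime.now(timezone.utc) + timedelta(hours=1),
    to='user@example.com',
    subject='Reminder',
    body='Hello!'
)

# Execute at specific time
await send_email.apply_at(
    datetime(2025, 11, 15, 10, 0, 0, tzinfo=timezone.utc),
    to='user@example.com',
    subject='Reminder',
    body='Hello!'
)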

4. Periodic (Cron)

from datetime import datetime

from cifra.tasks import scheduled_task

@scheduled_task(cron='0 2 * * *')  # Every day at 2 AM
async def daily_backup():
    """Run daily backup"""

    await backup_service.backup_database()
    await backup_service.backup_uploads()

    return {'status': 'completed', 'timestamp': datetime.now()}

# Cron syntax:
# * * * * * = minute hour day month weekday
# 0 2 * * * = 2:00 AM every day
# 0 */4 * * * = Every 4 hours
# 0 9 * * 1 = Every Monday at 9 AM
# 0 0 1 * * = First day of month at midnight
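The five-field syntax above can be checked with a small matcher. This is a simplified sketch, not cifra's or Celery Beat's parser. It assumes only `*`, `*/n`, plain numbers and comma lists. It does not handle ranges (`1-5`), month or day names, or standard cron's rule that when both day-of-month and day-of-week are restricted, a match on either one fires the job. `cron_matches` is a hypothetical helper name.

```python
from datetime import datetime

# Smallest legal value of each field: minute, hour, day of month, month, weekday
_FIELD_MINIMUMS = (0, 0, 1, 1, 0)


def _field_matches(field: str, value: int, minimum: int) -> bool:
    """Match one field written as '*', '*/n', a number, or a comma list of those."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # '*/n' means every n-th value counted from the field's minimum
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, when: datetime) -> bool:
    """True if `when` (to the minute) is a firing time of a 5-field cron expression."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields: minute hour day month weekday")
    # cron counts weekdays from Sunday = 0; datetime.weekday() has Monday = 0
    weekday = (when.weekday() + 1) % 7
    values = (when.minute, when.hour, when.day, when.month, weekday)
    return all(
        _field_matches(f, v, lo) for f, v, lo in zip(fields, values, _FIELD_MINIMUMS)
    )


print(cron_matches("0 9 * * 1", datetime(2025, 11, 10, 9, 0)))    # True (a Monday)
print(cron_matches("0 */4 * * *", datetime(2025, 11, 10, 8, 0)))  # True
print(cron_matches("0 */4 * * *", datetime(2025, 11, 10, 9, 0)))  # False
```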

Task Chains

from cifra.tasks import chain, group

# Sequential execution (chain)
result = await chain(
    download_file.s(url='https://example.com/data.csv'),
    parse_csv.s(),
    import_contacts.s()
).apply_async()

# Parallel execution (group)
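results = await group(
    send_email.s(to='user1@example.com', subject='Welcome', body='Hello!'),
    send_email.s(to='user2@example.com', subject='Welcome', body='Hello!'),
    send_email.s(to='user3@example.com', subject='Welcome', body='Hello!')
).apply_async()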

# Wait for all to complete
await results.wait()
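The difference between a chain and a group can be shown with plain coroutines. This is a single-process sketch: `run_chain`, `run_group` and the stand-in tasks are hypothetical. In the real system each step is a separate queued task that may run on a different worker. A chain feeds each result into the next step. A group starts every job concurrently and returns results in submission order.

```python
import asyncio
from functools import partial
from typing import Any, Awaitable, Callable, List


async def run_chain(first: Callable[[], Awaitable[Any]],
                    *steps: Callable[[Any], Awaitable[Any]]) -> Any:
    """Chain: run `first`, then pass each result as the argument of the next step."""
    result = await first()
    for step in steps:
        result = await step(result)
    return result


async def run_group(*jobs: Callable[[], Awaitable[Any]]) -> List[Any]:
    """Group: start all jobs concurrently; results come back in submission order."""
    return list(await asyncio.gather(*(job() for job in jobs)))


# Hypothetical stand-ins for download_file, parse_csv, import_contacts and send_email
async def download() -> str:
    return "name,email\nAda,ada@example.com\nGrace,grace@example.com"


async def parse(text: str) -> List[dict]:
    header, *lines = text.splitlines()
    keys = header.split(",")
    return [dict(zip(keys, line.split(","))) for line in lines]


async def import_rows(rows: List[dict]) -> int:
    return len(rows)


async def send(to: str) -> str:
    await asyncio.sleep(0.01)  # simulate I/O
    return to


async def demo():
    imported = await run_chain(download, parse, import_rows)
    sent = await run_group(*(partial(send, to) for to in ["u1@example.com", "u2@example.com"]))
    return imported, sent


print(asyncio.run(demo()))  # (2, ['u1@example.com', 'u2@example.com'])
```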

Complex Workflows

from cifra.tasks import task, chord

# Example: Process large dataset in chunks
@task
async def process_chunk(chunk_id: int):
    """Process data chunk"""
    chunk = await load_chunk(chunk_id)
    results = []

    for record in chunk:
        results.append(await process_record(record))

    return results

@task
async def merge_results(chunk_results: list):
    """Merge results from all chunks"""
    all_results = []
    for chunk in chunk_results:
        all_results.extend(chunk)

    await save_final_results(all_results)

    return {'total': len(all_results)}

# Chord: Group + Callback
# Run the chunks in parallel, then call merge_results with the list of their results
result = await chord([
    process_chunk.s(chunk_id=1),
    process_chunk.s(chunk_id=2),
    process_chunk.s(chunk_id=3)
])(merge_results.s())

Task Status Tracking

from cifra.tasks import TaskStatus

# Get task status
task = await TaskStatus.get(task_id)

print(task.status)  # pending, running, success, failed
print(task.progress)  # 0-100
print(task.result)  # Task result (if completed)
print(task.error)  # Error message (if failed)

# Update progress from inside the task.
# bind=True passes the task instance as `self` (assumed to follow the Celery convention)
@task(bind=True)
async def long_running_task(self, items: list):
    """Task with progress updates"""

    total_items = len(items)
    for i, item in enumerate(items):
        await process_item(item)

        # Update progress
        await self.update_progress(
            percent=(i + 1) / total_items * 100,
            message=f"Processed {i + 1} of {total_items}"
        )

    return {'processed': total_items}

Frontend tracking:

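// Promise-based delay helper (not built into browsers)
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function trackTask(taskId) {
  while (true) {
    const response = await fetch(`/api/tasks/${taskId}`);
    if (!response.ok) {
      throw new Error(`Task status request failed: HTTP ${response.status}`);
    }
    const task = await response.json();

    console.log(`Progress: ${task.progress}%`);

    if (task.status === 'success') {
      console.log('Task completed:', task.result);
      break;
    }

    if (task.status === 'failed') {
      console.error('Task failed:', task.error);
      break;
    }

    await sleep(1000);  // Check every second
  }
}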

Retry Logic

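# bind=True passes the task instance as `self` (assumed to follow the Celery convention)
@task(bind=True, retry=3, retry_delay=60)
async def api_call_with_retry(self, url: str):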
    """API call with exponential backoff"""

    try:
        response = await http_client.get(url)
        return response.json()

    except HTTPError as e:
        if e.status_code in [500, 502, 503]:
            # Retry on server errors
            # Exponential backoff: 60s, 120s, 240s
            retry_count = self.request.retries
            delay = 60 * (2 ** retry_count)

            logger.warning(f"API call failed, retry in {delay}s")
            raise TaskRetry(delay=delay)
        else:
            # Don't retry on client errors
            raise TaskFailed(f"API call failed: {e}", retry=False)
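The backoff arithmetic above (60 s, 120 s, 240 s) generalizes to `base * 2**retries`. Without a cap, later delays grow without bound. This sketch caps them at 600 seconds, matching `retry_backoff_max: 600` in the configuration below. Whether cifra computes delays exactly this way is an assumption. The optional "full jitter" (a uniform draw between 0 and the capped delay) is a common technique for spreading out simultaneous retries. It is not described in this document. `backoff_delay` is a hypothetical helper.

```python
import random
from typing import Optional


def backoff_delay(retries: int, base: float = 60.0, cap: float = 600.0,
                  rng: Optional[random.Random] = None) -> float:
    """Delay before the next attempt: base * 2**retries, capped at `cap`.

    If `rng` is given, apply full jitter: a uniform draw in [0, capped delay].
    """
    delay = min(cap, base * (2 ** retries))
    if rng is not None:
        delay = rng.uniform(0, delay)
    return delay


print([backoff_delay(n) for n in range(6)])
# [60.0, 120.0, 240.0, 480.0, 600.0, 600.0]
```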

Task Priorities

# High priority (executed first)
@task(priority=9)
async def urgent_task():
    """Critical task"""
    pass

# Normal priority
@task(priority=5)
async def normal_task():
    """Normal task"""
    pass

# Low priority (executed last)
@task(priority=1)
async def batch_task():
    """Batch processing"""
    pass

# Enqueue with priority
await send_email.apply_async(
    kwargs={'to': 'user@example.com', 'subject': 'Urgent', 'body': 'Hello!'},
    priority=9
)
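The ordering rule described above (9 runs before 0, and equal priorities run in arrival order) can be modeled with a heap. This is an in-memory sketch. How a real broker enforces priorities depends on the backend, and `PriorityTaskQueue` is a hypothetical class. Because `heapq` is a min-heap, the priority is negated. A monotonically increasing counter breaks ties so that tasks with equal priority come out first-in, first-out.

```python
import heapq
import itertools
from typing import List, Tuple


class PriorityTaskQueue:
    """Pop the highest priority first (9 before 0); FIFO among equal priorities."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        self._counter = itertools.count()  # tie-breaker preserves insertion order

    def push(self, name: str, priority: int = 5) -> None:
        if not 0 <= priority <= 9:
            raise ValueError("priority must be in 0..9")
        # heapq is a min-heap, so negate the priority to pop 9 first
        heapq.heappush(self._heap, (-priority, next(self._counter), name))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


q = PriorityTaskQueue()
q.push("batch_task", 1)
q.push("normal_task", 5)
q.push("urgent_task", 9)
q.push("another_normal", 5)
order = [q.pop() for _ in range(len(q))]
print(order)  # ['urgent_task', 'normal_task', 'another_normal', 'batch_task']
```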

Task Rate Limiting

from cifra.tasks import rate_limit

@task
@rate_limit(max_requests=100, window=60)  # 100 per minute
async def send_sms(phone: str, message: str):
    """Send SMS with rate limiting"""

    await sms_service.send(phone, message)
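One way to enforce "100 per minute" is a sliding-window log: remember the timestamp of each accepted call and reject new calls while 100 of them fall inside the last 60 seconds. This is a single-process sketch. The algorithm cifra's `rate_limit` actually uses is not stated here, and a limit shared by several workers would need shared state such as Redis. `SlidingWindowLimiter` is a hypothetical class. The injectable clock lets the example run without real waiting.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `max_requests` calls in any `window`-second span."""

    def __init__(self, max_requests: int, window: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_requests = max_requests
        self.window = window
        self.clock = clock
        self._calls: Deque[float] = deque()

    def try_acquire(self) -> bool:
        now = self.clock()
        # Drop timestamps that have slid out of the window
        while self._calls and now - self._calls[0] >= self.window:
            self._calls.popleft()
        if len(self._calls) < self.max_requests:
            self._calls.append(now)
            return True
        return False


fake_now = [0.0]
limiter = SlidingWindowLimiter(max_requests=100, window=60, clock=lambda: fake_now[0])
allowed = sum(limiter.try_acquire() for _ in range(150))
fake_now[0] = 60.0  # one minute later, the earlier calls have left the window
after_window = limiter.try_acquire()
print(allowed, after_window)  # 100 True
```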

Worker Configuration

# worker_config.yaml
workers:
  # Default worker
  default:
    concurrency: 4  # 4 concurrent tasks
    prefetch_multiplier: 4  # Each process prefetches 4 tasks (4 x 4 = 16 per worker)
    queues: [default]

  # High priority worker
  priority:
    concurrency: 2
    queues: [high_priority]

  # CPU-intensive worker
  cpu:
    concurrency: 2
    max_tasks_per_child: 100  # Restart after 100 tasks
    queues: [cpu_intensive]

  # IO-intensive worker
  io:
    concurrency: 20
    queues: [io_intensive]

Start workers:

# Default worker
cifra worker start --config worker_config.yaml --worker default

# All workers
cifra worker start --config worker_config.yaml --all

# Specific queue
cifra worker start --queue high_priority --concurrency 10

Task Queues

# Define queues
@task(queue='default')
async def normal_task():
    pass

@task(queue='high_priority')
async def urgent_task():
    pass

@task(queue='cpu_intensive')
async def heavy_computation():
    pass

@task(queue='io_intensive')
async def download_file():
    pass

# Route tasks to queues
task_routes = {
    'send_email': {'queue': 'default'},
    'process_video': {'queue': 'cpu_intensive'},
    'import_data': {'queue': 'io_intensive'},
    'urgent_notification': {'queue': 'high_priority'}
}

Scheduled Tasks (Cron)

from cifra.tasks import scheduled_task

# Every minute
@scheduled_task(cron='* * * * *')
async def check_health():
    """Health check every minute"""
    pass

# Every 5 minutes
@scheduled_task(cron='*/5 * * * *')
async def sync_data():
    """Sync data every 5 minutes"""
    pass

# Every hour
@scheduled_task(cron='0 * * * *')
async def cleanup_temp():
    """Cleanup temp files hourly"""
    pass

# Every day at 2 AM
@scheduled_task(cron='0 2 * * *')
async def daily_report():
    """Generate daily report"""
    pass

# Every Monday at 9 AM
@scheduled_task(cron='0 9 * * 1')
async def weekly_summary():
    """Send weekly summary"""
    pass

# First day of month
@scheduled_task(cron='0 0 1 * *')
async def monthly_billing():
    """Process monthly billing"""
    pass

Manage scheduled tasks:

# List scheduled tasks
cifra scheduler list

# Enable task
cifra scheduler enable daily_report

# Disable task
cifra scheduler disable daily_report

# Run task now
cifra scheduler run daily_report

Task Result Storage

from cifra.tasks import task

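@task(store_result=True, result_expires=3600)  # Keep the result for 1 hour
async def expensive_computation(data):
    """Expensive computation whose result is kept in the result backend"""

    result = await compute(data)
    return result

# Enqueue the task and keep the result handle
result_handle = await expensive_computation.delay(data)

# get() waits for the task to finish. For the next hour the stored result can be
# read again through this handle without re-running the task. Results are keyed by
# task id, not by input, so calling delay(data) again runs the computation again.
result = await result_handle.get()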
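The `result_expires=3600` behaviour can be pictured as a key-value store with a per-entry deadline. This is an in-memory sketch: `ResultStore` is hypothetical, and a real result backend such as Redis handles expiry itself. Entries are stored by task id and treated as missing once `ttl` seconds have passed. This sketch expires lazily, on read. The fake clock keeps the example instant.

```python
import time
from typing import Any, Callable, Dict, Tuple


class ResultStore:
    """Keep task results by id and treat them as missing after `ttl` seconds."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self.clock = clock
        self._data: Dict[str, Tuple[float, Any]] = {}

    def set(self, task_id: str, result: Any) -> None:
        self._data[task_id] = (self.clock() + self.ttl, result)

    def get(self, task_id: str, default: Any = None) -> Any:
        entry = self._data.get(task_id)
        if entry is None:
            return default
        expires_at, result = entry
        if self.clock() >= expires_at:
            del self._data[task_id]  # lazy expiry: removed on the first read after the deadline
            return default
        return result


fake_now = [0.0]
store = ResultStore(ttl=3600, clock=lambda: fake_now[0])
store.set("task-1", {"total": 42})
fake_now[0] = 3599.0
before = store.get("task-1")
fake_now[0] = 3600.0
after = store.get("task-1")
print(before, after)  # {'total': 42} None
```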

Error Handling

import logging

from cifra.tasks import task, TaskFailed, TaskRetry

logger = logging.getLogger(__name__)

@task(retry=3)
async def robust_task():
    """Task with error handling"""

    try:
        # Do work
        result = await do_work()
        return result

    except TemporaryError as e:
        # Retry temporary errors
        logger.warning(f"Temporary error: {e}, will retry")
        raise TaskRetry(delay=60)

    except PermanentError as e:
        # Don't retry permanent errors
        logger.error(f"Permanent error: {e}")
        raise TaskFailed(str(e), retry=False)

    except Exception as e:
        # Log unexpected errors
        logger.exception(f"Unexpected error: {e}")
        raise

# Error callbacks
@task
async def on_failure(task_id, exception, traceback):
    """Called when task fails"""

    logger.error(f"Task {task_id} failed: {exception}")

    # Notify admin
    await send_email.delay(
        to='admin@example.com',
        subject=f'Task Failed: {task_id}',
        body=f'Error: {exception}\n\nTraceback:\n{traceback}'
    )

@task(on_failure=on_failure)
async def my_task():
    """Task with failure callback"""
    pass

Task Monitoring

Flower Dashboard

# Start Flower
cifra flower start --port 5555

# Access: http://localhost:5555

Metrics:
- Active tasks
- Task success/failure rates
- Task execution time
- Worker status
- Queue lengths


Custom Monitoring

from datetime import datetime, timedelta

from cifra.tasks import task_monitor

# Get metrics
stats = await task_monitor.get_stats()

print(f"Active tasks: {stats['active']}")
print(f"Completed today: {stats['completed_today']}")
print(f"Failed today: {stats['failed_today']}")
print(f"Average execution time: {stats['avg_execution_time']}s")

# Get task history
history = await task_monitor.get_history(
    task_name='send_email',
    from_date=datetime.now() - timedelta(days=7)
)

for record in history:
    print(f"{record.created_at}: {record.status} - {record.execution_time}s")
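Metrics like "success/failure rate" and "average execution time" can be derived from history records like those above. This is a sketch over a hypothetical `TaskRecord` with only `status` and `execution_time` fields. Tasks still pending or running are excluded, so the rate reflects finished tasks only.

```python
from dataclasses import dataclass
from statistics import mean
from typing import Dict, List


@dataclass
class TaskRecord:
    status: str            # 'pending', 'running', 'success' or 'failed'
    execution_time: float  # seconds


def summarize(history: List[TaskRecord]) -> Dict[str, float]:
    """Success rate and mean execution time over finished tasks only."""
    finished = [r for r in history if r.status in ("success", "failed")]
    if not finished:
        return {"success_rate": 0.0, "avg_execution_time": 0.0}
    successes = sum(r.status == "success" for r in finished)
    return {
        "success_rate": successes / len(finished),
        "avg_execution_time": mean(r.execution_time for r in finished),
    }


history = [
    TaskRecord("success", 1.0),
    TaskRecord("success", 2.0),
    TaskRecord("failed", 4.0),
    TaskRecord("success", 1.0),
    TaskRecord("running", 0.0),  # ignored: not finished yet
]
summary = summarize(history)
print(summary)  # {'success_rate': 0.75, 'avg_execution_time': 2.0}
```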

Real-world Examples

Example 1: Bulk Email Campaign

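from cifra.tasks import task, chord

@task
async def send_campaign(campaign_id: UUID):
    """Send email campaign to all subscribers"""

    campaign = await Campaign.get(campaign_id)
    subscribers = await campaign.get_subscribers()

    # Create subtasks for each batch
    batch_size = 100
    tasks = []

    for i in range(0, len(subscribers), batch_size):
        batch = subscribers[i:i + batch_size]
        tasks.append(send_campaign_batch.s(campaign_id, [s.id for s in batch]))

    # Run the batches in parallel and update the campaign in a chord callback.
    # Waiting on subtasks from inside a task ties up a worker slot and can
    # deadlock once every worker is waiting.
    await chord(tasks)(mark_campaign_sent.s(campaign_id))

@task
async def mark_campaign_sent(batch_results: list, campaign_id: UUID):
    """Chord callback: runs once every batch has finished"""

    campaign = await Campaign.get(campaign_id)
    await campaign.update({'status': 'sent', 'sent_at': datetime.now()})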

@task
async def send_campaign_batch(campaign_id: UUID, subscriber_ids: list):
    """Send campaign to batch of subscribers"""

    campaign = await Campaign.get(campaign_id)

    for subscriber_id in subscriber_ids:
        subscriber = await Subscriber.get(subscriber_id)

        await send_email.delay(
            to=subscriber.email,
            subject=campaign.subject,
            body=render_template(campaign.template, subscriber=subscriber)
        )
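The slicing loop above (`subscribers[i:i + batch_size]`) needs the whole list in memory. A generator-based batcher works with any iterable, including a lazily fetched result set. This is a sketch with a hypothetical `batched` helper. Python 3.12 ships `itertools.batched`, which yields tuples instead of lists.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of up to `size` items; the last batch may be shorter."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


print([len(b) for b in batched(range(250), 100)])  # [100, 100, 50]
```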

Example 2: Data Import

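import csv
import io

# bind=True passes the task instance as `self` (assumed to follow the Celery convention)
@task(bind=True)
async def import_csv(self, file_id: UUID):
    """Import contacts from CSV file"""

    file = await FileAttachment.get(file_id)

    # Download file
    content = await storage.download(file.storage_path)

    # Parse CSV; StringIO with newline='' keeps quoted fields that contain newlines intact
    reader = csv.DictReader(io.StringIO(content.decode('utf-8'), newline=''))
    rows = list(reader)

    total = len(rows)
    await self.update_progress(0, f"Starting import of {total} rows")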

    # Import in batches
    batch_size = 100
    for i in range(0, total, batch_size):
        batch = rows[i:i + batch_size]

        # Import batch
        for row in batch:
            await Contact.create({
                'first_name': row['first_name'],
                'last_name': row['last_name'],
                'email': row['email'],
                'phone': row.get('phone'),
            })

        # Update progress
        progress = (i + len(batch)) / total * 100
        await self.update_progress(
            progress,
            f"Imported {i + len(batch)} of {total} rows"
        )

    return {'imported': total}
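The Python `csv` documentation asks for `newline=''` when reading. Without it, newlines embedded inside quoted fields are not interpreted correctly. Here is a sketch of parsing downloaded bytes that way. It assumes UTF-8 content, and decoding with `utf-8-sig` also drops a leading byte-order mark, which spreadsheet exports often add. `parse_contacts`, the `notes` column and the sample data are made up for illustration.

```python
import csv
import io
from typing import Dict, List


def parse_contacts(content: bytes) -> List[Dict[str, str]]:
    """Decode CSV bytes and return one dict per row, keyed by the header."""
    text = content.decode("utf-8-sig")  # strips a leading BOM if present
    reader = csv.DictReader(io.StringIO(text, newline=""))
    return list(reader)


raw = (
    "\ufefffirst_name,last_name,email,phone,notes\r\n"
    "Ada,Lovelace,ada@example.com,,\r\n"
    'Grace,Hopper,grace@example.com,555-0100,"line one\nline two"\r\n'
).encode("utf-8")

rows = parse_contacts(raw)
print(rows[1]["notes"])  # the embedded newline survives: "line one" / "line two"
```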

Example 3: Report Generation

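# 10-minute timeout; bind=True passes the task instance as `self` (assumed, as in Celery)
@task(bind=True, timeout=600)
async def generate_monthly_report(self, month: int, year: int):
    """Generate monthly sales report"""

    # Query data for [first day of month, first day of next month)
    await self.update_progress(10, "Querying sales data...")

    start = datetime(year, month, 1)
    end = datetime(year + 1, 1, 1) if month == 12 else datetime(year, month + 1, 1)

    sales = await Sale.filter(
        created_at__gte=start,
        created_at__lt=end
    ).all()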

    await self.update_progress(30, "Aggregating data...")

    # Aggregate
    report_data = {
        'total_sales': sum(s.amount for s in sales),
        'total_count': len(sales),
        'by_category': {},
        'by_day': {}
    }

    await self.update_progress(50, "Generating charts...")

    # Generate charts
    chart_urls = await generate_charts(report_data)

    await self.update_progress(70, "Generating PDF...")

    # Generate PDF
    pdf = await pdf_generator.generate(
        template='monthly_report.html',
        data=report_data,
        charts=chart_urls
    )

    await self.update_progress(90, "Saving report...")

    # Upload PDF
    file_id = await storage.save(pdf, path=f"reports/{year}-{month:02d}.pdf")

    # Create record
    report = await Report.create({
        'month': month,
        'year': year,
        'file_url': storage.url(file_id),
        'data': report_data
    })

    await self.update_progress(100, "Report completed")

    return {'report_id': report.id, 'url': report.file_url}
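A monthly report queries a half-open range from the first day of the month up to, but excluding, the first day of the next month. Building the upper bound as `datetime(year, month + 1, 1)` raises `ValueError` when `month` is 12. A small helper keeps the December rollover in one place. `month_bounds` is a hypothetical name.

```python
from datetime import datetime
from typing import Tuple


def month_bounds(year: int, month: int) -> Tuple[datetime, datetime]:
    """Return [start, end) for a calendar month; December rolls into next January."""
    if not 1 <= month <= 12:
        raise ValueError("month must be in 1..12")
    start = datetime(year, month, 1)
    end = datetime(year + 1, 1, 1) if month == 12 else datetime(year, month + 1, 1)
    return start, end


print(month_bounds(2025, 12))
# (datetime.datetime(2025, 12, 1, 0, 0), datetime.datetime(2026, 1, 1, 0, 0))
```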

Best Practices

  1. Keep tasks small and focused - One task = one responsibility
  2. Make tasks idempotent - Running a task twice has the same effect as running it once, so retries are safe
  3. Use exponential backoff - For retry delays
  4. Store task results - For expensive computations
  5. Monitor task performance - Track execution time and failure rates
  6. Use appropriate queues - Separate CPU/IO/priority tasks
  7. Set timeouts - Prevent hanging tasks
  8. Log errors - With context for debugging
  9. Test tasks - Unit test task logic separately
  10. Use task chains - For complex workflows
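Idempotency (practice 2) is often implemented with an idempotency key. The side effect runs only the first time a given key is seen, and later attempts with the same key get the stored outcome back. This is a single-process sketch: `IdempotentRunner` and the key format are hypothetical. In production the key store must be shared and durable, for example a database unique constraint. The result is recorded only after the action succeeds, so a failed attempt can still be retried.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class IdempotentRunner:
    """Run a side effect at most once per key; repeats return the stored result."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    async def run(self, key: str, action: Callable[[], Awaitable[Any]]) -> Any:
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:  # serialize concurrent attempts that share a key
            if key in self._results:
                return self._results[key]
            result = await action()
            self._results[key] = result  # recorded only after success
            return result


charges = []


async def charge() -> str:
    charges.append(100)  # the side effect we must not repeat
    return "charge-1"


async def demo():
    runner = IdempotentRunner()
    first = await runner.run("order-42:charge", charge)
    retried = await runner.run("order-42:charge", charge)  # e.g. a task retry
    return first, retried


result = asyncio.run(demo())
print(result, charges)  # ('charge-1', 'charge-1') [100]
```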

Configuration

# config.yaml
background_jobs:
  # Broker
  broker:
    type: redis  # redis, rabbitmq
    url: redis://localhost:6379/0

  # Result backend
  result_backend:
    type: redis
    url: redis://localhost:6379/1
    expires: 3600  # Results expire after 1 hour

  # Workers
  workers:
    default_concurrency: 4
    prefetch_multiplier: 4
    max_tasks_per_child: 1000

  # Retries
  retry:
    max_retries: 3
    default_retry_delay: 60
    retry_backoff: true
    retry_backoff_max: 600

  # Monitoring
  monitoring:
    flower:
      enabled: true
      port: 5555
    metrics:
      enabled: true
      prometheus_port: 9090

Documentation: https://docs.cifra.io/background-jobs