Версия: 1.0.0
Дата: 2025-11-10
API Request → [Task Queue] → Worker Pool → Execute Task → Update Status
                   ↓
             Redis/RabbitMQ
                   ↓
              [Scheduler] → Cron Tasks
Components:
- Task Queue: Redis (Celery) or RabbitMQ
- Worker Pool: Multiple worker processes
- Scheduler: Celery Beat for cron tasks
- Monitoring: Flower dashboard
from cifra.tasks import task
@task(name='send_email')
async def send_email(to: str, subject: str, body: str):
    """Send one email through ``email_service``.

    Args:
        to: Recipient address.
        subject: Subject line.
        body: Message body.

    Returns:
        A dict with ``status`` set to ``'sent'`` and the recipient under ``to``.
    """
    message = {'to': to, 'subject': subject, 'body': body}
    await email_service.send(**message)
    return {'status': 'sent', 'to': to}
# Enqueue task
await send_email.delay(
to='user@example.com',
subject='Welcome',
body='Hello!'
)
@task(
    name='process_order',
    retry=3,             # Retry 3 times
    retry_delay=60,      # Wait 60 seconds between retries
    timeout=300,         # Timeout after 5 minutes
    priority=5,          # Priority 0-9 (9=highest)
    max_concurrency=10   # Max 10 concurrent executions
)
async def process_order(order_id: UUID):
    """Charge an order, reserve its items and queue the confirmation email.

    Args:
        order_id: Identifier passed to ``Order.get``.

    Returns:
        ``{'status': 'success', 'order_id': order_id}`` after the order has
        been updated to status ``'processed'``.

    Raises:
        TaskFailed: On ``PaymentError``; raised with ``retry=False``.
        TaskRetry: On ``InventoryError``; asks for a retry after 120 seconds.
    """
    try:
        order = await Order.get(order_id)

        # Process payment (the charge result is not used afterwards).
        # NOTE(review): a retry triggered by InventoryError below re-enters
        # this function and calls charge() again — confirm that
        # payment_service.charge is idempotent per order.
        await payment_service.charge(order)

        # Update inventory
        await inventory_service.reserve(order.items)

        # Send confirmation
        await send_email.delay(
            to=order.customer.email,
            subject='Order Confirmation',
            body=f'Your order {order.id} is confirmed'
        )

        await order.update({'status': 'processed'})
        return {'status': 'success', 'order_id': order_id}
    except PaymentError as e:
        # Don't retry payment errors; keep the original error as the cause
        raise TaskFailed(f"Payment failed: {e}", retry=False) from e
    except InventoryError as e:
        # Retry inventory errors
        logger.warning(f"Inventory error: {e}, will retry")
        raise TaskRetry(delay=120) from e  # Retry after 2 minutes
# Fire and forget
task_id = await send_email.delay(to='user@example.com', ...)
# Get result later
result = await task_id.get()
# Block until complete
result = await send_email.apply(to='user@example.com', ...)
from datetime import datetime, timedelta
# Execute in 1 hour
await send_email.apply_at(
datetime.now() + timedelta(hours=1),
to='user@example.com',
...
)
# Execute at specific time
await send_email.apply_at(
datetime(2025, 11, 15, 10, 0, 0),
to='user@example.com',
...
)
from cifra.tasks import scheduled_task
@scheduled_task(cron='0 2 * * *')  # Every day at 2 AM
async def daily_backup():
    """Back up the database, then the uploads.

    Returns:
        A dict with ``status`` ``'completed'`` and the completion time from
        ``datetime.now()`` under ``timestamp``.
    """
    await backup_service.backup_database()
    await backup_service.backup_uploads()
    finished_at = datetime.now()
    return {'status': 'completed', 'timestamp': finished_at}
# Cron syntax:
# * * * * * = minute hour day month weekday
# 0 2 * * * = 2:00 AM every day
# 0 */4 * * * = Every 4 hours
# 0 9 * * 1 = Every Monday at 9 AM
# 0 0 1 * * = First day of month at midnight
from cifra.tasks import chain, group
# Sequential execution (chain)
result = await chain(
download_file.s(url='https://example.com/data.csv'),
parse_csv.s(),
import_contacts.s()
).apply_async()
# Parallel execution (group)
results = await group(
send_email.s(to='user1@example.com'),
send_email.s(to='user2@example.com'),
send_email.s(to='user3@example.com')
).apply_async()
# Wait for all to complete
await results.wait()
from cifra.tasks import chord
# Chord: Group + Callback
# Run tasks in parallel, then execute callback with results
result = await chord([
process_chunk.s(chunk_id=1),
process_chunk.s(chunk_id=2),
process_chunk.s(chunk_id=3)
])(merge_results.s())
# Example: Process large dataset
@task
async def process_chunk(chunk_id: int):
    """Load one chunk and process its records one after another.

    Args:
        chunk_id: Identifier passed to ``load_chunk``.

    Returns:
        A list with the ``process_record`` result of every record, in
        chunk order.
    """
    chunk = await load_chunk(chunk_id)
    # Each record is awaited before the next one starts.
    return [await process_record(record) for record in chunk]
@task
async def merge_results(chunk_results: list):
    """Flatten the per-chunk result lists and save the combined list.

    Args:
        chunk_results: One iterable of results per processed chunk.

    Returns:
        ``{'total': n}`` where ``n`` is the number of merged results.
    """
    all_results = [item for chunk in chunk_results for item in chunk]
    await save_final_results(all_results)
    return {'total': len(all_results)}
from cifra.tasks import TaskStatus
# Get task status
task = await TaskStatus.get(task_id)
print(task.status) # pending, running, success, failed
print(task.progress) # 0-100
print(task.result) # Task result (if completed)
print(task.error) # Error message (if failed)
# Update progress from inside task
@task
async def long_running_task(total_items: int):
    """Process items one by one and report progress after each of them.

    NOTE(review): neither ``items`` nor ``self`` is defined in this snippet;
    both have to come from the surrounding code or the framework — confirm
    how the task obtains them.

    Args:
        total_items: Item count used to compute the progress percentage.

    Returns:
        ``{'processed': total_items}``.
    """
    for i, item in enumerate(items):
        await process_item(item)

        # Update progress
        progress = {
            'percent': (i + 1) / total_items * 100,
            'message': f"Processed {i + 1} of {total_items}",
        }
        await self.update_progress(**progress)

    return {'processed': total_items}
Frontend tracking:
/**
 * Poll `/api/tasks/<taskId>` until the task reports `success` or `failed`,
 * logging the progress on every poll.
 *
 * @param {string} taskId - Identifier of the task to track.
 * @param {number} [intervalMs=1000] - Delay between two polls, in milliseconds.
 * @returns {Promise<void>} Resolves once the task has finished.
 */
async function trackTask(taskId, intervalMs = 1000) {
  // Promise-based delay defined locally, so the snippet does not depend on
  // a global `sleep` helper (JavaScript has no built-in one).
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  while (true) {
    const task = await fetch(`/api/tasks/${taskId}`).then(r => r.json());

    console.log(`Progress: ${task.progress}%`);

    if (task.status === 'success') {
      console.log('Task completed:', task.result);
      break;
    }

    if (task.status === 'failed') {
      console.error('Task failed:', task.error);
      break;
    }

    await pause(intervalMs); // Check every second by default
  }
}
@task(retry=3, retry_delay=60)
async def api_call_with_retry(url: str):
    """Fetch ``url`` and retry with exponential backoff on 500/502/503.

    NOTE(review): ``self`` is not a parameter of this function; the snippet
    presumably relies on the framework binding the task instance — confirm.

    Args:
        url: Address passed to ``http_client.get``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        TaskRetry: On HTTP 500, 502 or 503; the delay doubles per retry.
        TaskFailed: On any other ``HTTPError``; raised with ``retry=False``.
    """
    try:
        response = await http_client.get(url)
        return response.json()
    except HTTPError as e:
        if e.status_code not in (500, 502, 503):
            # Don't retry any other HTTP error
            raise TaskFailed(f"API call failed: {e}", retry=False) from e

        # Retry on server errors
        # Exponential backoff: 60s, 120s, 240s
        retry_count = self.request.retries
        delay = 60 * (2 ** retry_count)

        logger.warning(f"API call failed, retry in {delay}s")
        raise TaskRetry(delay=delay) from e
# High priority (executed first)
@task(priority=9)
async def urgent_task():
    """Placeholder for a critical task registered with priority 9."""
# Normal priority
@task(priority=5)
async def normal_task():
    """Placeholder for a regular task registered with priority 5."""
# Low priority (executed last)
@task(priority=1)
async def batch_task():
    """Placeholder for batch processing registered with priority 1."""
# Enqueue with priority
await send_email.apply_async(
kwargs={'to': 'user@example.com'},
priority=9
)
from cifra.tasks import rate_limit
@task
@rate_limit(max_requests=100, window=60)  # 100 per minute
async def send_sms(phone: str, message: str):
    """Send one SMS through ``sms_service``.

    Args:
        phone: Destination number, passed first to ``sms_service.send``.
        message: Text to send, passed second.
    """
    recipient, text = phone, message
    await sms_service.send(recipient, text)
# worker_config.yaml
# Each entry under `workers` is one named worker profile.
workers:
  # Default worker
  default:
    concurrency: 4          # 4 concurrent tasks
    prefetch_multiplier: 4  # Prefetch 4 tasks per worker
    queues: [default]

  # High priority worker
  priority:
    concurrency: 2
    queues: [high_priority]

  # CPU-intensive worker
  cpu:
    concurrency: 2
    max_tasks_per_child: 100  # Restart after 100 tasks
    queues: [cpu_intensive]

  # IO-intensive worker
  io:
    concurrency: 20
    queues: [io_intensive]
Start workers:
# Default worker
cifra worker start --config worker_config.yaml --worker default
# All workers
cifra worker start --config worker_config.yaml --all
# Specific queue
cifra worker start --queue high_priority --concurrency 10
# Define queues
@task(queue='default')
async def normal_task():
    """Placeholder for a task assigned to the ``default`` queue."""
@task(queue='high_priority')
async def urgent_task():
    """Placeholder for a task assigned to the ``high_priority`` queue."""
@task(queue='cpu_intensive')
async def heavy_computation():
    """Placeholder for a task assigned to the ``cpu_intensive`` queue."""
@task(queue='io_intensive')
async def download_file():
    """Placeholder for a task assigned to the ``io_intensive`` queue."""
# Route tasks to queues: registered task name -> routing options
task_routes = {
    'send_email': {
        'queue': 'default',
    },
    'process_video': {
        'queue': 'cpu_intensive',
    },
    'import_data': {
        'queue': 'io_intensive',
    },
    'urgent_notification': {
        'queue': 'high_priority',
    },
}
from cifra.tasks import scheduled_task
# Every minute
@scheduled_task(cron='* * * * *')
async def check_health():
    """Placeholder for a health check scheduled every minute."""
# Every 5 minutes
@scheduled_task(cron='*/5 * * * *')
async def sync_data():
    """Placeholder for a data sync scheduled every 5 minutes."""
# Every hour
@scheduled_task(cron='0 * * * *')
async def cleanup_temp():
    """Placeholder for a temp-file cleanup scheduled at the top of every hour."""
# Every day at 2 AM
@scheduled_task(cron='0 2 * * *')
async def daily_report():
    """Placeholder for a daily report scheduled at 2 AM."""
# Every Monday at 9 AM
@scheduled_task(cron='0 9 * * 1')
async def weekly_summary():
    """Placeholder for a weekly summary scheduled on Mondays at 9 AM."""
# First day of month
@scheduled_task(cron='0 0 1 * *')
async def monthly_billing():
    """Placeholder for monthly billing scheduled at midnight on the 1st."""
Manage scheduled tasks:
# List scheduled tasks
cifra scheduler list
# Enable task
cifra scheduler enable daily_report
# Disable task
cifra scheduler disable daily_report
# Run task now
cifra scheduler run daily_report
from cifra.tasks import task
@task(store_result=True, result_expires=3600)  # Store for 1 hour
async def expensive_computation(data):
    """Run ``compute`` on ``data`` and return its result unchanged.

    Args:
        data: Input forwarded to ``compute``.

    Returns:
        Whatever ``compute(data)`` produces.
    """
    return await compute(data)
# Get result
task_id = await expensive_computation.delay(data)
# Result is cached for 1 hour
result = await task_id.get() # Fast (from cache)
from cifra.tasks import task, TaskFailed, TaskRetry
@task(retry=3)
async def robust_task():
    """Run ``do_work`` and map its errors onto retry and failure signals.

    Returns:
        The result of ``do_work()``.

    Raises:
        TaskRetry: On ``TemporaryError``; asks for a retry after 60 seconds.
        TaskFailed: On ``PermanentError``; raised with ``retry=False``.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        # Do work
        result = await do_work()
        return result
    except TemporaryError as e:
        # Retry temporary errors
        logger.warning(f"Temporary error: {e}, will retry")
        raise TaskRetry(delay=60) from e
    except PermanentError as e:
        # Don't retry permanent errors; keep the original error as the cause
        logger.error(f"Permanent error: {e}")
        raise TaskFailed(str(e), retry=False) from e
    except Exception as e:
        # Log unexpected errors
        logger.exception(f"Unexpected error: {e}")
        raise
# Error callbacks
@task
async def on_failure(task_id, exception, traceback):
    """Log a task failure and queue a notification email to the admin.

    Args:
        task_id: Identifier of the failed task.
        exception: The error that made the task fail.
        traceback: Traceback text included in the email body.
    """
    logger.error(f"Task {task_id} failed: {exception}")

    # Notify admin
    alert = dict(
        to='admin@example.com',
        subject=f'Task Failed: {task_id}',
        body=f'Error: {exception}\n\nTraceback:\n{traceback}',
    )
    await send_email.delay(**alert)
@task(on_failure=on_failure)
async def my_task():
    """Placeholder for a task registered with ``on_failure`` as its failure callback."""
# Start Flower
cifra flower start --port 5555
# Access: http://localhost:5555
Metrics:
- Active tasks
- Task success/failure rates
- Task execution time
- Worker status
- Queue lengths
from cifra.tasks import task_monitor
# Get metrics
stats = await task_monitor.get_stats()
print(f"Active tasks: {stats['active']}")
print(f"Completed today: {stats['completed_today']}")
print(f"Failed today: {stats['failed_today']}")
print(f"Average execution time: {stats['avg_execution_time']}s")
# Get task history
history = await task_monitor.get_history(
task_name='send_email',
from_date=datetime.now() - timedelta(days=7)
)
for task in history:
print(f"{task.created_at}: {task.status} - {task.execution_time}s")
@task
async def send_campaign(campaign_id: UUID):
    """Fan a campaign out to its subscribers in batches of 100.

    One ``send_campaign_batch`` signature is built per batch, the batches
    are run as a group, and the campaign is marked ``'sent'`` after the
    group has been waited on.

    Args:
        campaign_id: Identifier passed to ``Campaign.get``.
    """
    campaign = await Campaign.get(campaign_id)
    subscribers = await campaign.get_subscribers()

    # Create subtasks for each batch
    batch_size = 100
    tasks = [
        send_campaign_batch.s(
            campaign_id,
            [s.id for s in subscribers[start:start + batch_size]],
        )
        for start in range(0, len(subscribers), batch_size)
    ]

    # Execute in parallel
    result = await group(tasks).apply_async()

    # Wait for all to complete
    await result.wait()

    # Update campaign status
    await campaign.update({'status': 'sent', 'sent_at': datetime.now()})
@task
async def send_campaign_batch(campaign_id: UUID, subscriber_ids: list):
    """Queue one campaign email per subscriber in the batch.

    Args:
        campaign_id: Identifier passed to ``Campaign.get``.
        subscriber_ids: Identifiers looked up one by one via ``Subscriber.get``.
    """
    campaign = await Campaign.get(campaign_id)

    for subscriber_id in subscriber_ids:
        subscriber = await Subscriber.get(subscriber_id)
        # The body is rendered per subscriber from the campaign template.
        email = dict(
            to=subscriber.email,
            subject=campaign.subject,
            body=render_template(campaign.template, subscriber=subscriber),
        )
        await send_email.delay(**email)
@task
async def import_csv(file_id: UUID):
    """Import contacts from a stored CSV file, reporting progress per batch.

    NOTE(review): ``self`` is not a parameter of this function; the snippet
    presumably relies on the framework binding the task instance — confirm.

    Args:
        file_id: Identifier passed to ``FileAttachment.get``.

    Returns:
        ``{'imported': total}`` where ``total`` is the number of data rows.

    Raises:
        KeyError: If the CSV header lacks a ``first_name``, ``last_name``
            or ``email`` column.
    """
    import csv
    import io

    file = await FileAttachment.get(file_id)

    # Download file
    content = await storage.download(file.storage_path)

    # Parse CSV
    # 'utf-8-sig' drops a leading byte-order mark that would otherwise end up
    # in the first column name. Feeding csv a text stream opened with
    # newline='' (instead of pre-split lines) keeps line breaks inside
    # quoted fields intact.
    text = content.decode('utf-8-sig')
    reader = csv.DictReader(io.StringIO(text, newline=''))
    rows = list(reader)

    total = len(rows)
    await self.update_progress(0, f"Starting import of {total} rows")

    # Import in batches
    batch_size = 100
    for i in range(0, total, batch_size):
        batch = rows[i:i + batch_size]

        # Import batch
        for row in batch:
            await Contact.create({
                'first_name': row['first_name'],
                'last_name': row['last_name'],
                'email': row['email'],
                'phone': row.get('phone'),
            })

        # Update progress
        progress = (i + len(batch)) / total * 100
        await self.update_progress(
            progress,
            f"Imported {i + len(batch)} of {total} rows"
        )

    return {'imported': total}
@task(timeout=600)  # 10 minutes timeout
async def generate_monthly_report(month: int, year: int):
    """Build the monthly sales report as a PDF and store a report record.

    NOTE(review): ``self`` is not a parameter of this function; the snippet
    presumably relies on the framework binding the task instance — confirm.

    Args:
        month: Calendar month, 1-12.
        year: Calendar year.

    Returns:
        ``{'report_id': ..., 'url': ...}`` taken from the created report.
    """
    # Query data
    await self.update_progress(10, "Querying sales data...")

    # Half-open range [first day of month, first day of next month).
    # December rolls over to January of the following year;
    # datetime(year, 13, 1) would raise ValueError.
    period_start = datetime(year, month, 1)
    if month == 12:
        period_end = datetime(year + 1, 1, 1)
    else:
        period_end = datetime(year, month + 1, 1)

    sales = await Sale.filter(
        created_at__gte=period_start,
        created_at__lt=period_end
    ).all()

    await self.update_progress(30, "Aggregating data...")

    # Aggregate ('by_category' and 'by_day' are left empty in this example)
    report_data = {
        'total_sales': sum(s.amount for s in sales),
        'total_count': len(sales),
        'by_category': {},
        'by_day': {}
    }

    await self.update_progress(50, "Generating charts...")

    # Generate charts
    chart_urls = await generate_charts(report_data)

    await self.update_progress(70, "Generating PDF...")

    # Generate PDF
    pdf = await pdf_generator.generate(
        template='monthly_report.html',
        data=report_data,
        charts=chart_urls
    )

    await self.update_progress(90, "Saving report...")

    # Upload PDF
    file_id = await storage.save(pdf, path=f"reports/{year}-{month:02d}.pdf")

    # Create record
    report = await Report.create({
        'month': month,
        'year': year,
        'file_url': storage.url(file_id),
        'data': report_data
    })

    await self.update_progress(100, "Report completed")

    return {'report_id': report.id, 'url': report.file_url}
# config.yaml
background_jobs:
  # Broker
  broker:
    type: redis  # redis, rabbitmq
    url: redis://localhost:6379/0

  # Result backend
  result_backend:
    type: redis
    url: redis://localhost:6379/1
    expires: 3600  # Results expire after 1 hour

  # Workers
  workers:
    default_concurrency: 4
    prefetch_multiplier: 4
    max_tasks_per_child: 1000

  # Retries
  retry:
    max_retries: 3
    default_retry_delay: 60
    retry_backoff: true
    retry_backoff_max: 600

  # Monitoring
  monitoring:
    flower:
      enabled: true
      port: 5555
    metrics:
      enabled: true
      prometheus_port: 9090
Документация: https://docs.cifra.io/background-jobs