Версия: 1.0.0
Дата: 2025-11-10
Report Definition (YAML) → Query Engine → Data → Template → PDF/Excel/CSV
↓
Cache Layer
↓
Background Job (for large reports)
report:
  id: sales_by_month
  name: "Monthly Sales Report"
  description: "Sales breakdown by month and category"

  # Data source
  query:
    entity: Sale
    filters:
      # Placeholders in double braces refer to the parameters declared below.
      created_at: {$date_range: ["{{start_date}}", "{{end_date}}"]}
      status: "completed"

    # Aggregations
    aggregations:
      - field: amount
        operation: sum
        alias: total_sales
      - field: id
        operation: count
        alias: total_count
      - field: amount
        operation: avg
        alias: avg_sale

    # Group by
    group_by:
      - field: "DATE_TRUNC('month', created_at)"
        alias: month
      - field: category

    # Sort (a leading "-" presumably means descending; confirm with the engine)
    sort: [month, -total_sales]

  # Parameters (user input)
  parameters:
    start_date:
      type: date
      label: "Start Date"
      default: "{{first_day_of_month}}"
      required: true
    end_date:
      type: date
      label: "End Date"
      default: "{{today}}"
      required: true
    # NOTE(review): `category` is declared here but not referenced by
    # `query.filters` above; confirm whether a category filter is intended.
    category:
      type: select
      label: "Category"
      options:
        - Electronics
        - Clothing
        - Food
      multiple: true
      required: false

  # Output formats
  outputs:
    - type: pdf
      template: sales_report.html
      orientation: portrait
    - type: excel
      sheet_name: "Sales Data"
    - type: csv

  # Visualization
  charts:
    - type: bar
      title: "Sales by Month"
      x: month
      y: total_sales
    - type: pie
      title: "Sales by Category"
      label: category
      value: total_sales
from cifra.reports import ReportBuilder
# Builder instance used for the generate() call below.
builder = ReportBuilder()
# Generate report
# NOTE: `await` appears at top level here, so this snippet has to run inside
# an async function (or an async-aware REPL) to be valid Python.
report = await builder.generate(
report_id='sales_by_month',
parameters={
'start_date': '2025-01-01',
'end_date': '2025-12-31',
'category': ['Electronics', 'Clothing']
},
format='pdf'
)
# Get URL
url = report.url
# → https://example.com/reports/abc-123.pdf
class ReportQueryEngine:
    """Executes declarative report queries and returns tabular results."""

    async def execute_query(self, query_def: dict) -> pd.DataFrame:
        """Run the query described by ``query_def`` and return its rows.

        The base statement comes from ``self.build_query``. Every entry of
        ``query_def['filters']`` (if any) is applied through
        ``self.apply_filter``; then the optional ``aggregations``,
        ``group_by`` and ``sort`` sections are applied, in that order, only
        when the corresponding key is present.

        Args:
            query_def: Query definition dictionary.

        Returns:
            A DataFrame built from all fetched rows, with columns named
            after ``result.keys()``.
        """
        statement = self.build_query(query_def)

        # Filters are optional; a missing key behaves like an empty mapping.
        filters = query_def.get('filters', {})
        for field_name in filters:
            statement = self.apply_filter(statement, field_name, filters[field_name])

        # Optional stages, listed in the order they must be applied. Methods
        # are looked up by name so that a stage is only resolved when its
        # key is actually present in the definition.
        optional_stages = (
            ('aggregations', 'apply_aggregations'),
            ('group_by', 'apply_group_by'),
            ('sort', 'apply_sort'),
        )
        for key, method_name in optional_stages:
            if key not in query_def:
                continue
            apply_stage = getattr(self, method_name)
            statement = apply_stage(statement, query_def[key])

        result = await db.execute(statement)
        rows = result.fetchall()
        return pd.DataFrame(rows, columns=result.keys())
Example query:
# Report definition
# ReportQueryEngine.execute_query reads the optional keys 'filters',
# 'aggregations', 'group_by' and 'sort' from this dictionary; 'entity' is
# presumably consumed by build_query (not shown here; verify).
query_def = {
'entity': 'Sale',
'filters': {
'created_at': {'$gte': '2025-01-01'},
'status': 'completed'
},
'aggregations': [
{'field': 'amount', 'operation': 'sum', 'alias': 'total_sales'},
{'field': 'id', 'operation': 'count', 'alias': 'count'}
],
'group_by': [
{'field': "DATE_TRUNC('month', created_at)", 'alias': 'month'}
],
'sort': ['month']
}
# Execute
# NOTE: top-level `await`; run inside an async function or async-aware REPL.
df = await query_engine.execute_query(query_def)
# Result DataFrame:
# month total_sales count
# 0 2025-01-01 125000 450
# 1 2025-02-01 142000 520
# 2 2025-03-01 138000 498
<!-- sales_report.html -->
<!DOCTYPE html>
<html>
<head>
<style>
body { font-family: Arial, sans-serif; }
table { width: 100%; border-collapse: collapse; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #4CAF50; color: white; }
.chart { margin: 20px 0; }
</style>
</head>
<body>
{# Context variables read by this template: report, now, parameters, summary, data, charts. #}
{# NOTE(review): `number_format(2)` and `date('F Y')` look like Twig-style filters; if this is rendered with Jinja2 they would have to be registered as custom filters. Confirm against the renderer. #}
<h1>{{ report.name }}</h1>
<p>Generated: {{ now }}</p>
<p>Period: {{ parameters.start_date }} - {{ parameters.end_date }}</p>
<h2>Summary</h2>
{# Overall totals: summary.total_sales, summary.total_count, summary.avg_sale. #}
<table>
<tr>
<th>Total Sales</th>
<td>${{ summary.total_sales | number_format(2) }}</td>
</tr>
<tr>
<th>Total Orders</th>
<td>{{ summary.total_count }}</td>
</tr>
<tr>
<th>Average Order</th>
<td>${{ summary.avg_sale | number_format(2) }}</td>
</tr>
</table>
<h2>Sales by Month</h2>
<table>
<thead>
<tr>
<th>Month</th>
<th>Sales</th>
<th>Orders</th>
<th>Avg Order</th>
</tr>
</thead>
<tbody>
{# One table row per element of `data`; each row exposes month, total_sales and total_count. #}
{% for row in data %}
<tr>
<td>{{ row.month | date('F Y') }}</td>
<td>${{ row.total_sales | number_format(2) }}</td>
<td>{{ row.total_count }}</td>
{# The average is computed inline as total_sales / total_count, so it assumes total_count is non-zero. #}
<td>${{ (row.total_sales / row.total_count) | number_format(2) }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{# Chart images are referenced by the values of charts.sales_by_month and charts.sales_by_category. #}
<div class="chart">
<img src="{{ charts.sales_by_month }}" alt="Sales by Month">
</div>
<div class="chart">
<img src="{{ charts.sales_by_category }}" alt="Sales by Category">
</div>
</body>
</html>
from cifra.charts import ChartBuilder
import matplotlib.pyplot as plt
class ChartBuilder:
    """Renders chart definitions to PNG files and uploads them to storage."""

    # Chart type -> name of the coroutine method that renders it. Methods are
    # resolved by name at call time; `line_chart` and `pie_chart` are not
    # part of this excerpt.
    _CHART_HANDLERS = {
        'bar': 'bar_chart',
        'line': 'line_chart',
        'pie': 'pie_chart',
    }

    async def generate_chart(self, chart_def: dict, data: pd.DataFrame) -> str:
        """Generate a chart and return the value produced by its renderer.

        Args:
            chart_def: Chart definition; ``chart_def['type']`` selects the
                renderer ('bar', 'line' or 'pie').
            data: Data frame passed through to the renderer.

        Returns:
            Whatever the selected renderer returns (for ``bar_chart`` this
            is the result of ``storage.save``).

        Raises:
            KeyError: If ``chart_def`` has no ``'type'`` entry.
            ValueError: If the chart type is not supported. Previously an
                unknown type silently returned ``None`` despite the ``str``
                return annotation.
        """
        chart_type = chart_def['type']
        handler_name = self._CHART_HANDLERS.get(chart_type)
        if handler_name is None:
            supported = ', '.join(sorted(self._CHART_HANDLERS))
            raise ValueError(
                f"Unsupported chart type {chart_type!r}; expected one of: {supported}"
            )
        return await getattr(self, handler_name)(chart_def, data)

    async def bar_chart(self, chart_def: dict, data: pd.DataFrame) -> str:
        """Render a bar chart to a PNG and upload it.

        Args:
            chart_def: Must provide ``'x'`` and ``'y'`` (column names in
                ``data``) and ``'title'``.
            data: Source data frame.

        Returns:
            The value returned by ``storage.save`` for the uploaded image.
        """
        import os

        chart_dir = '/tmp/charts'
        filename = f"{uuid4()}.png"
        filepath = f"{chart_dir}/{filename}"

        fig, ax = plt.subplots(figsize=(10, 6))
        try:
            ax.bar(data[chart_def['x']], data[chart_def['y']])
            ax.set_title(chart_def['title'])
            ax.set_xlabel(chart_def['x'])
            ax.set_ylabel(chart_def['y'])
            # Rotate x labels on this figure's axes rather than on pyplot's
            # implicit "current" axes.
            plt.setp(ax.get_xticklabels(), rotation=45, ha='right')

            # Save to temp file; make sure the directory exists first.
            os.makedirs(chart_dir, exist_ok=True)
            fig.savefig(filepath, bbox_inches='tight', dpi=150)
        finally:
            # Always release this specific figure, even when rendering fails.
            plt.close(fig)

        # Upload to storage, then drop the temp file so /tmp does not fill up.
        try:
            with open(filepath, 'rb') as f:
                return await storage.save(f, path=f"charts/{filename}")
        finally:
            if os.path.exists(filepath):
                os.remove(filepath)
from cifra.reports import PDFGenerator
class PDFGenerator:
    """Renders an HTML template and converts the result to PDF bytes."""

    _ORIENTATIONS = ('portrait', 'landscape')

    async def generate(
        self,
        template: str,
        data: dict,
        orientation: str = 'portrait'
    ) -> bytes:
        """Generate PDF from HTML template.

        Args:
            template: Template name passed to ``self.render_template``.
            data: Template context passed to ``self.render_template``.
            orientation: Page orientation, ``'portrait'`` or ``'landscape'``
                (case-insensitive). Previously this argument was accepted
                but never applied.

        Returns:
            The rendered PDF document as bytes.

        Raises:
            ValueError: If ``orientation`` is not a supported value.
        """
        page_orientation = orientation.lower()
        if page_orientation not in self._ORIENTATIONS:
            raise ValueError(
                f"Unsupported orientation {orientation!r}; "
                f"expected one of: {', '.join(self._ORIENTATIONS)}"
            )

        # Render template
        html = await self.render_template(template, data)

        # Convert to PDF (using WeasyPrint or wkhtmltopdf)
        from weasyprint import CSS, HTML

        # The orientation is supplied as an extra stylesheet, so an @page
        # rule inside the template itself can still take precedence.
        page_css = CSS(string=f'@page {{ size: {page_orientation}; }}')
        return HTML(string=html).write_pdf(stylesheets=[page_css])
# Usage
# NOTE(review): report_def, parameters, df, summary and chart_urls are not
# defined in this snippet; they must be provided by the surrounding code.
# The `await` calls also require an enclosing async context.
pdf_generator = PDFGenerator()
pdf = await pdf_generator.generate(
template='sales_report.html',
data={
'report': report_def,
'parameters': parameters,
'data': df.to_dict('records'),
'summary': summary,
'charts': chart_urls,
'now': datetime.now()
}
)
# Save to file
# The generated PDF bytes are handed to storage.save under a random
# (uuid4) file name in the "reports/" prefix.
file_id = await storage.save(pdf, path=f"reports/{uuid4()}.pdf")
from cifra.reports import ExcelGenerator
import pandas as pd
class ExcelGenerator:
    """Writes a DataFrame to an in-memory .xlsx workbook."""

    async def generate(
        self,
        data: pd.DataFrame,
        sheet_name: str = 'Sheet1',
        include_charts: bool = True
    ) -> bytes:
        """Generate Excel file.

        Args:
            data: Frame to export; written without the index.
            sheet_name: Worksheet name. Names containing spaces or other
                special characters are supported.
            include_charts: When true, and the frame has at least one row
                and two columns, add a column chart of column B over
                column A.

        Returns:
            The workbook contents as bytes.
        """
        from io import BytesIO

        output = BytesIO()
        row_count = len(data)

        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            # Write data
            data.to_excel(writer, sheet_name=sheet_name, index=False)

            # Get workbook and worksheet
            workbook = writer.book
            worksheet = writer.sheets[sheet_name]

            # Format header
            header_format = workbook.add_format({
                'bold': True,
                'bg_color': '#4CAF50',
                'font_color': 'white',
                'border': 1
            })

            for col_num, value in enumerate(data.columns.values):
                worksheet.write(0, col_num, value, header_format)

                # Auto-fit columns. An empty frame has no cells to measure
                # (max() would be NaN), and column labels need not be
                # strings, hence the guards.
                if row_count:
                    cells = data.iloc[:, col_num].astype(str)
                    longest_cell = int(cells.map(len).max())
                else:
                    longest_cell = 0
                width = max(longest_cell, len(str(value)))
                worksheet.set_column(col_num, col_num, width + 2)

            # Add chart (needs data rows and both the A and B columns)
            if include_charts and row_count and data.shape[1] >= 2:
                chart = workbook.add_chart({'type': 'column'})
                # List syntax [sheet, first_row, first_col, last_row,
                # last_col] lets xlsxwriter quote the sheet name itself; a
                # hand-built "=Sheet!$A$2" string breaks for names such as
                # "Sales Data".
                chart.add_series({
                    'categories': [sheet_name, 1, 0, row_count, 0],
                    'values': [sheet_name, 1, 1, row_count, 1],
                    'name': 'Sales'
                })
                chart.set_title({'name': 'Sales by Month'})
                worksheet.insert_chart('E2', chart)

        return output.getvalue()
async def generate_csv(data: pd.DataFrame) -> bytes:
    """Serialize ``data`` to CSV and return it as UTF-8 encoded bytes.

    The header row is included and the index column is omitted.
    """
    # With no target buffer, to_csv returns the CSV text directly.
    csv_text = data.to_csv(index=False)
    return csv_text.encode('utf-8')
from cifra.reports import ReportSchedule
# Schedule daily report
# NOTE: top-level `await`; run inside an async function or async-aware REPL.
# The '{{...}}' parameter values are stored as literal placeholder strings
# here; presumably they are resolved when the report is generated (verify).
schedule = await ReportSchedule.create({
'report_id': 'sales_by_month',
'cron': '0 8 * * *', # Every day at 8 AM
'parameters': {
'start_date': '{{first_day_of_month}}',
'end_date': '{{today}}'
},
'format': 'pdf',
'recipients': ['manager@example.com', 'ceo@example.com']
})
# Background job
@scheduled_task(cron='0 8 * * *')
async def run_scheduled_reports():
    """Run scheduled reports.

    Loads every enabled ``ReportSchedule``, generates its report with the
    stored parameters and format, and queues one e-mail per recipient with
    the report file attached.

    A failure while processing one schedule is logged and does not prevent
    the remaining schedules from running.

    NOTE(review): this task fires on its own fixed cron and processes all
    enabled schedules; the per-schedule ``cron`` value is not consulted
    here. Confirm whether that is intended.
    """
    import logging

    logger = logging.getLogger(__name__)

    schedules = await ReportSchedule.filter(enabled=True).all()
    for schedule in schedules:
        try:
            # Generate report
            report = await builder.generate(
                report_id=schedule.report_id,
                parameters=schedule.parameters,
                format=schedule.format
            )

            # Email to recipients
            for recipient in schedule.recipients:
                await send_email.delay(
                    to=recipient,
                    subject=f"Scheduled Report: {report.name}",
                    body="Your scheduled report is ready.",
                    attachments=[report.file]
                )
        except Exception:
            # Job boundary: record the failure and continue with the next
            # schedule instead of aborting the whole run.
            logger.exception(
                "Scheduled report %s failed", schedule.report_id
            )
parameters:
  # Text input
  search_term:
    type: text
    label: "Search Term"
    default: ""

  # Number input
  min_amount:
    type: number
    label: "Minimum Amount"
    default: 0
    min: 0
    max: 1000000

  # Date input
  start_date:
    type: date
    label: "Start Date"
    default: "{{first_day_of_month}}"

  # Date range
  date_range:
    type: daterange
    label: "Date Range"
    default: ["{{first_day_of_month}}", "{{today}}"]

  # Select (single)
  status:
    type: select
    label: "Status"
    options:
      - value: active
        label: Active
      - value: inactive
        label: Inactive
    default: active

  # Select (multiple)
  categories:
    type: select
    label: "Categories"
    options: [Electronics, Clothing, Food]
    multiple: true

  # Boolean
  include_archived:
    type: boolean
    label: "Include Archived"
    default: false

  # Entity reference
  company:
    type: entity
    entity: Company
    label: "Company"
    required: false
# Dynamic parameter defaults
# Sample the clock once so that all date-based defaults are derived from the
# same instant (separate datetime.now() calls could straddle midnight).
# NOTE(review): these values are computed when this module is loaded, so a
# long-running process keeps the dates from its start time. Confirm whether
# they should be resolved per request instead.
_now = datetime.now()

# Maps a placeholder token to its default value. Date placeholders map to
# `date` objects, '{{last_year}}' to an int, and the context-dependent ones
# to callables that receive a context object exposing `.user`.
parameter_defaults = {
    '{{today}}': _now.date(),
    '{{first_day_of_month}}': _now.replace(day=1).date(),
    # First day of this month + 32 days always lands in the next month;
    # snapping that to day 1 and stepping back one day gives the last day of
    # the current month. `.date()` drops the time part so the value has the
    # same type as the other date defaults.
    '{{last_day_of_month}}': (
        (_now.replace(day=1) + timedelta(days=32)).replace(day=1)
        - timedelta(days=1)
    ).date(),
    '{{first_day_of_year}}': _now.replace(month=1, day=1).date(),
    '{{last_year}}': _now.year - 1,
    '{{current_user_id}}': lambda ctx: ctx.user.id,
    '{{current_company_id}}': lambda ctx: ctx.user.company_id,
}
import React, { useState } from 'react';
function ReportBuilder({ reportId }) {
const [parameters, setParameters] = useState({});
const [loading, setLoading] = useState(false);
async function generateReport(format) {
setLoading(true);
const response = await fetch('/api/reports/generate', {
method: 'POST',
body: JSON.stringify({
report_id: reportId,
parameters: parameters,
format: format
})
});
const result = await response.json();
// Download file
window.open(result.url, '_blank');
setLoading(false);
}
return (
<div>
<h2>Generate Report</h2>
{/* Parameter inputs */}
<div>
<label>Start Date:</label>
<input
type="date"
value={parameters.start_date || ''}
onChange={e => setParameters({...parameters, start_date: e.target.value})}
/>
</div>
<div>
<label>End Date:</label>
<input
type="date"
value={parameters.end_date || ''}
onChange={e => setParameters({...parameters, end_date: e.target.value})}
/>
</div>
{/* Format buttons */}
<div>
<button onClick={() => generateReport('pdf')} disabled={loading}>
Generate PDF
</button>
<button onClick={() => generateReport('excel')} disabled={loading}>
Generate Excel
</button>
<button onClick={() => generateReport('csv')} disabled={loading}>
Generate CSV
</button>
</div>
</div>
);
}
from cifra.cache import cached
@cached(ttl=3600, key_prefix='report')
async def generate_report_cached(report_id: str, parameters: dict, format: str):
    """Generate report with caching.

    Args:
        report_id: Identifier of the report definition.
        parameters: Report parameters; must be JSON-serializable because
            they are part of the cache key.
        format: Output format, passed through to ``builder.generate``.

    Returns:
        ``{'url': <report url>, 'cached': <bool>}`` where ``cached`` is true
        when the URL came from the cache.

    NOTE(review): this function is wrapped by ``@cached`` and also caches
    manually below. The decorator may return a stored result whose
    ``'cached'`` flag is still ``False``; confirm which layer is intended.
    """
    import hashlib
    import json

    # Cache key based on report_id + parameters. The built-in hash() of a
    # string is salted per process, so it cannot be used for a key shared
    # between workers or across restarts. sort_keys makes the key
    # independent of dictionary insertion order.
    params_blob = json.dumps(parameters, sort_keys=True)
    params_digest = hashlib.sha256(params_blob.encode('utf-8')).hexdigest()
    cache_key = f"report:{report_id}:{params_digest}:{format}"

    # Try cache
    cached_url = await cache.get(cache_key)
    if cached_url:
        return {'url': cached_url, 'cached': True}

    # Generate report
    report = await builder.generate(
        report_id=report_id,
        parameters=parameters,
        format=format
    )

    # Cache URL
    await cache.set(cache_key, report.url, ttl=3600)
    return {'url': report.url, 'cached': False}
permissions:
  - id: report:view
    name: "View Reports"
  - id: report:generate
    name: "Generate Reports"
  - id: report:schedule
    name: "Schedule Reports"
  - id: report:manage
    name: "Manage Report Definitions"
from cifra.auth import require_permission
@app.post("/api/reports/generate")
@require_permission('report:generate')
async def generate_report(
    report_id: str,
    parameters: dict,
    format: str,
    current_user: User
):
    """Generate a report after checking access to that specific report.

    The ``report:generate`` permission is enforced by the decorator; in
    addition, the user must pass ``can_access_report`` for the requested
    definition.

    Raises:
        PermissionError: If the user may not access this report.
    """
    definition = await ReportDefinition.get(report_id)

    # Per-report access check, on top of the decorator's permission check.
    has_access = await current_user.can_access_report(definition)
    if not has_access:
        raise PermissionError("Access denied")

    return await builder.generate(
        report_id=report_id,
        parameters=parameters,
        format=format,
        user=current_user
    )
# config.yaml
reports:
  # Storage
  storage:
    backend: s3
    bucket: reports
    path: reports/

  # PDF generation
  pdf:
    engine: weasyprint  # weasyprint, wkhtmltopdf
    orientation: portrait
    page_size: A4

  # Excel
  excel:
    engine: xlsxwriter

  # Charts
  charts:
    backend: matplotlib  # matplotlib, plotly
    dpi: 150
    format: png

  # Caching
  cache:
    enabled: true
    ttl: 3600  # 1 hour

  # Limits
  limits:
    max_rows: 100000
    max_generation_time: 600  # 10 minutes

  # Background jobs
  background:
    threshold: 10  # Use background job if generation > 10s
Документация: https://docs.cifra.io/reports