Версия: 2.0.0
Дата: 2025-11-11
Статус: READY
┌─────────────────────┐
│ ORCHESTRATOR │
│ (Координатор) │
└──────────┬──────────┘
│
┌────────────────┼────────────────┐
│ │ │
┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│Projector│ │Designer │ │Developer│
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└────────────────┼────────────────┘
│
┌────▼────┐
│ Logger │
│(Журнал) │
└─────────┘
class AgentMessage:
"""
Стандартное сообщение между агентами.
"""
message_id: str # UUID
from_agent: str # "Orchestrator", "Projector", etc.
to_agent: str # "Designer", "Developer", etc.
message_type: str # "request", "response", "error", "log"
task: str # Описание задачи
context: dict # Контекст выполнения
payload: dict # Данные задачи
timestamp: str # ISO 8601
priority: str # "critical", "high", "medium", "low"
{
"message_id": "uuid-123-456",
"from_agent": "Orchestrator",
"to_agent": "Projector",
"message_type": "request",
"task": "Создать структуру проекта analytics",
"context": {
"user": "admin",
"session_id": "session-789",
"project_path": "/opt/claude-workspace/projects/"
},
"payload": {
"project_name": "analytics",
"project_type": "web-app",
"team_size": 3,
"mode": "agile"
},
"timestamp": "2025-11-11T15:30:00Z",
"priority": "high"
}
{
"message_id": "uuid-456-789",
"from_agent": "Projector",
"to_agent": "Orchestrator",
"message_type": "response",
"task": "Создать структуру проекта analytics",
"context": {
"request_id": "uuid-123-456",
"session_id": "session-789"
},
"payload": {
"status": "success",
"result": {
"project_path": "projects/analytics/",
"created_files": [
"design/PROJECT.md",
"design/ROADMAP.md",
"management/README.md",
"management/CHANGELOG.md"
],
"project_level": "C",
"estimated_days": {
"optimistic": 24,
"realistic": 30,
"pessimistic": 45
}
},
"duration": 2.3,
"tokens_used": 1240
},
"timestamp": "2025-11-11T15:30:02Z",
"priority": "high"
}
{
"message_id": "uuid-789-012",
"from_agent": "Developer",
"to_agent": "Orchestrator",
"message_type": "error",
"task": "Реализовать функцию export_to_excel",
"context": {
"request_id": "uuid-234-567",
"session_id": "session-789"
},
"payload": {
"error_type": "DependencyError",
"error_message": "Module 'openpyxl' not found",
"error_details": {
"missing_dependency": "openpyxl",
"suggested_fix": "pip install openpyxl"
},
"recovery_possible": true
},
"timestamp": "2025-11-11T15:35:00Z",
"priority": "critical"
}
{
"message_id": "uuid-012-345",
"from_agent": "Developer",
"to_agent": "Logger",
"message_type": "log",
"task": "Реализовать функцию export_to_excel",
"context": {
"session_id": "session-789",
"project": "analytics"
},
"payload": {
"action": "code_created",
"files_created": ["solution/mvp/export.py"],
"lines_of_code": 150,
"duration": 45.2,
"tokens_used": 3400
},
"timestamp": "2025-11-11T15:35:45Z",
"priority": "low"
}
def sequential_workflow():
"""
Orchestrator активирует агентов по очереди.
"""
# Шаг 1: Projector
projector_response = send_message(
to_agent="Projector",
task="Создать структуру проекта CRM",
payload={"project_name": "crm", "type": "web-app"}
)
# Дождаться ответа
wait_for_response(projector_response.message_id)
# Шаг 2: Designer (использует результат Projector)
designer_response = send_message(
to_agent="Designer",
task="Спроектировать архитектуру для CRM",
payload={
"project_path": projector_response.result["project_path"],
"requirements": {...}
}
)
# Шаг 3: Developer (использует результат Designer)
developer_response = send_message(
to_agent="Developer",
task="Реализовать MVP",
payload={
"architecture": designer_response.result["architecture"],
"database": designer_response.result["database"]
}
)
def parallel_workflow():
"""
Независимые задачи выполняются параллельно.
"""
# Запустить параллельно
messages = [
send_message_async("Monitor", "Проверить здоровье системы"),
send_message_async("Optimizer", "Проанализировать производительность"),
send_message_async("Security", "Проверить уязвимости")
]
# Дождаться всех ответов
responses = wait_for_all(messages)
# Агрегировать результаты
return aggregate_diagnostic_results(responses)
def error_recovery_workflow():
"""
При ошибке активируется Recovery агент.
"""
try:
response = send_message("Developer", "Реализовать фичу X")
if response.payload["status"] == "error":
# Активировать Recovery
recovery_response = send_message(
to_agent="Recovery",
task="Обработать ошибку в Developer",
payload={
"failed_agent": "Developer",
"error": response.payload["error_message"],
"error_details": response.payload["error_details"]
}
)
# Recovery может:
# 1. Автоматически исправить (установить зависимость)
# 2. Откатить изменения
# 3. Эскалировать пользователю
if recovery_response.payload["recovered"]:
# Повторить попытку
response = send_message("Developer", "Реализовать фичу X")
except Exception as e:
# Critical error - notify user
notify_user(f"Critical error: {e}")
class Orchestrator:
def activate_agent(self, agent_name: str, task: str, payload: dict) -> AgentResult:
"""
Активирует агента и дожидается результата.
"""
# 1. Создать сообщение
message = AgentMessage(
message_id=generate_uuid(),
from_agent="Orchestrator",
to_agent=agent_name,
message_type="request",
task=task,
payload=payload,
timestamp=now(),
priority="high"
)
# 2. Отправить сообщение
self.send_message(message)
# 3. Визуализация для пользователя
print(f"🔄 {agent_name}: {task}...")
# 4. Дождаться ответа
response = self.wait_for_response(
message_id=message.message_id,
timeout=300 # 5 минут
)
# 5. Проверить статус
if response.payload["status"] == "success":
print(f"✅ {format_result(response.payload['result'])}")
return response.payload["result"]
elif response.payload["status"] == "error":
# Активировать Recovery
return self.handle_error(agent_name, response)
# 6. Записать в лог (через Logger)
self.log_action(agent_name, task, response)
return response.payload["result"]
class Agent:
def process_request(self, message: AgentMessage) -> AgentMessage:
"""
Каждый агент обрабатывает запрос по этой схеме.
"""
try:
# 1. Валидация запроса
self.validate_message(message)
# 2. Загрузка контекста
context = self.load_context(message.context)
# 3. Выполнение задачи
result = self.execute_task(
task=message.task,
payload=message.payload,
context=context
)
# 4. Создание ответа
response = AgentMessage(
message_id=generate_uuid(),
from_agent=self.name,
to_agent=message.from_agent,
message_type="response",
task=message.task,
context={"request_id": message.message_id},
payload={
"status": "success",
"result": result,
"duration": self.execution_time,
"tokens_used": self.tokens_used
},
timestamp=now()
)
return response
except Exception as e:
# Создать error response
return self.create_error_response(message, e)
# User input
user_request = "Создать проект CRM"
# ========================================
# ORCHESTRATOR: Анализ и планирование
# ========================================
orchestrator.receive_request(user_request)
↓
plan = orchestrator.create_plan(
task_type="CREATE_PROJECT",
project_name="crm"
)
# plan = [
# ("Projector", "создать структуру"),
# ("Designer", "спроектировать архитектуру"),
# ("Developer", "создать MVP"),
# ("Tester", "провести тесты"),
# ("Documenter", "создать документацию")
# ]
orchestrator.show_plan_to_user(plan)
user_confirms() # Пользователь: "да"
# ========================================
# STEP 1: PROJECTOR
# ========================================
message_to_projector = {
"to_agent": "Projector",
"task": "Создать структуру проекта crm",
"payload": {
"project_name": "crm",
"project_type": "web-app",
"team_size": 3
}
}
projector_response = orchestrator.activate_agent(message_to_projector)
# projector_response.result = {
# "project_path": "projects/crm/",
# "created_files": [...],
# "estimated_days": 30
# }
# ========================================
# STEP 2: DESIGNER
# ========================================
message_to_designer = {
"to_agent": "Designer",
"task": "Спроектировать архитектуру для crm",
"payload": {
"project_path": projector_response.result["project_path"],
"requirements": {
"users": true,
"products": true,
"orders": true
}
}
}
designer_response = orchestrator.activate_agent(message_to_designer)
# designer_response.result = {
# "architecture": "design/ARCHITECTURE.md",
# "database": "design/DATABASE.md",
# "tables": 5,
# "api_endpoints": 12
# }
# ========================================
# STEP 3: DEVELOPER
# ========================================
message_to_developer = {
"to_agent": "Developer",
"task": "Создать MVP для crm",
"payload": {
"architecture": designer_response.result["architecture"],
"database": designer_response.result["database"],
"tech_stack": ["Python", "Streamlit", "PostgreSQL"]
}
}
developer_response = orchestrator.activate_agent(message_to_developer)
# developer_response.result = {
# "created_files": [
# "solution/mvp/app.py",
# "solution/mvp/pages/users.py",
# "solution/mvp/pages/products.py",
# "solution/mvp/pages/orders.py"
# ],
# "lines_of_code": 850
# }
# ========================================
# STEP 4: TESTER
# ========================================
message_to_tester = {
"to_agent": "Tester",
"task": "Провести тесты для crm MVP",
"payload": {
"code_path": "solution/mvp/",
"test_types": ["unit", "integration"]
}
}
tester_response = orchestrator.activate_agent(message_to_tester)
# tester_response.result = {
# "tests_passed": 15,
# "tests_failed": 0,
# "coverage": 85
# }
# ========================================
# STEP 5: DOCUMENTER
# ========================================
message_to_documenter = {
"to_agent": "Documenter",
"task": "Создать документацию для crm",
"payload": {
"project": "crm",
"components": [
projector_response.result,
designer_response.result,
developer_response.result,
tester_response.result
]
}
}
documenter_response = orchestrator.activate_agent(message_to_documenter)
# documenter_response.result = {
# "created_docs": [
# "design/PROJECT.md",
# "management/README.md",
# "management/CHANGELOG.md"
# ]
# }
# ========================================
# ORCHESTRATOR: Агрегация результатов
# ========================================
final_result = orchestrator.aggregate_results([
projector_response,
designer_response,
developer_response,
tester_response,
documenter_response
])
# ========================================
# LOGGER: Журналирование (параллельно)
# ========================================
logger.log_workflow({
"task": "CREATE_PROJECT crm",
"agents_used": 5,
"total_duration": 120, # секунды
"total_tokens": 18500,
"status": "success"
})
# ========================================
# OUTPUT TO USER
# ========================================
print(f"""
🎉 ПРОЕКТ CRM СОЗДАН
📊 Сводка:
├─ Агентов: 5
├─ Время: 2м 0с
├─ Токены: 18,500
└─ Статус: ✅ Успешно
📦 Результаты:
├─ Структура: projects/crm/
├─ Архитектура: 5 таблиц, 12 API эндпоинтов
├─ Код: 850 строк (4 страницы)
├─ Тесты: 15/15 пройдено (coverage 85%)
└─ Документация: PROJECT.md, README.md, CHANGELOG.md
🚀 Проект готов к использованию!
""")
def validate_message(message: AgentMessage) -> bool:
"""
Валидация сообщения перед обработкой.
"""
# 1. Проверка обязательных полей
required_fields = ["message_id", "from_agent", "to_agent", "message_type", "task"]
if not all(field in message for field in required_fields):
raise ValueError("Missing required fields")
# 2. Проверка типа сообщения
valid_types = ["request", "response", "error", "log"]
if message.message_type not in valid_types:
raise ValueError(f"Invalid message type: {message.message_type}")
# 3. Проверка существования агента
if message.to_agent not in REGISTERED_AGENTS:
raise ValueError(f"Unknown agent: {message.to_agent}")
# 4. Проверка прав доступа
if not has_permission(message.from_agent, message.to_agent):
raise PermissionError(f"{message.from_agent} cannot call {message.to_agent}")
return True
class IntegrationMetrics:
"""
Метрики интеграции агентов.
"""
def collect_metrics(self):
return {
"messages_sent": 1500,
"messages_received": 1500,
"average_response_time": 2.3, # seconds
"error_rate": 0.02, # 2%
"agents_active": 16,
"agents_failed": 0,
"total_tokens_used": 125000
}
Статус: ✅ READY TO USE
Следующий шаг: Реализовать код интеграции в orchestrator/code/