┌─────────────────────────────────────────────────────────────┐
│ КРИТИЧНЫЕ ПРАВИЛА РАБОТЫ С PIM │
├─────────────────────────────────────────────────────────────┤
│ │
│ ✓ С PIM работают ТОЛЬКО утвержденные оператором функции │
│ ✓ Агент НЕ может сам создавать функции без разрешения │
│ ✓ Агент НЕ может вмешиваться в данные напрямую │
│ ✓ Агент ТОЛЬКО предлагает новые функции → оператор решает │
│ ✓ Парсеры создаются агентом АВТОМАТИЧЕСКИ (некритично) │
│ │
└─────────────────────────────────────────────────────────────┘
Работа с мастер-данными:
- ✓ Создание/обновление товаров в pim_products
- ✓ Удаление товаров
- ✓ Изменение цен (base_price, cost_price)
- ✓ Создание/изменение правил ценообразования
- ✓ Массовое обновление данных
Интеграция с 1С:
- ✓ Импорт из 1С → 1c_products
- ✓ Экспорт в 1С (обновление учетной системы)
- ✓ Синхронизация данных
Обработка данных:
- ✓ Применение правил ценообразования
- ✓ Обогащение товаров
- ✓ Массовый пересчет цен
Парсеры (агент создает сам):
- ✓ Парсинг прайс-листов → staging
- ✓ Парсинг сайтов → staging
- ✓ Скачивание изображений
- ✓ Извлечение данных из файлов
Чтение данных:
- ✓ Получение товаров
- ✓ Поиск товаров
- ✓ Просмотр логов
- ✓ Статистика
Staging операции:
- ✓ Запись в pim_staging_products
- ✓ Валидация staging данных
- ✓ Сопоставление (matching)
┌─────────────────────────────────────────────────────────────┐
│ ЖИЗНЕННЫЙ ЦИКЛ ФУНКЦИИ │
└─────────────────────────────────────────────────────────────┘
1. ПРЕДЛОЖЕНИЕ АГЕНТОМ
├─→ Агент анализирует требования
├─→ Генерирует код функции
├─→ Создает описание и документацию
├─→ Сохраняет в pim_function_proposals
└─→ Статус: 'proposed'
↓
2. РЕВЬЮ ОПЕРАТОРОМ
├─→ Оператор просматривает код
├─→ Проверяет безопасность
├─→ Тестирует на тестовых данных
└─→ Принимает решение:
├─→ APPROVE → переход к шагу 3
├─→ REJECT → функция отклонена
└─→ REQUEST_CHANGES → возврат агенту
↓
3. УТВЕРЖДЕНИЕ
├─→ Функция добавляется в pim_approved_functions
├─→ Код деплоится в production
├─→ Функция становится доступной
└─→ Статус: 'approved'
↓
4. ИСПОЛЬЗОВАНИЕ
├─→ Агент может вызывать функцию
├─→ Функция выполняется с логированием
├─→ Все действия записываются в audit log
└─→ Оператор видит все операции
↓
5. ОТЗЫВ (опционально)
├─→ Оператор может отозвать утверждение
├─→ Функция становится недоступной
└─→ Статус: 'revoked'
class PimFunctionProposal(Base):
"""Предложенные агентом функции (ожидают утверждения)"""
__tablename__ = "pim_function_proposals"
id = Column(Integer, primary_key=True)
# Идентификация
function_name = Column(String(100), nullable=False, unique=True)
category = Column(String(50)) # 'product', 'pricing', 'integration', 'enrichment'
criticality = Column(String(20)) # 'critical', 'moderate', 'low'
# Описание
description = Column(Text, nullable=False)
purpose = Column(Text) # зачем нужна функция
use_cases = Column(JSON) # примеры использования
# Код
code = Column(Text, nullable=False) # Python код функции
code_hash = Column(String(64)) # SHA256 для контроля изменений
dependencies = Column(JSON) # список зависимостей
# Параметры
parameters_schema = Column(JSON) # JSON Schema параметров
return_schema = Column(JSON) # JSON Schema возвращаемых данных
# Безопасность
reads_tables = Column(JSON) # какие таблицы читает
writes_tables = Column(JSON) # какие таблицы модифицирует
external_calls = Column(Boolean, default=False) # делает ли внешние вызовы
# Предложение
proposed_by = Column(String(50), default='agent')
proposed_at = Column(DateTime, default=datetime.utcnow)
proposal_reason = Column(Text)
# Статус утверждения
status = Column(String(20), default='proposed')
# 'proposed', 'under_review', 'approved', 'rejected', 'changes_requested'
# Ревью
reviewed_by = Column(String(100)) # оператор
reviewed_at = Column(DateTime)
review_notes = Column(Text)
rejection_reason = Column(Text)
# Тестирование
test_results = Column(JSON)
test_passed = Column(Boolean)
# Метаданные
version = Column(Integer, default=1)
updated_at = Column(DateTime, onupdate=datetime.utcnow)
class PimApprovedFunction(Base):
"""Утвержденные функции (разрешены к использованию)"""
__tablename__ = "pim_approved_functions"
id = Column(Integer, primary_key=True)
proposal_id = Column(Integer, ForeignKey('pim_function_proposals.id'))
# Идентификация
function_name = Column(String(100), nullable=False, unique=True)
category = Column(String(50))
criticality = Column(String(20))
# Код (FROZEN - не меняется после утверждения)
code = Column(Text, nullable=False)
code_hash = Column(String(64))
# Параметры
parameters_schema = Column(JSON)
return_schema = Column(JSON)
# Утверждение
approved_by = Column(String(100), nullable=False) # оператор
approved_at = Column(DateTime, default=datetime.utcnow)
approval_notes = Column(Text)
# Статус
is_active = Column(Boolean, default=True)
revoked_at = Column(DateTime)
revoked_by = Column(String(100))
revoke_reason = Column(Text)
# Использование
usage_count = Column(Integer, default=0)
last_used_at = Column(DateTime)
# Версионирование
version = Column(Integer, default=1)
class PimFunctionExecutionLog(Base):
"""Audit log всех выполнений функций"""
__tablename__ = "pim_function_execution_log"
id = Column(Integer, primary_key=True)
function_id = Column(Integer, ForeignKey('pim_approved_functions.id'))
function_name = Column(String(100), nullable=False)
# Контекст выполнения
executed_by = Column(String(100)) # 'agent' или user ID
executed_at = Column(DateTime, default=datetime.utcnow)
# Входные данные
parameters = Column(JSON) # переданные параметры
# Результат
status = Column(String(20)) # 'success', 'error', 'warning'
result = Column(JSON) # результат выполнения
error_message = Column(Text) # если ошибка
# Влияние на данные
rows_affected = Column(Integer) # сколько записей изменено
tables_modified = Column(JSON) # какие таблицы изменены
# Производительность
execution_time_ms = Column(Integer)
# Метаданные
request_id = Column(String(100)) # для трейсинга
class PimParserRegistry(Base):
"""Реестр автоматически созданных парсеров"""
__tablename__ = "pim_parser_registry"
id = Column(Integer, primary_key=True)
# Идентификация
parser_name = Column(String(100), nullable=False, unique=True)
brand = Column(String(100), nullable=False)
parser_type = Column(String(50)) # 'price_excel', 'price_csv', 'site'
# Конфигурация
config = Column(JSON, nullable=False) # mapping колонок и т.д.
# Код (генерируется агентом)
code = Column(Text, nullable=False)
code_hash = Column(String(64))
# Создание
created_by = Column(String(50), default='agent')
created_at = Column(DateTime, default=datetime.utcnow)
# Статус
is_active = Column(Boolean, default=True)
last_used_at = Column(DateTime)
usage_count = Column(Integer, default=0)
# Производительность
avg_parse_time_ms = Column(Integer)
success_rate = Column(Numeric(5, 2)) # процент успешных парсингов
last_error = Column(Text)
# Версия
version = Column(Integer, default=1)
# Предложение новой функции
POST /api/v1/functions/propose
{
"function_name": "recalculate_brand_prices",
"category": "pricing",
"criticality": "critical",
"description": "Пересчитать цены для всех товаров бренда",
"code": "async def recalculate_brand_prices(...)...",
"reads_tables": ["pim_products", "pim_cost_rules"],
"writes_tables": ["pim_products", "pim_price_history"],
"purpose": "Массовое обновление цен при изменении правил",
"use_cases": [
"Поставщик изменил скидку с 10% на 15%",
"Нужно пересчитать 200 товаров"
]
}
# Получить список утвержденных функций (доступных агенту)
GET /api/v1/functions/approved
# Выполнить утвержденную функцию
POST /api/v1/functions/execute/{function_name}
{
"parameters": {
"brand": "ФЕЙЕРВЕРК",
"discount_percent": 15.0
}
}
# Создать парсер (автоматически, без утверждения)
POST /api/v1/parsers/create
{
"parser_name": "salut_excel_parser",
"brand": "САЛЮТ",
"parser_type": "price_excel",
"config": {
"article_column": "A",
"name_column": "B",
"price_column": "C"
},
"code": "class SalutExcelParser(BaseParser)..."
}
# Получить список предложенных функций
GET /api/v1/operator/proposals
GET /api/v1/operator/proposals?status=proposed
GET /api/v1/operator/proposals?criticality=critical
# Получить детали предложения
GET /api/v1/operator/proposals/{id}
# Утвердить функцию
POST /api/v1/operator/proposals/{id}/approve
{
"approval_notes": "Проверено, безопасно",
"test_results": {...}
}
# Отклонить функцию
POST /api/v1/operator/proposals/{id}/reject
{
"rejection_reason": "Слишком опасная операция без дополнительных проверок"
}
# Запросить изменения
POST /api/v1/operator/proposals/{id}/request_changes
{
"changes_requested": "Добавить валидацию параметров и проверку прав"
}
# Отозвать утвержденную функцию
POST /api/v1/operator/functions/{id}/revoke
{
"revoke_reason": "Обнаружена уязвимость"
}
# Просмотр audit log
GET /api/v1/operator/audit-log
GET /api/v1/operator/audit-log?function_name=recalculate_brand_prices
GET /api/v1/operator/audit-log?date_from=2024-01-01
# Статистика использования функций
GET /api/v1/operator/stats/functions
class ApprovalService:
"""Сервис управления утверждением функций"""
async def propose_function(
self,
db: AsyncSession,
function_name: str,
code: str,
description: str,
category: str,
criticality: str,
**metadata
) -> PimFunctionProposal:
"""
Агент предлагает новую функцию
1. Валидирует код (синтаксис)
2. Анализирует безопасность (SQL injection, etc)
3. Извлекает зависимости
4. Создает запись в proposals
5. Уведомляет оператора
"""
async def approve_function(
self,
db: AsyncSession,
proposal_id: int,
operator_id: str,
approval_notes: str
) -> PimApprovedFunction:
"""
Оператор утверждает функцию
1. Обновляет статус proposal
2. Создает approved_function
3. Деплоит код (добавляет в runtime)
4. Логирует утверждение
"""
async def reject_function(
self,
db: AsyncSession,
proposal_id: int,
operator_id: str,
rejection_reason: str
):
"""Оператор отклоняет функцию"""
async def revoke_function(
self,
db: AsyncSession,
function_id: int,
operator_id: str,
revoke_reason: str
):
"""Оператор отзывает утвержденную функцию"""
async def get_approved_functions(
self,
db: AsyncSession,
category: str = None,
is_active: bool = True
) -> List[PimApprovedFunction]:
"""Получить список утвержденных функций"""
class FunctionExecutor:
"""Безопасное выполнение утвержденных функций"""
async def execute(
self,
db: AsyncSession,
function_name: str,
parameters: dict,
executed_by: str = 'agent'
) -> ExecutionResult:
"""
Выполнить утвержденную функцию
1. Проверяет, что функция утверждена и активна
2. Валидирует параметры по schema
3. Выполняет код в изолированном контексте
4. Логирует выполнение
5. Возвращает результат
КРИТИЧНО: Только утвержденные функции!
"""
# 1. Проверка утверждения
function = await self._get_approved_function(db, function_name)
if not function or not function.is_active:
raise FunctionNotApprovedError(f"Function {function_name} not approved")
# 2. Валидация параметров
self._validate_parameters(parameters, function.parameters_schema)
# 3. Создание audit log entry
log_entry = PimFunctionExecutionLog(
function_id=function.id,
function_name=function_name,
executed_by=executed_by,
parameters=parameters
)
# 4. Выполнение с контролем времени
start_time = time.time()
try:
# Динамическая загрузка кода
func = self._load_function_code(function.code)
# Выполнение
result = await func(db, **parameters)
# Успех
log_entry.status = 'success'
log_entry.result = result
log_entry.execution_time_ms = int((time.time() - start_time) * 1000)
# Обновление счетчиков
function.usage_count += 1
function.last_used_at = datetime.utcnow()
except Exception as e:
# Ошибка
log_entry.status = 'error'
log_entry.error_message = str(e)
log_entry.execution_time_ms = int((time.time() - start_time) * 1000)
raise
finally:
db.add(log_entry)
await db.commit()
return ExecutionResult(
success=True,
result=result,
execution_time_ms=log_entry.execution_time_ms
)
class ParserFactory:
"""Автоматическое создание парсеров агентом"""
async def create_parser(
self,
db: AsyncSession,
brand: str,
parser_type: str,
config: dict,
sample_file: str = None
) -> PimParserRegistry:
"""
Агент автоматически создает парсер
1. Анализирует sample_file (если есть)
2. Генерирует код парсера на основе config
3. Тестирует на примере
4. Сохраняет в registry
5. Возвращает готовый парсер
ВАЖНО: Парсеры НЕ требуют утверждения оператора!
Они работают только со staging, не с мастер-данными.
"""
# Генерация кода
if parser_type == 'price_excel':
code = await self._generate_excel_parser(brand, config, sample_file)
elif parser_type == 'price_csv':
code = await self._generate_csv_parser(brand, config, sample_file)
elif parser_type == 'site':
code = await self._generate_site_parser(brand, config)
else:
raise ValueError(f"Unknown parser type: {parser_type}")
# Тестирование
if sample_file:
test_result = await self._test_parser(code, sample_file)
if not test_result.success:
raise ParserTestError(f"Parser test failed: {test_result.error}")
# Сохранение в registry
parser = PimParserRegistry(
parser_name=f"{brand.lower()}_{parser_type}_parser",
brand=brand,
parser_type=parser_type,
config=config,
code=code,
code_hash=hashlib.sha256(code.encode()).hexdigest(),
created_by='agent'
)
db.add(parser)
await db.commit()
return parser
async def _generate_excel_parser(
self,
brand: str,
config: dict,
sample_file: str
) -> str:
"""
Генерирует код парсера Excel на основе конфигурации
config = {
'article_column': 'A',
'name_column': 'B',
'price_column': 'C',
'start_row': 5
}
"""
template = f'''
import pandas as pd
from typing import List, Dict
class {brand.capitalize()}ExcelParser(BaseParser):
"""Автоматически созданный парсер для бренда {brand}"""
async def parse_file(self, file_path: str) -> ParseResult:
df = pd.read_excel(
file_path,
skiprows={config.get('start_row', 1) - 1}
)
products = []
for _, row in df.iterrows():
product = {{
'article': str(row['{config['article_column']}']).strip(),
'name': str(row['{config['name_column']}']).strip(),
'base_price': float(row['{config['price_column']}']),
'brand': '{brand}'
}}
# Характеристики (если есть)
characteristics = {{}}
{self._generate_characteristics_extraction(config)}
if characteristics:
product['characteristics'] = characteristics
products.append(product)
return ParseResult(
success=True,
products=products,
count=len(products)
)
'''
return template
# АГЕНТ
# Агент понимает, что нужна функция массового обновления цен
proposal = await approval_service.propose_function(
db,
function_name="recalculate_brand_prices",
category="pricing",
criticality="critical",
description="Массовый пересчет цен для товаров бренда",
code="""
async def recalculate_brand_prices(
db: AsyncSession,
brand: str,
discount_percent: float
) -> Dict:
# 1. Получить правило
rule = await pricing_service.get_pricing_rule(db, brand)
# 2. Обновить процент скидки
if rule:
rule.discount_percent = discount_percent
else:
rule = await pricing_service.create_pricing_rule(db, brand, discount_percent)
# 3. Получить все товары бренда
products = await product_service.list_products(db, filters={'brand': brand})
# 4. Пересчитать цены
updated_count = 0
for product in products:
old_price = product.cost_price
new_price = await pricing_service.calculate_cost_price(
db, product.article, product.base_price, brand
)
product.cost_price = new_price
updated_count += 1
# Логировать изменение
await price_history.log_change(
db, product.article, old_price, new_price, source='mass_update'
)
await db.commit()
return {
'brand': brand,
'updated_count': updated_count,
'new_discount': discount_percent
}
""",
reads_tables=["pim_products", "pim_cost_rules"],
writes_tables=["pim_products", "pim_cost_rules", "pim_price_history"],
purpose="Массовое обновление при изменении правил поставщика",
use_cases=[
"Поставщик ФЕЙЕРВЕРК увеличил скидку с 10% до 15%",
"Нужно обновить 200+ товаров одной операцией"
]
)
# Функция сохранена, статус: 'proposed'
# Оператор получает уведомление
# ОПЕРАТОР
# 1. Просматривает список предложений
proposals = await approval_service.get_proposals(db, status='proposed')
# 2. Изучает код
proposal = proposals[0]
print(proposal.code)
print(f"Читает: {proposal.reads_tables}")
print(f"Пишет: {proposal.writes_tables}")
# 3. Тестирует на тестовых данных
test_result = await test_function_on_staging(proposal.code)
# 4. Утверждает
approved = await approval_service.approve_function(
db,
proposal_id=proposal.id,
operator_id="operator_123",
approval_notes="Проверено, безопасно. Есть логирование изменений."
)
# Функция теперь доступна агенту
# АГЕНТ
# Нужно обновить цены для ФЕЙЕРВЕРК (скидка 15%)
result = await function_executor.execute(
db,
function_name="recalculate_brand_prices",
parameters={
"brand": "ФЕЙЕРВЕРК",
"discount_percent": 15.0
},
executed_by="agent"
)
# Результат:
# {
# 'brand': 'ФЕЙЕРВЕРК',
# 'updated_count': 215,
# 'new_discount': 15.0
# }
# Автоматически создан audit log:
# - function_name: recalculate_brand_prices
# - parameters: {"brand": "ФЕЙЕРВЕРК", "discount_percent": 15.0}
# - rows_affected: 215
# - tables_modified: ["pim_products", "pim_price_history"]
# - executed_by: agent
# - execution_time_ms: 1250
# АГЕНТ
# Получили новый прайс от САЛЮТ, нужен парсер
parser = await parser_factory.create_parser(
db,
brand="САЛЮТ",
parser_type="price_excel",
config={
"article_column": "A",
"name_column": "B",
"price_column": "D",
"start_row": 3,
"characteristics_columns": {
"caliber": "E",
"shots": "F"
}
},
sample_file="/data/price-lists/САЛЮТ/sample.xlsx"
)
# Парсер создан, протестирован и готов к использованию
# БЕЗ утверждения оператора!
# Используем парсер
result = await parser.parse_file("/data/price-lists/САЛЮТ/2024-01-15.xlsx")
# Данные загружены в staging
await staging_service.import_from_parser(db, result.products, brand="САЛЮТ")
# Просмотр предложенных функций
pim operator proposals list
pim operator proposals list --status proposed --criticality critical
# Детали предложения
pim operator proposals show 123
# Утверждение
pim operator proposals approve 123 --notes "Проверено, безопасно"
# Отклонение
pim operator proposals reject 123 --reason "Слишком опасно"
# Просмотр утвержденных функций
pim operator functions list
pim operator functions show recalculate_brand_prices
# Отзыв функции
pim operator functions revoke recalculate_brand_prices --reason "Уязвимость"
# Audit log
pim operator audit-log --function recalculate_brand_prices
pim operator audit-log --date 2024-01-15
pim operator audit-log --executed-by agent
# Статистика
pim operator stats functions
pim operator stats usage --period 7d
# Когда агент предлагает критичную функцию
→ Email оператору: "Новая критичная функция требует утверждения"
→ Slack/Telegram: "🔴 CRITICAL: recalculate_brand_prices"
# Когда функция выполнена
→ Audit log (всегда)
→ Email оператору (для critical операций)
# Когда ошибка в функции
→ Alert: "Функция recalculate_brand_prices завершилась с ошибкой"
→ Auto-revoke (опционально для критичных функций)
# Статистика использования
→ Weekly digest оператору:
- Предложено функций: 3
- Утверждено: 2
- Отклонено: 1
- Выполнено операций: 150
- Ошибок: 2
Разделение прав:
┌──────────────────────┬─────────────────┬──────────────────┐
│ ТИП ОПЕРАЦИИ │ АГЕНТ │ ОПЕРАТОР │
├──────────────────────┼─────────────────┼──────────────────┤
│ Изменение мастер- │ ❌ Запрещено │ ✓ Разрешено │
│ данных напрямую │ │ │
├──────────────────────┼─────────────────┼──────────────────┤
│ Предложение функций │ ✓ Разрешено │ - │
├──────────────────────┼─────────────────┼──────────────────┤
│ Утверждение функций │ ❌ Запрещено │ ✓ Разрешено │
├──────────────────────┼─────────────────┼──────────────────┤
│ Выполнение │ ✓ Только │ ✓ Разрешено │
│ утвержденных функций │ утвержденные │ │
├──────────────────────┼─────────────────┼──────────────────┤
│ Создание парсеров │ ✓ Автоматически │ ✓ Просмотр │
├──────────────────────┼─────────────────┼──────────────────┤
│ Работа со staging │ ✓ Разрешено │ ✓ Разрешено │
├──────────────────────┼─────────────────┼──────────────────┤
│ Чтение данных │ ✓ Разрешено │ ✓ Разрешено │
└──────────────────────┴─────────────────┴──────────────────┘
Ключевые гарантии безопасности:
- ✓ Агент не может изменять данные без утвержденной функции
- ✓ Все операции логируются в audit log
- ✓ Оператор контролирует весь критичный код
- ✓ Парсеры создаются автоматически (работают только со staging)
- ✓ Функции можно отозвать в любой момент
Реализовывать эту систему?