projects/org/pirotehnika/app/pim/SECURITY_APPROVAL_SYSTEM.md

Система безопасности и утверждения функций PIM

1. Принципы безопасности

┌─────────────────────────────────────────────────────────────┐
│  КРИТИЧНЫЕ ПРАВИЛА РАБОТЫ С PIM                             │
├─────────────────────────────────────────────────────────────┤
│                                                               │
│  ✓ С PIM работают ТОЛЬКО утвержденные оператором функции    │
│  ✓ Агент НЕ может сам создавать функции без разрешения      │
│  ✓ Агент НЕ может вмешиваться в данные напрямую             │
│  ✓ Агент ТОЛЬКО предлагает новые функции → оператор решает  │
│  ✓ Парсеры создаются агентом АВТОМАТИЧЕСКИ (некритично)     │
│                                                               │
└─────────────────────────────────────────────────────────────┘

2. Разделение функций по уровням критичности

УРОВЕНЬ 1: Критичные операции (требуют утверждения)

Работа с мастер-данными:
- ✓ Создание/обновление товаров в pim_products
- ✓ Удаление товаров
- ✓ Изменение цен (base_price, cost_price)
- ✓ Создание/изменение правил ценообразования
- ✓ Массовое обновление данных

Интеграция с 1С:
- ✓ Импорт из 1С → 1c_products
- ✓ Экспорт в 1С (обновление учетной системы)
- ✓ Синхронизация данных

Обработка данных:
- ✓ Применение правил ценообразования
- ✓ Обогащение товаров
- ✓ Массовый пересчет цен


УРОВЕНЬ 2: Некритичные операции (автоматические)

Парсеры (агент создает сам):
- ✓ Парсинг прайс-листов → staging
- ✓ Парсинг сайтов → staging
- ✓ Скачивание изображений
- ✓ Извлечение данных из файлов

Чтение данных:
- ✓ Получение товаров
- ✓ Поиск товаров
- ✓ Просмотр логов
- ✓ Статистика

Staging операции:
- ✓ Запись в pim_staging_products
- ✓ Валидация staging данных
- ✓ Сопоставление (matching)


3. Workflow утверждения функций

┌─────────────────────────────────────────────────────────────┐
│  ЖИЗНЕННЫЙ ЦИКЛ ФУНКЦИИ                                     │
└─────────────────────────────────────────────────────────────┘

1. ПРЕДЛОЖЕНИЕ АГЕНТОМ
   ├─→ Агент анализирует требования
   ├─→ Генерирует код функции
   ├─→ Создает описание и документацию
   ├─→ Сохраняет в pim_function_proposals
   └─→ Статус: 'proposed'

            ↓

2. РЕВЬЮ ОПЕРАТОРОМ
   ├─→ Оператор просматривает код
   ├─→ Проверяет безопасность
   ├─→ Тестирует на тестовых данных
   └─→ Принимает решение:
            ├─→ APPROVE → переход к шагу 3
            ├─→ REJECT → функция отклонена
            └─→ REQUEST_CHANGES → возврат агенту

            ↓

3. УТВЕРЖДЕНИЕ
   ├─→ Функция добавляется в pim_approved_functions
   ├─→ Код деплоится в production
   ├─→ Функция становится доступной
   └─→ Статус: 'approved'

            ↓

4. ИСПОЛЬЗОВАНИЕ
   ├─→ Агент может вызывать функцию
   ├─→ Функция выполняется с логированием
   ├─→ Все действия записываются в audit log
   └─→ Оператор видит все операции

            ↓

5. ОТЗЫВ (опционально)
   ├─→ Оператор может отозвать утверждение
   ├─→ Функция становится недоступной
   └─→ Статус: 'revoked'

4. Таблицы БД для системы безопасности

pim_function_proposals

class PimFunctionProposal(Base):
    """Предложенные агентом функции (ожидают утверждения)"""
    __tablename__ = "pim_function_proposals"

    id = Column(Integer, primary_key=True)

    # Идентификация
    function_name = Column(String(100), nullable=False, unique=True)
    category = Column(String(50))  # 'product', 'pricing', 'integration', 'enrichment'
    criticality = Column(String(20))  # 'critical', 'moderate', 'low'

    # Описание
    description = Column(Text, nullable=False)
    purpose = Column(Text)  # зачем нужна функция
    use_cases = Column(JSON)  # примеры использования

    # Код
    code = Column(Text, nullable=False)  # Python код функции
    code_hash = Column(String(64))  # SHA256 для контроля изменений
    dependencies = Column(JSON)  # список зависимостей

    # Параметры
    parameters_schema = Column(JSON)  # JSON Schema параметров
    return_schema = Column(JSON)  # JSON Schema возвращаемых данных

    # Безопасность
    reads_tables = Column(JSON)  # какие таблицы читает
    writes_tables = Column(JSON)  # какие таблицы модифицирует
    external_calls = Column(Boolean, default=False)  # делает ли внешние вызовы

    # Предложение
    proposed_by = Column(String(50), default='agent')
    proposed_at = Column(DateTime, default=datetime.utcnow)
    proposal_reason = Column(Text)

    # Статус утверждения
    status = Column(String(20), default='proposed')
    # 'proposed', 'under_review', 'approved', 'rejected', 'changes_requested'

    # Ревью
    reviewed_by = Column(String(100))  # оператор
    reviewed_at = Column(DateTime)
    review_notes = Column(Text)
    rejection_reason = Column(Text)

    # Тестирование
    test_results = Column(JSON)
    test_passed = Column(Boolean)

    # Метаданные
    version = Column(Integer, default=1)
    updated_at = Column(DateTime, onupdate=datetime.utcnow)

pim_approved_functions

class PimApprovedFunction(Base):
    """Утвержденные функции (разрешены к использованию)"""
    __tablename__ = "pim_approved_functions"

    id = Column(Integer, primary_key=True)
    proposal_id = Column(Integer, ForeignKey('pim_function_proposals.id'))

    # Идентификация
    function_name = Column(String(100), nullable=False, unique=True)
    category = Column(String(50))
    criticality = Column(String(20))

    # Код (FROZEN - не меняется после утверждения)
    code = Column(Text, nullable=False)
    code_hash = Column(String(64))

    # Параметры
    parameters_schema = Column(JSON)
    return_schema = Column(JSON)

    # Утверждение
    approved_by = Column(String(100), nullable=False)  # оператор
    approved_at = Column(DateTime, default=datetime.utcnow)
    approval_notes = Column(Text)

    # Статус
    is_active = Column(Boolean, default=True)
    revoked_at = Column(DateTime)
    revoked_by = Column(String(100))
    revoke_reason = Column(Text)

    # Использование
    usage_count = Column(Integer, default=0)
    last_used_at = Column(DateTime)

    # Версионирование
    version = Column(Integer, default=1)

pim_function_execution_log

class PimFunctionExecutionLog(Base):
    """Audit log всех выполнений функций"""
    __tablename__ = "pim_function_execution_log"

    id = Column(Integer, primary_key=True)
    function_id = Column(Integer, ForeignKey('pim_approved_functions.id'))
    function_name = Column(String(100), nullable=False)

    # Контекст выполнения
    executed_by = Column(String(100))  # 'agent' или user ID
    executed_at = Column(DateTime, default=datetime.utcnow)

    # Входные данные
    parameters = Column(JSON)  # переданные параметры

    # Результат
    status = Column(String(20))  # 'success', 'error', 'warning'
    result = Column(JSON)  # результат выполнения
    error_message = Column(Text)  # если ошибка

    # Влияние на данные
    rows_affected = Column(Integer)  # сколько записей изменено
    tables_modified = Column(JSON)  # какие таблицы изменены

    # Производительность
    execution_time_ms = Column(Integer)

    # Метаданные
    request_id = Column(String(100))  # для трейсинга

pim_parser_registry

class PimParserRegistry(Base):
    """Реестр автоматически созданных парсеров"""
    __tablename__ = "pim_parser_registry"

    id = Column(Integer, primary_key=True)

    # Идентификация
    parser_name = Column(String(100), nullable=False, unique=True)
    brand = Column(String(100), nullable=False)
    parser_type = Column(String(50))  # 'price_excel', 'price_csv', 'site'

    # Конфигурация
    config = Column(JSON, nullable=False)  # mapping колонок и т.д.

    # Код (генерируется агентом)
    code = Column(Text, nullable=False)
    code_hash = Column(String(64))

    # Создание
    created_by = Column(String(50), default='agent')
    created_at = Column(DateTime, default=datetime.utcnow)

    # Статус
    is_active = Column(Boolean, default=True)
    last_used_at = Column(DateTime)
    usage_count = Column(Integer, default=0)

    # Производительность
    avg_parse_time_ms = Column(Integer)
    success_rate = Column(Numeric(5, 2))  # процент успешных парсингов
    last_error = Column(Text)

    # Версия
    version = Column(Integer, default=1)

5. API для работы с утверждениями

Endpoints для АГЕНТА

# Предложение новой функции
POST   /api/v1/functions/propose
{
    "function_name": "recalculate_brand_prices",
    "category": "pricing",
    "criticality": "critical",
    "description": "Пересчитать цены для всех товаров бренда",
    "code": "async def recalculate_brand_prices(...)...",
    "reads_tables": ["pim_products", "pim_cost_rules"],
    "writes_tables": ["pim_products", "pim_price_history"],
    "purpose": "Массовое обновление цен при изменении правил",
    "use_cases": [
        "Поставщик изменил скидку с 10% на 15%",
        "Нужно пересчитать 200 товаров"
    ]
}

# Получить список утвержденных функций (доступных агенту)
GET    /api/v1/functions/approved

# Выполнить утвержденную функцию
POST   /api/v1/functions/execute/{function_name}
{
    "parameters": {
        "brand": "ФЕЙЕРВЕРК",
        "discount_percent": 15.0
    }
}

# Создать парсер (автоматически, без утверждения)
POST   /api/v1/parsers/create
{
    "parser_name": "salut_excel_parser",
    "brand": "САЛЮТ",
    "parser_type": "price_excel",
    "config": {
        "article_column": "A",
        "name_column": "B",
        "price_column": "C"
    },
    "code": "class SalutExcelParser(BaseParser)..."
}

Endpoints для ОПЕРАТОРА

# Получить список предложенных функций
GET    /api/v1/operator/proposals
GET    /api/v1/operator/proposals?status=proposed
GET    /api/v1/operator/proposals?criticality=critical

# Получить детали предложения
GET    /api/v1/operator/proposals/{id}

# Утвердить функцию
POST   /api/v1/operator/proposals/{id}/approve
{
    "approval_notes": "Проверено, безопасно",
    "test_results": {...}
}

# Отклонить функцию
POST   /api/v1/operator/proposals/{id}/reject
{
    "rejection_reason": "Слишком опасная операция без дополнительных проверок"
}

# Запросить изменения
POST   /api/v1/operator/proposals/{id}/request_changes
{
    "changes_requested": "Добавить валидацию параметров и проверку прав"
}

# Отозвать утвержденную функцию
POST   /api/v1/operator/functions/{id}/revoke
{
    "revoke_reason": "Обнаружена уязвимость"
}

# Просмотр audit log
GET    /api/v1/operator/audit-log
GET    /api/v1/operator/audit-log?function_name=recalculate_brand_prices
GET    /api/v1/operator/audit-log?date_from=2024-01-01

# Статистика использования функций
GET    /api/v1/operator/stats/functions

6. Сервисы для системы безопасности

ApprovalService

class ApprovalService:
    """Сервис управления утверждением функций"""

    async def propose_function(
        self,
        db: AsyncSession,
        function_name: str,
        code: str,
        description: str,
        category: str,
        criticality: str,
        **metadata
    ) -> PimFunctionProposal:
        """
        Агент предлагает новую функцию

        1. Валидирует код (синтаксис)
        2. Анализирует безопасность (SQL injection, etc)
        3. Извлекает зависимости
        4. Создает запись в proposals
        5. Уведомляет оператора
        """

    async def approve_function(
        self,
        db: AsyncSession,
        proposal_id: int,
        operator_id: str,
        approval_notes: str
    ) -> PimApprovedFunction:
        """
        Оператор утверждает функцию

        1. Обновляет статус proposal
        2. Создает approved_function
        3. Деплоит код (добавляет в runtime)
        4. Логирует утверждение
        """

    async def reject_function(
        self,
        db: AsyncSession,
        proposal_id: int,
        operator_id: str,
        rejection_reason: str
    ):
        """Оператор отклоняет функцию"""

    async def revoke_function(
        self,
        db: AsyncSession,
        function_id: int,
        operator_id: str,
        revoke_reason: str
    ):
        """Оператор отзывает утвержденную функцию"""

    async def get_approved_functions(
        self,
        db: AsyncSession,
        category: str = None,
        is_active: bool = True
    ) -> List[PimApprovedFunction]:
        """Получить список утвержденных функций"""

FunctionExecutor

class FunctionExecutor:
    """Безопасное выполнение утвержденных функций"""

    async def execute(
        self,
        db: AsyncSession,
        function_name: str,
        parameters: dict,
        executed_by: str = 'agent'
    ) -> ExecutionResult:
        """
        Выполнить утвержденную функцию

        1. Проверяет, что функция утверждена и активна
        2. Валидирует параметры по schema
        3. Выполняет код в изолированном контексте
        4. Логирует выполнение
        5. Возвращает результат

        КРИТИЧНО: Только утвержденные функции!
        """

        # 1. Проверка утверждения
        function = await self._get_approved_function(db, function_name)
        if not function or not function.is_active:
            raise FunctionNotApprovedError(f"Function {function_name} not approved")

        # 2. Валидация параметров
        self._validate_parameters(parameters, function.parameters_schema)

        # 3. Создание audit log entry
        log_entry = PimFunctionExecutionLog(
            function_id=function.id,
            function_name=function_name,
            executed_by=executed_by,
            parameters=parameters
        )

        # 4. Выполнение с контролем времени
        start_time = time.time()
        try:
            # Динамическая загрузка кода
            func = self._load_function_code(function.code)

            # Выполнение
            result = await func(db, **parameters)

            # Успех
            log_entry.status = 'success'
            log_entry.result = result
            log_entry.execution_time_ms = int((time.time() - start_time) * 1000)

            # Обновление счетчиков
            function.usage_count += 1
            function.last_used_at = datetime.utcnow()

        except Exception as e:
            # Ошибка
            log_entry.status = 'error'
            log_entry.error_message = str(e)
            log_entry.execution_time_ms = int((time.time() - start_time) * 1000)
            raise

        finally:
            db.add(log_entry)
            await db.commit()

        return ExecutionResult(
            success=True,
            result=result,
            execution_time_ms=log_entry.execution_time_ms
        )

ParserFactory (автоматический)

class ParserFactory:
    """Автоматическое создание парсеров агентом"""

    async def create_parser(
        self,
        db: AsyncSession,
        brand: str,
        parser_type: str,
        config: dict,
        sample_file: str = None
    ) -> PimParserRegistry:
        """
        Агент автоматически создает парсер

        1. Анализирует sample_file (если есть)
        2. Генерирует код парсера на основе config
        3. Тестирует на примере
        4. Сохраняет в registry
        5. Возвращает готовый парсер

        ВАЖНО: Парсеры НЕ требуют утверждения оператора!
        Они работают только со staging, не с мастер-данными.
        """

        # Генерация кода
        if parser_type == 'price_excel':
            code = await self._generate_excel_parser(brand, config, sample_file)
        elif parser_type == 'price_csv':
            code = await self._generate_csv_parser(brand, config, sample_file)
        elif parser_type == 'site':
            code = await self._generate_site_parser(brand, config)
        else:
            raise ValueError(f"Unknown parser type: {parser_type}")

        # Тестирование
        if sample_file:
            test_result = await self._test_parser(code, sample_file)
            if not test_result.success:
                raise ParserTestError(f"Parser test failed: {test_result.error}")

        # Сохранение в registry
        parser = PimParserRegistry(
            parser_name=f"{brand.lower()}_{parser_type}_parser",
            brand=brand,
            parser_type=parser_type,
            config=config,
            code=code,
            code_hash=hashlib.sha256(code.encode()).hexdigest(),
            created_by='agent'
        )

        db.add(parser)
        await db.commit()

        return parser

    async def _generate_excel_parser(
        self,
        brand: str,
        config: dict,
        sample_file: str
    ) -> str:
        """
        Генерирует код парсера Excel на основе конфигурации

        config = {
            'article_column': 'A',
            'name_column': 'B',
            'price_column': 'C',
            'start_row': 5
        }
        """

        template = f'''
import pandas as pd
from typing import List, Dict

class {brand.capitalize()}ExcelParser(BaseParser):
    """Автоматически созданный парсер для бренда {brand}"""

    async def parse_file(self, file_path: str) -> ParseResult:
        df = pd.read_excel(
            file_path,
            skiprows={config.get('start_row', 1) - 1}
        )

        products = []
        for _, row in df.iterrows():
            product = {{
                'article': str(row['{config['article_column']}']).strip(),
                'name': str(row['{config['name_column']}']).strip(),
                'base_price': float(row['{config['price_column']}']),
                'brand': '{brand}'
            }}

            # Характеристики (если есть)
            characteristics = {{}}
            {self._generate_characteristics_extraction(config)}

            if characteristics:
                product['characteristics'] = characteristics

            products.append(product)

        return ParseResult(
            success=True,
            products=products,
            count=len(products)
        )
'''

        return template

7. Пример workflow

Сценарий 1: Агент предлагает новую функцию

# АГЕНТ
# Агент понимает, что нужна функция массового обновления цен

proposal = await approval_service.propose_function(
    db,
    function_name="recalculate_brand_prices",
    category="pricing",
    criticality="critical",
    description="Массовый пересчет цен для товаров бренда",
    code="""
async def recalculate_brand_prices(
    db: AsyncSession,
    brand: str,
    discount_percent: float
) -> Dict:
    # 1. Получить правило
    rule = await pricing_service.get_pricing_rule(db, brand)

    # 2. Обновить процент скидки
    if rule:
        rule.discount_percent = discount_percent
    else:
        rule = await pricing_service.create_pricing_rule(db, brand, discount_percent)

    # 3. Получить все товары бренда
    products = await product_service.list_products(db, filters={'brand': brand})

    # 4. Пересчитать цены
    updated_count = 0
    for product in products:
        old_price = product.cost_price
        new_price = await pricing_service.calculate_cost_price(
            db, product.article, product.base_price, brand
        )

        product.cost_price = new_price
        updated_count += 1

        # Логировать изменение
        await price_history.log_change(
            db, product.article, old_price, new_price, source='mass_update'
        )

    await db.commit()

    return {
        'brand': brand,
        'updated_count': updated_count,
        'new_discount': discount_percent
    }
""",
    reads_tables=["pim_products", "pim_cost_rules"],
    writes_tables=["pim_products", "pim_cost_rules", "pim_price_history"],
    purpose="Массовое обновление при изменении правил поставщика",
    use_cases=[
        "Поставщик ФЕЙЕРВЕРК увеличил скидку с 10% до 15%",
        "Нужно обновить 200+ товаров одной операцией"
    ]
)

# Функция сохранена, статус: 'proposed'
# Оператор получает уведомление

Сценарий 2: Оператор утверждает функцию

# ОПЕРАТОР

# 1. Просматривает список предложений
proposals = await approval_service.get_proposals(db, status='proposed')

# 2. Изучает код
proposal = proposals[0]
print(proposal.code)
print(f"Читает: {proposal.reads_tables}")
print(f"Пишет: {proposal.writes_tables}")

# 3. Тестирует на тестовых данных
test_result = await test_function_on_staging(proposal.code)

# 4. Утверждает
approved = await approval_service.approve_function(
    db,
    proposal_id=proposal.id,
    operator_id="operator_123",
    approval_notes="Проверено, безопасно. Есть логирование изменений."
)

# Функция теперь доступна агенту

Сценарий 3: Агент использует утвержденную функцию

# АГЕНТ

# Нужно обновить цены для ФЕЙЕРВЕРК (скидка 15%)
result = await function_executor.execute(
    db,
    function_name="recalculate_brand_prices",
    parameters={
        "brand": "ФЕЙЕРВЕРК",
        "discount_percent": 15.0
    },
    executed_by="agent"
)

# Результат:
# {
#     'brand': 'ФЕЙЕРВЕРК',
#     'updated_count': 215,
#     'new_discount': 15.0
# }

# Автоматически создан audit log:
# - function_name: recalculate_brand_prices
# - parameters: {"brand": "ФЕЙЕРВЕРК", "discount_percent": 15.0}
# - rows_affected: 215
# - tables_modified: ["pim_products", "pim_price_history"]
# - executed_by: agent
# - execution_time_ms: 1250

Сценарий 4: Агент создает парсер (автоматически)

# АГЕНТ

# Получили новый прайс от САЛЮТ, нужен парсер
parser = await parser_factory.create_parser(
    db,
    brand="САЛЮТ",
    parser_type="price_excel",
    config={
        "article_column": "A",
        "name_column": "B",
        "price_column": "D",
        "start_row": 3,
        "characteristics_columns": {
            "caliber": "E",
            "shots": "F"
        }
    },
    sample_file="/data/price-lists/САЛЮТ/sample.xlsx"
)

# Парсер создан, протестирован и готов к использованию
# БЕЗ утверждения оператора!

# Используем парсер
result = await parser.parse_file("/data/price-lists/САЛЮТ/2024-01-15.xlsx")

# Данные загружены в staging
await staging_service.import_from_parser(db, result.products, brand="САЛЮТ")

8. CLI команды для оператора

# Просмотр предложенных функций
pim operator proposals list
pim operator proposals list --status proposed --criticality critical

# Детали предложения
pim operator proposals show 123

# Утверждение
pim operator proposals approve 123 --notes "Проверено, безопасно"

# Отклонение
pim operator proposals reject 123 --reason "Слишком опасно"

# Просмотр утвержденных функций
pim operator functions list
pim operator functions show recalculate_brand_prices

# Отзыв функции
pim operator functions revoke recalculate_brand_prices --reason "Уязвимость"

# Audit log
pim operator audit-log --function recalculate_brand_prices
pim operator audit-log --date 2024-01-15
pim operator audit-log --executed-by agent

# Статистика
pim operator stats functions
pim operator stats usage --period 7d

9. Уведомления и алерты

# Когда агент предлагает критичную функцию
 Email оператору: "Новая критичная функция требует утверждения"
 Slack/Telegram: "🔴 CRITICAL: recalculate_brand_prices"

# Когда функция выполнена
 Audit log (всегда)
 Email оператору (для critical операций)

# Когда ошибка в функции
 Alert: "Функция recalculate_brand_prices завершилась с ошибкой"
 Auto-revoke (опционально для критичных функций)

# Статистика использования
 Weekly digest оператору:
  - Предложено функций: 3
  - Утверждено: 2
  - Отклонено: 1
  - Выполнено операций: 150
  - Ошибок: 2

Резюме

Разделение прав:

┌──────────────────────┬─────────────────┬──────────────────┐
│ ТИП ОПЕРАЦИИ         │ АГЕНТ           │ ОПЕРАТОР         │
├──────────────────────┼─────────────────┼──────────────────┤
│ Изменение мастер-    │ ❌ Запрещено    │ ✓ Разрешено      │
│ данных напрямую      │                 │                  │
├──────────────────────┼─────────────────┼──────────────────┤
│ Предложение функций  │ ✓ Разрешено     │ -                │
├──────────────────────┼─────────────────┼──────────────────┤
│ Утверждение функций  │ ❌ Запрещено    │ ✓ Разрешено      │
├──────────────────────┼─────────────────┼──────────────────┤
│ Выполнение           │ ✓ Только        │ ✓ Разрешено      │
│ утвержденных функций │   утвержденные  │                  │
├──────────────────────┼─────────────────┼──────────────────┤
│ Создание парсеров    │ ✓ Автоматически │ ✓ Просмотр       │
├──────────────────────┼─────────────────┼──────────────────┤
│ Работа со staging    │ ✓ Разрешено     │ ✓ Разрешено      │
├──────────────────────┼─────────────────┼──────────────────┤
│ Чтение данных        │ ✓ Разрешено     │ ✓ Разрешено      │
└──────────────────────┴─────────────────┴──────────────────┘

Ключевые гарантии безопасности:
- ✓ Агент не может изменять данные без утвержденной функции
- ✓ Все операции логируются в audit log
- ✓ Оператор контролирует весь критичный код
- ✓ Парсеры создаются автоматически (работают только со staging)
- ✓ Функции можно отозвать в любой момент

Реализовывать эту систему?