system/scripts/session_miner.py
#!/usr/bin/env python3
"""
Session Miner — глубокий анализ всех сессий Claude Code
Извлекает: незавершённые задачи, важные решения, факты, ошибки

Использование:
    python3 session_miner.py --full       # Полный анализ
    python3 session_miner.py --todos      # Только незавершённые задачи
    python3 session_miner.py --recent 7   # Последние N дней
    python3 session_miner.py --session UUID  # Конкретная сессия
"""

import json
import os
import re
import sys
from pathlib import Path
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Any, Optional

CLAUDE_DIR = Path.home() / '.claude'
PROJECTS_DIR = CLAUDE_DIR / 'projects' / '-opt-claude-workspace'
TODOS_DIR = CLAUDE_DIR / 'todos'
HISTORY_FILE = CLAUDE_DIR / 'history.jsonl'
OUTPUT_DIR = CLAUDE_DIR / 'sessions' / 'mined'


class SessionMiner:
    def __init__(self):
        self.sessions: Dict[str, Dict] = {}
        self.todos: List[Dict] = []
        self.facts: List[Dict] = []
        self.decisions: List[Dict] = []
        self.errors: List[Dict] = []
        self.commands: List[Dict] = []

    def load_history(self) -> None:
        """Загрузить history.jsonl — индекс всех сообщений"""
        if not HISTORY_FILE.exists():
            print("❌ history.jsonl не найден")
            return

        with open(HISTORY_FILE, 'r') as f:
            for line in f:
                try:
                    entry = json.loads(line.strip())
                    sid = entry.get('sessionId', 'unknown')
                    if sid not in self.sessions:
                        self.sessions[sid] = {
                            'id': sid,
                            'messages': [],
                            'first_ts': entry.get('timestamp'),
                            'last_ts': entry.get('timestamp'),
                            'project': entry.get('project', ''),
                        }
                    self.sessions[sid]['messages'].append(entry)
                    self.sessions[sid]['last_ts'] = entry.get('timestamp')
                except json.JSONDecodeError:
                    pass  # Malformed JSON line

        print(f"✅ Загружено {len(self.sessions)} сессий из history.jsonl")

    def load_session_details(self, session_id: str) -> Optional[List[Dict]]:
        """Загрузить полный диалог сессии из .jsonl файла"""
        session_file = PROJECTS_DIR / f"{session_id}.jsonl"
        if not session_file.exists():
            return None

        messages = []
        with open(session_file, 'r') as f:
            for line in f:
                try:
                    messages.append(json.loads(line.strip()))
                except json.JSONDecodeError:
                    pass  # Malformed JSON line
        return messages

    def extract_todos_from_session(self, messages: List[Dict]) -> List[Dict]:
        """Извлечь todos из сообщений сессии"""
        todos = []
        for msg in messages:
            try:
                # Ищем TodoWrite в tool calls
                if msg.get('type') == 'assistant':
                    content = msg.get('message', {}).get('content', [])
                    if isinstance(content, list):
                        for item in content:
                            if isinstance(item, dict) and item.get('type') == 'tool_use' and item.get('name') == 'TodoWrite':
                                input_data = item.get('input', {})
                                todo_list = input_data.get('todos', [])
                                for t in todo_list:
                                    if isinstance(t, dict):
                                        todos.append({
                                            'content': t.get('content', ''),
                                            'status': t.get('status', ''),
                                            'session': msg.get('sessionId', ''),
                                        })
            except Exception:
                pass
        return todos

    def extract_patterns(self, text: str) -> Dict[str, List[str]]:
        """Извлечь паттерны из текста: решения, факты, команды"""
        patterns = {
            'decisions': [],
            'facts': [],
            'commands': [],
            'errors': [],
            'urls': [],
            'files': [],
        }

        # Решения (ПЛАН, ВЫПОЛНЕНО, etc.)
        decision_patterns = [
            r'ПЛАН:\s*\n(.*?)(?=\n\n|\Z)',
            r'ВЫПОЛНЕНО:\s*\n(.*?)(?=\n\n|\Z)',
            r'РЕШЕНИЕ:\s*\n(.*?)(?=\n\n|\Z)',
            r'✅\s+(.+)',
        ]
        for p in decision_patterns:
            patterns['decisions'].extend(re.findall(p, text, re.DOTALL | re.MULTILINE))

        # Факты
        fact_patterns = [
            r'ФАКТ:\s*(.+)',
            r'Важно:\s*(.+)',
            r'Запомнить:\s*(.+)',
        ]
        for p in fact_patterns:
            patterns['facts'].extend(re.findall(p, text, re.IGNORECASE))

        # Bash команды
        patterns['commands'] = re.findall(r'```bash\n(.*?)```', text, re.DOTALL)

        # Ошибки
        patterns['errors'] = re.findall(r'(?:error|ошибка|failed|fail)[:]\s*(.+)', text, re.IGNORECASE)

        # URLs
        patterns['urls'] = re.findall(r'https?://[^\s<>"]+', text)

        # Файлы
        patterns['files'] = re.findall(r'[/\w-]+\.\w{2,4}', text)

        return patterns

    def analyze_session(self, session_id: str) -> Dict:
        """Полный анализ одной сессии"""
        messages = self.load_session_details(session_id)
        if not messages:
            return {}

        result = {
            'id': session_id,
            'message_count': len(messages),
            'todos': [],
            'decisions': [],
            'facts': [],
            'errors': [],
            'commands': [],
            'files_changed': [],
        }

        # Извлечь todos
        result['todos'] = self.extract_todos_from_session(messages)

        # Анализ текста сообщений
        for msg in messages:
            text = ''
            if msg.get('type') == 'human':
                text = msg.get('message', {}).get('content', '')
            elif msg.get('type') == 'assistant':
                content = msg.get('message', {}).get('content', [])
                if isinstance(content, list):
                    for item in content:
                        if item.get('type') == 'text':
                            text += item.get('text', '')

            if text:
                patterns = self.extract_patterns(text)
                result['decisions'].extend(patterns['decisions'])
                result['facts'].extend(patterns['facts'])
                result['errors'].extend(patterns['errors'])
                result['commands'].extend(patterns['commands'])

        return result

    def get_recent_sessions(self, days: int = 7) -> List[str]:
        """Получить ID сессий за последние N дней"""
        cutoff = datetime.now() - timedelta(days=days)
        cutoff_ts = cutoff.timestamp() * 1000

        recent = []
        for sid, data in self.sessions.items():
            if data.get('last_ts', 0) > cutoff_ts:
                recent.append(sid)
        return recent

    def get_incomplete_todos(self) -> List[Dict]:
        """Собрать все незавершённые задачи из всех сессий"""
        incomplete = []

        # Из todos/*.json
        if TODOS_DIR.exists():
            for f in TODOS_DIR.glob('*.json'):
                try:
                    with open(f) as fp:
                        data = json.load(fp)
                        todos = data.get('todos', [])
                        for t in todos:
                            if t.get('status') in ['pending', 'in_progress']:
                                incomplete.append({
                                    'content': t.get('content', ''),
                                    'status': t.get('status'),
                                    'source': f.stem[:8],
                                })
                except (json.JSONDecodeError, KeyError):
                    pass  # Corrupted todo file

        return incomplete

    def mine_all(self, days: int = None) -> Dict:
        """Полный обыск всех сессий"""
        self.load_history()

        if days:
            session_ids = self.get_recent_sessions(days)
        else:
            session_ids = list(self.sessions.keys())

        print(f"📊 Анализ {len(session_ids)} сессий...")

        all_results = {
            'total_sessions': len(session_ids),
            'todos_incomplete': [],
            'all_decisions': [],
            'all_facts': [],
            'all_errors': [],
            'sessions_summary': [],
        }

        # Незавершённые todos
        all_results['todos_incomplete'] = self.get_incomplete_todos()

        # Анализ каждой сессии
        for i, sid in enumerate(session_ids):
            if i % 50 == 0:
                print(f"  Обработано {i}/{len(session_ids)}...")
            result = self.analyze_session(sid)
            if result:
                all_results['all_decisions'].extend(result.get('decisions', []))
                all_results['all_facts'].extend(result.get('facts', []))
                all_results['all_errors'].extend(result.get('errors', []))

                if result.get('todos'):
                    all_results['sessions_summary'].append({
                        'id': sid[:8],
                        'messages': result['message_count'],
                        'todos': len(result['todos']),
                    })

        return all_results

    def generate_report(self, results: Dict) -> str:
        """Сгенерировать отчёт"""
        report = f"""# Отчёт Session Miner
**Дата:** {datetime.now().isoformat()}
**Сессий проанализировано:** {results['total_sessions']}

---

## Незавершённые задачи ({len(results['todos_incomplete'])})

"""
        for t in results['todos_incomplete'][:50]:
            status = "⏳" if t['status'] == 'pending' else "🔄"
            report += f"- {status} [{t['source']}] {t['content']}\n"

        report += f"""

---

## Решения ({len(results['all_decisions'])})

"""
        for d in results['all_decisions'][:30]:
            if len(d) > 10:
                report += f"- {d[:200]}...\n" if len(d) > 200 else f"- {d}\n"

        report += f"""

---

## Ошибки ({len(results['all_errors'])})

"""
        for e in set(results['all_errors'][:30]):
            report += f"- ❌ {e[:150]}\n"

        report += f"""

---

## Факты ({len(results['all_facts'])})

"""
        for f in results['all_facts'][:30]:
            report += f"- 📌 {f}\n"

        return report


def main():
    import argparse
    parser = argparse.ArgumentParser(description='Session Miner')
    parser.add_argument('--full', action='store_true', help='Полный анализ')
    parser.add_argument('--todos', action='store_true', help='Только todos')
    parser.add_argument('--recent', type=int, help='Последние N дней')
    parser.add_argument('--session', type=str, help='Конкретная сессия')
    parser.add_argument('--output', type=str, help='Файл для отчёта')
    args = parser.parse_args()

    miner = SessionMiner()

    if args.todos:
        miner.load_history()
        todos = miner.get_incomplete_todos()
        print(f"\n📋 НЕЗАВЕРШЁННЫЕ ЗАДАЧИ: {len(todos)}\n")
        for t in todos:
            status = "⏳" if t['status'] == 'pending' else "🔄"
            print(f"{status} [{t['source']}] {t['content']}")

    elif args.session:
        result = miner.analyze_session(args.session)
        print(json.dumps(result, indent=2, ensure_ascii=False))

    else:
        days = args.recent if args.recent else None
        results = miner.mine_all(days)
        report = miner.generate_report(results)

        # Сохранить отчёт
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        output_file = args.output or OUTPUT_DIR / f"report_{datetime.now().strftime('%Y%m%d_%H%M')}.md"
        with open(output_file, 'w') as f:
            f.write(report)
        print(f"\n✅ Отчёт сохранён: {output_file}")

        # Краткая сводка
        print(f"\n{'='*60}")
        print(f"СВОДКА:")
        print(f"  Сессий: {results['total_sessions']}")
        print(f"  Незавершённых задач: {len(results['todos_incomplete'])}")
        print(f"  Решений: {len(results['all_decisions'])}")
        print(f"  Ошибок: {len(results['all_errors'])}")
        print(f"  Фактов: {len(results['all_facts'])}")
        print(f"{'='*60}")


if __name__ == '__main__':
    main()