#!/usr/bin/env python3
"""
Session Miner — глубокий анализ всех сессий Claude Code
Извлекает: незавершённые задачи, важные решения, факты, ошибки
Использование:
python3 session_miner.py --full # Полный анализ
python3 session_miner.py --todos # Только незавершённые задачи
python3 session_miner.py --recent 7 # Последние N дней
python3 session_miner.py --session UUID # Конкретная сессия
"""
import json
import os
import re
import sys
from pathlib import Path
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Any, Optional
CLAUDE_DIR = Path.home() / '.claude'
PROJECTS_DIR = CLAUDE_DIR / 'projects' / '-opt-claude-workspace'
TODOS_DIR = CLAUDE_DIR / 'todos'
HISTORY_FILE = CLAUDE_DIR / 'history.jsonl'
OUTPUT_DIR = CLAUDE_DIR / 'sessions' / 'mined'
class SessionMiner:
def __init__(self):
self.sessions: Dict[str, Dict] = {}
self.todos: List[Dict] = []
self.facts: List[Dict] = []
self.decisions: List[Dict] = []
self.errors: List[Dict] = []
self.commands: List[Dict] = []
def load_history(self) -> None:
"""Загрузить history.jsonl — индекс всех сообщений"""
if not HISTORY_FILE.exists():
print("❌ history.jsonl не найден")
return
with open(HISTORY_FILE, 'r') as f:
for line in f:
try:
entry = json.loads(line.strip())
sid = entry.get('sessionId', 'unknown')
if sid not in self.sessions:
self.sessions[sid] = {
'id': sid,
'messages': [],
'first_ts': entry.get('timestamp'),
'last_ts': entry.get('timestamp'),
'project': entry.get('project', ''),
}
self.sessions[sid]['messages'].append(entry)
self.sessions[sid]['last_ts'] = entry.get('timestamp')
except json.JSONDecodeError:
pass # Malformed JSON line
print(f"✅ Загружено {len(self.sessions)} сессий из history.jsonl")
def load_session_details(self, session_id: str) -> Optional[List[Dict]]:
"""Загрузить полный диалог сессии из .jsonl файла"""
session_file = PROJECTS_DIR / f"{session_id}.jsonl"
if not session_file.exists():
return None
messages = []
with open(session_file, 'r') as f:
for line in f:
try:
messages.append(json.loads(line.strip()))
except json.JSONDecodeError:
pass # Malformed JSON line
return messages
def extract_todos_from_session(self, messages: List[Dict]) -> List[Dict]:
"""Извлечь todos из сообщений сессии"""
todos = []
for msg in messages:
try:
# Ищем TodoWrite в tool calls
if msg.get('type') == 'assistant':
content = msg.get('message', {}).get('content', [])
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get('type') == 'tool_use' and item.get('name') == 'TodoWrite':
input_data = item.get('input', {})
todo_list = input_data.get('todos', [])
for t in todo_list:
if isinstance(t, dict):
todos.append({
'content': t.get('content', ''),
'status': t.get('status', ''),
'session': msg.get('sessionId', ''),
})
except Exception:
pass
return todos
def extract_patterns(self, text: str) -> Dict[str, List[str]]:
"""Извлечь паттерны из текста: решения, факты, команды"""
patterns = {
'decisions': [],
'facts': [],
'commands': [],
'errors': [],
'urls': [],
'files': [],
}
# Решения (ПЛАН, ВЫПОЛНЕНО, etc.)
decision_patterns = [
r'ПЛАН:\s*\n(.*?)(?=\n\n|\Z)',
r'ВЫПОЛНЕНО:\s*\n(.*?)(?=\n\n|\Z)',
r'РЕШЕНИЕ:\s*\n(.*?)(?=\n\n|\Z)',
r'✅\s+(.+)',
]
for p in decision_patterns:
patterns['decisions'].extend(re.findall(p, text, re.DOTALL | re.MULTILINE))
# Факты
fact_patterns = [
r'ФАКТ:\s*(.+)',
r'Важно:\s*(.+)',
r'Запомнить:\s*(.+)',
]
for p in fact_patterns:
patterns['facts'].extend(re.findall(p, text, re.IGNORECASE))
# Bash команды
patterns['commands'] = re.findall(r'```bash\n(.*?)```', text, re.DOTALL)
# Ошибки
patterns['errors'] = re.findall(r'(?:error|ошибка|failed|fail)[:]\s*(.+)', text, re.IGNORECASE)
# URLs
patterns['urls'] = re.findall(r'https?://[^\s<>"]+', text)
# Файлы
patterns['files'] = re.findall(r'[/\w-]+\.\w{2,4}', text)
return patterns
def analyze_session(self, session_id: str) -> Dict:
"""Полный анализ одной сессии"""
messages = self.load_session_details(session_id)
if not messages:
return {}
result = {
'id': session_id,
'message_count': len(messages),
'todos': [],
'decisions': [],
'facts': [],
'errors': [],
'commands': [],
'files_changed': [],
}
# Извлечь todos
result['todos'] = self.extract_todos_from_session(messages)
# Анализ текста сообщений
for msg in messages:
text = ''
if msg.get('type') == 'human':
text = msg.get('message', {}).get('content', '')
elif msg.get('type') == 'assistant':
content = msg.get('message', {}).get('content', [])
if isinstance(content, list):
for item in content:
if item.get('type') == 'text':
text += item.get('text', '')
if text:
patterns = self.extract_patterns(text)
result['decisions'].extend(patterns['decisions'])
result['facts'].extend(patterns['facts'])
result['errors'].extend(patterns['errors'])
result['commands'].extend(patterns['commands'])
return result
def get_recent_sessions(self, days: int = 7) -> List[str]:
"""Получить ID сессий за последние N дней"""
cutoff = datetime.now() - timedelta(days=days)
cutoff_ts = cutoff.timestamp() * 1000
recent = []
for sid, data in self.sessions.items():
if data.get('last_ts', 0) > cutoff_ts:
recent.append(sid)
return recent
def get_incomplete_todos(self) -> List[Dict]:
"""Собрать все незавершённые задачи из всех сессий"""
incomplete = []
# Из todos/*.json
if TODOS_DIR.exists():
for f in TODOS_DIR.glob('*.json'):
try:
with open(f) as fp:
data = json.load(fp)
todos = data.get('todos', [])
for t in todos:
if t.get('status') in ['pending', 'in_progress']:
incomplete.append({
'content': t.get('content', ''),
'status': t.get('status'),
'source': f.stem[:8],
})
except (json.JSONDecodeError, KeyError):
pass # Corrupted todo file
return incomplete
def mine_all(self, days: int = None) -> Dict:
"""Полный обыск всех сессий"""
self.load_history()
if days:
session_ids = self.get_recent_sessions(days)
else:
session_ids = list(self.sessions.keys())
print(f"📊 Анализ {len(session_ids)} сессий...")
all_results = {
'total_sessions': len(session_ids),
'todos_incomplete': [],
'all_decisions': [],
'all_facts': [],
'all_errors': [],
'sessions_summary': [],
}
# Незавершённые todos
all_results['todos_incomplete'] = self.get_incomplete_todos()
# Анализ каждой сессии
for i, sid in enumerate(session_ids):
if i % 50 == 0:
print(f" Обработано {i}/{len(session_ids)}...")
result = self.analyze_session(sid)
if result:
all_results['all_decisions'].extend(result.get('decisions', []))
all_results['all_facts'].extend(result.get('facts', []))
all_results['all_errors'].extend(result.get('errors', []))
if result.get('todos'):
all_results['sessions_summary'].append({
'id': sid[:8],
'messages': result['message_count'],
'todos': len(result['todos']),
})
return all_results
def generate_report(self, results: Dict) -> str:
"""Сгенерировать отчёт"""
report = f"""# Отчёт Session Miner
**Дата:** {datetime.now().isoformat()}
**Сессий проанализировано:** {results['total_sessions']}
---
## Незавершённые задачи ({len(results['todos_incomplete'])})
"""
for t in results['todos_incomplete'][:50]:
status = "⏳" if t['status'] == 'pending' else "🔄"
report += f"- {status} [{t['source']}] {t['content']}\n"
report += f"""
---
## Решения ({len(results['all_decisions'])})
"""
for d in results['all_decisions'][:30]:
if len(d) > 10:
report += f"- {d[:200]}...\n" if len(d) > 200 else f"- {d}\n"
report += f"""
---
## Ошибки ({len(results['all_errors'])})
"""
for e in set(results['all_errors'][:30]):
report += f"- ❌ {e[:150]}\n"
report += f"""
---
## Факты ({len(results['all_facts'])})
"""
for f in results['all_facts'][:30]:
report += f"- 📌 {f}\n"
return report
def main():
import argparse
parser = argparse.ArgumentParser(description='Session Miner')
parser.add_argument('--full', action='store_true', help='Полный анализ')
parser.add_argument('--todos', action='store_true', help='Только todos')
parser.add_argument('--recent', type=int, help='Последние N дней')
parser.add_argument('--session', type=str, help='Конкретная сессия')
parser.add_argument('--output', type=str, help='Файл для отчёта')
args = parser.parse_args()
miner = SessionMiner()
if args.todos:
miner.load_history()
todos = miner.get_incomplete_todos()
print(f"\n📋 НЕЗАВЕРШЁННЫЕ ЗАДАЧИ: {len(todos)}\n")
for t in todos:
status = "⏳" if t['status'] == 'pending' else "🔄"
print(f"{status} [{t['source']}] {t['content']}")
elif args.session:
result = miner.analyze_session(args.session)
print(json.dumps(result, indent=2, ensure_ascii=False))
else:
days = args.recent if args.recent else None
results = miner.mine_all(days)
report = miner.generate_report(results)
# Сохранить отчёт
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
output_file = args.output or OUTPUT_DIR / f"report_{datetime.now().strftime('%Y%m%d_%H%M')}.md"
with open(output_file, 'w') as f:
f.write(report)
print(f"\n✅ Отчёт сохранён: {output_file}")
# Краткая сводка
print(f"\n{'='*60}")
print(f"СВОДКА:")
print(f" Сессий: {results['total_sessions']}")
print(f" Незавершённых задач: {len(results['todos_incomplete'])}")
print(f" Решений: {len(results['all_decisions'])}")
print(f" Ошибок: {len(results['all_errors'])}")
print(f" Фактов: {len(results['all_facts'])}")
print(f"{'='*60}")
if __name__ == '__main__':
main()