#!/usr/bin/env python3
"""
Sessions — анализ и поиск сессий Claude Code
DEPRECATED: Используйте library.services.session.SessionManager
Команды:
python3 sessions.py list # Список сессий
python3 sessions.py list pirotehnika # Фильтр по проекту
python3 sessions.py search ozon # Поиск по слову
python3 sessions.py analyze 3 # Анализ сессии #3
python3 sessions.py resume магазин # Подключиться к сессии
Новый способ:
python3 -m library.services.session list
python3 -m library.services.session search ozon
python3 -m library.services.session resume магазин
"""
# Делегируем в SessionManager
try:
from library.services.session.cli import main as session_main
USE_NEW_MANAGER = True
except ImportError:
USE_NEW_MANAGER = False
import json
import re
import sys
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from collections import Counter
# Пути
CLAUDE_DIR = Path.home() / '.claude'
HISTORY_FILE = CLAUDE_DIR / 'history.jsonl'
PROJECTS_DIR = CLAUDE_DIR / 'projects' / '-opt-claude-workspace'
TODOS_DIR = CLAUDE_DIR / 'todos'
INDEX_FILE = CLAUDE_DIR / 'sessions_index.json'
# Настройки
TOPIC_PAUSE_MINUTES = 20 # Пауза для определения новой темы
DAYS_DEFAULT = 7
def ts_to_dt(ts: int) -> datetime:
"""Timestamp (ms) -> datetime"""
return datetime.fromtimestamp(ts / 1000)
def extract_keywords(text: str, top_n: int = 5) -> List[str]:
"""Извлечь ключевые слова из текста"""
# Убрать служебные слова
stop_words = {
'и', 'в', 'на', 'с', 'по', 'для', 'что', 'как', 'это', 'не', 'да', 'нет',
'а', 'но', 'или', 'то', 'же', 'бы', 'ли', 'вот', 'все', 'так', 'его', 'ещё',
'the', 'a', 'an', 'is', 'are', 'to', 'of', 'and', 'in', 'for', 'on', 'with',
'файл', 'сделай', 'покажи', 'давай', 'нужно', 'можно', 'хочу', 'есть',
'надо', 'еще', 'ещё', 'если', 'тогда', 'потом', 'сначала', 'теперь',
'когда', 'где', 'кто', 'почему', 'зачем', 'какой', 'который',
'быть', 'этот', 'тот', 'свой', 'весь', 'такой', 'сам', 'наш', 'ваш',
'очень', 'только', 'уже', 'там', 'тут', 'здесь', 'сюда', 'туда',
}
# Слова 3+ символа
words = re.findall(r'[а-яёa-z0-9_-]{3,}', text.lower())
words = [w for w in words if w not in stop_words]
# Частотность
counter = Counter(words)
return [w for w, _ in counter.most_common(top_n)]
def is_skip_message(text: str) -> bool:
"""Проверить, нужно ли пропустить сообщение при анализе"""
text_lower = text.lower().strip()
# Слишком короткие
if len(text) < 3:
return True
# Команды режима
if text_lower.startswith('режим '):
return True
# Короткие ответы
skip_phrases = [
'да', 'нет', 'ок', 'хорошо', 'ага', 'угу', 'норм', 'ясно', 'понял',
'1', '2', '3', '4', '5', '1 2 3', '1 2', '2 3',
'делай', 'давай', 'го', 'окей', 'good', 'yes', 'no', 'ok',
]
if text_lower in skip_phrases:
return True
# Системные
if text.startswith('This session is being continued'):
return True
if text.startswith('/'):
return True
return False
def clean_task_text(text: str) -> str:
"""Очистить текст задачи для отображения"""
# Убрать переносы
text = text.replace('\n', ' ').strip()
# Убрать множественные пробелы
while ' ' in text:
text = text.replace(' ', ' ')
# Первая буква заглавная
if text:
text = text[0].upper() + text[1:]
return text
def make_summary(messages: List[Dict], files: List[str] = None) -> str:
"""Создать осмысленное описание сессии"""
# Собрать осмысленные сообщения
meaningful_msgs = []
for m in messages[:20]:
text = m.get('display', '').strip()
if is_skip_message(text):
continue
if len(text) > 10: # Достаточно длинное
meaningful_msgs.append(text)
if not meaningful_msgs:
# Fallback на файлы
if files:
# Определить тему по файлам
file_hints = []
for f in files[:5]:
if 'sessions' in f.lower():
file_hints.append('сессии')
elif 'ozon' in f.lower():
file_hints.append('ozon')
elif 'nocodb' in f.lower():
file_hints.append('nocodb')
elif 'CLAUDE.md' in f:
file_hints.append('CLAUDE.md')
if file_hints:
return ', '.join(list(set(file_hints))[:3])
return 'без описания'
# Взять первое осмысленное сообщение
first = meaningful_msgs[0]
# Очистить
first = clean_task_text(first)
# Ограничить длину
if len(first) > 45:
first = first[:45] + '...'
return first
def classify_project_by_files(files: List[str]) -> Optional[str]:
"""Определить проект по файлам"""
if not files:
return None
# Счётчики
counts = {
'pirotehnika': 0,
'lider': 0,
'platform': 0,
'infra': 0,
}
for f in files:
f_lower = f.lower()
if '/pirotehnika/' in f_lower or '/mp1' in f_lower:
counts['pirotehnika'] += 1
elif '/lider' in f_lower:
counts['lider'] += 1
elif '/system/' in f_lower or '/architect/' in f_lower or 'claude.md' in f_lower or 'sessions' in f_lower:
counts['platform'] += 1
elif '/infra/' in f_lower:
counts['infra'] += 1
# Вернуть максимальный
if max(counts.values()) > 0:
return max(counts, key=counts.get)
return None
def classify_project(text: str, files: List[str] = None) -> str:
"""Определить проект по тексту и файлам"""
# Сначала по файлам — более точно
if files:
proj = classify_project_by_files(files)
if proj:
return proj
# По тексту
text = text.lower()
# platform — проверяем первым, т.к. сессии/sessions важнее
if any(x in text for x in ['sessions', 'сессии', 'сессия', 'восстановлен', 'session']):
return 'platform'
if any(x in text for x in ['platform', 'платформ', 'architect', 'архитектор', 'claude.md', 'agent', 'агент', 'стандарт', 'тикет']):
return 'platform'
if any(x in text for x in ['pirotehnika', 'пиротехник', 'ozon', 'озон', 'фейерверк', 'pim', '1c', '1с', 'nocodb', 'товар', 'прайс', 'mp1']):
return 'pirotehnika'
elif any(x in text for x in ['lider', 'лидер', 'cs-cart', 'авто', 'запчаст']):
return 'lider'
elif any(x in text for x in ['marketplace', 'маркетплейс', 'streamlit', 'mvp']):
return 'marketplace'
elif any(x in text for x in ['infra', 'сервер', 'backup', 'nginx', 'docker', 'deploy', 'hub']):
return 'infra'
else:
return 'workspace'
def load_index() -> Dict:
"""Загрузить индекс статусов сессий"""
if INDEX_FILE.exists():
try:
with open(INDEX_FILE, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
pass
return {'sessions': {}, 'updated': None}
def save_index(index: Dict):
"""Сохранить индекс"""
index['updated'] = datetime.now().isoformat()
with open(INDEX_FILE, 'w') as f:
json.dump(index, f, indent=2, ensure_ascii=False)
def load_all_sessions() -> Dict[str, Dict]:
"""Загрузить все сессии из history.jsonl"""
sessions = {}
index = load_index()
if not HISTORY_FILE.exists():
return sessions
with open(HISTORY_FILE, 'r') as f:
for line in f:
try:
entry = json.loads(line.strip())
sid = entry.get('sessionId')
if not sid:
continue
if sid not in sessions:
sessions[sid] = {
'id': sid,
'messages': [],
'first_ts': entry.get('timestamp'),
'last_ts': entry.get('timestamp'),
}
sessions[sid]['messages'].append(entry)
sessions[sid]['last_ts'] = entry.get('timestamp')
except (KeyError, TypeError, json.JSONDecodeError):
pass
# Обогатить данными
for sid, data in sessions.items():
all_text = ' '.join([m.get('display', '') for m in data['messages']])
# Получить файлы из деталей сессии (быстрая версия)
files = get_session_files_quick(sid)
data['project'] = classify_project(all_text, files)
data['keywords'] = extract_keywords(all_text)
data['summary'] = make_summary(data['messages'], files)
data['msg_count'] = len(data['messages'])
data['first_dt'] = ts_to_dt(data['first_ts'])
data['last_dt'] = ts_to_dt(data['last_ts'])
data['files'] = files
# Статус из индекса
if sid in index['sessions']:
data['status'] = index['sessions'][sid].get('status', 'open')
else:
data['status'] = 'open'
return sessions
def get_session_files_quick(session_id: str) -> List[str]:
"""Быстро получить список файлов сессии"""
session_file = PROJECTS_DIR / f"{session_id}.jsonl"
if not session_file.exists():
return []
files = set()
try:
with open(session_file, 'r') as f:
for line in f:
# Ищем file_path в строке без полного парсинга
if '"file_path"' in line:
try:
d = json.loads(line)
content = d.get('message', {}).get('content', [])
if isinstance(content, list):
for item in content:
if isinstance(item, dict):
fp = item.get('input', {}).get('file_path', '')
if fp:
files.add(fp)
except (json.JSONDecodeError, KeyError, TypeError):
pass
if len(files) > 20: # Достаточно для определения
break
except (FileNotFoundError, IOError):
pass
return list(files)
def get_session_details(session_id: str) -> Optional[Dict]:
"""Загрузить детали сессии из файла проекта"""
session_file = PROJECTS_DIR / f"{session_id}.jsonl"
if not session_file.exists():
return None
messages = []
with open(session_file, 'r') as f:
for line in f:
try:
messages.append(json.loads(line.strip()))
except json.JSONDecodeError:
pass
result = {
'files': set(),
'commands': [],
'human_messages': [],
}
for msg in messages:
msg_type = msg.get('type')
if msg_type == 'user':
content = msg.get('message', {}).get('content', '')
if isinstance(content, str) and content.strip():
result['human_messages'].append(content[:300])
elif isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get('type') == 'text':
text = item.get('text', '')
if text.strip():
result['human_messages'].append(text[:300])
elif msg_type == 'assistant':
content = msg.get('message', {}).get('content', [])
if isinstance(content, list):
for item in content:
if isinstance(item, dict):
name = item.get('name', '')
inp = item.get('input', {})
if name in ['Read', 'Write', 'Edit']:
fp = inp.get('file_path', '')
if fp:
result['files'].add(fp)
elif name == 'Bash':
cmd = inp.get('command', '')
if cmd:
result['commands'].append(cmd[:80])
result['files'] = list(result['files'])
return result
def is_task_request(text: str) -> bool:
"""Проверить, является ли текст запросом на задачу"""
text_lower = text.lower()
# Слишком короткие — не задачи
if len(text) < 5:
return False
# Команды режима — не задачи
if text_lower.startswith(('режим ', '/')) or text_lower in ('да', 'нет', 'ок', 'хорошо', 'ага', 'угу', '1', '2', '3'):
return False
# Системные сообщения
if text.startswith('This session is being continued'):
return False
# Глаголы действия
action_verbs = [
'сделай', 'создай', 'добавь', 'удали', 'исправь', 'поправь', 'настрой',
'запусти', 'проверь', 'покажи', 'найди', 'напиши', 'обнови', 'измени',
'реализуй', 'импортируй', 'экспортируй', 'загрузи', 'выгрузи', 'синхронизируй',
'протестируй', 'отладь', 'оптимизируй', 'рефактори', 'разберись', 'продолж',
'do', 'create', 'add', 'delete', 'fix', 'run', 'check', 'show', 'find', 'write',
]
for verb in action_verbs:
if verb in text_lower:
return True
# Вопросы тоже могут быть задачами
if '?' in text and len(text) > 20:
return True
# Достаточно длинный текст — скорее всего задача
if len(text) > 30:
return True
return False
def extract_task_name(text: str) -> str:
"""Извлечь краткое название задачи из текста"""
# Убрать переносы строк
text = text.replace('\n', ' ').strip()
# Ограничить длину
if len(text) > 50:
text = text[:50] + '...'
return text
def analyze_session_tasks(session_id: str) -> Optional[Dict]:
"""Глубокий анализ задач сессии"""
session_file = PROJECTS_DIR / f"{session_id}.jsonl"
if not session_file.exists():
return None
# Загрузить все сообщения
messages = []
with open(session_file, 'r') as f:
for line in f:
try:
msg = json.loads(line.strip())
if msg.get('type') in ('user', 'assistant'):
messages.append(msg)
except json.JSONDecodeError:
pass # Malformed JSON line, skip
if not messages:
return None
# Собрать задачи
tasks = []
current_task = None
for msg in messages:
msg_type = msg.get('type')
timestamp = msg.get('timestamp', '')
if msg_type == 'user':
content = msg.get('message', {}).get('content', '')
if isinstance(content, str) and content.strip():
text = content.strip()
# Если это запрос на задачу — начать новую задачу
if is_task_request(text):
# Завершить предыдущую задачу
if current_task:
tasks.append(current_task)
current_task = {
'name': extract_task_name(text),
'full_text': text[:300],
'timestamp': timestamp,
'files_written': [],
'files_read': [],
'commands': [],
'has_error': False,
'result': 'unknown',
}
elif msg_type == 'assistant' and current_task:
content = msg.get('message', {}).get('content', [])
if isinstance(content, list):
for item in content:
if isinstance(item, dict):
tool_name = item.get('name', '')
tool_input = item.get('input', {})
if tool_name == 'Read':
fp = tool_input.get('file_path', '')
if fp and fp not in current_task['files_read']:
current_task['files_read'].append(fp)
elif tool_name in ('Write', 'Edit'):
fp = tool_input.get('file_path', '')
if fp and fp not in current_task['files_written']:
current_task['files_written'].append(fp)
elif tool_name == 'Bash':
cmd = tool_input.get('command', '')[:60]
if cmd:
current_task['commands'].append(cmd)
# Текст ответа — проверить на ошибки
if item.get('type') == 'text':
text = item.get('text', '').lower()
if any(err in text for err in ['error', 'ошибка', 'failed', 'не удалось', 'traceback']):
current_task['has_error'] = True
# Добавить последнюю задачу
if current_task:
tasks.append(current_task)
# Определить результаты задач
for i, task in enumerate(tasks):
is_last = (i == len(tasks) - 1)
if task['files_written']:
# Были изменения файлов
if task['has_error']:
task['result'] = 'error'
elif is_last:
task['result'] = 'in_progress'
else:
task['result'] = 'done'
elif task['commands']:
# Были команды но не файлы
if task['has_error']:
task['result'] = 'error'
elif is_last:
task['result'] = 'in_progress'
else:
task['result'] = 'done'
elif task['files_read']:
# Только чтение — обсуждали
task['result'] = 'discussed'
else:
# Ничего не делали — обсуждали
task['result'] = 'discussed'
# Собрать незавершённые
unfinished = [t for t in tasks if t['result'] in ('in_progress', 'discussed', 'error')]
return {
'tasks': tasks,
'unfinished': unfinished,
'total': len(tasks),
'done_count': len([t for t in tasks if t['result'] == 'done']),
'unfinished_count': len(unfinished),
}
def get_todos(session_id: str) -> List[Dict]:
"""Получить TODO сессии"""
todo_file = TODOS_DIR / f"{session_id}.json"
if not todo_file.exists():
return []
try:
with open(todo_file, 'r') as f:
data = json.load(f)
return data.get('todos', [])
except (FileNotFoundError, json.JSONDecodeError, KeyError):
return [] # Missing or corrupted todo file
def analyze_topics(messages: List[Dict]) -> List[Dict]:
"""Разбить сессию на темы по паузам и смене контекста"""
if not messages:
return []
topics = []
current_topic = {
'start_ts': messages[0].get('timestamp'),
'messages': [],
'keywords': [],
}
prev_ts = messages[0].get('timestamp', 0)
for msg in messages:
ts = msg.get('timestamp', 0)
text = msg.get('display', '')
# Пауза > 20 минут = новая тема
pause_min = (ts - prev_ts) / 60000
# Явное переключение
switch_phrases = ['подожди', 'давай другое', 'отвлечёмся', 'сначала', 'переключ']
is_switch = any(p in text.lower() for p in switch_phrases)
if pause_min > TOPIC_PAUSE_MINUTES or is_switch:
# Завершить текущую тему
if current_topic['messages']:
all_text = ' '.join([m.get('display', '') for m in current_topic['messages']])
current_topic['keywords'] = extract_keywords(all_text, 3)
current_topic['name'] = ', '.join(current_topic['keywords']) or 'без темы'
current_topic['end_ts'] = prev_ts
topics.append(current_topic)
# Начать новую
current_topic = {
'start_ts': ts,
'messages': [],
'keywords': [],
'switch_reason': 'пауза' if pause_min > TOPIC_PAUSE_MINUTES else 'явное переключение',
}
current_topic['messages'].append(msg)
prev_ts = ts
# Последняя тема
if current_topic['messages']:
all_text = ' '.join([m.get('display', '') for m in current_topic['messages']])
current_topic['keywords'] = extract_keywords(all_text, 3)
current_topic['name'] = ', '.join(current_topic['keywords']) or 'без темы'
current_topic['end_ts'] = prev_ts
topics.append(current_topic)
return topics
def find_unfinished(session: Dict, topics: List[Dict], todos: List[Dict]) -> List[Dict]:
"""Найти незавершённые дела"""
unfinished = []
# Незавершённые TODO
for todo in todos:
if todo.get('status') != 'completed':
unfinished.append({
'type': 'todo',
'status': todo.get('status', 'pending'),
'text': todo.get('content', '?'),
})
# Темы без завершения (кроме последней)
for i, topic in enumerate(topics[:-1]):
# Если после темы было переключение — она не завершена
if i + 1 < len(topics):
unfinished.append({
'type': 'topic',
'name': topic['name'],
'reason': topics[i + 1].get('switch_reason', 'переключение'),
})
return unfinished
# ============== КОМАНДЫ ==============
def cmd_list(filter_project: str = None, days: int = DAYS_DEFAULT, show_all: bool = False, inactive_only: bool = False):
"""Показать список сессий"""
sessions = load_all_sessions()
cutoff = datetime.now() - timedelta(days=days)
now = datetime.now()
# Фильтрация
filtered = []
for sid, data in sessions.items():
if data['last_dt'] < cutoff:
continue
if filter_project and filter_project.lower() not in data['project'].lower():
continue
# Скрыть continued/closed если не show_all
if not show_all and data.get('status') in ('continued', 'closed'):
continue
# Фильтр неактивных (больше 1 минуты)
if inactive_only:
inactive_seconds = (now - data['last_dt']).total_seconds()
if inactive_seconds < 60: # Активна если меньше 60 секунд
continue
filtered.append(data)
# Сортировка по времени
filtered.sort(key=lambda x: x['last_ts'], reverse=True)
# Вывод
title = "СЕССИИ"
if filter_project:
title += f" ({filter_project})"
if show_all:
title += " [все]"
title += f" — последние {days} дней"
print()
print(title)
print("━" * 75)
print(f" # │ {'Проект':<11} │ {'Что делали':<35} │ Когда")
print("────┼" + "─" * 13 + "┼" + "─" * 37 + "┼" + "─" * 12)
for i, s in enumerate(filtered[:15], 1):
proj = s['project'][:11]
summary = s['summary'][:35]
when = s['last_dt'].strftime('%m-%d %H:%M')
status_mark = ""
if s.get('status') == 'continued':
status_mark = " ✓"
elif s.get('status') == 'closed':
status_mark = " ✗"
print(f" {i:<2} │ {proj:<11} │ {summary:<35} │ {when}{status_mark}")
print("━" * 75)
print()
print(" сессия N — детали сессии")
print(" сессия СЛОВО — поиск по слову")
print(" продолжить N — пометить как продолженную")
print(" закрыть N — убрать из списка")
print(" сессии --all — показать все (включая закрытые)")
print()
def cmd_search(query: str, days: int = DAYS_DEFAULT):
"""Поиск сессий по ключевому слову"""
sessions = load_all_sessions()
cutoff = datetime.now() - timedelta(days=days)
query_lower = query.lower()
# Поиск
matches = []
for sid, data in sessions.items():
if data['last_dt'] < cutoff:
continue
all_text = ' '.join([m.get('display', '') for m in data['messages']]).lower()
if query_lower in all_text or query_lower in data['project'].lower():
# Считаем релевантность
score = all_text.count(query_lower)
data['_score'] = score
matches.append(data)
# Сортировка по релевантности
matches.sort(key=lambda x: x['_score'], reverse=True)
# Вывод
print()
print(f"ПОИСК: \"{query}\" — найдено {len(matches)}")
print("━" * 70)
if not matches:
print(" Ничего не найдено")
else:
print(f" # │ {'Проект':<12} │ {'Что делали':<28} │ Когда")
print("────┼" + "─" * 14 + "┼" + "─" * 30 + "┼" + "─" * 12)
for i, s in enumerate(matches[:10], 1):
proj = s['project'][:12]
summary = s['summary'][:28]
when = s['last_dt'].strftime('%m-%d %H:%M')
print(f" {i:<2} │ {proj:<12} │ {summary:<28} │ {when}")
print("━" * 70)
print()
def cmd_analyze(identifier: str):
"""Анализ конкретной сессии — задачи и результаты"""
sessions = load_all_sessions()
# Найти сессию по номеру или ID
session = None
if identifier.isdigit():
# По номеру из списка (только open сессии)
idx = int(identifier) - 1
sorted_sessions = sorted(
[s for s in sessions.values() if s.get('status') not in ('continued', 'closed')],
key=lambda x: x['last_ts'], reverse=True
)
if 0 <= idx < len(sorted_sessions):
session = sorted_sessions[idx]
else:
# По ID (частичному)
for sid, data in sessions.items():
if identifier in sid:
session = data
break
if not session:
print(f"\nСессия '{identifier}' не найдена\n")
return
# Анализ задач
analysis = analyze_session_tasks(session['id'])
# Вывод заголовка
print()
print(f"СЕССИЯ: {session['project']} | {session['summary'][:40]}")
print("━" * 75)
print(f"ID: {session['id'][:16]}...")
print(f"Когда: {session['first_dt'].strftime('%Y-%m-%d %H:%M')} — {session['last_dt'].strftime('%H:%M')}")
print(f"Сообщений: {session['msg_count']}")
if analysis:
print(f"Задач: {analysis['total']} (выполнено: {analysis['done_count']}, незавершено: {analysis['unfinished_count']})")
print()
# Таблица задач
if analysis and analysis['tasks']:
result_icons = {
'done': '✓',
'in_progress': '▶',
'discussed': '○',
'error': '✗',
'unknown': '?',
}
result_labels = {
'done': 'Готово',
'in_progress': 'В процессе',
'discussed': 'Обсудили',
'error': 'Ошибка',
'unknown': '?',
}
print("ЗАДАЧИ:")
print("━" * 75)
print(f" # │ {'Задача':<45} │ Результат")
print("────┼" + "─" * 47 + "┼" + "─" * 15)
for i, task in enumerate(analysis['tasks'], 1):
icon = result_icons.get(task['result'], '?')
label = result_labels.get(task['result'], '?')
name = task['name'][:45]
print(f" {i:<2} │ {name:<45} │ {icon} {label}")
print("━" * 75)
print()
# Незавершённое
if analysis and analysis['unfinished']:
print(f"НЕЗАВЕРШЁННОЕ ({len(analysis['unfinished'])}):")
print("-" * 75)
for i, task in enumerate(analysis['unfinished'], 1):
icon = result_icons.get(task['result'], '?')
print(f" [{i}] {icon} {task['name']}")
if task['files_written']:
files_short = [f.replace('/opt/claude-workspace/', '') for f in task['files_written'][:2]]
print(f" Файлы: {', '.join(files_short)}")
print()
print("━" * 75)
print(" разобрать N — продолжить работу над задачей N")
print(" продолжить — пометить сессию продолженной")
print()
def cmd_continue(identifier: str):
"""Пометить сессию как продолженную"""
sessions = load_all_sessions()
index = load_index()
# Найти сессию
session = None
if identifier.isdigit():
idx = int(identifier) - 1
sorted_sessions = sorted(
[s for s in sessions.values() if s.get('status') not in ('continued', 'closed')],
key=lambda x: x['last_ts'], reverse=True
)
if 0 <= idx < len(sorted_sessions):
session = sorted_sessions[idx]
else:
for sid, data in sessions.items():
if identifier in sid:
session = data
break
if not session:
print(f"\nСессия '{identifier}' не найдена\n")
return
# Обновить статус
index['sessions'][session['id']] = {
'status': 'continued',
'project': session['project'],
'summary': session['summary'][:50],
'continued_at': datetime.now().isoformat(),
}
save_index(index)
print(f"\n✓ Сессия помечена как продолженная: {session['project']} | {session['summary'][:40]}\n")
def cmd_close(identifier: str):
"""Закрыть сессию (убрать из списка)"""
sessions = load_all_sessions()
index = load_index()
# Найти сессию
session = None
if identifier.isdigit():
idx = int(identifier) - 1
sorted_sessions = sorted(
[s for s in sessions.values() if s.get('status') not in ('continued', 'closed')],
key=lambda x: x['last_ts'], reverse=True
)
if 0 <= idx < len(sorted_sessions):
session = sorted_sessions[idx]
else:
for sid, data in sessions.items():
if identifier in sid:
session = data
break
if not session:
print(f"\nСессия '{identifier}' не найдена\n")
return
# Обновить статус
index['sessions'][session['id']] = {
'status': 'closed',
'project': session['project'],
'summary': session['summary'][:50],
'closed_at': datetime.now().isoformat(),
}
save_index(index)
print(f"\n✗ Сессия закрыта: {session['project']} | {session['summary'][:40]}\n")
def cmd_reopen(identifier: str):
"""Вернуть сессию в список (статус open)"""
sessions = load_all_sessions()
index = load_index()
# Найти сессию (включая закрытые)
session = None
if identifier.isdigit():
idx = int(identifier) - 1
sorted_sessions = sorted(sessions.values(), key=lambda x: x['last_ts'], reverse=True)
if 0 <= idx < len(sorted_sessions):
session = sorted_sessions[idx]
else:
for sid, data in sessions.items():
if identifier in sid:
session = data
break
if not session:
print(f"\nСессия '{identifier}' не найдена\n")
return
# Удалить из индекса (вернуть статус open)
if session['id'] in index['sessions']:
del index['sessions'][session['id']]
save_index(index)
print(f"\n↩ Сессия возвращена в список: {session['project']} | {session['summary'][:40]}\n")
# ============== MAIN ==============
def main():
# Пробуем использовать новый SessionManager
if USE_NEW_MANAGER:
session_main()
return
# Fallback на старую логику
args = sys.argv[1:]
if not args or args[0] in ['list', 'ls', '-l']:
# Список
show_all = '--all' in args or '-a' in args
inactive_only = '--inactive' in args or '-i' in args
filter_proj = None
for a in args[1:]:
if not a.startswith('-'):
filter_proj = a
break
cmd_list(filter_proj, show_all=show_all, inactive_only=inactive_only)
elif args[0] in ['search', 's', 'find']:
# Поиск
if len(args) < 2:
print("\nИспользование: sessions.py search СЛОВО\n")
return
cmd_search(args[1])
elif args[0] in ['analyze', 'a', 'show']:
# Анализ
if len(args) < 2:
print("\nИспользование: sessions.py analyze N|ID\n")
return
cmd_analyze(args[1])
elif args[0] in ['continue', 'c', 'cont']:
# Продолжить
if len(args) < 2:
print("\nИспользование: sessions.py continue N|ID\n")
return
cmd_continue(args[1])
elif args[0] in ['close', 'x', 'done']:
# Закрыть
if len(args) < 2:
print("\nИспользование: sessions.py close N|ID\n")
return
cmd_close(args[1])
elif args[0] in ['reopen', 'open', 'r']:
# Вернуть
if len(args) < 2:
print("\nИспользование: sessions.py reopen N|ID\n")
return
cmd_reopen(args[1])
elif args[0] == '--all' or args[0] == '-a':
cmd_list(show_all=True)
else:
# Первый аргумент = поиск или номер
if args[0].isdigit():
cmd_analyze(args[0])
else:
cmd_search(args[0])
if __name__ == '__main__':
main()