""" Прямой клиент Claude Haiku 4.5 (Anthropic SDK) — альтернатива OpenClaw gateway. Отличия от `llm.ask_agent_stream`: * Сессия и история живут **локально** на клиенте (JSON в HISTORY_DIR/{agent}-{date}.json). Смена даты = автосброс. * Prompt caching через Anthropic cache_control: system prompt и старая часть истории кешируются на 5 минут → latency first-token ниже, стоимость -90% на cached-tokens. * Используется когда LLM_BACKEND=claude. Если HTTPS_PROXY задан (напр. http://192.168.31.103:8888) — httpx подхватит автоматически, Anthropic SDK пойдёт через прокси. """ import json import os import time from datetime import date from pathlib import Path try: import anthropic except ImportError: anthropic = None # SDK опциональный, активируется только при LLM_BACKEND=claude from .config import log from .text import clean_for_speech from .tts import speak, play_error_sound from . import notifier from .llm import strip_fillers # переиспользуем чистку филлеров from .tools import TOOL_SCHEMAS, execute_tool ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") ANTHROPIC_MODEL = os.getenv("ANTHROPIC_MODEL", "claude-haiku-4-5") HISTORY_DIR = Path(os.getenv("HISTORY_DIR", "data/history")) MAX_TOKENS = int(os.getenv("VOICE_MAX_TOKENS", "300")) MAX_HISTORY_MESSAGES = int(os.getenv("MAX_HISTORY", "40")) # Граница кеша — все сообщения кроме последних N идут в cache block, # что даёт prompt caching хит каждый турн. CACHE_TAIL_UNCACHED = 2 COSMO_SYSTEM_PROMPT = """Ты — Cosmo, домашний голосовой ассистент Даниила (Санкт-Петербург). Стиль: - Короткие ответы: 1-2 предложения, редко 3. Это голосовой канал — многословность утомляет. - Разговорный русский, без канцелярита, без формальных оборотов («здравствуйте», «уважаемый»). - Обращение на «ты». - Не предваряй ответ фразами-заполнителями («сейчас посмотрю», «минутку», «проверяю») — сразу отвечай. - Без эмодзи, маркированных списков, код-блоков — всё будет зачитано. - Если не знаешь — скажи коротко, не оправдывайся. ЖЁСТКИЕ ПРАВИЛА про tools: 1. Любое ДЕЙСТВИЕ (поставить/отменить/изменить таймер, что-то включить/выключить) делается ТОЛЬКО через вызов tool. Без tool действие не произошло. 2. Никогда не говори «поставил», «отменил», «удалил», «добавил», «изменил», если ты в этом же turn'e не вызвал соответствующий tool. Это галлюцинация, пользователь потом обнаружит что ничего не изменилось и не будет тебе доверять. 3. Любая АКТУАЛЬНАЯ ИНФОРМАЦИЯ (погода, транспорт, события в календаре, содержимое заметок) — всегда через tool. Не выдумывай числа и факты. 4. Порядок: сначала tool → потом в том же turn'e сформулируй ответ на основе результата. Не пересказывай сырые данные дословно — дай человеческую сводку. 5. Если подходящего tool нет — честно скажи «так я не умею», а не притворяйся. Доступные tools: get_weather, get_transport, get_today_events, create_event, update_event, delete_event, get_notes, set_timer, cancel_timer, adjust_timer. Работа с календарём: - У Даниила и Светы разные календари. Параметр owner обязательный. - Если пользователь не уточнил чей календарь — СПРОСИ прежде чем вызывать create_event. Не угадывай даже если контекст намекает. - Для изменения или удаления события сначала вызови get_today_events (можно с range=week/month), найди нужное событие по названию и времени, потом действуй с его event_id и owner. - Даты в формате YYYY-MM-DD (2026-04-24), времена HH:MM (14:30). «завтра» = сегодня+1 по дате, «послезавтра» = +2. Сегодня {today}. Контекст: Даниил — разработчик, живёт в СПб с женой Светой.""" LUSYA_SYSTEM_PROMPT = """Ты — Люся, домашний голосовой ассистент Светы (Санкт-Петербург). Стиль: - Тёплый, заботливый, чуть эмоциональный, но лаконичный. 1-2 предложения. - Обращение на «ты». - Без эмодзи, списков, код-блоков — это голос. - Если не знаешь — скажи коротко. ЖЁСТКИЕ ПРАВИЛА про tools: 1. Действия (таймер, события) — только через вызов tool. Без tool действие не произошло. 2. Не говори «поставила/отменила/изменила», если ты не вызвала соответствующий tool. 3. Информацию (погода, транспорт, события) — всегда через tool, не выдумывай. 4. Tool → результат → короткий ответ человеческим языком. Календарь: - Свой = Светин, ещё есть календарь Данила. Для create_event уточняй в какой календарь, если неясно. - Для update_event / delete_event: сначала get_today_events, найди по названию, потом действуй. - Даты YYYY-MM-DD, время HH:MM. Сегодня {today}.""" _client: "anthropic.Anthropic | None" = None def _get_client() -> "anthropic.Anthropic": global _client if anthropic is None: raise RuntimeError( "anthropic SDK не установлен. Запусти `pip install anthropic` " "или оставь LLM_BACKEND=openclaw." ) if not ANTHROPIC_API_KEY: raise RuntimeError("ANTHROPIC_API_KEY не задан в .env") if _client is None: _client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY) return _client def _system_prompt(agent_id: str) -> str: template = LUSYA_SYSTEM_PROMPT if agent_id == "lusya" else COSMO_SYSTEM_PROMPT return template.format(today=date.today().isoformat()) def _history_path(agent_id: str) -> Path: HISTORY_DIR.mkdir(parents=True, exist_ok=True) today = date.today().isoformat() return HISTORY_DIR / f"{agent_id}-{today}.json" def load_history(agent_id: str) -> list[dict]: path = _history_path(agent_id) if not path.exists(): return [] try: return json.loads(path.read_text(encoding="utf-8")) except Exception: log.exception(f"Не смог прочитать историю {path}") return [] def save_history(agent_id: str, history: list[dict]): path = _history_path(agent_id) try: path.write_text(json.dumps(history, ensure_ascii=False, indent=2), encoding="utf-8") except Exception: log.exception(f"Не смог сохранить историю {path}") def reset_history(agent_id: str): """Удаляет историю диалога за текущий день.""" path = _history_path(agent_id) if path.exists(): path.unlink() log.info(f"История сброшена: {path}") def _strip_cache_control(content): """Убирает cache_control из блоков — при сохранении в историю оно не нужно (следующий turn заново посчитает границу).""" if isinstance(content, list): cleaned = [] for block in content: if isinstance(block, dict): block_copy = {k: v for k, v in block.items() if k != "cache_control"} cleaned.append(block_copy) else: cleaned.append(block) return cleaned return content def _wrap_last_block_with_cache(content): """Добавляет cache_control на последний блок/строку content. Для string: оборачивает в [{type:text, text, cache_control}]. Для list[block]: делает копию и добавляет cache_control к последнему блоку.""" if isinstance(content, str): return [{ "type": "text", "text": content, "cache_control": {"type": "ephemeral"}, }] if isinstance(content, list) and content: new_list = list(content) last = dict(new_list[-1]) if isinstance(new_list[-1], dict) else new_list[-1] if isinstance(last, dict): last["cache_control"] = {"type": "ephemeral"} new_list[-1] = last return new_list return content def _build_messages(history: list[dict]) -> list[dict]: """ Готовит messages array для Claude API с prompt caching. Последние N=CACHE_TAIL_UNCACHED сообщений остаются динамическими (без кеша), всё что раньше — помечается cache_control на границе (на последнем блоке последнего «старого» сообщения). Content может быть строкой или списком блоков (tool_use/tool_result turn'ы). """ if len(history) <= CACHE_TAIL_UNCACHED: return [{"role": m["role"], "content": m["content"]} for m in history] cache_boundary = len(history) - CACHE_TAIL_UNCACHED messages = [] for i, msg in enumerate(history): if i == cache_boundary - 1: messages.append({ "role": msg["role"], "content": _wrap_last_block_with_cache(msg["content"]), }) else: messages.append({"role": msg["role"], "content": msg["content"]}) return messages MAX_TOOL_ROUNDS = 4 # safety: не даём Claude крутить tools бесконечно def _call_once(client, system_blocks, messages): """Один вызов без стрима — нужен для tool-use round trips. Возвращает final Message object (с usage, content blocks, stop_reason).""" return client.messages.create( model=ANTHROPIC_MODEL, max_tokens=MAX_TOKENS, system=system_blocks, messages=messages, tools=TOOL_SCHEMAS, ) def ask_claude_stream(text: str, agent_id: str = "cosmo") -> str: """Спросить Claude Haiku 4.5 напрямую, с поддержкой tool use. Поток tool-use раундов: Claude → tool_use → мы выполняем → tool_result → Claude → ... → текст. Возвращает финальный текст ответа (cleaned).""" def _speak_if_local(t: str): if t.strip() and notifier.speak_locally(): speak(t, agent_id) try: client = _get_client() except RuntimeError as e: log.error(str(e)) msg = "Клод не настроен, попробуй OpenClaw." play_error_sound() notifier.error(msg, agent_id) _speak_if_local(msg) return msg history = load_history(agent_id) history.append({"role": "user", "content": text}) if len(history) > MAX_HISTORY_MESSAGES: history = history[-MAX_HISTORY_MESSAGES:] system_blocks = [{ "type": "text", "text": _system_prompt(agent_id), "cache_control": {"type": "ephemeral"}, }] # messages для API — строится из history плюс накапливающихся tool-use/tool-result блоков api_messages = _build_messages(history) total_start = time.time() total_in = 0 total_out = 0 total_cache_r = 0 total_cache_w = 0 final_text = "" # Собираем всю цепочку (assistant content blocks, tool results) чтобы одним куском сохранить в history assistant_blocks_accumulated: list[dict] = [] try: for round_i in range(MAX_TOOL_ROUNDS): round_start = time.time() resp = _call_once(client, system_blocks, api_messages) usage = resp.usage total_in += usage.input_tokens total_out += usage.output_tokens total_cache_r += getattr(usage, "cache_read_input_tokens", 0) or 0 total_cache_w += getattr(usage, "cache_creation_input_tokens", 0) or 0 # Разбираем content на text + tool_use text_chunks = [] tool_uses = [] for block in resp.content: btype = getattr(block, "type", None) if btype == "text": text_chunks.append(block.text) elif btype == "tool_use": tool_uses.append(block) # Копим текст ассистента (может быть между tool-вызовами в новых моделях) final_text += "".join(text_chunks) # Добавляем ответ ассистента в api_messages (как есть) # Это ВАЖНО: для tool_result следующим сообщением assistant content должен быть сохранён # ровно как вернул API, чтобы tool_use_id совпал. assistant_content = [ # Конвертируем объекты anthropic SDK в dict-представление b.model_dump() if hasattr(b, "model_dump") else dict(b.__dict__) for b in resp.content ] api_messages.append({"role": "assistant", "content": assistant_content}) assistant_blocks_accumulated.extend(assistant_content) print( f"🧠 round {round_i + 1} {time.time() - round_start:.2f}s · " f"stop={resp.stop_reason} · in={usage.input_tokens} out={usage.output_tokens} " f"cache_r={getattr(usage, 'cache_read_input_tokens', 0) or 0}" ) if resp.stop_reason == "tool_use" and tool_uses: # Выполняем все запрошенные tools, собираем tool_result блоки tool_results = [] for tu in tool_uses: name = tu.name tu_id = tu.id params = tu.input or {} print(f"🔧 Tool: {name}({params})") result = execute_tool(name, params, agent_id) # Упаковываем результат в JSON-строку (Claude ожидает string в tool_result content) import json as _json result_str = _json.dumps(result, ensure_ascii=False) print(f" → {result_str[:200]}") tool_results.append({ "type": "tool_result", "tool_use_id": tu_id, "content": result_str, }) # Добавляем user-message с результатами tools api_messages.append({"role": "user", "content": tool_results}) # Продолжаем цикл — Claude обработает результаты и либо вызовет ещё, либо выдаст текст continue # stop_reason="end_turn" / "max_tokens" / "stop_sequence" — готов финальный ответ break elapsed = time.time() - total_start print( f"🧠 Claude {ANTHROPIC_MODEL} total {elapsed:.2f}s · " f"in={total_in} out={total_out} " f"cache_r={total_cache_r} cache_w={total_cache_w}" ) except anthropic.APIConnectionError: log.exception("Anthropic API connection error") msg = "Не могу связаться с Клодом." play_error_sound() notifier.error(msg, agent_id) _speak_if_local(msg) return msg except anthropic.APITimeoutError: log.exception("Anthropic timeout") msg = "Клод не ответил вовремя." play_error_sound() notifier.error(msg, agent_id) _speak_if_local(msg) return msg except anthropic.APIStatusError as e: status = getattr(e, "status_code", "?") log.exception(f"Anthropic API status {status}") msg = "Ошибка Клода." play_error_sound() notifier.error(msg, agent_id) _speak_if_local(msg) return msg except Exception as e: log.exception(f"Неожиданная ошибка Claude: {e}") msg = "Что-то сломалось." play_error_sound() notifier.error(msg, agent_id) _speak_if_local(msg) return msg if not final_text: msg = "Не получил ответ." notifier.error(msg, agent_id) _speak_if_local(msg) return msg # Сохраняем полный ассистентский turn (включая tool_use / tool_result блоки). # Это критично чтобы Claude помнил что он реально делал инструментами — # иначе на следующем turn'e он может галлюцинировать действия («отменил таймер») # не вызывая реальные tools. # api_messages к концу содержит: [...history_before_user, user(text), ...turns] # где history уже включает новый user. Нам надо добавить всё после user msg. initial_user_idx = len(history) - 1 # позиция текущего user msg в api_messages new_turns = api_messages[initial_user_idx + 1:] for turn in new_turns: history.append({ "role": turn["role"], "content": _strip_cache_control(turn["content"]), }) save_history(agent_id, history) result = clean_for_speech(strip_fillers(final_text)) _speak_if_local(result) return result