Files
home-voice-assistant/satellite/llm_claude.py
Cosmo 52c42f3d06 feat(tools): calendar CRUD tools — create_event, update_event, delete_event
- create_event(title, date, start_time?, end_time?, all_day?, owner)
  owner обязателен (daniil | sveta). System prompt велит LLM уточнять
  чей это календарь, если неясно.
- update_event(event_id, owner, ...fields) — меняет только переданные
  поля. Сначала нужно вызвать get_today_events для получения event_id.
- delete_event(event_id, owner) — сначала get_today_events, найти
  событие по названию, подтвердить если важное.

get_today_events теперь возвращает event_id и owner (daniil/sveta),
плюс принимает range=month. Description явно говорит LLM что это
первый tool для CRUD-сценариев.

System prompt (Cosmo и Люся) дополнен секцией 'Работа с календарём'
с правилами: даты YYYY-MM-DD, время HH:MM, «завтра» = +1 день,
вычислять от {today}.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 14:34:40 +00:00

400 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Прямой клиент Claude Haiku 4.5 (Anthropic SDK) — альтернатива OpenClaw gateway.
Отличия от `llm.ask_agent_stream`:
* Сессия и история живут **локально** на клиенте (JSON в HISTORY_DIR/{agent}-{date}.json).
Смена даты = автосброс.
* Prompt caching через Anthropic cache_control: system prompt и старая часть истории
кешируются на 5 минут → latency first-token ниже, стоимость -90% на cached-tokens.
* Используется когда LLM_BACKEND=claude.
Если HTTPS_PROXY задан (напр. http://192.168.31.103:8888) — httpx подхватит автоматически,
Anthropic SDK пойдёт через прокси.
"""
import json
import os
import time
from datetime import date
from pathlib import Path
try:
import anthropic
except ImportError:
anthropic = None # SDK опциональный, активируется только при LLM_BACKEND=claude
from .config import log
from .text import clean_for_speech
from .tts import speak, play_error_sound
from . import notifier
from .llm import strip_fillers # переиспользуем чистку филлеров
from .tools import TOOL_SCHEMAS, execute_tool
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_MODEL = os.getenv("ANTHROPIC_MODEL", "claude-haiku-4-5")
HISTORY_DIR = Path(os.getenv("HISTORY_DIR", "data/history"))
MAX_TOKENS = int(os.getenv("VOICE_MAX_TOKENS", "300"))
MAX_HISTORY_MESSAGES = int(os.getenv("MAX_HISTORY", "40"))
# Граница кеша — все сообщения кроме последних N идут в cache block,
# что даёт prompt caching хит каждый турн.
CACHE_TAIL_UNCACHED = 2
COSMO_SYSTEM_PROMPT = """Ты — Cosmo, домашний голосовой ассистент Даниила (Санкт-Петербург).
Стиль:
- Короткие ответы: 1-2 предложения, редко 3. Это голосовой канал — многословность утомляет.
- Разговорный русский, без канцелярита, без формальных оборотов («здравствуйте», «уважаемый»).
- Обращение на «ты».
- Не предваряй ответ фразами-заполнителями («сейчас посмотрю», «минутку», «проверяю») — сразу отвечай.
- Без эмодзи, маркированных списков, код-блоков — всё будет зачитано.
- Если не знаешь — скажи коротко, не оправдывайся.
ЖЁСТКИЕ ПРАВИЛА про tools:
1. Любое ДЕЙСТВИЕ (поставить/отменить/изменить таймер, что-то включить/выключить)
делается ТОЛЬКО через вызов tool. Без tool действие не произошло.
2. Никогда не говори «поставил», «отменил», «удалил», «добавил», «изменил»,
если ты в этом же turn'e не вызвал соответствующий tool. Это галлюцинация,
пользователь потом обнаружит что ничего не изменилось и не будет тебе доверять.
3. Любая АКТУАЛЬНАЯ ИНФОРМАЦИЯ (погода, транспорт, события в календаре,
содержимое заметок) — всегда через tool. Не выдумывай числа и факты.
4. Порядок: сначала tool → потом в том же turn'e сформулируй ответ на основе
результата. Не пересказывай сырые данные дословно — дай человеческую сводку.
5. Если подходящего tool нет — честно скажи «так я не умею», а не притворяйся.
Доступные tools: get_weather, get_transport, get_today_events, create_event,
update_event, delete_event, get_notes, set_timer, cancel_timer, adjust_timer.
Работа с календарём:
- У Даниила и Светы разные календари. Параметр owner обязательный.
- Если пользователь не уточнил чей календарь — СПРОСИ прежде чем вызывать
create_event. Не угадывай даже если контекст намекает.
- Для изменения или удаления события сначала вызови get_today_events
(можно с range=week/month), найди нужное событие по названию и времени,
потом действуй с его event_id и owner.
- Даты в формате YYYY-MM-DD (2026-04-24), времена HH:MM (14:30).
«завтра» = сегодня+1 по дате, «послезавтра» = +2. Сегодня {today}.
Контекст: Даниил — разработчик, живёт в СПб с женой Светой."""
LUSYA_SYSTEM_PROMPT = """Ты — Люся, домашний голосовой ассистент Светы (Санкт-Петербург).
Стиль:
- Тёплый, заботливый, чуть эмоциональный, но лаконичный. 1-2 предложения.
- Обращение на «ты».
- Без эмодзи, списков, код-блоков — это голос.
- Если не знаешь — скажи коротко.
ЖЁСТКИЕ ПРАВИЛА про tools:
1. Действия (таймер, события) — только через вызов tool. Без tool действие не произошло.
2. Не говори «поставила/отменила/изменила», если ты не вызвала соответствующий tool.
3. Информацию (погода, транспорт, события) — всегда через tool, не выдумывай.
4. Tool → результат → короткий ответ человеческим языком.
Календарь:
- Свой = Светин, ещё есть календарь Данила. Для create_event уточняй
в какой календарь, если неясно.
- Для update_event / delete_event: сначала get_today_events, найди по
названию, потом действуй.
- Даты YYYY-MM-DD, время HH:MM. Сегодня {today}."""
_client: "anthropic.Anthropic | None" = None
def _get_client() -> "anthropic.Anthropic":
global _client
if anthropic is None:
raise RuntimeError(
"anthropic SDK не установлен. Запусти `pip install anthropic` "
"или оставь LLM_BACKEND=openclaw."
)
if not ANTHROPIC_API_KEY:
raise RuntimeError("ANTHROPIC_API_KEY не задан в .env")
if _client is None:
_client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
return _client
def _system_prompt(agent_id: str) -> str:
template = LUSYA_SYSTEM_PROMPT if agent_id == "lusya" else COSMO_SYSTEM_PROMPT
return template.format(today=date.today().isoformat())
def _history_path(agent_id: str) -> Path:
HISTORY_DIR.mkdir(parents=True, exist_ok=True)
today = date.today().isoformat()
return HISTORY_DIR / f"{agent_id}-{today}.json"
def load_history(agent_id: str) -> list[dict]:
path = _history_path(agent_id)
if not path.exists():
return []
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
log.exception(f"Не смог прочитать историю {path}")
return []
def save_history(agent_id: str, history: list[dict]):
path = _history_path(agent_id)
try:
path.write_text(json.dumps(history, ensure_ascii=False, indent=2), encoding="utf-8")
except Exception:
log.exception(f"Не смог сохранить историю {path}")
def reset_history(agent_id: str):
"""Удаляет историю диалога за текущий день."""
path = _history_path(agent_id)
if path.exists():
path.unlink()
log.info(f"История сброшена: {path}")
def _strip_cache_control(content):
"""Убирает cache_control из блоков — при сохранении в историю оно не нужно
(следующий turn заново посчитает границу)."""
if isinstance(content, list):
cleaned = []
for block in content:
if isinstance(block, dict):
block_copy = {k: v for k, v in block.items() if k != "cache_control"}
cleaned.append(block_copy)
else:
cleaned.append(block)
return cleaned
return content
def _wrap_last_block_with_cache(content):
"""Добавляет cache_control на последний блок/строку content.
Для string: оборачивает в [{type:text, text, cache_control}].
Для list[block]: делает копию и добавляет cache_control к последнему блоку."""
if isinstance(content, str):
return [{
"type": "text",
"text": content,
"cache_control": {"type": "ephemeral"},
}]
if isinstance(content, list) and content:
new_list = list(content)
last = dict(new_list[-1]) if isinstance(new_list[-1], dict) else new_list[-1]
if isinstance(last, dict):
last["cache_control"] = {"type": "ephemeral"}
new_list[-1] = last
return new_list
return content
def _build_messages(history: list[dict]) -> list[dict]:
"""
Готовит messages array для Claude API с prompt caching.
Последние N=CACHE_TAIL_UNCACHED сообщений остаются динамическими (без кеша),
всё что раньше — помечается cache_control на границе (на последнем блоке
последнего «старого» сообщения).
Content может быть строкой или списком блоков (tool_use/tool_result turn'ы).
"""
if len(history) <= CACHE_TAIL_UNCACHED:
return [{"role": m["role"], "content": m["content"]} for m in history]
cache_boundary = len(history) - CACHE_TAIL_UNCACHED
messages = []
for i, msg in enumerate(history):
if i == cache_boundary - 1:
messages.append({
"role": msg["role"],
"content": _wrap_last_block_with_cache(msg["content"]),
})
else:
messages.append({"role": msg["role"], "content": msg["content"]})
return messages
MAX_TOOL_ROUNDS = 4 # safety: не даём Claude крутить tools бесконечно
def _call_once(client, system_blocks, messages):
"""Один вызов без стрима — нужен для tool-use round trips.
Возвращает final Message object (с usage, content blocks, stop_reason)."""
return client.messages.create(
model=ANTHROPIC_MODEL,
max_tokens=MAX_TOKENS,
system=system_blocks,
messages=messages,
tools=TOOL_SCHEMAS,
)
def ask_claude_stream(text: str, agent_id: str = "cosmo") -> str:
"""Спросить Claude Haiku 4.5 напрямую, с поддержкой tool use.
Поток tool-use раундов: Claude → tool_use → мы выполняем → tool_result → Claude → ... → текст.
Возвращает финальный текст ответа (cleaned)."""
def _speak_if_local(t: str):
if t.strip() and notifier.speak_locally():
speak(t, agent_id)
try:
client = _get_client()
except RuntimeError as e:
log.error(str(e))
msg = "Клод не настроен, попробуй OpenClaw."
play_error_sound()
notifier.error(msg, agent_id)
_speak_if_local(msg)
return msg
history = load_history(agent_id)
history.append({"role": "user", "content": text})
if len(history) > MAX_HISTORY_MESSAGES:
history = history[-MAX_HISTORY_MESSAGES:]
system_blocks = [{
"type": "text",
"text": _system_prompt(agent_id),
"cache_control": {"type": "ephemeral"},
}]
# messages для API — строится из history плюс накапливающихся tool-use/tool-result блоков
api_messages = _build_messages(history)
total_start = time.time()
total_in = 0
total_out = 0
total_cache_r = 0
total_cache_w = 0
final_text = ""
# Собираем всю цепочку (assistant content blocks, tool results) чтобы одним куском сохранить в history
assistant_blocks_accumulated: list[dict] = []
try:
for round_i in range(MAX_TOOL_ROUNDS):
round_start = time.time()
resp = _call_once(client, system_blocks, api_messages)
usage = resp.usage
total_in += usage.input_tokens
total_out += usage.output_tokens
total_cache_r += getattr(usage, "cache_read_input_tokens", 0) or 0
total_cache_w += getattr(usage, "cache_creation_input_tokens", 0) or 0
# Разбираем content на text + tool_use
text_chunks = []
tool_uses = []
for block in resp.content:
btype = getattr(block, "type", None)
if btype == "text":
text_chunks.append(block.text)
elif btype == "tool_use":
tool_uses.append(block)
# Копим текст ассистента (может быть между tool-вызовами в новых моделях)
final_text += "".join(text_chunks)
# Добавляем ответ ассистента в api_messages (как есть)
# Это ВАЖНО: для tool_result следующим сообщением assistant content должен быть сохранён
# ровно как вернул API, чтобы tool_use_id совпал.
assistant_content = [
# Конвертируем объекты anthropic SDK в dict-представление
b.model_dump() if hasattr(b, "model_dump") else dict(b.__dict__)
for b in resp.content
]
api_messages.append({"role": "assistant", "content": assistant_content})
assistant_blocks_accumulated.extend(assistant_content)
print(
f"🧠 round {round_i + 1} {time.time() - round_start:.2f}s · "
f"stop={resp.stop_reason} · in={usage.input_tokens} out={usage.output_tokens} "
f"cache_r={getattr(usage, 'cache_read_input_tokens', 0) or 0}"
)
if resp.stop_reason == "tool_use" and tool_uses:
# Выполняем все запрошенные tools, собираем tool_result блоки
tool_results = []
for tu in tool_uses:
name = tu.name
tu_id = tu.id
params = tu.input or {}
print(f"🔧 Tool: {name}({params})")
result = execute_tool(name, params, agent_id)
# Упаковываем результат в JSON-строку (Claude ожидает string в tool_result content)
import json as _json
result_str = _json.dumps(result, ensure_ascii=False)
print(f"{result_str[:200]}")
tool_results.append({
"type": "tool_result",
"tool_use_id": tu_id,
"content": result_str,
})
# Добавляем user-message с результатами tools
api_messages.append({"role": "user", "content": tool_results})
# Продолжаем цикл — Claude обработает результаты и либо вызовет ещё, либо выдаст текст
continue
# stop_reason="end_turn" / "max_tokens" / "stop_sequence" — готов финальный ответ
break
elapsed = time.time() - total_start
print(
f"🧠 Claude {ANTHROPIC_MODEL} total {elapsed:.2f}s · "
f"in={total_in} out={total_out} "
f"cache_r={total_cache_r} cache_w={total_cache_w}"
)
except anthropic.APIConnectionError:
log.exception("Anthropic API connection error")
msg = "Не могу связаться с Клодом."
play_error_sound()
notifier.error(msg, agent_id)
_speak_if_local(msg)
return msg
except anthropic.APITimeoutError:
log.exception("Anthropic timeout")
msg = "Клод не ответил вовремя."
play_error_sound()
notifier.error(msg, agent_id)
_speak_if_local(msg)
return msg
except anthropic.APIStatusError as e:
status = getattr(e, "status_code", "?")
log.exception(f"Anthropic API status {status}")
msg = "Ошибка Клода."
play_error_sound()
notifier.error(msg, agent_id)
_speak_if_local(msg)
return msg
except Exception as e:
log.exception(f"Неожиданная ошибка Claude: {e}")
msg = "Что-то сломалось."
play_error_sound()
notifier.error(msg, agent_id)
_speak_if_local(msg)
return msg
if not final_text:
msg = "Не получил ответ."
notifier.error(msg, agent_id)
_speak_if_local(msg)
return msg
# Сохраняем полный ассистентский turn (включая tool_use / tool_result блоки).
# Это критично чтобы Claude помнил что он реально делал инструментами —
# иначе на следующем turn'e он может галлюцинировать действия («отменил таймер»)
# не вызывая реальные tools.
# api_messages к концу содержит: [...history_before_user, user(text), ...turns]
# где history уже включает новый user. Нам надо добавить всё после user msg.
initial_user_idx = len(history) - 1 # позиция текущего user msg в api_messages
new_turns = api_messages[initial_user_idx + 1:]
for turn in new_turns:
history.append({
"role": turn["role"],
"content": _strip_cache_control(turn["content"]),
})
save_history(agent_id, history)
result = clean_for_speech(strip_fillers(final_text))
_speak_if_local(result)
return result