Files
home-voice-assistant/satellite/llm.py
2026-04-13 18:33:19 +03:00

167 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import re
import requests
from datetime import date
from .config import AGENTS, log
from .text import clean_for_speech, find_sentence_end
from .tts import speak, play_error_sound
SYSTEM_PROMPT = (
"Отвечай кратко, 1-2 предложения, без markdown, без эмодзи. "
"Ответ будет озвучен голосом, поэтому: "
"числа пиши прописью (двадцать три, а не 23), "
"единицы измерения пиши полностью (километров в час, а не км/ч), "
"не используй спецсимволы (+, -, /, %, °) — заменяй словами (плюс, минус, из, процентов, градусов). "
"Температуру пиши так: 'плюс девять градусов', а не '+9°C'."
)
MAX_HISTORY = int(os.getenv("MAX_HISTORY", "20"))
# "stream" — режем по предложениям (быстро, но рваная интонация)
# "full" — собираем весь ответ, потом TTS (естественно, но пауза перед началом)
TTS_MODE = os.getenv("TTS_MODE", "full")
RESET_PATTERNS = re.compile(
r"(начни|начать|создай|открой|давай).{0,10}(новую|новый|чистую|чистый).{0,10}(сессию|сессия|диалог|разговор|чат)"
r"|"
r"(сбрось|очисти|обнови).{0,10}(сессию|диалог|разговор|чат|историю|контекст)",
re.IGNORECASE,
)
class Conversation:
"""Хранит историю сообщений — одна сессия на день"""
def __init__(self, agent_id: str = "cosmo"):
self.agent_id = agent_id
self.created_date = date.today()
self.messages = [{"role": "system", "content": SYSTEM_PROMPT}]
def is_expired(self) -> bool:
return date.today() != self.created_date
def reset(self):
self.created_date = date.today()
self.messages = [{"role": "system", "content": SYSTEM_PROMPT}]
def add_user(self, text: str):
self.messages.append({"role": "user", "content": text})
self._trim()
def add_assistant(self, text: str):
self.messages.append({"role": "assistant", "content": text})
self._trim()
def _trim(self):
if len(self.messages) > MAX_HISTORY + 1:
self.messages = [self.messages[0]] + self.messages[-(MAX_HISTORY):]
def is_reset_command(text: str) -> bool:
return bool(RESET_PATTERNS.search(text))
def ask_agent_stream(text: str, conv: "Conversation | None" = None, agent_id: str = "cosmo") -> str:
if conv is None:
conv = Conversation(agent_id)
conv.add_user(text)
cfg = AGENTS.get(agent_id, AGENTS["cosmo"])
gateway_url = cfg["gateway_url"]
session = cfg["session"]
agent = cfg["agent"]
try:
resp = session.post(
f"{gateway_url}/v1/chat/completions",
headers={
"x-openclaw-model": cfg["voice_model"],
"x-openclaw-session-key": cfg["session_key"],
},
json={
"model": agent,
"stream": True,
"messages": conv.messages,
"max_tokens": 150,
},
stream=True,
timeout=60,
)
resp.raise_for_status()
except requests.ConnectionError:
log.exception("Gateway недоступен")
msg = "Не могу связаться с сервером, попробуй ещё раз."
print(f"⚠️ {msg}")
play_error_sound()
speak(msg, agent_id)
return msg
except requests.Timeout:
log.exception("Gateway таймаут")
msg = "Сервер не ответил вовремя, попробуй ещё раз."
print(f"⚠️ {msg}")
play_error_sound()
speak(msg, agent_id)
return msg
except requests.HTTPError:
log.exception(f"Gateway HTTP ошибка {resp.status_code}")
msg = "Ошибка сервера, попробуй ещё раз."
print(f"⚠️ Gateway {resp.status_code}: {resp.text}")
play_error_sound()
speak(msg, agent_id)
return msg
full_text = ""
buffer = ""
try:
for line in resp.iter_lines():
if not line or line == b"data: [DONE]":
continue
if line.startswith(b"data: "):
try:
chunk = json.loads(line[6:])
delta = chunk["choices"][0]["delta"].get("content", "")
if not delta:
continue
full_text += delta
buffer += delta
if TTS_MODE == "stream":
last_punct = find_sentence_end(buffer, min_len=120)
if last_punct > -1:
sentence = clean_for_speech(buffer[:last_punct + 1])
if sentence.strip():
print(f"🔊 Говорю: {sentence}")
speak(sentence, agent_id)
buffer = buffer[last_punct + 1:].lstrip()
except (json.JSONDecodeError, KeyError, IndexError):
continue
except Exception as e:
log.exception("Ошибка при чтении стрима")
print(f"⚠️ Стрим прервался: {e}")
if not full_text:
msg = "Не получил ответ, попробуй ещё раз."
speak(msg, agent_id)
return msg
result = clean_for_speech(full_text)
if TTS_MODE == "full":
# LLM уже доримил — озвучиваем весь ответ одним куском с цельной интонацией
if result.strip():
print(f"🔊 Говорю: {result}")
speak(result, agent_id)
else:
# остаток буфера в stream-режиме
if buffer.strip():
tail = clean_for_speech(buffer)
if tail:
speak(tail, agent_id)
conv.add_assistant(full_text)
return result