Initial commit: Cosmo voice assistant

Полностью локальный голосовой ассистент на Python.

Стек:
- Wake word: openWakeWord (onnxruntime)
- STT: RealtimeSTT + faster-whisper + Silero VAD (CUDA)
- LLM-агент: smolagents ToolCallingAgent + Ollama qwen2.5:7b
- TTS: Silero V4 (torch.hub) + sounddevice
- Shell: Git Bash (Windows) / bash (macOS)

Поддерживает Windows и macOS. Агент с памятью и tool calling —
находит программы самостоятельно, запоминает пути, выполняет
произвольные shell-команды.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
d.klimov
2026-04-10 15:58:12 +03:00
commit 6010816f1d
23 changed files with 1969 additions and 0 deletions

0
cosmo/__init__.py Normal file
View File

89
cosmo/agent.py Normal file
View File

@@ -0,0 +1,89 @@
"""
Агент на базе smolagents + Ollama.
Использует ToolCallingAgent — LLM вызывает инструменты через JSON tool calling.
Продолжает работу пока задача не решена (до max_steps).
"""
import os
import platform
from loguru import logger
from smolagents import ToolCallingAgent, LiteLLMModel
from cosmo.memory import Memory
# Выбираем инструменты под текущую платформу
if platform.system() == "Darwin" or os.environ.get("COSMO_PLATFORM") == "mac":
from cosmo.tools_mac import ALL_TOOLS, set_memory
_PLATFORM_NOTE = "macOS. Используй bash, 'open -a AppName' для запуска приложений, mdfind для поиска файлов."
else:
from cosmo.tools import ALL_TOOLS, set_memory
_PLATFORM_NOTE = "Windows. Используй Git Bash, 'start' для запуска приложений."
SYSTEM_PROMPT = f"""Ты — Cosmo, умный голосовой ассистент. Платформа: {_PLATFORM_NOTE}
Правила:
1. Используй инструменты для выполнения задач — не выдумывай результаты
2. Если первая попытка не сработала — пробуй другой подход, не сдавайся
3. Перед поиском программы — проверь память (memory_get), может путь уже известен
4. Если нашёл путь к программе — сохрани в память (memory_set) чтобы не искать повторно
5. Отвечай коротко на русском языке — пользователь слушает голосом, не читает
Факты из памяти о пользователе и системе:
{memory_facts}
"""
class Agent:
def __init__(self, config: dict, memory: Memory):
self.memory = memory
self._cfg = config["ollama"]
# Передаём память в инструменты
set_memory(memory)
model_id = f"ollama/{self._cfg['model']}"
logger.info(f"Инициализирую smolagents с моделью {model_id}")
self._model = LiteLLMModel(
model_id=model_id,
api_base=self._cfg["base_url"],
temperature=self._cfg.get("temperature", 0.2),
max_tokens=self._cfg.get("max_tokens", 1024),
)
self._agent = ToolCallingAgent(
tools=ALL_TOOLS,
model=self._model,
max_steps=self._cfg.get("max_agent_steps", 10),
verbosity_level=1,
)
logger.info("Агент готов")
def run(self, user_input: str) -> str:
"""
Обработать команду пользователя.
Возвращает финальный текст ответа для TTS.
"""
logger.info(f"Агент: '{user_input}'")
# Сохраняем в историю
self.memory.add_message("user", user_input)
# Формируем промпт с текущей памятью
system = SYSTEM_PROMPT.format(memory_facts=self.memory.facts_as_text())
# smolagents принимает задачу и опциональный системный промпт
try:
result = self._agent.run(
user_input,
additional_args={"system_prompt_override": system},
)
response = str(result).strip() if result else "Готово"
except Exception as e:
logger.error(f"Ошибка агента: {e}")
response = "Произошла ошибка при выполнении команды"
self.memory.add_message("assistant", response)
logger.info(f"Агент ответил: '{response}'")
return response

144
cosmo/main.py Normal file
View File

@@ -0,0 +1,144 @@
"""
Cosmo — локальный голосовой ассистент.
Стек: openWakeWord → RealtimeSTT (Whisper + Silero VAD) → smolagents + Ollama → RealtimeTTS (Silero)
Запуск:
bash run.sh
python cosmo/main.py
"""
import sys
import os
import argparse
import threading
import yaml
# Указываем pydub где искать ffmpeg (установлен через imageio-ffmpeg)
try:
import imageio_ffmpeg
from pydub import AudioSegment
AudioSegment.converter = imageio_ffmpeg.get_ffmpeg_exe()
except Exception:
pass
from loguru import logger
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cosmo.wake_word import WakeWordDetector
from cosmo.transcriber import Transcriber
from cosmo.memory import Memory
from cosmo.agent import Agent
from cosmo.tts import TTS
def load_config(path: str) -> dict:
with open(path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def setup_logging(config: dict):
log_cfg = config.get("logging", {})
level = log_cfg.get("level", "INFO")
log_file = log_cfg.get("file", "logs/cosmo.log")
os.makedirs(os.path.dirname(log_file), exist_ok=True)
logger.remove()
logger.add(
sys.stderr, level=level, colorize=True,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | {message}",
)
logger.add(log_file, level="DEBUG", rotation="10 MB", retention="7 days", encoding="utf-8")
class Cosmo:
def __init__(self, config: dict):
self.config = config
self.name = config["assistant"]["name"]
self._running = False
self._command_event = threading.Event()
logger.info("Инициализирую модули...")
self.tts = TTS(config)
self.transcriber = Transcriber(config)
self.memory = Memory()
self.agent = Agent(config, self.memory)
self.wake_word = WakeWordDetector(config, on_detected_callback=self._on_wake_word)
def _on_wake_word(self):
logger.info(f"=== {self.name} активирован! ===")
self._command_event.set()
def _process_command(self):
self.tts.say_async("Слушаю")
# Partial results — печатаем в лог что слышим в реальном времени
def on_partial(text):
logger.debug(f"[partial] {text}")
text = self.transcriber.record_and_transcribe(on_partial=on_partial)
if not text.strip():
self.tts.say_async("Не расслышал, попробуй ещё раз")
return
# Агент обрабатывает команду и возвращает ответ для TTS
response = self.agent.run(text)
self.tts.say(response)
def run(self):
self._running = True
self.wake_word.start()
logger.info("=" * 50)
logger.info(f" {self.name} запущен!")
logger.info(f" Скажи '{self.name}' чтобы активировать.")
logger.info(f" Ctrl+C для выхода.")
logger.info("=" * 50)
self.tts.say(f"{self.name} запущен")
try:
while self._running:
triggered = self._command_event.wait(timeout=0.5)
if triggered and self._running:
self._command_event.clear()
try:
self._process_command()
finally:
self.wake_word.resume()
except KeyboardInterrupt:
logger.info("Ctrl+C — завершаю...")
finally:
self.shutdown()
def shutdown(self):
logger.info("Завершаю работу...")
self._running = False
self.wake_word.stop()
self.transcriber.shutdown()
logger.info(f"{self.name} остановлен")
def main():
parser = argparse.ArgumentParser(description="Cosmo — голосовой ассистент")
parser.add_argument("--config", default="config/config.yaml")
args = parser.parse_args()
config_path = args.config
if not os.path.isabs(config_path):
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(project_root, config_path)
if not os.path.exists(config_path):
print(f"Конфиг не найден: {config_path}")
sys.exit(1)
config = load_config(config_path)
setup_logging(config)
Cosmo(config).run()
if __name__ == "__main__":
main()

136
cosmo/memory.py Normal file
View File

@@ -0,0 +1,136 @@
"""
Персистентная память ассистента.
Хранится в data/memory.json — LLM читает и пишет её через инструменты.
Структура:
{
"facts": {
"user.name": "Даниил",
"app.webstorm": "C:/Program Files/JetBrains/WebStorm.../webstorm64.exe",
"user.browser": "chrome",
...
},
"history": [
{"role": "user", "content": "..."},
{"role": "assistant", "content": "..."}
]
}
"""
import json
import os
import threading
from datetime import datetime
from loguru import logger
MEMORY_PATH = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"data", "memory.json"
)
class Memory:
def __init__(self, path: str = MEMORY_PATH, history_limit: int = 20):
self.path = path
self.history_limit = history_limit
self._lock = threading.Lock()
self._data = {"facts": {}, "history": []}
self._load()
# ------------------------------------------------------------------
# Персистентность
# ------------------------------------------------------------------
def _load(self):
os.makedirs(os.path.dirname(self.path), exist_ok=True)
if os.path.exists(self.path):
try:
with open(self.path, "r", encoding="utf-8") as f:
self._data = json.load(f)
logger.info(
f"Память загружена: {len(self._data.get('facts', {}))} фактов, "
f"{len(self._data.get('history', []))} сообщений в истории"
)
except Exception as e:
logger.warning(f"Не удалось загрузить память: {e}. Начинаю с чистой.")
self._data = {"facts": {}, "history": []}
else:
logger.info("Файл памяти не найден — создаю новый")
def _save(self):
try:
with open(self.path, "w", encoding="utf-8") as f:
json.dump(self._data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Не удалось сохранить память: {e}")
# ------------------------------------------------------------------
# Факты (key-value долгосрочная память)
# ------------------------------------------------------------------
def get(self, key: str) -> str | None:
"""Получить факт по ключу. Возвращает None если не найден."""
with self._lock:
return self._data["facts"].get(key)
def set(self, key: str, value: str):
"""Сохранить факт. Перезаписывает если уже существует."""
with self._lock:
self._data["facts"][key] = value
self._save()
logger.debug(f"Память: сохранено [{key}] = {value!r}")
def delete(self, key: str) -> bool:
"""Удалить факт. Возвращает True если был."""
with self._lock:
existed = key in self._data["facts"]
if existed:
del self._data["facts"][key]
self._save()
return existed
def list_facts(self, prefix: str = "") -> dict:
"""Вернуть все факты, опционально отфильтрованные по префиксу."""
with self._lock:
facts = self._data["facts"]
if prefix:
return {k: v for k, v in facts.items() if k.startswith(prefix)}
return dict(facts)
def facts_as_text(self) -> str:
"""Все факты в виде читаемого текста для системного промпта."""
facts = self.list_facts()
if not facts:
return "Память пуста."
lines = [f" {k}: {v}" for k, v in sorted(facts.items())]
return "\n".join(lines)
# ------------------------------------------------------------------
# История разговора (краткосрочная, последние N сообщений)
# ------------------------------------------------------------------
def add_message(self, role: str, content: str):
"""Добавить сообщение в историю (role: user/assistant/tool)."""
with self._lock:
self._data["history"].append({
"role": role,
"content": content,
"ts": datetime.now().isoformat(timespec="seconds"),
})
# Ограничиваем длину истории
if len(self._data["history"]) > self.history_limit:
self._data["history"] = self._data["history"][-self.history_limit:]
self._save()
def get_history(self) -> list[dict]:
"""Вернуть историю в формате для LLM API (без поля ts)."""
with self._lock:
return [
{"role": m["role"], "content": m["content"]}
for m in self._data["history"]
]
def clear_history(self):
with self._lock:
self._data["history"] = []
self._save()

232
cosmo/tools.py Normal file
View File

@@ -0,0 +1,232 @@
"""
Инструменты агента для smolagents.
Каждый инструмент — функция с декоратором @tool.
smolagents автоматически генерирует схему из docstring и type hints.
"""
import os
import subprocess
import webbrowser
import urllib.parse
from loguru import logger
from smolagents import tool
from cosmo.memory import Memory
# Глобальная ссылка на память — устанавливается из agent.py
_memory: Memory | None = None
def set_memory(mem: Memory):
global _memory
_memory = mem
# ------------------------------------------------------------------
# Поиск Git Bash
# ------------------------------------------------------------------
_GIT_BASH_CANDIDATES = [
"C:/Program Files/Git/bin/bash.exe",
"C:/Program Files (x86)/Git/bin/bash.exe",
os.path.expandvars("%LOCALAPPDATA%/Programs/Git/bin/bash.exe"),
]
def _find_git_bash() -> str:
for path in _GIT_BASH_CANDIDATES:
if os.path.exists(path):
return path
try:
result = subprocess.run(["where", "bash"], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
first = result.stdout.strip().splitlines()[0]
if "git" in first.lower():
return first
except Exception:
pass
return "bash"
# ------------------------------------------------------------------
# Инструменты
# ------------------------------------------------------------------
@tool
def run_shell(command: str) -> str:
"""
Выполнить команду в Git Bash и получить вывод.
Используй bash-синтаксис. Для запуска Windows-программ: start '' '/c/path/app.exe'
или cmd //c start. Для поиска программ: which, find, cmd //c where.
Args:
command: bash команда для выполнения
"""
bash = _find_git_bash()
logger.info(f"[run_shell] {command}")
try:
result = subprocess.run(
[bash, "-c", command],
capture_output=True,
text=True,
timeout=20,
encoding="utf-8",
errors="replace",
)
output = result.stdout.strip()
stderr = result.stderr.strip()
if result.returncode != 0 and stderr:
return f"[returncode={result.returncode}]\nstdout: {output}\nstderr: {stderr}"
return output if output else f"[выполнено, returncode={result.returncode}]"
except FileNotFoundError:
return "[ошибка: Git Bash не найден]"
except subprocess.TimeoutExpired:
return "[таймаут 20с]"
except Exception as e:
return f"[ошибка: {e}]"
@tool
def find_program(name: str) -> str:
"""
Найти программу или файл на Windows по имени.
Ищет в PATH, Program Files, AppData и реестре.
Используй когда не знаешь точный путь к программе.
Args:
name: имя программы без расширения, например 'webstorm' или 'chrome'
"""
stem = name.lower().strip().replace(".exe", "")
logger.info(f"[find_program] {stem}")
steps = [
f"which {stem} 2>/dev/null",
f"cmd //c where {stem} 2>/dev/null",
f"find '/c/Program Files' -iname '{stem}*.exe' 2>/dev/null | head -3",
f"find '/c/Program Files (x86)' -iname '{stem}*.exe' 2>/dev/null | head -3",
f"find \"$LOCALAPPDATA\" -iname '{stem}*.exe' 2>/dev/null | head -3",
f"cmd //c 'reg query \"HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\{stem}.exe\" /ve 2>nul'",
]
results = []
for cmd in steps:
out = run_shell(cmd)
if out and not out.startswith("[") and len(out) > 3:
results.append(out.strip())
if results:
return "Найдено:\n" + "\n".join(results)
return f"Программа '{name}' не найдена. Попробуй другое имя."
@tool
def open_browser(url: str, search: bool = False) -> str:
"""
Открыть URL в браузере или выполнить поиск в Google.
Args:
url: полный URL (https://...) или поисковый запрос если search=True
search: если True — выполнить поиск Google по тексту в url
"""
if search:
url = "https://www.google.com/search?q=" + urllib.parse.quote(url)
elif not url.startswith(("http://", "https://")):
url = "https://" + url
webbrowser.open(url)
logger.info(f"[open_browser] {url}")
return f"Открыт браузер: {url}"
@tool
def read_file(path: str) -> str:
"""
Прочитать содержимое текстового файла.
Args:
path: абсолютный путь к файлу
"""
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
return f.read(8000)
except FileNotFoundError:
return f"Файл не найден: {path}"
except Exception as e:
return f"Ошибка чтения: {e}"
@tool
def write_file(path: str, content: str) -> str:
"""
Записать текст в файл (создаёт или перезаписывает).
Args:
path: абсолютный путь к файлу
content: содержимое файла
"""
try:
dir_path = os.path.dirname(path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
return f"Файл записан: {path}"
except Exception as e:
return f"Ошибка записи: {e}"
@tool
def memory_set(key: str, value: str) -> str:
"""
Сохранить факт в долгосрочную память ассистента.
Используй для запоминания путей программ, предпочтений пользователя, часто используемых команд.
Ключи: 'app.название', 'user.имя', 'pref.что-то'.
Args:
key: ключ факта, например 'app.webstorm' или 'user.name'
value: значение для сохранения
"""
if _memory is None:
return "Память не инициализирована"
_memory.set(key, value)
return f"Запомнено: {key} = {value!r}"
@tool
def memory_get(key: str) -> str:
"""
Получить сохранённый факт из долгосрочной памяти по ключу.
Args:
key: ключ факта, например 'app.webstorm'
"""
if _memory is None:
return "Память не инициализирована"
value = _memory.get(key)
return f"{key} = {value!r}" if value else f"Факт '{key}' не найден"
@tool
def memory_list(prefix: str = "") -> str:
"""
Показать все факты из памяти, опционально по префиксу.
Args:
prefix: префикс для фильтрации, например 'app.' чтобы видеть пути программ
"""
if _memory is None:
return "Память не инициализирована"
facts = _memory.list_facts(prefix)
if not facts:
return "Память пуста" if not prefix else f"Нет фактов с префиксом '{prefix}'"
return "\n".join(f"{k}: {v}" for k, v in sorted(facts.items()))
# Список всех инструментов для передачи в агент
ALL_TOOLS = [
run_shell,
find_program,
open_browser,
read_file,
write_file,
memory_set,
memory_get,
memory_list,
]

206
cosmo/tools_mac.py Normal file
View File

@@ -0,0 +1,206 @@
"""
Инструменты агента для smolagents — macOS версия.
Отличия от Windows: нативный bash, поиск через mdfind/Spotlight,
запуск приложений через 'open -a', нет реестра.
"""
import os
import subprocess
import webbrowser
import urllib.parse
from loguru import logger
from smolagents import tool
from cosmo.memory import Memory
_memory: Memory | None = None
def set_memory(mem: Memory):
global _memory
_memory = mem
# ------------------------------------------------------------------
# Инструменты
# ------------------------------------------------------------------
@tool
def run_shell(command: str) -> str:
"""
Выполнить команду в bash и получить вывод.
На macOS доступны все стандартные unix-команды.
Для запуска приложений используй: open -a "AppName" или open /path/to/app.
Для поиска файлов: mdfind, find, which.
Args:
command: bash команда для выполнения
"""
logger.info(f"[run_shell] {command}")
try:
result = subprocess.run(
["bash", "-c", command],
capture_output=True,
text=True,
timeout=20,
encoding="utf-8",
errors="replace",
)
output = result.stdout.strip()
stderr = result.stderr.strip()
if result.returncode != 0 and stderr:
return f"[returncode={result.returncode}]\nstdout: {output}\nstderr: {stderr}"
return output if output else f"[выполнено, returncode={result.returncode}]"
except subprocess.TimeoutExpired:
return "[таймаут 20с]"
except Exception as e:
return f"[ошибка: {e}]"
@tool
def find_program(name: str) -> str:
"""
Найти программу или приложение на macOS по имени.
Ищет в PATH, /Applications, ~/Applications и через Spotlight (mdfind).
Args:
name: имя программы, например 'webstorm', 'chrome', 'cursor'
"""
stem = name.strip()
logger.info(f"[find_program] {stem}")
steps = [
# which — ищет в PATH
f"which {stem} 2>/dev/null",
# Spotlight — самый надёжный способ найти .app
f"mdfind 'kMDItemKind == \"Application\"' | grep -i '{stem}' | head -3",
# /Applications напрямую
f"find /Applications -maxdepth 2 -iname '*{stem}*.app' 2>/dev/null | head -3",
# ~/Applications
f"find ~/Applications -maxdepth 2 -iname '*{stem}*.app' 2>/dev/null | head -3",
# Homebrew bin
f"find /opt/homebrew/bin /usr/local/bin -iname '*{stem}*' 2>/dev/null | head -3",
]
results = []
for cmd in steps:
out = run_shell(cmd)
if out and not out.startswith("[") and len(out) > 3:
results.append(out.strip())
if results:
return "Найдено:\n" + "\n".join(results)
return f"Программа '{name}' не найдена."
@tool
def open_browser(url: str, search: bool = False) -> str:
"""
Открыть URL в браузере или выполнить поиск в Google.
Args:
url: полный URL (https://...) или поисковый запрос если search=True
search: если True — выполнить поиск Google по тексту в url
"""
if search:
url = "https://www.google.com/search?q=" + urllib.parse.quote(url)
elif not url.startswith(("http://", "https://")):
url = "https://" + url
webbrowser.open(url)
logger.info(f"[open_browser] {url}")
return f"Открыт браузер: {url}"
@tool
def read_file(path: str) -> str:
"""
Прочитать содержимое текстового файла.
Args:
path: абсолютный путь к файлу
"""
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
return f.read(8000)
except FileNotFoundError:
return f"Файл не найден: {path}"
except Exception as e:
return f"Ошибка чтения: {e}"
@tool
def write_file(path: str, content: str) -> str:
"""
Записать текст в файл (создаёт или перезаписывает).
Args:
path: абсолютный путь к файлу
content: содержимое файла
"""
try:
dir_path = os.path.dirname(path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
return f"Файл записан: {path}"
except Exception as e:
return f"Ошибка записи: {e}"
@tool
def memory_set(key: str, value: str) -> str:
"""
Сохранить факт в долгосрочную память ассистента.
Используй для запоминания путей программ, предпочтений пользователя.
Ключи: 'app.название', 'user.имя', 'pref.что-то'.
Args:
key: ключ факта, например 'app.webstorm'
value: значение для сохранения
"""
if _memory is None:
return "Память не инициализирована"
_memory.set(key, value)
return f"Запомнено: {key} = {value!r}"
@tool
def memory_get(key: str) -> str:
"""
Получить сохранённый факт из долгосрочной памяти по ключу.
Args:
key: ключ факта, например 'app.webstorm'
"""
if _memory is None:
return "Память не инициализирована"
value = _memory.get(key)
return f"{key} = {value!r}" if value else f"Факт '{key}' не найден"
@tool
def memory_list(prefix: str = "") -> str:
"""
Показать все факты из памяти, опционально по префиксу.
Args:
prefix: префикс для фильтрации, например 'app.'
"""
if _memory is None:
return "Память не инициализирована"
facts = _memory.list_facts(prefix)
if not facts:
return "Память пуста" if not prefix else f"Нет фактов с префиксом '{prefix}'"
return "\n".join(f"{k}: {v}" for k, v in sorted(facts.items()))
ALL_TOOLS = [
run_shell,
find_program,
open_browser,
read_file,
write_file,
memory_set,
memory_get,
memory_list,
]

87
cosmo/transcriber.py Normal file
View File

@@ -0,0 +1,87 @@
"""
STT модуль на базе RealtimeSTT.
Использует faster-whisper + Silero VAD под капотом.
Поддерживает стриминг — partial transcriptions во время речи.
"""
import threading
from RealtimeSTT import AudioToTextRecorder
from loguru import logger
class Transcriber:
def __init__(self, config: dict):
whisper_cfg = config["whisper"]
audio_cfg = config["audio"]
self._recorder: AudioToTextRecorder | None = None
self._config = {
"model": whisper_cfg["model_size"],
"language": whisper_cfg["language"],
"device": whisper_cfg["device"],
"compute_type": whisper_cfg["compute_type"],
# Silero VAD параметры
"silero_sensitivity": 0.4,
"webrtc_sensitivity": 3,
"post_speech_silence_duration": audio_cfg["silence_duration"],
"min_length_of_recording": 0.3,
"min_gap_between_recordings": 0.01,
# Отключаем wake word в RealtimeSTT — используем свой
"wakeword_backend": "none",
# Не запускать в режиме непрерывного прослушивания
"use_microphone": True,
"spinner": False,
"level": 0, # минимальный лог уровень внутри RealtimeSTT
}
logger.info(
f"Инициализирую RealtimeSTT: модель={whisper_cfg['model_size']}, "
f"device={whisper_cfg['device']}, compute={whisper_cfg['compute_type']}"
)
self._init_recorder()
def _init_recorder(self):
try:
self._recorder = AudioToTextRecorder(**self._config)
logger.info("RealtimeSTT готов")
except Exception as e:
logger.error(f"Ошибка инициализации RealtimeSTT: {e}")
raise
def record_and_transcribe(self, on_partial: callable = None) -> str:
"""
Записывает команду и транскрибирует.
on_partial(text) — опциональный колбэк для частичных результатов.
Возвращает финальный текст.
"""
if self._recorder is None:
self._init_recorder()
result_holder = []
done_event = threading.Event()
def on_text(text: str):
result_holder.append(text)
done_event.set()
# Partial results — показываем что слышим в реальном времени
if on_partial:
self._recorder.on_realtime_transcription_update = on_partial
logger.info("Слушаю команду...")
self._recorder.text(on_text)
done_event.wait(timeout=12.0)
text = result_holder[0].strip() if result_holder else ""
if text:
logger.info(f"Транскрипция: '{text}'")
else:
logger.info("Команда не распознана (тишина или таймаут)")
return text
def shutdown(self):
if self._recorder:
try:
self._recorder.shutdown()
except Exception:
pass

79
cosmo/tts.py Normal file
View File

@@ -0,0 +1,79 @@
"""
TTS модуль на базе Silero V4 (torch.hub) + sounddevice.
Silero — лучший русскоязычный офлайн TTS.
Модель скачивается автоматически при первом запуске (~50 MB).
"""
import threading
import numpy as np
import sounddevice as sd
from loguru import logger
try:
import torch
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
class TTS:
def __init__(self, config: dict):
tts_cfg = config.get("tts", {})
self.enabled = tts_cfg.get("enabled", True)
self.speaker = tts_cfg.get("silero_speaker", "xenia")
self.sample_rate = tts_cfg.get("sample_rate", 48000)
self._lock = threading.Lock()
self._model = None
if not self.enabled:
return
if not TORCH_AVAILABLE:
logger.warning("torch не установлен — TTS отключён")
self.enabled = False
return
self._load_model()
def _load_model(self):
try:
logger.info(f"Загружаю Silero TTS (голос: {self.speaker}, {self.sample_rate} Hz)...")
# torch.hub кэширует модель в ~/.cache/torch/hub
model, _ = torch.hub.load(
repo_or_dir="snakers4/silero-models",
model="silero_tts",
language="ru",
speaker="v4_ru",
trust_repo=True,
)
self._model = model
logger.info("Silero TTS готов")
except Exception as e:
logger.error(f"Ошибка загрузки Silero TTS: {e}")
logger.warning("TTS отключён")
self.enabled = False
def say(self, text: str):
"""Произнести текст синхронно."""
if not self.enabled or self._model is None:
logger.info(f"[TTS]: {text}")
return
logger.debug(f"TTS: '{text}'")
with self._lock:
try:
with torch.no_grad():
audio = self._model.apply_tts(
text=text,
speaker=self.speaker,
sample_rate=self.sample_rate,
)
audio_np = audio.numpy() if hasattr(audio, "numpy") else np.array(audio)
sd.play(audio_np, samplerate=self.sample_rate)
sd.wait()
except Exception as e:
logger.error(f"Ошибка TTS: {e}")
def say_async(self, text: str):
"""Произнести текст асинхронно."""
t = threading.Thread(target=self.say, args=(text,), daemon=True)
t.start()

139
cosmo/wake_word.py Normal file
View File

@@ -0,0 +1,139 @@
"""
Wake word detector для Cosmo.
Слушает микрофон непрерывно, детектирует слово "cosmo" через openwakeword.
"""
import os
import glob
import threading
import queue
import numpy as np
import sounddevice as sd
from openwakeword.model import Model
from loguru import logger
class WakeWordDetector:
def __init__(self, config: dict, on_detected_callback):
"""
config: секция audio + assistant из config.yaml
on_detected_callback: вызывается без аргументов при детекте wake word
"""
self.sample_rate = config["audio"].get("sample_rate", 16000)
self.chunk_duration = config["audio"].get("chunk_duration", 0.08)
self.chunk_size = int(self.sample_rate * self.chunk_duration)
self.wake_word = config["assistant"]["wake_word"]
self.on_detected = on_detected_callback
self._audio_queue = queue.Queue()
self._running = False
self._paused = False
self._thread = None
# Порог уверенности для срабатывания (0.0 1.0)
self.threshold = 0.5
logger.info("Загружаю wake word модель openwakeword...")
# Ищем кастомную модель в папке models/
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
custom_models = glob.glob(os.path.join(project_root, "models", "*.onnx"))
if custom_models:
model_path = custom_models[0]
model_name = os.path.basename(model_path)
try:
self.model = Model(
wakeword_models=[model_path],
inference_framework="onnx",
)
logger.info(f"Кастомная wake word модель загружена: {model_name}")
except Exception as e:
logger.error(f"Не удалось загрузить кастомную модель {model_name}: {e}")
raise
else:
# Fallback на встроенную hey_jarvis
try:
self.model = Model(
wakeword_models=["hey_jarvis"],
inference_framework="onnx",
)
logger.info("Wake word модель 'hey_jarvis' загружена (onnx)")
logger.warning(
"Кастомная модель не найдена в папке models/. "
"Активация по слову 'Hey Jarvis'. "
"Запусти train_wakeword/train.sh чтобы обучить модель на 'Hey Cosmo'."
)
except Exception as e:
logger.error(f"Не удалось загрузить wake word модель: {e}")
raise
# ------------------------------------------------------------------
# Публичный интерфейс
# ------------------------------------------------------------------
def start(self):
"""Запустить детектор в фоновом потоке."""
self._running = True
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
logger.info(f"Wake word детектор запущен. Жду слово '{self.wake_word}'...")
def stop(self):
self._running = False
if self._thread:
self._thread.join(timeout=2)
def pause(self):
"""Приостановить детект (пока идёт запись команды)."""
self._paused = True
def resume(self):
"""Возобновить детект после записи команды."""
# Очищаем очередь, чтобы не срабатывать на эхо
while not self._audio_queue.empty():
try:
self._audio_queue.get_nowait()
except queue.Empty:
break
self._paused = False
logger.debug("Wake word детектор возобновлён")
# ------------------------------------------------------------------
# Внутренняя логика
# ------------------------------------------------------------------
def _audio_callback(self, indata, frames, time_info, status):
if status:
logger.debug(f"sounddevice статус: {status}")
if not self._paused:
# Копируем данные, т.к. буфер перезаписывается
self._audio_queue.put(indata[:, 0].copy())
def _run(self):
with sd.InputStream(
samplerate=self.sample_rate,
channels=1,
dtype="float32",
blocksize=self.chunk_size,
callback=self._audio_callback,
):
while self._running:
try:
chunk = self._audio_queue.get(timeout=0.5)
except queue.Empty:
continue
# openwakeword ожидает int16 PCM
chunk_int16 = (chunk * 32767).astype(np.int16)
prediction = self.model.predict(chunk_int16)
for model_name, score in prediction.items():
if score >= self.threshold:
logger.info(
f"Wake word задетектирован! Модель={model_name}, "
f"уверенность={score:.2f}"
)
self.pause()
self.on_detected()
break