""" STT модуль на базе RealtimeSTT. Использует faster-whisper + Silero VAD под капотом. Поддерживает стриминг — partial transcriptions во время речи. """ import threading from RealtimeSTT import AudioToTextRecorder from loguru import logger class Transcriber: def __init__(self, config: dict): whisper_cfg = config["whisper"] audio_cfg = config["audio"] self._recorder: AudioToTextRecorder | None = None self._config = { "model": whisper_cfg["model_size"], "language": whisper_cfg["language"], "device": whisper_cfg["device"], "compute_type": whisper_cfg["compute_type"], # Подсказка для Whisper — улучшает распознавание русского "initial_prompt": whisper_cfg.get("initial_prompt", ""), # Silero VAD параметры "silero_sensitivity": 0.4, "webrtc_sensitivity": 3, "post_speech_silence_duration": audio_cfg["silence_duration"], "min_length_of_recording": 0.5, "min_gap_between_recordings": 0.01, # Отключаем wake word в RealtimeSTT — используем свой "wakeword_backend": "none", # Не запускать в режиме непрерывного прослушивания "use_microphone": True, "spinner": False, "level": 0, # минимальный лог уровень внутри RealtimeSTT } logger.info( f"Инициализирую RealtimeSTT: модель={whisper_cfg['model_size']}, " f"device={whisper_cfg['device']}, compute={whisper_cfg['compute_type']}" ) self._init_recorder() def _init_recorder(self): try: self._recorder = AudioToTextRecorder(**self._config) logger.info("RealtimeSTT готов") except Exception as e: logger.error(f"Ошибка инициализации RealtimeSTT: {e}") raise def record_and_transcribe(self, on_partial: callable = None) -> str: """ Записывает команду и транскрибирует. on_partial(text) — опциональный колбэк для частичных результатов. Возвращает финальный текст. """ if self._recorder is None: self._init_recorder() result_holder = [] done_event = threading.Event() def on_text(text: str): result_holder.append(text) done_event.set() # Partial results — показываем что слышим в реальном времени if on_partial: self._recorder.on_realtime_transcription_update = on_partial logger.info("Слушаю команду...") self._recorder.text(on_text) done_event.wait(timeout=12.0) text = result_holder[0].strip() if result_holder else "" if text: logger.info(f"Транскрипция: '{text}'") else: logger.info("Команда не распознана (тишина или таймаут)") return text def shutdown(self): if self._recorder: try: self._recorder.shutdown() except Exception: pass