openclaw-voice-assistant/hotword_detector.py
2026-03-01 20:42:13 -08:00

282 lines
8.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Hotword Detector
Detects wake words: "Hey Osiris" / "你好 Osiris"
Supports:
- Porcupine (PicoVoice) for accurate hotword detection
- Custom keyword spotting
- Bilingual support (English/Mandarin)
"""
import os
import json
import logging
import struct
import wave
from typing import Optional, Callable, List
from pathlib import Path
# Module-level logger. It is created before the optional imports so their
# diagnostics go through it; calling logging.info()/logging.warning() on the
# root logger at import time would implicitly run logging.basicConfig() and
# pre-empt whatever logging setup the application performs later.
logger = logging.getLogger(__name__)

# Optional dependency: Porcupine (PicoVoice) wake-word engine.
HAS_PORCUPINE = False
porcupine_instance = None
try:
    import pvporcupine
    HAS_PORCUPINE = True
    # The package may not expose __version__; fall back instead of letting an
    # AttributeError escape the ImportError handler and break the import.
    logger.info(
        "Porcupine module found (version: %s)",
        getattr(pvporcupine, "__version__", "unknown"),
    )
except ImportError:
    logger.warning("Porcupine not installed. Install with: pip install pvporcupine")

# Optional dependency: WebRTC voice-activity detector.
HAS_VAD = False
try:
    import webrtcvad
    HAS_VAD = True
except ImportError:
    logger.warning("WebRTC VAD not installed")
class HotwordDetector:
    """
    Wake-word detector for "Hey Osiris", backed by Porcupine when available.

    Configuration is read from a JSON file with two optional top-level keys:

    - ``"hotwords"``: list of dicts. Only the first entry is consulted, for
      ``"sensitivity"`` (default 0.5) and the optional ``"keyword_path"``
      (path to a custom Porcupine keyword model file).
    - ``"audio"``: dict with ``"sample_rate"`` and ``"frame_length"``; stored
      on ``audio_config`` but the Porcupine path uses the engine's own values.

    When Porcupine cannot be initialised, ``detect()`` falls back to a
    placeholder that performs no detection and returns ``None``.
    """

    def __init__(self, config_path: str = "hotword_config.json"):
        """
        Load configuration and set up the optional Porcupine / VAD backends.

        Args:
            config_path: Path to the JSON configuration file. A missing file
                yields the built-in default configuration.
        """
        self.config = self._load_config(config_path)
        self.audio_config = self.config.get("audio", {
            "sample_rate": 16000,
            "frame_length": 512,
        })
        self.hotwords = self.config.get("hotwords", [])
        self.is_running = False
        self.callback = None

        # Porcupine setup
        self.porcupine = None
        self.keyword_index = -1
        if HAS_PORCUPINE:
            self._init_porcupine()

        # VAD setup
        self.vad = None
        if HAS_VAD:
            self.vad = webrtcvad.Vad(2)  # Aggressiveness level 2

        logger.info(f"HotwordDetector initialized (Porcupine: {HAS_PORCUPINE})")

    def _load_config(self, config_path: str) -> dict:
        """
        Load the JSON configuration, or return defaults if the file is missing.

        The file is read as UTF-8 because the configuration carries non-ASCII
        text (the Mandarin keyword); relying on the platform default encoding
        would corrupt it on systems whose default is not UTF-8.

        Args:
            config_path: Path to the JSON configuration file.

        Returns:
            The parsed configuration, or the default configuration when the
            file does not exist. Invalid JSON still raises.
        """
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                "hotwords": [
                    {
                        "keyword": "hey osiris",
                        "keyword_zh": "你好 osiris",
                        "sensitivity": 0.5
                    }
                ],
                "audio": {
                    "sample_rate": 16000,
                    "frame_length": 512
                }
            }

    @staticmethod
    def _read_api_key(env_file) -> Optional[str]:
        """
        Return the PICOVOICE_API_KEY value from a ``.env``-style file.

        Only the first line starting with ``PICOVOICE_API_KEY=`` is used.
        Everything after the first ``=`` is kept, so values that themselves
        contain ``=`` (e.g. base64 padding) are not truncated. One pair of
        matching surrounding quotes is removed.

        Args:
            env_file: Path (str or Path) of the file to read.

        Returns:
            The key, or None when the file or a non-empty entry is absent.
        """
        env_path = Path(env_file)
        if not env_path.exists():
            return None
        with open(env_path, encoding='utf-8') as f:
            for line in f:
                if line.startswith('PICOVOICE_API_KEY='):
                    value = line.partition('=')[2].strip()
                    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                        value = value[1:-1]
                    return value or None
        return None

    @staticmethod
    def _unpack_frame(raw: bytes, frame_length: int) -> tuple:
        """
        Decode one raw audio frame into a tuple of signed 16-bit samples.

        Args:
            raw: Buffer holding at least ``frame_length`` native-order int16
                samples.
            frame_length: Number of samples to decode.

        Returns:
            Tuple of ``frame_length`` ints.

        Raises:
            struct.error: If ``raw`` is shorter than ``frame_length`` samples.
        """
        # In a struct format the repeat count precedes the type code
        # ("512h"); "h512" is rejected by the struct module.
        return struct.unpack_from(f"{frame_length}h", raw)

    def _init_porcupine(self):
        """
        Initialize Porcupine hotword detection.

        The access key is taken from the PICOVOICE_API_KEY environment
        variable, falling back to a ``.env`` file next to this module. On any
        failure ``self.porcupine`` is left as None so ``detect()`` uses the
        fallback path.
        """
        if not HAS_PORCUPINE:
            return
        try:
            # Get API key from environment or .env file
            api_key = os.getenv('PICOVOICE_API_KEY')
            if not api_key:
                api_key = self._read_api_key(Path(__file__).parent / '.env')
            if not api_key:
                logger.warning("Porcupine API key not found. Set PICOVOICE_API_KEY environment variable.")
                self.porcupine = None
                return

            # Only the first configured hotword entry is consulted.
            hotwords = getattr(self, "hotwords", None) or []
            hotword = hotwords[0] if hotwords else {}
            sensitivity = hotword.get("sensitivity", 0.5)
            keyword_path = hotword.get("keyword_path")

            if keyword_path:
                # Custom keyword model supplied through the configuration.
                self.porcupine = pvporcupine.create(
                    access_key=api_key,
                    keyword_paths=[keyword_path],
                    sensitivities=[sensitivity],
                )
            else:
                # NOTE(review): "hey osiris" does not appear among Porcupine's
                # documented built-in keywords, so this call is expected to
                # fail (and trigger the fallback below) unless a custom model
                # is configured via "keyword_path" - confirm against
                # pvporcupine.KEYWORDS.
                self.porcupine = pvporcupine.create(
                    access_key=api_key,
                    keywords=["hey osiris"],
                    sensitivities=[sensitivity],
                )
            self.keyword_index = 0
            logger.info("✓ Porcupine initialized with 'Hey Osiris'")
        except Exception as e:
            # Deliberate best-effort: any setup failure degrades to the
            # fallback detector instead of aborting construction.
            logger.warning(f"Porcupine initialization failed: {e}")
            logger.warning("Falling back to simple detection")
            self.porcupine = None

    def set_callback(self, callback: Callable[[], None]):
        """Set callback function for when hotword is detected."""
        self.callback = callback

    def detect(self, timeout: Optional[int] = None) -> Optional[str]:
        """
        Start detection and wait for hotword.

        Args:
            timeout: Maximum time to wait in seconds (None or 0 = no limit).

        Returns:
            Detected hotword or None
        """
        if not self.porcupine:
            logger.warning("Porcupine not available, using simple detection")
            return self._simple_detect(timeout)
        return self._porcupine_detect(timeout)

    def _porcupine_detect(self, timeout: Optional[int] = None) -> Optional[str]:
        """
        Listen on the default input device until Porcupine reports the keyword.

        Args:
            timeout: Maximum time to wait in seconds (None or 0 = no limit).

        Returns:
            "hey osiris" on detection; None on timeout, when ``is_running`` is
            cleared, on KeyboardInterrupt, or on any audio/processing error
            (errors are logged, not raised).
        """
        porcupine = self.porcupine
        if not porcupine:
            return None

        import time
        import pyaudio

        frame_length = porcupine.frame_length
        pa = pyaudio.PyAudio()
        # Bound before the try so the cleanup below is safe even when
        # opening the stream itself fails.
        stream = None
        try:
            # Open audio stream
            stream = pa.open(
                rate=porcupine.sample_rate,
                channels=1,
                format=pyaudio.paInt16,
                input=True,
                frames_per_buffer=frame_length,
            )
            logger.info("Listening for 'Hey Osiris'...")
            self.is_running = True

            # Monotonic clock: unaffected by wall-clock adjustments.
            deadline = time.monotonic() + timeout if timeout else None

            while self.is_running:
                if deadline is not None and time.monotonic() > deadline:
                    logger.info("Hotword detection timeout")
                    break

                # Read one frame and convert it to signed 16-bit integers
                raw = stream.read(frame_length, exception_on_overflow=False)
                pcm = self._unpack_frame(raw, frame_length)

                # A non-negative index means a keyword was recognised.
                if porcupine.process(pcm) >= 0:
                    logger.info("🎯 Hotword detected!")
                    if self.callback:
                        self.callback()
                    return "hey osiris"
        except KeyboardInterrupt:
            # Handled here (not propagated): the caller just sees None.
            logger.info("Detection interrupted")
        except Exception as e:
            logger.error(f"Detection error: {e}")
        finally:
            if stream is not None:
                stream.close()
            pa.terminate()
            self.is_running = False
        return None

    def _simple_detect(self, timeout: Optional[int] = None) -> Optional[str]:
        """
        Fallback used when Porcupine is unavailable.

        Currently a placeholder: it logs a warning, performs no audio
        processing, and always returns None.
        """
        logger.warning("Using simple voice detection (not recommended)")
        # This is a placeholder - in production you'd use:
        # - Snowboy
        # - Custom trained model
        # - Or just use Porcupine
        return None

    def stop(self):
        """
        Stop detection and release the Porcupine handle.

        Safe to call more than once: the handle is cleared after deletion so
        it is never deleted twice, and later ``detect()`` calls take the
        fallback path instead of using a released engine.
        """
        self.is_running = False
        porcupine, self.porcupine = self.porcupine, None
        if porcupine is not None:
            porcupine.delete()

    def create_custom_hotword(self, keyword: str, output_path: str):
        """
        Create custom hotword model (Porcupine only). Not implemented yet.

        Args:
            keyword: Keyword phrase
            output_path: Path to save the model

        Raises:
            RuntimeError: If the Porcupine module is not installed.
        """
        if not HAS_PORCUPINE:
            raise RuntimeError("Porcupine not available")
        # This would require Porcupine training API
        logger.warning("Custom hotword training not yet implemented")
class SimpleHotwordDetector:
    """
    Placeholder stand-in for a lightweight detector.

    It records the requested keyword but performs no audio processing:
    ``detect()`` only logs a warning and reports that nothing was detected.
    """

    def __init__(self, keyword: str = "hey osiris"):
        # Kept for reference only; detect() never consults it.
        self.keyword = keyword

    def detect(self, timeout: int = None) -> Optional[str]:
        """
        Warn that this detector is unreliable and return None.

        Args:
            timeout: Accepted for interface parity; unused.

        Returns:
            Always None.
        """
        notice = "Simple detection is not reliable. Install Porcupine for best results."
        logger.warning(notice)
        return None
def main():
    """
    Manual smoke test: listen for the hotword in a loop until Ctrl+C.

    Each round waits up to 30 seconds for a detection, prints the result if
    there is one, then pauses for a second before listening again.
    """
    import time

    def on_hotword():
        print("✨ Hotword detected!")

    detector = HotwordDetector()
    detector.set_callback(on_hotword)
    print("Listening for hotword... (Ctrl+C to stop)")
    try:
        while True:
            result = detector.detect(timeout=30)
            if result:
                print(f"Detected: {result}")
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopped")
    finally:
        # Release the detector on every exit path, not only on Ctrl+C, so an
        # unexpected error does not leave the engine handle allocated.
        detector.stop()


if __name__ == '__main__':
    main()