282 lines
8.5 KiB
Python
Executable File
282 lines
8.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Hotword Detector
|
|
Detects wake words: "Hey Osiris" / "你好 Osiris"
|
|
Supports:
|
|
- Porcupine (PicoVoice) for accurate hotword detection
|
|
- Custom keyword spotting
|
|
- Bilingual support (English/Mandarin)
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
import struct
|
|
import wave
|
|
from typing import Optional, Callable, List
|
|
from pathlib import Path
|
|
|
|
# Module logger. Created before the optional-dependency probes so they can log
# through it: the module-level logging.info()/logging.warning() helpers call
# logging.basicConfig() when the root logger has no handlers, which would
# silently configure logging for whatever application imports this module.
logger = logging.getLogger(__name__)

# Try to import Porcupine (optional dependency).
HAS_PORCUPINE = False
porcupine_instance = None
try:
    import pvporcupine
    HAS_PORCUPINE = True
    # Read the version defensively: only ImportError is handled here, so an
    # AttributeError from a missing __version__ would abort the whole import.
    logger.info(
        "Porcupine module found (version: %s)",
        getattr(pvporcupine, "__version__", "unknown"),
    )
except ImportError:
    logger.warning("Porcupine not installed. Install with: pip install pvporcupine")

# Try to import WebRTC VAD (optional dependency).
HAS_VAD = False
try:
    import webrtcvad
    HAS_VAD = True
except ImportError:
    logger.warning("WebRTC VAD not installed")
|
|
|
|
|
|
class HotwordDetector:
    """
    Hotword detection with support for "Hey Osiris" in English and Mandarin.

    Detection is delegated to Porcupine when the ``pvporcupine`` module is
    importable and an access key is available; otherwise ``detect`` falls
    back to ``_simple_detect``, which is a placeholder that never reports a
    detection.

    Attributes:
        config: Parsed configuration dict (from file or built-in defaults).
        audio_config: The ``"audio"`` section of ``config``.
        hotwords: The ``"hotwords"`` list of ``config``.
        is_running: True while ``_porcupine_detect`` is inside its read loop.
        callback: Optional zero-argument callable invoked on detection.
        porcupine: Porcupine engine handle, or None when unavailable/stopped.
        keyword_index: 0 once Porcupine is initialised, -1 otherwise.
        vad: ``webrtcvad.Vad`` instance when WebRTC VAD is installed, else None.
    """

    # Name of the variable looked up in the environment and in the .env file.
    _API_KEY_VAR = "PICOVOICE_API_KEY"

    def __init__(self, config_path: str = "hotword_config.json"):
        """
        Load configuration and set up the optional Porcupine and VAD engines.

        Args:
            config_path: Path to a JSON configuration file. A missing file
                is not an error; built-in defaults are used instead.
        """
        self.config = self._load_config(config_path)
        self.audio_config = self.config.get("audio", {
            "sample_rate": 16000,
            "frame_length": 512
        })
        self.hotwords = self.config.get("hotwords", [])
        self.is_running = False
        self.callback = None

        # Porcupine setup
        self.porcupine = None
        self.keyword_index = -1

        if HAS_PORCUPINE:
            self._init_porcupine()

        # VAD setup
        self.vad = None
        if HAS_VAD:
            self.vad = webrtcvad.Vad(2)  # Aggressiveness level 2

        logger.info(f"HotwordDetector initialized (Porcupine: {HAS_PORCUPINE})")

    @staticmethod
    def _default_config() -> dict:
        """Return the built-in configuration used when no config file exists."""
        return {
            "hotwords": [
                {
                    "keyword": "hey osiris",
                    "keyword_zh": "你好 osiris",
                    "sensitivity": 0.5
                }
            ],
            "audio": {
                "sample_rate": 16000,
                "frame_length": 512
            }
        }

    def _load_config(self, config_path: str) -> dict:
        """
        Load configuration from a JSON file.

        Args:
            config_path: Path to the JSON configuration file.

        Returns:
            The parsed JSON document, or the built-in defaults when the file
            does not exist.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        try:
            # Explicit UTF-8: the configuration carries non-ASCII keywords
            # (e.g. "你好 osiris"), which the platform default encoding may
            # not be able to decode.
            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return self._default_config()

    @staticmethod
    def _read_api_key(env_file: Path) -> Optional[str]:
        """
        Read the Picovoice access key from a ``.env``-style file.

        Args:
            env_file: Path of the file to scan.

        Returns:
            The value of the first ``PICOVOICE_API_KEY=`` line with
            surrounding whitespace stripped, or None when the file is
            missing, has no such line, or the value is empty.
        """
        prefix = HotwordDetector._API_KEY_VAR + "="
        try:
            with open(env_file, encoding='utf-8') as f:
                for line in f:
                    if line.startswith(prefix):
                        # Slice off the prefix instead of splitting on '=':
                        # the key itself may contain '=' (base64 padding),
                        # which split('=')[1] would silently truncate.
                        return line[len(prefix):].strip() or None
        except FileNotFoundError:
            return None
        return None

    @staticmethod
    def _unpack_frame(pcm: bytes, frame_length: int) -> tuple:
        """
        Decode a raw audio buffer into signed 16-bit samples.

        Args:
            pcm: Raw bytes holding at least ``frame_length`` native-order
                16-bit samples.
            frame_length: Number of samples to decode.

        Returns:
            A tuple of ``frame_length`` ints.
        """
        # In struct format strings the repeat count precedes the type code
        # ("512h"); "h512" is rejected with struct.error.
        return struct.unpack_from(f"{frame_length}h", pcm)

    def _init_porcupine(self):
        """
        Initialize Porcupine hotword detection.

        The access key is taken from the ``PICOVOICE_API_KEY`` environment
        variable, falling back to a ``.env`` file next to this module. Any
        failure is logged and leaves ``self.porcupine`` as None so that
        ``detect`` uses the fallback path.
        """
        if not HAS_PORCUPINE:
            return

        try:
            # Get API key from environment or .env file
            api_key = os.getenv(self._API_KEY_VAR)
            if not api_key:
                api_key = self._read_api_key(Path(__file__).parent / '.env')

            if not api_key:
                logger.warning("Porcupine API key not found. Set PICOVOICE_API_KEY environment variable.")
                self.porcupine = None
                return

            # NOTE(review): this relies on "hey osiris" being accepted by
            # pvporcupine.create(keywords=...). If it is not one of the
            # keywords bundled with the installed package, create() raises
            # and we end up in the fallback branch below - confirm, and
            # switch to keyword_paths=[...] with a trained .ppn if needed.
            self.porcupine = pvporcupine.create(
                access_key=api_key,
                keywords=["hey osiris"],
                sensitivities=[0.5]
            )
            self.keyword_index = 0
            logger.info("✓ Porcupine initialized with 'Hey Osiris'")

        except Exception as e:
            # Best effort: any initialisation problem downgrades to the
            # fallback detector instead of failing construction.
            logger.warning(f"Porcupine initialization failed: {e}")
            logger.warning("Falling back to simple detection")
            self.porcupine = None

    def set_callback(self, callback: Callable[[], None]):
        """Set callback function for when hotword is detected."""
        self.callback = callback

    def detect(self, timeout: Optional[int] = None) -> Optional[str]:
        """
        Start detection and wait for hotword.

        Args:
            timeout: Maximum time to wait in seconds (None or 0 = infinite)

        Returns:
            Detected hotword or None
        """
        if not self.porcupine:
            logger.warning("Porcupine not available, using simple detection")
            return self._simple_detect(timeout)

        return self._porcupine_detect(timeout)

    def _porcupine_detect(self, timeout: Optional[int] = None) -> Optional[str]:
        """
        Detect using Porcupine.

        Opens the default PyAudio input device, feeds frames to Porcupine
        and returns as soon as a keyword is reported.

        Args:
            timeout: Maximum time to listen in seconds (None or 0 = infinite).

        Returns:
            ``"hey osiris"`` on detection; None on timeout, on ``stop()``,
            on Ctrl+C, or when an error occurred (errors are logged).

        Raises:
            ImportError: If ``pyaudio`` is not installed.
        """
        if not self.porcupine:
            return None

        import time

        import pyaudio

        pa = pyaudio.PyAudio()
        # Pre-bind so the finally clause is safe when pa.open() itself fails;
        # otherwise cleanup would raise UnboundLocalError and hide the error.
        stream = None

        try:
            frame_length = self.porcupine.frame_length

            # Open audio stream
            stream = pa.open(
                rate=self.porcupine.sample_rate,
                channels=1,
                format=pyaudio.paInt16,
                input=True,
                frames_per_buffer=frame_length
            )

            logger.info("Listening for 'Hey Osiris'...")
            self.is_running = True

            # Monotonic clock: the deadline must not move when the system
            # wall clock is adjusted. A falsy timeout means "no deadline".
            deadline = (time.monotonic() + timeout) if timeout else None

            while self.is_running:
                # Check timeout
                if deadline is not None and time.monotonic() > deadline:
                    logger.info("Hotword detection timeout")
                    break

                # Read audio frame
                pcm = stream.read(frame_length, exception_on_overflow=False)

                # Convert to signed 16-bit integers
                samples = self._unpack_frame(pcm, frame_length)

                # Process frame
                keyword_index = self.porcupine.process(samples)

                if keyword_index >= 0:
                    logger.info("🎯 Hotword detected!")
                    if self.callback:
                        self.callback()
                    return "hey osiris"

        except KeyboardInterrupt:
            # NOTE(review): Ctrl+C is absorbed here and reported as None, so
            # a caller looping on detect() will not see the interrupt.
            logger.info("Detection interrupted")
        except Exception as e:
            logger.error(f"Detection error: {e}")
        finally:
            if stream is not None:
                stream.close()
            pa.terminate()
            self.is_running = False

        return None

    def _simple_detect(self, timeout: Optional[int] = None) -> Optional[str]:
        """
        Fallback used when Porcupine is unavailable.

        This is a placeholder: it logs a warning and returns None
        immediately without listening to any audio. ``timeout`` is ignored.
        """
        logger.warning("Using simple voice detection (not recommended)")
        # This is a placeholder - in production you'd use:
        # - Snowboy
        # - Custom trained model
        # - Or just use Porcupine
        return None

    def stop(self):
        """
        Stop detection and release the Porcupine engine.

        Safe to call more than once. After the call ``self.porcupine`` is
        None, so later ``detect`` calls use the fallback path.
        """
        self.is_running = False
        if self.porcupine is not None:
            self.porcupine.delete()
            # Drop the reference so the released handle is never deleted
            # twice or fed more audio.
            self.porcupine = None

    def create_custom_hotword(self, keyword: str, output_path: str):
        """
        Create custom hotword model (Porcupine only).

        Not implemented yet: currently only logs a warning.

        Args:
            keyword: Keyword phrase
            output_path: Path to save the model

        Raises:
            RuntimeError: If the Porcupine module is not installed.
        """
        if not HAS_PORCUPINE:
            raise RuntimeError("Porcupine not available")

        # This would require Porcupine training API
        logger.warning("Custom hotword training not yet implemented")
|
|
|
|
|
|
class SimpleHotwordDetector:
    """
    Stand-in detector for environments without Porcupine.

    It remembers the keyword it was asked to listen for, but ``detect``
    performs no audio processing: it logs a warning and reports nothing.
    """

    def __init__(self, keyword: str = "hey osiris"):
        # Stored for callers only; this class never reads it back.
        self.keyword = keyword

    def detect(self, timeout: int = None) -> Optional[str]:
        """
        Warn that this detector is unreliable and report no detection.

        Args:
            timeout: Accepted for interface parity; not used.

        Returns:
            Always None.
        """
        notice = "Simple detection is not reliable. Install Porcupine for best results."
        logger.warning(notice)
        return None
|
|
|
|
|
|
def main():
    """
    Test hotword detection.

    Builds a HotwordDetector with the default configuration, then calls
    ``detect(timeout=30)`` in a loop, printing each detection, until
    interrupted with Ctrl+C.
    """
    import time

    def on_hotword():
        print("✨ Hotword detected!")

    detector = HotwordDetector()
    detector.set_callback(on_hotword)

    print("Listening for hotword... (Ctrl+C to stop)")

    try:
        while True:
            result = detector.detect(timeout=30)
            if result:
                print(f"Detected: {result}")
            # Short pause between listening rounds.
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopped")
    finally:
        # Release the detector on every exit path, not only on Ctrl+C, so an
        # unexpected exception cannot leave the engine allocated.
        detector.stop()


if __name__ == '__main__':
    main()
|