openclaw-voice-assistant/hotword_detector.py
Claw - AI Now Inc 1662bc141a Initial commit: Bilingual Voice Assistant for Google AIY Voice Kit V1
Features:
- Bilingual support (English/Mandarin Chinese)
- Hotword detection: 'Hey Osiris' / '你好 Osiris'
- Music playback control (MP3, WAV, OGG, FLAC)
- OpenClaw integration for AI responses
- Google AIY Voice Kit V1 compatible
- Text-to-speech in both languages
- Voice command recognition
- Raspberry Pi ready with installation script

AI Now Inc - Del Mar Demo Unit 🏭
2026-03-01 00:02:49 -08:00

266 lines
7.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Hotword Detector
Detects wake words: "Hey Osiris" / "你好 Osiris"
Supports:
- Porcupine (PicoVoice) for accurate hotword detection
- Custom keyword spotting
- Bilingual support (English/Mandarin)
"""
import os
import json
import logging
import struct
import wave
from typing import Optional, Callable, List
from pathlib import Path
# The module logger is created *before* probing the optional dependencies so
# the fallback warnings can be emitted through it.  The root-level helper
# ``logging.warning()`` implicitly calls ``logging.basicConfig()`` when the
# root logger has no handlers, which would turn the later
# ``logging.basicConfig(level=logging.INFO)`` in the ``__main__`` guard into a
# no-op and silently hide every INFO message of this module.
logger = logging.getLogger(__name__)

# Porcupine wake-word engine plus PyAudio for microphone capture.  Both are
# needed for real detection; if either import fails the flag is False.
try:
    import pvporcupine
    import pyaudio
    HAS_PORCUPINE = True
except ImportError:
    HAS_PORCUPINE = False
    logger.warning("Porcupine not installed. Install with: pip install pvporcupine")

# WebRTC voice-activity detector (optional).
try:
    import webrtcvad
    HAS_VAD = True
except ImportError:
    HAS_VAD = False
    logger.warning("WebRTC VAD not installed")
class HotwordDetector:
    """
    Wake-word detector for "Hey Osiris" (English) / "你好 Osiris" (Mandarin).

    Detection is delegated to Porcupine when the module-level flag
    ``HAS_PORCUPINE`` is set and the engine initializes successfully;
    otherwise ``detect`` falls back to a placeholder that never reports a
    hotword.

    Attributes:
        config: Parsed configuration dict (file contents or built-in defaults).
        audio_config: The ``"audio"`` section of the configuration.
        hotwords: The ``"hotwords"`` list of the configuration.
        is_running: True while the Porcupine listening loop is active.
        callback: Optional zero-argument callable invoked on detection.
        porcupine: Porcupine engine handle, or None when unavailable.
        keyword_index: 0 once Porcupine is initialized, -1 otherwise.
        vad: ``webrtcvad.Vad`` instance when WebRTC VAD is installed, else None.
    """

    # Used when the configuration carries no usable sensitivity value.
    _DEFAULT_SENSITIVITY = 0.5

    def __init__(self, config_path: str = "hotword_config.json"):
        """
        Load the configuration and set up the optional engines.

        Args:
            config_path: Path of the JSON configuration file. A missing file
                is not an error; built-in defaults are used instead.
        """
        self.config = self._load_config(config_path)
        self.audio_config = self.config.get("audio", {
            "sample_rate": 16000,
            "frame_length": 512
        })
        self.hotwords = self.config.get("hotwords", [])
        self.is_running = False
        self.callback = None

        # Porcupine setup
        self.porcupine = None
        self.keyword_index = -1
        if HAS_PORCUPINE:
            self._init_porcupine()

        # VAD setup
        self.vad = None
        if HAS_VAD:
            self.vad = webrtcvad.Vad(2)  # Aggressiveness level 2

        logger.info(f"HotwordDetector initialized (Porcupine: {HAS_PORCUPINE})")

    def _load_config(self, config_path: str) -> dict:
        """
        Read the JSON configuration, falling back to defaults.

        Args:
            config_path: Path of the JSON configuration file.

        Returns:
            The parsed configuration, or the built-in default configuration
            when the file does not exist.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        try:
            # Explicit UTF-8: the configuration contains Chinese text and must
            # not depend on the platform's default encoding.
            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                "hotwords": [
                    {
                        "keyword": "hey osiris",
                        "keyword_zh": "你好 osiris",
                        "sensitivity": 0.5
                    }
                ],
                "audio": {
                    "sample_rate": 16000,
                    "frame_length": 512
                }
            }

    def _configured_sensitivity(self) -> float:
        """
        Return the sensitivity of the first configured hotword.

        Falls back to ``_DEFAULT_SENSITIVITY`` when no hotword is configured
        or the value is not a number within [0, 1].
        """
        hotwords = self.hotwords
        if isinstance(hotwords, list) and hotwords and isinstance(hotwords[0], dict):
            value = hotwords[0].get("sensitivity")
            is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
            if is_number and 0.0 <= value <= 1.0:
                return float(value)
        return self._DEFAULT_SENSITIVITY

    def _init_porcupine(self):
        """
        Create the Porcupine engine for the "hey osiris" keyword.

        Any failure is logged as a warning and leaves ``self.porcupine`` as
        None, so ``detect`` uses the fallback path.
        """
        if not HAS_PORCUPINE:
            return
        try:
            # NOTE(review): "hey osiris" is presumably a custom keyword; recent
            # pvporcupine releases appear to require an access key and a
            # keyword file for those - confirm against the installed version.
            self.porcupine = pvporcupine.create(
                keywords=["hey osiris"],
                sensitivities=[self._configured_sensitivity()]
            )
            self.keyword_index = 0
            logger.info("Porcupine initialized with 'Hey Osiris'")
        except Exception as e:
            logger.warning(f"Porcupine initialization failed: {e}")
            self.porcupine = None

    def set_callback(self, callback: Callable[[], None]):
        """Set callback function for when hotword is detected."""
        self.callback = callback

    def detect(self, timeout: Optional[float] = None) -> Optional[str]:
        """
        Start detection and wait for hotword.

        Args:
            timeout: Maximum time to wait in seconds (None or 0 = infinite)

        Returns:
            Detected hotword or None
        """
        if not self.porcupine:
            logger.warning("Porcupine not available, using simple detection")
            return self._simple_detect(timeout)
        return self._porcupine_detect(timeout)

    @staticmethod
    def _unpack_frame(raw: bytes, frame_length: int) -> tuple:
        """
        Decode ``frame_length`` native-endian signed 16-bit samples.

        The repeat count must precede the format code ("512h"); the reverse
        order ("h512") is rejected by ``struct``.
        """
        return struct.unpack_from(f"{frame_length}h", raw)

    def _porcupine_detect(self, timeout: Optional[float] = None) -> Optional[str]:
        """
        Listen on the default input device until Porcupine reports a keyword.

        Args:
            timeout: Maximum time to listen in seconds (None or 0 = infinite).

        Returns:
            "hey osiris" on detection; None on timeout, ``stop()``,
            keyboard interrupt or any audio/engine error (errors are logged).
        """
        if not self.porcupine:
            return None

        import time
        import pyaudio

        frame_length = self.porcupine.frame_length
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout if timeout else None

        pa = pyaudio.PyAudio()
        # Pre-bound so the cleanup below is safe even if opening the stream fails.
        stream = None
        try:
            # Open audio stream
            stream = pa.open(
                rate=self.porcupine.sample_rate,
                channels=1,
                format=pyaudio.paInt16,
                input=True,
                frames_per_buffer=frame_length
            )
            logger.info("Listening for 'Hey Osiris'...")
            self.is_running = True

            while self.is_running:
                # Check timeout
                if deadline is not None and time.monotonic() > deadline:
                    logger.info("Hotword detection timeout")
                    break

                # Read and decode one audio frame
                raw = stream.read(frame_length, exception_on_overflow=False)
                pcm = self._unpack_frame(raw, frame_length)

                # Process frame; a non-negative index means a keyword matched
                if self.porcupine.process(pcm) >= 0:
                    logger.info("Hotword detected!")
                    if self.callback:
                        self.callback()
                    return "hey osiris"
        except KeyboardInterrupt:
            logger.info("Detection interrupted")
        except Exception as e:
            logger.error(f"Detection error: {e}")
        finally:
            if stream is not None:
                stream.close()
            pa.terminate()
            self.is_running = False
        return None

    def _simple_detect(self, timeout: Optional[float] = None) -> Optional[str]:
        """
        Fallback used when Porcupine is unavailable.

        This is a placeholder: it logs a warning and always returns None
        without listening to any audio. ``timeout`` is accepted for signature
        parity with ``_porcupine_detect`` and is ignored.
        """
        logger.warning("Using simple voice detection (not recommended)")
        # This is a placeholder - in production you'd use:
        # - Snowboy
        # - Custom trained model
        # - Or just use Porcupine
        return None

    def stop(self):
        """Stop detection."""
        self.is_running = False
        logger.info("Hotword detection stopped")

    def create_custom_hotword(self, keyword: str, output_path: str):
        """
        Create custom hotword model (requires Porcupine training).

        This is a placeholder that only logs; ``output_path`` is unused.
        An actual implementation requires:
        1. Recording multiple samples of the keyword
        2. Training with Porcupine Console
        3. Exporting the model
        """
        logger.info(f"Custom hotword creation not implemented: {keyword}")
        logger.info("Use Porcupine Console to train custom keywords")
class SimpleHotwordDetector:
    """
    Stand-in detector for installations without Porcupine.

    It stores a keyword and a threshold value but performs no audio
    processing: ``detect`` only logs a warning and reports no hotword.
    """

    def __init__(self, keyword: str = "hey osiris"):
        """Remember ``keyword`` and initialize the idle state."""
        self.keyword, self.threshold, self.is_running = keyword, 0.5, False

    def detect(self, timeout: int = None) -> Optional[str]:
        """Log that simple detection is unreliable and return None."""
        outcome: Optional[str] = None
        logger.warning("Simple detection is not reliable. Install Porcupine for best results.")
        return outcome
def main():
    """
    Interactive smoke test for hotword detection.

    Prints a banner, listens for the hotword for up to 30 seconds and reports
    the outcome. The detector is always stopped on exit, whether detection
    finished, was interrupted with Ctrl+C, or raised an unexpected error.
    """
    print("\n" + "="*60)
    print(" 🔍 Hotword Detector Test")
    print(" Say 'Hey Osiris' or '你好 Osiris'")
    print("="*60)

    detector = HotwordDetector()

    def on_hotword():
        print("\n🎉 HOTWORD DETECTED!")

    detector.set_callback(on_hotword)
    try:
        result = detector.detect(timeout=30)
        if result:
            print(f"Detected: {result}")
        else:
            print("No hotword detected")
    except KeyboardInterrupt:
        print("\nTest stopped")
    finally:
        # In ``finally`` so the detector is reset on every exit path.
        detector.stop()
# Script entry point: configure root logging at INFO level, then run the
# interactive hotword test defined in main().
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()