Features:
- Bilingual support (English/Mandarin Chinese)
- Hotword detection: 'Hey Osiris' / '你好 Osiris'
- Music playback control (MP3, WAV, OGG, FLAC)
- OpenClaw integration for AI responses
- Google AIY Voice Kit V1 compatible
- Text-to-speech in both languages
- Voice command recognition
- Raspberry Pi ready with installation script
AI Now Inc - Del Mar Demo Unit 🏭
288 lines · 8.2 KiB · Python · Executable File
#!/usr/bin/env python3
"""
Bilingual Voice Assistant - Main Entry Point
Google AIY Voice Kit V1 - English/Mandarin Support

AI Now Inc - Del Mar Demo Unit
Laboratory Assistant: Claw 🏭
"""
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import logging
|
||
import signal
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
# Import components
|
||
from assistant import VoiceAssistant
|
||
from tts_engine import TTSEngine
|
||
from speech_recognizer import BilingualSpeechRecognizer
|
||
from music_player import MusicPlayer
|
||
from hotword_detector import HotwordDetector
|
||
|
||
# Configure logging: INFO level with timestamp, logger name and level in
# every record. This runs at import time and configures the root logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class VoiceAssistantApp:
    """Main application class for the bilingual voice assistant.

    Wires together the command processor (``VoiceAssistant``), the
    text-to-speech engine (``TTSEngine``) and the hotword detector
    (``HotwordDetector``), then loops on hotword detection until
    ``is_running`` is cleared by a shutdown signal.
    """

    # File name of the hotword configuration; it is looked up in the same
    # directory as the main configuration file.
    HOTWORD_CONFIG_NAME = "hotword_config.json"

    # Pause (seconds) between the English and the Chinese utterance.
    _LANGUAGE_GAP_SECONDS = 0.5

    def __init__(self, config_path: str = "config.json"):
        """Load the configuration and construct all components.

        Args:
            config_path: Path to the main JSON configuration file. The
                hotword configuration is expected next to it under the
                name ``hotword_config.json``.
        """
        self.config_path = Path(config_path)
        self.config = self._load_config()

        # Initialize components
        logger.info("Initializing voice assistant...")
        self.assistant = VoiceAssistant(str(self.config_path))
        self.tts = TTSEngine(str(self.config_path))
        self.hotword_detector = HotwordDetector(str(self._hotword_config_path()))

        # State
        self.is_running = False
        self.current_language = "en"
        self.is_awake = False  # Hotword activated state

        # Setup signal handlers
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

        logger.info("Voice assistant initialized with hotword detection")

    def _hotword_config_path(self) -> Path:
        """Return the hotword config path: a sibling of the main config.

        Uses ``Path.with_name`` instead of a string replace so that the
        result is correct even when the main config file is not literally
        called ``config.json``.
        """
        return self.config_path.with_name(self.HOTWORD_CONFIG_NAME)

    def _load_config(self) -> dict:
        """Load configuration.

        Returns:
            The parsed JSON content of ``self.config_path``, or an empty
            dict when the file does not exist.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        try:
            # Explicit UTF-8: the platform default encoding is not
            # guaranteed to handle non-ASCII (e.g. Chinese) config values.
            with open(self.config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            logger.warning("Config not found, using defaults")
            return {}

    @staticmethod
    def _detect_language(text: str) -> str:
        """Return ``"zh"`` if *text* contains a CJK ideograph, else ``"en"``.

        Only the U+4E00..U+9FFF range is checked.
        """
        has_cjk = any('\u4e00' <= c <= '\u9fff' for c in text)
        return "zh" if has_cjk else "en"

    def _speak_bilingual(self, text_en: str, text_zh: str) -> None:
        """Speak *text_en* in English, pause briefly, then *text_zh* in Chinese."""
        self.tts.speak(text_en, "en")
        time.sleep(self._LANGUAGE_GAP_SECONDS)
        self.tts.speak(text_zh, "zh")

    def _signal_handler(self, sig, frame):
        """Handle shutdown signals by asking the main loop to stop."""
        logger.info("Shutdown signal received")
        self.is_running = False

    def run(self):
        """Run the voice assistant with hotword detection.

        Prints and speaks a welcome message, registers the hotword callback
        and then repeatedly calls the detector until ``is_running`` becomes
        false. ``shutdown()`` is always called on the way out.
        """
        logger.info("Starting voice assistant with hotword detection...")
        self.is_running = True

        # Welcome message
        welcome_text = "Voice assistant started. Say 'Hey Osiris' to activate."
        welcome_text_zh = "语音助手已启动。说 '你好 Osiris' 来激活。"

        print("\n" + "="*60)
        print(" 🎤 Bilingual Voice Assistant - AI Now Inc")
        print(" Laboratory Assistant: Claw 🏭")
        print("="*60)
        print(f"\n English: {welcome_text}")
        print(f" 中文:{welcome_text_zh}")
        print("\n Hotword: 'Hey Osiris' / '你好 Osiris'")
        print(" Listening for hotword... (Press Ctrl+C to stop)\n")

        # Speak welcome message
        self._speak_bilingual(welcome_text, welcome_text_zh)

        # Set hotword callback
        self.hotword_detector.set_callback(self._on_hotword_detected)

        # Main loop - listen for hotword
        try:
            while self.is_running:
                # Wait for hotword
                print("⏳ Waiting for 'Hey Osiris'...")
                # NOTE(review): with timeout=None this call presumably blocks
                # until the hotword fires; since the signal handler only
                # clears is_running, a shutdown request may not take effect
                # until detect() returns - confirm against HotwordDetector.
                self.hotword_detector.detect(timeout=None)

                # If we get here, hotword was detected (or timeout)
                if not self.is_running:
                    break

                time.sleep(0.5)

        except KeyboardInterrupt:
            logger.info("Interrupted by user")

        finally:
            self.shutdown()

    def _on_hotword_detected(self):
        """Callback when hotword is detected.

        Speaks a bilingual prompt, reads one command from stdin, forwards
        it to the assistant and prints/speaks the response. Any exception
        raised while reading or processing is logged and swallowed.
        """
        print("\n🎉 Hotword detected! Listening for command...")

        # Awakening message
        awake_text = "Yes? How can I help?"
        awake_text_zh = "在的,有什么可以帮你?"
        self._speak_bilingual(awake_text, awake_text_zh)

        # Now listen for command (simplified - would use speech recognition)
        try:
            user_input = input("Command: ").strip()
            if not user_input:
                return

            # Detect language
            lang = self._detect_language(user_input)

            # Process command
            response, resp_lang = self.assistant.process_command(user_input, lang)

            # Output response
            print(f"Assistant: {response}")

            # Speak response
            self.tts.speak(response, resp_lang)
        except Exception as e:
            logger.error("Command processing error: %s", e)

    def shutdown(self):
        """Clean shutdown: stop music, say goodbye in both languages."""
        logger.info("Shutting down...")

        # Stop music if playing
        self.assistant.music_player.stop()

        # Goodbye message
        goodbye_text = "Goodbye!"
        goodbye_text_zh = "再见!"
        self._speak_bilingual(goodbye_text, goodbye_text_zh)

        logger.info("Voice assistant stopped")
|
||
|
||
|
||
def test_mode():
    """Run in test mode with sample commands.

    Builds a ``VoiceAssistant`` and a ``TTSEngine`` with their default
    arguments, feeds a fixed list of English and Chinese commands through
    ``process_command``, and prints and speaks each response with a
    one-second pause after every command.
    """
    print("\n" + "="*60)
    print(" 🧪 Test Mode - Sample Commands")
    print("="*60)

    assistant = VoiceAssistant()
    tts = TTSEngine()

    # (command text, language code) pairs exercised by the smoke test.
    test_commands = [
        ("hello", "en"),
        ("what time is it", "en"),
        ("play music", "en"),
        ("你好", "zh"),
        ("现在几点", "zh"),
        ("播放音乐", "zh"),
    ]

    for text, lang in test_commands:
        print(f"\nInput: {text} ({lang})")
        response, resp_lang = assistant.process_command(text, lang)
        print(f"Output: {response} ({resp_lang})")
        tts.speak(response, resp_lang)
        time.sleep(1)
|
||
|
||
|
||
def demo_mode():
    """Interactive demo mode.

    Prints a list of example commands, then reads commands from stdin in a
    loop. Each command is routed to ``VoiceAssistant.process_command`` with
    a language guessed from its characters; the response is printed and,
    if the user answers ``y``, spoken through ``TTSEngine``.

    The loop ends when the user types ``quit``, ``exit`` or ``退出``,
    presses Ctrl+C, or stdin reaches end-of-file.
    """
    print("\n" + "="*60)
    print(" 🎭 Demo Mode - Try These Commands!")
    print("="*60)
    print("""
English Commands:
- "hello"
- "what time is it"
- "play music"
- "pause"
- "stop"
- "volume up"
- "ask Claw: what is industrial control?"

中文命令:
- "你好"
- "现在几点"
- "播放音乐"
- "暂停"
- "停止"
- "音量大"
- "问 Claw:什么是工业控制?"

Type 'quit' to exit
""")

    assistant = VoiceAssistant()
    tts = TTSEngine()

    # Inputs (compared lower-cased) that end the demo.
    quit_words = {'quit', 'exit', '退出'}

    while True:
        try:
            user_input = input("\nYou: ").strip()

            if user_input.lower() in quit_words:
                break

            if not user_input:
                continue

            # Detect language: any character in U+4E00..U+9FFF means Chinese.
            lang = "zh" if any('\u4e00' <= c <= '\u9fff' for c in user_input) else "en"

            # Process command
            response, resp_lang = assistant.process_command(user_input, lang)

            # Output
            print(f"Assistant: {response}")

            # Speak (optional in demo)
            speak_response = input("Speak? (y/n): ").strip().lower()
            if speak_response == 'y':
                tts.speak(response, resp_lang)

        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed or exhausted. Without this, the
            # generic handler below would log it and the loop would spin
            # forever, because every further input() call fails the same way.
            break
        except Exception as e:
            logger.error("Error: %s", e)

    print("\nDemo ended.")
|
||
|
||
|
||
def main(argv=None):
    """Main entry point.

    Parses the command line and dispatches to ``test_mode``, ``demo_mode``
    or a ``VoiceAssistantApp`` run. Note that ``--config`` is only passed
    on in ``run`` mode; test and demo mode are called without arguments.

    Args:
        argv: Optional list of command-line arguments (without the program
            name). When ``None`` (the default), argparse reads
            ``sys.argv[1:]``, which is the original behaviour.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Bilingual Voice Assistant for Google AIY Voice Kit V1"
    )
    parser.add_argument(
        "--mode",
        choices=["run", "test", "demo"],
        default="demo",
        help="Operation mode: run, test, or demo"
    )
    parser.add_argument(
        "--config",
        default="config.json",
        help="Path to configuration file"
    )

    args = parser.parse_args(argv)

    if args.mode == "test":
        test_mode()
    elif args.mode == "demo":
        demo_mode()
    else:
        app = VoiceAssistantApp(args.config)
        app.run()
|
||
|
||
|
||
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()