Initial commit: Bilingual Voice Assistant for Google AIY Voice Kit V1

Features:
- Bilingual support (English/Mandarin Chinese)
- Hotword detection: 'Hey Osiris' / '你好 Osiris'
- Music playback control (MP3, WAV, OGG, FLAC)
- OpenClaw integration for AI responses
- Google AIY Voice Kit V1 compatible
- Text-to-speech in both languages
- Voice command recognition
- Raspberry Pi ready with installation script

AI Now Inc - Del Mar Demo Unit 🏭
Author: Claw - AI Now Inc
Date: 2026-03-01 00:02:49 -08:00
Commit: 1662bc141a
16 changed files with 3128 additions and 0 deletions

.gitignore (vendored, new file, 55 lines)

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
ENV/
env/
.venv
# Credentials
*.json
!config.json
!hotword_config.json
credentials.json
.env
.secrets
# Logs
*.log
logs/
# Audio files
*.wav
*.mp3
*.ogg
*.flac
# Temporary files
tmp/
temp/
*.tmp
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
# Database
*.db
!sensor_data.db
# Exports
exports/
# Test files
test_*.wav
test_output.*

QUICKSTART.md (new file, 258 lines)

# Quick Start Guide - Bilingual Voice Assistant
**AI Now Inc - Del Mar Demo Unit**
**Laboratory Assistant:** Claw 🏭
## 🚀 Installation (5 minutes)
### Step 1: Clone and Install
```bash
# Navigate to workspace
cd /home/pi
# Clone or copy the voice-assistant folder
# (If copying from another machine, use scp or git)
# Make install script executable
cd voice-assistant
chmod +x install.sh
# Run installation
sudo ./install.sh
```
### Step 2: Configure
Edit the configuration file:
```bash
nano config.local.json
```
Update these settings:
- `openclaw.ws_url`: Your OpenClaw server address
- `openclaw.api_key`: Your API key (if required)
- `music.library_path`: Path to your music files
### Step 3: Add Music (Optional)
```bash
# Copy MP3 files to music directory
cp /path/to/your/music/*.mp3 /home/pi/Music/
# Or download sample music
# (Ensure you have rights to the music)
```
### Step 4: Test
```bash
# Run in demo mode first
./venv/bin/python3 main.py --mode demo
# Or run in test mode
./venv/bin/python3 main.py --mode test
```
### Step 5: Run
```bash
# Start the service
sudo systemctl start voice-assistant
# Or run manually
./start.sh
```
## 🎤 Voice Commands
### Hotword
First, say the hotword to activate:
- **English:** "Hey Osiris"
- **Mandarin:** "你好 Osiris"
### General Commands
| English | Mandarin | Description |
|---------|----------|-------------|
| "Hello" | "你好" | Greeting |
| "What time is it?" | "现在几点?" | Get current time |
| "How are you?" | "你好吗?" | Greeting response |
| "Ask Claw: [question]" | "问 Claw[问题]" | Ask OpenClaw |
### Music Commands
| English | Mandarin | Description |
|---------|----------|-------------|
| "Play [song name]" | "播放 [歌曲名]" | Play music |
| "Play music" | "播放音乐" | Play any music |
| "Pause" | "暂停" | Pause playback |
| "Resume" | "继续" | Resume playback |
| "Stop" | "停止" | Stop playback |
| "Next" | "下一首" | Next track |
| "Previous" | "上一首" | Previous track |
| "Volume up" | "音量大" | Increase volume |
| "Volume down" | "音量小" | Decrease volume |
## 🔧 Troubleshooting
### Microphone Not Working
```bash
# Check if microphone is detected
arecord -l
# Test recording
arecord -d 3 test.wav
aplay test.wav
# Check volume levels
alsamixer
# Press F4 to see capture levels
# Use arrow keys to adjust
```
### No Sound Output
```bash
# Check audio output
speaker-test -t wav
# Set default output
alsamixer
# Press F6 to select output device
```
### Hotword Not Detecting
1. **Check microphone sensitivity:**
```bash
alsamixer
# Adjust capture levels
```
2. **Reduce background noise**
3. **Speak clearly and closer to microphone**
4. **Adjust sensitivity in config:**
```json
{
  "speech": {
    "hotword_sensitivity": 0.6
  }
}
```
A higher value makes detection more sensitive. (JSON does not allow `//` comments, so keep notes like this outside the file.)
### Music Not Playing
```bash
# Check if files are in correct location
ls -la /home/pi/Music/
# Verify file format (MP3, WAV, OGG, FLAC)
file /home/pi/Music/song.mp3
# Test playback manually
./venv/bin/python3 -c "from music_player import MusicPlayer; p = MusicPlayer(); p.play(list(p.music_library.values())[0])"
```
### OpenClaw Not Connecting
1. **Check network connection:**
```bash
ping 192.168.1.100 # Replace with your server IP
```
2. **Verify OpenClaw is running:**
```bash
# On server
openclaw status
```
3. **Check firewall:**
```bash
sudo ufw status
```
## 📊 Logs
### View Live Logs
```bash
# Service logs
sudo journalctl -u voice-assistant -f
# Installation logs
cat /var/log/voice-assistant-install.log
# Application logs (if configured)
tail -f /var/log/voice-assistant.log
```
### Debug Mode
```bash
# Run with debug logging
./venv/bin/python3 main.py --mode run --log-level DEBUG
```
## 🔄 Updates
### Update Installation
```bash
cd /home/pi/voice-assistant
# Pull latest changes (if using git)
git pull
# Reinstall dependencies
source venv/bin/activate
pip install -r requirements.txt --upgrade
```
### Update Configuration
```bash
# Edit local config
nano config.local.json
# Restart service
sudo systemctl restart voice-assistant
```
## 🛑 Uninstall
```bash
# Run uninstaller
sudo ./uninstall.sh
# Or manually:
sudo systemctl stop voice-assistant
sudo systemctl disable voice-assistant
sudo rm -rf /home/pi/voice-assistant
sudo rm /etc/systemd/system/voice-assistant.service
```
## 📚 Additional Resources
- [Full Documentation](README.md)
- [Google AIY Voice Kit Docs](https://github.com/google/aiyprojects-raspbian)
- [Porcupine Hotword Detection](https://github.com/Picovoice/porcupine)
- [OpenClaw Documentation](https://docs.openclaw.ai)
## 🆘 Support
For issues or questions:
1. Check the [README.md](README.md)
2. Review logs: `sudo journalctl -u voice-assistant`
3. Test in demo mode first
4. Ensure all dependencies are installed
---
**AI Now Inc** - Del Mar Show Demo Unit
**Version:** 1.0.0
**Last Updated:** 2026-02-28

README.md (new file, 227 lines)

# 🎤 Bilingual Voice Assistant - Google AIY Voice Kit V1
**AI Now Inc - Del Mar Demo Unit**
**Laboratory Assistant:** Claw 🏭
A bilingual (English/Mandarin) voice-activated assistant for Google AIY Voice Kit V1 with music playback capability.
## Features
- ✅ **Bilingual Support** - English and Mandarin Chinese speech recognition
- ✅ **Text-to-Speech** - Respond in the detected language
- ✅ **Music Playback** - Play MP3 files by voice command
- ✅ **Remote Communication** - Connect to OpenClaw assistant via API
- ✅ **Offline Capability** - Basic commands work without internet
- ✅ **Hotword Detection** - "Hey Assistant" / "你好助手" wake word
## Hardware Requirements
- **Google AIY Voice Kit V1** (with Voice HAT)
- **Raspberry Pi** (3B/3B+/4B recommended)
- **MicroSD Card** (8GB+)
- **Speaker** (3.5mm or HDMI audio)
- **Microphone** (included with AIY Kit)
- **Internet Connection** (WiFi/Ethernet)
## Software Architecture
```
┌─────────────────────────────────────────────────────────┐
│ Google AIY Voice Kit V1 │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Hotword │ │ Speech │ │ Command │ │
│ │ Detection │→ │ Recognition │→ │ Processing │ │
│ └─────────────┘ └──────────────┘ └──────────────┘ │
│ ↓ ↓ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Language Detection (en/zh) │ │
│ └──────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ OpenClaw API Communication │ │
│ └──────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ TTS │ │ Music Player │ │ Response │ │
│ │ (en/zh) │ │ (MP3) │ │ Handler │ │
│ └─────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
```
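The flow in the diagram can be sketched as one pass of an event loop, with every stage stubbed out (all four callables here are hypothetical stand-ins for the real components):

```python
def run_pipeline(detect_hotword, recognize, process, speak):
    """One iteration: wake word -> speech-to-text -> command -> text-to-speech."""
    if not detect_hotword():          # block until the hotword (or time out)
        return None
    text, lang = recognize()          # e.g. ("what time is it", "en")
    response, resp_lang = process(text, lang)
    speak(response, resp_lang)        # TTS in the detected language
    return response

# Single stubbed pass:
result = run_pipeline(
    detect_hotword=lambda: True,
    recognize=lambda: ("what time is it", "en"),
    process=lambda text, lang: (f"You said: {text}", lang),
    speak=lambda text, lang: None,
)
print(result)  # prints: You said: what time is it
```

In the real assistant these roles are filled by `HotwordDetector`, `BilingualSpeechRecognizer`, `VoiceAssistant.process_command`, and the TTS engine.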
## Installation
### 1. Setup Google AIY Voice Kit
```bash
# Update system
sudo apt-get update
sudo apt-get upgrade
# Install AIY Voice Kit software
cd ~
git clone https://github.com/google/aiyprojects-raspbian.git
cd aiyprojects-raspbian
bash install.sh
sudo reboot
```
### 2. Install Dependencies
```bash
# Python dependencies
pip3 install google-cloud-speech google-cloud-texttospeech
pip3 install pygame mutagen
pip3 install requests websocket-client
pip3 install langdetect
```
### 3. Configure Google Cloud (Optional - for cloud services)
```bash
# Set up Google Cloud credentials
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/credentials.json"
```
## Configuration
Edit `config.json`:
```json
{
  "openclaw": {
    "enabled": true,
    "ws_url": "ws://192.168.1.100:18790",
    "api_key": "your_api_key"
  },
  "speech": {
    "language": "auto",
    "hotword": "hey assistant|你好助手"
  },
  "music": {
    "library_path": "/home/pi/Music",
    "default_volume": 0.7
  },
  "tts": {
    "english_voice": "en-US-Standard-A",
    "chinese_voice": "zh-CN-Standard-A"
  }
}
```
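The installer also copies `config.json` to `config.local.json` for machine-specific settings. One plausible way to honor that split is a section-wise overlay (a sketch with assumed file names; the shipped loader currently reads a single file):

```python
import json
from pathlib import Path

def load_config(base="config.json", local="config.local.json"):
    """Read the base config, then overlay top-level sections from the
    local file so machine-specific edits survive upgrades. Sketch only."""
    config = json.loads(Path(base).read_text()) if Path(base).exists() else {}
    if Path(local).exists():
        for section, values in json.loads(Path(local).read_text()).items():
            if isinstance(values, dict):
                # Merge within a section so unset keys keep base defaults
                config.setdefault(section, {}).update(values)
            else:
                config[section] = values
    return config
```

With this scheme, a `config.local.json` containing only `{"music": {"library_path": "/mnt/usb/Music"}}` overrides the library path while leaving every other setting at its default.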
## Usage
### Start the Assistant
```bash
cd /home/pi/voice-assistant
python3 main.py
```
### Voice Commands
#### General Commands
- "Hey Assistant, what time is it?" / "你好助手,现在几点?"
- "Hey Assistant, how are you?" / "你好助手,你好吗?"
- "Hey Assistant, tell me a joke" / "你好助手,讲个笑话"
#### Music Commands
- "Hey Assistant, play [song name]" / "你好助手,播放 [歌曲名]"
- "Hey Assistant, pause" / "你好助手,暂停"
- "Hey Assistant, resume" / "你好助手,继续"
- "Hey Assistant, stop" / "你好助手,停止"
- "Hey Assistant, next track" / "你好助手,下一首"
- "Hey Assistant, volume up" / "你好助手,音量加大"
#### OpenClaw Commands
- "Hey Assistant, ask Claw: [your question]"
- "你好助手,问 Claw[你的问题]"
## Project Structure
```
voice-assistant/
├── main.py # Main entry point
├── config.json # Configuration file
├── assistant.py # Core assistant logic
├── speech_recognizer.py # Speech recognition (en/zh)
├── tts_engine.py # Text-to-speech engine
├── music_player.py # MP3 playback control
├── openclaw_client.py # OpenClaw API client
├── hotword_detector.py # Wake word detection
├── requirements.txt # Python dependencies
└── samples/ # Sample audio files
```
## Language Detection
The system automatically detects the spoken language:
- **English keywords** → English response
- **Chinese keywords** → Mandarin response
- **Mixed input** → Respond in dominant language
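A lightweight way to implement this is to count CJK Unified Ideographs (the same U+4E00 to U+9FFF range check `assistant.py` applies to OpenClaw replies) against Latin letters; a sketch:

```python
def detect_language(text: str) -> str:
    """Return 'zh' if CJK characters dominate, else 'en'.
    Mirrors the range check in assistant.py; mixed input goes to
    whichever script has more characters."""
    cjk = sum(1 for ch in text if '\u4e00' <= ch <= '\u9fff')
    latin = sum(1 for ch in text if ch.isascii() and ch.isalpha())
    return "zh" if cjk > latin else "en"

print(detect_language("play music"))  # prints: en
print(detect_language("播放音乐"))      # prints: zh
```

Anything beyond this heuristic can lean on `langdetect`, which is already in the dependency list.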
## Music Library
Organize your MP3 files:
```
/home/pi/Music/
├── artist1/
│ ├── song1.mp3
│ └── song2.mp3
├── artist2/
│ └── song3.mp3
└── playlist/
└── favorites.mp3
```
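Any of these layouts works because the player only needs a flat index from track name to file path. A sketch of such a scan (hypothetical function name; `MusicPlayer` exposes its index as `music_library`):

```python
from pathlib import Path

SUPPORTED = {".mp3", ".wav", ".ogg", ".flac"}

def scan_library(root="/home/pi/Music"):
    """Recursively index audio files by lowercase filename stem,
    regardless of how deeply they are nested."""
    return {
        path.stem.lower(): path
        for path in sorted(Path(root).rglob("*"))
        if path.suffix.lower() in SUPPORTED
    }
```

Duplicate stems collapse to the last file found; a fuller index might key on `artist/title` instead.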
## Advanced Features
### Custom Hotword
Train your own hotword using Porcupine or Snowboy.
### Offline Speech Recognition
Use Vosk or PocketSphinx for offline recognition.
### Multi-room Audio
Stream audio to multiple devices via Snapcast.
### Voice Profiles
Recognize different users and personalize responses.
## Troubleshooting
### Microphone not detected
```bash
arecord -l # List audio devices
alsamixer # Check levels
```
### Poor speech recognition
- Speak clearly and closer to the microphone
- Reduce background noise
- Check internet connection for cloud recognition
### Music playback issues
```bash
# Test audio output
speaker-test -t wav
# Check volume
alsamixer
```
## Next Steps
- [ ] Add voice profile recognition
- [ ] Implement offline speech recognition
- [ ] Add Spotify/Apple Music integration
- [ ] Create web UI for music library management
- [ ] Add multi-language support (Spanish, French, etc.)
- [ ] Implement voice commands for industrial control
---
**AI Now Inc** - Del Mar Show Demo Unit
**Contact:** Laboratory Assistant Claw 🏭
**Version:** 1.0.0

assistant.py (new executable file, 253 lines)

#!/usr/bin/env python3
"""
Bilingual Voice Assistant Core
Main logic for processing voice commands and generating responses.
"""
import os
import json
import logging
import random
from typing import Optional, Dict, List, Tuple
from pathlib import Path
from datetime import datetime
from speech_recognizer import BilingualSpeechRecognizer
from music_player import MusicPlayer
from openclaw_client import OpenClawClient
logger = logging.getLogger(__name__)
class VoiceAssistant:
"""
Main assistant class coordinating speech recognition,
command processing, and responses.
"""
def __init__(self, config_path: str = "config.json"):
self.config_path = config_path
self.config = self._load_config(config_path)
# Initialize components
self.speech_recognizer = BilingualSpeechRecognizer(config_path)
self.music_player = MusicPlayer(config_path)
self.openclaw_client = OpenClawClient(config_path)
# Command patterns
self.music_commands = [
"play", "pause", "resume", "stop", "next", "previous",
"volume", "shuffle", "repeat"
]
self.chinese_music_commands = [
"播放", "暂停", "继续", "停止", "下一首", "上一首",
"音量", "随机", "重复"
]
logger.info("VoiceAssistant initialized")
def _load_config(self, config_path: str) -> dict:
"""Load configuration."""
try:
with open(config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
return {}
def process_command(self, text: str, language: str = "en") -> Tuple[str, str]:
"""
Process a voice command and return response.
Args:
text: Recognized text
language: Detected language ('en' or 'zh')
Returns:
Tuple of (response_text, response_language)
"""
text_lower = text.lower()
# Music commands
if self._is_music_command(text_lower, language):
return self._handle_music_command(text_lower, language)
# Time query
if any(word in text_lower for word in ["what time", "time is it", "几点", "时间"]):
return self._get_time(language)
# Greeting
if any(word in text_lower for word in ["hello", "hi", "hey", "你好", "您好"]):
return self._get_greeting(language)
# OpenClaw query
if "ask claw" in text_lower or "问 claw" in text_lower:
# Extract the actual question
question = text_lower.replace("ask claw", "").replace("问 claw", "").strip()
return self._ask_openclaw(question, language)
# Default: ask OpenClaw
return self._ask_openclaw(text, language)
def _is_music_command(self, text: str, language: str) -> bool:
"""Check if text is a music command."""
if language == "en":
return any(cmd in text for cmd in self.music_commands)
else:
return any(cmd in text for cmd in self.chinese_music_commands)
def _handle_music_command(self, text: str, language: str) -> Tuple[str, str]:
"""Handle music playback commands."""
# Play command
if "play" in text or "播放" in text:
# Extract song name if specified
song_name = self._extract_song_name(text)
if song_name:
matches = self.music_player.search_tracks(song_name)
if matches:
self.music_player.play(matches[0])
return (f"Playing {matches[0].name}",
"en" if language == "en" else "zh")
else:
return ("Song not found",
"en" if language == "en" else "zh")
else:
# Play a random track
if self.music_player.music_library:
track = random.choice(list(self.music_player.music_library.values()))
self.music_player.play(track)
return ("Playing music",
"en" if language == "en" else "zh")
# Pause
elif "pause" in text or "暂停" in text:
self.music_player.pause()
return ("Paused", "en" if language == "en" else "zh")
# Resume
elif "resume" in text or "继续" in text:
self.music_player.resume()
return ("Resumed", "en" if language == "en" else "zh")
# Stop
elif "stop" in text or "停止" in text:
self.music_player.stop()
return ("Stopped", "en" if language == "en" else "zh")
# Next
elif "next" in text or "下一首" in text:
self.music_player.next()
return ("Next track", "en" if language == "en" else "zh")
# Volume
elif "volume" in text or "音量" in text:
if "up" in text or "大" in text:
self.music_player.set_volume(self.music_player.volume + 0.1)
elif "down" in text or "小" in text:
self.music_player.set_volume(self.music_player.volume - 0.1)
return ("Volume adjusted", "en" if language == "en" else "zh")
return ("Command not recognized", "en" if language == "en" else "zh")
def _extract_song_name(self, text: str) -> Optional[str]:
"""Extract song name from command."""
# Simple implementation - look for text after "play"
if "play" in text:
parts = text.split("play", 1)
if len(parts) > 1:
return parts[1].strip()
if "播放" in text:
parts = text.split("播放", 1)
if len(parts) > 1:
return parts[1].strip()
return None
def _get_time(self, language: str) -> Tuple[str, str]:
"""Get current time response."""
now = datetime.now()
if language == "zh":
return (f"现在时间是 {now.strftime('%H点%M分')}", "zh")
else:
return (f"The current time is {now.strftime('%I:%M %p')}", "en")
def _get_greeting(self, language: str) -> Tuple[str, str]:
"""Get greeting response."""
greetings_en = [
"Hello! How can I help you?",
"Hi there! What can I do for you?",
"Hey! Ready to assist you."
]
greetings_zh = [
"你好!有什么可以帮你的吗?",
"您好!需要什么帮助?",
"嗨!随时为您服务。"
]
if language == "zh":
return (random.choice(greetings_zh), "zh")
else:
return (random.choice(greetings_en), "en")
def _ask_openclaw(self, question: str, language: str) -> Tuple[str, str]:
"""Send question to OpenClaw and get response."""
if not self.openclaw_client.enabled:
if language == "zh":
return ("OpenClaw 未启用", "zh")
else:
return ("OpenClaw is not enabled", "en")
# Add context about language preference
context = {"preferred_language": language}
response = self.openclaw_client.send_request(question, context)
if "error" in response:
if language == "zh":
return ("抱歉,暂时无法回答", "zh")
else:
return ("Sorry, I can't answer that right now", "en")
# Extract response text
response_text = response.get("response", str(response))
# Detect response language
response_lang = language # Assume same language
if any('\u4e00' <= char <= '\u9fff' for char in response_text):
response_lang = "zh"
return (response_text, response_lang)
def get_status(self) -> Dict:
"""Get assistant status."""
return {
"speech_recognizer": "active",
"music_player": self.music_player.get_status(),
"openclaw": self.openclaw_client.get_status()
}
def main():
"""Test the assistant."""
assistant = VoiceAssistant()
# Test commands
test_commands = [
("hello", "en"),
("what time is it", "en"),
("play music", "en"),
("你好", "zh"),
("现在几点", "zh"),
("播放音乐", "zh")
]
for text, lang in test_commands:
response, resp_lang = assistant.process_command(text, lang)
print(f"Input: {text} ({lang})")
print(f"Output: {response} ({resp_lang})")
print("-" * 40)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()

config.json (new file, 37 lines)

{
  "openclaw": {
    "enabled": true,
    "ws_url": "ws://192.168.1.100:18790",
    "api_key": "your_api_key_here",
    "reconnect_interval": 5
  },
  "speech": {
    "language": "auto",
    "hotword": "hey assistant|你好助手",
    "hotword_sensitivity": 0.5,
    "recognition_timeout": 5,
    "offline_mode": false
  },
  "music": {
    "library_path": "/home/pi/Music",
    "default_volume": 0.7,
    "scan_interval": 300,
    "supported_formats": [".mp3", ".wav", ".ogg", ".flac"]
  },
  "tts": {
    "english_voice": "en-US-Standard-A",
    "chinese_voice": "zh-CN-Standard-A",
    "speed": 1.0,
    "pitch": 0
  },
  "audio": {
    "input_device": "default",
    "output_device": "default",
    "sample_rate": 16000,
    "channels": 1
  },
  "logging": {
    "level": "INFO",
    "file": "/var/log/voice-assistant.log"
  }
}

hotword_config.json (new file, 19 lines)

{
  "hotwords": [
    {
      "keyword": "hey osiris",
      "keyword_zh": "你好 osiris",
      "sensitivity": 0.5,
      "library_path": "resources/porcupine"
    }
  ],
  "audio": {
    "sample_rate": 16000,
    "frame_length": 512
  },
  "behavior": {
    "timeout": 30,
    "cooldown": 5,
    "continuous_listen": false
  }
}

hotword_detector.py (new executable file, 265 lines)

#!/usr/bin/env python3
"""
Hotword Detector
Detects wake words: "Hey Osiris" / "你好 Osiris"
Supports:
- Porcupine (PicoVoice) for accurate hotword detection
- Custom keyword spotting
- Bilingual support (English/Mandarin)
"""
import os
import json
import logging
import struct
import wave
from typing import Optional, Callable, List
from pathlib import Path
try:
import pvporcupine
import pyaudio
HAS_PORCUPINE = True
except ImportError:
HAS_PORCUPINE = False
logging.warning("Porcupine not installed. Install with: pip install pvporcupine")
try:
import webrtcvad
HAS_VAD = True
except ImportError:
HAS_VAD = False
logging.warning("WebRTC VAD not installed")
logger = logging.getLogger(__name__)
class HotwordDetector:
"""
Hotword detection with support for "Hey Osiris" in English and Mandarin.
"""
def __init__(self, config_path: str = "hotword_config.json"):
self.config = self._load_config(config_path)
self.audio_config = self.config.get("audio", {
"sample_rate": 16000,
"frame_length": 512
})
self.hotwords = self.config.get("hotwords", [])
self.is_running = False
self.callback = None
# Porcupine setup
self.porcupine = None
self.keyword_index = -1
if HAS_PORCUPINE:
self._init_porcupine()
# VAD setup
self.vad = None
if HAS_VAD:
self.vad = webrtcvad.Vad(2) # Aggressiveness level 2
logger.info(f"HotwordDetector initialized (Porcupine: {HAS_PORCUPINE})")
def _load_config(self, config_path: str) -> dict:
"""Load configuration."""
try:
with open(config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
return {
"hotwords": [
{
"keyword": "hey osiris",
"keyword_zh": "你好 osiris",
"sensitivity": 0.5
}
],
"audio": {
"sample_rate": 16000,
"frame_length": 512
}
}
def _init_porcupine(self):
"""Initialize Porcupine hotword detection."""
if not HAS_PORCUPINE:
return
try:
# Create Porcupine instance. NOTE: "hey osiris" is not one of
# Porcupine's built-in keywords; a custom keyword needs a trained
# .ppn model passed via keyword_paths=[...], so this call will
# typically raise and be caught by the except branch below.
self.porcupine = pvporcupine.create(
keywords=["hey osiris"],
sensitivities=[0.5]
)
self.keyword_index = 0
logger.info("Porcupine initialized with 'Hey Osiris'")
except Exception as e:
logger.warning(f"Porcupine initialization failed: {e}")
self.porcupine = None
def set_callback(self, callback: Callable[[], None]):
"""Set callback function for when hotword is detected."""
self.callback = callback
def detect(self, timeout: int = None) -> Optional[str]:
"""
Start detection and wait for hotword.
Args:
timeout: Maximum time to wait in seconds (None = infinite)
Returns:
Detected hotword or None
"""
if not self.porcupine:
logger.warning("Porcupine not available, using simple detection")
return self._simple_detect(timeout)
return self._porcupine_detect(timeout)
def _porcupine_detect(self, timeout: int = None) -> Optional[str]:
"""Detect using Porcupine."""
if not self.porcupine:
return None
import pyaudio
pa = pyaudio.PyAudio()
stream = None  # so the finally block is safe if open() fails
try:
# Open audio stream
stream = pa.open(
rate=self.porcupine.sample_rate,
channels=1,
format=pyaudio.paInt16,
input=True,
frames_per_buffer=self.porcupine.frame_length
)
logger.info("Listening for 'Hey Osiris'...")
self.is_running = True
start_time = None
if timeout:
import time
start_time = time.time()
while self.is_running:
# Check timeout
if timeout and start_time:
if time.time() - start_time > timeout:
logger.info("Hotword detection timeout")
break
# Read audio frame
pcm = stream.read(self.porcupine.frame_length, exception_on_overflow=False)
pcm = struct.unpack_from(
f"{self.porcupine.frame_length}h",  # struct format: count precedes 'h'
pcm
)
# Process frame
keyword_index = self.porcupine.process(pcm)
if keyword_index >= 0:
logger.info("Hotword detected!")
if self.callback:
self.callback()
return "hey osiris"
except KeyboardInterrupt:
logger.info("Detection interrupted")
except Exception as e:
logger.error(f"Detection error: {e}")
finally:
if stream is not None:
stream.close()
pa.terminate()
self.is_running = False
return None
def _simple_detect(self, timeout: int = None) -> Optional[str]:
"""
Simple voice activity detection (fallback).
Detects any speech as hotword.
"""
logger.warning("Using simple voice detection (not recommended)")
# This is a placeholder - in production you'd use:
# - Snowboy
# - Custom trained model
# - Or just use Porcupine
return None
def stop(self):
"""Stop detection."""
self.is_running = False
logger.info("Hotword detection stopped")
def create_custom_hotword(self, keyword: str, output_path: str):
"""
Create custom hotword model (requires Porcupine training).
This is a placeholder - actual implementation requires:
1. Recording multiple samples of the keyword
2. Training with Porcupine Console
3. Exporting the model
"""
logger.info(f"Custom hotword creation not implemented: {keyword}")
logger.info("Use Porcupine Console to train custom keywords")
class SimpleHotwordDetector:
"""
Simple hotword detection using audio level threshold.
Fallback when Porcupine is not available.
"""
def __init__(self, keyword: str = "hey osiris"):
self.keyword = keyword
self.threshold = 0.5
self.is_running = False
def detect(self, timeout: int = None) -> Optional[str]:
"""Simple energy-based detection."""
logger.warning("Simple detection is not reliable. Install Porcupine for best results.")
return None
def main():
"""Test hotword detection."""
print("\n" + "="*60)
print(" 🔍 Hotword Detector Test")
print(" Say 'Hey Osiris' or '你好 Osiris'")
print("="*60)
detector = HotwordDetector()
def on_hotword():
print("\n🎉 HOTWORD DETECTED!")
detector.set_callback(on_hotword)
try:
result = detector.detect(timeout=30)
if result:
print(f"Detected: {result}")
else:
print("No hotword detected")
except KeyboardInterrupt:
print("\nTest stopped")
detector.stop()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()

install.sh (new executable file, 422 lines)

#!/bin/bash
#
# Google AIY Voice Kit V1 - Installation Script
# Bilingual Voice Assistant (English/Mandarin)
#
# AI Now Inc - Del Mar Demo Unit
# Laboratory Assistant: Claw 🏭
#
set -e # Exit on error
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Configuration
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
INSTALL_DIR="/home/pi/voice-assistant"
MUSIC_DIR="/home/pi/Music"
LOG_FILE="/var/log/voice-assistant-install.log"
PYTHON_VERSION="3.9"
echo -e "${BLUE}"
echo "=========================================="
echo " 🎤 Voice Assistant Installer"
echo " AI Now Inc - Del Mar Demo Unit"
echo "=========================================="
echo -e "${NC}"
# Logging function
log() {
echo -e "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1" | tee -a "$LOG_FILE"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1" | tee -a "$LOG_FILE"
}
log_info() {
echo -e "${YELLOW}[INFO]${NC} $1" | tee -a "$LOG_FILE"
}
# Check if running as root
check_root() {
if [ "$EUID" -ne 0 ]; then
log_error "Please run as root (sudo ./install.sh)"
exit 1
fi
}
# Check if running on Raspberry Pi
check_raspberry_pi() {
if ! grep -q "Raspberry Pi" /proc/cpuinfo 2>/dev/null; then
log_info "Not running on Raspberry Pi (this may still work)"
else
log_success "Raspberry Pi detected"
fi
}
# Update system packages
update_system() {
log_info "Updating system packages..."
apt-get update
apt-get upgrade -y
log_success "System updated"
}
# Install system dependencies
install_system_deps() {
log_info "Installing system dependencies..."
apt-get install -y \
python3 \
python3-pip \
python3-dev \
python3-venv \
portaudio19-dev \
libffi-dev \
libssl-dev \
libjpeg-dev \
zlib1g-dev \
libfreetype6-dev \
liblcms2-dev \
libopenjp2-7 \
libtiff5 \
libblas-dev \
liblapack-dev \
libatlas-base-dev \
libgfortran5 \
swig \
libasound2-dev \
alsa-utils \
wget \
git \
curl
log_success "System dependencies installed"
}
# Install Google AIY Voice Kit
install_aiy_voice() {
log_info "Installing Google AIY Voice Kit..."
# Check if AIY already installed (the kit provides the `aiy` Python package)
if python3 -c "import aiy" >/dev/null 2>&1; then
log_info "AIY Voice Kit already installed"
return
fi
# Install AIY packages
cd /tmp
wget https://dl.google.com/aiyprojects/raspbian/aiyvoice-buster-20230111.zip
unzip aiyvoice-buster-*.zip
cd aiyvoice-*
./install.sh
log_success "Google AIY Voice Kit installed"
}
# Create virtual environment
create_venv() {
log_info "Creating Python virtual environment..."
cd "$INSTALL_DIR"
python3 -m venv venv
log_success "Virtual environment created"
}
# Install Python dependencies
install_python_deps() {
log_info "Installing Python dependencies..."
cd "$INSTALL_DIR"
source venv/bin/activate
# Upgrade pip
pip install --upgrade pip
# Install requirements
pip install -r requirements.txt
# Install additional dependencies for hotword detection
pip install pvporcupine
pip install webrtcvad
log_success "Python dependencies installed"
}
# Create music directory
create_music_dir() {
log_info "Creating music directory..."
if [ ! -d "$MUSIC_DIR" ]; then
mkdir -p "$MUSIC_DIR"
log_success "Music directory created: $MUSIC_DIR"
else
log_info "Music directory already exists"
fi
# Set permissions
chown pi:pi "$MUSIC_DIR"
chmod 755 "$MUSIC_DIR"
}
# Configure audio
configure_audio() {
log_info "Configuring audio..."
# Create/update ALSA configuration
cat > /etc/asound.conf << 'EOF'
pcm.!default {
    type plug
    slave.pcm "hw:0,0"
}
ctl.!default {
    type hw
    card 0
}
EOF
log_success "Audio configured"
}
# Install systemd service
install_service() {
log_info "Installing systemd service..."
cat > /etc/systemd/system/voice-assistant.service << EOF
[Unit]
Description=Bilingual Voice Assistant
After=network.target sound.target
[Service]
Type=simple
User=pi
WorkingDirectory=$INSTALL_DIR
ExecStart=$INSTALL_DIR/venv/bin/python3 $INSTALL_DIR/main.py --mode run
Restart=always
RestartSec=10
Environment=PYTHONUNBUFFERED=1
Environment=GOOGLE_APPLICATION_CREDENTIALS=/home/pi/.credentials/google-credentials.json
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=voice-assistant
[Install]
WantedBy=multi-user.target
EOF
# Enable service
systemctl daemon-reload
systemctl enable voice-assistant.service
log_success "Systemd service installed and enabled"
}
# Configure hotword detection
configure_hotword() {
log_info "Configuring hotword detection..."
# Create hotword configuration
cat > "$INSTALL_DIR/hotword_config.json" << 'EOF'
{
  "hotwords": [
    {
      "keyword": "hey osiris",
      "keyword_zh": "你好 osiris",
      "sensitivity": 0.5,
      "library_path": "resources/porcupine"
    }
  ],
  "audio": {
    "sample_rate": 16000,
    "frame_length": 512
  }
}
EOF
log_success "Hotword detection configured"
}
# Create sample music directory structure
create_sample_music_structure() {
log_info "Creating sample music structure..."
mkdir -p "$MUSIC_DIR/samples"
mkdir -p "$MUSIC_DIR/playlists"
# Create a README for music
cat > "$MUSIC_DIR/README.md" << 'EOF'
# Music Library
Place your MP3 files here. The assistant will automatically detect and index them.
## Supported Formats
- MP3
- WAV
- OGG
- FLAC
## Organization
You can organize music by:
- Artist/Album/Song.mp3
- Genre/Song.mp3
- Or flat structure: Song.mp3
## Voice Commands
- "Play [song name]" / "播放 [歌曲名]"
- "Pause" / "暂停"
- "Resume" / "继续"
- "Next" / "下一首"
- "Volume up/down" / "音量 大/小"
EOF
chown -R pi:pi "$MUSIC_DIR"
log_success "Sample music structure created"
}
# Create startup script
create_startup_script() {
log_info "Creating startup script..."
cat > "$INSTALL_DIR/start.sh" << 'EOF'
#!/bin/bash
# Voice Assistant Startup Script
cd "$(dirname "$0")"
# Activate virtual environment
source venv/bin/activate
# Run the assistant
python3 main.py --mode run
EOF
chmod +x "$INSTALL_DIR/start.sh"
chown pi:pi "$INSTALL_DIR/start.sh"
log_success "Startup script created"
}
# Create uninstall script
create_uninstall_script() {
log_info "Creating uninstall script..."
cat > "$INSTALL_DIR/uninstall.sh" << 'EOF'
#!/bin/bash
# Uninstall Voice Assistant
echo "Uninstalling Voice Assistant..."
# Stop service
sudo systemctl stop voice-assistant
sudo systemctl disable voice-assistant
sudo rm /etc/systemd/system/voice-assistant.service
# Remove installation
sudo rm -rf /home/pi/voice-assistant
# Remove music directory (optional)
# sudo rm -rf /home/pi/Music
echo "Uninstall complete!"
EOF
chmod +x "$INSTALL_DIR/uninstall.sh"
log_success "Uninstall script created"
}
# Final configuration
final_configuration() {
log_info "Running final configuration..."
# Copy config if not exists
if [ ! -f "$INSTALL_DIR/config.local.json" ]; then
cp "$INSTALL_DIR/config.json" "$INSTALL_DIR/config.local.json"
log_info "Created local configuration: config.local.json"
fi
# Set permissions
chown -R pi:pi "$INSTALL_DIR"
chmod -R 755 "$INSTALL_DIR"
log_success "Final configuration complete"
}
# Print next steps
print_next_steps() {
echo ""
echo -e "${GREEN}=========================================="
echo " Installation Complete! 🎉"
echo "==========================================${NC}"
echo ""
echo "Next steps:"
echo "1. Edit configuration:"
echo " nano $INSTALL_DIR/config.local.json"
echo ""
echo "2. Add your MP3 files to: $MUSIC_DIR"
echo ""
echo "3. Test the assistant:"
echo " cd $INSTALL_DIR"
echo " ./start.sh"
echo ""
echo "4. Or run in demo mode:"
echo " $INSTALL_DIR/venv/bin/python3 $INSTALL_DIR/main.py --mode demo"
echo ""
echo "5. Start the service:"
echo " sudo systemctl start voice-assistant"
echo ""
echo "6. View logs:"
echo " sudo journalctl -u voice-assistant -f"
echo ""
echo "Voice commands:"
echo " - 'Hey Osiris' / '你好 Osiris' (hotword)"
echo " - 'Hello' / '你好'"
echo " - 'Play music' / '播放音乐'"
echo " - 'What time is it?' / '现在几点?'"
echo ""
echo -e "${YELLOW}Note: Make sure your microphone is connected and working!${NC}"
echo ""
}
# Main installation
main() {
log "Starting installation..."
check_root
check_raspberry_pi
update_system
install_system_deps
# install_aiy_voice # Commented out - install manually if needed
create_venv
install_python_deps
create_music_dir
configure_audio
install_service
configure_hotword
create_sample_music_structure
create_startup_script
create_uninstall_script
final_configuration
print_next_steps
log_success "Installation completed successfully!"
}
# Run main
main "$@"

287
main.py Executable file

@ -0,0 +1,287 @@
#!/usr/bin/env python3
"""
Bilingual Voice Assistant - Main Entry Point
Google AIY Voice Kit V1 - English/Mandarin Support
AI Now Inc - Del Mar Demo Unit
Laboratory Assistant: Claw 🏭
"""
import os
import sys
import json
import logging
import signal
import time
from pathlib import Path
from typing import Optional
# Import components
from assistant import VoiceAssistant
from tts_engine import TTSEngine
from speech_recognizer import BilingualSpeechRecognizer
from music_player import MusicPlayer
from hotword_detector import HotwordDetector
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class VoiceAssistantApp:
"""
Main application class for the bilingual voice assistant.
"""
def __init__(self, config_path: str = "config.json"):
self.config_path = Path(config_path)
self.config = self._load_config()
# Initialize components
logger.info("Initializing voice assistant...")
self.assistant = VoiceAssistant(str(self.config_path))
self.tts = TTSEngine(str(self.config_path))
self.hotword_detector = HotwordDetector(str(self.config_path.parent / "hotword_config.json"))
# State
self.is_running = False
self.current_language = "en"
self.is_awake = False # Hotword activated state
# Setup signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
logger.info("Voice assistant initialized with hotword detection")
def _load_config(self) -> dict:
"""Load configuration."""
try:
with open(self.config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
logger.warning("Config not found, using defaults")
return {}
def _signal_handler(self, sig, frame):
"""Handle shutdown signals."""
logger.info("Shutdown signal received")
self.is_running = False
def run(self):
"""Run the voice assistant with hotword detection."""
logger.info("Starting voice assistant with hotword detection...")
self.is_running = True
# Welcome message
welcome_text = "Voice assistant started. Say 'Hey Osiris' to activate."
welcome_text_zh = "语音助手已启动。说 '你好 Osiris' 来激活。"
print("\n" + "="*60)
print(" 🎤 Bilingual Voice Assistant - AI Now Inc")
print(" Laboratory Assistant: Claw 🏭")
print("="*60)
print(f"\n English: {welcome_text}")
print(f" 中文:{welcome_text_zh}")
print("\n Hotword: 'Hey Osiris' / '你好 Osiris'")
print(" Listening for hotword... (Press Ctrl+C to stop)\n")
# Speak welcome message
self.tts.speak(welcome_text, "en")
time.sleep(0.5)
self.tts.speak(welcome_text_zh, "zh")
# Set hotword callback
self.hotword_detector.set_callback(self._on_hotword_detected)
# Main loop - listen for hotword
try:
while self.is_running:
# Wait for hotword
print("⏳ Waiting for 'Hey Osiris'...")
self.hotword_detector.detect(timeout=None)
# If we get here, hotword was detected (or timeout)
if not self.is_running:
break
time.sleep(0.5)
except KeyboardInterrupt:
logger.info("Interrupted by user")
finally:
self.shutdown()
def _on_hotword_detected(self):
"""Callback when hotword is detected."""
print("\n🎉 Hotword detected! Listening for command...")
# Awakening message
awake_text = "Yes? How can I help?"
awake_text_zh = "在的,有什么可以帮你?"
self.tts.speak(awake_text, "en")
time.sleep(0.5)
self.tts.speak(awake_text_zh, "zh")
# Now listen for command (simplified - would use speech recognition)
try:
user_input = input("Command: ").strip()
if user_input:
# Detect language
lang = "zh" if any('\u4e00' <= c <= '\u9fff' for c in user_input) else "en"
# Process command
response, resp_lang = self.assistant.process_command(user_input, lang)
# Output response
print(f"Assistant: {response}")
# Speak response
self.tts.speak(response, resp_lang)
except Exception as e:
logger.error(f"Command processing error: {e}")
def shutdown(self):
"""Clean shutdown."""
logger.info("Shutting down...")
# Stop music if playing
self.assistant.music_player.stop()
# Goodbye message
goodbye_text = "Goodbye!"
goodbye_text_zh = "再见!"
self.tts.speak(goodbye_text, "en")
time.sleep(0.5)
self.tts.speak(goodbye_text_zh, "zh")
logger.info("Voice assistant stopped")
def test_mode():
"""Run in test mode with sample commands."""
print("\n" + "="*60)
print(" 🧪 Test Mode - Sample Commands")
print("="*60)
assistant = VoiceAssistant()
tts = TTSEngine()
test_commands = [
("hello", "en"),
("what time is it", "en"),
("play music", "en"),
("你好", "zh"),
("现在几点", "zh"),
("播放音乐", "zh"),
]
for text, lang in test_commands:
print(f"\nInput: {text} ({lang})")
response, resp_lang = assistant.process_command(text, lang)
print(f"Output: {response} ({resp_lang})")
tts.speak(response, resp_lang)
time.sleep(1)
def demo_mode():
"""Interactive demo mode."""
print("\n" + "="*60)
print(" 🎭 Demo Mode - Try These Commands!")
print("="*60)
print("""
English Commands:
- "hello"
- "what time is it"
- "play music"
- "pause"
- "stop"
- "volume up"
- "ask Claw: what is industrial control?"
中文命令:
- "你好"
- "现在几点"
- "播放音乐"
- "暂停"
- "停止"
- "音量大"
- "问 Claw:什么是工业控制?"
Type 'quit' to exit
""")
assistant = VoiceAssistant()
tts = TTSEngine()
while True:
try:
user_input = input("\nYou: ").strip()
if user_input.lower() in ['quit', 'exit', '退出']:
break
if not user_input:
continue
# Detect language
lang = "zh" if any('\u4e00' <= c <= '\u9fff' for c in user_input) else "en"
# Process command
response, resp_lang = assistant.process_command(user_input, lang)
# Output
print(f"Assistant: {response}")
# Speak (optional in demo)
speak_response = input("Speak? (y/n): ").strip().lower()
if speak_response == 'y':
tts.speak(response, resp_lang)
except KeyboardInterrupt:
break
except Exception as e:
logger.error(f"Error: {e}")
print("\nDemo ended.")
def main():
"""Main entry point."""
import argparse
parser = argparse.ArgumentParser(
description="Bilingual Voice Assistant for Google AIY Voice Kit V1"
)
parser.add_argument(
"--mode",
choices=["run", "test", "demo"],
default="demo",
help="Operation mode: run, test, or demo"
)
parser.add_argument(
"--config",
default="config.json",
help="Path to configuration file"
)
args = parser.parse_args()
if args.mode == "test":
test_mode()
elif args.mode == "demo":
demo_mode()
else:
app = VoiceAssistantApp(args.config)
app.run()
if __name__ == "__main__":
main()

312
music_player.py Executable file

@ -0,0 +1,312 @@
#!/usr/bin/env python3
"""
Music Player for Google AIY Voice Kit
Supports MP3 playback with voice control.
"""
import os
import json
import logging
import random
from pathlib import Path
from typing import Optional, List, Dict
from datetime import datetime
try:
import pygame
HAS_PYGAME = True
except ImportError:
HAS_PYGAME = False
try:
from mutagen.mp3 import MP3
from mutagen.easyid3 import EasyID3
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
logger = logging.getLogger(__name__)
class MusicPlayer:
"""
MP3 music player with voice control support.
"""
def __init__(self, config_path: str = "config.json"):
self.config = self._load_config(config_path)
self.music_path = Path(self.config.get("music", {}).get(
"library_path", "/home/pi/Music"
))
self.volume = self.config.get("music", {}).get("default_volume", 0.7)
self.supported_formats = self.config.get("music", {}).get(
"supported_formats", [".mp3", ".wav", ".ogg", ".flac"]
)
self.current_track: Optional[Path] = None
self.playlist: List[Path] = []
self.playlist_index: int = 0
self.is_playing: bool = False
self.is_paused: bool = False
# Initialize pygame mixer
if HAS_PYGAME:
pygame.mixer.init()
pygame.mixer.music.set_volume(self.volume)
# Scan music library
self.music_library = self._scan_library()
logger.info(f"MusicPlayer initialized with {len(self.music_library)} tracks")
def _load_config(self, config_path: str) -> dict:
"""Load configuration from JSON file."""
try:
with open(config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
return {"music": {"library_path": "/home/pi/Music"}}
def _scan_library(self) -> Dict[str, Path]:
"""
Scan music library for supported formats.
Returns:
Dictionary mapping track names to file paths
"""
library = {}
if not self.music_path.exists():
logger.warning(f"Music path {self.music_path} does not exist")
return library
for root, dirs, files in os.walk(self.music_path):
for file in files:
file_path = Path(root) / file
if file_path.suffix.lower() in self.supported_formats:
# Use filename without extension as key
track_name = file_path.stem.lower()
library[track_name] = file_path
logger.debug(f"Added track: {track_name}")
return library
def search_tracks(self, query: str) -> List[Path]:
"""
Search for tracks matching the query.
Args:
query: Search query (partial match)
Returns:
List of matching track paths
"""
query_lower = query.lower()
matches = []
# Exact match first
if query_lower in self.music_library:
return [self.music_library[query_lower]]
# Partial matches
for track_name, path in self.music_library.items():
if query_lower in track_name:
matches.append(path)
# If no matches, return all tracks (for "play music")
if not matches:
matches = list(self.music_library.values())
return matches[:10] # Limit results
def play(self, track_path: Optional[Path] = None) -> bool:
"""
Play a track.
Args:
track_path: Path to track (None for next in playlist)
Returns:
True if playback started successfully
"""
if not HAS_PYGAME:
logger.error("Pygame not available")
return False
try:
# If no track specified, use playlist
if track_path is None:
if self.playlist and self.playlist_index < len(self.playlist):
track_path = self.playlist[self.playlist_index]
else:
logger.warning("No track to play")
return False
if not track_path or not track_path.exists():
logger.warning(f"Track not found: {track_path}")
return False
logger.info(f"Playing: {track_path.name}")
pygame.mixer.music.load(str(track_path))
pygame.mixer.music.play()
self.current_track = track_path
self.is_playing = True
self.is_paused = False
return True
except Exception as e:
logger.error(f"Playback error: {e}")
return False
def pause(self) -> bool:
"""Pause current playback."""
if not HAS_PYGAME or not self.is_playing:
return False
try:
pygame.mixer.music.pause()
self.is_paused = True
logger.info("Playback paused")
return True
except Exception as e:
logger.error(f"Pause error: {e}")
return False
def resume(self) -> bool:
"""Resume paused playback."""
if not HAS_PYGAME or not self.is_paused:
return False
try:
pygame.mixer.music.unpause()
self.is_paused = False
logger.info("Playback resumed")
return True
except Exception as e:
logger.error(f"Resume error: {e}")
return False
def stop(self) -> bool:
"""Stop playback."""
if not HAS_PYGAME:
return False
try:
pygame.mixer.music.stop()
self.is_playing = False
self.is_paused = False
self.current_track = None
logger.info("Playback stopped")
return True
except Exception as e:
logger.error(f"Stop error: {e}")
return False
def next(self) -> bool:
"""Play next track in playlist."""
if not self.playlist:
return False
self.playlist_index = (self.playlist_index + 1) % len(self.playlist)
return self.play()
def previous(self) -> bool:
"""Play previous track in playlist."""
if not self.playlist:
return False
self.playlist_index = (self.playlist_index - 1) % len(self.playlist)
return self.play()
def set_volume(self, level: float) -> bool:
"""
Set volume level.
Args:
level: Volume level (0.0 to 1.0)
"""
if not HAS_PYGAME:
return False
level = max(0.0, min(1.0, level)) # Clamp to 0-1
pygame.mixer.music.set_volume(level)
self.volume = level
logger.info(f"Volume set to {level * 100:.0f}%")
return True
def create_playlist(self, tracks: List[Path]) -> None:
"""Create a playlist from tracks."""
self.playlist = tracks
self.playlist_index = 0
logger.info(f"Created playlist with {len(tracks)} tracks")
def get_track_info(self, track_path: Path) -> Dict:
"""
Get track metadata.
Args:
track_path: Path to track file
Returns:
Dictionary with track metadata
"""
info = {
"path": str(track_path),
"name": track_path.stem,
"duration": None,
"artist": None,
"album": None
}
if HAS_MUTAGEN and track_path.exists():
try:
if track_path.suffix.lower() == ".mp3":
audio = MP3(track_path, ID3=EasyID3)
info["duration"] = audio.info.length
if hasattr(audio, 'tags'):
info["artist"] = audio.tags.get("artist", [None])[0]
info["album"] = audio.tags.get("album", [None])[0]
except Exception as e:
logger.debug(f"Error reading metadata: {e}")
return info
def get_status(self) -> Dict:
"""Get current player status."""
return {
"is_playing": self.is_playing,
"is_paused": self.is_paused,
"current_track": str(self.current_track.name) if self.current_track else None,
"volume": self.volume,
"playlist_length": len(self.playlist),
"playlist_index": self.playlist_index
}
def main():
"""Test the music player."""
player = MusicPlayer()
# Print library stats
print(f"Music library: {len(player.music_library)} tracks")
# Test search
query = "test"
matches = player.search_tracks(query)
print(f"Search '{query}': {len(matches)} matches")
# Test playback
if player.music_library:
first_track = list(player.music_library.values())[0]
print(f"Playing: {first_track.name}")
player.play(first_track)
import time
time.sleep(5)
player.stop()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()
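The wrap-around playlist arithmetic used by `next()` and `previous()` above relies on Python's modulo operator returning a non-negative result for a positive modulus; a minimal standalone sketch:

```python
# Sketch of the index arithmetic in MusicPlayer.next()/previous().
# Stepping forward past the last track wraps to the first, and stepping
# backwards from index 0 wraps to the last, because Python's % is
# non-negative for a positive modulus.
playlist = ["a.mp3", "b.mp3", "c.mp3"]

def step(index: int, delta: int) -> int:
    """Move delta positions through the playlist, wrapping at both ends."""
    return (index + delta) % len(playlist)

print(step(2, +1))  # 0 (next after the last track wraps to the first)
print(step(0, -1))  # 2 (previous before the first track wraps to the last)
```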

237
openclaw_client.py Executable file

@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""
OpenClaw Client for Voice Assistant
Connects to OpenClaw gateway for AI responses and command processing.
"""
import os
import json
import logging
import time
import threading
from typing import Optional, Callable, Dict, Any
from pathlib import Path
try:
import websocket
HAS_WEBSOCKET = True
except ImportError:
HAS_WEBSOCKET = False
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
logger = logging.getLogger(__name__)
class OpenClawClient:
"""
Client for OpenClaw gateway communication.
Supports WebSocket and HTTP APIs.
"""
def __init__(self, config_path: str = "config.json"):
self.config = self._load_config(config_path)
self.ws_url = self.config.get("openclaw", {}).get(
"ws_url", "ws://192.168.1.100:18790"
)
self.api_key = self.config.get("openclaw", {}).get("api_key", "")
self.enabled = self.config.get("openclaw", {}).get("enabled", True)
self.ws: Optional["websocket.WebSocketApp"] = None  # quoted annotation: websocket may be uninstalled
self.is_connected = False
self.message_handlers = []
self.reconnect_interval = self.config.get("openclaw", {}).get(
"reconnect_interval", 5
)
if HAS_WEBSOCKET and self.enabled:
self._init_websocket()
logger.info(f"OpenClawClient initialized (enabled={self.enabled})")
def _load_config(self, config_path: str) -> dict:
"""Load configuration from JSON file."""
try:
with open(config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
return {"openclaw": {"enabled": False}}
def _init_websocket(self):
"""Initialize WebSocket connection."""
if not HAS_WEBSOCKET:
logger.warning("websocket-client not installed")
return
def on_open(ws):
logger.info("WebSocket connected")
self.is_connected = True
self._on_connect()
def on_message(ws, message):
logger.debug(f"Received: {message}")
self._handle_message(message)
def on_error(ws, error):
logger.error(f"WebSocket error: {error}")
def on_close(ws, close_status_code, close_msg):
logger.info(f"WebSocket closed: {close_status_code} - {close_msg}")
self.is_connected = False
self._reconnect()
self.ws = websocket.WebSocketApp(
self.ws_url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
# Start connection thread
thread = threading.Thread(target=self.ws.run_forever)
thread.daemon = True
thread.start()
def _on_connect(self):
"""Called when connection is established."""
# Subscribe to relevant channels or send authentication
if self.api_key:
auth_message = {
"type": "auth",
"api_key": self.api_key
}
self.send(json.dumps(auth_message))
def _reconnect(self):
"""Attempt to reconnect after disconnection."""
logger.info(f"Reconnecting in {self.reconnect_interval}s...")
time.sleep(self.reconnect_interval)
if self.ws:
self._init_websocket()
def _handle_message(self, message: str):
"""Handle incoming message."""
try:
data = json.loads(message)
for handler in self.message_handlers:
handler(data)
except json.JSONDecodeError:
logger.warning(f"Invalid JSON: {message}")
def send(self, message: str) -> bool:
"""
Send message via WebSocket.
Args:
message: JSON string to send
Returns:
True if sent successfully
"""
if not self.is_connected or not self.ws:
logger.warning("Not connected to OpenClaw")
return False
try:
self.ws.send(message)
return True
except Exception as e:
logger.error(f"Send error: {e}")
return False
def send_request(self, query: str, context: Optional[Dict] = None) -> Dict:
"""
Send a query to OpenClaw and get response.
Args:
query: User query string
context: Optional context dictionary
Returns:
Response dictionary
"""
if not self.enabled:
return {"error": "OpenClaw client disabled"}
message = {
"type": "query",
"query": query,
"timestamp": time.time()
}
if context:
message["context"] = context
# Send via WebSocket
if self.send(json.dumps(message)):
# Wait for response (simplified - real implementation needs async handling)
time.sleep(0.5)
return {"status": "sent"}
else:
# Fall back to HTTP if WebSocket unavailable
return self._http_request(query, context)
def _http_request(self, query: str, context: Optional[Dict] = None) -> Dict:
"""Fallback HTTP request."""
if not HAS_REQUESTS:
return {"error": "HTTP client not available"}
try:
response = requests.post(
f"{self.ws_url.replace('ws://', 'http://').replace('wss://', 'https://')}/api/query",
json={"query": query, "context": context},
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=10
)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"HTTP request failed: {e}")
return {"error": str(e)}
def add_message_handler(self, handler: Callable[[Dict], None]):
"""Add a handler for incoming messages."""
self.message_handlers.append(handler)
def get_status(self) -> Dict:
"""Get client status."""
return {
"enabled": self.enabled,
"connected": self.is_connected,
"ws_url": self.ws_url
}
def main():
"""Test the OpenClaw client."""
client = OpenClawClient()
# Add message handler
def on_message(data):
print(f"Received: {data}")
client.add_message_handler(on_message)
# Test connection
print(f"OpenClaw Client Status: {client.get_status()}")
# Test query
response = client.send_request("Hello, how are you?")
print(f"Response: {response}")
# Keep alive
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nShutting down...")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()
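The HTTP fallback in `_http_request` derives its base URL from the WebSocket URL by swapping schemes. That conversion can be sketched and sanity-checked on its own (the replacement order is safe because `ws://` never matches inside `wss://`):

```python
# Scheme swap mirrored from OpenClawClient._http_request:
# ws:// -> http://, wss:// -> https://; host, port and path are preserved.
def ws_to_http(ws_url: str) -> str:
    return ws_url.replace("ws://", "http://").replace("wss://", "https://")

print(ws_to_http("ws://192.168.1.100:18790"))   # http://192.168.1.100:18790
print(ws_to_http("wss://gateway.example/api"))  # https://gateway.example/api
```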

44
requirements.txt Normal file

@ -0,0 +1,44 @@
# Google AIY Voice Kit V1 - Bilingual Voice Assistant
# Python Dependencies
# Google AIY
git+https://github.com/google/aiyprojects-raspbian.git#egg=aiy-voice-kit
# Google Cloud Services (optional, for cloud speech/TTS)
google-cloud-speech>=2.0.0
google-cloud-texttospeech>=2.0.0
# Audio Processing
pygame>=2.0.0
mutagen>=1.45.0
pyaudio>=0.2.11
webrtcvad>=2.0.10
# Language Detection
langdetect>=1.0.9
langid>=1.1.6
# HTTP/WebSocket Client
requests>=2.28.0
websocket-client>=1.5.0
# Offline Speech Recognition
vosk>=0.3.50  # Offline speech recognition
pocketsphinx>=5.0.0  # Alternative offline recognition
# Hotword Detection
pvporcupine>=2.2.0  # Picovoice Porcupine hotword detection (PyPI name is pvporcupine)
# snowboy is discontinued and not installable from PyPI; build from source if required
# Configuration
python-dotenv>=0.19.0
pyyaml>=6.0
# Logging
colorlog>=6.0.0
# Utilities
fuzzywuzzy>=0.18.0 # Fuzzy string matching for music search
python-Levenshtein>=0.19.0 # Fast string matching

207
speech_recognizer.py Executable file

@ -0,0 +1,207 @@
#!/usr/bin/env python3
"""
Bilingual Speech Recognizer
Supports English and Mandarin Chinese with automatic language detection.
"""
import os
import json
import logging
from typing import Optional, Tuple
from pathlib import Path
try:
import aiy.voice
from aiy import speech
HAS_AIY = True
except ImportError:
HAS_AIY = False
try:
from google.cloud import speech as speech_service
HAS_GOOGLE_CLOUD = True
except ImportError:
HAS_GOOGLE_CLOUD = False
try:
from langdetect import detect
HAS_LANG_DETECT = True
except ImportError:
HAS_LANG_DETECT = False
logger = logging.getLogger(__name__)
class BilingualSpeechRecognizer:
"""
Speech recognizer with automatic English/Mandarin detection.
"""
def __init__(self, config_path: str = "config.json"):
self.config = self._load_config(config_path)
self.language_cache = {}
if HAS_AIY:
self.aiy_recognizer = speech.Recognizer()
else:
self.aiy_recognizer = None
logger.info("BilingualSpeechRecognizer initialized")
def _load_config(self, config_path: str) -> dict:
"""Load configuration from JSON file."""
try:
with open(config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
logger.warning(f"Config file {config_path} not found, using defaults")
return {
"speech": {
"language": "auto",
"recognition_timeout": 5
}
}
def recognize(self, audio_data: bytes, timeout: Optional[int] = None) -> Tuple[Optional[str], str]:
"""
Recognize speech from audio data.
Args:
audio_data: Raw audio bytes
timeout: Recognition timeout in seconds
Returns:
Tuple of (recognized_text, detected_language)
"""
if timeout is None:
timeout = self.config.get("speech", {}).get("recognition_timeout", 5)
# Try Google Cloud Speech first (if available)
if HAS_GOOGLE_CLOUD and self.config.get("speech", {}).get("offline_mode", False) is False:
try:
text = self._google_cloud_recognize(audio_data)
if text:
lang = self._detect_language(text)
return text, lang
except Exception as e:
logger.warning(f"Google Cloud recognition failed: {e}")
# Fall back to AIY/local recognition
if self.aiy_recognizer:
try:
text = self._aiy_recognize(audio_data)
if text:
lang = self._detect_language(text)
return text, lang
except Exception as e:
logger.warning(f"AIY recognition failed: {e}")
# Last resort: simple language detection from text
return None, "unknown"
def _google_cloud_recognize(self, audio_data: bytes) -> Optional[str]:
"""Use Google Cloud Speech-to-Text for recognition."""
if not HAS_GOOGLE_CLOUD:
return None
client = speech_service.SpeechClient()
# Try bilingual recognition
config = speech_service.RecognitionConfig(
encoding=speech_service.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=16000,
language_code="en-US",
alternative_language_codes=["zh-CN", "zh-TW"],
enable_automatic_punctuation=True,
)
response = client.recognize(
config=config,
audio=speech_service.RecognitionAudio(content=audio_data)
)
if response.results:
result = response.results[0]
if result.alternatives:
return result.alternatives[0].transcript
return None
def _aiy_recognize(self, audio_data: bytes) -> Optional[str]:
"""Use AIY Voice Kit for recognition."""
if not self.aiy_recognizer:
return None
try:
# AIY uses Google's speech recognition internally
recognizer = self.aiy_recognizer
# This is a simplified version - actual implementation depends on AIY version
return None
except Exception as e:
logger.error(f"AIY recognition error: {e}")
return None
def _detect_language(self, text: str) -> str:
"""
Detect if text is English or Chinese.
Returns:
'en' for English, 'zh' for Chinese, 'unknown' otherwise
"""
if not text:
return "unknown"
# Simple heuristic: check for Chinese characters
chinese_chars = sum(1 for char in text if '\u4e00' <= char <= '\u9fff')
if chinese_chars > len(text) * 0.3: # 30% Chinese characters
return "zh"
# Use langdetect if available
if HAS_LANG_DETECT:
try:
detected = detect(text)
if detected in ["zh-cn", "zh-tw", "zh"]:
return "zh"
elif detected in ["en", "en-us", "en-gb"]:
return "en"
except Exception:
pass
# Default to English
return "en"
def listen_for_hotword(self, callback) -> None:
"""
Listen for hotword activation.
Args:
callback: Function to call when hotword detected
"""
if not HAS_AIY:
logger.warning("AIY not available, hotword detection disabled")
return
# Implementation depends on AIY version
# This is a placeholder for the actual hotword detection
logger.info("Hotword detection enabled")
def main():
"""Test the speech recognizer."""
recognizer = BilingualSpeechRecognizer()
# Test language detection
test_texts = [
"Hello, how are you?",
"你好,你好吗?",
"Play some music",
"播放音乐"
]
for text in test_texts:
lang = recognizer._detect_language(text)
print(f"'{text}' -> Language: {lang}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()
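The character-range heuristic in `_detect_language` can be exercised without any audio stack; a standalone sketch of the same 30% threshold:

```python
# Standalone copy of the heuristic in BilingualSpeechRecognizer._detect_language:
# classify as Chinese when more than 30% of characters fall in the
# CJK Unified Ideographs block (U+4E00..U+9FFF), otherwise default to English.
def detect_language(text: str) -> str:
    if not text:
        return "unknown"
    chinese_chars = sum(1 for ch in text if '\u4e00' <= ch <= '\u9fff')
    if chinese_chars > len(text) * 0.3:
        return "zh"
    return "en"

print(detect_language("播放音乐"))         # zh
print(detect_language("Play some music"))  # en
print(detect_language(""))                 # unknown
```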

185
test_setup.py Executable file

@ -0,0 +1,185 @@
#!/usr/bin/env python3
"""
Test Setup Script
Verifies all components are working correctly.
Run this after installation to ensure everything is configured properly.
"""
import sys
import os
from pathlib import Path
print("\n" + "="*70)
print(" 🧪 Voice Assistant - Setup Test Suite")
print(" AI Now Inc - Del Mar Demo Unit")
print("="*70)
print()
# Test counter
tests_passed = 0
tests_failed = 0
def test_result(name: str, passed: bool, message: str = ""):
global tests_passed, tests_failed
status = "✅ PASS" if passed else "❌ FAIL"
print(f"{status}: {name}")
if message:
print(f"{message}")
if passed:
tests_passed += 1
else:
tests_failed += 1
return passed
# Test 1: Python version
print("1. Checking Python version...")
if sys.version_info >= (3, 8):
test_result("Python version", True, f"Python {sys.version}")
else:
test_result("Python version", False, f"Need Python 3.8+, have {sys.version}")
# Test 2: Required packages
print("\n2. Checking required packages...")
required_packages = [
("pygame", "pygame"),
("requests", "requests"),
("websocket-client", "websocket"),  # pip name differs from the importable module
("langdetect", "langdetect"),
("mutagen", "mutagen")
]
for pkg, module in required_packages:
try:
__import__(module)
test_result(f"Package: {pkg}", True)
except ImportError:
test_result(f"Package: {pkg}", False, f"Install with: pip install {pkg}")
# Test 3: Optional packages
print("\n3. Checking optional packages...")
optional_packages = [
("pvporcupine", "Hotword detection"),
("webrtcvad", "Voice activity detection"),
("google.cloud.speech", "Google Cloud Speech"),
("google.cloud.texttospeech", "Google Cloud TTS")
]
for pkg, desc in optional_packages:
try:
__import__(pkg)
test_result(f"Optional: {pkg} ({desc})", True)
except ImportError:
test_result(f"Optional: {pkg} ({desc})", False, f"Optional - {pkg}")
# Test 4: Configuration files
print("\n4. Checking configuration...")
config_files = ["config.json", "hotword_config.json"]
for config_file in config_files:
if Path(config_file).exists():
test_result(f"Config: {config_file}", True)
else:
test_result(f"Config: {config_file}", False, "File not found")
# Test 5: Audio devices
print("\n5. Checking audio devices...")
try:
import pyaudio
pa = pyaudio.PyAudio()
device_count = pa.get_device_count()
test_result("PyAudio", True, f"Found {device_count} audio devices")
# Try to get default input device
try:
default_input = pa.get_default_input_device_info()
test_result("Default input device", True, default_input.get('name', 'Unknown'))
except Exception:
test_result("Default input device", False, "No input device found")
# Try to get default output device
try:
default_output = pa.get_default_output_device_info()
test_result("Default output device", True, default_output.get('name', 'Unknown'))
except Exception:
test_result("Default output device", False, "No output device found")
pa.terminate()
except ImportError:
test_result("PyAudio", False, "Install with: pip install pyaudio")
except Exception as e:
test_result("PyAudio", False, str(e))
# Test 6: Music directory
print("\n6. Checking music directory...")
music_path = Path("/home/pi/Music")
if music_path.exists():
test_result("Music directory", True, str(music_path))
# Count files
music_files = list(music_path.glob("**/*.mp3"))
test_result("Music files", True, f"Found {len(music_files)} MP3 files")
else:
test_result("Music directory", False, "Directory not found")
# Test 7: Module imports
print("\n7. Testing module imports...")
modules = [
"speech_recognizer",
"music_player",
"tts_engine",
"assistant",
"hotword_detector",
"openclaw_client"
]
for module in modules:
try:
__import__(module)
test_result(f"Module: {module}", True)
except ImportError as e:
test_result(f"Module: {module}", False, str(e))
except Exception as e:
test_result(f"Module: {module}", False, f"Error: {e}")
# Test 8: Component initialization
print("\n8. Testing component initialization...")
try:
from assistant import VoiceAssistant
assistant = VoiceAssistant()
test_result("VoiceAssistant", True)
except Exception as e:
test_result("VoiceAssistant", False, str(e))
try:
from tts_engine import TTSEngine
tts = TTSEngine()
test_result("TTSEngine", True)
except Exception as e:
test_result("TTSEngine", False, str(e))
try:
from music_player import MusicPlayer
player = MusicPlayer()
test_result("MusicPlayer", True, f"Library: {len(player.music_library)} tracks")
except Exception as e:
test_result("MusicPlayer", False, str(e))
# Summary
print("\n" + "="*70)
print(f" Test Summary: {tests_passed} passed, {tests_failed} failed")
print("="*70)
if tests_failed == 0:
print("\n✅ All tests passed! System is ready to use.")
print("\nNext steps:")
print(" 1. Add MP3 files to /home/pi/Music")
print(" 2. Configure OpenClaw connection in config.json")
print(" 3. Run: ./start.sh")
print(" 4. Say 'Hey Osiris' to activate!")
elif tests_failed > 0:
print(f"\n⚠️ {tests_failed} test(s) failed. Please fix the issues above.")
print("\nCommon fixes:")
print(" - Missing packages: pip install -r requirements.txt")
print(" - No audio device: Check microphone/speaker connections")
print(" - Config missing: Copy config.json to config.local.json")
print()

267
tts_engine.py Executable file

@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""
Text-to-Speech Engine
Supports English and Mandarin Chinese with Google Cloud TTS and offline alternatives.
"""
import os
import json
import logging
from typing import Optional, List
from pathlib import Path
try:
from google.cloud import texttospeech
HAS_GOOGLE_CLOUD = True
except ImportError:
HAS_GOOGLE_CLOUD = False
try:
import pygame
HAS_PYGAME = True
except ImportError:
HAS_PYGAME = False
logger = logging.getLogger(__name__)
class TTSEngine:
"""
Bilingual TTS engine supporting English and Mandarin Chinese.
"""
def __init__(self, config_path: str = "config.json"):
self.config = self._load_config(config_path)
# TTS configuration
tts_config = self.config.get("tts", {})
self.english_voice = tts_config.get("english_voice", "en-US-Standard-A")
self.chinese_voice = tts_config.get("chinese_voice", "zh-CN-Standard-A")
self.speed = tts_config.get("speed", 1.0)
self.pitch = tts_config.get("pitch", 0)
# Initialize Google Cloud client if available
self.client = None
if HAS_GOOGLE_CLOUD:
try:
self.client = texttospeech.TextToSpeechClient()
logger.info("Google Cloud TTS initialized")
except Exception as e:
logger.warning(f"Google Cloud TTS not available: {e}")
# Initialize audio output
if HAS_PYGAME:
pygame.mixer.init()
logger.info("TTSEngine initialized")
def _load_config(self, config_path: str) -> dict:
"""Load configuration."""
try:
with open(config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
return {"tts": {}}
def speak(self, text: str, language: str = "en") -> bool:
"""
Speak text in the specified language.
Args:
text: Text to speak
language: 'en' for English, 'zh' for Chinese
Returns:
True if speech succeeded
"""
try:
# Generate speech audio
audio_data = self._synthesize(text, language)
if audio_data:
# Play audio
return self._play_audio(audio_data)
return False
except Exception as e:
logger.error(f"TTS error: {e}")
return False
def _synthesize(self, text: str, language: str) -> Optional[bytes]:
"""
Synthesize speech from text.
Args:
text: Text to synthesize
language: Language code
Returns:
Audio data or None
"""
if self.client and HAS_GOOGLE_CLOUD:
return self._google_synthesize(text, language)
else:
return self._offline_synthesize(text, language)
def _google_synthesize(self, text: str, language: str) -> Optional[bytes]:
"""Use Google Cloud TTS."""
if not self.client:
return None
# Select voice based on language
if language == "zh":
voice_name = self.chinese_voice
lang_code = "zh-CN"
else:
voice_name = self.english_voice
lang_code = "en-US"
# Configure synthesis
voice = texttospeech.VoiceSelectionParams(
language_code=lang_code,
name=voice_name,
)
audio_config = texttospeech.AudioConfig(
audio_encoding=texttospeech.AudioEncoding.MP3,
speaking_rate=self.speed,
pitch=self.pitch,
)
synthesis_input = texttospeech.SynthesisInput(text=text)
# Perform synthesis
response = self.client.synthesize_speech(
request=texttospeech.SynthesizeSpeechRequest(
input=synthesis_input,
voice=voice,
audio_config=audio_config,
)
)
return response.audio_content
def _offline_synthesize(self, text: str, language: str) -> Optional[bytes]:
"""
Offline TTS fallback (basic system TTS).
This is a placeholder - in production, you'd use:
- espeak for English
- A Chinese TTS engine for Mandarin
"""
logger.warning("Using offline TTS (limited quality)")
        # Try system TTS via espeak, passing the text as an argv element
        # instead of interpolating it into a shell string (avoids quoting
        # bugs and shell injection)
        try:
            import subprocess
            cmd = ["espeak", "--stdout", text]
            if language == "zh":
                cmd[1:1] = ["-v", "zh"]  # Chinese voice, if installed
            with open('/tmp/tts_output.wav', 'wb') as out:
                subprocess.run(cmd, stdout=out, check=False)
# Read the file
if os.path.exists('/tmp/tts_output.wav'):
with open('/tmp/tts_output.wav', 'rb') as f:
return f.read()
except Exception as e:
logger.error(f"Offline TTS failed: {e}")
return None
def _play_audio(self, audio_data: bytes) -> bool:
"""
Play audio data.
Args:
audio_data: Audio bytes (MP3 or WAV)
Returns:
True if playback succeeded
"""
if not HAS_PYGAME:
logger.warning("Pygame not available for audio playback")
return False
try:
            # Save to a temp file; SDL sniffs the actual format, so the .mp3
            # suffix also works for the WAV bytes from the offline fallback
            temp_path = "/tmp/tts_audio.mp3"
with open(temp_path, 'wb') as f:
f.write(audio_data)
# Load and play
pygame.mixer.music.load(temp_path)
pygame.mixer.music.play()
# Wait for completion
while pygame.mixer.music.get_busy():
pygame.time.wait(100)
return True
except Exception as e:
logger.error(f"Audio playback error: {e}")
return False
def speak_sync(self, text: str, language: str = "en",
on_complete=None) -> bool:
"""
Synchronous speech with optional callback.
Args:
text: Text to speak
language: Language code
on_complete: Callback function when done
Returns:
True if speech succeeded
"""
result = self.speak(text, language)
if on_complete:
on_complete(result)
return result
def get_voices(self) -> List[dict]:
"""Get list of available voices."""
voices = []
if self.client and HAS_GOOGLE_CLOUD:
try:
response = self.client.list_voices()
for voice in response.voices:
voices.append({
"name": voice.name,
"language": voice.language_codes,
"gender": voice.ssml_gender
})
except Exception as e:
logger.error(f"Error listing voices: {e}")
return voices
def main():
"""Test the TTS engine."""
tts = TTSEngine()
# Test English
print("Testing English TTS...")
tts.speak("Hello! I am your voice assistant.", "en")
# Test Chinese
print("Testing Chinese TTS...")
tts.speak("你好!我是你的语音助手。", "zh")
# List available voices
voices = tts.get_voices()
print(f"\nAvailable voices: {len(voices)}")
for voice in voices[:5]: # Show first 5
print(f" - {voice['name']} ({', '.join(voice['language'])})")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()
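The cloud-first, offline-fallback dispatch in `_synthesize` above generalizes to any ordered list of backends. A minimal standalone sketch of that pattern (the names below are illustrative, not part of the repo):

```python
from typing import Callable, List, Optional, Tuple

# Each backend takes (text, language) and returns audio bytes, or None if unavailable.
Backend = Callable[[str, str], Optional[bytes]]

def synthesize_with_fallback(
    text: str,
    language: str,
    backends: List[Tuple[str, Backend]],
) -> Tuple[Optional[str], Optional[bytes]]:
    """Try each backend in order; return (name, audio) for the first that succeeds."""
    for name, backend in backends:
        try:
            audio = backend(text, language)
        except Exception:
            continue  # a failing backend just falls through to the next one
        if audio:
            return name, audio
    return None, None

# Stand-ins for Google Cloud TTS (unavailable) and the espeak fallback.
cloud = lambda text, lang: None
offline = lambda text, lang: b"RIFF\x00fake-wav"

name, audio = synthesize_with_fallback(
    "hello", "en", [("cloud", cloud), ("offline", offline)])
print(name)  # offline
```

Keeping the backends in a plain ordered list makes it easy to add a third engine (say, a local neural TTS) without touching the dispatch logic.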

53
uninstall.sh Executable file

@ -0,0 +1,53 @@
#!/bin/bash
#
# Uninstall Voice Assistant
#
# AI Now Inc - Del Mar Demo Unit
#
set -e
echo "=========================================="
echo " Uninstall Voice Assistant"
echo "=========================================="
echo ""
# Confirm uninstall
read -p "Are you sure you want to uninstall? (y/N): " confirm
if [[ ! $confirm =~ ^[Yy]$ ]]; then
echo "Uninstall cancelled."
exit 0
fi
# Stop service
echo "Stopping service..."
sudo systemctl stop voice-assistant 2>/dev/null || true
sudo systemctl disable voice-assistant 2>/dev/null || true
sudo rm -f /etc/systemd/system/voice-assistant.service
# Remove installation directory
INSTALL_DIR="/home/pi/voice-assistant"
if [ -d "$INSTALL_DIR" ]; then
echo "Removing $INSTALL_DIR..."
sudo rm -rf "$INSTALL_DIR"
fi
# Remove music directory (optional)
MUSIC_DIR="/home/pi/Music"
if [ -d "$MUSIC_DIR" ]; then
read -p "Remove music directory ($MUSIC_DIR)? (y/N): " remove_music
if [[ $remove_music =~ ^[Yy]$ ]]; then
sudo rm -rf "$MUSIC_DIR"
fi
fi
# Clean up systemd
sudo systemctl daemon-reload
echo ""
echo "Uninstall complete!"
echo ""
echo "To reinstall, run:"
echo " cd /path/to/voice-assistant"
echo " sudo ./install.sh"
echo ""