openclaw-voice-assistant/music_player.py
Claw - AI Now Inc 1662bc141a Initial commit: Bilingual Voice Assistant for Google AIY Voice Kit V1
Features:
- Bilingual support (English/Mandarin Chinese)
- Hotword detection: 'Hey Osiris' / '你好 Osiris'
- Music playback control (MP3, WAV, OGG, FLAC)
- OpenClaw integration for AI responses
- Google AIY Voice Kit V1 compatible
- Text-to-speech in both languages
- Voice command recognition
- Raspberry Pi ready with installation script

AI Now Inc - Del Mar Demo Unit 🏭
2026-03-01 00:02:49 -08:00

313 lines
9.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Music Player for Google AIY Voice Kit
Supports MP3, WAV, OGG and FLAC playback with voice control.
"""
import os
import json
import logging
import random
from pathlib import Path
from typing import Optional, List, Dict
from datetime import datetime
# Optional third-party dependencies.  Each import is attempted once at module
# load; the matching HAS_* flag records whether it succeeded so the rest of
# the module can degrade instead of failing at import time.
try:
    import pygame
except ImportError:
    HAS_PYGAME = False
else:
    HAS_PYGAME = True

try:
    from mutagen.mp3 import MP3
    from mutagen.easyid3 import EasyID3
except ImportError:
    HAS_MUTAGEN = False
else:
    HAS_MUTAGEN = True

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class MusicPlayer:
    """
    Music player with voice control support.

    Wraps ``pygame.mixer.music`` for playback and tracks a small amount of
    state (current track, playlist, volume, playing/paused flags) that can be
    read back through :meth:`get_status`.  When pygame is not installed the
    playback methods return ``False`` instead of raising.
    """

    def __init__(self, config_path: str = "config.json"):
        """
        Load configuration, initialise the mixer and index the music library.

        Args:
            config_path: Path to a JSON configuration file.  Settings are
                read from its ``"music"`` section.
        """
        self.config = self._load_config(config_path)
        music_cfg = self.config.get("music", {})

        # Root directory that is scanned for audio files.
        self.music_path = Path(music_cfg.get("library_path", "/home/pi/Music"))
        # Current volume, 0.0 to 1.0.
        self.volume = music_cfg.get("default_volume", 0.7)
        # File suffixes (including the leading dot) treated as music.
        self.supported_formats = music_cfg.get(
            "supported_formats", [".mp3", ".wav", ".ogg", ".flac"]
        )

        self.current_track: Optional[Path] = None
        self.playlist: List[Path] = []
        self.playlist_index: int = 0
        self.is_playing: bool = False
        self.is_paused: bool = False

        # Initialize pygame mixer
        if HAS_PYGAME:
            pygame.mixer.init()
            pygame.mixer.music.set_volume(self.volume)

        # Scan music library
        self.music_library = self._scan_library()
        logger.info(f"MusicPlayer initialized with {len(self.music_library)} tracks")

    def _load_config(self, config_path: str) -> dict:
        """
        Load configuration from a JSON file.

        Args:
            config_path: Path to the JSON configuration file.

        Returns:
            The parsed configuration, or a default configuration when the
            file is missing, unreadable, not valid JSON, or does not contain
            a JSON object at the top level.
        """
        default_config = {"music": {"library_path": "/home/pi/Music"}}
        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(config_path, "r", encoding="utf-8") as f:
                config = json.load(f)
        except FileNotFoundError:
            return default_config
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(f"Could not load config {config_path}: {e}")
            return default_config

        if not isinstance(config, dict):
            # The rest of the class calls .get() on the configuration.
            logger.warning(f"Config {config_path} is not a JSON object")
            return default_config
        return config

    def _scan_library(self) -> Dict[str, Path]:
        """
        Scan the music library for files in a supported format.

        Track names are lower-cased file names without extension, so two
        files sharing a stem map to the same key and the later one wins.

        Returns:
            Dictionary mapping track names to file paths
        """
        library: Dict[str, Path] = {}
        if not self.music_path.exists():
            logger.warning(f"Music path {self.music_path} does not exist")
            return library

        # The file suffix is lower-cased below, so lower-case the configured
        # formats as well; otherwise a configured ".MP3" could never match.
        formats = {str(fmt).lower() for fmt in self.supported_formats}

        for root, dirs, files in os.walk(self.music_path):
            # Sorting makes the walk order (and therefore the library order
            # and which duplicate stem wins) independent of the filesystem.
            dirs.sort()
            for file in sorted(files):
                file_path = Path(root) / file
                if file_path.suffix.lower() in formats:
                    # Use filename without extension as key
                    track_name = file_path.stem.lower()
                    library[track_name] = file_path
                    logger.debug(f"Added track: {track_name}")
        return library

    def search_tracks(self, query: str) -> List[Path]:
        """
        Search for tracks matching the query.

        An exact (case-insensitive) name match returns only that track.
        Otherwise every track whose name contains the query is returned, and
        if nothing matches, the whole library is returned so that a generic
        request such as "play music" still yields something to play.

        Args:
            query: Search query (partial match)

        Returns:
            List of matching track paths, at most 10 entries
        """
        # Surrounding whitespace carries no meaning for a track name.
        query_lower = query.strip().lower()

        # Exact match first
        if query_lower in self.music_library:
            return [self.music_library[query_lower]]

        # Partial matches
        matches = [
            path
            for track_name, path in self.music_library.items()
            if query_lower in track_name
        ]

        # If no matches, return all tracks (for "play music")
        if not matches:
            matches = list(self.music_library.values())
        return matches[:10]  # Limit results

    def play(self, track_path: Optional[Path] = None) -> bool:
        """
        Play a track.

        Args:
            track_path: Path to track (None for the current playlist entry).
                A plain string path is accepted as well.

        Returns:
            True if playback started successfully
        """
        if not HAS_PYGAME:
            logger.error("Pygame not available")
            return False
        try:
            # If no track specified, use playlist
            if track_path is None:
                if self.playlist and self.playlist_index < len(self.playlist):
                    track_path = self.playlist[self.playlist_index]
                else:
                    logger.warning("No track to play")
                    return False

            if not track_path:
                logger.warning(f"Track not found: {track_path}")
                return False

            # Accept str paths too; a Path passes through unchanged.
            track_path = Path(track_path)
            if not track_path.exists():
                logger.warning(f"Track not found: {track_path}")
                return False

            logger.info(f"Playing: {track_path.name}")
            pygame.mixer.music.load(str(track_path))
            pygame.mixer.music.play()
            self.current_track = track_path
            self.is_playing = True
            self.is_paused = False
            return True
        except Exception as e:
            # Boundary with pygame: report any failure as "did not start".
            logger.error(f"Playback error: {e}")
            return False

    def pause(self) -> bool:
        """
        Pause current playback.

        Returns:
            True if the mixer was paused, False if pygame is unavailable,
            nothing is playing, or the mixer call failed.
        """
        if not HAS_PYGAME or not self.is_playing:
            return False
        try:
            pygame.mixer.music.pause()
            self.is_paused = True
            logger.info("Playback paused")
            return True
        except Exception as e:
            logger.error(f"Pause error: {e}")
            return False

    def resume(self) -> bool:
        """
        Resume paused playback.

        Returns:
            True if playback was resumed, False if pygame is unavailable,
            playback is not paused, or the mixer call failed.
        """
        if not HAS_PYGAME or not self.is_paused:
            return False
        try:
            pygame.mixer.music.unpause()
            self.is_paused = False
            logger.info("Playback resumed")
            return True
        except Exception as e:
            logger.error(f"Resume error: {e}")
            return False

    def stop(self) -> bool:
        """
        Stop playback and clear the current track.

        Returns:
            True if the mixer was stopped, False if pygame is unavailable or
            the mixer call failed.
        """
        if not HAS_PYGAME:
            return False
        try:
            pygame.mixer.music.stop()
            self.is_playing = False
            self.is_paused = False
            self.current_track = None
            logger.info("Playback stopped")
            return True
        except Exception as e:
            logger.error(f"Stop error: {e}")
            return False

    def next(self) -> bool:
        """
        Play next track in playlist, wrapping around after the last one.

        Returns:
            False when the playlist is empty, otherwise the result of
            :meth:`play`.
        """
        if not self.playlist:
            return False
        self.playlist_index = (self.playlist_index + 1) % len(self.playlist)
        return self.play()

    def previous(self) -> bool:
        """
        Play previous track in playlist, wrapping around before the first.

        Returns:
            False when the playlist is empty, otherwise the result of
            :meth:`play`.
        """
        if not self.playlist:
            return False
        self.playlist_index = (self.playlist_index - 1) % len(self.playlist)
        return self.play()

    def set_volume(self, level: float) -> bool:
        """
        Set volume level.

        Args:
            level: Volume level (0.0 to 1.0); values outside the range are
                clamped.

        Returns:
            True if the volume was applied, False if pygame is unavailable
            or the mixer call failed.
        """
        if not HAS_PYGAME:
            return False
        level = max(0.0, min(1.0, level))  # Clamp to 0-1
        try:
            pygame.mixer.music.set_volume(level)
        except Exception as e:
            # Same failure contract as play/pause/resume/stop.
            logger.error(f"Volume error: {e}")
            return False
        self.volume = level
        logger.info(f"Volume set to {level * 100:.0f}%")
        return True

    def create_playlist(self, tracks: List[Path]) -> None:
        """
        Create a playlist from tracks and reset the position to the start.

        Args:
            tracks: Track paths, in playback order.
        """
        # Copy so later changes to the caller's list do not silently alter
        # the playlist (and invalidate playlist_index).
        self.playlist = list(tracks)
        self.playlist_index = 0
        logger.info(f"Created playlist with {len(self.playlist)} tracks")

    def get_track_info(self, track_path: Path) -> Dict:
        """
        Get track metadata.

        Duration, artist and album are only filled in for ``.mp3`` files and
        only when mutagen is installed; otherwise they stay ``None``.

        Args:
            track_path: Path to track file

        Returns:
            Dictionary with track metadata
        """
        info = {
            "path": str(track_path),
            "name": track_path.stem,
            "duration": None,
            "artist": None,
            "album": None,
        }
        if HAS_MUTAGEN and track_path.exists():
            try:
                if track_path.suffix.lower() == ".mp3":
                    audio = MP3(track_path, ID3=EasyID3)
                    info["duration"] = audio.info.length
                    # The tags attribute always exists but is None for a
                    # file without a tag, so test the value, not hasattr().
                    tags = audio.tags
                    if tags is not None:
                        info["artist"] = (tags.get("artist") or [None])[0]
                        info["album"] = (tags.get("album") or [None])[0]
            except Exception as e:
                # Metadata is best-effort; keep whatever was read so far.
                logger.debug(f"Error reading metadata: {e}")
        return info

    def get_status(self) -> Dict:
        """
        Get current player status.

        Returns:
            Dictionary with the playing/paused flags, the current track's
            file name (or None), the volume and the playlist position.
        """
        return {
            "is_playing": self.is_playing,
            "is_paused": self.is_paused,
            "current_track": str(self.current_track.name) if self.current_track else None,
            "volume": self.volume,
            "playlist_length": len(self.playlist),
            "playlist_index": self.playlist_index,
        }
def main():
    """
    Test the music player.

    Builds a player from the default config, prints the library size and the
    number of hits for a sample search, then plays the first library track
    for five seconds.
    """
    import time

    player = MusicPlayer()

    # Print library stats
    print(f"Music library: {len(player.music_library)} tracks")

    # Test search
    query = "test"
    matches = player.search_tracks(query)
    print(f"Search '{query}': {len(matches)} matches")

    # Test playback
    if not player.music_library:
        return
    first_track = next(iter(player.music_library.values()))
    print(f"Playing: {first_track.name}")
    # Only wait when playback actually started; otherwise the five second
    # pause is dead time.
    if player.play(first_track):
        try:
            time.sleep(5)
        finally:
            # Stop even if the wait is interrupted (e.g. Ctrl+C).
            player.stop()
# Script entry point: when run directly (not imported), enable INFO-level
# logging and run the smoke test.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()