diff --git a/voicebot/bots/minimal.py b/voicebot/bots/minimal.py index 2401568..699a001 100644 --- a/voicebot/bots/minimal.py +++ b/voicebot/bots/minimal.py @@ -11,8 +11,9 @@ import time from av.audio.frame import AudioFrame from av import VideoFrame from aiortc import MediaStreamTrack -from typing import Dict, Optional, Tuple, Any +from typing import Awaitable, Callable, Dict, Optional, Tuple, Any, Union from shared.logger import logger +from shared.models import ChatMessageModel # Global registry to store active tracks by session @@ -495,6 +496,15 @@ def create_agent_tracks(session_name: str) -> Dict[str, MediaStreamTrack]: return create_minimal_bot_tracks(session_name) + +async def handle_chat_message( + chat_message: ChatMessageModel, + send_message_func: Callable[[Union[str, ChatMessageModel]], Awaitable[None]] +) -> Optional[str]: + """Handle chat messages.""" + logger.info(f"Received chat message: {chat_message.message}") + return None + def handle_config_update(lobby_id: str, config_values: Dict[str, Any]) -> bool: """ Handle runtime configuration updates for the minimal bot. diff --git a/voicebot/bots/vibevoice.py b/voicebot/bots/vibevoice.py index 92eabd7..c34d9c6 100644 --- a/voicebot/bots/vibevoice.py +++ b/voicebot/bots/vibevoice.py @@ -1,111 +1,39 @@ -#!/usr/bin/env python3 """ -VibeVoice Text-to-Speech Bot for Voicebot Framework +VibeVoice WebRTC Bot - Text-to-Speech WebRTC Agent -Integrates Microsoft's VibeVoice TTS with the voicebot framework. -Watches for chat messages and converts them to speech with text display. +A WebRTC bot that converts incoming text messages to speech using VibeVoice +and streams the generated audio in real-time. """ -import threading -import queue -import time +import secrets import numpy as np import cv2 -import math -from typing import Dict, Optional, Any, Tuple, Callable, Awaitable, Union +import fractions +import time +import threading +from queue import Queue, Empty +from typing import Awaitable, Callable, Dict, Optional, Tuple, Any, Union +import torch +import librosa +import soundfile as sf from av.audio.frame import AudioFrame from av import VideoFrame from aiortc import MediaStreamTrack -import fractions -import torch -import librosa -import os + +from vibevoice.modular.modeling_vibevoice_inference import VibeVoiceForConditionalGenerationInference +from vibevoice.processor.vibevoice_processor import VibeVoiceProcessor +from vibevoice.modular.streamer import AudioStreamer +from transformers.utils import logging + from shared.logger import logger from shared.models import ChatMessageModel -# Implement a local WaveformVideoTrack-like helper to hold shared waveform buffers -# and lightweight speech status per session. This avoids depending on bots.whisper -class WaveformVideoTrack: - """Lightweight shared storage for waveform visualization and speech status. - - This class is not itself a MediaStreamTrack; it's used as a shared in-memory - store that video tracks in this file will read from to render waveforms and - status overlays. - """ - - # session_name -> np.ndarray(float32) containing recent audio samples (mono) - buffer: Dict[str, np.ndarray] = {} - - # session_name -> dict with status flags (is_speech, energy, is_processing, is_playing, etc.) - speech_status: Dict[str, Dict[str, Any]] = {} - - # session_name -> sample_rate used for that buffer - sample_rates: Dict[str, int] = {} - - -# Proxy wrapper for AudioStreamer to log put() calls and basic stats without -# modifying upstream VibeVoice internals. 
We'll wrap any created AudioStreamer -# with this to capture whether model.generate() actually calls put(). -class ProxyAudioStreamer: - def __init__(self, real_streamer, session_name: Optional[str] = None): - self._real = real_streamer - self.session_name = session_name or "unknown" - self.put_calls = 0 - self.total_samples = 0 - - def put(self, audio_chunk, *args, **kwargs): - # Try to measure number of samples in the chunk for diagnostics - try: - if torch.is_tensor(audio_chunk): - length = int(audio_chunk.numel()) - else: - arr = np.array(audio_chunk) - length = int(arr.size) - except Exception: - length = -1 - - try: - # Inspect possible sample_indices positional argument for diagnostics - si_info = None - if len(args) >= 1: - try: - si = args[0] - if torch.is_tensor(si): - si_info = f"tensor(shape={tuple(si.shape)}, min={int(torch.min(si).item())}, max={int(torch.max(si).item())}, unique={int(len(torch.unique(si)))} )" - else: - arrsi = np.array(si) - si_info = f"array(shape={arrsi.shape}, min={int(arrsi.min()) if arrsi.size>0 else -1}, max={int(arrsi.max()) if arrsi.size>0 else -1}, unique={int(len(np.unique(arrsi))) if arrsi.size>0 else 0})" - except Exception: - si_info = str(type(args[0])) - - logger.info(f"VibeVoice audio: ProxyAudioStreamer.put called for session {self.session_name} - samples={length} sample_indices={si_info}") - except Exception: - pass - - self.put_calls += 1 - if length > 0: - self.total_samples += length - - return getattr(self._real, 'put')(audio_chunk, *args, **kwargs) - - def get_stream(self, *args, **kwargs): - return getattr(self._real, 'get_stream')(*args, **kwargs) - - def end(self, *args, **kwargs): - return getattr(self._real, 'end')(*args, **kwargs) - - def __getattr__(self, name): - return getattr(self._real, name) - - -# Import VibeVoice components -try: - from vibevoice import VibeVoiceForConditionalGenerationInference, VibeVoiceProcessor - from vibevoice.modular.streamer import AudioStreamer -except Exception as e: - logger.warning("VibeVoice not available. 
Install with: git clone https://github.com/microsoft/VibeVoice.git && cd VibeVoice && pip install -e .") - raise e +# Configure logging +logging.set_verbosity_info() +tts_logger = logging.get_logger(__name__) +# Global registry to store active tracks by session +_active_tracks: Dict[str, Dict[str, Any]] = {} class MediaClock: @@ -118,88 +46,78 @@ class MediaClock: return time.perf_counter() - self.t0 -class VibeVoiceTTS: - """Minimal VibeVoice Text-to-Speech wrapper.""" - - def __init__(self, device: str = "cpu", inference_steps: int = 10, config: Optional[Dict[str, Any]] = None): +class VibeVoiceWebRTCProcessor: + """VibeVoice processor adapted for WebRTC streaming.""" + + def __init__(self, model_path: str, device: str = "cuda", inference_steps: int = 5): + """Initialize the VibeVoice processor for WebRTC.""" + self.model_path = model_path self.device = device self.inference_steps = inference_steps - self.config = config or {} - self.model = None - self.processor = None - self.sample_rate = 24000 # VibeVoice uses 24kHz - self.is_initialized = False - self.voice_presets = {} - self.available_voices = {} - + self.is_loaded = False + self.audio_queue = Queue() + self.is_generating = False + self.load_model() + self.setup_voice_presets() + + def load_model(self): + """Load the VibeVoice model and processor.""" try: - self._initialize_model() - self._setup_voice_presets() - except Exception as e: - logger.error(f"Failed to initialize VibeVoice: {e}") - - def _initialize_model(self): - """Initialize the VibeVoice model with robust device handling.""" - try: - logger.info("Loading VibeVoice model...") - - # Normalize potential 'mpx' + logger.info(f"Loading VibeVoice model from {self.model_path}") + + # Normalize device if self.device.lower() == "mpx": - logger.info("Note: device 'mpx' detected, treating it as 'mps'.") self.device = "mps" if self.device == "mps" and not torch.backends.mps.is_available(): - logger.warning("Warning: MPS not available. Falling back to CPU.") + logger.warning("MPS not available. 
Falling back to CPU.") self.device = "cpu" - - logger.info(f"Using device: {self.device}") - + # Load processor - self.processor = VibeVoiceProcessor.from_pretrained("vibevoice/VibeVoice-1.5B") - - # Decide dtype & attention + self.processor = VibeVoiceProcessor.from_pretrained(self.model_path) + + # Determine dtype and attention implementation if self.device == "mps": load_dtype = torch.float32 - attn_impl_primary = "sdpa" + attn_impl = "sdpa" elif self.device == "cuda": load_dtype = torch.bfloat16 - attn_impl_primary = "flash_attention_2" + attn_impl = "flash_attention_2" else: load_dtype = torch.float32 - attn_impl_primary = "sdpa" - - logger.info(f"Using device: {self.device}, torch_dtype: {load_dtype}, attn_implementation: {attn_impl_primary}") - - # Load model + attn_impl = "sdpa" + + logger.info(f"Using device: {self.device}, dtype: {load_dtype}, attention: {attn_impl}") + + # Load model with fallback try: if self.device == "mps": self.model = VibeVoiceForConditionalGenerationInference.from_pretrained( - "vibevoice/VibeVoice-1.5B", + self.model_path, torch_dtype=load_dtype, - attn_implementation=attn_impl_primary, + attn_implementation=attn_impl, device_map=None, ) self.model.to("mps") elif self.device == "cuda": self.model = VibeVoiceForConditionalGenerationInference.from_pretrained( - "vibevoice/VibeVoice-1.5B", + self.model_path, torch_dtype=load_dtype, device_map="cuda", - attn_implementation=attn_impl_primary, + attn_implementation=attn_impl, ) else: self.model = VibeVoiceForConditionalGenerationInference.from_pretrained( - "vibevoice/VibeVoice-1.5B", + self.model_path, torch_dtype=load_dtype, device_map="cpu", - attn_implementation=attn_impl_primary, + attn_implementation=attn_impl, ) except Exception as e: - if attn_impl_primary == 'flash_attention_2': - logger.warning(f"Error with flash_attention_2: {e}") - logger.info("Falling back to attention implementation: sdpa") + if attn_impl == 'flash_attention_2': + logger.warning(f"Flash attention failed: {e}, falling back to SDPA") fallback_attn = "sdpa" self.model = VibeVoiceForConditionalGenerationInference.from_pretrained( - "vibevoice/VibeVoice-1.5B", + self.model_path, torch_dtype=load_dtype, device_map=(self.device if self.device in ("cuda", "cpu") else None), attn_implementation=fallback_attn, @@ -208,1134 +126,459 @@ class VibeVoiceTTS: self.model.to("mps") else: raise e - + self.model.eval() - - # Use SDE solver by default + + # Configure noise scheduler self.model.model.noise_scheduler = self.model.model.noise_scheduler.from_config( - self.model.model.noise_scheduler.config, + self.model.model.noise_scheduler.config, algorithm_type='sde-dpmsolver++', beta_schedule='squaredcos_cap_v2' ) self.model.set_ddpm_inference_steps(num_steps=self.inference_steps) - - if hasattr(self.model.model, 'language_model'): - logger.info(f"Language model attention: {self.model.model.language_model.config._attn_implementation}") - - self.is_initialized = True - logger.info("VibeVoice model loaded successfully!") - + + self.is_loaded = True + logger.info("VibeVoice model loaded successfully") + except Exception as e: - logger.error(f"Error loading VibeVoice model: {e}") - raise - - def _setup_voice_presets(self): + logger.error(f"Failed to load VibeVoice model: {e}") + self.is_loaded = False + + def setup_voice_presets(self): """Setup voice presets by scanning the voices directory.""" - # Look for voices directory in multiple possible locations - possible_voice_dirs = [ - os.path.join(os.path.dirname(__file__), "voices"), # 
/voicebot/bots/voices/ - os.path.join(os.path.dirname(__file__), "..", "VibeVoice", "demo", "voices"), # /voicebot/VibeVoice/demo/voices/ - "/voicebot/VibeVoice/demo/voices", # Absolute path - ] + import os + voices_dir = os.path.join(os.path.dirname(__file__), "voices") - voices_dir = None - for possible_dir in possible_voice_dirs: - if os.path.exists(possible_dir): - voices_dir = possible_dir - break + self.voice_presets = {} + self.available_voices = {} - # Check if voices directory exists - if not voices_dir: - logger.warning(f"Warning: Voices directory not found in any of: {possible_voice_dirs}") - self.voice_presets = {} - self.available_voices = {} - self.speaker_mapping = {} + if not os.path.exists(voices_dir): + logger.warning(f"Voices directory not found at {voices_dir}") + # Create default fallback voices + self.available_voices = { + 'default_female': None, + 'default_male': None + } return - # Scan for all WAV files in the voices directory - self.voice_presets = {} - - # Get all supported audio files + # Scan for audio files audio_extensions = ('.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac') - audio_files = [f for f in os.listdir(voices_dir) - if f.lower().endswith(audio_extensions) and os.path.isfile(os.path.join(voices_dir, f))] - - # Create dictionary with filename (without extension) as key - for audio_file in audio_files: - # Remove extension to get the name - name = os.path.splitext(audio_file)[0] - # Create full path - full_path = os.path.join(voices_dir, audio_file) - self.voice_presets[name] = full_path - - # Sort the voice presets alphabetically by name for better UI - self.voice_presets = dict(sorted(self.voice_presets.items())) - - # Filter out voices that don't exist (this is now redundant but kept for safety) + for filename in os.listdir(voices_dir): + if filename.lower().endswith(audio_extensions): + name = os.path.splitext(filename)[0] + full_path = os.path.join(voices_dir, filename) + if os.path.isfile(full_path): + self.voice_presets[name] = full_path + self.available_voices = { name: path for name, path in self.voice_presets.items() if os.path.exists(path) } - - # Map speaker numbers (1, 2, 3, 4) to available voice files - self.speaker_mapping = {} - available_voice_names = list(self.available_voices.keys()) - for i in range(1, 5): # Support speakers 1-4 - if i <= len(available_voice_names): - voice_name = available_voice_names[i-1] # 0-indexed - self.speaker_mapping[str(i)] = voice_name - logger.info(f"Mapped Speaker {i} to voice '{voice_name}'") - else: - logger.warning(f"No voice file available for Speaker {i}") - - if not self.available_voices: - logger.warning("No voice presets found. 
Please add audio files to the voices directory.") + + if self.available_voices: + logger.info(f"Found {len(self.available_voices)} voice presets: {list(self.available_voices.keys())}") else: - logger.info(f"Found {len(self.available_voices)} voice files in {voices_dir}") - logger.info(f"Available voices: {', '.join(self.available_voices.keys())}") - logger.info(f"Speaker mapping: {self.speaker_mapping}") - + logger.warning("No voice presets found") + def read_audio(self, audio_path: str, target_sr: int = 24000) -> np.ndarray: """Read and preprocess audio file.""" try: - import soundfile as sf + if audio_path is None: + # Return a simple sine wave as fallback + duration = 1.0 # 1 second + t = np.linspace(0, duration, int(target_sr * duration)) + return np.sin(2 * np.pi * 440 * t).astype(np.float32) + wav, sr = sf.read(audio_path) if len(wav.shape) > 1: wav = np.mean(wav, axis=1) if sr != target_sr: wav = librosa.resample(wav, orig_sr=sr, target_sr=target_sr) - return wav + return wav.astype(np.float32) except Exception as e: logger.error(f"Error reading audio {audio_path}: {e}") - return np.array([]) - - def generate_speech(self, text: str, speaker: str = "1", cfg_scale: float = 1.3) -> Optional[np.ndarray]: - """Generate speech using the AudioStreamer and return a single concatenated numpy array. - - This removes the old synchronous model.generate path and uses the streamer-based - generation even for blocking calls. Returns None if generation isn't possible. - """ - # Must have model initialized and streamer available - if not self.is_initialized: - logger.error("VibeVoice TTS: Model not initialized - cannot generate speech synchronously") - return None - + # Return silence as fallback + return np.zeros(target_sr, dtype=np.float32) + + async def generate_speech_async(self, text: str, voice_name: str = None, cfg_scale: float = 1.3): + """Generate speech asynchronously and put audio chunks in queue.""" + if not self.is_loaded: + logger.error("Model not loaded, cannot generate speech") + return + + if self.is_generating: + logger.warning("Already generating speech, skipping new request") + return + + self.is_generating = True + try: - # Prepare formatted text and voice samples (same as demo) - formatted_text = f"Speaker {speaker}: {text}" - voice_samples = [] - if speaker in self.speaker_mapping: - voice_name = self.speaker_mapping[speaker] - if voice_name in self.available_voices: - audio_path = self.available_voices[voice_name] - audio_data = self.read_audio(audio_path) - if len(audio_data) > 0: - voice_samples.append(audio_data) - else: - voice_samples.append([]) - else: - voice_samples.append([]) + # Select voice + if voice_name and voice_name in self.available_voices: + voice_path = self.available_voices[voice_name] else: - voice_samples.append([]) - - inputs = self.processor( # type: ignore - text=[formatted_text], + # Use first available voice or fallback + if self.available_voices: + voice_path = list(self.available_voices.values())[0] + voice_name = list(self.available_voices.keys())[0] + else: + voice_path = None + voice_name = "default" + + logger.info(f"Generating speech for text: '{text[:50]}...' 
using voice: {voice_name}") + + # Load voice sample + voice_sample = self.read_audio(voice_path) + voice_samples = [voice_sample] + + # Format script + formatted_script = f"Speaker 0: {text}" + + # Process input + inputs = self.processor( + text=[formatted_script], voice_samples=[voice_samples], padding=True, - return_tensors="pt" + return_tensors="pt", + return_attention_mask=True, ) - - # Move tensors to device + + # Move to device target_device = self.device if self.device in ("cuda", "mps") else "cpu" for k, v in inputs.items(): if torch.is_tensor(v): inputs[k] = v.to(target_device) - - # Create streamer and run generation - real_streamer = AudioStreamer(batch_size=1, stop_signal=None, timeout=None) - audio_streamer = ProxyAudioStreamer(real_streamer, session_name=self.session_name) - - with torch.no_grad(): + + # Create audio streamer + audio_streamer = AudioStreamer(batch_size=1, stop_signal=None, timeout=None) + + # Run generation in thread + def generate_worker(): try: - self.model.generate( # type: ignore + outputs = self.model.generate( **inputs, max_new_tokens=None, cfg_scale=cfg_scale, - tokenizer=self.processor.tokenizer, # type: ignore + tokenizer=self.processor.tokenizer, generation_config={'do_sample': False}, + audio_streamer=audio_streamer, verbose=False, - streamer=audio_streamer, + refresh_negative=True, ) - finally: - # ensure streamer end if model.generate returns - try: - audio_streamer.end() - except Exception: - pass - - # Collect streamed chunks - collected = [] - for audio_chunk in audio_streamer.get_stream(0): - try: - if torch.is_tensor(audio_chunk): - if audio_chunk.dtype == torch.bfloat16: - audio_chunk = audio_chunk.float() - audio_np = audio_chunk.cpu().numpy().astype(np.float32) - else: - audio_np = np.array(audio_chunk, dtype=np.float32) - - if audio_np.ndim > 1: - audio_np = audio_np.squeeze() - - collected.append(audio_np) except Exception as e: - logger.error(f"VibeVoice TTS: Error collecting chunk: {e}") - - if not collected: - logger.error("VibeVoice TTS: No audio chunks received from streamer") - return None - - audio = np.concatenate(collected) - - # Mix with background noise if enabled - noise_type = self.config.get('background_noise_type', 'none') - noise_volume = self.config.get('background_noise_volume', 0.0) - audio = self.mix_audio_with_background_noise(audio, noise_type, noise_volume) - - # Resample to 16kHz for compatibility with existing audio pipeline - audio_16k = librosa.resample(audio, orig_sr=24000, target_sr=16000) - return audio_16k.astype(np.float32) - - except Exception as e: - logger.error(f"VibeVoice TTS: Error generating speech via streamer: {e}") - return None - - def generate_background_noise(self, duration_seconds: float, noise_type: str = "white", volume: float = 0.01, sample_rate: Optional[int] = None) -> np.ndarray: - """Generate background noise of specified type and duration.""" - if sample_rate is None: - sample_rate = self.sample_rate + logger.error(f"Error in generation: {e}") + audio_streamer.end() - if noise_type == "none": - return np.zeros(int(duration_seconds * sample_rate), dtype=np.float32) - - num_samples = int(duration_seconds * sample_rate) - - if noise_type == "white": - # White noise - equal power across all frequencies - noise = np.random.normal(0, 1, num_samples).astype(np.float32) - elif noise_type == "pink": - # Pink noise - 1/f frequency response (approximated) - white = np.random.normal(0, 1, num_samples).astype(np.float32) - # Simple pink noise approximation using IIR filter - b = [0.049922035, 
-0.095993537, 0.050612699, -0.004408786] - a = [1, -2.494956002, 2.017265875, -0.522189400] - noise = np.zeros_like(white) - for i in range(len(b), len(white)): - noise[i] = b[0] * white[i] + b[1] * white[i-1] + b[2] * white[i-2] + b[3] * white[i-3] - a[1] * noise[i-1] - a[2] * noise[i-2] - a[3] * noise[i-3] - elif noise_type == "brown": - # Brown noise - 1/f² frequency response (integrated white noise) - white = np.random.normal(0, 1, num_samples).astype(np.float32) - noise = np.cumsum(white) - # Normalize to prevent drift - noise = (noise - np.mean(noise)) / np.std(noise) - else: - # Default to white noise - noise = np.random.normal(0, 1, num_samples).astype(np.float32) - - # Apply volume - noise *= volume - return noise - - def mix_audio_with_background_noise(self, audio: np.ndarray, noise_type: str = "white", volume: float = 0.01) -> np.ndarray: - """Mix generated audio with background noise.""" - # Default to disabled when not present in config to avoid unexpected noise - if not self.config.get('background_noise_enabled', False): - return audio - - # Generate background noise for the duration of the audio using the TTS sample rate - duration_seconds = len(audio) / self.sample_rate - background_noise = self.generate_background_noise(duration_seconds, noise_type, volume, self.sample_rate) - - # Mix audio with background noise - mixed_audio = audio + background_noise - - # Normalize to prevent clipping - max_val = np.max(np.abs(mixed_audio)) - if max_val > 1.0: - mixed_audio /= max_val - - return mixed_audio + # Start generation + generation_thread = threading.Thread(target=generate_worker) + generation_thread.start() + + # Process audio chunks + audio_stream = audio_streamer.get_stream(0) + + for audio_chunk in audio_stream: + if torch.is_tensor(audio_chunk): + if audio_chunk.dtype == torch.bfloat16: + audio_chunk = audio_chunk.float() + audio_np = audio_chunk.cpu().numpy().astype(np.float32) + else: + audio_np = np.array(audio_chunk, dtype=np.float32) + + # Ensure 1D + if len(audio_np.shape) > 1: + audio_np = audio_np.squeeze() + + # Put in queue for audio track + self.audio_queue.put(audio_np) + + # Wait for generation to complete + generation_thread.join(timeout=30.0) + + # Signal end of audio + self.audio_queue.put(None) # End marker + + logger.info("Speech generation completed") + + except Exception as e: + logger.error(f"Error generating speech: {e}") + finally: + self.is_generating = False -class VibeVoiceVideoTrack(MediaStreamTrack): - """Video track that displays text being spoken.""" - - kind = "video" - - def __init__(self, clock, config: Dict[str, Any], session_name: Optional[str] = None): +class VibeVoiceAudioTrack(MediaStreamTrack): + """Audio track that streams VibeVoice generated speech.""" + + kind = "audio" + + def __init__(self, clock: MediaClock, config: Dict[str, Any], tts_processor: VibeVoiceWebRTCProcessor): + """Initialize the VibeVoice audio track.""" super().__init__() self.clock = clock self.config = config - # Keep session_name for looking up waveform buffers and status - self.session_name = session_name or config.get('session_name') or f"VibeVoice:{int(time.time())}" - self.width = config.get('width', 640) - self.height = config.get('height', 480) - self.fps = config.get('fps', 15) - - # Text display state - self.current_text = "" - self.text_queue = queue.Queue() - self.display_start_time = 0 - self.display_duration = 3.0 # seconds to display each text - self.frame_count = 0 - - # Font settings - self.font = cv2.FONT_HERSHEY_SIMPLEX - self.font_scale = 
min(self.width, self.height) / 800 - self.font_thickness = max(1, int(self.font_scale * 2)) - - def update_text(self, text: str): - """Update the text to display.""" - self.text_queue.put(text) - logger.info(f"VibeVoice video: Queued text '{text}'") - + self.tts_processor = tts_processor + self.sample_rate = config.get('sample_rate', 24000) # VibeVoice uses 24kHz + self.samples_per_frame = config.get('samples_per_frame', 480) # 20ms at 24kHz + self._samples_generated: int = 0 + + # Audio buffer for TTS audio + self.audio_buffer = np.array([], dtype=np.float32) + self.buffer_lock = threading.Lock() + + # Fallback tone parameters + self.frequency = config.get('frequency', 440.0) + self.volume = config.get('volume', 0.1) # Lower default volume + self.mode = config.get('audio_mode', 'silence') # Default to silence when no TTS + def update_config(self, config_updates: Dict[str, Any]) -> bool: - """Update video configuration.""" + """Update the audio track configuration.""" try: self.config.update(config_updates) + + if 'sample_rate' in config_updates: + self.sample_rate = config_updates['sample_rate'] + if 'samples_per_frame' in config_updates: + self.samples_per_frame = config_updates['samples_per_frame'] + if 'frequency' in config_updates: + self.frequency = config_updates['frequency'] + if 'volume' in config_updates: + self.volume = config_updates['volume'] + if 'audio_mode' in config_updates: + self.mode = config_updates['audio_mode'] + + logger.info(f"VibeVoice audio track configuration updated: {config_updates}") + return True + except Exception as e: + logger.error(f"Error updating VibeVoice audio track configuration: {e}") + return False + + def add_tts_audio(self, audio_chunk: np.ndarray): + """Add TTS generated audio to the buffer.""" + with self.buffer_lock: + self.audio_buffer = np.append(self.audio_buffer, audio_chunk) + + def _consume_audio_buffer(self, num_samples: int) -> np.ndarray: + """Consume audio samples from the buffer.""" + with self.buffer_lock: + if len(self.audio_buffer) >= num_samples: + samples = self.audio_buffer[:num_samples].copy() + self.audio_buffer = self.audio_buffer[num_samples:] + return samples + elif len(self.audio_buffer) > 0: + # Return what we have and pad with silence + samples = self.audio_buffer.copy() + self.audio_buffer = np.array([], dtype=np.float32) + padding = np.zeros(num_samples - len(samples), dtype=np.float32) + return np.concatenate([samples, padding]) + else: + return np.zeros(num_samples, dtype=np.float32) + + async def next_timestamp(self) -> Tuple[int, float]: + pts = self._samples_generated + time_base = 1 / self.sample_rate + return pts, time_base + + async def recv(self) -> AudioFrame: + pts, time_base = await self.next_timestamp() + + # Check for TTS audio in queue and add to buffer + while True: + try: + audio_chunk = self.tts_processor.audio_queue.get_nowait() + if audio_chunk is None: + # End marker + break + self.add_tts_audio(audio_chunk) + except Empty: + break + + # Try to get audio from TTS buffer first + samples = self._consume_audio_buffer(self.samples_per_frame) + + # If no TTS audio available, generate fallback audio based on mode + if np.all(samples == 0) and self.mode != 'silence': + if self.mode == 'tone': + samples = self._generate_tone() + elif self.mode == 'noise': + samples = self._generate_noise() + + # Convert to WebRTC format (16-bit stereo) + # Resample from 24kHz to target sample rate if needed + if self.sample_rate != 24000: + samples = librosa.resample(samples, orig_sr=24000, target_sr=self.sample_rate) + 
+ # Convert to 16-bit + left = (samples * self.volume * 32767).astype(np.int16) + right = left.copy() + + # Interleave channels for stereo + interleaved = np.empty(len(left) * 2, dtype=np.int16) + interleaved[0::2] = left + interleaved[1::2] = right + + stereo = interleaved.reshape(1, -1) + + frame = AudioFrame.from_ndarray(stereo, format="s16", layout="stereo") + frame.sample_rate = self.sample_rate + frame.pts = pts + frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000) + + self._samples_generated += self.samples_per_frame + return frame + + def _generate_tone(self) -> np.ndarray: + """Generate sine wave tone as fallback.""" + t = (np.arange(self.samples_per_frame) + self._samples_generated) / self.sample_rate + return np.sin(2 * np.pi * self.frequency * t).astype(np.float32) + + def _generate_noise(self) -> np.ndarray: + """Generate white noise as fallback.""" + return np.random.uniform(-1, 1, self.samples_per_frame).astype(np.float32) + + +class ConfigurableVideoTrack(MediaStreamTrack): + """Configurable video track with different visualization modes""" + + kind = "video" + + def __init__(self, clock: MediaClock, config: Dict[str, Any]): + """Initialize the configurable video track.""" + super().__init__() + self.clock = clock + self.config = config + self.width = config.get('width', 320) + self.height = config.get('height', 240) + self.fps = config.get('fps', 15) + self.mode = config.get('visualization', 'ball') + self.frame_count = 0 + + # Initialize ball attributes + self.ball_x = self.width // 2 + self.ball_y = self.height // 2 + self.ball_dx = 2 + self.ball_dy = 2 + self.ball_radius = 20 + + def update_config(self, config_updates: Dict[str, Any]) -> bool: + """Update the video track configuration dynamically.""" + try: + old_mode = self.mode + old_width = self.width + old_height = self.height + + self.config.update(config_updates) + if 'width' in config_updates: self.width = config_updates['width'] if 'height' in config_updates: self.height = config_updates['height'] if 'fps' in config_updates: self.fps = config_updates['fps'] + if 'visualization' in config_updates: + self.mode = config_updates['visualization'] + + if self.mode != old_mode: + self._initialize_mode_state() + logger.info(f"Video mode changed from {old_mode} to {self.mode}") + + if self.width != old_width or self.height != old_height: + self._initialize_ball_state() + + logger.info(f"Video track configuration updated: {config_updates}") return True + except Exception as e: - logger.error(f"Error updating video config: {e}") + logger.error(f"Error updating video track configuration: {e}") return False + def _initialize_mode_state(self): + """Initialize state specific to the current visualization mode.""" + self._initialize_ball_state() + + def _initialize_ball_state(self): + """Initialize bouncing ball state.""" + self.ball_x = self.width // 2 + self.ball_y = self.height // 2 + self.ball_dx = 2 + self.ball_dy = 2 + self.ball_radius = min(self.width, self.height) * 0.06 + async def next_timestamp(self) -> Tuple[int, float]: - """Get next timestamp for video frame.""" pts = int(self.frame_count * (90000 / self.fps)) time_base = 1 / 90000 return pts, time_base async def recv(self) -> VideoFrame: - """Generate video frame with current text.""" - # Update current text if needed - current_time = time.time() - if (not self.current_text or - current_time - self.display_start_time > self.display_duration): - try: - self.current_text = self.text_queue.get_nowait() - self.display_start_time = current_time - 
logger.info(f"VibeVoice video: Displaying '{self.current_text}'") - except queue.Empty: - self.current_text = "" - # Create frame - frame = np.zeros((self.height, self.width, 3), dtype=np.uint8) + pts, time_base = await self.next_timestamp() - if self.current_text: - # Add background - cv2.rectangle(frame, (0, 0), (self.width, self.height), (0, 0, 0), -1) - - # Split text into lines if too long - words = self.current_text.split() - lines = [] - current_line = "" - max_chars_per_line = int(self.width / (self.font_scale * 20)) - - for word in words: - if len(current_line + " " + word) <= max_chars_per_line: - current_line += " " + word if current_line else word - else: - if current_line: - lines.append(current_line) - current_line = word - if current_line: - lines.append(current_line) - - # Draw text lines - line_height = int(self.font_scale * 40) - total_text_height = len(lines) * line_height - start_y = (self.height - total_text_height) // 2 + line_height - - for i, line in enumerate(lines): - text_size = cv2.getTextSize(line, self.font, self.font_scale, self.font_thickness)[0] - text_x = (self.width - text_size[0]) // 2 - text_y = start_y + i * line_height - - # Add text shadow - cv2.putText(frame, line, (text_x + 2, text_y + 2), - self.font, self.font_scale, (0, 0, 0), self.font_thickness + 1) - # Add main text - cv2.putText(frame, line, (text_x, text_y), - self.font, self.font_scale, (255, 255, 255), self.font_thickness) + # Create frame based on mode + if self.mode == 'ball': + frame_array = self._generate_ball_frame() + elif self.mode == 'waveform': + frame_array = self._generate_waveform_frame() + elif self.mode == 'static': + frame_array = self._generate_static_frame() else: - # Default background when no text - cv2.putText(frame, "VibeVoice TTS", (50, self.height // 2), - self.font, self.font_scale * 2, (255, 255, 255), self.font_thickness) + frame_array = self._generate_ball_frame() - # Draw waveform and status overlays from shared WaveformVideoTrack buffers - try: - pname = self.session_name - buf = WaveformVideoTrack.buffer.get(pname, None) - status = WaveformVideoTrack.speech_status.get(pname, {}) - - # Draw small status box in top-left - status_text = "Idle" - if status.get('is_processing'): - status_text = "Processing..." 
- elif status.get('is_speech'): - status_text = "Speaking" - elif buf is not None and len(buf) > 0: - # buffered seconds approx - sr = WaveformVideoTrack.sample_rates.get(pname, self.config.get('sample_rate', 16000)) - buffered_sec = len(buf) / float(sr) if sr > 0 else 0.0 - status_text = f"Buffered: {buffered_sec:.1f}s" - - box_w = int(self.width * 0.28) - box_h = int(self.height * 0.12) - cv2.rectangle(frame, (10, 10), (10 + box_w, 10 + box_h), (50, 50, 50), -1) - cv2.putText(frame, status_text, (20, 10 + int(box_h/2)), self.font, self.font_scale, (200, 200, 200), self.font_thickness) - - # Draw small energy meter - energy = status.get('energy', 0.0) - meter_h = int(box_h * 0.4) - meter_w = int(box_w * 0.6) - mx = 20 - my = 10 + box_h - 5 - filled = int(min(1.0, energy * 50.0) * meter_w) - cv2.rectangle(frame, (mx, my - meter_h), (mx + meter_w, my), (80, 80, 80), -1) - cv2.rectangle(frame, (mx, my - meter_h), (mx + filled, my), (0, 200, 0), -1) - - # Draw waveform at bottom area - if buf is not None and buf.size > 4: - sr = WaveformVideoTrack.sample_rates.get(pname, self.config.get('sample_rate', 16000)) - # Use last N samples corresponding to width pixels - samples_to_show = min(buf.size, max(1, int(sr * 5))) # show up to last 5s - slice_buf = buf[-samples_to_show:] - - # Downsample to width points - idx = (np.linspace(0, samples_to_show - 1, num=self.width)).astype(np.int32) - waveform = slice_buf[idx] - # Normalize waveform to -1..1 - maxv = np.max(np.abs(waveform)) if waveform.size > 0 else 1.0 - if maxv <= 0: - maxv = 1.0 - waveform = waveform / maxv - - # Map to pixel coordinates in bottom strip - wf_h = int(self.height * 0.22) - wf_y0 = self.height - wf_h - 10 - pts = [] - for i, v in enumerate(waveform): - px = int(i * (self.width / len(waveform))) - py = int(wf_y0 + (wf_h / 2) * (1 - v)) - pts.append((px, py)) - - if len(pts) >= 2: - cv2.polylines(frame, [np.array(pts, dtype=np.int32)], False, (100, 200, 255), 1) - # Fill under curve for nicer look - fill_pts = pts + [(self.width - 1, wf_y0 + wf_h), (0, wf_y0 + wf_h)] - cv2.fillPoly(frame, [np.array(fill_pts, dtype=np.int32)], (30, 60, 80)) - except Exception: - # Non-critical rendering failure shouldn't break video - pass + frame = VideoFrame.from_ndarray(frame_array, format="bgr24") + frame.pts = pts + frame.time_base = fractions.Fraction(time_base).limit_denominator(1000000) self.frame_count += 1 - return VideoFrame.from_ndarray(frame, format="bgr24") + return frame + def _generate_ball_frame(self) -> Any: + """Generate bouncing ball visualization""" + frame = np.zeros((self.height, self.width, 3), dtype=np.uint8) -class VibeVoiceAudioTrack(MediaStreamTrack): - """Audio track that plays TTS speech.""" + # Update ball position + self.ball_x += self.ball_dx + self.ball_y += self.ball_dy - kind = "audio" + # Bounce off walls + if self.ball_x <= self.ball_radius or self.ball_x >= self.width - self.ball_radius: + self.ball_dx = -self.ball_dx + if self.ball_y <= self.ball_radius or self.ball_y >= self.height - self.ball_radius: + self.ball_dy = -self.ball_dy - def __init__(self, clock, config: Dict[str, Any], tts_engine: VibeVoiceTTS, session_name: Optional[str] = None): - super().__init__() - self.clock = clock - self.config = config - self.tts = tts_engine - self.sample_rate = config.get('sample_rate', 16000) - self.samples_per_frame = config.get('samples_per_frame', 960) # 60ms at 16kHz + # Draw ball + cv2.circle(frame, (int(self.ball_x), int(self.ball_y)), int(self.ball_radius), (0, 255, 0), -1) - # Audio playback state - 
self.audio_queue = queue.Queue() - self.current_audio = None - self.audio_position = 0 - self.is_speaking = False - self.speaker = config.get('speaker', 'Alice') - - # Audio buffer for mixing multiple TTS segments - self.audio_buffer = np.array([], dtype=np.float32) - self.buffer_lock = threading.Lock() - - # Optional looping and debug options - self.loop = config.get('loop', True) - self.debug_save_wav = config.get('debug_save_wav', True) - # Keep the last fully-generated audio to enable looping - self.last_generated_audio = np.array([], dtype=np.float32) - # Protect last_generated_audio updates - self._last_gen_lock = threading.Lock() - - # Track total samples generated for proper PTS calculation - self._samples_generated = 0 - # Optional session name used to publish waveform data for visualization - self.session_name = session_name or f"VibeVoice:{int(time.time())}" - - def update_config(self, config_updates: Dict[str, Any]) -> bool: - """Update audio configuration.""" - try: - self.config.update(config_updates) - if 'sample_rate' in config_updates: - self.sample_rate = config_updates['sample_rate'] - if 'samples_per_frame' in config_updates: - self.samples_per_frame = config_updates['samples_per_frame'] - if 'speaker' in config_updates: - self.speaker = config_updates['speaker'] - if 'loop' in config_updates: - self.loop = bool(config_updates['loop']) - logger.info(f"🔁 Looping {'enabled' if self.loop else 'disabled'} for session {self.session_name}") - if 'debug_save_wav' in config_updates: - self.debug_save_wav = bool(config_updates['debug_save_wav']) - logger.info(f"🐞 Debug save wav {'enabled' if self.debug_save_wav else 'disabled'} for session {self.session_name}") - - # Log background noise configuration updates - background_noise_updated = False - if 'background_noise_enabled' in config_updates: - logger.info(f"🎵 Background noise enabled: {config_updates['background_noise_enabled']}") - background_noise_updated = True - if 'background_noise_type' in config_updates: - logger.info(f"🎵 Background noise type: {config_updates['background_noise_type']}") - background_noise_updated = True - if 'background_noise_volume' in config_updates: - logger.info(f"🎵 Background noise volume: {config_updates['background_noise_volume']}") - background_noise_updated = True - - if background_noise_updated: - logger.info("🎵 Background noise configuration updated - changes will take effect on next audio frame") - - return True - except Exception as e: - logger.error(f"Error updating audio config: {e}") - return False - - def speak_text(self, text: str, cfg_scale: Optional[float] = None): - """Queue text for speech synthesis.""" - if cfg_scale is None: - cfg_scale = 1.3 # Default value - - logger.info(f"VibeVoice audio: Starting background TTS generation for '{text}' with cfg_scale={cfg_scale}") - - # Start TTS generation in a background thread - import threading - thread = threading.Thread( - target=self._generate_tts_background, - args=(text, self.speaker, cfg_scale), - daemon=True - ) - thread.start() - - def _generate_tts_background(self, text: str, speaker: str, cfg_scale: float): - """Generate TTS in background thread and add to audio buffer.""" - try: - logger.info(f"VibeVoice audio: Background TTS generation started for '{text}'") - - # Log some diagnostic info about the TTS engine state - try: - logger.info(f"VibeVoice audio: TTS engine initialized={getattr(self.tts, 'is_initialized', False)}, device={getattr(self.tts, 'device', None)}, tts_sample_rate={getattr(self.tts, 'sample_rate', None)}") - 
# available_voices and speaker_mapping may be large; log summaries - try: - avv = getattr(self.tts, 'available_voices', {}) - smap = getattr(self.tts, 'speaker_mapping', {}) - logger.info(f"VibeVoice audio: available_voices={list(avv.keys())[:5]} (count={len(avv)}), speaker_mapping_count={len(smap)}") - except Exception: - pass - except Exception: - pass - - # Mark processing state for video overlay - try: - WaveformVideoTrack.speech_status[self.session_name] = WaveformVideoTrack.speech_status.get(self.session_name, {}) - WaveformVideoTrack.speech_status[self.session_name]['is_processing'] = True - except Exception: - pass - - # Require model and streamer to be available for streaming generation - if not self.tts.is_initialized: - logger.error("VibeVoice audio: Model or AudioStreamer not available - background generation disabled") - return - - # Prepare formatted text and inputs (same expectations as generate_speech) - formatted_text = f"Speaker {speaker}: {text}" - voice_samples = [] - if speaker in self.tts.speaker_mapping: - voice_name = self.tts.speaker_mapping[speaker] - if voice_name in self.tts.available_voices: - audio_path = self.tts.available_voices[voice_name] - audio_data = self.tts.read_audio(audio_path) - if len(audio_data) > 0: - voice_samples.append(audio_data) - else: - voice_samples.append([]) - else: - voice_samples.append([]) - else: - voice_samples.append([]) - - inputs = self.tts.processor( # type: ignore - text=[formatted_text], - voice_samples=[voice_samples], - padding=True, - return_tensors="pt" - ) - - # Move tensors to device - target_device = self.tts.device if self.tts.device in ("cuda", "mps") else "cpu" - for k, v in inputs.items(): - if torch.is_tensor(v): - inputs[k] = v.to(target_device) - - # Log a summary of inputs for diagnostic purposes - try: - inp_summary = {} - for k, v in inputs.items(): - if torch.is_tensor(v): - inp_summary[k] = f"tensor(shape={tuple(v.shape)}, dtype={v.dtype})" - else: - try: - inp_summary[k] = f"{type(v).__name__}(len={len(v)})" - except Exception: - inp_summary[k] = type(v).__name__ - logger.info(f"VibeVoice audio: Input summary for generation: {inp_summary}") - except Exception: - pass - - # Create audio streamer and start model.generate in a separate thread - real_streamer = AudioStreamer(batch_size=1, stop_signal=None, timeout=None) - audio_streamer = ProxyAudioStreamer(real_streamer, session_name=self.session_name) - - def _run_generate(): - try: - logger.info(f"VibeVoice audio: model.generate starting for session {self.session_name}") - with torch.no_grad(): - self.tts.model.generate( # type: ignore - **inputs, - max_new_tokens=None, - cfg_scale=cfg_scale, - tokenizer=self.tts.processor.tokenizer, # type: ignore - generation_config={'do_sample': False}, - verbose=False, - streamer=audio_streamer, - ) - except Exception as e: - logger.error(f"VibeVoice audio: Error during model.generate: {e}") - finally: - # Ensure streamer is ended - try: - audio_streamer.end() - except Exception: - pass - logger.info(f"VibeVoice audio: model.generate finished for session {self.session_name}") - - gen_thread = threading.Thread(target=_run_generate, daemon=True) - gen_thread.start() - - # Consume chunks from streamer and append to audio buffer as they arrive - generated_chunks = [] - chunk_count = 0 - total_samples_streamed = 0 - logger.info(f"VibeVoice audio: Audio streamer started for session {self.session_name}") - try: - logger.info(f"VibeVoice audio: audio_streamer repr: {repr(audio_streamer)[:400]}") - gs = None - try: - gs = 
audio_streamer.get_stream(0) - logger.info(f"VibeVoice audio: get_stream returned object type: {type(gs)}") - except Exception as _e: - logger.error(f"VibeVoice audio: calling audio_streamer.get_stream raised: {_e}") - gs = None - except Exception: - gs = None - - if gs is None: - logger.warning(f"VibeVoice audio: audio_streamer.get_stream did not return a stream for session {self.session_name}") - iterator = [] - else: - iterator = gs - - for audio_chunk in iterator: - try: - # Convert tensor to numpy if needed - if torch.is_tensor(audio_chunk): - if audio_chunk.dtype == torch.bfloat16: - audio_chunk = audio_chunk.float() - audio_np = audio_chunk.cpu().numpy().astype(np.float32) - else: - audio_np = np.array(audio_chunk, dtype=np.float32) - - # Squeeze to 1D if needed - if audio_np.ndim > 1: - audio_np = audio_np.squeeze() - - # Resample from model sampling rate (usually 24000) to track sample rate - if hasattr(self.tts, 'sample_rate') and self.tts.sample_rate != self.sample_rate: - try: - audio_np = librosa.resample(audio_np, orig_sr=self.tts.sample_rate, target_sr=self.sample_rate) - except Exception: - # If resample fails, keep original chunk - pass - - # Append to internal buffer - with self.buffer_lock: - if len(self.audio_buffer) == 0: - self.audio_buffer = audio_np - else: - self.audio_buffer = np.concatenate([self.audio_buffer, audio_np]) - - # Also collect into generated_chunks for possible looping/debug save - try: - generated_chunks.append(audio_np.astype(np.float32)) - except Exception: - pass - - total_samples_streamed += len(audio_np) - chunk_count += 1 - # Log every few chunks to avoid log spam - if chunk_count % 5 == 0: - logger.info(f"VibeVoice audio: Streamed {total_samples_streamed} samples so far for session {self.session_name} (chunks={chunk_count})") - else: - logger.debug(f"VibeVoice audio: Streamed {len(audio_np)} samples to buffer (total buffer: {len(self.audio_buffer)})") - - # Also publish into the global waveform buffer used by WaveformVideoTrack - try: - if WaveformVideoTrack is not None: - pname = self.session_name - # Ensure buffer key exists - if pname not in WaveformVideoTrack.buffer: - WaveformVideoTrack.buffer[pname] = np.array([], dtype=np.float32) - - # Append to shared waveform buffer - WaveformVideoTrack.buffer[pname] = np.concatenate([ - WaveformVideoTrack.buffer[pname], audio_np.astype(np.float32) - ]) - - # Ensure sample rate is set for this session - WaveformVideoTrack.sample_rates[pname] = self.sample_rate - - # Limit buffer to last 10 seconds for this track - max_samples = int(self.sample_rate * 10) - if len(WaveformVideoTrack.buffer[pname]) > max_samples: - WaveformVideoTrack.buffer[pname] = WaveformVideoTrack.buffer[pname][-max_samples:] - - # Update a lightweight speech_status for display - energy = float(np.sqrt(np.mean(audio_np.astype(np.float32) ** 2))) if audio_np.size > 0 else 0.0 - # Approximate zero-crossing rate - try: - if audio_np.size > 1: - zcr = float(np.mean(np.abs(np.diff(np.sign(audio_np)) ) > 0)) - else: - zcr = 0.0 - except Exception: - zcr = 0.0 - - is_speech = energy > 0.005 - - WaveformVideoTrack.speech_status[pname] = { - 'is_speech': bool(is_speech), - 'energy': float(energy), - 'zcr': float(zcr), - 'centroid': 0.0, - 'rolloff': 0.0, - 'flux': 0.0, - 'harmonicity': 0.0, - 'noise_floor_energy': 0.0, - 'adaptive_threshold': 0.0, - 'energy_check': bool(energy > 0.002), - 'zcr_check': bool(zcr > 0.01), - 'spectral_check': False, - 'harmonic_check': False, - 'temporal_consistency': True, - 'is_processing': True, - 
'is_playing': False, - } - except Exception: - # Non-critical - don't break TTS on visualization failures - pass - except Exception as e: - logger.error(f"VibeVoice audio: Error processing audio chunk from streamer: {e}") - - # Ensure generation thread finishes - gen_thread.join(timeout=5.0) - - # If generation thread is still alive after join, log a warning - if gen_thread.is_alive(): - logger.warning(f"VibeVoice audio: generation thread still alive after join for session {self.session_name}") - - # When generation completes, store last_generated_audio for looping and optionally save debug WAV - logger.info(f"VibeVoice audio: Generation completed for session {self.session_name}. total_samples_streamed={total_samples_streamed}, chunks={chunk_count}") - - # If no chunks were received, emit a diagnostic warning with some state to help debugging - if chunk_count == 0: - try: - # Provide more diagnostic info: inputs summary and streamer introspection - try: - sdi = { - 'repr': repr(audio_streamer)[:400], - 'dir': [n for n in dir(audio_streamer) if not n.startswith('_')][:40] - } - except Exception: - sdi = {'repr': 'unavailable', 'dir': []} - - try: - logger.warning( - f"VibeVoice audio: No audio chunks were streamed for session {self.session_name}. " - f"is_initialized={getattr(self.tts, 'is_initialized', False)}, model_present={hasattr(self.tts, 'model')} ; " - f"audio_streamer={sdi}" - ) - except Exception: - logger.warning(f"VibeVoice audio: No audio chunks were streamed for session {self.session_name} (diagnostics failed)") - except Exception: - logger.warning(f"VibeVoice audio: No audio chunks were streamed for session {self.session_name} (additional diagnostics unavailable)") - # Fallback: attempt a synchronous generation that returns a full numpy audio array - try: - logger.info(f"VibeVoice audio: Attempting synchronous fallback generation for session {self.session_name}") - fallback_audio = None - try: - fallback_audio = self.tts.generate_speech(text, speaker, cfg_scale=cfg_scale) - except Exception as e: - logger.error(f"VibeVoice audio: synchronous fallback generation raised: {e}") - - if fallback_audio is not None and getattr(fallback_audio, 'size', 0) > 0: - try: - fa = fallback_audio.astype(np.float32) - except Exception: - fa = np.array(fallback_audio, dtype=np.float32) - - # Resample if needed - try: - tts_sr = getattr(self.tts, 'sample_rate', 24000) - if tts_sr != self.sample_rate: - fa = librosa.resample(fa, orig_sr=tts_sr, target_sr=self.sample_rate) - except Exception: - pass - - # Append into internal buffer and last_generated_audio - with self.buffer_lock: - if len(self.audio_buffer) == 0: - self.audio_buffer = fa - else: - self.audio_buffer = np.concatenate([self.audio_buffer, fa]) - with self._last_gen_lock: - self.last_generated_audio = fa.copy() - - # Publish to waveform buffer - try: - pname = self.session_name - if pname not in WaveformVideoTrack.buffer: - WaveformVideoTrack.buffer[pname] = np.array([], dtype=np.float32) - WaveformVideoTrack.buffer[pname] = np.concatenate([WaveformVideoTrack.buffer[pname], fa.astype(np.float32)]) - WaveformVideoTrack.sample_rates[pname] = self.sample_rate - except Exception: - pass - - # Optionally save debug wav - if self.debug_save_wav: - try: - try: - import soundfile as sf - fname = f"/tmp/vibevoice_{self.session_name}_{int(time.time())}_fallback.wav" - sf.write(fname, fa, samplerate=self.sample_rate) - logger.info(f"🐞 Saved fallback generated wav to {fname} (soundfile)") - except Exception: - try: - from scipy.io import wavfile 
- fname = f"/tmp/vibevoice_{self.session_name}_{int(time.time())}_fallback.wav" - wavfile.write(fname, self.sample_rate, (fa * 32767).astype('int16')) - logger.info(f"🐞 Saved fallback generated wav to {fname} (scipy)") - except Exception: - try: - import wave - fname = f"/tmp/vibevoice_{self.session_name}_{int(time.time())}_fallback.wav" - with wave.open(fname, 'wb') as wf: - wf.setnchannels(1) - wf.setsampwidth(2) - wf.setframerate(self.sample_rate) - int_data = (fa * 32767).astype('int16') - wf.writeframes(int_data.tobytes()) - logger.info(f"🐞 Saved fallback generated wav to {fname} (wave)") - except Exception as e: - logger.error(f"Error saving fallback debug wav (all methods failed): {e}") - except Exception as e: - logger.error(f"Error saving fallback debug wav: {e}") - - logger.info(f"VibeVoice audio: Fallback synchronous generation successful for session {self.session_name} (samples={len(fa)})") - else: - logger.warning(f"VibeVoice audio: Fallback synchronous generation produced no audio for session {self.session_name}") - except Exception as e: - logger.error(f"VibeVoice audio: Exception during synchronous fallback generation: {e}") - try: - if len(generated_chunks) > 0: - try: - all_gen = np.concatenate(generated_chunks).astype(np.float32) - except Exception: - all_gen = np.array([], dtype=np.float32) - with self._last_gen_lock: - self.last_generated_audio = all_gen.copy() - - # Optionally save to disk for debugging - if self.debug_save_wav: - try: - try: - import soundfile as sf - fname = f"/tmp/vibevoice_{self.session_name}_{int(time.time())}.wav" - sf.write(fname, all_gen, samplerate=self.sample_rate) - logger.info(f"🐞 Saved generated wav to {fname} (soundfile)") - except Exception: - # Try scipy fallback - try: - from scipy.io import wavfile - fname = f"/tmp/vibevoice_{self.session_name}_{int(time.time())}.wav" - # scipy expects int16 - wavfile.write(fname, self.sample_rate, (all_gen * 32767).astype('int16')) - logger.info(f"🐞 Saved generated wav to {fname} (scipy)") - except Exception: - # Ultimate fallback: write raw wave via wave module - try: - import wave - fname = f"/tmp/vibevoice_{self.session_name}_{int(time.time())}.wav" - with wave.open(fname, 'wb') as wf: - wf.setnchannels(1) - wf.setsampwidth(2) - wf.setframerate(self.sample_rate) - int_data = (all_gen * 32767).astype('int16') - wf.writeframes(int_data.tobytes()) - logger.info(f"🐞 Saved generated wav to {fname} (wave)") - except Exception as e: - logger.error(f"Error saving debug wav (all methods failed): {e}") - except Exception as e: - logger.error(f"Error saving debug wav: {e}") - - except Exception: - pass - - # Clear processing flag when generation completes - try: - if self.session_name in WaveformVideoTrack.speech_status: - WaveformVideoTrack.speech_status[self.session_name]['is_processing'] = False - except Exception: - pass - - except Exception as e: - logger.error(f"VibeVoice audio: Error in background TTS generation: {e}") - - def _get_samples_from_buffer(self, num_samples: int) -> np.ndarray: - """Get samples from audio buffer, removing them from buffer.""" - # Try to refill from last_generated_audio if looping is enabled - with self._last_gen_lock: - last_gen = self.last_generated_audio.copy() if getattr(self, 'last_generated_audio', None) is not None else np.array([], dtype=np.float32) - - with self.buffer_lock: - if len(self.audio_buffer) == 0: - # If we're configured to loop and have a generated sample, refill the buffer - if getattr(self, 'loop', False) and last_gen.size > 0: - try: - # Repeat 
last_gen as needed to reach at least num_samples - repeats = int(math.ceil(float(num_samples) / float(len(last_gen)))) if len(last_gen) > 0 else 1 - refill = np.tile(last_gen, repeats) - self.audio_buffer = refill.astype(np.float32) - logger.debug(f"VibeVoice audio: Refilled audio_buffer from last_generated_audio (len={len(last_gen)}) repeats={repeats}") - except Exception: - # Fallback to silence on any failure - self.audio_buffer = np.zeros(num_samples, dtype=np.float32) - else: - return np.zeros(num_samples, dtype=np.float32) - - if len(self.audio_buffer) >= num_samples: - samples = self.audio_buffer[:num_samples] - self.audio_buffer = self.audio_buffer[num_samples:] - return samples - else: - # Return remaining samples and pad with zeros - samples = self.audio_buffer - padding = np.zeros(num_samples - len(self.audio_buffer), dtype=np.float32) - self.audio_buffer = np.array([], dtype=np.float32) - return np.concatenate([samples, padding]) - - async def next_timestamp(self) -> Tuple[int, float]: - """Get next timestamp for audio frame.""" - pts = self._samples_generated - time_base = 1 / self.sample_rate - return pts, time_base - - async def recv(self) -> AudioFrame: - """Generate audio frame with TTS speech from buffer.""" - # Get samples from buffer - samples = self._get_samples_from_buffer(self.samples_per_frame) - - # If no TTS audio available, generate background noise - if np.all(samples == 0): - # Default to disabled when not present in config to avoid unexpected noise - if self.config.get('background_noise_enabled', False): - noise_type = self.config.get('background_noise_type', 'white') - noise_volume = self.config.get('background_noise_volume', 0.01) - # Generate noise for this frame duration - frame_duration = self.samples_per_frame / self.sample_rate - logger.debug(f"🎵 Generating background noise: type={noise_type}, volume={noise_volume}, duration={frame_duration:.3f}s") - background_noise = self.tts.generate_background_noise(frame_duration, noise_type, noise_volume, self.sample_rate) - logger.debug(f"🎵 Generated background noise: {len(background_noise)} samples") - samples = background_noise - else: - # Generate silence if background noise is disabled - logger.debug("🎵 Background noise disabled - generating silence") - samples = np.zeros(self.samples_per_frame, dtype=np.float32) - - # Convert to 16-bit PCM - # Update shared speech_status for visualization: energy + playing flag - try: - energy = float(np.sqrt(np.mean(samples.astype(np.float32) ** 2))) if samples.size > 0 else 0.0 - pname = self.session_name - st = WaveformVideoTrack.speech_status.get(pname, {}) - st['energy'] = float(energy) - # Consider playing when energy above small threshold - st['is_playing'] = bool(energy > 0.001) - st['is_speech'] = bool(energy > 0.003) - WaveformVideoTrack.speech_status[pname] = st - except Exception: - pass - - samples_int16 = (samples * 32767).astype(np.int16) - - # Create stereo audio (duplicate mono channel) - left = samples_int16 - right = samples_int16.copy() - stereo = np.empty(self.samples_per_frame * 2, dtype=np.int16) - stereo[0::2] = left - stereo[1::2] = right - - # Create audio frame - frame = AudioFrame.from_ndarray(stereo.reshape(1, -1), format="s16", layout="stereo") - frame.sample_rate = self.sample_rate - frame.pts = self._samples_generated - frame.time_base = fractions.Fraction(1, self.sample_rate) - - # Increment sample counter - self._samples_generated += self.samples_per_frame + # Add timestamp + timestamp = f"Frame: {self.frame_count}" + cv2.putText(frame, 
timestamp, (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) return frame + def _generate_waveform_frame(self) -> Any: + """Generate waveform visualization""" + frame = np.zeros((self.height, self.width, 3), dtype=np.uint8) -class VibeVoiceTTSBot: - """VibeVoice Text-to-Speech Bot for voicebot framework.""" + # Generate sine wave + x = np.linspace(0, 4*np.pi, self.width) + y = np.sin(x + self.frame_count * 0.1) * self.height // 4 + self.height // 2 - def __init__(self, session_name: str, config: Optional[Dict[str, Any]] = None): - self.session_name = session_name - self.config = config or {} + # Draw waveform + for i in range(1, len(y)): + cv2.line(frame, (i-1, int(y[i-1])), (i, int(y[i])), (255, 255, 255), 2) - # Initialize TTS engine with enhanced parameters - device = self.config.get('device', 'cpu') - inference_steps = self.config.get('inference_steps', 10) - self.tts_engine = VibeVoiceTTS(device=device, inference_steps=inference_steps, config=self.config) + return frame - # Store generation parameters - self.cfg_scale = self.config.get('cfg_scale', 1.3) - self.speaker = self.config.get('speaker', '1') - - # Initialize media components - self.media_clock = MediaClock() - # Pass session name into video track so it can show per-session waveform/status - self.video_track = VibeVoiceVideoTrack(self.media_clock, self.config, session_name=session_name) - self.audio_track = VibeVoiceAudioTrack(self.media_clock, self.config, self.tts_engine, session_name=session_name) - - # Initialize shared waveform store sample rate and empty buffer/status - try: - WaveformVideoTrack.sample_rates[session_name] = self.config.get('sample_rate', 16000) - if session_name not in WaveformVideoTrack.buffer: - WaveformVideoTrack.buffer[session_name] = np.array([], dtype=np.float32) - if session_name not in WaveformVideoTrack.speech_status: - WaveformVideoTrack.speech_status[session_name] = {'is_speech': False, 'energy': 0.0, 'is_processing': False, 'is_playing': False} - except Exception: - pass - - # Apply initial configuration values to ensure defaults from schema/config provider - try: - self.update_config(self.config) - except Exception: - # Don't let config application stop initialization - pass - - logger.info(f"VibeVoice bot initialized for session {session_name} with cfg_scale={self.cfg_scale}, speaker={self.speaker}") - - def get_tracks(self) -> Dict[str, MediaStreamTrack]: - """Get video and audio tracks.""" - return { - "video": self.video_track, - "audio": self.audio_track - } - - def handle_chat_message(self, message: ChatMessageModel): - """Handle incoming chat messages by converting them to speech.""" - try: - text = message.message.strip() - if text: - logger.info(f"VibeVoice bot received chat: '{text}' from {message.sender_name}") - - # Queue text for both video display and audio speech - self.video_track.update_text(text) - self.audio_track.speak_text(text, self.cfg_scale) - - except Exception as e: - logger.error(f"Error handling chat message in VibeVoice bot: {e}") - - def update_config(self, config_updates: Dict[str, Any]) -> bool: - """Update bot configuration.""" - try: - self.config.update(config_updates) - - # Update TTS-specific parameters - if 'cfg_scale' in config_updates: - self.cfg_scale = config_updates['cfg_scale'] - if 'speaker' in config_updates: - self.speaker = config_updates['speaker'] - - # Update tracks - video_success = self.video_track.update_config(config_updates) - audio_success = self.audio_track.update_config(config_updates) - - if video_success and 
audio_success: - logger.info(f"VibeVoice bot configuration updated: {config_updates}") - return True - else: - logger.warning("Partial configuration update failure in VibeVoice bot") - return False - - except Exception as e: - logger.error(f"Error updating VibeVoice bot configuration: {e}") - return False + def _generate_static_frame(self) -> Any: + """Generate static color frame""" + color = self.config.get('static_color', (128, 128, 128)) + frame = np.full((self.height, self.width, 3), color, dtype=np.uint8) + return frame -# Global bot instance registry -_vibevoice_bots: Dict[str, VibeVoiceTTSBot] = {} - - -def create_vibevoice_bot_tracks(session_name: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, MediaStreamTrack]: +def create_vibevoice_tracks(session_name: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, MediaStreamTrack]: """ - Create VibeVoice TTS bot tracks. - + Create VibeVoice-enabled WebRTC tracks. + Args: session_name: Name for the session - config: Configuration dictionary with options: - - width: video width (default 640) - - height: video height (default 480) - - fps: frames per second (default 15) - - sample_rate: audio sample rate (default 16000) - - samples_per_frame: audio samples per frame (default 960) - - speaker: TTS speaker name (default '1') - - device: device for TTS ('cpu', 'cuda', 'mps') - - cfg_scale: CFG scale for generation (default 1.3) - - inference_steps: Number of inference steps (default 10) - + config: Configuration dictionary + Returns: Dictionary containing 'video' and 'audio' tracks """ @@ -1344,82 +587,60 @@ def create_vibevoice_bot_tracks(session_name: str, config: Optional[Dict[str, An # Set defaults default_config = { - 'width': 640, - 'height': 480, + 'visualization': 'ball', + 'audio_mode': 'silence', # Start with silence + 'width': 320, + 'height': 240, 'fps': 15, - 'sample_rate': 16000, - 'samples_per_frame': 960, - 'speaker': '1', - 'device': 'cpu', - 'cfg_scale': 1.3, - 'inference_steps': 10, - # Explicit background noise defaults - disabled by default - 'background_noise_enabled': False, - 'background_noise_type': 'none', - 'background_noise_volume': 0.0, + 'sample_rate': 24000, # VibeVoice native sample rate + 'samples_per_frame': 480, # 20ms at 24kHz + 'frequency': 440.0, + 'volume': 0.1, + 'static_color': (128, 128, 128), + 'model_path': 'vibevoice/VibeVoice-1.5B', # Default model path + 'device': 'cuda' if torch.cuda.is_available() else 'cpu', + 'voice_preset': None } default_config.update(config) - # Create bot instance - bot = VibeVoiceTTSBot(session_name, default_config) - _vibevoice_bots[session_name] = bot + # Parse static_color if it's a string + if isinstance(default_config.get('static_color'), str): + try: + color_str = default_config['static_color'] + r, g, b = map(int, color_str.split(',')) + default_config['static_color'] = (r, g, b) + except (ValueError, TypeError): + logger.warning(f"Invalid static_color format: {default_config.get('static_color')}, using default") + default_config['static_color'] = (128, 128, 128) - logger.info(f"Created VibeVoice bot tracks for {session_name}") - return bot.get_tracks() + media_clock = MediaClock() + + # Initialize VibeVoice processor + tts_processor = VibeVoiceWebRTCProcessor( + model_path=default_config['model_path'], + device=default_config['device'], + inference_steps=5 + ) + + # Create tracks + video_track = ConfigurableVideoTrack(media_clock, default_config) + audio_track = VibeVoiceAudioTrack(media_clock, default_config, tts_processor) + + 
logger.info(f"Created VibeVoice tracks for {session_name} with config: {default_config}") + + # Store tracks and processor in global registry + _active_tracks[session_name] = { + "video": video_track, + "audio": audio_track, + "tts_processor": tts_processor + } + + return {"video": video_track, "audio": audio_track} -def handle_config_update(session_name: str, config_values: Dict[str, Any]) -> bool: - """ - Handle runtime configuration updates for VibeVoice bot. - - Args: - session_name: Name of the session/bot instance - config_values: Dictionary of configuration values to update - - Returns: - bool: True if update was successful, False otherwise - """ - try: - if session_name in _vibevoice_bots: - return _vibevoice_bots[session_name].update_config(config_values) - else: - logger.warning(f"No VibeVoice bot found for session {session_name}") - return False - except Exception as e: - logger.error(f"Error updating VibeVoice bot configuration: {e}") - return False - - -async def handle_chat_message( - chat_message: ChatMessageModel, - send_message_func: Callable[[Union[str, ChatMessageModel]], Awaitable[None]] -) -> Optional[str]: - """ - Handle incoming chat messages and convert them to speech. - - Args: - chat_message: The chat message to process - send_message_func: Function to send chat responses (not used by TTS bot) - """ - try: - # Find the bot instance - we need to get session name from somewhere - # For now, we'll use the first available bot instance - if _vibevoice_bots: - session_name = list(_vibevoice_bots.keys())[0] - _vibevoice_bots[session_name].handle_chat_message(chat_message) - logger.info(f"VibeVoice bot processed chat message from {chat_message.sender_name}: '{chat_message.message}'") - else: - logger.warning("No VibeVoice bot instances available to handle chat message") - except Exception as e: - logger.error(f"Error handling chat message in VibeVoice bot: {e}") - - # TTS bot doesn't send chat responses, so return None - return None - - -# Agent descriptor exported for dynamic discovery by the FastAPI service +# Agent descriptor AGENT_NAME = "VibeVoice TTS Bot" -AGENT_DESCRIPTION = "Microsoft VibeVoice text-to-speech bot with visual text display" +AGENT_DESCRIPTION = "WebRTC bot with VibeVoice text-to-speech capabilities" def agent_info() -> Dict[str, str]: """Return agent metadata for discovery.""" @@ -1428,24 +649,72 @@ def agent_info() -> Dict[str, str]: "description": AGENT_DESCRIPTION, "has_media": "true", "configurable": "true", - "chat_enabled": "true" + "has_tts": "true" } def get_config_schema() -> Dict[str, Any]: - """Get the configuration schema for the VibeVoice Bot.""" + """Get the configuration schema for the VibeVoice TTS Bot.""" return { "bot_name": AGENT_NAME, "version": "1.0", "parameters": [ + { + "name": "visualization", + "type": "select", + "label": "Video Visualization Mode", + "description": "Choose the type of video visualization to display", + "default_value": "ball", + "required": True, + "options": [ + {"value": "ball", "label": "Bouncing Ball"}, + {"value": "waveform", "label": "Sine Wave Animation"}, + {"value": "static", "label": "Static Color Frame"} + ] + }, + { + "name": "audio_mode", + "type": "select", + "label": "Fallback Audio Mode", + "description": "Audio mode when no TTS is active", + "default_value": "silence", + "required": True, + "options": [ + {"value": "silence", "label": "Silence"}, + {"value": "tone", "label": "Sine Wave Tone"}, + {"value": "noise", "label": "White Noise"} + ] + }, + { + "name": "voice_preset", + "type": 
"string", + "label": "Voice Preset", + "description": "Name of the voice preset to use for TTS", + "default_value": "", + "required": False, + "max_length": 50 + }, + { + "name": "device", + "type": "select", + "label": "Processing Device", + "description": "Device to use for TTS processing", + "default_value": "cuda", + "required": True, + "options": [ + {"value": "cuda", "label": "CUDA (GPU)"}, + {"value": "cpu", "label": "CPU"}, + {"value": "mps", "label": "Apple Metal (MPS)"} + ] + }, { "name": "width", "type": "number", "label": "Video Width", "description": "Width of the video frame in pixels", - "default_value": 640, + "default_value": 320, "required": False, - "min_value": 320, + "min_value": 160, "max_value": 1920, "step": 1 }, @@ -1454,116 +723,33 @@ def get_config_schema() -> Dict[str, Any]: "type": "number", "label": "Video Height", "description": "Height of the video frame in pixels", - "default_value": 480, + "default_value": 240, "required": False, - "min_value": 240, + "min_value": 120, "max_value": 1080, "step": 1 }, { - "name": "fps", - "type": "number", - "label": "Frames Per Second", - "description": "Video frame rate", - "default_value": 15, - "required": False, - "min_value": 1, - "max_value": 60, - "step": 1 - }, - { - "name": "speaker", - "type": "select", - "label": "TTS Speaker", - "description": "Voice to use for text-to-speech", - "default_value": "1", - "required": True, - "options": [ - {"value": "1", "label": "Speaker 1 (en-Alice_woman)"}, - {"value": "2", "label": "Speaker 2 (en-Carter_man)"}, - {"value": "3", "label": "Speaker 3 (en-Frank_man)"}, - {"value": "4", "label": "Speaker 4 (en-Mary_woman_bgm)"} - ] - }, - { - "name": "background_noise_enabled", - "type": "boolean", - "label": "Enable Background Noise", - "description": "Add background noise to ensure continuous audio streaming", - "default_value": False, - "required": False - }, - { - "name": "background_noise_type", - "type": "select", - "label": "Background Noise Type", - "description": "Type of background noise to generate", - # 'none' indicates no noise - matches default disabled behavior - "default_value": "none", - "required": False, - "options": [ - {"value": "white", "label": "White Noise"}, - {"value": "pink", "label": "Pink Noise"}, - {"value": "brown", "label": "Brown Noise"}, - {"value": "none", "label": "None"} - ] - }, - { - "name": "background_noise_volume", - "type": "number", - "label": "Background Noise Volume", - "description": "Volume level of background noise (0.0 to 1.0)", - "default_value": 0.01, + "name": "volume", + "type": "range", + "label": "Audio Volume", + "description": "Volume level (0.0 to 1.0)", + "default_value": 0.1, "required": False, "min_value": 0.0, "max_value": 1.0, - "step": 0.001 - }, - { - "name": "device", - "type": "select", - "label": "Processing Device", - "description": "Device to use for TTS processing", - "default_value": "cpu", - "required": True, - "options": [ - {"value": "cpu", "label": "CPU"}, - {"value": "cuda", "label": "CUDA (GPU)"}, - {"value": "mps", "label": "MPS (Apple Silicon)"} - ] - }, - { - "name": "cfg_scale", - "type": "number", - "label": "CFG Scale", - "description": "Classifier-free guidance scale for controlling generation quality", - "default_value": 1.3, - "required": False, - "min_value": 1.0, - "max_value": 2.0, - "step": 0.05 - }, - { - "name": "inference_steps", - "type": "number", - "label": "Inference Steps", - "description": "Number of denoising steps for audio generation", - "default_value": 10, - "required": False, 
- "min_value": 5, - "max_value": 50, - "step": 1 + "step": 0.1 } ], "categories": [ { - "Video Settings": ["width", "height", "fps"] + "TTS Settings": ["voice_preset", "device"] }, { - "TTS Settings": ["speaker", "device", "cfg_scale", "inference_steps"] + "Video Settings": ["visualization", "width", "height"] }, { - "Background Noise": ["background_noise_enabled", "background_noise_type", "background_noise_volume"] + "Audio Settings": ["audio_mode", "volume"] } ] } @@ -1571,4 +757,123 @@ def get_config_schema() -> Dict[str, Any]: def create_agent_tracks(session_name: str) -> Dict[str, MediaStreamTrack]: """Factory wrapper used by the FastAPI service to instantiate tracks for an agent.""" - return create_vibevoice_bot_tracks(session_name) \ No newline at end of file + return create_vibevoice_tracks(session_name) + + +async def handle_chat_message( + chat_message: ChatMessageModel, + send_message_func: Callable[[Union[str, ChatMessageModel]], Awaitable[None]] +) -> Optional[str]: + """Handle chat messages and convert them to speech.""" + try: + message_text = chat_message.message.strip() + if not message_text: + return None + + logger.info(f"Processing TTS for message: '{message_text[:50]}...'") + + # Find the TTS processor for this session + # Note: This assumes the session_name is available somehow + # You might need to modify this based on how sessions are handled + session_name = getattr(chat_message, 'session_id', None) + if not session_name: + # Try to find any active session (fallback) + if _active_tracks: + session_name = list(_active_tracks.keys())[0] + + if session_name and session_name in _active_tracks: + tts_processor = _active_tracks[session_name].get("tts_processor") + if tts_processor: + # Get voice configuration + audio_track = _active_tracks[session_name].get("audio") + voice_preset = audio_track.config.get('voice_preset') if audio_track else None + + # Generate speech asynchronously + await tts_processor.generate_speech_async(message_text, voice_preset) + + # Send confirmation message + response_message = ChatMessageModel( + id=secrets.token_hex(16), + message=f"🔊 Converting to speech: '{message_text[:30]}{'...' 
if len(message_text) > 30 else ''}'", + sender_name="VibeVoice Bot", + sender_session_id=session_name, + lobby_id=chat_message.lobby_id, + timestamp=time.time() + ) + await send_message_func(response_message) + else: + logger.error(f"No TTS processor found for session {session_name}") + else: + logger.error("No active session found for TTS processing") + + return None + + except Exception as e: + logger.error(f"Error processing chat message for TTS: {e}") + return None + + +def handle_config_update(lobby_id: str, config_values: Dict[str, Any]) -> bool: + """Handle runtime configuration updates for the VibeVoice bot.""" + try: + validated_config = {} + + # Parse and validate static_color if provided + if 'static_color' in config_values: + color_value = config_values['static_color'] + if isinstance(color_value, str): + try: + r, g, b = map(int, color_value.split(',')) + validated_config['static_color'] = (r, g, b) + except (ValueError, TypeError): + logger.warning(f"Invalid static_color format: {color_value}, ignoring update") + return False + elif isinstance(color_value, (tuple, list)) and len(color_value) == 3: + validated_config['static_color'] = tuple(color_value) + else: + logger.warning(f"Invalid static_color type: {type(color_value)}, ignoring update") + return False + + # Copy other valid configuration values + valid_keys = {'visualization', 'audio_mode', 'width', 'height', 'fps', + 'sample_rate', 'frequency', 'volume', 'voice_preset', 'device'} + for key in valid_keys: + if key in config_values: + validated_config[key] = config_values[key] + + if validated_config: + logger.info(f"Configuration updated for {lobby_id}: {validated_config}") + + # Update running tracks if they exist in the registry + if lobby_id in _active_tracks: + tracks = _active_tracks[lobby_id] + video_track = tracks.get("video") + audio_track = tracks.get("audio") + + # Update video track configuration + if video_track and hasattr(video_track, 'update_config'): + video_updated = video_track.update_config(validated_config) + if video_updated: + logger.info(f"Video track configuration updated for {lobby_id}") + else: + logger.warning(f"Failed to update video track configuration for {lobby_id}") + + # Update audio track configuration + if audio_track and hasattr(audio_track, 'update_config'): + audio_updated = audio_track.update_config(validated_config) + if audio_updated: + logger.info(f"Audio track configuration updated for {lobby_id}") + else: + logger.warning(f"Failed to update audio track configuration for {lobby_id}") + + return True + else: + logger.warning(f"No active tracks found for session {lobby_id}") + return False + else: + logger.warning(f"No valid configuration values provided for {lobby_id}") + return False + + except Exception as e: + logger.error(f"Error updating configuration for {lobby_id}: {e}") + return False \ No newline at end of file
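
Reviewer note: below is a minimal, illustrative harness for exercising the module-level entry points this patch introduces (create_agent_tracks, handle_chat_message, handle_config_update, agent_info). It is a sketch only, not part of the patch: the file name, the "voicebot.bots.vibevoice" import path, and the asyncio wiring are assumptions for local testing, while the ChatMessageModel fields mirror the ones the new handle_chat_message itself constructs.

# demo_vibevoice_harness.py -- illustrative only, not part of this patch.
import asyncio
import secrets
import time

from shared.models import ChatMessageModel
from voicebot.bots import vibevoice  # assumed import path for the rewritten module


async def main() -> None:
    session = "demo-session"

    # Factory wrapper used by the FastAPI service; registers the session in the
    # module's _active_tracks registry and returns {"video": ..., "audio": ...}.
    tracks = vibevoice.create_agent_tracks(session)
    print("agent:", vibevoice.agent_info()["name"], "tracks:", list(tracks.keys()))

    # Push a runtime configuration update keyed by the same session/lobby id.
    vibevoice.handle_config_update(session, {"visualization": "waveform", "volume": 0.2})

    async def send(msg) -> None:
        # Stand-in for the signalling client's send function.
        text = msg.message if isinstance(msg, ChatMessageModel) else msg
        print("bot replied:", text)

    # Simulate an incoming chat message; the bot should queue TTS generation for
    # the registered session and send a confirmation reply through send().
    incoming = ChatMessageModel(
        id=secrets.token_hex(16),
        message="Hello from the review harness",
        sender_name="reviewer",
        sender_session_id=session,
        lobby_id="demo-lobby",
        timestamp=time.time(),
    )
    await vibevoice.handle_chat_message(incoming, send)


if __name__ == "__main__":
    asyncio.run(main())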