From eee5727fdf1c8ca2567562800c6abbaba8e8fe13 Mon Sep 17 00:00:00 2001 From: James Ketrenos Date: Mon, 8 Sep 2025 09:50:05 -0700 Subject: [PATCH] Improving B580 work Signed-off-by: James Ketrenos --- voicebot/bots/whisper.py | 1395 +++++++++++++++++++++++--------------- 1 file changed, 852 insertions(+), 543 deletions(-) diff --git a/voicebot/bots/whisper.py b/voicebot/bots/whisper.py index aaf9c5c..c36eff0 100644 --- a/voicebot/bots/whisper.py +++ b/voicebot/bots/whisper.py @@ -1,18 +1,22 @@ -"""Streaming Whisper agent (bots/whisper) +"""Streaming Whisper agent (bots/whisper) - OpenVINO Optimized for Intel Arc B580 Real-time speech transcription agent that processes incoming audio streams and sends transcriptions as chat messages to the lobby. +Optimized for Intel Arc B580 GPU using OpenVINO inference engine. """ import asyncio import numpy as np import time import threading -from collections import deque +import os +import gc +import shutil from queue import Queue, Empty -from typing import Dict, Optional, Callable, Awaitable, Deque, Any, cast +from typing import Dict, Optional, Callable, Awaitable, Any, cast, List, Union +from pathlib import Path import numpy.typing as npt -from pydantic import BaseModel +from pydantic import BaseModel, Field, ConfigDict # Core dependencies import librosa @@ -23,47 +27,84 @@ from av import AudioFrame # Import shared models for chat functionality import sys -import os - -from voicebot.models import Peer - sys.path.append( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) from shared.models import ChatMessageModel +from voicebot.models import Peer -from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq +# OpenVINO optimized imports +import openvino as ov +from optimum.intel.openvino import OVModelForSpeechSeq2Seq +from transformers import AutoProcessor +import torch + +# Import quantization dependencies with error handling +try: + import nncf + from optimum.intel.openvino.quantization import InferRequestWrapper + QUANTIZATION_AVAILABLE = True +except ImportError as e: + logger.warning(f"Quantization libraries not available: {e}") + QUANTIZATION_AVAILABLE = False # Type definitions AudioArray = npt.NDArray[np.float32] +ModelConfig = Dict[str, Union[str, int, bool]] +CalibrationData = List[Dict[str, Any]] class AudioQueueItem(BaseModel): """Audio data with timestamp for processing queue.""" - - audio: AudioArray - timestamp: float - - class Config: - arbitrary_types_allowed = True + model_config = ConfigDict(arbitrary_types_allowed=True) + + audio: AudioArray = Field(..., description="Audio data as numpy array") + timestamp: float = Field(..., description="Timestamp when audio was captured") class TranscriptionHistoryItem(BaseModel): """Transcription history item with metadata.""" - - message: str - timestamp: float - is_final: bool + model_config = ConfigDict(arbitrary_types_allowed=True) + + message: str = Field(..., description="Transcribed text message") + timestamp: float = Field(..., description="When transcription was completed") + is_final: bool = Field(..., description="Whether this is final or streaming transcription") +class OpenVINOConfig(BaseModel): + """OpenVINO configuration for Intel Arc B580 optimization.""" + model_config = ConfigDict(arbitrary_types_allowed=True) + + device: str = Field(default="GPU", description="Target device for inference") + cache_dir: str = Field(default="./ov_cache", description="Cache directory for compiled models") + enable_quantization: bool = Field(default=True, 
description="Enable INT8 quantization") + throughput_streams: int = Field(default=2, description="Number of inference streams") + max_threads: int = Field(default=8, description="Maximum number of threads") + + def to_ov_config(self) -> ModelConfig: + """Convert to OpenVINO configuration dictionary.""" + return { + "CACHE_DIR": self.cache_dir, + "GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES", + "GPU_ENABLE_LOOP_UNROLLING": "YES", + "GPU_THROUGHPUT_STREAMS": str(self.throughput_streams), + "GPU_MAX_NUM_THREADS": str(self.max_threads), + "GPU_ENABLE_OPENCL_THROTTLING": "NO" + } + + +# Global configuration and constants AGENT_NAME = "whisper" -AGENT_DESCRIPTION = "Real-time speech transcription (Whisper) - converts speech to text" -sample_rate = 16000 # Whisper expects 16kHz +AGENT_DESCRIPTION = "Real-time speech transcription (OpenVINO Whisper) - converts speech to text on Intel Arc B580" +SAMPLE_RATE = 16000 # Whisper expects 16kHz +CHUNK_DURATION_MS = 100 # Reduced latency - 100ms chunks +VAD_THRESHOLD = 0.01 # Voice activity detection threshold +MAX_SILENCE_FRAMES = 30 # 3 seconds of silence before stopping model_ids = { "Distil-Whisper": [ "distil-whisper/distil-large-v2", - "distil-whisper/distil-medium.en", + "distil-whisper/distil-medium.en", "distil-whisper/distil-small.en", ], "Whisper": [ @@ -75,638 +116,906 @@ model_ids = { "openai/whisper-base", "openai/whisper-tiny", "openai/whisper-medium.en", - "openai/whisper-small.en", + "openai/whisper-small.en", "openai/whisper-base.en", "openai/whisper-tiny.en", ], } -# Global whisper model and transcription handler + +# Global model configuration _model_type = model_ids["Distil-Whisper"] -_model_id = _model_type[0] +_model_id = _model_type[0] # Use distil-large-v2 for best quality +_ov_config = OpenVINOConfig() -logger.info(f"Loading Whisper model: {_model_id}") -_processor: Any = AutoProcessor.from_pretrained(pretrained_model_name_or_path=_model_id) # type: ignore -logger.info("Whisper processor loaded successfully") -_pt_model: Any = AutoModelForSpeechSeq2Seq.from_pretrained( - pretrained_model_name_or_path=_model_id -) # type: ignore -_pt_model.eval() # type: ignore -logger.info("Whisper model loaded and set to evaluation mode") +def setup_intel_arc_environment() -> None: + """Configure environment variables for optimal Intel Arc B580 performance.""" + os.environ["OV_GPU_CACHE_MODEL"] = "1" + os.environ["OV_GPU_ENABLE_OPENCL_THROTTLING"] = "0" + os.environ["OV_GPU_DISABLE_WINOGRAD"] = "1" + logger.info("Intel Arc B580 environment variables configured") -_audio_processors: Dict[str, "AudioProcessor"] = {} # Per-peer audio processors + +class OpenVINOWhisperModel: + """OpenVINO optimized Whisper model for Intel Arc B580.""" + + def __init__(self, model_id: str, config: OpenVINOConfig): + self.model_id = model_id + self.config = config + self.model_path = Path(model_id.replace('/', '_')) + self.quantized_model_path = Path(f"{self.model_path}_quantized") + + self.processor: Optional[AutoProcessor] = None + self.ov_model: Optional[OVModelForSpeechSeq2Seq] = None + self.is_quantized = False + + self._initialize_model() + + def _initialize_model(self) -> None: + """Initialize processor and OpenVINO model with robust error handling.""" + logger.info(f"Initializing OpenVINO Whisper model: {self.model_id}") + + try: + # Initialize processor + self.processor = AutoProcessor.from_pretrained(self.model_id) + logger.info("Whisper processor loaded successfully") + + # Try to load quantized model first if it exists + if QUANTIZATION_AVAILABLE and 
self.config.enable_quantization and self.quantized_model_path.exists(): + if self._try_load_quantized_model(): + return + + # Load or create FP16 model + if self.model_path.exists(): + self._load_fp16_model() + else: + self._convert_model() + + # Try quantization after model is loaded and compiled + if QUANTIZATION_AVAILABLE and self.config.enable_quantization and not self.is_quantized: + self._try_quantize_existing_model() + + except Exception as e: + logger.error(f"Error initializing model: {e}") + # Fallback to basic conversion without quantization + self._fallback_initialization() + + def _fallback_initialization(self) -> None: + """Fallback initialization without quantization.""" + logger.warning("Falling back to basic OpenVINO conversion without quantization") + try: + if not self.model_path.exists(): + self._convert_model_basic() + self._load_fp16_model() + except Exception as e: + logger.error(f"Fallback initialization failed: {e}") + raise RuntimeError("Failed to initialize OpenVINO model") from e + + def _convert_model(self) -> None: + """Convert PyTorch model to OpenVINO format.""" + logger.info(f"Converting {self.model_id} to OpenVINO format...") + + try: + # Convert to OpenVINO with FP16 for Arc GPU + ov_model = OVModelForSpeechSeq2Seq.from_pretrained( + self.model_id, + ov_config=self.config.to_ov_config(), + export=True, + compile=False, + load_in_8bit=False + ) + + # Enable FP16 for Intel Arc performance + ov_model.half() + ov_model.save_pretrained(self.model_path) + logger.info("Model converted and saved in FP16 format") + + # Load the converted model + self.ov_model = ov_model + self._compile_model() + + except Exception as e: + logger.error(f"Model conversion failed: {e}") + raise + + def _convert_model_basic(self) -> None: + """Basic model conversion without advanced features.""" + logger.info(f"Basic conversion of {self.model_id} to OpenVINO format...") + + ov_model = OVModelForSpeechSeq2Seq.from_pretrained( + self.model_id, + export=True, + compile=False + ) + + ov_model.save_pretrained(self.model_path) + logger.info("Basic model conversion completed") + + def _load_fp16_model(self) -> None: + """Load existing FP16 OpenVINO model.""" + logger.info("Loading existing FP16 OpenVINO model...") + try: + self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained( + self.model_path, + ov_config=self.config.to_ov_config(), + compile=False + ) + self._compile_model() + except Exception as e: + logger.error(f"Failed to load FP16 model: {e}") + # Try basic loading + self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained( + self.model_path, + compile=False + ) + self._compile_model() + + def _try_load_quantized_model(self) -> bool: + """Try to load existing quantized model.""" + try: + logger.info("Loading existing INT8 quantized model...") + self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained( + self.quantized_model_path, + ov_config=self.config.to_ov_config(), + compile=False + ) + self._compile_model() + self.is_quantized = True + logger.info("Quantized model loaded successfully") + return True + except Exception as e: + logger.warning(f"Failed to load quantized model: {e}") + return False + + def _try_quantize_existing_model(self) -> None: + """Try to quantize the existing model after it's loaded.""" + if not QUANTIZATION_AVAILABLE: + logger.info("Quantization libraries not available, skipping quantization") + return + + if self.ov_model is None: + logger.warning("No model loaded, cannot quantize") + return + + # Check if model components are available + if not 
hasattr(self.ov_model, 'encoder') or self.ov_model.encoder is None: + logger.warning("Model encoder not available, skipping quantization") + return + + if not hasattr(self.ov_model, 'decoder_with_past') or self.ov_model.decoder_with_past is None: + logger.warning("Model decoder_with_past not available, skipping quantization") + return + + try: + logger.info("Attempting to quantize compiled model...") + self._quantize_model_safe() + except Exception as e: + logger.warning(f"Quantization failed, continuing with FP16 model: {e}") + + def _quantize_model_safe(self) -> None: + """Safely quantize the model with extensive error handling.""" + if self.quantized_model_path.exists(): + logger.info("Quantized model already exists") + return + + if self.ov_model is None: + raise RuntimeError("No model to quantize") + + logger.info("Creating INT8 quantized model for Intel Arc B580...") + + try: + # Collect calibration data with error handling + calibration_data = self._collect_calibration_data_safe() + if not calibration_data: + logger.warning("No calibration data collected, skipping quantization") + return + + # Quantize encoder + if calibration_data.get('encoder'): + logger.info("Quantizing encoder...") + quantized_encoder = nncf.quantize( + self.ov_model.encoder.model, + nncf.Dataset(calibration_data['encoder']), + model_type=nncf.ModelType.TRANSFORMER, + subset_size=min(len(calibration_data['encoder']), 50) + ) + else: + logger.warning("No encoder calibration data, copying original encoder") + quantized_encoder = self.ov_model.encoder.model + + # Quantize decoder + if calibration_data.get('decoder'): + logger.info("Quantizing decoder with past...") + quantized_decoder = nncf.quantize( + self.ov_model.decoder_with_past.model, + nncf.Dataset(calibration_data['decoder']), + model_type=nncf.ModelType.TRANSFORMER, + subset_size=min(len(calibration_data['decoder']), 50) + ) + else: + logger.warning("No decoder calibration data, copying original decoder") + quantized_decoder = self.ov_model.decoder_with_past.model + + # Save quantized models + self.quantized_model_path.mkdir(parents=True, exist_ok=True) + ov.save_model(quantized_encoder, self.quantized_model_path / "openvino_encoder_model.xml") + ov.save_model(quantized_decoder, self.quantized_model_path / "openvino_decoder_with_past_model.xml") + + # Copy remaining files + self._copy_model_files() + + # Clean up + del quantized_encoder, quantized_decoder, calibration_data + gc.collect() + + # Load quantized model + if self._try_load_quantized_model(): + logger.info("Quantization completed successfully") + + except Exception as e: + logger.error(f"Quantization failed: {e}") + # Clean up partial quantization + if self.quantized_model_path.exists(): + shutil.rmtree(self.quantized_model_path, ignore_errors=True) + + def _collect_calibration_data_safe(self, dataset_size: int = 20) -> Dict[str, CalibrationData]: + """Safely collect calibration data with extensive error handling.""" + if self.ov_model is None or self.processor is None: + return {} + + logger.info(f"Collecting calibration data ({dataset_size} samples)...") + + # Check model components + if not hasattr(self.ov_model, 'encoder') or self.ov_model.encoder is None: + logger.warning("Encoder not available for calibration") + return {} + + if not hasattr(self.ov_model, 'decoder_with_past') or self.ov_model.decoder_with_past is None: + logger.warning("Decoder with past not available for calibration") + return {} + + # Check if requests are available + if not hasattr(self.ov_model.encoder, 'request') or 
self.ov_model.encoder.request is None: + logger.warning("Encoder request not available for calibration") + return {} + + if not hasattr(self.ov_model.decoder_with_past, 'request') or self.ov_model.decoder_with_past.request is None: + logger.warning("Decoder request not available for calibration") + return {} + + # Setup data collection + original_encoder_request = self.ov_model.encoder.request + original_decoder_request = self.ov_model.decoder_with_past.request + + encoder_data: CalibrationData = [] + decoder_data: CalibrationData = [] + + try: + self.ov_model.encoder.request = InferRequestWrapper(original_encoder_request, encoder_data) + self.ov_model.decoder_with_past.request = InferRequestWrapper(original_decoder_request, decoder_data) + + # Generate synthetic calibration data instead of loading dataset + logger.info("Generating synthetic calibration data...") + for i in range(dataset_size): + try: + # Generate random audio similar to speech + duration = 2.0 + np.random.random() * 3.0 # 2-5 seconds + synthetic_audio = np.random.randn(int(SAMPLE_RATE * duration)).astype(np.float32) * 0.1 + + input_features = self.processor( + synthetic_audio, + sampling_rate=SAMPLE_RATE, + return_tensors="pt" + ).input_features + + # Run inference to collect calibration data + _ = self.ov_model.generate(input_features, max_new_tokens=10) + + if i % 5 == 0: + logger.debug(f"Generated calibration sample {i+1}/{dataset_size}") + + except Exception as e: + logger.warning(f"Failed to generate calibration sample {i}: {e}") + continue + + except Exception as e: + logger.error(f"Error during calibration data collection: {e}") + finally: + # Restore original requests + try: + self.ov_model.encoder.request = original_encoder_request + self.ov_model.decoder_with_past.request = original_decoder_request + except Exception as e: + logger.warning(f"Failed to restore original requests: {e}") + + result = {} + if encoder_data: + result['encoder'] = encoder_data + logger.info(f"Collected {len(encoder_data)} encoder calibration samples") + if decoder_data: + result['decoder'] = decoder_data + logger.info(f"Collected {len(decoder_data)} decoder calibration samples") + + return result + + def _copy_model_files(self) -> None: + """Copy necessary model files for quantized model.""" + try: + # Copy config and first-step decoder + if (self.model_path / "config.json").exists(): + shutil.copy(self.model_path / "config.json", self.quantized_model_path / "config.json") + + decoder_xml = self.model_path / "openvino_decoder_model.xml" + decoder_bin = self.model_path / "openvino_decoder_model.bin" + + if decoder_xml.exists(): + shutil.copy(decoder_xml, self.quantized_model_path / "openvino_decoder_model.xml") + if decoder_bin.exists(): + shutil.copy(decoder_bin, self.quantized_model_path / "openvino_decoder_model.bin") + + except Exception as e: + logger.warning(f"Failed to copy some model files: {e}") + + def _compile_model(self) -> None: + """Compile model for Intel Arc B580.""" + if self.ov_model is None: + raise RuntimeError("Model not loaded") + + logger.info("Compiling model for Intel Arc B580...") + try: + self.ov_model.to(self.config.device) + self.ov_model.compile() + + # Warmup for optimal performance + self._warmup_model() + logger.info("Model compiled and warmed up successfully") + except Exception as e: + logger.warning(f"Failed to compile for GPU, trying CPU: {e}") + # Fallback to CPU + try: + self.ov_model.to("CPU") + self.ov_model.compile() + self._warmup_model() + logger.info("Model compiled for CPU successfully") + 
except Exception as cpu_e: + logger.error(f"Failed to compile for CPU as well: {cpu_e}") + raise + + def _warmup_model(self) -> None: + """Warmup model for consistent GPU performance.""" + if self.ov_model is None or self.processor is None: + return + + try: + logger.info("Warming up model...") + dummy_audio = np.random.randn(SAMPLE_RATE).astype(np.float32) # 1 second + dummy_features = self.processor( + dummy_audio, + sampling_rate=SAMPLE_RATE, + return_tensors="pt" + ).input_features + + # Run warmup iterations + for i in range(3): + _ = self.ov_model.generate(dummy_features, max_new_tokens=10) + if i == 0: + logger.debug("First warmup iteration completed") + except Exception as e: + logger.warning(f"Model warmup failed: {e}") + + def generate(self, input_features: torch.Tensor) -> torch.Tensor: + """Generate transcription from input features.""" + if self.ov_model is None: + raise RuntimeError("Model not initialized") + + return self.ov_model.generate( + input_features, + max_new_tokens=128, + num_beams=1, # Greedy decoding for speed + do_sample=False + ) + + def decode(self, token_ids: torch.Tensor, skip_special_tokens: bool = True) -> List[str]: + """Decode token IDs to text.""" + if self.processor is None: + raise RuntimeError("Processor not initialized") + + return self.processor.batch_decode(token_ids, skip_special_tokens=skip_special_tokens) + + +# Global model instance with deferred loading +_whisper_model: Optional[OpenVINOWhisperModel] = None +_audio_processors: Dict[str, "OptimizedAudioProcessor"] = {} _send_chat_func: Optional[Callable[[str], Awaitable[None]]] = None -def extract_input_features(audio_array: Any, sampling_rate: int) -> Any: - """Extract input features from audio array and sampling rate.""" - processor_output = _processor( # type: ignore +def _ensure_model_loaded() -> OpenVINOWhisperModel: + """Ensure the global model is loaded.""" + global _whisper_model + if _whisper_model is None: + setup_intel_arc_environment() + logger.info(f"Loading OpenVINO Whisper model: {_model_id}") + _whisper_model = OpenVINOWhisperModel(_model_id, _ov_config) + logger.info("OpenVINO Whisper model loaded successfully") + return _whisper_model + + +def extract_input_features(audio_array: AudioArray, sampling_rate: int) -> torch.Tensor: + """Extract input features from audio array optimized for OpenVINO.""" + model = _ensure_model_loaded() + if model.processor is None: + raise RuntimeError("Processor not initialized") + + processor_output = model.processor( audio_array, sampling_rate=sampling_rate, return_tensors="pt", ) - input_features: Any = processor_output.input_features # type: ignore - return input_features # type: ignore + return processor_output.input_features -class AudioProcessor: - """Handles audio stream processing and transcription with sentence chunking for a specific peer.""" - - main_loop: Optional[asyncio.AbstractEventLoop] - - def __init__( - self, peer_name: str, send_chat_func: Callable[[str], Awaitable[None]] - ): +class OptimizedAudioProcessor: + """Optimized audio processor for Intel Arc B580 with reduced latency.""" + + def __init__(self, peer_name: str, send_chat_func: Callable[[str], Awaitable[None]]): self.peer_name = peer_name self.send_chat_func = send_chat_func - self.sample_rate = 16000 # Whisper expects 16kHz - self.samples_per_frame = 480 # Common WebRTC frame size at 16kHz (30ms) - - # Audio buffering - self.audio_buffer: Deque[AudioArray] = deque( - maxlen=1000 - ) # ~30 seconds at 30ms frames - self.phrase_timeout = ( - 3.0 # seconds of silence 
before considering phrase complete - ) + self.sample_rate = SAMPLE_RATE + + # Optimized buffering parameters + self.chunk_size = int(self.sample_rate * CHUNK_DURATION_MS / 1000) # 100ms chunks + self.buffer_size = self.chunk_size * 50 # 5 seconds max + + # Circular buffer for zero-copy operations + self.audio_buffer = np.zeros(self.buffer_size, dtype=np.float32) + self.write_ptr = 0 + self.read_ptr = 0 + + # Voice Activity Detection + self.vad_threshold = VAD_THRESHOLD + self.silence_frames = 0 + self.max_silence_frames = MAX_SILENCE_FRAMES + + # Processing state + self.current_phrase_audio = np.array([], dtype=np.float32) + self.transcription_history: List[TranscriptionHistoryItem] = [] self.last_activity_time = time.time() - - # Transcription state - self.current_phrase_audio: AudioArray = np.array([], dtype=np.float32) - self.transcription_history: list[TranscriptionHistoryItem] = [] - - # Capture the main thread's event loop for background processing + + # Async processing + self.processing_queue: asyncio.Queue[AudioQueueItem] = asyncio.Queue(maxsize=10) + self.is_running = True + + # Start async processing task try: self.main_loop = asyncio.get_running_loop() - logger.debug(f"Captured main event loop for {self.peer_name}") + asyncio.create_task(self._async_processing_loop()) + logger.info(f"Started async processing for {self.peer_name}") except RuntimeError: - # No event loop running, we'll need to create one + # Fallback to thread-based processing self.main_loop = None - logger.warning(f"No event loop running when initializing AudioProcessor for {self.peer_name}") - - # Background processing - self.processing_queue: Queue[AudioQueueItem] = Queue() - self.is_running = True - self.processor_thread = threading.Thread( - target=self._processing_loop, daemon=True - ) - self.processor_thread.start() - - logger.info( - f"AudioProcessor initialized for {self.peer_name} - sample_rate: {self.sample_rate}Hz, frame_size: {self.samples_per_frame}, phrase_timeout: {self.phrase_timeout}s" - ) - - def add_audio_data(self, audio_data: AudioArray): - """Add new audio data to the processing buffer.""" - if not self.is_running: - logger.debug("AudioProcessor not running, ignoring audio data") + self.processor_thread = threading.Thread(target=self._thread_processing_loop, daemon=True) + self.processor_thread.start() + logger.warning(f"Using thread-based processing for {self.peer_name}") + + logger.info(f"OptimizedAudioProcessor initialized for {self.peer_name}") + + def add_audio_data(self, audio_data: AudioArray) -> None: + """Add audio data with Voice Activity Detection and circular buffering.""" + if not self.is_running or len(audio_data) == 0: return - - # Resample if needed (WebRTC might provide different sample rates) - if len(audio_data) > 0: - audio_received_time = time.time() - self.audio_buffer.append(audio_data) - self.last_activity_time = audio_received_time - - # Calculate audio metrics to detect silence - audio_rms = np.sqrt(np.mean(audio_data**2)) - audio_peak = np.max(np.abs(audio_data)) - - # Log audio buffer status (reduced verbosity) - buffer_duration_ms = len(self.audio_buffer) * 30 # assuming 30ms frames - - # Only log if we have meaningful audio or every 50 frames - if audio_rms > 0.001 or len(self.audio_buffer) % 50 == 0: - logger.info( - f"📥 AUDIO BUFFER ADD at {audio_received_time:.3f}: {len(audio_data)} samples, buffer size: {len(self.audio_buffer)} frames ({buffer_duration_ms}ms), RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f} (peer: {self.peer_name})" - ) - else: - logger.debug( 
- f"Added silent audio chunk: {len(audio_data)} samples, buffer size: {len(self.audio_buffer)} frames" - ) - - # Check if we should process accumulated audio - if len(self.audio_buffer) >= 10: # Process every ~300ms (10 * 30ms frames) - # Check if we have any meaningful audio in the buffer - combined_audio = np.concatenate(list(self.audio_buffer)) - combined_rms = np.sqrt(np.mean(combined_audio**2)) - - if combined_rms > 0.001: # Only process if not silence - buffer_queue_time = time.time() - logger.info( - f"🚀 BUFFER QUEUING at {buffer_queue_time:.3f}: Buffer threshold reached with meaningful audio (RMS: {combined_rms:.4f}), queuing for processing (peer: {self.peer_name})" - ) - self._queue_for_processing() - else: - logger.debug( - f"Buffer threshold reached but audio is silent (RMS: {combined_rms:.4f}), clearing buffer" - ) - self.audio_buffer.clear() # Clear silent audio - - def _queue_for_processing(self): - """Queue current audio buffer for transcription processing.""" - if not self.audio_buffer: - logger.debug("No audio in buffer to queue for processing") + + # Voice Activity Detection + energy = np.sqrt(np.mean(audio_data**2)) + has_speech = energy > self.vad_threshold + + if not has_speech: + self.silence_frames += 1 + if self.silence_frames > self.max_silence_frames: + # Clear current phrase on long silence + if len(self.current_phrase_audio) > 0: + self._queue_final_transcription() + return + else: + self.silence_frames = 0 + self.last_activity_time = time.time() + + # Add to circular buffer (zero-copy when possible) + self._add_to_circular_buffer(audio_data) + + # Check if we should process + if self._available_samples() >= self.chunk_size: + self._queue_for_processing() + + def _add_to_circular_buffer(self, audio_data: AudioArray) -> None: + """Add data to circular buffer efficiently.""" + chunk_len = len(audio_data) + + if self.write_ptr + chunk_len <= self.buffer_size: + # Simple case - no wraparound + self.audio_buffer[self.write_ptr:self.write_ptr + chunk_len] = audio_data + else: + # Wraparound case + first_part = self.buffer_size - self.write_ptr + self.audio_buffer[self.write_ptr:] = audio_data[:first_part] + self.audio_buffer[:chunk_len - first_part] = audio_data[first_part:] + + self.write_ptr = (self.write_ptr + chunk_len) % self.buffer_size + + def _available_samples(self) -> int: + """Calculate available samples in circular buffer.""" + if self.write_ptr >= self.read_ptr: + return self.write_ptr - self.read_ptr + else: + return self.buffer_size - self.read_ptr + self.write_ptr + + def _extract_chunk(self, size: int) -> AudioArray: + """Extract chunk from circular buffer.""" + if self.read_ptr + size <= self.buffer_size: + chunk = self.audio_buffer[self.read_ptr:self.read_ptr + size].copy() + else: + first_part = self.buffer_size - self.read_ptr + chunk = np.concatenate([ + self.audio_buffer[self.read_ptr:], + self.audio_buffer[:size - first_part] + ]) + + self.read_ptr = (self.read_ptr + size) % self.buffer_size + return chunk.astype(np.float32) + + def _queue_for_processing(self) -> None: + """Queue audio chunk for processing.""" + available = self._available_samples() + if available < self.chunk_size: return - - # Combine recent audio frames - combined_audio = np.concatenate(list(self.audio_buffer)) - self.audio_buffer.clear() - - # Calculate audio metrics - audio_duration_sec = len(combined_audio) / self.sample_rate - audio_rms = np.sqrt(np.mean(combined_audio**2)) - audio_peak = np.max(np.abs(combined_audio)) - - # Skip completely silent audio - if audio_rms < 
0.001 and audio_peak < 0.001: - logger.debug( - f"Skipping silent audio chunk: RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f}" - ) - return - - logger.info( - f"📦 AUDIO CHUNK QUEUED at {time.time():.3f}: {len(combined_audio)} samples, {audio_duration_sec:.2f}s duration, RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f} (peer: {self.peer_name})" - ) - - # Add to processing queue - try: - queue_item = AudioQueueItem(audio=combined_audio, timestamp=time.time()) - self.processing_queue.put_nowait(queue_item) - logger.info( - f"📋 PROCESSING QUEUE ADD at {time.time():.3f}: Added to processing queue, queue size: {self.processing_queue.qsize()} (peer: {self.peer_name})" - ) - except Exception as e: - # Queue full, skip this chunk - logger.warning(f"Audio processing queue full, dropping audio chunk: {e}") - - def _processing_loop(self): - """Background thread that processes audio chunks for transcription.""" - global _whisper_model - - logger.info("ASR processing loop started") - + + # Extract chunk for processing + chunk = self._extract_chunk(self.chunk_size) + + # Create queue item + queue_item = AudioQueueItem(audio=chunk, timestamp=time.time()) + + # Queue for processing + if self.main_loop: + try: + self.processing_queue.put_nowait(queue_item) + except asyncio.QueueFull: + logger.warning(f"Processing queue full for {self.peer_name}, dropping chunk") + else: + # Thread-based fallback + try: + threading_queue = getattr(self, '_threading_queue', None) + if threading_queue: + threading_queue.put_nowait(queue_item) + except: + logger.warning(f"Threading queue issue for {self.peer_name}") + + def _queue_final_transcription(self) -> None: + """Queue final transcription of current phrase.""" + if len(self.current_phrase_audio) > self.sample_rate * 0.5: # At least 0.5 seconds + if self.main_loop: + asyncio.create_task(self._transcribe_and_send(self.current_phrase_audio.copy(), is_final=True)) + + self.current_phrase_audio = np.array([], dtype=np.float32) + + async def _async_processing_loop(self) -> None: + """Async processing loop for audio chunks.""" + logger.info(f"Started async processing loop for {self.peer_name}") + while self.is_running: try: - # Get audio chunk to process (blocking with timeout) - try: - audio_data = self.processing_queue.get(timeout=1.0) - processing_start_time = time.time() - logger.info( - f"🔄 PROCESSING STARTED at {processing_start_time:.3f}: Retrieved audio chunk from queue, remaining queue size: {self.processing_queue.qsize()} (peer: {self.peer_name})" - ) - except Empty: - logger.debug("Processing queue timeout, checking for more audio...") - continue - - audio_array = audio_data.audio - chunk_timestamp = audio_data.timestamp - - # Check if this is a new phrase (gap in audio) - time_since_last = chunk_timestamp - self.last_activity_time - phrase_complete = time_since_last > self.phrase_timeout - - logger.debug( - f"Processing audio chunk: {len(audio_array)} samples, time since last: {time_since_last:.2f}s, phrase_complete: {phrase_complete}" - ) - - if phrase_complete and len(self.current_phrase_audio) > 0: - # Process the completed phrase - phrase_duration = len(self.current_phrase_audio) / self.sample_rate - phrase_rms = np.sqrt(np.mean(self.current_phrase_audio**2)) - - logger.info( - f"Processing completed phrase: {phrase_duration:.2f}s duration, {len(self.current_phrase_audio)} samples, RMS: {phrase_rms:.4f}" - ) - - if self.main_loop and not self.main_loop.is_closed(): - asyncio.run_coroutine_threadsafe( - self._transcribe_and_send( - self.current_phrase_audio.copy(), 
is_final=True - ), - self.main_loop, - ) - else: - logger.warning( - f"No event loop available for final transcription (peer: {self.peer_name})" - ) - pass + # Get audio chunk + audio_item = await asyncio.wait_for(self.processing_queue.get(), timeout=1.0) + + # Add to current phrase + self.current_phrase_audio = np.concatenate([self.current_phrase_audio, audio_item.audio]) + + # Check if we should transcribe + phrase_duration = len(self.current_phrase_audio) / self.sample_rate + + if phrase_duration >= 1.0: # Transcribe every 1 second + await self._transcribe_and_send(self.current_phrase_audio.copy(), is_final=False) + + except asyncio.TimeoutError: + # Check for final transcription on timeout + if len(self.current_phrase_audio) > 0 and time.time() - self.last_activity_time > 2.0: + await self._transcribe_and_send(self.current_phrase_audio.copy(), is_final=True) self.current_phrase_audio = np.array([], dtype=np.float32) - - # Add new audio to current phrase - old_phrase_length = len(self.current_phrase_audio) - self.current_phrase_audio = np.concatenate( - [self.current_phrase_audio, audio_array] - ) - current_phrase_duration = ( - len(self.current_phrase_audio) / self.sample_rate - ) - - logger.debug( - f"Updated current phrase: {old_phrase_length} -> {len(self.current_phrase_audio)} samples ({current_phrase_duration:.2f}s)" - ) - - # Lower the threshold for streaming transcription to catch shorter phrases - min_transcription_duration = 1.0 # Reduced from 2.0 seconds - - if ( - len(self.current_phrase_audio) - > self.sample_rate * min_transcription_duration - ): # At least 1 second - phrase_rms = np.sqrt(np.mean(self.current_phrase_audio**2)) - logger.info( - f"Current phrase >= {min_transcription_duration}s (RMS: {phrase_rms:.4f}), attempting streaming transcription" - ) - if self.main_loop and not self.main_loop.is_closed(): - asyncio.run_coroutine_threadsafe( - self._transcribe_and_send( - self.current_phrase_audio.copy(), is_final=False - ), - self.main_loop, - ) - else: - logger.warning( - f"No event loop available for streaming transcription (peer: {self.peer_name})" - ) - except Exception as e: - logger.error(f"Error in audio processing loop: {e}", exc_info=True) - - logger.info("ASR processing loop ended") - - async def _transcribe_and_send(self, audio_array: AudioArray, is_final: bool): - """Transcribe audio and send result as chat message.""" - global sample_rate - - transcription_start_time = time.time() + logger.error(f"Error in async processing loop for {self.peer_name}: {e}") + + logger.info(f"Async processing loop ended for {self.peer_name}") + + def _thread_processing_loop(self) -> None: + """Thread-based processing loop fallback.""" + self._threading_queue: Queue[AudioQueueItem] = Queue(maxsize=10) + logger.info(f"Started thread processing loop for {self.peer_name}") + + while self.is_running: + try: + audio_item = self._threading_queue.get(timeout=1.0) + + # Add to current phrase + self.current_phrase_audio = np.concatenate([self.current_phrase_audio, audio_item.audio]) + + # Check if we should transcribe + phrase_duration = len(self.current_phrase_audio) / self.sample_rate + + if phrase_duration >= 1.0: + if self.main_loop: + asyncio.run_coroutine_threadsafe( + self._transcribe_and_send(self.current_phrase_audio.copy(), is_final=False), + self.main_loop + ) + + except Empty: + # Check for final transcription + if len(self.current_phrase_audio) > 0 and time.time() - self.last_activity_time > 2.0: + if self.main_loop: + asyncio.run_coroutine_threadsafe( + 
self._transcribe_and_send(self.current_phrase_audio.copy(), is_final=True), + self.main_loop + ) + self.current_phrase_audio = np.array([], dtype=np.float32) + except Exception as e: + logger.error(f"Error in thread processing loop for {self.peer_name}: {e}") + + async def _transcribe_and_send(self, audio_array: AudioArray, is_final: bool) -> None: + """Transcribe audio using OpenVINO optimized model.""" + transcription_start = time.time() transcription_type = "final" if is_final else "streaming" - + try: - audio_duration_sec = len(audio_array) / self.sample_rate - - # Reduce minimum audio duration threshold - min_duration = 0.3 # Reduced from 0.5 seconds - if len(audio_array) < self.sample_rate * min_duration: - logger.debug( - f"Skipping {transcription_type} transcription: audio too short ({audio_duration_sec:.2f}s < {min_duration}s)" - ) + audio_duration = len(audio_array) / self.sample_rate + + # Skip very short audio + if audio_duration < 0.3: + logger.debug(f"Skipping {transcription_type} transcription: too short ({audio_duration:.2f}s)") return - - # Calculate audio quality metrics + + # Audio quality check audio_rms = np.sqrt(np.mean(audio_array**2)) - audio_peak = np.max(np.abs(audio_array)) - - # More lenient silence detection - if audio_rms < 0.0005: # Very quiet threshold - logger.debug( - f"Skipping {transcription_type} transcription: audio too quiet (RMS: {audio_rms:.6f})" - ) + if audio_rms < 0.001: + logger.debug(f"Skipping {transcription_type} transcription: too quiet (RMS: {audio_rms:.6f})") return - - logger.info( - f"🎬 TRANSCRIPTION STARTED ({transcription_type}) at {time.time():.3f}: {audio_duration_sec:.2f}s audio, RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f} (peer: {self.peer_name})" - ) - - # Ensure audio is in the right format for Whisper - audio_array = audio_array.astype(np.float32) - - # Transcribe with Whisper - feature_extraction_start = time.time() - input_features = extract_input_features(audio_array, sample_rate) - feature_extraction_time = time.time() - feature_extraction_start - - model_inference_start = time.time() - predicted_ids = _pt_model.generate(input_features) # type: ignore - model_inference_time = time.time() - model_inference_start - - decoding_start = time.time() - transcription = _processor.batch_decode( - predicted_ids, skip_special_tokens=True - ) # type: ignore - decoding_time = time.time() - decoding_start - - total_transcription_time = time.time() - transcription_start_time - - logger.debug( - f"ASR timing - Feature extraction: {feature_extraction_time:.3f}s, Model inference: {model_inference_time:.3f}s, Decoding: {decoding_time:.3f}s, Total: {total_transcription_time:.3f}s" - ) - - text = ( - transcription[0].strip() - if transcription and len(transcription) > 0 - else "" - ) - - if text and len(text) > 0: # Accept any non-empty text - # Calculate timing information for the message - chat_send_start = time.time() - total_transcription_time = chat_send_start - transcription_start_time + + logger.info(f"🎬 OpenVINO transcription ({transcription_type}) started: {audio_duration:.2f}s, RMS: {audio_rms:.4f}") + + # Extract features for OpenVINO + input_features = extract_input_features(audio_array, self.sample_rate) + + # GPU inference with OpenVINO + model = _ensure_model_loaded() + predicted_ids = model.generate(input_features) + + # Decode results + transcription = model.decode(predicted_ids, skip_special_tokens=True) + text = transcription[0].strip() if transcription else "" + + transcription_time = time.time() - transcription_start + + if 
text and len(text.split()) >= 2: + # Create message with timing + status_marker = "⚡" if is_final else "🎤" + type_marker = "" if is_final else " [streaming]" + timing_info = f" (🚀 {transcription_time:.2f}s)" - # Create message with timing information included - status_marker = "🎤" if is_final else "🎤" - type_marker = "" if is_final else " [partial]" - timing_info = f" (⏱️ {total_transcription_time:.2f}s from start: {transcription_start_time:.3f})" + message = f"{status_marker} {self.peer_name}{type_marker}: {text}{timing_info}" - prefix = f"{status_marker} {self.peer_name}{type_marker}: " - message = f"{prefix}{text}{timing_info}" - - # Avoid sending duplicate messages (check text only, not timing) - text_only_message = f"{prefix}{text}" - if is_final or text_only_message not in [ - h.message.split(' (⏱️')[0] for h in self.transcription_history[-3:] - ]: + # Avoid duplicates + if not self._is_duplicate(text): await self.send_chat_func(message) - chat_send_time = time.time() - chat_send_start - message_sent_time = time.time() - - logger.info( - f"💬 CHAT MESSAGE SENT at {message_sent_time:.3f}: '{text}' (transcription started: {transcription_start_time:.3f}, chat send took: {chat_send_time:.3f}s, peer: {self.peer_name})" - ) - - # Keep history for deduplication - history_item = TranscriptionHistoryItem( - message=message, timestamp=time.time(), is_final=is_final - ) - self.transcription_history.append(history_item) - - # Limit history size + + # Update history + self.transcription_history.append(TranscriptionHistoryItem( + message=message, + timestamp=time.time(), + is_final=is_final + )) + + # Limit history if len(self.transcription_history) > 10: self.transcription_history.pop(0) - - logger.info( - f"✅ Transcribed ({transcription_type}) for {self.peer_name}: '{text}' (processing time: {total_transcription_time:.3f}s, audio duration: {audio_duration_sec:.2f}s)" - ) - # Log end-to-end pipeline timing - total_pipeline_time = message_sent_time - transcription_start_time - logger.info( - f"⏱️ PIPELINE TIMING ({transcription_type}): Total={total_pipeline_time:.3f}s (Transcription={total_transcription_time:.3f}s, Chat Send={chat_send_time:.3f}s, peer: {self.peer_name}) | 🕐 Start: {transcription_start_time:.3f}, End: {message_sent_time:.3f}" - ) + logger.info(f"✅ OpenVINO transcription ({transcription_type}): '{text}' ({transcription_time:.3f}s)") else: - logger.debug( - f"Skipping duplicate {transcription_type} transcription: '{text}'" - ) + logger.debug(f"Skipping duplicate {transcription_type} transcription: '{text}'") else: - logger.info( - f"❌ No text from {transcription_type} transcription for {self.peer_name} (empty result from model)" - ) - + logger.debug(f"Empty or too short transcription result: '{text}'") + except Exception as e: - logger.error( - f"Error in {transcription_type} transcription: {e}", exc_info=True - ) - - def shutdown(self): + logger.error(f"Error in OpenVINO {transcription_type} transcription: {e}", exc_info=True) + + def _is_duplicate(self, text: str) -> bool: + """Check if transcription is duplicate of recent ones.""" + recent_texts = [h.message.split(': ', 1)[-1].split(' (🚀')[0] + for h in self.transcription_history[-3:]] + return text in recent_texts + + def shutdown(self) -> None: """Shutdown the audio processor.""" - logger.info(f"Shutting down AudioProcessor for {self.peer_name}...") + logger.info(f"Shutting down OptimizedAudioProcessor for {self.peer_name}...") self.is_running = False - if self.processor_thread.is_alive(): - logger.debug( - f"Waiting for 
processor thread for {self.peer_name} to finish..." - ) + + # Final transcription if needed + if len(self.current_phrase_audio) > 0: + if self.main_loop: + asyncio.create_task(self._transcribe_and_send(self.current_phrase_audio.copy(), is_final=True)) + + # Cleanup thread if exists + if hasattr(self, 'processor_thread') and self.processor_thread.is_alive(): self.processor_thread.join(timeout=2.0) - if self.processor_thread.is_alive(): - logger.warning( - f"Processor thread for {self.peer_name} did not shut down cleanly within timeout" - ) - else: - logger.info( - f"Processor thread for {self.peer_name} shut down successfully" - ) - logger.info(f"AudioProcessor for {self.peer_name} shutdown complete") + + logger.info(f"OptimizedAudioProcessor shutdown complete for {self.peer_name}") -async def handle_track_received(peer: Peer, track: MediaStreamTrack): +async def handle_track_received(peer: Peer, track: MediaStreamTrack) -> None: """Handle incoming audio tracks from WebRTC peers.""" global _audio_processors, _send_chat_func - + if track.kind != "audio": logger.info(f"Ignoring non-audio track from {peer.peer_name}: {track.kind}") return - - # Create or get audio processor for this peer + + # Create audio processor if peer.peer_name not in _audio_processors: if _send_chat_func is None: - logger.error( - f"Cannot create AudioProcessor for {peer.peer_name}: no send_chat_func available" - ) + logger.error(f"Cannot create processor for {peer.peer_name}: no send_chat_func") return - - logger.info(f"Creating new AudioProcessor for {peer.peer_name}") - _audio_processors[peer.peer_name] = AudioProcessor( - peer_name=peer.peer_name, send_chat_func=_send_chat_func + + logger.info(f"Creating OptimizedAudioProcessor for {peer.peer_name}") + _audio_processors[peer.peer_name] = OptimizedAudioProcessor( + peer_name=peer.peer_name, + send_chat_func=_send_chat_func ) - + audio_processor = _audio_processors[peer.peer_name] - - logger.info( - f"Received audio track from {peer.peer_name}, starting transcription" - ) - - # Start the frame reception loop - + logger.info(f"Starting OpenVINO audio processing for {peer.peer_name}") + try: frame_count = 0 while True: try: - # Receive audio frame frame = await track.recv() frame_count += 1 - # Log less frequently now that we know frames are being received + if frame_count % 100 == 0: - logger.info(f"Received {frame_count} frames from {peer.peer_name}") + logger.debug(f"Received {frame_count} frames from {peer.peer_name}") + except MediaStreamError as e: - # Connection was closed or media stream ended - this is normal - logger.info( - f"Audio stream ended for {peer.peer_name} (MediaStreamError: {e})" - ) + logger.info(f"Audio stream ended for {peer.peer_name}: {e}") break except Exception as e: - # Other errors during frame reception - logger.error( - f"Error receiving audio frame from {peer.peer_name}: {e}", exc_info=True - ) + logger.error(f"Error receiving frame from {peer.peer_name}: {e}") break - - # Check if this is an audio frame and convert to numpy array for processing + if isinstance(frame, AudioFrame): - # Convert AudioFrame to numpy array try: + # Convert frame to numpy array audio_data = frame.to_ndarray() - except Exception as e: - logger.error(f"Error converting frame to ndarray for {peer.peer_name}: {e}") - continue - - original_shape = audio_data.shape - original_dtype = audio_data.dtype - - logger.debug( - f"Audio frame data: shape={original_shape}, dtype={original_dtype}, samples={frame.samples if hasattr(frame, 'samples') else 'unknown'}" - ) - - # 
Handle different audio formats - convert stereo to mono if needed - if audio_data.ndim == 2: # Stereo -> mono - if audio_data.shape[0] == 1: # Shape is (1, samples) - just squeeze the first dimension - audio_data = audio_data.squeeze(0) - logger.debug(f"Squeezed single-channel audio: {original_shape} -> {audio_data.shape}") - else: # True stereo (2, samples) or (samples, 2) - average channels - audio_data = np.mean(audio_data, axis=0 if audio_data.shape[0] > audio_data.shape[1] else 1) - logger.debug(f"Converted stereo to mono: {original_shape} -> {audio_data.shape}") - - # Convert to float32 and normalize based on data type - if audio_data.dtype == np.int16: - audio_data = audio_data.astype(np.float32) / 32768.0 - logger.debug("Normalized int16 audio to float32") - elif audio_data.dtype == np.int32: - audio_data = audio_data.astype(np.float32) / 2147483648.0 - logger.debug("Normalized int32 audio to float32") - - # Resample to 16kHz if needed for Whisper model - if frame.sample_rate != sample_rate: - original_length = len(audio_data) - # Use librosa to resample with explicit float64 conversion for better precision - try: - audio_float64 = audio_data.astype(np.float64) - - audio_data = librosa.resample( # type: ignore - audio_float64, orig_sr=frame.sample_rate, target_sr=sample_rate - ) - except Exception as e: - logger.error(f"Resampling failed for {peer.peer_name}: {str(e)}") - # Fall back to original data - audio_data = audio_data + # Handle audio format conversion + audio_data = _process_audio_frame(audio_data, frame) - logger.debug( - f"Resampled audio: {frame.sample_rate}Hz -> {sample_rate}Hz, {original_length} -> {len(audio_data)} samples" - ) - else: - # No resampling needed - pass - - # Ensure audio_data is properly typed as float32 and calculate frame metrics - audio_data_float32 = cast(AudioArray, audio_data.astype(np.float32)) - frame_rms = np.sqrt(np.mean(audio_data_float32**2)) - frame_peak = np.max(np.abs(audio_data_float32)) - - # Track frame count and audio state - frame_count = getattr(peer, "_whisper_frame_count", 0) + 1 - setattr(peer, "_whisper_frame_count", frame_count) - - # Track if we've seen audio before (to detect start of speech) - had_audio = getattr(peer, "_whisper_had_audio", False) - - # Define thresholds for "real audio" detection - audio_threshold = 0.001 # RMS threshold for detecting speech - has_audio = frame_rms > audio_threshold - - # Log important audio events - if has_audio and not had_audio: - # Started receiving audio - frame_info = f"{frame.sample_rate}Hz, {frame.format.name}, {frame.layout.name}" - logger.info( - f"🎤 AUDIO DETECTED from {peer.peer_name}! 
Frame #{frame_count}: {frame_info}, RMS: {frame_rms:.4f}, Peak: {frame_peak:.4f}" - ) - setattr(peer, "_whisper_had_audio", True) - setattr(peer, "_whisper_last_audio_frame", frame_count) - elif not has_audio and had_audio: - # Stopped receiving audio - last_audio_frame = getattr(peer, "_whisper_last_audio_frame", 0) - logger.info( - f"🔇 Audio stopped from {peer.peer_name} at frame #{frame_count} (last audio was frame #{last_audio_frame})" - ) - setattr(peer, "_whisper_had_audio", False) - elif has_audio: - # Continue receiving audio - update last audio frame but don't spam logs - setattr(peer, "_whisper_last_audio_frame", frame_count) - # Only log every 100 frames when continuously receiving audio - if frame_count % 100 == 0: - logger.info( - f"🎤 Audio continuing from {peer.peer_name}: Frame #{frame_count}, RMS: {frame_rms:.4f}" - ) - - # Log connection info much less frequently (every 200 frames when silent) - if not has_audio and frame_count % 200 == 0: - logger.debug( - f"Connection active from {peer.peer_name}: Frame #{frame_count} (silent, RMS: {frame_rms:.6f})" - ) - - # Send processed audio to the audio processor for transcription - if audio_processor: + # Resample if needed + if frame.sample_rate != SAMPLE_RATE: + audio_data = _resample_audio(audio_data, frame.sample_rate, SAMPLE_RATE) + + # Convert to float32 + audio_data_float32 = cast(AudioArray, audio_data.astype(np.float32)) + + # Process with optimized processor audio_processor.add_audio_data(audio_data_float32) - else: - logger.warning( - f"No audio processor available to handle audio data for {peer.peer_name}" - ) - else: - logger.warning( - f"Received non-audio frame on audio track from {peer.peer_name}: type={type(frame)}" - ) - + + except Exception as e: + logger.error(f"Error processing audio frame for {peer.peer_name}: {e}") + continue + except Exception as e: - logger.error( - f"Unexpected error processing audio track from {peer.peer_name}: {e}", exc_info=True - ) + logger.error(f"Unexpected error in audio processing for {peer.peer_name}: {e}", exc_info=True) finally: - # Clean up the audio processor when the stream ends cleanup_peer_processor(peer.peer_name) +def _process_audio_frame(audio_data: npt.NDArray[Any], frame: AudioFrame) -> npt.NDArray[np.float32]: + """Process audio frame format conversion.""" + # Handle stereo to mono conversion + if audio_data.ndim == 2: + if audio_data.shape[0] == 1: + audio_data = audio_data.squeeze(0) + else: + audio_data = np.mean(audio_data, axis=0 if audio_data.shape[0] > audio_data.shape[1] else 1) + + # Normalize based on data type + if audio_data.dtype == np.int16: + audio_data = audio_data.astype(np.float32) / 32768.0 + elif audio_data.dtype == np.int32: + audio_data = audio_data.astype(np.float32) / 2147483648.0 + + return audio_data.astype(np.float32) + + +def _resample_audio(audio_data: npt.NDArray[np.float32], orig_sr: int, target_sr: int) -> npt.NDArray[np.float32]: + """Resample audio efficiently.""" + try: + # Use high-quality resampling for better results + resampled = librosa.resample( + audio_data.astype(np.float64), + orig_sr=orig_sr, + target_sr=target_sr, + res_type='kaiser_fast' # Good balance of quality and speed + ) + return resampled.astype(np.float32) + except Exception as e: + logger.error(f"Resampling failed: {e}") + return audio_data + + +# Public API functions def agent_info() -> Dict[str, str]: return {"name": AGENT_NAME, "description": AGENT_DESCRIPTION, "has_media": "false"} -def create_agent_tracks(session_name: str) -> dict[str, MediaStreamTrack]: 
+def create_agent_tracks(session_name: str) -> Dict[str, MediaStreamTrack]: """Whisper is not a media source - return no local tracks.""" return {} async def handle_chat_message( - chat_message: ChatMessageModel, send_message_func: Callable[[str], Awaitable[None]] + chat_message: ChatMessageModel, + send_message_func: Callable[[str], Awaitable[None]] ) -> Optional[str]: - """Handle incoming chat messages and optionally return a response.""" - pass + """Handle incoming chat messages.""" + return None -async def on_track_received(peer: Peer, track: MediaStreamTrack): +async def on_track_received(peer: Peer, track: MediaStreamTrack) -> None: """Callback when a new track is received from a peer.""" await handle_track_received(peer, track) -# Export functions for the orchestrator to discover -def get_track_handler(): - """Return the track handler function for the orchestrator to use.""" +def get_track_handler() -> Callable[[Peer, MediaStreamTrack], Awaitable[None]]: + """Return the track handler function.""" return on_track_received -def bind_send_chat_function(send_chat_func: Callable[[str], Awaitable[None]]): - """Bind the send chat function to be used for all audio processors.""" +def bind_send_chat_function(send_chat_func: Callable[[str], Awaitable[None]]) -> None: + """Bind the send chat function.""" global _send_chat_func, _audio_processors - logger.info("Binding send chat function to whisper agent") + + logger.info("Binding send chat function to OpenVINO whisper agent") _send_chat_func = send_chat_func - - # Update existing audio processors + + # Update existing processors for peer_name, processor in _audio_processors.items(): - logger.debug( - f"Updating AudioProcessor for {peer_name} with new send chat function" - ) processor.send_chat_func = send_chat_func + logger.debug(f"Updated processor for {peer_name} with new send chat function") -def cleanup_peer_processor(peer_name: str): - """Clean up audio processor for a disconnected peer.""" +def cleanup_peer_processor(peer_name: str) -> None: + """Clean up processor for disconnected peer.""" global _audio_processors - + if peer_name in _audio_processors: - logger.info(f"Cleaning up AudioProcessor for disconnected peer: {peer_name}") + logger.info(f"Cleaning up processor for {peer_name}") processor = _audio_processors[peer_name] processor.shutdown() del _audio_processors[peer_name] - logger.info(f"AudioProcessor for {peer_name} cleaned up successfully") - else: - logger.debug(f"No AudioProcessor found for peer {peer_name} during cleanup") + logger.info(f"Processor cleanup complete for {peer_name}") -def get_active_processors() -> Dict[str, "AudioProcessor"]: - """Get currently active audio processors (for debugging).""" +def get_active_processors() -> Dict[str, OptimizedAudioProcessor]: + """Get active processors for debugging.""" return _audio_processors.copy() + + +def get_model_info() -> Dict[str, Any]: + """Get information about the loaded model.""" + model = _ensure_model_loaded() + return { + "model_id": _model_id, + "device": _ov_config.device, + "quantization_enabled": _ov_config.enable_quantization, + "is_quantized": model.is_quantized, + "sample_rate": SAMPLE_RATE, + "chunk_duration_ms": CHUNK_DURATION_MS + } \ No newline at end of file
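
Below is a minimal offline usage sketch, not part of the patch itself: it exercises the OpenVINO model wrapper introduced above, assuming the module is importable as voicebot.bots.whisper and that no Intel Arc GPU is present (hence device="CPU" with quantization disabled). The synthetic audio stands in for real microphone input.

import numpy as np
from voicebot.bots.whisper import OpenVINOConfig, OpenVINOWhisperModel, SAMPLE_RATE

# CPU fallback configuration for machines without an Intel Arc B580.
config = OpenVINOConfig(device="CPU", enable_quantization=False)
model = OpenVINOWhisperModel("distil-whisper/distil-large-v2", config)

# One second of low-level noise stands in for captured 16 kHz float32 audio.
audio = (np.random.randn(SAMPLE_RATE) * 0.01).astype(np.float32)
features = model.processor(
    audio, sampling_rate=SAMPLE_RATE, return_tensors="pt"
).input_features
token_ids = model.generate(features)
print(model.decode(token_ids)[0])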
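
The agent-facing entry points added by the patch (agent_info, bind_send_chat_function, get_track_handler) can be wired up as sketched below; the send_chat coroutine is a hypothetical stand-in for the lobby's real chat sender, and the import path is again an assumption.

import asyncio
from voicebot.bots import whisper

async def main() -> None:
    async def send_chat(message: str) -> None:
        # Stand-in for the orchestrator's chat sender.
        print(f"[chat] {message}")

    whisper.bind_send_chat_function(send_chat)
    handler = whisper.get_track_handler()
    print(whisper.agent_info())
    # For each incoming WebRTC audio track the orchestrator would:
    #   await handler(peer, track)

asyncio.run(main())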