"""Streaming Whisper agent (bots/whisper) - OpenVINO Optimized for Intel Arc B580 Real-time speech transcription agent that processes incoming audio streams and sends transcriptions as chat messages to the lobby. Optimized for Intel Arc B580 GPU using OpenVINO inference engine. """ import asyncio import numpy as np import time import threading import os import gc import shutil from queue import Queue, Empty from typing import Dict, Optional, Callable, Awaitable, Any, List, Union from pathlib import Path import numpy.typing as npt from pydantic import BaseModel, Field, ConfigDict from typing import Tuple # Core dependencies import librosa from shared.logger import logger from aiortc import MediaStreamTrack from aiortc.mediastreams import MediaStreamError from av import AudioFrame, VideoFrame import cv2 import fractions from time import perf_counter # Import shared models for chat functionality import sys sys.path.append( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) from shared.models import ChatMessageModel from voicebot.models import Peer # OpenVINO optimized imports import openvino as ov from optimum.intel.openvino import OVModelForSpeechSeq2Seq # type: ignore from transformers import WhisperProcessor from transformers.generation.configuration_utils import GenerationConfig from openvino.runtime import Core # Part of optimum.intel.openvino # type: ignore import torch # Import quantization dependencies with error handling import nncf # type: ignore from optimum.intel.openvino.quantization import InferRequestWrapper # type: ignore QUANTIZATION_AVAILABLE = True # Type definitions AudioArray = npt.NDArray[np.float32] ModelConfig = Dict[str, Union[str, int, bool]] CalibrationData = List[Dict[str, Any]] _device = "GPU.1" # Default to Intel Arc B580 GPU # Global lock to serialize calls into the OpenVINO model.generate/decode # since some backends are not safe for concurrent generate calls. 
_generate_global_lock = threading.Lock() def get_available_devices() -> list[dict[str, Any]]: """List available OpenVINO devices with their properties.""" try: core = Core() devices = core.available_devices device_info: list[dict[str, Any]] = [] for device in devices: try: # Get device properties properties = core.get_property(device, "FULL_DEVICE_NAME") # Attempt to get additional properties if available try: device_type = core.get_property(device, "DEVICE_TYPE") except Exception: device_type = "N/A" try: capabilities: Any = core.get_property( device, "SUPPORTED_PROPERTIES" ) except Exception: capabilities = "N/A" device_info.append( { "name": device, "full_name": properties, "type": device_type, "capabilities": capabilities, } ) except Exception as e: logger.error(f"Failed to retrieve properties for device {device}: {e}") device_info.append( { "name": device, "full_name": "Unknown", "type": "N/A", "capabilities": "N/A", } ) return device_info except Exception as e: logger.error(f"Failed to retrieve available devices: {e}") return [] def print_available_devices(device: str | None = None): """Print available OpenVINO devices in a formatted manner.""" devices = get_available_devices() if not devices: logger.info("No OpenVINO devices detected.") return logger.info("Available OpenVINO Devices:") for d in devices: logger.info( f"- Device: {d.get('name')} {'*' if d.get('name') == device else ''}" ) logger.info(f" Full Name: {d.get('full_name')}") logger.info(f" Type: {d.get('type')}") print_available_devices(_device) class AudioQueueItem(BaseModel): """Audio data with timestamp for processing queue.""" model_config = ConfigDict(arbitrary_types_allowed=True) audio: AudioArray = Field(..., description="Audio data as numpy array") timestamp: float = Field(..., description="Timestamp when audio was captured") class TranscriptionHistoryItem(BaseModel): """Transcription history item with metadata.""" model_config = ConfigDict(arbitrary_types_allowed=True) message: str = Field(..., description="Transcribed text message") timestamp: float = Field(..., description="When transcription was completed") is_final: bool = Field( ..., description="Whether this is final or streaming transcription" ) class OpenVINOConfig(BaseModel): """OpenVINO configuration for Intel Arc B580 optimization.""" model_config = ConfigDict(arbitrary_types_allowed=True) device: str = Field(default=_device, description="Target device for inference") cache_dir: str = Field( default="./ov_cache", description="Cache directory for compiled models" ) enable_quantization: bool = Field( default=True, description="Enable INT8 quantization" ) throughput_streams: int = Field( default=2, description="Number of inference streams" ) max_threads: int = Field(default=8, description="Maximum number of threads") def to_ov_config(self) -> ModelConfig: """Convert to OpenVINO configuration dictionary.""" cfg: ModelConfig = {"CACHE_DIR": self.cache_dir} # Only include GPU-specific tuning options when the target device is GPU. # Some OpenVINO plugins (notably the CPU plugin) will raise NotFound # errors for GPU_* properties, so avoid passing them unless applicable. 
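        # A minimal sketch of the expected result with the defaults above:
        #   OpenVINOConfig(device="GPU").to_ov_config()
        #     -> {"CACHE_DIR": "./ov_cache", "GPU_THROUGHPUT_STREAMS": "2"}
        #   OpenVINOConfig(device="CPU").to_ov_config()
        #     -> {"CACHE_DIR": "./ov_cache", "CPU_THROUGHPUT_NUM_THREADS": "8"}
        # Note that the comparison below is an exact match against "GPU", so an
        # enumerated device name such as "GPU.1" falls through to the CPU branch.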
device = (self.device or "").upper() if device == "GPU": cfg.update( { # Throughput / stream tuning "GPU_THROUGHPUT_STREAMS": str(self.throughput_streams), # Threading controls may be driver/plugin-specific; keep minimal # NOTE: We intentionally do NOT set GPU_MAX_NUM_THREADS here # because some OpenVINO plugins / builds (and the CPU plugin # during a fallback) do not recognize the property and will # raise NotFound/UnsupportedProperty errors. If you need to # tune GPU threads for a specific driver, set that externally # or via vendor-specific tools. } ) else: # Safe CPU-side defaults cfg.update( { "CPU_THROUGHPUT_NUM_THREADS": str(self.max_threads), # "CPU_BIND_THREAD": "YES", # Removed: not supported by CPU plugin } ) return cfg # Global configuration and constants AGENT_NAME = "Transcription Bot" AGENT_DESCRIPTION = "Real-time speech transcription (OpenVINO Whisper) - converts speech to text on Intel Arc B580" SAMPLE_RATE = 16000 # Whisper expects 16kHz CHUNK_DURATION_MS = 100 # Reduced latency - 100ms chunks VAD_THRESHOLD = 0.01 # Initial voice activity detection threshold MAX_SILENCE_FRAMES = 30 # 3 seconds of silence before stopping (for overall silence) MAX_TRAILING_SILENCE_FRAMES = 5 # 0.5 seconds of trailing silence VAD_CONFIG: Dict[str, Any] = { "energy_threshold": 0.01, "zcr_threshold": 0.1, "adapt_thresholds": True, "adaptation_window": 100, # samples to consider for adaptation "speech_freq_min": 200, # Hz "speech_freq_max": 3000, # Hz } # Normalization defaults: used to control optional per-stream normalization # applied before sending audio to the model and for visualization. NORMALIZATION_ENABLED = True NORMALIZATION_TARGET_PEAK = 0.95 MAX_NORMALIZATION_GAIN = 3.0 # How long (seconds) of no-arriving audio before we consider the phrase ended INACTIVITY_TIMEOUT = 1.5 model_ids = { "Distil-Whisper": [ "distil-whisper/distil-large-v2", "distil-whisper/distil-medium.en", "distil-whisper/distil-small.en", ], "Whisper": [ "openai/whisper-large-v3", "openai/whisper-large-v2", "openai/whisper-large", "openai/whisper-medium", "openai/whisper-small", "openai/whisper-base", "openai/whisper-tiny", "openai/whisper-medium.en", "openai/whisper-small.en", "openai/whisper-base.en", "openai/whisper-tiny.en", ], } # Global model configuration _model_type = model_ids["Distil-Whisper"] _model_id = _model_type[0] # Use distil-large-v2 for best quality _ov_config = OpenVINOConfig() def setup_intel_arc_environment() -> None: """Configure environment variables for optimal Intel Arc B580 performance.""" os.environ["OV_GPU_CACHE_MODEL"] = "1" os.environ["OV_GPU_ENABLE_OPENCL_THROTTLING"] = "0" os.environ["OV_GPU_DISABLE_WINOGRAD"] = "1" logger.info("Intel Arc B580 environment variables configured") class AdvancedVAD: """Advanced Voice Activity Detection with noise rejection.""" def __init__(self, sample_rate: int = 16000): self.sample_rate = sample_rate # More permissive thresholds based on research self.energy_threshold = 0.005 # Reduced from 0.02 self.zcr_min = 0.02 # Reduced from 0.1 (voiced speech < 0.1) self.zcr_max = 0.8 # Increased from 0.5 (unvoiced speech ~0.3-0.8) # Spectral thresholds (keep existing - these work well) self.spectral_centroid_min = 200 # Slightly lower self.spectral_centroid_max = 4000 # Slightly higher self.spectral_rolloff_threshold = 3000 # More permissive # Relaxed temporal consistency self.minimum_duration = 0.2 # Reduced from 0.3s self.speech_history: List[bool] = [] self.max_history = 8 # Reduced from 10 # Adaptive noise floor self.noise_floor_energy = 0.001 
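        # The floor adapts via an exponential moving average during non-speech
        # frames (see analyze_frame):
        #   noise_floor = rate * energy + (1 - rate) * noise_floor
        # e.g. with the 0.05 rate below, a silent frame at energy 0.002 moves a
        # floor of 0.001 to 0.05 * 0.002 + 0.95 * 0.001 = 0.00105.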
self.noise_floor_centroid = 1000 self.adaptation_rate = 0.05 # Harmonicity improvements self.prev_magnitude = None self.harmonic_threshold = 0.15 # Reduced from 0.3 def analyze_frame(self, audio_data: AudioArray) -> Tuple[bool, Dict[str, Any]]: """Analyze audio frame for speech vs noise.""" # Basic energy features energy = np.sqrt(np.mean(audio_data**2)) # Zero-crossing rate signs = np.sign(audio_data) signs[signs == 0] = 1 zcr = np.sum(np.abs(np.diff(signs))) / (2 * len(audio_data)) # Spectral features spectral_features = self._compute_spectral_features(audio_data) # Individual feature checks # Use adaptive energy threshold (reduced multiplier) adaptive_energy_threshold = max( self.energy_threshold, self.noise_floor_energy * 2.0 # Reduced from 3.0 ) energy_check = energy > adaptive_energy_threshold # More permissive ZCR check (allow voiced OR unvoiced speech) zcr_check = ( self.zcr_min < zcr < self.zcr_max or # General range zcr < 0.1 or # Definitely voiced (0.2 < zcr < 0.6 and energy > self.energy_threshold * 2) # Unvoiced with energy ) # Spectral check (more permissive) spectral_check = ( self.spectral_centroid_min < spectral_features['centroid'] < self.spectral_centroid_max and spectral_features['rolloff'] < self.spectral_rolloff_threshold and spectral_features['flux'] > 0.005 # Reduced threshold ) # Improved harmonicity check harmonic_check = spectral_features['harmonicity'] > self.harmonic_threshold # More permissive combined decision (OR logic for some conditions) frame_has_speech = ( energy_check and ( zcr_check or # ZCR is good, OR spectral_check or # Spectral features are good, OR harmonic_check # Harmonicity is good ) ) or ( # Alternative path: strong energy + reasonable spectral energy > adaptive_energy_threshold * 1.5 and spectral_check ) # Update history self.speech_history.append(frame_has_speech) if len(self.speech_history) > self.max_history: self.speech_history.pop(0) # More permissive temporal consistency (2 of last 4, or 1 of last 2 if strong) if len(self.speech_history) >= 4: recent_speech = sum(self.speech_history[-4:]) >= 2 elif len(self.speech_history) >= 2: # For shorter history, be more permissive if recent frame is strong recent_speech = ( sum(self.speech_history[-2:]) >= 1 and (energy > adaptive_energy_threshold * 1.2 or frame_has_speech) ) else: recent_speech = frame_has_speech # Faster noise floor adaptation during silence if not frame_has_speech: self.noise_floor_energy = ( self.adaptation_rate * energy + (1 - self.adaptation_rate) * self.noise_floor_energy ) metrics: Dict[str, Any] = { 'energy': energy, 'zcr': zcr, 'centroid': spectral_features['centroid'], 'rolloff': spectral_features['rolloff'], 'flux': spectral_features['flux'], 'harmonicity': spectral_features['harmonicity'], 'noise_floor_energy': self.noise_floor_energy, 'adaptive_threshold': adaptive_energy_threshold, 'energy_check': energy_check, 'zcr_check': zcr_check, 'spectral_check': spectral_check, 'harmonic_check': harmonic_check, 'temporal_consistency': recent_speech } return recent_speech, metrics # type: ignore def _compute_spectral_features(self, audio_data: AudioArray) -> Dict[str, Any]: """Compute spectral features for speech detection.""" # Apply window to reduce spectral leakage windowed = audio_data * np.hanning(len(audio_data)) # FFT fft_data = np.fft.rfft(windowed) magnitude = np.abs(fft_data) freqs = np.fft.rfftfreq(len(windowed), 1/self.sample_rate) # Avoid division by zero if np.sum(magnitude) == 0: return { 'centroid': 0, 'rolloff': 0, 'flux': 0, 'harmonicity': 0 } # Spectral 
centroid centroid = np.sum(freqs * magnitude) / np.sum(magnitude) # Spectral rolloff (frequency below which 85% of energy is contained) cumsum = np.cumsum(magnitude) rolloff_point = 0.85 * cumsum[-1] rolloff_idx = np.where(cumsum >= rolloff_point)[0] rolloff = freqs[rolloff_idx[0]] if len(rolloff_idx) > 0 else freqs[-1] # Spectral flux (measure of spectral change) if hasattr(self, '_prev_magnitude'): flux = np.sum((magnitude - self._prev_magnitude) ** 2) else: flux = 0 self._prev_magnitude = magnitude.copy() # Harmonicity (detect harmonic structure typical of speech) harmonicity = self._compute_harmonicity(magnitude, freqs) return { 'centroid': centroid, 'rolloff': rolloff, 'flux': flux, 'harmonicity': harmonicity } def _compute_harmonicity(self, magnitude: npt.NDArray[np.float32], freqs: npt.NDArray[np.float32]) -> float: """Compute harmonicity score (0-1, higher = more harmonic/speech-like).""" # Find fundamental frequency candidate (peak in 80-400Hz range for speech) # Expanded F0 range for better detection speech_range = (freqs >= 60) & (freqs <= 500) # Expanded from 80-400Hz if not np.any(speech_range): return 0.0 speech_magnitude = magnitude[speech_range] speech_freqs = freqs[speech_range] if len(speech_magnitude) == 0: return 0.0 # Find strongest peak in speech range # More robust F0 detection - find peaks instead of just max try: # Import scipy here to handle missing dependency gracefully from scipy.signal import find_peaks # type: ignore # Ensure distance is at least 1 min_distance = max(1, int(len(speech_magnitude) * 0.05)) peaks, properties = find_peaks( # type: ignore speech_magnitude, height=np.max(speech_magnitude) * 0.05, # Lowered from 0.1 distance=min_distance, # Minimum peak separation ) if len(peaks) == 0: # type: ignore # Fallback to simple max if no peaks found f0_idx = np.argmax(speech_magnitude) else: # Use the strongest peak strongest_peak_idx = np.argmax(speech_magnitude[peaks]) f0_idx = int(peaks[strongest_peak_idx]) # type: ignore except ImportError: # scipy not available, use simple max f0_idx = np.argmax(speech_magnitude) f0 = speech_freqs[f0_idx] f0_strength = speech_magnitude[f0_idx] # More lenient F0 strength requirement if f0_strength < np.max(magnitude) * 0.03: # Reduced from 0.1 return 0.0 # Check for harmonics (2*f0, 3*f0, etc.) 
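        # Illustrative intent: for a voiced frame with F0 near 150 Hz, the loop
        # below inspects the bins around 300, 450 and 600 Hz; their local maxima
        # are averaged and normalized by the F0 peak, so values near 1.0 indicate
        # a strongly harmonic (speech-like) spectrum and values near 0 indicate noise.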
harmonic_strength = 0.0 total_harmonics = 0 for harmonic in range(2, 5): # Check 2nd through 4th harmonics harmonic_freq = f0 * harmonic if harmonic_freq > freqs[-1]: break # Find closest frequency bins (check neighboring bins too) harmonic_idx = np.argmin(np.abs(freqs - harmonic_freq)) # Check a small neighborhood around the harmonic frequency start_idx = max(0, int(harmonic_idx) - 2) end_idx = min(len(magnitude), int(harmonic_idx) + 3) local_max = np.max(magnitude[start_idx:end_idx]) harmonic_strength += local_max total_harmonics += 1 if total_harmonics == 0: return 0.0 # Normalize and return harmonicity = (harmonic_strength / total_harmonics) / f0_strength return min(harmonicity, 1.0) class OpenVINOWhisperModel: """OpenVINO optimized Whisper model for Intel Arc B580.""" def __init__(self, model_id: str, config: OpenVINOConfig, device: str): self.model_id = model_id self.config = config self.device = device self.model_path = Path(model_id.replace("/", "_")) self.quantized_model_path = Path(f"{self.model_path}_quantized") self.processor: Optional[WhisperProcessor] = None self.ov_model: Optional[OVModelForSpeechSeq2Seq] = None self.is_quantized = False self._initialize_model() def _initialize_model(self) -> None: """Initialize processor and OpenVINO model with robust error handling.""" logger.info(f"Initializing OpenVINO Whisper model: {self.model_id}") try: # Initialize processor logger.info( f"Loading Whisper model '{self.model_id}' on device: {self.device}" ) self.processor = WhisperProcessor.from_pretrained( # type: ignore self.model_id, use_fast=True ) # type: ignore logger.info("Whisper processor loaded successfully") # Export the model to OpenVINO IR if not already converted self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained( # type: ignore self.model_id, export=True, device=self.device ) # type: ignore logger.info("Whisper model exported as OpenVINO IR") # # Try to load quantized model first if it exists # if self.config.enable_quantization and self.quantized_model_path.exists(): # if self._try_load_quantized_model(): # return # # Load or create FP16 model # if self.model_path.exists(): # self._load_fp16_model() # else: # self._convert_model() # # Try quantization after model is loaded and compiled # if self.config.enable_quantization and not self.is_quantized: # self._try_quantize_existing_model() except Exception as e: logger.error(f"Error initializing model: {e}") # Fallback to basic conversion without quantization self._fallback_initialization() def _fallback_initialization(self) -> None: """Fallback initialization without quantization.""" logger.warning("Falling back to basic OpenVINO conversion without quantization") try: if not self.model_path.exists(): self._convert_model_basic() self._load_fp16_model() except Exception as e: logger.error(f"Fallback initialization failed: {e}") raise RuntimeError("Failed to initialize OpenVINO model") from e def _convert_model(self) -> None: """Convert PyTorch model to OpenVINO format.""" logger.info(f"Converting {self.model_id} to OpenVINO format...") try: # Convert to OpenVINO with FP16 for Arc GPU ov_model = OVModelForSpeechSeq2Seq.from_pretrained( # type: ignore self.model_id, ov_config=self.config.to_ov_config(), export=True, compile=False, load_in_8bit=False, ) # Enable FP16 for Intel Arc performance if hasattr(ov_model, 'half'): ov_model.half() # type: ignore ov_model.save_pretrained(self.model_path) # type: ignore logger.info("Model converted and saved in FP16 format") # Load the converted model self.ov_model = ov_model # type: 
ignore self._compile_model() except Exception as e: logger.error(f"Model conversion failed: {e}") raise def _convert_model_basic(self) -> None: """Basic model conversion without advanced features.""" logger.info(f"Basic conversion of {self.model_id} to OpenVINO format...") ov_model = OVModelForSpeechSeq2Seq.from_pretrained(# type: ignore self.model_id, export=True, compile=False ) ov_model.save_pretrained(self.model_path)# type: ignore logger.info("Basic model conversion completed") def _load_fp16_model(self) -> None: """Load existing FP16 OpenVINO model.""" logger.info("Loading existing FP16 OpenVINO model...") try: self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(# type: ignore self.model_path, ov_config=self.config.to_ov_config(), compile=False ) # type: ignore self._compile_model() except Exception as e: logger.error(f"Failed to load FP16 model: {e}") # Try basic loading self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(# type: ignore self.model_path, compile=False ) # type: ignore self._compile_model() def _try_load_quantized_model(self) -> bool: """Try to load existing quantized model.""" try: logger.info("Loading existing INT8 quantized model...") self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(# type: ignore self.quantized_model_path, ov_config=self.config.to_ov_config(), compile=False, ) # type: ignore self._compile_model() self.is_quantized = True logger.info("Quantized model loaded successfully") return True except Exception as e: logger.warning(f"Failed to load quantized model: {e}") return False def _try_quantize_existing_model(self) -> None: """Try to quantize the existing model after it's loaded.""" if not QUANTIZATION_AVAILABLE: logger.info("Quantization libraries not available, skipping quantization") return if self.ov_model is None: logger.warning("No model loaded, cannot quantize") return # Check if model components are available if not hasattr(self.ov_model, "encoder"): logger.warning("Model encoder not available, skipping quantization") return if ( not hasattr(self.ov_model, "decoder_with_past") ): logger.warning( "Model decoder_with_past not available, skipping quantization" ) return try: logger.info("Attempting to quantize compiled model...") self._quantize_model_safe() except Exception as e: logger.warning(f"Quantization failed, continuing with FP16 model: {e}") def _quantize_model_safe(self) -> None: """Safely quantize the model with extensive error handling.""" if not nncf: logger.info("Quantization libraries not available, skipping quantization") return if self.quantized_model_path.exists(): logger.info("Quantized model already exists") return if self.ov_model is None: raise RuntimeError("No model to quantize") if not self.ov_model.decoder_with_past: raise RuntimeError("Model decoder_with_past not available") logger.info("Creating INT8 quantized model for Intel Arc B580...") try: # Collect calibration data with error handling calibration_data = self._collect_calibration_data_safe() if not calibration_data: logger.warning("No calibration data collected, skipping quantization") return # Quantize encoder if calibration_data.get("encoder"): logger.info("Quantizing encoder...") quantized_encoder = nncf.quantize( self.ov_model.encoder.model, nncf.Dataset(calibration_data["encoder"]), model_type=nncf.ModelType.TRANSFORMER, subset_size=min(len(calibration_data["encoder"]), 50), ) else: logger.warning("No encoder calibration data, copying original encoder") quantized_encoder = self.ov_model.encoder.model # Quantize decoder if 
calibration_data.get("decoder"): logger.info("Quantizing decoder with past...") quantized_decoder = nncf.quantize( self.ov_model.decoder_with_past.model, nncf.Dataset(calibration_data["decoder"]), model_type=nncf.ModelType.TRANSFORMER, subset_size=min(len(calibration_data["decoder"]), 50), ) else: logger.warning("No decoder calibration data, copying original decoder") quantized_decoder = self.ov_model.decoder_with_past.model # Save quantized models self.quantized_model_path.mkdir(parents=True, exist_ok=True) ov.save_model(# type: ignore quantized_encoder, self.quantized_model_path / "openvino_encoder_model.xml", ) # type: ignore ov.save_model(# type: ignore quantized_decoder, self.quantized_model_path / "openvino_decoder_with_past_model.xml", ) # type: ignore # type: ignore # Copy remaining files self._copy_model_files() # Clean up del quantized_encoder, quantized_decoder, calibration_data gc.collect() # Load quantized model if self._try_load_quantized_model(): logger.info("Quantization completed successfully") except Exception as e: logger.error(f"Quantization failed: {e}") # Clean up partial quantization if self.quantized_model_path.exists(): shutil.rmtree(self.quantized_model_path, ignore_errors=True) def _collect_calibration_data_safe( self, dataset_size: int = 20 ) -> Dict[str, CalibrationData]: """Safely collect calibration data with extensive error handling.""" if self.ov_model is None or self.processor is None: return {} logger.info(f"Collecting calibration data ({dataset_size} samples)...") # Check model components if not self.ov_model.encoder: logger.warning("Encoder not available for calibration") return {} if not self.ov_model.decoder_with_past: logger.warning("Decoder with past not available for calibration") return {} # Check if requests are available if ( not hasattr(self.ov_model.encoder, "request") or self.ov_model.encoder.request is None ): logger.warning("Encoder request not available for calibration") return {} if ( not hasattr(self.ov_model.decoder_with_past, "request") or self.ov_model.decoder_with_past.request is None ): logger.warning("Decoder request not available for calibration") return {} # Setup data collection original_encoder_request = self.ov_model.encoder.request original_decoder_request = self.ov_model.decoder_with_past.request encoder_data: CalibrationData = [] decoder_data: CalibrationData = [] try: self.ov_model.encoder.request = InferRequestWrapper(# type: ignore original_encoder_request, encoder_data# type: ignore ) # type: ignore self.ov_model.decoder_with_past.request = InferRequestWrapper( original_decoder_request, decoder_data ) # type: ignore # Generate synthetic calibration data instead of loading dataset logger.info("Generating synthetic calibration data...") for i in range(dataset_size): try: # Generate random audio similar to speech duration = 2.0 + np.random.random() * 3.0 # 2-5 seconds synthetic_audio = ( np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1 ) inputs: Any = self.processor(# type: ignore synthetic_audio, sampling_rate=16000, return_tensors="pt" ) # type: ignore # Run inference to collect calibration data _ = self.ov_model.generate( # type: ignore inputs.input_features, max_new_tokens=10 # type: ignore ) if i % 5 == 0: logger.debug( f"Generated calibration sample {i + 1}/{dataset_size}" ) except Exception as e: logger.warning(f"Failed to generate calibration sample {i}: {e}") continue except Exception as e: logger.error(f"Error during calibration data collection: {e}") finally: # Restore original requests try: 
self.ov_model.encoder.request = original_encoder_request self.ov_model.decoder_with_past.request = original_decoder_request except Exception as e: logger.warning(f"Failed to restore original requests: {e}") result = {} if encoder_data: result["encoder"] = encoder_data logger.info(f"Collected {len(encoder_data)} encoder calibration samples") if decoder_data: result["decoder"] = decoder_data logger.info(f"Collected {len(decoder_data)} decoder calibration samples") return result # type: ignore def _copy_model_files(self) -> None: """Copy necessary model files for quantized model.""" try: # Copy config and first-step decoder if (self.model_path / "config.json").exists(): shutil.copy( self.model_path / "config.json", self.quantized_model_path / "config.json", ) decoder_xml = self.model_path / "openvino_decoder_model.xml" decoder_bin = self.model_path / "openvino_decoder_model.bin" if decoder_xml.exists(): shutil.copy( decoder_xml, self.quantized_model_path / "openvino_decoder_model.xml", ) if decoder_bin.exists(): shutil.copy( decoder_bin, self.quantized_model_path / "openvino_decoder_model.bin", ) except Exception as e: logger.warning(f"Failed to copy some model files: {e}") def _compile_model(self) -> None: """Compile model for Intel Arc B580.""" if self.ov_model is None: raise RuntimeError("Model not loaded") logger.info("Compiling model for Intel Arc B580...") try: self.ov_model.to(self.config.device) self.ov_model.compile() # Warmup for optimal performance self._warmup_model() logger.info("Model compiled and warmed up successfully") except Exception as e: logger.warning( f"Failed to compile for {self.config.device}, attempting safe CPU fallback: {e}" ) # Fallback: reload/compile model with a CPU-only ov_config to avoid # passing GPU-specific properties to the CPU plugin which can raise # NotFound/UnsupportedProperty exceptions. 
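            # A sketch of the CPU-only config constructed below (only device and
            # throughput_streams change; everything else is carried over):
            #   OpenVINOConfig(device="CPU", cache_dir=self.config.cache_dir,
            #                  enable_quantization=self.config.enable_quantization,
            #                  throughput_streams=1, max_threads=self.config.max_threads)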
try: cpu_cfg = ( OpenVINOConfig(**{**self.config.model_dump()}) if hasattr(self.config, "model_dump") else self.config ) # Ensure device is CPU and use conservative CPU threading options cpu_cfg = OpenVINOConfig( device="CPU", cache_dir=self.config.cache_dir, enable_quantization=self.config.enable_quantization, throughput_streams=1, max_threads=self.config.max_threads, ) logger.info( "Reloading model with CPU-only OpenVINO config for safe compilation" ) # Try to reload using the existing saved model path if possible try: self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(# type: ignore self.model_path, ov_config=cpu_cfg.to_ov_config(), compile=False ) # type: ignore except Exception: # If loading the saved model failed, try loading without ov_config self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(# type: ignore self.model_path, compile=False ) # type: ignore # Compile on CPU if self.ov_model is not None: self.ov_model.to("CPU") # type: ignore # Provide CPU-only ov_config if supported try: self.ov_model.compile() # type: ignore except Exception as compile_cpu_e: logger.warning( f"CPU compile with CPU ov_config failed, retrying default compile: {compile_cpu_e}" ) self.ov_model.compile() # type: ignore self._warmup_model() logger.info("Model compiled for CPU successfully") except Exception as cpu_e: logger.error(f"Failed to compile for CPU as well: {cpu_e}") raise def _warmup_model(self) -> None: """Warmup model for consistent GPU performance.""" if self.ov_model is None or self.processor is None: return try: logger.info("Warming up model...") dummy_audio = np.random.randn(16000).astype(np.float32) # 1 second dummy_features = self.processor(# type: ignore dummy_audio, sampling_rate=16000, return_tensors="pt" ).input_features # Run warmup iterations for i in range(3): _ = self.ov_model.generate(dummy_features, max_new_tokens=10)# type: ignore if i == 0: logger.debug("First warmup iteration completed") except Exception as e: logger.warning(f"Model warmup failed: {e}") def decode( self, token_ids: torch.Tensor, skip_special_tokens: bool = True ) -> List[str]: """Decode token IDs to text.""" if self.processor is None: raise RuntimeError("Processor not initialized") return self.processor.batch_decode(# type: ignore token_ids, skip_special_tokens=skip_special_tokens ) # type: ignore # Global model instance with deferred loading _whisper_model: Optional[OpenVINOWhisperModel] = None _audio_processors: Dict[str, "OptimizedAudioProcessor"] = {} _send_chat_func: Optional[Callable[[ChatMessageModel], Awaitable[None]]] = None _create_chat_message_func: Optional[Callable[[str, Optional[str]], ChatMessageModel]] = None # Model loading status for video display _model_loading_status: str = "Not loaded" _model_loading_progress: float = 0.0 # Raw audio buffer for immediate graphing (now handled by WaveformVideoTrack.buffer) def _ensure_model_loaded(device: str = _device) -> OpenVINOWhisperModel: """Ensure the global model is loaded.""" global _whisper_model, _model_loading_status, _model_loading_progress if _whisper_model is None: setup_intel_arc_environment() logger.info(f"Loading OpenVINO Whisper model: {_model_id}") _model_loading_status = "Loading model..." 
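        # Coarse progress values only: 0.1 while the model is being constructed
        # below, 1.0 once it is ready for inference.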
_model_loading_progress = 0.1 _whisper_model = OpenVINOWhisperModel( model_id=_model_id, config=_ov_config, device=device ) _model_loading_status = "Model loaded successfully" _model_loading_progress = 1.0 logger.info("OpenVINO Whisper model loaded successfully") return _whisper_model def extract_input_features(audio_array: AudioArray, sampling_rate: int) -> torch.Tensor: """Extract input features from audio array optimized for OpenVINO.""" ov_model = _ensure_model_loaded() if ov_model.processor is None: raise RuntimeError("Processor not initialized") inputs = ov_model.processor(# type: ignore audio_array, sampling_rate=sampling_rate, return_tensors="pt", ) # type: ignore return inputs.input_features # type: ignore class VoiceActivityDetector(BaseModel): has_speech: bool = False energy: float = 0.0 zcr: float = 0.0 centroid: float = 0.0 def simple_robust_vad( audio_data: AudioArray, energy_threshold: float = 0.01, sample_rate: int = 16000, ) -> VoiceActivityDetector: """Simplified robust VAD.""" # Energy-based detection (RMS) energy = np.sqrt(np.mean(audio_data**2)) # Zero-crossing rate signs = np.sign(audio_data) signs[signs == 0] = 1 zcr = np.sum(np.abs(np.diff(signs))) / (2 * len(audio_data)) # Relaxed speech detection - use OR instead of AND for some conditions has_speech = ( energy > energy_threshold # Primary condition or (energy > energy_threshold * 0.5 and zcr > 0.05) # Secondary condition ) return VoiceActivityDetector( has_speech=has_speech, energy=energy, zcr=zcr, centroid=0.0 ) def enhanced_vad( audio_data: AudioArray, energy_threshold: float = 0.01, zcr_threshold: float = 0.1, sample_rate: int = 16000, ) -> VoiceActivityDetector: """Enhanced VAD using multiple features. Returns: tuple: (has_speech, metrics_dict) """ # Energy-based detection energy = np.sqrt(np.mean(audio_data**2)) # Zero-crossing rate for speech detection signs = np.sign(audio_data) signs[signs == 0] = 1 # Handle zeros zcr = np.sum(np.abs(np.diff(signs))) / (2 * len(audio_data)) # Spectral centroid for voice vs noise discrimination fft = np.fft.rfft(audio_data) magnitude = np.abs(fft) freqs = np.fft.rfftfreq(len(audio_data), 1 / sample_rate) if np.sum(magnitude) > 0: centroid = np.sum(freqs * magnitude) / np.sum(magnitude) else: centroid = 0 # Combined decision with configurable thresholds has_speech = ( energy > energy_threshold and zcr > zcr_threshold and 200 < centroid < 3000 # Human speech frequency range ) return VoiceActivityDetector( has_speech=has_speech, energy=energy, zcr=zcr, centroid=centroid ) class OptimizedAudioProcessor: """Optimized audio processor for Intel Arc B580 with reduced latency.""" def __init__( self, peer_name: str, send_chat_func: Callable[[ChatMessageModel], Awaitable[None]], create_chat_message_func: Callable[[str, Optional[str]], ChatMessageModel] ): self.peer_name = peer_name self.send_chat_func = send_chat_func self.create_chat_message_func = create_chat_message_func # Audio processing settings (use defaults, can be overridden per instance) self.sample_rate = 16000 # Default Whisper sample rate self.chunk_duration_ms = 100 # Default chunk duration self.chunk_size = int(self.sample_rate * self.chunk_duration_ms / 1000) # Silence handling parameters self.max_silence_frames = 30 # Default max silence frames self.max_trailing_silence_frames = 5 # Default trailing silence frames # VAD settings (use defaults, can be overridden per instance) self.vad_energy_threshold = 0.005 self.vad_zcr_min = 0.02 self.vad_zcr_max = 0.8 self.vad_spectral_centroid_min = 200 
self.vad_spectral_centroid_max = 4000 self.vad_spectral_rolloff_threshold = 3000 self.vad_minimum_duration = 0.2 self.vad_max_history = 8 self.vad_noise_floor_energy = 0.001 self.vad_adaptation_rate = 0.05 self.vad_harmonic_threshold = 0.15 # Normalization settings self.normalization_enabled = True # Default normalization enabled self.normalization_target_peak = 0.7 # Default target peak self.max_normalization_gain = 10.0 # Default max gain # Initialize visualization buffer if not already done if self.peer_name not in WaveformVideoTrack.buffer: WaveformVideoTrack.buffer[self.peer_name] = np.array([], dtype=np.float32) # Optimized buffering parameters self.buffer_size = self.chunk_size * 50 # Circular buffer for zero-copy operations self.audio_buffer = np.zeros(self.buffer_size, dtype=np.float32) self.write_ptr = 0 self.read_ptr = 0 # Silence handling parameters self.silence_frames: int = 0 # Enhanced VAD parameters with EMA for noise adaptation self.advanced_vad = AdvancedVAD(sample_rate=self.sample_rate) # Track maximum observed absolute amplitude for this input stream # This is used optionally to normalize incoming audio to the "observed" # maximum which helps models expect a consistent level across peers. # It's intentionally permissive and capped to avoid amplifying noise. self.max_observed_amplitude: float = 1e-6 # Processing state self.current_phrase_audio = np.array([], dtype=np.float32) self.transcription_history: List[TranscriptionHistoryItem] = [] self.last_activity_time = time.time() self.last_audio_time = time.time() # Track when any audio chunk is received self.final_transcription_pending = False # Flag to prevent accumulating audio during final transcription # Current transcription message for refinements self.current_message: Optional[ChatMessageModel] = None # Async processing self.processing_queue: asyncio.Queue[AudioQueueItem] = asyncio.Queue(maxsize=10) self.is_running = True # Start async processing task try: self.main_loop = asyncio.get_running_loop() asyncio.create_task(self._async_processing_loop()) # Start inactivity watchdog to ensure finalization when frames stop arriving try: asyncio.create_task(self._silence_watchdog()) except Exception: logger.debug(f"Could not start silence watchdog task for {self.peer_name}") logger.info(f"Started async processing for {self.peer_name}") # Lock to serialize model.generate calls (OpenVINO model may not # be reentrant across concurrent generate calls). try: self._generate_lock = asyncio.Lock() except Exception: self._generate_lock = None except RuntimeError: # Fallback to thread-based processing self.main_loop = None self.processor_thread = threading.Thread( target=self._thread_processing_loop, daemon=True ) self.processor_thread.start() logger.warning(f"Using thread-based processing for {self.peer_name}") # For thread-fallback create a thread lock used if asyncio lock is unavailable self._generate_lock = threading.Lock() logger.info(f"OptimizedAudioProcessor initialized for {self.peer_name}") async def _silence_watchdog(self) -> None: """Watch for inactivity (no frames arriving) and queue a final transcription. This runs as a lightweight task and uses `last_audio_time` which is updated on any received audio frame. This makes finalization robust in the case where the remote peer simply stops sending frames (no non-speech frames will arrive to increment `silence_frames`). 
""" logger.debug(f"Silence watchdog started for {self.peer_name}") try: while self.is_running: await asyncio.sleep(0.5) try: if ( len(self.current_phrase_audio) > 0 and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT ): logger.info( f"Silence watchdog: no audio for {time.time() - self.last_audio_time:.2f}s, queuing final for {self.peer_name}" ) self._queue_final_transcription() except Exception as e: logger.debug(f"Silence watchdog error for {self.peer_name}: {e}") finally: logger.debug(f"Silence watchdog exiting for {self.peer_name}") def add_audio_data(self, audio_data: AudioArray) -> None: """Add audio data with enhanced Voice Activity Detection, preventing leading silence.""" if not self.is_running or len(audio_data) == 0: logger.error("Processor not running or empty audio data") return # Update last audio time whenever any audio is received self.last_audio_time = time.time() # Update max observed amplitude (used later for optional normalization) try: peak = float(np.max(np.abs(audio_data))) if audio_data.size > 0 else 0.0 if peak > self.max_observed_amplitude: self.max_observed_amplitude = float(peak) except Exception: # Be defensive - don't fail audio ingestion for amplitude tracking pass is_speech, vad_metrics = self.advanced_vad.analyze_frame(audio_data) # Update visualization status WaveformVideoTrack.speech_status[self.peer_name] = { 'is_speech': is_speech, **vad_metrics } # Log VAD decisions periodically if hasattr(self, '_vad_log_counter'): self._vad_log_counter += 1 else: self._vad_log_counter = 0 if self._vad_log_counter % 50 == 0: # Every 5 seconds at 100ms chunks logger.info(f"VAD Decision for {self.peer_name}: {is_speech}, " f"Energy: {vad_metrics['energy']:.3f}, " f"Harmonicity: {vad_metrics['harmonicity']:.2f}, " f"Noise floor: {vad_metrics['noise_floor_energy']:.4f}") # Speech processing logic if is_speech: self.silence_frames = 0 self.last_activity_time = time.time() self.final_transcription_pending = False # Reset flag when new speech is detected self._add_to_circular_buffer(audio_data) elif (len(self.current_phrase_audio) > 0 and self.silence_frames < self.max_trailing_silence_frames): self.silence_frames += 1 self._add_to_circular_buffer(audio_data) else: self.silence_frames += 1 if (self.silence_frames > self.max_silence_frames and len(self.current_phrase_audio) > 0): self._queue_final_transcription() return # Drop non-speech audio # Check if we should process if self._available_samples() >= self.chunk_size: self._queue_for_processing() def _add_to_circular_buffer(self, audio_data: AudioArray) -> None: """Add data to circular buffer efficiently.""" chunk_len = len(audio_data) if self.write_ptr + chunk_len <= self.buffer_size: # Simple case - no wraparound self.audio_buffer[self.write_ptr : self.write_ptr + chunk_len] = audio_data else: # Wraparound case first_part = self.buffer_size - self.write_ptr self.audio_buffer[self.write_ptr :] = audio_data[:first_part] self.audio_buffer[: chunk_len - first_part] = audio_data[first_part:] self.write_ptr = (self.write_ptr + chunk_len) % self.buffer_size def _available_samples(self) -> int: """Calculate available samples in circular buffer.""" if self.write_ptr >= self.read_ptr: return self.write_ptr - self.read_ptr else: return self.buffer_size - self.read_ptr + self.write_ptr def _extract_chunk(self, size: int) -> AudioArray: """Extract chunk from circular buffer.""" if self.read_ptr + size <= self.buffer_size: chunk = self.audio_buffer[self.read_ptr : self.read_ptr + size].copy() else: first_part = self.buffer_size 
- self.read_ptr chunk = np.concatenate( [ self.audio_buffer[self.read_ptr :], self.audio_buffer[: size - first_part], ] ) self.read_ptr = (self.read_ptr + size) % self.buffer_size return chunk.astype(np.float32) def _queue_for_processing(self) -> None: """Queue audio chunk for processing.""" available = self._available_samples() if available < self.chunk_size: return # Extract chunk for processing chunk = self._extract_chunk(self.chunk_size) # Create queue item queue_item = AudioQueueItem(audio=chunk, timestamp=time.time()) # Queue for processing if self.main_loop: try: self.processing_queue.put_nowait(queue_item) except asyncio.QueueFull: logger.warning( f"Processing queue full for {self.peer_name}, dropping chunk" ) else: # Thread-based fallback try: threading_queue = getattr(self, "_threading_queue", None) if threading_queue: threading_queue.put_nowait(queue_item) except Exception as e: logger.warning(f"Threading queue issue for {self.peer_name}: {e}") def _queue_final_transcription(self) -> None: """Queue final transcription of current phrase.""" # Always attempt to include any remaining samples in the circular # buffer when creating a final transcription. Because the thread # watchdog may call this method from a non-event-loop thread, we # schedule the actual drain + final transcription on the configured # main event loop. This avoids concurrent access to the circular # buffer pointers and ensures the final audio contains trailing # partial chunks that haven't reached `chunk_size` yet. async def _queue_final_coroutine(): # Prevent duplicate finals: if a final is already pending, skip if getattr(self, "final_transcription_pending", False): logger.info(f"Final transcription already pending for {self.peer_name}, skipping duplicate final queue.") return self.final_transcription_pending = True try: # Drain any samples remaining in the circular buffer available = 0 try: available = self._available_samples() if available > 0: tail = self._extract_chunk(available) if tail.size > 0: self.current_phrase_audio = np.concatenate( [self.current_phrase_audio, tail] ) except Exception as e: logger.debug(f"Failed to drain circular buffer for final: {e}") if len(self.current_phrase_audio) > self.sample_rate * 0.5: logger.info(f"Queueing final transcription for {self.peer_name} (drained={available if 'available' in locals() else 0})") # Send an immediate lightweight final marker so the UI receives a quick final event while the heavy generate runs in the background. try: marker_text = f"⚡ {self.peer_name}: (finalizing...)" message_id = self.current_message.id if self.current_message is not None else None cm = self.create_chat_message_func(marker_text, message_id) if self.current_message is None: try: self.current_message = cm except Exception: pass await self.send_chat_func(cm) logger.info(f"{self.peer_name}: sent immediate final marker") except Exception as e: logger.debug(f"Failed to send final marker for {self.peer_name}: {e}") # Run the blocking final transcription in a coroutine that offloads the heavy work to a threadpool (existing helper handles this). We await it here so we can clear state afterwards in the same coroutine context. 
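                # Finalization sequence (summarizing the code above/below and
                # _blocking_transcribe_and_send):
                #   1. drain any tail samples left in the circular buffer,
                #   2. send a lightweight "(finalizing...)" marker right away,
                #   3. run generate/decode in a threadpool via run_in_executor,
                #   4. send the final text (or a fallback from history) and clear state.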
try: await self._blocking_transcribe_and_send( self.current_phrase_audio.copy(), is_final=True ) except Exception as e: logger.error(f"Error running blocking final transcription coroutine: {e}") # Clear current phrase buffer after scheduling/completing final self.current_phrase_audio = np.array([], dtype=np.float32) finally: # Ensure the pending flag is cleared if something went wrong try: self.final_transcription_pending = False except Exception: pass # If we have an event loop available, schedule the coroutine there so # buffer operations happen on the loop and avoid races with the # producer side. If no main loop is available, fall back to running # the coroutine via create_task (best-effort) or thread executor. try: if self.main_loop is not None: try: asyncio.run_coroutine_threadsafe(_queue_final_coroutine(), self.main_loop) return except Exception as e: logger.debug(f"Failed to schedule final coroutine on main loop: {e}") # Fallback: try to create a task on the current loop try: asyncio.create_task(_queue_final_coroutine()) return except Exception: # As a last resort, run the coroutine synchronously in a new # event loop (blocking) so a final is still produced. import asyncio as _asyncio try: _loop = _asyncio.new_event_loop() _asyncio.set_event_loop(_loop) _loop.run_until_complete(_queue_final_coroutine()) finally: try: _asyncio.set_event_loop(None) except Exception: pass except Exception as e: logger.error(f"Unexpected error scheduling final transcription: {e}") async def _blocking_transcribe_and_send( self, audio_array: AudioArray, is_final: bool, language: str = "en" ) -> None: """Run the heavy generate+decode work inside a threadpool, then send the chat message on the event loop. This reduces reentrancy and resource contention with streaming transcriptions. """ loop = asyncio.get_event_loop() def blocking_work(audio_in: AudioArray) -> tuple[str, float]: try: # Ensure model is loaded in this thread/process ov_model = _ensure_model_loaded() # Extract features (this is relatively cheap but keep on thread) input_features = ov_model.processor(# type: ignore audio_in, sampling_rate=self.sample_rate, return_tensors="pt" ).input_features # type: ignore # Perform generation (blocking) # Use the same generation configuration as the async path # (higher-quality beam search) to avoid weaker final # transcriptions when using the blocking path. gen_cfg = GenerationConfig( max_length=448, num_beams=6, no_repeat_ngram_size=3, use_cache=True, early_stopping=True, max_new_tokens=128, ) # Serialize access to the underlying OpenVINO generation call # to avoid concurrency problems with the OpenVINO runtime. 
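                # Note: the streaming path in _transcribe_and_send serializes its
                # own generate calls with a per-processor lock rather than this
                # module-level lock.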
with _generate_global_lock: gen_out = ov_model.ov_model.generate(# type: ignore input_features, generation_config=gen_cfg# type: ignore ) # Try to extract sequences if present if hasattr(gen_out, "sequences"): # type: ignore ids = gen_out.sequences # type: ignore else: ids = gen_out # type: ignore # Decode text: str = "" try: text = ov_model.processor.batch_decode(ids, skip_special_tokens=True)[0].strip() # type: ignore except Exception: text = "" return text, 0.0 # type: ignore except Exception as e: logger.error(f"Blocking transcription failed for {self.peer_name}: {e}", exc_info=True) return "", 0.0 try: # Run blocking work in executor transcription, _ = await loop.run_in_executor(None, blocking_work, audio_array) if transcription: # Build message and send on event loop status_marker = "⚡" if is_final else "🎤" type_marker = "" if is_final else " [streaming]" message_text = f"{status_marker} {self.peer_name}{type_marker}: {transcription} (blocking final)" # Reuse existing message id for final update when possible message_id = self.current_message.id if self.current_message is not None else None chat_message = self.create_chat_message_func(message_text, message_id) await self.send_chat_func(chat_message) # After sending final, clear current_message so streaming restarts cleanly try: self.current_message = None except Exception: pass logger.info(f"{self.peer_name}: blocking final transcription sent: '{transcription}'") else: # If decode failed/returned empty, fallback to the most recent # streaming transcription from history (if any) and send it as # the final message. This ensures clients get a final marker. fallback_text = None try: if self.transcription_history: # Take last non-final streaming message if present for h in reversed(self.transcription_history): if not h.is_final: # Extract raw transcription portion from stored message fallback_text = h.message.split(": ", 1)[-1].split(" (🚀")[0] break # If none non-final found, take most recent entry if fallback_text is None: fallback_text = self.transcription_history[-1].message.split(": ", 1)[-1].split(" (🚀")[0] except Exception: fallback_text = None if fallback_text: message_text = f"⚡ {self.peer_name}: {fallback_text} (final - fallback)" message_id = self.current_message.id if self.current_message is not None else None chat_message = self.create_chat_message_func(message_text, message_id) await self.send_chat_func(chat_message) try: self.current_message = None except Exception: pass logger.info(f"{self.peer_name}: blocking final fallback sent: '{fallback_text}'") else: logger.info(f"{self.peer_name}: blocking final transcription produced no text and no fallback available") finally: # Always clear the pending flag when the blocking final finishes try: self.final_transcription_pending = False except Exception: pass async def _async_processing_loop(self) -> None: """Async processing loop for audio chunks.""" logger.info(f"Started async processing loop for {self.peer_name}") while self.is_running: try: # Get audio chunk audio_item = await asyncio.wait_for( self.processing_queue.get(), timeout=1.0 ) # Add to current phrase if not self.final_transcription_pending: self.current_phrase_audio = np.concatenate( [self.current_phrase_audio, audio_item.audio] ) # Check if we should transcribe phrase_duration = len(self.current_phrase_audio) / self.sample_rate if phrase_duration >= 1.0: # Transcribe every 1 second logger.info(f"Transcribing for {self.peer_name} (1s interval)") await self._transcribe_and_send( self.current_phrase_audio.copy(), 
is_final=False ) except asyncio.TimeoutError: # Check for final transcription on timeout; use last_audio_time so # we also detect the case where frames simply stopped arriving. if ( len(self.current_phrase_audio) > 0 and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT ): logger.info( f"Final transcription timeout for {self.peer_name} (asyncio.TimeoutError, inactivity)" ) # Avoid duplicate finals: if a final is already pending # (for example the blocking final was queued), skip scheduling # another final. Otherwise set the pending flag and run the # final transcription. if not self.final_transcription_pending: # Drain any remaining circular-buffer samples into the # current phrase so trailing partial packets are included # in the final transcription. try: available = self._available_samples() if available > 0: tail = self._extract_chunk(available) if tail.size > 0: self.current_phrase_audio = np.concatenate([ self.current_phrase_audio, tail ]) except Exception as e: logger.debug(f"Failed to drain circular buffer before async final: {e}") self.final_transcription_pending = True await self._transcribe_and_send( self.current_phrase_audio.copy(), is_final=True ) else: logger.debug(f"Final already pending for {self.peer_name}; skipping async final") self.current_phrase_audio = np.array([], dtype=np.float32) except Exception as e: logger.error( f"Error in async processing loop for {self.peer_name}: {e}" ) # Final transcription for any remaining audio if len(self.current_phrase_audio) > 0 and not self.final_transcription_pending: logger.info(f"Final transcription for remaining audio in async loop for {self.peer_name}") await self._transcribe_and_send( self.current_phrase_audio.copy(), is_final=True ) self.current_phrase_audio = np.array([], dtype=np.float32) self.final_transcription_pending = False logger.info(f"Async processing loop ended for {self.peer_name}") def _thread_processing_loop(self) -> None: """Thread-based processing loop fallback.""" self._threading_queue: Queue[AudioQueueItem] = Queue(maxsize=10) logger.info(f"Started thread processing loop for {self.peer_name}") while self.is_running: try: audio_item = self._threading_queue.get(timeout=1.0) # Add to current phrase if not self.final_transcription_pending: self.current_phrase_audio = np.concatenate( [self.current_phrase_audio, audio_item.audio] ) # Check if we should transcribe phrase_duration = len(self.current_phrase_audio) / self.sample_rate if phrase_duration >= 1.0: if self.main_loop: logger.info( f"Transcribing from thread for {self.peer_name} (_thread_processing_loop > 1s interval)" ) asyncio.run_coroutine_threadsafe( self._transcribe_and_send( self.current_phrase_audio.copy(), is_final=False ), self.main_loop, ) except Empty: # Check for final transcription using last_audio_time so we react # if frames stop arriving entirely. if ( len(self.current_phrase_audio) > 0 and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT ): if self.main_loop: logger.info( f"Final transcription from thread for {self.peer_name} (inactivity)" ) # Delegate to the safe finalization path which drains the # circular buffer on the main loop and schedules the heavy # blocking transcription there. This avoids concurrent # buffer access races between threads. try: self._queue_final_transcription() except Exception: # As a fallback, try to schedule the transcription # directly on the main loop (best-effort). 
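                        # run_coroutine_threadsafe is required here because this
                        # loop runs on a worker thread; the coroutine itself must
                        # execute on the main asyncio event loop.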
try: asyncio.run_coroutine_threadsafe( self._transcribe_and_send( self.current_phrase_audio.copy(), is_final=True ), self.main_loop, ) except Exception as e: logger.debug(f"Failed to schedule thread final fallback: {e}") self.current_phrase_audio = np.array([], dtype=np.float32) except Exception as e: logger.error( f"Error in thread processing loop for {self.peer_name}: {e}" ) # Final transcription for any remaining audio if len(self.current_phrase_audio) > 0 and not self.final_transcription_pending: if self.main_loop: logger.info(f"Final transcription for remaining audio in thread loop for {self.peer_name}") asyncio.run_coroutine_threadsafe( self._transcribe_and_send( self.current_phrase_audio.copy(), is_final=True ), self.main_loop, ) self.current_phrase_audio = np.array([], dtype=np.float32) self.final_transcription_pending = False def _start_thread_watchdog(self) -> None: """Start a lightweight thread-based watchdog when using the thread fallback. It periodically checks `last_audio_time` and queues final transcription if inactivity exceeds INACTIVITY_TIMEOUT. """ if hasattr(self, "_thread_watchdog") and self._thread_watchdog: return def watchdog(): logger.debug(f"Thread watchdog started for {self.peer_name}") try: while self.is_running: time.sleep(0.5) try: if ( len(self.current_phrase_audio) > 0 and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT ): logger.info( f"Thread watchdog: no audio for {time.time() - self.last_audio_time:.2f}s, queuing final for {self.peer_name}" ) self._queue_final_transcription() except Exception as e: logger.debug(f"Thread watchdog error for {self.peer_name}: {e}") finally: logger.debug(f"Thread watchdog exiting for {self.peer_name}") self._thread_watchdog = threading.Thread(target=watchdog, daemon=True) self._thread_watchdog.start() async def _transcribe_and_send( self, audio_array: AudioArray, is_final: bool, language: str = "en" ) -> None: """ Transcribe raw numpy audio data using OpenVINO Whisper. Parameters: - audio_array: 1D numpy array containing mono PCM data at 16 kHz. - is_final: whether this is a final transcription (True) or interim (False) - language: language code for transcription (default 'en' for English) """ if audio_array.ndim != 1: raise ValueError("Expected mono audio as a 1D numpy array.") # Do NOT reset final_transcription_pending here; keep it set until the # final transcription task completes to avoid races where new audio is # accumulated while a final transcription was requested. transcription_start = time.time() transcription_type = "final" if is_final else "streaming" try: audio_duration = len(audio_array) / self.sample_rate # Compute basic energy/peak metrics for filtering decisions audio_rms = float(np.sqrt(np.mean(audio_array**2))) audio_peak = float(np.max(np.abs(audio_array))) if audio_array.size > 0 else 0.0 # Short-burst filtering: drop very short bursts that are likely noise. # - If duration < 0.5s and RMS is very low -> drop # - If duration < 0.8s and peak is very small relative to the # max observed amplitude -> drop. This prevents single-packet # random noises from becoming transcriptions. 
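        # Worked example with the thresholds below: a 0.3 s burst with RMS 0.001
        # is dropped as "short & quiet" (RMS < 0.002); a 0.6 s burst whose peak is
        # only 2% of the loudest level seen on this stream is dropped by the
        # relative-peak rule (0.02 < 0.05); anything with RMS below 0.001 is
        # skipped regardless of duration.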
short_duration_threshold = 0.5 relaxed_short_duration = 0.8 rms_min_threshold = 0.002 relative_peak_min_ratio = 0.05 if audio_duration < short_duration_threshold and audio_rms < rms_min_threshold: logger.debug( f"Skipping {transcription_type} transcription: short & quiet ({audio_duration:.2f}s, RMS {audio_rms:.6f})" ) return # If we have observed a stronger level on this stream, require a # sensible fraction of that to consider this burst valid. max_amp = getattr(self, "max_observed_amplitude", 0.0) or 0.0 if audio_duration < relaxed_short_duration and max_amp > 0.0: rel = audio_peak / (max_amp + 1e-12) if rel < relative_peak_min_ratio: logger.debug( f"Skipping {transcription_type} transcription: short burst with low relative peak ({audio_duration:.2f}s, rel {rel:.3f})" ) return # Very quiet audio - skip entirely if audio_rms < 0.001: logger.debug( f"Skipping {transcription_type} transcription: too quiet (RMS: {audio_rms:.6f})" ) return logger.info( f"🎬 OpenVINO transcription ({transcription_type}) started: {audio_duration:.2f}s, RMS: {audio_rms:.4f}" ) # Optionally normalize audio prior to feature extraction. We use # the historical maximum observed amplitude for this stream to # compute a conservative gain. The gain is clamped to avoid # amplifying noise excessively. audio_for_model = audio_array try: if getattr(self, "normalization_enabled", False): stream_max = getattr(self, "max_observed_amplitude", 0.0) or 0.0 # Use the larger of observed max and current peak to avoid # over-scaling when current chunk is the loudest. denom = max(stream_max, audio_peak, 1e-12) gain = float(self.normalization_target_peak) / denom # Clamp gain gain = max(min(gain, float(self.max_normalization_gain)), 0.25) if abs(gain - 1.0) > 1e-3: logger.debug(f"Applying normalization gain {gain:.3f} for {self.peer_name}") audio_for_model = np.clip(audio_array * gain, -0.999, 0.999).astype(np.float32) except Exception as e: logger.debug(f"Normalization step failed for {self.peer_name}: {e}") # Extract features for OpenVINO input_features = extract_input_features(audio_for_model, self.sample_rate) # logger.info(f"Features extracted for OpenVINO: {input_features.shape}") # GPU inference with OpenVINO ov_model = _ensure_model_loaded() generation_config = GenerationConfig( # Quality parameters max_length=448, num_beams=6, # Increase from 4 for better quality (slower) # temperature=0.8, # Set to 0 for deterministic, best-guess output -- not supported in OpenVINO # "length_penalty": 1.0, # Adjust to favor longer/shorter sequences no_repeat_ngram_size=3, # Confidence and alternatives # output_score=True, # Get token probabilities -- not supported in OpenVINO # return_dict_in_generate=True, # Get detailed output -- not supported in OpenVINO output_attentions=False, # Set True if you need attention weights num_return_sequences=1, # Set >1 to get alternatives max_new_tokens=128, # Limit response length # Task settings: Cannot specify `task` or `language` for an English-only model. # # If the model is intended to be multilingual, pass `is_multilingual=True` to generate, # # or update the generation config.ValueError: Cannot specify `task` or `language` for # # an English-only model. 
If the model is intended to be multilingual, pass # # `is_multilingual=True` to generate, or update the generation config # language=language, # task="transcribe", # Performance vs quality tradeoffs early_stopping=True, # Stop when EOS token is found use_cache=True, # Speed up decoding # Threshold parameters # logprob_threshold=-1.0, # Filter tokens below this log probability -- not supported in OpenVINO compression_ratio_threshold=2.4, # Reject if compression ratio too high # no_speech_threshold=0.6, # Threshold for detecting non-speech -- not supported in OpenVINO ) # Serialize calls to model.generate to avoid reentrancy issues and # add diagnostic logging so we can see whether generate/decoding # completes for final transcriptions. generation_output = None try: if is_final: logger.info(f"{self.peer_name}: attempting to acquire generate lock (final)") else: logger.debug(f"{self.peer_name}: attempting to acquire generate lock") if hasattr(self, "_generate_lock") and isinstance(self._generate_lock, asyncio.Lock): await self._generate_lock.acquire() try: if is_final: logger.info(f"{self.peer_name}: calling model.generate (async lock) (final)") else: logger.debug(f"{self.peer_name}: calling model.generate (async lock)") generation_output = ov_model.ov_model.generate( # type: ignore input_features, generation_config=generation_config ) finally: self._generate_lock.release() elif hasattr(self, "_generate_lock") and isinstance(self._generate_lock, threading.Lock): with self._generate_lock: if is_final: logger.info(f"{self.peer_name}: calling model.generate (thread lock) (final)") else: logger.debug(f"{self.peer_name}: calling model.generate (thread lock)") generation_output = ov_model.ov_model.generate( # type: ignore input_features, generation_config=generation_config ) else: if is_final: logger.info(f"{self.peer_name}: calling model.generate (no lock) (final)") else: logger.debug(f"{self.peer_name}: calling model.generate (no lock)") generation_output = ov_model.ov_model.generate( # type: ignore input_features, generation_config=generation_config ) if is_final: logger.info(f"{self.peer_name}: model.generate complete (final) (type={type(generation_output)})") else: logger.debug(f"{self.peer_name}: model.generate complete (type={type(generation_output)})") except Exception as e: logger.error(f"{self.peer_name}: model.generate failed: {e}", exc_info=True) raise # Many generate implementations return an object with a # `.sequences` attribute, so prefer that when available. 
if hasattr(generation_output, "sequences"): generated_ids = generation_output.sequences # type: ignore else: generated_ids = generation_output # # Extract transcription and scores # generated_ids = generation_output.sequences # # Get confidence scores if available # if hasattr(generation_output, "scores") and generation_output.scores: # # Calculate average confidence # token_probs = [] # for score in generation_output.scores: # probs = torch.nn.functional.softmax(score, dim=-1) # max_probs = torch.max(probs, dim=-1).values # token_probs.extend(max_probs.cpu().numpy()) # avg_confidence = np.mean(token_probs) if token_probs else 0.0 # min_confidence = np.min(token_probs) if token_probs else 0.0 # else: # avg_confidence = min_confidence = 0.0 # Decode text # Primary decode attempt transcription: str = "" try: transcription = ov_model.processor.batch_decode(# type: ignore generated_ids, skip_special_tokens=True )[0].strip() # type: ignore except Exception as decode_e: logger.warning(f"{self.peer_name}: primary decode failed: {decode_e}") # Fallback: if decode produced empty result, attempt to decode # `generation_output.sequences` (if not already used) or log details if not transcription: try: if hasattr(generation_output, "sequences") and ( generated_ids is not generation_output.sequences # type: ignore ): transcription = ov_model.processor.batch_decode(# type: ignore generation_output.sequences, skip_special_tokens=True # type: ignore )[0].strip() # type: ignore except Exception as fallback_e: logger.warning(f"{self.peer_name}: fallback decode failed: {fallback_e}") # Diagnostic logging if we still have no transcription if not transcription: try: if is_final: logger.info( f"{self.peer_name}: final transcription empty after decode" ) else: logger.debug( f"{self.peer_name}: streaming transcription empty after decode" ) except Exception: logger.debug(f"{self.peer_name}: generated_ids unavailable for diagnostics") if is_final: logger.info(f"{self.peer_name}: decoded transcription (final): '{transcription}'") else: logger.debug(f"{self.peer_name}: decoded transcription: '{transcription}'") transcription_time = time.time() - transcription_start # Apply confidence threshold # confidence_threshold = 0.7 # Adjustable # if avg_confidence < confidence_threshold: # logger.warning( # f"Low confidence transcription ({avg_confidence:.2f}): '{transcription}'" # ) # # Optionally retry with different parameters or skip # if avg_confidence < 0.5: # return # Skip very low confidence # # Include confidence in message # Create ChatMessageModel for transcription if transcription: # Create message with timing status_marker = "⚡" if is_final else "🎤" type_marker = "" if is_final else " [streaming]" timing_info = f" (🚀 {transcription_time:.2f}s)" message_text = f"{status_marker} {self.peer_name}{type_marker}: {transcription}{timing_info}" # Avoid duplicates for streaming updates, but always send final # transcriptions so the UI/clients receive the final marker even # if the text matches a recent interim result. if is_final or not self._is_duplicate(transcription): # type: ignore # Reuse the existing message ID when possible so the frontend # updates the streaming message into a final message instead # of creating a new one. If there is no current_message, a # new message will be created (message_id=None). 
                    message_id = (
                        self.current_message.id
                        if self.current_message is not None
                        else None
                    )

                    # Create ChatMessageModel (reusing message_id when present)
                    chat_message = self.create_chat_message_func(message_text, message_id)

                    # Update current message for streaming; for final messages
                    # clear the current_message after sending so future streams
                    # start a new message.
                    if not is_final:
                        self.current_message = chat_message

                    if is_final:
                        logger.info(f"{self.peer_name}: sending chat message (final) -> '{message_text}'")
                    else:
                        logger.debug(f"{self.peer_name}: sending chat message (streaming) -> '{message_text}'")

                    await self.send_chat_func(chat_message)

                    # Maintain or clear the current_message depending on finality.
                    if is_final:
                        # The final message updates the existing message on the client.
                        # After sending it, clear current_message so a future streaming
                        # sequence starts a fresh message.
                        self.current_message = None
                        logger.info(f"{self.peer_name}: send_chat_func completed for final message")
                    else:
                        # Streaming message remains current
                        self.current_message = chat_message
                        logger.debug(f"{self.peer_name}: send_chat_func completed for streaming message")

                    # Update history
                    self.transcription_history.append(
                        TranscriptionHistoryItem(
                            message=message_text, timestamp=time.time(), is_final=is_final
                        )
                    )

                    # Limit history
                    if len(self.transcription_history) > 10:
                        self.transcription_history.pop(0)

                    logger.info(
                        f"✅ OpenVINO transcription ({transcription_type}): '{transcription}' ({transcription_time:.3f}s)"
                    )
                else:
                    logger.debug(
                        f"Skipping duplicate {transcription_type} transcription: '{transcription}'"
                    )
            else:
                logger.debug(
                    f"Empty or too short transcription result: '{transcription}'"
                )

        except Exception as e:
            logger.error(
                f"Error in OpenVINO {transcription_type} transcription: {e}",
                exc_info=True,
            )
        finally:
            # Only clear the pending flag after the transcription task completes
            if is_final:
                self.final_transcription_pending = False
                logger.debug(f"Cleared final_transcription_pending for {self.peer_name}")

    def _is_duplicate(self, text: str) -> bool:
        """Check whether a transcription duplicates one of the recent ones."""
        recent_texts = [
            h.message.split(": ", 1)[-1].split(" (🚀")[0]
            for h in self.transcription_history[-3:]
        ]
        return text in recent_texts

    def shutdown(self) -> None:
        """Shut down the audio processor."""
        logger.info(f"Shutting down OptimizedAudioProcessor for {self.peer_name}...")
        self.is_running = False

        # Final transcription if needed. Schedule it via run_coroutine_threadsafe on
        # the stored main loop so shutdown is safe regardless of the calling thread.
        if len(self.current_phrase_audio) > 0 and self.main_loop:
            asyncio.run_coroutine_threadsafe(
                self._transcribe_and_send(
                    self.current_phrase_audio.copy(), is_final=True
                ),
                self.main_loop,
            )

        # Clean up the fallback processing thread if it exists
        if hasattr(self, "processor_thread") and self.processor_thread.is_alive():
            self.processor_thread.join(timeout=2.0)

        logger.info(f"OptimizedAudioProcessor shutdown complete for {self.peer_name}")


async def calibrate_vad(
    audio_processor: OptimizedAudioProcessor, calibration_duration: float = 2.0
) -> None:
    """Calibrate VAD thresholds based on ambient noise (placeholder for initial calibration)."""
    logger.info(f"Calibrating VAD for {audio_processor.peer_name}...")
    # Since the EMA adapts on the fly, initial calibration can be minimal or skipped.
    # For a better initial estimate, assume the first few chunks are noise
    # (handled in add_audio_data).
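    # Rough sketch of the exponential-moving-average adaptation assumed to run in
    # add_audio_data (the names and constants here are illustrative only, not the
    # module's actual values):
    #
    #     alpha = audio_processor.vad_adaptation_rate            # e.g. 0.05
    #     noise_floor = (1 - alpha) * noise_floor + alpha * chunk_energy
    #     adaptive_threshold = max(noise_floor * margin, vad_energy_threshold)
    #
    # Because that update runs continuously, the explicit calibration below only
    # needs to wait long enough for the first few chunks to arrive.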
await asyncio.sleep(calibration_duration) logger.info( f"VAD initial calibration complete: energy_threshold={audio_processor.vad_energy_threshold:.4f}" ) class MediaClock: """Simple monotonic clock for media tracks.""" def __init__(self) -> None: self.t0 = perf_counter() def now(self) -> float: return perf_counter() - self.t0 class WaveformVideoTrack(MediaStreamTrack): """Video track that renders a live waveform of the incoming audio. The track reads the most-active `OptimizedAudioProcessor` in `_audio_processors` and renders the last ~2s of its `current_phrase_audio`. If no audio is available, the track will display a "No audio" message. """ kind = "video" # Shared buffer for audio data buffer: Dict[str, npt.NDArray[np.float32]] = {} speech_status: Dict[str, Dict[str, Any]] = {} def __init__( self, session_name: str, width: int = 640, height: int = 480, fps: int = 15 ) -> None: super().__init__() self.session_name = session_name self.width = int(width) self.height = int(height) self.fps = int(fps) self.clock = MediaClock() self._next_frame_index = 0 async def next_timestamp(self) -> tuple[int, float]: pts = int(self._next_frame_index * (1 / self.fps) * 90000) time_base = 1 / 90000 return pts, time_base async def recv(self) -> VideoFrame: pts, _ = await self.next_timestamp() # schedule frame according to clock target_t = self._next_frame_index / self.fps now = self.clock.now() if target_t > now: await asyncio.sleep(target_t - now) self._next_frame_index += 1 frame_array: npt.NDArray[np.uint8] = np.zeros( (self.height, self.width, 3), dtype=np.uint8 ) # Display model loading status prominently status_text = _model_loading_status progress = _model_loading_progress # Draw status background (increased height for larger text) cv2.rectangle(frame_array, (0, 0), (self.width, 80), (0, 0, 0), -1) # Draw progress bar if loading if progress < 1.0 and "Ready" not in status_text: bar_width = int(progress * (self.width - 40)) cv2.rectangle(frame_array, (20, 55), (20 + bar_width, 70), (0, 255, 0), -1) cv2.rectangle( frame_array, (20, 55), (self.width - 20, 70), (255, 255, 255), 2 ) # Draw status text (larger font) cv2.putText( frame_array, f"Status: {status_text}", (10, 35), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (255, 255, 255), 3, ) # Draw clock in lower right corner, right justified current_time = time.strftime("%H:%M:%S") (text_width, _), _ = cv2.getTextSize( current_time, cv2.FONT_HERSHEY_SIMPLEX, 1.0, 2 ) clock_x = self.width - text_width - 10 # 10px margin from right edge clock_y = self.height - 30 # Move to 450 for height=480 cv2.putText( frame_array, current_time, (clock_x, clock_y), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, ) # Select the most active audio buffer and get its speech status best_proc = None best_rms = 0.0 speech_info = None try: for pname, arr in self.__class__.buffer.items(): try: if len(arr) == 0: rms = 0.0 else: rms = float(np.sqrt(np.mean(arr**2))) if rms > best_rms: best_rms = rms best_proc = (pname, arr.copy()) speech_info = self.__class__.speech_status.get(pname, {}) except Exception: continue except Exception: best_proc = None if best_proc is not None: pname, arr = best_proc # Use the last 2 second of audio data, padded with zeros if less samples_needed = SAMPLE_RATE * 2 # 2 second(s) if len(arr) <= 0: arr_segment = np.zeros(samples_needed, dtype=np.float32) elif len(arr) >= samples_needed: arr_segment = arr[-samples_needed:].copy() else: # Pad with zeros at the beginning arr_segment = np.concatenate( [np.zeros(samples_needed - len(arr), dtype=np.float32), arr] ) # 
Single normalization code path: normalize based on the historical # peak observed for this stream (proc.max_observed_amplitude). This # ensures the waveform display is consistent over time and avoids # using the instantaneous buffer peak. proc = None norm = arr_segment.astype(np.float32) try: proc = _audio_processors.get(pname) if proc is not None and getattr(proc, "normalization_enabled", False): stream_max = getattr(proc, "max_observed_amplitude", 0.0) or 0.0 denom = max(stream_max, 1e-12) gain = float(proc.normalization_target_peak) / denom gain = max(min(gain, float(proc.max_normalization_gain)), 0.25) if abs(gain - 1.0) > 1e-6: norm = np.clip(arr_segment * gain, -1.0, 1.0).astype(np.float32) else: norm = arr_segment.astype(np.float32) else: norm = arr_segment.astype(np.float32) except Exception: # Fall back to raw samples if normalization computation fails proc = None norm = arr_segment.astype(np.float32) # Map audio samples to pixels across the width if norm.size < self.width: padded = np.zeros(self.width, dtype=np.float32) if norm.size > 0: padded[-norm.size :] = norm norm = padded else: block = int(np.ceil(norm.size / self.width)) norm = np.array( [ np.mean(norm[i * block : min((i + 1) * block, norm.size)]) for i in range(self.width) ], dtype=np.float32, ) # For display we use the same `norm` computed above (single code # path). Use `display_norm` alias to avoid confusion later in the # code but don't recompute normalization. display_norm = norm # Draw waveform with color coding for speech detection points: list[tuple[int, int]] = [] colors: list[tuple[int, int, int]] = [] # Color for each point for x in range(self.width): v = float(display_norm[x]) if x < display_norm.size and not np.isnan(display_norm[x]) else 0.0 y = int((1.0 - ((v + 1.0) / 2.0)) * (self.height - 120)) + 100 points.append((x, y)) # Color based on speech detection status is_speech = speech_info.get('is_speech', False) if speech_info else False energy_check = speech_info.get('energy_check', False) if speech_info else False if is_speech: colors.append((0, 255, 0)) # Green for detected speech elif energy_check: colors.append((255, 255, 0)) # Yellow for energy but not speech else: colors.append((128, 128, 128)) # Gray for background noise # Draw colored waveform if len(points) > 1: for i in range(len(points) - 1): cv2.line(frame_array, points[i], points[i+1], colors[i], 1) # Draw historical peak indicator (horizontal lines at +/-(target_peak)) try: if proc is not None and getattr(proc, "normalization_enabled", False): target_peak = float(getattr(proc, "normalization_target_peak", 0.0)) # Ensure target_peak is within [0, 1] target_peak = max(0.0, min(1.0, target_peak)) def _amp_to_y(a: float) -> int: return int((1.0 - ((a + 1.0) / 2.0)) * (self.height - 120)) + 100 top_y = _amp_to_y(target_peak) bot_y = _amp_to_y(-target_peak) # Draw thin magenta lines across the waveform area cv2.line(frame_array, (0, top_y), (self.width - 1, top_y), (255, 0, 255), 1) cv2.line(frame_array, (0, bot_y), (self.width - 1, bot_y), (255, 0, 255), 1) # Label the peak with small text near the right edge label = f"Peak:{target_peak:.2f}" (tw, _), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) lx = max(10, self.width - tw - 12) ly = max(12, top_y - 6) cv2.putText(frame_array, label, (lx, ly), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 1) except Exception: # Non-critical: ignore any drawing errors pass # Add speech detection status overlay if speech_info: self._draw_speech_status(frame_array, speech_info, pname) cv2.putText( 
frame_array, f"Waveform: {pname}", (10, self.height - 15), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2, ) frame = VideoFrame.from_ndarray(frame_array, format="bgr24") frame.pts = pts frame.time_base = fractions.Fraction(1 / 90000).limit_denominator(1000000) return frame def _draw_speech_status(self, frame_array: npt.NDArray[np.uint8], speech_info: Dict[str, Any], pname: str): """Draw speech detection status information.""" y_offset = 100 # Main status is_speech = speech_info.get('is_speech', False) status_text = "SPEECH" if is_speech else "NOISE" status_color = (0, 255, 0) if is_speech else (128, 128, 128) adaptive_thresh = speech_info.get('adaptive_threshold', 0) cv2.putText(frame_array, f"{pname}: {status_text} (thresh: {adaptive_thresh:.4f})", (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, status_color, 2) # Detailed metrics (smaller text) metrics = [ f"Energy: {speech_info.get('energy', 0):.3f} ({'Y' if speech_info.get('energy_check', False) else 'N'})", f"ZCR: {speech_info.get('zcr', 0):.3f} ({'Y' if speech_info.get('zcr_check', False) else 'N'})", f"Spectral: {'Y' if 300 < speech_info.get('centroid', 0) < 3400 else 'N'}/{'Y' if speech_info.get('rolloff', 0) < 2000 else 'N'}/{'Y' if speech_info.get('flux', 0) > 0.01 else 'N'} ({'Y' if speech_info.get('spectral_check', False) else 'N'})", f"Harmonic: {speech_info.get('hamonicity', 0):.3f} ({'Y' if speech_info.get('harmonic_check', False) else 'N'})", f"Temporal: ({'Y' if speech_info.get('temporal_consistency', False) else 'N'})" ] for _, metric in enumerate(metrics): cv2.putText(frame_array, metric, (320, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) y_offset += 15 logic_result = "E:" + ("Y" if speech_info.get('energy_check', False) else "N") logic_result += " Z:" + ("Y" if speech_info.get('zcr_check', False) else "N") logic_result += " S:" + ("Y" if speech_info.get('spectral_check', False) else "N") logic_result += " H:" + ("Y" if speech_info.get('harmonic_check', False) else "N") logic_result += " T:" + ("Y" if speech_info.get('temporal_consistency', False) else "N") cv2.putText(frame_array, logic_result, (320, y_offset + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) # Noise floor indicator noise_floor = speech_info.get('noise_floor_energy', 0) cv2.putText(frame_array, f"Noise Floor: {noise_floor:.4f}", (10, y_offset + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1) async def handle_track_received(peer: Peer, track: MediaStreamTrack) -> None: """Handle incoming audio tracks from WebRTC peers.""" logger.info( f"handle_track_received called for {peer.peer_name} with track kind: {track.kind}" ) global _audio_processors, _send_chat_func if track.kind != "audio": logger.info(f"Ignoring non-audio track from {peer.peer_name}: {track.kind}") return # Initialize raw audio buffer for immediate graphing if peer.peer_name not in WaveformVideoTrack.buffer: WaveformVideoTrack.buffer[peer.peer_name] = np.array([], dtype=np.float32) # Start background task to load model and create processor async def init_processor(): global _model_loading_status, _model_loading_progress try: # Load model asynchronously to avoid blocking frame reading _model_loading_status = "Initializing model loading..." _model_loading_progress = 0.0 loop = asyncio.get_event_loop() await loop.run_in_executor(None, _ensure_model_loaded) _model_loading_status = "Model loaded, creating processor..." 
_model_loading_progress = 0.8 logger.info(f"Creating OptimizedAudioProcessor for {peer.peer_name}") if _send_chat_func is None or _create_chat_message_func is None: logger.error(f"No send function available for {peer.peer_name}") _model_loading_status = "Error: No send function available" _model_loading_progress = 1.0 # Hide progress bar on error return _audio_processors[peer.peer_name] = OptimizedAudioProcessor( peer_name=peer.peer_name, send_chat_func=_send_chat_func, create_chat_message_func=_create_chat_message_func ) _model_loading_status = "Ready for transcription" _model_loading_progress = 1.0 logger.info(f"OptimizedAudioProcessor ready for {peer.peer_name}") except Exception as e: logger.error(f"Failed to initialize processor for {peer.peer_name}: {e}") _model_loading_status = f"Error: {str(e)[:50]}..." _model_loading_progress = 1.0 # Hide progress bar on error if peer.peer_name not in _audio_processors: if _send_chat_func is None or _create_chat_message_func is None: logger.error( f"Cannot create processor for {peer.peer_name}: no send_chat_func or create_chat_message_func" ) return # Start the processor initialization in background asyncio.create_task(init_processor()) # Start processing frames immediately (before processor is ready) logger.info(f"Starting frame processing loop for {peer.peer_name}") frame_count = 0 while True: try: frame = await track.recv() frame_count += 1 if frame_count % 100 == 0: logger.debug(f"Received {frame_count} frames from {peer.peer_name}") except MediaStreamError as e: logger.info(f"Audio stream ended for {peer.peer_name}: {e}") break except Exception as e: logger.error(f"Error receiving frame from {peer.peer_name}: {e}") break if isinstance(frame, AudioFrame): try: # Convert frame to numpy array audio_data = frame.to_ndarray() # Handle audio format conversion audio_data = _process_audio_frame(audio_data, frame) # Resample if needed if frame.sample_rate != SAMPLE_RATE: audio_data = _resample_audio( audio_data, frame.sample_rate, SAMPLE_RATE ) # Convert to float32 audio_data_float32 = audio_data.astype(np.float32) # Update visualization buffer immediately WaveformVideoTrack.buffer[peer.peer_name] = np.concatenate( [WaveformVideoTrack.buffer[peer.peer_name], audio_data_float32] ) # Limit buffer size to last 10 seconds max_samples = SAMPLE_RATE * 10 if len(WaveformVideoTrack.buffer[peer.peer_name]) > max_samples: WaveformVideoTrack.buffer[peer.peer_name] = ( WaveformVideoTrack.buffer[peer.peer_name][-max_samples:] ) # Process with optimized processor if available if peer.peer_name in _audio_processors: audio_processor = _audio_processors[peer.peer_name] audio_processor.add_audio_data(audio_data_float32) except Exception as e: logger.error(f"Error processing audio frame for {peer.peer_name}: {e}") continue # If processor already exists, just continue processing audio_processor = _audio_processors[peer.peer_name] logger.info(f"Continuing OpenVINO audio processing for {peer.peer_name}") try: frame_count = 0 logger.info(f"Entering frame processing loop for {peer.peer_name}") while True: try: logger.debug(f"Waiting for frame from {peer.peer_name}") frame = await track.recv() frame_count += 1 if frame_count == 1: logger.info(f"Received first frame from {peer.peer_name}") elif frame_count % 50 == 0: logger.info(f"Received {frame_count} frames from {peer.peer_name}") except MediaStreamError as e: logger.info(f"Audio stream ended for {peer.peer_name}: {e}") break except Exception as e: logger.error(f"Error receiving frame from {peer.peer_name}: {e}") break if 
isinstance(frame, AudioFrame): try: # Convert frame to numpy array audio_data = frame.to_ndarray() # Handle audio format conversion audio_data = _process_audio_frame(audio_data, frame) # Resample if needed if frame.sample_rate != SAMPLE_RATE: audio_data = _resample_audio( audio_data, frame.sample_rate, SAMPLE_RATE ) # Convert to float32 audio_data_float32 = audio_data.astype(np.float32) logger.debug( f"Processed audio frame {frame_count} from {peer.peer_name}: {len(audio_data_float32)} samples" ) # Process with optimized processor if available audio_processor.add_audio_data(audio_data_float32) except Exception as e: logger.error( f"Error processing audio frame for {peer.peer_name}: {e}" ) continue except Exception as e: logger.error( f"Unexpected error in audio processing for {peer.peer_name}: {e}", exc_info=True, ) finally: cleanup_peer_processor(peer.peer_name) def _process_audio_frame( audio_data: npt.NDArray[Any], frame: AudioFrame ) -> npt.NDArray[np.float32]: """Process audio frame format conversion.""" # Handle stereo to mono conversion if audio_data.ndim == 2: if audio_data.shape[0] == 1: audio_data = audio_data.squeeze(0) else: audio_data = np.mean( audio_data, axis=0 if audio_data.shape[0] > audio_data.shape[1] else 1 ) # Normalize based on data type if audio_data.dtype == np.int16: audio_data = audio_data.astype(np.float32) / 32768.0 elif audio_data.dtype == np.int32: audio_data = audio_data.astype(np.float32) / 2147483648.0 return audio_data.astype(np.float32) def _resample_audio( audio_data: npt.NDArray[np.float32], orig_sr: int, target_sr: int ) -> npt.NDArray[np.float32]: """Resample audio efficiently.""" try: # Handle stereo audio by converting to mono if necessary if audio_data.ndim > 1: audio_data = np.mean(audio_data, axis=1) # Use high-quality resampling resampled = librosa.resample( # type: ignore audio_data.astype(np.float64), orig_sr=orig_sr, target_sr=target_sr, res_type="kaiser_fast", # Good balance of quality and speed ) return resampled.astype(np.float32) # type: ignore except Exception as e: logger.error(f"Resampling failed: {e}") raise ValueError( f"Failed to resample audio from {orig_sr} Hz to {target_sr} Hz: {e}" ) # Public API functions def agent_info() -> Dict[str, str]: return {"name": AGENT_NAME, "description": AGENT_DESCRIPTION, "has_media": "true", "configurable": "true"} def get_config_schema() -> Dict[str, Any]: """Get the configuration schema for the Whisper bot""" return { "bot_name": AGENT_NAME, "version": "1.0", "parameters": [ { "name": "model_id", "type": "select", "label": "Whisper Model", "description": "The Whisper model to use for transcription", "default_value": _model_id, "required": True, "options": [ {"value": "distil-whisper/distil-large-v2", "label": "Distil-Whisper Large v2 (Fast)"}, {"value": "distil-whisper/distil-medium.en", "label": "Distil-Whisper Medium EN"}, {"value": "distil-whisper/distil-small.en", "label": "Distil-Whisper Small EN"}, {"value": "openai/whisper-large-v3", "label": "OpenAI Whisper Large v3"}, {"value": "openai/whisper-large-v2", "label": "OpenAI Whisper Large v2"}, {"value": "openai/whisper-large", "label": "OpenAI Whisper Large"}, {"value": "openai/whisper-medium", "label": "OpenAI Whisper Medium"}, {"value": "openai/whisper-small", "label": "OpenAI Whisper Small"}, {"value": "openai/whisper-base", "label": "OpenAI Whisper Base"}, {"value": "openai/whisper-tiny", "label": "OpenAI Whisper Tiny"} ] }, { "name": "device", "type": "select", "label": "Inference Device", "description": "The device to run inference 
on", "default_value": _device, "required": True, "options": [ {"value": "GPU.1", "label": "Intel Arc GPU (GPU.1)"}, {"value": "GPU", "label": "GPU"}, {"value": "CPU", "label": "CPU"} ] }, { "name": "enable_quantization", "type": "boolean", "label": "Enable Quantization", "description": "Enable INT8 quantization for faster inference", "default_value": _ov_config.enable_quantization, "required": False }, { "name": "throughput_streams", "type": "range", "label": "Throughput Streams", "description": "Number of parallel inference streams", "default_value": _ov_config.throughput_streams, "required": False, "min_value": 1, "max_value": 8, "step": 1 }, { "name": "max_threads", "type": "range", "label": "Max CPU Threads", "description": "Maximum number of CPU threads for inference", "default_value": _ov_config.max_threads, "required": False, "min_value": 1, "max_value": 16, "step": 1 }, { "name": "sample_rate", "type": "number", "label": "Sample Rate", "description": "Audio sample rate in Hz", "default_value": SAMPLE_RATE, "required": True, "min_value": 8000, "max_value": 48000 }, { "name": "chunk_duration_ms", "type": "range", "label": "Chunk Duration (ms)", "description": "Duration of audio chunks in milliseconds", "default_value": CHUNK_DURATION_MS, "required": False, "min_value": 50, "max_value": 500, "step": 10 }, { "name": "vad_threshold", "type": "range", "label": "VAD Threshold", "description": "Voice activity detection threshold", "default_value": 0.01, "required": False, "min_value": 0.001, "max_value": 0.1, "step": 0.001 }, { "name": "max_silence_frames", "type": "range", "label": "Max Silence Frames", "description": "Maximum frames of silence before stopping", "default_value": MAX_SILENCE_FRAMES, "required": False, "min_value": 10, "max_value": 100, "step": 5 }, { "name": "max_trailing_silence_frames", "type": "range", "label": "Max Trailing Silence Frames", "description": "Maximum trailing silence frames to include", "default_value": MAX_TRAILING_SILENCE_FRAMES, "required": False, "min_value": 1, "max_value": 20, "step": 1 }, { "name": "vad_energy_threshold", "type": "range", "label": "VAD Energy Threshold", "description": "Energy threshold for voice activity detection", "default_value": 0.005, "required": False, "min_value": 0.001, "max_value": 0.05, "step": 0.001 }, { "name": "vad_zcr_min", "type": "range", "label": "VAD ZCR Min", "description": "Minimum zero-crossing rate for speech", "default_value": 0.02, "required": False, "min_value": 0.01, "max_value": 0.5, "step": 0.01 }, { "name": "vad_zcr_max", "type": "range", "label": "VAD ZCR Max", "description": "Maximum zero-crossing rate for speech", "default_value": 0.8, "required": False, "min_value": 0.1, "max_value": 1.0, "step": 0.05 }, { "name": "vad_spectral_centroid_min", "type": "range", "label": "VAD Spectral Centroid Min", "description": "Minimum spectral centroid for speech", "default_value": 200, "required": False, "min_value": 50, "max_value": 1000, "step": 50 }, { "name": "vad_spectral_centroid_max", "type": "range", "label": "VAD Spectral Centroid Max", "description": "Maximum spectral centroid for speech", "default_value": 4000, "required": False, "min_value": 1000, "max_value": 8000, "step": 500 }, { "name": "vad_spectral_rolloff_threshold", "type": "range", "label": "VAD Spectral Rolloff Threshold", "description": "Spectral rolloff threshold for speech detection", "default_value": 3000, "required": False, "min_value": 1000, "max_value": 10000, "step": 500 }, { "name": "vad_minimum_duration", "type": "range", "label": 
"VAD Minimum Duration", "description": "Minimum duration for speech segments", "default_value": 0.2, "required": False, "min_value": 0.1, "max_value": 1.0, "step": 0.1 }, { "name": "vad_max_history", "type": "range", "label": "VAD Max History", "description": "Maximum history frames for temporal consistency", "default_value": 8, "required": False, "min_value": 4, "max_value": 20, "step": 1 }, { "name": "vad_noise_floor_energy", "type": "range", "label": "VAD Noise Floor Energy", "description": "Initial noise floor energy level", "default_value": 0.001, "required": False, "min_value": 0.0001, "max_value": 0.01, "step": 0.0001 }, { "name": "vad_adaptation_rate", "type": "range", "label": "VAD Adaptation Rate", "description": "Rate of noise floor adaptation", "default_value": 0.05, "required": False, "min_value": 0.01, "max_value": 0.2, "step": 0.01 }, { "name": "vad_harmonic_threshold", "type": "range", "label": "VAD Harmonic Threshold", "description": "Threshold for harmonic content detection", "default_value": 0.15, "required": False, "min_value": 0.05, "max_value": 0.5, "step": 0.05 } , { "name": "normalization_enabled", "type": "boolean", "label": "Enable Normalization", "description": "Normalize incoming audio based on observed peak amplitude before transcription and visualization", "default_value": True, "required": False }, { "name": "normalization_target_peak", "type": "number", "label": "Normalization Target Peak", "description": "Target peak (0-1) used when normalizing audio", "default_value": 0.7, "required": False, "min_value": 0.5, "max_value": 1.0 }, { "name": "max_normalization_gain", "type": "range", "label": "Max Normalization Gain", "description": "Maximum allowed gain applied during normalization", "default_value": 10.0, "required": False, "min_value": 1.0, "max_value": 10.0, "step": 0.1 } ], "categories": [ {"Model Settings": ["model_id", "device", "enable_quantization"]}, {"Performance Settings": ["throughput_streams", "max_threads"]}, {"Audio Settings": ["sample_rate", "chunk_duration_ms", "normalization_enabled", "normalization_target_peak", "max_normalization_gain"]}, {"Voice Activity Detection": ["vad_threshold", "max_silence_frames", "max_trailing_silence_frames", "vad_energy_threshold", "vad_zcr_min", "vad_zcr_max", "vad_spectral_centroid_min", "vad_spectral_centroid_max", "vad_spectral_rolloff_threshold", "vad_minimum_duration", "vad_max_history", "vad_noise_floor_energy", "vad_adaptation_rate", "vad_harmonic_threshold"]} ] } def handle_config_update(lobby_id: str, config_values: Dict[str, Any]) -> bool: """Handle configuration update for a specific lobby""" global _model_id, _device, _ov_config try: logger.info(f"Updating Whisper config for lobby {lobby_id}: {config_values}") config_applied = False # Update model configuration (global - affects all instances) if "model_id" in config_values: new_model_id = config_values["model_id"] if new_model_id in [model for models in model_ids.values() for model in models]: _model_id = new_model_id config_applied = True logger.info(f"Updated model_id to: {_model_id}") else: logger.warning(f"Invalid model_id: {new_model_id}") # Update device configuration (global - affects all instances) if "device" in config_values: new_device = config_values["device"] # type: ignore available_devices = [d["name"] for d in get_available_devices()] if new_device in available_devices or new_device in ["CPU", "GPU", "GPU.1"]: _device = new_device _ov_config.device = new_device config_applied = True logger.info(f"Updated device to: {_device}") 
else: logger.warning(f"Invalid device: {new_device}, available: {available_devices}") # Update OpenVINO configuration (global - affects all instances) if "enable_quantization" in config_values: _ov_config.enable_quantization = bool(config_values["enable_quantization"]) config_applied = True logger.info(f"Updated quantization to: {_ov_config.enable_quantization}") if "throughput_streams" in config_values: streams = int(config_values["throughput_streams"]) if 1 <= streams <= 8: _ov_config.throughput_streams = streams config_applied = True logger.info(f"Updated throughput_streams to: {_ov_config.throughput_streams}") if "max_threads" in config_values: threads = int(config_values["max_threads"]) if 1 <= threads <= 16: _ov_config.max_threads = threads config_applied = True logger.info(f"Updated max_threads to: {_ov_config.max_threads}") # Update audio processing parameters for existing processors if "sample_rate" in config_values: rate = int(config_values["sample_rate"]) if 8000 <= rate <= 48000: # Update existing processors for pname, proc in list(_audio_processors.items()): try: proc.sample_rate = rate proc.chunk_size = int(proc.sample_rate * proc.chunk_duration_ms / 1000) logger.info(f"Updated sample_rate to {rate} for processor: {pname}") except Exception: logger.debug(f"Failed to update sample_rate for processor: {pname}") config_applied = True logger.info(f"Updated sample_rate to: {rate}") if "chunk_duration_ms" in config_values: duration = int(config_values["chunk_duration_ms"]) if 50 <= duration <= 500: # Update existing processors for pname, proc in list(_audio_processors.items()): try: proc.chunk_duration_ms = duration proc.chunk_size = int(proc.sample_rate * proc.chunk_duration_ms / 1000) logger.info(f"Updated chunk_duration_ms to {duration} for processor: {pname}") except Exception: logger.debug(f"Failed to update chunk_duration_ms for processor: {pname}") config_applied = True logger.info(f"Updated chunk_duration_ms to: {duration}") if "max_silence_frames" in config_values: frames = int(config_values["max_silence_frames"]) if 10 <= frames <= 100: # Update existing processors for pname, proc in list(_audio_processors.items()): try: proc.max_silence_frames = frames logger.info(f"Updated max_silence_frames to {frames} for processor: {pname}") except Exception: logger.debug(f"Failed to update max_silence_frames for processor: {pname}") config_applied = True logger.info(f"Updated max_silence_frames to: {frames}") if "max_trailing_silence_frames" in config_values: frames = int(config_values["max_trailing_silence_frames"]) if 1 <= frames <= 20: # Update existing processors for pname, proc in list(_audio_processors.items()): try: proc.max_trailing_silence_frames = frames logger.info(f"Updated max_trailing_silence_frames to {frames} for processor: {pname}") except Exception: logger.debug(f"Failed to update max_trailing_silence_frames for processor: {pname}") config_applied = True logger.info(f"Updated max_trailing_silence_frames to: {frames}") # Update VAD configuration for existing processors vad_updates = False if "vad_energy_threshold" in config_values: threshold = float(config_values["vad_energy_threshold"]) for pname, proc in list(_audio_processors.items()): try: proc.vad_energy_threshold = threshold logger.info(f"Updated vad_energy_threshold to {threshold} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_energy_threshold for processor: {pname}") vad_updates = True if "vad_zcr_min" in config_values: zcr_min = float(config_values["vad_zcr_min"]) for pname, 
proc in list(_audio_processors.items()): try: proc.vad_zcr_min = zcr_min logger.info(f"Updated vad_zcr_min to {zcr_min} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_zcr_min for processor: {pname}") vad_updates = True if "vad_zcr_max" in config_values: zcr_max = float(config_values["vad_zcr_max"]) for pname, proc in list(_audio_processors.items()): try: proc.vad_zcr_max = zcr_max logger.info(f"Updated vad_zcr_max to {zcr_max} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_zcr_max for processor: {pname}") vad_updates = True if "vad_spectral_centroid_min" in config_values: centroid_min = int(config_values["vad_spectral_centroid_min"]) for pname, proc in list(_audio_processors.items()): try: proc.vad_spectral_centroid_min = centroid_min logger.info(f"Updated vad_spectral_centroid_min to {centroid_min} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_spectral_centroid_min for processor: {pname}") vad_updates = True if "vad_spectral_centroid_max" in config_values: centroid_max = int(config_values["vad_spectral_centroid_max"]) for pname, proc in list(_audio_processors.items()): try: proc.vad_spectral_centroid_max = centroid_max logger.info(f"Updated vad_spectral_centroid_max to {centroid_max} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_spectral_centroid_max for processor: {pname}") vad_updates = True if "vad_spectral_rolloff_threshold" in config_values: rolloff = int(config_values["vad_spectral_rolloff_threshold"]) for pname, proc in list(_audio_processors.items()): try: proc.vad_spectral_rolloff_threshold = rolloff logger.info(f"Updated vad_spectral_rolloff_threshold to {rolloff} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_spectral_rolloff_threshold for processor: {pname}") vad_updates = True if "vad_minimum_duration" in config_values: duration = float(config_values["vad_minimum_duration"]) for pname, proc in list(_audio_processors.items()): try: proc.vad_minimum_duration = duration logger.info(f"Updated vad_minimum_duration to {duration} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_minimum_duration for processor: {pname}") vad_updates = True if "vad_max_history" in config_values: history = int(config_values["vad_max_history"]) for pname, proc in list(_audio_processors.items()): try: proc.vad_max_history = history logger.info(f"Updated vad_max_history to {history} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_max_history for processor: {pname}") vad_updates = True if "vad_noise_floor_energy" in config_values: noise_floor = float(config_values["vad_noise_floor_energy"]) for pname, proc in list(_audio_processors.items()): try: proc.vad_noise_floor_energy = noise_floor logger.info(f"Updated vad_noise_floor_energy to {noise_floor} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_noise_floor_energy for processor: {pname}") vad_updates = True if "vad_adaptation_rate" in config_values: adaptation_rate = float(config_values["vad_adaptation_rate"]) for pname, proc in list(_audio_processors.items()): try: proc.vad_adaptation_rate = adaptation_rate logger.info(f"Updated vad_adaptation_rate to {adaptation_rate} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_adaptation_rate for processor: {pname}") vad_updates = True if "vad_harmonic_threshold" in config_values: harmonic_threshold = 
float(config_values["vad_harmonic_threshold"]) for pname, proc in list(_audio_processors.items()): try: proc.vad_harmonic_threshold = harmonic_threshold logger.info(f"Updated vad_harmonic_threshold to {harmonic_threshold} for processor: {pname}") except Exception: logger.debug(f"Failed to update vad_harmonic_threshold for processor: {pname}") vad_updates = True if vad_updates: config_applied = True logger.info("VAD configuration updated for existing processors") # Normalization updates: apply to existing processors norm_updates = False if "normalization_enabled" in config_values: enabled = bool(config_values["normalization_enabled"]) for pname, proc in list(_audio_processors.items()): try: proc.normalization_enabled = enabled logger.info(f"Updated normalization_enabled to {enabled} for processor: {pname}") except Exception: logger.debug(f"Failed to update normalization_enabled for processor: {pname}") norm_updates = True if "normalization_target_peak" in config_values: target_peak = float(config_values["normalization_target_peak"]) for pname, proc in list(_audio_processors.items()): try: proc.normalization_target_peak = target_peak logger.info(f"Updated normalization_target_peak to {target_peak} for processor: {pname}") except Exception: logger.debug(f"Failed to update normalization_target_peak for processor: {pname}") norm_updates = True if "max_normalization_gain" in config_values: max_gain = float(config_values["max_normalization_gain"]) for pname, proc in list(_audio_processors.items()): try: proc.max_normalization_gain = max_gain logger.info(f"Updated max_normalization_gain to {max_gain} for processor: {pname}") except Exception: logger.debug(f"Failed to update max_normalization_gain for processor: {pname}") norm_updates = True if norm_updates: config_applied = True logger.info("Normalization configuration updated for existing processors") if config_applied: logger.info(f"Configuration update completed for lobby {lobby_id}") else: logger.warning(f"No valid configuration changes applied for lobby {lobby_id}") return config_applied except Exception as e: logger.error(f"Failed to apply Whisper config update: {e}") return False def create_agent_tracks(session_name: str) -> Dict[str, MediaStreamTrack]: """Create agent tracks. 
Provides a synthetic video waveform track and a silent audio track for compatibility.""" class SilentAudioTrack(MediaStreamTrack): kind = "audio" def __init__( self, sample_rate: int = SAMPLE_RATE, channels: int = 1, fps: int = 50 ): super().__init__() self.sample_rate = sample_rate self.channels = channels self.fps = fps self.samples_per_frame = int(self.sample_rate / self.fps) self._timestamp = 0 async def recv(self) -> AudioFrame: # Generate silent audio as int16 (required by aiortc) data = np.zeros((self.channels, self.samples_per_frame), dtype=np.int16) frame = AudioFrame.from_ndarray( data, layout="mono" if self.channels == 1 else "stereo" ) frame.sample_rate = self.sample_rate frame.pts = self._timestamp frame.time_base = fractions.Fraction(1, self.sample_rate) self._timestamp += self.samples_per_frame await asyncio.sleep(1 / self.fps) return frame try: video_track = WaveformVideoTrack( session_name=session_name, width=640, height=480, fps=15 ) audio_track = SilentAudioTrack() return {"video": video_track, "audio": audio_track} except Exception as e: logger.error(f"Failed to create agent tracks: {e}") return {} async def handle_chat_message( chat_message: ChatMessageModel, send_message_func: Callable[[Union[str, ChatMessageModel]], Awaitable[None]] ) -> Optional[str]: """Handle incoming chat messages.""" return None async def on_track_received(peer: Peer, track: MediaStreamTrack) -> None: """Callback when a new track is received from a peer.""" await handle_track_received(peer, track) def get_track_handler() -> Callable[[Peer, MediaStreamTrack], Awaitable[None]]: """Return the track handler function.""" return on_track_received def bind_send_chat_function(send_chat_func: Callable[[ChatMessageModel], Awaitable[None]], create_chat_message_func: Callable[[str, Optional[str]], ChatMessageModel]) -> None: """Bind the send chat function.""" global _send_chat_func, _create_chat_message_func, _audio_processors logger.info("Binding send chat function to OpenVINO whisper agent") _send_chat_func = send_chat_func _create_chat_message_func = create_chat_message_func # Update existing processors for peer_name, processor in _audio_processors.items(): processor.send_chat_func = send_chat_func processor.create_chat_message_func = create_chat_message_func logger.debug(f"Updated processor for {peer_name} with new send chat function") def cleanup_peer_processor(peer_name: str) -> None: """Clean up processor for disconnected peer.""" global _audio_processors if peer_name in _audio_processors: logger.info(f"Cleaning up processor for {peer_name}") processor = _audio_processors[peer_name] processor.shutdown() del _audio_processors[peer_name] logger.info(f"Processor cleanup complete for {peer_name}") if peer_name in WaveformVideoTrack.buffer: del WaveformVideoTrack.buffer[peer_name] def get_active_processors() -> Dict[str, OptimizedAudioProcessor]: """Get active processors for debugging.""" return _audio_processors.copy() def get_model_info() -> Dict[str, Any]: """Get information about the loaded model.""" ov_model = _ensure_model_loaded() return { "model_id": _model_id, "device": _ov_config.device, "quantization_enabled": _ov_config.enable_quantization, "is_quantized": ov_model.is_quantized, "sample_rate": SAMPLE_RATE, "chunk_duration_ms": CHUNK_DURATION_MS, "loading_status": _model_loading_status, "loading_progress": _model_loading_progress, }
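

# ---------------------------------------------------------------------------
# Minimal local smoke test (illustrative sketch only; the voicebot runtime
# drives this module through bind_send_chat_function() / get_track_handler()
# instead). It exercises just the metadata and config-schema helpers, so no
# Whisper model is loaded and no GPU is required.
if __name__ == "__main__":
    info = agent_info()
    print(f"Agent: {info['name']} - {info['description']}")

    schema = get_config_schema()
    print(f"Configurable parameters ({len(schema['parameters'])}):")
    for param in schema["parameters"]:
        print(f"  - {param['name']}: {param['label']} (default={param['default_value']})")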