diff --git a/voicebot/bots/whisper.py b/voicebot/bots/whisper.py
index e5a6add..e17c791 100644
--- a/voicebot/bots/whisper.py
+++ b/voicebot/bots/whisper.py
@@ -17,6 +17,8 @@ from typing import Dict, Optional, Callable, Awaitable, Any, List, Union
 from pathlib import Path
 import numpy.typing as npt
 from pydantic import BaseModel, Field, ConfigDict
+from typing import Tuple
+
 
 # Core dependencies
 import librosa
@@ -246,6 +248,185 @@ def setup_intel_arc_environment() -> None:
     logger.info("Intel Arc B580 environment variables configured")
 
 
+class AdvancedVAD:
+    """Advanced Voice Activity Detection with noise rejection."""
+
+    def __init__(self, sample_rate: int = SAMPLE_RATE):
+        self.sample_rate = sample_rate
+        # More conservative thresholds
+        self.energy_threshold = 0.02  # Increased from 0.01
+        self.zcr_min = 0.1
+        self.zcr_max = 0.5  # Reject very high ZCR (digital noise)
+        self.spectral_centroid_min = 300  # Speech centroid rarely falls below ~300 Hz
+        self.spectral_centroid_max = 3400  # Human speech typically under 3400 Hz
+        self.spectral_rolloff_threshold = 2000  # Speech energy concentrated lower
+        self.minimum_duration = 0.3  # Require 300 ms of consistent speech
+
+        # Temporal consistency tracking
+        self.speech_history = []
+        self.max_history = 10  # Track last 10 frames (1 second at 100 ms chunks)
+
+        # Adaptive noise floor
+        self.noise_floor_energy = 0.001
+        self.noise_floor_centroid = 1000
+        self.adaptation_rate = 0.02
+
+    def analyze_frame(self, audio_data: AudioArray) -> Tuple[bool, dict]:
+        """Analyze an audio frame for speech vs noise."""
+
+        # Basic energy features
+        energy = np.sqrt(np.mean(audio_data**2))
+
+        # Zero-crossing rate
+        signs = np.sign(audio_data)
+        signs[signs == 0] = 1
+        zcr = np.sum(np.abs(np.diff(signs))) / (2 * len(audio_data))
+
+        # Spectral features
+        spectral_features = self._compute_spectral_features(audio_data)
+
+        # Individual feature checks
+        energy_check = energy > max(self.energy_threshold, self.noise_floor_energy * 3)
+        zcr_check = self.zcr_min < zcr < self.zcr_max
+        spectral_check = (
+            self.spectral_centroid_min < spectral_features['centroid'] < self.spectral_centroid_max
+            and spectral_features['rolloff'] < self.spectral_rolloff_threshold
+            and spectral_features['flux'] > 0.01  # Spectral change indicates speech
+        )
+
+        # Harmonicity check (speech has harmonic structure)
+        harmonic_check = spectral_features['harmonicity'] > 0.3
+
+        # Combined decision with temporal consistency
+        frame_has_speech = (
+            energy_check and
+            zcr_check and
+            spectral_check and
+            harmonic_check
+        )
+
+        # Update history
+        self.speech_history.append(frame_has_speech)
+        if len(self.speech_history) > self.max_history:
+            self.speech_history.pop(0)
+
+        # Require temporal consistency (at least 3 of the last 5 frames)
+        recent_speech = sum(self.speech_history[-5:]) >= 3 if len(self.speech_history) >= 5 else frame_has_speech
+
+        # Update noise floor during silence
+        if not frame_has_speech:
+            self.noise_floor_energy = (
+                self.adaptation_rate * energy
+                + (1 - self.adaptation_rate) * self.noise_floor_energy
+            )
+            self.noise_floor_centroid = (
+                self.adaptation_rate * spectral_features['centroid']
+                + (1 - self.adaptation_rate) * self.noise_floor_centroid
+            )
+
+        metrics = {
+            'energy': energy,
+            'zcr': zcr,
+            'centroid': spectral_features['centroid'],
+            'rolloff': spectral_features['rolloff'],
+            'flux': spectral_features['flux'],
+            'harmonicity': spectral_features['harmonicity'],
+            'noise_floor_energy': self.noise_floor_energy,
+            'energy_check': energy_check,
+            'zcr_check': zcr_check,
+            'spectral_check': spectral_check,
+            'harmonic_check': harmonic_check,
+            'temporal_consistency': recent_speech
+        }
+
+        return recent_speech, metrics
+
+    def _compute_spectral_features(self, audio_data: AudioArray) -> dict:
+        """Compute spectral features for speech detection."""
+
+        # Apply window to reduce spectral leakage
+        windowed = audio_data * np.hanning(len(audio_data))
+
+        # FFT
+        fft_data = np.fft.rfft(windowed)
+        magnitude = np.abs(fft_data)
+        freqs = np.fft.rfftfreq(len(windowed), 1 / self.sample_rate)
+
+        # Avoid division by zero
+        if np.sum(magnitude) == 0:
+            return {
+                'centroid': 0, 'rolloff': 0, 'flux': 0, 'harmonicity': 0
+            }
+
+        # Spectral centroid
+        centroid = np.sum(freqs * magnitude) / np.sum(magnitude)
+
+        # Spectral rolloff (frequency below which 85% of energy is contained)
+        cumsum = np.cumsum(magnitude)
+        rolloff_point = 0.85 * cumsum[-1]
+        rolloff_idx = np.where(cumsum >= rolloff_point)[0]
+        rolloff = freqs[rolloff_idx[0]] if len(rolloff_idx) > 0 else freqs[-1]
+
+        # Spectral flux (measure of spectral change since the previous frame);
+        # only compare when the previous spectrum has the same shape, since
+        # incoming chunk sizes are not guaranteed to be constant
+        if hasattr(self, '_prev_magnitude') and self._prev_magnitude.shape == magnitude.shape:
+            flux = np.sum((magnitude - self._prev_magnitude) ** 2)
+        else:
+            flux = 0
+        self._prev_magnitude = magnitude.copy()
+
+        # Harmonicity (detect harmonic structure typical of speech)
+        harmonicity = self._compute_harmonicity(magnitude, freqs)
+
+        return {
+            'centroid': centroid,
+            'rolloff': rolloff,
+            'flux': flux,
+            'harmonicity': harmonicity
+        }
+
+    def _compute_harmonicity(self, magnitude: np.ndarray, freqs: np.ndarray) -> float:
+        """Compute harmonicity score (0-1, higher = more harmonic/speech-like)."""
+
+        # Find fundamental frequency candidate (peak in 80-400 Hz range for speech)
+        speech_range = (freqs >= 80) & (freqs <= 400)
+        if not np.any(speech_range):
+            return 0.0
+
+        speech_magnitude = magnitude[speech_range]
+        speech_freqs = freqs[speech_range]
+
+        if len(speech_magnitude) == 0:
+            return 0.0
+
+        # Find strongest peak in speech range
+        f0_idx = np.argmax(speech_magnitude)
+        f0 = speech_freqs[f0_idx]
+        f0_strength = speech_magnitude[f0_idx]
+
+        if f0_strength < np.max(magnitude) * 0.1:  # F0 should be reasonably strong
+            return 0.0
+
+        # Check for harmonics (2*f0, 3*f0, etc.)
+        harmonic_strength = 0.0
+        total_harmonics = 0
+
+        for harmonic in range(2, 6):  # Check 2nd through 5th harmonics
+            harmonic_freq = f0 * harmonic
+            if harmonic_freq > freqs[-1]:
+                break
+
+            # Find closest frequency bin
+            harmonic_idx = np.argmin(np.abs(freqs - harmonic_freq))
+            harmonic_strength += magnitude[harmonic_idx]
+            total_harmonics += 1
+
+        if total_harmonics == 0:
+            return 0.0
+
+        # Normalize by fundamental strength and number of harmonics
+        harmonicity = (harmonic_strength / total_harmonics) / f0_strength
+        return min(harmonicity, 1.0)
+
 
 class OpenVINOWhisperModel:
     """OpenVINO optimized Whisper model for Intel Arc B580."""
@@ -758,34 +939,39 @@ def extract_input_features(audio_array: AudioArray, sampling_rate: int) -> torch
     )
     return inputs.input_features
 
+
 class VoiceActivityDetector(BaseModel):
     has_speech: bool = False
    energy: float = 0.0
     zcr: float = 0.0
     centroid: float = 0.0
 
+
 def simple_robust_vad(
     audio_data: AudioArray,
     energy_threshold: float = 0.01,
     sample_rate: int = SAMPLE_RATE,
 ) -> VoiceActivityDetector:
     """Simplified robust VAD."""
-    
+
     # Energy-based detection (RMS)
     energy = np.sqrt(np.mean(audio_data**2))
-    
+
     # Zero-crossing rate
     signs = np.sign(audio_data)
     signs[signs == 0] = 1
     zcr = np.sum(np.abs(np.diff(signs))) / (2 * len(audio_data))
-    
+
     # Relaxed speech detection - use OR instead of AND for some conditions
     has_speech = (
-        energy > energy_threshold or  # Primary condition
-        (energy > energy_threshold * 0.5 and zcr > 0.05)  # Secondary condition
+        energy > energy_threshold  # Primary condition
+        or (energy > energy_threshold * 0.5 and zcr > 0.05)  # Secondary condition
    )
-    
-    return VoiceActivityDetector(has_speech=has_speech, energy=energy, zcr=zcr, centroid=0.0)
+
+    return VoiceActivityDetector(
+        has_speech=has_speech, energy=energy, zcr=zcr, centroid=0.0
+    )
+
 
 def enhanced_vad(
     audio_data: AudioArray,
@@ -823,7 +1009,9 @@ def enhanced_vad(
         and 200 < centroid < 3000  # Human speech frequency range
     )
 
-    return VoiceActivityDetector(has_speech=has_speech, energy=energy, zcr=zcr, centroid=centroid)
+    return VoiceActivityDetector(
+        has_speech=has_speech, energy=energy, zcr=zcr, centroid=centroid
+    )
 
 
 class OptimizedAudioProcessor:
@@ -849,24 +1037,13 @@ class OptimizedAudioProcessor:
         self.write_ptr = 0
         self.read_ptr = 0
 
-        # Enhanced VAD parameters with EMA for noise adaptation
-        self.vad_energy_threshold: float = VAD_THRESHOLD
-        self.vad_zcr_threshold: float = 0.1
-        self.noise_energy_ema: float = 0.001  # Initial noise estimate
-        self.noise_zcr_ema: float = 0.05
-        self.ema_alpha: float = 0.05  # Adaptation rate
-        self.energy_multiplier: float = 3.0  # Threshold = noise_ema * multiplier
-        self.zcr_multiplier: float = 2.0
-        self.min_energy_threshold: float = 0.005
-        self.min_zcr_threshold: float = 0.05
-
+        # Silence handling parameters
         self.silence_frames: int = 0
         self.max_silence_frames: int = MAX_SILENCE_FRAMES
         self.max_trailing_silence_frames: int = MAX_TRAILING_SILENCE_FRAMES
-
-        # VAD metrics tracking for adaptive thresholds
-        self.vad_metrics_history: list[VoiceActivityDetector] = []
-        self.adaptive_threshold_enabled: bool = True
+
+        # Advanced VAD with spectral features and an adaptive noise floor
+        self.advanced_vad = AdvancedVAD(sample_rate=self.sample_rate)
 
         # Processing state
         self.current_phrase_audio = np.array([], dtype=np.float32)
@@ -899,99 +1076,41 @@ class OptimizedAudioProcessor:
             logger.error("Processor not running or empty audio data")
             return
 
-        vad_metrics = simple_robust_vad(
-            audio_data,
-            energy_threshold=0.01,  # self.vad_energy_threshold,
-            sample_rate=self.sample_rate,
-        )
-        # Use enhanced VAD
-        # vad_metrics = enhanced_vad(
-        #     audio_data,
-        #     energy_threshold=self.vad_energy_threshold,
-        #     zcr_threshold=self.vad_zcr_threshold,
-        #     sample_rate=self.sample_rate,
-        # )
+        is_speech, vad_metrics = self.advanced_vad.analyze_frame(audio_data)
 
-        # Update noise estimates if no speech
-        if not vad_metrics.has_speech:
-            self.noise_energy_ema = (
-                self.ema_alpha * vad_metrics.energy
-                + (1 - self.ema_alpha) * self.noise_energy_ema
-            )
-            # self.noise_zcr_ema = (
-            #     self.ema_alpha * vad_metrics.zcr
-            #     + (1 - self.ema_alpha) * self.noise_zcr_ema
-            # )
+        # Update visualization status
+        WaveformVideoTrack.speech_status[self.peer_name] = {
+            'is_speech': is_speech,
+            **vad_metrics
+        }
 
-            # Adapt thresholds
-            self.vad_energy_threshold = max(
-                # self.noise_energy_ema * self.energy_multiplier, self.min_energy_threshold
-                self.noise_energy_ema * 2.0, 0.005
-            )
-            # self.vad_zcr_threshold = max(
-            #     self.noise_zcr_ema * self.zcr_multiplier, self.min_zcr_threshold
-            # )
-
-        # Store metrics for additional tracking
-        self.vad_metrics_history.append(vad_metrics)
-        if len(self.vad_metrics_history) > 100:
-            self.vad_metrics_history.pop(0)
-
-        # Log detailed VAD decision occasionally for debugging
-        if len(self.vad_metrics_history) % 50 == 0:
-            logger.debug(
-                f"VAD metrics for {self.peer_name}: "
-                f"energy={vad_metrics.energy:.4f}, "
-                f"zcr={vad_metrics.zcr:.4f}, "
-                f"centroid={vad_metrics.centroid:.1f}Hz, "
-                f"speech={vad_metrics.has_speech}, "
-                f"noise_energy_ema={self.noise_energy_ema:.4f}, "
-                f"threshold={self.vad_energy_threshold:.4f}"
-            )
-
-        # Decision logic to avoid leading silence and limit trailing silence
-        if vad_metrics.has_speech:
-            logger.info(f"Speech detected for {self.peer_name}: {vad_metrics} (current phrase length: {len(self.current_phrase_audio) / self.sample_rate})")
+        # Log VAD decisions periodically
+        if hasattr(self, '_vad_log_counter'):
+            self._vad_log_counter += 1
+        else:
+            self._vad_log_counter = 0
+
+        if self._vad_log_counter % 50 == 0:  # Every 5 seconds at 100 ms chunks
+            logger.info(f"VAD decision for {self.peer_name}: {is_speech}, "
+                        f"Energy: {vad_metrics['energy']:.3f}, "
+                        f"Harmonicity: {vad_metrics['harmonicity']:.2f}, "
+                        f"Noise floor: {vad_metrics['noise_floor_energy']:.4f}")
+
+        # Speech processing logic
+        if is_speech:
             self.silence_frames = 0
             self.last_activity_time = time.time()
             self._add_to_circular_buffer(audio_data)
-            # Update visualization buffer
-            WaveformVideoTrack.buffer[self.peer_name] = np.concatenate([WaveformVideoTrack.buffer[self.peer_name], audio_data])
-            # Limit buffer size to last 10 seconds
-            max_samples = SAMPLE_RATE * 10
-            if len(WaveformVideoTrack.buffer[self.peer_name]) > max_samples:
-                WaveformVideoTrack.buffer[self.peer_name] = WaveformVideoTrack.buffer[self.peer_name][-max_samples:]
-        elif (
-            len(self.current_phrase_audio) > 0
-            and self.silence_frames < self.max_trailing_silence_frames
-        ):
-            logger.info(f"Trailing silence accepted for {self.peer_name}")
+        elif (len(self.current_phrase_audio) > 0 and
+              self.silence_frames < self.max_trailing_silence_frames):
            self.silence_frames += 1
             self._add_to_circular_buffer(audio_data)
-            # Update visualization buffer
-            WaveformVideoTrack.buffer[self.peer_name] = np.concatenate([WaveformVideoTrack.buffer[self.peer_name], audio_data])
-            # Limit buffer size to last 10 seconds
-            max_samples = SAMPLE_RATE * 10
-            if len(WaveformVideoTrack.buffer[self.peer_name]) > max_samples:
-                WaveformVideoTrack.buffer[self.peer_name] = WaveformVideoTrack.buffer[self.peer_name][-max_samples:]
         else:
-            if (self.silence_frames % 10 == 0) and (self.silence_frames > 0):
-                logger.info(
-                    f"VAD metrics for {self.peer_name}: "
-                    f"energy={vad_metrics.energy:.4f}, "
-                    f"zcr={vad_metrics.zcr:.4f}, "
-                    f"centroid={vad_metrics.centroid:.1f}Hz, "
-                    f"speech={vad_metrics.has_speech}, "
-                    f"noise_energy_ema={self.noise_energy_ema:.4f}, "
-                    f"threshold={self.vad_energy_threshold:.4f}"
-                )
             self.silence_frames += 1
-            if (
-                self.silence_frames > self.max_silence_frames
-                and len(self.current_phrase_audio) > 0
-            ):
+            if (self.silence_frames > self.max_silence_frames and
+                    len(self.current_phrase_audio) > 0):
                 self._queue_final_transcription()
-            return  # Drop pure silence chunks (leading or excessive trailing)
+            return  # Drop non-speech audio
 
         # Check if we should process
         if self._available_samples() >= self.chunk_size:
@@ -1110,7 +1229,9 @@ class OptimizedAudioProcessor:
                     len(self.current_phrase_audio) > 0
                     and time.time() - self.last_activity_time > 2.0
                 ):
-                    logger.info(f"Final transcription timeout for {self.peer_name} (asyncio.TimeoutError)")
+                    logger.info(
+                        f"Final transcription timeout for {self.peer_name} (asyncio.TimeoutError)"
+                    )
                     await self._transcribe_and_send(
                         self.current_phrase_audio.copy(), is_final=True
                     )
@@ -1141,7 +1262,9 @@ class OptimizedAudioProcessor:
 
                 if phrase_duration >= 1.0:
                     if self.main_loop:
-                        logger.info(f"Transcribing from thread for {self.peer_name} (_thread_processing_loop > 1s interval)")
+                        logger.info(
+                            f"Transcribing from thread for {self.peer_name} (_thread_processing_loop > 1s interval)"
+                        )
                         asyncio.run_coroutine_threadsafe(
                             self._transcribe_and_send(
                                 self.current_phrase_audio.copy(), is_final=False
@@ -1156,7 +1279,9 @@ class OptimizedAudioProcessor:
                     and time.time() - self.last_activity_time > 2.0
                 ):
                     if self.main_loop:
-                        logger.info(f"Final transcription from thread for {self.peer_name}")
+                        logger.info(
+                            f"Final transcription from thread for {self.peer_name}"
+                        )
                        asyncio.run_coroutine_threadsafe(
                             self._transcribe_and_send(
                                 self.current_phrase_audio.copy(), is_final=True
@@ -1397,6 +1522,7 @@ class WaveformVideoTrack(MediaStreamTrack):
     # Shared buffer for audio data
     buffer: Dict[str, npt.NDArray[np.float32]] = {}
+    speech_status: Dict[str, dict] = {}
 
     def __init__(
         self, session_name: str, width: int = 640, height: int = 480, fps: int = 15
     ):
@@ -1472,13 +1598,14 @@ class WaveformVideoTrack(MediaStreamTrack):
             2,
         )
 
-        # Select the most active audio buffer (highest RMS) and draw its waveform
+        # Select the most active audio buffer and get its speech status
         best_proc = None
        best_rms = 0.0
+        speech_info = None
+
         try:
            for pname, arr in self.__class__.buffer.items():
                 try:
-                    # Allow empty arrays to be selected
                     if len(arr) == 0:
                         rms = 0.0
                     else:
@@ -1486,6 +1613,7 @@ class WaveformVideoTrack(MediaStreamTrack):
                     if rms > best_rms:
                         best_rms = rms
                         best_proc = (pname, arr.copy())
+                        speech_info = self.__class__.speech_status.get(pname, {})
                 except Exception:
                     continue
         except Exception:
@@ -1502,7 +1630,9 @@ class WaveformVideoTrack(MediaStreamTrack):
                 arr_segment = arr[-samples_needed:].copy()
             else:
                 # Pad with zeros at the beginning
-                arr_segment = np.concatenate([np.zeros(samples_needed - len(arr), dtype=np.float32), arr])
+                arr_segment = np.concatenate(
+                    [np.zeros(samples_needed - len(arr), dtype=np.float32), arr]
+                )
 
             # Assume arr_segment is already in [-1, 1]
             norm = arr_segment
@@ -1523,22 +1653,34 @@ class WaveformVideoTrack(MediaStreamTrack):
                 dtype=np.float32,
             )
 
-            # Create polyline points, avoid NaN
+            # Draw waveform with color coding for speech detection
            points: list[tuple[int, int]] = []
+            colors: list[tuple[int, int, int]] = []  # Color for each point
+
            for x in range(self.width):
                 v = float(norm[x]) if x < norm.size and not np.isnan(norm[x]) else 0.0
-                y = int((1.0 - ((v + 1.0) / 2.0)) * (self.height - 80)) + 80
+                y = int((1.0 - ((v + 1.0) / 2.0)) * (self.height - 120)) + 100
                 points.append((x, y))
+
+                # Color based on speech detection status
+                is_speech = speech_info.get('is_speech', False) if speech_info else False
+                energy_check = speech_info.get('energy_check', False) if speech_info else False
+
+                if is_speech:
+                    colors.append((0, 255, 0))  # Green for detected speech
+                elif energy_check:
+                    colors.append((0, 255, 255))  # Yellow (BGR order) for energy but not speech
+                else:
+                    colors.append((128, 128, 128))  # Gray for background noise
 
+            # Draw colored waveform
            if len(points) > 1:
-                pts_np = np.array(points, dtype=np.int32)
-                cv2.polylines(
-                    frame_array,
-                    [pts_np],
-                    isClosed=False,
-                    color=(255, 255, 255),
-                    thickness=4,
-                )
+                for i in range(len(points) - 1):
+                    cv2.line(frame_array, points[i], points[i + 1], colors[i], 2)
+
+            # Add speech detection status overlay
+            if speech_info:
+                self._draw_speech_status(frame_array, speech_info, pname)
 
                 cv2.putText(
                     frame_array,
@@ -1549,47 +1691,51 @@ class WaveformVideoTrack(MediaStreamTrack):
                     (255, 255, 255),
                     2,
                 )
-        else:
-            # Draw a test sine wave to verify drawing works
-            import math
-            test_points: list[tuple[int, int]] = []
-            for x in range(self.width):
-                # Create a sine wave with some amplitude
-                v = 0.5 * math.sin(2 * math.pi * x / self.width * 4)  # 4 cycles across width
-                y = int((1.0 - ((v + 1.0) / 2.0)) * (self.height - 80)) + 80
-                test_points.append((x, y))
-
-            if len(test_points) > 1:
-                pts_np = np.array(test_points, dtype=np.int32)
-                cv2.polylines(
-                    frame_array,
-                    [pts_np],
-                    isClosed=False,
-                    color=(255, 255, 255),
-                    thickness=4,
-                )
-
-            # Show buffer status
-            buffer_keys = len(self.__class__.buffer)
-            total_samples = sum(len(arr) for arr in self.__class__.buffer.values())
-            cv2.putText(
-                frame_array,
-                f"Test waveform - Buffer: {buffer_keys} keys, {total_samples} samples",
-                (10, 400),
-                cv2.FONT_HERSHEY_SIMPLEX,
-                0.8,
-                (255, 255, 255),
-                2,
-            )
 
         frame = VideoFrame.from_ndarray(frame_array, format="bgr24")
         frame.pts = pts
         frame.time_base = fractions.Fraction(1 / 90000).limit_denominator(1000000)
         return frame
-    
+
+    def _draw_speech_status(self, frame_array: np.ndarray, speech_info: dict, pname: str):
+        """Draw speech detection status information."""
+
+        y_offset = 100
+        line_height = 25
+
+        # Main status
+        is_speech = speech_info.get('is_speech', False)
+        status_text = "SPEECH" if is_speech else "NOISE"
+        status_color = (0, 255, 0) if is_speech else (128, 128, 128)
+
+        cv2.putText(frame_array, f"{pname}: {status_text}",
+                    (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.8, status_color, 2)
+
+        # Detailed metrics (smaller text); Hershey fonts are ASCII-only, so use OK/X markers
+        metrics = [
+            f"Energy: {speech_info.get('energy', 0):.3f} ({'OK' if speech_info.get('energy_check', False) else 'X'})",
+            f"ZCR: {speech_info.get('zcr', 0):.3f} ({'OK' if speech_info.get('zcr_check', False) else 'X'})",
+            f"Spectral: ({'OK' if speech_info.get('spectral_check', False) else 'X'})",
+            f"Harmonic: ({'OK' if speech_info.get('harmonic_check', False) else 'X'})",
+            f"Temporal: ({'OK' if speech_info.get('temporal_consistency', False) else 'X'})"
+        ]
+
+        for i, metric in enumerate(metrics):
+            cv2.putText(frame_array, metric,
+                        (320, y_offset + i * 15), cv2.FONT_HERSHEY_SIMPLEX, 0.4,
+                        (255, 255, 255), 1)
+
+        # Noise floor indicator
+        noise_floor = speech_info.get('noise_floor_energy', 0)
+        cv2.putText(frame_array, f"Noise Floor: {noise_floor:.4f}",
+                    (10, y_offset + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6,
+                    (200, 200, 200), 1)
+
 
 async def handle_track_received(peer: Peer, track: MediaStreamTrack) -> None:
     """Handle incoming audio tracks from WebRTC peers."""
+    logger.info(
+        f"handle_track_received called for {peer.peer_name} with track kind: {track.kind}"
+    )
     global _audio_processors, _send_chat_func
 
     if track.kind != "audio":
@@ -1600,6 +1746,32 @@ async def handle_track_received(peer: Peer, track: MediaStreamTrack) -> None:
     if peer.peer_name not in WaveformVideoTrack.buffer:
         WaveformVideoTrack.buffer[peer.peer_name] = np.array([], dtype=np.float32)
 
+    # Start background task to load model and create processor
+    async def init_processor():
+        global _model_loading_status, _model_loading_progress
+        # Load model asynchronously to avoid blocking frame reading
+        _model_loading_status = "Initializing model loading..."
+        _model_loading_progress = 0.0
+
+        loop = asyncio.get_event_loop()
+        await loop.run_in_executor(None, _ensure_model_loaded)
+
+        _model_loading_status = "Model loaded, creating processor..."
+        _model_loading_progress = 0.8
+
+        logger.info(f"Creating OptimizedAudioProcessor for {peer.peer_name}")
+        if _send_chat_func is None:
+            logger.error(f"No send_chat_func available for {peer.peer_name}")
+            _model_loading_status = "Error: No send function available"
+            return
+        _audio_processors[peer.peer_name] = OptimizedAudioProcessor(
+            peer_name=peer.peer_name, send_chat_func=_send_chat_func
+        )
+
+        _model_loading_status = "Ready for transcription"
+        _model_loading_progress = 1.0
+        logger.info(f"OptimizedAudioProcessor ready for {peer.peer_name}")
+
     if peer.peer_name not in _audio_processors:
         if _send_chat_func is None:
             logger.error(
@@ -1607,82 +1779,63 @@ async def handle_track_received(peer: Peer, track: MediaStreamTrack) -> None:
             )
             return
 
-        # Start background task to load model and create processor
-        async def init_processor():
-            global _model_loading_status, _model_loading_progress
-            # Load model asynchronously to avoid blocking frame reading
-            _model_loading_status = "Initializing model loading..."
-            _model_loading_progress = 0.0
-
-            loop = asyncio.get_event_loop()
-            await loop.run_in_executor(None, _ensure_model_loaded)
-
-            _model_loading_status = "Model loaded, creating processor..."
-            _model_loading_progress = 0.8
-
-            logger.info(f"Creating OptimizedAudioProcessor for {peer.peer_name}")
-            if _send_chat_func is None:
-                logger.error(f"No send_chat_func available for {peer.peer_name}")
-                _model_loading_status = "Error: No send function available"
-                return
-            _audio_processors[peer.peer_name] = OptimizedAudioProcessor(
-                peer_name=peer.peer_name, send_chat_func=_send_chat_func
-            )
-
-            audio_processor = _audio_processors[peer.peer_name]
-            # asyncio.create_task(calibrate_vad(audio_processor))
-
-            _model_loading_status = "Ready for transcription"
-            _model_loading_progress = 1.0
-            logger.info(f"Starting OpenVINO audio processing for {peer.peer_name}")
-
-            # Now start processing frames
-            frame_count = 0
-            while True:
-                try:
-                    frame = await track.recv()
-                    frame_count += 1
-
-                    if frame_count % 100 == 0:
-                        logger.debug(
-                            f"Received {frame_count} frames from {peer.peer_name}"
-                        )
-
-                except MediaStreamError as e:
-                    logger.info(f"Audio stream ended for {peer.peer_name}: {e}")
-                    break
-                except Exception as e:
-                    logger.error(f"Error receiving frame from {peer.peer_name}: {e}")
-                    break
-
-                if isinstance(frame, AudioFrame):
-                    try:
-                        # Convert frame to numpy array
-                        audio_data = frame.to_ndarray()
-
-                        # Handle audio format conversion
-                        audio_data = _process_audio_frame(audio_data, frame)
-
-                        # Resample if needed
-                        if frame.sample_rate != SAMPLE_RATE:
-                            audio_data = _resample_audio(
-                                audio_data, frame.sample_rate, SAMPLE_RATE
-                            )
-
-                        # Convert to float32
-                        audio_data_float32 = audio_data.astype(np.float32)
-
-                        # Process with optimized processor
-                        audio_processor.add_audio_data(audio_data_float32)
-
-                    except Exception as e:
-                        logger.error(
-                            f"Error processing audio frame for {peer.peer_name}: {e}"
-                        )
-                        continue
-
+        # Start the processor initialization in background
         asyncio.create_task(init_processor())
-        return  # Exit early, processor is handled in the background
+
+    # Start processing frames immediately (before processor is ready)
+    logger.info(f"Starting frame processing loop for {peer.peer_name}")
+    frame_count = 0
+    while True:
+        try:
+            frame = await track.recv()
+            frame_count += 1
+
+            if frame_count % 100 == 0:
+                logger.debug(f"Received {frame_count} frames from {peer.peer_name}")
+
+        except MediaStreamError as e:
+            logger.info(f"Audio stream ended for {peer.peer_name}: {e}")
+            break
+        except Exception as e:
+            logger.error(f"Error receiving frame from {peer.peer_name}: {e}")
+            break
+
+        if isinstance(frame, AudioFrame):
+            try:
+                # Convert frame to numpy array
+                audio_data = frame.to_ndarray()
+
+                # Handle audio format conversion
+                audio_data = _process_audio_frame(audio_data, frame)
+
+                # Resample if needed
+                if frame.sample_rate != SAMPLE_RATE:
+                    audio_data = _resample_audio(
+                        audio_data, frame.sample_rate, SAMPLE_RATE
+                    )
+
+                # Convert to float32
+                audio_data_float32 = audio_data.astype(np.float32)
+
+                # Update visualization buffer immediately
+                WaveformVideoTrack.buffer[peer.peer_name] = np.concatenate(
+                    [WaveformVideoTrack.buffer[peer.peer_name], audio_data_float32]
+                )
+                # Limit buffer size to last 10 seconds
+                max_samples = SAMPLE_RATE * 10
+                if len(WaveformVideoTrack.buffer[peer.peer_name]) > max_samples:
+                    WaveformVideoTrack.buffer[peer.peer_name] = (
+                        WaveformVideoTrack.buffer[peer.peer_name][-max_samples:]
+                    )
+
+                # Process with optimized processor if available
+                if peer.peer_name in _audio_processors:
+                    audio_processor = _audio_processors[peer.peer_name]
+                    audio_processor.add_audio_data(audio_data_float32)
+
+            except Exception as e:
+                logger.error(f"Error processing audio frame for {peer.peer_name}: {e}")
+                continue
 
     # If processor already exists, just continue processing
     audio_processor = _audio_processors[peer.peer_name]
@@ -1726,7 +1879,9 @@ async def handle_track_received(peer: Peer, track: MediaStreamTrack) -> None:
                 # Convert to float32
                 audio_data_float32 = audio_data.astype(np.float32)
 
-                logger.debug(f"Processed audio frame {frame_count} from {peer.peer_name}: {len(audio_data_float32)} samples")
+                logger.debug(
+                    f"Processed audio frame {frame_count} from {peer.peer_name}: {len(audio_data_float32)} samples"
+                )
 
                 # Process with optimized processor if available
                 audio_processor.add_audio_data(audio_data_float32)
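
Not part of the patch, but a quick way to exercise the new gate logic in AdvancedVAD.analyze_frame(): the minimal sketch below (assumptions: the class is importable from voicebot.bots.whisper, and SAMPLE_RATE is 16 kHz with roughly 100 ms chunks) feeds synthetic frames through the detector and prints which individual checks pass. A bare low-frequency harmonic tone can still be rejected by the ZCR and spectral-centroid gates; inspecting the returned metrics dict is the intended way to see why a frame was accepted or rejected and to tune the thresholds.

# Sanity-check sketch for AdvancedVAD (illustrative only; not part of the patch).
# Assumes AdvancedVAD is importable and a 16 kHz sample rate with 100 ms frames.
import numpy as np

from voicebot.bots.whisper import AdvancedVAD

SR = 16000
FRAME = int(0.1 * SR)  # 100 ms, matching the "1 second at 100 ms chunks" comment
t = np.arange(FRAME) / SR

# Crude "voiced" frame: 150 Hz fundamental plus a few weaker harmonics.
voiced = sum(0.3 / h * np.sin(2 * np.pi * 150 * h * t) for h in range(1, 5))
voiced = voiced.astype(np.float32)

# Broadband noise at a comparable level.
noise = (0.2 * np.random.default_rng(0).standard_normal(FRAME)).astype(np.float32)

for label, frame in (("harmonic tone", voiced), ("white noise", noise)):
    vad = AdvancedVAD(sample_rate=SR)  # fresh history and noise floor per signal
    for _ in range(10):  # temporal consistency needs several consecutive frames
        is_speech, m = vad.analyze_frame(frame)
    checks = {k: m[k] for k in ("energy_check", "zcr_check", "spectral_check", "harmonic_check")}
    print(f"{label}: speech={is_speech} harmonicity={m['harmonicity']:.2f} {checks}")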
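
One tuning note on the adaptive noise floor: noise_floor_energy is a first-order exponential moving average that only updates on frames classified as non-speech, so with adaptation_rate = 0.02 and ~100 ms frames (an assumption; the class itself is agnostic to chunk duration) it takes about 1 / 0.02 = 50 silent frames, roughly five seconds, to cover 63% of a step change in background level. A tiny sketch of the same update rule:

# Same EMA update as the noise-floor adaptation in AdvancedVAD.analyze_frame.
adaptation_rate = 0.02
noise_floor = 0.001  # initial noise_floor_energy
for _ in range(50):  # ~5 s of sustained background at 0.01 RMS (100 ms frames assumed)
    noise_floor = adaptation_rate * 0.01 + (1 - adaptation_rate) * noise_floor
print(round(noise_floor, 4))  # ~0.0067, about 63% of the way from 0.001 to 0.01

Because the energy gate compares against noise_floor_energy * 3, this slow adaptation keeps the detector from chasing short noise bursts, at the cost of reacting slowly when the room genuinely gets louder.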