From 9089edaeeab4bde310845754ce0498ff237ee922 Mon Sep 17 00:00:00 2001 From: James Ketrenos Date: Fri, 5 Sep 2025 16:50:19 -0700 Subject: [PATCH] Multi-user transcription --- client/src/MediaControl.tsx | 80 ++++- voicebot/bots/whisper.py | 488 +++++++++++++++++++++++------- voicebot/force_transcription.py | 148 +++++++++ voicebot/set_whisper_debug.py | 53 ++++ voicebot/test_whisper_pipeline.py | 110 +++++++ voicebot/webrtc_signaling.py | 16 + 6 files changed, 788 insertions(+), 107 deletions(-) create mode 100644 voicebot/force_transcription.py create mode 100644 voicebot/set_whisper_debug.py create mode 100644 voicebot/test_whisper_pipeline.py diff --git a/client/src/MediaControl.tsx b/client/src/MediaControl.tsx index 1c7ef41..eb12fb6 100644 --- a/client/src/MediaControl.tsx +++ b/client/src/MediaControl.tsx @@ -792,8 +792,44 @@ const MediaAgent = (props: MediaAgentProps) => { for (const candidate of pendingCandidates) { try { - await pc.addIceCandidate(new RTCIceCandidate(candidate)); - console.log(`media-agent - sessionDescription:${peer_name} - Queued ICE candidate added`); + if (!candidate.candidate) { + // End-of-candidates signal + await pc.addIceCandidate(undefined); + console.log(`media-agent - sessionDescription:${peer_name} - Queued end-of-candidates added`); + } else { + // Coerce and sanitize the incoming candidate before handing to the browser + let candStr: string | null = candidate.candidate ?? null; + if (typeof candStr === "string") { + candStr = candStr.trim(); + // Strip leading 'a=' if present (sometimes sent from SDP parsing) + if (candStr.startsWith("a=candidate:")) { + candStr = candStr.replace(/^a=/, ""); + } + // Ensure the string starts with the expected keyword + if (!candStr.startsWith("candidate:")) { + candStr = `candidate:${candStr}`; + } + } + + const candidateInit: RTCIceCandidateInit = { + candidate: candStr ?? "", + sdpMid: candidate.sdpMid ?? undefined, + sdpMLineIndex: + typeof candidate.sdpMLineIndex === "number" + ? candidate.sdpMLineIndex + : undefined, + }; + + try { + await pc.addIceCandidate(candidateInit); + console.log(`media-agent - sessionDescription:${peer_name} - Queued ICE candidate added`); + } catch (err) { + console.error( + `media-agent - sessionDescription:${peer_name} - Failed to add queued ICE candidate:`, + { candidateInit, err } + ); + } + } } catch (err) { console.error(`media-agent - sessionDescription:${peer_name} - Failed to add queued ICE candidate:`, err); } @@ -899,10 +935,42 @@ const MediaAgent = (props: MediaAgentProps) => { } // Add the ICE candidate - peer.connection - .addIceCandidate(new RTCIceCandidate(candidate)) - .then(() => console.log(`media-agent - iceCandidate::${peer_name} - ICE candidate added for ${peer.peer_name}`)) - .catch((err) => console.error(`media-agent - iceCandidate::${peer_name} - Failed to add ICE candidate:`, err)); + if (!candidate.candidate) { + // End-of-candidates signal + peer.connection + .addIceCandidate(undefined) + .then(() => + console.log(`media-agent - iceCandidate::${peer_name} - End-of-candidates added for ${peer.peer_name}`) + ) + .catch((err) => + console.error(`media-agent - iceCandidate::${peer_name} - Failed to add end-of-candidates:`, err) + ); + } else { + // Sanitize and coerce incoming candidate + let candStr: string | null = candidate.candidate ?? 
null; + if (typeof candStr === "string") { + candStr = candStr.trim(); + if (candStr.startsWith("a=candidate:")) { + candStr = candStr.replace(/^a=/, ""); + } + if (!candStr.startsWith("candidate:")) { + candStr = `candidate:${candStr}`; + } + } + + const candidateInit: RTCIceCandidateInit = { + candidate: candStr ?? "", + sdpMid: candidate.sdpMid ?? undefined, + sdpMLineIndex: typeof candidate.sdpMLineIndex === "number" ? candidate.sdpMLineIndex : undefined, + }; + + peer.connection + .addIceCandidate(candidateInit) + .then(() => + console.log(`media-agent - iceCandidate::${peer_name} - ICE candidate added for ${peer.peer_name}`) + ) + .catch((err) => console.error(`media-agent - iceCandidate::${peer_name} - Failed to add ICE candidate:`, { candidateInit, err })); + } }, [peers] ); diff --git a/voicebot/bots/whisper.py b/voicebot/bots/whisper.py index db20e17..9246882 100644 --- a/voicebot/bots/whisper.py +++ b/voicebot/bots/whisper.py @@ -25,7 +25,10 @@ import sys import os from voicebot.models import Peer -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + +sys.path.append( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) from shared.models import ChatMessageModel from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq @@ -33,20 +36,25 @@ from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq # Type definitions AudioArray = npt.NDArray[np.float32] + class AudioQueueItem(BaseModel): """Audio data with timestamp for processing queue.""" + audio: AudioArray timestamp: float - + class Config: arbitrary_types_allowed = True + class TranscriptionHistoryItem(BaseModel): """Transcription history item with metadata.""" + message: str timestamp: float is_final: bool + AGENT_NAME = "whisper" AGENT_DESCRIPTION = "Real-time speech transcription (Whisper) - converts speech to text" sample_rate = 16000 # Whisper expects 16kHz @@ -55,7 +63,7 @@ model_ids = { "Distil-Whisper": [ "distil-whisper/distil-large-v2", "distil-whisper/distil-medium.en", - "distil-whisper/distil-small.en" + "distil-whisper/distil-small.en", ], "Whisper": [ "openai/whisper-large-v3", @@ -69,15 +77,24 @@ model_ids = { "openai/whisper-small.en", "openai/whisper-base.en", "openai/whisper-tiny.en", - ] + ], } # Global whisper model and transcription handler _model_type = model_ids["Distil-Whisper"] _model_id = _model_type[0] + +logger.info(f"Loading Whisper model: {_model_id}") _processor: Any = AutoProcessor.from_pretrained(pretrained_model_name_or_path=_model_id) # type: ignore -_pt_model: Any = AutoModelForSpeechSeq2Seq.from_pretrained(pretrained_model_name_or_path=_model_id) # type: ignore +logger.info("Whisper processor loaded successfully") + +_pt_model: Any = AutoModelForSpeechSeq2Seq.from_pretrained( + pretrained_model_name_or_path=_model_id +) # type: ignore _pt_model.eval() # type: ignore -_audio_processor: Optional['AudioProcessor'] = None +logger.info("Whisper model loaded and set to evaluation mode") + +_audio_processors: Dict[str, "AudioProcessor"] = {} # Per-peer audio processors +_send_chat_func: Optional[Callable[[str], Awaitable[None]]] = None def extract_input_features(audio_array: Any, sampling_rate: int) -> Any: @@ -90,214 +107,456 @@ def extract_input_features(audio_array: Any, sampling_rate: int) -> Any: input_features: Any = processor_output.input_features # type: ignore return input_features # type: ignore + class AudioProcessor: - """Handles audio stream processing and transcription with sentence chunking.""" 
- - def __init__(self, send_chat_func: Callable[[str], Awaitable[None]]): + """Handles audio stream processing and transcription with sentence chunking for a specific peer.""" + + def __init__( + self, peer_name: str, send_chat_func: Callable[[str], Awaitable[None]] + ): + self.peer_name = peer_name self.send_chat_func = send_chat_func self.sample_rate = 16000 # Whisper expects 16kHz self.samples_per_frame = 480 # Common WebRTC frame size at 16kHz (30ms) - + # Audio buffering - self.audio_buffer: Deque[AudioArray] = deque(maxlen=1000) # ~30 seconds at 30ms frames - self.phrase_timeout = 3.0 # seconds of silence before considering phrase complete + self.audio_buffer: Deque[AudioArray] = deque( + maxlen=1000 + ) # ~30 seconds at 30ms frames + self.phrase_timeout = ( + 3.0 # seconds of silence before considering phrase complete + ) self.last_activity_time = time.time() - + # Transcription state self.current_phrase_audio: AudioArray = np.array([], dtype=np.float32) self.transcription_history: list[TranscriptionHistoryItem] = [] - + # Background processing self.processing_queue: Queue[AudioQueueItem] = Queue() self.is_running = True - self.processor_thread = threading.Thread(target=self._processing_loop, daemon=True) + self.processor_thread = threading.Thread( + target=self._processing_loop, daemon=True + ) self.processor_thread.start() - - logger.info("AudioProcessor initialized for real-time transcription") - + + logger.info( + f"AudioProcessor initialized for {self.peer_name} - sample_rate: {self.sample_rate}Hz, frame_size: {self.samples_per_frame}, phrase_timeout: {self.phrase_timeout}s" + ) + def add_audio_data(self, audio_data: AudioArray): """Add new audio data to the processing buffer.""" if not self.is_running: + logger.debug("AudioProcessor not running, ignoring audio data") return - + # Resample if needed (WebRTC might provide different sample rates) if len(audio_data) > 0: self.audio_buffer.append(audio_data) self.last_activity_time = time.time() - + + # Calculate audio metrics to detect silence + audio_rms = np.sqrt(np.mean(audio_data**2)) + audio_peak = np.max(np.abs(audio_data)) + + # Log audio buffer status (reduced verbosity) + buffer_duration_ms = len(self.audio_buffer) * 30 # assuming 30ms frames + + # Only log if we have meaningful audio or every 50 frames + if audio_rms > 0.001 or len(self.audio_buffer) % 50 == 0: + logger.info( + f"Added audio chunk: {len(audio_data)} samples, buffer size: {len(self.audio_buffer)} frames ({buffer_duration_ms}ms), RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f}" + ) + else: + logger.debug( + f"Added silent audio chunk: {len(audio_data)} samples, buffer size: {len(self.audio_buffer)} frames" + ) + # Check if we should process accumulated audio if len(self.audio_buffer) >= 10: # Process every ~300ms (10 * 30ms frames) - self._queue_for_processing() - + # Check if we have any meaningful audio in the buffer + combined_audio = np.concatenate(list(self.audio_buffer)) + combined_rms = np.sqrt(np.mean(combined_audio**2)) + + if combined_rms > 0.001: # Only process if not silence + logger.info( + f"Buffer threshold reached with meaningful audio (RMS: {combined_rms:.4f}), queuing for processing" + ) + self._queue_for_processing() + else: + logger.debug( + f"Buffer threshold reached but audio is silent (RMS: {combined_rms:.4f}), clearing buffer" + ) + self.audio_buffer.clear() # Clear silent audio + def _queue_for_processing(self): """Queue current audio buffer for transcription processing.""" if not self.audio_buffer: + logger.debug("No audio in buffer 
to queue for processing") return - + # Combine recent audio frames combined_audio = np.concatenate(list(self.audio_buffer)) self.audio_buffer.clear() - + + # Calculate audio metrics + audio_duration_sec = len(combined_audio) / self.sample_rate + audio_rms = np.sqrt(np.mean(combined_audio**2)) + audio_peak = np.max(np.abs(combined_audio)) + + # Skip completely silent audio + if audio_rms < 0.001 and audio_peak < 0.001: + logger.debug( + f"Skipping silent audio chunk: RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f}" + ) + return + + logger.info( + f"Queuing audio chunk: {len(combined_audio)} samples, {audio_duration_sec:.2f}s duration, RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f}" + ) + # Add to processing queue try: - queue_item = AudioQueueItem( - audio=combined_audio, - timestamp=time.time() - ) + queue_item = AudioQueueItem(audio=combined_audio, timestamp=time.time()) self.processing_queue.put_nowait(queue_item) - except Exception: + logger.info( + f"Added to processing queue, queue size: {self.processing_queue.qsize()}" + ) + except Exception as e: # Queue full, skip this chunk - logger.debug("Audio processing queue full, dropping audio chunk") - + logger.warning(f"Audio processing queue full, dropping audio chunk: {e}") + def _processing_loop(self): """Background thread that processes audio chunks for transcription.""" global _whisper_model - + + logger.info("ASR processing loop started") + while self.is_running: try: # Get audio chunk to process (blocking with timeout) try: audio_data = self.processing_queue.get(timeout=1.0) + logger.debug( + f"Retrieved audio chunk from queue, remaining queue size: {self.processing_queue.qsize()}" + ) except Empty: + logger.debug("Processing queue timeout, checking for more audio...") continue - + audio_array = audio_data.audio chunk_timestamp = audio_data.timestamp - + # Check if this is a new phrase (gap in audio) time_since_last = chunk_timestamp - self.last_activity_time phrase_complete = time_since_last > self.phrase_timeout - + + logger.debug( + f"Processing audio chunk: {len(audio_array)} samples, time since last: {time_since_last:.2f}s, phrase_complete: {phrase_complete}" + ) + if phrase_complete and len(self.current_phrase_audio) > 0: # Process the completed phrase + phrase_duration = len(self.current_phrase_audio) / self.sample_rate + phrase_rms = np.sqrt(np.mean(self.current_phrase_audio**2)) + + logger.info( + f"Processing completed phrase: {phrase_duration:.2f}s duration, {len(self.current_phrase_audio)} samples, RMS: {phrase_rms:.4f}" + ) + try: loop = asyncio.get_event_loop() asyncio.run_coroutine_threadsafe( - self._transcribe_and_send(self.current_phrase_audio.copy(), is_final=True), - loop + self._transcribe_and_send( + self.current_phrase_audio.copy(), is_final=True + ), + loop, ) - except RuntimeError: + except RuntimeError as e: # No event loop running, skip this transcription + logger.warning( + f"No event loop available for final transcription: {e}" + ) pass self.current_phrase_audio = np.array([], dtype=np.float32) - + # Add new audio to current phrase - self.current_phrase_audio = np.concatenate([self.current_phrase_audio, audio_array]) - - # Also do streaming transcription for immediate feedback - if len(self.current_phrase_audio) > self.sample_rate * 2: # At least 2 seconds + old_phrase_length = len(self.current_phrase_audio) + self.current_phrase_audio = np.concatenate( + [self.current_phrase_audio, audio_array] + ) + current_phrase_duration = ( + len(self.current_phrase_audio) / self.sample_rate + ) + + logger.debug( + 
f"Updated current phrase: {old_phrase_length} -> {len(self.current_phrase_audio)} samples ({current_phrase_duration:.2f}s)" + ) + + # Lower the threshold for streaming transcription to catch shorter phrases + min_transcription_duration = 1.0 # Reduced from 2.0 seconds + + if ( + len(self.current_phrase_audio) + > self.sample_rate * min_transcription_duration + ): # At least 1 second + phrase_rms = np.sqrt(np.mean(self.current_phrase_audio**2)) + logger.info( + f"Current phrase >= {min_transcription_duration}s (RMS: {phrase_rms:.4f}), attempting streaming transcription" + ) try: loop = asyncio.get_event_loop() asyncio.run_coroutine_threadsafe( - self._transcribe_and_send(self.current_phrase_audio.copy(), is_final=False), - loop + self._transcribe_and_send( + self.current_phrase_audio.copy(), is_final=False + ), + loop, ) - except RuntimeError: + except RuntimeError as e: # No event loop running, skip this transcription + logger.warning( + f"No event loop available for streaming transcription: {e}" + ) pass - + except Exception as e: logger.error(f"Error in audio processing loop: {e}", exc_info=True) - + + logger.info("ASR processing loop ended") + async def _transcribe_and_send(self, audio_array: AudioArray, is_final: bool): """Transcribe audio and send result as chat message.""" global sample_rate - + + transcription_start_time = time.time() + transcription_type = "final" if is_final else "streaming" + try: - if len(audio_array) < self.sample_rate * 0.5: # Skip very short audio + audio_duration_sec = len(audio_array) / self.sample_rate + + # Reduce minimum audio duration threshold + min_duration = 0.3 # Reduced from 0.5 seconds + if len(audio_array) < self.sample_rate * min_duration: + logger.debug( + f"Skipping {transcription_type} transcription: audio too short ({audio_duration_sec:.2f}s < {min_duration}s)" + ) return - + + # Calculate audio quality metrics + audio_rms = np.sqrt(np.mean(audio_array**2)) + audio_peak = np.max(np.abs(audio_array)) + + # More lenient silence detection + if audio_rms < 0.0005: # Very quiet threshold + logger.debug( + f"Skipping {transcription_type} transcription: audio too quiet (RMS: {audio_rms:.6f})" + ) + return + + logger.info( + f"Starting {transcription_type} transcription: {audio_duration_sec:.2f}s audio, RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f}" + ) + # Ensure audio is in the right format for Whisper audio_array = audio_array.astype(np.float32) - - # Transcribe with Whisper - input_features = extract_input_features(audio_array, sample_rate) - predicted_ids = _pt_model.generate(input_features) # type: ignore - transcription = _processor.batch_decode(predicted_ids, skip_special_tokens=True) # type: ignore - - text = transcription.strip() - - if text and len(text) > 1: # Only send meaningful transcriptions - prefix = "šŸŽ¤ " if is_final else "šŸŽ¤ [partial] " + # Transcribe with Whisper + feature_extraction_start = time.time() + input_features = extract_input_features(audio_array, sample_rate) + feature_extraction_time = time.time() - feature_extraction_start + + model_inference_start = time.time() + predicted_ids = _pt_model.generate(input_features) # type: ignore + model_inference_time = time.time() - model_inference_start + + decoding_start = time.time() + transcription = _processor.batch_decode( + predicted_ids, skip_special_tokens=True + ) # type: ignore + decoding_time = time.time() - decoding_start + + total_transcription_time = time.time() - transcription_start_time + + logger.debug( + f"ASR timing - Feature extraction: 
{feature_extraction_time:.3f}s, Model inference: {model_inference_time:.3f}s, Decoding: {decoding_time:.3f}s, Total: {total_transcription_time:.3f}s" + ) + + text = ( + transcription[0].strip() + if transcription and len(transcription) > 0 + else "" + ) + + if text and len(text) > 0: # Accept any non-empty text + prefix = ( + f"šŸŽ¤ {self.peer_name}: " + if is_final + else f"šŸŽ¤ {self.peer_name} [partial]: " + ) message = f"{prefix}{text}" - + # Avoid sending duplicate messages - if is_final or message not in [h.message for h in self.transcription_history[-3:]]: + if is_final or message not in [ + h.message for h in self.transcription_history[-3:] + ]: await self.send_chat_func(message) - + # Keep history for deduplication history_item = TranscriptionHistoryItem( - message=message, - timestamp=time.time(), - is_final=is_final + message=message, timestamp=time.time(), is_final=is_final ) self.transcription_history.append(history_item) - + # Limit history size if len(self.transcription_history) > 10: self.transcription_history.pop(0) - - logger.info(f"Transcribed ({'final' if is_final else 'partial'}): {text}") - + + logger.info( + f"āœ… Transcribed ({transcription_type}) for {self.peer_name}: '{text}' (processing time: {total_transcription_time:.3f}s, audio duration: {audio_duration_sec:.2f}s)" + ) + else: + logger.debug( + f"Skipping duplicate {transcription_type} transcription: '{text}'" + ) + else: + logger.info( + f"āŒ No text from {transcription_type} transcription for {self.peer_name} (empty result from model)" + ) + except Exception as e: - logger.error(f"Error in transcription: {e}", exc_info=True) - + logger.error( + f"Error in {transcription_type} transcription: {e}", exc_info=True + ) + def shutdown(self): """Shutdown the audio processor.""" + logger.info(f"Shutting down AudioProcessor for {self.peer_name}...") self.is_running = False if self.processor_thread.is_alive(): + logger.debug( + f"Waiting for processor thread for {self.peer_name} to finish..." 
+ ) self.processor_thread.join(timeout=2.0) + if self.processor_thread.is_alive(): + logger.warning( + f"Processor thread for {self.peer_name} did not shut down cleanly within timeout" + ) + else: + logger.info( + f"Processor thread for {self.peer_name} shut down successfully" + ) + logger.info(f"AudioProcessor for {self.peer_name} shutdown complete") async def handle_track_received(peer: Peer, track: MediaStreamTrack): """Handle incoming audio tracks from WebRTC peers.""" - global _audio_processor - + global _audio_processors, _send_chat_func + if track.kind != "audio": - logger.info(f"Ignoring non-audio track: {track.kind}") + logger.info(f"Ignoring non-audio track from {peer.peer_name}: {track.kind}") return - - logger.info(f"Received audio track from {peer.peer_name}, starting transcription") - + + # Create or get audio processor for this peer + if peer.peer_name not in _audio_processors: + if _send_chat_func is None: + logger.error( + f"Cannot create AudioProcessor for {peer.peer_name}: no send_chat_func available" + ) + return + + logger.info(f"Creating new AudioProcessor for {peer.peer_name}") + _audio_processors[peer.peer_name] = AudioProcessor( + peer_name=peer.peer_name, send_chat_func=_send_chat_func + ) + + audio_processor = _audio_processors[peer.peer_name] + + logger.info( + f"Received audio track from {peer.peer_name}, starting transcription (processor available: {audio_processor is not None})" + ) + try: while True: # Receive audio frame frame = await track.recv() if isinstance(frame, AudioFrame): - logger.info(f"Received audio frame: {frame.sample_rate}Hz, {frame.format.name}, {frame.layout.name}") + frame_info = ( + f"{frame.sample_rate}Hz, {frame.format.name}, {frame.layout.name}" + ) + logger.debug( + f"Received audio frame from {peer.peer_name}: {frame_info}" + ) + # Convert AudioFrame to numpy array audio_data = frame.to_ndarray() - + original_shape = audio_data.shape + original_dtype = audio_data.dtype + + logger.debug( + f"Audio frame data: shape={original_shape}, dtype={original_dtype}" + ) + # Handle different audio formats if audio_data.ndim == 2: # Stereo -> mono audio_data = np.mean(audio_data, axis=1) - + logger.debug( + f"Converted stereo to mono: {original_shape} -> {audio_data.shape}" + ) + # Convert to float32 and normalize if audio_data.dtype == np.int16: audio_data = audio_data.astype(np.float32) / 32768.0 + logger.debug("Normalized int16 audio to float32") elif audio_data.dtype == np.int32: audio_data = audio_data.astype(np.float32) / 2147483648.0 - + logger.debug("Normalized int32 audio to float32") + # Resample to 16kHz if needed if frame.sample_rate != sample_rate: - audio_data = librosa.resample( # type: ignore - audio_data, - orig_sr=frame.sample_rate, - target_sr=sample_rate + original_length = len(audio_data) + audio_data = librosa.resample( # type: ignore + audio_data, orig_sr=frame.sample_rate, target_sr=sample_rate ) - - # Ensure audio_data is AudioArray (float32) + logger.debug( + f"Resampled audio: {frame.sample_rate}Hz -> {sample_rate}Hz, {original_length} -> {len(audio_data)} samples" + ) + + # Ensure audio_data is AudioArray (float32) audio_data_float32 = cast(AudioArray, audio_data.astype(np.float32)) - + + # Calculate audio quality metrics for this frame + frame_rms = np.sqrt(np.mean(audio_data_float32**2)) + frame_peak = np.max(np.abs(audio_data_float32)) + + # Only log full frame details every 20 frames to reduce noise + frame_count = getattr(peer, "_whisper_frame_count", 0) + 1 + setattr(peer, "_whisper_frame_count", frame_count) + + 
if frame_count % 20 == 0: + logger.info( + f"Audio frame #{frame_count} from {peer.peer_name}: {frame_info}, {len(audio_data_float32)} samples, RMS: {frame_rms:.4f}, Peak: {frame_peak:.4f}" + ) + else: + logger.debug( + f"Audio frame #{frame_count}: RMS: {frame_rms:.4f}, Peak: {frame_peak:.4f}" + ) + # Send to audio processor - if _audio_processor: - _audio_processor.add_audio_data(audio_data_float32) + if audio_processor: + audio_processor.add_audio_data(audio_data_float32) + else: + logger.warning( + f"No audio processor available to handle audio data for {peer.peer_name}" + ) else: - logger.warning(f"Received non-audio frame on audio track from {peer.peer_name}") - + logger.warning( + f"Received non-audio frame on audio track from {peer.peer_name}: type={type(frame)}" + ) + except Exception as e: - logger.error(f"Error processing audio track from {peer.peer_name}: {e}", exc_info=True) + logger.error( + f"Error processing audio track from {peer.peer_name}: {e}", exc_info=True + ) def agent_info() -> Dict[str, str]: @@ -309,7 +568,9 @@ def create_agent_tracks(session_name: str) -> dict[str, MediaStreamTrack]: return {} -async def handle_chat_message(chat_message: ChatMessageModel, send_message_func: Callable[[str], Awaitable[None]]) -> Optional[str]: +async def handle_chat_message( + chat_message: ChatMessageModel, send_message_func: Callable[[str], Awaitable[None]] +) -> Optional[str]: """Handle incoming chat messages and optionally return a response.""" pass @@ -318,16 +579,41 @@ async def on_track_received(peer: Peer, track: MediaStreamTrack): """Callback when a new track is received from a peer.""" await handle_track_received(peer, track) + # Export functions for the orchestrator to discover def get_track_handler(): """Return the track handler function for the orchestrator to use.""" return on_track_received + def bind_send_chat_function(send_chat_func: Callable[[str], Awaitable[None]]): - """Bind the send chat function to the audio processor.""" - global _send_chat_func, _audio_processor + """Bind the send chat function to be used for all audio processors.""" + global _send_chat_func, _audio_processors + logger.info("Binding send chat function to whisper agent") _send_chat_func = send_chat_func - if _audio_processor: - _audio_processor.send_chat_func = send_chat_func + + # Update existing audio processors + for peer_name, processor in _audio_processors.items(): + logger.debug( + f"Updating AudioProcessor for {peer_name} with new send chat function" + ) + processor.send_chat_func = send_chat_func + + +def cleanup_peer_processor(peer_name: str): + """Clean up audio processor for a disconnected peer.""" + global _audio_processors + + if peer_name in _audio_processors: + logger.info(f"Cleaning up AudioProcessor for disconnected peer: {peer_name}") + processor = _audio_processors[peer_name] + processor.shutdown() + del _audio_processors[peer_name] + logger.info(f"AudioProcessor for {peer_name} cleaned up successfully") else: - _audio_processor = AudioProcessor(send_chat_func=send_chat_func) + logger.debug(f"No AudioProcessor found for peer {peer_name} during cleanup") + + +def get_active_processors() -> Dict[str, "AudioProcessor"]: + """Get currently active audio processors (for debugging).""" + return _audio_processors.copy() diff --git a/voicebot/force_transcription.py b/voicebot/force_transcription.py new file mode 100644 index 0000000..24f222b --- /dev/null +++ b/voicebot/force_transcription.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +Force transcription debug - processes any 
accumulated audio immediately. +Run this to force the whisper agent to attempt transcription of current audio buffer. +""" + +import sys +import os +import asyncio +import numpy as np + +# Add the voicebot directory to the path +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + + +def force_transcription(): + """Force transcription of any accumulated audio.""" + try: + from bots.whisper import _audio_processors + + if not _audio_processors: + print( + "āŒ No audio processors found. Whisper agent may not be running or no peers connected." + ) + return + + print(f"šŸ” Found {len(_audio_processors)} active audio processors:") + + for peer_name, audio_processor in _audio_processors.items(): + print(f"\nšŸ‘¤ {peer_name}:") + print(f" - Running: {audio_processor.is_running}") + print(f" - Buffer size: {len(audio_processor.audio_buffer)} frames") + print(f" - Queue size: {audio_processor.processing_queue.qsize()}") + print( + f" - Current phrase length: {len(audio_processor.current_phrase_audio)} samples" + ) + + # Force processing of current buffer + if len(audio_processor.audio_buffer) > 0: + print( + f"šŸ”„ Forcing processing of {len(audio_processor.audio_buffer)} buffered frames for {peer_name}..." + ) + audio_processor._queue_for_processing() + else: + print(f"šŸ“­ No audio in buffer to process for {peer_name}") + + # If we have a current phrase, try to transcribe it + if len(audio_processor.current_phrase_audio) > 0: + phrase_duration = ( + len(audio_processor.current_phrase_audio) + / audio_processor.sample_rate + ) + phrase_rms = np.sqrt(np.mean(audio_processor.current_phrase_audio**2)) + print( + f"šŸŽ¤ Current phrase for {peer_name}: {phrase_duration:.2f}s, RMS: {phrase_rms:.6f}" + ) + + if phrase_duration > 0.3: # Minimum duration + print( + f"šŸš€ Forcing transcription of current phrase for {peer_name}..." + ) + + # Create an event loop if none exists + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Force transcription + async def force_transcribe(): + await audio_processor._transcribe_and_send( + audio_processor.current_phrase_audio.copy(), is_final=True + ) + + loop.run_until_complete(force_transcribe()) + print(f"āœ… Forced transcription completed for {peer_name}") + else: + print( + f"ā±ļø Current phrase too short for {peer_name} ({phrase_duration:.2f}s < 0.3s)" + ) + else: + print(f"🤐 No current phrase to transcribe for {peer_name}") + + except ImportError: + print( + "āŒ Could not import whisper components. Make sure the whisper agent is loaded." 
+ ) + except Exception as e: + print(f"āŒ Error: {e}") + + +def show_audio_stats(): + """Show detailed audio statistics.""" + try: + from bots.whisper import _audio_processors + + if not _audio_processors: + print("āŒ No audio processors found") + return + + print( + f"\nšŸ“Š Detailed Audio Statistics for {len(_audio_processors)} processors:" + ) + + for peer_name, audio_processor in _audio_processors.items(): + print(f"\nšŸ‘¤ {peer_name}:") + print(f"Sample rate: {audio_processor.sample_rate}Hz") + print(f"Samples per frame: {audio_processor.samples_per_frame}") + print(f"Phrase timeout: {audio_processor.phrase_timeout}s") + print(f"Buffer max length: {audio_processor.audio_buffer.maxlen}") + print(f"Current buffer size: {len(audio_processor.audio_buffer)}") + print(f"Processing queue size: {audio_processor.processing_queue.qsize()}") + + if len(audio_processor.current_phrase_audio) > 0: + phrase_duration = ( + len(audio_processor.current_phrase_audio) + / audio_processor.sample_rate + ) + phrase_rms = np.sqrt(np.mean(audio_processor.current_phrase_audio**2)) + phrase_peak = np.max(np.abs(audio_processor.current_phrase_audio)) + print(" Current phrase:") + print(f" Duration: {phrase_duration:.2f}s") + print(f" Samples: {len(audio_processor.current_phrase_audio)}") + print(f" RMS: {phrase_rms:.6f}") + print(f" Peak: {phrase_peak:.6f}") + + if len(audio_processor.audio_buffer) > 0: + combined = np.concatenate(list(audio_processor.audio_buffer)) + buffer_duration = len(combined) / audio_processor.sample_rate + buffer_rms = np.sqrt(np.mean(combined**2)) + buffer_peak = np.max(np.abs(combined)) + print(" Buffer contents:") + print(f" Duration: {buffer_duration:.2f}s") + print(f" Samples: {len(combined)}") + print(f" RMS: {buffer_rms:.6f}") + print(f" Peak: {buffer_peak:.6f}") + + except Exception as e: + print(f"āŒ Error getting stats: {e}") + + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] == "stats": + show_audio_stats() + else: + force_transcription() + show_audio_stats() diff --git a/voicebot/set_whisper_debug.py b/voicebot/set_whisper_debug.py new file mode 100644 index 0000000..ab48d05 --- /dev/null +++ b/voicebot/set_whisper_debug.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +""" +Helper script to adjust whisper ASR logging levels for debugging. +Run this to see more detailed ASR logging. 
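+
+Usage (as wired up in the __main__ block below):
+    python set_whisper_debug.py        # switch the ASR logger to DEBUG
+    python set_whisper_debug.py info   # switch back to INFO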
+""" + +import logging +import sys +import os + +# Add the voicebot directory to the path +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +from logger import logger + + +def set_debug_logging(): + """Set logger to DEBUG level for detailed ASR logging.""" + logger.setLevel(logging.DEBUG) + + # Also set the root logger + logging.getLogger().setLevel(logging.DEBUG) + + # Create a more detailed formatter if needed + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s" + ) + + # Update all handlers + for handler in logger.handlers: + handler.setLevel(logging.DEBUG) + handler.setFormatter(formatter) + + logger.info("Debug logging enabled for Whisper ASR") + + +def set_info_logging(): + """Set logger back to INFO level.""" + logger.setLevel(logging.INFO) + logging.getLogger().setLevel(logging.INFO) + + # Update all handlers + for handler in logger.handlers: + handler.setLevel(logging.INFO) + + logger.info("Info logging enabled for Whisper ASR") + + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] == "info": + set_info_logging() + else: + set_debug_logging() diff --git a/voicebot/test_whisper_pipeline.py b/voicebot/test_whisper_pipeline.py new file mode 100644 index 0000000..61e80fc --- /dev/null +++ b/voicebot/test_whisper_pipeline.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +""" +Debug script to test Whisper transcription with synthetic audio. +This helps identify if the issue is with audio processing or the transcription pipeline. +""" + +import numpy as np +import time +import sys +import os + +# Add the voicebot directory to the path +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +try: + from bots.whisper import extract_input_features, _pt_model, _processor, sample_rate +except ImportError as e: + print(f"Error importing whisper components: {e}") + print("Make sure you're running this from the voicebot directory") + sys.exit(1) + + +def generate_test_audio( + duration_seconds: float = 2.0, frequency: float = 440.0 +) -> np.ndarray: + """Generate a synthetic sine wave for testing.""" + samples = int(duration_seconds * sample_rate) + t = np.linspace(0, duration_seconds, samples, False) + # Generate a sine wave with some amplitude modulation to simulate speech-like patterns + amplitude = 0.1 * ( + 1 + 0.5 * np.sin(2 * np.pi * 2 * t) + ) # Amplitude modulation at 2Hz + audio = amplitude * np.sin(2 * np.pi * frequency * t) + return audio.astype(np.float32) + + +def test_transcription_pipeline(): + """Test the Whisper transcription pipeline with synthetic audio.""" + print("Testing Whisper transcription pipeline...") + + # Test 1: Complete silence + print("\n=== Test 1: Complete Silence ===") + silent_audio = np.zeros(int(sample_rate * 2.0), dtype=np.float32) + test_audio_transcription(silent_audio, "Silent audio") + + # Test 2: Very quiet noise + print("\n=== Test 2: Very Quiet Noise ===") + quiet_noise = np.random.normal(0, 0.001, int(sample_rate * 2.0)).astype(np.float32) + test_audio_transcription(quiet_noise, "Quiet noise") + + # Test 3: Sine wave (should produce some output) + print("\n=== Test 3: Sine Wave ===") + sine_audio = generate_test_audio(2.0, 440.0) + test_audio_transcription(sine_audio, "Sine wave") + + # Test 4: Multiple frequency sine wave + print("\n=== Test 4: Complex Sine Wave ===") + complex_audio = ( + generate_test_audio(2.0, 220.0) + + generate_test_audio(2.0, 440.0) + + generate_test_audio(2.0, 880.0) + ) / 3.0 + test_audio_transcription(complex_audio, "Complex 
sine wave") + + +def test_audio_transcription(audio_array: np.ndarray, description: str): + """Test transcription of a specific audio array.""" + try: + # Calculate metrics + duration = len(audio_array) / sample_rate + rms = np.sqrt(np.mean(audio_array**2)) + peak = np.max(np.abs(audio_array)) + + print(f"Testing {description}:") + print(f" Duration: {duration:.2f}s") + print(f" Samples: {len(audio_array)}") + print(f" RMS: {rms:.6f}") + print(f" Peak: {peak:.6f}") + + # Test feature extraction + start_time = time.time() + input_features = extract_input_features(audio_array, sample_rate) + feature_time = time.time() - start_time + print(f" Feature extraction: {feature_time:.3f}s") + + # Test model inference + start_time = time.time() + predicted_ids = _pt_model.generate(input_features) + inference_time = time.time() - start_time + print(f" Model inference: {inference_time:.3f}s") + + # Test decoding + start_time = time.time() + transcription = _processor.batch_decode(predicted_ids, skip_special_tokens=True) + decoding_time = time.time() - start_time + print(f" Decoding: {decoding_time:.3f}s") + + # Show result + text = ( + transcription[0].strip() if transcription and len(transcription) > 0 else "" + ) + print(f" Result: '{text}'" if text else " Result: (empty)") + print(f" Result length: {len(text)}") + + except Exception as e: + print(f" ERROR: {e}") + + +if __name__ == "__main__": + test_transcription_pipeline() diff --git a/voicebot/webrtc_signaling.py b/voicebot/webrtc_signaling.py index a4f6f5c..80e747f 100644 --- a/voicebot/webrtc_signaling.py +++ b/voicebot/webrtc_signaling.py @@ -778,6 +778,14 @@ class WebRTCSignalingClient: f"ICE candidate outgoing for {peer_name}: type={cand_type} protocol={protocol} sdp={raw}" ) + # Ensure candidate has the proper SDP format + if raw and not raw.startswith("candidate:"): + raw = f"candidate:{raw}" + + # Clean up any extra spaces + if raw: + raw = raw.replace("candidate: ", "candidate:") + candidate_model = ICECandidateDictModel( candidate=raw, sdpMid=getattr(candidate, "sdpMid", None), @@ -965,6 +973,14 @@ class WebRTCSignalingClient: elif line.startswith("a=candidate:"): candidate_sdp = line[2:] # Remove 'a=' prefix + # Ensure candidate has the proper SDP format + if candidate_sdp and not candidate_sdp.startswith("candidate:"): + candidate_sdp = f"candidate:{candidate_sdp}" + + # Clean up any extra spaces + if candidate_sdp: + candidate_sdp = candidate_sdp.replace("candidate: ", "candidate:") + # Only send if we have valid MID and media index if current_section_mid is not None and current_media_index >= 0: candidate_model = ICECandidateDictModel(