diff --git a/voicebot/bots/whisper.py b/voicebot/bots/whisper.py
index 9d9a016..aaf9c5c 100644
--- a/voicebot/bots/whisper.py
+++ b/voicebot/bots/whisper.py
@@ -112,6 +112,8 @@ def extract_input_features(audio_array: Any, sampling_rate: int) -> Any:
 class AudioProcessor:
     """Handles audio stream processing and transcription with sentence chunking for a specific peer."""
 
+    main_loop: Optional[asyncio.AbstractEventLoop]
+
     def __init__(
         self, peer_name: str, send_chat_func: Callable[[str], Awaitable[None]]
     ):
@@ -133,6 +135,15 @@ class AudioProcessor:
         self.current_phrase_audio: AudioArray = np.array([], dtype=np.float32)
         self.transcription_history: list[TranscriptionHistoryItem] = []
 
+        # Capture the main thread's event loop for background processing
+        try:
+            self.main_loop = asyncio.get_running_loop()
+            logger.debug(f"Captured main event loop for {self.peer_name}")
+        except RuntimeError:
+            # No event loop running, we'll need to create one
+            self.main_loop = None
+            logger.warning(f"No event loop running when initializing AudioProcessor for {self.peer_name}")
+
         # Background processing
         self.processing_queue: Queue[AudioQueueItem] = Queue()
         self.is_running = True
@@ -153,8 +164,9 @@ class AudioProcessor:
         # Resample if needed (WebRTC might provide different sample rates)
 
         if len(audio_data) > 0:
+            audio_received_time = time.time()
             self.audio_buffer.append(audio_data)
-            self.last_activity_time = time.time()
+            self.last_activity_time = audio_received_time
 
             # Calculate audio metrics to detect silence
             audio_rms = np.sqrt(np.mean(audio_data**2))
@@ -166,7 +178,7 @@ class AudioProcessor:
             # Only log if we have meaningful audio or every 50 frames
            if audio_rms > 0.001 or len(self.audio_buffer) % 50 == 0:
                 logger.info(
-                    f"Added audio chunk: {len(audio_data)} samples, buffer size: {len(self.audio_buffer)} frames ({buffer_duration_ms}ms), RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f}"
+                    f"📥 AUDIO BUFFER ADD at {audio_received_time:.3f}: {len(audio_data)} samples, buffer size: {len(self.audio_buffer)} frames ({buffer_duration_ms}ms), RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f} (peer: {self.peer_name})"
                 )
             else:
                 logger.debug(
@@ -180,8 +192,9 @@ class AudioProcessor:
             combined_rms = np.sqrt(np.mean(combined_audio**2))
 
             if combined_rms > 0.001:  # Only process if not silence
+                buffer_queue_time = time.time()
                 logger.info(
-                    f"Buffer threshold reached with meaningful audio (RMS: {combined_rms:.4f}), queuing for processing"
+                    f"🚀 BUFFER QUEUING at {buffer_queue_time:.3f}: Buffer threshold reached with meaningful audio (RMS: {combined_rms:.4f}), queuing for processing (peer: {self.peer_name})"
                 )
                 self._queue_for_processing()
             else:
@@ -213,7 +226,7 @@ class AudioProcessor:
             return
 
         logger.info(
-            f"Queuing audio chunk: {len(combined_audio)} samples, {audio_duration_sec:.2f}s duration, RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f}"
+            f"📦 AUDIO CHUNK QUEUED at {time.time():.3f}: {len(combined_audio)} samples, {audio_duration_sec:.2f}s duration, RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f} (peer: {self.peer_name})"
         )
 
         # Add to processing queue
@@ -221,7 +234,7 @@ class AudioProcessor:
         try:
             queue_item = AudioQueueItem(audio=combined_audio, timestamp=time.time())
             self.processing_queue.put_nowait(queue_item)
             logger.info(
-                f"Added to processing queue, queue size: {self.processing_queue.qsize()}"
+                f"📋 PROCESSING QUEUE ADD at {time.time():.3f}: Added to processing queue, queue size: {self.processing_queue.qsize()} (peer: {self.peer_name})"
             )
         except Exception as e:
             # Queue full, skip this chunk
@@ -238,8 +251,9 @@ class AudioProcessor:
                 # Get audio chunk to process (blocking with timeout)
                 try:
                     audio_data = self.processing_queue.get(timeout=1.0)
-                    logger.debug(
-                        f"Retrieved audio chunk from queue, remaining queue size: {self.processing_queue.qsize()}"
+                    processing_start_time = time.time()
+                    logger.info(
+                        f"🔄 PROCESSING STARTED at {processing_start_time:.3f}: Retrieved audio chunk from queue, remaining queue size: {self.processing_queue.qsize()} (peer: {self.peer_name})"
                     )
                 except Empty:
                     logger.debug("Processing queue timeout, checking for more audio...")
@@ -265,18 +279,16 @@ class AudioProcessor:
                         f"Processing completed phrase: {phrase_duration:.2f}s duration, {len(self.current_phrase_audio)} samples, RMS: {phrase_rms:.4f}"
                     )
 
-                    try:
-                        loop = asyncio.get_event_loop()
+                    if self.main_loop and not self.main_loop.is_closed():
                         asyncio.run_coroutine_threadsafe(
                             self._transcribe_and_send(
                                 self.current_phrase_audio.copy(), is_final=True
                             ),
-                            loop,
+                            self.main_loop,
                         )
-                    except RuntimeError as e:
-                        # No event loop running, skip this transcription
+                    else:
                         logger.warning(
-                            f"No event loop available for final transcription: {e}"
+                            f"No event loop available for final transcription (peer: {self.peer_name})"
                         )
                         pass
                     self.current_phrase_audio = np.array([], dtype=np.float32)
@@ -305,20 +317,17 @@ class AudioProcessor:
                     logger.info(
                         f"Current phrase >= {min_transcription_duration}s (RMS: {phrase_rms:.4f}), attempting streaming transcription"
                     )
-                    try:
-                        loop = asyncio.get_event_loop()
+                    if self.main_loop and not self.main_loop.is_closed():
                         asyncio.run_coroutine_threadsafe(
                             self._transcribe_and_send(
                                 self.current_phrase_audio.copy(), is_final=False
                             ),
-                            loop,
+                            self.main_loop,
                         )
-                    except RuntimeError as e:
-                        # No event loop running, skip this transcription
+                    else:
                         logger.warning(
-                            f"No event loop available for streaming transcription: {e}"
+                            f"No event loop available for streaming transcription (peer: {self.peer_name})"
                         )
-                        pass
 
             except Exception as e:
                 logger.error(f"Error in audio processing loop: {e}", exc_info=True)
@@ -355,7 +364,7 @@ class AudioProcessor:
             return
 
         logger.info(
-            f"Starting {transcription_type} transcription: {audio_duration_sec:.2f}s audio, RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f}"
+            f"🎬 TRANSCRIPTION STARTED ({transcription_type}) at {time.time():.3f}: {audio_duration_sec:.2f}s audio, RMS: {audio_rms:.4f}, Peak: {audio_peak:.4f} (peer: {self.peer_name})"
         )
 
         # Ensure audio is in the right format for Whisper
@@ -389,18 +398,30 @@ class AudioProcessor:
         )
 
         if text and len(text) > 0:  # Accept any non-empty text
-            prefix = (
-                f"🎤 {self.peer_name}: "
-                if is_final
-                else f"🎤 {self.peer_name} [partial]: "
-            )
-            message = f"{prefix}{text}"
+            # Calculate timing information for the message
+            chat_send_start = time.time()
+            total_transcription_time = chat_send_start - transcription_start_time
+
+            # Create message with timing information included
+            status_marker = "🎤" if is_final else "🎤"
+            type_marker = "" if is_final else " [partial]"
+            timing_info = f" (⏱️ {total_transcription_time:.2f}s from start: {transcription_start_time:.3f})"
+
+            prefix = f"{status_marker} {self.peer_name}{type_marker}: "
+            message = f"{prefix}{text}{timing_info}"
 
-            # Avoid sending duplicate messages
-            if is_final or message not in [
-                h.message for h in self.transcription_history[-3:]
+            # Avoid sending duplicate messages (check text only, not timing)
+            text_only_message = f"{prefix}{text}"
+            if is_final or text_only_message not in [
+                h.message.split(' (⏱️')[0] for h in self.transcription_history[-3:]
             ]:
                 await self.send_chat_func(message)
+                chat_send_time = time.time() - chat_send_start
+                message_sent_time = time.time()
+
+                logger.info(
+                    f"💬 CHAT MESSAGE SENT at {message_sent_time:.3f}: '{text}' (transcription started: {transcription_start_time:.3f}, chat send took: {chat_send_time:.3f}s, peer: {self.peer_name})"
+                )
 
                 # Keep history for deduplication
                 history_item = TranscriptionHistoryItem(
@@ -415,6 +436,12 @@ class AudioProcessor:
                 logger.info(
                     f"✅ Transcribed ({transcription_type}) for {self.peer_name}: '{text}' (processing time: {total_transcription_time:.3f}s, audio duration: {audio_duration_sec:.2f}s)"
                 )
+
+                # Log end-to-end pipeline timing
+                total_pipeline_time = message_sent_time - transcription_start_time
+                logger.info(
+                    f"⏱️ PIPELINE TIMING ({transcription_type}): Total={total_pipeline_time:.3f}s (Transcription={total_transcription_time:.3f}s, Chat Send={chat_send_time:.3f}s, peer: {self.peer_name}) | 🕐 Start: {transcription_start_time:.3f}, End: {message_sent_time:.3f}"
+                )
             else:
                 logger.debug(
                     f"Skipping duplicate {transcription_type} transcription: '{text}'"
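
Note for reviewers: the core change above is a thread-to-loop handoff — the event loop is captured once with asyncio.get_running_loop() on the async side, and the background worker schedules coroutines onto it with asyncio.run_coroutine_threadsafe, guarding against a missing or closed loop. A minimal, self-contained sketch of that pattern follows; class and function names here are hypothetical illustrations, not the bot's actual API.

import asyncio
import threading
from typing import Optional


class LoopHandoff:
    """Sketch: capture the main event loop, then schedule work onto it from a worker thread."""

    def __init__(self) -> None:
        # Must be constructed on the thread that is running the event loop.
        try:
            self.main_loop: Optional[asyncio.AbstractEventLoop] = asyncio.get_running_loop()
        except RuntimeError:
            self.main_loop = None  # constructed outside an event loop

    async def deliver(self, text: str) -> None:
        # Stand-in for the real coroutine (e.g. sending a chat message).
        print(f"delivered on loop thread: {text}")

    def worker(self) -> None:
        # Runs on a background thread; never touches the loop directly.
        if self.main_loop and not self.main_loop.is_closed():
            future = asyncio.run_coroutine_threadsafe(self.deliver("hello"), self.main_loop)
            future.result(timeout=5)  # optionally wait and surface exceptions
        else:
            print("no usable event loop; dropping work")


async def main() -> None:
    handoff = LoopHandoff()
    thread = threading.Thread(target=handoff.worker)
    thread.start()
    await asyncio.sleep(0.1)  # keep the loop running so the scheduled coroutine can execute
    thread.join()


if __name__ == "__main__":
    asyncio.run(main())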