From 916151307f8ef3c6f466dd81072b38920a4a6a31 Mon Sep 17 00:00:00 2001
From: James Ketrenos
Date: Mon, 15 Sep 2025 15:34:04 -0700
Subject: [PATCH] Whisper reworks to get final transcription to reliably fire

---
 voicebot/bots/whisper.py | 458 +++++++++++++++++++++++++++++++++++----
 1 file changed, 415 insertions(+), 43 deletions(-)

diff --git a/voicebot/bots/whisper.py b/voicebot/bots/whisper.py
index d2627d7..a1e2ba3 100644
--- a/voicebot/bots/whisper.py
+++ b/voicebot/bots/whisper.py
@@ -60,6 +60,47 @@ CalibrationData = List[Dict[str, Any]]
 
 _device = "GPU.1"  # Default to Intel Arc B580 GPU
 
+# Global lock to serialize calls into the OpenVINO model.generate/decode,
+# since some backends are not safe for concurrent generate calls.
+_generate_global_lock = threading.Lock()
+
+
+def _blocking_generate_decode(audio_array: AudioArray, sample_rate: int, generation_config: GenerationConfig | None = None) -> str:
+    """Blocking helper to run processor -> model.generate -> decode while
+    holding a global lock to serialize OpenVINO access.
+    """
+    try:
+        with _generate_global_lock:
+            ov_model = _ensure_model_loaded()
+            if ov_model.processor is None:
+                raise RuntimeError("Processor not initialized for OpenVINO model")
+
+            # Extract features
+            inputs = ov_model.processor(audio_array, sampling_rate=sample_rate, return_tensors="pt")
+            input_features = inputs.input_features
+
+            # Use a basic generation config if none provided
+            gen_cfg = generation_config or GenerationConfig(max_new_tokens=128)
+
+            gen_out = ov_model.ov_model.generate(input_features, generation_config=gen_cfg)  # type: ignore
+
+            # Prefer .sequences if available
+            if hasattr(gen_out, "sequences"):
+                ids = gen_out.sequences
+            else:
+                ids = gen_out
+
+            # Decode
+            try:
+                transcription = ov_model.processor.batch_decode(ids, skip_special_tokens=True)[0].strip()
+            except Exception:
+                transcription = ""
+
+            return transcription
+    except Exception as e:
+        logger.error(f"blocking_generate_decode failed: {e}", exc_info=True)
+        return ""
+
 
 def get_available_devices() -> list[dict[str, Any]]:
     """List available OpenVINO devices with their properties."""
@@ -213,6 +254,9 @@ VAD_CONFIG = {
     "speech_freq_max": 3000,  # Hz
 }
 
+# How long (seconds) without audio arriving before we consider the phrase ended
+INACTIVITY_TIMEOUT = 1.5
+
 model_ids = {
     "Distil-Whisper": [
         "distil-whisper/distil-large-v2",
@@ -1128,7 +1172,18 @@ class OptimizedAudioProcessor:
         try:
             self.main_loop = asyncio.get_running_loop()
             asyncio.create_task(self._async_processing_loop())
+            # Start inactivity watchdog to ensure finalization when frames stop arriving
+            try:
+                asyncio.create_task(self._silence_watchdog())
+            except Exception:
+                logger.debug(f"Could not start silence watchdog task for {self.peer_name}")
             logger.info(f"Started async processing for {self.peer_name}")
+            # Lock to serialize model.generate calls (the OpenVINO model may not
+            # be reentrant across concurrent generate calls).
+            try:
+                self._generate_lock = asyncio.Lock()
+            except Exception:
+                self._generate_lock = None
         except RuntimeError:
             # Fallback to thread-based processing
             self.main_loop = None
@@ -1137,9 +1192,37 @@ class OptimizedAudioProcessor:
             )
             self.processor_thread.start()
             logger.warning(f"Using thread-based processing for {self.peer_name}")
+            # For the thread fallback, create a thread lock used if the asyncio lock is unavailable
+            self._generate_lock = threading.Lock()
 
         logger.info(f"OptimizedAudioProcessor initialized for {self.peer_name}")
 
+    async def _silence_watchdog(self) -> None:
+        """Watch for inactivity (no frames arriving) and queue a final transcription.
+
+        This runs as a lightweight task and uses `last_audio_time`, which is
+        updated on any received audio frame. This makes finalization robust in
+        the case where the remote peer simply stops sending frames (no
+        non-speech frames will arrive to increment `silence_frames`).
+        """
+        logger.debug(f"Silence watchdog started for {self.peer_name}")
+        try:
+            while self.is_running:
+                await asyncio.sleep(0.5)
+                try:
+                    if (
+                        len(self.current_phrase_audio) > 0
+                        and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT
+                    ):
+                        logger.info(
+                            f"Silence watchdog: no audio for {time.time() - self.last_audio_time:.2f}s, queuing final for {self.peer_name}"
+                        )
+                        self._queue_final_transcription()
+                except Exception as e:
+                    logger.debug(f"Silence watchdog error for {self.peer_name}: {e}")
+        finally:
+            logger.debug(f"Silence watchdog exiting for {self.peer_name}")
+
     def add_audio_data(self, audio_data: AudioArray) -> None:
         """Add audio data with enhanced Voice Activity Detection, preventing leading silence."""
         if not self.is_running or len(audio_data) == 0:
@@ -1262,17 +1345,163 @@ class OptimizedAudioProcessor:
         if (
             len(self.current_phrase_audio) > self.sample_rate * 0.5
         ):  # At least 0.5 seconds
+            logger.info(f"Queueing final transcription for {self.peer_name}")
+            self.final_transcription_pending = True
+            # Use a blocking-worker path for final transcriptions that runs
+            # model.generate in a threadpool. This isolates the heavy OpenVINO
+            # call from the event loop and reduces races / reentrancy issues.
             if self.main_loop:
-                logger.info(f"Queueing final transcription for {self.peer_name}")
-                self.final_transcription_pending = True
-                asyncio.create_task(
-                    self._transcribe_and_send(
-                        self.current_phrase_audio.copy(), is_final=True
+                try:
+                    asyncio.run_coroutine_threadsafe(
+                        self._blocking_transcribe_and_send(
+                            self.current_phrase_audio.copy(), is_final=True
+                        ),
+                        self.main_loop,
+                    )
+                except Exception as e:
+                    logger.error(f"Failed to schedule blocking final transcription: {e}")
+                # Also schedule an immediate lightweight final marker so the
+                # UI/client sees a final event right away (helps when generation
+                # is delayed or fails). We schedule this on the main loop.
+                try:
+                    async def _send_final_marker():
+                        try:
+                            marker_text = f"⚡ {self.peer_name}: (finalizing...)"
+                            # Reuse existing streaming message id if present so the
+                            # UI updates the streaming message into a final marker
+                            message_id = self.current_message.id if self.current_message is not None else None
+                            cm = self.create_chat_message_func(marker_text, message_id)
+                            # Keep the current_message reference so the final send
+                            # can reuse the same id.
+                            if self.current_message is None:
+                                try:
+                                    self.current_message = cm
+                                except Exception:
+                                    pass
+                            await self.send_chat_func(cm)
+                            logger.info(f"{self.peer_name}: sent immediate final marker")
+                        except Exception as e:
+                            logger.debug(f"Failed to send final marker for {self.peer_name}: {e}")
+
+                    asyncio.run_coroutine_threadsafe(_send_final_marker(), self.main_loop)
+                except Exception:
+                    logger.debug(f"Could not schedule final marker for {self.peer_name}")
+            else:
+                # As a fallback, try to schedule the normal coroutine if possible
+                try:
+                    asyncio.create_task(
+                        self._transcribe_and_send(
+                            self.current_phrase_audio.copy(), is_final=True
+                        )
+                    )
+                except Exception:
+                    logger.debug(
+                        f"Could not schedule final transcription for {self.peer_name} (no main_loop)"
-                    )
                 )
 
         self.current_phrase_audio = np.array([], dtype=np.float32)
 
+    async def _blocking_transcribe_and_send(
+        self, audio_array: AudioArray, is_final: bool, language: str = "en"
+    ) -> None:
+        """Run the heavy generate+decode work inside a threadpool, then send the
+        chat message on the event loop. This reduces reentrancy and resource
+        contention with streaming transcriptions.
+        """
+        loop = asyncio.get_event_loop()
+
+        def blocking_work(audio_in: AudioArray) -> tuple[str, float]:
+            try:
+                # Ensure model is loaded in this thread/process
+                ov_model = _ensure_model_loaded()
+
+                # Extract features (this is relatively cheap but keep it on the thread)
+                input_features = ov_model.processor(
+                    audio_in, sampling_rate=self.sample_rate, return_tensors="pt"
+                ).input_features
+
+                # Perform generation (blocking)
+                # Serialize access to the underlying OpenVINO generation call
+                # to avoid concurrency problems with the OpenVINO runtime.
+                with _generate_global_lock:
+                    gen_out = ov_model.ov_model.generate(
+                        input_features, generation_config=GenerationConfig(max_new_tokens=128)
+                    )
+
+                # Try to extract sequences if present
+                if hasattr(gen_out, "sequences"):
+                    ids = gen_out.sequences
+                else:
+                    ids = gen_out
+
+                # Decode
+                try:
+                    text = ov_model.processor.batch_decode(ids, skip_special_tokens=True)[0].strip()
+                except Exception:
+                    text = ""
+
+                return text, 0.0
+            except Exception as e:
+                logger.error(f"Blocking transcription failed for {self.peer_name}: {e}", exc_info=True)
+                return "", 0.0
+
+        try:
+            # Run blocking work in executor
+            transcription, _ = await loop.run_in_executor(None, blocking_work, audio_array)
+
+            if transcription:
+                # Build message and send on event loop
+                status_marker = "⚡" if is_final else "🎤"
+                type_marker = "" if is_final else " [streaming]"
+                message_text = f"{status_marker} {self.peer_name}{type_marker}: {transcription} (blocking final)"
+                # Reuse existing message id for final update when possible
+                message_id = self.current_message.id if self.current_message is not None else None
+                chat_message = self.create_chat_message_func(message_text, message_id)
+                await self.send_chat_func(chat_message)
+                # After sending final, clear current_message so streaming restarts cleanly
+                try:
+                    self.current_message = None
+                except Exception:
+                    pass
+                logger.info(f"{self.peer_name}: blocking final transcription sent: '{transcription}'")
+            else:
+                # If decode failed or returned empty, fall back to the most recent
+                # streaming transcription from history (if any) and send it as
+                # the final message. This ensures clients get a final marker.
+                fallback_text = None
+                try:
+                    if self.transcription_history:
+                        # Take the last non-final streaming message if present
+                        for h in reversed(self.transcription_history):
+                            if not h.is_final:
+                                # Extract the raw transcription portion from the stored message
+                                fallback_text = h.message.split(": ", 1)[-1].split(" (🚀")[0]
+                                break
+                        # If no non-final entry was found, take the most recent entry
+                        if fallback_text is None:
+                            fallback_text = self.transcription_history[-1].message.split(": ", 1)[-1].split(" (🚀")[0]
+                except Exception:
+                    fallback_text = None
+
+                if fallback_text:
+                    message_text = f"⚡ {self.peer_name}: {fallback_text} (final - fallback)"
+                    message_id = self.current_message.id if self.current_message is not None else None
+                    chat_message = self.create_chat_message_func(message_text, message_id)
+                    await self.send_chat_func(chat_message)
+                    try:
+                        self.current_message = None
+                    except Exception:
+                        pass
+                    logger.info(f"{self.peer_name}: blocking final fallback sent: '{fallback_text}'")
+                else:
+                    logger.info(f"{self.peer_name}: blocking final transcription produced no text and no fallback available")
+        finally:
+            # Always clear the pending flag when the blocking final finishes
+            try:
+                self.final_transcription_pending = False
+            except Exception:
+                pass
+
     async def _async_processing_loop(self) -> None:
         """Async processing loop for audio chunks."""
         logger.info(f"Started async processing loop for {self.peer_name}")
@@ -1300,19 +1529,19 @@ class OptimizedAudioProcessor:
                     )
 
                 except asyncio.TimeoutError:
-                    # Check for final transcription on timeout
+                    # Check for final transcription on timeout; use last_audio_time so
+                    # we also detect the case where frames simply stopped arriving.
                     if (
                         len(self.current_phrase_audio) > 0
-                        and time.time() - self.last_activity_time > 2.0
+                        and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT
                     ):
                         logger.info(
-                            f"Final transcription timeout for {self.peer_name} (asyncio.TimeoutError)"
+                            f"Final transcription timeout for {self.peer_name} (asyncio.TimeoutError, inactivity)"
                        )
                        await self._transcribe_and_send(
                            self.current_phrase_audio.copy(), is_final=True
                        )
                        self.current_phrase_audio = np.array([], dtype=np.float32)
-                        self.final_transcription_pending = False  # Reset the flag
 
            except Exception as e:
                logger.error(
                    f"Error in async processing loop for {self.peer_name}: {e}"
@@ -1360,14 +1589,15 @@ class OptimizedAudioProcessor:
                    )
 
            except Empty:
-                # Check for final transcription
+                # Check for final transcription using last_audio_time so we react
+                # if frames stop arriving entirely.
                if (
                    len(self.current_phrase_audio) > 0
-                    and time.time() - self.last_activity_time > 2.0
+                    and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT
                ):
                    if self.main_loop:
                        logger.info(
-                            f"Final transcription from thread for {self.peer_name}"
+                            f"Final transcription from thread for {self.peer_name} (inactivity)"
                        )
                        asyncio.run_coroutine_threadsafe(
                            self._transcribe_and_send(
@@ -1376,7 +1606,6 @@ class OptimizedAudioProcessor:
                            self.main_loop,
                        )
                    self.current_phrase_audio = np.array([], dtype=np.float32)
-                    self.final_transcription_pending = False  # Reset the flag
        except Exception as e:
            logger.error(
                f"Error in thread processing loop for {self.peer_name}: {e}"
@@ -1395,6 +1624,37 @@ class OptimizedAudioProcessor:
        self.current_phrase_audio = np.array([], dtype=np.float32)
        self.final_transcription_pending = False
 
+    def _start_thread_watchdog(self) -> None:
+        """Start a lightweight thread-based watchdog when using the thread fallback.
+
+        It periodically checks `last_audio_time` and queues final transcription
+        if inactivity exceeds INACTIVITY_TIMEOUT.
+        """
+        if hasattr(self, "_thread_watchdog") and self._thread_watchdog:
+            return
+
+        def watchdog():
+            logger.debug(f"Thread watchdog started for {self.peer_name}")
+            try:
+                while self.is_running:
+                    time.sleep(0.5)
+                    try:
+                        if (
+                            len(self.current_phrase_audio) > 0
+                            and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT
+                        ):
+                            logger.info(
+                                f"Thread watchdog: no audio for {time.time() - self.last_audio_time:.2f}s, queuing final for {self.peer_name}"
+                            )
+                            self._queue_final_transcription()
+                    except Exception as e:
+                        logger.debug(f"Thread watchdog error for {self.peer_name}: {e}")
+            finally:
+                logger.debug(f"Thread watchdog exiting for {self.peer_name}")
+
+        self._thread_watchdog = threading.Thread(target=watchdog, daemon=True)
+        self._thread_watchdog.start()
+
     async def _transcribe_and_send(
         self, audio_array: AudioArray, is_final: bool, language: str = "en"
     ) -> None:
@@ -1409,9 +1669,9 @@ class OptimizedAudioProcessor:
         if audio_array.ndim != 1:
             raise ValueError("Expected mono audio as a 1D numpy array.")
 
-        # Reset the flag for final transcriptions to allow new processing
-        if is_final:
-            self.final_transcription_pending = False
+        # Do NOT reset final_transcription_pending here; keep it set until the
+        # final transcription task completes to avoid races where new audio is
+        # accumulated while a final transcription was requested.
 
         transcription_start = time.time()
         transcription_type = "final" if is_final else "streaming"
@@ -1473,11 +1733,59 @@ class OptimizedAudioProcessor:
                # no_speech_threshold=0.6,  # Threshold for detecting non-speech -- not supported in OpenVINO
            )
 
-            generation_output = ov_model.ov_model.generate(  # type: ignore
-                input_features, generation_config=generation_config
-            )
+            # Serialize calls to model.generate to avoid reentrancy issues and
+            # add diagnostic logging so we can see whether generate/decoding
+            # completes for final transcriptions.
+            generation_output = None
+            try:
+                if is_final:
+                    logger.info(f"{self.peer_name}: attempting to acquire generate lock (final)")
+                else:
+                    logger.debug(f"{self.peer_name}: attempting to acquire generate lock")
+                if hasattr(self, "_generate_lock") and isinstance(self._generate_lock, asyncio.Lock):
+                    await self._generate_lock.acquire()
+                    try:
+                        if is_final:
+                            logger.info(f"{self.peer_name}: calling model.generate (async lock) (final)")
+                        else:
+                            logger.debug(f"{self.peer_name}: calling model.generate (async lock)")
+                        generation_output = ov_model.ov_model.generate(  # type: ignore
+                            input_features, generation_config=generation_config
+                        )
+                    finally:
+                        self._generate_lock.release()
+                elif hasattr(self, "_generate_lock") and isinstance(self._generate_lock, threading.Lock):
+                    with self._generate_lock:
+                        if is_final:
+                            logger.info(f"{self.peer_name}: calling model.generate (thread lock) (final)")
+                        else:
+                            logger.debug(f"{self.peer_name}: calling model.generate (thread lock)")
+                        generation_output = ov_model.ov_model.generate(  # type: ignore
+                            input_features, generation_config=generation_config
+                        )
+                else:
+                    if is_final:
+                        logger.info(f"{self.peer_name}: calling model.generate (no lock) (final)")
+                    else:
+                        logger.debug(f"{self.peer_name}: calling model.generate (no lock)")
+                    generation_output = ov_model.ov_model.generate(  # type: ignore
+                        input_features, generation_config=generation_config
+                    )
 
-            generated_ids = generation_output
+                if is_final:
+                    logger.info(f"{self.peer_name}: model.generate complete (final) (type={type(generation_output)})")
+                else:
+                    logger.debug(f"{self.peer_name}: model.generate complete (type={type(generation_output)})")
+            except Exception as e:
+                logger.error(f"{self.peer_name}: model.generate failed: {e}", exc_info=True)
+                raise
+
+            # Many generate implementations return an object with a
+            # `.sequences` attribute, so prefer that when available.
+            if hasattr(generation_output, "sequences"):
+                generated_ids = generation_output.sequences
+            else:
+                generated_ids = generation_output
 
            # # Extract transcription and scores
            # generated_ids = generation_output.sequences
@@ -1497,9 +1805,45 @@ class OptimizedAudioProcessor:
            #     avg_confidence = min_confidence = 0.0
 
            # Decode text
-            transcription: str = ov_model.processor.batch_decode(
-                generated_ids, skip_special_tokens=True
-            )[0].strip()
+            # Primary decode attempt
+            transcription: str = ""
+            try:
+                transcription = ov_model.processor.batch_decode(
+                    generated_ids, skip_special_tokens=True
+                )[0].strip()
+            except Exception as decode_e:
+                logger.warning(f"{self.peer_name}: primary decode failed: {decode_e}")
+
+            # Fallback: if decode produced empty result, attempt to decode
+            # `generation_output.sequences` (if not already used) or log details
+            if not transcription:
+                try:
+                    if hasattr(generation_output, "sequences") and (
+                        generated_ids is not generation_output.sequences
+                    ):
+                        transcription = ov_model.processor.batch_decode(
+                            generation_output.sequences, skip_special_tokens=True
+                        )[0].strip()
+                except Exception as fallback_e:
+                    logger.warning(f"{self.peer_name}: fallback decode failed: {fallback_e}")
+
+            # Diagnostic logging if we still have no transcription
+            if not transcription:
+                try:
+                    if is_final:
+                        logger.info(
+                            f"{self.peer_name}: final transcription empty after decode; generated_ids repr/shape: {repr(generated_ids)[:200]}"
+                        )
+                    else:
+                        logger.debug(
+                            f"{self.peer_name}: streaming transcription empty after decode; generated_ids repr/shape: {repr(generated_ids)[:200]}"
+                        )
+                except Exception:
+                    logger.debug(f"{self.peer_name}: generated_ids unavailable for diagnostics")
+            if is_final:
+                logger.info(f"{self.peer_name}: decoded transcription (final): '{transcription}'")
+            else:
+                logger.debug(f"{self.peer_name}: decoded transcription: '{transcription}'")
 
            transcription_time = time.time() - transcription_start
 
@@ -1523,30 +1867,50 @@ class OptimizedAudioProcessor:
 
            message_text = f"{status_marker} {self.peer_name}{type_marker}: {transcription}{timing_info}"
 
-            # Avoid duplicates
-            if not self._is_duplicate(transcription):
-                # For streaming transcriptions, reuse the current message ID if it exists
-                # For final transcriptions, always create a new message
-                if is_final:
-                    # Final transcription - reset current message
-                    self.current_message = None
-                    message_id = None
-                else:
-                    # Streaming transcription - reuse current message ID or create new
-                    if self.current_message is None:
-                        message_id = None  # Will create new ID
-                    else:
-                        message_id = self.current_message.id
-
-                # Create ChatMessageModel
+            # Avoid duplicates for streaming updates, but always send final
+            # transcriptions so the UI/clients receive the final marker even
+            # if the text matches a recent interim result.
+            if is_final or not self._is_duplicate(transcription):
+                # Reuse the existing message ID when possible so the frontend
+                # updates the streaming message into a final message instead
+                # of creating a new one. If there is no current_message, a
+                # new message will be created (message_id=None).
+                message_id = self.current_message.id if self.current_message is not None else None
+
+                # Create ChatMessageModel (reusing message_id when present)
                chat_message = self.create_chat_message_func(message_text, message_id)
-
-                # Update current message for streaming
+
+                # Update current message for streaming; for final messages
+                # clear the current_message after sending so future streams
+                # start a new message.
                if not is_final:
                    self.current_message = chat_message
-
+
+                if is_final:
+                    logger.info(f"{self.peer_name}: sending chat message (final) -> '{message_text}'")
+                else:
+                    logger.debug(f"{self.peer_name}: sending chat message (streaming) -> '{message_text}'")
+
                await self.send_chat_func(chat_message)
+                # Maintain or clear the current_message depending on finality.
+                if is_final:
+                    # Final message should update the existing message on the client.
+                    # After sending final, clear current_message so a future
+                    # streaming sequence starts a fresh message.
+                    try:
+                        self.current_message = None
+                    except Exception:
+                        pass
+                    logger.info(f"{self.peer_name}: send_chat_func completed for final message")
+                else:
+                    # Streaming message remains current
+                    try:
+                        self.current_message = chat_message
+                    except Exception:
+                        pass
+                    logger.debug(f"{self.peer_name}: send_chat_func completed for streaming message")
+
                # Update history
                self.transcription_history.append(
                    TranscriptionHistoryItem(
@@ -1575,6 +1939,14 @@ class OptimizedAudioProcessor:
                f"Error in OpenVINO {transcription_type} transcription: {e}",
                exc_info=True,
            )
+        finally:
+            # Only clear the pending flag after the transcription task completes
+            if is_final:
+                try:
+                    self.final_transcription_pending = False
+                    logger.debug(f"Cleared final_transcription_pending for {self.peer_name}")
+                except Exception:
+                    pass
 
    def _is_duplicate(self, text: str) -> bool:
        """Check if transcription is duplicate of recent ones."""
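
The patch hinges on two cooperating pieces: a module-level threading.Lock that serializes every call into the non-reentrant OpenVINO generate path, and loop.run_in_executor, which moves that blocking call off the asyncio event loop so streaming updates and the inactivity watchdogs keep running while a final transcription is produced. A minimal, self-contained sketch of that pattern, with a stand-in blocking_generate() in place of the real processor/model calls (all names below are illustrative, not taken from whisper.py):

    import asyncio
    import threading
    import time

    _generate_lock = threading.Lock()  # serialize the non-reentrant backend


    def blocking_generate(audio: bytes) -> str:
        """Stand-in for processor -> model.generate -> batch_decode."""
        with _generate_lock:   # only one generate call at a time
            time.sleep(0.2)    # pretend to run the model
            return f"transcript of {len(audio)} bytes"


    async def finalize(audio: bytes) -> str:
        # The heavy call runs in the default thread pool; the event loop stays
        # free to serve streaming transcriptions and watchdog ticks meanwhile.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, blocking_generate, audio)


    async def inactivity_watchdog(last_audio_time, on_timeout, timeout=1.5):
        # Queue the final transcription once no frames have arrived for `timeout` s.
        while True:
            await asyncio.sleep(0.5)
            if time.time() - last_audio_time() > timeout:
                await on_timeout()
                return


    async def main() -> None:
        start = time.time()
        print(await finalize(b"\x00" * 16000))
        await inactivity_watchdog(lambda: start, lambda: finalize(b""), timeout=0.1)


    if __name__ == "__main__":
        asyncio.run(main())

The same reasoning drives the change above: the single lock protects the backend even when final and streaming transcriptions originate from different threads, while the executor keeps send_chat_func, the silence watchdog, and the thread watchdog responsive until the final transcription fires.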