diff --git a/voicebot/bots/whisper.py b/voicebot/bots/whisper.py
index a19fcac..c756e39 100644
--- a/voicebot/bots/whisper.py
+++ b/voicebot/bots/whisper.py
@@ -1366,37 +1366,40 @@ class OptimizedAudioProcessor:
     def _queue_final_transcription(self) -> None:
         """Queue final transcription of current phrase."""
-        if (
-            len(self.current_phrase_audio) > self.sample_rate * 0.5
-        ):  # At least 0.5 seconds
-            logger.info(f"Queueing final transcription for {self.peer_name}")
-            self.final_transcription_pending = True
-            # Use a blocking-worker path for final transcriptions that runs
-            # model.generate in a threadpool. This isolates the heavy OpenVINO
-            # call from the event loop and reduces races / reentrancy issues.
-            if self.main_loop:
-                try:
-                    asyncio.run_coroutine_threadsafe(
-                        self._blocking_transcribe_and_send(
-                            self.current_phrase_audio.copy(), is_final=True
-                        ),
-                        self.main_loop,
-                    )
-                except Exception as e:
-                    logger.error(f"Failed to schedule blocking final transcription: {e}")
-            # Also schedule an immediate lightweight final marker so the UI
-            #/client sees a final event right away (helps when generation is
-            #delayed or fails). We schedule this on the main loop.
+        # Always attempt to include any remaining samples in the circular
+        # buffer when creating a final transcription. Because the thread
+        # watchdog may call this method from a non-event-loop thread, we
+        # schedule the actual drain + final transcription on the configured
+        # main event loop. This avoids concurrent access to the circular
+        # buffer pointers and ensures the final audio contains trailing
+        # partial chunks that haven't reached `chunk_size` yet.
+
+        async def _queue_final_coroutine():
             try:
-                async def _send_final_marker():
+                # Drain any samples remaining in the circular buffer
+                available = 0
+                try:
+                    available = self._available_samples()
+                    if available > 0:
+                        tail = self._extract_chunk(available)
+                        if tail.size > 0:
+                            self.current_phrase_audio = np.concatenate(
+                                [self.current_phrase_audio, tail]
+                            )
+                except Exception as e:
+                    logger.debug(f"Failed to drain circular buffer for final: {e}")
+
+                if len(self.current_phrase_audio) > self.sample_rate * 0.5:
+                    logger.info(f"Queueing final transcription for {self.peer_name} (drained={available})")
+                    self.final_transcription_pending = True
+
+                    # Send an immediate lightweight final marker so the UI
+                    # receives a quick final event while the heavy generate
+                    # runs in the background.
                     try:
                         marker_text = f"⚡ {self.peer_name}: (finalizing...)"
-                        # Reuse existing streaming message id if present so the
-                        # UI updates the streaming message into a final marker
                         message_id = self.current_message.id if self.current_message is not None else None
                         cm = self.create_chat_message_func(marker_text, message_id)
-                        # Keep the current_message reference so the final send
-                        # can reuse the same id.
                         if self.current_message is None:
                             try:
                                 self.current_message = cm
@@ -1407,25 +1410,58 @@ class OptimizedAudioProcessor:
                     except Exception as e:
                         logger.debug(f"Failed to send final marker for {self.peer_name}: {e}")
-                asyncio.run_coroutine_threadsafe(_send_final_marker(), self.main_loop)
-            except Exception:
-                logger.debug(f"Could not schedule final marker for {self.peer_name}")
-                # As a fallback (if we couldn't schedule the marker on the
-                # main loop), try to schedule the normal async transcription
-                # coroutine. This is only used when the immediate marker
-                # cannot be scheduled — avoid scheduling both paths.
-                try:
-                    asyncio.create_task(
-                        self._transcribe_and_send(
+                    # Run the blocking final transcription in a coroutine
+                    # that offloads the heavy work to a threadpool (existing
+                    # helper handles this). We await it here so we can clear
+                    # state afterwards in the same coroutine context.
+                    try:
+                        await self._blocking_transcribe_and_send(
                             self.current_phrase_audio.copy(), is_final=True
                         )
-                    )
-                except Exception:
-                    logger.debug(
-                        f"Could not schedule final transcription for {self.peer_name} (no main_loop)"
-                    )
 
-        self.current_phrase_audio = np.array([], dtype=np.float32)
+                    except Exception as e:
+                        logger.error(f"Error running blocking final transcription coroutine: {e}")
+
+                # Clear current phrase buffer after scheduling/completing final
+                self.current_phrase_audio = np.array([], dtype=np.float32)
+            finally:
+                # Ensure the pending flag is cleared if something went wrong
+                try:
+                    self.final_transcription_pending = False
+                except Exception:
+                    pass
+
+        # If we have an event loop available, schedule the coroutine there so
+        # buffer operations happen on the loop and avoid races with the
+        # producer side. If no main loop is available, fall back to
+        # create_task (best-effort) or, failing that, a temporary event loop.
+        try:
+            if self.main_loop is not None:
+                try:
+                    asyncio.run_coroutine_threadsafe(_queue_final_coroutine(), self.main_loop)
+                    return
+                except Exception as e:
+                    logger.debug(f"Failed to schedule final coroutine on main loop: {e}")
+
+            # Fallback: try to create a task on the current loop
+            try:
+                asyncio.create_task(_queue_final_coroutine())
+                return
+            except Exception:
+                # As a last resort, run the coroutine synchronously in a new
+                # event loop (blocking) so a final is still produced.
+                import asyncio as _asyncio
+
+                try:
+                    _loop = _asyncio.new_event_loop()
+                    _asyncio.set_event_loop(_loop)
+                    _loop.run_until_complete(_queue_final_coroutine())
+                finally:
+                    try:
+                        _asyncio.set_event_loop(None)
+                    except Exception:
+                        pass
+        except Exception as e:
+            logger.error(f"Unexpected error scheduling final transcription: {e}")
 
     async def _blocking_transcribe_and_send(
         self, audio_array: AudioArray, is_final: bool, language: str = "en"
@@ -1447,11 +1483,23 @@ class OptimizedAudioProcessor:
             ).input_features
 
             # Perform generation (blocking)
+            # Use the same generation configuration as the async path
+            # (higher-quality beam search) to avoid weaker final
+            # transcriptions when using the blocking path.
+            gen_cfg = GenerationConfig(
+                max_length=448,
+                num_beams=6,
+                no_repeat_ngram_size=3,
+                use_cache=True,
+                early_stopping=True,
+                max_new_tokens=128,
+            )
+
             # Serialize access to the underlying OpenVINO generation call
            # to avoid concurrency problems with the OpenVINO runtime.
             with _generate_global_lock:
                 gen_out = ov_model.ov_model.generate(
-                    input_features, generation_config=GenerationConfig(max_new_tokens=128)
+                    input_features, generation_config=gen_cfg
                 )
 
             # Try to extract sequences if present
@@ -1569,12 +1617,27 @@ class OptimizedAudioProcessor:
             # another final. Otherwise set the pending flag and run the
             # final transcription.
             if not self.final_transcription_pending:
+                # Drain any remaining circular-buffer samples into the
+                # current phrase so trailing partial packets are included
+                # in the final transcription.
+                try:
+                    available = self._available_samples()
+                    if available > 0:
+                        tail = self._extract_chunk(available)
+                        if tail.size > 0:
+                            self.current_phrase_audio = np.concatenate([
+                                self.current_phrase_audio, tail
+                            ])
+                except Exception as e:
+                    logger.debug(f"Failed to drain circular buffer before async final: {e}")
+
                 self.final_transcription_pending = True
                 await self._transcribe_and_send(
                     self.current_phrase_audio.copy(), is_final=True
                 )
             else:
                 logger.debug(f"Final already pending for {self.peer_name}; skipping async final")
+            self.current_phrase_audio = np.array([], dtype=np.float32)
 
         except Exception as e:
             logger.error(
@@ -1633,17 +1696,24 @@ class OptimizedAudioProcessor:
                     logger.info(
                         f"Final transcription from thread for {self.peer_name} (inactivity)"
                     )
-                    # Avoid scheduling duplicates if a final is already pending
-                    if not self.final_transcription_pending:
-                        self.final_transcription_pending = True
-                        asyncio.run_coroutine_threadsafe(
-                            self._transcribe_and_send(
-                                self.current_phrase_audio.copy(), is_final=True
-                            ),
-                            self.main_loop,
-                        )
-                    else:
-                        logger.debug(f"Final already pending for {self.peer_name}; skipping thread-scheduled final")
+                    # Delegate to the safe finalization path which drains the
+                    # circular buffer on the main loop and schedules the heavy
+                    # blocking transcription there. This avoids concurrent
+                    # buffer access races between threads.
+                    try:
+                        self._queue_final_transcription()
+                    except Exception:
+                        # As a fallback, try to schedule the transcription
+                        # directly on the main loop (best-effort).
+                        try:
+                            asyncio.run_coroutine_threadsafe(
+                                self._transcribe_and_send(
+                                    self.current_phrase_audio.copy(), is_final=True
+                                ),
+                                self.main_loop,
+                            )
+                        except Exception as e:
+                            logger.debug(f"Failed to schedule thread final fallback: {e}")
                     self.current_phrase_audio = np.array([], dtype=np.float32)
             except Exception as e:
                 logger.error(