Rework Whisper handling so the final transcription reliably fires

James Ketr 2025-09-15 15:34:04 -07:00
parent 38099aeb3c
commit 916151307f


@@ -60,6 +60,47 @@ CalibrationData = List[Dict[str, Any]]
_device = "GPU.1" # Default to Intel Arc B580 GPU _device = "GPU.1" # Default to Intel Arc B580 GPU
# Global lock to serialize calls into the OpenVINO model.generate/decode
# since some backends are not safe for concurrent generate calls.
_generate_global_lock = threading.Lock()
def _blocking_generate_decode(audio_array: AudioArray, sample_rate: int, generation_config: GenerationConfig | None = None) -> str:
    """Blocking helper to run processor -> model.generate -> decode while
    holding a global lock to serialize OpenVINO access.
    """
    try:
        with _generate_global_lock:
            ov_model = _ensure_model_loaded()
            if ov_model.processor is None:
                raise RuntimeError("Processor not initialized for OpenVINO model")

            # Extract features
            inputs = ov_model.processor(audio_array, sampling_rate=sample_rate, return_tensors="pt")
            input_features = inputs.input_features

            # Use a basic generation config if none provided
            gen_cfg = generation_config or GenerationConfig(max_new_tokens=128)
            gen_out = ov_model.ov_model.generate(input_features, generation_config=gen_cfg)  # type: ignore

            # Prefer .sequences if available
            if hasattr(gen_out, "sequences"):
                ids = gen_out.sequences
            else:
                ids = gen_out

            # Decode
            try:
                transcription = ov_model.processor.batch_decode(ids, skip_special_tokens=True)[0].strip()
            except Exception:
                transcription = ""
            return transcription
    except Exception as e:
        logger.error(f"blocking_generate_decode failed: {e}", exc_info=True)
        return ""
def get_available_devices() -> list[dict[str, Any]]:
    """List available OpenVINO devices with their properties."""
@@ -213,6 +254,9 @@ VAD_CONFIG = {
"speech_freq_max": 3000, # Hz "speech_freq_max": 3000, # Hz
} }
# How long (in seconds) without incoming audio before we consider the phrase ended
INACTIVITY_TIMEOUT = 1.5
model_ids = {
    "Distil-Whisper": [
        "distil-whisper/distil-large-v2",
@@ -1128,7 +1172,18 @@ class OptimizedAudioProcessor:
        try:
            self.main_loop = asyncio.get_running_loop()
            asyncio.create_task(self._async_processing_loop())
            # Start inactivity watchdog to ensure finalization when frames stop arriving
            try:
                asyncio.create_task(self._silence_watchdog())
            except Exception:
                logger.debug(f"Could not start silence watchdog task for {self.peer_name}")
logger.info(f"Started async processing for {self.peer_name}") logger.info(f"Started async processing for {self.peer_name}")
            # Lock to serialize model.generate calls (the OpenVINO model may not
            # be reentrant across concurrent generate calls).
            try:
                self._generate_lock = asyncio.Lock()
            except Exception:
                self._generate_lock = None
        except RuntimeError:
            # Fallback to thread-based processing
            self.main_loop = None
@@ -1137,9 +1192,37 @@ class OptimizedAudioProcessor:
            )
            self.processor_thread.start()
            logger.warning(f"Using thread-based processing for {self.peer_name}")
            # For the thread fallback, create a thread lock used when the asyncio lock is unavailable
            self._generate_lock = threading.Lock()
logger.info(f"OptimizedAudioProcessor initialized for {self.peer_name}") logger.info(f"OptimizedAudioProcessor initialized for {self.peer_name}")
    async def _silence_watchdog(self) -> None:
        """Watch for inactivity (no frames arriving) and queue a final transcription.

        This runs as a lightweight task and uses `last_audio_time`, which is
        updated on every received audio frame. This makes finalization robust
        when the remote peer simply stops sending frames (so no non-speech
        frames arrive to increment `silence_frames`).
        """
        logger.debug(f"Silence watchdog started for {self.peer_name}")
        try:
            while self.is_running:
                await asyncio.sleep(0.5)
                try:
                    if (
                        len(self.current_phrase_audio) > 0
                        and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT
                    ):
                        logger.info(
                            f"Silence watchdog: no audio for {time.time() - self.last_audio_time:.2f}s, queuing final for {self.peer_name}"
                        )
                        self._queue_final_transcription()
                except Exception as e:
                    logger.debug(f"Silence watchdog error for {self.peer_name}: {e}")
        finally:
            logger.debug(f"Silence watchdog exiting for {self.peer_name}")
    def add_audio_data(self, audio_data: AudioArray) -> None:
        """Add audio data with enhanced Voice Activity Detection, preventing leading silence."""
        if not self.is_running or len(audio_data) == 0:
@@ -1262,17 +1345,163 @@ class OptimizedAudioProcessor:
        if (
            len(self.current_phrase_audio) > self.sample_rate * 0.5
        ):  # At least 0.5 seconds
logger.info(f"Queueing final transcription for {self.peer_name}")
self.final_transcription_pending = True
# Use a blocking-worker path for final transcriptions that runs
# model.generate in a threadpool. This isolates the heavy OpenVINO
# call from the event loop and reduces races / reentrancy issues.
if self.main_loop: if self.main_loop:
logger.info(f"Queueing final transcription for {self.peer_name}") try:
self.final_transcription_pending = True asyncio.run_coroutine_threadsafe(
asyncio.create_task( self._blocking_transcribe_and_send(
self._transcribe_and_send( self.current_phrase_audio.copy(), is_final=True
self.current_phrase_audio.copy(), is_final=True ),
self.main_loop,
)
except Exception as e:
logger.error(f"Failed to schedule blocking final transcription: {e}")
# Also schedule an immediate lightweight final marker so the UI
#/client sees a final event right away (helps when generation is
#delayed or fails). We schedule this on the main loop.
try:
async def _send_final_marker():
try:
marker_text = f"{self.peer_name}: (finalizing...)"
# Reuse existing streaming message id if present so the
# UI updates the streaming message into a final marker
message_id = self.current_message.id if self.current_message is not None else None
cm = self.create_chat_message_func(marker_text, message_id)
# Keep the current_message reference so the final send
# can reuse the same id.
if self.current_message is None:
try:
self.current_message = cm
except Exception:
pass
await self.send_chat_func(cm)
logger.info(f"{self.peer_name}: sent immediate final marker")
except Exception as e:
logger.debug(f"Failed to send final marker for {self.peer_name}: {e}")
asyncio.run_coroutine_threadsafe(_send_final_marker(), self.main_loop)
except Exception:
logger.debug(f"Could not schedule final marker for {self.peer_name}")
else:
# As a fallback, try to schedule the normal coroutine if possible
try:
asyncio.create_task(
self._transcribe_and_send(
self.current_phrase_audio.copy(), is_final=True
)
)
except Exception:
logger.debug(
f"Could not schedule final transcription for {self.peer_name} (no main_loop)"
) )
)
            self.current_phrase_audio = np.array([], dtype=np.float32)
    async def _blocking_transcribe_and_send(
        self, audio_array: AudioArray, is_final: bool, language: str = "en"
    ) -> None:
        """Run the heavy generate+decode work inside a threadpool, then send the
        chat message on the event loop. This reduces reentrancy and resource
        contention with streaming transcriptions.
        """
        loop = asyncio.get_event_loop()

        def blocking_work(audio_in: AudioArray) -> tuple[str, float]:
            try:
                # Ensure the model is loaded in this thread/process
                ov_model = _ensure_model_loaded()

                # Extract features (relatively cheap, but keep it on the thread)
                input_features = ov_model.processor(
                    audio_in, sampling_rate=self.sample_rate, return_tensors="pt"
                ).input_features

                # Perform generation (blocking). Serialize access to the
                # underlying OpenVINO generation call to avoid concurrency
                # problems with the OpenVINO runtime.
                with _generate_global_lock:
                    gen_out = ov_model.ov_model.generate(
                        input_features, generation_config=GenerationConfig(max_new_tokens=128)
                    )

                # Try to extract sequences if present
                if hasattr(gen_out, "sequences"):
                    ids = gen_out.sequences
                else:
                    ids = gen_out

                # Decode
                try:
                    text = ov_model.processor.batch_decode(ids, skip_special_tokens=True)[0].strip()
                except Exception:
                    text = ""
                return text, 0.0
            except Exception as e:
                logger.error(f"Blocking transcription failed for {self.peer_name}: {e}", exc_info=True)
                return "", 0.0

        try:
            # Run the blocking work in an executor
            transcription, _ = await loop.run_in_executor(None, blocking_work, audio_array)
            if transcription:
                # Build the message and send it on the event loop
                status_marker = "" if is_final else "🎤"
                type_marker = "" if is_final else " [streaming]"
                message_text = f"{status_marker} {self.peer_name}{type_marker}: {transcription} (blocking final)"

                # Reuse the existing message id for the final update when possible
                message_id = self.current_message.id if self.current_message is not None else None
                chat_message = self.create_chat_message_func(message_text, message_id)
                await self.send_chat_func(chat_message)

                # After sending the final message, clear current_message so streaming restarts cleanly
                try:
                    self.current_message = None
                except Exception:
                    pass
                logger.info(f"{self.peer_name}: blocking final transcription sent: '{transcription}'")
            else:
                # If decoding failed or returned empty text, fall back to the
                # most recent streaming transcription from history (if any) and
                # send it as the final message. This ensures clients get a final marker.
                fallback_text = None
                try:
                    if self.transcription_history:
                        # Take the last non-final streaming message if present
                        for h in reversed(self.transcription_history):
                            if not h.is_final:
                                # Extract the raw transcription portion from the stored message
                                fallback_text = h.message.split(": ", 1)[-1].split(" (🚀")[0]
                                break
                        # If no non-final entry was found, take the most recent entry
                        if fallback_text is None:
                            fallback_text = self.transcription_history[-1].message.split(": ", 1)[-1].split(" (🚀")[0]
                except Exception:
                    fallback_text = None

                if fallback_text:
                    message_text = f"{self.peer_name}: {fallback_text} (final - fallback)"
                    message_id = self.current_message.id if self.current_message is not None else None
                    chat_message = self.create_chat_message_func(message_text, message_id)
                    await self.send_chat_func(chat_message)
                    try:
                        self.current_message = None
                    except Exception:
                        pass
                    logger.info(f"{self.peer_name}: blocking final fallback sent: '{fallback_text}'")
                else:
                    logger.info(f"{self.peer_name}: blocking final transcription produced no text and no fallback available")
        finally:
            # Always clear the pending flag when the blocking final finishes
            try:
                self.final_transcription_pending = False
            except Exception:
                pass
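
    # A worked illustration of the history fallback above, assuming stored
    # streaming messages follow the format assembled in _transcribe_and_send
    # (the sample text here is hypothetical):
    #
    #     msg = "🎤 alice [streaming]: hello world (🚀 0.42s)"
    #     msg.split(": ", 1)[-1]                   -> "hello world (🚀 0.42s)"
    #     msg.split(": ", 1)[-1].split(" (🚀")[0]  -> "hello world"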
    async def _async_processing_loop(self) -> None:
        """Async processing loop for audio chunks."""
        logger.info(f"Started async processing loop for {self.peer_name}")
@@ -1300,19 +1529,19 @@ class OptimizedAudioProcessor:
                    )
            except asyncio.TimeoutError:
                # Check for final transcription on timeout; use last_audio_time so
                # we also detect the case where frames simply stopped arriving.
                if (
                    len(self.current_phrase_audio) > 0
                    and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT
                ):
                    logger.info(
                        f"Final transcription timeout for {self.peer_name} (asyncio.TimeoutError, inactivity)"
                    )
                    await self._transcribe_and_send(
                        self.current_phrase_audio.copy(), is_final=True
                    )
                    self.current_phrase_audio = np.array([], dtype=np.float32)
            except Exception as e:
                logger.error(
                    f"Error in async processing loop for {self.peer_name}: {e}"
@@ -1360,14 +1589,15 @@ class OptimizedAudioProcessor:
                    )
            except Empty:
                # Check for final transcription using last_audio_time so we react
                # if frames stop arriving entirely.
                if (
                    len(self.current_phrase_audio) > 0
                    and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT
                ):
                    if self.main_loop:
                        logger.info(
                            f"Final transcription from thread for {self.peer_name} (inactivity)"
                        )
                        asyncio.run_coroutine_threadsafe(
                            self._transcribe_and_send(
@@ -1376,7 +1606,6 @@ class OptimizedAudioProcessor:
                            self.main_loop,
                        )
                    self.current_phrase_audio = np.array([], dtype=np.float32)
            except Exception as e:
                logger.error(
                    f"Error in thread processing loop for {self.peer_name}: {e}"
@@ -1395,6 +1624,37 @@ class OptimizedAudioProcessor:
        self.current_phrase_audio = np.array([], dtype=np.float32)
        self.final_transcription_pending = False
    def _start_thread_watchdog(self) -> None:
        """Start a lightweight thread-based watchdog when using the thread fallback.

        It periodically checks `last_audio_time` and queues a final transcription
        if inactivity exceeds INACTIVITY_TIMEOUT.
        """
        if hasattr(self, "_thread_watchdog") and self._thread_watchdog:
            return

        def watchdog():
            logger.debug(f"Thread watchdog started for {self.peer_name}")
            try:
                while self.is_running:
                    time.sleep(0.5)
                    try:
                        if (
                            len(self.current_phrase_audio) > 0
                            and time.time() - self.last_audio_time > INACTIVITY_TIMEOUT
                        ):
                            logger.info(
                                f"Thread watchdog: no audio for {time.time() - self.last_audio_time:.2f}s, queuing final for {self.peer_name}"
                            )
                            self._queue_final_transcription()
                    except Exception as e:
                        logger.debug(f"Thread watchdog error for {self.peer_name}: {e}")
            finally:
                logger.debug(f"Thread watchdog exiting for {self.peer_name}")

        self._thread_watchdog = threading.Thread(target=watchdog, daemon=True)
        self._thread_watchdog.start()
    async def _transcribe_and_send(
        self, audio_array: AudioArray, is_final: bool, language: str = "en"
    ) -> None:
@@ -1409,9 +1669,9 @@ class OptimizedAudioProcessor:
        if audio_array.ndim != 1:
            raise ValueError("Expected mono audio as a 1D numpy array.")
        # Do NOT reset final_transcription_pending here; keep it set until the
        # final transcription task completes, to avoid races where new audio is
        # accumulated while a final transcription was requested.
        transcription_start = time.time()
        transcription_type = "final" if is_final else "streaming"
@@ -1473,11 +1733,59 @@ class OptimizedAudioProcessor:
                # no_speech_threshold=0.6,  # Threshold for detecting non-speech -- not supported in OpenVINO
            )
            # Serialize calls to model.generate to avoid reentrancy issues, and
            # add diagnostic logging so we can see whether generate/decoding
            # completes for final transcriptions.
            generation_output = None
            try:
                if is_final:
                    logger.info(f"{self.peer_name}: attempting to acquire generate lock (final)")
                else:
                    logger.debug(f"{self.peer_name}: attempting to acquire generate lock")

                if hasattr(self, "_generate_lock") and isinstance(self._generate_lock, asyncio.Lock):
                    await self._generate_lock.acquire()
                    try:
                        if is_final:
                            logger.info(f"{self.peer_name}: calling model.generate (async lock) (final)")
                        else:
                            logger.debug(f"{self.peer_name}: calling model.generate (async lock)")
                        generation_output = ov_model.ov_model.generate(  # type: ignore
                            input_features, generation_config=generation_config
                        )
                    finally:
                        self._generate_lock.release()
                # threading.Lock is a factory function, not a type, so compare
                # against the concrete lock type it returns.
                elif hasattr(self, "_generate_lock") and isinstance(
                    self._generate_lock, type(threading.Lock())
                ):
                    with self._generate_lock:
                        if is_final:
                            logger.info(f"{self.peer_name}: calling model.generate (thread lock) (final)")
                        else:
                            logger.debug(f"{self.peer_name}: calling model.generate (thread lock)")
                        generation_output = ov_model.ov_model.generate(  # type: ignore
                            input_features, generation_config=generation_config
                        )
                else:
                    if is_final:
                        logger.info(f"{self.peer_name}: calling model.generate (no lock) (final)")
                    else:
                        logger.debug(f"{self.peer_name}: calling model.generate (no lock)")
                    generation_output = ov_model.ov_model.generate(  # type: ignore
                        input_features, generation_config=generation_config
                    )

                if is_final:
                    logger.info(f"{self.peer_name}: model.generate complete (final) (type={type(generation_output)})")
                else:
                    logger.debug(f"{self.peer_name}: model.generate complete (type={type(generation_output)})")
            except Exception as e:
                logger.error(f"{self.peer_name}: model.generate failed: {e}", exc_info=True)
                raise

            # Many generate implementations return an object with a
            # `.sequences` attribute, so prefer that when available.
            if hasattr(generation_output, "sequences"):
                generated_ids = generation_output.sequences
            else:
                generated_ids = generation_output
            # # Extract transcription and scores
            # generated_ids = generation_output.sequences
@@ -1497,9 +1805,45 @@ class OptimizedAudioProcessor:
            # avg_confidence = min_confidence = 0.0

            # Decode text
            # Primary decode attempt
            transcription: str = ""
            try:
                transcription = ov_model.processor.batch_decode(
                    generated_ids, skip_special_tokens=True
                )[0].strip()
            except Exception as decode_e:
                logger.warning(f"{self.peer_name}: primary decode failed: {decode_e}")

            # Fallback: if decode produced an empty result, attempt to decode
            # `generation_output.sequences` (if not already used) or log details
            if not transcription:
                try:
                    if hasattr(generation_output, "sequences") and (
                        generated_ids is not generation_output.sequences
                    ):
                        transcription = ov_model.processor.batch_decode(
                            generation_output.sequences, skip_special_tokens=True
                        )[0].strip()
                except Exception as fallback_e:
                    logger.warning(f"{self.peer_name}: fallback decode failed: {fallback_e}")

            # Diagnostic logging if we still have no transcription
            if not transcription:
                try:
                    if is_final:
                        logger.info(
                            f"{self.peer_name}: final transcription empty after decode; generated_ids repr/shape: {repr(generated_ids)[:200]}"
                        )
                    else:
                        logger.debug(
                            f"{self.peer_name}: streaming transcription empty after decode; generated_ids repr/shape: {repr(generated_ids)[:200]}"
                        )
                except Exception:
                    logger.debug(f"{self.peer_name}: generated_ids unavailable for diagnostics")

            if is_final:
                logger.info(f"{self.peer_name}: decoded transcription (final): '{transcription}'")
            else:
                logger.debug(f"{self.peer_name}: decoded transcription: '{transcription}'")
            transcription_time = time.time() - transcription_start
@@ -1523,30 +1867,50 @@ class OptimizedAudioProcessor:
message_text = f"{status_marker} {self.peer_name}{type_marker}: {transcription}{timing_info}" message_text = f"{status_marker} {self.peer_name}{type_marker}: {transcription}{timing_info}"
            # Avoid duplicates for streaming updates, but always send final
            # transcriptions so the UI/clients receive the final marker even
            # if the text matches a recent interim result.
            if is_final or not self._is_duplicate(transcription):
                # Reuse the existing message ID when possible so the frontend
                # updates the streaming message into a final message instead
                # of creating a new one. If there is no current_message, a
                # new message will be created (message_id=None).
                message_id = self.current_message.id if self.current_message is not None else None

                # Create ChatMessageModel (reusing message_id when present)
                chat_message = self.create_chat_message_func(message_text, message_id)

                # Update current message for streaming; for final messages,
                # clear the current_message after sending so future streams
                # start a new message.
                if not is_final:
                    self.current_message = chat_message

                if is_final:
                    logger.info(f"{self.peer_name}: sending chat message (final) -> '{message_text}'")
                else:
                    logger.debug(f"{self.peer_name}: sending chat message (streaming) -> '{message_text}'")
                await self.send_chat_func(chat_message)

                # Maintain or clear the current_message depending on finality.
                if is_final:
                    # The final message should update the existing message on
                    # the client. After sending it, clear current_message so a
                    # future streaming sequence starts a fresh message.
                    try:
                        self.current_message = None
                    except Exception:
                        pass
                    logger.info(f"{self.peer_name}: send_chat_func completed for final message")
                else:
                    # The streaming message remains current
                    try:
                        self.current_message = chat_message
                    except Exception:
                        pass
                    logger.debug(f"{self.peer_name}: send_chat_func completed for streaming message")
                # Update history
                self.transcription_history.append(
                    TranscriptionHistoryItem(
@@ -1575,6 +1939,14 @@ class OptimizedAudioProcessor:
f"Error in OpenVINO {transcription_type} transcription: {e}", f"Error in OpenVINO {transcription_type} transcription: {e}",
exc_info=True, exc_info=True,
) )
        finally:
            # Only clear the pending flag after the transcription task completes
            if is_final:
                try:
                    self.final_transcription_pending = False
                    logger.debug(f"Cleared final_transcription_pending for {self.peer_name}")
                except Exception:
                    pass
    def _is_duplicate(self, text: str) -> bool:
        """Check if a transcription is a duplicate of recent ones."""