Process all audio for final transcription

This commit is contained in:
James Ketr 2025-09-15 17:28:13 -07:00
parent 3cfc148724
commit cab90b6567

View File

@ -1366,37 +1366,40 @@ class OptimizedAudioProcessor:
def _queue_final_transcription(self) -> None:
"""Queue final transcription of current phrase."""
if (
len(self.current_phrase_audio) > self.sample_rate * 0.5
): # At least 0.5 seconds
logger.info(f"Queueing final transcription for {self.peer_name}")
self.final_transcription_pending = True
# Use a blocking-worker path for final transcriptions that runs
# model.generate in a threadpool. This isolates the heavy OpenVINO
# call from the event loop and reduces races / reentrancy issues.
if self.main_loop:
# Always attempt to include any remaining samples in the circular
# buffer when creating a final transcription. Because the thread
# watchdog may call this method from a non-event-loop thread, we
# schedule the actual drain + final transcription on the configured
# main event loop. This avoids concurrent access to the circular
# buffer pointers and ensures the final audio contains trailing
# partial chunks that haven't reached `chunk_size` yet.
async def _queue_final_coroutine():
try:
asyncio.run_coroutine_threadsafe(
self._blocking_transcribe_and_send(
self.current_phrase_audio.copy(), is_final=True
),
self.main_loop,
# Drain any samples remaining in the circular buffer
available = 0
try:
available = self._available_samples()
if available > 0:
tail = self._extract_chunk(available)
if tail.size > 0:
self.current_phrase_audio = np.concatenate(
[self.current_phrase_audio, tail]
)
except Exception as e:
logger.error(f"Failed to schedule blocking final transcription: {e}")
# Also schedule an immediate lightweight final marker so the UI
#/client sees a final event right away (helps when generation is
#delayed or fails). We schedule this on the main loop.
try:
async def _send_final_marker():
logger.debug(f"Failed to drain circular buffer for final: {e}")
if len(self.current_phrase_audio) > self.sample_rate * 0.5:
logger.info(f"Queueing final transcription for {self.peer_name} (drained={available if 'available' in locals() else 0})")
self.final_transcription_pending = True
# Send an immediate lightweight final marker so the UI
# receives a quick final event while the heavy generate
# runs in the background.
try:
marker_text = f"{self.peer_name}: (finalizing...)"
# Reuse existing streaming message id if present so the
# UI updates the streaming message into a final marker
message_id = self.current_message.id if self.current_message is not None else None
cm = self.create_chat_message_func(marker_text, message_id)
# Keep the current_message reference so the final send
# can reuse the same id.
if self.current_message is None:
try:
self.current_message = cm
@ -1407,25 +1410,58 @@ class OptimizedAudioProcessor:
except Exception as e:
logger.debug(f"Failed to send final marker for {self.peer_name}: {e}")
asyncio.run_coroutine_threadsafe(_send_final_marker(), self.main_loop)
except Exception:
logger.debug(f"Could not schedule final marker for {self.peer_name}")
# As a fallback (if we couldn't schedule the marker on the
# main loop), try to schedule the normal async transcription
# coroutine. This is only used when the immediate marker
# cannot be scheduled — avoid scheduling both paths.
# Run the blocking final transcription in a coroutine
# that offloads the heavy work to a threadpool (existing
# helper handles this). We await it here so we can clear
# state afterwards in the same coroutine context.
try:
asyncio.create_task(
self._transcribe_and_send(
await self._blocking_transcribe_and_send(
self.current_phrase_audio.copy(), is_final=True
)
)
except Exception:
logger.debug(
f"Could not schedule final transcription for {self.peer_name} (no main_loop)"
)
except Exception as e:
logger.error(f"Error running blocking final transcription coroutine: {e}")
# Clear current phrase buffer after scheduling/completing final
self.current_phrase_audio = np.array([], dtype=np.float32)
finally:
# Ensure the pending flag is cleared if something went wrong
try:
self.final_transcription_pending = False
except Exception:
pass
# If we have an event loop available, schedule the coroutine there so
# buffer operations happen on the loop and avoid races with the
# producer side. If no main loop is available, fall back to running
# the coroutine via create_task (best-effort) or thread executor.
try:
if self.main_loop is not None:
try:
asyncio.run_coroutine_threadsafe(_queue_final_coroutine(), self.main_loop)
return
except Exception as e:
logger.debug(f"Failed to schedule final coroutine on main loop: {e}")
# Fallback: try to create a task on the current loop
try:
asyncio.create_task(_queue_final_coroutine())
return
except Exception:
# As a last resort, run the coroutine synchronously in a new
# event loop (blocking) so a final is still produced.
import asyncio as _asyncio
try:
_loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(_loop)
_loop.run_until_complete(_queue_final_coroutine())
finally:
try:
_asyncio.set_event_loop(None)
except Exception:
pass
except Exception as e:
logger.error(f"Unexpected error scheduling final transcription: {e}")
async def _blocking_transcribe_and_send(
self, audio_array: AudioArray, is_final: bool, language: str = "en"
@ -1447,11 +1483,23 @@ class OptimizedAudioProcessor:
).input_features
# Perform generation (blocking)
# Use the same generation configuration as the async path
# (higher-quality beam search) to avoid weaker final
# transcriptions when using the blocking path.
gen_cfg = GenerationConfig(
max_length=448,
num_beams=6,
no_repeat_ngram_size=3,
use_cache=True,
early_stopping=True,
max_new_tokens=128,
)
# Serialize access to the underlying OpenVINO generation call
# to avoid concurrency problems with the OpenVINO runtime.
with _generate_global_lock:
gen_out = ov_model.ov_model.generate(
input_features, generation_config=GenerationConfig(max_new_tokens=128)
input_features, generation_config=gen_cfg
)
# Try to extract sequences if present
@ -1569,12 +1617,27 @@ class OptimizedAudioProcessor:
# another final. Otherwise set the pending flag and run the
# final transcription.
if not self.final_transcription_pending:
# Drain any remaining circular-buffer samples into the
# current phrase so trailing partial packets are included
# in the final transcription.
try:
available = self._available_samples()
if available > 0:
tail = self._extract_chunk(available)
if tail.size > 0:
self.current_phrase_audio = np.concatenate([
self.current_phrase_audio, tail
])
except Exception as e:
logger.debug(f"Failed to drain circular buffer before async final: {e}")
self.final_transcription_pending = True
await self._transcribe_and_send(
self.current_phrase_audio.copy(), is_final=True
)
else:
logger.debug(f"Final already pending for {self.peer_name}; skipping async final")
self.current_phrase_audio = np.array([], dtype=np.float32)
except Exception as e:
logger.error(
@ -1633,17 +1696,24 @@ class OptimizedAudioProcessor:
logger.info(
f"Final transcription from thread for {self.peer_name} (inactivity)"
)
# Avoid scheduling duplicates if a final is already pending
if not self.final_transcription_pending:
self.final_transcription_pending = True
# Delegate to the safe finalization path which drains the
# circular buffer on the main loop and schedules the heavy
# blocking transcription there. This avoids concurrent
# buffer access races between threads.
try:
self._queue_final_transcription()
except Exception:
# As a fallback, try to schedule the transcription
# directly on the main loop (best-effort).
try:
asyncio.run_coroutine_threadsafe(
self._transcribe_and_send(
self.current_phrase_audio.copy(), is_final=True
),
self.main_loop,
)
else:
logger.debug(f"Final already pending for {self.peer_name}; skipping thread-scheduled final")
except Exception as e:
logger.debug(f"Failed to schedule thread final fallback: {e}")
self.current_phrase_audio = np.array([], dtype=np.float32)
except Exception as e:
logger.error(