1834 lines
70 KiB
Python
1834 lines
70 KiB
Python
"""Streaming Whisper agent (bots/whisper) - OpenVINO Optimized for Intel Arc B580
|
|
|
|
Real-time speech transcription agent that processes incoming audio streams
|
|
and sends transcriptions as chat messages to the lobby.
|
|
Optimized for Intel Arc B580 GPU using OpenVINO inference engine.
|
|
"""
|
|
|
|
import asyncio
|
|
import numpy as np
|
|
import time
|
|
import threading
|
|
import os
|
|
import gc
|
|
import shutil
|
|
from queue import Queue, Empty
|
|
from typing import Dict, Optional, Callable, Awaitable, Any, List, Union
|
|
from pathlib import Path
|
|
import numpy.typing as npt
|
|
from pydantic import BaseModel, Field, ConfigDict
|
|
|
|
# Core dependencies
|
|
import librosa
|
|
from shared.logger import logger
|
|
from aiortc import MediaStreamTrack
|
|
from aiortc.mediastreams import MediaStreamError
|
|
from av import AudioFrame, VideoFrame
|
|
import cv2
|
|
import fractions
|
|
from time import perf_counter
|
|
|
|
# Import shared models for chat functionality
|
|
import sys
|
|
|
|
sys.path.append(
|
|
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
)
|
|
from shared.models import ChatMessageModel
|
|
from voicebot.models import Peer
|
|
|
|
# OpenVINO optimized imports
|
|
import openvino as ov
|
|
from optimum.intel.openvino import OVModelForSpeechSeq2Seq # type: ignore
|
|
from transformers import WhisperProcessor
|
|
from transformers.generation.configuration_utils import GenerationConfig
|
|
from openvino.runtime import Core # Part of optimum.intel.openvino # type: ignore
|
|
import torch
|
|
|
|
# Import quantization dependencies with error handling
|
|
import nncf # type: ignore
|
|
from optimum.intel.openvino.quantization import InferRequestWrapper # type: ignore
|
|
|
|
QUANTIZATION_AVAILABLE = True
|
|
|
|
# Type definitions
|
|
AudioArray = npt.NDArray[np.float32]
|
|
ModelConfig = Dict[str, Union[str, int, bool]]
|
|
CalibrationData = List[Dict[str, Any]]
|
|
|
|
_device = "GPU.1" # Default to Intel Arc B580 GPU
|
|
|
|
|
|
def get_available_devices() -> list[dict[str, Any]]:
|
|
"""List available OpenVINO devices with their properties."""
|
|
try:
|
|
core = Core()
|
|
devices = core.available_devices
|
|
device_info: list[dict[str, Any]] = []
|
|
for device in devices:
|
|
try:
|
|
# Get device properties
|
|
properties = core.get_property(device, "FULL_DEVICE_NAME")
|
|
# Attempt to get additional properties if available
|
|
try:
|
|
device_type = core.get_property(device, "DEVICE_TYPE")
|
|
except Exception:
|
|
device_type = "N/A"
|
|
try:
|
|
capabilities: Any = core.get_property(
|
|
device, "SUPPORTED_PROPERTIES"
|
|
)
|
|
except Exception:
|
|
capabilities = "N/A"
|
|
device_info.append(
|
|
{
|
|
"name": device,
|
|
"full_name": properties,
|
|
"type": device_type,
|
|
"capabilities": capabilities,
|
|
}
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to retrieve properties for device {device}: {e}")
|
|
device_info.append(
|
|
{
|
|
"name": device,
|
|
"full_name": "Unknown",
|
|
"type": "N/A",
|
|
"capabilities": "N/A",
|
|
}
|
|
)
|
|
return device_info
|
|
except Exception as e:
|
|
logger.error(f"Failed to retrieve available devices: {e}")
|
|
return []
|
|
|
|
|
|
def print_available_devices(device: str | None = None):
|
|
"""Print available OpenVINO devices in a formatted manner."""
|
|
devices = get_available_devices()
|
|
if not devices:
|
|
logger.info("No OpenVINO devices detected.")
|
|
return
|
|
logger.info("Available OpenVINO Devices:")
|
|
for d in devices:
|
|
logger.info(
|
|
f"- Device: {d.get('name')} {'*' if d.get('name') == device else ''}"
|
|
)
|
|
logger.info(f" Full Name: {d.get('full_name')}")
|
|
logger.info(f" Type: {d.get('type')}")
|
|
|
|
|
|
print_available_devices(_device)
|
|
|
|
|
|
class AudioQueueItem(BaseModel):
|
|
"""Audio data with timestamp for processing queue."""
|
|
|
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
|
|
|
audio: AudioArray = Field(..., description="Audio data as numpy array")
|
|
timestamp: float = Field(..., description="Timestamp when audio was captured")
|
|
|
|
|
|
class TranscriptionHistoryItem(BaseModel):
|
|
"""Transcription history item with metadata."""
|
|
|
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
|
|
|
message: str = Field(..., description="Transcribed text message")
|
|
timestamp: float = Field(..., description="When transcription was completed")
|
|
is_final: bool = Field(
|
|
..., description="Whether this is final or streaming transcription"
|
|
)
|
|
|
|
|
|
class OpenVINOConfig(BaseModel):
|
|
"""OpenVINO configuration for Intel Arc B580 optimization."""
|
|
|
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
|
|
|
device: str = Field(default=_device, description="Target device for inference")
|
|
cache_dir: str = Field(
|
|
default="./ov_cache", description="Cache directory for compiled models"
|
|
)
|
|
enable_quantization: bool = Field(
|
|
default=True, description="Enable INT8 quantization"
|
|
)
|
|
throughput_streams: int = Field(
|
|
default=2, description="Number of inference streams"
|
|
)
|
|
max_threads: int = Field(default=8, description="Maximum number of threads")
|
|
|
|
def to_ov_config(self) -> ModelConfig:
|
|
"""Convert to OpenVINO configuration dictionary."""
|
|
cfg: ModelConfig = {"CACHE_DIR": self.cache_dir}
|
|
|
|
# Only include GPU-specific tuning options when the target device is GPU.
|
|
# Some OpenVINO plugins (notably the CPU plugin) will raise NotFound
|
|
# errors for GPU_* properties, so avoid passing them unless applicable.
|
|
device = (self.device or "").upper()
|
|
if device == "GPU":
|
|
cfg.update(
|
|
{
|
|
# Throughput / stream tuning
|
|
"GPU_THROUGHPUT_STREAMS": str(self.throughput_streams),
|
|
# Threading controls may be driver/plugin-specific; keep minimal
|
|
# NOTE: We intentionally do NOT set GPU_MAX_NUM_THREADS here
|
|
# because some OpenVINO plugins / builds (and the CPU plugin
|
|
# during a fallback) do not recognize the property and will
|
|
# raise NotFound/UnsupportedProperty errors. If you need to
|
|
# tune GPU threads for a specific driver, set that externally
|
|
# or via vendor-specific tools.
|
|
}
|
|
)
|
|
else:
|
|
# Safe CPU-side defaults
|
|
cfg.update(
|
|
{
|
|
"CPU_THROUGHPUT_NUM_THREADS": str(self.max_threads),
|
|
"CPU_BIND_THREAD": "YES",
|
|
}
|
|
)
|
|
|
|
return cfg
|
|
|
|
|
|
# Global configuration and constants
|
|
AGENT_NAME = "whisper"
|
|
AGENT_DESCRIPTION = "Real-time speech transcription (OpenVINO Whisper) - converts speech to text on Intel Arc B580"
|
|
SAMPLE_RATE = 16000 # Whisper expects 16kHz
|
|
CHUNK_DURATION_MS = 100 # Reduced latency - 100ms chunks
|
|
VAD_THRESHOLD = 0.01 # Initial voice activity detection threshold
|
|
MAX_SILENCE_FRAMES = 30 # 3 seconds of silence before stopping (for overall silence)
|
|
MAX_TRAILING_SILENCE_FRAMES = 5 # 0.5 seconds of trailing silence
|
|
VAD_CONFIG = {
|
|
"energy_threshold": 0.01,
|
|
"zcr_threshold": 0.1,
|
|
"adapt_thresholds": True,
|
|
"adaptation_window": 100, # samples to consider for adaptation
|
|
"speech_freq_min": 200, # Hz
|
|
"speech_freq_max": 3000, # Hz
|
|
}
|
|
|
|
model_ids = {
|
|
"Distil-Whisper": [
|
|
"distil-whisper/distil-large-v2",
|
|
"distil-whisper/distil-medium.en",
|
|
"distil-whisper/distil-small.en",
|
|
],
|
|
"Whisper": [
|
|
"openai/whisper-large-v3",
|
|
"openai/whisper-large-v2",
|
|
"openai/whisper-large",
|
|
"openai/whisper-medium",
|
|
"openai/whisper-small",
|
|
"openai/whisper-base",
|
|
"openai/whisper-tiny",
|
|
"openai/whisper-medium.en",
|
|
"openai/whisper-small.en",
|
|
"openai/whisper-base.en",
|
|
"openai/whisper-tiny.en",
|
|
],
|
|
}
|
|
|
|
# Global model configuration
|
|
_model_type = model_ids["Distil-Whisper"]
|
|
_model_id = _model_type[0] # Use distil-large-v2 for best quality
|
|
_ov_config = OpenVINOConfig()
|
|
|
|
|
|
def setup_intel_arc_environment() -> None:
|
|
"""Configure environment variables for optimal Intel Arc B580 performance."""
|
|
os.environ["OV_GPU_CACHE_MODEL"] = "1"
|
|
os.environ["OV_GPU_ENABLE_OPENCL_THROTTLING"] = "0"
|
|
os.environ["OV_GPU_DISABLE_WINOGRAD"] = "1"
|
|
logger.info("Intel Arc B580 environment variables configured")
|
|
|
|
|
|
class OpenVINOWhisperModel:
|
|
"""OpenVINO optimized Whisper model for Intel Arc B580."""
|
|
|
|
def __init__(self, model_id: str, config: OpenVINOConfig, device: str):
|
|
self.model_id = model_id
|
|
self.config = config
|
|
self.device = device
|
|
self.model_path = Path(model_id.replace("/", "_"))
|
|
self.quantized_model_path = Path(f"{self.model_path}_quantized")
|
|
|
|
self.processor: Optional[WhisperProcessor] = None
|
|
self.ov_model: Optional[OVModelForSpeechSeq2Seq] = None
|
|
self.is_quantized = False
|
|
|
|
self._initialize_model()
|
|
|
|
def _initialize_model(self) -> None:
|
|
"""Initialize processor and OpenVINO model with robust error handling."""
|
|
logger.info(f"Initializing OpenVINO Whisper model: {self.model_id}")
|
|
|
|
try:
|
|
# Initialize processor
|
|
logger.info(
|
|
f"Loading Whisper model '{self.model_id}' on device: {self.device}"
|
|
)
|
|
self.processor = WhisperProcessor.from_pretrained(
|
|
self.model_id, use_fast=True
|
|
) # type: ignore
|
|
logger.info("Whisper processor loaded successfully")
|
|
|
|
# Export the model to OpenVINO IR if not already converted
|
|
self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(
|
|
self.model_id, export=True, device=self.device
|
|
) # type: ignore
|
|
|
|
logger.info("Whisper model exported as OpenVINO IR")
|
|
|
|
# # Try to load quantized model first if it exists
|
|
# if self.config.enable_quantization and self.quantized_model_path.exists():
|
|
# if self._try_load_quantized_model():
|
|
# return
|
|
|
|
# # Load or create FP16 model
|
|
# if self.model_path.exists():
|
|
# self._load_fp16_model()
|
|
# else:
|
|
# self._convert_model()
|
|
|
|
# # Try quantization after model is loaded and compiled
|
|
# if self.config.enable_quantization and not self.is_quantized:
|
|
# self._try_quantize_existing_model()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error initializing model: {e}")
|
|
# Fallback to basic conversion without quantization
|
|
self._fallback_initialization()
|
|
|
|
def _fallback_initialization(self) -> None:
|
|
"""Fallback initialization without quantization."""
|
|
logger.warning("Falling back to basic OpenVINO conversion without quantization")
|
|
try:
|
|
if not self.model_path.exists():
|
|
self._convert_model_basic()
|
|
self._load_fp16_model()
|
|
except Exception as e:
|
|
logger.error(f"Fallback initialization failed: {e}")
|
|
raise RuntimeError("Failed to initialize OpenVINO model") from e
|
|
|
|
def _convert_model(self) -> None:
|
|
"""Convert PyTorch model to OpenVINO format."""
|
|
logger.info(f"Converting {self.model_id} to OpenVINO format...")
|
|
|
|
try:
|
|
# Convert to OpenVINO with FP16 for Arc GPU
|
|
ov_model = OVModelForSpeechSeq2Seq.from_pretrained(
|
|
self.model_id,
|
|
ov_config=self.config.to_ov_config(),
|
|
export=True,
|
|
compile=False,
|
|
load_in_8bit=False,
|
|
)
|
|
|
|
# Enable FP16 for Intel Arc performance
|
|
ov_model.half()
|
|
ov_model.save_pretrained(self.model_path)
|
|
logger.info("Model converted and saved in FP16 format")
|
|
|
|
# Load the converted model
|
|
self.ov_model = ov_model
|
|
self._compile_model()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Model conversion failed: {e}")
|
|
raise
|
|
|
|
def _convert_model_basic(self) -> None:
|
|
"""Basic model conversion without advanced features."""
|
|
logger.info(f"Basic conversion of {self.model_id} to OpenVINO format...")
|
|
|
|
ov_model = OVModelForSpeechSeq2Seq.from_pretrained(
|
|
self.model_id, export=True, compile=False
|
|
)
|
|
|
|
ov_model.save_pretrained(self.model_path)
|
|
logger.info("Basic model conversion completed")
|
|
|
|
def _load_fp16_model(self) -> None:
|
|
"""Load existing FP16 OpenVINO model."""
|
|
logger.info("Loading existing FP16 OpenVINO model...")
|
|
try:
|
|
self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(
|
|
self.model_path, ov_config=self.config.to_ov_config(), compile=False
|
|
)
|
|
self._compile_model()
|
|
except Exception as e:
|
|
logger.error(f"Failed to load FP16 model: {e}")
|
|
# Try basic loading
|
|
self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(
|
|
self.model_path, compile=False
|
|
)
|
|
self._compile_model()
|
|
|
|
def _try_load_quantized_model(self) -> bool:
|
|
"""Try to load existing quantized model."""
|
|
try:
|
|
logger.info("Loading existing INT8 quantized model...")
|
|
self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(
|
|
self.quantized_model_path,
|
|
ov_config=self.config.to_ov_config(),
|
|
compile=False,
|
|
)
|
|
self._compile_model()
|
|
self.is_quantized = True
|
|
logger.info("Quantized model loaded successfully")
|
|
return True
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load quantized model: {e}")
|
|
return False
|
|
|
|
def _try_quantize_existing_model(self) -> None:
|
|
"""Try to quantize the existing model after it's loaded."""
|
|
if not QUANTIZATION_AVAILABLE:
|
|
logger.info("Quantization libraries not available, skipping quantization")
|
|
return
|
|
|
|
if self.ov_model is None:
|
|
logger.warning("No model loaded, cannot quantize")
|
|
return
|
|
|
|
# Check if model components are available
|
|
if not hasattr(self.ov_model, "encoder") or self.ov_model.encoder is None:
|
|
logger.warning("Model encoder not available, skipping quantization")
|
|
return
|
|
|
|
if (
|
|
not hasattr(self.ov_model, "decoder_with_past")
|
|
or self.ov_model.decoder_with_past is None
|
|
):
|
|
logger.warning(
|
|
"Model decoder_with_past not available, skipping quantization"
|
|
)
|
|
return
|
|
|
|
try:
|
|
logger.info("Attempting to quantize compiled model...")
|
|
self._quantize_model_safe()
|
|
except Exception as e:
|
|
logger.warning(f"Quantization failed, continuing with FP16 model: {e}")
|
|
|
|
def _quantize_model_safe(self) -> None:
|
|
"""Safely quantize the model with extensive error handling."""
|
|
if not nncf:
|
|
logger.info("Quantization libraries not available, skipping quantization")
|
|
return
|
|
if self.quantized_model_path.exists():
|
|
logger.info("Quantized model already exists")
|
|
return
|
|
|
|
if self.ov_model is None:
|
|
raise RuntimeError("No model to quantize")
|
|
|
|
if not self.ov_model.decoder_with_past:
|
|
raise RuntimeError("Model decoder_with_past not available")
|
|
|
|
logger.info("Creating INT8 quantized model for Intel Arc B580...")
|
|
|
|
try:
|
|
# Collect calibration data with error handling
|
|
calibration_data = self._collect_calibration_data_safe()
|
|
if not calibration_data:
|
|
logger.warning("No calibration data collected, skipping quantization")
|
|
return
|
|
|
|
# Quantize encoder
|
|
if calibration_data.get("encoder"):
|
|
logger.info("Quantizing encoder...")
|
|
quantized_encoder = nncf.quantize(
|
|
self.ov_model.encoder.model,
|
|
nncf.Dataset(calibration_data["encoder"]),
|
|
model_type=nncf.ModelType.TRANSFORMER,
|
|
subset_size=min(len(calibration_data["encoder"]), 50),
|
|
)
|
|
else:
|
|
logger.warning("No encoder calibration data, copying original encoder")
|
|
quantized_encoder = self.ov_model.encoder.model
|
|
|
|
# Quantize decoder
|
|
if calibration_data.get("decoder"):
|
|
logger.info("Quantizing decoder with past...")
|
|
quantized_decoder = nncf.quantize(
|
|
self.ov_model.decoder_with_past.model,
|
|
nncf.Dataset(calibration_data["decoder"]),
|
|
model_type=nncf.ModelType.TRANSFORMER,
|
|
subset_size=min(len(calibration_data["decoder"]), 50),
|
|
)
|
|
else:
|
|
logger.warning("No decoder calibration data, copying original decoder")
|
|
quantized_decoder = self.ov_model.decoder_with_past.model
|
|
|
|
# Save quantized models
|
|
self.quantized_model_path.mkdir(parents=True, exist_ok=True)
|
|
ov.save_model(
|
|
quantized_encoder,
|
|
self.quantized_model_path / "openvino_encoder_model.xml",
|
|
) # type: ignore
|
|
ov.save_model(
|
|
quantized_decoder,
|
|
self.quantized_model_path / "openvino_decoder_with_past_model.xml",
|
|
) # type: ignore
|
|
|
|
# Copy remaining files
|
|
self._copy_model_files()
|
|
|
|
# Clean up
|
|
del quantized_encoder, quantized_decoder, calibration_data
|
|
gc.collect()
|
|
|
|
# Load quantized model
|
|
if self._try_load_quantized_model():
|
|
logger.info("Quantization completed successfully")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Quantization failed: {e}")
|
|
# Clean up partial quantization
|
|
if self.quantized_model_path.exists():
|
|
shutil.rmtree(self.quantized_model_path, ignore_errors=True)
|
|
|
|
def _collect_calibration_data_safe(
|
|
self, dataset_size: int = 20
|
|
) -> Dict[str, CalibrationData]:
|
|
"""Safely collect calibration data with extensive error handling."""
|
|
if self.ov_model is None or self.processor is None:
|
|
return {}
|
|
|
|
logger.info(f"Collecting calibration data ({dataset_size} samples)...")
|
|
|
|
# Check model components
|
|
if not self.ov_model.encoder:
|
|
logger.warning("Encoder not available for calibration")
|
|
return {}
|
|
|
|
if not self.ov_model.decoder_with_past:
|
|
logger.warning("Decoder with past not available for calibration")
|
|
return {}
|
|
|
|
# Check if requests are available
|
|
if (
|
|
not hasattr(self.ov_model.encoder, "request")
|
|
or self.ov_model.encoder.request is None
|
|
):
|
|
logger.warning("Encoder request not available for calibration")
|
|
return {}
|
|
|
|
if (
|
|
not hasattr(self.ov_model.decoder_with_past, "request")
|
|
or self.ov_model.decoder_with_past.request is None
|
|
):
|
|
logger.warning("Decoder request not available for calibration")
|
|
return {}
|
|
|
|
# Setup data collection
|
|
original_encoder_request = self.ov_model.encoder.request
|
|
original_decoder_request = self.ov_model.decoder_with_past.request
|
|
|
|
encoder_data: CalibrationData = []
|
|
decoder_data: CalibrationData = []
|
|
|
|
try:
|
|
self.ov_model.encoder.request = InferRequestWrapper(
|
|
original_encoder_request, encoder_data
|
|
)
|
|
self.ov_model.decoder_with_past.request = InferRequestWrapper(
|
|
original_decoder_request, decoder_data
|
|
)
|
|
|
|
# Generate synthetic calibration data instead of loading dataset
|
|
logger.info("Generating synthetic calibration data...")
|
|
for i in range(dataset_size):
|
|
try:
|
|
# Generate random audio similar to speech
|
|
duration = 2.0 + np.random.random() * 3.0 # 2-5 seconds
|
|
synthetic_audio = (
|
|
np.random.randn(int(SAMPLE_RATE * duration)).astype(np.float32)
|
|
* 0.1
|
|
)
|
|
|
|
inputs: Any = self.processor(
|
|
synthetic_audio, sampling_rate=SAMPLE_RATE, return_tensors="pt"
|
|
)
|
|
|
|
# Run inference to collect calibration data
|
|
generated_ids = self.ov_model.generate(
|
|
inputs.input_features, max_new_tokens=10
|
|
)
|
|
|
|
if i % 5 == 0:
|
|
logger.debug(
|
|
f"Generated calibration sample {i + 1}/{dataset_size}"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to generate calibration sample {i}: {e}")
|
|
continue
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during calibration data collection: {e}")
|
|
finally:
|
|
# Restore original requests
|
|
try:
|
|
self.ov_model.encoder.request = original_encoder_request
|
|
self.ov_model.decoder_with_past.request = original_decoder_request
|
|
except Exception as e:
|
|
logger.warning(f"Failed to restore original requests: {e}")
|
|
|
|
result = {}
|
|
if encoder_data:
|
|
result["encoder"] = encoder_data
|
|
logger.info(f"Collected {len(encoder_data)} encoder calibration samples")
|
|
if decoder_data:
|
|
result["decoder"] = decoder_data
|
|
logger.info(f"Collected {len(decoder_data)} decoder calibration samples")
|
|
|
|
return result
|
|
|
|
def _copy_model_files(self) -> None:
|
|
"""Copy necessary model files for quantized model."""
|
|
try:
|
|
# Copy config and first-step decoder
|
|
if (self.model_path / "config.json").exists():
|
|
shutil.copy(
|
|
self.model_path / "config.json",
|
|
self.quantized_model_path / "config.json",
|
|
)
|
|
|
|
decoder_xml = self.model_path / "openvino_decoder_model.xml"
|
|
decoder_bin = self.model_path / "openvino_decoder_model.bin"
|
|
|
|
if decoder_xml.exists():
|
|
shutil.copy(
|
|
decoder_xml,
|
|
self.quantized_model_path / "openvino_decoder_model.xml",
|
|
)
|
|
if decoder_bin.exists():
|
|
shutil.copy(
|
|
decoder_bin,
|
|
self.quantized_model_path / "openvino_decoder_model.bin",
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to copy some model files: {e}")
|
|
|
|
def _compile_model(self) -> None:
|
|
"""Compile model for Intel Arc B580."""
|
|
if self.ov_model is None:
|
|
raise RuntimeError("Model not loaded")
|
|
|
|
logger.info("Compiling model for Intel Arc B580...")
|
|
try:
|
|
self.ov_model.to(self.config.device)
|
|
self.ov_model.compile()
|
|
|
|
# Warmup for optimal performance
|
|
self._warmup_model()
|
|
logger.info("Model compiled and warmed up successfully")
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to compile for {self.config.device}, attempting safe CPU fallback: {e}"
|
|
)
|
|
# Fallback: reload/compile model with a CPU-only ov_config to avoid
|
|
# passing GPU-specific properties to the CPU plugin which can raise
|
|
# NotFound/UnsupportedProperty exceptions.
|
|
try:
|
|
cpu_cfg = (
|
|
OpenVINOConfig(**{**self.config.model_dump()})
|
|
if hasattr(self.config, "model_dump")
|
|
else self.config
|
|
)
|
|
# Ensure device is CPU and use conservative CPU threading options
|
|
cpu_cfg = OpenVINOConfig(
|
|
device="CPU",
|
|
cache_dir=self.config.cache_dir,
|
|
enable_quantization=self.config.enable_quantization,
|
|
throughput_streams=1,
|
|
max_threads=self.config.max_threads,
|
|
)
|
|
|
|
logger.info(
|
|
"Reloading model with CPU-only OpenVINO config for safe compilation"
|
|
)
|
|
# Try to reload using the existing saved model path if possible
|
|
try:
|
|
self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(
|
|
self.model_path, ov_config=cpu_cfg.to_ov_config(), compile=False
|
|
)
|
|
except Exception:
|
|
# If loading the saved model failed, try loading without ov_config
|
|
self.ov_model = OVModelForSpeechSeq2Seq.from_pretrained(
|
|
self.model_path, compile=False
|
|
)
|
|
|
|
# Compile on CPU
|
|
self.ov_model.to("CPU")
|
|
# Provide CPU-only ov_config if supported
|
|
try:
|
|
self.ov_model.compile()
|
|
except Exception as compile_cpu_e:
|
|
logger.warning(
|
|
f"CPU compile with CPU ov_config failed, retrying default compile: {compile_cpu_e}"
|
|
)
|
|
self.ov_model.compile()
|
|
|
|
self._warmup_model()
|
|
logger.info("Model compiled for CPU successfully")
|
|
except Exception as cpu_e:
|
|
logger.error(f"Failed to compile for CPU as well: {cpu_e}")
|
|
raise
|
|
|
|
def _warmup_model(self) -> None:
|
|
"""Warmup model for consistent GPU performance."""
|
|
if self.ov_model is None or self.processor is None:
|
|
return
|
|
|
|
try:
|
|
logger.info("Warming up model...")
|
|
dummy_audio = np.random.randn(SAMPLE_RATE).astype(np.float32) # 1 second
|
|
dummy_features = self.processor(
|
|
dummy_audio, sampling_rate=SAMPLE_RATE, return_tensors="pt"
|
|
).input_features
|
|
|
|
# Run warmup iterations
|
|
for i in range(3):
|
|
_ = self.ov_model.generate(dummy_features, max_new_tokens=10)
|
|
if i == 0:
|
|
logger.debug("First warmup iteration completed")
|
|
except Exception as e:
|
|
logger.warning(f"Model warmup failed: {e}")
|
|
|
|
def decode(
|
|
self, token_ids: torch.Tensor, skip_special_tokens: bool = True
|
|
) -> List[str]:
|
|
"""Decode token IDs to text."""
|
|
if self.processor is None:
|
|
raise RuntimeError("Processor not initialized")
|
|
|
|
return self.processor.batch_decode(
|
|
token_ids, skip_special_tokens=skip_special_tokens
|
|
)
|
|
|
|
|
|
# Global model instance with deferred loading
|
|
_whisper_model: Optional[OpenVINOWhisperModel] = None
|
|
_audio_processors: Dict[str, "OptimizedAudioProcessor"] = {}
|
|
_send_chat_func: Optional[Callable[[str], Awaitable[None]]] = None
|
|
|
|
# Model loading status for video display
|
|
_model_loading_status: str = "Not loaded"
|
|
_model_loading_progress: float = 0.0
|
|
|
|
|
|
def _ensure_model_loaded(device: str = _device) -> OpenVINOWhisperModel:
|
|
"""Ensure the global model is loaded."""
|
|
global _whisper_model, _model_loading_status, _model_loading_progress
|
|
if _whisper_model is None:
|
|
setup_intel_arc_environment()
|
|
logger.info(f"Loading OpenVINO Whisper model: {_model_id}")
|
|
_model_loading_status = "Loading model..."
|
|
_model_loading_progress = 0.1
|
|
|
|
_whisper_model = OpenVINOWhisperModel(
|
|
model_id=_model_id, config=_ov_config, device=device
|
|
)
|
|
_model_loading_status = "Model loaded successfully"
|
|
_model_loading_progress = 1.0
|
|
logger.info("OpenVINO Whisper model loaded successfully")
|
|
return _whisper_model
|
|
|
|
|
|
def extract_input_features(audio_array: AudioArray, sampling_rate: int) -> torch.Tensor:
|
|
"""Extract input features from audio array optimized for OpenVINO."""
|
|
ov_model = _ensure_model_loaded()
|
|
if ov_model.processor is None:
|
|
raise RuntimeError("Processor not initialized")
|
|
|
|
inputs = ov_model.processor(
|
|
audio_array,
|
|
sampling_rate=sampling_rate,
|
|
return_tensors="pt",
|
|
)
|
|
return inputs.input_features
|
|
|
|
class VoiceActivityDetector(BaseModel):
|
|
has_speech: bool = False
|
|
energy: float = 0.0
|
|
zcr: float = 0.0
|
|
centroid: float = 0.0
|
|
|
|
def simple_robust_vad(
|
|
audio_data: AudioArray,
|
|
energy_threshold: float = 0.01,
|
|
sample_rate: int = SAMPLE_RATE,
|
|
) -> VoiceActivityDetector:
|
|
"""Simplified robust VAD."""
|
|
|
|
# Energy-based detection (RMS)
|
|
energy = np.sqrt(np.mean(audio_data**2))
|
|
|
|
# Zero-crossing rate
|
|
signs = np.sign(audio_data)
|
|
signs[signs == 0] = 1
|
|
zcr = np.sum(np.abs(np.diff(signs))) / (2 * len(audio_data))
|
|
|
|
# Relaxed speech detection - use OR instead of AND for some conditions
|
|
has_speech = (
|
|
energy > energy_threshold or # Primary condition
|
|
(energy > energy_threshold * 0.5 and zcr > 0.05) # Secondary condition
|
|
)
|
|
|
|
return VoiceActivityDetector(has_speech=has_speech, energy=energy, zcr=zcr, centroid=0.0)
|
|
|
|
def enhanced_vad(
|
|
audio_data: AudioArray,
|
|
energy_threshold: float = 0.01,
|
|
zcr_threshold: float = 0.1,
|
|
sample_rate: int = SAMPLE_RATE,
|
|
) -> VoiceActivityDetector:
|
|
"""Enhanced VAD using multiple features.
|
|
|
|
Returns:
|
|
tuple: (has_speech, metrics_dict)
|
|
"""
|
|
# Energy-based detection
|
|
energy = np.sqrt(np.mean(audio_data**2))
|
|
|
|
# Zero-crossing rate for speech detection
|
|
signs = np.sign(audio_data)
|
|
signs[signs == 0] = 1 # Handle zeros
|
|
zcr = np.sum(np.abs(np.diff(signs))) / (2 * len(audio_data))
|
|
|
|
# Spectral centroid for voice vs noise discrimination
|
|
fft = np.fft.rfft(audio_data)
|
|
magnitude = np.abs(fft)
|
|
freqs = np.fft.rfftfreq(len(audio_data), 1 / sample_rate)
|
|
|
|
if np.sum(magnitude) > 0:
|
|
centroid = np.sum(freqs * magnitude) / np.sum(magnitude)
|
|
else:
|
|
centroid = 0
|
|
|
|
# Combined decision with configurable thresholds
|
|
has_speech = (
|
|
energy > energy_threshold
|
|
and zcr > zcr_threshold
|
|
and 200 < centroid < 3000 # Human speech frequency range
|
|
)
|
|
|
|
return VoiceActivityDetector(has_speech=has_speech, energy=energy, zcr=zcr, centroid=centroid)
|
|
|
|
|
|
class OptimizedAudioProcessor:
|
|
"""Optimized audio processor for Intel Arc B580 with reduced latency."""
|
|
|
|
def __init__(
|
|
self, peer_name: str, send_chat_func: Callable[[str], Awaitable[None]]
|
|
):
|
|
self.peer_name = peer_name
|
|
self.send_chat_func = send_chat_func
|
|
self.sample_rate = SAMPLE_RATE
|
|
|
|
# Optimized buffering parameters
|
|
self.chunk_size = int(self.sample_rate * CHUNK_DURATION_MS / 1000)
|
|
self.buffer_size = self.chunk_size * 50
|
|
|
|
# Circular buffer for zero-copy operations
|
|
self.audio_buffer = np.zeros(self.buffer_size, dtype=np.float32)
|
|
self.write_ptr = 0
|
|
self.read_ptr = 0
|
|
|
|
# Enhanced VAD parameters with EMA for noise adaptation
|
|
self.vad_energy_threshold: float = VAD_THRESHOLD
|
|
self.vad_zcr_threshold: float = 0.1
|
|
self.noise_energy_ema: float = 0.001 # Initial noise estimate
|
|
self.noise_zcr_ema: float = 0.05
|
|
self.ema_alpha: float = 0.05 # Adaptation rate
|
|
self.energy_multiplier: float = 3.0 # Threshold = noise_ema * multiplier
|
|
self.zcr_multiplier: float = 2.0
|
|
self.min_energy_threshold: float = 0.005
|
|
self.min_zcr_threshold: float = 0.05
|
|
|
|
self.silence_frames: int = 0
|
|
self.max_silence_frames: int = MAX_SILENCE_FRAMES
|
|
self.max_trailing_silence_frames: int = MAX_TRAILING_SILENCE_FRAMES
|
|
|
|
# VAD metrics tracking for adaptive thresholds
|
|
self.vad_metrics_history: list[VoiceActivityDetector] = []
|
|
self.adaptive_threshold_enabled: bool = True
|
|
|
|
# Processing state
|
|
self.current_phrase_audio = np.array([], dtype=np.float32)
|
|
self.transcription_history: List[TranscriptionHistoryItem] = []
|
|
self.last_activity_time = time.time()
|
|
|
|
# Async processing
|
|
self.processing_queue: asyncio.Queue[AudioQueueItem] = asyncio.Queue(maxsize=10)
|
|
self.is_running = True
|
|
|
|
# Start async processing task
|
|
try:
|
|
self.main_loop = asyncio.get_running_loop()
|
|
asyncio.create_task(self._async_processing_loop())
|
|
logger.info(f"Started async processing for {self.peer_name}")
|
|
except RuntimeError:
|
|
# Fallback to thread-based processing
|
|
self.main_loop = None
|
|
self.processor_thread = threading.Thread(
|
|
target=self._thread_processing_loop, daemon=True
|
|
)
|
|
self.processor_thread.start()
|
|
logger.warning(f"Using thread-based processing for {self.peer_name}")
|
|
|
|
logger.info(f"OptimizedAudioProcessor initialized for {self.peer_name}")
|
|
|
|
def add_audio_data(self, audio_data: AudioArray) -> None:
|
|
"""Add audio data with enhanced Voice Activity Detection, preventing leading silence."""
|
|
if not self.is_running or len(audio_data) == 0:
|
|
logger.error("Processor not running or empty audio data")
|
|
return
|
|
|
|
vad_metrics = simple_robust_vad(
|
|
audio_data,
|
|
energy_threshold=0.01, # self.vad_energy_threshold,
|
|
sample_rate=self.sample_rate,
|
|
)
|
|
# Use enhanced VAD
|
|
# vad_metrics = enhanced_vad(
|
|
# audio_data,
|
|
# energy_threshold=self.vad_energy_threshold,
|
|
# zcr_threshold=self.vad_zcr_threshold,
|
|
# sample_rate=self.sample_rate,
|
|
# )
|
|
|
|
# Update noise estimates if no speech
|
|
if not vad_metrics.has_speech:
|
|
self.noise_energy_ema = (
|
|
self.ema_alpha * vad_metrics.energy
|
|
+ (1 - self.ema_alpha) * self.noise_energy_ema
|
|
)
|
|
# self.noise_zcr_ema = (
|
|
# self.ema_alpha * vad_metrics.zcr
|
|
# + (1 - self.ema_alpha) * self.noise_zcr_ema
|
|
# )
|
|
|
|
# Adapt thresholds
|
|
self.vad_energy_threshold = max(
|
|
# self.noise_energy_ema * self.energy_multiplier, self.min_energy_threshold
|
|
self.noise_energy_ema * 2.0, 0.005
|
|
)
|
|
# self.vad_zcr_threshold = max(
|
|
# self.noise_zcr_ema * self.zcr_multiplier, self.min_zcr_threshold
|
|
# )
|
|
|
|
# Store metrics for additional tracking
|
|
self.vad_metrics_history.append(vad_metrics)
|
|
if len(self.vad_metrics_history) > 100:
|
|
self.vad_metrics_history.pop(0)
|
|
|
|
# Log detailed VAD decision occasionally for debugging
|
|
if len(self.vad_metrics_history) % 50 == 0:
|
|
logger.debug(
|
|
f"VAD metrics for {self.peer_name}: "
|
|
f"energy={vad_metrics.energy:.4f}, "
|
|
f"zcr={vad_metrics.zcr:.4f}, "
|
|
f"centroid={vad_metrics.centroid:.1f}Hz, "
|
|
f"speech={vad_metrics.has_speech}, "
|
|
f"noise_energy_ema={self.noise_energy_ema:.4f}, "
|
|
f"threshold={self.vad_energy_threshold:.4f}"
|
|
)
|
|
|
|
# Decision logic to avoid leading silence and limit trailing silence
|
|
if vad_metrics.has_speech:
|
|
logger.info(f"Speech detected for {self.peer_name}: {vad_metrics} (current phrase length: {len(self.current_phrase_audio) / self.sample_rate})")
|
|
self.silence_frames = 0
|
|
self.last_activity_time = time.time()
|
|
self._add_to_circular_buffer(audio_data)
|
|
elif (
|
|
len(self.current_phrase_audio) > 0
|
|
and self.silence_frames < self.max_trailing_silence_frames
|
|
):
|
|
logger.info(f"Trailing silence accepted for {self.peer_name}")
|
|
self.silence_frames += 1
|
|
self._add_to_circular_buffer(audio_data)
|
|
else:
|
|
if (self.silence_frames % 10 == 0) and (self.silence_frames > 0):
|
|
logger.info(
|
|
f"VAD metrics for {self.peer_name}: "
|
|
f"energy={vad_metrics.energy:.4f}, "
|
|
f"zcr={vad_metrics.zcr:.4f}, "
|
|
f"centroid={vad_metrics.centroid:.1f}Hz, "
|
|
f"speech={vad_metrics.has_speech}, "
|
|
f"noise_energy_ema={self.noise_energy_ema:.4f}, "
|
|
f"threshold={self.vad_energy_threshold:.4f}"
|
|
)
|
|
self.silence_frames += 1
|
|
if (
|
|
self.silence_frames > self.max_silence_frames
|
|
and len(self.current_phrase_audio) > 0
|
|
):
|
|
self._queue_final_transcription()
|
|
return # Drop pure silence chunks (leading or excessive trailing)
|
|
|
|
# Check if we should process
|
|
if self._available_samples() >= self.chunk_size:
|
|
self._queue_for_processing()
|
|
|
|
def _add_to_circular_buffer(self, audio_data: AudioArray) -> None:
|
|
"""Add data to circular buffer efficiently."""
|
|
chunk_len = len(audio_data)
|
|
|
|
if self.write_ptr + chunk_len <= self.buffer_size:
|
|
# Simple case - no wraparound
|
|
self.audio_buffer[self.write_ptr : self.write_ptr + chunk_len] = audio_data
|
|
else:
|
|
# Wraparound case
|
|
first_part = self.buffer_size - self.write_ptr
|
|
self.audio_buffer[self.write_ptr :] = audio_data[:first_part]
|
|
self.audio_buffer[: chunk_len - first_part] = audio_data[first_part:]
|
|
|
|
self.write_ptr = (self.write_ptr + chunk_len) % self.buffer_size
|
|
|
|
def _available_samples(self) -> int:
|
|
"""Calculate available samples in circular buffer."""
|
|
if self.write_ptr >= self.read_ptr:
|
|
return self.write_ptr - self.read_ptr
|
|
else:
|
|
return self.buffer_size - self.read_ptr + self.write_ptr
|
|
|
|
def _extract_chunk(self, size: int) -> AudioArray:
|
|
"""Extract chunk from circular buffer."""
|
|
if self.read_ptr + size <= self.buffer_size:
|
|
chunk = self.audio_buffer[self.read_ptr : self.read_ptr + size].copy()
|
|
else:
|
|
first_part = self.buffer_size - self.read_ptr
|
|
chunk = np.concatenate(
|
|
[
|
|
self.audio_buffer[self.read_ptr :],
|
|
self.audio_buffer[: size - first_part],
|
|
]
|
|
)
|
|
|
|
self.read_ptr = (self.read_ptr + size) % self.buffer_size
|
|
return chunk.astype(np.float32)
|
|
|
|
def _queue_for_processing(self) -> None:
|
|
"""Queue audio chunk for processing."""
|
|
available = self._available_samples()
|
|
if available < self.chunk_size:
|
|
return
|
|
|
|
# Extract chunk for processing
|
|
chunk = self._extract_chunk(self.chunk_size)
|
|
|
|
# Create queue item
|
|
queue_item = AudioQueueItem(audio=chunk, timestamp=time.time())
|
|
|
|
# Queue for processing
|
|
if self.main_loop:
|
|
try:
|
|
self.processing_queue.put_nowait(queue_item)
|
|
except asyncio.QueueFull:
|
|
logger.warning(
|
|
f"Processing queue full for {self.peer_name}, dropping chunk"
|
|
)
|
|
else:
|
|
# Thread-based fallback
|
|
try:
|
|
threading_queue = getattr(self, "_threading_queue", None)
|
|
if threading_queue:
|
|
threading_queue.put_nowait(queue_item)
|
|
except Exception as e:
|
|
logger.warning(f"Threading queue issue for {self.peer_name}: {e}")
|
|
|
|
def _queue_final_transcription(self) -> None:
|
|
"""Queue final transcription of current phrase."""
|
|
if (
|
|
len(self.current_phrase_audio) > self.sample_rate * 0.5
|
|
): # At least 0.5 seconds
|
|
if self.main_loop:
|
|
logger.info(f"Queueing final transcription for {self.peer_name}")
|
|
asyncio.create_task(
|
|
self._transcribe_and_send(
|
|
self.current_phrase_audio.copy(), is_final=True
|
|
)
|
|
)
|
|
|
|
self.current_phrase_audio = np.array([], dtype=np.float32)
|
|
|
|
async def _async_processing_loop(self) -> None:
|
|
"""Async processing loop for audio chunks."""
|
|
logger.info(f"Started async processing loop for {self.peer_name}")
|
|
|
|
while self.is_running:
|
|
try:
|
|
# Get audio chunk
|
|
audio_item = await asyncio.wait_for(
|
|
self.processing_queue.get(), timeout=1.0
|
|
)
|
|
|
|
# Add to current phrase
|
|
self.current_phrase_audio = np.concatenate(
|
|
[self.current_phrase_audio, audio_item.audio]
|
|
)
|
|
|
|
# Check if we should transcribe
|
|
phrase_duration = len(self.current_phrase_audio) / self.sample_rate
|
|
|
|
if phrase_duration >= 1.0: # Transcribe every 1 second
|
|
logger.info(f"Transcribing for {self.peer_name} (1s interval)")
|
|
await self._transcribe_and_send(
|
|
self.current_phrase_audio.copy(), is_final=False
|
|
)
|
|
|
|
except asyncio.TimeoutError:
|
|
# Check for final transcription on timeout
|
|
if (
|
|
len(self.current_phrase_audio) > 0
|
|
and time.time() - self.last_activity_time > 2.0
|
|
):
|
|
logger.info(f"Final transcription timeout for {self.peer_name} (asyncio.TimeoutError)")
|
|
await self._transcribe_and_send(
|
|
self.current_phrase_audio.copy(), is_final=True
|
|
)
|
|
self.current_phrase_audio = np.array([], dtype=np.float32)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in async processing loop for {self.peer_name}: {e}"
|
|
)
|
|
|
|
logger.info(f"Async processing loop ended for {self.peer_name}")
|
|
|
|
def _thread_processing_loop(self) -> None:
|
|
"""Thread-based processing loop fallback."""
|
|
self._threading_queue: Queue[AudioQueueItem] = Queue(maxsize=10)
|
|
logger.info(f"Started thread processing loop for {self.peer_name}")
|
|
|
|
while self.is_running:
|
|
try:
|
|
audio_item = self._threading_queue.get(timeout=1.0)
|
|
|
|
# Add to current phrase
|
|
self.current_phrase_audio = np.concatenate(
|
|
[self.current_phrase_audio, audio_item.audio]
|
|
)
|
|
|
|
# Check if we should transcribe
|
|
phrase_duration = len(self.current_phrase_audio) / self.sample_rate
|
|
|
|
if phrase_duration >= 1.0:
|
|
if self.main_loop:
|
|
logger.info(f"Transcribing from thread for {self.peer_name} (_thread_processing_loop > 1s interval)")
|
|
asyncio.run_coroutine_threadsafe(
|
|
self._transcribe_and_send(
|
|
self.current_phrase_audio.copy(), is_final=False
|
|
),
|
|
self.main_loop,
|
|
)
|
|
|
|
except Empty:
|
|
# Check for final transcription
|
|
if (
|
|
len(self.current_phrase_audio) > 0
|
|
and time.time() - self.last_activity_time > 2.0
|
|
):
|
|
if self.main_loop:
|
|
logger.info(f"Final transcription from thread for {self.peer_name}")
|
|
asyncio.run_coroutine_threadsafe(
|
|
self._transcribe_and_send(
|
|
self.current_phrase_audio.copy(), is_final=True
|
|
),
|
|
self.main_loop,
|
|
)
|
|
self.current_phrase_audio = np.array([], dtype=np.float32)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in thread processing loop for {self.peer_name}: {e}"
|
|
)
|
|
|
|
async def _transcribe_and_send(
|
|
self, audio_array: AudioArray, is_final: bool, language: str = "en"
|
|
) -> None:
|
|
"""
|
|
Transcribe raw numpy audio data using OpenVINO Whisper.
|
|
|
|
Parameters:
|
|
- audio_array: 1D numpy array containing mono PCM data at 16 kHz.
|
|
- is_final: whether this is a final transcription (True) or interim (False)
|
|
- language: language code for transcription (default 'en' for English)
|
|
"""
|
|
if audio_array.ndim != 1:
|
|
raise ValueError("Expected mono audio as a 1D numpy array.")
|
|
|
|
transcription_start = time.time()
|
|
transcription_type = "final" if is_final else "streaming"
|
|
|
|
try:
|
|
audio_duration = len(audio_array) / self.sample_rate
|
|
|
|
# Skip very short audio
|
|
if audio_duration < 0.3:
|
|
logger.debug(
|
|
f"Skipping {transcription_type} transcription: too short ({audio_duration:.2f}s)"
|
|
)
|
|
return
|
|
|
|
# Audio quality check
|
|
audio_rms = np.sqrt(np.mean(audio_array**2))
|
|
if audio_rms < 0.001:
|
|
logger.debug(
|
|
f"Skipping {transcription_type} transcription: too quiet (RMS: {audio_rms:.6f})"
|
|
)
|
|
return
|
|
|
|
logger.info(
|
|
f"🎬 OpenVINO transcription ({transcription_type}) started: {audio_duration:.2f}s, RMS: {audio_rms:.4f}"
|
|
)
|
|
|
|
# Extract features for OpenVINO
|
|
input_features = extract_input_features(audio_array, self.sample_rate)
|
|
|
|
# logger.info(f"Features extracted for OpenVINO: {input_features.shape}")
|
|
# GPU inference with OpenVINO
|
|
ov_model = _ensure_model_loaded()
|
|
generation_config = GenerationConfig(
|
|
# Quality parameters
|
|
max_length=448,
|
|
num_beams=6, # Increase from 4 for better quality (slower)
|
|
# temperature=0.8, # Set to 0 for deterministic, best-guess output -- not supported in OpenVINO
|
|
# "length_penalty": 1.0, # Adjust to favor longer/shorter sequences
|
|
no_repeat_ngram_size=3,
|
|
# Confidence and alternatives
|
|
# output_score=True, # Get token probabilities -- not supported in OpenVINO
|
|
# return_dict_in_generate=True, # Get detailed output -- not supported in OpenVINO
|
|
output_attentions=False, # Set True if you need attention weights
|
|
num_return_sequences=1, # Set >1 to get alternatives
|
|
max_new_tokens=128, # Limit response length
|
|
# Task settings: Cannot specify `task` or `language` for an English-only model.
|
|
# # If the model is intended to be multilingual, pass `is_multilingual=True` to generate,
|
|
# # or update the generation config.ValueError: Cannot specify `task` or `language` for
|
|
# # an English-only model. If the model is intended to be multilingual, pass
|
|
# # `is_multilingual=True` to generate, or update the generation config
|
|
# language=language,
|
|
# task="transcribe",
|
|
# Performance vs quality tradeoffs
|
|
early_stopping=True, # Stop when EOS token is found
|
|
use_cache=True, # Speed up decoding
|
|
# Threshold parameters
|
|
# logprob_threshold=-1.0, # Filter tokens below this log probability -- not supported in OpenVINO
|
|
compression_ratio_threshold=2.4, # Reject if compression ratio too high
|
|
# no_speech_threshold=0.6, # Threshold for detecting non-speech -- not supported in OpenVINO
|
|
)
|
|
|
|
generation_output = ov_model.ov_model.generate( # type: ignore
|
|
input_features, generation_config=generation_config
|
|
)
|
|
|
|
generated_ids = generation_output
|
|
|
|
# # Extract transcription and scores
|
|
# generated_ids = generation_output.sequences
|
|
|
|
# # Get confidence scores if available
|
|
# if hasattr(generation_output, "scores") and generation_output.scores:
|
|
# # Calculate average confidence
|
|
# token_probs = []
|
|
# for score in generation_output.scores:
|
|
# probs = torch.nn.functional.softmax(score, dim=-1)
|
|
# max_probs = torch.max(probs, dim=-1).values
|
|
# token_probs.extend(max_probs.cpu().numpy())
|
|
|
|
# avg_confidence = np.mean(token_probs) if token_probs else 0.0
|
|
# min_confidence = np.min(token_probs) if token_probs else 0.0
|
|
# else:
|
|
# avg_confidence = min_confidence = 0.0
|
|
|
|
# Decode text
|
|
transcription: str = ov_model.processor.batch_decode(
|
|
generated_ids, skip_special_tokens=True
|
|
)[0].strip()
|
|
|
|
transcription_time = time.time() - transcription_start
|
|
|
|
# Apply confidence threshold
|
|
# confidence_threshold = 0.7 # Adjustable
|
|
# if avg_confidence < confidence_threshold:
|
|
# logger.warning(
|
|
# f"Low confidence transcription ({avg_confidence:.2f}): '{transcription}'"
|
|
# )
|
|
# # Optionally retry with different parameters or skip
|
|
# if avg_confidence < 0.5:
|
|
# return # Skip very low confidence
|
|
|
|
# # Include confidence in message
|
|
# confidence_indicator = (
|
|
# "✓" if avg_confidence > 0.8 else "?" if avg_confidence < 0.6 else ""
|
|
# )
|
|
message = f"{self.peer_name}: {transcription}" # {confidence_indicator}[{avg_confidence:.1%}]"
|
|
|
|
await self.send_chat_func(message)
|
|
|
|
if transcription:
|
|
# Create message with timing
|
|
status_marker = "⚡" if is_final else "🎤"
|
|
type_marker = "" if is_final else " [streaming]"
|
|
timing_info = f" (🚀 {transcription_time:.2f}s)"
|
|
|
|
message = f"{status_marker} {self.peer_name}{type_marker}: {transcription}{timing_info}"
|
|
|
|
# Avoid duplicates
|
|
if not self._is_duplicate(transcription):
|
|
await self.send_chat_func(message)
|
|
|
|
# Update history
|
|
self.transcription_history.append(
|
|
TranscriptionHistoryItem(
|
|
message=message, timestamp=time.time(), is_final=is_final
|
|
)
|
|
)
|
|
|
|
# Limit history
|
|
if len(self.transcription_history) > 10:
|
|
self.transcription_history.pop(0)
|
|
|
|
logger.info(
|
|
f"✅ OpenVINO transcription ({transcription_type}): '{transcription}' ({transcription_time:.3f}s)"
|
|
)
|
|
else:
|
|
logger.debug(
|
|
f"Skipping duplicate {transcription_type} transcription: '{transcription}'"
|
|
)
|
|
else:
|
|
logger.debug(
|
|
f"Empty or too short transcription result: '{transcription}'"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in OpenVINO {transcription_type} transcription: {e}",
|
|
exc_info=True,
|
|
)
|
|
|
|
def _is_duplicate(self, text: str) -> bool:
|
|
"""Check if transcription is duplicate of recent ones."""
|
|
recent_texts = [
|
|
h.message.split(": ", 1)[-1].split(" (🚀")[0]
|
|
for h in self.transcription_history[-3:]
|
|
]
|
|
return text in recent_texts
|
|
|
|
def shutdown(self) -> None:
|
|
"""Shutdown the audio processor."""
|
|
logger.info(f"Shutting down OptimizedAudioProcessor for {self.peer_name}...")
|
|
self.is_running = False
|
|
|
|
# Final transcription if needed
|
|
if len(self.current_phrase_audio) > 0:
|
|
if self.main_loop:
|
|
asyncio.create_task(
|
|
self._transcribe_and_send(
|
|
self.current_phrase_audio.copy(), is_final=True
|
|
)
|
|
)
|
|
|
|
# Cleanup thread if exists
|
|
if hasattr(self, "processor_thread") and self.processor_thread.is_alive():
|
|
self.processor_thread.join(timeout=2.0)
|
|
|
|
logger.info(f"OptimizedAudioProcessor shutdown complete for {self.peer_name}")
|
|
|
|
|
|
async def calibrate_vad(
|
|
audio_processor: OptimizedAudioProcessor, calibration_duration: float = 2.0
|
|
) -> None:
|
|
"""Calibrate VAD thresholds based on ambient noise (placeholder for initial calibration)."""
|
|
logger.info(f"Calibrating VAD for {audio_processor.peer_name}...")
|
|
|
|
# Since EMA adapts on the fly, initial calibration can be minimal or skipped.
|
|
# For better initial estimate, assume first few chunks are noise (handled in add_audio_data).
|
|
await asyncio.sleep(calibration_duration)
|
|
logger.info(
|
|
f"VAD initial calibration complete: energy_threshold={audio_processor.vad_energy_threshold:.4f}"
|
|
)
|
|
|
|
|
|
class MediaClock:
|
|
"""Simple monotonic clock for media tracks."""
|
|
|
|
def __init__(self) -> None:
|
|
self.t0 = perf_counter()
|
|
|
|
def now(self) -> float:
|
|
return perf_counter() - self.t0
|
|
|
|
|
|
class WaveformVideoTrack(MediaStreamTrack):
|
|
"""Video track that renders a live waveform of the incoming audio.
|
|
|
|
The track reads the most-active `OptimizedAudioProcessor` in
|
|
`_audio_processors` and renders the last ~2s of its `current_phrase_audio`.
|
|
If no audio is available, the track will display a "No audio" message.
|
|
"""
|
|
|
|
kind = "video"
|
|
|
|
def __init__(
|
|
self, session_name: str, width: int = 640, height: int = 480, fps: int = 15
|
|
) -> None:
|
|
super().__init__()
|
|
self.session_name = session_name
|
|
self.width = int(width)
|
|
self.height = int(height)
|
|
self.fps = int(fps)
|
|
self.clock = MediaClock()
|
|
self._next_frame_index = 0
|
|
|
|
async def next_timestamp(self) -> tuple[int, float]:
|
|
pts = int(self._next_frame_index * (1 / self.fps) * 90000)
|
|
time_base = 1 / 90000
|
|
return pts, time_base
|
|
|
|
async def recv(self) -> VideoFrame:
|
|
pts, time_base = await self.next_timestamp()
|
|
|
|
# schedule frame according to clock
|
|
target_t = self._next_frame_index / self.fps
|
|
now = self.clock.now()
|
|
if target_t > now:
|
|
await asyncio.sleep(target_t - now)
|
|
|
|
self._next_frame_index += 1
|
|
|
|
frame_array: npt.NDArray[np.uint8] = np.zeros(
|
|
(self.height, self.width, 3), dtype=np.uint8
|
|
)
|
|
|
|
# Display model loading status prominently
|
|
status_text = _model_loading_status
|
|
progress = _model_loading_progress
|
|
|
|
# Draw status background (increased height for larger text)
|
|
cv2.rectangle(frame_array, (0, 0), (self.width, 80), (0, 0, 0), -1)
|
|
|
|
# Draw progress bar if loading
|
|
if progress < 1.0 and "Ready" not in status_text:
|
|
bar_width = int(progress * (self.width - 40))
|
|
cv2.rectangle(frame_array, (20, 55), (20 + bar_width, 70), (0, 255, 0), -1)
|
|
cv2.rectangle(
|
|
frame_array, (20, 55), (self.width - 20, 70), (255, 255, 255), 2
|
|
)
|
|
|
|
# Draw status text (larger font)
|
|
cv2.putText(
|
|
frame_array,
|
|
f"Status: {status_text}",
|
|
(10, 35),
|
|
cv2.FONT_HERSHEY_SIMPLEX,
|
|
1.2,
|
|
(255, 255, 255),
|
|
3,
|
|
)
|
|
|
|
# Select the most active processor (highest RMS) and draw its waveform
|
|
best_proc = None
|
|
best_rms = 0.0
|
|
try:
|
|
for pname, proc in _audio_processors.items():
|
|
try:
|
|
arr = getattr(proc, "current_phrase_audio", None)
|
|
if arr is None or len(arr) == 0:
|
|
continue
|
|
rms = float(np.sqrt(np.mean(arr**2)))
|
|
if rms > best_rms:
|
|
best_rms = rms
|
|
best_proc = (pname, arr.copy())
|
|
except Exception:
|
|
continue
|
|
except Exception:
|
|
best_proc = None
|
|
|
|
if best_proc is not None:
|
|
pname, arr = best_proc
|
|
|
|
# Use the entire current phrase audio (from the start of the ongoing recording)
|
|
# This ensures the waveform shows audio from when recording began until it is processed.
|
|
if len(arr) <= 0:
|
|
arr_segment = np.zeros(1, dtype=np.float32)
|
|
else:
|
|
# Copy the buffer so downstream operations (resizing/bucketing) are safe
|
|
arr_segment = arr.copy()
|
|
|
|
# Normalize segment to -1..1 safely
|
|
maxv = float(np.max(np.abs(arr_segment))) if arr_segment.size > 0 else 0.0
|
|
if maxv > 0:
|
|
norm = arr_segment / maxv
|
|
else:
|
|
norm = np.zeros_like(arr_segment)
|
|
|
|
# Map audio samples to pixels across the width
|
|
if norm.size < self.width:
|
|
padded = np.zeros(self.width, dtype=np.float32)
|
|
if norm.size > 0:
|
|
padded[-norm.size :] = norm
|
|
norm = padded
|
|
else:
|
|
block = int(np.ceil(norm.size / self.width))
|
|
norm = np.array(
|
|
[
|
|
np.mean(norm[i * block : min((i + 1) * block, norm.size)])
|
|
for i in range(self.width)
|
|
],
|
|
dtype=np.float32,
|
|
)
|
|
|
|
# Create polyline points, avoid NaN
|
|
points: list[tuple[int, int]] = []
|
|
for x in range(self.width):
|
|
v = float(norm[x]) if x < norm.size and not np.isnan(norm[x]) else 0.0
|
|
y = (
|
|
int((1.0 - ((v + 1.0) / 2.0)) * (self.height - 90)) + 80
|
|
) # Offset below taller status bar
|
|
points.append((x, max(80, min(self.height - 1, y))))
|
|
|
|
if len(points) > 1:
|
|
pts_np = np.array(points, dtype=np.int32)
|
|
cv2.polylines(
|
|
frame_array,
|
|
[pts_np],
|
|
isClosed=False,
|
|
color=(0, 200, 80),
|
|
thickness=2,
|
|
)
|
|
|
|
cv2.putText(
|
|
frame_array,
|
|
f"Waveform: {pname}",
|
|
(10, self.height - 15),
|
|
cv2.FONT_HERSHEY_SIMPLEX,
|
|
1.0,
|
|
(255, 255, 255),
|
|
2,
|
|
)
|
|
else:
|
|
cv2.putText(
|
|
frame_array,
|
|
"No audio",
|
|
(10, self.height - 15),
|
|
cv2.FONT_HERSHEY_SIMPLEX,
|
|
1.2,
|
|
(200, 200, 200),
|
|
2,
|
|
)
|
|
|
|
frame = VideoFrame.from_ndarray(frame_array, format="bgr24")
|
|
frame.pts = pts
|
|
frame.time_base = fractions.Fraction(1 / 90000).limit_denominator(1000000)
|
|
return frame
|
|
|
|
|
|
async def handle_track_received(peer: Peer, track: MediaStreamTrack) -> None:
|
|
"""Handle incoming audio tracks from WebRTC peers."""
|
|
global _audio_processors, _send_chat_func
|
|
|
|
if track.kind != "audio":
|
|
logger.info(f"Ignoring non-audio track from {peer.peer_name}: {track.kind}")
|
|
return
|
|
|
|
if peer.peer_name not in _audio_processors:
|
|
if _send_chat_func is None:
|
|
logger.error(
|
|
f"Cannot create processor for {peer.peer_name}: no send_chat_func"
|
|
)
|
|
return
|
|
|
|
# Start background task to load model and create processor
|
|
async def init_processor():
|
|
global _model_loading_status, _model_loading_progress
|
|
# Load model asynchronously to avoid blocking frame reading
|
|
_model_loading_status = "Initializing model loading..."
|
|
_model_loading_progress = 0.0
|
|
|
|
loop = asyncio.get_event_loop()
|
|
await loop.run_in_executor(None, _ensure_model_loaded)
|
|
|
|
_model_loading_status = "Model loaded, creating processor..."
|
|
_model_loading_progress = 0.8
|
|
|
|
logger.info(f"Creating OptimizedAudioProcessor for {peer.peer_name}")
|
|
if _send_chat_func is None:
|
|
logger.error(f"No send_chat_func available for {peer.peer_name}")
|
|
_model_loading_status = "Error: No send function available"
|
|
return
|
|
_audio_processors[peer.peer_name] = OptimizedAudioProcessor(
|
|
peer_name=peer.peer_name, send_chat_func=_send_chat_func
|
|
)
|
|
|
|
audio_processor = _audio_processors[peer.peer_name]
|
|
# asyncio.create_task(calibrate_vad(audio_processor))
|
|
|
|
_model_loading_status = "Ready for transcription"
|
|
_model_loading_progress = 1.0
|
|
logger.info(f"Starting OpenVINO audio processing for {peer.peer_name}")
|
|
|
|
# Now start processing frames
|
|
frame_count = 0
|
|
while True:
|
|
try:
|
|
frame = await track.recv()
|
|
frame_count += 1
|
|
|
|
if frame_count % 100 == 0:
|
|
logger.debug(
|
|
f"Received {frame_count} frames from {peer.peer_name}"
|
|
)
|
|
|
|
except MediaStreamError as e:
|
|
logger.info(f"Audio stream ended for {peer.peer_name}: {e}")
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Error receiving frame from {peer.peer_name}: {e}")
|
|
break
|
|
|
|
if isinstance(frame, AudioFrame):
|
|
try:
|
|
# Convert frame to numpy array
|
|
audio_data = frame.to_ndarray()
|
|
|
|
# Handle audio format conversion
|
|
audio_data = _process_audio_frame(audio_data, frame)
|
|
|
|
# Resample if needed
|
|
if frame.sample_rate != SAMPLE_RATE:
|
|
audio_data = _resample_audio(
|
|
audio_data, frame.sample_rate, SAMPLE_RATE
|
|
)
|
|
|
|
# Convert to float32
|
|
audio_data_float32 = audio_data.astype(np.float32)
|
|
|
|
# Process with optimized processor
|
|
audio_processor.add_audio_data(audio_data_float32)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error processing audio frame for {peer.peer_name}: {e}"
|
|
)
|
|
continue
|
|
|
|
asyncio.create_task(init_processor())
|
|
return # Exit early, processing is handled in background
|
|
|
|
# If processor already exists, just continue processing
|
|
audio_processor = _audio_processors[peer.peer_name]
|
|
logger.info(f"Continuing OpenVINO audio processing for {peer.peer_name}")
|
|
|
|
try:
|
|
frame_count = 0
|
|
while True:
|
|
try:
|
|
frame = await track.recv()
|
|
frame_count += 1
|
|
|
|
if frame_count % 100 == 0:
|
|
logger.debug(f"Received {frame_count} frames from {peer.peer_name}")
|
|
|
|
except MediaStreamError as e:
|
|
logger.info(f"Audio stream ended for {peer.peer_name}: {e}")
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Error receiving frame from {peer.peer_name}: {e}")
|
|
break
|
|
|
|
logger.info(f"Processing frame {frame_count} from {peer.peer_name}")
|
|
if isinstance(frame, AudioFrame):
|
|
try:
|
|
# Convert frame to numpy array
|
|
audio_data = frame.to_ndarray()
|
|
|
|
# Handle audio format conversion
|
|
audio_data = _process_audio_frame(audio_data, frame)
|
|
|
|
# Resample if needed
|
|
if frame.sample_rate != SAMPLE_RATE:
|
|
audio_data = _resample_audio(
|
|
audio_data, frame.sample_rate, SAMPLE_RATE
|
|
)
|
|
|
|
# Convert to float32
|
|
audio_data_float32 = audio_data.astype(np.float32)
|
|
|
|
# Process with optimized processor
|
|
audio_processor.add_audio_data(audio_data_float32)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error processing audio frame for {peer.peer_name}: {e}"
|
|
)
|
|
continue
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Unexpected error in audio processing for {peer.peer_name}: {e}",
|
|
exc_info=True,
|
|
)
|
|
finally:
|
|
cleanup_peer_processor(peer.peer_name)
|
|
|
|
|
|
def _process_audio_frame(
|
|
audio_data: npt.NDArray[Any], frame: AudioFrame
|
|
) -> npt.NDArray[np.float32]:
|
|
"""Process audio frame format conversion."""
|
|
# Handle stereo to mono conversion
|
|
if audio_data.ndim == 2:
|
|
if audio_data.shape[0] == 1:
|
|
audio_data = audio_data.squeeze(0)
|
|
else:
|
|
audio_data = np.mean(
|
|
audio_data, axis=0 if audio_data.shape[0] > audio_data.shape[1] else 1
|
|
)
|
|
|
|
# Normalize based on data type
|
|
if audio_data.dtype == np.int16:
|
|
audio_data = audio_data.astype(np.float32) / 32768.0
|
|
elif audio_data.dtype == np.int32:
|
|
audio_data = audio_data.astype(np.float32) / 2147483648.0
|
|
|
|
return audio_data.astype(np.float32)
|
|
|
|
|
|
def _resample_audio(
|
|
audio_data: npt.NDArray[np.float32], orig_sr: int, target_sr: int
|
|
) -> npt.NDArray[np.float32]:
|
|
"""Resample audio efficiently."""
|
|
try:
|
|
# Handle stereo audio by converting to mono if necessary
|
|
if audio_data.ndim > 1:
|
|
audio_data = np.mean(audio_data, axis=1)
|
|
|
|
# Use high-quality resampling
|
|
resampled = librosa.resample(
|
|
audio_data.astype(np.float64),
|
|
orig_sr=orig_sr,
|
|
target_sr=target_sr,
|
|
res_type="kaiser_fast", # Good balance of quality and speed
|
|
)
|
|
return resampled.astype(np.float32)
|
|
except Exception as e:
|
|
logger.error(f"Resampling failed: {e}")
|
|
raise ValueError(
|
|
f"Failed to resample audio from {orig_sr} Hz to {target_sr} Hz: {e}"
|
|
)
|
|
|
|
|
|
# Public API functions
|
|
def agent_info() -> Dict[str, str]:
|
|
return {"name": AGENT_NAME, "description": AGENT_DESCRIPTION, "has_media": "true"}
|
|
|
|
|
|
def create_agent_tracks(session_name: str) -> Dict[str, MediaStreamTrack]:
|
|
"""Create agent tracks. Provides a synthetic video waveform track and a silent audio track for compatibility."""
|
|
|
|
class SilentAudioTrack(MediaStreamTrack):
|
|
kind = "audio"
|
|
|
|
def __init__(
|
|
self, sample_rate: int = SAMPLE_RATE, channels: int = 1, fps: int = 50
|
|
):
|
|
super().__init__()
|
|
self.sample_rate = sample_rate
|
|
self.channels = channels
|
|
self.fps = fps
|
|
self.samples_per_frame = int(self.sample_rate / self.fps)
|
|
self._timestamp = 0
|
|
|
|
async def recv(self) -> AudioFrame:
|
|
# Generate silent audio as int16 (required by aiortc)
|
|
data = np.zeros((self.channels, self.samples_per_frame), dtype=np.int16)
|
|
frame = AudioFrame.from_ndarray(
|
|
data, layout="mono" if self.channels == 1 else "stereo"
|
|
)
|
|
frame.sample_rate = self.sample_rate
|
|
frame.pts = self._timestamp
|
|
frame.time_base = fractions.Fraction(1, self.sample_rate)
|
|
self._timestamp += self.samples_per_frame
|
|
await asyncio.sleep(1 / self.fps)
|
|
return frame
|
|
|
|
try:
|
|
video_track = WaveformVideoTrack(
|
|
session_name=session_name, width=640, height=480, fps=15
|
|
)
|
|
audio_track = SilentAudioTrack()
|
|
return {"video": video_track, "audio": audio_track}
|
|
except Exception as e:
|
|
logger.error(f"Failed to create agent tracks: {e}")
|
|
return {}
|
|
|
|
|
|
async def handle_chat_message(
|
|
chat_message: ChatMessageModel, send_message_func: Callable[[str], Awaitable[None]]
|
|
) -> Optional[str]:
|
|
"""Handle incoming chat messages."""
|
|
return None
|
|
|
|
|
|
async def on_track_received(peer: Peer, track: MediaStreamTrack) -> None:
|
|
"""Callback when a new track is received from a peer."""
|
|
await handle_track_received(peer, track)
|
|
|
|
|
|
def get_track_handler() -> Callable[[Peer, MediaStreamTrack], Awaitable[None]]:
|
|
"""Return the track handler function."""
|
|
return on_track_received
|
|
|
|
|
|
def bind_send_chat_function(send_chat_func: Callable[[str], Awaitable[None]]) -> None:
|
|
"""Bind the send chat function."""
|
|
global _send_chat_func, _audio_processors
|
|
|
|
logger.info("Binding send chat function to OpenVINO whisper agent")
|
|
_send_chat_func = send_chat_func
|
|
|
|
# Update existing processors
|
|
for peer_name, processor in _audio_processors.items():
|
|
processor.send_chat_func = send_chat_func
|
|
logger.debug(f"Updated processor for {peer_name} with new send chat function")
|
|
|
|
|
|
def cleanup_peer_processor(peer_name: str) -> None:
|
|
"""Clean up processor for disconnected peer."""
|
|
global _audio_processors
|
|
|
|
if peer_name in _audio_processors:
|
|
logger.info(f"Cleaning up processor for {peer_name}")
|
|
processor = _audio_processors[peer_name]
|
|
processor.shutdown()
|
|
del _audio_processors[peer_name]
|
|
logger.info(f"Processor cleanup complete for {peer_name}")
|
|
|
|
|
|
def get_active_processors() -> Dict[str, OptimizedAudioProcessor]:
|
|
"""Get active processors for debugging."""
|
|
return _audio_processors.copy()
|
|
|
|
|
|
def get_model_info() -> Dict[str, Any]:
|
|
"""Get information about the loaded model."""
|
|
ov_model = _ensure_model_loaded()
|
|
return {
|
|
"model_id": _model_id,
|
|
"device": _ov_config.device,
|
|
"quantization_enabled": _ov_config.enable_quantization,
|
|
"is_quantized": ov_model.is_quantized,
|
|
"sample_rate": SAMPLE_RATE,
|
|
"chunk_duration_ms": CHUNK_DURATION_MS,
|
|
"loading_status": _model_loading_status,
|
|
"loading_progress": _model_loading_progress,
|
|
}
|