Improved normalization visualization

This commit is contained in:
James Ketr 2025-09-15 16:59:28 -07:00
parent 916151307f
commit 3cfc148724
8 changed files with 528 additions and 315 deletions

View File

@ -604,7 +604,7 @@
"Bot Configuration"
],
"summary": "Get Bot Config Schema",
"description": "Get configuration schema for a specific bot",
"description": "Get configuration schema for a specific bot.\n\nThis endpoint will query registered bot providers each time and\nrequest the bot's /config-schema endpoint without relying on any\nserver-side cached schema. This ensures the UI always receives the\nup-to-date schema from the provider.",
"operationId": "get_bot_config_schema_ai_voicebot_api_bots_config_schema__bot_name__get",
"parameters": [
{
@ -641,6 +641,94 @@
}
}
},
"/ai-voicebot/api/bots/config/schema/instance/{bot_instance_id}": {
"get": {
"tags": [
"Bot Configuration"
],
"summary": "Get Bot Config Schema By Instance",
"description": "Get configuration schema for a specific bot instance",
"operationId": "get_bot_config_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__get",
"parameters": [
{
"name": "bot_instance_id",
"in": "path",
"required": true,
"schema": {
"type": "string",
"title": "Bot Instance Id"
}
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/BotConfigSchema"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/ai-voicebot/api/bots/config/schema/instance/{bot_instance_id}/refresh": {
"post": {
"tags": [
"Bot Configuration"
],
"summary": "Refresh Bot Schema By Instance",
"description": "Refresh configuration schema for a specific bot instance",
"operationId": "refresh_bot_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__refresh_post",
"parameters": [
{
"name": "bot_instance_id",
"in": "path",
"required": true,
"schema": {
"type": "string",
"title": "Bot Instance Id"
}
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"type": "object",
"additionalProperties": true,
"title": "Response Refresh Bot Schema By Instance Ai Voicebot Api Bots Config Schema Instance Bot Instance Id Refresh Post"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/ai-voicebot/api/bots/config/lobby/{lobby_id}": {
"get": {
"tags": [
@ -978,51 +1066,6 @@
}
}
},
"/ai-voicebot/api/bots/config/schema/{bot_name}/cache": {
"delete": {
"tags": [
"Bot Configuration"
],
"summary": "Clear Bot Schema Cache",
"description": "Clear cached schema for a specific bot",
"operationId": "clear_bot_schema_cache_ai_voicebot_api_bots_config_schema__bot_name__cache_delete",
"parameters": [
{
"name": "bot_name",
"in": "path",
"required": true,
"schema": {
"type": "string",
"title": "Bot Name"
}
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"type": "object",
"additionalProperties": true,
"title": "Response Clear Bot Schema Cache Ai Voicebot Api Bots Config Schema Bot Name Cache Delete"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/ai-voicebot/api/health/ready": {
"get": {
"tags": [

View File

@ -209,6 +209,14 @@ export class ApiClient {
return this.request<any>(this.getApiPath(`/ai-voicebot/api/bots/config/schema/${bot_name}`), { method: "GET" });
}
async getBotConfigSchemaByInstance(bot_instance_id: string): Promise<any> {
return this.request<any>(this.getApiPath(`/ai-voicebot/api/bots/config/schema/instance/${bot_instance_id}`), { method: "GET" });
}
async createRefreshBotSchemaByInstance(bot_instance_id: string): Promise<any> {
return this.request<any>(this.getApiPath(`/ai-voicebot/api/bots/config/schema/instance/${bot_instance_id}/refresh`), { method: "POST" });
}
async getLobbyBotConfigs(lobby_id: string): Promise<any> {
return this.request<any>(this.getApiPath(`/ai-voicebot/api/bots/config/lobby/${lobby_id}`), { method: "GET" });
}
@ -241,10 +249,6 @@ export class ApiClient {
return this.request<any>(this.getApiPath(`/ai-voicebot/api/bots/config/schema/${bot_name}/refresh`), { method: "POST" });
}
async deleteClearBotSchemaCache(bot_name: string): Promise<any> {
return this.request<any>(this.getApiPath(`/ai-voicebot/api/bots/config/schema/${bot_name}/cache`), { method: "DELETE" });
}
async getReadinessProbe(): Promise<any> {
return this.request<any>(this.getApiPath(`/ai-voicebot/api/health/ready`), { method: "GET" });
}
@ -321,6 +325,8 @@ export const botsApi = {
requestLeaveLobby: (bot_instance_id: string) => apiClient.createRequestBotLeaveLobby(bot_instance_id),
getInstance: (bot_instance_id: string) => apiClient.getBotInstance(bot_instance_id),
getSchema: (bot_name: string) => apiClient.getBotConfigSchema(bot_name),
getSchemaByInstance: (bot_instance_id: string) => apiClient.getBotConfigSchemaByInstance(bot_instance_id),
refreshSchema: (bot_instance_id: string) => apiClient.createRefreshBotSchemaByInstance(bot_instance_id),
getBotConfigs: (lobby_id: string) => apiClient.getLobbyBotConfigs(lobby_id),
deleteBotConfigs: (lobby_id: string) => apiClient.deleteLobbyConfigs(lobby_id),
getBotConfig: (lobby_id: string, bot_instance_id: string) => apiClient.getLobbyBotConfig(lobby_id, bot_instance_id),
@ -328,8 +334,7 @@ export const botsApi = {
updateConfig: (data: any) => apiClient.createUpdateBotConfig(data),
getConfigStatistics: () => apiClient.getConfigStatistics(),
refreshAllSchemas: () => apiClient.createRefreshBotSchemas(),
refreshSchema: (bot_name: string) => apiClient.createRefreshBotSchema(bot_name),
clearSchemaCache: (bot_name: string) => apiClient.deleteClearBotSchemaCache(bot_name)
refreshSchemaByName: (bot_name: string) => apiClient.createRefreshBotSchema(bot_name)
};
export const metricsApi = {

View File

@ -6,13 +6,13 @@ import { base } from './Common';
export const knownEndpoints = new Set([
`DELETE:${base}/api/bots/config/lobby/{lobby_id}`,
`DELETE:${base}/api/bots/config/lobby/{lobby_id}/bot/{bot_instance_id}`,
`DELETE:${base}/api/bots/config/schema/{bot_name}/cache`,
`GET:${base}/api/admin/names`,
`GET:${base}/api/admin/session_metrics`,
`GET:${base}/api/admin/validate_sessions`,
`GET:${base}/api/bots`,
`GET:${base}/api/bots/config/lobby/{lobby_id}`,
`GET:${base}/api/bots/config/lobby/{lobby_id}/bot/{bot_instance_id}`,
`GET:${base}/api/bots/config/schema/instance/{bot_instance_id}`,
`GET:${base}/api/bots/config/schema/{bot_name}`,
`GET:${base}/api/bots/config/statistics`,
`GET:${base}/api/bots/instances/{bot_instance_id}`,
@ -34,6 +34,7 @@ export const knownEndpoints = new Set([
`POST:${base}/api/admin/clear_password`,
`POST:${base}/api/admin/set_password`,
`POST:${base}/api/bots/config/refresh-schemas`,
`POST:${base}/api/bots/config/schema/instance/{bot_instance_id}/refresh`,
`POST:${base}/api/bots/config/schema/{bot_name}/refresh`,
`POST:${base}/api/bots/config/update`,
`POST:${base}/api/bots/instances/{bot_instance_id}/leave`,

View File

@ -111,10 +111,29 @@ export interface paths {
"/ai-voicebot/api/bots/config/schema/{bot_name}": {
/**
* Get Bot Config Schema
* @description Get configuration schema for a specific bot
* @description Get configuration schema for a specific bot.
*
* This endpoint will query registered bot providers each time and
* request the bot's /config-schema endpoint without relying on any
* server-side cached schema. This ensures the UI always receives the
* up-to-date schema from the provider.
*/
get: operations["get_bot_config_schema_ai_voicebot_api_bots_config_schema__bot_name__get"];
};
"/ai-voicebot/api/bots/config/schema/instance/{bot_instance_id}": {
/**
* Get Bot Config Schema By Instance
* @description Get configuration schema for a specific bot instance
*/
get: operations["get_bot_config_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__get"];
};
"/ai-voicebot/api/bots/config/schema/instance/{bot_instance_id}/refresh": {
/**
* Refresh Bot Schema By Instance
* @description Refresh configuration schema for a specific bot instance
*/
post: operations["refresh_bot_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__refresh_post"];
};
"/ai-voicebot/api/bots/config/lobby/{lobby_id}": {
/**
* Get Lobby Bot Configs
@ -167,13 +186,6 @@ export interface paths {
*/
post: operations["refresh_bot_schema_ai_voicebot_api_bots_config_schema__bot_name__refresh_post"];
};
"/ai-voicebot/api/bots/config/schema/{bot_name}/cache": {
/**
* Clear Bot Schema Cache
* @description Clear cached schema for a specific bot
*/
delete: operations["clear_bot_schema_cache_ai_voicebot_api_bots_config_schema__bot_name__cache_delete"];
};
"/ai-voicebot/api/health/ready": {
/**
* Readiness Probe
@ -1146,7 +1158,12 @@ export interface operations {
};
/**
* Get Bot Config Schema
* @description Get configuration schema for a specific bot
* @description Get configuration schema for a specific bot.
*
* This endpoint will query registered bot providers each time and
* request the bot's /config-schema endpoint without relying on any
* server-side cached schema. This ensures the UI always receives the
* up-to-date schema from the provider.
*/
get_bot_config_schema_ai_voicebot_api_bots_config_schema__bot_name__get: {
parameters: {
@ -1169,6 +1186,58 @@ export interface operations {
};
};
};
/**
* Get Bot Config Schema By Instance
* @description Get configuration schema for a specific bot instance
*/
get_bot_config_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__get: {
parameters: {
path: {
bot_instance_id: string;
};
};
responses: {
/** @description Successful Response */
200: {
content: {
"application/json": components["schemas"]["BotConfigSchema"];
};
};
/** @description Validation Error */
422: {
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
/**
* Refresh Bot Schema By Instance
* @description Refresh configuration schema for a specific bot instance
*/
refresh_bot_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__refresh_post: {
parameters: {
path: {
bot_instance_id: string;
};
};
responses: {
/** @description Successful Response */
200: {
content: {
"application/json": {
[key: string]: unknown;
};
};
};
/** @description Validation Error */
422: {
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
/**
* Get Lobby Bot Configs
* @description Get all bot configurations for a lobby
@ -1364,33 +1433,6 @@ export interface operations {
};
};
};
/**
* Clear Bot Schema Cache
* @description Clear cached schema for a specific bot
*/
clear_bot_schema_cache_ai_voicebot_api_bots_config_schema__bot_name__cache_delete: {
parameters: {
path: {
bot_name: string;
};
};
responses: {
/** @description Successful Response */
200: {
content: {
"application/json": {
[key: string]: unknown;
};
};
};
/** @description Validation Error */
422: {
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
/**
* Readiness Probe
* @description Kubernetes readiness probe endpoint.

View File

@ -201,7 +201,31 @@ class ApiClientGenerator {
// Generate convenience methods for each namespace
const namespaceDefinitions = Object.entries(namespaceGroups).map(([namespace, endpoints]) => {
const methods = endpoints.map(endpoint => this.generateConvenienceMethod(endpoint)).join(',\n ');
// Track used method names to avoid duplicate object keys
const usedNames = new Set();
const methods = endpoints.map(endpoint => {
// Compute desired method name
let candidate = this.generateConvenienceMethodName(endpoint);
// If name already used, try to disambiguate with contextual suffixes
if (usedNames.has(candidate)) {
if (endpoint.path.includes('/instance/')) {
candidate = candidate + 'ByInstance';
} else if (endpoint.path.match(/\/config\/schema\/\{?bot_name\}?/)) {
candidate = candidate + 'ByName';
} else {
// Fallback: append a numeric suffix to ensure uniqueness
let i = 2;
while (usedNames.has(candidate + i)) i++;
candidate = candidate + i;
}
}
usedNames.add(candidate);
return this.generateConvenienceMethod(endpoint, candidate);
}).join(',\n ');
return `export const ${namespace}Api = {\n ${methods}\n};`;
}).join('\n\n');
@ -255,8 +279,8 @@ class ApiClientGenerator {
/**
* Generate a convenience method for an endpoint with intuitive naming
*/
generateConvenienceMethod(endpoint) {
const methodName = this.generateConvenienceMethodName(endpoint);
generateConvenienceMethod(endpoint, overrideName) {
const methodName = overrideName || this.generateConvenienceMethodName(endpoint);
const clientMethodName = this.generateMethodName(endpoint);
const params = this.extractMethodParameters(endpoint);

View File

@ -61,14 +61,16 @@ def create_bot_config_router(
@router.get("/schema/{bot_name}")
async def get_bot_config_schema(bot_name: str) -> BotConfigSchema: # type: ignore
"""Get configuration schema for a specific bot"""
try:
# Check if we have cached schema
schema = config_manager.get_bot_config_schema(bot_name)
"""Get configuration schema for a specific bot.
if not schema:
# Try to discover schema from bot provider
This endpoint will query registered bot providers each time and
request the bot's /config-schema endpoint without relying on any
server-side cached schema. This ensures the UI always receives the
up-to-date schema from the provider.
"""
try:
providers_response = bot_manager.list_providers()
schema = None
for provider in providers_response.providers:
try:
# Check if this provider has the bot
@ -81,8 +83,9 @@ def create_bot_config_router(
# Get the full provider object to access base_url
full_provider = bot_manager.get_provider(provider.provider_id)
if full_provider:
# Force discovery from the provider on every request
schema = await config_manager.discover_bot_config_schema(
bot_name, full_provider.base_url
bot_name, full_provider.base_url, force_refresh=True
)
if schema:
break
@ -91,34 +94,6 @@ def create_bot_config_router(
f"Failed to check provider {provider.provider_id} for bot {bot_name}: {e}"
)
continue
else:
# We have a cached schema, but check if it might be stale
# Try to refresh it automatically if it's older than 1 hour
providers_response = bot_manager.list_providers()
for provider in providers_response.providers:
try:
provider_bots = await bot_manager.get_provider_bots(
provider.provider_id
)
bot_names = [bot.name for bot in provider_bots.bots]
if bot_name in bot_names:
# This will only refresh if the cached schema is older than 1 hour
# Get the full provider object to access base_url
full_provider = bot_manager.get_provider(provider.provider_id)
if full_provider:
fresh_schema = (
await config_manager.discover_bot_config_schema(
bot_name, full_provider.base_url, force_refresh=False
)
)
if fresh_schema:
schema = fresh_schema
break
except Exception as e:
logger.debug(f"Failed to refresh schema for {bot_name}: {e}")
# Continue with cached schema if refresh fails
continue
if not schema:
raise HTTPException(
@ -142,16 +117,12 @@ def create_bot_config_router(
bot_instance = await bot_manager.get_bot_instance(bot_instance_id)
bot_name = bot_instance.bot_name
# Check if we have cached schema
schema = config_manager.get_bot_config_schema(bot_name)
if not schema:
# Try to discover schema from bot provider
# Always query the provider directly for the latest schema
provider = bot_manager.get_provider(bot_instance.provider_id)
if provider:
try:
schema = await config_manager.discover_bot_config_schema(
bot_name, provider.base_url
bot_name, provider.base_url, force_refresh=True
)
except Exception as e:
logger.warning(
@ -195,8 +166,9 @@ def create_bot_config_router(
status_code=404, detail=f"Provider '{provider_id}' not found"
)
schema = await config_manager.refresh_bot_schema(
bot_name, provider.base_url
# Force a fresh fetch from the provider
schema = await config_manager.discover_bot_config_schema(
bot_name, provider.base_url, force_refresh=True
)
if schema:
return {
@ -478,25 +450,9 @@ def create_bot_config_router(
logger.error(f"Failed to refresh schema for bot {bot_name}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.delete("/schema/{bot_name}/cache")
async def clear_bot_schema_cache(bot_name: str) -> Dict[str, Any]:
"""Clear cached schema for a specific bot"""
try:
success = config_manager.clear_bot_schema_cache(bot_name)
if success:
return {
"success": True,
"message": f"Schema cache cleared for bot {bot_name}",
}
else:
raise HTTPException(
status_code=404, detail=f"No cached schema found for bot {bot_name}"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to clear schema cache for bot {bot_name}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# NOTE: schema cache management endpoints removed. Server no longer
# stores or serves cached bot config schemas; callers should fetch
# live schemas from providers via the /schema endpoints.
return router

View File

@ -31,10 +31,10 @@ class BotConfigManager:
def __init__(self, storage_dir: str = "./bot_configs"):
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(exist_ok=True)
# In-memory cache for fast access
self.config_cache: Dict[str, Dict[str, BotLobbyConfig]] = {} # lobby_id -> bot_name -> config
self.schema_cache: Dict[str, BotConfigSchema] = {} # bot_name -> schema
self.config_cache: Dict[
str, Dict[str, BotLobbyConfig]
] = {} # lobby_id -> bot_name -> config
# Load existing configurations
self._load_configurations()
@ -54,7 +54,7 @@ class BotConfigManager:
for config_file in self.storage_dir.glob("lobby_*.json"):
try:
lobby_id = config_file.stem.replace("lobby_", "")
with open(config_file, 'r') as f:
with open(config_file, "r") as f:
data = json.load(f)
self.config_cache[lobby_id] = {}
@ -65,20 +65,7 @@ class BotConfigManager:
except Exception as e:
logger.error(f"Failed to load lobby config {config_file}: {e}")
# Load bot schemas
for schema_file in self.storage_dir.glob("schema_*.json"):
try:
bot_name = schema_file.stem.replace("schema_", "")
with open(schema_file, 'r') as f:
schema_data = json.load(f)
schema = BotConfigSchema(**schema_data)
self.schema_cache[bot_name] = schema
except Exception as e:
logger.error(f"Failed to load bot schema {schema_file}: {e}")
logger.info(f"Loaded configurations for {len(self.config_cache)} lobbies and {len(self.schema_cache)} bot schemas")
logger.info(f"Loaded configurations for {len(self.config_cache)} lobbies")
except Exception as e:
logger.error(f"Failed to load configurations: {e}")
@ -96,7 +83,7 @@ class BotConfigManager:
for bot_name, config in self.config_cache[lobby_id].items():
data[bot_name] = config.model_dump()
with open(config_file, 'w') as f:
with open(config_file, "w") as f:
json.dump(data, f, indent=2)
except Exception as e:
@ -104,83 +91,49 @@ class BotConfigManager:
def _save_bot_schema(self, bot_name: str):
"""Save bot schema to disk"""
try:
if bot_name not in self.schema_cache:
return
schema_file = self._get_schema_file(bot_name)
schema_data = self.schema_cache[bot_name].model_dump()
with open(schema_file, 'w') as f:
json.dump(schema_data, f, indent=2)
except Exception as e:
logger.error(f"Failed to save bot schema {bot_name}: {e}")
# Schema file persistence disabled: server no longer caches provider schemas to disk.
logger.debug(
f"_save_bot_schema called for {bot_name}, but schema persistence is disabled"
)
async def discover_bot_config_schema(
self, bot_name: str, provider_url: str, force_refresh: bool = False
) -> Optional[BotConfigSchema]:
"""Discover configuration schema from bot provider"""
try:
# Check if we have a cached schema and it's not forced refresh
if not force_refresh and bot_name in self.schema_cache:
cached_schema = self.schema_cache[bot_name]
# Check if schema is less than 1 hour old
schema_file = self._get_schema_file(bot_name)
if schema_file.exists():
file_age = time.time() - schema_file.stat().st_mtime
if file_age < 3600: # 1 hour
logger.debug(
f"Using cached schema for bot {bot_name} (age: {file_age:.0f}s)"
)
return cached_schema
# Always fetch schema directly from the provider; do not use or
# update any server-side caches or files. This ensures callers
# receive the live schema from the provider on each request.
async with httpx.AsyncClient() as client:
# Try to get configuration schema from bot provider
response = await client.get(
f"{provider_url}/bots/{bot_name}/config-schema",
timeout=10.0
timeout=10.0,
)
if response.status_code == 200:
schema_data = response.json()
schema = BotConfigSchema(**schema_data)
# Check if schema has actually changed
if bot_name in self.schema_cache:
old_schema = self.schema_cache[bot_name]
if old_schema.model_dump() == schema.model_dump():
logger.debug(
f"Schema for bot {bot_name} unchanged, updating timestamp only"
)
else:
logger.info(f"Schema for bot {bot_name} has been updated")
# Cache the schema
self.schema_cache[bot_name] = schema
self._save_bot_schema(bot_name)
logger.info(
f"Discovered/refreshed config schema for bot {bot_name}"
)
logger.info(f"Fetched live config schema for bot {bot_name}")
return schema
else:
logger.warning(f"Bot {bot_name} does not support configuration (HTTP {response.status_code})")
logger.warning(
f"Bot {bot_name} does not support configuration (HTTP {response.status_code})"
)
except Exception as e:
logger.warning(f"Failed to discover config schema for bot {bot_name}: {e}")
# Return cached schema if available, even if refresh failed
if bot_name in self.schema_cache:
logger.info(
f"Returning cached schema for bot {bot_name} after refresh failure"
)
return self.schema_cache[bot_name]
return None
def get_bot_config_schema(self, bot_name: str) -> Optional[BotConfigSchema]:
"""Get cached configuration schema for a bot"""
return self.schema_cache.get(bot_name)
"""Deprecated: server no longer maintains a cached schema.
Return None to indicate no server-side cached schema is available.
"""
logger.debug(
"get_bot_config_schema called but server-side schema cache is disabled"
)
return None
async def refresh_bot_schema(
self, bot_name: str, provider_url: str
@ -192,14 +145,11 @@ class BotConfigManager:
def clear_bot_schema_cache(self, bot_name: str) -> bool:
"""Clear cached schema for a specific bot"""
if bot_name in self.schema_cache:
del self.schema_cache[bot_name]
# Also remove the cached file
schema_file = self._get_schema_file(bot_name)
if schema_file.exists():
schema_file.unlink()
logger.info(f"Cleared schema cache for bot {bot_name}")
return True
# No-op: server-side schema caching has been disabled. Return False to
# indicate there was no cached schema to clear.
logger.info(
f"clear_bot_schema_cache called for {bot_name} but caching is disabled"
)
return False
def get_lobby_bot_config(self, lobby_id: str, bot_name: str) -> Optional[BotLobbyConfig]:
@ -221,12 +171,9 @@ class BotConfigManager:
config_values: Dict[str, Any],
session_id: str) -> BotLobbyConfig:
"""Set or update bot configuration for a lobby"""
# Validate configuration against schema if available
schema = self.get_bot_config_schema(bot_name)
if schema:
validated_values = self._validate_config_values(config_values, schema)
else:
# Schema validation against a server-side cache is disabled. If
# callers want strict validation, they should fetch the provider's
# live schema and validate prior to calling set_bot_config.
validated_values = config_values
# Create or update configuration
@ -388,9 +335,9 @@ class BotConfigManager:
return {
"total_lobbies": len(self.config_cache),
"total_configs": total_configs,
"cached_schemas": len(self.schema_cache),
# server-side schema caching is disabled; omit cached_schemas
"lobbies": {
lobby_id: len(configs)
for lobby_id, configs in self.config_cache.items()
}
},
}

View File

@ -254,6 +254,12 @@ VAD_CONFIG = {
"speech_freq_max": 3000, # Hz
}
# Normalization defaults: used to control optional per-stream normalization
# applied before sending audio to the model and for visualization.
NORMALIZATION_ENABLED = True
NORMALIZATION_TARGET_PEAK = 0.95
MAX_NORMALIZATION_GAIN = 3.0
# How long (seconds) of no-arriving audio before we consider the phrase ended
INACTIVITY_TIMEOUT = 1.5
@ -1154,6 +1160,15 @@ class OptimizedAudioProcessor:
# Enhanced VAD parameters with EMA for noise adaptation
self.advanced_vad = AdvancedVAD(sample_rate=self.sample_rate)
# Track maximum observed absolute amplitude for this input stream
# This is used optionally to normalize incoming audio to the "observed"
# maximum which helps models expect a consistent level across peers.
# It's intentionally permissive and capped to avoid amplifying noise.
self.max_observed_amplitude: float = 1e-6
self.normalization_enabled: bool = True
self.normalization_target_peak: float = 0.95
self.max_normalization_gain: float = 3.0 # avoid amplifying tiny noise too much
# Processing state
self.current_phrase_audio = np.array([], dtype=np.float32)
self.transcription_history: List[TranscriptionHistoryItem] = []
@ -1232,6 +1247,15 @@ class OptimizedAudioProcessor:
# Update last audio time whenever any audio is received
self.last_audio_time = time.time()
# Update max observed amplitude (used later for optional normalization)
try:
peak = float(np.max(np.abs(audio_data))) if audio_data.size > 0 else 0.0
if peak > self.max_observed_amplitude:
self.max_observed_amplitude = float(peak)
except Exception:
# Be defensive - don't fail audio ingestion for amplitude tracking
pass
is_speech, vad_metrics = self.advanced_vad.analyze_frame(audio_data)
# Update visualization status
@ -1386,8 +1410,10 @@ class OptimizedAudioProcessor:
asyncio.run_coroutine_threadsafe(_send_final_marker(), self.main_loop)
except Exception:
logger.debug(f"Could not schedule final marker for {self.peer_name}")
else:
# As a fallback, try to schedule the normal coroutine if possible
# As a fallback (if we couldn't schedule the marker on the
# main loop), try to schedule the normal async transcription
# coroutine. This is only used when the immediate marker
# cannot be scheduled — avoid scheduling both paths.
try:
asyncio.create_task(
self._transcribe_and_send(
@ -1538,9 +1564,17 @@ class OptimizedAudioProcessor:
logger.info(
f"Final transcription timeout for {self.peer_name} (asyncio.TimeoutError, inactivity)"
)
# Avoid duplicate finals: if a final is already pending
# (for example the blocking final was queued), skip scheduling
# another final. Otherwise set the pending flag and run the
# final transcription.
if not self.final_transcription_pending:
self.final_transcription_pending = True
await self._transcribe_and_send(
self.current_phrase_audio.copy(), is_final=True
)
else:
logger.debug(f"Final already pending for {self.peer_name}; skipping async final")
self.current_phrase_audio = np.array([], dtype=np.float32)
except Exception as e:
logger.error(
@ -1599,12 +1633,17 @@ class OptimizedAudioProcessor:
logger.info(
f"Final transcription from thread for {self.peer_name} (inactivity)"
)
# Avoid scheduling duplicates if a final is already pending
if not self.final_transcription_pending:
self.final_transcription_pending = True
asyncio.run_coroutine_threadsafe(
self._transcribe_and_send(
self.current_phrase_audio.copy(), is_final=True
),
self.main_loop,
)
else:
logger.debug(f"Final already pending for {self.peer_name}; skipping thread-scheduled final")
self.current_phrase_audio = np.array([], dtype=np.float32)
except Exception as e:
logger.error(
@ -1679,15 +1718,38 @@ class OptimizedAudioProcessor:
try:
audio_duration = len(audio_array) / self.sample_rate
# Skip very short audio
if audio_duration < 0.3:
# Compute basic energy/peak metrics for filtering decisions
audio_rms = float(np.sqrt(np.mean(audio_array**2)))
audio_peak = float(np.max(np.abs(audio_array))) if audio_array.size > 0 else 0.0
# Short-burst filtering: drop very short bursts that are likely noise.
# - If duration < 0.5s and RMS is very low -> drop
# - If duration < 0.8s and peak is very small relative to the
# max observed amplitude -> drop. This prevents single-packet
# random noises from becoming transcriptions.
short_duration_threshold = 0.5
relaxed_short_duration = 0.8
rms_min_threshold = 0.002
relative_peak_min_ratio = 0.05
if audio_duration < short_duration_threshold and audio_rms < rms_min_threshold:
logger.debug(
f"Skipping {transcription_type} transcription: too short ({audio_duration:.2f}s)"
f"Skipping {transcription_type} transcription: short & quiet ({audio_duration:.2f}s, RMS {audio_rms:.6f})"
)
return
# Audio quality check
audio_rms = np.sqrt(np.mean(audio_array**2))
# If we have observed a stronger level on this stream, require a
# sensible fraction of that to consider this burst valid.
max_amp = getattr(self, "max_observed_amplitude", 0.0) or 0.0
if audio_duration < relaxed_short_duration and max_amp > 0.0:
rel = audio_peak / (max_amp + 1e-12)
if rel < relative_peak_min_ratio:
logger.debug(
f"Skipping {transcription_type} transcription: short burst with low relative peak ({audio_duration:.2f}s, rel {rel:.3f})"
)
return
# Very quiet audio - skip entirely
if audio_rms < 0.001:
logger.debug(
f"Skipping {transcription_type} transcription: too quiet (RMS: {audio_rms:.6f})"
@ -1698,8 +1760,28 @@ class OptimizedAudioProcessor:
f"🎬 OpenVINO transcription ({transcription_type}) started: {audio_duration:.2f}s, RMS: {audio_rms:.4f}"
)
# Optionally normalize audio prior to feature extraction. We use
# the historical maximum observed amplitude for this stream to
# compute a conservative gain. The gain is clamped to avoid
# amplifying noise excessively.
audio_for_model = audio_array
try:
if getattr(self, "normalization_enabled", False):
stream_max = getattr(self, "max_observed_amplitude", 0.0) or 0.0
# Use the larger of observed max and current peak to avoid
# over-scaling when current chunk is the loudest.
denom = max(stream_max, audio_peak, 1e-12)
gain = float(self.normalization_target_peak) / denom
# Clamp gain
gain = max(min(gain, float(self.max_normalization_gain)), 0.25)
if abs(gain - 1.0) > 1e-3:
logger.debug(f"Applying normalization gain {gain:.3f} for {self.peer_name}")
audio_for_model = np.clip(audio_array * gain, -0.999, 0.999).astype(np.float32)
except Exception as e:
logger.debug(f"Normalization step failed for {self.peer_name}: {e}")
# Extract features for OpenVINO
input_features = extract_input_features(audio_array, self.sample_rate)
input_features = extract_input_features(audio_for_model, self.sample_rate)
# logger.info(f"Features extracted for OpenVINO: {input_features.shape}")
# GPU inference with OpenVINO
@ -2125,8 +2207,29 @@ class WaveformVideoTrack(MediaStreamTrack):
[np.zeros(samples_needed - len(arr), dtype=np.float32), arr]
)
# Assume arr_segment is already in [-1, 1]
norm = arr_segment
# Single normalization code path: normalize based on the historical
# peak observed for this stream (proc.max_observed_amplitude). This
# ensures the waveform display is consistent over time and avoids
# using the instantaneous buffer peak.
proc = None
norm = arr_segment.astype(np.float32)
try:
proc = _audio_processors.get(pname)
if proc is not None and getattr(proc, "normalization_enabled", False):
stream_max = getattr(proc, "max_observed_amplitude", 0.0) or 0.0
denom = max(stream_max, 1e-12)
gain = float(proc.normalization_target_peak) / denom
gain = max(min(gain, float(proc.max_normalization_gain)), 0.25)
if abs(gain - 1.0) > 1e-6:
norm = np.clip(arr_segment * gain, -1.0, 1.0).astype(np.float32)
else:
norm = arr_segment.astype(np.float32)
else:
norm = arr_segment.astype(np.float32)
except Exception:
# Fall back to raw samples if normalization computation fails
proc = None
norm = arr_segment.astype(np.float32)
# Map audio samples to pixels across the width
if norm.size < self.width:
@ -2144,12 +2247,17 @@ class WaveformVideoTrack(MediaStreamTrack):
dtype=np.float32,
)
# For display we use the same `norm` computed above (single code
# path). Use `display_norm` alias to avoid confusion later in the
# code but don't recompute normalization.
display_norm = norm
# Draw waveform with color coding for speech detection
points: list[tuple[int, int]] = []
colors: list[tuple[int, int, int]] = [] # Color for each point
for x in range(self.width):
v = float(norm[x]) if x < norm.size and not np.isnan(norm[x]) else 0.0
v = float(display_norm[x]) if x < display_norm.size and not np.isnan(display_norm[x]) else 0.0
y = int((1.0 - ((v + 1.0) / 2.0)) * (self.height - 120)) + 100
points.append((x, y))
@ -2169,6 +2277,33 @@ class WaveformVideoTrack(MediaStreamTrack):
for i in range(len(points) - 1):
cv2.line(frame_array, points[i], points[i+1], colors[i], 1)
# Draw historical peak indicator (horizontal lines at +/-(target_peak))
try:
if proc is not None and getattr(proc, "normalization_enabled", False):
target_peak = float(getattr(proc, "normalization_target_peak", 0.0))
# Ensure target_peak is within [0, 1]
target_peak = max(0.0, min(1.0, target_peak))
def _amp_to_y(a: float) -> int:
    # Map a normalized amplitude in [-1, 1] to a pixel row within the
    # waveform band: +1 maps to the top of the band, -1 to the bottom.
    # The drawable band is (self.height - 120) pixels tall, offset 100
    # pixels from the top of the frame (matching the y-mapping used for
    # the waveform points themselves, so the peak lines align).
    return int((1.0 - ((a + 1.0) / 2.0)) * (self.height - 120)) + 100
top_y = _amp_to_y(target_peak)
bot_y = _amp_to_y(-target_peak)
# Draw thin magenta lines across the waveform area
cv2.line(frame_array, (0, top_y), (self.width - 1, top_y), (255, 0, 255), 1)
cv2.line(frame_array, (0, bot_y), (self.width - 1, bot_y), (255, 0, 255), 1)
# Label the peak with small text near the right edge
label = f"Peak:{target_peak:.2f}"
(tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
lx = max(10, self.width - tw - 12)
ly = max(12, top_y - 6)
cv2.putText(frame_array, label, (lx, ly), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 1)
except Exception:
# Non-critical: ignore any drawing errors
pass
# Add speech detection status overlay
if speech_info:
self._draw_speech_status(frame_array, speech_info, pname)
@ -2706,11 +2841,41 @@ def get_config_schema() -> Dict[str, Any]:
"max_value": 0.5,
"step": 0.05
}
,
{
"name": "normalization_enabled",
"type": "boolean",
"label": "Enable Normalization",
"description": "Normalize incoming audio based on observed peak amplitude before transcription and visualization",
"default_value": NORMALIZATION_ENABLED,
"required": False
},
{
"name": "normalization_target_peak",
"type": "number",
"label": "Normalization Target Peak",
"description": "Target peak (0-1) used when normalizing audio",
"default_value": NORMALIZATION_TARGET_PEAK,
"required": False,
"min_value": 0.5,
"max_value": 1.0
},
{
"name": "max_normalization_gain",
"type": "range",
"label": "Max Normalization Gain",
"description": "Maximum allowed gain applied during normalization",
"default_value": MAX_NORMALIZATION_GAIN,
"required": False,
"min_value": 1.0,
"max_value": 10.0,
"step": 0.1
}
],
"categories": [
{"Model Settings": ["model_id", "device", "enable_quantization"]},
{"Performance Settings": ["throughput_streams", "max_threads"]},
{"Audio Settings": ["sample_rate", "chunk_duration_ms"]},
{"Audio Settings": ["sample_rate", "chunk_duration_ms", "normalization_enabled", "normalization_target_peak", "max_normalization_gain"]},
{"Voice Activity Detection": ["vad_threshold", "max_silence_frames", "max_trailing_silence_frames", "vad_energy_threshold", "vad_zcr_min", "vad_zcr_max", "vad_spectral_centroid_min", "vad_spectral_centroid_max", "vad_spectral_rolloff_threshold", "vad_minimum_duration", "vad_max_history", "vad_noise_floor_energy", "vad_adaptation_rate", "vad_harmonic_threshold"]}
]
}
@ -2839,6 +3004,36 @@ def handle_config_update(lobby_id: str, config_values: Dict[str, Any]) -> bool:
# For now, we'll log that a restart may be needed
logger.info("VAD configuration updated - existing processors may need restart to take effect")
# Normalization updates: apply to global defaults and active processors
norm_updates = False
if "normalization_enabled" in config_values:
NORMALIZATION_ENABLED = bool(config_values["normalization_enabled"])
norm_updates = True
logger.info(f"Updated NORMALIZATION_ENABLED to: {NORMALIZATION_ENABLED}")
if "normalization_target_peak" in config_values:
NORMALIZATION_TARGET_PEAK = float(config_values["normalization_target_peak"])
norm_updates = True
logger.info(f"Updated NORMALIZATION_TARGET_PEAK to: {NORMALIZATION_TARGET_PEAK}")
if "max_normalization_gain" in config_values:
MAX_NORMALIZATION_GAIN = float(config_values["max_normalization_gain"])
norm_updates = True
logger.info(f"Updated MAX_NORMALIZATION_GAIN to: {MAX_NORMALIZATION_GAIN}")
if norm_updates:
# Propagate changes to existing processors
try:
for pname, proc in list(_audio_processors.items()):
try:
proc.normalization_enabled = NORMALIZATION_ENABLED
proc.normalization_target_peak = NORMALIZATION_TARGET_PEAK
proc.max_normalization_gain = MAX_NORMALIZATION_GAIN
logger.info(f"Applied normalization config to processor: {pname}")
except Exception:
logger.debug(f"Failed to apply normalization config to processor: {pname}")
config_applied = True
except Exception:
logger.debug("Failed to propagate normalization settings to processors")
if config_applied:
logger.info(f"Configuration update completed for lobby {lobby_id}")
else: