diff --git a/client/openapi-schema.json b/client/openapi-schema.json index 61b2800..6d24a96 100644 --- a/client/openapi-schema.json +++ b/client/openapi-schema.json @@ -604,7 +604,7 @@ "Bot Configuration" ], "summary": "Get Bot Config Schema", - "description": "Get configuration schema for a specific bot", + "description": "Get configuration schema for a specific bot.\n\nThis endpoint will query registered bot providers each time and\nrequest the bot's /config-schema endpoint without relying on any\nserver-side cached schema. This ensures the UI always receives the\nup-to-date schema from the provider.", "operationId": "get_bot_config_schema_ai_voicebot_api_bots_config_schema__bot_name__get", "parameters": [ { @@ -641,6 +641,94 @@ } } }, + "/ai-voicebot/api/bots/config/schema/instance/{bot_instance_id}": { + "get": { + "tags": [ + "Bot Configuration" + ], + "summary": "Get Bot Config Schema By Instance", + "description": "Get configuration schema for a specific bot instance", + "operationId": "get_bot_config_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__get", + "parameters": [ + { + "name": "bot_instance_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "title": "Bot Instance Id" + } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/BotConfigSchema" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, + "/ai-voicebot/api/bots/config/schema/instance/{bot_instance_id}/refresh": { + "post": { + "tags": [ + "Bot Configuration" + ], + "summary": "Refresh Bot Schema By Instance", + "description": "Refresh configuration schema for a specific bot instance", + "operationId": "refresh_bot_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__refresh_post", + "parameters": [ + { + "name": "bot_instance_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "title": "Bot Instance Id" + } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "type": "object", + "additionalProperties": true, + "title": "Response Refresh Bot Schema By Instance Ai Voicebot Api Bots Config Schema Instance Bot Instance Id Refresh Post" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, "/ai-voicebot/api/bots/config/lobby/{lobby_id}": { "get": { "tags": [ @@ -978,51 +1066,6 @@ } } }, - "/ai-voicebot/api/bots/config/schema/{bot_name}/cache": { - "delete": { - "tags": [ - "Bot Configuration" - ], - "summary": "Clear Bot Schema Cache", - "description": "Clear cached schema for a specific bot", - "operationId": "clear_bot_schema_cache_ai_voicebot_api_bots_config_schema__bot_name__cache_delete", - "parameters": [ - { - "name": "bot_name", - "in": "path", - "required": true, - "schema": { - "type": "string", - "title": "Bot Name" - } - } - ], - "responses": { - "200": { - "description": "Successful Response", - "content": { - "application/json": { - "schema": { - "type": "object", - "additionalProperties": true, - "title": "Response Clear Bot Schema Cache Ai Voicebot Api Bots Config Schema Bot Name Cache Delete" - } - } - } - }, - "422": { - "description": "Validation Error", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/HTTPValidationError" - } - } - } - } - } - } - }, "/ai-voicebot/api/health/ready": { "get": { "tags": [ diff --git a/client/src/api-client.ts b/client/src/api-client.ts index 5420a1b..ee51f83 100644 --- a/client/src/api-client.ts +++ b/client/src/api-client.ts @@ -209,6 +209,14 @@ export class ApiClient { return this.request(this.getApiPath(`/ai-voicebot/api/bots/config/schema/${bot_name}`), { method: "GET" }); } + async getBotConfigSchemaByInstance(bot_instance_id: string): Promise { + return this.request(this.getApiPath(`/ai-voicebot/api/bots/config/schema/instance/${bot_instance_id}`), { method: "GET" }); + } + + async createRefreshBotSchemaByInstance(bot_instance_id: string): Promise { + return this.request(this.getApiPath(`/ai-voicebot/api/bots/config/schema/instance/${bot_instance_id}/refresh`), { method: "POST" }); + } + async getLobbyBotConfigs(lobby_id: string): Promise { return this.request(this.getApiPath(`/ai-voicebot/api/bots/config/lobby/${lobby_id}`), { method: "GET" }); } @@ -241,10 +249,6 @@ export class ApiClient { return this.request(this.getApiPath(`/ai-voicebot/api/bots/config/schema/${bot_name}/refresh`), { method: "POST" }); } - async deleteClearBotSchemaCache(bot_name: string): Promise { - return this.request(this.getApiPath(`/ai-voicebot/api/bots/config/schema/${bot_name}/cache`), { method: "DELETE" }); - } - async getReadinessProbe(): Promise { return this.request(this.getApiPath(`/ai-voicebot/api/health/ready`), { method: "GET" }); } @@ -321,6 +325,8 @@ export const botsApi = { requestLeaveLobby: (bot_instance_id: string) => apiClient.createRequestBotLeaveLobby(bot_instance_id), getInstance: (bot_instance_id: string) => apiClient.getBotInstance(bot_instance_id), getSchema: (bot_name: string) => apiClient.getBotConfigSchema(bot_name), + getSchemaByInstance: (bot_instance_id: string) => apiClient.getBotConfigSchemaByInstance(bot_instance_id), + refreshSchema: (bot_instance_id: string) => apiClient.createRefreshBotSchemaByInstance(bot_instance_id), getBotConfigs: (lobby_id: string) => apiClient.getLobbyBotConfigs(lobby_id), deleteBotConfigs: (lobby_id: string) => apiClient.deleteLobbyConfigs(lobby_id), getBotConfig: (lobby_id: string, bot_instance_id: string) => apiClient.getLobbyBotConfig(lobby_id, bot_instance_id), @@ -328,8 +334,7 @@ export const botsApi = { updateConfig: (data: any) => apiClient.createUpdateBotConfig(data), getConfigStatistics: () => apiClient.getConfigStatistics(), refreshAllSchemas: () => apiClient.createRefreshBotSchemas(), - refreshSchema: (bot_name: string) => apiClient.createRefreshBotSchema(bot_name), - clearSchemaCache: (bot_name: string) => apiClient.deleteClearBotSchemaCache(bot_name) + refreshSchemaByName: (bot_name: string) => apiClient.createRefreshBotSchema(bot_name) }; export const metricsApi = { diff --git a/client/src/api-evolution-checker.ts b/client/src/api-evolution-checker.ts index 27e8937..ed1554e 100644 --- a/client/src/api-evolution-checker.ts +++ b/client/src/api-evolution-checker.ts @@ -6,13 +6,13 @@ import { base } from './Common'; export const knownEndpoints = new Set([ `DELETE:${base}/api/bots/config/lobby/{lobby_id}`, `DELETE:${base}/api/bots/config/lobby/{lobby_id}/bot/{bot_instance_id}`, - `DELETE:${base}/api/bots/config/schema/{bot_name}/cache`, `GET:${base}/api/admin/names`, `GET:${base}/api/admin/session_metrics`, `GET:${base}/api/admin/validate_sessions`, `GET:${base}/api/bots`, `GET:${base}/api/bots/config/lobby/{lobby_id}`, `GET:${base}/api/bots/config/lobby/{lobby_id}/bot/{bot_instance_id}`, + `GET:${base}/api/bots/config/schema/instance/{bot_instance_id}`, `GET:${base}/api/bots/config/schema/{bot_name}`, `GET:${base}/api/bots/config/statistics`, `GET:${base}/api/bots/instances/{bot_instance_id}`, @@ -34,6 +34,7 @@ export const knownEndpoints = new Set([ `POST:${base}/api/admin/clear_password`, `POST:${base}/api/admin/set_password`, `POST:${base}/api/bots/config/refresh-schemas`, + `POST:${base}/api/bots/config/schema/instance/{bot_instance_id}/refresh`, `POST:${base}/api/bots/config/schema/{bot_name}/refresh`, `POST:${base}/api/bots/config/update`, `POST:${base}/api/bots/instances/{bot_instance_id}/leave`, diff --git a/client/src/api-types.ts b/client/src/api-types.ts index 5d935cf..3cd31f3 100644 --- a/client/src/api-types.ts +++ b/client/src/api-types.ts @@ -111,10 +111,29 @@ export interface paths { "/ai-voicebot/api/bots/config/schema/{bot_name}": { /** * Get Bot Config Schema - * @description Get configuration schema for a specific bot + * @description Get configuration schema for a specific bot. + * + * This endpoint will query registered bot providers each time and + * request the bot's /config-schema endpoint without relying on any + * server-side cached schema. This ensures the UI always receives the + * up-to-date schema from the provider. */ get: operations["get_bot_config_schema_ai_voicebot_api_bots_config_schema__bot_name__get"]; }; + "/ai-voicebot/api/bots/config/schema/instance/{bot_instance_id}": { + /** + * Get Bot Config Schema By Instance + * @description Get configuration schema for a specific bot instance + */ + get: operations["get_bot_config_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__get"]; + }; + "/ai-voicebot/api/bots/config/schema/instance/{bot_instance_id}/refresh": { + /** + * Refresh Bot Schema By Instance + * @description Refresh configuration schema for a specific bot instance + */ + post: operations["refresh_bot_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__refresh_post"]; + }; "/ai-voicebot/api/bots/config/lobby/{lobby_id}": { /** * Get Lobby Bot Configs @@ -167,13 +186,6 @@ export interface paths { */ post: operations["refresh_bot_schema_ai_voicebot_api_bots_config_schema__bot_name__refresh_post"]; }; - "/ai-voicebot/api/bots/config/schema/{bot_name}/cache": { - /** - * Clear Bot Schema Cache - * @description Clear cached schema for a specific bot - */ - delete: operations["clear_bot_schema_cache_ai_voicebot_api_bots_config_schema__bot_name__cache_delete"]; - }; "/ai-voicebot/api/health/ready": { /** * Readiness Probe @@ -1146,7 +1158,12 @@ export interface operations { }; /** * Get Bot Config Schema - * @description Get configuration schema for a specific bot + * @description Get configuration schema for a specific bot. + * + * This endpoint will query registered bot providers each time and + * request the bot's /config-schema endpoint without relying on any + * server-side cached schema. This ensures the UI always receives the + * up-to-date schema from the provider. */ get_bot_config_schema_ai_voicebot_api_bots_config_schema__bot_name__get: { parameters: { @@ -1169,6 +1186,58 @@ export interface operations { }; }; }; + /** + * Get Bot Config Schema By Instance + * @description Get configuration schema for a specific bot instance + */ + get_bot_config_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__get: { + parameters: { + path: { + bot_instance_id: string; + }; + }; + responses: { + /** @description Successful Response */ + 200: { + content: { + "application/json": components["schemas"]["BotConfigSchema"]; + }; + }; + /** @description Validation Error */ + 422: { + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; + }; + }; + /** + * Refresh Bot Schema By Instance + * @description Refresh configuration schema for a specific bot instance + */ + refresh_bot_schema_by_instance_ai_voicebot_api_bots_config_schema_instance__bot_instance_id__refresh_post: { + parameters: { + path: { + bot_instance_id: string; + }; + }; + responses: { + /** @description Successful Response */ + 200: { + content: { + "application/json": { + [key: string]: unknown; + }; + }; + }; + /** @description Validation Error */ + 422: { + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; + }; + }; /** * Get Lobby Bot Configs * @description Get all bot configurations for a lobby @@ -1364,33 +1433,6 @@ export interface operations { }; }; }; - /** - * Clear Bot Schema Cache - * @description Clear cached schema for a specific bot - */ - clear_bot_schema_cache_ai_voicebot_api_bots_config_schema__bot_name__cache_delete: { - parameters: { - path: { - bot_name: string; - }; - }; - responses: { - /** @description Successful Response */ - 200: { - content: { - "application/json": { - [key: string]: unknown; - }; - }; - }; - /** @description Validation Error */ - 422: { - content: { - "application/json": components["schemas"]["HTTPValidationError"]; - }; - }; - }; - }; /** * Readiness Probe * @description Kubernetes readiness probe endpoint. diff --git a/client/update-api-client.js b/client/update-api-client.js index c7e9569..a8499b7 100644 --- a/client/update-api-client.js +++ b/client/update-api-client.js @@ -201,7 +201,31 @@ class ApiClientGenerator { // Generate convenience methods for each namespace const namespaceDefinitions = Object.entries(namespaceGroups).map(([namespace, endpoints]) => { - const methods = endpoints.map(endpoint => this.generateConvenienceMethod(endpoint)).join(',\n '); + // Track used method names to avoid duplicate object keys + const usedNames = new Set(); + + const methods = endpoints.map(endpoint => { + // Compute desired method name + let candidate = this.generateConvenienceMethodName(endpoint); + + // If name already used, try to disambiguate with contextual suffixes + if (usedNames.has(candidate)) { + if (endpoint.path.includes('/instance/')) { + candidate = candidate + 'ByInstance'; + } else if (endpoint.path.match(/\/config\/schema\/\{?bot_name\}?/)) { + candidate = candidate + 'ByName'; + } else { + // Fallback: append a numeric suffix to ensure uniqueness + let i = 2; + while (usedNames.has(candidate + i)) i++; + candidate = candidate + i; + } + } + + usedNames.add(candidate); + + return this.generateConvenienceMethod(endpoint, candidate); + }).join(',\n '); return `export const ${namespace}Api = {\n ${methods}\n};`; }).join('\n\n'); @@ -255,8 +279,8 @@ class ApiClientGenerator { /** * Generate a convenience method for an endpoint with intuitive naming */ - generateConvenienceMethod(endpoint) { - const methodName = this.generateConvenienceMethodName(endpoint); + generateConvenienceMethod(endpoint, overrideName) { + const methodName = overrideName || this.generateConvenienceMethodName(endpoint); const clientMethodName = this.generateMethodName(endpoint); const params = this.extractMethodParameters(endpoint); diff --git a/server/api/bot_config.py b/server/api/bot_config.py index 6c10c65..781efcd 100644 --- a/server/api/bot_config.py +++ b/server/api/bot_config.py @@ -61,64 +61,39 @@ def create_bot_config_router( @router.get("/schema/{bot_name}") async def get_bot_config_schema(bot_name: str) -> BotConfigSchema: # type: ignore - """Get configuration schema for a specific bot""" + """Get configuration schema for a specific bot. + + This endpoint will query registered bot providers each time and + request the bot's /config-schema endpoint without relying on any + server-side cached schema. This ensures the UI always receives the + up-to-date schema from the provider. + """ try: - # Check if we have cached schema - schema = config_manager.get_bot_config_schema(bot_name) + providers_response = bot_manager.list_providers() + schema = None + for provider in providers_response.providers: + try: + # Check if this provider has the bot + provider_bots = await bot_manager.get_provider_bots( + provider.provider_id + ) + bot_names = [bot.name for bot in provider_bots.bots] - if not schema: - # Try to discover schema from bot provider - providers_response = bot_manager.list_providers() - for provider in providers_response.providers: - try: - # Check if this provider has the bot - provider_bots = await bot_manager.get_provider_bots( - provider.provider_id - ) - bot_names = [bot.name for bot in provider_bots.bots] - - if bot_name in bot_names: - # Get the full provider object to access base_url - full_provider = bot_manager.get_provider(provider.provider_id) - if full_provider: - schema = await config_manager.discover_bot_config_schema( - bot_name, full_provider.base_url - ) - if schema: - break - except Exception as e: - logger.warning( - f"Failed to check provider {provider.provider_id} for bot {bot_name}: {e}" - ) - continue - else: - # We have a cached schema, but check if it might be stale - # Try to refresh it automatically if it's older than 1 hour - providers_response = bot_manager.list_providers() - for provider in providers_response.providers: - try: - provider_bots = await bot_manager.get_provider_bots( - provider.provider_id - ) - bot_names = [bot.name for bot in provider_bots.bots] - - if bot_name in bot_names: - # This will only refresh if the cached schema is older than 1 hour - # Get the full provider object to access base_url - full_provider = bot_manager.get_provider(provider.provider_id) - if full_provider: - fresh_schema = ( - await config_manager.discover_bot_config_schema( - bot_name, full_provider.base_url, force_refresh=False - ) - ) - if fresh_schema: - schema = fresh_schema - break - except Exception as e: - logger.debug(f"Failed to refresh schema for {bot_name}: {e}") - # Continue with cached schema if refresh fails - continue + if bot_name in bot_names: + # Get the full provider object to access base_url + full_provider = bot_manager.get_provider(provider.provider_id) + if full_provider: + # Force discovery from the provider on every request + schema = await config_manager.discover_bot_config_schema( + bot_name, full_provider.base_url, force_refresh=True + ) + if schema: + break + except Exception as e: + logger.warning( + f"Failed to check provider {provider.provider_id} for bot {bot_name}: {e}" + ) + continue if not schema: raise HTTPException( @@ -142,23 +117,19 @@ def create_bot_config_router( bot_instance = await bot_manager.get_bot_instance(bot_instance_id) bot_name = bot_instance.bot_name - # Check if we have cached schema - schema = config_manager.get_bot_config_schema(bot_name) - - if not schema: - # Try to discover schema from bot provider - provider = bot_manager.get_provider(bot_instance.provider_id) - if provider: - try: - schema = await config_manager.discover_bot_config_schema( - bot_name, provider.base_url - ) - except Exception as e: - logger.warning( - f"Failed to discover schema for bot {bot_name} from provider {bot_instance.provider_id}: {e}" - ) - else: - logger.warning(f"Provider {bot_instance.provider_id} not found") + # Always query the provider directly for the latest schema + provider = bot_manager.get_provider(bot_instance.provider_id) + if provider: + try: + schema = await config_manager.discover_bot_config_schema( + bot_name, provider.base_url, force_refresh=True + ) + except Exception as e: + logger.warning( + f"Failed to discover schema for bot {bot_name} from provider {bot_instance.provider_id}: {e}" + ) + else: + logger.warning(f"Provider {bot_instance.provider_id} not found") if not schema: raise HTTPException( @@ -195,8 +166,9 @@ def create_bot_config_router( status_code=404, detail=f"Provider '{provider_id}' not found" ) - schema = await config_manager.refresh_bot_schema( - bot_name, provider.base_url + # Force a fresh fetch from the provider + schema = await config_manager.discover_bot_config_schema( + bot_name, provider.base_url, force_refresh=True ) if schema: return { @@ -478,25 +450,9 @@ def create_bot_config_router( logger.error(f"Failed to refresh schema for bot {bot_name}: {e}") raise HTTPException(status_code=500, detail="Internal server error") - @router.delete("/schema/{bot_name}/cache") - async def clear_bot_schema_cache(bot_name: str) -> Dict[str, Any]: - """Clear cached schema for a specific bot""" - try: - success = config_manager.clear_bot_schema_cache(bot_name) - if success: - return { - "success": True, - "message": f"Schema cache cleared for bot {bot_name}", - } - else: - raise HTTPException( - status_code=404, detail=f"No cached schema found for bot {bot_name}" - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Failed to clear schema cache for bot {bot_name}: {e}") - raise HTTPException(status_code=500, detail="Internal server error") + # NOTE: schema cache management endpoints removed. Server no longer + # stores or serves cached bot config schemas; callers should fetch + # live schemas from providers via the /schema endpoints. return router diff --git a/server/core/bot_config_manager.py b/server/core/bot_config_manager.py index ff0f455..d7d3484 100644 --- a/server/core/bot_config_manager.py +++ b/server/core/bot_config_manager.py @@ -31,22 +31,22 @@ class BotConfigManager: def __init__(self, storage_dir: str = "./bot_configs"): self.storage_dir = Path(storage_dir) self.storage_dir.mkdir(exist_ok=True) - # In-memory cache for fast access - self.config_cache: Dict[str, Dict[str, BotLobbyConfig]] = {} # lobby_id -> bot_name -> config - self.schema_cache: Dict[str, BotConfigSchema] = {} # bot_name -> schema - + self.config_cache: Dict[ + str, Dict[str, BotLobbyConfig] + ] = {} # lobby_id -> bot_name -> config + # Load existing configurations self._load_configurations() - + def _get_config_file(self, lobby_id: str) -> Path: """Get configuration file path for a lobby""" return self.storage_dir / f"lobby_{lobby_id}.json" - + def _get_schema_file(self, bot_name: str) -> Path: """Get schema file path for a bot""" return self.storage_dir / f"schema_{bot_name}.json" - + def _load_configurations(self): """Load all configurations from disk""" try: @@ -54,133 +54,86 @@ class BotConfigManager: for config_file in self.storage_dir.glob("lobby_*.json"): try: lobby_id = config_file.stem.replace("lobby_", "") - with open(config_file, 'r') as f: + with open(config_file, "r") as f: data = json.load(f) - + self.config_cache[lobby_id] = {} for bot_name, config_data in data.items(): config = BotLobbyConfig(**config_data) self.config_cache[lobby_id][bot_name] = config - + except Exception as e: logger.error(f"Failed to load lobby config {config_file}: {e}") - - # Load bot schemas - for schema_file in self.storage_dir.glob("schema_*.json"): - try: - bot_name = schema_file.stem.replace("schema_", "") - with open(schema_file, 'r') as f: - schema_data = json.load(f) - - schema = BotConfigSchema(**schema_data) - self.schema_cache[bot_name] = schema - - except Exception as e: - logger.error(f"Failed to load bot schema {schema_file}: {e}") - - logger.info(f"Loaded configurations for {len(self.config_cache)} lobbies and {len(self.schema_cache)} bot schemas") - + + logger.info(f"Loaded configurations for {len(self.config_cache)} lobbies") + except Exception as e: logger.error(f"Failed to load configurations: {e}") - + def _save_lobby_config(self, lobby_id: str): """Save lobby configuration to disk""" try: config_file = self._get_config_file(lobby_id) - + if lobby_id not in self.config_cache: return - + # Convert to serializable format data = {} for bot_name, config in self.config_cache[lobby_id].items(): data[bot_name] = config.model_dump() - - with open(config_file, 'w') as f: + + with open(config_file, "w") as f: json.dump(data, f, indent=2) - + except Exception as e: logger.error(f"Failed to save lobby config {lobby_id}: {e}") - + def _save_bot_schema(self, bot_name: str): """Save bot schema to disk""" - try: - if bot_name not in self.schema_cache: - return - - schema_file = self._get_schema_file(bot_name) - schema_data = self.schema_cache[bot_name].model_dump() - - with open(schema_file, 'w') as f: - json.dump(schema_data, f, indent=2) - - except Exception as e: - logger.error(f"Failed to save bot schema {bot_name}: {e}") + # Schema file persistence disabled: server no longer caches provider schemas to disk. + logger.debug( + f"_save_bot_schema called for {bot_name}, but schema persistence is disabled" + ) async def discover_bot_config_schema( self, bot_name: str, provider_url: str, force_refresh: bool = False ) -> Optional[BotConfigSchema]: """Discover configuration schema from bot provider""" try: - # Check if we have a cached schema and it's not forced refresh - if not force_refresh and bot_name in self.schema_cache: - cached_schema = self.schema_cache[bot_name] - # Check if schema is less than 1 hour old - schema_file = self._get_schema_file(bot_name) - if schema_file.exists(): - file_age = time.time() - schema_file.stat().st_mtime - if file_age < 3600: # 1 hour - logger.debug( - f"Using cached schema for bot {bot_name} (age: {file_age:.0f}s)" - ) - return cached_schema - + # Always fetch schema directly from the provider; do not use or + # update any server-side caches or files. This ensures callers + # receive the live schema from the provider on each request. async with httpx.AsyncClient() as client: - # Try to get configuration schema from bot provider response = await client.get( f"{provider_url}/bots/{bot_name}/config-schema", - timeout=10.0 + timeout=10.0, ) - + if response.status_code == 200: schema_data = response.json() schema = BotConfigSchema(**schema_data) - - # Check if schema has actually changed - if bot_name in self.schema_cache: - old_schema = self.schema_cache[bot_name] - if old_schema.model_dump() == schema.model_dump(): - logger.debug( - f"Schema for bot {bot_name} unchanged, updating timestamp only" - ) - else: - logger.info(f"Schema for bot {bot_name} has been updated") - - # Cache the schema - self.schema_cache[bot_name] = schema - self._save_bot_schema(bot_name) - - logger.info( - f"Discovered/refreshed config schema for bot {bot_name}" - ) + logger.info(f"Fetched live config schema for bot {bot_name}") return schema else: - logger.warning(f"Bot {bot_name} does not support configuration (HTTP {response.status_code})") - + logger.warning( + f"Bot {bot_name} does not support configuration (HTTP {response.status_code})" + ) + except Exception as e: logger.warning(f"Failed to discover config schema for bot {bot_name}: {e}") - # Return cached schema if available, even if refresh failed - if bot_name in self.schema_cache: - logger.info( - f"Returning cached schema for bot {bot_name} after refresh failure" - ) - return self.schema_cache[bot_name] - + return None def get_bot_config_schema(self, bot_name: str) -> Optional[BotConfigSchema]: - """Get cached configuration schema for a bot""" - return self.schema_cache.get(bot_name) + """Deprecated: server no longer maintains a cached schema. + + Return None to indicate no server-side cached schema is available. + """ + logger.debug( + "get_bot_config_schema called but server-side schema cache is disabled" + ) + return None async def refresh_bot_schema( self, bot_name: str, provider_url: str @@ -192,14 +145,11 @@ class BotConfigManager: def clear_bot_schema_cache(self, bot_name: str) -> bool: """Clear cached schema for a specific bot""" - if bot_name in self.schema_cache: - del self.schema_cache[bot_name] - # Also remove the cached file - schema_file = self._get_schema_file(bot_name) - if schema_file.exists(): - schema_file.unlink() - logger.info(f"Cleared schema cache for bot {bot_name}") - return True + # No-op: server-side schema caching has been disabled. Return False to + # indicate there was no cached schema to clear. + logger.info( + f"clear_bot_schema_cache called for {bot_name} but caching is disabled" + ) return False def get_lobby_bot_config(self, lobby_id: str, bot_name: str) -> Optional[BotLobbyConfig]: @@ -221,13 +171,10 @@ class BotConfigManager: config_values: Dict[str, Any], session_id: str) -> BotLobbyConfig: """Set or update bot configuration for a lobby""" - - # Validate configuration against schema if available - schema = self.get_bot_config_schema(bot_name) - if schema: - validated_values = self._validate_config_values(config_values, schema) - else: - validated_values = config_values + # Schema validation against a server-side cache is disabled. If + # callers want strict validation, they should fetch the provider's + # live schema and validate prior to calling set_bot_config. + validated_values = config_values # Create or update configuration now = time.time() @@ -388,9 +335,9 @@ class BotConfigManager: return { "total_lobbies": len(self.config_cache), "total_configs": total_configs, - "cached_schemas": len(self.schema_cache), + # server-side schema caching is disabled; omit cached_schemas "lobbies": { - lobby_id: len(configs) + lobby_id: len(configs) for lobby_id, configs in self.config_cache.items() - } + }, } diff --git a/voicebot/bots/whisper.py b/voicebot/bots/whisper.py index a1e2ba3..a19fcac 100644 --- a/voicebot/bots/whisper.py +++ b/voicebot/bots/whisper.py @@ -254,6 +254,12 @@ VAD_CONFIG = { "speech_freq_max": 3000, # Hz } +# Normalization defaults: used to control optional per-stream normalization +# applied before sending audio to the model and for visualization. +NORMALIZATION_ENABLED = True +NORMALIZATION_TARGET_PEAK = 0.95 +MAX_NORMALIZATION_GAIN = 3.0 + # How long (seconds) of no-arriving audio before we consider the phrase ended INACTIVITY_TIMEOUT = 1.5 @@ -1154,6 +1160,15 @@ class OptimizedAudioProcessor: # Enhanced VAD parameters with EMA for noise adaptation self.advanced_vad = AdvancedVAD(sample_rate=self.sample_rate) + # Track maximum observed absolute amplitude for this input stream + # This is used optionally to normalize incoming audio to the "observed" + # maximum which helps models expect a consistent level across peers. + # It's intentionally permissive and capped to avoid amplifying noise. + self.max_observed_amplitude: float = 1e-6 + self.normalization_enabled: bool = True + self.normalization_target_peak: float = 0.95 + self.max_normalization_gain: float = 3.0 # avoid amplifying tiny noise too much + # Processing state self.current_phrase_audio = np.array([], dtype=np.float32) self.transcription_history: List[TranscriptionHistoryItem] = [] @@ -1232,6 +1247,15 @@ class OptimizedAudioProcessor: # Update last audio time whenever any audio is received self.last_audio_time = time.time() + # Update max observed amplitude (used later for optional normalization) + try: + peak = float(np.max(np.abs(audio_data))) if audio_data.size > 0 else 0.0 + if peak > self.max_observed_amplitude: + self.max_observed_amplitude = float(peak) + except Exception: + # Be defensive - don't fail audio ingestion for amplitude tracking + pass + is_speech, vad_metrics = self.advanced_vad.analyze_frame(audio_data) # Update visualization status @@ -1386,8 +1410,10 @@ class OptimizedAudioProcessor: asyncio.run_coroutine_threadsafe(_send_final_marker(), self.main_loop) except Exception: logger.debug(f"Could not schedule final marker for {self.peer_name}") - else: - # As a fallback, try to schedule the normal coroutine if possible + # As a fallback (if we couldn't schedule the marker on the + # main loop), try to schedule the normal async transcription + # coroutine. This is only used when the immediate marker + # cannot be scheduled — avoid scheduling both paths. try: asyncio.create_task( self._transcribe_and_send( @@ -1538,9 +1564,17 @@ class OptimizedAudioProcessor: logger.info( f"Final transcription timeout for {self.peer_name} (asyncio.TimeoutError, inactivity)" ) - await self._transcribe_and_send( - self.current_phrase_audio.copy(), is_final=True - ) + # Avoid duplicate finals: if a final is already pending + # (for example the blocking final was queued), skip scheduling + # another final. Otherwise set the pending flag and run the + # final transcription. + if not self.final_transcription_pending: + self.final_transcription_pending = True + await self._transcribe_and_send( + self.current_phrase_audio.copy(), is_final=True + ) + else: + logger.debug(f"Final already pending for {self.peer_name}; skipping async final") self.current_phrase_audio = np.array([], dtype=np.float32) except Exception as e: logger.error( @@ -1599,12 +1633,17 @@ class OptimizedAudioProcessor: logger.info( f"Final transcription from thread for {self.peer_name} (inactivity)" ) - asyncio.run_coroutine_threadsafe( - self._transcribe_and_send( - self.current_phrase_audio.copy(), is_final=True - ), - self.main_loop, - ) + # Avoid scheduling duplicates if a final is already pending + if not self.final_transcription_pending: + self.final_transcription_pending = True + asyncio.run_coroutine_threadsafe( + self._transcribe_and_send( + self.current_phrase_audio.copy(), is_final=True + ), + self.main_loop, + ) + else: + logger.debug(f"Final already pending for {self.peer_name}; skipping thread-scheduled final") self.current_phrase_audio = np.array([], dtype=np.float32) except Exception as e: logger.error( @@ -1679,15 +1718,38 @@ class OptimizedAudioProcessor: try: audio_duration = len(audio_array) / self.sample_rate - # Skip very short audio - if audio_duration < 0.3: + # Compute basic energy/peak metrics for filtering decisions + audio_rms = float(np.sqrt(np.mean(audio_array**2))) + audio_peak = float(np.max(np.abs(audio_array))) if audio_array.size > 0 else 0.0 + + # Short-burst filtering: drop very short bursts that are likely noise. + # - If duration < 0.5s and RMS is very low -> drop + # - If duration < 0.8s and peak is very small relative to the + # max observed amplitude -> drop. This prevents single-packet + # random noises from becoming transcriptions. + short_duration_threshold = 0.5 + relaxed_short_duration = 0.8 + rms_min_threshold = 0.002 + relative_peak_min_ratio = 0.05 + + if audio_duration < short_duration_threshold and audio_rms < rms_min_threshold: logger.debug( - f"Skipping {transcription_type} transcription: too short ({audio_duration:.2f}s)" + f"Skipping {transcription_type} transcription: short & quiet ({audio_duration:.2f}s, RMS {audio_rms:.6f})" ) return - # Audio quality check - audio_rms = np.sqrt(np.mean(audio_array**2)) + # If we have observed a stronger level on this stream, require a + # sensible fraction of that to consider this burst valid. + max_amp = getattr(self, "max_observed_amplitude", 0.0) or 0.0 + if audio_duration < relaxed_short_duration and max_amp > 0.0: + rel = audio_peak / (max_amp + 1e-12) + if rel < relative_peak_min_ratio: + logger.debug( + f"Skipping {transcription_type} transcription: short burst with low relative peak ({audio_duration:.2f}s, rel {rel:.3f})" + ) + return + + # Very quiet audio - skip entirely if audio_rms < 0.001: logger.debug( f"Skipping {transcription_type} transcription: too quiet (RMS: {audio_rms:.6f})" @@ -1698,8 +1760,28 @@ class OptimizedAudioProcessor: f"🎬 OpenVINO transcription ({transcription_type}) started: {audio_duration:.2f}s, RMS: {audio_rms:.4f}" ) + # Optionally normalize audio prior to feature extraction. We use + # the historical maximum observed amplitude for this stream to + # compute a conservative gain. The gain is clamped to avoid + # amplifying noise excessively. + audio_for_model = audio_array + try: + if getattr(self, "normalization_enabled", False): + stream_max = getattr(self, "max_observed_amplitude", 0.0) or 0.0 + # Use the larger of observed max and current peak to avoid + # over-scaling when current chunk is the loudest. + denom = max(stream_max, audio_peak, 1e-12) + gain = float(self.normalization_target_peak) / denom + # Clamp gain + gain = max(min(gain, float(self.max_normalization_gain)), 0.25) + if abs(gain - 1.0) > 1e-3: + logger.debug(f"Applying normalization gain {gain:.3f} for {self.peer_name}") + audio_for_model = np.clip(audio_array * gain, -0.999, 0.999).astype(np.float32) + except Exception as e: + logger.debug(f"Normalization step failed for {self.peer_name}: {e}") + # Extract features for OpenVINO - input_features = extract_input_features(audio_array, self.sample_rate) + input_features = extract_input_features(audio_for_model, self.sample_rate) # logger.info(f"Features extracted for OpenVINO: {input_features.shape}") # GPU inference with OpenVINO @@ -2125,8 +2207,29 @@ class WaveformVideoTrack(MediaStreamTrack): [np.zeros(samples_needed - len(arr), dtype=np.float32), arr] ) - # Assume arr_segment is already in [-1, 1] - norm = arr_segment + # Single normalization code path: normalize based on the historical + # peak observed for this stream (proc.max_observed_amplitude). This + # ensures the waveform display is consistent over time and avoids + # using the instantaneous buffer peak. + proc = None + norm = arr_segment.astype(np.float32) + try: + proc = _audio_processors.get(pname) + if proc is not None and getattr(proc, "normalization_enabled", False): + stream_max = getattr(proc, "max_observed_amplitude", 0.0) or 0.0 + denom = max(stream_max, 1e-12) + gain = float(proc.normalization_target_peak) / denom + gain = max(min(gain, float(proc.max_normalization_gain)), 0.25) + if abs(gain - 1.0) > 1e-6: + norm = np.clip(arr_segment * gain, -1.0, 1.0).astype(np.float32) + else: + norm = arr_segment.astype(np.float32) + else: + norm = arr_segment.astype(np.float32) + except Exception: + # Fall back to raw samples if normalization computation fails + proc = None + norm = arr_segment.astype(np.float32) # Map audio samples to pixels across the width if norm.size < self.width: @@ -2144,12 +2247,17 @@ class WaveformVideoTrack(MediaStreamTrack): dtype=np.float32, ) + # For display we use the same `norm` computed above (single code + # path). Use `display_norm` alias to avoid confusion later in the + # code but don't recompute normalization. + display_norm = norm + # Draw waveform with color coding for speech detection points: list[tuple[int, int]] = [] colors: list[tuple[int, int, int]] = [] # Color for each point for x in range(self.width): - v = float(norm[x]) if x < norm.size and not np.isnan(norm[x]) else 0.0 + v = float(display_norm[x]) if x < display_norm.size and not np.isnan(display_norm[x]) else 0.0 y = int((1.0 - ((v + 1.0) / 2.0)) * (self.height - 120)) + 100 points.append((x, y)) @@ -2169,6 +2277,33 @@ class WaveformVideoTrack(MediaStreamTrack): for i in range(len(points) - 1): cv2.line(frame_array, points[i], points[i+1], colors[i], 1) + # Draw historical peak indicator (horizontal lines at +/-(target_peak)) + try: + if proc is not None and getattr(proc, "normalization_enabled", False): + target_peak = float(getattr(proc, "normalization_target_peak", 0.0)) + # Ensure target_peak is within [0, 1] + target_peak = max(0.0, min(1.0, target_peak)) + + def _amp_to_y(a: float) -> int: + return int((1.0 - ((a + 1.0) / 2.0)) * (self.height - 120)) + 100 + + top_y = _amp_to_y(target_peak) + bot_y = _amp_to_y(-target_peak) + + # Draw thin magenta lines across the waveform area + cv2.line(frame_array, (0, top_y), (self.width - 1, top_y), (255, 0, 255), 1) + cv2.line(frame_array, (0, bot_y), (self.width - 1, bot_y), (255, 0, 255), 1) + + # Label the peak with small text near the right edge + label = f"Peak:{target_peak:.2f}" + (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) + lx = max(10, self.width - tw - 12) + ly = max(12, top_y - 6) + cv2.putText(frame_array, label, (lx, ly), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 1) + except Exception: + # Non-critical: ignore any drawing errors + pass + # Add speech detection status overlay if speech_info: self._draw_speech_status(frame_array, speech_info, pname) @@ -2706,11 +2841,41 @@ def get_config_schema() -> Dict[str, Any]: "max_value": 0.5, "step": 0.05 } + , + { + "name": "normalization_enabled", + "type": "boolean", + "label": "Enable Normalization", + "description": "Normalize incoming audio based on observed peak amplitude before transcription and visualization", + "default_value": NORMALIZATION_ENABLED, + "required": False + }, + { + "name": "normalization_target_peak", + "type": "number", + "label": "Normalization Target Peak", + "description": "Target peak (0-1) used when normalizing audio", + "default_value": NORMALIZATION_TARGET_PEAK, + "required": False, + "min_value": 0.5, + "max_value": 1.0 + }, + { + "name": "max_normalization_gain", + "type": "range", + "label": "Max Normalization Gain", + "description": "Maximum allowed gain applied during normalization", + "default_value": MAX_NORMALIZATION_GAIN, + "required": False, + "min_value": 1.0, + "max_value": 10.0, + "step": 0.1 + } ], "categories": [ {"Model Settings": ["model_id", "device", "enable_quantization"]}, {"Performance Settings": ["throughput_streams", "max_threads"]}, - {"Audio Settings": ["sample_rate", "chunk_duration_ms"]}, + {"Audio Settings": ["sample_rate", "chunk_duration_ms", "normalization_enabled", "normalization_target_peak", "max_normalization_gain"]}, {"Voice Activity Detection": ["vad_threshold", "max_silence_frames", "max_trailing_silence_frames", "vad_energy_threshold", "vad_zcr_min", "vad_zcr_max", "vad_spectral_centroid_min", "vad_spectral_centroid_max", "vad_spectral_rolloff_threshold", "vad_minimum_duration", "vad_max_history", "vad_noise_floor_energy", "vad_adaptation_rate", "vad_harmonic_threshold"]} ] } @@ -2838,6 +3003,36 @@ def handle_config_update(lobby_id: str, config_values: Dict[str, Any]) -> bool: # Note: Existing processors would need to be recreated to pick up VAD changes # For now, we'll log that a restart may be needed logger.info("VAD configuration updated - existing processors may need restart to take effect") + + # Normalization updates: apply to global defaults and active processors + norm_updates = False + if "normalization_enabled" in config_values: + NORMALIZATION_ENABLED = bool(config_values["normalization_enabled"]) + norm_updates = True + logger.info(f"Updated NORMALIZATION_ENABLED to: {NORMALIZATION_ENABLED}") + if "normalization_target_peak" in config_values: + NORMALIZATION_TARGET_PEAK = float(config_values["normalization_target_peak"]) + norm_updates = True + logger.info(f"Updated NORMALIZATION_TARGET_PEAK to: {NORMALIZATION_TARGET_PEAK}") + if "max_normalization_gain" in config_values: + MAX_NORMALIZATION_GAIN = float(config_values["max_normalization_gain"]) + norm_updates = True + logger.info(f"Updated MAX_NORMALIZATION_GAIN to: {MAX_NORMALIZATION_GAIN}") + + if norm_updates: + # Propagate changes to existing processors + try: + for pname, proc in list(_audio_processors.items()): + try: + proc.normalization_enabled = NORMALIZATION_ENABLED + proc.normalization_target_peak = NORMALIZATION_TARGET_PEAK + proc.max_normalization_gain = MAX_NORMALIZATION_GAIN + logger.info(f"Applied normalization config to processor: {pname}") + except Exception: + logger.debug(f"Failed to apply normalization config to processor: {pname}") + config_applied = True + except Exception: + logger.debug("Failed to propagate normalization settings to processors") if config_applied: logger.info(f"Configuration update completed for lobby {lobby_id}")