ai-voicebot/server/core/bot_config_manager.py

397 lines
16 KiB
Python

"""
Bot Configuration Manager
This module handles per-lobby bot configuration management including:
- Configuration schema discovery from bot providers
- Configuration storage and retrieval per lobby
- Real-time configuration updates
- Validation of configuration values
"""
import json
import time
import httpx
from typing import Dict, List, Optional, Any
from pathlib import Path
from logger import logger
# Import shared models
import sys
import os
sys.path.append(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)
from shared.models import BotConfigSchema, BotLobbyConfig
class BotConfigManager:
"""Manages bot configurations for lobbies"""
def __init__(self, storage_dir: str = "./bot_configs"):
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(exist_ok=True)
# In-memory cache for fast access
self.config_cache: Dict[str, Dict[str, BotLobbyConfig]] = {} # lobby_id -> bot_name -> config
self.schema_cache: Dict[str, BotConfigSchema] = {} # bot_name -> schema
# Load existing configurations
self._load_configurations()
def _get_config_file(self, lobby_id: str) -> Path:
"""Get configuration file path for a lobby"""
return self.storage_dir / f"lobby_{lobby_id}.json"
def _get_schema_file(self, bot_name: str) -> Path:
"""Get schema file path for a bot"""
return self.storage_dir / f"schema_{bot_name}.json"
def _load_configurations(self):
"""Load all configurations from disk"""
try:
# Load lobby configurations
for config_file in self.storage_dir.glob("lobby_*.json"):
try:
lobby_id = config_file.stem.replace("lobby_", "")
with open(config_file, 'r') as f:
data = json.load(f)
self.config_cache[lobby_id] = {}
for bot_name, config_data in data.items():
config = BotLobbyConfig(**config_data)
self.config_cache[lobby_id][bot_name] = config
except Exception as e:
logger.error(f"Failed to load lobby config {config_file}: {e}")
# Load bot schemas
for schema_file in self.storage_dir.glob("schema_*.json"):
try:
bot_name = schema_file.stem.replace("schema_", "")
with open(schema_file, 'r') as f:
schema_data = json.load(f)
schema = BotConfigSchema(**schema_data)
self.schema_cache[bot_name] = schema
except Exception as e:
logger.error(f"Failed to load bot schema {schema_file}: {e}")
logger.info(f"Loaded configurations for {len(self.config_cache)} lobbies and {len(self.schema_cache)} bot schemas")
except Exception as e:
logger.error(f"Failed to load configurations: {e}")
def _save_lobby_config(self, lobby_id: str):
"""Save lobby configuration to disk"""
try:
config_file = self._get_config_file(lobby_id)
if lobby_id not in self.config_cache:
return
# Convert to serializable format
data = {}
for bot_name, config in self.config_cache[lobby_id].items():
data[bot_name] = config.model_dump()
with open(config_file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
logger.error(f"Failed to save lobby config {lobby_id}: {e}")
def _save_bot_schema(self, bot_name: str):
"""Save bot schema to disk"""
try:
if bot_name not in self.schema_cache:
return
schema_file = self._get_schema_file(bot_name)
schema_data = self.schema_cache[bot_name].model_dump()
with open(schema_file, 'w') as f:
json.dump(schema_data, f, indent=2)
except Exception as e:
logger.error(f"Failed to save bot schema {bot_name}: {e}")
async def discover_bot_config_schema(
self, bot_name: str, provider_url: str, force_refresh: bool = False
) -> Optional[BotConfigSchema]:
"""Discover configuration schema from bot provider"""
try:
# Check if we have a cached schema and it's not forced refresh
if not force_refresh and bot_name in self.schema_cache:
cached_schema = self.schema_cache[bot_name]
# Check if schema is less than 1 hour old
schema_file = self._get_schema_file(bot_name)
if schema_file.exists():
file_age = time.time() - schema_file.stat().st_mtime
if file_age < 3600: # 1 hour
logger.debug(
f"Using cached schema for bot {bot_name} (age: {file_age:.0f}s)"
)
return cached_schema
async with httpx.AsyncClient() as client:
# Try to get configuration schema from bot provider
response = await client.get(
f"{provider_url}/bots/{bot_name}/config-schema",
timeout=10.0
)
if response.status_code == 200:
schema_data = response.json()
schema = BotConfigSchema(**schema_data)
# Check if schema has actually changed
if bot_name in self.schema_cache:
old_schema = self.schema_cache[bot_name]
if old_schema.model_dump() == schema.model_dump():
logger.debug(
f"Schema for bot {bot_name} unchanged, updating timestamp only"
)
else:
logger.info(f"Schema for bot {bot_name} has been updated")
# Cache the schema
self.schema_cache[bot_name] = schema
self._save_bot_schema(bot_name)
logger.info(
f"Discovered/refreshed config schema for bot {bot_name}"
)
return schema
else:
logger.warning(f"Bot {bot_name} does not support configuration (HTTP {response.status_code})")
except Exception as e:
logger.warning(f"Failed to discover config schema for bot {bot_name}: {e}")
# Return cached schema if available, even if refresh failed
if bot_name in self.schema_cache:
logger.info(
f"Returning cached schema for bot {bot_name} after refresh failure"
)
return self.schema_cache[bot_name]
return None
def get_bot_config_schema(self, bot_name: str) -> Optional[BotConfigSchema]:
"""Get cached configuration schema for a bot"""
return self.schema_cache.get(bot_name)
async def refresh_bot_schema(
self, bot_name: str, provider_url: str
) -> Optional[BotConfigSchema]:
"""Force refresh of bot schema from provider"""
return await self.discover_bot_config_schema(
bot_name, provider_url, force_refresh=True
)
def clear_bot_schema_cache(self, bot_name: str) -> bool:
"""Clear cached schema for a specific bot"""
if bot_name in self.schema_cache:
del self.schema_cache[bot_name]
# Also remove the cached file
schema_file = self._get_schema_file(bot_name)
if schema_file.exists():
schema_file.unlink()
logger.info(f"Cleared schema cache for bot {bot_name}")
return True
return False
def get_lobby_bot_config(self, lobby_id: str, bot_name: str) -> Optional[BotLobbyConfig]:
"""Get bot configuration for a specific lobby"""
if lobby_id in self.config_cache and bot_name in self.config_cache[lobby_id]:
return self.config_cache[lobby_id][bot_name]
return None
def get_lobby_configs(self, lobby_id: str) -> List[BotLobbyConfig]:
"""Get all bot configurations for a lobby"""
if lobby_id in self.config_cache:
return list(self.config_cache[lobby_id].values())
return []
def set_bot_config(self,
lobby_id: str,
bot_name: str,
provider_id: str,
config_values: Dict[str, Any],
session_id: str) -> BotLobbyConfig:
"""Set or update bot configuration for a lobby"""
# Validate configuration against schema if available
schema = self.get_bot_config_schema(bot_name)
if schema:
validated_values = self._validate_config_values(config_values, schema)
else:
validated_values = config_values
# Create or update configuration
now = time.time()
existing_config = self.get_lobby_bot_config(lobby_id, bot_name)
if existing_config:
# Update existing
config = BotLobbyConfig(
bot_name=bot_name,
lobby_id=lobby_id,
provider_id=provider_id,
config_values=validated_values,
created_at=existing_config.created_at,
updated_at=now,
created_by=existing_config.created_by
)
else:
# Create new
config = BotLobbyConfig(
bot_name=bot_name,
lobby_id=lobby_id,
provider_id=provider_id,
config_values=validated_values,
created_at=now,
updated_at=now,
created_by=session_id
)
# Store in cache
if lobby_id not in self.config_cache:
self.config_cache[lobby_id] = {}
self.config_cache[lobby_id][bot_name] = config
# Save to disk
self._save_lobby_config(lobby_id)
logger.info(f"Updated config for bot {bot_name} in lobby {lobby_id}")
return config
def _validate_config_values(self, values: Dict[str, Any], schema: BotConfigSchema) -> Dict[str, Any]:
"""Validate configuration values against schema"""
validated = {}
for param in schema.parameters:
value = values.get(param.name)
# Check required parameters
if param.required and value is None:
if param.default_value is not None:
value = param.default_value
else:
raise ValueError(f"Required parameter '{param.name}' is missing")
# Skip None values for optional parameters
if value is None:
continue
# Type validation
if param.type == "string":
value = str(value)
if param.max_length and len(value) > param.max_length:
raise ValueError(f"Parameter '{param.name}' exceeds max length {param.max_length}")
elif param.type == "number":
value = float(value)
if param.min_value is not None and value < param.min_value:
raise ValueError(f"Parameter '{param.name}' below minimum {param.min_value}")
if param.max_value is not None and value > param.max_value:
raise ValueError(f"Parameter '{param.name}' above maximum {param.max_value}")
elif param.type == "boolean":
value = bool(value)
elif param.type == "select":
if param.options:
valid_values = [opt["value"] for opt in param.options]
if value not in valid_values:
raise ValueError(f"Parameter '{param.name}' must be one of {valid_values}")
elif param.type == "range":
value = float(value)
if param.min_value is not None and value < param.min_value:
value = param.min_value
if param.max_value is not None and value > param.max_value:
value = param.max_value
validated[param.name] = value
return validated
def delete_bot_config(self, lobby_id: str, bot_name: str) -> bool:
"""Delete bot configuration for a lobby"""
if lobby_id in self.config_cache and bot_name in self.config_cache[lobby_id]:
del self.config_cache[lobby_id][bot_name]
# Clean up empty lobby configs
if not self.config_cache[lobby_id]:
del self.config_cache[lobby_id]
config_file = self._get_config_file(lobby_id)
if config_file.exists():
config_file.unlink()
else:
self._save_lobby_config(lobby_id)
logger.info(f"Deleted config for bot {bot_name} in lobby {lobby_id}")
return True
return False
def delete_lobby_configs(self, lobby_id: str) -> bool:
"""Delete all bot configurations for a lobby"""
if lobby_id in self.config_cache:
del self.config_cache[lobby_id]
config_file = self._get_config_file(lobby_id)
if config_file.exists():
config_file.unlink()
logger.info(f"Deleted all configs for lobby {lobby_id}")
return True
return False
async def notify_bot_config_change(self,
provider_url: str,
bot_name: str,
lobby_id: str,
config: BotLobbyConfig):
"""Notify bot provider of configuration change"""
try:
async with httpx.AsyncClient() as client:
# Notify the bot provider of the configuration change
response = await client.post(
f"{provider_url}/bots/{bot_name}/config",
json={
"lobby_id": lobby_id,
"config_values": config.config_values
},
timeout=10.0
)
if response.status_code == 200:
logger.info(f"Successfully notified bot {bot_name} of config change")
return True
else:
logger.warning(f"Failed to notify bot {bot_name} of config change: HTTP {response.status_code}")
except Exception as e:
logger.error(f"Failed to notify bot {bot_name} of config change: {e}")
return False
def get_statistics(self) -> Dict[str, Any]:
"""Get configuration manager statistics"""
total_configs = sum(len(lobby_configs) for lobby_configs in self.config_cache.values())
return {
"total_lobbies": len(self.config_cache),
"total_configs": total_configs,
"cached_schemas": len(self.schema_cache),
"lobbies": {
lobby_id: len(configs)
for lobby_id, configs in self.config_cache.items()
}
}